feat(releases): retry failed downloads (#491)

* feat(download): implement parsing and retry

* feat: retry torrent file downloads

* refactor: error handling downloadtorrentfile

* feat: add tests for download torrent file

* build: add runs-on self-hosted

* build: add runs-on self-hosted
This commit is contained in:
Kyle Sanderson 2022-10-19 12:52:31 -07:00 committed by GitHub
parent 5183f7683a
commit 2d8f7aeb4e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 349 additions and 143 deletions

View file

@ -16,7 +16,7 @@ permissions:
jobs:
web:
name: Build web
runs-on: ubuntu-latest
runs-on: self-hosted
steps:
- name: Checkout
uses: actions/checkout@v3
@ -44,7 +44,7 @@ jobs:
goreleaserbuild:
name: Build Go binaries
if: github.event_name == 'pull_request'
runs-on: ubuntu-latest
runs-on: self-hosted
needs: web
steps:
- name: Checkout
@ -82,7 +82,7 @@ jobs:
goreleaser:
name: Build & publish binaries and images
if: startsWith(github.ref, 'refs/tags/')
runs-on: ubuntu-latest
runs-on: self-hosted
needs: web
steps:
- name: Checkout

View file

@ -117,7 +117,7 @@ func (s *service) delugeV1(client *domain.DownloadClient, action domain.Action,
}
if release.TorrentTmpFile == "" {
if err = release.DownloadTorrentFile(); err != nil {
if err := release.DownloadTorrentFile(); err != nil {
s.log.Error().Err(err).Msgf("could not download torrent file for release: %v", release.TorrentName)
return nil, err
}
@ -206,7 +206,7 @@ func (s *service) delugeV2(client *domain.DownloadClient, action domain.Action,
}
if release.TorrentTmpFile == "" {
if err = release.DownloadTorrentFile(); err != nil {
if err := release.DownloadTorrentFile(); err != nil {
s.log.Error().Err(err).Msgf("could not download torrent file for release: %v", release.TorrentName)
return nil, err
}

View file

@ -68,8 +68,7 @@ func (s *service) qbittorrent(action domain.Action, release domain.Release) ([]s
}
if release.TorrentTmpFile == "" {
err = release.DownloadTorrentFile()
if err != nil {
if err := release.DownloadTorrentFile(); err != nil {
return nil, errors.Wrap(err, "error downloading torrent file for release: %v", release.TorrentName)
}
}
@ -279,11 +278,12 @@ func (s *service) reannounceTorrent(qb *qbittorrent.Client, action domain.Action
// Check if status not working or something else
// https://github.com/qbittorrent/qBittorrent/wiki/WebUI-API-(qBittorrent-4.1)#get-torrent-trackers
// 0 Tracker is disabled (used for DHT, PeX, and LSD)
// 1 Tracker has not been contacted yet
// 2 Tracker has been contacted and is working
// 3 Tracker is updating
// 4 Tracker has been contacted, but it is not working (or doesn't send proper replies)
//
// 0 Tracker is disabled (used for DHT, PeX, and LSD)
// 1 Tracker has not been contacted yet
// 2 Tracker has been contacted and is working
// 3 Tracker is updating
// 4 Tracker has been contacted, but it is not working (or doesn't send proper replies)
func isTrackerStatusOK(trackers []qbittorrent.TorrentTracker) bool {
for _, tracker := range trackers {
if tracker.Status == qbittorrent.TrackerStatusDisabled {

View file

@ -28,7 +28,7 @@ func (s *service) rtorrent(action domain.Action, release domain.Release) ([]stri
var rejections []string
if release.TorrentTmpFile == "" {
if err = release.DownloadTorrentFile(); err != nil {
if err := release.DownloadTorrentFile(); err != nil {
s.log.Error().Err(err).Msgf("could not download torrent file for release: %v", release.TorrentName)
return nil, err
}

View file

@ -28,8 +28,7 @@ func (s *service) transmission(action domain.Action, release domain.Release) ([]
var rejections []string
if release.TorrentTmpFile == "" {
err = release.DownloadTorrentFile()
if err != nil {
if err := release.DownloadTorrentFile(); err != nil {
s.log.Error().Err(err).Msgf("could not download torrent file for release: %v", release.TorrentName)
return nil, err
}

View file

@ -1,14 +1,15 @@
package client
import (
"errors"
"io"
"net/http"
"os"
"time"
"github.com/anacrolix/torrent/metainfo"
"github.com/autobrr/autobrr/pkg/errors"
"github.com/anacrolix/torrent/metainfo"
"github.com/avast/retry-go"
"github.com/rs/zerolog/log"
)
@ -35,58 +36,6 @@ func NewHttpClient() *HttpClient {
}
}
func (c *HttpClient) DownloadFile(url string, opts map[string]string) (*DownloadFileResponse, error) {
if url == "" {
return nil, errors.New("download_file: url can't be empty")
}
// Create tmp file
tmpFile, err := os.CreateTemp("", "autobrr-")
if err != nil {
log.Error().Stack().Err(err).Msg("error creating temp file")
return nil, err
}
defer tmpFile.Close()
// Get the data
resp, err := http.Get(url)
if err != nil {
log.Error().Stack().Err(err).Msgf("error downloading file from %v", url)
return nil, err
}
defer resp.Body.Close()
// retry logic
if resp.StatusCode != http.StatusOK {
log.Error().Stack().Err(err).Msgf("error downloading file from: %v - bad status: %d", url, resp.StatusCode)
return nil, err
}
// Write the body to file
_, err = io.Copy(tmpFile, resp.Body)
if err != nil {
log.Error().Stack().Err(err).Msgf("error writing downloaded file: %v", tmpFile.Name())
return nil, err
}
// remove file if fail
res := DownloadFileResponse{
Body: &resp.Body,
FileName: tmpFile.Name(),
}
if res.FileName == "" || res.Body == nil {
log.Error().Stack().Err(err).Msgf("tmp file error - empty body: %v", url)
return nil, errors.New("error downloading file, no tmp file")
}
log.Debug().Msgf("successfully downloaded file: %v", tmpFile.Name())
return &res, nil
}
func (c *HttpClient) DownloadTorrentFile(url string, opts map[string]string) (*DownloadTorrentFileResponse, error) {
if url == "" {
return nil, errors.New("download_file: url can't be empty")
@ -100,47 +49,63 @@ func (c *HttpClient) DownloadTorrentFile(url string, opts map[string]string) (*D
}
defer tmpFile.Close()
// Get the data
resp, err := http.Get(url)
res := &DownloadTorrentFileResponse{}
// try request and if fail run 3 retries
err = retry.Do(func() error {
resp, err := http.Get(url)
if err != nil {
return errors.New("error downloading file: %q", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return errors.New("error downloading file bad status: %d", resp.StatusCode)
}
nuke := func() {
tmpFile.Seek(0, io.SeekStart)
tmpFile.Truncate(0)
}
// Write the body to file
_, err = io.Copy(tmpFile, resp.Body)
if err != nil {
nuke()
return errors.New("error writing downloaded file: %v | %q", tmpFile.Name(), err)
}
meta, err := metainfo.Load(resp.Body)
if err != nil {
nuke()
return errors.New("metainfo could not load file contents: %v | %q", tmpFile.Name(), err)
}
res = &DownloadTorrentFileResponse{
MetaInfo: meta,
TmpFileName: tmpFile.Name(),
}
if res.TmpFileName == "" || res.MetaInfo == nil {
nuke()
return errors.New("tmp file error - empty body")
}
if len(res.MetaInfo.InfoBytes) < 1 {
nuke()
return errors.New("could not read infohash")
}
log.Debug().Msgf("successfully downloaded file: %v", tmpFile.Name())
return nil
},
//retry.OnRetry(func(n uint, err error) { c.log.Printf("%q: attempt %d - %v\n", err, n, url) }),
retry.Delay(time.Second*5),
retry.Attempts(3),
retry.MaxJitter(time.Second*1))
if err != nil {
log.Error().Stack().Err(err).Msgf("error downloading file from %v", url)
return nil, err
}
defer resp.Body.Close()
// retry logic
if resp.StatusCode != http.StatusOK {
log.Error().Stack().Err(err).Msgf("error downloading file from: %v - bad status: %d", url, resp.StatusCode)
return nil, err
res = nil
}
// Write the body to file
_, err = io.Copy(tmpFile, resp.Body)
if err != nil {
log.Error().Stack().Err(err).Msgf("error writing downloaded file: %v", tmpFile.Name())
return nil, err
}
meta, err := metainfo.Load(resp.Body)
if err != nil {
log.Error().Stack().Err(err).Msgf("metainfo could not load file contents: %v", tmpFile.Name())
return nil, err
}
// remove file if fail
res := DownloadTorrentFileResponse{
MetaInfo: meta,
TmpFileName: tmpFile.Name(),
}
if res.TmpFileName == "" || res.MetaInfo == nil {
log.Error().Stack().Err(err).Msgf("tmp file error - empty body: %v", url)
return nil, errors.New("error downloading file, no tmp file")
}
log.Debug().Msgf("successfully downloaded file: %v", tmpFile.Name())
return &res, nil
return res, err
}

View file

@ -17,6 +17,7 @@ import (
"github.com/autobrr/autobrr/pkg/errors"
"github.com/anacrolix/torrent/metainfo"
"github.com/avast/retry-go"
"github.com/dustin/go-humanize"
"github.com/moistari/rls"
"golang.org/x/net/publicsuffix"
@ -222,6 +223,8 @@ func (r *Release) ParseString(title string) {
return
}
var ErrUnrecoverableError = errors.New("unrecoverable error")
func (r *Release) ParseReleaseTagsString(tags string) {
// trim delimiters and closest space
re := regexp.MustCompile(`\| |/ |, `)
@ -292,9 +295,10 @@ func (r *Release) DownloadTorrentFile() error {
client := &http.Client{
Transport: customTransport,
Jar: jar,
Timeout: time.Second * 45,
}
req, err := http.NewRequest("GET", r.TorrentURL, nil)
req, err := http.NewRequest(http.MethodGet, r.TorrentURL, nil)
if err != nil {
return errors.Wrap(err, "error downloading file")
}
@ -305,19 +309,6 @@ func (r *Release) DownloadTorrentFile() error {
req.Header.Set("Cookie", r.RawCookie)
}
// Get the data
resp, err := client.Do(req)
if err != nil {
return errors.Wrap(err, "error downloading file")
}
defer resp.Body.Close()
// retry logic
if resp.StatusCode != http.StatusOK {
return errors.New("error downloading torrent (%v) file (%v) from '%v' - status code: %d", r.TorrentName, r.TorrentURL, r.Indexer, resp.StatusCode)
}
// Create tmp file
tmpFile, err := os.CreateTemp("", "autobrr-")
if err != nil {
@ -325,29 +316,65 @@ func (r *Release) DownloadTorrentFile() error {
}
defer tmpFile.Close()
// Write the body to file
_, err = io.Copy(tmpFile, resp.Body)
if err != nil {
return errors.Wrap(err, "error writing downloaded file: %v", tmpFile.Name())
}
errFunc := retry.Do(func() error {
// Get the data
resp, err := client.Do(req)
if err != nil {
return errors.Wrap(err, "error downloading file")
}
defer resp.Body.Close()
meta, err := metainfo.LoadFromFile(tmpFile.Name())
if err != nil {
return errors.Wrap(err, "metainfo could not load file contents: %v", tmpFile.Name())
}
if resp.StatusCode != http.StatusOK {
unRecoverableErr := errors.Wrap(ErrUnrecoverableError, "unrecoverable error downloading torrent (%v) file (%v) from '%v' - status code: %d", r.TorrentName, r.TorrentURL, r.Indexer, resp.StatusCode)
torrentMetaInfo, err := meta.UnmarshalInfo()
if err != nil {
return errors.Wrap(err, "metainfo could not unmarshal info from torrent: %v", tmpFile.Name())
}
if resp.StatusCode == 401 || resp.StatusCode == 403 || resp.StatusCode == 404 || resp.StatusCode == 405 {
return retry.Unrecoverable(unRecoverableErr)
}
r.TorrentTmpFile = tmpFile.Name()
r.TorrentHash = meta.HashInfoBytes().String()
r.Size = uint64(torrentMetaInfo.TotalLength())
return errors.New("unexpected status: %v", resp.StatusCode)
}
// remove file if fail
resetTmpFile := func() {
tmpFile.Seek(0, io.SeekStart)
tmpFile.Truncate(0)
}
return nil
// Write the body to file
if _, err := io.Copy(tmpFile, resp.Body); err != nil {
resetTmpFile()
return errors.Wrap(err, "error writing downloaded file: %v", tmpFile.Name())
}
meta, err := metainfo.LoadFromFile(tmpFile.Name())
if err != nil {
resetTmpFile()
return errors.Wrap(err, "metainfo could not load file contents: %v", tmpFile.Name())
}
torrentMetaInfo, err := meta.UnmarshalInfo()
if err != nil {
resetTmpFile()
return errors.Wrap(err, "metainfo could not unmarshal info from torrent: %v", tmpFile.Name())
}
hashInfoBytes := meta.HashInfoBytes().Bytes()
if len(hashInfoBytes) < 1 {
resetTmpFile()
return errors.New("could not read infohash")
}
r.TorrentTmpFile = tmpFile.Name()
r.TorrentHash = meta.HashInfoBytes().String()
r.Size = uint64(torrentMetaInfo.TotalLength())
return nil
},
retry.Delay(time.Second*3),
retry.Attempts(3),
retry.MaxJitter(time.Second*1),
)
return errFunc
}
func (r *Release) addRejection(reason string) {

View file

@ -1,8 +1,15 @@
package domain
import (
"fmt"
"net/http"
"net/http/httptest"
"os"
"strings"
"testing"
"time"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
)
@ -630,3 +637,212 @@ func TestRelease_ParseString(t *testing.T) {
})
}
}
var trackerLessTestTorrent = `d7:comment19:This is just a test10:created by12:Johnny Bravo13:creation datei1430648794e8:encoding5:UTF-84:infod6:lengthi1128e4:name12:testfile.bin12:piece lengthi32768e6:pieces20:Õˆë =UŒäiÎ^æ °Eâ?ÇÒe5:nodesl35:udp://tracker.openbittorrent.com:8035:udp://tracker.openbittorrent.com:80ee`
func TestRelease_DownloadTorrentFile(t *testing.T) {
// disable logger
zerolog.SetGlobalLevel(zerolog.Disabled)
mux := http.NewServeMux()
ts := httptest.NewServer(mux)
defer ts.Close()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if strings.Contains(r.RequestURI, "401") {
w.WriteHeader(http.StatusUnauthorized)
w.Write([]byte("unauthorized"))
return
}
if strings.Contains(r.RequestURI, "403") {
w.WriteHeader(http.StatusUnauthorized)
w.Write([]byte("forbidden"))
return
}
if strings.Contains(r.RequestURI, "404") {
w.WriteHeader(http.StatusUnauthorized)
w.Write([]byte("not found"))
return
}
if strings.Contains(r.RequestURI, "405") {
w.WriteHeader(http.StatusUnauthorized)
w.Write([]byte("method not allowed"))
return
}
if strings.Contains(r.RequestURI, "file.torrent") {
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/x-bittorrent")
payload, _ := os.ReadFile("testdata/archlinux-2011.08.19-netinstall-i686.iso.torrent")
w.Write(payload)
return
}
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("internal error"))
})
type fields struct {
ID int64
FilterStatus ReleaseFilterStatus
Rejections []string
Indexer string
FilterName string
Protocol ReleaseProtocol
Implementation ReleaseImplementation
Timestamp time.Time
GroupID string
TorrentID string
TorrentURL string
TorrentTmpFile string
TorrentDataRawBytes []byte
TorrentHash string
TorrentName string
Size uint64
Title string
Category string
Categories []string
Season int
Episode int
Year int
Resolution string
Source string
Codec []string
Container string
HDR []string
Audio []string
AudioChannels string
Group string
Region string
Language string
Proper bool
Repack bool
Website string
Artists string
Type string
LogScore int
IsScene bool
Origin string
Tags []string
ReleaseTags string
Freeleech bool
FreeleechPercent int
Bonus []string
Uploader string
PreTime string
Other []string
RawCookie string
AdditionalSizeCheckRequired bool
FilterID int
Filter *Filter
ActionStatus []ReleaseActionStatus
}
tests := []struct {
name string
fields fields
wantErr assert.ErrorAssertionFunc
}{
{
name: "401",
fields: fields{
Indexer: "mock-indexer",
TorrentName: "Test.Release-GROUP",
TorrentURL: fmt.Sprintf("%v/%v", ts.URL, 401),
},
wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
if err != nil {
return true
}
return false
},
},
{
name: "403",
fields: fields{
Indexer: "mock-indexer",
TorrentName: "Test.Release-GROUP",
TorrentURL: fmt.Sprintf("%v/%v", ts.URL, 403),
},
wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
if err != nil {
return true
}
return false
},
},
{
name: "ok",
fields: fields{
Indexer: "mock-indexer",
TorrentName: "Test.Release-GROUP",
TorrentURL: fmt.Sprintf("%v/%v", ts.URL, "file.torrent"),
},
wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
if err != nil {
return true
}
return false
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := &Release{
ID: tt.fields.ID,
FilterStatus: tt.fields.FilterStatus,
Rejections: tt.fields.Rejections,
Indexer: tt.fields.Indexer,
FilterName: tt.fields.FilterName,
Protocol: tt.fields.Protocol,
Implementation: tt.fields.Implementation,
Timestamp: tt.fields.Timestamp,
GroupID: tt.fields.GroupID,
TorrentID: tt.fields.TorrentID,
TorrentURL: tt.fields.TorrentURL,
TorrentTmpFile: tt.fields.TorrentTmpFile,
TorrentDataRawBytes: tt.fields.TorrentDataRawBytes,
TorrentHash: tt.fields.TorrentHash,
TorrentName: tt.fields.TorrentName,
Size: tt.fields.Size,
Title: tt.fields.Title,
Category: tt.fields.Category,
Categories: tt.fields.Categories,
Season: tt.fields.Season,
Episode: tt.fields.Episode,
Year: tt.fields.Year,
Resolution: tt.fields.Resolution,
Source: tt.fields.Source,
Codec: tt.fields.Codec,
Container: tt.fields.Container,
HDR: tt.fields.HDR,
Audio: tt.fields.Audio,
AudioChannels: tt.fields.AudioChannels,
Group: tt.fields.Group,
Region: tt.fields.Region,
Language: tt.fields.Language,
Proper: tt.fields.Proper,
Repack: tt.fields.Repack,
Website: tt.fields.Website,
Artists: tt.fields.Artists,
Type: tt.fields.Type,
LogScore: tt.fields.LogScore,
IsScene: tt.fields.IsScene,
Origin: tt.fields.Origin,
Tags: tt.fields.Tags,
ReleaseTags: tt.fields.ReleaseTags,
Freeleech: tt.fields.Freeleech,
FreeleechPercent: tt.fields.FreeleechPercent,
Bonus: tt.fields.Bonus,
Uploader: tt.fields.Uploader,
PreTime: tt.fields.PreTime,
Other: tt.fields.Other,
RawCookie: tt.fields.RawCookie,
AdditionalSizeCheckRequired: tt.fields.AdditionalSizeCheckRequired,
FilterID: tt.fields.FilterID,
Filter: tt.fields.Filter,
ActionStatus: tt.fields.ActionStatus,
}
tt.wantErr(t, r.DownloadTorrentFile(), fmt.Sprintf("DownloadTorrentFile()"))
})
}
}

View file

@ -409,8 +409,7 @@ func (s *service) AdditionalSizeCheck(f domain.Filter, release *domain.Release)
s.log.Trace().Msgf("filter.Service.AdditionalSizeCheck: (%v) preparing to download torrent metafile", f.Name)
// if indexer doesn't have api, download torrent and add to tmpPath
err := release.DownloadTorrentFile()
if err != nil {
if err := release.DownloadTorrentFile(); err != nil {
s.log.Error().Stack().Err(err).Msgf("filter.Service.AdditionalSizeCheck: (%v) could not download torrent file with id: '%v' from: %v", f.Name, release.TorrentID, release.Indexer)
return false, err
}