Mirror of https://github.com/idanoo/autobrr, synced 2025-07-23 00:39:13 +00:00

feat(feeds): improve RSS (#502)

* feat(feeds): improve rss
* save last_run time
* remove interval check
* refactor feed job keys
* add rss test
* add max_age check
* feat(feeds): rss basic freeleech parsing
* feat(feeds): rss cookie support
* feat(feeds): db get max_age
* feat(feeds): update log messages
* feat(feeds): pass cookie to release for download
* feat(feeds): improve size parsing
* feat(feeds): improve datetime check

parent ac988f28f4
commit e2bb14afa4

15 changed files with 741 additions and 209 deletions
internal/feed/client.go (new file, 80 lines added)
@@ -0,0 +1,80 @@
+package feed
+
+import (
+	"context"
+	"crypto/tls"
+	"net/http"
+	"net/http/cookiejar"
+	"time"
+
+	"github.com/mmcdole/gofeed"
+	"golang.org/x/net/publicsuffix"
+)
+
+type RSSParser struct {
+	parser *gofeed.Parser
+	http   *http.Client
+	cookie string
+}
+
+// NewFeedParser wraps the gofeed.Parser using our own http client for full control
+func NewFeedParser(timeout time.Duration, cookie string) *RSSParser {
+	// store cookies in jar
+	jarOptions := &cookiejar.Options{PublicSuffixList: publicsuffix.List}
+	jar, _ := cookiejar.New(jarOptions)
+
+	customTransport := http.DefaultTransport.(*http.Transport).Clone()
+	customTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
+	httpClient := &http.Client{
+		Timeout:   time.Second * 60,
+		Transport: customTransport,
+		Jar:       jar,
+	}
+
+	c := &RSSParser{
+		parser: gofeed.NewParser(),
+		http:   httpClient,
+		cookie: cookie,
+	}
+
+	c.http.Timeout = timeout
+
+	return c
+}
+
+func (c *RSSParser) ParseURLWithContext(ctx context.Context, feedURL string) (feed *gofeed.Feed, err error) {
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, feedURL, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	req.Header.Set("User-Agent", "Gofeed/1.0")
+
+	if c.cookie != "" {
+		// set raw cookie as header
+		req.Header.Set("Cookie", c.cookie)
+	}
+
+	resp, err := c.http.Do(req)
+	if err != nil {
+		return nil, err
+	}
+
+	if resp != nil {
+		defer func() {
+			ce := resp.Body.Close()
+			if ce != nil {
+				err = ce
+			}
+		}()
+	}
+
+	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
+		return nil, gofeed.HTTPError{
+			StatusCode: resp.StatusCode,
+			Status:     resp.Status,
+		}
+	}
+
+	return c.parser.Parse(resp.Body)
+}
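Note (added for this writeup, not part of the commit): a minimal sketch of how the new RSSParser client is meant to be called from within the feed package; the feed URL, cookie value, and helper name below are made up for illustration.

```go
package feed

import (
	"context"
	"fmt"
	"time"
)

// exampleFetch is a hypothetical helper showing the intended call pattern:
// build the parser with a timeout and an optional raw cookie string, then
// fetch with a context so slow feeds get cancelled.
func exampleFetch() error {
	parser := NewFeedParser(30*time.Second, "uid=1; pass=abc") // cookie value is made up

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	feed, err := parser.ParseURLWithContext(ctx, "https://example.org/rss") // made-up URL
	if err != nil {
		return err
	}

	fmt.Printf("fetched %d items\n", len(feed.Items))
	return nil
}
```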
@@ -2,8 +2,10 @@ package feed
 import (
 	"context"
+	"encoding/xml"
 	"fmt"
 	"net/url"
-	"sort"
+	"regexp"
 	"time"
 
 	"github.com/autobrr/autobrr/internal/domain"
@@ -15,11 +17,13 @@ import (
 )
 
 type RSSJob struct {
+	Feed              *domain.Feed
 	Name              string
 	IndexerIdentifier string
 	Log               zerolog.Logger
 	URL               string
-	Repo              domain.FeedCacheRepo
+	Repo              domain.FeedRepo
+	CacheRepo         domain.FeedCacheRepo
 	ReleaseSvc        release.Service
 	Timeout           time.Duration
@@ -29,13 +33,15 @@ type RSSJob struct {
 	JobID int
 }
 
-func NewRSSJob(name string, indexerIdentifier string, log zerolog.Logger, url string, repo domain.FeedCacheRepo, releaseSvc release.Service, timeout time.Duration) *RSSJob {
+func NewRSSJob(feed *domain.Feed, name string, indexerIdentifier string, log zerolog.Logger, url string, repo domain.FeedRepo, cacheRepo domain.FeedCacheRepo, releaseSvc release.Service, timeout time.Duration) *RSSJob {
 	return &RSSJob{
+		Feed:              feed,
 		Name:              name,
 		IndexerIdentifier: indexerIdentifier,
 		Log:               log,
 		URL:               url,
 		Repo:              repo,
+		CacheRepo:         cacheRepo,
 		ReleaseSvc:        releaseSvc,
 		Timeout:           timeout,
 	}
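Note (illustration only, not from the diff): a hypothetical call site for the new NewRSSJob signature. The internal/release import path and the repository wiring are assumptions; in the commit itself the constructor is called from the feed service shown further down.

```go
package feed

import (
	"time"

	"github.com/autobrr/autobrr/internal/domain"
	"github.com/autobrr/autobrr/internal/release" // assumed import path for release.Service
	"github.com/rs/zerolog"
)

// exampleNewRSSJob is hypothetical; it just highlights the extra feed, repo
// and cacheRepo arguments the constructor now takes.
func exampleNewRSSJob(repo domain.FeedRepo, cacheRepo domain.FeedCacheRepo, releaseSvc release.Service) *RSSJob {
	feed := &domain.Feed{ID: 1, Name: "test feed", Indexer: "mock-feed", MaxAge: 3600}

	return NewRSSJob(feed, feed.Name, feed.Indexer, zerolog.Logger{}, "https://example.org/rss", repo, cacheRepo, releaseSvc, 60*time.Second)
}
```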
@@ -43,7 +49,7 @@ func NewRSSJob(name string, indexerIdentifier string, log zerolog.Logger, url st
 
 func (j *RSSJob) Run() {
 	if err := j.process(); err != nil {
-		j.Log.Err(err).Int("attempts", j.attempts).Msg("rss feed process error")
+		j.Log.Error().Err(err).Int("attempts", j.attempts).Msg("rss feed process error")
 
 		j.errors = append(j.errors, err)
 		return
@@ -71,9 +77,13 @@ func (j *RSSJob) process() error {
 	releases := make([]*domain.Release, 0)
 
 	for _, item := range items {
-		rls := j.processItem(item)
+		item := item
 		j.Log.Debug().Msgf("item: %v", item.Title)
 
-		releases = append(releases, rls)
+		rls := j.processItem(item)
+		if rls != nil {
+			releases = append(releases, rls)
+		}
 	}
 
 	// process all new releases
@@ -83,6 +93,16 @@ func (j *RSSJob) process() error {
 }
 
 func (j *RSSJob) processItem(item *gofeed.Item) *domain.Release {
+	now := time.Now()
+
+	if j.Feed.MaxAge > 0 {
+		if item.PublishedParsed != nil {
+			if !isNewerThanMaxAge(j.Feed.MaxAge, *item.PublishedParsed, now) {
+				return nil
+			}
+		}
+	}
+
 	rls := domain.NewRelease(j.IndexerIdentifier)
 	rls.Implementation = domain.ReleaseImplementationRSS
@@ -117,6 +137,8 @@ func (j *RSSJob) processItem(item *gofeed.Item) *domain.Release {
 	}
 
 	for _, v := range item.Categories {
+		rls.Categories = append(rls.Categories, item.Categories...)
+
 		if len(rls.Category) != 0 {
 			rls.Category += ", "
 		}
@@ -138,6 +160,38 @@ func (j *RSSJob) processItem(item *gofeed.Item) *domain.Release {
 			rls.ParseSizeBytesString(sz)
 		}
 	}
 
+	// additional size parsing
+	// some feeds have a fixed size for enclosure so lets check for custom elements
+	// and parse size from there if it differs
+	if customTorrent, ok := item.Custom["torrent"]; ok {
+		var element itemCustomElement
+		if err := xml.Unmarshal([]byte("<torrent>"+customTorrent+"</torrent>"), &element); err != nil {
+			j.Log.Error().Err(err).Msg("could not unmarshal item.Custom.Torrent")
+		}
+
+		if element.ContentLength > 0 {
+			if uint64(element.ContentLength) != rls.Size {
+				rls.Size = uint64(element.ContentLength)
+			}
+		}
+
+		if rls.TorrentHash == "" && element.InfoHash != "" {
+			rls.TorrentHash = element.InfoHash
+		}
+	}
+
+	// basic freeleech parsing
+	if isFreeleech([]string{item.Title, item.Description}) {
+		rls.Freeleech = true
+		rls.Bonus = []string{"Freeleech"}
+	}
+
+	// add cookie to release for download if needed
+	if j.Feed.Cookie != "" {
+		rls.RawCookie = j.Feed.Cookie
+	}
+
 	return rls
 }
@@ -145,51 +199,103 @@ func (j *RSSJob) getFeed() (items []*gofeed.Item, err error) {
 	ctx, cancel := context.WithTimeout(context.Background(), j.Timeout)
 	defer cancel()
 
-	feed, err := gofeed.NewParser().ParseURLWithContext(j.URL, ctx) // there's an RSS specific parser as well.
+	feed, err := NewFeedParser(j.Timeout, j.Feed.Cookie).ParseURLWithContext(ctx, j.URL)
 	if err != nil {
 		j.Log.Error().Err(err).Msgf("error fetching rss feed items")
 		return nil, errors.Wrap(err, "error fetching rss feed items")
 	}
 
+	// get feed as JSON string
+	feedData := feed.String()
+
+	if err := j.Repo.UpdateLastRunWithData(context.Background(), j.Feed.ID, feedData); err != nil {
+		j.Log.Error().Err(err).Msgf("error updating last run for feed id: %v", j.Feed.ID)
+	}
+
 	j.Log.Debug().Msgf("refreshing rss feed: %v, found (%d) items", j.Name, len(feed.Items))
 
 	if len(feed.Items) == 0 {
 		return
 	}
 
-	sort.Sort(feed)
+	bucketKey := fmt.Sprintf("%v+%v", j.IndexerIdentifier, j.Name)
+
+	//sort.Sort(feed)
+
+	bucketCount, err := j.CacheRepo.GetCountByBucket(ctx, bucketKey)
+	if err != nil {
+		j.Log.Error().Err(err).Msg("could not check if item exists")
+		return nil, err
+	}
+
+	// set ttl to 1 month
+	ttl := time.Now().AddDate(0, 1, 0)
 
 	for _, i := range feed.Items {
-		s := i.GUID
-		if len(s) == 0 {
-			s = i.Title
-			if len(s) == 0 {
+		item := i
+
+		key := item.GUID
+		if len(key) == 0 {
+			key = item.Title
+			if len(key) == 0 {
 				continue
 			}
 		}
 
-		exists, err := j.Repo.Exists(j.Name, s)
+		exists, err := j.CacheRepo.Exists(bucketKey, key)
 		if err != nil {
 			j.Log.Error().Err(err).Msg("could not check if item exists")
 			continue
 		}
 		if exists {
-			j.Log.Trace().Msgf("cache item exists, skipping release: %v", i.Title)
+			j.Log.Trace().Msgf("cache item exists, skipping release: %v", item.Title)
 			continue
 		}
 
-		// set ttl to 1 month
-		ttl := time.Now().AddDate(0, 1, 0)
-
-		if err := j.Repo.Put(j.Name, s, []byte(i.Title), ttl); err != nil {
-			j.Log.Error().Stack().Err(err).Str("entry", s).Msg("cache.Put: error storing item in cache")
+		if err := j.CacheRepo.Put(bucketKey, key, []byte(item.Title), ttl); err != nil {
+			j.Log.Error().Err(err).Str("entry", key).Msg("cache.Put: error storing item in cache")
 			continue
 		}
 
-		// only append if we successfully added to cache
-		items = append(items, i)
+		// first time we fetch the feed the cached bucket count will be 0
+		// only append to items if it's bigger than 0, so we get new items only
+		if bucketCount > 0 {
+			items = append(items, item)
+		}
 	}
 
 	// send to filters
 	return
 }
 
+func isNewerThanMaxAge(maxAge int, item, now time.Time) bool {
+	// now minus max age
+	nowMaxAge := now.Add(time.Duration(-maxAge) * time.Second)
+
+	if item.After(nowMaxAge) {
+		return true
+	}
+
+	return false
+}
+
+// isFreeleech basic freeleech parsing
+func isFreeleech(str []string) bool {
+	for _, s := range str {
+		var re = regexp.MustCompile(`(?mi)(\bfreeleech\b)`)
+
+		match := re.FindAllString(s, -1)
+
+		if len(match) > 0 {
+			return true
+		}
+	}
+
+	return false
+}
+
+// itemCustomElement
+// used for some feeds like Aviztas network
+type itemCustomElement struct {
+	ContentLength int64  `xml:"contentLength"`
+	InfoHash      string `xml:"infoHash"`
+}
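Note (illustration only, not from the diff): a small walkthrough of the new helpers and the cache bucket key format, using made-up values.

```go
package feed

import (
	"fmt"
	"time"
)

// exampleHelpers is a hypothetical demo of the behavior added above.
func exampleHelpers() {
	now := time.Now()

	// an item published 30 minutes ago passes a 3600s (1 hour) max_age check
	published := now.Add(-30 * time.Minute)
	fmt.Println(isNewerThanMaxAge(3600, published, now)) // true

	// freeleech is detected with a case-insensitive whole-word match
	fmt.Println(isFreeleech([]string{"Some.Release [FreeLeech!]"})) // true

	// cache bucket keys are built as "<indexer>+<feed name>", per getFeed
	bucketKey := fmt.Sprintf("%v+%v", "mock-feed", "test feed")
	fmt.Println(bucketKey) // mock-feed+test feed
}
```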
@@ -14,8 +14,10 @@ import (
 
 func TestRSSJob_processItem(t *testing.T) {
 	now := time.Now()
+	nowMinusTime := time.Now().Add(time.Duration(-3000) * time.Second)
 
 	type fields struct {
+		Feed              *domain.Feed
 		Name              string
 		IndexerIdentifier string
 		Log               zerolog.Logger
@@ -38,6 +40,9 @@ func TestRSSJob_processItem(t *testing.T) {
 		{
 			name: "no_baseurl",
 			fields: fields{
+				Feed: &domain.Feed{
+					MaxAge: 3600,
+				},
 				Name:              "test feed",
 				IndexerIdentifier: "mock-feed",
 				Log:               zerolog.Logger{},
@@ -64,6 +69,9 @@ func TestRSSJob_processItem(t *testing.T) {
 		{
 			name: "with_baseurl",
 			fields: fields{
+				Feed: &domain.Feed{
+					MaxAge: 3600,
+				},
 				Name:              "test feed",
 				IndexerIdentifier: "mock-feed",
 				Log:               zerolog.Logger{},
@@ -87,24 +95,124 @@ func TestRSSJob_processItem(t *testing.T) {
 			}},
 			want: &domain.Release{ID: 0, FilterStatus: "PENDING", Rejections: []string{}, Indexer: "mock-feed", FilterName: "", Protocol: "torrent", Implementation: "RSS", Timestamp: now, GroupID: "", TorrentID: "", TorrentURL: "https://fake-feed.com/details.php?id=00000&hit=1", TorrentTmpFile: "", TorrentDataRawBytes: []uint8(nil), TorrentHash: "", TorrentName: "Some.Release.Title.2022.09.22.720p.WEB.h264-GROUP", Size: 0x0, Title: "Some Release Title", Category: "", Season: 0, Episode: 0, Year: 2022, Resolution: "720p", Source: "WEB", Codec: []string{"H.264"}, Container: "", HDR: []string(nil), Audio: []string(nil), AudioChannels: "", Group: "GROUP", Region: "", Language: "", Proper: false, Repack: false, Website: "", Artists: "", Type: "", LogScore: 0, IsScene: false, Origin: "", Tags: []string{}, ReleaseTags: "", Freeleech: false, FreeleechPercent: 0, Bonus: []string(nil), Uploader: "", PreTime: "", Other: []string(nil), RawCookie: "", AdditionalSizeCheckRequired: false, FilterID: 0, Filter: (*domain.Filter)(nil), ActionStatus: []domain.ReleaseActionStatus(nil)},
 		},
+		{
+			name: "time_parse",
+			fields: fields{
+				Feed: &domain.Feed{
+					MaxAge: 360,
+				},
+				Name:              "test feed",
+				IndexerIdentifier: "mock-feed",
+				Log:               zerolog.Logger{},
+				URL:               "https://fake-feed.com/rss",
+				Repo:              nil,
+				ReleaseSvc:        nil,
+				attempts:          0,
+				errors:            nil,
+				JobID:             0,
+			},
+			args: args{item: &gofeed.Item{
+				Title: "Some.Release.Title.2022.09.22.720p.WEB.h264-GROUP",
+				Description: `Category: Example
+Size: 1.49 GB
+Status: 27 seeders and 1 leechers
+Speed: 772.16 kB/s
+Added: 2022-09-29 16:06:08
+`,
+				Link: "https://fake-feed.com/details.php?id=00000&hit=1",
+				GUID: "Some.Release.Title.2022.09.22.720p.WEB.h264-GROUP",
+				//PublishedParsed: &nowMinusTime,
+			}},
+			want: &domain.Release{ID: 0, FilterStatus: "PENDING", Rejections: []string{}, Indexer: "mock-feed", FilterName: "", Protocol: "torrent", Implementation: "RSS", Timestamp: now, GroupID: "", TorrentID: "", TorrentURL: "https://fake-feed.com/details.php?id=00000&hit=1", TorrentTmpFile: "", TorrentDataRawBytes: []uint8(nil), TorrentHash: "", TorrentName: "Some.Release.Title.2022.09.22.720p.WEB.h264-GROUP", Size: 0x0, Title: "Some Release Title", Category: "", Season: 0, Episode: 0, Year: 2022, Resolution: "720p", Source: "WEB", Codec: []string{"H.264"}, Container: "", HDR: []string(nil), Audio: []string(nil), AudioChannels: "", Group: "GROUP", Region: "", Language: "", Proper: false, Repack: false, Website: "", Artists: "", Type: "", LogScore: 0, IsScene: false, Origin: "", Tags: []string{}, ReleaseTags: "", Freeleech: false, FreeleechPercent: 0, Bonus: []string(nil), Uploader: "", PreTime: "", Other: []string(nil), RawCookie: "", AdditionalSizeCheckRequired: false, FilterID: 0, Filter: (*domain.Filter)(nil), ActionStatus: []domain.ReleaseActionStatus(nil)},
+		},
+		{
+			name: "time_parse",
+			fields: fields{
+				Feed: &domain.Feed{
+					MaxAge: 360,
+				},
+				Name:              "test feed",
+				IndexerIdentifier: "mock-feed",
+				Log:               zerolog.Logger{},
+				URL:               "https://fake-feed.com/rss",
+				Repo:              nil,
+				ReleaseSvc:        nil,
+				attempts:          0,
+				errors:            nil,
+				JobID:             0,
+			},
+			args: args{item: &gofeed.Item{
+				Title: "Some.Release.Title.2022.09.22.720p.WEB.h264-GROUP",
+				Description: `Category: Example
+Size: 1.49 GB
+Status: 27 seeders and 1 leechers
+Speed: 772.16 kB/s
+Added: 2022-09-29 16:06:08
+`,
+				Link:            "https://fake-feed.com/details.php?id=00000&hit=1",
+				GUID:            "Some.Release.Title.2022.09.22.720p.WEB.h264-GROUP",
+				PublishedParsed: &nowMinusTime,
+			}},
+			want: nil,
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			j := &RSSJob{
+				Feed:              tt.fields.Feed,
 				Name:              tt.fields.Name,
 				IndexerIdentifier: tt.fields.IndexerIdentifier,
 				Log:               tt.fields.Log,
 				URL:               tt.fields.URL,
 				Repo:              tt.fields.Repo,
+				CacheRepo:         tt.fields.Repo,
 				ReleaseSvc:        tt.fields.ReleaseSvc,
 				attempts:          tt.fields.attempts,
 				errors:            tt.fields.errors,
 				JobID:             tt.fields.JobID,
 			}
 			got := j.processItem(tt.args.item)
-			got.Timestamp = now // override to match
+			if got != nil {
+				got.Timestamp = now // override to match
+			}
 
 			assert.Equal(t, tt.want, got)
 		})
 	}
 }
 
+func Test_isMaxAge(t *testing.T) {
+	type args struct {
+		maxAge int
+		item   time.Time
+		now    time.Time
+	}
+	tests := []struct {
+		name string
+		args args
+		want bool
+	}{
+		{
+			name: "01",
+			args: args{
+				maxAge: 3600,
+				item:   time.Now().Add(time.Duration(-500) * time.Second),
+				now:    time.Now(),
+			},
+			want: true,
+		},
+		{
+			name: "02",
+			args: args{
+				maxAge: 3600,
+				item:   time.Now().Add(time.Duration(-5000) * time.Second),
+				now:    time.Now(),
+			},
+			want: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			assert.Equalf(t, tt.want, isNewerThanMaxAge(tt.args.maxAge, tt.args.item, tt.args.now), "isNewerThanMaxAge(%v, %v, %v)", tt.args.maxAge, tt.args.item, tt.args.now)
+		})
+	}
+}
@@ -2,6 +2,9 @@ package feed
 
 import (
 	"context"
+	"fmt"
+	"log"
+	"strconv"
 	"time"
 
 	"github.com/autobrr/autobrr/internal/domain"
@@ -12,6 +15,7 @@ import (
 	"github.com/autobrr/autobrr/pkg/torznab"
 
+	"github.com/dcarbone/zadapters/zstdlog"
 	"github.com/mmcdole/gofeed"
 	"github.com/rs/zerolog"
 )
@@ -19,6 +23,7 @@ type Service interface {
 	FindByID(ctx context.Context, id int) (*domain.Feed, error)
 	FindByIndexerIdentifier(ctx context.Context, indexer string) (*domain.Feed, error)
 	Find(ctx context.Context) ([]domain.Feed, error)
+	GetCacheByID(ctx context.Context, bucket string) ([]domain.FeedCacheItem, error)
 	Store(ctx context.Context, feed *domain.Feed) error
 	Update(ctx context.Context, feed *domain.Feed) error
 	Test(ctx context.Context, feed *domain.Feed) error
@@ -29,6 +34,7 @@ type Service interface {
 }
 
 type feedInstance struct {
+	Feed              *domain.Feed
 	Name              string
 	IndexerIdentifier string
 	URL               string
@@ -38,6 +44,16 @@ type feedInstance struct {
 	Timeout time.Duration
 }
 
+type feedKey struct {
+	id      int
+	indexer string
+	name    string
+}
+
+func (k feedKey) ToString() string {
+	return fmt.Sprintf("%v+%v+%v", k.id, k.indexer, k.name)
+}
+
 type service struct {
 	log  zerolog.Logger
 	jobs map[string]int
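Note (illustration only): the "refactor feed job keys" item in the commit message refers to this feedKey type. Scheduler jobs are now keyed by id+indexer+name instead of the indexer identifier alone, so multiple feeds on the same indexer no longer collide. A quick demo with made-up values:

```go
package feed

import "fmt"

// exampleFeedKey is hypothetical; it shows the key format produced by ToString.
func exampleFeedKey() {
	key := feedKey{id: 1, indexer: "mock-feed", name: "test feed"}.ToString()
	fmt.Println(key) // 1+mock-feed+test feed
}
```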
@@ -60,82 +76,67 @@ func NewService(log logger.Logger, repo domain.FeedRepo, cacheRepo domain.FeedCa
 }
 
 func (s *service) FindByID(ctx context.Context, id int) (*domain.Feed, error) {
 	return s.repo.FindByID(ctx, id)
 }
 
 func (s *service) FindByIndexerIdentifier(ctx context.Context, indexer string) (*domain.Feed, error) {
-	return s.repo.FindByIndexerIdentifier(ctx, indexer)
+	feed, err := s.repo.FindByIndexerIdentifier(ctx, indexer)
+	if err != nil {
+		s.log.Error().Err(err).Msgf("could not find feed by indexer: %v", indexer)
+		return nil, err
+	}
+
+	return feed, nil
 }
 
 func (s *service) Find(ctx context.Context) ([]domain.Feed, error) {
-	return s.repo.Find(ctx)
+	feeds, err := s.repo.Find(ctx)
+	if err != nil {
+		s.log.Error().Err(err).Msg("could not find feeds")
+		return nil, err
+	}
+
+	return feeds, err
+}
+
+func (s *service) GetCacheByID(ctx context.Context, bucket string) ([]domain.FeedCacheItem, error) {
+	id, _ := strconv.Atoi(bucket)
+
+	feed, err := s.repo.FindByID(ctx, id)
+	if err != nil {
+		s.log.Error().Err(err).Msgf("could not find feed by id: %v", id)
+		return nil, err
+	}
+
+	data, err := s.cacheRepo.GetByBucket(ctx, feed.Name)
+	if err != nil {
+		s.log.Error().Err(err).Msg("could not get feed cache")
+		return nil, err
+	}
+
+	return data, err
 }
 
 func (s *service) Store(ctx context.Context, feed *domain.Feed) error {
-	if err := s.repo.Store(ctx, feed); err != nil {
-		s.log.Error().Err(err).Msgf("could not store feed: %+v", feed)
-		return err
-	}
-
-	s.log.Debug().Msgf("successfully added feed: %+v", feed)
-
-	return nil
+	return s.repo.Store(ctx, feed)
 }
 
 func (s *service) Update(ctx context.Context, feed *domain.Feed) error {
-	if err := s.update(ctx, feed); err != nil {
-		s.log.Error().Err(err).Msgf("could not update feed: %+v", feed)
-		return err
-	}
-
-	s.log.Debug().Msgf("successfully updated feed: %+v", feed)
-
-	return nil
+	return s.update(ctx, feed)
 }
 
 func (s *service) Delete(ctx context.Context, id int) error {
-	if err := s.delete(ctx, id); err != nil {
-		s.log.Error().Err(err).Msgf("could not delete feed by id: %v", id)
-		return err
-	}
-
-	return nil
+	return s.delete(ctx, id)
 }
 
 func (s *service) ToggleEnabled(ctx context.Context, id int, enabled bool) error {
-	if err := s.toggleEnabled(ctx, id, enabled); err != nil {
-		s.log.Error().Err(err).Msgf("could not toggle feed by id: %v", id)
-		return err
-	}
-	return nil
+	return s.toggleEnabled(ctx, id, enabled)
 }
 
+func (s *service) Test(ctx context.Context, feed *domain.Feed) error {
+	return s.test(ctx, feed)
+}
+
+func (s *service) Start() error {
+	return s.start()
+}
+
 func (s *service) update(ctx context.Context, feed *domain.Feed) error {
 	if err := s.repo.Update(ctx, feed); err != nil {
-		s.log.Error().Err(err).Msg("feed.Update: error updating feed")
+		s.log.Error().Err(err).Msg("error updating feed")
 		return err
 	}
 
 	if err := s.restartJob(feed); err != nil {
-		s.log.Error().Err(err).Msg("feed.Update: error restarting feed")
+		s.log.Error().Err(err).Msg("error restarting feed")
 		return err
 	}
@@ -149,17 +150,13 @@ func (s *service) delete(ctx context.Context, id int) error {
 		return err
 	}
 
-	switch f.Type {
-	case string(domain.FeedTypeTorznab):
-		if err := s.stopTorznabJob(f.Indexer); err != nil {
-			s.log.Error().Err(err).Msg("error stopping torznab job")
-			return err
-		}
-	case string(domain.FeedTypeRSS):
-		if err := s.stopRSSJob(f.Indexer); err != nil {
-			s.log.Error().Err(err).Msg("error stopping rss job")
-			return err
-		}
-	}
+	s.log.Debug().Msgf("stopping and removing feed: %v", f.Name)
+
+	identifierKey := feedKey{f.ID, f.Indexer, f.Name}.ToString()
+
+	if err := s.stopFeedJob(identifierKey); err != nil {
+		s.log.Error().Err(err).Msg("error stopping rss job")
+		return err
+	}
 
 	if err := s.repo.Delete(ctx, id); err != nil {
@@ -172,83 +169,112 @@ func (s *service) delete(ctx context.Context, id int) error {
 		return err
 	}
 
-	s.log.Debug().Msgf("feed.Delete: stopping and removing feed: %v", f.Name)
-
 	return nil
 }
 
 func (s *service) toggleEnabled(ctx context.Context, id int, enabled bool) error {
 	f, err := s.repo.FindByID(ctx, id)
 	if err != nil {
-		s.log.Error().Err(err).Msg("feed.ToggleEnabled: error finding feed")
+		s.log.Error().Err(err).Msg("error finding feed")
 		return err
 	}
 
 	if err := s.repo.ToggleEnabled(ctx, id, enabled); err != nil {
-		s.log.Error().Err(err).Msg("feed.ToggleEnabled: error toggle enabled")
+		s.log.Error().Err(err).Msg("error feed toggle enabled")
 		return err
 	}
 
-	if f.Enabled && !enabled {
-		switch f.Type {
-		case string(domain.FeedTypeTorznab):
-			if err := s.stopTorznabJob(f.Indexer); err != nil {
-				s.log.Error().Err(err).Msg("feed.ToggleEnabled: error stopping torznab job")
-				return err
-			}
-		case string(domain.FeedTypeRSS):
-			if err := s.stopRSSJob(f.Indexer); err != nil {
-				s.log.Error().Err(err).Msg("feed.ToggleEnabled: error stopping rss job")
-				return err
-			}
-		}
-
-		s.log.Debug().Msgf("feed.ToggleEnabled: stopping feed: %v", f.Name)
-
-		return nil
-	}
-
-	if err := s.startJob(*f); err != nil {
-		s.log.Error().Err(err).Msg("feed.ToggleEnabled: error starting torznab job")
-		return err
-	}
-
-	s.log.Debug().Msgf("feed.ToggleEnabled: started feed: %v", f.Name)
+	if f.Enabled != enabled {
+		if enabled {
+			// override enabled
+			f.Enabled = true
+
+			if err := s.startJob(f); err != nil {
+				s.log.Error().Err(err).Msg("error starting feed job")
+				return err
+			}
+
+			s.log.Debug().Msgf("feed started: %v", f.Name)
+
+			return nil
+		} else {
+			s.log.Debug().Msgf("stopping feed: %v", f.Name)
+
+			identifierKey := feedKey{f.ID, f.Indexer, f.Name}.ToString()
+
+			if err := s.stopFeedJob(identifierKey); err != nil {
+				s.log.Error().Err(err).Msg("error stopping feed job")
+				return err
+			}
+
+			s.log.Debug().Msgf("feed stopped: %v", f.Name)
+
+			return nil
+		}
+	}
 
 	return nil
 }
 
-func (s *service) Test(ctx context.Context, feed *domain.Feed) error {
+func (s *service) test(ctx context.Context, feed *domain.Feed) error {
 	// create sub logger
 	subLogger := zstdlog.NewStdLoggerWithLevel(s.log.With().Logger(), zerolog.DebugLevel)
 
-	// implementation == TORZNAB
+	// test feeds
 	if feed.Type == string(domain.FeedTypeTorznab) {
-		// setup torznab Client
-		c := torznab.NewClient(torznab.Config{Host: feed.URL, ApiKey: feed.ApiKey, Log: subLogger})
-
-		if _, err := c.FetchFeed(); err != nil {
-			s.log.Error().Err(err).Msg("error getting torznab feed")
+		if err := s.testTorznab(feed, subLogger); err != nil {
 			return err
 		}
+	} else if feed.Type == string(domain.FeedTypeRSS) {
+		if err := s.testRSS(ctx, feed); err != nil {
+			return err
+		}
 	}
 
-	s.log.Debug().Msgf("test successful - connected to feed: %+v", feed.URL)
+	s.log.Info().Msgf("feed test successful - connected to feed: %v", feed.URL)
 
 	return nil
 }
 
-func (s *service) Start() error {
-	// get all torznab indexer definitions
-	feeds, err := s.repo.Find(context.TODO())
+func (s *service) testRSS(ctx context.Context, feed *domain.Feed) error {
+	f, err := gofeed.NewParser().ParseURLWithContext(feed.URL, ctx)
 	if err != nil {
-		s.log.Error().Err(err).Msg("feed.Start: error finding feeds")
-		return err
+		s.log.Error().Err(err).Msgf("error fetching rss feed items")
+		return errors.Wrap(err, "error fetching rss feed items")
 	}
 
+	s.log.Info().Msgf("refreshing rss feed: %v, found (%d) items", feed.Name, len(f.Items))
+
+	return nil
+}
+
+func (s *service) testTorznab(feed *domain.Feed, subLogger *log.Logger) error {
+	// setup torznab Client
+	c := torznab.NewClient(torznab.Config{Host: feed.URL, ApiKey: feed.ApiKey, Log: subLogger})
+
+	items, err := c.FetchFeed()
+	if err != nil {
+		s.log.Error().Err(err).Msg("error getting torznab feed")
+		return err
+	}
+
+	s.log.Info().Msgf("refreshing torznab feed: %v, found (%d) items", feed.Name, len(items))
+
+	return nil
+}
+
+func (s *service) start() error {
+	// get all torznab indexer definitions
+	feeds, err := s.repo.Find(context.TODO())
+	if err != nil {
+		s.log.Error().Err(err).Msg("error finding feeds")
+		return err
+	}
+
-	for _, i := range feeds {
-		if err := s.startJob(i); err != nil {
-			s.log.Error().Err(err).Msg("feed.Start: failed to initialize torznab job")
+	for _, feed := range feeds {
+		feed := feed
+		if err := s.startJob(&feed); err != nil {
+			s.log.Error().Err(err).Msg("failed to initialize torznab job")
 			continue
 		}
 	}
@@ -257,27 +283,29 @@ func (s *service) Start() error {
 }
 
 func (s *service) restartJob(f *domain.Feed) error {
-	// stop feed
-	if err := s.stopTorznabJob(f.Indexer); err != nil {
-		s.log.Error().Err(err).Msg("feed.restartJob: error stopping torznab job")
+	s.log.Debug().Msgf("stopping feed: %v", f.Name)
+
+	identifierKey := feedKey{f.ID, f.Indexer, f.Name}.ToString()
+
+	// stop feed job
+	if err := s.stopFeedJob(identifierKey); err != nil {
+		s.log.Error().Err(err).Msg("error stopping feed job")
 		return err
 	}
 
-	s.log.Debug().Msgf("feed.restartJob: stopping feed: %v", f.Name)
-
 	if f.Enabled {
-		if err := s.startJob(*f); err != nil {
-			s.log.Error().Err(err).Msg("feed.restartJob: error starting torznab job")
+		if err := s.startJob(f); err != nil {
+			s.log.Error().Err(err).Msg("error starting feed job")
 			return err
 		}
 
-		s.log.Debug().Msgf("feed.restartJob: restarted feed: %v", f.Name)
+		s.log.Debug().Msgf("restarted feed: %v", f.Name)
 	}
 
 	return nil
 }
 
-func (s *service) startJob(f domain.Feed) error {
+func (s *service) startJob(f *domain.Feed) error {
 	// get all torznab indexer definitions
 	if !f.Enabled {
 		return nil
@@ -285,11 +313,12 @@ func (s *service) startJob(f domain.Feed) error {
 
 	// get torznab_url from settings
 	if f.URL == "" {
-		return nil
+		return errors.New("no URL provided for feed: %v", f.Name)
 	}
 
 	// cron schedule to run every X minutes
 	fi := feedInstance{
+		Feed:              f,
 		Name:              f.Name,
 		IndexerIdentifier: f.Indexer,
 		Implementation:    f.Type,
@@ -302,12 +331,12 @@ func (s *service) startJob(f domain.Feed) error {
 	switch fi.Implementation {
 	case string(domain.FeedTypeTorznab):
 		if err := s.addTorznabJob(fi); err != nil {
-			s.log.Error().Err(err).Msg("feed.startJob: failed to initialize torznab feed")
+			s.log.Error().Err(err).Msg("failed to initialize torznab feed")
 			return err
 		}
 	case string(domain.FeedTypeRSS):
 		if err := s.addRSSJob(fi); err != nil {
-			s.log.Error().Err(err).Msg("feed.startJob: failed to initialize rss feed")
+			s.log.Error().Err(err).Msg("failed to initialize rss feed")
 			return err
 		}
 	}
@@ -319,9 +348,10 @@ func (s *service) addTorznabJob(f feedInstance) error {
 	if f.URL == "" {
 		return errors.New("torznab feed requires URL")
 	}
-	if f.CronSchedule < time.Duration(5*time.Minute) {
-		f.CronSchedule = time.Duration(15 * time.Minute)
-	}
+
+	//if f.CronSchedule < 5*time.Minute {
+	//	f.CronSchedule = 15 * time.Minute
+	//}
 
 	// setup logger
 	l := s.log.With().Str("feed", f.Name).Logger()
@@ -332,28 +362,19 @@ func (s *service) addTorznabJob(f feedInstance) error {
 	// create job
 	job := NewTorznabJob(f.Name, f.IndexerIdentifier, l, f.URL, c, s.cacheRepo, s.releaseSvc)
 
+	identifierKey := feedKey{f.Feed.ID, f.Feed.Indexer, f.Feed.Name}.ToString()
+
 	// schedule job
-	id, err := s.scheduler.AddJob(job, f.CronSchedule, f.IndexerIdentifier)
+	id, err := s.scheduler.AddJob(job, f.CronSchedule, identifierKey)
 	if err != nil {
 		return errors.Wrap(err, "feed.AddTorznabJob: add job failed")
 	}
 	job.JobID = id
 
 	// add to job map
-	s.jobs[f.IndexerIdentifier] = id
+	s.jobs[identifierKey] = id
 
-	s.log.Debug().Msgf("feed.AddTorznabJob: %v", f.Name)
-
-	return nil
-}
-
-func (s *service) stopTorznabJob(indexer string) error {
-	// remove job from scheduler
-	if err := s.scheduler.RemoveJobByIdentifier(indexer); err != nil {
-		return errors.Wrap(err, "feed.stopTorznabJob: stop job failed")
-	}
-
-	s.log.Debug().Msgf("feed.stopTorznabJob: %v", indexer)
+	s.log.Debug().Msgf("add torznab job: %v", f.Name)
 
 	return nil
 }
@@ -362,38 +383,41 @@ func (s *service) addRSSJob(f feedInstance) error {
 	if f.URL == "" {
 		return errors.New("rss feed requires URL")
 	}
-	if f.CronSchedule < time.Duration(5*time.Minute) {
-		f.CronSchedule = time.Duration(15 * time.Minute)
-	}
+
+	//if f.CronSchedule < time.Duration(5*time.Minute) {
+	//	f.CronSchedule = time.Duration(15 * time.Minute)
+	//}
 
 	// setup logger
 	l := s.log.With().Str("feed", f.Name).Logger()
 
 	// create job
-	job := NewRSSJob(f.Name, f.IndexerIdentifier, l, f.URL, s.cacheRepo, s.releaseSvc, f.Timeout)
+	job := NewRSSJob(f.Feed, f.Name, f.IndexerIdentifier, l, f.URL, s.repo, s.cacheRepo, s.releaseSvc, f.Timeout)
+
+	identifierKey := feedKey{f.Feed.ID, f.Feed.Indexer, f.Feed.Name}.ToString()
 
 	// schedule job
-	id, err := s.scheduler.AddJob(job, f.CronSchedule, f.IndexerIdentifier)
+	id, err := s.scheduler.AddJob(job, f.CronSchedule, identifierKey)
 	if err != nil {
 		return errors.Wrap(err, "feed.AddRSSJob: add job failed")
 	}
 	job.JobID = id
 
 	// add to job map
-	s.jobs[f.IndexerIdentifier] = id
+	s.jobs[identifierKey] = id
 
-	s.log.Debug().Msgf("feed.AddRSSJob: %v", f.Name)
+	s.log.Debug().Msgf("add rss job: %v", f.Name)
 
 	return nil
 }
 
-func (s *service) stopRSSJob(indexer string) error {
+func (s *service) stopFeedJob(indexer string) error {
 	// remove job from scheduler
 	if err := s.scheduler.RemoveJobByIdentifier(indexer); err != nil {
-		return errors.Wrap(err, "feed.stopRSSJob: stop job failed")
+		return errors.Wrap(err, "stop job failed")
 	}
 
-	s.log.Debug().Msgf("feed.stopRSSJob: %v", indexer)
+	s.log.Debug().Msgf("stop feed job: %v", indexer)
 
 	return nil
 }
@@ -80,7 +80,7 @@ func (j *TorznabJob) process() error {
 
 		rls.ParseString(item.Title)
 
-		if parseFreeleech(item) {
+		if parseFreeleechTorznab(item) {
 			rls.Freeleech = true
 			rls.Bonus = []string{"Freeleech"}
 		}
@@ -100,7 +100,7 @@ func (j *TorznabJob) process() error {
 	return nil
 }
 
-func parseFreeleech(item torznab.FeedItem) bool {
+func parseFreeleechTorznab(item torznab.FeedItem) bool {
 	for _, attr := range item.Attributes {
 		if attr.Name == "downloadvolumefactor" {
 			if attr.Value == "0" {
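Note (illustration only): a sketch of the freeleech rule the renamed helper implements. The real parseFreeleechTorznab walks item.Attributes from the torznab package; since the attribute element's type name is not shown in this diff, this standalone version mirrors the same logic on a local name/value struct so it can run on its own.

```go
package feed

import "fmt"

// attr is a local stand-in for the torznab attribute element.
type attr struct{ Name, Value string }

// isFreeleechAttr mirrors parseFreeleechTorznab's check: torznab marks
// freeleech with a download volume factor of 0.
func isFreeleechAttr(attrs []attr) bool {
	for _, a := range attrs {
		if a.Name == "downloadvolumefactor" && a.Value == "0" {
			return true
		}
	}
	return false
}

func exampleFreeleech() {
	fmt.Println(isFreeleechAttr([]attr{{Name: "downloadvolumefactor", Value: "0"}})) // true
}
```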