feat(feeds): add generic RSS support (#410)

* feat(feeds): add generic rss support

* feat(feeds/web): add generic rss support

* implement rss downloading

* gosum + mod

* re-add size from Custom field.

* implement uploader + category

* sync

* remove double assignment (+torznab)

* didn't save the rss file >.>

* cleanup

* fixfeeds): create rss indexer

* fix(feeds): stop feed

* feat(feeds): support nexusphp rss enclosure link

* feat(feeds): check size for custom size

* fix(feeds): race condition and only stop enabled feeds

* fix(feeds): unify indexer implementation badge

Co-authored-by: Kyle Sanderson <kyle.leet@gmail.com>
This commit is contained in:
ze0s 2022-08-20 00:34:46 +02:00 committed by GitHub
parent b607aef63e
commit b50688159e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 498 additions and 89 deletions

173
internal/feed/rss.go Normal file
View file

@ -0,0 +1,173 @@
package feed
import (
"context"
"sort"
"time"
"github.com/autobrr/autobrr/internal/domain"
"github.com/autobrr/autobrr/internal/release"
"github.com/autobrr/autobrr/pkg/errors"
"github.com/mmcdole/gofeed"
"github.com/rs/zerolog"
)
type RSSJob struct {
Name string
IndexerIdentifier string
Log zerolog.Logger
URL string
Repo domain.FeedCacheRepo
ReleaseSvc release.Service
attempts int
errors []error
JobID int
}
func NewRSSJob(name string, indexerIdentifier string, log zerolog.Logger, url string, repo domain.FeedCacheRepo, releaseSvc release.Service) *RSSJob {
return &RSSJob{
Name: name,
IndexerIdentifier: indexerIdentifier,
Log: log,
URL: url,
Repo: repo,
ReleaseSvc: releaseSvc,
}
}
func (j *RSSJob) Run() {
if err := j.process(); err != nil {
j.Log.Err(err).Int("attempts", j.attempts).Msg("rss feed process error")
j.errors = append(j.errors, err)
return
}
j.attempts = 0
j.errors = []error{}
return
}
func (j *RSSJob) process() error {
items, err := j.getFeed()
if err != nil {
j.Log.Error().Err(err).Msgf("error fetching rss feed items")
return errors.Wrap(err, "error getting rss feed items")
}
j.Log.Debug().Msgf("found (%d) new items to process", len(items))
if len(items) == 0 {
return nil
}
releases := make([]*domain.Release, 0)
for _, item := range items {
rls := domain.NewRelease(j.IndexerIdentifier)
rls.Implementation = domain.ReleaseImplementationRSS
rls.ParseString(item.Title)
if len(item.Enclosures) > 0 {
e := item.Enclosures[0]
if e.Type == "application/x-bittorrent" && e.URL != "" {
rls.TorrentURL = e.URL
}
if e.Length != "" {
rls.ParseSizeBytesString(e.Length)
}
}
if rls.TorrentURL == "" && item.Link != "" {
rls.TorrentURL = item.Link
}
for _, v := range item.Categories {
if len(rls.Category) != 0 {
rls.Category += ", "
}
rls.Category += v
}
for _, v := range item.Authors {
if len(rls.Uploader) != 0 {
rls.Uploader += ", "
}
rls.Uploader += v.Name
}
if rls.Size == 0 {
// parse size bytes string
if sz, ok := item.Custom["size"]; ok {
rls.ParseSizeBytesString(sz)
}
}
releases = append(releases, rls)
}
// process all new releases
go j.ReleaseSvc.ProcessMultiple(releases)
return nil
}
func (j *RSSJob) getFeed() (items []*gofeed.Item, err error) {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
feed, err := gofeed.NewParser().ParseURLWithContext(j.URL, ctx) // there's an RSS specific parser as well.
if err != nil {
j.Log.Error().Err(err).Msgf("error fetching rss feed items")
return nil, errors.Wrap(err, "error fetching rss feed items")
}
j.Log.Debug().Msgf("refreshing rss feed: %v, found (%d) items", j.Name, len(feed.Items))
if len(feed.Items) == 0 {
return
}
sort.Sort(feed)
for _, i := range feed.Items {
s := i.GUID
if len(s) == 0 {
s = i.Title
if len(s) == 0 {
continue
}
}
exists, err := j.Repo.Exists(j.Name, s)
if err != nil {
j.Log.Error().Err(err).Msg("could not check if item exists")
continue
}
if exists {
j.Log.Trace().Msgf("cache item exists, skipping release: %v", i.Title)
continue
}
// set ttl to 1 month
ttl := time.Now().AddDate(0, 1, 0)
if err := j.Repo.Put(j.Name, s, []byte(i.Title), ttl); err != nil {
j.Log.Error().Stack().Err(err).Str("entry", s).Msg("cache.Put: error storing item in cache")
continue
}
// only append if we successfully added to cache
items = append(items, i)
}
// send to filters
return
}

View file

@ -2,6 +2,7 @@ package feed
import (
"context"
"time"
"github.com/autobrr/autobrr/internal/domain"
"github.com/autobrr/autobrr/internal/logger"
@ -12,7 +13,6 @@ import (
"github.com/dcarbone/zadapters/zstdlog"
"github.com/rs/zerolog"
"time"
)
type Service interface {
@ -148,9 +148,17 @@ func (s *service) delete(ctx context.Context, id int) error {
return err
}
if err := s.stopTorznabJob(f.Indexer); err != nil {
s.log.Error().Err(err).Msg("error stopping torznab job")
return err
switch f.Type {
case string(domain.FeedTypeTorznab):
if err := s.stopTorznabJob(f.Indexer); err != nil {
s.log.Error().Err(err).Msg("error stopping torznab job")
return err
}
case string(domain.FeedTypeRSS):
if err := s.stopRSSJob(f.Indexer); err != nil {
s.log.Error().Err(err).Msg("error stopping rss job")
return err
}
}
if err := s.repo.Delete(ctx, id); err != nil {
@ -169,21 +177,29 @@ func (s *service) delete(ctx context.Context, id int) error {
}
func (s *service) toggleEnabled(ctx context.Context, id int, enabled bool) error {
if err := s.repo.ToggleEnabled(ctx, id, enabled); err != nil {
s.log.Error().Err(err).Msg("feed.ToggleEnabled: error toggle enabled")
return err
}
f, err := s.repo.FindByID(ctx, id)
if err != nil {
s.log.Error().Err(err).Msg("feed.ToggleEnabled: error finding feed")
return err
}
if !enabled {
if err := s.stopTorznabJob(f.Indexer); err != nil {
s.log.Error().Err(err).Msg("feed.ToggleEnabled: error stopping torznab job")
return err
if err := s.repo.ToggleEnabled(ctx, id, enabled); err != nil {
s.log.Error().Err(err).Msg("feed.ToggleEnabled: error toggle enabled")
return err
}
if f.Enabled && !enabled {
switch f.Type {
case string(domain.FeedTypeTorznab):
if err := s.stopTorznabJob(f.Indexer); err != nil {
s.log.Error().Err(err).Msg("feed.ToggleEnabled: error stopping torznab job")
return err
}
case string(domain.FeedTypeRSS):
if err := s.stopRSSJob(f.Indexer); err != nil {
s.log.Error().Err(err).Msg("feed.ToggleEnabled: error stopping rss job")
return err
}
}
s.log.Debug().Msgf("feed.ToggleEnabled: stopping feed: %v", f.Name)
@ -205,17 +221,20 @@ func (s *service) Test(ctx context.Context, feed *domain.Feed) error {
subLogger := zstdlog.NewStdLoggerWithLevel(s.log.With().Logger(), zerolog.DebugLevel)
// setup torznab Client
c := torznab.NewClient(torznab.Config{Host: feed.URL, ApiKey: feed.ApiKey, Log: subLogger})
caps, err := c.GetCaps()
if err != nil {
s.log.Error().Err(err).Msg("error testing feed")
return err
}
// implementation == TORZNAB
if feed.Type == string(domain.FeedTypeTorznab) {
// setup torznab Client
c := torznab.NewClient(torznab.Config{Host: feed.URL, ApiKey: feed.ApiKey, Log: subLogger})
caps, err := c.GetCaps()
if err != nil {
s.log.Error().Err(err).Msg("error testing feed")
return err
}
if caps == nil {
s.log.Error().Msg("could not test feed and get caps")
return errors.New("could not test feed and get caps")
if caps == nil {
s.log.Error().Msg("could not test feed and get caps")
return errors.New("could not test feed and get caps")
}
}
s.log.Debug().Msgf("test successful - connected to feed: %+v", feed.URL)
@ -286,11 +305,14 @@ func (s *service) startJob(f domain.Feed) error {
switch fi.Implementation {
case string(domain.FeedTypeTorznab):
if err := s.addTorznabJob(fi); err != nil {
s.log.Error().Err(err).Msg("feed.startJob: failed to initialize feed")
s.log.Error().Err(err).Msg("feed.startJob: failed to initialize torznab feed")
return err
}
case string(domain.FeedTypeRSS):
if err := s.addRSSJob(fi); err != nil {
s.log.Error().Err(err).Msg("feed.startJob: failed to initialize rss feed")
return err
}
//case "rss":
}
return nil
@ -300,7 +322,7 @@ func (s *service) addTorznabJob(f feedInstance) error {
if f.URL == "" {
return errors.New("torznab feed requires URL")
}
if f.CronSchedule < time.Duration(5 * time.Minute) {
if f.CronSchedule < time.Duration(5*time.Minute) {
f.CronSchedule = time.Duration(15 * time.Minute)
}
@ -338,3 +360,43 @@ func (s *service) stopTorznabJob(indexer string) error {
return nil
}
func (s *service) addRSSJob(f feedInstance) error {
if f.URL == "" {
return errors.New("rss feed requires URL")
}
if f.CronSchedule < time.Duration(5*time.Minute) {
f.CronSchedule = time.Duration(15 * time.Minute)
}
// setup logger
l := s.log.With().Str("feed", f.Name).Logger()
// create job
job := NewRSSJob(f.Name, f.IndexerIdentifier, l, f.URL, s.cacheRepo, s.releaseSvc)
// schedule job
id, err := s.scheduler.AddJob(job, f.CronSchedule, f.IndexerIdentifier)
if err != nil {
return errors.Wrap(err, "feed.AddRSSJob: add job failed")
}
job.JobID = id
// add to job map
s.jobs[f.IndexerIdentifier] = id
s.log.Debug().Msgf("feed.AddRSSJob: %v", f.Name)
return nil
}
func (s *service) stopRSSJob(indexer string) error {
// remove job from scheduler
if err := s.scheduler.RemoveJobByIdentifier(indexer); err != nil {
return errors.Wrap(err, "feed.stopRSSJob: stop job failed")
}
s.log.Debug().Msgf("feed.stopRSSJob: %v", indexer)
return nil
}

View file

@ -73,7 +73,6 @@ func (j *TorznabJob) process() error {
rls.TorrentName = item.Title
rls.TorrentURL = item.Link
rls.Implementation = domain.ReleaseImplementationTorznab
rls.Indexer = j.IndexerIdentifier
// parse size bytes string
rls.ParseSizeBytesString(item.Size)