feat(feeds): view latest RSS and Torznab feed (#609)

feat(feeds): view latest run
This commit is contained in:
ze0s 2023-01-02 23:00:11 +01:00 committed by GitHub
parent 5972d421d8
commit fd67a7b24e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 205 additions and 47 deletions

View file

@ -48,7 +48,9 @@ func NewRSSJob(feed *domain.Feed, name string, indexerIdentifier string, log zer
}
func (j *RSSJob) Run() {
if err := j.process(); err != nil {
ctx := context.Background()
if err := j.process(ctx); err != nil {
j.Log.Error().Err(err).Int("attempts", j.attempts).Msg("rss feed process error")
j.errors = append(j.errors, err)
@ -57,12 +59,10 @@ func (j *RSSJob) Run() {
j.attempts = 0
j.errors = []error{}
return
}
func (j *RSSJob) process() error {
items, err := j.getFeed()
func (j *RSSJob) process(ctx context.Context) error {
items, err := j.getFeed(ctx)
if err != nil {
j.Log.Error().Err(err).Msgf("error fetching rss feed items")
return errors.Wrap(err, "error getting rss feed items")
@ -195,8 +195,8 @@ func (j *RSSJob) processItem(item *gofeed.Item) *domain.Release {
return rls
}
func (j *RSSJob) getFeed() (items []*gofeed.Item, err error) {
ctx, cancel := context.WithTimeout(context.Background(), j.Timeout)
func (j *RSSJob) getFeed(ctx context.Context) (items []*gofeed.Item, err error) {
ctx, cancel := context.WithTimeout(ctx, j.Timeout)
defer cancel()
feed, err := NewFeedParser(j.Timeout, j.Feed.Cookie).ParseURLWithContext(ctx, j.URL)
@ -207,7 +207,7 @@ func (j *RSSJob) getFeed() (items []*gofeed.Item, err error) {
// get feed as JSON string
feedData := feed.String()
if err := j.Repo.UpdateLastRunWithData(context.Background(), j.Feed.ID, feedData); err != nil {
if err := j.Repo.UpdateLastRunWithData(ctx, j.Feed.ID, feedData); err != nil {
j.Log.Error().Err(err).Msgf("error updating last run for feed id: %v", j.Feed.ID)
}
@ -247,10 +247,12 @@ func (j *RSSJob) getFeed() (items []*gofeed.Item, err error) {
continue
}
if exists {
j.Log.Trace().Msgf("cache item exists, skipping release: %v", item.Title)
j.Log.Trace().Msgf("cache item exists, skipping release: %s", item.Title)
continue
}
j.Log.Debug().Msgf("found new release: %s", i.Title)
if err := j.CacheRepo.Put(bucketKey, key, []byte(item.Title), ttl); err != nil {
j.Log.Error().Err(err).Str("entry", key).Msg("cache.Put: error storing item in cache")
continue

View file

@ -29,6 +29,7 @@ type Service interface {
Test(ctx context.Context, feed *domain.Feed) error
ToggleEnabled(ctx context.Context, id int, enabled bool) error
Delete(ctx context.Context, id int) error
GetLastRunData(ctx context.Context, id int) (string, error)
Start() error
}
@ -222,7 +223,7 @@ func (s *service) test(ctx context.Context, feed *domain.Feed) error {
// test feeds
if feed.Type == string(domain.FeedTypeTorznab) {
if err := s.testTorznab(feed, subLogger); err != nil {
if err := s.testTorznab(ctx, feed, subLogger); err != nil {
return err
}
} else if feed.Type == string(domain.FeedTypeRSS) {
@ -248,17 +249,17 @@ func (s *service) testRSS(ctx context.Context, feed *domain.Feed) error {
return nil
}
func (s *service) testTorznab(feed *domain.Feed, subLogger *log.Logger) error {
func (s *service) testTorznab(ctx context.Context, feed *domain.Feed, subLogger *log.Logger) error {
// setup torznab Client
c := torznab.NewClient(torznab.Config{Host: feed.URL, ApiKey: feed.ApiKey, Log: subLogger})
items, err := c.FetchFeed()
items, err := c.FetchFeed(ctx)
if err != nil {
s.log.Error().Err(err).Msg("error getting torznab feed")
return err
}
s.log.Info().Msgf("refreshing torznab feed: %v, found (%d) items", feed.Name, len(items))
s.log.Info().Msgf("refreshing torznab feed: %v, found (%d) items", feed.Name, len(items.Channel.Items))
return nil
}
@ -360,7 +361,7 @@ func (s *service) addTorznabJob(f feedInstance) error {
c := torznab.NewClient(torznab.Config{Host: f.URL, ApiKey: f.ApiKey, Timeout: f.Timeout})
// create job
job := NewTorznabJob(f.Name, f.IndexerIdentifier, l, f.URL, c, s.cacheRepo, s.releaseSvc)
job := NewTorznabJob(f.Feed, f.Name, f.IndexerIdentifier, l, f.URL, c, s.repo, s.cacheRepo, s.releaseSvc)
identifierKey := feedKey{f.Feed.ID, f.Feed.Indexer, f.Feed.Name}.ToString()
@ -421,3 +422,16 @@ func (s *service) stopFeedJob(indexer string) error {
return nil
}
func (s *service) GetNextRun(indexer string) (time.Time, error) {
return s.scheduler.GetNextRun(indexer)
}
func (s *service) GetLastRunData(ctx context.Context, id int) (string, error) {
feed, err := s.repo.GetLastRunDataByID(ctx, id)
if err != nil {
return "", err
}
return feed, nil
}

View file

@ -1,12 +1,14 @@
package feed
import (
"context"
"sort"
"strconv"
"time"
"github.com/autobrr/autobrr/internal/domain"
"github.com/autobrr/autobrr/internal/release"
"github.com/autobrr/autobrr/internal/scheduler"
"github.com/autobrr/autobrr/pkg/errors"
"github.com/autobrr/autobrr/pkg/torznab"
@ -14,13 +16,16 @@ import (
)
type TorznabJob struct {
Feed *domain.Feed
Name string
IndexerIdentifier string
Log zerolog.Logger
URL string
Client torznab.Client
Repo domain.FeedCacheRepo
Repo domain.FeedRepo
CacheRepo domain.FeedCacheRepo
ReleaseSvc release.Service
SchedulerSvc scheduler.Service
attempts int
errors []error
@ -28,21 +33,24 @@ type TorznabJob struct {
JobID int
}
func NewTorznabJob(name string, indexerIdentifier string, log zerolog.Logger, url string, client torznab.Client, repo domain.FeedCacheRepo, releaseSvc release.Service) *TorznabJob {
func NewTorznabJob(feed *domain.Feed, name string, indexerIdentifier string, log zerolog.Logger, url string, client torznab.Client, repo domain.FeedRepo, cacheRepo domain.FeedCacheRepo, releaseSvc release.Service) *TorznabJob {
return &TorznabJob{
Feed: feed,
Name: name,
IndexerIdentifier: indexerIdentifier,
Log: log,
URL: url,
Client: client,
Repo: repo,
CacheRepo: cacheRepo,
ReleaseSvc: releaseSvc,
}
}
func (j *TorznabJob) Run() {
err := j.process()
if err != nil {
ctx := context.Background()
if err := j.process(ctx); err != nil {
j.Log.Err(err).Int("attempts", j.attempts).Msg("torznab process error")
j.errors = append(j.errors, err)
@ -52,9 +60,9 @@ func (j *TorznabJob) Run() {
j.errors = j.errors[:0]
}
func (j *TorznabJob) process() error {
func (j *TorznabJob) process(ctx context.Context) error {
// get feed
items, err := j.getFeed()
items, err := j.getFeed(ctx)
if err != nil {
j.Log.Error().Err(err).Msgf("error fetching feed items")
return errors.Wrap(err, "error getting feed items")
@ -112,44 +120,51 @@ func parseFreeleechTorznab(item torznab.FeedItem) bool {
return false
}
func (j *TorznabJob) getFeed() ([]torznab.FeedItem, error) {
func (j *TorznabJob) getFeed(ctx context.Context) ([]torznab.FeedItem, error) {
// get feed
feedItems, err := j.Client.FetchFeed()
feed, err := j.Client.FetchFeed(ctx)
if err != nil {
j.Log.Error().Err(err).Msgf("error fetching feed items")
return nil, errors.Wrap(err, "error fetching feed items")
}
j.Log.Debug().Msgf("refreshing feed: %v, found (%d) items", j.Name, len(feedItems))
if err := j.Repo.UpdateLastRunWithData(ctx, j.Feed.ID, feed.Raw); err != nil {
j.Log.Error().Err(err).Msgf("error updating last run for feed id: %v", j.Feed.ID)
}
j.Log.Debug().Msgf("refreshing feed: %v, found (%d) items", j.Name, len(feed.Channel.Items))
items := make([]torznab.FeedItem, 0)
if len(feedItems) == 0 {
if len(feed.Channel.Items) == 0 {
return items, nil
}
sort.SliceStable(feedItems, func(i, j int) bool {
return feedItems[i].PubDate.After(feedItems[j].PubDate.Time)
sort.SliceStable(feed.Channel.Items, func(i, j int) bool {
return feed.Channel.Items[i].PubDate.After(feed.Channel.Items[j].PubDate.Time)
})
for _, i := range feedItems {
for _, i := range feed.Channel.Items {
if i.GUID == "" {
j.Log.Error().Err(err).Msgf("missing GUID from feed: %s", j.Feed.Name)
continue
}
exists, err := j.Repo.Exists(j.Name, i.GUID)
exists, err := j.CacheRepo.Exists(j.Name, i.GUID)
if err != nil {
j.Log.Error().Err(err).Msg("could not check if item exists")
continue
}
if exists {
j.Log.Trace().Msgf("cache item exists, skipping release: %v", i.Title)
j.Log.Trace().Msgf("cache item exists, skipping release: %s", i.Title)
continue
}
j.Log.Debug().Msgf("found new release: %s", i.Title)
// set ttl to 1 month
ttl := time.Now().AddDate(0, 1, 0)
if err := j.Repo.Put(j.Name, i.GUID, []byte(i.Title), ttl); err != nil {
if err := j.CacheRepo.Put(j.Name, i.GUID, []byte(i.Title), ttl); err != nil {
j.Log.Error().Stack().Err(err).Str("guid", i.GUID).Msg("cache.Put: error storing item in cache")
continue
}