From fd67a7b24e766370c5e50511422f6056492c5028 Mon Sep 17 00:00:00 2001 From: ze0s <43699394+zze0s@users.noreply.github.com> Date: Mon, 2 Jan 2023 23:00:11 +0100 Subject: [PATCH] feat(feeds): view latest RSS and Torznab feed (#609) feat(feeds): view latest run --- internal/action/run.go | 2 +- internal/database/feed.go | 27 ++++++++++++++++++ internal/domain/feed.go | 1 + internal/feed/rss.go | 20 +++++++------ internal/feed/service.go | 24 ++++++++++++---- internal/feed/torznab.go | 47 ++++++++++++++++++++----------- internal/http/feed.go | 30 ++++++++++++++++++++ internal/scheduler/service.go | 25 ++++++++++++++++ pkg/torznab/feed.go | 14 +++++++++ pkg/torznab/torznab.go | 33 ++++++++++++---------- web/src/screens/settings/Feed.tsx | 29 ++++++++++++++++++- 11 files changed, 205 insertions(+), 47 deletions(-) diff --git a/internal/action/run.go b/internal/action/run.go index b8412b3..cd9c87f 100644 --- a/internal/action/run.go +++ b/internal/action/run.go @@ -105,7 +105,7 @@ func (s *service) RunAction(ctx context.Context, action *domain.Action, release ActionClient: action.Client.Name, Rejections: []string{}, Protocol: domain.ReleaseProtocolTorrent, - Implementation: domain.ReleaseImplementationIRC, + Implementation: release.Implementation, Timestamp: time.Now(), } diff --git a/internal/database/feed.go b/internal/database/feed.go index 3246d41..377cd2f 100644 --- a/internal/database/feed.go +++ b/internal/database/feed.go @@ -172,6 +172,33 @@ func (r *FeedRepo) Find(ctx context.Context) ([]domain.Feed, error) { return feeds, nil } +func (r *FeedRepo) GetLastRunDataByID(ctx context.Context, id int) (string, error) { + queryBuilder := r.db.squirrel. + Select( + "last_run_data", + ). + From("feed"). + Where(sq.Eq{"id": id}) + + query, args, err := queryBuilder.ToSql() + if err != nil { + return "", errors.Wrap(err, "error building query") + } + + row := r.db.handler.QueryRowContext(ctx, query, args...) + if err := row.Err(); err != nil { + return "", errors.Wrap(err, "error executing query") + } + + var data sql.NullString + + if err := row.Scan(&data); err != nil { + return "", errors.Wrap(err, "error scanning row") + } + + return data.String, nil +} + func (r *FeedRepo) Store(ctx context.Context, feed *domain.Feed) error { queryBuilder := r.db.squirrel. Insert("feed"). diff --git a/internal/domain/feed.go b/internal/domain/feed.go index a3afed5..54cd1de 100644 --- a/internal/domain/feed.go +++ b/internal/domain/feed.go @@ -19,6 +19,7 @@ type FeedRepo interface { FindByID(ctx context.Context, id int) (*Feed, error) FindByIndexerIdentifier(ctx context.Context, indexer string) (*Feed, error) Find(ctx context.Context) ([]Feed, error) + GetLastRunDataByID(ctx context.Context, id int) (string, error) Store(ctx context.Context, feed *Feed) error Update(ctx context.Context, feed *Feed) error UpdateLastRun(ctx context.Context, feedID int) error diff --git a/internal/feed/rss.go b/internal/feed/rss.go index 74fa54b..19a31b5 100644 --- a/internal/feed/rss.go +++ b/internal/feed/rss.go @@ -48,7 +48,9 @@ func NewRSSJob(feed *domain.Feed, name string, indexerIdentifier string, log zer } func (j *RSSJob) Run() { - if err := j.process(); err != nil { + ctx := context.Background() + + if err := j.process(ctx); err != nil { j.Log.Error().Err(err).Int("attempts", j.attempts).Msg("rss feed process error") j.errors = append(j.errors, err) @@ -57,12 +59,10 @@ func (j *RSSJob) Run() { j.attempts = 0 j.errors = []error{} - - return } -func (j *RSSJob) process() error { - items, err := j.getFeed() +func (j *RSSJob) process(ctx context.Context) error { + items, err := j.getFeed(ctx) if err != nil { j.Log.Error().Err(err).Msgf("error fetching rss feed items") return errors.Wrap(err, "error getting rss feed items") @@ -195,8 +195,8 @@ func (j *RSSJob) processItem(item *gofeed.Item) *domain.Release { return rls } -func (j *RSSJob) getFeed() (items []*gofeed.Item, err error) { - ctx, cancel := context.WithTimeout(context.Background(), j.Timeout) +func (j *RSSJob) getFeed(ctx context.Context) (items []*gofeed.Item, err error) { + ctx, cancel := context.WithTimeout(ctx, j.Timeout) defer cancel() feed, err := NewFeedParser(j.Timeout, j.Feed.Cookie).ParseURLWithContext(ctx, j.URL) @@ -207,7 +207,7 @@ func (j *RSSJob) getFeed() (items []*gofeed.Item, err error) { // get feed as JSON string feedData := feed.String() - if err := j.Repo.UpdateLastRunWithData(context.Background(), j.Feed.ID, feedData); err != nil { + if err := j.Repo.UpdateLastRunWithData(ctx, j.Feed.ID, feedData); err != nil { j.Log.Error().Err(err).Msgf("error updating last run for feed id: %v", j.Feed.ID) } @@ -247,10 +247,12 @@ func (j *RSSJob) getFeed() (items []*gofeed.Item, err error) { continue } if exists { - j.Log.Trace().Msgf("cache item exists, skipping release: %v", item.Title) + j.Log.Trace().Msgf("cache item exists, skipping release: %s", item.Title) continue } + j.Log.Debug().Msgf("found new release: %s", i.Title) + if err := j.CacheRepo.Put(bucketKey, key, []byte(item.Title), ttl); err != nil { j.Log.Error().Err(err).Str("entry", key).Msg("cache.Put: error storing item in cache") continue diff --git a/internal/feed/service.go b/internal/feed/service.go index 8a9ad3e..c9fefee 100644 --- a/internal/feed/service.go +++ b/internal/feed/service.go @@ -29,6 +29,7 @@ type Service interface { Test(ctx context.Context, feed *domain.Feed) error ToggleEnabled(ctx context.Context, id int, enabled bool) error Delete(ctx context.Context, id int) error + GetLastRunData(ctx context.Context, id int) (string, error) Start() error } @@ -222,7 +223,7 @@ func (s *service) test(ctx context.Context, feed *domain.Feed) error { // test feeds if feed.Type == string(domain.FeedTypeTorznab) { - if err := s.testTorznab(feed, subLogger); err != nil { + if err := s.testTorznab(ctx, feed, subLogger); err != nil { return err } } else if feed.Type == string(domain.FeedTypeRSS) { @@ -248,17 +249,17 @@ func (s *service) testRSS(ctx context.Context, feed *domain.Feed) error { return nil } -func (s *service) testTorznab(feed *domain.Feed, subLogger *log.Logger) error { +func (s *service) testTorznab(ctx context.Context, feed *domain.Feed, subLogger *log.Logger) error { // setup torznab Client c := torznab.NewClient(torznab.Config{Host: feed.URL, ApiKey: feed.ApiKey, Log: subLogger}) - items, err := c.FetchFeed() + items, err := c.FetchFeed(ctx) if err != nil { s.log.Error().Err(err).Msg("error getting torznab feed") return err } - s.log.Info().Msgf("refreshing torznab feed: %v, found (%d) items", feed.Name, len(items)) + s.log.Info().Msgf("refreshing torznab feed: %v, found (%d) items", feed.Name, len(items.Channel.Items)) return nil } @@ -360,7 +361,7 @@ func (s *service) addTorznabJob(f feedInstance) error { c := torznab.NewClient(torznab.Config{Host: f.URL, ApiKey: f.ApiKey, Timeout: f.Timeout}) // create job - job := NewTorznabJob(f.Name, f.IndexerIdentifier, l, f.URL, c, s.cacheRepo, s.releaseSvc) + job := NewTorznabJob(f.Feed, f.Name, f.IndexerIdentifier, l, f.URL, c, s.repo, s.cacheRepo, s.releaseSvc) identifierKey := feedKey{f.Feed.ID, f.Feed.Indexer, f.Feed.Name}.ToString() @@ -421,3 +422,16 @@ func (s *service) stopFeedJob(indexer string) error { return nil } + +func (s *service) GetNextRun(indexer string) (time.Time, error) { + return s.scheduler.GetNextRun(indexer) +} + +func (s *service) GetLastRunData(ctx context.Context, id int) (string, error) { + feed, err := s.repo.GetLastRunDataByID(ctx, id) + if err != nil { + return "", err + } + + return feed, nil +} diff --git a/internal/feed/torznab.go b/internal/feed/torznab.go index f0e7285..8d1e05c 100644 --- a/internal/feed/torznab.go +++ b/internal/feed/torznab.go @@ -1,12 +1,14 @@ package feed import ( + "context" "sort" "strconv" "time" "github.com/autobrr/autobrr/internal/domain" "github.com/autobrr/autobrr/internal/release" + "github.com/autobrr/autobrr/internal/scheduler" "github.com/autobrr/autobrr/pkg/errors" "github.com/autobrr/autobrr/pkg/torznab" @@ -14,13 +16,16 @@ import ( ) type TorznabJob struct { + Feed *domain.Feed Name string IndexerIdentifier string Log zerolog.Logger URL string Client torznab.Client - Repo domain.FeedCacheRepo + Repo domain.FeedRepo + CacheRepo domain.FeedCacheRepo ReleaseSvc release.Service + SchedulerSvc scheduler.Service attempts int errors []error @@ -28,21 +33,24 @@ type TorznabJob struct { JobID int } -func NewTorznabJob(name string, indexerIdentifier string, log zerolog.Logger, url string, client torznab.Client, repo domain.FeedCacheRepo, releaseSvc release.Service) *TorznabJob { +func NewTorznabJob(feed *domain.Feed, name string, indexerIdentifier string, log zerolog.Logger, url string, client torznab.Client, repo domain.FeedRepo, cacheRepo domain.FeedCacheRepo, releaseSvc release.Service) *TorznabJob { return &TorznabJob{ + Feed: feed, Name: name, IndexerIdentifier: indexerIdentifier, Log: log, URL: url, Client: client, Repo: repo, + CacheRepo: cacheRepo, ReleaseSvc: releaseSvc, } } func (j *TorznabJob) Run() { - err := j.process() - if err != nil { + ctx := context.Background() + + if err := j.process(ctx); err != nil { j.Log.Err(err).Int("attempts", j.attempts).Msg("torznab process error") j.errors = append(j.errors, err) @@ -52,9 +60,9 @@ func (j *TorznabJob) Run() { j.errors = j.errors[:0] } -func (j *TorznabJob) process() error { +func (j *TorznabJob) process(ctx context.Context) error { // get feed - items, err := j.getFeed() + items, err := j.getFeed(ctx) if err != nil { j.Log.Error().Err(err).Msgf("error fetching feed items") return errors.Wrap(err, "error getting feed items") @@ -112,44 +120,51 @@ func parseFreeleechTorznab(item torznab.FeedItem) bool { return false } -func (j *TorznabJob) getFeed() ([]torznab.FeedItem, error) { +func (j *TorznabJob) getFeed(ctx context.Context) ([]torznab.FeedItem, error) { // get feed - feedItems, err := j.Client.FetchFeed() + feed, err := j.Client.FetchFeed(ctx) if err != nil { j.Log.Error().Err(err).Msgf("error fetching feed items") return nil, errors.Wrap(err, "error fetching feed items") } - j.Log.Debug().Msgf("refreshing feed: %v, found (%d) items", j.Name, len(feedItems)) + if err := j.Repo.UpdateLastRunWithData(ctx, j.Feed.ID, feed.Raw); err != nil { + j.Log.Error().Err(err).Msgf("error updating last run for feed id: %v", j.Feed.ID) + } + + j.Log.Debug().Msgf("refreshing feed: %v, found (%d) items", j.Name, len(feed.Channel.Items)) items := make([]torznab.FeedItem, 0) - if len(feedItems) == 0 { + if len(feed.Channel.Items) == 0 { return items, nil } - sort.SliceStable(feedItems, func(i, j int) bool { - return feedItems[i].PubDate.After(feedItems[j].PubDate.Time) + sort.SliceStable(feed.Channel.Items, func(i, j int) bool { + return feed.Channel.Items[i].PubDate.After(feed.Channel.Items[j].PubDate.Time) }) - for _, i := range feedItems { + for _, i := range feed.Channel.Items { if i.GUID == "" { + j.Log.Error().Err(err).Msgf("missing GUID from feed: %s", j.Feed.Name) continue } - exists, err := j.Repo.Exists(j.Name, i.GUID) + exists, err := j.CacheRepo.Exists(j.Name, i.GUID) if err != nil { j.Log.Error().Err(err).Msg("could not check if item exists") continue } if exists { - j.Log.Trace().Msgf("cache item exists, skipping release: %v", i.Title) + j.Log.Trace().Msgf("cache item exists, skipping release: %s", i.Title) continue } + j.Log.Debug().Msgf("found new release: %s", i.Title) + // set ttl to 1 month ttl := time.Now().AddDate(0, 1, 0) - if err := j.Repo.Put(j.Name, i.GUID, []byte(i.Title), ttl); err != nil { + if err := j.CacheRepo.Put(j.Name, i.GUID, []byte(i.Title), ttl); err != nil { j.Log.Error().Stack().Err(err).Str("guid", i.GUID).Msg("cache.Put: error storing item in cache") continue } diff --git a/internal/http/feed.go b/internal/http/feed.go index 1b81f4b..b4a96c0 100644 --- a/internal/http/feed.go +++ b/internal/http/feed.go @@ -18,6 +18,7 @@ type feedService interface { Delete(ctx context.Context, id int) error ToggleEnabled(ctx context.Context, id int, enabled bool) error Test(ctx context.Context, feed *domain.Feed) error + GetLastRunData(ctx context.Context, id int) (string, error) } type feedHandler struct { @@ -34,6 +35,7 @@ func newFeedHandler(encoder encoder, service feedService) *feedHandler { func (h feedHandler) Routes(r chi.Router) { r.Get("/", h.find) + r.Get("/{feedID}/latest", h.latestRun) r.Post("/", h.store) r.Post("/test", h.test) r.Put("/{feedID}", h.update) @@ -160,3 +162,31 @@ func (h feedHandler) delete(w http.ResponseWriter, r *http.Request) { h.encoder.StatusResponse(ctx, w, nil, http.StatusNoContent) } + +func (h feedHandler) latestRun(w http.ResponseWriter, r *http.Request) { + var ( + ctx = r.Context() + filterID = chi.URLParam(r, "feedID") + ) + + id, err := strconv.Atoi(filterID) + if err != nil { + h.encoder.StatusInternalError(w) + return + } + + feed, err := h.service.GetLastRunData(ctx, id) + if err != nil { + h.encoder.StatusInternalError(w) + return + } + + if feed == "" { + h.encoder.StatusNotFound(ctx, w) + w.Write([]byte("No data found")) + return + } + + w.WriteHeader(http.StatusOK) + w.Write([]byte(feed)) +} diff --git a/internal/scheduler/service.go b/internal/scheduler/service.go index 61841f4..f68b173 100644 --- a/internal/scheduler/service.go +++ b/internal/scheduler/service.go @@ -16,6 +16,7 @@ type Service interface { Stop() AddJob(job cron.Job, interval time.Duration, identifier string) (int, error) RemoveJobByIdentifier(id string) error + GetNextRun(id string) (time.Time, error) } type service struct { @@ -110,6 +111,30 @@ func (s *service) RemoveJobByIdentifier(id string) error { return nil } +func (s *service) GetNextRun(id string) (time.Time, error) { + entry := s.getEntryById(id) + + if !entry.Valid() { + return time.Time{}, nil + } + + s.log.Debug().Msgf("scheduler.GetNextRun: %s next run: %s", id, entry.Next) + + return entry.Next, nil +} + +func (s *service) getEntryById(id string) cron.Entry { + s.m.Lock() + defer s.m.Unlock() + + v, ok := s.jobs[id] + if !ok { + return cron.Entry{} + } + + return s.cron.Entry(v) +} + type GenericJob struct { Name string Log zerolog.Logger diff --git a/pkg/torznab/feed.go b/pkg/torznab/feed.go index fb5fe4f..1cc1820 100644 --- a/pkg/torznab/feed.go +++ b/pkg/torznab/feed.go @@ -7,6 +7,20 @@ import ( "github.com/autobrr/autobrr/pkg/errors" ) +type Feed struct { + Channel Channel `xml:"channel"` + Raw string +} + +func (f Feed) Len() int { + return len(f.Channel.Items) +} + +type Channel struct { + Title string `xml:"title"` + Items []FeedItem `xml:"item"` +} + type Response struct { Channel struct { Items []FeedItem `xml:"item"` diff --git a/pkg/torznab/torznab.go b/pkg/torznab/torznab.go index 6e04c9f..4c5d159 100644 --- a/pkg/torznab/torznab.go +++ b/pkg/torznab/torznab.go @@ -2,6 +2,7 @@ package torznab import ( "bytes" + "context" "encoding/xml" "io" "log" @@ -15,8 +16,8 @@ import ( ) type Client interface { - FetchFeed() ([]FeedItem, error) - FetchCaps() (*Caps, error) + FetchFeed(ctx context.Context) (*Feed, error) + FetchCaps(ctx context.Context) (*Caps, error) GetCaps() *Caps } @@ -74,7 +75,7 @@ func NewClient(config Config) Client { return c } -func (c *client) get(endpoint string, opts map[string]string) (int, *Response, error) { +func (c *client) get(ctx context.Context, endpoint string, opts map[string]string) (int, *Feed, error) { params := url.Values{ "t": {"search"}, } @@ -88,7 +89,7 @@ func (c *client) get(endpoint string, opts map[string]string) (int, *Response, e u.RawQuery = params.Encode() reqUrl := u.String() - req, err := http.NewRequest("GET", reqUrl, nil) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqUrl, nil) if err != nil { return 0, nil, errors.Wrap(err, "could not build request") } @@ -121,17 +122,19 @@ func (c *client) get(endpoint string, opts map[string]string) (int, *Response, e return resp.StatusCode, nil, errors.Wrap(err, "torznab.io.Copy") } - var response Response + var response Feed if err := xml.Unmarshal(buf.Bytes(), &response); err != nil { return resp.StatusCode, nil, errors.Wrap(err, "torznab: could not decode feed") } + response.Raw = buf.String() + return resp.StatusCode, &response, nil } -func (c *client) FetchFeed() ([]FeedItem, error) { +func (c *client) FetchFeed(ctx context.Context) (*Feed, error) { if c.Capabilities == nil { - status, caps, err := c.getCaps("?t=caps", nil) + status, caps, err := c.getCaps(ctx, "?t=caps", nil) if err != nil { return nil, errors.Wrap(err, "could not get caps for feed") } @@ -143,7 +146,7 @@ func (c *client) FetchFeed() ([]FeedItem, error) { c.Capabilities = caps } - status, res, err := c.get("", nil) + status, res, err := c.get(ctx, "", nil) if err != nil { return nil, errors.Wrap(err, "could not get feed") } @@ -156,10 +159,10 @@ func (c *client) FetchFeed() ([]FeedItem, error) { item.MapCategories(c.Capabilities.Categories.Categories) } - return res.Channel.Items, nil + return res, nil } -func (c *client) getCaps(endpoint string, opts map[string]string) (int, *Caps, error) { +func (c *client) getCaps(ctx context.Context, endpoint string, opts map[string]string) (int, *Caps, error) { params := url.Values{ "t": {"caps"}, } @@ -173,7 +176,7 @@ func (c *client) getCaps(endpoint string, opts map[string]string) (int, *Caps, e u.RawQuery = params.Encode() reqUrl := u.String() - req, err := http.NewRequest("GET", reqUrl, nil) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqUrl, nil) if err != nil { return 0, nil, errors.Wrap(err, "could not build request") } @@ -220,9 +223,9 @@ func (c *client) getCaps(endpoint string, opts map[string]string) (int, *Caps, e return resp.StatusCode, &response, nil } -func (c *client) FetchCaps() (*Caps, error) { +func (c *client) FetchCaps(ctx context.Context) (*Caps, error) { - status, res, err := c.getCaps("?t=caps", nil) + status, res, err := c.getCaps(ctx, "?t=caps", nil) if err != nil { return nil, errors.Wrap(err, "could not get caps for feed") } @@ -238,12 +241,12 @@ func (c *client) GetCaps() *Caps { return c.Capabilities } -func (c *client) Search(query string) ([]FeedItem, error) { +func (c *client) Search(ctx context.Context, query string) ([]FeedItem, error) { v := url.Values{} v.Add("q", query) params := v.Encode() - status, res, err := c.get("&t=search&"+params, nil) + status, res, err := c.get(ctx, "&t=search&"+params, nil) if err != nil { return nil, errors.Wrap(err, "could not search feed") } diff --git a/web/src/screens/settings/Feed.tsx b/web/src/screens/settings/Feed.tsx index 6e65482..88ca471 100644 --- a/web/src/screens/settings/Feed.tsx +++ b/web/src/screens/settings/Feed.tsx @@ -9,7 +9,13 @@ import { toast } from "react-hot-toast"; import Toast from "../../components/notifications/Toast"; import { queryClient } from "../../App"; import { DeleteModal } from "../../components/modals"; -import { ArrowsRightLeftIcon, EllipsisHorizontalIcon, PencilSquareIcon, TrashIcon } from "@heroicons/react/24/outline"; +import { + ArrowsRightLeftIcon, + DocumentTextIcon, + EllipsisHorizontalIcon, + PencilSquareIcon, + TrashIcon +} from "@heroicons/react/24/outline"; import { FeedUpdateForm } from "../../forms/settings/FeedForms"; import { EmptySimple } from "../../components/emptystates"; import { ImplementationBadges } from "./Indexer"; @@ -242,6 +248,27 @@ const FeedItemDropdown = ({ )} +