autobrr/internal/feed/service.go
ze0s 3c3b47fa10
feat(macros): add IndexerName (#1511)
* feat(macros): add IndexerName

* fix: tests

* fix: tests
2024-04-16 17:35:17 +02:00

560 lines
14 KiB
Go

// Copyright (c) 2021 - 2024, Ludvig Lundgren and the autobrr contributors.
// SPDX-License-Identifier: GPL-2.0-or-later
package feed
import (
"context"
"fmt"
"log"
"time"
"github.com/autobrr/autobrr/internal/domain"
"github.com/autobrr/autobrr/internal/logger"
"github.com/autobrr/autobrr/internal/release"
"github.com/autobrr/autobrr/internal/scheduler"
"github.com/autobrr/autobrr/pkg/errors"
"github.com/autobrr/autobrr/pkg/newznab"
"github.com/autobrr/autobrr/pkg/torznab"
"github.com/dcarbone/zadapters/zstdlog"
"github.com/mmcdole/gofeed"
"github.com/robfig/cron/v3"
"github.com/rs/zerolog"
)
// Service is the feed service API: lookup, persistence, cache maintenance,
// connectivity testing and scheduler job control for feeds.
// NewService returns the implementation defined in this file.
type Service interface {
// FindByID returns the feed with the given id.
FindByID(ctx context.Context, id int) (*domain.Feed, error)
// FindByIndexerIdentifier returns the feed belonging to the given indexer identifier.
FindByIndexerIdentifier(ctx context.Context, indexer string) (*domain.Feed, error)
// Find returns all feeds; NextRun is filled in for feeds the scheduler knows about.
Find(ctx context.Context) ([]domain.Feed, error)
// GetCacheByID returns the cache items stored for the given feed id.
GetCacheByID(ctx context.Context, feedId int) ([]domain.FeedCacheItem, error)
// Store persists a new feed.
Store(ctx context.Context, feed *domain.Feed) error
// Update persists changes to a feed and restarts its scheduled job.
Update(ctx context.Context, feed *domain.Feed) error
// Test tries to fetch the feed once to verify it is reachable.
Test(ctx context.Context, feed *domain.Feed) error
// ToggleEnabled persists the enabled flag and starts or stops the feed job when the flag changed.
ToggleEnabled(ctx context.Context, id int, enabled bool) error
// Delete stops the feed job and removes the feed.
Delete(ctx context.Context, id int) error
// DeleteFeedCache removes the cache entries of a single feed.
DeleteFeedCache(ctx context.Context, id int) error
// GetLastRunData returns the last run data stored for the feed.
GetLastRunData(ctx context.Context, id int) (string, error)
// DeleteFeedCacheStale removes stale cache entries via the cache repository.
DeleteFeedCacheStale(ctx context.Context) error
// ForceRun builds the feed job and runs it once, outside of the schedule.
ForceRun(ctx context.Context, id int) error
// Start schedules the cache cleanup job and a job for every enabled feed.
Start() error
}
// feedInstance is the flattened, job-ready view of a domain.Feed that is
// handed to the job constructors and the scheduler. It is built by newFeedInstance.
type feedInstance struct {
// Feed is the feed this instance was derived from.
Feed *domain.Feed
// Name is copied from Feed.Name.
Name string
// Indexer is copied from Feed.Indexer.
Indexer domain.IndexerMinimal
// URL is the feed endpoint, copied from Feed.URL.
URL string
// ApiKey is copied from Feed.ApiKey.
ApiKey string
// Implementation is the feed type (Feed.Type) and selects which job gets created.
Implementation string
// CronSchedule is the run interval handed to the scheduler (Feed.Interval in minutes).
CronSchedule time.Duration
// Timeout is handed to the feed clients/jobs (Feed.Timeout in seconds).
Timeout time.Duration
}
// feedKey wraps a feed id so it can be turned into the unique identifier
// under which the feed's job is registered in the scheduler.
type feedKey struct {
	id int
}

// ToString renders the key as "feed-<id>", the identifier used to schedule,
// look up and remove the feed's job in the scheduler.
func (k feedKey) ToString() string {
	const prefix = "feed-"

	return prefix + fmt.Sprint(k.id)
}
// service is the Service implementation returned by NewService.
type service struct {
// log is the module logger (tagged with module=feed in NewService).
log zerolog.Logger
// jobs maps a scheduler identifier key to the job id returned by the scheduler.
jobs map[string]int
// repo stores and loads feeds.
repo domain.FeedRepo
// cacheRepo stores and loads feed cache items.
cacheRepo domain.FeedCacheRepo
// releaseSvc is passed on to the feed jobs.
releaseSvc release.Service
// scheduler runs the feed jobs and the cache cleanup job.
scheduler scheduler.Service
}
// NewService wires up the feed service with its repositories, the release
// service and the scheduler. The logger is tagged with module=feed and the
// job map starts out empty.
func NewService(log logger.Logger, repo domain.FeedRepo, cacheRepo domain.FeedCacheRepo, releaseSvc release.Service, scheduler scheduler.Service) Service {
	svc := &service{
		jobs:       make(map[string]int),
		repo:       repo,
		cacheRepo:  cacheRepo,
		releaseSvc: releaseSvc,
		scheduler:  scheduler,
	}
	svc.log = log.With().Str("module", "feed").Logger()

	return svc
}
// FindByID returns the feed with the given id by delegating to the feed repository.
func (s *service) FindByID(ctx context.Context, id int) (*domain.Feed, error) {
return s.repo.FindByID(ctx, id)
}
// FindByIndexerIdentifier returns the feed for the given indexer identifier
// by delegating to the feed repository.
func (s *service) FindByIndexerIdentifier(ctx context.Context, indexer string) (*domain.Feed, error) {
return s.repo.FindByIndexerIdentifier(ctx, indexer)
}
// Find loads all feeds from the repository and fills in NextRun for every
// feed whose job the scheduler can resolve. Feeds for which the scheduler
// returns an error are returned unchanged; that error is not propagated.
func (s *service) Find(ctx context.Context) ([]domain.Feed, error) {
	feeds, err := s.repo.Find(ctx)
	if err != nil {
		return nil, err
	}

	for i := range feeds {
		key := feedKey{id: feeds[i].ID}.ToString()

		if nextRun, err := s.scheduler.GetNextRun(key); err == nil {
			feeds[i].NextRun = nextRun
		}
	}

	return feeds, nil
}
// GetCacheByID returns the cache items stored for the given feed id
// by delegating to the feed cache repository.
func (s *service) GetCacheByID(ctx context.Context, feedId int) ([]domain.FeedCacheItem, error) {
return s.cacheRepo.GetByFeed(ctx, feedId)
}
// Store persists the feed via the feed repository.
// No scheduler job is started here.
func (s *service) Store(ctx context.Context, feed *domain.Feed) error {
return s.repo.Store(ctx, feed)
}
// Update persists the feed and restarts its job; see update.
func (s *service) Update(ctx context.Context, feed *domain.Feed) error {
return s.update(ctx, feed)
}
// Delete stops the feed's job and removes the feed; see delete.
func (s *service) Delete(ctx context.Context, id int) error {
return s.delete(ctx, id)
}
// DeleteFeedCache removes the cache entries of the feed with the given id
// by delegating to the feed cache repository.
func (s *service) DeleteFeedCache(ctx context.Context, id int) error {
return s.cacheRepo.DeleteByFeed(ctx, id)
}
// DeleteFeedCacheStale asks the feed cache repository to delete stale entries.
// What counts as stale is decided by the repository.
func (s *service) DeleteFeedCacheStale(ctx context.Context) error {
return s.cacheRepo.DeleteStale(ctx)
}
// ToggleEnabled persists the enabled flag and starts or stops the feed job
// when the flag actually changed; see toggleEnabled.
func (s *service) ToggleEnabled(ctx context.Context, id int, enabled bool) error {
return s.toggleEnabled(ctx, id, enabled)
}
// Test fetches the feed once to verify that it is reachable; see test.
func (s *service) Test(ctx context.Context, feed *domain.Feed) error {
return s.test(ctx, feed)
}
// Start schedules the cache cleanup job and the jobs of all enabled feeds; see start.
func (s *service) Start() error {
return s.start()
}
// update persists the feed and then restarts its scheduler job so that the
// job picks up the new settings. The first failing step is logged and its
// error returned.
func (s *service) update(ctx context.Context, feed *domain.Feed) error {
	err := s.repo.Update(ctx, feed)
	if err != nil {
		s.log.Error().Err(err).Msg("error updating feed")
		return err
	}

	err = s.restartJob(feed)
	if err != nil {
		s.log.Error().Err(err).Msg("error restarting feed")
		return err
	}

	return nil
}
// delete looks up the feed, removes its job from the scheduler and then
// deletes the feed from the repository. Each failure is logged and returned;
// the feed is not deleted when its job could not be stopped.
func (s *service) delete(ctx context.Context, id int) error {
	feed, err := s.repo.FindByID(ctx, id)
	if err != nil {
		s.log.Error().Err(err).Msg("error finding feed")
		return err
	}

	s.log.Debug().Msgf("stopping and removing feed: %s", feed.Name)

	if err = s.stopFeedJob(feed.ID); err != nil {
		s.log.Error().Err(err).Msgf("error stopping rss job: %s id: %d", feed.Name, feed.ID)
		return err
	}

	// delete feed and cascade delete feed_cache by fk
	if err = s.repo.Delete(ctx, feed.ID); err != nil {
		s.log.Error().Err(err).Msgf("error deleting feed: %s", feed.Name)
		return err
	}

	return nil
}
// toggleEnabled persists the enabled flag for the feed and, when the flag
// differs from the stored state, starts (enabled) or stops (disabled) the
// feed's scheduler job. When the state is unchanged only the repository is
// touched.
func (s *service) toggleEnabled(ctx context.Context, id int, enabled bool) error {
	feed, err := s.repo.FindByID(ctx, id)
	if err != nil {
		s.log.Error().Err(err).Msg("error finding feed")
		return err
	}

	if err := s.repo.ToggleEnabled(ctx, id, enabled); err != nil {
		s.log.Error().Err(err).Msg("error feed toggle enabled")
		return err
	}

	// nothing to start or stop when the state did not change
	if feed.Enabled == enabled {
		return nil
	}

	if !enabled {
		s.log.Debug().Msgf("stopping feed: %s", feed.Name)

		if err := s.stopFeedJob(feed.ID); err != nil {
			s.log.Error().Err(err).Msg("error stopping feed job")
			return err
		}

		s.log.Debug().Msgf("feed stopped: %s", feed.Name)

		return nil
	}

	// the loaded feed still carries the old state; startJob refuses
	// feeds that are not enabled, so override it before starting
	feed.Enabled = true

	if err := s.startJob(feed); err != nil {
		s.log.Error().Err(err).Msg("error starting feed job")
		return err
	}

	s.log.Debug().Msgf("feed started: %s", feed.Name)

	return nil
}
// test dispatches to the type specific connectivity check (torznab, newznab
// or rss) and logs success. An unknown feed type yields an error.
func (s *service) test(ctx context.Context, feed *domain.Feed) error {
	// std logger adapter (debug level) handed to the torznab/newznab clients
	subLogger := zstdlog.NewStdLoggerWithLevel(s.log.With().Logger(), zerolog.DebugLevel)

	var err error

	switch feed.Type {
	case string(domain.FeedTypeTorznab):
		err = s.testTorznab(ctx, feed, subLogger)

	case string(domain.FeedTypeNewznab):
		err = s.testNewznab(ctx, feed, subLogger)

	case string(domain.FeedTypeRSS):
		err = s.testRSS(ctx, feed)

	default:
		return errors.New("unsupported feed type: %s", feed.Type)
	}

	if err != nil {
		return err
	}

	s.log.Info().Msgf("feed test successful - connected to feed: %s", feed.URL)

	return nil
}
// testRSS fetches and parses the RSS feed at feed.URL once and logs the
// number of items found.
//
// The default gofeed parser uses an HTTP client without a timeout, so the
// request is bounded by the feed's configured timeout (feed.Timeout, in
// seconds, the same value the scheduled RSS job receives) when one is set.
// Without a configured timeout only ctx limits the request, as before.
//
// It returns a wrapped error when the feed cannot be fetched or parsed.
func (s *service) testRSS(ctx context.Context, feed *domain.Feed) error {
	if feed.Timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, time.Duration(feed.Timeout)*time.Second)
		defer cancel()
	}

	f, err := gofeed.NewParser().ParseURLWithContext(feed.URL, ctx)
	if err != nil {
		s.log.Error().Err(err).Msg("error fetching rss feed items")
		return errors.Wrap(err, "error fetching rss feed items")
	}

	s.log.Info().Msgf("refreshing rss feed: %s, found (%d) items", feed.Name, len(f.Items))

	return nil
}
// testTorznab fetches the torznab feed once and logs the number of items
// found. subLogger is handed to the client for its own logging.
//
// The client is configured with the feed's timeout (feed.Timeout, in
// seconds), the same value the scheduled torznab job uses, so the test
// exercises the feed with the settings it will actually run with. An unset
// timeout (0) leaves the client configuration as it was before.
func (s *service) testTorznab(ctx context.Context, feed *domain.Feed, subLogger *log.Logger) error {
	// setup torznab Client
	c := torznab.NewClient(torznab.Config{
		Host:    feed.URL,
		ApiKey:  feed.ApiKey,
		Timeout: time.Duration(feed.Timeout) * time.Second,
		Log:     subLogger,
	})

	items, err := c.FetchFeed(ctx)
	if err != nil {
		s.log.Error().Err(err).Msg("error getting torznab feed")
		return err
	}

	s.log.Info().Msgf("refreshing torznab feed: %s, found (%d) items", feed.Name, len(items.Channel.Items))

	return nil
}
// testNewznab fetches the newznab feed once and logs the number of items
// found. subLogger is handed to the client for its own logging.
//
// The client is configured with the feed's timeout (feed.Timeout, in
// seconds), the same value the scheduled newznab job uses, so the test
// exercises the feed with the settings it will actually run with. An unset
// timeout (0) leaves the client configuration as it was before.
func (s *service) testNewznab(ctx context.Context, feed *domain.Feed, subLogger *log.Logger) error {
	// setup newznab Client
	c := newznab.NewClient(newznab.Config{
		Host:    feed.URL,
		ApiKey:  feed.ApiKey,
		Timeout: time.Duration(feed.Timeout) * time.Second,
		Log:     subLogger,
	})

	items, err := c.GetFeed(ctx)
	if err != nil {
		s.log.Error().Err(err).Msg("error getting newznab feed")
		return err
	}

	s.log.Info().Msgf("refreshing newznab feed: %s, found (%d) items", feed.Name, len(items.Channel.Items))

	return nil
}
// start schedules the feed cache cleanup job and then a job for every
// enabled feed. A failing cleanup job or a single failing feed job is only
// logged; the only error returned is a failure to load the feeds.
func (s *service) start() error {
	// the cache maintenance job always runs, independent of the feeds
	if err := s.createCleanupJob(); err != nil {
		s.log.Error().Err(err).Msg("could not start feed cache cleanup job")
	}

	feeds, err := s.repo.Find(context.TODO())
	if err != nil {
		s.log.Error().Err(err).Msg("error finding feeds")
		return err
	}

	for i := range feeds {
		// work on a copy so every job gets its own feed value
		feed := feeds[i]

		if !feed.Enabled {
			s.log.Trace().Msgf("feed disabled, skipping... %s", feed.Name)
			continue
		}

		if err := s.startJob(&feed); err != nil {
			s.log.Error().Err(err).Msgf("failed to initialize feed job: %s", feed.Name)
		}
	}

	return nil
}
// restartJob removes the feed's job from the scheduler and, if the feed is
// enabled, schedules it again. For a disabled feed the job is only stopped.
func (s *service) restartJob(f *domain.Feed) error {
	s.log.Debug().Msgf("stopping feed: %s", f.Name)

	if err := s.stopFeedJob(f.ID); err != nil {
		s.log.Error().Err(err).Msg("error stopping feed job")
		return err
	}

	if !f.Enabled {
		return nil
	}

	if err := s.startJob(f); err != nil {
		s.log.Error().Err(err).Msg("error starting feed job")
		return err
	}

	s.log.Debug().Msgf("restarted feed: %s", f.Name)

	return nil
}
// newFeedInstance flattens a feed into the feedInstance used to build and
// schedule its job. The feed's interval is interpreted as minutes and its
// timeout as seconds.
func newFeedInstance(f *domain.Feed) feedInstance {
	// run every X minutes
	interval := time.Duration(f.Interval) * time.Minute
	timeout := time.Duration(f.Timeout) * time.Second

	return feedInstance{
		Feed:           f,
		Name:           f.Name,
		Indexer:        f.Indexer,
		URL:            f.URL,
		ApiKey:         f.ApiKey,
		Implementation: f.Type,
		CronSchedule:   interval,
		Timeout:        timeout,
	}
}
// initializeFeedJob builds the job matching the instance's implementation
// (torznab, newznab or rss). An unknown implementation yields an error; a
// failing constructor is logged and its error returned.
func (s *service) initializeFeedJob(fi feedInstance) (FeedJob, error) {
	var create func(feedInstance) (FeedJob, error)

	switch fi.Implementation {
	case string(domain.FeedTypeTorznab):
		create = s.createTorznabJob

	case string(domain.FeedTypeNewznab):
		create = s.createNewznabJob

	case string(domain.FeedTypeRSS):
		create = s.createRSSJob

	default:
		return nil, errors.New("unsupported feed type: %s", fi.Implementation)
	}

	job, err := create(fi)
	if err != nil {
		s.log.Error().Err(err).Msgf("failed to initialize %s feed", fi.Implementation)
		return nil, err
	}

	return job, nil
}
// startJob builds the job for the feed and registers it with the scheduler.
// It refuses feeds that are not enabled or have no URL.
func (s *service) startJob(f *domain.Feed) error {
	switch {
	case !f.Enabled:
		// a disabled feed must never be scheduled
		return errors.New("feed %s not enabled", f.Name)

	case f.URL == "":
		// the url comes from the feed settings and is required
		return errors.New("no URL provided for feed: %s", f.Name)
	}

	instance := newFeedInstance(f)

	job, err := s.initializeFeedJob(instance)
	if err != nil {
		return errors.Wrap(err, "initialize job %s failed", f.Name)
	}

	err = s.scheduleJob(instance, job)
	if err != nil {
		return errors.Wrap(err, "schedule job %s failed", f.Name)
	}

	s.log.Debug().Msgf("successfully started feed: %s", f.Name)

	return nil
}
// scheduleJob registers the job with the scheduler under the feed's
// identifier key, using the instance's interval, and records the returned
// job id in the job map.
func (s *service) scheduleJob(fi feedInstance, job cron.Job) error {
	key := feedKey{id: fi.Feed.ID}.ToString()

	jobID, err := s.scheduler.ScheduleJob(job, fi.CronSchedule, key)
	if err != nil {
		return errors.Wrap(err, "add job %s failed", key)
	}

	// remember the scheduler's id for this feed
	s.jobs[key] = jobID

	return nil
}
// createTorznabJob builds a torznab job for the feed instance: a logger
// tagged with the feed name and a torznab client configured with the feed's
// URL, API key and timeout. A URL is required.
func (s *service) createTorznabJob(f feedInstance) (FeedJob, error) {
	s.log.Debug().Msgf("create torznab job: %s", f.Name)

	if f.URL == "" {
		return nil, errors.New("torznab feed requires URL")
	}

	// NOTE: no minimum interval is enforced here; f.CronSchedule is used as configured.

	feedLog := s.log.With().Str("feed", f.Name).Logger()

	cfg := torznab.Config{
		Host:    f.URL,
		ApiKey:  f.ApiKey,
		Timeout: f.Timeout,
	}

	return NewTorznabJob(f.Feed, f.Name, feedLog, f.URL, torznab.NewClient(cfg), s.repo, s.cacheRepo, s.releaseSvc), nil
}
// createNewznabJob builds a newznab job for the feed instance: a logger
// tagged with the feed name and a newznab client configured with the feed's
// URL, API key and timeout. A URL is required.
func (s *service) createNewznabJob(f feedInstance) (FeedJob, error) {
	s.log.Debug().Msgf("create newznab job: %s", f.Name)

	if f.URL == "" {
		return nil, errors.New("newznab feed requires URL")
	}

	feedLog := s.log.With().Str("feed", f.Name).Logger()

	cfg := newznab.Config{
		Host:    f.URL,
		ApiKey:  f.ApiKey,
		Timeout: f.Timeout,
	}

	return NewNewznabJob(f.Feed, f.Name, feedLog, f.URL, newznab.NewClient(cfg), s.repo, s.cacheRepo, s.releaseSvc), nil
}
// createRSSJob builds an RSS job for the feed instance with a logger tagged
// with the feed name; the feed's timeout is passed on to the job.
// A URL is required.
func (s *service) createRSSJob(f feedInstance) (FeedJob, error) {
	s.log.Debug().Msgf("create rss job: %s", f.Name)

	if f.URL == "" {
		return nil, errors.New("rss feed requires URL")
	}

	// NOTE: no minimum interval is enforced here; f.CronSchedule is used as configured.

	feedLog := s.log.With().Str("feed", f.Name).Logger()

	return NewRSSJob(f.Feed, f.Name, feedLog, f.URL, s.repo, s.cacheRepo, s.releaseSvc, f.Timeout), nil
}
// createCleanupJob registers the feed cache cleanup job with the scheduler
// under the identifier "feed-cache-cleanup" and records the returned job id
// in the job map.
func (s *service) createCleanupJob() error {
	const (
		identifierKey = "feed-cache-cleanup"

		// cron spec: every day at 03:05
		cronSpec = "5 3 * * *"
	)

	cleanupLog := s.log.With().Str("job", identifierKey).Logger()

	jobID, err := s.scheduler.AddJob(NewCleanupJob(cleanupLog, s.cacheRepo), cronSpec, identifierKey)
	if err != nil {
		return errors.Wrap(err, "add job %s failed", identifierKey)
	}

	// remember the scheduler's id for the cleanup job
	s.jobs[identifierKey] = jobID

	return nil
}
// stopFeedJob removes the job of the feed with the given id from the
// scheduler and drops the matching entry from the job map, so that the map
// only tracks jobs that are actually scheduled. scheduleJob adds the entry
// again when the feed is restarted.
//
// It returns a wrapped error when the scheduler fails to remove the job; in
// that case the job map is left untouched.
func (s *service) stopFeedJob(id int) error {
	key := feedKey{id: id}.ToString()

	// remove job from scheduler
	if err := s.scheduler.RemoveJobByIdentifier(key); err != nil {
		return errors.Wrap(err, "stop job failed")
	}

	// forget the scheduler id recorded by scheduleJob; deleting a missing key is a no-op
	delete(s.jobs, key)

	s.log.Debug().Msgf("stop feed job: %d", id)

	return nil
}
// GetNextRun returns the scheduler's next run time for the job of the feed
// with the given id. It is not part of the Service interface declared in this file.
func (s *service) GetNextRun(id int) (time.Time, error) {
return s.scheduler.GetNextRun(feedKey{id}.ToString())
}
// GetLastRunData returns the last run data the repository holds for the
// feed with the given id, or an empty string together with the error when
// the lookup fails.
func (s *service) GetLastRunData(ctx context.Context, id int) (string, error) {
	lastRunData, err := s.repo.GetLastRunDataByID(ctx, id)
	if err == nil {
		return lastRunData, nil
	}

	return "", err
}
// ForceRun loads the feed, builds its job and runs it once right away with
// the given context, without going through the scheduler. The feed's enabled
// flag is not checked here.
func (s *service) ForceRun(ctx context.Context, id int) error {
	feed, err := s.FindByID(ctx, id)
	if err != nil {
		return err
	}

	job, err := s.initializeFeedJob(newFeedInstance(feed))
	if err != nil {
		s.log.Error().Err(err).Msg("failed to initialize feed job")
		return err
	}

	if runErr := job.RunE(ctx); runErr != nil {
		s.log.Error().Err(runErr).Msg("failed to refresh feed")
		return runErr
	}

	return nil
}