mirror of
https://github.com/idanoo/autobrr
synced 2025-07-23 00:39:13 +00:00
feat: add usenet support (#543)
* feat(autobrr): implement usenet support * feat(sonarr): implement usenet support * feat(radarr): implement usenet support * feat(announce): implement usenet support * announce: cast a line * feat(release): prevent unknown protocol transfer * release: lines for days. * feat: add newznab and sabnzbd support * feat: add category to sabnzbd * feat(newznab): map categories * feat(newznab): map categories --------- Co-authored-by: ze0s <43699394+zze0s@users.noreply.github.com> Co-authored-by: ze0s <ze0s@riseup.net>
This commit is contained in:
parent
b2d93d50c5
commit
13a74f7cc8
29 changed files with 1588 additions and 37 deletions
168
internal/feed/newznab.go
Normal file
168
internal/feed/newznab.go
Normal file
|
@ -0,0 +1,168 @@
|
|||
package feed
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sort"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/autobrr/autobrr/internal/domain"
|
||||
"github.com/autobrr/autobrr/internal/release"
|
||||
"github.com/autobrr/autobrr/internal/scheduler"
|
||||
"github.com/autobrr/autobrr/pkg/errors"
|
||||
"github.com/autobrr/autobrr/pkg/newznab"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
// NewznabJob is a scheduled job that polls a newznab feed, deduplicates
// items against the feed cache, and hands new releases to the release
// service for filter processing.
type NewznabJob struct {
	Feed              *domain.Feed
	Name              string
	IndexerIdentifier string
	Log               zerolog.Logger
	URL               string
	Client            newznab.Client
	Repo              domain.FeedRepo
	CacheRepo         domain.FeedCacheRepo
	ReleaseSvc        release.Service
	SchedulerSvc      scheduler.Service

	// attempts counts failed runs; reset at the end of every Run.
	attempts int
	// errors collects errors from the current run; cleared at the end of Run.
	errors []error

	// JobID is assigned by the scheduler when the job is registered.
	JobID int
}
|
||||
|
||||
// NewNewznabJob builds a NewznabJob from its dependencies.
// JobID is assigned later by the scheduler when the job is registered,
// and SchedulerSvc is intentionally left unset here.
func NewNewznabJob(feed *domain.Feed, name string, indexerIdentifier string, log zerolog.Logger, url string, client newznab.Client, repo domain.FeedRepo, cacheRepo domain.FeedCacheRepo, releaseSvc release.Service) *NewznabJob {
	return &NewznabJob{
		Feed:              feed,
		Name:              name,
		IndexerIdentifier: indexerIdentifier,
		Log:               log,
		URL:               url,
		Client:            client,
		Repo:              repo,
		CacheRepo:         cacheRepo,
		ReleaseSvc:        releaseSvc,
	}
}
|
||||
|
||||
// Run executes a single poll of the newznab feed. It satisfies the
// scheduler's job interface and is invoked on the feed's cron schedule.
func (j *NewznabJob) Run() {
	ctx := context.Background()

	if err := j.process(ctx); err != nil {
		// NOTE(review): j.attempts is never incremented anywhere in this
		// file, so this always logs 0 — confirm whether retry bookkeeping
		// was intended here.
		j.Log.Err(err).Int("attempts", j.attempts).Msg("newznab process error")

		j.errors = append(j.errors, err)
	}

	// reset per-run state; errors collected above are discarded here
	j.attempts = 0
	j.errors = j.errors[:0]
}
|
||||
|
||||
func (j *NewznabJob) process(ctx context.Context) error {
|
||||
// get feed
|
||||
items, err := j.getFeed(ctx)
|
||||
if err != nil {
|
||||
j.Log.Error().Err(err).Msgf("error fetching feed items")
|
||||
return errors.Wrap(err, "error getting feed items")
|
||||
}
|
||||
|
||||
j.Log.Debug().Msgf("found (%d) new items to process", len(items))
|
||||
|
||||
if len(items) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
releases := make([]*domain.Release, 0)
|
||||
|
||||
for _, item := range items {
|
||||
rls := domain.NewRelease(j.IndexerIdentifier)
|
||||
|
||||
rls.TorrentName = item.Title
|
||||
rls.InfoURL = item.GUID
|
||||
rls.Implementation = domain.ReleaseImplementationNewznab
|
||||
rls.Protocol = domain.ReleaseProtocolNzb
|
||||
|
||||
// parse size bytes string
|
||||
rls.ParseSizeBytesString(item.Size)
|
||||
|
||||
rls.ParseString(item.Title)
|
||||
|
||||
if item.Enclosure != nil {
|
||||
if item.Enclosure.Type == "application/x-nzb" {
|
||||
rls.TorrentURL = item.Enclosure.Url
|
||||
}
|
||||
}
|
||||
|
||||
// map newznab categories ID and Name into rls.Categories
|
||||
// so we can filter on both ID and Name
|
||||
for _, category := range item.Categories {
|
||||
rls.Categories = append(rls.Categories, []string{category.Name, strconv.Itoa(category.ID)}...)
|
||||
}
|
||||
|
||||
releases = append(releases, rls)
|
||||
}
|
||||
|
||||
// process all new releases
|
||||
go j.ReleaseSvc.ProcessMultiple(releases)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (j *NewznabJob) getFeed(ctx context.Context) ([]newznab.FeedItem, error) {
|
||||
// get feed
|
||||
feed, err := j.Client.GetFeed(ctx)
|
||||
if err != nil {
|
||||
j.Log.Error().Err(err).Msgf("error fetching feed items")
|
||||
return nil, errors.Wrap(err, "error fetching feed items")
|
||||
}
|
||||
|
||||
if err := j.Repo.UpdateLastRunWithData(ctx, j.Feed.ID, feed.Raw); err != nil {
|
||||
j.Log.Error().Err(err).Msgf("error updating last run for feed id: %v", j.Feed.ID)
|
||||
}
|
||||
|
||||
j.Log.Debug().Msgf("refreshing feed: %v, found (%d) items", j.Name, len(feed.Channel.Items))
|
||||
|
||||
items := make([]newznab.FeedItem, 0)
|
||||
if len(feed.Channel.Items) == 0 {
|
||||
return items, nil
|
||||
}
|
||||
|
||||
sort.SliceStable(feed.Channel.Items, func(i, j int) bool {
|
||||
return feed.Channel.Items[i].PubDate.After(feed.Channel.Items[j].PubDate.Time)
|
||||
})
|
||||
|
||||
for _, i := range feed.Channel.Items {
|
||||
if i.GUID == "" {
|
||||
j.Log.Error().Err(err).Msgf("missing GUID from feed: %s", j.Feed.Name)
|
||||
continue
|
||||
}
|
||||
|
||||
exists, err := j.CacheRepo.Exists(j.Name, i.GUID)
|
||||
if err != nil {
|
||||
j.Log.Error().Err(err).Msg("could not check if item exists")
|
||||
continue
|
||||
}
|
||||
if exists {
|
||||
j.Log.Trace().Msgf("cache item exists, skipping release: %s", i.Title)
|
||||
continue
|
||||
}
|
||||
|
||||
j.Log.Debug().Msgf("found new release: %s", i.Title)
|
||||
|
||||
// set ttl to 1 month
|
||||
ttl := time.Now().AddDate(0, 1, 0)
|
||||
|
||||
if err := j.CacheRepo.Put(j.Name, i.GUID, []byte(i.Title), ttl); err != nil {
|
||||
j.Log.Error().Stack().Err(err).Str("guid", i.GUID).Msg("cache.Put: error storing item in cache")
|
||||
continue
|
||||
}
|
||||
|
||||
// only append if we successfully added to cache
|
||||
items = append(items, i)
|
||||
}
|
||||
|
||||
// send to filters
|
||||
return items, nil
|
||||
}
|
|
@ -12,6 +12,7 @@ import (
|
|||
"github.com/autobrr/autobrr/internal/release"
|
||||
"github.com/autobrr/autobrr/internal/scheduler"
|
||||
"github.com/autobrr/autobrr/pkg/errors"
|
||||
"github.com/autobrr/autobrr/pkg/newznab"
|
||||
"github.com/autobrr/autobrr/pkg/torznab"
|
||||
|
||||
"github.com/dcarbone/zadapters/zstdlog"
|
||||
|
@ -222,17 +223,27 @@ func (s *service) test(ctx context.Context, feed *domain.Feed) error {
|
|||
subLogger := zstdlog.NewStdLoggerWithLevel(s.log.With().Logger(), zerolog.DebugLevel)
|
||||
|
||||
// test feeds
|
||||
if feed.Type == string(domain.FeedTypeTorznab) {
|
||||
switch feed.Type {
|
||||
case string(domain.FeedTypeTorznab):
|
||||
if err := s.testTorznab(ctx, feed, subLogger); err != nil {
|
||||
return err
|
||||
}
|
||||
} else if feed.Type == string(domain.FeedTypeRSS) {
|
||||
|
||||
case string(domain.FeedTypeNewznab):
|
||||
if err := s.testNewznab(ctx, feed, subLogger); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
case string(domain.FeedTypeRSS):
|
||||
if err := s.testRSS(ctx, feed); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
default:
|
||||
return errors.New("unsupported feed type: %s", feed.Type)
|
||||
}
|
||||
|
||||
s.log.Info().Msgf("feed test successful - connected to feed: %v", feed.URL)
|
||||
s.log.Info().Msgf("feed test successful - connected to feed: %s", feed.URL)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -264,6 +275,21 @@ func (s *service) testTorznab(ctx context.Context, feed *domain.Feed, subLogger
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *service) testNewznab(ctx context.Context, feed *domain.Feed, subLogger *log.Logger) error {
|
||||
// setup newznab Client
|
||||
c := newznab.NewClient(newznab.Config{Host: feed.URL, ApiKey: feed.ApiKey, Log: subLogger})
|
||||
|
||||
items, err := c.GetFeed(ctx)
|
||||
if err != nil {
|
||||
s.log.Error().Err(err).Msg("error getting newznab feed")
|
||||
return err
|
||||
}
|
||||
|
||||
s.log.Info().Msgf("refreshing newznab feed: %v, found (%d) items", feed.Name, len(items.Channel.Items))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *service) start() error {
|
||||
// get all torznab indexer definitions
|
||||
feeds, err := s.repo.Find(context.TODO())
|
||||
|
@ -335,6 +361,13 @@ func (s *service) startJob(f *domain.Feed) error {
|
|||
s.log.Error().Err(err).Msg("failed to initialize torznab feed")
|
||||
return err
|
||||
}
|
||||
|
||||
case string(domain.FeedTypeNewznab):
|
||||
if err := s.addNewznabJob(fi); err != nil {
|
||||
s.log.Error().Err(err).Msg("failed to initialize newznab feed")
|
||||
return err
|
||||
}
|
||||
|
||||
case string(domain.FeedTypeRSS):
|
||||
if err := s.addRSSJob(fi); err != nil {
|
||||
s.log.Error().Err(err).Msg("failed to initialize rss feed")
|
||||
|
@ -380,6 +413,37 @@ func (s *service) addTorznabJob(f feedInstance) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *service) addNewznabJob(f feedInstance) error {
|
||||
if f.URL == "" {
|
||||
return errors.New("newznab feed requires URL")
|
||||
}
|
||||
|
||||
// setup logger
|
||||
l := s.log.With().Str("feed", f.Name).Logger()
|
||||
|
||||
// setup newznab Client
|
||||
c := newznab.NewClient(newznab.Config{Host: f.URL, ApiKey: f.ApiKey, Timeout: f.Timeout})
|
||||
|
||||
// create job
|
||||
job := NewNewznabJob(f.Feed, f.Name, f.IndexerIdentifier, l, f.URL, c, s.repo, s.cacheRepo, s.releaseSvc)
|
||||
|
||||
identifierKey := feedKey{f.Feed.ID, f.Feed.Indexer, f.Feed.Name}.ToString()
|
||||
|
||||
// schedule job
|
||||
id, err := s.scheduler.AddJob(job, f.CronSchedule, identifierKey)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "feed.AddNewznabJob: add job failed")
|
||||
}
|
||||
job.JobID = id
|
||||
|
||||
// add to job map
|
||||
s.jobs[identifierKey] = id
|
||||
|
||||
s.log.Debug().Msgf("add newznab job: %v", f.Name)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *service) addRSSJob(f feedInstance) error {
|
||||
if f.URL == "" {
|
||||
return errors.New("rss feed requires URL")
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue