feat: add torznab feed support (#246)

* feat(torznab): initial impl

* feat: torznab processing

* feat: torznab more scheduling

* feat: feeds web

* feat(feeds): create on indexer create

* feat(feeds): update migration

* feat(feeds): restart on update

* feat(feeds): set cron schedule

* feat(feeds): use basic empty state

* chore: remove duplicate migrations

* feat: parse release size from torznab

* chore: cleanup unused code
This commit is contained in:
Ludvig Lundgren 2022-04-25 12:58:54 +02:00 committed by GitHub
parent d4d864cd2c
commit bb62e724a1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
34 changed files with 2408 additions and 361 deletions

View file

@ -10,6 +10,8 @@ import (
"text/template"
"github.com/autobrr/autobrr/internal/domain"
"github.com/autobrr/autobrr/internal/release"
"github.com/rs/zerolog/log"
)
@ -20,15 +22,15 @@ type Processor interface {
type announceProcessor struct {
indexer domain.IndexerDefinition
announceSvc Service
releaseSvc release.Service
queues map[string]chan string
}
func NewAnnounceProcessor(announceSvc Service, indexer domain.IndexerDefinition) Processor {
func NewAnnounceProcessor(releaseSvc release.Service, indexer domain.IndexerDefinition) Processor {
ap := &announceProcessor{
announceSvc: announceSvc,
indexer: indexer,
releaseSvc: releaseSvc,
indexer: indexer,
}
// setup queues and consumers
@ -110,7 +112,7 @@ func (a *announceProcessor) processQueue(queue chan string) {
}
// process release in a new go routine
go a.announceSvc.Process(newRelease)
go a.releaseSvc.Process(newRelease)
}
}

View file

@ -1,134 +0,0 @@
package announce
import (
"context"
"strings"
"github.com/autobrr/autobrr/internal/action"
"github.com/autobrr/autobrr/internal/domain"
"github.com/autobrr/autobrr/internal/filter"
"github.com/autobrr/autobrr/internal/release"
"github.com/rs/zerolog/log"
)
// Service processes announced releases against configured filters and runs
// the resulting actions.
type Service interface {
	Process(release *domain.Release)
}

type service struct {
	actionSvc  action.Service
	filterSvc  filter.Service
	releaseSvc release.Service
}

// actionClientTypeKey identifies a download-client endpoint (action type +
// client id) so a client that rejected a release is not retried.
type actionClientTypeKey struct {
	Type     domain.ActionType
	ClientID int32
}
// NewService returns an announce Service wired with the given action,
// filter and release services.
func NewService(actionSvc action.Service, filterSvc filter.Service, releaseSvc release.Service) Service {
	svc := &service{
		actionSvc:  actionSvc,
		filterSvc:  filterSvc,
		releaseSvc: releaseSvc,
	}

	return svc
}
// Process matches the release against every filter configured for its
// indexer and, on a match, stores the release and runs the filter's enabled
// actions. Action clients that rejected the release are remembered so the
// same client is not retried for subsequent filters.
func (s *service) Process(release *domain.Release) {
	// TODO check in config for "Save all releases"
	// TODO cross-seed check
	// TODO dupe checks

	// get filters by priority
	filters, err := s.filterSvc.FindByIndexerIdentifier(release.Indexer)
	if err != nil {
		log.Error().Err(err).Msgf("announce.Service.Process: error finding filters for indexer: %v", release.Indexer)
		return
	}

	// keep track of action clients to avoid sending the same thing all over again
	// save both client type and client id to potentially try another client of same type
	triedActionClients := map[actionClientTypeKey]struct{}{}

	// loop over and check filters
	for _, f := range filters {
		// save filter on release
		// NOTE(review): &f takes the address of the loop variable; the pointer is
		// read only within this iteration or after the final break, so it refers
		// to the intended filter — but confirm before refactoring this loop.
		release.Filter = &f
		release.FilterName = f.Name
		release.FilterID = f.ID

		// TODO filter limit checks

		// test filter
		match, err := s.filterSvc.CheckFilter(f, release)
		if err != nil {
			log.Error().Err(err).Msg("announce.Service.Process: could not find filter")
			return
		}

		if !match {
			log.Trace().Msgf("announce.Service.Process: indexer: %v, filter: %v release: %v, no match", release.Indexer, release.Filter.Name, release.TorrentName)
			continue
		}

		log.Info().Msgf("Matched '%v' (%v) for %v", release.TorrentName, release.Filter.Name, release.Indexer)

		// save release here to only save those with rejections from actions instead of all releases
		if release.ID == 0 {
			release.FilterStatus = domain.ReleaseStatusFilterApproved
			err = s.releaseSvc.Store(context.Background(), release)
			if err != nil {
				log.Error().Err(err).Msgf("announce.Service.Process: error writing release to database: %+v", release)
				return
			}
		}

		var rejections []string

		// run actions (watchFolder, test, exec, qBittorrent, Deluge, arr etc.)
		for _, a := range release.Filter.Actions {
			// only run enabled actions
			if !a.Enabled {
				log.Trace().Msgf("announce.Service.Process: indexer: %v, filter: %v release: %v action '%v' not enabled, skip", release.Indexer, release.Filter.Name, release.TorrentName, a.Name)
				continue
			}

			log.Trace().Msgf("announce.Service.Process: indexer: %v, filter: %v release: %v , run action: %v", release.Indexer, release.Filter.Name, release.TorrentName, a.Name)

			// keep track of action clients to avoid sending the same thing all over again
			_, tried := triedActionClients[actionClientTypeKey{Type: a.Type, ClientID: a.ClientID}]
			if tried {
				log.Trace().Msgf("announce.Service.Process: indexer: %v, filter: %v release: %v action client already tried, skip", release.Indexer, release.Filter.Name, release.TorrentName)
				continue
			}

			rejections, err = s.actionSvc.RunAction(a, *release)
			if err != nil {
				log.Error().Stack().Err(err).Msgf("announce.Service.Process: error running actions for filter: %v", release.Filter.Name)
				continue
			}

			if len(rejections) > 0 {
				// if we get a rejection, remember which action client it was from
				triedActionClients[actionClientTypeKey{Type: a.Type, ClientID: a.ClientID}] = struct{}{}

				// log something and fire events
				log.Debug().Msgf("announce.Service.Process: indexer: %v, filter: %v release: %v, rejected: %v", release.Indexer, release.Filter.Name, release.TorrentName, strings.Join(rejections, ", "))
			}

			// if no rejections consider action approved, run next
			continue
		}

		// if we have rejections from arr, continue to next filter
		if len(rejections) > 0 {
			continue
		}

		// all actions run, decide to stop or continue here
		break
	}

	return
}

270
internal/database/feed.go Normal file
View file

@ -0,0 +1,270 @@
package database
import (
"context"
"database/sql"
"github.com/autobrr/autobrr/internal/domain"
sq "github.com/Masterminds/squirrel"
"github.com/rs/zerolog/log"
)
func NewFeedRepo(db *DB) domain.FeedRepo {
return &FeedRepo{
db: db,
}
}
type FeedRepo struct {
db *DB
}
// FindByID returns the feed with the given id, or an error if the row does
// not exist or the query fails.
func (r *FeedRepo) FindByID(ctx context.Context, id int) (*domain.Feed, error) {
	queryBuilder := r.db.squirrel.
		Select(
			"id",
			"indexer",
			"name",
			"type",
			"enabled",
			"url",
			"interval",
			"api_key",
			"created_at",
			"updated_at",
		).
		From("feed").
		Where("id = ?", id)

	query, args, err := queryBuilder.ToSql()
	if err != nil {
		log.Error().Stack().Err(err).Msg("feed.FindById: error building query")
		return nil, err
	}

	row := r.db.handler.QueryRowContext(ctx, query, args...)
	if err := row.Err(); err != nil {
		log.Error().Stack().Err(err).Msg("feed.FindById: error executing query")
		return nil, err
	}

	var f domain.Feed

	// api_key is nullable in the schema, so scan through sql.NullString
	var apiKey sql.NullString

	if err := row.Scan(&f.ID, &f.Indexer, &f.Name, &f.Type, &f.Enabled, &f.URL, &f.Interval, &apiKey, &f.CreatedAt, &f.UpdatedAt); err != nil {
		log.Error().Stack().Err(err).Msg("feed.FindById: error scanning row")
		return nil, err
	}

	f.ApiKey = apiKey.String

	return &f, nil
}
// FindByIndexerIdentifier returns the feed configured for the given indexer
// identifier.
func (r *FeedRepo) FindByIndexerIdentifier(ctx context.Context, indexer string) (*domain.Feed, error) {
	builder := r.db.squirrel.
		Select("id", "indexer", "name", "type", "enabled", "url", "interval", "api_key", "created_at", "updated_at").
		From("feed").
		Where("indexer = ?", indexer)

	query, args, err := builder.ToSql()
	if err != nil {
		log.Error().Stack().Err(err).Msg("feed.FindByIndexerIdentifier: error building query")
		return nil, err
	}

	row := r.db.handler.QueryRowContext(ctx, query, args...)
	if err := row.Err(); err != nil {
		log.Error().Stack().Err(err).Msg("feed.FindByIndexerIdentifier: error executing query")
		return nil, err
	}

	// api_key is nullable, so scan through sql.NullString
	var (
		feed   domain.Feed
		apiKey sql.NullString
	)

	if err := row.Scan(&feed.ID, &feed.Indexer, &feed.Name, &feed.Type, &feed.Enabled, &feed.URL, &feed.Interval, &apiKey, &feed.CreatedAt, &feed.UpdatedAt); err != nil {
		log.Error().Stack().Err(err).Msg("feed.FindByIndexerIdentifier: error scanning row")
		return nil, err
	}

	feed.ApiKey = apiKey.String

	return &feed, nil
}
// Find returns all feeds ordered by name.
//
// Fix: the original never checked rows.Err() after the iteration loop, so an
// error that aborted iteration mid-way was silently dropped and a truncated
// result returned as success.
func (r *FeedRepo) Find(ctx context.Context) ([]domain.Feed, error) {
	queryBuilder := r.db.squirrel.
		Select(
			"id",
			"indexer",
			"name",
			"type",
			"enabled",
			"url",
			"interval",
			"api_key",
			"created_at",
			"updated_at",
		).
		From("feed").
		OrderBy("name ASC")

	query, args, err := queryBuilder.ToSql()
	if err != nil {
		log.Error().Stack().Err(err).Msg("feed.Find: error building query")
		return nil, err
	}

	rows, err := r.db.handler.QueryContext(ctx, query, args...)
	if err != nil {
		log.Error().Stack().Err(err).Msg("feed.Find: error executing query")
		return nil, err
	}

	defer rows.Close()

	feeds := make([]domain.Feed, 0)
	for rows.Next() {
		var f domain.Feed

		// api_key is nullable, so scan through sql.NullString
		var apiKey sql.NullString

		if err := rows.Scan(&f.ID, &f.Indexer, &f.Name, &f.Type, &f.Enabled, &f.URL, &f.Interval, &apiKey, &f.CreatedAt, &f.UpdatedAt); err != nil {
			log.Error().Stack().Err(err).Msg("feed.Find: error scanning row")
			return nil, err
		}

		f.ApiKey = apiKey.String

		feeds = append(feeds, f)
	}

	// surface any error that terminated iteration early
	if err := rows.Err(); err != nil {
		log.Error().Stack().Err(err).Msg("feed.Find: error iterating rows")
		return nil, err
	}

	return feeds, nil
}
// Store inserts the feed and writes the id generated by the RETURNING clause
// back onto feed.ID.
func (r *FeedRepo) Store(ctx context.Context, feed *domain.Feed) error {
	queryBuilder := r.db.squirrel.
		Insert("feed").
		Columns(
			"name",
			"indexer",
			"type",
			"enabled",
			"url",
			"interval",
			"api_key",
			"indexer_id",
		).
		Values(
			feed.Name,
			feed.Indexer,
			feed.Type,
			feed.Enabled,
			feed.URL,
			feed.Interval,
			feed.ApiKey,
			feed.IndexerID,
		).
		Suffix("RETURNING id").RunWith(r.db.handler)

	// retID receives the generated primary key
	var retID int

	if err := queryBuilder.QueryRowContext(ctx).Scan(&retID); err != nil {
		log.Error().Stack().Err(err).Msg("feed.Store: error executing query")
		return err
	}

	feed.ID = retID

	return nil
}
// Update persists all mutable fields of the given feed.
func (r *FeedRepo) Update(ctx context.Context, feed *domain.Feed) error {
	query, args, err := r.db.squirrel.
		Update("feed").
		Set("name", feed.Name).
		Set("indexer", feed.Indexer).
		Set("type", feed.Type).
		Set("enabled", feed.Enabled).
		Set("url", feed.URL).
		Set("interval", feed.Interval).
		Set("api_key", feed.ApiKey).
		Set("indexer_id", feed.IndexerID).
		Where("id = ?", feed.ID).
		ToSql()
	if err != nil {
		log.Error().Stack().Err(err).Msg("feed.Update: error building query")
		return err
	}

	if _, err = r.db.handler.ExecContext(ctx, query, args...); err != nil {
		log.Error().Stack().Err(err).Msg("feed.Update: error executing query")
		return err
	}

	return nil
}
// ToggleEnabled sets the enabled flag of the feed with the given id and bumps
// updated_at to the current time.
//
// Fix: removed the redundant `var err error` declaration — err was introduced
// by the subsequent `:=` anyway, leaving the var statement dead.
func (r *FeedRepo) ToggleEnabled(ctx context.Context, id int, enabled bool) error {
	queryBuilder := r.db.squirrel.
		Update("feed").
		Set("enabled", enabled).
		Set("updated_at", sq.Expr("CURRENT_TIMESTAMP")).
		Where("id = ?", id)

	query, args, err := queryBuilder.ToSql()
	if err != nil {
		log.Error().Stack().Err(err).Msg("feed.ToggleEnabled: error building query")
		return err
	}

	if _, err = r.db.handler.ExecContext(ctx, query, args...); err != nil {
		log.Error().Stack().Err(err).Msg("feed.ToggleEnabled: error executing query")
		return err
	}

	return nil
}
// Delete removes the feed with the given id.
func (r *FeedRepo) Delete(ctx context.Context, id int) error {
	query, args, err := r.db.squirrel.
		Delete("feed").
		Where("id = ?", id).
		ToSql()
	if err != nil {
		log.Error().Stack().Err(err).Msg("feed.delete: error building query")
		return err
	}

	if _, err = r.db.handler.ExecContext(ctx, query, args...); err != nil {
		log.Error().Stack().Err(err).Msg("feed.delete: error executing query")
		return err
	}

	log.Info().Msgf("feed.delete: successfully deleted: %v", id)

	return nil
}

View file

@ -0,0 +1,103 @@
package database
import (
"database/sql"
"time"
"github.com/rs/zerolog/log"
"github.com/autobrr/autobrr/internal/domain"
)
type FeedCacheRepo struct {
db *DB
}
func NewFeedCacheRepo(db *DB) domain.FeedCacheRepo {
return &FeedCacheRepo{
db: db,
}
}
// Get returns the cached value for bucket/key if a non-expired entry exists.
// A missing (or expired) row yields (nil, nil): sql.ErrNoRows is deliberately
// swallowed so callers can treat absence as a cache miss.
func (r *FeedCacheRepo) Get(bucket string, key string) ([]byte, error) {
	queryBuilder := r.db.squirrel.
		Select(
			"value",
			"ttl",
		).
		From("feed_cache").
		Where("bucket = ?", bucket).
		Where("key = ?", key).
		Where("ttl > ?", time.Now())

	query, args, err := queryBuilder.ToSql()
	if err != nil {
		log.Error().Stack().Err(err).Msg("feedCache.Get: error building query")
		return nil, err
	}

	row := r.db.handler.QueryRow(query, args...)
	if err := row.Err(); err != nil {
		log.Error().Stack().Err(err).Msg("feedCache.Get: query error")
		return nil, err
	}

	var value []byte

	// NOTE(review): the ttl column is a TIMESTAMP but is scanned into a
	// time.Duration — confirm the driver supports this conversion; the scanned
	// value is never used afterwards.
	var ttl time.Duration

	if err := row.Scan(&value, &ttl); err != nil && err != sql.ErrNoRows {
		log.Error().Stack().Err(err).Msg("feedCache.Get: error scanning row")
		return nil, err
	}

	return value, nil
}
// Exists reports whether a cache entry exists for the given bucket/key pair.
// Unlike Get, it does not consider the entry's TTL.
//
// Fix: the original logged a query error but still returned (exists, nil),
// silently reporting "not found" on database failures. The error is now
// propagated to the caller (sql.ErrNoRows still maps to false, nil).
func (r *FeedCacheRepo) Exists(bucket string, key string) (bool, error) {
	queryBuilder := r.db.squirrel.
		Select("1").
		Prefix("SELECT EXISTS (").
		From("feed_cache").
		Where("bucket = ?", bucket).
		Where("key = ?", key).
		Suffix(")")

	query, args, err := queryBuilder.ToSql()
	if err != nil {
		log.Error().Stack().Err(err).Msg("feedCache.Exists: error building query")
		return false, err
	}

	var exists bool
	err = r.db.handler.QueryRow(query, args...).Scan(&exists)
	if err != nil && err != sql.ErrNoRows {
		log.Error().Stack().Err(err).Msg("feedCache.Exists: query error")
		return false, err
	}

	return exists, nil
}
// Put stores val under bucket/key with the given time-to-live.
func (r *FeedCacheRepo) Put(bucket string, key string, val []byte, ttl time.Duration) error {
	query, args, err := r.db.squirrel.
		Insert("feed_cache").
		Columns("bucket", "key", "value", "ttl").
		Values(bucket, key, val, ttl).
		ToSql()
	if err != nil {
		log.Error().Stack().Err(err).Msg("feedCache.Put: error building query")
		return err
	}

	_, err = r.db.handler.Exec(query, args...)
	if err != nil {
		log.Error().Stack().Err(err).Msg("feedCache.Put: error executing query")
		return err
	}

	return nil
}
// Delete removes a cache entry.
//
// NOTE(review): not implemented — calling this panics. Implement before
// wiring any caller to it.
func (r *FeedCacheRepo) Delete(bucket string, key string) error {
	//TODO implement me
	panic("implement me")
}

View file

@ -2,6 +2,7 @@ package database
import (
"context"
"database/sql"
"encoding/json"
"time"
@ -28,8 +29,8 @@ func (r *IndexerRepo) Store(ctx context.Context, indexer domain.Indexer) (*domai
}
queryBuilder := r.db.squirrel.
Insert("indexer").Columns("enabled", "name", "identifier", "settings").
Values(indexer.Enabled, indexer.Name, indexer.Identifier, settings).
Insert("indexer").Columns("enabled", "name", "identifier", "implementation", "settings").
Values(indexer.Enabled, indexer.Name, indexer.Identifier, indexer.Implementation, settings).
Suffix("RETURNING id").RunWith(r.db.handler)
// return values
@ -77,7 +78,7 @@ func (r *IndexerRepo) Update(ctx context.Context, indexer domain.Indexer) (*doma
}
func (r *IndexerRepo) List(ctx context.Context) ([]domain.Indexer, error) {
rows, err := r.db.handler.QueryContext(ctx, "SELECT id, enabled, name, identifier, settings FROM indexer ORDER BY name ASC")
rows, err := r.db.handler.QueryContext(ctx, "SELECT id, enabled, name, identifier, implementation, settings FROM indexer ORDER BY name ASC")
if err != nil {
log.Error().Stack().Err(err).Msg("indexer.list: error query indexer")
return nil, err
@ -89,14 +90,17 @@ func (r *IndexerRepo) List(ctx context.Context) ([]domain.Indexer, error) {
for rows.Next() {
var f domain.Indexer
var implementation sql.NullString
var settings string
var settingsMap map[string]string
if err := rows.Scan(&f.ID, &f.Enabled, &f.Name, &f.Identifier, &settings); err != nil {
if err := rows.Scan(&f.ID, &f.Enabled, &f.Name, &f.Identifier, &implementation, &settings); err != nil {
log.Error().Stack().Err(err).Msg("indexer.list: error scanning data to struct")
return nil, err
}
f.Implementation = implementation.String
err = json.Unmarshal([]byte(settings), &settingsMap)
if err != nil {
log.Error().Stack().Err(err).Msg("indexer.list: error unmarshal settings")

View file

@ -13,13 +13,14 @@ CREATE TABLE users
CREATE TABLE indexer
(
id INTEGER PRIMARY KEY,
identifier TEXT,
enabled BOOLEAN,
name TEXT NOT NULL,
settings TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
id INTEGER PRIMARY KEY,
identifier TEXT,
implementation TEXT,
enabled BOOLEAN,
name TEXT NOT NULL,
settings TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE (identifier)
);
@ -241,6 +242,33 @@ CREATE TABLE notification
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE feed
(
id INTEGER PRIMARY KEY,
indexer TEXT,
name TEXT,
type TEXT,
enabled BOOLEAN,
url TEXT,
interval INTEGER,
categories TEXT [] DEFAULT '{}' NOT NULL,
capabilities TEXT [] DEFAULT '{}' NOT NULL,
api_key TEXT,
settings TEXT,
indexer_id INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (indexer_id) REFERENCES indexer(id) ON DELETE SET NULL
);
CREATE TABLE feed_cache
(
bucket TEXT,
key TEXT,
value TEXT,
ttl TIMESTAMP
);
`
var sqliteMigrations = []string{
@ -535,6 +563,38 @@ ALTER TABLE release_action_status_dg_tmp
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
`,
`
CREATE TABLE feed
(
id INTEGER PRIMARY KEY,
indexer TEXT,
name TEXT,
type TEXT,
enabled BOOLEAN,
url TEXT,
interval INTEGER,
categories TEXT [] DEFAULT '{}' NOT NULL,
capabilities TEXT [] DEFAULT '{}' NOT NULL,
api_key TEXT,
settings TEXT,
indexer_id INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (indexer_id) REFERENCES indexer(id) ON DELETE SET NULL
);
CREATE TABLE feed_cache
(
bucket TEXT,
key TEXT,
value TEXT,
ttl TIMESTAMP
);
`,
`
ALTER TABLE indexer
ADD COLUMN implementation TEXT;
`,
}
const postgresSchema = `
@ -550,13 +610,14 @@ CREATE TABLE users
CREATE TABLE indexer
(
id SERIAL PRIMARY KEY,
identifier TEXT,
enabled BOOLEAN,
name TEXT NOT NULL,
settings TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
id SERIAL PRIMARY KEY,
identifier TEXT,
implementation TEXT,
enabled BOOLEAN,
name TEXT NOT NULL,
settings TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE (identifier)
);
@ -778,6 +839,33 @@ CREATE TABLE notification
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE feed
(
id SERIAL PRIMARY KEY,
indexer TEXT,
name TEXT,
type TEXT,
enabled BOOLEAN,
url TEXT,
interval INTEGER,
categories TEXT [] DEFAULT '{}' NOT NULL,
capabilities TEXT [] DEFAULT '{}' NOT NULL,
api_key TEXT,
settings TEXT,
indexer_id INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (indexer_id) REFERENCES indexer(id) ON DELETE SET NULL
);
CREATE TABLE feed_cache
(
bucket TEXT,
key TEXT,
value TEXT,
ttl TIMESTAMP
);
`
var postgresMigrations = []string{
@ -806,4 +894,36 @@ var postgresMigrations = []string{
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
`,
`
CREATE TABLE feed
(
id SERIAL PRIMARY KEY,
indexer TEXT,
name TEXT,
type TEXT,
enabled BOOLEAN,
url TEXT,
interval INTEGER,
categories TEXT [] DEFAULT '{}' NOT NULL,
capabilities TEXT [] DEFAULT '{}' NOT NULL,
api_key TEXT,
settings TEXT,
indexer_id INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (indexer_id) REFERENCES indexer(id) ON DELETE SET NULL
);
CREATE TABLE feed_cache
(
bucket TEXT,
key TEXT,
value TEXT,
ttl TIMESTAMP
);
`,
`
ALTER TABLE indexer
ADD COLUMN implementation TEXT;
`,
}

52
internal/domain/feed.go Normal file
View file

@ -0,0 +1,52 @@
package domain
import (
"context"
"time"
)
// FeedCacheRepo stores per-feed cache entries (e.g. already-seen item GUIDs)
// with a time-to-live.
type FeedCacheRepo interface {
	Get(bucket string, key string) ([]byte, error)
	Exists(bucket string, key string) (bool, error)
	Put(bucket string, key string, val []byte, ttl time.Duration) error
	Delete(bucket string, key string) error
}

// FeedRepo is the persistence interface for feed configurations.
type FeedRepo interface {
	FindByID(ctx context.Context, id int) (*Feed, error)
	FindByIndexerIdentifier(ctx context.Context, indexer string) (*Feed, error)
	Find(ctx context.Context) ([]Feed, error)
	Store(ctx context.Context, feed *Feed) error
	Update(ctx context.Context, feed *Feed) error
	ToggleEnabled(ctx context.Context, id int, enabled bool) error
	Delete(ctx context.Context, id int) error
}

// Feed is a configured external feed (currently torznab) tied to an indexer.
type Feed struct {
	ID           int               `json:"id"`
	Name         string            `json:"name"`
	Indexer      string            `json:"indexer"` // indexer identifier
	Type         string            `json:"type"`    // see FeedType
	Enabled      bool              `json:"enabled"`
	URL          string            `json:"url"`
	Interval     int               `json:"interval"` // polling interval; used as minutes by the feed scheduler
	Capabilities []string          `json:"capabilities"`
	ApiKey       string            `json:"api_key"`
	Settings     map[string]string `json:"settings"`
	CreatedAt    time.Time         `json:"created_at"`
	UpdatedAt    time.Time         `json:"updated_at"`
	IndexerID    int               `json:"-"`
	// NOTE(review): "Indexerr" looks like a typo for "Indexer"; kept as-is
	// because renaming would break any code referencing the field.
	Indexerr FeedIndexer `json:"-"`
}

// FeedIndexer is a minimal view of the indexer a feed belongs to.
type FeedIndexer struct {
	ID         int    `json:"id"`
	Name       string `json:"name"`
	Identifier string `json:"identifier"`
}

// FeedType enumerates supported feed implementations.
type FeedType string

const (
	FeedTypeTorznab FeedType = "TORZNAB"
)

View file

@ -15,29 +15,31 @@ type IndexerRepo interface {
}
type Indexer struct {
ID int64 `json:"id"`
Name string `json:"name"`
Identifier string `json:"identifier"`
Enabled bool `json:"enabled"`
Type string `json:"type,omitempty"`
Settings map[string]string `json:"settings,omitempty"`
ID int64 `json:"id"`
Name string `json:"name"`
Identifier string `json:"identifier"`
Enabled bool `json:"enabled"`
Implementation string `json:"implementation"`
Settings map[string]string `json:"settings,omitempty"`
}
type IndexerDefinition struct {
ID int `json:"id,omitempty"`
Name string `json:"name"`
Identifier string `json:"identifier"`
Enabled bool `json:"enabled,omitempty"`
Description string `json:"description"`
Language string `json:"language"`
Privacy string `json:"privacy"`
Protocol string `json:"protocol"`
URLS []string `json:"urls"`
Supports []string `json:"supports"`
Settings []IndexerSetting `json:"settings"`
SettingsMap map[string]string `json:"-"`
IRC *IndexerIRC `json:"irc"`
Parse IndexerParse `json:"parse"`
ID int `json:"id,omitempty"`
Name string `json:"name"`
Identifier string `json:"identifier"`
Implementation string `json:"implementation"`
Enabled bool `json:"enabled,omitempty"`
Description string `json:"description"`
Language string `json:"language"`
Privacy string `json:"privacy"`
Protocol string `json:"protocol"`
URLS []string `json:"urls"`
Supports []string `json:"supports"`
Settings []IndexerSetting `json:"settings,omitempty"`
SettingsMap map[string]string `json:"-"`
IRC *IndexerIRC `json:"irc,omitempty"`
Torznab *Torznab `json:"torznab,omitempty"`
Parse *IndexerParse `json:"parse,omitempty"`
}
func (i IndexerDefinition) HasApi() bool {
@ -61,6 +63,11 @@ type IndexerSetting struct {
Regex string `json:"regex,omitempty"`
}
type Torznab struct {
MinInterval int `json:"minInterval"`
Settings []IndexerSetting `json:"settings"`
}
type IndexerIRC struct {
Network string `json:"network"`
Server string `json:"server"`

View file

@ -157,6 +157,15 @@ func (r *Release) Parse() error {
return nil
}
// ParseSizeBytesString parses a human-readable size string (e.g. "1.5 GB")
// into bytes and stores the result on the release. On parse failure the size
// is set to 0.
//
// Fix: the original error branch set r.Size = 0 but did not return, so the
// unconditional `r.Size = s` below it always executed; the early return makes
// the error path explicit.
func (r *Release) ParseSizeBytesString(size string) {
	s, err := humanize.ParseBytes(size)
	if err != nil {
		// could not parse into bytes; fall back to zero
		r.Size = 0
		return
	}
	r.Size = s
}
func (r *Release) extractYear() error {
if r.Year > 0 {
return nil
@ -1514,7 +1523,8 @@ const (
type ReleaseImplementation string
const (
ReleaseImplementationIRC ReleaseImplementation = "IRC"
ReleaseImplementationIRC ReleaseImplementation = "IRC"
ReleaseImplementationTorznab ReleaseImplementation = "TORZNAB"
)
type ReleaseQueryParams struct {

277
internal/feed/service.go Normal file
View file

@ -0,0 +1,277 @@
package feed
import (
"context"
"errors"
"fmt"
"github.com/autobrr/autobrr/internal/domain"
"github.com/autobrr/autobrr/internal/release"
"github.com/autobrr/autobrr/internal/scheduler"
"github.com/autobrr/autobrr/pkg/torznab"
"github.com/rs/zerolog/log"
)
// Service manages feed configurations and their scheduled fetch jobs.
type Service interface {
	FindByID(ctx context.Context, id int) (*domain.Feed, error)
	FindByIndexerIdentifier(ctx context.Context, indexer string) (*domain.Feed, error)
	Find(ctx context.Context) ([]domain.Feed, error)
	Store(ctx context.Context, feed *domain.Feed) error
	Update(ctx context.Context, feed *domain.Feed) error
	ToggleEnabled(ctx context.Context, id int, enabled bool) error
	Delete(ctx context.Context, id int) error
	Start() error
}

// feedInstance carries the resolved settings needed to schedule one feed job.
type feedInstance struct {
	Name              string
	IndexerIdentifier string
	URL               string
	ApiKey            string
	Implementation    string // feed type, e.g. "TORZNAB"
	CronSchedule      string
}

// service implements Service and tracks scheduled jobs per indexer identifier.
type service struct {
	jobs       map[string]int // indexer identifier -> scheduler job id
	repo       domain.FeedRepo
	cacheRepo  domain.FeedCacheRepo
	releaseSvc release.Service
	scheduler  scheduler.Service
}
// NewService returns a feed Service backed by the given repositories,
// release service and scheduler.
func NewService(repo domain.FeedRepo, cacheRepo domain.FeedCacheRepo, releaseSvc release.Service, scheduler scheduler.Service) Service {
	svc := &service{
		jobs:       make(map[string]int),
		repo:       repo,
		cacheRepo:  cacheRepo,
		releaseSvc: releaseSvc,
		scheduler:  scheduler,
	}

	return svc
}
// FindByID returns the feed with the given id.
func (s *service) FindByID(ctx context.Context, id int) (*domain.Feed, error) {
	return s.repo.FindByID(ctx, id)
}

// FindByIndexerIdentifier returns the feed configured for the given indexer.
func (s *service) FindByIndexerIdentifier(ctx context.Context, indexer string) (*domain.Feed, error) {
	return s.repo.FindByIndexerIdentifier(ctx, indexer)
}

// Find returns all configured feeds.
func (s *service) Find(ctx context.Context) ([]domain.Feed, error) {
	return s.repo.Find(ctx)
}

// Store persists a new feed.
func (s *service) Store(ctx context.Context, feed *domain.Feed) error {
	return s.repo.Store(ctx, feed)
}

// Update persists changes to a feed and restarts its scheduled job.
func (s *service) Update(ctx context.Context, feed *domain.Feed) error {
	return s.update(ctx, feed)
}

// Delete stops the feed's scheduled job and removes the feed.
func (s *service) Delete(ctx context.Context, id int) error {
	return s.delete(ctx, id)
}

// ToggleEnabled enables or disables a feed and its scheduled job.
func (s *service) ToggleEnabled(ctx context.Context, id int, enabled bool) error {
	return s.toggleEnabled(ctx, id, enabled)
}
// update persists the feed and restarts its scheduled job so configuration
// changes take effect immediately.
func (s *service) update(ctx context.Context, feed *domain.Feed) error {
	err := s.repo.Update(ctx, feed)
	if err != nil {
		log.Error().Err(err).Msg("feed.Update: error updating feed")
		return err
	}

	err = s.restartJob(feed)
	if err != nil {
		log.Error().Err(err).Msg("feed.Update: error restarting feed")
		return err
	}

	return nil
}
// delete stops the feed's scheduled job and removes the feed from storage.
//
// Fix: the find-error log message was copy-pasted from toggleEnabled and
// wrongly said "feed.ToggleEnabled"; corrected to "feed.Delete" so failures
// are attributed to the right operation.
func (s *service) delete(ctx context.Context, id int) error {
	f, err := s.repo.FindByID(ctx, id)
	if err != nil {
		log.Error().Err(err).Msg("feed.Delete: error finding feed")
		return err
	}

	if err := s.stopTorznabJob(f.Indexer); err != nil {
		log.Error().Err(err).Msg("feed.Delete: error stopping torznab job")
		return err
	}

	if err := s.repo.Delete(ctx, id); err != nil {
		log.Error().Err(err).Msg("feed.Delete: error deleting feed")
		return err
	}

	log.Debug().Msgf("feed.Delete: stopping and removing feed: %v", f.Name)

	return nil
}
// toggleEnabled flips the enabled flag in storage and starts or stops the
// feed's scheduled job accordingly.
func (s *service) toggleEnabled(ctx context.Context, id int, enabled bool) error {
	if err := s.repo.ToggleEnabled(ctx, id, enabled); err != nil {
		log.Error().Err(err).Msg("feed.ToggleEnabled: error toggle enabled")
		return err
	}

	f, err := s.repo.FindByID(ctx, id)
	if err != nil {
		log.Error().Err(err).Msg("feed.ToggleEnabled: error finding feed")
		return err
	}

	// disabled: stop the running job and return
	if !enabled {
		if err := s.stopTorznabJob(f.Indexer); err != nil {
			log.Error().Err(err).Msg("feed.ToggleEnabled: error stopping torznab job")
			return err
		}

		log.Debug().Msgf("feed.ToggleEnabled: stopping feed: %v", f.Name)

		return nil
	}

	// enabled: schedule the job
	if err := s.startJob(*f); err != nil {
		log.Error().Err(err).Msg("feed.ToggleEnabled: error starting torznab job")
		return err
	}

	log.Debug().Msgf("feed.ToggleEnabled: started feed: %v", f.Name)

	return nil
}
// Start loads all stored feeds and schedules a job for each enabled one.
// Individual scheduling failures are logged and skipped so one bad feed does
// not block the rest.
func (s *service) Start() error {
	feeds, err := s.repo.Find(context.TODO())
	if err != nil {
		log.Error().Err(err).Msg("feed.Start: error getting feeds")
		return err
	}

	for _, feed := range feeds {
		if err := s.startJob(feed); err != nil {
			log.Error().Err(err).Msg("feed.Start: failed to initialize torznab job")
		}
	}

	return nil
}
// restartJob stops the feed's current job and, if the feed is enabled,
// schedules it again with the updated configuration.
func (s *service) restartJob(f *domain.Feed) error {
	// stop feed
	if err := s.stopTorznabJob(f.Indexer); err != nil {
		log.Error().Err(err).Msg("feed.restartJob: error stopping torznab job")
		return err
	}

	log.Debug().Msgf("feed.restartJob: stopping feed: %v", f.Name)

	if f.Enabled {
		if err := s.startJob(*f); err != nil {
			log.Error().Err(err).Msg("feed.restartJob: error starting torznab job")
			return err
		}

		log.Debug().Msgf("feed.restartJob: restarted feed: %v", f.Name)
	}

	return nil
}
// startJob schedules a recurring fetch job for the feed. Disabled feeds and
// feeds without a URL are silently skipped (no error).
func (s *service) startJob(f domain.Feed) error {
	// get all torznab indexer definitions
	if !f.Enabled {
		return nil
	}

	// get torznab_url from settings
	if f.URL == "" {
		return nil
	}

	// cron schedule to run every X minutes
	// NOTE(review): f.Interval == 0 would produce the invalid schedule
	// "*/0 * * * *" — confirm the interval is validated upstream.
	schedule := fmt.Sprintf("*/%d * * * *", f.Interval)

	fi := feedInstance{
		Name:              f.Name,
		IndexerIdentifier: f.Indexer,
		Implementation:    f.Type,
		URL:               f.URL,
		ApiKey:            f.ApiKey,
		CronSchedule:      schedule,
	}

	// dispatch on feed type; only torznab is implemented so far
	switch fi.Implementation {
	case string(domain.FeedTypeTorznab):
		if err := s.addTorznabJob(fi); err != nil {
			log.Error().Err(err).Msg("feed.startJob: failed to initialize feed")
			return err
		}
		//case "rss":
	}

	return nil
}
// addTorznabJob builds a TorznabJob for the feed instance and registers it
// with the scheduler under the indexer identifier.
func (s *service) addTorznabJob(f feedInstance) error {
	if f.URL == "" {
		return errors.New("torznab feed requires URL")
	}

	// fall back to a 15 minute schedule if none was provided
	if f.CronSchedule == "" {
		f.CronSchedule = "*/15 * * * *"
	}

	// setup logger
	l := log.With().Str("feed_name", f.Name).Logger()

	// setup torznab Client
	c := torznab.NewClient(f.URL, f.ApiKey)

	// create job
	job := &TorznabJob{
		Name:              f.Name,
		IndexerIdentifier: f.IndexerIdentifier,
		Client:            c,
		Log:               l,
		Repo:              s.cacheRepo,
		ReleaseSvc:        s.releaseSvc,
		URL:               f.URL,
	}

	// schedule job
	id, err := s.scheduler.AddJob(job, f.CronSchedule, f.IndexerIdentifier)
	if err != nil {
		return fmt.Errorf("feed.AddTorznabJob: add job failed: %w", err)
	}
	job.JobID = id

	// add to job map so it can be stopped/restarted later
	s.jobs[f.IndexerIdentifier] = id

	log.Debug().Msgf("feed.AddTorznabJob: %v", f.Name)

	return nil
}
// stopTorznabJob removes the scheduled torznab job registered under the
// given indexer identifier.
func (s *service) stopTorznabJob(indexer string) error {
	err := s.scheduler.RemoveJobByIdentifier(indexer)
	if err != nil {
		return fmt.Errorf("feed.stopTorznabJob: stop job failed: %w", err)
	}

	log.Debug().Msgf("feed.stopTorznabJob: %v", indexer)

	return nil
}

134
internal/feed/torznab.go Normal file
View file

@ -0,0 +1,134 @@
package feed
import (
"fmt"
"sort"
"time"
"github.com/autobrr/autobrr/internal/domain"
"github.com/autobrr/autobrr/internal/release"
"github.com/autobrr/autobrr/pkg/torznab"
"github.com/rs/zerolog"
)
// TorznabJob is a scheduled job that polls a torznab feed, skips items
// already recorded in Repo, and hands new releases to ReleaseSvc.
type TorznabJob struct {
	Name              string
	IndexerIdentifier string
	Log               zerolog.Logger
	URL               string
	Client            *torznab.Client
	Repo              domain.FeedCacheRepo
	ReleaseSvc        release.Service

	// attempts/errors are reset at the end of every Run; see Run for details.
	attempts int
	errors   []error

	JobID int
}
// Run implements the scheduler's job interface; it processes the feed once
// and logs any failure.
func (j *TorznabJob) Run() {
	err := j.process()
	if err != nil {
		j.Log.Err(err).Int("attempts", j.attempts).Msg("torznab process error")
		j.errors = append(j.errors, err)
	}

	// NOTE(review): attempts is never incremented and errors is appended then
	// immediately cleared here, so neither carries state across runs — confirm
	// whether retry bookkeeping was intended.
	j.attempts = 0
	j.errors = j.errors[:0]
}
// process fetches new feed items, maps each to a domain.Release, and hands
// the batch to the release service for filter processing in the background.
func (j *TorznabJob) process() error {
	// get feed
	items, err := j.getFeed()
	if err != nil {
		j.Log.Error().Err(err).Msgf("torznab.process: error fetching feed items")
		return fmt.Errorf("torznab.process: error getting feed items: %w", err)
	}

	if len(items) == 0 {
		return nil
	}

	j.Log.Debug().Msgf("torznab.process: refreshing feed: %v, found (%d) new items to check", j.Name, len(items))

	releases := make([]*domain.Release, 0)

	for _, item := range items {
		// items whose title cannot be turned into a release are skipped silently
		rls, err := domain.NewRelease(item.Title, "")
		if err != nil {
			continue
		}

		rls.TorrentName = item.Title
		// NOTE(review): the item GUID is used as the download URL — confirm the
		// feed's GUID is a fetchable link for the configured indexers.
		rls.TorrentURL = item.GUID
		rls.Implementation = domain.ReleaseImplementationTorznab
		rls.Indexer = j.IndexerIdentifier

		// parse size bytes string
		rls.ParseSizeBytesString(item.Size)

		if err := rls.Parse(); err != nil {
			j.Log.Error().Err(err).Msgf("torznab.process: error parsing release")
			continue
		}

		releases = append(releases, rls)
	}

	// process all new releases
	go j.ReleaseSvc.ProcessMultiple(releases)

	return nil
}
// getFeed fetches all items from the torznab client, sorts them newest
// first, and returns only items not already present in the cache. Every
// returned item is recorded in the cache with a 28-day TTL so later runs
// skip it.
func (j *TorznabJob) getFeed() ([]torznab.FeedItem, error) {
	// get feed
	feedItems, err := j.Client.GetFeed()
	if err != nil {
		j.Log.Error().Err(err).Msgf("torznab.getFeed: error fetching feed items")
		return nil, err
	}

	j.Log.Trace().Msgf("torznab getFeed: refreshing feed: %v, found (%d) items", j.Name, len(feedItems))

	items := make([]torznab.FeedItem, 0)
	if len(feedItems) == 0 {
		return items, nil
	}

	// sort newest items first by publication date
	sort.SliceStable(feedItems, func(i, j int) bool {
		return feedItems[i].PubDate.After(feedItems[j].PubDate.Time)
	})

	for _, i := range feedItems {
		// items without a GUID cannot be de-duplicated; skip them
		if i.GUID == "" {
			continue
		}

		//if cacheValue, err := j.Repo.Get(j.Name, i.GUID); err == nil {
		//	j.Log.Trace().Msgf("torznab getFeed: cacheValue: %v", cacheValue)
		//}

		// skip items already seen; a cache lookup error falls through and the
		// item is treated as new
		if exists, err := j.Repo.Exists(j.Name, i.GUID); err == nil {
			if exists {
				j.Log.Trace().Msg("torznab getFeed: cache item exists, skip")
				continue
			}
		}

		// do something more
		items = append(items, i)

		// mark as seen for 28 days; a failed Put is logged but does not drop the item
		ttl := (24 * time.Hour) * 28
		if err := j.Repo.Put(j.Name, i.GUID, []byte("test"), ttl); err != nil {
			j.Log.Error().Err(err).Str("guid", i.GUID).Msg("torznab getFeed: cache.Put: error storing item in cache")
		}
	}

	// send to filters
	return items, nil
}

139
internal/http/feed.go Normal file
View file

@ -0,0 +1,139 @@
package http
import (
"context"
"encoding/json"
"net/http"
"strconv"
"github.com/autobrr/autobrr/internal/domain"
"github.com/go-chi/chi"
)
// feedService is the subset of the feed service the HTTP handler depends on.
type feedService interface {
	// Find returns all configured feeds.
	Find(ctx context.Context) ([]domain.Feed, error)
	// Store persists a new feed.
	Store(ctx context.Context, feed *domain.Feed) error
	// Update replaces an existing feed.
	Update(ctx context.Context, feed *domain.Feed) error
	// Delete removes the feed with the given id.
	Delete(ctx context.Context, id int) error
	// ToggleEnabled enables or disables the feed with the given id.
	ToggleEnabled(ctx context.Context, id int, enabled bool) error
}
// feedHandler serves the /feeds HTTP API, delegating to the feed service
// and writing responses through the shared encoder.
type feedHandler struct {
	encoder encoder
	service feedService
}
// newFeedHandler builds a feed HTTP handler wired to the given encoder and service.
func newFeedHandler(encoder encoder, service feedService) *feedHandler {
	h := feedHandler{
		encoder: encoder,
		service: service,
	}

	return &h
}
// Routes registers the feed endpoints on the given router:
// list, create, update, enable/disable toggle and delete.
func (h feedHandler) Routes(r chi.Router) {
	r.Get("/", h.find)
	r.Post("/", h.store)
	r.Put("/{feedID}", h.update)
	r.Patch("/{feedID}/enabled", h.toggleEnabled)
	r.Delete("/{feedID}", h.delete)
}
// find returns all configured feeds as JSON.
func (h feedHandler) find(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	feeds, err := h.service.Find(ctx)
	if err != nil {
		// a lookup failure is a server-side error, not a missing resource,
		// so respond 500 instead of 404
		h.encoder.StatusInternalError(w)
		return
	}

	h.encoder.StatusResponse(ctx, w, feeds, http.StatusOK)
}
// store decodes a feed from the request body and persists it,
// responding 201 Created with the stored feed.
func (h feedHandler) store(w http.ResponseWriter, r *http.Request) {
	var (
		ctx  = r.Context()
		data *domain.Feed
	)

	if err := json.NewDecoder(r.Body).Decode(&data); err != nil {
		// malformed body: respond with an error status, consistent with update()
		// (previously this returned 404, which misrepresents a decode failure)
		h.encoder.StatusInternalError(w)
		return
	}

	err := h.service.Store(ctx, data)
	if err != nil {
		// encode error
		h.encoder.StatusInternalError(w)
		return
	}

	h.encoder.StatusResponse(ctx, w, data, http.StatusCreated)
}
// update decodes a feed from the request body and replaces the stored one,
// responding 200 OK with the updated feed.
func (h feedHandler) update(w http.ResponseWriter, r *http.Request) {
	var (
		ctx  = r.Context()
		data *domain.Feed
	)

	if err := json.NewDecoder(r.Body).Decode(&data); err != nil {
		// encode error
		h.encoder.StatusInternalError(w)
		return
	}

	err := h.service.Update(ctx, data)
	if err != nil {
		// encode error
		h.encoder.StatusInternalError(w)
		return
	}

	// an update of an existing resource returns 200 OK, not 201 Created
	h.encoder.StatusResponse(ctx, w, data, http.StatusOK)
}
// toggleEnabled flips the enabled flag of the feed identified by the
// {feedID} URL parameter, responding 204 No Content on success.
func (h feedHandler) toggleEnabled(w http.ResponseWriter, r *http.Request) {
	var (
		ctx    = r.Context()
		feedID = chi.URLParam(r, "feedID")
		data   struct {
			Enabled bool `json:"enabled"`
		}
	)

	// reject non-numeric ids instead of silently proceeding with id 0
	id, err := strconv.Atoi(feedID)
	if err != nil {
		h.encoder.StatusInternalError(w)
		return
	}

	if err := json.NewDecoder(r.Body).Decode(&data); err != nil {
		// encode error
		h.encoder.StatusInternalError(w)
		return
	}

	if err := h.service.ToggleEnabled(ctx, id, data.Enabled); err != nil {
		// encode error
		h.encoder.StatusInternalError(w)
		return
	}

	h.encoder.StatusResponse(ctx, w, nil, http.StatusNoContent)
}
// delete removes the feed identified by the {feedID} URL parameter,
// responding 204 No Content on success.
func (h feedHandler) delete(w http.ResponseWriter, r *http.Request) {
	var (
		ctx    = r.Context()
		feedID = chi.URLParam(r, "feedID")
	)

	// reject non-numeric ids instead of silently deleting id 0
	id, err := strconv.Atoi(feedID)
	if err != nil {
		h.encoder.StatusInternalError(w)
		return
	}

	if err := h.service.Delete(ctx, id); err != nil {
		h.encoder.StatusInternalError(w)
		return
	}

	h.encoder.StatusResponse(ctx, w, nil, http.StatusNoContent)
}

View file

@ -31,13 +31,14 @@ type Server struct {
authService authService
downloadClientService downloadClientService
filterService filterService
feedService feedService
indexerService indexerService
ircService ircService
notificationService notificationService
releaseService releaseService
}
func NewServer(config domain.Config, sse *sse.Server, db *database.DB, version string, commit string, date string, actionService actionService, authService authService, downloadClientSvc downloadClientService, filterSvc filterService, indexerSvc indexerService, ircSvc ircService, notificationSvc notificationService, releaseSvc releaseService) Server {
func NewServer(config domain.Config, sse *sse.Server, db *database.DB, version string, commit string, date string, actionService actionService, authService authService, downloadClientSvc downloadClientService, filterSvc filterService, feedSvc feedService, indexerSvc indexerService, ircSvc ircService, notificationSvc notificationService, releaseSvc releaseService) Server {
return Server{
config: config,
sse: sse,
@ -52,6 +53,7 @@ func NewServer(config domain.Config, sse *sse.Server, db *database.DB, version s
authService: authService,
downloadClientService: downloadClientSvc,
filterService: filterSvc,
feedService: feedSvc,
indexerService: indexerSvc,
ircService: ircSvc,
notificationService: notificationSvc,
@ -111,6 +113,7 @@ func (s Server) Handler() http.Handler {
r.Route("/config", newConfigHandler(encoder, s).Routes)
r.Route("/download_clients", newDownloadClientHandler(encoder, s.downloadClientService).Routes)
r.Route("/filters", newFilterHandler(encoder, s.filterService).Routes)
r.Route("/feeds", newFeedHandler(encoder, s.feedService).Routes)
r.Route("/irc", newIrcHandler(encoder, s.ircService).Routes)
r.Route("/indexer", newIndexerHandler(encoder, s.indexerService, s.ircService).Routes)
r.Route("/notification", newNotificationHandler(encoder, s.notificationService).Routes)

View file

@ -0,0 +1,27 @@
---
# Generic Torznab indexer definition.
# Used as the template for user-created torznab feeds; the indexer service
# derives a unique identifier from the name on store.
#id: torznab
name: Generic Torznab
identifier: torznab
description: Generic Torznab
language: en-us
urls:
  - https://domain.com
privacy: private
protocol: torrent
implementation: torznab
supports:
  - torznab
source: torznab
torznab:
  # minimum feed refresh interval in minutes
  minInterval: 15
settings:
  - name: url
    type: text
    required: true
    label: Torznab URL
  - name: api_key
    type: secret
    required: false
    label: Api key
    help: Api key

View file

@ -2,16 +2,19 @@ package indexer
import (
"context"
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"
"strings"
"github.com/autobrr/autobrr/internal/domain"
"github.com/autobrr/autobrr/internal/scheduler"
"github.com/gosimple/slug"
"github.com/rs/zerolog/log"
"gopkg.in/yaml.v2"
"github.com/autobrr/autobrr/internal/domain"
)
type Service interface {
@ -24,6 +27,7 @@ type Service interface {
GetTemplates() ([]domain.IndexerDefinition, error)
LoadIndexerDefinitions() error
GetIndexersByIRCNetwork(server string) []domain.IndexerDefinition
GetTorznabIndexers() []domain.IndexerDefinition
Start() error
}
@ -31,6 +35,7 @@ type service struct {
config domain.Config
repo domain.IndexerRepo
apiService APIService
scheduler scheduler.Service
// contains all raw indexer definitions
indexerDefinitions map[string]domain.IndexerDefinition
@ -39,20 +44,33 @@ type service struct {
mapIndexerIRCToName map[string]string
lookupIRCServerDefinition map[string]map[string]domain.IndexerDefinition
torznabIndexers map[string]*domain.IndexerDefinition
}
func NewService(config domain.Config, repo domain.IndexerRepo, apiService APIService) Service {
func NewService(config domain.Config, repo domain.IndexerRepo, apiService APIService, scheduler scheduler.Service) Service {
return &service{
config: config,
repo: repo,
apiService: apiService,
scheduler: scheduler,
indexerDefinitions: make(map[string]domain.IndexerDefinition),
mapIndexerIRCToName: make(map[string]string),
lookupIRCServerDefinition: make(map[string]map[string]domain.IndexerDefinition),
torznabIndexers: make(map[string]*domain.IndexerDefinition),
}
}
func (s *service) Store(ctx context.Context, indexer domain.Indexer) (*domain.Indexer, error) {
identifier := indexer.Identifier
if indexer.Identifier == "torznab" {
// if the name already contains torznab remove it
cleanName := strings.ReplaceAll(strings.ToLower(indexer.Name), "torznab", "")
identifier = slug.Make(fmt.Sprintf("%v-%v", indexer.Identifier, cleanName))
}
indexer.Identifier = identifier
i, err := s.repo.Store(ctx, indexer)
if err != nil {
log.Error().Stack().Err(err).Msgf("failed to store indexer: %v", indexer.Name)
@ -82,6 +100,12 @@ func (s *service) Update(ctx context.Context, indexer domain.Indexer) (*domain.I
return nil, err
}
if indexer.Implementation == "torznab" {
if !indexer.Enabled {
s.stopFeed(indexer.Identifier)
}
}
return i, nil
}
@ -130,27 +154,42 @@ func (s *service) GetAll() ([]*domain.IndexerDefinition, error) {
func (s *service) mapIndexer(indexer domain.Indexer) (*domain.IndexerDefinition, error) {
in := s.getDefinitionByName(indexer.Identifier)
if in == nil {
// if no indexerDefinition found, continue
return nil, nil
var in *domain.IndexerDefinition
if indexer.Implementation == "torznab" {
in = s.getDefinitionByName("torznab")
if in == nil {
// if no indexerDefinition found, continue
return nil, nil
}
} else {
in = s.getDefinitionByName(indexer.Identifier)
if in == nil {
// if no indexerDefinition found, continue
return nil, nil
}
}
indexerDefinition := domain.IndexerDefinition{
ID: int(indexer.ID),
Name: in.Name,
Identifier: in.Identifier,
Enabled: indexer.Enabled,
Description: in.Description,
Language: in.Language,
Privacy: in.Privacy,
Protocol: in.Protocol,
URLS: in.URLS,
Supports: in.Supports,
Settings: nil,
SettingsMap: make(map[string]string),
IRC: in.IRC,
Parse: in.Parse,
ID: int(indexer.ID),
Name: indexer.Name,
Identifier: indexer.Identifier,
Implementation: indexer.Implementation,
Enabled: indexer.Enabled,
Description: in.Description,
Language: in.Language,
Privacy: in.Privacy,
Protocol: in.Protocol,
URLS: in.URLS,
Supports: in.Supports,
Settings: nil,
SettingsMap: make(map[string]string),
IRC: in.IRC,
Torznab: in.Torznab,
Parse: in.Parse,
}
if indexerDefinition.Implementation == "" {
indexerDefinition.Implementation = "irc"
}
// map settings
@ -202,17 +241,24 @@ func (s *service) Start() error {
}
for _, indexer := range indexerDefinitions {
s.mapIRCIndexerLookup(indexer.Identifier, *indexer)
if indexer.IRC != nil {
s.mapIRCIndexerLookup(indexer.Identifier, *indexer)
// add to irc server lookup table
s.mapIRCServerDefinitionLookup(indexer.IRC.Server, *indexer)
// add to irc server lookup table
s.mapIRCServerDefinitionLookup(indexer.IRC.Server, *indexer)
// check if it has api and add to api service
if indexer.Enabled && indexer.HasApi() {
if err := s.apiService.AddClient(indexer.Identifier, indexer.SettingsMap); err != nil {
log.Error().Stack().Err(err).Msgf("indexer.start: could not init api client for: '%v'", indexer.Identifier)
// check if it has api and add to api service
if indexer.Enabled && indexer.HasApi() {
if err := s.apiService.AddClient(indexer.Identifier, indexer.SettingsMap); err != nil {
log.Error().Stack().Err(err).Msgf("indexer.start: could not init api client for: '%v'", indexer.Identifier)
}
}
}
// handle Torznab
if indexer.Implementation == "torznab" {
s.torznabIndexers[indexer.Identifier] = indexer
}
}
log.Info().Msgf("Loaded %d indexers", len(indexerDefinitions))
@ -238,23 +284,34 @@ func (s *service) addIndexer(indexer domain.Indexer) error {
return err
}
if indexerDefinition == nil {
return errors.New("addindexer: could not find definition")
}
// TODO only add enabled?
//if !indexer.Enabled {
// continue
//}
s.mapIRCIndexerLookup(indexer.Identifier, *indexerDefinition)
if indexerDefinition.IRC != nil {
s.mapIRCIndexerLookup(indexer.Identifier, *indexerDefinition)
// add to irc server lookup table
s.mapIRCServerDefinitionLookup(indexerDefinition.IRC.Server, *indexerDefinition)
// add to irc server lookup table
s.mapIRCServerDefinitionLookup(indexerDefinition.IRC.Server, *indexerDefinition)
// check if it has api and add to api service
if indexerDefinition.Enabled && indexerDefinition.HasApi() {
if err := s.apiService.AddClient(indexerDefinition.Identifier, indexerDefinition.SettingsMap); err != nil {
log.Error().Stack().Err(err).Msgf("indexer.start: could not init api client for: '%v'", indexer.Identifier)
// check if it has api and add to api service
if indexerDefinition.Enabled && indexerDefinition.HasApi() {
if err := s.apiService.AddClient(indexerDefinition.Identifier, indexerDefinition.SettingsMap); err != nil {
log.Error().Stack().Err(err).Msgf("indexer.start: could not init api client for: '%v'", indexer.Identifier)
}
}
}
// handle Torznab
if indexerDefinition.Implementation == "torznab" {
s.torznabIndexers[indexer.Identifier] = indexerDefinition
}
return nil
}
@ -410,6 +467,19 @@ func (s *service) GetIndexersByIRCNetwork(server string) []domain.IndexerDefinit
return indexerDefinitions
}
// GetTorznabIndexers returns copies of all registered torznab indexer definitions.
func (s *service) GetTorznabIndexers() []domain.IndexerDefinition {
	definitions := make([]domain.IndexerDefinition, 0, len(s.torznabIndexers))

	for _, def := range s.torznabIndexers {
		if def == nil {
			continue
		}
		definitions = append(definitions, *def)
	}

	return definitions
}
func (s *service) getDefinitionByName(name string) *domain.IndexerDefinition {
if v, ok := s.indexerDefinitions[name]; ok {
@ -429,3 +499,15 @@ func (s *service) getDefinitionForAnnounce(name string) *domain.IndexerDefinitio
return nil
}
// stopFeed removes the scheduled feed job for the given torznab indexer.
// Non-torznab identifiers are ignored.
func (s *service) stopFeed(indexer string) {
	// verify indexer is torznab indexer
	_, ok := s.torznabIndexers[indexer]
	if !ok {
		return
	}

	// log instead of silently swallowing the error so a stuck job is visible
	if err := s.scheduler.RemoveJobByIdentifier(indexer); err != nil {
		log.Error().Err(err).Msgf("indexer.stopFeed: could not remove feed job for indexer: %v", indexer)
	}
}

View file

@ -11,6 +11,8 @@ import (
"github.com/autobrr/autobrr/internal/announce"
"github.com/autobrr/autobrr/internal/domain"
"github.com/autobrr/autobrr/internal/logger"
"github.com/autobrr/autobrr/internal/release"
"github.com/ergochat/irc-go/ircevent"
"github.com/ergochat/irc-go/ircmsg"
"github.com/rs/zerolog/log"
@ -54,7 +56,7 @@ func (h *channelHealth) resetMonitoring() {
type Handler struct {
network *domain.IrcNetwork
announceSvc announce.Service
releaseSvc release.Service
announceProcessors map[string]announce.Processor
definitions map[string]*domain.IndexerDefinition
@ -71,11 +73,11 @@ type Handler struct {
channelHealth map[string]*channelHealth
}
func NewHandler(network domain.IrcNetwork, definitions []domain.IndexerDefinition, announceSvc announce.Service) *Handler {
func NewHandler(network domain.IrcNetwork, definitions []domain.IndexerDefinition, releaseSvc release.Service) *Handler {
h := &Handler{
client: nil,
network: &network,
announceSvc: announceSvc,
releaseSvc: releaseSvc,
definitions: map[string]*domain.IndexerDefinition{},
announceProcessors: map[string]announce.Processor{},
validAnnouncers: map[string]struct{}{},
@ -104,7 +106,7 @@ func (h *Handler) InitIndexers(definitions []domain.IndexerDefinition) {
// some channels are defined in mixed case
channel = strings.ToLower(channel)
h.announceProcessors[channel] = announce.NewAnnounceProcessor(h.announceSvc, definition)
h.announceProcessors[channel] = announce.NewAnnounceProcessor(h.releaseSvc, definition)
h.channelHealth[channel] = &channelHealth{
name: channel,

View file

@ -6,9 +6,9 @@ import (
"strings"
"sync"
"github.com/autobrr/autobrr/internal/announce"
"github.com/autobrr/autobrr/internal/domain"
"github.com/autobrr/autobrr/internal/indexer"
"github.com/autobrr/autobrr/internal/release"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
@ -28,22 +28,22 @@ type Service interface {
}
type service struct {
repo domain.IrcRepo
announceService announce.Service
indexerService indexer.Service
indexerMap map[string]string
handlers map[handlerKey]*Handler
repo domain.IrcRepo
releaseService release.Service
indexerService indexer.Service
indexerMap map[string]string
handlers map[handlerKey]*Handler
stopWG sync.WaitGroup
lock sync.Mutex
}
func NewService(repo domain.IrcRepo, announceSvc announce.Service, indexerSvc indexer.Service) Service {
// NewService builds the IRC service with its repository, the release service
// used to process announces, and the indexer service used to look up
// definitions per network.
func NewService(repo domain.IrcRepo, releaseSvc release.Service, indexerSvc indexer.Service) Service {
	// note: the struct literal previously contained duplicated field keys
	// (a diff-merge artifact), which does not compile; each field is set once here
	return &service{
		repo:           repo,
		releaseService: releaseSvc,
		indexerService: indexerSvc,
		handlers:       make(map[handlerKey]*Handler),
	}
}
@ -77,7 +77,7 @@ func (s *service) StartHandlers() {
definitions := s.indexerService.GetIndexersByIRCNetwork(network.Server)
// init new irc handler
handler := NewHandler(network, definitions, s.announceService)
handler := NewHandler(network, definitions, s.releaseService)
// use network.Server + nick to use multiple indexers with different nick per network
// this allows for multiple handlers to one network
@ -133,7 +133,7 @@ func (s *service) startNetwork(network domain.IrcNetwork) error {
definitions := s.indexerService.GetIndexersByIRCNetwork(network.Server)
// init new irc handler
handler := NewHandler(network, definitions, s.announceService)
handler := NewHandler(network, definitions, s.releaseService)
s.handlers[handlerKey{network.Server, network.NickServ.Account}] = handler
s.lock.Unlock()

View file

@ -2,7 +2,13 @@ package release
import (
"context"
"strings"
"github.com/autobrr/autobrr/internal/action"
"github.com/autobrr/autobrr/internal/domain"
"github.com/autobrr/autobrr/internal/filter"
"github.com/rs/zerolog/log"
)
type Service interface {
@ -12,15 +18,28 @@ type Service interface {
Store(ctx context.Context, release *domain.Release) error
StoreReleaseActionStatus(ctx context.Context, actionStatus *domain.ReleaseActionStatus) error
Delete(ctx context.Context) error
Process(release *domain.Release)
ProcessMultiple(releases []*domain.Release)
}
type actionClientTypeKey struct {
Type domain.ActionType
ClientID int32
}
type service struct {
repo domain.ReleaseRepo
actionSvc action.Service
filterSvc filter.Service
}
func NewService(repo domain.ReleaseRepo) Service {
// NewService builds the release service with its repository plus the action
// and filter services used when processing releases.
func NewService(repo domain.ReleaseRepo, actionSvc action.Service, filterSvc filter.Service) Service {
	// note: the struct literal previously contained a duplicated "repo" key
	// (a diff-merge artifact), which does not compile; each field is set once here
	return &service{
		repo:      repo,
		actionSvc: actionSvc,
		filterSvc: filterSvc,
	}
}
@ -52,3 +71,118 @@ func (s *service) StoreReleaseActionStatus(ctx context.Context, actionStatus *do
func (s *service) Delete(ctx context.Context) error {
return s.repo.Delete(ctx)
}
// Process runs a release through all filters registered for its indexer, in
// priority order, and executes the actions of the first matching filter. The
// release is stored only once a filter approves it; action clients that
// rejected it are remembered and not retried for the same release.
func (s *service) Process(release *domain.Release) {
	if release == nil {
		return
	}

	// TODO check in config for "Save all releases"
	// TODO cross-seed check
	// TODO dupe checks

	// get filters by priority
	filters, err := s.filterSvc.FindByIndexerIdentifier(release.Indexer)
	if err != nil {
		log.Error().Err(err).Msgf("release.Process: error finding filters for indexer: %v", release.Indexer)
		return
	}

	if len(filters) == 0 {
		return
	}

	// keep track of action clients to avoid sending the same thing all over again
	// save both client type and client id to potentially try another client of same type
	triedActionClients := map[actionClientTypeKey]struct{}{}

	// loop over and check filters
	for _, f := range filters {
		// shadow the loop variable: release.Filter holds a pointer to it, and
		// without shadowing every iteration would alias the same variable
		f := f

		// save filter on release
		release.Filter = &f
		release.FilterName = f.Name
		release.FilterID = f.ID

		// TODO filter limit checks

		// test filter
		match, err := s.filterSvc.CheckFilter(f, release)
		if err != nil {
			log.Error().Err(err).Msg("release.Process: could not find filter")
			return
		}

		if !match {
			log.Trace().Msgf("release.Process: indexer: %v, filter: %v release: %v, no match", release.Indexer, release.Filter.Name, release.TorrentName)
			continue
		}

		log.Info().Msgf("Matched '%v' (%v) for %v", release.TorrentName, release.Filter.Name, release.Indexer)

		// save release here to only save those with rejections from actions instead of all releases
		if release.ID == 0 {
			release.FilterStatus = domain.ReleaseStatusFilterApproved
			err = s.Store(context.Background(), release)
			if err != nil {
				log.Error().Err(err).Msgf("release.Process: error writing release to database: %+v", release)
				return
			}
		}

		var rejections []string

		// run actions (watchFolder, test, exec, qBittorrent, Deluge, arr etc.)
		for _, a := range release.Filter.Actions {
			// only run enabled actions
			if !a.Enabled {
				log.Trace().Msgf("release.Process: indexer: %v, filter: %v release: %v action '%v' not enabled, skip", release.Indexer, release.Filter.Name, release.TorrentName, a.Name)
				continue
			}

			log.Trace().Msgf("release.Process: indexer: %v, filter: %v release: %v , run action: %v", release.Indexer, release.Filter.Name, release.TorrentName, a.Name)

			// keep track of action clients to avoid sending the same thing all over again
			_, tried := triedActionClients[actionClientTypeKey{Type: a.Type, ClientID: a.ClientID}]
			if tried {
				log.Trace().Msgf("release.Process: indexer: %v, filter: %v release: %v action client already tried, skip", release.Indexer, release.Filter.Name, release.TorrentName)
				continue
			}

			rejections, err = s.actionSvc.RunAction(a, *release)
			if err != nil {
				log.Error().Stack().Err(err).Msgf("release.Process: error running actions for filter: %v", release.Filter.Name)
				continue
			}

			if len(rejections) > 0 {
				// if we get a rejection, remember which action client it was from
				triedActionClients[actionClientTypeKey{Type: a.Type, ClientID: a.ClientID}] = struct{}{}

				// log something and fire events
				log.Debug().Msgf("release.Process: indexer: %v, filter: %v release: %v, rejected: %v", release.Indexer, release.Filter.Name, release.TorrentName, strings.Join(rejections, ", "))
			}

			// if no rejections consider action approved, run next
			continue
		}

		// if we have rejections from arr, continue to next filter
		if len(rejections) > 0 {
			continue
		}

		// all actions run, decide to stop or continue here
		break
	}
}
// ProcessMultiple runs Process for every non-nil release in the batch, in order.
func (s *service) ProcessMultiple(releases []*domain.Release) {
	for _, r := range releases {
		if r == nil {
			continue
		}
		s.Process(r)
	}
}

View file

@ -0,0 +1,87 @@
package scheduler
import (
"fmt"
"github.com/robfig/cron/v3"
"github.com/rs/zerolog/log"
)
// Service schedules cron jobs and manages them by a caller-chosen string identifier.
type Service interface {
	// Start begins running scheduled jobs.
	Start()
	// Stop halts the scheduler.
	Stop()
	// AddJob registers job under identifier with the given cron interval
	// expression and returns the cron entry id.
	AddJob(job cron.Job, interval string, identifier string) (int, error)
	// RemoveJobByID removes a job by its cron entry id.
	RemoveJobByID(id cron.EntryID) error
	// RemoveJobByIdentifier removes a job by the identifier it was registered with.
	RemoveJobByIdentifier(id string) error
}
// service implements Service on top of robfig/cron, keeping a map from
// job identifier to cron entry id so jobs can be removed by name.
type service struct {
	cron *cron.Cron
	// jobs maps a job identifier to its cron entry id
	jobs map[string]cron.EntryID
}
// NewService builds a scheduler whose jobs recover from panics via the cron
// Recover chain.
func NewService() Service {
	c := cron.New(cron.WithChain(
		cron.Recover(cron.DefaultLogger),
	))

	return &service{
		cron: c,
		jobs: make(map[string]cron.EntryID),
	}
}
// Start begins executing scheduled jobs.
func (s *service) Start() {
	log.Debug().Msg("scheduler.Start")

	s.cron.Start()
	// redundant bare return removed
}
// Stop halts scheduling of new job runs.
//
// NOTE(review): cron.Stop returns a context that is done once in-flight jobs
// finish; it is ignored here, so jobs may still be running after Stop returns —
// confirm callers do not depend on a full drain.
func (s *service) Stop() {
	log.Debug().Msg("scheduler.Stop")

	s.cron.Stop()
	// redundant bare return removed
}
// AddJob schedules job under identifier with the given cron interval
// expression and returns the new cron entry id. Runs are skipped while a
// previous run of the same job is still in progress.
func (s *service) AddJob(job cron.Job, interval string, identifier string) (int, error) {
	id, err := s.cron.AddJob(interval, cron.NewChain(
		cron.SkipIfStillRunning(cron.DiscardLogger)).Then(job),
	)
	if err != nil {
		return 0, fmt.Errorf("scheduler: add job failed: %w", err)
	}

	log.Debug().Msgf("scheduler.AddJob: job successfully added: %v", id)

	// if a job was already registered under this identifier, remove its old
	// cron entry so it does not keep running orphaned
	if oldID, ok := s.jobs[identifier]; ok {
		s.cron.Remove(oldID)
	}

	// add to job map
	s.jobs[identifier] = id

	return int(id), nil
}
// RemoveJobByID removes the job with the given cron entry id, if registered.
// Unknown ids are a no-op.
func (s *service) RemoveJobByID(id cron.EntryID) error {
	// search the identifier map for the entry id; the previous implementation
	// looked up s.jobs[""] and therefore never found anything
	for identifier, entryID := range s.jobs {
		if entryID != id {
			continue
		}

		s.cron.Remove(entryID)
		delete(s.jobs, identifier)
		return nil
	}

	return nil
}
// RemoveJobByIdentifier removes the cron job registered under id, if any.
// Unknown identifiers are a no-op.
func (s *service) RemoveJobByIdentifier(id string) error {
	entryID, found := s.jobs[id]
	if !found {
		return nil
	}

	log.Debug().Msgf("scheduler.Remove: removing job: %v", id)

	// drop the entry from cron, then forget the identifier mapping
	s.cron.Remove(entryID)
	delete(s.jobs, id)

	return nil
}

View file

@ -3,10 +3,12 @@ package server
import (
"sync"
"github.com/rs/zerolog/log"
"github.com/autobrr/autobrr/internal/feed"
"github.com/autobrr/autobrr/internal/indexer"
"github.com/autobrr/autobrr/internal/irc"
"github.com/autobrr/autobrr/internal/scheduler"
"github.com/rs/zerolog/log"
)
type Server struct {
@ -15,30 +17,42 @@ type Server struct {
indexerService indexer.Service
ircService irc.Service
feedService feed.Service
scheduler scheduler.Service
stopWG sync.WaitGroup
lock sync.Mutex
}
func NewServer(ircSvc irc.Service, indexerSvc indexer.Service) *Server {
func NewServer(ircSvc irc.Service, indexerSvc indexer.Service, feedSvc feed.Service, scheduler scheduler.Service) *Server {
return &Server{
indexerService: indexerSvc,
ircService: ircSvc,
feedService: feedSvc,
scheduler: scheduler,
}
}
func (s *Server) Start() error {
log.Info().Msgf("Starting server. Listening on %v:%v", s.Hostname, s.Port)
// start cron scheduler
s.scheduler.Start()
// instantiate indexers
err := s.indexerService.Start()
if err != nil {
if err := s.indexerService.Start(); err != nil {
log.Error().Err(err).Msg("Could not start indexer service")
return err
}
// instantiate and start irc networks
s.ircService.StartHandlers()
// start torznab feeds
if err := s.feedService.Start(); err != nil {
log.Error().Err(err).Msg("Could not start feed service")
}
return nil
}
@ -47,4 +61,7 @@ func (s *Server) Shutdown() {
// stop all irc handlers
s.ircService.StopHandlers()
// stop cron scheduler
s.scheduler.Stop()
}