mirror of
https://github.com/idanoo/autobrr
synced 2025-07-22 16:29:12 +00:00
refactor: filter and action flow (#225)
* refactor: fitler and action flow * fix: save release before filters * feat: add action client to notifications * feat: improve filter check logging
This commit is contained in:
parent
f32379ae76
commit
a3854ecd59
21 changed files with 654 additions and 313 deletions
|
@ -11,10 +11,10 @@ import (
|
|||
"github.com/spf13/pflag"
|
||||
|
||||
"github.com/autobrr/autobrr/internal/action"
|
||||
"github.com/autobrr/autobrr/internal/announce"
|
||||
"github.com/autobrr/autobrr/internal/auth"
|
||||
"github.com/autobrr/autobrr/internal/config"
|
||||
"github.com/autobrr/autobrr/internal/database"
|
||||
"github.com/autobrr/autobrr/internal/domain"
|
||||
"github.com/autobrr/autobrr/internal/download_client"
|
||||
"github.com/autobrr/autobrr/internal/events"
|
||||
"github.com/autobrr/autobrr/internal/filter"
|
||||
|
@ -29,7 +29,6 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
cfg domain.Config
|
||||
version = "dev"
|
||||
commit = ""
|
||||
date = ""
|
||||
|
@ -41,7 +40,7 @@ func main() {
|
|||
pflag.Parse()
|
||||
|
||||
// read config
|
||||
cfg = config.Read(configPath)
|
||||
cfg := config.Read(configPath)
|
||||
|
||||
// setup server-sent-events
|
||||
serverEvents := sse.New()
|
||||
|
@ -70,8 +69,8 @@ func main() {
|
|||
|
||||
// setup repos
|
||||
var (
|
||||
actionRepo = database.NewActionRepo(db)
|
||||
downloadClientRepo = database.NewDownloadClientRepo(db)
|
||||
actionRepo = database.NewActionRepo(db, downloadClientRepo)
|
||||
filterRepo = database.NewFilterRepo(db)
|
||||
indexerRepo = database.NewIndexerRepo(db)
|
||||
ircRepo = database.NewIrcRepo(db)
|
||||
|
@ -87,8 +86,9 @@ func main() {
|
|||
apiService = indexer.NewAPIService()
|
||||
indexerService = indexer.NewService(cfg, indexerRepo, apiService)
|
||||
filterService = filter.NewService(filterRepo, actionRepo, apiService, indexerService)
|
||||
releaseService = release.NewService(releaseRepo, actionService)
|
||||
ircService = irc.NewService(ircRepo, filterService, indexerService, releaseService)
|
||||
releaseService = release.NewService(releaseRepo)
|
||||
announceService = announce.NewService(actionService, filterService, releaseService)
|
||||
ircService = irc.NewService(ircRepo, announceService, indexerService)
|
||||
notificationService = notification.NewService(notificationRepo)
|
||||
userService = user.NewService(userRepo)
|
||||
authService = auth.NewService(userService)
|
||||
|
|
|
@ -59,6 +59,178 @@ func (s *service) RunActions(actions []domain.Action, release domain.Release) er
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *service) RunAction(action *domain.Action, release domain.Release) ([]string, error) {
|
||||
|
||||
var err error
|
||||
var rejections []string
|
||||
|
||||
switch action.Type {
|
||||
case domain.ActionTypeTest:
|
||||
s.test(action.Name)
|
||||
|
||||
case domain.ActionTypeExec:
|
||||
if release.TorrentTmpFile == "" {
|
||||
if err := release.DownloadTorrentFile(); err != nil {
|
||||
log.Error().Stack().Err(err)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
s.execCmd(release, *action)
|
||||
|
||||
case domain.ActionTypeWatchFolder:
|
||||
if release.TorrentTmpFile == "" {
|
||||
if err := release.DownloadTorrentFile(); err != nil {
|
||||
log.Error().Stack().Err(err)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
s.watchFolder(*action, release)
|
||||
|
||||
case domain.ActionTypeWebhook:
|
||||
if release.TorrentTmpFile == "" {
|
||||
if err := release.DownloadTorrentFile(); err != nil {
|
||||
log.Error().Stack().Err(err)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
s.webhook(*action, release)
|
||||
|
||||
case domain.ActionTypeDelugeV1, domain.ActionTypeDelugeV2:
|
||||
canDownload, err := s.delugeCheckRulesCanDownload(*action)
|
||||
if err != nil {
|
||||
log.Error().Stack().Err(err).Msgf("error checking client rules: %v", action.Name)
|
||||
break
|
||||
}
|
||||
if !canDownload {
|
||||
rejections = []string{"max active downloads reached, skipping"}
|
||||
break
|
||||
}
|
||||
|
||||
if release.TorrentTmpFile == "" {
|
||||
if err := release.DownloadTorrentFile(); err != nil {
|
||||
log.Error().Stack().Err(err)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
err = s.deluge(*action, release)
|
||||
if err != nil {
|
||||
log.Error().Stack().Err(err).Msg("error sending torrent to Deluge")
|
||||
break
|
||||
}
|
||||
|
||||
case domain.ActionTypeQbittorrent:
|
||||
canDownload, client, err := s.qbittorrentCheckRulesCanDownload(*action)
|
||||
if err != nil {
|
||||
log.Error().Stack().Err(err).Msgf("error checking client rules: %v", action.Name)
|
||||
break
|
||||
}
|
||||
if !canDownload {
|
||||
rejections = []string{"max active downloads reached, skipping"}
|
||||
break
|
||||
}
|
||||
|
||||
if release.TorrentTmpFile == "" {
|
||||
if err := release.DownloadTorrentFile(); err != nil {
|
||||
log.Error().Stack().Err(err)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
err = s.qbittorrent(client, *action, release)
|
||||
if err != nil {
|
||||
log.Error().Stack().Err(err).Msg("error sending torrent to qBittorrent")
|
||||
break
|
||||
}
|
||||
|
||||
case domain.ActionTypeRadarr:
|
||||
rejections, err = s.radarr(release, *action)
|
||||
if err != nil {
|
||||
log.Error().Stack().Err(err).Msg("error sending torrent to radarr")
|
||||
break
|
||||
}
|
||||
|
||||
case domain.ActionTypeSonarr:
|
||||
rejections, err = s.sonarr(release, *action)
|
||||
if err != nil {
|
||||
log.Error().Stack().Err(err).Msg("error sending torrent to sonarr")
|
||||
break
|
||||
}
|
||||
|
||||
case domain.ActionTypeLidarr:
|
||||
rejections, err = s.lidarr(release, *action)
|
||||
if err != nil {
|
||||
log.Error().Stack().Err(err).Msg("error sending torrent to lidarr")
|
||||
break
|
||||
}
|
||||
|
||||
case domain.ActionTypeWhisparr:
|
||||
rejections, err = s.whisparr(release, *action)
|
||||
if err != nil {
|
||||
log.Error().Stack().Err(err).Msg("error sending torrent to whisparr")
|
||||
break
|
||||
}
|
||||
|
||||
default:
|
||||
log.Warn().Msgf("unsupported action type: %v", action.Type)
|
||||
return rejections, err
|
||||
}
|
||||
|
||||
rlsActionStatus := &domain.ReleaseActionStatus{
|
||||
ReleaseID: release.ID,
|
||||
Status: domain.ReleasePushStatusApproved,
|
||||
Action: action.Name,
|
||||
Type: action.Type,
|
||||
Rejections: []string{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
notificationEvent := &domain.EventsReleasePushed{
|
||||
ReleaseName: release.TorrentName,
|
||||
Filter: release.Filter.Name,
|
||||
Indexer: release.Indexer,
|
||||
InfoHash: release.TorrentHash,
|
||||
Size: release.Size,
|
||||
Status: domain.ReleasePushStatusApproved,
|
||||
Action: action.Name,
|
||||
ActionType: action.Type,
|
||||
ActionClient: action.Client.Name,
|
||||
Rejections: []string{},
|
||||
Protocol: domain.ReleaseProtocolTorrent,
|
||||
Implementation: domain.ReleaseImplementationIRC,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.Err(err).Stack().Msgf("process action failed: %v for '%v'", action.Name, release.TorrentName)
|
||||
|
||||
rlsActionStatus.Status = domain.ReleasePushStatusErr
|
||||
rlsActionStatus.Rejections = []string{err.Error()}
|
||||
|
||||
notificationEvent.Status = domain.ReleasePushStatusErr
|
||||
notificationEvent.Rejections = []string{err.Error()}
|
||||
}
|
||||
|
||||
if rejections != nil {
|
||||
rlsActionStatus.Status = domain.ReleasePushStatusRejected
|
||||
rlsActionStatus.Rejections = rejections
|
||||
|
||||
notificationEvent.Status = domain.ReleasePushStatusRejected
|
||||
notificationEvent.Rejections = rejections
|
||||
}
|
||||
|
||||
// send event for actions
|
||||
s.bus.Publish("release:push", rlsActionStatus)
|
||||
|
||||
// send separate event for notifications
|
||||
s.bus.Publish("events:release:push", notificationEvent)
|
||||
|
||||
return rejections, err
|
||||
}
|
||||
|
||||
func (s *service) runAction(action domain.Action, release domain.Release) error {
|
||||
|
||||
var err error
|
||||
|
|
|
@ -17,6 +17,7 @@ type Service interface {
|
|||
ToggleEnabled(actionID int) error
|
||||
|
||||
RunActions(actions []domain.Action, release domain.Release) error
|
||||
RunAction(action *domain.Action, release domain.Release) ([]string, error)
|
||||
CheckCanDownload(actions []domain.Action) bool
|
||||
}
|
||||
|
||||
|
|
|
@ -2,7 +2,6 @@ package announce
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/url"
|
||||
|
@ -11,9 +10,6 @@ import (
|
|||
"text/template"
|
||||
|
||||
"github.com/autobrr/autobrr/internal/domain"
|
||||
"github.com/autobrr/autobrr/internal/filter"
|
||||
"github.com/autobrr/autobrr/internal/release"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
|
@ -24,17 +20,15 @@ type Processor interface {
|
|||
type announceProcessor struct {
|
||||
indexer domain.IndexerDefinition
|
||||
|
||||
filterSvc filter.Service
|
||||
releaseSvc release.Service
|
||||
announceSvc Service
|
||||
|
||||
queues map[string]chan string
|
||||
}
|
||||
|
||||
func NewAnnounceProcessor(indexer domain.IndexerDefinition, filterSvc filter.Service, releaseSvc release.Service) Processor {
|
||||
func NewAnnounceProcessor(announceSvc Service, indexer domain.IndexerDefinition) Processor {
|
||||
ap := &announceProcessor{
|
||||
indexer: indexer,
|
||||
filterSvc: filterSvc,
|
||||
releaseSvc: releaseSvc,
|
||||
announceSvc: announceSvc,
|
||||
indexer: indexer,
|
||||
}
|
||||
|
||||
// setup queues and consumers
|
||||
|
@ -115,52 +109,8 @@ func (a *announceProcessor) processQueue(queue chan string) {
|
|||
continue
|
||||
}
|
||||
|
||||
// send to filter service to take care of the rest
|
||||
|
||||
// find and check filter
|
||||
filterOK, foundFilter, err := a.filterSvc.FindAndCheckFilters(newRelease)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("could not find filter")
|
||||
continue
|
||||
}
|
||||
|
||||
// no foundFilter found, lets return
|
||||
if !filterOK || foundFilter == nil {
|
||||
log.Trace().Msg("no matching filter found")
|
||||
continue
|
||||
|
||||
// TODO check in config for "Save all releases"
|
||||
// Save as rejected
|
||||
//newRelease.FilterStatus = domain.ReleaseStatusFilterRejected
|
||||
//err = s.releaseSvc.Store(ctx, newRelease)
|
||||
//if err != nil {
|
||||
// log.Error().Err(err).Msgf("error writing release to database: %+v", newRelease)
|
||||
// return nil
|
||||
//}
|
||||
//return nil
|
||||
}
|
||||
|
||||
// save release
|
||||
newRelease.Filter = foundFilter
|
||||
newRelease.FilterName = foundFilter.Name
|
||||
newRelease.FilterID = foundFilter.ID
|
||||
|
||||
newRelease.FilterStatus = domain.ReleaseStatusFilterApproved
|
||||
err = a.releaseSvc.Store(context.Background(), newRelease)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("error writing release to database: %+v", newRelease)
|
||||
continue
|
||||
}
|
||||
|
||||
log.Info().Msgf("Matched '%v' (%v) for %v", newRelease.TorrentName, newRelease.Filter.Name, newRelease.Indexer)
|
||||
|
||||
// process release
|
||||
go func(rel *domain.Release) {
|
||||
err = a.releaseSvc.Process(*rel)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("could not process release: %+v", newRelease)
|
||||
}
|
||||
}(newRelease)
|
||||
// process release in a new go routine
|
||||
go a.announceSvc.Process(newRelease)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
132
internal/announce/service.go
Normal file
132
internal/announce/service.go
Normal file
|
@ -0,0 +1,132 @@
|
|||
package announce
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"github.com/autobrr/autobrr/internal/action"
|
||||
"github.com/autobrr/autobrr/internal/domain"
|
||||
"github.com/autobrr/autobrr/internal/filter"
|
||||
"github.com/autobrr/autobrr/internal/release"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
type Service interface {
|
||||
Process(release *domain.Release)
|
||||
}
|
||||
|
||||
type service struct {
|
||||
actionSvc action.Service
|
||||
filterSvc filter.Service
|
||||
releaseSvc release.Service
|
||||
}
|
||||
|
||||
type actionClientTypeKey struct {
|
||||
Type domain.ActionType
|
||||
ClientID int32
|
||||
}
|
||||
|
||||
func NewService(actionSvc action.Service, filterSvc filter.Service, releaseSvc release.Service) Service {
|
||||
return &service{
|
||||
actionSvc: actionSvc,
|
||||
filterSvc: filterSvc,
|
||||
releaseSvc: releaseSvc,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *service) Process(release *domain.Release) {
|
||||
// TODO check in config for "Save all releases"
|
||||
// TODO cross-seed check
|
||||
// TODO dupe checks
|
||||
|
||||
// get filters by priority
|
||||
filters, err := s.filterSvc.FindByIndexerIdentifier(release.Indexer)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("announce.Service.Process: error finding filters for indexer: %v", release.Indexer)
|
||||
return
|
||||
}
|
||||
|
||||
// keep track of action clients to avoid sending the same thing all over again
|
||||
// save both client type and client id to potentially try another client of same type
|
||||
triedActionClients := map[actionClientTypeKey]struct{}{}
|
||||
|
||||
// save release
|
||||
//release.FilterStatus = domain.ReleaseStatusFilterApproved
|
||||
err = s.releaseSvc.Store(context.Background(), release)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("announce.Service.Process: error writing release to database: %+v", release)
|
||||
return
|
||||
}
|
||||
|
||||
// loop over and check filters
|
||||
for _, f := range filters {
|
||||
// save filter on release
|
||||
release.Filter = &f
|
||||
release.FilterName = f.Name
|
||||
release.FilterID = f.ID
|
||||
|
||||
// TODO filter limit checks
|
||||
|
||||
// test filter
|
||||
match, err := s.filterSvc.CheckFilter(f, release)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("announce.Service.Process: could not find filter")
|
||||
return
|
||||
}
|
||||
|
||||
if !match {
|
||||
log.Trace().Msgf("announce.Service.Process: indexer: %v, filter: %v release: %v, no match", release.Indexer, release.Filter.Name, release.TorrentName)
|
||||
continue
|
||||
}
|
||||
|
||||
log.Info().Msgf("Matched '%v' (%v) for %v", release.TorrentName, release.Filter.Name, release.Indexer)
|
||||
|
||||
var rejections []string
|
||||
|
||||
// run actions (watchFolder, test, exec, qBittorrent, Deluge, arr etc.)
|
||||
for _, a := range release.Filter.Actions {
|
||||
// only run enabled actions
|
||||
if !a.Enabled {
|
||||
log.Trace().Msgf("announce.Service.Process: indexer: %v, filter: %v release: %v action '%v' not enabled, skip", release.Indexer, release.Filter.Name, release.TorrentName, a.Name)
|
||||
continue
|
||||
}
|
||||
|
||||
log.Trace().Msgf("announce.Service.Process: indexer: %v, filter: %v release: %v , run action: %v", release.Indexer, release.Filter.Name, release.TorrentName, a.Name)
|
||||
|
||||
// keep track of action clients to avoid sending the same thing all over again
|
||||
_, tried := triedActionClients[actionClientTypeKey{Type: a.Type, ClientID: a.ClientID}]
|
||||
if tried {
|
||||
log.Trace().Msgf("announce.Service.Process: indexer: %v, filter: %v release: %v action client already tried, skip", release.Indexer, release.Filter.Name, release.TorrentName)
|
||||
continue
|
||||
}
|
||||
|
||||
rejections, err = s.actionSvc.RunAction(a, *release)
|
||||
if err != nil {
|
||||
log.Error().Stack().Err(err).Msgf("announce.Service.Process: error running actions for filter: %v", release.Filter.Name)
|
||||
continue
|
||||
}
|
||||
|
||||
if len(rejections) > 0 {
|
||||
// if we get a rejection, remember which action client it was from
|
||||
triedActionClients[actionClientTypeKey{Type: a.Type, ClientID: a.ClientID}] = struct{}{}
|
||||
|
||||
// log something and fire events
|
||||
log.Debug().Msgf("announce.Service.Process: indexer: %v, filter: %v release: %v, rejected: %v", release.Indexer, release.Filter.Name, release.TorrentName, strings.Join(rejections, ", "))
|
||||
}
|
||||
|
||||
// if no rejections consider action approved, run next
|
||||
continue
|
||||
}
|
||||
|
||||
// if we have rejections from arr, continue to next filter
|
||||
if len(rejections) > 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// all actions run, decide to stop or continue here
|
||||
break
|
||||
}
|
||||
|
||||
return
|
||||
}
|
|
@ -3,7 +3,7 @@ package database
|
|||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"encoding/json"
|
||||
"github.com/autobrr/autobrr/internal/domain"
|
||||
|
||||
sq "github.com/Masterminds/squirrel"
|
||||
|
@ -11,14 +11,45 @@ import (
|
|||
)
|
||||
|
||||
type ActionRepo struct {
|
||||
db *DB
|
||||
db *DB
|
||||
clientRepo domain.DownloadClientRepo
|
||||
}
|
||||
|
||||
func NewActionRepo(db *DB) domain.ActionRepo {
|
||||
return &ActionRepo{db: db}
|
||||
func NewActionRepo(db *DB, clientRepo domain.DownloadClientRepo) domain.ActionRepo {
|
||||
return &ActionRepo{
|
||||
db: db,
|
||||
clientRepo: clientRepo,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *ActionRepo) FindByFilterID(ctx context.Context, filterID int) ([]domain.Action, error) {
|
||||
func (r *ActionRepo) FindByFilterID(ctx context.Context, filterID int) ([]*domain.Action, error) {
|
||||
|
||||
tx, err := r.db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer tx.Rollback()
|
||||
|
||||
actions, err := r.findByFilterID(ctx, tx, filterID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, action := range actions {
|
||||
if action.ClientID != 0 {
|
||||
client, err := r.attachDownloadClient(ctx, tx, action.ClientID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
action.Client = *client
|
||||
}
|
||||
}
|
||||
|
||||
return actions, nil
|
||||
}
|
||||
|
||||
func (r *ActionRepo) findByFilterID(ctx context.Context, tx *Tx, filterID int) ([]*domain.Action, error) {
|
||||
queryBuilder := r.db.squirrel.
|
||||
Select(
|
||||
"id",
|
||||
|
@ -51,7 +82,7 @@ func (r *ActionRepo) FindByFilterID(ctx context.Context, filterID int) ([]domain
|
|||
return nil, err
|
||||
}
|
||||
|
||||
rows, err := r.db.handler.QueryContext(ctx, query, args...)
|
||||
rows, err := tx.QueryContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
log.Error().Stack().Err(err).Msg("action.findByFilterID: query error")
|
||||
return nil, err
|
||||
|
@ -59,7 +90,7 @@ func (r *ActionRepo) FindByFilterID(ctx context.Context, filterID int) ([]domain
|
|||
|
||||
defer rows.Close()
|
||||
|
||||
actions := make([]domain.Action, 0)
|
||||
actions := make([]*domain.Action, 0)
|
||||
for rows.Next() {
|
||||
var a domain.Action
|
||||
|
||||
|
@ -91,7 +122,7 @@ func (r *ActionRepo) FindByFilterID(ctx context.Context, filterID int) ([]domain
|
|||
a.WebhookMethod = webhookMethod.String
|
||||
a.ClientID = clientID.Int32
|
||||
|
||||
actions = append(actions, a)
|
||||
actions = append(actions, &a)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
log.Error().Stack().Err(err).Msg("action.findByFilterID: row error")
|
||||
|
@ -100,6 +131,54 @@ func (r *ActionRepo) FindByFilterID(ctx context.Context, filterID int) ([]domain
|
|||
|
||||
return actions, nil
|
||||
}
|
||||
func (r *ActionRepo) attachDownloadClient(ctx context.Context, tx *Tx, clientID int32) (*domain.DownloadClient, error) {
|
||||
|
||||
queryBuilder := r.db.squirrel.
|
||||
Select(
|
||||
"id",
|
||||
"name",
|
||||
"type",
|
||||
"enabled",
|
||||
"host",
|
||||
"port",
|
||||
"tls",
|
||||
"tls_skip_verify",
|
||||
"username",
|
||||
"password",
|
||||
"settings",
|
||||
).
|
||||
From("client").
|
||||
Where("id = ?", clientID)
|
||||
|
||||
query, args, err := queryBuilder.ToSql()
|
||||
if err != nil {
|
||||
log.Error().Stack().Err(err).Msg("action.attachDownloadClient: error building query")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
row := tx.QueryRowContext(ctx, query, args...)
|
||||
if err := row.Err(); err != nil {
|
||||
log.Error().Stack().Err(err).Msg("action.attachDownloadClient: error query row")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var client domain.DownloadClient
|
||||
var settingsJsonStr string
|
||||
|
||||
if err := row.Scan(&client.ID, &client.Name, &client.Type, &client.Enabled, &client.Host, &client.Port, &client.TLS, &client.TLSSkipVerify, &client.Username, &client.Password, &settingsJsonStr); err != nil {
|
||||
log.Error().Stack().Err(err).Msg("action.attachDownloadClient: error scanning row")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if settingsJsonStr != "" {
|
||||
if err := json.Unmarshal([]byte(settingsJsonStr), &client.Settings); err != nil {
|
||||
log.Error().Stack().Err(err).Msgf("action.attachDownloadClient: could not marshal download client settings %v", settingsJsonStr)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return &client, nil
|
||||
}
|
||||
|
||||
func (r *ActionRepo) List(ctx context.Context) ([]domain.Action, error) {
|
||||
queryBuilder := r.db.squirrel.
|
||||
|
@ -361,7 +440,7 @@ func (r *ActionRepo) Update(ctx context.Context, action domain.Action) (*domain.
|
|||
return &action, nil
|
||||
}
|
||||
|
||||
func (r *ActionRepo) StoreFilterActions(ctx context.Context, actions []domain.Action, filterID int64) ([]domain.Action, error) {
|
||||
func (r *ActionRepo) StoreFilterActions(ctx context.Context, actions []*domain.Action, filterID int64) ([]*domain.Action, error) {
|
||||
tx, err := r.db.handler.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -4,37 +4,38 @@ import "context"
|
|||
|
||||
type ActionRepo interface {
|
||||
Store(ctx context.Context, action Action) (*Action, error)
|
||||
StoreFilterActions(ctx context.Context, actions []Action, filterID int64) ([]Action, error)
|
||||
StoreFilterActions(ctx context.Context, actions []*Action, filterID int64) ([]*Action, error)
|
||||
DeleteByFilterID(ctx context.Context, filterID int) error
|
||||
FindByFilterID(ctx context.Context, filterID int) ([]Action, error)
|
||||
FindByFilterID(ctx context.Context, filterID int) ([]*Action, error)
|
||||
List(ctx context.Context) ([]Action, error)
|
||||
Delete(actionID int) error
|
||||
ToggleEnabled(actionID int) error
|
||||
}
|
||||
|
||||
type Action struct {
|
||||
ID int `json:"id"`
|
||||
Name string `json:"name"`
|
||||
Type ActionType `json:"type"`
|
||||
Enabled bool `json:"enabled"`
|
||||
ExecCmd string `json:"exec_cmd,omitempty"`
|
||||
ExecArgs string `json:"exec_args,omitempty"`
|
||||
WatchFolder string `json:"watch_folder,omitempty"`
|
||||
Category string `json:"category,omitempty"`
|
||||
Tags string `json:"tags,omitempty"`
|
||||
Label string `json:"label,omitempty"`
|
||||
SavePath string `json:"save_path,omitempty"`
|
||||
Paused bool `json:"paused,omitempty"`
|
||||
IgnoreRules bool `json:"ignore_rules,omitempty"`
|
||||
LimitUploadSpeed int64 `json:"limit_upload_speed,omitempty"`
|
||||
LimitDownloadSpeed int64 `json:"limit_download_speed,omitempty"`
|
||||
WebhookHost string `json:"webhook_host,omitempty"`
|
||||
WebhookType string `json:"webhook_type,omitempty"`
|
||||
WebhookMethod string `json:"webhook_method,omitempty"`
|
||||
WebhookData string `json:"webhook_data,omitempty"`
|
||||
WebhookHeaders []string `json:"webhook_headers,omitempty"`
|
||||
FilterID int `json:"filter_id,omitempty"`
|
||||
ClientID int32 `json:"client_id,omitempty"`
|
||||
ID int `json:"id"`
|
||||
Name string `json:"name"`
|
||||
Type ActionType `json:"type"`
|
||||
Enabled bool `json:"enabled"`
|
||||
ExecCmd string `json:"exec_cmd,omitempty"`
|
||||
ExecArgs string `json:"exec_args,omitempty"`
|
||||
WatchFolder string `json:"watch_folder,omitempty"`
|
||||
Category string `json:"category,omitempty"`
|
||||
Tags string `json:"tags,omitempty"`
|
||||
Label string `json:"label,omitempty"`
|
||||
SavePath string `json:"save_path,omitempty"`
|
||||
Paused bool `json:"paused,omitempty"`
|
||||
IgnoreRules bool `json:"ignore_rules,omitempty"`
|
||||
LimitUploadSpeed int64 `json:"limit_upload_speed,omitempty"`
|
||||
LimitDownloadSpeed int64 `json:"limit_download_speed,omitempty"`
|
||||
WebhookHost string `json:"webhook_host,omitempty"`
|
||||
WebhookType string `json:"webhook_type,omitempty"`
|
||||
WebhookMethod string `json:"webhook_method,omitempty"`
|
||||
WebhookData string `json:"webhook_data,omitempty"`
|
||||
WebhookHeaders []string `json:"webhook_headers,omitempty"`
|
||||
FilterID int `json:"filter_id,omitempty"`
|
||||
ClientID int32 `json:"client_id,omitempty"`
|
||||
Client DownloadClient `json:"client,omitempty"`
|
||||
}
|
||||
|
||||
type ActionType string
|
||||
|
|
|
@ -11,6 +11,7 @@ type EventsReleasePushed struct {
|
|||
Status ReleasePushStatus
|
||||
Action string
|
||||
ActionType ActionType
|
||||
ActionClient string
|
||||
Rejections []string
|
||||
Protocol ReleaseProtocol // torrent
|
||||
Implementation ReleaseImplementation // irc, rss, api
|
||||
|
|
|
@ -71,6 +71,6 @@ type Filter struct {
|
|||
ExceptTags string `json:"except_tags"`
|
||||
TagsAny string `json:"tags_any"`
|
||||
ExceptTagsAny string `json:"except_tags_any"`
|
||||
Actions []Action `json:"actions"`
|
||||
Actions []*Action `json:"actions"`
|
||||
Indexers []Indexer `json:"indexers"`
|
||||
}
|
||||
|
|
|
@ -705,196 +705,178 @@ func (r *Release) addRejection(reason string) {
|
|||
r.Rejections = append(r.Rejections, reason)
|
||||
}
|
||||
|
||||
func (r *Release) addRejectionF(format string, v ...interface{}) {
|
||||
r.Rejections = append(r.Rejections, fmt.Sprintf(format, v...))
|
||||
}
|
||||
|
||||
// ResetRejections reset rejections between filter checks
|
||||
func (r *Release) resetRejections() {
|
||||
r.Rejections = []string{}
|
||||
}
|
||||
|
||||
func (r *Release) CheckFilter(filter Filter) bool {
|
||||
func (r *Release) RejectionsString() string {
|
||||
if len(r.Rejections) > 0 {
|
||||
return strings.Join(r.Rejections, ", ")
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (r *Release) CheckFilter(filter Filter) ([]string, bool) {
|
||||
// reset rejections first to clean previous checks
|
||||
r.resetRejections()
|
||||
|
||||
if !filter.Enabled {
|
||||
return false
|
||||
return nil, false
|
||||
}
|
||||
|
||||
// FIXME what if someone explicitly doesnt want scene, or toggles in filter. Make enum? 0,1,2? Yes, No, Dont care
|
||||
if filter.Scene && r.IsScene != filter.Scene {
|
||||
r.addRejection("wanted: scene")
|
||||
return false
|
||||
}
|
||||
|
||||
if filter.Freeleech && r.Freeleech != filter.Freeleech {
|
||||
r.addRejection("wanted: freeleech")
|
||||
return false
|
||||
}
|
||||
|
||||
if filter.FreeleechPercent != "" && !checkFreeleechPercent(r.FreeleechPercent, filter.FreeleechPercent) {
|
||||
r.addRejection("freeleech percent not matching")
|
||||
return false
|
||||
r.addRejectionF("freeleech percent not matching. wanted: %v got: %v", filter.FreeleechPercent, r.FreeleechPercent)
|
||||
}
|
||||
|
||||
// check against TorrentName and Clean which is a cleaned name without (. _ -)
|
||||
if filter.Shows != "" && !checkMultipleFilterStrings(filter.Shows, r.TorrentName, r.Clean) {
|
||||
r.addRejection("shows not matching")
|
||||
return false
|
||||
}
|
||||
|
||||
if filter.Seasons != "" && !checkFilterIntStrings(r.Season, filter.Seasons) {
|
||||
r.addRejection("season not matching")
|
||||
return false
|
||||
r.addRejectionF("season not matching. wanted: %v got: %d", filter.Seasons, r.Season)
|
||||
}
|
||||
|
||||
if filter.Episodes != "" && !checkFilterIntStrings(r.Episode, filter.Episodes) {
|
||||
r.addRejection("episode not matching")
|
||||
return false
|
||||
r.addRejectionF("episodes not matching. wanted: %v got: %d", filter.Seasons, r.Season)
|
||||
}
|
||||
|
||||
// matchRelease
|
||||
// TODO allow to match against regex
|
||||
if filter.MatchReleases != "" && !checkMultipleFilterStrings(filter.MatchReleases, r.TorrentName, r.Clean) {
|
||||
r.addRejection("match release not matching")
|
||||
return false
|
||||
}
|
||||
|
||||
if filter.ExceptReleases != "" && checkMultipleFilterStrings(filter.ExceptReleases, r.TorrentName, r.Clean) {
|
||||
r.addRejection("except_releases: unwanted release")
|
||||
return false
|
||||
}
|
||||
|
||||
if filter.MatchReleaseGroups != "" && !checkMultipleFilterGroups(filter.MatchReleaseGroups, r.Group, r.Clean) {
|
||||
r.addRejection("release groups not matching")
|
||||
return false
|
||||
r.addRejectionF("release groups not matching. wanted: %v got: %v", filter.MatchReleaseGroups, r.Group)
|
||||
}
|
||||
|
||||
if filter.ExceptReleaseGroups != "" && checkMultipleFilterGroups(filter.ExceptReleaseGroups, r.Group, r.Clean) {
|
||||
r.addRejection("unwanted release group")
|
||||
return false
|
||||
r.addRejectionF("unwanted release group. unwanted: %v got: %v", filter.ExceptReleaseGroups, r.Group)
|
||||
}
|
||||
|
||||
if filter.MatchUploaders != "" && !checkFilterStrings(r.Uploader, filter.MatchUploaders) {
|
||||
r.addRejection("uploaders not matching")
|
||||
return false
|
||||
r.addRejectionF("uploaders not matching. wanted: %v got: %v", filter.MatchUploaders, r.Uploader)
|
||||
}
|
||||
|
||||
if filter.ExceptUploaders != "" && checkFilterStrings(r.Uploader, filter.ExceptUploaders) {
|
||||
r.addRejection("unwanted uploaders")
|
||||
return false
|
||||
r.addRejectionF("unwanted uploaders. unwanted: %v got: %v", filter.MatchUploaders, r.Uploader)
|
||||
}
|
||||
|
||||
if len(filter.Resolutions) > 0 && !checkFilterSlice(r.Resolution, filter.Resolutions) {
|
||||
r.addRejection("resolution not matching")
|
||||
return false
|
||||
r.addRejectionF("resolution not matching. wanted: %v got: %v", filter.Resolutions, r.Resolution)
|
||||
}
|
||||
|
||||
if len(filter.Codecs) > 0 && !checkFilterSlice(r.Codec, filter.Codecs) {
|
||||
r.addRejection("codec not matching")
|
||||
return false
|
||||
r.addRejectionF("codec not matching. wanted: %v got: %v", filter.Codecs, r.Codec)
|
||||
}
|
||||
|
||||
if len(filter.Sources) > 0 && !checkFilterSource(r.Source, filter.Sources) {
|
||||
r.addRejection("source not matching")
|
||||
return false
|
||||
r.addRejectionF("source not matching. wanted: %v got: %v", filter.Sources, r.Source)
|
||||
}
|
||||
|
||||
if len(filter.Containers) > 0 && !checkFilterSlice(r.Container, filter.Containers) {
|
||||
r.addRejection("container not matching")
|
||||
return false
|
||||
r.addRejectionF("container not matching. wanted: %v got: %v", filter.Containers, r.Container)
|
||||
}
|
||||
|
||||
if len(filter.MatchHDR) > 0 && !checkMultipleFilterHDR(filter.MatchHDR, r.HDR, r.TorrentName) {
|
||||
r.addRejection("hdr not matching")
|
||||
return false
|
||||
r.addRejectionF("hdr not matching. wanted: %v got: %v", filter.MatchHDR, r.HDR)
|
||||
}
|
||||
|
||||
if len(filter.ExceptHDR) > 0 && checkMultipleFilterHDR(filter.ExceptHDR, r.HDR, r.TorrentName) {
|
||||
r.addRejection("unwanted hdr")
|
||||
return false
|
||||
r.addRejectionF("hdr unwanted. unwanted: %v got: %v", filter.ExceptHDR, r.HDR)
|
||||
}
|
||||
|
||||
if filter.Years != "" && !checkFilterIntStrings(r.Year, filter.Years) {
|
||||
r.addRejection("year not matching")
|
||||
return false
|
||||
r.addRejectionF("year not matching. wanted: %v got: %d", filter.Years, r.Year)
|
||||
}
|
||||
|
||||
if filter.MatchCategories != "" && !checkFilterStrings(r.Category, filter.MatchCategories) {
|
||||
r.addRejection("category not matching")
|
||||
return false
|
||||
r.addRejectionF("category not matching. wanted: %v got: %v", filter.MatchCategories, r.Category)
|
||||
}
|
||||
|
||||
if filter.ExceptCategories != "" && checkFilterStrings(r.Category, filter.ExceptCategories) {
|
||||
r.addRejection("unwanted category")
|
||||
return false
|
||||
r.addRejectionF("category unwanted. unwanted: %v got: %v", filter.ExceptCategories, r.Category)
|
||||
}
|
||||
|
||||
if len(filter.MatchReleaseTypes) > 0 && !checkFilterSlice(r.Category, filter.MatchReleaseTypes) {
|
||||
r.addRejection("release type not matching")
|
||||
return false
|
||||
r.addRejectionF("release type not matching. wanted: %v got: %v", filter.MatchReleaseTypes, r.Category)
|
||||
}
|
||||
|
||||
if (filter.MinSize != "" || filter.MaxSize != "") && !r.CheckSizeFilter(filter.MinSize, filter.MaxSize) {
|
||||
return false
|
||||
r.addRejectionF("size not matching. wanted min: %v max: %v got: %v", filter.MinSize, filter.MaxSize, r.Size)
|
||||
}
|
||||
|
||||
if filter.Tags != "" && !checkFilterTags(r.Tags, filter.Tags) {
|
||||
r.addRejection("tags not matching")
|
||||
return false
|
||||
r.addRejectionF("tags not matching. wanted: %v got: %v", filter.Tags, r.Tags)
|
||||
}
|
||||
|
||||
if filter.ExceptTags != "" && checkFilterTags(r.Tags, filter.ExceptTags) {
|
||||
r.addRejection("unwanted tags")
|
||||
return false
|
||||
r.addRejectionF("tags unwanted. wanted: %v got: %v", filter.ExceptTags, r.Tags)
|
||||
}
|
||||
|
||||
if len(filter.Artists) > 0 && !checkFilterStrings(r.TorrentName, filter.Artists) {
|
||||
r.addRejection("artists not matching")
|
||||
return false
|
||||
}
|
||||
|
||||
if len(filter.Albums) > 0 && !checkFilterStrings(r.TorrentName, filter.Albums) {
|
||||
r.addRejection("albums not matching")
|
||||
return false
|
||||
}
|
||||
|
||||
// Perfect flac requires Cue, Log, Log Score 100, FLAC and 24bit Lossless
|
||||
if filter.PerfectFlac {
|
||||
if !r.HasLog || !r.HasCue || r.LogScore != 100 || r.Format != "FLAC" && !checkFilterSlice(r.Quality, []string{"Lossless", "24bit Lossless"}) {
|
||||
r.addRejection("wanted: log")
|
||||
return false
|
||||
r.addRejectionF("wanted: perfect flac. got: cue %v log %v log score %v format %v quality %v", r.HasCue, r.HasLog, r.LogScore, r.Format, r.Quality)
|
||||
}
|
||||
}
|
||||
|
||||
if len(filter.Formats) > 0 && !checkFilterSlice(r.Format, filter.Formats) {
|
||||
r.addRejection("formats not matching")
|
||||
return false
|
||||
r.addRejectionF("formats not matching. wanted: %v got: %v", filter.Formats, r.Format)
|
||||
}
|
||||
|
||||
if len(filter.Quality) > 0 && !checkFilterSlice(r.Quality, filter.Quality) {
|
||||
r.addRejection("formats not matching")
|
||||
return false
|
||||
r.addRejectionF("quality not matching. wanted: %v got: %v", filter.Quality, r.Quality)
|
||||
}
|
||||
|
||||
if len(filter.Media) > 0 && !checkFilterSource(r.Source, filter.Media) {
|
||||
r.addRejection("source not matching")
|
||||
return false
|
||||
r.addRejectionF("media not matching. wanted: %v got: %v", filter.Media, r.Source)
|
||||
}
|
||||
|
||||
if filter.Log && r.HasLog != filter.Log {
|
||||
r.addRejection("wanted: log")
|
||||
return false
|
||||
}
|
||||
|
||||
if filter.Log && filter.LogScore != 0 && r.LogScore != filter.LogScore {
|
||||
r.addRejection("wanted: log score")
|
||||
return false
|
||||
r.addRejectionF("wanted: log score %v got: %v", filter.LogScore, r.LogScore)
|
||||
}
|
||||
|
||||
if filter.Cue && r.HasCue != filter.Cue {
|
||||
r.addRejection("wanted: cue")
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
if len(r.Rejections) > 0 {
|
||||
return r.Rejections, false
|
||||
}
|
||||
|
||||
return nil, true
|
||||
}
|
||||
|
||||
// CheckSizeFilter additional size check
|
||||
|
|
|
@ -1249,7 +1249,7 @@ func TestRelease_CheckFilter(t *testing.T) {
|
|||
r := tt.fields // Release
|
||||
|
||||
_ = r.Parse() // Parse TorrentName into struct
|
||||
got := r.CheckFilter(tt.args.filter)
|
||||
_, got := r.CheckFilter(tt.args.filter)
|
||||
|
||||
assert.Equal(t, tt.want, got)
|
||||
})
|
||||
|
|
|
@ -15,7 +15,7 @@ import (
|
|||
type Service interface {
|
||||
FindByID(ctx context.Context, filterID int) (*domain.Filter, error)
|
||||
FindByIndexerIdentifier(indexer string) ([]domain.Filter, error)
|
||||
FindAndCheckFilters(release *domain.Release) (bool, *domain.Filter, error)
|
||||
CheckFilter(f domain.Filter, release *domain.Release) (bool, error)
|
||||
ListFilters(ctx context.Context) ([]domain.Filter, error)
|
||||
Store(ctx context.Context, filter domain.Filter) (*domain.Filter, error)
|
||||
Update(ctx context.Context, filter domain.Filter) (*domain.Filter, error)
|
||||
|
@ -229,110 +229,130 @@ func (s *service) Delete(ctx context.Context, filterID int) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *service) FindAndCheckFilters(release *domain.Release) (bool, *domain.Filter, error) {
|
||||
// find all enabled filters for indexer
|
||||
filters, err := s.repo.FindByIndexerIdentifier(release.Indexer)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("filter-service.find_and_check_filters: could not find filters for indexer: %v", release.Indexer)
|
||||
return false, nil, err
|
||||
func (s *service) CheckFilter(f domain.Filter, release *domain.Release) (bool, error) {
|
||||
|
||||
log.Trace().Msgf("filter.Service.CheckFilter: checking filter: %v %+v", f.Name, f)
|
||||
log.Trace().Msgf("filter.Service.CheckFilter: checking filter: %v for release: %+v", f.Name, release)
|
||||
|
||||
rejections, matchedFilter := release.CheckFilter(f)
|
||||
if len(rejections) > 0 {
|
||||
log.Trace().Msgf("filter.Service.CheckFilter: (%v) for release: %v rejections: (%v)", f.Name, release.TorrentName, release.RejectionsString())
|
||||
return false, nil
|
||||
}
|
||||
|
||||
log.Trace().Msgf("filter-service.find_and_check_filters: found (%d) active filters to check for indexer '%v'", len(filters), release.Indexer)
|
||||
if matchedFilter {
|
||||
// if matched, do additional size check if needed, attach actions and return the filter
|
||||
|
||||
// save outside of loop to check multiple filters with only one fetch
|
||||
var torrentInfo *domain.TorrentBasic
|
||||
log.Debug().Msgf("filter.Service.CheckFilter: found and matched filter: %+v", f.Name)
|
||||
|
||||
// loop and check release to filter until match
|
||||
for _, f := range filters {
|
||||
log.Trace().Msgf("filter-service.find_and_check_filters: checking filter: %+v", f.Name)
|
||||
// Some indexers do not announce the size and if size (min,max) is set in a filter then it will need
|
||||
// additional size check. Some indexers have api implemented to fetch this data and for the others
|
||||
// it will download the torrent file to parse and make the size check. This is all to minimize the amount of downloads.
|
||||
|
||||
matchedFilter := release.CheckFilter(f)
|
||||
if matchedFilter {
|
||||
// if matched, do additional size check if needed, attach actions and return the filter
|
||||
// do additional size check against indexer api or torrent for size
|
||||
if release.AdditionalSizeCheckRequired {
|
||||
log.Debug().Msgf("filter.Service.CheckFilter: (%v) additional size check required", f.Name)
|
||||
|
||||
log.Debug().Msgf("filter-service.find_and_check_filters: found and matched filter: %+v", f.Name)
|
||||
|
||||
// Some indexers do not announce the size and if size (min,max) is set in a filter then it will need
|
||||
// additional size check. Some indexers have api implemented to fetch this data and for the others
|
||||
// it will download the torrent file to parse and make the size check. This is all to minimize the amount of downloads.
|
||||
|
||||
// do additional size check against indexer api or torrent for size
|
||||
if release.AdditionalSizeCheckRequired {
|
||||
log.Debug().Msgf("filter-service.find_and_check_filters: (%v) additional size check required", f.Name)
|
||||
|
||||
// check if indexer = btn, ptp, ggn or red
|
||||
if release.Indexer == "ptp" || release.Indexer == "btn" || release.Indexer == "ggn" || release.Indexer == "redacted" {
|
||||
// fetch torrent info from api
|
||||
// save outside of loop to check multiple filters with only one fetch
|
||||
if torrentInfo == nil {
|
||||
torrentInfo, err = s.apiService.GetTorrentByID(release.Indexer, release.TorrentID)
|
||||
if err != nil || torrentInfo == nil {
|
||||
log.Error().Stack().Err(err).Msgf("filter-service.find_and_check_filters: (%v) could not get torrent: '%v' from: %v", f.Name, release.TorrentID, release.Indexer)
|
||||
continue
|
||||
}
|
||||
|
||||
log.Debug().Msgf("filter-service.find_and_check_filters: (%v) got torrent info: %+v", f.Name, torrentInfo)
|
||||
}
|
||||
|
||||
// compare size against filters
|
||||
match, err := checkSizeFilter(f.MinSize, f.MaxSize, torrentInfo.ReleaseSizeBytes())
|
||||
if err != nil {
|
||||
log.Error().Stack().Err(err).Msgf("filter-service.find_and_check_filters: (%v) could not check size filter", f.Name)
|
||||
continue
|
||||
}
|
||||
|
||||
// no match, lets continue to next filter
|
||||
if !match {
|
||||
log.Debug().Msgf("filter-service.find_and_check_filters: (%v) filter did not match after additional size check, trying next", f.Name)
|
||||
continue
|
||||
}
|
||||
|
||||
// store size on the release
|
||||
release.Size = torrentInfo.ReleaseSizeBytes()
|
||||
} else {
|
||||
log.Trace().Msgf("filter-service.find_and_check_filters: (%v) additional size check required: preparing to download metafile", f.Name)
|
||||
|
||||
// if indexer doesn't have api, download torrent and add to tmpPath
|
||||
err = release.DownloadTorrentFile()
|
||||
if err != nil {
|
||||
log.Error().Stack().Err(err).Msgf("filter-service.find_and_check_filters: (%v) could not download torrent file with id: '%v' from: %v", f.Name, release.TorrentID, release.Indexer)
|
||||
return false, nil, err
|
||||
}
|
||||
|
||||
// compare size against filter
|
||||
match, err := checkSizeFilter(f.MinSize, f.MaxSize, release.Size)
|
||||
if err != nil {
|
||||
log.Error().Stack().Err(err).Msgf("filter-service.find_and_check_filters: (%v) could not check size filter", f.Name)
|
||||
continue
|
||||
}
|
||||
|
||||
// no match, lets continue to next filter
|
||||
if !match {
|
||||
log.Debug().Msgf("filter-service.find_and_check_filters: (%v) filter did not match after additional size check, trying next", f.Name)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// found matching filter, lets find the filter actions and attach
|
||||
actions, err := s.actionRepo.FindByFilterID(context.TODO(), f.ID)
|
||||
ok, err := s.AdditionalSizeCheck(f, release)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("could not find actions for filter: %+v", f.Name)
|
||||
log.Error().Stack().Err(err).Msgf("filter.Service.CheckFilter: (%v) additional size check error", f.Name)
|
||||
return false, err
|
||||
}
|
||||
|
||||
// if no actions, continue to next filter
|
||||
if len(actions) == 0 {
|
||||
log.Trace().Msgf("filter-service.find_and_check_filters: no actions found for filter '%v', trying next one..", f.Name)
|
||||
continue
|
||||
if !ok {
|
||||
log.Trace().Msgf("filter.Service.CheckFilter: (%v) additional size check not matching what filter wanted", f.Name)
|
||||
return false, nil
|
||||
}
|
||||
f.Actions = actions
|
||||
|
||||
return true, &f, nil
|
||||
}
|
||||
|
||||
// found matching filter, lets find the filter actions and attach
|
||||
actions, err := s.actionRepo.FindByFilterID(context.TODO(), f.ID)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("filter.Service.CheckFilter: error finding actions for filter: %+v", f.Name)
|
||||
return false, err
|
||||
}
|
||||
|
||||
// if no actions, continue to next filter
|
||||
if len(actions) == 0 {
|
||||
log.Trace().Msgf("filter.Service.CheckFilter: no actions found for filter '%v', trying next one..", f.Name)
|
||||
return false, err
|
||||
}
|
||||
release.Filter.Actions = actions
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// if no match, return nil
|
||||
return false, nil, nil
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (s *service) AdditionalSizeCheck(f domain.Filter, release *domain.Release) (bool, error) {
|
||||
|
||||
// save outside of loop to check multiple filters with only one fetch
|
||||
// TODO put on filter to reuse
|
||||
var torrentInfo *domain.TorrentBasic
|
||||
|
||||
// Some indexers do not announce the size and if size (min,max) is set in a filter then it will need
|
||||
// additional size check. Some indexers have api implemented to fetch this data and for the others
|
||||
// it will download the torrent file to parse and make the size check. This is all to minimize the amount of downloads.
|
||||
|
||||
// do additional size check against indexer api or torrent for size
|
||||
log.Debug().Msgf("filter-service.find_and_check_filters: (%v) additional size check required", f.Name)
|
||||
|
||||
// check if indexer = btn, ptp, ggn or red
|
||||
if release.Indexer == "ptp" || release.Indexer == "btn" || release.Indexer == "ggn" || release.Indexer == "redacted" {
|
||||
// fetch torrent info from api
|
||||
// save outside of loop to check multiple filters with only one fetch
|
||||
if torrentInfo == nil {
|
||||
torrentInfo, err := s.apiService.GetTorrentByID(release.Indexer, release.TorrentID)
|
||||
if err != nil || torrentInfo == nil {
|
||||
log.Error().Stack().Err(err).Msgf("filter-service.find_and_check_filters: (%v) could not get torrent: '%v' from: %v", f.Name, release.TorrentID, release.Indexer)
|
||||
return false, err
|
||||
}
|
||||
|
||||
log.Debug().Msgf("filter-service.find_and_check_filters: (%v) got torrent info: %+v", f.Name, torrentInfo)
|
||||
}
|
||||
|
||||
// compare size against filters
|
||||
match, err := checkSizeFilter(f.MinSize, f.MaxSize, torrentInfo.ReleaseSizeBytes())
|
||||
if err != nil {
|
||||
log.Error().Stack().Err(err).Msgf("filter-service.find_and_check_filters: (%v) could not check size filter", f.Name)
|
||||
return false, err
|
||||
}
|
||||
|
||||
// no match, lets continue to next filter
|
||||
if !match {
|
||||
log.Debug().Msgf("filter-service.find_and_check_filters: (%v) filter did not match after additional size check, trying next", f.Name)
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// store size on the release
|
||||
release.Size = torrentInfo.ReleaseSizeBytes()
|
||||
} else {
|
||||
log.Trace().Msgf("filter-service.find_and_check_filters: (%v) additional size check required: preparing to download metafile", f.Name)
|
||||
|
||||
// if indexer doesn't have api, download torrent and add to tmpPath
|
||||
err := release.DownloadTorrentFile()
|
||||
if err != nil {
|
||||
log.Error().Stack().Err(err).Msgf("filter-service.find_and_check_filters: (%v) could not download torrent file with id: '%v' from: %v", f.Name, release.TorrentID, release.Indexer)
|
||||
return false, err
|
||||
}
|
||||
|
||||
// compare size against filter
|
||||
match, err := checkSizeFilter(f.MinSize, f.MaxSize, release.Size)
|
||||
if err != nil {
|
||||
log.Error().Stack().Err(err).Msgf("filter-service.find_and_check_filters: (%v) could not check size filter", f.Name)
|
||||
return false, err
|
||||
}
|
||||
|
||||
// no match, lets continue to next filter
|
||||
if !match {
|
||||
log.Debug().Msgf("filter-service.find_and_check_filters: (%v) filter did not match after additional size check, trying next", f.Name)
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func checkSizeFilter(minSize string, maxSize string, releaseSize uint64) (bool, error) {
|
||||
|
|
|
@ -10,10 +10,7 @@ import (
|
|||
|
||||
"github.com/autobrr/autobrr/internal/announce"
|
||||
"github.com/autobrr/autobrr/internal/domain"
|
||||
"github.com/autobrr/autobrr/internal/filter"
|
||||
"github.com/autobrr/autobrr/internal/logger"
|
||||
"github.com/autobrr/autobrr/internal/release"
|
||||
|
||||
"github.com/ergochat/irc-go/ircevent"
|
||||
"github.com/ergochat/irc-go/ircmsg"
|
||||
"github.com/rs/zerolog/log"
|
||||
|
@ -57,8 +54,7 @@ func (h *channelHealth) resetMonitoring() {
|
|||
|
||||
type Handler struct {
|
||||
network *domain.IrcNetwork
|
||||
filterService filter.Service
|
||||
releaseService release.Service
|
||||
announceSvc announce.Service
|
||||
announceProcessors map[string]announce.Processor
|
||||
definitions map[string]*domain.IndexerDefinition
|
||||
|
||||
|
@ -75,12 +71,11 @@ type Handler struct {
|
|||
channelHealth map[string]*channelHealth
|
||||
}
|
||||
|
||||
func NewHandler(network domain.IrcNetwork, filterService filter.Service, releaseService release.Service, definitions []domain.IndexerDefinition) *Handler {
|
||||
func NewHandler(network domain.IrcNetwork, definitions []domain.IndexerDefinition, announceSvc announce.Service) *Handler {
|
||||
h := &Handler{
|
||||
client: nil,
|
||||
network: &network,
|
||||
filterService: filterService,
|
||||
releaseService: releaseService,
|
||||
announceSvc: announceSvc,
|
||||
definitions: map[string]*domain.IndexerDefinition{},
|
||||
announceProcessors: map[string]announce.Processor{},
|
||||
validAnnouncers: map[string]struct{}{},
|
||||
|
@ -109,7 +104,7 @@ func (h *Handler) InitIndexers(definitions []domain.IndexerDefinition) {
|
|||
// some channels are defined in mixed case
|
||||
channel = strings.ToLower(channel)
|
||||
|
||||
h.announceProcessors[channel] = announce.NewAnnounceProcessor(definition, h.filterService, h.releaseService)
|
||||
h.announceProcessors[channel] = announce.NewAnnounceProcessor(h.announceSvc, definition)
|
||||
|
||||
h.channelHealth[channel] = &channelHealth{
|
||||
name: channel,
|
||||
|
|
|
@ -3,15 +3,14 @@ package irc
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/pkg/errors"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/autobrr/autobrr/internal/announce"
|
||||
"github.com/autobrr/autobrr/internal/domain"
|
||||
"github.com/autobrr/autobrr/internal/filter"
|
||||
"github.com/autobrr/autobrr/internal/indexer"
|
||||
"github.com/autobrr/autobrr/internal/release"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
|
@ -29,24 +28,22 @@ type Service interface {
|
|||
}
|
||||
|
||||
type service struct {
|
||||
repo domain.IrcRepo
|
||||
filterService filter.Service
|
||||
indexerService indexer.Service
|
||||
releaseService release.Service
|
||||
indexerMap map[string]string
|
||||
handlers map[handlerKey]*Handler
|
||||
repo domain.IrcRepo
|
||||
announceService announce.Service
|
||||
indexerService indexer.Service
|
||||
indexerMap map[string]string
|
||||
handlers map[handlerKey]*Handler
|
||||
|
||||
stopWG sync.WaitGroup
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
func NewService(repo domain.IrcRepo, filterService filter.Service, indexerSvc indexer.Service, releaseSvc release.Service) Service {
|
||||
func NewService(repo domain.IrcRepo, announceSvc announce.Service, indexerSvc indexer.Service) Service {
|
||||
return &service{
|
||||
repo: repo,
|
||||
filterService: filterService,
|
||||
indexerService: indexerSvc,
|
||||
releaseService: releaseSvc,
|
||||
handlers: make(map[handlerKey]*Handler),
|
||||
repo: repo,
|
||||
announceService: announceSvc,
|
||||
indexerService: indexerSvc,
|
||||
handlers: make(map[handlerKey]*Handler),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -80,7 +77,7 @@ func (s *service) StartHandlers() {
|
|||
definitions := s.indexerService.GetIndexersByIRCNetwork(network.Server)
|
||||
|
||||
// init new irc handler
|
||||
handler := NewHandler(network, s.filterService, s.releaseService, definitions)
|
||||
handler := NewHandler(network, definitions, s.announceService)
|
||||
|
||||
// use network.Server + nick to use multiple indexers with different nick per network
|
||||
// this allows for multiple handlers to one network
|
||||
|
@ -136,7 +133,7 @@ func (s *service) startNetwork(network domain.IrcNetwork) error {
|
|||
definitions := s.indexerService.GetIndexersByIRCNetwork(network.Server)
|
||||
|
||||
// init new irc handler
|
||||
handler := NewHandler(network, s.filterService, s.releaseService, definitions)
|
||||
handler := NewHandler(network, definitions, s.announceService)
|
||||
|
||||
s.handlers[handlerKey{network.Server, network.NickServ.Account}] = handler
|
||||
s.lock.Unlock()
|
||||
|
|
|
@ -4,7 +4,9 @@ import (
|
|||
"bytes"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/autobrr/autobrr/internal/domain"
|
||||
|
@ -38,7 +40,7 @@ func discordNotification(event domain.EventsReleasePushed, webhookURL string) {
|
|||
},
|
||||
}
|
||||
|
||||
client := http.Client{Transport: t, Timeout: 15 * time.Second}
|
||||
client := http.Client{Transport: t, Timeout: 30 * time.Second}
|
||||
|
||||
color := map[domain.ReleasePushStatus]int{
|
||||
domain.ReleasePushStatusApproved: 5814783,
|
||||
|
@ -72,8 +74,18 @@ func discordNotification(event domain.EventsReleasePushed, webhookURL string) {
|
|||
{
|
||||
Name: "Action",
|
||||
Value: event.Action,
|
||||
Inline: false,
|
||||
Inline: true,
|
||||
},
|
||||
{
|
||||
Name: "Action type",
|
||||
Value: string(event.ActionType),
|
||||
Inline: true,
|
||||
},
|
||||
//{
|
||||
// Name: "Action client",
|
||||
// Value: event.ActionClient,
|
||||
// Inline: true,
|
||||
//},
|
||||
},
|
||||
Timestamp: time.Now(),
|
||||
},
|
||||
|
@ -81,6 +93,31 @@ func discordNotification(event domain.EventsReleasePushed, webhookURL string) {
|
|||
Username: "brr",
|
||||
}
|
||||
|
||||
if event.ActionClient == "" {
|
||||
rej := DiscordEmbedsFields{
|
||||
Name: "Action client",
|
||||
Value: "n/a",
|
||||
Inline: true,
|
||||
}
|
||||
m.Embeds[0].Fields = append(m.Embeds[0].Fields, rej)
|
||||
} else {
|
||||
rej := DiscordEmbedsFields{
|
||||
Name: "Action client",
|
||||
Value: event.ActionClient,
|
||||
Inline: true,
|
||||
}
|
||||
m.Embeds[0].Fields = append(m.Embeds[0].Fields, rej)
|
||||
}
|
||||
|
||||
if len(event.Rejections) > 0 {
|
||||
rej := DiscordEmbedsFields{
|
||||
Name: "Reasons",
|
||||
Value: fmt.Sprintf("```\n%v\n```", strings.Join(event.Rejections, " ,")),
|
||||
Inline: false,
|
||||
}
|
||||
m.Embeds[0].Fields = append(m.Embeds[0].Fields, rej)
|
||||
}
|
||||
|
||||
jsonData, err := json.Marshal(m)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("discord client could not marshal data: %v", m)
|
||||
|
@ -89,7 +126,7 @@ func discordNotification(event domain.EventsReleasePushed, webhookURL string) {
|
|||
|
||||
req, err := http.NewRequest(http.MethodPost, webhookURL, bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
//log.Error().Err(err).Msgf("webhook client request error: %v", action.WebhookHost)
|
||||
log.Error().Err(err).Msgf("discord client request error: %v", event.ReleaseName)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -98,7 +135,7 @@ func discordNotification(event domain.EventsReleasePushed, webhookURL string) {
|
|||
|
||||
res, err := client.Do(req)
|
||||
if err != nil {
|
||||
//log.Error().Err(err).Msgf("webhook client request error: %v", action.WebhookHost)
|
||||
log.Error().Err(err).Msgf("discord client request error: %v", event.ReleaseName)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -2,10 +2,6 @@ package release
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
"github.com/autobrr/autobrr/internal/action"
|
||||
"github.com/autobrr/autobrr/internal/domain"
|
||||
)
|
||||
|
||||
|
@ -15,19 +11,16 @@ type Service interface {
|
|||
Stats(ctx context.Context) (*domain.ReleaseStats, error)
|
||||
Store(ctx context.Context, release *domain.Release) error
|
||||
StoreReleaseActionStatus(ctx context.Context, actionStatus *domain.ReleaseActionStatus) error
|
||||
Process(release domain.Release) error
|
||||
Delete(ctx context.Context) error
|
||||
}
|
||||
|
||||
type service struct {
|
||||
repo domain.ReleaseRepo
|
||||
actionSvc action.Service
|
||||
repo domain.ReleaseRepo
|
||||
}
|
||||
|
||||
func NewService(repo domain.ReleaseRepo, actionService action.Service) Service {
|
||||
func NewService(repo domain.ReleaseRepo) Service {
|
||||
return &service{
|
||||
repo: repo,
|
||||
actionSvc: actionService,
|
||||
repo: repo,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -56,25 +49,6 @@ func (s *service) StoreReleaseActionStatus(ctx context.Context, actionStatus *do
|
|||
return s.repo.StoreReleaseActionStatus(ctx, actionStatus)
|
||||
}
|
||||
|
||||
func (s *service) Process(release domain.Release) error {
|
||||
log.Trace().Msgf("start to process release: %+v", release)
|
||||
|
||||
if release.Filter.Actions == nil {
|
||||
return fmt.Errorf("no actions for filter: %v", release.Filter.Name)
|
||||
}
|
||||
|
||||
// smart episode?
|
||||
|
||||
// run actions (watchFolder, test, exec, qBittorrent, Deluge etc.)
|
||||
err := s.actionSvc.RunActions(release.Filter.Actions, release)
|
||||
if err != nil {
|
||||
log.Error().Stack().Err(err).Msgf("error running actions for filter: %v", release.Filter.Name)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *service) Delete(ctx context.Context) error {
|
||||
return s.repo.Delete(ctx)
|
||||
}
|
||||
|
|
|
@ -34,7 +34,7 @@ type client struct {
|
|||
func New(config Config) Client {
|
||||
|
||||
httpClient := &http.Client{
|
||||
Timeout: time.Second * 10,
|
||||
Timeout: time.Second * 30,
|
||||
}
|
||||
|
||||
c := &client{
|
||||
|
|
|
@ -23,7 +23,7 @@ var (
|
|||
10 * time.Second,
|
||||
20 * time.Second,
|
||||
}
|
||||
timeout = 20 * time.Second
|
||||
timeout = 60 * time.Second
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
|
|
|
@ -33,7 +33,7 @@ type client struct {
|
|||
func New(config Config) Client {
|
||||
|
||||
httpClient := &http.Client{
|
||||
Timeout: time.Second * 10,
|
||||
Timeout: time.Second * 30,
|
||||
}
|
||||
|
||||
c := &client{
|
||||
|
|
|
@ -34,7 +34,7 @@ type client struct {
|
|||
func New(config Config) Client {
|
||||
|
||||
httpClient := &http.Client{
|
||||
Timeout: time.Second * 10,
|
||||
Timeout: time.Second * 30,
|
||||
}
|
||||
|
||||
c := &client{
|
||||
|
|
|
@ -33,7 +33,7 @@ type client struct {
|
|||
func New(config Config) Client {
|
||||
|
||||
httpClient := &http.Client{
|
||||
Timeout: time.Second * 10,
|
||||
Timeout: time.Second * 30,
|
||||
}
|
||||
|
||||
c := &client{
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue