mirror of
https://github.com/idanoo/autobrr
synced 2025-07-22 16:29:12 +00:00
1031 lines
32 KiB
Go
1031 lines
32 KiB
Go
// Copyright (c) 2021 - 2025, Ludvig Lundgren and the autobrr contributors.
|
|
// SPDX-License-Identifier: GPL-2.0-or-later
|
|
|
|
package filter
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"os/exec"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/autobrr/autobrr/internal/action"
|
|
"github.com/autobrr/autobrr/internal/domain"
|
|
"github.com/autobrr/autobrr/internal/indexer"
|
|
"github.com/autobrr/autobrr/internal/logger"
|
|
"github.com/autobrr/autobrr/internal/releasedownload"
|
|
"github.com/autobrr/autobrr/internal/utils"
|
|
"github.com/autobrr/autobrr/pkg/errors"
|
|
"github.com/autobrr/autobrr/pkg/sharedhttp"
|
|
|
|
"github.com/avast/retry-go/v4"
|
|
"github.com/mattn/go-shellwords"
|
|
"github.com/rs/zerolog"
|
|
)
|
|
|
|
type Service interface {
|
|
FindByID(ctx context.Context, filterID int) (*domain.Filter, error)
|
|
FindByIndexerIdentifier(ctx context.Context, indexer string) ([]*domain.Filter, error)
|
|
Find(ctx context.Context, params domain.FilterQueryParams) ([]domain.Filter, error)
|
|
CheckFilter(ctx context.Context, f *domain.Filter, release *domain.Release) (bool, error)
|
|
ListFilters(ctx context.Context) ([]domain.Filter, error)
|
|
Store(ctx context.Context, filter *domain.Filter) error
|
|
Update(ctx context.Context, filter *domain.Filter) error
|
|
UpdatePartial(ctx context.Context, filter domain.FilterUpdate) error
|
|
Duplicate(ctx context.Context, filterID int) (*domain.Filter, error)
|
|
ToggleEnabled(ctx context.Context, filterID int, enabled bool) error
|
|
Delete(ctx context.Context, filterID int) error
|
|
AdditionalSizeCheck(ctx context.Context, f *domain.Filter, release *domain.Release) (bool, error)
|
|
AdditionalUploaderCheck(ctx context.Context, f *domain.Filter, release *domain.Release) (bool, error)
|
|
AdditionalRecordLabelCheck(ctx context.Context, f *domain.Filter, release *domain.Release) (bool, error)
|
|
CheckSmartEpisodeCanDownload(ctx context.Context, params *domain.SmartEpisodeParams) (bool, error)
|
|
GetDownloadsByFilterId(ctx context.Context, filterID int) (*domain.FilterDownloads, error)
|
|
CheckIsDuplicateRelease(ctx context.Context, profile *domain.DuplicateReleaseProfile, release *domain.Release) (bool, error)
|
|
}
|
|
|
|
type service struct {
|
|
log zerolog.Logger
|
|
repo domain.FilterRepo
|
|
actionService action.Service
|
|
releaseRepo domain.ReleaseRepo
|
|
indexerSvc indexer.Service
|
|
apiService indexer.APIService
|
|
downloadSvc *releasedownload.DownloadService
|
|
|
|
httpClient *http.Client
|
|
}
|
|
|
|
func NewService(log logger.Logger, repo domain.FilterRepo, actionSvc action.Service, releaseRepo domain.ReleaseRepo, apiService indexer.APIService, indexerSvc indexer.Service, downloadSvc *releasedownload.DownloadService) Service {
|
|
return &service{
|
|
log: log.With().Str("module", "filter").Logger(),
|
|
repo: repo,
|
|
releaseRepo: releaseRepo,
|
|
actionService: actionSvc,
|
|
apiService: apiService,
|
|
indexerSvc: indexerSvc,
|
|
downloadSvc: downloadSvc,
|
|
httpClient: &http.Client{
|
|
Timeout: time.Second * 120,
|
|
Transport: sharedhttp.TransportTLSInsecure,
|
|
},
|
|
}
|
|
}
|
|
|
|
func (s *service) Find(ctx context.Context, params domain.FilterQueryParams) ([]domain.Filter, error) {
|
|
// get filters
|
|
filters, err := s.repo.Find(ctx, params)
|
|
if err != nil {
|
|
s.log.Error().Err(err).Msgf("could not find list filters")
|
|
return nil, err
|
|
}
|
|
|
|
ret := make([]domain.Filter, 0)
|
|
|
|
for _, filter := range filters {
|
|
indexers, err := s.indexerSvc.FindByFilterID(ctx, filter.ID)
|
|
if err != nil {
|
|
return ret, err
|
|
}
|
|
filter.Indexers = indexers
|
|
|
|
if filter.MaxDownloads > 0 && filter.MaxDownloadsUnit != "" {
|
|
counts, err := s.repo.GetDownloadsByFilterId(ctx, filter.ID)
|
|
if err != nil {
|
|
return ret, err
|
|
}
|
|
|
|
filter.Downloads = counts
|
|
}
|
|
|
|
ret = append(ret, filter)
|
|
}
|
|
|
|
return ret, nil
|
|
}
|
|
|
|
func (s *service) ListFilters(ctx context.Context) ([]domain.Filter, error) {
|
|
// get filters
|
|
filters, err := s.repo.ListFilters(ctx)
|
|
if err != nil {
|
|
s.log.Error().Err(err).Msgf("could not find list filters")
|
|
return nil, err
|
|
}
|
|
|
|
ret := make([]domain.Filter, 0)
|
|
|
|
for _, filter := range filters {
|
|
indexers, err := s.indexerSvc.FindByFilterID(ctx, filter.ID)
|
|
if err != nil {
|
|
return ret, err
|
|
}
|
|
filter.Indexers = indexers
|
|
|
|
ret = append(ret, filter)
|
|
}
|
|
|
|
return ret, nil
|
|
}
|
|
|
|
func (s *service) FindByID(ctx context.Context, filterID int) (*domain.Filter, error) {
|
|
filter, err := s.repo.FindByID(ctx, filterID)
|
|
if err != nil {
|
|
s.log.Error().Err(err).Msgf("could not find filter for id: %v", filterID)
|
|
return nil, err
|
|
}
|
|
|
|
externalFilters, err := s.repo.FindExternalFiltersByID(ctx, filter.ID)
|
|
if err != nil {
|
|
s.log.Error().Err(err).Msgf("could not find external filters for filter id: %v", filter.ID)
|
|
}
|
|
filter.External = externalFilters
|
|
|
|
actions, err := s.actionService.FindByFilterID(ctx, filter.ID, nil, false)
|
|
if err != nil {
|
|
s.log.Error().Err(err).Msgf("could not find filter actions for filter id: %v", filter.ID)
|
|
}
|
|
filter.Actions = actions
|
|
|
|
indexers, err := s.indexerSvc.FindByFilterID(ctx, filter.ID)
|
|
if err != nil {
|
|
s.log.Error().Err(err).Msgf("could not find indexers for filter: %v", filter.Name)
|
|
return nil, err
|
|
}
|
|
filter.Indexers = indexers
|
|
|
|
return filter, nil
|
|
}
|
|
|
|
func (s *service) FindByIndexerIdentifier(ctx context.Context, indexer string) ([]*domain.Filter, error) {
|
|
// get filters for indexer
|
|
filters, err := s.repo.FindByIndexerIdentifier(ctx, indexer)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// we do not load actions here since we do not need it at this stage
|
|
// only load those after filter has matched
|
|
for _, filter := range filters {
|
|
filter := filter
|
|
|
|
externalFilters, err := s.repo.FindExternalFiltersByID(ctx, filter.ID)
|
|
if err != nil {
|
|
s.log.Error().Err(err).Msgf("could not find external filters for filter id: %v", filter.ID)
|
|
}
|
|
filter.External = externalFilters
|
|
|
|
}
|
|
|
|
return filters, nil
|
|
}
|
|
|
|
func (s *service) GetDownloadsByFilterId(ctx context.Context, filterID int) (*domain.FilterDownloads, error) {
|
|
return s.GetDownloadsByFilterId(ctx, filterID)
|
|
}
|
|
|
|
func (s *service) Store(ctx context.Context, filter *domain.Filter) error {
|
|
if err := filter.Validate(); err != nil {
|
|
s.log.Error().Err(err).Msgf("invalid filter: %v", filter)
|
|
return err
|
|
}
|
|
|
|
if filter.AnnounceTypes == nil || len(filter.AnnounceTypes) == 0 {
|
|
filter.AnnounceTypes = []string{string(domain.AnnounceTypeNew)}
|
|
}
|
|
|
|
if err := s.repo.Store(ctx, filter); err != nil {
|
|
s.log.Error().Err(err).Msgf("could not store filter: %v", filter)
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *service) Update(ctx context.Context, filter *domain.Filter) error {
|
|
err := filter.Validate()
|
|
if err != nil {
|
|
s.log.Error().Err(err).Msgf("validation error filter: %+v", filter)
|
|
return err
|
|
}
|
|
|
|
err = filter.Sanitize()
|
|
if err != nil {
|
|
s.log.Error().Err(err).Msgf("could not sanitize filter: %v", filter)
|
|
return err
|
|
}
|
|
|
|
// update
|
|
err = s.repo.Update(ctx, filter)
|
|
if err != nil {
|
|
s.log.Error().Err(err).Msgf("could not update filter: %s", filter.Name)
|
|
return err
|
|
}
|
|
|
|
// take care of connected indexers
|
|
err = s.repo.StoreIndexerConnections(ctx, filter.ID, filter.Indexers)
|
|
if err != nil {
|
|
s.log.Error().Err(err).Msgf("could not store filter indexer connections: %s", filter.Name)
|
|
return err
|
|
}
|
|
|
|
// take care of connected external filters
|
|
err = s.repo.StoreFilterExternal(ctx, filter.ID, filter.External)
|
|
if err != nil {
|
|
s.log.Error().Err(err).Msgf("could not store external filters: %s", filter.Name)
|
|
return err
|
|
}
|
|
|
|
// take care of filter actions
|
|
actions, err := s.actionService.StoreFilterActions(ctx, int64(filter.ID), filter.Actions)
|
|
if err != nil {
|
|
s.log.Error().Err(err).Msgf("could not store filter actions: %s", filter.Name)
|
|
return err
|
|
}
|
|
|
|
filter.Actions = actions
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *service) UpdatePartial(ctx context.Context, filter domain.FilterUpdate) error {
|
|
// cleanup
|
|
if filter.Shows != nil {
|
|
// replace newline with comma
|
|
clean := strings.ReplaceAll(*filter.Shows, "\n", ",")
|
|
clean = strings.ReplaceAll(clean, ",,", ",")
|
|
|
|
filter.Shows = &clean
|
|
}
|
|
|
|
// update
|
|
if err := s.repo.UpdatePartial(ctx, filter); err != nil {
|
|
s.log.Error().Err(err).Msgf("could not update partial filter: %v", filter.ID)
|
|
return err
|
|
}
|
|
|
|
if filter.Indexers != nil {
|
|
// take care of connected indexers
|
|
if err := s.repo.StoreIndexerConnections(ctx, filter.ID, filter.Indexers); err != nil {
|
|
s.log.Error().Err(err).Msgf("could not store filter indexer connections: %v", filter.Name)
|
|
return err
|
|
}
|
|
}
|
|
|
|
if filter.External != nil {
|
|
// take care of connected external filters
|
|
if err := s.repo.StoreFilterExternal(ctx, filter.ID, filter.External); err != nil {
|
|
s.log.Error().Err(err).Msgf("could not store external filters: %v", filter.Name)
|
|
return err
|
|
}
|
|
}
|
|
|
|
if filter.Actions != nil {
|
|
// take care of filter actions
|
|
if _, err := s.actionService.StoreFilterActions(ctx, int64(filter.ID), filter.Actions); err != nil {
|
|
s.log.Error().Err(err).Msgf("could not store filter actions: %v", filter.ID)
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *service) Duplicate(ctx context.Context, filterID int) (*domain.Filter, error) {
|
|
// find filter with actions, indexers and external filters
|
|
filter, err := s.FindByID(ctx, filterID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// reset id and name
|
|
filter.ID = 0
|
|
filter.Name = fmt.Sprintf("%s Copy", filter.Name)
|
|
filter.Enabled = false
|
|
|
|
// store new filter
|
|
if err := s.repo.Store(ctx, filter); err != nil {
|
|
s.log.Error().Err(err).Msgf("could not update filter: %s", filter.Name)
|
|
return nil, err
|
|
}
|
|
|
|
// take care of connected indexers
|
|
if err := s.repo.StoreIndexerConnections(ctx, filter.ID, filter.Indexers); err != nil {
|
|
s.log.Error().Err(err).Msgf("could not store filter indexer connections: %s", filter.Name)
|
|
return nil, err
|
|
}
|
|
|
|
// reset action id to 0
|
|
for i, a := range filter.Actions {
|
|
a := a
|
|
a.ID = 0
|
|
filter.Actions[i] = a
|
|
}
|
|
|
|
// take care of filter actions
|
|
if _, err := s.actionService.StoreFilterActions(ctx, int64(filter.ID), filter.Actions); err != nil {
|
|
s.log.Error().Err(err).Msgf("could not store filter actions: %s", filter.Name)
|
|
return nil, err
|
|
}
|
|
|
|
// take care of connected external filters
|
|
// the external filters are fetched with FindByID
|
|
if err := s.repo.StoreFilterExternal(ctx, filter.ID, filter.External); err != nil {
|
|
s.log.Error().Err(err).Msgf("could not store external filters: %s", filter.Name)
|
|
return nil, err
|
|
}
|
|
|
|
return filter, nil
|
|
}
|
|
|
|
func (s *service) ToggleEnabled(ctx context.Context, filterID int, enabled bool) error {
|
|
if err := s.repo.ToggleEnabled(ctx, filterID, enabled); err != nil {
|
|
s.log.Error().Err(err).Msg("could not update filter enabled")
|
|
return err
|
|
}
|
|
|
|
s.log.Debug().Msgf("filter.toggle_enabled: update filter '%v' to '%v'", filterID, enabled)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *service) Delete(ctx context.Context, filterID int) error {
|
|
if filterID == 0 {
|
|
return nil
|
|
}
|
|
|
|
// take care of filter actions
|
|
if err := s.actionService.DeleteByFilterID(ctx, filterID); err != nil {
|
|
s.log.Error().Err(err).Msg("could not delete filter actions")
|
|
return err
|
|
}
|
|
|
|
// take care of filter indexers
|
|
if err := s.repo.DeleteIndexerConnections(ctx, filterID); err != nil {
|
|
s.log.Error().Err(err).Msg("could not delete filter indexers")
|
|
return err
|
|
}
|
|
|
|
// delete filter external
|
|
if err := s.repo.DeleteFilterExternal(ctx, filterID); err != nil {
|
|
s.log.Error().Err(err).Msgf("could not delete filter external: %v", filterID)
|
|
return err
|
|
}
|
|
|
|
// delete filter
|
|
if err := s.repo.Delete(ctx, filterID); err != nil {
|
|
s.log.Error().Err(err).Msgf("could not delete filter: %v", filterID)
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *service) CheckFilter(ctx context.Context, f *domain.Filter, release *domain.Release) (bool, error) {
|
|
l := s.log.With().Str("method", "CheckFilter").Logger()
|
|
|
|
l.Debug().Msgf("checking filter: %s with release %s", f.Name, release.TorrentName)
|
|
|
|
l.Trace().Msgf("checking filter: %s %+v", f.Name, f)
|
|
l.Trace().Msgf("checking filter: %s for release: %+v", f.Name, release)
|
|
|
|
// do additional fetch to get download counts for filter
|
|
if f.MaxDownloads > 0 {
|
|
downloadCounts, err := s.repo.GetDownloadsByFilterId(ctx, f.ID)
|
|
if err != nil {
|
|
l.Error().Err(err).Msg("error getting download counters for filter")
|
|
return false, nil
|
|
}
|
|
f.Downloads = downloadCounts
|
|
}
|
|
|
|
rejections, matchedFilter := f.CheckFilter(release)
|
|
if rejections.Len() > 0 {
|
|
l.Debug().Msgf("(%s) for release: %v rejections: (%s)", f.Name, release.TorrentName, rejections.StringTruncated())
|
|
return false, nil
|
|
}
|
|
|
|
if !matchedFilter {
|
|
// if no match, return nil
|
|
return false, nil
|
|
}
|
|
|
|
// smartEpisode check
|
|
if f.SmartEpisode {
|
|
params := &domain.SmartEpisodeParams{
|
|
Title: release.Title,
|
|
Season: release.Season,
|
|
Episode: release.Episode,
|
|
Year: release.Year,
|
|
Month: release.Month,
|
|
Day: release.Day,
|
|
Repack: release.Repack,
|
|
Proper: release.Proper,
|
|
Group: release.Group,
|
|
}
|
|
canDownloadShow, err := s.CheckSmartEpisodeCanDownload(ctx, params)
|
|
if err != nil {
|
|
l.Trace().Msgf("failed smart episode check: %s", f.Name)
|
|
return false, nil
|
|
}
|
|
|
|
if !canDownloadShow {
|
|
l.Trace().Msgf("failed smart episode check: %s", f.Name)
|
|
if params.IsDailyEpisode() {
|
|
f.RejectReasons.Add("smart episode", fmt.Sprintf("not new (%s) daily: %d-%d-%d", release.Title, release.Year, release.Month, release.Day), fmt.Sprintf("expected newer than (%s) daily: %d-%d-%d", release.Title, release.Year, release.Month, release.Day))
|
|
} else {
|
|
f.RejectReasons.Add("smart episode", fmt.Sprintf("not new (%s) season: %d ep: %d", release.Title, release.Season, release.Episode), fmt.Sprintf("expected newer than (%s) season: %d ep: %d", release.Title, release.Season, release.Episode))
|
|
}
|
|
return false, nil
|
|
}
|
|
}
|
|
|
|
// check duplicates
|
|
if f.DuplicateHandling != nil {
|
|
l.Debug().Msgf("(%s) check is duplicate with profile %s", f.Name, f.DuplicateHandling.Name)
|
|
|
|
release.SkipDuplicateProfileID = f.DuplicateHandling.ID
|
|
release.SkipDuplicateProfileName = f.DuplicateHandling.Name
|
|
|
|
isDuplicate, err := s.CheckIsDuplicateRelease(ctx, f.DuplicateHandling, release)
|
|
if err != nil {
|
|
return false, errors.Wrap(err, "error finding duplicate handle")
|
|
}
|
|
|
|
if isDuplicate {
|
|
l.Debug().Msgf("filter %s rejected release %q as duplicate with profile %q", f.Name, release.TorrentName, f.DuplicateHandling.Name)
|
|
f.RejectReasons.Add("duplicate", "duplicate", "not duplicate")
|
|
|
|
// let it continue so external filters can trigger checks
|
|
//return false, nil
|
|
release.IsDuplicate = true
|
|
}
|
|
}
|
|
|
|
// if matched, do additional size check if needed, attach actions and return the filter
|
|
|
|
l.Debug().Msgf("found and matched filter: %s", f.Name)
|
|
|
|
// If size constraints are set in a filter and the indexer did not
|
|
// announce the size, we need to do an additional out of band size check.
|
|
if release.AdditionalSizeCheckRequired {
|
|
l.Debug().Msgf("(%s) additional size check required", f.Name)
|
|
|
|
ok, err := s.AdditionalSizeCheck(ctx, f, release)
|
|
if err != nil {
|
|
l.Error().Err(err).Msgf("(%s) additional size check error", f.Name)
|
|
return false, err
|
|
}
|
|
|
|
if !ok {
|
|
l.Trace().Msgf("(%s) additional size check not matching what filter wanted", f.Name)
|
|
return false, nil
|
|
}
|
|
}
|
|
|
|
// check uploader if the indexer supports check via api
|
|
if release.AdditionalUploaderCheckRequired {
|
|
l.Debug().Msgf("(%s) additional uploader check required", f.Name)
|
|
|
|
ok, err := s.AdditionalUploaderCheck(ctx, f, release)
|
|
if err != nil {
|
|
l.Error().Err(err).Msgf("(%s) additional uploader check error", f.Name)
|
|
return false, err
|
|
}
|
|
|
|
if !ok {
|
|
l.Trace().Msgf("(%s) additional uploader check not matching what filter wanted", f.Name)
|
|
return false, nil
|
|
}
|
|
}
|
|
|
|
if release.AdditionalRecordLabelCheckRequired {
|
|
l.Debug().Msgf("(%s) additional record label check required", f.Name)
|
|
|
|
ok, err := s.AdditionalRecordLabelCheck(ctx, f, release)
|
|
if err != nil {
|
|
l.Error().Err(err).Msgf("(%s) additional record label check error", f.Name)
|
|
return false, err
|
|
}
|
|
|
|
if !ok {
|
|
l.Trace().Msgf("(%s) additional record label check not matching what filter wanted", f.Name)
|
|
return false, nil
|
|
}
|
|
}
|
|
|
|
// run external filters
|
|
if f.External != nil {
|
|
externalOk, err := s.RunExternalFilters(ctx, f, f.External, release)
|
|
if err != nil {
|
|
l.Error().Err(err).Msgf("(%s) external filter check error", f.Name)
|
|
return false, err
|
|
}
|
|
|
|
if !externalOk {
|
|
l.Debug().Msgf("(%s) external filter check not matching what filter wanted", f.Name)
|
|
return false, nil
|
|
}
|
|
}
|
|
|
|
return true, nil
|
|
}
|
|
|
|
// AdditionalSizeCheck performs additional out-of-band checks to determine the
|
|
// values of a torrent. Some indexers do not announce torrent size, so it is
|
|
// necessary to determine the size of the torrent in some other way. Some
|
|
// indexers have an API implemented to fetch this data. For those which don't,
|
|
// it is necessary to download the torrent file and parse it to make the size
|
|
// check. We use the API where available to minimize the number of torrents we
|
|
// need to download.
|
|
func (s *service) AdditionalSizeCheck(ctx context.Context, f *domain.Filter, release *domain.Release) (ok bool, err error) {
|
|
defer func() {
|
|
// try recover panic if anything went wrong with API or size checks
|
|
errors.RecoverPanic(recover(), &err)
|
|
}()
|
|
|
|
// do additional size check against indexer api or torrent for size
|
|
l := s.log.With().Str("method", "AdditionalSizeCheck").Logger()
|
|
|
|
l.Debug().Msgf("(%s) additional api size check required", f.Name)
|
|
|
|
switch release.Indexer.Identifier {
|
|
case "btn", "ggn", "redacted", "ops", "mock":
|
|
if (release.Size == 0 && release.AdditionalSizeCheckRequired) || (release.Uploader == "" && release.AdditionalUploaderCheckRequired) || (release.RecordLabel == "" && release.AdditionalRecordLabelCheckRequired) {
|
|
l.Trace().Msgf("(%s) preparing to check size via api", f.Name)
|
|
|
|
torrentInfo, err := s.apiService.GetTorrentByID(ctx, release.Indexer.Identifier, release.TorrentID, f.FreeleechToken)
|
|
if err != nil || torrentInfo == nil {
|
|
l.Error().Err(err).Msgf("(%s) could not get torrent info from api: '%s' from: %s", f.Name, release.TorrentID, release.Indexer.Identifier)
|
|
return false, err
|
|
}
|
|
|
|
l.Debug().Msgf("(%s) got torrent info from api: %+v", f.Name, torrentInfo)
|
|
|
|
torrentSize := torrentInfo.ReleaseSizeBytes()
|
|
if release.Size == 0 && torrentSize > 0 {
|
|
release.Size = torrentSize
|
|
}
|
|
|
|
if release.Uploader == "" {
|
|
release.Uploader = torrentInfo.Uploader
|
|
}
|
|
|
|
if release.RecordLabel == "" {
|
|
release.RecordLabel = torrentInfo.RecordLabel
|
|
}
|
|
}
|
|
|
|
default:
|
|
if release.Size == 0 && release.AdditionalSizeCheckRequired {
|
|
l.Trace().Msgf("(%s) preparing to download torrent metafile", f.Name)
|
|
|
|
// if indexer doesn't have api, download torrent and add to tmpPath
|
|
if err := s.downloadSvc.DownloadRelease(ctx, release); err != nil {
|
|
l.Error().Err(err).Msgf("(%s) could not download torrent file with id: '%s' from: %s", f.Name, release.TorrentID, release.Indexer.Identifier)
|
|
return false, errors.Wrap(err, "could not download torrent file for release: %s", release.TorrentName)
|
|
}
|
|
}
|
|
}
|
|
|
|
sizeOk, err := f.CheckReleaseSize(release.Size)
|
|
if err != nil {
|
|
l.Error().Err(err).Msgf("(%s) error comparing release and filter size", f.Name)
|
|
return false, err
|
|
}
|
|
|
|
// reset AdditionalSizeCheckRequired to not re-trigger check
|
|
release.AdditionalSizeCheckRequired = false
|
|
|
|
if !sizeOk {
|
|
l.Debug().Msgf("(%s) filter did not match after additional size check, trying next", f.Name)
|
|
return false, nil
|
|
}
|
|
|
|
return true, nil
|
|
}
|
|
|
|
func (s *service) AdditionalUploaderCheck(ctx context.Context, f *domain.Filter, release *domain.Release) (ok bool, err error) {
|
|
defer func() {
|
|
// try recover panic if anything went wrong with API or size checks
|
|
errors.RecoverPanic(recover(), &err)
|
|
}()
|
|
|
|
// do additional check against indexer api
|
|
l := s.log.With().Str("method", "AdditionalUploaderCheck").Logger()
|
|
|
|
// if uploader was fetched before during size check we check it and return early
|
|
if release.Uploader != "" {
|
|
uploaderOk, err := f.CheckUploader(release.Uploader)
|
|
if err != nil {
|
|
l.Error().Err(err).Msgf("(%s) error comparing release and uploaders", f.Name)
|
|
return false, err
|
|
}
|
|
|
|
// reset AdditionalUploaderCheckRequired to not re-trigger check
|
|
release.AdditionalUploaderCheckRequired = false
|
|
|
|
if !uploaderOk {
|
|
l.Debug().Msgf("(%s) filter did not match after additional uploaders check, trying next", f.Name)
|
|
return false, nil
|
|
}
|
|
|
|
return true, nil
|
|
}
|
|
|
|
l.Debug().Msgf("(%s) additional api uploader check required", f.Name)
|
|
|
|
switch release.Indexer.Identifier {
|
|
case "redacted", "ops", "mock":
|
|
l.Trace().Msgf("(%s) preparing to check via api", f.Name)
|
|
|
|
torrentInfo, err := s.apiService.GetTorrentByID(ctx, release.Indexer.Identifier, release.TorrentID, f.FreeleechToken)
|
|
if err != nil || torrentInfo == nil {
|
|
l.Error().Err(err).Msgf("(%s) could not get torrent info from api: '%s' from: %s", f.Name, release.TorrentID, release.Indexer.Identifier)
|
|
return false, err
|
|
}
|
|
|
|
l.Debug().Msgf("(%s) got torrent info from api: %+v", f.Name, torrentInfo)
|
|
|
|
torrentSize := torrentInfo.ReleaseSizeBytes()
|
|
if release.Size == 0 && torrentSize > 0 {
|
|
release.Size = torrentSize
|
|
}
|
|
|
|
if release.RecordLabel == "" {
|
|
release.RecordLabel = torrentInfo.RecordLabel
|
|
}
|
|
|
|
if release.Uploader == "" {
|
|
release.Uploader = torrentInfo.Uploader
|
|
}
|
|
|
|
default:
|
|
return false, errors.New("additional uploader check not supported for this indexer: %s", release.Indexer.Identifier)
|
|
}
|
|
|
|
uploaderOk, err := f.CheckUploader(release.Uploader)
|
|
if err != nil {
|
|
l.Error().Err(err).Msgf("(%s) error comparing release and uploaders", f.Name)
|
|
return false, err
|
|
}
|
|
|
|
// reset AdditionalUploaderCheckRequired to not re-trigger check
|
|
release.AdditionalUploaderCheckRequired = false
|
|
|
|
if !uploaderOk {
|
|
l.Debug().Msgf("(%s) filter did not match after additional uploaders check, trying next", f.Name)
|
|
return false, nil
|
|
}
|
|
|
|
return true, nil
|
|
}
|
|
|
|
func (s *service) AdditionalRecordLabelCheck(ctx context.Context, f *domain.Filter, release *domain.Release) (ok bool, err error) {
|
|
defer func() {
|
|
// try recover panic if anything went wrong with API or size checks
|
|
errors.RecoverPanic(recover(), &err)
|
|
if err != nil {
|
|
ok = false
|
|
}
|
|
}()
|
|
|
|
// do additional check against indexer api
|
|
l := s.log.With().Str("method", "AdditionalRecordLabelCheck").Logger()
|
|
|
|
// if record label was fetched before during size check or uploader check we check it and return early
|
|
if release.RecordLabel != "" {
|
|
recordLabelOk, err := f.CheckRecordLabel(release.RecordLabel)
|
|
if err != nil {
|
|
l.Error().Err(err).Msgf("(%s) error comparing release and record label", f.Name)
|
|
return false, err
|
|
}
|
|
|
|
// reset AdditionalRecordLabelCheckRequired to not re-trigger check
|
|
release.AdditionalRecordLabelCheckRequired = false
|
|
|
|
if !recordLabelOk {
|
|
l.Debug().Msgf("(%s) filter did not match after additional record label check, trying next", f.Name)
|
|
return false, nil
|
|
}
|
|
|
|
return true, nil
|
|
}
|
|
|
|
l.Debug().Msgf("(%s) additional api record label check required", f.Name)
|
|
|
|
switch release.Indexer.Identifier {
|
|
case "redacted", "ops", "mock":
|
|
l.Trace().Msgf("(%s) preparing to check via api", f.Name)
|
|
|
|
torrentInfo, err := s.apiService.GetTorrentByID(ctx, release.Indexer.Identifier, release.TorrentID, f.FreeleechToken)
|
|
if err != nil || torrentInfo == nil {
|
|
l.Error().Err(err).Msgf("(%s) could not get torrent info from api: '%s' from: %s", f.Name, release.TorrentID, release.Indexer.Identifier)
|
|
return false, err
|
|
}
|
|
|
|
l.Debug().Msgf("(%s) got torrent info from api: %+v", f.Name, torrentInfo)
|
|
|
|
torrentSize := torrentInfo.ReleaseSizeBytes()
|
|
if release.Size == 0 && torrentSize > 0 {
|
|
release.Size = torrentSize
|
|
}
|
|
|
|
if release.Uploader == "" {
|
|
release.Uploader = torrentInfo.Uploader
|
|
}
|
|
|
|
if release.RecordLabel == "" {
|
|
release.RecordLabel = torrentInfo.RecordLabel
|
|
}
|
|
|
|
default:
|
|
return false, errors.New("additional record label check not supported for this indexer: %s", release.Indexer.Identifier)
|
|
}
|
|
|
|
recordLabelOk, err := f.CheckRecordLabel(release.RecordLabel)
|
|
if err != nil {
|
|
l.Error().Err(err).Msgf("(%s) error comparing release and record label", f.Name)
|
|
return false, err
|
|
}
|
|
|
|
// reset AdditionalRecordLabelCheckRequired to not re-trigger check
|
|
release.AdditionalRecordLabelCheckRequired = false
|
|
|
|
if !recordLabelOk {
|
|
l.Debug().Msgf("(%s) filter did not match after additional record label check, trying next", f.Name)
|
|
return false, nil
|
|
}
|
|
|
|
return true, nil
|
|
}
|
|
|
|
func (s *service) CheckSmartEpisodeCanDownload(ctx context.Context, params *domain.SmartEpisodeParams) (bool, error) {
|
|
return s.releaseRepo.CheckSmartEpisodeCanDownload(ctx, params)
|
|
}
|
|
|
|
func (s *service) CheckIsDuplicateRelease(ctx context.Context, profile *domain.DuplicateReleaseProfile, release *domain.Release) (bool, error) {
|
|
return s.releaseRepo.CheckIsDuplicateRelease(ctx, profile, release)
|
|
}
|
|
|
|
func (s *service) RunExternalFilters(ctx context.Context, f *domain.Filter, externalFilters []domain.FilterExternal, release *domain.Release) (ok bool, err error) {
|
|
defer func() {
|
|
// try recover panic if anything went wrong with the external filter checks
|
|
errors.RecoverPanic(recover(), &err)
|
|
if err != nil {
|
|
s.log.Error().Err(err).Msgf("filter %s external filter check panic", f.Name)
|
|
ok = false
|
|
}
|
|
}()
|
|
|
|
// sort filters by index
|
|
sort.Slice(externalFilters, func(i, j int) bool {
|
|
return externalFilters[i].Index < externalFilters[j].Index
|
|
})
|
|
|
|
for _, external := range externalFilters {
|
|
if !external.Enabled {
|
|
s.log.Debug().Msgf("external filter %s not enabled, skipping...", external.Name)
|
|
|
|
continue
|
|
}
|
|
|
|
if external.NeedTorrentDownloaded() {
|
|
if err := s.downloadSvc.DownloadRelease(ctx, release); err != nil {
|
|
return false, errors.Wrap(err, "could not download torrent file for release: %s", release.TorrentName)
|
|
}
|
|
}
|
|
|
|
switch external.Type {
|
|
case domain.ExternalFilterTypeExec:
|
|
// run external script
|
|
exitCode, err := s.execCmd(ctx, external, release)
|
|
if err != nil {
|
|
return false, errors.Wrap(err, "error executing external command")
|
|
}
|
|
|
|
if exitCode != external.ExecExpectStatus {
|
|
s.log.Trace().Msgf("filter.Service.CheckFilter: external script unexpected exit code. got: %d want: %d", exitCode, external.ExecExpectStatus)
|
|
f.RejectReasons.Add("external script exit code", exitCode, external.ExecExpectStatus)
|
|
return false, nil
|
|
}
|
|
|
|
case domain.ExternalFilterTypeWebhook:
|
|
// run external webhook
|
|
statusCode, err := s.webhook(ctx, external, release)
|
|
if err != nil {
|
|
return false, errors.Wrap(err, "error executing external webhook")
|
|
}
|
|
|
|
if statusCode != external.WebhookExpectStatus {
|
|
s.log.Trace().Msgf("filter.Service.CheckFilter: external webhook unexpected status code. got: %d want: %d", statusCode, external.WebhookExpectStatus)
|
|
f.RejectReasons.Add("external webhook status code", statusCode, external.WebhookExpectStatus)
|
|
return false, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
return true, nil
|
|
}
|
|
|
|
func (s *service) execCmd(_ context.Context, external domain.FilterExternal, release *domain.Release) (int, error) {
|
|
s.log.Trace().Msgf("filter exec release: %s", release.TorrentName)
|
|
|
|
// read the file into bytes we can then use in the macro
|
|
if len(release.TorrentDataRawBytes) == 0 && release.TorrentTmpFile != "" {
|
|
if err := release.OpenTorrentFile(); err != nil {
|
|
return 0, errors.Wrap(err, "could not open torrent file for release: %s", release.TorrentName)
|
|
}
|
|
}
|
|
|
|
// check if program exists
|
|
cmd, err := exec.LookPath(external.ExecCmd)
|
|
if err != nil {
|
|
return 0, errors.Wrap(err, "exec failed, could not find program: %s", cmd)
|
|
}
|
|
|
|
// handle args and replace vars
|
|
m := domain.NewMacro(*release)
|
|
|
|
// parse and replace values in argument string before continuing
|
|
parsedArgs, err := m.Parse(external.ExecArgs)
|
|
if err != nil {
|
|
return 0, errors.Wrap(err, "could not parse macro")
|
|
}
|
|
|
|
// we need to split on space into a string slice, so we can spread the args into exec
|
|
p := shellwords.NewParser()
|
|
p.ParseBacktick = true
|
|
commandArgs, err := p.Parse(parsedArgs)
|
|
if err != nil {
|
|
return 0, errors.Wrap(err, "could not parse into shell-words")
|
|
}
|
|
|
|
start := time.Now()
|
|
|
|
// setup command and args
|
|
command := exec.Command(cmd, commandArgs...)
|
|
|
|
s.log.Debug().Msgf("script: %s args: %s", cmd, strings.Join(commandArgs, " "))
|
|
|
|
// Create a pipe to capture the standard output of the command
|
|
cmdOutput, err := command.StdoutPipe()
|
|
if err != nil {
|
|
s.log.Error().Err(err).Msg("could not create stdout pipe")
|
|
return 0, err
|
|
}
|
|
|
|
duration := time.Since(start)
|
|
|
|
// Start the command
|
|
if err := command.Start(); err != nil {
|
|
s.log.Error().Err(err).Msg("error starting command")
|
|
return 0, err
|
|
}
|
|
|
|
// Create a buffer to store the output
|
|
outputBuffer := make([]byte, 4096)
|
|
|
|
execLogger := s.log.With().Str("release", release.TorrentName).Str("filter", release.FilterName).Logger()
|
|
|
|
for {
|
|
// Read the output into the buffer
|
|
n, err := cmdOutput.Read(outputBuffer)
|
|
if err != nil {
|
|
break
|
|
}
|
|
|
|
// Write the output to the logger
|
|
execLogger.Trace().Msg(string(outputBuffer[:n]))
|
|
}
|
|
|
|
// Wait for the command to finish and check for any errors
|
|
if err := command.Wait(); err != nil {
|
|
var exitErr *exec.ExitError
|
|
if errors.As(err, &exitErr) {
|
|
s.log.Debug().Msgf("filter script command exited with non zero code: %v", exitErr.ExitCode())
|
|
return exitErr.ExitCode(), nil
|
|
}
|
|
|
|
s.log.Error().Err(err).Msg("error waiting for command")
|
|
return 0, err
|
|
}
|
|
|
|
s.log.Debug().Msgf("executed external script: (%s), args: (%s) for release: (%s) indexer: (%s) total time (%s)", cmd, parsedArgs, release.TorrentName, release.Indexer.Name, duration)
|
|
|
|
return 0, nil
|
|
}
|
|
|
|
func (s *service) webhook(ctx context.Context, external domain.FilterExternal, release *domain.Release) (int, error) {
|
|
s.log.Trace().Msgf("preparing to run external webhook filter to: (%s) payload: (%s)", external.WebhookHost, external.WebhookData)
|
|
|
|
if external.WebhookHost == "" {
|
|
return 0, errors.New("external filter: missing host for webhook")
|
|
}
|
|
|
|
// if webhook data contains TorrentDataRawBytes, lets read the file into bytes we can then use in the macro
|
|
if len(release.TorrentDataRawBytes) == 0 && strings.Contains(external.WebhookData, "TorrentDataRawBytes") {
|
|
if err := release.OpenTorrentFile(); err != nil {
|
|
return 0, errors.Wrap(err, "could not open torrent file for release: %s", release.TorrentName)
|
|
}
|
|
}
|
|
|
|
m := domain.NewMacro(*release)
|
|
|
|
// parse and replace values in argument string before continuing
|
|
dataArgs, err := m.Parse(external.WebhookData)
|
|
if err != nil {
|
|
return 0, errors.Wrap(err, "could not parse webhook data macro: %s", external.WebhookData)
|
|
}
|
|
|
|
s.log.Trace().Msgf("sending %s to external webhook filter: (%s) payload: (%s)", external.WebhookMethod, external.WebhookHost, external.WebhookData)
|
|
|
|
method := http.MethodPost
|
|
if external.WebhookMethod != "" {
|
|
method = external.WebhookMethod
|
|
}
|
|
|
|
req, err := http.NewRequestWithContext(ctx, method, external.WebhookHost, nil)
|
|
if err != nil {
|
|
return 0, errors.Wrap(err, "could not build request for webhook")
|
|
}
|
|
|
|
req.Header.Set("Content-Type", "application/json")
|
|
req.Header.Set("User-Agent", "autobrr")
|
|
|
|
if external.WebhookHeaders != "" {
|
|
headers := strings.Split(external.WebhookHeaders, ";")
|
|
|
|
for _, header := range headers {
|
|
h := strings.Split(header, "=")
|
|
|
|
if len(h) != 2 {
|
|
continue
|
|
}
|
|
|
|
// add header to req
|
|
req.Header.Add(h[0], h[1]) // go already canonicalizes the provided header key.
|
|
}
|
|
}
|
|
|
|
var opts []retry.Option
|
|
|
|
opts = append(opts, retry.DelayType(retry.FixedDelay))
|
|
opts = append(opts, retry.LastErrorOnly(true))
|
|
|
|
if external.WebhookRetryAttempts > 0 {
|
|
opts = append(opts, retry.Attempts(uint(external.WebhookRetryAttempts)))
|
|
}
|
|
if external.WebhookRetryDelaySeconds > 0 {
|
|
opts = append(opts, retry.Delay(time.Duration(external.WebhookRetryDelaySeconds)*time.Second))
|
|
}
|
|
|
|
var retryStatusCodes []string
|
|
if external.WebhookRetryStatus != "" {
|
|
retryStatusCodes = strings.Split(strings.ReplaceAll(external.WebhookRetryStatus, " ", ""), ",")
|
|
}
|
|
|
|
start := time.Now()
|
|
|
|
statusCode, err := retry.DoWithData(
|
|
func() (int, error) {
|
|
clonereq := req.Clone(ctx)
|
|
if external.WebhookData != "" && dataArgs != "" {
|
|
clonereq.Body = io.NopCloser(bytes.NewBufferString(dataArgs))
|
|
}
|
|
res, err := s.httpClient.Do(clonereq)
|
|
if err != nil {
|
|
return 0, errors.Wrap(err, "could not make request for webhook")
|
|
}
|
|
|
|
defer res.Body.Close()
|
|
|
|
s.log.Debug().Msgf("filter external webhook response status: %d", res.StatusCode)
|
|
|
|
if s.log.Debug().Enabled() {
|
|
body, err := io.ReadAll(res.Body)
|
|
if err != nil {
|
|
return res.StatusCode, errors.Wrap(err, "could not read request body")
|
|
}
|
|
|
|
if len(body) > 0 {
|
|
s.log.Debug().Msgf("filter external webhook response status: %d body: %s", res.StatusCode, body)
|
|
}
|
|
}
|
|
|
|
if utils.StrSliceContains(retryStatusCodes, strconv.Itoa(res.StatusCode)) {
|
|
return 0, errors.New("webhook got unwanted status code: %d", res.StatusCode)
|
|
}
|
|
|
|
return res.StatusCode, nil
|
|
},
|
|
opts...)
|
|
|
|
s.log.Debug().Msgf("successfully ran external webhook filter to: (%s) payload: (%s) finished in %s", external.WebhookHost, dataArgs, time.Since(start))
|
|
|
|
return statusCode, err
|
|
}
|