autobrr/internal/filter/service.go
ze0s ef7317dde6
feat(filters): show current download count in list (#2001)
* feat(filters): show current and max downloads in list

* feat(filters): remove unused func param
2025-03-16 18:33:47 +01:00

1031 lines
32 KiB
Go

// Copyright (c) 2021 - 2025, Ludvig Lundgren and the autobrr contributors.
// SPDX-License-Identifier: GPL-2.0-or-later
package filter
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"os/exec"
"sort"
"strconv"
"strings"
"time"
"github.com/autobrr/autobrr/internal/action"
"github.com/autobrr/autobrr/internal/domain"
"github.com/autobrr/autobrr/internal/indexer"
"github.com/autobrr/autobrr/internal/logger"
"github.com/autobrr/autobrr/internal/releasedownload"
"github.com/autobrr/autobrr/internal/utils"
"github.com/autobrr/autobrr/pkg/errors"
"github.com/autobrr/autobrr/pkg/sharedhttp"
"github.com/avast/retry-go/v4"
"github.com/mattn/go-shellwords"
"github.com/rs/zerolog"
)
type Service interface {
FindByID(ctx context.Context, filterID int) (*domain.Filter, error)
FindByIndexerIdentifier(ctx context.Context, indexer string) ([]*domain.Filter, error)
Find(ctx context.Context, params domain.FilterQueryParams) ([]domain.Filter, error)
CheckFilter(ctx context.Context, f *domain.Filter, release *domain.Release) (bool, error)
ListFilters(ctx context.Context) ([]domain.Filter, error)
Store(ctx context.Context, filter *domain.Filter) error
Update(ctx context.Context, filter *domain.Filter) error
UpdatePartial(ctx context.Context, filter domain.FilterUpdate) error
Duplicate(ctx context.Context, filterID int) (*domain.Filter, error)
ToggleEnabled(ctx context.Context, filterID int, enabled bool) error
Delete(ctx context.Context, filterID int) error
AdditionalSizeCheck(ctx context.Context, f *domain.Filter, release *domain.Release) (bool, error)
AdditionalUploaderCheck(ctx context.Context, f *domain.Filter, release *domain.Release) (bool, error)
AdditionalRecordLabelCheck(ctx context.Context, f *domain.Filter, release *domain.Release) (bool, error)
CheckSmartEpisodeCanDownload(ctx context.Context, params *domain.SmartEpisodeParams) (bool, error)
GetDownloadsByFilterId(ctx context.Context, filterID int) (*domain.FilterDownloads, error)
CheckIsDuplicateRelease(ctx context.Context, profile *domain.DuplicateReleaseProfile, release *domain.Release) (bool, error)
}
type service struct {
log zerolog.Logger
repo domain.FilterRepo
actionService action.Service
releaseRepo domain.ReleaseRepo
indexerSvc indexer.Service
apiService indexer.APIService
downloadSvc *releasedownload.DownloadService
httpClient *http.Client
}
func NewService(log logger.Logger, repo domain.FilterRepo, actionSvc action.Service, releaseRepo domain.ReleaseRepo, apiService indexer.APIService, indexerSvc indexer.Service, downloadSvc *releasedownload.DownloadService) Service {
return &service{
log: log.With().Str("module", "filter").Logger(),
repo: repo,
releaseRepo: releaseRepo,
actionService: actionSvc,
apiService: apiService,
indexerSvc: indexerSvc,
downloadSvc: downloadSvc,
httpClient: &http.Client{
Timeout: time.Second * 120,
Transport: sharedhttp.TransportTLSInsecure,
},
}
}
func (s *service) Find(ctx context.Context, params domain.FilterQueryParams) ([]domain.Filter, error) {
// get filters
filters, err := s.repo.Find(ctx, params)
if err != nil {
s.log.Error().Err(err).Msgf("could not find list filters")
return nil, err
}
ret := make([]domain.Filter, 0)
for _, filter := range filters {
indexers, err := s.indexerSvc.FindByFilterID(ctx, filter.ID)
if err != nil {
return ret, err
}
filter.Indexers = indexers
if filter.MaxDownloads > 0 && filter.MaxDownloadsUnit != "" {
counts, err := s.repo.GetDownloadsByFilterId(ctx, filter.ID)
if err != nil {
return ret, err
}
filter.Downloads = counts
}
ret = append(ret, filter)
}
return ret, nil
}
func (s *service) ListFilters(ctx context.Context) ([]domain.Filter, error) {
// get filters
filters, err := s.repo.ListFilters(ctx)
if err != nil {
s.log.Error().Err(err).Msgf("could not find list filters")
return nil, err
}
ret := make([]domain.Filter, 0)
for _, filter := range filters {
indexers, err := s.indexerSvc.FindByFilterID(ctx, filter.ID)
if err != nil {
return ret, err
}
filter.Indexers = indexers
ret = append(ret, filter)
}
return ret, nil
}
func (s *service) FindByID(ctx context.Context, filterID int) (*domain.Filter, error) {
filter, err := s.repo.FindByID(ctx, filterID)
if err != nil {
s.log.Error().Err(err).Msgf("could not find filter for id: %v", filterID)
return nil, err
}
externalFilters, err := s.repo.FindExternalFiltersByID(ctx, filter.ID)
if err != nil {
s.log.Error().Err(err).Msgf("could not find external filters for filter id: %v", filter.ID)
}
filter.External = externalFilters
actions, err := s.actionService.FindByFilterID(ctx, filter.ID, nil, false)
if err != nil {
s.log.Error().Err(err).Msgf("could not find filter actions for filter id: %v", filter.ID)
}
filter.Actions = actions
indexers, err := s.indexerSvc.FindByFilterID(ctx, filter.ID)
if err != nil {
s.log.Error().Err(err).Msgf("could not find indexers for filter: %v", filter.Name)
return nil, err
}
filter.Indexers = indexers
return filter, nil
}
func (s *service) FindByIndexerIdentifier(ctx context.Context, indexer string) ([]*domain.Filter, error) {
// get filters for indexer
filters, err := s.repo.FindByIndexerIdentifier(ctx, indexer)
if err != nil {
return nil, err
}
// we do not load actions here since we do not need it at this stage
// only load those after filter has matched
for _, filter := range filters {
filter := filter
externalFilters, err := s.repo.FindExternalFiltersByID(ctx, filter.ID)
if err != nil {
s.log.Error().Err(err).Msgf("could not find external filters for filter id: %v", filter.ID)
}
filter.External = externalFilters
}
return filters, nil
}
func (s *service) GetDownloadsByFilterId(ctx context.Context, filterID int) (*domain.FilterDownloads, error) {
return s.GetDownloadsByFilterId(ctx, filterID)
}
func (s *service) Store(ctx context.Context, filter *domain.Filter) error {
if err := filter.Validate(); err != nil {
s.log.Error().Err(err).Msgf("invalid filter: %v", filter)
return err
}
if filter.AnnounceTypes == nil || len(filter.AnnounceTypes) == 0 {
filter.AnnounceTypes = []string{string(domain.AnnounceTypeNew)}
}
if err := s.repo.Store(ctx, filter); err != nil {
s.log.Error().Err(err).Msgf("could not store filter: %v", filter)
return err
}
return nil
}
func (s *service) Update(ctx context.Context, filter *domain.Filter) error {
err := filter.Validate()
if err != nil {
s.log.Error().Err(err).Msgf("validation error filter: %+v", filter)
return err
}
err = filter.Sanitize()
if err != nil {
s.log.Error().Err(err).Msgf("could not sanitize filter: %v", filter)
return err
}
// update
err = s.repo.Update(ctx, filter)
if err != nil {
s.log.Error().Err(err).Msgf("could not update filter: %s", filter.Name)
return err
}
// take care of connected indexers
err = s.repo.StoreIndexerConnections(ctx, filter.ID, filter.Indexers)
if err != nil {
s.log.Error().Err(err).Msgf("could not store filter indexer connections: %s", filter.Name)
return err
}
// take care of connected external filters
err = s.repo.StoreFilterExternal(ctx, filter.ID, filter.External)
if err != nil {
s.log.Error().Err(err).Msgf("could not store external filters: %s", filter.Name)
return err
}
// take care of filter actions
actions, err := s.actionService.StoreFilterActions(ctx, int64(filter.ID), filter.Actions)
if err != nil {
s.log.Error().Err(err).Msgf("could not store filter actions: %s", filter.Name)
return err
}
filter.Actions = actions
return nil
}
func (s *service) UpdatePartial(ctx context.Context, filter domain.FilterUpdate) error {
// cleanup
if filter.Shows != nil {
// replace newline with comma
clean := strings.ReplaceAll(*filter.Shows, "\n", ",")
clean = strings.ReplaceAll(clean, ",,", ",")
filter.Shows = &clean
}
// update
if err := s.repo.UpdatePartial(ctx, filter); err != nil {
s.log.Error().Err(err).Msgf("could not update partial filter: %v", filter.ID)
return err
}
if filter.Indexers != nil {
// take care of connected indexers
if err := s.repo.StoreIndexerConnections(ctx, filter.ID, filter.Indexers); err != nil {
s.log.Error().Err(err).Msgf("could not store filter indexer connections: %v", filter.Name)
return err
}
}
if filter.External != nil {
// take care of connected external filters
if err := s.repo.StoreFilterExternal(ctx, filter.ID, filter.External); err != nil {
s.log.Error().Err(err).Msgf("could not store external filters: %v", filter.Name)
return err
}
}
if filter.Actions != nil {
// take care of filter actions
if _, err := s.actionService.StoreFilterActions(ctx, int64(filter.ID), filter.Actions); err != nil {
s.log.Error().Err(err).Msgf("could not store filter actions: %v", filter.ID)
return err
}
}
return nil
}
func (s *service) Duplicate(ctx context.Context, filterID int) (*domain.Filter, error) {
// find filter with actions, indexers and external filters
filter, err := s.FindByID(ctx, filterID)
if err != nil {
return nil, err
}
// reset id and name
filter.ID = 0
filter.Name = fmt.Sprintf("%s Copy", filter.Name)
filter.Enabled = false
// store new filter
if err := s.repo.Store(ctx, filter); err != nil {
s.log.Error().Err(err).Msgf("could not update filter: %s", filter.Name)
return nil, err
}
// take care of connected indexers
if err := s.repo.StoreIndexerConnections(ctx, filter.ID, filter.Indexers); err != nil {
s.log.Error().Err(err).Msgf("could not store filter indexer connections: %s", filter.Name)
return nil, err
}
// reset action id to 0
for i, a := range filter.Actions {
a := a
a.ID = 0
filter.Actions[i] = a
}
// take care of filter actions
if _, err := s.actionService.StoreFilterActions(ctx, int64(filter.ID), filter.Actions); err != nil {
s.log.Error().Err(err).Msgf("could not store filter actions: %s", filter.Name)
return nil, err
}
// take care of connected external filters
// the external filters are fetched with FindByID
if err := s.repo.StoreFilterExternal(ctx, filter.ID, filter.External); err != nil {
s.log.Error().Err(err).Msgf("could not store external filters: %s", filter.Name)
return nil, err
}
return filter, nil
}
func (s *service) ToggleEnabled(ctx context.Context, filterID int, enabled bool) error {
if err := s.repo.ToggleEnabled(ctx, filterID, enabled); err != nil {
s.log.Error().Err(err).Msg("could not update filter enabled")
return err
}
s.log.Debug().Msgf("filter.toggle_enabled: update filter '%v' to '%v'", filterID, enabled)
return nil
}
func (s *service) Delete(ctx context.Context, filterID int) error {
if filterID == 0 {
return nil
}
// take care of filter actions
if err := s.actionService.DeleteByFilterID(ctx, filterID); err != nil {
s.log.Error().Err(err).Msg("could not delete filter actions")
return err
}
// take care of filter indexers
if err := s.repo.DeleteIndexerConnections(ctx, filterID); err != nil {
s.log.Error().Err(err).Msg("could not delete filter indexers")
return err
}
// delete filter external
if err := s.repo.DeleteFilterExternal(ctx, filterID); err != nil {
s.log.Error().Err(err).Msgf("could not delete filter external: %v", filterID)
return err
}
// delete filter
if err := s.repo.Delete(ctx, filterID); err != nil {
s.log.Error().Err(err).Msgf("could not delete filter: %v", filterID)
return err
}
return nil
}
func (s *service) CheckFilter(ctx context.Context, f *domain.Filter, release *domain.Release) (bool, error) {
l := s.log.With().Str("method", "CheckFilter").Logger()
l.Debug().Msgf("checking filter: %s with release %s", f.Name, release.TorrentName)
l.Trace().Msgf("checking filter: %s %+v", f.Name, f)
l.Trace().Msgf("checking filter: %s for release: %+v", f.Name, release)
// do additional fetch to get download counts for filter
if f.MaxDownloads > 0 {
downloadCounts, err := s.repo.GetDownloadsByFilterId(ctx, f.ID)
if err != nil {
l.Error().Err(err).Msg("error getting download counters for filter")
return false, nil
}
f.Downloads = downloadCounts
}
rejections, matchedFilter := f.CheckFilter(release)
if rejections.Len() > 0 {
l.Debug().Msgf("(%s) for release: %v rejections: (%s)", f.Name, release.TorrentName, rejections.StringTruncated())
return false, nil
}
if !matchedFilter {
// if no match, return nil
return false, nil
}
// smartEpisode check
if f.SmartEpisode {
params := &domain.SmartEpisodeParams{
Title: release.Title,
Season: release.Season,
Episode: release.Episode,
Year: release.Year,
Month: release.Month,
Day: release.Day,
Repack: release.Repack,
Proper: release.Proper,
Group: release.Group,
}
canDownloadShow, err := s.CheckSmartEpisodeCanDownload(ctx, params)
if err != nil {
l.Trace().Msgf("failed smart episode check: %s", f.Name)
return false, nil
}
if !canDownloadShow {
l.Trace().Msgf("failed smart episode check: %s", f.Name)
if params.IsDailyEpisode() {
f.RejectReasons.Add("smart episode", fmt.Sprintf("not new (%s) daily: %d-%d-%d", release.Title, release.Year, release.Month, release.Day), fmt.Sprintf("expected newer than (%s) daily: %d-%d-%d", release.Title, release.Year, release.Month, release.Day))
} else {
f.RejectReasons.Add("smart episode", fmt.Sprintf("not new (%s) season: %d ep: %d", release.Title, release.Season, release.Episode), fmt.Sprintf("expected newer than (%s) season: %d ep: %d", release.Title, release.Season, release.Episode))
}
return false, nil
}
}
// check duplicates
if f.DuplicateHandling != nil {
l.Debug().Msgf("(%s) check is duplicate with profile %s", f.Name, f.DuplicateHandling.Name)
release.SkipDuplicateProfileID = f.DuplicateHandling.ID
release.SkipDuplicateProfileName = f.DuplicateHandling.Name
isDuplicate, err := s.CheckIsDuplicateRelease(ctx, f.DuplicateHandling, release)
if err != nil {
return false, errors.Wrap(err, "error finding duplicate handle")
}
if isDuplicate {
l.Debug().Msgf("filter %s rejected release %q as duplicate with profile %q", f.Name, release.TorrentName, f.DuplicateHandling.Name)
f.RejectReasons.Add("duplicate", "duplicate", "not duplicate")
// let it continue so external filters can trigger checks
//return false, nil
release.IsDuplicate = true
}
}
// if matched, do additional size check if needed, attach actions and return the filter
l.Debug().Msgf("found and matched filter: %s", f.Name)
// If size constraints are set in a filter and the indexer did not
// announce the size, we need to do an additional out of band size check.
if release.AdditionalSizeCheckRequired {
l.Debug().Msgf("(%s) additional size check required", f.Name)
ok, err := s.AdditionalSizeCheck(ctx, f, release)
if err != nil {
l.Error().Err(err).Msgf("(%s) additional size check error", f.Name)
return false, err
}
if !ok {
l.Trace().Msgf("(%s) additional size check not matching what filter wanted", f.Name)
return false, nil
}
}
// check uploader if the indexer supports check via api
if release.AdditionalUploaderCheckRequired {
l.Debug().Msgf("(%s) additional uploader check required", f.Name)
ok, err := s.AdditionalUploaderCheck(ctx, f, release)
if err != nil {
l.Error().Err(err).Msgf("(%s) additional uploader check error", f.Name)
return false, err
}
if !ok {
l.Trace().Msgf("(%s) additional uploader check not matching what filter wanted", f.Name)
return false, nil
}
}
if release.AdditionalRecordLabelCheckRequired {
l.Debug().Msgf("(%s) additional record label check required", f.Name)
ok, err := s.AdditionalRecordLabelCheck(ctx, f, release)
if err != nil {
l.Error().Err(err).Msgf("(%s) additional record label check error", f.Name)
return false, err
}
if !ok {
l.Trace().Msgf("(%s) additional record label check not matching what filter wanted", f.Name)
return false, nil
}
}
// run external filters
if f.External != nil {
externalOk, err := s.RunExternalFilters(ctx, f, f.External, release)
if err != nil {
l.Error().Err(err).Msgf("(%s) external filter check error", f.Name)
return false, err
}
if !externalOk {
l.Debug().Msgf("(%s) external filter check not matching what filter wanted", f.Name)
return false, nil
}
}
return true, nil
}
// AdditionalSizeCheck performs additional out-of-band checks to determine the
// values of a torrent. Some indexers do not announce torrent size, so it is
// necessary to determine the size of the torrent in some other way. Some
// indexers have an API implemented to fetch this data. For those which don't,
// it is necessary to download the torrent file and parse it to make the size
// check. We use the API where available to minimize the number of torrents we
// need to download.
func (s *service) AdditionalSizeCheck(ctx context.Context, f *domain.Filter, release *domain.Release) (ok bool, err error) {
defer func() {
// try recover panic if anything went wrong with API or size checks
errors.RecoverPanic(recover(), &err)
}()
// do additional size check against indexer api or torrent for size
l := s.log.With().Str("method", "AdditionalSizeCheck").Logger()
l.Debug().Msgf("(%s) additional api size check required", f.Name)
switch release.Indexer.Identifier {
case "btn", "ggn", "redacted", "ops", "mock":
if (release.Size == 0 && release.AdditionalSizeCheckRequired) || (release.Uploader == "" && release.AdditionalUploaderCheckRequired) || (release.RecordLabel == "" && release.AdditionalRecordLabelCheckRequired) {
l.Trace().Msgf("(%s) preparing to check size via api", f.Name)
torrentInfo, err := s.apiService.GetTorrentByID(ctx, release.Indexer.Identifier, release.TorrentID)
if err != nil || torrentInfo == nil {
l.Error().Err(err).Msgf("(%s) could not get torrent info from api: '%s' from: %s", f.Name, release.TorrentID, release.Indexer.Identifier)
return false, err
}
l.Debug().Msgf("(%s) got torrent info from api: %+v", f.Name, torrentInfo)
torrentSize := torrentInfo.ReleaseSizeBytes()
if release.Size == 0 && torrentSize > 0 {
release.Size = torrentSize
}
if release.Uploader == "" {
release.Uploader = torrentInfo.Uploader
}
if release.RecordLabel == "" {
release.RecordLabel = torrentInfo.RecordLabel
}
}
default:
if release.Size == 0 && release.AdditionalSizeCheckRequired {
l.Trace().Msgf("(%s) preparing to download torrent metafile", f.Name)
// if indexer doesn't have api, download torrent and add to tmpPath
if err := s.downloadSvc.DownloadRelease(ctx, release); err != nil {
l.Error().Err(err).Msgf("(%s) could not download torrent file with id: '%s' from: %s", f.Name, release.TorrentID, release.Indexer.Identifier)
return false, errors.Wrap(err, "could not download torrent file for release: %s", release.TorrentName)
}
}
}
sizeOk, err := f.CheckReleaseSize(release.Size)
if err != nil {
l.Error().Err(err).Msgf("(%s) error comparing release and filter size", f.Name)
return false, err
}
// reset AdditionalSizeCheckRequired to not re-trigger check
release.AdditionalSizeCheckRequired = false
if !sizeOk {
l.Debug().Msgf("(%s) filter did not match after additional size check, trying next", f.Name)
return false, nil
}
return true, nil
}
func (s *service) AdditionalUploaderCheck(ctx context.Context, f *domain.Filter, release *domain.Release) (ok bool, err error) {
defer func() {
// try recover panic if anything went wrong with API or size checks
errors.RecoverPanic(recover(), &err)
}()
// do additional check against indexer api
l := s.log.With().Str("method", "AdditionalUploaderCheck").Logger()
// if uploader was fetched before during size check we check it and return early
if release.Uploader != "" {
uploaderOk, err := f.CheckUploader(release.Uploader)
if err != nil {
l.Error().Err(err).Msgf("(%s) error comparing release and uploaders", f.Name)
return false, err
}
// reset AdditionalUploaderCheckRequired to not re-trigger check
release.AdditionalUploaderCheckRequired = false
if !uploaderOk {
l.Debug().Msgf("(%s) filter did not match after additional uploaders check, trying next", f.Name)
return false, nil
}
return true, nil
}
l.Debug().Msgf("(%s) additional api uploader check required", f.Name)
switch release.Indexer.Identifier {
case "redacted", "ops", "mock":
l.Trace().Msgf("(%s) preparing to check via api", f.Name)
torrentInfo, err := s.apiService.GetTorrentByID(ctx, release.Indexer.Identifier, release.TorrentID)
if err != nil || torrentInfo == nil {
l.Error().Err(err).Msgf("(%s) could not get torrent info from api: '%s' from: %s", f.Name, release.TorrentID, release.Indexer.Identifier)
return false, err
}
l.Debug().Msgf("(%s) got torrent info from api: %+v", f.Name, torrentInfo)
torrentSize := torrentInfo.ReleaseSizeBytes()
if release.Size == 0 && torrentSize > 0 {
release.Size = torrentSize
}
if release.RecordLabel == "" {
release.RecordLabel = torrentInfo.RecordLabel
}
if release.Uploader == "" {
release.Uploader = torrentInfo.Uploader
}
default:
return false, errors.New("additional uploader check not supported for this indexer: %s", release.Indexer.Identifier)
}
uploaderOk, err := f.CheckUploader(release.Uploader)
if err != nil {
l.Error().Err(err).Msgf("(%s) error comparing release and uploaders", f.Name)
return false, err
}
// reset AdditionalUploaderCheckRequired to not re-trigger check
release.AdditionalUploaderCheckRequired = false
if !uploaderOk {
l.Debug().Msgf("(%s) filter did not match after additional uploaders check, trying next", f.Name)
return false, nil
}
return true, nil
}
func (s *service) AdditionalRecordLabelCheck(ctx context.Context, f *domain.Filter, release *domain.Release) (ok bool, err error) {
defer func() {
// try recover panic if anything went wrong with API or size checks
errors.RecoverPanic(recover(), &err)
if err != nil {
ok = false
}
}()
// do additional check against indexer api
l := s.log.With().Str("method", "AdditionalRecordLabelCheck").Logger()
// if record label was fetched before during size check or uploader check we check it and return early
if release.RecordLabel != "" {
recordLabelOk, err := f.CheckRecordLabel(release.RecordLabel)
if err != nil {
l.Error().Err(err).Msgf("(%s) error comparing release and record label", f.Name)
return false, err
}
// reset AdditionalRecordLabelCheckRequired to not re-trigger check
release.AdditionalRecordLabelCheckRequired = false
if !recordLabelOk {
l.Debug().Msgf("(%s) filter did not match after additional record label check, trying next", f.Name)
return false, nil
}
return true, nil
}
l.Debug().Msgf("(%s) additional api record label check required", f.Name)
switch release.Indexer.Identifier {
case "redacted", "ops", "mock":
l.Trace().Msgf("(%s) preparing to check via api", f.Name)
torrentInfo, err := s.apiService.GetTorrentByID(ctx, release.Indexer.Identifier, release.TorrentID)
if err != nil || torrentInfo == nil {
l.Error().Err(err).Msgf("(%s) could not get torrent info from api: '%s' from: %s", f.Name, release.TorrentID, release.Indexer.Identifier)
return false, err
}
l.Debug().Msgf("(%s) got torrent info from api: %+v", f.Name, torrentInfo)
torrentSize := torrentInfo.ReleaseSizeBytes()
if release.Size == 0 && torrentSize > 0 {
release.Size = torrentSize
}
if release.Uploader == "" {
release.Uploader = torrentInfo.Uploader
}
if release.RecordLabel == "" {
release.RecordLabel = torrentInfo.RecordLabel
}
default:
return false, errors.New("additional record label check not supported for this indexer: %s", release.Indexer.Identifier)
}
recordLabelOk, err := f.CheckRecordLabel(release.RecordLabel)
if err != nil {
l.Error().Err(err).Msgf("(%s) error comparing release and record label", f.Name)
return false, err
}
// reset AdditionalRecordLabelCheckRequired to not re-trigger check
release.AdditionalRecordLabelCheckRequired = false
if !recordLabelOk {
l.Debug().Msgf("(%s) filter did not match after additional record label check, trying next", f.Name)
return false, nil
}
return true, nil
}
func (s *service) CheckSmartEpisodeCanDownload(ctx context.Context, params *domain.SmartEpisodeParams) (bool, error) {
return s.releaseRepo.CheckSmartEpisodeCanDownload(ctx, params)
}
func (s *service) CheckIsDuplicateRelease(ctx context.Context, profile *domain.DuplicateReleaseProfile, release *domain.Release) (bool, error) {
return s.releaseRepo.CheckIsDuplicateRelease(ctx, profile, release)
}
func (s *service) RunExternalFilters(ctx context.Context, f *domain.Filter, externalFilters []domain.FilterExternal, release *domain.Release) (ok bool, err error) {
defer func() {
// try recover panic if anything went wrong with the external filter checks
errors.RecoverPanic(recover(), &err)
if err != nil {
s.log.Error().Err(err).Msgf("filter %s external filter check panic", f.Name)
ok = false
}
}()
// sort filters by index
sort.Slice(externalFilters, func(i, j int) bool {
return externalFilters[i].Index < externalFilters[j].Index
})
for _, external := range externalFilters {
if !external.Enabled {
s.log.Debug().Msgf("external filter %s not enabled, skipping...", external.Name)
continue
}
if external.NeedTorrentDownloaded() {
if err := s.downloadSvc.DownloadRelease(ctx, release); err != nil {
return false, errors.Wrap(err, "could not download torrent file for release: %s", release.TorrentName)
}
}
switch external.Type {
case domain.ExternalFilterTypeExec:
// run external script
exitCode, err := s.execCmd(ctx, external, release)
if err != nil {
return false, errors.Wrap(err, "error executing external command")
}
if exitCode != external.ExecExpectStatus {
s.log.Trace().Msgf("filter.Service.CheckFilter: external script unexpected exit code. got: %d want: %d", exitCode, external.ExecExpectStatus)
f.RejectReasons.Add("external script exit code", exitCode, external.ExecExpectStatus)
return false, nil
}
case domain.ExternalFilterTypeWebhook:
// run external webhook
statusCode, err := s.webhook(ctx, external, release)
if err != nil {
return false, errors.Wrap(err, "error executing external webhook")
}
if statusCode != external.WebhookExpectStatus {
s.log.Trace().Msgf("filter.Service.CheckFilter: external webhook unexpected status code. got: %d want: %d", statusCode, external.WebhookExpectStatus)
f.RejectReasons.Add("external webhook status code", statusCode, external.WebhookExpectStatus)
return false, nil
}
}
}
return true, nil
}
func (s *service) execCmd(_ context.Context, external domain.FilterExternal, release *domain.Release) (int, error) {
s.log.Trace().Msgf("filter exec release: %s", release.TorrentName)
// read the file into bytes we can then use in the macro
if len(release.TorrentDataRawBytes) == 0 && release.TorrentTmpFile != "" {
if err := release.OpenTorrentFile(); err != nil {
return 0, errors.Wrap(err, "could not open torrent file for release: %s", release.TorrentName)
}
}
// check if program exists
cmd, err := exec.LookPath(external.ExecCmd)
if err != nil {
return 0, errors.Wrap(err, "exec failed, could not find program: %s", cmd)
}
// handle args and replace vars
m := domain.NewMacro(*release)
// parse and replace values in argument string before continuing
parsedArgs, err := m.Parse(external.ExecArgs)
if err != nil {
return 0, errors.Wrap(err, "could not parse macro")
}
// we need to split on space into a string slice, so we can spread the args into exec
p := shellwords.NewParser()
p.ParseBacktick = true
commandArgs, err := p.Parse(parsedArgs)
if err != nil {
return 0, errors.Wrap(err, "could not parse into shell-words")
}
start := time.Now()
// setup command and args
command := exec.Command(cmd, commandArgs...)
s.log.Debug().Msgf("script: %s args: %s", cmd, strings.Join(commandArgs, " "))
// Create a pipe to capture the standard output of the command
cmdOutput, err := command.StdoutPipe()
if err != nil {
s.log.Error().Err(err).Msg("could not create stdout pipe")
return 0, err
}
duration := time.Since(start)
// Start the command
if err := command.Start(); err != nil {
s.log.Error().Err(err).Msg("error starting command")
return 0, err
}
// Create a buffer to store the output
outputBuffer := make([]byte, 4096)
execLogger := s.log.With().Str("release", release.TorrentName).Str("filter", release.FilterName).Logger()
for {
// Read the output into the buffer
n, err := cmdOutput.Read(outputBuffer)
if err != nil {
break
}
// Write the output to the logger
execLogger.Trace().Msg(string(outputBuffer[:n]))
}
// Wait for the command to finish and check for any errors
if err := command.Wait(); err != nil {
var exitErr *exec.ExitError
if errors.As(err, &exitErr) {
s.log.Debug().Msgf("filter script command exited with non zero code: %v", exitErr.ExitCode())
return exitErr.ExitCode(), nil
}
s.log.Error().Err(err).Msg("error waiting for command")
return 0, err
}
s.log.Debug().Msgf("executed external script: (%s), args: (%s) for release: (%s) indexer: (%s) total time (%s)", cmd, parsedArgs, release.TorrentName, release.Indexer.Name, duration)
return 0, nil
}
func (s *service) webhook(ctx context.Context, external domain.FilterExternal, release *domain.Release) (int, error) {
s.log.Trace().Msgf("preparing to run external webhook filter to: (%s) payload: (%s)", external.WebhookHost, external.WebhookData)
if external.WebhookHost == "" {
return 0, errors.New("external filter: missing host for webhook")
}
// if webhook data contains TorrentDataRawBytes, lets read the file into bytes we can then use in the macro
if len(release.TorrentDataRawBytes) == 0 && strings.Contains(external.WebhookData, "TorrentDataRawBytes") {
if err := release.OpenTorrentFile(); err != nil {
return 0, errors.Wrap(err, "could not open torrent file for release: %s", release.TorrentName)
}
}
m := domain.NewMacro(*release)
// parse and replace values in argument string before continuing
dataArgs, err := m.Parse(external.WebhookData)
if err != nil {
return 0, errors.Wrap(err, "could not parse webhook data macro: %s", external.WebhookData)
}
s.log.Trace().Msgf("sending %s to external webhook filter: (%s) payload: (%s)", external.WebhookMethod, external.WebhookHost, external.WebhookData)
method := http.MethodPost
if external.WebhookMethod != "" {
method = external.WebhookMethod
}
req, err := http.NewRequestWithContext(ctx, method, external.WebhookHost, nil)
if err != nil {
return 0, errors.Wrap(err, "could not build request for webhook")
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("User-Agent", "autobrr")
if external.WebhookHeaders != "" {
headers := strings.Split(external.WebhookHeaders, ";")
for _, header := range headers {
h := strings.Split(header, "=")
if len(h) != 2 {
continue
}
// add header to req
req.Header.Add(h[0], h[1]) // go already canonicalizes the provided header key.
}
}
var opts []retry.Option
opts = append(opts, retry.DelayType(retry.FixedDelay))
opts = append(opts, retry.LastErrorOnly(true))
if external.WebhookRetryAttempts > 0 {
opts = append(opts, retry.Attempts(uint(external.WebhookRetryAttempts)))
}
if external.WebhookRetryDelaySeconds > 0 {
opts = append(opts, retry.Delay(time.Duration(external.WebhookRetryDelaySeconds)*time.Second))
}
var retryStatusCodes []string
if external.WebhookRetryStatus != "" {
retryStatusCodes = strings.Split(strings.ReplaceAll(external.WebhookRetryStatus, " ", ""), ",")
}
start := time.Now()
statusCode, err := retry.DoWithData(
func() (int, error) {
clonereq := req.Clone(ctx)
if external.WebhookData != "" && dataArgs != "" {
clonereq.Body = io.NopCloser(bytes.NewBufferString(dataArgs))
}
res, err := s.httpClient.Do(clonereq)
if err != nil {
return 0, errors.Wrap(err, "could not make request for webhook")
}
defer res.Body.Close()
s.log.Debug().Msgf("filter external webhook response status: %d", res.StatusCode)
if s.log.Debug().Enabled() {
body, err := io.ReadAll(res.Body)
if err != nil {
return res.StatusCode, errors.Wrap(err, "could not read request body")
}
if len(body) > 0 {
s.log.Debug().Msgf("filter external webhook response status: %d body: %s", res.StatusCode, body)
}
}
if utils.StrSliceContains(retryStatusCodes, strconv.Itoa(res.StatusCode)) {
return 0, errors.New("webhook got unwanted status code: %d", res.StatusCode)
}
return res.StatusCode, nil
},
opts...)
s.log.Debug().Msgf("successfully ran external webhook filter to: (%s) payload: (%s) finished in %s", external.WebhookHost, dataArgs, time.Since(start))
return statusCode, err
}