feat(filters): add support for multiple external filters (#1030)

* feat(filters): add support for multiple ext filters

* refactor(filters): crud and check

* feat(filters): add postgres migrations

* fix(filters): field array types

* fix(filters): formatting

* fix(filters): formatting

* feat(filters): external webhook improve logs
This commit is contained in:
ze0s 2023-08-15 23:07:39 +02:00 committed by GitHub
parent db209319da
commit dde0d0ed61
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 1514 additions and 478 deletions

View file

@ -8,9 +8,11 @@ import (
"context"
"crypto/tls"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"sort"
"strings"
"time"
@ -134,13 +136,9 @@ func (s *service) FindByID(ctx context.Context, filterID int) (*domain.Filter, e
func (s *service) FindByIndexerIdentifier(ctx context.Context, indexer string) ([]domain.Filter, error) {
// get filters for indexer
filters, err := s.repo.FindByIndexerIdentifier(ctx, indexer)
if err != nil {
s.log.Error().Err(err).Msgf("could not find filters for indexer: %v", indexer)
return nil, err
}
return filters, nil
// we do not load actions here since we do not need it at this stage
// only load those after filter has matched
return s.repo.FindByIndexerIdentifier(ctx, indexer)
}
func (s *service) GetDownloadsByFilterId(ctx context.Context, filterID int) (*domain.FilterDownloads, error) {
@ -169,20 +167,26 @@ func (s *service) Update(ctx context.Context, filter domain.Filter) (*domain.Fil
// update
f, err := s.repo.Update(ctx, filter)
if err != nil {
s.log.Error().Err(err).Msgf("could not update filter: %v", filter.Name)
s.log.Error().Err(err).Msgf("could not update filter: %s", filter.Name)
return nil, err
}
// take care of connected indexers
if err = s.repo.StoreIndexerConnections(ctx, f.ID, filter.Indexers); err != nil {
s.log.Error().Err(err).Msgf("could not store filter indexer connections: %v", filter.Name)
s.log.Error().Err(err).Msgf("could not store filter indexer connections: %s", filter.Name)
return nil, err
}
// take care of connected external filters
if err = s.repo.StoreFilterExternal(ctx, f.ID, filter.External); err != nil {
s.log.Error().Err(err).Msgf("could not store external filters: %s", filter.Name)
return nil, err
}
// take care of filter actions
actions, err := s.actionRepo.StoreFilterActions(ctx, filter.Actions, int64(filter.ID))
if err != nil {
s.log.Error().Err(err).Msgf("could not store filter actions: %v", filter.Name)
s.log.Error().Err(err).Msgf("could not store filter actions: %s", filter.Name)
return nil, err
}
@ -313,8 +317,8 @@ func (s *service) Delete(ctx context.Context, filterID int) error {
func (s *service) CheckFilter(ctx context.Context, f domain.Filter, release *domain.Release) (bool, error) {
s.log.Trace().Msgf("filter.Service.CheckFilter: checking filter: %v %+v", f.Name, f)
s.log.Trace().Msgf("filter.Service.CheckFilter: checking filter: %v for release: %+v", f.Name, release)
s.log.Trace().Msgf("filter.Service.CheckFilter: checking filter: %s %+v", f.Name, f)
s.log.Trace().Msgf("filter.Service.CheckFilter: checking filter: %s for release: %+v", f.Name, release)
// do additional fetch to get download counts for filter
if f.MaxDownloads > 0 {
@ -328,7 +332,7 @@ func (s *service) CheckFilter(ctx context.Context, f domain.Filter, release *dom
rejections, matchedFilter := f.CheckFilter(release)
if len(rejections) > 0 {
s.log.Debug().Msgf("filter.Service.CheckFilter: (%v) for release: %v rejections: (%v)", f.Name, release.TorrentName, release.RejectionsString(true))
s.log.Debug().Msgf("filter.Service.CheckFilter: (%s) for release: %v rejections: (%s)", f.Name, release.TorrentName, release.RejectionsString(true))
return false, nil
}
@ -350,7 +354,7 @@ func (s *service) CheckFilter(ctx context.Context, f domain.Filter, release *dom
// if matched, do additional size check if needed, attach actions and return the filter
s.log.Debug().Msgf("filter.Service.CheckFilter: found and matched filter: %+v", f.Name)
s.log.Debug().Msgf("filter.Service.CheckFilter: found and matched filter: %s", f.Name)
// Some indexers do not announce the size and if size (min,max) is set in a filter then it will need
// additional size check. Some indexers have api implemented to fetch this data and for the others
@ -358,65 +362,34 @@ func (s *service) CheckFilter(ctx context.Context, f domain.Filter, release *dom
// do additional size check against indexer api or download torrent for size check
if release.AdditionalSizeCheckRequired {
s.log.Debug().Msgf("filter.Service.CheckFilter: (%v) additional size check required", f.Name)
s.log.Debug().Msgf("filter.Service.CheckFilter: (%s) additional size check required", f.Name)
ok, err := s.AdditionalSizeCheck(ctx, f, release)
if err != nil {
s.log.Error().Stack().Err(err).Msgf("filter.Service.CheckFilter: (%v) additional size check error", f.Name)
s.log.Error().Err(err).Msgf("filter.Service.CheckFilter: (%s) additional size check error", f.Name)
return false, err
}
if !ok {
s.log.Trace().Msgf("filter.Service.CheckFilter: (%v) additional size check not matching what filter wanted", f.Name)
s.log.Trace().Msgf("filter.Service.CheckFilter: (%s) additional size check not matching what filter wanted", f.Name)
return false, nil
}
}
// run external script
if f.ExternalScriptEnabled && f.ExternalScriptCmd != "" {
exitCode, err := s.execCmd(ctx, release, f.ExternalScriptCmd, f.ExternalScriptArgs)
// run external filters
if f.External != nil {
externalOk, err := s.RunExternalFilters(ctx, f.External, release)
if err != nil {
s.log.Error().Err(err).Msgf("filter.Service.CheckFilter: error executing external command for filter: %+v", f.Name)
s.log.Error().Err(err).Msgf("filter.Service.CheckFilter: (%s) external filter check error", f.Name)
return false, err
}
if exitCode != f.ExternalScriptExpectStatus {
s.log.Trace().Msgf("filter.Service.CheckFilter: external script unexpected exit code. got: %v want: %v", exitCode, f.ExternalScriptExpectStatus)
release.AddRejectionF("external script unexpected exit code. got: %v want: %v", exitCode, f.ExternalScriptExpectStatus)
if !externalOk {
s.log.Trace().Msgf("filter.Service.CheckFilter: (%s) additional size check not matching what filter wanted", f.Name)
return false, nil
}
}
// run external webhook
if f.ExternalWebhookEnabled && f.ExternalWebhookHost != "" && f.ExternalWebhookData != "" {
// run external scripts
statusCode, err := s.webhook(ctx, release, f.ExternalWebhookHost, f.ExternalWebhookData)
if err != nil {
s.log.Error().Err(err).Msgf("filter.Service.CheckFilter: error executing external webhook for filter: %v", f.Name)
return false, err
}
if statusCode != f.ExternalWebhookExpectStatus {
s.log.Trace().Msgf("filter.Service.CheckFilter: external webhook unexpected status code. got: %v want: %v", statusCode, f.ExternalWebhookExpectStatus)
release.AddRejectionF("external webhook unexpected status code. got: %v want: %v", statusCode, f.ExternalWebhookExpectStatus)
return false, nil
}
}
// found matching filter, lets find the filter actions and attach
actions, err := s.actionRepo.FindByFilterID(ctx, f.ID)
if err != nil {
s.log.Error().Err(err).Msgf("filter.Service.CheckFilter: error finding actions for filter: %+v", f.Name)
return false, err
}
// if no actions, continue to next filter
if len(actions) == 0 {
s.log.Trace().Msgf("filter.Service.CheckFilter: no actions found for filter '%v', trying next one..", f.Name)
return false, nil
}
release.Filter.Actions = actions
return true, nil
}
@ -515,12 +488,64 @@ func (s *service) CanDownloadShow(ctx context.Context, release *domain.Release)
return s.releaseRepo.CanDownloadShow(ctx, release.Title, release.Season, release.Episode)
}
func (s *service) execCmd(ctx context.Context, release *domain.Release, cmd string, args string) (int, error) {
s.log.Debug().Msgf("filter exec release: %v", release.TorrentName)
func (s *service) RunExternalFilters(ctx context.Context, externalFilters []domain.FilterExternal, release *domain.Release) (bool, error) {
var err error
if release.TorrentTmpFile == "" && strings.Contains(args, "TorrentPathName") {
defer func() {
// try recover panic if anything went wrong with the external filter checks
errors.RecoverPanic(recover(), &err)
}()
// sort filters by index
sort.Slice(externalFilters, func(i, j int) bool {
return externalFilters[i].Index < externalFilters[j].Index
})
for _, external := range externalFilters {
if !external.Enabled {
s.log.Debug().Msgf("external filter %s not enabled, skipping...", external.Name)
continue
}
switch external.Type {
case domain.ExternalFilterTypeExec:
// run external script
exitCode, err := s.execCmd(ctx, external, release)
if err != nil {
return false, errors.Wrap(err, "error executing external command")
}
if exitCode != external.ExecExpectStatus {
s.log.Trace().Msgf("filter.Service.CheckFilter: external script unexpected exit code. got: %d want: %d", exitCode, external.ExecExpectStatus)
release.AddRejectionF("external script unexpected exit code. got: %d want: %d", exitCode, external.ExecExpectStatus)
return false, nil
}
case domain.ExternalFilterTypeWebhook:
// run external webhook
statusCode, err := s.webhook(ctx, external, release)
if err != nil {
return false, errors.Wrap(err, "error executing external webhook")
}
if statusCode != external.WebhookExpectStatus {
s.log.Trace().Msgf("filter.Service.CheckFilter: external webhook unexpected status code. got: %d want: %d", statusCode, external.WebhookExpectStatus)
release.AddRejectionF("external webhook unexpected status code. got: %d want: %d", statusCode, external.WebhookExpectStatus)
return false, nil
}
}
}
return false, nil
}
func (s *service) execCmd(ctx context.Context, external domain.FilterExternal, release *domain.Release) (int, error) {
s.log.Trace().Msgf("filter exec release: %s", release.TorrentName)
if release.TorrentTmpFile == "" && strings.Contains(external.ExecArgs, "TorrentPathName") {
if err := release.DownloadTorrentFileCtx(ctx); err != nil {
return 0, errors.Wrap(err, "error downloading torrent file for release: %v", release.TorrentName)
return 0, errors.Wrap(err, "error downloading torrent file for release: %s", release.TorrentName)
}
}
@ -528,23 +553,23 @@ func (s *service) execCmd(ctx context.Context, release *domain.Release, cmd stri
if len(release.TorrentDataRawBytes) == 0 && release.TorrentTmpFile != "" {
t, err := os.ReadFile(release.TorrentTmpFile)
if err != nil {
return 0, errors.Wrap(err, "could not read torrent file: %v", release.TorrentTmpFile)
return 0, errors.Wrap(err, "could not read torrent file: %s", release.TorrentTmpFile)
}
release.TorrentDataRawBytes = t
}
// check if program exists
cmd, err := exec.LookPath(cmd)
cmd, err := exec.LookPath(external.ExecCmd)
if err != nil {
return 0, errors.Wrap(err, "exec failed, could not find program: %v", cmd)
return 0, errors.Wrap(err, "exec failed, could not find program: %s", cmd)
}
// handle args and replace vars
m := domain.NewMacro(*release)
// parse and replace values in argument string before continuing
parsedArgs, err := m.Parse(args)
parsedArgs, err := m.Parse(external.ExecArgs)
if err != nil {
return 0, errors.Wrap(err, "could not parse macro")
}
@ -565,29 +590,33 @@ func (s *service) execCmd(ctx context.Context, release *domain.Release, cmd stri
err = command.Run()
var exitErr *exec.ExitError
if errors.As(err, &exitErr) {
s.log.Debug().Msgf("filter script command exited with non zero code: %v", exitErr.ExitCode())
s.log.Debug().Msgf("filter script command exited with non zero code: %d", exitErr.ExitCode())
return exitErr.ExitCode(), nil
}
duration := time.Since(start)
s.log.Debug().Msgf("executed external script: (%v), args: (%v) for release: (%v) indexer: (%v) total time (%v)", cmd, args, release.TorrentName, release.Indexer, duration)
s.log.Debug().Msgf("executed external script: (%s), args: (%s) for release: (%s) indexer: (%s) total time (%s)", cmd, external.ExecArgs, release.TorrentName, release.Indexer, duration)
return 0, nil
}
func (s *service) webhook(ctx context.Context, release *domain.Release, url string, data string) (int, error) {
s.log.Debug().Msgf("preparing to run external webhook filter to: (%s) payload: (%s)", url, data)
func (s *service) webhook(ctx context.Context, external domain.FilterExternal, release *domain.Release) (int, error) {
s.log.Trace().Msgf("preparing to run external webhook filter to: (%s) payload: (%s)", external.WebhookHost, external.WebhookData)
if external.WebhookHost == "" {
return 0, errors.New("external filter: missing host for webhook")
}
// if webhook data contains TorrentPathName or TorrentDataRawBytes, lets download the torrent file
if release.TorrentTmpFile == "" && (strings.Contains(data, "TorrentPathName") || strings.Contains(data, "TorrentDataRawBytes")) {
if release.TorrentTmpFile == "" && (strings.Contains(external.WebhookData, "TorrentPathName") || strings.Contains(external.WebhookData, "TorrentDataRawBytes")) {
if err := release.DownloadTorrentFileCtx(ctx); err != nil {
return 0, errors.Wrap(err, "webhook: could not download torrent file for release: %s", release.TorrentName)
}
}
// if webhook data contains TorrentDataRawBytes, lets read the file into bytes we can then use in the macro
if len(release.TorrentDataRawBytes) == 0 && strings.Contains(data, "TorrentDataRawBytes") {
if len(release.TorrentDataRawBytes) == 0 && strings.Contains(external.WebhookData, "TorrentDataRawBytes") {
t, err := os.ReadFile(release.TorrentTmpFile)
if err != nil {
return 0, errors.Wrap(err, "could not read torrent file: %s", release.TorrentTmpFile)
@ -599,12 +628,12 @@ func (s *service) webhook(ctx context.Context, release *domain.Release, url stri
m := domain.NewMacro(*release)
// parse and replace values in argument string before continuing
dataArgs, err := m.Parse(data)
dataArgs, err := m.Parse(external.WebhookData)
if err != nil {
return 0, errors.Wrap(err, "could not parse webhook data macro: %s", data)
return 0, errors.Wrap(err, "could not parse webhook data macro: %s", external.WebhookData)
}
s.log.Debug().Msgf("sending POST to external webhook filter: (%s) payload: (%s)", url, data)
s.log.Trace().Msgf("sending %s to external webhook filter: (%s) payload: (%s)", external.WebhookMethod, external.WebhookHost, external.WebhookData)
t := &http.Transport{
TLSClientConfig: &tls.Config{
@ -614,14 +643,41 @@ func (s *service) webhook(ctx context.Context, release *domain.Release, url stri
client := http.Client{Transport: t, Timeout: 120 * time.Second}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewBufferString(dataArgs))
method := http.MethodPost
if external.WebhookMethod != "" {
method = external.WebhookMethod
}
req, err := http.NewRequestWithContext(ctx, method, external.WebhookHost, nil)
if err != nil {
return 0, errors.Wrap(err, "could not build request for webhook")
}
if external.WebhookData != "" && dataArgs != "" {
req, err = http.NewRequestWithContext(ctx, method, external.WebhookHost, bytes.NewBufferString(dataArgs))
if err != nil {
return 0, errors.Wrap(err, "could not build request for webhook")
}
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("User-Agent", "autobrr")
if external.WebhookHeaders != "" {
headers := strings.Split(external.WebhookHeaders, ";")
for _, header := range headers {
h := strings.Split(header, "=")
if len(h) != 2 {
continue
}
// add header to req
req.Header.Add(http.CanonicalHeaderKey(h[0]), h[1])
}
}
start := time.Now()
res, err := client.Do(req)
@ -631,11 +687,16 @@ func (s *service) webhook(ctx context.Context, release *domain.Release, url stri
defer res.Body.Close()
if res.StatusCode > 299 {
return res.StatusCode, nil
body, err := io.ReadAll(res.Body)
if err != nil {
return 0, errors.Wrap(err, "could not read request body")
}
s.log.Debug().Msgf("successfully ran external webhook filter to: (%s) payload: (%s) finished in %s", url, dataArgs, time.Since(start))
if len(body) > 0 {
s.log.Debug().Msgf("filter external webhook response status: %d body: %s", res.StatusCode, body)
}
s.log.Debug().Msgf("successfully ran external webhook filter to: (%s) payload: (%s) finished in %s", external.WebhookHost, dataArgs, time.Since(start))
return res.StatusCode, nil
}