feat(filters): external webhook retry on status codes (#1206)

* feat: external filter retry status codes

* chore: go mod tidy

* fix(database): migrations

---------

Co-authored-by: ze0s <ze0s@riseup.net>
This commit is contained in:
Steven Kreitzer 2023-10-27 10:37:57 -05:00 committed by GitHub
parent 40a1a4c014
commit 2080136669
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 330 additions and 165 deletions

View file

@ -247,6 +247,10 @@ func (r *FilterRepo) FindByID(ctx context.Context, filterID int) (*domain.Filter
"fe.webhook_data",
"fe.webhook_headers",
"fe.webhook_expect_status",
"fe.webhook_retry_status",
"fe.webhook_retry_attempts",
"fe.webhook_retry_delay_seconds",
"fe.webhook_retry_max_jitter_seconds",
).
From("filter f").
LeftJoin("filter_external fe ON f.id = fe.filter_id").
@ -276,8 +280,8 @@ func (r *FilterRepo) FindByID(ctx context.Context, filterID int) (*domain.Filter
var delay, maxDownloads, logScore sql.NullInt32
// filter external
var extName, extType, extExecCmd, extExecArgs, extWebhookHost, extWebhookMethod, extWebhookHeaders, extWebhookData sql.NullString
var extId, extIndex, extWebhookStatus, extExecStatus sql.NullInt32
var extName, extType, extExecCmd, extExecArgs, extWebhookHost, extWebhookMethod, extWebhookHeaders, extWebhookData, extWebhookRetryStatus sql.NullString
var extId, extIndex, extWebhookStatus, extWebhookRetryAttempts, extWebhookDelaySeconds, extWebhookRetryJitterSeconds, extExecStatus sql.NullInt32
var extEnabled sql.NullBool
if err := rows.Scan(
@ -354,6 +358,10 @@ func (r *FilterRepo) FindByID(ctx context.Context, filterID int) (*domain.Filter
&extWebhookData,
&extWebhookHeaders,
&extWebhookStatus,
&extWebhookRetryStatus,
&extWebhookRetryAttempts,
&extWebhookDelaySeconds,
&extWebhookRetryJitterSeconds,
); err != nil {
return nil, errors.Wrap(err, "error scanning row")
}
@ -396,19 +404,23 @@ func (r *FilterRepo) FindByID(ctx context.Context, filterID int) (*domain.Filter
if extId.Valid {
external := domain.FilterExternal{
ID: int(extId.Int32),
Name: extName.String,
Index: int(extIndex.Int32),
Type: domain.FilterExternalType(extType.String),
Enabled: extEnabled.Bool,
ExecCmd: extExecCmd.String,
ExecArgs: extExecArgs.String,
ExecExpectStatus: int(extExecStatus.Int32),
WebhookHost: extWebhookHost.String,
WebhookMethod: extWebhookMethod.String,
WebhookData: extWebhookData.String,
WebhookHeaders: extWebhookHeaders.String,
WebhookExpectStatus: int(extWebhookStatus.Int32),
ID: int(extId.Int32),
Name: extName.String,
Index: int(extIndex.Int32),
Type: domain.FilterExternalType(extType.String),
Enabled: extEnabled.Bool,
ExecCmd: extExecCmd.String,
ExecArgs: extExecArgs.String,
ExecExpectStatus: int(extExecStatus.Int32),
WebhookHost: extWebhookHost.String,
WebhookMethod: extWebhookMethod.String,
WebhookData: extWebhookData.String,
WebhookHeaders: extWebhookHeaders.String,
WebhookExpectStatus: int(extWebhookStatus.Int32),
WebhookRetryStatus: extWebhookRetryStatus.String,
WebhookRetryAttempts: int(extWebhookRetryAttempts.Int32),
WebhookRetryDelaySeconds: int(extWebhookDelaySeconds.Int32),
WebhookRetryMaxJitterSeconds: int(extWebhookRetryJitterSeconds.Int32),
}
externalMap[external.ID] = external
}
@ -502,6 +514,10 @@ func (r *FilterRepo) findByIndexerIdentifier(ctx context.Context, indexer string
"fe.webhook_data",
"fe.webhook_headers",
"fe.webhook_expect_status",
"fe.webhook_retry_status",
"fe.webhook_retry_attempts",
"fe.webhook_retry_delay_seconds",
"fe.webhook_retry_max_jitter_seconds",
"fe.filter_id",
).
From("filter f").
@ -537,8 +553,8 @@ func (r *FilterRepo) findByIndexerIdentifier(ctx context.Context, indexer string
var delay, maxDownloads, logScore sql.NullInt32
// filter external
var extName, extType, extExecCmd, extExecArgs, extWebhookHost, extWebhookMethod, extWebhookHeaders, extWebhookData sql.NullString
var extId, extIndex, extWebhookStatus, extExecStatus, extFilterId sql.NullInt32
var extName, extType, extExecCmd, extExecArgs, extWebhookHost, extWebhookMethod, extWebhookHeaders, extWebhookData, extWebhookRetryStatus sql.NullString
var extId, extIndex, extWebhookStatus, extWebhookRetryAttempts, extWebhookDelaySeconds, extWebhookRetryJitterSeconds, extExecStatus, extFilterId sql.NullInt32
var extEnabled sql.NullBool
if err := rows.Scan(
@ -615,6 +631,10 @@ func (r *FilterRepo) findByIndexerIdentifier(ctx context.Context, indexer string
&extWebhookData,
&extWebhookHeaders,
&extWebhookStatus,
&extWebhookRetryStatus,
&extWebhookRetryAttempts,
&extWebhookDelaySeconds,
&extWebhookRetryJitterSeconds,
&extFilterId,
); err != nil {
return nil, errors.Wrap(err, "error scanning row")
@ -658,20 +678,24 @@ func (r *FilterRepo) findByIndexerIdentifier(ctx context.Context, indexer string
if extId.Valid {
external := domain.FilterExternal{
ID: int(extId.Int32),
Name: extName.String,
Index: int(extIndex.Int32),
Type: domain.FilterExternalType(extType.String),
Enabled: extEnabled.Bool,
ExecCmd: extExecCmd.String,
ExecArgs: extExecArgs.String,
ExecExpectStatus: int(extExecStatus.Int32),
WebhookHost: extWebhookHost.String,
WebhookMethod: extWebhookMethod.String,
WebhookData: extWebhookData.String,
WebhookHeaders: extWebhookHeaders.String,
WebhookExpectStatus: int(extWebhookStatus.Int32),
FilterId: int(extFilterId.Int32),
ID: int(extId.Int32),
Name: extName.String,
Index: int(extIndex.Int32),
Type: domain.FilterExternalType(extType.String),
Enabled: extEnabled.Bool,
ExecCmd: extExecCmd.String,
ExecArgs: extExecArgs.String,
ExecExpectStatus: int(extExecStatus.Int32),
WebhookHost: extWebhookHost.String,
WebhookMethod: extWebhookMethod.String,
WebhookData: extWebhookData.String,
WebhookHeaders: extWebhookHeaders.String,
WebhookExpectStatus: int(extWebhookStatus.Int32),
WebhookRetryStatus: extWebhookRetryStatus.String,
WebhookRetryAttempts: int(extWebhookRetryAttempts.Int32),
WebhookRetryDelaySeconds: int(extWebhookDelaySeconds.Int32),
WebhookRetryMaxJitterSeconds: int(extWebhookRetryJitterSeconds.Int32),
FilterId: int(extFilterId.Int32),
}
externalMap[external.FilterId] = append(externalMap[external.FilterId], external)
}
@ -709,6 +733,10 @@ func (r *FilterRepo) FindExternalFiltersByID(ctx context.Context, filterId int)
"fe.webhook_data",
"fe.webhook_headers",
"fe.webhook_expect_status",
"fe.webhook_retry_status",
"fe.webhook_retry_attempts",
"fe.webhook_retry_delay_seconds",
"fe.webhook_retry_max_jitter_seconds",
).
From("filter_external fe").
Where(sq.Eq{"fe.filter_id": filterId})
@ -732,8 +760,8 @@ func (r *FilterRepo) FindExternalFiltersByID(ctx context.Context, filterId int)
var external domain.FilterExternal
// filter external
var extExecCmd, extExecArgs, extWebhookHost, extWebhookMethod, extWebhookHeaders, extWebhookData sql.NullString
var extWebhookStatus, extExecStatus sql.NullInt32
var extExecCmd, extExecArgs, extWebhookHost, extWebhookMethod, extWebhookHeaders, extWebhookData, extWebhookRetryStatus sql.NullString
var extWebhookStatus, extWebhookRetryAttempts, extWebhookDelaySeconds, extWebhookRetryJitterSeconds, extExecStatus sql.NullInt32
if err := rows.Scan(
&external.ID,
@ -749,6 +777,10 @@ func (r *FilterRepo) FindExternalFiltersByID(ctx context.Context, filterId int)
&extWebhookData,
&extWebhookHeaders,
&extWebhookStatus,
&extWebhookRetryStatus,
&extWebhookRetryAttempts,
&extWebhookDelaySeconds,
&extWebhookRetryJitterSeconds,
); err != nil {
return nil, errors.Wrap(err, "error scanning row")
}
@ -762,6 +794,10 @@ func (r *FilterRepo) FindExternalFiltersByID(ctx context.Context, filterId int)
external.WebhookData = extWebhookData.String
external.WebhookHeaders = extWebhookHeaders.String
external.WebhookExpectStatus = int(extWebhookStatus.Int32)
external.WebhookRetryStatus = extWebhookRetryStatus.String
external.WebhookRetryAttempts = int(extWebhookRetryAttempts.Int32)
external.WebhookRetryDelaySeconds = int(extWebhookDelaySeconds.Int32)
external.WebhookRetryMaxJitterSeconds = int(extWebhookRetryJitterSeconds.Int32)
externalFilters = append(externalFilters, external)
}
@ -1182,6 +1218,18 @@ func (r *FilterRepo) UpdatePartial(ctx context.Context, filter domain.FilterUpda
if filter.ExternalWebhookExpectStatus != nil {
q = q.Set("external_webhook_expect_status", filter.ExternalWebhookExpectStatus)
}
if filter.ExternalWebhookRetryStatus != nil {
q = q.Set("external_webhook_retry_status", filter.ExternalWebhookRetryStatus)
}
if filter.ExternalWebhookRetryAttempts != nil {
q = q.Set("external_webhook_retry_attempts", filter.ExternalWebhookRetryAttempts)
}
if filter.ExternalWebhookRetryDelaySeconds != nil {
q = q.Set("external_webhook_retry_delay_seconds", filter.ExternalWebhookRetryDelaySeconds)
}
if filter.ExternalWebhookRetryMaxJitterSeconds != nil {
q = q.Set("external_webhook_retry_max_jitter_seconds", filter.ExternalWebhookRetryMaxJitterSeconds)
}
q = q.Where(sq.Eq{"id": filter.ID})
@ -1462,6 +1510,10 @@ func (r *FilterRepo) StoreFilterExternal(ctx context.Context, filterID int, exte
"webhook_data",
"webhook_headers",
"webhook_expect_status",
"webhook_retry_status",
"webhook_retry_attempts",
"webhook_retry_delay_seconds",
"webhook_retry_max_jitter_seconds",
"filter_id",
)
@ -1479,6 +1531,10 @@ func (r *FilterRepo) StoreFilterExternal(ctx context.Context, filterID int, exte
toNullString(external.WebhookData),
toNullString(external.WebhookHeaders),
toNullInt32(int32(external.WebhookExpectStatus)),
toNullString(external.WebhookRetryStatus),
toNullInt32(int32(external.WebhookRetryAttempts)),
toNullInt32(int32(external.WebhookRetryDelaySeconds)),
toNullInt32(int32(external.WebhookRetryMaxJitterSeconds)),
filterID,
)
}