fix: announce parse queue (#50)

This commit is contained in:
Ludvig Lundgren 2021-12-25 17:44:50 +01:00 committed by GitHub
parent 709599104b
commit bc064e15e2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 12 additions and 7 deletions

View file

@ -1,4 +1,4 @@
package irc
package announce
import (
"bytes"
@ -50,6 +50,7 @@ func (a *announceProcessor) setupQueues() {
channel = strings.ToLower(channel)
queues[channel] = make(chan string, 128)
log.Trace().Msgf("announce: setup queue: %v", channel)
}
a.queues = queues
@ -58,7 +59,9 @@ func (a *announceProcessor) setupQueues() {
func (a *announceProcessor) setupQueueConsumers() {
for queueName, queue := range a.queues {
go func(name string, q chan string) {
log.Trace().Msgf("announce: setup queue consumer: %v", name)
a.processQueue(q)
log.Trace().Msgf("announce: queue consumer stopped: %v", name)
}(queueName, queue)
}
}
@ -95,6 +98,7 @@ func (a *announceProcessor) processQueue(queue chan string) {
}
if parseFailed {
log.Trace().Msg("announce: parse failed")
continue
}
@ -151,12 +155,12 @@ func (a *announceProcessor) processQueue(queue chan string) {
log.Info().Msgf("Matched '%v' (%v) for %v", newRelease.TorrentName, newRelease.Filter.Name, newRelease.Indexer)
// process release
go func() {
err = a.releaseSvc.Process(*newRelease)
go func(rel *domain.Release) {
err = a.releaseSvc.Process(*rel)
if err != nil {
log.Error().Err(err).Msgf("could not process release: %+v", newRelease)
}
}()
}(newRelease)
}
}

View file

@ -10,6 +10,7 @@ import (
"strings"
"time"
"github.com/autobrr/autobrr/internal/announce"
"github.com/autobrr/autobrr/internal/domain"
"github.com/autobrr/autobrr/internal/filter"
"github.com/autobrr/autobrr/internal/release"
@ -34,7 +35,7 @@ type Handler struct {
network *domain.IrcNetwork
filterService filter.Service
releaseService release.Service
announceProcessors map[string]Processor
announceProcessors map[string]announce.Processor
definitions []domain.IndexerDefinition
client *irc.Client
@ -60,7 +61,7 @@ func NewHandler(network domain.IrcNetwork, filterService filter.Service, release
filterService: filterService,
releaseService: releaseService,
definitions: definitions,
announceProcessors: map[string]Processor{},
announceProcessors: map[string]announce.Processor{},
validAnnouncers: map[string]struct{}{},
}
@ -71,7 +72,7 @@ func NewHandler(network domain.IrcNetwork, filterService filter.Service, release
for _, channel := range definition.IRC.Channels {
channel = strings.ToLower(channel)
h.announceProcessors[channel] = NewAnnounceProcessor(definition, filterService, releaseService)
h.announceProcessors[channel] = announce.NewAnnounceProcessor(definition, filterService, releaseService)
}
// create map of valid announcers