fix(irc): rewrite handler pipeline (#399)

* fix(irc): rewrite handler pipeline

This might be overkill but the pipeline has been made event driven. I've tested on a couple networks, and bouncing nicks / connections brings it back every time. creating the a different branch from https://github.com/autobrr/autobrr/pull/398 because it's pretty intrusive (and I didn't apply the GUI changes).

* fix(irc): couple adjustments

* fix(irc): deadlocks

* fix: add missing dependency

* fix(irc): remove more locks from connect cmds

* feat(irc): conditional monitoring message

* feat(irc): show unhealthy network in ui

* feat(irc): improve logs and comments
This commit is contained in:
Kyle Sanderson 2022-08-12 06:32:02 -07:00 committed by GitHub
parent 4c93cac248
commit 7deac6a781
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 317 additions and 281 deletions

View file

@ -4,6 +4,7 @@ import (
"context"
"strings"
"sync"
"time"
"github.com/autobrr/autobrr/internal/domain"
"github.com/autobrr/autobrr/internal/indexer"
@ -29,6 +30,9 @@ type Service interface {
}
type service struct {
stopWG sync.WaitGroup
lock sync.RWMutex
log zerolog.Logger
repo domain.IrcRepo
releaseService release.Service
@ -36,9 +40,6 @@ type service struct {
notificationService notification.Service
indexerMap map[string]string
handlers map[handlerKey]*Handler
stopWG sync.WaitGroup
lock sync.Mutex
}
func NewService(log logger.Logger, repo domain.IrcRepo, releaseSvc release.Service, indexerSvc indexer.Service, notificationSvc notification.Service) Service {
@ -91,15 +92,11 @@ func (s *service) StartHandlers() {
s.log.Debug().Msgf("starting network: %+v", network.Name)
s.stopWG.Add(1)
go func() {
go func(network domain.IrcNetwork) {
if err := handler.Run(); err != nil {
s.log.Error().Err(err).Msgf("failed to start handler for network %q", network.Name)
}
}()
s.stopWG.Done()
}(network)
}
}
@ -118,11 +115,11 @@ func (s *service) startNetwork(network domain.IrcNetwork) error {
s.log.Debug().Msgf("starting network: %+v", network.Name)
if !existingHandler.client.Connected() {
go func() {
if err := existingHandler.Run(); err != nil {
s.log.Error().Err(err).Msgf("failed to start existingHandler for network %q", existingHandler.network.Name)
go func(handler *Handler) {
if err := handler.Run(); err != nil {
s.log.Error().Err(err).Msgf("failed to start existingHandler for network %q", handler.network.Name)
}
}()
}(existingHandler)
}
} else {
// if not found in handlers, lets add it and run it
@ -145,15 +142,11 @@ func (s *service) startNetwork(network domain.IrcNetwork) error {
s.log.Debug().Msgf("starting network: %+v", network.Name)
s.stopWG.Add(1)
go func() {
go func(network domain.IrcNetwork) {
if err := handler.Run(); err != nil {
s.log.Error().Err(err).Msgf("failed to start handler for network %q", network.Name)
}
}()
s.stopWG.Done()
}(network)
}
return nil
@ -278,8 +271,7 @@ func (s *service) checkIfNetworkRestartNeeded(network *domain.IrcNetwork) error
existingHandler.InitIndexers(definitions)
}
} else {
err := s.startNetwork(*network)
if err != nil {
if err := s.startNetwork(*network); err != nil {
s.log.Error().Stack().Err(err).Msgf("failed to start network: %q", network.Name)
}
}
@ -403,24 +395,26 @@ func (s *service) GetNetworksWithHealth(ctx context.Context) ([]domain.IrcNetwor
handler, ok := s.handlers[handlerKey{n.Server, n.NickServ.Account}]
if ok {
handler.m.RLock()
// only set connected and connected since if we have an active handler and connection
if handler.client.Connected() {
handler.m.RLock()
netw.Connected = handler.connected
netw.Connected = handler.connectedSince != time.Time{}
netw.ConnectedSince = handler.connectedSince
// current and preferred nick is only available if the network is connected
netw.CurrentNick = handler.CurrentNick()
netw.PreferredNick = handler.PreferredNick()
handler.m.RUnlock()
}
netw.Healthy = handler.Healthy()
// if we have any connection errors like bad nickserv auth add them here
if len(handler.connectionErrors) > 0 {
netw.ConnectionErrors = handler.connectionErrors
}
handler.m.RUnlock()
}
channels, err := s.repo.ListChannels(n.ID)
@ -446,6 +440,7 @@ func (s *service) GetNetworksWithHealth(ctx context.Context) ([]domain.IrcNetwor
if handler != nil {
name := strings.ToLower(channel.Name)
handler.m.RLock()
chan1, ok := handler.channelHealth[name]
if ok {
chan1.m.RLock()
@ -455,12 +450,14 @@ func (s *service) GetNetworksWithHealth(ctx context.Context) ([]domain.IrcNetwor
chan1.m.RUnlock()
}
handler.m.RUnlock()
}
netw.Channels = append(netw.Channels, ch)
}
ret = append(ret, netw)
}
return ret, nil