diff --git a/internal/capture/stream.go b/internal/capture/stream.go index 3e810a4e..96907dd9 100644 --- a/internal/capture/stream.go +++ b/internal/capture/stream.go @@ -100,13 +100,13 @@ func (manager *StreamManagerCtx) NewListener(listener *func(sample types.Sample) manager.mu.Lock() defer manager.mu.Unlock() - if manager.listenersCount == 0 { + manager.listenersCount++ + if manager.listenersCount == 1 { err := manager.createPipeline() if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) { return addListener, err } - manager.listenersCount++ manager.logger.Info().Msgf("first listener, starting") } @@ -134,11 +134,15 @@ func (manager *StreamManagerCtx) RemoveListener(listener *func(sample types.Samp manager.logger.Debug().Interface("ptr", ptr).Msgf("removing listener") + manager.mu.Lock() + manager.listenersCount-- + manager.mu.Unlock() + go func() { manager.mu.Lock() defer manager.mu.Unlock() - if manager.listenersCount == 1 { + if manager.listenersCount == 0 { manager.destroyPipeline() manager.listenersCount = 0 manager.logger.Info().Msgf("last listener, stopping")