stream fix listener counts.

This commit is contained in:
Miroslav Šedivý 2021-09-27 01:17:25 +02:00
parent 9d4d5766ef
commit 5bb2da2732

View File

@ -100,13 +100,13 @@ func (manager *StreamManagerCtx) NewListener(listener *func(sample types.Sample)
manager.mu.Lock() manager.mu.Lock()
defer manager.mu.Unlock() defer manager.mu.Unlock()
if manager.listenersCount == 0 { manager.listenersCount++
if manager.listenersCount == 1 {
err := manager.createPipeline() err := manager.createPipeline()
if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) { if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) {
return addListener, err return addListener, err
} }
manager.listenersCount++
manager.logger.Info().Msgf("first listener, starting") manager.logger.Info().Msgf("first listener, starting")
} }
@ -134,11 +134,15 @@ func (manager *StreamManagerCtx) RemoveListener(listener *func(sample types.Samp
manager.logger.Debug().Interface("ptr", ptr).Msgf("removing listener") manager.logger.Debug().Interface("ptr", ptr).Msgf("removing listener")
manager.mu.Lock()
manager.listenersCount--
manager.mu.Unlock()
go func() { go func() {
manager.mu.Lock() manager.mu.Lock()
defer manager.mu.Unlock() defer manager.mu.Unlock()
if manager.listenersCount == 1 { if manager.listenersCount == 0 {
manager.destroyPipeline() manager.destroyPipeline()
manager.listenersCount = 0 manager.listenersCount = 0
manager.logger.Info().Msgf("last listener, stopping") manager.logger.Info().Msgf("last listener, stopping")