From 550084e195b00a2ac2da7fd680dd9b01a4889366 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Thu, 30 Sep 2021 20:01:26 +0200 Subject: [PATCH] refactor screencast pipeline. --- internal/capture/screencast.go | 72 ++++++++++++++++++++++------------ 1 file changed, 47 insertions(+), 25 deletions(-) diff --git a/internal/capture/screencast.go b/internal/capture/screencast.go index 36222b17..394f0abf 100644 --- a/internal/capture/screencast.go +++ b/internal/capture/screencast.go @@ -13,32 +13,41 @@ import ( "demodesk/neko/internal/types" ) -type ScreencastManagerCtx struct { - logger zerolog.Logger - mu sync.Mutex - wg sync.WaitGroup - pipelineStr string - pipeline *gst.Pipeline - enabled bool - started bool - emitStop chan bool - emitUpdate chan bool - expired int32 - sample chan types.Sample - image types.Sample -} - // timeout between intervals, when screencast pipeline is checked const screencastTimeout = 5 * time.Second +type ScreencastManagerCtx struct { + logger zerolog.Logger + mu sync.Mutex + wg sync.WaitGroup + + pipeline *gst.Pipeline + pipelineStr string + pipelineMu sync.Mutex + + image types.Sample + sample chan types.Sample + sampleStop chan interface{} + sampleUpdate chan interface{} + + enabled bool + started bool + expired int32 +} + func screencastNew(enabled bool, pipelineStr string) *ScreencastManagerCtx { + logger := log.With(). + Str("module", "capture"). + Str("submodule", "screencast"). + Logger() + manager := &ScreencastManagerCtx{ - logger: log.With().Str("module", "capture").Str("submodule", "screencast").Logger(), - pipelineStr: pipelineStr, - enabled: enabled, - started: false, - emitStop: make(chan bool), - emitUpdate: make(chan bool), + logger: logger, + pipelineStr: pipelineStr, + sampleStop: make(chan interface{}), + sampleUpdate: make(chan interface{}), + enabled: enabled, + started: false, } manager.wg.Add(1) @@ -52,10 +61,10 @@ func screencastNew(enabled bool, pipelineStr string) *ScreencastManagerCtx { for { select { - case <-manager.emitStop: + case <-manager.sampleStop: manager.logger.Debug().Msg("stopped emitting samples") return - case <-manager.emitUpdate: + case <-manager.sampleUpdate: manager.logger.Debug().Msg("update emitting samples") case sample := <-manager.sample: manager.image = sample @@ -75,15 +84,21 @@ func (manager *ScreencastManagerCtx) shutdown() { manager.destroyPipeline() - manager.emitStop <- true + close(manager.sampleStop) manager.wg.Wait() } func (manager *ScreencastManagerCtx) Enabled() bool { + manager.mu.Lock() + defer manager.mu.Unlock() + return manager.enabled } func (manager *ScreencastManagerCtx) Started() bool { + manager.mu.Lock() + defer manager.mu.Unlock() + return manager.started } @@ -137,6 +152,9 @@ func (manager *ScreencastManagerCtx) stop() { } func (manager *ScreencastManagerCtx) createPipeline() error { + manager.pipelineMu.Lock() + defer manager.pipelineMu.Unlock() + if manager.pipeline != nil { return types.ErrCapturePipelineAlreadyExists } @@ -153,12 +171,16 @@ func (manager *ScreencastManagerCtx) createPipeline() error { } manager.pipeline.Start() + manager.sample = manager.pipeline.Sample - manager.emitUpdate <- true + manager.sampleUpdate <- struct{}{} return nil } func (manager *ScreencastManagerCtx) destroyPipeline() { + manager.pipelineMu.Lock() + defer manager.pipelineMu.Unlock() + if manager.pipeline == nil { return }