diff --git a/internal/capture/broadcast.go b/internal/capture/broadcast.go index 10ad3574..d481b07c 100644 --- a/internal/capture/broadcast.go +++ b/internal/capture/broadcast.go @@ -1,6 +1,9 @@ package capture import ( + "fmt" + "sync" + "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -10,6 +13,7 @@ import ( type BroacastManagerCtx struct { logger zerolog.Logger + mu sync.Mutex config *config.Capture pipeline *gst.Pipeline enabled bool @@ -19,6 +23,7 @@ type BroacastManagerCtx struct { func broadcastNew(config *config.Capture) *BroacastManagerCtx { return &BroacastManagerCtx{ logger: log.With().Str("module", "capture").Str("submodule", "broadcast").Logger(), + mu: sync.Mutex{}, config: config, enabled: false, url: "", @@ -32,6 +37,9 @@ func (manager *BroacastManagerCtx) shutdown() { } func (manager *BroacastManagerCtx) Start(url string) error { + manager.mu.Lock() + defer manager.mu.Unlock() + err := manager.createPipeline() if err != nil { return err @@ -43,6 +51,9 @@ func (manager *BroacastManagerCtx) Start(url string) error { } func (manager *BroacastManagerCtx) Stop() { + manager.mu.Lock() + defer manager.mu.Unlock() + manager.enabled = false manager.destroyPipeline() } @@ -56,6 +67,10 @@ func (manager *BroacastManagerCtx) Url() string { } func (manager *BroacastManagerCtx) createPipeline() error { + if manager.pipeline != nil { + return fmt.Errorf("pipeline already running") + } + var err error manager.logger.Info(). diff --git a/internal/capture/screencast.go b/internal/capture/screencast.go index c092d3c4..bbac4766 100644 --- a/internal/capture/screencast.go +++ b/internal/capture/screencast.go @@ -107,6 +107,9 @@ func (manager *ScreencastManagerCtx) Image() ([]byte, error) { } func (manager *ScreencastManagerCtx) start() error { + manager.mu.Lock() + defer manager.mu.Unlock() + if !manager.enabled { return fmt.Errorf("screenshot pipeline not enabled") } @@ -121,13 +124,17 @@ func (manager *ScreencastManagerCtx) start() error { } func (manager *ScreencastManagerCtx) stop() { + manager.mu.Lock() + defer manager.mu.Unlock() + manager.started = false manager.destroyPipeline() } func (manager *ScreencastManagerCtx) createPipeline() error { - manager.mu.Lock() - defer manager.mu.Unlock() + if manager.pipeline != nil { + return fmt.Errorf("pipeline already running") + } var err error @@ -158,9 +165,6 @@ func (manager *ScreencastManagerCtx) createPipeline() error { } func (manager *ScreencastManagerCtx) destroyPipeline() { - manager.mu.Lock() - defer manager.mu.Unlock() - if manager.pipeline == nil { return } diff --git a/internal/capture/stream.go b/internal/capture/stream.go index e390377d..b374b032 100644 --- a/internal/capture/stream.go +++ b/internal/capture/stream.go @@ -1,6 +1,7 @@ package capture import ( + "fmt" "sync" "github.com/kataras/go-events" @@ -76,6 +77,9 @@ func (manager *StreamManagerCtx) OnSample(listener func(sample types.Sample)) { } func (manager *StreamManagerCtx) Start() error { + manager.mu.Lock() + defer manager.mu.Unlock() + err := manager.createPipeline() if err != nil { return err @@ -86,6 +90,9 @@ func (manager *StreamManagerCtx) Start() error { } func (manager *StreamManagerCtx) Stop() { + manager.mu.Lock() + defer manager.mu.Unlock() + manager.enabled = false manager.destroyPipeline() } @@ -95,6 +102,10 @@ func (manager *StreamManagerCtx) Enabled() bool { } func (manager *StreamManagerCtx) createPipeline() error { + if manager.pipeline != nil { + return fmt.Errorf("pipeline already running") + } + var err error codec := manager.Codec()