From c9c8cc67ca957af1788183f0efaeb225e780f219 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Thu, 30 Sep 2021 20:06:43 +0200 Subject: [PATCH] refactor broadcast pipeline. --- internal/capture/broadcast.go | 34 +++++++++++++++++++++++++++------- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/internal/capture/broadcast.go b/internal/capture/broadcast.go index fb9615ca..d5510ca0 100644 --- a/internal/capture/broadcast.go +++ b/internal/capture/broadcast.go @@ -12,20 +12,28 @@ import ( ) type BroacastManagerCtx struct { - logger zerolog.Logger - mu sync.Mutex - pipelineStr string + logger zerolog.Logger + mu sync.Mutex + pipeline *gst.Pipeline - started bool - url string + pipelineStr string + pipelineMu sync.Mutex + + url string + started bool } func broadcastNew(pipelineStr string) *BroacastManagerCtx { + logger := log.With(). + Str("module", "capture"). + Str("submodule", "broadcast"). + Logger() + return &BroacastManagerCtx{ - logger: log.With().Str("module", "capture").Str("submodule", "broadcast").Logger(), + logger: logger, pipelineStr: pipelineStr, - started: false, url: "", + started: false, } } @@ -58,14 +66,23 @@ func (manager *BroacastManagerCtx) Stop() { } func (manager *BroacastManagerCtx) Started() bool { + manager.mu.Lock() + defer manager.mu.Unlock() + return manager.started } func (manager *BroacastManagerCtx) Url() string { + manager.mu.Lock() + defer manager.mu.Unlock() + return manager.url } func (manager *BroacastManagerCtx) createPipeline() error { + manager.pipelineMu.Lock() + defer manager.pipelineMu.Unlock() + if manager.pipeline != nil { return types.ErrCapturePipelineAlreadyExists } @@ -89,6 +106,9 @@ func (manager *BroacastManagerCtx) createPipeline() error { } func (manager *BroacastManagerCtx) destroyPipeline() { + manager.pipelineMu.Lock() + defer manager.pipelineMu.Unlock() + if manager.pipeline == nil { return }