capture add mutexes.

This commit is contained in:
Miroslav Šedivý 2021-02-05 12:49:02 +01:00
parent 2b72f43c9e
commit 3515c67045
3 changed files with 35 additions and 5 deletions

View File

@ -1,6 +1,9 @@
package capture package capture
import ( import (
"fmt"
"sync"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
@ -10,6 +13,7 @@ import (
type BroacastManagerCtx struct { type BroacastManagerCtx struct {
logger zerolog.Logger logger zerolog.Logger
mu sync.Mutex
config *config.Capture config *config.Capture
pipeline *gst.Pipeline pipeline *gst.Pipeline
enabled bool enabled bool
@ -19,6 +23,7 @@ type BroacastManagerCtx struct {
func broadcastNew(config *config.Capture) *BroacastManagerCtx { func broadcastNew(config *config.Capture) *BroacastManagerCtx {
return &BroacastManagerCtx{ return &BroacastManagerCtx{
logger: log.With().Str("module", "capture").Str("submodule", "broadcast").Logger(), logger: log.With().Str("module", "capture").Str("submodule", "broadcast").Logger(),
mu: sync.Mutex{},
config: config, config: config,
enabled: false, enabled: false,
url: "", url: "",
@ -32,6 +37,9 @@ func (manager *BroacastManagerCtx) shutdown() {
} }
func (manager *BroacastManagerCtx) Start(url string) error { func (manager *BroacastManagerCtx) Start(url string) error {
manager.mu.Lock()
defer manager.mu.Unlock()
err := manager.createPipeline() err := manager.createPipeline()
if err != nil { if err != nil {
return err return err
@ -43,6 +51,9 @@ func (manager *BroacastManagerCtx) Start(url string) error {
} }
func (manager *BroacastManagerCtx) Stop() { func (manager *BroacastManagerCtx) Stop() {
manager.mu.Lock()
defer manager.mu.Unlock()
manager.enabled = false manager.enabled = false
manager.destroyPipeline() manager.destroyPipeline()
} }
@ -56,6 +67,10 @@ func (manager *BroacastManagerCtx) Url() string {
} }
func (manager *BroacastManagerCtx) createPipeline() error { func (manager *BroacastManagerCtx) createPipeline() error {
if manager.pipeline != nil {
return fmt.Errorf("pipeline already running")
}
var err error var err error
manager.logger.Info(). manager.logger.Info().

View File

@ -107,6 +107,9 @@ func (manager *ScreencastManagerCtx) Image() ([]byte, error) {
} }
func (manager *ScreencastManagerCtx) start() error { func (manager *ScreencastManagerCtx) start() error {
manager.mu.Lock()
defer manager.mu.Unlock()
if !manager.enabled { if !manager.enabled {
return fmt.Errorf("screenshot pipeline not enabled") return fmt.Errorf("screenshot pipeline not enabled")
} }
@ -121,13 +124,17 @@ func (manager *ScreencastManagerCtx) start() error {
} }
func (manager *ScreencastManagerCtx) stop() { func (manager *ScreencastManagerCtx) stop() {
manager.mu.Lock()
defer manager.mu.Unlock()
manager.started = false manager.started = false
manager.destroyPipeline() manager.destroyPipeline()
} }
func (manager *ScreencastManagerCtx) createPipeline() error { func (manager *ScreencastManagerCtx) createPipeline() error {
manager.mu.Lock() if manager.pipeline != nil {
defer manager.mu.Unlock() return fmt.Errorf("pipeline already running")
}
var err error var err error
@ -158,9 +165,6 @@ func (manager *ScreencastManagerCtx) createPipeline() error {
} }
func (manager *ScreencastManagerCtx) destroyPipeline() { func (manager *ScreencastManagerCtx) destroyPipeline() {
manager.mu.Lock()
defer manager.mu.Unlock()
if manager.pipeline == nil { if manager.pipeline == nil {
return return
} }

View File

@ -1,6 +1,7 @@
package capture package capture
import ( import (
"fmt"
"sync" "sync"
"github.com/kataras/go-events" "github.com/kataras/go-events"
@ -76,6 +77,9 @@ func (manager *StreamManagerCtx) OnSample(listener func(sample types.Sample)) {
} }
func (manager *StreamManagerCtx) Start() error { func (manager *StreamManagerCtx) Start() error {
manager.mu.Lock()
defer manager.mu.Unlock()
err := manager.createPipeline() err := manager.createPipeline()
if err != nil { if err != nil {
return err return err
@ -86,6 +90,9 @@ func (manager *StreamManagerCtx) Start() error {
} }
func (manager *StreamManagerCtx) Stop() { func (manager *StreamManagerCtx) Stop() {
manager.mu.Lock()
defer manager.mu.Unlock()
manager.enabled = false manager.enabled = false
manager.destroyPipeline() manager.destroyPipeline()
} }
@ -95,6 +102,10 @@ func (manager *StreamManagerCtx) Enabled() bool {
} }
func (manager *StreamManagerCtx) createPipeline() error { func (manager *StreamManagerCtx) createPipeline() error {
if manager.pipeline != nil {
return fmt.Errorf("pipeline already running")
}
var err error var err error
codec := manager.Codec() codec := manager.Codec()