From f32e7e707553a2af46681bf401d9dac6afaa1b30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Mon, 14 Feb 2022 17:41:47 +0000 Subject: [PATCH] fix-races. --- internal/capture/screencast.go | 16 +++++++++++++--- internal/capture/streamsink.go | 3 ++- internal/webrtc/cursor/image.go | 27 ++++++++++++++++----------- internal/webrtc/cursor/position.go | 22 +++++++++++++--------- 4 files changed, 44 insertions(+), 24 deletions(-) diff --git a/internal/capture/screencast.go b/internal/capture/screencast.go index 657133e3..e73705d8 100644 --- a/internal/capture/screencast.go +++ b/internal/capture/screencast.go @@ -26,6 +26,7 @@ type ScreencastManagerCtx struct { pipelineMu sync.Mutex image types.Sample + imageMu sync.Mutex tickerStop chan struct{} enabled bool @@ -101,6 +102,9 @@ func (manager *ScreencastManagerCtx) Image() ([]byte, error) { return nil, err } + manager.imageMu.Lock() + defer manager.imageMu.Unlock() + if manager.image.Data == nil { return nil, errors.New("image data not found") } @@ -156,30 +160,36 @@ func (manager *ScreencastManagerCtx) createPipeline() error { manager.pipeline.Play() // get first image - var ok bool select { - case manager.image, ok = <-manager.pipeline.Sample: + case image, ok := <-manager.pipeline.Sample: if !ok { return errors.New("unable to get first image") + } else { + manager.imageMu.Lock() + manager.image = image + manager.imageMu.Unlock() } case <-time.After(1 * time.Second): return errors.New("timeouted while waiting for first image") } manager.wg.Add(1) + pipeline := manager.pipeline go func() { manager.logger.Debug().Msg("started receiving images") defer manager.wg.Done() for { - image, ok := <-manager.pipeline.Sample + image, ok := <-pipeline.Sample if !ok { manager.logger.Debug().Msg("stopped receiving images") return } + manager.imageMu.Lock() manager.image = image + manager.imageMu.Unlock() } }() diff --git a/internal/capture/streamsink.go b/internal/capture/streamsink.go index d012611f..c5e3c9dc 100644 --- a/internal/capture/streamsink.go +++ b/internal/capture/streamsink.go @@ -219,13 +219,14 @@ func (manager *StreamSinkManagerCtx) createPipeline() error { manager.pipeline.Play() manager.wg.Add(1) + pipeline := manager.pipeline go func() { manager.logger.Debug().Msg("started emitting samples") defer manager.wg.Done() for { - sample, ok := <-manager.pipeline.Sample + sample, ok := <-pipeline.Sample if !ok { manager.logger.Debug().Msg("stopped emitting samples") return diff --git a/internal/webrtc/cursor/image.go b/internal/webrtc/cursor/image.go index 3b151152..fe6d203c 100644 --- a/internal/webrtc/cursor/image.go +++ b/internal/webrtc/cursor/image.go @@ -22,12 +22,14 @@ func NewImage(desktop types.DesktopManager) *ImageCtx { } type ImageCtx struct { - logger zerolog.Logger - desktop types.DesktopManager - emitMu sync.Mutex - listeners map[uintptr]*func(entry *ImageEntry) - cacheMu sync.Mutex + logger zerolog.Logger + desktop types.DesktopManager + + listeners map[uintptr]*func(entry *ImageEntry) + listenersMu sync.Mutex + cache map[uint64]*ImageEntry + cacheMu sync.Mutex current *ImageEntry maxSerial uint64 } @@ -46,9 +48,12 @@ func (manager *ImageCtx) Start() { } manager.current = entry + + manager.listenersMu.Lock() for _, emit := range manager.listeners { (*emit)(entry) } + manager.listenersMu.Unlock() }) manager.logger.Info().Msg("starting") @@ -57,11 +62,11 @@ func (manager *ImageCtx) Start() { func (manager *ImageCtx) Shutdown() { manager.logger.Info().Msg("shutdown") - manager.emitMu.Lock() + manager.listenersMu.Lock() for key := range manager.listeners { delete(manager.listeners, key) } - manager.emitMu.Unlock() + manager.listenersMu.Unlock() } func (manager *ImageCtx) GetCached(serial uint64) (*ImageEntry, error) { @@ -101,8 +106,8 @@ func (manager *ImageCtx) Get() (*ImageEntry, error) { } func (manager *ImageCtx) AddListener(listener *func(entry *ImageEntry)) { - manager.emitMu.Lock() - defer manager.emitMu.Unlock() + manager.listenersMu.Lock() + defer manager.listenersMu.Unlock() if listener != nil { ptr := reflect.ValueOf(listener).Pointer() @@ -111,8 +116,8 @@ func (manager *ImageCtx) AddListener(listener *func(entry *ImageEntry)) { } func (manager *ImageCtx) RemoveListener(listener *func(entry *ImageEntry)) { - manager.emitMu.Lock() - defer manager.emitMu.Unlock() + manager.listenersMu.Lock() + defer manager.listenersMu.Unlock() if listener != nil { ptr := reflect.ValueOf(listener).Pointer() diff --git a/internal/webrtc/cursor/position.go b/internal/webrtc/cursor/position.go index 81b65eed..6c4ea7e2 100644 --- a/internal/webrtc/cursor/position.go +++ b/internal/webrtc/cursor/position.go @@ -16,30 +16,34 @@ func NewPosition() *PositionCtx { } type PositionCtx struct { - logger zerolog.Logger - emitMu sync.Mutex - listeners map[uintptr]*func(x, y int) + logger zerolog.Logger + + listeners map[uintptr]*func(x, y int) + listenersMu sync.Mutex } func (manager *PositionCtx) Shutdown() { manager.logger.Info().Msg("shutdown") - manager.emitMu.Lock() + manager.listenersMu.Lock() for key := range manager.listeners { delete(manager.listeners, key) } - manager.emitMu.Unlock() + manager.listenersMu.Unlock() } func (manager *PositionCtx) Set(x, y int) { + manager.listenersMu.Lock() + defer manager.listenersMu.Unlock() + for _, emit := range manager.listeners { (*emit)(x, y) } } func (manager *PositionCtx) AddListener(listener *func(x, y int)) { - manager.emitMu.Lock() - defer manager.emitMu.Unlock() + manager.listenersMu.Lock() + defer manager.listenersMu.Unlock() if listener != nil { ptr := reflect.ValueOf(listener).Pointer() @@ -48,8 +52,8 @@ func (manager *PositionCtx) AddListener(listener *func(x, y int)) { } func (manager *PositionCtx) RemoveListener(listener *func(x, y int)) { - manager.emitMu.Lock() - defer manager.emitMu.Unlock() + manager.listenersMu.Lock() + defer manager.listenersMu.Unlock() if listener != nil { ptr := reflect.ValueOf(listener).Pointer()