diff --git a/internal/capture/screencast.go b/internal/capture/screencast.go index aa8d44ba..657133e3 100644 --- a/internal/capture/screencast.go +++ b/internal/capture/screencast.go @@ -25,10 +25,8 @@ type ScreencastManagerCtx struct { pipelineStr string pipelineMu sync.Mutex - image types.Sample - sample chan types.Sample - sampleStop chan struct{} - sampleUpdate chan struct{} + image types.Sample + tickerStop chan struct{} enabled bool started bool @@ -42,18 +40,16 @@ func screencastNew(enabled bool, pipelineStr string) *ScreencastManagerCtx { Logger() manager := &ScreencastManagerCtx{ - logger: logger, - pipelineStr: pipelineStr, - sampleStop: make(chan struct{}), - sampleUpdate: make(chan struct{}), - enabled: enabled, - started: false, + logger: logger, + pipelineStr: pipelineStr, + tickerStop: make(chan struct{}), + enabled: enabled, + started: false, } manager.wg.Add(1) go func() { - manager.logger.Debug().Msg("started emitting samples") defer manager.wg.Done() ticker := time.NewTicker(screencastTimeout) @@ -61,15 +57,10 @@ func screencastNew(enabled bool, pipelineStr string) *ScreencastManagerCtx { for { select { - case <-manager.sampleStop: - manager.logger.Debug().Msg("stopped emitting samples") + case <-manager.tickerStop: return - case <-manager.sampleUpdate: - manager.logger.Debug().Msg("update emitting samples") - case sample := <-manager.sample: - manager.image = sample case <-ticker.C: - if manager.started && !atomic.CompareAndSwapInt32(&manager.expired, 0, 1) { + if manager.Started() && !atomic.CompareAndSwapInt32(&manager.expired, 0, 1) { manager.stop() } } @@ -84,7 +75,7 @@ func (manager *ScreencastManagerCtx) shutdown() { manager.destroyPipeline() - close(manager.sampleStop) + close(manager.tickerStop) manager.wg.Wait() } @@ -105,22 +96,13 @@ func (manager *ScreencastManagerCtx) Started() bool { func (manager *ScreencastManagerCtx) Image() ([]byte, error) { atomic.StoreInt32(&manager.expired, 0) - if !manager.started { - err := manager.start() - if err != nil { - return nil, err - } - - select { - case sample := <-manager.sample: - return sample.Data, nil - case <-time.After(1 * time.Second): - return nil, errors.New("timeouted") - } + err := manager.start() + if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) { + return nil, err } if manager.image.Data == nil { - return nil, errors.New("image sample not found") + return nil, errors.New("image data not found") } return manager.image.Data, nil @@ -173,8 +155,34 @@ func (manager *ScreencastManagerCtx) createPipeline() error { manager.pipeline.AttachAppsink("appsink") manager.pipeline.Play() - manager.sample = manager.pipeline.Sample - manager.sampleUpdate <- struct{}{} + // get first image + var ok bool + select { + case manager.image, ok = <-manager.pipeline.Sample: + if !ok { + return errors.New("unable to get first image") + } + case <-time.After(1 * time.Second): + return errors.New("timeouted while waiting for first image") + } + + manager.wg.Add(1) + + go func() { + manager.logger.Debug().Msg("started receiving images") + defer manager.wg.Done() + + for { + image, ok := <-manager.pipeline.Sample + if !ok { + manager.logger.Debug().Msg("stopped receiving images") + return + } + + manager.image = image + } + }() + return nil } diff --git a/internal/capture/streamsink.go b/internal/capture/streamsink.go index 39b164a6..d012611f 100644 --- a/internal/capture/streamsink.go +++ b/internal/capture/streamsink.go @@ -25,10 +25,6 @@ type StreamSinkManagerCtx struct { pipelineMu sync.Mutex pipelineStr func() string - sample chan types.Sample - sampleStop chan struct{} - sampleUpdate chan struct{} - listeners map[uintptr]*func(sample types.Sample) listenersMu sync.Mutex } @@ -40,37 +36,12 @@ func streamSinkNew(codec codec.RTPCodec, pipelineStr func() string, video_id str Str("video_id", video_id).Logger() manager := &StreamSinkManagerCtx{ - logger: logger, - codec: codec, - pipelineStr: pipelineStr, - sampleStop: make(chan struct{}), - sampleUpdate: make(chan struct{}), - listeners: map[uintptr]*func(sample types.Sample){}, + logger: logger, + codec: codec, + pipelineStr: pipelineStr, + listeners: map[uintptr]*func(sample types.Sample){}, } - manager.wg.Add(1) - - go func() { - manager.logger.Debug().Msg("started emitting samples") - defer manager.wg.Done() - - for { - select { - case <-manager.sampleStop: - manager.logger.Debug().Msg("stopped emitting samples") - return - case <-manager.sampleUpdate: - manager.logger.Debug().Msg("update emitting samples") - case sample := <-manager.sample: - manager.listenersMu.Lock() - for _, emit := range manager.listeners { - (*emit)(sample) - } - manager.listenersMu.Unlock() - } - } - }() - return manager } @@ -84,8 +55,6 @@ func (manager *StreamSinkManagerCtx) shutdown() { manager.listenersMu.Unlock() manager.destroyPipeline() - - close(manager.sampleStop) manager.wg.Wait() } @@ -249,8 +218,27 @@ func (manager *StreamSinkManagerCtx) createPipeline() error { manager.pipeline.AttachAppsink("appsink") manager.pipeline.Play() - manager.sample = manager.pipeline.Sample - manager.sampleUpdate <- struct{}{} + manager.wg.Add(1) + + go func() { + manager.logger.Debug().Msg("started emitting samples") + defer manager.wg.Done() + + for { + sample, ok := <-manager.pipeline.Sample + if !ok { + manager.logger.Debug().Msg("stopped emitting samples") + return + } + + manager.listenersMu.Lock() + for _, emit := range manager.listeners { + (*emit)(sample) + } + manager.listenersMu.Unlock() + } + }() + return nil }