diff --git a/internal/capture/manager.go b/internal/capture/manager.go index fb622e02..d7d41a35 100644 --- a/internal/capture/manager.go +++ b/internal/capture/manager.go @@ -18,8 +18,8 @@ type CaptureManagerCtx struct { broadcast *BroacastManagerCtx screencast *ScreencastManagerCtx - audio *StreamManagerCtx - videos map[string]*StreamManagerCtx + audio *StreamSinkManagerCtx + videos map[string]*StreamSinkManagerCtx videoIDs []string } @@ -57,7 +57,7 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt ) } - videos := map[string]*StreamManagerCtx{} + videos := map[string]*StreamSinkManagerCtx{} for video_id, cnf := range config.VideoPipelines { pipelineConf := cnf @@ -88,7 +88,7 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt Msg("syntax check for video stream pipeline passed") // append to videos - videos[video_id] = streamNew(config.VideoCodec, createPipeline, video_id) + videos[video_id] = streamSinkNew(config.VideoCodec, createPipeline, video_id) } return &CaptureManagerCtx{ @@ -97,7 +97,7 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt broadcast: broadcastNew(broadcastPipeline), screencast: screencastNew(config.ScreencastEnabled, screencastPipeline), - audio: streamNew(config.AudioCodec, func() string { + audio: streamSinkNew(config.AudioCodec, func() string { if config.AudioPipeline != "" { return config.AudioPipeline } @@ -188,11 +188,11 @@ func (manager *CaptureManagerCtx) Screencast() types.ScreencastManager { return manager.screencast } -func (manager *CaptureManagerCtx) Audio() types.StreamManager { +func (manager *CaptureManagerCtx) Audio() types.StreamSinkManager { return manager.audio } -func (manager *CaptureManagerCtx) Video(videoID string) (types.StreamManager, bool) { +func (manager *CaptureManagerCtx) Video(videoID string) (types.StreamSinkManager, bool) { video, ok := manager.videos[videoID] return video, ok } diff --git a/internal/capture/stream.go b/internal/capture/streamsink.go similarity index 78% rename from internal/capture/stream.go rename to internal/capture/streamsink.go index 3272abf1..dfb40152 100644 --- a/internal/capture/stream.go +++ b/internal/capture/streamsink.go @@ -13,9 +13,9 @@ import ( "demodesk/neko/internal/types/codec" ) -var moveListenerMu = sync.Mutex{} +var moveSinkListenerMu = sync.Mutex{} -type StreamManagerCtx struct { +type StreamSinkManagerCtx struct { logger zerolog.Logger mu sync.Mutex wg sync.WaitGroup @@ -33,13 +33,13 @@ type StreamManagerCtx struct { listenersMu sync.Mutex } -func streamNew(codec codec.RTPCodec, pipelineStr func() string, video_id string) *StreamManagerCtx { +func streamSinkNew(codec codec.RTPCodec, pipelineStr func() string, video_id string) *StreamSinkManagerCtx { logger := log.With(). Str("module", "capture"). - Str("submodule", "stream"). + Str("submodule", "stream-sink"). Str("video_id", video_id).Logger() - manager := &StreamManagerCtx{ + manager := &StreamSinkManagerCtx{ logger: logger, codec: codec, pipelineStr: pipelineStr, @@ -74,7 +74,7 @@ func streamNew(codec codec.RTPCodec, pipelineStr func() string, video_id string) return manager } -func (manager *StreamManagerCtx) shutdown() { +func (manager *StreamSinkManagerCtx) shutdown() { manager.logger.Info().Msgf("shutdown") manager.listenersMu.Lock() @@ -89,11 +89,11 @@ func (manager *StreamManagerCtx) shutdown() { manager.wg.Wait() } -func (manager *StreamManagerCtx) Codec() codec.RTPCodec { +func (manager *StreamSinkManagerCtx) Codec() codec.RTPCodec { return manager.codec } -func (manager *StreamManagerCtx) start() error { +func (manager *StreamSinkManagerCtx) start() error { if len(manager.listeners) == 0 { err := manager.createPipeline() if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) { @@ -106,14 +106,14 @@ func (manager *StreamManagerCtx) start() error { return nil } -func (manager *StreamManagerCtx) stop() { +func (manager *StreamSinkManagerCtx) stop() { if len(manager.listeners) == 0 { manager.destroyPipeline() manager.logger.Info().Msgf("last listener, stopping") } } -func (manager *StreamManagerCtx) addListener(listener *func(sample types.Sample)) { +func (manager *StreamSinkManagerCtx) addListener(listener *func(sample types.Sample)) { ptr := reflect.ValueOf(listener).Pointer() manager.listenersMu.Lock() @@ -123,7 +123,7 @@ func (manager *StreamManagerCtx) addListener(listener *func(sample types.Sample) manager.logger.Debug().Interface("ptr", ptr).Msgf("adding listener") } -func (manager *StreamManagerCtx) removeListener(listener *func(sample types.Sample)) { +func (manager *StreamSinkManagerCtx) removeListener(listener *func(sample types.Sample)) { ptr := reflect.ValueOf(listener).Pointer() manager.listenersMu.Lock() @@ -133,7 +133,7 @@ func (manager *StreamManagerCtx) removeListener(listener *func(sample types.Samp manager.logger.Debug().Interface("ptr", ptr).Msgf("removing listener") } -func (manager *StreamManagerCtx) AddListener(listener *func(sample types.Sample)) error { +func (manager *StreamSinkManagerCtx) AddListener(listener *func(sample types.Sample)) error { manager.mu.Lock() defer manager.mu.Unlock() @@ -152,7 +152,7 @@ func (manager *StreamManagerCtx) AddListener(listener *func(sample types.Sample) return nil } -func (manager *StreamManagerCtx) RemoveListener(listener *func(sample types.Sample)) error { +func (manager *StreamSinkManagerCtx) RemoveListener(listener *func(sample types.Sample)) error { manager.mu.Lock() defer manager.mu.Unlock() @@ -171,12 +171,12 @@ func (manager *StreamManagerCtx) RemoveListener(listener *func(sample types.Samp // moving listeners between streams ensures, that target pipeline is running // before listener is added, and stops source pipeline if there are 0 listeners -func (manager *StreamManagerCtx) MoveListenerTo(listener *func(sample types.Sample), stream types.StreamManager) error { +func (manager *StreamSinkManagerCtx) MoveListenerTo(listener *func(sample types.Sample), stream types.StreamSinkManager) error { if listener == nil { return errors.New("listener cannot be nil") } - targetStream, ok := stream.(*StreamManagerCtx) + targetStream, ok := stream.(*StreamSinkManagerCtx) if !ok { return errors.New("target stream manager does not support moving listeners") } @@ -186,7 +186,7 @@ func (manager *StreamManagerCtx) MoveListenerTo(listener *func(sample types.Samp // global mutex, that ensures atomic locking // lock global mutex - moveListenerMu.Lock() + moveSinkListenerMu.Lock() // lock source stream manager.mu.Lock() @@ -197,7 +197,7 @@ func (manager *StreamManagerCtx) MoveListenerTo(listener *func(sample types.Samp defer targetStream.mu.Unlock() // unlock global mutex - moveListenerMu.Unlock() + moveSinkListenerMu.Unlock() // start if stopped if err := targetStream.start(); err != nil { @@ -214,18 +214,18 @@ func (manager *StreamManagerCtx) MoveListenerTo(listener *func(sample types.Samp return nil } -func (manager *StreamManagerCtx) ListenersCount() int { +func (manager *StreamSinkManagerCtx) ListenersCount() int { manager.listenersMu.Lock() defer manager.listenersMu.Unlock() return len(manager.listeners) } -func (manager *StreamManagerCtx) Started() bool { +func (manager *StreamSinkManagerCtx) Started() bool { return manager.ListenersCount() > 0 } -func (manager *StreamManagerCtx) createPipeline() error { +func (manager *StreamSinkManagerCtx) createPipeline() error { manager.pipelineMu.Lock() defer manager.pipelineMu.Unlock() @@ -235,10 +235,9 @@ func (manager *StreamManagerCtx) createPipeline() error { var err error - codec := manager.Codec() pipelineStr := manager.pipelineStr() manager.logger.Info(). - Str("codec", codec.Name). + Str("codec", manager.codec.Name). Str("src", pipelineStr). Msgf("creating pipeline") @@ -255,7 +254,7 @@ func (manager *StreamManagerCtx) createPipeline() error { return nil } -func (manager *StreamManagerCtx) destroyPipeline() { +func (manager *StreamSinkManagerCtx) destroyPipeline() { manager.pipelineMu.Lock() defer manager.pipelineMu.Unlock() diff --git a/internal/types/capture.go b/internal/types/capture.go index ed00b999..3fcc63fe 100644 --- a/internal/types/capture.go +++ b/internal/types/capture.go @@ -32,12 +32,12 @@ type ScreencastManager interface { Image() ([]byte, error) } -type StreamManager interface { +type StreamSinkManager interface { Codec() codec.RTPCodec AddListener(listener *func(sample Sample)) error RemoveListener(listener *func(sample Sample)) error - MoveListenerTo(listener *func(sample Sample), targetStream StreamManager) error + MoveListenerTo(listener *func(sample Sample), targetStream StreamSinkManager) error ListenersCount() int Started() bool @@ -49,8 +49,8 @@ type CaptureManager interface { Broadcast() BroadcastManager Screencast() ScreencastManager - Audio() StreamManager - Video(videoID string) (StreamManager, bool) + Audio() StreamSinkManager + Video(videoID string) (StreamSinkManager, bool) VideoIDs() []string } diff --git a/internal/webrtc/peerstreamtrack.go b/internal/webrtc/peerstreamtrack.go index 2e3251e6..e204243a 100644 --- a/internal/webrtc/peerstreamtrack.go +++ b/internal/webrtc/peerstreamtrack.go @@ -11,7 +11,7 @@ import ( "github.com/rs/zerolog" ) -func (manager *WebRTCManagerCtx) newPeerStreamTrack(stream types.StreamManager, logger zerolog.Logger) (*PeerStreamTrack, error) { +func (manager *WebRTCManagerCtx) newPeerStreamTrack(stream types.StreamSinkManager, logger zerolog.Logger) (*PeerStreamTrack, error) { codec := stream.Codec() id := codec.Type.String() @@ -42,11 +42,11 @@ type PeerStreamTrack struct { track *webrtc.TrackLocalStaticSample listener func(sample types.Sample) - stream types.StreamManager + stream types.StreamSinkManager streamMu sync.Mutex } -func (peer *PeerStreamTrack) SetStream(stream types.StreamManager) error { +func (peer *PeerStreamTrack) SetStream(stream types.StreamSinkManager) error { peer.streamMu.Lock() defer peer.streamMu.Unlock()