diff --git a/internal/capture/buckets.go b/internal/capture/buckets.go new file mode 100644 index 00000000..c35986aa --- /dev/null +++ b/internal/capture/buckets.go @@ -0,0 +1,86 @@ +package capture + +import ( + "errors" + + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + + "github.com/demodesk/neko/pkg/types" + "github.com/demodesk/neko/pkg/types/codec" +) + +type BucketsManagerCtx struct { + logger zerolog.Logger + codec codec.RTPCodec + streams map[string]*StreamSinkManagerCtx + streamIDs []string +} + +func bucketsNew(codec codec.RTPCodec, streams map[string]*StreamSinkManagerCtx, streamIDs []string) *BucketsManagerCtx { + logger := log.With(). + Str("module", "capture"). + Str("submodule", "buckets"). + Logger() + + return &BucketsManagerCtx{ + logger: logger, + codec: codec, + streams: streams, + streamIDs: streamIDs, + } +} + +func (m *BucketsManagerCtx) shutdown() { + m.logger.Info().Msgf("shutdown") +} + +func (m *BucketsManagerCtx) destroyAll() { + for _, video := range m.streams { + if video.Started() { + video.destroyPipeline() + } + } +} + +func (m *BucketsManagerCtx) recreateAll() error { + for _, video := range m.streams { + if video.Started() { + err := video.createPipeline() + if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) { + return err + } + } + } + + return nil +} + +func (m *BucketsManagerCtx) IDs() []string { + return m.streamIDs +} + +func (m *BucketsManagerCtx) Codec() codec.RTPCodec { + return m.codec +} + +func (m *BucketsManagerCtx) SetReceiver(receiver types.Receiver) error { + receiver.OnVideoIdChange(func(videoID string) error { + videoStream, ok := m.streams[videoID] + if !ok { + return types.ErrWebRTCVideoNotFound + } + + return receiver.SetStream(videoStream) + }) + + // TODO: Save receiver. + return nil +} + +func (m *BucketsManagerCtx) RemoveReceiver(receiver types.Receiver) error { + // TODO: Unsubribe from OnVideoIdChange. + // TODO: Remove receiver. + receiver.RemoveStream() + return nil +} diff --git a/internal/capture/manager.go b/internal/capture/manager.go index eaee2423..c15b22fc 100644 --- a/internal/capture/manager.go +++ b/internal/capture/manager.go @@ -21,8 +21,7 @@ type CaptureManagerCtx struct { broadcast *BroacastManagerCtx screencast *ScreencastManagerCtx audio *StreamSinkManagerCtx - videos map[string]*StreamSinkManagerCtx - videoIDs []string + video *BucketsManagerCtx // sources webcam *StreamSrcManagerCtx @@ -118,6 +117,7 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt "! appsink name=appsink", config.Display, config.ScreencastRate, config.ScreencastQuality, ) }()), + audio: streamSinkNew(config.AudioCodec, func() (string, error) { if config.AudioPipeline != "" { // replace {device} with valid device @@ -133,8 +133,7 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt "! appsink name=appsink", config.AudioDevice, config.AudioCodec.Pipeline, ), nil }, "audio"), - videos: videos, - videoIDs: config.VideoIDs, + video: bucketsNew(config.VideoCodec, videos, config.VideoIDs), // sources webcam: streamSrcNew(config.WebcamEnabled, map[string]string{ @@ -195,11 +194,7 @@ func (manager *CaptureManagerCtx) Start() { } manager.desktop.OnBeforeScreenSizeChange(func() { - for _, video := range manager.videos { - if video.Started() { - video.destroyPipeline() - } - } + manager.video.destroyAll() if manager.broadcast.Started() { manager.broadcast.destroyPipeline() @@ -211,13 +206,9 @@ func (manager *CaptureManagerCtx) Start() { }) manager.desktop.OnAfterScreenSizeChange(func() { - for _, video := range manager.videos { - if video.Started() { - err := video.createPipeline() - if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) { - manager.logger.Panic().Err(err).Msg("unable to recreate video pipeline") - } - } + err := manager.video.recreateAll() + if err != nil { + manager.logger.Panic().Err(err).Msg("unable to recreate video pipelines") } if manager.broadcast.Started() { @@ -243,10 +234,7 @@ func (manager *CaptureManagerCtx) Shutdown() error { manager.screencast.shutdown() manager.audio.shutdown() - - for _, video := range manager.videos { - video.shutdown() - } + manager.video.shutdown() manager.webcam.shutdown() manager.microphone.shutdown() @@ -266,13 +254,8 @@ func (manager *CaptureManagerCtx) Audio() types.StreamSinkManager { return manager.audio } -func (manager *CaptureManagerCtx) Video(videoID string) (types.StreamSinkManager, bool) { - video, ok := manager.videos[videoID] - return video, ok -} - -func (manager *CaptureManagerCtx) VideoIDs() []string { - return manager.videoIDs +func (manager *CaptureManagerCtx) Video() types.BucketsManager { + return manager.video } func (manager *CaptureManagerCtx) Webcam() types.StreamSrcManager { diff --git a/internal/webrtc/manager.go b/internal/webrtc/manager.go index f7dab19e..48f68a92 100644 --- a/internal/webrtc/manager.go +++ b/internal/webrtc/manager.go @@ -8,6 +8,7 @@ import ( "time" "github.com/pion/ice/v2" + "github.com/pion/interceptor" "github.com/pion/rtcp" "github.com/pion/webrtc/v3" "github.com/rs/zerolog" @@ -35,11 +36,37 @@ const keepAliveInterval = 2 * time.Second const rtcpPLIInterval = 3 * time.Second func New(desktop types.DesktopManager, capture types.CaptureManager, config *config.WebRTC) *WebRTCManagerCtx { + configuration := webrtc.Configuration{ + SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback, + } + + if !config.ICELite { + ICEServers := []webrtc.ICEServer{} + for _, server := range config.ICEServers { + var credential any + if server.Credential != "" { + credential = server.Credential + } else { + credential = false + } + + ICEServers = append(ICEServers, webrtc.ICEServer{ + URLs: server.URLs, + Username: server.Username, + Credential: credential, + }) + } + + configuration.ICEServers = ICEServers + } + return &WebRTCManagerCtx{ logger: log.With().Str("module", "webrtc").Logger(), config: config, metrics: newMetrics(), + webrtcConfiguration: configuration, + desktop: desktop, capture: capture, curImage: cursor.NewImage(desktop), @@ -58,6 +85,8 @@ type WebRTCManagerCtx struct { curImage *cursor.ImageCtx curPosition *cursor.PositionCtx + webrtcConfiguration webrtc.Configuration + tcpMux ice.TCPMux udpMux ice.UDPMux @@ -121,6 +150,70 @@ func (manager *WebRTCManagerCtx) ICEServers() []types.ICEServer { return manager.config.ICEServers } +func (manager *WebRTCManagerCtx) newPeerConnection(codecs []codec.RTPCodec, logger zerolog.Logger) (*webrtc.PeerConnection, error) { + // create media engine + engine := &webrtc.MediaEngine{} + for _, codec := range codecs { + if err := codec.Register(engine); err != nil { + return nil, err + } + } + + // create setting engine + settings := webrtc.SettingEngine{ + LoggerFactory: pionlog.New(logger), + } + + settings.SetICETimeouts(disconnectedTimeout, failedTimeout, keepAliveInterval) + settings.SetNAT1To1IPs(manager.config.NAT1To1IPs, webrtc.ICECandidateTypeHost) + settings.SetLite(manager.config.ICELite) + + var networkType []webrtc.NetworkType + + // udp candidates + if manager.udpMux != nil { + settings.SetICEUDPMux(manager.udpMux) + networkType = append(networkType, + webrtc.NetworkTypeUDP4, + webrtc.NetworkTypeUDP6, + ) + } else if manager.config.EphemeralMax != 0 { + _ = settings.SetEphemeralUDPPortRange(manager.config.EphemeralMin, manager.config.EphemeralMax) + networkType = append(networkType, + webrtc.NetworkTypeUDP4, + webrtc.NetworkTypeUDP6, + ) + } + + // tcp candidates + if manager.tcpMux != nil { + settings.SetICETCPMux(manager.tcpMux) + networkType = append(networkType, + webrtc.NetworkTypeTCP4, + webrtc.NetworkTypeTCP6, + ) + } + + // enable support for TCP and UDP ICE candidates + settings.SetNetworkTypes(networkType) + + // create interceptor registry + registry := &interceptor.Registry{} + if err := webrtc.RegisterDefaultInterceptors(engine, registry); err != nil { + return nil, err + } + + // create new API + api := webrtc.NewAPI( + webrtc.WithMediaEngine(engine), + webrtc.WithSettingEngine(settings), + webrtc.WithInterceptorRegistry(registry), + ) + + // create new peer connection + return api.NewPeerConnection(manager.webrtcConfiguration) +} + func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, videoID string) (*webrtc.SessionDescription, error) { id := atomic.AddInt32(&manager.peerId, 1) manager.metrics.NewConnection(session) @@ -130,19 +223,16 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, videoID strin logger.Info().Msg("creating webrtc peer") // all audios must have the same codec - audioStream := manager.capture.Audio() + audio := manager.capture.Audio() + audioCodec := audio.Codec() // all videos must have the same codec - videoStream, ok := manager.capture.Video(videoID) - if !ok { - return nil, types.ErrWebRTCVideoNotFound - } - - manager.metrics.SetVideoID(session, videoID) + video := manager.capture.Video() + videoCodec := video.Codec() connection, err := manager.newPeerConnection([]codec.RTPCodec{ - audioStream.Codec(), - videoStream.Codec(), + audioCodec, + videoCodec, }, logger) if err != nil { return nil, err @@ -166,26 +256,37 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, videoID strin // audio track - audioTrack, err := manager.newPeerStreamTrack(audioStream, logger) + audioTrack, err := NewTrack(logger, audioCodec, connection) if err != nil { return nil, err } - if err := audioTrack.AddToConnection(connection); err != nil { + // set stream for audio track + err = audioTrack.SetStream(audio) + if err != nil { return nil, err } // video track - videoTrack, err := manager.newPeerStreamTrack(videoStream, logger) + videoTrack, err := NewTrack(logger, videoCodec, connection) if err != nil { return nil, err } - if err := videoTrack.AddToConnection(connection); err != nil { + // let video stream bucket manager handle stream subscriptions + err = video.SetReceiver(videoTrack) + if err != nil { return nil, err } + // set default video id + err = videoTrack.SetVideoID(videoID) + if err != nil { + return nil, err + } + manager.metrics.SetVideoID(session, videoID) + // data channel dataChannel, err := connection.CreateDataChannel("data", nil) @@ -198,13 +299,12 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, videoID strin connection: connection, dataChannel: dataChannel, changeVideo: func(videoID string) error { - videoStream, ok := manager.capture.Video(videoID) - if !ok { - return types.ErrWebRTCVideoNotFound + if err := videoTrack.SetVideoID(videoID); err != nil { + return err } manager.metrics.SetVideoID(session, videoID) - return videoTrack.SetStream(videoStream) + return nil }, setPaused: func(isPaused bool) { videoTrack.SetPaused(isPaused) @@ -318,7 +418,7 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, videoID strin connection.Close() case webrtc.PeerConnectionStateClosed: session.SetWebRTCConnected(peer, false) - videoTrack.RemoveStream() + video.RemoveReceiver(videoTrack) audioTrack.RemoveStream() } diff --git a/internal/webrtc/peer.go b/internal/webrtc/peer.go index 0c38cd27..e9be20f1 100644 --- a/internal/webrtc/peer.go +++ b/internal/webrtc/peer.go @@ -1,11 +1,14 @@ package webrtc import ( + "bytes" + "encoding/binary" "sync" "github.com/pion/webrtc/v3" "github.com/rs/zerolog" + "github.com/demodesk/neko/internal/webrtc/payload" "github.com/demodesk/neko/pkg/types" ) @@ -136,6 +139,63 @@ func (peer *WebRTCPeerCtx) SetPaused(isPaused bool) error { return nil } +func (peer *WebRTCPeerCtx) SendCursorPosition(x, y int) error { + peer.mu.Lock() + defer peer.mu.Unlock() + + if peer.dataChannel == nil { + return types.ErrWebRTCDataChannelNotFound + } + + data := payload.CursorPosition{ + Header: payload.Header{ + Event: payload.OP_CURSOR_POSITION, + Length: 7, + }, + X: uint16(x), + Y: uint16(y), + } + + buffer := &bytes.Buffer{} + if err := binary.Write(buffer, binary.BigEndian, data); err != nil { + return err + } + + return peer.dataChannel.Send(buffer.Bytes()) +} + +func (peer *WebRTCPeerCtx) SendCursorImage(cur *types.CursorImage, img []byte) error { + peer.mu.Lock() + defer peer.mu.Unlock() + + if peer.dataChannel == nil { + return types.ErrWebRTCDataChannelNotFound + } + + data := payload.CursorImage{ + Header: payload.Header{ + Event: payload.OP_CURSOR_IMAGE, + Length: uint16(11 + len(img)), + }, + Width: cur.Width, + Height: cur.Height, + Xhot: cur.Xhot, + Yhot: cur.Yhot, + } + + buffer := &bytes.Buffer{} + + if err := binary.Write(buffer, binary.BigEndian, data); err != nil { + return err + } + + if err := binary.Write(buffer, binary.BigEndian, img); err != nil { + return err + } + + return peer.dataChannel.Send(buffer.Bytes()) +} + func (peer *WebRTCPeerCtx) Destroy() { peer.mu.Lock() defer peer.mu.Unlock() diff --git a/internal/webrtc/peerconnection.go b/internal/webrtc/peerconnection.go deleted file mode 100644 index c2d6ab15..00000000 --- a/internal/webrtc/peerconnection.go +++ /dev/null @@ -1,104 +0,0 @@ -package webrtc - -import ( - "github.com/pion/interceptor" - "github.com/pion/webrtc/v3" - "github.com/rs/zerolog" - - "github.com/demodesk/neko/internal/webrtc/pionlog" - "github.com/demodesk/neko/pkg/types/codec" -) - -func (manager *WebRTCManagerCtx) newPeerConnection(codecs []codec.RTPCodec, logger zerolog.Logger) (*webrtc.PeerConnection, error) { - // create media engine - engine := &webrtc.MediaEngine{} - for _, codec := range codecs { - if err := codec.Register(engine); err != nil { - return nil, err - } - } - - // create setting engine - settings := webrtc.SettingEngine{ - LoggerFactory: pionlog.New(logger), - } - - settings.SetICETimeouts(disconnectedTimeout, failedTimeout, keepAliveInterval) - settings.SetNAT1To1IPs(manager.config.NAT1To1IPs, webrtc.ICECandidateTypeHost) - settings.SetLite(manager.config.ICELite) - - var networkType []webrtc.NetworkType - - // udp candidates - if manager.udpMux != nil { - settings.SetICEUDPMux(manager.udpMux) - networkType = append(networkType, - webrtc.NetworkTypeUDP4, - webrtc.NetworkTypeUDP6, - ) - } else if manager.config.EphemeralMax != 0 { - _ = settings.SetEphemeralUDPPortRange(manager.config.EphemeralMin, manager.config.EphemeralMax) - networkType = append(networkType, - webrtc.NetworkTypeUDP4, - webrtc.NetworkTypeUDP6, - ) - } - - // tcp candidates - if manager.tcpMux != nil { - settings.SetICETCPMux(manager.tcpMux) - networkType = append(networkType, - webrtc.NetworkTypeTCP4, - webrtc.NetworkTypeTCP6, - ) - } - - // enable support for TCP and UDP ICE candidates - settings.SetNetworkTypes(networkType) - - // create interceptor registry - registry := &interceptor.Registry{} - if err := webrtc.RegisterDefaultInterceptors(engine, registry); err != nil { - return nil, err - } - - // create new API - api := webrtc.NewAPI( - webrtc.WithMediaEngine(engine), - webrtc.WithSettingEngine(settings), - webrtc.WithInterceptorRegistry(registry), - ) - - // create new peer connection - configuration := manager.peerConfiguration() - return api.NewPeerConnection(configuration) -} - -func (manager *WebRTCManagerCtx) peerConfiguration() webrtc.Configuration { - if manager.config.ICELite { - return webrtc.Configuration{ - SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback, - } - } - - ICEServers := []webrtc.ICEServer{} - for _, server := range manager.config.ICEServers { - var credential any - if server.Credential != "" { - credential = server.Credential - } else { - credential = false - } - - ICEServers = append(ICEServers, webrtc.ICEServer{ - URLs: server.URLs, - Username: server.Username, - Credential: credential, - }) - } - - return webrtc.Configuration{ - ICEServers: ICEServers, - SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback, - } -} diff --git a/internal/webrtc/peerstreamtrack.go b/internal/webrtc/peerstreamtrack.go deleted file mode 100644 index 423b21e0..00000000 --- a/internal/webrtc/peerstreamtrack.go +++ /dev/null @@ -1,137 +0,0 @@ -package webrtc - -import ( - "errors" - "io" - "sync" - - "github.com/pion/rtcp" - "github.com/pion/webrtc/v3" - "github.com/pion/webrtc/v3/pkg/media" - "github.com/rs/zerolog" - - "github.com/demodesk/neko/pkg/types" -) - -func (manager *WebRTCManagerCtx) newPeerStreamTrack(stream types.StreamSinkManager, logger zerolog.Logger) (*PeerStreamTrack, error) { - codec := stream.Codec() - - id := codec.Type.String() - track, err := webrtc.NewTrackLocalStaticSample(codec.Capability, id, "stream") - if err != nil { - return nil, err - } - - logger = logger.With().Str("id", id).Logger() - - peer := &PeerStreamTrack{ - logger: logger, - track: track, - } - - peer.listener = func(sample types.Sample) { - if peer.paused { - return - } - - err := track.WriteSample(media.Sample(sample)) - if err != nil && errors.Is(err, io.ErrClosedPipe) { - logger.Warn().Err(err).Msg("pipeline failed to write") - } - } - - err = peer.SetStream(stream) - return peer, err -} - -type PeerStreamTrack struct { - logger zerolog.Logger - track *webrtc.TrackLocalStaticSample - paused bool - listener func(sample types.Sample) - - stream types.StreamSinkManager - streamMu sync.Mutex - - onRtcp func(rtcp.Packet) - onRtcpMu sync.RWMutex -} - -func (peer *PeerStreamTrack) SetStream(stream types.StreamSinkManager) error { - peer.streamMu.Lock() - defer peer.streamMu.Unlock() - - var err error - if peer.stream != nil { - err = peer.stream.MoveListenerTo(&peer.listener, stream) - } else { - err = stream.AddListener(&peer.listener) - } - - if err == nil { - peer.stream = stream - } - - return err -} - -func (peer *PeerStreamTrack) RemoveStream() { - peer.streamMu.Lock() - defer peer.streamMu.Unlock() - - if peer.stream != nil { - _ = peer.stream.RemoveListener(&peer.listener) - peer.stream = nil - } -} - -func (peer *PeerStreamTrack) AddToConnection(connection *webrtc.PeerConnection) error { - sender, err := connection.AddTrack(peer.track) - if err != nil { - return err - } - - go func() { - rtcpBuf := make([]byte, 1500) - for { - n, _, err := sender.Read(rtcpBuf) - if err != nil { - if err == io.EOF || err == io.ErrClosedPipe { - return - } - - peer.logger.Err(err).Msg("RTCP read error") - continue - } - - packets, err := rtcp.Unmarshal(rtcpBuf[:n]) - if err != nil { - peer.logger.Err(err).Msg("RTCP unmarshal error") - continue - } - - peer.onRtcpMu.RLock() - handler := peer.onRtcp - peer.onRtcpMu.RUnlock() - - for _, packet := range packets { - if handler != nil { - go handler(packet) - } - } - } - }() - - return nil -} - -func (peer *PeerStreamTrack) SetPaused(paused bool) { - peer.paused = paused -} - -func (peer *PeerStreamTrack) OnRTCP(f func(rtcp.Packet)) { - peer.onRtcpMu.Lock() - defer peer.onRtcpMu.Unlock() - - peer.onRtcp = f -} diff --git a/internal/webrtc/send.go b/internal/webrtc/send.go deleted file mode 100644 index c2d0b992..00000000 --- a/internal/webrtc/send.go +++ /dev/null @@ -1,66 +0,0 @@ -package webrtc - -import ( - "bytes" - "encoding/binary" - - "github.com/demodesk/neko/internal/webrtc/payload" - "github.com/demodesk/neko/pkg/types" -) - -func (peer *WebRTCPeerCtx) SendCursorPosition(x, y int) error { - peer.mu.Lock() - defer peer.mu.Unlock() - - if peer.dataChannel == nil { - return types.ErrWebRTCDataChannelNotFound - } - - data := payload.CursorPosition{ - Header: payload.Header{ - Event: payload.OP_CURSOR_POSITION, - Length: 7, - }, - X: uint16(x), - Y: uint16(y), - } - - buffer := &bytes.Buffer{} - if err := binary.Write(buffer, binary.BigEndian, data); err != nil { - return err - } - - return peer.dataChannel.Send(buffer.Bytes()) -} - -func (peer *WebRTCPeerCtx) SendCursorImage(cur *types.CursorImage, img []byte) error { - peer.mu.Lock() - defer peer.mu.Unlock() - - if peer.dataChannel == nil { - return types.ErrWebRTCDataChannelNotFound - } - - data := payload.CursorImage{ - Header: payload.Header{ - Event: payload.OP_CURSOR_IMAGE, - Length: uint16(11 + len(img)), - }, - Width: cur.Width, - Height: cur.Height, - Xhot: cur.Xhot, - Yhot: cur.Yhot, - } - - buffer := &bytes.Buffer{} - - if err := binary.Write(buffer, binary.BigEndian, data); err != nil { - return err - } - - if err := binary.Write(buffer, binary.BigEndian, img); err != nil { - return err - } - - return peer.dataChannel.Send(buffer.Bytes()) -} diff --git a/internal/webrtc/track.go b/internal/webrtc/track.go new file mode 100644 index 00000000..6f32282f --- /dev/null +++ b/internal/webrtc/track.go @@ -0,0 +1,148 @@ +package webrtc + +import ( + "errors" + "fmt" + "io" + "sync" + + "github.com/pion/rtcp" + "github.com/pion/webrtc/v3" + "github.com/pion/webrtc/v3/pkg/media" + "github.com/rs/zerolog" + + "github.com/demodesk/neko/pkg/types" + "github.com/demodesk/neko/pkg/types/codec" +) + +type Track struct { + logger zerolog.Logger + track *webrtc.TrackLocalStaticSample + paused bool + listener func(sample types.Sample) + + stream types.StreamSinkManager + streamMu sync.Mutex + + onRtcp func(rtcp.Packet) + onRtcpMu sync.RWMutex + + videoIdChange func(string) error +} + +func NewTrack(logger zerolog.Logger, codec codec.RTPCodec, connection *webrtc.PeerConnection) (*Track, error) { + id := codec.Type.String() + track, err := webrtc.NewTrackLocalStaticSample(codec.Capability, id, "stream") + if err != nil { + return nil, err + } + + logger = logger.With().Str("id", id).Logger() + + t := &Track{ + logger: logger, + track: track, + } + + t.listener = func(sample types.Sample) { + if t.paused { + return + } + + err := track.WriteSample(media.Sample(sample)) + if err != nil && errors.Is(err, io.ErrClosedPipe) { + logger.Warn().Err(err).Msg("pipeline failed to write") + } + } + + sender, err := connection.AddTrack(t.track) + if err != nil { + return nil, err + } + + go t.rtcpReader(sender) + + return t, nil +} + +func (t *Track) rtcpReader(sender *webrtc.RTPSender) { + rtcpBuf := make([]byte, 1500) + for { + n, _, err := sender.Read(rtcpBuf) + if err != nil { + if err == io.EOF || err == io.ErrClosedPipe { + return + } + + t.logger.Err(err).Msg("RTCP read error") + continue + } + + packets, err := rtcp.Unmarshal(rtcpBuf[:n]) + if err != nil { + t.logger.Err(err).Msg("RTCP unmarshal error") + continue + } + + t.onRtcpMu.RLock() + handler := t.onRtcp + t.onRtcpMu.RUnlock() + + for _, packet := range packets { + if handler != nil { + go handler(packet) + } + } + } +} + +func (t *Track) SetStream(stream types.StreamSinkManager) error { + t.streamMu.Lock() + defer t.streamMu.Unlock() + + var err error + if t.stream != nil { + err = t.stream.MoveListenerTo(&t.listener, stream) + } else { + err = stream.AddListener(&t.listener) + } + + if err == nil { + t.stream = stream + } + + return err +} + +func (t *Track) RemoveStream() { + t.streamMu.Lock() + defer t.streamMu.Unlock() + + if t.stream != nil { + _ = t.stream.RemoveListener(&t.listener) + t.stream = nil + } +} + +func (t *Track) SetPaused(paused bool) { + t.paused = paused +} + +func (t *Track) OnRTCP(f func(rtcp.Packet)) { + t.onRtcpMu.Lock() + defer t.onRtcpMu.Unlock() + + t.onRtcp = f +} + +func (t *Track) SetVideoID(videoID string) error { + if t.videoIdChange == nil { + return fmt.Errorf("video id change not supported") + } + + return t.videoIdChange(videoID) +} + +func (t *Track) OnVideoIdChange(f func(string) error) { + t.videoIdChange = f +} diff --git a/internal/websocket/handler/signal.go b/internal/websocket/handler/signal.go index d994ffd8..5fa65597 100644 --- a/internal/websocket/handler/signal.go +++ b/internal/websocket/handler/signal.go @@ -15,7 +15,7 @@ func (h *MessageHandlerCtx) signalRequest(session types.Session, payload *messag // use default first video, if not provided if payload.Video == "" { - videos := h.capture.VideoIDs() + videos := h.capture.Video().IDs() payload.Video = videos[0] } diff --git a/internal/websocket/handler/system.go b/internal/websocket/handler/system.go index 446c4330..d914d3cf 100644 --- a/internal/websocket/handler/system.go +++ b/internal/websocket/handler/system.go @@ -49,7 +49,7 @@ func (h *MessageHandlerCtx) systemInit(session types.Session) error { Settings: h.sessions.Settings(), ScreencastEnabled: h.capture.Screencast().Enabled(), WebRTC: message.SystemWebRTC{ - Videos: h.capture.VideoIDs(), + Videos: h.capture.Video().IDs(), }, }) diff --git a/pkg/types/capture.go b/pkg/types/capture.go index 5c7a5ab4..857132f2 100644 --- a/pkg/types/capture.go +++ b/pkg/types/capture.go @@ -19,6 +19,19 @@ var ( type Sample media.Sample +type Receiver interface { + SetStream(stream StreamSinkManager) error + RemoveStream() + OnVideoIdChange(f func(string) error) +} + +type BucketsManager interface { + IDs() []string + Codec() codec.RTPCodec + SetReceiver(receiver Receiver) error + RemoveReceiver(receiver Receiver) error +} + type BroadcastManager interface { Start(url string) error Stop() @@ -60,8 +73,7 @@ type CaptureManager interface { Broadcast() BroadcastManager Screencast() ScreencastManager Audio() StreamSinkManager - Video(videoID string) (StreamSinkManager, bool) - VideoIDs() []string + Video() BucketsManager Webcam() StreamSrcManager Microphone() StreamSrcManager