From e3e9d1606d388d1f23ca8fc19e76f3909935439b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Mon, 26 Jun 2023 21:27:14 +0200 Subject: [PATCH] Refactor signaling for video and audio (#51) * add audio and signal request. * disable audio by default. * fix SignalProvide. * disable estimator when track disabled. --- internal/webrtc/manager.go | 7 +- internal/webrtc/peer.go | 229 +++++++++++++++++--------- internal/webrtc/track.go | 1 + internal/websocket/handler/handler.go | 7 +- internal/websocket/handler/signal.go | 61 ++++--- pkg/types/capture.go | 6 +- pkg/types/event/events.go | 1 + pkg/types/message/messages.go | 17 +- pkg/types/webrtc.go | 30 +++- 9 files changed, 253 insertions(+), 106 deletions(-) diff --git a/internal/webrtc/manager.go b/internal/webrtc/manager.go index e98f0b8e..5412c9e7 100644 --- a/internal/webrtc/manager.go +++ b/internal/webrtc/manager.go @@ -312,6 +312,9 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session) (*webrtc.Sess return nil, nil, err } + // we disable audio by default manually + audioTrack.SetPaused(true) + // set stream for audio track _, err = audioTrack.SetStream(audio) if err != nil { @@ -355,7 +358,8 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session) (*webrtc.Sess CollapseValues: true, }), // stream selectors - videoSelector: manager.capture.Video(), + video: video, + audio: audio, // tracks & channels audioTrack: audioTrack, videoTrack: videoTrack, @@ -364,6 +368,7 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session) (*webrtc.Sess // config iceTrickle: manager.config.ICETrickle, estimatorConfig: manager.config.Estimator, + audioDisabled: true, // we disable audio by default manually } connection.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { diff --git a/internal/webrtc/peer.go b/internal/webrtc/peer.go index 5d0ae2a9..6abd4952 100644 --- a/internal/webrtc/peer.go +++ b/internal/webrtc/peer.go @@ -15,7 +15,6 @@ import 
( "github.com/demodesk/neko/internal/webrtc/payload" "github.com/demodesk/neko/pkg/types" "github.com/demodesk/neko/pkg/types/event" - "github.com/demodesk/neko/pkg/types/message" "github.com/demodesk/neko/pkg/utils" ) @@ -29,7 +28,8 @@ type WebRTCPeerCtx struct { estimator cc.BandwidthEstimator estimateTrend *utils.TrendDetector // stream selectors - videoSelector types.StreamSelectorManager + video types.StreamSelectorManager + audio types.StreamSinkManager // tracks & channels audioTrack *Track videoTrack *Track @@ -38,7 +38,10 @@ type WebRTCPeerCtx struct { // config iceTrickle bool estimatorConfig config.WebRTCEstimator + paused bool videoAuto bool + videoDisabled bool + audioDisabled bool } // @@ -158,8 +161,8 @@ func (peer *WebRTCPeerCtx) estimatorReader() { break } - // if estimation is disabled, do nothing - if !peer.videoAuto || conf.Passive { + // if estimation or video is disabled, do nothing + if !peer.videoAuto || peer.videoDisabled || peer.paused || conf.Passive { continue } @@ -236,9 +239,11 @@ func (peer *WebRTCPeerCtx) estimatorReader() { continue } - err := peer.SetVideo(types.StreamSelector{ - ID: streamId, - Type: types.StreamSelectorTypeLower, + err := peer.SetVideo(types.PeerVideoRequest{ + Selector: &types.StreamSelector{ + ID: streamId, + Type: types.StreamSelectorTypeLower, + }, }) if err != nil && err != types.ErrWebRTCStreamNotFound { peer.logger.Warn().Err(err).Msg("failed to downgrade video stream") @@ -287,9 +292,11 @@ func (peer *WebRTCPeerCtx) estimatorReader() { continue } - err := peer.SetVideo(types.StreamSelector{ - ID: streamId, - Type: types.StreamSelectorTypeHigher, + err := peer.SetVideo(types.PeerVideoRequest{ + Selector: &types.StreamSelector{ + ID: streamId, + Type: types.StreamSelectorTypeHigher, + }, }) if err != nil && err != types.ErrWebRTCStreamNotFound { peer.logger.Warn().Err(err).Msg("failed to upgrade video stream") @@ -304,65 +311,16 @@ func (peer *WebRTCPeerCtx) estimatorReader() { } } -// -// video -// - -func 
(peer *WebRTCPeerCtx) SetVideo(selector types.StreamSelector) error { - peer.mu.Lock() - defer peer.mu.Unlock() - - // get requested video stream from selector - stream, ok := peer.videoSelector.GetStream(selector) - if !ok { - return types.ErrWebRTCStreamNotFound - } - - // set video stream to track - changed, err := peer.videoTrack.SetStream(stream) - if err != nil { - return err - } - - // if video stream was already set, do nothing - if !changed { - return nil - } - - videoID := stream.ID() - peer.metrics.SetVideoID(videoID) - - peer.logger.Info().Str("video_id", videoID).Msg("set video") - - go peer.session.Send( - event.SIGNAL_VIDEO, - message.SignalVideo{ - Video: videoID, - Auto: peer.videoAuto, - }) - - return nil -} - -func (peer *WebRTCPeerCtx) VideoID() (string, bool) { - peer.mu.Lock() - defer peer.mu.Unlock() - - stream, ok := peer.videoTrack.Stream() - if !ok { - return "", false - } - - return stream.ID(), true -} - func (peer *WebRTCPeerCtx) SetPaused(isPaused bool) error { peer.mu.Lock() defer peer.mu.Unlock() + peer.videoTrack.SetPaused(isPaused || peer.videoDisabled) + peer.audioTrack.SetPaused(isPaused || peer.audioDisabled) + peer.logger.Info().Bool("is_paused", isPaused).Msg("set paused") - peer.videoTrack.SetPaused(isPaused) - peer.audioTrack.SetPaused(isPaused) + peer.paused = isPaused + return nil } @@ -370,28 +328,149 @@ func (peer *WebRTCPeerCtx) Paused() bool { peer.mu.Lock() defer peer.mu.Unlock() - return peer.videoTrack.Paused() || peer.audioTrack.Paused() + return peer.paused } -func (peer *WebRTCPeerCtx) SetVideoAuto(videoAuto bool) { +// +// video +// + +func (peer *WebRTCPeerCtx) SetVideo(r types.PeerVideoRequest) error { peer.mu.Lock() defer peer.mu.Unlock() - // if estimator is enabled and is not passive, enable video auto bitrate - if peer.estimator != nil && !peer.estimatorConfig.Passive { - peer.logger.Info().Bool("video_auto", videoAuto).Msg("set video auto") - peer.videoAuto = videoAuto - } else { - 
peer.logger.Warn().Msg("estimator is disabled or in passive mode, cannot change video auto") - peer.videoAuto = false // ensure video auto is disabled + modified := false + + // video disabled + if r.Disabled != nil { + disabled := *r.Disabled + + // update only if changed + if peer.videoDisabled != disabled { + peer.videoDisabled = disabled + peer.videoTrack.SetPaused(disabled || peer.paused) + + peer.logger.Info().Bool("disabled", disabled).Msg("set video disabled") + modified = true + } + } + + // video selector + if r.Selector != nil { + selector := *r.Selector + + // get requested video stream from selector + stream, ok := peer.video.GetStream(selector) + if !ok { + return types.ErrWebRTCStreamNotFound + } + + // set video stream to track + changed, err := peer.videoTrack.SetStream(stream) + if err != nil { + return err + } + + // update only if stream changed + if changed { + videoID := stream.ID() + peer.metrics.SetVideoID(videoID) + + peer.logger.Info().Str("video_id", videoID).Msg("set video") + modified = true + } + } + + // video auto + if r.Auto != nil { + videoAuto := *r.Auto + + if peer.estimator == nil || peer.estimatorConfig.Passive { + peer.logger.Warn().Msg("estimator is disabled or in passive mode, cannot change video auto") + videoAuto = false // ensure video auto is disabled + } + + // update only if video auto changed + if peer.videoAuto != videoAuto { + peer.videoAuto = videoAuto + + peer.logger.Info().Bool("video_auto", videoAuto).Msg("set video auto") + modified = true + } + } + + // send video signal if modified + if modified { + go func() { + // in goroutine because of mutex and we don't want to block + peer.session.Send(event.SIGNAL_VIDEO, peer.Video()) + }() + } + + return nil +} + +func (peer *WebRTCPeerCtx) Video() types.PeerVideo { + peer.mu.Lock() + defer peer.mu.Unlock() + + // get current video stream ID + ID := "" + stream, ok := peer.videoTrack.Stream() + if ok { + ID = stream.ID() + } + + return types.PeerVideo{ + Disabled: 
peer.videoDisabled, + ID: ID, + Video: ID, // TODO: Remove, used for backward compatibility + Auto: peer.videoAuto, } } -func (peer *WebRTCPeerCtx) VideoAuto() bool { +// +// audio +// + +func (peer *WebRTCPeerCtx) SetAudio(r types.PeerAudioRequest) error { peer.mu.Lock() defer peer.mu.Unlock() - return peer.videoAuto + modified := false + + // audio disabled + if r.Disabled != nil { + disabled := *r.Disabled + + // update only if changed + if peer.audioDisabled != disabled { + peer.audioDisabled = disabled + peer.audioTrack.SetPaused(disabled || peer.paused) + + peer.logger.Info().Bool("disabled", disabled).Msg("set audio disabled") + modified = true + } + } + + // send audio signal if modified + if modified { + go func() { + // in goroutine because of mutex and we don't want to block + peer.session.Send(event.SIGNAL_AUDIO, peer.Audio()) + }() + } + + return nil +} + +func (peer *WebRTCPeerCtx) Audio() types.PeerAudio { + peer.mu.Lock() + defer peer.mu.Unlock() + + return types.PeerAudio{ + Disabled: peer.audioDisabled, + } } // diff --git a/internal/webrtc/track.go b/internal/webrtc/track.go index f18122e2..907d6ad5 100644 --- a/internal/webrtc/track.go +++ b/internal/webrtc/track.go @@ -177,6 +177,7 @@ func (t *Track) SetPaused(paused bool) { // if there is no state change or no stream, do nothing if t.paused == paused || t.stream == nil { + t.paused = paused return } diff --git a/internal/websocket/handler/handler.go b/internal/websocket/handler/handler.go index c128c064..d42863a9 100644 --- a/internal/websocket/handler/handler.go +++ b/internal/websocket/handler/handler.go @@ -45,7 +45,7 @@ func (h *MessageHandlerCtx) Message(session types.Session, data types.WebSocketM // Signal Events case event.SIGNAL_REQUEST: - payload := &message.SignalVideo{} + payload := &message.SignalRequest{} err = utils.Unmarshal(payload, data.Payload, func() error { return h.signalRequest(session, payload) }) @@ -71,6 +71,11 @@ func (h *MessageHandlerCtx) Message(session 
types.Session, data types.WebSocketM err = utils.Unmarshal(payload, data.Payload, func() error { return h.signalVideo(session, payload) }) + case event.SIGNAL_AUDIO: + payload := &message.SignalAudio{} + err = utils.Unmarshal(payload, data.Payload, func() error { + return h.signalAudio(session, payload) + }) // Control Events case event.CONTROL_RELEASE: diff --git a/internal/websocket/handler/signal.go b/internal/websocket/handler/signal.go index 0fe6918a..e0e5fb88 100644 --- a/internal/websocket/handler/signal.go +++ b/internal/websocket/handler/signal.go @@ -9,17 +9,11 @@ import ( "github.com/pion/webrtc/v3" ) -func (h *MessageHandlerCtx) signalRequest(session types.Session, payload *message.SignalVideo) error { +func (h *MessageHandlerCtx) signalRequest(session types.Session, payload *message.SignalRequest) error { if !session.Profile().CanWatch { return errors.New("not allowed to watch") } - // use default first video, if not provided - if payload.Video == "" { - videos := h.capture.Video().IDs() - payload.Video = videos[0] - } - offer, peer, err := h.webrtc.CreatePeer(session) if err != nil { return err @@ -30,14 +24,38 @@ func (h *MessageHandlerCtx) signalRequest(session types.Session, payload *messag peer.SetPaused(true) } - // set video auto state - peer.SetVideoAuto(payload.Auto) + video := payload.Video + + // use default first video, if not provided + if video.Selector == nil { + videos := h.capture.Video().IDs() + video.Selector = &types.StreamSelector{ + ID: videos[0], + Type: types.StreamSelectorTypeExact, + } + } + + // TODO: Remove, used for compatibility with old clients. 
+ if video.Auto == nil { + video.Auto = &payload.Auto + } // set video stream - err = peer.SetVideo(types.StreamSelector{ - ID: payload.Video, - Type: types.StreamSelectorTypeNearest, - }) + err = peer.SetVideo(video) + if err != nil { + return err + } + + audio := payload.Audio + + // enable by default if not requested otherwise + if audio.Disabled == nil { + disabled := false + audio.Disabled = &disabled + } + + // set audio stream + err = peer.SetAudio(audio) if err != nil { return err } @@ -47,6 +65,9 @@ func (h *MessageHandlerCtx) signalRequest(session types.Session, payload *messag message.SignalProvide{ SDP: offer.SDP, ICEServers: h.webrtc.ICEServers(), + + Video: peer.Video(), + Audio: peer.Audio(), }) return nil @@ -128,14 +149,14 @@ func (h *MessageHandlerCtx) signalVideo(session types.Session, payload *message. return errors.New("webRTC peer does not exist") } - peer.SetVideoAuto(payload.Auto) + return peer.SetVideo(payload.PeerVideoRequest) +} - if payload.Video != "" { - return peer.SetVideo(types.StreamSelector{ - ID: payload.Video, - Type: types.StreamSelectorTypeNearest, - }) +func (h *MessageHandlerCtx) signalAudio(session types.Session, payload *message.SignalAudio) error { + peer := session.GetWebRTCPeer() + if peer == nil { + return errors.New("webRTC peer does not exist") } - return nil + return peer.SetAudio(payload.PeerAudioRequest) } diff --git a/pkg/types/capture.go b/pkg/types/capture.go index 140cde44..4fbc68da 100644 --- a/pkg/types/capture.go +++ b/pkg/types/capture.go @@ -94,11 +94,11 @@ func (s StreamSelectorType) MarshalText() ([]byte, error) { type StreamSelector struct { // type of stream selector - Type StreamSelectorType + Type StreamSelectorType `json:"type"` // select stream by its ID - ID string + ID string `json:"id"` // select stream by its bitrate - Bitrate uint64 + Bitrate uint64 `json:"bitrate"` } type StreamSelectorManager interface { diff --git a/pkg/types/event/events.go b/pkg/types/event/events.go index 
5a3f37dc..55e34f41 100644 --- a/pkg/types/event/events.go +++ b/pkg/types/event/events.go @@ -17,6 +17,7 @@ const ( SIGNAL_PROVIDE = "signal/provide" SIGNAL_CANDIDATE = "signal/candidate" SIGNAL_VIDEO = "signal/video" + SIGNAL_AUDIO = "signal/audio" SIGNAL_CLOSE = "signal/close" ) diff --git a/pkg/types/message/messages.go b/pkg/types/message/messages.go index 3b536b1e..1bded7b4 100644 --- a/pkg/types/message/messages.go +++ b/pkg/types/message/messages.go @@ -45,9 +45,19 @@ type SystemDisconnect struct { // Signal ///////////////////////////// +type SignalRequest struct { + Video types.PeerVideoRequest `json:"video"` + Audio types.PeerAudioRequest `json:"audio"` + + Auto bool `json:"auto"` // TODO: Remove this +} + type SignalProvide struct { SDP string `json:"sdp"` ICEServers []types.ICEServer `json:"iceservers"` + + Video types.PeerVideo `json:"video"` + Audio types.PeerAudio `json:"audio"` } type SignalCandidate struct { @@ -59,8 +69,11 @@ type SignalDescription struct { } type SignalVideo struct { - Video string `json:"video"` - Auto bool `json:"auto"` + types.PeerVideoRequest +} + +type SignalAudio struct { + types.PeerAudioRequest } ///////////////////////////// diff --git a/pkg/types/webrtc.go b/pkg/types/webrtc.go index 0b6f026c..5469a3b1 100644 --- a/pkg/types/webrtc.go +++ b/pkg/types/webrtc.go @@ -18,18 +18,40 @@ type ICEServer struct { Credential string `mapstructure:"credential" json:"credential,omitempty"` } +type PeerVideo struct { + Disabled bool `json:"disabled"` + ID string `json:"id"` + Video string `json:"video"` // TODO: Remove this, used for compatibility with old clients. 
+ Auto bool `json:"auto"` +} + +type PeerVideoRequest struct { + Disabled *bool `json:"disabled,omitempty"` + Selector *StreamSelector `json:"selector,omitempty"` + Auto *bool `json:"auto,omitempty"` +} + +type PeerAudio struct { + Disabled bool `json:"disabled"` +} + +type PeerAudioRequest struct { + Disabled *bool `json:"disabled,omitempty"` +} + type WebRTCPeer interface { CreateOffer(ICERestart bool) (*webrtc.SessionDescription, error) CreateAnswer() (*webrtc.SessionDescription, error) SetRemoteDescription(webrtc.SessionDescription) error SetCandidate(webrtc.ICECandidateInit) error - SetVideo(StreamSelector) error - VideoID() (string, bool) SetPaused(isPaused bool) error Paused() bool - SetVideoAuto(auto bool) - VideoAuto() bool + + SetVideo(PeerVideoRequest) error + Video() PeerVideo + SetAudio(PeerAudioRequest) error + Audio() PeerAudio SendCursorPosition(x, y int) error SendCursorImage(cur *CursorImage, img []byte) error