From 7902d7b1f11c58301f6161f9871b59d432a015da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Thu, 4 Feb 2021 20:39:48 +0000 Subject: [PATCH] Implement Audio & Video using custom StreamManager. --- internal/capture/manager.go | 181 +++++++----------------------------- internal/capture/stream.go | 126 +++++++++++++++++++++++++ internal/types/capture.go | 19 ++-- internal/webrtc/manager.go | 79 ++++++++-------- internal/webrtc/peer.go | 20 +++- 5 files changed, 229 insertions(+), 196 deletions(-) create mode 100644 internal/capture/stream.go diff --git a/internal/capture/manager.go b/internal/capture/manager.go index 61734d22..2f2b660a 100644 --- a/internal/capture/manager.go +++ b/internal/capture/manager.go @@ -3,45 +3,36 @@ package capture import ( "sync" - "github.com/kataras/go-events" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "demodesk/neko/internal/types" - "demodesk/neko/internal/types/codec" "demodesk/neko/internal/config" - "demodesk/neko/internal/capture/gst" ) type CaptureManagerCtx struct { - logger zerolog.Logger - mu sync.Mutex - video *gst.Pipeline - audio *gst.Pipeline - config *config.Capture - emit_update chan bool - emit_stop chan bool - video_sample chan types.Sample - audio_sample chan types.Sample - emmiter events.EventEmmiter - streaming bool - desktop types.DesktopManager - broadcast *BroacastManagerCtx - screencast *ScreencastManagerCtx + logger zerolog.Logger + mu sync.Mutex + config *config.Capture + desktop types.DesktopManager + streaming bool + broadcast *BroacastManagerCtx + screencast *ScreencastManagerCtx + audio *StreamManagerCtx + video *StreamManagerCtx } func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCtx { return &CaptureManagerCtx{ - logger: log.With().Str("module", "capture").Logger(), - mu: sync.Mutex{}, - emit_update: make(chan bool), - emit_stop: make(chan bool), - emmiter: events.New(), - config: config, - streaming: false, - desktop: desktop, - broadcast: broadcastNew(config), - screencast: screencastNew(config), + logger: log.With().Str("module", "capture").Logger(), + mu: sync.Mutex{}, + config: config, + desktop: desktop, + streaming: false, + broadcast: broadcastNew(config), + screencast: screencastNew(config), + audio: streamNew(config.AudioCodec, config.Device, config.AudioParams), + video: streamNew(config.VideoCodec, config.Display, config.VideoParams), } } @@ -53,8 +44,8 @@ func (manager *CaptureManagerCtx) Start() { } manager.desktop.OnBeforeScreenSizeChange(func() { - if manager.Streaming() { - manager.destroyVideoPipeline() + if manager.video.Enabled() { + manager.video.destroyPipeline() } if manager.broadcast.Enabled() { @@ -67,8 +58,10 @@ func (manager *CaptureManagerCtx) Start() { }) manager.desktop.OnAfterScreenSizeChange(func() { - if manager.Streaming() { - manager.createVideoPipeline() + if manager.video.Enabled() { + if err := manager.video.createPipeline(); err != nil { + manager.logger.Panic().Err(err).Msg("unable to recreate video pipeline") + } } if manager.broadcast.Enabled() { @@ -83,38 +76,16 @@ func (manager *CaptureManagerCtx) Start() { } } }) - - go func() { - manager.logger.Debug().Msg("started emitting samples") - - for { - select { - case <-manager.emit_stop: - manager.logger.Debug().Msg("stopped emitting samples") - return - case <-manager.emit_update: - manager.logger.Debug().Msg("update emitting samples") - case sample := <-manager.video_sample: - manager.emmiter.Emit("video", sample) - case sample := <-manager.audio_sample: - manager.emmiter.Emit("audio", sample) - } - } - }() } func (manager *CaptureManagerCtx) Shutdown() error { manager.logger.Info().Msgf("capture shutting down") - if manager.Streaming() { - manager.StopStream() - } - manager.broadcast.destroyPipeline() manager.screencast.destroyPipeline() - - manager.emit_stop <- true manager.screencast.shutdown <- true + manager.audio.Shutdown() + manager.video.Shutdown() return nil } @@ -126,34 +97,22 @@ func (manager *CaptureManagerCtx) Screencast() types.ScreencastManager { return manager.screencast } -func (manager *CaptureManagerCtx) VideoCodec() codec.RTPCodec { - return manager.config.VideoCodec +func (manager *CaptureManagerCtx) Audio() types.StreamManager { + return manager.audio } -func (manager *CaptureManagerCtx) AudioCodec() codec.RTPCodec { - return manager.config.AudioCodec -} - -func (manager *CaptureManagerCtx) OnVideoFrame(listener func(sample types.Sample)) { - manager.emmiter.On("video", func(payload ...interface{}) { - listener(payload[0].(types.Sample)) - }) -} - -func (manager *CaptureManagerCtx) OnAudioFrame(listener func(sample types.Sample)) { - manager.emmiter.On("audio", func(payload ...interface{}) { - listener(payload[0].(types.Sample)) - }) +func (manager *CaptureManagerCtx) Video() types.StreamManager { + return manager.video } func (manager *CaptureManagerCtx) StartStream() { manager.mu.Lock() defer manager.mu.Unlock() - manager.logger.Info().Msgf("starting pipelines") + manager.logger.Info().Msgf("starting stream pipelines") - manager.createVideoPipeline() - manager.createAudioPipeline() + manager.Video().Start() + manager.Audio().Start() manager.streaming = true } @@ -161,10 +120,10 @@ func (manager *CaptureManagerCtx) StopStream() { manager.mu.Lock() defer manager.mu.Unlock() - manager.logger.Info().Msgf("stopping pipelines") + manager.logger.Info().Msgf("stopping stream pipelines") - manager.destroyVideoPipeline() - manager.destroyAudioPipeline() + manager.Video().Stop() + manager.Audio().Stop() manager.streaming = false } @@ -174,71 +133,3 @@ func (manager *CaptureManagerCtx) Streaming() bool { return manager.streaming } - -func (manager *CaptureManagerCtx) createVideoPipeline() { - var err error - - manager.logger.Info(). - Str("video_codec", manager.config.VideoCodec.Name). - Str("video_display", manager.config.Display). - Str("video_params", manager.config.VideoParams). - Msgf("creating video pipeline") - - manager.video, err = gst.CreateAppPipeline( - manager.config.VideoCodec, - manager.config.Display, - manager.config.VideoParams, - ) - - if err != nil { - manager.logger.Panic().Err(err).Msg("unable to create video pipeline") - } - - manager.logger.Info(). - Str("src", manager.video.Src). - Msgf("starting video pipeline") - - manager.video.Start() - - manager.video_sample = manager.video.Sample - manager.emit_update <-true -} - -func (manager *CaptureManagerCtx) destroyVideoPipeline() { - manager.logger.Info().Msgf("stopping video pipeline") - manager.video.Stop() -} - -func (manager *CaptureManagerCtx) createAudioPipeline() { - var err error - - manager.logger.Info(). - Str("audio_codec", manager.config.AudioCodec.Name). - Str("audio_display", manager.config.Device). - Str("audio_params", manager.config.AudioParams). - Msgf("creating audio pipeline") - - manager.audio, err = gst.CreateAppPipeline( - manager.config.AudioCodec, - manager.config.Device, - manager.config.AudioParams, - ) - - if err != nil { - manager.logger.Panic().Err(err).Msg("unable to create audio pipeline") - } - - manager.logger.Info(). - Str("src", manager.audio.Src). - Msgf("starting audio pipeline") - - manager.audio.Start() - - manager.audio_sample = manager.audio.Sample - manager.emit_update <-true -} - -func (manager *CaptureManagerCtx) destroyAudioPipeline() { - manager.logger.Info().Msgf("stopping audio pipeline") - manager.audio.Stop() -} diff --git a/internal/capture/stream.go b/internal/capture/stream.go new file mode 100644 index 00000000..e5494a2d --- /dev/null +++ b/internal/capture/stream.go @@ -0,0 +1,126 @@ +package capture + +import ( + "sync" + + "github.com/kataras/go-events" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + + "demodesk/neko/internal/types" + "demodesk/neko/internal/types/codec" + "demodesk/neko/internal/capture/gst" +) + +type StreamManagerCtx struct { + logger zerolog.Logger + mu sync.Mutex + codec codec.RTPCodec + pipelineDevice string + pipelineSrc string + pipeline *gst.Pipeline + sample chan types.Sample + emmiter events.EventEmmiter + emitUpdate chan bool + emitStop chan bool + enabled bool +} + +func streamNew(codec codec.RTPCodec, pipelineDevice string, pipelineSrc string) *StreamManagerCtx { + manager := &StreamManagerCtx{ + logger: log.With().Str("module", "capture").Str("submodule", "stream").Logger(), + mu: sync.Mutex{}, + codec: codec, + pipelineDevice: pipelineDevice, + pipelineSrc: pipelineSrc, + emmiter: events.New(), + emitUpdate: make(chan bool), + emitStop: make(chan bool), + enabled: false, + } + + go func() { + manager.logger.Debug().Msg("started emitting samples") + + for { + select { + case <-manager.emitStop: + manager.logger.Debug().Msg("stopped emitting samples") + return + case <-manager.emitUpdate: + manager.logger.Debug().Msg("update emitting samples") + case sample := <-manager.sample: + manager.emmiter.Emit("sample", sample) + } + } + }() + + return manager +} + +func (manager *StreamManagerCtx) Shutdown() { + manager.logger.Info().Msgf("shutting down") + + manager.destroyPipeline() + manager.emitStop <- true +} + +func (manager *StreamManagerCtx) Codec() codec.RTPCodec { + return manager.codec +} + +func (manager *StreamManagerCtx) OnSample(listener func(sample types.Sample)) { + manager.emmiter.On("sample", func(payload ...interface{}) { + listener(payload[0].(types.Sample)) + }) +} + +func (manager *StreamManagerCtx) Start() { + manager.enabled = true + manager.createPipeline() +} + +func (manager *StreamManagerCtx) Stop() { + manager.enabled = false + manager.destroyPipeline() +} + +func (manager *StreamManagerCtx) Enabled() bool { + return manager.enabled +} + +func (manager *StreamManagerCtx) createPipeline() error { + var err error + + codec := manager.Codec() + manager.logger.Info(). + Str("codec", codec.Name). + Str("device", manager.pipelineDevice). + Str("src", manager.pipelineSrc). + Msgf("creating pipeline") + + manager.pipeline, err = gst.CreateAppPipeline(codec, manager.pipelineDevice, manager.pipelineSrc) + if err != nil { + return err + } + + manager.logger.Info(). + Str("src", manager.pipeline.Src). + Msgf("starting pipeline") + + manager.pipeline.Start() + + manager.sample = manager.pipeline.Sample + manager.emitUpdate <-true + return nil +} + +func (manager *StreamManagerCtx) destroyPipeline() { + if manager.pipeline == nil { + return + } + + manager.pipeline.Stop() + manager.logger.Info().Msgf("stopping pipeline") + manager.pipeline = nil +} diff --git a/internal/types/capture.go b/internal/types/capture.go index 030a8ea9..bc61813d 100644 --- a/internal/types/capture.go +++ b/internal/types/capture.go @@ -21,18 +21,25 @@ type ScreencastManager interface { Image() ([]byte, error) } +type StreamManager interface { + Shutdown() + + Codec() codec.RTPCodec + OnSample(listener func(sample Sample)) + + Start() + Stop() + Enabled() bool +} + type CaptureManager interface { Start() Shutdown() error Broadcast() BroadcastManager Screencast() ScreencastManager - - VideoCodec() codec.RTPCodec - AudioCodec() codec.RTPCodec - - OnVideoFrame(listener func(sample Sample)) - OnAudioFrame(listener func(sample Sample)) + Audio() StreamManager + Video() StreamManager StartStream() StopStream() diff --git a/internal/webrtc/manager.go b/internal/webrtc/manager.go index a7e836d8..f8e0f0a2 100644 --- a/internal/webrtc/manager.go +++ b/internal/webrtc/manager.go @@ -19,48 +19,48 @@ import ( func New(desktop types.DesktopManager, capture types.CaptureManager, config *config.WebRTC) *WebRTCManagerCtx { return &WebRTCManagerCtx{ - logger: log.With().Str("module", "webrtc").Logger(), - desktop: desktop, - capture: capture, - config: config, + logger: log.With().Str("module", "webrtc").Logger(), + videoCodec: capture.Video().Codec(), + audioCodec: capture.Audio().Codec(), + desktop: desktop, + capture: capture, + config: config, } } type WebRTCManagerCtx struct { - logger zerolog.Logger - videoTrack *webrtc.TrackLocalStaticSample - audioTrack *webrtc.TrackLocalStaticSample - videoCodec codec.RTPCodec - audioCodec codec.RTPCodec - desktop types.DesktopManager - capture types.CaptureManager - config *config.WebRTC + logger zerolog.Logger + videoTrack *webrtc.TrackLocalStaticSample + audioTrack *webrtc.TrackLocalStaticSample + videoCodec codec.RTPCodec + audioCodec codec.RTPCodec + desktop types.DesktopManager + capture types.CaptureManager + config *config.WebRTC } func (manager *WebRTCManagerCtx) Start() { var err error // create audio track - manager.audioCodec = manager.capture.AudioCodec() manager.audioTrack, err = webrtc.NewTrackLocalStaticSample(manager.audioCodec.Capability, "audio", "stream") if err != nil { manager.logger.Panic().Err(err).Msg("unable to create audio track") } - manager.capture.OnAudioFrame(func(sample types.Sample) { + manager.capture.Audio().OnSample(func(sample types.Sample) { if err := manager.audioTrack.WriteSample(media.Sample(sample)); err != nil && err != io.ErrClosedPipe { manager.logger.Warn().Err(err).Msg("audio pipeline failed to write") } }) // create video track - manager.videoCodec = manager.capture.VideoCodec() manager.videoTrack, err = webrtc.NewTrackLocalStaticSample(manager.videoCodec.Capability, "video", "stream") if err != nil { manager.logger.Panic().Err(err).Msg("unable to create video track") } - manager.capture.OnVideoFrame(func(sample types.Sample) { + manager.capture.Video().OnSample(func(sample types.Sample) { if err := manager.videoTrack.WriteSample(media.Sample(sample)); err != nil && err != io.ErrClosedPipe { manager.logger.Warn().Err(err).Msg("video pipeline failed to write") } @@ -91,6 +91,7 @@ func (manager *WebRTCManagerCtx) ICEServers() []string { func (manager *WebRTCManagerCtx) CreatePeer(session types.Session) (*webrtc.SessionDescription, error) { logger := manager.logger.With().Str("id", session.ID()).Logger() + // Create MediaEngine engine, err := manager.mediaEngine() if err != nil { return nil, err @@ -129,7 +130,21 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session) (*webrtc.Sess }) } - if err := manager.registerTracks(connection); err != nil { + audioTransceiver, err := connection.AddTransceiverFromTrack(manager.audioTrack, webrtc.RtpTransceiverInit{ + Direction: webrtc.RTPTransceiverDirectionSendonly, + }) + if err != nil { + return nil, err + } + + videoTransceiver, err := connection.AddTransceiverFromTrack(manager.videoTrack, webrtc.RtpTransceiverInit{ + Direction: webrtc.RTPTransceiverDirectionSendonly, + }) + if err != nil { + return nil, err + } + + if _, err := connection.CreateDataChannel("data", nil); err != nil { return nil, err } @@ -179,18 +194,19 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session) (*webrtc.Sess }) session.SetWebRTCPeer(&WebRTCPeerCtx{ - api: api, - engine: engine, - settings: settings, - connection: connection, - configuration: configuration, + api: api, + engine: engine, + settings: settings, + connection: connection, + configuration: configuration, + audioTransceiver: audioTransceiver, + videoTransceiver: videoTransceiver, }) return connection.LocalDescription(), nil } func (manager *WebRTCManagerCtx) mediaEngine() (*webrtc.MediaEngine, error) { - // Create MediaEngine engine := &webrtc.MediaEngine{} if err := manager.videoCodec.Register(engine); err != nil { @@ -235,20 +251,3 @@ func (manager *WebRTCManagerCtx) apiConfiguration() *webrtc.Configuration { SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback, } } - -func (manager *WebRTCManagerCtx) registerTracks(connection *webrtc.PeerConnection) error { - if _, err := connection.AddTransceiverFromTrack(manager.videoTrack, webrtc.RtpTransceiverInit{ - Direction: webrtc.RTPTransceiverDirectionSendonly, - }); err != nil { - return err - } - - if _, err := connection.AddTransceiverFromTrack(manager.audioTrack, webrtc.RtpTransceiverInit{ - Direction: webrtc.RTPTransceiverDirectionSendonly, - }); err != nil { - return err - } - - _, err := connection.CreateDataChannel("data", nil) - return err -} diff --git a/internal/webrtc/peer.go b/internal/webrtc/peer.go index e6e8e05f..f4240cac 100644 --- a/internal/webrtc/peer.go +++ b/internal/webrtc/peer.go @@ -3,11 +3,13 @@ package webrtc import "github.com/pion/webrtc/v3" type WebRTCPeerCtx struct { - api *webrtc.API - engine *webrtc.MediaEngine - settings *webrtc.SettingEngine - connection *webrtc.PeerConnection - configuration *webrtc.Configuration + api *webrtc.API + engine *webrtc.MediaEngine + settings *webrtc.SettingEngine + connection *webrtc.PeerConnection + configuration *webrtc.Configuration + audioTransceiver *webrtc.RTPTransceiver + videoTransceiver *webrtc.RTPTransceiver } func (webrtc_peer *WebRTCPeerCtx) SignalAnswer(sdp string) error { @@ -21,6 +23,14 @@ func (webrtc_peer *WebRTCPeerCtx) SignalCandidate(candidate webrtc.ICECandidateI return webrtc_peer.connection.AddICECandidate(candidate) } +func (webrtc_peer *WebRTCPeerCtx) ReplaceAudioTrack(track webrtc.TrackLocal) error { + return webrtc_peer.audioTransceiver.Sender().ReplaceTrack(track) +} + +func (webrtc_peer *WebRTCPeerCtx) ReplaceVideoTrack(track webrtc.TrackLocal) error { + return webrtc_peer.videoTransceiver.Sender().ReplaceTrack(track) +} + func (webrtc_peer *WebRTCPeerCtx) Destroy() error { if webrtc_peer.connection == nil || webrtc_peer.connection.ConnectionState() != webrtc.PeerConnectionStateConnected { return nil