refactor single video track to an array.

This commit is contained in:
Miroslav Šedivý 2021-02-05 17:40:29 +01:00
parent ebd7e7c065
commit aa7a131da2
3 changed files with 76 additions and 47 deletions

View File

@ -19,7 +19,8 @@ type CaptureManagerCtx struct {
broadcast *BroacastManagerCtx broadcast *BroacastManagerCtx
screencast *ScreencastManagerCtx screencast *ScreencastManagerCtx
audio *StreamManagerCtx audio *StreamManagerCtx
video *StreamManagerCtx videos map[string]*StreamManagerCtx
videoIDs []string
} }
func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCtx { func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCtx {
@ -87,7 +88,10 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt
broadcast: broadcastNew(broadcastPipeline), broadcast: broadcastNew(broadcastPipeline),
screencast: screencastNew(config.Screencast, screencastPipeline), screencast: screencastNew(config.Screencast, screencastPipeline),
audio: streamNew(config.AudioCodec, audioPipeline), audio: streamNew(config.AudioCodec, audioPipeline),
video: streamNew(config.VideoCodec, videoPipeline), videos: map[string]*StreamManagerCtx{
"hq": streamNew(config.VideoCodec, videoPipeline),
},
videoIDs: []string{ "hq" },
} }
} }
@ -99,8 +103,10 @@ func (manager *CaptureManagerCtx) Start() {
} }
manager.desktop.OnBeforeScreenSizeChange(func() { manager.desktop.OnBeforeScreenSizeChange(func() {
if manager.video.Started() { for _, video := range manager.videos {
manager.video.destroyPipeline() if video.Started() {
video.destroyPipeline()
}
} }
if manager.broadcast.Started() { if manager.broadcast.Started() {
@ -113,11 +119,13 @@ func (manager *CaptureManagerCtx) Start() {
}) })
manager.desktop.OnAfterScreenSizeChange(func() { manager.desktop.OnAfterScreenSizeChange(func() {
if manager.video.Started() { for _, video := range manager.videos {
if err := manager.video.createPipeline(); err != nil { if video.Started() {
if err := video.createPipeline(); err != nil {
manager.logger.Panic().Err(err).Msg("unable to recreate video pipeline") manager.logger.Panic().Err(err).Msg("unable to recreate video pipeline")
} }
} }
}
if manager.broadcast.Started() { if manager.broadcast.Started() {
if err := manager.broadcast.createPipeline(); err != nil { if err := manager.broadcast.createPipeline(); err != nil {
@ -140,7 +148,11 @@ func (manager *CaptureManagerCtx) Shutdown() error {
manager.screencast.shutdown() manager.screencast.shutdown()
manager.audio.shutdown() manager.audio.shutdown()
manager.video.shutdown()
for _, video := range manager.videos {
video.shutdown()
}
return nil return nil
} }
@ -156,8 +168,12 @@ func (manager *CaptureManagerCtx) Audio() types.StreamManager {
return manager.audio return manager.audio
} }
func (manager *CaptureManagerCtx) Video() types.StreamManager { func (manager *CaptureManagerCtx) Video(videoID string) types.StreamManager {
return manager.video return manager.videos[videoID]
}
func (manager *CaptureManagerCtx) VideoIDs() []string {
return manager.videoIDs
} }
func (manager *CaptureManagerCtx) StartStream() { func (manager *CaptureManagerCtx) StartStream() {
@ -166,14 +182,13 @@ func (manager *CaptureManagerCtx) StartStream() {
manager.logger.Info().Msgf("starting stream pipelines") manager.logger.Info().Msgf("starting stream pipelines")
var err error for _, video := range manager.videos {
err = manager.Video().Start() if err := video.Start(); err != nil {
if err != nil {
manager.logger.Panic().Err(err).Msg("unable to start video pipeline") manager.logger.Panic().Err(err).Msg("unable to start video pipeline")
} }
}
err = manager.Audio().Start() if err := manager.audio.Start(); err != nil {
if err != nil {
manager.logger.Panic().Err(err).Msg("unable to start audio pipeline") manager.logger.Panic().Err(err).Msg("unable to start audio pipeline")
} }
@ -186,8 +201,11 @@ func (manager *CaptureManagerCtx) StopStream() {
manager.logger.Info().Msgf("stopping stream pipelines") manager.logger.Info().Msgf("stopping stream pipelines")
manager.Video().Stop() for _, video := range manager.videos {
manager.Audio().Stop() video.Stop()
}
manager.audio.Stop()
manager.streaming = false manager.streaming = false
} }

View File

@ -37,7 +37,8 @@ type CaptureManager interface {
Broadcast() BroadcastManager Broadcast() BroadcastManager
Screencast() ScreencastManager Screencast() ScreencastManager
Audio() StreamManager Audio() StreamManager
Video() StreamManager Video(videoID string) StreamManager
VideoIDs() []string
StartStream() StartStream()
StopStream() StopStream()

View File

@ -20,8 +20,7 @@ import (
func New(desktop types.DesktopManager, capture types.CaptureManager, config *config.WebRTC) *WebRTCManagerCtx { func New(desktop types.DesktopManager, capture types.CaptureManager, config *config.WebRTC) *WebRTCManagerCtx {
return &WebRTCManagerCtx{ return &WebRTCManagerCtx{
logger: log.With().Str("module", "webrtc").Logger(), logger: log.With().Str("module", "webrtc").Logger(),
videoCodec: capture.Video().Codec(), defaultVideoID: capture.VideoIDs()[0],
audioCodec: capture.Audio().Codec(),
desktop: desktop, desktop: desktop,
capture: capture, capture: capture,
config: config, config: config,
@ -30,9 +29,9 @@ func New(desktop types.DesktopManager, capture types.CaptureManager, config *con
type WebRTCManagerCtx struct { type WebRTCManagerCtx struct {
logger zerolog.Logger logger zerolog.Logger
videoTrack *webrtc.TrackLocalStaticSample videoTracks map[string]*webrtc.TrackLocalStaticSample
audioTrack *webrtc.TrackLocalStaticSample audioTrack *webrtc.TrackLocalStaticSample
videoCodec codec.RTPCodec defaultVideoID string
audioCodec codec.RTPCodec audioCodec codec.RTPCodec
desktop types.DesktopManager desktop types.DesktopManager
capture types.CaptureManager capture types.CaptureManager
@ -43,29 +42,37 @@ func (manager *WebRTCManagerCtx) Start() {
var err error var err error
// create audio track // create audio track
manager.audioTrack, err = webrtc.NewTrackLocalStaticSample(manager.audioCodec.Capability, "audio", "stream") audio := manager.capture.Audio()
manager.audioTrack, err = webrtc.NewTrackLocalStaticSample(audio.Codec().Capability, "audio", "stream")
if err != nil { if err != nil {
manager.logger.Panic().Err(err).Msg("unable to create audio track") manager.logger.Panic().Err(err).Msg("unable to create audio track")
} }
manager.capture.Audio().OnSample(func(sample types.Sample) { audio.OnSample(func(sample types.Sample) {
if err := manager.audioTrack.WriteSample(media.Sample(sample)); err != nil && err != io.ErrClosedPipe { if err := manager.audioTrack.WriteSample(media.Sample(sample)); err != nil && err != io.ErrClosedPipe {
manager.logger.Warn().Err(err).Msg("audio pipeline failed to write") manager.logger.Warn().Err(err).Msg("audio pipeline failed to write")
} }
}) })
// create video track videoIDs := manager.capture.VideoIDs()
manager.videoTrack, err = webrtc.NewTrackLocalStaticSample(manager.videoCodec.Capability, "video", "stream") manager.videoTracks = map[string]*webrtc.TrackLocalStaticSample{}
for _, videoID := range videoIDs {
video := manager.capture.Video(videoID)
track, err := webrtc.NewTrackLocalStaticSample(video.Codec().Capability, "video", "stream")
if err != nil { if err != nil {
manager.logger.Panic().Err(err).Msg("unable to create video track") manager.logger.Panic().Err(err).Msgf("unable to create video (%s) track", videoID)
} }
manager.capture.Video().OnSample(func(sample types.Sample) { video.OnSample(func(sample types.Sample) {
if err := manager.videoTrack.WriteSample(media.Sample(sample)); err != nil && err != io.ErrClosedPipe { if err := track.WriteSample(media.Sample(sample)); err != nil && err != io.ErrClosedPipe {
manager.logger.Warn().Err(err).Msg("video pipeline failed to write") manager.logger.Warn().Err(err).Msgf("video (%s) pipeline failed to write", videoID)
} }
}) })
manager.videoTracks[videoID] = track
}
manager.logger.Info(). manager.logger.Info().
Str("ice_lite", fmt.Sprintf("%t", manager.config.ICELite)). Str("ice_lite", fmt.Sprintf("%t", manager.config.ICELite)).
Str("ice_trickle", fmt.Sprintf("%t", manager.config.ICETrickle)). Str("ice_trickle", fmt.Sprintf("%t", manager.config.ICETrickle)).
@ -137,7 +144,7 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session) (*webrtc.Sess
return nil, err return nil, err
} }
videoTransceiver, err := connection.AddTransceiverFromTrack(manager.videoTrack, webrtc.RtpTransceiverInit{ videoTransceiver, err := connection.AddTransceiverFromTrack(manager.videoTracks[manager.defaultVideoID], webrtc.RtpTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionSendonly, Direction: webrtc.RTPTransceiverDirectionSendonly,
}) })
if err != nil { if err != nil {
@ -209,11 +216,14 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session) (*webrtc.Sess
func (manager *WebRTCManagerCtx) mediaEngine() (*webrtc.MediaEngine, error) { func (manager *WebRTCManagerCtx) mediaEngine() (*webrtc.MediaEngine, error) {
engine := &webrtc.MediaEngine{} engine := &webrtc.MediaEngine{}
if err := manager.videoCodec.Register(engine); err != nil { // all videos must have the same codec
videoCodec := manager.capture.Video(manager.defaultVideoID).Codec()
if err := videoCodec.Register(engine); err != nil {
return nil, err return nil, err
} }
if err := manager.audioCodec.Register(engine); err != nil { audioCodec := manager.capture.Audio().Codec()
if err := audioCodec.Register(engine); err != nil {
return nil, err return nil, err
} }