diff --git a/internal/webrtc/manager.go b/internal/webrtc/manager.go index 4cc5fc83..01e4f9d1 100644 --- a/internal/webrtc/manager.go +++ b/internal/webrtc/manager.go @@ -8,7 +8,6 @@ import ( "sync" "time" - "github.com/pion/interceptor" "github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3/pkg/media" "github.com/rs/zerolog" @@ -19,7 +18,6 @@ import ( "demodesk/neko/internal/types/event" "demodesk/neko/internal/types/message" "demodesk/neko/internal/webrtc/cursor" - "demodesk/neko/internal/webrtc/pionlog" ) // the duration without network activity before a Agent is considered disconnected. Default is 5 Seconds @@ -33,27 +31,31 @@ const keepAliveInterval = 2 * time.Second func New(desktop types.DesktopManager, capture types.CaptureManager, config *config.WebRTC) *WebRTCManagerCtx { return &WebRTCManagerCtx{ - logger: log.With().Str("module", "webrtc").Logger(), - desktop: desktop, - capture: capture, - config: config, + logger: log.With().Str("module", "webrtc").Logger(), + config: config, + + desktop: desktop, + capture: capture, + curImage: cursor.NewImage(desktop), + curPosition: cursor.NewPosition(desktop), + participants: 0, - curImage: cursor.NewImage(desktop), - curPosition: cursor.NewPosition(desktop), } } type WebRTCManagerCtx struct { - mu sync.Mutex - logger zerolog.Logger - audioTrack *webrtc.TrackLocalStaticSample - audioStop func() - desktop types.DesktopManager - capture types.CaptureManager - config *config.WebRTC - participants uint32 - curImage *cursor.ImageCtx - curPosition *cursor.PositionCtx + mu sync.Mutex + logger zerolog.Logger + config *config.WebRTC + + desktop types.DesktopManager + capture types.CaptureManager + curImage *cursor.ImageCtx + curPosition *cursor.PositionCtx + + audioTrack *webrtc.TrackLocalStaticSample + audioListener func(sample types.Sample) + participants uint32 } func (manager *WebRTCManagerCtx) Start() { @@ -66,7 +68,7 @@ func (manager *WebRTCManagerCtx) Start() { manager.logger.Panic().Err(err).Msg("unable to create audio track") } - audioListener := func(sample types.Sample) { + manager.audioListener = func(sample types.Sample) { if err := manager.audioTrack.WriteSample(media.Sample(sample)); err != nil { if errors.Is(err, io.ErrClosedPipe) { // The peerConnection has been closed. @@ -75,11 +77,7 @@ func (manager *WebRTCManagerCtx) Start() { manager.logger.Warn().Err(err).Msg("audio pipeline failed to write") } } - - audio.AddListener(&audioListener) - manager.audioStop = func() { - audio.RemoveListener(&audioListener) - } + audio.AddListener(&manager.audioListener) manager.curImage.Start() @@ -98,7 +96,9 @@ func (manager *WebRTCManagerCtx) Shutdown() error { manager.curImage.Shutdown() manager.curPosition.Shutdown() - manager.audioStop() + audio := manager.capture.Audio() + audio.RemoveListener(&manager.audioListener) + return nil } @@ -111,30 +111,18 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, videoID strin logger := manager.logger.With().Str("session_id", session.ID()).Logger() logger.Info().Msg("creating webrtc peer") - // Create MediaEngine - engine, err := manager.mediaEngine(videoID) + // all videos must have the same codec + videoStream, ok := manager.capture.Video(videoID) + if !ok { + return nil, types.ErrWebRTCVideoNotFound + } + + connection, err := manager.newPeerConnection(videoStream.Codec(), logger) if err != nil { return nil, err } - // Custom settings & configuration - settings := manager.apiSettings(logger) - configuration := manager.apiConfiguration() - - registry := &interceptor.Registry{} - if err := webrtc.RegisterDefaultInterceptors(engine, registry); err != nil { - return nil, err - } - - // Create NewAPI with MediaEngine and SettingEngine - api := webrtc.NewAPI(webrtc.WithMediaEngine(engine), webrtc.WithSettingEngine(settings), webrtc.WithInterceptorRegistry(registry)) - - connection, err := api.NewPeerConnection(*configuration) - if err != nil { - return nil, err - } - - // Asynchronously send local ICE Candidates + // asynchronously send local ICE Candidates if manager.config.ICETrickle { connection.OnICECandidate(func(candidate *webrtc.ICECandidate) { if candidate == nil { @@ -151,11 +139,6 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, videoID strin } // create video track - videoStream, ok := manager.capture.Video(videoID) - if !ok { - return nil, types.ErrWebRTCVideoNotFound - } - videoTrack, err := webrtc.NewTrackLocalStaticSample(videoStream.Codec().Capability, "video", "stream") if err != nil { return nil, err @@ -345,69 +328,3 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, videoID strin session.SetWebRTCPeer(peer) return peer.CreateOffer(false) } - -func (manager *WebRTCManagerCtx) mediaEngine(videoID string) (*webrtc.MediaEngine, error) { - engine := &webrtc.MediaEngine{} - - // all videos must have the same codec - video, ok := manager.capture.Video(videoID) - if !ok { - return nil, types.ErrWebRTCVideoNotFound - } - - videoCodec := video.Codec() - if err := videoCodec.Register(engine); err != nil { - return nil, err - } - - audioCodec := manager.capture.Audio().Codec() - if err := audioCodec.Register(engine); err != nil { - return nil, err - } - - return engine, nil -} - -func (manager *WebRTCManagerCtx) apiSettings(logger zerolog.Logger) webrtc.SettingEngine { - settings := webrtc.SettingEngine{ - LoggerFactory: pionlog.New(logger), - } - - //nolint - settings.SetEphemeralUDPPortRange(manager.config.EphemeralMin, manager.config.EphemeralMax) - settings.SetICETimeouts(disconnectedTimeout, failedTimeout, keepAliveInterval) - settings.SetNAT1To1IPs(manager.config.NAT1To1IPs, webrtc.ICECandidateTypeHost) - //settings.SetSRTPReplayProtectionWindow(512) - settings.SetLite(manager.config.ICELite) - - return settings -} - -func (manager *WebRTCManagerCtx) apiConfiguration() *webrtc.Configuration { - if manager.config.ICELite { - return &webrtc.Configuration{ - SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback, - } - } - - ICEServers := []webrtc.ICEServer{} - for _, server := range manager.config.ICEServers { - var credential interface{} - if server.Credential != "" { - credential = server.Credential - } else { - credential = false - } - - ICEServers = append(ICEServers, webrtc.ICEServer{ - URLs: server.URLs, - Username: server.Username, - Credential: credential, - }) - } - - return &webrtc.Configuration{ - ICEServers: ICEServers, - SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback, - } -} diff --git a/internal/webrtc/peerconnection.go b/internal/webrtc/peerconnection.go new file mode 100644 index 00000000..cab64f2e --- /dev/null +++ b/internal/webrtc/peerconnection.go @@ -0,0 +1,97 @@ +package webrtc + +import ( + "demodesk/neko/internal/types/codec" + "demodesk/neko/internal/webrtc/pionlog" + + "github.com/pion/interceptor" + "github.com/pion/webrtc/v3" + "github.com/rs/zerolog" +) + +func (manager *WebRTCManagerCtx) newPeerConnection(codec codec.RTPCodec, logger zerolog.Logger) (*webrtc.PeerConnection, error) { + // create media engine + engine, err := manager.mediaEngine(codec) + if err != nil { + return nil, err + } + + // create setting engine + settings := manager.settingEngine(logger) + + // create interceptor registry + registry := &interceptor.Registry{} + if err := webrtc.RegisterDefaultInterceptors(engine, registry); err != nil { + return nil, err + } + + // create new API + api := webrtc.NewAPI( + webrtc.WithMediaEngine(engine), + webrtc.WithSettingEngine(settings), + webrtc.WithInterceptorRegistry(registry), + ) + + // create new peer connection + configuration := manager.peerConfiguration() + return api.NewPeerConnection(configuration) +} + +func (manager *WebRTCManagerCtx) mediaEngine(codec codec.RTPCodec) (*webrtc.MediaEngine, error) { + engine := &webrtc.MediaEngine{} + + if err := codec.Register(engine); err != nil { + return nil, err + } + + audioCodec := manager.capture.Audio().Codec() + if err := audioCodec.Register(engine); err != nil { + return nil, err + } + + return engine, nil +} + +func (manager *WebRTCManagerCtx) settingEngine(logger zerolog.Logger) webrtc.SettingEngine { + settings := webrtc.SettingEngine{ + LoggerFactory: pionlog.New(logger), + } + + //nolint + settings.SetEphemeralUDPPortRange(manager.config.EphemeralMin, manager.config.EphemeralMax) + settings.SetICETimeouts(disconnectedTimeout, failedTimeout, keepAliveInterval) + settings.SetNAT1To1IPs(manager.config.NAT1To1IPs, webrtc.ICECandidateTypeHost) + //settings.SetSRTPReplayProtectionWindow(512) + settings.SetLite(manager.config.ICELite) + + return settings +} + +func (manager *WebRTCManagerCtx) peerConfiguration() webrtc.Configuration { + if manager.config.ICELite { + return webrtc.Configuration{ + SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback, + } + } + + ICEServers := []webrtc.ICEServer{} + for _, server := range manager.config.ICEServers { + var credential interface{} + if server.Credential != "" { + credential = server.Credential + } else { + credential = false + } + + ICEServers = append(ICEServers, webrtc.ICEServer{ + URLs: server.URLs, + Username: server.Username, + Credential: credential, + }) + } + + return webrtc.Configuration{ + ICEServers: ICEServers, + SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback, + } +}