diff --git a/docs/changelog.md b/docs/changelog.md index 2e166e32..a3e65c37 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -4,6 +4,7 @@ ### Misc - Automatic WebRTC SDP negotiation using onnegotiationneeded handlers. This allows adding/removing track on demand in a session. +- Added UDP and TCP mux for WebRTC connection. It should handle multiple peers. ## [n.eko v2.5](https://github.com/m1k1o/neko/releases/tag/v2.5) diff --git a/server/internal/types/config/webrtc.go b/server/internal/types/config/webrtc.go index 3924ce79..dfdfbfbe 100644 --- a/server/internal/types/config/webrtc.go +++ b/server/internal/types/config/webrtc.go @@ -20,6 +20,8 @@ type WebRTC struct { EphemeralMin uint16 EphemeralMax uint16 NAT1To1IPs []string + TCPMUX int + UDPMUX int } func (WebRTC) Init(cmd *cobra.Command) error { @@ -33,6 +35,16 @@ func (WebRTC) Init(cmd *cobra.Command) error { return err } + cmd.PersistentFlags().Int("tcpmux", 0, "single TCP mux port for all peers") + if err := viper.BindPFlag("tcpmux", cmd.PersistentFlags().Lookup("tcpmux")); err != nil { + return err + } + + cmd.PersistentFlags().Int("udpmux", 0, "single UDP mux port for all peers") + if err := viper.BindPFlag("udpmux", cmd.PersistentFlags().Lookup("udpmux")); err != nil { + return err + } + cmd.PersistentFlags().String("ipfetch", "http://checkip.amazonaws.com", "automatically fetch IP address from given URL when nat1to1 is not present") if err := viper.BindPFlag("ipfetch", cmd.PersistentFlags().Lookup("ipfetch")); err != nil { return err @@ -58,6 +70,8 @@ func (WebRTC) Init(cmd *cobra.Command) error { func (s *WebRTC) Set() { s.NAT1To1IPs = viper.GetStringSlice("nat1to1") + s.TCPMUX = viper.GetInt("tcpmux") + s.UDPMUX = viper.GetInt("udpmux") s.ICELite = viper.GetBool("icelite") s.ICEServers = []webrtc.ICEServer{} diff --git a/server/internal/webrtc/peer.go b/server/internal/webrtc/peer.go index 180a433b..9ff6d774 100644 --- a/server/internal/webrtc/peer.go +++ b/server/internal/webrtc/peer.go @@ -7,14 +7,10 @@ import ( ) type Peer struct { - id string - api *webrtc.API - engine *webrtc.MediaEngine - manager *WebRTCManager - settings *webrtc.SettingEngine - connection *webrtc.PeerConnection - configuration *webrtc.Configuration - mu sync.Mutex + id string + mu sync.Mutex + manager *WebRTCManager + connection *webrtc.PeerConnection } func (peer *Peer) CreateOffer() (string, error) { diff --git a/server/internal/webrtc/webrtc.go b/server/internal/webrtc/webrtc.go index 07613f5f..a97eb6fb 100644 --- a/server/internal/webrtc/webrtc.go +++ b/server/internal/webrtc/webrtc.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "io" + "net" "strings" "time" @@ -35,6 +36,7 @@ type WebRTCManager struct { sessions types.SessionManager remote types.RemoteManager config *config.WebRTC + api *webrtc.API } func (manager *WebRTCManager) Start() { @@ -61,6 +63,10 @@ func (manager *WebRTCManager) Start() { } }) + if err := manager.initAPI(); err != nil { + manager.logger.Panic().Err(err).Msg("failed to initialize webrtc API") + } + manager.logger.Info(). Str("ice_lite", fmt.Sprintf("%t", manager.config.ICELite)). Str("ice_servers", fmt.Sprintf("%+v", manager.config.ICEServers)). @@ -74,46 +80,96 @@ func (manager *WebRTCManager) Shutdown() error { return nil } -func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (types.Peer, error) { - configuration := &webrtc.Configuration{ - ICEServers: manager.config.ICEServers, - SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback, +func (manager *WebRTCManager) initAPI() error { + logger := loggerFactory{ + logger: manager.logger, } settings := webrtc.SettingEngine{ - LoggerFactory: loggerFactory{ - logger: manager.logger, - }, - } - - if manager.config.ICELite { - configuration = &webrtc.Configuration{ - SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback, - } - settings.SetLite(true) + LoggerFactory: logger, } _ = settings.SetEphemeralUDPPortRange(manager.config.EphemeralMin, manager.config.EphemeralMax) settings.SetNAT1To1IPs(manager.config.NAT1To1IPs, webrtc.ICECandidateTypeHost) settings.SetICETimeouts(6*time.Second, 6*time.Second, 3*time.Second) settings.SetSRTPReplayProtectionWindow(512) + settings.SetLite(manager.config.ICELite) - // Create MediaEngine based off sdp + var networkType []webrtc.NetworkType + + // Add TCP Mux + if manager.config.TCPMUX > 0 { + tcpListener, err := net.ListenTCP("tcp", &net.TCPAddr{ + IP: net.IP{0, 0, 0, 0}, + Port: manager.config.TCPMUX, + }) + + if err != nil { + return err + } + + tcpMux := webrtc.NewICETCPMux(logger.NewLogger("ice-tcp"), tcpListener, 32) + settings.SetICETCPMux(tcpMux) + + networkType = append(networkType, webrtc.NetworkTypeTCP4) + manager.logger.Info().Str("listener", tcpListener.Addr().String()).Msg("using TCP MUX") + } + + // Add UDP Mux + if manager.config.UDPMUX > 0 { + udpListener, err := net.ListenUDP("udp", &net.UDPAddr{ + IP: net.IP{0, 0, 0, 0}, + Port: manager.config.UDPMUX, + }) + + if err != nil { + return err + } + + udpMux := webrtc.NewICEUDPMux(logger.NewLogger("ice-udp"), udpListener) + settings.SetICEUDPMux(udpMux) + + networkType = append(networkType, webrtc.NetworkTypeUDP4) + manager.logger.Info().Str("listener", udpListener.LocalAddr().String()).Msg("using UDP MUX") + } + + // Enable support for TCP and UDP ICE candidates + if len(networkType) > 0 { + settings.SetNetworkTypes(networkType) + } + + // Create MediaEngine with selected codecs engine := webrtc.MediaEngine{} - _ = engine.RegisterCodec(manager.audioCodec, webrtc.RTPCodecTypeAudio) _ = engine.RegisterCodec(manager.videoCodec, webrtc.RTPCodecTypeVideo) + // Register Interceptors i := &interceptor.Registry{} if err := webrtc.RegisterDefaultInterceptors(&engine, i); err != nil { - return nil, err + return err } // Create API with MediaEngine and SettingEngine - api := webrtc.NewAPI(webrtc.WithMediaEngine(&engine), webrtc.WithSettingEngine(settings), webrtc.WithInterceptorRegistry(i)) + manager.api = webrtc.NewAPI( + webrtc.WithMediaEngine(&engine), + webrtc.WithSettingEngine(settings), + webrtc.WithInterceptorRegistry(i), + ) + + return nil +} + +func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (types.Peer, error) { + configuration := webrtc.Configuration{ + SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback, + } + + if !manager.config.ICELite { + configuration.ICEServers = manager.config.ICEServers + } // Create new peer connection - connection, err := api.NewPeerConnection(*configuration) + connection, err := manager.api.NewPeerConnection(configuration) if err != nil { return nil, err } @@ -173,13 +229,9 @@ func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (type }) peer := &Peer{ - id: id, - api: api, - engine: &engine, - manager: manager, - settings: &settings, - connection: connection, - configuration: configuration, + id: id, + manager: manager, + connection: connection, } connection.OnNegotiationNeeded(func() {