From 924be2050546cf9f9940bb5778cdf0d6745d4588 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Wed, 9 Feb 2022 20:45:03 +0000 Subject: [PATCH] WebRTC TCP and UDP mux. --- internal/config/webrtc.go | 91 ++++++++++++++++++------------- internal/webrtc/manager.go | 38 +++++++++++++ internal/webrtc/peerconnection.go | 30 +++++++++- 3 files changed, 121 insertions(+), 38 deletions(-) diff --git a/internal/config/webrtc.go b/internal/config/webrtc.go index ab6d84ed..2024274c 100644 --- a/internal/config/webrtc.go +++ b/internal/config/webrtc.go @@ -1,7 +1,6 @@ package config import ( - "fmt" "strconv" "strings" @@ -13,12 +12,6 @@ import ( "demodesk/neko/internal/utils" ) -// default port range - min -const defEprMin = 59000 - -// default port range - max -const defEprMax = 59100 - // default stun server const defStunSrv = "stun:stun.l.google.com:19302" @@ -28,6 +21,8 @@ type WebRTC struct { ICEServers []types.ICEServer EphemeralMin uint16 EphemeralMax uint16 + TCPMux int + UDPMux int NAT1To1IPs []string IpRetrievalUrl string @@ -49,6 +44,21 @@ func (WebRTC) Init(cmd *cobra.Command) error { return err } + cmd.PersistentFlags().String("webrtc.epr", "", "limits the pool of ephemeral ports that ICE UDP connections can allocate from") + if err := viper.BindPFlag("webrtc.epr", cmd.PersistentFlags().Lookup("webrtc.epr")); err != nil { + return err + } + + cmd.PersistentFlags().Int("webrtc.tcpmux", 0, "single TCP mux port for all peers") + if err := viper.BindPFlag("webrtc.tcpmux", cmd.PersistentFlags().Lookup("webrtc.tcpmux")); err != nil { + return err + } + + cmd.PersistentFlags().Int("webrtc.udpmux", 0, "single UDP mux port for all peers, replaces EPR") + if err := viper.BindPFlag("webrtc.udpmux", cmd.PersistentFlags().Lookup("webrtc.udpmux")); err != nil { + return err + } + cmd.PersistentFlags().StringSlice("webrtc.nat1to1", []string{}, "sets a list of external IP addresses of 1:1 (D)NAT and a candidate type for which the external IP address is used") if err := viper.BindPFlag("webrtc.nat1to1", cmd.PersistentFlags().Lookup("webrtc.nat1to1")); err != nil { return err @@ -59,11 +69,6 @@ func (WebRTC) Init(cmd *cobra.Command) error { return err } - cmd.PersistentFlags().String("webrtc.epr", fmt.Sprintf("%d-%d", defEprMin, defEprMax), "limits the pool of ephemeral ports that ICE UDP connections can allocate from") - if err := viper.BindPFlag("webrtc.epr", cmd.PersistentFlags().Lookup("webrtc.epr")); err != nil { - return err - } - return nil } @@ -83,6 +88,43 @@ func (s *WebRTC) Set() { }) } + s.TCPMux = viper.GetInt("webrtc.tcpmux") + s.UDPMux = viper.GetInt("webrtc.udpmux") + + epr := viper.GetString("webrtc.epr") + if epr != "" { + ports := strings.SplitN(epr, "-", -1) + if len(ports) > 1 { + min, err := strconv.ParseUint(ports[0], 10, 16) + if err != nil { + log.Panic().Err(err).Msgf("unable to parse ephemeral min port") + } + + max, err := strconv.ParseUint(ports[1], 10, 16) + if err != nil { + log.Panic().Err(err).Msgf("unable to parse ephemeral max port") + } + + s.EphemeralMin = uint16(min) + s.EphemeralMax = uint16(max) + } + + if s.EphemeralMin > s.EphemeralMax { + log.Panic().Msgf("ephemeral min port cannot be bigger than max") + } + } + + if epr == "" && s.TCPMux == 0 && s.UDPMux == 0 { + // using default epr range + s.EphemeralMin = 59000 + s.EphemeralMax = 59100 + + log.Warn(). + Uint16("min", s.EphemeralMin). + Uint16("max", s.EphemeralMax). + Msgf("no TCP, UDP mux or epr specified, using default epr range") + } + s.NAT1To1IPs = viper.GetStringSlice("webrtc.nat1to1") s.IpRetrievalUrl = viper.GetString("webrtc.ip_retrieval_url") if s.IpRetrievalUrl != "" && len(s.NAT1To1IPs) == 0 { @@ -93,29 +135,4 @@ func (s *WebRTC) Set() { log.Warn().Err(err).Msgf("IP retrieval failed") } } - - min := uint16(defEprMin) - max := uint16(defEprMax) - - epr := viper.GetString("webrtc.epr") - ports := strings.SplitN(epr, "-", -1) - if len(ports) > 1 { - start, err := strconv.ParseUint(ports[0], 10, 16) - if err == nil { - min = uint16(start) - } - - end, err := strconv.ParseUint(ports[1], 10, 16) - if err == nil { - max = uint16(end) - } - } - - if min > max { - s.EphemeralMin = max - s.EphemeralMax = min - } else { - s.EphemeralMin = min - s.EphemeralMax = max - } } diff --git a/internal/webrtc/manager.go b/internal/webrtc/manager.go index 49a982a6..5d381925 100644 --- a/internal/webrtc/manager.go +++ b/internal/webrtc/manager.go @@ -2,9 +2,11 @@ package webrtc import ( "fmt" + "net" "strings" "time" + "github.com/pion/ice/v2" "github.com/pion/rtcp" "github.com/pion/webrtc/v3" "github.com/rs/zerolog" @@ -16,6 +18,7 @@ import ( "demodesk/neko/internal/types/event" "demodesk/neko/internal/types/message" "demodesk/neko/internal/webrtc/cursor" + "demodesk/neko/internal/webrtc/pionlog" ) // the duration without network activity before a Agent is considered disconnected. Default is 5 Seconds @@ -51,18 +54,53 @@ type WebRTCManagerCtx struct { curImage *cursor.ImageCtx curPosition *cursor.PositionCtx + tcpMux ice.TCPMux + udpMux ice.UDPMux + camStop, micStop *func() } func (manager *WebRTCManagerCtx) Start() { manager.curImage.Start() + logger := pionlog.New(manager.logger) + + // add TCP Mux listener + if manager.config.TCPMux > 0 { + tcpListener, err := net.ListenTCP("tcp", &net.TCPAddr{ + IP: net.IP{0, 0, 0, 0}, + Port: manager.config.TCPMux, + }) + + if err != nil { + manager.logger.Panic().Err(err).Msg("unable to setup ice TCP mux") + } + + manager.tcpMux = webrtc.NewICETCPMux(logger.NewLogger("ice-tcp"), tcpListener, 32) + } + + // add UDP Mux listener + if manager.config.UDPMux > 0 { + udpListener, err := net.ListenUDP("udp", &net.UDPAddr{ + IP: net.IP{0, 0, 0, 0}, + Port: manager.config.UDPMux, + }) + + if err != nil { + manager.logger.Panic().Err(err).Msg("unable to setup ice UDP mux") + } + + manager.udpMux = webrtc.NewICEUDPMux(logger.NewLogger("ice-udp"), udpListener) + } + manager.logger.Info(). Bool("icelite", manager.config.ICELite). Bool("icetrickle", manager.config.ICETrickle). Interface("iceservers", manager.config.ICEServers). Str("nat1to1", strings.Join(manager.config.NAT1To1IPs, ",")). Str("epr", fmt.Sprintf("%d-%d", manager.config.EphemeralMin, manager.config.EphemeralMax)). + Int("tcpmux", manager.config.TCPMux). + Int("udpmux", manager.config.UDPMux). Msg("webrtc starting") } diff --git a/internal/webrtc/peerconnection.go b/internal/webrtc/peerconnection.go index 15a93c2d..6a9683b0 100644 --- a/internal/webrtc/peerconnection.go +++ b/internal/webrtc/peerconnection.go @@ -23,11 +23,39 @@ func (manager *WebRTCManagerCtx) newPeerConnection(codecs []codec.RTPCodec, logg LoggerFactory: pionlog.New(logger), } - _ = settings.SetEphemeralUDPPortRange(manager.config.EphemeralMin, manager.config.EphemeralMax) settings.SetICETimeouts(disconnectedTimeout, failedTimeout, keepAliveInterval) settings.SetNAT1To1IPs(manager.config.NAT1To1IPs, webrtc.ICECandidateTypeHost) settings.SetLite(manager.config.ICELite) + var networkType []webrtc.NetworkType + + // udp candidates + if manager.udpMux != nil { + settings.SetICEUDPMux(manager.udpMux) + networkType = append(networkType, + webrtc.NetworkTypeUDP4, + webrtc.NetworkTypeUDP6, + ) + } else if manager.config.EphemeralMax != 0 { + _ = settings.SetEphemeralUDPPortRange(manager.config.EphemeralMin, manager.config.EphemeralMax) + networkType = append(networkType, + webrtc.NetworkTypeUDP4, + webrtc.NetworkTypeUDP6, + ) + } + + // tcp candidates + if manager.tcpMux != nil { + settings.SetICETCPMux(manager.tcpMux) + networkType = append(networkType, + webrtc.NetworkTypeTCP4, + webrtc.NetworkTypeTCP6, + ) + } + + // enable support for TCP and UDP ICE candidates + settings.SetNetworkTypes(networkType) + // create interceptor registry registry := &interceptor.Registry{} if err := webrtc.RegisterDefaultInterceptors(engine, registry); err != nil {