WebRTC TCP and UDP mux.

This commit is contained in:
Miroslav Šedivý 2022-02-09 20:45:03 +00:00
parent c2cf9277dc
commit 924be20505
3 changed files with 121 additions and 38 deletions

View File

@ -1,7 +1,6 @@
package config package config
import ( import (
"fmt"
"strconv" "strconv"
"strings" "strings"
@ -13,12 +12,6 @@ import (
"demodesk/neko/internal/utils" "demodesk/neko/internal/utils"
) )
// default port range - min
const defEprMin = 59000
// default port range - max
const defEprMax = 59100
// default stun server // default stun server
const defStunSrv = "stun:stun.l.google.com:19302" const defStunSrv = "stun:stun.l.google.com:19302"
@ -28,6 +21,8 @@ type WebRTC struct {
ICEServers []types.ICEServer ICEServers []types.ICEServer
EphemeralMin uint16 EphemeralMin uint16
EphemeralMax uint16 EphemeralMax uint16
TCPMux int
UDPMux int
NAT1To1IPs []string NAT1To1IPs []string
IpRetrievalUrl string IpRetrievalUrl string
@ -49,6 +44,21 @@ func (WebRTC) Init(cmd *cobra.Command) error {
return err return err
} }
cmd.PersistentFlags().String("webrtc.epr", "", "limits the pool of ephemeral ports that ICE UDP connections can allocate from")
if err := viper.BindPFlag("webrtc.epr", cmd.PersistentFlags().Lookup("webrtc.epr")); err != nil {
return err
}
cmd.PersistentFlags().Int("webrtc.tcpmux", 0, "single TCP mux port for all peers")
if err := viper.BindPFlag("webrtc.tcpmux", cmd.PersistentFlags().Lookup("webrtc.tcpmux")); err != nil {
return err
}
cmd.PersistentFlags().Int("webrtc.udpmux", 0, "single UDP mux port for all peers, replaces EPR")
if err := viper.BindPFlag("webrtc.udpmux", cmd.PersistentFlags().Lookup("webrtc.udpmux")); err != nil {
return err
}
cmd.PersistentFlags().StringSlice("webrtc.nat1to1", []string{}, "sets a list of external IP addresses of 1:1 (D)NAT and a candidate type for which the external IP address is used") cmd.PersistentFlags().StringSlice("webrtc.nat1to1", []string{}, "sets a list of external IP addresses of 1:1 (D)NAT and a candidate type for which the external IP address is used")
if err := viper.BindPFlag("webrtc.nat1to1", cmd.PersistentFlags().Lookup("webrtc.nat1to1")); err != nil { if err := viper.BindPFlag("webrtc.nat1to1", cmd.PersistentFlags().Lookup("webrtc.nat1to1")); err != nil {
return err return err
@ -59,11 +69,6 @@ func (WebRTC) Init(cmd *cobra.Command) error {
return err return err
} }
cmd.PersistentFlags().String("webrtc.epr", fmt.Sprintf("%d-%d", defEprMin, defEprMax), "limits the pool of ephemeral ports that ICE UDP connections can allocate from")
if err := viper.BindPFlag("webrtc.epr", cmd.PersistentFlags().Lookup("webrtc.epr")); err != nil {
return err
}
return nil return nil
} }
@ -83,6 +88,43 @@ func (s *WebRTC) Set() {
}) })
} }
s.TCPMux = viper.GetInt("webrtc.tcpmux")
s.UDPMux = viper.GetInt("webrtc.udpmux")
epr := viper.GetString("webrtc.epr")
if epr != "" {
ports := strings.SplitN(epr, "-", -1)
if len(ports) > 1 {
min, err := strconv.ParseUint(ports[0], 10, 16)
if err != nil {
log.Panic().Err(err).Msgf("unable to parse ephemeral min port")
}
max, err := strconv.ParseUint(ports[1], 10, 16)
if err != nil {
log.Panic().Err(err).Msgf("unable to parse ephemeral max port")
}
s.EphemeralMin = uint16(min)
s.EphemeralMax = uint16(max)
}
if s.EphemeralMin > s.EphemeralMax {
log.Panic().Msgf("ephemeral min port cannot be bigger than max")
}
}
if epr == "" && s.TCPMux == 0 && s.UDPMux == 0 {
// using default epr range
s.EphemeralMin = 59000
s.EphemeralMax = 59100
log.Warn().
Uint16("min", s.EphemeralMin).
Uint16("max", s.EphemeralMax).
Msgf("no TCP, UDP mux or epr specified, using default epr range")
}
s.NAT1To1IPs = viper.GetStringSlice("webrtc.nat1to1") s.NAT1To1IPs = viper.GetStringSlice("webrtc.nat1to1")
s.IpRetrievalUrl = viper.GetString("webrtc.ip_retrieval_url") s.IpRetrievalUrl = viper.GetString("webrtc.ip_retrieval_url")
if s.IpRetrievalUrl != "" && len(s.NAT1To1IPs) == 0 { if s.IpRetrievalUrl != "" && len(s.NAT1To1IPs) == 0 {
@ -93,29 +135,4 @@ func (s *WebRTC) Set() {
log.Warn().Err(err).Msgf("IP retrieval failed") log.Warn().Err(err).Msgf("IP retrieval failed")
} }
} }
min := uint16(defEprMin)
max := uint16(defEprMax)
epr := viper.GetString("webrtc.epr")
ports := strings.SplitN(epr, "-", -1)
if len(ports) > 1 {
start, err := strconv.ParseUint(ports[0], 10, 16)
if err == nil {
min = uint16(start)
}
end, err := strconv.ParseUint(ports[1], 10, 16)
if err == nil {
max = uint16(end)
}
}
if min > max {
s.EphemeralMin = max
s.EphemeralMax = min
} else {
s.EphemeralMin = min
s.EphemeralMax = max
}
} }

View File

@ -2,9 +2,11 @@ package webrtc
import ( import (
"fmt" "fmt"
"net"
"strings" "strings"
"time" "time"
"github.com/pion/ice/v2"
"github.com/pion/rtcp" "github.com/pion/rtcp"
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
"github.com/rs/zerolog" "github.com/rs/zerolog"
@ -16,6 +18,7 @@ import (
"demodesk/neko/internal/types/event" "demodesk/neko/internal/types/event"
"demodesk/neko/internal/types/message" "demodesk/neko/internal/types/message"
"demodesk/neko/internal/webrtc/cursor" "demodesk/neko/internal/webrtc/cursor"
"demodesk/neko/internal/webrtc/pionlog"
) )
// the duration without network activity before a Agent is considered disconnected. Default is 5 Seconds // the duration without network activity before a Agent is considered disconnected. Default is 5 Seconds
@ -51,18 +54,53 @@ type WebRTCManagerCtx struct {
curImage *cursor.ImageCtx curImage *cursor.ImageCtx
curPosition *cursor.PositionCtx curPosition *cursor.PositionCtx
tcpMux ice.TCPMux
udpMux ice.UDPMux
camStop, micStop *func() camStop, micStop *func()
} }
func (manager *WebRTCManagerCtx) Start() { func (manager *WebRTCManagerCtx) Start() {
manager.curImage.Start() manager.curImage.Start()
logger := pionlog.New(manager.logger)
// add TCP Mux listener
if manager.config.TCPMux > 0 {
tcpListener, err := net.ListenTCP("tcp", &net.TCPAddr{
IP: net.IP{0, 0, 0, 0},
Port: manager.config.TCPMux,
})
if err != nil {
manager.logger.Panic().Err(err).Msg("unable to setup ice TCP mux")
}
manager.tcpMux = webrtc.NewICETCPMux(logger.NewLogger("ice-tcp"), tcpListener, 32)
}
// add UDP Mux listener
if manager.config.UDPMux > 0 {
udpListener, err := net.ListenUDP("udp", &net.UDPAddr{
IP: net.IP{0, 0, 0, 0},
Port: manager.config.UDPMux,
})
if err != nil {
manager.logger.Panic().Err(err).Msg("unable to setup ice UDP mux")
}
manager.udpMux = webrtc.NewICEUDPMux(logger.NewLogger("ice-udp"), udpListener)
}
manager.logger.Info(). manager.logger.Info().
Bool("icelite", manager.config.ICELite). Bool("icelite", manager.config.ICELite).
Bool("icetrickle", manager.config.ICETrickle). Bool("icetrickle", manager.config.ICETrickle).
Interface("iceservers", manager.config.ICEServers). Interface("iceservers", manager.config.ICEServers).
Str("nat1to1", strings.Join(manager.config.NAT1To1IPs, ",")). Str("nat1to1", strings.Join(manager.config.NAT1To1IPs, ",")).
Str("epr", fmt.Sprintf("%d-%d", manager.config.EphemeralMin, manager.config.EphemeralMax)). Str("epr", fmt.Sprintf("%d-%d", manager.config.EphemeralMin, manager.config.EphemeralMax)).
Int("tcpmux", manager.config.TCPMux).
Int("udpmux", manager.config.UDPMux).
Msg("webrtc starting") Msg("webrtc starting")
} }

View File

@ -23,11 +23,39 @@ func (manager *WebRTCManagerCtx) newPeerConnection(codecs []codec.RTPCodec, logg
LoggerFactory: pionlog.New(logger), LoggerFactory: pionlog.New(logger),
} }
_ = settings.SetEphemeralUDPPortRange(manager.config.EphemeralMin, manager.config.EphemeralMax)
settings.SetICETimeouts(disconnectedTimeout, failedTimeout, keepAliveInterval) settings.SetICETimeouts(disconnectedTimeout, failedTimeout, keepAliveInterval)
settings.SetNAT1To1IPs(manager.config.NAT1To1IPs, webrtc.ICECandidateTypeHost) settings.SetNAT1To1IPs(manager.config.NAT1To1IPs, webrtc.ICECandidateTypeHost)
settings.SetLite(manager.config.ICELite) settings.SetLite(manager.config.ICELite)
var networkType []webrtc.NetworkType
// udp candidates
if manager.udpMux != nil {
settings.SetICEUDPMux(manager.udpMux)
networkType = append(networkType,
webrtc.NetworkTypeUDP4,
webrtc.NetworkTypeUDP6,
)
} else if manager.config.EphemeralMax != 0 {
_ = settings.SetEphemeralUDPPortRange(manager.config.EphemeralMin, manager.config.EphemeralMax)
networkType = append(networkType,
webrtc.NetworkTypeUDP4,
webrtc.NetworkTypeUDP6,
)
}
// tcp candidates
if manager.tcpMux != nil {
settings.SetICETCPMux(manager.tcpMux)
networkType = append(networkType,
webrtc.NetworkTypeTCP4,
webrtc.NetworkTypeTCP6,
)
}
// enable support for TCP and UDP ICE candidates
settings.SetNetworkTypes(networkType)
// create interceptor registry // create interceptor registry
registry := &interceptor.Registry{} registry := &interceptor.Registry{}
if err := webrtc.RegisterDefaultInterceptors(engine, registry); err != nil { if err := webrtc.RegisterDefaultInterceptors(engine, registry); err != nil {