Merge pull request #43 from mbattista/add-turn-server

allow to add password protected turn server
This commit is contained in:
Miroslav Šedivý 2021-04-04 22:49:51 +02:00 committed by GitHub
commit 4589f758c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 216 additions and 187 deletions

View File

@ -41,6 +41,7 @@ For n.eko room management software visit https://github.com/m1k1o/neko-rooms.
- Added `?usr=<display-name>` that will prefill username. This allows creating auto-join links.
- Added `?cast=1` that will hide all control and show only video.
- Shake keyboard icon if someone attempted to control when is nobody hosting.
- Support for password protected `NEKO_ICESERVERS` (by @mbattista).
### Bugs
- Fixed minor gst pipeline bug.
@ -280,7 +281,16 @@ NEKO_CERT:
NEKO_KEY:
- Path to the SSL-Certificate private key
- e.g. '/certs/key.pem'
NEKO_ICELITE:
- Use the ice lite protocol
- e.g. false
NEKO_ICESERVER:
- Describes a single STUN and TURN server that can be used by the ICEAgent to establish a connection with a peer (simple usage for server without authentication)
- e.g. 'stun:stun.l.google.com:19302'
NEKO_ICESERVERS:
- Describes multiple STUN and TURN server that can be used by the ICEAgent to establish a connection with a peer
- e.g. '[{"urls": ["turn:turn.example.com:19302", "stun:stun.example.com:19302"], "username": "name", "credential": "password"}, {"urls": ["stun:stun.example2.com:19302"]}]'
- [More information](https://developer.mozilla.org/en-US/docs/Web/API/RTCIceServer)
```
# How to contribute?

View File

@ -183,7 +183,7 @@ export abstract class BaseClient extends EventEmitter<BaseEvents> {
this._ws!.send(JSON.stringify({ event, ...payload }))
}
public createPeer(sdp: string, lite: boolean, servers: string[]) {
public createPeer(sdp: string, lite: boolean, servers: RTCIceServer[]) {
this.emit('debug', `creating peer`)
if (!this.socketOpen) {
this.emit(
@ -202,7 +202,7 @@ export abstract class BaseClient extends EventEmitter<BaseEvents> {
this._peer = new RTCPeerConnection()
if (lite !== true) {
this._peer = new RTCPeerConnection({
iceServers: [{ urls: servers }],
iceServers: servers,
})
}

View File

@ -67,7 +67,7 @@ export interface SignalProvideMessage extends WebSocketMessage, SignalProvidePay
export interface SignalProvidePayload {
id: string
lite: boolean
ice: string[]
ice: RTCIceServer[]
sdp: string
}

View File

@ -39,10 +39,10 @@ import (
// Pipeline is a wrapper for a GStreamer Pipeline
type Pipeline struct {
Pipeline *C.GstElement
Sample chan types.Sample
Src string
id int
Pipeline *C.GstElement
Sample chan types.Sample
Src string
id int
}
var pipelines = make(map[int]*Pipeline)
@ -209,10 +209,10 @@ func CreatePipeline(pipelineStr string) (*Pipeline, error) {
defer pipelinesLock.Unlock()
p := &Pipeline{
Pipeline: C.gstreamer_send_create_pipeline(pipelineStrUnsafe),
Sample: make(chan types.Sample),
Src: pipelineStr,
id: len(pipelines),
Pipeline: C.gstreamer_send_create_pipeline(pipelineStrUnsafe),
Sample: make(chan types.Sample),
Src: pipelineStr,
id: len(pipelines),
}
pipelines[p.id] = p

View File

@ -1,102 +1,102 @@
package endpoint
import (
"encoding/json"
"fmt"
"net/http"
"runtime/debug"
"encoding/json"
"fmt"
"net/http"
"runtime/debug"
"github.com/go-chi/chi/middleware"
"github.com/rs/zerolog/log"
"github.com/go-chi/chi/middleware"
"github.com/rs/zerolog/log"
)
type (
Endpoint func(http.ResponseWriter, *http.Request) error
Endpoint func(http.ResponseWriter, *http.Request) error
ErrResponse struct {
Status int `json:"status,omitempty"`
Err string `json:"error,omitempty"`
Message string `json:"message,omitempty"`
Details string `json:"details,omitempty"`
Code string `json:"code,omitempty"`
RequestID string `json:"request,omitempty"`
}
ErrResponse struct {
Status int `json:"status,omitempty"`
Err string `json:"error,omitempty"`
Message string `json:"message,omitempty"`
Details string `json:"details,omitempty"`
Code string `json:"code,omitempty"`
RequestID string `json:"request,omitempty"`
}
)
func Handle(handler Endpoint) http.HandlerFunc {
fn := func(w http.ResponseWriter, r *http.Request) {
if err := handler(w, r); err != nil {
WriteError(w, r, err)
}
}
fn := func(w http.ResponseWriter, r *http.Request) {
if err := handler(w, r); err != nil {
WriteError(w, r, err)
}
}
return http.HandlerFunc(fn)
return http.HandlerFunc(fn)
}
var nonErrorsCodes = map[int]bool{
404: true,
404: true,
}
func errResponse(input interface{}) *ErrResponse {
var res *ErrResponse
var err interface{}
var res *ErrResponse
var err interface{}
switch input.(type) {
case *HandlerError:
e := input.(*HandlerError)
res = &ErrResponse{
Status: e.Status,
Err: http.StatusText(e.Status),
Message: e.Message,
}
err = e.Err
default:
res = &ErrResponse{
Status: http.StatusInternalServerError,
Err: http.StatusText(http.StatusInternalServerError),
}
err = input
}
switch input.(type) {
case *HandlerError:
e := input.(*HandlerError)
res = &ErrResponse{
Status: e.Status,
Err: http.StatusText(e.Status),
Message: e.Message,
}
err = e.Err
default:
res = &ErrResponse{
Status: http.StatusInternalServerError,
Err: http.StatusText(http.StatusInternalServerError),
}
err = input
}
if err != nil {
switch err.(type) {
case *error:
e := err.(error)
res.Details = e.Error()
break
default:
res.Details = fmt.Sprintf("%+v", err)
break
}
}
if err != nil {
switch err.(type) {
case *error:
e := err.(error)
res.Details = e.Error()
break
default:
res.Details = fmt.Sprintf("%+v", err)
break
}
}
return res
return res
}
func WriteError(w http.ResponseWriter, r *http.Request, err interface{}) {
hlog := log.With().
Str("module", "http").
Logger()
hlog := log.With().
Str("module", "http").
Logger()
res := errResponse(err)
res := errResponse(err)
if reqID := middleware.GetReqID(r.Context()); reqID != "" {
res.RequestID = reqID
}
if reqID := middleware.GetReqID(r.Context()); reqID != "" {
res.RequestID = reqID
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(res.Status)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(res.Status)
if err := json.NewEncoder(w).Encode(res); err != nil {
hlog.Warn().Err(err).Msg("Failed writing json error response")
}
if err := json.NewEncoder(w).Encode(res); err != nil {
hlog.Warn().Err(err).Msg("Failed writing json error response")
}
if !nonErrorsCodes[res.Status] {
logEntry := middleware.GetLogEntry(r)
if logEntry != nil {
logEntry.Panic(err, debug.Stack())
} else {
hlog.Error().Str("stack", string(debug.Stack())).Msgf("%+v", err)
}
}
if !nonErrorsCodes[res.Status] {
logEntry := middleware.GetLogEntry(r)
if logEntry != nil {
logEntry.Panic(err, debug.Stack())
} else {
hlog.Error().Str("stack", string(debug.Stack())).Msgf("%+v", err)
}
}
}

View File

@ -3,15 +3,15 @@ package endpoint
import "fmt"
type HandlerError struct {
Status int
Message string
Err error
Status int
Message string
Err error
}
func (e *HandlerError) Error() string {
if e.Err != nil {
return fmt.Sprintf("%s: %s", e.Message, e.Err.Error())
}
if e.Err != nil {
return fmt.Sprintf("%s: %s", e.Message, e.Err.Error())
}
return e.Message
return e.Message
}

View File

@ -4,9 +4,9 @@ package middleware
// a pointer so it fits in an interface{} without allocation. This technique
// for defining context keys was copied from Go 1.7's new use of context in net/http.
type ctxKey struct {
name string
name string
}
func (k *ctxKey) String() string {
return "neko/ctx/" + k.name
return "neko/ctx/" + k.name
}

View File

@ -4,21 +4,21 @@ package middleware
// https://github.com/zenazn/goji/tree/master/web/middleware
import (
"net/http"
"net/http"
"n.eko.moe/neko/internal/http/endpoint"
"n.eko.moe/neko/internal/http/endpoint"
)
func Recoverer(next http.Handler) http.Handler {
fn := func(w http.ResponseWriter, r *http.Request) {
defer func() {
if rvr := recover(); rvr != nil {
endpoint.WriteError(w, r, rvr)
}
}()
fn := func(w http.ResponseWriter, r *http.Request) {
defer func() {
if rvr := recover(); rvr != nil {
endpoint.WriteError(w, r, rvr)
}
}()
next.ServeHTTP(w, r)
}
next.ServeHTTP(w, r)
}
return http.HandlerFunc(fn)
return http.HandlerFunc(fn)
}

View File

@ -1,14 +1,14 @@
package middleware
import (
"context"
"crypto/rand"
"encoding/base64"
"fmt"
"net/http"
"os"
"strings"
"sync/atomic"
"context"
"crypto/rand"
"encoding/base64"
"fmt"
"net/http"
"os"
"strings"
"sync/atomic"
)
// Key to use when setting the request ID.
@ -37,19 +37,19 @@ var reqid uint64
// than a millionth of a percent chance of generating two colliding IDs.
func init() {
hostname, err := os.Hostname()
if hostname == "" || err != nil {
hostname = "localhost"
}
var buf [12]byte
var b64 string
for len(b64) < 10 {
rand.Read(buf[:])
b64 = base64.StdEncoding.EncodeToString(buf[:])
b64 = strings.NewReplacer("+", "", "/", "").Replace(b64)
}
hostname, err := os.Hostname()
if hostname == "" || err != nil {
hostname = "localhost"
}
var buf [12]byte
var b64 string
for len(b64) < 10 {
rand.Read(buf[:])
b64 = base64.StdEncoding.EncodeToString(buf[:])
b64 = strings.NewReplacer("+", "", "/", "").Replace(b64)
}
prefix = fmt.Sprintf("%s/%s", hostname, b64[0:10])
prefix = fmt.Sprintf("%s/%s", hostname, b64[0:10])
}
// RequestID is a middleware that injects a request ID into the context of each
@ -58,32 +58,32 @@ func init() {
// process, and where the last number is an atomically incremented request
// counter.
func RequestID(next http.Handler) http.Handler {
fn := func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
requestID := r.Header.Get("X-Request-Id")
if requestID == "" {
myid := atomic.AddUint64(&reqid, 1)
requestID = fmt.Sprintf("%s-%06d", prefix, myid)
}
ctx = context.WithValue(ctx, RequestIDKey, requestID)
next.ServeHTTP(w, r.WithContext(ctx))
}
return http.HandlerFunc(fn)
fn := func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
requestID := r.Header.Get("X-Request-Id")
if requestID == "" {
myid := atomic.AddUint64(&reqid, 1)
requestID = fmt.Sprintf("%s-%06d", prefix, myid)
}
ctx = context.WithValue(ctx, RequestIDKey, requestID)
next.ServeHTTP(w, r.WithContext(ctx))
}
return http.HandlerFunc(fn)
}
// GetReqID returns a request ID from the given context if one is present.
// Returns the empty string if a request ID cannot be found.
func GetReqID(ctx context.Context) string {
if ctx == nil {
return ""
}
if reqID, ok := ctx.Value(RequestIDKey).(string); ok {
return reqID
}
return ""
if ctx == nil {
return ""
}
if reqID, ok := ctx.Value(RequestIDKey).(string); ok {
return reqID
}
return ""
}
// NextRequestID generates the next request ID in the sequence.
func NextRequestID() uint64 {
return atomic.AddUint64(&reqid, 1)
return atomic.AddUint64(&reqid, 1)
}

View File

@ -126,8 +126,8 @@ func (session *Session) SignalCandidate(data string) error {
}
return session.socket.Send(&message.SignalCandidate{
Event: event.SIGNAL_CANDIDATE,
Data: data,
});
Data: data,
})
}
func (session *Session) destroy() error {

View File

@ -1,17 +1,20 @@
package config
import (
"encoding/json"
"strconv"
"strings"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"n.eko.moe/neko/internal/utils"
"github.com/pion/webrtc/v3"
)
type WebRTC struct {
ICELite bool
ICEServers []string
ICEServers []webrtc.ICEServer
EphemeralMin uint16
EphemeralMax uint16
NAT1To1IPs []string
@ -38,13 +41,31 @@ func (WebRTC) Init(cmd *cobra.Command) error {
return err
}
cmd.PersistentFlags().String("iceservers", "", "describes a single STUN and TURN server that can be used by the ICEAgent to establish a connection with a peer")
if err := viper.BindPFlag("iceservers", cmd.PersistentFlags().Lookup("iceservers")); err != nil {
return err
}
return nil
}
func (s *WebRTC) Set() {
s.ICELite = viper.GetBool("icelite")
s.ICEServers = viper.GetStringSlice("iceserver")
s.NAT1To1IPs = viper.GetStringSlice("nat1to1")
s.ICELite = viper.GetBool("icelite")
s.ICEServers = []webrtc.ICEServer{}
iceServersJson := viper.GetString("iceservers")
if iceServersJson != "" {
err := json.Unmarshal([]byte(iceServersJson), &s.ICEServers)
if err != nil {
panic(err)
}
}
iceServerSlice := viper.GetStringSlice("iceserver")
if len(iceServerSlice) > 0 {
s.ICEServers = append(s.ICEServers, webrtc.ICEServer{URLs: iceServerSlice})
}
if len(s.NAT1To1IPs) == 0 {
ip, err := utils.GetIP()

View File

@ -2,6 +2,8 @@ package message
import (
"n.eko.moe/neko/internal/types"
"github.com/pion/webrtc/v3"
)
type Message struct {
@ -14,11 +16,11 @@ type Disconnect struct {
}
type SignalProvide struct {
Event string `json:"event"`
ID string `json:"id"`
SDP string `json:"sdp"`
Lite bool `json:"lite"`
ICE []string `json:"ice"`
Event string `json:"event"`
ID string `json:"id"`
SDP string `json:"sdp"`
Lite bool `json:"lite"`
ICE []webrtc.ICEServer `json:"ice"`
}
type SignalAnswer struct {
@ -28,8 +30,8 @@ type SignalAnswer struct {
}
type SignalCandidate struct {
Event string `json:"event"`
Data string `json:"data"`
Event string `json:"event"`
Data string `json:"data"`
}
type MembersList struct {
@ -123,6 +125,6 @@ type BroadcastStatus struct {
}
type BroadcastCreate struct {
Event string `json:"event"`
URL string `json:"url"`
Event string `json:"event"`
URL string `json:"url"`
}

View File

@ -2,18 +2,20 @@ package types
import (
"time"
"github.com/pion/webrtc/v3"
)
type Sample struct {
Data []byte
Timestamp time.Time
Duration time.Duration
Data []byte
Timestamp time.Time
Duration time.Duration
}
type WebRTCManager interface {
Start()
Shutdown() error
CreatePeer(id string, session Session) (string, bool, []string, error)
CreatePeer(id string, session Session) (string, bool, []webrtc.ICEServer, error)
}
type Peer interface {

View File

@ -63,7 +63,7 @@ func (manager *WebRTCManager) Start() {
manager.logger.Info().
Str("ice_lite", fmt.Sprintf("%t", manager.config.ICELite)).
Str("ice_servers", strings.Join(manager.config.ICEServers, ",")).
Str("ice_servers", fmt.Sprintf("%+v", manager.config.ICEServers)).
Str("ephemeral_port_range", fmt.Sprintf("%d-%d", manager.config.EphemeralMin, manager.config.EphemeralMax)).
Str("nat_ips", strings.Join(manager.config.NAT1To1IPs, ",")).
Msgf("webrtc starting")
@ -74,13 +74,9 @@ func (manager *WebRTCManager) Shutdown() error {
return nil
}
func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (string, bool, []string, error) {
func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (string, bool, []webrtc.ICEServer, error) {
configuration := &webrtc.Configuration{
ICEServers: []webrtc.ICEServer{
{
URLs: manager.config.ICEServers,
},
},
ICEServers: manager.config.ICEServers,
SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback,
}
@ -99,7 +95,7 @@ func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (stri
settings.SetEphemeralUDPPortRange(manager.config.EphemeralMin, manager.config.EphemeralMax)
settings.SetNAT1To1IPs(manager.config.NAT1To1IPs, webrtc.ICECandidateTypeHost)
settings.SetICETimeouts(6 * time.Second, 6 * time.Second, 3 * time.Second)
settings.SetICETimeouts(6*time.Second, 6*time.Second, 3*time.Second)
settings.SetSRTPReplayProtectionWindow(512)
// Create MediaEngine based off sdp
@ -146,12 +142,12 @@ func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (stri
Msg("connection state has changed")
})
rtpVideo, err := connection.AddTrack(manager.videoTrack);
rtpVideo, err := connection.AddTrack(manager.videoTrack)
if err != nil {
return "", manager.config.ICELite, manager.config.ICEServers, err
}
rtpAudio, err := connection.AddTrack(manager.audioTrack);
rtpAudio, err := connection.AddTrack(manager.audioTrack)
if err != nil {
return "", manager.config.ICELite, manager.config.ICEServers, err
}
@ -234,7 +230,6 @@ func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (stri
}
}()
return description.SDP, manager.config.ICELite, manager.config.ICEServers, nil
}

View File

@ -90,7 +90,6 @@ func (h *MessageHandler) Message(id string, raw []byte) error {
return h.controlKeyboard(id, session, payload)
}), "%s failed", header.Event)
// Chat Events
case event.CHAT_MESSAGE:
payload := &message.ChatReceive{}

View File

@ -21,11 +21,11 @@ func New(sessions types.SessionManager, remote types.RemoteManager, broadcast ty
logger := log.With().Str("module", "websocket").Logger()
return &WebSocketHandler{
logger: logger,
conf: conf,
sessions: sessions,
remote: remote,
upgrader: websocket.Upgrader{
logger: logger,
conf: conf,
sessions: sessions,
remote: remote,
upgrader: websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
@ -47,14 +47,14 @@ func New(sessions types.SessionManager, remote types.RemoteManager, broadcast ty
const pingPeriod = 60 * time.Second
type WebSocketHandler struct {
logger zerolog.Logger
upgrader websocket.Upgrader
sessions types.SessionManager
remote types.RemoteManager
conf *config.WebSocket
handler *MessageHandler
conns uint32
shutdown chan bool
logger zerolog.Logger
upgrader websocket.Upgrader
sessions types.SessionManager
remote types.RemoteManager
conf *config.WebSocket
handler *MessageHandler
conns uint32
shutdown chan bool
}
func (ws *WebSocketHandler) Start() error {

View File

@ -10,10 +10,10 @@ import "C"
import (
"fmt"
"regexp"
"sync"
"time"
"unsafe"
"regexp"
"n.eko.moe/neko/internal/types"
)