From 6c815b019debdc285882fe6d663673732638955d Mon Sep 17 00:00:00 2001 From: Craig Date: Tue, 14 Jan 2020 08:36:48 +0000 Subject: [PATCH] better debouncing --- server/internal/keys/keyboard.go | 2 +- server/internal/webrtc/manager.go | 81 ++++++++++++++++------ server/internal/webrtc/peer.go | 104 ++++++++++++++++++++-------- server/internal/webrtc/websocket.go | 2 + server/neko.go | 13 ++++ 5 files changed, 151 insertions(+), 51 deletions(-) diff --git a/server/internal/keys/keyboard.go b/server/internal/keys/keyboard.go index 65556a2..7d97bbf 100644 --- a/server/internal/keys/keyboard.go +++ b/server/internal/keys/keyboard.go @@ -121,7 +121,7 @@ func init() { Keyboard[KEY_Q] = "q" Keyboard[KEY_R] = "r" Keyboard[KEY_S] = "s" - Keyboard[KEY_T] = "r" + Keyboard[KEY_T] = "t" Keyboard[KEY_U] = "u" Keyboard[KEY_V] = "v" Keyboard[KEY_W] = "w" diff --git a/server/internal/webrtc/manager.go b/server/internal/webrtc/manager.go index 777241d..cfb55bf 100644 --- a/server/internal/webrtc/manager.go +++ b/server/internal/webrtc/manager.go @@ -3,6 +3,7 @@ package webrtc import ( "math/rand" "net/http" + "time" "github.com/gorilla/websocket" "github.com/pion/webrtc/v2" @@ -20,29 +21,36 @@ func NewManager(password string) (*WebRTCManager, error) { if err != nil { return nil, err } - gst.CreatePipeline(webrtc.VP8, []*webrtc.Track{video}, "ximagesrc show-pointer=true use-damage=false ! video/x-raw,framerate=30/1 ! videoconvert").Start() engine.RegisterCodec(videoCodec) - // ximagesrc xid=0 show-pointer=true ! videoconvert ! queue | videotestsrc audioCodec := webrtc.NewRTPOpusCodec(webrtc.DefaultPayloadTypeOpus, 48000) audio, err := webrtc.NewTrack(webrtc.DefaultPayloadTypeOpus, rand.Uint32(), "stream", "stream", audioCodec) if err != nil { return nil, err } - gst.CreatePipeline(webrtc.Opus, []*webrtc.Track{audio}, "pulsesrc device=auto_null.monitor ! audioconvert").Start() engine.RegisterCodec(audioCodec) + + videoPipeline := gst.CreatePipeline(webrtc.VP8, []*webrtc.Track{video}, "ximagesrc show-pointer=true use-damage=false ! video/x-raw,framerate=30/1 ! videoconvert") + // ximagesrc xid=0 show-pointer=true ! videoconvert ! queue | videotestsrc + + audioPipeline := gst.CreatePipeline(webrtc.Opus, []*webrtc.Track{audio}, "pulsesrc device=auto_null.monitor ! audioconvert") // pulsesrc device=auto_null.monitor ! audioconvert | audiotestsrc // gst-launch-1.0 -v pulsesrc device=auto_null.monitor ! audioconvert ! vorbisenc ! oggmux ! filesink location=alsasrc.ogg return &WebRTCManager{ - logger: log.With().Str("service", "webrtc").Logger(), - engine: engine, - api: webrtc.NewAPI(webrtc.WithMediaEngine(engine)), - video: video, - audio: audio, - controller: "", - password: password, - sessions: make(map[string]*session), + logger: log.With().Str("service", "webrtc").Logger(), + engine: engine, + api: webrtc.NewAPI(webrtc.WithMediaEngine(engine)), + video: video, + videoPipeline: videoPipeline, + audio: audio, + audioPipeline: audioPipeline, + controller: "", + password: password, + sessions: make(map[string]*session), + debounce: make(map[int]time.Time), + cleanup: time.NewTicker(500 * time.Second), + shutdown: make(chan bool), upgrader: websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true @@ -60,14 +68,45 @@ func NewManager(password string) (*WebRTCManager, error) { } type WebRTCManager struct { - logger zerolog.Logger - upgrader websocket.Upgrader - engine webrtc.MediaEngine - api *webrtc.API - config webrtc.Configuration - password string - controller string - sessions map[string]*session - video *webrtc.Track - audio *webrtc.Track + logger zerolog.Logger + upgrader websocket.Upgrader + engine webrtc.MediaEngine + api *webrtc.API + config webrtc.Configuration + password string + controller string + sessions map[string]*session + debounce map[int]time.Time + shutdown chan bool + cleanup *time.Ticker + video *webrtc.Track + audio *webrtc.Track + videoPipeline *gst.Pipeline + audioPipeline *gst.Pipeline +} + +func (manager *WebRTCManager) Start() error { + manager.videoPipeline.Start() + manager.audioPipeline.Start() + + go func() { + for { + select { + case <-manager.shutdown: + return + case <-manager.cleanup.C: + manager.checkKeys() + } + } + }() + + return nil +} + +func (manager *WebRTCManager) Shutdown() error { + manager.cleanup.Stop() + manager.shutdown <- true + manager.videoPipeline.Stop() + manager.audioPipeline.Stop() + return nil } diff --git a/server/internal/webrtc/peer.go b/server/internal/webrtc/peer.go index d5e098a..4e6e04a 100644 --- a/server/internal/webrtc/peer.go +++ b/server/internal/webrtc/peer.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/binary" "encoding/json" + "time" "github.com/go-vgo/robotgo" "github.com/pion/webrtc/v2" @@ -76,8 +77,6 @@ func (manager *WebRTCManager) createPeer(session *session, raw []byte) error { return nil } -var debounce = map[int]bool{} - func (manager *WebRTCManager) onData(session *session, msg webrtc.DataChannelMessage) error { if manager.controller != session.id { return nil @@ -101,27 +100,28 @@ func (manager *WebRTCManager) onData(session *session, msg webrtc.DataChannelMes switch header.Event { case 0x01: // MOUSE_MOVE payload := &dataMouseMove{} - err := binary.Read(buffer, binary.LittleEndian, payload) - if err != nil { + if err := binary.Read(buffer, binary.LittleEndian, payload); err != nil { return err } + robotgo.Move(int(payload.X), int(payload.Y)) break case 0x02: // MOUSE_UP payload := &dataMouseKey{} - err := binary.Read(buffer, binary.LittleEndian, payload) - if err != nil { + if err := binary.Read(buffer, binary.LittleEndian, payload); err != nil { return err } - if key, ok := keys.Mouse[int(payload.Key)]; ok { - if !debounce[int(payload.Key)] { + code := int(payload.Key) + if key, ok := keys.Mouse[code]; ok { + if _, ok := manager.debounce[code]; !ok { return nil } - debounce[int(payload.Key)] = false + delete(manager.debounce, code) robotgo.MouseToggle("up", key) + manager.logger.Debug().Msgf("MOUSE_UP key: %v (%v)", code, key) } else { - manager.logger.Warn().Msgf("Unknown MOUSE_DOWN key: %v", payload.Key) + manager.logger.Warn().Msgf("Unknown MOUSE_UP key: %v", code) } break case 0x03: // MOUSE_DOWN @@ -131,15 +131,17 @@ func (manager *WebRTCManager) onData(session *session, msg webrtc.DataChannelMes return err } - if key, ok := keys.Mouse[int(payload.Key)]; ok { - if debounce[int(payload.Key)] { + code := int(payload.Key) + if key, ok := keys.Mouse[code]; ok { + if _, ok := manager.debounce[code]; ok { return nil } - debounce[int(payload.Key)] = true + manager.debounce[code] = time.Now() robotgo.MouseToggle("down", key) + manager.logger.Debug().Msgf("MOUSE_DOWN key: %v (%v)", code, key) } else { - manager.logger.Warn().Msgf("Unknown MOUSE_DOWN key: %v", payload.Key) + manager.logger.Warn().Msgf("Unknown MOUSE_DOWN key: %v", code) } break case 0x04: // MOUSE_CLK @@ -149,8 +151,9 @@ func (manager *WebRTCManager) onData(session *session, msg webrtc.DataChannelMes return err } - if key, ok := keys.Mouse[int(payload.Key)]; ok { - switch int(payload.Key) { + code := int(payload.Key) + if key, ok := keys.Mouse[code]; ok { + switch code { case keys.MOUSE_WHEEL_DOWN: robotgo.Scroll(0, -1) break @@ -166,8 +169,10 @@ func (manager *WebRTCManager) onData(session *session, msg webrtc.DataChannelMes default: robotgo.Click(key, false) } + + manager.logger.Debug().Msgf("MOUSE_CLK key: %v (%v)", code, key) } else { - manager.logger.Warn().Msgf("Unknown MOUSE_CLK key: %v", payload.Key) + manager.logger.Warn().Msgf("Unknown MOUSE_CLK key: %v", code) } break case 0x05: // KEY_DOWN @@ -176,30 +181,35 @@ func (manager *WebRTCManager) onData(session *session, msg webrtc.DataChannelMes if err != nil { return err } - if key, ok := keys.Keyboard[int(payload.Key)]; ok { - if debounce[int(payload.Key)] { + + code := int(payload.Key) + if key, ok := keys.Keyboard[code]; ok { + if _, ok := manager.debounce[code]; ok { return nil } - debounce[int(payload.Key)] = true + manager.debounce[code] = time.Now() robotgo.KeyToggle(key, "down") + manager.logger.Debug().Msgf("KEY_DOWN key: %v (%v)", code, key) } else { - manager.logger.Warn().Msgf("Unknown KEY_DOWN key: %v", payload.Key) + manager.logger.Warn().Msgf("Unknown KEY_DOWN key: %v", code) } break case 0x06: // KEY_UP payload := &dataKeyboardKey{} - err := binary.Read(buffer, binary.LittleEndian, payload) - if err != nil { + if err := binary.Read(buffer, binary.LittleEndian, payload); err != nil { return err } - if key, ok := keys.Keyboard[int(payload.Key)]; ok { - if !debounce[int(payload.Key)] { + + code := int(payload.Key) + if key, ok := keys.Keyboard[code]; ok { + if _, ok := manager.debounce[code]; !ok { return nil } - debounce[int(payload.Key)] = false + delete(manager.debounce, code) robotgo.KeyToggle(key, "up") + manager.logger.Debug().Msgf("KEY_UP key: %v (%v)", code, key) } else { - manager.logger.Warn().Msgf("Unknown KEY_UP key: %v", payload.Key) + manager.logger.Warn().Msgf("Unknown KEY_UP key: %v", code) } break case 0x07: // KEY_CLK @@ -208,13 +218,49 @@ func (manager *WebRTCManager) onData(session *session, msg webrtc.DataChannelMes if err != nil { return err } - if key, ok := keys.Keyboard[int(payload.Key)]; ok { + + code := int(payload.Key) + if key, ok := keys.Keyboard[code]; ok { robotgo.KeyTap(key) + manager.logger.Debug().Msgf("KEY_CLK key: %v (%v)", code, key) } else { - manager.logger.Warn().Msgf("Unknown KEY_CLK key: %v", payload.Key) + manager.logger.Warn().Msgf("Unknown KEY_CLK key: %v", code) } break } return nil } + +func (manager *WebRTCManager) clearKeys() { + for code := range manager.debounce { + if key, ok := keys.Keyboard[code]; ok { + robotgo.MouseToggle(key, "up") + } + + if key, ok := keys.Mouse[code]; ok { + robotgo.KeyToggle(key, "up") + } + + delete(manager.debounce, code) + } +} + +func (manager *WebRTCManager) checkKeys() { + t := time.Now() + for code, start := range manager.debounce { + if t.Sub(start) < (time.Second * 10) { + continue + } + + if key, ok := keys.Keyboard[code]; ok { + robotgo.MouseToggle(key, "up") + } + + if key, ok := keys.Mouse[code]; ok { + robotgo.KeyToggle(key, "up") + } + + delete(manager.debounce, code) + } +} diff --git a/server/internal/webrtc/websocket.go b/server/internal/webrtc/websocket.go index 4671f55..cf0c0d8 100644 --- a/server/internal/webrtc/websocket.go +++ b/server/internal/webrtc/websocket.go @@ -162,6 +162,7 @@ func (manager *WebRTCManager) handleWS(session *session) { func (manager *WebRTCManager) destroy(session *session) { if manager.controller == session.id { manager.controller = "" + manager.clearKeys() for id, sess := range manager.sessions { if id != session.id { if err := sess.send(message{Event: "control/released"}); err != nil { @@ -181,6 +182,7 @@ func (manager *WebRTCManager) destroy(session *session) { func (manager *WebRTCManager) controlRelease(session *session) error { if manager.controller == session.id { manager.controller = "" + manager.clearKeys() if err := session.send(message{Event: "control/release"}); err != nil { return err diff --git a/server/neko.go b/server/neko.go index 4f13fcc..35c97df 100644 --- a/server/neko.go +++ b/server/neko.go @@ -64,6 +64,7 @@ type Neko struct { Serve *config.Serve Logger zerolog.Logger http *http.Server + manager *webrtc.WebRTCManager } func (neko *Neko) Preflight() { @@ -75,6 +76,10 @@ func (neko *Neko) Start() { manager, err := webrtc.NewManager(neko.Serve.Password) if err != nil { + neko.Logger.Panic().Err(err).Msg("Can not create webrtc manager") + } + + if err := manager.Start(); err != nil { neko.Logger.Panic().Err(err).Msg("Can not start webrtc manager") } @@ -132,9 +137,17 @@ func (neko *Neko) Start() { } neko.http = server + neko.manager = manager } func (neko *Neko) Shutdown() { + if neko.manager != nil { + if err := neko.manager.Shutdown(); err != nil { + neko.Logger.Err(err).Msg("WebRTC manager shutdown with an error") + } else { + neko.Logger.Debug().Msg("WebRTC manager shutdown") + } + } if neko.http != nil { if err := neko.http.Shutdown(context.Background()); err != nil { neko.Logger.Err(err).Msg("HTTP server shutdown with an error")