diff --git a/cmd/serve.go b/cmd/serve.go index f1729c41..f26f472f 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -5,7 +5,7 @@ import ( "github.com/spf13/cobra" "demodesk/neko" - "demodesk/neko/internal/types/config" + "demodesk/neko/internal/config" ) func init() { @@ -17,10 +17,9 @@ func init() { } configs := []config.Config{ + neko.Service.Configs.Capture, neko.Service.Configs.Server, neko.Service.Configs.WebRTC, - neko.Service.Configs.Remote, - neko.Service.Configs.Broadcast, neko.Service.Configs.WebSocket, } diff --git a/internal/broadcast/manager.go b/internal/broadcast/manager.go deleted file mode 100644 index 6b28e3c2..00000000 --- a/internal/broadcast/manager.go +++ /dev/null @@ -1,83 +0,0 @@ -package broadcast - -import ( - "github.com/rs/zerolog" - "github.com/rs/zerolog/log" - - "demodesk/neko/internal/gst" - "demodesk/neko/internal/types/config" -) - -type BroadcastManager struct { - logger zerolog.Logger - pipeline *gst.Pipeline - remote *config.Remote - config *config.Broadcast - enabled bool - url string -} - -func New(remote *config.Remote, config *config.Broadcast) *BroadcastManager { - return &BroadcastManager{ - logger: log.With().Str("module", "remote").Logger(), - remote: remote, - config: config, - enabled: false, - url: "", - } -} - -func (manager *BroadcastManager) Start() { - if !manager.enabled || manager.IsActive() { - return - } - - var err error - manager.pipeline, err = gst.CreateRTMPPipeline( - manager.remote.Device, - manager.remote.Display, - manager.config.Pipeline, - manager.url, - ) - - manager.logger.Info(). - Str("audio_device", manager.remote.Device). - Str("video_display", manager.remote.Display). - Str("rtmp_pipeline_src", manager.pipeline.Src). - Msgf("RTMP pipeline is starting...") - - if err != nil { - manager.logger.Panic().Err(err).Msg("unable to create rtmp pipeline") - return - } - - manager.pipeline.Play() -} - -func (manager *BroadcastManager) Stop() { - if !manager.IsActive() { - return - } - - manager.pipeline.Stop() - manager.pipeline = nil -} - -func (manager *BroadcastManager) IsActive() bool { - return manager.pipeline != nil -} - -func (manager *BroadcastManager) Create(url string) { - manager.url = url - manager.enabled = true - manager.Start() -} - -func (manager *BroadcastManager) Destroy() { - manager.Stop() - manager.enabled = false -} - -func (manager *BroadcastManager) GetUrl() string { - return manager.url -} diff --git a/internal/capture/broadcast.go b/internal/capture/broadcast.go new file mode 100644 index 00000000..692a2c26 --- /dev/null +++ b/internal/capture/broadcast.go @@ -0,0 +1,60 @@ +package capture + +import ( + "demodesk/neko/internal/capture/gst" +) + +func (manager *CaptureManagerCtx) StartBroadcastPipeline() { + var err error + + if manager.IsBoradcasting() || manager.broadcast_url == "" { + return + } + + manager.logger.Info(). + Str("audio_device", manager.config.Device). + Str("video_display", manager.config.Display). + Str("rtmp_pipeline_src", manager.broadcast.Src). + Msgf("Creating broadcast pipeline...") + + manager.broadcast, err = gst.CreateRTMPPipeline( + manager.config.Device, + manager.config.Display, + manager.config.BroadcastPipeline, + manager.broadcast_url, + ) + + if err != nil { + manager.logger.Panic().Err(err).Msg("unable to create broadcast pipeline") + } + + manager.broadcast.Play() + manager.logger.Info().Msgf("Starting broadcast pipeline...") +} + +func (manager *CaptureManagerCtx) StopBroadcastPipeline() { + if !manager.IsBoradcasting() { + return + } + + manager.broadcast.DestroyPipeline() + manager.broadcast = nil +} + +func (manager *CaptureManagerCtx) StartBroadcast(url string) { + manager.broadcast_url = url + manager.StartBroadcastPipeline() +} + +func (manager *CaptureManagerCtx) StopBroadcast() { + manager.broadcast_url = "" + manager.StopBroadcastPipeline() +} + +func (manager *CaptureManagerCtx) IsBoradcasting() bool { + return manager.broadcast != nil +} + +func (manager *CaptureManagerCtx) BroadcastUrl() string { + return manager.broadcast_url +} diff --git a/internal/gst/gst.c b/internal/capture/gst/gst.c similarity index 100% rename from internal/gst/gst.c rename to internal/capture/gst/gst.c diff --git a/internal/gst/gst.go b/internal/capture/gst/gst.go similarity index 92% rename from internal/gst/gst.go rename to internal/capture/gst/gst.go index a147d472..2a138a2d 100644 --- a/internal/gst/gst.go +++ b/internal/capture/gst/gst.go @@ -4,7 +4,6 @@ package gst #cgo pkg-config: gstreamer-1.0 gstreamer-app-1.0 #include "gst.h" - */ import "C" import ( @@ -17,26 +16,6 @@ import ( "demodesk/neko/internal/types" ) -/* - apt-get install \ - libgstreamer1.0-0 \ - gstreamer1.0-plugins-base \ - gstreamer1.0-plugins-good \ - gstreamer1.0-plugins-bad \ - gstreamer1.0-plugins-ugly\ - gstreamer1.0-libav \ - gstreamer1.0-doc \ - gstreamer1.0-tools \ - gstreamer1.0-x \ - gstreamer1.0-alsa \ - gstreamer1.0-pulseaudio - - gst-inspect-1.0 --version - gst-inspect-1.0 plugin - gst-launch-1.0 ximagesrc show-pointer=true use-damage=false ! video/x-raw,framerate=30/1 ! videoconvert ! queue ! vp8enc error-resilient=partitions keyframe-max-dist=10 auto-alt-ref=true cpu-used=5 deadline=1 ! autovideosink - gst-launch-1.0 pulsesrc ! audioconvert ! opusenc ! autoaudiosink -*/ - // Pipeline is a wrapper for a GStreamer Pipeline type Pipeline struct { Pipeline *C.GstElement diff --git a/internal/gst/gst.h b/internal/capture/gst/gst.h similarity index 100% rename from internal/gst/gst.h rename to internal/capture/gst/gst.h diff --git a/internal/capture/manager.go b/internal/capture/manager.go new file mode 100644 index 00000000..889e3ec1 --- /dev/null +++ b/internal/capture/manager.go @@ -0,0 +1,175 @@ +package capture + +import ( + "fmt" + + "github.com/kataras/go-events" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + + "demodesk/neko/internal/types" + "demodesk/neko/internal/config" + "demodesk/neko/internal/capture/gst" +) + +type CaptureManagerCtx struct { + logger zerolog.Logger + video *gst.Pipeline + audio *gst.Pipeline + broadcast *gst.Pipeline + config *config.Capture + shutdown chan bool + emmiter events.EventEmmiter + streaming bool + broadcast_url string + desktop types.DesktopManager +} + +func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCtx { + return &CaptureManagerCtx{ + logger: log.With().Str("module", "capture").Logger(), + shutdown: make(chan bool), + emmiter: events.New(), + config: config, + streaming: false, + broadcast_url: "", + desktop: desktop, + } +} + +func (manager *CaptureManagerCtx) Start() { + manager.logger.Info(). + Str("screen_resolution", fmt.Sprintf("%dx%d@%d", manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate)). + Msgf("Setting screen resolution...") + + if err := manager.desktop.ChangeScreenSize(manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate); err != nil { + manager.logger.Warn().Err(err).Msg("unable to change screen size") + } + + manager.CreateVideoPipeline() + manager.CreateAudioPipeline() + manager.StartBroadcastPipeline() + + go func() { + defer func() { + manager.logger.Info().Msg("shutdown") + }() + + for { + select { + case <-manager.shutdown: + return + case sample := <-manager.video.Sample: + manager.emmiter.Emit("video", sample) + case sample := <-manager.audio.Sample: + manager.emmiter.Emit("audio", sample) + } + } + }() +} + +func (manager *CaptureManagerCtx) Shutdown() error { + manager.logger.Info().Msgf("capture shutting down") + manager.video.DestroyPipeline() + manager.audio.DestroyPipeline() + manager.StopBroadcastPipeline() + + manager.shutdown <- true + return nil +} + +func (manager *CaptureManagerCtx) VideoCodec() string { + return manager.config.VideoCodec +} + +func (manager *CaptureManagerCtx) AudioCodec() string { + return manager.config.AudioCodec +} + +func (manager *CaptureManagerCtx) OnVideoFrame(listener func(sample types.Sample)) { + manager.emmiter.On("video", func(payload ...interface{}) { + listener(payload[0].(types.Sample)) + }) +} + +func (manager *CaptureManagerCtx) OnAudioFrame(listener func(sample types.Sample)) { + manager.emmiter.On("audio", func(payload ...interface{}) { + listener(payload[0].(types.Sample)) + }) +} + +func (manager *CaptureManagerCtx) StartStream() { + manager.logger.Info().Msgf("Pipelines starting...") + + manager.video.Start() + manager.audio.Start() + manager.streaming = true +} + +func (manager *CaptureManagerCtx) StopStream() { + manager.logger.Info().Msgf("Pipelines shutting down...") + + manager.video.Stop() + manager.audio.Stop() + manager.streaming = false +} + +func (manager *CaptureManagerCtx) Streaming() bool { + return manager.streaming +} + +func (manager *CaptureManagerCtx) CreateVideoPipeline() { + var err error + + manager.logger.Info(). + Str("video_codec", manager.config.VideoCodec). + Str("video_display", manager.config.Display). + Str("video_params", manager.config.VideoParams). + Msgf("Creating video pipeline...") + + manager.video, err = gst.CreateAppPipeline( + manager.config.VideoCodec, + manager.config.Display, + manager.config.VideoParams, + ) + + if err != nil { + manager.logger.Panic().Err(err).Msg("unable to create video pipeline") + } +} + +func (manager *CaptureManagerCtx) CreateAudioPipeline() { + var err error + + manager.logger.Info(). + Str("audio_codec", manager.config.AudioCodec). + Str("audio_display", manager.config.Device). + Str("audio_params", manager.config.AudioParams). + Msgf("Creating audio pipeline...") + + manager.audio, err = gst.CreateAppPipeline( + manager.config.AudioCodec, + manager.config.Device, + manager.config.AudioParams, + ) + + if err != nil { + manager.logger.Panic().Err(err).Msg("unable to create audio pipeline") + } +} + +func (manager *CaptureManagerCtx) ChangeResolution(width int, height int, rate int) error { + manager.video.DestroyPipeline() + manager.StopBroadcastPipeline() + + defer func() { + manager.CreateVideoPipeline() + + manager.video.Start() + manager.logger.Info().Msg("starting video pipeline...") + + manager.StartBroadcastPipeline() + }() + + return manager.desktop.ChangeScreenSize(width, height, rate) +} diff --git a/internal/types/config/remote.go b/internal/config/capture.go similarity index 89% rename from internal/types/config/remote.go rename to internal/config/capture.go index 045a840b..500f7df5 100644 --- a/internal/types/config/remote.go +++ b/internal/config/capture.go @@ -9,7 +9,7 @@ import ( "github.com/spf13/viper" ) -type Remote struct { +type Capture struct { Display string Device string AudioCodec string @@ -19,9 +19,10 @@ type Remote struct { ScreenWidth int ScreenHeight int ScreenRate int + BroadcastPipeline string } -func (Remote) Init(cmd *cobra.Command) error { +func (Capture) Init(cmd *cobra.Command) error { cmd.PersistentFlags().String("display", ":99.0", "XDisplay to capture") if err := viper.BindPFlag("display", cmd.PersistentFlags().Lookup("display")); err != nil { return err @@ -84,10 +85,16 @@ func (Remote) Init(cmd *cobra.Command) error { return err } + // broadcast + cmd.PersistentFlags().String("broadcast_pipeline", "", "audio video codec parameters to use for broadcasting") + if err := viper.BindPFlag("broadcast_pipeline", cmd.PersistentFlags().Lookup("broadcast_pipeline")); err != nil { + return err + } + return nil } -func (s *Remote) Set() { +func (s *Capture) Set() { videoCodec := webrtc.VP8 if viper.GetBool("vp8") { videoCodec = webrtc.VP8 @@ -133,4 +140,6 @@ func (s *Remote) Set() { s.ScreenRate = int(rate) } } + + s.BroadcastPipeline = viper.GetString("broadcast_pipeline") } diff --git a/internal/types/config/config.go b/internal/config/config.go similarity index 100% rename from internal/types/config/config.go rename to internal/config/config.go diff --git a/internal/types/config/root.go b/internal/config/root.go similarity index 100% rename from internal/types/config/root.go rename to internal/config/root.go diff --git a/internal/types/config/server.go b/internal/config/server.go similarity index 100% rename from internal/types/config/server.go rename to internal/config/server.go diff --git a/internal/types/config/webrtc.go b/internal/config/webrtc.go similarity index 100% rename from internal/types/config/webrtc.go rename to internal/config/webrtc.go diff --git a/internal/types/config/websocket.go b/internal/config/websocket.go similarity index 100% rename from internal/types/config/websocket.go rename to internal/config/websocket.go diff --git a/internal/desktop/clipboard.go b/internal/desktop/clipboard.go new file mode 100644 index 00000000..9171bdbd --- /dev/null +++ b/internal/desktop/clipboard.go @@ -0,0 +1,13 @@ +package desktop + +import ( + "demodesk/neko/internal/desktop/clipboard" +) + +func (manager *DesktopManagerCtx) ReadClipboard() string { + return clipboard.ReadClipboard() +} + +func (manager *DesktopManagerCtx) WriteClipboard(data string) { + clipboard.WriteClipboard(data) +} diff --git a/internal/desktop/clipboard/clipboard.c b/internal/desktop/clipboard/clipboard.c new file mode 100644 index 00000000..c500b33e --- /dev/null +++ b/internal/desktop/clipboard/clipboard.c @@ -0,0 +1,21 @@ +#include "clipboard.h" + +static clipboard_c *CLIPBOARD = NULL; + +clipboard_c *getClipboard(void) { + if (CLIPBOARD == NULL) { + CLIPBOARD = clipboard_new(NULL); + } + + return CLIPBOARD; +} + +void ClipboardSet(char *src) { + clipboard_c *cb = getClipboard(); + clipboard_set_text_ex(cb, src, strlen(src), 0); +} + +char *ClipboardGet() { + clipboard_c *cb = getClipboard(); + return clipboard_text_ex(cb, NULL, 0); +} diff --git a/internal/desktop/clipboard/clipboard.go b/internal/desktop/clipboard/clipboard.go new file mode 100644 index 00000000..b01c2958 --- /dev/null +++ b/internal/desktop/clipboard/clipboard.go @@ -0,0 +1,36 @@ +package clipboard + +/* +#cgo linux CFLAGS: -I/usr/src -I/usr/local/include/ +#cgo linux LDFLAGS: /usr/local/lib/libclipboard.a -L/usr/src -L/usr/local/lib -lxcb + +#include "clipboard.h" +*/ +import "C" + +import ( + "sync" + "unsafe" +) + +var mu = sync.Mutex{} + +func ReadClipboard() string { + mu.Lock() + defer mu.Unlock() + + clipboardUnsafe := C.ClipboardGet() + defer C.free(unsafe.Pointer(clipboardUnsafe)) + + return C.GoString(clipboardUnsafe) +} + +func WriteClipboard(data string) { + mu.Lock() + defer mu.Unlock() + + clipboardUnsafe := C.CString(data) + defer C.free(unsafe.Pointer(clipboardUnsafe)) + + C.ClipboardSet(clipboardUnsafe) +} diff --git a/internal/desktop/clipboard/clipboard.h b/internal/desktop/clipboard/clipboard.h new file mode 100644 index 00000000..5f5cf36a --- /dev/null +++ b/internal/desktop/clipboard/clipboard.h @@ -0,0 +1,9 @@ +#pragma once + +#include +#include + +clipboard_c *getClipboard(void); + +void ClipboardSet(char *src); +char *ClipboardGet(); diff --git a/internal/desktop/manager.go b/internal/desktop/manager.go new file mode 100644 index 00000000..3982ee86 --- /dev/null +++ b/internal/desktop/manager.go @@ -0,0 +1,53 @@ +package desktop + +import ( + "time" + + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + + "demodesk/neko/internal/desktop/xorg" +) + +type DesktopManagerCtx struct { + logger zerolog.Logger + cleanup *time.Ticker + shutdown chan bool + display string +} + +func New(display string) *DesktopManagerCtx { + return &DesktopManagerCtx{ + logger: log.With().Str("module", "desktop").Logger(), + cleanup: time.NewTicker(1 * time.Second), + shutdown: make(chan bool), + display: display, + } +} + +func (manager *DesktopManagerCtx) Start() { + xorg.Display(manager.display) + + go func() { + defer func() { + manager.logger.Info().Msg("shutdown") + }() + + for { + select { + case <-manager.shutdown: + return + case <-manager.cleanup.C: + xorg.CheckKeys(time.Second * 10) + } + } + }() +} + +func (manager *DesktopManagerCtx) Shutdown() error { + manager.logger.Info().Msgf("remote shutting down") + + manager.cleanup.Stop() + manager.shutdown <- true + return nil +} diff --git a/internal/desktop/xorg.go b/internal/desktop/xorg.go new file mode 100644 index 00000000..59e8643b --- /dev/null +++ b/internal/desktop/xorg.go @@ -0,0 +1,54 @@ +package desktop + +import ( + "demodesk/neko/internal/types" + "demodesk/neko/internal/desktop/xorg" +) + +func (manager *DesktopManagerCtx) ChangeScreenSize(width int, height int, rate int) error { + return xorg.ChangeScreenSize(width, height, rate) +} + +func (manager *DesktopManagerCtx) Move(x, y int) { + xorg.Move(x, y) +} + +func (manager *DesktopManagerCtx) Scroll(x, y int) { + xorg.Scroll(x, y) +} + +func (manager *DesktopManagerCtx) ButtonDown(code int) error { + return xorg.ButtonDown(code) +} + +func (manager *DesktopManagerCtx) KeyDown(code uint64) error { + return xorg.KeyDown(code) +} + +func (manager *DesktopManagerCtx) ButtonUp(code int) error { + return xorg.ButtonUp(code) +} + +func (manager *DesktopManagerCtx) KeyUp(code uint64) error { + return xorg.KeyUp(code) +} + +func (manager *DesktopManagerCtx) ResetKeys() { + xorg.ResetKeys() +} + +func (manager *DesktopManagerCtx) ScreenConfigurations() map[int]types.ScreenConfiguration { + return xorg.ScreenConfigurations +} + +func (manager *DesktopManagerCtx) GetScreenSize() *types.ScreenSize { + return xorg.GetScreenSize() +} + +func (manager *DesktopManagerCtx) SetKeyboardLayout(layout string) { + xorg.SetKeyboardLayout(layout) +} + +func (manager *DesktopManagerCtx) SetKeyboardModifiers(NumLock int, CapsLock int, ScrollLock int) { + xorg.SetKeyboardModifiers(NumLock, CapsLock, ScrollLock) +} diff --git a/internal/xorg/xorg.c b/internal/desktop/xorg/xorg.c similarity index 91% rename from internal/xorg/xorg.c rename to internal/desktop/xorg/xorg.c index 6c7b72eb..ace9a1ff 100644 --- a/internal/xorg/xorg.c +++ b/internal/desktop/xorg/xorg.c @@ -1,6 +1,5 @@ #include "xorg.h" -static clipboard_c *CLIPBOARD = NULL; static Display *DISPLAY = NULL; static char *NAME = ":0.0"; static int REGISTERED = 0; @@ -33,13 +32,6 @@ Display *getXDisplay(void) { return DISPLAY; } -clipboard_c *getClipboard(void) { - if (CLIPBOARD == NULL) { - CLIPBOARD = clipboard_new(NULL); - } - return CLIPBOARD; -} - void XDisplayClose(void) { if (DISPLAY != NULL) { XCloseDisplay(DISPLAY); @@ -118,16 +110,6 @@ void XKey(unsigned long key, int down) { } } -void XClipboardSet(char *src) { - clipboard_c *cb = getClipboard(); - clipboard_set_text_ex(cb, src, strlen(src), 0); -} - -char *XClipboardGet() { - clipboard_c *cb = getClipboard(); - return clipboard_text_ex(cb, NULL, 0); -} - void XGetScreenConfigurations() { Display *display = getXDisplay(); Window root = RootWindow(display, 0); diff --git a/internal/xorg/xorg.go b/internal/desktop/xorg/xorg.go similarity index 89% rename from internal/xorg/xorg.go rename to internal/desktop/xorg/xorg.go index b4bd10dd..4063d352 100644 --- a/internal/xorg/xorg.go +++ b/internal/desktop/xorg/xorg.go @@ -2,7 +2,7 @@ package xorg /* #cgo linux CFLAGS: -I/usr/src -I/usr/local/include/ -#cgo linux LDFLAGS: /usr/local/lib/libclipboard.a -L/usr/src -L/usr/local/lib -lX11 -lXtst -lXrandr -lxcb +#cgo linux LDFLAGS: -L/usr/src -L/usr/local/lib -lX11 -lXtst -lXrandr -lxcb #include "xorg.h" */ @@ -108,26 +108,6 @@ func KeyUp(code uint64) error { return nil } -func ReadClipboard() string { - mu.Lock() - defer mu.Unlock() - - clipboardUnsafe := C.XClipboardGet() - defer C.free(unsafe.Pointer(clipboardUnsafe)) - - return C.GoString(clipboardUnsafe) -} - -func WriteClipboard(data string) { - mu.Lock() - defer mu.Unlock() - - clipboardUnsafe := C.CString(data) - defer C.free(unsafe.Pointer(clipboardUnsafe)) - - C.XClipboardSet(clipboardUnsafe) -} - func ResetKeys() { for code := range debounce_button { //nolint diff --git a/internal/desktop/xorg/xorg.h b/internal/desktop/xorg/xorg.h new file mode 100644 index 00000000..4a7b49a2 --- /dev/null +++ b/internal/desktop/xorg/xorg.h @@ -0,0 +1,41 @@ +#pragma once + +#ifndef XDISPLAY_H +#define XDISPLAY_H + +#include +#include +#include +#include +#include +#include +#include /* For fputs() */ +#include /* For strdup() */ + +extern void goCreateScreenSize(int index, int width, int height, int mwidth, int mheight); +extern void goSetScreenRates(int index, int rate_index, short rate); + +/* Returns the main display, closed either on exit or when closeMainDisplay() +* is invoked. This removes a bit of the overhead of calling XOpenDisplay() & +* XCloseDisplay() everytime the main display needs to be used. +* +* Note that this is almost certainly not thread safe. */ +Display *getXDisplay(void); + +void XMove(int x, int y); +void XScroll(int x, int y); +void XButton(unsigned int button, int down); +void XKey(unsigned long key, int down); + +void XGetScreenConfigurations(); +void XSetScreenConfiguration(int index, short rate); +int XGetScreenSize(); +short XGetScreenRate(); + +void XDisplayClose(void); +void XDisplaySet(char *input); + +void SetKeyboardLayout(char *layout); +void SetKeyboardModifiers(int num_lock, int caps_lock, int scroll_lock); + +#endif diff --git a/internal/http/http.go b/internal/http/http.go index 9136d85e..22f1b83e 100644 --- a/internal/http/http.go +++ b/internal/http/http.go @@ -11,26 +11,19 @@ import ( "github.com/rs/zerolog" "github.com/rs/zerolog/log" - "demodesk/neko/internal/api" - "demodesk/neko/internal/http/endpoint" "demodesk/neko/internal/types" - "demodesk/neko/internal/types/config" + "demodesk/neko/internal/config" + "demodesk/neko/internal/http/endpoint" ) -type Server struct { +type ServerCtx struct { logger zerolog.Logger router *chi.Mux http *http.Server conf *config.Server } -func New( - sessions types.SessionManager, - remote types.RemoteManager, - broadcast types.BroadcastManager, - webSocketHandler types.WebSocketHandler, - conf *config.Server, -) *Server { +func New(webSocketHandler types.WebSocketManager, conf *config.Server) *ServerCtx { logger := log.With().Str("module", "http").Logger() router := chi.NewRouter() @@ -38,10 +31,6 @@ func New( router.Use(middleware.RequestID) // Create a request ID for each request router.Use(Logger) // Log API request calls using custom logger function - // Mount REST API - apiManager := api.New(sessions, remote, broadcast, webSocketHandler, conf) - apiManager.Mount(router) - router.Get("/ws", func(w http.ResponseWriter, r *http.Request) { if webSocketHandler.Upgrade(w, r) != nil { //nolint @@ -70,7 +59,7 @@ func New( Handler: router, } - return &Server{ + return &ServerCtx{ logger: logger, router: router, http: http, @@ -78,7 +67,7 @@ func New( } } -func (s *Server) Start() { +func (s *ServerCtx) Start() { if s.conf.Cert != "" && s.conf.Key != "" { go func() { if err := s.http.ListenAndServeTLS(s.conf.Cert, s.conf.Key); err != http.ErrServerClosed { @@ -96,6 +85,6 @@ func (s *Server) Start() { } } -func (s *Server) Shutdown() error { +func (s *ServerCtx) Shutdown() error { return s.http.Shutdown(context.Background()) } diff --git a/internal/remote/manager.go b/internal/remote/manager.go deleted file mode 100644 index ca01f7e6..00000000 --- a/internal/remote/manager.go +++ /dev/null @@ -1,233 +0,0 @@ -package remote - -import ( - "fmt" - "time" - - "github.com/kataras/go-events" - "github.com/rs/zerolog" - "github.com/rs/zerolog/log" - - "demodesk/neko/internal/gst" - "demodesk/neko/internal/types" - "demodesk/neko/internal/types/config" - "demodesk/neko/internal/xorg" -) - -type RemoteManager struct { - logger zerolog.Logger - video *gst.Pipeline - audio *gst.Pipeline - config *config.Remote - broadcast types.BroadcastManager - cleanup *time.Ticker - shutdown chan bool - emmiter events.EventEmmiter - streaming bool -} - -func New(config *config.Remote, broadcast types.BroadcastManager) *RemoteManager { - return &RemoteManager{ - logger: log.With().Str("module", "remote").Logger(), - cleanup: time.NewTicker(1 * time.Second), - shutdown: make(chan bool), - emmiter: events.New(), - config: config, - broadcast: broadcast, - streaming: false, - } -} - -func (manager *RemoteManager) VideoCodec() string { - return manager.config.VideoCodec -} - -func (manager *RemoteManager) AudioCodec() string { - return manager.config.AudioCodec -} - -func (manager *RemoteManager) Start() { - xorg.Display(manager.config.Display) - - manager.logger.Info(). - Str("screen_resolution", fmt.Sprintf("%dx%d@%d", manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate)). - Msgf("Setting screen resolution...") - - if err := xorg.ChangeScreenSize(manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate); err != nil { - manager.logger.Warn().Err(err).Msg("unable to change screen size") - } - - manager.CreateVideoPipeline() - manager.CreateAudioPipeline() - manager.broadcast.Start() - - go func() { - defer func() { - manager.logger.Info().Msg("shutdown") - }() - - for { - select { - case <-manager.shutdown: - return - case sample := <-manager.video.Sample: - manager.emmiter.Emit("video", sample) - case sample := <-manager.audio.Sample: - manager.emmiter.Emit("audio", sample) - case <-manager.cleanup.C: - xorg.CheckKeys(time.Second * 10) - } - } - }() -} - -func (manager *RemoteManager) Shutdown() error { - manager.logger.Info().Msgf("remote shutting down") - manager.video.DestroyPipeline() - manager.audio.DestroyPipeline() - manager.broadcast.Stop() - - manager.cleanup.Stop() - manager.shutdown <- true - return nil -} - -func (manager *RemoteManager) OnVideoFrame(listener func(sample types.Sample)) { - manager.emmiter.On("video", func(payload ...interface{}) { - listener(payload[0].(types.Sample)) - }) -} - -func (manager *RemoteManager) OnAudioFrame(listener func(sample types.Sample)) { - manager.emmiter.On("audio", func(payload ...interface{}) { - listener(payload[0].(types.Sample)) - }) -} - -func (manager *RemoteManager) StartStream() { - manager.logger.Info().Msgf("Pipelines starting...") - - manager.video.Start() - manager.audio.Start() - manager.streaming = true -} - -func (manager *RemoteManager) StopStream() { - manager.logger.Info().Msgf("Pipelines shutting down...") - - manager.video.Stop() - manager.audio.Stop() - manager.streaming = false -} - -func (manager *RemoteManager) Streaming() bool { - return manager.streaming -} - -func (manager *RemoteManager) CreateVideoPipeline() { - var err error - - manager.logger.Info(). - Str("video_codec", manager.config.VideoCodec). - Str("video_display", manager.config.Display). - Str("video_params", manager.config.VideoParams). - Msgf("Creating video pipeline...") - - manager.video, err = gst.CreateAppPipeline( - manager.config.VideoCodec, - manager.config.Display, - manager.config.VideoParams, - ) - - if err != nil { - manager.logger.Panic().Err(err).Msg("unable to create video pipeline") - } -} - -func (manager *RemoteManager) CreateAudioPipeline() { - var err error - - manager.logger.Info(). - Str("audio_codec", manager.config.AudioCodec). - Str("audio_display", manager.config.Device). - Str("audio_params", manager.config.AudioParams). - Msgf("Creating audio pipeline...") - - manager.audio, err = gst.CreateAppPipeline( - manager.config.AudioCodec, - manager.config.Device, - manager.config.AudioParams, - ) - - if err != nil { - manager.logger.Panic().Err(err).Msg("unable to create audio pipeline") - } -} - -func (manager *RemoteManager) ChangeResolution(width int, height int, rate int) error { - manager.video.DestroyPipeline() - manager.broadcast.Stop() - - defer func() { - manager.CreateVideoPipeline() - - manager.video.Start() - manager.broadcast.Start() - - manager.logger.Info().Msg("starting video pipeline...") - }() - - return xorg.ChangeScreenSize(width, height, rate) -} - -func (manager *RemoteManager) Move(x, y int) { - xorg.Move(x, y) -} - -func (manager *RemoteManager) Scroll(x, y int) { - xorg.Scroll(x, y) -} - -func (manager *RemoteManager) ButtonDown(code int) error { - return xorg.ButtonDown(code) -} - -func (manager *RemoteManager) KeyDown(code uint64) error { - return xorg.KeyDown(code) -} - -func (manager *RemoteManager) ButtonUp(code int) error { - return xorg.ButtonUp(code) -} - -func (manager *RemoteManager) KeyUp(code uint64) error { - return xorg.KeyUp(code) -} - -func (manager *RemoteManager) ReadClipboard() string { - return xorg.ReadClipboard() -} - -func (manager *RemoteManager) WriteClipboard(data string) { - xorg.WriteClipboard(data) -} - -func (manager *RemoteManager) ResetKeys() { - xorg.ResetKeys() -} - -func (manager *RemoteManager) ScreenConfigurations() map[int]types.ScreenConfiguration { - return xorg.ScreenConfigurations -} - -func (manager *RemoteManager) GetScreenSize() *types.ScreenSize { - return xorg.GetScreenSize() -} - -func (manager *RemoteManager) SetKeyboardLayout(layout string) { - xorg.SetKeyboardLayout(layout) -} - -func (manager *RemoteManager) SetKeyboardModifiers(NumLock int, CapsLock int, ScrollLock int) { - xorg.SetKeyboardModifiers(NumLock, CapsLock, ScrollLock) -} diff --git a/internal/session/manager.go b/internal/session/manager.go index 987783ce..9a882c37 100644 --- a/internal/session/manager.go +++ b/internal/session/manager.go @@ -9,26 +9,26 @@ import ( "demodesk/neko/internal/utils" ) -func New(remote types.RemoteManager) *SessionManager { - return &SessionManager{ +func New(capture types.CaptureManager) *SessionManagerCtx { + return &SessionManagerCtx{ logger: log.With().Str("module", "session").Logger(), host: nil, - remote: remote, - members: make(map[string]*Session), + capture: capture, + members: make(map[string]*SessionCtx), emmiter: events.New(), } } -type SessionManager struct { +type SessionManagerCtx struct { logger zerolog.Logger host types.Session - remote types.RemoteManager - members map[string]*Session + capture types.CaptureManager + members map[string]*SessionCtx emmiter events.EventEmmiter } -func (manager *SessionManager) New(id string, admin bool, socket types.WebSocket) types.Session { - session := &Session{ +func (manager *SessionManagerCtx) New(id string, admin bool, socket types.WebSocket) types.Session { + session := &SessionCtx{ id: id, admin: admin, manager: manager, @@ -40,80 +40,31 @@ func (manager *SessionManager) New(id string, admin bool, socket types.WebSocket manager.members[id] = session manager.emmiter.Emit("created", session) - if !manager.remote.Streaming() && len(manager.members) > 0 { - manager.remote.StartStream() + if !manager.capture.Streaming() && len(manager.members) > 0 { + manager.capture.StartStream() } return session } -func (manager *SessionManager) HasHost() bool { - return manager.host != nil -} - -func (manager *SessionManager) SetHost(host types.Session) { - manager.host = host - manager.emmiter.Emit("host", host) -} - -func (manager *SessionManager) GetHost() types.Session { - return manager.host -} - -func (manager *SessionManager) ClearHost() { - host := manager.host - manager.host = nil - manager.emmiter.Emit("host_cleared", host) -} - -func (manager *SessionManager) Has(id string) bool { - _, ok := manager.members[id] - return ok -} - -func (manager *SessionManager) Get(id string) (types.Session, bool) { +func (manager *SessionManagerCtx) Get(id string) (types.Session, bool) { session, ok := manager.members[id] return session, ok } -func (manager *SessionManager) Admins() []*types.Member { - members := []*types.Member{} - for _, session := range manager.members { - if !session.connected || !session.admin { - continue - } - - member := session.Member() - if member != nil { - members = append(members, member) - } - } - return members +func (manager *SessionManagerCtx) Has(id string) bool { + _, ok := manager.members[id] + return ok } -func (manager *SessionManager) Members() []*types.Member { - members := []*types.Member{} - for _, session := range manager.members { - if !session.connected { - continue - } - - member := session.Member() - if member != nil { - members = append(members, member) - } - } - return members -} - -func (manager *SessionManager) Destroy(id string) error { +func (manager *SessionManagerCtx) Destroy(id string) error { session, ok := manager.members[id] if ok { delete(manager.members, id) err := session.destroy() - if !manager.remote.Streaming() && len(manager.members) <= 0 { - manager.remote.StopStream() + if !manager.capture.Streaming() && len(manager.members) <= 0 { + manager.capture.StopStream() } manager.emmiter.Emit("destroy", id) @@ -123,7 +74,58 @@ func (manager *SessionManager) Destroy(id string) error { return nil } -func (manager *SessionManager) Broadcast(v interface{}, exclude interface{}) error { +// --- +// host +// --- +func (manager *SessionManagerCtx) HasHost() bool { + return manager.host != nil +} + +func (manager *SessionManagerCtx) SetHost(host types.Session) { + manager.host = host + manager.emmiter.Emit("host", host) +} + +func (manager *SessionManagerCtx) GetHost() types.Session { + return manager.host +} + +func (manager *SessionManagerCtx) ClearHost() { + host := manager.host + manager.host = nil + manager.emmiter.Emit("host_cleared", host) +} + +// --- +// members list +// --- +func (manager *SessionManagerCtx) Admins() []types.Session { + var sessions []types.Session + for _, session := range manager.members { + if !session.connected || !session.admin { + continue + } + + sessions = append(sessions, session) + } + + return sessions +} + +func (manager *SessionManagerCtx) Members() []types.Session { + var sessions []types.Session + for _, session := range manager.members { + if !session.connected { + continue + } + + sessions = append(sessions, session) + } + + return sessions +} + +func (manager *SessionManagerCtx) Broadcast(v interface{}, exclude interface{}) error { for id, session := range manager.members { if !session.connected { continue @@ -142,32 +144,35 @@ func (manager *SessionManager) Broadcast(v interface{}, exclude interface{}) err return nil } -func (manager *SessionManager) OnHost(listener func(session types.Session)) { +// --- +// events +// --- +func (manager *SessionManagerCtx) OnHost(listener func(session types.Session)) { manager.emmiter.On("host", func(payload ...interface{}) { - listener(payload[0].(*Session)) + listener(payload[0].(*SessionCtx)) }) } -func (manager *SessionManager) OnHostCleared(listener func(session types.Session)) { +func (manager *SessionManagerCtx) OnHostCleared(listener func(session types.Session)) { manager.emmiter.On("host_cleared", func(payload ...interface{}) { - listener(payload[0].(*Session)) + listener(payload[0].(*SessionCtx)) }) } -func (manager *SessionManager) OnDestroy(listener func(id string)) { +func (manager *SessionManagerCtx) OnDestroy(listener func(id string)) { manager.emmiter.On("destroy", func(payload ...interface{}) { listener(payload[0].(string)) }) } -func (manager *SessionManager) OnCreated(listener func(session types.Session)) { +func (manager *SessionManagerCtx) OnCreated(listener func(session types.Session)) { manager.emmiter.On("created", func(payload ...interface{}) { - listener(payload[0].(*Session)) + listener(payload[0].(*SessionCtx)) }) } -func (manager *SessionManager) OnConnected(listener func(session types.Session)) { +func (manager *SessionManagerCtx) OnConnected(listener func(session types.Session)) { manager.emmiter.On("connected", func(payload ...interface{}) { - listener(payload[0].(*Session)) + listener(payload[0].(*SessionCtx)) }) } diff --git a/internal/session/session.go b/internal/session/session.go index 41865f02..574d79bb 100644 --- a/internal/session/session.go +++ b/internal/session/session.go @@ -8,43 +8,43 @@ import ( "demodesk/neko/internal/types/message" ) -type Session struct { +type SessionCtx struct { logger zerolog.Logger id string name string admin bool muted bool connected bool - manager *SessionManager + manager *SessionManagerCtx socket types.WebSocket peer types.Peer } -func (session *Session) ID() string { +func (session *SessionCtx) ID() string { return session.id } -func (session *Session) Name() string { +func (session *SessionCtx) Name() string { return session.name } -func (session *Session) Admin() bool { +func (session *SessionCtx) Admin() bool { return session.admin } -func (session *Session) Muted() bool { +func (session *SessionCtx) Muted() bool { return session.muted } -func (session *Session) IsHost() bool { +func (session *SessionCtx) IsHost() bool { return session.manager.host != nil && session.manager.host.ID() == session.ID() } -func (session *Session) Connected() bool { +func (session *SessionCtx) Connected() bool { return session.connected } -func (session *Session) Address() string { +func (session *SessionCtx) Address() string { if session.socket == nil { return "" } @@ -52,41 +52,33 @@ func (session *Session) Address() string { return session.socket.Address() } -func (session *Session) Member() *types.Member { - return &types.Member{ - ID: session.id, - Name: session.name, - Admin: session.admin, - Muted: session.muted, - } -} - -func (session *Session) SetMuted(muted bool) { +func (session *SessionCtx) SetMuted(muted bool) { session.muted = muted } -func (session *Session) SetName(name string) { +func (session *SessionCtx) SetName(name string) { session.name = name } -func (session *Session) SetSocket(socket types.WebSocket) { +func (session *SessionCtx) SetSocket(socket types.WebSocket) { session.socket = socket } -func (session *Session) SetPeer(peer types.Peer) { +func (session *SessionCtx) SetPeer(peer types.Peer) { session.peer = peer } -func (session *Session) SetConnected() { +func (session *SessionCtx) SetConnected() { session.connected = true session.manager.emmiter.Emit("connected", session) } -func (session *Session) Disconnect(reason string) error { +func (session *SessionCtx) Disconnect(reason string) error { if session.socket == nil { return nil } + // TODO: Refcator if err := session.socket.Send(&message.Disconnect{ Event: event.SYSTEM_DISCONNECT, Message: reason, @@ -97,7 +89,7 @@ func (session *Session) Disconnect(reason string) error { return session.manager.Destroy(session.id) } -func (session *Session) Send(v interface{}) error { +func (session *SessionCtx) Send(v interface{}) error { if session.socket == nil { return nil } @@ -105,7 +97,7 @@ func (session *Session) Send(v interface{}) error { return session.socket.Send(v) } -func (session *Session) SignalAnswer(sdp string) error { +func (session *SessionCtx) SignalAnswer(sdp string) error { if session.peer == nil { return nil } @@ -113,7 +105,7 @@ func (session *Session) SignalAnswer(sdp string) error { return session.peer.SignalAnswer(sdp) } -func (session *Session) destroy() error { +func (session *SessionCtx) destroy() error { if session.socket != nil { if err := session.socket.Destroy(); err != nil { return err diff --git a/internal/types/broadcast.go b/internal/types/broadcast.go deleted file mode 100644 index fee3fd52..00000000 --- a/internal/types/broadcast.go +++ /dev/null @@ -1,10 +0,0 @@ -package types - -type BroadcastManager interface { - Start() - Stop() - IsActive() bool - Create(url string) - Destroy() - GetUrl() string -} diff --git a/internal/types/capture.go b/internal/types/capture.go new file mode 100644 index 00000000..7ad3dfee --- /dev/null +++ b/internal/types/capture.go @@ -0,0 +1,29 @@ +package types + +type Sample struct { + Data []byte + Samples uint32 +} + +type CaptureManager interface { + Start() + Shutdown() error + + VideoCodec() string + AudioCodec() string + + OnVideoFrame(listener func(sample Sample)) + OnAudioFrame(listener func(sample Sample)) + + StartStream() + StopStream() + Streaming() bool + + ChangeResolution(width int, height int, rate int) error + + // broacast + StartBroadcast(url string) + StopBroadcast() + IsBoradcasting() bool + BroadcastUrl() string +} diff --git a/internal/types/config/broadcast.go b/internal/types/config/broadcast.go deleted file mode 100644 index 22ef94ac..00000000 --- a/internal/types/config/broadcast.go +++ /dev/null @@ -1,23 +0,0 @@ -package config - -import ( - "github.com/spf13/cobra" - "github.com/spf13/viper" -) - -type Broadcast struct { - Pipeline string -} - -func (Broadcast) Init(cmd *cobra.Command) error { - cmd.PersistentFlags().String("broadcast_pipeline", "", "audio codec parameters to use for broadcasting") - if err := viper.BindPFlag("broadcast_pipeline", cmd.PersistentFlags().Lookup("broadcast_pipeline")); err != nil { - return err - } - - return nil -} - -func (s *Broadcast) Set() { - s.Pipeline = viper.GetString("broadcast_pipeline") -} diff --git a/internal/types/remote.go b/internal/types/desktop.go similarity index 53% rename from internal/types/remote.go rename to internal/types/desktop.go index b437d108..34f81647 100644 --- a/internal/types/remote.go +++ b/internal/types/desktop.go @@ -1,27 +1,36 @@ package types -type RemoteManager interface { - VideoCodec() string - AudioCodec() string +type ScreenSize struct { + Width int `json:"width"` + Height int `json:"height"` + Rate int16 `json:"rate"` +} + +type ScreenConfiguration struct { + Width int `json:"width"` + Height int `json:"height"` + Rates map[int]int16 `json:"rates"` +} + +type DesktopManager interface { Start() Shutdown() error - OnVideoFrame(listener func(sample Sample)) - OnAudioFrame(listener func(sample Sample)) - StartStream() - StopStream() - Streaming() bool - ChangeResolution(width int, height int, rate int) error - GetScreenSize() *ScreenSize - ScreenConfigurations() map[int]ScreenConfiguration + + // xorg + ChangeScreenSize(width int, height int, rate int) error Move(x, y int) Scroll(x, y int) ButtonDown(code int) error KeyDown(code uint64) error ButtonUp(code int) error KeyUp(code uint64) error - ReadClipboard() string - WriteClipboard(data string) ResetKeys() + ScreenConfigurations() map[int]ScreenConfiguration + GetScreenSize() *ScreenSize SetKeyboardLayout(layout string) SetKeyboardModifiers(NumLock int, CapsLock int, ScrollLock int) + + // clipboard + ReadClipboard() string + WriteClipboard(data string) } diff --git a/internal/types/keys.go b/internal/types/keys.go deleted file mode 100644 index f8a2fd16..00000000 --- a/internal/types/keys.go +++ /dev/null @@ -1,14 +0,0 @@ -package types - -type Button struct { - Name string - Code int - Keysym int -} - -type Key struct { - Name string - Value string - Code int - Keysym int -} diff --git a/internal/types/message/messages.go b/internal/types/message/messages.go index ee7a20ba..eef0c8f5 100644 --- a/internal/types/message/messages.go +++ b/internal/types/message/messages.go @@ -28,14 +28,22 @@ type SignalAnswer struct { } type MembersList struct { - Event string `json:"event"` - Memebers []*types.Member `json:"members"` + Event string `json:"event"` + Memebers []*MembersListEntry `json:"members"` +} + +type MembersListEntry struct { + ID string `json:"id"` + Name string `json:"displayname"` + Admin bool `json:"admin"` + Muted bool `json:"muted"` } type Member struct { Event string `json:"event"` - *types.Member + Member *MembersListEntry } + type MemberDisconnected struct { Event string `json:"event"` ID string `json:"id"` diff --git a/internal/types/session.go b/internal/types/session.go index ce973950..136ebb30 100644 --- a/internal/types/session.go +++ b/internal/types/session.go @@ -1,12 +1,5 @@ package types -type Member struct { - ID string `json:"id"` - Name string `json:"displayname"` - Admin bool `json:"admin"` - Muted bool `json:"muted"` -} - type Session interface { ID() string Name() string @@ -14,30 +7,32 @@ type Session interface { Muted() bool IsHost() bool Connected() bool - Member() *Member + Address() string SetMuted(muted bool) SetName(name string) - SetConnected() SetSocket(socket WebSocket) SetPeer(peer Peer) - Address() string - Disconnect(message string) error + SetConnected() + Disconnect(reason string) error Send(v interface{}) error SignalAnswer(sdp string) error } type SessionManager interface { New(id string, admin bool, socket WebSocket) Session + Get(id string) (Session, bool) + Has(id string) bool + Destroy(id string) error + HasHost() bool - SetHost(Session) + SetHost(host Session) GetHost() Session ClearHost() - Has(id string) bool - Get(id string) (Session, bool) - Members() []*Member - Admins() []*Member - Destroy(id string) error + + Admins() []Session + Members() []Session Broadcast(v interface{}, exclude interface{}) error + OnHost(listener func(session Session)) OnHostCleared(listener func(session Session)) OnDestroy(listener func(id string)) diff --git a/internal/types/webrtc.go b/internal/types/webrtc.go index d22253bd..aa8c8c85 100644 --- a/internal/types/webrtc.go +++ b/internal/types/webrtc.go @@ -1,14 +1,9 @@ package types -type Sample struct { - Data []byte - Samples uint32 -} - type WebRTCManager interface { Start() Shutdown() error - CreatePeer(id string, session Session) (string, bool, []string, error) + CreatePeer(session Session) (string, bool, []string, error) } type Peer interface { diff --git a/internal/types/webscoket.go b/internal/types/websocket.go similarity index 86% rename from internal/types/webscoket.go rename to internal/types/websocket.go index fd924b47..442e607b 100644 --- a/internal/types/webscoket.go +++ b/internal/types/websocket.go @@ -8,7 +8,7 @@ type WebSocket interface { Destroy() error } -type WebSocketHandler interface { +type WebSocketManager interface { Start() Shutdown() error Upgrade(w http.ResponseWriter, r *http.Request) error diff --git a/internal/types/xorg.go b/internal/types/xorg.go deleted file mode 100644 index dec51611..00000000 --- a/internal/types/xorg.go +++ /dev/null @@ -1,13 +0,0 @@ -package types - -type ScreenSize struct { - Width int `json:"width"` - Height int `json:"height"` - Rate int16 `json:"rate"` -} - -type ScreenConfiguration struct { - Width int `json:"width"` - Height int `json:"height"` - Rates map[int]int16 `json:"rates"` -} diff --git a/internal/webrtc/handle.go b/internal/webrtc/handle.go index a4a07a39..8ccb05b1 100644 --- a/internal/webrtc/handle.go +++ b/internal/webrtc/handle.go @@ -6,15 +6,15 @@ import ( "strconv" "github.com/pion/webrtc/v2" - - "demodesk/neko/internal/types" ) -const OP_MOVE = 0x01 -const OP_SCROLL = 0x02 -const OP_KEY_DOWN = 0x03 -const OP_KEY_UP = 0x04 -const OP_KEY_CLK = 0x05 +const ( + OP_MOVE = 0x01 + OP_SCROLL = 0x02 + OP_KEY_DOWN = 0x03 + OP_KEY_UP = 0x04 + OP_KEY_CLK = 0x05 +) type PayloadHeader struct { Event uint8 @@ -38,11 +38,7 @@ type PayloadKey struct { Key uint64 } -func (manager *WebRTCManager) handle(session types.Session, msg webrtc.DataChannelMessage) error { - if !session.IsHost() { - return nil - } - +func (manager *WebRTCManagerCtx) handle(msg webrtc.DataChannelMessage) error { buffer := bytes.NewBuffer(msg.Data) header := &PayloadHeader{} hbytes := make([]byte, 3) @@ -64,7 +60,7 @@ func (manager *WebRTCManager) handle(session types.Session, msg webrtc.DataChann return err } - manager.remote.Move(int(payload.X), int(payload.Y)) + manager.desktop.Move(int(payload.X), int(payload.Y)) case OP_SCROLL: payload := &PayloadScroll{} if err := binary.Read(buffer, binary.LittleEndian, payload); err != nil { @@ -77,7 +73,7 @@ func (manager *WebRTCManager) handle(session types.Session, msg webrtc.DataChann Str("y", strconv.Itoa(int(payload.Y))). Msg("scroll") - manager.remote.Scroll(int(payload.X), int(payload.Y)) + manager.desktop.Scroll(int(payload.X), int(payload.Y)) case OP_KEY_DOWN: payload := &PayloadKey{} if err := binary.Read(buffer, binary.LittleEndian, payload); err != nil { @@ -85,7 +81,7 @@ func (manager *WebRTCManager) handle(session types.Session, msg webrtc.DataChann } if payload.Key < 8 { - err := manager.remote.ButtonDown(int(payload.Key)) + err := manager.desktop.ButtonDown(int(payload.Key)) if err != nil { manager.logger.Warn().Err(err).Msg("button down failed") return nil @@ -93,7 +89,7 @@ func (manager *WebRTCManager) handle(session types.Session, msg webrtc.DataChann manager.logger.Debug().Msgf("button down %d", payload.Key) } else { - err := manager.remote.KeyDown(uint64(payload.Key)) + err := manager.desktop.KeyDown(uint64(payload.Key)) if err != nil { manager.logger.Warn().Err(err).Msg("key down failed") return nil @@ -109,7 +105,7 @@ func (manager *WebRTCManager) handle(session types.Session, msg webrtc.DataChann } if payload.Key < 8 { - err := manager.remote.ButtonUp(int(payload.Key)) + err := manager.desktop.ButtonUp(int(payload.Key)) if err != nil { manager.logger.Warn().Err(err).Msg("button up failed") return nil @@ -117,7 +113,7 @@ func (manager *WebRTCManager) handle(session types.Session, msg webrtc.DataChann manager.logger.Debug().Msgf("button up %d", payload.Key) } else { - err := manager.remote.KeyUp(uint64(payload.Key)) + err := manager.desktop.KeyUp(uint64(payload.Key)) if err != nil { manager.logger.Warn().Err(err).Msg("key up failed") return nil diff --git a/internal/webrtc/logger.go b/internal/webrtc/logger.go index 52764867..fd46ca88 100644 --- a/internal/webrtc/logger.go +++ b/internal/webrtc/logger.go @@ -8,17 +8,17 @@ import ( "github.com/rs/zerolog" ) -type nulllog struct{} +type nulllog struct {} -func (l nulllog) Trace(msg string) {} +func (l nulllog) Trace(msg string) {} func (l nulllog) Tracef(format string, args ...interface{}) {} -func (l nulllog) Debug(msg string) {} +func (l nulllog) Debug(msg string) {} func (l nulllog) Debugf(format string, args ...interface{}) {} -func (l nulllog) Info(msg string) {} -func (l nulllog) Infof(format string, args ...interface{}) {} -func (l nulllog) Warn(msg string) {} -func (l nulllog) Warnf(format string, args ...interface{}) {} -func (l nulllog) Error(msg string) {} +func (l nulllog) Info(msg string) {} +func (l nulllog) Infof(format string, args ...interface{}) {} +func (l nulllog) Warn(msg string) {} +func (l nulllog) Warnf(format string, args ...interface{}) {} +func (l nulllog) Error(msg string) {} func (l nulllog) Errorf(format string, args ...interface{}) {} type logger struct { @@ -26,10 +26,18 @@ type logger struct { subsystem string } -func (l logger) Trace(msg string) { l.logger.Trace().Msg(msg) } -func (l logger) Tracef(format string, args ...interface{}) { l.logger.Trace().Msgf(format, args...) } -func (l logger) Debug(msg string) { l.logger.Debug().Msg(msg) } -func (l logger) Debugf(format string, args ...interface{}) { l.logger.Debug().Msgf(format, args...) } +func (l logger) Trace(msg string) { + l.logger.Trace().Msg(msg) +} +func (l logger) Tracef(format string, args ...interface{}) { + l.logger.Trace().Msgf(format, args...) +} +func (l logger) Debug(msg string) { + l.logger.Debug().Msg(msg) +} +func (l logger) Debugf(format string, args ...interface{}) { + l.logger.Debug().Msgf(format, args...) +} func (l logger) Info(msg string) { if strings.Contains(msg, "packetio.Buffer is full") { //l.logger.Panic().Msg(msg) @@ -45,10 +53,18 @@ func (l logger) Infof(format string, args ...interface{}) { } l.logger.Info().Msg(msg) } -func (l logger) Warn(msg string) { l.logger.Warn().Msg(msg) } -func (l logger) Warnf(format string, args ...interface{}) { l.logger.Warn().Msgf(format, args...) } -func (l logger) Error(msg string) { l.logger.Error().Msg(msg) } -func (l logger) Errorf(format string, args ...interface{}) { l.logger.Error().Msgf(format, args...) } +func (l logger) Warn(msg string) { + l.logger.Warn().Msg(msg) +} +func (l logger) Warnf(format string, args ...interface{}) { + l.logger.Warn().Msgf(format, args...) +} +func (l logger) Error(msg string) { + l.logger.Error().Msg(msg) +} +func (l logger) Errorf(format string, args ...interface{}) { + l.logger.Error().Msgf(format, args...) +} type loggerFactory struct { logger zerolog.Logger diff --git a/internal/webrtc/peer.go b/internal/webrtc/peer.go index 59f66dbd..176b838e 100644 --- a/internal/webrtc/peer.go +++ b/internal/webrtc/peer.go @@ -4,26 +4,25 @@ import ( "github.com/pion/webrtc/v2" ) -type Peer struct { - id string +type PeerCtx struct { api *webrtc.API engine *webrtc.MediaEngine - manager *WebRTCManager settings *webrtc.SettingEngine connection *webrtc.PeerConnection configuration *webrtc.Configuration } -func (peer *Peer) SignalAnswer(sdp string) error { - return peer.connection.SetRemoteDescription(webrtc.SessionDescription{SDP: sdp, Type: webrtc.SDPTypeAnswer}) +func (peer *PeerCtx) SignalAnswer(sdp string) error { + return peer.connection.SetRemoteDescription(webrtc.SessionDescription{ + SDP: sdp, + Type: webrtc.SDPTypeAnswer, + }) } -func (peer *Peer) Destroy() error { - if peer.connection != nil && peer.connection.ConnectionState() == webrtc.PeerConnectionStateConnected { - if err := peer.connection.Close(); err != nil { - return err - } +func (peer *PeerCtx) Destroy() error { + if peer.connection == nil || peer.connection.ConnectionState() != webrtc.PeerConnectionStateConnected { + return nil } - - return nil + + return peer.connection.Close() } diff --git a/internal/webrtc/webrtc.go b/internal/webrtc/webrtc.go index d2839780..b5c11a4a 100644 --- a/internal/webrtc/webrtc.go +++ b/internal/webrtc/webrtc.go @@ -12,46 +12,49 @@ import ( "github.com/rs/zerolog/log" "demodesk/neko/internal/types" - "demodesk/neko/internal/types/config" + "demodesk/neko/internal/config" ) -func New(remote types.RemoteManager, config *config.WebRTC) *WebRTCManager { - return &WebRTCManager{ +func New(desktop types.DesktopManager, capture types.CaptureManager, config *config.WebRTC) *WebRTCManagerCtx { + return &WebRTCManagerCtx{ logger: log.With().Str("module", "webrtc").Logger(), - remote: remote, + desktop: desktop, + capture: capture, config: config, } } -type WebRTCManager struct { +type WebRTCManagerCtx struct { logger zerolog.Logger videoTrack *webrtc.Track audioTrack *webrtc.Track videoCodec *webrtc.RTPCodec audioCodec *webrtc.RTPCodec - remote types.RemoteManager + desktop types.DesktopManager + capture types.CaptureManager config *config.WebRTC } -func (manager *WebRTCManager) Start() { +func (manager *WebRTCManagerCtx) Start() { var err error - manager.audioTrack, manager.audioCodec, err = manager.createTrack(manager.remote.AudioCodec()) + + manager.audioTrack, manager.audioCodec, err = manager.createTrack(manager.capture.AudioCodec()) if err != nil { manager.logger.Panic().Err(err).Msg("unable to create audio track") } - manager.remote.OnAudioFrame(func(sample types.Sample) { + manager.capture.OnAudioFrame(func(sample types.Sample) { if err := manager.audioTrack.WriteSample(media.Sample(sample)); err != nil && err != io.ErrClosedPipe { manager.logger.Warn().Err(err).Msg("audio pipeline failed to write") } }) - manager.videoTrack, manager.videoCodec, err = manager.createTrack(manager.remote.VideoCodec()) + manager.videoTrack, manager.videoCodec, err = manager.createTrack(manager.capture.VideoCodec()) if err != nil { manager.logger.Panic().Err(err).Msg("unable to create video track") } - manager.remote.OnVideoFrame(func(sample types.Sample) { + manager.capture.OnVideoFrame(func(sample types.Sample) { if err := manager.videoTrack.WriteSample(media.Sample(sample)); err != nil && err != io.ErrClosedPipe { manager.logger.Warn().Err(err).Msg("video pipeline failed to write") } @@ -65,12 +68,12 @@ func (manager *WebRTCManager) Start() { Msgf("webrtc starting") } -func (manager *WebRTCManager) Shutdown() error { +func (manager *WebRTCManagerCtx) Shutdown() error { manager.logger.Info().Msgf("webrtc shutting down") return nil } -func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (string, bool, []string, error) { +func (manager *WebRTCManagerCtx) CreatePeer(session types.Session) (string, bool, []string, error) { configuration := &webrtc.Configuration{ ICEServers: []webrtc.ICEServer{ { @@ -134,7 +137,11 @@ func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (stri connection.OnDataChannel(func(d *webrtc.DataChannel) { d.OnMessage(func(msg webrtc.DataChannelMessage) { - if err = manager.handle(session, msg); err != nil { + if !session.IsHost() { + return + } + + if err = manager.handle(msg); err != nil { manager.logger.Warn().Err(err).Msg("data handle failed") } }) @@ -148,21 +155,19 @@ func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (stri switch state { case webrtc.PeerConnectionStateDisconnected: case webrtc.PeerConnectionStateFailed: - manager.logger.Info().Str("id", id).Msg("peer disconnected") + manager.logger.Info().Str("id", session.ID()).Msg("peer disconnected") if err:= session.Disconnect("peer connection state failed"); err != nil { manager.logger.Warn().Err(err).Msg("error while disconnecting session") } case webrtc.PeerConnectionStateConnected: - manager.logger.Info().Str("id", id).Msg("peer connected") + manager.logger.Info().Str("id", session.ID()).Msg("peer connected") session.SetConnected() } }) - session.SetPeer(&Peer{ - id: id, + session.SetPeer(&PeerCtx{ api: api, engine: &engine, - manager: manager, settings: &settings, connection: connection, configuration: configuration, @@ -171,7 +176,7 @@ func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (stri return description.SDP, manager.config.ICELite, manager.config.ICEServers, nil } -func (m *WebRTCManager) createTrack(codecName string) (*webrtc.Track, *webrtc.RTPCodec, error) { +func (m *WebRTCManagerCtx) createTrack(codecName string) (*webrtc.Track, *webrtc.RTPCodec, error) { var codec *webrtc.RTPCodec switch codecName { case webrtc.VP8: diff --git a/internal/websocket/broadcast/screen.go b/internal/websocket/broadcast/screen.go deleted file mode 100644 index ead004e3..00000000 --- a/internal/websocket/broadcast/screen.go +++ /dev/null @@ -1,17 +0,0 @@ -package broadcast - -import ( - "demodesk/neko/internal/types" - "demodesk/neko/internal/types/event" - "demodesk/neko/internal/types/message" -) - -func ScreenConfiguration(session types.SessionManager, id string, width int, height int, rate int) error { - return session.Broadcast(message.ScreenResolution{ - Event: event.SCREEN_RESOLUTION, - ID: id, - Width: width, - Height: height, - Rate: rate, - }, nil) -} diff --git a/internal/websocket/admin.go b/internal/websocket/handler/admin.go similarity index 88% rename from internal/websocket/admin.go rename to internal/websocket/handler/admin.go index eeea2db8..e6581cf3 100644 --- a/internal/websocket/admin.go +++ b/internal/websocket/handler/admin.go @@ -1,4 +1,4 @@ -package websocket +package handler import ( "strings" @@ -8,7 +8,7 @@ import ( "demodesk/neko/internal/types/message" ) -func (h *MessageHandler) adminLock(session types.Session) error { +func (h *MessageHandlerCtx) adminLock(session types.Session) error { if !session.Admin() { h.logger.Debug().Msg("user not admin") return nil @@ -33,7 +33,7 @@ func (h *MessageHandler) adminLock(session types.Session) error { return nil } -func (h *MessageHandler) adminUnlock(session types.Session) error { +func (h *MessageHandlerCtx) adminUnlock(session types.Session) error { if !session.Admin() { h.logger.Debug().Msg("user not admin") return nil @@ -58,7 +58,7 @@ func (h *MessageHandler) adminUnlock(session types.Session) error { return nil } -func (h *MessageHandler) adminControl(session types.Session) error { +func (h *MessageHandlerCtx) adminControl(session types.Session) error { if !session.Admin() { h.logger.Debug().Msg("user not admin") return nil @@ -91,7 +91,7 @@ func (h *MessageHandler) adminControl(session types.Session) error { return nil } -func (h *MessageHandler) adminRelease(session types.Session) error { +func (h *MessageHandlerCtx) adminRelease(session types.Session) error { if !session.Admin() { h.logger.Debug().Msg("user not admin") return nil @@ -124,7 +124,7 @@ func (h *MessageHandler) adminRelease(session types.Session) error { return nil } -func (h *MessageHandler) adminGive(session types.Session, payload *message.Admin) error { +func (h *MessageHandlerCtx) adminGive(session types.Session, payload *message.Admin) error { if !session.Admin() { h.logger.Debug().Msg("user not admin") return nil @@ -153,7 +153,7 @@ func (h *MessageHandler) adminGive(session types.Session, payload *message.Admin return nil } -func (h *MessageHandler) adminMute(session types.Session, payload *message.Admin) error { +func (h *MessageHandlerCtx) adminMute(session types.Session, payload *message.Admin) error { if !session.Admin() { h.logger.Debug().Msg("user not admin") return nil @@ -185,7 +185,7 @@ func (h *MessageHandler) adminMute(session types.Session, payload *message.Admin return nil } -func (h *MessageHandler) adminUnmute(session types.Session, payload *message.Admin) error { +func (h *MessageHandlerCtx) adminUnmute(session types.Session, payload *message.Admin) error { if !session.Admin() { h.logger.Debug().Msg("user not admin") return nil @@ -212,7 +212,7 @@ func (h *MessageHandler) adminUnmute(session types.Session, payload *message.Adm return nil } -func (h *MessageHandler) adminKick(session types.Session, payload *message.Admin) error { +func (h *MessageHandlerCtx) adminKick(session types.Session, payload *message.Admin) error { if !session.Admin() { h.logger.Debug().Msg("user not admin") return nil @@ -246,7 +246,7 @@ func (h *MessageHandler) adminKick(session types.Session, payload *message.Admin return nil } -func (h *MessageHandler) adminBan(session types.Session, payload *message.Admin) error { +func (h *MessageHandlerCtx) adminBan(session types.Session, payload *message.Admin) error { if !session.Admin() { h.logger.Debug().Msg("user not admin") return nil diff --git a/internal/websocket/broadcast.go b/internal/websocket/handler/broadcast.go similarity index 64% rename from internal/websocket/broadcast.go rename to internal/websocket/handler/broadcast.go index d62aa798..953c8654 100644 --- a/internal/websocket/broadcast.go +++ b/internal/websocket/handler/broadcast.go @@ -1,4 +1,4 @@ -package websocket +package handler import ( "demodesk/neko/internal/types" @@ -6,13 +6,13 @@ import ( "demodesk/neko/internal/types/message" ) -func (h *MessageHandler) boradcastCreate(session types.Session, payload *message.BroadcastCreate) error { +func (h *MessageHandlerCtx) boradcastCreate(session types.Session, payload *message.BroadcastCreate) error { if !session.Admin() { h.logger.Debug().Msg("user not admin") return nil } - h.broadcast.Create(payload.URL) + h.capture.StartBroadcast(payload.URL) if err := h.boradcastStatus(session); err != nil { return err @@ -21,13 +21,13 @@ func (h *MessageHandler) boradcastCreate(session types.Session, payload *message return nil } -func (h *MessageHandler) boradcastDestroy(session types.Session) error { +func (h *MessageHandlerCtx) boradcastDestroy(session types.Session) error { if !session.Admin() { h.logger.Debug().Msg("user not admin") return nil } - h.broadcast.Destroy() + h.capture.StopBroadcast() if err := h.boradcastStatus(session); err != nil { return err @@ -36,7 +36,7 @@ func (h *MessageHandler) boradcastDestroy(session types.Session) error { return nil } -func (h *MessageHandler) boradcastStatus(session types.Session) error { +func (h *MessageHandlerCtx) boradcastStatus(session types.Session) error { if !session.Admin() { h.logger.Debug().Msg("user not admin") return nil @@ -45,8 +45,8 @@ func (h *MessageHandler) boradcastStatus(session types.Session) error { if err := session.Send( message.BroadcastStatus{ Event: event.BORADCAST_STATUS, - IsActive: h.broadcast.IsActive(), - URL: h.broadcast.GetUrl(), + IsActive: h.capture.IsBoradcasting(), + URL: h.capture.BroadcastUrl(), }); err != nil { h.logger.Warn().Err(err).Msgf("sending event %s has failed", event.BORADCAST_STATUS) return err diff --git a/internal/websocket/control.go b/internal/websocket/handler/control.go similarity index 83% rename from internal/websocket/control.go rename to internal/websocket/handler/control.go index f05afc2b..1ad0a9c9 100644 --- a/internal/websocket/control.go +++ b/internal/websocket/handler/control.go @@ -1,4 +1,4 @@ -package websocket +package handler import ( "demodesk/neko/internal/types" @@ -6,7 +6,7 @@ import ( "demodesk/neko/internal/types/message" ) -func (h *MessageHandler) controlRelease(session types.Session) error { +func (h *MessageHandlerCtx) controlRelease(session types.Session) error { // check if session is host if !session.IsHost() { h.logger.Debug().Str("id", session.ID()).Msg("is not the host") @@ -30,7 +30,7 @@ func (h *MessageHandler) controlRelease(session types.Session) error { return nil } -func (h *MessageHandler) controlRequest(session types.Session) error { +func (h *MessageHandlerCtx) controlRequest(session types.Session) error { host := h.sessions.GetHost() if host == nil { @@ -69,7 +69,7 @@ func (h *MessageHandler) controlRequest(session types.Session) error { return nil } -func (h *MessageHandler) controlGive(session types.Session, payload *message.Control) error { +func (h *MessageHandlerCtx) controlGive(session types.Session, payload *message.Control) error { // check if session is host if !session.IsHost() { h.logger.Debug().Str("id", session.ID()).Msg("is not the host") @@ -99,18 +99,18 @@ func (h *MessageHandler) controlGive(session types.Session, payload *message.Con return nil } -func (h *MessageHandler) controlClipboard(session types.Session, payload *message.Clipboard) error { +func (h *MessageHandlerCtx) controlClipboard(session types.Session, payload *message.Clipboard) error { // check if session is host if !session.IsHost() { h.logger.Debug().Str("id", session.ID()).Msg("is not the host") return nil } - h.remote.WriteClipboard(payload.Text) + h.desktop.WriteClipboard(payload.Text) return nil } -func (h *MessageHandler) controlKeyboard(session types.Session, payload *message.Keyboard) error { +func (h *MessageHandlerCtx) controlKeyboard(session types.Session, payload *message.Keyboard) error { // check if session is host if !session.IsHost() { h.logger.Debug().Str("id", session.ID()).Msg("is not the host") @@ -119,7 +119,7 @@ func (h *MessageHandler) controlKeyboard(session types.Session, payload *message // change layout if payload.Layout != nil { - h.remote.SetKeyboardLayout(*payload.Layout) + h.desktop.SetKeyboardLayout(*payload.Layout) } // set num lock @@ -152,6 +152,6 @@ func (h *MessageHandler) controlKeyboard(session types.Session, payload *message Int("ScrollLock", ScrollLock). Msg("setting keyboard modifiers") - h.remote.SetKeyboardModifiers(NumLock, CapsLock, ScrollLock) + h.desktop.SetKeyboardModifiers(NumLock, CapsLock, ScrollLock) return nil } diff --git a/internal/websocket/handler.go b/internal/websocket/handler/handler.go similarity index 83% rename from internal/websocket/handler.go rename to internal/websocket/handler/handler.go index 54379989..f2ae9e9a 100644 --- a/internal/websocket/handler.go +++ b/internal/websocket/handler/handler.go @@ -1,10 +1,11 @@ -package websocket +package handler import ( "encoding/json" "github.com/pkg/errors" "github.com/rs/zerolog" + "github.com/rs/zerolog/log" "demodesk/neko/internal/types" "demodesk/neko/internal/types/event" @@ -12,40 +13,60 @@ import ( "demodesk/neko/internal/utils" ) -type MessageHandler struct { +func New( + sessions types.SessionManager, + desktop types.DesktopManager, + capture types.CaptureManager, + webrtc types.WebRTCManager, +) *MessageHandlerCtx { + logger := log.With().Str("module", "handler").Logger() + + return &MessageHandlerCtx{ + logger: logger, + sessions: sessions, + desktop: desktop, + capture: capture, + webrtc: webrtc, + banned: make(map[string]bool), + locked: false, + } +} + +type MessageHandlerCtx struct { logger zerolog.Logger sessions types.SessionManager webrtc types.WebRTCManager - remote types.RemoteManager - broadcast types.BroadcastManager + desktop types.DesktopManager + capture types.CaptureManager banned map[string]bool locked bool } -func (h *MessageHandler) Connected(id string, socket *WebSocket) (bool, string, error) { +func (h *MessageHandlerCtx) Connected(id string, socket types.WebSocket) (bool, string) { address := socket.Address() - if address == "" { - h.logger.Debug().Msg("no remote address") - } else { + if address != "" { ok, banned := h.banned[address] if ok && banned { h.logger.Debug().Str("address", address).Msg("banned") - return false, "banned", nil + return false, "banned" } + } else { + h.logger.Debug().Msg("no remote address") } if h.locked { session, ok := h.sessions.Get(id) if !ok || !session.Admin() { h.logger.Debug().Msg("server locked") - return false, "locked", nil + return false, "locked" } } - return true, "", nil + return true, "" } -func (h *MessageHandler) Disconnected(id string) error { +func (h *MessageHandlerCtx) Disconnected(id string) error { + // TODO: Refactor. if h.locked && len(h.sessions.Admins()) == 0 { h.locked = false } @@ -53,7 +74,7 @@ func (h *MessageHandler) Disconnected(id string) error { return h.sessions.Destroy(id) } -func (h *MessageHandler) Message(id string, raw []byte) error { +func (h *MessageHandlerCtx) Message(id string, raw []byte) error { header := message.Message{} if err := json.Unmarshal(raw, &header); err != nil { return err diff --git a/internal/websocket/screen.go b/internal/websocket/handler/screen.go similarity index 54% rename from internal/websocket/screen.go rename to internal/websocket/handler/screen.go index b9a1328d..16417394 100644 --- a/internal/websocket/screen.go +++ b/internal/websocket/handler/screen.go @@ -1,33 +1,38 @@ -package websocket +package handler import ( "demodesk/neko/internal/types" "demodesk/neko/internal/types/event" "demodesk/neko/internal/types/message" - "demodesk/neko/internal/websocket/broadcast" ) -func (h *MessageHandler) screenSet(session types.Session, payload *message.ScreenResolution) error { +func (h *MessageHandlerCtx) screenSet(session types.Session, payload *message.ScreenResolution) error { if !session.Admin() { h.logger.Debug().Msg("user not admin") return nil } - if err := h.remote.ChangeResolution(payload.Width, payload.Height, payload.Rate); err != nil { + if err := h.capture.ChangeResolution(payload.Width, payload.Height, payload.Rate); err != nil { h.logger.Warn().Err(err).Msgf("unable to change screen size") return err } - if err := broadcast.ScreenConfiguration(h.sessions, session.ID(), payload.Width, payload.Height, payload.Rate); err != nil { - h.logger.Warn().Err(err).Msgf("broadcasting event %s has failed", event.SCREEN_RESOLUTION) + if err := h.sessions.Broadcast(message.ScreenResolution{ + Event: event.SCREEN_RESOLUTION, + ID: session.ID(), + Width: payload.Width, + Height: payload.Height, + Rate: payload.Rate, + }, nil); err != nil { + h.logger.Warn().Err(err).Msgf("sending event %s has failed", event.SCREEN_RESOLUTION) return err } return nil } -func (h *MessageHandler) screenResolution(session types.Session) error { - if size := h.remote.GetScreenSize(); size != nil { +func (h *MessageHandlerCtx) screenResolution(session types.Session) error { + if size := h.desktop.GetScreenSize(); size != nil { if err := session.Send(message.ScreenResolution{ Event: event.SCREEN_RESOLUTION, Width: size.Width, @@ -42,7 +47,7 @@ func (h *MessageHandler) screenResolution(session types.Session) error { return nil } -func (h *MessageHandler) screenConfigurations(session types.Session) error { +func (h *MessageHandlerCtx) screenConfigurations(session types.Session) error { if !session.Admin() { h.logger.Debug().Msg("user not admin") return nil @@ -50,7 +55,7 @@ func (h *MessageHandler) screenConfigurations(session types.Session) error { if err := session.Send(message.ScreenConfigurations{ Event: event.SCREEN_CONFIGURATIONS, - Configurations: h.remote.ScreenConfigurations(), + Configurations: h.desktop.ScreenConfigurations(), }); err != nil { h.logger.Warn().Err(err).Msgf("sending event %s has failed", event.SCREEN_CONFIGURATIONS) return err diff --git a/internal/websocket/session.go b/internal/websocket/handler/session.go similarity index 75% rename from internal/websocket/session.go rename to internal/websocket/handler/session.go index 0a7ecbad..58d2d7f2 100644 --- a/internal/websocket/session.go +++ b/internal/websocket/handler/session.go @@ -1,4 +1,4 @@ -package websocket +package handler import ( "demodesk/neko/internal/types" @@ -6,7 +6,7 @@ import ( "demodesk/neko/internal/types/message" ) -func (h *MessageHandler) SessionCreated(session types.Session) error { +func (h *MessageHandlerCtx) SessionCreated(session types.Session) error { // send sdp and id over to client if err := h.signalProvide(session); err != nil { return err @@ -27,11 +27,22 @@ func (h *MessageHandler) SessionCreated(session types.Session) error { return nil } -func (h *MessageHandler) SessionConnected(session types.Session) error { +func (h *MessageHandlerCtx) SessionConnected(session types.Session) error { + // TODO: Refactor. + members := []*message.MembersListEntry{} + for _, session := range h.sessions.Members() { + members = append(members, &message.MembersListEntry{ + ID: session.ID(), + Name: session.Name(), + Admin: session.Admin(), + Muted: session.Muted(), + }) + } + // send list of members to session if err := session.Send(message.MembersList{ Event: event.MEMBER_LIST, - Memebers: h.sessions.Members(), + Memebers: members, }); err != nil { h.logger.Warn().Str("id", session.ID()).Err(err).Msgf("sending event %s has failed", event.MEMBER_LIST) return err @@ -58,7 +69,12 @@ func (h *MessageHandler) SessionConnected(session types.Session) error { if err := h.sessions.Broadcast( message.Member{ Event: event.MEMBER_CONNECTED, - Member: session.Member(), + Member: &message.MembersListEntry{ + ID: session.ID(), + Name: session.Name(), + Admin: session.Admin(), + Muted: session.Muted(), + }, }, nil); err != nil { h.logger.Warn().Err(err).Msgf("broadcasting event %s has failed", event.CONTROL_RELEASE) return err @@ -67,7 +83,7 @@ func (h *MessageHandler) SessionConnected(session types.Session) error { return nil } -func (h *MessageHandler) SessionDestroyed(id string) error { +func (h *MessageHandlerCtx) SessionDestroyed(id string) error { // clear host if exists host := h.sessions.GetHost() if host != nil && host.ID() == id { diff --git a/internal/websocket/signal.go b/internal/websocket/handler/signal.go similarity index 65% rename from internal/websocket/signal.go rename to internal/websocket/handler/signal.go index 9c001beb..b9b76d42 100644 --- a/internal/websocket/signal.go +++ b/internal/websocket/handler/signal.go @@ -1,4 +1,4 @@ -package websocket +package handler import ( "demodesk/neko/internal/types" @@ -6,8 +6,8 @@ import ( "demodesk/neko/internal/types/message" ) -func (h *MessageHandler) signalProvide(session types.Session) error { - sdp, lite, ice, err := h.webrtc.CreatePeer(session.ID(), session) +func (h *MessageHandlerCtx) signalProvide(session types.Session) error { + sdp, lite, ice, err := h.webrtc.CreatePeer(session) if err != nil { return err } @@ -25,7 +25,7 @@ func (h *MessageHandler) signalProvide(session types.Session) error { return nil } -func (h *MessageHandler) signalAnswer(session types.Session, payload *message.SignalAnswer) error { +func (h *MessageHandlerCtx) signalAnswer(session types.Session, payload *message.SignalAnswer) error { session.SetName(payload.DisplayName) if err := session.SignalAnswer(payload.SDP); err != nil { diff --git a/internal/websocket/manager.go b/internal/websocket/manager.go new file mode 100644 index 00000000..7b33ea2c --- /dev/null +++ b/internal/websocket/manager.go @@ -0,0 +1,270 @@ +package websocket + +import ( + "fmt" + "net/http" + "time" + + "github.com/gorilla/websocket" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + + "demodesk/neko/internal/websocket/handler" + "demodesk/neko/internal/types/event" + "demodesk/neko/internal/types/message" + + "demodesk/neko/internal/types" + "demodesk/neko/internal/config" + "demodesk/neko/internal/utils" +) + +func New( + sessions types.SessionManager, + desktop types.DesktopManager, + capture types.CaptureManager, + webrtc types.WebRTCManager, + conf *config.WebSocket, +) *WebSocketManagerCtx { + logger := log.With().Str("module", "websocket").Logger() + + return &WebSocketManagerCtx{ + logger: logger, + conf: conf, + sessions: sessions, + desktop: desktop, + upgrader: websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, + }, + handler: handler.New(sessions, desktop, capture, webrtc), + } +} + +// Send pings to peer with this period. Must be less than pongWait. +const pingPeriod = 60 * time.Second + +type WebSocketManagerCtx struct { + logger zerolog.Logger + upgrader websocket.Upgrader + sessions types.SessionManager + desktop types.DesktopManager + conf *config.WebSocket + handler *handler.MessageHandlerCtx + shutdown chan bool +} + +func (ws *WebSocketManagerCtx) Start() { + ws.sessions.OnCreated(func(session types.Session) { + if err := ws.handler.SessionCreated(session); err != nil { + ws.logger.Warn().Str("id", session.ID()).Err(err).Msg("session created with and error") + } else { + ws.logger.Debug().Str("id", session.ID()).Msg("session created") + } + }) + + ws.sessions.OnConnected(func(session types.Session) { + if err := ws.handler.SessionConnected(session); err != nil { + ws.logger.Warn().Str("id", session.ID()).Err(err).Msg("session connected with and error") + } else { + ws.logger.Debug().Str("id", session.ID()).Msg("session connected") + } + }) + + ws.sessions.OnDestroy(func(id string) { + if err := ws.handler.SessionDestroyed(id); err != nil { + ws.logger.Warn().Str("id", id).Err(err).Msg("session destroyed with and error") + } else { + ws.logger.Debug().Str("id", id).Msg("session destroyed") + } + }) + + go func() { + defer func() { + ws.logger.Info().Msg("shutdown") + }() + + current := ws.desktop.ReadClipboard() + + for { + select { + case <-ws.shutdown: + return + default: + session := ws.sessions.GetHost() + if session != nil { + break + } + + text := ws.desktop.ReadClipboard() + if text == current { + break + } + + // TODO: Refactor + if err := session.Send(message.Clipboard{ + Event: event.CONTROL_CLIPBOARD, + Text: text, + }); err != nil { + ws.logger.Warn().Err(err).Msg("could not sync clipboard") + } + + current = text + } + + time.Sleep(100 * time.Millisecond) + } + }() +} + +func (ws *WebSocketManagerCtx) Shutdown() error { + ws.shutdown <- true + return nil +} + +func (ws *WebSocketManagerCtx) Upgrade(w http.ResponseWriter, r *http.Request) error { + ws.logger.Debug().Msg("attempting to upgrade connection") + + connection, err := ws.upgrader.Upgrade(w, r, nil) + if err != nil { + ws.logger.Error().Err(err).Msg("failed to upgrade connection") + return err + } + + id, ip, admin, err := ws.authenticate(r) + if err != nil { + ws.logger.Warn().Err(err).Msg("authentication failed") + + // TODO: Refactor + if err = connection.WriteJSON(message.Disconnect{ + Event: event.SYSTEM_DISCONNECT, + Message: "invalid_password", + }); err != nil { + ws.logger.Error().Err(err).Msg("failed to send disconnect") + } + + return connection.Close() + } + + socket := &WebSocketCtx{ + id: id, + ws: ws, + address: ip, + connection: connection, + } + + ok, reason := ws.handler.Connected(id, socket) + if !ok { + // TODO: Refactor + if err = connection.WriteJSON(message.Disconnect{ + Event: event.SYSTEM_DISCONNECT, + Message: reason, + }); err != nil { + ws.logger.Error().Err(err).Msg("failed to send disconnect") + } + + return connection.Close() + } + + ws.sessions.New(id, admin, socket) + + ws.logger. + Debug(). + Str("session", id). + Str("address", connection.RemoteAddr().String()). + Msg("new connection created") + + defer func() { + ws.logger. + Debug(). + Str("session", id). + Str("address", connection.RemoteAddr().String()). + Msg("session ended") + }() + + ws.handle(connection, id) + return nil +} + +// TODO: Refactor +func (ws *WebSocketManagerCtx) authenticate(r *http.Request) (string, string, bool, error) { + ip := r.RemoteAddr + + if ws.conf.Proxy { + ip = utils.ReadUserIP(r) + } + + id, err := utils.NewUID(32) + if err != nil { + return "", ip, false, err + } + + passwords, ok := r.URL.Query()["password"] + if !ok || len(passwords[0]) < 1 { + return "", ip, false, fmt.Errorf("no password provided") + } + + if passwords[0] == ws.conf.AdminPassword { + return id, ip, true, nil + } + + if passwords[0] == ws.conf.Password { + return id, ip, false, nil + } + + return "", ip, false, fmt.Errorf("invalid password: %s", passwords[0]) +} + +func (ws *WebSocketManagerCtx) handle(connection *websocket.Conn, id string) { + bytes := make(chan []byte) + cancel := make(chan struct{}) + ticker := time.NewTicker(pingPeriod) + + go func() { + defer func() { + ticker.Stop() + ws.logger.Debug().Str("address", connection.RemoteAddr().String()).Msg("handle socket ending") + if err := ws.handler.Disconnected(id); err != nil { + ws.logger.Warn().Err(err).Msg("socket disconnected with error") + } + }() + + for { + _, raw, err := connection.ReadMessage() + if err == nil { + bytes <- raw + continue + } + + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + ws.logger.Warn().Err(err).Msg("read message error") + } else { + ws.logger.Debug().Err(err).Msg("read message error") + } + + close(cancel) + } + }() + + for { + select { + case raw := <-bytes: + ws.logger.Debug(). + Str("session", id). + Str("address", connection.RemoteAddr().String()). + Str("raw", string(raw)). + Msg("received message from client") + + if err := ws.handler.Message(id, raw); err != nil { + ws.logger.Error().Err(err).Msg("message handler has failed") + } + case <-cancel: + return + case <-ticker.C: + if err := connection.WriteMessage(websocket.PingMessage, nil); err != nil { + ws.logger.Error().Err(err).Msg("ping message has failed") + return + } + } + } +} diff --git a/internal/websocket/socket.go b/internal/websocket/socket.go deleted file mode 100644 index c9875bfd..00000000 --- a/internal/websocket/socket.go +++ /dev/null @@ -1,55 +0,0 @@ -package websocket - -import ( - "encoding/json" - "strings" - "sync" - - "github.com/gorilla/websocket" -) - -type WebSocket struct { - id string - address string - ws *WebSocketHandler - connection *websocket.Conn - mu sync.Mutex -} - -func (socket *WebSocket) Address() string { - //remote := socket.connection.RemoteAddr() - address := strings.SplitN(socket.address, ":", -1) - if len(address[0]) < 1 { - return socket.address - } - return address[0] -} - -func (socket *WebSocket) Send(v interface{}) error { - socket.mu.Lock() - defer socket.mu.Unlock() - if socket.connection == nil { - return nil - } - - raw, err := json.Marshal(v) - if err != nil { - return err - } - - socket.ws.logger.Debug(). - Str("session", socket.id). - Str("address", socket.connection.RemoteAddr().String()). - Str("raw", string(raw)). - Msg("sending message to client") - - return socket.connection.WriteMessage(websocket.TextMessage, raw) -} - -func (socket *WebSocket) Destroy() error { - if socket.connection == nil { - return nil - } - - return socket.connection.Close() -} diff --git a/internal/websocket/websocket.go b/internal/websocket/websocket.go index 248f5f78..6dd01327 100644 --- a/internal/websocket/websocket.go +++ b/internal/websocket/websocket.go @@ -1,270 +1,57 @@ package websocket import ( - "fmt" - "net/http" - "time" + "encoding/json" + "strings" + "sync" "github.com/gorilla/websocket" - "github.com/rs/zerolog" - "github.com/rs/zerolog/log" - - "demodesk/neko/internal/types" - "demodesk/neko/internal/types/config" - "demodesk/neko/internal/types/event" - "demodesk/neko/internal/types/message" - "demodesk/neko/internal/utils" ) -func New(sessions types.SessionManager, remote types.RemoteManager, broadcast types.BroadcastManager, webrtc types.WebRTCManager, conf *config.WebSocket) *WebSocketHandler { - logger := log.With().Str("module", "websocket").Logger() - - return &WebSocketHandler{ - logger: logger, - conf: conf, - sessions: sessions, - remote: remote, - upgrader: websocket.Upgrader{ - CheckOrigin: func(r *http.Request) bool { - return true - }, - }, - handler: &MessageHandler{ - logger: logger.With().Str("subsystem", "handler").Logger(), - remote: remote, - broadcast: broadcast, - sessions: sessions, - webrtc: webrtc, - banned: make(map[string]bool), - locked: false, - }, - } +type WebSocketCtx struct { + id string + address string + ws *WebSocketManagerCtx + connection *websocket.Conn + mu sync.Mutex } -// Send pings to peer with this period. Must be less than pongWait. -const pingPeriod = 60 * time.Second - -type WebSocketHandler struct { - logger zerolog.Logger - upgrader websocket.Upgrader - sessions types.SessionManager - remote types.RemoteManager - conf *config.WebSocket - handler *MessageHandler - shutdown chan bool -} - -func (ws *WebSocketHandler) Start() { - ws.sessions.OnCreated(func(session types.Session) { - if err := ws.handler.SessionCreated(session); err != nil { - ws.logger.Warn().Str("id", session.ID()).Err(err).Msg("session created with and error") - } else { - ws.logger.Debug().Str("id", session.ID()).Msg("session created") - } - }) - - ws.sessions.OnConnected(func(session types.Session) { - if err := ws.handler.SessionConnected(session); err != nil { - ws.logger.Warn().Str("id", session.ID()).Err(err).Msg("session connected with and error") - } else { - ws.logger.Debug().Str("id", session.ID()).Msg("session connected") - } - }) - - ws.sessions.OnDestroy(func(id string) { - if err := ws.handler.SessionDestroyed(id); err != nil { - ws.logger.Warn().Str("id", id).Err(err).Msg("session destroyed with and error") - } else { - ws.logger.Debug().Str("id", id).Msg("session destroyed") - } - }) - - go func() { - defer func() { - ws.logger.Info().Msg("shutdown") - }() - - current := ws.remote.ReadClipboard() - - for { - select { - case <-ws.shutdown: - return - default: - if ws.sessions.HasHost() { - text := ws.remote.ReadClipboard() - if text != current { - session := ws.sessions.GetHost() - if session != nil { - if err := session.Send(message.Clipboard{ - Event: event.CONTROL_CLIPBOARD, - Text: text, - }); err != nil { - ws.logger.Warn().Err(err).Msg("could not sync clipboard") - } - } - current = text - } - } - time.Sleep(100 * time.Millisecond) - } - } - }() -} - -func (ws *WebSocketHandler) Shutdown() error { - ws.shutdown <- true - return nil -} - -func (ws *WebSocketHandler) Upgrade(w http.ResponseWriter, r *http.Request) error { - ws.logger.Debug().Msg("attempting to upgrade connection") - - connection, err := ws.upgrader.Upgrade(w, r, nil) - if err != nil { - ws.logger.Error().Err(err).Msg("failed to upgrade connection") - return err +func (socket *WebSocketCtx) Address() string { + //remote := socket.connection.RemoteAddr() + address := strings.SplitN(socket.address, ":", -1) + if len(address[0]) < 1 { + return socket.address } - id, ip, admin, err := ws.authenticate(r) - if err != nil { - ws.logger.Warn().Err(err).Msg("authentication failed") + return address[0] +} - if err = connection.WriteJSON(message.Disconnect{ - Event: event.SYSTEM_DISCONNECT, - Message: "invalid_password", - }); err != nil { - ws.logger.Error().Err(err).Msg("failed to send disconnect") - } +func (socket *WebSocketCtx) Send(v interface{}) error { + socket.mu.Lock() + defer socket.mu.Unlock() - if err = connection.Close(); err != nil { - return err - } + if socket.connection == nil { return nil } - socket := &WebSocket{ - id: id, - ws: ws, - address: ip, - connection: connection, - } - - ok, reason, err := ws.handler.Connected(id, socket) + raw, err := json.Marshal(v) if err != nil { - ws.logger.Error().Err(err).Msg("connection failed") return err } - if !ok { - if err = connection.WriteJSON(message.Disconnect{ - Event: event.SYSTEM_DISCONNECT, - Message: reason, - }); err != nil { - ws.logger.Error().Err(err).Msg("failed to send disconnect") - } + socket.ws.logger.Debug(). + Str("session", socket.id). + Str("address", socket.connection.RemoteAddr().String()). + Str("raw", string(raw)). + Msg("sending message to client") - if err = connection.Close(); err != nil { - return err - } + return socket.connection.WriteMessage(websocket.TextMessage, raw) +} +func (socket *WebSocketCtx) Destroy() error { + if socket.connection == nil { return nil } - ws.sessions.New(id, admin, socket) - - ws.logger. - Debug(). - Str("session", id). - Str("address", connection.RemoteAddr().String()). - Msg("new connection created") - - defer func() { - ws.logger. - Debug(). - Str("session", id). - Str("address", connection.RemoteAddr().String()). - Msg("session ended") - }() - - ws.handle(connection, id) - return nil -} - -func (ws *WebSocketHandler) authenticate(r *http.Request) (string, string, bool, error) { - ip := r.RemoteAddr - - if ws.conf.Proxy { - ip = utils.ReadUserIP(r) - } - - id, err := utils.NewUID(32) - if err != nil { - return "", ip, false, err - } - - passwords, ok := r.URL.Query()["password"] - if !ok || len(passwords[0]) < 1 { - return "", ip, false, fmt.Errorf("no password provided") - } - - if passwords[0] == ws.conf.AdminPassword { - return id, ip, true, nil - } - - if passwords[0] == ws.conf.Password { - return id, ip, false, nil - } - - return "", ip, false, fmt.Errorf("invalid password: %s", passwords[0]) -} - -func (ws *WebSocketHandler) handle(connection *websocket.Conn, id string) { - bytes := make(chan []byte) - cancel := make(chan struct{}) - ticker := time.NewTicker(pingPeriod) - - go func() { - defer func() { - ticker.Stop() - ws.logger.Debug().Str("address", connection.RemoteAddr().String()).Msg("handle socket ending") - if err := ws.handler.Disconnected(id); err != nil { - ws.logger.Warn().Err(err).Msg("socket disconnected with error") - } - }() - - for { - _, raw, err := connection.ReadMessage() - if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { - ws.logger.Warn().Err(err).Msg("read message error") - } else { - ws.logger.Debug().Err(err).Msg("read message error") - } - close(cancel) - break - } - bytes <- raw - } - }() - - for { - select { - case raw := <-bytes: - ws.logger.Debug(). - Str("session", id). - Str("address", connection.RemoteAddr().String()). - Str("raw", string(raw)). - Msg("received message from client") - if err := ws.handler.Message(id, raw); err != nil { - ws.logger.Error().Err(err).Msg("message handler has failed") - } - case <-cancel: - return - case <-ticker.C: - if err := connection.WriteMessage(websocket.PingMessage, nil); err != nil { - return - } - } - } + return socket.connection.Close() } diff --git a/internal/xorg/xorg.h b/internal/xorg/xorg.h deleted file mode 100644 index 88eca160..00000000 --- a/internal/xorg/xorg.h +++ /dev/null @@ -1,46 +0,0 @@ -#pragma once - -#ifndef XDISPLAY_H - #define XDISPLAY_H - - #include - #include - #include - #include - #include - #include - #include - #include /* For fputs() */ - #include /* For strdup() */ - - extern void goCreateScreenSize(int index, int width, int height, int mwidth, int mheight); - extern void goSetScreenRates(int index, int rate_index, short rate); - - /* Returns the main display, closed either on exit or when closeMainDisplay() - * is invoked. This removes a bit of the overhead of calling XOpenDisplay() & - * XCloseDisplay() everytime the main display needs to be used. - * - * Note that this is almost certainly not thread safe. */ - Display *getXDisplay(void); - clipboard_c *getClipboard(void); - - void XMove(int x, int y); - void XScroll(int x, int y); - void XButton(unsigned int button, int down); - void XKey(unsigned long key, int down); - - void XClipboardSet(char *src); - char *XClipboardGet(); - - void XGetScreenConfigurations(); - void XSetScreenConfiguration(int index, short rate); - int XGetScreenSize(); - short XGetScreenRate(); - - void XDisplayClose(void); - void XDisplaySet(char *input); - - void SetKeyboardLayout(char *layout); - void SetKeyboardModifiers(int num_lock, int caps_lock, int scroll_lock); -#endif - diff --git a/neko.go b/neko.go index 96853d25..b845cc69 100644 --- a/neko.go +++ b/neko.go @@ -6,13 +6,13 @@ import ( "os/signal" "runtime" - "demodesk/neko/internal/broadcast" - "demodesk/neko/internal/http" - "demodesk/neko/internal/remote" - "demodesk/neko/internal/session" - "demodesk/neko/internal/types/config" + "demodesk/neko/internal/config" + "demodesk/neko/internal/desktop" + "demodesk/neko/internal/capture" "demodesk/neko/internal/webrtc" + "demodesk/neko/internal/session" "demodesk/neko/internal/websocket" + "demodesk/neko/internal/http" "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -61,10 +61,9 @@ func init() { }, Configs: &Configs{ Root: &config.Root{}, - Server: &config.Server{}, - Remote: &config.Remote{}, - Broadcast: &config.Broadcast{}, + Capture: &config.Capture{}, WebRTC: &config.WebRTC{}, + Server: &config.Server{}, WebSocket: &config.WebSocket{}, }, } @@ -101,10 +100,9 @@ func (i *Version) Details() string { type Configs struct { Root *config.Root - Remote *config.Remote - Broadcast *config.Broadcast - Server *config.Server + Capture *config.Capture WebRTC *config.WebRTC + Server *config.Server WebSocket *config.WebSocket } @@ -112,13 +110,13 @@ type Neko struct { Version *Version Configs *Configs - logger zerolog.Logger - server *http.Server - sessionManager *session.SessionManager - remoteManager *remote.RemoteManager - broadcastManager *broadcast.BroadcastManager - webRTCManager *webrtc.WebRTCManager - webSocketHandler *websocket.WebSocketHandler + logger zerolog.Logger + desktopManager *desktop.DesktopManagerCtx + captureManager *capture.CaptureManagerCtx + webRTCManager *webrtc.WebRTCManagerCtx + sessionManager *session.SessionManagerCtx + webSocketManager *websocket.WebSocketManagerCtx + server *http.ServerCtx } func (neko *Neko) Preflight() { @@ -126,51 +124,55 @@ func (neko *Neko) Preflight() { } func (neko *Neko) Start() { - neko.broadcastManager = broadcast.New( - neko.Configs.Remote, - neko.Configs.Broadcast, + neko.desktopManager = desktop.New( + neko.Configs.Capture.Display, ) + neko.desktopManager.Start() - neko.remoteManager = remote.New( - neko.Configs.Remote, - neko.broadcastManager, + neko.captureManager = capture.New( + neko.desktopManager, + neko.Configs.Capture, ) - neko.remoteManager.Start() + neko.captureManager.Start() neko.webRTCManager = webrtc.New( - neko.remoteManager, + neko.desktopManager, + neko.captureManager, neko.Configs.WebRTC, ) neko.webRTCManager.Start() neko.sessionManager = session.New( - neko.remoteManager, + neko.captureManager, ) - neko.webSocketHandler = websocket.New( + neko.webSocketManager = websocket.New( neko.sessionManager, - neko.remoteManager, - neko.broadcastManager, + neko.desktopManager, + neko.captureManager, neko.webRTCManager, neko.Configs.WebSocket, ) - neko.webSocketHandler.Start() + neko.webSocketManager.Start() neko.server = http.New( - neko.sessionManager, - neko.remoteManager, - neko.broadcastManager, - neko.webSocketHandler, + neko.webSocketManager, neko.Configs.Server, ) neko.server.Start() } func (neko *Neko) Shutdown() { - if err := neko.remoteManager.Shutdown(); err != nil { - neko.logger.Err(err).Msg("remote manager shutdown with an error") + if err := neko.desktopManager.Shutdown(); err != nil { + neko.logger.Err(err).Msg("desktop manager shutdown with an error") } else { - neko.logger.Debug().Msg("remote manager shutdown") + neko.logger.Debug().Msg("desktop manager shutdown") + } + + if err := neko.captureManager.Shutdown(); err != nil { + neko.logger.Err(err).Msg("capture manager shutdown with an error") + } else { + neko.logger.Debug().Msg("capture manager shutdown") } if err := neko.webRTCManager.Shutdown(); err != nil { @@ -179,10 +181,10 @@ func (neko *Neko) Shutdown() { neko.logger.Debug().Msg("webrtc manager shutdown") } - if err := neko.webSocketHandler.Shutdown(); err != nil { - neko.logger.Err(err).Msg("websocket handler shutdown with an error") + if err := neko.webSocketManager.Shutdown(); err != nil { + neko.logger.Err(err).Msg("websocket manager shutdown with an error") } else { - neko.logger.Debug().Msg("websocket handler shutdown") + neko.logger.Debug().Msg("websocket manager shutdown") } if err := neko.server.Shutdown(); err != nil {