major refactor.

This commit is contained in:
Miroslav Šedivý 2020-11-01 16:09:48 +01:00
parent 5c92b75cf7
commit 5d906e0a8b
53 changed files with 1189 additions and 1133 deletions

View File

@ -5,7 +5,7 @@ import (
"github.com/spf13/cobra"
"demodesk/neko"
"demodesk/neko/internal/types/config"
"demodesk/neko/internal/config"
)
func init() {
@ -17,10 +17,9 @@ func init() {
}
configs := []config.Config{
neko.Service.Configs.Capture,
neko.Service.Configs.Server,
neko.Service.Configs.WebRTC,
neko.Service.Configs.Remote,
neko.Service.Configs.Broadcast,
neko.Service.Configs.WebSocket,
}

View File

@ -1,83 +0,0 @@
package broadcast
import (
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"demodesk/neko/internal/gst"
"demodesk/neko/internal/types/config"
)
type BroadcastManager struct {
logger zerolog.Logger
pipeline *gst.Pipeline
remote *config.Remote
config *config.Broadcast
enabled bool
url string
}
func New(remote *config.Remote, config *config.Broadcast) *BroadcastManager {
return &BroadcastManager{
logger: log.With().Str("module", "remote").Logger(),
remote: remote,
config: config,
enabled: false,
url: "",
}
}
func (manager *BroadcastManager) Start() {
if !manager.enabled || manager.IsActive() {
return
}
var err error
manager.pipeline, err = gst.CreateRTMPPipeline(
manager.remote.Device,
manager.remote.Display,
manager.config.Pipeline,
manager.url,
)
manager.logger.Info().
Str("audio_device", manager.remote.Device).
Str("video_display", manager.remote.Display).
Str("rtmp_pipeline_src", manager.pipeline.Src).
Msgf("RTMP pipeline is starting...")
if err != nil {
manager.logger.Panic().Err(err).Msg("unable to create rtmp pipeline")
return
}
manager.pipeline.Play()
}
func (manager *BroadcastManager) Stop() {
if !manager.IsActive() {
return
}
manager.pipeline.Stop()
manager.pipeline = nil
}
func (manager *BroadcastManager) IsActive() bool {
return manager.pipeline != nil
}
func (manager *BroadcastManager) Create(url string) {
manager.url = url
manager.enabled = true
manager.Start()
}
func (manager *BroadcastManager) Destroy() {
manager.Stop()
manager.enabled = false
}
func (manager *BroadcastManager) GetUrl() string {
return manager.url
}

View File

@ -0,0 +1,60 @@
package capture
import (
"demodesk/neko/internal/capture/gst"
)
func (manager *CaptureManagerCtx) StartBroadcastPipeline() {
var err error
if manager.IsBoradcasting() || manager.broadcast_url == "" {
return
}
manager.logger.Info().
Str("audio_device", manager.config.Device).
Str("video_display", manager.config.Display).
Str("rtmp_pipeline_src", manager.broadcast.Src).
Msgf("Creating broadcast pipeline...")
manager.broadcast, err = gst.CreateRTMPPipeline(
manager.config.Device,
manager.config.Display,
manager.config.BroadcastPipeline,
manager.broadcast_url,
)
if err != nil {
manager.logger.Panic().Err(err).Msg("unable to create broadcast pipeline")
}
manager.broadcast.Play()
manager.logger.Info().Msgf("Starting broadcast pipeline...")
}
func (manager *CaptureManagerCtx) StopBroadcastPipeline() {
if !manager.IsBoradcasting() {
return
}
manager.broadcast.DestroyPipeline()
manager.broadcast = nil
}
func (manager *CaptureManagerCtx) StartBroadcast(url string) {
manager.broadcast_url = url
manager.StartBroadcastPipeline()
}
func (manager *CaptureManagerCtx) StopBroadcast() {
manager.broadcast_url = ""
manager.StopBroadcastPipeline()
}
func (manager *CaptureManagerCtx) IsBoradcasting() bool {
return manager.broadcast != nil
}
func (manager *CaptureManagerCtx) BroadcastUrl() string {
return manager.broadcast_url
}

View File

@ -4,7 +4,6 @@ package gst
#cgo pkg-config: gstreamer-1.0 gstreamer-app-1.0
#include "gst.h"
*/
import "C"
import (
@ -17,26 +16,6 @@ import (
"demodesk/neko/internal/types"
)
/*
apt-get install \
libgstreamer1.0-0 \
gstreamer1.0-plugins-base \
gstreamer1.0-plugins-good \
gstreamer1.0-plugins-bad \
gstreamer1.0-plugins-ugly\
gstreamer1.0-libav \
gstreamer1.0-doc \
gstreamer1.0-tools \
gstreamer1.0-x \
gstreamer1.0-alsa \
gstreamer1.0-pulseaudio
gst-inspect-1.0 --version
gst-inspect-1.0 plugin
gst-launch-1.0 ximagesrc show-pointer=true use-damage=false ! video/x-raw,framerate=30/1 ! videoconvert ! queue ! vp8enc error-resilient=partitions keyframe-max-dist=10 auto-alt-ref=true cpu-used=5 deadline=1 ! autovideosink
gst-launch-1.0 pulsesrc ! audioconvert ! opusenc ! autoaudiosink
*/
// Pipeline is a wrapper for a GStreamer Pipeline
type Pipeline struct {
Pipeline *C.GstElement

175
internal/capture/manager.go Normal file
View File

@ -0,0 +1,175 @@
package capture
import (
"fmt"
"github.com/kataras/go-events"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"demodesk/neko/internal/types"
"demodesk/neko/internal/config"
"demodesk/neko/internal/capture/gst"
)
type CaptureManagerCtx struct {
logger zerolog.Logger
video *gst.Pipeline
audio *gst.Pipeline
broadcast *gst.Pipeline
config *config.Capture
shutdown chan bool
emmiter events.EventEmmiter
streaming bool
broadcast_url string
desktop types.DesktopManager
}
func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCtx {
return &CaptureManagerCtx{
logger: log.With().Str("module", "capture").Logger(),
shutdown: make(chan bool),
emmiter: events.New(),
config: config,
streaming: false,
broadcast_url: "",
desktop: desktop,
}
}
func (manager *CaptureManagerCtx) Start() {
manager.logger.Info().
Str("screen_resolution", fmt.Sprintf("%dx%d@%d", manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate)).
Msgf("Setting screen resolution...")
if err := manager.desktop.ChangeScreenSize(manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate); err != nil {
manager.logger.Warn().Err(err).Msg("unable to change screen size")
}
manager.CreateVideoPipeline()
manager.CreateAudioPipeline()
manager.StartBroadcastPipeline()
go func() {
defer func() {
manager.logger.Info().Msg("shutdown")
}()
for {
select {
case <-manager.shutdown:
return
case sample := <-manager.video.Sample:
manager.emmiter.Emit("video", sample)
case sample := <-manager.audio.Sample:
manager.emmiter.Emit("audio", sample)
}
}
}()
}
func (manager *CaptureManagerCtx) Shutdown() error {
manager.logger.Info().Msgf("capture shutting down")
manager.video.DestroyPipeline()
manager.audio.DestroyPipeline()
manager.StopBroadcastPipeline()
manager.shutdown <- true
return nil
}
func (manager *CaptureManagerCtx) VideoCodec() string {
return manager.config.VideoCodec
}
func (manager *CaptureManagerCtx) AudioCodec() string {
return manager.config.AudioCodec
}
func (manager *CaptureManagerCtx) OnVideoFrame(listener func(sample types.Sample)) {
manager.emmiter.On("video", func(payload ...interface{}) {
listener(payload[0].(types.Sample))
})
}
func (manager *CaptureManagerCtx) OnAudioFrame(listener func(sample types.Sample)) {
manager.emmiter.On("audio", func(payload ...interface{}) {
listener(payload[0].(types.Sample))
})
}
func (manager *CaptureManagerCtx) StartStream() {
manager.logger.Info().Msgf("Pipelines starting...")
manager.video.Start()
manager.audio.Start()
manager.streaming = true
}
func (manager *CaptureManagerCtx) StopStream() {
manager.logger.Info().Msgf("Pipelines shutting down...")
manager.video.Stop()
manager.audio.Stop()
manager.streaming = false
}
func (manager *CaptureManagerCtx) Streaming() bool {
return manager.streaming
}
func (manager *CaptureManagerCtx) CreateVideoPipeline() {
var err error
manager.logger.Info().
Str("video_codec", manager.config.VideoCodec).
Str("video_display", manager.config.Display).
Str("video_params", manager.config.VideoParams).
Msgf("Creating video pipeline...")
manager.video, err = gst.CreateAppPipeline(
manager.config.VideoCodec,
manager.config.Display,
manager.config.VideoParams,
)
if err != nil {
manager.logger.Panic().Err(err).Msg("unable to create video pipeline")
}
}
func (manager *CaptureManagerCtx) CreateAudioPipeline() {
var err error
manager.logger.Info().
Str("audio_codec", manager.config.AudioCodec).
Str("audio_display", manager.config.Device).
Str("audio_params", manager.config.AudioParams).
Msgf("Creating audio pipeline...")
manager.audio, err = gst.CreateAppPipeline(
manager.config.AudioCodec,
manager.config.Device,
manager.config.AudioParams,
)
if err != nil {
manager.logger.Panic().Err(err).Msg("unable to create audio pipeline")
}
}
func (manager *CaptureManagerCtx) ChangeResolution(width int, height int, rate int) error {
manager.video.DestroyPipeline()
manager.StopBroadcastPipeline()
defer func() {
manager.CreateVideoPipeline()
manager.video.Start()
manager.logger.Info().Msg("starting video pipeline...")
manager.StartBroadcastPipeline()
}()
return manager.desktop.ChangeScreenSize(width, height, rate)
}

View File

@ -9,7 +9,7 @@ import (
"github.com/spf13/viper"
)
type Remote struct {
type Capture struct {
Display string
Device string
AudioCodec string
@ -19,9 +19,10 @@ type Remote struct {
ScreenWidth int
ScreenHeight int
ScreenRate int
BroadcastPipeline string
}
func (Remote) Init(cmd *cobra.Command) error {
func (Capture) Init(cmd *cobra.Command) error {
cmd.PersistentFlags().String("display", ":99.0", "XDisplay to capture")
if err := viper.BindPFlag("display", cmd.PersistentFlags().Lookup("display")); err != nil {
return err
@ -84,10 +85,16 @@ func (Remote) Init(cmd *cobra.Command) error {
return err
}
// broadcast
cmd.PersistentFlags().String("broadcast_pipeline", "", "audio video codec parameters to use for broadcasting")
if err := viper.BindPFlag("broadcast_pipeline", cmd.PersistentFlags().Lookup("broadcast_pipeline")); err != nil {
return err
}
return nil
}
func (s *Remote) Set() {
func (s *Capture) Set() {
videoCodec := webrtc.VP8
if viper.GetBool("vp8") {
videoCodec = webrtc.VP8
@ -133,4 +140,6 @@ func (s *Remote) Set() {
s.ScreenRate = int(rate)
}
}
s.BroadcastPipeline = viper.GetString("broadcast_pipeline")
}

View File

@ -0,0 +1,13 @@
package desktop
import (
"demodesk/neko/internal/desktop/clipboard"
)
func (manager *DesktopManagerCtx) ReadClipboard() string {
return clipboard.ReadClipboard()
}
func (manager *DesktopManagerCtx) WriteClipboard(data string) {
clipboard.WriteClipboard(data)
}

View File

@ -0,0 +1,21 @@
#include "clipboard.h"
static clipboard_c *CLIPBOARD = NULL;
clipboard_c *getClipboard(void) {
if (CLIPBOARD == NULL) {
CLIPBOARD = clipboard_new(NULL);
}
return CLIPBOARD;
}
void ClipboardSet(char *src) {
clipboard_c *cb = getClipboard();
clipboard_set_text_ex(cb, src, strlen(src), 0);
}
char *ClipboardGet() {
clipboard_c *cb = getClipboard();
return clipboard_text_ex(cb, NULL, 0);
}

View File

@ -0,0 +1,36 @@
package clipboard
/*
#cgo linux CFLAGS: -I/usr/src -I/usr/local/include/
#cgo linux LDFLAGS: /usr/local/lib/libclipboard.a -L/usr/src -L/usr/local/lib -lxcb
#include "clipboard.h"
*/
import "C"
import (
"sync"
"unsafe"
)
var mu = sync.Mutex{}
func ReadClipboard() string {
mu.Lock()
defer mu.Unlock()
clipboardUnsafe := C.ClipboardGet()
defer C.free(unsafe.Pointer(clipboardUnsafe))
return C.GoString(clipboardUnsafe)
}
func WriteClipboard(data string) {
mu.Lock()
defer mu.Unlock()
clipboardUnsafe := C.CString(data)
defer C.free(unsafe.Pointer(clipboardUnsafe))
C.ClipboardSet(clipboardUnsafe)
}

View File

@ -0,0 +1,9 @@
#pragma once
#include <libclipboard.h>
#include <string.h>
clipboard_c *getClipboard(void);
void ClipboardSet(char *src);
char *ClipboardGet();

View File

@ -0,0 +1,53 @@
package desktop
import (
"time"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"demodesk/neko/internal/desktop/xorg"
)
type DesktopManagerCtx struct {
logger zerolog.Logger
cleanup *time.Ticker
shutdown chan bool
display string
}
func New(display string) *DesktopManagerCtx {
return &DesktopManagerCtx{
logger: log.With().Str("module", "desktop").Logger(),
cleanup: time.NewTicker(1 * time.Second),
shutdown: make(chan bool),
display: display,
}
}
func (manager *DesktopManagerCtx) Start() {
xorg.Display(manager.display)
go func() {
defer func() {
manager.logger.Info().Msg("shutdown")
}()
for {
select {
case <-manager.shutdown:
return
case <-manager.cleanup.C:
xorg.CheckKeys(time.Second * 10)
}
}
}()
}
func (manager *DesktopManagerCtx) Shutdown() error {
manager.logger.Info().Msgf("remote shutting down")
manager.cleanup.Stop()
manager.shutdown <- true
return nil
}

54
internal/desktop/xorg.go Normal file
View File

@ -0,0 +1,54 @@
package desktop
import (
"demodesk/neko/internal/types"
"demodesk/neko/internal/desktop/xorg"
)
func (manager *DesktopManagerCtx) ChangeScreenSize(width int, height int, rate int) error {
return xorg.ChangeScreenSize(width, height, rate)
}
func (manager *DesktopManagerCtx) Move(x, y int) {
xorg.Move(x, y)
}
func (manager *DesktopManagerCtx) Scroll(x, y int) {
xorg.Scroll(x, y)
}
func (manager *DesktopManagerCtx) ButtonDown(code int) error {
return xorg.ButtonDown(code)
}
func (manager *DesktopManagerCtx) KeyDown(code uint64) error {
return xorg.KeyDown(code)
}
func (manager *DesktopManagerCtx) ButtonUp(code int) error {
return xorg.ButtonUp(code)
}
func (manager *DesktopManagerCtx) KeyUp(code uint64) error {
return xorg.KeyUp(code)
}
func (manager *DesktopManagerCtx) ResetKeys() {
xorg.ResetKeys()
}
func (manager *DesktopManagerCtx) ScreenConfigurations() map[int]types.ScreenConfiguration {
return xorg.ScreenConfigurations
}
func (manager *DesktopManagerCtx) GetScreenSize() *types.ScreenSize {
return xorg.GetScreenSize()
}
func (manager *DesktopManagerCtx) SetKeyboardLayout(layout string) {
xorg.SetKeyboardLayout(layout)
}
func (manager *DesktopManagerCtx) SetKeyboardModifiers(NumLock int, CapsLock int, ScrollLock int) {
xorg.SetKeyboardModifiers(NumLock, CapsLock, ScrollLock)
}

View File

@ -1,6 +1,5 @@
#include "xorg.h"
static clipboard_c *CLIPBOARD = NULL;
static Display *DISPLAY = NULL;
static char *NAME = ":0.0";
static int REGISTERED = 0;
@ -33,13 +32,6 @@ Display *getXDisplay(void) {
return DISPLAY;
}
clipboard_c *getClipboard(void) {
if (CLIPBOARD == NULL) {
CLIPBOARD = clipboard_new(NULL);
}
return CLIPBOARD;
}
void XDisplayClose(void) {
if (DISPLAY != NULL) {
XCloseDisplay(DISPLAY);
@ -118,16 +110,6 @@ void XKey(unsigned long key, int down) {
}
}
void XClipboardSet(char *src) {
clipboard_c *cb = getClipboard();
clipboard_set_text_ex(cb, src, strlen(src), 0);
}
char *XClipboardGet() {
clipboard_c *cb = getClipboard();
return clipboard_text_ex(cb, NULL, 0);
}
void XGetScreenConfigurations() {
Display *display = getXDisplay();
Window root = RootWindow(display, 0);

View File

@ -2,7 +2,7 @@ package xorg
/*
#cgo linux CFLAGS: -I/usr/src -I/usr/local/include/
#cgo linux LDFLAGS: /usr/local/lib/libclipboard.a -L/usr/src -L/usr/local/lib -lX11 -lXtst -lXrandr -lxcb
#cgo linux LDFLAGS: -L/usr/src -L/usr/local/lib -lX11 -lXtst -lXrandr -lxcb
#include "xorg.h"
*/
@ -108,26 +108,6 @@ func KeyUp(code uint64) error {
return nil
}
func ReadClipboard() string {
mu.Lock()
defer mu.Unlock()
clipboardUnsafe := C.XClipboardGet()
defer C.free(unsafe.Pointer(clipboardUnsafe))
return C.GoString(clipboardUnsafe)
}
func WriteClipboard(data string) {
mu.Lock()
defer mu.Unlock()
clipboardUnsafe := C.CString(data)
defer C.free(unsafe.Pointer(clipboardUnsafe))
C.XClipboardSet(clipboardUnsafe)
}
func ResetKeys() {
for code := range debounce_button {
//nolint

View File

@ -0,0 +1,41 @@
#pragma once
#ifndef XDISPLAY_H
#define XDISPLAY_H
#include <X11/Xlib.h>
#include <X11/XKBlib.h>
#include <X11/extensions/Xrandr.h>
#include <X11/extensions/XTest.h>
#include <stdint.h>
#include <stdlib.h>
#include <stdio.h> /* For fputs() */
#include <string.h> /* For strdup() */
extern void goCreateScreenSize(int index, int width, int height, int mwidth, int mheight);
extern void goSetScreenRates(int index, int rate_index, short rate);
/* Returns the main display, closed either on exit or when closeMainDisplay()
* is invoked. This removes a bit of the overhead of calling XOpenDisplay() &
* XCloseDisplay() everytime the main display needs to be used.
*
* Note that this is almost certainly not thread safe. */
Display *getXDisplay(void);
void XMove(int x, int y);
void XScroll(int x, int y);
void XButton(unsigned int button, int down);
void XKey(unsigned long key, int down);
void XGetScreenConfigurations();
void XSetScreenConfiguration(int index, short rate);
int XGetScreenSize();
short XGetScreenRate();
void XDisplayClose(void);
void XDisplaySet(char *input);
void SetKeyboardLayout(char *layout);
void SetKeyboardModifiers(int num_lock, int caps_lock, int scroll_lock);
#endif

View File

@ -11,26 +11,19 @@ import (
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"demodesk/neko/internal/api"
"demodesk/neko/internal/http/endpoint"
"demodesk/neko/internal/types"
"demodesk/neko/internal/types/config"
"demodesk/neko/internal/config"
"demodesk/neko/internal/http/endpoint"
)
type Server struct {
type ServerCtx struct {
logger zerolog.Logger
router *chi.Mux
http *http.Server
conf *config.Server
}
func New(
sessions types.SessionManager,
remote types.RemoteManager,
broadcast types.BroadcastManager,
webSocketHandler types.WebSocketHandler,
conf *config.Server,
) *Server {
func New(webSocketHandler types.WebSocketManager, conf *config.Server) *ServerCtx {
logger := log.With().Str("module", "http").Logger()
router := chi.NewRouter()
@ -38,10 +31,6 @@ func New(
router.Use(middleware.RequestID) // Create a request ID for each request
router.Use(Logger) // Log API request calls using custom logger function
// Mount REST API
apiManager := api.New(sessions, remote, broadcast, webSocketHandler, conf)
apiManager.Mount(router)
router.Get("/ws", func(w http.ResponseWriter, r *http.Request) {
if webSocketHandler.Upgrade(w, r) != nil {
//nolint
@ -70,7 +59,7 @@ func New(
Handler: router,
}
return &Server{
return &ServerCtx{
logger: logger,
router: router,
http: http,
@ -78,7 +67,7 @@ func New(
}
}
func (s *Server) Start() {
func (s *ServerCtx) Start() {
if s.conf.Cert != "" && s.conf.Key != "" {
go func() {
if err := s.http.ListenAndServeTLS(s.conf.Cert, s.conf.Key); err != http.ErrServerClosed {
@ -96,6 +85,6 @@ func (s *Server) Start() {
}
}
func (s *Server) Shutdown() error {
func (s *ServerCtx) Shutdown() error {
return s.http.Shutdown(context.Background())
}

View File

@ -1,233 +0,0 @@
package remote
import (
"fmt"
"time"
"github.com/kataras/go-events"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"demodesk/neko/internal/gst"
"demodesk/neko/internal/types"
"demodesk/neko/internal/types/config"
"demodesk/neko/internal/xorg"
)
type RemoteManager struct {
logger zerolog.Logger
video *gst.Pipeline
audio *gst.Pipeline
config *config.Remote
broadcast types.BroadcastManager
cleanup *time.Ticker
shutdown chan bool
emmiter events.EventEmmiter
streaming bool
}
func New(config *config.Remote, broadcast types.BroadcastManager) *RemoteManager {
return &RemoteManager{
logger: log.With().Str("module", "remote").Logger(),
cleanup: time.NewTicker(1 * time.Second),
shutdown: make(chan bool),
emmiter: events.New(),
config: config,
broadcast: broadcast,
streaming: false,
}
}
func (manager *RemoteManager) VideoCodec() string {
return manager.config.VideoCodec
}
func (manager *RemoteManager) AudioCodec() string {
return manager.config.AudioCodec
}
func (manager *RemoteManager) Start() {
xorg.Display(manager.config.Display)
manager.logger.Info().
Str("screen_resolution", fmt.Sprintf("%dx%d@%d", manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate)).
Msgf("Setting screen resolution...")
if err := xorg.ChangeScreenSize(manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate); err != nil {
manager.logger.Warn().Err(err).Msg("unable to change screen size")
}
manager.CreateVideoPipeline()
manager.CreateAudioPipeline()
manager.broadcast.Start()
go func() {
defer func() {
manager.logger.Info().Msg("shutdown")
}()
for {
select {
case <-manager.shutdown:
return
case sample := <-manager.video.Sample:
manager.emmiter.Emit("video", sample)
case sample := <-manager.audio.Sample:
manager.emmiter.Emit("audio", sample)
case <-manager.cleanup.C:
xorg.CheckKeys(time.Second * 10)
}
}
}()
}
func (manager *RemoteManager) Shutdown() error {
manager.logger.Info().Msgf("remote shutting down")
manager.video.DestroyPipeline()
manager.audio.DestroyPipeline()
manager.broadcast.Stop()
manager.cleanup.Stop()
manager.shutdown <- true
return nil
}
func (manager *RemoteManager) OnVideoFrame(listener func(sample types.Sample)) {
manager.emmiter.On("video", func(payload ...interface{}) {
listener(payload[0].(types.Sample))
})
}
func (manager *RemoteManager) OnAudioFrame(listener func(sample types.Sample)) {
manager.emmiter.On("audio", func(payload ...interface{}) {
listener(payload[0].(types.Sample))
})
}
func (manager *RemoteManager) StartStream() {
manager.logger.Info().Msgf("Pipelines starting...")
manager.video.Start()
manager.audio.Start()
manager.streaming = true
}
func (manager *RemoteManager) StopStream() {
manager.logger.Info().Msgf("Pipelines shutting down...")
manager.video.Stop()
manager.audio.Stop()
manager.streaming = false
}
func (manager *RemoteManager) Streaming() bool {
return manager.streaming
}
func (manager *RemoteManager) CreateVideoPipeline() {
var err error
manager.logger.Info().
Str("video_codec", manager.config.VideoCodec).
Str("video_display", manager.config.Display).
Str("video_params", manager.config.VideoParams).
Msgf("Creating video pipeline...")
manager.video, err = gst.CreateAppPipeline(
manager.config.VideoCodec,
manager.config.Display,
manager.config.VideoParams,
)
if err != nil {
manager.logger.Panic().Err(err).Msg("unable to create video pipeline")
}
}
func (manager *RemoteManager) CreateAudioPipeline() {
var err error
manager.logger.Info().
Str("audio_codec", manager.config.AudioCodec).
Str("audio_display", manager.config.Device).
Str("audio_params", manager.config.AudioParams).
Msgf("Creating audio pipeline...")
manager.audio, err = gst.CreateAppPipeline(
manager.config.AudioCodec,
manager.config.Device,
manager.config.AudioParams,
)
if err != nil {
manager.logger.Panic().Err(err).Msg("unable to create audio pipeline")
}
}
func (manager *RemoteManager) ChangeResolution(width int, height int, rate int) error {
manager.video.DestroyPipeline()
manager.broadcast.Stop()
defer func() {
manager.CreateVideoPipeline()
manager.video.Start()
manager.broadcast.Start()
manager.logger.Info().Msg("starting video pipeline...")
}()
return xorg.ChangeScreenSize(width, height, rate)
}
func (manager *RemoteManager) Move(x, y int) {
xorg.Move(x, y)
}
func (manager *RemoteManager) Scroll(x, y int) {
xorg.Scroll(x, y)
}
func (manager *RemoteManager) ButtonDown(code int) error {
return xorg.ButtonDown(code)
}
func (manager *RemoteManager) KeyDown(code uint64) error {
return xorg.KeyDown(code)
}
func (manager *RemoteManager) ButtonUp(code int) error {
return xorg.ButtonUp(code)
}
func (manager *RemoteManager) KeyUp(code uint64) error {
return xorg.KeyUp(code)
}
func (manager *RemoteManager) ReadClipboard() string {
return xorg.ReadClipboard()
}
func (manager *RemoteManager) WriteClipboard(data string) {
xorg.WriteClipboard(data)
}
func (manager *RemoteManager) ResetKeys() {
xorg.ResetKeys()
}
func (manager *RemoteManager) ScreenConfigurations() map[int]types.ScreenConfiguration {
return xorg.ScreenConfigurations
}
func (manager *RemoteManager) GetScreenSize() *types.ScreenSize {
return xorg.GetScreenSize()
}
func (manager *RemoteManager) SetKeyboardLayout(layout string) {
xorg.SetKeyboardLayout(layout)
}
func (manager *RemoteManager) SetKeyboardModifiers(NumLock int, CapsLock int, ScrollLock int) {
xorg.SetKeyboardModifiers(NumLock, CapsLock, ScrollLock)
}

View File

@ -9,26 +9,26 @@ import (
"demodesk/neko/internal/utils"
)
func New(remote types.RemoteManager) *SessionManager {
return &SessionManager{
func New(capture types.CaptureManager) *SessionManagerCtx {
return &SessionManagerCtx{
logger: log.With().Str("module", "session").Logger(),
host: nil,
remote: remote,
members: make(map[string]*Session),
capture: capture,
members: make(map[string]*SessionCtx),
emmiter: events.New(),
}
}
type SessionManager struct {
type SessionManagerCtx struct {
logger zerolog.Logger
host types.Session
remote types.RemoteManager
members map[string]*Session
capture types.CaptureManager
members map[string]*SessionCtx
emmiter events.EventEmmiter
}
func (manager *SessionManager) New(id string, admin bool, socket types.WebSocket) types.Session {
session := &Session{
func (manager *SessionManagerCtx) New(id string, admin bool, socket types.WebSocket) types.Session {
session := &SessionCtx{
id: id,
admin: admin,
manager: manager,
@ -40,80 +40,31 @@ func (manager *SessionManager) New(id string, admin bool, socket types.WebSocket
manager.members[id] = session
manager.emmiter.Emit("created", session)
if !manager.remote.Streaming() && len(manager.members) > 0 {
manager.remote.StartStream()
if !manager.capture.Streaming() && len(manager.members) > 0 {
manager.capture.StartStream()
}
return session
}
func (manager *SessionManager) HasHost() bool {
return manager.host != nil
}
func (manager *SessionManager) SetHost(host types.Session) {
manager.host = host
manager.emmiter.Emit("host", host)
}
func (manager *SessionManager) GetHost() types.Session {
return manager.host
}
func (manager *SessionManager) ClearHost() {
host := manager.host
manager.host = nil
manager.emmiter.Emit("host_cleared", host)
}
func (manager *SessionManager) Has(id string) bool {
_, ok := manager.members[id]
return ok
}
func (manager *SessionManager) Get(id string) (types.Session, bool) {
func (manager *SessionManagerCtx) Get(id string) (types.Session, bool) {
session, ok := manager.members[id]
return session, ok
}
func (manager *SessionManager) Admins() []*types.Member {
members := []*types.Member{}
for _, session := range manager.members {
if !session.connected || !session.admin {
continue
}
member := session.Member()
if member != nil {
members = append(members, member)
}
}
return members
func (manager *SessionManagerCtx) Has(id string) bool {
_, ok := manager.members[id]
return ok
}
func (manager *SessionManager) Members() []*types.Member {
members := []*types.Member{}
for _, session := range manager.members {
if !session.connected {
continue
}
member := session.Member()
if member != nil {
members = append(members, member)
}
}
return members
}
func (manager *SessionManager) Destroy(id string) error {
func (manager *SessionManagerCtx) Destroy(id string) error {
session, ok := manager.members[id]
if ok {
delete(manager.members, id)
err := session.destroy()
if !manager.remote.Streaming() && len(manager.members) <= 0 {
manager.remote.StopStream()
if !manager.capture.Streaming() && len(manager.members) <= 0 {
manager.capture.StopStream()
}
manager.emmiter.Emit("destroy", id)
@ -123,7 +74,58 @@ func (manager *SessionManager) Destroy(id string) error {
return nil
}
func (manager *SessionManager) Broadcast(v interface{}, exclude interface{}) error {
// ---
// host
// ---
func (manager *SessionManagerCtx) HasHost() bool {
return manager.host != nil
}
func (manager *SessionManagerCtx) SetHost(host types.Session) {
manager.host = host
manager.emmiter.Emit("host", host)
}
func (manager *SessionManagerCtx) GetHost() types.Session {
return manager.host
}
func (manager *SessionManagerCtx) ClearHost() {
host := manager.host
manager.host = nil
manager.emmiter.Emit("host_cleared", host)
}
// ---
// members list
// ---
func (manager *SessionManagerCtx) Admins() []types.Session {
var sessions []types.Session
for _, session := range manager.members {
if !session.connected || !session.admin {
continue
}
sessions = append(sessions, session)
}
return sessions
}
func (manager *SessionManagerCtx) Members() []types.Session {
var sessions []types.Session
for _, session := range manager.members {
if !session.connected {
continue
}
sessions = append(sessions, session)
}
return sessions
}
func (manager *SessionManagerCtx) Broadcast(v interface{}, exclude interface{}) error {
for id, session := range manager.members {
if !session.connected {
continue
@ -142,32 +144,35 @@ func (manager *SessionManager) Broadcast(v interface{}, exclude interface{}) err
return nil
}
func (manager *SessionManager) OnHost(listener func(session types.Session)) {
// ---
// events
// ---
func (manager *SessionManagerCtx) OnHost(listener func(session types.Session)) {
manager.emmiter.On("host", func(payload ...interface{}) {
listener(payload[0].(*Session))
listener(payload[0].(*SessionCtx))
})
}
func (manager *SessionManager) OnHostCleared(listener func(session types.Session)) {
func (manager *SessionManagerCtx) OnHostCleared(listener func(session types.Session)) {
manager.emmiter.On("host_cleared", func(payload ...interface{}) {
listener(payload[0].(*Session))
listener(payload[0].(*SessionCtx))
})
}
func (manager *SessionManager) OnDestroy(listener func(id string)) {
func (manager *SessionManagerCtx) OnDestroy(listener func(id string)) {
manager.emmiter.On("destroy", func(payload ...interface{}) {
listener(payload[0].(string))
})
}
func (manager *SessionManager) OnCreated(listener func(session types.Session)) {
func (manager *SessionManagerCtx) OnCreated(listener func(session types.Session)) {
manager.emmiter.On("created", func(payload ...interface{}) {
listener(payload[0].(*Session))
listener(payload[0].(*SessionCtx))
})
}
func (manager *SessionManager) OnConnected(listener func(session types.Session)) {
func (manager *SessionManagerCtx) OnConnected(listener func(session types.Session)) {
manager.emmiter.On("connected", func(payload ...interface{}) {
listener(payload[0].(*Session))
listener(payload[0].(*SessionCtx))
})
}

View File

@ -8,43 +8,43 @@ import (
"demodesk/neko/internal/types/message"
)
type Session struct {
type SessionCtx struct {
logger zerolog.Logger
id string
name string
admin bool
muted bool
connected bool
manager *SessionManager
manager *SessionManagerCtx
socket types.WebSocket
peer types.Peer
}
func (session *Session) ID() string {
func (session *SessionCtx) ID() string {
return session.id
}
func (session *Session) Name() string {
func (session *SessionCtx) Name() string {
return session.name
}
func (session *Session) Admin() bool {
func (session *SessionCtx) Admin() bool {
return session.admin
}
func (session *Session) Muted() bool {
func (session *SessionCtx) Muted() bool {
return session.muted
}
func (session *Session) IsHost() bool {
func (session *SessionCtx) IsHost() bool {
return session.manager.host != nil && session.manager.host.ID() == session.ID()
}
func (session *Session) Connected() bool {
func (session *SessionCtx) Connected() bool {
return session.connected
}
func (session *Session) Address() string {
func (session *SessionCtx) Address() string {
if session.socket == nil {
return ""
}
@ -52,41 +52,33 @@ func (session *Session) Address() string {
return session.socket.Address()
}
func (session *Session) Member() *types.Member {
return &types.Member{
ID: session.id,
Name: session.name,
Admin: session.admin,
Muted: session.muted,
}
}
func (session *Session) SetMuted(muted bool) {
func (session *SessionCtx) SetMuted(muted bool) {
session.muted = muted
}
func (session *Session) SetName(name string) {
func (session *SessionCtx) SetName(name string) {
session.name = name
}
func (session *Session) SetSocket(socket types.WebSocket) {
func (session *SessionCtx) SetSocket(socket types.WebSocket) {
session.socket = socket
}
func (session *Session) SetPeer(peer types.Peer) {
func (session *SessionCtx) SetPeer(peer types.Peer) {
session.peer = peer
}
func (session *Session) SetConnected() {
func (session *SessionCtx) SetConnected() {
session.connected = true
session.manager.emmiter.Emit("connected", session)
}
func (session *Session) Disconnect(reason string) error {
func (session *SessionCtx) Disconnect(reason string) error {
if session.socket == nil {
return nil
}
// TODO: Refcator
if err := session.socket.Send(&message.Disconnect{
Event: event.SYSTEM_DISCONNECT,
Message: reason,
@ -97,7 +89,7 @@ func (session *Session) Disconnect(reason string) error {
return session.manager.Destroy(session.id)
}
func (session *Session) Send(v interface{}) error {
func (session *SessionCtx) Send(v interface{}) error {
if session.socket == nil {
return nil
}
@ -105,7 +97,7 @@ func (session *Session) Send(v interface{}) error {
return session.socket.Send(v)
}
func (session *Session) SignalAnswer(sdp string) error {
func (session *SessionCtx) SignalAnswer(sdp string) error {
if session.peer == nil {
return nil
}
@ -113,7 +105,7 @@ func (session *Session) SignalAnswer(sdp string) error {
return session.peer.SignalAnswer(sdp)
}
func (session *Session) destroy() error {
func (session *SessionCtx) destroy() error {
if session.socket != nil {
if err := session.socket.Destroy(); err != nil {
return err

View File

@ -1,10 +0,0 @@
package types
type BroadcastManager interface {
Start()
Stop()
IsActive() bool
Create(url string)
Destroy()
GetUrl() string
}

29
internal/types/capture.go Normal file
View File

@ -0,0 +1,29 @@
package types
type Sample struct {
Data []byte
Samples uint32
}
type CaptureManager interface {
Start()
Shutdown() error
VideoCodec() string
AudioCodec() string
OnVideoFrame(listener func(sample Sample))
OnAudioFrame(listener func(sample Sample))
StartStream()
StopStream()
Streaming() bool
ChangeResolution(width int, height int, rate int) error
// broacast
StartBroadcast(url string)
StopBroadcast()
IsBoradcasting() bool
BroadcastUrl() string
}

View File

@ -1,23 +0,0 @@
package config
import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
type Broadcast struct {
Pipeline string
}
func (Broadcast) Init(cmd *cobra.Command) error {
cmd.PersistentFlags().String("broadcast_pipeline", "", "audio codec parameters to use for broadcasting")
if err := viper.BindPFlag("broadcast_pipeline", cmd.PersistentFlags().Lookup("broadcast_pipeline")); err != nil {
return err
}
return nil
}
func (s *Broadcast) Set() {
s.Pipeline = viper.GetString("broadcast_pipeline")
}

View File

@ -1,27 +1,36 @@
package types
type RemoteManager interface {
VideoCodec() string
AudioCodec() string
type ScreenSize struct {
Width int `json:"width"`
Height int `json:"height"`
Rate int16 `json:"rate"`
}
type ScreenConfiguration struct {
Width int `json:"width"`
Height int `json:"height"`
Rates map[int]int16 `json:"rates"`
}
type DesktopManager interface {
Start()
Shutdown() error
OnVideoFrame(listener func(sample Sample))
OnAudioFrame(listener func(sample Sample))
StartStream()
StopStream()
Streaming() bool
ChangeResolution(width int, height int, rate int) error
GetScreenSize() *ScreenSize
ScreenConfigurations() map[int]ScreenConfiguration
// xorg
ChangeScreenSize(width int, height int, rate int) error
Move(x, y int)
Scroll(x, y int)
ButtonDown(code int) error
KeyDown(code uint64) error
ButtonUp(code int) error
KeyUp(code uint64) error
ReadClipboard() string
WriteClipboard(data string)
ResetKeys()
ScreenConfigurations() map[int]ScreenConfiguration
GetScreenSize() *ScreenSize
SetKeyboardLayout(layout string)
SetKeyboardModifiers(NumLock int, CapsLock int, ScrollLock int)
// clipboard
ReadClipboard() string
WriteClipboard(data string)
}

View File

@ -1,14 +0,0 @@
package types
type Button struct {
Name string
Code int
Keysym int
}
type Key struct {
Name string
Value string
Code int
Keysym int
}

View File

@ -29,13 +29,21 @@ type SignalAnswer struct {
type MembersList struct {
Event string `json:"event"`
Memebers []*types.Member `json:"members"`
Memebers []*MembersListEntry `json:"members"`
}
type MembersListEntry struct {
ID string `json:"id"`
Name string `json:"displayname"`
Admin bool `json:"admin"`
Muted bool `json:"muted"`
}
type Member struct {
Event string `json:"event"`
*types.Member
Member *MembersListEntry
}
type MemberDisconnected struct {
Event string `json:"event"`
ID string `json:"id"`

View File

@ -1,12 +1,5 @@
package types
type Member struct {
ID string `json:"id"`
Name string `json:"displayname"`
Admin bool `json:"admin"`
Muted bool `json:"muted"`
}
type Session interface {
ID() string
Name() string
@ -14,30 +7,32 @@ type Session interface {
Muted() bool
IsHost() bool
Connected() bool
Member() *Member
Address() string
SetMuted(muted bool)
SetName(name string)
SetConnected()
SetSocket(socket WebSocket)
SetPeer(peer Peer)
Address() string
Disconnect(message string) error
SetConnected()
Disconnect(reason string) error
Send(v interface{}) error
SignalAnswer(sdp string) error
}
type SessionManager interface {
New(id string, admin bool, socket WebSocket) Session
Get(id string) (Session, bool)
Has(id string) bool
Destroy(id string) error
HasHost() bool
SetHost(Session)
SetHost(host Session)
GetHost() Session
ClearHost()
Has(id string) bool
Get(id string) (Session, bool)
Members() []*Member
Admins() []*Member
Destroy(id string) error
Admins() []Session
Members() []Session
Broadcast(v interface{}, exclude interface{}) error
OnHost(listener func(session Session))
OnHostCleared(listener func(session Session))
OnDestroy(listener func(id string))

View File

@ -1,14 +1,9 @@
package types
type Sample struct {
Data []byte
Samples uint32
}
type WebRTCManager interface {
Start()
Shutdown() error
CreatePeer(id string, session Session) (string, bool, []string, error)
CreatePeer(session Session) (string, bool, []string, error)
}
type Peer interface {

View File

@ -8,7 +8,7 @@ type WebSocket interface {
Destroy() error
}
type WebSocketHandler interface {
type WebSocketManager interface {
Start()
Shutdown() error
Upgrade(w http.ResponseWriter, r *http.Request) error

View File

@ -1,13 +0,0 @@
package types
type ScreenSize struct {
Width int `json:"width"`
Height int `json:"height"`
Rate int16 `json:"rate"`
}
type ScreenConfiguration struct {
Width int `json:"width"`
Height int `json:"height"`
Rates map[int]int16 `json:"rates"`
}

View File

@ -6,15 +6,15 @@ import (
"strconv"
"github.com/pion/webrtc/v2"
"demodesk/neko/internal/types"
)
const OP_MOVE = 0x01
const OP_SCROLL = 0x02
const OP_KEY_DOWN = 0x03
const OP_KEY_UP = 0x04
const OP_KEY_CLK = 0x05
const (
OP_MOVE = 0x01
OP_SCROLL = 0x02
OP_KEY_DOWN = 0x03
OP_KEY_UP = 0x04
OP_KEY_CLK = 0x05
)
type PayloadHeader struct {
Event uint8
@ -38,11 +38,7 @@ type PayloadKey struct {
Key uint64
}
func (manager *WebRTCManager) handle(session types.Session, msg webrtc.DataChannelMessage) error {
if !session.IsHost() {
return nil
}
func (manager *WebRTCManagerCtx) handle(msg webrtc.DataChannelMessage) error {
buffer := bytes.NewBuffer(msg.Data)
header := &PayloadHeader{}
hbytes := make([]byte, 3)
@ -64,7 +60,7 @@ func (manager *WebRTCManager) handle(session types.Session, msg webrtc.DataChann
return err
}
manager.remote.Move(int(payload.X), int(payload.Y))
manager.desktop.Move(int(payload.X), int(payload.Y))
case OP_SCROLL:
payload := &PayloadScroll{}
if err := binary.Read(buffer, binary.LittleEndian, payload); err != nil {
@ -77,7 +73,7 @@ func (manager *WebRTCManager) handle(session types.Session, msg webrtc.DataChann
Str("y", strconv.Itoa(int(payload.Y))).
Msg("scroll")
manager.remote.Scroll(int(payload.X), int(payload.Y))
manager.desktop.Scroll(int(payload.X), int(payload.Y))
case OP_KEY_DOWN:
payload := &PayloadKey{}
if err := binary.Read(buffer, binary.LittleEndian, payload); err != nil {
@ -85,7 +81,7 @@ func (manager *WebRTCManager) handle(session types.Session, msg webrtc.DataChann
}
if payload.Key < 8 {
err := manager.remote.ButtonDown(int(payload.Key))
err := manager.desktop.ButtonDown(int(payload.Key))
if err != nil {
manager.logger.Warn().Err(err).Msg("button down failed")
return nil
@ -93,7 +89,7 @@ func (manager *WebRTCManager) handle(session types.Session, msg webrtc.DataChann
manager.logger.Debug().Msgf("button down %d", payload.Key)
} else {
err := manager.remote.KeyDown(uint64(payload.Key))
err := manager.desktop.KeyDown(uint64(payload.Key))
if err != nil {
manager.logger.Warn().Err(err).Msg("key down failed")
return nil
@ -109,7 +105,7 @@ func (manager *WebRTCManager) handle(session types.Session, msg webrtc.DataChann
}
if payload.Key < 8 {
err := manager.remote.ButtonUp(int(payload.Key))
err := manager.desktop.ButtonUp(int(payload.Key))
if err != nil {
manager.logger.Warn().Err(err).Msg("button up failed")
return nil
@ -117,7 +113,7 @@ func (manager *WebRTCManager) handle(session types.Session, msg webrtc.DataChann
manager.logger.Debug().Msgf("button up %d", payload.Key)
} else {
err := manager.remote.KeyUp(uint64(payload.Key))
err := manager.desktop.KeyUp(uint64(payload.Key))
if err != nil {
manager.logger.Warn().Err(err).Msg("key up failed")
return nil

View File

@ -8,7 +8,7 @@ import (
"github.com/rs/zerolog"
)
type nulllog struct{}
type nulllog struct {}
func (l nulllog) Trace(msg string) {}
func (l nulllog) Tracef(format string, args ...interface{}) {}
@ -26,10 +26,18 @@ type logger struct {
subsystem string
}
func (l logger) Trace(msg string) { l.logger.Trace().Msg(msg) }
func (l logger) Tracef(format string, args ...interface{}) { l.logger.Trace().Msgf(format, args...) }
func (l logger) Debug(msg string) { l.logger.Debug().Msg(msg) }
func (l logger) Debugf(format string, args ...interface{}) { l.logger.Debug().Msgf(format, args...) }
func (l logger) Trace(msg string) {
l.logger.Trace().Msg(msg)
}
func (l logger) Tracef(format string, args ...interface{}) {
l.logger.Trace().Msgf(format, args...)
}
func (l logger) Debug(msg string) {
l.logger.Debug().Msg(msg)
}
func (l logger) Debugf(format string, args ...interface{}) {
l.logger.Debug().Msgf(format, args...)
}
func (l logger) Info(msg string) {
if strings.Contains(msg, "packetio.Buffer is full") {
//l.logger.Panic().Msg(msg)
@ -45,10 +53,18 @@ func (l logger) Infof(format string, args ...interface{}) {
}
l.logger.Info().Msg(msg)
}
func (l logger) Warn(msg string) { l.logger.Warn().Msg(msg) }
func (l logger) Warnf(format string, args ...interface{}) { l.logger.Warn().Msgf(format, args...) }
func (l logger) Error(msg string) { l.logger.Error().Msg(msg) }
func (l logger) Errorf(format string, args ...interface{}) { l.logger.Error().Msgf(format, args...) }
func (l logger) Warn(msg string) {
l.logger.Warn().Msg(msg)
}
func (l logger) Warnf(format string, args ...interface{}) {
l.logger.Warn().Msgf(format, args...)
}
func (l logger) Error(msg string) {
l.logger.Error().Msg(msg)
}
func (l logger) Errorf(format string, args ...interface{}) {
l.logger.Error().Msgf(format, args...)
}
type loggerFactory struct {
logger zerolog.Logger

View File

@ -4,26 +4,25 @@ import (
"github.com/pion/webrtc/v2"
)
type Peer struct {
id string
type PeerCtx struct {
api *webrtc.API
engine *webrtc.MediaEngine
manager *WebRTCManager
settings *webrtc.SettingEngine
connection *webrtc.PeerConnection
configuration *webrtc.Configuration
}
func (peer *Peer) SignalAnswer(sdp string) error {
return peer.connection.SetRemoteDescription(webrtc.SessionDescription{SDP: sdp, Type: webrtc.SDPTypeAnswer})
func (peer *PeerCtx) SignalAnswer(sdp string) error {
return peer.connection.SetRemoteDescription(webrtc.SessionDescription{
SDP: sdp,
Type: webrtc.SDPTypeAnswer,
})
}
func (peer *Peer) Destroy() error {
if peer.connection != nil && peer.connection.ConnectionState() == webrtc.PeerConnectionStateConnected {
if err := peer.connection.Close(); err != nil {
return err
}
}
func (peer *PeerCtx) Destroy() error {
if peer.connection == nil || peer.connection.ConnectionState() != webrtc.PeerConnectionStateConnected {
return nil
}
return peer.connection.Close()
}

View File

@ -12,46 +12,49 @@ import (
"github.com/rs/zerolog/log"
"demodesk/neko/internal/types"
"demodesk/neko/internal/types/config"
"demodesk/neko/internal/config"
)
func New(remote types.RemoteManager, config *config.WebRTC) *WebRTCManager {
return &WebRTCManager{
func New(desktop types.DesktopManager, capture types.CaptureManager, config *config.WebRTC) *WebRTCManagerCtx {
return &WebRTCManagerCtx{
logger: log.With().Str("module", "webrtc").Logger(),
remote: remote,
desktop: desktop,
capture: capture,
config: config,
}
}
type WebRTCManager struct {
type WebRTCManagerCtx struct {
logger zerolog.Logger
videoTrack *webrtc.Track
audioTrack *webrtc.Track
videoCodec *webrtc.RTPCodec
audioCodec *webrtc.RTPCodec
remote types.RemoteManager
desktop types.DesktopManager
capture types.CaptureManager
config *config.WebRTC
}
func (manager *WebRTCManager) Start() {
func (manager *WebRTCManagerCtx) Start() {
var err error
manager.audioTrack, manager.audioCodec, err = manager.createTrack(manager.remote.AudioCodec())
manager.audioTrack, manager.audioCodec, err = manager.createTrack(manager.capture.AudioCodec())
if err != nil {
manager.logger.Panic().Err(err).Msg("unable to create audio track")
}
manager.remote.OnAudioFrame(func(sample types.Sample) {
manager.capture.OnAudioFrame(func(sample types.Sample) {
if err := manager.audioTrack.WriteSample(media.Sample(sample)); err != nil && err != io.ErrClosedPipe {
manager.logger.Warn().Err(err).Msg("audio pipeline failed to write")
}
})
manager.videoTrack, manager.videoCodec, err = manager.createTrack(manager.remote.VideoCodec())
manager.videoTrack, manager.videoCodec, err = manager.createTrack(manager.capture.VideoCodec())
if err != nil {
manager.logger.Panic().Err(err).Msg("unable to create video track")
}
manager.remote.OnVideoFrame(func(sample types.Sample) {
manager.capture.OnVideoFrame(func(sample types.Sample) {
if err := manager.videoTrack.WriteSample(media.Sample(sample)); err != nil && err != io.ErrClosedPipe {
manager.logger.Warn().Err(err).Msg("video pipeline failed to write")
}
@ -65,12 +68,12 @@ func (manager *WebRTCManager) Start() {
Msgf("webrtc starting")
}
func (manager *WebRTCManager) Shutdown() error {
func (manager *WebRTCManagerCtx) Shutdown() error {
manager.logger.Info().Msgf("webrtc shutting down")
return nil
}
func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (string, bool, []string, error) {
func (manager *WebRTCManagerCtx) CreatePeer(session types.Session) (string, bool, []string, error) {
configuration := &webrtc.Configuration{
ICEServers: []webrtc.ICEServer{
{
@ -134,7 +137,11 @@ func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (stri
connection.OnDataChannel(func(d *webrtc.DataChannel) {
d.OnMessage(func(msg webrtc.DataChannelMessage) {
if err = manager.handle(session, msg); err != nil {
if !session.IsHost() {
return
}
if err = manager.handle(msg); err != nil {
manager.logger.Warn().Err(err).Msg("data handle failed")
}
})
@ -148,21 +155,19 @@ func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (stri
switch state {
case webrtc.PeerConnectionStateDisconnected:
case webrtc.PeerConnectionStateFailed:
manager.logger.Info().Str("id", id).Msg("peer disconnected")
manager.logger.Info().Str("id", session.ID()).Msg("peer disconnected")
if err:= session.Disconnect("peer connection state failed"); err != nil {
manager.logger.Warn().Err(err).Msg("error while disconnecting session")
}
case webrtc.PeerConnectionStateConnected:
manager.logger.Info().Str("id", id).Msg("peer connected")
manager.logger.Info().Str("id", session.ID()).Msg("peer connected")
session.SetConnected()
}
})
session.SetPeer(&Peer{
id: id,
session.SetPeer(&PeerCtx{
api: api,
engine: &engine,
manager: manager,
settings: &settings,
connection: connection,
configuration: configuration,
@ -171,7 +176,7 @@ func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (stri
return description.SDP, manager.config.ICELite, manager.config.ICEServers, nil
}
func (m *WebRTCManager) createTrack(codecName string) (*webrtc.Track, *webrtc.RTPCodec, error) {
func (m *WebRTCManagerCtx) createTrack(codecName string) (*webrtc.Track, *webrtc.RTPCodec, error) {
var codec *webrtc.RTPCodec
switch codecName {
case webrtc.VP8:

View File

@ -1,17 +0,0 @@
package broadcast
import (
"demodesk/neko/internal/types"
"demodesk/neko/internal/types/event"
"demodesk/neko/internal/types/message"
)
func ScreenConfiguration(session types.SessionManager, id string, width int, height int, rate int) error {
return session.Broadcast(message.ScreenResolution{
Event: event.SCREEN_RESOLUTION,
ID: id,
Width: width,
Height: height,
Rate: rate,
}, nil)
}

View File

@ -1,4 +1,4 @@
package websocket
package handler
import (
"strings"
@ -8,7 +8,7 @@ import (
"demodesk/neko/internal/types/message"
)
func (h *MessageHandler) adminLock(session types.Session) error {
func (h *MessageHandlerCtx) adminLock(session types.Session) error {
if !session.Admin() {
h.logger.Debug().Msg("user not admin")
return nil
@ -33,7 +33,7 @@ func (h *MessageHandler) adminLock(session types.Session) error {
return nil
}
func (h *MessageHandler) adminUnlock(session types.Session) error {
func (h *MessageHandlerCtx) adminUnlock(session types.Session) error {
if !session.Admin() {
h.logger.Debug().Msg("user not admin")
return nil
@ -58,7 +58,7 @@ func (h *MessageHandler) adminUnlock(session types.Session) error {
return nil
}
func (h *MessageHandler) adminControl(session types.Session) error {
func (h *MessageHandlerCtx) adminControl(session types.Session) error {
if !session.Admin() {
h.logger.Debug().Msg("user not admin")
return nil
@ -91,7 +91,7 @@ func (h *MessageHandler) adminControl(session types.Session) error {
return nil
}
func (h *MessageHandler) adminRelease(session types.Session) error {
func (h *MessageHandlerCtx) adminRelease(session types.Session) error {
if !session.Admin() {
h.logger.Debug().Msg("user not admin")
return nil
@ -124,7 +124,7 @@ func (h *MessageHandler) adminRelease(session types.Session) error {
return nil
}
func (h *MessageHandler) adminGive(session types.Session, payload *message.Admin) error {
func (h *MessageHandlerCtx) adminGive(session types.Session, payload *message.Admin) error {
if !session.Admin() {
h.logger.Debug().Msg("user not admin")
return nil
@ -153,7 +153,7 @@ func (h *MessageHandler) adminGive(session types.Session, payload *message.Admin
return nil
}
func (h *MessageHandler) adminMute(session types.Session, payload *message.Admin) error {
func (h *MessageHandlerCtx) adminMute(session types.Session, payload *message.Admin) error {
if !session.Admin() {
h.logger.Debug().Msg("user not admin")
return nil
@ -185,7 +185,7 @@ func (h *MessageHandler) adminMute(session types.Session, payload *message.Admin
return nil
}
func (h *MessageHandler) adminUnmute(session types.Session, payload *message.Admin) error {
func (h *MessageHandlerCtx) adminUnmute(session types.Session, payload *message.Admin) error {
if !session.Admin() {
h.logger.Debug().Msg("user not admin")
return nil
@ -212,7 +212,7 @@ func (h *MessageHandler) adminUnmute(session types.Session, payload *message.Adm
return nil
}
func (h *MessageHandler) adminKick(session types.Session, payload *message.Admin) error {
func (h *MessageHandlerCtx) adminKick(session types.Session, payload *message.Admin) error {
if !session.Admin() {
h.logger.Debug().Msg("user not admin")
return nil
@ -246,7 +246,7 @@ func (h *MessageHandler) adminKick(session types.Session, payload *message.Admin
return nil
}
func (h *MessageHandler) adminBan(session types.Session, payload *message.Admin) error {
func (h *MessageHandlerCtx) adminBan(session types.Session, payload *message.Admin) error {
if !session.Admin() {
h.logger.Debug().Msg("user not admin")
return nil

View File

@ -1,4 +1,4 @@
package websocket
package handler
import (
"demodesk/neko/internal/types"
@ -6,13 +6,13 @@ import (
"demodesk/neko/internal/types/message"
)
func (h *MessageHandler) boradcastCreate(session types.Session, payload *message.BroadcastCreate) error {
func (h *MessageHandlerCtx) boradcastCreate(session types.Session, payload *message.BroadcastCreate) error {
if !session.Admin() {
h.logger.Debug().Msg("user not admin")
return nil
}
h.broadcast.Create(payload.URL)
h.capture.StartBroadcast(payload.URL)
if err := h.boradcastStatus(session); err != nil {
return err
@ -21,13 +21,13 @@ func (h *MessageHandler) boradcastCreate(session types.Session, payload *message
return nil
}
func (h *MessageHandler) boradcastDestroy(session types.Session) error {
func (h *MessageHandlerCtx) boradcastDestroy(session types.Session) error {
if !session.Admin() {
h.logger.Debug().Msg("user not admin")
return nil
}
h.broadcast.Destroy()
h.capture.StopBroadcast()
if err := h.boradcastStatus(session); err != nil {
return err
@ -36,7 +36,7 @@ func (h *MessageHandler) boradcastDestroy(session types.Session) error {
return nil
}
func (h *MessageHandler) boradcastStatus(session types.Session) error {
func (h *MessageHandlerCtx) boradcastStatus(session types.Session) error {
if !session.Admin() {
h.logger.Debug().Msg("user not admin")
return nil
@ -45,8 +45,8 @@ func (h *MessageHandler) boradcastStatus(session types.Session) error {
if err := session.Send(
message.BroadcastStatus{
Event: event.BORADCAST_STATUS,
IsActive: h.broadcast.IsActive(),
URL: h.broadcast.GetUrl(),
IsActive: h.capture.IsBoradcasting(),
URL: h.capture.BroadcastUrl(),
}); err != nil {
h.logger.Warn().Err(err).Msgf("sending event %s has failed", event.BORADCAST_STATUS)
return err

View File

@ -1,4 +1,4 @@
package websocket
package handler
import (
"demodesk/neko/internal/types"
@ -6,7 +6,7 @@ import (
"demodesk/neko/internal/types/message"
)
func (h *MessageHandler) controlRelease(session types.Session) error {
func (h *MessageHandlerCtx) controlRelease(session types.Session) error {
// check if session is host
if !session.IsHost() {
h.logger.Debug().Str("id", session.ID()).Msg("is not the host")
@ -30,7 +30,7 @@ func (h *MessageHandler) controlRelease(session types.Session) error {
return nil
}
func (h *MessageHandler) controlRequest(session types.Session) error {
func (h *MessageHandlerCtx) controlRequest(session types.Session) error {
host := h.sessions.GetHost()
if host == nil {
@ -69,7 +69,7 @@ func (h *MessageHandler) controlRequest(session types.Session) error {
return nil
}
func (h *MessageHandler) controlGive(session types.Session, payload *message.Control) error {
func (h *MessageHandlerCtx) controlGive(session types.Session, payload *message.Control) error {
// check if session is host
if !session.IsHost() {
h.logger.Debug().Str("id", session.ID()).Msg("is not the host")
@ -99,18 +99,18 @@ func (h *MessageHandler) controlGive(session types.Session, payload *message.Con
return nil
}
func (h *MessageHandler) controlClipboard(session types.Session, payload *message.Clipboard) error {
func (h *MessageHandlerCtx) controlClipboard(session types.Session, payload *message.Clipboard) error {
// check if session is host
if !session.IsHost() {
h.logger.Debug().Str("id", session.ID()).Msg("is not the host")
return nil
}
h.remote.WriteClipboard(payload.Text)
h.desktop.WriteClipboard(payload.Text)
return nil
}
func (h *MessageHandler) controlKeyboard(session types.Session, payload *message.Keyboard) error {
func (h *MessageHandlerCtx) controlKeyboard(session types.Session, payload *message.Keyboard) error {
// check if session is host
if !session.IsHost() {
h.logger.Debug().Str("id", session.ID()).Msg("is not the host")
@ -119,7 +119,7 @@ func (h *MessageHandler) controlKeyboard(session types.Session, payload *message
// change layout
if payload.Layout != nil {
h.remote.SetKeyboardLayout(*payload.Layout)
h.desktop.SetKeyboardLayout(*payload.Layout)
}
// set num lock
@ -152,6 +152,6 @@ func (h *MessageHandler) controlKeyboard(session types.Session, payload *message
Int("ScrollLock", ScrollLock).
Msg("setting keyboard modifiers")
h.remote.SetKeyboardModifiers(NumLock, CapsLock, ScrollLock)
h.desktop.SetKeyboardModifiers(NumLock, CapsLock, ScrollLock)
return nil
}

View File

@ -1,10 +1,11 @@
package websocket
package handler
import (
"encoding/json"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"demodesk/neko/internal/types"
"demodesk/neko/internal/types/event"
@ -12,40 +13,60 @@ import (
"demodesk/neko/internal/utils"
)
type MessageHandler struct {
func New(
sessions types.SessionManager,
desktop types.DesktopManager,
capture types.CaptureManager,
webrtc types.WebRTCManager,
) *MessageHandlerCtx {
logger := log.With().Str("module", "handler").Logger()
return &MessageHandlerCtx{
logger: logger,
sessions: sessions,
desktop: desktop,
capture: capture,
webrtc: webrtc,
banned: make(map[string]bool),
locked: false,
}
}
type MessageHandlerCtx struct {
logger zerolog.Logger
sessions types.SessionManager
webrtc types.WebRTCManager
remote types.RemoteManager
broadcast types.BroadcastManager
desktop types.DesktopManager
capture types.CaptureManager
banned map[string]bool
locked bool
}
func (h *MessageHandler) Connected(id string, socket *WebSocket) (bool, string, error) {
func (h *MessageHandlerCtx) Connected(id string, socket types.WebSocket) (bool, string) {
address := socket.Address()
if address == "" {
h.logger.Debug().Msg("no remote address")
} else {
if address != "" {
ok, banned := h.banned[address]
if ok && banned {
h.logger.Debug().Str("address", address).Msg("banned")
return false, "banned", nil
return false, "banned"
}
} else {
h.logger.Debug().Msg("no remote address")
}
if h.locked {
session, ok := h.sessions.Get(id)
if !ok || !session.Admin() {
h.logger.Debug().Msg("server locked")
return false, "locked", nil
return false, "locked"
}
}
return true, "", nil
return true, ""
}
func (h *MessageHandler) Disconnected(id string) error {
func (h *MessageHandlerCtx) Disconnected(id string) error {
// TODO: Refactor.
if h.locked && len(h.sessions.Admins()) == 0 {
h.locked = false
}
@ -53,7 +74,7 @@ func (h *MessageHandler) Disconnected(id string) error {
return h.sessions.Destroy(id)
}
func (h *MessageHandler) Message(id string, raw []byte) error {
func (h *MessageHandlerCtx) Message(id string, raw []byte) error {
header := message.Message{}
if err := json.Unmarshal(raw, &header); err != nil {
return err

View File

@ -1,33 +1,38 @@
package websocket
package handler
import (
"demodesk/neko/internal/types"
"demodesk/neko/internal/types/event"
"demodesk/neko/internal/types/message"
"demodesk/neko/internal/websocket/broadcast"
)
func (h *MessageHandler) screenSet(session types.Session, payload *message.ScreenResolution) error {
func (h *MessageHandlerCtx) screenSet(session types.Session, payload *message.ScreenResolution) error {
if !session.Admin() {
h.logger.Debug().Msg("user not admin")
return nil
}
if err := h.remote.ChangeResolution(payload.Width, payload.Height, payload.Rate); err != nil {
if err := h.capture.ChangeResolution(payload.Width, payload.Height, payload.Rate); err != nil {
h.logger.Warn().Err(err).Msgf("unable to change screen size")
return err
}
if err := broadcast.ScreenConfiguration(h.sessions, session.ID(), payload.Width, payload.Height, payload.Rate); err != nil {
h.logger.Warn().Err(err).Msgf("broadcasting event %s has failed", event.SCREEN_RESOLUTION)
if err := h.sessions.Broadcast(message.ScreenResolution{
Event: event.SCREEN_RESOLUTION,
ID: session.ID(),
Width: payload.Width,
Height: payload.Height,
Rate: payload.Rate,
}, nil); err != nil {
h.logger.Warn().Err(err).Msgf("sending event %s has failed", event.SCREEN_RESOLUTION)
return err
}
return nil
}
func (h *MessageHandler) screenResolution(session types.Session) error {
if size := h.remote.GetScreenSize(); size != nil {
func (h *MessageHandlerCtx) screenResolution(session types.Session) error {
if size := h.desktop.GetScreenSize(); size != nil {
if err := session.Send(message.ScreenResolution{
Event: event.SCREEN_RESOLUTION,
Width: size.Width,
@ -42,7 +47,7 @@ func (h *MessageHandler) screenResolution(session types.Session) error {
return nil
}
func (h *MessageHandler) screenConfigurations(session types.Session) error {
func (h *MessageHandlerCtx) screenConfigurations(session types.Session) error {
if !session.Admin() {
h.logger.Debug().Msg("user not admin")
return nil
@ -50,7 +55,7 @@ func (h *MessageHandler) screenConfigurations(session types.Session) error {
if err := session.Send(message.ScreenConfigurations{
Event: event.SCREEN_CONFIGURATIONS,
Configurations: h.remote.ScreenConfigurations(),
Configurations: h.desktop.ScreenConfigurations(),
}); err != nil {
h.logger.Warn().Err(err).Msgf("sending event %s has failed", event.SCREEN_CONFIGURATIONS)
return err

View File

@ -1,4 +1,4 @@
package websocket
package handler
import (
"demodesk/neko/internal/types"
@ -6,7 +6,7 @@ import (
"demodesk/neko/internal/types/message"
)
func (h *MessageHandler) SessionCreated(session types.Session) error {
func (h *MessageHandlerCtx) SessionCreated(session types.Session) error {
// send sdp and id over to client
if err := h.signalProvide(session); err != nil {
return err
@ -27,11 +27,22 @@ func (h *MessageHandler) SessionCreated(session types.Session) error {
return nil
}
func (h *MessageHandler) SessionConnected(session types.Session) error {
func (h *MessageHandlerCtx) SessionConnected(session types.Session) error {
// TODO: Refactor.
members := []*message.MembersListEntry{}
for _, session := range h.sessions.Members() {
members = append(members, &message.MembersListEntry{
ID: session.ID(),
Name: session.Name(),
Admin: session.Admin(),
Muted: session.Muted(),
})
}
// send list of members to session
if err := session.Send(message.MembersList{
Event: event.MEMBER_LIST,
Memebers: h.sessions.Members(),
Memebers: members,
}); err != nil {
h.logger.Warn().Str("id", session.ID()).Err(err).Msgf("sending event %s has failed", event.MEMBER_LIST)
return err
@ -58,7 +69,12 @@ func (h *MessageHandler) SessionConnected(session types.Session) error {
if err := h.sessions.Broadcast(
message.Member{
Event: event.MEMBER_CONNECTED,
Member: session.Member(),
Member: &message.MembersListEntry{
ID: session.ID(),
Name: session.Name(),
Admin: session.Admin(),
Muted: session.Muted(),
},
}, nil); err != nil {
h.logger.Warn().Err(err).Msgf("broadcasting event %s has failed", event.CONTROL_RELEASE)
return err
@ -67,7 +83,7 @@ func (h *MessageHandler) SessionConnected(session types.Session) error {
return nil
}
func (h *MessageHandler) SessionDestroyed(id string) error {
func (h *MessageHandlerCtx) SessionDestroyed(id string) error {
// clear host if exists
host := h.sessions.GetHost()
if host != nil && host.ID() == id {

View File

@ -1,4 +1,4 @@
package websocket
package handler
import (
"demodesk/neko/internal/types"
@ -6,8 +6,8 @@ import (
"demodesk/neko/internal/types/message"
)
func (h *MessageHandler) signalProvide(session types.Session) error {
sdp, lite, ice, err := h.webrtc.CreatePeer(session.ID(), session)
func (h *MessageHandlerCtx) signalProvide(session types.Session) error {
sdp, lite, ice, err := h.webrtc.CreatePeer(session)
if err != nil {
return err
}
@ -25,7 +25,7 @@ func (h *MessageHandler) signalProvide(session types.Session) error {
return nil
}
func (h *MessageHandler) signalAnswer(session types.Session, payload *message.SignalAnswer) error {
func (h *MessageHandlerCtx) signalAnswer(session types.Session, payload *message.SignalAnswer) error {
session.SetName(payload.DisplayName)
if err := session.SignalAnswer(payload.SDP); err != nil {

View File

@ -0,0 +1,270 @@
package websocket
import (
"fmt"
"net/http"
"time"
"github.com/gorilla/websocket"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"demodesk/neko/internal/websocket/handler"
"demodesk/neko/internal/types/event"
"demodesk/neko/internal/types/message"
"demodesk/neko/internal/types"
"demodesk/neko/internal/config"
"demodesk/neko/internal/utils"
)
func New(
sessions types.SessionManager,
desktop types.DesktopManager,
capture types.CaptureManager,
webrtc types.WebRTCManager,
conf *config.WebSocket,
) *WebSocketManagerCtx {
logger := log.With().Str("module", "websocket").Logger()
return &WebSocketManagerCtx{
logger: logger,
conf: conf,
sessions: sessions,
desktop: desktop,
upgrader: websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
},
handler: handler.New(sessions, desktop, capture, webrtc),
}
}
// Send pings to peer with this period. Must be less than pongWait.
const pingPeriod = 60 * time.Second
type WebSocketManagerCtx struct {
logger zerolog.Logger
upgrader websocket.Upgrader
sessions types.SessionManager
desktop types.DesktopManager
conf *config.WebSocket
handler *handler.MessageHandlerCtx
shutdown chan bool
}
func (ws *WebSocketManagerCtx) Start() {
ws.sessions.OnCreated(func(session types.Session) {
if err := ws.handler.SessionCreated(session); err != nil {
ws.logger.Warn().Str("id", session.ID()).Err(err).Msg("session created with and error")
} else {
ws.logger.Debug().Str("id", session.ID()).Msg("session created")
}
})
ws.sessions.OnConnected(func(session types.Session) {
if err := ws.handler.SessionConnected(session); err != nil {
ws.logger.Warn().Str("id", session.ID()).Err(err).Msg("session connected with and error")
} else {
ws.logger.Debug().Str("id", session.ID()).Msg("session connected")
}
})
ws.sessions.OnDestroy(func(id string) {
if err := ws.handler.SessionDestroyed(id); err != nil {
ws.logger.Warn().Str("id", id).Err(err).Msg("session destroyed with and error")
} else {
ws.logger.Debug().Str("id", id).Msg("session destroyed")
}
})
go func() {
defer func() {
ws.logger.Info().Msg("shutdown")
}()
current := ws.desktop.ReadClipboard()
for {
select {
case <-ws.shutdown:
return
default:
session := ws.sessions.GetHost()
if session != nil {
break
}
text := ws.desktop.ReadClipboard()
if text == current {
break
}
// TODO: Refactor
if err := session.Send(message.Clipboard{
Event: event.CONTROL_CLIPBOARD,
Text: text,
}); err != nil {
ws.logger.Warn().Err(err).Msg("could not sync clipboard")
}
current = text
}
time.Sleep(100 * time.Millisecond)
}
}()
}
func (ws *WebSocketManagerCtx) Shutdown() error {
ws.shutdown <- true
return nil
}
func (ws *WebSocketManagerCtx) Upgrade(w http.ResponseWriter, r *http.Request) error {
ws.logger.Debug().Msg("attempting to upgrade connection")
connection, err := ws.upgrader.Upgrade(w, r, nil)
if err != nil {
ws.logger.Error().Err(err).Msg("failed to upgrade connection")
return err
}
id, ip, admin, err := ws.authenticate(r)
if err != nil {
ws.logger.Warn().Err(err).Msg("authentication failed")
// TODO: Refactor
if err = connection.WriteJSON(message.Disconnect{
Event: event.SYSTEM_DISCONNECT,
Message: "invalid_password",
}); err != nil {
ws.logger.Error().Err(err).Msg("failed to send disconnect")
}
return connection.Close()
}
socket := &WebSocketCtx{
id: id,
ws: ws,
address: ip,
connection: connection,
}
ok, reason := ws.handler.Connected(id, socket)
if !ok {
// TODO: Refactor
if err = connection.WriteJSON(message.Disconnect{
Event: event.SYSTEM_DISCONNECT,
Message: reason,
}); err != nil {
ws.logger.Error().Err(err).Msg("failed to send disconnect")
}
return connection.Close()
}
ws.sessions.New(id, admin, socket)
ws.logger.
Debug().
Str("session", id).
Str("address", connection.RemoteAddr().String()).
Msg("new connection created")
defer func() {
ws.logger.
Debug().
Str("session", id).
Str("address", connection.RemoteAddr().String()).
Msg("session ended")
}()
ws.handle(connection, id)
return nil
}
// TODO: Refactor
func (ws *WebSocketManagerCtx) authenticate(r *http.Request) (string, string, bool, error) {
ip := r.RemoteAddr
if ws.conf.Proxy {
ip = utils.ReadUserIP(r)
}
id, err := utils.NewUID(32)
if err != nil {
return "", ip, false, err
}
passwords, ok := r.URL.Query()["password"]
if !ok || len(passwords[0]) < 1 {
return "", ip, false, fmt.Errorf("no password provided")
}
if passwords[0] == ws.conf.AdminPassword {
return id, ip, true, nil
}
if passwords[0] == ws.conf.Password {
return id, ip, false, nil
}
return "", ip, false, fmt.Errorf("invalid password: %s", passwords[0])
}
func (ws *WebSocketManagerCtx) handle(connection *websocket.Conn, id string) {
bytes := make(chan []byte)
cancel := make(chan struct{})
ticker := time.NewTicker(pingPeriod)
go func() {
defer func() {
ticker.Stop()
ws.logger.Debug().Str("address", connection.RemoteAddr().String()).Msg("handle socket ending")
if err := ws.handler.Disconnected(id); err != nil {
ws.logger.Warn().Err(err).Msg("socket disconnected with error")
}
}()
for {
_, raw, err := connection.ReadMessage()
if err == nil {
bytes <- raw
continue
}
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
ws.logger.Warn().Err(err).Msg("read message error")
} else {
ws.logger.Debug().Err(err).Msg("read message error")
}
close(cancel)
}
}()
for {
select {
case raw := <-bytes:
ws.logger.Debug().
Str("session", id).
Str("address", connection.RemoteAddr().String()).
Str("raw", string(raw)).
Msg("received message from client")
if err := ws.handler.Message(id, raw); err != nil {
ws.logger.Error().Err(err).Msg("message handler has failed")
}
case <-cancel:
return
case <-ticker.C:
if err := connection.WriteMessage(websocket.PingMessage, nil); err != nil {
ws.logger.Error().Err(err).Msg("ping message has failed")
return
}
}
}
}

View File

@ -1,55 +0,0 @@
package websocket
import (
"encoding/json"
"strings"
"sync"
"github.com/gorilla/websocket"
)
type WebSocket struct {
id string
address string
ws *WebSocketHandler
connection *websocket.Conn
mu sync.Mutex
}
func (socket *WebSocket) Address() string {
//remote := socket.connection.RemoteAddr()
address := strings.SplitN(socket.address, ":", -1)
if len(address[0]) < 1 {
return socket.address
}
return address[0]
}
func (socket *WebSocket) Send(v interface{}) error {
socket.mu.Lock()
defer socket.mu.Unlock()
if socket.connection == nil {
return nil
}
raw, err := json.Marshal(v)
if err != nil {
return err
}
socket.ws.logger.Debug().
Str("session", socket.id).
Str("address", socket.connection.RemoteAddr().String()).
Str("raw", string(raw)).
Msg("sending message to client")
return socket.connection.WriteMessage(websocket.TextMessage, raw)
}
func (socket *WebSocket) Destroy() error {
if socket.connection == nil {
return nil
}
return socket.connection.Close()
}

View File

@ -1,270 +1,57 @@
package websocket
import (
"fmt"
"net/http"
"time"
"encoding/json"
"strings"
"sync"
"github.com/gorilla/websocket"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"demodesk/neko/internal/types"
"demodesk/neko/internal/types/config"
"demodesk/neko/internal/types/event"
"demodesk/neko/internal/types/message"
"demodesk/neko/internal/utils"
)
func New(sessions types.SessionManager, remote types.RemoteManager, broadcast types.BroadcastManager, webrtc types.WebRTCManager, conf *config.WebSocket) *WebSocketHandler {
logger := log.With().Str("module", "websocket").Logger()
return &WebSocketHandler{
logger: logger,
conf: conf,
sessions: sessions,
remote: remote,
upgrader: websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
},
handler: &MessageHandler{
logger: logger.With().Str("subsystem", "handler").Logger(),
remote: remote,
broadcast: broadcast,
sessions: sessions,
webrtc: webrtc,
banned: make(map[string]bool),
locked: false,
},
}
type WebSocketCtx struct {
id string
address string
ws *WebSocketManagerCtx
connection *websocket.Conn
mu sync.Mutex
}
// Send pings to peer with this period. Must be less than pongWait.
const pingPeriod = 60 * time.Second
func (socket *WebSocketCtx) Address() string {
//remote := socket.connection.RemoteAddr()
address := strings.SplitN(socket.address, ":", -1)
if len(address[0]) < 1 {
return socket.address
}
type WebSocketHandler struct {
logger zerolog.Logger
upgrader websocket.Upgrader
sessions types.SessionManager
remote types.RemoteManager
conf *config.WebSocket
handler *MessageHandler
shutdown chan bool
return address[0]
}
func (ws *WebSocketHandler) Start() {
ws.sessions.OnCreated(func(session types.Session) {
if err := ws.handler.SessionCreated(session); err != nil {
ws.logger.Warn().Str("id", session.ID()).Err(err).Msg("session created with and error")
} else {
ws.logger.Debug().Str("id", session.ID()).Msg("session created")
}
})
func (socket *WebSocketCtx) Send(v interface{}) error {
socket.mu.Lock()
defer socket.mu.Unlock()
ws.sessions.OnConnected(func(session types.Session) {
if err := ws.handler.SessionConnected(session); err != nil {
ws.logger.Warn().Str("id", session.ID()).Err(err).Msg("session connected with and error")
} else {
ws.logger.Debug().Str("id", session.ID()).Msg("session connected")
}
})
ws.sessions.OnDestroy(func(id string) {
if err := ws.handler.SessionDestroyed(id); err != nil {
ws.logger.Warn().Str("id", id).Err(err).Msg("session destroyed with and error")
} else {
ws.logger.Debug().Str("id", id).Msg("session destroyed")
}
})
go func() {
defer func() {
ws.logger.Info().Msg("shutdown")
}()
current := ws.remote.ReadClipboard()
for {
select {
case <-ws.shutdown:
return
default:
if ws.sessions.HasHost() {
text := ws.remote.ReadClipboard()
if text != current {
session := ws.sessions.GetHost()
if session != nil {
if err := session.Send(message.Clipboard{
Event: event.CONTROL_CLIPBOARD,
Text: text,
}); err != nil {
ws.logger.Warn().Err(err).Msg("could not sync clipboard")
}
}
current = text
}
}
time.Sleep(100 * time.Millisecond)
}
}
}()
}
func (ws *WebSocketHandler) Shutdown() error {
ws.shutdown <- true
return nil
}
func (ws *WebSocketHandler) Upgrade(w http.ResponseWriter, r *http.Request) error {
ws.logger.Debug().Msg("attempting to upgrade connection")
connection, err := ws.upgrader.Upgrade(w, r, nil)
if err != nil {
ws.logger.Error().Err(err).Msg("failed to upgrade connection")
return err
}
id, ip, admin, err := ws.authenticate(r)
if err != nil {
ws.logger.Warn().Err(err).Msg("authentication failed")
if err = connection.WriteJSON(message.Disconnect{
Event: event.SYSTEM_DISCONNECT,
Message: "invalid_password",
}); err != nil {
ws.logger.Error().Err(err).Msg("failed to send disconnect")
}
if err = connection.Close(); err != nil {
return err
}
if socket.connection == nil {
return nil
}
socket := &WebSocket{
id: id,
ws: ws,
address: ip,
connection: connection,
}
ok, reason, err := ws.handler.Connected(id, socket)
raw, err := json.Marshal(v)
if err != nil {
ws.logger.Error().Err(err).Msg("connection failed")
return err
}
if !ok {
if err = connection.WriteJSON(message.Disconnect{
Event: event.SYSTEM_DISCONNECT,
Message: reason,
}); err != nil {
ws.logger.Error().Err(err).Msg("failed to send disconnect")
}
if err = connection.Close(); err != nil {
return err
}
return nil
}
ws.sessions.New(id, admin, socket)
ws.logger.
Debug().
Str("session", id).
Str("address", connection.RemoteAddr().String()).
Msg("new connection created")
defer func() {
ws.logger.
Debug().
Str("session", id).
Str("address", connection.RemoteAddr().String()).
Msg("session ended")
}()
ws.handle(connection, id)
return nil
}
func (ws *WebSocketHandler) authenticate(r *http.Request) (string, string, bool, error) {
ip := r.RemoteAddr
if ws.conf.Proxy {
ip = utils.ReadUserIP(r)
}
id, err := utils.NewUID(32)
if err != nil {
return "", ip, false, err
}
passwords, ok := r.URL.Query()["password"]
if !ok || len(passwords[0]) < 1 {
return "", ip, false, fmt.Errorf("no password provided")
}
if passwords[0] == ws.conf.AdminPassword {
return id, ip, true, nil
}
if passwords[0] == ws.conf.Password {
return id, ip, false, nil
}
return "", ip, false, fmt.Errorf("invalid password: %s", passwords[0])
}
func (ws *WebSocketHandler) handle(connection *websocket.Conn, id string) {
bytes := make(chan []byte)
cancel := make(chan struct{})
ticker := time.NewTicker(pingPeriod)
go func() {
defer func() {
ticker.Stop()
ws.logger.Debug().Str("address", connection.RemoteAddr().String()).Msg("handle socket ending")
if err := ws.handler.Disconnected(id); err != nil {
ws.logger.Warn().Err(err).Msg("socket disconnected with error")
}
}()
for {
_, raw, err := connection.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
ws.logger.Warn().Err(err).Msg("read message error")
} else {
ws.logger.Debug().Err(err).Msg("read message error")
}
close(cancel)
break
}
bytes <- raw
}
}()
for {
select {
case raw := <-bytes:
ws.logger.Debug().
Str("session", id).
Str("address", connection.RemoteAddr().String()).
socket.ws.logger.Debug().
Str("session", socket.id).
Str("address", socket.connection.RemoteAddr().String()).
Str("raw", string(raw)).
Msg("received message from client")
if err := ws.handler.Message(id, raw); err != nil {
ws.logger.Error().Err(err).Msg("message handler has failed")
}
case <-cancel:
return
case <-ticker.C:
if err := connection.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
Msg("sending message to client")
return socket.connection.WriteMessage(websocket.TextMessage, raw)
}
func (socket *WebSocketCtx) Destroy() error {
if socket.connection == nil {
return nil
}
return socket.connection.Close()
}

View File

@ -1,46 +0,0 @@
#pragma once
#ifndef XDISPLAY_H
#define XDISPLAY_H
#include <X11/Xlib.h>
#include <X11/XKBlib.h>
#include <X11/extensions/Xrandr.h>
#include <X11/extensions/XTest.h>
#include <libclipboard.h>
#include <stdint.h>
#include <stdlib.h>
#include <stdio.h> /* For fputs() */
#include <string.h> /* For strdup() */
extern void goCreateScreenSize(int index, int width, int height, int mwidth, int mheight);
extern void goSetScreenRates(int index, int rate_index, short rate);
/* Returns the main display, closed either on exit or when closeMainDisplay()
* is invoked. This removes a bit of the overhead of calling XOpenDisplay() &
* XCloseDisplay() everytime the main display needs to be used.
*
* Note that this is almost certainly not thread safe. */
Display *getXDisplay(void);
clipboard_c *getClipboard(void);
void XMove(int x, int y);
void XScroll(int x, int y);
void XButton(unsigned int button, int down);
void XKey(unsigned long key, int down);
void XClipboardSet(char *src);
char *XClipboardGet();
void XGetScreenConfigurations();
void XSetScreenConfiguration(int index, short rate);
int XGetScreenSize();
short XGetScreenRate();
void XDisplayClose(void);
void XDisplaySet(char *input);
void SetKeyboardLayout(char *layout);
void SetKeyboardModifiers(int num_lock, int caps_lock, int scroll_lock);
#endif

82
neko.go
View File

@ -6,13 +6,13 @@ import (
"os/signal"
"runtime"
"demodesk/neko/internal/broadcast"
"demodesk/neko/internal/http"
"demodesk/neko/internal/remote"
"demodesk/neko/internal/session"
"demodesk/neko/internal/types/config"
"demodesk/neko/internal/config"
"demodesk/neko/internal/desktop"
"demodesk/neko/internal/capture"
"demodesk/neko/internal/webrtc"
"demodesk/neko/internal/session"
"demodesk/neko/internal/websocket"
"demodesk/neko/internal/http"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
@ -61,10 +61,9 @@ func init() {
},
Configs: &Configs{
Root: &config.Root{},
Server: &config.Server{},
Remote: &config.Remote{},
Broadcast: &config.Broadcast{},
Capture: &config.Capture{},
WebRTC: &config.WebRTC{},
Server: &config.Server{},
WebSocket: &config.WebSocket{},
},
}
@ -101,10 +100,9 @@ func (i *Version) Details() string {
type Configs struct {
Root *config.Root
Remote *config.Remote
Broadcast *config.Broadcast
Server *config.Server
Capture *config.Capture
WebRTC *config.WebRTC
Server *config.Server
WebSocket *config.WebSocket
}
@ -113,12 +111,12 @@ type Neko struct {
Configs *Configs
logger zerolog.Logger
server *http.Server
sessionManager *session.SessionManager
remoteManager *remote.RemoteManager
broadcastManager *broadcast.BroadcastManager
webRTCManager *webrtc.WebRTCManager
webSocketHandler *websocket.WebSocketHandler
desktopManager *desktop.DesktopManagerCtx
captureManager *capture.CaptureManagerCtx
webRTCManager *webrtc.WebRTCManagerCtx
sessionManager *session.SessionManagerCtx
webSocketManager *websocket.WebSocketManagerCtx
server *http.ServerCtx
}
func (neko *Neko) Preflight() {
@ -126,51 +124,55 @@ func (neko *Neko) Preflight() {
}
func (neko *Neko) Start() {
neko.broadcastManager = broadcast.New(
neko.Configs.Remote,
neko.Configs.Broadcast,
neko.desktopManager = desktop.New(
neko.Configs.Capture.Display,
)
neko.desktopManager.Start()
neko.remoteManager = remote.New(
neko.Configs.Remote,
neko.broadcastManager,
neko.captureManager = capture.New(
neko.desktopManager,
neko.Configs.Capture,
)
neko.remoteManager.Start()
neko.captureManager.Start()
neko.webRTCManager = webrtc.New(
neko.remoteManager,
neko.desktopManager,
neko.captureManager,
neko.Configs.WebRTC,
)
neko.webRTCManager.Start()
neko.sessionManager = session.New(
neko.remoteManager,
neko.captureManager,
)
neko.webSocketHandler = websocket.New(
neko.webSocketManager = websocket.New(
neko.sessionManager,
neko.remoteManager,
neko.broadcastManager,
neko.desktopManager,
neko.captureManager,
neko.webRTCManager,
neko.Configs.WebSocket,
)
neko.webSocketHandler.Start()
neko.webSocketManager.Start()
neko.server = http.New(
neko.sessionManager,
neko.remoteManager,
neko.broadcastManager,
neko.webSocketHandler,
neko.webSocketManager,
neko.Configs.Server,
)
neko.server.Start()
}
func (neko *Neko) Shutdown() {
if err := neko.remoteManager.Shutdown(); err != nil {
neko.logger.Err(err).Msg("remote manager shutdown with an error")
if err := neko.desktopManager.Shutdown(); err != nil {
neko.logger.Err(err).Msg("desktop manager shutdown with an error")
} else {
neko.logger.Debug().Msg("remote manager shutdown")
neko.logger.Debug().Msg("desktop manager shutdown")
}
if err := neko.captureManager.Shutdown(); err != nil {
neko.logger.Err(err).Msg("capture manager shutdown with an error")
} else {
neko.logger.Debug().Msg("capture manager shutdown")
}
if err := neko.webRTCManager.Shutdown(); err != nil {
@ -179,10 +181,10 @@ func (neko *Neko) Shutdown() {
neko.logger.Debug().Msg("webrtc manager shutdown")
}
if err := neko.webSocketHandler.Shutdown(); err != nil {
neko.logger.Err(err).Msg("websocket handler shutdown with an error")
if err := neko.webSocketManager.Shutdown(); err != nil {
neko.logger.Err(err).Msg("websocket manager shutdown with an error")
} else {
neko.logger.Debug().Msg("websocket handler shutdown")
neko.logger.Debug().Msg("websocket manager shutdown")
}
if err := neko.server.Shutdown(); err != nil {