diff --git a/server/internal/capture/gst/gst.go b/server/internal/capture/gst/gst.go index cd4fb6e..7b39f85 100644 --- a/server/internal/capture/gst/gst.go +++ b/server/internal/capture/gst/gst.go @@ -177,9 +177,9 @@ func goHandlePipelineBuffer(buffer unsafe.Pointer, bufferLen C.int, duration C.i if ok { pipeline.Sample <- types.Sample{ - Data: C.GoBytes(buffer, bufferLen), - Timestamp: time.Now(), - Duration: time.Duration(duration), + Data: C.GoBytes(buffer, bufferLen), + Timestamp: time.Now(), + Duration: time.Duration(duration), } } else { log.Warn(). diff --git a/server/internal/capture/manager.go b/server/internal/capture/manager.go index 167fa74..91b2684 100644 --- a/server/internal/capture/manager.go +++ b/server/internal/capture/manager.go @@ -18,7 +18,6 @@ type CaptureManagerCtx struct { broadcast *BroacastManagerCtx audio *StreamSinkManagerCtx video *StreamSinkManagerCtx - } func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCtx { @@ -53,7 +52,8 @@ func (manager *CaptureManagerCtx) Start() { go func() { for { - _ = <- manager.desktop.GetBeforeScreenSizeChangeChannel() + <-manager.desktop.GetBeforeScreenSizeChangeChannel() + if manager.video.Started() { manager.video.destroyPipeline() } @@ -66,7 +66,8 @@ func (manager *CaptureManagerCtx) Start() { go func() { for { - framerate := <- manager.desktop.GetAfterScreenSizeChangeChannel(); + framerate := <-manager.desktop.GetAfterScreenSizeChangeChannel() + if manager.video.Started() { manager.video.SetChangeFramerate(framerate) err := manager.video.createPipeline() diff --git a/server/internal/capture/pipelines.go b/server/internal/capture/pipelines.go index 499a3cd..bbe8402 100644 --- a/server/internal/capture/pipelines.go +++ b/server/internal/capture/pipelines.go @@ -107,26 +107,27 @@ func NewVideoPipeline(rtpCodec codec.RTPCodec, display string, pipelineSrc strin pipelineStr = fmt.Sprintf(videoSrc+"vp9enc target-bitrate=%d cpu-used=-5 threads=4 deadline=1 keyframe-max-dist=30 auto-alt-ref=true"+pipelineStr, display, fps, bitrate*1000) case codec.AV1().Name: - // https://gstreamer.freedesktop.org/documentation/aom/av1enc.html?gi-language=c - // gstreamer1.0-plugins-bad - // av1enc usage-profile=1 - if err := gst.CheckPlugins([]string{"ximagesrc", "vpx"}); err != nil { - return "", err - } + // https://gstreamer.freedesktop.org/documentation/aom/av1enc.html?gi-language=c + // gstreamer1.0-plugins-bad + // av1enc usage-profile=1 + // TODO: check for plugin. + if err := gst.CheckPlugins([]string{"ximagesrc", "vpx"}); err != nil { + return "", err + } - pipelineStr = strings.Join([]string{ - fmt.Sprintf(videoSrc, display, fps), - "av1enc", - fmt.Sprintf("target-bitrate=%d", bitrate*650), - "cpu-used=4", - "end-usage=cbr", -// "usage-profile=realtime", - "undershoot=95", - "keyframe-max-dist=25", - "min-quantizer=4", - "max-quantizer=20", - pipelineStr, - }, " ") + pipelineStr = strings.Join([]string{ + fmt.Sprintf(videoSrc, display, fps), + "av1enc", + fmt.Sprintf("target-bitrate=%d", bitrate*650), + "cpu-used=4", + "end-usage=cbr", + // "usage-profile=realtime", + "undershoot=95", + "keyframe-max-dist=25", + "min-quantizer=4", + "max-quantizer=20", + pipelineStr, + }, " ") case codec.H264().Name: if err := gst.CheckPlugins([]string{"ximagesrc"}); err != nil { return "", err diff --git a/server/internal/capture/streamsink.go b/server/internal/capture/streamsink.go index 01d2bd9..b0cc307 100644 --- a/server/internal/capture/streamsink.go +++ b/server/internal/capture/streamsink.go @@ -2,9 +2,9 @@ package capture import ( "errors" - "sync" "regexp" "strconv" + "sync" "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -143,7 +143,7 @@ func (manager *StreamSinkManagerCtx) createPipeline() error { if manager.changeFramerate > 0 && manager.adaptiveFramerate { m1 := regexp.MustCompile(`framerate=\d+/1`) - pipelineStr = m1.ReplaceAllString(pipelineStr, "framerate=" + strconv.FormatInt(int64(manager.changeFramerate), 10) + "/1") + pipelineStr = m1.ReplaceAllString(pipelineStr, "framerate="+strconv.FormatInt(int64(manager.changeFramerate), 10)+"/1") } manager.logger.Info(). @@ -180,7 +180,7 @@ func (manager *StreamSinkManagerCtx) destroyPipeline() { manager.pipeline = nil } -func (manager *StreamSinkManagerCtx) GetSampleChannel() (chan types.Sample) { +func (manager *StreamSinkManagerCtx) GetSampleChannel() chan types.Sample { if manager.pipeline != nil { return manager.pipeline.Sample } diff --git a/server/internal/config/capture.go b/server/internal/config/capture.go index 083b2de..bdc4c7a 100644 --- a/server/internal/config/capture.go +++ b/server/internal/config/capture.go @@ -11,12 +11,12 @@ import ( type Capture struct { // video - Display string - VideoCodec codec.RTPCodec - VideoHWEnc string // TODO: Pipeline builder. - VideoBitrate uint // TODO: Pipeline builder. - VideoMaxFPS int16 // TODO: Pipeline builder. - VideoPipeline string + Display string + VideoCodec codec.RTPCodec + VideoHWEnc string // TODO: Pipeline builder. + VideoBitrate uint // TODO: Pipeline builder. + VideoMaxFPS int16 // TODO: Pipeline builder. + VideoPipeline string VideoAdaptiveFramerate bool // audio @@ -60,7 +60,7 @@ func (Capture) Init(cmd *cobra.Command) error { // DEPRECATED: video codec cmd.PersistentFlags().Bool("av1", false, "DEPRECATED: use video_codec") if err := viper.BindPFlag("av1", cmd.PersistentFlags().Lookup("av1")); err != nil { - return err + return err } // DEPRECATED: video codec diff --git a/server/internal/desktop/manager.go b/server/internal/desktop/manager.go index bcf0a8b..bb8cce0 100644 --- a/server/internal/desktop/manager.go +++ b/server/internal/desktop/manager.go @@ -16,21 +16,21 @@ import ( var mu = sync.Mutex{} type DesktopManagerCtx struct { - logger zerolog.Logger - wg sync.WaitGroup - shutdown chan struct{} - beforeScreenSizeChangeChannel chan bool - afterScreenSizeChangeChannel chan int16 - config *config.Desktop + logger zerolog.Logger + wg sync.WaitGroup + shutdown chan struct{} + beforeScreenSizeChangeChannel chan bool + afterScreenSizeChangeChannel chan int16 + config *config.Desktop } func New(config *config.Desktop) *DesktopManagerCtx { return &DesktopManagerCtx{ - logger: log.With().Str("module", "desktop").Logger(), - shutdown: make(chan struct{}), - beforeScreenSizeChangeChannel: make (chan bool), - afterScreenSizeChangeChannel: make (chan int16), - config: config, + logger: log.With().Str("module", "desktop").Logger(), + shutdown: make(chan struct{}), + beforeScreenSizeChangeChannel: make(chan bool), + afterScreenSizeChangeChannel: make(chan int16), + config: config, } } @@ -50,13 +50,13 @@ func (manager *DesktopManagerCtx) Start() { go func() { for { - desktopErrorMessage := <- xevent.EventErrorChannel + msg := <-xevent.EventErrorChannel manager.logger.Warn(). - Uint8("error_code", desktopErrorMessage.Error_code). - Str("message", desktopErrorMessage.Message). - Uint8("request_code", desktopErrorMessage.Request_code). - Uint8("minor_code", desktopErrorMessage.Minor_code). - Msg("X event error occurred") + Uint8("error_code", msg.Error_code). + Str("message", msg.Message). + Uint8("request_code", msg.Request_code). + Uint8("minor_code", msg.Minor_code). + Msg("X event error occurred") } }() @@ -79,11 +79,11 @@ func (manager *DesktopManagerCtx) Start() { }() } -func (manager *DesktopManagerCtx) GetBeforeScreenSizeChangeChannel() (chan bool) { +func (manager *DesktopManagerCtx) GetBeforeScreenSizeChangeChannel() chan bool { return manager.beforeScreenSizeChangeChannel } -func (manager *DesktopManagerCtx) GetAfterScreenSizeChangeChannel() (chan int16) { +func (manager *DesktopManagerCtx) GetAfterScreenSizeChangeChannel() chan int16 { return manager.afterScreenSizeChangeChannel } diff --git a/server/internal/desktop/xevent.go b/server/internal/desktop/xevent.go index efc8593..e90f551 100644 --- a/server/internal/desktop/xevent.go +++ b/server/internal/desktop/xevent.go @@ -5,22 +5,22 @@ import ( "m1k1o/neko/internal/types" ) -func (manager *DesktopManagerCtx) GetCursorChangedChannel() (chan uint64) { +func (manager *DesktopManagerCtx) GetCursorChangedChannel() chan uint64 { return xevent.CursorChangedChannel } -func (manager *DesktopManagerCtx) GetClipboardUpdatedChannel() (chan bool) { +func (manager *DesktopManagerCtx) GetClipboardUpdatedChannel() chan bool { return xevent.ClipboardUpdatedChannel } -func (manager *DesktopManagerCtx) GetFileChooserDialogOpenedChannel() (chan bool) { +func (manager *DesktopManagerCtx) GetFileChooserDialogOpenedChannel() chan bool { return xevent.FileChooserDialogOpenedChannel } -func (manager *DesktopManagerCtx) GetFileChooserDialogClosedChannel() (chan bool) { +func (manager *DesktopManagerCtx) GetFileChooserDialogClosedChannel() chan bool { return xevent.FileChooserDialogClosedChannel } -func (manager *DesktopManagerCtx) GetEventErrorChannel() (chan types.DesktopErrorMessage) { +func (manager *DesktopManagerCtx) GetEventErrorChannel() chan types.DesktopErrorMessage { return xevent.EventErrorChannel } diff --git a/server/internal/desktop/xevent/xevent.go b/server/internal/desktop/xevent/xevent.go index 63f1bf7..5b30a0a 100644 --- a/server/internal/desktop/xevent/xevent.go +++ b/server/internal/desktop/xevent/xevent.go @@ -9,6 +9,7 @@ import "C" import ( "unsafe" + "m1k1o/neko/internal/types" ) @@ -25,20 +26,22 @@ func init() { FileChooserDialogOpenedChannel = make(chan bool) EventErrorChannel = make(chan types.DesktopErrorMessage) - // Dummy goroutines since there is no consumer for the channel otherwise go func() { for { - _ = <-CursorChangedChannel + // TODO: Unused. + <-CursorChangedChannel } }() go func() { for { - _ = <-FileChooserDialogClosedChannel + // TODO: Unused. + <-FileChooserDialogClosedChannel } }() go func() { for { - _ = <-FileChooserDialogOpenedChannel + // TODO: Unused. + <-FileChooserDialogOpenedChannel } }() } @@ -72,7 +75,12 @@ func goXEventUnmapNotify(window C.Window) { //export goXEventError func goXEventError(event *C.XErrorEvent, message *C.char) { - EventErrorChannel <- types.DesktopErrorMessage{ uint8(event.error_code), C.GoString(message), uint8(event.request_code), uint8(event.minor_code) } + EventErrorChannel <- types.DesktopErrorMessage{ + Error_code: uint8(event.error_code), + Message: C.GoString(message), + Request_code: uint8(event.request_code), + Minor_code: uint8(event.minor_code), + } } //export goXEventActive diff --git a/server/internal/session/manager.go b/server/internal/session/manager.go index bd6df7e..9d48b85 100644 --- a/server/internal/session/manager.go +++ b/server/internal/session/manager.go @@ -13,25 +13,25 @@ import ( func New(capture types.CaptureManager) *SessionManager { return &SessionManager{ - logger: log.With().Str("module", "session").Logger(), - host: "", - capture: capture, + logger: log.With().Str("module", "session").Logger(), + host: "", + capture: capture, sessionChannel: make(chan types.SessionInformation, 10), - hostChannel: make(chan types.HostInformation, 10), - members: make(map[string]*Session), + hostChannel: make(chan types.HostInformation, 10), + members: make(map[string]*Session), } } type SessionManager struct { - mu sync.Mutex - logger zerolog.Logger - host string - capture types.CaptureManager - members map[string]*Session - sessionChannel chan types.SessionInformation - hostChannel chan types.HostInformation + mu sync.Mutex + logger zerolog.Logger + host string + capture types.CaptureManager + members map[string]*Session + sessionChannel chan types.SessionInformation + hostChannel chan types.HostInformation // TODO: Handle locks in sessions as flags. - controlLocked bool + controlLocked bool } func (manager *SessionManager) New(id string, admin bool, socket types.WebSocket) types.Session { @@ -50,11 +50,16 @@ func (manager *SessionManager) New(id string, admin bool, socket types.WebSocket manager.capture.Video().AddListener() manager.mu.Unlock() - manager.sessionChannel <- types.SessionInformation{ "created", id, session } + manager.sessionChannel <- types.SessionInformation{ + Type: "created", + Id: id, + Session: session, + } go func() { for { - _ = <- manager.hostChannel + // TODO: Unused. + <-manager.hostChannel } }() return session @@ -76,7 +81,10 @@ func (manager *SessionManager) SetHost(id string) error { if ok { manager.host = id - manager.hostChannel <- types.HostInformation{ "host", id } + manager.hostChannel <- types.HostInformation{ + Type: "host", + Id: id, + } return nil } @@ -94,7 +102,10 @@ func (manager *SessionManager) GetHost() (types.Session, bool) { func (manager *SessionManager) ClearHost() { id := manager.host manager.host = "" - manager.hostChannel <- types.HostInformation{ "host_cleared", id } + manager.hostChannel <- types.HostInformation{ + Type: "host_cleared", + Id: id, + } } func (manager *SessionManager) Has(id string) bool { @@ -171,7 +182,11 @@ func (manager *SessionManager) Destroy(id string) { manager.capture.Video().RemoveListener() manager.mu.Unlock() - manager.sessionChannel <- types.SessionInformation{ "destroyed", id, session } + manager.sessionChannel <- types.SessionInformation{ + Type: "destroyed", + Id: id, + Session: session, + } manager.logger.Err(err).Str("session_id", id).Msg("destroying session") return } @@ -229,10 +244,10 @@ func (manager *SessionManager) AdminBroadcast(v interface{}, exclude interface{} return nil } -func (manager *SessionManager) GetSessionChannel() (chan types.SessionInformation) { +func (manager *SessionManager) GetSessionChannel() chan types.SessionInformation { return manager.sessionChannel } -func (manager *SessionManager) GetHostChannel() (chan types.HostInformation) { +func (manager *SessionManager) GetHostChannel() chan types.HostInformation { return manager.hostChannel } diff --git a/server/internal/session/session.go b/server/internal/session/session.go index 368de0d..d81c16e 100644 --- a/server/internal/session/session.go +++ b/server/internal/session/session.go @@ -78,7 +78,11 @@ func (session *Session) SetPeer(peer types.Peer) error { func (session *Session) SetConnected(connected bool) error { session.connected = connected if connected { - session.manager.sessionChannel <- types.SessionInformation{ "connected", session.id, session } + session.manager.sessionChannel <- types.SessionInformation{ + Type: "connected", + Id: session.id, + Session: session, + } } return nil } diff --git a/server/internal/types/capture.go b/server/internal/types/capture.go index 67ae143..a54be65 100644 --- a/server/internal/types/capture.go +++ b/server/internal/types/capture.go @@ -25,7 +25,7 @@ type StreamSinkManager interface { ListenersCount() int Started() bool - GetSampleChannel() (chan Sample) + GetSampleChannel() chan Sample SetChangeFramerate(rate int16) SetAdaptiveFramerate(allow bool) } diff --git a/server/internal/types/codec/codecs.go b/server/internal/types/codec/codecs.go index 9cbe67a..8fe0691 100644 --- a/server/internal/types/codec/codecs.go +++ b/server/internal/types/codec/codecs.go @@ -116,6 +116,7 @@ func H264() RTPCodec { }, } } + // TODO: Profile ID. func AV1() RTPCodec { return RTPCodec{ diff --git a/server/internal/types/desktop.go b/server/internal/types/desktop.go index 7fbad6e..e2be4d2 100644 --- a/server/internal/types/desktop.go +++ b/server/internal/types/desktop.go @@ -43,8 +43,8 @@ type DesktopErrorMessage struct { type DesktopManager interface { Start() Shutdown() error - GetBeforeScreenSizeChangeChannel() (chan bool) - GetAfterScreenSizeChangeChannel() (chan int16) + GetBeforeScreenSizeChangeChannel() chan bool + GetAfterScreenSizeChangeChannel() chan int16 // clipboard ReadClipboard() string @@ -72,9 +72,9 @@ type DesktopManager interface { GetScreenshotImage() *image.RGBA // xevent - GetCursorChangedChannel() (chan uint64) - GetClipboardUpdatedChannel() (chan bool) - GetFileChooserDialogOpenedChannel() (chan bool) - GetFileChooserDialogClosedChannel() (chan bool) - GetEventErrorChannel() (chan DesktopErrorMessage) + GetCursorChangedChannel() chan uint64 + GetClipboardUpdatedChannel() chan bool + GetFileChooserDialogOpenedChannel() chan bool + GetFileChooserDialogClosedChannel() chan bool + GetEventErrorChannel() chan DesktopErrorMessage } diff --git a/server/internal/types/session.go b/server/internal/types/session.go index 40496ef..12e69f2 100644 --- a/server/internal/types/session.go +++ b/server/internal/types/session.go @@ -8,9 +8,9 @@ type Member struct { } type SessionInformation struct { - Type string - Id string - Session Session + Type string + Id string + Session Session } type HostInformation struct { @@ -57,6 +57,6 @@ type SessionManager interface { Clear() error Broadcast(v interface{}, exclude interface{}) error AdminBroadcast(v interface{}, exclude interface{}) error - GetSessionChannel() (chan SessionInformation) - GetHostChannel() (chan HostInformation) + GetSessionChannel() chan SessionInformation + GetHostChannel() chan HostInformation } diff --git a/server/internal/webrtc/webrtc.go b/server/internal/webrtc/webrtc.go index c0d7e15..ab1b347 100644 --- a/server/internal/webrtc/webrtc.go +++ b/server/internal/webrtc/webrtc.go @@ -62,7 +62,8 @@ func (manager *WebRTCManager) Start() { time.Sleep(50 * time.Millisecond) continue } - newSample := <- manager.capture.Audio().GetSampleChannel() + + newSample := <-manager.capture.Audio().GetSampleChannel() err := manager.audioTrack.WriteSample(media.Sample(newSample)) if err != nil && errors.Is(err, io.ErrClosedPipe) { manager.logger.Warn().Err(err).Msg("audio pipeline failed to write") @@ -87,7 +88,8 @@ func (manager *WebRTCManager) Start() { time.Sleep(50 * time.Millisecond) continue } - newSample := <- manager.capture.Video().GetSampleChannel() + + newSample := <-manager.capture.Video().GetSampleChannel() err := manager.videoTrack.WriteSample(media.Sample(newSample)) if err != nil && errors.Is(err, io.ErrClosedPipe) { manager.logger.Warn().Err(err).Msg("video pipeline failed to write") diff --git a/server/internal/websocket/websocket.go b/server/internal/websocket/websocket.go index 190759b..7955fd8 100644 --- a/server/internal/websocket/websocket.go +++ b/server/internal/websocket/websocket.go @@ -101,9 +101,9 @@ type WebSocketHandler struct { } func (ws *WebSocketHandler) Start() { - go func () { + go func() { for { - channelMessage := <- ws.sessions.GetSessionChannel() + channelMessage := <-ws.sessions.GetSessionChannel() switch channelMessage.Type { case "created": @@ -189,7 +189,7 @@ func (ws *WebSocketHandler) Start() { go func() { for { - _ = <- ws.desktop.GetClipboardUpdatedChannel() + _ = <-ws.desktop.GetClipboardUpdatedChannel() session, ok := ws.sessions.GetHost() if !ok { return