diff --git a/server/cmd/serve.go b/server/cmd/serve.go index b0d7c7dd..d8c4f605 100644 --- a/server/cmd/serve.go +++ b/server/cmd/serve.go @@ -21,7 +21,6 @@ func init() { neko.Service.WebRTC, neko.Service.Capture, neko.Service.Desktop, - neko.Service.Broadcast, neko.Service.WebSocket, } diff --git a/server/internal/capture/broadcast.go b/server/internal/capture/broadcast.go index bec0ecfa..ff566cee 100644 --- a/server/internal/capture/broadcast.go +++ b/server/internal/capture/broadcast.go @@ -7,98 +7,115 @@ import ( "github.com/rs/zerolog/log" "m1k1o/neko/internal/capture/gst" - "m1k1o/neko/internal/config" + "m1k1o/neko/internal/types" ) -type BroadcastManager struct { - mu sync.Mutex - logger zerolog.Logger - pipeline *gst.Pipeline - capture *config.Capture - config *config.Broadcast - enabled bool - url string +type BroacastManagerCtx struct { + logger zerolog.Logger + mu sync.Mutex + + pipeline *gst.Pipeline + pipelineMu sync.Mutex + pipelineFn func(url string) (*gst.Pipeline, error) + + url string + started bool } -func NewBroadcast(capture *config.Capture, config *config.Broadcast) *BroadcastManager { - return &BroadcastManager{ - logger: log.With().Str("module", "broadcast").Logger(), - capture: capture, - config: config, - enabled: config.Enabled, - url: config.URL, +func broadcastNew(pipelineFn func(url string) (*gst.Pipeline, error), url string, started bool) *BroacastManagerCtx { + logger := log.With(). + Str("module", "capture"). + Str("submodule", "broadcast"). + Logger() + + return &BroacastManagerCtx{ + logger: logger, + pipelineFn: pipelineFn, + url: url, + started: started, } } -func (manager *BroadcastManager) Shutdown() error { - manager.Destroy() +func (manager *BroacastManagerCtx) shutdown() { + manager.logger.Info().Msgf("shutdown") + + manager.destroyPipeline() +} + +func (manager *BroacastManagerCtx) Start(url string) error { + manager.mu.Lock() + defer manager.mu.Unlock() + + err := manager.createPipeline() + if err != nil { + return err + } + + manager.url = url + manager.started = true return nil } -func (manager *BroadcastManager) Start() error { - if !manager.enabled || manager.IsActive() { - return nil +func (manager *BroacastManagerCtx) Stop() { + manager.mu.Lock() + defer manager.mu.Unlock() + + manager.started = false + manager.destroyPipeline() +} + +func (manager *BroacastManagerCtx) Started() bool { + manager.mu.Lock() + defer manager.mu.Unlock() + + return manager.started +} + +func (manager *BroacastManagerCtx) Url() string { + manager.mu.Lock() + defer manager.mu.Unlock() + + return manager.url +} + +func (manager *BroacastManagerCtx) createPipeline() error { + manager.pipelineMu.Lock() + defer manager.pipelineMu.Unlock() + + if manager.pipeline != nil { + return types.ErrCapturePipelineAlreadyExists } var err error - manager.pipeline, err = CreateRTMPPipeline( - manager.capture.Device, - manager.capture.Display, - manager.config.Pipeline, - manager.url, - ) + manager.logger.Info(). + Str("url", manager.url). + Msgf("creating pipeline") + + manager.pipeline, err = manager.pipelineFn(manager.url) if err != nil { - manager.pipeline = nil return err } manager.logger.Info(). - Str("audio_device", manager.capture.Device). - Str("video_display", manager.capture.Display). - Str("rtmp_pipeline_src", manager.pipeline.Src). - Msgf("RTMP pipeline is starting...") + Str("url", manager.url). + Str("src", manager.pipeline.Src). + Msgf("created pipeline") manager.pipeline.Play() + return nil } -func (manager *BroadcastManager) Stop() { - if !manager.IsActive() { +func (manager *BroacastManagerCtx) destroyPipeline() { + manager.pipelineMu.Lock() + defer manager.pipelineMu.Unlock() + + if manager.pipeline == nil { return } - manager.pipeline.Stop() + manager.pipeline.Destroy() + manager.logger.Info().Msgf("destroying pipeline") manager.pipeline = nil } - -func (manager *BroadcastManager) IsActive() bool { - return manager.pipeline != nil -} - -func (manager *BroadcastManager) Create(url string) error { - manager.mu.Lock() - defer manager.mu.Unlock() - - manager.url = url - manager.enabled = true - - err := manager.Start() - if err != nil { - manager.enabled = false - } - - return err -} - -func (manager *BroadcastManager) Destroy() { - manager.mu.Lock() - defer manager.mu.Unlock() - - manager.Stop() - manager.enabled = false -} - -func (manager *BroadcastManager) GetUrl() string { - return manager.url -} diff --git a/server/internal/capture/gst/gst.c b/server/internal/capture/gst/gst.c index 78bfa9ee..e538f5d1 100644 --- a/server/internal/capture/gst/gst.c +++ b/server/internal/capture/gst/gst.c @@ -1,83 +1,202 @@ #include "gst.h" -typedef struct SampleHandlerUserData { - int pipelineId; -} SampleHandlerUserData; - -void gstreamer_init(void) { - gst_init(NULL, NULL); +static void gstreamer_pipeline_log(GstPipelineCtx *ctx, char* level, const char* format, ...) { + va_list argptr; + va_start(argptr, format); + char buffer[100]; + vsprintf(buffer, format, argptr); + va_end(argptr); + goPipelineLog(level, buffer, ctx->pipelineId); } -static gboolean gstreamer_send_bus_call(GstBus *bus, GstMessage *msg, gpointer data) { +static gboolean gstreamer_bus_call(GstBus *bus, GstMessage *msg, gpointer user_data) { + GstPipelineCtx *ctx = (GstPipelineCtx *)user_data; + switch (GST_MESSAGE_TYPE(msg)) { + case GST_MESSAGE_EOS: { + gstreamer_pipeline_log(ctx, "fatal", "end of stream"); + break; + } - case GST_MESSAGE_EOS: - g_print("End of stream\n"); - exit(1); - break; + case GST_MESSAGE_STATE_CHANGED: { + GstState old_state, new_state; + gst_message_parse_state_changed(msg, &old_state, &new_state, NULL); - case GST_MESSAGE_ERROR: { - gchar *debug; - GError *error; + gstreamer_pipeline_log(ctx, "debug", + "element %s changed state from %s to %s", + GST_OBJECT_NAME(msg->src), + gst_element_state_get_name(old_state), + gst_element_state_get_name(new_state)); + break; + } - gst_message_parse_error(msg, &error, &debug); - g_free(debug); + case GST_MESSAGE_TAG: { + GstTagList *tags = NULL; + gst_message_parse_tag(msg, &tags); - g_printerr("Error: %s\n", error->message); - g_error_free(error); - exit(1); - } - default: - break; + gstreamer_pipeline_log(ctx, "debug", + "got tags from element %s", + GST_OBJECT_NAME(msg->src)); + + gst_tag_list_unref(tags); + break; + } + + case GST_MESSAGE_ERROR: { + GError *err = NULL; + gchar *dbg_info = NULL; + gst_message_parse_error(msg, &err, &dbg_info); + + gstreamer_pipeline_log(ctx, "error", + "error from element %s: %s", + GST_OBJECT_NAME(msg->src), err->message); + gstreamer_pipeline_log(ctx, "warn", + "debugging info: %s", + (dbg_info) ? dbg_info : "none"); + + g_error_free(err); + g_free(dbg_info); + break; + } + + default: + gstreamer_pipeline_log(ctx, "trace", "unknown message"); + break; } return TRUE; } -GstFlowReturn gstreamer_send_new_sample_handler(GstElement *object, gpointer user_data) { +GstPipelineCtx *gstreamer_pipeline_create(char *pipelineStr, int pipelineId, GError **error) { + GstElement *pipeline = gst_parse_launch(pipelineStr, error); + if (pipeline == NULL) return NULL; + + // create gstreamer pipeline context + GstPipelineCtx *ctx = calloc(1, sizeof(GstPipelineCtx)); + ctx->pipelineId = pipelineId; + ctx->pipeline = pipeline; + + GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline)); + gst_bus_add_watch(bus, gstreamer_bus_call, ctx); + gst_object_unref(bus); + + return ctx; +} + +static GstFlowReturn gstreamer_send_new_sample_handler(GstElement *object, gpointer user_data) { + GstPipelineCtx *ctx = (GstPipelineCtx *)user_data; GstSample *sample = NULL; GstBuffer *buffer = NULL; gpointer copy = NULL; gsize copy_size = 0; - SampleHandlerUserData *s = (SampleHandlerUserData *)user_data; - g_signal_emit_by_name (object, "pull-sample", &sample); + g_signal_emit_by_name(object, "pull-sample", &sample); if (sample) { buffer = gst_sample_get_buffer(sample); if (buffer) { gst_buffer_extract_dup(buffer, 0, gst_buffer_get_size(buffer), ©, ©_size); - goHandlePipelineBuffer(copy, copy_size, GST_BUFFER_DURATION(buffer), s->pipelineId); + goHandlePipelineBuffer(copy, copy_size, GST_BUFFER_DURATION(buffer), ctx->pipelineId); } - gst_sample_unref (sample); + gst_sample_unref(sample); } return GST_FLOW_OK; } -GstElement *gstreamer_send_create_pipeline(char *pipeline, GError **error) { - return gst_parse_launch(pipeline, error); +void gstreamer_pipeline_attach_appsink(GstPipelineCtx *ctx, char *sinkName) { + ctx->appsink = gst_bin_get_by_name(GST_BIN(ctx->pipeline), sinkName); + g_object_set(ctx->appsink, "emit-signals", TRUE, NULL); + g_signal_connect(ctx->appsink, "new-sample", G_CALLBACK(gstreamer_send_new_sample_handler), ctx); } -void gstreamer_send_start_pipeline(GstElement *pipeline, int pipelineId) { - SampleHandlerUserData *s = calloc(1, sizeof(SampleHandlerUserData)); - s->pipelineId = pipelineId; - - GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline)); - gst_bus_add_watch(bus, gstreamer_send_bus_call, NULL); - gst_object_unref(bus); - - GstElement *appsink = gst_bin_get_by_name(GST_BIN(pipeline), "appsink"); - g_object_set(appsink, "emit-signals", TRUE, NULL); - g_signal_connect(appsink, "new-sample", G_CALLBACK(gstreamer_send_new_sample_handler), s); - gst_object_unref(appsink); - - gst_element_set_state(pipeline, GST_STATE_PLAYING); +void gstreamer_pipeline_attach_appsrc(GstPipelineCtx *ctx, char *srcName) { + ctx->appsrc = gst_bin_get_by_name(GST_BIN(ctx->pipeline), srcName); } -void gstreamer_send_play_pipeline(GstElement *pipeline) { - gst_element_set_state(pipeline, GST_STATE_PLAYING); +void gstreamer_pipeline_play(GstPipelineCtx *ctx) { + gst_element_set_state(GST_ELEMENT(ctx->pipeline), GST_STATE_PLAYING); } -void gstreamer_send_stop_pipeline(GstElement *pipeline) { - gst_element_set_state(pipeline, GST_STATE_NULL); +void gstreamer_pipeline_pause(GstPipelineCtx *ctx) { + gst_element_set_state(GST_ELEMENT(ctx->pipeline), GST_STATE_PAUSED); +} + +void gstreamer_pipeline_destory(GstPipelineCtx *ctx) { + // end appsrc, if exists + if (ctx->appsrc) { + gst_app_src_end_of_stream(GST_APP_SRC(ctx->appsrc)); + } + + // send pipeline eos + gst_element_send_event(GST_ELEMENT(ctx->pipeline), gst_event_new_eos()); + + // set null state + gst_element_set_state(GST_ELEMENT(ctx->pipeline), GST_STATE_NULL); + + if (ctx->appsink) { + gst_object_unref(ctx->appsink); + ctx->appsink = NULL; + } + + if (ctx->appsrc) { + gst_object_unref(ctx->appsrc); + ctx->appsrc = NULL; + } + + gst_object_unref(ctx->pipeline); +} + +void gstreamer_pipeline_push(GstPipelineCtx *ctx, void *buffer, int bufferLen) { + if (ctx->appsrc != NULL) { + gpointer p = g_memdup(buffer, bufferLen); + GstBuffer *buffer = gst_buffer_new_wrapped(p, bufferLen); + gst_app_src_push_buffer(GST_APP_SRC(ctx->appsrc), buffer); + } +} + +gboolean gstreamer_pipeline_set_prop_int(GstPipelineCtx *ctx, char *binName, char *prop, gint value) { + GstElement *el = gst_bin_get_by_name(GST_BIN(ctx->pipeline), binName); + if (el == NULL) return FALSE; + + g_object_set(G_OBJECT(el), + prop, value, + NULL); + + gst_object_unref(el); + return TRUE; +} + +gboolean gstreamer_pipeline_set_caps_framerate(GstPipelineCtx *ctx, const gchar* binName, gint numerator, gint denominator) { + GstElement *el = gst_bin_get_by_name(GST_BIN(ctx->pipeline), binName); + if (el == NULL) return FALSE; + + GstCaps *caps = gst_caps_new_simple("video/x-raw", + "framerate", GST_TYPE_FRACTION, numerator, denominator, + NULL); + + g_object_set(G_OBJECT(el), + "caps", caps, + NULL); + + gst_caps_unref(caps); + gst_object_unref(el); + return TRUE; +} + +gboolean gstreamer_pipeline_set_caps_resolution(GstPipelineCtx *ctx, const gchar* binName, gint width, gint height) { + GstElement *el = gst_bin_get_by_name(GST_BIN(ctx->pipeline), binName); + if (el == NULL) return FALSE; + + GstCaps *caps = gst_caps_new_simple("video/x-raw", + "width", G_TYPE_INT, width, + "height", G_TYPE_INT, height, + NULL); + + g_object_set(G_OBJECT(el), + "caps", caps, + NULL); + + gst_caps_unref(caps); + gst_object_unref(el); + return TRUE; } diff --git a/server/internal/capture/gst/gst.go b/server/internal/capture/gst/gst.go index 4d6d8167..57180dc0 100644 --- a/server/internal/capture/gst/gst.go +++ b/server/internal/capture/gst/gst.go @@ -9,68 +9,146 @@ import "C" import ( "fmt" "sync" + "sync/atomic" "time" "unsafe" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + "m1k1o/neko/internal/types" ) -// Pipeline is a wrapper for a GStreamer Pipeline type Pipeline struct { - Pipeline *C.GstElement - Sample chan types.Sample - Src string - id int + id int + logger zerolog.Logger + Src string + Ctx *C.GstPipelineCtx + Sample chan types.Sample } +var pSerial int32 var pipelines = make(map[int]*Pipeline) var pipelinesLock sync.Mutex var registry *C.GstRegistry func init() { - C.gstreamer_init() + C.gst_init(nil, nil) registry = C.gst_registry_get() } -// CreatePipeline creates a GStreamer Pipeline func CreatePipeline(pipelineStr string) (*Pipeline, error) { + id := atomic.AddInt32(&pSerial, 1) + pipelineStrUnsafe := C.CString(pipelineStr) defer C.free(unsafe.Pointer(pipelineStrUnsafe)) pipelinesLock.Lock() defer pipelinesLock.Unlock() - var err *C.GError - gstPipeline := C.gstreamer_send_create_pipeline(pipelineStrUnsafe, &err) - if err != nil { - defer C.g_error_free(err) - return nil, fmt.Errorf("%s", C.GoString(err.message)) + var gstError *C.GError + ctx := C.gstreamer_pipeline_create(pipelineStrUnsafe, C.int(id), &gstError) + + if gstError != nil { + defer C.g_error_free(gstError) + return nil, fmt.Errorf("(pipeline error) %s", C.GoString(gstError.message)) } p := &Pipeline{ - Pipeline: gstPipeline, - Sample: make(chan types.Sample), - Src: pipelineStr, - id: len(pipelines), + id: int(id), + logger: log.With(). + Str("module", "capture"). + Str("submodule", "gstreamer"). + Int("pipeline_id", int(id)).Logger(), + Src: pipelineStr, + Ctx: ctx, + Sample: make(chan types.Sample), } pipelines[p.id] = p return p, nil } -// Start starts the GStreamer Pipeline -func (p *Pipeline) Start() { - C.gstreamer_send_start_pipeline(p.Pipeline, C.int(p.id)) +func (p *Pipeline) AttachAppsink(sinkName string) { + sinkNameUnsafe := C.CString(sinkName) + defer C.free(unsafe.Pointer(sinkNameUnsafe)) + + C.gstreamer_pipeline_attach_appsink(p.Ctx, sinkNameUnsafe) +} + +func (p *Pipeline) AttachAppsrc(srcName string) { + srcNameUnsafe := C.CString(srcName) + defer C.free(unsafe.Pointer(srcNameUnsafe)) + + C.gstreamer_pipeline_attach_appsrc(p.Ctx, srcNameUnsafe) } -// Play starts the GStreamer Pipeline func (p *Pipeline) Play() { - C.gstreamer_send_play_pipeline(p.Pipeline) + C.gstreamer_pipeline_play(p.Ctx) } -// Stop stops the GStreamer Pipeline -func (p *Pipeline) Stop() { - C.gstreamer_send_stop_pipeline(p.Pipeline) +func (p *Pipeline) Pause() { + C.gstreamer_pipeline_pause(p.Ctx) +} + +func (p *Pipeline) Destroy() { + C.gstreamer_pipeline_destory(p.Ctx) + + pipelinesLock.Lock() + delete(pipelines, p.id) + pipelinesLock.Unlock() + + close(p.Sample) + C.free(unsafe.Pointer(p.Ctx)) + p = nil +} + +func (p *Pipeline) Push(buffer []byte) { + bytes := C.CBytes(buffer) + defer C.free(bytes) + + C.gstreamer_pipeline_push(p.Ctx, bytes, C.int(len(buffer))) +} + +func (p *Pipeline) SetPropInt(binName string, prop string, value int) bool { + cBinName := C.CString(binName) + defer C.free(unsafe.Pointer(cBinName)) + + cProp := C.CString(prop) + defer C.free(unsafe.Pointer(cProp)) + + cValue := C.int(value) + + p.logger.Debug().Msgf("setting prop %s of %s to %d", prop, binName, value) + + ok := C.gstreamer_pipeline_set_prop_int(p.Ctx, cBinName, cProp, cValue) + return ok == C.TRUE +} + +func (p *Pipeline) SetCapsFramerate(binName string, numerator, denominator int) bool { + cBinName := C.CString(binName) + cNumerator := C.int(numerator) + cDenominator := C.int(denominator) + + defer C.free(unsafe.Pointer(cBinName)) + + p.logger.Debug().Msgf("setting caps framerate of %s to %d/%d", binName, numerator, denominator) + + ok := C.gstreamer_pipeline_set_caps_framerate(p.Ctx, cBinName, cNumerator, cDenominator) + return ok == C.TRUE +} + +func (p *Pipeline) SetCapsResolution(binName string, width, height int) bool { + cBinName := C.CString(binName) + cWidth := C.int(width) + cHeight := C.int(height) + + defer C.free(unsafe.Pointer(cBinName)) + + p.logger.Debug().Msgf("setting caps resolution of %s to %dx%d", binName, width, height) + + ok := C.gstreamer_pipeline_set_caps_resolution(p.Ctx, cBinName, cWidth, cHeight) + return ok == C.TRUE } // gst-inspect-1.0 @@ -90,14 +168,35 @@ func CheckPlugins(plugins []string) error { //export goHandlePipelineBuffer func goHandlePipelineBuffer(buffer unsafe.Pointer, bufferLen C.int, duration C.int, pipelineID C.int) { + defer C.free(buffer) + pipelinesLock.Lock() pipeline, ok := pipelines[int(pipelineID)] pipelinesLock.Unlock() if ok { - pipeline.Sample <- types.Sample{Data: C.GoBytes(buffer, bufferLen), Timestamp: time.Now(), Duration: time.Duration(duration)} + pipeline.Sample <- types.Sample{ + Data: C.GoBytes(buffer, bufferLen), + Duration: time.Duration(duration), + } } else { - fmt.Printf("discarding buffer, no pipeline with id %d", int(pipelineID)) + log.Warn(). + Str("module", "capture"). + Str("submodule", "gstreamer"). + Int("pipeline_id", int(pipelineID)). + Msgf("discarding sample, pipeline not found") } - C.free(buffer) +} + +//export goPipelineLog +func goPipelineLog(levelUnsafe *C.char, msgUnsafe *C.char, pipelineID C.int) { + levelStr := C.GoString(levelUnsafe) + msg := C.GoString(msgUnsafe) + + level, _ := zerolog.ParseLevel(levelStr) + log.WithLevel(level). + Str("module", "capture"). + Str("submodule", "gstreamer"). + Int("pipeline_id", int(pipelineID)). + Msg(msg) } diff --git a/server/internal/capture/gst/gst.h b/server/internal/capture/gst/gst.h index 9b19ed29..a562ec34 100644 --- a/server/internal/capture/gst/gst.h +++ b/server/internal/capture/gst/gst.h @@ -1,13 +1,27 @@ #pragma once +#include #include #include +typedef struct GstPipelineCtx { + int pipelineId; + GstElement *pipeline; + GstElement *appsink; + GstElement *appsrc; +} GstPipelineCtx; + extern void goHandlePipelineBuffer(void *buffer, int bufferLen, int samples, int pipelineId); +extern void goPipelineLog(char *level, char *msg, int pipelineId); -GstElement *gstreamer_send_create_pipeline(char *pipeline, GError **error); +GstPipelineCtx *gstreamer_pipeline_create(char *pipelineStr, int pipelineId, GError **error); +void gstreamer_pipeline_attach_appsink(GstPipelineCtx *ctx, char *sinkName); +void gstreamer_pipeline_attach_appsrc(GstPipelineCtx *ctx, char *srcName); +void gstreamer_pipeline_play(GstPipelineCtx *ctx); +void gstreamer_pipeline_pause(GstPipelineCtx *ctx); +void gstreamer_pipeline_destory(GstPipelineCtx *ctx); +void gstreamer_pipeline_push(GstPipelineCtx *ctx, void *buffer, int bufferLen); -void gstreamer_send_start_pipeline(GstElement *pipeline, int pipelineId); -void gstreamer_send_play_pipeline(GstElement *pipeline); -void gstreamer_send_stop_pipeline(GstElement *pipeline); -void gstreamer_init(void); +gboolean gstreamer_pipeline_set_prop_int(GstPipelineCtx *ctx, char *binName, char *prop, gint value); +gboolean gstreamer_pipeline_set_caps_framerate(GstPipelineCtx *ctx, const gchar* binName, gint numerator, gint denominator); +gboolean gstreamer_pipeline_set_caps_resolution(GstPipelineCtx *ctx, const gchar* binName, gint width, gint height); diff --git a/server/internal/capture/manager.go b/server/internal/capture/manager.go index 50930285..ea1ebdd3 100644 --- a/server/internal/capture/manager.go +++ b/server/internal/capture/manager.go @@ -1,199 +1,99 @@ package capture import ( - "time" + "errors" + + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" "m1k1o/neko/internal/capture/gst" "m1k1o/neko/internal/config" "m1k1o/neko/internal/types" - - "github.com/kataras/go-events" - "github.com/rs/zerolog" - "github.com/rs/zerolog/log" ) type CaptureManagerCtx struct { - logger zerolog.Logger - video *gst.Pipeline - audio *gst.Pipeline - config *config.Capture - broadcast types.BroadcastManager - desktop types.DesktopManager - cleanup *time.Ticker - shutdown chan bool - emmiter events.EventEmmiter - streaming bool + logger zerolog.Logger + desktop types.DesktopManager + + // sinks + broadcast *BroacastManagerCtx + audio *StreamSinkManagerCtx + video *StreamSinkManagerCtx } -func New(desktop types.DesktopManager, broadcast types.BroadcastManager, config *config.Capture) *CaptureManagerCtx { +func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCtx { + logger := log.With().Str("module", "capture").Logger() + return &CaptureManagerCtx{ - logger: log.With().Str("module", "capture").Logger(), - cleanup: time.NewTicker(1 * time.Second), - shutdown: make(chan bool), - emmiter: events.New(), - config: config, - broadcast: broadcast, - desktop: desktop, - streaming: false, + logger: logger, + desktop: desktop, + + // sinks + broadcast: broadcastNew(func(url string) (*gst.Pipeline, error) { + return NewBroadcastPipeline(config.AudioDevice, config.Display, config.BroadcastPipeline, url) + }, config.BroadcastUrl, config.BroadcastStarted), + audio: streamSinkNew(config.AudioCodec, func() (*gst.Pipeline, error) { + return NewAudioPipeline(config.AudioCodec, config.AudioDevice, config.AudioPipeline, config.AudioBitrate) + }, "audio"), + video: streamSinkNew(config.VideoCodec, func() (*gst.Pipeline, error) { + return NewVideoPipeline(config.VideoCodec, config.Display, config.VideoPipeline, config.VideoMaxFPS, config.VideoBitrate, config.VideoHWEnc) + }, "audio"), } } -func (manager *CaptureManagerCtx) VideoCodec() string { - return manager.config.VideoCodec -} - -func (manager *CaptureManagerCtx) AudioCodec() string { - return manager.config.AudioCodec -} - func (manager *CaptureManagerCtx) Start() { - manager.createPipelines() - if err := manager.broadcast.Start(); err != nil { - manager.logger.Panic().Err(err).Msg("unable to create rtmp pipeline") + if manager.broadcast.Started() { + if err := manager.broadcast.createPipeline(); err != nil { + manager.logger.Panic().Err(err).Msg("unable to create broadcast pipeline") + } } - go func() { - defer func() { - manager.logger.Info().Msg("shutdown") - }() + manager.desktop.OnBeforeScreenSizeChange(func() { + if manager.video.Started() { + manager.video.destroyPipeline() + } - for { - select { - case <-manager.shutdown: - return - case sample := <-manager.video.Sample: - manager.emmiter.Emit("video", sample) - case sample := <-manager.audio.Sample: - manager.emmiter.Emit("audio", sample) - case <-manager.cleanup.C: - // TODO: refactor. + if manager.broadcast.Started() { + manager.broadcast.destroyPipeline() + } + }) + + manager.desktop.OnAfterScreenSizeChange(func() { + if manager.video.Started() { + err := manager.video.createPipeline() + if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) { + manager.logger.Panic().Err(err).Msg("unable to recreate video pipeline") } } - }() + + if manager.broadcast.Started() { + err := manager.broadcast.createPipeline() + if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) { + manager.logger.Panic().Err(err).Msg("unable to recreate broadcast pipeline") + } + } + }) } func (manager *CaptureManagerCtx) Shutdown() error { - manager.logger.Info().Msgf("capture shutting down") - manager.video.Stop() - manager.audio.Stop() - manager.broadcast.Stop() + manager.logger.Info().Msgf("shutdown") - manager.cleanup.Stop() - manager.shutdown <- true - return nil -} + manager.broadcast.shutdown() -func (manager *CaptureManagerCtx) OnVideoFrame(listener func(sample types.Sample)) { - manager.emmiter.On("video", func(payload ...interface{}) { - listener(payload[0].(types.Sample)) - }) -} - -func (manager *CaptureManagerCtx) OnAudioFrame(listener func(sample types.Sample)) { - manager.emmiter.On("audio", func(payload ...interface{}) { - listener(payload[0].(types.Sample)) - }) -} - -func (manager *CaptureManagerCtx) StartStream() { - manager.createPipelines() - - manager.logger.Info(). - Str("video_display", manager.config.Display). - Str("video_codec", manager.config.VideoCodec). - Str("audio_device", manager.config.Device). - Str("audio_codec", manager.config.AudioCodec). - Str("audio_pipeline_src", manager.audio.Src). - Str("video_pipeline_src", manager.video.Src). - Msgf("Pipelines starting...") - - manager.video.Start() - manager.audio.Start() - manager.streaming = true -} - -func (manager *CaptureManagerCtx) StopStream() { - manager.logger.Info().Msgf("Pipelines shutting down...") - manager.video.Stop() - manager.audio.Stop() - manager.streaming = false -} - -func (manager *CaptureManagerCtx) Streaming() bool { - return manager.streaming -} - -func (manager *CaptureManagerCtx) createPipelines() { - // handle maximum fps - rate := manager.desktop.GetScreenSize().Rate - if manager.config.MaxFPS != 0 && manager.config.MaxFPS < rate { - rate = manager.config.MaxFPS - } - - var err error - manager.video, err = CreateAppPipeline( - manager.config.VideoCodec, - manager.config.Display, - manager.config.VideoParams, - rate, - manager.config.VideoBitrate, - manager.config.VideoHWEnc, - ) - if err != nil { - manager.logger.Panic().Err(err).Msg("unable to create video pipeline") - } - - manager.audio, err = CreateAppPipeline( - manager.config.AudioCodec, - manager.config.Device, - manager.config.AudioParams, - 0, // fps: n/a for audio - manager.config.AudioBitrate, - "", // hwenc: n/a for audio - ) - if err != nil { - manager.logger.Panic().Err(err).Msg("unable to create audio pipeline") - } -} - -func (manager *CaptureManagerCtx) ChangeResolution(width int, height int, rate int16) error { - manager.video.Stop() - manager.broadcast.Stop() - - defer func() { - manager.video.Start() - if err := manager.broadcast.Start(); err != nil { - manager.logger.Panic().Err(err).Msg("unable to create rtmp pipeline") - } - - manager.logger.Info().Msg("starting video pipeline...") - }() - - if err := manager.desktop.SetScreenSize(types.ScreenSize{ - Width: width, - Height: height, - Rate: rate, - }); err != nil { - return err - } - - // handle maximum fps - if manager.config.MaxFPS != 0 && manager.config.MaxFPS < rate { - rate = manager.config.MaxFPS - } - - var err error - manager.video, err = CreateAppPipeline( - manager.config.VideoCodec, - manager.config.Display, - manager.config.VideoParams, - rate, - manager.config.VideoBitrate, - manager.config.VideoHWEnc, - ) - if err != nil { - manager.logger.Panic().Err(err).Msg("unable to create new video pipeline") - } + manager.audio.shutdown() + manager.video.shutdown() return nil } + +func (manager *CaptureManagerCtx) Broadcast() types.BroadcastManager { + return manager.broadcast +} + +func (manager *CaptureManagerCtx) Audio() types.StreamSinkManager { + return manager.audio +} + +func (manager *CaptureManagerCtx) Video() types.StreamSinkManager { + return manager.video +} diff --git a/server/internal/capture/pipelines.go b/server/internal/capture/pipelines.go index 4d24d68a..738bece5 100644 --- a/server/internal/capture/pipelines.go +++ b/server/internal/capture/pipelines.go @@ -2,8 +2,10 @@ package capture import ( "fmt" - "m1k1o/neko/internal/capture/gst" "strings" + + "m1k1o/neko/internal/capture/gst" + "m1k1o/neko/internal/types/codec" ) /* @@ -31,38 +33,36 @@ const ( audioSrc = "pulsesrc device=%s ! audio/x-raw,channels=2 ! audioconvert ! " ) -// CreateRTMPPipeline creates a GStreamer Pipeline -func CreateRTMPPipeline(pipelineDevice string, pipelineDisplay string, pipelineSrc string, pipelineRTMP string) (*gst.Pipeline, error) { - video := fmt.Sprintf(videoSrc, pipelineDisplay, 25) - audio := fmt.Sprintf(audioSrc, pipelineDevice) +func NewBroadcastPipeline(device string, display string, pipelineSrc string, url string) (*gst.Pipeline, error) { + video := fmt.Sprintf(videoSrc, display, 25) + audio := fmt.Sprintf(audioSrc, device) var pipelineStr string if pipelineSrc != "" { // replace RTMP url - pipelineStr = strings.Replace(pipelineSrc, "{url}", pipelineRTMP, -1) + pipelineStr = strings.Replace(pipelineSrc, "{url}", url, -1) // replace audio device - pipelineStr = strings.Replace(pipelineStr, "{device}", pipelineDevice, -1) + pipelineStr = strings.Replace(pipelineStr, "{device}", device, -1) // replace display - pipelineStr = strings.Replace(pipelineStr, "{display}", pipelineDisplay, -1) + pipelineStr = strings.Replace(pipelineStr, "{display}", display, -1) } else { - pipelineStr = fmt.Sprintf("flvmux name=mux ! rtmpsink location='%s live=1' %s audio/x-raw,channels=2 ! audioconvert ! voaacenc ! mux. %s x264enc bframes=0 key-int-max=60 byte-stream=true tune=zerolatency speed-preset=veryfast ! mux.", pipelineRTMP, audio, video) + pipelineStr = fmt.Sprintf("flvmux name=mux ! rtmpsink location='%s live=1' %s audio/x-raw,channels=2 ! audioconvert ! voaacenc ! mux. %s x264enc bframes=0 key-int-max=60 byte-stream=true tune=zerolatency speed-preset=veryfast ! mux.", url, audio, video) } return gst.CreatePipeline(pipelineStr) } -// CreateAppPipeline creates a GStreamer Pipeline -func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc string, fps int16, bitrate uint, hwenc string) (*gst.Pipeline, error) { +func NewVideoPipeline(rtpCodec codec.RTPCodec, display string, pipelineSrc string, fps int16, bitrate uint, hwenc string) (*gst.Pipeline, error) { pipelineStr := " ! appsink name=appsink" // if using custom pipeline if pipelineSrc != "" { - pipelineStr = fmt.Sprintf(pipelineSrc+pipelineStr, pipelineDevice) + pipelineStr = fmt.Sprintf(pipelineSrc+pipelineStr, display) return gst.CreatePipeline(pipelineStr) } - switch codecName { - case "VP8": + switch rtpCodec.Name { + case codec.VP8().Name: if hwenc == "VAAPI" { if err := gst.CheckPlugins([]string{"ximagesrc", "vaapi"}); err != nil { return nil, err @@ -70,7 +70,7 @@ func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc stri // vp8 encode is missing from gstreamer.freedesktop.org/documentation // note that it was removed from some recent intel CPUs: https://trac.ffmpeg.org/wiki/Hardware/QuickSync // https://gstreamer.freedesktop.org/data/doc/gstreamer/head/gstreamer-vaapi-plugins/html/gstreamer-vaapi-plugins-vaapivp8enc.html - pipelineStr = fmt.Sprintf(videoSrc+"video/x-raw,format=NV12 ! vaapivp8enc rate-control=vbr bitrate=%d keyframe-period=180"+pipelineStr, pipelineDevice, fps, bitrate) + pipelineStr = fmt.Sprintf(videoSrc+"video/x-raw,format=NV12 ! vaapivp8enc rate-control=vbr bitrate=%d keyframe-period=180"+pipelineStr, display, fps, bitrate) } else { // https://gstreamer.freedesktop.org/documentation/vpx/vp8enc.html?gi-language=c // gstreamer1.0-plugins-good @@ -80,7 +80,7 @@ func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc stri } pipelineStr = strings.Join([]string{ - fmt.Sprintf(videoSrc, pipelineDevice, fps), + fmt.Sprintf(videoSrc, display, fps), "vp8enc", fmt.Sprintf("target-bitrate=%d", bitrate*650), "cpu-used=4", @@ -97,7 +97,7 @@ func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc stri pipelineStr, }, " ") } - case "VP9": + case codec.VP9().Name: // https://gstreamer.freedesktop.org/documentation/vpx/vp9enc.html?gi-language=c // gstreamer1.0-plugins-good // vp9enc @@ -105,8 +105,8 @@ func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc stri return nil, err } - pipelineStr = fmt.Sprintf(videoSrc+"vp9enc target-bitrate=%d cpu-used=-5 threads=4 deadline=1 keyframe-max-dist=30 auto-alt-ref=true"+pipelineStr, pipelineDevice, fps, bitrate*1000) - case "H264": + pipelineStr = fmt.Sprintf(videoSrc+"vp9enc target-bitrate=%d cpu-used=-5 threads=4 deadline=1 keyframe-max-dist=30 auto-alt-ref=true"+pipelineStr, display, fps, bitrate*1000) + case codec.H264().Name: if err := gst.CheckPlugins([]string{"ximagesrc"}); err != nil { return nil, err } @@ -116,14 +116,14 @@ func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc stri return nil, err } - pipelineStr = fmt.Sprintf(videoSrc+"video/x-raw,format=NV12 ! vaapih264enc rate-control=vbr bitrate=%d keyframe-period=180 quality-level=7 ! video/x-h264,stream-format=byte-stream"+pipelineStr, pipelineDevice, fps, bitrate) + pipelineStr = fmt.Sprintf(videoSrc+"video/x-raw,format=NV12 ! vaapih264enc rate-control=vbr bitrate=%d keyframe-period=180 quality-level=7 ! video/x-h264,stream-format=byte-stream"+pipelineStr, display, fps, bitrate) } else { // https://gstreamer.freedesktop.org/documentation/openh264/openh264enc.html?gi-language=c#openh264enc // gstreamer1.0-plugins-bad // openh264enc multi-thread=4 complexity=high bitrate=3072000 max-bitrate=4096000 if err := gst.CheckPlugins([]string{"openh264"}); err == nil { - pipelineStr = fmt.Sprintf(videoSrc+"openh264enc multi-thread=4 complexity=high bitrate=%d max-bitrate=%d ! video/x-h264,stream-format=byte-stream"+pipelineStr, pipelineDevice, fps, bitrate*1000, (bitrate+1024)*1000) + pipelineStr = fmt.Sprintf(videoSrc+"openh264enc multi-thread=4 complexity=high bitrate=%d max-bitrate=%d ! video/x-h264,stream-format=byte-stream"+pipelineStr, display, fps, bitrate*1000, (bitrate+1024)*1000) break } @@ -139,9 +139,26 @@ func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc stri vbvbuf = bitrate } - pipelineStr = fmt.Sprintf(videoSrc+"video/x-raw,format=NV12 ! x264enc threads=4 bitrate=%d key-int-max=60 vbv-buf-capacity=%d byte-stream=true tune=zerolatency speed-preset=veryfast ! video/x-h264,stream-format=byte-stream"+pipelineStr, pipelineDevice, fps, bitrate, vbvbuf) + pipelineStr = fmt.Sprintf(videoSrc+"video/x-raw,format=NV12 ! x264enc threads=4 bitrate=%d key-int-max=60 vbv-buf-capacity=%d byte-stream=true tune=zerolatency speed-preset=veryfast ! video/x-h264,stream-format=byte-stream"+pipelineStr, display, fps, bitrate, vbvbuf) } - case "Opus": + default: + return nil, fmt.Errorf("unknown codec %s", rtpCodec.Name) + } + + return gst.CreatePipeline(pipelineStr) +} + +func NewAudioPipeline(rtpCodec codec.RTPCodec, device string, pipelineSrc string, bitrate uint) (*gst.Pipeline, error) { + pipelineStr := " ! appsink name=appsink" + + // if using custom pipeline + if pipelineSrc != "" { + pipelineStr = fmt.Sprintf(pipelineSrc+pipelineStr, device) + return gst.CreatePipeline(pipelineStr) + } + + switch rtpCodec.Name { + case codec.Opus().Name: // https://gstreamer.freedesktop.org/documentation/opus/opusenc.html // gstreamer1.0-plugins-base // opusenc @@ -149,8 +166,8 @@ func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc stri return nil, err } - pipelineStr = fmt.Sprintf(audioSrc+"opusenc inband-fec=true bitrate=%d"+pipelineStr, pipelineDevice, bitrate*1000) - case "G722": + pipelineStr = fmt.Sprintf(audioSrc+"opusenc inband-fec=true bitrate=%d"+pipelineStr, device, bitrate*1000) + case codec.G722().Name: // https://gstreamer.freedesktop.org/documentation/libav/avenc_g722.html?gi-language=c // gstreamer1.0-libav // avenc_g722 @@ -158,8 +175,8 @@ func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc stri return nil, err } - pipelineStr = fmt.Sprintf(audioSrc+"avenc_g722 bitrate=%d"+pipelineStr, pipelineDevice, bitrate*1000) - case "PCMU": + pipelineStr = fmt.Sprintf(audioSrc+"avenc_g722 bitrate=%d"+pipelineStr, device, bitrate*1000) + case codec.PCMU().Name: // https://gstreamer.freedesktop.org/documentation/mulaw/mulawenc.html?gi-language=c // gstreamer1.0-plugins-good // audio/x-raw, rate=8000 ! mulawenc @@ -167,8 +184,8 @@ func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc stri return nil, err } - pipelineStr = fmt.Sprintf(audioSrc+"audio/x-raw, rate=8000 ! mulawenc"+pipelineStr, pipelineDevice) - case "PCMA": + pipelineStr = fmt.Sprintf(audioSrc+"audio/x-raw, rate=8000 ! mulawenc"+pipelineStr, device) + case codec.PCMA().Name: // https://gstreamer.freedesktop.org/documentation/alaw/alawenc.html?gi-language=c // gstreamer1.0-plugins-good // audio/x-raw, rate=8000 ! alawenc @@ -176,9 +193,9 @@ func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc stri return nil, err } - pipelineStr = fmt.Sprintf(audioSrc+"audio/x-raw, rate=8000 ! alawenc"+pipelineStr, pipelineDevice) + pipelineStr = fmt.Sprintf(audioSrc+"audio/x-raw, rate=8000 ! alawenc"+pipelineStr, device) default: - return nil, fmt.Errorf("unknown codec %s", codecName) + return nil, fmt.Errorf("unknown codec %s", rtpCodec.Name) } return gst.CreatePipeline(pipelineStr) diff --git a/server/internal/capture/streamsink.go b/server/internal/capture/streamsink.go new file mode 100644 index 00000000..07209452 --- /dev/null +++ b/server/internal/capture/streamsink.go @@ -0,0 +1,193 @@ +package capture + +import ( + "errors" + "sync" + + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + + "m1k1o/neko/internal/capture/gst" + "m1k1o/neko/internal/types" + "m1k1o/neko/internal/types/codec" +) + +var moveSinkListenerMu = sync.Mutex{} + +type StreamSinkManagerCtx struct { + logger zerolog.Logger + mu sync.Mutex + wg sync.WaitGroup + + codec codec.RTPCodec + pipeline *gst.Pipeline + pipelineMu sync.Mutex + pipelineFn func() (*gst.Pipeline, error) + + listeners int + listenersMu sync.Mutex + + sampleFn func(sample types.Sample) +} + +func streamSinkNew(codec codec.RTPCodec, pipelineFn func() (*gst.Pipeline, error), video_id string) *StreamSinkManagerCtx { + logger := log.With(). + Str("module", "capture"). + Str("submodule", "stream-sink"). + Str("video_id", video_id).Logger() + + manager := &StreamSinkManagerCtx{ + logger: logger, + codec: codec, + pipelineFn: pipelineFn, + } + + return manager +} + +func (manager *StreamSinkManagerCtx) shutdown() { + manager.logger.Info().Msgf("shutdown") + + manager.destroyPipeline() + manager.wg.Wait() +} + +func (manager *StreamSinkManagerCtx) OnSample(listener func(sample types.Sample)) { + manager.sampleFn = listener +} + +func (manager *StreamSinkManagerCtx) Codec() codec.RTPCodec { + return manager.codec +} + +func (manager *StreamSinkManagerCtx) start() error { + if manager.listeners == 0 { + err := manager.createPipeline() + if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) { + return err + } + + manager.logger.Info().Msgf("first listener, starting") + } + + return nil +} + +func (manager *StreamSinkManagerCtx) stop() { + if manager.listeners == 0 { + manager.destroyPipeline() + manager.logger.Info().Msgf("last listener, stopping") + } +} + +func (manager *StreamSinkManagerCtx) addListener() { + manager.listenersMu.Lock() + manager.listeners++ + manager.listenersMu.Unlock() +} + +func (manager *StreamSinkManagerCtx) removeListener() { + manager.listenersMu.Lock() + manager.listeners-- + manager.listenersMu.Unlock() +} + +func (manager *StreamSinkManagerCtx) AddListener() error { + manager.mu.Lock() + defer manager.mu.Unlock() + + // start if stopped + if err := manager.start(); err != nil { + return err + } + + // add listener + manager.addListener() + + return nil +} + +func (manager *StreamSinkManagerCtx) RemoveListener() error { + manager.mu.Lock() + defer manager.mu.Unlock() + + // remove listener + manager.removeListener() + + // stop if started + manager.stop() + + return nil +} + +func (manager *StreamSinkManagerCtx) ListenersCount() int { + manager.listenersMu.Lock() + defer manager.listenersMu.Unlock() + + return manager.listeners +} + +func (manager *StreamSinkManagerCtx) Started() bool { + return manager.ListenersCount() > 0 +} + +func (manager *StreamSinkManagerCtx) createPipeline() error { + manager.pipelineMu.Lock() + defer manager.pipelineMu.Unlock() + + if manager.pipeline != nil { + return types.ErrCapturePipelineAlreadyExists + } + + var err error + + manager.logger.Info(). + Str("codec", manager.codec.Name). + Msgf("creating pipeline") + + manager.pipeline, err = manager.pipelineFn() + if err != nil { + return err + } + + manager.logger.Info(). + Str("codec", manager.codec.Name). + Str("src", manager.pipeline.Src). + Msgf("created pipeline") + + manager.pipeline.AttachAppsink("appsink") + manager.pipeline.Play() + + manager.wg.Add(1) + pipeline := manager.pipeline + + go func() { + manager.logger.Debug().Msg("started emitting samples") + defer manager.wg.Done() + + for { + sample, ok := <-pipeline.Sample + if !ok { + manager.logger.Debug().Msg("stopped emitting samples") + return + } + + manager.sampleFn(sample) + } + }() + + return nil +} + +func (manager *StreamSinkManagerCtx) destroyPipeline() { + manager.pipelineMu.Lock() + defer manager.pipelineMu.Unlock() + + if manager.pipeline == nil { + return + } + + manager.pipeline.Destroy() + manager.logger.Info().Msgf("destroying pipeline") + manager.pipeline = nil +} diff --git a/server/internal/config/broadcast.go b/server/internal/config/broadcast.go deleted file mode 100644 index df29c8bb..00000000 --- a/server/internal/config/broadcast.go +++ /dev/null @@ -1,32 +0,0 @@ -package config - -import ( - "github.com/spf13/cobra" - "github.com/spf13/viper" -) - -type Broadcast struct { - Pipeline string - URL string - Enabled bool -} - -func (Broadcast) Init(cmd *cobra.Command) error { - cmd.PersistentFlags().String("broadcast_pipeline", "", "custom gst pipeline used for broadcasting, strings {url} {device} {display} will be replaced") - if err := viper.BindPFlag("broadcast_pipeline", cmd.PersistentFlags().Lookup("broadcast_pipeline")); err != nil { - return err - } - - cmd.PersistentFlags().String("broadcast_url", "", "URL for broadcasting, setting this value will automatically enable broadcasting") - if err := viper.BindPFlag("broadcast_url", cmd.PersistentFlags().Lookup("broadcast_url")); err != nil { - return err - } - - return nil -} - -func (s *Broadcast) Set() { - s.Pipeline = viper.GetString("broadcast_pipeline") - s.URL = viper.GetString("broadcast_url") - s.Enabled = s.URL != "" -} diff --git a/server/internal/config/capture.go b/server/internal/config/capture.go index 1c50cebc..15370285 100644 --- a/server/internal/config/capture.go +++ b/server/internal/config/capture.go @@ -1,66 +1,45 @@ package config import ( + "m1k1o/neko/internal/types/codec" + "github.com/spf13/cobra" "github.com/spf13/viper" ) type Capture struct { - Display string - Device string - AudioCodec string - AudioParams string - AudioBitrate uint - VideoHWEnc string - VideoCodec string - VideoParams string - VideoBitrate uint - MaxFPS int16 + // video + Display string + VideoCodec codec.RTPCodec + VideoHWEnc string // TODO: Pipeline builder. + VideoBitrate uint // TODO: Pipeline builder. + VideoMaxFPS int16 // TODO: Pipeline builder. + VideoPipeline string + + // audio + AudioDevice string + AudioCodec codec.RTPCodec + AudioBitrate uint // TODO: Pipeline builder. + AudioPipeline string + + // broadcast + BroadcastPipeline string + BroadcastUrl string + BroadcastStarted bool } func (Capture) Init(cmd *cobra.Command) error { + // + // video + // + cmd.PersistentFlags().String("display", ":99.0", "XDisplay to capture") if err := viper.BindPFlag("display", cmd.PersistentFlags().Lookup("display")); err != nil { return err } - cmd.PersistentFlags().String("device", "auto_null.monitor", "audio device to capture") - if err := viper.BindPFlag("device", cmd.PersistentFlags().Lookup("device")); err != nil { - return err - } - - cmd.PersistentFlags().String("audio", "", "audio codec parameters to use for streaming") - if err := viper.BindPFlag("audio", cmd.PersistentFlags().Lookup("audio")); err != nil { - return err - } - - cmd.PersistentFlags().Int("audio_bitrate", 128, "audio bitrate in kbit/s") - if err := viper.BindPFlag("audio_bitrate", cmd.PersistentFlags().Lookup("audio_bitrate")); err != nil { - return err - } - - cmd.PersistentFlags().String("video", "", "video codec parameters to use for streaming") - if err := viper.BindPFlag("video", cmd.PersistentFlags().Lookup("video")); err != nil { - return err - } - - cmd.PersistentFlags().Int("video_bitrate", 3072, "video bitrate in kbit/s") - if err := viper.BindPFlag("video_bitrate", cmd.PersistentFlags().Lookup("video_bitrate")); err != nil { - return err - } - - cmd.PersistentFlags().Int("max_fps", 25, "maximum fps delivered via WebRTC, 0 is for no maximum") - if err := viper.BindPFlag("max_fps", cmd.PersistentFlags().Lookup("max_fps")); err != nil { - return err - } - - // hw encoding - cmd.PersistentFlags().String("hwenc", "", "use hardware accelerated encoding") - if err := viper.BindPFlag("hwenc", cmd.PersistentFlags().Lookup("hwenc")); err != nil { - return err - } - // video codecs + // TODO: video.codec cmd.PersistentFlags().Bool("vp8", false, "use VP8 video codec") if err := viper.BindPFlag("vp8", cmd.PersistentFlags().Lookup("vp8")); err != nil { return err @@ -75,8 +54,39 @@ func (Capture) Init(cmd *cobra.Command) error { if err := viper.BindPFlag("h264", cmd.PersistentFlags().Lookup("h264")); err != nil { return err } + // video codecs + + cmd.PersistentFlags().String("hwenc", "", "use hardware accelerated encoding") + if err := viper.BindPFlag("hwenc", cmd.PersistentFlags().Lookup("hwenc")); err != nil { + return err + } + + cmd.PersistentFlags().Int("video_bitrate", 3072, "video bitrate in kbit/s") + if err := viper.BindPFlag("video_bitrate", cmd.PersistentFlags().Lookup("video_bitrate")); err != nil { + return err + } + + cmd.PersistentFlags().Int("max_fps", 25, "maximum fps delivered via WebRTC, 0 is for no maximum") + if err := viper.BindPFlag("max_fps", cmd.PersistentFlags().Lookup("max_fps")); err != nil { + return err + } + + cmd.PersistentFlags().String("video", "", "video codec parameters to use for streaming") + if err := viper.BindPFlag("video", cmd.PersistentFlags().Lookup("video")); err != nil { + return err + } + + // + // audio + // + + cmd.PersistentFlags().String("device", "auto_null.monitor", "audio device to capture") + if err := viper.BindPFlag("device", cmd.PersistentFlags().Lookup("device")); err != nil { + return err + } // audio codecs + // TODO: audio.codec cmd.PersistentFlags().Bool("opus", false, "use Opus audio codec") if err := viper.BindPFlag("opus", cmd.PersistentFlags().Lookup("opus")); err != nil { return err @@ -96,44 +106,88 @@ func (Capture) Init(cmd *cobra.Command) error { if err := viper.BindPFlag("pcma", cmd.PersistentFlags().Lookup("pcma")); err != nil { return err } + // audio codecs + + cmd.PersistentFlags().Int("audio_bitrate", 128, "audio bitrate in kbit/s") + if err := viper.BindPFlag("audio_bitrate", cmd.PersistentFlags().Lookup("audio_bitrate")); err != nil { + return err + } + + cmd.PersistentFlags().String("audio", "", "audio codec parameters to use for streaming") + if err := viper.BindPFlag("audio", cmd.PersistentFlags().Lookup("audio")); err != nil { + return err + } + + // + // broadcast + // + + cmd.PersistentFlags().String("broadcast_pipeline", "", "custom gst pipeline used for broadcasting, strings {url} {device} {display} will be replaced") + if err := viper.BindPFlag("broadcast_pipeline", cmd.PersistentFlags().Lookup("broadcast_pipeline")); err != nil { + return err + } + + cmd.PersistentFlags().String("broadcast_url", "", "URL for broadcasting, setting this value will automatically enable broadcasting") + if err := viper.BindPFlag("broadcast_url", cmd.PersistentFlags().Lookup("broadcast_url")); err != nil { + return err + } return nil } func (s *Capture) Set() { - audioCodec := "Opus" - if viper.GetBool("opus") { - audioCodec = "Opus" - } else if viper.GetBool("g722") { - audioCodec = "G722" - } else if viper.GetBool("pcmu") { - audioCodec = "PCMU" - } else if viper.GetBool("pcma") { - audioCodec = "PCMA" - } + // + // video + // - s.Device = viper.GetString("device") - s.AudioCodec = audioCodec - s.AudioParams = viper.GetString("audio") - s.AudioBitrate = viper.GetUint("audio_bitrate") + s.Display = viper.GetString("display") - videoCodec := "VP8" + videoCodec := codec.VP8() if viper.GetBool("vp8") { - videoCodec = "VP8" + videoCodec = codec.VP8() } else if viper.GetBool("vp9") { - videoCodec = "VP9" + videoCodec = codec.VP9() } else if viper.GetBool("h264") { - videoCodec = "H264" + videoCodec = codec.H264() } + s.VideoCodec = videoCodec + videoHWEnc := "" if viper.GetString("hwenc") == "VAAPI" { videoHWEnc = "VAAPI" } - s.Display = viper.GetString("display") s.VideoHWEnc = videoHWEnc - s.VideoCodec = videoCodec - s.VideoParams = viper.GetString("video") - s.VideoBitrate = viper.GetUint("video_bitrate") - s.MaxFPS = int16(viper.GetInt("max_fps")) + s.VideoBitrate = viper.GetUint("video_bitrate") + s.VideoMaxFPS = int16(viper.GetInt("max_fps")) + s.VideoPipeline = viper.GetString("video") + + // + // audio + // + + s.AudioDevice = viper.GetString("device") + + audioCodec := codec.Opus() + if viper.GetBool("opus") { + audioCodec = codec.Opus() + } else if viper.GetBool("g722") { + audioCodec = codec.G722() + } else if viper.GetBool("pcmu") { + audioCodec = codec.PCMU() + } else if viper.GetBool("pcma") { + audioCodec = codec.PCMA() + } + s.AudioCodec = audioCodec + + s.AudioBitrate = viper.GetUint("audio_bitrate") + s.AudioPipeline = viper.GetString("audio") + + // + // broadcast + // + + s.BroadcastPipeline = viper.GetString("broadcast_pipeline") + s.BroadcastUrl = viper.GetString("broadcast_url") + s.BroadcastStarted = s.BroadcastUrl != "" } diff --git a/server/internal/desktop/manager.go b/server/internal/desktop/manager.go index a1ad408e..ae43579f 100644 --- a/server/internal/desktop/manager.go +++ b/server/internal/desktop/manager.go @@ -8,7 +8,6 @@ import ( "m1k1o/neko/internal/config" "m1k1o/neko/internal/desktop/xevent" "m1k1o/neko/internal/desktop/xorg" - "m1k1o/neko/internal/types" "github.com/kataras/go-events" "github.com/rs/zerolog" @@ -25,7 +24,7 @@ type DesktopManagerCtx struct { config *config.Desktop } -func New(config *config.Desktop, broadcast types.BroadcastManager) *DesktopManagerCtx { +func New(config *config.Desktop) *DesktopManagerCtx { return &DesktopManagerCtx{ logger: log.With().Str("module", "desktop").Logger(), shutdown: make(chan struct{}), diff --git a/server/internal/session/manager.go b/server/internal/session/manager.go index ed4e4b7c..14a72322 100644 --- a/server/internal/session/manager.go +++ b/server/internal/session/manager.go @@ -45,9 +45,8 @@ func (manager *SessionManager) New(id string, admin bool, socket types.WebSocket manager.mu.Lock() manager.members[id] = session - if !manager.capture.Streaming() && len(manager.members) > 0 { - manager.capture.StartStream() - } + manager.capture.Audio().AddListener() + manager.capture.Video().AddListener() manager.mu.Unlock() manager.emmiter.Emit("created", id, session) @@ -160,9 +159,8 @@ func (manager *SessionManager) Destroy(id string) { err := session.destroy() delete(manager.members, id) - if manager.capture.Streaming() && len(manager.members) <= 0 { - manager.capture.StopStream() - } + manager.capture.Audio().RemoveListener() + manager.capture.Video().RemoveListener() manager.mu.Unlock() manager.emmiter.Emit("destroyed", id, session) diff --git a/server/internal/types/broadcast.go b/server/internal/types/broadcast.go deleted file mode 100644 index 069c1015..00000000 --- a/server/internal/types/broadcast.go +++ /dev/null @@ -1,11 +0,0 @@ -package types - -type BroadcastManager interface { - Shutdown() error - Start() error - Stop() - IsActive() bool - Create(url string) error - Destroy() - GetUrl() string -} diff --git a/server/internal/types/capture.go b/server/internal/types/capture.go index c59d8064..b9229a21 100644 --- a/server/internal/types/capture.go +++ b/server/internal/types/capture.go @@ -1,14 +1,38 @@ package types +import ( + "errors" + + "m1k1o/neko/internal/types/codec" +) + +var ( + ErrCapturePipelineAlreadyExists = errors.New("capture pipeline already exists") +) + +type BroadcastManager interface { + Start(url string) error + Stop() + Started() bool + Url() string +} + +type StreamSinkManager interface { + Codec() codec.RTPCodec + OnSample(listener func(sample Sample)) + + AddListener() error + RemoveListener() error + + ListenersCount() int + Started() bool +} + type CaptureManager interface { - VideoCodec() string - AudioCodec() string Start() Shutdown() error - OnVideoFrame(listener func(sample Sample)) - OnAudioFrame(listener func(sample Sample)) - StartStream() - StopStream() - Streaming() bool - ChangeResolution(width int, height int, rate int16) error + + Broadcast() BroadcastManager + Audio() StreamSinkManager + Video() StreamSinkManager } diff --git a/server/internal/types/codec/codecs.go b/server/internal/types/codec/codecs.go new file mode 100644 index 00000000..f3bc2baa --- /dev/null +++ b/server/internal/types/codec/codecs.go @@ -0,0 +1,170 @@ +package codec + +import ( + "strings" + + "github.com/pion/webrtc/v3" +) + +var RTCPFeedback = []webrtc.RTCPFeedback{ + {Type: webrtc.TypeRTCPFBTransportCC, Parameter: ""}, + {Type: webrtc.TypeRTCPFBGoogREMB, Parameter: ""}, + + // https://www.iana.org/assignments/sdp-parameters/sdp-parameters.xhtml#sdp-parameters-19 + {Type: webrtc.TypeRTCPFBCCM, Parameter: "fir"}, + + // https://www.iana.org/assignments/sdp-parameters/sdp-parameters.xhtml#sdp-parameters-15 + {Type: webrtc.TypeRTCPFBNACK, Parameter: "pli"}, + {Type: webrtc.TypeRTCPFBNACK, Parameter: ""}, +} + +func ParseRTC(codec webrtc.RTPCodecParameters) (RTPCodec, bool) { + codecName := strings.Split(codec.RTPCodecCapability.MimeType, "/")[1] + return ParseStr(codecName) +} + +func ParseStr(codecName string) (codec RTPCodec, ok bool) { + ok = true + + switch strings.ToLower(codecName) { + case VP8().Name: + codec = VP8() + case VP9().Name: + codec = VP9() + case H264().Name: + codec = H264() + case Opus().Name: + codec = Opus() + case G722().Name: + codec = G722() + case PCMU().Name: + codec = PCMU() + case PCMA().Name: + codec = PCMA() + default: + ok = false + } + + return +} + +type RTPCodec struct { + Name string + PayloadType webrtc.PayloadType + Type webrtc.RTPCodecType + Capability webrtc.RTPCodecCapability +} + +func (codec *RTPCodec) Register(engine *webrtc.MediaEngine) error { + return engine.RegisterCodec(webrtc.RTPCodecParameters{ + RTPCodecCapability: codec.Capability, + PayloadType: codec.PayloadType, + }, codec.Type) +} + +func VP8() RTPCodec { + return RTPCodec{ + Name: "vp8", + PayloadType: 96, + Type: webrtc.RTPCodecTypeVideo, + Capability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeVP8, + ClockRate: 90000, + Channels: 0, + SDPFmtpLine: "", + RTCPFeedback: RTCPFeedback, + }, + } +} + +// TODO: Profile ID. +func VP9() RTPCodec { + return RTPCodec{ + Name: "vp9", + PayloadType: 98, + Type: webrtc.RTPCodecTypeVideo, + Capability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeVP9, + ClockRate: 90000, + Channels: 0, + SDPFmtpLine: "profile-id=0", + RTCPFeedback: RTCPFeedback, + }, + } +} + +// TODO: Profile ID. +func H264() RTPCodec { + return RTPCodec{ + Name: "h264", + PayloadType: 102, + Type: webrtc.RTPCodecTypeVideo, + Capability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeH264, + ClockRate: 90000, + Channels: 0, + SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f", + RTCPFeedback: RTCPFeedback, + }, + } +} + +func Opus() RTPCodec { + return RTPCodec{ + Name: "opus", + PayloadType: 111, + Type: webrtc.RTPCodecTypeAudio, + Capability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeOpus, + ClockRate: 48000, + Channels: 2, + SDPFmtpLine: "useinbandfec=1", + RTCPFeedback: []webrtc.RTCPFeedback{}, + }, + } +} + +func G722() RTPCodec { + return RTPCodec{ + Name: "g722", + PayloadType: 9, + Type: webrtc.RTPCodecTypeAudio, + Capability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeG722, + ClockRate: 8000, + Channels: 0, + SDPFmtpLine: "", + RTCPFeedback: []webrtc.RTCPFeedback{}, + }, + } +} + +func PCMU() RTPCodec { + return RTPCodec{ + Name: "pcmu", + PayloadType: 0, + Type: webrtc.RTPCodecTypeAudio, + Capability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypePCMU, + ClockRate: 8000, + Channels: 0, + SDPFmtpLine: "", + RTCPFeedback: []webrtc.RTCPFeedback{}, + }, + } +} + +func PCMA() RTPCodec { + return RTPCodec{ + Name: "pcma", + PayloadType: 8, + Type: webrtc.RTPCodecTypeAudio, + Capability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypePCMA, + ClockRate: 8000, + Channels: 0, + SDPFmtpLine: "", + RTCPFeedback: []webrtc.RTCPFeedback{}, + }, + } +} diff --git a/server/internal/webrtc/webrtc.go b/server/internal/webrtc/webrtc.go index 2eea092f..d914dc97 100644 --- a/server/internal/webrtc/webrtc.go +++ b/server/internal/webrtc/webrtc.go @@ -2,6 +2,7 @@ package webrtc import ( "encoding/json" + "errors" "fmt" "io" "net" @@ -33,8 +34,6 @@ type WebRTCManager struct { logger zerolog.Logger videoTrack *webrtc.TrackLocalStaticSample audioTrack *webrtc.TrackLocalStaticSample - videoCodec webrtc.RTPCodecParameters - audioCodec webrtc.RTPCodecParameters sessions types.SessionManager capture types.CaptureManager desktop types.DesktopManager @@ -44,28 +43,45 @@ type WebRTCManager struct { func (manager *WebRTCManager) Start() { var err error - manager.audioTrack, manager.audioCodec, err = manager.createTrack(manager.capture.AudioCodec()) + + // + // audio + // + + audioCodec := manager.capture.Audio().Codec() + manager.audioTrack, err = webrtc.NewTrackLocalStaticSample(audioCodec.Capability, "audio", "stream") if err != nil { manager.logger.Panic().Err(err).Msg("unable to create audio track") } - manager.capture.OnAudioFrame(func(sample types.Sample) { - if err := manager.audioTrack.WriteSample(media.Sample(sample)); err != nil && err != io.ErrClosedPipe { + manager.capture.Audio().OnSample(func(sample types.Sample) { + err := manager.audioTrack.WriteSample(media.Sample(sample)) + if err != nil && errors.Is(err, io.ErrClosedPipe) { manager.logger.Warn().Err(err).Msg("audio pipeline failed to write") } }) - manager.videoTrack, manager.videoCodec, err = manager.createTrack(manager.capture.VideoCodec()) + // + // video + // + + videoCodec := manager.capture.Video().Codec() + manager.videoTrack, err = webrtc.NewTrackLocalStaticSample(videoCodec.Capability, "video", "stream") if err != nil { manager.logger.Panic().Err(err).Msg("unable to create video track") } - manager.capture.OnVideoFrame(func(sample types.Sample) { - if err := manager.videoTrack.WriteSample(media.Sample(sample)); err != nil && err != io.ErrClosedPipe { + manager.capture.Video().OnSample(func(sample types.Sample) { + err := manager.videoTrack.WriteSample(media.Sample(sample)) + if err != nil && errors.Is(err, io.ErrClosedPipe) { manager.logger.Warn().Err(err).Msg("video pipeline failed to write") } }) + // + // api + // + if err := manager.initAPI(); err != nil { manager.logger.Panic().Err(err).Msg("failed to initialize webrtc API") } @@ -141,8 +157,18 @@ func (manager *WebRTCManager) initAPI() error { // Create MediaEngine with selected codecs engine := webrtc.MediaEngine{} - _ = engine.RegisterCodec(manager.audioCodec, webrtc.RTPCodecTypeAudio) - _ = engine.RegisterCodec(manager.videoCodec, webrtc.RTPCodecTypeVideo) + + audioCodec := manager.capture.Audio().Codec() + _ = engine.RegisterCodec(webrtc.RTPCodecParameters{ + RTPCodecCapability: audioCodec.Capability, + PayloadType: audioCodec.PayloadType, + }, audioCodec.Type) + + videoCodec := manager.capture.Video().Codec() + _ = engine.RegisterCodec(webrtc.RTPCodecParameters{ + RTPCodecCapability: videoCodec.Capability, + PayloadType: videoCodec.PayloadType, + }, videoCodec.Type) // Register Interceptors i := &interceptor.Registry{} @@ -305,43 +331,3 @@ func (manager *WebRTCManager) ICEServers() []webrtc.ICEServer { func (manager *WebRTCManager) ImplicitControl() bool { return manager.config.ImplicitControl } - -func (manager *WebRTCManager) createTrack(codecName string) (*webrtc.TrackLocalStaticSample, webrtc.RTPCodecParameters, error) { - var codec webrtc.RTPCodecParameters - - id := "" - fb := []webrtc.RTCPFeedback{} - - switch codecName { - case "VP8": - codec = webrtc.RTPCodecParameters{RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8, ClockRate: 90000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: fb}, PayloadType: 96} - id = "video" - case "VP9": - codec = webrtc.RTPCodecParameters{RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP9, ClockRate: 90000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: fb}, PayloadType: 98} - id = "video" - case "H264": - codec = webrtc.RTPCodecParameters{RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264, ClockRate: 90000, Channels: 0, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f", RTCPFeedback: fb}, PayloadType: 102} - id = "video" - case "Opus": - codec = webrtc.RTPCodecParameters{RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus, ClockRate: 48000, Channels: 2, SDPFmtpLine: "useinbandfec=1", RTCPFeedback: fb}, PayloadType: 111} - id = "audio" - case "G722": - codec = webrtc.RTPCodecParameters{RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeG722, ClockRate: 8000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: fb}, PayloadType: 9} - id = "audio" - case "PCMU": - codec = webrtc.RTPCodecParameters{RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypePCMU, ClockRate: 8000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: fb}, PayloadType: 0} - id = "audio" - case "PCMA": - codec = webrtc.RTPCodecParameters{RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypePCMA, ClockRate: 8000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: fb}, PayloadType: 8} - id = "audio" - default: - return nil, codec, fmt.Errorf("unknown codec %s", codecName) - } - - track, err := webrtc.NewTrackLocalStaticSample(codec.RTPCodecCapability, id, "stream") - if err != nil { - return nil, codec, err - } - - return track, codec, nil -} diff --git a/server/internal/websocket/handler/broadcast.go b/server/internal/websocket/handler/broadcast.go index 947bb531..d04b4994 100644 --- a/server/internal/websocket/handler/broadcast.go +++ b/server/internal/websocket/handler/broadcast.go @@ -7,18 +7,37 @@ import ( ) func (h *MessageHandler) boradcastCreate(session types.Session, payload *message.BroadcastCreate) error { + broadcast := h.capture.Broadcast() + if !session.Admin() { h.logger.Debug().Msg("user not admin") return nil } - pipelineErr := h.broadcast.Create(payload.URL) - if pipelineErr != nil { + if payload.URL == "" { + return session.Send( + message.SystemMessage{ + Event: event.SYSTEM_ERROR, + Title: "Error while starting broadcast", + Message: "missing broadcast URL", + }) + } + + if broadcast.Started() { + return session.Send( + message.SystemMessage{ + Event: event.SYSTEM_ERROR, + Title: "Error while starting broadcast", + Message: "server is already broadcasting", + }) + } + + if err := broadcast.Start(payload.URL); err != nil { if err := session.Send( message.SystemMessage{ Event: event.SYSTEM_ERROR, Title: "Error while starting broadcast", - Message: pipelineErr.Error(), + Message: err.Error(), }); err != nil { h.logger.Warn().Err(err).Msgf("sending event %s has failed", event.SYSTEM_ERROR) return err @@ -33,12 +52,23 @@ func (h *MessageHandler) boradcastCreate(session types.Session, payload *message } func (h *MessageHandler) boradcastDestroy(session types.Session) error { + broadcast := h.capture.Broadcast() + if !session.Admin() { h.logger.Debug().Msg("user not admin") return nil } - h.broadcast.Destroy() + if !broadcast.Started() { + return session.Send( + message.SystemMessage{ + Event: event.SYSTEM_ERROR, + Title: "Error while stopping broadcast", + Message: "server is not broadcasting", + }) + } + + broadcast.Stop() if err := h.boradcastStatus(nil); err != nil { return err @@ -48,14 +78,17 @@ func (h *MessageHandler) boradcastDestroy(session types.Session) error { } func (h *MessageHandler) boradcastStatus(session types.Session) error { + broadcast := h.capture.Broadcast() + + msg := message.BroadcastStatus{ + Event: event.BORADCAST_STATUS, + IsActive: broadcast.Started(), + URL: broadcast.Url(), + } + // if no session, broadcast change if session == nil { - if err := h.sessions.AdminBroadcast( - message.BroadcastStatus{ - Event: event.BORADCAST_STATUS, - IsActive: h.broadcast.IsActive(), - URL: h.broadcast.GetUrl(), - }, nil); err != nil { + if err := h.sessions.AdminBroadcast(msg, nil); err != nil { h.logger.Warn().Err(err).Msgf("broadcasting event %s has failed", event.BORADCAST_STATUS) return err } @@ -68,12 +101,7 @@ func (h *MessageHandler) boradcastStatus(session types.Session) error { return nil } - if err := session.Send( - message.BroadcastStatus{ - Event: event.BORADCAST_STATUS, - IsActive: h.broadcast.IsActive(), - URL: h.broadcast.GetUrl(), - }); err != nil { + if err := session.Send(msg); err != nil { h.logger.Warn().Err(err).Msgf("sending event %s has failed", event.BORADCAST_STATUS) return err } diff --git a/server/internal/websocket/handler/handler.go b/server/internal/websocket/handler/handler.go index 4699ef4a..b2b529cb 100644 --- a/server/internal/websocket/handler/handler.go +++ b/server/internal/websocket/handler/handler.go @@ -15,13 +15,12 @@ import ( ) type MessageHandler struct { - logger zerolog.Logger - sessions types.SessionManager - desktop types.DesktopManager - capture types.CaptureManager - webrtc types.WebRTCManager - broadcast types.BroadcastManager - state *state.State + logger zerolog.Logger + sessions types.SessionManager + desktop types.DesktopManager + capture types.CaptureManager + webrtc types.WebRTCManager + state *state.State } func New( @@ -29,17 +28,15 @@ func New( desktop types.DesktopManager, capture types.CaptureManager, webrtc types.WebRTCManager, - broadcast types.BroadcastManager, state *state.State, ) *MessageHandler { return &MessageHandler{ - logger: log.With().Str("module", "websocket").Str("submodule", "handler").Logger(), - sessions: sessions, - desktop: desktop, - capture: capture, - webrtc: webrtc, - broadcast: broadcast, - state: state, + logger: log.With().Str("module", "websocket").Str("submodule", "handler").Logger(), + sessions: sessions, + desktop: desktop, + capture: capture, + webrtc: webrtc, + state: state, } } diff --git a/server/internal/websocket/handler/screen.go b/server/internal/websocket/handler/screen.go index a7f18ed4..c4cacc4c 100644 --- a/server/internal/websocket/handler/screen.go +++ b/server/internal/websocket/handler/screen.go @@ -12,7 +12,11 @@ func (h *MessageHandler) screenSet(id string, session types.Session, payload *me return nil } - if err := h.capture.ChangeResolution(payload.Width, payload.Height, payload.Rate); err != nil { + if err := h.desktop.SetScreenSize(types.ScreenSize{ + Width: payload.Width, + Height: payload.Height, + Rate: payload.Rate, + }); err != nil { h.logger.Warn().Err(err).Msgf("unable to change screen size") return err } diff --git a/server/internal/websocket/websocket.go b/server/internal/websocket/websocket.go index bd940fcc..8ec8e62b 100644 --- a/server/internal/websocket/websocket.go +++ b/server/internal/websocket/websocket.go @@ -22,7 +22,7 @@ import ( const CONTROL_PROTECTION_SESSION = "by_control_protection" -func New(sessions types.SessionManager, desktop types.DesktopManager, capture types.CaptureManager, broadcast types.BroadcastManager, webrtc types.WebRTCManager, conf *config.WebSocket) *WebSocketHandler { +func New(sessions types.SessionManager, desktop types.DesktopManager, capture types.CaptureManager, webrtc types.WebRTCManager, conf *config.WebSocket) *WebSocketHandler { logger := log.With().Str("module", "websocket").Logger() state := state.New() @@ -47,7 +47,6 @@ func New(sessions types.SessionManager, desktop types.DesktopManager, capture ty desktop, capture, webrtc, - broadcast, state, ) diff --git a/server/neko.go b/server/neko.go index 07ff68b4..981584e9 100644 --- a/server/neko.go +++ b/server/neko.go @@ -63,7 +63,6 @@ func init() { Server: &config.Server{}, Capture: &config.Capture{}, Desktop: &config.Desktop{}, - Broadcast: &config.Broadcast{}, WebRTC: &config.WebRTC{}, WebSocket: &config.WebSocket{}, } @@ -103,7 +102,6 @@ type Neko struct { Root *config.Root Capture *config.Capture Desktop *config.Desktop - Broadcast *config.Broadcast Server *config.Server WebRTC *config.WebRTC WebSocket *config.WebSocket @@ -113,7 +111,6 @@ type Neko struct { sessionManager *session.SessionManager captureManager *capture.CaptureManagerCtx desktopManager *desktop.DesktopManagerCtx - broadcastManager *capture.BroadcastManager webRTCManager *webrtc.WebRTCManager webSocketHandler *websocket.WebSocketHandler } @@ -123,12 +120,10 @@ func (neko *Neko) Preflight() { } func (neko *Neko) Start() { - broadcastManager := capture.NewBroadcast(neko.Capture, neko.Broadcast) - - desktopManager := desktop.New(neko.Desktop, broadcastManager) + desktopManager := desktop.New(neko.Desktop) desktopManager.Start() - captureManager := capture.New(desktopManager, broadcastManager, neko.Capture) + captureManager := capture.New(desktopManager, neko.Capture) captureManager.Start() sessionManager := session.New(captureManager) @@ -136,13 +131,12 @@ func (neko *Neko) Start() { webRTCManager := webrtc.New(sessionManager, captureManager, desktopManager, neko.WebRTC) webRTCManager.Start() - webSocketHandler := websocket.New(sessionManager, desktopManager, captureManager, broadcastManager, webRTCManager, neko.WebSocket) + webSocketHandler := websocket.New(sessionManager, desktopManager, captureManager, webRTCManager, neko.WebSocket) webSocketHandler.Start() server := http.New(neko.Server, webSocketHandler) server.Start() - neko.broadcastManager = broadcastManager neko.sessionManager = sessionManager neko.captureManager = captureManager neko.desktopManager = desktopManager @@ -168,9 +162,6 @@ func (neko *Neko) Shutdown() { err = neko.desktopManager.Shutdown() neko.logger.Err(err).Msg("desktop manager shutdown") - - err = neko.broadcastManager.Shutdown() - neko.logger.Err(err).Msg("broadcast manager shutdown") } func (neko *Neko) ServeCommand(cmd *cobra.Command, args []string) {