refactor capture with broadcast.

Miroslav Šedivý 2022-09-17 12:43:17 +02:00
parent 72da075972
commit fd43f84bd0
21 changed files with 1135 additions and 570 deletions
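The removal of neko.Service.Broadcast in the first hunk below means broadcasting is no longer a standalone service; it becomes one of three sinks owned by the capture manager. A minimal wiring sketch (the helper name wireCapture is illustrative and not part of this commit; it assumes the internal capture, config and types imports):

func wireCapture(desktop types.DesktopManager, cfg *config.Capture) types.CaptureManager {
	// the capture manager now owns the broadcast sink together with the
	// audio and video stream sinks; there is no separate broadcast service
	manager := capture.New(desktop, cfg)
	manager.Start()
	return manager
}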


@ -21,7 +21,6 @@ func init() {
 	neko.Service.WebRTC,
 	neko.Service.Capture,
 	neko.Service.Desktop,
-	neko.Service.Broadcast,
 	neko.Service.WebSocket,
 }


@ -7,98 +7,115 @@ import (
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"m1k1o/neko/internal/capture/gst" "m1k1o/neko/internal/capture/gst"
"m1k1o/neko/internal/config" "m1k1o/neko/internal/types"
) )
type BroadcastManager struct { type BroacastManagerCtx struct {
mu sync.Mutex logger zerolog.Logger
logger zerolog.Logger mu sync.Mutex
pipeline *gst.Pipeline
capture *config.Capture pipeline *gst.Pipeline
config *config.Broadcast pipelineMu sync.Mutex
enabled bool pipelineFn func(url string) (*gst.Pipeline, error)
url string
url string
started bool
} }
func NewBroadcast(capture *config.Capture, config *config.Broadcast) *BroadcastManager { func broadcastNew(pipelineFn func(url string) (*gst.Pipeline, error), url string, started bool) *BroacastManagerCtx {
return &BroadcastManager{ logger := log.With().
logger: log.With().Str("module", "broadcast").Logger(), Str("module", "capture").
capture: capture, Str("submodule", "broadcast").
config: config, Logger()
enabled: config.Enabled,
url: config.URL, return &BroacastManagerCtx{
logger: logger,
pipelineFn: pipelineFn,
url: url,
started: started,
} }
} }
func (manager *BroadcastManager) Shutdown() error { func (manager *BroacastManagerCtx) shutdown() {
manager.Destroy() manager.logger.Info().Msgf("shutdown")
manager.destroyPipeline()
}
func (manager *BroacastManagerCtx) Start(url string) error {
manager.mu.Lock()
defer manager.mu.Unlock()
err := manager.createPipeline()
if err != nil {
return err
}
manager.url = url
manager.started = true
return nil return nil
} }
func (manager *BroadcastManager) Start() error { func (manager *BroacastManagerCtx) Stop() {
if !manager.enabled || manager.IsActive() { manager.mu.Lock()
return nil defer manager.mu.Unlock()
manager.started = false
manager.destroyPipeline()
}
func (manager *BroacastManagerCtx) Started() bool {
manager.mu.Lock()
defer manager.mu.Unlock()
return manager.started
}
func (manager *BroacastManagerCtx) Url() string {
manager.mu.Lock()
defer manager.mu.Unlock()
return manager.url
}
func (manager *BroacastManagerCtx) createPipeline() error {
manager.pipelineMu.Lock()
defer manager.pipelineMu.Unlock()
if manager.pipeline != nil {
return types.ErrCapturePipelineAlreadyExists
} }
var err error var err error
manager.pipeline, err = CreateRTMPPipeline(
manager.capture.Device,
manager.capture.Display,
manager.config.Pipeline,
manager.url,
)
manager.logger.Info().
Str("url", manager.url).
Msgf("creating pipeline")
manager.pipeline, err = manager.pipelineFn(manager.url)
if err != nil { if err != nil {
manager.pipeline = nil
return err return err
} }
manager.logger.Info(). manager.logger.Info().
Str("audio_device", manager.capture.Device). Str("url", manager.url).
Str("video_display", manager.capture.Display). Str("src", manager.pipeline.Src).
Str("rtmp_pipeline_src", manager.pipeline.Src). Msgf("created pipeline")
Msgf("RTMP pipeline is starting...")
manager.pipeline.Play() manager.pipeline.Play()
return nil return nil
} }
func (manager *BroadcastManager) Stop() { func (manager *BroacastManagerCtx) destroyPipeline() {
if !manager.IsActive() { manager.pipelineMu.Lock()
defer manager.pipelineMu.Unlock()
if manager.pipeline == nil {
return return
} }
manager.pipeline.Stop() manager.pipeline.Destroy()
manager.logger.Info().Msgf("destroying pipeline")
manager.pipeline = nil manager.pipeline = nil
} }
func (manager *BroadcastManager) IsActive() bool {
return manager.pipeline != nil
}
func (manager *BroadcastManager) Create(url string) error {
manager.mu.Lock()
defer manager.mu.Unlock()
manager.url = url
manager.enabled = true
err := manager.Start()
if err != nil {
manager.enabled = false
}
return err
}
func (manager *BroadcastManager) Destroy() {
manager.mu.Lock()
defer manager.mu.Unlock()
manager.Stop()
manager.enabled = false
}
func (manager *BroadcastManager) GetUrl() string {
return manager.url
}

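broadcastNew is only called from inside the capture package; callers reach the broadcast sink through the types.BroadcastManager interface. A usage sketch (toggleBroadcast is a hypothetical caller, e.g. an API handler):

func toggleBroadcast(capture types.CaptureManager, url string) error {
	broadcast := capture.Broadcast()

	if broadcast.Started() {
		// Stop destroys the RTMP pipeline but keeps the last URL around
		broadcast.Stop()
		return nil
	}

	// Start creates the pipeline via the injected pipelineFn closure
	return broadcast.Start(url)
}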

@ -1,83 +1,202 @@
#include "gst.h" #include "gst.h"
typedef struct SampleHandlerUserData { static void gstreamer_pipeline_log(GstPipelineCtx *ctx, char* level, const char* format, ...) {
int pipelineId; va_list argptr;
} SampleHandlerUserData; va_start(argptr, format);
char buffer[100];
void gstreamer_init(void) { vsprintf(buffer, format, argptr);
gst_init(NULL, NULL); va_end(argptr);
goPipelineLog(level, buffer, ctx->pipelineId);
} }
static gboolean gstreamer_send_bus_call(GstBus *bus, GstMessage *msg, gpointer data) { static gboolean gstreamer_bus_call(GstBus *bus, GstMessage *msg, gpointer user_data) {
GstPipelineCtx *ctx = (GstPipelineCtx *)user_data;
switch (GST_MESSAGE_TYPE(msg)) { switch (GST_MESSAGE_TYPE(msg)) {
case GST_MESSAGE_EOS: {
gstreamer_pipeline_log(ctx, "fatal", "end of stream");
break;
}
case GST_MESSAGE_EOS: case GST_MESSAGE_STATE_CHANGED: {
g_print("End of stream\n"); GstState old_state, new_state;
exit(1); gst_message_parse_state_changed(msg, &old_state, &new_state, NULL);
break;
case GST_MESSAGE_ERROR: { gstreamer_pipeline_log(ctx, "debug",
gchar *debug; "element %s changed state from %s to %s",
GError *error; GST_OBJECT_NAME(msg->src),
gst_element_state_get_name(old_state),
gst_element_state_get_name(new_state));
break;
}
gst_message_parse_error(msg, &error, &debug); case GST_MESSAGE_TAG: {
g_free(debug); GstTagList *tags = NULL;
gst_message_parse_tag(msg, &tags);
g_printerr("Error: %s\n", error->message); gstreamer_pipeline_log(ctx, "debug",
g_error_free(error); "got tags from element %s",
exit(1); GST_OBJECT_NAME(msg->src));
}
default: gst_tag_list_unref(tags);
break; break;
}
case GST_MESSAGE_ERROR: {
GError *err = NULL;
gchar *dbg_info = NULL;
gst_message_parse_error(msg, &err, &dbg_info);
gstreamer_pipeline_log(ctx, "error",
"error from element %s: %s",
GST_OBJECT_NAME(msg->src), err->message);
gstreamer_pipeline_log(ctx, "warn",
"debugging info: %s",
(dbg_info) ? dbg_info : "none");
g_error_free(err);
g_free(dbg_info);
break;
}
default:
gstreamer_pipeline_log(ctx, "trace", "unknown message");
break;
} }
return TRUE; return TRUE;
} }
GstFlowReturn gstreamer_send_new_sample_handler(GstElement *object, gpointer user_data) { GstPipelineCtx *gstreamer_pipeline_create(char *pipelineStr, int pipelineId, GError **error) {
GstElement *pipeline = gst_parse_launch(pipelineStr, error);
if (pipeline == NULL) return NULL;
// create gstreamer pipeline context
GstPipelineCtx *ctx = calloc(1, sizeof(GstPipelineCtx));
ctx->pipelineId = pipelineId;
ctx->pipeline = pipeline;
GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline));
gst_bus_add_watch(bus, gstreamer_bus_call, ctx);
gst_object_unref(bus);
return ctx;
}
static GstFlowReturn gstreamer_send_new_sample_handler(GstElement *object, gpointer user_data) {
GstPipelineCtx *ctx = (GstPipelineCtx *)user_data;
GstSample *sample = NULL; GstSample *sample = NULL;
GstBuffer *buffer = NULL; GstBuffer *buffer = NULL;
gpointer copy = NULL; gpointer copy = NULL;
gsize copy_size = 0; gsize copy_size = 0;
SampleHandlerUserData *s = (SampleHandlerUserData *)user_data;
g_signal_emit_by_name (object, "pull-sample", &sample); g_signal_emit_by_name(object, "pull-sample", &sample);
if (sample) { if (sample) {
buffer = gst_sample_get_buffer(sample); buffer = gst_sample_get_buffer(sample);
if (buffer) { if (buffer) {
gst_buffer_extract_dup(buffer, 0, gst_buffer_get_size(buffer), &copy, &copy_size); gst_buffer_extract_dup(buffer, 0, gst_buffer_get_size(buffer), &copy, &copy_size);
goHandlePipelineBuffer(copy, copy_size, GST_BUFFER_DURATION(buffer), s->pipelineId); goHandlePipelineBuffer(copy, copy_size, GST_BUFFER_DURATION(buffer), ctx->pipelineId);
} }
gst_sample_unref (sample); gst_sample_unref(sample);
} }
return GST_FLOW_OK; return GST_FLOW_OK;
} }
GstElement *gstreamer_send_create_pipeline(char *pipeline, GError **error) { void gstreamer_pipeline_attach_appsink(GstPipelineCtx *ctx, char *sinkName) {
return gst_parse_launch(pipeline, error); ctx->appsink = gst_bin_get_by_name(GST_BIN(ctx->pipeline), sinkName);
g_object_set(ctx->appsink, "emit-signals", TRUE, NULL);
g_signal_connect(ctx->appsink, "new-sample", G_CALLBACK(gstreamer_send_new_sample_handler), ctx);
} }
void gstreamer_send_start_pipeline(GstElement *pipeline, int pipelineId) { void gstreamer_pipeline_attach_appsrc(GstPipelineCtx *ctx, char *srcName) {
SampleHandlerUserData *s = calloc(1, sizeof(SampleHandlerUserData)); ctx->appsrc = gst_bin_get_by_name(GST_BIN(ctx->pipeline), srcName);
s->pipelineId = pipelineId;
GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline));
gst_bus_add_watch(bus, gstreamer_send_bus_call, NULL);
gst_object_unref(bus);
GstElement *appsink = gst_bin_get_by_name(GST_BIN(pipeline), "appsink");
g_object_set(appsink, "emit-signals", TRUE, NULL);
g_signal_connect(appsink, "new-sample", G_CALLBACK(gstreamer_send_new_sample_handler), s);
gst_object_unref(appsink);
gst_element_set_state(pipeline, GST_STATE_PLAYING);
} }
void gstreamer_send_play_pipeline(GstElement *pipeline) { void gstreamer_pipeline_play(GstPipelineCtx *ctx) {
gst_element_set_state(pipeline, GST_STATE_PLAYING); gst_element_set_state(GST_ELEMENT(ctx->pipeline), GST_STATE_PLAYING);
} }
void gstreamer_send_stop_pipeline(GstElement *pipeline) { void gstreamer_pipeline_pause(GstPipelineCtx *ctx) {
gst_element_set_state(pipeline, GST_STATE_NULL); gst_element_set_state(GST_ELEMENT(ctx->pipeline), GST_STATE_PAUSED);
}
void gstreamer_pipeline_destory(GstPipelineCtx *ctx) {
// end appsrc, if exists
if (ctx->appsrc) {
gst_app_src_end_of_stream(GST_APP_SRC(ctx->appsrc));
}
// send pipeline eos
gst_element_send_event(GST_ELEMENT(ctx->pipeline), gst_event_new_eos());
// set null state
gst_element_set_state(GST_ELEMENT(ctx->pipeline), GST_STATE_NULL);
if (ctx->appsink) {
gst_object_unref(ctx->appsink);
ctx->appsink = NULL;
}
if (ctx->appsrc) {
gst_object_unref(ctx->appsrc);
ctx->appsrc = NULL;
}
gst_object_unref(ctx->pipeline);
}
void gstreamer_pipeline_push(GstPipelineCtx *ctx, void *buffer, int bufferLen) {
if (ctx->appsrc != NULL) {
gpointer p = g_memdup(buffer, bufferLen);
GstBuffer *buffer = gst_buffer_new_wrapped(p, bufferLen);
gst_app_src_push_buffer(GST_APP_SRC(ctx->appsrc), buffer);
}
}
gboolean gstreamer_pipeline_set_prop_int(GstPipelineCtx *ctx, char *binName, char *prop, gint value) {
GstElement *el = gst_bin_get_by_name(GST_BIN(ctx->pipeline), binName);
if (el == NULL) return FALSE;
g_object_set(G_OBJECT(el),
prop, value,
NULL);
gst_object_unref(el);
return TRUE;
}
gboolean gstreamer_pipeline_set_caps_framerate(GstPipelineCtx *ctx, const gchar* binName, gint numerator, gint denominator) {
GstElement *el = gst_bin_get_by_name(GST_BIN(ctx->pipeline), binName);
if (el == NULL) return FALSE;
GstCaps *caps = gst_caps_new_simple("video/x-raw",
"framerate", GST_TYPE_FRACTION, numerator, denominator,
NULL);
g_object_set(G_OBJECT(el),
"caps", caps,
NULL);
gst_caps_unref(caps);
gst_object_unref(el);
return TRUE;
}
gboolean gstreamer_pipeline_set_caps_resolution(GstPipelineCtx *ctx, const gchar* binName, gint width, gint height) {
GstElement *el = gst_bin_get_by_name(GST_BIN(ctx->pipeline), binName);
if (el == NULL) return FALSE;
GstCaps *caps = gst_caps_new_simple("video/x-raw",
"width", G_TYPE_INT, width,
"height", G_TYPE_INT, height,
NULL);
g_object_set(G_OBJECT(el),
"caps", caps,
NULL);
gst_caps_unref(caps);
gst_object_unref(el);
return TRUE;
} }


@ -9,68 +9,146 @@ import "C"
import ( import (
"fmt" "fmt"
"sync" "sync"
"sync/atomic"
"time" "time"
"unsafe" "unsafe"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"m1k1o/neko/internal/types" "m1k1o/neko/internal/types"
) )
// Pipeline is a wrapper for a GStreamer Pipeline
type Pipeline struct { type Pipeline struct {
Pipeline *C.GstElement id int
Sample chan types.Sample logger zerolog.Logger
Src string Src string
id int Ctx *C.GstPipelineCtx
Sample chan types.Sample
} }
var pSerial int32
var pipelines = make(map[int]*Pipeline) var pipelines = make(map[int]*Pipeline)
var pipelinesLock sync.Mutex var pipelinesLock sync.Mutex
var registry *C.GstRegistry var registry *C.GstRegistry
func init() { func init() {
C.gstreamer_init() C.gst_init(nil, nil)
registry = C.gst_registry_get() registry = C.gst_registry_get()
} }
// CreatePipeline creates a GStreamer Pipeline
func CreatePipeline(pipelineStr string) (*Pipeline, error) { func CreatePipeline(pipelineStr string) (*Pipeline, error) {
id := atomic.AddInt32(&pSerial, 1)
pipelineStrUnsafe := C.CString(pipelineStr) pipelineStrUnsafe := C.CString(pipelineStr)
defer C.free(unsafe.Pointer(pipelineStrUnsafe)) defer C.free(unsafe.Pointer(pipelineStrUnsafe))
pipelinesLock.Lock() pipelinesLock.Lock()
defer pipelinesLock.Unlock() defer pipelinesLock.Unlock()
var err *C.GError var gstError *C.GError
gstPipeline := C.gstreamer_send_create_pipeline(pipelineStrUnsafe, &err) ctx := C.gstreamer_pipeline_create(pipelineStrUnsafe, C.int(id), &gstError)
if err != nil {
defer C.g_error_free(err) if gstError != nil {
return nil, fmt.Errorf("%s", C.GoString(err.message)) defer C.g_error_free(gstError)
return nil, fmt.Errorf("(pipeline error) %s", C.GoString(gstError.message))
} }
p := &Pipeline{ p := &Pipeline{
Pipeline: gstPipeline, id: int(id),
Sample: make(chan types.Sample), logger: log.With().
Src: pipelineStr, Str("module", "capture").
id: len(pipelines), Str("submodule", "gstreamer").
Int("pipeline_id", int(id)).Logger(),
Src: pipelineStr,
Ctx: ctx,
Sample: make(chan types.Sample),
} }
pipelines[p.id] = p pipelines[p.id] = p
return p, nil return p, nil
} }
// Start starts the GStreamer Pipeline func (p *Pipeline) AttachAppsink(sinkName string) {
func (p *Pipeline) Start() { sinkNameUnsafe := C.CString(sinkName)
C.gstreamer_send_start_pipeline(p.Pipeline, C.int(p.id)) defer C.free(unsafe.Pointer(sinkNameUnsafe))
C.gstreamer_pipeline_attach_appsink(p.Ctx, sinkNameUnsafe)
}
func (p *Pipeline) AttachAppsrc(srcName string) {
srcNameUnsafe := C.CString(srcName)
defer C.free(unsafe.Pointer(srcNameUnsafe))
C.gstreamer_pipeline_attach_appsrc(p.Ctx, srcNameUnsafe)
} }
// Play starts the GStreamer Pipeline
func (p *Pipeline) Play() { func (p *Pipeline) Play() {
C.gstreamer_send_play_pipeline(p.Pipeline) C.gstreamer_pipeline_play(p.Ctx)
} }
// Stop stops the GStreamer Pipeline func (p *Pipeline) Pause() {
func (p *Pipeline) Stop() { C.gstreamer_pipeline_pause(p.Ctx)
C.gstreamer_send_stop_pipeline(p.Pipeline) }
func (p *Pipeline) Destroy() {
C.gstreamer_pipeline_destory(p.Ctx)
pipelinesLock.Lock()
delete(pipelines, p.id)
pipelinesLock.Unlock()
close(p.Sample)
C.free(unsafe.Pointer(p.Ctx))
p = nil
}
func (p *Pipeline) Push(buffer []byte) {
bytes := C.CBytes(buffer)
defer C.free(bytes)
C.gstreamer_pipeline_push(p.Ctx, bytes, C.int(len(buffer)))
}
func (p *Pipeline) SetPropInt(binName string, prop string, value int) bool {
cBinName := C.CString(binName)
defer C.free(unsafe.Pointer(cBinName))
cProp := C.CString(prop)
defer C.free(unsafe.Pointer(cProp))
cValue := C.int(value)
p.logger.Debug().Msgf("setting prop %s of %s to %d", prop, binName, value)
ok := C.gstreamer_pipeline_set_prop_int(p.Ctx, cBinName, cProp, cValue)
return ok == C.TRUE
}
func (p *Pipeline) SetCapsFramerate(binName string, numerator, denominator int) bool {
cBinName := C.CString(binName)
cNumerator := C.int(numerator)
cDenominator := C.int(denominator)
defer C.free(unsafe.Pointer(cBinName))
p.logger.Debug().Msgf("setting caps framerate of %s to %d/%d", binName, numerator, denominator)
ok := C.gstreamer_pipeline_set_caps_framerate(p.Ctx, cBinName, cNumerator, cDenominator)
return ok == C.TRUE
}
func (p *Pipeline) SetCapsResolution(binName string, width, height int) bool {
cBinName := C.CString(binName)
cWidth := C.int(width)
cHeight := C.int(height)
defer C.free(unsafe.Pointer(cBinName))
p.logger.Debug().Msgf("setting caps resolution of %s to %dx%d", binName, width, height)
ok := C.gstreamer_pipeline_set_caps_resolution(p.Ctx, cBinName, cWidth, cHeight)
return ok == C.TRUE
} }
// gst-inspect-1.0 // gst-inspect-1.0
@ -90,14 +168,35 @@ func CheckPlugins(plugins []string) error {
//export goHandlePipelineBuffer //export goHandlePipelineBuffer
func goHandlePipelineBuffer(buffer unsafe.Pointer, bufferLen C.int, duration C.int, pipelineID C.int) { func goHandlePipelineBuffer(buffer unsafe.Pointer, bufferLen C.int, duration C.int, pipelineID C.int) {
defer C.free(buffer)
pipelinesLock.Lock() pipelinesLock.Lock()
pipeline, ok := pipelines[int(pipelineID)] pipeline, ok := pipelines[int(pipelineID)]
pipelinesLock.Unlock() pipelinesLock.Unlock()
if ok { if ok {
pipeline.Sample <- types.Sample{Data: C.GoBytes(buffer, bufferLen), Timestamp: time.Now(), Duration: time.Duration(duration)} pipeline.Sample <- types.Sample{
Data: C.GoBytes(buffer, bufferLen),
Duration: time.Duration(duration),
}
} else { } else {
fmt.Printf("discarding buffer, no pipeline with id %d", int(pipelineID)) log.Warn().
Str("module", "capture").
Str("submodule", "gstreamer").
Int("pipeline_id", int(pipelineID)).
Msgf("discarding sample, pipeline not found")
} }
C.free(buffer) }
//export goPipelineLog
func goPipelineLog(levelUnsafe *C.char, msgUnsafe *C.char, pipelineID C.int) {
levelStr := C.GoString(levelUnsafe)
msg := C.GoString(msgUnsafe)
level, _ := zerolog.ParseLevel(levelStr)
log.WithLevel(level).
Str("module", "capture").
Str("submodule", "gstreamer").
Int("pipeline_id", int(pipelineID)).
Msg(msg)
} }

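The Pipeline wrapper above replaces the old send_* helpers: appsink/appsrc elements are now attached by name, and samples arrive on the Sample channel until Destroy closes it. A rough lifecycle sketch (the test pipeline string and the two-second wait are illustrative only; assumes the gst and time imports):

func runTestPipeline() error {
	p, err := gst.CreatePipeline("videotestsrc ! videoconvert ! vp8enc ! appsink name=appsink")
	if err != nil {
		return err
	}

	// the appsink must be attached explicitly now, it is no longer wired up
	// automatically when the pipeline is started
	p.AttachAppsink("appsink")
	p.Play()

	// consume samples until the pipeline is destroyed and the channel is closed
	go func() {
		for sample := range p.Sample {
			_ = sample // forward the types.Sample to a WebRTC track, a file, ...
		}
	}()

	time.Sleep(2 * time.Second)
	p.Destroy() // sends EOS, sets the state to NULL and closes p.Sample
	return nil
}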

@ -1,13 +1,27 @@
 #pragma once
 
+#include <stdio.h>
 #include <gst/gst.h>
 #include <gst/app/gstappsrc.h>
 
+typedef struct GstPipelineCtx {
+  int pipelineId;
+  GstElement *pipeline;
+  GstElement *appsink;
+  GstElement *appsrc;
+} GstPipelineCtx;
+
 extern void goHandlePipelineBuffer(void *buffer, int bufferLen, int samples, int pipelineId);
+extern void goPipelineLog(char *level, char *msg, int pipelineId);
 
-GstElement *gstreamer_send_create_pipeline(char *pipeline, GError **error);
-
-void gstreamer_send_start_pipeline(GstElement *pipeline, int pipelineId);
-void gstreamer_send_play_pipeline(GstElement *pipeline);
-void gstreamer_send_stop_pipeline(GstElement *pipeline);
-void gstreamer_init(void);
+GstPipelineCtx *gstreamer_pipeline_create(char *pipelineStr, int pipelineId, GError **error);
+void gstreamer_pipeline_attach_appsink(GstPipelineCtx *ctx, char *sinkName);
+void gstreamer_pipeline_attach_appsrc(GstPipelineCtx *ctx, char *srcName);
+void gstreamer_pipeline_play(GstPipelineCtx *ctx);
+void gstreamer_pipeline_pause(GstPipelineCtx *ctx);
+void gstreamer_pipeline_destory(GstPipelineCtx *ctx);
+void gstreamer_pipeline_push(GstPipelineCtx *ctx, void *buffer, int bufferLen);
+
+gboolean gstreamer_pipeline_set_prop_int(GstPipelineCtx *ctx, char *binName, char *prop, gint value);
+gboolean gstreamer_pipeline_set_caps_framerate(GstPipelineCtx *ctx, const gchar* binName, gint numerator, gint denominator);
+gboolean gstreamer_pipeline_set_caps_resolution(GstPipelineCtx *ctx, const gchar* binName, gint width, gint height);


@ -1,199 +1,99 @@
package capture package capture
import ( import (
"time" "errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"m1k1o/neko/internal/capture/gst" "m1k1o/neko/internal/capture/gst"
"m1k1o/neko/internal/config" "m1k1o/neko/internal/config"
"m1k1o/neko/internal/types" "m1k1o/neko/internal/types"
"github.com/kataras/go-events"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
) )
type CaptureManagerCtx struct { type CaptureManagerCtx struct {
logger zerolog.Logger logger zerolog.Logger
video *gst.Pipeline desktop types.DesktopManager
audio *gst.Pipeline
config *config.Capture // sinks
broadcast types.BroadcastManager broadcast *BroacastManagerCtx
desktop types.DesktopManager audio *StreamSinkManagerCtx
cleanup *time.Ticker video *StreamSinkManagerCtx
shutdown chan bool
emmiter events.EventEmmiter
streaming bool
} }
func New(desktop types.DesktopManager, broadcast types.BroadcastManager, config *config.Capture) *CaptureManagerCtx { func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCtx {
logger := log.With().Str("module", "capture").Logger()
return &CaptureManagerCtx{ return &CaptureManagerCtx{
logger: log.With().Str("module", "capture").Logger(), logger: logger,
cleanup: time.NewTicker(1 * time.Second), desktop: desktop,
shutdown: make(chan bool),
emmiter: events.New(), // sinks
config: config, broadcast: broadcastNew(func(url string) (*gst.Pipeline, error) {
broadcast: broadcast, return NewBroadcastPipeline(config.AudioDevice, config.Display, config.BroadcastPipeline, url)
desktop: desktop, }, config.BroadcastUrl, config.BroadcastStarted),
streaming: false, audio: streamSinkNew(config.AudioCodec, func() (*gst.Pipeline, error) {
return NewAudioPipeline(config.AudioCodec, config.AudioDevice, config.AudioPipeline, config.AudioBitrate)
}, "audio"),
video: streamSinkNew(config.VideoCodec, func() (*gst.Pipeline, error) {
return NewVideoPipeline(config.VideoCodec, config.Display, config.VideoPipeline, config.VideoMaxFPS, config.VideoBitrate, config.VideoHWEnc)
}, "audio"),
} }
} }
func (manager *CaptureManagerCtx) VideoCodec() string {
return manager.config.VideoCodec
}
func (manager *CaptureManagerCtx) AudioCodec() string {
return manager.config.AudioCodec
}
func (manager *CaptureManagerCtx) Start() { func (manager *CaptureManagerCtx) Start() {
manager.createPipelines() if manager.broadcast.Started() {
if err := manager.broadcast.Start(); err != nil { if err := manager.broadcast.createPipeline(); err != nil {
manager.logger.Panic().Err(err).Msg("unable to create rtmp pipeline") manager.logger.Panic().Err(err).Msg("unable to create broadcast pipeline")
}
} }
go func() { manager.desktop.OnBeforeScreenSizeChange(func() {
defer func() { if manager.video.Started() {
manager.logger.Info().Msg("shutdown") manager.video.destroyPipeline()
}() }
for { if manager.broadcast.Started() {
select { manager.broadcast.destroyPipeline()
case <-manager.shutdown: }
return })
case sample := <-manager.video.Sample:
manager.emmiter.Emit("video", sample) manager.desktop.OnAfterScreenSizeChange(func() {
case sample := <-manager.audio.Sample: if manager.video.Started() {
manager.emmiter.Emit("audio", sample) err := manager.video.createPipeline()
case <-manager.cleanup.C: if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) {
// TODO: refactor. manager.logger.Panic().Err(err).Msg("unable to recreate video pipeline")
} }
} }
}()
if manager.broadcast.Started() {
err := manager.broadcast.createPipeline()
if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) {
manager.logger.Panic().Err(err).Msg("unable to recreate broadcast pipeline")
}
}
})
} }
func (manager *CaptureManagerCtx) Shutdown() error { func (manager *CaptureManagerCtx) Shutdown() error {
manager.logger.Info().Msgf("capture shutting down") manager.logger.Info().Msgf("shutdown")
manager.video.Stop()
manager.audio.Stop()
manager.broadcast.Stop()
manager.cleanup.Stop() manager.broadcast.shutdown()
manager.shutdown <- true
return nil
}
func (manager *CaptureManagerCtx) OnVideoFrame(listener func(sample types.Sample)) { manager.audio.shutdown()
manager.emmiter.On("video", func(payload ...interface{}) { manager.video.shutdown()
listener(payload[0].(types.Sample))
})
}
func (manager *CaptureManagerCtx) OnAudioFrame(listener func(sample types.Sample)) {
manager.emmiter.On("audio", func(payload ...interface{}) {
listener(payload[0].(types.Sample))
})
}
func (manager *CaptureManagerCtx) StartStream() {
manager.createPipelines()
manager.logger.Info().
Str("video_display", manager.config.Display).
Str("video_codec", manager.config.VideoCodec).
Str("audio_device", manager.config.Device).
Str("audio_codec", manager.config.AudioCodec).
Str("audio_pipeline_src", manager.audio.Src).
Str("video_pipeline_src", manager.video.Src).
Msgf("Pipelines starting...")
manager.video.Start()
manager.audio.Start()
manager.streaming = true
}
func (manager *CaptureManagerCtx) StopStream() {
manager.logger.Info().Msgf("Pipelines shutting down...")
manager.video.Stop()
manager.audio.Stop()
manager.streaming = false
}
func (manager *CaptureManagerCtx) Streaming() bool {
return manager.streaming
}
func (manager *CaptureManagerCtx) createPipelines() {
// handle maximum fps
rate := manager.desktop.GetScreenSize().Rate
if manager.config.MaxFPS != 0 && manager.config.MaxFPS < rate {
rate = manager.config.MaxFPS
}
var err error
manager.video, err = CreateAppPipeline(
manager.config.VideoCodec,
manager.config.Display,
manager.config.VideoParams,
rate,
manager.config.VideoBitrate,
manager.config.VideoHWEnc,
)
if err != nil {
manager.logger.Panic().Err(err).Msg("unable to create video pipeline")
}
manager.audio, err = CreateAppPipeline(
manager.config.AudioCodec,
manager.config.Device,
manager.config.AudioParams,
0, // fps: n/a for audio
manager.config.AudioBitrate,
"", // hwenc: n/a for audio
)
if err != nil {
manager.logger.Panic().Err(err).Msg("unable to create audio pipeline")
}
}
func (manager *CaptureManagerCtx) ChangeResolution(width int, height int, rate int16) error {
manager.video.Stop()
manager.broadcast.Stop()
defer func() {
manager.video.Start()
if err := manager.broadcast.Start(); err != nil {
manager.logger.Panic().Err(err).Msg("unable to create rtmp pipeline")
}
manager.logger.Info().Msg("starting video pipeline...")
}()
if err := manager.desktop.SetScreenSize(types.ScreenSize{
Width: width,
Height: height,
Rate: rate,
}); err != nil {
return err
}
// handle maximum fps
if manager.config.MaxFPS != 0 && manager.config.MaxFPS < rate {
rate = manager.config.MaxFPS
}
var err error
manager.video, err = CreateAppPipeline(
manager.config.VideoCodec,
manager.config.Display,
manager.config.VideoParams,
rate,
manager.config.VideoBitrate,
manager.config.VideoHWEnc,
)
if err != nil {
manager.logger.Panic().Err(err).Msg("unable to create new video pipeline")
}
return nil return nil
} }
func (manager *CaptureManagerCtx) Broadcast() types.BroadcastManager {
return manager.broadcast
}
func (manager *CaptureManagerCtx) Audio() types.StreamSinkManager {
return manager.audio
}
func (manager *CaptureManagerCtx) Video() types.StreamSinkManager {
return manager.video
}

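ChangeResolution is gone from the capture manager; pipeline teardown and recreation now hang off the desktop manager's screen-size hooks registered in Start(). Assuming the desktop manager fires those hooks around SetScreenSize (setResolution is a hypothetical caller), a resolution change reduces to:

func setResolution(desktop types.DesktopManager, width, height int, rate int16) error {
	// OnBeforeScreenSizeChange destroys any running video/broadcast pipelines,
	// OnAfterScreenSizeChange recreates them once the new mode is applied
	return desktop.SetScreenSize(types.ScreenSize{
		Width:  width,
		Height: height,
		Rate:   rate,
	})
}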

@ -2,8 +2,10 @@ package capture
 import (
 	"fmt"
-	"m1k1o/neko/internal/capture/gst"
 	"strings"
+
+	"m1k1o/neko/internal/capture/gst"
+	"m1k1o/neko/internal/types/codec"
 )
/* /*
@ -31,38 +33,36 @@ const (
 	audioSrc = "pulsesrc device=%s ! audio/x-raw,channels=2 ! audioconvert ! "
 )
 
-// CreateRTMPPipeline creates a GStreamer Pipeline
-func CreateRTMPPipeline(pipelineDevice string, pipelineDisplay string, pipelineSrc string, pipelineRTMP string) (*gst.Pipeline, error) {
-	video := fmt.Sprintf(videoSrc, pipelineDisplay, 25)
-	audio := fmt.Sprintf(audioSrc, pipelineDevice)
+func NewBroadcastPipeline(device string, display string, pipelineSrc string, url string) (*gst.Pipeline, error) {
+	video := fmt.Sprintf(videoSrc, display, 25)
+	audio := fmt.Sprintf(audioSrc, device)
 
 	var pipelineStr string
 	if pipelineSrc != "" {
 		// replace RTMP url
-		pipelineStr = strings.Replace(pipelineSrc, "{url}", pipelineRTMP, -1)
+		pipelineStr = strings.Replace(pipelineSrc, "{url}", url, -1)
 		// replace audio device
-		pipelineStr = strings.Replace(pipelineStr, "{device}", pipelineDevice, -1)
+		pipelineStr = strings.Replace(pipelineStr, "{device}", device, -1)
 		// replace display
-		pipelineStr = strings.Replace(pipelineStr, "{display}", pipelineDisplay, -1)
+		pipelineStr = strings.Replace(pipelineStr, "{display}", display, -1)
 	} else {
-		pipelineStr = fmt.Sprintf("flvmux name=mux ! rtmpsink location='%s live=1' %s audio/x-raw,channels=2 ! audioconvert ! voaacenc ! mux. %s x264enc bframes=0 key-int-max=60 byte-stream=true tune=zerolatency speed-preset=veryfast ! mux.", pipelineRTMP, audio, video)
+		pipelineStr = fmt.Sprintf("flvmux name=mux ! rtmpsink location='%s live=1' %s audio/x-raw,channels=2 ! audioconvert ! voaacenc ! mux. %s x264enc bframes=0 key-int-max=60 byte-stream=true tune=zerolatency speed-preset=veryfast ! mux.", url, audio, video)
 	}
 
 	return gst.CreatePipeline(pipelineStr)
 }
 
-// CreateAppPipeline creates a GStreamer Pipeline
-func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc string, fps int16, bitrate uint, hwenc string) (*gst.Pipeline, error) {
+func NewVideoPipeline(rtpCodec codec.RTPCodec, display string, pipelineSrc string, fps int16, bitrate uint, hwenc string) (*gst.Pipeline, error) {
 	pipelineStr := " ! appsink name=appsink"
 
 	// if using custom pipeline
 	if pipelineSrc != "" {
-		pipelineStr = fmt.Sprintf(pipelineSrc+pipelineStr, pipelineDevice)
+		pipelineStr = fmt.Sprintf(pipelineSrc+pipelineStr, display)
 		return gst.CreatePipeline(pipelineStr)
 	}
 
-	switch codecName {
-	case "VP8":
+	switch rtpCodec.Name {
+	case codec.VP8().Name:
 		if hwenc == "VAAPI" {
 			if err := gst.CheckPlugins([]string{"ximagesrc", "vaapi"}); err != nil {
 				return nil, err
@ -70,7 +70,7 @@ func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc stri
 			// vp8 encode is missing from gstreamer.freedesktop.org/documentation
 			// note that it was removed from some recent intel CPUs: https://trac.ffmpeg.org/wiki/Hardware/QuickSync
 			// https://gstreamer.freedesktop.org/data/doc/gstreamer/head/gstreamer-vaapi-plugins/html/gstreamer-vaapi-plugins-vaapivp8enc.html
-			pipelineStr = fmt.Sprintf(videoSrc+"video/x-raw,format=NV12 ! vaapivp8enc rate-control=vbr bitrate=%d keyframe-period=180"+pipelineStr, pipelineDevice, fps, bitrate)
+			pipelineStr = fmt.Sprintf(videoSrc+"video/x-raw,format=NV12 ! vaapivp8enc rate-control=vbr bitrate=%d keyframe-period=180"+pipelineStr, display, fps, bitrate)
 		} else {
 			// https://gstreamer.freedesktop.org/documentation/vpx/vp8enc.html?gi-language=c
 			// gstreamer1.0-plugins-good
@ -80,7 +80,7 @@ func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc stri
 			}
 
 			pipelineStr = strings.Join([]string{
-				fmt.Sprintf(videoSrc, pipelineDevice, fps),
+				fmt.Sprintf(videoSrc, display, fps),
 				"vp8enc",
 				fmt.Sprintf("target-bitrate=%d", bitrate*650),
 				"cpu-used=4",
@ -97,7 +97,7 @@ func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc stri
 				pipelineStr,
 			}, " ")
 		}
-	case "VP9":
+	case codec.VP9().Name:
 		// https://gstreamer.freedesktop.org/documentation/vpx/vp9enc.html?gi-language=c
 		// gstreamer1.0-plugins-good
 		// vp9enc
@ -105,8 +105,8 @@ func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc stri
 			return nil, err
 		}
 
-		pipelineStr = fmt.Sprintf(videoSrc+"vp9enc target-bitrate=%d cpu-used=-5 threads=4 deadline=1 keyframe-max-dist=30 auto-alt-ref=true"+pipelineStr, pipelineDevice, fps, bitrate*1000)
-	case "H264":
+		pipelineStr = fmt.Sprintf(videoSrc+"vp9enc target-bitrate=%d cpu-used=-5 threads=4 deadline=1 keyframe-max-dist=30 auto-alt-ref=true"+pipelineStr, display, fps, bitrate*1000)
+	case codec.H264().Name:
 		if err := gst.CheckPlugins([]string{"ximagesrc"}); err != nil {
 			return nil, err
 		}
@ -116,14 +116,14 @@ func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc stri
 				return nil, err
 			}
 
-			pipelineStr = fmt.Sprintf(videoSrc+"video/x-raw,format=NV12 ! vaapih264enc rate-control=vbr bitrate=%d keyframe-period=180 quality-level=7 ! video/x-h264,stream-format=byte-stream"+pipelineStr, pipelineDevice, fps, bitrate)
+			pipelineStr = fmt.Sprintf(videoSrc+"video/x-raw,format=NV12 ! vaapih264enc rate-control=vbr bitrate=%d keyframe-period=180 quality-level=7 ! video/x-h264,stream-format=byte-stream"+pipelineStr, display, fps, bitrate)
 		} else {
 			// https://gstreamer.freedesktop.org/documentation/openh264/openh264enc.html?gi-language=c#openh264enc
 			// gstreamer1.0-plugins-bad
 			// openh264enc multi-thread=4 complexity=high bitrate=3072000 max-bitrate=4096000
 			if err := gst.CheckPlugins([]string{"openh264"}); err == nil {
-				pipelineStr = fmt.Sprintf(videoSrc+"openh264enc multi-thread=4 complexity=high bitrate=%d max-bitrate=%d ! video/x-h264,stream-format=byte-stream"+pipelineStr, pipelineDevice, fps, bitrate*1000, (bitrate+1024)*1000)
+				pipelineStr = fmt.Sprintf(videoSrc+"openh264enc multi-thread=4 complexity=high bitrate=%d max-bitrate=%d ! video/x-h264,stream-format=byte-stream"+pipelineStr, display, fps, bitrate*1000, (bitrate+1024)*1000)
 				break
 			}
@ -139,9 +139,26 @@ func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc stri
 				vbvbuf = bitrate
 			}
 
-			pipelineStr = fmt.Sprintf(videoSrc+"video/x-raw,format=NV12 ! x264enc threads=4 bitrate=%d key-int-max=60 vbv-buf-capacity=%d byte-stream=true tune=zerolatency speed-preset=veryfast ! video/x-h264,stream-format=byte-stream"+pipelineStr, pipelineDevice, fps, bitrate, vbvbuf)
+			pipelineStr = fmt.Sprintf(videoSrc+"video/x-raw,format=NV12 ! x264enc threads=4 bitrate=%d key-int-max=60 vbv-buf-capacity=%d byte-stream=true tune=zerolatency speed-preset=veryfast ! video/x-h264,stream-format=byte-stream"+pipelineStr, display, fps, bitrate, vbvbuf)
 		}
-	case "Opus":
+	default:
+		return nil, fmt.Errorf("unknown codec %s", rtpCodec.Name)
+	}
+
+	return gst.CreatePipeline(pipelineStr)
+}
+
+func NewAudioPipeline(rtpCodec codec.RTPCodec, device string, pipelineSrc string, bitrate uint) (*gst.Pipeline, error) {
+	pipelineStr := " ! appsink name=appsink"
+
+	// if using custom pipeline
+	if pipelineSrc != "" {
+		pipelineStr = fmt.Sprintf(pipelineSrc+pipelineStr, device)
+		return gst.CreatePipeline(pipelineStr)
+	}
+
+	switch rtpCodec.Name {
+	case codec.Opus().Name:
 		// https://gstreamer.freedesktop.org/documentation/opus/opusenc.html
 		// gstreamer1.0-plugins-base
 		// opusenc
@ -149,8 +166,8 @@ func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc stri
 			return nil, err
 		}
 
-		pipelineStr = fmt.Sprintf(audioSrc+"opusenc inband-fec=true bitrate=%d"+pipelineStr, pipelineDevice, bitrate*1000)
-	case "G722":
+		pipelineStr = fmt.Sprintf(audioSrc+"opusenc inband-fec=true bitrate=%d"+pipelineStr, device, bitrate*1000)
+	case codec.G722().Name:
 		// https://gstreamer.freedesktop.org/documentation/libav/avenc_g722.html?gi-language=c
 		// gstreamer1.0-libav
 		// avenc_g722
@ -158,8 +175,8 @@ func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc stri
 			return nil, err
 		}
 
-		pipelineStr = fmt.Sprintf(audioSrc+"avenc_g722 bitrate=%d"+pipelineStr, pipelineDevice, bitrate*1000)
-	case "PCMU":
+		pipelineStr = fmt.Sprintf(audioSrc+"avenc_g722 bitrate=%d"+pipelineStr, device, bitrate*1000)
+	case codec.PCMU().Name:
 		// https://gstreamer.freedesktop.org/documentation/mulaw/mulawenc.html?gi-language=c
 		// gstreamer1.0-plugins-good
 		// audio/x-raw, rate=8000 ! mulawenc
@ -167,8 +184,8 @@ func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc stri
 			return nil, err
 		}
 
-		pipelineStr = fmt.Sprintf(audioSrc+"audio/x-raw, rate=8000 ! mulawenc"+pipelineStr, pipelineDevice)
-	case "PCMA":
+		pipelineStr = fmt.Sprintf(audioSrc+"audio/x-raw, rate=8000 ! mulawenc"+pipelineStr, device)
+	case codec.PCMA().Name:
 		// https://gstreamer.freedesktop.org/documentation/alaw/alawenc.html?gi-language=c
 		// gstreamer1.0-plugins-good
 		// audio/x-raw, rate=8000 ! alawenc
@ -176,9 +193,9 @@ func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc stri
 			return nil, err
 		}
 
-		pipelineStr = fmt.Sprintf(audioSrc+"audio/x-raw, rate=8000 ! alawenc"+pipelineStr, pipelineDevice)
+		pipelineStr = fmt.Sprintf(audioSrc+"audio/x-raw, rate=8000 ! alawenc"+pipelineStr, device)
 	default:
-		return nil, fmt.Errorf("unknown codec %s", codecName)
+		return nil, fmt.Errorf("unknown codec %s", rtpCodec.Name)
 	}
 
 	return gst.CreatePipeline(pipelineStr)

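Both constructors now take a codec.RTPCodec instead of a bare codec name. An example invocation using the defaults that the config flags in this commit expose (display :99.0, device auto_null.monitor, 3072/128 kbit/s, 25 fps, no custom pipeline, no hardware encoder); buildDefaultPipelines itself is illustrative:

func buildDefaultPipelines() (video *gst.Pipeline, audio *gst.Pipeline, err error) {
	video, err = NewVideoPipeline(codec.VP8(), ":99.0", "", 25, 3072, "")
	if err != nil {
		return nil, nil, err
	}

	audio, err = NewAudioPipeline(codec.Opus(), "auto_null.monitor", "", 128)
	if err != nil {
		return nil, nil, err
	}

	return video, audio, nil
}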

@ -0,0 +1,193 @@
package capture
import (
"errors"
"sync"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"m1k1o/neko/internal/capture/gst"
"m1k1o/neko/internal/types"
"m1k1o/neko/internal/types/codec"
)
var moveSinkListenerMu = sync.Mutex{}
type StreamSinkManagerCtx struct {
logger zerolog.Logger
mu sync.Mutex
wg sync.WaitGroup
codec codec.RTPCodec
pipeline *gst.Pipeline
pipelineMu sync.Mutex
pipelineFn func() (*gst.Pipeline, error)
listeners int
listenersMu sync.Mutex
sampleFn func(sample types.Sample)
}
func streamSinkNew(codec codec.RTPCodec, pipelineFn func() (*gst.Pipeline, error), video_id string) *StreamSinkManagerCtx {
logger := log.With().
Str("module", "capture").
Str("submodule", "stream-sink").
Str("video_id", video_id).Logger()
manager := &StreamSinkManagerCtx{
logger: logger,
codec: codec,
pipelineFn: pipelineFn,
}
return manager
}
func (manager *StreamSinkManagerCtx) shutdown() {
manager.logger.Info().Msgf("shutdown")
manager.destroyPipeline()
manager.wg.Wait()
}
func (manager *StreamSinkManagerCtx) OnSample(listener func(sample types.Sample)) {
manager.sampleFn = listener
}
func (manager *StreamSinkManagerCtx) Codec() codec.RTPCodec {
return manager.codec
}
func (manager *StreamSinkManagerCtx) start() error {
if manager.listeners == 0 {
err := manager.createPipeline()
if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) {
return err
}
manager.logger.Info().Msgf("first listener, starting")
}
return nil
}
func (manager *StreamSinkManagerCtx) stop() {
if manager.listeners == 0 {
manager.destroyPipeline()
manager.logger.Info().Msgf("last listener, stopping")
}
}
func (manager *StreamSinkManagerCtx) addListener() {
manager.listenersMu.Lock()
manager.listeners++
manager.listenersMu.Unlock()
}
func (manager *StreamSinkManagerCtx) removeListener() {
manager.listenersMu.Lock()
manager.listeners--
manager.listenersMu.Unlock()
}
func (manager *StreamSinkManagerCtx) AddListener() error {
manager.mu.Lock()
defer manager.mu.Unlock()
// start if stopped
if err := manager.start(); err != nil {
return err
}
// add listener
manager.addListener()
return nil
}
func (manager *StreamSinkManagerCtx) RemoveListener() error {
manager.mu.Lock()
defer manager.mu.Unlock()
// remove listener
manager.removeListener()
// stop if started
manager.stop()
return nil
}
func (manager *StreamSinkManagerCtx) ListenersCount() int {
manager.listenersMu.Lock()
defer manager.listenersMu.Unlock()
return manager.listeners
}
func (manager *StreamSinkManagerCtx) Started() bool {
return manager.ListenersCount() > 0
}
func (manager *StreamSinkManagerCtx) createPipeline() error {
manager.pipelineMu.Lock()
defer manager.pipelineMu.Unlock()
if manager.pipeline != nil {
return types.ErrCapturePipelineAlreadyExists
}
var err error
manager.logger.Info().
Str("codec", manager.codec.Name).
Msgf("creating pipeline")
manager.pipeline, err = manager.pipelineFn()
if err != nil {
return err
}
manager.logger.Info().
Str("codec", manager.codec.Name).
Str("src", manager.pipeline.Src).
Msgf("created pipeline")
manager.pipeline.AttachAppsink("appsink")
manager.pipeline.Play()
manager.wg.Add(1)
pipeline := manager.pipeline
go func() {
manager.logger.Debug().Msg("started emitting samples")
defer manager.wg.Done()
for {
sample, ok := <-pipeline.Sample
if !ok {
manager.logger.Debug().Msg("stopped emitting samples")
return
}
manager.sampleFn(sample)
}
}()
return nil
}
func (manager *StreamSinkManagerCtx) destroyPipeline() {
manager.pipelineMu.Lock()
defer manager.pipelineMu.Unlock()
if manager.pipeline == nil {
return
}
manager.pipeline.Destroy()
manager.logger.Info().Msgf("destroying pipeline")
manager.pipeline = nil
}

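The stream sink counts listeners instead of exposing StartStream/StopStream: the first listener lazily creates the pipeline and the last one tears it down again. A sketch of a consumer (attachPeer is hypothetical; the real callers are the session and WebRTC managers shown elsewhere in this commit):

func attachPeer(video types.StreamSinkManager, write func(types.Sample)) error {
	video.OnSample(write) // single shared sample callback

	// the first AddListener creates and plays the GStreamer pipeline
	if err := video.AddListener(); err != nil {
		return err
	}
	// ... stream for as long as the peer is connected ...

	// the last RemoveListener destroys the pipeline again
	return video.RemoveListener()
}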

@ -1,32 +0,0 @@
package config
import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
type Broadcast struct {
Pipeline string
URL string
Enabled bool
}
func (Broadcast) Init(cmd *cobra.Command) error {
cmd.PersistentFlags().String("broadcast_pipeline", "", "custom gst pipeline used for broadcasting, strings {url} {device} {display} will be replaced")
if err := viper.BindPFlag("broadcast_pipeline", cmd.PersistentFlags().Lookup("broadcast_pipeline")); err != nil {
return err
}
cmd.PersistentFlags().String("broadcast_url", "", "URL for broadcasting, setting this value will automatically enable broadcasting")
if err := viper.BindPFlag("broadcast_url", cmd.PersistentFlags().Lookup("broadcast_url")); err != nil {
return err
}
return nil
}
func (s *Broadcast) Set() {
s.Pipeline = viper.GetString("broadcast_pipeline")
s.URL = viper.GetString("broadcast_url")
s.Enabled = s.URL != ""
}


@ -1,66 +1,45 @@
package config package config
import ( import (
"m1k1o/neko/internal/types/codec"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/viper" "github.com/spf13/viper"
) )
type Capture struct { type Capture struct {
Display string // video
Device string Display string
AudioCodec string VideoCodec codec.RTPCodec
AudioParams string VideoHWEnc string // TODO: Pipeline builder.
AudioBitrate uint VideoBitrate uint // TODO: Pipeline builder.
VideoHWEnc string VideoMaxFPS int16 // TODO: Pipeline builder.
VideoCodec string VideoPipeline string
VideoParams string
VideoBitrate uint // audio
MaxFPS int16 AudioDevice string
AudioCodec codec.RTPCodec
AudioBitrate uint // TODO: Pipeline builder.
AudioPipeline string
// broadcast
BroadcastPipeline string
BroadcastUrl string
BroadcastStarted bool
} }
func (Capture) Init(cmd *cobra.Command) error { func (Capture) Init(cmd *cobra.Command) error {
//
// video
//
cmd.PersistentFlags().String("display", ":99.0", "XDisplay to capture") cmd.PersistentFlags().String("display", ":99.0", "XDisplay to capture")
if err := viper.BindPFlag("display", cmd.PersistentFlags().Lookup("display")); err != nil { if err := viper.BindPFlag("display", cmd.PersistentFlags().Lookup("display")); err != nil {
return err return err
} }
cmd.PersistentFlags().String("device", "auto_null.monitor", "audio device to capture")
if err := viper.BindPFlag("device", cmd.PersistentFlags().Lookup("device")); err != nil {
return err
}
cmd.PersistentFlags().String("audio", "", "audio codec parameters to use for streaming")
if err := viper.BindPFlag("audio", cmd.PersistentFlags().Lookup("audio")); err != nil {
return err
}
cmd.PersistentFlags().Int("audio_bitrate", 128, "audio bitrate in kbit/s")
if err := viper.BindPFlag("audio_bitrate", cmd.PersistentFlags().Lookup("audio_bitrate")); err != nil {
return err
}
cmd.PersistentFlags().String("video", "", "video codec parameters to use for streaming")
if err := viper.BindPFlag("video", cmd.PersistentFlags().Lookup("video")); err != nil {
return err
}
cmd.PersistentFlags().Int("video_bitrate", 3072, "video bitrate in kbit/s")
if err := viper.BindPFlag("video_bitrate", cmd.PersistentFlags().Lookup("video_bitrate")); err != nil {
return err
}
cmd.PersistentFlags().Int("max_fps", 25, "maximum fps delivered via WebRTC, 0 is for no maximum")
if err := viper.BindPFlag("max_fps", cmd.PersistentFlags().Lookup("max_fps")); err != nil {
return err
}
// hw encoding
cmd.PersistentFlags().String("hwenc", "", "use hardware accelerated encoding")
if err := viper.BindPFlag("hwenc", cmd.PersistentFlags().Lookup("hwenc")); err != nil {
return err
}
// video codecs // video codecs
// TODO: video.codec
cmd.PersistentFlags().Bool("vp8", false, "use VP8 video codec") cmd.PersistentFlags().Bool("vp8", false, "use VP8 video codec")
if err := viper.BindPFlag("vp8", cmd.PersistentFlags().Lookup("vp8")); err != nil { if err := viper.BindPFlag("vp8", cmd.PersistentFlags().Lookup("vp8")); err != nil {
return err return err
@ -75,8 +54,39 @@ func (Capture) Init(cmd *cobra.Command) error {
if err := viper.BindPFlag("h264", cmd.PersistentFlags().Lookup("h264")); err != nil { if err := viper.BindPFlag("h264", cmd.PersistentFlags().Lookup("h264")); err != nil {
return err return err
} }
// video codecs
cmd.PersistentFlags().String("hwenc", "", "use hardware accelerated encoding")
if err := viper.BindPFlag("hwenc", cmd.PersistentFlags().Lookup("hwenc")); err != nil {
return err
}
cmd.PersistentFlags().Int("video_bitrate", 3072, "video bitrate in kbit/s")
if err := viper.BindPFlag("video_bitrate", cmd.PersistentFlags().Lookup("video_bitrate")); err != nil {
return err
}
cmd.PersistentFlags().Int("max_fps", 25, "maximum fps delivered via WebRTC, 0 is for no maximum")
if err := viper.BindPFlag("max_fps", cmd.PersistentFlags().Lookup("max_fps")); err != nil {
return err
}
cmd.PersistentFlags().String("video", "", "video codec parameters to use for streaming")
if err := viper.BindPFlag("video", cmd.PersistentFlags().Lookup("video")); err != nil {
return err
}
//
// audio
//
cmd.PersistentFlags().String("device", "auto_null.monitor", "audio device to capture")
if err := viper.BindPFlag("device", cmd.PersistentFlags().Lookup("device")); err != nil {
return err
}
// audio codecs // audio codecs
// TODO: audio.codec
cmd.PersistentFlags().Bool("opus", false, "use Opus audio codec") cmd.PersistentFlags().Bool("opus", false, "use Opus audio codec")
if err := viper.BindPFlag("opus", cmd.PersistentFlags().Lookup("opus")); err != nil { if err := viper.BindPFlag("opus", cmd.PersistentFlags().Lookup("opus")); err != nil {
return err return err
@ -96,44 +106,88 @@ func (Capture) Init(cmd *cobra.Command) error {
if err := viper.BindPFlag("pcma", cmd.PersistentFlags().Lookup("pcma")); err != nil { if err := viper.BindPFlag("pcma", cmd.PersistentFlags().Lookup("pcma")); err != nil {
return err return err
} }
// audio codecs
cmd.PersistentFlags().Int("audio_bitrate", 128, "audio bitrate in kbit/s")
if err := viper.BindPFlag("audio_bitrate", cmd.PersistentFlags().Lookup("audio_bitrate")); err != nil {
return err
}
cmd.PersistentFlags().String("audio", "", "audio codec parameters to use for streaming")
if err := viper.BindPFlag("audio", cmd.PersistentFlags().Lookup("audio")); err != nil {
return err
}
//
// broadcast
//
cmd.PersistentFlags().String("broadcast_pipeline", "", "custom gst pipeline used for broadcasting, strings {url} {device} {display} will be replaced")
if err := viper.BindPFlag("broadcast_pipeline", cmd.PersistentFlags().Lookup("broadcast_pipeline")); err != nil {
return err
}
cmd.PersistentFlags().String("broadcast_url", "", "URL for broadcasting, setting this value will automatically enable broadcasting")
if err := viper.BindPFlag("broadcast_url", cmd.PersistentFlags().Lookup("broadcast_url")); err != nil {
return err
}
return nil return nil
} }
func (s *Capture) Set() { func (s *Capture) Set() {
audioCodec := "Opus" //
if viper.GetBool("opus") { // video
audioCodec = "Opus" //
} else if viper.GetBool("g722") {
audioCodec = "G722"
} else if viper.GetBool("pcmu") {
audioCodec = "PCMU"
} else if viper.GetBool("pcma") {
audioCodec = "PCMA"
}
s.Device = viper.GetString("device") s.Display = viper.GetString("display")
s.AudioCodec = audioCodec
s.AudioParams = viper.GetString("audio")
s.AudioBitrate = viper.GetUint("audio_bitrate")
videoCodec := "VP8" videoCodec := codec.VP8()
if viper.GetBool("vp8") { if viper.GetBool("vp8") {
videoCodec = "VP8" videoCodec = codec.VP8()
} else if viper.GetBool("vp9") { } else if viper.GetBool("vp9") {
videoCodec = "VP9" videoCodec = codec.VP9()
} else if viper.GetBool("h264") { } else if viper.GetBool("h264") {
videoCodec = "H264" videoCodec = codec.H264()
} }
s.VideoCodec = videoCodec
videoHWEnc := "" videoHWEnc := ""
if viper.GetString("hwenc") == "VAAPI" { if viper.GetString("hwenc") == "VAAPI" {
videoHWEnc = "VAAPI" videoHWEnc = "VAAPI"
} }
s.Display = viper.GetString("display")
s.VideoHWEnc = videoHWEnc s.VideoHWEnc = videoHWEnc
s.VideoCodec = videoCodec
s.VideoParams = viper.GetString("video")
s.VideoBitrate = viper.GetUint("video_bitrate")
s.MaxFPS = int16(viper.GetInt("max_fps")) s.VideoBitrate = viper.GetUint("video_bitrate")
s.VideoMaxFPS = int16(viper.GetInt("max_fps"))
s.VideoPipeline = viper.GetString("video")
//
// audio
//
s.AudioDevice = viper.GetString("device")
audioCodec := codec.Opus()
if viper.GetBool("opus") {
audioCodec = codec.Opus()
} else if viper.GetBool("g722") {
audioCodec = codec.G722()
} else if viper.GetBool("pcmu") {
audioCodec = codec.PCMU()
} else if viper.GetBool("pcma") {
audioCodec = codec.PCMA()
}
s.AudioCodec = audioCodec
s.AudioBitrate = viper.GetUint("audio_bitrate")
s.AudioPipeline = viper.GetString("audio")
//
// broadcast
//
s.BroadcastPipeline = viper.GetString("broadcast_pipeline")
s.BroadcastUrl = viper.GetString("broadcast_url")
s.BroadcastStarted = s.BroadcastUrl != ""
} }


@ -8,7 +8,6 @@ import (
"m1k1o/neko/internal/config" "m1k1o/neko/internal/config"
"m1k1o/neko/internal/desktop/xevent" "m1k1o/neko/internal/desktop/xevent"
"m1k1o/neko/internal/desktop/xorg" "m1k1o/neko/internal/desktop/xorg"
"m1k1o/neko/internal/types"
"github.com/kataras/go-events" "github.com/kataras/go-events"
"github.com/rs/zerolog" "github.com/rs/zerolog"
@ -25,7 +24,7 @@ type DesktopManagerCtx struct {
 	config   *config.Desktop
 }
 
-func New(config *config.Desktop, broadcast types.BroadcastManager) *DesktopManagerCtx {
+func New(config *config.Desktop) *DesktopManagerCtx {
 	return &DesktopManagerCtx{
 		logger:   log.With().Str("module", "desktop").Logger(),
 		shutdown: make(chan struct{}),


@ -45,9 +45,8 @@ func (manager *SessionManager) New(id string, admin bool, socket types.WebSocket
 	manager.mu.Lock()
 	manager.members[id] = session
 
-	if !manager.capture.Streaming() && len(manager.members) > 0 {
-		manager.capture.StartStream()
-	}
+	manager.capture.Audio().AddListener()
+	manager.capture.Video().AddListener()
 	manager.mu.Unlock()
 
 	manager.emmiter.Emit("created", id, session)
@ -160,9 +159,8 @@ func (manager *SessionManager) Destroy(id string) {
 	err := session.destroy()
 	delete(manager.members, id)
 
-	if manager.capture.Streaming() && len(manager.members) <= 0 {
-		manager.capture.StopStream()
-	}
+	manager.capture.Audio().RemoveListener()
+	manager.capture.Video().RemoveListener()
 	manager.mu.Unlock()
 
 	manager.emmiter.Emit("destroyed", id, session)


@ -1,11 +0,0 @@
package types
type BroadcastManager interface {
Shutdown() error
Start() error
Stop()
IsActive() bool
Create(url string) error
Destroy()
GetUrl() string
}


@ -1,14 +1,38 @@
 package types
 
+import (
+	"errors"
+
+	"m1k1o/neko/internal/types/codec"
+)
+
+var (
+	ErrCapturePipelineAlreadyExists = errors.New("capture pipeline already exists")
+)
+
+type BroadcastManager interface {
+	Start(url string) error
+	Stop()
+	Started() bool
+	Url() string
+}
+
+type StreamSinkManager interface {
+	Codec() codec.RTPCodec
+	OnSample(listener func(sample Sample))
+
+	AddListener() error
+	RemoveListener() error
+	ListenersCount() int
+	Started() bool
+}
+
 type CaptureManager interface {
-	VideoCodec() string
-	AudioCodec() string
 	Start()
 	Shutdown() error
-	OnVideoFrame(listener func(sample Sample))
-	OnAudioFrame(listener func(sample Sample))
-	StartStream()
-	StopStream()
-	Streaming() bool
-	ChangeResolution(width int, height int, rate int16) error
+
+	Broadcast() BroadcastManager
+	Audio() StreamSinkManager
+	Video() StreamSinkManager
 }


@ -0,0 +1,170 @@
package codec
import (
"strings"
"github.com/pion/webrtc/v3"
)
var RTCPFeedback = []webrtc.RTCPFeedback{
{Type: webrtc.TypeRTCPFBTransportCC, Parameter: ""},
{Type: webrtc.TypeRTCPFBGoogREMB, Parameter: ""},
// https://www.iana.org/assignments/sdp-parameters/sdp-parameters.xhtml#sdp-parameters-19
{Type: webrtc.TypeRTCPFBCCM, Parameter: "fir"},
// https://www.iana.org/assignments/sdp-parameters/sdp-parameters.xhtml#sdp-parameters-15
{Type: webrtc.TypeRTCPFBNACK, Parameter: "pli"},
{Type: webrtc.TypeRTCPFBNACK, Parameter: ""},
}
func ParseRTC(codec webrtc.RTPCodecParameters) (RTPCodec, bool) {
codecName := strings.Split(codec.RTPCodecCapability.MimeType, "/")[1]
return ParseStr(codecName)
}
func ParseStr(codecName string) (codec RTPCodec, ok bool) {
ok = true
switch strings.ToLower(codecName) {
case VP8().Name:
codec = VP8()
case VP9().Name:
codec = VP9()
case H264().Name:
codec = H264()
case Opus().Name:
codec = Opus()
case G722().Name:
codec = G722()
case PCMU().Name:
codec = PCMU()
case PCMA().Name:
codec = PCMA()
default:
ok = false
}
return
}
type RTPCodec struct {
Name string
PayloadType webrtc.PayloadType
Type webrtc.RTPCodecType
Capability webrtc.RTPCodecCapability
}
func (codec *RTPCodec) Register(engine *webrtc.MediaEngine) error {
return engine.RegisterCodec(webrtc.RTPCodecParameters{
RTPCodecCapability: codec.Capability,
PayloadType: codec.PayloadType,
}, codec.Type)
}
func VP8() RTPCodec {
return RTPCodec{
Name: "vp8",
PayloadType: 96,
Type: webrtc.RTPCodecTypeVideo,
Capability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeVP8,
ClockRate: 90000,
Channels: 0,
SDPFmtpLine: "",
RTCPFeedback: RTCPFeedback,
},
}
}
// TODO: Profile ID.
func VP9() RTPCodec {
return RTPCodec{
Name: "vp9",
PayloadType: 98,
Type: webrtc.RTPCodecTypeVideo,
Capability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeVP9,
ClockRate: 90000,
Channels: 0,
SDPFmtpLine: "profile-id=0",
RTCPFeedback: RTCPFeedback,
},
}
}
// TODO: Profile ID.
func H264() RTPCodec {
return RTPCodec{
Name: "h264",
PayloadType: 102,
Type: webrtc.RTPCodecTypeVideo,
Capability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeH264,
ClockRate: 90000,
Channels: 0,
SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f",
RTCPFeedback: RTCPFeedback,
},
}
}
func Opus() RTPCodec {
return RTPCodec{
Name: "opus",
PayloadType: 111,
Type: webrtc.RTPCodecTypeAudio,
Capability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeOpus,
ClockRate: 48000,
Channels: 2,
SDPFmtpLine: "useinbandfec=1",
RTCPFeedback: []webrtc.RTCPFeedback{},
},
}
}
func G722() RTPCodec {
return RTPCodec{
Name: "g722",
PayloadType: 9,
Type: webrtc.RTPCodecTypeAudio,
Capability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeG722,
ClockRate: 8000,
Channels: 0,
SDPFmtpLine: "",
RTCPFeedback: []webrtc.RTCPFeedback{},
},
}
}
func PCMU() RTPCodec {
return RTPCodec{
Name: "pcmu",
PayloadType: 0,
Type: webrtc.RTPCodecTypeAudio,
Capability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypePCMU,
ClockRate: 8000,
Channels: 0,
SDPFmtpLine: "",
RTCPFeedback: []webrtc.RTCPFeedback{},
},
}
}
func PCMA() RTPCodec {
return RTPCodec{
Name: "pcma",
PayloadType: 8,
Type: webrtc.RTPCodecTypeAudio,
Capability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypePCMA,
ClockRate: 8000,
Channels: 0,
SDPFmtpLine: "",
RTCPFeedback: []webrtc.RTCPFeedback{},
},
}
}

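The codec package gives the GStreamer and WebRTC sides one shared definition per codec. A small sketch of the helpers (registerPreferredCodecs is illustrative; assumes the pion/webrtc v3 and fmt imports):

func registerPreferredCodecs(engine *webrtc.MediaEngine) error {
	// ParseStr matches names case-insensitively ("VP8", "vp8", ...)
	video, ok := codec.ParseStr("vp8")
	if !ok {
		return fmt.Errorf("unknown codec")
	}
	if err := video.Register(engine); err != nil {
		return err
	}

	audio := codec.Opus()
	return audio.Register(engine)
}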

@ -2,6 +2,7 @@ package webrtc
 import (
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"net"
@ -33,8 +34,6 @@ type WebRTCManager struct {
logger zerolog.Logger logger zerolog.Logger
videoTrack *webrtc.TrackLocalStaticSample videoTrack *webrtc.TrackLocalStaticSample
audioTrack *webrtc.TrackLocalStaticSample audioTrack *webrtc.TrackLocalStaticSample
videoCodec webrtc.RTPCodecParameters
audioCodec webrtc.RTPCodecParameters
sessions types.SessionManager sessions types.SessionManager
capture types.CaptureManager capture types.CaptureManager
desktop types.DesktopManager desktop types.DesktopManager
@ -44,28 +43,45 @@ type WebRTCManager struct {
func (manager *WebRTCManager) Start() { func (manager *WebRTCManager) Start() {
var err error var err error
manager.audioTrack, manager.audioCodec, err = manager.createTrack(manager.capture.AudioCodec())
//
// audio
//
audioCodec := manager.capture.Audio().Codec()
manager.audioTrack, err = webrtc.NewTrackLocalStaticSample(audioCodec.Capability, "audio", "stream")
if err != nil { if err != nil {
manager.logger.Panic().Err(err).Msg("unable to create audio track") manager.logger.Panic().Err(err).Msg("unable to create audio track")
} }
manager.capture.OnAudioFrame(func(sample types.Sample) { manager.capture.Audio().OnSample(func(sample types.Sample) {
if err := manager.audioTrack.WriteSample(media.Sample(sample)); err != nil && err != io.ErrClosedPipe { err := manager.audioTrack.WriteSample(media.Sample(sample))
if err != nil && !errors.Is(err, io.ErrClosedPipe) {
manager.logger.Warn().Err(err).Msg("audio pipeline failed to write") manager.logger.Warn().Err(err).Msg("audio pipeline failed to write")
} }
}) })
manager.videoTrack, manager.videoCodec, err = manager.createTrack(manager.capture.VideoCodec()) //
// video
//
videoCodec := manager.capture.Video().Codec()
manager.videoTrack, err = webrtc.NewTrackLocalStaticSample(videoCodec.Capability, "video", "stream")
if err != nil { if err != nil {
manager.logger.Panic().Err(err).Msg("unable to create video track") manager.logger.Panic().Err(err).Msg("unable to create video track")
} }
manager.capture.OnVideoFrame(func(sample types.Sample) { manager.capture.Video().OnSample(func(sample types.Sample) {
if err := manager.videoTrack.WriteSample(media.Sample(sample)); err != nil && err != io.ErrClosedPipe { err := manager.videoTrack.WriteSample(media.Sample(sample))
if err != nil && !errors.Is(err, io.ErrClosedPipe) {
manager.logger.Warn().Err(err).Msg("video pipeline failed to write") manager.logger.Warn().Err(err).Msg("video pipeline failed to write")
} }
}) })
//
// api
//
if err := manager.initAPI(); err != nil { if err := manager.initAPI(); err != nil {
manager.logger.Panic().Err(err).Msg("failed to initialize webrtc API") manager.logger.Panic().Err(err).Msg("failed to initialize webrtc API")
} }
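For context, the sample-forwarding pattern used for both tracks above, written out as a standalone sketch; the codec capability and the dummy sample are illustrative, while NewTrackLocalStaticSample and WriteSample are the pion calls used in this hunk.

package main

import (
	"errors"
	"fmt"
	"io"
	"time"

	"github.com/pion/webrtc/v3"
	"github.com/pion/webrtc/v3/pkg/media"
)

func main() {
	// Capability values here are illustrative; in Start() they come from
	// manager.capture.Video().Codec().Capability.
	capability := webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8, ClockRate: 90000}

	// One static-sample track per kind, shared by every peer connection.
	track, err := webrtc.NewTrackLocalStaticSample(capability, "video", "stream")
	if err != nil {
		panic(err)
	}

	sample := media.Sample{Data: []byte{0x00}, Duration: time.Second / 25}

	// pion can return io.ErrClosedPipe when the track has no live binding
	// (for example after a peer goes away); the handlers above treat that
	// as expected and only warn on other errors.
	if err := track.WriteSample(sample); err != nil && !errors.Is(err, io.ErrClosedPipe) {
		fmt.Println("failed to write sample:", err)
	}
}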
@ -141,8 +157,18 @@ func (manager *WebRTCManager) initAPI() error {
// Create MediaEngine with selected codecs // Create MediaEngine with selected codecs
engine := webrtc.MediaEngine{} engine := webrtc.MediaEngine{}
_ = engine.RegisterCodec(manager.audioCodec, webrtc.RTPCodecTypeAudio)
_ = engine.RegisterCodec(manager.videoCodec, webrtc.RTPCodecTypeVideo) audioCodec := manager.capture.Audio().Codec()
_ = engine.RegisterCodec(webrtc.RTPCodecParameters{
RTPCodecCapability: audioCodec.Capability,
PayloadType: audioCodec.PayloadType,
}, audioCodec.Type)
videoCodec := manager.capture.Video().Codec()
_ = engine.RegisterCodec(webrtc.RTPCodecParameters{
RTPCodecCapability: videoCodec.Capability,
PayloadType: videoCodec.PayloadType,
}, videoCodec.Type)
// Register Interceptors // Register Interceptors
i := &interceptor.Registry{} i := &interceptor.Registry{}
@ -305,43 +331,3 @@ func (manager *WebRTCManager) ICEServers() []webrtc.ICEServer {
func (manager *WebRTCManager) ImplicitControl() bool { func (manager *WebRTCManager) ImplicitControl() bool {
return manager.config.ImplicitControl return manager.config.ImplicitControl
} }
func (manager *WebRTCManager) createTrack(codecName string) (*webrtc.TrackLocalStaticSample, webrtc.RTPCodecParameters, error) {
var codec webrtc.RTPCodecParameters
id := ""
fb := []webrtc.RTCPFeedback{}
switch codecName {
case "VP8":
codec = webrtc.RTPCodecParameters{RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8, ClockRate: 90000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: fb}, PayloadType: 96}
id = "video"
case "VP9":
codec = webrtc.RTPCodecParameters{RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP9, ClockRate: 90000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: fb}, PayloadType: 98}
id = "video"
case "H264":
codec = webrtc.RTPCodecParameters{RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264, ClockRate: 90000, Channels: 0, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f", RTCPFeedback: fb}, PayloadType: 102}
id = "video"
case "Opus":
codec = webrtc.RTPCodecParameters{RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus, ClockRate: 48000, Channels: 2, SDPFmtpLine: "useinbandfec=1", RTCPFeedback: fb}, PayloadType: 111}
id = "audio"
case "G722":
codec = webrtc.RTPCodecParameters{RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeG722, ClockRate: 8000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: fb}, PayloadType: 9}
id = "audio"
case "PCMU":
codec = webrtc.RTPCodecParameters{RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypePCMU, ClockRate: 8000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: fb}, PayloadType: 0}
id = "audio"
case "PCMA":
codec = webrtc.RTPCodecParameters{RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypePCMA, ClockRate: 8000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: fb}, PayloadType: 8}
id = "audio"
default:
return nil, codec, fmt.Errorf("unknown codec %s", codecName)
}
track, err := webrtc.NewTrackLocalStaticSample(codec.RTPCodecCapability, id, "stream")
if err != nil {
return nil, codec, err
}
return track, codec, nil
}

View File

@ -7,18 +7,37 @@ import (
) )
func (h *MessageHandler) boradcastCreate(session types.Session, payload *message.BroadcastCreate) error { func (h *MessageHandler) boradcastCreate(session types.Session, payload *message.BroadcastCreate) error {
broadcast := h.capture.Broadcast()
if !session.Admin() { if !session.Admin() {
h.logger.Debug().Msg("user not admin") h.logger.Debug().Msg("user not admin")
return nil return nil
} }
pipelineErr := h.broadcast.Create(payload.URL) if payload.URL == "" {
if pipelineErr != nil { return session.Send(
message.SystemMessage{
Event: event.SYSTEM_ERROR,
Title: "Error while starting broadcast",
Message: "missing broadcast URL",
})
}
if broadcast.Started() {
return session.Send(
message.SystemMessage{
Event: event.SYSTEM_ERROR,
Title: "Error while starting broadcast",
Message: "server is already broadcasting",
})
}
if err := broadcast.Start(payload.URL); err != nil {
if err := session.Send( if err := session.Send(
message.SystemMessage{ message.SystemMessage{
Event: event.SYSTEM_ERROR, Event: event.SYSTEM_ERROR,
Title: "Error while starting broadcast", Title: "Error while starting broadcast",
Message: pipelineErr.Error(), Message: err.Error(),
}); err != nil { }); err != nil {
h.logger.Warn().Err(err).Msgf("sending event %s has failed", event.SYSTEM_ERROR) h.logger.Warn().Err(err).Msgf("sending event %s has failed", event.SYSTEM_ERROR)
return err return err
@ -33,12 +52,23 @@ func (h *MessageHandler) boradcastCreate(session types.Session, payload *message
} }
func (h *MessageHandler) boradcastDestroy(session types.Session) error { func (h *MessageHandler) boradcastDestroy(session types.Session) error {
broadcast := h.capture.Broadcast()
if !session.Admin() { if !session.Admin() {
h.logger.Debug().Msg("user not admin") h.logger.Debug().Msg("user not admin")
return nil return nil
} }
h.broadcast.Destroy() if !broadcast.Started() {
return session.Send(
message.SystemMessage{
Event: event.SYSTEM_ERROR,
Title: "Error while stopping broadcast",
Message: "server is not broadcasting",
})
}
broadcast.Stop()
if err := h.boradcastStatus(nil); err != nil { if err := h.boradcastStatus(nil); err != nil {
return err return err
@ -48,14 +78,17 @@ func (h *MessageHandler) boradcastDestroy(session types.Session) error {
} }
func (h *MessageHandler) boradcastStatus(session types.Session) error { func (h *MessageHandler) boradcastStatus(session types.Session) error {
broadcast := h.capture.Broadcast()
msg := message.BroadcastStatus{
Event: event.BORADCAST_STATUS,
IsActive: broadcast.Started(),
URL: broadcast.Url(),
}
// if no session, broadcast change // if no session, broadcast change
if session == nil { if session == nil {
if err := h.sessions.AdminBroadcast( if err := h.sessions.AdminBroadcast(msg, nil); err != nil {
message.BroadcastStatus{
Event: event.BORADCAST_STATUS,
IsActive: h.broadcast.IsActive(),
URL: h.broadcast.GetUrl(),
}, nil); err != nil {
h.logger.Warn().Err(err).Msgf("broadcasting event %s has failed", event.BORADCAST_STATUS) h.logger.Warn().Err(err).Msgf("broadcasting event %s has failed", event.BORADCAST_STATUS)
return err return err
} }
@ -68,12 +101,7 @@ func (h *MessageHandler) boradcastStatus(session types.Session) error {
return nil return nil
} }
if err := session.Send( if err := session.Send(msg); err != nil {
message.BroadcastStatus{
Event: event.BORADCAST_STATUS,
IsActive: h.broadcast.IsActive(),
URL: h.broadcast.GetUrl(),
}); err != nil {
h.logger.Warn().Err(err).Msgf("sending event %s has failed", event.BORADCAST_STATUS) h.logger.Warn().Err(err).Msgf("sending event %s has failed", event.BORADCAST_STATUS)
return err return err
} }
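The three handlers above rely on only four methods of the sub-manager returned by h.capture.Broadcast(). A sketch of that contract and of the create → status → destroy flow; the interface name and the in-memory stand-in are illustrative, not types from the commit.

package main

import (
	"errors"
	"fmt"
)

// BroadcastController mirrors the methods the websocket handlers call;
// the real manager lives in internal/capture (this name is illustrative).
type BroadcastController interface {
	Start(url string) error
	Stop()
	Started() bool
	Url() string
}

type fakeBroadcast struct {
	url     string
	started bool
}

func (b *fakeBroadcast) Start(url string) error {
	if url == "" {
		return errors.New("missing broadcast URL")
	}
	b.url, b.started = url, true
	return nil
}

func (b *fakeBroadcast) Stop()         { b.started = false }
func (b *fakeBroadcast) Started() bool { return b.started }
func (b *fakeBroadcast) Url() string   { return b.url }

func main() {
	var broadcast BroadcastController = &fakeBroadcast{}

	// boradcastCreate: reject empty URLs and double starts, then Start.
	if !broadcast.Started() {
		if err := broadcast.Start("rtmp://example.invalid/live"); err != nil {
			fmt.Println("start failed:", err)
			return
		}
	}

	// boradcastStatus: the status event carries only these two fields.
	fmt.Printf("is_active=%v url=%s\n", broadcast.Started(), broadcast.Url())

	// boradcastDestroy: stop only if currently broadcasting.
	if broadcast.Started() {
		broadcast.Stop()
	}
}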

View File

@ -15,13 +15,12 @@ import (
) )
type MessageHandler struct { type MessageHandler struct {
logger zerolog.Logger logger zerolog.Logger
sessions types.SessionManager sessions types.SessionManager
desktop types.DesktopManager desktop types.DesktopManager
capture types.CaptureManager capture types.CaptureManager
webrtc types.WebRTCManager webrtc types.WebRTCManager
broadcast types.BroadcastManager state *state.State
state *state.State
} }
func New( func New(
@ -29,17 +28,15 @@ func New(
desktop types.DesktopManager, desktop types.DesktopManager,
capture types.CaptureManager, capture types.CaptureManager,
webrtc types.WebRTCManager, webrtc types.WebRTCManager,
broadcast types.BroadcastManager,
state *state.State, state *state.State,
) *MessageHandler { ) *MessageHandler {
return &MessageHandler{ return &MessageHandler{
logger: log.With().Str("module", "websocket").Str("submodule", "handler").Logger(), logger: log.With().Str("module", "websocket").Str("submodule", "handler").Logger(),
sessions: sessions, sessions: sessions,
desktop: desktop, desktop: desktop,
capture: capture, capture: capture,
webrtc: webrtc, webrtc: webrtc,
broadcast: broadcast, state: state,
state: state,
} }
} }

View File

@ -12,7 +12,11 @@ func (h *MessageHandler) screenSet(id string, session types.Session, payload *me
return nil return nil
} }
if err := h.capture.ChangeResolution(payload.Width, payload.Height, payload.Rate); err != nil { if err := h.desktop.SetScreenSize(types.ScreenSize{
Width: payload.Width,
Height: payload.Height,
Rate: payload.Rate,
}); err != nil {
h.logger.Warn().Err(err).Msgf("unable to change screen size") h.logger.Warn().Err(err).Msgf("unable to change screen size")
return err return err
} }
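screenSet now builds a types.ScreenSize and hands it straight to the desktop manager; the capture side's reaction to the new resolution is wired elsewhere in this commit and not visible in this hunk. A small sketch of that shape, with field types inferred from the removed ChangeResolution(width int, height int, rate int16) signature; the setter is a stand-in for DesktopManager.SetScreenSize.

package main

import "fmt"

// ScreenSize mirrors the struct literal used in screenSet above.
type ScreenSize struct {
	Width  int
	Height int
	Rate   int16
}

// setScreenSize stands in for DesktopManager.SetScreenSize.
func setScreenSize(size ScreenSize) error {
	fmt.Printf("setting screen to %dx%d@%d\n", size.Width, size.Height, size.Rate)
	return nil
}

func main() {
	// Width, Height and Rate come from the websocket payload in the handler.
	if err := setScreenSize(ScreenSize{Width: 1280, Height: 720, Rate: 30}); err != nil {
		fmt.Println("unable to change screen size:", err)
	}
}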

View File

@ -22,7 +22,7 @@ import (
const CONTROL_PROTECTION_SESSION = "by_control_protection" const CONTROL_PROTECTION_SESSION = "by_control_protection"
func New(sessions types.SessionManager, desktop types.DesktopManager, capture types.CaptureManager, broadcast types.BroadcastManager, webrtc types.WebRTCManager, conf *config.WebSocket) *WebSocketHandler { func New(sessions types.SessionManager, desktop types.DesktopManager, capture types.CaptureManager, webrtc types.WebRTCManager, conf *config.WebSocket) *WebSocketHandler {
logger := log.With().Str("module", "websocket").Logger() logger := log.With().Str("module", "websocket").Logger()
state := state.New() state := state.New()
@ -47,7 +47,6 @@ func New(sessions types.SessionManager, desktop types.DesktopManager, capture ty
desktop, desktop,
capture, capture,
webrtc, webrtc,
broadcast,
state, state,
) )

View File

@ -63,7 +63,6 @@ func init() {
Server: &config.Server{}, Server: &config.Server{},
Capture: &config.Capture{}, Capture: &config.Capture{},
Desktop: &config.Desktop{}, Desktop: &config.Desktop{},
Broadcast: &config.Broadcast{},
WebRTC: &config.WebRTC{}, WebRTC: &config.WebRTC{},
WebSocket: &config.WebSocket{}, WebSocket: &config.WebSocket{},
} }
@ -103,7 +102,6 @@ type Neko struct {
Root *config.Root Root *config.Root
Capture *config.Capture Capture *config.Capture
Desktop *config.Desktop Desktop *config.Desktop
Broadcast *config.Broadcast
Server *config.Server Server *config.Server
WebRTC *config.WebRTC WebRTC *config.WebRTC
WebSocket *config.WebSocket WebSocket *config.WebSocket
@ -113,7 +111,6 @@ type Neko struct {
sessionManager *session.SessionManager sessionManager *session.SessionManager
captureManager *capture.CaptureManagerCtx captureManager *capture.CaptureManagerCtx
desktopManager *desktop.DesktopManagerCtx desktopManager *desktop.DesktopManagerCtx
broadcastManager *capture.BroadcastManager
webRTCManager *webrtc.WebRTCManager webRTCManager *webrtc.WebRTCManager
webSocketHandler *websocket.WebSocketHandler webSocketHandler *websocket.WebSocketHandler
} }
@ -123,12 +120,10 @@ func (neko *Neko) Preflight() {
} }
func (neko *Neko) Start() { func (neko *Neko) Start() {
broadcastManager := capture.NewBroadcast(neko.Capture, neko.Broadcast) desktopManager := desktop.New(neko.Desktop)
desktopManager := desktop.New(neko.Desktop, broadcastManager)
desktopManager.Start() desktopManager.Start()
captureManager := capture.New(desktopManager, broadcastManager, neko.Capture) captureManager := capture.New(desktopManager, neko.Capture)
captureManager.Start() captureManager.Start()
sessionManager := session.New(captureManager) sessionManager := session.New(captureManager)
@ -136,13 +131,12 @@ func (neko *Neko) Start() {
webRTCManager := webrtc.New(sessionManager, captureManager, desktopManager, neko.WebRTC) webRTCManager := webrtc.New(sessionManager, captureManager, desktopManager, neko.WebRTC)
webRTCManager.Start() webRTCManager.Start()
webSocketHandler := websocket.New(sessionManager, desktopManager, captureManager, broadcastManager, webRTCManager, neko.WebSocket) webSocketHandler := websocket.New(sessionManager, desktopManager, captureManager, webRTCManager, neko.WebSocket)
webSocketHandler.Start() webSocketHandler.Start()
server := http.New(neko.Server, webSocketHandler) server := http.New(neko.Server, webSocketHandler)
server.Start() server.Start()
neko.broadcastManager = broadcastManager
neko.sessionManager = sessionManager neko.sessionManager = sessionManager
neko.captureManager = captureManager neko.captureManager = captureManager
neko.desktopManager = desktopManager neko.desktopManager = desktopManager
@ -168,9 +162,6 @@ func (neko *Neko) Shutdown() {
err = neko.desktopManager.Shutdown() err = neko.desktopManager.Shutdown()
neko.logger.Err(err).Msg("desktop manager shutdown") neko.logger.Err(err).Msg("desktop manager shutdown")
err = neko.broadcastManager.Shutdown()
neko.logger.Err(err).Msg("broadcast manager shutdown")
} }
func (neko *Neko) ServeCommand(cmd *cobra.Command, args []string) { func (neko *Neko) ServeCommand(cmd *cobra.Command, args []string) {