This commit is contained in:
Miroslav Šedivý 2023-01-28 22:08:36 +01:00
parent dfe8b8b57d
commit ee13e40d4c
16 changed files with 135 additions and 103 deletions

View File

@ -177,9 +177,9 @@ func goHandlePipelineBuffer(buffer unsafe.Pointer, bufferLen C.int, duration C.i
if ok {
pipeline.Sample <- types.Sample{
Data: C.GoBytes(buffer, bufferLen),
Data: C.GoBytes(buffer, bufferLen),
Timestamp: time.Now(),
Duration: time.Duration(duration),
Duration: time.Duration(duration),
}
} else {
log.Warn().

View File

@ -18,7 +18,6 @@ type CaptureManagerCtx struct {
broadcast *BroacastManagerCtx
audio *StreamSinkManagerCtx
video *StreamSinkManagerCtx
}
func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCtx {
@ -53,7 +52,8 @@ func (manager *CaptureManagerCtx) Start() {
go func() {
for {
_ = <- manager.desktop.GetBeforeScreenSizeChangeChannel()
<-manager.desktop.GetBeforeScreenSizeChangeChannel()
if manager.video.Started() {
manager.video.destroyPipeline()
}
@ -66,7 +66,8 @@ func (manager *CaptureManagerCtx) Start() {
go func() {
for {
framerate := <- manager.desktop.GetAfterScreenSizeChangeChannel();
framerate := <-manager.desktop.GetAfterScreenSizeChangeChannel()
if manager.video.Started() {
manager.video.SetChangeFramerate(framerate)
err := manager.video.createPipeline()

View File

@ -107,26 +107,27 @@ func NewVideoPipeline(rtpCodec codec.RTPCodec, display string, pipelineSrc strin
pipelineStr = fmt.Sprintf(videoSrc+"vp9enc target-bitrate=%d cpu-used=-5 threads=4 deadline=1 keyframe-max-dist=30 auto-alt-ref=true"+pipelineStr, display, fps, bitrate*1000)
case codec.AV1().Name:
// https://gstreamer.freedesktop.org/documentation/aom/av1enc.html?gi-language=c
// gstreamer1.0-plugins-bad
// av1enc usage-profile=1
if err := gst.CheckPlugins([]string{"ximagesrc", "vpx"}); err != nil {
return "", err
}
// https://gstreamer.freedesktop.org/documentation/aom/av1enc.html?gi-language=c
// gstreamer1.0-plugins-bad
// av1enc usage-profile=1
// TODO: check for plugin.
if err := gst.CheckPlugins([]string{"ximagesrc", "vpx"}); err != nil {
return "", err
}
pipelineStr = strings.Join([]string{
fmt.Sprintf(videoSrc, display, fps),
"av1enc",
fmt.Sprintf("target-bitrate=%d", bitrate*650),
"cpu-used=4",
"end-usage=cbr",
// "usage-profile=realtime",
"undershoot=95",
"keyframe-max-dist=25",
"min-quantizer=4",
"max-quantizer=20",
pipelineStr,
}, " ")
pipelineStr = strings.Join([]string{
fmt.Sprintf(videoSrc, display, fps),
"av1enc",
fmt.Sprintf("target-bitrate=%d", bitrate*650),
"cpu-used=4",
"end-usage=cbr",
// "usage-profile=realtime",
"undershoot=95",
"keyframe-max-dist=25",
"min-quantizer=4",
"max-quantizer=20",
pipelineStr,
}, " ")
case codec.H264().Name:
if err := gst.CheckPlugins([]string{"ximagesrc"}); err != nil {
return "", err

View File

@ -2,9 +2,9 @@ package capture
import (
"errors"
"sync"
"regexp"
"strconv"
"sync"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
@ -143,7 +143,7 @@ func (manager *StreamSinkManagerCtx) createPipeline() error {
if manager.changeFramerate > 0 && manager.adaptiveFramerate {
m1 := regexp.MustCompile(`framerate=\d+/1`)
pipelineStr = m1.ReplaceAllString(pipelineStr, "framerate=" + strconv.FormatInt(int64(manager.changeFramerate), 10) + "/1")
pipelineStr = m1.ReplaceAllString(pipelineStr, "framerate="+strconv.FormatInt(int64(manager.changeFramerate), 10)+"/1")
}
manager.logger.Info().
@ -180,7 +180,7 @@ func (manager *StreamSinkManagerCtx) destroyPipeline() {
manager.pipeline = nil
}
func (manager *StreamSinkManagerCtx) GetSampleChannel() (chan types.Sample) {
func (manager *StreamSinkManagerCtx) GetSampleChannel() chan types.Sample {
if manager.pipeline != nil {
return manager.pipeline.Sample
}

View File

@ -11,12 +11,12 @@ import (
type Capture struct {
// video
Display string
VideoCodec codec.RTPCodec
VideoHWEnc string // TODO: Pipeline builder.
VideoBitrate uint // TODO: Pipeline builder.
VideoMaxFPS int16 // TODO: Pipeline builder.
VideoPipeline string
Display string
VideoCodec codec.RTPCodec
VideoHWEnc string // TODO: Pipeline builder.
VideoBitrate uint // TODO: Pipeline builder.
VideoMaxFPS int16 // TODO: Pipeline builder.
VideoPipeline string
VideoAdaptiveFramerate bool
// audio
@ -60,7 +60,7 @@ func (Capture) Init(cmd *cobra.Command) error {
// DEPRECATED: video codec
cmd.PersistentFlags().Bool("av1", false, "DEPRECATED: use video_codec")
if err := viper.BindPFlag("av1", cmd.PersistentFlags().Lookup("av1")); err != nil {
return err
return err
}
// DEPRECATED: video codec

View File

@ -16,21 +16,21 @@ import (
var mu = sync.Mutex{}
type DesktopManagerCtx struct {
logger zerolog.Logger
wg sync.WaitGroup
shutdown chan struct{}
beforeScreenSizeChangeChannel chan bool
afterScreenSizeChangeChannel chan int16
config *config.Desktop
logger zerolog.Logger
wg sync.WaitGroup
shutdown chan struct{}
beforeScreenSizeChangeChannel chan bool
afterScreenSizeChangeChannel chan int16
config *config.Desktop
}
func New(config *config.Desktop) *DesktopManagerCtx {
return &DesktopManagerCtx{
logger: log.With().Str("module", "desktop").Logger(),
shutdown: make(chan struct{}),
beforeScreenSizeChangeChannel: make (chan bool),
afterScreenSizeChangeChannel: make (chan int16),
config: config,
logger: log.With().Str("module", "desktop").Logger(),
shutdown: make(chan struct{}),
beforeScreenSizeChangeChannel: make(chan bool),
afterScreenSizeChangeChannel: make(chan int16),
config: config,
}
}
@ -50,13 +50,13 @@ func (manager *DesktopManagerCtx) Start() {
go func() {
for {
desktopErrorMessage := <- xevent.EventErrorChannel
msg := <-xevent.EventErrorChannel
manager.logger.Warn().
Uint8("error_code", desktopErrorMessage.Error_code).
Str("message", desktopErrorMessage.Message).
Uint8("request_code", desktopErrorMessage.Request_code).
Uint8("minor_code", desktopErrorMessage.Minor_code).
Msg("X event error occurred")
Uint8("error_code", msg.Error_code).
Str("message", msg.Message).
Uint8("request_code", msg.Request_code).
Uint8("minor_code", msg.Minor_code).
Msg("X event error occurred")
}
}()
@ -79,11 +79,11 @@ func (manager *DesktopManagerCtx) Start() {
}()
}
func (manager *DesktopManagerCtx) GetBeforeScreenSizeChangeChannel() (chan bool) {
func (manager *DesktopManagerCtx) GetBeforeScreenSizeChangeChannel() chan bool {
return manager.beforeScreenSizeChangeChannel
}
func (manager *DesktopManagerCtx) GetAfterScreenSizeChangeChannel() (chan int16) {
func (manager *DesktopManagerCtx) GetAfterScreenSizeChangeChannel() chan int16 {
return manager.afterScreenSizeChangeChannel
}

View File

@ -5,22 +5,22 @@ import (
"m1k1o/neko/internal/types"
)
func (manager *DesktopManagerCtx) GetCursorChangedChannel() (chan uint64) {
func (manager *DesktopManagerCtx) GetCursorChangedChannel() chan uint64 {
return xevent.CursorChangedChannel
}
func (manager *DesktopManagerCtx) GetClipboardUpdatedChannel() (chan bool) {
func (manager *DesktopManagerCtx) GetClipboardUpdatedChannel() chan bool {
return xevent.ClipboardUpdatedChannel
}
func (manager *DesktopManagerCtx) GetFileChooserDialogOpenedChannel() (chan bool) {
func (manager *DesktopManagerCtx) GetFileChooserDialogOpenedChannel() chan bool {
return xevent.FileChooserDialogOpenedChannel
}
func (manager *DesktopManagerCtx) GetFileChooserDialogClosedChannel() (chan bool) {
func (manager *DesktopManagerCtx) GetFileChooserDialogClosedChannel() chan bool {
return xevent.FileChooserDialogClosedChannel
}
func (manager *DesktopManagerCtx) GetEventErrorChannel() (chan types.DesktopErrorMessage) {
func (manager *DesktopManagerCtx) GetEventErrorChannel() chan types.DesktopErrorMessage {
return xevent.EventErrorChannel
}

View File

@ -9,6 +9,7 @@ import "C"
import (
"unsafe"
"m1k1o/neko/internal/types"
)
@ -25,20 +26,22 @@ func init() {
FileChooserDialogOpenedChannel = make(chan bool)
EventErrorChannel = make(chan types.DesktopErrorMessage)
// Dummy goroutines since there is no consumer for the channel otherwise
go func() {
for {
_ = <-CursorChangedChannel
// TODO: Unused.
<-CursorChangedChannel
}
}()
go func() {
for {
_ = <-FileChooserDialogClosedChannel
// TODO: Unused.
<-FileChooserDialogClosedChannel
}
}()
go func() {
for {
_ = <-FileChooserDialogOpenedChannel
// TODO: Unused.
<-FileChooserDialogOpenedChannel
}
}()
}
@ -72,7 +75,12 @@ func goXEventUnmapNotify(window C.Window) {
//export goXEventError
func goXEventError(event *C.XErrorEvent, message *C.char) {
EventErrorChannel <- types.DesktopErrorMessage{ uint8(event.error_code), C.GoString(message), uint8(event.request_code), uint8(event.minor_code) }
EventErrorChannel <- types.DesktopErrorMessage{
Error_code: uint8(event.error_code),
Message: C.GoString(message),
Request_code: uint8(event.request_code),
Minor_code: uint8(event.minor_code),
}
}
//export goXEventActive

View File

@ -13,25 +13,25 @@ import (
func New(capture types.CaptureManager) *SessionManager {
return &SessionManager{
logger: log.With().Str("module", "session").Logger(),
host: "",
capture: capture,
logger: log.With().Str("module", "session").Logger(),
host: "",
capture: capture,
sessionChannel: make(chan types.SessionInformation, 10),
hostChannel: make(chan types.HostInformation, 10),
members: make(map[string]*Session),
hostChannel: make(chan types.HostInformation, 10),
members: make(map[string]*Session),
}
}
type SessionManager struct {
mu sync.Mutex
logger zerolog.Logger
host string
capture types.CaptureManager
members map[string]*Session
sessionChannel chan types.SessionInformation
hostChannel chan types.HostInformation
mu sync.Mutex
logger zerolog.Logger
host string
capture types.CaptureManager
members map[string]*Session
sessionChannel chan types.SessionInformation
hostChannel chan types.HostInformation
// TODO: Handle locks in sessions as flags.
controlLocked bool
controlLocked bool
}
func (manager *SessionManager) New(id string, admin bool, socket types.WebSocket) types.Session {
@ -50,11 +50,16 @@ func (manager *SessionManager) New(id string, admin bool, socket types.WebSocket
manager.capture.Video().AddListener()
manager.mu.Unlock()
manager.sessionChannel <- types.SessionInformation{ "created", id, session }
manager.sessionChannel <- types.SessionInformation{
Type: "created",
Id: id,
Session: session,
}
go func() {
for {
_ = <- manager.hostChannel
// TODO: Unused.
<-manager.hostChannel
}
}()
return session
@ -76,7 +81,10 @@ func (manager *SessionManager) SetHost(id string) error {
if ok {
manager.host = id
manager.hostChannel <- types.HostInformation{ "host", id }
manager.hostChannel <- types.HostInformation{
Type: "host",
Id: id,
}
return nil
}
@ -94,7 +102,10 @@ func (manager *SessionManager) GetHost() (types.Session, bool) {
func (manager *SessionManager) ClearHost() {
id := manager.host
manager.host = ""
manager.hostChannel <- types.HostInformation{ "host_cleared", id }
manager.hostChannel <- types.HostInformation{
Type: "host_cleared",
Id: id,
}
}
func (manager *SessionManager) Has(id string) bool {
@ -171,7 +182,11 @@ func (manager *SessionManager) Destroy(id string) {
manager.capture.Video().RemoveListener()
manager.mu.Unlock()
manager.sessionChannel <- types.SessionInformation{ "destroyed", id, session }
manager.sessionChannel <- types.SessionInformation{
Type: "destroyed",
Id: id,
Session: session,
}
manager.logger.Err(err).Str("session_id", id).Msg("destroying session")
return
}
@ -229,10 +244,10 @@ func (manager *SessionManager) AdminBroadcast(v interface{}, exclude interface{}
return nil
}
func (manager *SessionManager) GetSessionChannel() (chan types.SessionInformation) {
func (manager *SessionManager) GetSessionChannel() chan types.SessionInformation {
return manager.sessionChannel
}
func (manager *SessionManager) GetHostChannel() (chan types.HostInformation) {
func (manager *SessionManager) GetHostChannel() chan types.HostInformation {
return manager.hostChannel
}

View File

@ -78,7 +78,11 @@ func (session *Session) SetPeer(peer types.Peer) error {
func (session *Session) SetConnected(connected bool) error {
session.connected = connected
if connected {
session.manager.sessionChannel <- types.SessionInformation{ "connected", session.id, session }
session.manager.sessionChannel <- types.SessionInformation{
Type: "connected",
Id: session.id,
Session: session,
}
}
return nil
}

View File

@ -25,7 +25,7 @@ type StreamSinkManager interface {
ListenersCount() int
Started() bool
GetSampleChannel() (chan Sample)
GetSampleChannel() chan Sample
SetChangeFramerate(rate int16)
SetAdaptiveFramerate(allow bool)
}

View File

@ -116,6 +116,7 @@ func H264() RTPCodec {
},
}
}
// TODO: Profile ID.
func AV1() RTPCodec {
return RTPCodec{

View File

@ -43,8 +43,8 @@ type DesktopErrorMessage struct {
type DesktopManager interface {
Start()
Shutdown() error
GetBeforeScreenSizeChangeChannel() (chan bool)
GetAfterScreenSizeChangeChannel() (chan int16)
GetBeforeScreenSizeChangeChannel() chan bool
GetAfterScreenSizeChangeChannel() chan int16
// clipboard
ReadClipboard() string
@ -72,9 +72,9 @@ type DesktopManager interface {
GetScreenshotImage() *image.RGBA
// xevent
GetCursorChangedChannel() (chan uint64)
GetClipboardUpdatedChannel() (chan bool)
GetFileChooserDialogOpenedChannel() (chan bool)
GetFileChooserDialogClosedChannel() (chan bool)
GetEventErrorChannel() (chan DesktopErrorMessage)
GetCursorChangedChannel() chan uint64
GetClipboardUpdatedChannel() chan bool
GetFileChooserDialogOpenedChannel() chan bool
GetFileChooserDialogClosedChannel() chan bool
GetEventErrorChannel() chan DesktopErrorMessage
}

View File

@ -8,9 +8,9 @@ type Member struct {
}
type SessionInformation struct {
Type string
Id string
Session Session
Type string
Id string
Session Session
}
type HostInformation struct {
@ -57,6 +57,6 @@ type SessionManager interface {
Clear() error
Broadcast(v interface{}, exclude interface{}) error
AdminBroadcast(v interface{}, exclude interface{}) error
GetSessionChannel() (chan SessionInformation)
GetHostChannel() (chan HostInformation)
GetSessionChannel() chan SessionInformation
GetHostChannel() chan HostInformation
}

View File

@ -62,7 +62,8 @@ func (manager *WebRTCManager) Start() {
time.Sleep(50 * time.Millisecond)
continue
}
newSample := <- manager.capture.Audio().GetSampleChannel()
newSample := <-manager.capture.Audio().GetSampleChannel()
err := manager.audioTrack.WriteSample(media.Sample(newSample))
if err != nil && errors.Is(err, io.ErrClosedPipe) {
manager.logger.Warn().Err(err).Msg("audio pipeline failed to write")
@ -87,7 +88,8 @@ func (manager *WebRTCManager) Start() {
time.Sleep(50 * time.Millisecond)
continue
}
newSample := <- manager.capture.Video().GetSampleChannel()
newSample := <-manager.capture.Video().GetSampleChannel()
err := manager.videoTrack.WriteSample(media.Sample(newSample))
if err != nil && errors.Is(err, io.ErrClosedPipe) {
manager.logger.Warn().Err(err).Msg("video pipeline failed to write")

View File

@ -101,9 +101,9 @@ type WebSocketHandler struct {
}
func (ws *WebSocketHandler) Start() {
go func () {
go func() {
for {
channelMessage := <- ws.sessions.GetSessionChannel()
channelMessage := <-ws.sessions.GetSessionChannel()
switch channelMessage.Type {
case "created":
@ -189,7 +189,7 @@ func (ws *WebSocketHandler) Start() {
go func() {
for {
_ = <- ws.desktop.GetClipboardUpdatedChannel()
_ = <-ws.desktop.GetClipboardUpdatedChannel()
session, ok := ws.sessions.GetHost()
if !ok {
return