merge from remote

This commit is contained in:
gbrian
2021-03-29 11:03:25 +00:00
parent a1fcf87345
commit 8efc5d7094
95 changed files with 5789 additions and 874 deletions

View File

@ -3,6 +3,7 @@ package broadcast
import (
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"n.eko.moe/neko/internal/gst"
"n.eko.moe/neko/internal/types/config"
)
@ -10,34 +11,73 @@ import (
type BroadcastManager struct {
logger zerolog.Logger
pipeline *gst.Pipeline
remote *config.Remote
config *config.Broadcast
enabled bool
url string
}
func New(config *config.Broadcast) *BroadcastManager {
func New(remote *config.Remote, config *config.Broadcast) *BroadcastManager {
return &BroadcastManager{
logger: log.With().Str("module", "remote").Logger(),
config: config,
logger: log.With().Str("module", "remote").Logger(),
remote: remote,
config: config,
enabled: false,
url: "",
}
}
func (manager *BroadcastManager) Start() {
if !manager.enabled || manager.IsActive() {
return
}
var err error
manager.pipeline, err = gst.CreateRTMPPipeline(
manager.config.Device,
manager.config.Display,
manager.config.RTMP,
manager.remote.Device,
manager.remote.Display,
manager.config.Pipeline,
manager.url,
)
manager.logger.Info().
Str("audio_device", manager.remote.Device).
Str("video_display", manager.remote.Display).
Str("rtmp_pipeline_src", manager.pipeline.Src).
Msgf("RTMP pipeline is starting...")
if err != nil {
manager.logger.Panic().Err(err).Msg("unable to create rtmp pipeline")
return
}
manager.pipeline.Start()
manager.pipeline.Play()
}
func (manager *BroadcastManager) Shutdown() error {
if manager.pipeline != nil {
manager.pipeline.Stop()
func (manager *BroadcastManager) Stop() {
if !manager.IsActive() {
return
}
return nil
manager.pipeline.Stop()
manager.pipeline = nil
}
func (manager *BroadcastManager) IsActive() bool {
return manager.pipeline != nil
}
func (manager *BroadcastManager) Create(url string) {
manager.url = url
manager.enabled = true
manager.Start()
}
func (manager *BroadcastManager) Destroy() {
manager.Stop()
manager.enabled = false
}
func (manager *BroadcastManager) GetUrl() string {
return manager.url
}

View File

@ -84,6 +84,10 @@ void gstreamer_send_start_pipeline(GstElement *pipeline, int pipelineId) {
gst_element_set_state(pipeline, GST_STATE_PLAYING);
}
void gstreamer_send_play_pipeline(GstElement *pipeline) {
gst_element_set_state(pipeline, GST_STATE_PLAYING);
}
void gstreamer_send_stop_pipeline(GstElement *pipeline) {
gst_element_set_state(pipeline, GST_STATE_NULL);
}

View File

@ -10,10 +10,9 @@ import "C"
import (
"fmt"
"sync"
"time"
"unsafe"
"github.com/pion/webrtc/v2"
"n.eko.moe/neko/internal/types"
)
@ -42,7 +41,6 @@ type Pipeline struct {
Pipeline *C.GstElement
Sample chan types.Sample
CodecName string
ClockRate float32
Src string
id int
}
@ -52,11 +50,8 @@ var pipelinesLock sync.Mutex
var registry *C.GstRegistry
const (
videoClockRate = 90000
audioClockRate = 48000
pcmClockRate = 8000
videoSrc = "ximagesrc xid=%s show-pointer=true use-damage=false ! video/x-raw ! videoconvert ! queue ! "
audioSrc = "pulsesrc device=%s ! audioconvert ! "
videoSrc = "ximagesrc display-name=%s show-pointer=true use-damage=false ! video/x-raw,framerate=%d/1 ! videoconvert ! queue ! "
audioSrc = "pulsesrc device=%s ! audio/x-raw,channels=2 ! audioconvert ! "
)
func init() {
@ -65,20 +60,26 @@ func init() {
}
// CreateRTMPPipeline creates a GStreamer Pipeline
func CreateRTMPPipeline(pipelineDevice string, pipelineDisplay string, pipelineRTMP string) (*Pipeline, error) {
video := fmt.Sprintf(videoSrc, pipelineDisplay)
func CreateRTMPPipeline(pipelineDevice string, pipelineDisplay string, pipelineSrc string, pipelineRTMP string) (*Pipeline, error) {
video := fmt.Sprintf(videoSrc, pipelineDisplay, 25)
audio := fmt.Sprintf(audioSrc, pipelineDevice)
return CreatePipeline(fmt.Sprintf("%s ! x264enc ! flv. ! %s ! faac ! flv. ! flvmux name='flv' ! rtmpsink location='%s'", video, audio, pipelineRTMP), "", 0)
var pipelineStr string
if pipelineSrc != "" {
pipelineStr = fmt.Sprintf(pipelineSrc, pipelineRTMP, pipelineDevice, pipelineDisplay)
} else {
pipelineStr = fmt.Sprintf("flvmux name=mux ! rtmpsink location='%s live=1' %s audio/x-raw,channels=2 ! audioconvert ! voaacenc ! mux. %s x264enc bframes=0 key-int-max=60 byte-stream=true tune=zerolatency speed-preset=veryfast ! mux.", pipelineRTMP, audio, video)
}
return CreatePipeline(pipelineStr, "")
}
// CreateAppPipeline creates a GStreamer Pipeline
func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc string) (*Pipeline, error) {
func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc string, fps int, bitrate uint) (*Pipeline, error) {
pipelineStr := " ! appsink name=appsink"
var clockRate float32
switch codecName {
case webrtc.VP8:
case "VP8":
// https://gstreamer.freedesktop.org/documentation/vpx/vp8enc.html?gi-language=c
// gstreamer1.0-plugins-good
// vp8enc error-resilient=partitions keyframe-max-dist=10 auto-alt-ref=true cpu-used=5 deadline=1
@ -86,14 +87,12 @@ func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc stri
return nil, err
}
clockRate = videoClockRate
if pipelineSrc != "" {
pipelineStr = fmt.Sprintf(pipelineSrc+pipelineStr, pipelineDevice)
} else {
pipelineStr = fmt.Sprintf(videoSrc+"vp8enc cpu-used=8 threads=2 deadline=1 error-resilient=partitions keyframe-max-dist=10 auto-alt-ref=true"+pipelineStr, pipelineDevice)
pipelineStr = fmt.Sprintf(videoSrc+"vp8enc target-bitrate=%d cpu-used=4 end-usage=cbr threads=4 deadline=1 undershoot=95 buffer-size=%d buffer-initial-size=%d buffer-optimal-size=%d keyframe-max-dist=180 min-quantizer=3 max-quantizer=40"+pipelineStr, pipelineDevice, fps, bitrate*1000, bitrate*6, bitrate*4, bitrate*5)
}
case webrtc.VP9:
case "VP9":
// https://gstreamer.freedesktop.org/documentation/vpx/vp9enc.html?gi-language=c
// gstreamer1.0-plugins-good
// vp9enc
@ -101,41 +100,43 @@ func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc stri
return nil, err
}
clockRate = videoClockRate
// Causes panic! not sure why...
if pipelineSrc != "" {
pipelineStr = fmt.Sprintf(pipelineSrc+pipelineStr, pipelineDevice)
} else {
pipelineStr = fmt.Sprintf(videoSrc+"vp9enc"+pipelineStr, pipelineDevice)
pipelineStr = fmt.Sprintf(videoSrc+"vp9enc target-bitrate=%d cpu-used=-5 threads=4 deadline=1 keyframe-max-dist=30 auto-alt-ref=true"+pipelineStr, pipelineDevice, fps, bitrate*1000)
}
case webrtc.H264:
// https://gstreamer.freedesktop.org/documentation/openh264/openh264enc.html?gi-language=c#openh264enc
// gstreamer1.0-plugins-bad
// openh264enc multi-thread=4 complexity=high bitrate=3072000 max-bitrate=4096000
case "H264":
if err := CheckPlugins([]string{"ximagesrc"}); err != nil {
return nil, err
}
clockRate = videoClockRate
if pipelineSrc != "" {
pipelineStr = fmt.Sprintf(pipelineSrc+pipelineStr, pipelineDevice)
} else {
pipelineStr = fmt.Sprintf(videoSrc+"openh264enc multi-thread=4 complexity=high bitrate=3072000 max-bitrate=4096000 ! video/x-h264,stream-format=byte-stream"+pipelineStr, pipelineDevice)
// https://gstreamer.freedesktop.org/documentation/x264/index.html?gi-language=c
// gstreamer1.0-plugins-ugly
// video/x-raw,format=I420 ! x264enc bframes=0 key-int-max=60 byte-stream=true tune=zerolatency speed-preset=veryfast ! video/x-h264,stream-format=byte-stream
if err := CheckPlugins([]string{"openh264"}); err != nil {
pipelineStr = fmt.Sprintf(videoSrc+"video/x-raw,format=I420 ! x264enc bframes=0 key-int-max=60 byte-stream=true tune=zerolatency speed-preset=veryfast ! video/x-h264,stream-format=byte-stream"+pipelineStr, pipelineDevice)
if err := CheckPlugins([]string{"x264"}); err != nil {
return nil, err
}
}
break
}
case webrtc.Opus:
// https://gstreamer.freedesktop.org/documentation/openh264/openh264enc.html?gi-language=c#openh264enc
// gstreamer1.0-plugins-bad
// openh264enc multi-thread=4 complexity=high bitrate=3072000 max-bitrate=4096000
if err := CheckPlugins([]string{"openh264"}); err == nil {
pipelineStr = fmt.Sprintf(videoSrc+"openh264enc multi-thread=4 complexity=high bitrate=%d max-bitrate=%d ! video/x-h264,stream-format=byte-stream"+pipelineStr, pipelineDevice, fps, bitrate*1000, (bitrate+1024)*1000)
break
}
// https://gstreamer.freedesktop.org/documentation/x264/index.html?gi-language=c
// gstreamer1.0-plugins-ugly
// video/x-raw,format=I420 ! x264enc bframes=0 key-int-max=60 byte-stream=true tune=zerolatency speed-preset=veryfast ! video/x-h264,stream-format=byte-stream
if err := CheckPlugins([]string{"x264"}); err != nil {
return nil, err
}
vbvbuf := uint(1000)
if bitrate > 1000 {
vbvbuf = bitrate
}
pipelineStr = fmt.Sprintf(videoSrc+"video/x-raw,format=NV12 ! x264enc threads=4 bitrate=%d key-int-max=60 vbv-buf-capacity=%d byte-stream=true tune=zerolatency speed-preset=veryfast ! video/x-h264,stream-format=byte-stream"+pipelineStr, pipelineDevice, fps, bitrate, vbvbuf)
case "Opus":
// https://gstreamer.freedesktop.org/documentation/opus/opusenc.html
// gstreamer1.0-plugins-base
// opusenc
@ -143,14 +144,12 @@ func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc stri
return nil, err
}
clockRate = audioClockRate
if pipelineSrc != "" {
pipelineStr = fmt.Sprintf(pipelineSrc+pipelineStr, pipelineDevice)
} else {
pipelineStr = fmt.Sprintf(audioSrc+"opusenc"+pipelineStr, pipelineDevice)
pipelineStr = fmt.Sprintf(audioSrc+"opusenc bitrate=%d"+pipelineStr, pipelineDevice, bitrate*1000)
}
case webrtc.G722:
case "G722":
// https://gstreamer.freedesktop.org/documentation/libav/avenc_g722.html?gi-language=c
// gstreamer1.0-libav
// avenc_g722
@ -158,14 +157,12 @@ func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc stri
return nil, err
}
clockRate = audioClockRate
if pipelineSrc != "" {
pipelineStr = fmt.Sprintf(pipelineSrc+pipelineStr, pipelineDevice)
} else {
pipelineStr = fmt.Sprintf(audioSrc+"avenc_g722"+pipelineStr, pipelineDevice)
pipelineStr = fmt.Sprintf(audioSrc+"avenc_g722 bitrate=%d"+pipelineStr, pipelineDevice, bitrate*1000)
}
case webrtc.PCMU:
case "PCMU":
// https://gstreamer.freedesktop.org/documentation/mulaw/mulawenc.html?gi-language=c
// gstreamer1.0-plugins-good
// audio/x-raw, rate=8000 ! mulawenc
@ -173,14 +170,12 @@ func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc stri
return nil, err
}
clockRate = pcmClockRate
if pipelineSrc != "" {
pipelineStr = fmt.Sprintf(pipelineSrc+pipelineStr, pipelineDevice)
} else {
pipelineStr = fmt.Sprintf(audioSrc+"audio/x-raw, rate=8000 ! mulawenc"+pipelineStr, pipelineDevice)
}
case webrtc.PCMA:
case "PCMA":
// https://gstreamer.freedesktop.org/documentation/alaw/alawenc.html?gi-language=c
// gstreamer1.0-plugins-good
// audio/x-raw, rate=8000 ! alawenc
@ -188,8 +183,6 @@ func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc stri
return nil, err
}
clockRate = pcmClockRate
if pipelineSrc != "" {
pipelineStr = fmt.Sprintf(pipelineSrc+pipelineStr, pipelineDevice)
} else {
@ -199,11 +192,11 @@ func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc stri
return nil, fmt.Errorf("unknown codec %s", codecName)
}
return CreatePipeline(pipelineStr, codecName, clockRate)
return CreatePipeline(pipelineStr, codecName)
}
// CreatePipeline creates a GStreamer Pipeline
func CreatePipeline(pipelineStr string, codecName string, clockRate float32) (*Pipeline, error) {
func CreatePipeline(pipelineStr string, codecName string) (*Pipeline, error) {
pipelineStrUnsafe := C.CString(pipelineStr)
defer C.free(unsafe.Pointer(pipelineStrUnsafe))
@ -214,7 +207,6 @@ func CreatePipeline(pipelineStr string, codecName string, clockRate float32) (*P
Pipeline: C.gstreamer_send_create_pipeline(pipelineStrUnsafe),
Sample: make(chan types.Sample),
CodecName: codecName,
ClockRate: clockRate,
Src: pipelineStr,
id: len(pipelines),
}
@ -228,6 +220,11 @@ func (p *Pipeline) Start() {
C.gstreamer_send_start_pipeline(p.Pipeline, C.int(p.id))
}
// Play starts the GStreamer Pipeline
func (p *Pipeline) Play() {
C.gstreamer_send_play_pipeline(p.Pipeline)
}
// Stop stops the GStreamer Pipeline
func (p *Pipeline) Stop() {
C.gstreamer_send_stop_pipeline(p.Pipeline)
@ -255,8 +252,7 @@ func goHandlePipelineBuffer(buffer unsafe.Pointer, bufferLen C.int, duration C.i
pipelinesLock.Unlock()
if ok {
samples := uint32(pipeline.ClockRate * (float32(duration) / 1000000000))
pipeline.Sample <- types.Sample{Data: C.GoBytes(buffer, bufferLen), Samples: samples}
pipeline.Sample <- types.Sample{Data: C.GoBytes(buffer, bufferLen), Timestamp: time.Now(), Duration: time.Duration(duration)}
} else {
fmt.Printf("discarding buffer, no pipeline with id %d", int(pipelineID))
}

View File

@ -11,6 +11,7 @@ extern void goHandlePipelineBuffer(void *buffer, int bufferLen, int samples, int
GstElement *gstreamer_send_create_pipeline(char *pipeline);
void gstreamer_send_start_pipeline(GstElement *pipeline, int pipelineId);
void gstreamer_send_play_pipeline(GstElement *pipeline);
void gstreamer_send_stop_pipeline(GstElement *pipeline);
void gstreamer_send_start_mainloop(void);
void gstreamer_init(void);

View File

@ -2,6 +2,7 @@ package http
import (
"context"
"encoding/json"
"fmt"
"net/http"
"os"
@ -10,7 +11,6 @@ import (
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"n.eko.moe/neko/internal/http/endpoint"
"n.eko.moe/neko/internal/http/middleware"
"n.eko.moe/neko/internal/types"
"n.eko.moe/neko/internal/types/config"
@ -24,7 +24,7 @@ type Server struct {
}
func New(conf *config.Server, webSocketHandler types.WebSocketHandler) *Server {
logger := log.With().Str("module", "webrtc").Logger()
logger := log.With().Str("module", "http").Logger()
router := chi.NewRouter()
// router.Use(middleware.Recoverer) // Recover from panics without crashing server
@ -35,21 +35,42 @@ func New(conf *config.Server, webSocketHandler types.WebSocketHandler) *Server {
webSocketHandler.Upgrade(w, r)
})
fs := http.FileServer(http.Dir(conf.Static))
router.Get("/*", func(w http.ResponseWriter, r *http.Request) {
if _, err := os.Stat(conf.Static + r.RequestURI); os.IsNotExist(err) {
http.StripPrefix(r.RequestURI, fs).ServeHTTP(w, r)
} else {
fs.ServeHTTP(w, r)
router.Get("/stats", func(w http.ResponseWriter, r *http.Request) {
password := r.URL.Query().Get("pwd")
isAdmin, err := webSocketHandler.IsAdmin(password)
if err != nil {
w.WriteHeader(http.StatusForbidden)
fmt.Fprint(w, err)
return
}
if !isAdmin {
w.WriteHeader(http.StatusUnauthorized)
fmt.Fprint(w, "bad authorization")
return
}
w.Header().Set("Content-Type", "application/json")
stats := webSocketHandler.Stats()
if err := json.NewEncoder(w).Encode(stats); err != nil {
logger.Warn().Err(err).Msg("failed writing json error response")
}
})
router.NotFound(endpoint.Handle(func(w http.ResponseWriter, r *http.Request) error {
return &endpoint.HandlerError{
Status: http.StatusNotFound,
Message: fmt.Sprintf("file '%s' is not found", r.RequestURI),
router.Get("/health", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("true"))
})
fs := http.FileServer(http.Dir(conf.Static))
router.Get("/*", func(w http.ResponseWriter, r *http.Request) {
if _, err := os.Stat(conf.Static + r.URL.Path); !os.IsNotExist(err) {
fs.ServeHTTP(w, r)
} else {
w.WriteHeader(http.StatusNotFound)
fmt.Fprint(w, "404 page not found")
}
}))
})
server := &http.Server{
Addr: conf.Bind,

View File

@ -18,19 +18,21 @@ type RemoteManager struct {
video *gst.Pipeline
audio *gst.Pipeline
config *config.Remote
broadcast types.BroadcastManager
cleanup *time.Ticker
shutdown chan bool
emmiter events.EventEmmiter
streaming bool
}
func New(config *config.Remote) *RemoteManager {
func New(config *config.Remote, broadcast types.BroadcastManager) *RemoteManager {
return &RemoteManager{
logger: log.With().Str("module", "remote").Logger(),
cleanup: time.NewTicker(1 * time.Second),
shutdown: make(chan bool),
emmiter: events.New(),
config: config,
broadcast: broadcast,
streaming: false,
}
}
@ -44,7 +46,16 @@ func (manager *RemoteManager) AudioCodec() string {
}
func (manager *RemoteManager) Start() {
xorg.Display(manager.config.Display)
if !xorg.ValidScreenSize(manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate) {
manager.logger.Warn().Msgf("invalid screen option %dx%d@%d", manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate)
} else if err := xorg.ChangeScreenSize(manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate); err != nil {
manager.logger.Warn().Err(err).Msg("unable to change screen size")
}
manager.createPipelines()
manager.broadcast.Start()
go func() {
defer func() {
@ -70,6 +81,8 @@ func (manager *RemoteManager) Shutdown() error {
manager.logger.Info().Msgf("remote shutting down")
manager.video.Stop()
manager.audio.Stop()
manager.broadcast.Stop()
manager.cleanup.Stop()
manager.shutdown <- true
return nil
@ -88,6 +101,8 @@ func (manager *RemoteManager) OnAudioFrame(listener func(sample types.Sample)) {
}
func (manager *RemoteManager) StartStream() {
manager.createPipelines()
manager.logger.Info().
Str("video_display", manager.config.Display).
Str("video_codec", manager.config.VideoCodec).
@ -98,15 +113,6 @@ func (manager *RemoteManager) StartStream() {
Str("screen_resolution", fmt.Sprintf("%dx%d@%d", manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate)).
Msgf("Pipelines starting...")
xorg.Display(manager.config.Display)
if !xorg.ValidScreenSize(manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate) {
manager.logger.Warn().Msgf("invalid screen option %dx%d@%d", manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate)
} else if err := xorg.ChangeScreenSize(manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate); err != nil {
manager.logger.Warn().Err(err).Msg("unable to change screen size")
}
manager.createPipelines()
manager.video.Start()
manager.audio.Start()
manager.streaming = true
@ -124,11 +130,19 @@ func (manager *RemoteManager) Streaming() bool {
}
func (manager *RemoteManager) createPipelines() {
// handle maximum fps
rate := manager.config.ScreenRate
if manager.config.MaxFPS != 0 && manager.config.MaxFPS < manager.config.ScreenRate {
rate = manager.config.MaxFPS
}
var err error
manager.video, err = gst.CreateAppPipeline(
manager.config.VideoCodec,
manager.config.Display,
manager.config.VideoParams,
rate,
manager.config.VideoBitrate,
)
if err != nil {
manager.logger.Panic().Err(err).Msg("unable to create video pipeline")
@ -138,9 +152,11 @@ func (manager *RemoteManager) createPipelines() {
manager.config.AudioCodec,
manager.config.Device,
manager.config.AudioParams,
0, // fps: n/a for audio
manager.config.AudioBitrate,
)
if err != nil {
manager.logger.Panic().Err(err).Msg("unable to screate audio pipeline")
manager.logger.Panic().Err(err).Msg("unable to create audio pipeline")
}
}
@ -150,8 +166,12 @@ func (manager *RemoteManager) ChangeResolution(width int, height int, rate int)
}
manager.video.Stop()
manager.broadcast.Stop()
defer func() {
manager.video.Start()
manager.broadcast.Start()
manager.logger.Info().Msg("starting video pipeline...")
}()
@ -159,17 +179,23 @@ func (manager *RemoteManager) ChangeResolution(width int, height int, rate int)
return err
}
video, err := gst.CreateAppPipeline(
// handle maximum fps
if manager.config.MaxFPS != 0 && manager.config.MaxFPS < rate {
rate = manager.config.MaxFPS
}
var err error
manager.video, err = gst.CreateAppPipeline(
manager.config.VideoCodec,
manager.config.Display,
manager.config.VideoParams,
rate,
manager.config.VideoBitrate,
)
if err != nil {
manager.logger.Panic().Err(err).Msg("unable to create new video pipeline")
}
manager.video = video
return nil
}
@ -219,4 +245,8 @@ func (manager *RemoteManager) GetScreenSize() *types.ScreenSize {
func (manager *RemoteManager) SetKeyboardLayout(layout string) {
xorg.SetKeyboardLayout(layout)
}
}
func (manager *RemoteManager) SetKeyboardModifiers(NumLock int, CapsLock int, ScrollLock int) {
xorg.SetKeyboardModifiers(NumLock, CapsLock, ScrollLock)
}

View File

@ -2,6 +2,7 @@ package session
import (
"fmt"
"sync"
"github.com/kataras/go-events"
"github.com/rs/zerolog"
@ -22,6 +23,7 @@ func New(remote types.RemoteManager) *SessionManager {
}
type SessionManager struct {
mu sync.Mutex
logger zerolog.Logger
host string
remote types.RemoteManager
@ -39,13 +41,14 @@ func (manager *SessionManager) New(id string, admin bool, socket types.WebSocket
connected: false,
}
manager.mu.Lock()
manager.members[id] = session
manager.emmiter.Emit("created", id, session)
if manager.remote.Streaming() != true && len(manager.members) > 0 {
manager.remote.StartStream()
}
manager.mu.Unlock()
manager.emmiter.Emit("created", id, session)
return session
}
@ -58,16 +61,23 @@ func (manager *SessionManager) IsHost(id string) bool {
}
func (manager *SessionManager) SetHost(id string) error {
manager.mu.Lock()
_, ok := manager.members[id]
manager.mu.Unlock()
if ok {
manager.host = id
manager.emmiter.Emit("host", id)
return nil
}
return fmt.Errorf("invalid session id %s", id)
}
func (manager *SessionManager) GetHost() (types.Session, bool) {
manager.mu.Lock()
defer manager.mu.Unlock()
host, ok := manager.members[manager.host]
return host, ok
}
@ -79,16 +89,25 @@ func (manager *SessionManager) ClearHost() {
}
func (manager *SessionManager) Has(id string) bool {
manager.mu.Lock()
defer manager.mu.Unlock()
_, ok := manager.members[id]
return ok
}
func (manager *SessionManager) Get(id string) (types.Session, bool) {
manager.mu.Lock()
defer manager.mu.Unlock()
session, ok := manager.members[id]
return session, ok
}
func (manager *SessionManager) Admins() []*types.Member {
manager.mu.Lock()
defer manager.mu.Unlock()
members := []*types.Member{}
for _, session := range manager.members {
if !session.connected || !session.admin {
@ -100,10 +119,14 @@ func (manager *SessionManager) Admins() []*types.Member {
members = append(members, member)
}
}
return members
}
func (manager *SessionManager) Members() []*types.Member {
manager.mu.Lock()
defer manager.mu.Unlock()
members := []*types.Member{}
for _, session := range manager.members {
if !session.connected {
@ -119,6 +142,7 @@ func (manager *SessionManager) Members() []*types.Member {
}
func (manager *SessionManager) Destroy(id string) error {
manager.mu.Lock()
session, ok := manager.members[id]
if ok {
err := session.destroy()
@ -127,11 +151,13 @@ func (manager *SessionManager) Destroy(id string) error {
if manager.remote.Streaming() != false && len(manager.members) <= 0 {
manager.remote.StopStream()
}
manager.mu.Unlock()
manager.emmiter.Emit("destroyed", id, session)
return err
}
manager.mu.Unlock()
return nil
}
@ -140,6 +166,9 @@ func (manager *SessionManager) Clear() error {
}
func (manager *SessionManager) Broadcast(v interface{}, exclude interface{}) error {
manager.mu.Lock()
defer manager.mu.Unlock()
for id, session := range manager.members {
if !session.connected {
continue

View File

@ -120,6 +120,16 @@ func (session *Session) SignalAnswer(sdp string) error {
return session.peer.SignalAnswer(sdp)
}
func (session *Session) SignalCandidate(data string) error {
if session.socket == nil {
return nil
}
return session.socket.Send(&message.SignalCandidate{
Event: event.SIGNAL_CANDIDATE,
Data: data,
});
}
func (session *Session) destroy() error {
if session.socket != nil {
if err := session.socket.Destroy(); err != nil {

View File

@ -0,0 +1,10 @@
package types
type BroadcastManager interface {
Start()
Stop()
IsActive() bool
Create(url string)
Destroy()
GetUrl() string
}

View File

@ -6,32 +6,12 @@ import (
)
type Broadcast struct {
Enabled bool
Display string
Device string
AudioParams string
VideoParams string
RTMP string
Pipeline string
}
func (Broadcast) Init(cmd *cobra.Command) error {
cmd.PersistentFlags().Bool("broadcast", false, "use PCMA audio codec")
if err := viper.BindPFlag("broadcast", cmd.PersistentFlags().Lookup("broadcast")); err != nil {
return err
}
cmd.PersistentFlags().String("rtmp", "", "RMTP url for broadcasting")
if err := viper.BindPFlag("rtmp", cmd.PersistentFlags().Lookup("rtmp")); err != nil {
return err
}
cmd.PersistentFlags().String("cast_audio", "", "audio codec parameters to use for broadcasting")
if err := viper.BindPFlag("cast_audio", cmd.PersistentFlags().Lookup("cast_audio")); err != nil {
return err
}
cmd.PersistentFlags().String("cast_video", "", "video codec parameters to use for broadcasting")
if err := viper.BindPFlag("cast_video", cmd.PersistentFlags().Lookup("cast_video")); err != nil {
cmd.PersistentFlags().String("broadcast_pipeline", "", "audio codec parameters to use for broadcasting")
if err := viper.BindPFlag("broadcast_pipeline", cmd.PersistentFlags().Lookup("broadcast_pipeline")); err != nil {
return err
}
@ -39,10 +19,5 @@ func (Broadcast) Init(cmd *cobra.Command) error {
}
func (s *Broadcast) Set() {
s.Enabled = viper.GetBool("broadcast")
s.Display = viper.GetString("display")
s.Device = viper.GetString("device")
s.AudioParams = viper.GetString("cast_audio")
s.VideoParams = viper.GetString("cast_video")
s.RTMP = viper.GetString("rtmp")
s.Pipeline = viper.GetString("broadcast_pipeline")
}

View File

@ -4,7 +4,6 @@ import (
"regexp"
"strconv"
"github.com/pion/webrtc/v2"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
@ -14,11 +13,14 @@ type Remote struct {
Device string
AudioCodec string
AudioParams string
AudioBitrate uint
VideoCodec string
VideoParams string
VideoBitrate uint
ScreenWidth int
ScreenHeight int
ScreenRate int
MaxFPS int
}
func (Remote) Init(cmd *cobra.Command) error {
@ -37,16 +39,31 @@ func (Remote) Init(cmd *cobra.Command) error {
return err
}
cmd.PersistentFlags().Int("audio_bitrate", 128, "audio bitrate in kbit/s")
if err := viper.BindPFlag("audio_bitrate", cmd.PersistentFlags().Lookup("audio_bitrate")); err != nil {
return err
}
cmd.PersistentFlags().String("video", "", "video codec parameters to use for streaming")
if err := viper.BindPFlag("video", cmd.PersistentFlags().Lookup("video")); err != nil {
return err
}
cmd.PersistentFlags().Int("video_bitrate", 3072, "video bitrate in kbit/s")
if err := viper.BindPFlag("video_bitrate", cmd.PersistentFlags().Lookup("video_bitrate")); err != nil {
return err
}
cmd.PersistentFlags().String("screen", "1280x720@30", "default screen resolution and framerate")
if err := viper.BindPFlag("screen", cmd.PersistentFlags().Lookup("screen")); err != nil {
return err
}
cmd.PersistentFlags().Int("max_fps", 25, "maximum fps delivered via WebRTC, 0 is for no maximum")
if err := viper.BindPFlag("max_fps", cmd.PersistentFlags().Lookup("max_fps")); err != nil {
return err
}
// video codecs
cmd.PersistentFlags().Bool("vp8", false, "use VP8 video codec")
if err := viper.BindPFlag("vp8", cmd.PersistentFlags().Lookup("vp8")); err != nil {
@ -88,32 +105,34 @@ func (Remote) Init(cmd *cobra.Command) error {
}
func (s *Remote) Set() {
videoCodec := webrtc.VP8
videoCodec := "VP8"
if viper.GetBool("vp8") {
videoCodec = webrtc.VP8
videoCodec = "VP8"
} else if viper.GetBool("vp9") {
videoCodec = webrtc.VP9
videoCodec = "VP9"
} else if viper.GetBool("h264") {
videoCodec = webrtc.H264
videoCodec = "H264"
}
audioCodec := webrtc.Opus
audioCodec := "Opus"
if viper.GetBool("opus") {
audioCodec = webrtc.Opus
audioCodec = "Opus"
} else if viper.GetBool("g722") {
audioCodec = webrtc.G722
audioCodec = "G722"
} else if viper.GetBool("pcmu") {
audioCodec = webrtc.PCMU
audioCodec = "PCMU"
} else if viper.GetBool("pcma") {
audioCodec = webrtc.PCMA
audioCodec = "PCMA"
}
s.Device = viper.GetString("device")
s.AudioCodec = audioCodec
s.AudioParams = viper.GetString("audio")
s.AudioBitrate = viper.GetUint("audio_bitrate")
s.Display = viper.GetString("display")
s.VideoCodec = videoCodec
s.VideoParams = viper.GetString("video")
s.VideoBitrate = viper.GetUint("video_bitrate")
s.ScreenWidth = 1280
s.ScreenHeight = 720
@ -133,4 +152,6 @@ func (s *Remote) Set() {
s.ScreenRate = int(rate)
}
}
s.MaxFPS = viper.GetInt("max_fps")
}

View File

@ -5,8 +5,10 @@ const (
)
const (
SIGNAL_ANSWER = "signal/answer"
SIGNAL_PROVIDE = "signal/provide"
SIGNAL_ANSWER = "signal/answer"
SIGNAL_OFFER = "signal/offer"
SIGNAL_PROVIDE = "signal/provide"
SIGNAL_CANDIDATE = "signal/candidate"
)
const (
@ -36,6 +38,12 @@ const (
SCREEN_SET = "screen/set"
)
const (
BORADCAST_STATUS = "broadcast/status"
BORADCAST_CREATE = "broadcast/create"
BORADCAST_DESTROY = "broadcast/destroy"
)
const (
ADMIN_BAN = "admin/ban"
ADMIN_KICK = "admin/kick"

View File

@ -27,6 +27,11 @@ type SignalAnswer struct {
SDP string `json:"sdp"`
}
type SignalCandidate struct {
Event string `json:"event"`
Data string `json:"data"`
}
type MembersList struct {
Event string `json:"event"`
Memebers []*types.Member `json:"members"`
@ -47,8 +52,11 @@ type Clipboard struct {
}
type Keyboard struct {
Event string `json:"event"`
Layout string `json:"layout"`
Event string `json:"event"`
Layout *string `json:"layout,omitempty"`
CapsLock *bool `json:"capsLock,omitempty"`
NumLock *bool `json:"numLock,omitempty"`
ScrollLock *bool `json:"scrollLock,omitempty"`
}
type Control struct {
@ -107,3 +115,14 @@ type ScreenConfigurations struct {
Event string `json:"event"`
Configurations map[int]types.ScreenConfiguration `json:"configurations"`
}
type BroadcastStatus struct {
Event string `json:"event"`
URL string `json:"url"`
IsActive bool `json:"isActive"`
}
type BroadcastCreate struct {
Event string `json:"event"`
URL string `json:"url"`
}

View File

@ -23,4 +23,5 @@ type RemoteManager interface {
WriteClipboard(data string)
ResetKeys()
SetKeyboardLayout(layout string)
SetKeyboardModifiers(NumLock int, CapsLock int, ScrollLock int)
}

View File

@ -24,6 +24,7 @@ type Session interface {
Write(v interface{}) error
Send(v interface{}) error
SignalAnswer(sdp string) error
SignalCandidate(data string) error
}
type SessionManager interface {

View File

@ -1,8 +1,13 @@
package types
import (
"time"
)
type Sample struct {
Data []byte
Samples uint32
Data []byte
Timestamp time.Time
Duration time.Duration
}
type WebRTCManager interface {

View File

@ -2,6 +2,12 @@ package types
import "net/http"
type Stats struct {
Connections uint32 `json:"connections"`
Host string `json:"host"`
Members []*Member `json:"members"`
}
type WebSocket interface {
Address() string
Send(v interface{}) error
@ -12,4 +18,6 @@ type WebSocketHandler interface {
Start() error
Shutdown() error
Upgrade(w http.ResponseWriter, r *http.Request) error
Stats() Stats
IsAdmin(password string) (bool, error)
}

View File

@ -5,7 +5,7 @@ import (
"encoding/binary"
"strconv"
"github.com/pion/webrtc/v2"
"github.com/pion/webrtc/v3"
)
const OP_MOVE = 0x01

View File

@ -3,7 +3,7 @@ package webrtc
import (
"sync"
"github.com/pion/webrtc/v2"
"github.com/pion/webrtc/v3"
)
type Peer struct {
@ -28,7 +28,7 @@ func (peer *Peer) WriteData(v interface{}) error {
}
func (peer *Peer) Destroy() error {
if peer.connection != nil && peer.connection.ConnectionState() == webrtc.PeerConnectionStateConnected {
if peer.connection != nil && peer.connection.ConnectionState() != webrtc.PeerConnectionStateClosed {
if err := peer.connection.Close(); err != nil {
return err
}

View File

@ -1,13 +1,15 @@
package webrtc
import (
"encoding/json"
"fmt"
"io"
"math/rand"
"strings"
"time"
"github.com/pion/webrtc/v2"
"github.com/pion/webrtc/v2/pkg/media"
"github.com/pion/interceptor"
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/media"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
@ -26,10 +28,10 @@ func New(sessions types.SessionManager, remote types.RemoteManager, config *conf
type WebRTCManager struct {
logger zerolog.Logger
videoTrack *webrtc.Track
audioTrack *webrtc.Track
videoCodec *webrtc.RTPCodec
audioCodec *webrtc.RTPCodec
videoTrack *webrtc.TrackLocalStaticSample
audioTrack *webrtc.TrackLocalStaticSample
videoCodec webrtc.RTPCodecParameters
audioCodec webrtc.RTPCodecParameters
sessions types.SessionManager
remote types.RemoteManager
config *config.WebRTC
@ -97,15 +99,22 @@ func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (stri
settings.SetEphemeralUDPPortRange(manager.config.EphemeralMin, manager.config.EphemeralMax)
settings.SetNAT1To1IPs(manager.config.NAT1To1IPs, webrtc.ICECandidateTypeHost)
settings.SetICETimeouts(6 * time.Second, 6 * time.Second, 3 * time.Second)
settings.SetSRTPReplayProtectionWindow(512)
// Create MediaEngine based off sdp
engine := webrtc.MediaEngine{}
engine.RegisterCodec(manager.audioCodec)
engine.RegisterCodec(manager.videoCodec)
engine.RegisterCodec(manager.audioCodec, webrtc.RTPCodecTypeAudio)
engine.RegisterCodec(manager.videoCodec, webrtc.RTPCodecTypeVideo)
i := &interceptor.Registry{}
if err := webrtc.RegisterDefaultInterceptors(&engine, i); err != nil {
return "", manager.config.ICELite, manager.config.ICEServers, err
}
// Create API with MediaEngine and SettingEngine
api := webrtc.NewAPI(webrtc.WithMediaEngine(engine), webrtc.WithSettingEngine(settings))
api := webrtc.NewAPI(webrtc.WithMediaEngine(&engine), webrtc.WithSettingEngine(settings), webrtc.WithInterceptorRegistry(i))
// Create new peer connection
connection, err := api.NewPeerConnection(*configuration)
@ -113,19 +122,10 @@ func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (stri
return "", manager.config.ICELite, manager.config.ICEServers, err
}
if _, err = connection.AddTransceiverFromTrack(manager.videoTrack, webrtc.RtpTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionSendonly,
}); err != nil {
return "", manager.config.ICELite, manager.config.ICEServers, err
}
if _, err = connection.AddTransceiverFromTrack(manager.audioTrack, webrtc.RtpTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionSendonly,
}); err != nil {
return "", manager.config.ICELite, manager.config.ICEServers, err
}
description, err := connection.CreateOffer(nil)
negotiated := true
_, err = connection.CreateDataChannel("data", &webrtc.DataChannelInit{
Negotiated: &negotiated,
})
if err != nil {
return "", manager.config.ICELite, manager.config.ICEServers, err
}
@ -138,21 +138,69 @@ func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (stri
})
})
connection.SetLocalDescription(description)
// Set the handler for ICE connection state
// This will notify you when the peer has connected/disconnected
connection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
manager.logger.Info().
Str("connection_state", connectionState.String()).
Msg("connection state has changed")
})
rtpVideo, err := connection.AddTrack(manager.videoTrack);
if err != nil {
return "", manager.config.ICELite, manager.config.ICEServers, err
}
rtpAudio, err := connection.AddTrack(manager.audioTrack);
if err != nil {
return "", manager.config.ICELite, manager.config.ICEServers, err
}
description, err := connection.CreateOffer(nil)
if err != nil {
return "", manager.config.ICELite, manager.config.ICEServers, err
}
err = connection.SetLocalDescription(description)
if err != nil {
return "", manager.config.ICELite, manager.config.ICEServers, err
}
connection.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
switch state {
case webrtc.PeerConnectionStateDisconnected:
case webrtc.PeerConnectionStateFailed:
manager.logger.Info().Str("id", id).Msg("peer disconnected")
manager.sessions.Destroy(id)
break
case webrtc.PeerConnectionStateFailed:
manager.logger.Warn().Str("id", id).Msg("peer failed")
manager.sessions.Destroy(id)
case webrtc.PeerConnectionStateClosed:
manager.logger.Info().Str("id", id).Msg("peer closed")
manager.sessions.Destroy(id)
case webrtc.PeerConnectionStateConnected:
manager.logger.Info().Str("id", id).Msg("peer connected")
if err = session.SetConnected(true); err != nil {
manager.logger.Warn().Err(err).Msg("unable to set connected on peer")
manager.sessions.Destroy(id)
}
break
}
})
connection.OnICECandidate(func(i *webrtc.ICECandidate) {
if i == nil {
manager.logger.Info().Msg("sent all ICECandidates")
return
}
candidateString, err := json.Marshal(i.ToJSON())
if err != nil {
manager.logger.Warn().Err(err).Msg("converting ICECandidate to json failed")
return
}
if err := session.SignalCandidate(string(candidateString)); err != nil {
manager.logger.Warn().Err(err).Msg("sending SignalCandidate failed")
return
}
})
@ -168,33 +216,55 @@ func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (stri
return "", manager.config.ICELite, manager.config.ICEServers, err
}
go func() {
rtcpBuf := make([]byte, 1500)
for {
if _, _, rtcpErr := rtpVideo.Read(rtcpBuf); rtcpErr != nil {
return
}
}
}()
go func() {
rtcpBuf := make([]byte, 1500)
for {
if _, _, rtcpErr := rtpAudio.Read(rtcpBuf); rtcpErr != nil {
return
}
}
}()
return description.SDP, manager.config.ICELite, manager.config.ICEServers, nil
}
func (m *WebRTCManager) createTrack(codecName string) (*webrtc.Track, *webrtc.RTPCodec, error) {
var codec *webrtc.RTPCodec
func (m *WebRTCManager) createTrack(codecName string) (*webrtc.TrackLocalStaticSample, webrtc.RTPCodecParameters, error) {
var codec webrtc.RTPCodecParameters
fb := []webrtc.RTCPFeedback{}
switch codecName {
case webrtc.VP8:
codec = webrtc.NewRTPVP8Codec(webrtc.DefaultPayloadTypeVP8, 90000)
case webrtc.VP9:
codec = webrtc.NewRTPVP9Codec(webrtc.DefaultPayloadTypeVP9, 90000)
case webrtc.H264:
codec = webrtc.NewRTPH264Codec(webrtc.DefaultPayloadTypeH264, 90000)
case webrtc.Opus:
codec = webrtc.NewRTPOpusCodec(webrtc.DefaultPayloadTypeOpus, 48000)
case webrtc.G722:
codec = webrtc.NewRTPG722Codec(webrtc.DefaultPayloadTypeG722, 8000)
case webrtc.PCMU:
codec = webrtc.NewRTPPCMUCodec(webrtc.DefaultPayloadTypePCMU, 8000)
case webrtc.PCMA:
codec = webrtc.NewRTPPCMACodec(webrtc.DefaultPayloadTypePCMA, 8000)
case "VP8":
codec = webrtc.RTPCodecParameters{RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: "video/VP8", ClockRate: 90000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: fb}, PayloadType: 96}
case "VP9":
codec = webrtc.RTPCodecParameters{RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: "video/VP9", ClockRate: 90000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: fb}, PayloadType: 98}
case "H264":
codec = webrtc.RTPCodecParameters{RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: "video/H264", ClockRate: 90000, Channels: 0, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f", RTCPFeedback: fb}, PayloadType: 102}
case "Opus":
codec = webrtc.RTPCodecParameters{RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: "audio/opus", ClockRate: 48000, Channels: 2, SDPFmtpLine: "", RTCPFeedback: fb}, PayloadType: 111}
case "G722":
codec = webrtc.RTPCodecParameters{RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: "audio/G722", ClockRate: 8000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: fb}, PayloadType: 9}
case "PCMU":
codec = webrtc.RTPCodecParameters{RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: "audio/PCMU", ClockRate: 8000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: fb}, PayloadType: 0}
case "PCMA":
codec = webrtc.RTPCodecParameters{RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: "audio/PCMA", ClockRate: 8000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: fb}, PayloadType: 8}
default:
return nil, nil, fmt.Errorf("unknown codec %s", codecName)
return nil, codec, fmt.Errorf("unknown codec %s", codecName)
}
track, err := webrtc.NewTrack(codec.PayloadType, rand.Uint32(), "stream", "stream", codec)
track, err := webrtc.NewTrackLocalStaticSample(codec.RTPCodecCapability, "stream", "stream")
if err != nil {
return nil, nil, err
return nil, codec, err
}
return track, codec, nil

View File

@ -0,0 +1,56 @@
package websocket
import (
"n.eko.moe/neko/internal/types"
"n.eko.moe/neko/internal/types/event"
"n.eko.moe/neko/internal/types/message"
)
func (h *MessageHandler) boradcastCreate(session types.Session, payload *message.BroadcastCreate) error {
if !session.Admin() {
h.logger.Debug().Msg("user not admin")
return nil
}
h.broadcast.Create(payload.URL)
if err := h.boradcastStatus(session); err != nil {
return err
}
return nil
}
func (h *MessageHandler) boradcastDestroy(session types.Session) error {
if !session.Admin() {
h.logger.Debug().Msg("user not admin")
return nil
}
h.broadcast.Destroy()
if err := h.boradcastStatus(session); err != nil {
return err
}
return nil
}
func (h *MessageHandler) boradcastStatus(session types.Session) error {
if !session.Admin() {
h.logger.Debug().Msg("user not admin")
return nil
}
if err := session.Send(
message.BroadcastStatus{
Event: event.BORADCAST_STATUS,
IsActive: h.broadcast.IsActive(),
URL: h.broadcast.GetUrl(),
}); err != nil {
h.logger.Warn().Err(err).Msgf("sending event %s has failed", event.BORADCAST_STATUS)
return err
}
return nil
}

View File

@ -123,6 +123,41 @@ func (h *MessageHandler) controlKeyboard(id string, session types.Session, paylo
return nil
}
h.remote.SetKeyboardLayout(payload.Layout)
// change layout
if payload.Layout != nil {
h.remote.SetKeyboardLayout(*payload.Layout)
}
// set num lock
var NumLock = 0
if payload.NumLock == nil {
NumLock = -1
} else if *payload.NumLock {
NumLock = 1
}
// set caps lock
var CapsLock = 0
if payload.CapsLock == nil {
CapsLock = -1
} else if *payload.CapsLock {
CapsLock = 1
}
// set scroll lock
var ScrollLock = 0
if payload.ScrollLock == nil {
ScrollLock = -1
} else if *payload.ScrollLock {
ScrollLock = 1
}
h.logger.Debug().
Int("NumLock", NumLock).
Int("CapsLock", CapsLock).
Int("ScrollLock", ScrollLock).
Msg("setting keyboard modifiers")
h.remote.SetKeyboardModifiers(NumLock, CapsLock, ScrollLock)
return nil
}

View File

@ -13,15 +13,16 @@ import (
)
type MessageHandler struct {
logger zerolog.Logger
sessions types.SessionManager
webrtc types.WebRTCManager
remote types.RemoteManager
banned map[string]bool
locked bool
logger zerolog.Logger
sessions types.SessionManager
webrtc types.WebRTCManager
remote types.RemoteManager
broadcast types.BroadcastManager
banned map[string]bool
locked bool
}
func (h *MessageHandler) Connected(id string, socket *WebSocket) (bool, string, error) {
func (h *MessageHandler) Connected(admin bool, socket *WebSocket) (bool, string, error) {
address := socket.Address()
if address == "" {
h.logger.Debug().Msg("no remote address")
@ -33,22 +34,15 @@ func (h *MessageHandler) Connected(id string, socket *WebSocket) (bool, string,
}
}
if h.locked {
session, ok := h.sessions.Get(id)
if !ok || !session.Admin() {
h.logger.Debug().Msg("server locked")
return false, "locked", nil
}
if h.locked && !admin {
h.logger.Debug().Msg("server locked")
return false, "locked", nil
}
return true, "", nil
}
func (h *MessageHandler) Disconnected(id string) error {
if h.locked && len(h.sessions.Admins()) == 0 {
h.locked = false
}
return h.sessions.Destroy(id)
}
@ -123,6 +117,16 @@ func (h *MessageHandler) Message(id string, raw []byte) error {
return h.screenSet(id, session, payload)
}), "%s failed", header.Event)
// Boradcast Events
case event.BORADCAST_CREATE:
payload := &message.BroadcastCreate{}
return errors.Wrapf(
utils.Unmarshal(payload, raw, func() error {
return h.boradcastCreate(session, payload)
}), "%s failed", header.Event)
case event.BORADCAST_DESTROY:
return errors.Wrapf(h.boradcastDestroy(session), "%s failed", header.Event)
// Admin Events
case event.ADMIN_LOCK:
return errors.Wrapf(h.adminLock(id, session), "%s failed", header.Event)

View File

@ -17,6 +17,22 @@ func (h *MessageHandler) SessionCreated(id string, session types.Session) error
if err := h.screenConfigurations(id, session); err != nil {
return err
}
// send broadcast status if admin
if err := h.boradcastStatus(session); err != nil {
return err
}
// if locked, notify admin about that
if h.locked {
if err := session.Send(message.Admin{
Event: event.ADMIN_LOCK,
ID: id,
}); err != nil {
h.logger.Warn().Str("id", id).Err(err).Msgf("sending event %s has failed", event.ADMIN_LOCK)
return err
}
}
}
return nil

View File

@ -3,6 +3,7 @@ package websocket
import (
"fmt"
"net/http"
"sync/atomic"
"time"
"github.com/gorilla/websocket"
@ -16,27 +17,29 @@ import (
"n.eko.moe/neko/internal/utils"
)
func New(sessions types.SessionManager, remote types.RemoteManager, webrtc types.WebRTCManager, conf *config.WebSocket) *WebSocketHandler {
func New(sessions types.SessionManager, remote types.RemoteManager, broadcast types.BroadcastManager, webrtc types.WebRTCManager, conf *config.WebSocket) *WebSocketHandler {
logger := log.With().Str("module", "websocket").Logger()
return &WebSocketHandler{
logger: logger,
conf: conf,
sessions: sessions,
remote: remote,
upgrader: websocket.Upgrader{
logger: logger,
conf: conf,
sessions: sessions,
remote: remote,
upgrader: websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
},
handler: &MessageHandler{
logger: logger.With().Str("subsystem", "handler").Logger(),
remote: remote,
sessions: sessions,
webrtc: webrtc,
banned: make(map[string]bool),
locked: false,
logger: logger.With().Str("subsystem", "handler").Logger(),
remote: remote,
broadcast: broadcast,
sessions: sessions,
webrtc: webrtc,
banned: make(map[string]bool),
locked: false,
},
conns: 0,
}
}
@ -44,13 +47,14 @@ func New(sessions types.SessionManager, remote types.RemoteManager, webrtc types
const pingPeriod = 60 * time.Second
type WebSocketHandler struct {
logger zerolog.Logger
upgrader websocket.Upgrader
sessions types.SessionManager
remote types.RemoteManager
conf *config.WebSocket
handler *MessageHandler
shutdown chan bool
logger zerolog.Logger
upgrader websocket.Upgrader
sessions types.SessionManager
remote types.RemoteManager
conf *config.WebSocket
handler *MessageHandler
conns uint32
shutdown chan bool
}
func (ws *WebSocketHandler) Start() error {
@ -149,7 +153,7 @@ func (ws *WebSocketHandler) Upgrade(w http.ResponseWriter, r *http.Request) erro
connection: connection,
}
ok, reason, err := ws.handler.Connected(id, socket)
ok, reason, err := ws.handler.Connected(admin, socket)
if err != nil {
ws.logger.Error().Err(err).Msg("connection failed")
return err
@ -178,18 +182,48 @@ func (ws *WebSocketHandler) Upgrade(w http.ResponseWriter, r *http.Request) erro
Str("address", connection.RemoteAddr().String()).
Msg("new connection created")
atomic.AddUint32(&ws.conns, uint32(1))
defer func() {
ws.logger.
Debug().
Str("session", id).
Str("address", connection.RemoteAddr().String()).
Msg("session ended")
atomic.AddUint32(&ws.conns, ^uint32(0))
}()
ws.handle(connection, id)
return nil
}
func (ws *WebSocketHandler) Stats() types.Stats {
host := ""
session, ok := ws.sessions.GetHost()
if ok {
host = session.ID()
}
return types.Stats{
Connections: atomic.LoadUint32(&ws.conns),
Host: host,
Members: ws.sessions.Members(),
}
}
func (ws *WebSocketHandler) IsAdmin(password string) (bool, error) {
if password == ws.conf.AdminPassword {
return true, nil
}
if password == ws.conf.Password {
return false, nil
}
return false, fmt.Errorf("invalid password")
}
func (ws *WebSocketHandler) authenticate(r *http.Request) (string, string, bool, error) {
ip := r.RemoteAddr
@ -207,15 +241,8 @@ func (ws *WebSocketHandler) authenticate(r *http.Request) (string, string, bool,
return "", ip, false, fmt.Errorf("no password provided")
}
if passwords[0] == ws.conf.AdminPassword {
return id, ip, true, nil
}
if passwords[0] == ws.conf.Password {
return id, ip, false, nil
}
return "", ip, false, fmt.Errorf("invalid password: %s", passwords[0])
isAdmin, err := ws.IsAdmin(passwords[0])
return id, ip, isAdmin, err
}
func (ws *WebSocketHandler) handle(connection *websocket.Conn, id string) {

View File

@ -170,5 +170,26 @@ void SetKeyboardLayout(char *layout) {
// TOOD: refactor, use native API.
char cmd[13] = "setxkbmap ";
strncat(cmd, layout, 2);
system(cmd);
int r = system(cmd);
}
void SetKeyboardModifiers(int num_lock, int caps_lock, int scroll_lock) {
Display *display = getXDisplay();
if (num_lock != -1) {
XkbLockModifiers(display, XkbUseCoreKbd, 16, num_lock * 16);
}
if (caps_lock != -1) {
XkbLockModifiers(display, XkbUseCoreKbd, 2, caps_lock * 2);
}
if (scroll_lock != -1) {
XKeyboardControl values;
values.led_mode = scroll_lock ? LedModeOn : LedModeOff;
values.led = 3;
XChangeKeyboardControl(display, KBLedMode, &values);
}
XFlush(display);
}

View File

@ -225,6 +225,13 @@ func SetKeyboardLayout(layout string) {
C.SetKeyboardLayout(layoutUnsafe)
}
func SetKeyboardModifiers(num_lock int, caps_lock int, scroll_lock int) {
mu.Lock()
defer mu.Unlock()
C.SetKeyboardModifiers(C.int(num_lock), C.int(caps_lock), C.int(scroll_lock))
}
//export goCreateScreenSize
func goCreateScreenSize(index C.int, width C.int, height C.int, mwidth C.int, mheight C.int) {
ScreenConfigurations[int(index)] = types.ScreenConfiguration{

View File

@ -4,6 +4,7 @@
#define XDISPLAY_H
#include <X11/Xlib.h>
#include <X11/XKBlib.h>
#include <X11/extensions/Xrandr.h>
#include <X11/extensions/XTest.h>
#include <libclipboard.h>
@ -40,5 +41,6 @@
void XDisplaySet(char *input);
void SetKeyboardLayout(char *layout);
void SetKeyboardModifiers(int num_lock, int caps_lock, int scroll_lock);
#endif