From 06c4f8b0120f5598f9d179b8c0fea33df35659a8 Mon Sep 17 00:00:00 2001 From: xegineering Date: Sun, 17 Nov 2024 12:51:15 +0100 Subject: pipewire: Add experimental PipeWire support This implements a PipeWire capture device which can be used as an input source instead of the already available URL input. Known issues with the current PipeWire support are: - user has to connect the monitor of the default audio sink to the capture device manually - correct shutdown has to be tested - multiple instances do not work - medium code quality requires refactoring Since this is nevertheless usable and possible unknown bugs should be figured out in practise soon this implementation is already added. Bugfixes and refactoring might follow. --- soundbox/pipewire-binding.c | 105 ++++++++++++++++++++++++++++++++++++++++++++ soundbox/pipewire-binding.h | 10 +++++ soundbox/pipewire.go | 80 +++++++++++++++++++++++++++++++++ soundbox/pipewire_test.go | 35 +++++++++++++++ 4 files changed, 230 insertions(+) create mode 100644 soundbox/pipewire-binding.c create mode 100644 soundbox/pipewire-binding.h create mode 100644 soundbox/pipewire.go create mode 100644 soundbox/pipewire_test.go (limited to 'soundbox') diff --git a/soundbox/pipewire-binding.c b/soundbox/pipewire-binding.c new file mode 100644 index 0000000..bb10e31 --- /dev/null +++ b/soundbox/pipewire-binding.c @@ -0,0 +1,105 @@ +#include +#include + +#include "pipewire-binding.h" + + +#define SAMPLING_RATE 48000 +#define CHANNELS 2 +#define VOLUME 0.7 +#define NODE_NAME "soundbox" +#define STRIDE sizeof(int16_t) * CHANNELS + + +static void on_process(void *userdata) +{ + struct pw_stream *stream = *(struct pw_stream **)userdata; + struct pw_buffer *pw_buf; + struct spa_buffer *spa_buf; + int n_frames; + int16_t *src; + + if ((pw_buf = pw_stream_dequeue_buffer(stream)) == NULL) { + return; + } + + spa_buf = pw_buf->buffer; + if ((src = spa_buf->datas[0].data) == NULL) { + return; + } + + n_frames = spa_buf->datas[0].chunk->size / STRIDE; + if (pw_buf->requested) { + n_frames = SPA_MIN(pw_buf->requested, n_frames); + } + + size_t len = spa_buf->datas[0].chunk->size; + + goHandleData(src, len); + + spa_buf->datas[0].chunk->offset = 0; + spa_buf->datas[0].chunk->stride = STRIDE; + spa_buf->datas[0].chunk->size = n_frames * STRIDE; + + pw_stream_queue_buffer(stream, pw_buf); +} + + +static const struct pw_stream_events stream_events = { + PW_VERSION_STREAM_EVENTS, + .process = on_process, +}; + + +void pw_stdout(void) +{ + pw_init(NULL, NULL); + + struct pw_main_loop *loop = pw_main_loop_new(NULL); + + struct pw_stream *stream = NULL; + stream = pw_stream_new_simple( + pw_main_loop_get_loop(loop), + NODE_NAME, + pw_properties_new( + PW_KEY_MEDIA_TYPE, "Audio", + PW_KEY_CONFIG_NAME, "client-rt.conf", + PW_KEY_MEDIA_CATEGORY, "Capture", + PW_KEY_MEDIA_ROLE, "Music", + NULL + ), + &stream_events, + &stream + ); + + uint8_t buffer[1024]; + struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); + const struct spa_pod *params[] = { + spa_format_audio_raw_build( + &b, + SPA_PARAM_EnumFormat, + &SPA_AUDIO_INFO_RAW_INIT( + .format = SPA_AUDIO_FORMAT_S16, + .channels = CHANNELS, + .rate = SAMPLING_RATE + ) + ) + }; + + pw_stream_connect( + stream, + PW_DIRECTION_INPUT, + PW_ID_ANY, + PW_STREAM_FLAG_AUTOCONNECT | + PW_STREAM_FLAG_MAP_BUFFERS | + PW_STREAM_FLAG_RT_PROCESS, + params, + sizeof(params) / sizeof(params[0]) + ); + + pw_main_loop_run(loop); + + pw_stream_destroy(stream); + pw_main_loop_destroy(loop); + pw_deinit(); +} diff --git a/soundbox/pipewire-binding.h b/soundbox/pipewire-binding.h new file mode 100644 index 0000000..d25a0b9 --- /dev/null +++ b/soundbox/pipewire-binding.h @@ -0,0 +1,10 @@ +#ifndef PIPEWIRE_BINDING_H +#define PIPEWIRE_BINDING_H + +#include +#include + +extern void goHandleData(int16_t *, size_t); +void pw_stdout(void); + +#endif // !PIPEWIRE_BINDING_H diff --git a/soundbox/pipewire.go b/soundbox/pipewire.go new file mode 100644 index 0000000..fbae73b --- /dev/null +++ b/soundbox/pipewire.go @@ -0,0 +1,80 @@ +package soundbox + +/* +#cgo pkg-config: libpipewire-0.3 +#include "pipewire-binding.h" +*/ +import "C" + +import ( + "bytes" + "context" + "io" + "net" + "log" + "os/exec" + "unsafe" +) + +var pipewireAudio = make(chan []byte, 5) + +func StreamPipewireContext(ctx context.Context, targets []net.HardwareAddr) error { + cmd := exec.CommandContext( + ctx, + "ffmpeg", + "-ac", + "2", + "-ar", + "48000", + "-f", + "s16le", + "-channel_layout", + "stereo", + "-i", + "-", + "-acodec", + "flac", + "-f", + "ogg", + "-", + ) + stdout, err := cmd.StdoutPipe() + if err != nil { + return err + } + stdin, err := cmd.StdinPipe() + if err != nil { + return err + } + + go C.pw_stdout() + + go func() { + for buffer := range pipewireAudio { + tempReader := bytes.NewReader(buffer) + _, err := io.Copy(stdin, tempReader) + if err != nil { + log.Println("Failed to copy from PipeWire to ffmpeg.") + break + } + } + }() + + err = cmd.Start() + if err != nil { + return err + } + + err = streamContext(ctx, stdout, targets) + if err != nil { + return err + } + + return cmd.Wait() +} + +//export goHandleData +func goHandleData(data *C.int16_t, size C.size_t) { + buf := C.GoBytes(unsafe.Pointer(data), C.int(size)) + pipewireAudio <- buf +} diff --git a/soundbox/pipewire_test.go b/soundbox/pipewire_test.go new file mode 100644 index 0000000..b857414 --- /dev/null +++ b/soundbox/pipewire_test.go @@ -0,0 +1,35 @@ +package soundbox_test + +import ( + "context" + "log" + "net" + "time" + + "xengineering.eu/soundbox-go/soundbox" +) + +func ExampleStreamPipewireContext() { + ctx, cancel := context.WithCancel(context.Background()) + + // all soundboxes are referenced by their MAC address + soundboxes := []net.HardwareAddr{ + {0x00, 0x00, 0x5E, 0x00, 0x53, 0x01}, + {0x00, 0x00, 0x5E, 0x00, 0x53, 0x02}, + {0x00, 0x00, 0x5E, 0x00, 0x53, 0x03}, + } + + // start streaming + go func() { + err := soundbox.StreamPipewireContext(ctx, soundboxes) + if err != nil { + log.Fatal(err) + } + }() + + // let it play for some time + time.Sleep(time.Minute) + + // stop it + cancel() +} -- cgit v1.2.3-70-g09d2