From ca5ebc8a795114a72f4410f05b2270f24ead1d60 Mon Sep 17 00:00:00 2001 From: xegineering Date: Wed, 11 Dec 2024 20:03:29 +0100 Subject: pipewire: Implement capture tear-down This closes the PipeWire process properly to not leak memory and remove the PipeWire capture node of soundbox as soon as the context passed to StreamPipewireContext() is closed. --- soundbox/pipewire-binding.c | 48 +++++++++++++++++++++++++++++------------ soundbox/pipewire-binding.h | 5 ++++- soundbox/pipewire.go | 52 ++++++++++++++++++++++++++++++++++----------- 3 files changed, 78 insertions(+), 27 deletions(-) (limited to 'soundbox') diff --git a/soundbox/pipewire-binding.c b/soundbox/pipewire-binding.c index bd23ff0..0a31843 100644 --- a/soundbox/pipewire-binding.c +++ b/soundbox/pipewire-binding.c @@ -2,6 +2,7 @@ #include #include "pipewire-binding.h" +#include "pipewire/main-loop.h" #define SAMPLING_RATE 48000 @@ -11,15 +12,21 @@ #define STRIDE sizeof(int16_t) * CHANNELS +struct pw_go_capture { + struct pw_main_loop *loop; + struct pw_stream *stream; +}; + + static void on_process(void *userdata) { - struct pw_stream *stream = *(struct pw_stream **)userdata; + struct pw_go_capture *capture = (struct pw_go_capture *)userdata; struct pw_buffer *pw_buf; struct spa_buffer *spa_buf; int n_frames; int16_t *src; - if ((pw_buf = pw_stream_dequeue_buffer(stream)) == NULL) { + if ((pw_buf = pw_stream_dequeue_buffer(capture->stream)) == NULL) { return; } @@ -41,7 +48,7 @@ static void on_process(void *userdata) spa_buf->datas[0].chunk->stride = STRIDE; spa_buf->datas[0].chunk->size = n_frames * STRIDE; - pw_stream_queue_buffer(stream, pw_buf); + pw_stream_queue_buffer(capture->stream, pw_buf); } @@ -51,15 +58,14 @@ static const struct pw_stream_events stream_events = { }; -void pw_stdout(void) +void *pw_go_capture_init(void) { pw_init(NULL, NULL); - struct pw_main_loop *loop = pw_main_loop_new(NULL); - - struct pw_stream *stream = NULL; - stream = pw_stream_new_simple( - pw_main_loop_get_loop(loop), + struct pw_go_capture *capture = malloc(sizeof(struct pw_go_capture)); + capture->loop = pw_main_loop_new(NULL); + capture->stream = pw_stream_new_simple( + pw_main_loop_get_loop(capture->loop), NODE_NAME, pw_properties_new( PW_KEY_MEDIA_TYPE, "Audio", @@ -69,7 +75,7 @@ void pw_stdout(void) NULL ), &stream_events, - &stream + capture ); uint8_t buffer[1024]; @@ -87,7 +93,7 @@ void pw_stdout(void) }; pw_stream_connect( - stream, + capture->stream, PW_DIRECTION_INPUT, PW_ID_ANY, PW_STREAM_FLAG_MAP_BUFFERS | @@ -96,9 +102,23 @@ void pw_stdout(void) sizeof(params) / sizeof(params[0]) ); - pw_main_loop_run(loop); + return (void *)capture; +} + - pw_stream_destroy(stream); - pw_main_loop_destroy(loop); +void pw_go_capture_run(void *cdata) +{ + struct pw_go_capture *capture = cdata; + pw_main_loop_run(capture->loop); + pw_stream_destroy(capture->stream); + pw_main_loop_destroy(capture->loop); + free(capture); pw_deinit(); } + + +void pw_go_capture_deinit(void *cdata) +{ + struct pw_go_capture *capture = cdata; + pw_main_loop_quit(capture->loop); +} diff --git a/soundbox/pipewire-binding.h b/soundbox/pipewire-binding.h index d25a0b9..58ee93a 100644 --- a/soundbox/pipewire-binding.h +++ b/soundbox/pipewire-binding.h @@ -5,6 +5,9 @@ #include extern void goHandleData(int16_t *, size_t); -void pw_stdout(void); + +void *pw_go_capture_init(void); +void pw_go_capture_run(void *); +void pw_go_capture_deinit(void *); #endif // !PIPEWIRE_BINDING_H diff --git a/soundbox/pipewire.go b/soundbox/pipewire.go index 00bbe49..476d2e4 100644 --- a/soundbox/pipewire.go +++ b/soundbox/pipewire.go @@ -7,7 +7,6 @@ package soundbox import "C" import ( - "bytes" "context" "io" "log" @@ -16,7 +15,41 @@ import ( "unsafe" ) -var pipewireAudio = make(chan []byte, 5) +type pwCapture struct { + cdata unsafe.Pointer +} + +var pwAudio chan []byte + +func newPWCapture(ctx context.Context) pwCapture { + pwc := pwCapture{} + + pwAudio = make(chan []byte, 5) + pwc.cdata = unsafe.Pointer(C.pw_go_capture_init()) // TODO pass &pwc.audio here + go C.pw_go_capture_run(pwc.cdata) + + go func() { + <-ctx.Done() + C.pw_go_capture_deinit(pwc.cdata) + }() + + return pwc +} + +func (pwc pwCapture) Read(p []byte) (int, error) { + select { + case chunk, ok := <-pwAudio: + if ok { + noSilence := s16leDropSilence(chunk) + i := copy(p, noSilence) + return i, nil + } else { + return 0, io.EOF + } + default: + return 0, nil + } +} func s16leDropSilence(input []byte) []byte { output := make([]byte, 0) @@ -61,16 +94,11 @@ func StreamPipewireContext(ctx context.Context, targets []net.HardwareAddr) erro return err } - go C.pw_stdout() - + pwc := newPWCapture(ctx) go func() { - for buffer := range pipewireAudio { - tempReader := bytes.NewReader(s16leDropSilence(buffer)) - _, err := io.Copy(stdin, tempReader) - if err != nil { - log.Println("Failed to copy from PipeWire to ffmpeg.") - break - } + _, err := io.Copy(stdin, pwc) + if err != nil { + log.Println("Failed to copy from PipeWire to ffmpeg.") } }() @@ -90,5 +118,5 @@ func StreamPipewireContext(ctx context.Context, targets []net.HardwareAddr) erro //export goHandleData func goHandleData(data *C.int16_t, size C.size_t) { buf := C.GoBytes(unsafe.Pointer(data), C.int(size)) - pipewireAudio <- buf + pwAudio <- buf } -- cgit v1.2.3-70-g09d2