summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorxegineering <me@xegineering.eu>2024-12-11 20:03:29 +0100
committerxegineering <me@xegineering.eu>2024-12-15 12:36:51 +0100
commitca5ebc8a795114a72f4410f05b2270f24ead1d60 (patch)
tree5ca901ebc8365909e4fd2ec11a4cf1ff114f149d
parent62d3045df3283872628b0b8b8a8caeef1c226dfa (diff)
downloadsoundbox-go-ca5ebc8a795114a72f4410f05b2270f24ead1d60.tar
soundbox-go-ca5ebc8a795114a72f4410f05b2270f24ead1d60.tar.zst
soundbox-go-ca5ebc8a795114a72f4410f05b2270f24ead1d60.zip
pipewire: Implement capture tear-down
This closes the PipeWire process properly to not leak memory and remove the PipeWire capture node of soundbox as soon as the context passed to StreamPipewireContext() is closed.
-rw-r--r--soundbox/pipewire-binding.c48
-rw-r--r--soundbox/pipewire-binding.h5
-rw-r--r--soundbox/pipewire.go52
3 files changed, 78 insertions, 27 deletions
diff --git a/soundbox/pipewire-binding.c b/soundbox/pipewire-binding.c
index bd23ff0..0a31843 100644
--- a/soundbox/pipewire-binding.c
+++ b/soundbox/pipewire-binding.c
@@ -2,6 +2,7 @@
#include <spa/param/audio/format-utils.h>
#include "pipewire-binding.h"
+#include "pipewire/main-loop.h"
#define SAMPLING_RATE 48000
@@ -11,15 +12,21 @@
#define STRIDE sizeof(int16_t) * CHANNELS
+struct pw_go_capture {
+ struct pw_main_loop *loop;
+ struct pw_stream *stream;
+};
+
+
static void on_process(void *userdata)
{
- struct pw_stream *stream = *(struct pw_stream **)userdata;
+ struct pw_go_capture *capture = (struct pw_go_capture *)userdata;
struct pw_buffer *pw_buf;
struct spa_buffer *spa_buf;
int n_frames;
int16_t *src;
- if ((pw_buf = pw_stream_dequeue_buffer(stream)) == NULL) {
+ if ((pw_buf = pw_stream_dequeue_buffer(capture->stream)) == NULL) {
return;
}
@@ -41,7 +48,7 @@ static void on_process(void *userdata)
spa_buf->datas[0].chunk->stride = STRIDE;
spa_buf->datas[0].chunk->size = n_frames * STRIDE;
- pw_stream_queue_buffer(stream, pw_buf);
+ pw_stream_queue_buffer(capture->stream, pw_buf);
}
@@ -51,15 +58,14 @@ static const struct pw_stream_events stream_events = {
};
-void pw_stdout(void)
+void *pw_go_capture_init(void)
{
pw_init(NULL, NULL);
- struct pw_main_loop *loop = pw_main_loop_new(NULL);
-
- struct pw_stream *stream = NULL;
- stream = pw_stream_new_simple(
- pw_main_loop_get_loop(loop),
+ struct pw_go_capture *capture = malloc(sizeof(struct pw_go_capture));
+ capture->loop = pw_main_loop_new(NULL);
+ capture->stream = pw_stream_new_simple(
+ pw_main_loop_get_loop(capture->loop),
NODE_NAME,
pw_properties_new(
PW_KEY_MEDIA_TYPE, "Audio",
@@ -69,7 +75,7 @@ void pw_stdout(void)
NULL
),
&stream_events,
- &stream
+ capture
);
uint8_t buffer[1024];
@@ -87,7 +93,7 @@ void pw_stdout(void)
};
pw_stream_connect(
- stream,
+ capture->stream,
PW_DIRECTION_INPUT,
PW_ID_ANY,
PW_STREAM_FLAG_MAP_BUFFERS |
@@ -96,9 +102,23 @@ void pw_stdout(void)
sizeof(params) / sizeof(params[0])
);
- pw_main_loop_run(loop);
+ return (void *)capture;
+}
+
- pw_stream_destroy(stream);
- pw_main_loop_destroy(loop);
+void pw_go_capture_run(void *cdata)
+{
+ struct pw_go_capture *capture = cdata;
+ pw_main_loop_run(capture->loop);
+ pw_stream_destroy(capture->stream);
+ pw_main_loop_destroy(capture->loop);
+ free(capture);
pw_deinit();
}
+
+
+void pw_go_capture_deinit(void *cdata)
+{
+ struct pw_go_capture *capture = cdata;
+ pw_main_loop_quit(capture->loop);
+}
diff --git a/soundbox/pipewire-binding.h b/soundbox/pipewire-binding.h
index d25a0b9..58ee93a 100644
--- a/soundbox/pipewire-binding.h
+++ b/soundbox/pipewire-binding.h
@@ -5,6 +5,9 @@
#include <stddef.h>
extern void goHandleData(int16_t *, size_t);
-void pw_stdout(void);
+
+void *pw_go_capture_init(void);
+void pw_go_capture_run(void *);
+void pw_go_capture_deinit(void *);
#endif // !PIPEWIRE_BINDING_H
diff --git a/soundbox/pipewire.go b/soundbox/pipewire.go
index 00bbe49..476d2e4 100644
--- a/soundbox/pipewire.go
+++ b/soundbox/pipewire.go
@@ -7,7 +7,6 @@ package soundbox
import "C"
import (
- "bytes"
"context"
"io"
"log"
@@ -16,7 +15,41 @@ import (
"unsafe"
)
-var pipewireAudio = make(chan []byte, 5)
+type pwCapture struct {
+ cdata unsafe.Pointer
+}
+
+var pwAudio chan []byte
+
+func newPWCapture(ctx context.Context) pwCapture {
+ pwc := pwCapture{}
+
+ pwAudio = make(chan []byte, 5)
+ pwc.cdata = unsafe.Pointer(C.pw_go_capture_init()) // TODO pass &pwc.audio here
+ go C.pw_go_capture_run(pwc.cdata)
+
+ go func() {
+ <-ctx.Done()
+ C.pw_go_capture_deinit(pwc.cdata)
+ }()
+
+ return pwc
+}
+
+func (pwc pwCapture) Read(p []byte) (int, error) {
+ select {
+ case chunk, ok := <-pwAudio:
+ if ok {
+ noSilence := s16leDropSilence(chunk)
+ i := copy(p, noSilence)
+ return i, nil
+ } else {
+ return 0, io.EOF
+ }
+ default:
+ return 0, nil
+ }
+}
func s16leDropSilence(input []byte) []byte {
output := make([]byte, 0)
@@ -61,16 +94,11 @@ func StreamPipewireContext(ctx context.Context, targets []net.HardwareAddr) erro
return err
}
- go C.pw_stdout()
-
+ pwc := newPWCapture(ctx)
go func() {
- for buffer := range pipewireAudio {
- tempReader := bytes.NewReader(s16leDropSilence(buffer))
- _, err := io.Copy(stdin, tempReader)
- if err != nil {
- log.Println("Failed to copy from PipeWire to ffmpeg.")
- break
- }
+ _, err := io.Copy(stdin, pwc)
+ if err != nil {
+ log.Println("Failed to copy from PipeWire to ffmpeg.")
}
}()
@@ -90,5 +118,5 @@ func StreamPipewireContext(ctx context.Context, targets []net.HardwareAddr) erro
//export goHandleData
func goHandleData(data *C.int16_t, size C.size_t) {
buf := C.GoBytes(unsafe.Pointer(data), C.int(size))
- pipewireAudio <- buf
+ pwAudio <- buf
}