From 19c9e4a59b92ce1aaf2ae0cefe2e0f2b6e2e44e3 Mon Sep 17 00:00:00 2001 From: David Robillard Date: Wed, 10 Aug 2022 15:18:59 -0400 Subject: Clean up and document worker implementation --- src/worker.c | 90 +++++++++++++++++++++++++++++++++++------------------------- 1 file changed, 52 insertions(+), 38 deletions(-) (limited to 'src') diff --git a/src/worker.c b/src/worker.c index b062723..1fdfaaf 100644 --- a/src/worker.c +++ b/src/worker.c @@ -1,4 +1,4 @@ -// Copyright 2007-2016 David Robillard +// Copyright 2007-2022 David Robillard // SPDX-License-Identifier: ISC #include "worker.h" @@ -12,8 +12,12 @@ #include #include +#define MAX_PACKET_SIZE 4096U + static LV2_Worker_Status -jalv_worker_write_packet(ZixRing* const target, uint32_t size, const void* data) +jalv_worker_write_packet(ZixRing* const target, + const uint32_t size, + const void* const data) { ZixRingTransaction tx = zix_ring_begin_write(target); if (zix_ring_amend_write(target, &tx, &size, sizeof(size)) || @@ -27,37 +31,46 @@ jalv_worker_write_packet(ZixRing* const target, uint32_t size, const void* data) static LV2_Worker_Status jalv_worker_respond(LV2_Worker_Respond_Handle handle, - uint32_t size, + const uint32_t size, const void* data) { return jalv_worker_write_packet(((JalvWorker*)handle)->responses, size, data); } static void* -worker_func(void* data) +worker_func(void* const data) { - JalvWorker* worker = (JalvWorker*)data; - void* buf = NULL; + JalvWorker* const worker = (JalvWorker*)data; + void* buf = NULL; + while (true) { + // Wait for a request zix_sem_wait(&worker->sem); if (*worker->exit) { break; } + // Read the size header of the request uint32_t size = 0; zix_ring_read(worker->requests, &size, sizeof(size)); + // Reallocate buffer to accommodate request if necessary void* const new_buf = realloc(buf, size); - if (!new_buf) { - break; + if (new_buf) { + // Read request into buffer + buf = new_buf; + zix_ring_read(worker->requests, buf, size); + + // Lock and dispatch request to plugin's work handler + zix_sem_wait(worker->lock); + worker->iface->work( + worker->handle, jalv_worker_respond, worker, size, buf); + zix_sem_post(worker->lock); + + } else { + // Reallocation failed, skip request to avoid corrupting ring + zix_ring_skip(worker->requests, size); } - - buf = new_buf; - zix_ring_read(worker->requests, buf, size); - - zix_sem_wait(worker->lock); - worker->iface->work(worker->handle, jalv_worker_respond, worker, size, buf); - zix_sem_post(worker->lock); } free(buf); @@ -65,24 +78,27 @@ worker_func(void* data) } void -jalv_worker_init(JalvWorker* worker, - const LV2_Worker_Interface* iface, - bool threaded) +jalv_worker_init(JalvWorker* const worker, + const LV2_Worker_Interface* const iface, + const bool threaded) { - worker->iface = iface; - worker->threaded = threaded; + worker->iface = iface; + worker->threaded = threaded; + worker->responses = zix_ring_new(NULL, MAX_PACKET_SIZE); + worker->response = malloc(MAX_PACKET_SIZE); + if (threaded) { - zix_thread_create(&worker->thread, 4096, worker_func, worker); - worker->requests = zix_ring_new(NULL, 4096); + worker->requests = zix_ring_new(NULL, MAX_PACKET_SIZE); + + zix_thread_create(&worker->thread, 4096U, worker_func, worker); zix_ring_mlock(worker->requests); } - worker->responses = zix_ring_new(NULL, 4096); - worker->response = malloc(4096); + zix_ring_mlock(worker->responses); } void -jalv_worker_finish(JalvWorker* worker) +jalv_worker_finish(JalvWorker* const worker) { if (worker->threaded) { zix_sem_post(&worker->sem); @@ -91,7 +107,7 @@ jalv_worker_finish(JalvWorker* worker) } void -jalv_worker_destroy(JalvWorker* worker) +jalv_worker_destroy(JalvWorker* const worker) { zix_ring_free(worker->requests); zix_ring_free(worker->responses); @@ -100,8 +116,8 @@ jalv_worker_destroy(JalvWorker* worker) LV2_Worker_Status jalv_worker_schedule(LV2_Worker_Schedule_Handle handle, - uint32_t size, - const void* data) + const uint32_t size, + const void* const data) { JalvWorker* worker = (JalvWorker*)handle; LV2_Worker_Status st = LV2_WORKER_SUCCESS; @@ -128,18 +144,16 @@ jalv_worker_schedule(LV2_Worker_Schedule_Handle handle, } void -jalv_worker_emit_responses(JalvWorker* worker, LV2_Handle lv2_handle) +jalv_worker_emit_responses(JalvWorker* const worker, LV2_Handle lv2_handle) { - if (worker->responses) { - uint32_t read_space = zix_ring_read_space(worker->responses); - while (read_space) { - uint32_t size = 0; - zix_ring_read(worker->responses, &size, sizeof(size)); - zix_ring_read(worker->responses, worker->response, size); - - worker->iface->work_response(lv2_handle, size, worker->response); + static const uint32_t size_size = (uint32_t)sizeof(uint32_t); - read_space -= sizeof(size) + size; + if (worker->responses) { + uint32_t size = 0U; + while (zix_ring_read(worker->responses, &size, size_size) == size_size) { + if (zix_ring_read(worker->responses, worker->response, size) == size) { + worker->iface->work_response(lv2_handle, size, worker->response); + } } } } -- cgit v1.2.1