aboutsummaryrefslogtreecommitdiffstats
path: root/src/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/worker.c')
-rw-r--r--src/worker.c324
1 files changed, 193 insertions, 131 deletions
diff --git a/src/worker.c b/src/worker.c
index 8cb09d1..0dca657 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -1,164 +1,226 @@
-/*
- Copyright 2007-2016 David Robillard <http://drobilla.net>
-
- Permission to use, copy, modify, and/or distribute this software for any
- purpose with or without fee is hereby granted, provided that the above
- copyright notice and this permission notice appear in all copies.
-
- THIS SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
- WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
- ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
- OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-*/
+// Copyright 2007-2022 David Robillard <d@drobilla.net>
+// SPDX-License-Identifier: ISC
#include "worker.h"
+#include "lv2/core/lv2.h"
+#include "lv2/worker/worker.h"
+#include "zix/ring.h"
+#include "zix/sem.h"
+#include "zix/status.h"
+#include "zix/thread.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+
+#define MAX_PACKET_SIZE 4096U
+
+struct JalvWorkerImpl {
+ ZixRing* requests; ///< Requests to the worker
+ ZixRing* responses; ///< Responses from the worker
+ void* response; ///< Worker response buffer
+ ZixSem* lock; ///< Lock for plugin work() method
+ bool exit; ///< Exit flag
+ ZixSem sem; ///< Worker semaphore
+ ZixThread thread; ///< Worker thread
+ LV2_Handle handle; ///< Plugin handle
+ const LV2_Worker_Interface* iface; ///< Plugin worker interface
+ bool threaded; ///< Run work in another thread
+};
+
+static LV2_Worker_Status
+jalv_worker_write_packet(ZixRing* const target,
+ const uint32_t size,
+ const void* const data)
+{
+ ZixRingTransaction tx = zix_ring_begin_write(target);
+ if (zix_ring_amend_write(target, &tx, &size, sizeof(size)) ||
+ zix_ring_amend_write(target, &tx, data, size)) {
+ return LV2_WORKER_ERR_NO_SPACE;
+ }
+
+ zix_ring_commit_write(target, &tx);
+ return LV2_WORKER_SUCCESS;
+}
+
static LV2_Worker_Status
jalv_worker_respond(LV2_Worker_Respond_Handle handle,
- uint32_t size,
+ const uint32_t size,
const void* data)
{
- JalvWorker* worker = (JalvWorker*)handle;
- if (zix_ring_write_space(worker->responses) < (sizeof(size) + size)) {
- return LV2_WORKER_ERR_NO_SPACE;
- }
-
- zix_ring_write(worker->responses, (const char*)&size, sizeof(size));
- zix_ring_write(worker->responses, (const char*)data, size);
- return LV2_WORKER_SUCCESS;
+ return jalv_worker_write_packet(((JalvWorker*)handle)->responses, size, data);
}
static void*
-worker_func(void* data)
+worker_func(void* const data)
+{
+ JalvWorker* const worker = (JalvWorker*)data;
+ void* buf = NULL;
+
+ while (true) {
+ // Wait for a request
+ zix_sem_wait(&worker->sem);
+ if (worker->exit) {
+ break;
+ }
+
+ // Read the size header of the request
+ uint32_t size = 0;
+ zix_ring_read(worker->requests, &size, sizeof(size));
+
+ // Reallocate buffer to accommodate request if necessary
+ void* const new_buf = realloc(buf, size);
+ if (new_buf) {
+ // Read request into buffer
+ buf = new_buf;
+ zix_ring_read(worker->requests, buf, size);
+
+ // Lock and dispatch request to plugin's work handler
+ zix_sem_wait(worker->lock);
+ worker->iface->work(
+ worker->handle, jalv_worker_respond, worker, size, buf);
+ zix_sem_post(worker->lock);
+
+ } else {
+ // Reallocation failed, skip request to avoid corrupting ring
+ zix_ring_skip(worker->requests, size);
+ }
+ }
+
+ free(buf);
+ return NULL;
+}
+
+static ZixStatus
+jalv_worker_launch(JalvWorker* const worker)
{
- JalvWorker* worker = (JalvWorker*)data;
- Jalv* jalv = worker->jalv;
- void* buf = NULL;
- while (true) {
- zix_sem_wait(&worker->sem);
- if (jalv->exit) {
- break;
- }
-
- uint32_t size = 0;
- zix_ring_read(worker->requests, (char*)&size, sizeof(size));
-
- if (!(buf = realloc(buf, size))) {
- fprintf(stderr, "error: realloc() failed\n");
- free(buf);
- return NULL;
- }
-
- zix_ring_read(worker->requests, (char*)buf, size);
-
- zix_sem_wait(&jalv->work_lock);
- worker->iface->work(
- jalv->instance->lv2_handle, jalv_worker_respond, worker, size, buf);
- zix_sem_post(&jalv->work_lock);
- }
-
- free(buf);
- return NULL;
+ ZixStatus st = ZIX_STATUS_SUCCESS;
+
+ if ((st = zix_sem_init(&worker->sem, 0)) ||
+ (st = zix_thread_create(&worker->thread, 4096U, worker_func, worker))) {
+ return st;
+ }
+
+ ZixRing* const requests = zix_ring_new(NULL, MAX_PACKET_SIZE);
+ if (!requests) {
+ zix_thread_join(worker->thread);
+ zix_sem_destroy(&worker->sem);
+ return ZIX_STATUS_NO_MEM;
+ }
+
+ zix_ring_mlock(requests);
+ worker->requests = requests;
+ return ZIX_STATUS_SUCCESS;
+}
+
+JalvWorker*
+jalv_worker_new(ZixSem* const lock, const bool threaded)
+{
+ JalvWorker* const worker = (JalvWorker*)calloc(1, sizeof(JalvWorker));
+ ZixRing* const responses = zix_ring_new(NULL, MAX_PACKET_SIZE);
+ void* const response = calloc(1, MAX_PACKET_SIZE);
+
+ if (worker && responses && response) {
+ worker->threaded = threaded;
+ worker->responses = responses;
+ worker->response = response;
+ worker->lock = lock;
+ worker->exit = false;
+
+ zix_ring_mlock(responses);
+ if (!threaded || !jalv_worker_launch(worker)) {
+ return worker;
+ }
+ }
+
+ free(worker);
+ zix_ring_free(responses);
+ free(response);
+ return NULL;
}
void
-jalv_worker_init(ZIX_UNUSED Jalv* jalv,
- JalvWorker* worker,
- const LV2_Worker_Interface* iface,
- bool threaded)
+jalv_worker_start(JalvWorker* const worker,
+ const LV2_Worker_Interface* const iface,
+ LV2_Handle handle)
{
- worker->iface = iface;
- worker->threaded = threaded;
- if (threaded) {
- zix_thread_create(&worker->thread, 4096, worker_func, worker);
- worker->requests = zix_ring_new(4096);
- zix_ring_mlock(worker->requests);
- }
- worker->responses = zix_ring_new(4096);
- worker->response = malloc(4096);
- zix_ring_mlock(worker->responses);
+ if (worker) {
+ worker->iface = iface;
+ worker->handle = handle;
+ }
}
void
-jalv_worker_finish(JalvWorker* worker)
+jalv_worker_exit(JalvWorker* const worker)
{
- if (worker->threaded) {
- zix_sem_post(&worker->sem);
- zix_thread_join(worker->thread, NULL);
- }
+ if (worker && worker->threaded) {
+ worker->exit = true;
+ zix_sem_post(&worker->sem);
+ zix_thread_join(worker->thread);
+ worker->threaded = false;
+ }
}
void
-jalv_worker_destroy(JalvWorker* worker)
+jalv_worker_free(JalvWorker* const worker)
{
- if (worker->requests) {
- if (worker->threaded) {
- zix_ring_free(worker->requests);
- }
- zix_ring_free(worker->responses);
- free(worker->response);
- }
+ if (worker) {
+ jalv_worker_exit(worker);
+ zix_ring_free(worker->requests);
+ zix_ring_free(worker->responses);
+ free(worker->response);
+ free(worker);
+ }
}
LV2_Worker_Status
jalv_worker_schedule(LV2_Worker_Schedule_Handle handle,
- uint32_t size,
- const void* data)
+ const uint32_t size,
+ const void* const data)
+{
+ JalvWorker* const worker = (JalvWorker*)handle;
+ LV2_Worker_Status st = LV2_WORKER_SUCCESS;
+
+ if (!worker || !size) {
+ return LV2_WORKER_ERR_UNKNOWN;
+ }
+
+ if (worker->threaded) {
+ // Schedule a request to be executed by the worker thread
+ if (!(st = jalv_worker_write_packet(worker->requests, size, data))) {
+ zix_sem_post(&worker->sem);
+ }
+
+ } else {
+ // Execute work immediately in this thread
+ zix_sem_wait(worker->lock);
+ st = worker->iface->work(
+ worker->handle, jalv_worker_respond, worker, size, data);
+ zix_sem_post(worker->lock);
+ }
+
+ return st;
+}
+
+void
+jalv_worker_emit_responses(JalvWorker* const worker, LV2_Handle lv2_handle)
{
- JalvWorker* worker = (JalvWorker*)handle;
- Jalv* jalv = worker->jalv;
- if (worker->threaded) {
- // Schedule a request to be executed by the worker thread
- zix_ring_write(worker->requests, (const char*)&size, sizeof(size));
- zix_ring_write(worker->requests, (const char*)data, size);
- zix_sem_post(&worker->sem);
- } else {
- // Execute work immediately in this thread
- zix_sem_wait(&jalv->work_lock);
- worker->iface->work(
- jalv->instance->lv2_handle, jalv_worker_respond, worker, size, data);
- zix_sem_post(&jalv->work_lock);
- }
- return LV2_WORKER_SUCCESS;
+ static const uint32_t size_size = (uint32_t)sizeof(uint32_t);
+
+ if (worker && worker->responses) {
+ uint32_t size = 0U;
+ while (zix_ring_read(worker->responses, &size, size_size) == size_size) {
+ if (zix_ring_read(worker->responses, worker->response, size) == size) {
+ worker->iface->work_response(lv2_handle, size, worker->response);
+ }
+ }
+ }
}
void
-jalv_worker_emit_responses(JalvWorker* worker, LilvInstance* instance)
+jalv_worker_end_run(JalvWorker* const worker)
{
- if (worker->responses) {
- uint32_t read_space = 0;
- while (read_space = zix_ring_read_space(worker->responses)) {
- uint32_t size = 0;
- if (zix_ring_peek(worker->responses, (char*)&size, sizeof(size)) <= 0) {
- fprintf(stderr, "error: Response buffer corrupted (req %lu avail %u)\n",
- sizeof(size), read_space);
- break;
- }
-
- const uint32_t packet_size = sizeof(size) + size;
- if (read_space < packet_size) {
- fprintf(stderr, "warning: Try to read bigger response (%u) than data available (%u). Retry later.\n",
- packet_size, read_space);
- break;
- }
-
- if (zix_ring_skip(worker->responses, sizeof(size)) <= 0) {
- fprintf(stderr, "error: Response buffer corrupted on skip (req %lu avail %u)\n",
- sizeof(size), read_space);
- break;
- }
-
- if (zix_ring_read(worker->responses, (char*)worker->response, size) <= 0) {
- fprintf(stderr, "error: Response buffer corrupted on read response (req %u avail %u)\n",
- size, zix_ring_read_space(worker->responses));
- break;
- }
-
- worker->iface->work_response(
- instance->lv2_handle, size, worker->response);
- }
- }
+ if (worker && worker->iface && worker->iface->end_run) {
+ worker->iface->end_run(worker->handle);
+ }
}