/*
This file is part of Ingen.
Copyright 2007-2016 David Robillard
Ingen is free software: you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free
Software Foundation, either version 3 of the License, or any later version.
Ingen is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU Affero General Public License for details.
You should have received a copy of the GNU Affero General Public License
along with Ingen. If not, see .
*/
#include "ingen/Forge.hpp"
#include "ingen/Log.hpp"
#include "ingen/URIMap.hpp"
#include "Broadcaster.hpp"
#include "BufferFactory.hpp"
#include "Engine.hpp"
#include "PortImpl.hpp"
#include "RunContext.hpp"
#include "Task.hpp"
namespace Ingen {
namespace Server {
struct Notification
{
inline Notification(PortImpl* p = nullptr,
FrameTime f = 0,
LV2_URID k = 0,
uint32_t s = 0,
LV2_URID t = 0)
: port(p), time(f), key(k), size(s), type(t)
{}
PortImpl* port;
FrameTime time;
LV2_URID key;
uint32_t size;
LV2_URID type;
};
RunContext::RunContext(Engine& engine,
Raul::RingBuffer* event_sink,
unsigned id,
bool threaded)
: _engine(engine)
, _event_sink(event_sink)
, _task(nullptr)
, _thread(threaded ? new std::thread(&RunContext::run, this) : nullptr)
, _id(id)
, _start(0)
, _end(0)
, _offset(0)
, _nframes(0)
, _realtime(true)
{}
RunContext::RunContext(const RunContext& copy)
: _engine(copy._engine)
, _event_sink(copy._event_sink)
, _task(nullptr)
, _thread(nullptr)
, _id(copy._id)
, _start(copy._start)
, _end(copy._end)
, _offset(copy._offset)
, _nframes(copy._nframes)
, _realtime(copy._realtime)
{}
bool
RunContext::must_notify(const PortImpl* port) const
{
return (port->is_monitored() || _engine.broadcaster()->must_broadcast());
}
bool
RunContext::notify(LV2_URID key,
FrameTime time,
PortImpl* port,
uint32_t size,
LV2_URID type,
const void* body)
{
const Notification n(port, time, key, size, type);
if (_event_sink->write_space() < sizeof(n) + size) {
return false;
}
if (_event_sink->write(sizeof(n), &n) != sizeof(n)) {
_engine.log().rt_error("Error writing header to notification ring\n");
} else if (_event_sink->write(size, body) != size) {
_engine.log().rt_error("Error writing body to notification ring\n");
} else {
return true;
}
return false;
}
void
RunContext::emit_notifications(FrameTime end)
{
const URIs& uris = _engine.buffer_factory()->uris();
const uint32_t read_space = _event_sink->read_space();
Notification note;
for (uint32_t i = 0; i < read_space; i += sizeof(note)) {
if (_event_sink->peek(sizeof(note), ¬e) != sizeof(note) ||
note.time >= end) {
return;
}
if (_event_sink->read(sizeof(note), ¬e) == sizeof(note)) {
Atom value = _engine.world()->forge().alloc(
note.size, note.type, nullptr);
if (_event_sink->read(note.size, value.get_body()) == note.size) {
i += note.size;
const char* key = _engine.world()->uri_map().unmap_uri(note.key);
if (key) {
_engine.broadcaster()->set_property(
note.port->uri(), URI(key), value);
if (note.port->is_input() &&
(note.key == uris.ingen_value ||
note.key == uris.midi_binding)) {
// FIXME: not thread safe
note.port->set_property(URI(key), value);
}
} else {
_engine.log().rt_error("Error unmapping notification key URI\n");
}
} else {
_engine.log().rt_error("Error reading body from notification ring\n");
}
} else {
_engine.log().rt_error("Error reading header from notification ring\n");
}
}
}
void
RunContext::claim_task(ParTask* task)
{
if (task) {
if (!_task) {
_task = task;
_engine.signal_tasks_available();
}
} else {
_task = nullptr;
}
}
Job
RunContext::steal_task() const
{
return _engine.steal_task(_id + 1);
}
void
RunContext::set_priority(int priority)
{
if (_thread) {
pthread_t pthread = _thread->native_handle();
const int policy = (priority > 0) ? SCHED_FIFO : SCHED_OTHER;
sched_param sp;
sp.sched_priority = (priority > 0) ? priority : 0;
if (pthread_setschedparam(pthread, policy, &sp)) {
_engine.log().error(
fmt("Failed to set real-time priority of run thread (%s)\n")
% strerror(errno));
}
}
}
void
RunContext::join()
{
if (_thread) {
if (_thread->joinable()) {
_thread->join();
}
delete _thread;
}
}
void
RunContext::run()
{
while (_engine.wait_for_tasks()) {
for (Job job; (job = _engine.steal_task(0)).task;) {
// fprintf(stderr, "%d run\n", id());
Server::run(job, *this);
// t->run(*this);
}
}
}
} // namespace Server
} // namespace Ingen