summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/engine/AudioBuffer.cpp26
-rw-r--r--src/engine/AudioBuffer.hpp32
-rw-r--r--src/engine/ConnectionImpl.cpp95
-rw-r--r--src/engine/ConnectionImpl.hpp18
-rw-r--r--src/engine/DuplexPort.cpp2
-rw-r--r--src/engine/DuplexPort.hpp2
-rw-r--r--src/engine/InputPort.cpp81
-rw-r--r--src/engine/InputPort.hpp5
-rw-r--r--src/engine/OutputPort.cpp4
-rw-r--r--src/engine/OutputPort.hpp2
-rw-r--r--src/engine/PatchImpl.cpp10
-rw-r--r--src/engine/PortImpl.cpp11
-rw-r--r--src/engine/PortImpl.hpp4
-rw-r--r--src/engine/events/Connect.cpp12
-rw-r--r--src/engine/events/Disconnect.cpp4
15 files changed, 123 insertions, 185 deletions
diff --git a/src/engine/AudioBuffer.cpp b/src/engine/AudioBuffer.cpp
index 145df559..e29fa5c4 100644
--- a/src/engine/AudioBuffer.cpp
+++ b/src/engine/AudioBuffer.cpp
@@ -194,32 +194,6 @@ AudioBuffer::copy(Context& context, const Buffer* src)
}
-/** Accumulate a block of @a src into buffer.
- *
- * @a start_sample and @a end_sample define the inclusive range to be accumulated.
- * This function only adds the same range in one buffer to another.
- */
-void
-AudioBuffer::accumulate(Context& context, const AudioBuffer* const src)
-{
- Sample* const buf = data();
- const Sample* const src_buf = src->data();
-
- const size_t frames = std::min(nframes(), src->nframes());
- assert(frames != 0);
-
- // Mix initial portions
- SampleCount i = 0;
- for (; i < frames; ++i)
- buf[i] += src_buf[i];
-
- // Extend/Mix the final sample of src if it is shorter
- const Sample last = src_buf[i - 1];
- while (i < nframes())
- buf[i++] += last;
-}
-
-
void
AudioBuffer::prepare_read(Context& context)
{
diff --git a/src/engine/AudioBuffer.hpp b/src/engine/AudioBuffer.hpp
index cfac3d59..7edfdef3 100644
--- a/src/engine/AudioBuffer.hpp
+++ b/src/engine/AudioBuffer.hpp
@@ -23,6 +23,7 @@
#include <boost/utility.hpp>
#include "types.hpp"
#include "ObjectBuffer.hpp"
+#include "Context.hpp"
using namespace std;
@@ -42,7 +43,7 @@ public:
void copy(Context& context, const Buffer* src);
void accumulate(Context& context, const AudioBuffer* src);
- bool is_control() const { return _type.symbol() == Shared::PortType::CONTROL; }
+ inline bool is_control() const { return _type.symbol() == Shared::PortType::CONTROL; }
inline Sample* data() const {
return (is_control())
@@ -75,6 +76,35 @@ private:
};
+/** Accumulate a block of @a src into buffer.
+ */
+inline void
+AudioBuffer::accumulate(Context& context, const AudioBuffer* const src)
+{
+ Sample* const buf = data();
+ const Sample* const src_buf = src->data();
+
+ if (is_control()) {
+ if (src->is_control()) { // control => control
+ buf[0] += src_buf[0];
+ } else { // audio => control
+ buf[0] += src_buf[context.offset()];
+ }
+ } else {
+ const SampleCount end = context.offset() + context.nframes();
+ if (src->is_control()) { // control => audio
+ for (SampleCount i = context.offset(); i < end; ++i) {
+ buf[i] += src_buf[0];
+ }
+ } else { // audio => audio
+ for (SampleCount i = context.offset(); i < end; ++i) {
+ buf[i] += src_buf[i];
+ }
+ }
+ }
+}
+
+
} // namespace Ingen
#endif // INGEN_ENGINE_AUDIOBUFFER_HPP
diff --git a/src/engine/ConnectionImpl.cpp b/src/engine/ConnectionImpl.cpp
index d3ec6cc7..c6ccf3ba 100644
--- a/src/engine/ConnectionImpl.cpp
+++ b/src/engine/ConnectionImpl.cpp
@@ -55,9 +55,6 @@ ConnectionImpl::ConnectionImpl(BufferFactory& bufs, PortImpl* src_port, PortImpl
assert(src_port != dst_port);
assert(src_port->path() != dst_port->path());
- if (must_mix() || must_queue())
- _local_buffer = bufs.get(dst_port->buffer_type(), dst_port->buffer_size(), true);
-
if (must_queue())
_queue = new Raul::RingBuffer<LV2_Object>(src_port->buffer_size() * 2);
}
@@ -74,82 +71,23 @@ ConnectionImpl::dump() const
void
-ConnectionImpl::update_buffer_size(Context& context, BufferFactory& bufs)
-{
- if (must_mix() || must_queue())
- allocate_buffer(bufs);
-}
-
-
-void
-ConnectionImpl::allocate_buffer(BufferFactory& bufs)
-{
- if (!_local_buffer)
- _local_buffer = bufs.get(_dst_port->buffer_type(), _dst_port->buffer_size());
-}
-
-
-void
-ConnectionImpl::prepare_poly(BufferFactory& bufs, uint32_t poly)
-{
- ThreadManager::assert_thread(THREAD_PRE_PROCESS);
-
- assert(_src_port->prepared_poly() == _dst_port->prepared_poly()
- || _src_port->prepared_poly() == 1
- || _dst_port->prepared_poly() == 1);
-
- const bool mix = _src_port->prepared_poly() > _dst_port->prepared_poly();
- if ((mix || must_queue()) && !_local_buffer)
- _local_buffer = bufs.get(_dst_port->buffer_type(), _dst_port->buffer(0)->size());
-}
-
-
-void
-ConnectionImpl::apply_poly(Raul::Maid& maid, uint32_t poly)
+ConnectionImpl::get_sources(Context& context, uint32_t voice,
+ Buffer** srcs, uint32_t max_num_srcs, uint32_t& num_srcs)
{
- ThreadManager::assert_thread(THREAD_PROCESS);
-
- assert(_src_port->poly() == _dst_port->poly()
- || _src_port->poly() == 1
- || _dst_port->poly() == 1);
-
- // Recycle buffer if it's no longer needed
- if (!(must_mix() || must_queue()))
- _local_buffer = NULL;
-}
-
-
-void
-ConnectionImpl::process(Context& context)
-{
- if (must_queue()) {
- IntrusivePtr<EventBuffer> src_buf = PtrCast<EventBuffer>(_src_port->buffer(0));
- if (!src_buf) {
- error << "Queued connection but source is not an EventBuffer" << endl;
- return;
- }
-
- IntrusivePtr<ObjectBuffer> local_buf = PtrCast<ObjectBuffer>(_local_buffer);
- if (!local_buf) {
- error << "Queued connection but local buffer is not an ObjectBuffer" << endl;
- return;
- }
-
- local_buf->clear();
+ if (must_queue())
+ return;
- if (_queue->read_space()) {
- LV2_Object obj;
- _queue->full_peek(sizeof(LV2_Object), &obj);
- _queue->full_read(sizeof(LV2_Object) + obj.size, local_buf->object());
+ if (must_mix()) {
+ // Mixing down voices: every src voice mixed into every dst voice
+ for (uint32_t v = 0; v < _src_port->poly(); ++v) {
+ assert(num_srcs < max_num_srcs);
+ srcs[num_srcs++] = _src_port->buffer(v).get();
}
-
- } else if (must_mix()) {
- const uint32_t num_srcs = src_port()->poly();
- Buffer* srcs[num_srcs];
- for (uint32_t v = 0; v < num_srcs; ++v)
- srcs[v] = src_port()->buffer(v).get();
-
- mix(context, _local_buffer.get(), srcs, num_srcs);
+ } else {
+ // Matching polyphony: each src voice mixed into corresponding dst voice
+ assert(_src_port->poly() == _dst_port->poly());
+ assert(num_srcs < max_num_srcs);
+ srcs[num_srcs++] = _src_port->buffer(voice).get();
}
}
@@ -166,8 +104,7 @@ ConnectionImpl::queue(Context& context)
return;
}
- src_buf->rewind();
- while (src_buf->is_valid()) {
+ for (src_buf->rewind(); src_buf->is_valid(); src_buf->increment()) {
LV2_Event* ev = src_buf->get_event();
LV2_Object* obj = LV2_OBJECT_FROM_EVENT(ev);
/*debug << _src_port->path() << " -> " << _dst_port->path()
@@ -177,8 +114,6 @@ ConnectionImpl::queue(Context& context)
debug << endl;*/
_queue->write(sizeof(LV2_Object) + obj->size, obj);
- src_buf->increment();
-
context.engine().message_context()->run(_dst_port, context.start() + ev->frames);
}
}
diff --git a/src/engine/ConnectionImpl.hpp b/src/engine/ConnectionImpl.hpp
index 232d8033..2725dd83 100644
--- a/src/engine/ConnectionImpl.hpp
+++ b/src/engine/ConnectionImpl.hpp
@@ -20,6 +20,7 @@
#include <cstdlib>
#include <boost/utility.hpp>
+#include "raul/log.hpp"
#include "raul/Deletable.hpp"
#include "interface/PortType.hpp"
#include "interface/Connection.hpp"
@@ -63,11 +64,10 @@ public:
bool pending_disconnection() { return _pending_disconnection; }
void pending_disconnection(bool b) { _pending_disconnection = b; }
- void process(Context& context);
void queue(Context& context);
- void allocate_buffer(BufferFactory& bufs);
- void recycle_buffer() { _local_buffer = NULL; }
+ void get_sources(Context& context, uint32_t voice,
+ Buffer** srcs, uint32_t max_num_srcs, uint32_t& num_srcs);
/** Get the buffer for a particular voice.
* A Connection is smart - it knows the destination port requesting the
@@ -75,19 +75,16 @@ public:
* voice in a mono->poly connection).
*/
inline BufferFactory::Ref buffer(uint32_t voice) const {
- if (must_mix() || must_queue()) {
- return _local_buffer;
- } else if (_src_port->poly() == 1) {
+ assert(!must_mix());
+ assert(!must_queue());
+ assert(_src_port->poly() == 1 || _src_port->poly() > voice);
+ if (_src_port->poly() == 1) {
return _src_port->buffer(0);
} else {
return _src_port->buffer(voice);
}
}
- void update_buffer_size(Context& context, BufferFactory& bufs);
- void prepare_poly(BufferFactory& bufs, uint32_t poly);
- void apply_poly(Raul::Maid& maid, uint32_t poly);
-
/** Returns true if this connection must mix down voices into a local buffer */
inline bool must_mix() const { return _src_port->poly() > _dst_port->poly(); }
@@ -104,7 +101,6 @@ protected:
BufferFactory& _bufs;
PortImpl* const _src_port;
PortImpl* const _dst_port;
- BufferFactory::Ref _local_buffer;
bool _pending_disconnection;
};
diff --git a/src/engine/DuplexPort.cpp b/src/engine/DuplexPort.cpp
index 1e23b5b5..a78da58a 100644
--- a/src/engine/DuplexPort.cpp
+++ b/src/engine/DuplexPort.cpp
@@ -54,7 +54,7 @@ DuplexPort::DuplexPort(
}
-void
+bool
DuplexPort::get_buffers(BufferFactory& bufs, Raul::Array<BufferFactory::Ref>* buffers, uint32_t poly)
{
if (_is_output)
diff --git a/src/engine/DuplexPort.hpp b/src/engine/DuplexPort.hpp
index aab33c33..ff375edd 100644
--- a/src/engine/DuplexPort.hpp
+++ b/src/engine/DuplexPort.hpp
@@ -52,7 +52,7 @@ public:
virtual ~DuplexPort() {}
- void get_buffers(BufferFactory& bufs, Raul::Array<BufferFactory::Ref>* buffers, uint32_t poly);
+ bool get_buffers(BufferFactory& bufs, Raul::Array<BufferFactory::Ref>* buffers, uint32_t poly);
void pre_process(Context& context);
void post_process(Context& context);
diff --git a/src/engine/InputPort.cpp b/src/engine/InputPort.cpp
index d5051865..4c5b56e2 100644
--- a/src/engine/InputPort.cpp
+++ b/src/engine/InputPort.cpp
@@ -75,19 +75,10 @@ InputPort::apply_poly(Maid& maid, uint32_t poly)
}
-void
-InputPort::set_buffer_size(Context& context, BufferFactory& bufs, size_t size)
-{
- PortImpl::set_buffer_size(context, bufs, size);
-
- for (Connections::iterator c = _connections.begin(); c != _connections.end(); ++c)
- (*c)->update_buffer_size(context, bufs);
-}
-
-
/** Set \a buffers appropriately if this port has \a num_connections connections.
+ * \return true iff buffers are locally owned by the port
*/
-void
+bool
InputPort::get_buffers(BufferFactory& bufs, Raul::Array<BufferFactory::Ref>* buffers, uint32_t poly)
{
size_t num_connections = (ThreadManager::current_thread_id() == THREAD_PROCESS)
@@ -97,26 +88,25 @@ InputPort::get_buffers(BufferFactory& bufs, Raul::Array<BufferFactory::Ref>* buf
// Audio input with no connections, use shared zero buffer
for (uint32_t v = 0; v < poly; ++v)
buffers->at(v) = bufs.silent_buffer();
+ return false;
} else if (num_connections == 1) {
if (ThreadManager::current_thread_id() == THREAD_PROCESS) {
- // Single connection, use it directly
- for (uint32_t v = 0; v < poly; ++v)
- buffers->at(v) = _connections.front()->buffer(v);
- } else {
- // Not in the process thread, will be set later
- for (uint32_t v = 0; v < poly; ++v)
- buffers->at(v) = NULL;
+ if (!_connections.front()->must_mix()) {
+ // Single non-mixing conneciton, use buffers directly
+ for (uint32_t v = 0; v < poly; ++v)
+ buffers->at(v) = _connections.front()->buffer(v);
+ return false;
+ }
}
+ }
- } else {
- // Use local buffers
- for (uint32_t v = 0; v < poly; ++v) {
- buffers->at(v) = NULL; // Release first (potential immediate recycling)
- buffers->at(v) = _bufs.get(buffer_type(), _buffer_size);
- buffers->at(v)->clear();
- }
+ // Otherwise, allocate local buffers
+ for (uint32_t v = 0; v < poly; ++v) {
+ buffers->at(v) = _bufs.get(buffer_type(), _buffer_size);
+ buffers->at(v)->clear();
}
+ return true;
}
@@ -186,31 +176,32 @@ InputPort::pre_process(Context& context)
if (_set_by_user)
return;
- // Process connections (mix down polyphony, if necessary)
+ uint32_t max_num_srcs = 0;
for (Connections::iterator c = _connections.begin(); c != _connections.end(); ++c)
- (*c)->process(context);
+ max_num_srcs += (*c)->src_port()->poly();
- // Multiple connections, mix them all into our local buffers
- if (_connections.size() > 1) {
- for (uint32_t v = 0; v < _poly; ++v)
- buffer(v)->prepare_write(context);
+ Buffer* srcs[max_num_srcs];
+ if (_connections.empty()) {
for (uint32_t v = 0; v < _poly; ++v) {
- const uint32_t num_srcs = _connections.size();
- Connections::iterator c = _connections.begin();
- Buffer* srcs[num_srcs];
- for (uint32_t i = 0; c != _connections.end(); ++c) {
- assert(i < num_srcs);
- srcs[i++] = (*c)->buffer(v).get();
- }
+ buffer(v)->prepare_read(context);
+ }
+ } else if (direct_connect()) {
+ for (uint32_t v = 0; v < _poly; ++v) {
+ _buffers->at(v) = _connections.front()->buffer(v);
+ _buffers->at(v)->prepare_read(context);
+ }
+ } else {
+ for (uint32_t v = 0; v < _poly; ++v) {
+ uint32_t num_srcs = 0;
+ for (Connections::iterator c = _connections.begin(); c != _connections.end(); ++c)
+ (*c)->get_sources(context, v, srcs, max_num_srcs, num_srcs);
mix(context, buffer(v).get(), srcs, num_srcs);
+ buffer(v)->prepare_read(context);
}
}
- for (uint32_t v = 0; v < _poly; ++v)
- buffer(v)->prepare_read(context);
-
if (_broadcast)
broadcast_value(context, false);
}
@@ -223,5 +214,13 @@ InputPort::post_process(Context& context)
}
+bool
+InputPort::direct_connect() const
+{
+ ThreadManager::assert_thread(THREAD_PROCESS);
+ return _connections.size() == 1 && !_connections.front()->must_mix();
+}
+
+
} // namespace Ingen
diff --git a/src/engine/InputPort.hpp b/src/engine/InputPort.hpp
index 44f82370..40282dee 100644
--- a/src/engine/InputPort.hpp
+++ b/src/engine/InputPort.hpp
@@ -65,9 +65,8 @@ public:
Connections::Node* remove_connection(ProcessContext& context, const OutputPort* src_port);
bool apply_poly(Raul::Maid& maid, uint32_t poly);
- void set_buffer_size(Context& context, BufferFactory& bufs, size_t size);
- void get_buffers(BufferFactory& bufs, Raul::Array<BufferFactory::Ref>* buffers, uint32_t poly);
+ bool get_buffers(BufferFactory& bufs, Raul::Array<BufferFactory::Ref>* buffers, uint32_t poly);
void pre_process(Context& context);
void post_process(Context& context);
@@ -79,6 +78,8 @@ public:
bool is_input() const { return true; }
bool is_output() const { return false; }
+ bool direct_connect() const;
+
protected:
size_t _num_connections; ///< Pre-process thread
Connections _connections;
diff --git a/src/engine/OutputPort.cpp b/src/engine/OutputPort.cpp
index c2381cff..5509b655 100644
--- a/src/engine/OutputPort.cpp
+++ b/src/engine/OutputPort.cpp
@@ -51,11 +51,13 @@ OutputPort::OutputPort(BufferFactory& bufs,
}
-void
+bool
OutputPort::get_buffers(BufferFactory& bufs, Raul::Array<BufferFactory::Ref>* buffers, uint32_t poly)
{
for (uint32_t v = 0; v < poly; ++v)
buffers->at(v) = bufs.get(buffer_type(), _buffer_size);
+
+ return true;
}
diff --git a/src/engine/OutputPort.hpp b/src/engine/OutputPort.hpp
index b143ae32..fde0f861 100644
--- a/src/engine/OutputPort.hpp
+++ b/src/engine/OutputPort.hpp
@@ -48,7 +48,7 @@ public:
const Raul::Atom& value,
size_t buffer_size=0);
- void get_buffers(BufferFactory& bufs, Raul::Array<BufferFactory::Ref>* buffers, uint32_t poly);
+ bool get_buffers(BufferFactory& bufs, Raul::Array<BufferFactory::Ref>* buffers, uint32_t poly);
void pre_process(Context& context);
void post_process(Context& context);
diff --git a/src/engine/PatchImpl.cpp b/src/engine/PatchImpl.cpp
index 309ac3f8..c5327e44 100644
--- a/src/engine/PatchImpl.cpp
+++ b/src/engine/PatchImpl.cpp
@@ -111,8 +111,9 @@ PatchImpl::prepare_internal_poly(BufferFactory& bufs, uint32_t poly)
for (List<NodeImpl*>::iterator i = _nodes.begin(); i != _nodes.end(); ++i)
(*i)->prepare_poly(bufs, poly);
- for (Connections::iterator i = _connections.begin(); i != _connections.end(); ++i)
- ((ConnectionImpl*)i->second.get())->prepare_poly(bufs, poly);
+ for (List<NodeImpl*>::iterator i = _nodes.begin(); i != _nodes.end(); ++i)
+ for (uint32_t j = 0; j < (*i)->num_ports(); ++j)
+ (*i)->port_impl(j)->prepare_poly_buffers(bufs);
return true;
}
@@ -128,13 +129,10 @@ PatchImpl::apply_internal_poly(ProcessContext& context, BufferFactory& bufs, Rau
for (List<NodeImpl*>::iterator i = _nodes.begin(); i != _nodes.end(); ++i)
(*i)->apply_poly(maid, poly);
- for (Connections::iterator i = _connections.begin(); i != _connections.end(); ++i)
- ((ConnectionImpl*)i->second.get())->apply_poly(maid, poly);
-
for (List<NodeImpl*>::iterator i = _nodes.begin(); i != _nodes.end(); ++i) {
for (uint32_t j = 0; j < (*i)->num_ports(); ++j) {
PortImpl* const port = (*i)->port_impl(j);
- if (port->is_input() && dynamic_cast<InputPort*>(port)->num_connections() == 1)
+ if (port->is_input() && dynamic_cast<InputPort*>(port)->direct_connect())
port->setup_buffers(bufs, port->poly());
port->connect_buffers(context.offset());
}
diff --git a/src/engine/PortImpl.cpp b/src/engine/PortImpl.cpp
index a1a20ba8..7abfb90d 100644
--- a/src/engine/PortImpl.cpp
+++ b/src/engine/PortImpl.cpp
@@ -127,13 +127,18 @@ PortImpl::prepare_poly(BufferFactory& bufs, uint32_t poly)
if (!_prepared_buffers)
_prepared_buffers = new Array<BufferFactory::Ref>(poly, *_buffers, NULL);
- get_buffers(bufs, _prepared_buffers, poly);
- assert(prepared_poly() == poly);
-
return true;
}
+void
+PortImpl::prepare_poly_buffers(BufferFactory& bufs)
+{
+ if (_prepared_buffers)
+ get_buffers(bufs, _prepared_buffers, _prepared_buffers->size());
+}
+
+
bool
PortImpl::apply_poly(Maid& maid, uint32_t poly)
{
diff --git a/src/engine/PortImpl.hpp b/src/engine/PortImpl.hpp
index 637336af..cb458d01 100644
--- a/src/engine/PortImpl.hpp
+++ b/src/engine/PortImpl.hpp
@@ -68,6 +68,8 @@ public:
*/
virtual bool prepare_poly(BufferFactory& bufs, uint32_t poly);
+ virtual void prepare_poly_buffers(BufferFactory& bufs);
+
/** Apply a new polyphony value.
*
* Audio thread.
@@ -92,7 +94,7 @@ public:
/** Empty buffer contents completely (ie silence) */
virtual void clear_buffers();
- virtual void get_buffers(BufferFactory& bufs, Raul::Array<BufferFactory::Ref>* buffers, uint32_t poly) = 0;
+ virtual bool get_buffers(BufferFactory& bufs, Raul::Array<BufferFactory::Ref>* buffers, uint32_t poly) = 0;
void setup_buffers(BufferFactory& bufs, uint32_t poly) {
get_buffers(bufs, _buffers, poly);
diff --git a/src/engine/events/Connect.cpp b/src/engine/events/Connect.cpp
index 5b7726d9..5eefb181 100644
--- a/src/engine/events/Connect.cpp
+++ b/src/engine/events/Connect.cpp
@@ -135,15 +135,11 @@ Connect::pre_process()
_patch->add_connection(_connection);
_dst_input_port->increment_num_connections();
- switch (_dst_input_port->num_connections()) {
- case 1:
- _connection->allocate_buffer(*_engine.buffer_factory());
- break;
- case 2:
+ if ((_dst_input_port->num_connections() == 1 && _connection->must_mix())
+ || _dst_input_port->num_connections() == 2) {
_buffers = new Raul::Array<BufferFactory::Ref>(_dst_input_port->poly());
- _dst_input_port->get_buffers(*_engine.buffer_factory(), _buffers, _dst_input_port->poly());
- default:
- break;
+ _dst_input_port->get_buffers(*_engine.buffer_factory(),
+ _buffers, _dst_input_port->poly());
}
if (_patch->enabled())
diff --git a/src/engine/events/Disconnect.cpp b/src/engine/events/Disconnect.cpp
index f25c0f4a..443cc512 100644
--- a/src/engine/events/Disconnect.cpp
+++ b/src/engine/events/Disconnect.cpp
@@ -161,7 +161,8 @@ Disconnect::pre_process()
if (_dst_input_port->num_connections() == 0) {
_buffers = new Raul::Array<BufferFactory::Ref>(_dst_input_port->poly());
- _dst_input_port->get_buffers(*_engine.buffer_factory(), _buffers, _dst_input_port->poly());
+ _dst_input_port->get_buffers(*_engine.buffer_factory(),
+ _buffers, _dst_input_port->poly());
_clear_dst_port = true;
}
@@ -181,7 +182,6 @@ Disconnect::execute(ProcessContext& context)
if (_error == NO_ERROR) {
InputPort::Connections::Node* const port_connections_node
= _dst_input_port->remove_connection(context, _src_output_port);
- port_connections_node->elem()->recycle_buffer();
if (_reconnect_dst_port) {
if (_buffers)
_engine.maid()->push(_dst_input_port->set_buffers(_buffers));