summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorWim Taymans <wim.taymans@gmail.com>2007-04-18 18:58:53 +0000
committerWim Taymans <wim.taymans@gmail.com>2007-04-18 18:58:53 +0000
commit1d75a69ccf4b4ff63037cf5b4ddf9491dad7ca4b (patch)
tree16fbb02702429169c282584cb66cd92a9045ac49
parent6cbfc31aaeff594d4c092e9200f5b6fc5c907d17 (diff)
downloadgst-plugins-bad-1d75a69ccf4b4ff63037cf5b4ddf9491dad7ca4b.tar.gz
gst-plugins-bad-1d75a69ccf4b4ff63037cf5b4ddf9491dad7ca4b.tar.bz2
gst-plugins-bad-1d75a69ccf4b4ff63037cf5b4ddf9491dad7ca4b.zip
configure.ac: Disable rtpmanager for now because it depends on CVS -base.
Original commit message from CVS: * configure.ac: Disable rtpmanager for now because it depends on CVS -base. * gst/rtpmanager/Makefile.am: Added new files for session manager. * gst/rtpmanager/gstrtpjitterbuffer.h: * gst/rtpmanager/gstrtpbin.c: (create_session), (get_pt_map), (create_stream), (pt_map_requested), (new_ssrc_pad_found): Some cleanups. the session manager can now also request a pt-map. * gst/rtpmanager/gstrtpsession.c: (gst_rtp_session_base_init), (gst_rtp_session_class_init), (gst_rtp_session_init), (gst_rtp_session_finalize), (rtcp_thread), (start_rtcp_thread), (stop_rtcp_thread), (gst_rtp_session_change_state), (gst_rtp_session_process_rtp), (gst_rtp_session_send_rtp), (gst_rtp_session_send_rtcp), (gst_rtp_session_clock_rate), (gst_rtp_session_get_time), (gst_rtp_session_event_recv_rtp_sink), (gst_rtp_session_chain_recv_rtp), (gst_rtp_session_event_recv_rtcp_sink), (gst_rtp_session_chain_recv_rtcp), (gst_rtp_session_event_send_rtp_sink), (gst_rtp_session_chain_send_rtp), (create_send_rtcp_src), (gst_rtp_session_request_new_pad): * gst/rtpmanager/gstrtpsession.h: We can ask for pt-map now too when the session manager needs it. Hook up to the new session manager, implement the needed callbacks for pushing data, getting clock time and requesting clock-rates. Rename rtcp_src to send_rtcp_src to make it clear that this RTCP is to be send to clients. Add code to start and stop the thread that will schedule RTCP through the session manager. * gst/rtpmanager/rtpsession.c: (rtp_session_class_init), (rtp_session_init), (rtp_session_finalize), (rtp_session_set_property), (rtp_session_get_property), (on_new_ssrc), (on_ssrc_collision), (on_ssrc_validated), (on_bye_ssrc), (rtp_session_new), (rtp_session_set_callbacks), (rtp_session_set_bandwidth), (rtp_session_get_bandwidth), (rtp_session_set_rtcp_bandwidth), (rtp_session_get_rtcp_bandwidth), (source_push_rtp), (source_clock_rate), (check_collision), (obtain_source), (rtp_session_add_source), (rtp_session_get_num_sources), (rtp_session_get_num_active_sources), (rtp_session_get_source_by_ssrc), (rtp_session_get_source_by_cname), (rtp_session_create_source), (update_arrival_stats), (rtp_session_process_rtp), (rtp_session_process_sr), (rtp_session_process_rr), (rtp_session_process_sdes), (rtp_session_process_bye), (rtp_session_process_app), (rtp_session_process_rtcp), (rtp_session_send_rtp), (rtp_session_get_rtcp_interval), (rtp_session_produce_rtcp): * gst/rtpmanager/rtpsession.h: The advanced beginnings of the main session manager that handles the participant database of RTPSources, SSRC probation, SSRC collisions, parse RTCP to update source stats. etc.. * gst/rtpmanager/rtpsource.c: (rtp_source_class_init), (rtp_source_init), (rtp_source_finalize), (rtp_source_new), (rtp_source_set_callbacks), (rtp_source_set_as_csrc), (rtp_source_set_rtp_from), (rtp_source_set_rtcp_from), (push_packet), (get_clock_rate), (calculate_jitter), (rtp_source_process_rtp), (rtp_source_process_bye), (rtp_source_send_rtp), (rtp_source_process_sr), (rtp_source_process_rb): * gst/rtpmanager/rtpsource.h: Object that encapsulates an SSRC and its state in the database. Calculates the jitter and transit times of data packets. * gst/rtpmanager/rtpstats.c: (rtp_stats_init_defaults), (rtp_stats_calculate_rtcp_interval), (rtp_stats_add_rtcp_jitter): * gst/rtpmanager/rtpstats.h: Various stats regarding the session and sources. Used to calculate the RTCP interval.
-rw-r--r--ChangeLog78
m---------common0
-rw-r--r--configure.ac1
-rw-r--r--gst/rtpmanager/Makefile.am9
-rw-r--r--gst/rtpmanager/gstrtpbin.c28
-rw-r--r--gst/rtpmanager/gstrtpjitterbuffer.h1
-rw-r--r--gst/rtpmanager/gstrtpsession.c325
-rw-r--r--gst/rtpmanager/gstrtpsession.h6
-rw-r--r--gst/rtpmanager/rtpsession.c1026
-rw-r--r--gst/rtpmanager/rtpsession.h206
-rw-r--r--gst/rtpmanager/rtpsource.c477
-rw-r--r--gst/rtpmanager/rtpsource.h162
-rw-r--r--gst/rtpmanager/rtpstats.c111
-rw-r--r--gst/rtpmanager/rtpstats.h161
14 files changed, 2555 insertions, 36 deletions
diff --git a/ChangeLog b/ChangeLog
index 8dddf1d2..c94887c1 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,81 @@
+2007-04-18 Wim Taymans <wim@fluendo.com>
+
+ * configure.ac:
+ Disable rtpmanager for now because it depends on CVS -base.
+
+ * gst/rtpmanager/Makefile.am:
+ Added new files for session manager.
+
+ * gst/rtpmanager/gstrtpjitterbuffer.h:
+ * gst/rtpmanager/gstrtpbin.c: (create_session), (get_pt_map),
+ (create_stream), (pt_map_requested), (new_ssrc_pad_found):
+ Some cleanups.
+ the session manager can now also request a pt-map.
+
+ * gst/rtpmanager/gstrtpsession.c: (gst_rtp_session_base_init),
+ (gst_rtp_session_class_init), (gst_rtp_session_init),
+ (gst_rtp_session_finalize), (rtcp_thread), (start_rtcp_thread),
+ (stop_rtcp_thread), (gst_rtp_session_change_state),
+ (gst_rtp_session_process_rtp), (gst_rtp_session_send_rtp),
+ (gst_rtp_session_send_rtcp), (gst_rtp_session_clock_rate),
+ (gst_rtp_session_get_time), (gst_rtp_session_event_recv_rtp_sink),
+ (gst_rtp_session_chain_recv_rtp),
+ (gst_rtp_session_event_recv_rtcp_sink),
+ (gst_rtp_session_chain_recv_rtcp),
+ (gst_rtp_session_event_send_rtp_sink),
+ (gst_rtp_session_chain_send_rtp), (create_send_rtcp_src),
+ (gst_rtp_session_request_new_pad):
+ * gst/rtpmanager/gstrtpsession.h:
+ We can ask for pt-map now too when the session manager needs it.
+ Hook up to the new session manager, implement the needed callbacks for
+ pushing data, getting clock time and requesting clock-rates.
+ Rename rtcp_src to send_rtcp_src to make it clear that this RTCP is to
+ be send to clients.
+ Add code to start and stop the thread that will schedule RTCP through
+ the session manager.
+
+ * gst/rtpmanager/rtpsession.c: (rtp_session_class_init),
+ (rtp_session_init), (rtp_session_finalize),
+ (rtp_session_set_property), (rtp_session_get_property),
+ (on_new_ssrc), (on_ssrc_collision), (on_ssrc_validated),
+ (on_bye_ssrc), (rtp_session_new), (rtp_session_set_callbacks),
+ (rtp_session_set_bandwidth), (rtp_session_get_bandwidth),
+ (rtp_session_set_rtcp_bandwidth), (rtp_session_get_rtcp_bandwidth),
+ (source_push_rtp), (source_clock_rate), (check_collision),
+ (obtain_source), (rtp_session_add_source),
+ (rtp_session_get_num_sources),
+ (rtp_session_get_num_active_sources),
+ (rtp_session_get_source_by_ssrc),
+ (rtp_session_get_source_by_cname), (rtp_session_create_source),
+ (update_arrival_stats), (rtp_session_process_rtp),
+ (rtp_session_process_sr), (rtp_session_process_rr),
+ (rtp_session_process_sdes), (rtp_session_process_bye),
+ (rtp_session_process_app), (rtp_session_process_rtcp),
+ (rtp_session_send_rtp), (rtp_session_get_rtcp_interval),
+ (rtp_session_produce_rtcp):
+ * gst/rtpmanager/rtpsession.h:
+ The advanced beginnings of the main session manager that handles the
+ participant database of RTPSources, SSRC probation, SSRC collisions,
+ parse RTCP to update source stats. etc..
+
+ * gst/rtpmanager/rtpsource.c: (rtp_source_class_init),
+ (rtp_source_init), (rtp_source_finalize), (rtp_source_new),
+ (rtp_source_set_callbacks), (rtp_source_set_as_csrc),
+ (rtp_source_set_rtp_from), (rtp_source_set_rtcp_from),
+ (push_packet), (get_clock_rate), (calculate_jitter),
+ (rtp_source_process_rtp), (rtp_source_process_bye),
+ (rtp_source_send_rtp), (rtp_source_process_sr),
+ (rtp_source_process_rb):
+ * gst/rtpmanager/rtpsource.h:
+ Object that encapsulates an SSRC and its state in the database.
+ Calculates the jitter and transit times of data packets.
+
+ * gst/rtpmanager/rtpstats.c: (rtp_stats_init_defaults),
+ (rtp_stats_calculate_rtcp_interval), (rtp_stats_add_rtcp_jitter):
+ * gst/rtpmanager/rtpstats.h:
+ Various stats regarding the session and sources.
+ Used to calculate the RTCP interval.
+
2007-04-17 Tim-Philipp Müller <tim at centricular dot net>
* gst/app/Makefile.am:
diff --git a/common b/common
-Subproject 9097e252e477e18182f08a032d8860bdee9a041
+Subproject e05f45f13961b851501ca8938aa2049fa96c7b1
diff --git a/configure.ac b/configure.ac
index f8499a4d..97f46b52 100644
--- a/configure.ac
+++ b/configure.ac
@@ -95,7 +95,6 @@ GST_PLUGINS_ALL="\
nuvdemux \
real \
replaygain \
- rtpmanager \
spectrum \
speed \
qtdemux \
diff --git a/gst/rtpmanager/Makefile.am b/gst/rtpmanager/Makefile.am
index f844e47c..9e47cbdf 100644
--- a/gst/rtpmanager/Makefile.am
+++ b/gst/rtpmanager/Makefile.am
@@ -17,6 +17,9 @@ libgstrtpmanager_la_SOURCES = gstrtpmanager.c \
gstrtpjitterbuffer.c \
gstrtpptdemux.c \
gstrtpssrcdemux.c \
+ rtpsession.c \
+ rtpsource.c \
+ rtpstats.c \
gstrtpsession.c
nodist_libgstrtpmanager_la_SOURCES = \
@@ -28,11 +31,15 @@ noinst_HEADERS = gstrtpbin.h \
gstrtpjitterbuffer.h \
gstrtpptdemux.h \
gstrtpssrcdemux.h \
+ rtpsession.h \
+ rtpsource.h \
+ rtpstats.h \
gstrtpsession.h
libgstrtpmanager_la_CFLAGS = $(GST_CFLAGS) $(GST_PLUGINS_BASE_CFLAGS) $(ERROR_CFLAGS)
libgstrtpmanager_la_LIBADD = $(GST_LIBS_LIBS)
-libgstrtpmanager_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) $(GST_BASE_LIBS) $(GST_PLUGINS_BASE_LIBS) -lgstrtp-@GST_MAJORMINOR@
+libgstrtpmanager_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) $(GST_BASE_LIBS) $(GST_PLUGINS_BASE_LIBS) -lgstrtp-@GST_MAJORMINOR@ \
+ -lgstnetbuffer-@GST_MAJORMINOR@
CLEANFILES = $(BUILT_SOURCES)
diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c
index 6825e9cc..9162d76c 100644
--- a/gst/rtpmanager/gstrtpbin.c
+++ b/gst/rtpmanager/gstrtpbin.c
@@ -129,7 +129,7 @@ typedef struct _GstRTPBinClient GstRTPBinClient;
static guint gst_rtp_bin_signals[LAST_SIGNAL] = { 0 };
static GstCaps *pt_map_requested (GstElement * element, guint pt,
- GstRTPBinStream * stream);
+ GstRTPBinSession * session);
/* Manages the RTP stream for one SSRC.
*
@@ -215,9 +215,9 @@ static GstRTPBinSession *
create_session (GstRTPBin * rtpbin, gint id)
{
GstRTPBinSession *sess;
- GstElement *elem, *demux;
+ GstElement *session, *demux;
- if (!(elem = gst_element_factory_make ("rtpsession", NULL)))
+ if (!(session = gst_element_factory_make ("rtpsession", NULL)))
goto no_session;
if (!(demux = gst_element_factory_make ("rtpssrcdemux", NULL)))
@@ -227,13 +227,17 @@ create_session (GstRTPBin * rtpbin, gint id)
sess->lock = g_mutex_new ();
sess->id = id;
sess->bin = rtpbin;
- sess->session = elem;
+ sess->session = session;
sess->demux = demux;
sess->ptmap = g_hash_table_new (NULL, NULL);
rtpbin->sessions = g_slist_prepend (rtpbin->sessions, sess);
- gst_bin_add (GST_BIN_CAST (rtpbin), elem);
- gst_element_set_state (elem, GST_STATE_PLAYING);
+ /* provide clock_rate to the session manager when needed */
+ g_signal_connect (session, "request-pt-map",
+ (GCallback) pt_map_requested, sess);
+
+ gst_bin_add (GST_BIN_CAST (rtpbin), session);
+ gst_element_set_state (session, GST_STATE_PLAYING);
gst_bin_add (GST_BIN_CAST (rtpbin), demux);
gst_element_set_state (demux, GST_STATE_PLAYING);
@@ -247,7 +251,7 @@ no_session:
}
no_demux:
{
- gst_object_unref (elem);
+ gst_object_unref (session);
g_warning ("rtpbin: could not create rtpssrcdemux element");
return NULL;
}
@@ -351,7 +355,7 @@ create_stream (GstRTPBinSession * session, guint32 ssrc)
/* provide clock_rate to the jitterbuffer when needed */
g_signal_connect (buffer, "request-pt-map",
- (GCallback) pt_map_requested, stream);
+ (GCallback) pt_map_requested, session);
gst_bin_add (GST_BIN_CAST (session->bin), buffer);
gst_element_set_state (buffer, GST_STATE_PLAYING);
@@ -590,14 +594,12 @@ new_payload_found (GstElement * element, guint pt, GstPad * pad,
}
static GstCaps *
-pt_map_requested (GstElement * element, guint pt, GstRTPBinStream * stream)
+pt_map_requested (GstElement * element, guint pt, GstRTPBinSession * session)
{
GstRTPBin *rtpbin;
- GstRTPBinSession *session;
GstCaps *caps;
- rtpbin = stream->bin;
- session = stream->session;
+ rtpbin = session->bin;
GST_DEBUG_OBJECT (rtpbin, "payload map requested for pt %d in session %d", pt,
session->id);
@@ -647,7 +649,7 @@ new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
* demuxer so that it can apply a proper caps on the buffers for the
* depayloaders. */
stream->demux_ptreq_sig = g_signal_connect (stream->demux,
- "request-pt-map", (GCallback) pt_map_requested, stream);
+ "request-pt-map", (GCallback) pt_map_requested, session);
GST_RTP_SESSION_UNLOCK (session);
diff --git a/gst/rtpmanager/gstrtpjitterbuffer.h b/gst/rtpmanager/gstrtpjitterbuffer.h
index e101039a..3cbcd62f 100644
--- a/gst/rtpmanager/gstrtpjitterbuffer.h
+++ b/gst/rtpmanager/gstrtpjitterbuffer.h
@@ -63,6 +63,7 @@ struct _GstRTPJitterBufferClass
{
GstElementClass parent_class;
+ /* signals */
GstCaps* (*request_pt_map) (GstRTPJitterBuffer *buffer, guint pt);
/*< private > */
diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c
index cdad7e9b..03b0802b 100644
--- a/gst/rtpmanager/gstrtpsession.c
+++ b/gst/rtpmanager/gstrtpsession.c
@@ -39,7 +39,10 @@
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
+
+#include "gstrtpbin-marshal.h"
#include "gstrtpsession.h"
+#include "rtpsession.h"
GST_DEBUG_CATEGORY_STATIC (gst_rtp_session_debug);
#define GST_CAT_DEFAULT gst_rtp_session_debug
@@ -95,8 +98,8 @@ GST_STATIC_PAD_TEMPLATE ("send_rtp_src",
GST_STATIC_CAPS ("application/x-rtp")
);
-static GstStaticPadTemplate rtpsession_rtcp_src_template =
-GST_STATIC_PAD_TEMPLATE ("rtcp_src",
+static GstStaticPadTemplate rtpsession_send_rtcp_src_template =
+GST_STATIC_PAD_TEMPLATE ("send_rtcp_src",
GST_PAD_SRC,
GST_PAD_REQUEST,
GST_STATIC_CAPS ("application/x-rtcp")
@@ -105,7 +108,7 @@ GST_STATIC_PAD_TEMPLATE ("rtcp_src",
/* signals and args */
enum
{
- /* FILL ME */
+ SIGNAL_REQUEST_PT_MAP,
LAST_SIGNAL
};
@@ -123,6 +126,31 @@ enum
struct _GstRTPSessionPrivate
{
GMutex *lock;
+ RTPSession *session;
+ /* thread for sending out RTCP */
+ GstClockID id;
+ gboolean stop_thread;
+ GThread *thread;
+};
+
+/* callbacks to handle actions from the session manager */
+static GstFlowReturn gst_rtp_session_process_rtp (RTPSession * sess,
+ RTPSource * src, GstBuffer * buffer, gpointer user_data);
+static GstFlowReturn gst_rtp_session_send_rtp (RTPSession * sess,
+ RTPSource * src, GstBuffer * buffer, gpointer user_data);
+static GstFlowReturn gst_rtp_session_send_rtcp (RTPSession * sess,
+ RTPSource * src, GstBuffer * buffer, gpointer user_data);
+static gint gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
+ gpointer user_data);
+static GstClockTime gst_rtp_session_get_time (RTPSession * sess,
+ gpointer user_data);
+
+static RTPSessionCallbacks callbacks = {
+ gst_rtp_session_process_rtp,
+ gst_rtp_session_send_rtp,
+ gst_rtp_session_send_rtcp,
+ gst_rtp_session_clock_rate,
+ gst_rtp_session_get_time
};
/* GObject vmethods */
@@ -139,7 +167,7 @@ static GstPad *gst_rtp_session_request_new_pad (GstElement * element,
GstPadTemplate * templ, const gchar * name);
static void gst_rtp_session_release_pad (GstElement * element, GstPad * pad);
-/*static guint gst_rtp_session_signals[LAST_SIGNAL] = { 0 }; */
+static guint gst_rtp_session_signals[LAST_SIGNAL] = { 0 };
GST_BOILERPLATE (GstRTPSession, gst_rtp_session, GstElement, GST_TYPE_ELEMENT);
@@ -164,7 +192,7 @@ gst_rtp_session_base_init (gpointer klass)
gst_element_class_add_pad_template (element_class,
gst_static_pad_template_get (&rtpsession_send_rtp_src_template));
gst_element_class_add_pad_template (element_class,
- gst_static_pad_template_get (&rtpsession_rtcp_src_template));
+ gst_static_pad_template_get (&rtpsession_send_rtcp_src_template));
gst_element_class_set_details (element_class, &rtpsession_details);
}
@@ -184,6 +212,19 @@ gst_rtp_session_class_init (GstRTPSessionClass * klass)
gobject_class->set_property = gst_rtp_session_set_property;
gobject_class->get_property = gst_rtp_session_get_property;
+ /**
+ * GstRTPSession::request-pt-map:
+ * @sess: the object which received the signal
+ * @pt: the pt
+ *
+ * Request the payload type as #GstCaps for @pt.
+ */
+ gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP] =
+ g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPSessionClass, request_pt_map),
+ NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT, GST_TYPE_CAPS, 1,
+ G_TYPE_UINT);
+
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_rtp_session_change_state);
gstelement_class->request_new_pad =
@@ -200,6 +241,9 @@ gst_rtp_session_init (GstRTPSession * rtpsession, GstRTPSessionClass * klass)
{
rtpsession->priv = GST_RTP_SESSION_GET_PRIVATE (rtpsession);
rtpsession->priv->lock = g_mutex_new ();
+ rtpsession->priv->session = rtp_session_new ();
+ /* configure callbacks */
+ rtp_session_set_callbacks (rtpsession->priv->session, &callbacks, rtpsession);
}
static void
@@ -209,6 +253,7 @@ gst_rtp_session_finalize (GObject * object)
rtpsession = GST_RTP_SESSION (object);
g_mutex_free (rtpsession->priv->lock);
+ g_object_unref (rtpsession->priv->session);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
@@ -243,6 +288,87 @@ gst_rtp_session_get_property (GObject * object, guint prop_id,
}
}
+static void
+rtcp_thread (GstRTPSession * rtpsession)
+{
+ GstClock *clock;
+ GstClockID id;
+
+ clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession));
+ if (clock == NULL)
+ return;
+
+ GST_DEBUG_OBJECT (rtpsession, "entering RTCP thread");
+
+ GST_RTP_SESSION_LOCK (rtpsession);
+ while (!rtpsession->priv->stop_thread) {
+ gdouble timeout;
+ GstClockTime target;
+
+ timeout = rtp_session_get_rtcp_interval (rtpsession->priv->session);
+ GST_DEBUG_OBJECT (rtpsession, "next RTCP timeout: %lf", timeout);
+
+ target = gst_clock_get_time (clock);
+ target += GST_SECOND * timeout;
+ id = rtpsession->priv->id = gst_clock_new_single_shot_id (clock, target);
+ GST_RTP_SESSION_UNLOCK (rtpsession);
+
+ gst_clock_id_wait (id, NULL);
+
+ GST_DEBUG_OBJECT (rtpsession, "got RTCP timeout");
+
+ /* make the session manager produce RTCP, we ignore the result. */
+ rtp_session_produce_rtcp (rtpsession->priv->session);
+
+ GST_RTP_SESSION_LOCK (rtpsession);
+ gst_clock_id_unref (id);
+ rtpsession->priv->id = NULL;
+ }
+ GST_RTP_SESSION_UNLOCK (rtpsession);
+
+ gst_object_unref (clock);
+
+ GST_DEBUG_OBJECT (rtpsession, "leaving RTCP thread");
+}
+
+static gboolean
+start_rtcp_thread (GstRTPSession * rtpsession)
+{
+ GError *error = NULL;
+ gboolean res;
+
+ GST_DEBUG_OBJECT (rtpsession, "starting RTCP thread");
+
+ GST_RTP_SESSION_LOCK (rtpsession);
+ rtpsession->priv->stop_thread = FALSE;
+ rtpsession->priv->thread =
+ g_thread_create ((GThreadFunc) rtcp_thread, rtpsession, TRUE, &error);
+ GST_RTP_SESSION_UNLOCK (rtpsession);
+
+ if (error != NULL) {
+ res = FALSE;
+ GST_DEBUG_OBJECT (rtpsession, "failed to start thread, %s", error->message);
+ g_error_free (error);
+ } else {
+ res = TRUE;
+ }
+ return res;
+}
+
+static void
+stop_rtcp_thread (GstRTPSession * rtpsession)
+{
+ GST_DEBUG_OBJECT (rtpsession, "stopping RTCP thread");
+
+ GST_RTP_SESSION_LOCK (rtpsession);
+ rtpsession->priv->stop_thread = TRUE;
+ if (rtpsession->priv->id)
+ gst_clock_id_unschedule (rtpsession->priv->id);
+ GST_RTP_SESSION_UNLOCK (rtpsession);
+
+ g_thread_join (rtpsession->priv->thread);
+}
+
static GstStateChangeReturn
gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
{
@@ -258,6 +384,8 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
break;
+ case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
+ stop_rtcp_thread (rtpsession);
default:
break;
}
@@ -265,6 +393,10 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
res = parent_class->change_state (element, transition);
switch (transition) {
+ case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+ if (!start_rtcp_thread (rtpsession))
+ goto failed_thread;
+ break;
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
@@ -275,15 +407,158 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
break;
}
return res;
+
+ /* ERRORS */
+failed_thread:
+ {
+ return GST_STATE_CHANGE_FAILURE;
+ }
+}
+
+/* called when the session manager has an RTP packet ready for further
+ * processing */
+static GstFlowReturn
+gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src,
+ GstBuffer * buffer, gpointer user_data)
+{
+ GstFlowReturn result;
+ GstRTPSession *rtpsession;
+ GstRTPSessionPrivate *priv;
+
+ rtpsession = GST_RTP_SESSION (user_data);
+ priv = rtpsession->priv;
+
+ if (rtpsession->recv_rtp_src) {
+ result = gst_pad_push (rtpsession->recv_rtp_src, buffer);
+ } else {
+ gst_buffer_unref (buffer);
+ result = GST_FLOW_OK;
+ }
+ return result;
+}
+
+/* called when the session manager has an RTP packet ready for further
+ * sending */
+static GstFlowReturn
+gst_rtp_session_send_rtp (RTPSession * sess, RTPSource * src,
+ GstBuffer * buffer, gpointer user_data)
+{
+ GstFlowReturn result;
+ GstRTPSession *rtpsession;
+ GstRTPSessionPrivate *priv;
+
+ rtpsession = GST_RTP_SESSION (user_data);
+ priv = rtpsession->priv;
+
+ if (rtpsession->send_rtp_src) {
+ result = gst_pad_push (rtpsession->send_rtp_src, buffer);
+ } else {
+ gst_buffer_unref (buffer);
+ result = GST_FLOW_OK;
+ }
+ return result;
+}
+
+/* called when the session manager has an RTCP packet ready for further
+ * sending */
+static GstFlowReturn
+gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src,
+ GstBuffer * buffer, gpointer user_data)
+{
+ GstFlowReturn result;
+ GstRTPSession *rtpsession;
+ GstRTPSessionPrivate *priv;
+
+ rtpsession = GST_RTP_SESSION (user_data);
+ priv = rtpsession->priv;
+
+ if (rtpsession->send_rtcp_src) {
+ result = gst_pad_push (rtpsession->send_rtcp_src, buffer);
+ } else {
+ gst_buffer_unref (buffer);
+ result = GST_FLOW_OK;
+ }
+ return result;
+}
+
+
+/* called when the session manager needs the clock rate */
+static gint
+gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
+ gpointer user_data)
+{
+ gint result = -1;
+ GstRTPSession *rtpsession;
+ GValue ret = { 0 };
+ GValue args[2] = { {0}, {0} };
+ GstCaps *caps;
+ const GstStructure *caps_struct;
+
+ rtpsession = GST_RTP_SESSION_CAST (user_data);
+
+ g_value_init (&args[0], GST_TYPE_ELEMENT);
+ g_value_set_object (&args[0], rtpsession);
+ g_value_init (&args[1], G_TYPE_UINT);
+ g_value_set_uint (&args[1], payload);
+
+ g_value_init (&ret, GST_TYPE_CAPS);
+ g_value_set_boxed (&ret, NULL);
+
+ g_signal_emitv (args, gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP], 0,
+ &ret);
+
+ caps = (GstCaps *) g_value_get_boxed (&ret);
+ if (!caps)
+ goto no_caps;
+
+ caps_struct = gst_caps_get_structure (caps, 0);
+ if (!gst_structure_get_int (caps_struct, "clock-rate", &result))
+ goto no_clock_rate;
+
+ return result;
+
+ /* ERRORS */
+no_caps:
+ {
+ GST_DEBUG_OBJECT (rtpsession, "could not get caps");
+ return -1;
+ }
+no_clock_rate:
+ {
+ GST_DEBUG_OBJECT (rtpsession, "could not clock-rate from caps");
+ return -1;
+ }
+}
+
+/* called when the session manager needs the time of clock */
+static GstClockTime
+gst_rtp_session_get_time (RTPSession * sess, gpointer user_data)
+{
+ GstClockTime result;
+ GstRTPSession *rtpsession;
+ GstClock *clock;
+
+ rtpsession = GST_RTP_SESSION_CAST (user_data);
+
+ clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession));
+ if (clock) {
+ result = gst_clock_get_time (clock);
+ gst_object_unref (clock);
+ } else
+ result = GST_CLOCK_TIME_NONE;
+
+ return result;
}
static GstFlowReturn
gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event)
{
GstRTPSession *rtpsession;
+ GstRTPSessionPrivate *priv;
gboolean ret = FALSE;
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
+ priv = rtpsession->priv;
GST_DEBUG_OBJECT (rtpsession, "received event %s",
GST_EVENT_TYPE_NAME (event));
@@ -305,14 +580,15 @@ static GstFlowReturn
gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer)
{
GstRTPSession *rtpsession;
+ GstRTPSessionPrivate *priv;
GstFlowReturn ret;
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
+ priv = rtpsession->priv;
GST_DEBUG_OBJECT (rtpsession, "received RTP packet");
- /* FIXME, do something */
- ret = gst_pad_push (rtpsession->recv_rtp_src, buffer);
+ ret = rtp_session_process_rtp (priv->session, buffer);
gst_object_unref (rtpsession);
@@ -323,9 +599,11 @@ static GstFlowReturn
gst_rtp_session_event_recv_rtcp_sink (GstPad * pad, GstEvent * event)
{
GstRTPSession *rtpsession;
+ GstRTPSessionPrivate *priv;
gboolean ret = FALSE;
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
+ priv = rtpsession->priv;
GST_DEBUG_OBJECT (rtpsession, "received event %s",
GST_EVENT_TYPE_NAME (event));
@@ -347,14 +625,15 @@ static GstFlowReturn
gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstBuffer * buffer)
{
GstRTPSession *rtpsession;
+ GstRTPSessionPrivate *priv;
GstFlowReturn ret;
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
+ priv = rtpsession->priv;
- /* FIXME, do something */
GST_DEBUG_OBJECT (rtpsession, "received RTCP packet");
- ret = gst_pad_push (rtpsession->sync_src, buffer);
+ ret = rtp_session_process_rtcp (priv->session, buffer);
gst_object_unref (rtpsession);
@@ -365,9 +644,11 @@ static GstFlowReturn
gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstEvent * event)
{
GstRTPSession *rtpsession;
+ GstRTPSessionPrivate *priv;
gboolean ret = FALSE;
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
+ priv = rtpsession->priv;
GST_DEBUG_OBJECT (rtpsession, "received event");
@@ -388,14 +669,15 @@ static GstFlowReturn
gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer)
{
GstRTPSession *rtpsession;
+ GstRTPSessionPrivate *priv;
GstFlowReturn ret;
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
+ priv = rtpsession->priv;
GST_DEBUG_OBJECT (rtpsession, "received RTP packet");
- /* FIXME, do something */
- ret = gst_pad_push (rtpsession->send_rtp_src, buffer);
+ ret = rtp_session_send_rtp (priv->session, buffer);
gst_object_unref (rtpsession);
@@ -494,16 +776,18 @@ create_send_rtp_sink (GstRTPSession * rtpsession)
* RTCP packets.
*/
static GstPad *
-create_rtcp_src (GstRTPSession * rtpsession)
+create_send_rtcp_src (GstRTPSession * rtpsession)
{
GST_DEBUG_OBJECT (rtpsession, "creating pad");
- rtpsession->rtcp_src =
- gst_pad_new_from_static_template (&rtpsession_rtcp_src_template, NULL);
- gst_pad_set_active (rtpsession->rtcp_src, TRUE);
- gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->rtcp_src);
+ rtpsession->send_rtcp_src =
+ gst_pad_new_from_static_template (&rtpsession_send_rtcp_src_template,
+ NULL);
+ gst_pad_set_active (rtpsession->send_rtcp_src, TRUE);
+ gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
+ rtpsession->send_rtcp_src);
- return rtpsession->rtcp_src;
+ return rtpsession->send_rtcp_src;
}
static GstPad *
@@ -542,11 +826,12 @@ gst_rtp_session_request_new_pad (GstElement * element,
goto exists;
result = create_send_rtp_sink (rtpsession);
- } else if (templ == gst_element_class_get_pad_template (klass, "rtcp_src")) {
- if (rtpsession->rtcp_src != NULL)
+ } else if (templ == gst_element_class_get_pad_template (klass,
+ "send_rtcp_src")) {
+ if (rtpsession->send_rtcp_src != NULL)
goto exists;
- result = create_rtcp_src (rtpsession);
+ result = create_send_rtcp_src (rtpsession);
} else
goto wrong_template;
diff --git a/gst/rtpmanager/gstrtpsession.h b/gst/rtpmanager/gstrtpsession.h
index 8b343064..25bbb6eb 100644
--- a/gst/rtpmanager/gstrtpsession.h
+++ b/gst/rtpmanager/gstrtpsession.h
@@ -32,6 +32,7 @@
(G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTP_SESSION))
#define GST_IS_RTP_SESSION_CLASS(klass) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTP_SESSION))
+#define GST_RTP_SESSION_CAST(obj) ((GstRTPSession *)(obj))
typedef struct _GstRTPSession GstRTPSession;
typedef struct _GstRTPSessionClass GstRTPSessionClass;
@@ -48,13 +49,16 @@ struct _GstRTPSession {
GstPad *recv_rtp_src;
GstPad *sync_src;
GstPad *send_rtp_src;
- GstPad *rtcp_src;
+ GstPad *send_rtcp_src;
GstRTPSessionPrivate *priv;
};
struct _GstRTPSessionClass {
GstElementClass parent_class;
+
+ /* signals */
+ GstCaps* (*request_pt_map) (GstRTPSession *sess, guint pt);
};
GType gst_rtp_session_get_type (void);
diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c
new file mode 100644
index 00000000..2283dc97
--- /dev/null
+++ b/gst/rtpmanager/rtpsession.c
@@ -0,0 +1,1026 @@
+/* GStreamer
+ * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include <string.h>
+
+#include <gst/rtp/gstrtpbuffer.h>
+#include <gst/rtp/gstrtcpbuffer.h>
+#include <gst/netbuffer/gstnetbuffer.h>
+
+#include "rtpsession.h"
+
+GST_DEBUG_CATEGORY_STATIC (rtp_session_debug);
+#define GST_CAT_DEFAULT rtp_session_debug
+
+/* signals and args */
+enum
+{
+ SIGNAL_ON_NEW_SSRC,
+ SIGNAL_ON_SSRC_COLLISION,
+ SIGNAL_ON_SSRC_VALIDATED,
+ SIGNAL_ON_BYE_SSRC,
+ LAST_SIGNAL
+};
+
+#define RTP_DEFAULT_BANDWIDTH 64000.0
+#define RTP_DEFAULT_RTCP_BANDWIDTH 1000
+
+enum
+{
+ PROP_0
+};
+
+/* GObject vmethods */
+static void rtp_session_finalize (GObject * object);
+static void rtp_session_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec);
+static void rtp_session_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec);
+
+static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
+
+G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
+
+static void
+rtp_session_class_init (RTPSessionClass * klass)
+{
+ GObjectClass *gobject_class;
+
+ gobject_class = (GObjectClass *) klass;
+
+ gobject_class->finalize = rtp_session_finalize;
+ gobject_class->set_property = rtp_session_set_property;
+ gobject_class->get_property = rtp_session_get_property;
+
+ /**
+ * RTPSession::on-new-ssrc:
+ * @session: the object which received the signal
+ * @src: the new RTPSource
+ *
+ * Notify of a new SSRC that entered @session.
+ */
+ rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
+ g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc),
+ NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
+ G_TYPE_OBJECT);
+ /**
+ * RTPSession::on-ssrc_collision:
+ * @session: the object which received the signal
+ * @src: the #RTPSource that caused a collision
+ *
+ * Notify when we have an SSRC collision
+ */
+ rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
+ g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision),
+ NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
+ G_TYPE_OBJECT);
+ /**
+ * RTPSession::on-ssrc_validated:
+ * @session: the object which received the signal
+ * @src: the new validated RTPSource
+ *
+ * Notify of a new SSRC that became validated.
+ */
+ rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
+ g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated),
+ NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
+ G_TYPE_OBJECT);
+ /**
+ * RTPSession::on-bye-ssrc:
+ * @session: the object which received the signal
+ * @src: the RTPSource that went away
+ *
+ * Notify of an SSRC that became inactive because of a BYE packet.
+ */
+ rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
+ g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc),
+ NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
+ G_TYPE_OBJECT);
+
+ GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
+}
+
+static void
+rtp_session_init (RTPSession * sess)
+{
+ sess->lock = g_mutex_new ();
+ sess->ssrcs =
+ g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify) g_object_unref);
+ sess->cnames = g_hash_table_new_full (NULL, NULL, g_free, NULL);
+
+ /* create an SSRC for this session manager */
+ sess->source = rtp_session_create_source (sess);
+
+ rtp_stats_init_defaults (&sess->stats);
+
+ /* default UDP header length */
+ sess->header_len = 28;
+
+ GST_DEBUG ("%p: session using SSRC: %08x", sess, sess->source->ssrc);
+}
+
+static void
+rtp_session_finalize (GObject * object)
+{
+ RTPSession *sess;
+
+ sess = RTP_SESSION_CAST (object);
+
+ g_mutex_free (sess->lock);
+ g_hash_table_unref (sess->ssrcs);
+ g_hash_table_unref (sess->cnames);
+ g_object_unref (sess->source);
+
+ G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
+}
+
+static void
+rtp_session_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ RTPSession *sess;
+
+ sess = RTP_SESSION (object);
+
+ switch (prop_id) {
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+rtp_session_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec)
+{
+ RTPSession *sess;
+
+ sess = RTP_SESSION (object);
+
+ switch (prop_id) {
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+on_new_ssrc (RTPSession * sess, RTPSource * source)
+{
+ g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source);
+}
+
+static void
+on_ssrc_collision (RTPSession * sess, RTPSource * source)
+{
+ g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
+ source);
+}
+
+static void
+on_ssrc_validated (RTPSession * sess, RTPSource * source)
+{
+ g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
+ source);
+}
+
+static void
+on_bye_ssrc (RTPSession * sess, RTPSource * source)
+{
+ g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
+}
+
+/**
+ * rtp_session_new:
+ *
+ * Create a new session object.
+ *
+ * Returns: a new #RTPSession. g_object_unref() after usage.
+ */
+RTPSession *
+rtp_session_new (void)
+{
+ RTPSession *sess;
+
+ sess = g_object_new (RTP_TYPE_SESSION, NULL);
+
+ return sess;
+}
+
+/**
+ * rtp_session_set_callbacks:
+ * @sess: an #RTPSession
+ * @callbacks: callbacks to configure
+ * @user_data: user data passed in the callbacks
+ *
+ * Configure a set of callbacks to be notified of actions.
+ */
+void
+rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
+ gpointer user_data)
+{
+ g_return_if_fail (RTP_IS_SESSION (sess));
+
+ sess->callbacks.process_rtp = callbacks->process_rtp;
+ sess->callbacks.send_rtp = callbacks->send_rtp;
+ sess->callbacks.send_rtcp = callbacks->send_rtcp;
+ sess->callbacks.clock_rate = callbacks->clock_rate;
+ sess->callbacks.get_time = callbacks->get_time;
+ sess->user_data = user_data;
+}
+
+/**
+ * rtp_session_set_bandwidth:
+ * @sess: an #RTPSession
+ * @bandwidth: the bandwidth allocated
+ *
+ * Set the session bandwidth in bytes per second.
+ */
+void
+rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth)
+{
+ g_return_if_fail (RTP_IS_SESSION (sess));
+
+ sess->stats.bandwidth = bandwidth;
+}
+
+/**
+ * rtp_session_get_bandwidth:
+ * @sess: an #RTPSession
+ *
+ * Get the session bandwidth.
+ *
+ * Returns: the session bandwidth.
+ */
+gdouble
+rtp_session_get_bandwidth (RTPSession * sess)
+{
+ g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
+
+ return sess->stats.bandwidth;
+}
+
+/**
+ * rtp_session_set_rtcp_bandwidth:
+ * @sess: an #RTPSession
+ * @bandwidth: the RTCP bandwidth
+ *
+ * Set the bandwidth that should be used for RTCP
+ * messages.
+ */
+void
+rtp_session_set_rtcp_bandwidth (RTPSession * sess, gdouble bandwidth)
+{
+ g_return_if_fail (RTP_IS_SESSION (sess));
+
+ sess->stats.rtcp_bandwidth = bandwidth;
+}
+
+/**
+ * rtp_session_get_rtcp_bandwidth:
+ * @sess: an #RTPSession
+ *
+ * Get the session bandwidth used for RTCP.
+ *
+ * Returns: The bandwidth used for RTCP messages.
+ */
+gdouble
+rtp_session_get_rtcp_bandwidth (RTPSession * sess)
+{
+ g_return_val_if_fail (RTP_IS_SESSION (sess), 0.0);
+
+ return sess->stats.rtcp_bandwidth;
+}
+
+static GstFlowReturn
+source_push_rtp (RTPSource * source, GstBuffer * buffer, RTPSession * session)
+{
+ GstFlowReturn result = GST_FLOW_OK;
+
+ if (source == session->source) {
+ GST_DEBUG ("source %08x pushed sender RTP packet", source->ssrc);
+ if (session->callbacks.send_rtp)
+ result =
+ session->callbacks.send_rtp (session, source, buffer,
+ session->user_data);
+ else
+ gst_buffer_unref (buffer);
+ } else {
+ GST_DEBUG ("source %08x pushed receiver RTP packet", source->ssrc);
+ if (session->callbacks.process_rtp)
+ result =
+ session->callbacks.process_rtp (session, source, buffer,
+ session->user_data);
+ else
+ gst_buffer_unref (buffer);
+ }
+ return result;
+}
+
+static gint
+source_clock_rate (RTPSource * source, guint8 pt, RTPSession * session)
+{
+ gint result;
+
+ if (session->callbacks.clock_rate)
+ result = session->callbacks.clock_rate (session, pt, session->user_data);
+ else
+ result = -1;
+
+ GST_DEBUG ("got clock-rate %d for pt %d", result, pt);
+
+ return result;
+}
+
+static RTPSourceCallbacks callbacks = {
+ (RTPSourcePushRTP) source_push_rtp,
+ (RTPSourceClockRate) source_clock_rate,
+};
+
+static gboolean
+check_collision (RTPSession * sess, RTPSource * source,
+ RTPArrivalStats * arrival)
+{
+ /* FIXME, do collision check */
+ return FALSE;
+}
+
+static RTPSource *
+obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
+ RTPArrivalStats * arrival, gboolean rtp)
+{
+ RTPSource *source;
+
+ source = g_hash_table_lookup (sess->ssrcs, GINT_TO_POINTER (ssrc));
+ if (source == NULL) {
+ /* make new Source in probation and insert */
+ source = rtp_source_new (ssrc);
+
+ if (rtp)
+ source->probation = RTP_DEFAULT_PROBATION;
+ else
+ source->probation = 0;
+
+ /* store from address, if any */
+ if (arrival->have_address) {
+ if (rtp)
+ rtp_source_set_rtp_from (source, &arrival->address);
+ else
+ rtp_source_set_rtcp_from (source, &arrival->address);
+ }
+
+ /* configure a callback on the source */
+ rtp_source_set_callbacks (source, &callbacks, sess);
+
+ g_hash_table_insert (sess->ssrcs, GINT_TO_POINTER (ssrc), source);
+
+ /* we have one more source now */
+ sess->total_sources++;
+ *created = TRUE;
+ } else {
+ *created = FALSE;
+ /* check for collision, this updates the address when not previously set */
+ if (check_collision (sess, source, arrival))
+ on_ssrc_collision (sess, source);
+ }
+ return source;
+}
+
+/**
+ * rtp_session_add_source:
+ * @sess: a #RTPSession
+ * @src: #RTPSource to add
+ *
+ * Add @src to @session.
+ *
+ * Returns: %TRUE on success, %FALSE if a source with the same SSRC already
+ * existed in the session.
+ */
+gboolean
+rtp_session_add_source (RTPSession * sess, RTPSource * src)
+{
+ gboolean result = FALSE;
+ RTPSource *find;
+
+ g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
+ g_return_val_if_fail (src != NULL, FALSE);
+
+ RTP_SESSION_LOCK (sess);
+ find = g_hash_table_lookup (sess->ssrcs, GINT_TO_POINTER (src->ssrc));
+ if (find == NULL) {
+ g_hash_table_insert (sess->ssrcs, GINT_TO_POINTER (src->ssrc), src);
+ /* we have one more source now */
+ sess->total_sources++;
+ result = TRUE;
+ }
+ RTP_SESSION_UNLOCK (sess);
+
+ return result;
+}
+
+/**
+ * rtp_session_get_num_sources:
+ * @sess: an #RTPSession
+ *
+ * Get the number of sources in @sess.
+ *
+ * Returns: The number of sources in @sess.
+ */
+gint
+rtp_session_get_num_sources (RTPSession * sess)
+{
+ gint result;
+
+ g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
+
+ RTP_SESSION_LOCK (sess);
+ result = sess->total_sources;
+ RTP_SESSION_UNLOCK (sess);
+
+ return result;
+}
+
+/**
+ * rtp_session_get_num_active_sources:
+ * @sess: an #RTPSession
+ *
+ * Get the number of active sources in @sess. A source is considered active when
+ * it has been validated and has not yet received a BYE RTCP message.
+ *
+ * Returns: The number of active sources in @sess.
+ */
+gint
+rtp_session_get_num_active_sources (RTPSession * sess)
+{
+ gint result;
+
+ g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
+
+ RTP_SESSION_LOCK (sess);
+ result = sess->stats.active_sources;
+ RTP_SESSION_UNLOCK (sess);
+
+ return result;
+}
+
+/**
+ * rtp_session_get_source_by_ssrc:
+ * @sess: an #RTPSession
+ * @ssrc: an SSRC
+ *
+ * Find the source with @ssrc in @sess.
+ *
+ * Returns: a #RTPSource with SSRC @ssrc or NULL if the source was not found.
+ * g_object_unref() after usage.
+ */
+RTPSource *
+rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc)
+{
+ RTPSource *result;
+
+ g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
+
+ RTP_SESSION_LOCK (sess);
+ result = g_hash_table_lookup (sess->ssrcs, GINT_TO_POINTER (ssrc));
+ if (result)
+ g_object_ref (result);
+ RTP_SESSION_UNLOCK (sess);
+
+ return result;
+}
+
+/**
+ * rtp_session_get_source_by_cname:
+ * @sess: a #RTPSession
+ * @cname: an CNAME
+ *
+ * Find the source with @cname in @sess.
+ *
+ * Returns: a #RTPSource with CNAME @cname or NULL if the source was not found.
+ * g_object_unref() after usage.
+ */
+RTPSource *
+rtp_session_get_source_by_cname (RTPSession * sess, const gchar * cname)
+{
+ RTPSource *result;
+
+ g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
+ g_return_val_if_fail (cname != NULL, NULL);
+
+ RTP_SESSION_LOCK (sess);
+ result = g_hash_table_lookup (sess->cnames, cname);
+ if (result)
+ g_object_ref (result);
+ RTP_SESSION_UNLOCK (sess);
+
+ return result;
+}
+
+/**
+ * rtp_session_create_source:
+ * @sess: an #RTPSession
+ *
+ * Create an #RTPSource for use in @sess. This function will create a source
+ * with an ssrc that is currently not used by any participants in the session.
+ *
+ * Returns: an #RTPSource.
+ */
+RTPSource *
+rtp_session_create_source (RTPSession * sess)
+{
+ guint32 ssrc;
+ RTPSource *source;
+
+ RTP_SESSION_LOCK (sess);
+ while (TRUE) {
+ ssrc = g_random_int ();
+
+ /* see if it exists in the session, we're done if it doesn't */
+ if (g_hash_table_lookup (sess->ssrcs, GINT_TO_POINTER (ssrc)) == NULL)
+ break;
+ }
+ source = rtp_source_new (ssrc);
+ g_hash_table_insert (sess->ssrcs, GINT_TO_POINTER (ssrc), source);
+ /* we have one more source now */
+ sess->total_sources++;
+ RTP_SESSION_UNLOCK (sess);
+
+ return source;
+}
+
+/* update the RTPArrivalStats structure with the current time and other bits
+ * about the current buffer we are handling.
+ * This function is typically called when a validated packet is received.
+ */
+static void
+update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
+ gboolean rtp, GstBuffer * buffer)
+{
+ /* get time or arrival */
+ if (sess->callbacks.get_time)
+ arrival->time = sess->callbacks.get_time (sess, sess->user_data);
+ else
+ arrival->time = GST_CLOCK_TIME_NONE;
+
+ /* update sizes */
+ arrival->bytes = GST_BUFFER_SIZE (buffer) + 28;
+ arrival->payload_len = (rtp ? gst_rtp_buffer_get_payload_len (buffer) : 0);
+
+ /* for netbuffer we can store the IP address to check for collisions */
+ arrival->have_address = GST_IS_NETBUFFER (buffer);
+ if (arrival->have_address) {
+ GstNetBuffer *netbuf = (GstNetBuffer *) buffer;
+
+ memcpy (&arrival->address, &netbuf->from, sizeof (GstNetAddress));
+ }
+}
+
+/**
+ * rtp_session_process_rtp:
+ * @sess: and #RTPSession
+ * @buffer: an RTP buffer
+ *
+ * Process an RTP buffer in the session manager. This function takes ownership
+ * of @buffer.
+ *
+ * Returns: a #GstFlowReturn.
+ */
+GstFlowReturn
+rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer)
+{
+ GstFlowReturn result;
+ guint32 ssrc;
+ RTPSource *source;
+ gboolean created;
+ gboolean prevsender, prevactive;
+ RTPArrivalStats arrival;
+
+ g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
+ g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
+
+ if (!gst_rtp_buffer_validate (buffer))
+ goto invalid_packet;
+
+ /* update arrival stats */
+ update_arrival_stats (sess, &arrival, TRUE, buffer);
+
+ /* get SSRC and look up in session database */
+ ssrc = gst_rtp_buffer_get_ssrc (buffer);
+
+ RTP_SESSION_LOCK (sess);
+ source = obtain_source (sess, ssrc, &created, &arrival, TRUE);
+
+ prevsender = RTP_SOURCE_IS_SENDER (source);
+ prevactive = RTP_SOURCE_IS_ACTIVE (source);
+
+ /* let source process the packet */
+ result = rtp_source_process_rtp (source, buffer, &arrival);
+
+ /* source became active */
+ if (prevactive != RTP_SOURCE_IS_ACTIVE (source)) {
+ sess->stats.active_sources++;
+ GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
+ sess->stats.active_sources);
+ on_ssrc_validated (sess, source);
+ }
+ if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
+ sess->stats.sender_sources++;
+ GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc,
+ sess->stats.sender_sources);
+ }
+
+ if (created)
+ on_new_ssrc (sess, source);
+
+ /* for validated sources, we add the CSRCs as well */
+ if (source->validated) {
+ guint8 i, count;
+
+ count = gst_rtp_buffer_get_csrc_count (buffer);
+
+ for (i = 0; i < count; i++) {
+ guint32 csrc;
+ RTPSource *csrc_src;
+
+ csrc = gst_rtp_buffer_get_csrc (buffer, i);
+
+ /* get source */
+ csrc_src = obtain_source (sess, csrc, &created, &arrival, TRUE);
+ if (created) {
+ GST_DEBUG ("created new CSRC: %08x", csrc);
+ rtp_source_set_as_csrc (csrc_src);
+ if (RTP_SOURCE_IS_ACTIVE (csrc_src))
+ sess->stats.active_sources++;
+ on_new_ssrc (sess, source);
+ }
+ }
+ }
+ RTP_SESSION_UNLOCK (sess);
+
+ return result;
+
+ /* ERRORS */
+invalid_packet:
+ {
+ GST_DEBUG ("invalid RTP packet received");
+ return GST_FLOW_OK;
+ }
+}
+
+/* A Sender report contains statistics about how the sender is doing. This
+ * includes timing informataion about the relation between RTP and NTP
+ * timestamps is it using and the number of packets/bytes it sent to us.
+ *
+ * In this report is also included a set of report blocks related to how this
+ * sender is receiving data (in case we (or somebody else) is also sending stuff
+ * to it). This info includes the packet loss, jitter and seqnum. It also
+ * contains information to calculate the round trip time (LSR/DLSR).
+ */
+static void
+rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
+ RTPArrivalStats * arrival)
+{
+ guint32 senderssrc, rtptime, packet_count, octet_count;
+ guint64 ntptime;
+ guint count, i;
+ RTPSource *source;
+ gboolean created;
+
+ gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime,
+ &packet_count, &octet_count);
+
+ RTP_SESSION_LOCK (sess);
+ source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
+
+ /* first update the source */
+ rtp_source_process_sr (source, ntptime, rtptime, packet_count, octet_count);
+
+ if (created)
+ on_new_ssrc (sess, source);
+
+ count = gst_rtcp_packet_get_rb_count (packet);
+ for (i = 0; i < count; i++) {
+ guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
+ guint8 fractionlost;
+ gint32 packetslost;
+
+ gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
+ &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
+
+ if (ssrc == sess->source->ssrc) {
+ /* only deal with report blocks for our session, we update the stats of
+ * the sender of the TCP message. We could also compare our stats against
+ * the other sender to see if we are better or worse. */
+ rtp_source_process_rb (source, fractionlost, packetslost,
+ exthighestseq, jitter, lsr, dlsr);
+ }
+ }
+ RTP_SESSION_UNLOCK (sess);
+}
+
+/* A receiver report contains statistics about how a receiver is doing. It
+ * includes stuff like packet loss, jitter and the seqnum it received last. It
+ * also contains info to calculate the round trip time.
+ *
+ * We are only interested in how the sender of this report is doing wrt to us.
+ */
+static void
+rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
+ RTPArrivalStats * arrival)
+{
+ guint32 senderssrc;
+ guint count, i;
+ RTPSource *source;
+ gboolean created;
+
+ senderssrc = gst_rtcp_packet_rr_get_ssrc (packet);
+
+ GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
+
+ RTP_SESSION_LOCK (sess);
+ source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
+
+ if (created)
+ on_new_ssrc (sess, source);
+
+ count = gst_rtcp_packet_get_rb_count (packet);
+ for (i = 0; i < count; i++) {
+ guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
+ guint8 fractionlost;
+ gint32 packetslost;
+
+ gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
+ &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
+
+ if (ssrc == sess->source->ssrc) {
+ rtp_source_process_rb (source, fractionlost, packetslost,
+ exthighestseq, jitter, lsr, dlsr);
+ }
+ }
+ RTP_SESSION_UNLOCK (sess);
+}
+
+/* FIXME, we're just printing this for now... */
+static void
+rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
+ RTPArrivalStats * arrival)
+{
+ guint chunks, i, j;
+ gboolean more_chunks, more_items;
+
+ chunks = gst_rtcp_packet_sdes_get_chunk_count (packet);
+ GST_DEBUG ("got SDES packet with %d chunks", chunks);
+
+ more_chunks = gst_rtcp_packet_sdes_first_chunk (packet);
+ i = 0;
+ while (more_chunks) {
+ guint32 ssrc;
+
+ ssrc = gst_rtcp_packet_sdes_get_ssrc (packet);
+
+ GST_DEBUG ("chunk %d, SSRC %08x", i, ssrc);
+
+ more_items = gst_rtcp_packet_sdes_first_item (packet);
+ j = 0;
+ while (more_items) {
+ GstRTCPSDESType type;
+ guint8 len;
+ gchar *data;
+
+ gst_rtcp_packet_sdes_get_item (packet, &type, &len, &data);
+
+ GST_DEBUG ("item %d, type %d, len %d, data %s", j, type, len, data);
+
+ more_items = gst_rtcp_packet_sdes_next_item (packet);
+ j++;
+ }
+ more_chunks = gst_rtcp_packet_sdes_next_chunk (packet);
+ i++;
+ }
+}
+
+/* BYE is sent when a client leaves the session
+ */
+static void
+rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
+ RTPArrivalStats * arrival)
+{
+ guint count, i;
+ gchar *reason;
+
+ reason = gst_rtcp_packet_bye_get_reason (packet);
+ GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason));
+
+ count = gst_rtcp_packet_bye_get_ssrc_count (packet);
+ for (i = 0; i < count; i++) {
+ guint32 ssrc;
+ RTPSource *source;
+ gboolean created, prevactive, prevsender;
+
+ ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
+ GST_DEBUG ("SSRC: %08x", ssrc);
+
+ /* find src and mark bye, no probation when dealing with RTCP */
+ RTP_SESSION_LOCK (sess);
+ source = obtain_source (sess, ssrc, &created, arrival, FALSE);
+
+ prevactive = RTP_SOURCE_IS_ACTIVE (source);
+ prevsender = RTP_SOURCE_IS_SENDER (source);
+
+ /* let the source handle the rest */
+ rtp_source_process_bye (source, reason);
+
+ if (prevactive && !RTP_SOURCE_IS_ACTIVE (source)) {
+ sess->stats.active_sources--;
+ GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
+ sess->stats.active_sources);
+ }
+ if (prevsender && !RTP_SOURCE_IS_SENDER (source)) {
+ sess->stats.sender_sources--;
+ GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
+ sess->stats.sender_sources);
+ }
+
+ if (created)
+ on_new_ssrc (sess, source);
+
+ on_bye_ssrc (sess, source);
+ RTP_SESSION_UNLOCK (sess);
+ }
+ g_free (reason);
+}
+
+static void
+rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
+ RTPArrivalStats * arrival)
+{
+ GST_DEBUG ("received APP");
+}
+
+/**
+ * rtp_session_process_rtcp:
+ * @sess: and #RTPSession
+ * @buffer: an RTCP buffer
+ *
+ * Process an RTCP buffer in the session manager.
+ *
+ * Returns: a #GstFlowReturn.
+ */
+GstFlowReturn
+rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer)
+{
+ GstRTCPPacket packet;
+ gboolean more;
+ RTPArrivalStats arrival;
+ guint size;
+
+ g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
+ g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
+
+ if (!gst_rtcp_buffer_validate (buffer))
+ goto invalid_packet;
+
+ /* update arrival stats */
+ update_arrival_stats (sess, &arrival, FALSE, buffer);
+
+ GST_DEBUG ("received RTCP packet");
+
+ /* get packet size including header overhead */
+ size = GST_BUFFER_SIZE (buffer) + sess->header_len;
+
+ /* update average RTCP packet size */
+ if (sess->stats.avg_rtcp_packet_size == 0)
+ sess->stats.avg_rtcp_packet_size = size;
+ else
+ sess->stats.avg_rtcp_packet_size =
+ (size + (15 * sess->stats.avg_rtcp_packet_size)) >> 4;
+
+ /* start processing the compound packet */
+ more = gst_rtcp_buffer_get_first_packet (buffer, &packet);
+ while (more) {
+ switch (gst_rtcp_packet_get_type (&packet)) {
+ case GST_RTCP_TYPE_SR:
+ rtp_session_process_sr (sess, &packet, &arrival);
+ break;
+ case GST_RTCP_TYPE_RR:
+ rtp_session_process_rr (sess, &packet, &arrival);
+ break;
+ case GST_RTCP_TYPE_SDES:
+ rtp_session_process_sdes (sess, &packet, &arrival);
+ break;
+ case GST_RTCP_TYPE_BYE:
+ rtp_session_process_bye (sess, &packet, &arrival);
+ break;
+ case GST_RTCP_TYPE_APP:
+ rtp_session_process_app (sess, &packet, &arrival);
+ break;
+ default:
+ GST_WARNING ("got unknown RTCP packet");
+ break;
+ }
+ more = gst_rtcp_packet_move_to_next (&packet);
+ }
+
+ gst_buffer_unref (buffer);
+
+ return GST_FLOW_OK;
+
+ /* ERRORS */
+invalid_packet:
+ {
+ GST_DEBUG ("invalid RTCP packet received");
+ return GST_FLOW_OK;
+ }
+}
+
+/**
+ * rtp_session_send_rtp:
+ * @sess: and #RTPSession
+ * @buffer: an RTP buffer
+ *
+ * Send the RTP buffer in the session manager.
+ *
+ * Returns: a #GstFlowReturn.
+ */
+GstFlowReturn
+rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer)
+{
+ GstFlowReturn result;
+ RTPSource *source;
+ gboolean prevsender;
+
+ g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
+ g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
+
+ source = sess->source;
+
+ prevsender = RTP_SOURCE_IS_SENDER (source);
+
+ /* we use our own source to send */
+ result = rtp_source_send_rtp (sess->source, buffer);
+
+ if (RTP_SOURCE_IS_SENDER (source) && !prevsender)
+ sess->stats.sender_sources++;
+
+ return result;
+}
+
+/**
+ * rtp_session_get_rtcp_interval:
+ * @sess: an #RTPSession
+ *
+ * Get the interval for sending out the next RTCP packet
+ *
+ * Returns: an interval in seconds.
+ */
+gdouble
+rtp_session_get_rtcp_interval (RTPSession * sess)
+{
+ gdouble result;
+
+ g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
+
+ RTP_SESSION_LOCK (sess);
+ result = rtp_stats_calculate_rtcp_interval (&sess->stats, FALSE);
+ result = rtp_stats_add_rtcp_jitter (&sess->stats, result);
+ RTP_SESSION_UNLOCK (sess);
+
+ return result;
+}
+
+/**
+ * rtp_session_produce_rtcp:
+ * @sess: an #RTPSession
+ *
+ * Instruct the session manager to generate RTCP packets with current stats.
+ * This function will call the #RTPSessionSendRTCP callback, possibly multiple
+ * times, for each packet that should be processed.
+ *
+ * Returns: a #GstFlowReturn.
+ */
+GstFlowReturn
+rtp_session_produce_rtcp (RTPSession * sess)
+{
+ /* FIXME: implement me */
+ return GST_FLOW_NOT_SUPPORTED;
+}
diff --git a/gst/rtpmanager/rtpsession.h b/gst/rtpmanager/rtpsession.h
new file mode 100644
index 00000000..46062c99
--- /dev/null
+++ b/gst/rtpmanager/rtpsession.h
@@ -0,0 +1,206 @@
+/* GStreamer
+ * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __RTP_SESSION_H__
+#define __RTP_SESSION_H__
+
+#include <gst/gst.h>
+#include <gst/netbuffer/gstnetbuffer.h>
+
+#include "rtpsource.h"
+
+typedef struct _RTPSession RTPSession;
+typedef struct _RTPSessionClass RTPSessionClass;
+
+#define RTP_TYPE_SESSION (rtp_session_get_type())
+#define RTP_SESSION(sess) (G_TYPE_CHECK_INSTANCE_CAST((sess),RTP_TYPE_SESSION,RTPSession))
+#define RTP_SESSION_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),RTP_TYPE_SESSION,RTPSessionClass))
+#define RTP_IS_SESSION(sess) (G_TYPE_CHECK_INSTANCE_TYPE((sess),RTP_TYPE_SESSION))
+#define RTP_IS_SESSION_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),RTP_TYPE_SESSION))
+#define RTP_SESSION_CAST(sess) ((RTPSession *)(sess))
+
+#define RTP_SESSION_LOCK(sess) (g_mutex_lock ((sess)->lock))
+#define RTP_SESSION_UNLOCK(sess) (g_mutex_unlock ((sess)->lock))
+
+/**
+ * RTPSessionProcessRTP:
+ * @sess: an #RTPSession
+ * @src: the #RTPSource
+ * @buffer: the RTP buffer ready for processing
+ * @user_data: user data specified when registering
+ *
+ * This callback will be called when @sess has @buffer ready for further
+ * processing. Processing the buffer typically includes decoding and displaying
+ * the buffer.
+ *
+ * Returns: a #GstFlowReturn.
+ */
+typedef GstFlowReturn (*RTPSessionProcessRTP) (RTPSession *sess, RTPSource *src, GstBuffer *buffer, gpointer user_data);
+
+/**
+ * RTPSessionSendRTP:
+ * @sess: an #RTPSession
+ * @src: the #RTPSource
+ * @buffer: the RTP buffer ready for sending
+ * @user_data: user data specified when registering
+ *
+ * This callback will be called when @sess has @buffer ready for sending to
+ * all listening participants in this session.
+ *
+ * Returns: a #GstFlowReturn.
+ */
+typedef GstFlowReturn (*RTPSessionSendRTP) (RTPSession *sess, RTPSource *src, GstBuffer *buffer, gpointer user_data);
+
+/**
+ * RTPSessionSendRTCP:
+ * @sess: an #RTPSession
+ * @src: the #RTPSource
+ * @buffer: the RTCP buffer ready for sending
+ * @user_data: user data specified when registering
+ *
+ * This callback will be called when @sess has @buffer ready for sending to
+ * all listening participants in this session.
+ *
+ * Returns: a #GstFlowReturn.
+ */
+typedef GstFlowReturn (*RTPSessionSendRTCP) (RTPSession *sess, RTPSource *src, GstBuffer *buffer, gpointer user_data);
+
+/**
+ * RTPSessionClockRate:
+ * @sess: an #RTPSession
+ * @payload: the payload
+ * @user_data: user data specified when registering
+ *
+ * This callback will be called when @sess needs the clock-rate of @payload.
+ *
+ * Returns: the clock-rate of @pt.
+ */
+typedef gint (*RTPSessionClockRate) (RTPSession *sess, guint8 payload, gpointer user_data);
+
+/**
+ * RTPSessionGetTime:
+ * @sess: an #RTPSession
+ * @user_data: user data specified when registering
+ *
+ * This callback will be called when @sess needs the current time in
+ * nanoseconds.
+ *
+ * Returns: a #GstClockTime with the current time in nanoseconds.
+ */
+typedef GstClockTime (*RTPSessionGetTime) (RTPSession *sess, gpointer user_data);
+
+/**
+ * RTPSessionCallbacks:
+ * @RTPSessionProcessRTP: callback to process RTP packets
+ * @RTPSessionSendRTP: callback for sending RTP packets
+ * @RTPSessionSendRTCP: callback for sending RTCP packets
+ * @RTPSessionGetTime: callback for returning the current time
+ *
+ * These callbacks can be installed on the session manager to get notification
+ * when RTP and RTCP packets are ready for further processing. These callbacks
+ * are not implemented with signals for performance reasons.
+ */
+typedef struct {
+ RTPSessionProcessRTP process_rtp;
+ RTPSessionSendRTP send_rtp;
+ RTPSessionSendRTCP send_rtcp;
+ RTPSessionClockRate clock_rate;
+ RTPSessionGetTime get_time;
+} RTPSessionCallbacks;
+
+/**
+ * RTPSession:
+ * @lock: lock to protect the session
+ * @source: the source of this session
+ * @ssrcs: Hashtable of sources indexed by SSRC
+ * @cnames: Hashtable of sources indexed by CNAME
+ * @num_sources: the number of sources
+ * @activecount: the number of active sources
+ * @callbacks: callbacks
+ * @user_data: user data passed in callbacks
+ *
+ * The RTP session manager object
+ */
+struct _RTPSession {
+ GObject object;
+
+ GMutex *lock;
+
+ guint header_len;
+
+ RTPSource *source;
+ GHashTable *ssrcs;
+ GHashTable *cnames;
+ guint total_sources;
+
+ RTPSessionCallbacks callbacks;
+ gpointer user_data;
+
+ RTPSessionStats stats;
+};
+
+/**
+ * RTPSessionClass:
+ * @on_new_ssrc: emited when a new source is found
+ * @on_bye_ssrc: emited when a source is gone
+ *
+ * The session class.
+ */
+struct _RTPSessionClass {
+ GObjectClass parent_class;
+
+ /* signals */
+ void (*on_new_ssrc) (RTPSession *sess, RTPSource *source);
+ void (*on_ssrc_collision) (RTPSession *sess, RTPSource *source);
+ void (*on_ssrc_validated) (RTPSession *sess, RTPSource *source);
+ void (*on_bye_ssrc) (RTPSession *sess, RTPSource *source);
+};
+
+GType rtp_session_get_type (void);
+
+/* create and configure */
+RTPSession* rtp_session_new (void);
+void rtp_session_set_callbacks (RTPSession *sess,
+ RTPSessionCallbacks *callbacks,
+ gpointer user_data);
+void rtp_session_set_bandwidth (RTPSession *sess, gdouble bandwidth);
+gdouble rtp_session_get_bandwidth (RTPSession *sess);
+void rtp_session_set_rtcp_fraction (RTPSession *sess, gdouble fraction);
+gdouble rtp_session_get_rtcp_fraction (RTPSession *sess);
+
+/* handling sources */
+gboolean rtp_session_add_source (RTPSession *sess, RTPSource *src);
+gint rtp_session_get_num_sources (RTPSession *sess);
+gint rtp_session_get_num_active_sources (RTPSession *sess);
+RTPSource* rtp_session_get_source_by_ssrc (RTPSession *sess, guint32 ssrc);
+RTPSource* rtp_session_get_source_by_cname (RTPSession *sess, const gchar *cname);
+RTPSource* rtp_session_create_source (RTPSession *sess);
+
+/* processing packets from receivers */
+GstFlowReturn rtp_session_process_rtp (RTPSession *sess, GstBuffer *buffer);
+GstFlowReturn rtp_session_process_rtcp (RTPSession *sess, GstBuffer *buffer);
+
+/* processing packets for sending */
+GstFlowReturn rtp_session_send_rtp (RTPSession *sess, GstBuffer *buffer);
+
+/* get interval for next RTCP interval */
+gdouble rtp_session_get_rtcp_interval (RTPSession *sess);
+GstFlowReturn rtp_session_produce_rtcp (RTPSession *sess);
+
+#endif /* __RTP_SESSION_H__ */
diff --git a/gst/rtpmanager/rtpsource.c b/gst/rtpmanager/rtpsource.c
new file mode 100644
index 00000000..36f54381
--- /dev/null
+++ b/gst/rtpmanager/rtpsource.c
@@ -0,0 +1,477 @@
+/* GStreamer
+ * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+#include <string.h>
+
+#include <gst/rtp/gstrtpbuffer.h>
+#include <gst/rtp/gstrtcpbuffer.h>
+
+#include "rtpsource.h"
+
+GST_DEBUG_CATEGORY_STATIC (rtp_source_debug);
+#define GST_CAT_DEFAULT rtp_source_debug
+
+#define RTP_MAX_PROBATION_LEN 32
+
+/* signals and args */
+enum
+{
+ LAST_SIGNAL
+};
+
+enum
+{
+ PROP_0
+};
+
+/* GObject vmethods */
+static void rtp_source_finalize (GObject * object);
+
+/* static guint rtp_source_signals[LAST_SIGNAL] = { 0 }; */
+
+G_DEFINE_TYPE (RTPSource, rtp_source, G_TYPE_OBJECT);
+
+static void
+rtp_source_class_init (RTPSourceClass * klass)
+{
+ GObjectClass *gobject_class;
+
+ gobject_class = (GObjectClass *) klass;
+
+ gobject_class->finalize = rtp_source_finalize;
+
+ GST_DEBUG_CATEGORY_INIT (rtp_source_debug, "rtpsource", 0, "RTP Source");
+}
+
+static void
+rtp_source_init (RTPSource * src)
+{
+ /* sources are initialy on probation until we receive enough valid RTP
+ * packets or a valid RTCP packet */
+ src->validated = FALSE;
+ src->probation = RTP_DEFAULT_PROBATION;
+
+ src->payload = 0;
+ src->clock_rate = -1;
+ src->packets = g_queue_new ();
+
+ src->stats.jitter = 0;
+ src->stats.transit = -1;
+ src->stats.curr_sr = 0;
+ src->stats.curr_rr = 0;
+}
+
+static void
+rtp_source_finalize (GObject * object)
+{
+ RTPSource *src;
+ GstBuffer *buffer;
+
+ src = RTP_SOURCE_CAST (object);
+
+ while ((buffer = g_queue_pop_head (src->packets)))
+ gst_buffer_unref (buffer);
+ g_queue_free (src->packets);
+
+ G_OBJECT_CLASS (rtp_source_parent_class)->finalize (object);
+}
+
+/**
+ * rtp_source_new:
+ * @ssrc: an SSRC
+ *
+ * Create a #RTPSource with @ssrc.
+ *
+ * Returns: a new #RTPSource. Use g_object_unref() after usage.
+ */
+RTPSource *
+rtp_source_new (guint32 ssrc)
+{
+ RTPSource *src;
+
+ src = g_object_new (RTP_TYPE_SOURCE, NULL);
+ src->ssrc = ssrc;
+
+ return src;
+}
+
+/**
+ * rtp_source_set_callbacks:
+ * @src: an #RTPSource
+ * @cb: callback functions
+ * @user_data: user data
+ *
+ * Set the callbacks for the source.
+ */
+void
+rtp_source_set_callbacks (RTPSource * src, RTPSourceCallbacks * cb,
+ gpointer user_data)
+{
+ g_return_if_fail (RTP_IS_SOURCE (src));
+
+ src->callbacks.push_rtp = cb->push_rtp;
+ src->callbacks.clock_rate = cb->clock_rate;
+ src->user_data = user_data;
+}
+
+/**
+ * rtp_source_set_as_csrc:
+ * @src: an #RTPSource
+ *
+ * Configure @src as a CSRC, this will validate the RTpSource.
+ */
+void
+rtp_source_set_as_csrc (RTPSource * src)
+{
+ g_return_if_fail (RTP_IS_SOURCE (src));
+
+ src->validated = TRUE;
+ src->is_csrc = TRUE;
+}
+
+/**
+ * rtp_source_set_rtp_from:
+ * @src: an #RTPSource
+ * @address: the RTP address to set
+ *
+ * Set that @src is receiving RTP packets from @address. This is used for
+ * collistion checking.
+ */
+void
+rtp_source_set_rtp_from (RTPSource * src, GstNetAddress * address)
+{
+ g_return_if_fail (RTP_IS_SOURCE (src));
+
+ src->have_rtp_from = TRUE;
+ memcpy (&src->rtp_from, address, sizeof (GstNetAddress));
+}
+
+/**
+ * rtp_source_set_rtcp_from:
+ * @src: an #RTPSource
+ * @address: the RTCP address to set
+ *
+ * Set that @src is receiving RTCP packets from @address. This is used for
+ * collistion checking.
+ */
+void
+rtp_source_set_rtcp_from (RTPSource * src, GstNetAddress * address)
+{
+ g_return_if_fail (RTP_IS_SOURCE (src));
+
+ src->have_rtcp_from = TRUE;
+ memcpy (&src->rtcp_from, address, sizeof (GstNetAddress));
+}
+
+static GstFlowReturn
+push_packet (RTPSource * src, GstBuffer * buffer)
+{
+ GstFlowReturn ret = GST_FLOW_OK;
+
+ /* push queued packets first if any */
+ while (!g_queue_is_empty (src->packets)) {
+ GstBuffer *buffer = GST_BUFFER_CAST (g_queue_pop_head (src->packets));
+
+ GST_DEBUG ("pushing queued packet");
+ if (src->callbacks.push_rtp)
+ src->callbacks.push_rtp (src, buffer, src->user_data);
+ else
+ gst_buffer_unref (buffer);
+ }
+ GST_DEBUG ("pushing new packet");
+ /* push packet */
+ if (src->callbacks.push_rtp)
+ ret = src->callbacks.push_rtp (src, buffer, src->user_data);
+ else
+ gst_buffer_unref (buffer);
+
+ return ret;
+}
+
+static gint
+get_clock_rate (RTPSource * src, guint8 payload)
+{
+ if (payload != src->payload) {
+ gint clock_rate = -1;
+
+ if (src->callbacks.clock_rate)
+ clock_rate = src->callbacks.clock_rate (src, payload, src->user_data);
+
+ GST_DEBUG ("new payload %d, got clock-rate %d", payload, clock_rate);
+
+ src->clock_rate = clock_rate;
+ src->payload = payload;
+ }
+ return src->clock_rate;
+}
+
+static void
+calculate_jitter (RTPSource * src, GstBuffer * buffer,
+ RTPArrivalStats * arrival)
+{
+ GstClockTime current;
+ guint32 rtparrival, transit, rtptime;
+ gint32 diff;
+ gint clock_rate;
+ guint8 pt;
+
+ /* get arrival time */
+ if ((current = arrival->time) == GST_CLOCK_TIME_NONE)
+ goto no_time;
+
+ pt = gst_rtp_buffer_get_payload_type (buffer);
+
+ /* get clockrate */
+ if ((clock_rate = get_clock_rate (src, pt)) == -1)
+ goto no_clock_rate;
+
+ rtptime = gst_rtp_buffer_get_timestamp (buffer);
+
+ /* convert arrival time to RTP timestamp units */
+ rtparrival = gst_util_uint64_scale_int (current, clock_rate, GST_SECOND);
+
+ /* transit time is difference with RTP timestamp */
+ transit = rtparrival - rtptime;
+ /* get diff with previous transit time */
+ if (src->stats.transit != -1)
+ diff = transit - src->stats.transit;
+ else
+ diff = 0;
+ src->stats.transit = transit;
+ if (diff < 0)
+ diff = -diff;
+ /* update jitter */
+ src->stats.jitter += diff - ((src->stats.jitter + 8) >> 4);
+
+ src->stats.prev_rtptime = src->stats.last_rtptime;
+ src->stats.last_rtptime = rtparrival;
+
+ GST_DEBUG ("rtparrival %u, rtptime %u, clock-rate %d, diff %d, jitter: %u",
+ rtparrival, rtptime, clock_rate, diff, src->stats.jitter);
+
+ return;
+
+ /* ERRORS */
+no_time:
+ {
+ GST_WARNING ("cannot get current time");
+ return;
+ }
+no_clock_rate:
+ {
+ GST_WARNING ("cannot get clock-rate for pt %d", pt);
+ return;
+ }
+}
+
+/**
+ * rtp_source_process_rtp:
+ * @src: an #RTPSource
+ * @buffer: an RTP buffer
+ *
+ * Let @src handle the incomming RTP @buffer.
+ *
+ * Returns: a #GstFlowReturn.
+ */
+GstFlowReturn
+rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer,
+ RTPArrivalStats * arrival)
+{
+ GstFlowReturn result = GST_FLOW_OK;
+
+ g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR);
+ g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
+
+ /* if we are still on probation, check seqnum */
+ if (src->probation) {
+ guint16 seqnr, expected;
+
+ expected = src->stats.max_seqnr + 1;
+
+ /* when in probation, we require consecutive seqnums */
+ seqnr = gst_rtp_buffer_get_seq (buffer);
+ if (seqnr == expected) {
+ /* expected packet */
+ src->probation--;
+ src->stats.max_seqnr = seqnr;
+
+ GST_DEBUG ("probation: seqnr %d == expected %d", seqnr, expected);
+ } else {
+ GST_DEBUG ("probation: seqnr %d != expected %d", seqnr, expected);
+ src->probation = RTP_DEFAULT_PROBATION;
+ src->stats.max_seqnr = seqnr;
+ }
+ }
+ if (src->probation) {
+ GstBuffer *q;
+
+ GST_DEBUG ("probation %d: queue buffer", src->probation);
+ /* when still in probation, keep packets in a list. */
+ g_queue_push_tail (src->packets, buffer);
+ /* remove packets from queue if there are too many */
+ while (g_queue_get_length (src->packets) > RTP_MAX_PROBATION_LEN) {
+ q = g_queue_pop_head (src->packets);
+ gst_object_unref (q);
+ }
+ } else {
+ /* we are not in probation */
+ src->stats.octetsreceived += arrival->payload_len;
+ src->stats.bytesreceived += arrival->bytes;
+ src->stats.packetsreceived++;
+ src->is_sender = TRUE;
+
+ GST_DEBUG ("PC: %" G_GUINT64_FORMAT ", OC: %" G_GUINT64_FORMAT,
+ src->stats.packetsreceived, src->stats.octetsreceived);
+
+ /* calculate jitter */
+ calculate_jitter (src, buffer, arrival);
+
+ /* we're ready to push the RTP packet now */
+ result = push_packet (src, buffer);
+ }
+ return result;
+}
+
+/**
+ * rtp_source_process_bye:
+ * @src: an #RTPSource
+ * @reason: the reason for leaving
+ *
+ * Notify @src that a BYE packet has been received. This will make the source
+ * inactive.
+ */
+void
+rtp_source_process_bye (RTPSource * src, const gchar * reason)
+{
+ g_return_if_fail (RTP_IS_SOURCE (src));
+
+ GST_DEBUG ("marking SSRC %08x as BYE, reason: %s", src->ssrc,
+ GST_STR_NULL (reason));
+
+ /* copy the reason and mark as received_bye */
+ g_free (src->bye_reason);
+ src->bye_reason = g_strdup (reason);
+ src->received_bye = TRUE;
+}
+
+/**
+ * rtp_source_send_rtp:
+ * @src: an #RTPSource
+ * @buffer: an RTP buffer
+ *
+ * Send an RTP @buffer originating from @src. This will make @src a sender.
+ *
+ * Returns: a #GstFlowReturn.
+ */
+GstFlowReturn
+rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer)
+{
+ GstFlowReturn result = GST_FLOW_OK;
+
+ g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR);
+ g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
+
+ /* we are a sender now */
+ src->is_sender = TRUE;
+
+ /* push packet */
+ if (src->callbacks.push_rtp)
+ result = src->callbacks.push_rtp (src, buffer, src->user_data);
+ else
+ gst_buffer_unref (buffer);
+
+ return result;
+}
+
+/**
+ * rtp_source_process_sr:
+ * @src: an #RTPSource
+ * @ntptime: the NTP time
+ * @rtptime: the RTP time
+ * @packet_count: the packet count
+ * @octet_count: the octect count
+ *
+ * Update the sender report in @src.
+ */
+void
+rtp_source_process_sr (RTPSource * src, guint64 ntptime, guint32 rtptime,
+ guint32 packet_count, guint32 octet_count)
+{
+ RTPSenderReport *curr;
+ gint curridx;
+
+ g_return_if_fail (RTP_IS_SOURCE (src));
+
+ GST_DEBUG ("got SR packet: SSRC %08x, NTP %" G_GUINT64_FORMAT
+ ", RTP %u, PC %u, OC %u", src->ssrc, ntptime, rtptime, packet_count,
+ octet_count);
+
+ curridx = src->stats.curr_sr ^ 1;
+ curr = &src->stats.sr[curridx];
+
+ /* update current */
+ curr->is_valid = TRUE;
+ curr->ntptime = ntptime;
+ curr->rtptime = rtptime;
+ curr->packet_count = packet_count;
+ curr->octet_count = octet_count;
+
+ /* make current */
+ src->stats.curr_sr = curridx;
+}
+
+/**
+ * rtp_source_process_rb:
+ * @src: an #RTPSource
+ * @fractionlost: fraction lost since last SR/RR
+ * @packetslost: the cumululative number of packets lost
+ * @exthighestseq: the extended last sequence number received
+ * @jitter: the interarrival jitter
+ * @lsr: the last SR packet from this source
+ * @dlsr: the delay since last SR packet
+ *
+ * Update the report block in @src.
+ */
+void
+rtp_source_process_rb (RTPSource * src, guint8 fractionlost, gint32 packetslost,
+ guint32 exthighestseq, guint32 jitter, guint32 lsr, guint32 dlsr)
+{
+ RTPReceiverReport *curr;
+ gint curridx;
+
+ g_return_if_fail (RTP_IS_SOURCE (src));
+
+ GST_DEBUG ("got RB packet %d: SSRC %08x, FL %u"
+ ", PL %u, HS %u, JITTER %u, LSR %u, DLSR %u", src->ssrc, fractionlost,
+ packetslost, exthighestseq, jitter, lsr, dlsr);
+
+ curridx = src->stats.curr_rr ^ 1;
+ curr = &src->stats.rr[curridx];
+
+ /* update current */
+ curr->is_valid = TRUE;
+ curr->fractionlost = fractionlost;
+ curr->packetslost = packetslost;
+ curr->exthighestseq = exthighestseq;
+ curr->jitter = jitter;
+ curr->lsr = lsr;
+ curr->dlsr = dlsr;
+
+ /* make current */
+ src->stats.curr_rr = curridx;
+}
diff --git a/gst/rtpmanager/rtpsource.h b/gst/rtpmanager/rtpsource.h
new file mode 100644
index 00000000..d4ae6f55
--- /dev/null
+++ b/gst/rtpmanager/rtpsource.h
@@ -0,0 +1,162 @@
+/* GStreamer
+ * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __RTP_SOURCE_H__
+#define __RTP_SOURCE_H__
+
+#include <gst/gst.h>
+#include <gst/rtp/gstrtcpbuffer.h>
+#include <gst/netbuffer/gstnetbuffer.h>
+
+#include "rtpstats.h"
+
+/* the default number of consecutive RTP packets we need to receive before the
+ * source is considered valid */
+#define RTP_NO_PROBATION 0
+#define RTP_DEFAULT_PROBATION 2
+
+typedef struct _RTPSource RTPSource;
+typedef struct _RTPSourceClass RTPSourceClass;
+
+#define RTP_TYPE_SOURCE (rtp_source_get_type())
+#define RTP_SOURCE(src) (G_TYPE_CHECK_INSTANCE_CAST((src),RTP_TYPE_SOURCE,RTPSource))
+#define RTP_SOURCE_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),RTP_TYPE_SOURCE,RTPSourceClass))
+#define RTP_IS_SOURCE(src) (G_TYPE_CHECK_INSTANCE_TYPE((src),RTP_TYPE_SOURCE))
+#define RTP_IS_SOURCE_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),RTP_TYPE_SOURCE))
+#define RTP_SOURCE_CAST(src) ((RTPSource *)(src))
+
+/**
+ * RTP_SOURCE_IS_ACTIVE:
+ * @src: an #RTPSource
+ *
+ * Check if @src is active. A source is active when it has been validated
+ * and has not yet received a BYE packet.
+ */
+#define RTP_SOURCE_IS_ACTIVE(src) (src->validated && !src->received_bye)
+
+/**
+ * RTP_SOURCE_IS_SENDER:
+ * @src: an #RTPSource
+ *
+ * Check if @src is a sender.
+ */
+#define RTP_SOURCE_IS_SENDER(src) (src->is_sender)
+
+/**
+ * RTPSourcePushRTP:
+ * @src: an #RTPSource
+ * @buffer: the RTP buffer ready for processing
+ * @user_data: user data specified when registering
+ *
+ * This callback will be called when @src has @buffer ready for further
+ * processing.
+ *
+ * Returns: a #GstFlowReturn.
+ */
+typedef GstFlowReturn (*RTPSourcePushRTP) (RTPSource *src, GstBuffer *buffer, gpointer user_data);
+
+/**
+ * RTPSourceClockRate:
+ * @src: an #RTPSource
+ * @payload: a payload type
+ * @user_data: user data specified when registering
+ *
+ * This callback will be called when @src needs the clock-rate of the
+ * @payload.
+ *
+ * Returns: a clock-rate for @payload.
+ */
+typedef gint (*RTPSourceClockRate) (RTPSource *src, guint8 payload, gpointer user_data);
+
+/**
+ * RTPSourceCallbacks:
+ * @push_rtp: a packet becomes available for handling
+ * @clock_rate: a clock-rate is requested
+ * @get_time: the current clock time is requested
+ *
+ * Callbacks performed by #RTPSource when actions need to be performed.
+ */
+typedef struct {
+ RTPSourcePushRTP push_rtp;
+ RTPSourceClockRate clock_rate;
+} RTPSourceCallbacks;
+
+/**
+ * RTPSource:
+ *
+ * A source in the #RTPSession
+ */
+struct _RTPSource {
+ GObject object;
+
+ /*< private >*/
+ RTPSourceCallbacks callbacks;
+ gpointer user_data;
+
+ guint32 ssrc;
+ gchar *cname;
+ gint probation;
+ gboolean validated;
+ gboolean received_bye;
+ gchar *bye_reason;
+
+ gboolean is_csrc;
+ gboolean is_sender;
+
+ gboolean have_rtp_from;
+ GstNetAddress rtp_from;
+ gboolean have_rtcp_from;
+ GstNetAddress rtcp_from;
+
+ guint8 payload;
+ gint clock_rate;
+
+ GQueue *packets;
+
+ RTPSourceStats stats;
+};
+
+struct _RTPSourceClass {
+ GObjectClass parent_class;
+};
+
+GType rtp_source_get_type (void);
+
+/* managing lifetime of sources */
+RTPSource* rtp_source_new (guint32 ssrc);
+
+void rtp_source_set_callbacks (RTPSource *src, RTPSourceCallbacks *cb, gpointer data);
+void rtp_source_set_as_csrc (RTPSource *src);
+
+void rtp_source_set_rtp_from (RTPSource *src, GstNetAddress *address);
+void rtp_source_set_rtcp_from (RTPSource *src, GstNetAddress *address);
+
+GstFlowReturn rtp_source_process_rtp (RTPSource *src, GstBuffer *buffer, RTPArrivalStats *arrival);
+
+GstFlowReturn rtp_source_send_rtp (RTPSource *src, GstBuffer *buffer);
+
+/* RTCP messages */
+void rtp_source_process_bye (RTPSource *src, const gchar *reason);
+void rtp_source_process_sr (RTPSource *src, guint64 ntptime, guint32 rtptime,
+ guint32 packet_count, guint32 octet_count);
+void rtp_source_process_rb (RTPSource *src, guint8 fractionlost, gint32 packetslost,
+ guint32 exthighestseq, guint32 jitter,
+ guint32 lsr, guint32 dlsr);
+
+#endif /* __RTP_SOURCE_H__ */
diff --git a/gst/rtpmanager/rtpstats.c b/gst/rtpmanager/rtpstats.c
new file mode 100644
index 00000000..b9076eac
--- /dev/null
+++ b/gst/rtpmanager/rtpstats.c
@@ -0,0 +1,111 @@
+/* GStreamer
+ * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include "rtpstats.h"
+
+/**
+ * rtp_stats_init_defaults:
+ * @stats: an #RTPSessionStats struct
+ *
+ * Initialize @stats with its default values.
+ */
+void
+rtp_stats_init_defaults (RTPSessionStats * stats)
+{
+ stats->bandwidth = RTP_STATS_BANDWIDTH;
+ stats->sender_fraction = RTP_STATS_SENDER_FRACTION;
+ stats->receiver_fraction = RTP_STATS_RECEIVER_FRACTION;
+ stats->rtcp_bandwidth = RTP_STATS_RTCP_BANDWIDTH;
+ stats->min_interval = RTP_STATS_MIN_INTERVAL;
+}
+
+/**
+ * rtp_stats_calculate_rtcp_interval:
+ * @stats: an #RTPSessionStats struct
+ *
+ * Calculate the RTCP interval. The result of this function is the amount of
+ * time to wait (in seconds) before sender a new RTCP message.
+ *
+ * Returns: the RTCP interval.
+ */
+gdouble
+rtp_stats_calculate_rtcp_interval (RTPSessionStats * stats, gboolean sender)
+{
+ gdouble active, senders, receivers, sfraction;
+ gboolean avg_rtcp;
+ gdouble interval;
+
+ active = stats->active_sources;
+ /* Try to avoid division by zero */
+ if (stats->active_sources == 0)
+ active += 1.0;
+
+ senders = (gdouble) stats->sender_sources;
+ receivers = (gdouble) (active - senders);
+ avg_rtcp = (gdouble) stats->avg_rtcp_packet_size;
+
+ sfraction = senders / active;
+
+ GST_DEBUG ("senders: %f, receivers %f, avg_rtcp %f, sfraction %f",
+ senders, receivers, avg_rtcp, sfraction);
+
+ if (sfraction <= stats->sender_fraction) {
+ if (sender) {
+ interval =
+ (avg_rtcp * senders) / (stats->sender_fraction *
+ stats->rtcp_bandwidth);
+ } else {
+ interval =
+ (avg_rtcp * receivers) / ((1.0 -
+ stats->sender_fraction) * stats->rtcp_bandwidth);
+ }
+ } else {
+ interval = (avg_rtcp * active) / stats->rtcp_bandwidth;
+ }
+
+ if (interval < stats->min_interval)
+ interval = stats->min_interval;
+
+ if (!stats->sent_rtcp)
+ interval /= 2.0;
+
+ return interval;
+}
+
+/**
+ * rtp_stats_calculate_rtcp_interval:
+ * @stats: an #RTPSessionStats struct
+ * @interval: an RTCP interval
+ *
+ * Apply a random jitter to the @interval. @interval is typically obtained with
+ * rtp_stats_calculate_rtcp_interval().
+ *
+ * Returns: the new RTCP interval.
+ */
+gdouble
+rtp_stats_add_rtcp_jitter (RTPSessionStats * stats, gdouble interval)
+{
+ /* see RFC 3550 p 30
+ * To compensate for "unconditional reconsideration" converging to a
+ * value below the intended average.
+ */
+#define COMPENSATION (2.71828 - 1.5);
+
+ return (interval * g_random_double_range (0.5, 1.5)) / COMPENSATION;
+}
diff --git a/gst/rtpmanager/rtpstats.h b/gst/rtpmanager/rtpstats.h
new file mode 100644
index 00000000..66aa7bf7
--- /dev/null
+++ b/gst/rtpmanager/rtpstats.h
@@ -0,0 +1,161 @@
+/* GStreamer
+ * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __RTP_STATS_H__
+#define __RTP_STATS_H__
+
+#include <gst/gst.h>
+#include <gst/netbuffer/gstnetbuffer.h>
+
+/**
+ * RTPSenderReport:
+ *
+ * A sender report structure.
+ */
+typedef struct {
+ gboolean is_valid;
+ guint64 ntptime;
+ guint32 rtptime;
+ guint32 packet_count;
+ guint32 octet_count;
+} RTPSenderReport;
+
+/**
+ * RTPReceiverReport:
+ *
+ * A receiver report structure.
+ */
+typedef struct {
+ gboolean is_valid;
+ guint32 ssrc; /* who the report is from */
+ guint8 fractionlost;
+ guint32 packetslost;
+ guint32 exthighestseq;
+ guint32 jitter;
+ guint32 lsr;
+ guint32 dlsr;
+} RTPReceiverReport;
+
+/**
+ * RTPArrivalStats:
+ * @time: arrival time of a packet
+ * @address: address of the sender of the packet
+ * @bytes: bytes of the packet including lowlevel overhead
+ * @payload_len: bytes of the RTP payload
+ *
+ * Structure holding information about the arrival stats of a packet.
+ */
+typedef struct {
+ GstClockTime time;
+ gboolean have_address;
+ GstNetAddress address;
+ guint bytes;
+ guint payload_len;
+} RTPArrivalStats;
+
+/**
+ * RTPSourceStats:
+ * @packetsreceived: number of received packets in total
+ * @prevpacketsreceived: number of packets received in previous reporting
+ * interval
+ * @octetsreceived: number of payload bytes received
+ * @bytesreceived: number of total bytes received including headers and lower
+ * protocol level overhead
+ * @max_seqnr: highest sequence number received
+ * @transit: previous transit time used for calculating @jitter
+ * @jitter: current jitter
+ * @prev_rtptime: previous time when an RTP packet was received
+ * @prev_rtcptime: previous time when an RTCP packet was received
+ * @last_rtptime: time when last RTP packet received
+ * @last_rtcptime: time when last RTCP packet received
+ * @curr_rr: index of current @rr block
+ * @rr: previous and current receiver report block
+ * @curr_sr: index of current @sr block
+ * @sr: previous and current sender report block
+ *
+ * Stats about a source.
+ */
+typedef struct {
+ guint64 packetsreceived;
+ guint64 prevpacketsreceived;
+ guint64 octetsreceived;
+ guint64 bytesreceived;
+ guint16 max_seqnr;
+ guint32 transit;
+ guint32 jitter;
+
+ /* when we received stuff */
+ GstClockTime prev_rtptime;
+ GstClockTime prev_rtcptime;
+ GstClockTime last_rtptime;
+ GstClockTime last_rtcptime;
+
+ /* sender and receiver reports */
+ gint curr_rr;
+ RTPReceiverReport rr[2];
+ gint curr_sr;
+ RTPSenderReport sr[2];
+} RTPSourceStats;
+
+#define RTP_STATS_BANDWIDTH 64000.0
+#define RTP_STATS_RTCP_BANDWIDTH 3000.0
+/*
+ * Minimum average time between RTCP packets from this site (in
+ * seconds). This time prevents the reports from `clumping' when
+ * sessions are small and the law of large numbers isn't helping
+ * to smooth out the traffic. It also keeps the report interval
+ * from becoming ridiculously small during transient outages like
+ * a network partition.
+ */
+#define RTP_STATS_MIN_INTERVAL 5.0
+ /*
+ * Fraction of the RTCP bandwidth to be shared among active
+ * senders. (This fraction was chosen so that in a typical
+ * session with one or two active senders, the computed report
+ * time would be roughly equal to the minimum report time so that
+ * we don't unnecessarily slow down receiver reports.) The
+ * receiver fraction must be 1 - the sender fraction.
+ */
+#define RTP_STATS_SENDER_FRACTION (0.25)
+#define RTP_STATS_RECEIVER_FRACTION (1.0 - RTP_STATS_SENDER_FRACTION)
+
+/**
+ * RTPSessionStats:
+ *
+ * Stats kept for a session and used to produce RTCP packet timeouts.
+ */
+typedef struct {
+ gdouble bandwidth;
+ gdouble sender_fraction;
+ gdouble receiver_fraction;
+ gdouble rtcp_bandwidth;
+ gdouble min_interval;
+ guint sender_sources;
+ guint active_sources;
+ guint avg_rtcp_packet_size;
+ guint avg_bye_packet_size;
+ gboolean sent_rtcp;
+} RTPSessionStats;
+
+void rtp_stats_init_defaults (RTPSessionStats *stats);
+
+gdouble rtp_stats_calculate_rtcp_interval (RTPSessionStats *stats, gboolean sender);
+gdouble rtp_stats_add_rtcp_jitter (RTPSessionStats *stats, gdouble interval);
+
+#endif /* __RTP_STATS_H__ */