summaryrefslogtreecommitdiffstats
path: root/gst/rtpmanager/rtpsource.c
diff options
context:
space:
mode:
Diffstat (limited to 'gst/rtpmanager/rtpsource.c')
-rw-r--r--gst/rtpmanager/rtpsource.c128
1 files changed, 98 insertions, 30 deletions
diff --git a/gst/rtpmanager/rtpsource.c b/gst/rtpmanager/rtpsource.c
index 36f54381..43acb084 100644
--- a/gst/rtpmanager/rtpsource.c
+++ b/gst/rtpmanager/rtpsource.c
@@ -70,6 +70,7 @@ rtp_source_init (RTPSource * src)
src->clock_rate = -1;
src->packets = g_queue_new ();
+ src->stats.cycles = -1;
src->stats.jitter = 0;
src->stats.transit = -1;
src->stats.curr_sr = 0;
@@ -279,6 +280,20 @@ no_clock_rate:
}
}
+static void
+init_seq (RTPSource * src, guint16 seq)
+{
+ src->stats.base_seq = seq;
+ src->stats.max_seq = seq;
+ src->stats.bad_seq = RTP_SEQ_MOD + 1; /* so seq == bad_seq is false */
+ src->stats.cycles = 0;
+ src->stats.packets_received = 0;
+ src->stats.octets_received = 0;
+ src->stats.bytes_received = 0;
+ src->stats.prev_received = 0;
+ src->stats.prev_expected = 0;
+}
+
/**
* rtp_source_process_rtp:
* @src: an #RTPSource
@@ -293,58 +308,108 @@ rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer,
RTPArrivalStats * arrival)
{
GstFlowReturn result = GST_FLOW_OK;
+ guint16 seqnr, udelta;
+ RTPSourceStats *stats;
g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
+ stats = &src->stats;
+
+ seqnr = gst_rtp_buffer_get_seq (buffer);
+
+ if (stats->cycles == -1) {
+ GST_DEBUG ("first buffer");
+ /* first time we heard of this source */
+ init_seq (src, seqnr);
+ src->stats.max_seq = seqnr - 1;
+ src->probation = RTP_DEFAULT_PROBATION;
+ }
+
+ udelta = seqnr - stats->max_seq;
+
/* if we are still on probation, check seqnum */
if (src->probation) {
- guint16 seqnr, expected;
+ guint16 expected;
- expected = src->stats.max_seqnr + 1;
+ expected = src->stats.max_seq + 1;
/* when in probation, we require consecutive seqnums */
- seqnr = gst_rtp_buffer_get_seq (buffer);
if (seqnr == expected) {
/* expected packet */
- src->probation--;
- src->stats.max_seqnr = seqnr;
-
GST_DEBUG ("probation: seqnr %d == expected %d", seqnr, expected);
+ src->probation--;
+ src->stats.max_seq = seqnr;
+ if (src->probation == 0) {
+ GST_DEBUG ("probation done!", src->probation);
+ init_seq (src, seqnr);
+ } else {
+ GstBuffer *q;
+
+ GST_DEBUG ("probation %d: queue buffer", src->probation);
+ /* when still in probation, keep packets in a list. */
+ g_queue_push_tail (src->packets, buffer);
+ /* remove packets from queue if there are too many */
+ while (g_queue_get_length (src->packets) > RTP_MAX_PROBATION_LEN) {
+ q = g_queue_pop_head (src->packets);
+ gst_object_unref (q);
+ }
+ goto done;
+ }
} else {
GST_DEBUG ("probation: seqnr %d != expected %d", seqnr, expected);
src->probation = RTP_DEFAULT_PROBATION;
- src->stats.max_seqnr = seqnr;
+ src->stats.max_seq = seqnr;
+ goto done;
}
- }
- if (src->probation) {
- GstBuffer *q;
-
- GST_DEBUG ("probation %d: queue buffer", src->probation);
- /* when still in probation, keep packets in a list. */
- g_queue_push_tail (src->packets, buffer);
- /* remove packets from queue if there are too many */
- while (g_queue_get_length (src->packets) > RTP_MAX_PROBATION_LEN) {
- q = g_queue_pop_head (src->packets);
- gst_object_unref (q);
+ } else if (udelta < RTP_MAX_DROPOUT) {
+ /* in order, with permissible gap */
+ if (seqnr < stats->max_seq) {
+ /* sequence number wrapped - count another 64K cycle. */
+ stats->cycles++;
+ }
+ stats->max_seq = seqnr;
+ } else if (udelta <= RTP_SEQ_MOD - RTP_MAX_MISORDER) {
+ /* the sequence number made a very large jump */
+ if (seqnr == stats->bad_seq) {
+ /* two sequential packets -- assume that the other side
+ * restarted without telling us so just re-sync
+ * (i.e., pretend this was the first packet). */
+ init_seq (src, seqnr);
+ } else {
+ /* unacceptable jump */
+ stats->bad_seq = (seqnr + 1) & (RTP_SEQ_MOD - 1);
+ goto bad_sequence;
}
} else {
- /* we are not in probation */
- src->stats.octetsreceived += arrival->payload_len;
- src->stats.bytesreceived += arrival->bytes;
- src->stats.packetsreceived++;
- src->is_sender = TRUE;
+ /* duplicate or reordered packet, will be filtered by jitterbuffer. */
+ }
- GST_DEBUG ("PC: %" G_GUINT64_FORMAT ", OC: %" G_GUINT64_FORMAT,
- src->stats.packetsreceived, src->stats.octetsreceived);
+ src->stats.octets_received += arrival->payload_len;
+ src->stats.bytes_received += arrival->bytes;
+ src->stats.packets_received++;
+ /* the source that sent the packet must be a sender */
+ src->is_sender = TRUE;
+ src->validated = TRUE;
- /* calculate jitter */
- calculate_jitter (src, buffer, arrival);
+ GST_DEBUG ("PC: %" G_GUINT64_FORMAT ", OC: %" G_GUINT64_FORMAT,
+ src->stats.packets_received, src->stats.octets_received);
- /* we're ready to push the RTP packet now */
- result = push_packet (src, buffer);
- }
+ /* calculate jitter */
+ calculate_jitter (src, buffer, arrival);
+
+ /* we're ready to push the RTP packet now */
+ result = push_packet (src, buffer);
+
+done:
return result;
+
+ /* ERRORS */
+bad_sequence:
+ {
+ GST_WARNING ("unacceptable seqnum received");
+ return GST_FLOW_OK;
+ }
}
/**
@@ -424,6 +489,9 @@ rtp_source_process_sr (RTPSource * src, guint64 ntptime, guint32 rtptime,
curridx = src->stats.curr_sr ^ 1;
curr = &src->stats.sr[curridx];
+ /* this is a sender now */
+ src->is_sender = TRUE;
+
/* update current */
curr->is_valid = TRUE;
curr->ntptime = ntptime;