diff options
Diffstat (limited to 'gst/rtpmanager/rtpsource.c')
-rw-r--r-- | gst/rtpmanager/rtpsource.c | 128 |
1 files changed, 98 insertions, 30 deletions
diff --git a/gst/rtpmanager/rtpsource.c b/gst/rtpmanager/rtpsource.c index 36f54381..43acb084 100644 --- a/gst/rtpmanager/rtpsource.c +++ b/gst/rtpmanager/rtpsource.c @@ -70,6 +70,7 @@ rtp_source_init (RTPSource * src) src->clock_rate = -1; src->packets = g_queue_new (); + src->stats.cycles = -1; src->stats.jitter = 0; src->stats.transit = -1; src->stats.curr_sr = 0; @@ -279,6 +280,20 @@ no_clock_rate: } } +static void +init_seq (RTPSource * src, guint16 seq) +{ + src->stats.base_seq = seq; + src->stats.max_seq = seq; + src->stats.bad_seq = RTP_SEQ_MOD + 1; /* so seq == bad_seq is false */ + src->stats.cycles = 0; + src->stats.packets_received = 0; + src->stats.octets_received = 0; + src->stats.bytes_received = 0; + src->stats.prev_received = 0; + src->stats.prev_expected = 0; +} + /** * rtp_source_process_rtp: * @src: an #RTPSource @@ -293,58 +308,108 @@ rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer, RTPArrivalStats * arrival) { GstFlowReturn result = GST_FLOW_OK; + guint16 seqnr, udelta; + RTPSourceStats *stats; g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR); g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); + stats = &src->stats; + + seqnr = gst_rtp_buffer_get_seq (buffer); + + if (stats->cycles == -1) { + GST_DEBUG ("first buffer"); + /* first time we heard of this source */ + init_seq (src, seqnr); + src->stats.max_seq = seqnr - 1; + src->probation = RTP_DEFAULT_PROBATION; + } + + udelta = seqnr - stats->max_seq; + /* if we are still on probation, check seqnum */ if (src->probation) { - guint16 seqnr, expected; + guint16 expected; - expected = src->stats.max_seqnr + 1; + expected = src->stats.max_seq + 1; /* when in probation, we require consecutive seqnums */ - seqnr = gst_rtp_buffer_get_seq (buffer); if (seqnr == expected) { /* expected packet */ - src->probation--; - src->stats.max_seqnr = seqnr; - GST_DEBUG ("probation: seqnr %d == expected %d", seqnr, expected); + src->probation--; + src->stats.max_seq = seqnr; + if (src->probation == 0) { + GST_DEBUG ("probation done!", src->probation); + init_seq (src, seqnr); + } else { + GstBuffer *q; + + GST_DEBUG ("probation %d: queue buffer", src->probation); + /* when still in probation, keep packets in a list. */ + g_queue_push_tail (src->packets, buffer); + /* remove packets from queue if there are too many */ + while (g_queue_get_length (src->packets) > RTP_MAX_PROBATION_LEN) { + q = g_queue_pop_head (src->packets); + gst_object_unref (q); + } + goto done; + } } else { GST_DEBUG ("probation: seqnr %d != expected %d", seqnr, expected); src->probation = RTP_DEFAULT_PROBATION; - src->stats.max_seqnr = seqnr; + src->stats.max_seq = seqnr; + goto done; } - } - if (src->probation) { - GstBuffer *q; - - GST_DEBUG ("probation %d: queue buffer", src->probation); - /* when still in probation, keep packets in a list. */ - g_queue_push_tail (src->packets, buffer); - /* remove packets from queue if there are too many */ - while (g_queue_get_length (src->packets) > RTP_MAX_PROBATION_LEN) { - q = g_queue_pop_head (src->packets); - gst_object_unref (q); + } else if (udelta < RTP_MAX_DROPOUT) { + /* in order, with permissible gap */ + if (seqnr < stats->max_seq) { + /* sequence number wrapped - count another 64K cycle. */ + stats->cycles++; + } + stats->max_seq = seqnr; + } else if (udelta <= RTP_SEQ_MOD - RTP_MAX_MISORDER) { + /* the sequence number made a very large jump */ + if (seqnr == stats->bad_seq) { + /* two sequential packets -- assume that the other side + * restarted without telling us so just re-sync + * (i.e., pretend this was the first packet). */ + init_seq (src, seqnr); + } else { + /* unacceptable jump */ + stats->bad_seq = (seqnr + 1) & (RTP_SEQ_MOD - 1); + goto bad_sequence; } } else { - /* we are not in probation */ - src->stats.octetsreceived += arrival->payload_len; - src->stats.bytesreceived += arrival->bytes; - src->stats.packetsreceived++; - src->is_sender = TRUE; + /* duplicate or reordered packet, will be filtered by jitterbuffer. */ + } - GST_DEBUG ("PC: %" G_GUINT64_FORMAT ", OC: %" G_GUINT64_FORMAT, - src->stats.packetsreceived, src->stats.octetsreceived); + src->stats.octets_received += arrival->payload_len; + src->stats.bytes_received += arrival->bytes; + src->stats.packets_received++; + /* the source that sent the packet must be a sender */ + src->is_sender = TRUE; + src->validated = TRUE; - /* calculate jitter */ - calculate_jitter (src, buffer, arrival); + GST_DEBUG ("PC: %" G_GUINT64_FORMAT ", OC: %" G_GUINT64_FORMAT, + src->stats.packets_received, src->stats.octets_received); - /* we're ready to push the RTP packet now */ - result = push_packet (src, buffer); - } + /* calculate jitter */ + calculate_jitter (src, buffer, arrival); + + /* we're ready to push the RTP packet now */ + result = push_packet (src, buffer); + +done: return result; + + /* ERRORS */ +bad_sequence: + { + GST_WARNING ("unacceptable seqnum received"); + return GST_FLOW_OK; + } } /** @@ -424,6 +489,9 @@ rtp_source_process_sr (RTPSource * src, guint64 ntptime, guint32 rtptime, curridx = src->stats.curr_sr ^ 1; curr = &src->stats.sr[curridx]; + /* this is a sender now */ + src->is_sender = TRUE; + /* update current */ curr->is_valid = TRUE; curr->ntptime = ntptime; |