From 7efc94bc12d1c2c986353c9673f35f1dbe387282 Mon Sep 17 00:00:00 2001 From: Olivier Crete Date: Fri, 14 Sep 2007 04:20:42 +0000 Subject: [MOVED FROM GST-P-FARSIGHT] Implement stopping in a nice thread safe way 20070914042042-3e2dc-1fe257ff4b72aca4b0eb5f285a14650b8df268c3.gz --- gst/dtmf/gstdtmfsrc.c | 198 +++++++++++++++++++++++++++++++++----------------- gst/dtmf/gstdtmfsrc.h | 5 +- 2 files changed, 133 insertions(+), 70 deletions(-) diff --git a/gst/dtmf/gstdtmfsrc.c b/gst/dtmf/gstdtmfsrc.c index 8c373ea3..35e6813f 100644 --- a/gst/dtmf/gstdtmfsrc.c +++ b/gst/dtmf/gstdtmfsrc.c @@ -252,6 +252,7 @@ static void gst_dtmf_src_add_stop_event (GstDTMFSrc *dtmfsrc); static gboolean gst_dtmf_src_unlock (GstBaseSrc *src); +static gboolean gst_dtmf_src_unlock_stop (GstBaseSrc *src); static void gst_dtmf_src_base_init (gpointer g_class) @@ -294,6 +295,8 @@ gst_dtmf_src_class_init (GstDTMFSrcClass * klass) GST_DEBUG_FUNCPTR (gst_dtmf_src_change_state); gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_dtmf_src_unlock); + gstbasesrc_class->unlock_stop = + GST_DEBUG_FUNCPTR (gst_dtmf_src_unlock_stop); gstbasesrc_class->event = GST_DEBUG_FUNCPTR (gst_dtmf_src_handle_event); @@ -315,8 +318,6 @@ gst_dtmf_src_init (GstDTMFSrc * dtmfsrc, GstDTMFSrcClass *g_class) dtmfsrc->event_queue = g_async_queue_new (); dtmfsrc->last_event = NULL; - dtmfsrc->clock_id = NULL; - GST_DEBUG_OBJECT (dtmfsrc, "init done"); } @@ -475,7 +476,7 @@ gst_dtmf_prepare_timestamps (GstDTMFSrc *dtmfsrc) GstClock *clock; GstClockTime base_time; - base_time = GST_ELEMENT_CAST (dtmfsrc)->base_time; + base_time = gst_element_get_base_time (GST_ELEMENT (dtmfsrc)); clock = gst_element_get_clock (GST_ELEMENT (dtmfsrc)); if (clock != NULL) { @@ -622,82 +623,124 @@ gst_dtmf_src_create (GstBaseSrc * basesrc, guint64 offset, guint length, GstBuffer ** buffer) { GstBuffer *buf = NULL; - GstFlowReturn ret; GstDTMFSrcEvent *event; GstDTMFSrc * dtmfsrc; + GstClock *clock; + GstClockID *clockid; + GstClockReturn clockret; dtmfsrc = GST_DTMF_SRC (basesrc); - g_async_queue_ref (dtmfsrc->event_queue); - - start: - if (dtmfsrc->last_event == NULL) { - GST_DEBUG_OBJECT (dtmfsrc, "popping"); - event = g_async_queue_pop (dtmfsrc->event_queue); - - GST_DEBUG_OBJECT (dtmfsrc, "popped %d", event->event_type); - - if (event->event_type == DTMF_EVENT_TYPE_STOP) { - GST_WARNING_OBJECT (dtmfsrc, - "Received a DTMF stop event when already stopped"); - } else if (event->event_type == DTMF_EVENT_TYPE_START) { - gst_dtmf_prepare_timestamps (dtmfsrc); - - /* Don't forget to get exclusive access to the stream */ - gst_dtmf_src_set_stream_lock (dtmfsrc, TRUE); - - event->packet_count = 0; - dtmfsrc->last_event = event; - } else if (event->event_type == DTMF_EVENT_TYPE_PAUSE_TASK) { - /* - * We're pushing it back because it has to stay in there until - * the task is really paused (and the queue will then be flushed) - */ - GST_DEBUG_OBJECT (dtmfsrc, "pushing pause_task..."); - g_async_queue_push (dtmfsrc->event_queue, event); - g_async_queue_unref (dtmfsrc->event_queue); - } - } else if (dtmfsrc->last_event->packet_count * dtmfsrc->interval >= - MIN_DUTY_CYCLE) { - event = g_async_queue_try_pop (dtmfsrc->event_queue); + do { + + if (dtmfsrc->paused) + goto paused; + + if (dtmfsrc->last_event == NULL) { + GST_DEBUG_OBJECT (dtmfsrc, "popping"); + event = g_async_queue_pop (dtmfsrc->event_queue); + + GST_DEBUG_OBJECT (dtmfsrc, "popped %d", event->event_type); - if (event != NULL) { - if (event->event_type == DTMF_EVENT_TYPE_START) { + if (event->event_type == DTMF_EVENT_TYPE_STOP) { GST_WARNING_OBJECT (dtmfsrc, - "Received two consecutive DTMF start events"); - } else if (event->event_type == DTMF_EVENT_TYPE_STOP) { - gst_dtmf_src_set_stream_lock (dtmfsrc, FALSE); - g_free (dtmfsrc->last_event); - dtmfsrc->last_event = NULL; - goto start; + "Received a DTMF stop event when already stopped"); + } else if (event->event_type == DTMF_EVENT_TYPE_START) { + gst_dtmf_prepare_timestamps (dtmfsrc); + + /* Don't forget to get exclusive access to the stream */ + gst_dtmf_src_set_stream_lock (dtmfsrc, TRUE); + + event->packet_count = 0; + dtmfsrc->last_event = event; } else if (event->event_type == DTMF_EVENT_TYPE_PAUSE_TASK) { /* * We're pushing it back because it has to stay in there until * the task is really paused (and the queue will then be flushed) */ GST_DEBUG_OBJECT (dtmfsrc, "pushing pause_task..."); - g_async_queue_push (dtmfsrc->event_queue, event); - g_async_queue_unref (dtmfsrc->event_queue); + if (dtmfsrc->paused) { + + g_async_queue_push (dtmfsrc->event_queue, event); + goto paused; + } + } + } else if (dtmfsrc->last_event->packet_count * dtmfsrc->interval >= + MIN_DUTY_CYCLE) { + event = g_async_queue_try_pop (dtmfsrc->event_queue); + + if (event != NULL) { + if (event->event_type == DTMF_EVENT_TYPE_START) { + GST_WARNING_OBJECT (dtmfsrc, + "Received two consecutive DTMF start events"); + } else if (event->event_type == DTMF_EVENT_TYPE_STOP) { + gst_dtmf_src_set_stream_lock (dtmfsrc, FALSE); + + g_free (dtmfsrc->last_event); + dtmfsrc->last_event = NULL; + } else if (event->event_type == DTMF_EVENT_TYPE_PAUSE_TASK) { + /* + * We're pushing it back because it has to stay in there until + * the task is really paused (and the queue will then be flushed) + */ + GST_DEBUG_OBJECT (dtmfsrc, "pushing pause_task..."); + if (dtmfsrc->paused) { + g_async_queue_push (dtmfsrc->event_queue, event); + goto paused; + } + } } } + } while (dtmfsrc->last_event == NULL); + + GST_DEBUG_OBJECT (dtmfsrc, "end event check, now wait for the proper time"); + + clock = gst_element_get_clock (GST_ELEMENT (basesrc)); + + clockid = gst_clock_new_single_shot_id (clock, dtmfsrc->timestamp + + gst_element_get_base_time (GST_ELEMENT (dtmfsrc))); + gst_object_unref (clock); + + GST_OBJECT_LOCK (dtmfsrc); + if (!dtmfsrc->paused) { + dtmfsrc->clockid = clockid; + GST_OBJECT_UNLOCK (dtmfsrc); + + clockret = gst_clock_id_wait (clockid, NULL); + + GST_OBJECT_LOCK (dtmfsrc); + if (dtmfsrc->paused) + clockret = GST_CLOCK_UNSCHEDULED; + } else { + clockret = GST_CLOCK_UNSCHEDULED; } - g_async_queue_unref (dtmfsrc->event_queue); + gst_clock_id_unref (clockid); + dtmfsrc->clockid = NULL; + GST_OBJECT_UNLOCK (dtmfsrc); - GST_DEBUG_OBJECT (dtmfsrc, "end event check"); + if (clockret == GST_CLOCK_UNSCHEDULED) { + goto paused; + } - if (dtmfsrc->last_event) { - buf = gst_dtmf_src_create_next_tone_packet (dtmfsrc, dtmfsrc->last_event); + buf = gst_dtmf_src_create_next_tone_packet (dtmfsrc, dtmfsrc->last_event); - GST_DEBUG_OBJECT (dtmfsrc, "Created buffer of size %d", GST_BUFFER_SIZE (buf)); - *buffer = buf; - ret = GST_FLOW_OK; - } else { - *buffer = NULL; - ret = GST_FLOW_WRONG_STATE; + GST_DEBUG_OBJECT (dtmfsrc, "Created buffer of size %d", GST_BUFFER_SIZE (buf)); + *buffer = buf; + + GST_DEBUG_OBJECT (dtmfsrc, "returning a buffer"); + return GST_FLOW_OK; + + paused: + + if (dtmfsrc->last_event) { + GST_DEBUG_OBJECT (dtmfsrc, "Stopping current event"); + /* Don't forget to release the stream lock */ + gst_dtmf_src_set_stream_lock (dtmfsrc, FALSE); + g_free (dtmfsrc->last_event); + dtmfsrc->last_event = NULL; } - GST_DEBUG_OBJECT (dtmfsrc, "returning"); - return ret; + return GST_FLOW_WRONG_STATE; } @@ -706,7 +749,16 @@ gst_dtmf_src_unlock (GstBaseSrc *src) { GstDTMFSrc *dtmfsrc = GST_DTMF_SRC (src); GstDTMFSrcEvent *event = NULL; - GST_DEBUG_OBJECT (dtmfsrc, "Pushing the PAUSE_TASK even on PAUSED_TO_READY change"); + GST_DEBUG_OBJECT (dtmfsrc, "Called unlock"); + + GST_OBJECT_LOCK (dtmfsrc); + dtmfsrc->paused = TRUE; + if (dtmfsrc->clockid) { + gst_clock_id_unschedule (dtmfsrc->clockid); + } + GST_OBJECT_UNLOCK (dtmfsrc); + + GST_DEBUG_OBJECT (dtmfsrc, "Pushing the PAUSE_TASK event on unlock request"); event = g_malloc (sizeof(GstDTMFSrcEvent)); event->event_type = DTMF_EVENT_TYPE_PAUSE_TASK; g_async_queue_push (dtmfsrc->event_queue, event); @@ -714,6 +766,20 @@ gst_dtmf_src_unlock (GstBaseSrc *src) { return TRUE; } + +static gboolean +gst_dtmf_src_unlock_stop (GstBaseSrc *src) { + GstDTMFSrc *dtmfsrc = GST_DTMF_SRC (src); + + GST_DEBUG_OBJECT (dtmfsrc, "Unlock stopped"); + + GST_OBJECT_LOCK (dtmfsrc); + dtmfsrc->paused = FALSE; + GST_OBJECT_UNLOCK (dtmfsrc); + + return TRUE; +} + static GstStateChangeReturn gst_dtmf_src_change_state (GstElement * element, GstStateChange transition) { @@ -726,7 +792,6 @@ gst_dtmf_src_change_state (GstElement * element, GstStateChange transition) switch (transition) { case GST_STATE_CHANGE_READY_TO_PAUSED: - case GST_STATE_CHANGE_PAUSED_TO_PLAYING: /* Flushing the event queue */ event = g_async_queue_try_pop (dtmfsrc->event_queue); @@ -734,6 +799,7 @@ gst_dtmf_src_change_state (GstElement * element, GstStateChange transition) g_free (event); event = g_async_queue_try_pop (dtmfsrc->event_queue); } + no_preroll = TRUE; break; default: break; @@ -746,16 +812,12 @@ gst_dtmf_src_change_state (GstElement * element, GstStateChange transition) switch (transition) { case GST_STATE_CHANGE_PLAYING_TO_PAUSED: - GST_DEBUG_OBJECT (dtmfsrc, "PLAYING TO PAUSED"); - if (dtmfsrc->last_event) { - GST_DEBUG_OBJECT (dtmfsrc, "Stopping current event"); - /* Don't forget to release the stream lock */ - gst_dtmf_src_set_stream_lock (dtmfsrc, FALSE); - g_free (dtmfsrc->last_event); - dtmfsrc->last_event = NULL; - } + GST_DEBUG_OBJECT (dtmfsrc, "PLAYING TO PAUSED"); + no_preroll = TRUE; + break; + case GST_STATE_CHANGE_PAUSED_TO_READY: GST_DEBUG_OBJECT (dtmfsrc, "Flushing event queue"); /* Flushing the event queue */ event = g_async_queue_try_pop (dtmfsrc->event_queue); diff --git a/gst/dtmf/gstdtmfsrc.h b/gst/dtmf/gstdtmfsrc.h index 1ca44643..0340a7c2 100644 --- a/gst/dtmf/gstdtmfsrc.h +++ b/gst/dtmf/gstdtmfsrc.h @@ -73,11 +73,12 @@ struct _GstDTMFSrc { GstBaseSrc parent; GAsyncQueue* event_queue; GstDTMFSrcEvent* last_event; - GstClockID clock_id; - gboolean task_paused; guint16 interval; GstClockTime timestamp; + + gboolean paused; + GstClockID clockid; }; -- cgit v1.2.1