summaryrefslogtreecommitdiffstats
path: root/gst/dccp/gstdccp.c
diff options
context:
space:
mode:
Diffstat (limited to 'gst/dccp/gstdccp.c')
-rw-r--r--gst/dccp/gstdccp.c229
1 files changed, 148 insertions, 81 deletions
diff --git a/gst/dccp/gstdccp.c b/gst/dccp/gstdccp.c
index 8a4f02df..c9c100e7 100644
--- a/gst/dccp/gstdccp.c
+++ b/gst/dccp/gstdccp.c
@@ -36,14 +36,16 @@
#endif
/* Prototypes and definitions for private functions and not exported via gstdccp.h */
-gint gst_dccp_socket_write (int socket, const void *buf, size_t count,
- int packet_size);
-gboolean gst_dccp_socket_connected (GstElement * element, int server_sock_fd);
+GstFlowReturn gst_dccp_socket_write (GstElement * element, int socket,
+ const void *buf, size_t count, int packet_size);
struct sockaddr_in gst_dccp_create_sockaddr (GstElement * element, gchar * ip,
int port);
-/* Resolves host to IP address
- * @return a gchar pointer containing the ip address or NULL
+/*
+ * Resolves host to IP address
+ * @param element - the element
+ * @return a gchar pointer containing the ip address or NULL if it
+ * couldn't resolve the host to a IP adress
*/
gchar *
gst_dccp_host_to_ip (GstElement * element, const gchar * host)
@@ -83,10 +85,15 @@ gst_dccp_host_to_ip (GstElement * element, const gchar * host)
return ip;
}
-/* Read a buffer from the given socket
- * @returns:
- * a GstBuffer from which data should be read
- * or NULL, indicating a connection close or an error. Handle it with EOS.
+/*
+ * Read a buffer from the given socket
+ *
+ * @param this - the element that has the socket that will be read
+ * @param socket - the socket fd that will be read
+ * @param buf - the buffer with the data read from the socket
+ * @return GST_FLOW_OK if the read operation was successful
+ * or GST_FLOW_ERROR indicating a connection close or an error.
+ * Handle it with EOS.
*/
GstFlowReturn
gst_dccp_read_buffer (GstElement * this, int socket, GstBuffer ** buf)
@@ -96,6 +103,8 @@ gst_dccp_read_buffer (GstElement * this, int socket, GstBuffer ** buf)
int ret;
ssize_t bytes_read;
int readsize;
+ struct msghdr mh;
+ struct iovec iov;
*buf = NULL;
@@ -124,7 +133,23 @@ gst_dccp_read_buffer (GstElement * this, int socket, GstBuffer ** buf)
}
*buf = gst_buffer_new_and_alloc (readsize);
- bytes_read = read (socket, GST_BUFFER_DATA (*buf), readsize);
+
+ memset (&mh, 0, sizeof (mh));
+ mh.msg_name = NULL;
+ mh.msg_namelen = 0;
+ iov.iov_base = GST_BUFFER_DATA (*buf);
+ iov.iov_len = readsize;
+ mh.msg_iov = &iov;
+ mh.msg_iovlen = 1;
+ mh.msg_control = NULL;
+ mh.msg_controllen = 0;
+
+ bytes_read = recvmsg (socket, &mh, 0);
+
+ if (bytes_read != readsize) {
+ GST_DEBUG_OBJECT (this, ("Error while reading data"));
+ return GST_FLOW_ERROR;
+ }
GST_LOG_OBJECT (this, "bytes read %" G_GSIZE_FORMAT, bytes_read);
GST_LOG_OBJECT (this, "returning buffer of size %d", GST_BUFFER_SIZE (*buf));
@@ -132,7 +157,9 @@ gst_dccp_read_buffer (GstElement * this, int socket, GstBuffer ** buf)
return GST_FLOW_OK;
}
-/* Create a new socket
+/* Create a new DCCP socket
+ *
+ * @param element - the element
* @return the socket file descriptor
*/
gint
@@ -147,7 +174,10 @@ gst_dccp_create_new_socket (GstElement * element)
}
/* Connect to a server
- * @return true in case of successfull connection, false otherwise
+ * @param element - the element
+ * @param server_sin - a struct with a server address and port
+ * @param sock_fd - the socket to connect
+ * @return TRUE in case of successful connection, FALSE otherwise
*/
gboolean
gst_dccp_connect_to_server (GstElement * element, struct sockaddr_in server_sin,
@@ -158,11 +188,15 @@ gst_dccp_connect_to_server (GstElement * element, struct sockaddr_in server_sin,
if (connect (sock_fd, (struct sockaddr *) &server_sin, sizeof (server_sin))) {
switch (errno) {
case ECONNREFUSED:
- GST_ERROR_OBJECT (element, "Connection refused.");
+ GST_ELEMENT_ERROR (element, RESOURCE, OPEN_WRITE,
+ ("Connection to %s:%d refused.", inet_ntoa (server_sin.sin_addr),
+ ntohs (server_sin.sin_port)), (NULL));
return FALSE;
break;
default:
- GST_ERROR_OBJECT (element, "Connection failed.");
+ GST_ELEMENT_ERROR (element, RESOURCE, OPEN_READ, (NULL),
+ ("Connect to %s:%d failed: %s", inet_ntoa (server_sin.sin_addr),
+ ntohs (server_sin.sin_port), g_strerror (errno)));
return FALSE;
break;
}
@@ -173,6 +207,9 @@ gst_dccp_connect_to_server (GstElement * element, struct sockaddr_in server_sin,
/* FIXME support only one client */
/*
* Accept connection on the server socket.
+ *
+ * @param element - the element
+ * @param server_sock_fd - the server socket file descriptor
* @return the socket of the client connected to the server.
*/
gint
@@ -183,29 +220,30 @@ gst_dccp_server_wait_connections (GstElement * element, int server_sock_fd)
struct sockaddr_in client_address;
unsigned int client_address_len;
- /* For some stupid reason, client_address and client_address_len has to be
- * zeroed */
memset (&client_address, 0, sizeof (client_address));
client_address_len = 0;
if ((client_sock_fd =
accept (server_sock_fd, (struct sockaddr *) &client_address,
&client_address_len)) == -1) {
+ GST_ELEMENT_ERROR (element, RESOURCE, OPEN_WRITE, (NULL),
+ ("Could not accept client on server socket %d: %s (%d)",
+ server_sock_fd, g_strerror (errno), errno));
return -1;
}
- /* to support multiple connection, fork here a new thread passing the
- * client_sock_fd returned by accept function.
- */
- GST_DEBUG_OBJECT (element, "added new client ip %s with fd %d",
+ GST_DEBUG_OBJECT (element, "Added new client ip %s with fd %d.",
inet_ntoa (client_address.sin_addr), client_sock_fd);
- /* return the thread object, instead of the fd */
return client_sock_fd;
}
/*
* Bind a server address.
+ *
+ * @param element - the element
+ * @param server_sock_fd - the server socket fd
+ * @param server_sin - the address and the port to bind the server on
* @return true in success, false otherwise.
*/
gboolean
@@ -214,14 +252,15 @@ gst_dccp_bind_server_socket (GstElement * element, int server_sock_fd,
{
int ret;
- GST_DEBUG_OBJECT (element, "binding server socket to address");
+ GST_DEBUG_OBJECT (element, "Binding server socket to address.");
+
ret = bind (server_sock_fd, (struct sockaddr *) &server_sin,
sizeof (server_sin));
if (ret) {
switch (errno) {
default:
GST_ELEMENT_ERROR (element, RESOURCE, OPEN_READ, (NULL),
- ("bind on port %d failed: %s", server_sin.sin_port,
+ ("Bind on port %d failed: %s", ntohs (server_sin.sin_port),
g_strerror (errno)));
return FALSE;
break;
@@ -230,45 +269,65 @@ gst_dccp_bind_server_socket (GstElement * element, int server_sock_fd,
return TRUE;
}
+/*
+ * Listen on server socket.
+ *
+ * @param element - the element
+ * @param server_sock_fd - the server socket fd
+ * @return true in success, false otherwise.
+ */
gboolean
gst_dccp_listen_server_socket (GstElement * element, int server_sock_fd)
{
- GST_DEBUG_OBJECT (element, "listening on server socket %d with queue of %d",
+ GST_DEBUG_OBJECT (element, "Listening on server socket %d with queue of %d",
server_sock_fd, DCCP_BACKLOG);
+
if (listen (server_sock_fd, DCCP_BACKLOG) == -1) {
GST_ELEMENT_ERROR (element, RESOURCE, OPEN_READ, (NULL),
("Could not listen on server socket: %s", g_strerror (errno)));
return FALSE;
}
+
GST_DEBUG_OBJECT (element,
- "listened on server socket %d, returning from connection setup",
+ "Listened on server socket %d, returning from connection setup",
server_sock_fd);
return TRUE;
}
-/* FIXME */
-gboolean
-gst_dccp_socket_connected (GstElement * element, int server_sock_fd)
-{
- return FALSE;
-}
-
/* Write buffer to given socket incrementally.
- * Returns number of bytes written.
+ *
+ * @param element - the element
+ * @param socket - the socket
+ * @param buf - the buffer that will be written
+ * @param size - the number of bytes of the buffer
+ * @param packet_size - the MTU
+ * @return the number of bytes written.
*/
-gint
-gst_dccp_socket_write (int socket, const void *buf, size_t size,
- int packet_size)
+GstFlowReturn
+gst_dccp_socket_write (GstElement * element, int socket, const void *buf,
+ size_t size, int packet_size)
{
size_t bytes_written = 0;
ssize_t wrote;
+ struct iovec iov;
+ struct msghdr mh;
+ memset (&mh, 0, sizeof (mh));
+
while (bytes_written < size) {
do {
- wrote = write (socket, (const char *) buf + bytes_written,
- MIN (packet_size, size - bytes_written));
+ mh.msg_name = NULL;
+ mh.msg_namelen = 0;
+ iov.iov_base = (char *) buf + bytes_written;
+ iov.iov_len = MIN (packet_size, size - bytes_written);
+ mh.msg_iov = &iov;
+ mh.msg_iovlen = 1;
+ mh.msg_control = NULL;
+ mh.msg_controllen = 0;
+
+ wrote = sendmsg (socket, &mh, 0);
} while (wrote == -1 && errno == EAGAIN);
/* TODO print the send error */
@@ -276,17 +335,35 @@ gst_dccp_socket_write (int socket, const void *buf, size_t size,
}
if (bytes_written < 0)
- GST_WARNING ("error while writing");
+ GST_WARNING ("Error while writing.");
else
- GST_LOG ("wrote %" G_GSIZE_FORMAT " bytes succesfully", bytes_written);
- return bytes_written;
+ GST_LOG_OBJECT (element, "Wrote %" G_GSIZE_FORMAT " bytes succesfully.",
+ bytes_written);
+
+ if (bytes_written != size) {
+ GST_ELEMENT_ERROR (element, RESOURCE, WRITE,
+ ("Error while sending data to socket %d.", socket),
+ ("Only %" G_GSIZE_FORMAT " of %u bytes written: %s",
+ bytes_written, size, g_strerror (errno)));
+ return GST_FLOW_ERROR;
+ }
+
+ return GST_FLOW_OK;
}
+/* Write buffer to given socket.
+ *
+ * @param this - the element
+ * @param buf - the buffer that will be written
+ * @param client_sock_fd - the client socket
+ * @param packet_size - the MTU
+ * @return GST_FLOW_OK if the send operation was successful, GST_FLOW_ERROR otherwise.
+ */
GstFlowReturn
gst_dccp_send_buffer (GstElement * this, GstBuffer * buffer, int client_sock_fd,
int packet_size)
{
- size_t wrote;
+// size_t wrote;
gint size = 0;
guint8 *data;
@@ -296,21 +373,17 @@ gst_dccp_send_buffer (GstElement * this, GstBuffer * buffer, int client_sock_fd,
GST_LOG_OBJECT (this, "writing %d bytes", size);
if (packet_size < 0) {
- GST_LOG_OBJECT (this, "error getting MTU");
return GST_FLOW_ERROR;
}
- wrote = gst_dccp_socket_write (client_sock_fd, data, size, packet_size);
-
- if (wrote != size) {
- GST_DEBUG_OBJECT (this, ("Error while sending data"));
- return GST_FLOW_ERROR;
- }
- return GST_FLOW_OK;
+ return gst_dccp_socket_write (this, client_sock_fd, data, size, packet_size);
}
/*
* Create socket address.
+ * @param element - the element
+ * @param ip - the ip address
+ * @param port - the port
* @return sockaddr_in.
*/
struct sockaddr_in
@@ -326,6 +399,12 @@ gst_dccp_create_sockaddr (GstElement * element, gchar * ip, int port)
return sin;
}
+/*
+ * Make address reusable.
+ * @param element - the element
+ * @param sock_fd - the socket
+ * @return TRUE if the operation was successful, FALSE otherwise.
+ */
gboolean
gst_dccp_make_address_reusable (GstElement * element, int sock_fd)
{
@@ -340,7 +419,13 @@ gst_dccp_make_address_reusable (GstElement * element, int sock_fd)
return TRUE;
}
-/* DCCP socket specific stuffs */
+/*
+ * Set DCCP congestion control.
+ * @param element - the element
+ * @param sock_fd - the socket
+ * @param ccid - the ccid number
+ * @return TRUE if the operation was successful, FALSE otherwise.
+ */
gboolean
gst_dccp_set_ccid (GstElement * element, int sock_fd, uint8_t ccid)
{
@@ -408,6 +493,12 @@ gst_dccp_get_ccid (GstElement * element, int sock_fd, int tx_or_rx)
return ccid;
}
+/*
+ * Get the socket MTU.
+ * @param element - the element
+ * @param sock - the socket
+ * @return the MTU if the operation was successful, -1 otherwise.
+ */
gint
gst_dccp_get_max_packet_size (GstElement * element, int sock)
{
@@ -423,36 +514,12 @@ gst_dccp_get_max_packet_size (GstElement * element, int sock)
return size;
}
-/* Still not used and need to be FIXED */
-gboolean
-gst_dccp_set_sock_windowsize (GstElement * element, int sock, int winSize,
- gboolean inSend)
+void
+gst_dccp_socket_close (GstElement * element, int *socket)
{
-#ifdef SO_SNDBUF
- int rc;
-
- if (!inSend) {
- /* receive buffer -- set
- * note: results are verified after connect() or listen(),
- * since some OS's don't show the corrected value until then. */
- rc = setsockopt (sock, SOL_DCCP, SO_RCVBUF,
- (char *) &winSize, sizeof (winSize));
- GST_DEBUG_OBJECT (element, "set rcv sockbuf: %d", winSize);
- } else {
- /* send buffer -- set
- * note: results are verified after connect() or listen(),
- * since some OS's don't show the corrected value until then. */
- rc = setsockopt (sock, SOL_DCCP, SO_SNDBUF,
- (char *) &winSize, sizeof (winSize));
- GST_DEBUG_OBJECT (element, "set snd sockbuf: %d", winSize);
+ if (socket >= 0) {
+ GST_DEBUG_OBJECT (element, "closing socket");
+ close (*socket);
+ *socket = -1;
}
-
- if (rc < 0) {
- GST_ELEMENT_ERROR (element, RESOURCE, SETTINGS, (NULL),
- ("Could not set window size %d: %s", errno, g_strerror (errno)));
- return FALSE;
- }
-#endif /* SO_SNDBUF */
-
- return TRUE;
}