/*
* Farstream - Farstream MSN Connection
*
* Copyright 2008 Richard Spiers <richard.spiers@gmail.com>
* Copyright 2007 Nokia Corp.
* Copyright 2007-2009 Collabora Ltd.
* @author: Olivier Crete <olivier.crete@collabora.co.uk>
* @author: Youness Alaoui <youness.alaoui@collabora.co.uk>
*
* fs-msn-connection.c - A MSN Connection gobject
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "fs-msn-connection.h"
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
#include <stdio.h>
#ifdef HAVE_STDLIB_H
#include <stdlib.h>
#endif
#include <gst/gst.h>
#include <nice/interfaces.h>
#define GST_CAT_DEFAULT fsmsnconference_debug
/* Signals */
enum
{
SIGNAL_NEW_LOCAL_CANDIDATE,
SIGNAL_LOCAL_CANDIDATES_PREPARED,
SIGNAL_CONNECTED,
SIGNAL_CONNECTION_FAILED,
N_SIGNALS
};
static guint signals[N_SIGNALS];
/* props */
enum
{
PROP_0,
PROP_SESSION_ID
};
typedef enum {
FS_MSN_STATUS_AUTH,
FS_MSN_STATUS_CONNECTED,
FS_MSN_STATUS_CONNECTED2,
FS_MSN_STATUS_SEND_RECEIVE,
FS_MSN_STATUS_PAUSED,
} FsMsnStatus;
typedef struct _FsMsnPollFD FsMsnPollFD;
typedef void (*PollFdCallback) (FsMsnConnection *self, FsMsnPollFD *pollfd);
struct _FsMsnPollFD {
GstPollFD pollfd;
FsMsnStatus status;
gboolean server;
gboolean want_read;
gboolean want_write;
PollFdCallback callback;
};
#define FS_MSN_CONNECTION_LOCK(conn) g_rec_mutex_lock(&(conn)->mutex)
#define FS_MSN_CONNECTION_UNLOCK(conn) g_rec_mutex_unlock(&(conn)->mutex)
G_DEFINE_TYPE(FsMsnConnection, fs_msn_connection, G_TYPE_OBJECT);
static void fs_msn_connection_dispose (GObject *object);
static void fs_msn_connection_finalize (GObject *object);
static void fs_msn_connection_get_property (GObject *object,
guint prop_id,
GValue *value,
GParamSpec *pspec);
static void fs_msn_connection_set_property (GObject *object,
guint prop_id,
const GValue *value,
GParamSpec *pspec);
static gboolean fs_msn_connection_attempt_connection_locked (
FsMsnConnection *connection,
FsCandidate *candidate,
GError **error);
static gboolean fs_msn_open_listening_port_unlock (FsMsnConnection *connection,
guint16 port,
GError **error);
static void successful_connection_cb (FsMsnConnection *self, FsMsnPollFD *fd);
static void accept_connection_cb (FsMsnConnection *self, FsMsnPollFD *fd);
static void connection_cb (FsMsnConnection *self, FsMsnPollFD *fd);
static gpointer connection_polling_thread (gpointer data);
static void shutdown_fd (FsMsnConnection *self, FsMsnPollFD *pollfd,
gboolean equal);
static void shutdown_fd_locked (FsMsnConnection *self, FsMsnPollFD *pollfd,
gboolean equal);
static FsMsnPollFD * add_pollfd_locked (FsMsnConnection *self, int fd,
PollFdCallback callback, gboolean read, gboolean write, gboolean server);
static void
fs_msn_connection_class_init (FsMsnConnectionClass *klass)
{
GObjectClass *gobject_class = (GObjectClass *) klass;
gobject_class->dispose = fs_msn_connection_dispose;
gobject_class->finalize = fs_msn_connection_finalize;
gobject_class->get_property = fs_msn_connection_get_property;
gobject_class->set_property = fs_msn_connection_set_property;
signals[SIGNAL_NEW_LOCAL_CANDIDATE] = g_signal_new
("new-local-candidate",
G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST,
0,
NULL,
NULL,
g_cclosure_marshal_VOID__BOXED,
G_TYPE_NONE, 1, FS_TYPE_CANDIDATE);
signals[SIGNAL_LOCAL_CANDIDATES_PREPARED] = g_signal_new
("local-candidates-prepared",
G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST,
0,
NULL,
NULL,
g_cclosure_marshal_VOID__VOID,
G_TYPE_NONE, 0);
signals[SIGNAL_CONNECTED] = g_signal_new
("connected",
G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST,
0,
NULL,
NULL,
g_cclosure_marshal_VOID__UINT,
G_TYPE_NONE, 1, G_TYPE_UINT);
signals[SIGNAL_CONNECTION_FAILED] = g_signal_new
("connection-failed",
G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST,
0,
NULL,
NULL,
g_cclosure_marshal_VOID__VOID,
G_TYPE_NONE, 0);
g_object_class_install_property (gobject_class,
PROP_SESSION_ID,
g_param_spec_uint ("session-id",
"The session-id of the session",
"This is the session-id of the MSN session",
1, 9999, 1,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
}
static void
fs_msn_connection_init (FsMsnConnection *self)
{
/* member init */
self->poll_timeout = GST_CLOCK_TIME_NONE;
self->poll = gst_poll_new (TRUE);
gst_poll_set_flushing (self->poll, FALSE);
self->pollfds = g_ptr_array_new ();
g_rec_mutex_init (&self->mutex);
}
static void
fs_msn_connection_dispose (GObject *object)
{
FsMsnConnection *self = FS_MSN_CONNECTION (object);
FS_MSN_CONNECTION_LOCK(self);
if (self->polling_thread)
{
GThread *polling_thread = g_thread_ref (self->polling_thread);
gst_poll_set_flushing (self->poll, TRUE);
FS_MSN_CONNECTION_UNLOCK(self);
g_thread_join (polling_thread);
FS_MSN_CONNECTION_LOCK(self);
g_thread_unref (polling_thread);
self->polling_thread = NULL;
}
FS_MSN_CONNECTION_UNLOCK(self);
G_OBJECT_CLASS (fs_msn_connection_parent_class)->dispose (object);
}
static void
fs_msn_connection_finalize (GObject *object)
{
FsMsnConnection *self = FS_MSN_CONNECTION (object);
gint i;
g_free (self->local_recipient_id);
g_free (self->remote_recipient_id);
gst_poll_free (self->poll);
for (i = 0; i < self->pollfds->len; i++)
{
FsMsnPollFD *p = g_ptr_array_index(self->pollfds, i);
close (p->pollfd.fd);
g_slice_free (FsMsnPollFD, p);
}
g_ptr_array_free (self->pollfds, TRUE);
g_rec_mutex_clear (&self->mutex);
G_OBJECT_CLASS (fs_msn_connection_parent_class)->finalize (object);
}
/**
* fs_msn_connection_new:
* @session: The #FsMsnSession this connection is a child of
* @participant: The #FsMsnParticipant this connection is for
* @direction: the initial #FsDirection for this connection
*
*
* This function create a new connection
*
* Returns: the newly created string or NULL on error
*/
FsMsnConnection *
fs_msn_connection_new (guint session_id, gboolean producer, guint initial_port)
{
FsMsnConnection *self = g_object_new (FS_TYPE_MSN_CONNECTION, NULL);
if (self) {
self->session_id = session_id;
self->initial_port = initial_port;
self->producer = producer;
}
return self;
}
static void
fs_msn_connection_get_property (GObject *object,
guint prop_id,
GValue *value,
GParamSpec *pspec)
{
FsMsnConnection *self = FS_MSN_CONNECTION (object);
FS_MSN_CONNECTION_LOCK (self);
switch (prop_id)
{
case PROP_SESSION_ID:
g_value_set_uint (value, self->session_id);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
FS_MSN_CONNECTION_UNLOCK (self);
}
static void
fs_msn_connection_set_property (GObject *object,
guint prop_id,
const GValue *value,
GParamSpec *pspec)
{
FsMsnConnection *self = FS_MSN_CONNECTION (object);
FS_MSN_CONNECTION_LOCK (self);
switch (prop_id)
{
case PROP_SESSION_ID:
self->session_id = g_value_get_uint (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
FS_MSN_CONNECTION_LOCK (self);
}
gboolean
fs_msn_connection_gather_local_candidates (FsMsnConnection *self,
GError **error)
{
gboolean ret;
FS_MSN_CONNECTION_LOCK(self);
self->polling_thread = g_thread_try_new ("msn polling thread",
connection_polling_thread, self, error);
if (!self->polling_thread)
{
FS_MSN_CONNECTION_UNLOCK(self);
return FALSE;
}
ret = fs_msn_open_listening_port_unlock (self, self->initial_port, error);
g_signal_emit (self, signals[SIGNAL_LOCAL_CANDIDATES_PREPARED], 0);
return ret;
}
/**
* fs_msn_connection_add_remote_candidate:
*/
gboolean
fs_msn_connection_add_remote_candidates (FsMsnConnection *self,
GList *candidates, GError **error)
{
GList *item = NULL;
gchar *recipient_id = NULL;
gboolean ret = FALSE;
guint session_id = 0;
if (!candidates)
{
g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
"Candidate list can no be empty");
return FALSE;
}
FS_MSN_CONNECTION_LOCK(self);
recipient_id = self->remote_recipient_id;
for (item = candidates; item; item = g_list_next (item))
{
FsCandidate *candidate = item->data;
if (!candidate->ip || !candidate->port)
{
g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
"The candidate passed does not contain a valid ip or port");
goto out;
}
if (!candidate->foundation)
{
g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
"The candidate passed does not have a foundation (MSN recipient ID)");
goto out;
}
if (recipient_id)
{
if (g_strcmp0 (candidate->foundation, recipient_id) != 0)
{
g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
"The candidates do not have the same recipient ID");
goto out;
}
}
else
{
recipient_id = candidate->foundation;
}
if (candidate->username)
{
gint sid = atoi (candidate->username);
if (sid < 1 || sid > 9999)
{
g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
"The session ID (in the username) must be between 1 and 9999,"
" %d is invalid", sid);
goto out;
}
if (session_id)
{
if (session_id != sid)
{
g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
"The candidates do not have the same session ID"
" (in the username)");
goto out;
}
}
else
{
session_id = sid;
}
}
}
self->remote_recipient_id = g_strdup (recipient_id);
self->session_id = session_id;
ret = TRUE;
for (item = candidates; item; item = g_list_next (item))
{
FsCandidate *candidate = item->data;
if (!fs_msn_connection_attempt_connection_locked (self, candidate, error))
{
ret = FALSE;
break;
}
}
out:
FS_MSN_CONNECTION_UNLOCK(self);
return ret;
}
static GList *
filter_ips_ipv4 (GList *ips)
{
GList *item;
for (item = ips; item;)
{
gchar *ip = item->data;
GList *next = item->next;
if (!strchr (ip, '.'))
{
g_free (ip);
ips = g_list_delete_link (ips, item);
}
item = next;
}
return ips;
}
static gboolean
fs_msn_open_listening_port_unlock (FsMsnConnection *self, guint16 port,
GError **error)
{
gint fd = -1;
struct sockaddr_in myaddr;
guint myaddr_len = sizeof (struct sockaddr_in);
FsCandidate * candidate = NULL;
GList *addresses = nice_interfaces_get_local_ips (FALSE);
GList *item = NULL;
gchar *session_id;
addresses = filter_ips_ipv4 (addresses);
GST_DEBUG ("Attempting to listen on port %d.....",port);
if ( (fd = socket(PF_INET, SOCK_STREAM, 0)) < 0 )
{
gchar error_str[256];
strerror_r (errno, error_str, 256);
g_set_error (error, FS_ERROR, FS_ERROR_NETWORK,
"Could not create socket: %s", error_str);
goto error;
}
// set non-blocking mode
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
for (;;) {
GST_DEBUG ("Attempting to listen on port %d.....",port);
memset(&myaddr, 0, sizeof(myaddr));
myaddr.sin_family = AF_INET;
myaddr.sin_port = htons (port);
// bind
if (bind(fd, (struct sockaddr *) &myaddr, sizeof(myaddr)) != 0)
{
if (port != 0 && errno == EADDRINUSE)
{
port++;
}
else
{
gchar error_str[256];
strerror_r (errno, error_str, 256);
g_set_error (error, FS_ERROR, FS_ERROR_NETWORK,
"Could not bind socket: %s", error_str);
goto error;
}
} else {
/* Listen */
if (listen(fd, 3) != 0)
{
if (port != 0 && errno == EADDRINUSE)
{
port++;
}
else
{
gchar error_str[256];
strerror_r (errno, error_str, 256);
g_set_error (error, FS_ERROR, FS_ERROR_NETWORK,
"Could not listen on socket: %s", error_str);
goto error;
}
}
else
{
goto done;
}
}
}
done:
if (getsockname (fd, (struct sockaddr *) &myaddr, &myaddr_len) < 0) {
gchar error_str[256];
strerror_r (errno, error_str, 256);
g_set_error (error, FS_ERROR, FS_ERROR_NETWORK,
"Could not get the socket name: %s", error_str);
goto error;
}
port = ntohs (myaddr.sin_port);
add_pollfd_locked (self, fd, accept_connection_cb, TRUE, TRUE, FALSE);
GST_DEBUG ("Listening on port %d", port);
self->local_recipient_id = g_strdup_printf ("%d",
g_random_int_range (100, 199));
session_id = g_strdup_printf ("%u", self->session_id);
FS_MSN_CONNECTION_UNLOCK (self);
for (item = addresses;
item;
item = g_list_next (item))
{
candidate = fs_candidate_new (self->local_recipient_id, 1,
FS_CANDIDATE_TYPE_HOST, FS_NETWORK_PROTOCOL_TCP, item->data, port);
candidate->username = g_strdup (session_id);
g_signal_emit (self, signals[SIGNAL_NEW_LOCAL_CANDIDATE], 0, candidate);
fs_candidate_destroy (candidate);
}
g_free (session_id);
g_list_foreach (addresses, (GFunc) g_free, NULL);
g_list_free (addresses);
return TRUE;
error:
if (fd >= 0)
close (fd);
g_list_foreach (addresses, (GFunc) g_free, NULL);
g_list_free (addresses);
FS_MSN_CONNECTION_UNLOCK (self);
return FALSE;
}
static gboolean
fs_msn_connection_attempt_connection_locked (FsMsnConnection *connection,
FsCandidate *candidate,
GError **error)
{
FsMsnConnection *self = FS_MSN_CONNECTION (connection);
gint fd = -1;
gint ret;
struct sockaddr_in theiraddr;
memset(&theiraddr, 0, sizeof(theiraddr));
if ( (fd = socket(PF_INET, SOCK_STREAM, 0)) == -1 )
{
gchar error_str[256];
strerror_r (errno, error_str, 256);
g_set_error (error, FS_ERROR, FS_ERROR_NETWORK,
"Could not create socket: %s", error_str);
return FALSE;
}
// set non-blocking mode
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
theiraddr.sin_family = AF_INET;
theiraddr.sin_addr.s_addr = inet_addr (candidate->ip);
theiraddr.sin_port = htons (candidate->port);
GST_DEBUG ("Attempting connection to %s %d on socket %d", candidate->ip,
candidate->port, fd);
// this is non blocking, the return value isn't too usefull
ret = connect (fd, (struct sockaddr *) &theiraddr, sizeof (theiraddr));
if (ret < 0 && errno != EINPROGRESS)
{
gchar error_str[256];
strerror_r (errno, error_str, 256);
g_set_error (error, FS_ERROR, FS_ERROR_NETWORK,
"Could not connect socket: %s", error_str);
close (fd);
return FALSE;
}
FS_MSN_CONNECTION_LOCK (self);
add_pollfd_locked (self, fd, successful_connection_cb, TRUE, TRUE, FALSE);
FS_MSN_CONNECTION_UNLOCK (self);
return TRUE;
}
static void
accept_connection_cb (FsMsnConnection *self, FsMsnPollFD *pollfd)
{
struct sockaddr_in in;
int fd = -1;
socklen_t n = sizeof (in);
if (gst_poll_fd_has_error (self->poll, &pollfd->pollfd) ||
gst_poll_fd_has_closed (self->poll, &pollfd->pollfd))
{
GST_WARNING ("Error in accept socket : %d", pollfd->pollfd.fd);
goto error;
}
if ((fd = accept(pollfd->pollfd.fd,
(struct sockaddr*) &in, &n)) == -1)
{
GST_ERROR ("Error while running accept() %d", errno);
return;
}
FS_MSN_CONNECTION_LOCK (self);
add_pollfd_locked (self, fd, connection_cb, TRUE, FALSE, TRUE);
FS_MSN_CONNECTION_UNLOCK (self);
return;
/* Error */
error:
GST_WARNING ("Got error from fd %d, closing", fd);
// find, shutdown and remove channel from fdlist
shutdown_fd (self, pollfd, TRUE);
return;
}
static void
successful_connection_cb (FsMsnConnection *self, FsMsnPollFD *pollfd)
{
gint error;
socklen_t option_len;
GST_DEBUG ("handler called on fd %d", pollfd->pollfd.fd);
errno = 0;
if (gst_poll_fd_has_error (self->poll, &pollfd->pollfd) ||
gst_poll_fd_has_closed (self->poll, &pollfd->pollfd))
{
GST_WARNING ("connecton closed or error");
goto error;
}
option_len = sizeof(error);
/* Get the error option */
if (getsockopt(pollfd->pollfd.fd, SOL_SOCKET, SO_ERROR, (void*) &error, &option_len) < 0)
{
g_warning ("getsockopt() failed");
goto error;
}
/* Check if there is an error */
if (error)
{
GST_WARNING ("getsockopt gave an error : %d", error);
goto error;
}
pollfd->callback = connection_cb;
GST_DEBUG ("connection succeeded on socket %p", pollfd);
return;
/* Error */
error:
GST_WARNING ("Got error from fd %d, closing", pollfd->pollfd.fd);
// find, shutdown and remove channel from fdlist
shutdown_fd (self, pollfd, TRUE);
return;
}
static void
connection_cb (FsMsnConnection *self, FsMsnPollFD *pollfd)
{
gboolean success = FALSE;
GST_DEBUG ("handler called on fd:%d server: %d status:%d r:%d w:%d",
pollfd->pollfd.fd,
pollfd->server, pollfd->status,
gst_poll_fd_can_read (self->poll, &pollfd->pollfd),
gst_poll_fd_can_write (self->poll, &pollfd->pollfd));
if (gst_poll_fd_has_error (self->poll, &pollfd->pollfd) ||
gst_poll_fd_has_closed (self->poll, &pollfd->pollfd))
{
GST_WARNING ("connecton closed or error (error: %d closed: %d)",
gst_poll_fd_has_error (self->poll, &pollfd->pollfd),
gst_poll_fd_has_closed (self->poll, &pollfd->pollfd));
goto error;
}
if (gst_poll_fd_can_read (self->poll, &pollfd->pollfd))
{
switch (pollfd->status)
{
case FS_MSN_STATUS_AUTH:
if (pollfd->server)
{
gchar str[35] = {0};
gchar check[35] = {0};
if (recv (pollfd->pollfd.fd, str, 34, 0) == 34)
{
GST_DEBUG ("Got %s, checking if it's auth", str);
FS_MSN_CONNECTION_LOCK(self);
snprintf(check, 35, "recipientid=%s&sessionid=%d\r\n\r\n",
self->local_recipient_id, self->session_id);
FS_MSN_CONNECTION_UNLOCK(self);
if (strncmp (str, check, 35) == 0)
{
GST_DEBUG ("Authentication successful");
pollfd->status = FS_MSN_STATUS_CONNECTED;
pollfd->want_write = TRUE;
gst_poll_fd_ctl_write (self->poll, &pollfd->pollfd, TRUE);
}
else
{
GST_WARNING ("Authentication failed check=%s", check);
goto error;
}
}
else
{
gchar error_str[256];
strerror_r (errno, error_str, 256);
GST_WARNING ("auth: %s", error_str);
goto error;
}
} else {
GST_ERROR ("shouldn't receive data when client on AUTH state");
goto error;
}
break;
case FS_MSN_STATUS_CONNECTED:
if (!pollfd->server)
{
gchar str[14] = {0};
ssize_t size;
size = recv (pollfd->pollfd.fd, str, 13, MSG_PEEK);
if (size > 0)
{
GST_DEBUG ("Got %s, checking if it's connected", str);
if (size == 13 && strcmp (str, "connected\r\n\r\n") == 0)
{
GST_DEBUG ("connection successful");
if (recv (pollfd->pollfd.fd, str, 13, 0) != 13)
{
GST_WARNING ("Could not read 13 bytes that had previously"
" been peeked at!");
goto error;
}
pollfd->status = FS_MSN_STATUS_CONNECTED2;
pollfd->want_write = TRUE;
gst_poll_fd_ctl_write (self->poll, &pollfd->pollfd, TRUE);
}
else if (!self->producer)
{
GST_DEBUG ("connection successful");
pollfd->status = FS_MSN_STATUS_SEND_RECEIVE;
success = TRUE;
}
else
{
GST_WARNING ("connected failed");
goto error;
}
}
else
{
gchar error_str[256];
strerror_r (errno, error_str, 256);
GST_WARNING ("recv: %s", error_str);
goto error;
}
} else {
GST_ERROR ("shouldn't receive data when server on CONNECTED state");
goto error;
}
break;
case FS_MSN_STATUS_CONNECTED2:
if (pollfd->server)
{
gchar str[14] = {0};
ssize_t size;
size = recv (pollfd->pollfd.fd, str, 13, MSG_PEEK);
if (size > 0)
{
GST_DEBUG ("Got %s, checking if it's connected", str);
if (size == 13 && strcmp (str, "connected\r\n\r\n") == 0)
{
GST_DEBUG ("connection successful");
if (recv (pollfd->pollfd.fd, str, 13, 0) != 13)
{
GST_WARNING ("Could not read 13 bytes that had previously"
" been peeked at!");
goto error;
}
pollfd->status = FS_MSN_STATUS_SEND_RECEIVE;
success = TRUE;
}
else if (!self->producer)
{
GST_DEBUG ("connection successful");
pollfd->status = FS_MSN_STATUS_SEND_RECEIVE;
success = TRUE;
}
else
{
GST_WARNING ("connected failed");
goto error;
}
}
else
{
gchar error_str[256];
strerror_r (errno, error_str, 256);
GST_WARNING ("recv: %s", error_str);
goto error;
}
} else {
GST_ERROR ("shouldn't receive data when client on CONNECTED2 state");
goto error;
}
break;
default:
GST_ERROR ("Invalid status %d", pollfd->status);
goto error;
break;
}
}
else if (gst_poll_fd_can_write (self->poll, &pollfd->pollfd))
{
pollfd->want_write = FALSE;
gst_poll_fd_ctl_write (self->poll, &pollfd->pollfd, FALSE);
switch (pollfd->status)
{
case FS_MSN_STATUS_AUTH:
if (!pollfd->server)
{
gchar *str;
FS_MSN_CONNECTION_LOCK(self);
str = g_strdup_printf("recipientid=%s&sessionid=%d\r\n\r\n",
self->remote_recipient_id, self->session_id);
FS_MSN_CONNECTION_UNLOCK(self);
if (send(pollfd->pollfd.fd, str, strlen (str), 0) != -1)
{
GST_DEBUG ("Sent %s", str);
pollfd->status = FS_MSN_STATUS_CONNECTED;
g_free (str);
}
else
{
gchar error_str[256];
strerror_r (errno, error_str, 256);
GST_WARNING ("auth send: %s", error_str);
g_free (str);
goto error;
}
}
break;
case FS_MSN_STATUS_CONNECTED:
if (pollfd->server)
{
if (send(pollfd->pollfd.fd, "connected\r\n\r\n", 13, 0) != -1)
{
GST_DEBUG ("sent connected");
if (self->producer)
{
pollfd->status = FS_MSN_STATUS_SEND_RECEIVE;
success = TRUE;
}
else
{
pollfd->status = FS_MSN_STATUS_CONNECTED2;
}
}
else
{
gchar error_str[256];
strerror_r (errno, error_str, 256);
GST_WARNING ("sending connected: %s", error_str);
goto error;
}
} else {
GST_DEBUG ("shouldn't receive data when server on CONNECTED state");
goto error;
}
break;
case FS_MSN_STATUS_CONNECTED2:
if (!pollfd->server)
{
if (send(pollfd->pollfd.fd, "connected\r\n\r\n", 13, 0) != -1)
{
GST_DEBUG ("sent connected");
pollfd->status = FS_MSN_STATUS_SEND_RECEIVE;
success = TRUE;
}
else
{
gchar error_str[256];
strerror_r (errno, error_str, 256);
GST_WARNING ("sending connected: %s", error_str);
goto error;
}
} else {
GST_ERROR ("shouldn't receive data when client on CONNECTED2 state");
goto error;
}
break;
default:
GST_ERROR ("Invalid status %d", pollfd->status);
goto error;
break;
}
}
if (success) {
// success! we need to shutdown/close all other channels
shutdown_fd (self, pollfd, FALSE);
g_signal_emit (self, signals[SIGNAL_CONNECTED], 0, pollfd->pollfd.fd);
pollfd->want_read = FALSE;
pollfd->want_write = FALSE;
gst_poll_fd_ctl_read (self->poll, &pollfd->pollfd, FALSE);
gst_poll_fd_ctl_write (self->poll, &pollfd->pollfd, FALSE);
}
return;
error:
/* Error */
GST_WARNING ("Got error from fd %d, closing", pollfd->pollfd.fd);
shutdown_fd (self, pollfd, TRUE);
FS_MSN_CONNECTION_LOCK (self);
success = (self->pollfds->len > 1);
FS_MSN_CONNECTION_UNLOCK (self);
if (!success)
g_signal_emit (self, signals[SIGNAL_CONNECTION_FAILED], 0);
return;
}
static gpointer
connection_polling_thread (gpointer data)
{
FsMsnConnection *self = data;
gint ret;
GstClockTime timeout;
GstPoll * poll;
FS_MSN_CONNECTION_LOCK(self);
timeout = self->poll_timeout;
poll = self->poll;
GST_DEBUG ("poll waiting %d", self->pollfds->len);
FS_MSN_CONNECTION_UNLOCK(self);
while ((ret = gst_poll_wait (poll, timeout)) >= 0)
{
GST_DEBUG ("gst_poll_wait returned : %d", ret);
FS_MSN_CONNECTION_LOCK(self);
if (ret > 0)
{
gint i;
for (i = 0; i < self->pollfds->len; i++)
{
FsMsnPollFD *pollfd = NULL;
pollfd = g_ptr_array_index(self->pollfds, i);
GST_DEBUG ("ret %d - i = %d, len = %d", ret, i, self->pollfds->len);
GST_DEBUG ("%p - error %d, close %d, read %d-%d, write %d-%d",
pollfd,
gst_poll_fd_has_error (poll, &pollfd->pollfd),
gst_poll_fd_has_closed (poll, &pollfd->pollfd),
pollfd->want_read,
gst_poll_fd_can_read (poll, &pollfd->pollfd),
pollfd->want_write,
gst_poll_fd_can_write (poll, &pollfd->pollfd));
if (gst_poll_fd_has_error (poll, &pollfd->pollfd) ||
gst_poll_fd_has_closed (poll, &pollfd->pollfd))
{
pollfd->callback (self, pollfd);
shutdown_fd_locked (self, pollfd, TRUE);
i--;
continue;
}
if ((pollfd->want_read &&
gst_poll_fd_can_read (poll, &pollfd->pollfd)) ||
(pollfd->want_write &&
gst_poll_fd_can_write (poll, &pollfd->pollfd)))
{
pollfd->callback (self, pollfd);
}
}
}
timeout = self->poll_timeout;
FS_MSN_CONNECTION_UNLOCK(self);
}
return NULL;
}
static void
shutdown_fd (FsMsnConnection *self, FsMsnPollFD *pollfd, gboolean equal)
{
FS_MSN_CONNECTION_LOCK (self);
shutdown_fd_locked (self, pollfd, equal);
FS_MSN_CONNECTION_UNLOCK (self);
}
static void
shutdown_fd_locked (FsMsnConnection *self, FsMsnPollFD *pollfd, gboolean equal)
{
gint i;
guint closed = 0;
for (i = 0; i < self->pollfds->len; i++)
{
FsMsnPollFD *p = g_ptr_array_index(self->pollfds, i);
if ((equal && p == pollfd) || (!equal && p != pollfd))
{
GST_DEBUG ("Shutting down p %p (fd %d)", p, p->pollfd.fd);
if (!gst_poll_fd_has_closed (self->poll, &p->pollfd))
close (p->pollfd.fd);
if (!gst_poll_remove_fd (self->poll, &p->pollfd))
GST_WARNING ("Could not remove pollfd %p", p);
g_ptr_array_remove_index_fast (self->pollfds, i);
g_slice_free (FsMsnPollFD, p);
closed++;
i--;
}
}
if (closed)
gst_poll_restart (self->poll);
else
GST_WARNING ("Could find pollfd to remove");
}
static FsMsnPollFD *
add_pollfd_locked (FsMsnConnection *self, int fd, PollFdCallback callback,
gboolean read, gboolean write, gboolean server)
{
FsMsnPollFD *pollfd = g_slice_new0 (FsMsnPollFD);
gst_poll_fd_init (&pollfd->pollfd);
pollfd->pollfd.fd = fd;
pollfd->server = server;
pollfd->want_read = read;
pollfd->want_write = write;
pollfd->status = FS_MSN_STATUS_AUTH;
gst_poll_add_fd (self->poll, &pollfd->pollfd);
gst_poll_fd_ctl_read (self->poll, &pollfd->pollfd, read);
gst_poll_fd_ctl_write (self->poll, &pollfd->pollfd, write);
pollfd->callback = callback;
GST_DEBUG ("ADD_POLLFD %p (%p) - error %d, close %d, read %d-%d, write %d-%d",
self->pollfds, pollfd,
gst_poll_fd_has_error (self->poll, &pollfd->pollfd),
gst_poll_fd_has_closed (self->poll, &pollfd->pollfd),
pollfd->want_read,
gst_poll_fd_can_read (self->poll, &pollfd->pollfd),
pollfd->want_write,
gst_poll_fd_can_write (self->poll, &pollfd->pollfd));
g_ptr_array_add (self->pollfds, pollfd);
gst_poll_restart (self->poll);
return pollfd;
}