Blob Blame History Raw
/* GStreamer
 * Copyright (C) 2011 David Schleef <ds@entropywave.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
 * Boston, MA 02110-1335, USA.
 */
/**
 * SECTION:element-gstsouphttpclientsink
 *
 * The souphttpclientsink element sends pipeline data to an HTTP server
 * using HTTP PUT commands.
 *
 * <refsect2>
 * <title>Example launch line</title>
 * |[
 * gst-launch-1.0 -v videotestsrc num-buffers=300 ! theoraenc ! oggmux !
 *   souphttpclientsink location=http://server/filename.ogv
 * ]|
 * 
 * This example encodes 10 seconds of video and sends it to the HTTP
 * server "server" using HTTP PUT commands.
 * </refsect2>
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include <gst/gst.h>
#include <gst/base/gstbasesink.h>
#include "gstsouphttpclientsink.h"
#include "gstsouputils.h"

GST_DEBUG_CATEGORY_STATIC (souphttpclientsink_dbg);
#define GST_CAT_DEFAULT souphttpclientsink_dbg

/* prototypes */


static void gst_soup_http_client_sink_set_property (GObject * object,
    guint property_id, const GValue * value, GParamSpec * pspec);
static void gst_soup_http_client_sink_get_property (GObject * object,
    guint property_id, GValue * value, GParamSpec * pspec);
static void gst_soup_http_client_sink_dispose (GObject * object);
static void gst_soup_http_client_sink_finalize (GObject * object);

static gboolean gst_soup_http_client_sink_set_caps (GstBaseSink * sink,
    GstCaps * caps);
static void gst_soup_http_client_sink_get_times (GstBaseSink * sink,
    GstBuffer * buffer, GstClockTime * start, GstClockTime * end);
static gboolean gst_soup_http_client_sink_start (GstBaseSink * sink);
static gboolean gst_soup_http_client_sink_stop (GstBaseSink * sink);
static gboolean gst_soup_http_client_sink_unlock (GstBaseSink * sink);
static gboolean gst_soup_http_client_sink_event (GstBaseSink * sink,
    GstEvent * event);
static GstFlowReturn gst_soup_http_client_sink_preroll (GstBaseSink * sink,
    GstBuffer * buffer);
static GstFlowReturn gst_soup_http_client_sink_render (GstBaseSink * sink,
    GstBuffer * buffer);

static void gst_soup_http_client_sink_reset (GstSoupHttpClientSink *
    souphttpsink);
static void authenticate (SoupSession * session, SoupMessage * msg,
    SoupAuth * auth, gboolean retrying, gpointer user_data);
static void callback (SoupSession * session, SoupMessage * msg,
    gpointer user_data);
static gboolean gst_soup_http_client_sink_set_proxy (GstSoupHttpClientSink *
    souphttpsink, const gchar * uri);

enum
{
  PROP_0,
  PROP_LOCATION,
  PROP_USER_AGENT,
  PROP_AUTOMATIC_REDIRECT,
  PROP_PROXY,
  PROP_USER_ID,
  PROP_USER_PW,
  PROP_PROXY_ID,
  PROP_PROXY_PW,
  PROP_COOKIES,
  PROP_SESSION,
  PROP_SOUP_LOG_LEVEL,
  PROP_RETRY_DELAY,
  PROP_RETRIES
};

#define DEFAULT_USER_AGENT           "GStreamer souphttpclientsink "
#define DEFAULT_SOUP_LOG_LEVEL       SOUP_LOGGER_LOG_NONE

/* pad templates */

static GstStaticPadTemplate gst_soup_http_client_sink_sink_template =
GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);


/* class initialization */

#define gst_soup_http_client_sink_parent_class parent_class
G_DEFINE_TYPE (GstSoupHttpClientSink, gst_soup_http_client_sink,
    GST_TYPE_BASE_SINK);

static void
gst_soup_http_client_sink_class_init (GstSoupHttpClientSinkClass * klass)
{
  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
  GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
  GstBaseSinkClass *base_sink_class = GST_BASE_SINK_CLASS (klass);

  gobject_class->set_property = gst_soup_http_client_sink_set_property;
  gobject_class->get_property = gst_soup_http_client_sink_get_property;
  gobject_class->dispose = gst_soup_http_client_sink_dispose;
  gobject_class->finalize = gst_soup_http_client_sink_finalize;

  g_object_class_install_property (gobject_class,
      PROP_LOCATION,
      g_param_spec_string ("location", "Location",
          "URI to send to", "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class,
      PROP_USER_AGENT,
      g_param_spec_string ("user-agent", "User-Agent",
          "Value of the User-Agent HTTP request header field",
          DEFAULT_USER_AGENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class,
      PROP_AUTOMATIC_REDIRECT,
      g_param_spec_boolean ("automatic-redirect", "automatic-redirect",
          "Automatically follow HTTP redirects (HTTP Status Code 3xx)",
          TRUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class,
      PROP_PROXY,
      g_param_spec_string ("proxy", "Proxy",
          "HTTP proxy server URI", "",
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class,
      PROP_USER_ID,
      g_param_spec_string ("user-id", "user-id",
          "user id for authentication", "",
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_USER_PW,
      g_param_spec_string ("user-pw", "user-pw",
          "user password for authentication", "",
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_PROXY_ID,
      g_param_spec_string ("proxy-id", "proxy-id",
          "user id for proxy authentication", "",
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_PROXY_PW,
      g_param_spec_string ("proxy-pw", "proxy-pw",
          "user password for proxy authentication", "",
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_SESSION,
      g_param_spec_object ("session", "session",
          "SoupSession object to use for communication",
          SOUP_TYPE_SESSION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_COOKIES,
      g_param_spec_boxed ("cookies", "Cookies", "HTTP request cookies",
          G_TYPE_STRV, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_RETRY_DELAY,
      g_param_spec_int ("retry-delay", "Retry Delay",
          "Delay in seconds between retries after a failure", 1, G_MAXINT, 5,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_RETRIES,
      g_param_spec_int ("retries", "Retries",
          "Maximum number of retries, zero to disable, -1 to retry forever",
          -1, G_MAXINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 /**
   * GstSoupHttpClientSink::http-log-level:
   *
   * If set and > 0, captures and dumps HTTP session data as
   * log messages if log level >= GST_LEVEL_TRACE
   *
   * Since: 1.4
   */
  g_object_class_install_property (gobject_class, PROP_SOUP_LOG_LEVEL,
      g_param_spec_enum ("http-log-level", "HTTP log level",
          "Set log level for soup's HTTP session log",
          SOUP_TYPE_LOGGER_LOG_LEVEL, DEFAULT_SOUP_LOG_LEVEL,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  gst_element_class_add_static_pad_template (gstelement_class,
      &gst_soup_http_client_sink_sink_template);

  gst_element_class_set_static_metadata (gstelement_class, "HTTP client sink",
      "Generic", "Sends streams to HTTP server via PUT",
      "David Schleef <ds@entropywave.com>");

  base_sink_class->set_caps =
      GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_set_caps);
  if (0)
    base_sink_class->get_times =
        GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_get_times);
  base_sink_class->start = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_start);
  base_sink_class->stop = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_stop);
  base_sink_class->unlock =
      GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_unlock);
  base_sink_class->event = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_event);
  if (0)
    base_sink_class->preroll =
        GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_preroll);
  base_sink_class->render =
      GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_render);

  GST_DEBUG_CATEGORY_INIT (souphttpclientsink_dbg, "souphttpclientsink", 0,
      "souphttpclientsink element");

}

static void
gst_soup_http_client_sink_init (GstSoupHttpClientSink * souphttpsink)
{
  const char *proxy;

  g_mutex_init (&souphttpsink->mutex);
  g_cond_init (&souphttpsink->cond);

  souphttpsink->location = NULL;
  souphttpsink->automatic_redirect = TRUE;
  souphttpsink->user_agent = g_strdup (DEFAULT_USER_AGENT);
  souphttpsink->user_id = NULL;
  souphttpsink->user_pw = NULL;
  souphttpsink->proxy_id = NULL;
  souphttpsink->proxy_pw = NULL;
  souphttpsink->prop_session = NULL;
  souphttpsink->timeout = 1;
  souphttpsink->log_level = DEFAULT_SOUP_LOG_LEVEL;
  souphttpsink->retry_delay = 5;
  souphttpsink->retries = 0;
  proxy = g_getenv ("http_proxy");
  if (proxy && !gst_soup_http_client_sink_set_proxy (souphttpsink, proxy)) {
    GST_WARNING_OBJECT (souphttpsink,
        "The proxy in the http_proxy env var (\"%s\") cannot be parsed.",
        proxy);
  }

  gst_soup_http_client_sink_reset (souphttpsink);
}

static void
gst_soup_http_client_sink_reset (GstSoupHttpClientSink * souphttpsink)
{
  g_list_free_full (souphttpsink->queued_buffers,
      (GDestroyNotify) gst_buffer_unref);
  souphttpsink->queued_buffers = NULL;
  g_free (souphttpsink->reason_phrase);
  souphttpsink->reason_phrase = NULL;
  souphttpsink->status_code = 0;
  souphttpsink->offset = 0;
  souphttpsink->failures = 0;

  g_list_free_full (souphttpsink->streamheader_buffers,
      (GDestroyNotify) gst_buffer_unref);
  souphttpsink->streamheader_buffers = NULL;
  g_list_free_full (souphttpsink->sent_buffers,
      (GDestroyNotify) gst_buffer_unref);
  souphttpsink->sent_buffers = NULL;
}

static gboolean
gst_soup_http_client_sink_set_proxy (GstSoupHttpClientSink * souphttpsink,
    const gchar * uri)
{
  if (souphttpsink->proxy) {
    soup_uri_free (souphttpsink->proxy);
    souphttpsink->proxy = NULL;
  }
  if (g_str_has_prefix (uri, "http://")) {
    souphttpsink->proxy = soup_uri_new (uri);
  } else {
    gchar *new_uri = g_strconcat ("http://", uri, NULL);

    souphttpsink->proxy = soup_uri_new (new_uri);
    g_free (new_uri);
  }

  return TRUE;
}

void
gst_soup_http_client_sink_set_property (GObject * object, guint property_id,
    const GValue * value, GParamSpec * pspec)
{
  GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);

  g_mutex_lock (&souphttpsink->mutex);
  switch (property_id) {
    case PROP_SESSION:
      if (souphttpsink->prop_session) {
        g_object_unref (souphttpsink->prop_session);
      }
      souphttpsink->prop_session = g_value_dup_object (value);
      break;
    case PROP_LOCATION:
      g_free (souphttpsink->location);
      souphttpsink->location = g_value_dup_string (value);
      souphttpsink->offset = 0;
      if ((souphttpsink->location == NULL)
          || !gst_uri_is_valid (souphttpsink->location)) {
        GST_WARNING_OBJECT (souphttpsink,
            "The location (\"%s\") set, is not a valid uri.",
            souphttpsink->location);
        g_free (souphttpsink->location);
        souphttpsink->location = NULL;
      }
      break;
    case PROP_USER_AGENT:
      g_free (souphttpsink->user_agent);
      souphttpsink->user_agent = g_value_dup_string (value);
      break;
    case PROP_AUTOMATIC_REDIRECT:
      souphttpsink->automatic_redirect = g_value_get_boolean (value);
      break;
    case PROP_USER_ID:
      g_free (souphttpsink->user_id);
      souphttpsink->user_id = g_value_dup_string (value);
      break;
    case PROP_USER_PW:
      g_free (souphttpsink->user_pw);
      souphttpsink->user_pw = g_value_dup_string (value);
      break;
    case PROP_PROXY_ID:
      g_free (souphttpsink->proxy_id);
      souphttpsink->proxy_id = g_value_dup_string (value);
      break;
    case PROP_PROXY_PW:
      g_free (souphttpsink->proxy_pw);
      souphttpsink->proxy_pw = g_value_dup_string (value);
      break;
    case PROP_PROXY:
    {
      const gchar *proxy;

      proxy = g_value_get_string (value);

      if (proxy == NULL) {
        GST_WARNING ("proxy property cannot be NULL");
        goto done;
      }
      if (!gst_soup_http_client_sink_set_proxy (souphttpsink, proxy)) {
        GST_WARNING ("badly formatted proxy URI");
        goto done;
      }
      break;
    }
    case PROP_COOKIES:
      g_strfreev (souphttpsink->cookies);
      souphttpsink->cookies = g_strdupv (g_value_get_boxed (value));
      break;
    case PROP_SOUP_LOG_LEVEL:
      souphttpsink->log_level = g_value_get_enum (value);
      break;
    case PROP_RETRY_DELAY:
      souphttpsink->retry_delay = g_value_get_int (value);
      break;
    case PROP_RETRIES:
      souphttpsink->retries = g_value_get_int (value);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
      break;
  }
done:
  g_mutex_unlock (&souphttpsink->mutex);
}

void
gst_soup_http_client_sink_get_property (GObject * object, guint property_id,
    GValue * value, GParamSpec * pspec)
{
  GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);

  switch (property_id) {
    case PROP_SESSION:
      g_value_set_object (value, souphttpsink->prop_session);
      break;
    case PROP_LOCATION:
      g_value_set_string (value, souphttpsink->location);
      break;
    case PROP_AUTOMATIC_REDIRECT:
      g_value_set_boolean (value, souphttpsink->automatic_redirect);
      break;
    case PROP_USER_AGENT:
      g_value_set_string (value, souphttpsink->user_agent);
      break;
    case PROP_USER_ID:
      g_value_set_string (value, souphttpsink->user_id);
      break;
    case PROP_USER_PW:
      g_value_set_string (value, souphttpsink->user_pw);
      break;
    case PROP_PROXY_ID:
      g_value_set_string (value, souphttpsink->proxy_id);
      break;
    case PROP_PROXY_PW:
      g_value_set_string (value, souphttpsink->proxy_pw);
      break;
    case PROP_PROXY:
      if (souphttpsink->proxy == NULL)
        g_value_set_static_string (value, "");
      else {
        char *proxy = soup_uri_to_string (souphttpsink->proxy, FALSE);

        g_value_set_string (value, proxy);
        g_free (proxy);
      }
      break;
    case PROP_COOKIES:
      g_value_set_boxed (value, g_strdupv (souphttpsink->cookies));
      break;
    case PROP_SOUP_LOG_LEVEL:
      g_value_set_enum (value, souphttpsink->log_level);
      break;
    case PROP_RETRY_DELAY:
      g_value_set_int (value, souphttpsink->retry_delay);
      break;
    case PROP_RETRIES:
      g_value_set_int (value, souphttpsink->retries);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
      break;
  }
}

void
gst_soup_http_client_sink_dispose (GObject * object)
{
  GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);

  /* clean up as possible.  may be called multiple times */
  if (souphttpsink->prop_session)
    g_object_unref (souphttpsink->prop_session);
  souphttpsink->prop_session = NULL;

  G_OBJECT_CLASS (parent_class)->dispose (object);
}

void
gst_soup_http_client_sink_finalize (GObject * object)
{
  GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);

  /* clean up object here */

  g_free (souphttpsink->user_agent);
  g_free (souphttpsink->user_id);
  g_free (souphttpsink->user_pw);
  g_free (souphttpsink->proxy_id);
  g_free (souphttpsink->proxy_pw);
  if (souphttpsink->proxy)
    soup_uri_free (souphttpsink->proxy);
  g_free (souphttpsink->location);
  g_strfreev (souphttpsink->cookies);

  g_cond_clear (&souphttpsink->cond);
  g_mutex_clear (&souphttpsink->mutex);

  G_OBJECT_CLASS (parent_class)->finalize (object);
}



static gboolean
gst_soup_http_client_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
{
  GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
  GstStructure *structure;
  const GValue *value_array;
  int i, n;

  GST_DEBUG_OBJECT (souphttpsink, "new stream headers set");
  structure = gst_caps_get_structure (caps, 0);
  value_array = gst_structure_get_value (structure, "streamheader");
  if (value_array) {
    g_list_free_full (souphttpsink->streamheader_buffers,
        (GDestroyNotify) gst_buffer_unref);
    souphttpsink->streamheader_buffers = NULL;

    n = gst_value_array_get_size (value_array);
    for (i = 0; i < n; i++) {
      const GValue *value;
      GstBuffer *buffer;
      value = gst_value_array_get_value (value_array, i);
      buffer = GST_BUFFER (gst_value_get_buffer (value));
      souphttpsink->streamheader_buffers =
          g_list_append (souphttpsink->streamheader_buffers,
          gst_buffer_ref (buffer));
    }
  }

  return TRUE;
}

static void
gst_soup_http_client_sink_get_times (GstBaseSink * sink, GstBuffer * buffer,
    GstClockTime * start, GstClockTime * end)
{

}

static gboolean
thread_ready_idle_cb (gpointer data)
{
  GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (data);

  GST_LOG_OBJECT (souphttpsink, "thread ready");

  g_mutex_lock (&souphttpsink->mutex);
  g_cond_signal (&souphttpsink->cond);
  g_mutex_unlock (&souphttpsink->mutex);

  return FALSE;                 /* only run once */
}

static gpointer
thread_func (gpointer ptr)
{
  GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (ptr);

  GST_DEBUG ("thread start");

  g_main_loop_run (souphttpsink->loop);

  GST_DEBUG ("thread quit");

  return NULL;
}

static gboolean
gst_soup_http_client_sink_start (GstBaseSink * sink)
{
  GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);

  if (souphttpsink->prop_session) {
    souphttpsink->session = souphttpsink->prop_session;
  } else {
    GSource *source;
    GError *error = NULL;

    souphttpsink->context = g_main_context_new ();

    /* set up idle source to signal when the main loop is running and
     * it's safe for ::stop() to call g_main_loop_quit() */
    source = g_idle_source_new ();
    g_source_set_callback (source, thread_ready_idle_cb, sink, NULL);
    g_source_attach (source, souphttpsink->context);
    g_source_unref (source);

    souphttpsink->loop = g_main_loop_new (souphttpsink->context, TRUE);

    g_mutex_lock (&souphttpsink->mutex);

    souphttpsink->thread = g_thread_try_new ("souphttpclientsink-thread",
        thread_func, souphttpsink, &error);

    if (error != NULL) {
      GST_DEBUG_OBJECT (souphttpsink, "failed to start thread, %s",
          error->message);
      g_error_free (error);
      g_mutex_unlock (&souphttpsink->mutex);
      return FALSE;
    }

    GST_LOG_OBJECT (souphttpsink, "waiting for main loop thread to start up");
    g_cond_wait (&souphttpsink->cond, &souphttpsink->mutex);
    g_mutex_unlock (&souphttpsink->mutex);
    GST_LOG_OBJECT (souphttpsink, "main loop thread running");

    if (souphttpsink->proxy == NULL) {
      souphttpsink->session =
          soup_session_async_new_with_options (SOUP_SESSION_ASYNC_CONTEXT,
          souphttpsink->context, SOUP_SESSION_USER_AGENT,
          souphttpsink->user_agent, SOUP_SESSION_TIMEOUT, souphttpsink->timeout,
          SOUP_SESSION_ADD_FEATURE_BY_TYPE, SOUP_TYPE_PROXY_RESOLVER_DEFAULT,
          NULL);
    } else {
      souphttpsink->session =
          soup_session_async_new_with_options (SOUP_SESSION_ASYNC_CONTEXT,
          souphttpsink->context, SOUP_SESSION_USER_AGENT,
          souphttpsink->user_agent, SOUP_SESSION_TIMEOUT, souphttpsink->timeout,
          SOUP_SESSION_PROXY_URI, souphttpsink->proxy, NULL);
    }

    g_signal_connect (souphttpsink->session, "authenticate",
        G_CALLBACK (authenticate), souphttpsink);
  }

  /* Set up logging */
  gst_soup_util_log_setup (souphttpsink->session, souphttpsink->log_level,
      GST_ELEMENT (souphttpsink));

  return TRUE;
}

static gboolean
gst_soup_http_client_sink_stop (GstBaseSink * sink)
{
  GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);

  GST_DEBUG ("stop");

  if (souphttpsink->prop_session == NULL) {
    soup_session_abort (souphttpsink->session);
    g_object_unref (souphttpsink->session);
  }

  g_mutex_lock (&souphttpsink->mutex);
  if (souphttpsink->timer) {
    g_source_destroy (souphttpsink->timer);
    g_source_unref (souphttpsink->timer);
    souphttpsink->timer = NULL;
  }
  g_mutex_unlock (&souphttpsink->mutex);

  if (souphttpsink->loop) {
    g_main_loop_quit (souphttpsink->loop);
    g_mutex_lock (&souphttpsink->mutex);
    g_cond_signal (&souphttpsink->cond);
    g_mutex_unlock (&souphttpsink->mutex);
    g_thread_join (souphttpsink->thread);
    g_main_loop_unref (souphttpsink->loop);
    souphttpsink->loop = NULL;
  }
  if (souphttpsink->context) {
    g_main_context_unref (souphttpsink->context);
    souphttpsink->context = NULL;
  }

  gst_soup_http_client_sink_reset (souphttpsink);

  return TRUE;
}

static gboolean
gst_soup_http_client_sink_unlock (GstBaseSink * sink)
{
  GST_DEBUG ("unlock");

  return TRUE;
}

static gboolean
gst_soup_http_client_sink_event (GstBaseSink * sink, GstEvent * event)
{
  GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);

  GST_DEBUG_OBJECT (souphttpsink, "event");

  if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
    GST_DEBUG_OBJECT (souphttpsink, "got eos");
    g_mutex_lock (&souphttpsink->mutex);
    while (souphttpsink->message) {
      GST_DEBUG_OBJECT (souphttpsink, "waiting");
      g_cond_wait (&souphttpsink->cond, &souphttpsink->mutex);
    }
    g_mutex_unlock (&souphttpsink->mutex);
    GST_DEBUG_OBJECT (souphttpsink, "finished eos");
  }

  return GST_BASE_SINK_CLASS (parent_class)->event (sink, event);
}

static GstFlowReturn
gst_soup_http_client_sink_preroll (GstBaseSink * sink, GstBuffer * buffer)
{
  GST_DEBUG ("preroll");

  return GST_FLOW_OK;
}

static void
send_message_locked (GstSoupHttpClientSink * souphttpsink)
{
  GList *g;
  guint64 n;

  if (souphttpsink->queued_buffers == NULL || souphttpsink->message) {
    return;
  }

  /* If the URI went away, drop all these buffers */
  if (souphttpsink->location == NULL) {
    GST_DEBUG_OBJECT (souphttpsink, "URI went away, dropping queued buffers");
    g_list_free_full (souphttpsink->queued_buffers,
        (GDestroyNotify) gst_buffer_unref);
    souphttpsink->queued_buffers = NULL;
    return;
  }

  souphttpsink->message = soup_message_new ("PUT", souphttpsink->location);
  if (souphttpsink->message == NULL) {
    GST_WARNING_OBJECT (souphttpsink,
        "URI could not be parsed while creating message.");
    g_list_free_full (souphttpsink->queued_buffers,
        (GDestroyNotify) gst_buffer_unref);
    souphttpsink->queued_buffers = NULL;
    return;
  }

  soup_message_set_flags (souphttpsink->message,
      (souphttpsink->automatic_redirect ? 0 : SOUP_MESSAGE_NO_REDIRECT));

  if (souphttpsink->cookies) {
    gchar **cookie;

    for (cookie = souphttpsink->cookies; *cookie != NULL; cookie++) {
      soup_message_headers_append (souphttpsink->message->request_headers,
          "Cookie", *cookie);
    }
  }

  n = 0;
  if (souphttpsink->offset == 0) {
    for (g = souphttpsink->streamheader_buffers; g; g = g_list_next (g)) {
      GstBuffer *buffer = g->data;
      GstMapInfo map;

      GST_DEBUG_OBJECT (souphttpsink, "queueing stream headers");
      gst_buffer_map (buffer, &map, GST_MAP_READ);
      /* Stream headers are updated whenever ::set_caps is called, so there's
       * no guarantees about their lifetime and we ask libsoup to copy them 
       * into the message body with SOUP_MEMORY_COPY. */
      soup_message_body_append (souphttpsink->message->request_body,
          SOUP_MEMORY_COPY, map.data, map.size);
      n += map.size;
      gst_buffer_unmap (buffer, &map);
    }
  }

  for (g = souphttpsink->queued_buffers; g; g = g_list_next (g)) {
    GstBuffer *buffer = g->data;
    if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_HEADER)) {
      GstMapInfo map;

      gst_buffer_map (buffer, &map, GST_MAP_READ);
      /* Queued buffers are only freed in the next iteration of the mainloop
       * after the message body has been written out, so we don't need libsoup
       * to copy those while appending to the body. However, if the buffer is
       * used elsewhere, it should be copied. Hence, SOUP_MEMORY_TEMPORARY. */
      soup_message_body_append (souphttpsink->message->request_body,
          SOUP_MEMORY_TEMPORARY, map.data, map.size);
      n += map.size;
      gst_buffer_unmap (buffer, &map);
    }
  }

  if (souphttpsink->offset != 0) {
    char *s;
    s = g_strdup_printf ("bytes %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "/*",
        souphttpsink->offset, souphttpsink->offset + n - 1);
    soup_message_headers_append (souphttpsink->message->request_headers,
        "Content-Range", s);
    g_free (s);
  }

  if (n == 0) {
    GST_DEBUG_OBJECT (souphttpsink,
        "total size of buffers queued is 0, freeing everything");
    g_list_free_full (souphttpsink->queued_buffers,
        (GDestroyNotify) gst_buffer_unref);
    souphttpsink->queued_buffers = NULL;
    g_object_unref (souphttpsink->message);
    souphttpsink->message = NULL;
    return;
  }

  souphttpsink->sent_buffers = souphttpsink->queued_buffers;
  souphttpsink->queued_buffers = NULL;

  GST_DEBUG_OBJECT (souphttpsink,
      "queue message %" G_GUINT64_FORMAT " %" G_GUINT64_FORMAT,
      souphttpsink->offset, n);
  soup_session_queue_message (souphttpsink->session, souphttpsink->message,
      callback, souphttpsink);

  souphttpsink->offset += n;
}

static gboolean
send_message (GstSoupHttpClientSink * souphttpsink)
{
  g_mutex_lock (&souphttpsink->mutex);
  send_message_locked (souphttpsink);
  if (souphttpsink->timer) {
    g_source_destroy (souphttpsink->timer);
    g_source_unref (souphttpsink->timer);
    souphttpsink->timer = NULL;
  }
  g_mutex_unlock (&souphttpsink->mutex);

  return FALSE;
}

static void
callback (SoupSession * session, SoupMessage * msg, gpointer user_data)
{
  GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (user_data);

  GST_DEBUG_OBJECT (souphttpsink, "callback status=%d %s",
      msg->status_code, msg->reason_phrase);

  g_mutex_lock (&souphttpsink->mutex);
  g_cond_signal (&souphttpsink->cond);
  souphttpsink->message = NULL;

  if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code)) {
    souphttpsink->failures++;
    if (souphttpsink->retries &&
        (souphttpsink->retries < 0 ||
            souphttpsink->retries >= souphttpsink->failures)) {
      guint64 retry_delay;
      const char *retry_after =
          soup_message_headers_get_one (msg->response_headers,
          "Retry-After");
      if (retry_after) {
        gchar *end = NULL;
        retry_delay = g_ascii_strtoull (retry_after, &end, 10);
        if (end || errno) {
          retry_delay = souphttpsink->retry_delay;
        } else {
          retry_delay = MAX (retry_delay, souphttpsink->retry_delay);
        }
        GST_WARNING_OBJECT (souphttpsink, "Could not write to HTTP URI: "
            "status: %d %s (retrying PUT after %" G_GINT64_FORMAT
            " seconds with Retry-After: %s)", msg->status_code,
            msg->reason_phrase, retry_delay, retry_after);
      } else {
        retry_delay = souphttpsink->retry_delay;
        GST_WARNING_OBJECT (souphttpsink, "Could not write to HTTP URI: "
            "status: %d %s (retrying PUT after %" G_GINT64_FORMAT
            " seconds)", msg->status_code, msg->reason_phrase, retry_delay);
      }
      souphttpsink->timer = g_timeout_source_new_seconds (retry_delay);
      g_source_set_callback (souphttpsink->timer, (GSourceFunc) (send_message),
          souphttpsink, NULL);
      g_source_attach (souphttpsink->timer, souphttpsink->context);
    } else {
      souphttpsink->status_code = msg->status_code;
      souphttpsink->reason_phrase = g_strdup (msg->reason_phrase);
    }
    g_mutex_unlock (&souphttpsink->mutex);
    return;
  }

  g_list_free_full (souphttpsink->sent_buffers,
      (GDestroyNotify) gst_buffer_unref);
  souphttpsink->sent_buffers = NULL;
  souphttpsink->failures = 0;

  send_message_locked (souphttpsink);
  g_mutex_unlock (&souphttpsink->mutex);
}

static GstFlowReturn
gst_soup_http_client_sink_render (GstBaseSink * sink, GstBuffer * buffer)
{
  GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
  GSource *source;
  gboolean wake;

  if (souphttpsink->status_code != 0) {
    GST_ELEMENT_ERROR (souphttpsink, RESOURCE, WRITE,
        ("Could not write to HTTP URI"),
        ("status: %d %s", souphttpsink->status_code,
            souphttpsink->reason_phrase));
    return GST_FLOW_ERROR;
  }

  g_mutex_lock (&souphttpsink->mutex);
  if (souphttpsink->location != NULL) {
    wake = (souphttpsink->queued_buffers == NULL);
    souphttpsink->queued_buffers =
        g_list_append (souphttpsink->queued_buffers, gst_buffer_ref (buffer));

    if (wake) {
      GST_DEBUG_OBJECT (souphttpsink, "setting callback for new buffers");
      source = g_idle_source_new ();
      g_source_set_callback (source, (GSourceFunc) (send_message),
          souphttpsink, NULL);
      g_source_attach (source, souphttpsink->context);
      g_source_unref (source);
    }
  }
  g_mutex_unlock (&souphttpsink->mutex);

  return GST_FLOW_OK;
}

static void
authenticate (SoupSession * session, SoupMessage * msg,
    SoupAuth * auth, gboolean retrying, gpointer user_data)
{
  GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (user_data);

  if (!retrying) {
    /* First time authentication only, if we fail and are called again with retry true fall through */
    if (msg->status_code == SOUP_STATUS_UNAUTHORIZED) {
      if (souphttpsink->user_id && souphttpsink->user_pw)
        soup_auth_authenticate (auth, souphttpsink->user_id,
            souphttpsink->user_pw);
    } else if (msg->status_code == SOUP_STATUS_PROXY_AUTHENTICATION_REQUIRED) {
      if (souphttpsink->proxy_id && souphttpsink->proxy_pw)
        soup_auth_authenticate (auth, souphttpsink->proxy_id,
            souphttpsink->proxy_pw);
    }
  }
}