Blob Blame History Raw
/*
 * Copyright (C) 2011 Colin Walters <walters@verbum.org>
 *
 * SPDX-License-Identifier: LGPL-2.0+
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 *
 * Author: Colin Walters <walters@verbum.org>
 */

#include "config.h"

#include <gio/gio.h>
#include <gio/gfiledescriptorbased.h>
#include <gio/gunixoutputstream.h>
#define LIBSOUP_USE_UNSTABLE_REQUEST_API
#include <libsoup/soup.h>
#include <libsoup/soup-requester.h>
#include <libsoup/soup-request-http.h>

#include "libglnx.h"
#include "ostree-fetcher.h"
#include "ostree-fetcher-util.h"
#ifdef HAVE_LIBSOUP_CLIENT_CERTS
#include "ostree-tls-cert-interaction.h"
#endif
#include "ostree-enumtypes.h"
#include "ostree.h"
#include "ostree-repo-private.h"
#include "otutil.h"

typedef enum {
  OSTREE_FETCHER_STATE_PENDING,
  OSTREE_FETCHER_STATE_DOWNLOADING,
  OSTREE_FETCHER_STATE_COMPLETE
} OstreeFetcherState;

typedef struct {
  volatile int ref_count;

  SoupSession *session;  /* not referenced */
  GMainContext *main_context;
  volatile gint running;
  GError *initialization_error; /* Any failure to load the db */

  char *remote_name;
  int base_tmpdir_dfd;

  GVariant *extra_headers;
  gboolean transfer_gzip;

  /* Our active HTTP requests */
  GHashTable *outstanding;

  /* Shared across threads; be sure to lock. */
  GHashTable *output_stream_set;  /* set<GOutputStream> */
  GMutex output_stream_set_lock;

  /* Also protected by output_stream_set_lock. */
  guint64 total_downloaded;

  GError *oob_error;

} ThreadClosure;

typedef struct {
  volatile int ref_count;

  ThreadClosure *thread_closure;
  GPtrArray *mirrorlist; /* list of base URIs */
  char *filename; /* relative name to fetch or NULL */
  guint mirrorlist_idx;

  OstreeFetcherState state;

  SoupRequest *request;

  gboolean is_membuf;
  OstreeFetcherRequestFlags flags;
  GInputStream *request_body;
  GLnxTmpfile tmpf;
  GOutputStream *out_stream;

  guint64 max_size;
  guint64 current_size;
  guint64 content_length;
} OstreeFetcherPendingURI;

/* Used by session_thread_idle_add() */
typedef void (*SessionThreadFunc) (ThreadClosure *thread_closure,
                                   gpointer data);

/* Used by session_thread_idle_add() */
typedef struct {
  ThreadClosure *thread_closure;
  SessionThreadFunc function;
  gpointer data;
  GDestroyNotify notify;
} IdleClosure;

struct OstreeFetcher
{
  GObject parent_instance;

  OstreeFetcherConfigFlags config_flags;

  GThread *session_thread;
  ThreadClosure *thread_closure;
};

enum {
  PROP_0,
  PROP_CONFIG_FLAGS
};

G_DEFINE_TYPE (OstreeFetcher, _ostree_fetcher, G_TYPE_OBJECT)

static ThreadClosure *
thread_closure_ref (ThreadClosure *thread_closure)
{
  int refcount;
  g_return_val_if_fail (thread_closure != NULL, NULL);
  refcount = g_atomic_int_add (&thread_closure->ref_count, 1);
  g_assert (refcount > 0);
  return thread_closure;
}

static void
thread_closure_unref (ThreadClosure *thread_closure)
{
  g_return_if_fail (thread_closure != NULL);

  if (g_atomic_int_dec_and_test (&thread_closure->ref_count))
    {
      /* The session thread should have cleared this by now. */
      g_assert (thread_closure->session == NULL);

      g_clear_pointer (&thread_closure->main_context, g_main_context_unref);

      g_clear_pointer (&thread_closure->extra_headers, (GDestroyNotify)g_variant_unref);

      g_clear_pointer (&thread_closure->output_stream_set, g_hash_table_unref);
      g_mutex_clear (&thread_closure->output_stream_set_lock);

      g_clear_pointer (&thread_closure->oob_error, g_error_free);

      g_free (thread_closure->remote_name);

      g_slice_free (ThreadClosure, thread_closure);
    }
}

static void
idle_closure_free (IdleClosure *idle_closure)
{
  g_clear_pointer (&idle_closure->thread_closure, thread_closure_unref);

  if (idle_closure->notify != NULL)
    idle_closure->notify (idle_closure->data);

  g_slice_free (IdleClosure, idle_closure);
}

static OstreeFetcherPendingURI *
pending_uri_ref (OstreeFetcherPendingURI *pending)
{
  gint refcount;
  g_return_val_if_fail (pending != NULL, NULL);
  refcount = g_atomic_int_add (&pending->ref_count, 1);
  g_assert (refcount > 0);
  return pending;
}

static void
pending_uri_unref (OstreeFetcherPendingURI *pending)
{
  if (!g_atomic_int_dec_and_test (&pending->ref_count))
    return;

  g_clear_pointer (&pending->thread_closure, thread_closure_unref);

  g_clear_pointer (&pending->mirrorlist, g_ptr_array_unref);
  g_free (pending->filename);
  g_clear_object (&pending->request);
  g_clear_object (&pending->request_body);
  glnx_tmpfile_clear (&pending->tmpf);
  g_clear_object (&pending->out_stream);
  g_free (pending);
}

static gboolean
session_thread_idle_dispatch (gpointer data)
{
  IdleClosure *idle_closure = data;

  idle_closure->function (idle_closure->thread_closure,
                          idle_closure->data);

  return G_SOURCE_REMOVE;
}

static void
session_thread_idle_add (ThreadClosure *thread_closure,
                         SessionThreadFunc function,
                         gpointer data,
                         GDestroyNotify notify)
{
  IdleClosure *idle_closure;

  g_return_if_fail (thread_closure != NULL);
  g_return_if_fail (function != NULL);

  idle_closure = g_slice_new (IdleClosure);
  idle_closure->thread_closure = thread_closure_ref (thread_closure);
  idle_closure->function = function;
  idle_closure->data = data;
  idle_closure->notify = notify;

  g_main_context_invoke_full (thread_closure->main_context,
                              G_PRIORITY_DEFAULT,
                              session_thread_idle_dispatch,
                              idle_closure,  /* takes ownership */
                              (GDestroyNotify) idle_closure_free);
}

static void
session_thread_add_logger (ThreadClosure *thread_closure,
                           gpointer data)
{
  glnx_unref_object SoupLogger *logger = NULL;

  logger = soup_logger_new (SOUP_LOGGER_LOG_BODY, 500);
  soup_session_add_feature (thread_closure->session,
                            SOUP_SESSION_FEATURE (logger));
}

static void
session_thread_config_flags (ThreadClosure *thread_closure,
                             gpointer data)
{
  OstreeFetcherConfigFlags config_flags;

  config_flags = GPOINTER_TO_UINT (data);

  if ((config_flags & OSTREE_FETCHER_FLAGS_TLS_PERMISSIVE) > 0)
    {
      g_object_set (thread_closure->session,
                    SOUP_SESSION_SSL_STRICT,
                    FALSE, NULL);
    }
}

static void
on_authenticate (SoupSession *session, SoupMessage *msg, SoupAuth *auth,
                 gboolean retrying, gpointer user_data)
{
  ThreadClosure *thread_closure = user_data;

  if (msg->status_code == SOUP_STATUS_PROXY_UNAUTHORIZED)
    {
      SoupURI *uri = NULL;
      g_object_get (session, SOUP_SESSION_PROXY_URI, &uri, NULL);
      if (retrying)
        {
          g_autofree char *s = soup_uri_to_string (uri, FALSE);
          g_set_error (&thread_closure->oob_error,
                       G_IO_ERROR, G_IO_ERROR_PROXY_AUTH_FAILED,
                       "Invalid username or password for proxy '%s'", s);
        }
      else
        soup_auth_authenticate (auth, soup_uri_get_user (uri),
                                      soup_uri_get_password (uri));
    }
}

static void
session_thread_set_proxy_cb (ThreadClosure *thread_closure,
                             gpointer data)
{
  SoupURI *proxy_uri = data;

  g_object_set (thread_closure->session,
                SOUP_SESSION_PROXY_URI,
                proxy_uri, NULL);

  /* libsoup won't necessarily pass any embedded username and password to proxy
   * requests, so we have to be ready to handle 407 and handle them ourselves.
   * See also: https://bugzilla.gnome.org/show_bug.cgi?id=772932
   * */
  if (soup_uri_get_user (proxy_uri) &&
      soup_uri_get_password (proxy_uri))
    {
      g_signal_connect (thread_closure->session, "authenticate",
                        G_CALLBACK (on_authenticate), thread_closure);
    }
}

static void
session_thread_set_cookie_jar_cb (ThreadClosure *thread_closure,
                                  gpointer data)
{
  SoupCookieJar *jar = data;

  soup_session_add_feature (thread_closure->session,
                            SOUP_SESSION_FEATURE (jar));
}

static void
session_thread_set_headers_cb (ThreadClosure *thread_closure,
                               gpointer data)
{
  GVariant *headers = data;

  g_clear_pointer (&thread_closure->extra_headers, (GDestroyNotify)g_variant_unref);
  thread_closure->extra_headers = g_variant_ref (headers);
}

#ifdef HAVE_LIBSOUP_CLIENT_CERTS
static void
session_thread_set_tls_interaction_cb (ThreadClosure *thread_closure,
                                       gpointer data)
{
  const char *cert_and_key_path = data; /* str\0str\0 in one malloc buf */
  const char *cert_path = cert_and_key_path;
  const char *key_path = cert_and_key_path + strlen (cert_and_key_path) + 1;
  g_autoptr(OstreeTlsCertInteraction) interaction = NULL;

  /* The GTlsInteraction instance must be created in the
   * session thread so it uses the correct GMainContext. */
  interaction = _ostree_tls_cert_interaction_new (cert_path, key_path);

  g_object_set (thread_closure->session,
                SOUP_SESSION_TLS_INTERACTION,
                interaction, NULL);
}
#endif

static void
session_thread_set_tls_database_cb (ThreadClosure *thread_closure,
                                    gpointer data)
{
  const char *db_path = data;

  if (db_path != NULL)
    {
      glnx_unref_object GTlsDatabase *tlsdb = NULL;

      g_clear_error (&thread_closure->initialization_error);
      tlsdb = g_tls_file_database_new (db_path, &thread_closure->initialization_error);

      if (tlsdb)
        g_object_set (thread_closure->session,
                      SOUP_SESSION_TLS_DATABASE,
                      tlsdb, NULL);
    }
  else
    {
      g_object_set (thread_closure->session,
                    SOUP_SESSION_SSL_USE_SYSTEM_CA_FILE,
                    TRUE, NULL);
    }
}

static void
session_thread_set_extra_user_agent_cb (ThreadClosure *thread_closure,
                                        gpointer data)
{
  const char *extra_user_agent = data;
  if (extra_user_agent != NULL)
    {
      g_autofree char *ua =
        g_strdup_printf ("%s %s", OSTREE_FETCHER_USERAGENT_STRING, extra_user_agent);
      g_object_set (thread_closure->session, SOUP_SESSION_USER_AGENT, ua, NULL);
    }
  else
    {
      g_object_set (thread_closure->session, SOUP_SESSION_USER_AGENT,
                    OSTREE_FETCHER_USERAGENT_STRING, NULL);
    }
}

static void
on_request_sent (GObject        *object, GAsyncResult   *result, gpointer        user_data);

static void
start_pending_request (ThreadClosure *thread_closure,
                       GTask         *task)
{

  OstreeFetcherPendingURI *pending;
  GCancellable *cancellable;

  pending = g_task_get_task_data (task);
  cancellable = g_task_get_cancellable (task);

  g_hash_table_add (thread_closure->outstanding, pending_uri_ref (pending));
  soup_request_send_async (pending->request,
                           cancellable,
                           on_request_sent,
                           g_object_ref (task));
}

static void
create_pending_soup_request (OstreeFetcherPendingURI  *pending,
                             GError                  **error)
{
  OstreeFetcherURI *next_mirror = NULL;
  g_autoptr(OstreeFetcherURI) uri = NULL;

  g_assert (pending->mirrorlist);
  g_assert (pending->mirrorlist_idx < pending->mirrorlist->len);

  next_mirror = g_ptr_array_index (pending->mirrorlist, pending->mirrorlist_idx);
  if (pending->filename)
    uri = _ostree_fetcher_uri_new_subpath (next_mirror, pending->filename);

  g_clear_object (&pending->request);

  pending->request = soup_session_request_uri (pending->thread_closure->session,
                                               (SoupURI*)(uri ? uri : next_mirror), error);
}

static void
session_thread_request_uri (ThreadClosure *thread_closure,
                            gpointer data)
{
  GTask *task = G_TASK (data);
  OstreeFetcherPendingURI *pending;
  GCancellable *cancellable;
  GError *local_error = NULL;

  pending = g_task_get_task_data (task);
  cancellable = g_task_get_cancellable (task);

  /* If we caught an error in init, re-throw it for every request */
  if (thread_closure->initialization_error)
    {
      g_task_return_error (task, g_error_copy (thread_closure->initialization_error));
      return;
    }

  create_pending_soup_request (pending, &local_error);
  if (local_error != NULL)
    {
      g_task_return_error (task, local_error);
      return;
    }

  if (SOUP_IS_REQUEST_HTTP (pending->request) && thread_closure->extra_headers)
    {
      glnx_unref_object SoupMessage *msg = soup_request_http_get_message ((SoupRequestHTTP*) pending->request);
      g_autoptr(GVariantIter) viter = g_variant_iter_new (thread_closure->extra_headers);
      const char *key;
      const char *value;

      while (g_variant_iter_next (viter, "(&s&s)", &key, &value))
        soup_message_headers_append (msg->request_headers, key, value);
    }

  if (pending->is_membuf)
    {
      soup_request_send_async (pending->request,
                               cancellable,
                               on_request_sent,
                               g_object_ref (task));
    }
  else
    {
     start_pending_request (thread_closure, task);
    }
}

static gpointer
ostree_fetcher_session_thread (gpointer data)
{
  ThreadClosure *closure = data;
  g_autoptr(GMainContext) mainctx = g_main_context_ref (closure->main_context);

  /* This becomes the GMainContext that SoupSession schedules async
   * callbacks and emits signals from.  Make it the thread-default
   * context for this thread before creating the session. */
  g_main_context_push_thread_default (mainctx);

  /* We retain ownership of the SoupSession reference. */
  closure->session = soup_session_async_new_with_options (SOUP_SESSION_USER_AGENT, OSTREE_FETCHER_USERAGENT_STRING,
                                                          SOUP_SESSION_SSL_USE_SYSTEM_CA_FILE, TRUE,
                                                          SOUP_SESSION_USE_THREAD_CONTEXT, TRUE,
                                                          SOUP_SESSION_ADD_FEATURE_BY_TYPE, SOUP_TYPE_REQUESTER,
                                                          SOUP_SESSION_TIMEOUT, 60,
                                                          SOUP_SESSION_IDLE_TIMEOUT, 60,
                                                          NULL);

  if (closure->transfer_gzip)
    soup_session_add_feature_by_type (closure->session, SOUP_TYPE_CONTENT_DECODER);

  /* XXX: Now that we have mirrorlist support, we could make this even smarter
   * by spreading requests across mirrors. */
  gint max_conns;
  g_object_get (closure->session, "max-conns-per-host", &max_conns, NULL);
  if (max_conns < _OSTREE_MAX_OUTSTANDING_FETCHER_REQUESTS)
    {
      /* We download a lot of small objects in ostree, so this
       * helps a lot.  Also matches what most modern browsers do.
       *
       * Note since https://github.com/ostreedev/ostree/commit/f4d1334e19ce3ab2f8872b1e28da52044f559401
       * we don't do queuing in this libsoup backend, but we still
       * want to override libsoup's currently conservative
       * #define SOUP_SESSION_MAX_CONNS_PER_HOST_DEFAULT 2 (as of 2018-02-14).
       */
      max_conns = _OSTREE_MAX_OUTSTANDING_FETCHER_REQUESTS;
      g_object_set (closure->session,
                    "max-conns-per-host",
                    max_conns, NULL);
    }

  /* This model ensures we don't hit a race using g_main_loop_quit();
   * see also what pull_termination_condition() in ostree-repo-pull.c
   * is doing.
   */
  while (g_atomic_int_get (&closure->running))
    g_main_context_iteration (closure->main_context, TRUE);

  /* Since the ThreadClosure may be finalized from any thread we
   * unreference all data related to the SoupSession ourself to ensure
   * it's freed in the same thread where it was created. */
  g_clear_pointer (&closure->outstanding, g_hash_table_unref);
  g_clear_pointer (&closure->session, g_object_unref);

  thread_closure_unref (closure);

  /* Do this last, since libsoup uses g_main_current_source() which
   * relies on it.
   */
  g_main_context_pop_thread_default (mainctx);

  return NULL;
}

static void
_ostree_fetcher_set_property (GObject      *object,
                              guint         prop_id,
                              const GValue *value,
                              GParamSpec   *pspec)
{
  OstreeFetcher *self = OSTREE_FETCHER (object);

  switch (prop_id)
    {
      case PROP_CONFIG_FLAGS:
        self->config_flags = g_value_get_flags (value);
        break;
      default:
        G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
        break;
    }
}

static void
_ostree_fetcher_get_property (GObject    *object,
                              guint       prop_id,
                              GValue     *value,
                              GParamSpec *pspec)
{
  OstreeFetcher *self = OSTREE_FETCHER (object);

  switch (prop_id)
    {
      case PROP_CONFIG_FLAGS:
        g_value_set_flags (value, self->config_flags);
        break;
      default:
        G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
        break;
    }
}

static void
_ostree_fetcher_finalize (GObject *object)
{
  OstreeFetcher *self = OSTREE_FETCHER (object);

  /* Terminate the session thread. */
  g_atomic_int_set (&self->thread_closure->running, 0);
  g_main_context_wakeup (self->thread_closure->main_context);
  if (self->session_thread)
    {
      /* We need to explicitly synchronize to clean up TLS */
      if (self->session_thread != g_thread_self ())
        g_thread_join (self->session_thread);
      else
        g_clear_pointer (&self->session_thread, g_thread_unref);
    }
  g_clear_pointer (&self->thread_closure, thread_closure_unref);

  G_OBJECT_CLASS (_ostree_fetcher_parent_class)->finalize (object);
}

static void
_ostree_fetcher_constructed (GObject *object)
{
  OstreeFetcher *self = OSTREE_FETCHER (object);
  g_autoptr(GMainContext) main_context = NULL;
  const char *http_proxy;

  main_context = g_main_context_new ();

  self->thread_closure = g_slice_new0 (ThreadClosure);
  self->thread_closure->ref_count = 1;
  self->thread_closure->main_context = g_main_context_ref (main_context);
  self->thread_closure->running = 1;
  self->thread_closure->transfer_gzip = (self->config_flags & OSTREE_FETCHER_FLAGS_TRANSFER_GZIP) != 0;

  self->thread_closure->outstanding = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify)pending_uri_unref);
  self->thread_closure->output_stream_set = g_hash_table_new_full (NULL, NULL,
                                                                   (GDestroyNotify) NULL,
                                                                   (GDestroyNotify) g_object_unref);
  g_mutex_init (&self->thread_closure->output_stream_set_lock);

  if (g_getenv ("OSTREE_DEBUG_HTTP"))
    {
      session_thread_idle_add (self->thread_closure,
                               session_thread_add_logger,
                               NULL, (GDestroyNotify) NULL);
    }

  if (self->config_flags != 0)
    {
      session_thread_idle_add (self->thread_closure,
                               session_thread_config_flags,
                               GUINT_TO_POINTER (self->config_flags),
                               (GDestroyNotify) NULL);
    }

  http_proxy = g_getenv ("http_proxy");
  if (http_proxy != NULL && http_proxy[0] != '\0')
    _ostree_fetcher_set_proxy (self, http_proxy);

  /* FIXME Maybe implement GInitableIface and use g_thread_try_new()
   *       so we can try to handle thread creation errors gracefully? */
  self->session_thread = g_thread_new ("fetcher-session-thread",
                                       ostree_fetcher_session_thread,
                                       thread_closure_ref (self->thread_closure));

  G_OBJECT_CLASS (_ostree_fetcher_parent_class)->constructed (object);
}

static void
_ostree_fetcher_class_init (OstreeFetcherClass *klass)
{
  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);

  gobject_class->set_property = _ostree_fetcher_set_property;
  gobject_class->get_property = _ostree_fetcher_get_property;
  gobject_class->finalize = _ostree_fetcher_finalize;
  gobject_class->constructed = _ostree_fetcher_constructed;

  g_object_class_install_property (gobject_class,
                                   PROP_CONFIG_FLAGS,
                                   g_param_spec_flags ("config-flags",
                                                       "",
                                                       "",
                                                       OSTREE_TYPE_FETCHER_CONFIG_FLAGS,
                                                       OSTREE_FETCHER_FLAGS_NONE,
                                                       G_PARAM_READWRITE |
                                                       G_PARAM_CONSTRUCT_ONLY |
                                                       G_PARAM_STATIC_STRINGS));
}

static void
_ostree_fetcher_init (OstreeFetcher *self)
{
}

OstreeFetcher *
_ostree_fetcher_new (int                      tmpdir_dfd,
                     const char              *remote_name,
                     OstreeFetcherConfigFlags flags)
{
  OstreeFetcher *self;

  self = g_object_new (OSTREE_TYPE_FETCHER, "config-flags", flags, NULL);
  self->thread_closure->remote_name = g_strdup (remote_name);
  self->thread_closure->base_tmpdir_dfd = tmpdir_dfd;

  return self;
}

int
_ostree_fetcher_get_dfd (OstreeFetcher *fetcher)
{
  return fetcher->thread_closure->base_tmpdir_dfd;
}

void
_ostree_fetcher_set_proxy (OstreeFetcher *self,
                           const char    *http_proxy)
{
  SoupURI *proxy_uri;

  g_return_if_fail (OSTREE_IS_FETCHER (self));
  g_return_if_fail (http_proxy != NULL && http_proxy[0] != '\0');

  proxy_uri = soup_uri_new (http_proxy);

  if (!proxy_uri)
    {
      g_warning ("Invalid proxy URI '%s'", http_proxy);
    }
  else
    {
      session_thread_idle_add (self->thread_closure,
                               session_thread_set_proxy_cb,
                               proxy_uri,  /* takes ownership */
                               (GDestroyNotify) soup_uri_free);
    }
}

void
_ostree_fetcher_set_cookie_jar (OstreeFetcher *self,
                                const char    *jar_path)
{
  SoupCookieJar *jar;

  g_return_if_fail (OSTREE_IS_FETCHER (self));
  g_return_if_fail (jar_path != NULL);

  jar = soup_cookie_jar_text_new (jar_path, TRUE);

  session_thread_idle_add (self->thread_closure,
                           session_thread_set_cookie_jar_cb,
                           jar,  /* takes ownership */
                           (GDestroyNotify) g_object_unref);
}

void
_ostree_fetcher_set_client_cert (OstreeFetcher   *self,
                                 const char      *cert_path,
                                 const char      *key_path)
{
  g_autoptr(GString) buf = NULL;
  g_return_if_fail (OSTREE_IS_FETCHER (self));

  if (cert_path)
    {
      buf = g_string_new (cert_path);
      g_string_append_c (buf, '\0');
      g_string_append (buf, key_path);
    }

#ifdef HAVE_LIBSOUP_CLIENT_CERTS
  session_thread_idle_add (self->thread_closure,
                           session_thread_set_tls_interaction_cb,
                           g_string_free (g_steal_pointer (&buf), FALSE),
                           (GDestroyNotify) g_free);
#else
  g_warning ("This version of OSTree is compiled without client side certificate support");
#endif
}

void
_ostree_fetcher_set_tls_database (OstreeFetcher *self,
                                  const char    *tlsdb_path)
{
  g_return_if_fail (OSTREE_IS_FETCHER (self));

  session_thread_idle_add (self->thread_closure,
                           session_thread_set_tls_database_cb,
                           g_strdup (tlsdb_path),
                           (GDestroyNotify) g_free);
}

void
_ostree_fetcher_set_extra_headers (OstreeFetcher *self,
                                   GVariant      *extra_headers)
{
  session_thread_idle_add (self->thread_closure,
                           session_thread_set_headers_cb,
                           g_variant_ref (extra_headers),
                           (GDestroyNotify) g_variant_unref);
}

void
_ostree_fetcher_set_extra_user_agent (OstreeFetcher *self,
                                      const char    *extra_user_agent)
{
  session_thread_idle_add (self->thread_closure,
                           session_thread_set_extra_user_agent_cb,
                           g_strdup (extra_user_agent),
                           (GDestroyNotify) g_free);
}

static gboolean
finish_stream (OstreeFetcherPendingURI *pending,
               GCancellable            *cancellable,
               GError                 **error)
{
  gboolean ret = FALSE;
  struct stat stbuf;

  /* Close it here since we do an async fstat(), where we don't want
   * to hit a bad fd.
   */
  if (pending->out_stream)
    {
      if ((pending->flags & OSTREE_FETCHER_REQUEST_NUL_TERMINATION) > 0)
        {
          const guint8 nulchar = 0;
          gsize bytes_written;

          if (!g_output_stream_write_all (pending->out_stream, &nulchar, 1, &bytes_written,
                                          cancellable, error))
            goto out;
        }

      if (!g_output_stream_close (pending->out_stream, cancellable, error))
        goto out;

      g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
      g_hash_table_remove (pending->thread_closure->output_stream_set,
                           pending->out_stream);
      g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
    }

  if (!pending->is_membuf)
    {
      if (!glnx_fstat (pending->tmpf.fd, &stbuf, error))
        goto out;
    }

  pending->state = OSTREE_FETCHER_STATE_COMPLETE;

  if (!pending->is_membuf)
    {
      if (stbuf.st_size < pending->content_length)
        {
          g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Download incomplete");
          goto out;
        }
      else
        {
          g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
          pending->thread_closure->total_downloaded += stbuf.st_size;
          g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
        }
    }

  ret = TRUE;
 out:
  (void) g_input_stream_close (pending->request_body, NULL, NULL);
  return ret;
}

static void
on_stream_read (GObject        *object,
                GAsyncResult   *result,
                gpointer        user_data);

static void
remove_pending (OstreeFetcherPendingURI *pending)
{
  /* Hold a temporary ref to ensure the reference to
   * pending->thread_closure is valid.
   */
  pending_uri_ref (pending);
  g_hash_table_remove (pending->thread_closure->outstanding, pending);
  pending_uri_unref (pending);
}

static void
on_out_splice_complete (GObject        *object,
                        GAsyncResult   *result,
                        gpointer        user_data) 
{
  GTask *task = G_TASK (user_data);
  OstreeFetcherPendingURI *pending;
  GCancellable *cancellable;
  gssize bytes_written;
  GError *local_error = NULL;

  pending = g_task_get_task_data (task);
  cancellable = g_task_get_cancellable (task);

  bytes_written = g_output_stream_splice_finish ((GOutputStream *)object,
                                                 result,
                                                 &local_error);
  if (bytes_written < 0)
    goto out;

  g_input_stream_read_bytes_async (pending->request_body,
                                   8192, G_PRIORITY_DEFAULT,
                                   cancellable,
                                   on_stream_read,
                                   g_object_ref (task));

 out:
  if (local_error)
    {
      g_task_return_error (task, local_error);
      remove_pending (pending);
    }

  g_object_unref (task);
}

static void
on_stream_read (GObject        *object,
                GAsyncResult   *result,
                gpointer        user_data) 
{
  GTask *task = G_TASK (user_data);
  OstreeFetcherPendingURI *pending;
  GCancellable *cancellable;
  g_autoptr(GBytes) bytes = NULL;
  gsize bytes_read;
  GError *local_error = NULL;

  pending = g_task_get_task_data (task);
  cancellable = g_task_get_cancellable (task);

  /* Only open the output stream on demand to ensure we use as
   * few file descriptors as possible.
   */
  if (!pending->out_stream)
    {
      if (!pending->is_membuf)
        {
          if (!_ostree_fetcher_tmpf_from_flags (pending->flags, pending->thread_closure->base_tmpdir_dfd,
                                                &pending->tmpf, &local_error))
            goto out;
          pending->out_stream = g_unix_output_stream_new (pending->tmpf.fd, FALSE);
        }
      else
        {
          pending->out_stream = g_memory_output_stream_new_resizable ();
        }

      g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
      g_hash_table_add (pending->thread_closure->output_stream_set,
                        g_object_ref (pending->out_stream));
      g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
    }

  /* Get a GBytes buffer */
  bytes = g_input_stream_read_bytes_finish ((GInputStream*)object, result, &local_error);
  if (!bytes)
    goto out;
  bytes_read = g_bytes_get_size (bytes);

  /* Was this the end of the stream? */
  if (bytes_read == 0)
    {
      if (!finish_stream (pending, cancellable, &local_error))
        goto out;
      if (pending->is_membuf)
        {
          g_task_return_pointer (task,
                                 g_memory_output_stream_steal_as_bytes ((GMemoryOutputStream*)pending->out_stream),
                                 (GDestroyNotify) g_bytes_unref);
        }
      else
        {
          if (lseek (pending->tmpf.fd, 0, SEEK_SET) < 0)
            {
              glnx_set_error_from_errno (&local_error);
              g_task_return_error (task, g_steal_pointer (&local_error));
            }
          else
            g_task_return_boolean (task, TRUE);
        }
      remove_pending (pending);
    }
  else
    {
      /* Verify max size */
      if (pending->max_size > 0)
        {
          if (bytes_read > pending->max_size ||
              (bytes_read + pending->current_size) > pending->max_size)
            {
              g_autofree char *uristr =
                soup_uri_to_string (soup_request_get_uri (pending->request), FALSE);
              local_error = g_error_new (G_IO_ERROR, G_IO_ERROR_FAILED,
                                         "URI %s exceeded maximum size of %" G_GUINT64_FORMAT " bytes",
                                         uristr, pending->max_size);
              goto out;
            }
        }

      pending->current_size += bytes_read;

      /* We do this instead of _write_bytes_async() as that's not
       * guaranteed to do a complete write.
       */
      {
        g_autoptr(GInputStream) membuf =
          g_memory_input_stream_new_from_bytes (bytes);
        g_output_stream_splice_async (pending->out_stream, membuf,
                                      G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE,
                                      G_PRIORITY_DEFAULT,
                                      cancellable,
                                      on_out_splice_complete,
                                      g_object_ref (task));
      }
    }

 out:
  if (local_error)
    {
      g_task_return_error (task, local_error);
      remove_pending (pending);
    }

  g_object_unref (task);
}

static void
on_request_sent (GObject        *object,
                 GAsyncResult   *result,
                 gpointer        user_data) 
{
  GTask *task = G_TASK (user_data);
  /* Hold a ref to the pending across this function, since we remove
   * it from the hash early in some cases, not in others. */
  OstreeFetcherPendingURI *pending = pending_uri_ref (g_task_get_task_data (task));
  GCancellable *cancellable = g_task_get_cancellable (task);
  GError *local_error = NULL;
  glnx_unref_object SoupMessage *msg = NULL;

  pending->state = OSTREE_FETCHER_STATE_COMPLETE;
  pending->request_body = soup_request_send_finish ((SoupRequest*) object,
                                                   result, &local_error);

  if (!pending->request_body)
    goto out;
  g_assert_no_error (local_error);

  if (SOUP_IS_REQUEST_HTTP (object))
    {
      msg = soup_request_http_get_message ((SoupRequestHTTP*) object);
      if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code))
        {
          /* is there another mirror we can try? */
          if (pending->mirrorlist_idx + 1 < pending->mirrorlist->len)
            {
              pending->mirrorlist_idx++;
              create_pending_soup_request (pending, &local_error);
              if (local_error != NULL)
                goto out;

              (void) g_input_stream_close (pending->request_body, NULL, NULL);

              start_pending_request (pending->thread_closure, task);
            }
          else
            {
              g_autofree char *uristring =
                soup_uri_to_string (soup_request_get_uri (pending->request), FALSE);

              GIOErrorEnum code;

              switch (msg->status_code)
                {
                /* These statuses are internal to libsoup, and not standard HTTP ones: */
                case SOUP_STATUS_CANCELLED:
                  code = G_IO_ERROR_CANCELLED;
                  break;
                case SOUP_STATUS_CANT_RESOLVE:
                case SOUP_STATUS_CANT_CONNECT:
                  code = G_IO_ERROR_HOST_NOT_FOUND;
                  break;
                case SOUP_STATUS_IO_ERROR:
#if !GLIB_CHECK_VERSION(2, 44, 0)
                  code = G_IO_ERROR_BROKEN_PIPE;
#else
                  code = G_IO_ERROR_CONNECTION_CLOSED;
#endif
                  break;
                default:
                  code = _ostree_fetcher_http_status_code_to_io_error (msg->status_code);
                  break;
                }

              {
                g_autofree char *errmsg =
                  g_strdup_printf ("Server returned status %u: %s",
                                   msg->status_code,
                                   soup_status_get_phrase (msg->status_code));

                /* Let's make OOB errors be the final one since they're probably
                 * the cause for the error here. */
                if (pending->thread_closure->oob_error)
                  {
                    local_error =
                      g_error_copy (pending->thread_closure->oob_error);
                    g_prefix_error (&local_error, "%s: ", errmsg);
                  }
                else
                  local_error = g_error_new_literal (G_IO_ERROR, code, errmsg);
              }

              if (pending->mirrorlist->len > 1)
                g_prefix_error (&local_error,
                                "All %u mirrors failed. Last error was: ",
                                pending->mirrorlist->len);
              if (pending->thread_closure->remote_name &&
                  !((pending->flags & OSTREE_FETCHER_REQUEST_OPTIONAL_CONTENT) > 0 &&
                    code == G_IO_ERROR_NOT_FOUND))
                _ostree_fetcher_journal_failure (pending->thread_closure->remote_name,
                                                 uristring, local_error->message);

            }
          goto out;
        }
    }

  pending->state = OSTREE_FETCHER_STATE_DOWNLOADING;
  
  pending->content_length = soup_request_get_content_length (pending->request);

  g_input_stream_read_bytes_async (pending->request_body,
                                   8192, G_PRIORITY_DEFAULT,
                                   cancellable,
                                   on_stream_read,
                                   g_object_ref (task));

 out:
  if (local_error)
    {
      if (pending->request_body)
        (void) g_input_stream_close (pending->request_body, NULL, NULL);
      g_task_return_error (task, local_error);
      remove_pending (pending);
    }

  pending_uri_unref (pending);
  g_object_unref (task);
}

static void
_ostree_fetcher_request_async (OstreeFetcher         *self,
                               GPtrArray             *mirrorlist,
                               const char            *filename,
                               OstreeFetcherRequestFlags flags,
                               gboolean               is_membuf,
                               guint64                max_size,
                               int                    priority,
                               GCancellable          *cancellable,
                               GAsyncReadyCallback    callback,
                               gpointer               user_data)
{
  g_autoptr(GTask) task = NULL;
  OstreeFetcherPendingURI *pending;

  g_return_if_fail (OSTREE_IS_FETCHER (self));
  g_return_if_fail (mirrorlist != NULL);
  g_return_if_fail (mirrorlist->len > 0);

  /* SoupRequest is created in session thread. */
  pending = g_new0 (OstreeFetcherPendingURI, 1);
  pending->ref_count = 1;
  pending->thread_closure = thread_closure_ref (self->thread_closure);
  pending->mirrorlist = g_ptr_array_ref (mirrorlist);
  pending->filename = g_strdup (filename);
  pending->flags = flags;
  pending->max_size = max_size;
  pending->is_membuf = is_membuf;

  task = g_task_new (self, cancellable, callback, user_data);
  g_task_set_source_tag (task, _ostree_fetcher_request_async);
  g_task_set_task_data (task, pending, (GDestroyNotify) pending_uri_unref);

  /* We'll use the GTask priority for our own priority queue. */
  g_task_set_priority (task, priority);

  session_thread_idle_add (self->thread_closure,
                           session_thread_request_uri,
                           g_object_ref (task),
                           (GDestroyNotify) g_object_unref);
}

void
_ostree_fetcher_request_to_tmpfile (OstreeFetcher         *self,
                                    GPtrArray             *mirrorlist,
                                    const char            *filename,
                                    OstreeFetcherRequestFlags flags,
                                    guint64                max_size,
                                    int                    priority,
                                    GCancellable          *cancellable,
                                    GAsyncReadyCallback    callback,
                                    gpointer               user_data)
{
  _ostree_fetcher_request_async (self, mirrorlist, filename, flags, FALSE,
                                 max_size, priority, cancellable,
                                 callback, user_data);
}

gboolean
_ostree_fetcher_request_to_tmpfile_finish (OstreeFetcher *self,
                                           GAsyncResult  *result,
                                           GLnxTmpfile   *out_tmpf,
                                           GError       **error)
{
  GTask *task;
  OstreeFetcherPendingURI *pending;
  gpointer ret;

  g_return_val_if_fail (g_task_is_valid (result, self), FALSE);
  g_return_val_if_fail (g_async_result_is_tagged (result, _ostree_fetcher_request_async), FALSE);

  task = (GTask*)result;
  pending = g_task_get_task_data (task);

  ret = g_task_propagate_pointer (task, error);
  if (!ret)
    return FALSE;

  g_assert (!pending->is_membuf);
  *out_tmpf = pending->tmpf;
  pending->tmpf.initialized = FALSE; /* Transfer ownership */

  return TRUE;
}

void
_ostree_fetcher_request_to_membuf (OstreeFetcher         *self,
                                   GPtrArray             *mirrorlist,
                                   const char            *filename,
                                   OstreeFetcherRequestFlags flags,
                                   guint64                max_size,
                                   int                    priority,
                                   GCancellable          *cancellable,
                                   GAsyncReadyCallback    callback,
                                   gpointer               user_data)
{
  _ostree_fetcher_request_async (self, mirrorlist, filename, flags, TRUE,
                                 max_size, priority, cancellable,
                                 callback, user_data);
}

gboolean
_ostree_fetcher_request_to_membuf_finish (OstreeFetcher *self,
                                          GAsyncResult  *result,
                                          GBytes       **out_buf,
                                          GError       **error)
{
  GTask *task;
  OstreeFetcherPendingURI *pending;
  gpointer ret;

  g_return_val_if_fail (g_task_is_valid (result, self), FALSE);
  g_return_val_if_fail (g_async_result_is_tagged (result, _ostree_fetcher_request_async), FALSE);

  task = (GTask*)result;
  pending = g_task_get_task_data (task);

  ret = g_task_propagate_pointer (task, error);
  if (!ret)
    return FALSE;

  g_assert (pending->is_membuf);
  g_assert (out_buf);
  *out_buf = ret;

  return TRUE;
}


guint64
_ostree_fetcher_bytes_transferred (OstreeFetcher       *self)
{
  g_return_val_if_fail (OSTREE_IS_FETCHER (self), 0);

  g_mutex_lock (&self->thread_closure->output_stream_set_lock);

  guint64 ret = self->thread_closure->total_downloaded;

  GLNX_HASH_TABLE_FOREACH (self->thread_closure->output_stream_set,
                           GFileOutputStream*, stream)
    {
      if (G_IS_FILE_DESCRIPTOR_BASED (stream))
        {
          int fd = g_file_descriptor_based_get_fd ((GFileDescriptorBased*)stream);
          struct stat stbuf;

          if (glnx_fstat (fd, &stbuf, NULL))
            ret += stbuf.st_size;
        }
    }

  g_mutex_unlock (&self->thread_closure->output_stream_set_lock);

  return ret;
}