Blame libvirt-gobject/libvirt-gobject-stream.c

Packit a07778
/*
Packit a07778
 * libvirt-gobject-stream.c: libvirt glib integration
Packit a07778
 *
Packit a07778
 * Copyright (C) 2008 Daniel P. Berrange
Packit a07778
 * Copyright (C) 2010-2011 Red Hat, Inc.
Packit a07778
 *
Packit a07778
 * This library is free software; you can redistribute it and/or
Packit a07778
 * modify it under the terms of the GNU Lesser General Public
Packit a07778
 * License as published by the Free Software Foundation; either
Packit a07778
 * version 2.1 of the License, or (at your option) any later version.
Packit a07778
 *
Packit a07778
 * This library is distributed in the hope that it will be useful,
Packit a07778
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit a07778
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
Packit a07778
 * Lesser General Public License for more details.
Packit a07778
 *
Packit a07778
 * You should have received a copy of the GNU Lesser General Public
Packit a07778
 * License along with this library. If not, see
Packit a07778
 * <http://www.gnu.org/licenses/>.
Packit a07778
 *
Packit a07778
 * Authors: Daniel P. Berrange <berrange@redhat.com>
Packit a07778
 *          Marc-André Lureau <marcandre.lureau@redhat.com>
Packit a07778
 */
Packit a07778
Packit a07778
#include <config.h>
Packit a07778
Packit a07778
#include <libvirt/virterror.h>
Packit a07778
#include <string.h>
Packit a07778
Packit a07778
#include <glib/gi18n-lib.h>
Packit a07778
Packit a07778
#include "libvirt-glib/libvirt-glib.h"
Packit a07778
#include "libvirt-gobject/libvirt-gobject.h"
Packit a07778
#include "libvirt-gobject-compat.h"
Packit a07778
Packit a07778
#include "libvirt-gobject/libvirt-gobject-input-stream.h"
Packit a07778
#include "libvirt-gobject/libvirt-gobject-output-stream.h"
Packit a07778
Packit a07778
#define GVIR_STREAM_GET_PRIVATE(obj)                         \
Packit a07778
        (G_TYPE_INSTANCE_GET_PRIVATE((obj), GVIR_TYPE_STREAM, GVirStreamPrivate))
Packit a07778
Packit a07778
struct _GVirStreamPrivate
Packit a07778
{
Packit a07778
    virStreamPtr   handle;
Packit a07778
    GInputStream  *input_stream;
Packit a07778
    GOutputStream  *output_stream;
Packit a07778
Packit a07778
    gboolean eventRegistered;
Packit a07778
    int eventLast;
Packit a07778
    GList *sources;
Packit a07778
};
Packit a07778
Packit a07778
typedef struct {
Packit a07778
    GSource source;
Packit a07778
    GVirStreamIOCondition cond;
Packit a07778
    GVirStreamIOCondition newCond;
Packit a07778
    GVirStream *stream;
Packit a07778
} GVirStreamSource;
Packit a07778
Packit a07778
Packit a07778
G_DEFINE_TYPE_WITH_PRIVATE(GVirStream, gvir_stream, G_TYPE_IO_STREAM);
Packit a07778
Packit a07778
Packit a07778
/* GObject property identifiers for GVirStream. */
enum {
    PROP_0,         /* placeholder for id 0; no property is installed with it */
    PROP_HANDLE,    /* the "handle" property */
};
Packit a07778
Packit a07778
Packit a07778
#define GVIR_STREAM_ERROR gvir_stream_error_quark()
Packit a07778
Packit a07778
static void gvir_stream_update_events(GVirStream *stream);
Packit a07778
Packit a07778
/* Returns the quark behind the GVIR_STREAM_ERROR error domain. */
static GQuark
gvir_stream_error_quark(void)
{
    GQuark domain = g_quark_from_static_string("gvir-stream");

    return domain;
}
Packit a07778
Packit a07778
Packit a07778
/*
 * GIOStream::get_input_stream implementation.
 * The input stream is created on first use and cached in the private data.
 */
static GInputStream* gvir_stream_get_input_stream(GIOStream *io_stream)
{
    GVirStream *stream = GVIR_STREAM(io_stream);
    GVirStreamPrivate *priv = stream->priv;

    if (!priv->input_stream)
        priv->input_stream = (GInputStream *)_gvir_input_stream_new(stream);

    return priv->input_stream;
}
Packit a07778
Packit a07778
Packit a07778
/*
 * GIOStream::get_output_stream implementation.
 * The output stream is created on first use and cached in the private data.
 */
static GOutputStream* gvir_stream_get_output_stream(GIOStream *io_stream)
{
    GVirStream *stream = GVIR_STREAM(io_stream);
    GVirStreamPrivate *priv = stream->priv;

    if (!priv->output_stream)
        priv->output_stream = (GOutputStream *)_gvir_output_stream_new(stream);

    return priv->output_stream;
}
Packit a07778
Packit a07778
Packit a07778
static gboolean gvir_stream_close(GIOStream *io_stream,
Packit a07778
                                  GCancellable *cancellable,
Packit a07778
                                  GError **error)
Packit a07778
{
Packit a07778
    GVirStream *self = GVIR_STREAM(io_stream);
Packit a07778
    GError *local_error = NULL;
Packit a07778
    gboolean i_ret = TRUE, o_ret = TRUE;
Packit a07778
Packit a07778
    g_return_val_if_fail(error == NULL || *error == NULL, FALSE);
Packit a07778
Packit a07778
    if (self->priv->input_stream)
Packit a07778
        i_ret = g_input_stream_close(self->priv->input_stream, cancellable, &local_error);
Packit a07778
Packit a07778
    if (local_error)
Packit a07778
        g_propagate_error(error, local_error);
Packit a07778
Packit a07778
    if (self->priv->output_stream)
Packit a07778
        o_ret = g_output_stream_close(self->priv->output_stream, cancellable, &local_error);
Packit a07778
Packit a07778
    if (local_error) {
Packit a07778
        if (i_ret)
Packit a07778
            g_propagate_error(error, local_error);
Packit a07778
        else
Packit a07778
            g_error_free(local_error);
Packit a07778
    }
Packit a07778
Packit a07778
    return (i_ret && o_ret);
Packit a07778
}
Packit a07778
Packit a07778
/*
 * Idle callback behind gvir_stream_close_async(): runs the class close_fn
 * (when one is set) and completes the GTask with the outcome.
 * @data is the GTask; the reference is dropped here.
 * Always returns FALSE so the idle source runs once.
 */
static gboolean close_in_idle (gpointer data)
{
    GTask *task = G_TASK (data);
    GIOStream *stream = G_IO_STREAM(g_task_get_source_object (task));
    GCancellable *cancellable = g_task_get_cancellable (task);
    GIOStreamClass *klass = G_IO_STREAM_GET_CLASS(stream);
    GError *close_error = NULL;
    gboolean closed = TRUE;

    /* close is not blocked, just do it! */
    if (klass->close_fn)
        closed = klass->close_fn(stream, cancellable, &close_error);

    if (closed)
        g_task_return_boolean(task, TRUE);
    else
        g_task_return_error(task, close_error);

    g_object_unref(task);
    return FALSE;
}
Packit a07778
Packit a07778
static void gvir_stream_close_async(GIOStream *stream,
Packit a07778
                                    int io_priority G_GNUC_UNUSED,
Packit a07778
                                    GCancellable *cancellable,
Packit a07778
                                    GAsyncReadyCallback callback,
Packit a07778
                                    gpointer user_data)
Packit a07778
{
Packit a07778
    GTask *task;
Packit a07778
Packit a07778
    task = g_task_new(G_OBJECT(stream),
Packit a07778
                      cancellable,
Packit a07778
                      callback,
Packit a07778
                      user_data);
Packit a07778
    g_idle_add(close_in_idle, task);
Packit a07778
}
Packit a07778
Packit a07778
static gboolean
Packit a07778
gvir_stream_close_finish(GIOStream *stream,
Packit a07778
                         GAsyncResult *result,
Packit a07778
                         GError **error)
Packit a07778
{
Packit a07778
    g_return_val_if_fail(GVIR_IS_STREAM(stream), FALSE);
Packit a07778
    g_return_val_if_fail(g_task_is_valid(result, stream), FALSE);
Packit a07778
    g_return_val_if_fail(error == NULL || *error == NULL, FALSE);
Packit a07778
Packit a07778
    return g_task_propagate_boolean(G_TASK(result), error);
Packit a07778
}
Packit a07778
Packit a07778
Packit a07778
static void gvir_stream_get_property(GObject *object,
Packit a07778
                                     guint prop_id,
Packit a07778
                                     GValue *value,
Packit a07778
                                     GParamSpec *pspec)
Packit a07778
{
Packit a07778
    GVirStream *self = GVIR_STREAM(object);
Packit a07778
    GVirStreamPrivate *priv = self->priv;
Packit a07778
Packit a07778
    switch (prop_id) {
Packit a07778
    case PROP_HANDLE:
Packit a07778
        g_value_set_boxed(value, priv->handle);
Packit a07778
        break;
Packit a07778
Packit a07778
    default:
Packit a07778
        G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
Packit a07778
    }
Packit a07778
}
Packit a07778
Packit a07778
Packit a07778
static void gvir_stream_set_property(GObject *object,
Packit a07778
                                     guint prop_id,
Packit a07778
                                     const GValue *value,
Packit a07778
                                     GParamSpec *pspec)
Packit a07778
{
Packit a07778
    GVirStream *self = GVIR_STREAM(object);
Packit a07778
    GVirStreamPrivate *priv = self->priv;
Packit a07778
Packit a07778
    switch (prop_id) {
Packit a07778
    case PROP_HANDLE:
Packit a07778
        if (priv->handle)
Packit a07778
            virStreamFree(priv->handle);
Packit a07778
        priv->handle = g_value_get_boxed(value);
Packit a07778
        break;
Packit a07778
Packit a07778
    default:
Packit a07778
        G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
Packit a07778
    }
Packit a07778
}
Packit a07778
Packit a07778
Packit a07778
/*
 * GObject::finalize implementation.
 *
 * Releases the cached input and output streams, destroys any watch
 * sources still listed, removes the libvirt event callback, then
 * finishes and frees the libvirt stream handle.
 */
static void gvir_stream_finalize(GObject *object)
{
    GVirStream *self = GVIR_STREAM(object);
    GVirStreamPrivate *priv = self->priv;
    GList *tmp;

    /* Both lazily created sides are owned by this object. */
    g_clear_object(&priv->input_stream);
    g_clear_object(&priv->output_stream);

    tmp = priv->sources;
    while (tmp) {
        GVirStreamSource *source = tmp->data;

        /* Advance first: destroying the source may finalize it, and its
         * finalizer unlinks the source's node from priv->sources. */
        tmp = tmp->next;
        g_source_destroy((GSource*)source);
    }
    g_list_free(priv->sources);
    priv->sources = NULL;

    if (priv->handle) {
        /* With no sources left this removes a registered event callback. */
        gvir_stream_update_events(self);

        if (virStreamFinish(priv->handle) < 0)
            gvir_critical("cannot finish stream");

        virStreamFree(priv->handle);
    }

    G_OBJECT_CLASS(gvir_stream_parent_class)->finalize(object);
}
Packit a07778
Packit a07778
Packit a07778
/*
 * Class initializer: wires up the GObject and GIOStream virtual methods
 * and installs the construct-only "handle" property.
 */
static void gvir_stream_class_init(GVirStreamClass *klass)
{
    GObjectClass *gobject_class = G_OBJECT_CLASS(klass);
    GIOStreamClass *io_class = G_IO_STREAM_CLASS(klass);
    GParamSpec *handle_spec;

    gobject_class->finalize = gvir_stream_finalize;
    gobject_class->get_property = gvir_stream_get_property;
    gobject_class->set_property = gvir_stream_set_property;

    io_class->get_input_stream = gvir_stream_get_input_stream;
    io_class->get_output_stream = gvir_stream_get_output_stream;
    io_class->close_fn = gvir_stream_close;
    io_class->close_async = gvir_stream_close_async;
    io_class->close_finish = gvir_stream_close_finish;

    handle_spec = g_param_spec_boxed("handle",
                                     "Handle",
                                     "The stream handle",
                                     GVIR_TYPE_STREAM_HANDLE,
                                     G_PARAM_READABLE |
                                     G_PARAM_WRITABLE |
                                     G_PARAM_CONSTRUCT_ONLY |
                                     G_PARAM_STATIC_STRINGS);
    g_object_class_install_property(gobject_class, PROP_HANDLE, handle_spec);
}
Packit a07778
Packit a07778
Packit a07778
/*
 * Instance initializer: caches the private data pointer in self->priv.
 *
 * Uses the accessor generated by G_DEFINE_TYPE_WITH_PRIVATE instead of the
 * deprecated G_TYPE_INSTANCE_GET_PRIVATE lookup.
 */
static void gvir_stream_init(GVirStream *self)
{
    self->priv = gvir_stream_get_instance_private(self);
}
Packit a07778
Packit a07778
typedef struct virStream GVirStreamHandle;
Packit a07778
Packit a07778
/*
 * Boxed-type copy function: takes an additional reference on the libvirt
 * stream and returns the same pointer.
 */
static GVirStreamHandle*
gvir_stream_handle_copy(GVirStreamHandle *src)
{
    virStreamPtr st = (virStreamPtr)src;

    virStreamRef(st);
    return src;
}
Packit a07778
Packit a07778
static void
Packit a07778
gvir_stream_handle_free(GVirStreamHandle *src)
Packit a07778
{
Packit a07778
    virStreamFree((virStreamPtr)src);
Packit a07778
}
Packit a07778
Packit a07778
G_DEFINE_BOXED_TYPE(GVirStreamHandle, gvir_stream_handle,
Packit a07778
                    gvir_stream_handle_copy, gvir_stream_handle_free)
Packit a07778
Packit a07778
/**
Packit a07778
 * gvir_stream_receive:
Packit a07778
 * @stream: the stream
Packit a07778
 * @buffer: (array length=size) (element-type guint8): a buffer
Packit a07778
 *     to read data into (which should be at least @size bytes long).
Packit a07778
 * @size: the number of bytes you want to read from the stream
Packit a07778
 * @cancellable: (allow-none): a %GCancellable or %NULL
Packit a07778
 * @error: #GError for error reporting, or %NULL to ignore.
Packit a07778
 *
Packit a07778
 * Receive data (up to @size bytes) from a stream.
Packit a07778
 * On error -1 is returned and @error is set accordingly.
Packit a07778
 *
Packit a07778
 * gvir_stream_receive() can return any number of bytes, up to
Packit a07778
 * @size. If more than @size bytes have been received, the additional
Packit a07778
 * data will be returned in future calls to gvir_stream_receive().
Packit a07778
 *
Packit a07778
 * If there is no data available, a %G_IO_ERROR_WOULD_BLOCK error will be
Packit a07778
 * returned.
Packit a07778
 *
Packit a07778
 * Returns: Number of bytes read, or 0 if the end of stream reached,
Packit a07778
 * or -1 on error.
Packit a07778
 */
Packit a07778
gssize gvir_stream_receive(GVirStream *self,
Packit a07778
                           gchar *buffer,
Packit a07778
                           gsize size,
Packit a07778
                           GCancellable *cancellable,
Packit a07778
                           GError **error)
Packit a07778
{
Packit a07778
    int got;
Packit a07778
Packit a07778
    g_return_val_if_fail(GVIR_IS_STREAM(self), -1);
Packit a07778
    g_return_val_if_fail(buffer != NULL, -1);
Packit a07778
Packit a07778
    if (g_cancellable_set_error_if_cancelled (cancellable, error))
Packit a07778
        return -1;
Packit a07778
Packit a07778
    got = virStreamRecv(self->priv->handle, buffer, size);
Packit a07778
Packit a07778
    if (got == -2) {  /* blocking */
Packit a07778
        g_set_error_literal(error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
Packit a07778
                            _("virStreamRecv call would block"));
Packit a07778
    } else if (got < 0) {
Packit a07778
        g_set_error(error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
Packit a07778
                    _("Got virStreamRecv error in %s"), G_STRFUNC);
Packit a07778
    }
Packit a07778
Packit a07778
    return got;
Packit a07778
}
Packit a07778
Packit a07778
struct stream_sink_helper {
Packit a07778
    GVirStream *self;
Packit a07778
    GVirStreamSinkFunc func;
Packit a07778
    gpointer user_data;
Packit a07778
    GCancellable *cancellable;
Packit a07778
};
Packit a07778
Packit a07778
static int
Packit a07778
stream_sink(virStreamPtr st G_GNUC_UNUSED,
Packit a07778
            const char *bytes,
Packit a07778
            size_t nbytes,
Packit a07778
            void *opaque)
Packit a07778
{
Packit a07778
  struct stream_sink_helper *helper = opaque;
Packit a07778
Packit a07778
  if (g_cancellable_is_cancelled(helper->cancellable))
Packit a07778
      return -1;
Packit a07778
Packit a07778
  return helper->func(helper->self, bytes, nbytes, helper->user_data);
Packit a07778
}
Packit a07778
Packit a07778
/**
Packit a07778
 * gvir_stream_receive_all:
Packit a07778
 * @stream: the stream
Packit a07778
 * @cancellable: cancellation notifier
Packit a07778
 * @func: (scope notified): the callback for writing data to application
Packit a07778
 * @user_data: (closure): data to be passed to @callback
Packit a07778
 * @error: #GError for error reporting, or %NULL to ignore.
Packit a07778
 *
Packit a07778
 * Receive the entire data stream, sending the data to the
Packit a07778
 * requested data sink. This is simply a convenient alternative
Packit a07778
 * to virStreamRecv, for apps that do blocking-I/o.
Packit a07778
 *
Packit a07778
 * Returns: the number of bytes consumed or -1 upon error
Packit a07778
 */
Packit a07778
gssize
Packit a07778
gvir_stream_receive_all(GVirStream *self,
Packit a07778
                        GCancellable *cancellable,
Packit a07778
                        GVirStreamSinkFunc func,
Packit a07778
                        gpointer user_data,
Packit a07778
                        GError **error)
Packit a07778
{
Packit a07778
    struct stream_sink_helper helper = {
Packit a07778
        .self = self,
Packit a07778
        .func = func,
Packit a07778
        .user_data = user_data,
Packit a07778
        .cancellable = cancellable,
Packit a07778
    };
Packit a07778
    int r;
Packit a07778
Packit a07778
    g_return_val_if_fail(GVIR_IS_STREAM(self), -1);
Packit a07778
    g_return_val_if_fail((cancellable == NULL) || G_IS_CANCELLABLE(cancellable), -1);
Packit a07778
    g_return_val_if_fail(func != NULL, -1);
Packit a07778
    g_return_val_if_fail(error == NULL || *error == NULL, -1);
Packit a07778
Packit a07778
    r = virStreamRecvAll(self->priv->handle, stream_sink, &helper);
Packit a07778
    if (r < 0) {
Packit a07778
        gvir_set_error_literal(error, GVIR_STREAM_ERROR,
Packit a07778
                               0,
Packit a07778
                               _("Unable to perform RecvAll"));
Packit a07778
    }
Packit a07778
Packit a07778
    return r;
Packit a07778
}
Packit a07778
Packit a07778
Packit a07778
/**
Packit a07778
 * gvir_stream_send:
Packit a07778
 * @stream: the stream
Packit a07778
 * @buffer: a buffer to write data from (which should be at least @size
Packit a07778
 *     bytes long).
Packit a07778
 * @size: the number of bytes you want to write to the stream
Packit a07778
 * @cancellable: (allow-none): a %GCancellable or %NULL
Packit a07778
 * @error: #GError for error reporting, or %NULL to ignore.
Packit a07778
 *
Packit a07778
 * Send data (up to @size bytes) from a stream.
Packit a07778
 * On error -1 is returned and @error is set accordingly.
Packit a07778
 *
Packit a07778
 * gvir_stream_send() can return any number of bytes, up to
Packit a07778
 * @size. If more than @size bytes have been sendd, the additional
Packit a07778
 * data will be returned in future calls to gvir_stream_send().
Packit a07778
 *
Packit a07778
 * If there is no data available, a %G_IO_ERROR_WOULD_BLOCK error will be
Packit a07778
 * returned.
Packit a07778
 *
Packit a07778
 * Returns: Number of bytes written.
Packit a07778
 */
Packit a07778
gssize gvir_stream_send(GVirStream *self,
Packit a07778
                        const gchar *buffer,
Packit a07778
                        gsize size,
Packit a07778
                        GCancellable *cancellable,
Packit a07778
                        GError **error)
Packit a07778
{
Packit a07778
    int got;
Packit a07778
Packit a07778
    g_return_val_if_fail(GVIR_IS_STREAM(self), -1);
Packit a07778
    g_return_val_if_fail(buffer != NULL, -1);
Packit a07778
    g_return_val_if_fail((cancellable == NULL) || G_IS_CANCELLABLE(cancellable), -1);
Packit a07778
    g_return_val_if_fail(error == NULL || *error == NULL, -1);
Packit a07778
Packit a07778
    if (g_cancellable_set_error_if_cancelled (cancellable, error))
Packit a07778
        return -1;
Packit a07778
Packit a07778
    got = virStreamSend(self->priv->handle, buffer, size);
Packit a07778
Packit a07778
    if (got == -2) {  /* blocking */
Packit a07778
        g_set_error_literal(error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
Packit a07778
                            _("virStreamSend call would block"));
Packit a07778
    } else if (got < 0) {
Packit a07778
        g_set_error(error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
Packit a07778
                    _("Got virStreamRecv error in %s"), G_STRFUNC);
Packit a07778
    }
Packit a07778
Packit a07778
    return got;
Packit a07778
}
Packit a07778
Packit a07778
struct stream_source_helper {
Packit a07778
    GVirStream *self;
Packit a07778
    GVirStreamSourceFunc func;
Packit a07778
    gpointer user_data;
Packit a07778
    GCancellable *cancellable;
Packit a07778
};
Packit a07778
Packit a07778
static int
Packit a07778
stream_source(virStreamPtr st G_GNUC_UNUSED,
Packit a07778
              char *bytes,
Packit a07778
              size_t nbytes,
Packit a07778
              void *opaque)
Packit a07778
{
Packit a07778
  struct stream_source_helper *helper = opaque;
Packit a07778
Packit a07778
  if (g_cancellable_is_cancelled(helper->cancellable))
Packit a07778
      return -1;
Packit a07778
Packit a07778
  return helper->func(helper->self, bytes, nbytes, helper->user_data);
Packit a07778
}
Packit a07778
Packit a07778
/**
Packit a07778
 * gvir_stream_send_all:
Packit a07778
 * @stream: the stream
Packit a07778
 * @cancellable: cancellation notifier
Packit a07778
 * @func: (scope notified): the callback for writing data to application
Packit a07778
 * @user_data: (closure): data to be passed to @callback
Packit a07778
 * @error: #GError for error reporting, or %NULL to ignore.
Packit a07778
 *
Packit a07778
 * Send the entire data stream, sending the data to the
Packit a07778
 * requested data source. This is simply a convenient alternative
Packit a07778
 * to virStreamRecv, for apps that do blocking-I/o.
Packit a07778
 *
Packit a07778
 * Returns: the number of bytes consumed or -1 upon error
Packit a07778
 */
Packit a07778
gssize
Packit a07778
gvir_stream_send_all(GVirStream *self,
Packit a07778
                     GCancellable *cancellable,
Packit a07778
                     GVirStreamSourceFunc func,
Packit a07778
                     gpointer user_data,
Packit a07778
                     GError **error)
Packit a07778
{
Packit a07778
    struct stream_source_helper helper = {
Packit a07778
        .self = self,
Packit a07778
        .func = func,
Packit a07778
        .user_data = user_data,
Packit a07778
        .cancellable = cancellable,
Packit a07778
    };
Packit a07778
    int r;
Packit a07778
Packit a07778
    g_return_val_if_fail(GVIR_IS_STREAM(self), -1);
Packit a07778
    g_return_val_if_fail((cancellable == NULL) || G_IS_CANCELLABLE(cancellable), -1);
Packit a07778
    g_return_val_if_fail(func != NULL, -1);
Packit a07778
    g_return_val_if_fail(error == NULL || *error == NULL, -1);
Packit a07778
Packit a07778
    r = virStreamSendAll(self->priv->handle, stream_source, &helper);
Packit a07778
    if (r < 0) {
Packit a07778
        gvir_set_error_literal(error, GVIR_STREAM_ERROR,
Packit a07778
                               0,
Packit a07778
                               _("Unable to perform SendAll"));
Packit a07778
    }
Packit a07778
Packit a07778
    return r;
Packit a07778
}
Packit a07778
Packit a07778
Packit a07778
/*
 * libvirt stream event callback (@opaque is the GVirStream).
 *
 * For every watch source, recomputes newCond from @events: READABLE and
 * WRITABLE are reported only to sources that asked for them, while HANGUP
 * and ERROR are reported to any source watching READABLE or WRITABLE.
 */
static void gvir_stream_handle_events(virStreamPtr st G_GNUC_UNUSED,
                                      int events,
                                      void *opaque)
{
    GVirStream *stream = GVIR_STREAM(opaque);
    GList *iter;

    for (iter = stream->priv->sources; iter != NULL; iter = iter->next) {
        GVirStreamSource *watch = iter->data;
        gboolean wants_read = (watch->cond & GVIR_STREAM_IO_CONDITION_READABLE) != 0;
        gboolean wants_write = (watch->cond & GVIR_STREAM_IO_CONDITION_WRITABLE) != 0;

        watch->newCond = 0;

        if (wants_read && (events & VIR_STREAM_EVENT_READABLE))
            watch->newCond |= GVIR_STREAM_IO_CONDITION_READABLE;
        if (wants_write && (events & VIR_STREAM_EVENT_WRITABLE))
            watch->newCond |= GVIR_STREAM_IO_CONDITION_WRITABLE;

        if (wants_read || wants_write) {
            if (events & VIR_STREAM_EVENT_HANGUP)
                watch->newCond |= GVIR_STREAM_IO_CONDITION_HANGUP;
            if (events & VIR_STREAM_EVENT_ERROR)
                watch->newCond |= GVIR_STREAM_IO_CONDITION_ERROR;
        }
    }
}
Packit a07778
Packit a07778
Packit a07778
/*
 * Synchronizes the libvirt event callback with the set of watch sources.
 *
 * The event mask is the union of READABLE/WRITABLE requested by all
 * sources. A non-empty mask adds the callback (or updates it when already
 * registered); an empty mask removes a registered callback.
 *
 * The callback is only recorded as registered when
 * virStreamEventAddCallback() succeeds, so a failed registration is
 * retried on the next call instead of being followed by updates of a
 * callback that does not exist.
 */
static void gvir_stream_update_events(GVirStream *stream)
{
    GVirStreamPrivate *priv = stream->priv;
    int mask = 0;
    GList *tmp;

    for (tmp = priv->sources; tmp; tmp = tmp->next) {
        GVirStreamSource *source = tmp->data;

        if (source->cond & GVIR_STREAM_IO_CONDITION_READABLE)
            mask |= VIR_STREAM_EVENT_READABLE;
        if (source->cond & GVIR_STREAM_IO_CONDITION_WRITABLE)
            mask |= VIR_STREAM_EVENT_WRITABLE;
    }

    if (mask) {
        if (priv->eventRegistered) {
            virStreamEventUpdateCallback(priv->handle, mask);
        } else if (virStreamEventAddCallback(priv->handle, mask,
                                             gvir_stream_handle_events,
                                             stream,
                                             NULL) < 0) {
            gvir_critical("cannot add stream event callback");
        } else {
            priv->eventRegistered = TRUE;
        }
    } else {
        if (priv->eventRegistered) {
            virStreamEventRemoveCallback(priv->handle);
            priv->eventRegistered = FALSE;
        }
    }
}
Packit a07778
Packit a07778
static gboolean gvir_stream_source_prepare(GSource *source,
Packit a07778
                                           gint *timeout)
Packit a07778
{
Packit a07778
    GVirStreamSource *gsource = (GVirStreamSource*)source;
Packit a07778
    if (gsource->newCond) {
Packit a07778
        *timeout = 0;
Packit a07778
        return TRUE;
Packit a07778
    }
Packit a07778
    *timeout = -1;
Packit a07778
    return FALSE;
Packit a07778
}
Packit a07778
Packit a07778
/* GSource check function: TRUE when conditions are pending dispatch. */
static gboolean gvir_stream_source_check(GSource *source)
{
    GVirStreamSource *watch = (GVirStreamSource*)source;

    return watch->newCond ? TRUE : FALSE;
}
Packit a07778
Packit a07778
static gboolean gvir_stream_source_dispatch(GSource *source,
Packit a07778
                                            GSourceFunc callback,
Packit a07778
                                            gpointer user_data)
Packit a07778
{
Packit a07778
    GVirStreamSource *gsource = (GVirStreamSource*)source;
Packit a07778
    GVirStreamIOFunc func = (GVirStreamIOFunc)callback;
Packit a07778
    gboolean ret;
Packit a07778
    ret = func(gsource->stream, gsource->newCond, user_data);
Packit a07778
    gsource->newCond = 0;
Packit a07778
    return ret;
Packit a07778
}
Packit a07778
Packit a07778
/*
 * GSource finalize function: unlinks the source from its stream's watch
 * list, refreshes the libvirt event registration and drops the stream
 * reference held by the source.
 */
static void gvir_stream_source_finalize(GSource *source)
{
    GVirStreamSource *watch = (GVirStreamSource*)source;
    GVirStream *stream = watch->stream;
    GVirStreamPrivate *priv = stream->priv;

    priv->sources = g_list_remove(priv->sources, source);
    gvir_stream_update_events(stream);

    g_clear_object(&watch->stream);
}
Packit a07778
Packit a07778
GSourceFuncs gvir_stream_source_funcs = {
Packit a07778
    .prepare = gvir_stream_source_prepare,
Packit a07778
    .check = gvir_stream_source_check,
Packit a07778
    .dispatch = gvir_stream_source_dispatch,
Packit a07778
    .finalize = gvir_stream_source_finalize,
Packit a07778
};
Packit a07778
Packit a07778
Packit a07778
/**
Packit a07778
 * gvir_stream_add_watch: (skip)
Packit a07778
 * @stream: the stream
Packit a07778
 * @cond: the conditions to watch for (bitfield of #GVirStreamIOCondition)
Packit a07778
 * @func: (closure opaque): the function to call when the condition is satisfied
Packit a07778
 * @opaque: (closure): user data to pass to @func
Packit a07778
 *
Packit a07778
 * Adds a watch for @stream to the mainloop
Packit a07778
 *
Packit a07778
 * Returns: the event source id
Packit a07778
 */
Packit a07778
guint gvir_stream_add_watch(GVirStream *stream,
Packit a07778
                            GVirStreamIOCondition cond,
Packit a07778
                            GVirStreamIOFunc func,
Packit a07778
                            gpointer opaque)
Packit a07778
{
Packit a07778
    return gvir_stream_add_watch_full(stream,
Packit a07778
                                      G_PRIORITY_DEFAULT,
Packit a07778
                                      cond,
Packit a07778
                                      func,
Packit a07778
                                      opaque,
Packit a07778
                                      NULL);
Packit a07778
}
Packit a07778
Packit a07778
/**
Packit a07778
 * gvir_stream_add_watch_full: (rename-to gvir_stream_add_watch)
Packit a07778
 * @stream: the stream
Packit a07778
 * @priority: the priority of the #GVirStream source
Packit a07778
 * @cond: the conditions to watch for (bitfield of #GVirStreamIOCondition)
Packit a07778
 * @func: (closure opaque): the function to call when the condition is satisfied
Packit a07778
 * @opaque: (closure): user data to pass to @func
Packit a07778
 * @notify: the function to call when the source is removed
Packit a07778
 *
Packit a07778
 * Adds a watch for @stream to the mainloop
Packit a07778
 *
Packit a07778
 * Returns: the event source id
Packit a07778
 */
Packit a07778
guint gvir_stream_add_watch_full(GVirStream *stream,
Packit a07778
                                 gint priority,
Packit a07778
                                 GVirStreamIOCondition cond,
Packit a07778
                                 GVirStreamIOFunc func,
Packit a07778
                                 gpointer opaque,
Packit a07778
                                 GDestroyNotify notify)
Packit a07778
{
Packit a07778
    g_return_val_if_fail(GVIR_IS_STREAM(stream), 0);
Packit a07778
Packit a07778
    GVirStreamPrivate *priv = stream->priv;
Packit a07778
    GVirStreamSource *source = (GVirStreamSource*)g_source_new(&gvir_stream_source_funcs,
Packit a07778
                                                               sizeof(GVirStreamSource));
Packit a07778
    guint ret;
Packit a07778
Packit a07778
    source->stream = g_object_ref(stream);
Packit a07778
    source->cond = cond;
Packit a07778
Packit a07778
    if (priority != G_PRIORITY_DEFAULT)
Packit a07778
        g_source_set_priority((GSource*)source, priority);
Packit a07778
Packit a07778
    priv->sources = g_list_append(priv->sources, source);
Packit a07778
Packit a07778
    gvir_stream_update_events(source->stream);
Packit a07778
Packit a07778
    g_source_set_callback((GSource*)source, (GSourceFunc)func, opaque, notify);
Packit a07778
    ret = g_source_attach((GSource*)source, g_main_context_default());
Packit a07778
Packit a07778
    g_source_unref((GSource*)source);
Packit a07778
Packit a07778
    return ret;
Packit a07778
}