/* * libvirt-gobject-stream.c: libvirt glib integration * * Copyright (C) 2008 Daniel P. Berrange * Copyright (C) 2010-2011 Red Hat, Inc. * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see * . * * Authors: Daniel P. Berrange * Marc-André Lureau */ #include #include #include #include #include "libvirt-glib/libvirt-glib.h" #include "libvirt-gobject/libvirt-gobject.h" #include "libvirt-gobject-compat.h" #include "libvirt-gobject/libvirt-gobject-input-stream.h" #include "libvirt-gobject/libvirt-gobject-output-stream.h" #define GVIR_STREAM_GET_PRIVATE(obj) \ (G_TYPE_INSTANCE_GET_PRIVATE((obj), GVIR_TYPE_STREAM, GVirStreamPrivate)) struct _GVirStreamPrivate { virStreamPtr handle; GInputStream *input_stream; GOutputStream *output_stream; gboolean eventRegistered; int eventLast; GList *sources; }; typedef struct { GSource source; GVirStreamIOCondition cond; GVirStreamIOCondition newCond; GVirStream *stream; } GVirStreamSource; G_DEFINE_TYPE_WITH_PRIVATE(GVirStream, gvir_stream, G_TYPE_IO_STREAM); enum { PROP_0, PROP_HANDLE, }; #define GVIR_STREAM_ERROR gvir_stream_error_quark() static void gvir_stream_update_events(GVirStream *stream); static GQuark gvir_stream_error_quark(void) { return g_quark_from_static_string("gvir-stream"); } static GInputStream* gvir_stream_get_input_stream(GIOStream *io_stream) { GVirStream *self = GVIR_STREAM(io_stream); if (self->priv->input_stream == NULL) self->priv->input_stream = (GInputStream *)_gvir_input_stream_new(self); return self->priv->input_stream; } static GOutputStream* gvir_stream_get_output_stream(GIOStream *io_stream) { GVirStream *self = GVIR_STREAM(io_stream); if (self->priv->output_stream == NULL) self->priv->output_stream = (GOutputStream *)_gvir_output_stream_new(self); return self->priv->output_stream; } static gboolean gvir_stream_close(GIOStream *io_stream, GCancellable *cancellable, GError **error) { GVirStream *self = GVIR_STREAM(io_stream); GError *local_error = NULL; gboolean i_ret = TRUE, o_ret = TRUE; g_return_val_if_fail(error == NULL || *error == NULL, FALSE); if (self->priv->input_stream) i_ret = g_input_stream_close(self->priv->input_stream, cancellable, &local_error); if (local_error) g_propagate_error(error, local_error); if (self->priv->output_stream) o_ret = g_output_stream_close(self->priv->output_stream, cancellable, &local_error); if (local_error) { if (i_ret) g_propagate_error(error, local_error); else g_error_free(local_error); } return (i_ret && o_ret); } static gboolean close_in_idle (gpointer data) { GTask *task = G_TASK (data); GIOStream *stream = G_IO_STREAM(g_task_get_source_object (task)); GCancellable *cancellable = g_task_get_cancellable (task); GIOStreamClass *class; GError *error; class = G_IO_STREAM_GET_CLASS(stream); /* close is not blocked, just do it! */ error = NULL; if (class->close_fn && !class->close_fn(stream, cancellable, &error)) { g_task_return_error(task, error); goto exit; } g_task_return_boolean(task, TRUE); exit: g_object_unref(task); return FALSE; } static void gvir_stream_close_async(GIOStream *stream, int io_priority G_GNUC_UNUSED, GCancellable *cancellable, GAsyncReadyCallback callback, gpointer user_data) { GTask *task; task = g_task_new(G_OBJECT(stream), cancellable, callback, user_data); g_idle_add(close_in_idle, task); } static gboolean gvir_stream_close_finish(GIOStream *stream, GAsyncResult *result, GError **error) { g_return_val_if_fail(GVIR_IS_STREAM(stream), FALSE); g_return_val_if_fail(g_task_is_valid(result, stream), FALSE); g_return_val_if_fail(error == NULL || *error == NULL, FALSE); return g_task_propagate_boolean(G_TASK(result), error); } static void gvir_stream_get_property(GObject *object, guint prop_id, GValue *value, GParamSpec *pspec) { GVirStream *self = GVIR_STREAM(object); GVirStreamPrivate *priv = self->priv; switch (prop_id) { case PROP_HANDLE: g_value_set_boxed(value, priv->handle); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); } } static void gvir_stream_set_property(GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec) { GVirStream *self = GVIR_STREAM(object); GVirStreamPrivate *priv = self->priv; switch (prop_id) { case PROP_HANDLE: if (priv->handle) virStreamFree(priv->handle); priv->handle = g_value_get_boxed(value); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); } } static void gvir_stream_finalize(GObject *object) { GVirStream *self = GVIR_STREAM(object); GVirStreamPrivate *priv = self->priv; GList *tmp; if (self->priv->input_stream) g_object_unref(self->priv->input_stream); tmp = priv->sources; while (tmp) { GVirStreamSource *source = tmp->data; g_source_destroy((GSource*)source); tmp = tmp->next; } g_list_free(priv->sources); priv->sources = NULL; if (priv->handle) { gvir_stream_update_events(self); if (virStreamFinish(priv->handle) < 0) gvir_critical("cannot finish stream"); virStreamFree(priv->handle); } G_OBJECT_CLASS(gvir_stream_parent_class)->finalize(object); } static void gvir_stream_class_init(GVirStreamClass *klass) { GObjectClass *object_class = G_OBJECT_CLASS(klass); GIOStreamClass *stream_class = G_IO_STREAM_CLASS(klass); object_class->finalize = gvir_stream_finalize; object_class->get_property = gvir_stream_get_property; object_class->set_property = gvir_stream_set_property; stream_class->get_input_stream = gvir_stream_get_input_stream; stream_class->get_output_stream = gvir_stream_get_output_stream; stream_class->close_fn = gvir_stream_close; stream_class->close_async = gvir_stream_close_async; stream_class->close_finish = gvir_stream_close_finish; g_object_class_install_property(object_class, PROP_HANDLE, g_param_spec_boxed("handle", "Handle", "The stream handle", GVIR_TYPE_STREAM_HANDLE, G_PARAM_READABLE | G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS)); } static void gvir_stream_init(GVirStream *self) { self->priv = GVIR_STREAM_GET_PRIVATE(self); } typedef struct virStream GVirStreamHandle; static GVirStreamHandle* gvir_stream_handle_copy(GVirStreamHandle *src) { virStreamRef((virStreamPtr)src); return src; } static void gvir_stream_handle_free(GVirStreamHandle *src) { virStreamFree((virStreamPtr)src); } G_DEFINE_BOXED_TYPE(GVirStreamHandle, gvir_stream_handle, gvir_stream_handle_copy, gvir_stream_handle_free) /** * gvir_stream_receive: * @stream: the stream * @buffer: (array length=size) (element-type guint8): a buffer * to read data into (which should be at least @size bytes long). * @size: the number of bytes you want to read from the stream * @cancellable: (allow-none): a %GCancellable or %NULL * @error: #GError for error reporting, or %NULL to ignore. * * Receive data (up to @size bytes) from a stream. * On error -1 is returned and @error is set accordingly. * * gvir_stream_receive() can return any number of bytes, up to * @size. If more than @size bytes have been received, the additional * data will be returned in future calls to gvir_stream_receive(). * * If there is no data available, a %G_IO_ERROR_WOULD_BLOCK error will be * returned. * * Returns: Number of bytes read, or 0 if the end of stream reached, * or -1 on error. */ gssize gvir_stream_receive(GVirStream *self, gchar *buffer, gsize size, GCancellable *cancellable, GError **error) { int got; g_return_val_if_fail(GVIR_IS_STREAM(self), -1); g_return_val_if_fail(buffer != NULL, -1); if (g_cancellable_set_error_if_cancelled (cancellable, error)) return -1; got = virStreamRecv(self->priv->handle, buffer, size); if (got == -2) { /* blocking */ g_set_error_literal(error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, _("virStreamRecv call would block")); } else if (got < 0) { g_set_error(error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, _("Got virStreamRecv error in %s"), G_STRFUNC); } return got; } struct stream_sink_helper { GVirStream *self; GVirStreamSinkFunc func; gpointer user_data; GCancellable *cancellable; }; static int stream_sink(virStreamPtr st G_GNUC_UNUSED, const char *bytes, size_t nbytes, void *opaque) { struct stream_sink_helper *helper = opaque; if (g_cancellable_is_cancelled(helper->cancellable)) return -1; return helper->func(helper->self, bytes, nbytes, helper->user_data); } /** * gvir_stream_receive_all: * @stream: the stream * @cancellable: cancellation notifier * @func: (scope notified): the callback for writing data to application * @user_data: (closure): data to be passed to @callback * @error: #GError for error reporting, or %NULL to ignore. * * Receive the entire data stream, sending the data to the * requested data sink. This is simply a convenient alternative * to virStreamRecv, for apps that do blocking-I/o. * * Returns: the number of bytes consumed or -1 upon error */ gssize gvir_stream_receive_all(GVirStream *self, GCancellable *cancellable, GVirStreamSinkFunc func, gpointer user_data, GError **error) { struct stream_sink_helper helper = { .self = self, .func = func, .user_data = user_data, .cancellable = cancellable, }; int r; g_return_val_if_fail(GVIR_IS_STREAM(self), -1); g_return_val_if_fail((cancellable == NULL) || G_IS_CANCELLABLE(cancellable), -1); g_return_val_if_fail(func != NULL, -1); g_return_val_if_fail(error == NULL || *error == NULL, -1); r = virStreamRecvAll(self->priv->handle, stream_sink, &helper); if (r < 0) { gvir_set_error_literal(error, GVIR_STREAM_ERROR, 0, _("Unable to perform RecvAll")); } return r; } /** * gvir_stream_send: * @stream: the stream * @buffer: a buffer to write data from (which should be at least @size * bytes long). * @size: the number of bytes you want to write to the stream * @cancellable: (allow-none): a %GCancellable or %NULL * @error: #GError for error reporting, or %NULL to ignore. * * Send data (up to @size bytes) from a stream. * On error -1 is returned and @error is set accordingly. * * gvir_stream_send() can return any number of bytes, up to * @size. If more than @size bytes have been sendd, the additional * data will be returned in future calls to gvir_stream_send(). * * If there is no data available, a %G_IO_ERROR_WOULD_BLOCK error will be * returned. * * Returns: Number of bytes written. */ gssize gvir_stream_send(GVirStream *self, const gchar *buffer, gsize size, GCancellable *cancellable, GError **error) { int got; g_return_val_if_fail(GVIR_IS_STREAM(self), -1); g_return_val_if_fail(buffer != NULL, -1); g_return_val_if_fail((cancellable == NULL) || G_IS_CANCELLABLE(cancellable), -1); g_return_val_if_fail(error == NULL || *error == NULL, -1); if (g_cancellable_set_error_if_cancelled (cancellable, error)) return -1; got = virStreamSend(self->priv->handle, buffer, size); if (got == -2) { /* blocking */ g_set_error_literal(error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, _("virStreamSend call would block")); } else if (got < 0) { g_set_error(error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, _("Got virStreamRecv error in %s"), G_STRFUNC); } return got; } struct stream_source_helper { GVirStream *self; GVirStreamSourceFunc func; gpointer user_data; GCancellable *cancellable; }; static int stream_source(virStreamPtr st G_GNUC_UNUSED, char *bytes, size_t nbytes, void *opaque) { struct stream_source_helper *helper = opaque; if (g_cancellable_is_cancelled(helper->cancellable)) return -1; return helper->func(helper->self, bytes, nbytes, helper->user_data); } /** * gvir_stream_send_all: * @stream: the stream * @cancellable: cancellation notifier * @func: (scope notified): the callback for writing data to application * @user_data: (closure): data to be passed to @callback * @error: #GError for error reporting, or %NULL to ignore. * * Send the entire data stream, sending the data to the * requested data source. This is simply a convenient alternative * to virStreamRecv, for apps that do blocking-I/o. * * Returns: the number of bytes consumed or -1 upon error */ gssize gvir_stream_send_all(GVirStream *self, GCancellable *cancellable, GVirStreamSourceFunc func, gpointer user_data, GError **error) { struct stream_source_helper helper = { .self = self, .func = func, .user_data = user_data, .cancellable = cancellable, }; int r; g_return_val_if_fail(GVIR_IS_STREAM(self), -1); g_return_val_if_fail((cancellable == NULL) || G_IS_CANCELLABLE(cancellable), -1); g_return_val_if_fail(func != NULL, -1); g_return_val_if_fail(error == NULL || *error == NULL, -1); r = virStreamSendAll(self->priv->handle, stream_source, &helper); if (r < 0) { gvir_set_error_literal(error, GVIR_STREAM_ERROR, 0, _("Unable to perform SendAll")); } return r; } static void gvir_stream_handle_events(virStreamPtr st G_GNUC_UNUSED, int events, void *opaque) { GVirStream *stream = GVIR_STREAM(opaque); GVirStreamPrivate *priv = stream->priv; GList *tmp = priv->sources; while (tmp) { GVirStreamSource *source = tmp->data; source->newCond = 0; if (source->cond & GVIR_STREAM_IO_CONDITION_READABLE) { if (events & VIR_STREAM_EVENT_READABLE) source->newCond |= GVIR_STREAM_IO_CONDITION_READABLE; if (events & VIR_STREAM_EVENT_HANGUP) source->newCond |= GVIR_STREAM_IO_CONDITION_HANGUP; if (events & VIR_STREAM_EVENT_ERROR) source->newCond |= GVIR_STREAM_IO_CONDITION_ERROR; } if (source->cond & GVIR_STREAM_IO_CONDITION_WRITABLE) { if (events & VIR_STREAM_EVENT_WRITABLE) source->newCond |= GVIR_STREAM_IO_CONDITION_WRITABLE; if (events & VIR_STREAM_EVENT_HANGUP) source->newCond |= GVIR_STREAM_IO_CONDITION_HANGUP; if (events & VIR_STREAM_EVENT_ERROR) source->newCond |= GVIR_STREAM_IO_CONDITION_ERROR; } tmp = tmp->next; } } static void gvir_stream_update_events(GVirStream *stream) { GVirStreamPrivate *priv = stream->priv; int mask = 0; GList *tmp = priv->sources; while (tmp) { GVirStreamSource *source = tmp->data; if (source->cond & GVIR_STREAM_IO_CONDITION_READABLE) mask |= VIR_STREAM_EVENT_READABLE; if (source->cond & GVIR_STREAM_IO_CONDITION_WRITABLE) mask |= VIR_STREAM_EVENT_WRITABLE; tmp = tmp->next; } if (mask) { if (priv->eventRegistered) { virStreamEventUpdateCallback(priv->handle, mask); } else { virStreamEventAddCallback(priv->handle, mask, gvir_stream_handle_events, stream, NULL); priv->eventRegistered = TRUE; } } else { if (priv->eventRegistered) { virStreamEventRemoveCallback(priv->handle); priv->eventRegistered = FALSE; } } } static gboolean gvir_stream_source_prepare(GSource *source, gint *timeout) { GVirStreamSource *gsource = (GVirStreamSource*)source; if (gsource->newCond) { *timeout = 0; return TRUE; } *timeout = -1; return FALSE; } static gboolean gvir_stream_source_check(GSource *source) { GVirStreamSource *gsource = (GVirStreamSource*)source; if (gsource->newCond) return TRUE; return FALSE; } static gboolean gvir_stream_source_dispatch(GSource *source, GSourceFunc callback, gpointer user_data) { GVirStreamSource *gsource = (GVirStreamSource*)source; GVirStreamIOFunc func = (GVirStreamIOFunc)callback; gboolean ret; ret = func(gsource->stream, gsource->newCond, user_data); gsource->newCond = 0; return ret; } static void gvir_stream_source_finalize(GSource *source) { GVirStreamSource *gsource = (GVirStreamSource*)source; GVirStreamPrivate *priv = gsource->stream->priv; priv->sources = g_list_remove(priv->sources, source); gvir_stream_update_events(gsource->stream); g_clear_object(&gsource->stream); } GSourceFuncs gvir_stream_source_funcs = { .prepare = gvir_stream_source_prepare, .check = gvir_stream_source_check, .dispatch = gvir_stream_source_dispatch, .finalize = gvir_stream_source_finalize, }; /** * gvir_stream_add_watch: (skip) * @stream: the stream * @cond: the conditions to watch for (bitfield of #GVirStreamIOCondition) * @func: (closure opaque): the function to call when the condition is satisfied * @opaque: (closure): user data to pass to @func * * Adds a watch for @stream to the mainloop * * Returns: the event source id */ guint gvir_stream_add_watch(GVirStream *stream, GVirStreamIOCondition cond, GVirStreamIOFunc func, gpointer opaque) { return gvir_stream_add_watch_full(stream, G_PRIORITY_DEFAULT, cond, func, opaque, NULL); } /** * gvir_stream_add_watch_full: (rename-to gvir_stream_add_watch) * @stream: the stream * @priority: the priority of the #GVirStream source * @cond: the conditions to watch for (bitfield of #GVirStreamIOCondition) * @func: (closure opaque): the function to call when the condition is satisfied * @opaque: (closure): user data to pass to @func * @notify: the function to call when the source is removed * * Adds a watch for @stream to the mainloop * * Returns: the event source id */ guint gvir_stream_add_watch_full(GVirStream *stream, gint priority, GVirStreamIOCondition cond, GVirStreamIOFunc func, gpointer opaque, GDestroyNotify notify) { g_return_val_if_fail(GVIR_IS_STREAM(stream), 0); GVirStreamPrivate *priv = stream->priv; GVirStreamSource *source = (GVirStreamSource*)g_source_new(&gvir_stream_source_funcs, sizeof(GVirStreamSource)); guint ret; source->stream = g_object_ref(stream); source->cond = cond; if (priority != G_PRIORITY_DEFAULT) g_source_set_priority((GSource*)source, priority); priv->sources = g_list_append(priv->sources, source); gvir_stream_update_events(source->stream); g_source_set_callback((GSource*)source, (GSourceFunc)func, opaque, notify); ret = g_source_attach((GSource*)source, g_main_context_default()); g_source_unref((GSource*)source); return ret; }