/*
* Copyright (c) 2008-2012 Zmanda, Inc. All Rights Reserved.
* Copyright (c) 2013-2016 Carbonite, Inc. All Rights Reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* Contact information: Carbonite Inc., 756 N Pastoria Ave
* Sunnyvale, CA 94085, or: http://www.zmanda.com
*/
/* An S3 device uses Amazon's S3 service (http://www.amazon.com/s3) to store
* data. It stores data in keys named with a user-specified prefix, inside a
* user-specified bucket. Data is stored in the form of numbered (large)
* blocks.
*/
#include "amanda.h"
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <dirent.h>
#include <regex.h>
#include <time.h>
#include "amutil.h"
#include "conffile.h"
#include "device.h"
#include "s3-device.h"
#include "s3.h"
#include <curl/curl.h>
#ifdef HAVE_OPENSSL_HMAC_H
# include <openssl/hmac.h>
#else
# ifdef HAVE_CRYPTO_HMAC_H
# include <crypto/hmac.h>
# else
# ifdef HAVE_HMAC_H
# include <hmac.h>
# endif
# endif
#endif
/*
* Constants and static data
*/
#define S3_DEVICE_NAME "s3"
/* Maximum key length as specified in the S3 documentation
* (*excluding* null terminator) */
#define S3_MAX_KEY_LENGTH 1024
/* Note: for compatibility, min can only be decreased and max increased */
#define S3_DEVICE_MIN_BLOCK_SIZE 1024
#define S3_DEVICE_MAX_BLOCK_SIZE (3*1024*1024*1024ULL)
#define S3_DEVICE_DEFAULT_BLOCK_SIZE (10*1024*1024)
#define EOM_EARLY_WARNING_ZONE_BLOCKS 4
/* This goes in lieu of file number for metadata. */
#define SPECIAL_INFIX "special-"
/* pointer to the class of our parent */
static DeviceClass *parent_class = NULL;
/*
* device-specific properties
*/
/* Authentication information for Amazon S3. Both of these are strings. */
DevicePropertyBase device_property_s3_access_key;
DevicePropertyBase device_property_s3_secret_key;
static DevicePropertyBase device_property_s3_session_token;
//#define PROPERTY_S3_SECRET_KEY (device_property_s3_secret_key.ID)
//#define PROPERTY_S3_ACCESS_KEY (device_property_s3_access_key.ID)
#define PROPERTY_S3_SESSION_TOKEN (device_property_s3_session_token.ID)
/* Authentication information for Openstack Swift. Both of these are strings. */
static DevicePropertyBase device_property_swift_account_id;
static DevicePropertyBase device_property_swift_access_key;
#define PROPERTY_SWIFT_ACCOUNT_ID (device_property_swift_account_id.ID)
#define PROPERTY_SWIFT_ACCESS_KEY (device_property_swift_access_key.ID)
/* Generic authentication credentials (username, password, tenant, project
* and domain). All of these are strings. */
static DevicePropertyBase device_property_username;
static DevicePropertyBase device_property_password;
static DevicePropertyBase device_property_tenant_id;
static DevicePropertyBase device_property_tenant_name;
static DevicePropertyBase device_property_project_name;
static DevicePropertyBase device_property_domain_name;
#define PROPERTY_USERNAME (device_property_username.ID)
#define PROPERTY_PASSWORD (device_property_password.ID)
#define PROPERTY_TENANT_ID (device_property_tenant_id.ID)
#define PROPERTY_TENANT_NAME (device_property_tenant_name.ID)
#define PROPERTY_PROJECT_NAME (device_property_project_name.ID)
#define PROPERTY_DOMAIN_NAME (device_property_domain_name.ID)
/* Host and path */
static DevicePropertyBase device_property_s3_host;
static DevicePropertyBase device_property_s3_service_path;
#define PROPERTY_S3_HOST (device_property_s3_host.ID)
#define PROPERTY_S3_SERVICE_PATH (device_property_s3_service_path.ID)
/* User token for authenticating S3 requests with DevPay. */
static DevicePropertyBase device_property_s3_user_token;
#define PROPERTY_S3_USER_TOKEN (device_property_s3_user_token.ID)
/* Location constraint for new buckets created on Amazon S3. */
static DevicePropertyBase device_property_s3_bucket_location;
#define PROPERTY_S3_BUCKET_LOCATION (device_property_s3_bucket_location.ID)
/* Storage class */
static DevicePropertyBase device_property_s3_storage_class;
#define PROPERTY_S3_STORAGE_CLASS (device_property_s3_storage_class.ID)
/* Server side encryption */
static DevicePropertyBase device_property_s3_server_side_encryption;
#define PROPERTY_S3_SERVER_SIDE_ENCRYPTION (device_property_s3_server_side_encryption.ID)
/* Which storage API to use. */
static DevicePropertyBase device_property_storage_api;
#define PROPERTY_STORAGE_API (device_property_storage_api.ID)
/* Whether to use openstack protocol. */
/* DEPRECATED */
static DevicePropertyBase device_property_openstack_swift_api;
#define PROPERTY_OPENSTACK_SWIFT_API (device_property_openstack_swift_api.ID)
/* Whether to use chunked transfer-encoding */
static DevicePropertyBase device_property_chunked;
#define PROPERTY_CHUNKED (device_property_chunked.ID)
/* Whether to use SSL with Amazon S3. */
static DevicePropertyBase device_property_s3_ssl;
#define PROPERTY_S3_SSL (device_property_s3_ssl.ID)
/* Whether to use subdomain */
static DevicePropertyBase device_property_s3_subdomain;
#define PROPERTY_S3_SUBDOMAIN (device_property_s3_subdomain.ID)
/* Whether to use s3 multi-part upload */
static DevicePropertyBase device_property_s3_multi_part_upload;
#define PROPERTY_S3_MULTI_PART_UPLOAD (device_property_s3_multi_part_upload.ID)
/* Whether the S3 server supports the multi-delete functionality */
static DevicePropertyBase device_property_s3_multi_delete;
#define PROPERTY_S3_MULTI_DELETE (device_property_s3_multi_delete.ID)
/* The client_id for OAUTH2 */
static DevicePropertyBase device_property_client_id;
#define PROPERTY_CLIENT_ID (device_property_client_id.ID)
/* The client_secret for OAUTH2 */
static DevicePropertyBase device_property_client_secret;
#define PROPERTY_CLIENT_SECRET (device_property_client_secret.ID)
/* The refresh token for OAUTH2 */
static DevicePropertyBase device_property_refresh_token;
#define PROPERTY_REFRESH_TOKEN (device_property_refresh_token.ID)
/* The PROJECT ID */
static DevicePropertyBase device_property_project_id;
#define PROPERTY_PROJECT_ID (device_property_project_id.ID)
/* Bucket creation setting */
static DevicePropertyBase device_property_create_bucket;
#define PROPERTY_CREATE_BUCKET (device_property_create_bucket.ID)
/* glacier */
static DevicePropertyBase device_property_read_from_glacier;
#define PROPERTY_READ_FROM_GLACIER (device_property_read_from_glacier.ID)
static DevicePropertyBase device_property_transition_to_glacier;
#define PROPERTY_TRANSITION_TO_GLACIER (device_property_transition_to_glacier.ID)
static DevicePropertyBase device_property_timeout;
#define PROPERTY_TIMEOUT (device_property_timeout.ID)
/* CAStor replication values for objects and buckets */
static DevicePropertyBase device_property_s3_reps;
#define PROPERTY_S3_REPS (device_property_s3_reps.ID)
#define S3_DEVICE_REPS_DEFAULT "2"
static DevicePropertyBase device_property_s3_reps_bucket;
#define PROPERTY_S3_REPS_BUCKET (device_property_s3_reps_bucket.ID)
#define S3_DEVICE_REPS_BUCKET_DEFAULT "4"
/*
* prototypes
*/
void s3_device_register(void);
/*
* utility functions */
/* Given a file number, return the key prefix shared by that file's objects.
*
* @param self: the S3Device object
* @param file: the file number
* @returns: a newly allocated string containing the prefix for the file.
*/
static char *
file_to_prefix(S3Device *self,
int file);
/* Given file and block numbers, return an S3 key.
*
* @param self: the S3Device object
* @param file: the file number
* @param block: the block within that file
* @returns: a newly allocated string containing an S3 key.
*/
static char *
file_and_block_to_key(S3Device *self,
int file,
guint64 block);
/* Given file, return an S3 key.
*
* @param self: the S3Device object
* @param file: the file number
* @returns: a newly allocated string containing an S3 key.
*/
static char *
file_to_multi_part_key(S3Device *self,
int file);
/* Given the name of a special file (such as 'tapestart'), generate
* the S3 key to use for that file.
*
* @param self: the S3Device object
* @param special_name: name of the special file
* @param file: a file number to include; omitted if -1
* @returns: a newly allocated string containing an S3 key.
*/
static char *
special_file_to_key(S3Device *self,
char *special_name,
int file);
/* Write an amanda header file to S3.
*
* @param self: the S3Device object
* @param label: the volume label
* @param timestamp: the volume timestamp
*/
static gboolean
write_amanda_header(S3Device *self,
char *label,
char * timestamp);
/* "Fast forward" this device to the end by looking up the largest file number
* present and making it the device's current file number.
*
* @param self: the S3Device object
*/
static gboolean
seek_to_end(S3Device *self);
/* Find the number of the last file that contains any data (even just a header).
*
* @param self: the S3Device object
* @returns: the last file, or -1 in event of an error
*/
static int
find_last_file(S3Device *self);
/* Delete all blocks in the given file, including the filestart block
*
* @param self: the S3Device object
* @param file: the file to delete
*/
static gboolean
delete_file(S3Device *self,
int file);
/* Delete all files in the given device
*
* @param self: the S3Device object
*/
static gboolean
delete_all_files(S3Device *self);
/* Set up self->s3t as best as possible.
*
* The return value is TRUE iff self->s3t is useable.
*
* @param self: the S3Device object
* @returns: TRUE if the handle is set up
*/
static gboolean
setup_handle(S3Device * self);
static void
s3_wait_thread_delete(S3Device *self);
static gboolean
catalog_open(S3Device *self);
static gboolean
catalog_reset(S3Device *self,
char *header,
char *label);
static gboolean
catalog_remove(S3Device *self);
static gboolean
catalog_close(S3Device *self);
/*
* class mechanics */
static void
s3_device_init(S3Device * o);
static void
s3_device_class_init(S3DeviceClass * c);
static void
s3_device_base_init( S3DeviceClass *c);
static void
s3_device_finalize(GObject * o);
static Device*
s3_device_factory(char * device_name, char * device_type, char * device_node);
/*
* Property{Get,Set}Fns */
static gboolean s3_device_set_access_key_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_secret_key_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_session_token_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_swift_account_id_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_swift_access_key_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_username(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_password(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_tenant_id(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_tenant_name(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_project_name(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_domain_name(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_user_token_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_bucket_location_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_storage_class_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_server_side_encryption_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_proxy_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_ca_info_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_verbose_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_create_bucket_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_read_from_glacier_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_transition_to_glacier_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_storage_api(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_openstack_swift_api_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_s3_multi_delete_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_chunked_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_ssl_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_reuse_connection_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_timeout_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_max_send_speed_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_max_recv_speed_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_nb_threads_backup(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_nb_threads_recovery(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_s3_multi_part_upload(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_max_volume_usage_fn(Device *p_self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean property_set_leom_fn(Device *p_self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_enforce_max_volume_usage_fn(Device *p_self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_use_subdomain_fn(Device *p_self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_host_fn(Device *p_self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_service_path_fn(Device *p_self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_client_id_fn(Device *p_self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_client_secret_fn(Device *p_self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_refresh_token_fn(Device *p_self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_project_id_fn(Device *p_self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_reps_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static gboolean s3_device_set_reps_bucket_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
static void s3_thread_read_block(gpointer thread_data,
gpointer data);
static void s3_thread_write_block(gpointer thread_data,
gpointer data);
static gboolean make_bucket(Device * pself);
/* Wait until all threads are done */
static void reset_thread(S3Device *self);
/*
* virtual functions */
static void
s3_device_open_device(Device *pself, char *device_name,
char * device_type, char * device_node);
static gboolean
s3_device_create(Device *pself);
static DeviceStatusFlags s3_device_read_label(Device * self);
static gboolean
s3_device_start(Device * self,
DeviceAccessMode mode,
char * label,
char * timestamp);
static gboolean
s3_device_finish(Device * self);
static guint64
s3_device_get_bytes_read(Device * self);
static guint64
s3_device_get_bytes_written(Device * self);
static gboolean
s3_device_start_file(Device * self,
dumpfile_t * jobInfo);
static DeviceWriteResult
s3_device_write_block(Device * self,
guint size,
gpointer data);
static gboolean
s3_device_finish_file(Device * self);
static gboolean
s3_device_init_seek_file(Device *pself,
guint file);
static dumpfile_t*
s3_device_seek_file(Device *pself,
guint file);
static gboolean
s3_device_seek_block(Device *pself,
guint64 block);
static int
s3_device_read_block(Device *pself,
gpointer data,
int *size_req,
int max_block);
static gboolean
s3_device_recycle_file(Device *pself,
guint file);
static gboolean
s3_device_erase(Device *pself);
static gboolean
s3_device_set_reuse(Device *pself);
static gboolean
s3_device_set_no_reuse(Device *pself,
char *label,
char *datestamp);
static gboolean
check_at_leom(S3Device *self,
guint64 size);
static gboolean
check_at_peom(S3Device *self,
guint64 size);
/*
* Private functions
*/
/* Build the key prefix common to every object of one file.
 *
 * @param self: the S3Device object (supplies the configured prefix)
 * @param file: the file number, formatted as "f" plus 8 hex digits
 * @returns: a newly allocated string holding the prefix
 */
static char *
file_to_prefix(S3Device *self,
               int file)
{
    char *result;

    result = g_strdup_printf("%sf%08x", self->prefix, file);

    /* keys may not exceed the documented S3 maximum length */
    g_assert(strlen(result) <= S3_MAX_KEY_LENGTH);

    return result;
}
/* Build the S3 key of one data block.
 *
 * @param self: the S3Device object (supplies the configured prefix)
 * @param file: the file number (8 hex digits in the key)
 * @param block: the block number within the file (16 hex digits in the key)
 * @returns: a newly allocated string holding the key
 */
static char *
file_and_block_to_key(S3Device *self,
                      int file,
                      guint64 block)
{
    unsigned long long block_no = (unsigned long long)block;
    char *block_key;

    block_key = g_strdup_printf("%sf%08x-b%016llx.data",
                                self->prefix, file, block_no);

    /* keys may not exceed the documented S3 maximum length */
    g_assert(strlen(block_key) <= S3_MAX_KEY_LENGTH);

    return block_key;
}
/* Build the S3 key used when a whole file is stored as one multi-part
 * object.
 *
 * @param self: the S3Device object (supplies the configured prefix)
 * @param file: the file number (8 hex digits in the key)
 * @returns: a newly allocated string holding the key
 */
static char *
file_to_multi_part_key(S3Device *self,
                       int file)
{
    char *mp_key;

    mp_key = g_strdup_printf("%sf%08x-mp.data", self->prefix, file);

    /* keys may not exceed the documented S3 maximum length */
    g_assert(strlen(mp_key) <= S3_MAX_KEY_LENGTH);

    return mp_key;
}
/* Build the S3 key of a special (metadata) object such as 'tapestart'.
 *
 * @param self: the S3Device object (supplies the configured prefix)
 * @param special_name: name of the special file
 * @param file: file number to embed in the key; -1 selects the
 *   volume-level form that uses SPECIAL_INFIX instead of a file number
 * @returns: a newly allocated string holding the key
 */
static char *
special_file_to_key(S3Device *self,
                    char *special_name,
                    int file)
{
    char *special_key;

    if (file != -1) {
        /* per-file special object: "<prefix>f<file>-<name>" */
        special_key = g_strdup_printf("%sf%08x-%s", self->prefix, file, special_name);
    } else {
        /* volume-level special object: "<prefix>special-<name>" */
        special_key = g_strdup_printf("%s" SPECIAL_INFIX "%s", self->prefix, special_name);
    }

    return special_key;
}
/* Build the volume's tapestart header and upload it under the "tapestart"
 * special key.
 *
 * On a successful upload the device's volume_header is replaced by the new
 * header and volume_bytes grows by the header size.  The device's
 * header_block_size is updated whenever an upload was attempted, whether or
 * not it succeeded.
 *
 * @param self: the S3Device object
 * @param label: the volume label
 * @param timestamp: the volume timestamp
 * @returns: TRUE if the header was uploaded; FALSE (with the device error
 *   set) if the header could not be built, the volume is at physical
 *   end-of-medium, or the upload failed
 */
static gboolean
write_amanda_header(S3Device *self,
                    char *label,
                    char * timestamp)
{
    Device *dev = DEVICE(self);
    CurlBuffer hdr_buf = {NULL, 0, 0, 0, TRUE, NULL, NULL};
    dumpfile_t *tapestart;
    size_t hdr_len = 0; /* no minimum size */
    char *hdr_key;
    gboolean uploaded;

    /* build the header */
    tapestart = make_tapestart_header(dev, label, timestamp);
    hdr_buf.buffer = device_build_amanda_header(dev, tapestart, &hdr_len);
    if (!hdr_buf.buffer) {
        device_set_error(dev,
            g_strdup(_("Amanda tapestart header won't fit in a single block!")),
            DEVICE_STATUS_DEVICE_ERROR);
        goto fail;
    }

    /* logical EOM only raises the flag; physical EOM aborts the write */
    if (check_at_leom(self, hdr_len)) {
        dev->is_eom = TRUE;
    }
    if (check_at_peom(self, hdr_len)) {
        dev->is_eom = TRUE;
        device_set_error(dev,
            g_strdup(_("No space left on device")),
            DEVICE_STATUS_DEVICE_ERROR);
        goto fail;
    }

    /* reset the catalog, then push the header to the tapestart key */
    catalog_reset(self, hdr_buf.buffer, label);
    hdr_key = special_file_to_key(self, "tapestart", -1);

    g_assert(hdr_len < G_MAXUINT); /* for cast to guint */
    hdr_buf.buffer_len = (guint)hdr_len;

    uploaded = s3_upload(self->s3t[0].s3, self->bucket, hdr_key, FALSE,
                         S3_BUFFER_READ_FUNCS,
                         &hdr_buf, NULL, NULL);
    g_free(hdr_buf.buffer);
    g_free(hdr_key);

    if (uploaded) {
        /* the device takes ownership of the freshly built header */
        dumpfile_free(dev->volume_header);
        dev->volume_header = tapestart;
        self->volume_bytes += hdr_len;
    } else {
        device_set_error(dev,
            g_strdup_printf(_("While writing amanda header: %s"), s3_strerror(self->s3t[0].s3)),
            DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
        dumpfile_free(tapestart);
    }

    dev->header_block_size = hdr_len;
    return uploaded;

fail:
    /* nothing was uploaded: drop the header description and its buffer */
    dumpfile_free(tapestart);
    g_free(hdr_buf.buffer);
    return FALSE;
}
/* Position the device at the end of the volume by making the highest file
 * number found in the bucket the device's current file.
 *
 * @param self: the S3Device object
 * @returns: TRUE on success, FALSE if the last file could not be determined
 */
static gboolean
seek_to_end(S3Device *self) {
    Device *dev = DEVICE(self);
    int newest = find_last_file(self);

    if (newest >= 0) {
        dev->file = newest;
        return TRUE;
    }

    /* find_last_file() reported an error */
    return FALSE;
}
/* Convert an object name into a file number, assuming the given prefix
 * length.
 *
 * @param prefix_len: number of leading characters of key that form the
 *   device prefix and are skipped
 * @param key: the object name; NULL is tolerated and treated as invalid
 * @returns: the file number; 0 if the name is a "special" key; -1 if the
 *   name is NULL, not longer than the prefix, not of the form "f%08x-...",
 *   or encodes a number that does not fit in an int
 */
static int key_to_file(guint prefix_len, const char * key) {
    unsigned long parsed;
    int i;

    /* defensive: strlen() on a missing name would crash */
    if (key == NULL)
        return -1;

    /* skip the prefix */
    if (strlen(key) <= prefix_len)
        return -1;
    key += prefix_len;

    if (g_str_has_prefix(key, SPECIAL_INFIX)) {
        return 0;
    }

    /* check that key starts with 'f' */
    if (key[0] != 'f')
        return -1;
    key++;

    /* check that key is of the form "%08x-"; the scan stops at the first
     * non-hex character, so it never reads past the string terminator */
    for (i = 0; i < 8; i++) {
        if (!(key[i] >= '0' && key[i] <= '9') &&
            !(key[i] >= 'a' && key[i] <= 'f') &&
            !(key[i] >= 'A' && key[i] <= 'F')) break;
    }
    if (i < 8 || key[i] != '-')
        return -1;

    /* convert the file number, rejecting values an int cannot represent
     * instead of letting them wrap to an arbitrary (negative) number */
    errno = 0;
    parsed = strtoul(key, NULL, 16);
    if (errno != 0 || parsed > (unsigned long)G_MAXINT) {
        g_warning(_("unparseable file number '%s'"), key);
        return -1;
    }

    return (int)parsed;
}
/* Abort every unfinished multi-part upload found under the device prefix.
 *
 * Does nothing (and reports success) when multi-part upload is not in use.
 * The outcome of the individual abort requests is not checked.
 *
 * @param self: the S3Device object
 * @returns: TRUE on success, FALSE (with the device error set) if the
 *   pending uploads could not be listed
 */
static int
abort_partial_upload(S3Device *self)
{
    Device *dev = DEVICE(self);
    GSList *pending;
    gboolean listed;

    if (!self->use_s3_multi_part_upload)
        return TRUE;

    listed = s3_list_keys(self->s3t[0].s3, self->bucket, "uploads", self->prefix, NULL, &pending, NULL);
    if (!listed) {
        device_set_error(dev,
            g_strdup_printf(_("While listing partial upload: %s"), s3_strerror(self->s3t[0].s3)),
            DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
        return FALSE;
    }

    /* pop each entry off the list, abort it, then release it */
    while (pending) {
        s3_object *upload = (s3_object *)pending->data;

        pending = g_slist_remove(pending, upload);
        g_debug("partial upload: %s : %s", upload->key, upload->uploadId);
        s3_abort_multi_part_upload(self->s3t[0].s3, self->bucket, upload->key, upload->uploadId);
        free_s3_object(upload);
    }

    return TRUE;
}
/* Find the number of the last file that contains any data (even just a header).
 *
 * Lists the keys under the device prefix, grouped at the first "-", and
 * returns the largest file number any of them encodes.  Entries that do not
 * name a file (key_to_file() returns -1 or 0) never raise the result.
 *
 * @param self: the S3Device object
 * @returns: the last file number (0 if no file was found), or -1 with the
 *   device error set if the listing failed
 */
static int
find_last_file(S3Device *self) {
    gboolean result;
    GSList *keys = NULL;
    unsigned int prefix_len = strlen(self->prefix);
    int last_file = 0;
    Device *d_self = DEVICE(self);

    /* list all keys matching C{PREFIX*-*}, stripping the C{-*} */
    result = s3_list_keys(self->s3t[0].s3, self->bucket, NULL, self->prefix, "-", &keys, NULL);
    if (!result) {
        device_set_error(d_self,
            g_strdup_printf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
            DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
        return -1;
    }

    while (keys) {
        s3_object *object = keys->data;
        int file;

        keys = g_slist_remove(keys, object);
        file = key_to_file(prefix_len, object->prefix);

        /* and if it's the last, keep it */
        if (file > last_file)
            last_file = file;

        /* g_slist_remove() only drops the list node; release the entry
         * itself so the listing is not leaked */
        free_s3_object(object);
    }

    return last_file;
}
/* Find the number of the file following the requested one, if any.
 *
 * Lists the keys under the device prefix, grouped at the first "-", and
 * picks the smallest file number that is strictly greater than last_file.
 *
 * @param self: the S3Device object
 * @param last_file: the file number to search after
 * @returns: the next file number; 0 if there is no such file; -1 if the
 *   listing failed (device error set) or if no next file was found and at
 *   least one listed name could not be parsed
 */
static int
find_next_file(S3Device *self, int last_file) {
    gboolean result;
    GSList *objects = NULL;
    unsigned int prefix_len = strlen(self->prefix);
    int next_file = 0;
    Device *d_self = DEVICE(self);

    /* list all keys matching C{PREFIX*-*}, stripping the C{-*} */
    result = s3_list_keys(self->s3t[0].s3, self->bucket, NULL, self->prefix, "-",
                          &objects, NULL);
    if (!result) {
        device_set_error(d_self,
            g_strdup_printf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
            DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
        return -1;
    }

    while (objects) {
        s3_object *object = objects->data;
        int file;

        objects = g_slist_remove(objects, object);
        file = key_to_file(prefix_len, object->prefix);

        /* g_slist_remove() only drops the list node; release the entry
         * itself so the listing is not leaked */
        free_s3_object(object);

        if (file < 0) {
            /* Set this in case we don't find a next file; this is not a
             * hard error, so if we can find a next file we'll return that
             * instead.  Never overwrite a candidate already found. */
            if (next_file == 0)
                next_file = -1;
        } else if (file > last_file &&
                   (next_file <= 0 || file < next_file)) {
            /* first candidate, or a closer one than the current candidate
             * (next_file <= 0 means "nothing found yet") */
            next_file = file;
        }
    }

    return next_file;
}
/* Delete all objects belonging to one file, or to every file.
*
* Lists the matching keys, appends them to the shared self->objects queue and
* hands every idle thread slot to the delete thread pool, then waits for the
* workers to finish.
*
* @param self: the S3Device object
* @param file: the file number to delete; -1 selects every key that starts
*   with "<prefix>f", i.e. all files
* @returns: TRUE if there was nothing to delete, the bucket does not exist,
*   or the deletion was run; FALSE (with the device error set) if the listing
*   failed or an idle thread slot carried a pending error
*/
static gboolean
delete_file(S3Device *self,
int file)
{
int thread = -1;
gboolean result;
GSList *objects;
guint64 total_size = 0;
Device *d_self = DEVICE(self);
char *my_prefix;
/* choose the listing prefix: every file, or exactly one file */
if (file == -1) {
my_prefix = g_strdup_printf("%sf", self->prefix);
} else {
my_prefix = g_strdup_printf("%sf%08x-", self->prefix, file);
}
result = s3_list_keys(self->s3t[0].s3, self->bucket, NULL, my_prefix, NULL,
&objects, &total_size);
g_free(my_prefix);
if (!result) {
guint response_code;
s3_error_code_t s3_error_code;
CURLcode curl_code;
s3_error(self->s3t[0].s3, NULL, &response_code,
&s3_error_code, NULL, &curl_code, NULL);
/* a missing bucket holds nothing to delete: treat it as success */
if (response_code == 404 && s3_error_code == S3_ERROR_NoSuchBucket) {
return TRUE;
}
device_set_error(d_self,
g_strdup_printf(_("While listing S3 keys: %s"),
s3_strerror(self->s3t[0].s3)),
DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
return FALSE;
}
/* the work queue and the per-thread slots are guarded by thread_idle_mutex */
g_mutex_lock(self->thread_idle_mutex);
/* queue the new objects behind any that are still pending */
if (!self->objects) {
self->objects = objects;
} else {
self->objects = g_slist_concat(self->objects, objects);
}
/* nothing listed and nothing pending: done */
if (!self->objects) {
g_mutex_unlock(self->thread_idle_mutex);
return TRUE;
}
// start the threads
for (thread = 0; thread < self->nb_threads; thread++) {
if (self->s3t[thread].idle == 1) {
/* Check if the thread is in error */
if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
/* report the slot's pending error, clear it, and give up after
* waiting for the delete workers */
device_set_error(d_self,
(char *)self->s3t[thread].errmsg,
self->s3t[thread].errflags);
self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
self->s3t[thread].errmsg = NULL;
g_mutex_unlock(self->thread_idle_mutex);
s3_wait_thread_delete(self);
return FALSE;
}
/* mark the slot busy and hand it to the delete pool */
self->s3t[thread].idle = 0;
self->s3t[thread].done = 0;
g_thread_pool_push(self->thread_pool_delete, &self->s3t[thread],
NULL);
}
}
/* wait for a first wake-up on the idle condition; the full "all threads idle"
* check is done by s3_wait_thread_delete() below */
g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
g_mutex_unlock(self->thread_idle_mutex);
/* NOTE(review): this stores the total size reported for the listed (i.e.
* deleted) keys as the volume size rather than subtracting it or zeroing
* it -- confirm this is intended */
self->volume_bytes = total_size;
s3_wait_thread_delete(self);
return TRUE;
}
/* Thread-pool worker that drains the shared self->objects queue, deleting
 * the queued objects from the bucket.
 *
 * With multi-delete enabled, up to 1000 objects are taken per request.  If
 * such a request does not succeed, multi-delete is switched off for the
 * device and the batch is put back on the queue.  Otherwise objects are
 * deleted one at a time; a failure is recorded in the thread slot's
 * errflags/errmsg and ends this worker's loop.
 *
 * The queue, the slot flags and the debug counter are only touched while
 * thread_idle_mutex is held; the mutex is released around each network
 * request.  On exit the slot is marked idle/done and the idle condition is
 * broadcast.
 *
 * @param thread_data: the S3_by_thread slot this worker runs on
 * @param data: the Device (an S3Device) that owns the queue
 */
static void
s3_thread_delete_block(
    gpointer thread_data,
    gpointer data)
{
    static int count = 0;
    S3_by_thread *s3t = (S3_by_thread *)thread_data;
    Device *pself = (Device *)data;
    S3Device *self = S3_DEVICE(pself);
    int result = 1;
    char *filename;
    s3_object *object;

    g_mutex_lock(self->thread_idle_mutex);
    while (result && self->objects) {
        if (self->use_s3_multi_delete) {
            GSList *lobjects = NULL;
            int n = 0;

            /* take a batch of at most 1000 objects off the shared queue */
            while (self->objects && n<1000) {
                object = self->objects->data;
                self->objects = g_slist_remove(self->objects, object);
                lobjects = g_slist_prepend(lobjects, object);
                n++;
            }
            g_mutex_unlock(self->thread_idle_mutex);
            result = s3_multi_delete(s3t->s3, (const char *)self->bucket,
                                     lobjects);
            if (result != 1) {
                if (result == 2) {
                    g_debug("Deleting multiple keys not implemented");
                } else { /* result == 0 */
                    g_debug("Deleteing multiple keys failed: %s",
                            s3_strerror(s3t->s3));
                }
                /* NOTE(review): with result == 0 the outer loop ends here,
                 * leaving the re-queued objects for other workers or a
                 * later call -- confirm this is intended */
                g_mutex_lock(self->thread_idle_mutex);
                self->use_s3_multi_delete = 0;
                /* re-add all filenames */
                while (lobjects) {
                    object = lobjects->data;
                    lobjects = g_slist_remove(lobjects, object);
                    self->objects = g_slist_prepend(self->objects, object);
                }
                g_mutex_unlock(self->thread_idle_mutex);
            } else {
                slist_free_full(lobjects, free_s3_object);
            }
        } else {
            object = self->objects->data;
            self->objects = g_slist_remove(self->objects, object);
            filename = object->key;
            /* log only every 1000th key to keep the debug output small */
            count++;
            if (count >= 1000) {
                g_debug("Deleting %s ...", filename);
                count = 0;
            }
            g_mutex_unlock(self->thread_idle_mutex);
            result = s3_delete(s3t->s3, (const char *)self->bucket,
                               (const char *)filename);
            if (!result) {
                s3t->errflags = DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR;
                s3t->errmsg = g_strdup_printf(_("While deleting key '%s': %s"),
                                              filename, s3_strerror(s3t->s3));
            }
            /* release the whole entry (filename points into it), not just
             * the key string, so the object is not leaked */
            free_s3_object(object);
        }
        g_mutex_lock(self->thread_idle_mutex);
    }
    s3t->idle = 1;
    s3t->done = 1;
    g_cond_broadcast(self->thread_idle_cond);
    g_mutex_unlock(self->thread_idle_mutex);
}
/* Block until every thread slot of the device is idle.
 *
 * On each pass over the slots, any error a slot carries is reported through
 * device_set_error() and then cleared from the slot.  The scan and the wait
 * on thread_idle_cond both run with thread_idle_mutex held.
 *
 * @param self: the S3Device object
 */
static void
s3_wait_thread_delete(S3Device *self)
{
    Device *dev = (Device *)self;
    int i;

    g_mutex_lock(self->thread_idle_mutex);
    for (;;) {
        int n_idle = 0;

        for (i = 0; i < self->nb_threads; i++) {
            if (self->s3t[i].idle == 1) {
                n_idle++;
            }
            /* Check if the thread is in error */
            if (self->s3t[i].errflags != DEVICE_STATUS_SUCCESS) {
                device_set_error(dev, (char *)self->s3t[i].errmsg,
                                 self->s3t[i].errflags);
                self->s3t[i].errflags = DEVICE_STATUS_SUCCESS;
                self->s3t[i].errmsg = NULL;
            }
        }

        /* every slot idle: nothing left to wait for */
        if (n_idle == self->nb_threads)
            break;

        g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
    }
    g_mutex_unlock(self->thread_idle_mutex);
}
/* Delete every file on the device.
 *
 * @param self: the S3Device object
 * @returns: the result of delete_file() called with file number -1
 */
static gboolean
delete_all_files(S3Device *self)
{
    /* -1 asks delete_file() for all files rather than a single one */
    gboolean deleted = delete_file(self, -1);

    return deleted;
}
/*
* Class mechanics
*/
void
s3_device_register(void)
{
static const char * device_prefix_list[] = { S3_DEVICE_NAME, NULL };
g_assert(s3_init());
/* set up our properties */
device_property_fill_and_register(&device_property_s3_secret_key,
G_TYPE_STRING, "s3_secret_key",
"Secret access key to authenticate with Amazon S3");
device_property_fill_and_register(&device_property_s3_access_key,
G_TYPE_STRING, "s3_access_key",
"Access key ID to authenticate with Amazon S3");
device_property_fill_and_register(&device_property_s3_session_token,
G_TYPE_STRING, "s3_session_token",
"Session token to authenticate with Amazon S3");
device_property_fill_and_register(&device_property_swift_account_id,
G_TYPE_STRING, "swift_account_id",
"Account ID to authenticate with openstack swift");
device_property_fill_and_register(&device_property_swift_access_key,
G_TYPE_STRING, "swift_access_key",
"Access key to authenticate with openstack swift");
device_property_fill_and_register(&device_property_username,
G_TYPE_STRING, "username",
"Username to authenticate with");
device_property_fill_and_register(&device_property_password,
G_TYPE_STRING, "password",
"password to authenticate with");
device_property_fill_and_register(&device_property_tenant_id,
G_TYPE_STRING, "tenant_id",
"tenant_id to authenticate with");
device_property_fill_and_register(&device_property_tenant_name,
G_TYPE_STRING, "tenant_name",
"tenant_name to authenticate with");
device_property_fill_and_register(&device_property_project_name,
G_TYPE_STRING, "project_name",
"project_name to authenticate with");
device_property_fill_and_register(&device_property_domain_name,
G_TYPE_STRING, "domain_name",
"domain_name to authenticate with");
device_property_fill_and_register(&device_property_s3_host,
G_TYPE_STRING, "s3_host",
"hostname:port of the server");
device_property_fill_and_register(&device_property_s3_service_path,
G_TYPE_STRING, "s3_service_path",
"path to add in the url");
device_property_fill_and_register(&device_property_s3_user_token,
G_TYPE_STRING, "s3_user_token",
"User token for authentication Amazon devpay requests");
device_property_fill_and_register(&device_property_s3_bucket_location,
G_TYPE_STRING, "s3_bucket_location",
"Location constraint for buckets on Amazon S3");
device_property_fill_and_register(&device_property_s3_storage_class,
G_TYPE_STRING, "s3_storage_class",
"Storage class as specified by Amazon (STANDARD or REDUCED_REDUNDANCY)");
device_property_fill_and_register(&device_property_s3_server_side_encryption,
G_TYPE_STRING, "s3_server_side_encryption",
"Serve side encryption as specified by Amazon (AES256)");
device_property_fill_and_register(&device_property_storage_api,
G_TYPE_STRING, "storage_api",
"Which cloud API to use.");
device_property_fill_and_register(&device_property_openstack_swift_api,
G_TYPE_STRING, "openstack_swift_api",
"Whether to use openstack protocol");
device_property_fill_and_register(&device_property_client_id,
G_TYPE_STRING, "client_id",
"client_id for use with oauth2");
device_property_fill_and_register(&device_property_client_secret,
G_TYPE_STRING, "client_secret",
"client_secret for use with oauth2");
device_property_fill_and_register(&device_property_refresh_token,
G_TYPE_STRING, "refresh_token",
"refresh_token for use with oauth2");
device_property_fill_and_register(&device_property_project_id,
G_TYPE_STRING, "project_id",
"project id for use with google");
device_property_fill_and_register(&device_property_chunked,
G_TYPE_BOOLEAN, "chunked",
"Whether to use chunked transfer-encoding");
device_property_fill_and_register(&device_property_s3_ssl,
G_TYPE_BOOLEAN, "s3_ssl",
"Whether to use SSL with Amazon S3");
device_property_fill_and_register(&device_property_create_bucket,
G_TYPE_BOOLEAN, "create_bucket",
"Whether to create/delete bucket");
device_property_fill_and_register(&device_property_read_from_glacier,
G_TYPE_BOOLEAN, "read_from_glacier",
"Whether to add code to read from glacier storage class");
device_property_fill_and_register(&device_property_transition_to_glacier,
G_TYPE_UINT64, "transition_to_glacier",
"The number of days to wait before migrating to glacier after set to no-reuse");
device_property_fill_and_register(&device_property_s3_subdomain,
G_TYPE_BOOLEAN, "s3_subdomain",
"Whether to use subdomain");
device_property_fill_and_register(&device_property_s3_multi_delete,
G_TYPE_BOOLEAN, "s3_multi_delete",
"Whether to use multi-delete");
device_property_fill_and_register(&device_property_s3_reps,
G_TYPE_STRING, "reps",
"Number of replicas for data objects in CAStor");
device_property_fill_and_register(&device_property_s3_reps_bucket,
G_TYPE_STRING, "reps_bucket",
"Number of replicas for automatically created buckets in CAStor");
device_property_fill_and_register(&device_property_s3_multi_part_upload,
G_TYPE_BOOLEAN, "s3_multi_part_upload",
"If multi part upload must be used");
device_property_fill_and_register(&device_property_timeout,
G_TYPE_UINT64, "timeout",
"The timeout for one tranfer");
/* register the device itself */
register_device(s3_device_factory, device_prefix_list);
}
/*
 * Return the GType for S3Device, registering it with the GObject type
 * system (as a static subtype of TYPE_DEVICE) the first time it is needed.
 */
GType
s3_device_get_type(void)
{
    static GType s3_type = 0;
    static const GTypeInfo s3_type_info = {
        sizeof (S3DeviceClass),
        (GBaseInitFunc) s3_device_base_init,
        (GBaseFinalizeFunc) NULL,
        (GClassInitFunc) s3_device_class_init,
        (GClassFinalizeFunc) NULL,
        NULL /* class_data */,
        sizeof (S3Device),
        0 /* n_preallocs */,
        (GInstanceInitFunc) s3_device_init,
        NULL
    };

    /* Common case: already registered. */
    if (G_LIKELY(s3_type != 0))
        return s3_type;

    s3_type = g_type_register_static(TYPE_DEVICE, "S3Device", &s3_type_info,
                                     (GTypeFlags)0);
    return s3_type;
}
/*
 * Instance initialiser (the GInstanceInitFunc named in s3_device_get_type).
 *
 * Resets the S3-specific fields to their defaults and publishes the fixed
 * capability properties of the device.  One GValue is reused for all of
 * them: initialise, set, publish, unset.
 */
static void
s3_device_init(S3Device * self)
{
    Device *dev = DEVICE(self);
    GValue prop;

    /* --- field defaults --- */
    self->s3_api = S3_API_UNKNOWN;
    self->volume_bytes = 0;
    self->volume_limit = 0;
    self->leom = TRUE;
    self->enforce_volume_limit = FALSE;
    self->use_subdomain = FALSE;
    self->nb_threads = 1;
    self->nb_threads_backup = 1;
    self->nb_threads_recovery = 1;
    self->use_s3_multi_part_upload = FALSE;
    self->thread_pool_delete = NULL;
    self->thread_pool_write = NULL;
    self->thread_pool_read = NULL;
    self->thread_idle_cond = NULL;
    self->thread_idle_mutex = NULL;
    self->use_s3_multi_delete = 1;
    self->set_s3_multi_delete = 0;
    self->reps = NULL;
    self->reps_bucket = NULL;
    self->transition_to_glacier = -1;

    /* --- property values ---
     * Note: Some aren't added until s3_device_open_device()
     */
    bzero(&prop, sizeof(prop));

    /* concurrency paradigm */
    g_value_init(&prop, CONCURRENCY_PARADIGM_TYPE);
    g_value_set_enum(&prop, CONCURRENCY_PARADIGM_SHARED_READ);
    device_set_simple_property(dev, PROPERTY_CONCURRENCY,
            &prop, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
    g_value_unset(&prop);

    /* streaming requirement */
    g_value_init(&prop, STREAMING_REQUIREMENT_TYPE);
    g_value_set_enum(&prop, STREAMING_REQUIREMENT_NONE);
    device_set_simple_property(dev, PROPERTY_STREAMING,
            &prop, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
    g_value_unset(&prop);

    /* appendable */
    g_value_init(&prop, G_TYPE_BOOLEAN);
    g_value_set_boolean(&prop, TRUE);
    device_set_simple_property(dev, PROPERTY_APPENDABLE,
            &prop, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
    g_value_unset(&prop);

    /* partial deletion */
    g_value_init(&prop, G_TYPE_BOOLEAN);
    g_value_set_boolean(&prop, TRUE);
    device_set_simple_property(dev, PROPERTY_PARTIAL_DELETION,
            &prop, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
    g_value_unset(&prop);

    /* full deletion */
    g_value_init(&prop, G_TYPE_BOOLEAN);
    g_value_set_boolean(&prop, TRUE);
    device_set_simple_property(dev, PROPERTY_FULL_DELETION,
            &prop, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
    g_value_unset(&prop);

    /* LEOM */
    g_value_init(&prop, G_TYPE_BOOLEAN);
    g_value_set_boolean(&prop, TRUE); /* well, there *is* no EOM on S3 .. */
    device_set_simple_property(dev, PROPERTY_LEOM,
            &prop, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
    g_value_unset(&prop);

    /* enforce max volume usage */
    g_value_init(&prop, G_TYPE_BOOLEAN);
    g_value_set_boolean(&prop, FALSE);
    device_set_simple_property(dev, PROPERTY_ENFORCE_MAX_VOLUME_USAGE,
            &prop, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
    g_value_unset(&prop);

    /* subdomain addressing */
    g_value_init(&prop, G_TYPE_BOOLEAN);
    g_value_set_boolean(&prop, FALSE);
    device_set_simple_property(dev, PROPERTY_S3_SUBDOMAIN,
            &prop, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
    g_value_unset(&prop);

    /* compression */
    g_value_init(&prop, G_TYPE_BOOLEAN);
    g_value_set_boolean(&prop, FALSE);
    device_set_simple_property(dev, PROPERTY_COMPRESSION,
            &prop, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
    g_value_unset(&prop);

    /* medium access type */
    g_value_init(&prop, MEDIA_ACCESS_MODE_TYPE);
    g_value_set_enum(&prop, MEDIA_ACCESS_MODE_READ_WRITE);
    device_set_simple_property(dev, PROPERTY_MEDIUM_ACCESS_TYPE,
            &prop, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
    g_value_unset(&prop);
}
/*
 * Class initialiser: remembers the parent class and plugs the S3
 * implementations into the Device virtual method table, plus the GObject
 * finalizer.
 */
static void
s3_device_class_init(S3DeviceClass * c G_GNUC_UNUSED)
{
    DeviceClass *dev_class = (DeviceClass *)c;
    GObjectClass *gobj_class = (GObjectClass *)c;

    parent_class = g_type_class_ref (TYPE_DEVICE);

    /* device lifecycle */
    dev_class->open_device = s3_device_open_device;
    dev_class->create = s3_device_create;
    dev_class->read_label = s3_device_read_label;
    dev_class->start = s3_device_start;
    dev_class->finish = s3_device_finish;

    /* byte counters */
    dev_class->get_bytes_read = s3_device_get_bytes_read;
    dev_class->get_bytes_written = s3_device_get_bytes_written;

    /* writing */
    dev_class->start_file = s3_device_start_file;
    dev_class->write_block = s3_device_write_block;
    dev_class->finish_file = s3_device_finish_file;

    /* reading */
    dev_class->init_seek_file = s3_device_init_seek_file;
    dev_class->seek_file = s3_device_seek_file;
    dev_class->seek_block = s3_device_seek_block;
    dev_class->read_block = s3_device_read_block;

    /* recycling, erasing and reuse marking */
    dev_class->recycle_file = s3_device_recycle_file;
    dev_class->erase = s3_device_erase;
    dev_class->set_reuse = s3_device_set_reuse;
    dev_class->set_no_reuse = s3_device_set_no_reuse;

    gobj_class->finalize = s3_device_finalize;
}
/*
 * Base initialiser: attaches every property the S3 device supports to the
 * device class, together with its access mask, getter and setter.
 *
 * Almost all properties share one shape (readable, settable before start,
 * simple getter), so that shape is captured in a local macro; the few that
 * differ are spelled out in full.  Registration order is unchanged.
 */
static void
s3_device_base_init(
    S3DeviceClass *c)
{
    DeviceClass *dev_class = (DeviceClass *)c;

#define S3_REGISTER_BEFORE_START(prop_id, setter_fn) \
    device_class_register_property(dev_class, (prop_id), \
            PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, \
            device_simple_property_get_fn, \
            (setter_fn))

    S3_REGISTER_BEFORE_START(PROPERTY_S3_ACCESS_KEY, s3_device_set_access_key_fn);
    S3_REGISTER_BEFORE_START(PROPERTY_S3_SECRET_KEY, s3_device_set_secret_key_fn);
    S3_REGISTER_BEFORE_START(PROPERTY_S3_SESSION_TOKEN, s3_device_set_session_token_fn);
    S3_REGISTER_BEFORE_START(PROPERTY_SWIFT_ACCOUNT_ID, s3_device_set_swift_account_id_fn);
    S3_REGISTER_BEFORE_START(PROPERTY_SWIFT_ACCESS_KEY, s3_device_set_swift_access_key_fn);
    S3_REGISTER_BEFORE_START(PROPERTY_USERNAME, s3_device_set_username);
    S3_REGISTER_BEFORE_START(PROPERTY_PASSWORD, s3_device_set_password);
    S3_REGISTER_BEFORE_START(PROPERTY_TENANT_ID, s3_device_set_tenant_id);
    S3_REGISTER_BEFORE_START(PROPERTY_TENANT_NAME, s3_device_set_tenant_name);
    S3_REGISTER_BEFORE_START(PROPERTY_PROJECT_NAME, s3_device_set_project_name);
    S3_REGISTER_BEFORE_START(PROPERTY_DOMAIN_NAME, s3_device_set_domain_name);
    S3_REGISTER_BEFORE_START(PROPERTY_S3_HOST, s3_device_set_host_fn);
    S3_REGISTER_BEFORE_START(PROPERTY_S3_SERVICE_PATH, s3_device_set_service_path_fn);
    S3_REGISTER_BEFORE_START(PROPERTY_S3_USER_TOKEN, s3_device_set_user_token_fn);
    S3_REGISTER_BEFORE_START(PROPERTY_S3_BUCKET_LOCATION, s3_device_set_bucket_location_fn);
    S3_REGISTER_BEFORE_START(PROPERTY_S3_STORAGE_CLASS, s3_device_set_storage_class_fn);
    S3_REGISTER_BEFORE_START(PROPERTY_S3_SERVER_SIDE_ENCRYPTION, s3_device_set_server_side_encryption_fn);
    S3_REGISTER_BEFORE_START(PROPERTY_PROXY, s3_device_set_proxy_fn);
    S3_REGISTER_BEFORE_START(PROPERTY_SSL_CA_INFO, s3_device_set_ca_info_fn);
    S3_REGISTER_BEFORE_START(PROPERTY_VERBOSE, s3_device_set_verbose_fn);
    S3_REGISTER_BEFORE_START(PROPERTY_CREATE_BUCKET, s3_device_set_create_bucket_fn);
    S3_REGISTER_BEFORE_START(PROPERTY_READ_FROM_GLACIER, s3_device_set_read_from_glacier_fn);
    S3_REGISTER_BEFORE_START(PROPERTY_TRANSITION_TO_GLACIER, s3_device_set_transition_to_glacier_fn);
    S3_REGISTER_BEFORE_START(PROPERTY_STORAGE_API, s3_device_set_storage_api);
    S3_REGISTER_BEFORE_START(PROPERTY_OPENSTACK_SWIFT_API, s3_device_set_openstack_swift_api_fn);
    S3_REGISTER_BEFORE_START(PROPERTY_S3_MULTI_DELETE, s3_device_set_s3_multi_delete_fn);
    S3_REGISTER_BEFORE_START(PROPERTY_CHUNKED, s3_device_set_chunked_fn);
    S3_REGISTER_BEFORE_START(PROPERTY_S3_SSL, s3_device_set_ssl_fn);
    S3_REGISTER_BEFORE_START(PROPERTY_REUSE_CONNECTION, s3_device_set_reuse_connection_fn);
    S3_REGISTER_BEFORE_START(PROPERTY_TIMEOUT, s3_device_set_timeout_fn);
    S3_REGISTER_BEFORE_START(PROPERTY_MAX_SEND_SPEED, s3_device_set_max_send_speed_fn);
    S3_REGISTER_BEFORE_START(PROPERTY_MAX_RECV_SPEED, s3_device_set_max_recv_speed_fn);
    S3_REGISTER_BEFORE_START(PROPERTY_NB_THREADS_BACKUP, s3_device_set_nb_threads_backup);
    S3_REGISTER_BEFORE_START(PROPERTY_NB_THREADS_RECOVERY, s3_device_set_nb_threads_recovery);
    S3_REGISTER_BEFORE_START(PROPERTY_S3_MULTI_PART_UPLOAD, s3_device_set_s3_multi_part_upload);

    /* Read-only: registered with no setter. */
    device_class_register_property(dev_class, PROPERTY_COMPRESSION,
            PROPERTY_ACCESS_GET_MASK,
            device_simple_property_get_fn,
            NULL);

    S3_REGISTER_BEFORE_START(PROPERTY_LEOM, property_set_leom_fn);

    /* The two volume-usage properties accept every SET access except
     * PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE. */
    device_class_register_property(dev_class, PROPERTY_MAX_VOLUME_USAGE,
            (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) &
                (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE),
            device_simple_property_get_fn,
            s3_device_set_max_volume_usage_fn);
    device_class_register_property(dev_class, PROPERTY_ENFORCE_MAX_VOLUME_USAGE,
            (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) &
                (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE),
            device_simple_property_get_fn,
            s3_device_set_enforce_max_volume_usage_fn);

    S3_REGISTER_BEFORE_START(PROPERTY_S3_SUBDOMAIN, s3_device_set_use_subdomain_fn);
    S3_REGISTER_BEFORE_START(PROPERTY_CLIENT_ID, s3_device_set_client_id_fn);
    S3_REGISTER_BEFORE_START(PROPERTY_CLIENT_SECRET, s3_device_set_client_secret_fn);
    S3_REGISTER_BEFORE_START(PROPERTY_REFRESH_TOKEN, s3_device_set_refresh_token_fn);
    S3_REGISTER_BEFORE_START(PROPERTY_PROJECT_ID, s3_device_set_project_id_fn);
    S3_REGISTER_BEFORE_START(PROPERTY_S3_REPS, s3_device_set_reps_fn);
    S3_REGISTER_BEFORE_START(PROPERTY_S3_REPS_BUCKET, s3_device_set_reps_bucket_fn);

#undef S3_REGISTER_BEFORE_START
}
/*
 * Setter for the S3_ACCESS_KEY property: keeps a private copy of the string
 * in access_key, calls device_clear_volume_details(), and returns the result
 * of recording the value with device_simple_property_set_fn().
 */
static gboolean
s3_device_set_access_key_fn(Device *p_self, DevicePropertyBase *base,
    GValue *val, PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);

    /* Drop the old value before storing a copy of the new one. */
    amfree(s3dev->access_key);
    s3dev->access_key = g_value_dup_string(val);

    device_clear_volume_details(p_self);

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Setter for the S3_SECRET_KEY property: keeps a private copy of the string
 * in secret_key, calls device_clear_volume_details(), and returns the result
 * of recording the value with device_simple_property_set_fn().
 */
static gboolean
s3_device_set_secret_key_fn(Device *p_self, DevicePropertyBase *base,
    GValue *val, PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);

    /* Drop the old value before storing a copy of the new one. */
    amfree(s3dev->secret_key);
    s3dev->secret_key = g_value_dup_string(val);

    device_clear_volume_details(p_self);

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Setter for the S3_SESSION_TOKEN property: keeps a private copy of the
 * string in session_token, calls device_clear_volume_details(), and returns
 * the result of recording the value with device_simple_property_set_fn().
 */
static gboolean
s3_device_set_session_token_fn(Device *p_self, DevicePropertyBase *base,
    GValue *val, PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);

    /* Drop the old value before storing a copy of the new one. */
    amfree(s3dev->session_token);
    s3dev->session_token = g_value_dup_string(val);

    device_clear_volume_details(p_self);

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Setter for the SWIFT_ACCOUNT_ID property: keeps a private copy of the
 * string in swift_account_id, calls device_clear_volume_details(), and
 * returns the result of device_simple_property_set_fn().
 */
static gboolean
s3_device_set_swift_account_id_fn(Device *p_self, DevicePropertyBase *base,
    GValue *val, PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);

    /* Drop the old value before storing a copy of the new one. */
    amfree(s3dev->swift_account_id);
    s3dev->swift_account_id = g_value_dup_string(val);

    device_clear_volume_details(p_self);

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Setter for the SWIFT_ACCESS_KEY property: keeps a private copy of the
 * string in swift_access_key, calls device_clear_volume_details(), and
 * returns the result of device_simple_property_set_fn().
 */
static gboolean
s3_device_set_swift_access_key_fn(Device *p_self, DevicePropertyBase *base,
    GValue *val, PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);

    /* Drop the old value before storing a copy of the new one. */
    amfree(s3dev->swift_access_key);
    s3dev->swift_access_key = g_value_dup_string(val);

    device_clear_volume_details(p_self);

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Setter for the USERNAME property: keeps a private copy of the string in
 * username, calls device_clear_volume_details(), and returns the result of
 * device_simple_property_set_fn().
 */
static gboolean
s3_device_set_username(Device *p_self, DevicePropertyBase *base,
    GValue *val, PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);

    /* Drop the old value before storing a copy of the new one. */
    amfree(s3dev->username);
    s3dev->username = g_value_dup_string(val);

    device_clear_volume_details(p_self);

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Setter for the PASSWORD property: keeps a private copy of the string in
 * password, calls device_clear_volume_details(), and returns the result of
 * device_simple_property_set_fn().
 */
static gboolean
s3_device_set_password(Device *p_self, DevicePropertyBase *base,
    GValue *val, PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);

    /* Drop the old value before storing a copy of the new one. */
    amfree(s3dev->password);
    s3dev->password = g_value_dup_string(val);

    device_clear_volume_details(p_self);

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Setter for the TENANT_ID property: keeps a private copy of the string in
 * tenant_id, calls device_clear_volume_details(), and returns the result of
 * device_simple_property_set_fn().
 */
static gboolean
s3_device_set_tenant_id(Device *p_self, DevicePropertyBase *base,
    GValue *val, PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);

    /* Drop the old value before storing a copy of the new one. */
    amfree(s3dev->tenant_id);
    s3dev->tenant_id = g_value_dup_string(val);

    device_clear_volume_details(p_self);

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Setter for the TENANT_NAME property: keeps a private copy of the string in
 * tenant_name, calls device_clear_volume_details(), and returns the result
 * of device_simple_property_set_fn().
 */
static gboolean
s3_device_set_tenant_name(Device *p_self, DevicePropertyBase *base,
    GValue *val, PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);

    /* Drop the old value before storing a copy of the new one. */
    amfree(s3dev->tenant_name);
    s3dev->tenant_name = g_value_dup_string(val);

    device_clear_volume_details(p_self);

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Setter for the PROJECT_NAME property: keeps a private copy of the string
 * in project_name, calls device_clear_volume_details(), and returns the
 * result of device_simple_property_set_fn().
 */
static gboolean
s3_device_set_project_name(Device *p_self, DevicePropertyBase *base,
    GValue *val, PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);

    /* Drop the old value before storing a copy of the new one. */
    amfree(s3dev->project_name);
    s3dev->project_name = g_value_dup_string(val);

    device_clear_volume_details(p_self);

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Setter for the DOMAIN_NAME property: keeps a private copy of the string in
 * domain_name, calls device_clear_volume_details(), and returns the result
 * of device_simple_property_set_fn().
 */
static gboolean
s3_device_set_domain_name(Device *p_self, DevicePropertyBase *base,
    GValue *val, PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);

    /* Drop the old value before storing a copy of the new one. */
    amfree(s3dev->domain_name);
    s3dev->domain_name = g_value_dup_string(val);

    device_clear_volume_details(p_self);

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Setter for the S3_HOST property: keeps a private copy of the string in
 * host, calls device_clear_volume_details(), and returns the result of
 * device_simple_property_set_fn().
 */
static gboolean
s3_device_set_host_fn(Device *p_self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);

    /* Drop the old value before storing a copy of the new one. */
    amfree(s3dev->host);
    s3dev->host = g_value_dup_string(val);

    device_clear_volume_details(p_self);

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Setter for the S3_SERVICE_PATH property: keeps a private copy of the
 * string in service_path, calls device_clear_volume_details(), and returns
 * the result of device_simple_property_set_fn().
 */
static gboolean
s3_device_set_service_path_fn(Device *p_self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);

    /* Drop the old value before storing a copy of the new one. */
    amfree(s3dev->service_path);
    s3dev->service_path = g_value_dup_string(val);

    device_clear_volume_details(p_self);

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Setter for the S3_USER_TOKEN property: keeps a private copy of the string
 * in user_token, calls device_clear_volume_details(), and returns the result
 * of device_simple_property_set_fn().
 */
static gboolean
s3_device_set_user_token_fn(Device *p_self, DevicePropertyBase *base,
    GValue *val, PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);

    /* Drop the old value before storing a copy of the new one. */
    amfree(s3dev->user_token);
    s3dev->user_token = g_value_dup_string(val);

    device_clear_volume_details(p_self);

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Setter for the S3_BUCKET_LOCATION property.
 *
 * A non-empty location constraint is rejected (device error set, FALSE
 * returned) when SSL is in use but s3_curl_location_compat() fails, or when
 * s3_bucket_location_compat() rejects the bucket name.  Otherwise the copy
 * of the string replaces bucket_location, device_clear_volume_details() is
 * called, and the result of device_simple_property_set_fn() is returned.
 *
 * A GValue holding no string (NULL) is treated like an empty location
 * instead of being dereferenced.
 */
static gboolean
s3_device_set_bucket_location_fn(Device *p_self, DevicePropertyBase *base,
    GValue *val, PropertySurety surety, PropertySource source)
{
    S3Device *self = S3_DEVICE(p_self);
    char *str_val = g_value_dup_string(val);
    /* g_value_dup_string() returns NULL when the value holds no string. */
    const gboolean have_location = (str_val != NULL && str_val[0] != '\0');

    if (have_location && self->use_ssl && !s3_curl_location_compat()) {
        device_set_error(p_self, g_strdup(_(
                "Location constraint given for Amazon S3 bucket, "
                "but libcurl is too old support wildcard certificates.")),
            DEVICE_STATUS_DEVICE_ERROR);
        goto fail;
    }

    if (have_location && !s3_bucket_location_compat(self->bucket)) {
        device_set_error(p_self, g_strdup_printf(_(
                "Location constraint given for Amazon S3 bucket, "
                "but the bucket name (%s) is not usable as a subdomain."),
                self->bucket),
            DEVICE_STATUS_DEVICE_ERROR);
        goto fail;
    }

    /* Accepted: the device now owns str_val. */
    amfree(self->bucket_location);
    self->bucket_location = str_val;

    device_clear_volume_details(p_self);

    return device_simple_property_set_fn(p_self, base, val, surety, source);

fail:
    g_free(str_val);
    return FALSE;
}
/*
 * Setter for the S3_STORAGE_CLASS property: duplicates the string, swaps it
 * in for the previous storage_class, calls device_clear_volume_details(),
 * and returns the result of device_simple_property_set_fn().
 */
static gboolean
s3_device_set_storage_class_fn(Device *p_self, DevicePropertyBase *base,
    GValue *val, PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);
    char *copy = g_value_dup_string(val);

    /* The copy is taken first, then the old value is released. */
    amfree(s3dev->storage_class);
    s3dev->storage_class = copy;

    device_clear_volume_details(p_self);

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Setter for the S3_SERVER_SIDE_ENCRYPTION property: duplicates the string,
 * swaps it in for the previous server_side_encryption value, calls
 * device_clear_volume_details(), and returns the result of
 * device_simple_property_set_fn().
 */
static gboolean
s3_device_set_server_side_encryption_fn(Device *p_self, DevicePropertyBase *base,
    GValue *val, PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);
    char *copy = g_value_dup_string(val);

    /* The copy is taken first, then the old value is released. */
    amfree(s3dev->server_side_encryption);
    s3dev->server_side_encryption = copy;

    device_clear_volume_details(p_self);

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Setter for the PROXY property: duplicates the string, swaps it in for the
 * previous proxy value, calls device_clear_volume_details(), and returns the
 * result of device_simple_property_set_fn().
 */
static gboolean
s3_device_set_proxy_fn(Device *p_self, DevicePropertyBase *base,
    GValue *val, PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);
    char *copy = g_value_dup_string(val);

    /* The copy is taken first, then the old value is released. */
    amfree(s3dev->proxy);
    s3dev->proxy = copy;

    device_clear_volume_details(p_self);

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Setter for the SSL_CA_INFO property: keeps a private copy of the string in
 * ca_info, calls device_clear_volume_details(), and returns the result of
 * device_simple_property_set_fn().
 */
static gboolean
s3_device_set_ca_info_fn(Device *p_self, DevicePropertyBase *base,
    GValue *val, PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);

    /* Drop the old value before storing a copy of the new one. */
    amfree(s3dev->ca_info);
    s3dev->ca_info = g_value_dup_string(val);

    device_clear_volume_details(p_self);

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Setter for the VERBOSE property: stores the flag and pushes it to every
 * per-thread S3 handle that already exists, then returns the result of
 * device_simple_property_set_fn().
 */
static gboolean
s3_device_set_verbose_fn(Device *p_self, DevicePropertyBase *base,
    GValue *val, PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);
    int i;

    s3dev->verbose = g_value_get_boolean(val);

    /* Our S3 handle may not yet have been instantiated; if so, it will
     * get the proper verbose setting when it is created */
    if (s3dev->s3t) {
        for (i = 0; i < s3dev->nb_threads; i++) {
            if (!s3dev->s3t[i].s3)
                continue;
            s3_verbose(s3dev->s3t[i].s3, s3dev->verbose);
        }
    }

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Setter for the CREATE_BUCKET property: stores the boolean in
 * create_bucket and returns the result of device_simple_property_set_fn().
 */
static gboolean
s3_device_set_create_bucket_fn(Device *p_self, DevicePropertyBase *base,
    GValue *val, PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);

    s3dev->create_bucket = g_value_get_boolean(val);

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Setter for the READ_FROM_GLACIER property: stores the boolean in
 * read_from_glacier and returns the result of
 * device_simple_property_set_fn().
 */
static gboolean
s3_device_set_read_from_glacier_fn(Device *p_self, DevicePropertyBase *base,
    GValue *val, PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);

    s3dev->read_from_glacier = g_value_get_boolean(val);

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Setter for the TRANSITION_TO_GLACIER property: stores the uint64 value in
 * transition_to_glacier and returns the result of
 * device_simple_property_set_fn().
 */
static gboolean
s3_device_set_transition_to_glacier_fn(Device *p_self, DevicePropertyBase *base,
    GValue *val, PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);

    s3dev->transition_to_glacier = g_value_get_uint64(val);

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Setter for the STORAGE_API property.
 *
 * Maps the API name onto self->s3_api.  Unless set_s3_multi_delete is
 * non-zero, each recognised API also picks its own default for
 * use_s3_multi_delete.  "CASTOR" additionally requires libcurl >= 7.19.1
 * at both build time and run time; otherwise a device error is set and
 * FALSE is returned.  An unrecognised name (or a GValue holding no string)
 * is logged and falls back to the S3 API.
 *
 * On success the value is recorded with device_simple_property_set_fn() and
 * its result returned.
 */
static gboolean
s3_device_set_storage_api(Device *p_self, DevicePropertyBase *base,
    GValue *val, PropertySurety surety, PropertySource source)
{
    S3Device *self = S3_DEVICE(p_self);
    const char *storage_api = g_value_get_string(val);

    /* g_str_equal() must not be given NULL; an empty name matches none of
     * the known APIs and therefore takes the "invalid" fallback below. */
    if (storage_api == NULL)
        storage_api = "";

    if (g_str_equal(storage_api, "S3")) {
        self->s3_api = S3_API_S3;
        if (!self->set_s3_multi_delete)
            self->use_s3_multi_delete = 1;
    } else if (g_str_equal(storage_api, "SWIFT-1.0")) {
        self->s3_api = S3_API_SWIFT_1;
        if (!self->set_s3_multi_delete)
            self->use_s3_multi_delete = 0;
    } else if (g_str_equal(storage_api, "SWIFT-2.0")) {
        self->s3_api = S3_API_SWIFT_2;
        if (!self->set_s3_multi_delete)
            self->use_s3_multi_delete = 0;
    } else if (g_str_equal(storage_api, "SWIFT-3")) {
        self->s3_api = S3_API_SWIFT_3;
        if (!self->set_s3_multi_delete)
            self->use_s3_multi_delete = 1;
    } else if (g_str_equal(storage_api, "OAUTH2")) {
        self->s3_api = S3_API_OAUTH2;
        if (!self->set_s3_multi_delete)
            self->use_s3_multi_delete = 0;
    } else if (g_str_equal(storage_api, "AWS4")) {
        self->s3_api = S3_API_AWS4;
        if (!self->set_s3_multi_delete)
            self->use_s3_multi_delete = 1;
    } else if (g_str_equal(storage_api, "CASTOR")) {
#if LIBCURL_VERSION_NUM >= 0x071301
        curl_version_info_data *info;

        if (!self->set_s3_multi_delete)
            self->use_s3_multi_delete = 0;
        /* check the runtime version too */
        info = curl_version_info(CURLVERSION_NOW);
        if (info->version_num >= 0x071301) {
            self->s3_api = S3_API_CASTOR;
        } else {
            /* The message has no format arguments, so plain g_strdup(). */
            device_set_error(p_self, g_strdup(_(
                        "Error setting STORAGE-API to castor "
                        "(You must install libcurl 7.19.1 or newer)")),
                    DEVICE_STATUS_DEVICE_ERROR);
            return FALSE;
        }
#else
        device_set_error(p_self, g_strdup(_(
                    "Error setting STORAGE-API to castor "
                    "This amanda is compiled with a too old libcurl, you must compile with libcurl 7.19.1 or newer")),
                DEVICE_STATUS_DEVICE_ERROR);
        return FALSE;
#endif
    } else {
        g_debug("Invalid STORAGE_API, using \"S3\".");
        self->s3_api = S3_API_S3;
    }

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Setter for the OPENSTACK_SWIFT_API property.
 *
 * A TRUE value is forwarded to s3_device_set_storage_api() with the storage
 * API name "SWIFT-1.0", and that call's result is returned.  A FALSE value
 * changes nothing and returns TRUE.
 */
static gboolean
s3_device_set_openstack_swift_api_fn(Device *p_self, DevicePropertyBase *base,
    GValue *val, PropertySurety surety, PropertySource source)
{
    GValue swift_api_name;

    if (!g_value_get_boolean(val))
        return TRUE;

    /* Wrap the literal in a GValue; a static string needs no copy. */
    bzero(&swift_api_name, sizeof(GValue));
    g_value_init(&swift_api_name, G_TYPE_STRING);
    g_value_set_static_string(&swift_api_name, "SWIFT-1.0");

    return s3_device_set_storage_api(p_self, base, &swift_api_name,
                                     surety, source);
}
/*
 * Setter for the S3_MULTI_DELETE property.
 *
 * Stores the requested boolean in use_s3_multi_delete and raises
 * set_s3_multi_delete to mark the choice as explicit.
 * s3_device_set_storage_api() only applies its per-API default while that
 * flag is zero, so without it a later STORAGE_API setting would silently
 * override the value chosen here.
 *
 * Returns the result of device_simple_property_set_fn().
 */
static gboolean
s3_device_set_s3_multi_delete_fn(Device *p_self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety surety, PropertySource source)
{
    S3Device *self = S3_DEVICE(p_self);

    self->use_s3_multi_delete = g_value_get_boolean(val);
    self->set_s3_multi_delete = 1;

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Setter for the S3_SSL property.
 *
 * The new setting is first applied to every per-thread S3 handle that
 * already exists.  If s3_use_ssl() refuses for any of them, a device error
 * is set and FALSE is returned without updating use_ssl.  Otherwise use_ssl
 * is updated and the result of device_simple_property_set_fn() is returned.
 */
static gboolean
s3_device_set_ssl_fn(Device *p_self, DevicePropertyBase *base,
    GValue *val, PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);
    const gboolean want_ssl = g_value_get_boolean(val);
    int i;

    /* Our S3 handle may not yet have been instantiated; if so, it will
     * get the proper use_ssl setting when it is created */
    if (s3dev->s3t) {
        for (i = 0; i < s3dev->nb_threads; i++) {
            if (!s3dev->s3t[i].s3)
                continue;
            if (s3_use_ssl(s3dev->s3t[i].s3, want_ssl))
                continue;

            device_set_error(p_self, g_strdup_printf(_(
                    "Error setting S3 SSL/TLS use "
                    "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")),
                DEVICE_STATUS_DEVICE_ERROR);
            return FALSE;
        }
    }

    s3dev->use_ssl = want_ssl;

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Setter for the CHUNKED property: stores the boolean in chunked and
 * returns the result of device_simple_property_set_fn().
 */
static gboolean
s3_device_set_chunked_fn(Device *p_self, DevicePropertyBase *base,
    GValue *val, PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);

    s3dev->chunked = g_value_get_boolean(val);

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Property setter: store the boolean in VAL in self->reuse_connection and
 * record the property through device_simple_property_set_fn().
 */
static gboolean
s3_device_set_reuse_connection_fn(Device *p_self, DevicePropertyBase *base,
    GValue *val, PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);
    const gboolean reuse = g_value_get_boolean(val);

    s3dev->reuse_connection = reuse;

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Property setter for the timeout.
 *
 * The uint64 in VAL is stored in self->timeout; a non-zero value below 300
 * is raised to 300, while zero is kept as is.  Note that the simple
 * property is recorded with the caller's original VAL, not the clamped one.
 */
static gboolean
s3_device_set_timeout_fn(Device *p_self, DevicePropertyBase *base,
    GValue *val, PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);

    s3dev->timeout = g_value_get_uint64(val);
    if (s3dev->timeout > 0) {
        if (s3dev->timeout < 300) {
            s3dev->timeout = 300;
        }
    }

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Property setter for MAX-SEND-SPEED.
 *
 * A non-zero value below 5120 is rejected with a device error (zero is
 * exempt from the minimum check).  An accepted value is applied to every
 * per-thread S3 handle that already exists; if a handle refuses it, a device
 * error is set and FALSE is returned.  On success the value is remembered in
 * self->max_send_speed and stored as a simple property.
 *
 * Returns TRUE on success, FALSE (with the device error set) otherwise.
 */
static gboolean
s3_device_set_max_send_speed_fn(Device *p_self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety surety, PropertySource source)
{
    S3Device *self = S3_DEVICE(p_self);
    guint64 new_val;
    int thread;

    new_val = g_value_get_uint64(val);
    if (new_val && new_val < 5120) {
        /* Set the error exactly once; it does not depend on the number of
         * threads and must be reported even if there are none. */
        device_set_error(p_self,
            g_strdup("MAX-SEND-SPEED property is too low (minimum value is 5120)"),
            DEVICE_STATUS_DEVICE_ERROR);
        return FALSE;
    }

    /* handles that do not exist yet get the limit when they are created */
    if (self->s3t) {
        for (thread = 0; thread < self->nb_threads; thread++) {
            if (self->s3t[thread].s3 &&
                !s3_set_max_send_speed(self->s3t[thread].s3, new_val)) {
                device_set_error(p_self,
                    g_strdup("Could not set S3 maximum send speed"),
                    DEVICE_STATUS_DEVICE_ERROR);
                return FALSE;
            }
        }
    }

    self->max_send_speed = new_val;

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Property setter for MAX-RECV-SPEED.
 *
 * A non-zero value below 5120 is rejected with a device error (zero is
 * exempt from the minimum check).  An accepted value is applied to every
 * per-thread S3 handle that already exists; if a handle refuses it, a device
 * error is set and FALSE is returned.  On success the value is remembered in
 * self->max_recv_speed and stored as a simple property.
 *
 * Returns TRUE on success, FALSE (with the device error set) otherwise.
 */
static gboolean
s3_device_set_max_recv_speed_fn(Device *p_self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety surety, PropertySource source)
{
    S3Device *self = S3_DEVICE(p_self);
    guint64 new_val;
    int thread;

    new_val = g_value_get_uint64(val);
    if (new_val && new_val < 5120) {
        /* Set the error exactly once; it does not depend on the number of
         * threads and must be reported even if there are none. */
        device_set_error(p_self,
            g_strdup("MAX-RECV-SPEED property is too low (minimum value is 5120)"),
            DEVICE_STATUS_DEVICE_ERROR);
        return FALSE;
    }

    /* handles that do not exist yet get the limit when they are created */
    if (self->s3t) {
        for (thread = 0; thread < self->nb_threads; thread++) {
            if (self->s3t[thread].s3 &&
                !s3_set_max_recv_speed(self->s3t[thread].s3, new_val)) {
                device_set_error(p_self,
                    g_strdup("Could not set S3 maximum recv speed"),
                    DEVICE_STATUS_DEVICE_ERROR);
                return FALSE;
            }
        }
    }

    self->max_recv_speed = new_val;

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Property setter: store the uint64 in VAL in self->nb_threads_backup and,
 * if that exceeds self->nb_threads, raise self->nb_threads to match.
 * nb_threads is never lowered here.
 */
static gboolean
s3_device_set_nb_threads_backup(Device *p_self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);
    const guint64 requested = g_value_get_uint64(val);

    s3dev->nb_threads_backup = requested;
    if (s3dev->nb_threads < s3dev->nb_threads_backup) {
        s3dev->nb_threads = s3dev->nb_threads_backup;
    }

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Property setter: store the uint64 in VAL in self->nb_threads_recovery and,
 * if that exceeds self->nb_threads, raise self->nb_threads to match.
 * nb_threads is never lowered here.
 */
static gboolean
s3_device_set_nb_threads_recovery(Device *p_self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);
    const guint64 requested = g_value_get_uint64(val);

    s3dev->nb_threads_recovery = requested;
    if (s3dev->nb_threads < s3dev->nb_threads_recovery) {
        s3dev->nb_threads = s3dev->nb_threads_recovery;
    }

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Property setter: store the boolean in VAL in
 * self->use_s3_multi_part_upload and record the property through
 * device_simple_property_set_fn().
 */
static gboolean
s3_device_set_s3_multi_part_upload(Device *p_self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);
    const gboolean enabled = g_value_get_boolean(val);

    s3dev->use_s3_multi_part_upload = enabled;

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Property setter: store the uint64 in VAL in self->volume_limit and record
 * the property through device_simple_property_set_fn().
 */
static gboolean
s3_device_set_max_volume_usage_fn(Device *p_self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);
    const guint64 limit = g_value_get_uint64(val);

    s3dev->volume_limit = limit;

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Property setter: store the boolean in VAL in self->enforce_volume_limit
 * and record the property through device_simple_property_set_fn().
 */
static gboolean
s3_device_set_enforce_max_volume_usage_fn(Device *p_self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);
    const gboolean enforce = g_value_get_boolean(val);

    s3dev->enforce_volume_limit = enforce;

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Property setter for S3-SUBDOMAIN.
 *
 * The boolean in VAL is stored in self->use_subdomain.  When it is TRUE the
 * bucket name is checked with s3_bucket_location_compat(); if the check
 * fails, a device error is set, use_subdomain is reset to FALSE and FALSE is
 * returned.  Otherwise the property is recorded through
 * device_simple_property_set_fn().
 */
static gboolean
s3_device_set_use_subdomain_fn(Device *p_self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety surety, PropertySource source)
{
    S3Device *self = S3_DEVICE(p_self);

    self->use_subdomain = g_value_get_boolean(val);

    if (self->use_subdomain && !s3_bucket_location_compat(self->bucket)) {
        /* the allowed character class is [a-z0-9-]; the message previously
         * read "[a-zo-9-]" (letter 'o' instead of digit zero) */
        device_set_error(p_self, g_strdup_printf(_(
                "S3-SUBDOMAIN is set, "
                "but the bucket name (%s) is not usable as a subdomain, only [a-z0-9-] characters are allowed."),
                self->bucket),
            DEVICE_STATUS_DEVICE_ERROR);
        self->use_subdomain = FALSE;
        return FALSE;
    }

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Property setter: store the boolean in VAL in self->leom and record the
 * property through device_simple_property_set_fn().
 */
static gboolean
property_set_leom_fn(Device *p_self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);
    const gboolean leom = g_value_get_boolean(val);

    s3dev->leom = leom;

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Property setter: replace self->client_id with a private copy of the
 * string in VAL (the previous value is released), then record the property
 * through device_simple_property_set_fn().
 */
static gboolean
s3_device_set_client_id_fn(Device *p_self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);
    char *copy = g_value_dup_string(val);

    amfree(s3dev->client_id);
    s3dev->client_id = copy;

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Property setter: replace self->client_secret with a private copy of the
 * string in VAL (the previous value is released), then record the property
 * through device_simple_property_set_fn().
 */
static gboolean
s3_device_set_client_secret_fn(Device *p_self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);
    char *copy = g_value_dup_string(val);

    amfree(s3dev->client_secret);
    s3dev->client_secret = copy;

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Property setter: replace self->refresh_token with a private copy of the
 * string in VAL (the previous value is released), then record the property
 * through device_simple_property_set_fn().
 */
static gboolean
s3_device_set_refresh_token_fn(Device *p_self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);
    char *copy = g_value_dup_string(val);

    amfree(s3dev->refresh_token);
    s3dev->refresh_token = copy;

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Property setter: replace self->project_id with a private copy of the
 * string in VAL (the previous value is released), then record the property
 * through device_simple_property_set_fn().
 */
static gboolean
s3_device_set_project_id_fn(Device *p_self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);
    char *copy = g_value_dup_string(val);

    amfree(s3dev->project_id);
    s3dev->project_id = copy;

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Property setter: replace self->reps with a private copy of the string in
 * VAL (the previous value is released), clear the cached volume details of
 * the device, then record the property through
 * device_simple_property_set_fn().
 */
static gboolean
s3_device_set_reps_fn(Device *p_self, DevicePropertyBase *base,
    GValue *val, PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);
    char *copy = g_value_dup_string(val);

    amfree(s3dev->reps);
    s3dev->reps = copy;

    device_clear_volume_details(p_self);

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Property setter: replace self->reps_bucket with a private copy of the
 * string in VAL (the previous value is released), clear the cached volume
 * details of the device, then record the property through
 * device_simple_property_set_fn().
 */
static gboolean
s3_device_set_reps_bucket_fn(Device *p_self, DevicePropertyBase *base,
    GValue *val, PropertySurety surety, PropertySource source)
{
    S3Device *s3dev = S3_DEVICE(p_self);
    char *copy = g_value_dup_string(val);

    amfree(s3dev->reps_bucket);
    s3dev->reps_bucket = copy;

    device_clear_volume_details(p_self);

    return device_simple_property_set_fn(p_self, base, val, surety, source);
}
/*
 * Device factory for the "s3" device type: instantiate a TYPE_S3_DEVICE
 * object, open it with the given name/type/node and return it.  DEVICE_TYPE
 * must equal S3_DEVICE_NAME (asserted).
 */
static Device*
s3_device_factory(char * device_name, char * device_type, char * device_node)
{
    Device *new_device;

    g_assert(g_str_equal(device_type, S3_DEVICE_NAME));

    new_device = DEVICE(g_object_new(TYPE_S3_DEVICE, NULL));
    device_open_device(new_device, device_name, device_type, device_node);

    return new_device;
}
/*
* Virtual function overrides
*/
/*
 * open_device override.
 *
 * Sets the block-size limits, splits DEVICE_NODE into bucket and prefix at
 * the first '/', installs defaults (reps, reps_bucket, verbose, s3_api,
 * use_ssl, reuse_connection, timeout, create_bucket) and publishes the
 * ssl / reuse-connection / timeout / create-bucket defaults as simple
 * properties with PROPERTY_SOURCE_DEFAULT.  Finally chains to the parent
 * class's open_device, if any.
 *
 * If the bucket part is empty a device error is set and the function
 * returns early, without installing defaults or chaining up.
 */
static void
s3_device_open_device(Device *pself, char *device_name,
    char * device_type, char * device_node)
{
    S3Device *s3dev = S3_DEVICE(pself);
    char *slash;
    GValue prop;

    pself->min_block_size = S3_DEVICE_MIN_BLOCK_SIZE;
    pself->max_block_size = S3_DEVICE_MAX_BLOCK_SIZE;
    pself->block_size = S3_DEVICE_DEFAULT_BLOCK_SIZE;

    /* Device name may be bucket/prefix, to support multiple volumes in a
     * single bucket. */
    slash = strchr(device_node, '/');
    if (slash != NULL) {
        s3dev->bucket = g_strndup(device_node, slash - device_node);
        s3dev->prefix = g_strdup(slash + 1);
    } else {
        s3dev->bucket = g_strdup(device_node);
        s3dev->prefix = g_strdup("");
    }

    if (s3dev->bucket == NULL || s3dev->bucket[0] == '\0') {
        device_set_error(pself,
            g_strdup_printf(_("Empty bucket name in device %s"), device_name),
            DEVICE_STATUS_DEVICE_ERROR);
        amfree(s3dev->bucket);
        amfree(s3dev->prefix);
        return;
    }

    /* keep values that were already set; only fill in missing ones */
    if (s3dev->reps == NULL) {
        s3dev->reps = g_strdup(S3_DEVICE_REPS_DEFAULT);
    }
    if (s3dev->reps_bucket == NULL) {
        s3dev->reps_bucket = g_strdup(S3_DEVICE_REPS_BUCKET_DEFAULT);
    }

    g_debug(_("S3 driver using bucket '%s', prefix '%s'"), s3dev->bucket, s3dev->prefix);
    g_debug("curl version: %s", curl_version());
#if defined(LIBCURL_USE_OPENSSL)
    g_debug("curl compiled for OPENSSL");
#elif defined(LIBCURL_USE_GNUTLS)
    g_debug("curl compiled for GNUTLS");
#else
    g_debug("curl compiled for NSS");
#endif

    /* default values */
    s3dev->verbose = FALSE;
    s3dev->s3_api = S3_API_UNKNOWN;

    /* use SSL if available */
    s3dev->use_ssl = s3_curl_supports_ssl();
    bzero(&prop, sizeof(GValue));
    g_value_init(&prop, G_TYPE_BOOLEAN);
    g_value_set_boolean(&prop, s3dev->use_ssl);
    device_set_simple_property(pself, device_property_s3_ssl.ID,
        &prop, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT);

    /* reuse connection */
    s3dev->reuse_connection = TRUE;
    bzero(&prop, sizeof(GValue));
    g_value_init(&prop, G_TYPE_BOOLEAN);
    g_value_set_boolean(&prop, s3dev->reuse_connection);
    device_set_simple_property(pself, device_property_reuse_connection.ID,
        &prop, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT);

    /* timeout */
    s3dev->timeout = 0;
    bzero(&prop, sizeof(GValue));
    g_value_init(&prop, G_TYPE_UINT64);
    g_value_set_uint64(&prop, s3dev->timeout);
    device_set_simple_property(pself, device_property_timeout.ID,
        &prop, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT);

    /* Set default create_bucket */
    s3dev->create_bucket = TRUE;
    bzero(&prop, sizeof(GValue));
    g_value_init(&prop, G_TYPE_BOOLEAN);
    g_value_set_boolean(&prop, s3dev->create_bucket);
    device_set_simple_property(pself, device_property_create_bucket.ID,
        &prop, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT);

    if (parent_class->open_device) {
        parent_class->open_device(pself, device_name, device_type, device_node);
    }
}
/*
 * create override: make sure the S3 handles exist, create the bucket with
 * the first handle and, on success, mark the bucket as made and chain to the
 * parent class's create (if any).
 *
 * Returns FALSE with a device error set if the handles cannot be set up or
 * the bucket cannot be created.
 */
static gboolean
s3_device_create(Device *pself)
{
    S3Device *s3dev = S3_DEVICE(pself);
    guint http_status;
    s3_error_code_t err_code;

    if (!setup_handle(s3dev)) {
        /* setup_handle already set our error message */
        return FALSE;
    }

    if (s3_make_bucket(s3dev->s3t[0].s3, s3dev->bucket, s3dev->project_id)) {
        s3dev->bucket_made = TRUE;
        return parent_class->create ? parent_class->create(pself) : TRUE;
    }

    /* the fetched codes are not inspected; the message comes from
     * s3_strerror() */
    s3_error(s3dev->s3t[0].s3, NULL, &http_status, &err_code, NULL, NULL, NULL);
    device_set_error(pself,
        g_strdup_printf(_("While creating S3 bucket: %s"), s3_strerror(s3dev->s3t[0].s3)),
        DEVICE_STATUS_DEVICE_ERROR);
    return FALSE;
}
/*
 * GObject finalize for S3Device.
 *
 * Chains to the parent finalize, then tears down the thread pools, the idle
 * mutex/condition, the per-thread S3 handles and buffers, closes the
 * catalog (which also writes it out) and finally releases every string
 * owned by the device, including the client_id / client_secret /
 * refresh_token / project_id strings duplicated by their property setters.
 */
static void s3_device_finalize(GObject * obj_self) {
    S3Device *self = S3_DEVICE (obj_self);
    int thread;

    if(G_OBJECT_CLASS(parent_class)->finalize)
        (* G_OBJECT_CLASS(parent_class)->finalize)(obj_self);

    if (self->thread_pool_delete) {
        g_thread_pool_free(self->thread_pool_delete, 1, 1);
        self->thread_pool_delete = NULL;
    }
    if (self->thread_pool_write) {
        g_thread_pool_free(self->thread_pool_write, 1, 1);
        self->thread_pool_write = NULL;
    }
    if (self->thread_pool_read) {
        g_thread_pool_free(self->thread_pool_read, 1, 1);
        self->thread_pool_read = NULL;
    }
    if (self->thread_idle_mutex) {
        g_mutex_free(self->thread_idle_mutex);
        self->thread_idle_mutex = NULL;
    }
    if (self->thread_idle_cond) {
        g_cond_free(self->thread_idle_cond);
        self->thread_idle_cond = NULL;
    }

    if (self->s3t) {
        for (thread = 0; thread < self->nb_threads; thread++) {
            g_mutex_free(self->s3t[thread].now_mutex);
            if(self->s3t[thread].s3) s3_free(self->s3t[thread].s3);
            g_free(self->s3t[thread].curl_buffer.buffer);
        }
        g_free(self->s3t);
    }

    if (self->catalog_filename) {
        catalog_close(self);
    }

    /* g_free() accepts NULL, so no guards are needed */
    g_free(self->bucket);
    g_free(self->prefix);
    g_free(self->access_key);
    g_free(self->secret_key);
    g_free(self->session_token);
    g_free(self->swift_account_id);
    g_free(self->swift_access_key);
    g_free(self->username);
    g_free(self->password);
    g_free(self->tenant_id);
    g_free(self->tenant_name);
    g_free(self->project_name);
    g_free(self->domain_name);
    g_free(self->host);
    g_free(self->service_path);
    g_free(self->user_token);
    g_free(self->bucket_location);
    g_free(self->storage_class);
    g_free(self->server_side_encryption);
    g_free(self->proxy);
    g_free(self->ca_info);
    g_free(self->reps);
    g_free(self->reps_bucket);

    /* strings duplicated by the client_id / client_secret / refresh_token /
     * project_id property setters; previously leaked */
    g_free(self->client_id);
    g_free(self->client_secret);
    g_free(self->refresh_token);
    g_free(self->project_id);
}
/*
 * Open the on-disk catalog for this bucket/prefix and load it.
 *
 * Creates the "bucket-<bucket>" directory (relative to the configuration
 * directory), sets self->catalog_filename to "bucket-<bucket>/<prefix>"
 * and, if that file exists, reads two lines from it: the first is stored
 * in self->catalog_label after skipping a 7-character tag ("LABEL: "), the
 * second in self->catalog_header after skipping an 8-character tag
 * ("HEADER: ").
 *
 * Returns TRUE when the catalog was loaded or when there is no catalog file
 * (label and header are then NULL).  Returns FALSE when the directory
 * cannot be created, or when the file is truncated or malformed; in the
 * latter case label and header are both cleared so that a half-read catalog
 * is never used.
 */
static gboolean
catalog_open(
    S3Device *self)
{
    const size_t label_skip = 7;    /* strlen("LABEL: ") */
    const size_t header_skip = 8;   /* strlen("HEADER: ") */
    char   *filename;
    char   *dirname;
    FILE   *file;
    char    line[1025];
    char   *label;
    size_t  len;

    /* create the directory */
    filename = g_strdup_printf("bucket-%s", self->bucket);
    dirname = config_dir_relative(filename);
    if (mkdir(dirname, 0700) == -1 && errno != EEXIST) {
        g_debug("Can't create catalog directory '%s': %s",
                dirname, strerror(errno));
        /* release the temporaries on the error path as well */
        amfree(filename);
        amfree(dirname);
        return FALSE;
    }
    amfree(filename);
    amfree(dirname);

    filename = g_strdup_printf("bucket-%s/%s", self->bucket, self->prefix);
    g_free(self->catalog_filename);
    self->catalog_filename = config_dir_relative(filename);
    g_free(filename);

    file = fopen(self->catalog_filename, "r");
    if (!file) {
        /* no catalog yet: not an error, just nothing cached */
        g_free(self->catalog_label);
        g_free(self->catalog_header);
        self->catalog_label = NULL;
        self->catalog_header = NULL;
        return TRUE;
    }

    /* first line: label */
    if (!fgets(line, 1024, file))
        goto unusable;
    len = strlen(line);
    if (len > 0 && line[len-1] == '\n')
        line[--len] = '\0';
    /* a line shorter than its tag would make line+skip point past the
     * terminator */
    if (len < label_skip)
        goto unusable;
    label = g_strdup(line + label_skip);

    /* second line: header */
    if (!fgets(line, 1024, file)) {
        g_free(label);
        goto unusable;
    }
    len = strlen(line);
    if (len > 0 && line[len-1] == '\n')
        line[--len] = '\0';
    if (len < header_skip) {
        g_free(label);
        goto unusable;
    }

    g_free(self->catalog_label);
    self->catalog_label = label;
    g_free(self->catalog_header);
    self->catalog_header = g_strdup(line + header_skip);

    fclose(file);
    return TRUE;

unusable:
    /* truncated or malformed catalog: drop any cached values */
    fclose(file);
    g_free(self->catalog_label);
    g_free(self->catalog_header);
    self->catalog_label = NULL;
    self->catalog_header = NULL;
    return FALSE;
}
/*
 * Write the cached label and header to the catalog file as two lines,
 * "LABEL: <label>" and "HEADER: <header>", replacing the file's contents.
 *
 * Returns TRUE without touching the file when either value is missing
 * (nothing to write).  Returns FALSE when no catalog filename is set, when
 * the file cannot be opened, or when writing or closing it fails.
 */
static gboolean
write_catalog(
    S3Device *self)
{
    FILE *file;
    gboolean ok = TRUE;

    if (!self->catalog_label || !self->catalog_header)
        return TRUE;

    /* catalog_filename is unset if the catalog was never opened; fopen()
     * must not be handed a NULL path */
    if (!self->catalog_filename)
        return FALSE;

    file = fopen(self->catalog_filename, "w");
    if (!file) {
        return FALSE;
    }

    if (g_fprintf(file,"LABEL: %s\n", self->catalog_label) < 0)
        ok = FALSE;
    if (g_fprintf(file,"HEADER: %s\n", self->catalog_header) < 0)
        ok = FALSE;

    /* fclose() flushes; a failure here means the data did not reach disk */
    if (fclose(file) != 0)
        ok = FALSE;

    return ok;
}
/*
 * Replace the cached catalog entry and write it to disk.
 *
 * self->catalog_header becomes quote_string(HEADER) and self->catalog_label
 * a copy of LABEL; the previous values are released.  Returns the result of
 * write_catalog().
 */
static gboolean
catalog_reset(
    S3Device *self,
    char     *header,
    char     *label)
{
    char *quoted_header = quote_string(header);
    char *label_copy = g_strdup(label);

    g_free(self->catalog_header);
    self->catalog_header = quoted_header;

    g_free(self->catalog_label);
    self->catalog_label = label_copy;

    return write_catalog(self);
}
/*
 * Delete the catalog file (if a filename is known) and drop the cached
 * filename, label and header.  The result of unlink() is deliberately not
 * checked; the function always returns TRUE.
 */
static gboolean
catalog_remove(
    S3Device *self)
{
    /* catalog_filename is unset if the catalog was never opened; unlink()
     * must not be handed a NULL path */
    if (self->catalog_filename) {
        unlink(self->catalog_filename);
    }

    amfree(self->catalog_filename);
    amfree(self->catalog_label);
    amfree(self->catalog_header);
    return TRUE;
}
/*
 * Write the catalog out with write_catalog(), then drop the cached
 * filename, label and header regardless of the outcome.  Returns what
 * write_catalog() returned.
 */
static gboolean
catalog_close(
    S3Device *self)
{
    const gboolean written = write_catalog(self);

    amfree(self->catalog_filename);
    amfree(self->catalog_label);
    amfree(self->catalog_header);

    return written;
}
/*
 * Prepare the device for talking to the storage service.
 *
 * On every call the catalog is (re)opened and, if the storage API is still
 * S3_API_UNKNOWN, it is guessed from the host name.  The first time
 * through (self->s3t == NULL) the function additionally:
 *   - validates that the credentials required by the selected API are set,
 *   - allocates one S3_by_thread slot per self->nb_threads and opens an S3
 *     handle in each,
 *   - creates the delete/write/read thread pools,
 *   - applies verbose, SSL and speed-limit settings to every handle,
 *   - calls s3_open2() on every handle.
 *
 * Returns TRUE on success (or if the handles already exist), FALSE with a
 * device error set otherwise.
 *
 * NOTE(review): on a failure after the array was allocated, self->s3t
 * stays non-NULL, so a later call skips initialisation and returns TRUE --
 * confirm callers never retry after a failure.
 */
static gboolean
setup_handle(S3Device * self) {
Device *d_self = DEVICE(self);
int thread;
guint response_code;
s3_error_code_t s3_error_code;
CURLcode curl_code;
/* the result of catalog_open() is not checked here */
catalog_open(self);
/* No API selected yet: a host ending in ".amazonaws.com" (compared
 * case-insensitively, and longer than that 14-character suffix) selects
 * AWS4, anything else -- including no host at all -- plain S3. */
if (self->s3_api == S3_API_UNKNOWN) {
if (self->host && strlen(self->host) > 14 &&
g_strncasecmp(self->host+strlen(self->host)-14, ".amazonaws.com", 14) == 0) {
self->s3_api = S3_API_AWS4;
} else {
self->s3_api = S3_API_S3;
}
}
/* everything below runs only until the handle array exists */
if (self->s3t == NULL) {
/* per-API credential checks; APIs not listed here (e.g. AWS4) are not
 * validated at this point */
if (self->s3_api == S3_API_S3) {
if (self->access_key == NULL || self->access_key[0] == '\0') {
device_set_error(d_self,
g_strdup(_("No Amazon access key specified")),
DEVICE_STATUS_DEVICE_ERROR);
return FALSE;
}
if (self->secret_key == NULL || self->secret_key[0] == '\0') {
device_set_error(d_self,
g_strdup(_("No Amazon secret key specified")),
DEVICE_STATUS_DEVICE_ERROR);
return FALSE;
}
} else if (self->s3_api == S3_API_SWIFT_1) {
if (self->swift_account_id == NULL ||
self->swift_account_id[0] == '\0') {
device_set_error(d_self,
g_strdup(_("No Swift account id specified")),
DEVICE_STATUS_DEVICE_ERROR);
return FALSE;
}
if (self->swift_access_key == NULL ||
self->swift_access_key[0] == '\0') {
device_set_error(d_self,
g_strdup(_("No Swift access key specified")),
DEVICE_STATUS_DEVICE_ERROR);
return FALSE;
}
} else if (self->s3_api == S3_API_SWIFT_2) {
/* needs (username+password or access_key+secret_key) together with
 * (tenant_id or tenant_name) */
if (!((self->username && self->password && self->tenant_id) ||
(self->username && self->password && self->tenant_name) ||
(self->access_key && self->secret_key && self->tenant_id) ||
(self->access_key && self->secret_key && self->tenant_name))) {
device_set_error(d_self,
g_strdup(_("Missing authorization properties")),
DEVICE_STATUS_DEVICE_ERROR);
return FALSE;
}
} else if (self->s3_api == S3_API_SWIFT_3) {
if (!(self->username && self->password)) {
// self->project_name & self->domain_name have default value
device_set_error(d_self,
g_strdup(_("Missing authorization properties")),
DEVICE_STATUS_DEVICE_ERROR);
return FALSE;
}
} else if (self->s3_api == S3_API_OAUTH2) {
if (self->client_id == NULL ||
self->client_id[0] == '\0') {
device_set_error(d_self,
g_strdup(_("Missing client_id properties")),
DEVICE_STATUS_DEVICE_ERROR);
return FALSE;
}
if (self->client_secret == NULL ||
self->client_secret[0] == '\0') {
device_set_error(d_self,
g_strdup(_("Missing client_secret properties")),
DEVICE_STATUS_DEVICE_ERROR);
return FALSE;
}
if (self->refresh_token == NULL ||
self->refresh_token[0] == '\0') {
device_set_error(d_self,
g_strdup(_("Missing refresh_token properties")),
DEVICE_STATUS_DEVICE_ERROR);
return FALSE;
}
if (self->project_id == NULL ||
self->project_id[0] == '\0') {
device_set_error(d_self,
g_strdup(_("Missing project_id properties")),
DEVICE_STATUS_DEVICE_ERROR);
return FALSE;
}
} else if (self->s3_api == S3_API_CASTOR) {
/* CASTOR: force multi-delete and subdomain off and discard any
 * configured service path */
self->use_s3_multi_delete = 0;
self->use_subdomain = FALSE;
if(self->service_path) {
g_free(self->service_path);
self->service_path = NULL;
}
}
/* one zero-initialised slot per thread */
self->s3t = g_new0(S3_by_thread, self->nb_threads);
if (self->s3t == NULL) {
device_set_error(d_self,
g_strdup(_("Can't allocate S3Handle array")),
DEVICE_STATUS_DEVICE_ERROR);
return FALSE;
}
self->thread_idle_cond = g_cond_new();
self->thread_idle_mutex = g_mutex_new();
/* initialise every slot and open its S3 handle */
for (thread = 0; thread < self->nb_threads; thread++) {
self->s3t[thread].idle = 1;
self->s3t[thread].done = 1;
self->s3t[thread].eof = FALSE;
self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
self->s3t[thread].errmsg = NULL;
self->s3t[thread].filename = NULL;
self->s3t[thread].curl_buffer.buffer = NULL;
self->s3t[thread].curl_buffer.buffer_len = 0;
self->s3t[thread].timeout = 0;
self->s3t[thread].now_mutex = g_mutex_new();
self->s3t[thread].s3 = s3_open(self->access_key, self->secret_key,
self->session_token,
self->swift_account_id,
self->swift_access_key,
self->host, self->service_path,
self->use_subdomain,
self->user_token, self->bucket_location,
self->storage_class, self->ca_info,
self->server_side_encryption,
self->proxy,
self->s3_api,
self->username,
self->password,
self->tenant_id,
self->tenant_name,
self->project_name,
self->domain_name,
self->client_id,
self->client_secret,
self->refresh_token,
self->reuse_connection,
self->read_from_glacier,
self->timeout,
self->reps, self->reps_bucket);
if (self->s3t[thread].s3 == NULL) {
device_set_error(d_self,
g_strdup(_("Internal error creating S3 handle")),
DEVICE_STATUS_DEVICE_ERROR);
/* shrink nb_threads to the slots initialised so far -- presumably so
 * that later cleanup only touches those; verify against finalize */
self->nb_threads = thread+1;
return FALSE;
}
}
g_debug("Create %d threads", self->nb_threads);
/* three pools (delete, write, read), each with self as user data and a
 * maximum of nb_threads threads */
self->thread_pool_delete = g_thread_pool_new(s3_thread_delete_block,
self, self->nb_threads, 0,
NULL);
self->thread_pool_write = g_thread_pool_new(s3_thread_write_block, self,
self->nb_threads, 0, NULL);
self->thread_pool_read = g_thread_pool_new(s3_thread_read_block, self,
self->nb_threads, 0, NULL);
/* apply verbose, SSL and speed-limit settings to every handle; the
 * minimum-speed checks mirror those in the property setters */
for (thread = 0; thread < self->nb_threads; thread++) {
s3_verbose(self->s3t[thread].s3, self->verbose);
if (!s3_use_ssl(self->s3t[thread].s3, self->use_ssl)) {
device_set_error(d_self, g_strdup_printf(_(
"Error setting S3 SSL/TLS use "
"(tried to enable SSL/TLS for S3, but curl doesn't support it?)")),
DEVICE_STATUS_DEVICE_ERROR);
return FALSE;
}
if (self->max_send_speed && self->max_send_speed < 5120) {
device_set_error(d_self,
g_strdup("MAX-SEND-SPEED property is too low (minimum value is 5120)"),
DEVICE_STATUS_DEVICE_ERROR);
return FALSE;
}
if (self->max_send_speed &&
!s3_set_max_send_speed(self->s3t[thread].s3,
self->max_send_speed)) {
device_set_error(d_self,
g_strdup("Could not set S3 maximum send speed"),
DEVICE_STATUS_DEVICE_ERROR);
return FALSE;
}
if (self->max_recv_speed && self->max_recv_speed < 5120) {
device_set_error(d_self,
g_strdup("MAX-RECV-SPEED property is too low (minimum value is 5120)"),
DEVICE_STATUS_DEVICE_ERROR);
return FALSE;
}
if (self->max_recv_speed &&
!s3_set_max_recv_speed(self->s3t[thread].s3,
self->max_recv_speed)) {
device_set_error(d_self,
g_strdup("Could not set S3 maximum recv speed"),
DEVICE_STATUS_DEVICE_ERROR);
return FALSE;
}
}
/* second-stage open of every handle */
for (thread = 0; thread < self->nb_threads; thread++) {
if (!s3_open2(self->s3t[thread].s3)) {
if (self->s3_api == S3_API_SWIFT_1 ||
self->s3_api == S3_API_SWIFT_2 ||
self->s3_api == S3_API_SWIFT_3) {
/* NOTE(review): the error is read from handle 0 even when a later
 * handle failed -- confirm this is intended */
s3_error(self->s3t[0].s3, NULL, &response_code,
&s3_error_code, NULL, &curl_code, NULL);
device_set_error(d_self,
g_strdup_printf(_("s3_open2 failed: %s"),
s3_strerror(self->s3t[0].s3)),
DEVICE_STATUS_DEVICE_ERROR);
/* NOTE(review): every slot was fully initialised by this point, so
 * shrinking nb_threads here hides the remaining slots from any later
 * nb_threads-bounded loop -- confirm */
self->nb_threads = thread+1;
return FALSE;
} else {
device_set_error(d_self,
g_strdup("s3_open2 failed"),
DEVICE_STATUS_DEVICE_ERROR);
return FALSE;
}
}
}
}
return TRUE;
}
/*
 * Make sure the bucket exists, creating it if allowed.
 *
 * Does nothing if the bucket was already verified (self->bucket_made).
 * Otherwise the bucket is probed with the first S3 handle:
 *   - if it exists, it is accepted;
 *   - if the probe failed with no HTTP/S3 code and a curl connect or
 *     host-resolution error, a connection error is reported;
 *   - if bucket creation is disabled, a "Can't list bucket" error is
 *     reported;
 *   - otherwise the bucket is created; a 409 response with
 *     BucketAlreadyExists / BucketAlreadyOwnedByYou is tolerated.
 * On success the bucket is marked as made and abort_partial_upload() is
 * called.
 *
 * Returns TRUE on success, FALSE with a device error set otherwise.
 */
static gboolean
make_bucket(
    Device * pself)
{
    S3Device *s3dev = S3_DEVICE(pself);
    guint http_status;
    s3_error_code_t err_code;
    CURLcode curl_rc;

    if (s3dev->bucket_made) {
        return TRUE;
    }

    if (!s3_is_bucket_exists(s3dev->s3t[0].s3, s3dev->bucket, s3dev->prefix, s3dev->project_id)) {
        s3_error(s3dev->s3t[0].s3, NULL, &http_status, &err_code, NULL, &curl_rc, NULL);

        /* could not even reach the service */
        if (http_status == 0 && err_code == 0 &&
            (curl_rc == CURLE_COULDNT_CONNECT ||
             curl_rc == CURLE_COULDNT_RESOLVE_HOST)) {
            device_set_error(pself,
                g_strdup_printf(_("While connecting to %s bucket: %s"),
                                S3_name[s3dev->s3_api], s3_strerror(s3dev->s3t[0].s3)),
                DEVICE_STATUS_DEVICE_ERROR);
            return FALSE;
        }

        if (!s3dev->create_bucket) {
            device_set_error(pself,
                g_strdup_printf(_("Can't list bucket: %s"),
                                s3_strerror(s3dev->s3t[0].s3)),
                DEVICE_STATUS_DEVICE_ERROR);
            return FALSE;
        }

        if (!s3_make_bucket(s3dev->s3t[0].s3, s3dev->bucket, s3dev->project_id)) {
            s3_error(s3dev->s3t[0].s3, NULL, &http_status, &err_code, NULL, NULL, NULL);

            /* only "bucket already exists" style conflicts are acceptable */
            if (!(http_status == 409 &&
                  (err_code == S3_ERROR_BucketAlreadyExists ||
                   err_code == S3_ERROR_BucketAlreadyOwnedByYou))) {
                device_set_error(pself,
                    g_strdup_printf(_("While creating new S3 bucket: %s"), s3_strerror(s3dev->s3t[0].s3)),
                    DEVICE_STATUS_DEVICE_ERROR);
                return FALSE;
            }
        }
    }

    s3dev->bucket_made = TRUE;
    abort_partial_upload(s3dev);
    return TRUE;
}
/*
 * Transfer progress callback; THREAD_DATA is the S3_by_thread of the
 * transfer.
 *
 * Records the current download/upload byte counts in the slot.  Whenever
 * either count changed and a deadline is armed (timeout > 0), the deadline
 * is pushed to 300 seconds from now.  Returns -1 once an armed deadline has
 * passed, 0 otherwise.  Calls where both counts are below 1 are ignored.
 */
static int progress_func(
    void *thread_data,
    double dltotal G_GNUC_UNUSED,
    double dlnow,
    double ultotal G_GNUC_UNUSED,
    double ulnow)
{
    S3_by_thread *slot = (S3_by_thread *)thread_data;
    time_t now = time(NULL);
    int ret = 0;
    gboolean progressed = FALSE;
    guint64 downloaded;
    guint64 uploaded;

    /* nothing transferred yet in either direction */
    if (dlnow < 1 && ulnow < 1) {
        return 0;
    }

    g_mutex_lock(slot->now_mutex);

    downloaded = dlnow;
    if (slot->dlnow != downloaded) {
        slot->dlnow = downloaded;
        progressed = TRUE;
    }

    uploaded = ulnow;
    if (slot->ulnow != uploaded) {
        slot->ulnow = uploaded;
        progressed = TRUE;
    }

    /* any movement re-arms the deadline, but only if one is armed */
    if (progressed && slot->timeout > 0) {
        slot->timeout = now + 300;
    }

    if (slot->timeout > 0 && now > slot->timeout) {
        g_debug("progress_func timeout");
        ret = -1;
    }

    g_mutex_unlock(slot->now_mutex);

    return ret;
}
/*
 * read_label override.
 *
 * Clears the cached volume label/time/header, then obtains the volume
 * header either from the local catalog (when both a label and a header are
 * cached there) or by downloading the "tapestart" object with the first S3
 * handle.  A header fetched from S3 is written back into the catalog
 * whenever the catalog is missing its label or its header.
 *
 * On success pself->volume_label, volume_time and volume_header are set and
 * the device status is DEVICE_STATUS_SUCCESS.  On failure the device error
 * is set; a 404 on the tapestart object is reported as an unlabeled volume.
 *
 * Returns pself->status in all cases.
 */
static DeviceStatusFlags
s3_device_read_label(Device *pself) {
    S3Device *self = S3_DEVICE(pself);
    char *key;
    CurlBuffer buf = {NULL, 0, 0, S3_DEVICE_MAX_BLOCK_SIZE, TRUE, NULL, NULL};
    dumpfile_t *amanda_header;
    gboolean result;

    /* note that this may be called from s3_device_start, when
     * self->access_mode is not ACCESS_NULL */

    amfree(pself->volume_label);
    amfree(pself->volume_time);
    dumpfile_free(pself->volume_header);
    pself->volume_header = NULL;

    if (device_in_error(self)) return pself->status;

    if (!setup_handle(self)) {
        /* setup_handle already set our error message */
        return pself->status;
    }
    reset_thread(self);

    if (self->catalog_label && self->catalog_header) {
        /* the catalog is complete: rebuild the header from it without
         * contacting the service */
        char *header_buf;

        header_buf = unquote_string(self->catalog_header);
        amanda_header = g_new(dumpfile_t, 1);
        fh_init(amanda_header);
        if (strlen(header_buf) > 0) {
            parse_file_header(header_buf, amanda_header, strlen(header_buf));
        }
        pself->header_block_size = strlen(header_buf);
        g_free(header_buf);
        pself->volume_header = amanda_header;
    } else {
        if (!make_bucket(pself)) {
            return pself->status;
        }

        key = special_file_to_key(self, "tapestart", -1);

        s3_device_init_seek_file(pself, 0);
        result = s3_read(self->s3t[0].s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS,
                         &buf, NULL, NULL);
        g_free(key);

        if (!result) {
            guint response_code;
            s3_error_code_t s3_error_code;

            s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code,
                     NULL, NULL, NULL);
            g_free(buf.buffer);

            /* if it's an expected error (not found), just return FALSE */
            if (response_code == 404 &&
                (s3_error_code == S3_ERROR_None ||
                 s3_error_code == S3_ERROR_NotFound ||
                 s3_error_code == S3_ERROR_Unknown ||
                 s3_error_code == S3_ERROR_NoSuchKey ||
                 s3_error_code == S3_ERROR_NoSuchEntity ||
                 s3_error_code == S3_ERROR_NoSuchBucket)) {
                g_debug(_("Amanda header not found while reading tapestart header (this is expected for empty tapes)"));
                device_set_error(pself,
                    g_strdup(_("Amanda header not found -- unlabeled volume?")),
                      DEVICE_STATUS_DEVICE_ERROR
                    | DEVICE_STATUS_VOLUME_ERROR
                    | DEVICE_STATUS_VOLUME_UNLABELED);
                return pself->status;
            }

            /* otherwise, log it and return */
            device_set_error(pself,
                g_strdup_printf(_("While trying to read tapestart header: %s"),
                                s3_strerror(self->s3t[0].s3)),
                DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
            return pself->status;
        }

        /* handle an empty file gracefully */
        if (buf.buffer_len == 0) {
            device_set_error(pself, g_strdup(_("Empty header file")),
                             DEVICE_STATUS_VOLUME_ERROR);
            g_free(buf.buffer);
            return pself->status;
        }

        pself->header_block_size = buf.buffer_len;
        g_assert(buf.buffer != NULL);
        amanda_header = g_new(dumpfile_t, 1);
        parse_file_header(buf.buffer, amanda_header, buf.buffer_pos);
        pself->volume_header = amanda_header;
        g_free(buf.buffer);

        if (amanda_header->type != F_TAPESTART) {
            device_set_error(pself, g_strdup(_("Invalid amanda header")),
                             DEVICE_STATUS_VOLUME_ERROR);
            return pself->status;
        }

        /* Refresh the catalog whenever it lacks the label OR the header.
         * The test used to read "!label || header", which skipped the
         * refresh for a catalog holding a label but no header. */
        if (!self->catalog_label || !self->catalog_header) {
            size_t header_size = 0;
            char *header_text;

            header_text = device_build_amanda_header(DEVICE(self),
                                                     amanda_header,
                                                     &header_size);
            catalog_reset(self, header_text, amanda_header->name);
            g_free(header_text);
        }
    }

    pself->volume_label = g_strdup(amanda_header->name);
    pself->volume_time = g_strdup(amanda_header->datestamp);
    /* pself->volume_header is already set */

    device_set_error(pself, NULL, DEVICE_STATUS_SUCCESS);

    return pself->status;
}
/*
 * start override: prepare the device for MODE.
 *
 * Sets up the S3 handles, records the access mode, clears in_file and makes
 * sure the bucket exists.  Then, depending on MODE:
 *   - ACCESS_READ:   reads the label if none is cached;
 *   - ACCESS_WRITE:  deletes all existing files, writes a new header with
 *                    LABEL/TIMESTAMP and caches both;
 *   - ACCESS_APPEND: reads the label if none is cached, lists the keys
 *                    under the prefix to obtain the volume size, then seeks
 *                    to the end of the volume.
 *
 * Returns TRUE on success, FALSE otherwise (the failing step sets the
 * device error where the code shows it doing so).
 */
static gboolean
s3_device_start (Device * pself, DeviceAccessMode mode,
                 char * label, char * timestamp) {
    S3Device *s3dev = S3_DEVICE(pself);
    GSList *key_list;
    guint64 listed_bytes = 0;

    if (device_in_error(s3dev)) return FALSE;

    if (!setup_handle(s3dev)) {
        /* setup_handle already set our error message */
        return FALSE;
    }

    reset_thread(s3dev);
    pself->access_mode = mode;

    g_mutex_lock(pself->device_mutex);
    pself->in_file = FALSE;
    g_mutex_unlock(pself->device_mutex);

    /* try creating the bucket, in case it doesn't exist */
    if (!make_bucket(pself)) {
        return FALSE;
    }

    /* take care of any dirty work for this mode */
    switch (mode) {
    case ACCESS_READ:
        if (pself->volume_label == NULL &&
            s3_device_read_label(pself) != DEVICE_STATUS_SUCCESS) {
            /* s3_device_read_label already set our error message */
            return FALSE;
        }
        break;

    case ACCESS_WRITE:
        s3_device_set_reuse(pself);
        if (!delete_all_files(s3dev)) {
            return FALSE;
        }

        /* write a new amanda header */
        if (!write_amanda_header(s3dev, label, timestamp)) {
            return FALSE;
        }

        g_free(pself->volume_label);
        pself->volume_label = g_strdup(label);
        g_free(pself->volume_time);
        pself->volume_time = g_strdup(timestamp);

        /* unset the VOLUME_UNLABELED flag, if it was set */
        device_set_error(pself, NULL, DEVICE_STATUS_SUCCESS);
        break;

    case ACCESS_APPEND:
        if (pself->volume_label == NULL &&
            s3_device_read_label(pself) != DEVICE_STATUS_SUCCESS) {
            /* s3_device_read_label already set our error message */
            return FALSE;
        }

        /* NOTE(review): the key list returned here is never released in
         * this function -- confirm who owns it */
        if (!s3_list_keys(s3dev->s3t[0].s3, s3dev->bucket, NULL, s3dev->prefix, NULL, &key_list, &listed_bytes)) {
            device_set_error(pself,
                g_strdup_printf(_("While listing S3 keys: %s"), s3_strerror(s3dev->s3t[0].s3)),
                DEVICE_STATUS_DEVICE_ERROR|DEVICE_STATUS_VOLUME_ERROR);
            return FALSE;
        }
        s3dev->volume_bytes = listed_bytes;

        return seek_to_end(s3dev);

    case ACCESS_NULL:
        g_assert_not_reached();
    }

    return TRUE;
}
gint gint_cmp(gconstpointer a, gconstpointer b, gpointer data);

/*
 * Three-way comparison of two integers that were packed into pointers
 * (GPOINTER_TO_INT).  Returns -1, 0 or 1 when A is less than, equal to or
 * greater than B.  DATA is ignored.
 */
gint
gint_cmp(
    gconstpointer a,
    gconstpointer b,
    gpointer data G_GNUC_UNUSED)
{
    const gint lhs = GPOINTER_TO_INT(a);
    const gint rhs = GPOINTER_TO_INT(b);

    /* (lhs > rhs) and (lhs < rhs) are each 0 or 1; at most one of them is 1 */
    return (lhs > rhs) - (lhs < rhs);
}
/*
 * finish override: reset the worker threads, put the device back into
 * ACCESS_NULL mode and report whether the device is free of errors.
 */
static gboolean
s3_device_finish (
    Device * pself)
{
    S3Device *s3dev = S3_DEVICE(pself);

    reset_thread(s3dev);

    /* we're not in a file anymore */
    pself->access_mode = ACCESS_NULL;

    return device_in_error(pself) ? FALSE : TRUE;
}
/* functions for writing */
/*
 * Return the number of bytes read so far: self->dltotal plus the current
 * dlnow counter of each of the first nb_threads_recovery thread slots.
 *
 * device_mutex is released for the duration of the computation and
 * re-acquired before returning -- presumably the caller holds it on entry;
 * verify against the callers.
 */
static guint64
s3_device_get_bytes_read(
    Device * pself)
{
    S3Device *s3dev = S3_DEVICE(pself);
    guint64 total;
    int i;

    g_mutex_unlock(pself->device_mutex);

    /* Add per thread */
    g_mutex_lock(s3dev->thread_idle_mutex);
    total = s3dev->dltotal;
    for (i = 0; i < s3dev->nb_threads_recovery; i++) {
        S3_by_thread *slot = &s3dev->s3t[i];

        g_mutex_lock(slot->now_mutex);
        total += slot->dlnow;
        g_mutex_unlock(slot->now_mutex);
    }
    g_mutex_unlock(s3dev->thread_idle_mutex);

    g_mutex_lock(pself->device_mutex);

    return total;
}
/*
 * Return the number of bytes written so far: self->ultotal plus the current
 * ulnow counter of each of the first nb_threads_backup thread slots.
 *
 * device_mutex is released for the duration of the computation and
 * re-acquired before returning -- presumably the caller holds it on entry;
 * verify against the callers.
 */
static guint64
s3_device_get_bytes_written(
    Device * pself)
{
    S3Device *s3dev = S3_DEVICE(pself);
    guint64 total;
    int i;

    g_mutex_unlock(pself->device_mutex);

    /* Add per thread */
    g_mutex_lock(s3dev->thread_idle_mutex);
    total = s3dev->ultotal;
    for (i = 0; i < s3dev->nb_threads_backup; i++) {
        S3_by_thread *slot = &s3dev->s3t[i];

        g_mutex_lock(slot->now_mutex);
        total += slot->ulnow;
        g_mutex_unlock(slot->now_mutex);
    }
    g_mutex_unlock(s3dev->thread_idle_mutex);

    g_mutex_lock(pself->device_mutex);

    return total;
}
/*
 * Begin a new file on the volume.
 *
 * Builds the Amanda header for jobInfo, uploads it under the "filestart"
 * special key of the new file number, resets the per-file counters and,
 * depending on the device mode, prepares the key (and multi-part upload
 * state) that subsequent blocks are written to.
 *
 * Returns TRUE on success; on failure the device error is set and FALSE
 * is returned.
 */
static gboolean
s3_device_start_file (Device *pself, dumpfile_t *jobInfo) {
    S3Device *self = S3_DEVICE(pself);
    CurlBuffer amanda_header = {NULL, 0, 0, 0, TRUE, NULL, NULL};
    size_t header_size = 0; /* no minimum size */
    gboolean uploaded;
    char *header_key;
    int i;

    if (device_in_error(self)) return FALSE;

    reset_thread(self);
    pself->is_eom = FALSE;

    /* Set the blocksize to zero, since there's no header to skip (it's stored
     * in a distinct file, rather than block zero) */
    jobInfo->blocksize = 0;

    /* Build the amanda header. */
    amanda_header.buffer = device_build_amanda_header(pself, jobInfo,
                                                      &header_size);
    if (amanda_header.buffer == NULL) {
        device_set_error(pself,
            g_strdup(_("Amanda file header won't fit in a single block!")),
            DEVICE_STATUS_DEVICE_ERROR);
        return FALSE;
    }
    amanda_header.buffer_len = header_size;

    /* logical end-of-medium only raises the flag ... */
    if (check_at_leom(self, header_size)) {
        pself->is_eom = TRUE;
    }

    /* ... physical end-of-medium also fails the call */
    if (check_at_peom(self, header_size)) {
        pself->is_eom = TRUE;
        device_set_error(pself,
            g_strdup(_("No space left on device")),
            DEVICE_STATUS_DEVICE_ERROR);
        g_free(amanda_header.buffer);
        return FALSE;
    }

    for (i = 0; i < self->nb_threads; i++) {
        self->s3t[i].idle = 1;
        self->s3t[i].ulnow = 0;
    }

    /* set the file and block numbers correctly */
    if (pself->file > 0) {
        pself->file = pself->file + 1;
    } else {
        pself->file = 1;
    }
    pself->block = 0;

    g_mutex_lock(pself->device_mutex);
    pself->in_file = TRUE;
    pself->bytes_written = 0;
    g_mutex_unlock(pself->device_mutex);

    g_mutex_lock(self->thread_idle_mutex);
    self->ultotal = 0;
    g_mutex_unlock(self->thread_idle_mutex);

    /* write it out as a special block (not the 0th) */
    header_key = special_file_to_key(self, "filestart", pself->file);
    uploaded = s3_upload(self->s3t[0].s3, self->bucket, header_key, FALSE,
                         S3_BUFFER_READ_FUNCS,
                         &amanda_header, NULL, NULL);
    g_free(amanda_header.buffer);
    g_free(header_key);
    if (!uploaded) {
        device_set_error(pself,
            g_strdup_printf(_("While writing filestart header: %s"), s3_strerror(self->s3t[0].s3)),
            DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
        return FALSE;
    }

    self->volume_bytes += header_size;

    /* chunked and multi-part uploads both write to a single per-file key;
     * only the (non-chunked) multi-part mode needs an upload id and the
     * tree collecting the etag of each part */
    if (self->chunked || self->use_s3_multi_part_upload) {
        self->filename = file_to_multi_part_key(self, pself->file);
        if (!self->chunked) {
            self->uploadId = g_strdup(s3_initiate_multi_part_upload(self->s3t[0].s3,
                                          self->bucket, self->filename));
            self->part_etag = g_tree_new_full(gint_cmp, NULL, NULL, g_free);
        }
    }

    return TRUE;
}
/*
 * Write one block of `size` bytes from `data` to the current file.
 *
 * Non-chunked mode: waits for an idle backup thread, copies the data into
 * that thread's buffer and pushes the thread onto the write pool.
 * Chunked mode: the first block (block 0) starts the single upload on
 * thread 0 with a buffer of size*2+1 bytes; later blocks are appended to
 * that buffer, waiting on the buffer's condition until there is room.
 *
 * Returns WRITE_SUCCEED when the block was queued or buffered, and
 * WRITE_FAILED (with the device error set) when the device is in error,
 * the volume limit would be exceeded, a worker reported an error, or the
 * buffer could not be allocated.
 *
 * The key computed for the block is owned by this function until it is
 * handed to the worker (s3t->filename); it is released on every path that
 * returns without handing it over.
 */
static DeviceWriteResult
s3_device_write_block (Device * pself, guint size, gpointer data) {
    char *filename;
    S3Device * self = S3_DEVICE(pself);
    int idle_thread = 0;
    int thread = -1;
    int first_idle = -1;
    guint allocate;

    g_assert (self != NULL);
    g_assert (data != NULL);
    if (device_in_error(self)) return WRITE_FAILED;

    if(check_at_leom(self, size))
        pself->is_eom = TRUE;

    if(check_at_peom(self, size)) {
        pself->is_eom = TRUE;
        device_set_error(pself,
            g_strdup(_("No space left on device")),
            DEVICE_STATUS_DEVICE_ERROR);
        return WRITE_FAILED;
    }

    if (self->use_s3_multi_part_upload && self->uploadId) {
        filename = g_strdup(self->filename);
    } else if (self->chunked) {
        filename = g_strdup(self->filename);
    } else {
        filename = file_and_block_to_key(self, pself->file, pself->block);
    }

    g_mutex_lock(self->thread_idle_mutex);
    if (self->chunked) {
        thread = 0;
        if (pself->block == 0) {
            allocate = size*2 + 1;
        } else {
            CurlBuffer *buf = &self->s3t[thread].curl_buffer;

            /* The key was handed to the worker with block 0; this copy is
             * never used on this path, so release it instead of leaking one
             * string per block. */
            g_free(filename);
            filename = NULL;

            g_mutex_lock(buf->mutex);
            // Check for enough space in the buffer
            while (1) {
                guint avail;
                if (buf->buffer_len > buf->buffer_pos) {
                    avail = buf->buffer_pos + buf->max_buffer_size - buf->buffer_len;
                } else {
                    avail = buf->buffer_pos - buf->buffer_len;
                }
                if (avail > size) {
                    break;
                }
                g_cond_wait(buf->cond, buf->mutex);
            }

            // Copy the new data to the buffer
            if (buf->buffer_len > buf->buffer_pos) {
                /* the free space may wrap: fill up to the end of the
                 * buffer first, then continue at its beginning */
                guint count_end = buf->max_buffer_size - buf->buffer_len;
                guint count_begin;
                if (count_end > size)
                    count_end = size;
                memcpy(buf->buffer + buf->buffer_len, data, count_end);
                buf->buffer_len += count_end;
                count_begin = size - count_end;
                if (count_begin > 0) {
                    /* cast: arithmetic on a void pointer is not standard C */
                    memcpy(buf->buffer, (char *)data + count_end, count_begin);
                    buf->buffer_len = count_begin;
                }
            } else {
                memcpy(buf->buffer + buf->buffer_len, data, size);
                buf->buffer_len += size;
            }

            /* wake the reader side of the buffer */
            g_cond_broadcast(buf->cond);
            g_mutex_unlock(buf->mutex);

            pself->block++;
            self->volume_bytes += size;
            g_mutex_unlock(self->thread_idle_mutex);
            return WRITE_SUCCEED;
        }
    } else {
        /* wait until at least one backup thread is idle */
        while (!idle_thread) {
            idle_thread = 0;
            for (thread = 0; thread < self->nb_threads_backup; thread++) {
                if (self->s3t[thread].idle == 1) {
                    idle_thread++;
                    /* Check if the thread is in error */
                    if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
                        /* the message is handed over to device_set_error */
                        device_set_error(pself, (char *)self->s3t[thread].errmsg,
                                         self->s3t[thread].errflags);
                        self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
                        self->s3t[thread].errmsg = NULL;
                        g_mutex_unlock(self->thread_idle_mutex);
                        g_free(filename);
                        return WRITE_FAILED;
                    }
                    if (first_idle == -1) {
                        first_idle = thread;
                        break;
                    }
                }
            }
            if (!idle_thread) {
                g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
            }
        }
        thread = first_idle;
        allocate = size;
    }

    /* drop a buffer that is too small for this block */
    if (self->s3t[thread].curl_buffer.buffer &&
        self->s3t[thread].curl_buffer.buffer_len < allocate) {
        g_free((char *)self->s3t[thread].curl_buffer.buffer);
        self->s3t[thread].curl_buffer.buffer = NULL;
        self->s3t[thread].curl_buffer.buffer_len = 0;
        self->s3t[thread].buffer_len = 0;
    }
    if (self->s3t[thread].curl_buffer.buffer == NULL) {
        self->s3t[thread].curl_buffer.buffer = g_try_malloc(allocate);
        if (self->s3t[thread].curl_buffer.buffer == NULL) {
            device_set_error(pself, g_strdup("Failed to allocate memory"),
                             DEVICE_STATUS_DEVICE_ERROR);
            g_mutex_unlock(self->thread_idle_mutex);
            g_free(filename);
            return WRITE_FAILED;
        }
        self->s3t[thread].curl_buffer.buffer_len = size;
        self->s3t[thread].buffer_len = size;
    }

    self->s3t[thread].idle = 0;
    self->s3t[thread].done = 0;
    memcpy((char *)self->s3t[thread].curl_buffer.buffer, data, size);
    self->s3t[thread].curl_buffer.buffer_pos = 0;
    self->s3t[thread].curl_buffer.buffer_len = size;
    self->s3t[thread].curl_buffer.max_buffer_size = allocate;
    if (self->chunked) {
        /* more data will follow: the buffer needs its own lock/condition */
        self->s3t[thread].curl_buffer.end_of_buffer = FALSE;
        self->s3t[thread].curl_buffer.mutex = g_mutex_new();
        self->s3t[thread].curl_buffer.cond = g_cond_new();
    } else {
        self->s3t[thread].curl_buffer.end_of_buffer = TRUE;
        self->s3t[thread].curl_buffer.mutex = NULL;
        self->s3t[thread].curl_buffer.cond = NULL;
    }

    /* ownership of filename moves to the worker, which frees it */
    self->s3t[thread].filename = filename;
    self->s3t[thread].uploadId = g_strdup(self->uploadId);
    self->s3t[thread].partNumber = pself->block + 1;
    g_mutex_unlock(self->thread_idle_mutex);

    g_thread_pool_push(self->thread_pool_write, &self->s3t[thread], NULL);

    pself->block++;
    self->volume_bytes += size;
    return WRITE_SUCCEED;
}
/*
 * Thread-pool worker that uploads the buffer prepared by
 * s3_device_write_block.
 *
 * thread_data is the S3_by_thread slot to upload, data is the Device.
 * When the slot carries an uploadId the buffer is sent as one part of a
 * multi-part upload and the returned etag is recorded in self->part_etag
 * under the part number; otherwise it is sent with a plain s3_upload.
 *
 * The slot's filename and uploadId are consumed (freed) here.  The result
 * (error state, idle/done flags, byte counters) is published under
 * thread_idle_mutex, and thread_idle_cond is broadcast afterwards.
 */
static void
s3_thread_write_block(
    gpointer thread_data,
    gpointer data)
{
    S3_by_thread *s3t = (S3_by_thread *)thread_data;
    Device *pself = (Device *)data;
    S3Device *self = S3_DEVICE(pself);
    gboolean result;
    char *etag = NULL;
    char *errmsg = NULL;

    /* publish a deadline 300 seconds from now for the duration of the
     * transfer; it is cleared again once the request returns */
    g_mutex_lock(s3t->now_mutex);
    s3t->timeout = time(NULL) + 300;
    g_mutex_unlock(s3t->now_mutex);

    if (s3t->uploadId) {
        result = s3_part_upload(s3t->s3, self->bucket, (char *)s3t->filename,
                                (char *)s3t->uploadId, s3t->partNumber, &etag,
                                S3_BUFFER_READ_FUNCS,
                                (CurlBuffer *)&s3t->curl_buffer,
                                progress_func, s3t);
    } else {
        result = s3_upload(s3t->s3, self->bucket, (char *)s3t->filename,
                           self->chunked,
                           S3_BUFFER_READ_FUNCS,
                           (CurlBuffer *)&s3t->curl_buffer,
                           progress_func, s3t);
    }

    g_mutex_lock(s3t->now_mutex);
    s3t->timeout = 0;
    g_mutex_unlock(s3t->now_mutex);

    g_free((void *)s3t->filename);
    g_free((void *)s3t->uploadId);
    s3t->filename = NULL;
    /* do not leave a dangling pointer behind */
    s3t->uploadId = NULL;

    if (!result) {
        errmsg = g_strdup_printf(_("While writing data block to %s: %s"), S3_name[self->s3_api], s3_strerror(s3t->s3));
    }

    g_mutex_lock(self->thread_idle_mutex);
    if (!result) {
        /* The error state is read by other threads while they hold
         * thread_idle_mutex, so flags and message are stored together
         * under that same lock. */
        s3t->errflags = DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR;
        s3t->errmsg = errmsg;
    }
    if (result && self->uploadId && etag) {
        /* the tree takes ownership of etag */
        g_tree_insert(self->part_etag, GINT_TO_POINTER(s3t->partNumber), etag);
    } else {
        g_free(etag);
    }
    s3t->idle = 1;
    s3t->done = 1;
    if (result)
        self->ultotal += s3t->curl_buffer.buffer_len;
    /* restore the recorded buffer size (buffer_len held the data size) */
    s3t->curl_buffer.buffer_len = s3t->buffer_len;
    s3t->ulnow = 0;
    g_cond_broadcast(self->thread_idle_cond);
    g_mutex_unlock(self->thread_idle_mutex);
}
gboolean add_part_etag(gpointer key, gpointer value, gpointer data);

/*
 * Tree traversal callback: append one <Part> element to the GString passed
 * as data.  key is the part number packed into a pointer, value is the
 * etag string of that part.  Always returns FALSE.
 */
gboolean
add_part_etag(
    gpointer key,
    gpointer value,
    gpointer data)
{
    GString *xml = (GString *)data;
    const char *part_etag = (char *)value;
    int part_number = GPOINTER_TO_INT(key);

    g_string_append_printf(xml,
        " <Part>\n <PartNumber>%d</PartNumber>\n <ETag>%s</ETag>\n </Part>\n",
        part_number, part_etag);

    return FALSE;
}
/*
 * Finish the file currently being written.
 *
 * Signals end-of-data to a chunked upload, waits until every thread is
 * idle (collecting any error a thread reported), completes a pending
 * multi-part upload, releases the per-file upload state and clears
 * in_file.
 *
 * Returns TRUE immediately when no file is open.  Otherwise returns TRUE
 * when the device status is DEVICE_STATUS_SUCCESS and FALSE if not.
 */
static gboolean
s3_device_finish_file (Device * pself) {
    S3Device *self = S3_DEVICE(pself);

    /* Check all threads are done */
    int idle_thread = 0;
    int thread;

    if (!pself->in_file)
        return TRUE;

    if (self->chunked) {
        CurlBuffer *buf = &self->s3t[0].curl_buffer;

        /* mutex/cond are only created when the first block of the file is
         * written; a file without any block has nothing to signal */
        if (buf->mutex) {
            g_mutex_lock(buf->mutex);
            buf->end_of_buffer = TRUE;
            g_cond_broadcast(buf->cond);
            g_mutex_unlock(buf->mutex);
        }
    }

    g_mutex_lock(self->thread_idle_mutex);
    while (idle_thread != self->nb_threads) {
        idle_thread = 0;
        for (thread = 0; thread < self->nb_threads; thread++) {
            if (self->s3t[thread].idle == 1) {
                idle_thread++;
            }
            /* check thread status */
            if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
                /* the message is handed over to device_set_error */
                device_set_error(pself, (char *)self->s3t[thread].errmsg,
                                 self->s3t[thread].errflags);
                self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
                self->s3t[thread].errmsg = NULL;
            }
        }
        if (idle_thread != self->nb_threads) {
            g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
        }
    }
    self->ultotal = 0;
    g_mutex_unlock(self->thread_idle_mutex);

    if (self->use_s3_multi_part_upload && self->uploadId) {
        CurlBuffer data;
        GString *buf = g_string_new("<CompleteMultipartUpload>\n");

        /* one <Part> element per uploaded part, in part-number order */
        g_tree_foreach(self->part_etag, add_part_etag, buf);
        g_string_append_printf(buf, "</CompleteMultipartUpload>\n");

        data.buffer = buf->str;
        data.buffer_len = strlen(buf->str);
        data.buffer_pos = 0;
        data.max_buffer_size = data.buffer_len;
        data.end_of_buffer = FALSE;
        data.mutex = NULL;
        data.cond = NULL;

        /* NOTE(review): the outcome of the completion request is not
         * checked here -- confirm whether a failure should set a device
         * error. */
        s3_complete_multi_part_upload(self->s3t[0].s3,
                                      self->bucket, self->filename, self->uploadId,
                                      S3_BUFFER_READ_FUNCS, &data);

        /* the request body is no longer needed */
        g_string_free(buf, TRUE);

        g_tree_destroy(self->part_etag);
        self->part_etag = NULL;
        g_free(self->filename);
        /* s3_device_seek_file frees self->filename again; clear it so that
         * this cannot become a double free */
        self->filename = NULL;
    }
    amfree(self->uploadId);

    if (self->chunked) {
        CurlBuffer *buf = &self->s3t[0].curl_buffer;

        if (buf->cond) {
            g_cond_free(buf->cond);
            buf->cond = NULL;
        }
        if (buf->mutex) {
            g_mutex_free(buf->mutex);
            buf->mutex = NULL;
        }
    }

    /* we're not in a file anymore */
    g_mutex_lock(pself->device_mutex);
    pself->in_file = FALSE;
    pself->bytes_written = 0;
    g_mutex_unlock(pself->device_mutex);

    if (pself->status != DEVICE_STATUS_SUCCESS) {
        return FALSE;
    }
    return TRUE;
}
/*
 * Delete the given file from the volume and wait for the delete threads.
 *
 * Returns FALSE when the device is in error before or after the deletion,
 * TRUE otherwise.
 */
static gboolean
s3_device_recycle_file(Device *pself, guint file) {
    S3Device *self = S3_DEVICE(pself);

    if (device_in_error(self)) {
        return FALSE;
    }

    reset_thread(self);
    delete_file(self, file);
    s3_wait_thread_delete(self);

    /* delete_file already set our error message if necessary */
    if (device_in_error(self)) {
        return FALSE;
    }
    return TRUE;
}
/*
 * Erase the volume: delete the "tapestart" object, drop the cached volume
 * header, delete all files, optionally delete the bucket (when
 * create_bucket is set) and remove the catalog.
 *
 * On success the device is left flagged DEVICE_STATUS_VOLUME_UNLABELED
 * with the message "Unlabeled volume", and TRUE is returned.  On failure
 * the device error is set and FALSE is returned.
 */
static gboolean
s3_device_erase(Device *pself) {
    S3Device *self = S3_DEVICE(pself);
    char *key = NULL;
    const char *errmsg = NULL;
    guint response_code;
    s3_error_code_t s3_error_code;

    if (!setup_handle(self)) {
        /* error set by setup_handle */
        return FALSE;
    }

    reset_thread(self);
    key = special_file_to_key(self, "tapestart", -1);
    if (!s3_delete(self->s3t[0].s3, self->bucket, key)) {
        s3_error(self->s3t[0].s3, &errmsg, NULL, NULL, NULL, NULL, NULL);
        device_set_error(pself,
            g_strdup(errmsg),
            DEVICE_STATUS_DEVICE_ERROR);
        /* the key must be released on the failure path as well */
        g_free(key);
        return FALSE;
    }
    g_free(key);

    dumpfile_free(pself->volume_header);
    pself->volume_header = NULL;

    if (!delete_all_files(self))
        return FALSE;

    device_set_error(pself, g_strdup("Unlabeled volume"),
                     DEVICE_STATUS_VOLUME_UNLABELED);

    if (self->create_bucket &&
        !s3_delete_bucket(self->s3t[0].s3, self->bucket)) {
        s3_error(self->s3t[0].s3, &errmsg, &response_code, &s3_error_code, NULL, NULL, NULL);

        /*
         * ignore the error if the bucket isn't empty (there may be data from elsewhere)
         * or the bucket not existing (already deleted perhaps?)
         */
        if (!(
                (response_code == 409 && s3_error_code == S3_ERROR_BucketNotEmpty) ||
                (response_code == 404 && s3_error_code == S3_ERROR_NoSuchBucket))) {
            device_set_error(pself,
                g_strdup(errmsg),
                DEVICE_STATUS_DEVICE_ERROR);
            return FALSE;
        }
        self->bucket_made = FALSE;
    }
    self->volume_bytes = 0;
    catalog_remove(self);
    return TRUE;
}
/*
 * Mark the volume as reusable: remove the bucket lifecycle rule whose id
 * equals the volume label, if there is one, and store the remaining rules.
 *
 * Nothing is done (TRUE is returned) when transition_to_glacier is
 * negative and read_from_glacier is not set.
 *
 * Returns TRUE on success and FALSE when the device is in error or the
 * handle could not be set up.
 */
static gboolean
s3_device_set_reuse(
    Device *dself)
{
    S3Device *self = S3_DEVICE(dself);
    GSList *lifecycle = NULL, *life;
    lifecycle_rule *rule;
    gboolean removed = FALSE;

    if (self->transition_to_glacier < 0 && !self->read_from_glacier) {
        return TRUE;
    }

    /* dself->status holds non-zero flag bits when in error; returning it
     * from a gboolean function would read as success */
    if (device_in_error(self)) return FALSE;

    if (!setup_handle(self)) {
        /* setup_handle already set our error message */
        return FALSE;
    }
    reset_thread(self);

    s3_get_lifecycle(self->s3t[0].s3, self->bucket, &lifecycle);

    /* remove it if it exist */
    for (life = lifecycle; life != NULL; life = life->next) {
        rule = (lifecycle_rule *)life->data;
        /* g_str_equal must not be handed a NULL string */
        if (rule->id && dself->volume_label &&
            g_str_equal(rule->id, dself->volume_label)) {
            removed = TRUE;
            lifecycle = g_slist_delete_link(lifecycle, life);
            free_lifecycle_rule(rule);
            break;
        }
    }

    if (removed) {
        s3_put_lifecycle(self->s3t[0].s3, self->bucket, lifecycle);
    }
    /* NOTE(review): the remaining lifecycle list is not released here;
     * confirm who owns it after s3_get_lifecycle / s3_put_lifecycle. */
    return TRUE;
}
/*
 * Mark the volume as not reusable by installing a bucket lifecycle rule,
 * identified by the volume label, that transitions objects under
 * "<prefix>f" to the GLACIER storage class.  The transition date is
 * midnight UTC of the day (transition_to_glacier + 1) days from now.
 *
 * label/datestamp: label and datestamp of the volume; when either is NULL
 * the label is read from the device and the device's values are used.
 *
 * Any existing rule with the same id is replaced.  When 999 or more other
 * rules remain, one of them is dropped to make room.
 *
 * Nothing is done (TRUE is returned) when transition_to_glacier is
 * negative.  Returns TRUE on success and FALSE when the device is in
 * error or the handle could not be set up.
 */
static gboolean
s3_device_set_no_reuse(
    Device *dself,
    char *label,
    char *datestamp)
{
    S3Device *self = S3_DEVICE(dself);
    GSList *lifecycle = NULL, *life, *next_life, *prev_life = NULL;
    lifecycle_rule *rule;
    guint count = 0;
    GSList *to_remove = NULL;
    char *lifecycle_datestamp = NULL;
    time_t t;
    struct tm tmp;

    if (self->transition_to_glacier < 0) {
        return TRUE;
    }

    if (!label || !datestamp) {
        s3_device_read_label(dself);
        label = dself->volume_label;
        datestamp = dself->volume_time;
    }

    /* dself->status holds non-zero flag bits when in error; returning it
     * from a gboolean function would read as success */
    if (device_in_error(self)) return FALSE;

    if (!setup_handle(self)) {
        /* setup_handle already set our error message */
        return FALSE;
    }
    reset_thread(self);

    s3_get_lifecycle(self->s3t[0].s3, self->bucket, &lifecycle);

    /* remove it if it exist */
    for (life = lifecycle; life != NULL; life = next_life) {
        next_life = life->next;

        rule = (lifecycle_rule *)life->data;
        if (g_str_equal(rule->id, label)) {
            free_lifecycle_rule(rule);
            /* unlink the node by hand, then release the node itself */
            if (prev_life == NULL) {
                lifecycle = next_life;
            } else {
                prev_life->next = next_life;
            }
            g_slist_free_1(life);
        } else {
            /* NOTE(review): datestamp is compared with a copy of itself,
             * so only the first remaining rule is ever selected; this
             * looks like it was meant to pick the oldest rule -- confirm. */
            if (!to_remove ||
                strcmp(datestamp, lifecycle_datestamp) < 0) {
                to_remove = life;
                g_free(lifecycle_datestamp);
                lifecycle_datestamp = g_strdup(datestamp);
            }
            prev_life = life;
            count++;
        }
    }
    g_free(lifecycle_datestamp);
    lifecycle_datestamp = NULL;

    /* remove a lifecycle */
    if (count >= 999) {
        rule = (lifecycle_rule *)to_remove->data;
        free_lifecycle_rule(rule);
        lifecycle = g_slist_delete_link(lifecycle, to_remove);
    }

    /* add it */
    rule = g_new0(lifecycle_rule, 1);
    rule->id = g_strdup(label);
    rule->prefix = g_strdup_printf("%sf", self->prefix);
    rule->status = g_strdup("Enabled");
    rule->transition = g_new0(lifecycle_action, 1);
    rule->transition->days = 0;
    t = time(NULL) + ((self->transition_to_glacier+1) * 86400);
    /* the failing call is gmtime_r, so name it in the diagnostic */
    if (!gmtime_r(&t, &tmp)) perror("gmtime_r");
    rule->transition->date = g_strdup_printf(
        "%04d-%02d-%02dT00:00:00.000Z",
        1900+tmp.tm_year, tmp.tm_mon+1, tmp.tm_mday);
    rule->transition->storage_class = g_strdup("GLACIER");

    lifecycle = g_slist_append(lifecycle, rule);
    s3_put_lifecycle(self->s3t[0].s3, self->bucket, lifecycle);

    /* NOTE(review): the lifecycle list is not released here; confirm who
     * owns it after s3_get_lifecycle / s3_put_lifecycle. */
    return TRUE;
}
/* functions for reading */
/*
 * Prepare the objects of a file for reading when read_from_glacier is set.
 *
 * Lists the objects of the file (file 0 means the "tapestart" object) and,
 * for every object in the GLACIER storage class whose HEAD response
 * carries no x_amz_restore value, starts a restore.
 *
 * Returns TRUE when nothing had to be done or every request succeeded.
 * On failure a message is recorded with device_set_error (with
 * DEVICE_STATUS_SUCCESS flags) and FALSE is returned.  Everything still
 * held at that point -- the current object, its head and the rest of the
 * list -- is released before returning.
 */
static gboolean
s3_device_init_seek_file(
    Device *pself,
    guint file)
{
    S3Device *self = S3_DEVICE(pself);
    gboolean result;
    GSList *objects = NULL;
    char *prefix;
    const char *errmsg = NULL;

    if (!self->read_from_glacier) {
        return TRUE;
    }

    /* get a list of all objects */
    if (file == 0) {
        prefix = special_file_to_key(self, "tapestart", -1);
    } else {
        prefix = file_to_prefix(self, file);
    }

    result = s3_list_keys(self->s3t[0].s3, self->bucket, NULL, prefix, NULL,
                          &objects, NULL);
    g_free(prefix);

    if (!result) {
        s3_error(self->s3t[0].s3, &errmsg, NULL, NULL, NULL, NULL, NULL);
        device_set_error(pself,
            g_strdup_printf(_("failed to list objects: %s"), errmsg),
            DEVICE_STATUS_SUCCESS);
        return FALSE;
    }

    /* for all GLACIER objects */
    while (objects) {
        s3_object *object = (s3_object *)objects->data;
        s3_head_t *head;

        /* detach the object from the list; it is freed separately below */
        objects = g_slist_remove(objects, objects->data);

        if (object->storage_class == S3_SC_GLACIER) {
            /* HEAD object */
            head = s3_head(self->s3t[0].s3, self->bucket, object->key);
            if (!head) {
                s3_error(self->s3t[0].s3, &errmsg, NULL, NULL,
                         NULL, NULL, NULL);
                device_set_error(pself,
                    g_strdup_printf(
                        _("failed to get head of objects '%s': %s"),
                        object->key, errmsg),
                    DEVICE_STATUS_SUCCESS);
                free_s3_object(object);
                slist_free_full(objects, free_s3_object);
                return FALSE;
            }

            /* if it is not restored */
            if (!head->x_amz_restore) {
                /* init restore */
                result = s3_init_restore(self->s3t[0].s3, self->bucket,
                                         object->key);
                if (!result) {
                    s3_error(self->s3t[0].s3, &errmsg, NULL, NULL,
                             NULL, NULL, NULL);
                    /* this is the restore request failing, not the listing */
                    device_set_error(pself,
                        g_strdup_printf(
                            _("failed to init restore of object '%s': %s"),
                            object->key, errmsg),
                        DEVICE_STATUS_SUCCESS);
                    free_s3_head(head);
                    free_s3_object(object);
                    slist_free_full(objects, free_s3_object);
                    return FALSE;
                }
            }
            free_s3_head(head);
        }
        free_s3_object(object);
    }

    return TRUE;
}
/*
 * Position the device at the start of the given file for reading.
 *
 * Resets the read state, reads the file's "filestart" header object and
 * parses it.  When that object does not exist (404), the next existing
 * file is sought instead; when there is none and the previous file's
 * header exists, a tape-end header is returned.
 *
 * After a valid header was read, the file's multi-part key is looked up:
 * if an object exists under it, self->filename and self->object_size are
 * set so blocks are read as byte ranges of that object; otherwise
 * self->filename is NULL and blocks are read from per-block keys.
 *
 * Returns a newly allocated header on success, or NULL with a message
 * recorded through device_set_error.
 */
static dumpfile_t*
s3_device_seek_file(Device *pself, guint file) {
    S3Device *self = S3_DEVICE(pself);
    gboolean result;
    char *key;
    CurlBuffer buf = {NULL, 0, 0, S3_DEVICE_MAX_BLOCK_SIZE, TRUE, NULL, NULL};
    dumpfile_t *amanda_header;
    const char *errmsg = NULL;
    int thread;
    GSList *objects = NULL;

    if (device_in_error(self)) return NULL;

    reset_thread(self);

    g_mutex_lock(pself->device_mutex);
    pself->file = file;
    pself->is_eof = FALSE;
    pself->block = 0;
    pself->in_file = FALSE;
    pself->bytes_read = 0;
    g_mutex_unlock(pself->device_mutex);

    g_mutex_lock(self->thread_idle_mutex);
    self->last_byte_read = -1;
    self->next_block_to_read = 0;
    self->next_byte_to_read = 0;
    self->dltotal = 0;
    g_mutex_unlock(self->thread_idle_mutex);

    /* NOTE(review): a failure here is not acted upon -- confirm intent. */
    s3_device_init_seek_file(pself, file);

    /* read it in */
    key = special_file_to_key(self, "filestart", pself->file);
    result = s3_read(self->s3t[0].s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS,
                     &buf, NULL, NULL);
    g_free(key);

    if (!result) {
        guint response_code;
        s3_error_code_t s3_error_code;
        int next_file;

        s3_error(self->s3t[0].s3, &errmsg, &response_code, &s3_error_code, NULL, NULL, NULL);

        if (!(response_code == 404 &&
              (s3_error_code == S3_ERROR_None ||
               s3_error_code == S3_ERROR_NotFound ||
               s3_error_code == S3_ERROR_NoSuchKey ||
               s3_error_code == S3_ERROR_NoSuchEntity))) {
            /* An unexpected error occurred finding out if we are the last file. */
            device_set_error(pself,
                g_strdup(errmsg),
                DEVICE_STATUS_DEVICE_ERROR);
            g_free(buf.buffer);
            return NULL;
        }

        /* it's an expected error (not found): check what to do. */
        next_file = find_next_file(self, pself->file);
        if (next_file > 0) {
            /* Note short-circuit of dispatcher. */
            g_free(buf.buffer);
            return s3_device_seek_file(pself, next_file);
        }

        if (next_file < 0) {
            /* Neither "found" nor "none": there is no header to parse, so
             * do not fall through to the code below.  Keep any error that
             * is already set on the device. */
            g_free(buf.buffer);
            if (!device_in_error(pself)) {
                device_set_error(pself,
                    g_strdup(_("Error looking for the next file")),
                    DEVICE_STATUS_DEVICE_ERROR);
            }
            return NULL;
        }

        /* No next file. Check if we are one past the end. */
        s3_device_init_seek_file(pself, pself->file - 1);
        key = special_file_to_key(self, "filestart", pself->file - 1);
        result = s3_read(self->s3t[0].s3, self->bucket, key,
                         S3_BUFFER_WRITE_FUNCS, &buf, NULL, NULL);
        g_free(key);
        /* only the outcome matters; the data itself is not used */
        g_free(buf.buffer);
        if (result) {
            /* pself->file, etc. are already correct */
            return make_tapeend_header();
        }
        device_set_error(pself,
            g_strdup(_("Attempt to read past tape-end file")),
            DEVICE_STATUS_SUCCESS);
        return NULL;
    }

    /* and make a dumpfile_t out of it */
    g_assert(buf.buffer != NULL);
    amanda_header = g_new(dumpfile_t, 1);
    fh_init(amanda_header);
    parse_file_header(buf.buffer, amanda_header, buf.buffer_pos);
    g_free(buf.buffer);

    switch (amanda_header->type) {
        case F_DUMPFILE:
        case F_CONT_DUMPFILE:
        case F_SPLIT_DUMPFILE:
            break;

        default:
            device_set_error(pself,
                g_strdup(_("Invalid amanda header while reading file header")),
                DEVICE_STATUS_VOLUME_ERROR);
            /* release the header together with anything parsing attached */
            dumpfile_free(amanda_header);
            return NULL;
    }

    g_free(self->filename);
    self->filename = file_to_multi_part_key(self, pself->file);
    /* objects starts out NULL, so a failed listing reads as "no object" */
    result = s3_list_keys(self->s3t[0].s3, self->bucket, NULL, self->filename,
                          NULL, &objects, NULL);
    if (objects) { /* multi-part */
        s3_object *part = (s3_object *)objects->data;
        self->object_size = part->size;
        slist_free_full(objects, free_s3_object);
    } else {
        g_free(self->filename);
        self->filename = NULL;
        self->object_size = 0;
    }

    for (thread = 0; thread < self->nb_threads; thread++) {
        self->s3t[thread].idle = 1;
        self->s3t[thread].eof = FALSE;
        self->s3t[thread].ulnow = 0;
    }

    g_mutex_lock(pself->device_mutex);
    pself->in_file = TRUE;
    g_mutex_unlock(pself->device_mutex);

    return amanda_header;
}
/*
 * Position the read state at the given block of the current file.
 *
 * Waits for the worker threads (reset_thread), then sets the device block
 * number and the block/byte read-ahead cursors; byte positions are derived
 * from the device block size.
 *
 * Returns FALSE when the device is in error, TRUE otherwise.
 */
static gboolean
s3_device_seek_block(Device *pself, guint64 block) {
    S3Device * self = S3_DEVICE(pself);

    if (device_in_error(pself)) {
        return FALSE;
    }

    reset_thread(self);

    pself->block = block;

    /* block-based cursor */
    self->next_block_to_read = block;

    /* byte-based cursors: the last byte consumed is the one just before
     * the first byte of the requested block */
    self->next_byte_to_read = block * pself->block_size;
    self->last_byte_read = (block * pself->block_size) - 1;

    return TRUE;
}
/*
 * Hand a read request to every idle recovery thread.
 *
 * max_block: how many blocks ahead of the current position may be
 *            requested; a negative value means no limit.
 * size_req:  size in bytes of one block request.
 *
 * With self->filename set, each request is a byte range of that object
 * starting at next_byte_to_read and clamped to object_size; otherwise each
 * request is the per-block key of next_block_to_read.  In chunked mode the
 * thread buffer is (re)allocated with size_req*2+1 bytes and gets a fresh
 * mutex and condition.
 *
 * The callers in this file invoke it while holding thread_idle_mutex.
 * On allocation failure the device error is set and the function returns
 * without queueing the request.
 */
static void
s3_start_read_ahead(
    Device * pself,
    int max_block,
    int size_req)
{
    S3Device * self = S3_DEVICE(pself);
    char *key;
    int thread;
    guint64 range_min = 0;
    guint64 range_max = 0;
    int allocate = size_req;
    /* computed in 64 bits: max_block * size_req overflows an int for
     * large block sizes */
    const gint64 window = (gint64)max_block * (gint64)size_req;

    if (self->chunked) {
        allocate = size_req*2 + 1;
    }

    /* start a read ahead for each thread */
    for (thread = 0; thread < self->nb_threads_recovery; thread++) {
        S3_by_thread *s3t = &self->s3t[thread];

        if (!s3t->idle) {
            continue;
        }

        if (self->filename) {
            /* stop once the read-ahead window is exhausted */
            if (max_block >= 0 &&
                self->next_byte_to_read > self->last_byte_read + window)
                break;
            range_min = self->next_byte_to_read;
            /* stop at the end of the object */
            if (range_min >= self->object_size) {
                break;
            }
            if (self->chunked && max_block > 0) {
                range_max = range_min + (guint64)window - 1;
            } else if (self->chunked && max_block < 0) {
                range_max = self->object_size-1;
            } else {
                range_max = range_min + size_req - 1;
            }
            if (range_max >= self->object_size) {
                range_max = self->object_size-1;
            }
            key = g_strdup(self->filename);
        } else {
            if (max_block >= 0 &&
                self->next_block_to_read >= (gint64)pself->block + max_block)
                break;
            key = file_and_block_to_key(self, pself->file,
                                        self->next_block_to_read);
        }

        s3t->filename = key;
        s3t->range_min = range_min;
        s3t->range_max = range_max;
        s3t->done = 0;
        s3t->idle = 0;
        s3t->eof = FALSE;
        s3t->dlnow = 0;
        s3t->ulnow = 0;
        s3t->errflags = DEVICE_STATUS_SUCCESS;

        /* chunked mode always starts from a fresh buffer; otherwise the
         * buffer is only replaced when it is too small */
        if (self->chunked ||
            (s3t->curl_buffer.buffer &&
             (int)s3t->curl_buffer.buffer_len < size_req)) {
            g_free(s3t->curl_buffer.buffer);
            s3t->curl_buffer.buffer = NULL;
            s3t->curl_buffer.buffer_len = 0;
            s3t->buffer_len = 0;
        }
        if (!s3t->curl_buffer.buffer) {
            s3t->curl_buffer.buffer = g_try_malloc(allocate);
            if (s3t->curl_buffer.buffer == NULL) {
                s3t->done = 1;
                s3t->idle = 1;
                /* the request is not queued, so nobody else will release
                 * the key */
                g_free(key);
                s3t->filename = NULL;
                device_set_error(pself, g_strdup("Failed to allocate memory"),
                                 DEVICE_STATUS_DEVICE_ERROR);
                return;
            }
            s3t->curl_buffer.buffer_len = allocate;
            s3t->buffer_len = allocate;
        }

        s3t->curl_buffer.buffer_pos = 0;
        if (self->chunked) {
            s3t->curl_buffer.buffer_len = 0;
            s3t->curl_buffer.max_buffer_size = allocate;
            s3t->curl_buffer.end_of_buffer = FALSE;
            s3t->curl_buffer.mutex = g_mutex_new();
            s3t->curl_buffer.cond = g_cond_new();
        } else {
            s3t->curl_buffer.max_buffer_size = S3_DEVICE_MAX_BLOCK_SIZE;
            s3t->curl_buffer.end_of_buffer = TRUE;
            s3t->curl_buffer.mutex = NULL;
            s3t->curl_buffer.cond = NULL;
        }

        self->next_block_to_read++;
        self->next_byte_to_read += size_req;
        g_thread_pool_push(self->thread_pool_read, s3t, NULL);
    }
}
/*
 * Read the next block of the current file into `data`.
 *
 * size_req: in  - number of bytes `data` can hold / is asked for;
 *           out - number of bytes copied, or, when 0 is returned, the
 *                 buffer_len of the worker buffer that did not fit.
 * max_block: read-ahead limit handed to s3_start_read_ahead (the parameter
 *            carries G_GNUC_UNUSED although it is used below).
 *
 * Returns the number of bytes copied, 0 when `data` is too small for the
 * block that was read, and -1 at end of file or on error.  End of file is
 * recorded with device_set_error using DEVICE_STATUS_SUCCESS flags, and
 * sets is_eof and clears in_file.
 */
static int
s3_device_read_block (Device * pself, gpointer data, int *size_req, int max_block G_GNUC_UNUSED) {
S3Device * self = S3_DEVICE(pself);
char *key;
int thread;
int done = 0;
int found = 0;
guint64 range_min = 0;
S3_by_thread *s3t = NULL;
g_assert (self != NULL);
if (device_in_error(self)) return -1;
g_mutex_lock(self->thread_idle_mutex);
/* start a read ahead for each thread */
s3_start_read_ahead(pself, max_block, *size_req);
if (device_in_error(self)) {
g_mutex_unlock(self->thread_idle_mutex);
return -1;
}
/* Chunked mode: consume from the circular buffer of thread 0, which the
 * download fills concurrently; buffer_pos is the read position and
 * buffer_len the write position. */
if (self->chunked) {
CurlBuffer *buf = &self->s3t[0].curl_buffer;
guint avail;
guint size = *size_req;
/* from here on only the buffer's own mutex is held */
g_mutex_unlock(self->thread_idle_mutex);
g_mutex_lock(buf->mutex);
/* wait for the data to be available */
while (1) {
if (buf->buffer_len == buf->buffer_pos) {
avail = 0;
} else if (buf->buffer_len > buf->buffer_pos) {
avail = buf->buffer_len - buf->buffer_pos;
} else {
/* the data wraps around the end of the buffer */
avail = buf->max_buffer_size - buf->buffer_pos + buf->buffer_len;
}
/* proceed once strictly more than `size` bytes are buffered, or the
 * writer has flagged the end of the data */
if (avail > size || buf->end_of_buffer)
break;
g_cond_wait(buf->cond, buf->mutex);
}
/* at the end of the data a short (possibly empty) read is returned */
if (size > avail)
size = avail;
if (size > 0) {
if (buf->buffer_len > buf->buffer_pos) {
memcpy((char *)data, buf->buffer + buf->buffer_pos, size);
buf->buffer_pos += size;
} else {
/* wrapped data: copy up to the end of the buffer, then continue from
 * its beginning */
guint count_end = buf->max_buffer_size - buf->buffer_pos;
guint count_begin;
if (count_end > size)
count_end = size;
memcpy((char *)data, buf->buffer + buf->buffer_pos, count_end);
buf->buffer_pos += count_end;
count_begin = size - count_end;
if (count_begin > 0) {
memcpy((char *)data + count_end, buf->buffer, count_begin);
buf->buffer_pos = count_begin;
}
}
}
/* signal to add more data to the buffer */
g_cond_broadcast(buf->cond);
g_mutex_unlock(buf->mutex);
/* nothing left and the writer is finished: end of file */
if (size == 0 && buf->end_of_buffer) {
pself->is_eof = TRUE;
pself->in_file = FALSE;
device_set_error(pself, g_strdup(_("EOF")), DEVICE_STATUS_SUCCESS);
return -1;
}
*size_req = size;
pself->block++;
/* disable read_ahead thread if the complete chunk is read */
if (self->chunked && max_block == 1) {
S3_by_thread *s3t = &self->s3t[0];
s3t->idle = TRUE;
s3t->curl_buffer.end_of_buffer = FALSE;
}
return size;
}
/* Non-chunked mode: work out which request holds the block we want. */
/* get the file*/
if (self->filename) {
/* single object: requests are identified by their starting byte */
key = g_strdup(self->filename);
range_min = self->last_byte_read + 1;
} else {
key = file_and_block_to_key(self, pself->file, pself->block);
}
g_assert(key != NULL);
found = 0;
/* find which thread read the key */
for (thread = 0; thread < self->nb_threads_recovery; thread++) {
s3t = &self->s3t[thread];
if (!s3t->idle &&
g_str_equal(key, (char *)s3t->filename) &&
range_min == s3t->range_min) {
found = 1;
break;
}
}
/* no thread was given this block: treated as end of file */
if (!found) {
/* return eof */
g_free(key);
pself->is_eof = TRUE;
pself->in_file = FALSE;
device_set_error(pself, g_strdup(_("EOF")), DEVICE_STATUS_SUCCESS);
g_mutex_unlock(self->thread_idle_mutex);
return -1;
}
/* wait for that thread to finish, then act on its outcome */
while (!done) {
if (!s3t->done) {
/* still running: fall through to the wait at the bottom of the loop */
} else if (s3t->eof) {
/* NOTE(review): eof is tested before errflags, so a worker that set
 * both is reported as EOF -- confirm this is intended. */
/* return eof */
g_free(key);
pself->is_eof = TRUE;
pself->in_file = FALSE;
device_set_error(pself, g_strdup(_("EOF")), DEVICE_STATUS_SUCCESS);
g_mutex_unlock(self->thread_idle_mutex);
return -1;
} else if (s3t->errflags != DEVICE_STATUS_SUCCESS) {
/* return the error */
device_set_error(pself, (char *)s3t->errmsg, s3t->errflags);
g_free(key);
g_mutex_unlock(self->thread_idle_mutex);
return -1;
} else if ((guint)*size_req >= s3t->curl_buffer.buffer_pos) {
/* return the buffer */
/* the copy is made with thread_idle_mutex released; the thread slot is
 * marked idle and its key freed before the mutex is taken again */
g_mutex_unlock(self->thread_idle_mutex);
memcpy(data, s3t->curl_buffer.buffer, s3t->curl_buffer.buffer_pos);
*size_req = s3t->curl_buffer.buffer_pos;
g_free(key);
s3t->idle = 1;
g_free((char *)s3t->filename);
pself->block++;
self->last_byte_read += *size_req;
done = 1;
g_mutex_lock(self->thread_idle_mutex);
break;
} else { /* buffer not enough large */
/* report a size through *size_req and return 0; the thread slot stays
 * non-idle, so the same block is found again on the next call */
*size_req = s3t->curl_buffer.buffer_len;
g_free(key);
g_mutex_unlock(self->thread_idle_mutex);
return 0;
}
if (!done) {
g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
}
}
/* start a read ahead for each thread */
s3_start_read_ahead(pself, max_block-1, *size_req);
g_mutex_unlock(self->thread_idle_mutex);
return *size_req;
}
/*
 * Thread-pool worker that downloads the request prepared by
 * s3_start_read_ahead into the thread's buffer.
 *
 * thread_data is the S3_by_thread slot to fill, data is the Device.
 * A byte range is requested when the slot's range_max is positive,
 * otherwise the whole object is read.
 *
 * Outcome, published under thread_idle_mutex:
 *   - success: the downloaded byte count is added to self->dltotal;
 *   - object or range not found (404 / 416 InvalidRange): s3t->eof is set;
 *   - any other failure: s3t->errflags and s3t->errmsg are set, and eof is
 *     left alone so the reader reports the error instead of a plain EOF.
 * In every case s3t->done is set and thread_idle_cond is broadcast.
 */
static void
s3_thread_read_block(
    gpointer thread_data,
    gpointer data)
{
    S3_by_thread *s3t = (S3_by_thread *)thread_data;
    Device *pself = (Device *)data;
    S3Device *self = S3_DEVICE(pself);
    gboolean result;

    /* publish a deadline 300 seconds from now for the duration of the
     * transfer; it is cleared again once the request returns */
    g_mutex_lock(s3t->now_mutex);
    s3t->timeout = time(NULL) + 300;
    g_mutex_unlock(s3t->now_mutex);

    if (s3t->range_max > 0) {
        result = s3_read_range(s3t->s3, self->bucket, (char *)s3t->filename,
                               s3t->range_min, s3t->range_max,
                               s3_buffer_write_func, s3_buffer_reset_func,
                               (CurlBuffer *)&s3t->curl_buffer,
                               progress_func, s3t);
    } else {
        result = s3_read(s3t->s3, self->bucket, (char *)s3t->filename,
                         s3_buffer_write_func, s3_buffer_reset_func,
                         (CurlBuffer *)&s3t->curl_buffer,
                         progress_func, s3t);
    }

    g_mutex_lock(s3t->now_mutex);
    s3t->timeout = 0;
    g_mutex_unlock(s3t->now_mutex);

    /* a buffer with its own mutex has a consumer waiting on its condition:
     * tell it that no more data will arrive */
    if (s3t->curl_buffer.mutex) {
        g_mutex_lock(s3t->curl_buffer.mutex);
        s3t->curl_buffer.end_of_buffer = TRUE;
        g_cond_broadcast(s3t->curl_buffer.cond);
        g_mutex_unlock(s3t->curl_buffer.mutex);
    }

    g_mutex_lock(self->thread_idle_mutex);
    if (!result) {
        guint response_code;
        s3_error_code_t s3_error_code;

        s3_error(s3t->s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);

        /* if it's an expected error (not found), flag end of file */
        if ((response_code == 404 &&
             (s3_error_code == S3_ERROR_None ||
              s3_error_code == S3_ERROR_NotFound ||
              s3_error_code == S3_ERROR_Unknown ||
              s3_error_code == S3_ERROR_NoSuchKey ||
              s3_error_code == S3_ERROR_NoSuchEntity)) ||
            (response_code == 416 &&
             s3_error_code == S3_ERROR_InvalidRange)) {
            s3t->eof = TRUE;
        } else {
            /* otherwise, record the error for the reader; eof must stay
             * unset here because the reader tests eof before errflags */
            s3t->errflags = DEVICE_STATUS_VOLUME_ERROR;
            s3t->errmsg = g_strdup_printf(_("While reading data block from S3: %s"),
                                          s3_strerror(s3t->s3));
        }
    } else {
        self->dltotal += s3t->curl_buffer.buffer_pos;
    }
    s3t->dlnow = 0;
    s3t->ulnow = 0;
    s3t->done = 1;
    g_cond_broadcast(self->thread_idle_cond);
    g_mutex_unlock(self->thread_idle_mutex);

    return;
}
/*
 * Physical end-of-medium check: TRUE when writing `size` more bytes would
 * push volume_bytes past volume_limit.  Always FALSE when the limit is not
 * enforced or is not positive.
 */
static gboolean
check_at_peom(S3Device *self, guint64 size)
{
    guint64 projected;

    if (!self->enforce_volume_limit || !(self->volume_limit > 0)) {
        return FALSE;
    }

    projected = self->volume_bytes + size;
    return (projected > self->volume_limit) ? TRUE : FALSE;
}
/*
 * Logical end-of-medium check: like the physical check, but with an early
 * warning zone of (EOM_EARLY_WARNING_ZONE_BLOCKS + nb_threads) blocks added
 * to the projected volume size.  Always FALSE when leom is off, or when
 * the limit is not enforced or is not positive.
 */
static gboolean
check_at_leom(S3Device *self, guint64 size)
{
    guint64 block_size = DEVICE(self)->block_size;
    guint64 warning_zone = block_size *
        (EOM_EARLY_WARNING_ZONE_BLOCKS + self->nb_threads);
    guint64 projected;

    if (!self->leom) {
        return FALSE;
    }

    if (!self->enforce_volume_limit || !(self->volume_limit > 0)) {
        return FALSE;
    }

    projected = self->volume_bytes + size + warning_zone;
    return (projected > self->volume_limit) ? TRUE : FALSE;
}
/*
 * Block until every thread slot has its done flag set.  Does nothing when
 * thread_idle_mutex has not been created.
 */
static void
reset_thread(
    S3Device *self)
{
    int finished = 0;

    if (!self->thread_idle_mutex) {
        return;
    }

    g_mutex_lock(self->thread_idle_mutex);
    while (finished != self->nb_threads) {
        int i;

        /* recount after every wake-up */
        finished = 0;
        for (i = 0; i < self->nb_threads; i++) {
            if (self->s3t[i].done == 1) {
                finished++;
            }
        }

        if (finished != self->nb_threads) {
            g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
        }
    }
    g_mutex_unlock(self->thread_idle_mutex);
}