/*
* ***** BEGIN LICENSE BLOCK *****
* Version: MIT
*
* Portions created by Alan Antonuk are Copyright (c) 2012-2013
* Alan Antonuk. All Rights Reserved.
*
* Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
* All Rights Reserved.
*
* Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
* VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
* ***** END LICENSE BLOCK *****
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#ifdef _MSC_VER
/* MSVC complains about sprintf being deprecated in favor of sprintf_s */
#define _CRT_SECURE_NO_WARNINGS
/* MSVC complains about strdup being deprecated in favor of _strdup */
#define _CRT_NONSTDC_NO_DEPRECATE
#endif
#include "amqp_private.h"
#include "amqp_time.h"
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define ERROR_MASK (0x00FF)
#define ERROR_CATEGORY_MASK (0xFF00)
enum error_category_enum_ { EC_base = 0, EC_tcp = 1, EC_ssl = 2 };
static const char *base_error_strings[] = {
/* AMQP_STATUS_OK 0x0 */
"operation completed successfully",
/* AMQP_STATUS_NO_MEMORY -0x0001 */
"could not allocate memory",
/* AMQP_STATUS_BAD_AQMP_DATA -0x0002 */
"invalid AMQP data",
/* AMQP_STATUS_UNKNOWN_CLASS -0x0003 */
"unknown AMQP class id",
/* AMQP_STATUS_UNKNOWN_METHOD -0x0004 */
"unknown AMQP method id",
/* AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED -0x0005 */
"hostname lookup failed",
/* AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION -0x0006 */
"incompatible AMQP version",
/* AMQP_STATUS_CONNECTION_CLOSED -0x0007 */
"connection closed unexpectedly",
/* AMQP_STATUS_BAD_AMQP_URL -0x0008 */
"could not parse AMQP URL",
/* AMQP_STATUS_SOCKET_ERROR -0x0009 */
"a socket error occurred",
/* AMQP_STATUS_INVALID_PARAMETER -0x000A */
"invalid parameter",
/* AMQP_STATUS_TABLE_TOO_BIG -0x000B */
"table too large for buffer",
/* AMQP_STATUS_WRONG_METHOD -0x000C */
"unexpected method received",
/* AMQP_STATUS_TIMEOUT -0x000D */
"request timed out",
/* AMQP_STATUS_TIMER_FAILED -0x000E */
"system timer has failed",
/* AMQP_STATUS_HEARTBEAT_TIMEOUT -0x000F */
"heartbeat timeout, connection closed",
/* AMQP_STATUS_UNEXPECTED STATE -0x0010 */
"unexpected protocol state",
/* AMQP_STATUS_SOCKET_CLOSED -0x0011 */
"socket is closed",
/* AMQP_STATUS_SOCKET_INUSE -0x0012 */
"socket already open",
/* AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD -0x00013 */
"unsupported sasl method requested",
/* AMQP_STATUS_UNSUPPORTED -0x0014 */
"parameter value is unsupported"};
static const char *tcp_error_strings[] = {
/* AMQP_STATUS_TCP_ERROR -0x0100 */
"a socket error occurred",
/* AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR -0x0101 */
"socket library initialization failed"};
static const char *ssl_error_strings[] = {
/* AMQP_STATUS_SSL_ERRO R -0x0200 */
"a SSL error occurred",
/* AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED -0x0201 */
"SSL hostname verification failed",
/* AMQP_STATUS_SSL_PEER_VERIFY_FAILED -0x0202 */
"SSL peer cert verification failed",
/* AMQP_STATUS_SSL_CONNECTION_FAILED -0x0203 */
"SSL handshake failed"};
static const char *unknown_error_string = "(unknown error)";
const char *amqp_error_string2(int code) {
const char *error_string;
size_t category = (((-code) & ERROR_CATEGORY_MASK) >> 8);
size_t error = (-code) & ERROR_MASK;
switch (category) {
case EC_base:
if (error < (sizeof(base_error_strings) / sizeof(char *))) {
error_string = base_error_strings[error];
} else {
error_string = unknown_error_string;
}
break;
case EC_tcp:
if (error < (sizeof(tcp_error_strings) / sizeof(char *))) {
error_string = tcp_error_strings[error];
} else {
error_string = unknown_error_string;
}
break;
case EC_ssl:
if (error < (sizeof(ssl_error_strings) / sizeof(char *))) {
error_string = ssl_error_strings[error];
} else {
error_string = unknown_error_string;
}
break;
default:
error_string = unknown_error_string;
break;
}
return error_string;
}
char *amqp_error_string(int code) {
/* Previously sometimes clients had to flip the sign on a return value from a
* function to get the correct error code. Now, all error codes are negative.
* To keep people's legacy code running correctly, we map all error codes to
* negative values.
*
* This is only done with this deprecated function.
*/
if (code > 0) {
code = -code;
}
return strdup(amqp_error_string2(code));
}
void amqp_abort(const char *fmt, ...) {
va_list ap;
va_start(ap, fmt);
vfprintf(stderr, fmt, ap);
va_end(ap);
fputc('\n', stderr);
abort();
}
const amqp_bytes_t amqp_empty_bytes = {0, NULL};
const amqp_table_t amqp_empty_table = {0, NULL};
const amqp_array_t amqp_empty_array = {0, NULL};
int amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel,
amqp_bytes_t exchange, amqp_bytes_t routing_key,
amqp_boolean_t mandatory, amqp_boolean_t immediate,
amqp_basic_properties_t const *properties,
amqp_bytes_t body) {
amqp_frame_t f;
size_t body_offset;
size_t usable_body_payload_size =
state->frame_max - (HEADER_SIZE + FOOTER_SIZE);
int res;
int flagz;
amqp_basic_publish_t m;
amqp_basic_properties_t default_properties;
m.exchange = exchange;
m.routing_key = routing_key;
m.mandatory = mandatory;
m.immediate = immediate;
m.ticket = 0;
/* TODO(alanxz): this heartbeat check is happening in the wrong place, it
* should really be done in amqp_try_send/writev */
res = amqp_time_has_past(state->next_recv_heartbeat);
if (AMQP_STATUS_TIMER_FAILURE == res) {
return res;
} else if (AMQP_STATUS_TIMEOUT == res) {
res = amqp_try_recv(state);
if (AMQP_STATUS_TIMEOUT == res) {
return AMQP_STATUS_HEARTBEAT_TIMEOUT;
} else if (AMQP_STATUS_OK != res) {
return res;
}
}
res = amqp_send_method_inner(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m,
AMQP_SF_MORE, amqp_time_infinite());
if (res < 0) {
return res;
}
if (properties == NULL) {
memset(&default_properties, 0, sizeof(default_properties));
properties = &default_properties;
}
f.frame_type = AMQP_FRAME_HEADER;
f.channel = channel;
f.payload.properties.class_id = AMQP_BASIC_CLASS;
f.payload.properties.body_size = body.len;
f.payload.properties.decoded = (void *)properties;
if (body.len > 0) {
flagz = AMQP_SF_MORE;
} else {
flagz = AMQP_SF_NONE;
}
res = amqp_send_frame_inner(state, &f, flagz, amqp_time_infinite());
if (res < 0) {
return res;
}
body_offset = 0;
while (body_offset < body.len) {
size_t remaining = body.len - body_offset;
if (remaining == 0) {
break;
}
f.frame_type = AMQP_FRAME_BODY;
f.channel = channel;
f.payload.body_fragment.bytes = amqp_offset(body.bytes, body_offset);
if (remaining >= usable_body_payload_size) {
f.payload.body_fragment.len = usable_body_payload_size;
flagz = AMQP_SF_MORE;
} else {
f.payload.body_fragment.len = remaining;
flagz = AMQP_SF_NONE;
}
body_offset += f.payload.body_fragment.len;
res = amqp_send_frame_inner(state, &f, flagz, amqp_time_infinite());
if (res < 0) {
return res;
}
}
return AMQP_STATUS_OK;
}
amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state,
amqp_channel_t channel, int code) {
char codestr[13];
amqp_method_number_t replies[2] = {AMQP_CHANNEL_CLOSE_OK_METHOD, 0};
amqp_channel_close_t req;
if (code < 0 || code > UINT16_MAX) {
return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER);
}
req.reply_code = (uint16_t)code;
req.reply_text.bytes = codestr;
req.reply_text.len = sprintf(codestr, "%d", code);
req.class_id = 0;
req.method_id = 0;
return amqp_simple_rpc(state, channel, AMQP_CHANNEL_CLOSE_METHOD, replies,
&req);
}
amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state,
int code) {
char codestr[13];
amqp_method_number_t replies[2] = {AMQP_CONNECTION_CLOSE_OK_METHOD, 0};
amqp_channel_close_t req;
if (code < 0 || code > UINT16_MAX) {
return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER);
}
req.reply_code = (uint16_t)code;
req.reply_text.bytes = codestr;
req.reply_text.len = sprintf(codestr, "%d", code);
req.class_id = 0;
req.method_id = 0;
return amqp_simple_rpc(state, 0, AMQP_CONNECTION_CLOSE_METHOD, replies, &req);
}
int amqp_basic_ack(amqp_connection_state_t state, amqp_channel_t channel,
uint64_t delivery_tag, amqp_boolean_t multiple) {
amqp_basic_ack_t m;
m.delivery_tag = delivery_tag;
m.multiple = multiple;
return amqp_send_method(state, channel, AMQP_BASIC_ACK_METHOD, &m);
}
amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state,
amqp_channel_t channel, amqp_bytes_t queue,
amqp_boolean_t no_ack) {
amqp_method_number_t replies[] = {AMQP_BASIC_GET_OK_METHOD,
AMQP_BASIC_GET_EMPTY_METHOD, 0};
amqp_basic_get_t req;
req.ticket = 0;
req.queue = queue;
req.no_ack = no_ack;
state->most_recent_api_result =
amqp_simple_rpc(state, channel, AMQP_BASIC_GET_METHOD, replies, &req);
return state->most_recent_api_result;
}
int amqp_basic_reject(amqp_connection_state_t state, amqp_channel_t channel,
uint64_t delivery_tag, amqp_boolean_t requeue) {
amqp_basic_reject_t req;
req.delivery_tag = delivery_tag;
req.requeue = requeue;
return amqp_send_method(state, channel, AMQP_BASIC_REJECT_METHOD, &req);
}
int amqp_basic_nack(amqp_connection_state_t state, amqp_channel_t channel,
uint64_t delivery_tag, amqp_boolean_t multiple,
amqp_boolean_t requeue) {
amqp_basic_nack_t req;
req.delivery_tag = delivery_tag;
req.multiple = multiple;
req.requeue = requeue;
return amqp_send_method(state, channel, AMQP_BASIC_NACK_METHOD, &req);
}
struct timeval *amqp_get_handshake_timeout(amqp_connection_state_t state) {
return state->handshake_timeout;
}
int amqp_set_handshake_timeout(amqp_connection_state_t state,
struct timeval *timeout) {
if (timeout) {
if (timeout->tv_sec < 0 || timeout->tv_usec < 0) {
return AMQP_STATUS_INVALID_PARAMETER;
}
state->internal_handshake_timeout = *timeout;
state->handshake_timeout = &state->internal_handshake_timeout;
} else {
state->handshake_timeout = NULL;
}
return AMQP_STATUS_OK;
}
struct timeval *amqp_get_rpc_timeout(amqp_connection_state_t state) {
return state->rpc_timeout;
}
int amqp_set_rpc_timeout(amqp_connection_state_t state,
struct timeval *timeout) {
if (timeout) {
if (timeout->tv_sec < 0 || timeout->tv_usec < 0) {
return AMQP_STATUS_INVALID_PARAMETER;
}
state->rpc_timeout = &state->internal_rpc_timeout;
*state->rpc_timeout = *timeout;
} else {
state->rpc_timeout = NULL;
}
return AMQP_STATUS_OK;
}