Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,14 @@ PYTHON=python
all: build

add-submodules:
-git submodule add -b v0.8.0 https://github.com/alanxz/rabbitmq-c.git
git submodule add -b master https://github.com/alanxz/rabbitmq-c.git
Comment thread
thedrow marked this conversation as resolved.
Outdated

submodules:
git submodule init
git submodule update

rabbitmq-c: submodules
(cd $(RABBIT_DIR); test -f configure || autoreconf -i)
(cd $(RABBIT_DIR); test -f Makefile || automake --add-missing)

(cd $(RABBIT_DIR); cmake .; cmake --build .)

rabbitmq-clean:
-(cd $(RABBIT_DIR) && make clean)
Expand Down Expand Up @@ -50,8 +48,7 @@ distclean: pyclean rabbitmq-distclean removepyc
-rm -f erl_crash.dump

$(RABBIT_TARGET):
(test -f config.h || cd $(RABBIT_DIR); ./configure --disable-tools --disable-docs)
(cd $(RABBIT_DIR); make)
(cd $(RABBIT_DIR); cmake .; cmake --build .;)


dist: rabbitmq-c $(RABBIT_TARGET)
Expand Down
51 changes: 42 additions & 9 deletions Modules/_librabbitmq/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <amqp_ssl_socket.h>
#include <amqp_ssl_socket.h>
#include <amqp_framing.h>

#include "connection.h"
#include "distmeta.h"
Expand Down Expand Up @@ -974,6 +977,8 @@ PyRabbitMQ_ConnectionType_init(PyRabbitMQ_Connection *self,
"channel_max",
"frame_max",
"heartbeat",
"ssl",
"confirmed",
"client_properties",
NULL
};
Expand All @@ -985,12 +990,15 @@ PyRabbitMQ_ConnectionType_init(PyRabbitMQ_Connection *self,
int channel_max = 0xffff;
int frame_max = 131072;
int heartbeat = 0;
int ssl = 0;
int confirmed = 0;
int port = 5672;
PyObject *client_properties = NULL;

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|ssssiiiiO", kwlist,
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|ssssiiiiiiO", kwlist,
&hostname, &userid, &password, &virtual_host, &port,
&channel_max, &frame_max, &heartbeat, &client_properties)) {
&channel_max, &frame_max, &heartbeat, &ssl, &confirmed,
&client_properties)) {
return -1;
}

Expand All @@ -1012,6 +1020,8 @@ PyRabbitMQ_ConnectionType_init(PyRabbitMQ_Connection *self,
self->channel_max = channel_max;
self->frame_max = frame_max;
self->heartbeat = heartbeat;
self->ssl = ssl;
self->confirmed = confirmed;
self->weakreflist = NULL;
self->callbacks = PyDict_New();
if (self->callbacks == NULL) return -1;
Expand Down Expand Up @@ -1057,7 +1067,13 @@ PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self)
}
Py_BEGIN_ALLOW_THREADS;
self->conn = amqp_new_connection();
socket = amqp_tcp_socket_new(self->conn);
if (self->ssl == 1 ) {
socket = amqp_ssl_socket_new(self->conn);
amqp_ssl_socket_set_verify_peer(socket, 0);
amqp_ssl_socket_set_verify_hostname(socket, 0);
} else {
socket = amqp_tcp_socket_new(self->conn);
}
Py_END_ALLOW_THREADS;

if (!socket) {
Expand Down Expand Up @@ -1132,14 +1148,22 @@ PyRabbitMQ_Connection_close(PyRabbitMQ_Connection *self)
unsigned int
PyRabbitMQ_Connection_create_channel(PyRabbitMQ_Connection *self, unsigned int channel)
{
amqp_rpc_reply_t reply;
amqp_rpc_reply_t replyopen;
amqp_rpc_reply_t replyconfirm;

Py_BEGIN_ALLOW_THREADS;
amqp_channel_open(self->conn, channel);
reply = amqp_get_rpc_reply(self->conn);
replyopen = amqp_get_rpc_reply(self->conn);
if (self->confirmed){
amqp_confirm_select(self->conn, (amqp_channel_t)channel);
replyconfirm = amqp_get_rpc_reply(self->conn);
}
Py_END_ALLOW_THREADS;

return PyRabbitMQ_HandleAMQError(self, 0, reply, "Couldn't create channel");
if ((replyopen.reply_type != AMQP_RESPONSE_NORMAL) || !(self->confirmed)) {
return PyRabbitMQ_HandleAMQError(self, 0, replyopen, "Couldn't create channel");
} else {
return PyRabbitMQ_HandleAMQError(self, 0, replyconfirm, "Couldn't set confirm mode");
}
}


Expand Down Expand Up @@ -1811,13 +1835,16 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self,
PyObject *exchange = NULL;
PyObject *routing_key = NULL;
PyObject *propdict;
amqp_frame_t frame;

unsigned int channel = 0;
unsigned int mandatory = 0;
unsigned int immediate = 0;

char *body_buf = NULL;
Py_ssize_t body_size = 0;

int status = 0;
int ret = 0;
amqp_basic_properties_t props;
amqp_bytes_t bytes;
Expand Down Expand Up @@ -1852,21 +1879,27 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self,
(amqp_boolean_t)immediate,
&props,
bytes);
if (self->confirmed){
status = amqp_simple_wait_frame_on_channel(self->conn,channel,&frame);
}
amqp_maybe_release_buffers_on_channel(self->conn, channel);
Py_END_ALLOW_THREADS;

if (!PyRabbitMQ_HandleError(ret, "basic.publish")) {
goto error;
}
if ((self->confirmed) && (status != AMQP_STATUS_OK) &&
(frame.frame_type != AMQP_FRAME_METHOD) &&
(frame.payload.method.id != AMQP_BASIC_ACK_METHOD )){
goto error;
}
Py_RETURN_NONE;

error:
PyRabbitMQ_revive_channel(self, channel);
bail:
return 0;
}


/*
* Connection._basic_ack
*/
Expand Down
6 changes: 6 additions & 0 deletions Modules/_librabbitmq/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ typedef struct {
int frame_max;
int channel_max;
int heartbeat;
int ssl;
int confirmed;

int sockfd;
int connected;
Expand Down Expand Up @@ -271,6 +273,10 @@ static PyMemberDef PyRabbitMQ_ConnectionType_members[] = {
offsetof(PyRabbitMQ_Connection, port), READONLY, NULL},
{"heartbeat", T_INT,
offsetof(PyRabbitMQ_Connection, heartbeat), READONLY, NULL},
{"ssl", T_INT,
offsetof(PyRabbitMQ_Connection, ssl), READONLY, NULL},
{"confirmed", T_INT,
offsetof(PyRabbitMQ_Connection, confirmed), READONLY, NULL},
{"server_properties", T_OBJECT_EX,
offsetof(PyRabbitMQ_Connection, server_properties), READONLY, NULL},
{"connected", T_INT,
Expand Down
7 changes: 5 additions & 2 deletions librabbitmq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,17 @@ class Connection(_librabbitmq.Connection):

def __init__(self, host='localhost', userid='guest', password='guest',
virtual_host='/', port=5672, channel_max=0xffff,
frame_max=131072, heartbeat=0, lazy=False,
frame_max=131072, heartbeat=0, ssl=False, confirmed=False, lazy=False,
client_properties=None, **kwargs):
if ':' in host:
host, port = host.split(':')
if ssl:
ssl = True
confirmed = confirmed if confirmed else kwargs.pop("confirm_publish",False)
super(Connection, self).__init__(
hostname=host, port=int(port), userid=userid, password=password,
virtual_host=virtual_host, channel_max=channel_max,
frame_max=frame_max, heartbeat=heartbeat,
frame_max=frame_max, heartbeat=heartbeat, ssl=int(ssl),confirmed=int(confirmed),
client_properties=client_properties,
)
self.channels = {}
Expand Down
2 changes: 1 addition & 1 deletion rabbitmq-c
Submodule rabbitmq-c updated 104 files
37 changes: 19 additions & 18 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ def append_env(L, e):
'amqp_socket.c',
'amqp_table.c',
'amqp_tcp_socket.c',
'amqp_openssl_hostname_validation.c',
'amqp_openssl.c',
'amqp_openssl_bio.c',
'amqp_time.c',
'amqp_url.c',
])
Expand All @@ -72,7 +75,8 @@ def append_env(L, e):

if is_linux: # Issue #42
libs.append('rt') # -lrt for clock_gettime

libs.append('crypto')
Comment thread
thedrow marked this conversation as resolved.
Outdated
libs.append('ssl')
librabbitmq_ext = Extension(
'_librabbitmq',
sources=list(PyC_files) + list(librabbit_files),
Expand All @@ -96,6 +100,7 @@ def append_env(L, e):
class build(_build):
stdcflags = [
'-DHAVE_CONFIG_H',
'-DENABLE_SSL_SUPPORT=ON',
]
if os.environ.get('PEDANTIC'):
# Python.h breaks -pedantic, so can only use it while developing.
Expand Down Expand Up @@ -123,26 +128,22 @@ def run(self):
)

try:

if not os.path.isdir(os.path.join(LRMQDIST(), '.git')):
print('- pull submodule rabbitmq-c...')
if os.path.isfile('Makefile'):
os.system(' '.join([make, 'submodules']))
else:
os.system(' '.join(['git', 'clone', '-b', 'v0.8.0',
'https://github.com/alanxz/rabbitmq-c.git',
'rabbitmq-c']))
print('- pull submodule rabbitmq-c...')
if os.path.isfile('Makefile'):
os.system(' '.join([make, 'submodules']))
else:
os.system(' '.join(['git', 'clone', '-b', 'master',
'https://github.com/alanxz/rabbitmq-c.git',
'rabbitmq-c']))

os.chdir(LRMQDIST())

if not os.path.isfile('configure'):
print('- autoreconf')
os.system('autoreconf -i')

if not os.path.isfile('config.h'):
print('- configure rabbitmq-c...')
if os.system('/bin/sh configure --disable-tools \
--disable-docs --disable-dependency-tracking'):
return
print('- cmake')
os.system('cmake .')
Comment thread
dhananjaysathe marked this conversation as resolved.
print(' -build')
os.system('cmake --build .')
finally:
os.environ.update(restore)
finally:
Expand All @@ -157,7 +158,7 @@ def run(self):
return librabbitmq_ext, build


def find_make(alt=('gmake', 'gnumake', 'make', 'nmake')):
def find_make(alt=('gmake', 'gnumake', 'make', 'nmake','cmake')):
for path in os.environ['PATH'].split(':'):
for make in (os.path.join(path, m) for m in alt):
if os.path.isfile(make):
Expand Down