Skip to content

Commit 2c191c1

Browse files
Remove ZMQ support (#674)
* remove zmq * less abstraction
1 parent f9ebdca commit 2c191c1

19 files changed

Lines changed: 50 additions & 441 deletions

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# MSGQ: A lock free single producer multi consumer message queue
22

33
## What is this library?
4-
MSGQ is a generic high performance IPC pub sub system with a single publisher and multiple subscribers. MSGQ is designed to be a high performance replacement for ZMQ-like SUB/PUB patterns. It uses a ring buffer in shared memory to efficiently read and write data. Each read requires a copy. Writing can be done without a copy, as long as the size of the data is known in advance. While MSGQ is the core of this library, this library also allows replacing the MSGQ backend with ZMQ or a spoofed implementation that can be used for deterministic testing. This library also contains visionipc, an IPC system specifically for large contiguous buffers (like images/video).
4+
MSGQ is a generic high performance IPC pub sub system with a single publisher and multiple subscribers. It uses a ring buffer in shared memory to efficiently read and write data. Each read requires a copy. Writing can be done without a copy, as long as the size of the data is known in advance. This library also provides a spoofed implementation that can be used for deterministic testing, and visionipc, an IPC system specifically for large contiguous buffers (like images/video).
55

66
## Storage
77
The storage for the queue consists of an area of metadata, and the actual buffer. The metadata contains:

SConscript

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,12 @@ gen_dir = Dir('gen')
99
msgq_objects = env.SharedObject([
1010
'msgq/ipc.cc',
1111
'msgq/event.cc',
12-
'msgq/impl_zmq.cc',
1312
'msgq/impl_msgq.cc',
1413
'msgq/impl_fake.cc',
1514
'msgq/msgq.cc',
1615
])
1716
msgq = env.Library('msgq', msgq_objects)
18-
msgq_python = envCython.Program('msgq/ipc_pyx.so', 'msgq/ipc_pyx.pyx', LIBS=envCython["LIBS"]+[msgq, "zmq", common])
17+
msgq_python = envCython.Program('msgq/ipc_pyx.so', 'msgq/ipc_pyx.pyx', LIBS=envCython["LIBS"]+[msgq, common])
1918

2019
# Build Vision IPC
2120
vipc_files = ['visionipc.cc', 'visionipc_server.cc', 'visionipc_client.cc']
@@ -29,7 +28,7 @@ vipc_objects = env.SharedObject(vipc_sources)
2928
visionipc = env.Library('visionipc', vipc_objects)
3029

3130

32-
vipc_libs = envCython["LIBS"] + [visionipc, msgq, common, "zmq"]
31+
vipc_libs = envCython["LIBS"] + [visionipc, msgq, common]
3332
envCython.Program(f'{visionipc_dir.abspath}/visionipc_pyx.so', f'{visionipc_dir.abspath}/visionipc_pyx.pyx',
3433
LIBS=vipc_libs)
3534

msgq/__init__.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
# must be built with scons
22
from msgq.ipc_pyx import Context, Poller, SubSocket, PubSocket, SocketEventHandle, toggle_fake_events, \
3-
set_fake_prefix, get_fake_prefix, delete_fake_prefix, wait_for_one_event, \
4-
context_is_zmq
3+
set_fake_prefix, get_fake_prefix, delete_fake_prefix, wait_for_one_event
54
from msgq.ipc_pyx import MultiplePublishersError, IpcError
65

76
from typing import Optional, List, Union
@@ -13,7 +12,6 @@
1312
assert get_fake_prefix
1413
assert delete_fake_prefix
1514
assert wait_for_one_event
16-
assert context_is_zmq
1715

1816
NO_TRAVERSAL_LIMIT = 2**64-1
1917

msgq/conftest.py

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,6 @@
1-
import os
21
import pytest
32
import msgq
43

5-
@pytest.fixture(params=[False, True], ids=["msgq", "zmq"], autouse=True)
6-
def zmq_mode(request):
7-
if request.param:
8-
os.environ["ZMQ"] = "1"
9-
else:
10-
os.environ.pop("ZMQ", None)
4+
@pytest.fixture(autouse=True)
5+
def msgq_context():
116
msgq.context = msgq.Context()
12-
assert msgq.context_is_zmq() == request.param
13-
yield request.param
14-
os.environ.pop("ZMQ", None)

msgq/impl_msgq.cc

Lines changed: 14 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,36 +8,30 @@
88

99
#include "msgq/impl_msgq.h"
1010

11-
MSGQContext::MSGQContext() {
12-
}
13-
14-
MSGQContext::~MSGQContext() {
15-
}
16-
17-
void MSGQMessage::init(size_t sz) {
11+
void Message::init(size_t sz) {
1812
size = sz;
1913
data = new char[size];
2014
}
2115

22-
void MSGQMessage::init(char * d, size_t sz) {
16+
void Message::init(char * d, size_t sz) {
2317
size = sz;
2418
data = new char[size];
2519
memcpy(data, d, size);
2620
}
2721

28-
void MSGQMessage::takeOwnership(char * d, size_t sz) {
22+
void Message::takeOwnership(char * d, size_t sz) {
2923
size = sz;
3024
data = d;
3125
}
3226

33-
void MSGQMessage::close() {
27+
void Message::close() {
3428
if (size > 0){
3529
delete[] data;
3630
}
3731
size = 0;
3832
}
3933

40-
MSGQMessage::~MSGQMessage() {
34+
Message::~Message() {
4135
this->close();
4236
}
4337

@@ -67,7 +61,7 @@ int MSGQSubSocket::connect(Context *context, std::string endpoint, std::string a
6761
Message * MSGQSubSocket::receive(bool non_blocking){
6862
msgq_msg_t msg;
6963

70-
MSGQMessage *r = NULL;
64+
Message *r = NULL;
7165

7266
int rc = msgq_msg_recv(&msg, q);
7367

@@ -92,11 +86,11 @@ Message * MSGQSubSocket::receive(bool non_blocking){
9286
}
9387

9488
if (rc > 0){
95-
r = new MSGQMessage;
89+
r = new Message;
9690
r->takeOwnership(msg.data, msg.size);
9791
}
9892

99-
return (Message*)r;
93+
return r;
10094
}
10195

10296
void MSGQSubSocket::setTimeout(int t){
@@ -110,14 +104,9 @@ MSGQSubSocket::~MSGQSubSocket(){
110104
}
111105
}
112106

113-
int MSGQPubSocket::connect(Context *context, std::string endpoint, bool check_endpoint, size_t segment_size){
107+
int PubSocket::connect(Context *context, std::string endpoint, bool check_endpoint, size_t segment_size){
114108
assert(context);
115109

116-
// TODO
117-
//if (check_endpoint && !service_exists(std::string(endpoint))){
118-
// std::cout << "Warning, " << std::string(endpoint) << " is not in service list." << std::endl;
119-
//}
120-
121110
q = new msgq_queue_t;
122111
size_t size = segment_size > 0 ? segment_size : DEFAULT_SEGMENT_SIZE;
123112
int r = msgq_new_queue(q, endpoint.c_str(), size);
@@ -130,27 +119,27 @@ int MSGQPubSocket::connect(Context *context, std::string endpoint, bool check_en
130119
return 0;
131120
}
132121

133-
int MSGQPubSocket::sendMessage(Message *message){
122+
int PubSocket::sendMessage(Message *message){
134123
msgq_msg_t msg;
135124
msg.data = message->getData();
136125
msg.size = message->getSize();
137126

138127
return msgq_msg_send(&msg, q);
139128
}
140129

141-
int MSGQPubSocket::send(char *data, size_t size){
130+
int PubSocket::send(char *data, size_t size){
142131
msgq_msg_t msg;
143132
msg.data = data;
144133
msg.size = size;
145134

146135
return msgq_msg_send(&msg, q);
147136
}
148137

149-
bool MSGQPubSocket::all_readers_updated() {
138+
bool PubSocket::all_readers_updated() {
150139
return msgq_all_readers_updated(q);
151140
}
152141

153-
MSGQPubSocket::~MSGQPubSocket(){
142+
PubSocket::~PubSocket(){
154143
if (q != NULL){
155144
msgq_close_queue(q);
156145
delete q;
@@ -160,7 +149,7 @@ MSGQPubSocket::~MSGQPubSocket(){
160149

161150
void MSGQPoller::registerSocket(SubSocket * socket){
162151
assert(num_polls + 1 < MAX_POLLERS);
163-
polls[num_polls].q = (msgq_queue_t*)socket->getRawSocket();
152+
polls[num_polls].q = static_cast<MSGQSubSocket*>(socket)->getQueue();
164153

165154
sockets.push_back(socket);
166155
num_polls++;

msgq/impl_msgq.h

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -8,52 +8,18 @@
88

99
#define MAX_POLLERS 128
1010

11-
class MSGQContext : public Context {
12-
private:
13-
void * context = NULL;
14-
public:
15-
MSGQContext();
16-
void * getRawContext() {return context;}
17-
~MSGQContext();
18-
};
19-
20-
class MSGQMessage : public Message {
21-
private:
22-
char * data;
23-
size_t size;
24-
public:
25-
void init(size_t size);
26-
void init(char *data, size_t size);
27-
void takeOwnership(char *data, size_t size);
28-
size_t getSize(){return size;}
29-
char * getData(){return data;}
30-
void close();
31-
~MSGQMessage();
32-
};
33-
3411
class MSGQSubSocket : public SubSocket {
3512
private:
3613
msgq_queue_t * q = NULL;
3714
int timeout;
3815
public:
3916
int connect(Context *context, std::string endpoint, std::string address, bool conflate=false, bool check_endpoint=true, size_t segment_size=0);
4017
void setTimeout(int timeout);
41-
void * getRawSocket() {return (void*)q;}
18+
msgq_queue_t * getQueue() {return q;}
4219
Message *receive(bool non_blocking=false);
4320
~MSGQSubSocket();
4421
};
4522

46-
class MSGQPubSocket : public PubSocket {
47-
private:
48-
msgq_queue_t * q = NULL;
49-
public:
50-
int connect(Context *context, std::string endpoint, bool check_endpoint=true, size_t segment_size=0);
51-
int sendMessage(Message *message);
52-
int send(char *data, size_t size);
53-
bool all_readers_updated();
54-
~MSGQPubSocket();
55-
};
56-
5723
class MSGQPoller : public Poller {
5824
private:
5925
std::vector<SubSocket*> sockets;

0 commit comments

Comments
 (0)