diff --git a/tools/cabana/streams/devicestream.cc b/tools/cabana/streams/devicestream.cc index 462dd7a36142d7..20eaa70cd6b56f 100644 --- a/tools/cabana/streams/devicestream.cc +++ b/tools/cabana/streams/devicestream.cc @@ -6,7 +6,9 @@ #include "cereal/services.h" #include +#include #include +#include #include #include #include @@ -17,12 +19,37 @@ DeviceStream::DeviceStream(QObject *parent, QString address) : zmq_address(address), LiveStream(parent) { } +DeviceStream::~DeviceStream() { + if (!bridge_process) + return; + + bridge_process->terminate(); + if (!bridge_process->waitForFinished(3000)) { + bridge_process->kill(); + bridge_process->waitForFinished(); + } +} + +void DeviceStream::start() { + if (!zmq_address.isEmpty()) { + bridge_process = new QProcess(this); + QString bridge_path = QCoreApplication::applicationDirPath() + "/../../cereal/messaging/bridge"; + bridge_process->start(QFileInfo(bridge_path).absoluteFilePath(), QStringList { zmq_address, "/\"can/\"" }); + + if (!bridge_process->waitForStarted()) { + QMessageBox::warning(nullptr, tr("Error"), tr("Failed to start bridge: %1").arg(bridge_process->errorString())); + return; + } + } + + LiveStream::start(); +} + void DeviceStream::streamThread() { zmq_address.isEmpty() ? unsetenv("ZMQ") : setenv("ZMQ", "1", 1); std::unique_ptr context(Context::create()); - std::string address = zmq_address.isEmpty() ? "127.0.0.1" : zmq_address.toStdString(); - std::unique_ptr sock(SubSocket::create(context.get(), "can", address, false, true, services.at("can").queue_size)); + std::unique_ptr sock(SubSocket::create(context.get(), "can", "127.0.0.1", false, true, services.at("can").queue_size)); assert(sock != NULL); // run as fast as messages come in while (!QThread::currentThread()->isInterruptionRequested()) { diff --git a/tools/cabana/streams/devicestream.h b/tools/cabana/streams/devicestream.h index 3bdf22499868e5..6beb300d7adf56 100644 --- a/tools/cabana/streams/devicestream.h +++ b/tools/cabana/streams/devicestream.h @@ -2,16 +2,21 @@ #include "tools/cabana/streams/livestream.h" +#include + class DeviceStream : public LiveStream { Q_OBJECT public: DeviceStream(QObject *parent, QString address = {}); + ~DeviceStream(); inline QString routeName() const override { return QString("Live Streaming From %1").arg(zmq_address.isEmpty() ? "127.0.0.1" : zmq_address); } protected: + void start() override; void streamThread() override; + QProcess *bridge_process = nullptr; const QString zmq_address; };