diff --git a/bindings/cs/rl.net.cli/PerfTestCommand.cs b/bindings/cs/rl.net.cli/PerfTestCommand.cs index 210f33216..03535b0b8 100644 --- a/bindings/cs/rl.net.cli/PerfTestCommand.cs +++ b/bindings/cs/rl.net.cli/PerfTestCommand.cs @@ -73,11 +73,12 @@ private PerfTestStepProvider DoWork(string tag) }; Console.WriteLine(stepProvider.DataSize); - RLDriver rlDriver = new RLDriver(liveModel, loopKind: this.GetLoopKind()) + using (RLDriver rlDriver = new RLDriver(liveModel, loopKind: this.GetLoopKind())) { - StepInterval = TimeSpan.FromMilliseconds(this.SleepIntervalMs) - }; - rlDriver.Run(stepProvider); + rlDriver.StepInterval = TimeSpan.FromMilliseconds(this.SleepIntervalMs); + rlDriver.Run(stepProvider); + } + stepProvider.Stats.Print(); return stepProvider; } diff --git a/bindings/cs/rl.net.cli/RLDriver.cs b/bindings/cs/rl.net.cli/RLDriver.cs index de5ae0ae4..c2a2165ee 100644 --- a/bindings/cs/rl.net.cli/RLDriver.cs +++ b/bindings/cs/rl.net.cli/RLDriver.cs @@ -85,7 +85,7 @@ internal interface IOutcomeReporter bool TryQueueOutcomeEvent(RunContext runContext, string eventId, string slotId, TOutcome outcome); } - public class RLDriver : IOutcomeReporter, IOutcomeReporter + public class RLDriver : IOutcomeReporter, IOutcomeReporter, IDisposable { private LiveModel liveModel; private LoopKind loopKind; @@ -250,5 +250,30 @@ private void SafeRaiseError(ApiStatus errorStatus) localHandler(this, errorStatus); } } + + #region IDisposable Support + private bool disposedValue = false; // To detect redundant calls + + protected virtual void Dispose(bool disposing) + { + if (!disposedValue) + { + if (disposing) + { + this.liveModel?.Dispose(); + this.liveModel = null; + } + + disposedValue = true; + } + } + + + public void Dispose() + { + // Do not change this code. Put cleanup code in Dispose(bool disposing) above. + Dispose(true); + } + #endregion } } diff --git a/bindings/cs/rl.net.cli/RLSimulator.cs b/bindings/cs/rl.net.cli/RLSimulator.cs index 6b40c6354..39bc01dc1 100644 --- a/bindings/cs/rl.net.cli/RLSimulator.cs +++ b/bindings/cs/rl.net.cli/RLSimulator.cs @@ -283,7 +283,7 @@ public float GetContinuousActionOutcome(float action, float pdf_value) } } - internal class RLSimulator + internal class RLSimulator : IDisposable { private RLDriver driver; @@ -322,5 +322,29 @@ public event EventHandler OnError this.driver.OnError -= value; } } + + #region IDisposable Support + private bool disposedValue = false; // To detect redundant calls + + protected virtual void Dispose(bool disposing) + { + Console.WriteLine("Disponsing rlsim"); + if (!disposedValue) + { + if (disposing) + { + this.driver?.Dispose(); + this.driver = null; + } + + disposedValue = true; + } + } + + public void Dispose() + { + Dispose(true); + } + #endregion } } \ No newline at end of file diff --git a/bindings/cs/rl.net.cli/ReplayCommand.cs b/bindings/cs/rl.net.cli/ReplayCommand.cs index 915983e66..d5fbb9053 100644 --- a/bindings/cs/rl.net.cli/ReplayCommand.cs +++ b/bindings/cs/rl.net.cli/ReplayCommand.cs @@ -17,15 +17,17 @@ class ReplayCommand : CommandBase public override void Run() { LiveModel liveModel = Helpers.CreateLiveModelOrExit(this.ConfigPath); - RLDriver rlDriver = new RLDriver(liveModel, loopKind: this.GetLoopKind()); - rlDriver.StepInterval = TimeSpan.FromMilliseconds(this.SleepIntervalMs); - - using (TextReader textReader = File.OpenText(this.LogPath)) + using (RLDriver rlDriver = new RLDriver(liveModel, loopKind: this.GetLoopKind())) { - IEnumerable dsJsonLines = textReader.LazyReadLines(); - ReplayStepProvider stepProvider = new ReplayStepProvider(dsJsonLines); + rlDriver.StepInterval = TimeSpan.FromMilliseconds(this.SleepIntervalMs); + + using (TextReader textReader = File.OpenText(this.LogPath)) + { + IEnumerable dsJsonLines = textReader.LazyReadLines(); + ReplayStepProvider stepProvider = new ReplayStepProvider(dsJsonLines); - rlDriver.Run(stepProvider); + rlDriver.Run(stepProvider); + } } } } diff --git a/bindings/cs/rl.net.cli/RunSimulatorCommand.cs b/bindings/cs/rl.net.cli/RunSimulatorCommand.cs index 7d17e6580..1c0024b53 100644 --- a/bindings/cs/rl.net.cli/RunSimulatorCommand.cs +++ b/bindings/cs/rl.net.cli/RunSimulatorCommand.cs @@ -16,10 +16,12 @@ public override void Run() { LiveModel liveModel = Helpers.CreateLiveModelOrExit(this.ConfigPath); - RLSimulator rlSim = new RLSimulator(liveModel, loopKind: this.GetLoopKind()); - rlSim.StepInterval = TimeSpan.FromMilliseconds(this.SleepIntervalMs); - rlSim.OnError += (sender, apiStatus) => Helpers.WriteStatusAndExit(apiStatus); - rlSim.Run(this.Steps); + using (RLSimulator rlSim = new RLSimulator(liveModel, loopKind: this.GetLoopKind())) + { + rlSim.StepInterval = TimeSpan.FromMilliseconds(this.SleepIntervalMs); + rlSim.OnError += (sender, apiStatus) => Helpers.WriteStatusAndExit(apiStatus); + rlSim.Run(this.Steps); + } } } } diff --git a/bindings/cs/rl.net.native/binding_tracer.cc b/bindings/cs/rl.net.native/binding_tracer.cc index 87888ad0d..8917a3bcb 100644 --- a/bindings/cs/rl.net.native/binding_tracer.cc +++ b/bindings/cs/rl.net.native/binding_tracer.cc @@ -6,6 +6,8 @@ namespace rl_net_native { : context(_context) { } + binding_tracer::~binding_tracer() { } + void binding_tracer::log(int log_level, const std::string& msg) { if (context.trace_logger_callback != nullptr) { context.trace_logger_callback(log_level, msg.c_str()); diff --git a/bindings/cs/rl.net.native/binding_tracer.h b/bindings/cs/rl.net.native/binding_tracer.h index 455bf5194..c608cd092 100644 --- a/bindings/cs/rl.net.native/binding_tracer.h +++ b/bindings/cs/rl.net.native/binding_tracer.h @@ -8,6 +8,7 @@ namespace rl_net_native { // Inherited via i_trace binding_tracer(livemodel_context& _context); void log(int log_level, const std::string &msg) override; + virtual ~binding_tracer(); private: livemodel_context& context; }; diff --git a/rlclientlib/dedup.cc b/rlclientlib/dedup.cc index 161296232..af3af763f 100644 --- a/rlclientlib/dedup.cc +++ b/rlclientlib/dedup.cc @@ -281,7 +281,7 @@ class dedup_extensions : public logger::i_logger_extensions logger::i_logger_extensions(c), _dedup_state(c, use_compression, use_dedup, time_provider), _use_dedup(use_dedup), _use_compression(use_compression) {} logger::i_async_batcher* create_batcher(logger::i_message_sender* sender, utility::watchdog& watchdog, - error_callback_fn* perror_cb, const char* section) override { + error_callback_fn* perror_cb, i_trace* trace, const char* section) override { auto config = utility::get_batcher_config(_config, section); if(_use_dedup) { @@ -290,6 +290,7 @@ class dedup_extensions : public logger::i_logger_extensions watchdog, _dedup_state, perror_cb, + trace, config); } else { int _dummy = 0; @@ -298,6 +299,7 @@ class dedup_extensions : public logger::i_logger_extensions watchdog, _dummy, perror_cb, + trace, config); } diff --git a/rlclientlib/live_model_impl.cc b/rlclientlib/live_model_impl.cc index 8ecb3e8e4..064d891a7 100644 --- a/rlclientlib/live_model_impl.cc +++ b/rlclientlib/live_model_impl.cc @@ -397,6 +397,13 @@ namespace reinforcement_learning { _learning_mode = learning::to_learning_mode(_configuration.get(name::LEARNING_MODE, value::LEARNING_MODE_ONLINE)); } + live_model_impl::~live_model_impl() { + if (_interaction_logger) + _interaction_logger->flush(); + if (_outcome_logger) + _outcome_logger->flush(); + } + int live_model_impl::init_trace(api_status* status) { const auto trace_impl = _configuration.get(name::TRACE_LOG_IMPLEMENTATION, value::NULL_TRACE_LOGGER); i_trace* plogger; @@ -442,7 +449,7 @@ namespace reinforcement_learning { RETURN_IF_FAIL(_time_provider_factory->create(&ranking_time_provider, time_provider_impl, _configuration, _trace_logger.get(), status)); // Create a logger for interactions that will use msg sender to send interaction messages - _interaction_logger.reset(new logger::interaction_logger_facade(_model->model_type(), _configuration, ranking_msg_sender, _watchdog, ranking_time_provider, *_logger_extensions.get(), &_error_cb)); + _interaction_logger.reset(new logger::interaction_logger_facade(_model->model_type(), _configuration, ranking_msg_sender, _watchdog, ranking_time_provider, *_logger_extensions.get(), _trace_logger.get(), &_error_cb)); RETURN_IF_FAIL(_interaction_logger->init(status)); // Get the name of raw data (as opposed to message) sender for observations. @@ -463,7 +470,7 @@ namespace reinforcement_learning { RETURN_IF_FAIL(_time_provider_factory->create(&observation_time_provider, time_provider_impl, _configuration, _trace_logger.get(), status)); // Create a logger for observations that will use msg sender to send observation messages - _outcome_logger.reset(new logger::observation_logger_facade(_configuration, outcome_msg_sender, _watchdog, observation_time_provider, &_error_cb)); + _outcome_logger.reset(new logger::observation_logger_facade(_configuration, outcome_msg_sender, _watchdog, observation_time_provider, _trace_logger.get(), &_error_cb)); RETURN_IF_FAIL(_outcome_logger->init(status)); return error_code::success; diff --git a/rlclientlib/live_model_impl.h b/rlclientlib/live_model_impl.h index cab7cc40a..2ac86fd6f 100644 --- a/rlclientlib/live_model_impl.h +++ b/rlclientlib/live_model_impl.h @@ -59,6 +59,7 @@ namespace reinforcement_learning model_factory_t* m_factory, sender_factory_t* sender_factory, time_provider_factory_t* time_provider_factory); + ~live_model_impl(); live_model_impl(const live_model_impl&) = delete; live_model_impl(live_model_impl&&) = delete; diff --git a/rlclientlib/logger/async_batcher.h b/rlclientlib/logger/async_batcher.h index 6b9c57be7..d544902bc 100644 --- a/rlclientlib/logger/async_batcher.h +++ b/rlclientlib/logger/async_batcher.h @@ -13,6 +13,8 @@ #include "message_sender.h" #include "utility/config_helper.h" #include "utility/object_pool.h" +#include "trace_logger.h" +#include "str_util.h" namespace reinforcement_learning { class error_callback_fn; @@ -30,6 +32,7 @@ namespace reinforcement_learning { namespace logger { virtual int append(TEvent& evt, api_status* status = nullptr) = 0; virtual int run_iteration(api_status* status) = 0; + virtual void flush() = 0; //TODO surface errors }; // This class takes uses a queue and a background thread to accumulate events, and send them by batch asynchronously. @@ -46,20 +49,22 @@ namespace reinforcement_learning { namespace logger { int run_iteration(api_status* status) override; + void flush() override; //flush all batches + private: int fill_buffer(std::shared_ptr& retbuffer, size_t& remaining, api_status* status); - void flush(); //flush all batches public: async_batcher(i_message_sender* sender, utility::watchdog& watchdog, shared_state_t& shared_state, error_callback_fn* perror_cb, + i_trace *tracer, const utility::async_batcher_config& config); - ~async_batcher(); + virtual ~async_batcher(); private: std::unique_ptr _sender; @@ -67,6 +72,7 @@ namespace reinforcement_learning { namespace logger { event_queue _queue; // A queue to accumulate batch of events. size_t _send_high_water_mark; error_callback_fn* _perror_cb; + i_trace *_trace; shared_state_t& _shared_state; utility::periodic_background_proc _periodic_background_proc; @@ -122,6 +128,7 @@ namespace reinforcement_learning { namespace logger { TEvent evt; TSerializer collection_serializer(*buffer.get(), _batch_content_encoding, _shared_state); + int event_count = 0; while (remaining > 0 && collection_serializer.size() < _send_high_water_mark) { if (_queue.pop(&evt)) { if (queue_mode_enum::BLOCK == _queue_mode) { @@ -129,20 +136,24 @@ namespace reinforcement_learning { namespace logger { } RETURN_IF_FAIL(collection_serializer.add(evt, status)); --remaining; + ++event_count; } } - RETURN_IF_FAIL(collection_serializer.finalize(status)); + TRACE_INFO(_trace, utility::concat("async_batcher.fill_buffer: created batch with ", + event_count, " events and ", + collection_serializer.size(), " bytes")); + return error_code::success; } template class TSerializer> void async_batcher::flush() { const auto queue_size = _queue.size(); - // Early exit if queue is empty. if (queue_size == 0) { + TRACE_INFO(_trace, "async_batcher.flush: empty queue"); return; } @@ -157,7 +168,7 @@ namespace reinforcement_learning { namespace logger { ERROR_CALLBACK(_perror_cb, status); } - if (_sender->send(TSerializer::message_id(), buffer, &status) != error_code::success) { + if (_sender->send(TSerializer::message_id(), buffer, &status) != error_code::success) { ERROR_CALLBACK(_perror_cb, status); } } @@ -169,11 +180,13 @@ namespace reinforcement_learning { namespace logger { utility::watchdog& watchdog, typename TSerializer::shared_state_t& shared_state, error_callback_fn* perror_cb, + i_trace *trace, const utility::async_batcher_config& config) : _sender(sender) , _queue(config.send_queue_max_capacity) , _send_high_water_mark(config.send_high_water_mark) , _perror_cb(perror_cb) + , _trace(trace) , _shared_state(shared_state) , _periodic_background_proc(static_cast(config.send_batch_interval_ms), watchdog, "Async batcher thread", perror_cb) , _pass_prob(0.5) @@ -185,8 +198,5 @@ namespace reinforcement_learning { namespace logger { async_batcher::~async_batcher() { // Stop the background procedure the queue before exiting _periodic_background_proc.stop(); - if (_queue.size() > 0) { - flush(); - } } }} diff --git a/rlclientlib/logger/event_logger.h b/rlclientlib/logger/event_logger.h index 03647fee7..a0b609740 100644 --- a/rlclientlib/logger/event_logger.h +++ b/rlclientlib/logger/event_logger.h @@ -29,6 +29,8 @@ namespace reinforcement_learning { namespace logger { int init(api_status* status); + void flush(); + protected: int append(TEvent&& item, api_status* status); int append(TEvent& item, api_status* status); @@ -55,6 +57,13 @@ namespace reinforcement_learning { namespace logger { return error_code::success; } + + template + void event_logger::flush() + { + _batcher->flush(); + } + template int event_logger::append(TEvent&& item, api_status* status) { if (!_initialized) { diff --git a/rlclientlib/logger/logger_extensions.cc b/rlclientlib/logger/logger_extensions.cc index e70166a99..ddf50d29a 100644 --- a/rlclientlib/logger/logger_extensions.cc +++ b/rlclientlib/logger/logger_extensions.cc @@ -10,7 +10,7 @@ class default_extensions : public i_logger_extensions delete provider; //We don't use it } - i_async_batcher* create_batcher(i_message_sender* sender, utility::watchdog& watchdog, error_callback_fn* perror_cb, const char* section) override { + i_async_batcher* create_batcher(i_message_sender* sender, utility::watchdog& watchdog, error_callback_fn* perror_cb, i_trace* trace, const char* section) override { auto config = utility::get_batcher_config(_config, section); int _dummy = 0; return new async_batcher( @@ -18,6 +18,7 @@ class default_extensions : public i_logger_extensions watchdog, _dummy, perror_cb, + trace, config); } diff --git a/rlclientlib/logger/logger_facade.cc b/rlclientlib/logger/logger_facade.cc index fd90bcb5f..3e2074e0e 100644 --- a/rlclientlib/logger/logger_facade.cc +++ b/rlclientlib/logger/logger_facade.cc @@ -13,7 +13,7 @@ namespace reinforcement_learning { template i_async_batcher* create_legacy_async_batcher(const utility::configuration& c, i_message_sender* sender, utility::watchdog& watchdog, - error_callback_fn* perror_cb, const char *section, typename async_batcher::shared_state_t &shared_state) { + error_callback_fn* perror_cb, i_trace *trace, const char *section, typename async_batcher::shared_state_t &shared_state) { auto config = utility::get_batcher_config(c, section); return new async_batcher( @@ -21,6 +21,7 @@ namespace reinforcement_learning { watchdog, shared_state, perror_cb, + trace, config ); } @@ -32,17 +33,18 @@ namespace reinforcement_learning { utility::watchdog& watchdog, i_time_provider* time_provider, i_logger_extensions& ext, + i_trace* trace, error_callback_fn* perror_cb) : _model_type(model_type) , _version(c.get_int(name::PROTOCOL_VERSION, value::DEFAULT_PROTOCOL_VERSION)) , _serializer_shared_state(0) , _ext(ext) - , _v1_cb(_version == 1 && _model_type == model_type_t::CB ? new interaction_logger(time_provider, create_legacy_async_batcher(c, sender, watchdog, perror_cb, INTERACTION_SECTION, _serializer_shared_state)) : nullptr) - , _v1_ccb(_version == 1 && _model_type == model_type_t::CCB ? new ccb_logger(time_provider, create_legacy_async_batcher(c, sender, watchdog, perror_cb, INTERACTION_SECTION, _serializer_shared_state)) : nullptr) - , _v1_multislot(_version == 1 && _model_type == model_type_t::SLATES ? new multi_slot_logger(time_provider, create_legacy_async_batcher(c, sender, watchdog, perror_cb, INTERACTION_SECTION, _serializer_shared_state)) : nullptr) + , _v1_cb(_version == 1 && _model_type == model_type_t::CB ? new interaction_logger(time_provider, create_legacy_async_batcher(c, sender, watchdog, perror_cb, trace, INTERACTION_SECTION, _serializer_shared_state)) : nullptr) + , _v1_ccb(_version == 1 && _model_type == model_type_t::CCB ? new ccb_logger(time_provider, create_legacy_async_batcher(c, sender, watchdog, perror_cb, trace, INTERACTION_SECTION, _serializer_shared_state)) : nullptr) + , _v1_multislot(_version == 1 && _model_type == model_type_t::SLATES ? new multi_slot_logger(time_provider, create_legacy_async_batcher(c, sender, watchdog, perror_cb, trace, INTERACTION_SECTION, _serializer_shared_state)) : nullptr) , _v2(_version == 2 ? new generic_event_logger( time_provider, - ext.create_batcher(sender, watchdog, perror_cb, INTERACTION_SECTION)) : nullptr) { + ext.create_batcher(sender, watchdog, perror_cb, trace, INTERACTION_SECTION)) : nullptr) { } int interaction_logger_facade::init(api_status* status) { @@ -158,18 +160,34 @@ namespace reinforcement_learning { } } + void interaction_logger_facade::flush() { + switch (_version) { + case 1: + if(_v1_cb) + _v1_cb->flush(); + else if (_v1_ccb) + _v1_ccb->flush(); + else if (_v1_multislot) + _v1_multislot->flush(); + break; + case 2: _v2->flush(); break; + } + } + + observation_logger_facade::observation_logger_facade( const utility::configuration& c, i_message_sender* sender, utility::watchdog& watchdog, i_time_provider* time_provider, + i_trace* trace, error_callback_fn* perror_cb) : _version(c.get_int(name::PROTOCOL_VERSION, value::DEFAULT_PROTOCOL_VERSION)) , _serializer_shared_state(0) - , _v1(_version == 1 ? new observation_logger(time_provider, create_legacy_async_batcher(c, sender, watchdog, perror_cb, OBSERVATION_SECTION, _serializer_shared_state)) : nullptr) + , _v1(_version == 1 ? new observation_logger(time_provider, create_legacy_async_batcher(c, sender, watchdog, perror_cb, trace, OBSERVATION_SECTION, _serializer_shared_state)) : nullptr) , _v2(_version == 2 ? new generic_event_logger( time_provider, - create_legacy_async_batcher(c, sender, watchdog, perror_cb, OBSERVATION_SECTION, _serializer_shared_state)) : nullptr) { + create_legacy_async_batcher(c, sender, watchdog, perror_cb, trace, OBSERVATION_SECTION, _serializer_shared_state)) : nullptr) { } int observation_logger_facade::init(api_status* status) { @@ -232,5 +250,12 @@ namespace reinforcement_learning { default: return protocol_not_supported(status); } } + + void observation_logger_facade::flush() { + switch (_version) { + case 1: _v1->flush(); break; + case 2: _v2->flush(); break; + } + } } } diff --git a/rlclientlib/logger/logger_facade.h b/rlclientlib/logger/logger_facade.h index caa722a8d..c1ac44c31 100644 --- a/rlclientlib/logger/logger_facade.h +++ b/rlclientlib/logger/logger_facade.h @@ -14,6 +14,8 @@ #include "event_logger.h" #include "model_mgmt.h" +#include "trace_logger.h" + #include "serialization/payload_serializer.h" namespace reinforcement_learning @@ -30,7 +32,7 @@ namespace reinforcement_learning virtual bool is_object_extraction_enabled() const = 0; virtual bool is_serialization_transform_enabled() const = 0; - virtual i_async_batcher* create_batcher(i_message_sender* sender, utility::watchdog& watchdog, error_callback_fn* perror_cb, const char* section) = 0; + virtual i_async_batcher* create_batcher(i_message_sender* sender, utility::watchdog& watchdog, error_callback_fn* perror_cb, i_trace* trace, const char* section) = 0; virtual int transform_payload_and_extract_objects(const char* context, std::string& edited_payload, generic_event::object_list_t& objects, api_status* status) = 0; virtual int transform_serialized_payload(generic_event::payload_buffer_t& input, event_content_type &content_type, api_status* status) const = 0; @@ -41,7 +43,7 @@ namespace reinforcement_learning public: interaction_logger_facade(reinforcement_learning::model_management::model_type_t model_type, const utility::configuration& c, i_message_sender* sender, utility::watchdog& watchdog, - i_time_provider* time_provider, i_logger_extensions& ext, error_callback_fn* perror_cb = nullptr); + i_time_provider* time_provider, i_logger_extensions& ext, i_trace* trace, error_callback_fn* perror_cb = nullptr); interaction_logger_facade(const interaction_logger_facade& other) = delete; interaction_logger_facade& operator=(const interaction_logger_facade& other) = delete; @@ -51,6 +53,7 @@ namespace reinforcement_learning ~interaction_logger_facade() = default; int init(api_status* status); + void flush(); //CB v1/v2 int log(const char* context, unsigned int flags, const ranking_response& response, api_status* status, learning_mode learning_mode = ONLINE); @@ -87,7 +90,7 @@ namespace reinforcement_learning class observation_logger_facade { public: observation_logger_facade(const utility::configuration& c, - i_message_sender* sender, utility::watchdog& watchdog, i_time_provider* time_provider, error_callback_fn* perror_cb = nullptr); + i_message_sender* sender, utility::watchdog& watchdog, i_time_provider* time_provider, i_trace* trace, error_callback_fn* perror_cb = nullptr); observation_logger_facade(const observation_logger_facade& other) = delete; observation_logger_facade& operator=(const observation_logger_facade& other) = delete; @@ -97,6 +100,8 @@ namespace reinforcement_learning ~observation_logger_facade() = default; int init(api_status* status); + void flush(); + int log(const char* event_id, float outcome, api_status* status); int log(const char* event_id, const char* outcome, api_status* status); diff --git a/test_tools/log_parser/parser.py b/test_tools/log_parser/parser.py index 5966c2416..dd537809a 100755 --- a/test_tools/log_parser/parser.py +++ b/test_tools/log_parser/parser.py @@ -7,6 +7,9 @@ import struct import datetime +import warnings +warnings.filterwarnings("ignore", category=DeprecationWarning) + PREAMBLE_LENGTH = 8 PRETTY_PRINT_JSON=False @@ -145,9 +148,11 @@ def dump_event_batch(buf): def dump_preamble_file(file_name, buf): - preamble = parse_preamble(buf) - print(f'parsing preamble file {file_name}\n\tpreamble:{preamble}') - dump_event_batch(buf[PREAMBLE_LENGTH : PREAMBLE_LENGTH + preamble["msg_size"]]) + while len(buf) > 8: + preamble = parse_preamble(buf) + print(f'parsing preamble file {file_name}\n\tpreamble:{preamble}') + dump_event_batch(buf[PREAMBLE_LENGTH : PREAMBLE_LENGTH + preamble["msg_size"]]) + buf = buf[PREAMBLE_LENGTH + preamble["msg_size"]:] MSG_TYPE_HEADER = 0x55555555 MSG_TYPE_CHECKPOINT = 0x11111111 diff --git a/unit_test/async_batcher_test.cc b/unit_test/async_batcher_test.cc index 1f36e99ec..86e23422e 100644 --- a/unit_test/async_batcher_test.cc +++ b/unit_test/async_batcher_test.cc @@ -101,7 +101,7 @@ BOOST_AUTO_TEST_CASE(flush_timeout) { config.send_batch_interval_ms = timeout_ms; config.send_queue_max_capacity = 8192; int dummy = 0; - logger::async_batcher batcher(s, watchdog, dummy, &error_fn, config); + logger::async_batcher batcher(s, watchdog, dummy, &error_fn, nullptr, config); batcher.init(nullptr); // Allow periodic_background_proc inside async_batcher to start waiting // on a timer before sending any events to it. Else we risk not // triggering the batch mechanism and might get triggered by initial @@ -129,7 +129,7 @@ BOOST_AUTO_TEST_CASE(flush_batches) { config.send_batch_interval_ms = 100000; int dummy = 0; auto batcher = new logger::async_batcher - (s, watchdog, dummy, &error_fn, config); + (s, watchdog, dummy, &error_fn, nullptr, config); batcher->init(nullptr); // Allow periodic_background_proc inside async_batcher to start waiting // on a timer before sending any events to it. Else we risk not // triggering the batch mechanism and might get triggered by initial @@ -157,7 +157,7 @@ BOOST_AUTO_TEST_CASE(flush_after_deletion) { utility::async_batcher_config config; int dummy = 0; auto* batcher = new logger::async_batcher - (s, watchdog, dummy, nullptr, config); + (s, watchdog, dummy, nullptr, nullptr, config); batcher->init(nullptr); // Allow periodic_background_proc to start waiting std::this_thread::sleep_for(std::chrono::milliseconds(20)); std::string foo("foo"); @@ -184,7 +184,7 @@ BOOST_AUTO_TEST_CASE(queue_overflow_do_not_drop_event) { config.send_queue_max_capacity = queue_max_size; config.queue_mode = queue_mode; int dummy = 0; - auto batcher = new logger::async_batcher(s, watchdog, dummy, &error_fn, config); + auto batcher = new logger::async_batcher(s, watchdog, dummy, &error_fn, nullptr, config); batcher->init(nullptr); // Allow periodic_background_proc to start waiting std::this_thread::sleep_for(std::chrono::milliseconds(20)); int n = 10;