Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 31 additions & 11 deletions lib/BackbeatConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ class BackbeatConsumer extends EventEmitter {
const consumerParams = {
'metadata.broker.list': this._kafkaHosts,
'group.id': this._groupId,
// This is the default in our current librdkafka version, but we
// pin it explicitly because we depend on eager rebalancing and
// don't want it changed implicitly by future version updates.
'partition.assignment.strategy': 'range,roundrobin',
// we manage stored offsets based on the highest
// contiguous offset fully processed by a worker, so
// disabling automatic offset store is needed
Expand Down Expand Up @@ -873,7 +877,32 @@ class BackbeatConsumer extends EventEmitter {
let producer; // eslint-disable-line prefer-const
let producerTimer;
let consumerTimer;
let inFlightConsumes = 0;
// Wait for all in-flight consume(1, consumeCb) workers to complete
// before calling unsubscribe(). The setInterval(200ms) pattern can
// have multiple consume workers running concurrently in the C++
// thread pool (each with a 1000ms timeout). Since librdkafka 2.10.0,
// these workers survive an unsubscribe→subscribe transition and can
// steal messages from the next subscription.
// Finish the bootstrap sequence once every in-flight consume(1, consumeCb)
// worker has drained: store and commit the final offset, unsubscribe from
// the topic, then close the producer and signal readiness.
//
// @param {number} partition - partition of the last processed message
// @param {number} offset - offset to store (last processed offset + 1)
//
// NOTE(review): this polls every 50ms with no upper bound; if a consume
// worker never completes, bootstrap never finishes — confirm whether an
// overall timeout/abort is needed here.
function _finishBootstrap(partition, offset) {
    // Consume workers still running in the C++ thread pool: re-check
    // shortly. Since librdkafka 2.10.0 such workers can survive an
    // unsubscribe→subscribe transition and steal messages from the next
    // subscription, so we must not unsubscribe while any are in flight.
    if (inFlightConsumes > 0) {
        setTimeout(() => _finishBootstrap(partition, offset), 50);
        return;
    }
    // Automatic offset store is disabled for this consumer, so explicitly
    // store the offset before committing.
    self._consumer.offsetsStore([{
        topic: self._topic,
        partition,
        offset,
    }]);
    self._consumer.commit();
    self._consumer.unsubscribe();
    producer.close(() => {
        self._bootstrapping = false;
        self._onReady();
    });
}
function consumeCb(err, messages) {
inFlightConsumes--;
if (err) {
return undefined;
}
Expand All @@ -888,17 +917,7 @@ class BackbeatConsumer extends EventEmitter {
{ topic: self._topic, groupId: self._groupId });
clearInterval(producerTimer);
clearInterval(consumerTimer);
self._consumer.offsetsStore([{
topic: self._topic,
partition: message.partition,
offset: message.offset + 1,
}]);
self._consumer.commit();
self._consumer.unsubscribe();
producer.close(() => {
self._bootstrapping = false;
self._onReady();
});
_finishBootstrap(message.partition, message.offset + 1);
}
}
});
Expand Down Expand Up @@ -928,6 +947,7 @@ class BackbeatConsumer extends EventEmitter {
this._consumer.on('ready', () => {
this._consumer.subscribe([this._topic]);
consumerTimer = setInterval(() => {
inFlightConsumes++;
this._consumer.consume(1, consumeCb);
}, 200);
});
Expand Down
4 changes: 4 additions & 0 deletions lib/queuePopulator/KafkaLogConsumer/LogConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ class LogConsumer extends EventEmitter {
setup(done) {
// partition offsets will be managed by kafka
const consumerParams = {
// This is the default in our current librdkafka version, but we
// pin it explicitly because we depend on eager rebalancing and
// don't want it changed implicitly by future version updates.
'partition.assignment.strategy': 'range,roundrobin',
// Manually manage storing offsets to ensure they are only stored
// after the batch processing is fully completed.
'enable.auto.offset.store': false,
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
"minimatch": "^10.0.1",
"mongodb": "^6.11.0",
"node-forge": "^1.3.1",
"node-rdkafka": "^2.12.0",
"node-rdkafka": "^3.6.0",
Comment thread
delthas marked this conversation as resolved.
Comment thread
delthas marked this conversation as resolved.
"node-rdkafka-prometheus": "^1.0.0",
"node-schedule": "^2.1.1",
"node-zookeeper-client": "^1.1.3",
Expand Down
17 changes: 11 additions & 6 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7613,11 +7613,16 @@ ms@2.1.3, ms@^2.0.0, ms@^2.1.1, ms@^2.1.3:
resolved "https://registry.yarnpkg.com/ms/-/ms-2.1.3.tgz#574c8138ce1d2b5861f0b44579dbadd60c6615b2"
integrity sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==

nan@^2.14.0, nan@^2.17.0, nan@^2.18.0, nan@^2.3.2:
nan@^2.14.0, nan@^2.18.0, nan@^2.3.2:
version "2.23.1"
resolved "https://registry.yarnpkg.com/nan/-/nan-2.23.1.tgz#6f86a31dd87e3d1eb77512bf4b9e14c8aded3975"
integrity sha512-r7bBUGKzlqk8oPBDYxt6Z0aEdF1G1rwlMcLk8LCOMbOzf0mG+JUfUzG4fIMWwHWP0iyaLWEQZJmtB7nOHEm/qw==

nan@^2.24.0:
version "2.26.2"
resolved "https://registry.yarnpkg.com/nan/-/nan-2.26.2.tgz#2e5e25764224c737b9897790b57c3294d4dcee9c"
integrity sha512-0tTvBTYkt3tdGw22nrAy50x7gpbGCCFH3AFcyS5WiUu7Eu4vWlri1woE6qHBSfy11vksDqkiwjOnlR7WV8G1Hw==

napi-macros@~2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/napi-macros/-/napi-macros-2.0.0.tgz#2b6bae421e7b96eb687aa6c77a7858640670001b"
Expand Down Expand Up @@ -7751,13 +7756,13 @@ node-rdkafka-prometheus@^1.0.0:
"@log4js-node/log4js-api" "^1.0.0"
prom-client "^12.0.0"

node-rdkafka@^2.12.0:
version "2.18.0"
resolved "https://registry.yarnpkg.com/node-rdkafka/-/node-rdkafka-2.18.0.tgz#116950e49dfe804932c8bc6dbc68949793e72ee2"
integrity sha512-jYkmO0sPvjesmzhv1WFOO4z7IMiAFpThR6/lcnFDWgSPkYL95CtcuVNo/R5PpjujmqSgS22GMkL1qvU4DTAvEQ==
node-rdkafka@^3.6.0:
version "3.6.1"
resolved "https://registry.yarnpkg.com/node-rdkafka/-/node-rdkafka-3.6.1.tgz#164357f08ff23a4722e89bfb3b2b09481c1e8cc1"
integrity sha512-sfpTbrT35429cs0RE8Yb9avGX9BiRRvpV7aweQsghKTjtrVZP3jBrKOPItWXAqHb8doyh3SkuGuUVBLgig1SRg==
dependencies:
bindings "^1.3.1"
nan "^2.17.0"
nan "^2.24.0"

node-releases@^2.0.27:
version "2.0.27"
Expand Down
Loading