Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 49 additions & 31 deletions lib/BackbeatConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ class BackbeatConsumer extends EventEmitter {
const consumerParams = {
'metadata.broker.list': this._kafkaHosts,
'group.id': this._groupId,
// This is the default in our current librdkafka version, but we
// pin it explicitly because we depend on eager rebalancing and
// don't want it changed implicitly by future version updates.
'partition.assignment.strategy': 'range,roundrobin',
// we manage stored offsets based on the highest
// contiguous offset fully processed by a worker, so
// disabling automatic offset store is needed
Expand Down Expand Up @@ -872,37 +876,53 @@ class BackbeatConsumer extends EventEmitter {
let lastBootstrapId;
let producer; // eslint-disable-line prefer-const
let producerTimer;
let consumerTimer;
function consumeCb(err, messages) {
if (err) {
return undefined;
}
messages.forEach(message => {
const bootstrapId = JSON.parse(message.value).bootstrapId;
if (bootstrapId) {
self._log.info('bootstraping backbeat consumer: ' +
'received bootstrap message',
{ bootstrapId, topic: self._topic, groupId: self._groupId });
if (bootstrapId === lastBootstrapId) {
self._log.info('backbeat consumer is bootstrapped',
{ topic: self._topic, groupId: self._groupId });
clearInterval(producerTimer);
clearInterval(consumerTimer);
self._consumer.offsetsStore([{
topic: self._topic,
partition: message.partition,
offset: message.offset + 1,
}]);
self._consumer.commit();
self._consumer.unsubscribe();
producer.close(() => {
self._bootstrapping = false;
self._onReady();
});
// Set once the bootstrap message matching our own bootstrap ID has
// been seen; stops the polling loop below and makes any still
// in-flight consume() callback a no-op.
let bootstrapDone = false;
// Use chained setTimeout instead of setInterval to ensure only one
// consume() worker is in flight at a time. With setInterval, multiple
// C++ async workers can overlap, and since librdkafka 2.10.0 they
// survive an unsubscribe→subscribe transition, stealing messages from
// the next subscription.
function consumeNext() {
    self._consumer.consume(1, (err, messages) => {
        if (err || bootstrapDone) {
            // On a consume error, keep polling until bootstrap
            // completes; once bootstrapDone is set, stop scheduling.
            if (!bootstrapDone) {
                setTimeout(consumeNext, 200);
            }
            return undefined;
        }
        let matched = false;
        messages.forEach(message => {
            const bootstrapId = JSON.parse(message.value).bootstrapId;
            if (bootstrapId) {
                self._log.info('bootstrapping backbeat consumer: ' +
                    'received bootstrap message',
                    { bootstrapId, topic: self._topic, groupId: self._groupId });
                if (bootstrapId === lastBootstrapId) {
                    self._log.info('backbeat consumer is bootstrapped',
                        { topic: self._topic, groupId: self._groupId });
                    matched = true;
                    bootstrapDone = true;
                    // Stop the producer that was periodically emitting
                    // bootstrap messages.
                    clearInterval(producerTimer);
                    // Store and commit the offset just past the
                    // bootstrap message so it is not re-consumed on
                    // the next subscription.
                    self._consumer.offsetsStore([{
                        topic: self._topic,
                        partition: message.partition,
                        offset: message.offset + 1,
                    }]);
                    self._consumer.commit();
                    self._consumer.unsubscribe();
                    producer.close(() => {
                        self._bootstrapping = false;
                        self._onReady();
                    });
                }
            }
        });
        // No matching bootstrap message yet: schedule the next poll.
        if (!matched) {
            setTimeout(consumeNext, 200);
        }
        return undefined;
    });
    return undefined;
}
assert.strictEqual(this._consumer, null);
producer = new BackbeatProducer({
Expand All @@ -927,9 +947,7 @@ class BackbeatConsumer extends EventEmitter {
this._initConsumer();
this._consumer.on('ready', () => {
this._consumer.subscribe([this._topic]);
consumerTimer = setInterval(() => {
this._consumer.consume(1, consumeCb);
}, 200);
consumeNext();
});
}, 500);
}
Expand Down
4 changes: 4 additions & 0 deletions lib/queuePopulator/KafkaLogConsumer/LogConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ class LogConsumer extends EventEmitter {
setup(done) {
// partition offsets will be managed by kafka
const consumerParams = {
// This is the default in our current librdkafka version, but we
// pin it explicitly because we depend on eager rebalancing and
// don't want it changed implicitly by future version updates.
'partition.assignment.strategy': 'range,roundrobin',
// Manually manage storing offsets to ensure they are only stored
// after the batch processing is fully completed.
'enable.auto.offset.store': false,
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
"minimatch": "^10.0.1",
"mongodb": "^6.11.0",
"node-forge": "^1.3.1",
"node-rdkafka": "^2.12.0",
"node-rdkafka": "^3.6.0",
Comment thread
delthas marked this conversation as resolved.
Comment thread
delthas marked this conversation as resolved.
"node-rdkafka-prometheus": "^1.0.0",
"node-schedule": "^2.1.1",
"node-zookeeper-client": "^1.1.3",
Expand Down
17 changes: 11 additions & 6 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7613,11 +7613,16 @@ ms@2.1.3, ms@^2.0.0, ms@^2.1.1, ms@^2.1.3:
resolved "https://registry.yarnpkg.com/ms/-/ms-2.1.3.tgz#574c8138ce1d2b5861f0b44579dbadd60c6615b2"
integrity sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==

nan@^2.14.0, nan@^2.17.0, nan@^2.18.0, nan@^2.3.2:
nan@^2.14.0, nan@^2.18.0, nan@^2.3.2:
version "2.23.1"
resolved "https://registry.yarnpkg.com/nan/-/nan-2.23.1.tgz#6f86a31dd87e3d1eb77512bf4b9e14c8aded3975"
integrity sha512-r7bBUGKzlqk8oPBDYxt6Z0aEdF1G1rwlMcLk8LCOMbOzf0mG+JUfUzG4fIMWwHWP0iyaLWEQZJmtB7nOHEm/qw==

nan@^2.24.0:
version "2.26.2"
resolved "https://registry.yarnpkg.com/nan/-/nan-2.26.2.tgz#2e5e25764224c737b9897790b57c3294d4dcee9c"
integrity sha512-0tTvBTYkt3tdGw22nrAy50x7gpbGCCFH3AFcyS5WiUu7Eu4vWlri1woE6qHBSfy11vksDqkiwjOnlR7WV8G1Hw==

napi-macros@~2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/napi-macros/-/napi-macros-2.0.0.tgz#2b6bae421e7b96eb687aa6c77a7858640670001b"
Expand Down Expand Up @@ -7751,13 +7756,13 @@ node-rdkafka-prometheus@^1.0.0:
"@log4js-node/log4js-api" "^1.0.0"
prom-client "^12.0.0"

node-rdkafka@^2.12.0:
version "2.18.0"
resolved "https://registry.yarnpkg.com/node-rdkafka/-/node-rdkafka-2.18.0.tgz#116950e49dfe804932c8bc6dbc68949793e72ee2"
integrity sha512-jYkmO0sPvjesmzhv1WFOO4z7IMiAFpThR6/lcnFDWgSPkYL95CtcuVNo/R5PpjujmqSgS22GMkL1qvU4DTAvEQ==
node-rdkafka@^3.6.0:
version "3.6.1"
resolved "https://registry.yarnpkg.com/node-rdkafka/-/node-rdkafka-3.6.1.tgz#164357f08ff23a4722e89bfb3b2b09481c1e8cc1"
integrity sha512-sfpTbrT35429cs0RE8Yb9avGX9BiRRvpV7aweQsghKTjtrVZP3jBrKOPItWXAqHb8doyh3SkuGuUVBLgig1SRg==
dependencies:
bindings "^1.3.1"
nan "^2.17.0"
nan "^2.24.0"

node-releases@^2.0.27:
version "2.0.27"
Expand Down
Loading