Skip to content

Commit b988ab1

Browse files
fix unhandled error -172 on entry committable causing the server to crash
BB-758
1 parent fc1e238 commit b988ab1

File tree

3 files changed

+189
-4
lines changed

3 files changed

+189
-4
lines changed

lib/BackbeatConsumer.js

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -823,10 +823,22 @@ class BackbeatConsumer extends EventEmitter {
823823
});
824824
// ensure consumer is active when calling offsetsStore() on
825825
// it, to avoid raising an exception (invalid state)
826-
// TODO : potential issue here, see BB-758
827-
if (committableOffset !== null && !this.isPaused()) {
828-
this._consumer.offsetsStore([{ topic, partition,
829-
offset: committableOffset }]);
826+
if (committableOffset !== null && this._consumer.isConnected() && !this.isPaused()) {
827+
try {
828+
this._consumer.offsetsStore([{ topic, partition, offset: committableOffset }]);
829+
} catch (e) {
830+
// offsetsStore() should not throw given the guards above;
831+
// if it does (e.g. ERR__STATE race during a rebalance or
832+
// shutdown), log as error but do not crash — the offset will
833+
// be re-committed after the partition is re-assigned.
834+
this._log.error('offsetsStore failed', {
835+
error: e.toString(),
836+
topic,
837+
partition,
838+
offset: committableOffset,
839+
groupId: this._groupId,
840+
});
841+
}
830842
}
831843
}
832844

tests/functional/lib/BackbeatConsumer.js

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,104 @@ describe('BackbeatConsumer rebalance tests', () => {
378378
}
379379
}, 1000);
380380
}).timeout(60000);
381+
382+
});
383+
384+
describe('BackbeatConsumer deferred commit after rebalance', () => {
385+
const topic = 'backbeat-consumer-spec-ERR-STATE';
386+
const groupId = `replication-group-${Math.random()}`;
387+
let producer;
388+
let consumer1;
389+
let consumer2;
390+
391+
before(function before(done) {
392+
this.timeout(60000);
393+
394+
producer = new BackbeatProducer({
395+
kafka: producerKafkaConf, topic,
396+
pollIntervalMs: 100,
397+
compressionType: 'none',
398+
});
399+
consumer1 = new BackbeatConsumer({
400+
clientId: 'BackbeatConsumer-ERR-STATE-1',
401+
zookeeper: zookeeperConf,
402+
kafka: { ...consumerKafkaConf, compressionType: 'none' },
403+
groupId, topic,
404+
queueProcessor: (_msg, cb) => cb(),
405+
bootstrap: true,
406+
});
407+
async.parallel([
408+
innerDone => producer.on('ready', innerDone),
409+
innerDone => consumer1.on('ready', innerDone),
410+
], err => {
411+
if (err) return done(err);
412+
consumer2 = new BackbeatConsumer({
413+
clientId: 'BackbeatConsumer-ERR-STATE-2',
414+
zookeeper: zookeeperConf,
415+
kafka: { ...consumerKafkaConf, compressionType: 'none' },
416+
groupId, topic,
417+
queueProcessor: (_msg, cb) => cb(),
418+
});
419+
consumer2.on('ready', done);
420+
});
421+
});
422+
423+
after(function after(done) {
424+
this.timeout(10000);
425+
async.parallel([
426+
innerDone => producer.close(innerDone),
427+
innerDone => consumer1.close(innerDone),
428+
innerDone => (consumer2 ? consumer2.close(innerDone) : innerDone()),
429+
], done);
430+
});
431+
432+
it('should not crash when onEntryCommittable is called after partition revoke', done => {
433+
let deferredEntry = null;
434+
435+
// Setup: when consumer1 receives a message, complete with
436+
// { committable: false }. This frees the processing queue
437+
// slot but does NOT commit the offset.
438+
consumer1._queueProcessor = (message, cb) => {
439+
deferredEntry = message;
440+
process.nextTick(() => cb(null, { committable: false }));
441+
};
442+
443+
consumer2._queueProcessor = (_message, cb) => {
444+
process.nextTick(cb);
445+
};
446+
447+
// 1 : consumer1 subscribes and consumes the message.
448+
consumer1.subscribe();
449+
producer.send([{ key: 'foo', message: '{"hello":"foo"}' }], err => {
450+
assert.ifError(err);
451+
});
452+
453+
// 2 : wait until consumer1 has processed the message.
454+
// The processing queue is now idle but the
455+
// deferred commit is still pending.
456+
const waitForDeferred = setInterval(() => {
457+
if (!deferredEntry) {
458+
return;
459+
}
460+
clearInterval(waitForDeferred);
461+
462+
// 3 : consumer2 joins the same group, triggering a
463+
// rebalance. consumer1's revoke handler sees an idle
464+
// queue and immediately unassigns the partition.
465+
consumer1.once('unassign', () => {
466+
// 4 : the external caller finishes its work and calls
467+
// onEntryCommittable() for the now-revoked partition.
468+
// It would crash without the try/catch in the method, as
469+
// an ERR__STATE error is returned by librdkafka when trying to commit
470+
assert.doesNotThrow(() => {
471+
consumer1.onEntryCommittable(deferredEntry);
472+
});
473+
done();
474+
});
475+
476+
consumer2.subscribe();
477+
}, 100);
478+
}).timeout(40000);
381479
});
382480

383481
describe('BackbeatConsumer concurrency tests', () => {

tests/unit/backbeatConsumer.js

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ const assert = require('assert');
22
const sinon = require('sinon');
33

44
const BackbeatConsumer = require('../../lib/BackbeatConsumer');
5+
const { CODES } = require('node-rdkafka');
56

67
const { kafka } = require('../config.json');
78
const { BreakerState } = require('breakbeat').CircuitBreaker;
@@ -238,6 +239,80 @@ describe('backbeatConsumer', () => {
238239
});
239240
});
240241

242+
describe('onEntryCommittable', () => {
243+
let consumer;
244+
let mockConsumer;
245+
246+
const entry = {
247+
topic: 'my-test-topic',
248+
partition: 2,
249+
offset: 280,
250+
key: null,
251+
timestamp: Date.now(),
252+
};
253+
254+
beforeEach(() => {
255+
consumer = new BackbeatConsumerMock({
256+
kafka,
257+
groupId: 'unittest-group',
258+
topic: 'my-test-topic',
259+
});
260+
261+
mockConsumer = {
262+
offsetsStore: sinon.stub(),
263+
subscription: sinon.stub().returns(['my-test-topic']),
264+
isConnected: sinon.stub().returns(true),
265+
};
266+
consumer._consumer = mockConsumer;
267+
268+
// pre-register the offset as consumed so onOffsetProcessed returns a value
269+
consumer._offsetLedger.onOffsetConsumed(entry.topic, entry.partition, entry.offset);
270+
});
271+
272+
afterEach(() => {
273+
sinon.restore();
274+
});
275+
276+
it('should call offsetsStore when consumer is active and connected', () => {
277+
consumer.onEntryCommittable(entry);
278+
assert(mockConsumer.offsetsStore.calledOnce);
279+
});
280+
281+
it('should not call offsetsStore when consumer is paused (unsubscribed)', () => {
282+
mockConsumer.subscription.returns([]);
283+
consumer.onEntryCommittable(entry);
284+
assert(mockConsumer.offsetsStore.notCalled);
285+
});
286+
287+
it('should not call offsetsStore when consumer is not connected', () => {
288+
mockConsumer.isConnected.returns(false);
289+
consumer.onEntryCommittable(entry);
290+
assert(mockConsumer.offsetsStore.notCalled);
291+
});
292+
293+
it('should not throw and always log at error level when offsetsStore throws', () => {
294+
const errState = new Error('Local: Erroneous state');
295+
errState.code = CODES.ERRORS.ERR__STATE;
296+
mockConsumer.offsetsStore.throws(errState);
297+
298+
const errorSpy = sinon.spy(consumer._log, 'error');
299+
300+
assert.doesNotThrow(() => consumer.onEntryCommittable(entry));
301+
assert(errorSpy.calledOnce);
302+
});
303+
304+
it('should not throw and log at error level when offsetsStore throws an unexpected error', () => {
305+
const unexpectedErr = new Error('unexpected kafka error');
306+
unexpectedErr.code = -1;
307+
mockConsumer.offsetsStore.throws(unexpectedErr);
308+
309+
const errorSpy = sinon.spy(consumer._log, 'error');
310+
311+
assert.doesNotThrow(() => consumer.onEntryCommittable(entry));
312+
assert(errorSpy.calledOnce);
313+
});
314+
});
315+
241316
describe('_getAvailableSlotsInPipeline', () => {
242317
let consumer;
243318

0 commit comments

Comments
 (0)