Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions lib/BackbeatConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -823,10 +823,21 @@ class BackbeatConsumer extends EventEmitter {
});
// ensure consumer is active when calling offsetsStore() on
// it, to avoid raising an exception (invalid state)
// TODO : potential issue here, see BB-758
if (committableOffset !== null && !this.isPaused()) {
this._consumer.offsetsStore([{ topic, partition,
offset: committableOffset }]);
if (committableOffset !== null && !this.isPaused() && this._consumer.isConnected()) {
try {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since you added the isConnected() guard, do we also need to catch the exception?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could still have cases where is connected returns true, but then inside offsetStore something wrong happens 🤔

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we ? this is javascript, so single threaded...

this._consumer.offsetsStore([{ topic, partition, offset: committableOffset }]);
} catch (e) {
// offsetsStore() should not throw given the guards above;
// if it does (e.g. ERR__STATE race during a rebalance or
// shutdown), log as error but do not crash — the offset will
// be re-committed after the partition is re-assigned.
this._log.error('offsetsStore failed', {
error: e.toString(),
topic,
partition,
offset: committableOffset,
});
}
}
}

Expand Down
75 changes: 75 additions & 0 deletions tests/unit/backbeatConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ const assert = require('assert');
const sinon = require('sinon');

const BackbeatConsumer = require('../../lib/BackbeatConsumer');
const { CODES } = require('node-rdkafka');

const { kafka } = require('../config.json');
const { BreakerState } = require('breakbeat').CircuitBreaker;
Expand Down Expand Up @@ -238,6 +239,80 @@ describe('backbeatConsumer', () => {
});
});

describe('onEntryCommittable', () => {
let consumer;
let mockConsumer;

const entry = {
topic: 'my-test-topic',
partition: 2,
offset: 280,
key: null,
timestamp: Date.now(),
};

beforeEach(() => {
consumer = new BackbeatConsumerMock({
kafka,
groupId: 'unittest-group',
topic: 'my-test-topic',
});

mockConsumer = {
offsetsStore: sinon.stub(),
subscription: sinon.stub().returns(['my-test-topic']),
isConnected: sinon.stub().returns(true),
};
consumer._consumer = mockConsumer;

// pre-register the offset as consumed so onOffsetProcessed returns a value
consumer._offsetLedger.onOffsetConsumed(entry.topic, entry.partition, entry.offset);
});

afterEach(() => {
sinon.restore();
});

it('should call offsetsStore when consumer is active and connected', () => {
consumer.onEntryCommittable(entry);
assert(mockConsumer.offsetsStore.calledOnce);
});

it('should not call offsetsStore when consumer is paused (unsubscribed)', () => {
mockConsumer.subscription.returns([]);
consumer.onEntryCommittable(entry);
assert(mockConsumer.offsetsStore.notCalled);
});

it('should not call offsetsStore when consumer is not connected', () => {
mockConsumer.isConnected.returns(false);
consumer.onEntryCommittable(entry);
assert(mockConsumer.offsetsStore.notCalled);
});

it('should not throw and always log at error level when offsetsStore throws', () => {
const errState = new Error('Local: Erroneous state');
errState.code = CODES.ERRORS.ERR__STATE;
mockConsumer.offsetsStore.throws(errState);

const errorSpy = sinon.spy(consumer._log, 'error');

assert.doesNotThrow(() => consumer.onEntryCommittable(entry));
assert(errorSpy.calledOnce);
});

it('should not throw and log at error level when offsetsStore throws an unexpected error', () => {
const unexpectedErr = new Error('unexpected kafka error');
unexpectedErr.code = -1;
mockConsumer.offsetsStore.throws(unexpectedErr);

const errorSpy = sinon.spy(consumer._log, 'error');

assert.doesNotThrow(() => consumer.onEntryCommittable(entry));
assert(errorSpy.calledOnce);
});
});

describe('_getAvailableSlotsInPipeline', () => {
let consumer;

Expand Down
Loading