-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathLogConsumer.js
More file actions
374 lines (351 loc) · 12.9 KB
/
LogConsumer.js
File metadata and controls
374 lines (351 loc) · 12.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
const { EventEmitter } = require('events');
const kafka = require('node-rdkafka');
const async = require('async');
const jsutil = require('arsenal').jsutil;
const ListRecordStream = require('./ListRecordStream');
const KafkaBacklogMetrics = require('../../KafkaBacklogMetrics');
const { unassignStatus } = require('../../constants');
// Internal events used to coordinate batch draining with partition
// unassignment during rebalances. Frozen so the shared constant cannot
// be mutated accidentally at runtime.
const EVENTS = Object.freeze({
    DRAINED: 'drained',
    UNASSIGNED: 'unassigned',
});
class LogConsumer extends EventEmitter {
    /**
     * @constructor
     * @param {Object} kafkaConfig queue populator kafka config
     * @param {string} kafkaConfig.hosts kafka hosts
     * @param {string} kafkaConfig.topic kafka oplog topic
     * @param {string} kafkaConfig.consumerGroupId consumer group id
     * @param {number} [kafkaConfig.maxPollIntervalMs=300000] maximum delay
     * between polls, also used to bound the drain wait on rebalance
     * @param {Logger} logger logger
     */
    constructor(kafkaConfig, logger) {
        super();
        const { hosts, topic, consumerGroupId, maxPollIntervalMs } = kafkaConfig;
        this._kafkaHosts = hosts;
        this._maxPollIntervalMs = maxPollIntervalMs || 300000; // default to 5 minutes
        this._topic = topic;
        this._consumerGroupId = consumerGroupId;
        // next offsets (per partition) of the last consumed batch, waiting
        // to be stored via storeOffsets(); null when no batch is in flight
        this._topicPartition = null;
        // true while stored offsets are waiting to be auto-committed
        this._pendingCommit = false;
        this._log = logger;
        this._consumer = null;
    }
    /**
     * Get partition offsets
     * Offsets are stored in kafka and not managed
     * by the logReader, only keeping this function
     * to not alter the logic path in the logReader
     * @returns {null} always null (offsets are managed by kafka)
     */
    _getOffset() {
        return null;
    }
    /**
     * Connects consumer to kafka and subscribes
     * to oplog topic
     * @param {Function} done callback, invoked once the consumer is ready
     * and subscribed
     * @returns {undefined}
     */
    setup(done) {
        // partition offsets will be managed by kafka
        const consumerParams = {
            // This is the default in our current librdkafka version, but we
            // pin it explicitly because we depend on eager rebalancing and
            // don't want it changed implicitly by future version updates.
            'partition.assignment.strategy': 'range,roundrobin',
            // Manually manage storing offsets to ensure they are only stored
            // after the batch processing is fully completed.
            'enable.auto.offset.store': false,
            // Default auto-commit interval is 5 seconds
            'enable.auto.commit': true,
            'offset_commit_cb': this._onOffsetCommit.bind(this),
            'rebalance_cb': this._onRebalance.bind(this),
            'metadata.broker.list': this._kafkaHosts,
            'group.id': this._consumerGroupId,
            'max.poll.interval.ms': this._maxPollIntervalMs,
        };
        const topicParams = {
            // start from the earliest available offset when the group has
            // no committed offset yet
            'auto.offset.reset': 'earliest',
        };
        this._consumer = new kafka.KafkaConsumer(consumerParams, topicParams);
        this._consumer.connect();
        this._consumer.once('ready', () => {
            this._consumer.subscribe([this._topic]);
            done();
        });
    }
    /**
     * Offset commit callback: clears the pending-commit flag and emits
     * DRAINED when the previous batch is fully processed and committed
     * @param {Error} err commit error, if any
     * @param {Object} topicPartitions partitions whose offsets were committed
     * @returns {undefined}
     */
    _onOffsetCommit(err, topicPartitions) {
        if (err) {
            // NO_OFFSET is a "soft error" meaning that the same
            // offset is already committed, which occurs because of
            // auto-commit (e.g. if nothing was done by the producer
            // on this partition since last commit).
            if (err.code === kafka.CODES.ERRORS.ERR__NO_OFFSET) {
                return undefined;
            }
            this._log.error('Error committing offsets to kafka', {
                method: 'LogConsumer._onOffsetCommit',
                errorCode: err,
                topicPartitions,
                groupId: this._consumerGroupId,
            });
            return undefined;
        }
        this._pendingCommit = false;
        // notify waiters (e.g. _drainAndUnassign) that there is nothing
        // left to process or commit
        if (!this._hasUnprocessedMessages()) {
            this.emit(EVENTS.DRAINED);
        }
        this._log.debug('Offsets committed', {
            method: 'LogConsumer._onOffsetCommit',
            topicPartitions,
            groupId: this._consumerGroupId
        });
        return undefined;
    }
    /**
     * Rebalance callback: applies assignments directly, but drains
     * in-flight messages before honoring revocations
     * @param {kafka.KafkaError} err Rebalance event
     * @param {TopicPartition[]} assignment List of (un)assigned partitions
     * @returns {void}
     */
    _onRebalance(err, assignment) {
        if (err.code === kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
            this._log.info('rdkafka.assign', {
                method: 'LogConsumer._onRebalance',
                assignment,
                topic: this._topic,
                consumerGroupId: this._consumerGroupId,
            });
            this._assignPartitions(assignment);
        } else if (err.code === kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
            this._log.info('rdkafka.revoke', {
                method: 'LogConsumer._onRebalance',
                assignment,
                topic: this._topic,
                consumerGroupId: this._consumerGroupId,
            });
            this._drainAndUnassign();
        } else {
            // unexpected rebalance event code: log and ignore
            this._log.error('rdkafka.rebalance', {
                method: 'LogConsumer._onRebalance',
                err,
                assignment,
                topic: this._topic,
                consumerGroupId: this._consumerGroupId,
            });
        }
    }
    /**
     * Assign partitions to the consumer
     * @param {TopicPartition[]} assignment partitions to assign
     * @returns {void}
     */
    _assignPartitions(assignment) {
        try {
            this._consumer.assign(assignment);
        } catch (e) {
            // log at a lower level when the consumer is not connected, as
            // assignment is then presumably expected to fail
            const logger = this._consumer.isConnected() ? this._log.error : this._log.debug;
            logger.bind(this._log)('rdkafka.assign failed', {
                method: 'LogConsumer._onRebalance',
                error: e.toString(),
                assignment,
                topic: this._topic,
                consumerGroupId: this._consumerGroupId,
            });
        }
    }
    /**
     * Unassign partitions from the consumer, always emitting UNASSIGNED
     * (even on failure) so listeners such as close() can proceed
     * @returns {void}
     */
    _unassignPartitions() {
        try {
            this._consumer.unassign();
        } catch (e) {
            // log at a lower level when the consumer is not connected, as
            // unassignment is then presumably expected to fail
            const logger = this._consumer.isConnected() ? this._log.error : this._log.info;
            logger.bind(this._log)('rdkafka.unassign failed', {
                method: 'LogConsumer._onRebalance',
                error: e.toString(),
                topic: this._topic,
                consumerGroupId: this._consumerGroupId,
            });
        }
        this.emit(EVENTS.UNASSIGNED);
    }
    /**
     * Waits for all messages to be processed before unassigning
     * the partitions from the consumer
     * @returns {void}
     */
    _drainAndUnassign() {
        let drainTimeout;
        // NOTE: `unassign` is referenced before its declaration below, but
        // the handler only runs after it is initialized (on DRAINED)
        const drainHandler = () => unassign(unassignStatus.DRAINED);
        // jsutil.once guarantees a single unassignment even if both the
        // DRAINED event and the timeout fire
        const unassign = jsutil.once(status => {
            this.removeListener(EVENTS.DRAINED, drainHandler);
            clearTimeout(drainTimeout);
            drainTimeout = null;
            KafkaBacklogMetrics.onRebalance(this._topic, this._consumerGroupId, status);
            this._unassignPartitions();
        });
        // fast path: nothing in flight and nothing pending commit
        if (!this._hasUnprocessedMessages() && !this._pendingCommit) {
            return unassign(unassignStatus.IDLE);
        }
        this.once(EVENTS.DRAINED, drainHandler);
        // give up slightly (1s) before max.poll.interval.ms elapses
        drainTimeout = setTimeout(() => {
            unassign(unassignStatus.TIMEOUT);
            this._log.error('Timeout waiting for messages to be processed', {
                method: 'LogConsumer._drainAndUnassign',
                topic: this._topic,
                consumerGroupId: this._consumerGroupId,
            });
            this._consumer.disconnect();
        }, this._maxPollIntervalMs - 1000);
        return undefined;
    }
    /**
     * Initializes a fresh record stream for the next batch
     * @returns {undefined}
     */
    _resetRecordStream() {
        this._listRecordStream = new ListRecordStream(this._log);
        this._listRecordStream.getOffset = this._getOffset.bind(this);
    }
    /**
     * Consumes kafka messages and writes them to record
     * stream
     * @param {Number} limit maximum messages to consume
     * @param {Function} cb callback (never called with an error: failures
     * are logged and result in an empty batch)
     * @returns {undefined}
     */
    _consumeKafkaMessages(limit, cb) {
        this._resetRecordStream();
        const isConnected = this._consumer.isConnected();
        // skip consumption while disconnected or while the previous
        // batch's offsets have not been stored yet
        if (!isConnected || this._hasUnprocessedMessages()) {
            this._log.info('Skipping message consumption', {
                method: 'LogConsumer._consumeKafkaMessages',
                topic: this._topic,
                consumerGroupId: this._consumerGroupId,
                reason: !isConnected ? 'consumer not connected' :
                    'awaiting processing of previous batch',
            });
            return cb();
        }
        return this._consumer.consume(limit, (err, messages) => {
            if (err) {
                this._log.error('An error occured while consuming messages', {
                    method: 'LogConsumer.readRecords',
                    topic: this._topic,
                    error: err.message,
                    consumerGroupId: this._consumerGroupId,
                });
                return cb();
            }
            // Use a Map to store the latest offset for each partition
            const topicPartition = new Map();
            // store next offsets to commit
            messages.forEach(message => {
                topicPartition.set(`${message.topic}-${message.partition}`, {
                    topic: message.topic,
                    partition: message.partition,
                    offset: message.offset + 1, // next offset to commit
                });
                // writing consumed messages to the stream
                this._listRecordStream.write(message);
            });
            // format offsets for commit
            this._topicPartition = Array.from(topicPartition.values());
            return cb();
        });
    }
    /**
     * Reads a certain number of messages from oplog kafka topic
     * The caller of this function expects a stream to be returned
     * in the callback
     * @param {Object} params reading params
     * @param {number} params.limit maximum number of elements to fetch
     * @param {Function} cb callback, receives `{ log, tailable }` where
     * `log` is the record stream
     * @returns {undefined}
     */
    readRecords(params, cb) {
        async.series([
            // consuming the desired number of messages at most
            next => this._consumeKafkaMessages(params.limit, next),
            next => {
                // ending and returning the stream
                this._listRecordStream.end();
                return next(null, { log: this._listRecordStream, tailable: false });
            }
        ], (err, res) => cb(err, res?.[1]));
    }
    /**
     * Stores the offsets locally for the consumer to
     * auto commit them at a later time
     * @returns {undefined}
     */
    storeOffsets() {
        if (!this._hasUnprocessedMessages()) {
            this._log.debug('No offsets to store', {
                method: 'LogConsumer.storeOffsets',
                topic: this._topic,
                consumerGroupId: this._consumerGroupId,
            });
            return;
        }
        this._consumer.offsetsStore(this._topicPartition);
        this._log.info('Offsets stored', {
            method: 'LogConsumer.storeOffsets',
            consumerGroupId: this._consumerGroupId,
            topicPartition: this._topicPartition,
        });
        // mark the batch as fully processed; the stored offsets will be
        // committed by the auto-commit mechanism (see _onOffsetCommit)
        this._pendingCommit = true;
        this._topicPartition = null;
    }
    /**
     * Whether the last consumed batch is still awaiting offset storage
     * @returns {boolean} true if offsets of the last consumed batch have
     * not been stored yet
     */
    _hasUnprocessedMessages() {
        return this._topicPartition?.length > 0;
    }
    /**
     * LogConsumer is considered ready if it is connected
     * to the Kafka broker and ready to consume messages.
     * @returns {boolean}
     */
    isReady() {
        return this._consumer.isConnected();
    }
    /**
     * Closes the consumer connection, while making sure
     * that all messages are processed and offsets are committed.
     * @param {Function} cb callback, called once disconnected
     * @returns {undefined}
     */
    close(cb) {
        async.series([
            next => {
                if (this._consumer?.isConnected()) {
                    const subscription = this._consumer.subscription() || [];
                    if (subscription.length > 0) {
                        // we first unsubscribe from the topic
                        // to initiate the rebalance process
                        // allowing the message processing to finish
                        // and offsets to be committed
                        this.once(EVENTS.UNASSIGNED, () => next());
                        this._consumer.unsubscribe();
                        return null;
                    }
                }
                return next();
            },
            next => {
                if (this._consumer?.isConnected()) {
                    this._consumer.once('disconnected', () => next());
                    this._consumer.disconnect();
                    return null;
                }
                return next();
            },
        ], cb);
    }
}
module.exports = LogConsumer;