-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathBackbeatConsumer.js
More file actions
1123 lines (1039 loc) · 44.8 KB
/
BackbeatConsumer.js
File metadata and controls
1123 lines (1039 loc) · 44.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
const { EventEmitter } = require('events');
const kafka = require('node-rdkafka');
const assert = require('assert');
const async = require('async');
const joi = require('joi');
const jsutil = require('arsenal').jsutil;
const Logger = require('werelogs').Logger;
const { BreakerState, CircuitBreaker } = require('breakbeat').CircuitBreaker;
const BackbeatProducer = require('./BackbeatProducer');
const OffsetLedger = require('./OffsetLedger');
const TaskScheduler = require('./tasks/TaskScheduler');
const KafkaBacklogMetrics = require('./KafkaBacklogMetrics');
const {
startCircuitBreakerMetricsExport,
} = require('./CircuitBreaker');
const { observeKafkaStats } = require('./util/probe');
const {
unassignStatus,
backbeatConsumer: {
CONCURRENCY_DEFAULT,
MAX_QUEUED_DEFAULT,
}
} = require('./constants');
const CLIENT_ID = 'BackbeatConsumer';
const { withTopicPrefix } = require('./util/topic');
/**
* Stats on how we are consuming Kafka
* @typedef {Object} ConsumerStats
* @property {Map<string, number>} lag - Map of partitions to kafka message lag
*/
class BackbeatConsumer extends EventEmitter {
/**
 * constructor
 * @param {Object} config - config
 * @param {string} [config.clientId="BackbeatConsumer"] - client id used
 * as the werelogs logger name
 * @param {string} config.topic - Kafka topic to subscribe to
 * @param {function} config.queueProcessor - function to invoke to
 * process an item in a queue (see doc of
 * this.onEntryCommittable() if it's desired not to allow
 * committing the consumer offset immediately after this function
 * calls its callback).
 * @param {string} config.groupId - consumer group id. Messages are
 * distributed among multiple consumers belonging to the same group
 * @param {Object} [config.zookeeper] - zookeeper endpoint config
 * (only needed if config.kafka.backlogMetrics is set)
 * @param {string} config.zookeeper.connectionString - zookeeper
 * connection string as "host:port[/chroot]" (only needed if
 * config.kafka.backlogMetrics is set)
 * @param {Object} config.kafka - kafka connection config
 * @param {string} config.kafka.hosts - kafka hosts list
 * as "host:port[,host:port...]"
 * @param {object} [config.kafka.site] - name of site where this
 * consumer runs, enables Kafka follower fetching when provided
 * @param {object} [config.kafka.backlogMetrics] - param object to
 * publish kafka topic metrics to zookeeper (disabled if param object
 * is not set)
 * @param {string} config.kafka.backlogMetrics.zkPath - zookeeper base
 * path to publish metrics to
 * @param {boolean} [config.kafka.backlogMetrics.intervalS=60] -
 * interval in seconds between iterations of kafka metrics
 * publishing task
 * @param {number} [config.kafka.maxPollIntervalMs=300000] - kafka
 * max.poll.interval.ms, also used as the slow-task and rebalance
 * drain timeout (min 45000)
 * @param {string} [config.fromOffset] - valid values latest/earliest/none
 * @param {number} [config.concurrency] - represents the number of entries
 * that can be processed in parallel
 * @param {number} [config.maxQueued] - max number of messages queued
 * globally in the processing queue (see joi schema comment below)
 * @param {number} [config.fetchMaxBytes] - max. bytes to fetch in a
 * fetch loop
 * @param {boolean} [config.canary=false] - true to send a canary
 * message to bootstrap partition offsets (useful for pause/resume
 * functionality to work)
 * @param {boolean} [config.bootstrap=false] - TEST ONLY: true to
 * bootstrap the consumer with test messages until it starts
 * consuming them
 * @param {Object} [config.circuitBreaker] - circuit breaker config,
 * passed as-is to breakbeat's CircuitBreaker
 * @param {Object} [config.circuitBreakerMetrics] - circuit breaker
 * metrics export config ({ type })
 * @param {function|null} [config.orderByFunc] - processing-queue
 * ordering key function (see joi schema comment below)
 */
constructor(config) {
    super();
    const configJoi = joi.object({
        clientId: joi.string().default(CLIENT_ID),
        zookeeper: joi.object({
            connectionString: joi.string().required(),
        }).when('kafka.backlogMetrics', { is: joi.exist(), then: joi.required() }),
        kafka: joi.object({
            hosts: joi.string().required(),
            backlogMetrics: {
                zkPath: joi.string().required(),
                intervalS: joi.number().default(60),
            },
            site: joi.string(),
            maxPollIntervalMs: joi.number().min(45000).default(300000),
            // Kafka producer params
            compressionType: joi.string(),
            requiredAcks: joi.number(),
        }).required(),
        topic: joi.string().required(),
        groupId: joi.string().required(),
        queueProcessor: joi.func(),
        fromOffset: joi.alternatives().try('latest', 'earliest', 'none'),
        concurrency: joi.number().greater(0).default(CONCURRENCY_DEFAULT),
        fetchMaxBytes: joi.number(),
        canary: joi.boolean().default(false),
        bootstrap: joi.boolean().default(false),
        circuitBreaker: joi.object().optional(),
        circuitBreakerMetrics: joi.object({
            type: joi.string().required(),
        }).optional(),
        // Function to determine what to order the processing queue by.
        // Default function orders by the key of the message, if the message
        // has a key. Messages with the same key are processed in order.
        // Messages with different keys are processed in parallel.
        // Setting this to null will disable ordering and will process messages
        // in parallel.
        orderByFunc: joi.function()
            .default(ctx => ctx?.entry?.key?.toString())
            .allow(null),
        // When ordering is enabled, one message is processed at a time
        // for a given key, other messages with the same key are queued.
        // This parameter controls the maximum number of messages that can
        // be queued for processing (not per key but globally).
        // Avoid having a very high value for this parameter as it will increase
        // the risk of re-processing previously processed messages in case of
        // a crash or rebalance, as the queued messages will block offset commit
        // for more recent messages with different keys that might have been processed.
        maxQueued: joi.number()
            .greater(0)
            .default(MAX_QUEUED_DEFAULT),
    });
    const validConfig = joi.attempt(config, configJoi,
        'invalid config params');
    const { clientId, zookeeper, kafka, topic, groupId, queueProcessor,
        fromOffset, concurrency, maxQueued, fetchMaxBytes,
        canary, bootstrap, circuitBreaker, circuitBreakerMetrics, orderByFunc } = validConfig;
    this._zookeeperEndpoint = zookeeper && zookeeper.connectionString;
    this._kafkaHosts = kafka.hosts;
    this._kafkaBacklogMetricsConfig = kafka.backlogMetrics;
    this._maxPollIntervalMs = kafka.maxPollIntervalMs;
    this._producerCompressionType = kafka.compressionType;
    this._producerRequiredAcks = kafka.requiredAcks;
    this._site = kafka.site;
    this._fromOffset = fromOffset;
    this._log = new Logger(clientId);
    this._topic = withTopicPrefix(topic);
    this._groupId = groupId;
    this._queueProcessor = queueProcessor;
    this._concurrency = concurrency;
    this._maxQueued = maxQueued;
    this._fetchMaxBytes = fetchMaxBytes;
    this._canary = canary;
    this._bootstrap = bootstrap;
    // tracks consumed vs processed offsets per topic/partition to
    // compute the highest contiguous committable offset
    this._offsetLedger = new OffsetLedger();
    this._processingQueue = new TaskScheduler(
        (ctx, done) => this._processTask(ctx.entry, done),
        orderByFunc,
        null,
        this._concurrency,
    );
    this._messagesConsumed = 0;
    // this variable represents how many kafka messages have been
    // requested without having been received yet, i.e. still
    // being fetched by this._consumer.consume()
    this._nConsumePendingRequests = 0;
    this._consumer = null;
    this._consumerReady = false;
    this._bootstrapping = false;
    this._kafkaBacklogMetrics = null;
    this._kafkaBacklogMetricsReady = false;
    this._publishOffsetsCronTimer = null;
    this._publishOffsetsCronActive = false;
    this._consumedEventTimeout = null;
    this._drainProcessQueueTimeout = null;
    this._tryConsumedTimeout = null;
    // This variable is set to true when tasks have been completed
    // since the last consume, and is used to trigger a new consume
    // immediately after the current one
    this._tasksCompletedSinceLastConsume = false;
    /** @type {ConsumerStats} */
    // NOTE(review): the ConsumerStats typedef declares "lag" as a Map,
    // but it is initialized as a plain object here — confirm which one
    // consumers of this.consumerStats expect.
    this.consumerStats = { lag: {} };
    this._circuitBreaker = new CircuitBreaker(circuitBreaker);
    this._circuitBreakerMetrics = circuitBreakerMetrics;
    this._circuitBreaker.on('state-changed', this._onCircuitBreakerStateChanged.bind(this));
    this._init();
    // redundant: constructors implicitly return the new instance
    return this;
}
_init() {
if (this._bootstrap) {
this._consumerReady = true;
} else {
this._initConsumer();
}
if (this._kafkaBacklogMetricsConfig) {
this._initKafkaBacklogMetrics();
} else {
this._kafkaBacklogMetricsReady = true;
}
process.nextTick(this._checkIfReady.bind(this));
}
/**
 * Create and connect the node-rdkafka KafkaConsumer with this
 * instance's configuration, wiring event/log/error observers.
 * Emits 'error' if the initial metadata sanity check fails.
 * @return {kafka.KafkaConsumer} the consumer (after registering the
 * 'ready' handler that flips this._consumerReady)
 */
_initConsumer() {
    // TODO: Ask Rahul/Jonathan if at least once delivery is acceptable.
    // We automatically and periodically commit offsets in the background
    // every 5 seconds (default value of "auto.commit.interval.ms").
    // That means if the consumer process crashes, it will re-process the last
    // 5 seconds of messages before crashing.
    // Therefore, a message could be comsumed multiple times leading to:
    // - an object/version could be put to its destination location
    //   more than once (if queue_processor crashed).
    // - same object/version could be replayed multiple times (if
    //   replication_status_processor crashes).
    const consumerParams = {
        'metadata.broker.list': this._kafkaHosts,
        'group.id': this._groupId,
        // we manage stored offsets based on the highest
        // contiguous offset fully processed by a worker, so
        // disabling automatic offset store is needed
        'enable.auto.offset.store': false,
        // this function is called periodically based on
        // auto-commit of stored offsets
        'offset_commit_cb': this._onOffsetCommit.bind(this),
        // automatically create topic
        'allow.auto.create.topics': true,
        'statistics.interval.ms': 1000,
        'rebalance_cb': this._onRebalance.bind(this),
        // decrease max metadata age to 5s to decrease potential lag between
        // consumer and producer when a partition is added to a topic
        'metadata.max.age.ms': 5000,
        'max.poll.interval.ms': this._maxPollIntervalMs,
    };
    const topicParams = {};
    if (this._fromOffset !== undefined) {
        topicParams['auto.offset.reset'] = this._fromOffset;
    }
    if (this._fetchMaxBytes !== undefined) {
        consumerParams['fetch.message.max.bytes'] = this._fetchMaxBytes;
    }
    // opt-in librdkafka debug logging via environment variable
    if (process.env.RDKAFKA_DEBUG_LOGS) {
        consumerParams.debug = process.env.RDKAFKA_DEBUG_LOGS;
    }
    if (this._site) {
        this._log.info('follower fetching enabled for topic/consumer group', {
            site: this._site,
            topic: this._topic,
            groupId: this._groupId,
        });
        // setting client.rack enables fetching from a follower replica
        // located in the same "rack" (site)
        consumerParams['client.rack'] = this._site;
    }
    this._consumer = new kafka.KafkaConsumer(consumerParams, topicParams);
    this._consumer.on('event', event => this._log.info('rdkafka.event', { event }));
    this._consumer.on('event.log', log => this._log.info('rdkafka.log', { log }));
    this._consumer.on('warning', warning => this._log.warn('rdkafka.warning', { warning }));
    this._consumer.on('event.error', err => this._log.error('rdkafka.error', { err }));
    this._consumer.on('event.throttle', throttle => this._log.info('rdkafka.throttle', { throttle }));
    this._consumer.on('event.stats', observeKafkaStats);
    this._consumer.connect({ timeout: 10000 }, () => {
        // sanity check: fetch metadata for a known topic to verify the
        // broker connection actually works; surface failures as 'error'
        const opts = {
            topic: withTopicPrefix('backbeat-sanitycheck'),
            timeout: 10000,
        };
        this._consumer.getMetadata(opts, err => {
            if (err) {
                this.emit('error', err);
            }
        });
    });
    return this._consumer.once('ready', () => {
        this._consumerReady = true;
        this._checkIfReady();
    });
}
_initKafkaBacklogMetrics() {
this._kafkaBacklogMetrics = new KafkaBacklogMetrics(
this._zookeeperEndpoint, this._kafkaBacklogMetricsConfig);
this._kafkaBacklogMetrics.init();
this._kafkaBacklogMetrics.once('ready', () => {
this._kafkaBacklogMetricsReady = true;
this._checkIfReady();
});
}
_checkIfReady() {
if (this._consumerReady && this._kafkaBacklogMetricsReady) {
if (this._bootstrap) {
if (!this._bootstrapping) {
this._bootstrapConsumer();
}
} else {
this._onReady();
}
}
}
_onReady() {
this.emit('ready');
this._circuitBreaker.start();
const cbm = this._circuitBreakerMetrics;
if (cbm?.type) {
startCircuitBreakerMetricsExport(this._circuitBreaker, cbm.type);
}
if (this._kafkaBacklogMetricsConfig) {
this._publishOffsetsCronTimer =
setInterval(this._publishOffsetsCron.bind(this),
this._kafkaBacklogMetricsConfig.intervalS * 1000);
}
}
isReady() {
let kMetrics = null;
let cs = null;
if (this._kafkaBacklogMetricsConfig) {
kMetrics = this._kafkaBacklogMetrics.isReady();
} else {
kMetrics = true;
}
if (this._bootstrap) {
cs = true;
} else {
cs = this._consumer && this._consumer.isConnected();
if (!cs) {
this._log.error('KafkaConsumer is not connected',
{ topic: this._topic, groupId: this._groupId });
}
}
return kMetrics && cs;
}
/**
 * subscribe to messages from a topic
 * Once subscribed, the consumer does a fetch from the topic with new
 * messages. Each fetch loop can contain one or more messages, so the fetch
 * is paused until the current queue of tasks are processed. Once the task
 * queue is empty, the current offset is committed and the fetch is resumed
 * to get the next batch of messages
 * @param {Boolean} [paused] - optional field. If true, kafka consumer should
 * not subscribe to its topic
 * @return {this} current instance
 */
subscribe(paused) {
    if (!paused) {
        this._consumer.subscribe([this._topic]);
    } else {
        this._log.debug(`consumer is paused for topic ${this._topic}`,
            { groupId: this._groupId });
    }
    // optionally publish a canary message so partition offsets exist
    // (needed for pause/resume to work on a fresh topic)
    if (this._canary) {
        this._sendCanary(() => {
            this.emit('canary');
        });
    }
    this._consumer.on('event.error', error => {
        // This is a bit hacky: the "broker transport failure"
        // error occurs when the kafka broker reaps the idle
        // connections every few minutes, and librdkafka handles
        // reconnection automatically anyway, so we ignore those
        // harmless errors
        // NOTE(review): the comment above says these errors are
        // ignored, yet this branch logs and emits 'error' exactly
        // when the code matches ALL_BROKERS_DOWN/TRANSPORT — confirm
        // the condition is not inverted relative to the intent.
        if (error.code === kafka.CODES.ERRORS.ERR__ALL_BROKERS_DOWN ||
            error.code === kafka.CODES.ERRORS.ERR__TRANSPORT) {
            this._log.error('consumer error', {
                error,
                topic: this._topic,
                groupId: this._groupId,
            });
            this.emit('error', error);
        }
    });
    // trigger initial consumption
    this._tryConsume();
    return this;
}
/**
* Get how many messages we may attempt to consume at this time,
* considering the maximum concurrency, queued messages, and message
* retrievals still pending.
*
* @return {integer} a strictly positive number representing the
* number of new messages that the pipeline is ready to process,
* or zero if the processing pipeline is 100% busy
*/
_getAvailableSlotsInPipeline() {
// do not exceed max queued tasks limit
const nQueuedSlots = this._maxQueued
- this._processingQueue.length();
// do not exceed max concurrency limit
const nRunningSlots = this._concurrency
- this._processingQueue.running()
- this._nConsumePendingRequests;
return Math.max(0, Math.min(nQueuedSlots, nRunningSlots));
}
/**
 * Attempt to consume as many new messages as the processing pipeline
 * can accept, pushing each received entry onto the processing queue.
 * Retries (with a 1s delay) when kafka returns an error or fewer
 * messages than requested.
 * @return {undefined}
 */
_tryConsume() {
    // cancel any pending retry: we are consuming now
    if (this._tryConsumedTimeout) {
        clearTimeout(this._tryConsumedTimeout);
        this._tryConsumedTimeout = null;
    }
    // use non-flowing mode of consumption to add some flow
    // control: explicit consumption of messages is required,
    // needs explicit polling to get new messages
    const nNewConsumeRequests = this._getAvailableSlotsInPipeline();
    if (nNewConsumeRequests === 0) {
        // processing pipeline is 100% busy, do not attempt to consume
        return undefined;
    }
    // Calls to consume() should be done sequentially to make sure messages
    // are consumed in order, as there might be delay in the transport
    // layer causing the responses of the consume() calls to be received
    // out of order in a case of concurrent calls.
    if (this._nConsumePendingRequests > 0) {
        // remember to re-trigger a consume right after the in-flight
        // one completes (see flag check below)
        this._tasksCompletedSinceLastConsume = true;
        return undefined;
    }
    this._nConsumePendingRequests += nNewConsumeRequests;
    return this._consumer.consume(nNewConsumeRequests, (err, entries) => {
        this._nConsumePendingRequests -= nNewConsumeRequests;
        if (!err) {
            entries.forEach(entry => {
                const { topic, partition, offset, key, timestamp } = entry;
                this._log.debug('marked consumed entry', {
                    entry: { topic, partition, offset,
                        key: key && key.toString(), timestamp },
                    groupId: this._groupId,
                    offsetLedger: this._offsetLedger.toString(),
                });
                // skip malformed entries: the offset ledger needs all
                // three coordinates to track them
                if (topic === undefined ||
                    partition === undefined ||
                    offset === undefined) {
                    this._log.error('"topic" or "partition" or "offset" ' +
                        'is undefined in entry', {
                        entry: {
                            topic, partition, offset,
                            key: key && key.toString(),
                            timestamp,
                        },
                        groupId: this._groupId,
                    });
                    return undefined;
                }
                this._offsetLedger.onOffsetConsumed(
                    entry.topic, entry.partition, entry.offset);
                this._messagesConsumed++;
                KafkaBacklogMetrics.onTaskQueued(topic, partition, this._groupId);
                this._processingQueue.push({ entry }, (err, completionArgs, finishProcessingTask) => {
                    this._onEntryProcessingDone(err, entry, completionArgs);
                    finishProcessingTask(err);
                });
                // update Zenko metrics with the latest consumed
                // message timestamp, to later allow computing
                // backlog metrics on demand
                KafkaBacklogMetrics.onMessageConsumed(
                    entry.topic, entry.partition, this._groupId,
                    entry.timestamp / 1000);
                return undefined;
            });
            // Immediately try to consume more messages
            // if tasks have completed since the last consume
            if (this._tasksCompletedSinceLastConsume) {
                this._tasksCompletedSinceLastConsume = false;
                process.nextTick(() => this._tryConsume());
                return;
            }
        }
        if (err || entries.length < nNewConsumeRequests) {
            this._log.debug(err ? `error, retry in 1s: ${err.message}` :
                `not enough message available yet, retry in 1s: ${entries.length}`,
                { topic: this._topic, groupId: this._groupId });
            // if the topic does not exist (even with 'allow.auto.create.topics'),
            // the consumer will be ready only after automatic getMetadata is called,
            // so we want to getMetadata manually to avoid waiting for the next
            // automatic getMetadata call
            if (err?.code === kafka.CODES.ERRORS.ERR_UNKNOWN_TOPIC_OR_PART &&
                this._consumer.isConnected() && !this.isPaused()) {
                this._consumer.getMetadata({ topic: this._topic }, () => {});
            }
            this._scheduleNextTryConsume();
        }
    });
}
/**
* wrapper around the queueProcessor function to track processing
* of a task
* @param {Object} entry - kafka entry being processed
* @param {function} done - callback to be called when task has been processed
* @return {undefined}
*/
_processTask(entry, done) {
const finishProcessingTask = this._startProcessingTask(entry);
const { topic, partition } = entry;
KafkaBacklogMetrics.onTaskStarted(topic, partition, this._groupId);
this._queueProcessor(entry, (err, completionArgs) => done(err, completionArgs, finishProcessingTask));
}
/**
* Track processing of a task.
* @param {*} entry - kafka entry being processed
* @param {string} entry.topic - topic name
* @param {string} entry.partition - partition number of consumed message
* @param {string} entry.offset - offset of consumed message
* @returns {function} - callback function to be called when task has been processed
*/
_startProcessingTask(entry) {
const { topic, partition, offset } = entry;
const consumerGroup = this._groupId;
const taskStartTime = Date.now();
let slow = false;
const slowTimer = setTimeout(() => {
this._log.warn('slow task', { topic, partition, consumerGroup, offset });
slow = true;
KafkaBacklogMetrics.onSlowTask(topic, partition, consumerGroup);
}, this._maxPollIntervalMs);
return err => {
clearTimeout(slowTimer);
const duration = Date.now() - taskStartTime;
KafkaBacklogMetrics.onTaskProcessed(topic, partition, consumerGroup, !!err, slow, duration);
};
}
_scheduleNextTryConsume() {
this._tryConsumedTimeout = setTimeout(this._tryConsume.bind(this), 1000);
}
_onEntryProcessingDone(err, entry, completionArgs) {
const { topic, partition, offset, key, timestamp } = entry;
this._log.debug('finished processing of consumed entry', {
method: 'BackbeatConsumer.subscribe',
entry: { topic, partition, offset, key, timestamp },
groupId: this._groupId,
});
if (err) {
this._log.error('error processing an entry', {
error: err,
entry: { topic, partition, offset, key, timestamp },
groupId: this._groupId,
});
this.emit('error', err, entry);
}
// use setTimeout to do gathering and emit less events
if (!this._consumedEventTimeout) {
this._consumedEventTimeout = setTimeout(() => {
if (this._messagesConsumed > 0) {
this.emit('consumed', this._messagesConsumed);
this._messagesConsumed = 0;
}
this._consumedEventTimeout = null;
}, 100);
}
if (!(completionArgs && completionArgs.committable === false)) {
this.onEntryCommittable(entry);
}
// check whether we may get new messages now that the queue
// can accomodate more work
process.nextTick(() => this._tryConsume());
}
_onOffsetCommit(err, topicPartitions) {
if (err) {
// NO_OFFSET is a "soft error" meaning that the same
// offset is already committed, which occurs because of
// auto-commit (e.g. if nothing was done by the producer
// on this partition since last commit).
if (err.code === kafka.CODES.ERRORS.ERR__NO_OFFSET) {
return undefined;
}
this._log.error('error committing offsets to kafka',
{ errorCode: err, topicPartitions, groupId: this._groupId });
}
this._log.debug('commit offsets callback',
{ topicPartitions, groupId: this._groupId });
return undefined;
}
/**
* Pauses consumption of assigned partitions
* @returns {undefined}
*/
_pauseAssignments() {
try {
const assignments = this._consumer?.assignments();
if (!assignments || assignments.length === 0) {
return;
}
const topicPartitions = assignments?.map(assignment => ({
topic: assignment.topic,
partition: assignment.partition
}));
this._consumer.pause(topicPartitions);
this._log.info('circuitbreaker tripped, consumption paused', {
method: 'BackbeatConsumer._pauseAssignments',
});
} catch (error) {
this._log.error('failed to pause partitions', {
method: 'BackbeatConsumer._pauseAssignments',
error,
});
}
}
/**
* Resumes consumption of paused partitions
* @returns {undefined}
*/
_resumePausedPartitions() {
try {
const assignments = this._consumer?.assignments();
if (!assignments || assignments.length === 0) {
return;
}
const topicPartitions = assignments?.map(assignment => ({
topic: assignment.topic,
partition: assignment.partition
}));
this._consumer.resume(topicPartitions);
this._log.info('Resumed consumption', {
method: 'BackbeatConsumer._resumePausedPartitions',
reason: this._circuitBreaker.state === BreakerState.Nominal ?
'circuitbreaker state is nominal' : 'partitions unassigned',
});
} catch (error) {
this._log.error('failed to resume partitions', {
method: 'BackbeatConsumer._resumePausedPartitions',
error,
});
}
}
/**
* Circuit breaker state change handler
* Pauses consumption when circuit breaker is tripped
* @param {BreakerState} state circuit breaker state
* @returns {undefined}
*/
_onCircuitBreakerStateChanged(state) {
switch (state) {
case BreakerState.Nominal:
this._resumePausedPartitions();
break;
case BreakerState.Tripped:
this._pauseAssignments();
break;
case BreakerState.Stabilizing:
// Nothing to do, wait for state
// to become nominal
break;
default:
// this should never happen
this._log.warn('unsupported circuit breaker state', {
method: 'BackbeatConsumer._onCircuitBreakerStateChanged',
state,
});
break;
}
}
/**
 * @param {kafka.KafkaError} err Rebalance event
 * @param {TopicPartition[]} assignment List of (un)assigned partitions
 * @returns {void}
 */
_onRebalance(err, assignment) {
    if (err.code === kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
        this._log.info('rdkafka.assign', { assignment });
        try {
            this._consumer.assign(assignment);
            // keep new assignments paused while the breaker is open
            if (this._circuitBreaker.state !== BreakerState.Nominal) {
                this._pauseAssignments();
            }
        } catch (e) {
            // Ignore exceptions if we are not connected
            const logger = this._consumer.isConnected() ? this._log.error : this._log.debug;
            logger.bind(this._log)('rdkafka.assign failed', { e: e.toString(), assignment });
        }
    } else if (err.code === kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
        this._log.info('rdkafka.revoke', {
            assignment,
            queueLen: this._processingQueue.length(),
            running: this._processingQueue.running(),
        });
        // 'unassign' runs at most once (jsutil.once), regardless of
        // whether triggered by idle, drain, or the drain timeout below
        const unassign = jsutil.once(status => {
            this._log.info(`processing queue ${status}, un-assigning`, {
                queueLen: this._processingQueue.length(),
                running: this._processingQueue.running(),
            });
            KafkaBacklogMetrics.onRebalance(this._topic, this._groupId, status);
            // Reset the drain callback
            this._processingQueue.setDrain(() => {});
            // Reset the timer
            clearTimeout(this._drainProcessQueueTimeout);
            this._drainProcessQueueTimeout = null;
            try {
                // Ensure the state is committed before un-assigning
                this._consumer.commit();
            } catch (e) {
                // Ignore exceptions if we are not connected
                const logger = this._consumer.isConnected() &&
                    e.code !== kafka.CODES.ERRORS.ERR__STATE ? this._log.error : this._log.info;
                logger.bind(this._log)('rdkafka.commit failed', { e: e.toString(), assignment });
            }
            const doUnassign = () => {
                // partitions must be resumed before un-assigning so
                // they are not stuck paused after re-assignment
                this._resumePausedPartitions();
                try {
                    this._consumer.unassign();
                } catch (e) {
                    // Ignore exceptions if we are not connected
                    const logger = this._consumer.isConnected() ? this._log.error : this._log.info;
                    logger.bind(this._log)('rdkafka.unassign failed', { e: e.toString(), assignment });
                }
                this.emit('unassign', status);
            };
            // publish offsets to zookeeper
            if (this._kafkaBacklogMetricsConfig) {
                this._publishOffsetsCron(doUnassign);
            } else {
                doUnassign();
            }
        });
        if (this._processingQueue.idle()) {
            unassign(unassignStatus.IDLE);
            return;
        }
        // wait for in-flight tasks to drain before un-assigning
        this._processingQueue.setDrain(() => unassign(unassignStatus.DRAINED));
        // Set timeout of `max.poll.interval.ms`), to ensure we complete the rebalance
        // eventually (even if a task is really stuck), and don't get kicked out of the consumer
        // group. If the timeout is reached, since one task (or more) tasks are stuck, we
        // disconnect the consumer, so that the healthcheck will fail and the process will get
        // restarted.
        this._drainProcessQueueTimeout = setTimeout(() => {
            unassign(unassignStatus.TIMEOUT);
            this._log.error('rdkafka.rebalance timeout: consumer stuck, disconnecting');
            this._consumer.disconnect();
        }, this._maxPollIntervalMs - 1000); // 1 second earlier, to be within the limit
    } else {
        this._log.error('rdkafka.rebalance', { err, assignment });
    }
}
/**
 * Function to be called when safe to commit the consumer offset
 * of the given entry
 *
 * When a task completes from queueProcessor function (constructor
 * param), one has to call the callback with an extra argument
 * "committable" set to false, like "done(null, { committable:
 * false })". This will prevent the consumer from committing any
 * consumer group offset on or after this entry for the entry's
 * partition, until this.onEntryCommittable(entry) is called when
 * safe or desired. Older entries in the same partition may still
 * hold commits on this partition if they have not been advertised
 * as being committable yet.
 *
 * Note: this.onEntryCommittable() *must* be called in order to
 * allow progress committing the consumer offsets, whenever the
 * "committable: false" option has been passed to the task
 * callback. It may be called otherwise but will have no effect
 * because the entry will already have been considered committable
 * at task completion time.
 *
 * @param {object} entry - entry passed originally as first
 * parameter to the "queueProcessor" function
 * @return {undefined}
 */
onEntryCommittable(entry) {
    const { topic, partition, offset, key, timestamp } = entry;
    // record the highest committable offset and let auto-commit
    // persist it periodically
    const committableOffset =
        this._offsetLedger.onOffsetProcessed(topic, partition, offset);
    this._log.debug('marked committable entry', {
        entry: { topic, partition, offset,
            key: key && key.toString(), timestamp },
        committableOffset,
        groupId: this._groupId,
        offsetLedger: this._offsetLedger.toString(),
    });
    // ensure consumer is active when calling offsetsStore() on
    // it, to avoid raising an exception (invalid state)
    // (committableOffset is null while older offsets in the same
    // partition are still pending)
    if (committableOffset !== null && !this.isPaused() && this._consumer.isConnected()) {
        try {
            this._consumer.offsetsStore([{ topic, partition, offset: committableOffset }]);
        } catch (e) {
            // offsetsStore() should not throw given the guards above;
            // if it does (e.g. ERR__STATE race during a rebalance or
            // shutdown), log as error but do not crash — the offset will
            // be re-committed after the partition is re-assigned.
            this._log.error('offsetsStore failed', {
                error: e.toString(),
                topic,
                partition,
                offset: committableOffset,
            });
        }
    }
}
_publishOffsetsCron(cb) {
if (this._publishOffsetsCronActive) {
// skipping
if (cb) {
return process.nextTick(cb);
}
return undefined;
}
this._publishOffsetsCronActive = true;
return this._kafkaBacklogMetrics.publishConsumerBacklog(
this._consumer, this._topic, this._groupId, err => {
this._publishOffsetsCronActive = false;
if (cb) {
// used for shutdown
cb(err);
}
});
}
/**
* get metadata from kafka topics
* @param {object} params - call params
* @param {string} params.topic - topic name
* @param {number} params.timeout - timeout for the request
* @param {function} cb - callback: cb(err, response)
* @return {undefined}
*/
getMetadata(params, cb) {
this._consumer.getMetadata(params, cb);
}
/**
 * Bootstrap consumer by periodically sending bootstrap messages
 * and wait until it's receiving newly produced messages in a
 * timely fashion.
 * TEST ONLY (enabled by the "bootstrap" config flag): keeps producing
 * random bootstrap ids until the consumer reads back the latest one,
 * then commits past it, tears the producer down and calls _onReady().
 * @return {undefined}
 */
_bootstrapConsumer() {
    const self = this;
    let lastBootstrapId;
    let producer; // eslint-disable-line prefer-const
    let producerTimer;
    let consumerTimer;
    // invoked for each batch read back from the topic: when the id we
    // just produced is seen, the consumer is considered bootstrapped
    function consumeCb(err, messages) {
        if (err) {
            return undefined;
        }
        messages.forEach(message => {
            const bootstrapId = JSON.parse(message.value).bootstrapId;
            if (bootstrapId) {
                self._log.info('bootstraping backbeat consumer: ' +
                    'received bootstrap message',
                    { bootstrapId, topic: self._topic, groupId: self._groupId });
                if (bootstrapId === lastBootstrapId) {
                    self._log.info('backbeat consumer is bootstrapped',
                        { topic: self._topic, groupId: self._groupId });
                    clearInterval(producerTimer);
                    clearInterval(consumerTimer);
                    // store/commit the offset right after the last
                    // bootstrap message so it is not re-consumed
                    self._consumer.offsetsStore([{
                        topic: self._topic,
                        partition: message.partition,
                        offset: message.offset + 1,
                    }]);
                    self._consumer.commit();
                    self._consumer.unsubscribe();
                    producer.close(() => {
                        self._bootstrapping = false;
                        self._onReady();
                    });
                }
            }
        });
        return undefined;
    }
    // bootstrap mode requires that no consumer was created yet
    assert.strictEqual(this._consumer, null);
    producer = new BackbeatProducer({
        kafka: { hosts: this._kafkaHosts },
        compressionType: this._producerCompressionType,
        requiredAcks: this._producerRequiredAcks,
        topic: this._topic,
    });
    producer.on('ready', () => {
        // produce a fresh bootstrap id every 5s until one round-trips
        producerTimer = setInterval(() => {
            lastBootstrapId = `${Math.round(Math.random() * 1000000)}`;
            const contents = `{"bootstrapId":"${lastBootstrapId}"}`;
            this._log.info('bootstraping backbeat consumer: ' +
                'sending bootstrap message',
                { contents, topic: this._topic, groupId: this._groupId });
            producer.send([{ key: 'bootstrap',
                             message: contents }],
                          () => {});
            // lazily create the consumer after the first send, then
            // poll one message every 200ms
            if (!this._consumer) {
                setTimeout(() => {
                    this._bootstrapping = true;
                    this._initConsumer();
                    this._consumer.on('ready', () => {
                        this._consumer.subscribe([this._topic]);
                        consumerTimer = setInterval(() => {
                            this._consumer.consume(1, consumeCb);
                        }, 200);
                    });
                }, 500);
            }
        }, 5000);
    });
}
/**
* pause the kafka consumer
* @param {string} site - name of site
* @return {undefined}
*/
pause(site) {
// Use of KafkaConsumer#pause did not work. Using alternative
// of unsubscribe/subscribe
this._consumer.unsubscribe();
this._log.debug(`paused consumer for location: ${site}`, {
method: 'BackbeatConsumer.pause',
topic: this._topic,
groupId: this._groupId,
});
}
/**
* resume the kafka consumer
* @param {string} site - name of site
* @return {undefined}
*/
resume(site) {
// if not subscribed, then subscribe
if (this._consumer.subscription().length === 0) {
this._consumer.subscribe([this._topic]);
this._log.debug(`resumed consumer for location: ${site}`, {
method: 'BackbeatConsumer.resume',
topic: this._topic,
groupId: this._groupId,
});
}
}
/**
* tells whether the consumer is in paused state
*
* Note that right now this duplicates getServiceStatus(), but
* this one is meant to be backported to the oldest release that
* needs it, and is potentially more specific than
* getServiceStatus().
*
* @return {boolean} true if paused
*/
isPaused() {
return this._consumer.subscription().length === 0;
}
/**
* check if the kafka consumer is active or paused
* @return {boolean} if false, consumer is paused