-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathKafkaNotificationDestination.js
More file actions
160 lines (151 loc) · 5.06 KB
/
KafkaNotificationDestination.js
File metadata and controls
160 lines (151 loc) · 5.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
const NotificationDestination = require('./NotificationDestination');
const KafkaProducer = require('./KafkaProducer');
const { ZenkoMetrics } = require('arsenal').metrics;
/**
 * Upper bounds, in seconds, of the delivery delay histogram buckets.
 * NOTE(review): the series goes from 0.01 straight to 1 (there is no 0.1
 * bound) - confirm this gap is intentional.
 */
const DELIVERY_DELAY_BUCKETS = [0.001, 0.01, 1, 10, 100, 1000, 10000];

/**
 * Counter accumulating the size of the notifications that were sent,
 * labelled by destination target.
 */
const notificationSize = ZenkoMetrics.createCounter({
    name: 's3_notification_queue_processor_notification_size_total',
    help: 'Total size of the notifications that were sent',
    labelNames: ['target'],
});

/**
 * Histogram of the time elapsed between sending a notification and
 * getting it delivered, labelled by destination target and delivery
 * status.
 */
const deliveryLag = ZenkoMetrics.createHistogram({
    name: 's3_notification_queue_processor_delivery_delay_seconds',
    help: 'Difference between the time a notification is sent and when it gets delivered',
    labelNames: ['target', 'status'],
    buckets: DELIVERY_DELAY_BUCKETS,
});
/**
 * Record the metrics associated with one delivery attempt.
 *
 * @param {string} target - value of the `target` metric label
 * @param {string} status - outcome of the delivery; the size counter is
 * only incremented when this is 'success'
 * @param {Object[]} messages - batch of notifications; its size is
 * measured as the UTF-8 byte length of its JSON serialization
 * @param {number|null} delay - delivery delay in seconds; nothing is
 * observed when this is not a finite number (e.g. null)
 * @return {undefined}
 */
function onNotificationSent(target, status, messages, delay) {
    if (status === 'success') {
        // Update total size of notifications sent
        const serialized = JSON.stringify(messages);
        const messageSize = new TextEncoder().encode(serialized).length;
        notificationSize.inc({ target }, messageSize);
    }
    // A delay of 0 is a valid measurement (delivery report received within
    // the same millisecond as the send), so test for "is a number" rather
    // than for truthiness, which would silently drop it.
    if (Number.isFinite(delay)) {
        // update notification delivery delay
        deliveryLag.observe({ target, status }, delay);
    }
}
class KafkaNotificationDestination extends NotificationDestination {
    /**
     * @constructor
     * @param {Object} params - constructor params
     * @param {Object} params.destConfig - destination-specific
     *  configuration object
     * @param {Logger} params.logger - logger object
     */
    constructor(params) {
        super(params);
        // Assigned by _setupProducer() once the producer emits 'ready'
        this._notificationProducer = null;
    }

    /**
     * Create the Kafka producer from the destination configuration and
     * wait for its first 'ready' or 'error' event.
     *
     * @param {function} done - callback, invoked exactly once: with the
     *  emitted error if 'error' comes first, without arguments if 'ready'
     *  comes first
     * @return {undefined}
     */
    _setupProducer(done) {
        const { host, port, topic, pollIntervalMs, auth, requiredAcks, compressionType } = this._destinationConfig;
        const kafkaHost = port ? `${host}:${port}` : host;
        const producer = new KafkaProducer({
            kafka: { hosts: kafkaHost },
            topic,
            pollIntervalMs,
            auth,
            compressionType,
            requiredAcks,
        });

        // An EventEmitter 'error' event emitted while no listener is
        // attached gets thrown, so once setup has settled this logging
        // listener is always kept registered.
        const logProducerError = err => {
            this._log.error('error from kafka producer', {
                topic,
                error: err,
            });
        };

        // The 'error' and 'ready' listeners are registered independently:
        // without this guard an 'error' followed by a late 'ready' would
        // invoke the callback twice.
        let settled = false;
        const settle = (...args) => {
            if (settled) {
                return;
            }
            settled = true;
            done(...args);
        };

        producer.once('error', (...args) => {
            producer.on('error', logProducerError);
            settle(...args);
        });
        producer.once('ready', () => {
            producer.removeAllListeners('error');
            producer.on('error', logProducerError);
            // Kept even when setup already failed, so that stop() is able
            // to close the producer.
            this._notificationProducer = producer;
            settle();
        });
    }

    /**
     * Init/setup/authenticate the notification client
     *
     * @param {function} done - callback, called with the setup error if
     *  any
     * @return {undefined}
     */
    init(done) {
        this._setupProducer(err => {
            if (err) {
                this._log.info('error setting up kafka notif destination',
                    { error: err.message });
                done(err);
            } else {
                done();
            }
        });
    }

    /**
     * Process entry in the sub-class and send it
     *
     * Delivery errors are logged and not forwarded: the callback is always
     * called without arguments.
     *
     * @param {Object[]} messages - message to be sent
     * @param {function} done - callback when delivery report has been received
     * @return {undefined}
     */
    send(messages, done) {
        const { host, topic, resource } = this._destinationConfig;

        // Calling send() before a successful init() used to throw a
        // TypeError on the null producer; handle it like a failed delivery
        // and complete asynchronously, as stop() does.
        if (!this._notificationProducer) {
            this._log.error('kafka producer is not initialized, cannot deliver messages', {
                method: 'KafkaNotificationDestination.send',
                host,
                topic,
            });
            onNotificationSent(resource, 'failure', messages, null);
            setImmediate(done);
            return;
        }

        const startTime = Date.now();
        // Trust boundary: strip any trace headers before producing to the
        // external customer Kafka destination. There's no ingress-layer
        // strip on the Kafka protocol (unlike HTTP via nginx), so the only
        // place to drop headers is at the producer. Keeps everything else
        // on the message intact.
        const safeMessages = Array.isArray(messages)
            ? messages.map(m => {
                if (m && m.headers) {
                    // eslint-disable-next-line no-unused-vars
                    const { headers, ...rest } = m;
                    return rest;
                }
                return m;
            })
            : messages;

        this._notificationProducer.send(safeMessages, error => {
            if (error) {
                this._log.error('error in message delivery to external Kafka destination', {
                    method: 'KafkaNotificationDestination.send',
                    host,
                    topic,
                    error: error.message,
                });
                onNotificationSent(resource, 'failure', safeMessages, null);
                return done();
            }
            const delay = (Date.now() - startTime) / 1000;
            // Metrics are computed on the payload that was actually
            // produced, i.e. without the stripped headers.
            onNotificationSent(resource, 'success', safeMessages, delay);
            return done();
        });
    }

    /**
     * Stop the notification client
     *
     * @param {function} done - callback; called asynchronously right away
     *  when no producer was set up, otherwise handed to the producer's
     *  close()
     * @return {undefined}
     */
    stop(done) {
        if (!this._notificationProducer) {
            return setImmediate(done);
        }
        return this._notificationProducer.close(done);
    }
}
// Expose the Kafka destination class as this module's export.
module.exports = KafkaNotificationDestination;