-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathtask.js
More file actions
75 lines (65 loc) · 2.57 KB
/
task.js
File metadata and controls
75 lines (65 loc) · 2.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
'use strict';
require('../../../lib/otel');
const werelogs = require('werelogs');
const ReplicationStatusProcessor = require('./ReplicationStatusProcessor');
const { startProbeServer } = require('../../../lib/util/probe');
const { DEFAULT_LIVE_ROUTE, DEFAULT_METRICS_ROUTE, DEFAULT_READY_ROUTE } =
require('arsenal').network.probe.ProbeServer;
const config = require('../../../lib/Config');
const kafkaConfig = config.kafka;
const repConfig = config.extensions.replication;
const sourceConfig = repConfig.source;
const internalHttpsConfig = config.internalHttps;
const mConfig = config.metrics;
const gcConfig = config.extensions.gc;
const { initManagement } = require('../../../lib/management/index');
const replicationStatusProcessor = new ReplicationStatusProcessor(
kafkaConfig, sourceConfig, repConfig, internalHttpsConfig, mConfig, gcConfig);
werelogs.configure({ level: config.log.logLevel,
dump: config.log.dumpLevel });
const logger = new werelogs.Logger('backbeat:ReplicationStatusProcessor:Init');
function initAndStart() {
initManagement({
serviceName: 'replication',
serviceAccount: sourceConfig.auth.account,
}, error => {
if (error) {
logger.error('could not load management db', { error });
setTimeout(initAndStart, 5000);
return;
}
replicationStatusProcessor.start(null, startProbeServer(
repConfig.replicationStatusProcessor.probeServer,
(err, probeServer) => {
if (err) {
logger.error('error starting probe server', {
error: err,
method: 'ReplicationStatusProcessor::startProbeServer',
});
return;
}
if (probeServer !== undefined) {
probeServer.addHandler([DEFAULT_LIVE_ROUTE, DEFAULT_READY_ROUTE],
(res, log) => replicationStatusProcessor.handleLiveness(res, log)
);
probeServer.addHandler(DEFAULT_METRICS_ROUTE,
(res, log) => replicationStatusProcessor.handleMetrics(res, log)
);
}
logger.info('management init done');
}
));
});
}
initAndStart();
process.on('SIGTERM', () => {
logger.info('received SIGTERM, exiting');
replicationStatusProcessor.stop(error => {
if (error) {
logger.error('failed to exit properly', {
error,
});
process.exit(1);
}
});
});