-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathtask.js
More file actions
126 lines (110 loc) · 3.81 KB
/
task.js
File metadata and controls
126 lines (110 loc) · 3.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
'use strict';
require('../../../lib/otel');
const async = require('async');
const werelogs = require('werelogs');
const { errors } = require('arsenal');
const {
DEFAULT_LIVE_ROUTE,
DEFAULT_READY_ROUTE,
DEFAULT_METRICS_ROUTE,
} = require('arsenal').network.probe.ProbeServer;
const { sendSuccess, sendError } = require('arsenal').network.probe.Utils;
const { ZenkoMetrics } = require('arsenal').metrics;
const { initManagement } = require('../../../lib/management/index');
const LifecycleObjectExpirationProcessor = require('./LifecycleObjectExpirationProcessor');
const LifecycleObjectTransitionProcessor = require('./LifecycleObjectTransitionProcessor');
const { startProbeServer } = require('../../../lib/util/probe');
const config = require('../../../lib/Config');
const zkConfig = config.zookeeper;
const kafkaConfig = config.kafka;
const lcConfig = config.extensions.lifecycle;
const s3Config = config.s3;
const transport = config.transport;
const logger = new werelogs.Logger('Backbeat:Lifecycle:Consumer');
let objectProcessor;
switch (process.env.LIFECYCLE_OBJECT_PROCESSOR_TYPE) {
case 'transition':
objectProcessor = new LifecycleObjectTransitionProcessor(
zkConfig, kafkaConfig, lcConfig, s3Config, transport);
break;
case 'expiration': // fallthrough
default:
objectProcessor = new LifecycleObjectExpirationProcessor(
zkConfig, kafkaConfig, lcConfig, s3Config, transport);
break;
}
werelogs.configure({ level: config.log.logLevel,
dump: config.log.dumpLevel });
function livenessCheck(res, log) {
if (objectProcessor.isReady()) {
sendSuccess(res, log);
} else {
sendError(res, log, errors.ServiceUnavailable, 'unhealthy');
}
}
/**
* Handle ProbeServer metrics
*
* @param {http.HTTPServerResponse} res - HTTP Response to respond with
* @param {Logger} log - Logger
* @returns {undefined}
*/
async function handleMetrics(res, log) {
log.debug('metrics requested');
res.writeHead(200, {
'Content-Type': ZenkoMetrics.asPrometheusContentType(),
});
const metrics = await ZenkoMetrics.asPrometheus();
res.end(metrics);
}
function probeServerSetup(config, done) {
startProbeServer(config, (err, probeServer) => {
if (err) {
return done(err);
}
if (!probeServer) {
logger.info('Skipping lifecycle object processor server setup');
return done();
}
probeServer.addHandler(DEFAULT_LIVE_ROUTE, livenessCheck);
probeServer.addHandler(DEFAULT_READY_ROUTE, livenessCheck);
// retaining the old route and adding support to new route, until
// metrics handling is consolidated
probeServer.addHandler(['/_/monitoring/metrics', DEFAULT_METRICS_ROUTE], handleMetrics);
logger.info('Starting lifecycle object processor server');
return done();
});
}
function initAndStart(done) {
initManagement({
serviceName: 'lifecycle',
serviceAccount: lcConfig.auth.account,
}, error => {
if (error) {
logger.error('could not load management db', { error });
setTimeout(initAndStart, 5000, done);
return;
}
logger.info('management init done');
done();
return;
});
}
async.waterfall([
done => initAndStart(done),
done => objectProcessor.start(err => done(err)),
done => probeServerSetup(lcConfig.objectProcessor.probeServer, done),
], err => {
if (err) {
logger.error('error during lifecycle object processor initialization',
{ error: err.message });
process.exit(1);
}
logger.info('lifecycle object processor running!');
});
process.on('SIGTERM', () => {
logger.info('received SIGTERM, exiting');
objectProcessor.close(() => {
process.exit(0);
});
});