-
Notifications
You must be signed in to change notification settings - Fork 255
Expand file tree
/
Copy pathabortMultipartUpload.js
More file actions
234 lines (222 loc) · 11.2 KB
/
abortMultipartUpload.js
File metadata and controls
234 lines (222 loc) · 11.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
const async = require('async');
const constants = require('../../../../constants');
const { data } = require('../../../data/wrapper');
const locationConstraintCheck = require('../object/locationConstraintCheck');
const metadataUtils = require('../../../metadata/metadataUtils');
const { validateQuotas } = require('../quotas/quotaUtils');
const services = require('../../../services');
const metadata = require('../../../metadata/wrapper');
/**
 * Abort an in-progress multipart upload.
 *
 * Pipeline (async.waterfall):
 *  1. Validate the request against the destination bucket (and fetch any
 *     existing object metadata).
 *  2. Validate the multipart upload itself against the shadow MPU bucket.
 *  3. Abort the upload on the external backend (Azure/GCP report back that
 *     data deletion can be skipped).
 *  4. List the stored parts for this uploadId.
 *  5. Detect object metadata wrongly created by a previous CompleteMPU
 *     (e.g. a version written while parts were missing) and delete it.
 *  6. Delete the part data (and any leftover object data locations), then
 *     decrement the inflight quota accounting.
 *  7. Batch-delete the shadow bucket's part keys and overview key.
 *
 * @param {AuthInfo} authInfo - requester's authentication info
 * @param {string} bucketName - name of the destination bucket
 * @param {string} objectKey - key of the object being uploaded
 * @param {string} uploadId - ID of the multipart upload to abort
 * @param {RequestLogger} log - request logger
 * @param {function} callback - callback(err, destBucket, partSizeSum)
 * @param {object} request - the original API request
 * @returns {undefined}
 */
function abortMultipartUpload(authInfo, bucketName, objectKey, uploadId, log,
    callback, request) {
    const metadataValMPUparams = {
        authInfo,
        bucketName,
        objectKey,
        uploadId,
        preciseRequestType: request.apiMethods || 'multipartDelete',
        request,
    };
    log.debug('processing request', { method: 'abortMultipartUpload' });
    // For validating the request at the destinationBucket level
    // params are the same as validating at the MPU level
    // but the requestType is the more general 'objectDelete'
    const metadataValParams = Object.assign({}, metadataValMPUparams);
    metadataValParams.requestType = 'objectDelete';
    const authzIdentityResult = request ? request.actionImplicitDenies : false;
    async.waterfall([
        function checkDestBucketVal(next) {
            metadataUtils.standardMetadataValidateBucketAndObj(metadataValParams, authzIdentityResult, log,
                (err, destinationBucket, objectMD) => {
                    if (err) {
                        log.error('error validating request', { error: err });
                        return next(err, destinationBucket);
                    }
                    if (destinationBucket.policies) {
                        // TODO: Check bucket policies to see if user is granted
                        // permission or forbidden permission to take
                        // given action.
                        // If permitted, add 'bucketPolicyGoAhead'
                        // attribute to params for validating at MPU level.
                        // This is GH Issue#76
                        metadataValMPUparams.requestType =
                            'bucketPolicyGoAhead';
                    }
                    return next(null, destinationBucket, objectMD);
                });
        },
        function checkMPUval(destBucket, objectMD, next) {
            metadataValParams.log = log;
            services.metadataValidateMultipart(metadataValParams,
                (err, mpuBucket, mpuOverviewObj) => {
                    if (err) {
                        log.error('error validating multipart', { error: err });
                        return next(err, destBucket);
                    }
                    return next(err, mpuBucket, mpuOverviewObj, destBucket, objectMD);
                });
        },
        function abortExternalMpu(mpuBucket, mpuOverviewObj, destBucket, objectMD,
            next) {
            const location = mpuOverviewObj.controllingLocationConstraint;
            // Temporarily strip authz results from the request before handing
            // it to the data layer, and restore them afterwards.
            const originalIdentityAuthzResults = request.actionImplicitDenies;
            // eslint-disable-next-line no-param-reassign
            delete request.actionImplicitDenies;
            return data.abortMPU(objectKey, uploadId, location, bucketName,
                request, destBucket, locationConstraintCheck, log,
                (err, skipDataDelete) => {
                    // eslint-disable-next-line no-param-reassign
                    request.actionImplicitDenies = originalIdentityAuthzResults;
                    if (err) {
                        log.error('error aborting MPU', { error: err });
                        return next(err, destBucket);
                    }
                    // for Azure and GCP we do not need to delete data
                    // for all other backends, skipDataDelete will be set to false
                    return next(null, mpuBucket, destBucket, objectMD, skipDataDelete);
                });
        },
        function getPartLocations(mpuBucket, destBucket, objectMD, skipDataDelete,
            next) {
            services.getMPUparts(mpuBucket.getName(), uploadId, log,
                (err, result) => {
                    if (err) {
                        log.error('error getting parts', { error: err });
                        return next(err, destBucket);
                    }
                    const storedParts = result.Contents;
                    return next(null, mpuBucket, storedParts, destBucket, objectMD,
                        skipDataDelete);
                });
        },
        // During Abort, we dynamically detect if the previous CompleteMPU call
        // created potential object metadata wrongly, e.g. by creating
        // an object version when some of the parts are missing.
        // By passing a null objectMD, we tell the subsequent steps
        // to skip the cleanup.
        // Another approach is possible, but not supported by all backends:
        // to honor the uploadId filter in standardMetadataValidateBucketAndObj
        // ensuring the objMD returned has the right uploadId. But this is not
        // supported by Metadata.
        function findObjectToCleanup(mpuBucket, storedParts, destBucket,
            objectMD, skipDataDelete, next) {
            if (!objectMD) {
                return next(null, mpuBucket, storedParts, destBucket, null, skipDataDelete);
            }
            // If objectMD exists and has matching uploadId, use it directly
            // This handles all non-versioned cases, and some versioned cases.
            if (objectMD.uploadId === uploadId) {
                return next(null, mpuBucket, storedParts, destBucket, objectMD, skipDataDelete);
            }
            // If bucket is not versioned, no need to check versions:
            // as the uploadId is not the same, we skip the cleanup.
            if (!destBucket.isVersioningEnabled()) {
                return next(null, mpuBucket, storedParts, destBucket, null, skipDataDelete);
            }
            // Otherwise, list all versions to find one with a matching uploadId.
            return services.findObjectVersionByUploadId(bucketName, objectKey, uploadId, log, (err, foundVersion) => {
                if (err) {
                    log.warn('error finding object version by uploadId, proceeding without cleanup', {
                        error: err,
                        method: 'abortMultipartUpload.findObjectToCleanup',
                    });
                    // On error, continue the abort without an objectMD to clean up.
                    return next(null, mpuBucket, storedParts, destBucket, null, skipDataDelete);
                }
                return next(null, mpuBucket, storedParts, destBucket, foundVersion, skipDataDelete);
            });
        },
        function deleteObjectMetadata(mpuBucket, storedParts, destBucket, objectMD,
            skipDataDelete, next) {
            if (!objectMD) {
                return next(null, mpuBucket, storedParts, destBucket, objectMD, skipDataDelete);
            }
            log.debug('Object has existing metadata, deleting them', {
                method: 'abortMultipartUpload',
                bucketName,
                objectKey,
                uploadId,
                versionId: objectMD.versionId,
            });
            return metadata.deleteObjectMD(bucketName, objectKey, {
                versionId: objectMD.versionId,
            }, log, err => {
                if (err) {
                    // Handle concurrent deletion of this object metadata
                    if (err.is?.NoSuchKey) {
                        log.debug('object metadata already deleted or does not exist', {
                            method: 'abortMultipartUpload',
                            bucketName,
                            objectKey,
                            versionId: objectMD.versionId,
                        });
                    } else {
                        log.error('error deleting object metadata', { error: err });
                    }
                }
                // Continue with the operation regardless of deletion success/failure
                // The important part is that we tried to clean up
                return next(null, mpuBucket, storedParts, destBucket, objectMD, skipDataDelete);
            });
        },
        function deleteData(mpuBucket, storedParts, destBucket, objectMD,
            skipDataDelete, next) {
            if (skipDataDelete) {
                return next(null, mpuBucket, storedParts, destBucket);
            }
            // The locations were sent to metadata as an array
            // under partLocations. Pull the partLocations.
            const locations = storedParts.flatMap(item => item.value.partLocations);
            if (locations.length === 0) {
                return next(null, mpuBucket, storedParts, destBucket);
            }
            // Add object data locations if they exist, deduplicated against
            // locations already covered by the stored parts.
            if (objectMD?.location) {
                const existingLocations = new Set(locations.map(loc => loc.key));
                const remainingObjectLocations = objectMD.
                    location.filter(loc => !existingLocations.has(loc.key));
                locations.push(...remainingObjectLocations);
            }
            return async.eachLimit(locations, 5, (loc, cb) => {
                data.delete(loc, log, err => {
                    if (err) {
                        // Best-effort: a failed part delete does not abort
                        // the whole operation.
                        log.warn('delete ObjectPart failed', { err });
                    }
                    cb();
                });
            }, () => {
                const totalSize = storedParts.reduce((sum, loc) => sum + loc.value.Size, 0);
                return validateQuotas(request, destBucket, request.accountQuotas,
                    ['objectDelete'], 'objectDelete', -totalSize, false, log, err => {
                        if (err) {
                            // Ignore error, as the data has been deleted already: only inflight count
                            // has not been updated, and will be eventually consistent anyway
                            log.warn('failed to update inflights', {
                                method: 'abortMultipartUpload',
                                locations,
                                error: err,
                            });
                        }
                        next(null, mpuBucket, storedParts, destBucket);
                    });
            });
        },
        function deleteShadowObjectMetadata(mpuBucket, storedParts, destBucket, next) {
            let splitter = constants.splitter;
            // BACKWARD: Remove to remove the old splitter
            if (mpuBucket.getMdBucketModelVersion() < 2) {
                splitter = constants.oldSplitter;
            }
            // Reconstruct mpuOverviewKey
            const mpuOverviewKey =
                `overview${splitter}${objectKey}${splitter}${uploadId}`;
            // Get the sum of all part sizes to include in pushMetric object
            const partSizeSum = storedParts.map(item => item.value.Size)
                .reduce((currPart, nextPart) => currPart + nextPart, 0);
            const keysToDelete = storedParts.map(item => item.key);
            keysToDelete.push(mpuOverviewKey);
            services.batchDeleteObjectMetadata(mpuBucket.getName(),
                keysToDelete, log, err => next(err, destBucket, partSizeSum));
        },
    ], callback);
}
module.exports = abortMultipartUpload;