Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.hadoop.conf.Configuration;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -66,7 +65,7 @@ public class ShuffleFlushManager {
private final boolean storageTypeWithMemory;
private Configuration hadoopConf;
// appId -> shuffleId -> committed shuffle blockIds
private Map<String, Map<Integer, Roaring64NavigableMap>> committedBlockIds =
private Map<String, Map<Integer, AtomicLong>> committedBlockIdCount =
JavaUtils.newConcurrentMap();
private final int retryMax;

Expand Down Expand Up @@ -98,9 +97,9 @@ public ShuffleFlushManager(
ShuffleServerMetrics.addLabeledCacheGauge(
COMMITTED_BLOCK_COUNT,
() ->
committedBlockIds.values().stream()
committedBlockIdCount.values().stream()
.flatMap(innerMap -> innerMap.values().stream())
.mapToLong(bitmap -> bitmap.getLongCardinality())
.mapToLong(AtomicLong::get)
.sum(),
2 * 60 * 1000L /* 2 minutes */);
this.storageTypeWithMemory = StorageType.withMemory(StorageType.valueOf(storageType));
Expand Down Expand Up @@ -256,41 +255,37 @@ private String getShuffleServerId() {

private void updateCommittedBlockIds(
String appId, int shuffleId, Collection<ShufflePartitionedBlock> blocks) {
if (blocks == null || blocks.size() == 0) {
if (blocks == null || blocks.isEmpty()) {
return;
}
committedBlockIds.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap());
Map<Integer, Roaring64NavigableMap> shuffleToBlockIds = committedBlockIds.get(appId);
shuffleToBlockIds.computeIfAbsent(shuffleId, key -> Roaring64NavigableMap.bitmapOf());
Roaring64NavigableMap bitmap = shuffleToBlockIds.get(shuffleId);
synchronized (bitmap) {
for (ShufflePartitionedBlock spb : blocks) {
bitmap.addLong(spb.getBlockId());
}
}
committedBlockIdCount.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap());
Map<Integer, AtomicLong> shuffleToBlockIds = committedBlockIdCount.get(appId);
shuffleToBlockIds.computeIfAbsent(shuffleId, key -> new AtomicLong());
AtomicLong blockCounter = shuffleToBlockIds.get(shuffleId);
blockCounter.addAndGet(blocks.size());
}

public Roaring64NavigableMap getCommittedBlockIds(String appId, Integer shuffleId) {
Map<Integer, Roaring64NavigableMap> shuffleIdToBlockIds = committedBlockIds.get(appId);
if (shuffleIdToBlockIds == null) {
LOG.warn("Unexpected value when getCommittedBlockIds for appId[" + appId + "]");
return Roaring64NavigableMap.bitmapOf();
public long getCommittedBlockCount(String appId, Integer shuffleId) {
Map<Integer, AtomicLong> shuffleIdToBlockCount = committedBlockIdCount.get(appId);
if (shuffleIdToBlockCount == null) {
LOG.warn("Unexpected value when getCommittedBlockCount for appId[" + appId + "]");
return 0;
}
Roaring64NavigableMap blockIds = shuffleIdToBlockIds.get(shuffleId);
if (blockIds == null) {
AtomicLong blockCount = shuffleIdToBlockCount.get(shuffleId);
if (blockCount == null) {
LOG.warn(
"Unexpected value when getCommittedBlockIds for appId["
"Unexpected value when getCommittedBlockCount for appId["
+ appId
+ "], shuffleId["
+ shuffleId
+ "]");
return Roaring64NavigableMap.bitmapOf();
return 0;
}
return blockIds;
return blockCount.get();
}

public void removeResources(String appId) {
committedBlockIds.remove(appId);
committedBlockIdCount.remove(appId);
}

protected void initHadoopConf() {
Expand All @@ -314,7 +309,7 @@ public Configuration getHadoopConf() {
}

public void removeResourcesOfShuffleId(String appId, Collection<Integer> shuffleIds) {
Optional.ofNullable(committedBlockIds.get(appId))
Optional.ofNullable(committedBlockIdCount.get(appId))
.ifPresent(shuffleIdToBlockIds -> shuffleIds.forEach(shuffleIdToBlockIds::remove));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -54,7 +53,7 @@ public class ShuffleTaskInfo {

private Map<Integer, Object> commitLocks;
/** shuffleId -> blockIds */
private Map<Integer, Roaring64NavigableMap> cachedBlockIds;
private Map<Integer, AtomicLong> cachedBlockCount;

private AtomicReference<String> user;

Expand Down Expand Up @@ -91,7 +90,7 @@ public ShuffleTaskInfo(String appId) {
this.currentTimes = System.currentTimeMillis();
this.commitCounts = JavaUtils.newConcurrentMap();
this.commitLocks = JavaUtils.newConcurrentMap();
this.cachedBlockIds = JavaUtils.newConcurrentMap();
this.cachedBlockCount = JavaUtils.newConcurrentMap();
this.user = new AtomicReference<>();
this.partitionDataSizes = JavaUtils.newConcurrentMap();
this.hugePartitionTags = JavaUtils.newConcurrentMap();
Expand All @@ -118,8 +117,8 @@ public Map<Integer, Object> getCommitLocks() {
return commitLocks;
}

public Map<Integer, Roaring64NavigableMap> getCachedBlockIds() {
return cachedBlockIds;
public Map<Integer, AtomicLong> getCachedBlockCount() {
return cachedBlockCount;
}

public String getUser() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.OutputUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.common.util.UnitConverter;
import org.apache.uniffle.server.block.ShuffleBlockIdManager;
Expand Down Expand Up @@ -269,9 +268,9 @@ public ShuffleTaskManager(
CACHED_BLOCK_COUNT,
() ->
shuffleTaskInfos.values().stream()
.map(ShuffleTaskInfo::getCachedBlockIds)
.map(ShuffleTaskInfo::getCachedBlockCount)
.flatMap(map -> map.values().stream())
.mapToLong(Roaring64NavigableMap::getLongCardinality)
.mapToLong(AtomicLong::get)
.sum(),
2 * 60 * 1000L /* 2 minutes */);
}
Expand Down Expand Up @@ -391,8 +390,8 @@ void removeAndReleasePreAllocatedBuffer(long requireBufferId) {
public StatusCode commitShuffle(String appId, int shuffleId) throws Exception {
long start = System.currentTimeMillis();
refreshAppId(appId);
Roaring64NavigableMap cachedBlockIds = getCachedBlockIds(appId, shuffleId);
Roaring64NavigableMap cloneBlockIds;
long cachedBlockCount = getCachedBlockCount(appId, shuffleId);

ShuffleTaskInfo shuffleTaskInfo =
shuffleTaskInfos.computeIfAbsent(appId, x -> new ShuffleTaskInfo(appId));
Object lock = shuffleTaskInfo.getCommitLocks().computeIfAbsent(shuffleId, x -> new Object());
Expand All @@ -401,27 +400,16 @@ public StatusCode commitShuffle(String appId, int shuffleId) throws Exception {
if (System.currentTimeMillis() - start > commitTimeout) {
throw new RssException("Shuffle data commit timeout for " + commitTimeout + " ms");
}
synchronized (cachedBlockIds) {
cloneBlockIds = RssUtils.cloneBitMap(cachedBlockIds);
}
long expectedCommitted = cloneBlockIds.getLongCardinality();
long expectedCommitted = cachedBlockCount;
shuffleBufferManager.commitShuffleTask(appId, shuffleId);
Roaring64NavigableMap committedBlockIds;
Roaring64NavigableMap cloneCommittedBlockIds;
long checkInterval = 1000L;
while (true) {
committedBlockIds = shuffleFlushManager.getCommittedBlockIds(appId, shuffleId);
synchronized (committedBlockIds) {
cloneCommittedBlockIds = RssUtils.cloneBitMap(committedBlockIds);
}
cloneBlockIds.andNot(cloneCommittedBlockIds);
if (cloneBlockIds.isEmpty()) {
break;
}
long remain = expectedCommitted;
while (remain > 0) {
Thread.sleep(checkInterval);
if (System.currentTimeMillis() - start > commitTimeout) {
throw new RssException("Shuffle data commit timeout for " + commitTimeout + " ms");
}
remain = expectedCommitted - shuffleFlushManager.getCommittedBlockCount(appId, shuffleId);
LOG.info(
"Checking commit result for appId["
+ appId
Expand All @@ -430,7 +418,7 @@ public StatusCode commitShuffle(String appId, int shuffleId) throws Exception {
+ "], expect committed["
+ expectedCommitted
+ "], remain["
+ cloneBlockIds.getLongCardinality()
+ remain
+ "]");
checkInterval = Math.min(checkInterval * 2, commitCheckIntervalMax);
}
Expand Down Expand Up @@ -494,50 +482,41 @@ public void updateCachedBlockIds(
}
ShuffleTaskInfo shuffleTaskInfo =
shuffleTaskInfos.computeIfAbsent(appId, x -> new ShuffleTaskInfo(appId));
long size = 0L;
// With memory storage type should never need cachedBlockIds,
int blockCount = spbs.length - shufflePartitionedData.getDuplicateBlockCount();
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicated blocks will not be countd. @roryqi
Furthermore, based on the current code, the same block will not be committed twice.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @zuston WDYT?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, the counter may be not accurate. We have too many branches. It is hard to prove it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need commit the data? If we use memory mode, we can avoid committing the blocks.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need commit the data? If we use memory mode, we can avoid committing the blocks.

+1

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need commit the data? If we use memory mode, we can avoid committing the blocks.

But we have an option not to use memory, or we should disable this option in non-unit test scenarios.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think twice. It may not influence our correctness. Because we don't use committed bitmap to judge correctness.

// With memory storage type should never need cachedBlockCount,
// since client do not need call finish shuffle rpc
if (!storageTypeWithMemory) {
Roaring64NavigableMap bitmap =
shuffleTaskInfo
.getCachedBlockIds()
.computeIfAbsent(shuffleId, x -> Roaring64NavigableMap.bitmapOf());

synchronized (bitmap) {
for (ShufflePartitionedBlock spb : spbs) {
bitmap.addLong(spb.getBlockId());
size += spb.getEncodedLength();
}
}
} else {
for (ShufflePartitionedBlock spb : spbs) {
size += spb.getEncodedLength();
}
AtomicLong counter =
shuffleTaskInfo.getCachedBlockCount().computeIfAbsent(shuffleId, x -> new AtomicLong());

counter.addAndGet(blockCount);
}
int blockCount = spbs.length - shufflePartitionedData.getDuplicateBlockCount();
shuffleBufferManager.addInMemoryBlockCount(blockCount);
shuffleTaskInfo.addInMemoryBlockCount(blockCount);
long partitionSize =
shuffleTaskInfo.addPartitionDataSize(
shuffleId, partitionId, size - shufflePartitionedData.getDuplicateBlockSize());
shuffleId,
partitionId,
shufflePartitionedData.getTotalBlockEncodedLength()
- shufflePartitionedData.getDuplicateBlockSize());
HugePartitionUtils.markHugePartition(
shuffleBufferManager, shuffleTaskInfo, shuffleId, partitionId, partitionSize);
}

public Roaring64NavigableMap getCachedBlockIds(String appId, int shuffleId) {
Map<Integer, Roaring64NavigableMap> shuffleIdToBlockIds =
shuffleTaskInfos.getOrDefault(appId, new ShuffleTaskInfo(appId)).getCachedBlockIds();
Roaring64NavigableMap blockIds = shuffleIdToBlockIds.get(shuffleId);
if (blockIds == null) {
public long getCachedBlockCount(String appId, int shuffleId) {
Map<Integer, AtomicLong> shuffleIdToBlockIds =
shuffleTaskInfos.getOrDefault(appId, new ShuffleTaskInfo(appId)).getCachedBlockCount();
AtomicLong blockCount = shuffleIdToBlockIds.get(shuffleId);
if (blockCount == null) {
LOG.warn(
"Unexpected value when getCachedBlockIds for appId["
"Unexpected value when getCachedBlockCount for appId["
+ appId
+ "], shuffleId["
+ shuffleId
+ "]");
return Roaring64NavigableMap.bitmapOf();
return 0L;
}
return blockIds;
return blockCount.get();
}

public long getPartitionDataSize(String appId, int shuffleId, int partitionId) {
Expand Down Expand Up @@ -823,7 +802,7 @@ public void removeResourcesByShuffleIds(
final ShuffleTaskInfo taskInfo = shuffleTaskInfos.get(appId);
if (taskInfo != null) {
for (Integer shuffleId : shuffleIds) {
taskInfo.getCachedBlockIds().remove(shuffleId);
taskInfo.getCachedBlockCount().remove(shuffleId);
taskInfo.getCommitCounts().remove(shuffleId);
taskInfo.getCommitLocks().remove(shuffleId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ public void clearTest() throws Exception {
manager.addToFlushQueue(event2);
waitForFlush(manager, appId1, 1, 5);
waitForFlush(manager, appId2, 1, 5);
assertEquals(5, manager.getCommittedBlockIds(appId1, 1).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
assertEquals(5, manager.getCommittedBlockCount(appId1, 1));
assertEquals(5, manager.getCommittedBlockCount(appId2, 1));
assertEquals(storageManager.selectStorage(event1), storageManager.selectStorage(event2));
AbstractStorage storage = (AbstractStorage) storageManager.selectStorage(event1);
int size = storage.getHandlerSize();
Expand Down Expand Up @@ -155,15 +155,15 @@ public void clearTest() throws Exception {

assertTrue(kerberizedHadoop.getFileSystem().exists(new Path(remoteStorage.getPath())));

assertEquals(0, manager.getCommittedBlockIds(appId1, 1).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
assertEquals(0, manager.getCommittedBlockCount(appId1, 1));
assertEquals(5, manager.getCommittedBlockCount(appId2, 1));
size = storage.getHandlerSize();
assertEquals(1, size);
manager.removeResources(appId2);
assertTrue(((HadoopStorageManager) storageManager).getAppIdToStorages().containsKey(appId2));
storageManager.removeResources(new AppPurgeEvent(appId2, "alex", Arrays.asList(1)));
assertFalse(((HadoopStorageManager) storageManager).getAppIdToStorages().containsKey(appId2));
assertEquals(0, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
assertEquals(0, manager.getCommittedBlockCount(appId2, 1));
size = storage.getHandlerSize();
assertEquals(0, size);
}
Expand Down
Loading
Loading