diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java index 6fcf7e5c73..4cf553df6a 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java @@ -28,7 +28,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.hadoop.conf.Configuration; -import org.roaringbitmap.longlong.Roaring64NavigableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,7 +65,7 @@ public class ShuffleFlushManager { private final boolean storageTypeWithMemory; private Configuration hadoopConf; // appId -> shuffleId -> committed shuffle blockIds - private Map> committedBlockIds = + private Map> committedBlockIdCount = JavaUtils.newConcurrentMap(); private final int retryMax; @@ -98,9 +97,9 @@ public ShuffleFlushManager( ShuffleServerMetrics.addLabeledCacheGauge( COMMITTED_BLOCK_COUNT, () -> - committedBlockIds.values().stream() + committedBlockIdCount.values().stream() .flatMap(innerMap -> innerMap.values().stream()) - .mapToLong(bitmap -> bitmap.getLongCardinality()) + .mapToLong(AtomicLong::get) .sum(), 2 * 60 * 1000L /* 2 minutes */); this.storageTypeWithMemory = StorageType.withMemory(StorageType.valueOf(storageType)); @@ -256,41 +255,37 @@ private String getShuffleServerId() { private void updateCommittedBlockIds( String appId, int shuffleId, Collection blocks) { - if (blocks == null || blocks.size() == 0) { + if (blocks == null || blocks.isEmpty()) { return; } - committedBlockIds.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap()); - Map shuffleToBlockIds = committedBlockIds.get(appId); - shuffleToBlockIds.computeIfAbsent(shuffleId, key -> Roaring64NavigableMap.bitmapOf()); - Roaring64NavigableMap bitmap = shuffleToBlockIds.get(shuffleId); - synchronized (bitmap) { - for (ShufflePartitionedBlock spb : blocks) { - bitmap.addLong(spb.getBlockId()); - } - } + committedBlockIdCount.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap()); + Map shuffleToBlockIds = committedBlockIdCount.get(appId); + shuffleToBlockIds.computeIfAbsent(shuffleId, key -> new AtomicLong()); + AtomicLong blockCounter = shuffleToBlockIds.get(shuffleId); + blockCounter.addAndGet(blocks.size()); } - public Roaring64NavigableMap getCommittedBlockIds(String appId, Integer shuffleId) { - Map shuffleIdToBlockIds = committedBlockIds.get(appId); - if (shuffleIdToBlockIds == null) { - LOG.warn("Unexpected value when getCommittedBlockIds for appId[" + appId + "]"); - return Roaring64NavigableMap.bitmapOf(); + public long getCommittedBlockCount(String appId, Integer shuffleId) { + Map shuffleIdToBlockCount = committedBlockIdCount.get(appId); + if (shuffleIdToBlockCount == null) { + LOG.warn("Unexpected value when getCommittedBlockCount for appId[" + appId + "]"); + return 0; } - Roaring64NavigableMap blockIds = shuffleIdToBlockIds.get(shuffleId); - if (blockIds == null) { + AtomicLong blockCount = shuffleIdToBlockCount.get(shuffleId); + if (blockCount == null) { LOG.warn( - "Unexpected value when getCommittedBlockIds for appId[" + "Unexpected value when getCommittedBlockCount for appId[" + appId + "], shuffleId[" + shuffleId + "]"); - return Roaring64NavigableMap.bitmapOf(); + return 0; } - return blockIds; + return blockCount.get(); } public void removeResources(String appId) { - committedBlockIds.remove(appId); + committedBlockIdCount.remove(appId); } protected void initHadoopConf() { @@ -314,7 +309,7 @@ public Configuration getHadoopConf() { } public void removeResourcesOfShuffleId(String appId, Collection shuffleIds) { - Optional.ofNullable(committedBlockIds.get(appId)) + Optional.ofNullable(committedBlockIdCount.get(appId)) .ifPresent(shuffleIdToBlockIds -> shuffleIds.forEach(shuffleIdToBlockIds::remove)); } diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java index 820c763579..ceda1fcdd4 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java @@ -27,7 +27,6 @@ import com.google.common.collect.Sets; import org.apache.commons.lang3.StringUtils; -import org.roaringbitmap.longlong.Roaring64NavigableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +53,7 @@ public class ShuffleTaskInfo { private Map commitLocks; /** shuffleId -> blockIds */ - private Map cachedBlockIds; + private Map cachedBlockCount; private AtomicReference user; @@ -91,7 +90,7 @@ public ShuffleTaskInfo(String appId) { this.currentTimes = System.currentTimeMillis(); this.commitCounts = JavaUtils.newConcurrentMap(); this.commitLocks = JavaUtils.newConcurrentMap(); - this.cachedBlockIds = JavaUtils.newConcurrentMap(); + this.cachedBlockCount = JavaUtils.newConcurrentMap(); this.user = new AtomicReference<>(); this.partitionDataSizes = JavaUtils.newConcurrentMap(); this.hugePartitionTags = JavaUtils.newConcurrentMap(); @@ -118,8 +117,8 @@ public Map getCommitLocks() { return commitLocks; } - public Map getCachedBlockIds() { - return cachedBlockIds; + public Map getCachedBlockCount() { + return cachedBlockCount; } public String getUser() { diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java index 33712244b1..5b1ed01dee 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java @@ -70,7 +70,6 @@ import org.apache.uniffle.common.util.Constants; import org.apache.uniffle.common.util.JavaUtils; import org.apache.uniffle.common.util.OutputUtils; -import org.apache.uniffle.common.util.RssUtils; import org.apache.uniffle.common.util.ThreadUtils; import org.apache.uniffle.common.util.UnitConverter; import org.apache.uniffle.server.block.ShuffleBlockIdManager; @@ -269,9 +268,9 @@ public ShuffleTaskManager( CACHED_BLOCK_COUNT, () -> shuffleTaskInfos.values().stream() - .map(ShuffleTaskInfo::getCachedBlockIds) + .map(ShuffleTaskInfo::getCachedBlockCount) .flatMap(map -> map.values().stream()) - .mapToLong(Roaring64NavigableMap::getLongCardinality) + .mapToLong(AtomicLong::get) .sum(), 2 * 60 * 1000L /* 2 minutes */); } @@ -391,8 +390,8 @@ void removeAndReleasePreAllocatedBuffer(long requireBufferId) { public StatusCode commitShuffle(String appId, int shuffleId) throws Exception { long start = System.currentTimeMillis(); refreshAppId(appId); - Roaring64NavigableMap cachedBlockIds = getCachedBlockIds(appId, shuffleId); - Roaring64NavigableMap cloneBlockIds; + long cachedBlockCount = getCachedBlockCount(appId, shuffleId); + ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.computeIfAbsent(appId, x -> new ShuffleTaskInfo(appId)); Object lock = shuffleTaskInfo.getCommitLocks().computeIfAbsent(shuffleId, x -> new Object()); @@ -401,27 +400,16 @@ public StatusCode commitShuffle(String appId, int shuffleId) throws Exception { if (System.currentTimeMillis() - start > commitTimeout) { throw new RssException("Shuffle data commit timeout for " + commitTimeout + " ms"); } - synchronized (cachedBlockIds) { - cloneBlockIds = RssUtils.cloneBitMap(cachedBlockIds); - } - long expectedCommitted = cloneBlockIds.getLongCardinality(); + long expectedCommitted = cachedBlockCount; shuffleBufferManager.commitShuffleTask(appId, shuffleId); - Roaring64NavigableMap committedBlockIds; - Roaring64NavigableMap cloneCommittedBlockIds; long checkInterval = 1000L; - while (true) { - committedBlockIds = shuffleFlushManager.getCommittedBlockIds(appId, shuffleId); - synchronized (committedBlockIds) { - cloneCommittedBlockIds = RssUtils.cloneBitMap(committedBlockIds); - } - cloneBlockIds.andNot(cloneCommittedBlockIds); - if (cloneBlockIds.isEmpty()) { - break; - } + long remain = expectedCommitted; + while (remain > 0) { Thread.sleep(checkInterval); if (System.currentTimeMillis() - start > commitTimeout) { throw new RssException("Shuffle data commit timeout for " + commitTimeout + " ms"); } + remain = expectedCommitted - shuffleFlushManager.getCommittedBlockCount(appId, shuffleId); LOG.info( "Checking commit result for appId[" + appId @@ -430,7 +418,7 @@ public StatusCode commitShuffle(String appId, int shuffleId) throws Exception { + "], expect committed[" + expectedCommitted + "], remain[" - + cloneBlockIds.getLongCardinality() + + remain + "]"); checkInterval = Math.min(checkInterval * 2, commitCheckIntervalMax); } @@ -494,50 +482,41 @@ public void updateCachedBlockIds( } ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.computeIfAbsent(appId, x -> new ShuffleTaskInfo(appId)); - long size = 0L; - // With memory storage type should never need cachedBlockIds, + int blockCount = spbs.length - shufflePartitionedData.getDuplicateBlockCount(); + // With memory storage type should never need cachedBlockCount, // since client do not need call finish shuffle rpc if (!storageTypeWithMemory) { - Roaring64NavigableMap bitmap = - shuffleTaskInfo - .getCachedBlockIds() - .computeIfAbsent(shuffleId, x -> Roaring64NavigableMap.bitmapOf()); - - synchronized (bitmap) { - for (ShufflePartitionedBlock spb : spbs) { - bitmap.addLong(spb.getBlockId()); - size += spb.getEncodedLength(); - } - } - } else { - for (ShufflePartitionedBlock spb : spbs) { - size += spb.getEncodedLength(); - } + AtomicLong counter = + shuffleTaskInfo.getCachedBlockCount().computeIfAbsent(shuffleId, x -> new AtomicLong()); + + counter.addAndGet(blockCount); } - int blockCount = spbs.length - shufflePartitionedData.getDuplicateBlockCount(); shuffleBufferManager.addInMemoryBlockCount(blockCount); shuffleTaskInfo.addInMemoryBlockCount(blockCount); long partitionSize = shuffleTaskInfo.addPartitionDataSize( - shuffleId, partitionId, size - shufflePartitionedData.getDuplicateBlockSize()); + shuffleId, + partitionId, + shufflePartitionedData.getTotalBlockEncodedLength() + - shufflePartitionedData.getDuplicateBlockSize()); HugePartitionUtils.markHugePartition( shuffleBufferManager, shuffleTaskInfo, shuffleId, partitionId, partitionSize); } - public Roaring64NavigableMap getCachedBlockIds(String appId, int shuffleId) { - Map shuffleIdToBlockIds = - shuffleTaskInfos.getOrDefault(appId, new ShuffleTaskInfo(appId)).getCachedBlockIds(); - Roaring64NavigableMap blockIds = shuffleIdToBlockIds.get(shuffleId); - if (blockIds == null) { + public long getCachedBlockCount(String appId, int shuffleId) { + Map shuffleIdToBlockIds = + shuffleTaskInfos.getOrDefault(appId, new ShuffleTaskInfo(appId)).getCachedBlockCount(); + AtomicLong blockCount = shuffleIdToBlockIds.get(shuffleId); + if (blockCount == null) { LOG.warn( - "Unexpected value when getCachedBlockIds for appId[" + "Unexpected value when getCachedBlockCount for appId[" + appId + "], shuffleId[" + shuffleId + "]"); - return Roaring64NavigableMap.bitmapOf(); + return 0L; } - return blockIds; + return blockCount.get(); } public long getPartitionDataSize(String appId, int shuffleId, int partitionId) { @@ -823,7 +802,7 @@ public void removeResourcesByShuffleIds( final ShuffleTaskInfo taskInfo = shuffleTaskInfos.get(appId); if (taskInfo != null) { for (Integer shuffleId : shuffleIds) { - taskInfo.getCachedBlockIds().remove(shuffleId); + taskInfo.getCachedBlockCount().remove(shuffleId); taskInfo.getCommitCounts().remove(shuffleId); taskInfo.getCommitLocks().remove(shuffleId); } diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHadoopTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHadoopTest.java index 76d06cb2cc..b4686b0d6e 100644 --- a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHadoopTest.java +++ b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHadoopTest.java @@ -124,8 +124,8 @@ public void clearTest() throws Exception { manager.addToFlushQueue(event2); waitForFlush(manager, appId1, 1, 5); waitForFlush(manager, appId2, 1, 5); - assertEquals(5, manager.getCommittedBlockIds(appId1, 1).getLongCardinality()); - assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality()); + assertEquals(5, manager.getCommittedBlockCount(appId1, 1)); + assertEquals(5, manager.getCommittedBlockCount(appId2, 1)); assertEquals(storageManager.selectStorage(event1), storageManager.selectStorage(event2)); AbstractStorage storage = (AbstractStorage) storageManager.selectStorage(event1); int size = storage.getHandlerSize(); @@ -155,15 +155,15 @@ public void clearTest() throws Exception { assertTrue(kerberizedHadoop.getFileSystem().exists(new Path(remoteStorage.getPath()))); - assertEquals(0, manager.getCommittedBlockIds(appId1, 1).getLongCardinality()); - assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality()); + assertEquals(0, manager.getCommittedBlockCount(appId1, 1)); + assertEquals(5, manager.getCommittedBlockCount(appId2, 1)); size = storage.getHandlerSize(); assertEquals(1, size); manager.removeResources(appId2); assertTrue(((HadoopStorageManager) storageManager).getAppIdToStorages().containsKey(appId2)); storageManager.removeResources(new AppPurgeEvent(appId2, "alex", Arrays.asList(1))); assertFalse(((HadoopStorageManager) storageManager).getAppIdToStorages().containsKey(appId2)); - assertEquals(0, manager.getCommittedBlockIds(appId2, 1).getLongCardinality()); + assertEquals(0, manager.getCommittedBlockCount(appId2, 1)); size = storage.getHandlerSize(); assertEquals(0, size); } diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java index 5d4bffb2a1..ffa67f886a 100644 --- a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java @@ -262,11 +262,11 @@ public void writeTest() throws Exception { waitForFlush(manager, appId, 1, 5); waitForFlush(manager, appId, 2, 10); validate(appId, 1, 1, blocks1, 1, remoteStorage.getPath()); - assertEquals(blocks1.size(), manager.getCommittedBlockIds(appId, 1).getLongCardinality()); + assertEquals(blocks1.size(), manager.getCommittedBlockCount(appId, 1)); blocks21.addAll(blocks22); validate(appId, 2, 2, blocks21, 1, remoteStorage.getPath()); - assertEquals(blocks21.size(), manager.getCommittedBlockIds(appId, 2).getLongCardinality()); + assertEquals(blocks21.size(), manager.getCommittedBlockCount(appId, 2)); assertEquals( 3.0, @@ -330,7 +330,7 @@ public void testCreateWriteHandlerFailed(@TempDir File tmpDir) throws Exception waitForFlush(manager, appId, 1, 5); assertEquals(1, event1.getRetryTimes()); Collection blocks1 = event1.getShuffleBlocks(); - assertEquals(blocks1.size(), manager.getCommittedBlockIds(appId, 1).getLongCardinality()); + assertEquals(blocks1.size(), manager.getCommittedBlockCount(appId, 1)); int maxRetryTimes = 5; shuffleServerConf.set(ShuffleServerConf.SERVER_WRITE_RETRY_MAX, maxRetryTimes); @@ -505,8 +505,8 @@ public void clearTest() throws Exception { waitForFlush(manager, appId1, 1, 5); waitForFlush(manager, appId2, 1, 5); final AbstractStorage storage = (AbstractStorage) storageManager.selectStorage(event1); - assertEquals(5, manager.getCommittedBlockIds(appId1, 1).getLongCardinality()); - assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality()); + assertEquals(5, manager.getCommittedBlockCount(appId1, 1)); + assertEquals(5, manager.getCommittedBlockCount(appId2, 1)); assertEquals(storageManager.selectStorage(event1), storageManager.selectStorage(event2)); int size = storage.getHandlerSize(); assertEquals(2, size); @@ -524,15 +524,15 @@ public void clearTest() throws Exception { // expected exception } - assertEquals(0, manager.getCommittedBlockIds(appId1, 1).getLongCardinality()); - assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality()); + assertEquals(0, manager.getCommittedBlockCount(appId1, 1)); + assertEquals(5, manager.getCommittedBlockCount(appId2, 1)); size = storage.getHandlerSize(); assertEquals(1, size); manager.removeResources(appId2); assertTrue(((HadoopStorageManager) storageManager).getAppIdToStorages().containsKey(appId2)); storageManager.removeResources(new AppPurgeEvent(appId2, StringUtils.EMPTY, Arrays.asList(1))); assertFalse(((HadoopStorageManager) storageManager).getAppIdToStorages().containsKey(appId2)); - assertEquals(0, manager.getCommittedBlockIds(appId2, 1).getLongCardinality()); + assertEquals(0, manager.getCommittedBlockCount(appId2, 1)); size = storage.getHandlerSize(); assertEquals(0, size); // fs create a remoteStorage for appId2 before remove resources, @@ -580,10 +580,10 @@ public void clearLocalTest(@TempDir File tempDir) throws Exception { waitForFlush(manager, appId2, 1, 5); waitForFlush(manager, appId2, 2, 5); waitForFlush(manager, appId2, 11, 5); - assertEquals(5, manager.getCommittedBlockIds(appId1, 1).getLongCardinality()); - assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality()); - assertEquals(5, manager.getCommittedBlockIds(appId2, 2).getLongCardinality()); - assertEquals(5, manager.getCommittedBlockIds(appId2, 11).getLongCardinality()); + assertEquals(5, manager.getCommittedBlockCount(appId1, 1)); + assertEquals(5, manager.getCommittedBlockCount(appId2, 1)); + assertEquals(5, manager.getCommittedBlockCount(appId2, 2)); + assertEquals(5, manager.getCommittedBlockCount(appId2, 11)); assertEquals(2, storage.getHandlerSize()); File file = new File(tempDir, appId1); assertTrue(file.exists()); @@ -600,9 +600,9 @@ public void clearLocalTest(@TempDir File tempDir) throws Exception { ShuffleDataReadEvent shuffleReadEvent = new ShuffleDataReadEvent(appId2, 1, 0, 0); assertNotNull(storageManager.selectStorage(shuffleReadEvent)); - assertEquals(0, manager.getCommittedBlockIds(appId1, 1).getLongCardinality()); - assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality()); - assertEquals(5, manager.getCommittedBlockIds(appId2, 2).getLongCardinality()); + assertEquals(0, manager.getCommittedBlockCount(appId1, 1)); + assertEquals(5, manager.getCommittedBlockCount(appId2, 1)); + assertEquals(5, manager.getCommittedBlockCount(appId2, 2)); assertEquals(1, storage.getHandlerSize()); manager.removeResources(appId2); storageManager.removeResources( @@ -618,7 +618,7 @@ public void clearLocalTest(@TempDir File tempDir) throws Exception { new ShufflePurgeEvent(appId2, StringUtils.EMPTY, Lists.newArrayList(2))); storageManager.removeResources( new AppPurgeEvent(appId2, StringUtils.EMPTY, Lists.newArrayList(1))); - assertEquals(0, manager.getCommittedBlockIds(appId2, 1).getLongCardinality()); + assertEquals(0, manager.getCommittedBlockCount(appId2, 1)); assertEquals(0, storage.getHandlerSize()); assertEquals( 0, ((LocalStorageManager) storageManager).getSortedPartitionsOfStorageMap().size()); @@ -666,7 +666,7 @@ public static void waitForFlush( fail("Unexpected flush process"); } retry++; - size = manager.getCommittedBlockIds(appId, shuffleId).getIntCardinality(); + size = (int) manager.getCommittedBlockCount(appId, shuffleId); } while (size < expectedBlockNum); } diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java index 3356cca5ae..085efb41f3 100644 --- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java @@ -421,8 +421,7 @@ public void writeProcessTest() throws Exception { shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId); ShuffleFlushManager shuffleFlushManager = shuffleServer.getShuffleFlushManager(); - assertEquals( - 1, shuffleFlushManager.getCommittedBlockIds(appId, shuffleId).getLongCardinality()); + assertEquals(1, shuffleFlushManager.getCommittedBlockCount(appId, shuffleId)); // flush for partition 1-1 ShufflePartitionedData partitionedData1 = createPartitionedData(1, 2, 35); @@ -673,8 +672,7 @@ public void clearTest() throws Exception { } // application "clearTest2" was removed according to rss.server.app.expired.withoutHeartbeat assertEquals(Sets.newHashSet("clearTest1"), shuffleTaskManager.getAppIds()); - assertEquals( - 10, shuffleTaskManager.getCachedBlockIds("clearTest1", shuffleId).getLongCardinality()); + assertEquals(10, shuffleTaskManager.getCachedBlockCount("clearTest1", shuffleId)); // register again shuffleTaskManager.registerShuffle( @@ -691,7 +689,7 @@ public void clearTest() throws Exception { // wait resource delete Thread.sleep(3000); assertEquals(Collections.EMPTY_SET, shuffleTaskManager.getAppIds()); - assertTrue(shuffleTaskManager.getCachedBlockIds("clearTest1", shuffleId).isEmpty()); + assertEquals(0, shuffleTaskManager.getCachedBlockCount("clearTest1", shuffleId)); } @Test @@ -742,7 +740,7 @@ public void clearMultiTimesTest() throws Exception { } countDownLatch.await(); assertEquals(Collections.EMPTY_SET, shuffleTaskManager.getAppIds()); - assertTrue(shuffleTaskManager.getCachedBlockIds(appId, shuffleId).isEmpty()); + assertEquals(0, shuffleTaskManager.getCachedBlockCount(appId, shuffleId)); } @Test @@ -1097,8 +1095,7 @@ private void waitForFlush( int retry = 0; while (true) { // remove flushed eventId to test timeout in commit - if (shuffleFlushManager.getCommittedBlockIds(appId, shuffleId).getIntCardinality() - == expectedBlockNum) { + if (shuffleFlushManager.getCommittedBlockCount(appId, shuffleId) == expectedBlockNum) { break; } Thread.sleep(1000); diff --git a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java index f2991b226a..12b901a655 100644 --- a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java @@ -667,8 +667,7 @@ private void waitForFlush( long committedCount = 0; do { try { - committedCount = - shuffleFlushManager.getCommittedBlockIds(appId, shuffleId).getLongCardinality(); + committedCount = shuffleFlushManager.getCommittedBlockCount(appId, shuffleId); } catch (Throwable e) { // ignore ArrayIndexOutOfBoundsException and ConcurrentModificationException LOG.error("Ignored exception.", e);