diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java index 1690a49ec4..3b11fd7d61 100644 --- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java +++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java @@ -297,6 +297,11 @@ public ShuffleBlock readShuffleBlockData() { if (shuffleServerInfoList.size() > 1) { LOG.warn(errMsg); clientReadHandler.updateConsumedBlockInfo(bs, true); + if (decompressionWorker != null) { + decompressionWorker.get(batchIndex - 1, segmentIndex++); + } else { + segmentIndex += 1; + } continue; } else { throw new RssFetchFailedException(errMsg); diff --git a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java index a37f0193b0..ca29d4d7d5 100644 --- a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java +++ b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java @@ -18,6 +18,7 @@ package org.apache.uniffle.client.impl; import java.nio.ByteBuffer; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Random; @@ -38,7 +39,6 @@ import org.apache.uniffle.client.TestUtils; import org.apache.uniffle.client.factory.ShuffleClientFactory; -import org.apache.uniffle.client.response.ShuffleBlock; import org.apache.uniffle.common.ClientType; import org.apache.uniffle.common.ShufflePartitionedBlock; import org.apache.uniffle.common.ShuffleServerInfo; @@ -59,6 +59,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; public class ShuffleReadClientImplTest extends HadoopTestBase { @@ -371,11 +372,14 @@ public void readTest8(Supplier builderSu String basePath = uniq(HDFS_URI + "clientReadTest8"); HadoopShuffleWriteHandler writeHandler = new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf); + HadoopShuffleWriteHandler writeHandler2 = + new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi2.getId(), conf); - Map expectedData = Maps.newHashMap(); + LinkedHashMap expectedData = Maps.newLinkedHashMap(); Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf(); Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0); writeTestData(writeHandler, 2, 30, 0, 0, expectedData, blockIdBitmap); + writeTestData(writeHandler2, 2, 30, 0, 0, expectedData, blockIdBitmap); ShuffleReadClientImpl readClient = builderSupplier .get() @@ -396,8 +400,19 @@ public void readTest8(Supplier builderSu .shuffleServerInfoList(Lists.newArrayList(ssi1, ssi2)) .build(); // crc32 is incorrect + AtomicInteger readCount = new AtomicInteger(0); try (MockedStatic checksumUtilsMock = Mockito.mockStatic(ChecksumUtils.class)) { - checksumUtilsMock.when(() -> ChecksumUtils.getCrc32((ByteBuffer) any())).thenReturn(-1L); + checksumUtilsMock + .when(() -> ChecksumUtils.getCrc32(any(ByteBuffer.class), anyInt(), anyInt())) + .then( + invocation -> { + // crc check fails for readClient1 and frist block of readClient2 + if (readCount.getAndIncrement() < 2) { + return -1; + } else { + return invocation.callRealMethod(); + } + }); try { ByteBuffer bb = readClient.readShuffleBlockData().getByteBuffer(); while (bb != null) { @@ -408,8 +423,15 @@ public void readTest8(Supplier builderSu assertTrue(e.getMessage().startsWith("Unexpected crc value"), e.getMessage()); } - ShuffleBlock block = readClient2.readShuffleBlockData(); - assertNull(block); + // the frist block has been skipped due to crc check failure + Long firstKey = expectedData.keySet().iterator().next(); + expectedData.remove(firstKey); + TestUtils.validateResult(readClient2, expectedData); + try { + readClient2.checkProcessedBlockIds(); + } catch (Exception e) { + assertTrue(e.getMessage().contains("expected 4 blocks, actual 3 blocks"), e.getMessage()); + } } readClient.close(); readClient2.close();