diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/AbstractArrowEnumerator.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/AbstractArrowEnumerator.java index 8cc08b990475..e188757b0d2c 100644 --- a/arrow/src/main/java/org/apache/calcite/adapter/arrow/AbstractArrowEnumerator.java +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/AbstractArrowEnumerator.java @@ -16,6 +16,7 @@ */ package org.apache.calcite.adapter.arrow; +import org.apache.calcite.avatica.util.ByteString; import org.apache.calcite.linq4j.Enumerator; import org.apache.calcite.util.ImmutableIntList; import org.apache.calcite.util.Util; @@ -95,7 +96,11 @@ private static Object getValue(ValueVector vector, int index) { (ArrowType.Timestamp) vector.getField().getType(); return toMillis(rawValue, tsType.getUnit()); } - return vector.getObject(index); + final Object value = vector.getObject(index); + if (value instanceof byte[]) { + return new ByteString((byte[]) value); + } + return value; } /** Converts a raw timestamp value to milliseconds since epoch. diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowDirectEnumerator.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowDirectEnumerator.java index 2ab896f09c9e..0cdec7baeb80 100644 --- a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowDirectEnumerator.java +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowDirectEnumerator.java @@ -26,6 +26,10 @@ /** * Enumerator that reads projected Arrow value-vectors directly. + * + *

This path is used for identity projections that Gandiva cannot project + * through the existing {@code Projector} path, such as Arrow binary vectors. + * It is not a replacement for Gandiva expression evaluation. */ class ArrowDirectEnumerator extends AbstractArrowEnumerator { private final Runnable onClose; diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerable.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerable.java index 735c75c8ed8c..84ed5997aab2 100644 --- a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerable.java +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerable.java @@ -56,6 +56,8 @@ class ArrowEnumerable extends AbstractEnumerable { return new ArrowFilterEnumerator(arrowFileReader, fields, filter, onClose); } + // No projector and no filter means the query is an identity projection + // that should read selected value-vectors directly. return new ArrowDirectEnumerator(arrowFileReader, fields, onClose); } catch (Exception e) { throw Util.toUnchecked(e); diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFieldTypeFactory.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFieldTypeFactory.java index 30c738bece84..017caad8662a 100644 --- a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFieldTypeFactory.java +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFieldTypeFactory.java @@ -66,6 +66,12 @@ private static RelDataType of(Field field, JavaTypeFactory typeFactory) { return typeFactory.createSqlType(SqlTypeName.BOOLEAN); case Utf8: return typeFactory.createSqlType(SqlTypeName.VARCHAR); + case Binary: + case LargeBinary: + return typeFactory.createSqlType(SqlTypeName.VARBINARY); + case FixedSizeBinary: + return typeFactory.createSqlType(SqlTypeName.BINARY, + ((ArrowType.FixedSizeBinary) arrowType).getByteWidth()); case FloatingPoint: FloatingPointPrecision precision = ((ArrowType.FloatingPoint) arrowType).getPrecision(); switch (precision) { diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTable.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTable.java index 5afb74e51d3a..74438efe2a33 100644 --- a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTable.java +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTable.java @@ -203,10 +203,10 @@ private static RelDataType deduceRowType(Schema schema, } private @Nullable Projector makeProjector(ImmutableIntList fields) { - if (containsListField(fields)) { + if (requiresDirectVectorProjection(fields)) { // Returning null selects ArrowEnumerable's direct vector-read path. - // Use that path for list fields because Gandiva does not support identity - // projection expressions over Arrow List vectors. + // Use that path because Gandiva does not support identity projection + // expressions over Arrow List and binary vectors. return null; } @@ -223,11 +223,24 @@ private static RelDataType deduceRowType(Schema schema, } } - private boolean containsListField(ImmutableIntList fields) { + /** Returns whether selected fields should be projected by reading Arrow + * value-vectors directly rather than by creating a Gandiva projector. + * + *

CALCITE-7541 extends this direct projection path for Arrow binary vector + * families because Gandiva cannot project them through the existing identity + * projection path. Queries with filters still use Gandiva filters; this direct + * path only applies to no-filter projections. + */ + private boolean requiresDirectVectorProjection(ImmutableIntList fields) { for (int fieldOrdinal : fields) { - if (schema.getFields().get(fieldOrdinal).getType().getTypeID() - == ArrowType.ArrowTypeID.List) { + switch (schema.getFields().get(fieldOrdinal).getType().getTypeID()) { + case List: + case Binary: + case LargeBinary: + case FixedSizeBinary: return true; + default: + break; } } return false; diff --git a/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterDataTypesTest.java b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterDataTypesTest.java index 317d4dc26e91..bfd4ee14c280 100644 --- a/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterDataTypesTest.java +++ b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterDataTypesTest.java @@ -65,6 +65,10 @@ static void initializeArrowState(@TempDir Path sharedTempDir) ArrowDataTest arrowListDataGenerator = new ArrowDataTest(); arrowListDataGenerator.writeArrowListData(listDataLocationFile); + File binaryDataLocationFile = arrowFilesDirectory.resolve("arrowbinary.arrow").toFile(); + ArrowDataTest arrowBinaryDataGenerator = new ArrowDataTest(); + arrowBinaryDataGenerator.writeArrowBinaryData(binaryDataLocationFile); + arrow = ImmutableMap.of("model", modelFileTarget.toAbsolutePath().toString()); } @@ -82,6 +86,24 @@ static void initializeArrowState(@TempDir Path sharedTempDir) .explainContains(plan); } + /** Test case for + * [CALCITE-7541] + * Support Binary Arrow types in Arrow adapter. */ + @Test void testBinaryProject() { + String sql = "select \"binaryField\", \"largeBinaryField\", \"fixedSizeBinaryField\" " + + "from arrowbinary"; + String plan = "PLAN=ArrowToEnumerableConverter\n" + + " ArrowTableScan(table=[[ARROW, ARROWBINARY]], fields=[[0, 1, 2]])\n\n"; + String result = "binaryField=0001; largeBinaryField=0a0b; fixedSizeBinaryField=141516\n" + + "binaryField=null; largeBinaryField=null; fixedSizeBinaryField=null\n" + + "binaryField=020304; largeBinaryField=0c0d0e; fixedSizeBinaryField=171819\n"; + CalciteAssert.that() + .with(arrow) + .query(sql) + .returns(result) + .explainContains(plan); + } + @Test void testTinyIntProject() { String sql = "select \"tinyIntField\" from arrowdatatype"; String plan = "PLAN=ArrowToEnumerableConverter\n" diff --git a/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowDataTest.java b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowDataTest.java index 7fd9f19c5ea2..a53cc1231ad3 100644 --- a/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowDataTest.java +++ b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowDataTest.java @@ -29,9 +29,11 @@ import org.apache.arrow.vector.DateDayVector; import org.apache.arrow.vector.DecimalVector; import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.FixedSizeBinaryVector; import org.apache.arrow.vector.Float8Vector; import org.apache.arrow.vector.FloatingPointVector; import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.LargeVarBinaryVector; import org.apache.arrow.vector.SmallIntVector; import org.apache.arrow.vector.TimeSecVector; import org.apache.arrow.vector.TimeStampMicroVector; @@ -39,6 +41,7 @@ import org.apache.arrow.vector.TimeStampNanoVector; import org.apache.arrow.vector.TimeStampSecVector; import org.apache.arrow.vector.TinyIntVector; +import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.complex.ListVector; @@ -179,6 +182,19 @@ private Schema makeArrowSchema() { return new Schema(childrenBuilder.build(), null); } + private Schema makeArrowBinarySchema() { + ImmutableList.Builder childrenBuilder = ImmutableList.builder(); + FieldType binaryType = FieldType.nullable(new ArrowType.Binary()); + FieldType largeBinaryType = FieldType.nullable(new ArrowType.LargeBinary()); + FieldType fixedSizeBinaryType = + FieldType.nullable(new ArrowType.FixedSizeBinary(3)); + + childrenBuilder.add(new Field("binaryField", binaryType, null)); + childrenBuilder.add(new Field("largeBinaryField", largeBinaryType, null)); + childrenBuilder.add(new Field("fixedSizeBinaryField", fixedSizeBinaryType, null)); + + return new Schema(childrenBuilder.build(), null); + } public void writeScottEmpData(Path arrowDataDirectory) throws IOException, SQLException { List tableNames = ImmutableList.of("EMP", "DEPT", "SALGRADE"); @@ -265,6 +281,26 @@ public void writeArrowData(File file) throws IOException { fileOutputStream.close(); } + public void writeArrowBinaryData(File file) throws IOException { + FileOutputStream fileOutputStream = new FileOutputStream(file); + Schema arrowSchema = makeArrowBinarySchema(); + VectorSchemaRoot vectorSchemaRoot = + VectorSchemaRoot.create(arrowSchema, new RootAllocator(Integer.MAX_VALUE)); + ArrowFileWriter arrowFileWriter = + new ArrowFileWriter(vectorSchemaRoot, null, fileOutputStream.getChannel()); + + arrowFileWriter.start(); + vectorSchemaRoot.setRowCount(3); + binaryField(vectorSchemaRoot.getVector("binaryField")); + largeBinaryField(vectorSchemaRoot.getVector("largeBinaryField")); + fixedSizeBinaryField(vectorSchemaRoot.getVector("fixedSizeBinaryField")); + arrowFileWriter.writeBatch(); + arrowFileWriter.end(); + arrowFileWriter.close(); + fileOutputStream.flush(); + fileOutputStream.close(); + } + public void writeArrowDataType(File file) throws IOException { FileOutputStream fileOutputStream = new FileOutputStream(file); Schema arrowSchema = makeArrowDateTypeSchema(); @@ -342,7 +378,6 @@ public void writeArrowDataType(File file) throws IOException { fileOutputStream.close(); } - public void writeArrowListData(File file) throws IOException { Schema arrowSchema = makeArrowListSchema(); try (RootAllocator allocator = new RootAllocator(Integer.MAX_VALUE); @@ -361,6 +396,36 @@ public void writeArrowListData(File file) throws IOException { } } + private void binaryField(FieldVector fieldVector) { + VarBinaryVector binaryVector = (VarBinaryVector) fieldVector; + binaryVector.setInitialCapacity(3); + binaryVector.allocateNew(); + binaryVector.setSafe(0, new byte[] {0, 1}); + binaryVector.setNull(1); + binaryVector.setSafe(2, new byte[] {2, 3, 4}); + fieldVector.setValueCount(3); + } + + private void largeBinaryField(FieldVector fieldVector) { + LargeVarBinaryVector largeBinaryVector = (LargeVarBinaryVector) fieldVector; + largeBinaryVector.setInitialCapacity(3); + largeBinaryVector.allocateNew(); + largeBinaryVector.setSafe(0, new byte[] {10, 11}); + largeBinaryVector.setNull(1); + largeBinaryVector.setSafe(2, new byte[] {12, 13, 14}); + fieldVector.setValueCount(3); + } + + private void fixedSizeBinaryField(FieldVector fieldVector) { + FixedSizeBinaryVector fixedSizeBinaryVector = (FixedSizeBinaryVector) fieldVector; + fixedSizeBinaryVector.setInitialCapacity(3); + fixedSizeBinaryVector.allocateNew(); + fixedSizeBinaryVector.setSafe(0, new byte[] {20, 21, 22}); + fixedSizeBinaryVector.setNull(1); + fixedSizeBinaryVector.setSafe(2, new byte[] {23, 24, 25}); + fieldVector.setValueCount(3); + } + private void tinyIntField(FieldVector fieldVector, int rowCount) { TinyIntVector tinyIntVector = (TinyIntVector) fieldVector; tinyIntVector.setInitialCapacity(rowCount);