Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.calcite.adapter.arrow;

import org.apache.calcite.avatica.util.ByteString;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.calcite.util.Util;
Expand Down Expand Up @@ -95,7 +96,11 @@ private static Object getValue(ValueVector vector, int index) {
(ArrowType.Timestamp) vector.getField().getType();
return toMillis(rawValue, tsType.getUnit());
}
return vector.getObject(index);
final Object value = vector.getObject(index);
if (value instanceof byte[]) {
return new ByteString((byte[]) value);
}
return value;
}

/** Converts a raw timestamp value to milliseconds since epoch.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@

/**
* Enumerator that reads projected Arrow value-vectors directly.
*
* <p>This path is used for identity projections that Gandiva cannot project
* through the existing {@code Projector} path, such as Arrow binary vectors.
* It is not a replacement for Gandiva expression evaluation.
*/
class ArrowDirectEnumerator extends AbstractArrowEnumerator {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this related to the issue being addressed?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is related. The existing projection path goes through Gandiva Projector, but Gandiva does not handle these Arrow binary vectors well in this case.

For this PR, the direct enumerator is only used for simple field projection without filters. It reads the selected Arrow vectors directly instead of asking Gandiva to project them. I added comments to make that scope clearer.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought you want to get rid of Gandiva.
Do you still want to keep this new comment?
Will it be removed later?

Copy link
Copy Markdown
Member Author

@caicancai caicancai Jun 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will do this in another pull request. After merging this pull request, I will pull the latest branch code for modification.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private final Runnable onClose;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class ArrowEnumerable extends AbstractEnumerable<Object> {
return new ArrowFilterEnumerator(arrowFileReader, fields, filter,
onClose);
}
// No projector and no filter means the query is an identity projection
// that should read selected value-vectors directly.
return new ArrowDirectEnumerator(arrowFileReader, fields, onClose);
} catch (Exception e) {
throw Util.toUnchecked(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ private static RelDataType of(Field field, JavaTypeFactory typeFactory) {
return typeFactory.createSqlType(SqlTypeName.BOOLEAN);
case Utf8:
return typeFactory.createSqlType(SqlTypeName.VARCHAR);
case Binary:
case LargeBinary:
return typeFactory.createSqlType(SqlTypeName.VARBINARY);
case FixedSizeBinary:
return typeFactory.createSqlType(SqlTypeName.BINARY,
((ArrowType.FixedSizeBinary) arrowType).getByteWidth());
case FloatingPoint:
FloatingPointPrecision precision = ((ArrowType.FloatingPoint) arrowType).getPrecision();
switch (precision) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,10 @@ private static RelDataType deduceRowType(Schema schema,
}

private @Nullable Projector makeProjector(ImmutableIntList fields) {
if (containsListField(fields)) {
if (requiresDirectVectorProjection(fields)) {
// Returning null selects ArrowEnumerable's direct vector-read path.
// Use that path for list fields because Gandiva does not support identity
// projection expressions over Arrow List vectors.
// Use that path because Gandiva does not support identity projection
// expressions over Arrow List and binary vectors.
return null;
}

Expand All @@ -223,11 +223,24 @@ private static RelDataType deduceRowType(Schema schema,
}
}

private boolean containsListField(ImmutableIntList fields) {
/** Returns whether selected fields should be projected by reading Arrow
* value-vectors directly rather than by creating a Gandiva projector.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question

*
* <p>CALCITE-7541 extends this direct projection path for Arrow binary vector
* families because Gandiva cannot project them through the existing identity
* projection path. Queries with filters still use Gandiva filters; this direct
* path only applies to no-filter projections.
*/
private boolean requiresDirectVectorProjection(ImmutableIntList fields) {
for (int fieldOrdinal : fields) {
if (schema.getFields().get(fieldOrdinal).getType().getTypeID()
== ArrowType.ArrowTypeID.List) {
switch (schema.getFields().get(fieldOrdinal).getType().getTypeID()) {
case List:
case Binary:
case LargeBinary:
case FixedSizeBinary:
return true;
default:
break;
}
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ static void initializeArrowState(@TempDir Path sharedTempDir)
ArrowDataTest arrowListDataGenerator = new ArrowDataTest();
arrowListDataGenerator.writeArrowListData(listDataLocationFile);

File binaryDataLocationFile = arrowFilesDirectory.resolve("arrowbinary.arrow").toFile();
ArrowDataTest arrowBinaryDataGenerator = new ArrowDataTest();
arrowBinaryDataGenerator.writeArrowBinaryData(binaryDataLocationFile);

arrow = ImmutableMap.of("model", modelFileTarget.toAbsolutePath().toString());
}

Expand All @@ -82,6 +86,24 @@ static void initializeArrowState(@TempDir Path sharedTempDir)
.explainContains(plan);
}

/** Test case for
* <a href="https://issues.apache.org/jira/browse/CALCITE-7541">[CALCITE-7541]
* Support Binary Arrow types in Arrow adapter</a>. */
@Test void testBinaryProject() {
String sql = "select \"binaryField\", \"largeBinaryField\", \"fixedSizeBinaryField\" "
+ "from arrowbinary";
String plan = "PLAN=ArrowToEnumerableConverter\n"
+ " ArrowTableScan(table=[[ARROW, ARROWBINARY]], fields=[[0, 1, 2]])\n\n";
String result = "binaryField=0001; largeBinaryField=0a0b; fixedSizeBinaryField=141516\n"
+ "binaryField=null; largeBinaryField=null; fixedSizeBinaryField=null\n"
+ "binaryField=020304; largeBinaryField=0c0d0e; fixedSizeBinaryField=171819\n";
CalciteAssert.that()
.with(arrow)
.query(sql)
.returns(result)
.explainContains(plan);
}

@Test void testTinyIntProject() {
String sql = "select \"tinyIntField\" from arrowdatatype";
String plan = "PLAN=ArrowToEnumerableConverter\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,19 @@
import org.apache.arrow.vector.DateDayVector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.FloatingPointVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.LargeVarBinaryVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TimeSecVector;
import org.apache.arrow.vector.TimeStampMicroVector;
import org.apache.arrow.vector.TimeStampMilliVector;
import org.apache.arrow.vector.TimeStampNanoVector;
import org.apache.arrow.vector.TimeStampSecVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.ListVector;
Expand Down Expand Up @@ -179,6 +182,19 @@ private Schema makeArrowSchema() {
return new Schema(childrenBuilder.build(), null);
}

private Schema makeArrowBinarySchema() {
ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
FieldType binaryType = FieldType.nullable(new ArrowType.Binary());
FieldType largeBinaryType = FieldType.nullable(new ArrowType.LargeBinary());
FieldType fixedSizeBinaryType =
FieldType.nullable(new ArrowType.FixedSizeBinary(3));

childrenBuilder.add(new Field("binaryField", binaryType, null));
childrenBuilder.add(new Field("largeBinaryField", largeBinaryType, null));
childrenBuilder.add(new Field("fixedSizeBinaryField", fixedSizeBinaryType, null));

return new Schema(childrenBuilder.build(), null);
}

public void writeScottEmpData(Path arrowDataDirectory) throws IOException, SQLException {
List<String> tableNames = ImmutableList.of("EMP", "DEPT", "SALGRADE");
Expand Down Expand Up @@ -265,6 +281,26 @@ public void writeArrowData(File file) throws IOException {
fileOutputStream.close();
}

public void writeArrowBinaryData(File file) throws IOException {
FileOutputStream fileOutputStream = new FileOutputStream(file);
Schema arrowSchema = makeArrowBinarySchema();
VectorSchemaRoot vectorSchemaRoot =
VectorSchemaRoot.create(arrowSchema, new RootAllocator(Integer.MAX_VALUE));
ArrowFileWriter arrowFileWriter =
new ArrowFileWriter(vectorSchemaRoot, null, fileOutputStream.getChannel());

arrowFileWriter.start();
vectorSchemaRoot.setRowCount(3);
binaryField(vectorSchemaRoot.getVector("binaryField"));
largeBinaryField(vectorSchemaRoot.getVector("largeBinaryField"));
fixedSizeBinaryField(vectorSchemaRoot.getVector("fixedSizeBinaryField"));
arrowFileWriter.writeBatch();
arrowFileWriter.end();
arrowFileWriter.close();
fileOutputStream.flush();
fileOutputStream.close();
}

public void writeArrowDataType(File file) throws IOException {
FileOutputStream fileOutputStream = new FileOutputStream(file);
Schema arrowSchema = makeArrowDateTypeSchema();
Expand Down Expand Up @@ -342,7 +378,6 @@ public void writeArrowDataType(File file) throws IOException {
fileOutputStream.close();
}


public void writeArrowListData(File file) throws IOException {
Schema arrowSchema = makeArrowListSchema();
try (RootAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
Expand All @@ -361,6 +396,36 @@ public void writeArrowListData(File file) throws IOException {
}
}

private void binaryField(FieldVector fieldVector) {
VarBinaryVector binaryVector = (VarBinaryVector) fieldVector;
binaryVector.setInitialCapacity(3);
binaryVector.allocateNew();
binaryVector.setSafe(0, new byte[] {0, 1});
binaryVector.setNull(1);
binaryVector.setSafe(2, new byte[] {2, 3, 4});
fieldVector.setValueCount(3);
}

private void largeBinaryField(FieldVector fieldVector) {
LargeVarBinaryVector largeBinaryVector = (LargeVarBinaryVector) fieldVector;
largeBinaryVector.setInitialCapacity(3);
largeBinaryVector.allocateNew();
largeBinaryVector.setSafe(0, new byte[] {10, 11});
largeBinaryVector.setNull(1);
largeBinaryVector.setSafe(2, new byte[] {12, 13, 14});
fieldVector.setValueCount(3);
}

private void fixedSizeBinaryField(FieldVector fieldVector) {
FixedSizeBinaryVector fixedSizeBinaryVector = (FixedSizeBinaryVector) fieldVector;
fixedSizeBinaryVector.setInitialCapacity(3);
fixedSizeBinaryVector.allocateNew();
fixedSizeBinaryVector.setSafe(0, new byte[] {20, 21, 22});
fixedSizeBinaryVector.setNull(1);
fixedSizeBinaryVector.setSafe(2, new byte[] {23, 24, 25});
fieldVector.setValueCount(3);
}

private void tinyIntField(FieldVector fieldVector, int rowCount) {
TinyIntVector tinyIntVector = (TinyIntVector) fieldVector;
tinyIntVector.setInitialCapacity(rowCount);
Expand Down
Loading