Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,7 @@ releases

# bazel
bazel-*
out/
venv/
.ijwb/
.envrc
33 changes: 31 additions & 2 deletions aggregator/src/test/scala/ai/chronon/aggregator/test/DataGen.scala
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,12 @@ object CStream {
}
}

case class Column(name: String, `type`: DataType, cardinality: Int, chunkSize: Int = 10, nullRate: Double = 0.01) {
case class Column(name: String,
`type`: DataType,
cardinality: Int,
chunkSize: Int = 10,
nullRate: Double = 0.01,
fixedListSize: Boolean = false) {
def genImpl(dtype: DataType, partitionColumn: String, partitionSpec: PartitionSpec): CStream[Any] =
dtype match {
case StringType =>
Expand All @@ -169,7 +174,31 @@ case class Column(name: String, `type`: DataType, cardinality: Int, chunkSize: I
case _ => new LongStream(cardinality, nullRate)
}
case ListType(elementType) =>
genImpl(elementType, partitionColumn, partitionSpec).chunk(chunkSize)
val value = genImpl(elementType, partitionColumn, partitionSpec)
if (fixedListSize) value.chunk(chunkSize, chunkSize) else value.chunk(chunkSize)
case StructType(_, fields) =>
val fieldGens = fields.map(f => genImpl(f.fieldType, partitionColumn, partitionSpec))
new CStream[Any] {
override def next(): Any = {
if (math.random < nullRate) null
else fieldGens.map(_.next()).toArray
}
}
case MapType(keyType, valueType) =>
val keyGen = genImpl(keyType, partitionColumn, partitionSpec)
val valGen = genImpl(valueType, partitionColumn, partitionSpec)
new CStream[Any] {
override def next(): Any = {
if (math.random < nullRate) return null
val size = (math.random * chunkSize).toInt + 1
val map = new java.util.HashMap[Any, Any]()
(0 until size).foreach { _ =>
val k = keyGen.next()
if (k != null) map.put(k, valGen.next())
}
map
}
}
case otherType => throw new UnsupportedOperationException(s"Can't generate random data for $otherType yet.")
}

Expand Down
19 changes: 19 additions & 0 deletions api/py/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Virtual environment
.venv/
venv/

# Build artifacts
dist/
build/
*.egg-info/

# Python cache
__pycache__/
*.py[cod]
*$py.class

# Testing
.pytest_cache/
.coverage
htmlcov/
.tox/
186 changes: 186 additions & 0 deletions api/src/main/scala/ai/chronon/api/Row.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,192 @@ trait SchemaTraverser[SchemaType] {
}

object Row {

/**
* TODO migrate in the rest of the codebase
* This is done in the Netflix fork but wanted to break out the PR
*
* Build a reusable converter function that transforms values according to their DataType.
* Unlike `Row.to` which recreates the conversion logic on every call, this method builds
* the converter once and returns a function that can be applied many times. This allows
* the JIT compiler to optimize the hot path.
*
* @param dataType The Chronon DataType to convert
* @param composer Function to compose struct fields into target StructType
* @param binarizer Function to convert byte arrays to target BinaryType
* @param collector Function to collect array elements into target ListType
* @param mapper Function to convert maps to target MapType
* @param extraneousRecord Optional handler for non-standard record types
* @param schemaTraverser Optional traverser for output schema (used by Avro)
* @return A converter function that transforms values according to the DataType
*/
def buildToConverter[StructType, BinaryType, ListType, MapType, OutputSchema](
dataType: DataType,
composer: (Array[Any], DataType, Option[OutputSchema]) => StructType,
binarizer: Array[Byte] => BinaryType,
collector: (Array[Any], Int) => ListType,
mapper: util.Map[Any, Any] => MapType,
extraneousRecord: Any => Array[Any] = null,
schemaTraverser: Option[SchemaTraverser[OutputSchema]] = None): Any => Any = {

val unguardedFunc: Any => Any = dataType match {
case StructType(_, fields) =>
val fieldConverters = fields.map { field =>
buildToConverter(
field.fieldType,
composer,
binarizer,
collector,
mapper,
extraneousRecord,
schemaTraverser.map(_.getField(field))
)
}
val traverser = schemaTraverser

(value: Any) =>
value match {
case arr: Array[Any] =>
val result = new Array[Any](arr.length)
var i = 0
while (i < arr.length) {
result(i) = fieldConverters(i)(arr(i))
i += 1
}
composer(result, dataType, traverser.map(_.currentNode))
case list: util.ArrayList[Any] =>
val result = new Array[Any](list.size())
var i = 0
while (i < list.size()) {
result(i) = fieldConverters(i)(list.get(i))
i += 1
}
composer(result, dataType, traverser.map(_.currentNode))
case other: Any =>
assert(extraneousRecord != null, s"No handler for $other of class ${other.getClass}")
val arr = extraneousRecord(other)
val result = new Array[Any](arr.length)
var i = 0
while (i < arr.length) {
result(i) = fieldConverters(i)(arr(i))
i += 1
}
composer(result, dataType, traverser.map(_.currentNode))
}

case ListType(elemType) =>
val elemConverter = buildToConverter(
elemType,
composer,
binarizer,
collector,
mapper,
extraneousRecord,
schemaTraverser.map(_.getCollectionType)
)

(value: Any) =>
value match {
case list: util.ArrayList[Any] =>
val result = new Array[Any](list.size())
val it = list.iterator()
var i = 0
while (it.hasNext) {
result(i) = elemConverter(it.next())
i += 1
}
collector(result, list.size())
case arr: Array[_] =>
val result = new Array[Any](arr.length)
var i = 0
while (i < arr.length) {
result(i) = elemConverter(arr(i))
i += 1
}
collector(result, arr.length)
case arr: mutable.WrappedArray[Any] =>
val result = new Array[Any](arr.length)
var i = 0
while (i < arr.length) {
result(i) = elemConverter(arr(i))
i += 1
}
collector(result, arr.length)
}

case MapType(keyType, valueType) =>
val keyConverter = buildToConverter(
keyType,
composer,
binarizer,
collector,
mapper,
extraneousRecord,
schemaTraverser.map(_.getMapKeyType)
)
val valConverter = buildToConverter(
valueType,
composer,
binarizer,
collector,
mapper,
extraneousRecord,
schemaTraverser.map(_.getMapValueType)
)

(value: Any) =>
value match {
case map: util.Map[Any, Any] =>
val newMap = new util.HashMap[Any, Any](map.size())
val iter = map.entrySet().iterator()
while (iter.hasNext) {
val entry = iter.next()
newMap.put(keyConverter(entry.getKey), valConverter(entry.getValue))
}
mapper(newMap)
case map: collection.Map[Any, Any] =>
val newMap = new util.HashMap[Any, Any](map.size)
map.foreach { entry =>
newMap.put(keyConverter(entry._1), valConverter(entry._2))
}
mapper(newMap)
}

case BinaryType =>
(value: Any) => binarizer(value.asInstanceOf[Array[Byte]])

case IntType =>
(value: Any) => value.asInstanceOf[Number].intValue()

case LongType =>
(value: Any) => value.asInstanceOf[Number].longValue()

case DoubleType =>
(value: Any) => value.asInstanceOf[Number].doubleValue()

case FloatType =>
(value: Any) => value.asInstanceOf[Number].floatValue()

case ShortType =>
(value: Any) => value.asInstanceOf[Number].shortValue()

case ByteType =>
(value: Any) => value.asInstanceOf[Number].byteValue()

case BooleanType =>
(value: Any) => value.asInstanceOf[Boolean]

case StringType =>
(value: Any) => value.toString

case _ =>
(value: Any) => value
}

// Guard function handles nulls
(value: Any) => if (value == null) null else unguardedFunc(value)
}

// recursively traverse a logical struct, and convert it chronon's row type
def from[CompositeType, BinaryType, ArrayType, StringType](
value: Any,
Expand Down
Loading