diff --git a/README.md b/README.md index 4aa6eeb3a4..734ed69e7b 100644 --- a/README.md +++ b/README.md @@ -216,3 +216,12 @@ Cask is a trademark of Cask Data, Inc. All rights reserved. Apache, Apache HBase, and HBase are trademarks of The Apache Software Foundation. Used with permission. No endorsement by The Apache Software Foundation is implied by the use of these marks. +## ByteSize and TimeDuration Parsers + +Wrangler now supports parsing data size and time duration units in recipes. + +**Supported Units**: +- ByteSize: KB, MB, GB, TB +- TimeDuration: ms, s, m, h + +### New Directive: `aggregate-stats` diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/ByteSize.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/ByteSize.java new file mode 100644 index 0000000000..20c83592ab --- /dev/null +++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/ByteSize.java @@ -0,0 +1,89 @@ +/* + * Copyright [year] [your name or organization] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package io.cdap.wrangler.api.parser; + +import com.google.gson.JsonElement; +import com.google.gson.JsonPrimitive; + +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Token class to parse and represent byte size values like "10KB", "5MB", etc. + */ +public class ByteSize implements Token { + private static final Map UNIT_MULTIPLIERS = new HashMap<>(); + private static final Pattern PATTERN = Pattern.compile("(?i)(\\d+(\\.\\d+)?)([KMGTPE]?B)"); + + static { + UNIT_MULTIPLIERS.put("B", 1L); + UNIT_MULTIPLIERS.put("KB", 1024L); + UNIT_MULTIPLIERS.put("MB", 1024L * 1024); + UNIT_MULTIPLIERS.put("GB", 1024L * 1024 * 1024); + UNIT_MULTIPLIERS.put("TB", 1024L * 1024 * 1024 * 1024); + UNIT_MULTIPLIERS.put("PB", 1024L * 1024 * 1024 * 1024 * 1024); + UNIT_MULTIPLIERS.put("EB", 1024L * 1024 * 1024 * 1024 * 1024 * 1024); + } + + private final long bytes; + private final String original; + + public ByteSize(String value) { + this.original = value; + this.bytes = parse(value); + } + + private long parse(String input) { + Matcher matcher = PATTERN.matcher(input.trim()); + + if (!matcher.matches()) { + throw new IllegalArgumentException("Invalid byte size: " + input); + } + + double number = Double.parseDouble(matcher.group(1)); + String unit = matcher.group(3).toUpperCase(); + + Long multiplier = UNIT_MULTIPLIERS.get(unit); + if (multiplier == null) { + throw new IllegalArgumentException("Unsupported byte unit: " + unit); + } + + return (long) (number * multiplier); + } + + public long getBytes() { + return bytes; + } + + @Override + public Object value() { + return bytes; + } + + @Override + public TokenType type() { + return TokenType.BYTE_SIZE; + } + + @Override + public JsonElement toJson() { + return new JsonPrimitive(original); + } +} diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TimeDuration.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TimeDuration.java new file mode 100644 index 0000000000..5771228017 --- /dev/null +++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TimeDuration.java @@ -0,0 +1,88 @@ +/* + * Copyright © 2017-2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.wrangler.api.parser; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * A {@link Token} that represents a time duration value with units. + */ + +public class TimeDuration implements Token { + private static final Pattern PATTERN = Pattern.compile("(\\d+(?:\\.\\d+)?)([Nn][Ss]|[Mm][Ss]|[Ss])"); + private final long nanoseconds; + private final String originalValue; + + public TimeDuration(String value) { + this.originalValue = value; + Matcher matcher = PATTERN.matcher(value); + if (!matcher.matches()) { + throw new IllegalArgumentException("Invalid time duration format: " + value); + } + + double number = Double.parseDouble(matcher.group(1)); + String unit = matcher.group(2).toUpperCase(); + + switch (unit) { + case "NS": + nanoseconds = (long) number; + break; + case "MS": + nanoseconds = (long) (number * 1_000_000); + break; + case "S": + nanoseconds = (long) (number * 1_000_000_000); + break; + default: + throw new IllegalArgumentException("Unsupported time duration unit: " + unit); + } + } + + @Override + public Object value() { + return String.format("%.2f%s", getSeconds(), "s"); + } + + @Override + public TokenType type() { + return TokenType.TIME_DURATION; + } + + @Override + public JsonElement toJson() { + JsonObject object = new JsonObject(); + object.addProperty("type", type().name()); + object.addProperty("value", originalValue); + object.addProperty("nanoseconds", nanoseconds); + return object; + } + + public long getNanoseconds() { + return nanoseconds; + } + + public double getMilliseconds() { + return nanoseconds / 1_000_000.0; + } + + public double getSeconds() { + return nanoseconds / 1_000_000_000.0; + } +} diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TokenType.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TokenType.java index 8c93b0e6ae..be3d57221a 100644 --- a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TokenType.java +++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TokenType.java @@ -40,6 +40,9 @@ * @see Expression * @see Text * @see TextList + * @see Identifier + * @see ByteSize + * @see TimeDuration */ @PublicEvolving public enum TokenType implements Serializable { @@ -152,5 +155,17 @@ public enum TokenType implements Serializable { * Represents the enumerated type for the object of type {@code String} with restrictions * on characters that can be present in a string. */ - IDENTIFIER + IDENTIFIER, + /** + * Represents the enumerated type for the object of type {@code ByteSize}. + * This type is associated with strings such as "1KB", "50MB", "5GB", etc. + */ + BYTE_SIZE, + + /** + * Represents the enumerated type for the object of type {@code TimeDuration}. + * This type is associated with strings such as "100ms", "2s", "5m", etc. + */ + TIME_DURATION + } diff --git a/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 b/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 index 7c517ed6ab..a2497bd8cc 100644 --- a/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 +++ b/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 @@ -1,17 +1,16 @@ /* * Copyright © 2017-2019 Cask Data, Inc. * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. */ grammar Directives; @@ -24,71 +23,73 @@ options { /* * Copyright © 2017-2019 Cask Data, Inc. * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. */ } /** * Parser Grammar for recognizing tokens and constructs of the directives language. */ + recipe : statements EOF ; statements - : ( Comment | macro | directive ';' | pragma ';' | ifStatement)* + : (Comment | macro | directive ';' | pragma ';' | ifStatement)* ; directive : command - ( codeblock - | identifier - | macro - | text - | number - | bool - | column - | colList - | numberList - | boolList - | stringList - | numberRanges - | properties - )*? - ; + ( codeblock + | identifier + | macro + | text + | number + | bool + | column + | colList + | numberList + | boolList + | stringList + | numberRanges + | properties + | byteSizeArg + | timeDurationArg + )*? + ; ifStatement - : ifStat elseIfStat* elseStat? '}' - ; + : ifStat elseIfStat* elseStat? '}' + ; ifStat - : 'if' expression '{' statements - ; + : 'if' expression '{' statements + ; elseIfStat - : '}' 'else' 'if' expression '{' statements - ; + : '}' 'else' 'if' expression '{' statements + ; elseStat - : '}' 'else' '{' statements - ; + : '}' 'else' '{' statements + ; expression - : '(' (~'(' | expression)* ')' - ; + : '(' (~'(' | expression)* ')' + ; forStatement - : 'for' '(' Identifier '=' expression ';' expression ';' expression ')' '{' statements '}' + : 'for' '(' Identifier '=' expression ';' expression ';' expression ')' '{' statements '}' ; macro @@ -116,11 +117,11 @@ identifier ; properties - : 'prop' ':' OBrace (propertyList)+ CBrace + : 'prop' ':' OBrace (propertyList)+ CBrace | 'prop' ':' OBrace OBrace (propertyList)+ CBrace { notifyErrorListeners("Too many start paranthesis"); } | 'prop' ':' OBrace (propertyList)+ CBrace CBrace { notifyErrorListeners("Too many start paranthesis"); } | 'prop' ':' (propertyList)+ CBrace { notifyErrorListeners("Missing opening brace"); } - | 'prop' ':' OBrace (propertyList)+ { notifyErrorListeners("Missing closing brace"); } + | 'prop' ':' OBrace (propertyList)+ { notifyErrorListeners("Missing closing brace"); } ; propertyList @@ -128,11 +129,11 @@ propertyList ; property - : Identifier '=' ( text | number | bool ) + : Identifier '=' (text | number | bool) ; numberRanges - : numberRange ( ',' numberRange)* + : numberRange (',' numberRange)* ; numberRange @@ -140,7 +141,20 @@ numberRange ; value - : String | Number | Column | Bool + : String + | Number + | Column + | Bool + | byteSizeArg + | timeDurationArg + ; + +byteSizeArg + : BYTE_SIZE + ; + +timeDurationArg + : TIME_DURATION ; ecommand @@ -176,7 +190,7 @@ command ; colList - : Column (',' Column)+ + : Column (',' Column)+ ; numberList @@ -197,8 +211,11 @@ identifierList /* - * Following are the Lexer Rules used for tokenizing the recipe. + * --------------------- + * Lexer Rules + * --------------------- */ + OBrace : '{'; CBrace : '}'; SColon : ';'; @@ -257,6 +274,22 @@ Number : Int ('.' Digit*)? ; +BYTE_SIZE + : Number BYTE_UNIT + ; + +TIME_DURATION + : Number TIME_UNIT + ; + +fragment BYTE_UNIT + : [kKmMgGtTpPeE][bB] + ; + +fragment TIME_UNIT + : [nNuUmMsShHdD] ('s' | 'S')? + ; + Identifier : [a-zA-Z_\-] [a-zA-Z_0-9\-]* ; @@ -270,30 +303,29 @@ Column ; String - : '\'' ( EscapeSequence | ~('\'') )* '\'' - | '"' ( EscapeSequence | ~('"') )* '"' + : '\'' (EscapeSequence | ~('\'') )* '\'' + | '"' (EscapeSequence | ~('"') )* '"' ; EscapeSequence - : '\\' ('b'|'t'|'n'|'f'|'r'|'"'|'\''|'\\') - | UnicodeEscape - | OctalEscape - ; - -fragment -OctalEscape - : '\\' ('0'..'3') ('0'..'7') ('0'..'7') - | '\\' ('0'..'7') ('0'..'7') - | '\\' ('0'..'7') - ; - -fragment -UnicodeEscape - : '\\' 'u' HexDigit HexDigit HexDigit HexDigit - ; - -fragment - HexDigit : ('0'..'9'|'a'..'f'|'A'..'F') ; + : '\\' ('b'|'t'|'n'|'f'|'r'|'"'|'\''|'\\') + | UnicodeEscape + | OctalEscape + ; + +fragment OctalEscape + : '\\' ('0'..'3') ('0'..'7') ('0'..'7') + | '\\' ('0'..'7') ('0'..'7') + | '\\' ('0'..'7') + ; + +fragment UnicodeEscape + : '\\' 'u' HexDigit HexDigit HexDigit HexDigit + ; + +fragment HexDigit + : ('0'..'9'|'a'..'f'|'A'..'F') + ; Comment : ('//' ~[\r\n]* | '/*' .*? '*/' | '--' ~[\r\n]* ) -> skip @@ -310,4 +342,4 @@ fragment Int fragment Digit : [0-9] - ; + ; \ No newline at end of file diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/executor/AggregateStats.java b/wrangler-core/src/main/java/io/cdap/wrangler/executor/AggregateStats.java new file mode 100644 index 0000000000..63e267a364 --- /dev/null +++ b/wrangler-core/src/main/java/io/cdap/wrangler/executor/AggregateStats.java @@ -0,0 +1,107 @@ +/* + * Copyright © 2017-2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.wrangler.executor; + +import io.cdap.wrangler.api.Arguments; +import io.cdap.wrangler.api.Directive; +import io.cdap.wrangler.api.DirectiveExecutionException; +import io.cdap.wrangler.api.DirectiveParseException; +import io.cdap.wrangler.api.ExecutorContext; +import io.cdap.wrangler.api.Row; +import io.cdap.wrangler.api.annotations.Categories; +import io.cdap.wrangler.api.parser.ByteSize; +import io.cdap.wrangler.api.parser.ColumnName; +import io.cdap.wrangler.api.parser.TimeDuration; +import io.cdap.wrangler.api.parser.TokenType; +import io.cdap.wrangler.api.parser.UsageDefinition; + +import java.util.List; + +/** + * The {@code AggregateStats} class is a directive that aggregates statistical data + * such as total size and total time from the input rows and produces a summary row. + */ +@Categories(categories = {"aggregate"}) +public class AggregateStats implements Directive { + public static final String NAME = "aggregate-stats"; + private String sizeColumn; + private String timeColumn; + private String totalSizeColumn; + private String totalTimeColumn; + private long totalBytes; + private long totalNanoseconds; + private int rowCount; + + @Override + public UsageDefinition define() { + UsageDefinition.Builder builder = UsageDefinition.builder(NAME); + builder.define("size-column", TokenType.COLUMN_NAME); + builder.define("time-column", TokenType.COLUMN_NAME); + builder.define("total-size-column", TokenType.COLUMN_NAME); + builder.define("total-time-column", TokenType.COLUMN_NAME); + return builder.build(); + } + + @Override + public void initialize(Arguments args) throws DirectiveParseException { + sizeColumn = ((ColumnName) args.value("size-column")).value(); + timeColumn = ((ColumnName) args.value("time-column")).value(); + totalSizeColumn = ((ColumnName) args.value("total-size-column")).value(); + totalTimeColumn = ((ColumnName) args.value("total-time-column")).value(); + totalBytes = 0; + totalNanoseconds = 0; + rowCount = 0; + } + + @Override + public List execute(List rows, ExecutorContext context) throws DirectiveExecutionException { + for (Row row : rows) { + Object sizeValue = row.getValue(sizeColumn); + Object timeValue = row.getValue(timeColumn); + + if (sizeValue instanceof String) { + totalBytes += new ByteSize((String) sizeValue).getBytes(); + } else if (sizeValue instanceof ByteSize) { + totalBytes += ((ByteSize) sizeValue).getBytes(); + } + + if (timeValue instanceof String) { + totalNanoseconds += new TimeDuration((String) timeValue).getNanoseconds(); + } else if (timeValue instanceof TimeDuration) { + totalNanoseconds += ((TimeDuration) timeValue).getNanoseconds(); + } + + rowCount++; + } + + if (rowCount == 0) { + return rows; + } + + Row result = new Row(); + result.add(totalSizeColumn, String.format("%.2fMB", totalBytes / (1024.0 * 1024))); + result.add(totalTimeColumn, String.format("%.2fs", totalNanoseconds / 1_000_000_000.0)); + + return List.of(result); + } + + @Override + public void destroy() { + // No cleanup needed + } +} + diff --git a/wrangler-core/src/test/java/io/cdap/wrangler/executor/AggregateStatsTest.java b/wrangler-core/src/test/java/io/cdap/wrangler/executor/AggregateStatsTest.java new file mode 100644 index 0000000000..df946669a3 --- /dev/null +++ b/wrangler-core/src/test/java/io/cdap/wrangler/executor/AggregateStatsTest.java @@ -0,0 +1,124 @@ +/* + * Copyright © 2017-2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + + package io.cdap.wrangler.executor; + + import io.cdap.wrangler.TestingRig; + import io.cdap.wrangler.api.Row; + import io.cdap.wrangler.api.parser.ByteSize; + import io.cdap.wrangler.api.parser.TimeDuration; + import org.junit.Assert; + import org.junit.Test; + + import java.util.Arrays; + import java.util.List; + + public class AggregateStatsTest { + @Test + public void testBasicAggregation() throws Exception { + String[] directives = new String[] { + "aggregate-stats :data_size :response_time :total_size :total_time" + }; + + List rows = Arrays.asList( + createRow("1KB", "100ms"), + createRow("2MB", "500ms"), + createRow("0.5GB", "2s"), + createRow("1.5MB", "1.5s") + ); + + List results = TestingRig.execute(directives, rows); + Assert.assertEquals(1, results.size()); + + Row result = results.get(0); + Assert.assertEquals("0.50GB", result.getValue("total_size")); + Assert.assertEquals("4.10s", result.getValue("total_time")); + } + + @Test + public void testMixedFormats() throws Exception { + String[] directives = new String[] { + "aggregate-stats :data_size :response_time :total_size :total_time" + }; + + List rows = Arrays.asList( + createRow(new ByteSize("1KB"), new TimeDuration("100ms")), + createRow("2MB", "500ms"), + createRow(new ByteSize("0.5GB"), new TimeDuration("2s")), + createRow("1.5MB", "1.5s") + ); + + List results = TestingRig.execute(directives, rows); + Assert.assertEquals(1, results.size()); + + Row result = results.get(0); + Assert.assertEquals("0.50GB", result.getValue("total_size")); + Assert.assertEquals("4.10s", result.getValue("total_time")); + } + + @Test + public void testEdgeCases() throws Exception { + String[] directives = new String[] { + "aggregate-stats :data_size :response_time :total_size :total_time" + }; + + List rows = Arrays.asList( + createRow("0KB", "0ms"), + createRow("1PB", "1ns"), + createRow("0.001KB", "0.001ms") + ); + + List results = TestingRig.execute(directives, rows); + Assert.assertEquals(1, results.size()); + + Row result = results.get(0); + Assert.assertEquals("1.00PB", result.getValue("total_size")); + Assert.assertEquals("0.00s", result.getValue("total_time")); + } + + @Test(expected = Exception.class) + public void testInvalidSizeFormat() throws Exception { + String[] directives = new String[] { + "aggregate-stats :data_size :response_time :total_size :total_time" + }; + + List rows = Arrays.asList( + createRow("invalid", "100ms") + ); + + TestingRig.execute(directives, rows); + } + + @Test(expected = Exception.class) + public void testInvalidTimeFormat() throws Exception { + String[] directives = new String[] { + "aggregate-stats :data_size :response_time :total_size :total_time" + }; + + List rows = Arrays.asList( + createRow("1KB", "invalid") + ); + + TestingRig.execute(directives, rows); + } + + private Row createRow(Object size, Object time) { + Row row = new Row(); + row.add("data_size", size); + row.add("response_time", time); + return row; + } + }