Skip to content

Commit f7783ad

Browse files
committed
added(feat): excludeColumns option parameter
1 parent 137aa04 commit f7783ad

14 files changed

Lines changed: 210 additions & 36 deletions

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ com.spotify.dbeam.options.JdbcExportPipelineOptions:
133133
--nullableArrayItems=<Boolean>
134134
Default: false
135135
Controls whether array items should be nullable, ignored if arrayMode is 'bytes'.
136+
--excludeColumns=<String>
137+
A comma-separated list of columns to be excluded from the export.
136138
```
137139

138140
#### Input Avro schema file

dbeam-core/src/main/java/com/spotify/dbeam/args/JdbcExportArgs.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@
2222

2323
import com.google.auto.value.AutoValue;
2424
import com.google.common.annotations.VisibleForTesting;
25+
import com.google.common.collect.ImmutableSet;
2526
import java.io.Serializable;
2627
import java.sql.Connection;
2728
import java.time.Duration;
29+
import java.util.Arrays;
2830
import java.util.Optional;
2931
import org.apache.avro.Schema;
3032

@@ -49,6 +51,17 @@ public abstract class JdbcExportArgs implements Serializable {
4951

5052
public abstract Optional<Schema> inputAvroSchema();
5153

54+
public abstract Optional<ImmutableSet<String>> excludedColumns();
55+
56+
public static Optional<ImmutableSet<String>> parseExcludedColumns(
57+
final Optional<String> rawExcludedColumns) {
58+
return rawExcludedColumns.map(
59+
s ->
60+
Arrays.stream(s.split(","))
61+
.map(String::trim)
62+
.collect(ImmutableSet.toImmutableSet()));
63+
}
64+
5265
@AutoValue.Builder
5366
abstract static class Builder {
5467

@@ -68,6 +81,8 @@ abstract static class Builder {
6881

6982
abstract Builder setInputAvroSchema(Optional<Schema> inputAvroSchema);
7083

84+
abstract Builder setExcludedColumns(Optional<ImmutableSet<String>> excludedColumns);
85+
7186
abstract JdbcExportArgs build();
7287
}
7388

@@ -82,6 +97,7 @@ static JdbcExportArgs create(
8297
Optional.empty(),
8398
false,
8499
Duration.ofDays(7),
100+
Optional.empty(),
85101
Optional.empty());
86102
}
87103

@@ -93,7 +109,8 @@ public static JdbcExportArgs create(
93109
final Optional<String> avroDoc,
94110
final Boolean useAvroLogicalTypes,
95111
final Duration exportTimeout,
96-
final Optional<Schema> inputAvroSchema) {
112+
final Optional<Schema> inputAvroSchema,
113+
final Optional<ImmutableSet<String>> excludedColumns) {
97114
return new AutoValue_JdbcExportArgs.Builder()
98115
.setJdbcAvroOptions(jdbcAvroArgs)
99116
.setQueryBuilderArgs(queryBuilderArgs)
@@ -103,6 +120,7 @@ public static JdbcExportArgs create(
103120
.setUseAvroLogicalTypes(useAvroLogicalTypes)
104121
.setExportTimeout(exportTimeout)
105122
.setInputAvroSchema(inputAvroSchema)
123+
.setExcludedColumns(excludedColumns)
106124
.build();
107125
}
108126

dbeam-core/src/main/java/com/spotify/dbeam/args/QueryBuilder.java

Lines changed: 62 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package com.spotify.dbeam.args;
2222

2323
import com.google.common.collect.ImmutableList;
24+
import com.google.common.collect.ImmutableSet;
2425
import java.io.Serializable;
2526
import java.util.List;
2627
import java.util.Optional;
@@ -117,18 +118,24 @@ public int hashCode() {
117118
private final QueryBase base;
118119
private final List<String> whereConditions;
119120
private final Optional<String> limitStr;
121+
private final Optional<ImmutableSet<String>> excludedColumns;
120122

121123
/**
 * Creates a builder for the given query base with no where conditions, no limit and no excluded
 * columns.
 */
private QueryBuilder(final QueryBase base) {
  this(base, ImmutableList.of(), Optional.empty(), Optional.empty());
}
126129

127130
private QueryBuilder(
128-
final QueryBase base, final List<String> whereConditions, final Optional<String> limitStr) {
131+
final QueryBase base,
132+
final List<String> whereConditions,
133+
final Optional<String> limitStr,
134+
final Optional<ImmutableSet<String>> excludedColumns) {
129135
this.base = base;
130136
this.whereConditions = whereConditions;
131137
this.limitStr = limitStr;
138+
this.excludedColumns = excludedColumns;
132139
}
133140

134141
public static QueryBuilder fromTablename(final String tableName) {
@@ -148,7 +155,22 @@ public QueryBuilder withPartitionCondition(
148155
Stream.of(
149156
createSqlPartitionCondition(partitionColumn, startPointIncl, endPointExcl)))
150157
.collect(Collectors.toList()),
151-
this.limitStr);
158+
this.limitStr,
159+
this.excludedColumns);
160+
}
161+
162+
public QueryBuilder withExcludedColumns(final Optional<ImmutableSet<String>> excludedColumns) {
163+
if (excludedColumns.isPresent() && this.base instanceof UserQueryBase) {
164+
UserQueryBase userQueryBase = (UserQueryBase) this.base;
165+
String newSqlQuery = rebuildSelectClause(userQueryBase.userSqlQuery, excludedColumns.get());
166+
return new QueryBuilder(
167+
new UserQueryBase(newSqlQuery, userQueryBase.selectClause),
168+
this.whereConditions,
169+
this.limitStr,
170+
excludedColumns);
171+
} else {
172+
return new QueryBuilder(this.base, this.whereConditions, this.limitStr, excludedColumns);
173+
}
152174
}
153175

154176
private static String createSqlPartitionCondition(
@@ -171,7 +193,8 @@ public QueryBuilder withParallelizationCondition(
171193
createSqlSplitCondition(
172194
partitionColumn, startPointIncl, endPoint, isEndPointExcl)))
173195
.collect(Collectors.toList()),
174-
this.limitStr);
196+
this.limitStr,
197+
this.excludedColumns);
175198
}
176199

177200
private static String createSqlSplitCondition(
@@ -205,9 +228,38 @@ private static String removeTrailingSymbols(String sqlQuery) {
205228
return sqlQuery.replaceAll(regex, "$1");
206229
}
207230

231+
private static String rebuildSelectClause(
232+
String sqlQuery, ImmutableSet<String> excludedColumns) {
233+
String lowerCaseQuery = sqlQuery.toLowerCase();
234+
int selectIdx = lowerCaseQuery.indexOf("select");
235+
int fromIdx = lowerCaseQuery.indexOf("from");
236+
237+
if (selectIdx == -1 || fromIdx == -1 || selectIdx > fromIdx) {
238+
// Cannot parse, return original query
239+
return sqlQuery;
240+
}
241+
242+
String selectClause = sqlQuery.substring(selectIdx + "select".length(), fromIdx).trim();
243+
String[] columns = selectClause.split(",");
244+
List<String> newColumns =
245+
Stream.of(columns)
246+
.map(String::trim)
247+
.filter(column -> !excludedColumns.contains(column))
248+
.collect(Collectors.toList());
249+
250+
if (newColumns.isEmpty()) {
251+
return "SELECT * " + sqlQuery.substring(fromIdx);
252+
} else {
253+
return "SELECT " + String.join(", ", newColumns) + " " + sqlQuery.substring(fromIdx);
254+
}
255+
}
256+
208257
public QueryBuilder withLimit(long limit) {
209258
return new QueryBuilder(
210-
this.base, this.whereConditions, Optional.of(String.format(" LIMIT %d", limit)));
259+
this.base,
260+
this.whereConditions,
261+
Optional.of(String.format(" LIMIT %d", limit)),
262+
this.excludedColumns);
211263
}
212264

213265
@Override
@@ -248,6 +300,11 @@ public QueryBuilder generateQueryToGetLimitsOfSplitColumn(
248300
"SELECT MIN(%s) as %s, MAX(%s) as %s",
249301
splitColumn, minSplitColumnName, splitColumn, maxSplitColumnName);
250302

251-
return new QueryBuilder(base.withSelect(selectMinMax), this.whereConditions, this.limitStr);
303+
return new QueryBuilder(
304+
base.withSelect(selectMinMax),
305+
this.whereConditions,
306+
this.limitStr,
307+
this.excludedColumns);
252308
}
253309
}
310+

dbeam-core/src/main/java/com/spotify/dbeam/args/QueryBuilderArgs.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static com.spotify.dbeam.args.ParallelQueryBuilder.queriesForBounds;
2626

2727
import com.google.auto.value.AutoValue;
28+
import com.google.common.collect.ImmutableSet;
2829
import com.google.common.collect.Lists;
2930
import java.io.Serializable;
3031
import java.sql.Connection;
@@ -57,6 +58,8 @@ public abstract class QueryBuilderArgs implements Serializable {
5758

5859
public abstract Optional<Integer> queryParallelism();
5960

61+
public abstract Optional<ImmutableSet<String>> excludedColumns();
62+
6063
public abstract Builder builder();
6164

6265
@AutoValue.Builder
@@ -86,6 +89,8 @@ public abstract static class Builder {
8689

8790
public abstract Builder setQueryParallelism(Optional<Integer> queryParallelism);
8891

92+
public abstract Builder setExcludedColumns(Optional<ImmutableSet<String>> excludedColumns);
93+
8994
public abstract QueryBuilderArgs build();
9095
}
9196

@@ -122,6 +127,9 @@ public String sqlQueryWithLimitOne() {
122127
*/
123128
public List<String> buildQueries(final Connection connection) throws SQLException {
124129
QueryBuilder queryBuilder = this.baseSqlQuery();
130+
if (this.excludedColumns().isPresent()) {
131+
queryBuilder = queryBuilder.withExcludedColumns(this.excludedColumns());
132+
}
125133
if (this.partitionColumn().isPresent() && this.partition().isPresent()) {
126134
queryBuilder =
127135
configurePartitionCondition(

dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroSchema.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import static java.sql.Types.VARBINARY;
5151
import static java.sql.Types.VARCHAR;
5252

53+
import com.google.common.collect.ImmutableSet;
5354
import com.spotify.dbeam.args.QueryBuilderArgs;
5455
import com.spotify.dbeam.options.ArrayHandlingMode;
5556
import java.sql.Array;
@@ -95,9 +96,14 @@ public static Schema createSchemaByReadingOneRow(
9596
avroDoc,
9697
useLogicalTypes,
9798
arrayMode,
98-
nullableArrayItems);
99-
LOGGER.info("Schema created successfully. useLogicalTypes={}, arrayMode={}, "
100-
+ "Generated schema: {}", useLogicalTypes, arrayMode, schema.toString());
99+
nullableArrayItems,
100+
queryBuilderArgs.excludedColumns());
101+
LOGGER.info(
102+
"Schema created successfully. useLogicalTypes={}, arrayMode={}, "
103+
+ "Generated schema: {}",
104+
useLogicalTypes,
105+
arrayMode,
106+
schema.toString());
101107
return schema;
102108
}
103109
}
@@ -110,7 +116,8 @@ public static Schema createAvroSchema(
110116
final String avroDoc,
111117
final boolean useLogicalTypes,
112118
final String arrayMode,
113-
final boolean nullableArrayItems)
119+
final boolean nullableArrayItems,
120+
final Optional<ImmutableSet<String>> excludedColumns)
114121
throws SQLException {
115122

116123
final ResultSetMetaData meta = resultSet.getMetaData();
@@ -124,13 +131,18 @@ public static Schema createAvroSchema(
124131
.prop("tableName", tableName)
125132
.prop("connectionUrl", connectionUrl)
126133
.fields();
127-
return createAvroFields(resultSet, builder, useLogicalTypes, arrayMode, nullableArrayItems)
134+
return createAvroFields(
135+
resultSet,
136+
builder,
137+
useLogicalTypes,
138+
arrayMode,
139+
nullableArrayItems,
140+
excludedColumns.orElse(ImmutableSet.of()))
128141
.endRecord();
129142
}
130143

131144
static String getDatabaseTableName(final ResultSetMetaData meta) throws SQLException {
132145
final String defaultTableName = "no_table_name";
133-
134146
for (int i = 1; i <= meta.getColumnCount(); i++) {
135147
String metaTableName = meta.getTableName(i);
136148
if (metaTableName != null && !metaTableName.isEmpty()) {
@@ -145,20 +157,24 @@ private static SchemaBuilder.FieldAssembler<Schema> createAvroFields(
145157
final SchemaBuilder.FieldAssembler<Schema> builder,
146158
final boolean useLogicalTypes,
147159
final String arrayMode,
148-
final boolean nullableArrayItems)
160+
final boolean nullableArrayItems,
161+
final ImmutableSet<String> excludedColumns)
149162
throws SQLException {
150163

151164
ResultSetMetaData meta = resultSet.getMetaData();
152165

153166
for (int i = 1; i <= meta.getColumnCount(); i++) {
154-
155167
final String columnName;
156168
if (meta.getColumnName(i).isEmpty()) {
157169
columnName = meta.getColumnLabel(i);
158170
} else {
159171
columnName = meta.getColumnName(i);
160172
}
161173

174+
if (excludedColumns.contains(columnName)) {
175+
continue;
176+
}
177+
162178
final int columnType = meta.getColumnType(i);
163179
final String typeName = JDBCType.valueOf(columnType).getName();
164180
final String columnClassName = meta.getColumnClassName(i);

dbeam-core/src/main/java/com/spotify/dbeam/options/JdbcExportArgsFactory.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,9 @@ public static JdbcExportArgs fromPipelineOptions(final PipelineOptions options)
8181
Optional.ofNullable(exportOptions.getAvroDoc()),
8282
exportOptions.isUseAvroLogicalTypes(),
8383
Duration.parse(exportOptions.getExportTimeout()),
84-
BeamJdbcAvroSchema.parseOptionalInputAvroSchemaFile(exportOptions.getAvroSchemaFilePath()));
84+
BeamJdbcAvroSchema.parseOptionalInputAvroSchemaFile(exportOptions.getAvroSchemaFilePath()),
85+
JdbcExportArgs.parseExcludedColumns(
86+
Optional.ofNullable(exportOptions.getExcludeColumns())));
8587
}
8688

8789
public static QueryBuilderArgs createQueryArgs(final JdbcExportPipelineOptions options)
@@ -129,6 +131,9 @@ public static QueryBuilderArgs createQueryArgs(final JdbcExportPipelineOptions o
129131
.setPartitionPeriod(partitionPeriod)
130132
.setSplitColumn(splitColumn)
131133
.setQueryParallelism(queryParallelism)
134+
.setExcludedColumns(
135+
JdbcExportArgs.parseExcludedColumns(
136+
Optional.ofNullable(options.getExcludeColumns())))
132137
.build();
133138
}
134139

dbeam-core/src/main/java/com/spotify/dbeam/options/JdbcExportPipelineOptions.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,4 +149,9 @@ public interface JdbcExportPipelineOptions extends DBeamPipelineOptions {
149149
Long getMinRows();
150150

151151
void setMinRows(Long value);
152+
153+
@Description("A comma-separated list of columns to be excluded from the export.")
154+
String getExcludeColumns();
155+
156+
void setExcludeColumns(String value);
152157
}

dbeam-core/src/test/java/com/spotify/dbeam/args/JdbcExportOptionsTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,4 +368,16 @@ public void shouldFailOnNegativeQueryParallelism() throws IOException, ClassNotF
368368
"--connectionUrl=jdbc:postgresql://some_db --table=some_table "
369369
+ "--password=secret --queryParallelism=-5 --splitColumn=id");
370370
}
371+
372+
@Test
373+
public void shouldParseExcludedColumns() throws IOException, ClassNotFoundException {
374+
final JdbcExportArgs options =
375+
optionsFromArgs(
376+
"--connectionUrl=jdbc:postgresql://some_db --table=some_table "
377+
+ "--password=secret --excludeColumns=col1,col2");
378+
379+
Assert.assertEquals(
380+
Optional.of(com.google.common.collect.ImmutableSet.of("col1", "col2")),
381+
options.excludedColumns());
382+
}
371383
}

dbeam-core/src/test/java/com/spotify/dbeam/args/QueryBuilderTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,34 @@ public void testItGeneratesQueryForLimits() {
210210
Assert.assertEquals(expected, actual);
211211
}
212212

213+
214+
@Test
215+
public void testTableNameWithExcludedColumnsShouldStillUseSelectStar() {
216+
final com.google.common.collect.ImmutableSet<String> excludedColumns =
217+
com.google.common.collect.ImmutableSet.of("col1");
218+
final QueryBuilder wrapper =
219+
QueryBuilder.fromTablename("some_table")
220+
.withExcludedColumns(java.util.Optional.of(excludedColumns));
221+
222+
final String expected = "SELECT * FROM some_table WHERE 1=1";
223+
224+
Assert.assertEquals(expected, wrapper.build());
225+
}
226+
227+
@Test
228+
public void testRawSqlWithExcludedColumnShouldRemoveColumns() {
229+
final com.google.common.collect.ImmutableSet<String> excludedColumns =
230+
com.google.common.collect.ImmutableSet.of("col1");
231+
final QueryBuilder wrapper =
232+
QueryBuilder.fromSqlQuery("SELECT col1, col2 FROM some_table")
233+
.withExcludedColumns(java.util.Optional.of(excludedColumns));
234+
235+
final String expected =
236+
"SELECT * FROM (SELECT col2 FROM some_table) as user_sql_query WHERE 1=1";
237+
238+
Assert.assertEquals(expected, wrapper.build());
239+
}
240+
213241
private void execAndCompare(String rawInput, String expected) {
214242
final String actual = QueryBuilder.fromSqlQuery(rawInput).build();
215243

0 commit comments

Comments
 (0)