Skip to content

Commit 45cf9ef

Browse files
authored
Removing inMemory Pipes (#1171)
* removing inmemory pipes * moving to ent * moving to ent * access change
1 parent c5608ca commit 45cf9ef

File tree

14 files changed

+15
-690
lines changed

14 files changed

+15
-690
lines changed

common/client/src/main/java/zingg/common/client/pipe/Pipe.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ public class Pipe<D,R,C> implements Serializable{ // St:StructType, Sv:SaveMode
3333
public static final String FORMAT_ELASTIC = "org.elasticsearch.spark.sql";
3434
public static final String FORMAT_EXASOL = "com.exasol.spark";
3535
public static final String FORMAT_BIGQUERY = "bigquery";
36-
public static final String FORMAT_INMEMORY = "inMemory";
3736

3837
String name;
3938
String format;
@@ -44,25 +43,18 @@ public class Pipe<D,R,C> implements Serializable{ // St:StructType, Sv:SaveMode
4443
String schema;
4544
String mode;
4645

47-
48-
49-
50-
5146
public String getSchema() {
5247
return schema;
5348
}
5449

55-
5650
public void setSchema(String schema) {
5751
this.schema = schema;
5852
}
5953

60-
6154
public String getName() {
6255
return name;
6356
}
6457

65-
6658
@JsonValue
6759
public void setName(String name) {
6860
this.name = name;
@@ -76,7 +68,6 @@ public String getFormat() {
7668
public void setFormat(String sinkType) {
7769
this.format = sinkType;
7870
}
79-
8071

8172
@JsonValue
8273
public void setProps(Map<String, String> props) {
@@ -102,23 +93,18 @@ public String get(String key) {
10293
return props.get(key);
10394
}
10495

105-
10696
public String getPreprocessors() {
10797
return preprocessors;
10898
}
10999

110-
111100
public void setPreprocessors(String preprocessors) {
112101
this.preprocessors = preprocessors;
113102
}
114103

115-
116-
117104
public int getId() {
118105
return id;
119106
}
120107

121-
122108
public void setId(int recId) {
123109
this.id = recId;
124110
}

common/client/src/main/java/zingg/common/client/util/PipeUtil.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import zingg.common.client.ZFrame;
1010
import zingg.common.client.ZinggClientException;
1111
import zingg.common.client.pipe.FilePipe;
12-
//import zingg.common.client.pipe.InMemoryPipe;
1312
import zingg.common.client.pipe.Pipe;
1413

1514
//import com.datastax.spark.connector.cql.*;
@@ -55,20 +54,18 @@ protected ZFrame<D,R,C> read(DFReader<D,R,C> reader, Pipe<D,R,C> p, boolean add
5554
LOG.warn("Reading " + p);
5655
try {
5756

58-
if (p.getFormat().equals(Pipe.FORMAT_INMEMORY)) {
59-
input = p.getDataset(); //.df();
60-
}
61-
else {
57+
6258
if (p.getProps().containsKey(FilePipe.LOCATION)) {
6359
input = reader.load(p.get(FilePipe.LOCATION));
6460
}
6561
else {
6662
input = reader.load();
6763
}
68-
}
64+
6965
if (addSource) {
7066
input = input.withColumn(ColName.SOURCE_COL, p.getName());
7167
}
68+
7269
p.setDataset(input);
7370
} catch (Exception ex) {
7471
LOG.warn(ex.getMessage());
@@ -195,10 +192,6 @@ public void write(ZFrame<D,R,C> toWriteOrig,
195192

196193
LOG.warn("Writing output " + p);
197194

198-
if (p.getFormat().equals(Pipe.FORMAT_INMEMORY)) {
199-
p.setDataset(toWriteOrig);
200-
return;
201-
}
202195
//SparkPipe sPipe = (SparkPipe) p;
203196
if (p.getMode() != null) {
204197
writer.setMode(p.getMode()); //SaveMode.valueOf(p.getMode()));

docs/python/markdown/index.md

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,6 @@ Requires **python 3.6+**; **spark 3.5.0.** Otherwise, [`zingg.client.Zingg()`](z
102102
* [`CsvPipe.setDelimiter()`](zingg.md#zingg.pipes.CsvPipe.setDelimiter)
103103
* [`CsvPipe.setHeader()`](zingg.md#zingg.pipes.CsvPipe.setHeader)
104104
* [`CsvPipe.setLocation()`](zingg.md#zingg.pipes.CsvPipe.setLocation)
105-
* [`InMemoryPipe`](zingg.md#zingg.pipes.InMemoryPipe)
106-
* [`InMemoryPipe.getDataset()`](zingg.md#zingg.pipes.InMemoryPipe.getDataset)
107-
* [`InMemoryPipe.setDataset()`](zingg.md#zingg.pipes.InMemoryPipe.setDataset)
108105
* [`Pipe`](zingg.md#zingg.pipes.Pipe)
109106
* [`Pipe.addProperty()`](zingg.md#zingg.pipes.Pipe.addProperty)
110107
* [`Pipe.getPipe()`](zingg.md#zingg.pipes.Pipe.getPipe)

docs/python/markdown/zingg.md

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -643,32 +643,6 @@ Method to set location of pipe
643643
* **Parameters:**
644644
**location** (*String*) – location from where we read data
645645

646-
### *class* zingg.pipes.InMemoryPipe(name, df=None)
647-
648-
Bases: [`Pipe`](#zingg.pipes.Pipe)
649-
650-
Pipe Class for working with InMemory pipeline
651-
652-
* **Parameters:**
653-
* **name** (*String*) – name of the pipe
654-
* **df** (*Dataset* *or* *None*) – provide dataset for this pipe (optional)
655-
656-
#### getDataset()
657-
658-
Method to get Dataset from pipe
659-
660-
* **Returns:**
661-
dataset of the pipe in the format of spark dataset
662-
* **Return type:**
663-
Dataset<Row>
664-
665-
#### setDataset(df)
666-
667-
Method to set DataFrame of the pipe
668-
669-
* **Parameters:**
670-
**df** (*DataFrame*) – pandas or spark dataframe for the pipe
671-
672646
### *class* zingg.pipes.Pipe(name, format)
673647

674648
Bases: `object`
@@ -677,7 +651,7 @@ Pipe class for working with different data-pipelines. Actual pipe def in the arg
677651

678652
* **Parameters:**
679653
* **name** (*String*) – name of the pipe
680-
* **format** (*Format*) – formate of pipe e.g. bigquery,InMemory, etc.
654+
* **format** (*Format*) – formate of pipe e.g. bigquery,csv, etc.
681655

682656
#### addProperty(name, value)
683657

python/zingg/pipes.py

Lines changed: 2 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class Pipe:
3838
3939
:param name: name of the pipe
4040
:type name: String
41-
:param format: formate of pipe e.g. bigquery,InMemory, etc.
41+
:param format: formate of pipe e.g. bigquery,csv, etc.
4242
:type format: Format
4343
"""
4444

@@ -251,47 +251,4 @@ def setDbTable(self, dbtable):
251251
:param dbtable: provide bucket parameter.
252252
:type dbtable: String
253253
"""
254-
Pipe.addProperty(self, "dbtable", dbtable)
255-
256-
257-
class InMemoryPipe(Pipe):
258-
""" Pipe Class for working with InMemory pipeline
259-
260-
:param name: name of the pipe
261-
:type name: String
262-
:param df: provide dataset for this pipe (optional)
263-
:type df: Dataset or None
264-
"""
265-
266-
def __init__(self, name, df = None):
267-
setupPipes()
268-
Pipe.__init__(self, name, JPipe.FORMAT_INMEMORY)
269-
if (df is not None):
270-
self.setDataset(df)
271-
272-
def setDataset(self, df):
273-
""" Method to set DataFrame of the pipe
274-
275-
:param df: pandas or spark dataframe for the pipe
276-
:type df: DataFrame
277-
"""
278-
if (isinstance(df, pd.DataFrame)):
279-
print('schema of pandas df is ' , Pipe.getPipe(self).getSchema())
280-
if (Pipe.getPipe(self).getSchema() is not None):
281-
ds = getSparkSession().createDataFrame(df, schema=Pipe.getPipe(self).getSchema())
282-
else:
283-
ds = getSparkSession().createDataFrame(df)
284-
285-
Pipe.getPipe(self).setDataset(ds._jdf)
286-
elif (isinstance(df, DataFrame)):
287-
Pipe.getPipe(self).setDataset(df._jdf)
288-
else:
289-
LOG.error(" setDataset(): NUll or Unsupported type: %s", type(df))
290-
291-
def getDataset(self):
292-
""" Method to get Dataset from pipe
293-
294-
:return: dataset of the pipe in the format of spark dataset
295-
:rtype: Dataset<Row>
296-
"""
297-
return Pipe.getPipe(self).getDataset().df()
254+
Pipe.addProperty(self, "dbtable", dbtable)

spark/client/src/main/java/zingg/spark/client/util/SparkDFReader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414

1515
public class SparkDFReader implements DFReader<Dataset<Row>, Row, Column> {
1616

17-
private SparkSession session;
18-
private DataFrameReader reader;
17+
protected SparkSession session;
18+
protected DataFrameReader reader;
1919

2020
public SparkDFReader(SparkSession s) {
2121
this.session = s;

spark/client/src/main/java/zingg/spark/client/util/SparkDFWriter.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,30 +10,33 @@
1010
import zingg.common.client.util.DFWriter;
1111

1212
public class SparkDFWriter implements DFWriter<Dataset<Row>, Row, Column>{
13-
private DataFrameWriter writer;
13+
14+
protected DataFrameWriter writer;
1415

1516
public SparkDFWriter(ZFrame<Dataset<Row>, Row, Column> toWriteOrig) {
1617
Dataset<Row> toWrite = toWriteOrig.df();
1718
this.writer = toWrite.write();
18-
1919
}
2020

21-
2221
public void setMode(String s) {
2322
this.writer.mode(SaveMode.valueOf(s));
2423

2524
}
25+
2626
public DFWriter<Dataset<Row>, Row, Column> format(String f) {
2727
writer.format(f);
2828
return this;
2929
}
30+
3031
public DFWriter<Dataset<Row>, Row, Column> option(String k, String v) {
3132
writer.option(k,v);
3233
return this;
3334
}
35+
3436
public void save(String location) {
3537
writer.save(location);
3638
}
39+
3740
public void save() {
3841
writer.save();
3942
}

spark/client/src/main/java/zingg/spark/client/util/SparkPipeUtil.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import org.apache.spark.sql.Row;
88

99
import zingg.common.client.ZFrame;
10-
//import zingg.common.client.pipe.InMemoryPipe;
1110
import zingg.common.client.util.DFReader;
1211
import zingg.common.client.util.DFWriter;
1312
import zingg.common.client.util.PipeUtil;

test/InMemPipeDataBricks.py

Lines changed: 0 additions & 52 deletions
This file was deleted.

test/InMemPipeTest.py

Lines changed: 0 additions & 60 deletions
This file was deleted.

0 commit comments

Comments
 (0)