1616
1717package io .cdap .plugin .gcs ;
1818
19+ import au .com .bytecode .opencsv .CSVReader ;
20+ import com .esotericsoftware .minlog .Log ;
1921import com .google .api .gax .paging .Page ;
2022import com .google .cloud .storage .Blob ;
2123import com .google .cloud .storage .Storage ;
2224import com .google .cloud .storage .StorageOptions ;
2325import com .google .gson .Gson ;
26+ import com .google .gson .JsonElement ;
2427import com .google .gson .JsonObject ;
2528import io .cdap .e2e .utils .PluginPropertyUtils ;
29+ import io .cdap .plugin .utils .DataFileFormat ;
2630import org .apache .avro .file .DataFileReader ;
2731import org .apache .avro .file .FileReader ;
2832import org .apache .avro .generic .GenericDatumReader ;
2933import org .apache .avro .generic .GenericRecord ;
3034import org .apache .avro .io .DatumReader ;
35+ import org .slf4j .Logger ;
36+ import org .slf4j .LoggerFactory ;
3137
38+ import java .io .BufferedReader ;
3239import java .io .File ;
3340import java .io .IOException ;
3441import java .nio .charset .StandardCharsets ;
4451 */
4552public class GCSValidationHelper {
4653 private static final String avroFilePath = PluginPropertyUtils .pluginProp ("gcsAvroExpectedFilePath" );
47- private static final String projectId = PluginPropertyUtils .pluginProp ("projectId" );
54+ private static final String csvFilePath = PluginPropertyUtils .pluginProp ("gcsCsvExpectedFilePath" );
55+ private static final String jsonFilePath = PluginPropertyUtils .pluginProp ("gcsMultipleFilesRegexFilePath" );
4856 private static final Gson gson = new Gson ();
57+ private static final Logger LOG = LoggerFactory .getLogger (GCSValidationHelper .class );
58+
59+ /**
60+ * Validates data in a Google Cloud Storage (GCS) bucket against expected JSON content.
61+ *
62+ * @param bucketName The name of the GCS bucket to validate.
63+ * @return true if the GCS bucket's content matches the expected JSON data, false otherwise.
64+ */
65+ public static boolean validateGCSSourceToGCSSinkWithJsonFormat (String bucketName ) {
66+ Map <String , JsonObject > expectedTextJsonData = new HashMap <>();
67+ getFileData (jsonFilePath , expectedTextJsonData );
68+ Map <String , JsonObject > actualGcsCsvData = listBucketObjects (bucketName , DataFileFormat .JSON );
69+ boolean isMatched = actualGcsCsvData .equals (expectedTextJsonData );
70+ return isMatched ;
71+ }
72+
73+ /**
74+ * Validates if the data in a (GCS) bucket matches the expected CSV data in JSON format.
75+ *
76+ * @param bucketName The name of the GCS bucket to validate.
77+ * @return True if the GCS CSV data matches the expected data, false otherwise.
78+ * @throws IOException If an IO error occurs during data retrieval.
79+ */
80+ public static boolean validateGCSSourceToGCSSinkWithCSVFormat (String bucketName ) {
81+ Map <String , JsonObject > expectedCSVData = readCsvFileDataAndConvertToJson (csvFilePath );
82+ Map <String , JsonObject > actualGcsCsvData = listBucketObjects (bucketName , DataFileFormat .CSV );
83+
84+ boolean isMatched = actualGcsCsvData .equals (expectedCSVData );
85+
86+ return isMatched ;
87+ }
4988
5089 /**
5190 * Validates if the data in a (GCS) bucket matches the data
@@ -55,15 +94,19 @@ public class GCSValidationHelper {
5594 * @return True if the GCS data matches the Avro data, false otherwise.
5695 * @throws IOException If an IO error occurs during data retrieval.
5796 */
58- public static boolean validateGCSSourceToGCSSink (String bucketName ) throws IOException {
59- Map <String , JsonObject > expectedAvroData = convertAvroToJsonWithKeys ();
60- Map <String , JsonObject > actualGcsData = listBucketObjects (bucketName );
97+ public static boolean validateGCSSourceToGCSSinkWithAVROFormat (String bucketName ) throws IOException {
98+ Map <String , JsonObject > expectedAvroData = convertAvroToJsonWithKeys (avroFilePath );
99+ Map <String , JsonObject > actualGcsData = listBucketObjects (bucketName , DataFileFormat .JSON );
100+
61101 boolean isMatched = actualGcsData .equals (expectedAvroData );
102+
62103 return isMatched ;
63104 }
64105
65- public static Map <String , JsonObject > listBucketObjects (String bucketName ) {
106+
107+ public static Map <String , JsonObject > listBucketObjects (String bucketName , DataFileFormat dataFormat ) {
66108 Map <String , JsonObject > bucketObjectData = new HashMap <>();
109+ String projectId = PluginPropertyUtils .pluginProp ("projectId" );
67110 Storage storage = StorageOptions .newBuilder ().setProjectId (projectId ).build ().getService ();
68111 Page <Blob > blobs = storage .list (bucketName );
69112
@@ -77,49 +120,106 @@ public static Map<String, JsonObject> listBucketObjects(String bucketName) {
77120 if (!bucketObjectNames .isEmpty ()) {
78121 String objectName = bucketObjectNames .get (0 );
79122 if (objectName .contains ("part-r" )) {
80- Map <String , JsonObject > dataMap = fetchObjectData (projectId , bucketName , objectName );
123+ Map <String , JsonObject > dataMap = fetchObjectData (projectId , bucketName , objectName , dataFormat );
81124 bucketObjectData .putAll (dataMap );
82125 }
83126 }
127+
84128 return bucketObjectData ;
85129 }
86130
87131 /**
88- * Fetches the data of a specific object from a GCS bucket
89- * and converts it to a map of JSON objects.
132+ * Fetches and parses data from a specified object in a GCS bucket.
90133 *
91- * @param projectId The ID of the GCP project.
92- * @param bucketName The name of the GCS bucket containing the object.
93- * @param objectName The name of the object to fetch.
94- * @return A map of object data where keys are IDs and values are JSON objects.
134+ * @param projectId The ID of the GCP project where the GCS bucket is located.
135+ * @param bucketName The name of the GCS bucket containing the object to fetch.
136+ * @param objectName The name of the object to fetch from the GCS bucket.
137+ * @param format The format of the object data (JSON or CSV).
138+ * @return A Map containing the parsed data from the object, with string keys and JSON objects as values.
95139 */
96- private static Map <String , JsonObject > fetchObjectData (String projectId , String bucketName , String objectName ) {
140+ public static Map <String , JsonObject > fetchObjectData (String projectId , String bucketName , String objectName ,
141+ DataFileFormat format ) {
97142 Map <String , JsonObject > dataMap = new HashMap <>();
98143 Storage storage = StorageOptions .newBuilder ().setProjectId (projectId ).build ().getService ();
99144 byte [] objectData = storage .readAllBytes (bucketName , objectName );
100145 String objectDataAsString = new String (objectData , StandardCharsets .UTF_8 );
101146
102- // Splitting using the delimiter as a File can have more than one record.
103- String [] lines = objectDataAsString .split ("\n " );
147+ switch (format ) {
148+ case JSON :
149+ parseDataToJson (objectDataAsString , dataMap );
150+ break ;
151+ case CSV :
152+ parseCsvDataToJson (objectDataAsString , dataMap );
153+ break ;
154+ default :
155+ LOG .error ("Unsupported File Format" );
156+ break ;
157+ }
158+ return dataMap ;
159+ }
160+
161+ private static void parseDataToJson (String data , Map <String , JsonObject > dataMap ) {
162+ String [] lines = data .split ("\n " );
104163 for (String line : lines ) {
105164 JsonObject json = gson .fromJson (line , JsonObject .class );
106165 String id = json .get ("id" ).getAsString ();
107166 dataMap .put (id , json );
108167 }
109- return dataMap ;
168+ }
169+
170+ private static void parseCsvDataToJson (String data , Map <String , JsonObject > dataMap ) {
171+ String [] lines = data .split ("\n " );
172+ String [] headers = lines [0 ].split ("," );
173+
174+ for (int lineCount = 1 ; lineCount < lines .length ; lineCount ++) {
175+ String [] values = lines [lineCount ].split ("," );
176+ JsonObject jsonObject = new JsonObject ();
177+ for (int headerCount = 0 ; headerCount < headers .length ; headerCount ++) {
178+ jsonObject .addProperty (headers [headerCount ], values [headerCount ]);
179+ }
180+ String id = values [0 ];
181+ dataMap .put (id , jsonObject );
182+ }
183+ }
184+
185+ /**
186+ * Converts data from a CSV filePath to a map of JSON objects.
187+ *
188+ * @return A map with identifiers (e.g., ID from the first column) as keys and JSON objects as values.
189+ * @throws IOException If there's an error reading the CSV file.
190+ */
191+ public static Map <String , JsonObject > readCsvFileDataAndConvertToJson (String filePath ) {
192+ Map <String , JsonObject > csvDataMap = new HashMap <>();
193+ try (CSVReader csvReader = new CSVReader (new java .io .FileReader (filePath ))) {
194+ // Read the header line to get column names
195+ String [] headers = csvReader .readNext ();
196+
197+ String [] line ;
198+ while ((line = csvReader .readNext ()) != null ) {
199+ JsonObject jsonObject = new JsonObject ();
200+
201+ for (int j = 0 ; j < headers .length ; j ++) {
202+ jsonObject .addProperty (headers [j ], line [j ]);
203+ }
204+ String id = line [0 ];
205+ csvDataMap .put (id , jsonObject );
206+ }
207+ } catch (IOException e ) {
208+ e .printStackTrace ();
209+ }
210+ return csvDataMap ;
110211 }
111212
112213 /**
113- * Converts Avro files to JSON objects with keys and stores them in a map.
214+ * Converts Avro filePath to JSON objects with keys and stores them in a map.
114215 *
115216 * @return A map of keys to JSON objects representing the Avro data.
116217 * @throws IOException If an IO error occurs during Avro to JSON conversion.
117218 */
118- public static Map <String , JsonObject > convertAvroToJsonWithKeys () throws IOException {
119- File avroFile = new File (avroFilePath );
219+ public static Map <String , JsonObject > convertAvroToJsonWithKeys (String filePath ) throws IOException {
220+ File avroFile = new File (filePath );
120221 DatumReader <GenericRecord > datumReader = new GenericDatumReader <>();
121222 Map <String , JsonObject > avroDataMap = new HashMap <>();
122-
123223 try (FileReader <GenericRecord > dataFileReader = DataFileReader .openReader (avroFile , datumReader )) {
124224 int keyCounter = 1 ;
125225 while (dataFileReader .hasNext ()) {
@@ -132,4 +232,31 @@ public static Map<String, JsonObject> convertAvroToJsonWithKeys() throws IOExcep
132232 }
133233 return avroDataMap ;
134234 }
235+
236+ /**
237+ * Reads data from a JSON file, parses each line into JSON objects, and populates a provided
238+ * map with these objects, using the "ID" field as the key.
239+ *
240+ * @param fileName The name of the JSON file to read.
241+ * @param fileMap A map where parsed JSON objects will be stored with their "ID" field as the key.
242+ */
243+ public static void getFileData (String fileName , Map <String , JsonObject > fileMap ) {
244+ try (BufferedReader br = new BufferedReader (new java .io .FileReader (fileName ))) {
245+ String line ;
246+ while ((line = br .readLine ()) != null ) {
247+ JsonObject json = gson .fromJson (line , JsonObject .class );
248+ if (json .has ("id" )) { // Check if the JSON object has the "id" key
249+ JsonElement idElement = json .get ("id" );
250+ if (idElement .isJsonPrimitive ()) {
251+ String idKey = idElement .getAsString ();
252+ fileMap .put (idKey , json );
253+ } else {
254+ Log .error ("ID key not found" );
255+ }
256+ }
257+ }
258+ } catch (IOException e ) {
259+ System .err .println ("Error reading the file: " + e .getMessage ());
260+ }
261+ }
135262}
0 commit comments