|
3 | 3 | import logging |
4 | 4 | import os |
5 | 5 |
|
| 6 | +import boto3 |
6 | 7 | from helpers.data_source_test_helper import DataSourceTestHelper |
7 | 8 | from soda_core.common.logging_constants import soda_logger |
8 | 9 |
|
@@ -55,6 +56,59 @@ def drop_test_schema_if_exists(self) -> str: |
55 | 56 | super().drop_test_schema_if_exists() |
56 | 57 | self.data_source_impl._delete_s3_files(self._get_3_schema_dir()) |
57 | 58 |
|
| 59 | + def drop_schema_if_exists(self, schema: str) -> None: |
| 60 | + """Drop all tables/views via the Glue API, then drop the schema. |
| 61 | +
|
| 62 | + Athena's DROP SCHEMA CASCADE fails on Iceberg tables, and DROP TABLE |
| 63 | + via Athena SQL may fail if S3 data isn't accessible. Using the Glue API |
| 64 | + removes catalog entries without requiring S3 access. |
| 65 | + """ |
| 66 | + try: |
| 67 | + glue_client = self._create_glue_client() |
| 68 | + |
| 69 | + # Delete all tables/views in the database via Glue |
| 70 | + try: |
| 71 | + response = glue_client.get_tables(DatabaseName=schema) |
| 72 | + for table in response.get("TableList", []): |
| 73 | + table_name = table["Name"] |
| 74 | + try: |
| 75 | + glue_client.delete_table(DatabaseName=schema, Name=table_name) |
| 76 | + logger.info(f"Glue: deleted table/view {schema}.{table_name}") |
| 77 | + except Exception as e: |
| 78 | + logger.warning(f"Glue: error deleting table {table_name}: {e}") |
| 79 | + except glue_client.exceptions.EntityNotFoundException: |
| 80 | + logger.info(f"Schema {schema} does not exist in Glue, nothing to drop") |
| 81 | + return |
| 82 | + except Exception as e: |
| 83 | + logger.warning(f"Glue: error listing tables in {schema}: {e}") |
| 84 | + |
| 85 | + # Delete the database itself |
| 86 | + try: |
| 87 | + glue_client.delete_database(Name=schema) |
| 88 | + logger.info(f"Glue: deleted database {schema}") |
| 89 | + except Exception as e: |
| 90 | + logger.warning(f"Glue: error deleting database {schema}: {e}") |
| 91 | + |
| 92 | + # Best-effort S3 cleanup for the schema directory |
| 93 | + try: |
| 94 | + schema_location = self.data_source_impl.table_s3_location(f"{ATHENA_CATALOG}.{schema}", lowercase=False) |
| 95 | + self.data_source_impl._delete_s3_files(schema_location) |
| 96 | + except Exception as e: |
| 97 | + logger.warning(f"S3 cleanup for schema {schema} failed (non-fatal): {e}") |
| 98 | + except Exception as e: |
| 99 | + logger.warning(f"Error dropping test schema {schema}: {e}") |
| 100 | + |
| 101 | + def _create_glue_client(self): |
| 102 | + aws_credentials = self.data_source_impl.connection.aws_credentials |
| 103 | + aws_credentials = aws_credentials.resolve_role("soda_sql_test_cleanup") |
| 104 | + return boto3.client( |
| 105 | + "glue", |
| 106 | + region_name=aws_credentials.region_name, |
| 107 | + aws_access_key_id=aws_credentials.access_key_id, |
| 108 | + aws_secret_access_key=aws_credentials.secret_access_key, |
| 109 | + aws_session_token=aws_credentials.session_token, |
| 110 | + ) |
| 111 | + |
58 | 112 | def drop_schema_if_exists_sql(self, schema: str) -> str: |
59 | 113 | dialect = self.data_source_impl.sql_dialect |
60 | 114 | quoted_catalog = dialect.quote_for_ddl(ATHENA_CATALOG) |
|
0 commit comments