Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import os

import boto3
from helpers.data_source_test_helper import DataSourceTestHelper
from soda_core.common.logging_constants import soda_logger

Expand Down Expand Up @@ -55,6 +56,59 @@ def drop_test_schema_if_exists(self) -> str:
super().drop_test_schema_if_exists()
self.data_source_impl._delete_s3_files(self._get_3_schema_dir())

def drop_schema_if_exists(self, schema: str) -> None:
    """Drop all tables/views via the Glue API, then drop the schema.

    Athena's DROP SCHEMA CASCADE fails on Iceberg tables, and DROP TABLE
    via Athena SQL may fail if S3 data isn't accessible. Using the Glue API
    removes catalog entries without requiring S3 access.

    Best-effort cleanup: every step logs a warning and continues on failure
    rather than raising, so one broken table never blocks the rest.

    :param schema: name of the Glue database / Athena schema to drop.
    """
    try:
        glue_client = self._create_glue_client()

        # Delete all tables/views in the database via Glue.
        # NOTE: get_tables is paginated (Glue returns at most one page of
        # tables plus a NextToken), so iterate all pages via the paginator —
        # a single get_tables call would miss tables beyond the first page
        # and leave delete_database to fail on a non-empty database.
        try:
            paginator = glue_client.get_paginator("get_tables")
            for page in paginator.paginate(DatabaseName=schema):
                for table in page.get("TableList", []):
                    table_name = table["Name"]
                    try:
                        glue_client.delete_table(DatabaseName=schema, Name=table_name)
                        logger.info(f"Glue: deleted table/view {schema}.{table_name}")
                    except Exception as e:
                        logger.warning(f"Glue: error deleting table {table_name}: {e}")
        except glue_client.exceptions.EntityNotFoundException:
            # Database itself is absent: nothing to delete, skip the rest.
            logger.info(f"Schema {schema} does not exist in Glue, nothing to drop")
            return
        except Exception as e:
            # Listing failed for another reason; still attempt the drop below.
            logger.warning(f"Glue: error listing tables in {schema}: {e}")

        # Delete the database itself (tables were removed above).
        try:
            glue_client.delete_database(Name=schema)
            logger.info(f"Glue: deleted database {schema}")
        except Exception as e:
            logger.warning(f"Glue: error deleting database {schema}: {e}")

        # Best-effort S3 cleanup for the schema directory; Glue deletion
        # above removes only catalog entries, not the underlying files.
        try:
            schema_location = self.data_source_impl.table_s3_location(f"{ATHENA_CATALOG}.{schema}", lowercase=False)
            self.data_source_impl._delete_s3_files(schema_location)
        except Exception as e:
            logger.warning(f"S3 cleanup for schema {schema} failed (non-fatal): {e}")
    except Exception as e:
        # Outermost guard: cleanup must never fail the calling test.
        logger.warning(f"Error dropping test schema {schema}: {e}")

def _create_glue_client(self):
    """Build a boto3 Glue client from the data source's AWS credentials.

    Credentials are first resolved through role assumption, so the cleanup
    client operates with the same effective permissions as the test session.
    """
    raw_credentials = self.data_source_impl.connection.aws_credentials
    resolved = raw_credentials.resolve_role("soda_sql_test_cleanup")
    client_kwargs = {
        "region_name": resolved.region_name,
        "aws_access_key_id": resolved.access_key_id,
        "aws_secret_access_key": resolved.secret_access_key,
        "aws_session_token": resolved.session_token,
    }
    return boto3.client("glue", **client_kwargs)

def drop_schema_if_exists_sql(self, schema: str) -> str:
dialect = self.data_source_impl.sql_dialect
quoted_catalog = dialect.quote_for_ddl(ATHENA_CATALOG)
Expand Down
3 changes: 2 additions & 1 deletion soda-tests/tests/integration/test_drop_old_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"bigquery",
"fabric",
"synapse",
# "athena", # Skipping for now, there is a bug whereby we can't delete the schemas on Athena currently.
"athena",  # Re-enabled: schema cleanup now goes through the Glue API instead of Athena SQL.
"redshift",
"oracle",
"databricks",
Expand Down Expand Up @@ -106,6 +106,7 @@ def test_drop_old_schemas(data_source_test_helper: DataSourceTestHelper):
try:
data_source_test_helper.drop_schema_if_exists(schema_name)
num_deleted_schemas += 1
# Old approach kept for reference: it built and executed raw DROP SCHEMA SQL
# directly; the helper's drop_schema_if_exists above is used instead.
# data_source_test_helper.data_source_impl.execute_update(
# data_source_test_helper.drop_schema_if_exists_sql(
# schema_name
Expand Down
Loading