diff --git a/dags/copy_deduplicate.py b/dags/copy_deduplicate.py index 005499dbc..7d33a25b6 100644 --- a/dags/copy_deduplicate.py +++ b/dags/copy_deduplicate.py @@ -3,6 +3,9 @@ from airflow import models from airflow.operators.empty import EmptyOperator +from airflow.providers.google.cloud.sensors.bigquery import ( + BigQueryTablePartitionExistenceSensor, +) from airflow.sensors.external_task import ExternalTaskMarker from airflow.utils.task_group import TaskGroup from kubernetes.client import models as k8s @@ -67,6 +70,36 @@ default_args=default_args, tags=tags, ) as dag: + # Normally the live tables will contain complete data for the previous day when this DAG runs at 01:00 UTC. + # However, as a safeguard against possible data ingestion delays, we wait until certain live tables contain + # data for the current day as an indication that the live data for the previous day is actually complete. + live_tables_to_wait_for = { + # Legacy + "telemetry": ["event_v4", "first_shutdown_v5", "main_v5"], + # Glean + "firefox_desktop": ["events_v1", "metrics_v1"], + "org_mozilla_firefox": ["events_v1", "metrics_v1"], + "org_mozilla_ios_firefox": ["events_v1", "metrics_v1"], + # Glean server + "accounts_backend": ["events_v1"], + } + with TaskGroup("wait_for_complete_live_data"): + live_table_sensors = { + f"{app_name}_live.{live_table}": BigQueryTablePartitionExistenceSensor( + task_id=f"wait_for_complete_{app_name}_live_{live_table}", + project_id="moz-fx-data-shared-prod", + dataset_id=f"{app_name}_live", + table_id=live_table, + partition_id="{{ data_interval_end | ds_nodash }}", + gcp_conn_id="google_cloud_shared_prod", + deferrable=True, + poke_interval=datetime.timedelta(minutes=5), + timeout=datetime.timedelta(hours=4), + ) + for app_name, live_tables in live_tables_to_wait_for.items() + for live_table in live_tables + } + # This single task is responsible for sequentially running copy queries # over all the tables in _live datasets into _stable datasets except those # that are specifically used in another DAG. @@ -94,6 +127,14 @@ ], container_resources=resources, ) + ( + live_table_sensors["firefox_desktop_live.events_v1"], + live_table_sensors["org_mozilla_firefox_live.events_v1"], + live_table_sensors["org_mozilla_firefox_live.metrics_v1"], + live_table_sensors["org_mozilla_ios_firefox_live.events_v1"], + live_table_sensors["org_mozilla_ios_firefox_live.metrics_v1"], + live_table_sensors["accounts_backend_live.events_v1"], + ) >> copy_deduplicate_all_base # temporary test task with no downstream dependencies copy_deduplicate_glean_v2_backfill = bigquery_etl_copy_deduplicate( @@ -106,6 +147,11 @@ column_removal_backfill_tables=column_removal_backfill_tables_live, container_resources=resources, ) + # `column_removal_backfill_tables_live` currently only contains Firefox Android tables. + ( + live_table_sensors["org_mozilla_firefox_live.events_v1"], + live_table_sensors["org_mozilla_firefox_live.metrics_v1"], + ) >> copy_deduplicate_glean_v2_backfill copy_deduplicate_sliced = bigquery_etl_copy_deduplicate( task_id="copy_deduplicate_sliced", @@ -119,6 +165,7 @@ ], container_resources=resources, ) + live_table_sensors["firefox_desktop_live.metrics_v1"] >> copy_deduplicate_sliced # EmptyOperator is used instead of a task group to maintain compatibility with downstream sensors copy_deduplicate_all = EmptyOperator( @@ -196,6 +243,7 @@ "akomarzewski@mozilla.com", ], ) + live_table_sensors["telemetry_live.main_v5"] >> copy_deduplicate_main_ping with TaskGroup("main_ping_external") as main_ping_external: downstream_dependencies = { @@ -241,6 +289,7 @@ parallelism=1, owner="akomarzewski@mozilla.com", ) + live_table_sensors["telemetry_live.first_shutdown_v5"] >> copy_deduplicate_first_shutdown_ping with TaskGroup("first_shutdown_ping_external") as first_shutdown_ping_external: downstream_dependencies = { @@ -270,6 +319,7 @@ parallelism=1, owner="akomarzewski@mozilla.com", ) + live_table_sensors["telemetry_live.event_v4"] >> copy_deduplicate_event_ping event_events = bigquery_etl_query( reattach_on_restart=True,