dags/copy_deduplicate.py (50 additions, 0 deletions)
@@ -3,6 +3,9 @@

from airflow import models
from airflow.operators.empty import EmptyOperator
from airflow.providers.google.cloud.sensors.bigquery import (
    BigQueryTablePartitionExistenceSensor,
)
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.utils.task_group import TaskGroup
from kubernetes.client import models as k8s
@@ -67,6 +70,36 @@
    default_args=default_args,
    tags=tags,
) as dag:
    # Normally the live tables will contain complete data for the previous day when this DAG runs at 01:00 UTC.
    # However, as a safeguard against possible data ingestion delays, we wait until certain live tables contain
    # data for the current day as an indication that the live data for the previous day is actually complete.

Contributor

I don't think this is true, because there are no ordering guarantees: if there's a backlog, data for the current day can (and probably will) be written before the previous day is complete. The partition not existing is an indication that we shouldn't run copy_dedupe, but its existence doesn't mean it's safe to run. So even with this in place, I wouldn't trust it to prevent copy-deduping incomplete data unless there's some additional delay after the sensor succeeds.

Taking the actual pipeline latency into account (Dataflow watermark age, Pub/Sub oldest unacked message age) would be more robust.
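
As a rough illustration of that suggestion (not part of the PR), a latency-based check could poll Cloud Monitoring for a subscription's oldest unacked message age and only succeed once the backlog is young enough. The project ID, subscription name, and threshold below are all hypothetical:

```python
# Sketch only: gate on Pub/Sub backlog age instead of (or in addition to)
# partition existence. All names and thresholds below are assumptions.
import datetime

from airflow.sensors.python import PythonSensor
from google.cloud import monitoring_v3


def _oldest_unacked_age_ok(project_id, subscription_id, max_age_seconds):
    """Return True once the subscription's oldest unacked message is young enough."""
    client = monitoring_v3.MetricServiceClient()
    now = int(datetime.datetime.now(tz=datetime.timezone.utc).timestamp())
    interval = monitoring_v3.TimeInterval(
        {"start_time": {"seconds": now - 300}, "end_time": {"seconds": now}}
    )
    series = client.list_time_series(
        request={
            "name": f"projects/{project_id}",
            "filter": (
                'metric.type = "pubsub.googleapis.com/subscription/oldest_unacked_message_age"'
                f' AND resource.labels.subscription_id = "{subscription_id}"'
            ),
            "interval": interval,
            "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
        }
    )
    ages = [ts.points[0].value.int64_value for ts in series if ts.points]
    # Fail the poke if the metric is missing entirely or the backlog is too old.
    return bool(ages) and max(ages) < max_age_seconds


wait_for_low_pubsub_backlog = PythonSensor(
    task_id="wait_for_low_pubsub_backlog",
    python_callable=_oldest_unacked_age_ok,
    op_kwargs={
        "project_id": "moz-fx-data-ingestion",  # hypothetical project
        "subscription_id": "structured-decoded",  # hypothetical subscription
        "max_age_seconds": 60,
    },
    poke_interval=300,
    timeout=4 * 60 * 60,
    mode="reschedule",
)
```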

Contributor Author

@whd recently mentioned some old tickets that concur with your suggested Pub/Sub metrics approach:

However, this approach is certainly simpler to implement for now using ready-made sensors, and it might still be worth doing in addition to the Pub/Sub metrics approach, to cover the scenario where an Ingestion Edge or Cloud Logging issue prevents data from being submitted to Pub/Sub in the first place (at least I didn't see any Pub/Sub metric that would tell us when the latest message was received).

@akkomar / @scholtzan / @BenWu we should come to some consensus on how to proceed:

  1. Proceed with this PR as is.
  2. Proceed with using partition sensors, but with an additional delay added after the sensors succeed (in which case we'd either want to reschedule copy_deduplicate to start that much earlier, or move the sensors to a separate DAG that starts that much earlier, so the main copy_deduplicate tasks would normally still start at 01:00; see the sketch below).
  3. Don't bother with this approach at all; just use the Pub/Sub metrics approach.

Votes/thoughts?
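
For reference, a minimal sketch of what the extra delay in option 2 could look like, assuming the sensors were rescheduled to run earlier. The use of TimeDeltaSensor and the two-hour figure are illustrative choices, not from the PR:

```python
# Illustrative only: hold the main tasks until a fixed offset past the data
# interval, on top of the partition sensors succeeding. The two-hour delta
# is an assumption, not a value discussed in the PR.
import datetime

from airflow.sensors.time_delta import TimeDeltaSensor

post_sensor_delay = TimeDeltaSensor(
    task_id="delay_after_live_data_sensors",
    # TimeDeltaSensor waits until data_interval_end + delta, so this enforces
    # a minimum wall-clock start time rather than a delay measured from the
    # moment the sensors succeed.
    delta=datetime.timedelta(hours=2),
    mode="reschedule",  # free the worker slot while waiting
)

# Wiring would then look something like:
# list(live_table_sensors.values()) >> post_sensor_delay >> copy_deduplicate_all_base
```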

Contributor

I think you're right that there isn't an easy way to get the most recent Pub/Sub message, but even if we could, that wouldn't cover every edge case. To avoid overcomplicating things, I'm OK with merging this as-is to cover the extreme case of nothing making it to Pub/Sub near the start of the pipeline, then later adding a sensor based on latency metrics.

I think we just need to be clear about what this doesn't do, so we don't look at it and assume we're safe from ingestion latency. I'm not sure this partition sensor will be that useful as-is, but I don't see any downsides apart from potential minor confusion.

Contributor

I agree with going with option 1. Even though this won't guarantee that there are no issues, it's still an improvement over the current state. We can consider replacing it if we implement a better approach in the future.
Might be worth adding a comment, as Ben mentioned, about what this approach covers and what it does not.

    live_tables_to_wait_for = {
        # Legacy
        "telemetry": ["event_v4", "first_shutdown_v5", "main_v5"],
        # Glean
        "firefox_desktop": ["events_v1", "metrics_v1"],
        "org_mozilla_firefox": ["events_v1", "metrics_v1"],
        "org_mozilla_ios_firefox": ["events_v1", "metrics_v1"],
        # Glean server
        "accounts_backend": ["events_v1"],
    }
    with TaskGroup("wait_for_complete_live_data"):
        live_table_sensors = {
            f"{app_name}_live.{live_table}": BigQueryTablePartitionExistenceSensor(
                task_id=f"wait_for_complete_{app_name}_live_{live_table}",
                project_id="moz-fx-data-shared-prod",
                dataset_id=f"{app_name}_live",
                table_id=live_table,
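                # For this daily DAG, data_interval_end is the current day,
                # so this waits for the current day's partition to appear.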
                partition_id="{{ data_interval_end | ds_nodash }}",
                gcp_conn_id="google_cloud_shared_prod",
                deferrable=True,
                poke_interval=datetime.timedelta(minutes=5),
                timeout=datetime.timedelta(hours=4),

Contributor Author

I picked 4 hours as the timeout somewhat arbitrarily, so if there's some reason you feel the timeout should be different, I'm happy to discuss.

Member

That would time out with notifications at 05:00/09:00/13:00 UTC. LGTM.

Contributor Author

@akkomar actually sensors don't retry, so it would just time out and fail at 05:00 UTC.

I was initially planning on having it be 8 hours (timing out at 09:00 UTC) but then reconsidered, thinking maybe that was too long.

@scholtzan / @BenWu, do either of you want to weigh in on this as well?

            )
            for app_name, live_tables in live_tables_to_wait_for.items()
            for live_table in live_tables
        }

    # This single task is responsible for sequentially running copy queries
    # over all the tables in _live datasets into _stable datasets except those
    # that are specifically used in another DAG.
@@ -94,6 +127,14 @@
        ],
        container_resources=resources,
    )
    (
        live_table_sensors["firefox_desktop_live.events_v1"],
        live_table_sensors["org_mozilla_firefox_live.events_v1"],
        live_table_sensors["org_mozilla_firefox_live.metrics_v1"],
        live_table_sensors["org_mozilla_ios_firefox_live.events_v1"],
        live_table_sensors["org_mozilla_ios_firefox_live.metrics_v1"],
        live_table_sensors["accounts_backend_live.events_v1"],
    ) >> copy_deduplicate_all_base

    # temporary test task with no downstream dependencies
    copy_deduplicate_glean_v2_backfill = bigquery_etl_copy_deduplicate(
@@ -106,6 +147,11 @@
        column_removal_backfill_tables=column_removal_backfill_tables_live,
        container_resources=resources,
    )
    # `column_removal_backfill_tables_live` currently only contains Firefox Android tables.
    (
        live_table_sensors["org_mozilla_firefox_live.events_v1"],
        live_table_sensors["org_mozilla_firefox_live.metrics_v1"],
    ) >> copy_deduplicate_glean_v2_backfill

    copy_deduplicate_sliced = bigquery_etl_copy_deduplicate(
        task_id="copy_deduplicate_sliced",
@@ -119,6 +165,7 @@
        ],
        container_resources=resources,
    )
    live_table_sensors["firefox_desktop_live.metrics_v1"] >> copy_deduplicate_sliced

    # EmptyOperator is used instead of a task group to maintain compatibility with downstream sensors
    copy_deduplicate_all = EmptyOperator(
@@ -196,6 +243,7 @@
"akomarzewski@mozilla.com",
],
)
live_table_sensors["telemetry_live.main_v5"] >> copy_deduplicate_main_ping

    with TaskGroup("main_ping_external") as main_ping_external:
        downstream_dependencies = {
@@ -241,6 +289,7 @@
        parallelism=1,
        owner="akomarzewski@mozilla.com",
    )
    live_table_sensors["telemetry_live.first_shutdown_v5"] >> copy_deduplicate_first_shutdown_ping

    with TaskGroup("first_shutdown_ping_external") as first_shutdown_ping_external:
        downstream_dependencies = {
@@ -270,6 +319,7 @@
        parallelism=1,
        owner="akomarzewski@mozilla.com",
    )
    live_table_sensors["telemetry_live.event_v4"] >> copy_deduplicate_event_ping

    event_events = bigquery_etl_query(
        reattach_on_restart=True,