
feat: Add live table partition sensors in copy_deduplicate DAG #2370

Open
sean-rose wants to merge 1 commit into main from
copy_deduplicate-partition-sensors

Conversation

@sean-rose
Contributor

@sean-rose sean-rose commented Apr 14, 2026

Description

As a safeguard against possible data ingestion delays (example).

Related Tickets & Documents

  • SVCSE-4143: Migrate ingestion-sink to GCPv2
  • DENG-10849: Define SLOs for DE-managed telemetry ingestion services

Comment thread dags/copy_deduplicate.py
gcp_conn_id="google_cloud_shared_prod",
deferrable=True,
poke_interval=datetime.timedelta(minutes=5),
timeout=datetime.timedelta(hours=4),
Contributor Author


I picked 4 hours as the timeout somewhat arbitrarily, so if there's some reason you feel the timeout should be different I'm happy to discuss.

Member


That would time out with notification at 5/9/13 UTC, LGTM.

Contributor Author


@akkomar actually sensors don't retry, so it would just time out and fail at 05:00 UTC.

I was initially planning on having it be 8 hours (timing out at 09:00 UTC) but then reconsidered, thinking maybe that was too long.

@scholtzan / @BenWu, do either of you want to weigh in on this as well?
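For reference, the timeout arithmetic being discussed can be sketched in plain Python. This is a minimal sketch with an illustrative date; it assumes retries restart the sensor immediately and ignores any configured `retry_delay`:

```python
import datetime

# copy_deduplicate's DAG run starts at 01:00 UTC.
DAG_START = datetime.time(1, 0)


def failure_times(timeout: datetime.timedelta, retries: int = 0) -> list:
    """Return the UTC times at which each sensor attempt would time out and fail.

    Attempt n starts when attempt n-1 times out, so attempt n fails at
    start + timeout * n.
    """
    start = datetime.datetime(2026, 4, 14, DAG_START.hour, DAG_START.minute)
    return [(start + timeout * (attempt + 1)).time() for attempt in range(retries + 1)]


# With no retries (the sensor default), a 4-hour timeout fails once, at 05:00 UTC.
print(failure_times(datetime.timedelta(hours=4)))
# If retries=2 were configured, failures would land at 05:00 / 09:00 / 13:00 UTC.
print(failure_times(datetime.timedelta(hours=4), retries=2))
```

This matches the correction above: without retries there is a single failure at 05:00 UTC, and the 5/9/13 UTC sequence only happens if retries are explicitly configured.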

Contributor

@scholtzan scholtzan left a comment


This looks reasonable to me

Comment thread dags/copy_deduplicate.py
) as dag:
# Normally the live tables will contain complete data for the previous day when this DAG runs at 01:00 UTC.
# However, as a safeguard against possible data ingestion delays, we wait until certain live tables contain
# data for the current day as an indication that the live data for the previous day is actually complete.
Contributor


I don't think this is true, because there are no ordering guarantees: if there's a backlog, data for the current day can (and probably will) be written before the previous day is complete. The partition not existing is an indication that we shouldn't run copy_dedupe, but its existence doesn't mean that it's safe to run. So with this in place, I still wouldn't trust that it prevents copy-deduping incomplete data unless there's some additional delay after it succeeds.

Taking into account the actual pipeline latency (dataflow watermark age, pubsub oldest unacked message) would be more robust.
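The ordering concern can be illustrated with a toy example (hypothetical rows, not the real pipeline schema): rows land in arrival order, not event-time order, so a current-day partition can exist while previous-day rows are still queued.

```python
# Toy backlog: one delayed previous-day row still queued, one current-day row
# already written. Field names here are illustrative only.
backlog = [
    {"submission_date": "2026-04-13", "written": False},  # delayed, still in flight
    {"submission_date": "2026-04-14", "written": True},   # already landed
]

# The partition sensor only sees which partitions have any data at all.
partitions_with_data = {r["submission_date"] for r in backlog if r["written"]}

# The current-day partition exists, so the sensor would succeed...
print("2026-04-14" in partitions_with_data)  # True
# ...even though the previous day is still incomplete.
print(all(r["written"] for r in backlog if r["submission_date"] == "2026-04-13"))  # False
```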

Contributor Author


@whd recently mentioned some old tickets that concur with your suggested Pub/Sub metrics approach:

However, this approach is certainly simpler to implement for now using ready-made sensors, and might even still be worth doing in addition to the Pub/Sub metrics approach, to cover the scenario where an Ingestion Edge or Cloud Logging issue prevents data from being submitted to Pub/Sub in the first place (at least I didn't see any Pub/Sub metrics that would tell us when the latest message was received).

@akkomar / @scholtzan / @BenWu we should come to some consensus on how to proceed:

  1. Proceed with this PR as is.
  2. Proceed with using partition sensors but with an additional delay added after the sensors succeed (in which case we'd either want to reschedule copy_deduplicate to start that much earlier, or move the sensors to a separate DAG that starts that much earlier, so the main copy_deduplicate tasks would normally still start at 01:00).
  3. Don't bother proceeding with this approach; just use the Pub/Sub metrics approach.

Votes/thoughts?
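The scheduling arithmetic behind option 2 can be sketched in plain Python. This is a minimal sketch with hypothetical numbers: if a fixed delay is added after the sensors succeed, the sensors (or their separate DAG) would need to start that much earlier for the main copy_deduplicate tasks to still begin at 01:00 UTC.

```python
import datetime

# Current copy_deduplicate start time, 01:00 UTC.
MAIN_START = datetime.time(1, 0)


def sensor_start(post_success_delay: datetime.timedelta) -> datetime.time:
    """UTC time the sensors would need to start so that, after the
    post-success delay, the main tasks still begin at MAIN_START."""
    anchor = datetime.datetime(2026, 4, 14, MAIN_START.hour, MAIN_START.minute)
    return (anchor - post_success_delay).time()


# A hypothetical 2-hour delay pushes the sensor start back to 23:00 UTC
# on the previous day.
print(sensor_start(datetime.timedelta(hours=2)))
```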

Contributor


I think you're right that there isn't an easy way to get the most recent pubsub message, but even if we could, that doesn't cover every edge case. To not overcomplicate things, I'm ok with merging this as-is to cover the extreme case of nothing making it to pubsub near the start of the pipeline, then later adding a sensor based on latency metrics.

I think we just need to be clear on what this doesn't do, so we don't look at it and assume we're safe from ingestion latency. I'm not sure that this partition sensor will be that useful as-is, but I don't see any downsides apart from potential minor confusion.

Contributor


I agree with going with option 1. Even though this won't guarantee that there are no issues, it's still an improvement over the current state. We can consider replacing it if we implement a better approach in the future.
Might be worth adding a comment, as Ben mentioned, about what this approach covers and what it does not.
