Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"python.testing.pytestArgs": [
"tests"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
}
Comment thread
tmikula-dev marked this conversation as resolved.
Outdated
3 changes: 2 additions & 1 deletion conf/access.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@
}
},
"public.cps.za.dlchange": ["FooUser", "BarUser"],
"public.cps.za.test": ["TestUser"]
"public.cps.za.test": ["TestUser"],
"public.cps.za.status_change": ["TestUser"]
}
1 change: 1 addition & 0 deletions conf/config.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"access_config": "s3://<redacted>/access.json",
"topic_keys_config": "conf/topic_keys.json",
"token_provider_url": "https://<redacted>",
"token_public_keys_url": "https://<redacted>",
"kafka_bootstrap_server": "localhost:9092",
Expand Down
3 changes: 3 additions & 0 deletions conf/topic_keys.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"public.cps.za.status_change": "job_id"
}
289 changes: 289 additions & 0 deletions conf/topic_schemas/status_change.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,289 @@
{
"type": "object",
"properties": {
"event_type": {
"type": "string",
"enum": [
"JobCreatedEvent",
"JobCreatedAndStartedEvent",
"JobStartedEvent",
"JobUpdatedEvent",
"JobFinishedEvent"
],
"description": "Lifecycle event type for job status changes."
},
"event_id": {
"type": "string",
"format": "uuid",
"description": "Unique identifier for the event (UUID)"
},
"job_ref": {
"type": [
"string",
"null"
],
"description": "Identifier of the job in its respective system (e.g. Spark Application Id, Glue Job Id, EMR Step Id, etc)."
},
"tenant_id": {
"type": [
"string",
"null"
],
"description": "Application ID or ServiceNow identifier"
},
Comment thread
coderabbitai[bot] marked this conversation as resolved.
"source_app": {
"type": "string",
"description": "Standardized source application name (aqueduct, unify, lum, etc)"
},
"source_app_version": {
"type": "string",
"description": "Source application version (SemVer preferred)"
},
"environment": {
"type": "string",
"description": "Environment (dev, uat, pre-prod, prod, test or others)"
},
"timestamp_event": {
"type": "integer",

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The runs schema uses "type": "number" for timestamps. Here it is "integer". While epoch milliseconds are integers, the inconsistency may confuse producers.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, number can also be floating point, so integer is the correct choice here. However, it's your call: would you rather be consistent but wrong, or keep integer here and eventually fix it in the runs schema as well?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this one more time and I prefer correctness over consistency. Let's fix the other schemas later: #163 - this is hopefully a quick and painless fix, as the number -> integer change should be backward compatible.

"minimum": 0,
"description": "Timestamp of the event in epoch milliseconds"
},
Comment thread
kevinwallimann marked this conversation as resolved.
"country": {
"type": [
"string",
"null"
],
"description": "The country the data is related to, e.g. za, ke, on-mu, nbc-tz, etc."
},
"job_id": {
"type": "string",
"format": "uuid",
"description": "Primary job identifier (UUID)."
},
"parent_job_id": {
"type": [
"string",
"null"
],
"format": "uuid",
"description": "Optional parent job identifier (UUID), to represent nested job hierarchies."
},
"initial_job_id": {
"type": [
"string",
"null"
],
"format": "uuid",
"description": "Optional initial job identifier (UUID), to represent retried or replayed jobs."
},
"job_group_id": {
"type": [
"string",
"null"
],
"format": "uuid",
"description": "Job group identifier (UUID), may or may not reference a job id."
},
"job_name": {
"type": "string",
"description": "Human-readable job name."
},
"attempt_number": {
"type": [
"integer",
"null"
],
"minimum": 1,
"description": "Attempt number for this job."
},
"platform": {
"type": [
"string",
"null"
],
"description": "Platform, e.g. aws.emr, aws.glue, aws.lambda."
},
"platform_metadata": {
"type": [
"object",
"null"
],
"description": "Platform-specific metadata (e.g. {\"cluster_id\": \"j-...\"})."
},
"input_arguments": {
"type": [
"object",
"null"
],
"description": "Arguments passed to the job."
},
"definition_id": {
"type": "string",
"description": "Definition (Pipeline, Domain, Process) identifier."
},
"definition_version": {
"type": [
"string",
"null"
],
"description": "Optional definition version."
},
"status_type": {
"type": "string",
"enum": [
"WAITING",
"RUNNING",
"SUCCEEDED",
"FAILED",
"KILLED"
],
"description": "High-level status type for the current lifecycle event."
},
"status_subtype": {
"type": [
"string",
"null"
],
"description": "Optional status subtype, e.g. NO_DATA or error code."
},
"status_detail": {
"type": [
"string",
"null"
],
"description": "Optional human-readable status detail, e.g. short error message."
},
"additional_context": {
"type": [
"object",
"null"
],
"description": "Additional context payload."
}
},
"required": [
"event_type",
"event_id",
"job_id",
"status_type"
],
"allOf": [
{
  "if": {
    "properties": {
      "event_type": {
        "const": "JobCreatedEvent"
      }
    },
    "required": [
      "event_type"
    ]
  },
  "then": {
    "required": [
      "job_name",
      "source_app",
      "source_app_version",
      "timestamp_event",
      "environment",
      "definition_id",
      "platform",
      "input_arguments"
    ],
    "properties": {
      "status_type": {
        "enum": [
          "WAITING"
        ]
      }
    }
  }
},
{
  "if": {
    "properties": {
      "event_type": {
        "const": "JobCreatedAndStartedEvent"
      }
    },
    "required": [
      "event_type"
    ]
  },
  "then": {
    "required": [
      "job_name",
      "source_app",
      "source_app_version",
      "timestamp_event",
      "environment",
      "definition_id",
      "platform",
      "input_arguments"
    ],
    "properties": {
      "status_type": {
        "enum": [
          "RUNNING"
        ]
      }
    }
  }
},
{
  "if": {
    "properties": {
      "event_type": {
        "const": "JobStartedEvent"
      }
    },
    "required": [
      "event_type"
    ]
  },
  "then": {
    "properties": {
      "status_type": {
        "enum": [
          "RUNNING"
        ]
      }
    }
  }
},
{
  "if": {
    "properties": {
      "event_type": {
        "const": "JobUpdatedEvent"
      }
    },
    "required": [
      "event_type"
    ]
  },
  "then": {
    "properties": {
      "status_type": {
        "enum": [
          "WAITING",
          "RUNNING"
        ]
      }
    }
  }
},
{
  "if": {
    "properties": {
      "event_type": {
        "const": "JobFinishedEvent"
      }
    },
    "required": [
      "event_type"
    ]
  },
  "then": {
    "properties": {
      "status_type": {
        "enum": [
          "SUCCEEDED",
          "FAILED",
          "KILLED"
        ]
      }
    }
  }
}
]
}
7 changes: 6 additions & 1 deletion src/event_gate_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,12 @@

# Initialize EventGate handlers
handler_token = HandlerToken(config).with_public_keys_queried()
handler_topic = HandlerTopic(config, aws_s3, handler_token, writers).with_load_access_config().with_load_topic_schemas()
handler_topic = (
HandlerTopic(config, aws_s3, handler_token, writers)
.with_load_access_config()
.with_load_topic_keys_config()
.with_load_topic_schemas()
)
handler_health = HandlerHealth(writers)
handler_api = HandlerApi().with_api_definition_loaded()

Expand Down
Loading
Loading