-
Notifications
You must be signed in to change notification settings - Fork 108
added custom metric sample #177
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
755fe64
e606ed5
70c1c73
95c09a6
1621855
2ab0cf1
3379774
84fdfee
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,5 @@ | ||
| .venv | ||
| .idea | ||
| __pycache__ | ||
| .vscode | ||
| .DS_Store |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,33 @@ | ||
| # Custom Metric | ||
|
GSmithApps marked this conversation as resolved.
|
||
|
|
||
| 1. Run the server with `temporal server start-dev` | ||
|
GSmithApps marked this conversation as resolved.
Outdated
|
||
| 2. Run the client with `uv run custom_metric/client.py` | ||
| 3. Run the workflow worker with `uv run custom_metric/workflow_worker.py` | ||
| 4. Run the activity worker with `uv run custom_metric/activity_worker.py` | ||
|
GSmithApps marked this conversation as resolved.
Outdated
|
||
| 5. Go to `http://127.0.0.1:9090/metrics` in your browser | ||
|
|
||
| You'll get something like the following: | ||
|
|
||
| ```txt | ||
| custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="100"} 0 | ||
| custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="500"} 0 | ||
| custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="1000"} 0 | ||
| custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="5000"} 0 | ||
| custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="10000"} 1 | ||
| custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="100000"} 1 | ||
| custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="1000000"} 1 | ||
| custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="+Inf"} 1 | ||
| custom_activity_schedule_to_start_latency_sum{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow"} 6336 | ||
| custom_activity_schedule_to_start_latency_count{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow"} 1 | ||
| ... | ||
| temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="100"} 0 | ||
| temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="500"} 0 | ||
| temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="1000"} 0 | ||
| temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="5000"} 0 | ||
| temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="10000"} 1 | ||
| temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="100000"} 1 | ||
| temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="1000000"} 1 | ||
| temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="+Inf"} 1 | ||
| temporal_activity_schedule_to_start_latency_sum{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue"} 6336 | ||
| temporal_activity_schedule_to_start_latency_count{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue"} 1 | ||
| ``` | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,71 @@ | ||
| import asyncio | ||
| from concurrent.futures import ThreadPoolExecutor | ||
|
|
||
| from temporalio import activity | ||
| from temporalio.client import Client | ||
| from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig | ||
| from temporalio.worker import ( | ||
| ActivityInboundInterceptor, | ||
| ExecuteActivityInput, | ||
| Interceptor, | ||
| Worker, | ||
| ) | ||
|
|
||
|
|
||
| class SimpleWorkerInterceptor(Interceptor): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This sample is not just demonstrating custom metric, which can be done inside the activity, it's demonstrating interceptors. May be a little confusing to someone just wanting to understand the custom metric part.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the suggestion! I implemented it because what you mentioned off-PR, I thought you suggested to do this with an interceptor
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For this specific custom metric that makes sense, but people can create custom metrics without interceptors. I think it's fine to stay, may just need to clarify in README via that one/two-sentence explainer that the purpose of this sample is to demonstrate a custom schedule to start metric via interceptor, as opposed to just general custom metric.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added a little blurb. I hope it's along the lines you're thinking |
||
|
|
||
| def intercept_activity( | ||
| self, next: ActivityInboundInterceptor | ||
| ) -> ActivityInboundInterceptor: | ||
| return CustomScheduleToStartInterceptor(next) | ||
|
|
||
|
|
||
| class CustomScheduleToStartInterceptor(ActivityInboundInterceptor): | ||
|
|
||
| async def execute_activity(self, input: ExecuteActivityInput): | ||
|
|
||
| schedule_to_start = ( | ||
| activity.info().started_time | ||
| - activity.info().current_attempt_scheduled_time | ||
| ) | ||
| # Could do the original schedule time instead of current attempt | ||
| # schedule_to_start_second_option = activity.info().started_time - activity.info().scheduled_time | ||
|
|
||
| meter = activity.metric_meter() | ||
| histogram = meter.create_histogram_timedelta( | ||
| "custom_activity_schedule_to_start_latency", | ||
| description="Time between activity scheduling and start", | ||
| unit="duration", | ||
| ) | ||
| histogram.record( | ||
| schedule_to_start, {"workflow_type": activity.info().workflow_type} | ||
| ) | ||
| return await self.next.execute_activity(input) | ||
|
GSmithApps marked this conversation as resolved.
|
||
|
|
||
|
|
||
| @activity.defn | ||
| def print_message(): | ||
| print("in the activity") | ||
|
|
||
|
|
||
| async def main(): | ||
| runtime = Runtime( | ||
| telemetry=TelemetryConfig(metrics=PrometheusConfig(bind_address="0.0.0.0:9090")) | ||
| ) | ||
| client = await Client.connect( | ||
| "localhost:7233", | ||
| runtime=runtime, | ||
| ) | ||
| worker = Worker( | ||
| client, | ||
| task_queue="custom-metric-task-queue", | ||
| activities=[print_message], | ||
| activity_executor=ThreadPoolExecutor(5), | ||
| interceptors=[SimpleWorkerInterceptor()], | ||
| ) | ||
|
|
||
| await worker.run() | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| asyncio.run(main()) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| import asyncio | ||
| import uuid | ||
|
|
||
| from temporalio.client import Client | ||
|
|
||
| from custom_metric.workflow_worker import ExecuteActivityWorkflow | ||
|
|
||
|
|
||
| async def main(): | ||
|
|
||
| client = await Client.connect( | ||
| "localhost:7233", | ||
| ) | ||
|
|
||
| await client.start_workflow( | ||
| ExecuteActivityWorkflow.run, | ||
| id="execute-activity-workflow-" + str(uuid.uuid4()), | ||
| task_queue="custom-metric-task-queue", | ||
| ) | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| asyncio.run(main()) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| import asyncio | ||
| from datetime import timedelta | ||
|
|
||
| from temporalio import workflow | ||
| from temporalio.client import Client | ||
| from temporalio.worker import Worker | ||
|
|
||
| with workflow.unsafe.imports_passed_through(): | ||
| from custom_metric.activity_worker import print_message | ||
|
|
||
|
|
||
| @workflow.defn | ||
| class ExecuteActivityWorkflow: | ||
|
|
||
| @workflow.run | ||
| async def run(self): | ||
| await workflow.execute_activity( | ||
| print_message, | ||
| start_to_close_timeout=timedelta(seconds=5), | ||
| ) | ||
| return None | ||
|
|
||
|
|
||
| async def main(): | ||
|
|
||
| client = await Client.connect( | ||
| "localhost:7233", | ||
| ) | ||
| worker = Worker( | ||
| client, | ||
| task_queue="custom-metric-task-queue", | ||
| workflows=[ExecuteActivityWorkflow], | ||
| ) | ||
|
|
||
| await worker.run() | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| asyncio.run(main()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a test or two? (I know not all samples have tests, something we regret, but adding them helps us catch if they start breaking)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added!