Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
.venv
.idea
__pycache__
.vscode
.DS_Store
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ Some examples require extra dependencies. See each sample's directory for specif
* [context_propagation](context_propagation) - Context propagation through workflows/activities via interceptor.
* [custom_converter](custom_converter) - Use a custom payload converter to handle custom types.
* [custom_decorator](custom_decorator) - Custom decorator to auto-heartbeat a long-running activity.
* [custom_metric](custom_metric) - Custom metric to record the workflow type in the activity schedule to start latency.
* [dsl](dsl) - DSL workflow that executes steps defined in a YAML file.
* [encryption](encryption) - Apply end-to-end encryption for all input/output.
* [gevent_async](gevent_async) - Combine gevent and Temporal.
Expand Down
36 changes: 36 additions & 0 deletions custom_metric/README.md
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a test or two? (I know not all samples have tests, something we regret, but adding them helps us catch if they start breaking)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added!

Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Custom Metric
Comment thread
GSmithApps marked this conversation as resolved.

This sample deminstrates two things: (1) how to make a custom metric, and (2) how to use an interceptor.
The custom metric in this sample is an activity schedule-to-start-latency with a workflow type tag.

Please see the top-level [README](../README.md) for prerequisites such as Python, uv, starting the local temporal development server, etc.

1. Run the worker with `uv run custom_metric/worker.py`
2. Request execution of the workflow with `uv run custom_metric/starter.py`
3. Go to `http://127.0.0.1:9090/metrics` in your browser

You'll get something like the following:

```txt
custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="100"} 1
custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="500"} 1
custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="1000"} 1
custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="5000"} 2
custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="10000"} 2
custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="100000"} 2
custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="1000000"} 2
custom_activity_schedule_to_start_latency_bucket{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow",le="+Inf"} 2
custom_activity_schedule_to_start_latency_sum{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow"} 1010
custom_activity_schedule_to_start_latency_count{activity_type="print_and_sleep",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="StartTwoActivitiesWorkflow"} 2
...
temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="100"} 1
temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="500"} 1
temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="1000"} 1
temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="5000"} 2
temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="10000"} 2
temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="100000"} 2
temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="1000000"} 2
temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="+Inf"} 2
temporal_activity_schedule_to_start_latency_sum{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue"} 1010
temporal_activity_schedule_to_start_latency_count{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue"} 2
```
Empty file added custom_metric/__init__.py
Empty file.
9 changes: 9 additions & 0 deletions custom_metric/activity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import time

from temporalio import activity


@activity.defn
def print_and_sleep():
print("In the activity.")
time.sleep(1)
23 changes: 23 additions & 0 deletions custom_metric/starter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import asyncio
import uuid

from temporalio.client import Client

from custom_metric.workflow import StartTwoActivitiesWorkflow


async def main():

client = await Client.connect(
"localhost:7233",
)

await client.start_workflow(
StartTwoActivitiesWorkflow.run,
id="execute-activity-workflow-" + str(uuid.uuid4()),
task_queue="custom-metric-task-queue",
)


if __name__ == "__main__":
asyncio.run(main())
71 changes: 71 additions & 0 deletions custom_metric/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor

from temporalio import activity
from temporalio.client import Client
from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig
from temporalio.worker import (
ActivityInboundInterceptor,
ExecuteActivityInput,
Interceptor,
Worker,
)

from custom_metric.activity import print_and_sleep
from custom_metric.workflow import StartTwoActivitiesWorkflow


class SimpleWorkerInterceptor(Interceptor):
def intercept_activity(
self, next: ActivityInboundInterceptor
) -> ActivityInboundInterceptor:
return CustomScheduleToStartInterceptor(next)


class CustomScheduleToStartInterceptor(ActivityInboundInterceptor):
async def execute_activity(self, input: ExecuteActivityInput):

schedule_to_start = (
activity.info().started_time
- activity.info().current_attempt_scheduled_time
)
# Could do the original schedule time instead of current attempt
# schedule_to_start_second_option = activity.info().started_time - activity.info().scheduled_time

meter = activity.metric_meter()
histogram = meter.create_histogram_timedelta(
"custom_activity_schedule_to_start_latency",
description="Time between activity scheduling and start",
unit="duration",
)
histogram.record(
schedule_to_start, {"workflow_type": activity.info().workflow_type}
)
return await self.next.execute_activity(input)


async def main():
runtime = Runtime(
telemetry=TelemetryConfig(metrics=PrometheusConfig(bind_address="0.0.0.0:9090"))
)
client = await Client.connect(
"localhost:7233",
runtime=runtime,
)
worker = Worker(
client,
task_queue="custom-metric-task-queue",
interceptors=[SimpleWorkerInterceptor()],
workflows=[StartTwoActivitiesWorkflow],
activities=[print_and_sleep],
# only one activity executor with two concurrently scheduled activities
# to force a nontrivial schedule to start times
activity_executor=ThreadPoolExecutor(1),
max_concurrent_activities=1,
)

await worker.run()


if __name__ == "__main__":
asyncio.run(main())
25 changes: 25 additions & 0 deletions custom_metric/workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import asyncio
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
from custom_metric.activity import print_and_sleep


@workflow.defn
class StartTwoActivitiesWorkflow:
@workflow.run
async def run(self):
# Request two concurrent activities with only one task slot so
# we can see nontrivial schedule to start times.
activity1 = workflow.execute_activity(
print_and_sleep,
start_to_close_timeout=timedelta(seconds=5),
)
activity2 = workflow.execute_activity(
print_and_sleep,
start_to_close_timeout=timedelta(seconds=5),
)
await asyncio.gather(activity1, activity2)
return None
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ packages = [
"context_propagation",
"custom_converter",
"custom_decorator",
"custom_metric",
"dsl",
"encryption",
"gevent_async",
Expand Down
Empty file.
32 changes: 32 additions & 0 deletions tests/custom_metric/workflow_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import uuid

from temporalio import activity
from temporalio.client import Client
from temporalio.worker import Worker

from custom_metric.worker import StartTwoActivitiesWorkflow

_TASK_QUEUE = "custom-metric-task-queue"

activity_counter = 0


async def test_custom_metric_workflow(client: Client):
@activity.defn(name="print_message")
async def print_message_mock():
global activity_counter
activity_counter += 1

async with Worker(
Comment thread
GSmithApps marked this conversation as resolved.
Outdated
client,
task_queue=_TASK_QUEUE,
workflows=[StartTwoActivitiesWorkflow],
activities=[print_message_mock],
):
result = await client.execute_workflow(
StartTwoActivitiesWorkflow.run,
id=str(uuid.uuid4()),
task_queue=_TASK_QUEUE,
)
assert result is None
assert activity_counter == 2