Skip to content

feat: add Celery integration and improve PostHog client fork safety#464

Closed
parinporecha wants to merge 5 commits intoPostHog:masterfrom
parinporecha:feat/celery_integration
Closed

feat: add Celery integration and improve PostHog client fork safety#464
parinporecha wants to merge 5 commits intoPostHog:masterfrom
parinporecha:feat/celery_integration

Conversation

@parinporecha
Copy link
Copy Markdown
Contributor

@parinporecha parinporecha commented Mar 13, 2026

Summary

This PR:

  • Adds a new PosthogCeleryIntegration to automatically capture Celery task lifecycle events and exceptions.
  • Propagates PostHog context (distinct_id, session_id, tags) from the task producer to the worker so Celery tasks can be correlated with the originating user/session.
  • Makes Client safer across process forks by reinitializing fork-unsafe client state in child processes.
from posthog.integrations.celery import PosthogCeleryIntegration
integration = PosthogCeleryIntegration(
    capture_exceptions=True,
    capture_task_lifecycle_events=True,
    propagate_context=True
)
integration.instrument()

Context

I saw users asking for advice on how to use PostHog with Celery for error tracking in community questions and realized that there's currently no first-class way to instrument Celery workloads with PostHog.

That leaves a few gaps:

  • background task execution is hard to observe without manual instrumentation.
  • worker-side events are difficult to correlate back to the originating user or request.

This PR addresses those gaps by adding a Celery integration that helps users observe task execution end-to-end out of the box.

The integration takes inspiration from OpenTelemetry's Celery instrumentor and PostHog context propagation is achieved through task headers mirroring Sentry and DataDog's Celery integrations.

While testing this, I found a separate SDK issue: when a Client configured in async mode is inherited across a process fork, the child process inherits a client whose consumer threads no longer exist. In practice, that means worker-side events don't get delivered. This would also be a problem when using the SDK in some Django deployments.

So this PR also adds fork handling to Client by reinitializing its queue, consumers, and other state in the child process via os.register_at_fork.

Changes

New: Celery Integration (posthog/integrations/celery.py)

  • Lifecycle Events: Hooks into Celery signals (task_prerun, task_success, task_failure, etc.) to capture events like celery task started, celery task success etc. Check the docstring in the integration module code for complete list of supported events.
    • Lifecycle events include Celery-specific properties such as task ID, task name, queue, retry count, duration, Celery version etc. Check the docstring for complete set of event properties.
  • Context Propagation:
    • _on_before_task_publish: Injects current PostHog context (distinct_id, session_id, tags) into task headers.
    • _on_task_prerun: Extracts headers in the worker and restores the PostHog context for the duration of the task. This context is exited upon task completion.
    • Any custom events captured inside a task inherit the same propagated PostHog context and Celery task tags.
  • Exception Capture: Automatically captures exceptions from failed tasks.

Refactored: Client Fork Safety posthog/client.py

  • Added _reinit_after_fork method to reset the internal queue and spin up new consumers in a child process.
  • Uses os.register_at_fork (on supported platforms) to automatically call this method, so that the SDK does not drop captured events when used in child processes.

Examples examples/celery_integration.py

  • Added a complete example showing how to configure the integration on both the producer and worker sides and all features in practice.

Tests

  • Added posthog/test/integrations/test_celery_integration.py covering:
    • Signal handlers for all task states.
    • Context propagation (header injection and extraction).
    • Task filtering logic.
    • Exception capture.
  • Added posthog/test/test_client_fork.py covering:
    • Unit tests for client's fork safety
    • End to end tests forking Client and verifying behaviour
  • Manually tested the example against Celery 5.2.1 (2021), 5.3.1, 5.4.0, and 5.6.2 (2026).

Screenshots (created through example script)

  • Celery task lifecycle events and captured Exception -
    image

  • Celery task success event emitted from worker carrying correct distinct ID, session ID set in parent and context tags -
    image

  • Captured exception -
    image

@parinporecha parinporecha force-pushed the feat/celery_integration branch 7 times, most recently from e8eff44 to 896dfa8 Compare March 14, 2026 23:11
@parinporecha parinporecha changed the title feat: add Celery integration & add fork safety to PostHog client feat: add Celery integration and improve PostHog client fork safety Mar 15, 2026
@parinporecha parinporecha marked this pull request as ready for review March 15, 2026 10:42
@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented Mar 15, 2026

Prompt To Fix All With AI
This is a comment left during a code review.
Path: posthog/integrations/celery.py
Line: 225-248

Comment:
**Context leak if `_on_task_prerun` raises after entering context**

If any code between line 233 (`context_manager.__enter__()`) and the end of the try block throws an exception, the `except` on line 247 swallows it but the context is never exited — it remains pushed onto the `contextvars` stack for the remainder of the thread's life. This corrupts context state for subsequent tasks in the same worker thread.

For example, if `self._apply_propagated_identity(request)` (line 239) or `self._capture_event(...)` (line 246) raises, the context will leak. Similarly, if `request` is `None`, the context manager is entered but never stored on `request._posthog_ctx`, so `_handle_task_end`'s finally block can never exit it.

Consider cleaning up the context in the `except` block:

```python
    def _on_task_prerun(self, *args, **kwargs):
        context_manager = None
        try:
            task_id = kwargs.get("task_id")
            if not task_id:
                return

            sender = kwargs.get("sender")
            request = getattr(sender, "request", None)
            context_tags = self._extract_propagated_tags(request)
            task_properties = self._build_task_properties(
                sender=sender,
                task_id=task_id,
                state="started",
            )
            task_name = task_properties.get("celery_task_name")

            context_manager = contexts.new_context(
                fresh=True,
                capture_exceptions=False,
                client=self.client,
            )
            context_manager.__enter__()

            if request is not None:
                request._posthog_ctx = context_manager
                request._posthog_start = time.monotonic()

            self._apply_propagated_identity(request)

            merged_tags = {**task_properties, **context_tags}
            for key, value in merged_tags.items():
                contexts.tag(key, value)

            if self.capture_task_lifecycle_events and self._should_track(task_name, task_properties):
                self._capture_event("celery task started", properties=task_properties)
        except Exception:
            logger.exception("Failed to process Celery task_prerun")
            if context_manager is not None:
                try:
                    context_manager.__exit__(None, None, None)
                except Exception:
                    pass
```

How can I resolve this? If you propose a fix, please make it concise.

---

This is a comment left during a code review.
Path: posthog/client.py
Line: 336-337

Comment:
**`register_at_fork` prevents Client garbage collection**

`os.register_at_fork` callbacks cannot be unregistered. The bound method `self._reinit_after_fork` holds a strong reference to `self`, which means:

1. Every `Client` instance registered here will never be garbage collected for the lifetime of the process.
2. If multiple `Client` instances are created (e.g., in tests, or per-request patterns), each fork will run *all* accumulated callbacks, including for defunct/shutdown clients.

A common mitigation is to use a `weakref` callback so the client can still be collected:

```python
import weakref

# in __init__, replace the current register_at_fork block with:
if hasattr(os, "register_at_fork"):
    weak_self = weakref.ref(self)
    def _reinit_child():
        client = weak_self()
        if client is not None:
            client._reinit_after_fork()
    os.register_at_fork(after_in_child=_reinit_child)
```

This way, if the Client is no longer referenced, the callback becomes a no-op rather than keeping the entire Client alive.

How can I resolve this? If you propose a fix, please make it concise.

Last reviewed commit: 84b314c

Comment thread posthog/integrations/celery.py Outdated
Comment thread posthog/client.py Outdated
Copy link
Copy Markdown
Contributor

@dustinbyrne dustinbyrne left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @parinporecha, love the direction here. I left a few questions/considerations below, let me know what you think!

Comment thread posthog/client.py
Comment on lines +1100 to +1111
def _reinit_after_fork(self):
"""Reinitialize queue and consumer threads in a forked child process.

Registered via os.register_at_fork(after_in_child=...) so it runs
exactly once in each child, before any user code, covering all code
paths (capture, flush, join, etc.).

Python threads do not survive fork() and queue.Queue internal locks
may be in an inconsistent state, so both are replaced.
Inherited queue items are intentionally discarded as they'll be
handled by the parent process's consumers.
"""
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One additional consideration here is that urllib3 sessions contain sockets that will be shared after fork. They're not mutex controlled, so they should should also be recreated.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah, turned out that Poller and RedisFlagCache also needed to be recreated so I've updated the client to handle them all.

Not sure what to do about flag_definition_cache_provider. It could be problematic due to locks but it's not in our control. Do we just clarify in the docs that we recommend recreating the client in that case?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little torn on this, because it feels a bit like the Celery integration and flag_definition_cache_provider are at odds with each other, unless we also expand the FlagDefinitionCacheProvider to handle _reinit_after_fork. In practice, Celery is a great use case for caching flag definitions.

Perhaps we could do something like:

integration = PosthogCeleryIntegration(
    client_factory=create_posthog_client,
)

where:

def create_posthog_client():
    cache = MyCacheProvider(...)
    return Posthog(
        "...",
        personal_api_key="...",
        flag_definition_cache_provider=cache,
    )

Then on worker_process_init, the integration could call the factory and store the client for that process and we avoid the whole fork.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm more inclined towards updating the docs/example to explicitly mention that users with a custom flag_definition_cache_provider should reinit their client in the workers and reinstrument the integration with that new client instance. That's more in line with what the other providers (Sentry, DD, OTel) recommend.

We can replacing client with factory but that would be a deviation from that standard practice which I guess users would be more likely to be used to.

Comment thread examples/celery_integration.py Outdated
Comment on lines +69 to +71
# Worker process setup.
# Celery's default prefork pool runs tasks in child processes, so initialize
# PostHog per child using worker_process_init.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a non-issue given _reinit_after_fork exists

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A part of me thinks it's better to suggest this to keep client' scope and ownership contained in the child, but don't have a strong opinion about it as it does make it easier for the user if we handle it out of the box.

I have commented out that code so now it's more of a suggestion of how one could use that signal in a multi-host setup.

Comment thread examples/celery_integration.py Outdated
Comment on lines +74 to +81
@worker_process_init.connect
def on_worker_process_init(**kwargs) -> None:
worker_posthog_client = create_client()
worker_integration = create_integration(worker_posthog_client)
worker_integration.instrument()

app._posthog_client = worker_posthog_client
app._posthog_integration = worker_integration
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're also calling instrument() at the module level. Is calling again from the worker necessary?

Regarding the client, consider using the singleton posthog instance for simplicity. Type checking won't be happy about us writing directly to the Celery app.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah makes sense. I've updated it to use the singleton instance.

Comment thread examples/celery_integration.py Outdated
Comment on lines +84 to +92
@worker_process_shutdown.connect
def on_worker_process_shutdown(**kwargs) -> None:
worker_integration = getattr(app, "_posthog_integration", None)
if worker_integration:
worker_integration.uninstrument()

worker_posthog_client = getattr(app, "_posthog_client", None)
if worker_posthog_client:
worker_posthog_client.shutdown()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this logic be part of instrument()? Ensuring we shutdown properly seems like a good idea, otherwise we could be losing event data.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good suggestion, and I had missed this area completely.

I've now added a shutdown() functionality that disconnects signals and flushes the client. In instrument() it's also connected to worker_process_shutdown and atexit signals, so if user forgets it'll still be possible to salvage events.

Comment thread .sampo/changesets/ardent-knight-vainamoinen.md Outdated
@parinporecha parinporecha force-pushed the feat/celery_integration branch 7 times, most recently from 0081b75 to 9752bc7 Compare March 21, 2026 14:37
@parinporecha
Copy link
Copy Markdown
Contributor Author

@dustinbyrne I have a couple of questions -

  • Is the naming of events fine? celery task started, celery task success etc.
  • Are there any log.info statements needed in the integration? Not sure what PostHog's philosophy is about log volumes, verbosity etc.

parinporecha and others added 5 commits March 23, 2026 16:59
Co-authored-by: Dustin Byrne <dustinsbyrne@gmail.com>
- make fork safety complete in the client
- add shutdown mechanism to the integration
- better test coverage
- better docs on usage
@parinporecha parinporecha force-pushed the feat/celery_integration branch from 9752bc7 to 9f4fb89 Compare March 23, 2026 15:59
@parinporecha
Copy link
Copy Markdown
Contributor Author

@dustinbyrne any thoughts on the updates when you get a chance?

@github-actions
Copy link
Copy Markdown
Contributor

This PR hasn't seen activity in a week! Should it be merged, closed, or further worked on? If you want to keep it open, post a comment or remove the stale label – otherwise this will be closed in another week.

@github-actions github-actions Bot added the stale label Apr 14, 2026
@marandaneto marandaneto deleted the branch PostHog:master April 16, 2026 08:09
@parinporecha
Copy link
Copy Markdown
Contributor Author

@marandaneto is this not needed?

@marandaneto
Copy link
Copy Markdown
Member

@marandaneto is this not needed?

i think this is useful, yes, and @dustinbyrne probably missed the ping
some PRs got auto closed because i changed the main branch from master -> main
mind opening up the PR again against main? and with signed commits otherwise we cant merge it

Comment on lines +69 to +82
# --- Worker process setup ---
# Celery's default prefork pool runs tasks in child processes. This example
# runs on a single host, so the inherited PostHog client and Celery
# integration are fork-safe and do not need to be recreated in each child.
# If workers run across multiple hosts, configure PostHog and instrument a
# worker-local integration in worker_process_init.
@worker_process_init.connect
def on_worker_process_init(**kwargs) -> None:
# global integration

# configure_posthog()
# integration = create_integration()
# integration.instrument()
return
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still necessary? It looks like it's a no-op

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this shows how to set it up if the workers are running on a different host. I'll comment out this handler entirely and make that clearer

Comment on lines +85 to +90
# Use this signal to shutdown the integration and PostHog client
# Calling shutdown() is important to flush any pending events
@worker_process_shutdown.connect
def on_worker_process_shutdown(**kwargs) -> None:
integration.shutdown()
posthog.shutdown()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this required setup configuration or can this be omitted? We're also calling doing this at the bottom of the example, but it's not clear why or if it's required.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is needed for the celery workers. The one at the bottom of the example does it for the producer process. I'll add comments explaining it

Comment thread posthog/client.py
Comment on lines +1100 to +1111
def _reinit_after_fork(self):
"""Reinitialize queue and consumer threads in a forked child process.

Registered via os.register_at_fork(after_in_child=...) so it runs
exactly once in each child, before any user code, covering all code
paths (capture, flush, join, etc.).

Python threads do not survive fork() and queue.Queue internal locks
may be in an inconsistent state, so both are replaced.
Inherited queue items are intentionally discarded as they'll be
handled by the parent process's consumers.
"""
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little torn on this, because it feels a bit like the Celery integration and flag_definition_cache_provider are at odds with each other, unless we also expand the FlagDefinitionCacheProvider to handle _reinit_after_fork. In practice, Celery is a great use case for caching flag definitions.

Perhaps we could do something like:

integration = PosthogCeleryIntegration(
    client_factory=create_posthog_client,
)

where:

def create_posthog_client():
    cache = MyCacheProvider(...)
    return Posthog(
        "...",
        personal_api_key="...",
        flag_definition_cache_provider=cache,
    )

Then on worker_process_init, the integration could call the factory and store the client for that process and we avoid the whole fork.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants