PR #5: Bug - Fix cascading checkpoint writes when a stopped flow re-enters TryStart on subsequent events#99
Open
AlexanderJohnston wants to merge 41 commits into
Open
PR #5: Bug - Fix cascading checkpoint writes when a stopped flow re-enters TryStart on subsequent events#99AlexanderJohnston wants to merge 41 commits into
AlexanderJohnston wants to merge 41 commits into
Conversation
- Write to the client stream whether or not the checkpoint write succeeds - Do not ignore events routed to stopped flows - Avoid a null reference when an existing flow fails to load
The SkipException handling that required await was removed in a prior commit. Simplify back to returning the task directly. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Feature/dotnet10 upgrade
Feature/grpc cleanup
Transition core standard libraries to net10.
- Upgrade Totem, Totem.Runtime, Totem.Timeline, Totem.App.Tests from netstandard2.0 to net10.0 - Update Microsoft.Extensions.* package versions from 2.2.0 to 10.0.0 - Replace FormatterServices.GetUninitializedObject() with RuntimeHelpers.GetUninitializedObject() in DurableType.cs - Replace Assembly.LoadWithPartialName() with Assembly.Load(new AssemblyName()) in TypeResolver.cs - Remove ToHashSet extension methods that conflict with built-in LINQ on net10.0 - Fix C# 14 field keyword conflict in Fields.cs Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Create new STJ infrastructure replacing Newtonsoft.Json core: - Add TotemJsonTypeInfoResolver: replaces JsonFormatContractResolver and JsonFormatSerializationBinder with DefaultJsonTypeInfoResolver + Modifiers - Many<T> collection creation via expression-compiled factory - Durable type object creation via RuntimeHelpers.GetUninitializedObject() - Property filtering: excludes [Transient], [CompilerGenerated], Notion-declared - Private field/property serialization for durable types - [WriteOnly] attribute support (serialize but don't deserialize) - Add DurableTypeDiscriminatorConverter: JsonConverterFactory handling polymorphic \ property with 'durable:Prefix:TypeName' values for backward compatibility with Newtonsoft-serialized EventStore data. Includes TypeResolver fallback for legacy assembly-qualified type names. - Redesign IJsonFormat: expose JsonSerializerOptions instead of Apply() pattern - Rewrite JsonFormat: simple wrapper around JsonSerializerOptions - Rewrite JsonFormatExtensions: use JsonSerializer/JsonNode instead of JsonConvert/JObject. Add ToJsonNode/ToJsonNodeUtf8 methods replacing JObject. Implement CopyProperties for PopulateObject equivalent. - Rewrite JsonFormatOptions: SerializerOptions replaces SerializerSettings - Rewrite JsonFormatOptionsSetup: configure WriteIndented, CamelCase naming, JsonStringEnumConverter, TotemJsonTypeInfoResolver, DurableTypeDiscriminatorConverter - Update JsonServiceExtensions: use SerializerOptions Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
… STJ Phase 3 - Timeline Converters: - Rewrite FlowKeyConverter as JsonConverter<FlowKey> using Utf8JsonReader/Writer - Rewrite TimelinePositionConverter as JsonConverter<TimelinePosition> - Update TimelineJsonFormatOptionsSetup to use SerializerOptions Phase 4 - EventStore Layer: - Rewrite SubscribeCommand: replace JObject/JArray/JToken (Newtonsoft.Json.Linq) with JsonNode/JsonArray (System.Text.Json.Nodes) - Use JsonNode.GetValue<T>() instead of JToken.Value<T>() - Use JsonArray instead of JArray, pattern matching with 'is JsonArray' Phase 5 - MVC Layer: - Rewrite WebRuntimeOptionsSetup: replace IPostConfigureOptions<MvcNewtonsoftJsonOptions> with IPostConfigureOptions<JsonOptions>, copying STJ settings instead of 22 Newtonsoft properties - Remove .AddNewtonsoftJson() from ConfigureWebApp.cs (STJ is MVC default) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…iles - Remove Newtonsoft.Json 13.0.3 package from Totem.Runtime.csproj - Remove Microsoft.AspNetCore.Mvc.NewtonsoftJson 10.0.0 package from Totem.Timeline.Mvc.csproj - Delete JsonFormatContractResolver.cs (replaced by TotemJsonTypeInfoResolver) - Delete JsonFormatSerializationBinder.cs (merged into DurableTypeDiscriminatorConverter) All direct Newtonsoft.Json dependencies are now removed. The only remaining Newtonsoft reference is a transitive dependency from EventStore.Client.Grpc (Newtonsoft.Json 9.0.1) which is outside our control. Solution builds with 0 errors across all projects. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The DurableTypeDiscriminatorConverter.Read method was only checking the first property for the \ discriminator. Since JSON property order is not guaranteed, this could miss the discriminator and deserialize as the base type instead of the actual polymorphic type. Now scans through all properties using Utf8JsonReader.TrySkip() to find \ regardless of position in the JSON object. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…o STJ STJ does not auto-detect [TypeConverter] attributes like Newtonsoft did. This caused deserialization crashes for 20+ types (TypeName, Id, FileLink, etc.) that use TextConverter for string-based serialization. The factory detects types with a non-default TypeConverter that supports string conversion, and serializes/deserializes them as JSON strings via ConvertFromInvariantString/ConvertToInvariantString. Registered in JsonFormatOptionsSetup.Configure() after JsonStringEnumConverter, before the durable type converters in PostConfigure(). No conflicts since TypeConverter types are not durable. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…ctory Add TypeConverterJsonConverterFactory to bridge [TypeConverter] types to STJ
…only The previous check matched all .NET built-in types with TypeConverters (int, DateTime, bool, Guid, etc.), causing reader.GetString() to throw on non-string JSON tokens. This silently broke ASP.NET model binding for any model with non-string properties (e.g. SmartScanRecord.FileCount). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Narrow TypeConverterJsonConverterFactory and add dictionary key support
Refactor/newtonsoft to stj
When TryStart() encounters FlowInfo.Stopped, complete the lifetime directly instead of throwing through the ObserveNextPoint catch path, which redundantly re-enters Stop() and writes a new checkpoint per queued event. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The ConnectionString early-return path was missing the DefaultDeadline assignment, so using a connection string bypassed the configured timeout entirely. Restructure to converge both paths before applying shared settings (LoggerFactory, DefaultDeadline). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…deadline Apply `DefaultDeadline` unconditionally in `BuildClientSettings` to fix silent timeout bypass
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This image depicts modern Totem artwork in the style of a death spiral loop.
Problem
This bug has existed since ~2020 and is not part of the gRPC or net10 changes.
A single transient failure (e.g., a gRPC
DeadlineExceededtimeout writing to EventStore) permanently kills a flow and then triggers a cascading death spiral that writes redundant stopped checkpoints for every subsequent event routed to that flow.We observed this in production with
ProjectClassifierTopic. A timeout at position #257138 caused the flow to correctly stop and persist its error state — but then every new event routed to the topic (positions #257138 through #325995) re-entered the same stop/write cycle, producing ~100 nested exception wrappers and ~100 unnecessary checkpoint appends.The resulting log entry was a
System.AggregateExceptioncontaining a chain of"Flow is stopped at #N with this error:"messages nested roughly 100 levels deep, each wrapping the one before it, all the way back to the original timeout.Cause
The issue is in
FlowScope<T>.TryStart(). When a flow has no in-memory state (Flow == null) and receives a new event,ObservePointIfStartedcallsTryStart()to read the checkpoint from the database. If the checkpoint indicates the flow is stopped,TryStartthrows:That exception propagates up to
ObserveNextPoint's catch block, which callsStop().Stop()is designed for first-time failures — it persists the error state to a checkpoint and completes the lifetime task with an exception. But here, the error state is already persisted from the original failure. SoStop()redundantly:Flowis still null)The lifetime completing would normally end the
while(Running)loop — and it does. But the damage per iteration is already done: one unnecessary checkpoint write and one layer of exception nesting. SinceObserveQueueprocesses the backlog of enqueued points before checkingRunning, every pending event triggers one full cycle before the loop exits.The deeper issue is that
TryStarttreats "already stopped" as an observation-time error (by throwing), when it is actually a pre-existing terminal state that was already fully handled by the originalStop()call. The throw conflates discovering a stopped state with entering a stopped state.Fix
Replace the
throwin theFlowInfo.Stoppedcase with a direct lifetime completion and early return:CompleteTasksets the lifetime exception, which makesRunningreturnfalse. ThereturnexitsTryStartback intoObservePointIfStarted, which checksRunningbefore callingObservePoint()and skips it. Control returns toObserveNextPoint's try block (no exception, so the catch is not entered), then back to thewhile(Running)loop, which exits.Reasoning For Fix
Before the fix:
An event dequeues
→ TryStart reads FlowInfo.Stopped
→ throws
→ Stop() writes a redundant checkpoint
→ CompleteTask sets Running = false
→ loop exits.
Any remaining events in _queue are abandoned with the FlowScope itself.
After the fix:
An event dequeues
→ TryStart reads FlowInfo.Stopped
→ CompleteTask sets Running = false
→ return skips back through ObservePointIfStarted
→ loop exits.
Same abandonment, just on the first event instead of the Nth.
In both cases, the queued events are never processed. _queue is an in-memory structure that dies with the FlowScope. The events themselves are still in the timeline stream and the resume projection still has them tracked. They aren't lost from EventStore, they're just unprocessed by this flow instance.
The difference is only in how many events get individually dequeued and fed through the Stop() path before the loop breaks. Before: all of them, each writing a checkpoint. After: one dequeue, zero checkpoint writes, immediate exit.
The difference is only in how many events get individually dequeued and fed through the Stop() path before the loop breaks. Before: all of them, each writing a checkpoint. After: one dequeue, zero checkpoint writes, immediate exit.
Recovery of those events is the same either way — it requires manual intervention to clear the stopped state (resetting the checkpoint stream or the projection's isStopped flag), at which point the resume projection would see isStopped = false and the flow would replay from its last good checkpoint.
What this preserves
This change is intentionally minimal and does not alter any of the existing design invariants:
Stop()persists the checkpointStop()still runs on first failureStop()→CompleteTaskCompleteTaskdirectlyObservePointnot called for stopped flowsRunningbecomes falseRunningbecomes falseisStoppedflagWhat this changes
"Flow {Key} stopped at position {Position}"wrapping the original error string. Previously it was N levels deep, one per event processed after the failure."Flow is stopped at {Position} with this error: {Error}"to"Flow {Key} stopped at position {Position}"with the original error asInnerException. This includes the flow key for diagnostics and uses structured exception nesting rather than string concatenation of the original error.Scope
One file, one case branch, two lines. No new dependencies, no behavioral changes to healthy flows, no changes to the resume projection or checkpoint storage.