Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions GFramework.Core.Abstractions/Architectures/IArchitecture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ public interface IArchitecture : IAsyncInitializable, IAsyncDestroyable, IInitia
void RegisterCqrsPipelineBehavior<TBehavior>()
where TBehavior : class;

/// <summary>
/// 注册 CQRS 流式请求管道行为。
/// 既支持实现 <c>IStreamPipelineBehavior&lt;,&gt;</c> 的开放泛型行为类型,
/// 也支持绑定到单一流式请求/响应对的封闭行为类型。
/// </summary>
/// <typeparam name="TBehavior">行为类型,必须是引用类型</typeparam>
void RegisterCqrsStreamPipelineBehavior<TBehavior>()
where TBehavior : class;

/// <summary>
/// 从指定程序集显式注册 CQRS 处理器。
/// 当处理器位于默认架构程序集之外的模块或扩展程序集中时,可在初始化阶段调用该入口接入对应程序集。
Expand Down
7 changes: 7 additions & 0 deletions GFramework.Core.Abstractions/Ioc/IIocContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ void RegisterScoped<TService, TImpl>()
void RegisterCqrsPipelineBehavior<TBehavior>()
where TBehavior : class;

/// <summary>
/// 注册 CQRS 流式请求管道行为。
/// </summary>
/// <typeparam name="TBehavior">行为类型,必须是引用类型</typeparam>
void RegisterCqrsStreamPipelineBehavior<TBehavior>()
where TBehavior : class;
Comment thread
coderabbitai[bot] marked this conversation as resolved.

/// <summary>
/// 从指定程序集显式注册 CQRS 处理器。
/// 该入口适用于处理器不位于默认架构程序集中的场景,例如扩展包、模块程序集或拆分后的业务程序集。
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public void SetUp()
LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider();
GameContext.Clear();
TrackingPipelineBehavior<ModuleBehaviorRequest, string>.InvocationCount = 0;
TrackingStreamPipelineBehavior<ModuleStreamBehaviorRequest, int>.InvocationCount = 0;
}

/// <summary>
Expand All @@ -38,6 +39,7 @@ public void TearDown()
{
GameContext.Clear();
TrackingPipelineBehavior<ModuleBehaviorRequest, string>.InvocationCount = 0;
TrackingStreamPipelineBehavior<ModuleStreamBehaviorRequest, int>.InvocationCount = 0;
LegacyBridgePipelineTracker.Reset();
}

Expand Down Expand Up @@ -92,6 +94,34 @@ public async Task RegisterCqrsPipelineBehavior_Should_Apply_Pipeline_Behavior_To
}
}

/// <summary>
/// 验证注册的 CQRS stream 行为会参与建流处理流程。
/// </summary>
[Test]
public async Task RegisterCqrsStreamPipelineBehavior_Should_Apply_Pipeline_Behavior_To_Stream_Request()
{
var architecture = new ModuleTestArchitecture(target =>
target.RegisterCqrsStreamPipelineBehavior<TrackingStreamPipelineBehavior<ModuleStreamBehaviorRequest, int>>());

await architecture.InitializeAsync();
try
{
var response = await DrainAsync(architecture.Context.CreateStream(new ModuleStreamBehaviorRequest()));

Assert.Multiple(() =>
{
Assert.That(response, Is.EqualTo([7]));
Assert.That(
TrackingStreamPipelineBehavior<ModuleStreamBehaviorRequest, int>.InvocationCount,
Is.EqualTo(1));
});
}
finally
{
await architecture.DestroyAsync();
}
}

/// <summary>
/// 验证默认架构初始化路径会自动扫描 Core 程序集里的 legacy bridge handler,
/// 使旧 <c>SendCommand</c> / <c>SendQuery</c> 入口也能进入统一 CQRS pipeline。
Expand Down Expand Up @@ -194,4 +224,22 @@ public void Install(IArchitecture architecture)
private sealed class InstalledByModuleUtility : IUtility
{
}

/// <summary>
/// 物化异步流为只读列表,便于断言 stream pipeline 行为的最终可观察结果。
/// </summary>
/// <typeparam name="T">流元素类型。</typeparam>
/// <param name="stream">要物化的异步流。</param>
/// <returns>按枚举顺序收集的元素列表。</returns>
private static async Task<IReadOnlyList<T>> DrainAsync<T>(IAsyncEnumerable<T> stream)
{
var results = new List<T>();

await foreach (var item in stream.ConfigureAwait(false))
{
results.Add(item);
}

return results;
}
}
13 changes: 13 additions & 0 deletions GFramework.Core.Tests/Architectures/ModuleStreamBehaviorRequest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright (c) 2025-2026 GeWuYou
// SPDX-License-Identifier: Apache-2.0

using GFramework.Cqrs.Abstractions.Cqrs;

namespace GFramework.Core.Tests.Architectures;

/// <summary>
/// 用于验证架构公开 stream pipeline 行为注册入口的最小流式请求。
/// </summary>
public sealed class ModuleStreamBehaviorRequest : IStreamRequest<int>
{
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) 2025-2026 GeWuYou
// SPDX-License-Identifier: Apache-2.0

using System.Collections.Generic;
using System.Runtime.CompilerServices;
using GFramework.Cqrs.Abstractions.Cqrs;

namespace GFramework.Core.Tests.Architectures;

/// <summary>
/// 处理 <see cref="ModuleStreamBehaviorRequest" /> 并返回一个固定元素。
/// </summary>
public sealed class ModuleStreamBehaviorRequestHandler : IStreamRequestHandler<ModuleStreamBehaviorRequest, int>
{
/// <summary>
/// 返回一个固定元素,供架构 stream pipeline 行为回归断言使用。
/// </summary>
/// <param name="request">当前流式请求。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>包含一个固定元素的异步流。</returns>
public async IAsyncEnumerable<int> Handle(
ModuleStreamBehaviorRequest request,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
yield return 7;
await ValueTask.CompletedTask.ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ public void RegisterCqrsPipelineBehavior<TBehavior>() where TBehavior : class
throw new NotSupportedException();
}

/// <summary>
/// 测试替身未实现 CQRS 流式管道行为注册。
/// </summary>
/// <typeparam name="TBehavior">行为类型。</typeparam>
/// <exception cref="NotSupportedException">该测试替身不参与 CQRS 流式管道配置验证。</exception>
public void RegisterCqrsStreamPipelineBehavior<TBehavior>() where TBehavior : class
{
throw new NotSupportedException();
}

/// <summary>
/// 测试替身未实现显式程序集 CQRS 处理器接入入口。
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ public void RegisterCqrsPipelineBehavior<TBehavior>() where TBehavior : class
throw new NotSupportedException();
}

/// <summary>
/// 测试替身未实现 CQRS 流式管道行为注册。
/// </summary>
/// <typeparam name="TBehavior">行为类型。</typeparam>
/// <exception cref="NotSupportedException">该测试替身不参与 CQRS 流式管道配置验证。</exception>
public void RegisterCqrsStreamPipelineBehavior<TBehavior>() where TBehavior : class
{
throw new NotSupportedException();
}

/// <summary>
/// 测试替身未实现显式程序集 CQRS 处理器接入入口。
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (c) 2025-2026 GeWuYou
// SPDX-License-Identifier: Apache-2.0

using System.Threading;
using GFramework.Cqrs.Abstractions.Cqrs;

namespace GFramework.Core.Tests.Architectures;

/// <summary>
/// 记录流式请求通过管道次数的测试行为。
/// </summary>
/// <typeparam name="TRequest">流式请求类型。</typeparam>
/// <typeparam name="TResponse">流式响应元素类型。</typeparam>
public sealed class TrackingStreamPipelineBehavior<TRequest, TResponse> : IStreamPipelineBehavior<TRequest, TResponse>
where TRequest : IStreamRequest<TResponse>
{
private static int _invocationCount;

/// <summary>
/// 获取当前测试进程中该流式请求类型对应的行为触发次数。
/// 该计数器是按泛型闭包共享的静态状态,测试需要在每次运行前显式重置。
/// </summary>
public static int InvocationCount
{
get => Volatile.Read(ref _invocationCount);
set => Volatile.Write(ref _invocationCount, value);
}

/// <summary>
/// 以线程安全方式记录一次行为执行,然后继续执行下一个处理阶段。
/// </summary>
/// <param name="message">当前流式请求消息。</param>
/// <param name="next">下一个处理阶段。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>下游处理阶段返回的异步流。</returns>
public IAsyncEnumerable<TResponse> Handle(
TRequest message,
StreamMessageHandlerDelegate<TRequest, TResponse> next,
CancellationToken cancellationToken)
{
Interlocked.Increment(ref _invocationCount);
return next(message, cancellationToken);
}
}
10 changes: 10 additions & 0 deletions GFramework.Core/Architectures/Architecture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,16 @@ public void RegisterCqrsPipelineBehavior<TBehavior>() where TBehavior : class
_modules.RegisterCqrsPipelineBehavior<TBehavior>();
}

/// <summary>
/// 注册 CQRS 流式请求管道行为。
/// 可以传入开放泛型行为类型,也可以传入绑定到特定流式请求的封闭行为类型。
/// </summary>
/// <typeparam name="TBehavior">行为类型,必须是引用类型</typeparam>
public void RegisterCqrsStreamPipelineBehavior<TBehavior>() where TBehavior : class
{
_modules.RegisterCqrsStreamPipelineBehavior<TBehavior>();
}

/// <summary>
/// 从指定程序集显式注册 CQRS 处理器。
/// 该入口适用于把拆分到其他模块或扩展包程序集中的 handlers 接入当前架构。
Expand Down
11 changes: 11 additions & 0 deletions GFramework.Core/Architectures/ArchitectureModules.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ public void RegisterCqrsPipelineBehavior<TBehavior>() where TBehavior : class
services.Container.RegisterCqrsPipelineBehavior<TBehavior>();
}

/// <summary>
/// 注册 CQRS 流式请求管道行为。
/// 支持开放泛型行为类型和针对单一流式请求的封闭行为类型。
/// </summary>
/// <typeparam name="TBehavior">行为类型,必须是引用类型</typeparam>
public void RegisterCqrsStreamPipelineBehavior<TBehavior>() where TBehavior : class
{
logger.Debug($"Registering CQRS stream pipeline behavior: {typeof(TBehavior).Name}");
services.Container.RegisterCqrsStreamPipelineBehavior<TBehavior>();
}

/// <summary>
/// 从指定程序集显式注册 CQRS 处理器。
/// 该入口用于把默认架构程序集之外的扩展处理器接入当前架构容器。
Expand Down
50 changes: 50 additions & 0 deletions GFramework.Core/Ioc/MicrosoftDiContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,56 @@ public void RegisterCqrsPipelineBehavior<TBehavior>() where TBehavior : class
}
}

/// <summary>
/// 注册 CQRS 流式请求管道行为。
/// 同时支持开放泛型行为类型和已闭合的具体行为类型,
/// 以兼容通用行为和针对单一流式请求的专用行为两种注册方式。
/// </summary>
/// <typeparam name="TBehavior">行为类型,必须是引用类型</typeparam>
public void RegisterCqrsStreamPipelineBehavior<TBehavior>() where TBehavior : class
{
ThrowIfDisposed();
EnterWriteLockOrThrowDisposed();
try
{
ThrowIfFrozen();

var behaviorType = typeof(TBehavior);

if (behaviorType.IsGenericTypeDefinition)
{
GetServicesUnsafe.AddSingleton(typeof(IStreamPipelineBehavior<,>), behaviorType);
}
else
{
var pipelineInterfaces = behaviorType
.GetInterfaces()
.Where(type => type.IsGenericType &&
type.GetGenericTypeDefinition() == typeof(IStreamPipelineBehavior<,>))
.ToList();

if (pipelineInterfaces.Count == 0)
{
var errorMessage = $"{behaviorType.Name} does not implement IStreamPipelineBehavior<,>";
_logger.Error(errorMessage);
throw new InvalidOperationException(errorMessage);
}

// 为每个已闭合的流式管道接口建立显式映射,支持针对特定流式请求/响应的专用行为。
foreach (var pipelineInterface in pipelineInterfaces)
{
GetServicesUnsafe.AddSingleton(pipelineInterface, behaviorType);
}
}

_logger.Debug($"CQRS stream pipeline behavior registered: {behaviorType.Name}");
}
finally
{
_lock.ExitWriteLock();
}
}

/// <summary>
/// 从指定程序集显式注册 CQRS 处理器。
/// </summary>
Expand Down
25 changes: 25 additions & 0 deletions GFramework.Cqrs.Abstractions/Cqrs/IStreamPipelineBehavior.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) 2025-2026 GeWuYou
// SPDX-License-Identifier: Apache-2.0

namespace GFramework.Cqrs.Abstractions.Cqrs;

/// <summary>
/// 定义流式 CQRS 请求在建流阶段使用的管道行为。
/// </summary>
/// <typeparam name="TRequest">流式请求类型。</typeparam>
/// <typeparam name="TResponse">流式响应元素类型。</typeparam>
public interface IStreamPipelineBehavior<TRequest, TResponse>
where TRequest : IStreamRequest<TResponse>
{
/// <summary>
/// 处理当前流式请求,并决定是否继续调用后续行为或最终处理器。
/// </summary>
/// <param name="message">当前流式请求消息。</param>
/// <param name="next">下一个处理委托。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>异步响应序列。</returns>
IAsyncEnumerable<TResponse> Handle(
TRequest message,
StreamMessageHandlerDelegate<TRequest, TResponse> next,
CancellationToken cancellationToken);
}
22 changes: 22 additions & 0 deletions GFramework.Cqrs.Abstractions/Cqrs/StreamMessageHandlerDelegate.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) 2025-2026 GeWuYou
// SPDX-License-Identifier: Apache-2.0

namespace GFramework.Cqrs.Abstractions.Cqrs;

/// <summary>
/// 表示流式 CQRS 请求在管道中继续向下执行的处理委托。
/// </summary>
/// <remarks>
/// <para>stream 行为可以通过不调用该委托来短路整个流式处理链。</para>
/// <para>除显式实现重试、回放或分支等高级语义外,行为通常应最多调用一次该委托,以维持单次建流的确定性。</para>
/// <para>调用方应传递当前收到的 <paramref name="cancellationToken" />,确保取消信号沿建流入口与后续枚举链路一致传播。</para>
/// </remarks>
/// <typeparam name="TRequest">流式请求类型。</typeparam>
/// <typeparam name="TResponse">流式响应元素类型。</typeparam>
/// <param name="message">当前流式请求消息。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>异步响应序列。</returns>
public delegate IAsyncEnumerable<TResponse> StreamMessageHandlerDelegate<in TRequest, out TResponse>(
TRequest message,
CancellationToken cancellationToken)
where TRequest : IStreamRequest<TResponse>;
4 changes: 2 additions & 2 deletions GFramework.Cqrs.Abstractions/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
推荐按职责引用:

- `GeWuYou.GFramework.Cqrs.Abstractions`
- 提供 `IRequest<TResponse>`、`INotification`、`IStreamRequest<TResponse>`、`IRequestHandler<,>`、`INotificationHandler<>`、`IPipelineBehavior<,>`、`ICqrsRuntime`、`ICqrsContext`、`Unit` 等基础契约。
- 提供 `IRequest<TResponse>`、`INotification`、`IStreamRequest<TResponse>`、`IRequestHandler<,>`、`INotificationHandler<>`、`IPipelineBehavior<,>`、`IStreamPipelineBehavior<,>`、`ICqrsRuntime`、`ICqrsContext`、`Unit` 等基础契约。
- `GeWuYou.GFramework.Cqrs`
- 引用本包,并提供默认 runtime、处理器注册、消息基类、处理器基类、上下文扩展方法。
- `GeWuYou.GFramework.Cqrs.SourceGenerators`
Expand All @@ -38,7 +38,7 @@
- 运行时协作接口
- `ICqrsRuntime`、`ICqrsContext`、`ICqrsHandlerRegistrar`
- 管道与辅助类型
- `IPipelineBehavior<,>`、`MessageHandlerDelegate<,>`、`Unit`
- `IPipelineBehavior<,>`、`IStreamPipelineBehavior<,>`、`MessageHandlerDelegate<,>`、`StreamMessageHandlerDelegate<,>`、`Unit`

## 最小接入路径

Expand Down
Loading
Loading