Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions GFramework.Core.Abstractions/Architectures/IArchitecture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,20 @@ public interface IArchitecture : IAsyncInitializable, IAsyncDestroyable, IInitia
void RegisterCqrsPipelineBehavior<TBehavior>()
where TBehavior : class;

/// <summary>
/// 注册 CQRS 流式请求管道行为。
/// 既支持实现 <c>IStreamPipelineBehavior&lt;,&gt;</c> 的开放泛型行为类型,
/// 也支持绑定到单一流式请求/响应对的封闭行为类型。
/// </summary>
/// <typeparam name="TBehavior">行为类型,必须是引用类型</typeparam>
/// <exception cref="InvalidOperationException">当前架构的底层容器已冻结,无法继续注册流式管道行为。</exception>
/// <exception cref="ObjectDisposedException">当前架构的底层容器已释放,无法继续注册流式管道行为。</exception>
/// <remarks>
/// 该入口应在架构初始化冻结容器之前调用;具体开放泛型或封闭行为类型的校验逻辑由底层容器负责。
/// </remarks>
void RegisterCqrsStreamPipelineBehavior<TBehavior>()
where TBehavior : class;

/// <summary>
/// 从指定程序集显式注册 CQRS 处理器。
/// 当处理器位于默认架构程序集之外的模块或扩展程序集中时,可在初始化阶段调用该入口接入对应程序集。
Expand Down
14 changes: 14 additions & 0 deletions GFramework.Core.Abstractions/Ioc/IIocContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,20 @@ void RegisterScoped<TService, TImpl>()
void RegisterCqrsPipelineBehavior<TBehavior>()
where TBehavior : class;

/// <summary>
/// 注册 CQRS 流式请求管道行为。
/// </summary>
/// <typeparam name="TBehavior">行为类型,必须是引用类型</typeparam>
/// <exception cref="InvalidOperationException">容器已冻结,无法继续注册流式管道行为。</exception>
/// <exception cref="ObjectDisposedException">容器已释放,无法继续注册流式管道行为。</exception>
/// <remarks>
/// 该入口既支持实现 <c>IStreamPipelineBehavior&lt;,&gt;</c> 的开放泛型行为类型,
/// 也支持绑定到单一流式请求/响应对的封闭行为类型。
/// 应在容器冻结前的注册阶段调用;具体可注册形态由实现容器负责校验。
/// </remarks>
void RegisterCqrsStreamPipelineBehavior<TBehavior>()
where TBehavior : class;
Comment thread
coderabbitai[bot] marked this conversation as resolved.

/// <summary>
/// 从指定程序集显式注册 CQRS 处理器。
/// 该入口适用于处理器不位于默认架构程序集中的场景,例如扩展包、模块程序集或拆分后的业务程序集。
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public void SetUp()
LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider();
GameContext.Clear();
TrackingPipelineBehavior<ModuleBehaviorRequest, string>.InvocationCount = 0;
TrackingStreamPipelineBehavior<ModuleStreamBehaviorRequest, int>.InvocationCount = 0;
}

/// <summary>
Expand All @@ -38,6 +39,7 @@ public void TearDown()
{
GameContext.Clear();
TrackingPipelineBehavior<ModuleBehaviorRequest, string>.InvocationCount = 0;
TrackingStreamPipelineBehavior<ModuleStreamBehaviorRequest, int>.InvocationCount = 0;
LegacyBridgePipelineTracker.Reset();
}

Expand Down Expand Up @@ -92,6 +94,34 @@ public async Task RegisterCqrsPipelineBehavior_Should_Apply_Pipeline_Behavior_To
}
}

/// <summary>
/// 验证注册的 CQRS stream 行为会参与建流处理流程。
/// </summary>
[Test]
public async Task RegisterCqrsStreamPipelineBehavior_Should_Apply_Pipeline_Behavior_To_Stream_Request()
{
var architecture = new ModuleTestArchitecture(target =>
target.RegisterCqrsStreamPipelineBehavior<TrackingStreamPipelineBehavior<ModuleStreamBehaviorRequest, int>>());

await architecture.InitializeAsync();
try
{
var response = await DrainAsync(architecture.Context.CreateStream(new ModuleStreamBehaviorRequest()));

Assert.Multiple(() =>
{
Assert.That(response, Is.EqualTo([7]));
Assert.That(
TrackingStreamPipelineBehavior<ModuleStreamBehaviorRequest, int>.InvocationCount,
Is.EqualTo(1));
});
}
finally
{
await architecture.DestroyAsync();
}
}

/// <summary>
/// 验证默认架构初始化路径会自动扫描 Core 程序集里的 legacy bridge handler,
/// 使旧 <c>SendCommand</c> / <c>SendQuery</c> 入口也能进入统一 CQRS pipeline。
Expand Down Expand Up @@ -194,4 +224,22 @@ public void Install(IArchitecture architecture)
private sealed class InstalledByModuleUtility : IUtility
{
}

/// <summary>
/// 物化异步流为只读列表,便于断言 stream pipeline 行为的最终可观察结果。
/// </summary>
/// <typeparam name="T">流元素类型。</typeparam>
/// <param name="stream">要物化的异步流。</param>
/// <returns>按枚举顺序收集的元素列表。</returns>
private static async Task<IReadOnlyList<T>> DrainAsync<T>(IAsyncEnumerable<T> stream)
{
var results = new List<T>();

await foreach (var item in stream.ConfigureAwait(false))
{
results.Add(item);
}

return results;
}
}
13 changes: 13 additions & 0 deletions GFramework.Core.Tests/Architectures/ModuleStreamBehaviorRequest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright (c) 2025-2026 GeWuYou
// SPDX-License-Identifier: Apache-2.0

using GFramework.Cqrs.Abstractions.Cqrs;

namespace GFramework.Core.Tests.Architectures;

/// <summary>
/// 用于验证架构公开 stream pipeline 行为注册入口的最小流式请求。
/// </summary>
public sealed class ModuleStreamBehaviorRequest : IStreamRequest<int>
{
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) 2025-2026 GeWuYou
// SPDX-License-Identifier: Apache-2.0

using System.Collections.Generic;
using System.Runtime.CompilerServices;
using GFramework.Cqrs.Abstractions.Cqrs;

namespace GFramework.Core.Tests.Architectures;

/// <summary>
/// 处理 <see cref="ModuleStreamBehaviorRequest" /> 并返回一个固定元素。
/// </summary>
public sealed class ModuleStreamBehaviorRequestHandler : IStreamRequestHandler<ModuleStreamBehaviorRequest, int>
{
/// <summary>
/// 返回一个固定元素,供架构 stream pipeline 行为回归断言使用。
/// </summary>
/// <param name="request">当前流式请求。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>包含一个固定元素的异步流。</returns>
public async IAsyncEnumerable<int> Handle(
ModuleStreamBehaviorRequest request,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
yield return 7;
await ValueTask.CompletedTask.ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ public void RegisterCqrsPipelineBehavior<TBehavior>() where TBehavior : class
throw new NotSupportedException();
}

/// <summary>
/// 测试替身未实现 CQRS 流式管道行为注册。
/// </summary>
/// <typeparam name="TBehavior">行为类型。</typeparam>
/// <exception cref="NotSupportedException">该测试替身不参与 CQRS 流式管道配置验证。</exception>
public void RegisterCqrsStreamPipelineBehavior<TBehavior>() where TBehavior : class
{
throw new NotSupportedException();
}

/// <summary>
/// 测试替身未实现显式程序集 CQRS 处理器接入入口。
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ public void RegisterCqrsPipelineBehavior<TBehavior>() where TBehavior : class
throw new NotSupportedException();
}

/// <summary>
/// 测试替身未实现 CQRS 流式管道行为注册。
/// </summary>
/// <typeparam name="TBehavior">行为类型。</typeparam>
/// <exception cref="NotSupportedException">该测试替身不参与 CQRS 流式管道配置验证。</exception>
public void RegisterCqrsStreamPipelineBehavior<TBehavior>() where TBehavior : class
{
throw new NotSupportedException();
}

/// <summary>
/// 测试替身未实现显式程序集 CQRS 处理器接入入口。
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (c) 2025-2026 GeWuYou
// SPDX-License-Identifier: Apache-2.0

using System.Threading;
using GFramework.Cqrs.Abstractions.Cqrs;

namespace GFramework.Core.Tests.Architectures;

/// <summary>
/// 记录流式请求通过管道次数的测试行为。
/// </summary>
/// <typeparam name="TRequest">流式请求类型。</typeparam>
/// <typeparam name="TResponse">流式响应元素类型。</typeparam>
public sealed class TrackingStreamPipelineBehavior<TRequest, TResponse> : IStreamPipelineBehavior<TRequest, TResponse>
where TRequest : IStreamRequest<TResponse>
{
private static int _invocationCount;

/// <summary>
/// 获取当前测试进程中该流式请求类型对应的行为触发次数。
/// 该计数器是按泛型闭包共享的静态状态,测试需要在每次运行前显式重置。
/// </summary>
public static int InvocationCount
{
get => Volatile.Read(ref _invocationCount);
set => Volatile.Write(ref _invocationCount, value);
}

/// <summary>
/// 以线程安全方式记录一次行为执行,然后继续执行下一个处理阶段。
/// </summary>
/// <param name="message">当前流式请求消息。</param>
/// <param name="next">下一个处理阶段。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>下游处理阶段返回的异步流。</returns>
public IAsyncEnumerable<TResponse> Handle(
TRequest message,
StreamMessageHandlerDelegate<TRequest, TResponse> next,
CancellationToken cancellationToken)
{
Interlocked.Increment(ref _invocationCount);
return next(message, cancellationToken);
}
}
15 changes: 15 additions & 0 deletions GFramework.Core/Architectures/Architecture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,21 @@ public void RegisterCqrsPipelineBehavior<TBehavior>() where TBehavior : class
_modules.RegisterCqrsPipelineBehavior<TBehavior>();
}

/// <summary>
/// 注册 CQRS 流式请求管道行为。
/// 可以传入开放泛型行为类型,也可以传入绑定到特定流式请求的封闭行为类型。
/// </summary>
/// <typeparam name="TBehavior">行为类型,必须是引用类型</typeparam>
/// <exception cref="InvalidOperationException">当前架构的底层容器已冻结,无法继续注册流式管道行为。</exception>
/// <exception cref="ObjectDisposedException">当前架构的底层容器已释放,无法继续注册流式管道行为。</exception>
/// <remarks>
/// 该调用会委托到底层容器完成校验与注册,因此应在初始化冻结前完成所有流式行为接线。
/// </remarks>
public void RegisterCqrsStreamPipelineBehavior<TBehavior>() where TBehavior : class
{
_modules.RegisterCqrsStreamPipelineBehavior<TBehavior>();
}

/// <summary>
/// 从指定程序集显式注册 CQRS 处理器。
/// 该入口适用于把拆分到其他模块或扩展包程序集中的 handlers 接入当前架构。
Expand Down
13 changes: 13 additions & 0 deletions GFramework.Core/Architectures/ArchitectureModules.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,19 @@ public void RegisterCqrsPipelineBehavior<TBehavior>() where TBehavior : class
services.Container.RegisterCqrsPipelineBehavior<TBehavior>();
}

/// <summary>
/// 注册 CQRS 流式请求管道行为。
/// 支持开放泛型行为类型和针对单一流式请求的封闭行为类型。
/// </summary>
/// <typeparam name="TBehavior">行为类型,必须是引用类型</typeparam>
/// <exception cref="InvalidOperationException">底层容器已冻结,无法继续注册流式管道行为。</exception>
/// <exception cref="ObjectDisposedException">底层容器已释放,无法继续注册流式管道行为。</exception>
public void RegisterCqrsStreamPipelineBehavior<TBehavior>() where TBehavior : class
{
logger.Debug($"Registering CQRS stream pipeline behavior: {typeof(TBehavior).Name}");
services.Container.RegisterCqrsStreamPipelineBehavior<TBehavior>();
}

/// <summary>
/// 从指定程序集显式注册 CQRS 处理器。
/// 该入口用于把默认架构程序集之外的扩展处理器接入当前架构容器。
Expand Down
100 changes: 72 additions & 28 deletions GFramework.Core/Ioc/MicrosoftDiContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -485,41 +485,85 @@ public void RegisterCqrsPipelineBehavior<TBehavior>() where TBehavior : class
try
{
ThrowIfFrozen();
RegisterCqrsPipelineBehaviorCore(
typeof(TBehavior),
typeof(IPipelineBehavior<,>),
"IPipelineBehavior<,>",
"pipeline behavior");
}
finally
{
_lock.ExitWriteLock();
}
}

/// <summary>
/// 注册 CQRS 流式请求管道行为。
/// 同时支持开放泛型行为类型和已闭合的具体行为类型,
/// 以兼容通用行为和针对单一流式请求的专用行为两种注册方式。
/// </summary>
/// <typeparam name="TBehavior">行为类型,必须是引用类型</typeparam>
public void RegisterCqrsStreamPipelineBehavior<TBehavior>() where TBehavior : class
{
ThrowIfDisposed();
EnterWriteLockOrThrowDisposed();
try
{
ThrowIfFrozen();
RegisterCqrsPipelineBehaviorCore(
typeof(TBehavior),
typeof(IStreamPipelineBehavior<,>),
"IStreamPipelineBehavior<,>",
"stream pipeline behavior");
}
finally
{
_lock.ExitWriteLock();
}
}

var behaviorType = typeof(TBehavior);
/// <summary>
/// 复用 CQRS 行为注册的开放泛型/封闭接口校验逻辑,
/// 让 request 与 stream 两条入口保持一致的容器注册语义。
/// </summary>
/// <param name="behaviorType">待注册的行为运行时类型。</param>
/// <param name="openGenericInterfaceType">行为必须实现的开放泛型接口类型。</param>
/// <param name="interfaceTypeDisplayName">用于日志与异常的接口显示名称。</param>
/// <param name="registrationLabel">用于日志的注册类别名称。</param>
/// <exception cref="InvalidOperationException"><paramref name="behaviorType" /> 未实现目标行为接口。</exception>
private void RegisterCqrsPipelineBehaviorCore(
Type behaviorType,
Type openGenericInterfaceType,
string interfaceTypeDisplayName,
string registrationLabel)
{
if (behaviorType.IsGenericTypeDefinition)
{
GetServicesUnsafe.AddSingleton(openGenericInterfaceType, behaviorType);
}
else
{
var pipelineInterfaces = behaviorType
.GetInterfaces()
.Where(type => type.IsGenericType &&
type.GetGenericTypeDefinition() == openGenericInterfaceType)
.ToList();

if (behaviorType.IsGenericTypeDefinition)
if (pipelineInterfaces.Count == 0)
{
GetServicesUnsafe.AddSingleton(typeof(IPipelineBehavior<,>), behaviorType);
var errorMessage = $"{behaviorType.Name} does not implement {interfaceTypeDisplayName}";
_logger.Error(errorMessage);
throw new InvalidOperationException(errorMessage);
}
else
{
var pipelineInterfaces = behaviorType
.GetInterfaces()
.Where(type => type.IsGenericType &&
type.GetGenericTypeDefinition() == typeof(IPipelineBehavior<,>))
.ToList();

if (pipelineInterfaces.Count == 0)
{
var errorMessage = $"{behaviorType.Name} does not implement IPipelineBehavior<,>";
_logger.Error(errorMessage);
throw new InvalidOperationException(errorMessage);
}

// 为每个已闭合的管道接口建立显式映射,支持针对特定请求/响应的专用行为。
foreach (var pipelineInterface in pipelineInterfaces)
{
GetServicesUnsafe.AddSingleton(pipelineInterface, behaviorType);
}
// 为每个已闭合的行为接口建立显式映射,支持针对特定请求/响应对的专用行为。
foreach (var pipelineInterface in pipelineInterfaces)
{
GetServicesUnsafe.AddSingleton(pipelineInterface, behaviorType);
}

_logger.Debug($"CQRS pipeline behavior registered: {behaviorType.Name}");
}
finally
{
_lock.ExitWriteLock();
}

_logger.Debug($"CQRS {registrationLabel} registered: {behaviorType.Name}");
}

/// <summary>
Expand Down
Loading
Loading