Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
// Copyright (c) 2025-2026 GeWuYou
// SPDX-License-Identifier: Apache-2.0

using System.Reflection;
using GFramework.Core.Abstractions.Architectures;
using GFramework.Core.Abstractions.Logging;
using GFramework.Core.Abstractions.Utility;
using GFramework.Core.Architectures;
using GFramework.Core.Logging;
using GFramework.Cqrs;
using GFramework.Cqrs.Abstractions.Cqrs;
using GFramework.Cqrs.Notification;
using Microsoft.Extensions.DependencyInjection;

namespace GFramework.Core.Tests.Architectures;
Expand All @@ -27,6 +30,7 @@ public void SetUp()
{
LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider();
GameContext.Clear();
AdditionalAssemblyNotificationHandlerState.Reset();
TrackingPipelineBehavior<ModuleBehaviorRequest, string>.InvocationCount = 0;
TrackingStreamPipelineBehavior<ModuleStreamBehaviorRequest, int>.InvocationCount = 0;
}
Expand All @@ -37,6 +41,7 @@ public void SetUp()
[TearDown]
public void TearDown()
{
AdditionalAssemblyNotificationHandlerState.Reset();
GameContext.Clear();
TrackingPipelineBehavior<ModuleBehaviorRequest, string>.InvocationCount = 0;
TrackingStreamPipelineBehavior<ModuleStreamBehaviorRequest, int>.InvocationCount = 0;
Expand Down Expand Up @@ -156,6 +161,35 @@ public async Task InitializeAsync_Should_AutoRegister_LegacyBridgeHandlers_For_D
}
}

/// <summary>
/// 验证标准架构启动路径会复用通过 <see cref="Architecture.Configurator" /> 声明的自定义 notification publisher,
/// 而不是在 <see cref="GFramework.Core.Services.Modules.CqrsRuntimeModule" /> 创建 runtime 时提前固化默认顺序策略。
/// </summary>
[Test]
public async Task InitializeAsync_Should_Reuse_Custom_NotificationPublisher_From_Configurator()
{
var generatedAssembly = CreateGeneratedHandlerAssembly();
var architecture = new ConfiguredNotificationPublisherArchitecture(generatedAssembly.Object);

await architecture.InitializeAsync();
try
{
var probe = architecture.Context.GetService<ArchitectureNotificationPublisherProbe>();

await architecture.Context.PublishAsync(new AdditionalAssemblyNotification());

Assert.Multiple(() =>
{
Assert.That(probe.WasCalled, Is.True);
Assert.That(AdditionalAssemblyNotificationHandlerState.InvocationCount, Is.EqualTo(1));
});
}
finally
{
await architecture.DestroyAsync();
}
}

/// <summary>
/// 用于测试模块行为的最小架构实现。
/// </summary>
Expand Down Expand Up @@ -191,6 +225,31 @@ protected override void OnInitialize()
}
}

/// <summary>
/// 通过标准架构启动路径声明自定义 notification publisher 的最小架构。
/// </summary>
private sealed class ConfiguredNotificationPublisherArchitecture(Assembly generatedAssembly) : Architecture
{
/// <summary>
/// 在服务钩子阶段注册 probe 与自定义 publisher,
/// 以模拟真实项目在组合根里通过 <see cref="IServiceCollection" /> 覆盖默认策略的路径。
/// </summary>
public override Action<IServiceCollection>? Configurator => services =>
{
services.AddSingleton<ArchitectureNotificationPublisherProbe>();
services.AddSingleton<INotificationPublisher, ArchitectureTrackingNotificationPublisher>();
};

/// <summary>
/// 在用户初始化阶段显式接入额外程序集里的 notification handler,
/// 让测试聚焦“publisher 是否被复用”,而不是依赖当前测试文件自己的 handler 扫描形状。
/// </summary>
protected override void OnInitialize()
{
RegisterCqrsHandlersFromAssembly(generatedAssembly);
}
}

/// <summary>
/// 记录模块安装调用情况的测试模块。
/// </summary>
Expand Down Expand Up @@ -225,6 +284,69 @@ private sealed class InstalledByModuleUtility : IUtility
{
}

/// <summary>
/// 创建一个仅暴露程序集级 CQRS registry 元数据的 mocked Assembly。
/// 该测试替身模拟扩展程序集已经提供 notification handler registry,而架构只需在初始化时显式接入该程序集。
/// </summary>
/// <returns>包含程序集级 notification handler registry 元数据的 mocked Assembly。</returns>
private static Mock<Assembly> CreateGeneratedHandlerAssembly()
{
var generatedAssembly = new Mock<Assembly>();
generatedAssembly
.SetupGet(static assembly => assembly.FullName)
.Returns("GFramework.Core.Tests.Architectures.ExplicitAdditionalHandlers, Version=1.0.0.0");
generatedAssembly
.Setup(static assembly => assembly.GetCustomAttributes(typeof(CqrsHandlerRegistryAttribute), false))
.Returns([new CqrsHandlerRegistryAttribute(typeof(AdditionalAssemblyNotificationHandlerRegistry))]);
return generatedAssembly;
}

/// <summary>
/// 记录自定义 notification publisher 是否真正参与了标准架构启动路径下的 publish 调用。
/// </summary>
private sealed class ArchitectureNotificationPublisherProbe
{
/// <summary>
/// 获取 probe 是否已被 publisher 标记为执行过。
/// </summary>
public bool WasCalled { get; private set; }

/// <summary>
/// 记录当前 publish 调用已经命中了自定义 publisher。
/// </summary>
public void MarkCalled()
{
WasCalled = true;
}
}

/// <summary>
/// 依赖容器内 probe 的自定义 notification publisher。
/// 该类型通过显式标记 + 正常转发处理器执行,验证标准架构启动路径不会把自定义策略短路成默认顺序发布器。
/// </summary>
private sealed class ArchitectureTrackingNotificationPublisher(
ArchitectureNotificationPublisherProbe probe) : INotificationPublisher
{
/// <summary>
/// 记录自定义 publisher 已参与当前发布调用,并继续按处理器解析顺序转发执行。
/// </summary>
public async ValueTask PublishAsync<TNotification>(
NotificationPublishContext<TNotification> context,
CancellationToken cancellationToken = default)
where TNotification : INotification
{
ArgumentNullException.ThrowIfNull(context);
cancellationToken.ThrowIfCancellationRequested();

probe.MarkCalled();

foreach (var handler in context.Handlers)
{
await context.InvokeHandlerAsync(handler, cancellationToken).ConfigureAwait(false);
}
}
}

/// <summary>
/// 物化异步流为只读列表,便于断言 stream pipeline 行为的最终可观察结果。
/// </summary>
Expand Down
4 changes: 1 addition & 3 deletions GFramework.Core/Services/Modules/CqrsRuntimeModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using GFramework.Core.Abstractions.Logging;
using GFramework.Cqrs;
using GFramework.Cqrs.Abstractions.Cqrs;
using GFramework.Cqrs.Notification;
using LegacyICqrsRuntime = GFramework.Core.Abstractions.Cqrs.ICqrsRuntime;

namespace GFramework.Core.Services.Modules;
Expand Down Expand Up @@ -46,8 +45,7 @@ public void Register(IIocContainer container)
var dispatcherLogger = LoggerFactoryResolver.Provider.CreateLogger("CqrsDispatcher");
var registrarLogger = LoggerFactoryResolver.Provider.CreateLogger("DefaultCqrsHandlerRegistrar");
var registrationLogger = LoggerFactoryResolver.Provider.CreateLogger("DefaultCqrsRegistrationService");
var notificationPublisher = container.Get<INotificationPublisher>();
var runtime = CqrsRuntimeFactory.CreateRuntime(container, dispatcherLogger, notificationPublisher);
var runtime = CqrsRuntimeFactory.CreateRuntime(container, dispatcherLogger);
var registrar = CqrsRuntimeFactory.CreateHandlerRegistrar(container, registrarLogger);

container.Register(runtime);
Expand Down
166 changes: 166 additions & 0 deletions GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,53 @@ public async Task Dispatcher_Should_Cache_Request_Dispatch_Bindings_Per_Response
});
}

/// <summary>
/// 验证 request 的“是否存在 pipeline behavior”判定会按 dispatcher 实例缓存,
/// 让零行为请求在首次分发后不再重复查询容器,同时不同 dispatcher 不共享该实例级状态。
/// </summary>
[Test]
public async Task Dispatcher_Should_Cache_Zero_Pipeline_Request_Presence_Per_Dispatcher_Instance()
{
var firstContext = new ArchitectureContext(_container!);
var secondContext = new ArchitectureContext(_container!);
var firstDispatcher = GetDispatcherFromContext(firstContext);
var secondDispatcher = GetDispatcherFromContext(secondContext);
using var isolatedContainer = CreateFrozenContainer();
var isolatedContext = new ArchitectureContext(isolatedContainer);
var isolatedDispatcher = GetDispatcherFromContext(isolatedContext);

AssertRequestBehaviorPresenceIsUnset(firstDispatcher, typeof(IPipelineBehavior<DispatcherCacheRequest, int>));
AssertRequestBehaviorPresenceIsUnset(secondDispatcher, typeof(IPipelineBehavior<DispatcherCacheRequest, int>));
AssertRequestBehaviorPresenceIsUnset(isolatedDispatcher, typeof(IPipelineBehavior<DispatcherCacheRequest, int>));
AssertRequestBehaviorPresenceIsUnset(
firstDispatcher,
typeof(IPipelineBehavior<DispatcherPipelineCacheRequest, int>));

await firstContext.SendRequestAsync(new DispatcherCacheRequest());
await firstContext.SendRequestAsync(new DispatcherPipelineCacheRequest());

var zeroPipelinePresence = GetRequestBehaviorPresenceCacheValue(
firstDispatcher,
typeof(IPipelineBehavior<DispatcherCacheRequest, int>));
var onePipelinePresence = GetRequestBehaviorPresenceCacheValue(
firstDispatcher,
typeof(IPipelineBehavior<DispatcherPipelineCacheRequest, int>));

AssertSharedDispatcherCacheState(
firstDispatcher,
secondDispatcher,
isolatedDispatcher,
zeroPipelinePresence,
onePipelinePresence);

await isolatedContext.SendRequestAsync(new DispatcherCacheRequest());

AssertRequestBehaviorPresenceEquals(
isolatedDispatcher,
typeof(IPipelineBehavior<DispatcherCacheRequest, int>),
false);
}

/// <summary>
/// 验证 request pipeline executor 会按行为数量在 binding 内首次创建并在后续分发中复用。
/// </summary>
Expand Down Expand Up @@ -565,6 +612,57 @@ private static object GetCacheField(string fieldName)
$"Dispatcher cache field {fieldName} returned null.");
}

/// <summary>
/// 从架构上下文中解析当前延迟创建的 dispatcher 实例,便于验证其实例级热路径缓存。
/// </summary>
private static object GetDispatcherFromContext(ArchitectureContext context)
{
ArgumentNullException.ThrowIfNull(context);

var lazyRuntimeField = typeof(ArchitectureContext).GetField(
"_cqrsRuntime",
BindingFlags.Instance | BindingFlags.NonPublic);

Assert.That(lazyRuntimeField, Is.Not.Null, "Missing ArchitectureContext._cqrsRuntime field.");

var lazyRuntime = lazyRuntimeField!.GetValue(context)
?? throw new InvalidOperationException(
"ArchitectureContext._cqrsRuntime returned null.");
var lazyValueProperty = lazyRuntime.GetType().GetProperty(
"Value",
BindingFlags.Instance | BindingFlags.Public);

Assert.That(lazyValueProperty, Is.Not.Null, "Missing Lazy<ICqrsRuntime>.Value accessor.");

return lazyValueProperty!.GetValue(lazyRuntime)
?? throw new InvalidOperationException("Resolved CQRS runtime instance was null.");
}

/// <summary>
/// 创建与当前 fixture 注册形状一致、但拥有独立 runtime 实例的冻结容器,
/// 用于验证 dispatcher 的实例级缓存不会跨容器共享。
/// </summary>
private static MicrosoftDiContainer CreateFrozenContainer()
{
var container = new MicrosoftDiContainer();
container.RegisterCqrsPipelineBehavior<DispatcherPipelineCacheBehavior>();
container.RegisterCqrsPipelineBehavior<DispatcherPipelineContextRefreshBehavior>();
container.RegisterCqrsPipelineBehavior<DispatcherPipelineOrderOuterBehavior>();
container.RegisterCqrsPipelineBehavior<DispatcherPipelineOrderInnerBehavior>();
container.RegisterCqrsStreamPipelineBehavior<DispatcherStreamPipelineCacheBehavior>();
container.RegisterCqrsStreamPipelineBehavior<DispatcherStreamPipelineContextRefreshBehavior>();
container.RegisterCqrsStreamPipelineBehavior<DispatcherStreamPipelineOrderOuterBehavior>();
container.RegisterCqrsStreamPipelineBehavior<DispatcherStreamPipelineOrderInnerBehavior>();

CqrsTestRuntime.RegisterHandlers(
container,
typeof(CqrsDispatcherCacheTests).Assembly,
typeof(ArchitectureContext).Assembly);

container.Freeze();
return container;
}

/// <summary>
/// 清空本测试依赖的 dispatcher 静态缓存,避免跨用例共享进程级状态导致断言漂移。
/// </summary>
Expand All @@ -591,6 +689,74 @@ private static void ClearDispatcherCaches()
return InvokeInstanceMethod(cache, "GetValueOrDefaultForTesting", primaryType, secondaryType);
}

/// <summary>
/// 读取指定 dispatcher 实例中当前保存的 request behavior presence 缓存项。
/// </summary>
private static object? GetRequestBehaviorPresenceCacheValue(object dispatcher, Type behaviorType)
{
var field = dispatcher.GetType().GetField(
"_requestBehaviorPresenceCache",
BindingFlags.Instance | BindingFlags.NonPublic);

Assert.That(field, Is.Not.Null, "Missing dispatcher request behavior presence cache field.");

var cache = field!.GetValue(dispatcher)
?? throw new InvalidOperationException(
"Dispatcher request behavior presence cache returned null.");
var tryGetValueMethod = cache.GetType().GetMethod(
"TryGetValue",
BindingFlags.Instance | BindingFlags.Public);

Assert.That(tryGetValueMethod, Is.Not.Null, "Missing ConcurrentDictionary.TryGetValue accessor.");

object?[] arguments = [behaviorType, null];
var found = (bool)(tryGetValueMethod!.Invoke(cache, arguments)
?? throw new InvalidOperationException(
"ConcurrentDictionary.TryGetValue returned null."));
return found ? arguments[1] : null;
}

/// <summary>
/// 断言指定 dispatcher 上某个 request behavior presence 缓存项尚未建立。
/// </summary>
private static void AssertRequestBehaviorPresenceIsUnset(object dispatcher, Type behaviorType)
{
Assert.That(GetRequestBehaviorPresenceCacheValue(dispatcher, behaviorType), Is.Null);
}

/// <summary>
/// 断言指定 dispatcher 上某个 request behavior presence 缓存项等于预期值。
/// </summary>
private static void AssertRequestBehaviorPresenceEquals(object dispatcher, Type behaviorType, bool expected)
{
Assert.That(GetRequestBehaviorPresenceCacheValue(dispatcher, behaviorType), Is.EqualTo(expected));
}

/// <summary>
/// 断言同一容器解析出的 dispatcher 会共享实例级缓存,而另一独立容器的 dispatcher 不会提前命中。
/// </summary>
private static void AssertSharedDispatcherCacheState(
object firstDispatcher,
object secondDispatcher,
object isolatedDispatcher,
object? zeroPipelinePresence,
object? onePipelinePresence)
{
Assert.Multiple(() =>
{
Assert.That(secondDispatcher, Is.SameAs(firstDispatcher));
Assert.That(zeroPipelinePresence, Is.EqualTo(false));
Assert.That(onePipelinePresence, Is.EqualTo(true));
AssertRequestBehaviorPresenceEquals(
secondDispatcher,
typeof(IPipelineBehavior<DispatcherCacheRequest, int>),
false);
AssertRequestBehaviorPresenceIsUnset(
isolatedDispatcher,
typeof(IPipelineBehavior<DispatcherCacheRequest, int>));
});
}

/// <summary>
/// 读取 request dispatch binding 中指定行为数量的 pipeline executor 缓存项。
/// </summary>
Expand Down
Loading
Loading