Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@ jobs:
--logger "trx;LogFileName=sg-$RANDOM.trx" \
--results-directory TestResults &

dotnet test GFramework.Cqrs.Tests \
-c Release \
--no-build \
--logger "trx;LogFileName=cqrs-$RANDOM.trx" \
--results-directory TestResults &

dotnet test GFramework.Ecs.Arch.Tests \
-c Release \
--no-build \
Expand Down
51 changes: 51 additions & 0 deletions GFramework.Core.Abstractions/Cqrs/ICqrsRuntime.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using GFramework.Core.Abstractions.Architectures;

namespace GFramework.Core.Abstractions.Cqrs;

/// <summary>
/// 定义架构上下文使用的 CQRS runtime seam。
/// 该抽象把请求分发、通知发布与流式处理从具体实现中解耦,
/// 使 <see cref="IArchitectureContext" /> 不再直接依赖某个固定的 runtime 类型。
/// </summary>
public interface ICqrsRuntime
{
/// <summary>
/// 发送请求并返回响应。
/// </summary>
/// <typeparam name="TResponse">响应类型。</typeparam>
/// <param name="context">当前架构上下文,用于上下文感知处理器注入与嵌套请求访问。</param>
/// <param name="request">要分发的请求。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>请求响应。</returns>
ValueTask<TResponse> SendAsync<TResponse>(
IArchitectureContext context,
IRequest<TResponse> request,
CancellationToken cancellationToken = default);

/// <summary>
/// 发布通知到所有已注册处理器。
/// </summary>
/// <typeparam name="TNotification">通知类型。</typeparam>
/// <param name="context">当前架构上下文,用于上下文感知处理器注入。</param>
/// <param name="notification">要发布的通知。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>表示通知分发完成的值任务。</returns>
ValueTask PublishAsync<TNotification>(
IArchitectureContext context,
TNotification notification,
CancellationToken cancellationToken = default)
where TNotification : INotification;

/// <summary>
/// 创建流式请求的异步响应序列。
/// </summary>
/// <typeparam name="TResponse">流元素类型。</typeparam>
/// <param name="context">当前架构上下文,用于上下文感知处理器注入。</param>
/// <param name="request">流式请求。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>按需生成的异步响应序列。</returns>
IAsyncEnumerable<TResponse> CreateStream<TResponse>(
IArchitectureContext context,
IStreamRequest<TResponse> request,
CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
<ItemGroup>
<Using Include="GFramework.Core.Abstractions"/>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\GFramework.Cqrs.Abstractions\GFramework.Cqrs.Abstractions.csproj"/>
</ItemGroup>
<ItemGroup>
<PackageReference Update="Meziantou.Analyzer" Version="3.0.46">
<PrivateAssets>all</PrivateAssets>
Expand Down
36 changes: 36 additions & 0 deletions GFramework.Core.Tests/Cqrs/ContainerRegistrationFixtures.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) 2026 GeWuYou
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

namespace GFramework.Core.Tests.Cqrs;

/// <summary>
/// 为容器层测试提供可扫描的最小通知夹具。
/// </summary>
internal sealed record DeterministicOrderNotification : INotification;

Check failure on line 19 in GFramework.Core.Tests/Cqrs/ContainerRegistrationFixtures.cs

View workflow job for this annotation

GitHub Actions / Build and Test

The type or namespace name 'INotification' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 19 in GFramework.Core.Tests/Cqrs/ContainerRegistrationFixtures.cs

View workflow job for this annotation

GitHub Actions / Build and Test

The type or namespace name 'INotification' could not be found (are you missing a using directive or an assembly reference?)

/// <summary>
/// 供容器注册测试验证程序集扫描结果的通知处理器。
/// </summary>
internal sealed class DeterministicOrderNotificationHandler : INotificationHandler<DeterministicOrderNotification>

Check failure on line 24 in GFramework.Core.Tests/Cqrs/ContainerRegistrationFixtures.cs

View workflow job for this annotation

GitHub Actions / Build and Test

The type or namespace name 'INotificationHandler<>' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 24 in GFramework.Core.Tests/Cqrs/ContainerRegistrationFixtures.cs

View workflow job for this annotation

GitHub Actions / Build and Test

The type or namespace name 'INotificationHandler<>' could not be found (are you missing a using directive or an assembly reference?)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
{
/// <summary>
/// 无副作用地消费通知。
/// </summary>
/// <param name="notification">通知实例。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>已完成任务。</returns>
public ValueTask Handle(DeterministicOrderNotification notification, CancellationToken cancellationToken)
{
return ValueTask.CompletedTask;
}
}
62 changes: 62 additions & 0 deletions GFramework.Core.Tests/CqrsTestRuntime.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Reflection;
using GFramework.Core.Abstractions.Cqrs;
using GFramework.Core.Abstractions.Ioc;
using GFramework.Core.Abstractions.Logging;
using GFramework.Core.Architectures;
Expand Down Expand Up @@ -38,6 +39,65 @@ internal static class CqrsTestRuntime
?? throw new InvalidOperationException(
"Failed to locate CqrsHandlerRegistrar.RegisterHandlers.");

private static readonly Type CqrsDispatcherType = typeof(ArchitectureContext).Assembly
.GetType(
"GFramework.Core.Cqrs.Internal.CqrsDispatcher",
throwOnError: true)!
?? throw new InvalidOperationException(
"Failed to locate CqrsDispatcher type.");

private static readonly ConstructorInfo CqrsDispatcherConstructor = CqrsDispatcherType.GetConstructor(
BindingFlags.Instance |
BindingFlags.Public |
BindingFlags.NonPublic,
binder: null,
[
typeof(IIocContainer),
typeof(ILogger)
],
modifiers: null)
?? throw new InvalidOperationException(
"Failed to locate CqrsDispatcher constructor.");

private static readonly Type DefaultCqrsHandlerRegistrarType = typeof(ArchitectureContext).Assembly
.GetType(
"GFramework.Core.Cqrs.Internal.DefaultCqrsHandlerRegistrar",
throwOnError: true)!
?? throw new InvalidOperationException(
"Failed to locate DefaultCqrsHandlerRegistrar type.");

private static readonly ConstructorInfo DefaultCqrsHandlerRegistrarConstructor =
DefaultCqrsHandlerRegistrarType.GetConstructor(
BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic,
binder: null,
[
typeof(IIocContainer),
typeof(ILogger)
],
modifiers: null)
?? throw new InvalidOperationException(
"Failed to locate DefaultCqrsHandlerRegistrar constructor.");

/// <summary>
/// 为裸测试容器补齐默认 CQRS runtime seam。
/// 这使仅使用 <see cref="MicrosoftDiContainer" /> 的测试环境也能观察与生产路径一致的 runtime 行为,
/// 而无需完整启动服务模块管理器。
/// </summary>
/// <param name="container">目标测试容器。</param>
internal static void RegisterInfrastructure(MicrosoftDiContainer container)
{
ArgumentNullException.ThrowIfNull(container);

var runtimeLogger = LoggerFactoryResolver.Provider.CreateLogger("CqrsDispatcher");
var registrarLogger = LoggerFactoryResolver.Provider.CreateLogger(nameof(CqrsTestRuntime));
var runtime = (ICqrsRuntime)CqrsDispatcherConstructor.Invoke([container, runtimeLogger]);
var registrar =
(ICqrsHandlerRegistrar)DefaultCqrsHandlerRegistrarConstructor.Invoke([container, registrarLogger]);

container.Register<ICqrsRuntime>(runtime);
container.Register<ICqrsHandlerRegistrar>(registrar);
}

/// <summary>
/// 通过与生产代码一致的注册入口扫描并注册指定程序集中的 CQRS 处理器。
/// </summary>
Expand All @@ -48,6 +108,8 @@ internal static void RegisterHandlers(MicrosoftDiContainer container, params Ass
ArgumentNullException.ThrowIfNull(container);
ArgumentNullException.ThrowIfNull(assemblies);

RegisterInfrastructure(container);

var logger = LoggerFactoryResolver.Provider.CreateLogger(nameof(CqrsTestRuntime));
RegisterHandlersMethod.Invoke(
null,
Expand Down
11 changes: 7 additions & 4 deletions GFramework.Core.Tests/Ioc/MicrosoftDiContainerTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System.Reflection;
using GFramework.Core.Abstractions.Bases;
using GFramework.Core.Abstractions.Cqrs;
using GFramework.Core.Ioc;
using GFramework.Core.Logging;
using GFramework.Core.Tests.Cqrs;
Expand All @@ -14,6 +13,8 @@ namespace GFramework.Core.Tests.Ioc;
[TestFixture]
public class MicrosoftDiContainerTests
{
private MicrosoftDiContainer _container = null!;

/// <summary>
/// 在每个测试方法执行前进行设置
/// </summary>
Expand All @@ -29,9 +30,9 @@ public void SetUp()
BindingFlags.NonPublic | BindingFlags.Instance);
loggerField?.SetValue(_container,
LoggerFactoryResolver.Provider.CreateLogger(nameof(MicrosoftDiContainer)));
}

private MicrosoftDiContainer _container = null!;
CqrsTestRuntime.RegisterInfrastructure(_container);
}

/// <summary>
/// 测试注册单例实例的功能
Expand Down Expand Up @@ -314,7 +315,7 @@ public void Clear_Should_Remove_All_Instances()
[Test]
public void Clear_Should_Reset_Cqrs_Assembly_Deduplication_State()
{
var assembly = typeof(CqrsHandlerRegistrarTests).Assembly;
var assembly = typeof(DeterministicOrderNotification).Assembly;

_container.RegisterCqrsHandlersFromAssembly(assembly);
Assert.That(
Expand All @@ -328,6 +329,8 @@ public void Clear_Should_Reset_Cqrs_Assembly_Deduplication_State()
descriptor.ServiceType == typeof(INotificationHandler<DeterministicOrderNotification>)),
Is.False);

// Clear 会移除测试手工补齐的 CQRS seam,需要先恢复基础设施再验证程序集去重状态是否已重置。
CqrsTestRuntime.RegisterInfrastructure(_container);
_container.RegisterCqrsHandlersFromAssembly(assembly);

Assert.That(
Expand Down
31 changes: 13 additions & 18 deletions GFramework.Core/Architectures/ArchitectureContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,13 @@
using GFramework.Core.Abstractions.Architectures;
using GFramework.Core.Abstractions.Command;
using GFramework.Core.Abstractions.Cqrs;
using GFramework.Core.Abstractions.Cqrs.Command;
using GFramework.Core.Abstractions.Cqrs.Query;
using GFramework.Core.Abstractions.Environment;
using GFramework.Core.Abstractions.Events;
using GFramework.Core.Abstractions.Ioc;
using GFramework.Core.Abstractions.Logging;
using GFramework.Core.Abstractions.Model;
using GFramework.Core.Abstractions.Query;
using GFramework.Core.Abstractions.Systems;
using GFramework.Core.Abstractions.Utility;
using GFramework.Core.Cqrs.Internal;
using GFramework.Core.Logging;
using ICommand = GFramework.Core.Abstractions.Command.ICommand;

namespace GFramework.Core.Architectures;
Expand All @@ -25,15 +20,15 @@ public class ArchitectureContext(IIocContainer container) : IArchitectureContext
{
private readonly IIocContainer _container = container ?? throw new ArgumentNullException(nameof(container));
private readonly ConcurrentDictionary<Type, object> _serviceCache = new();
private readonly ILogger _logger = LoggerFactoryResolver.Provider.CreateLogger(nameof(ArchitectureContext));
private CqrsDispatcher? _cqrsDispatcher;
private ICqrsRuntime? _cqrsRuntime;

#region CQRS Integration

/// <summary>
/// 获取 CQRS 运行时分发器(延迟初始化)。
/// 获取 CQRS runtime seam(延迟初始化)。
/// </summary>
private CqrsDispatcher CqrsDispatcher => _cqrsDispatcher ??= new CqrsDispatcher(_container, this, _logger);
private ICqrsRuntime CqrsRuntime => _cqrsRuntime ??=
_container.Get<ICqrsRuntime>() ?? throw new InvalidOperationException("ICqrsRuntime not registered");

/// <summary>
/// 获取指定类型的服务实例,如果缓存中存在则直接返回,否则从容器中获取并缓存
Expand Down Expand Up @@ -73,7 +68,7 @@ public async ValueTask<TResponse> SendRequestAsync<TResponse>(
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(request);
return await CqrsDispatcher.SendAsync(request, cancellationToken);
return await CqrsRuntime.SendAsync(this, request, cancellationToken);
}

/// <summary>
Expand All @@ -100,7 +95,7 @@ public async ValueTask PublishAsync<TNotification>(
where TNotification : INotification
{
ArgumentNullException.ThrowIfNull(notification);
await CqrsDispatcher.PublishAsync(notification, cancellationToken);
await CqrsRuntime.PublishAsync(this, notification, cancellationToken);
}

/// <summary>
Expand All @@ -115,7 +110,7 @@ public IAsyncEnumerable<TResponse> CreateStream<TResponse>(
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(request);
return CqrsDispatcher.CreateStream(request, cancellationToken);
return CqrsRuntime.CreateStream(this, request, cancellationToken);
}

/// <summary>
Expand Down Expand Up @@ -151,7 +146,7 @@ public async ValueTask<TResponse> SendAsync<TResponse>(
/// <typeparam name="TResult">查询结果类型</typeparam>
/// <param name="query">要发送的查询</param>
/// <returns>查询结果</returns>
public TResult SendQuery<TResult>(Abstractions.Query.IQuery<TResult> query)
public TResult SendQuery<TResult>(IQuery<TResult> query)
{
if (query == null) throw new ArgumentNullException(nameof(query));
var queryBus = GetOrCache<IQueryExecutor>();
Expand All @@ -165,7 +160,7 @@ public TResult SendQuery<TResult>(Abstractions.Query.IQuery<TResult> query)
/// <typeparam name="TResponse">查询响应类型</typeparam>
/// <param name="query">要发送的查询对象</param>
/// <returns>查询结果</returns>
public TResponse SendQuery<TResponse>(GFramework.Core.Abstractions.Cqrs.Query.IQuery<TResponse> query)
public TResponse SendQuery<TResponse>(Abstractions.Cqrs.Query.IQuery<TResponse> query)
{
return SendQueryAsync(query).AsTask().GetAwaiter().GetResult();
}
Expand All @@ -191,7 +186,7 @@ public async Task<TResult> SendQueryAsync<TResult>(IAsyncQuery<TResult> query)
/// <param name="query">要发送的查询对象</param>
/// <param name="cancellationToken">取消令牌,用于取消操作</param>
/// <returns>包含查询结果的ValueTask</returns>
public async ValueTask<TResponse> SendQueryAsync<TResponse>(GFramework.Core.Abstractions.Cqrs.Query.IQuery<TResponse> query,
public async ValueTask<TResponse> SendQueryAsync<TResponse>(Abstractions.Cqrs.Query.IQuery<TResponse> query,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(query);
Expand Down Expand Up @@ -327,7 +322,7 @@ public IReadOnlyList<TUtility> GetUtilitiesByPriority<TUtility>() where TUtility
/// <param name="command">要发送的命令对象</param>
/// <param name="cancellationToken">取消令牌,用于取消操作</param>
/// <returns>包含命令执行结果的ValueTask</returns>
public async ValueTask<TResponse> SendCommandAsync<TResponse>(GFramework.Core.Abstractions.Cqrs.Command.ICommand<TResponse> command,
public async ValueTask<TResponse> SendCommandAsync<TResponse>(Abstractions.Cqrs.Command.ICommand<TResponse> command,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(command);
Expand Down Expand Up @@ -366,7 +361,7 @@ public async Task<TResult> SendCommandAsync<TResult>(IAsyncCommand<TResult> comm
/// <typeparam name="TResponse">命令响应类型</typeparam>
/// <param name="command">要发送的命令对象</param>
/// <returns>命令执行结果</returns>
public TResponse SendCommand<TResponse>(GFramework.Core.Abstractions.Cqrs.Command.ICommand<TResponse> command)
public TResponse SendCommand<TResponse>(Abstractions.Cqrs.Command.ICommand<TResponse> command)
{
return SendCommandAsync(command).AsTask().GetAwaiter().GetResult();
}
Expand All @@ -388,7 +383,7 @@ public void SendCommand(ICommand command)
/// <typeparam name="TResult">命令执行结果类型</typeparam>
/// <param name="command">要发送的命令</param>
/// <returns>命令执行结果</returns>
public TResult SendCommand<TResult>(Abstractions.Command.ICommand<TResult> command)
public TResult SendCommand<TResult>(ICommand<TResult> command)
{
ArgumentNullException.ThrowIfNull(command);
var commandBus = GetOrCache<ICommandExecutor>();
Expand Down
Loading
Loading