Skip to content
527 changes: 527 additions & 0 deletions GFramework.Cqrs.Benchmarks/Messaging/StreamPipelineBenchmarks.cs

Large diffs are not rendered by default.

42 changes: 35 additions & 7 deletions GFramework.Cqrs.Benchmarks/Messaging/StreamingBenchmarks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
using GFramework.Cqrs.Abstractions.Cqrs;
using MediatR;
using Microsoft.Extensions.DependencyInjection;
using GeneratedMediator = Mediator.Mediator;

[assembly: GFramework.Cqrs.CqrsHandlerRegistryAttribute(
typeof(GFramework.Cqrs.Benchmarks.Messaging.GeneratedDefaultStreamingBenchmarkRegistry))]

namespace GFramework.Cqrs.Benchmarks.Messaging;

/// <summary>
/// 对比单个 stream request 在直接调用、GFramework.CQRS runtime 与 MediatR 之间的 steady-state stream 开销。
/// 对比单个 stream request 在直接调用、GFramework.CQRS runtime、NuGet `Mediator` 与 MediatR 之间的 steady-state stream 开销。
/// </summary>
/// <remarks>
/// 默认 generated-provider stream 宿主同时暴露 <see cref="StreamObservation.FirstItem" /> 与
Expand All @@ -36,8 +37,10 @@ public class StreamingBenchmarks
{
private MicrosoftDiContainer _container = null!;
private ICqrsRuntime _runtime = null!;
private ServiceProvider _serviceProvider = null!;
private ServiceProvider _mediatrServiceProvider = null!;
private ServiceProvider _mediatorServiceProvider = null!;
private IMediator _mediatr = null!;
private GeneratedMediator _mediator = null!;
private BenchmarkStreamHandler _baselineHandler = null!;
private BenchmarkStreamRequest _request = null!;

Expand Down Expand Up @@ -100,25 +103,28 @@ public void Setup()
_container,
LoggerFactoryResolver.Provider.CreateLogger(nameof(StreamingBenchmarks)));

_serviceProvider = BenchmarkHostFactory.CreateMediatRServiceProvider(
_mediatrServiceProvider = BenchmarkHostFactory.CreateMediatRServiceProvider(
configure: null,
typeof(StreamingBenchmarks),
static candidateType => candidateType == typeof(BenchmarkStreamHandler),
ServiceLifetime.Singleton);
_mediatr = _serviceProvider.GetRequiredService<IMediator>();
_mediatr = _mediatrServiceProvider.GetRequiredService<IMediator>();

_mediatorServiceProvider = BenchmarkHostFactory.CreateMediatorServiceProvider(configure: null);
_mediator = _mediatorServiceProvider.GetRequiredService<GeneratedMediator>();

_request = new BenchmarkStreamRequest(Guid.NewGuid(), 3);
}

/// <summary>
/// 释放 MediatR 对照组使用的 DI 宿主。
/// 释放 MediatR 与 `Mediator` 对照组使用的 DI 宿主。
/// </summary>
[GlobalCleanup]
public void Cleanup()
{
try
{
BenchmarkCleanupHelper.DisposeAll(_container, _serviceProvider);
BenchmarkCleanupHelper.DisposeAll(_container, _mediatrServiceProvider, _mediatorServiceProvider);
}
finally
{
Expand Down Expand Up @@ -158,6 +164,16 @@ public ValueTask Stream_MediatR()
return ObserveAsync(_mediatr.CreateStream(_request, CancellationToken.None), Observation);
}

/// <summary>
/// 通过 `ai-libs/Mediator` 的 source-generated concrete mediator 创建 stream,并按当前观测模式消费。
/// </summary>
/// <returns>按当前观测模式完成 stream 消费后的等待句柄。</returns>
[Benchmark]
public ValueTask Stream_Mediator()
{
return ObserveAsync(_mediator.CreateStream(_request, CancellationToken.None), Observation);
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

/// <summary>
/// 按观测模式消费 stream,便于把“建流/首个元素”和“完整枚举”分开观察。
/// </summary>
Expand Down Expand Up @@ -224,6 +240,7 @@ private static async ValueTask DrainAsync<TResponse>(IAsyncEnumerable<TResponse>
/// <param name="ItemCount">返回元素数量。</param>
public sealed record BenchmarkStreamRequest(Guid Id, int ItemCount) :
GFramework.Cqrs.Abstractions.Cqrs.IStreamRequest<BenchmarkResponse>,
Mediator.IStreamRequest<BenchmarkResponse>,
MediatR.IStreamRequest<BenchmarkResponse>;

/// <summary>
Expand All @@ -233,10 +250,11 @@ public sealed record BenchmarkStreamRequest(Guid Id, int ItemCount) :
public sealed record BenchmarkResponse(Guid Id);

/// <summary>
/// 同时实现 GFramework.CQRS 与 MediatR 契约的最小 stream handler。
/// 同时实现 GFramework.CQRS、NuGet `Mediator` 与 MediatR 契约的最小 stream handler。
/// </summary>
public sealed class BenchmarkStreamHandler :
GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler<BenchmarkStreamRequest, BenchmarkResponse>,
Mediator.IStreamRequestHandler<BenchmarkStreamRequest, BenchmarkResponse>,
MediatR.IStreamRequestHandler<BenchmarkStreamRequest, BenchmarkResponse>
{
/// <summary>
Expand All @@ -249,6 +267,16 @@ public IAsyncEnumerable<BenchmarkResponse> Handle(
return EnumerateAsync(request, cancellationToken);
}

/// <summary>
/// 处理 NuGet `Mediator` stream request。
/// </summary>
IAsyncEnumerable<BenchmarkResponse> Mediator.IStreamRequestHandler<BenchmarkStreamRequest, BenchmarkResponse>.Handle(
BenchmarkStreamRequest request,
CancellationToken cancellationToken)
{
return Handle(request, cancellationToken);
}

/// <summary>
/// 处理 MediatR stream request。
/// </summary>
Expand Down
11 changes: 7 additions & 4 deletions GFramework.Cqrs.Benchmarks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,17 @@
- `Initialization` 与 `ColdStart` 两组下,`GFramework.Cqrs`、NuGet `Mediator`、`MediatR`
- stream steady-state
- `Messaging/StreamingBenchmarks.cs`
- baseline、默认 generated-provider 宿主接线的 `GFramework.Cqrs` runtime 与 `MediatR`
- baseline、默认 generated-provider 宿主接线的 `GFramework.Cqrs` runtime、NuGet `Mediator` source-generated concrete path 与 `MediatR`
- 同时提供 `FirstItem` 与 `DrainAll` 两种观测口径
- `Messaging/StreamLifetimeBenchmarks.cs`
- `Singleton / Scoped / Transient` 三类 handler 生命周期下,baseline、`GFramework.Cqrs` reflection stream binding、`GFramework.Cqrs` generated stream registry、`MediatR`
- 同时提供 `FirstItem` 与 `DrainAll` 两种观测口径
- `Messaging/StreamInvokerBenchmarks.cs`
- baseline、`GFramework.Cqrs` reflection stream binding、`GFramework.Cqrs` generated stream invoker、`MediatR`
- 同时提供 `FirstItem` 与 `DrainAll` 两种观测口径
- `Messaging/StreamPipelineBenchmarks.cs`
- `0 / 1 / 4` 个 stream pipeline 行为下,baseline、默认 generated-provider 宿主接线的 `GFramework.Cqrs` runtime 与 `MediatR`
- 同时提供 `FirstItem` 与 `DrainAll` 两种观测口径
- stream startup
- `Messaging/StreamStartupBenchmarks.cs`
- `Initialization` 与 `ColdStart` 两组下,`GFramework.Cqrs` reflection、`GFramework.Cqrs` generated、`MediatR`
Expand Down Expand Up @@ -68,6 +71,7 @@ dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.cspro
```bash
dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release --no-build -- --filter "*RequestLifetimeBenchmarks.SendRequest_*"
dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release --no-build -- --filter "*StreamLifetimeBenchmarks.Stream_*"
dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release --no-build -- --filter "*StreamPipelineBenchmarks.Stream_*"
```

## 并发运行约束
Expand All @@ -87,7 +91,7 @@ dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.cspro

- `RequestLifetimeBenchmarks` 的 `Scoped` 场景会在每次 request 分发时显式创建并释放真实 DI 作用域;它观察的是 scoped handler 的解析与 dispatch 成本,不把 runtime 构造常量成本混入生命周期对照
- `NotificationLifetimeBenchmarks` 的 `Scoped` 场景也采用真实 DI 作用域;它比较的是 publish 路径上的生命周期额外开销,不是根容器解析退化后的近似值
- `StreamingBenchmarks`、`StreamLifetimeBenchmarks`、`StreamInvokerBenchmarks` 同时暴露 `FirstItem` 与 `DrainAll`
- `StreamingBenchmarks`、`StreamLifetimeBenchmarks`、`StreamInvokerBenchmarks`、`StreamPipelineBenchmarks` 同时暴露 `FirstItem` 与 `DrainAll`
- `FirstItem` 适合观察“建流到首个元素”的固定成本
- `DrainAll` 适合观察完整枚举整个 stream 的总成本
- `StreamStartupBenchmarks` 的 `ColdStart` 只推进到首个元素,因此它回答的是“新宿主下首次建流命中”的边界,不回答完整枚举总成本
Expand All @@ -96,7 +100,6 @@ dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.cspro

## 当前缺口

- 当前没有 stream 版的 NuGet `Mediator` source-generated concrete path 对照;stream steady-state、lifetime、startup 现在都只覆盖 `GFramework.Cqrs` 与 `MediatR`
- 当前没有 stream 生命周期与 startup 版的 NuGet `Mediator` source-generated concrete path 对照;`StreamLifetimeBenchmarks` 与 `StreamStartupBenchmarks` 现在都只覆盖 `GFramework.Cqrs` 与 `MediatR`
- 当前没有 request 生命周期下的 NuGet `Mediator` compile-time lifetime 矩阵;`RequestLifetimeBenchmarks` 只覆盖 `GFramework.Cqrs` 与 `MediatR`
- 当前没有 notification fan-out 的生命周期矩阵;`NotificationFanOutBenchmarks` 只覆盖固定 `4 handler` 的已装配宿主
- 当前没有 stream pipeline benchmark;现有 pipeline coverage 仅限 request
131 changes: 131 additions & 0 deletions GFramework.Cqrs.Tests/Cqrs/CqrsHandlerRegistrarFallbackFailureTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,68 @@ public void RegisterHandlers_Should_Skip_Cross_Assembly_Direct_Fallback_Type_And
});
}

/// <summary>
/// 验证当 generated registry 是抽象类型时,registrar 会记录告警并回退到反射扫描。
/// </summary>
[Test]
public void RegisterHandlers_Should_Fall_Back_To_Reflection_When_Generated_Registry_Is_Abstract()
{
var generatedAssembly = CreateGeneratedRegistryAssembly(
"GFramework.Cqrs.Tests.Cqrs.AbstractGeneratedRegistryAssembly, Version=1.0.0.0",
typeof(AbstractGeneratedNotificationHandlerRegistry));
generatedAssembly
.Setup(static assembly => assembly.GetTypes())
.Returns([typeof(GeneratedRegistryNotificationHandler)]);

var container = new MicrosoftDiContainer();
CqrsTestRuntime.RegisterHandlers(container, generatedAssembly.Object);

Assert.Multiple(() =>
{
Assert.That(
GetGeneratedRegistryNotificationHandlerTypes(container),
Is.EqualTo([typeof(GeneratedRegistryNotificationHandler)]));
Assert.That(
GetWarningLogs().Any(log =>
log.Message.Contains("because it is abstract", StringComparison.Ordinal)),
Is.True);
});

generatedAssembly.Verify(static assembly => assembly.GetTypes(), Times.Once);
}

/// <summary>
/// 验证当 generated registry 不暴露可访问无参构造器时,registrar 会记录告警并回退到反射扫描。
/// </summary>
[Test]
public void RegisterHandlers_Should_Fall_Back_To_Reflection_When_Generated_Registry_Has_No_Parameterless_Constructor()
{
var generatedAssembly = CreateGeneratedRegistryAssembly(
"GFramework.Cqrs.Tests.Cqrs.NoParameterlessGeneratedRegistryAssembly, Version=1.0.0.0",
typeof(ConstructorArgumentNotificationHandlerRegistry));
generatedAssembly
.Setup(static assembly => assembly.GetTypes())
.Returns([typeof(GeneratedRegistryNotificationHandler)]);

var container = new MicrosoftDiContainer();
CqrsTestRuntime.RegisterHandlers(container, generatedAssembly.Object);

Assert.Multiple(() =>
{
Assert.That(
GetGeneratedRegistryNotificationHandlerTypes(container),
Is.EqualTo([typeof(GeneratedRegistryNotificationHandler)]));
Assert.That(
GetWarningLogs().Any(log =>
log.Message.Contains(
"does not expose an accessible parameterless constructor",
StringComparison.Ordinal)),
Is.True);
});

generatedAssembly.Verify(static assembly => assembly.GetTypes(), Times.Once);
}

/// <summary>
/// 创建一个仅通过 generated registry 注册主 handler、并附带指定 fallback 元数据的程序集替身。
/// </summary>
Expand All @@ -161,6 +223,24 @@ private static Mock<Assembly> CreateGeneratedFallbackAssembly(
return generatedAssembly;
}

/// <summary>
/// 创建一个只声明 generated registry attribute 的程序集替身,用于验证 registry 激活失败后的回退行为。
/// </summary>
/// <param name="assemblyName">用于日志与缓存键的程序集名。</param>
/// <param name="registryType">要暴露给 registrar 的 generated registry 类型。</param>
/// <returns>已完成基础接线的程序集 mock。</returns>
private static Mock<Assembly> CreateGeneratedRegistryAssembly(string assemblyName, Type registryType)
{
var generatedAssembly = new Mock<Assembly>();
generatedAssembly
.SetupGet(static assembly => assembly.FullName)
.Returns(assemblyName);
generatedAssembly
.Setup(static assembly => assembly.GetCustomAttributes(typeof(CqrsHandlerRegistryAttribute), false))
.Returns([new CqrsHandlerRegistryAttribute(registryType)]);
return generatedAssembly;
}

/// <summary>
/// 提取容器中针对 generated notification 注册的处理器实现类型。
/// </summary>
Expand Down Expand Up @@ -259,4 +339,55 @@ private static Type GetRegistrarType()
.Where(static log => log.Level == LogLevel.Warning)
.ToArray();
}

/// <summary>
/// 模拟 generated registry 被错误声明为抽象类型时的激活失败场景。
/// </summary>
private abstract class AbstractGeneratedNotificationHandlerRegistry : ICqrsHandlerRegistry
{
/// <summary>
/// 抽象 registry 即便具备注册逻辑,也不应被运行时实例化。
/// </summary>
/// <param name="services">承载处理器映射的服务集合。</param>
/// <param name="logger">记录注册诊断的日志器。</param>
public void Register(IServiceCollection services, ILogger logger)
{
ArgumentNullException.ThrowIfNull(services);
ArgumentNullException.ThrowIfNull(logger);

services.AddTransient(
typeof(INotificationHandler<GeneratedRegistryNotification>),
typeof(GeneratedRegistryNotificationHandler));
}
}

/// <summary>
/// 模拟 generated registry 缺少可访问无参构造器时的激活失败场景。
/// </summary>
private sealed class ConstructorArgumentNotificationHandlerRegistry : ICqrsHandlerRegistry
{
/// <summary>
/// 初始化一个只能通过额外参数构造的测试 registry。
/// </summary>
/// <param name="marker">用于区分测试场景的占位参数。</param>
public ConstructorArgumentNotificationHandlerRegistry(string marker)
{
ArgumentNullException.ThrowIfNull(marker);
}

/// <summary>
/// 此实现仅用于满足接口契约;本用例关注的是实例化失败前的回退行为。
/// </summary>
/// <param name="services">承载处理器映射的服务集合。</param>
/// <param name="logger">记录注册诊断的日志器。</param>
public void Register(IServiceCollection services, ILogger logger)
{
ArgumentNullException.ThrowIfNull(services);
ArgumentNullException.ThrowIfNull(logger);

services.AddTransient(
typeof(INotificationHandler<GeneratedRegistryNotification>),
typeof(GeneratedRegistryNotificationHandler));
}
}
}
Loading
Loading