Pipelines

This commit is contained in:
Dan Clark
2024-11-25 09:44:12 +00:00
parent 93e2be4eb0
commit d1ee8add0c
9 changed files with 197 additions and 36 deletions
@@ -0,0 +1,3 @@
namespace Toolkit.Foundation;
public delegate Task<TResponse> AsyncHandlerDelegate<TResponse>();
@@ -5,19 +5,35 @@ namespace Toolkit.Foundation;
public class AsyncHandlerInitialization<TMessage, TResponse, THandler>(IServiceProvider provider) : public class AsyncHandlerInitialization<TMessage, TResponse, THandler>(IServiceProvider provider) :
IInitialization where THandler : class, IAsyncHandler<TMessage, TResponse> IInitialization where THandler : class, IAsyncHandler<TMessage, TResponse>
where TMessage : class where TMessage : class
{ {
public void Initialize() public void Initialize()
{ {
if (!StrongReferenceMessenger.Default.IsRegistered<AsyncResponseEventArgs<TMessage, TResponse>>(provider)) if (!StrongReferenceMessenger.Default.IsRegistered<AsyncResponseEventArgs<TMessage, TResponse>>(provider))
{ {
StrongReferenceMessenger.Default.Register<IServiceProvider, AsyncResponseEventArgs<TMessage, TResponse>>(provider, StrongReferenceMessenger.Default.Register<IServiceProvider, AsyncResponseEventArgs<TMessage, TResponse>>(provider,
(provider, args) => async (provider, args) =>
{ {
foreach (IAsyncHandler<TMessage, TResponse> handler in provider.GetServices<IAsyncHandler<TMessage, TResponse>>()) IEnumerable<IAsyncHandler<TMessage, TResponse>> handlers = provider.GetServices<IAsyncHandler<TMessage, TResponse>>();
IEnumerable<IAsyncPipelineBehavior<TMessage, TResponse>> behaviors = provider.GetServices<IAsyncPipelineBehavior<TMessage, TResponse>>();
AsyncHandlerDelegate<TResponse> handlerDelegate = async () =>
{ {
args.Reply(handler.Handle(args.Message, args.CancellationToken)); TResponse response = default!;
foreach (IAsyncHandler<TMessage, TResponse> handler in handlers)
{
response = await handler.Handle(args.Message, args.CancellationToken);
}
return response;
};
foreach (IAsyncPipelineBehavior<TMessage, TResponse> behavior in behaviors.Reverse())
{
AsyncHandlerDelegate<TResponse> next = handlerDelegate;
handlerDelegate = () => behavior.Handle(args.Message, next);
} }
args.Reply(await handlerDelegate());
}); });
} }
} }
@@ -25,20 +41,35 @@ public class AsyncHandlerInitialization<TMessage, TResponse, THandler>(IServiceP
public class AsyncHandlerInitialization<TMessage, THandler>(IServiceProvider provider) : public class AsyncHandlerInitialization<TMessage, THandler>(IServiceProvider provider) :
IInitialization where THandler : class, IAsyncHandler<TMessage> IInitialization where THandler : class, IAsyncHandler<TMessage>
where TMessage : class where TMessage : class
{ {
public void Initialize() public void Initialize()
{ {
if (!StrongReferenceMessenger.Default.IsRegistered<AsyncResponseEventArgs<TMessage, Unit>>(provider)) if (!StrongReferenceMessenger.Default.IsRegistered<AsyncResponseEventArgs<TMessage, Unit>>(provider))
{ {
StrongReferenceMessenger.Default.Register<IServiceProvider, AsyncResponseEventArgs<TMessage, Unit>>(provider, StrongReferenceMessenger.Default.Register<IServiceProvider, AsyncResponseEventArgs<TMessage, Unit>>(provider,
(provider, args) => async (provider, args) =>
{ {
foreach (IAsyncHandler<TMessage> handler in provider.GetServices<IAsyncHandler<TMessage>>()) IEnumerable<IAsyncHandler<TMessage>> handlers = provider.GetServices<IAsyncHandler<TMessage>>();
IEnumerable<IAsyncPipelineBehavior<TMessage, Unit>> behaviors = provider.GetServices<IAsyncPipelineBehavior<TMessage, Unit>>();
AsyncHandlerDelegate<Unit> handlerDelegate = async () =>
{ {
handler.Handle(args.Message, args.CancellationToken); foreach (IAsyncHandler<TMessage> handler in handlers)
args.Reply(Unit.Value); {
await handler.Handle(args.Message, args.CancellationToken);
}
return Unit.Value;
};
foreach (IAsyncPipelineBehavior<TMessage, Unit> behavior in behaviors.Reverse())
{
AsyncHandlerDelegate<Unit> next = handlerDelegate;
handlerDelegate = () => behavior.Handle(args.Message, next);
} }
await handlerDelegate();
args.Reply(Unit.Value);
}); });
} }
} }
@@ -5,20 +5,35 @@ namespace Toolkit.Foundation;
public class AsyncHandlerKeyedInitialization<TMessage, THandler>(string key, IServiceProvider provider) : public class AsyncHandlerKeyedInitialization<TMessage, THandler>(string key, IServiceProvider provider) :
IInitialization where THandler : class, IAsyncHandler<TMessage> IInitialization where THandler : class, IAsyncHandler<TMessage>
where TMessage : class where TMessage : class
{ {
public void Initialize() public void Initialize()
{ {
if (!StrongReferenceMessenger.Default.IsRegistered<AsyncResponseEventArgs<TMessage, Unit>, string>(provider, key)) if (!StrongReferenceMessenger.Default.IsRegistered<AsyncResponseEventArgs<TMessage, Unit>, string>(provider, key))
{ {
StrongReferenceMessenger.Default.Register<IServiceProvider, AsyncResponseEventArgs<TMessage, Unit>, string>(provider, key, StrongReferenceMessenger.Default.Register<IServiceProvider, AsyncResponseEventArgs<TMessage, Unit>, string>(provider, key,
(provider, args) => async (provider, args) =>
{ {
foreach (IAsyncHandler<TMessage> handler in provider.GetKeyedServices<IAsyncHandler<TMessage>>(key)) IEnumerable<IAsyncHandler<TMessage>> handlers = provider.GetKeyedServices<IAsyncHandler<TMessage>>(key);
IEnumerable<IAsyncPipelineBehavior<TMessage, Unit>> behaviors = provider.GetServices<IAsyncPipelineBehavior<TMessage, Unit>>();
AsyncHandlerDelegate<Unit> handlerDelegate = async () =>
{ {
handler.Handle(args.Message, args.CancellationToken); foreach (IAsyncHandler<TMessage> handler in handlers)
args.Reply(Unit.Value); {
await handler.Handle(args.Message, args.CancellationToken);
}
return Unit.Value;
};
foreach (IAsyncPipelineBehavior<TMessage, Unit> behavior in behaviors.Reverse())
{
AsyncHandlerDelegate<Unit> next = handlerDelegate;
handlerDelegate = () => behavior.Handle(args.Message, next);
} }
await handlerDelegate();
args.Reply(Unit.Value);
}); });
} }
} }
@@ -26,20 +41,36 @@ public class AsyncHandlerKeyedInitialization<TMessage, THandler>(string key, ISe
public class AsyncHandlerKeyedInitialization<TMessage, TResponse, THandler>(string key, IServiceProvider provider) : public class AsyncHandlerKeyedInitialization<TMessage, TResponse, THandler>(string key, IServiceProvider provider) :
IInitialization where THandler : class, IAsyncHandler<TMessage, TResponse> IInitialization where THandler : class, IAsyncHandler<TMessage, TResponse>
where TMessage : class where TMessage : class
{ {
public void Initialize() public void Initialize()
{ {
if (!StrongReferenceMessenger.Default.IsRegistered<AsyncResponseEventArgs<TMessage, TResponse>, string>(provider, key)) if (!StrongReferenceMessenger.Default.IsRegistered<AsyncResponseEventArgs<TMessage, TResponse>, string>(provider, key))
{ {
StrongReferenceMessenger.Default.Register<IServiceProvider, AsyncResponseEventArgs<TMessage, TResponse>, string>(provider, key, StrongReferenceMessenger.Default.Register<IServiceProvider, AsyncResponseEventArgs<TMessage, TResponse>, string>(provider, key,
(provider, args) => async (provider, args) =>
{ {
foreach (IAsyncHandler<TMessage, TResponse> handler in provider.GetKeyedServices<IAsyncHandler<TMessage, TResponse>>(key)) IEnumerable<IAsyncHandler<TMessage, TResponse>> handlers = provider.GetKeyedServices<IAsyncHandler<TMessage, TResponse>>(key);
IEnumerable<IAsyncPipelineBehavior<TMessage, TResponse>> behaviors = provider.GetServices<IAsyncPipelineBehavior<TMessage, TResponse>>();
AsyncHandlerDelegate<TResponse> handlerDelegate = async () =>
{ {
args.Reply(handler.Handle(args.Message, args.CancellationToken)); TResponse response = default!;
foreach (IAsyncHandler<TMessage, TResponse> handler in handlers)
{
response = await handler.Handle(args.Message, args.CancellationToken);
}
return response;
};
foreach (IAsyncPipelineBehavior<TMessage, TResponse> behavior in behaviors.Reverse())
{
AsyncHandlerDelegate<TResponse> next = handlerDelegate;
handlerDelegate = () => behavior.Handle(args.Message, next);
} }
args.Reply(await handlerDelegate());
}); });
} }
} }
} }
+1 -2
View File
@@ -1,4 +1,3 @@
namespace Toolkit.Foundation; namespace Toolkit.Foundation;
public delegate Task<TResponse> HandlerDelegate<TRequest, TResponse>(TRequest request, public delegate TResponse HandlerDelegate<TResponse>();
CancellationToken cancellationToken);
+38 -6
View File
@@ -5,7 +5,7 @@ namespace Toolkit.Foundation;
public class HandlerInitialization<TMessage, TResponse, THandler>(IServiceProvider provider) : public class HandlerInitialization<TMessage, TResponse, THandler>(IServiceProvider provider) :
IInitialization where THandler : class, IHandler<TMessage, TResponse> IInitialization where THandler : class, IHandler<TMessage, TResponse>
where TMessage : class where TMessage : class
{ {
public void Initialize() public void Initialize()
{ {
@@ -14,10 +14,26 @@ public class HandlerInitialization<TMessage, TResponse, THandler>(IServiceProvid
StrongReferenceMessenger.Default.Register<IServiceProvider, ResponseEventArgs<TMessage, TResponse>>(provider, StrongReferenceMessenger.Default.Register<IServiceProvider, ResponseEventArgs<TMessage, TResponse>>(provider,
(provider, args) => (provider, args) =>
{ {
foreach (IHandler<TMessage, TResponse> handler in provider.GetServices<IHandler<TMessage, TResponse>>()) IEnumerable<IHandler<TMessage, TResponse>> handlers = provider.GetServices<IHandler<TMessage, TResponse>>();
IEnumerable<IPipelineBehavior<TMessage, TResponse>> behaviors = provider.GetServices<IPipelineBehavior<TMessage, TResponse>>();
HandlerDelegate<TResponse> handlerDelegate = () =>
{ {
handler.Handle(args.Message); TResponse response = default!;
foreach (IHandler<TMessage, TResponse> handler in handlers)
{
response = handler.Handle(args.Message);
}
return response;
};
foreach (IPipelineBehavior<TMessage, TResponse> behavior in behaviors.Reverse())
{
HandlerDelegate<TResponse> next = handlerDelegate;
handlerDelegate = () => behavior.Handle(args.Message, next);
} }
handlerDelegate();
}); });
} }
} }
@@ -25,7 +41,7 @@ public class HandlerInitialization<TMessage, TResponse, THandler>(IServiceProvid
public class HandlerInitialization<TMessage, THandler>(IServiceProvider provider) : public class HandlerInitialization<TMessage, THandler>(IServiceProvider provider) :
IInitialization where THandler : class, IHandler<TMessage> IInitialization where THandler : class, IHandler<TMessage>
where TMessage : class where TMessage : class
{ {
public void Initialize() public void Initialize()
{ {
@@ -34,11 +50,27 @@ public class HandlerInitialization<TMessage, THandler>(IServiceProvider provider
StrongReferenceMessenger.Default.Register<IServiceProvider, TMessage>(provider, StrongReferenceMessenger.Default.Register<IServiceProvider, TMessage>(provider,
(provider, args) => (provider, args) =>
{ {
foreach (IHandler<TMessage> handler in provider.GetServices<IHandler<TMessage>>()) IEnumerable<IHandler<TMessage>> handlers = provider.GetServices<IHandler<TMessage>>();
IEnumerable<IPipelineBehavior<TMessage, Unit>> behaviors = provider.GetServices<IPipelineBehavior<TMessage, Unit>>();
HandlerDelegate<Unit> handlerDelegate = () =>
{ {
handler.Handle(args); foreach (IHandler<TMessage> handler in handlers)
{
handler.Handle(args);
}
return Unit.Value;
};
foreach (IPipelineBehavior<TMessage, Unit> behavior in behaviors.Reverse())
{
HandlerDelegate<Unit> next = handlerDelegate;
handlerDelegate = () => behavior.Handle(args, next);
} }
handlerDelegate();
}); });
} }
} }
} }
@@ -5,7 +5,7 @@ namespace Toolkit.Foundation;
public class HandlerKeyedInitialization<TMessage, THandler>(string key, IServiceProvider provider) : public class HandlerKeyedInitialization<TMessage, THandler>(string key, IServiceProvider provider) :
IInitialization where THandler : class, IHandler<TMessage> IInitialization where THandler : class, IHandler<TMessage>
where TMessage : class where TMessage : class
{ {
public void Initialize() public void Initialize()
{ {
@@ -14,18 +14,34 @@ public class HandlerKeyedInitialization<TMessage, THandler>(string key, IService
StrongReferenceMessenger.Default.Register<IServiceProvider, TMessage, string>(provider, key, StrongReferenceMessenger.Default.Register<IServiceProvider, TMessage, string>(provider, key,
(provider, args) => (provider, args) =>
{ {
foreach (IHandler<TMessage> handler in provider.GetKeyedServices<IHandler<TMessage>>(key)) IEnumerable<IHandler<TMessage>> handlers = provider.GetKeyedServices<IHandler<TMessage>>(key);
IEnumerable<IPipelineBehavior<TMessage, Unit>> behaviors = provider.GetServices<IPipelineBehavior<TMessage, Unit>>();
HandlerDelegate<Unit> handlerDelegate = () =>
{ {
handler.Handle(args); foreach (IHandler<TMessage> handler in handlers)
{
handler.Handle(args);
}
return Unit.Value;
};
foreach (IPipelineBehavior<TMessage, Unit> behavior in behaviors.Reverse())
{
HandlerDelegate<Unit> next = handlerDelegate;
handlerDelegate = () => behavior.Handle(args, next);
} }
handlerDelegate();
}); });
} }
} }
} }
public class HandlerKeyedInitialization<TMessage, TResponse, THandler>(string key, IServiceProvider provider) : public class HandlerKeyedInitialization<TMessage, TResponse, THandler>(string key, IServiceProvider provider) :
IInitialization where THandler : class, IHandler<TMessage, TResponse> IInitialization where THandler : class, IHandler<TMessage, TResponse>
where TMessage : class where TMessage : class
{ {
public void Initialize() public void Initialize()
{ {
@@ -34,11 +50,27 @@ public class HandlerKeyedInitialization<TMessage, TResponse, THandler>(string ke
StrongReferenceMessenger.Default.Register<IServiceProvider, ResponseEventArgs<TMessage, TResponse>, string>(provider, key, StrongReferenceMessenger.Default.Register<IServiceProvider, ResponseEventArgs<TMessage, TResponse>, string>(provider, key,
(provider, args) => (provider, args) =>
{ {
foreach (IHandler<TMessage, TResponse> handler in provider.GetKeyedServices<IHandler<TMessage, TResponse>>(key)) IEnumerable<IHandler<TMessage, TResponse>> handlers = provider.GetKeyedServices<IHandler<TMessage, TResponse>>(key);
IEnumerable<IPipelineBehavior<TMessage, TResponse>> behaviors = provider.GetServices<IPipelineBehavior<TMessage, TResponse>>();
HandlerDelegate<TResponse> handlerDelegate = () =>
{ {
handler.Handle(args.Message); TResponse response = default!;
foreach (IHandler<TMessage, TResponse> handler in handlers)
{
response = handler.Handle(args.Message);
}
return response;
};
foreach (IPipelineBehavior<TMessage, TResponse> behavior in behaviors.Reverse())
{
HandlerDelegate<TResponse> next = handlerDelegate;
handlerDelegate = () => behavior.Handle(args.Message, next);
} }
handlerDelegate();
}); });
} }
} }
} }
@@ -0,0 +1,8 @@
namespace Toolkit.Foundation;
public interface IAsyncPipelineBehavior<TMessage,
TResponse>
{
Task<TResponse> Handle(TMessage message,
AsyncHandlerDelegate<TResponse> next);
}
+9
View File
@@ -0,0 +1,9 @@
namespace Toolkit.Foundation;
public interface IPipelineBehavior<TMessage,
TResponse>
{
TResponse Handle(TMessage message,
HandlerDelegate<TResponse> next);
}
@@ -124,7 +124,7 @@ public static class IServiceCollectionExtensions
where THandler : class, IHandler<TMessage> where THandler : class, IHandler<TMessage>
where TMessage : class where TMessage : class
{ {
if (key is { Length: > 0}) if (key is { Length: > 0 })
{ {
services.Add(new ServiceDescriptor(typeof(IHandler<TMessage>), key, typeof(THandler), lifetime)); services.Add(new ServiceDescriptor(typeof(IHandler<TMessage>), key, typeof(THandler), lifetime));
services.AddInitialization<HandlerKeyedInitialization<TMessage, IHandler<TMessage>>>(key); services.AddInitialization<HandlerKeyedInitialization<TMessage, IHandler<TMessage>>>(key);
@@ -158,7 +158,7 @@ public static class IServiceCollectionExtensions
return services; return services;
} }
public static IServiceCollection AddInitialization<TInitialization>(this IServiceCollection services, public static IServiceCollection AddInitialization<TInitialization>(this IServiceCollection services,
params object[] parameters) params object[] parameters)
where TInitialization : class, where TInitialization : class,
IInitialization IInitialization
@@ -167,6 +167,22 @@ public static class IServiceCollectionExtensions
return services; return services;
} }
public static IServiceCollection AddAsyncPipelineBehavior<TMessage, TResponse, TBehavior>(this IServiceCollection services)
where TBehavior : class,
IAsyncPipelineBehavior<TMessage, TResponse>
{
services.AddTransient<IAsyncPipelineBehavior<TMessage, TResponse>, TBehavior>();
return services;
}
public static IServiceCollection AddPipelineBehavior<TMessage, TResponse, TBehavior>(this IServiceCollection services)
where TBehavior : class,
IPipelineBehavior<TMessage, TResponse>
{
services.AddTransient<IPipelineBehavior<TMessage, TResponse>, TBehavior>();
return services;
}
public static IServiceCollection AddRange(this IServiceCollection services, public static IServiceCollection AddRange(this IServiceCollection services,
IServiceCollection fromServices) IServiceCollection fromServices)
{ {