This commit is contained in:
Dan Clark
2024-11-25 14:27:17 +00:00
parent f7a96b3784
commit bb9e796600
10 changed files with 96 additions and 122 deletions
@@ -15,21 +15,15 @@ public class AsyncHandlerKeyedInitialization<TMessage, THandler>(string key, ISe
(provider, args) =>
{
IEnumerable<IAsyncHandler<TMessage>> handlers = provider.GetKeyedServices<IAsyncHandler<TMessage>>(key);
IEnumerable<IAsyncPipelineBehavior<TMessage, Unit>> behaviors = provider.GetServices<IAsyncPipelineBehavior<TMessage, Unit>>();
IEnumerable<IAsyncPipelineBehavior<TMessage>> behaviors = provider.GetKeyedServices<IAsyncPipelineBehavior<TMessage>>(key);
foreach (IAsyncHandler<TMessage> handler in handlers)
{
AsyncHandlerDelegate<Unit> handlerDelegate = () =>
handler.Handle(args.Message, args.CancellationToken).ContinueWith(_ => Unit.Value);
Task<Unit> ExecutePipeline(int index) => index < 0
? handler.Handle(args.Message, args.CancellationToken).ContinueWith(_ => Unit.Value)
: behaviors.ElementAt(index).Handle(args.Message, () => ExecutePipeline(index - 1)).ContinueWith(_ => Unit.Value);
foreach (IAsyncPipelineBehavior<TMessage, Unit> behavior in behaviors.Reverse())
{
AsyncHandlerDelegate<Unit> next = handlerDelegate;
handlerDelegate = () => behavior.Handle(args.Message, next);
}
handlerDelegate();
args.Reply(Unit.Value);
args.Reply(ExecutePipeline(behaviors.Count() - 1));
}
});
}
@@ -48,22 +42,17 @@ public class AsyncHandlerKeyedInitialization<TMessage, TResponse, THandler>(stri
(provider, args) =>
{
IEnumerable<IAsyncHandler<TMessage, TResponse>> handlers = provider.GetKeyedServices<IAsyncHandler<TMessage, TResponse>>(key);
IEnumerable<IAsyncPipelineBehavior<TMessage, TResponse>> behaviors = provider.GetServices<IAsyncPipelineBehavior<TMessage, TResponse>>();
IEnumerable<IAsyncPipelineBehavior<TMessage, TResponse>> behaviors = provider.GetKeyedServices<IAsyncPipelineBehavior<TMessage, TResponse>>(key);
foreach (IAsyncHandler<TMessage, TResponse> handler in handlers)
{
AsyncHandlerDelegate<TResponse> handlerDelegate = () =>
handler.Handle(args.Message, args.CancellationToken);
Task<TResponse> ExecutePipeline(int index) => index < 0
? handler.Handle(args.Message, args.CancellationToken)
: behaviors.ElementAt(index).Handle(args.Message, () => ExecutePipeline(index - 1));
foreach (IAsyncPipelineBehavior<TMessage, TResponse> behavior in behaviors.Reverse())
{
AsyncHandlerDelegate<TResponse> next = handlerDelegate;
handlerDelegate = () => behavior.Handle(args.Message, next);
}
args.Reply(handlerDelegate());
args.Reply(ExecutePipeline(behaviors.Count() - 1));
}
});
}
}
}
}