Added DirectoryObserver

This commit is contained in:
TheXamlGuy
2024-09-29 13:10:39 +01:00
parent dc1afd5750
commit 760ce933bd
5 changed files with 96 additions and 49 deletions
+54
View File
@@ -0,0 +1,54 @@
using System.Collections.Concurrent;
namespace Toolkit.Foundation;
public class DirectoryObserver
{
public static async Task<string[]> EnumerateFiles(string path,
string[] filter,
int count,
CancellationToken cancellationToken = default)
{
string[] files = [];
HashSet<string> extensions = filter.Select(x => $".{x.ToLower()}").ToHashSet();
bool IsBatchComplete()
{
files = Directory.EnumerateFiles(path, "*.*", SearchOption.TopDirectoryOnly)
.Where(x => extensions.Contains(Path.GetExtension(x).ToLower()))
.ToArray();
if (files.Length != count)
{
return false;
}
ConcurrentBag<bool> fileAccessResults = [];
Parallel.ForEach(files, (file) =>
{
try
{
using FileStream fileStream = new(file, FileMode.Open, FileAccess.Read, FileShare.None);
fileAccessResults.Add(true);
}
catch (IOException)
{
fileAccessResults.Add(false);
}
});
return fileAccessResults.All(result => result);
}
await Task.Run(async () =>
{
while (!IsBatchComplete())
{
cancellationToken.ThrowIfCancellationRequested();
await Task.Delay(5000, cancellationToken);
}
}, cancellationToken);
return files;
}
}
+2 -2
View File
@@ -12,12 +12,12 @@ public interface IMediator
CancellationToken cancellationToken = default)
where TMessage : notnull;
IAsyncEnumerable<object?> HandleManyAsync(Type responseType,
IAsyncEnumerable<object?> HandleAsyncMany(Type responseType,
object message,
object? key = null,
CancellationToken cancellationToken = default);
IAsyncEnumerable<TResponse?> HandleManyAsync<TMessage, TResponse>(TMessage message,
IAsyncEnumerable<TResponse?> HandleAsyncMany<TMessage, TResponse>(TMessage message,
object? key = null,
CancellationToken cancellationToken = default)
where TMessage : notnull;
+38 -36
View File
@@ -23,7 +23,7 @@ public class Mediator(IHandlerProvider handlerProvider,
MethodInfo? handleMethod = handler?.GetType().GetMethod("Handle", [message.GetType(), typeof(CancellationToken)]);
if (handleMethod is not null)
{
return await (Task<TResponse?>)handleMethod.Invoke(handler, new object[] { message, cancellationToken })!;
return await (Task<TResponse?>)handleMethod.Invoke(handler, [message, cancellationToken])!;
}
}
@@ -47,7 +47,7 @@ public class Mediator(IHandlerProvider handlerProvider,
if (handleMethod is not null)
{
dynamic task = handleMethod.Invoke(handler, new object[] { message, cancellationToken })!;
dynamic task = handleMethod.Invoke(handler, [message, cancellationToken])!;
await task;
return task.Result;
@@ -57,35 +57,7 @@ public class Mediator(IHandlerProvider handlerProvider,
return default;
}
public async Task<List<object?>> HandleMany(Type responseType,
object message,
object? key = null,
CancellationToken cancellationToken = default)
{
List<object?> responses = [];
await foreach (object? response in HandleManyAsync(responseType, message, key, cancellationToken))
{
responses.Add(response);
}
return responses;
}
public async Task<IList<TResponse?>> HandleMany<TMessage, TResponse>(TMessage message,
object? key = null,
CancellationToken cancellationToken = default)
where TMessage : notnull
{
List<TResponse?> responses = [];
await foreach (TResponse? response in HandleManyAsync<TMessage, TResponse>(message, key, cancellationToken))
{
responses.Add(response);
}
return responses;
}
public async IAsyncEnumerable<object?> HandleManyAsync(Type responseType,
public async IAsyncEnumerable<object?> HandleAsyncMany(Type responseType,
object message,
object? key = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
@@ -102,12 +74,12 @@ public class Mediator(IHandlerProvider handlerProvider,
if (handleMethod is not null)
{
yield return await (Task<object?>)handleMethod.Invoke(handler, new object[] { message, cancellationToken })!;
yield return await (Task<object?>)handleMethod.Invoke(handler, [message, cancellationToken])!;
}
}
}
public async IAsyncEnumerable<TResponse?> HandleManyAsync<TMessage, TResponse>(TMessage message,
public async IAsyncEnumerable<TResponse?> HandleAsyncMany<TMessage, TResponse>(TMessage message,
object? key = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
where TMessage : notnull
@@ -122,11 +94,38 @@ public class Mediator(IHandlerProvider handlerProvider,
MethodInfo? handleMethod = handler?.GetType().GetMethod("Handle", [message.GetType(), typeof(CancellationToken)]);
if (handleMethod is not null)
{
yield return await (Task<TResponse?>)handleMethod.Invoke(handler, new object[] { message, cancellationToken })!;
yield return await (Task<TResponse?>)handleMethod.Invoke(handler, [message, cancellationToken])!;
}
}
}
public async Task<List<object?>> HandleMany(Type responseType,
object message,
object? key = null,
CancellationToken cancellationToken = default)
{
List<object?> responses = [];
await foreach (object? response in HandleAsyncMany(responseType, message, key, cancellationToken))
{
responses.Add(response);
}
return responses;
}
public async Task<IList<TResponse?>> HandleMany<TMessage, TResponse>(TMessage message,
object? key = null,
CancellationToken cancellationToken = default)
where TMessage : notnull
{
List<TResponse?> responses = [];
await foreach (TResponse? response in HandleAsyncMany<TMessage, TResponse>(message, key, cancellationToken))
{
responses.Add(response);
}
return responses;
}
private List<object?> GetHandlers(object message,
Type handlerWrapperType,
object? key)
@@ -155,8 +154,11 @@ public class Mediator(IHandlerProvider handlerProvider,
provider.GetServices(handlerWrapperType);
AddHandlers(keyedServices);
IEnumerable<object?> additionalHandlers = handlerProvider.Get(key);
AddHandlers(additionalHandlers);
if (key is not null)
{
IEnumerable<object?> additionalHandlers = handlerProvider.Get(key);
AddHandlers(additionalHandlers);
}
return handlers.SelectMany(entry => entry.Value).ToList();
}
-5
View File
@@ -1,5 +0,0 @@
namespace Toolkit.Foundation;
public static class MediatorExtensions
{
}