using System.Collections.Concurrent;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace TheXamlGuy.TaskbarGroup.Core
{
    /// <summary>
    /// Type-keyed message bus built on Rx subjects: one subject is kept per
    /// message type, <c>Send</c> pushes a message into it and <c>Subscribe</c>
    /// listens on it.
    /// </summary>
    public class Messenger : IMessenger
    {
        /// <summary>
        /// Invoker handed to <c>WeakSubscribe</c> for every subscription.
        /// Kept as a public mutable field so existing callers are unaffected.
        /// </summary>
        public IMessageInvoker invoker;

        // One subject per message type, keyed by typeof(TMessage). Values are stored
        // as object because each entry has a different closed generic type.
        private readonly ConcurrentDictionary<Type, object> subjects = new();

        // Scheduler wrapping the SynchronizationContext captured in the constructor.
        // NOTE(review): nothing in this class reads this field - Subscribe falls back
        // to Scheduler.Default, not to this scheduler. Confirm whether it was meant
        // to be the default before removing it.
        private readonly IScheduler dispatcher;

        /// <summary>
        /// Creates a messenger bound to the calling thread's synchronization context.
        /// </summary>
        /// <param name="invoker">Invoker passed along to each weak subscription.</param>
        /// <exception cref="InvalidOperationException">
        /// <see cref="SynchronizationContext.Current"/> is <c>null</c> on the calling thread.
        /// </exception>
        public Messenger(IMessageInvoker invoker)
        {
            var synchronizationContext = SynchronizationContext.Current;
            if (synchronizationContext is null)
            {
                // NullReferenceException is reserved for the runtime (CA2201); a missing
                // ambient context is an invalid-state condition.
                throw new InvalidOperationException(
                    "Messenger must be created on a thread that has a SynchronizationContext " +
                    "(SynchronizationContext.Current was null).");
            }

            this.invoker = invoker;
            dispatcher = new SynchronizationContextScheduler(synchronizationContext);
        }

        /// <summary>
        /// Returns the subject that carries <typeparamref name="TMessage"/> messages,
        /// creating it on first use.
        /// </summary>
        /// <typeparam name="TMessage">Message type; also the dictionary key.</typeparam>
        /// <returns>The subject shared by all senders and subscribers of that type.</returns>
        public ISubject<TMessage> GetSubject<TMessage>()
        {
            // The subject is seeded with default(TMessage); Subscribe skips that seed value.
            return (ISubject<TMessage>)subjects.GetOrAdd(
                typeof(TMessage),
                _ => new BehaviorSubject<TMessage>(default!));
        }

        /// <summary>
        /// Sends a default-constructed <typeparamref name="TMessage"/>.
        /// </summary>
        /// <typeparam name="TMessage">Message type with a public parameterless constructor.</typeparam>
        public void Send<TMessage>() where TMessage : new()
        {
            Send<TMessage>(new TMessage());
        }

        /// <summary>
        /// Pushes <paramref name="message"/> to the subject for <typeparamref name="TMessage"/>.
        /// </summary>
        /// <typeparam name="TMessage">Message type.</typeparam>
        /// <param name="message">The message to publish.</param>
        public void Send<TMessage>(TMessage message)
        {
            GetSubject<TMessage>().OnNext(message);
        }

        /// <summary>
        /// Subscribes <paramref name="actionDelegate"/> to <typeparamref name="TMessage"/> messages.
        /// </summary>
        /// <typeparam name="TMessage">Message type to listen for.</typeparam>
        /// <param name="actionDelegate">Callback handed to <c>WeakSubscribe</c>.</param>
        /// <param name="scheduler">
        /// Scheduler the notifications are observed on; <see cref="Scheduler.Default"/> when <c>null</c>.
        /// </param>
        /// <param name="where">
        /// Optional filter applied before the scheduler hop; every message passes when <c>null</c>.
        /// </param>
        /// <returns>The value returned by <c>WeakSubscribe</c>; dispose it to end the subscription.</returns>
        public IDisposable Subscribe<TMessage>(
            Action<TMessage> actionDelegate,
            IScheduler? scheduler = null,
            Func<TMessage, bool>? where = null)
        {
            scheduler ??= Scheduler.Default;
            where ??= _ => true;

            // A BehaviorSubject replays its current value to each new subscriber;
            // Skip(1) drops that replayed value so only later messages are delivered.
            return GetSubject<TMessage>()
                .AsObservable()
                .Skip(1)
                .Where(where)
                .ObserveOn(scheduler)
                .WeakSubscribe(invoker, actionDelegate);
        }
    }
}