From 20627b1a61cba842aa48126b972d0b6a33f05a8a Mon Sep 17 00:00:00 2001 From: Toastie Date: Sun, 31 Mar 2024 23:52:43 +1300 Subject: [PATCH] Added pubsub to Ellie --- src/EllieBot/Common/PubSub/EventPubSub.cs | 80 +++++++++++++++++++++++ src/EllieBot/Common/PubSub/IPubSub.cs | 10 +++ src/EllieBot/Common/PubSub/ISeria.cs | 7 ++ src/EllieBot/Common/PubSub/JsonSeria.cs | 27 ++++++++ src/EllieBot/Common/PubSub/RedisPubSub.cs | 52 +++++++++++++++ src/EllieBot/Common/PubSub/TypedKey.cs | 30 +++++++++ src/EllieBot/Common/PubSub/YamlSeria.cs | 39 +++++++++++ 7 files changed, 245 insertions(+) create mode 100644 src/EllieBot/Common/PubSub/EventPubSub.cs create mode 100644 src/EllieBot/Common/PubSub/IPubSub.cs create mode 100644 src/EllieBot/Common/PubSub/ISeria.cs create mode 100644 src/EllieBot/Common/PubSub/JsonSeria.cs create mode 100644 src/EllieBot/Common/PubSub/RedisPubSub.cs create mode 100644 src/EllieBot/Common/PubSub/TypedKey.cs create mode 100644 src/EllieBot/Common/PubSub/YamlSeria.cs diff --git a/src/EllieBot/Common/PubSub/EventPubSub.cs b/src/EllieBot/Common/PubSub/EventPubSub.cs new file mode 100644 index 0000000..0e45ed8 --- /dev/null +++ b/src/EllieBot/Common/PubSub/EventPubSub.cs @@ -0,0 +1,80 @@ +namespace EllieBot.Common; + +public class EventPubSub : IPubSub +{ + private readonly Dictionary>>> _actions = new(); + private readonly object _locker = new(); + + public Task Sub(in TypedKey key, Func action) + where TData : notnull + { + Func localAction = obj => action((TData)obj); + lock (_locker) + { + if (!_actions.TryGetValue(key.Key, out var keyActions)) + { + keyActions = new(); + _actions[key.Key] = keyActions; + } + + if (!keyActions.TryGetValue(action, out var sameActions)) + { + sameActions = new(); + keyActions[action] = sameActions; + } + + sameActions.Add(localAction); + + return Task.CompletedTask; + } + } + + public Task Pub(in TypedKey key, TData data) + where TData : notnull + { + lock (_locker) + { + if (_actions.TryGetValue(key.Key, out var actions)) + // if this class ever gets used, this needs to be properly implemented + // 1. ignore all valuetasks which are completed + // 2. run all other tasks in parallel + return actions.SelectMany(kvp => kvp.Value).Select(action => action(data).AsTask()).WhenAll(); + + return Task.CompletedTask; + } + } + + public Task Unsub(in TypedKey key, Func action) + { + lock (_locker) + { + // get subscriptions for this action + if (_actions.TryGetValue(key.Key, out var actions)) + // get subscriptions which have the same action hash code + // note: having this as a list allows for multiple subscriptions of + // the same insance's/static method + { + if (actions.TryGetValue(action, out var sameActions)) + { + // remove last subscription + sameActions.RemoveAt(sameActions.Count - 1); + + // if the last subscription was the only subscription + // we can safely remove this action's dictionary entry + if (sameActions.Count == 0) + { + actions.Remove(action); + + // if our dictionary has no more elements after + // removing the entry + // it's safe to remove it from the key's subscriptions + if (actions.Count == 0) + _actions.Remove(key.Key); + } + } + } + + return Task.CompletedTask; + } + } +} diff --git a/src/EllieBot/Common/PubSub/IPubSub.cs b/src/EllieBot/Common/PubSub/IPubSub.cs new file mode 100644 index 0000000..0d52156 --- /dev/null +++ b/src/EllieBot/Common/PubSub/IPubSub.cs @@ -0,0 +1,10 @@ +namespace EllieBot.Common; + +public interface IPubSub +{ + public Task Pub(in TypedKey key, TData data) + where TData : notnull; + + public Task Sub(in TypedKey key, Func action) + where TData : notnull; +} \ No newline at end of file diff --git a/src/EllieBot/Common/PubSub/ISeria.cs b/src/EllieBot/Common/PubSub/ISeria.cs new file mode 100644 index 0000000..0eb9706 --- /dev/null +++ b/src/EllieBot/Common/PubSub/ISeria.cs @@ -0,0 +1,7 @@ +namespace EllieBot.Common; + +public interface ISeria +{ + byte[] Serialize(T data); + T? Deserialize(byte[]? data); +} \ No newline at end of file diff --git a/src/EllieBot/Common/PubSub/JsonSeria.cs b/src/EllieBot/Common/PubSub/JsonSeria.cs new file mode 100644 index 0000000..b8f8033 --- /dev/null +++ b/src/EllieBot/Common/PubSub/JsonSeria.cs @@ -0,0 +1,27 @@ +using EllieBot.Common.JsonConverters; +using System.Text.Json; + +namespace EllieBot.Common; + +public class JsonSeria : ISeria +{ + private readonly JsonSerializerOptions _serializerOptions = new() + { + Converters = + { + new Rgba32Converter(), + new CultureInfoConverter() + } + }; + + public byte[] Serialize(T data) + => JsonSerializer.SerializeToUtf8Bytes(data, _serializerOptions); + + public T? Deserialize(byte[]? data) + { + if (data is null) + return default; + + return JsonSerializer.Deserialize(data, _serializerOptions); + } +} \ No newline at end of file diff --git a/src/EllieBot/Common/PubSub/RedisPubSub.cs b/src/EllieBot/Common/PubSub/RedisPubSub.cs new file mode 100644 index 0000000..82d8a99 --- /dev/null +++ b/src/EllieBot/Common/PubSub/RedisPubSub.cs @@ -0,0 +1,52 @@ +using StackExchange.Redis; + +namespace EllieBot.Common; + +public sealed class RedisPubSub : IPubSub +{ + private readonly IBotCredentials _creds; + private readonly ConnectionMultiplexer _multi; + private readonly ISeria _serializer; + + public RedisPubSub(ConnectionMultiplexer multi, ISeria serializer, IBotCredentials creds) + { + _multi = multi; + _serializer = serializer; + _creds = creds; + } + + public Task Pub(in TypedKey key, TData data) + where TData : notnull + { + var serialized = _serializer.Serialize(data); + return _multi.GetSubscriber() + .PublishAsync($"{_creds.RedisKey()}:{key.Key}", serialized, CommandFlags.FireAndForget); + } + + public Task Sub(in TypedKey key, Func action) + where TData : notnull + { + var eventName = key.Key; + + async void OnSubscribeHandler(RedisChannel _, RedisValue data) + { + try + { + var dataObj = _serializer.Deserialize(data); + if (dataObj is not null) + await action(dataObj); + else + { + Log.Warning("Publishing event {EventName} with a null value. This is not allowed", + eventName); + } + } + catch (Exception ex) + { + Log.Error("Error handling the event {EventName}: {ErrorMessage}", eventName, ex.Message); + } + } + + return _multi.GetSubscriber().SubscribeAsync($"{_creds.RedisKey()}:{eventName}", OnSubscribeHandler); + } +} \ No newline at end of file diff --git a/src/EllieBot/Common/PubSub/TypedKey.cs b/src/EllieBot/Common/PubSub/TypedKey.cs new file mode 100644 index 0000000..e637707 --- /dev/null +++ b/src/EllieBot/Common/PubSub/TypedKey.cs @@ -0,0 +1,30 @@ +namespace EllieBot.Common; + +public readonly struct TypedKey +{ + public string Key { get; } + + public TypedKey(in string key) + => Key = key; + + public static implicit operator TypedKey(in string input) + => new(input); + + public static implicit operator string(in TypedKey input) + => input.Key; + + public static bool operator ==(in TypedKey left, in TypedKey right) + => left.Key == right.Key; + + public static bool operator !=(in TypedKey left, in TypedKey right) + => !(left == right); + + public override bool Equals(object? obj) + => obj is TypedKey o && o == this; + + public override int GetHashCode() + => Key?.GetHashCode() ?? 0; + + public override string ToString() + => Key; +} \ No newline at end of file diff --git a/src/EllieBot/Common/PubSub/YamlSeria.cs b/src/EllieBot/Common/PubSub/YamlSeria.cs new file mode 100644 index 0000000..2feb5c8 --- /dev/null +++ b/src/EllieBot/Common/PubSub/YamlSeria.cs @@ -0,0 +1,39 @@ +using EllieBot.Common.Configs; +using EllieBot.Common.Yml; +using System.Text.RegularExpressions; +using YamlDotNet.Serialization; + +namespace EllieBot.Common; + +public class YamlSeria : IConfigSeria +{ + private static readonly Regex _codePointRegex = + new(@"(\\U(?[a-zA-Z0-9]{8})|\\u(?[a-zA-Z0-9]{4})|\\x(?[a-zA-Z0-9]{2}))", + RegexOptions.Compiled); + + private readonly IDeserializer _deserializer; + private readonly ISerializer _serializer; + + public YamlSeria() + { + _serializer = Yaml.Serializer; + _deserializer = Yaml.Deserializer; + } + + public string Serialize(T obj) + where T : notnull + { + var escapedOutput = _serializer.Serialize(obj); + var output = _codePointRegex.Replace(escapedOutput, + me => + { + var str = me.Groups["code"].Value; + var newString = YamlHelper.UnescapeUnicodeCodePoint(str); + return newString; + }); + return output; + } + + public T Deserialize(string data) + => _deserializer.Deserialize(data); +} \ No newline at end of file