Added pubsub to Ellie
This commit is contained in:
parent
08756eeb5c
commit
20627b1a61
7 changed files with 245 additions and 0 deletions
80
src/EllieBot/Common/PubSub/EventPubSub.cs
Normal file
80
src/EllieBot/Common/PubSub/EventPubSub.cs
Normal file
|
@ -0,0 +1,80 @@
|
||||||
|
namespace EllieBot.Common;
|
||||||
|
|
||||||
|
public class EventPubSub : IPubSub
|
||||||
|
{
|
||||||
|
private readonly Dictionary<string, Dictionary<Delegate, List<Func<object, ValueTask>>>> _actions = new();
|
||||||
|
private readonly object _locker = new();
|
||||||
|
|
||||||
|
public Task Sub<TData>(in TypedKey<TData> key, Func<TData, ValueTask> action)
|
||||||
|
where TData : notnull
|
||||||
|
{
|
||||||
|
Func<object, ValueTask> localAction = obj => action((TData)obj);
|
||||||
|
lock (_locker)
|
||||||
|
{
|
||||||
|
if (!_actions.TryGetValue(key.Key, out var keyActions))
|
||||||
|
{
|
||||||
|
keyActions = new();
|
||||||
|
_actions[key.Key] = keyActions;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!keyActions.TryGetValue(action, out var sameActions))
|
||||||
|
{
|
||||||
|
sameActions = new();
|
||||||
|
keyActions[action] = sameActions;
|
||||||
|
}
|
||||||
|
|
||||||
|
sameActions.Add(localAction);
|
||||||
|
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task Pub<TData>(in TypedKey<TData> key, TData data)
|
||||||
|
where TData : notnull
|
||||||
|
{
|
||||||
|
lock (_locker)
|
||||||
|
{
|
||||||
|
if (_actions.TryGetValue(key.Key, out var actions))
|
||||||
|
// if this class ever gets used, this needs to be properly implemented
|
||||||
|
// 1. ignore all valuetasks which are completed
|
||||||
|
// 2. run all other tasks in parallel
|
||||||
|
return actions.SelectMany(kvp => kvp.Value).Select(action => action(data).AsTask()).WhenAll();
|
||||||
|
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task Unsub<TData>(in TypedKey<TData> key, Func<TData, ValueTask> action)
|
||||||
|
{
|
||||||
|
lock (_locker)
|
||||||
|
{
|
||||||
|
// get subscriptions for this action
|
||||||
|
if (_actions.TryGetValue(key.Key, out var actions))
|
||||||
|
// get subscriptions which have the same action hash code
|
||||||
|
// note: having this as a list allows for multiple subscriptions of
|
||||||
|
// the same insance's/static method
|
||||||
|
{
|
||||||
|
if (actions.TryGetValue(action, out var sameActions))
|
||||||
|
{
|
||||||
|
// remove last subscription
|
||||||
|
sameActions.RemoveAt(sameActions.Count - 1);
|
||||||
|
|
||||||
|
// if the last subscription was the only subscription
|
||||||
|
// we can safely remove this action's dictionary entry
|
||||||
|
if (sameActions.Count == 0)
|
||||||
|
{
|
||||||
|
actions.Remove(action);
|
||||||
|
|
||||||
|
// if our dictionary has no more elements after
|
||||||
|
// removing the entry
|
||||||
|
// it's safe to remove it from the key's subscriptions
|
||||||
|
if (actions.Count == 0)
|
||||||
|
_actions.Remove(key.Key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
10
src/EllieBot/Common/PubSub/IPubSub.cs
Normal file
10
src/EllieBot/Common/PubSub/IPubSub.cs
Normal file
|
@ -0,0 +1,10 @@
|
||||||
|
namespace EllieBot.Common;
|
||||||
|
|
||||||
|
public interface IPubSub
|
||||||
|
{
|
||||||
|
public Task Pub<TData>(in TypedKey<TData> key, TData data)
|
||||||
|
where TData : notnull;
|
||||||
|
|
||||||
|
public Task Sub<TData>(in TypedKey<TData> key, Func<TData, ValueTask> action)
|
||||||
|
where TData : notnull;
|
||||||
|
}
|
7
src/EllieBot/Common/PubSub/ISeria.cs
Normal file
7
src/EllieBot/Common/PubSub/ISeria.cs
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
namespace EllieBot.Common;
|
||||||
|
|
||||||
|
public interface ISeria
|
||||||
|
{
|
||||||
|
byte[] Serialize<T>(T data);
|
||||||
|
T? Deserialize<T>(byte[]? data);
|
||||||
|
}
|
27
src/EllieBot/Common/PubSub/JsonSeria.cs
Normal file
27
src/EllieBot/Common/PubSub/JsonSeria.cs
Normal file
|
@ -0,0 +1,27 @@
|
||||||
|
using EllieBot.Common.JsonConverters;
|
||||||
|
using System.Text.Json;
|
||||||
|
|
||||||
|
namespace EllieBot.Common;
|
||||||
|
|
||||||
|
public class JsonSeria : ISeria
|
||||||
|
{
|
||||||
|
private readonly JsonSerializerOptions _serializerOptions = new()
|
||||||
|
{
|
||||||
|
Converters =
|
||||||
|
{
|
||||||
|
new Rgba32Converter(),
|
||||||
|
new CultureInfoConverter()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
public byte[] Serialize<T>(T data)
|
||||||
|
=> JsonSerializer.SerializeToUtf8Bytes(data, _serializerOptions);
|
||||||
|
|
||||||
|
public T? Deserialize<T>(byte[]? data)
|
||||||
|
{
|
||||||
|
if (data is null)
|
||||||
|
return default;
|
||||||
|
|
||||||
|
return JsonSerializer.Deserialize<T>(data, _serializerOptions);
|
||||||
|
}
|
||||||
|
}
|
52
src/EllieBot/Common/PubSub/RedisPubSub.cs
Normal file
52
src/EllieBot/Common/PubSub/RedisPubSub.cs
Normal file
|
@ -0,0 +1,52 @@
|
||||||
|
using StackExchange.Redis;
|
||||||
|
|
||||||
|
namespace EllieBot.Common;
|
||||||
|
|
||||||
|
public sealed class RedisPubSub : IPubSub
|
||||||
|
{
|
||||||
|
private readonly IBotCredentials _creds;
|
||||||
|
private readonly ConnectionMultiplexer _multi;
|
||||||
|
private readonly ISeria _serializer;
|
||||||
|
|
||||||
|
public RedisPubSub(ConnectionMultiplexer multi, ISeria serializer, IBotCredentials creds)
|
||||||
|
{
|
||||||
|
_multi = multi;
|
||||||
|
_serializer = serializer;
|
||||||
|
_creds = creds;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task Pub<TData>(in TypedKey<TData> key, TData data)
|
||||||
|
where TData : notnull
|
||||||
|
{
|
||||||
|
var serialized = _serializer.Serialize(data);
|
||||||
|
return _multi.GetSubscriber()
|
||||||
|
.PublishAsync($"{_creds.RedisKey()}:{key.Key}", serialized, CommandFlags.FireAndForget);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task Sub<TData>(in TypedKey<TData> key, Func<TData, ValueTask> action)
|
||||||
|
where TData : notnull
|
||||||
|
{
|
||||||
|
var eventName = key.Key;
|
||||||
|
|
||||||
|
async void OnSubscribeHandler(RedisChannel _, RedisValue data)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var dataObj = _serializer.Deserialize<TData>(data);
|
||||||
|
if (dataObj is not null)
|
||||||
|
await action(dataObj);
|
||||||
|
else
|
||||||
|
{
|
||||||
|
Log.Warning("Publishing event {EventName} with a null value. This is not allowed",
|
||||||
|
eventName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
Log.Error("Error handling the event {EventName}: {ErrorMessage}", eventName, ex.Message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return _multi.GetSubscriber().SubscribeAsync($"{_creds.RedisKey()}:{eventName}", OnSubscribeHandler);
|
||||||
|
}
|
||||||
|
}
|
30
src/EllieBot/Common/PubSub/TypedKey.cs
Normal file
30
src/EllieBot/Common/PubSub/TypedKey.cs
Normal file
|
@ -0,0 +1,30 @@
|
||||||
|
namespace EllieBot.Common;
|
||||||
|
|
||||||
|
public readonly struct TypedKey<TData>
|
||||||
|
{
|
||||||
|
public string Key { get; }
|
||||||
|
|
||||||
|
public TypedKey(in string key)
|
||||||
|
=> Key = key;
|
||||||
|
|
||||||
|
public static implicit operator TypedKey<TData>(in string input)
|
||||||
|
=> new(input);
|
||||||
|
|
||||||
|
public static implicit operator string(in TypedKey<TData> input)
|
||||||
|
=> input.Key;
|
||||||
|
|
||||||
|
public static bool operator ==(in TypedKey<TData> left, in TypedKey<TData> right)
|
||||||
|
=> left.Key == right.Key;
|
||||||
|
|
||||||
|
public static bool operator !=(in TypedKey<TData> left, in TypedKey<TData> right)
|
||||||
|
=> !(left == right);
|
||||||
|
|
||||||
|
public override bool Equals(object? obj)
|
||||||
|
=> obj is TypedKey<TData> o && o == this;
|
||||||
|
|
||||||
|
public override int GetHashCode()
|
||||||
|
=> Key?.GetHashCode() ?? 0;
|
||||||
|
|
||||||
|
public override string ToString()
|
||||||
|
=> Key;
|
||||||
|
}
|
39
src/EllieBot/Common/PubSub/YamlSeria.cs
Normal file
39
src/EllieBot/Common/PubSub/YamlSeria.cs
Normal file
|
@ -0,0 +1,39 @@
|
||||||
|
using EllieBot.Common.Configs;
|
||||||
|
using EllieBot.Common.Yml;
|
||||||
|
using System.Text.RegularExpressions;
|
||||||
|
using YamlDotNet.Serialization;
|
||||||
|
|
||||||
|
namespace EllieBot.Common;
|
||||||
|
|
||||||
|
public class YamlSeria : IConfigSeria
|
||||||
|
{
|
||||||
|
private static readonly Regex _codePointRegex =
|
||||||
|
new(@"(\\U(?<code>[a-zA-Z0-9]{8})|\\u(?<code>[a-zA-Z0-9]{4})|\\x(?<code>[a-zA-Z0-9]{2}))",
|
||||||
|
RegexOptions.Compiled);
|
||||||
|
|
||||||
|
private readonly IDeserializer _deserializer;
|
||||||
|
private readonly ISerializer _serializer;
|
||||||
|
|
||||||
|
public YamlSeria()
|
||||||
|
{
|
||||||
|
_serializer = Yaml.Serializer;
|
||||||
|
_deserializer = Yaml.Deserializer;
|
||||||
|
}
|
||||||
|
|
||||||
|
public string Serialize<T>(T obj)
|
||||||
|
where T : notnull
|
||||||
|
{
|
||||||
|
var escapedOutput = _serializer.Serialize(obj);
|
||||||
|
var output = _codePointRegex.Replace(escapedOutput,
|
||||||
|
me =>
|
||||||
|
{
|
||||||
|
var str = me.Groups["code"].Value;
|
||||||
|
var newString = YamlHelper.UnescapeUnicodeCodePoint(str);
|
||||||
|
return newString;
|
||||||
|
});
|
||||||
|
return output;
|
||||||
|
}
|
||||||
|
|
||||||
|
public T Deserialize<T>(string data)
|
||||||
|
=> _deserializer.Deserialize<T>(data);
|
||||||
|
}
|
Loading…
Reference in a new issue