From dac6e283e8a47ba19060b1295dec7b1970c2a2e5 Mon Sep 17 00:00:00 2001 From: Toastie Date: Sun, 12 May 2024 21:22:46 +1200 Subject: [PATCH] Added EllieBot.Voice --- EllieBot.sln | 19 +- src/EllieBot.Voice/CloseCodes.cs | 35 ++ src/EllieBot.Voice/EllieBot.Voice.csproj | 15 + src/EllieBot.Voice/LibOpus.cs | 125 ++++++ src/EllieBot.Voice/LibSodium.cs | 32 ++ src/EllieBot.Voice/Models/SelectProtocol.cs | 23 ++ src/EllieBot.Voice/Models/VoiceHello.cs | 10 + src/EllieBot.Voice/Models/VoiceIdentify.cs | 20 + src/EllieBot.Voice/Models/VoicePayload.cs | 29 ++ src/EllieBot.Voice/Models/VoiceReady.cs | 22 + src/EllieBot.Voice/Models/VoiceResume.cs | 16 + .../Models/VoiceSessionDescription.cs | 13 + src/EllieBot.Voice/Models/VoiceSpeaking.cs | 26 ++ src/EllieBot.Voice/PoopyBufferImmortalized.cs | 136 +++++++ src/EllieBot.Voice/SocketClient.cs | 154 +++++++ src/EllieBot.Voice/SongBuffer.cs | 95 +++++ src/EllieBot.Voice/VoiceClient.cs | 207 ++++++++++ src/EllieBot.Voice/VoiceGateway.cs | 375 ++++++++++++++++++ 18 files changed, 1339 insertions(+), 13 deletions(-) create mode 100644 src/EllieBot.Voice/CloseCodes.cs create mode 100644 src/EllieBot.Voice/EllieBot.Voice.csproj create mode 100644 src/EllieBot.Voice/LibOpus.cs create mode 100644 src/EllieBot.Voice/LibSodium.cs create mode 100644 src/EllieBot.Voice/Models/SelectProtocol.cs create mode 100644 src/EllieBot.Voice/Models/VoiceHello.cs create mode 100644 src/EllieBot.Voice/Models/VoiceIdentify.cs create mode 100644 src/EllieBot.Voice/Models/VoicePayload.cs create mode 100644 src/EllieBot.Voice/Models/VoiceReady.cs create mode 100644 src/EllieBot.Voice/Models/VoiceResume.cs create mode 100644 src/EllieBot.Voice/Models/VoiceSessionDescription.cs create mode 100644 src/EllieBot.Voice/Models/VoiceSpeaking.cs create mode 100644 src/EllieBot.Voice/PoopyBufferImmortalized.cs create mode 100644 src/EllieBot.Voice/SocketClient.cs create mode 100644 src/EllieBot.Voice/SongBuffer.cs create mode 100644 src/EllieBot.Voice/VoiceClient.cs create mode 100644 src/EllieBot.Voice/VoiceGateway.cs diff --git a/EllieBot.sln b/EllieBot.sln index ecc63b7..5a74f26 100644 --- a/EllieBot.sln +++ b/EllieBot.sln @@ -26,9 +26,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EllieBot.VotesApi", "src\El EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Ellie.Marmalade", "src\Ellie.Marmalade\Ellie.Marmalade.csproj", "{76AC715D-12FF-4CBE-9585-A861139A2D0C}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Ellie.Common", "src\Ellie.Common\Ellie.Common.csproj", "{5C1B88B0-B881-4E20-8382-4DDE275F8642}" -EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Ellie.Econ", "src\Ellie.Econ\Ellie.Econ.csproj", "{A73A6399-50E1-4362-BE29-86C2C88CF05A}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EllieBot.Voice", "src\EllieBot.Voice\EllieBot.Voice.csproj", "{1D93CE3C-80B4-49C7-A9A2-99988920AAEC}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -60,14 +58,10 @@ Global {76AC715D-12FF-4CBE-9585-A861139A2D0C}.Debug|Any CPU.Build.0 = Debug|Any CPU {76AC715D-12FF-4CBE-9585-A861139A2D0C}.Release|Any CPU.ActiveCfg = Release|Any CPU {76AC715D-12FF-4CBE-9585-A861139A2D0C}.Release|Any CPU.Build.0 = Release|Any CPU - {5C1B88B0-B881-4E20-8382-4DDE275F8642}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {5C1B88B0-B881-4E20-8382-4DDE275F8642}.Debug|Any CPU.Build.0 = Debug|Any CPU - {5C1B88B0-B881-4E20-8382-4DDE275F8642}.Release|Any CPU.ActiveCfg = Release|Any CPU - {5C1B88B0-B881-4E20-8382-4DDE275F8642}.Release|Any CPU.Build.0 = Release|Any CPU - {A73A6399-50E1-4362-BE29-86C2C88CF05A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {A73A6399-50E1-4362-BE29-86C2C88CF05A}.Debug|Any CPU.Build.0 = Debug|Any CPU - {A73A6399-50E1-4362-BE29-86C2C88CF05A}.Release|Any CPU.ActiveCfg = Release|Any CPU - {A73A6399-50E1-4362-BE29-86C2C88CF05A}.Release|Any CPU.Build.0 = Release|Any CPU + {1D93CE3C-80B4-49C7-A9A2-99988920AAEC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {1D93CE3C-80B4-49C7-A9A2-99988920AAEC}.Debug|Any CPU.Build.0 = Debug|Any CPU + {1D93CE3C-80B4-49C7-A9A2-99988920AAEC}.Release|Any CPU.ActiveCfg = Release|Any CPU + {1D93CE3C-80B4-49C7-A9A2-99988920AAEC}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -79,8 +73,7 @@ Global {CB1A5307-DD85-4795-8A8A-A25D36DADC51} = {B28FB883-9688-41EB-BF5A-945F4A4EB628} {F1A77F56-71B0-430E-AE46-94CDD7D43874} = {B28FB883-9688-41EB-BF5A-945F4A4EB628} {76AC715D-12FF-4CBE-9585-A861139A2D0C} = {B28FB883-9688-41EB-BF5A-945F4A4EB628} - {5C1B88B0-B881-4E20-8382-4DDE275F8642} = {B28FB883-9688-41EB-BF5A-945F4A4EB628} - {A73A6399-50E1-4362-BE29-86C2C88CF05A} = {B28FB883-9688-41EB-BF5A-945F4A4EB628} + {1D93CE3C-80B4-49C7-A9A2-99988920AAEC} = {B28FB883-9688-41EB-BF5A-945F4A4EB628} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {79F61C2C-CDBB-4361-A234-91A0B334CFE4} diff --git a/src/EllieBot.Voice/CloseCodes.cs b/src/EllieBot.Voice/CloseCodes.cs new file mode 100644 index 0000000..6102a19 --- /dev/null +++ b/src/EllieBot.Voice/CloseCodes.cs @@ -0,0 +1,35 @@ +using System.Collections.Generic; +using System.Collections.ObjectModel; + +namespace Ayu.Discord.Gateway +{ + public static class CloseCodes + { + private static IReadOnlyDictionary _closeCodes = new ReadOnlyDictionary( + new Dictionary() + { + { 4000, ("Unknown error", "We're not sure what went wrong. Try reconnecting?")}, + { 4001, ("Unknown opcode", "You sent an invalid Gateway opcode or an invalid payload for an opcode. Don't do that!")}, + { 4002, ("Decode error", "You sent an invalid payload to us. Don't do that!")}, + { 4003, ("Not authenticated", "You sent us a payload prior to identifying.")}, + { 4004, ("Authentication failed", "The account token sent with your identify payload is incorrect.")}, + { 4005, ("Already authenticated", "You sent more than one identify payload. Don't do that!")}, + { 4007, ("Invalid seq", "The sequence sent when resuming the session was invalid. Reconnect and start a new session.")}, + { 4008, ("Rate limited", "Woah nelly! You're sending payloads to us too quickly. Slow it down! You will be disconnected on receiving this.")}, + { 4009, ("Session timed out", "Your session timed out. Reconnect and start a new one.")}, + { 4010, ("Invalid shard", "You sent us an invalid shard when identifying.")}, + { 4011, ("Sharding required", "The session would have handled too many guilds - you are required to shard your connection in order to connect.")}, + { 4012, ("Invalid API version", "You sent an invalid version for the gateway.")}, + { 4013, ("Invalid intent(s)", "You sent an invalid intent for a Gateway Intent. You may have incorrectly calculated the bitwise value.")}, + { 4014, ("Disallowed intent(s)", "You sent a disallowed intent for a Gateway Intent. You may have tried to specify an intent that you have not enabled or are not whitelisted for.")} + }); + + public static (string Error, string Message) GetErrorCodeMessage(int closeCode) + { + if (_closeCodes.TryGetValue(closeCode, out var data)) + return data; + + return ("Unknown error", closeCode.ToString()); + } + } +} \ No newline at end of file diff --git a/src/EllieBot.Voice/EllieBot.Voice.csproj b/src/EllieBot.Voice/EllieBot.Voice.csproj new file mode 100644 index 0000000..b8d42d2 --- /dev/null +++ b/src/EllieBot.Voice/EllieBot.Voice.csproj @@ -0,0 +1,15 @@ + + + netstandard2.1 + 9.0 + true + CS8632 + 1.0.2 + EllieBot.Voice + + + + + + + diff --git a/src/EllieBot.Voice/LibOpus.cs b/src/EllieBot.Voice/LibOpus.cs new file mode 100644 index 0000000..a43a9d8 --- /dev/null +++ b/src/EllieBot.Voice/LibOpus.cs @@ -0,0 +1,125 @@ +using System; +using System.Runtime.InteropServices; + +namespace EllieBot.Voice +{ + internal static unsafe class LibOpus + { + public const string OPUS = "data/lib/opus"; + + [DllImport(OPUS, EntryPoint = "opus_encoder_create", CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr CreateEncoder(int Fs, int channels, int application, out OpusError error); + + [DllImport(OPUS, EntryPoint = "opus_encoder_destroy", CallingConvention = CallingConvention.Cdecl)] + internal static extern void DestroyEncoder(IntPtr encoder); + + [DllImport(OPUS, EntryPoint = "opus_encode", CallingConvention = CallingConvention.Cdecl)] + internal static extern int Encode(IntPtr st, byte* pcm, int frame_size, byte* data, int max_data_bytes); + + [DllImport(OPUS, EntryPoint = "opus_encode_float", CallingConvention = CallingConvention.Cdecl)] + internal static extern int EncodeFloat(IntPtr st, byte* pcm, int frame_size, byte* data, int max_data_bytes); + + [DllImport(OPUS, EntryPoint = "opus_encoder_ctl", CallingConvention = CallingConvention.Cdecl)] + internal static extern int EncoderCtl(IntPtr st, OpusCtl request, int value); + } + + public enum OpusApplication + { + VOIP = 2048, + Audio = 2049, + RestrictedLowdelay = 2051 + } + + public unsafe class LibOpusEncoder : IDisposable + { + private readonly IntPtr _encoderPtr; + + private readonly int _sampleRate; + + // private readonly int _channels; + // private readonly int _bitRate; + private readonly int _frameDelay; + + private readonly int _frameSizePerChannel; + public int FrameSizePerChannel => _frameSizePerChannel; + + public const int MaxData = 1276; + + public LibOpusEncoder(int sampleRate, int channels, int bitRate, int frameDelay) + { + _sampleRate = sampleRate; + // _channels = channels; + // _bitRate = bitRate; + _frameDelay = frameDelay; + _frameSizePerChannel = _sampleRate * _frameDelay / 1000; + + _encoderPtr = LibOpus.CreateEncoder(sampleRate, channels, (int)OpusApplication.Audio, out var error); + if (error != OpusError.OK) + throw new ExternalException(error.ToString()); + + LibOpus.EncoderCtl(_encoderPtr, OpusCtl.SetSignal, (int)OpusSignal.Music); + LibOpus.EncoderCtl(_encoderPtr, OpusCtl.SetInbandFEC, 1); + LibOpus.EncoderCtl(_encoderPtr, OpusCtl.SetBitrate, bitRate); + LibOpus.EncoderCtl(_encoderPtr, OpusCtl.SetPacketLossPerc, 2); + } + + public int SetControl(OpusCtl ctl, int value) + => LibOpus.EncoderCtl(_encoderPtr, ctl, value); + + public int Encode(Span input, byte[] output) + { + fixed (byte* inPtr = input) + fixed (byte* outPtr = output) + return LibOpus.Encode(_encoderPtr, inPtr, FrameSizePerChannel, outPtr, output.Length); + } + + public int EncodeFloat(Span input, byte[] output) + { + fixed (byte* inPtr = input) + fixed (byte* outPtr = output) + return LibOpus.EncodeFloat(_encoderPtr, inPtr, FrameSizePerChannel, outPtr, output.Length); + } + + + public void Dispose() + => LibOpus.DestroyEncoder(_encoderPtr); + } + + public enum OpusCtl + { + SetBitrate = 4002, + GetBitrate = 4003, + SetBandwidth = 4008, + GetBandwidth = 4009, + SetComplexity = 4010, + GetComplexity = 4011, + SetInbandFEC = 4012, + GetInbandFEC = 4013, + SetPacketLossPerc = 4014, + GetPacketLossPerc = 4015, + SetLsbDepth = 4036, + GetLsbDepth = 4037, + SetDtx = 4016, + GetDtx = 4017, + SetSignal = 4024 + } + + public enum OpusError + { + OK = 0, + BadArg = -1, + BufferToSmall = -2, + InternalError = -3, + InvalidPacket = -4, + Unimplemented = -5, + InvalidState = -6, + AllocFail = -7 + } + + public enum OpusSignal + { + Auto = -1000, + Voice = 3001, + Music = 3002, + } +} \ No newline at end of file diff --git a/src/EllieBot.Voice/LibSodium.cs b/src/EllieBot.Voice/LibSodium.cs new file mode 100644 index 0000000..bbbc77d --- /dev/null +++ b/src/EllieBot.Voice/LibSodium.cs @@ -0,0 +1,32 @@ +using System; +using System.Runtime.InteropServices; + +namespace EllieBot.Voice +{ + internal static unsafe class Sodium + { + private const string SODIUM = "data/lib/libsodium"; + + [DllImport(SODIUM, EntryPoint = "crypto_secretbox_easy", CallingConvention = CallingConvention.Cdecl)] + private static extern int SecretBoxEasy(byte* output, byte* input, long inputLength, byte* nonce, byte* secret); + [DllImport(SODIUM, EntryPoint = "crypto_secretbox_open_easy", CallingConvention = CallingConvention.Cdecl)] + private static extern int SecretBoxOpenEasy(byte* output, byte* input, ulong inputLength, byte* nonce, byte* secret); + + public static int Encrypt(byte[] input, int inputOffset, long inputLength, byte[] output, int outputOffset, in ReadOnlySpan nonce, byte[] secret) + { + fixed (byte* inPtr = input) + fixed (byte* outPtr = output) + fixed (byte* noncePtr = nonce) + fixed (byte* secretPtr = secret) + return SecretBoxEasy(outPtr + outputOffset, inPtr + inputOffset, inputLength - inputOffset, noncePtr, secretPtr); + } + public static int Decrypt(byte[] input, ulong inputLength, byte[] output, in ReadOnlySpan nonce, byte[] secret) + { + fixed (byte* outPtr = output) + fixed (byte* inPtr = input) + fixed (byte* noncePtr = nonce) + fixed (byte* secretPtr = secret) + return SecretBoxOpenEasy(outPtr, inPtr, inputLength, noncePtr, secretPtr); + } + } +} \ No newline at end of file diff --git a/src/EllieBot.Voice/Models/SelectProtocol.cs b/src/EllieBot.Voice/Models/SelectProtocol.cs new file mode 100644 index 0000000..1a9dfa9 --- /dev/null +++ b/src/EllieBot.Voice/Models/SelectProtocol.cs @@ -0,0 +1,23 @@ +using Newtonsoft.Json; + +namespace EllieBot.Voice.Models +{ + public sealed class SelectProtocol + { + [JsonProperty("protocol")] + public string Protocol { get; set; } + + [JsonProperty("data")] + public ProtocolData Data { get; set; } + + public sealed class ProtocolData + { + [JsonProperty("address")] + public string Address { get; set; } + [JsonProperty("port")] + public int Port { get; set; } + [JsonProperty("mode")] + public string Mode { get; set; } + } + } +} \ No newline at end of file diff --git a/src/EllieBot.Voice/Models/VoiceHello.cs b/src/EllieBot.Voice/Models/VoiceHello.cs new file mode 100644 index 0000000..8fda1d1 --- /dev/null +++ b/src/EllieBot.Voice/Models/VoiceHello.cs @@ -0,0 +1,10 @@ +using Newtonsoft.Json; + +namespace EllieBot.Voice.Models +{ + public sealed class VoiceHello + { + [JsonProperty("heartbeat_interval")] + public int HeartbeatInterval { get; set; } + } +} \ No newline at end of file diff --git a/src/EllieBot.Voice/Models/VoiceIdentify.cs b/src/EllieBot.Voice/Models/VoiceIdentify.cs new file mode 100644 index 0000000..9841869 --- /dev/null +++ b/src/EllieBot.Voice/Models/VoiceIdentify.cs @@ -0,0 +1,20 @@ +using Newtonsoft.Json; + +namespace EllieBot.Voice.Models +{ + public sealed class VoiceIdentify + { + [JsonProperty("server_id")] + public string ServerId { get; set; } + + [JsonProperty("user_id")] + public string UserId { get; set; } + + [JsonProperty("session_id")] + public string SessionId { get; set; } + + [JsonProperty("token")] + public string Token { get; set; } + + } +} \ No newline at end of file diff --git a/src/EllieBot.Voice/Models/VoicePayload.cs b/src/EllieBot.Voice/Models/VoicePayload.cs new file mode 100644 index 0000000..5ac642e --- /dev/null +++ b/src/EllieBot.Voice/Models/VoicePayload.cs @@ -0,0 +1,29 @@ +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; + +namespace Discord.Models.Gateway +{ + public sealed class VoicePayload + { + [JsonProperty("op")] + public VoiceOpCode OpCode { get; set; } + + [JsonProperty("d")] + public JToken Data { get; set; } + } + + public enum VoiceOpCode + { + Identify = 0, + SelectProtocol = 1, + Ready = 2, + Heartbeat = 3, + SessionDescription = 4, + Speaking = 5, + HeartbeatAck = 6, + Resume = 7, + Hello = 8, + Resumed = 9, + ClientDisconnect = 13, + } +} \ No newline at end of file diff --git a/src/EllieBot.Voice/Models/VoiceReady.cs b/src/EllieBot.Voice/Models/VoiceReady.cs new file mode 100644 index 0000000..99cc753 --- /dev/null +++ b/src/EllieBot.Voice/Models/VoiceReady.cs @@ -0,0 +1,22 @@ +using Newtonsoft.Json; + +namespace EllieBot.Voice.Models +{ + public sealed class VoiceReady + { + [JsonProperty("ssrc")] + public uint Ssrc { get; set; } + + [JsonProperty("ip")] + public string Ip { get; set; } + + [JsonProperty("port")] + public int Port { get; set; } + + [JsonProperty("modes")] + public string[] Modes { get; set; } + + [JsonProperty("heartbeat_interval")] + public string HeartbeatInterval { get; set; } + } +} \ No newline at end of file diff --git a/src/EllieBot.Voice/Models/VoiceResume.cs b/src/EllieBot.Voice/Models/VoiceResume.cs new file mode 100644 index 0000000..bad82b2 --- /dev/null +++ b/src/EllieBot.Voice/Models/VoiceResume.cs @@ -0,0 +1,16 @@ +using Newtonsoft.Json; + +namespace EllieBot.Voice.Models +{ + public sealed class VoiceResume + { + [JsonProperty("server_id")] + public string ServerId { get; set; } + + [JsonProperty("session_id")] + public string SessionId { get; set; } + + [JsonProperty("token")] + public string Token { get; set; } + } +} \ No newline at end of file diff --git a/src/EllieBot.Voice/Models/VoiceSessionDescription.cs b/src/EllieBot.Voice/Models/VoiceSessionDescription.cs new file mode 100644 index 0000000..85bdc5e --- /dev/null +++ b/src/EllieBot.Voice/Models/VoiceSessionDescription.cs @@ -0,0 +1,13 @@ +using Newtonsoft.Json; + +namespace EllieBot.Voice.Models +{ + public sealed class VoiceSessionDescription + { + [JsonProperty("mode")] + public string Mode { get; set; } + + [JsonProperty("secret_key")] + public byte[] SecretKey { get; set; } + } +} \ No newline at end of file diff --git a/src/EllieBot.Voice/Models/VoiceSpeaking.cs b/src/EllieBot.Voice/Models/VoiceSpeaking.cs new file mode 100644 index 0000000..a9a0610 --- /dev/null +++ b/src/EllieBot.Voice/Models/VoiceSpeaking.cs @@ -0,0 +1,26 @@ +using Newtonsoft.Json; +using System; + +namespace EllieBot.Voice.Models +{ + public sealed class VoiceSpeaking + { + [JsonProperty("speaking")] + public int Speaking { get; set; } + + [JsonProperty("delay")] + public int Delay { get; set; } + + [JsonProperty("ssrc")] + public uint Ssrc { get; set; } + + [Flags] + public enum State + { + None = 0, + Microphone = 1 << 0, + Soundshare = 1 << 1, + Priority = 1 << 2 + } + } +} \ No newline at end of file diff --git a/src/EllieBot.Voice/PoopyBufferImmortalized.cs b/src/EllieBot.Voice/PoopyBufferImmortalized.cs new file mode 100644 index 0000000..4a80c86 --- /dev/null +++ b/src/EllieBot.Voice/PoopyBufferImmortalized.cs @@ -0,0 +1,136 @@ +#nullable enable +using System; +using System.Buffers; +using System.Threading; +using System.Threading.Tasks; + +namespace EllieBot.Voice +{ + public sealed class PoopyBufferImmortalized : ISongBuffer + { + private readonly byte[] _buffer; + private readonly byte[] _outputArray; + private CancellationToken _cancellationToken; + private bool _isStopped; + + public int ReadPosition { get; private set; } + public int WritePosition { get; private set; } + + public int ContentLength => WritePosition >= ReadPosition + ? WritePosition - ReadPosition + : (_buffer.Length - ReadPosition) + WritePosition; + + public int FreeSpace => _buffer.Length - ContentLength; + + public bool Stopped => _cancellationToken.IsCancellationRequested || _isStopped; + + public PoopyBufferImmortalized(int frameSize) + { + _buffer = ArrayPool.Shared.Rent(1_000_000); + _outputArray = new byte[frameSize]; + + ReadPosition = 0; + WritePosition = 0; + } + + public void Stop() + => _isStopped = true; + + // this method needs a rewrite + public Task BufferAsync(ITrackDataSource source, CancellationToken cancellationToken) + { + _cancellationToken = cancellationToken; + var bufferingCompleted = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + Task.Run(async () => + { + var output = ArrayPool.Shared.Rent(38400); + try + { + int read; + while (!Stopped && (read = source.Read(output)) > 0) + { + while (!Stopped && FreeSpace <= read) + { + bufferingCompleted.TrySetResult(true); + await Task.Delay(100, cancellationToken).ConfigureAwait(false); + } + + if (Stopped) + break; + + Write(output, read); + } + } + finally + { + ArrayPool.Shared.Return(output); + bufferingCompleted.TrySetResult(true); + } + }, cancellationToken); + + return bufferingCompleted.Task; + } + + private void Write(byte[] input, int writeCount) + { + if (WritePosition + writeCount < _buffer.Length) + { + Buffer.BlockCopy(input, 0, _buffer, WritePosition, writeCount); + WritePosition += writeCount; + return; + } + + var wroteNormally = _buffer.Length - WritePosition; + Buffer.BlockCopy(input, 0, _buffer, WritePosition, wroteNormally); + var wroteFromStart = writeCount - wroteNormally; + Buffer.BlockCopy(input, wroteNormally, _buffer, 0, wroteFromStart); + WritePosition = wroteFromStart; + } + + public Span Read(int count, out int length) + { + var toRead = Math.Min(ContentLength, count); + var wp = WritePosition; + + if (ContentLength == 0) + { + length = 0; + return Span.Empty; + } + + if (wp > ReadPosition || ReadPosition + toRead <= _buffer.Length) + { + // thsi can be achieved without copying if + // writer never writes until the end, + // but leaves a single chunk free + Span toReturn = _outputArray; + ((Span)_buffer).Slice(ReadPosition, toRead).CopyTo(toReturn); + ReadPosition += toRead; + length = toRead; + return toReturn; + } + else + { + Span toReturn = _outputArray; + var toEnd = _buffer.Length - ReadPosition; + var bufferSpan = (Span)_buffer; + + bufferSpan.Slice(ReadPosition, toEnd).CopyTo(toReturn); + var fromStart = toRead - toEnd; + bufferSpan.Slice(0, fromStart).CopyTo(toReturn.Slice(toEnd)); + ReadPosition = fromStart; + length = toEnd + fromStart; + return toReturn; + } + } + + public void Dispose() + => ArrayPool.Shared.Return(_buffer); + + public void Reset() + { + ReadPosition = 0; + WritePosition = 0; + } + } +} \ No newline at end of file diff --git a/src/EllieBot.Voice/SocketClient.cs b/src/EllieBot.Voice/SocketClient.cs new file mode 100644 index 0000000..7c32af1 --- /dev/null +++ b/src/EllieBot.Voice/SocketClient.cs @@ -0,0 +1,154 @@ +using Serilog; +using System; +using System.Buffers; +using System.Net.WebSockets; +using System.Threading; +using System.Threading.Tasks; + +namespace Ayu.Discord.Gateway +{ + public class SocketClient : IDisposable + { + private ClientWebSocket? _ws; + + public event Func? PayloadReceived = delegate { return Task.CompletedTask; }; + public event Func? WebsocketClosed = delegate { return Task.CompletedTask; }; + + const int CHUNK_SIZE = 1024 * 16; + + public async Task RunAndBlockAsync(Uri url, CancellationToken cancel) + { + var error = "Error."; + var bufferWriter = new ArrayBufferWriter(CHUNK_SIZE); + try + { + using (_ws = new()) + { + await _ws.ConnectAsync(url, cancel).ConfigureAwait(false); + // WebsocketConnected!.Invoke(this); + + while (true) + { + var result = await _ws.ReceiveAsync(bufferWriter.GetMemory(CHUNK_SIZE), cancel); + bufferWriter.Advance(result.Count); + if (result.MessageType == WebSocketMessageType.Close) + { + var closeMessage = CloseCodes.GetErrorCodeMessage((int?)_ws.CloseStatus ?? 0).Message; + error = $"Websocket closed ({_ws.CloseStatus}): {_ws.CloseStatusDescription} {closeMessage}"; + break; + } + + if (result.EndOfMessage) + { + var pr = PayloadReceived; + var data = bufferWriter.WrittenMemory.ToArray(); + bufferWriter.Clear(); + + if (pr is not null) + { + await pr.Invoke(data); + } + } + } + } + } + catch (WebSocketException ex) + { + Log.Warning("Disconnected, check your internet connection..."); + Log.Debug(ex, "Websocket Exception in websocket client"); + } + catch (OperationCanceledException) + { + // ignored + } + catch (Exception ex) + { + Log.Error(ex, "Error in websocket client. {Message}", ex.Message); + } + finally + { + bufferWriter.Clear(); + _ws = null; + await ClosedAsync(error).ConfigureAwait(false); + } + } + + private async Task ClosedAsync(string msg = "Error") + { + try + { + await WebsocketClosed!.Invoke(msg).ConfigureAwait(false); + } + catch + { + } + } + + private readonly SemaphoreSlim _sendLock = new SemaphoreSlim(1, 1); + + public async Task SendAsync(byte[] data) + { + await _sendLock.WaitAsync().ConfigureAwait(false); + try + { + var ws = _ws; + if (ws is null) + throw new WebSocketException("Websocket is disconnected."); + for (var i = 0; i < data.Length; i += 4096) + { + var count = i + 4096 > data.Length ? data.Length - i : 4096; + await ws.SendAsync(new(data, i, count), + WebSocketMessageType.Text, + i + count >= data.Length, + CancellationToken.None).ConfigureAwait(false); + } + } + finally + { + _sendLock.Release(); + } + } + + public async Task SendBulkAsync(byte[] data) + { + var ws = _ws; + if (ws is null) + throw new WebSocketException("Websocket is disconnected."); + + await ws.SendAsync(new(data, 0, data.Length), + WebSocketMessageType.Binary, + true, + CancellationToken.None).ConfigureAwait(false); + } + + public async Task CloseAsync(string msg = "Stop") + { + if (_ws is not null && _ws.State != WebSocketState.Closed) + { + try + { + await _ws.CloseAsync(WebSocketCloseStatus.InternalServerError, msg, CancellationToken.None) + .ConfigureAwait(false); + + return true; + } + catch + { + } + } + + return false; + } + + public void Dispose() + { + PayloadReceived = null; + WebsocketClosed = null; + var ws = _ws; + if (ws is null) + return; + + ws.Dispose(); + } + } +} \ No newline at end of file diff --git a/src/EllieBot.Voice/SongBuffer.cs b/src/EllieBot.Voice/SongBuffer.cs new file mode 100644 index 0000000..73f2bd1 --- /dev/null +++ b/src/EllieBot.Voice/SongBuffer.cs @@ -0,0 +1,95 @@ +using Serilog; +using System; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; + +namespace EllieBot.Voice +{ + public interface ISongBuffer : IDisposable + { + Span Read(int toRead, out int read); + Task BufferAsync(ITrackDataSource source, CancellationToken cancellationToken); + void Reset(); + void Stop(); + } + + public interface ITrackDataSource + { + public int Read(byte[] output); + } + + public sealed class FfmpegTrackDataSource : ITrackDataSource, IDisposable + { + private Process _p; + + private readonly string _streamUrl; + private readonly bool _isLocal; + private readonly string _pcmType; + + private FfmpegTrackDataSource(int bitDepth, string streamUrl, bool isLocal) + { + this._pcmType = bitDepth == 16 ? "s16le" : "f32le"; + this._streamUrl = streamUrl; + this._isLocal = isLocal; + } + + public static FfmpegTrackDataSource CreateAsync(int bitDepth, string streamUrl, bool isLocal) + { + try + { + var source = new FfmpegTrackDataSource(bitDepth, streamUrl, isLocal); + source.StartFFmpegProcess(); + return source; + } + catch (System.ComponentModel.Win32Exception) + { + Log.Error(@"You have not properly installed or configured FFMPEG. +Please install and configure FFMPEG to play music. +Check the guides for your platform on how to setup ffmpeg correctly: + Windows Guide: https://goo.gl/OjKk8F + Linux Guide: https://goo.gl/ShjCUo"); + throw; + } + catch (OperationCanceledException) + { + } + catch (InvalidOperationException) + { + } + catch (Exception ex) + { + Log.Information(ex, "Error starting ffmpeg: {ErrorMessage}", ex.Message); + } + + return null; + } + + private Process StartFFmpegProcess() + { + var args = $"-err_detect ignore_err -i {_streamUrl} -f {_pcmType} -ar 48000 -vn -ac 2 pipe:1 -loglevel error"; + if (!_isLocal) + args = $"-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5 {args}"; + + return _p = Process.Start(new ProcessStartInfo + { + FileName = "ffmpeg", + Arguments = args, + UseShellExecute = false, + RedirectStandardOutput = true, + RedirectStandardError = false, + CreateNoWindow = true, + }); + } + + public int Read(byte[] output) + => _p.StandardOutput.BaseStream.Read(output); + + public void Dispose() + { + try { _p?.Kill(); } catch { } + + try { _p?.Dispose(); } catch { } + } + } +} \ No newline at end of file diff --git a/src/EllieBot.Voice/VoiceClient.cs b/src/EllieBot.Voice/VoiceClient.cs new file mode 100644 index 0000000..12d6e20 --- /dev/null +++ b/src/EllieBot.Voice/VoiceClient.cs @@ -0,0 +1,207 @@ +using System; +using System.Buffers; + +namespace EllieBot.Voice +{ + public sealed class VoiceClient : IDisposable + { + delegate int EncodeDelegate(Span input, byte[] output); + + private readonly int sampleRate; + private readonly int bitRate; + private readonly int channels; + private readonly int frameDelay; + private readonly int bitDepth; + + public LibOpusEncoder Encoder { get; } + private readonly ArrayPool _arrayPool; + + public int BitDepth => bitDepth * 8; + public int Delay => frameDelay; + + private int FrameSizePerChannel => Encoder.FrameSizePerChannel; + public int InputLength => FrameSizePerChannel * channels * bitDepth; + + EncodeDelegate Encode; + + // https://github.com/xiph/opus/issues/42 w + public VoiceClient(SampleRate sampleRate = SampleRate._48k, + Bitrate bitRate = Bitrate._192k, + Channels channels = Channels.Two, + FrameDelay frameDelay = FrameDelay.Delay20, + BitDepthEnum bitDepthEnum = BitDepthEnum.Float32) + { + this.frameDelay = (int)frameDelay; + this.sampleRate = (int)sampleRate; + this.bitRate = (int)bitRate; + this.channels = (int)channels; + this.bitDepth = (int)bitDepthEnum; + + this.Encoder = new(this.sampleRate, this.channels, this.bitRate, this.frameDelay); + + Encode = bitDepthEnum switch + { + BitDepthEnum.Float32 => Encoder.EncodeFloat, + BitDepthEnum.UInt16 => Encoder.Encode, + _ => throw new NotSupportedException(nameof(BitDepth)) + }; + + if (bitDepthEnum == BitDepthEnum.Float32) + { + Encode = Encoder.EncodeFloat; + } + else + { + Encode = Encoder.Encode; + } + + _arrayPool = ArrayPool.Shared; + } + + public int SendPcmFrame(VoiceGateway gw, Span data, int offset, int count) + { + var secretKey = gw.SecretKey; + if (secretKey.Length == 0) + { + return (int)SendPcmError.SecretKeyUnavailable; + } + + // encode using opus + var encodeOutput = _arrayPool.Rent(LibOpusEncoder.MaxData); + try + { + var encodeOutputLength = Encode(data, encodeOutput); + return SendOpusFrame(gw, encodeOutput, 0, encodeOutputLength); + } + finally + { + _arrayPool.Return(encodeOutput); + } + } + + public int SendOpusFrame(VoiceGateway gw, byte[] data, int offset, int count) + { + var secretKey = gw.SecretKey; + if (secretKey is null) + { + return (int)SendPcmError.SecretKeyUnavailable; + } + + // form RTP header + var headerLength = 1 // version + flags + + 1 // payload type + + 2 // sequence + + 4 // timestamp + + 4; // ssrc + + var header = new byte[headerLength]; + + header[0] = 0x80; // version + flags + header[1] = 0x78; // payload type + + // get byte values for header data + var seqBytes = BitConverter.GetBytes(gw.Sequence); // 2 + var nonceBytes = BitConverter.GetBytes(gw.NonceSequence); // 2 + var timestampBytes = BitConverter.GetBytes(gw.Timestamp); // 4 + var ssrcBytes = BitConverter.GetBytes(gw.Ssrc); // 4 + + gw.Timestamp += (uint)FrameSizePerChannel; + gw.Sequence++; + gw.NonceSequence++; + + if (BitConverter.IsLittleEndian) + { + Array.Reverse(seqBytes); + Array.Reverse(nonceBytes); + Array.Reverse(timestampBytes); + Array.Reverse(ssrcBytes); + } + + // copy headers + Buffer.BlockCopy(seqBytes, 0, header, 2, 2); + Buffer.BlockCopy(timestampBytes, 0, header, 4, 4); + Buffer.BlockCopy(ssrcBytes, 0, header, 8, 4); + + //// encryption part + //// create a byte array where to store the encrypted data + //// it has to be inputLength + crypto_secretbox_MACBYTES (constant with value 16) + var encryptedBytes = new byte[count + 16]; + + //// form nonce with header + 12 empty bytes + //var nonce = new byte[24]; + //Buffer.BlockCopy(rtpHeader, 0, nonce, 0, rtpHeader.Length); + + var nonce = new byte[4]; + Buffer.BlockCopy(seqBytes, 0, nonce, 2, 2); + + Sodium.Encrypt(data, 0, count, encryptedBytes, 0, nonce, secretKey); + + var rtpDataLength = headerLength + encryptedBytes.Length + nonce.Length; + var rtpData = _arrayPool.Rent(rtpDataLength); + try + { + //copy headers + Buffer.BlockCopy(header, 0, rtpData, 0, header.Length); + //copy audio data + Buffer.BlockCopy(encryptedBytes, 0, rtpData, header.Length, encryptedBytes.Length); + Buffer.BlockCopy(nonce, 0, rtpData, rtpDataLength - 4, 4); + + gw.SendRtpData(rtpData, rtpDataLength); + // FUTURE When there's a break in the sent data, + // the packet transmission shouldn't simply stop. + // Instead, send five frames of silence (0xF8, 0xFF, 0xFE) + // before stopping to avoid unintended Opus interpolation + // with subsequent transmissions. + + return rtpDataLength; + } + finally + { + _arrayPool.Return(rtpData); + } + } + + public void Dispose() + => Encoder.Dispose(); + } + + public enum SendPcmError + { + SecretKeyUnavailable = -1, + } + + + public enum FrameDelay + { + Delay5 = 5, + Delay10 = 10, + Delay20 = 20, + Delay40 = 40, + Delay60 = 60, + } + + public enum BitDepthEnum + { + UInt16 = sizeof(UInt16), + Float32 = sizeof(float), + } + + public enum SampleRate + { + _48k = 48_000, + } + + public enum Bitrate + { + _64k = 64 * 1024, + _96k = 96 * 1024, + _128k = 128 * 1024, + _192k = 192 * 1024, + } + + public enum Channels + { + One = 1, + Two = 2, + } +} \ No newline at end of file diff --git a/src/EllieBot.Voice/VoiceGateway.cs b/src/EllieBot.Voice/VoiceGateway.cs new file mode 100644 index 0000000..af20aa3 --- /dev/null +++ b/src/EllieBot.Voice/VoiceGateway.cs @@ -0,0 +1,375 @@ +using EllieBot.Voice.Models; +using Discord.Models.Gateway; +using Newtonsoft.Json.Linq; +using Serilog; +using System; +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using Ayu.Discord.Gateway; +using Newtonsoft.Json; + +namespace EllieBot.Voice +{ + public class VoiceGateway + { + private class QueueItem + { + public VoicePayload Payload { get; } + public TaskCompletionSource Result { get; } + + public QueueItem(VoicePayload payload, TaskCompletionSource result) + { + Payload = payload; + Result = result; + } + } + + private readonly ulong _guildId; + private readonly ulong _userId; + private readonly string _sessionId; + private readonly string _token; + private readonly string _endpoint; + private readonly Uri _websocketUrl; + private readonly Channel _channel; + + public TaskCompletionSource ConnectingFinished { get; } + + private readonly Random _rng; + private readonly SocketClient _ws; + private readonly UdpClient _udpClient; + private Timer? _heartbeatTimer; + private bool _receivedAck; + private IPEndPoint? _udpEp; + + public uint Ssrc { get; private set; } + public string Ip { get; private set; } = string.Empty; + public int Port { get; private set; } = 0; + public byte[] SecretKey { get; private set; } = Array.Empty(); + public string Mode { get; private set; } = string.Empty; + public ushort Sequence { get; set; } + public uint NonceSequence { get; set; } + public uint Timestamp { get; set; } + public string MyIp { get; private set; } = string.Empty; + public ushort MyPort { get; private set; } + private bool _shouldResume; + + private readonly CancellationTokenSource _stopCancellationSource; + private readonly CancellationToken _stopCancellationToken; + public bool Stopped => _stopCancellationToken.IsCancellationRequested; + + public event Func OnClosed = delegate { return Task.CompletedTask; }; + + public VoiceGateway(ulong guildId, ulong userId, string session, string token, string endpoint) + { + this._guildId = guildId; + this._userId = userId; + this._sessionId = session; + this._token = token; + this._endpoint = endpoint; + + //Log.Information("g: {GuildId} u: {UserId} sess: {Session} tok: {Token} ep: {Endpoint}", + // guildId, userId, session, token, endpoint); + + this._websocketUrl = new($"wss://{_endpoint.Replace(":80", "")}?v=4"); + this._channel = Channel.CreateUnbounded(new() + { + SingleReader = true, + SingleWriter = false, + AllowSynchronousContinuations = false, + }); + + ConnectingFinished = new(); + + _rng = new(); + + _ws = new(); + _udpClient = new(); + _stopCancellationSource = new(); + _stopCancellationToken = _stopCancellationSource.Token; + + _ws.PayloadReceived += _ws_PayloadReceived; + _ws.WebsocketClosed += _ws_WebsocketClosed; + } + + public Task WaitForReadyAsync() + => ConnectingFinished.Task; + + private async Task SendLoop() + { + while (!_stopCancellationToken.IsCancellationRequested) + { + try + { + var qi = await _channel.Reader.ReadAsync(_stopCancellationToken); + //Log.Information("Sending payload with opcode {OpCode}", qi.Payload.OpCode); + + var json = JsonConvert.SerializeObject(qi.Payload); + + if (!_stopCancellationToken.IsCancellationRequested) + await _ws.SendAsync(Encoding.UTF8.GetBytes(json)); + _ = Task.Run(() => qi.Result.TrySetResult(true)); + } + catch (ChannelClosedException) + { + Log.Warning("Voice gateway send channel is closed"); + } + } + } + + private async Task _ws_PayloadReceived(byte[] arg) + { + var payload = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(arg)); + if (payload is null) + return; + try + { + //Log.Information("Received payload with opcode {OpCode}", payload.OpCode); + + switch (payload.OpCode) + { + case VoiceOpCode.Identify: + // sent, not received. + break; + case VoiceOpCode.SelectProtocol: + // sent, not received + break; + case VoiceOpCode.Ready: + var ready = payload.Data.ToObject(); + await HandleReadyAsync(ready!); + _shouldResume = true; + break; + case VoiceOpCode.Heartbeat: + // sent, not received + break; + case VoiceOpCode.SessionDescription: + var sd = payload.Data.ToObject(); + await HandleSessionDescription(sd!); + break; + case VoiceOpCode.Speaking: + // ignore for now + break; + case VoiceOpCode.HeartbeatAck: + _receivedAck = true; + break; + case VoiceOpCode.Resume: + // sent, not received + break; + case VoiceOpCode.Hello: + var hello = payload.Data.ToObject(); + await HandleHelloAsync(hello!); + break; + case VoiceOpCode.Resumed: + _shouldResume = true; + break; + case VoiceOpCode.ClientDisconnect: + break; + } + } + catch (Exception ex) + { + Log.Error(ex, "Error handling payload with opcode {OpCode}: {Message}", payload.OpCode, ex.Message); + } + } + private Task _ws_WebsocketClosed(string arg) + { + if (!string.IsNullOrWhiteSpace(arg)) + { + Log.Warning("Voice Websocket closed: {Arg}", arg); + } + + var hbt = _heartbeatTimer; + hbt?.Change(Timeout.Infinite, Timeout.Infinite); + _heartbeatTimer = null; + + if (!_stopCancellationToken.IsCancellationRequested && _shouldResume) + { + _ = _ws.RunAndBlockAsync(_websocketUrl, _stopCancellationToken); + return Task.CompletedTask; + } + + _ws.WebsocketClosed -= _ws_WebsocketClosed; + _ws.PayloadReceived -= _ws_PayloadReceived; + + if (!_stopCancellationToken.IsCancellationRequested) + _stopCancellationSource.Cancel(); + + return this.OnClosed(this); + } + + public void SendRtpData(byte[] rtpData, int length) + => _udpClient.Send(rtpData, length, _udpEp); + + private Task HandleSessionDescription(VoiceSessionDescription sd) + { + SecretKey = sd.SecretKey; + Mode = sd.Mode; + + _ = Task.Run(() => ConnectingFinished.TrySetResult(true)); + + return Task.CompletedTask; + } + + private Task ResumeAsync() + { + _shouldResume = false; + return SendCommandPayloadAsync(new() + { + OpCode = VoiceOpCode.Resume, + Data = JToken.FromObject(new VoiceResume + { + ServerId = this._guildId.ToString(), + SessionId = this._sessionId, + Token = this._token, + }) + }); + } + + private async Task HandleReadyAsync(VoiceReady ready) + { + Ssrc = ready.Ssrc; + + //Log.Information("Received ready {GuildId}, {Session}, {Token}", guildId, session, token); + + _udpEp = new(IPAddress.Parse(ready.Ip), ready.Port); + + var ssrcBytes = BitConverter.GetBytes(Ssrc); + Array.Reverse(ssrcBytes); + var ipDiscoveryData = new byte[74]; + Buffer.BlockCopy(ssrcBytes, 0, ipDiscoveryData, 4, ssrcBytes.Length); + ipDiscoveryData[0] = 0x00; + ipDiscoveryData[1] = 0x01; + ipDiscoveryData[2] = 0x00; + ipDiscoveryData[3] = 0x46; + await _udpClient.SendAsync(ipDiscoveryData, ipDiscoveryData.Length, _udpEp); + while (true) + { + var buffer = _udpClient.Receive(ref _udpEp); + + if (buffer.Length == 74) + { + //Log.Information("Received IP discovery data."); + + var myIp = Encoding.UTF8.GetString(buffer, 8, buffer.Length - 10); + MyIp = myIp.TrimEnd('\0'); + MyPort = (ushort)((buffer[^2] << 8) | buffer[^1]); + + //Log.Information("{MyIp}:{MyPort}", MyIp, MyPort); + + await SelectProtocol(); + return; + } + + //Log.Information("Received voice data"); + } + } + + private Task HandleHelloAsync(VoiceHello data) + { + _receivedAck = true; + _heartbeatTimer = new(async _ => + { + await SendHeartbeatAsync(); + }, default, data.HeartbeatInterval, data.HeartbeatInterval); + + if (_shouldResume) + { + return ResumeAsync(); + } + + return IdentifyAsync(); + } + + private Task IdentifyAsync() + => SendCommandPayloadAsync(new() + { + OpCode = VoiceOpCode.Identify, + Data = JToken.FromObject(new VoiceIdentify + { + ServerId = _guildId.ToString(), + SessionId = _sessionId, + Token = _token, + UserId = _userId.ToString(), + }) + }); + + private Task SelectProtocol() + => SendCommandPayloadAsync(new() + { + OpCode = VoiceOpCode.SelectProtocol, + Data = JToken.FromObject(new SelectProtocol + { + Protocol = "udp", + Data = new() + { + Address = MyIp, + Port = MyPort, + Mode = "xsalsa20_poly1305_lite", + } + }) + }); + + private async Task SendHeartbeatAsync() + { + if (!_receivedAck) + { + Log.Warning("Voice gateway didn't receive HearbeatAck - closing"); + var success = await _ws.CloseAsync(); + if (!success) + await _ws_WebsocketClosed(null); + return; + } + + _receivedAck = false; + await SendCommandPayloadAsync(new() + { + OpCode = VoiceOpCode.Heartbeat, + Data = JToken.FromObject(_rng.Next()) + }); + } + + public Task SendSpeakingAsync(VoiceSpeaking.State speaking) + => SendCommandPayloadAsync(new() + { + OpCode = VoiceOpCode.Speaking, + Data = JToken.FromObject(new VoiceSpeaking + { + Delay = 0, + Ssrc = Ssrc, + Speaking = (int)speaking + }) + }); + + public Task StopAsync() + { + Started = false; + _shouldResume = false; + if (!_stopCancellationSource.IsCancellationRequested) + try { _stopCancellationSource.Cancel(); } catch { } + return _ws.CloseAsync("Stopped by the user."); + } + + public Task Start() + { + Started = true; + _ = SendLoop(); + return _ws.RunAndBlockAsync(_websocketUrl, _stopCancellationToken); + } + + public bool Started { get; set; } + + public async Task SendCommandPayloadAsync(VoicePayload payload) + { + var complete = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var queueItem = new QueueItem(payload, complete); + + if (!_channel.Writer.TryWrite(queueItem)) + await _channel.Writer.WriteAsync(queueItem); + + await complete.Task; + } + } +} \ No newline at end of file