Added Ayu.Discord.Voice

This commit is contained in:
Emotion 2023-07-11 16:35:28 +12:00
parent 433b14718d
commit ecd5d1a3c4
No known key found for this signature in database
GPG key ID: D7D3E4C27A98C37B
17 changed files with 1332 additions and 0 deletions

View file

@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk" ToolsVersion="Current">
<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<LangVersion>9.0</LangVersion>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<NoWarn>CS8632</NoWarn>
<Version>1.0.2</Version>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="Serilog" Version="2.12.0" />
<PackageReference Include="System.Threading.Channels" Version="7.0.0" />
</ItemGroup>
</Project>

View file

@ -0,0 +1,35 @@
using System.Collections.Generic;
using System.Collections.ObjectModel;
namespace Ayu.Discord.Gateway
{
public static class CloseCodes
{
private static IReadOnlyDictionary<int, (string, string)> _closeCodes = new ReadOnlyDictionary<int, (string, string)>(
new Dictionary<int, (string, string)>()
{
{ 4000, ("Unknown error", "We're not sure what went wrong. Try reconnecting?")},
{ 4001, ("Unknown opcode", "You sent an invalid Gateway opcode or an invalid payload for an opcode. Don't do that!")},
{ 4002, ("Decode error", "You sent an invalid payload to us. Don't do that!")},
{ 4003, ("Not authenticated", "You sent us a payload prior to identifying.")},
{ 4004, ("Authentication failed", "The account token sent with your identify payload is incorrect.")},
{ 4005, ("Already authenticated", "You sent more than one identify payload. Don't do that!")},
{ 4007, ("Invalid seq", "The sequence sent when resuming the session was invalid. Reconnect and start a new session.")},
{ 4008, ("Rate limited", "Woah nelly! You're sending payloads to us too quickly. Slow it down! You will be disconnected on receiving this.")},
{ 4009, ("Session timed out", "Your session timed out. Reconnect and start a new one.")},
{ 4010, ("Invalid shard", "You sent us an invalid shard when identifying.")},
{ 4011, ("Sharding required", "The session would have handled too many guilds - you are required to shard your connection in order to connect.")},
{ 4012, ("Invalid API version", "You sent an invalid version for the gateway.")},
{ 4013, ("Invalid intent(s)", "You sent an invalid intent for a Gateway Intent. You may have incorrectly calculated the bitwise value.")},
{ 4014, ("Disallowed intent(s)", "You sent a disallowed intent for a Gateway Intent. You may have tried to specify an intent that you have not enabled or are not whitelisted for.")}
});
public static (string Error, string Message) GetErrorCodeMessage(int closeCode)
{
if (_closeCodes.TryGetValue(closeCode, out var data))
return data;
return ("Unknown error", closeCode.ToString());
}
}
}

View file

@ -0,0 +1,125 @@
using System;
using System.Runtime.InteropServices;
namespace Ayu.Discord.Voice
{
internal static unsafe class LibOpus
{
public const string OPUS = "data/lib/opus";
[DllImport(OPUS, EntryPoint = "opus_encoder_create", CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr CreateEncoder(int Fs, int channels, int application, out OpusError error);
[DllImport(OPUS, EntryPoint = "opus_encoder_destroy", CallingConvention = CallingConvention.Cdecl)]
internal static extern void DestroyEncoder(IntPtr encoder);
[DllImport(OPUS, EntryPoint = "opus_encode", CallingConvention = CallingConvention.Cdecl)]
internal static extern int Encode(IntPtr st, byte* pcm, int frame_size, byte* data, int max_data_bytes);
[DllImport(OPUS, EntryPoint = "opus_encode_float", CallingConvention = CallingConvention.Cdecl)]
internal static extern int EncodeFloat(IntPtr st, byte* pcm, int frame_size, byte* data, int max_data_bytes);
[DllImport(OPUS, EntryPoint = "opus_encoder_ctl", CallingConvention = CallingConvention.Cdecl)]
internal static extern int EncoderCtl(IntPtr st, OpusCtl request, int value);
}
public enum OpusApplication
{
VOIP = 2048,
Audio = 2049,
RestrictedLowdelay = 2051
}
public unsafe class LibOpusEncoder : IDisposable
{
private readonly IntPtr _encoderPtr;
private readonly int _sampleRate;
// private readonly int _channels;
// private readonly int _bitRate;
private readonly int _frameDelay;
private readonly int _frameSizePerChannel;
public int FrameSizePerChannel => _frameSizePerChannel;
public const int MaxData = 1276;
public LibOpusEncoder(int sampleRate, int channels, int bitRate, int frameDelay)
{
_sampleRate = sampleRate;
// _channels = channels;
// _bitRate = bitRate;
_frameDelay = frameDelay;
_frameSizePerChannel = _sampleRate * _frameDelay / 1000;
_encoderPtr = LibOpus.CreateEncoder(sampleRate, channels, (int)OpusApplication.Audio, out var error);
if (error != OpusError.OK)
throw new ExternalException(error.ToString());
LibOpus.EncoderCtl(_encoderPtr, OpusCtl.SetSignal, (int)OpusSignal.Music);
LibOpus.EncoderCtl(_encoderPtr, OpusCtl.SetInbandFEC, 1);
LibOpus.EncoderCtl(_encoderPtr, OpusCtl.SetBitrate, bitRate);
LibOpus.EncoderCtl(_encoderPtr, OpusCtl.SetPacketLossPerc, 2);
}
public int SetControl(OpusCtl ctl, int value)
=> LibOpus.EncoderCtl(_encoderPtr, ctl, value);
public int Encode(Span<byte> input, byte[] output)
{
fixed (byte* inPtr = input)
fixed (byte* outPtr = output)
return LibOpus.Encode(_encoderPtr, inPtr, FrameSizePerChannel, outPtr, output.Length);
}
public int EncodeFloat(Span<byte> input, byte[] output)
{
fixed (byte* inPtr = input)
fixed (byte* outPtr = output)
return LibOpus.EncodeFloat(_encoderPtr, inPtr, FrameSizePerChannel, outPtr, output.Length);
}
public void Dispose()
=> LibOpus.DestroyEncoder(_encoderPtr);
}
public enum OpusCtl
{
SetBitrate = 4002,
GetBitrate = 4003,
SetBandwidth = 4008,
GetBandwidth = 4009,
SetComplexity = 4010,
GetComplexity = 4011,
SetInbandFEC = 4012,
GetInbandFEC = 4013,
SetPacketLossPerc = 4014,
GetPacketLossPerc = 4015,
SetLsbDepth = 4036,
GetLsbDepth = 4037,
SetDtx = 4016,
GetDtx = 4017,
SetSignal = 4024
}
public enum OpusError
{
OK = 0,
BadArg = -1,
BufferToSmall = -2,
InternalError = -3,
InvalidPacket = -4,
Unimplemented = -5,
InvalidState = -6,
AllocFail = -7
}
public enum OpusSignal
{
Auto = -1000,
Voice = 3001,
Music = 3002,
}
}

View file

@ -0,0 +1,32 @@
using System;
using System.Runtime.InteropServices;
namespace Ayu.Discord.Voice
{
internal static unsafe class Sodium
{
private const string SODIUM = "data/lib/libsodium";
[DllImport(SODIUM, EntryPoint = "crypto_secretbox_easy", CallingConvention = CallingConvention.Cdecl)]
private static extern int SecretBoxEasy(byte* output, byte* input, long inputLength, byte* nonce, byte* secret);
[DllImport(SODIUM, EntryPoint = "crypto_secretbox_open_easy", CallingConvention = CallingConvention.Cdecl)]
private static extern int SecretBoxOpenEasy(byte* output, byte* input, ulong inputLength, byte* nonce, byte* secret);
public static int Encrypt(byte[] input, int inputOffset, long inputLength, byte[] output, int outputOffset, in ReadOnlySpan<byte> nonce, byte[] secret)
{
fixed (byte* inPtr = input)
fixed (byte* outPtr = output)
fixed (byte* noncePtr = nonce)
fixed (byte* secretPtr = secret)
return SecretBoxEasy(outPtr + outputOffset, inPtr + inputOffset, inputLength - inputOffset, noncePtr, secretPtr);
}
public static int Decrypt(byte[] input, ulong inputLength, byte[] output, in ReadOnlySpan<byte> nonce, byte[] secret)
{
fixed (byte* outPtr = output)
fixed (byte* inPtr = input)
fixed (byte* noncePtr = nonce)
fixed (byte* secretPtr = secret)
return SecretBoxOpenEasy(outPtr, inPtr, inputLength, noncePtr, secretPtr);
}
}
}

View file

@ -0,0 +1,23 @@
using Newtonsoft.Json;
namespace Ayu.Discord.Voice.Models
{
public sealed class SelectProtocol
{
[JsonProperty("protocol")]
public string Protocol { get; set; }
[JsonProperty("data")]
public ProtocolData Data { get; set; }
public sealed class ProtocolData
{
[JsonProperty("address")]
public string Address { get; set; }
[JsonProperty("port")]
public int Port { get; set; }
[JsonProperty("mode")]
public string Mode { get; set; }
}
}
}

View file

@ -0,0 +1,10 @@
using Newtonsoft.Json;
namespace Ayu.Discord.Voice.Models
{
public sealed class VoiceHello
{
[JsonProperty("heartbeat_interval")]
public int HeartbeatInterval { get; set; }
}
}

View file

@ -0,0 +1,20 @@
using Newtonsoft.Json;
namespace Ayu.Discord.Voice.Models
{
public sealed class VoiceIdentify
{
[JsonProperty("server_id")]
public string ServerId { get; set; }
[JsonProperty("user_id")]
public string UserId { get; set; }
[JsonProperty("session_id")]
public string SessionId { get; set; }
[JsonProperty("token")]
public string Token { get; set; }
}
}

View file

@ -0,0 +1,29 @@
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
namespace Discord.Models.Gateway
{
public sealed class VoicePayload
{
[JsonProperty("op")]
public VoiceOpCode OpCode { get; set; }
[JsonProperty("d")]
public JToken Data { get; set; }
}
public enum VoiceOpCode
{
Identify = 0,
SelectProtocol = 1,
Ready = 2,
Heartbeat = 3,
SessionDescription = 4,
Speaking = 5,
HeartbeatAck = 6,
Resume = 7,
Hello = 8,
Resumed = 9,
ClientDisconnect = 13,
}
}

View file

@ -0,0 +1,22 @@
using Newtonsoft.Json;
namespace Ayu.Discord.Voice.Models
{
public sealed class VoiceReady
{
[JsonProperty("ssrc")]
public uint Ssrc { get; set; }
[JsonProperty("ip")]
public string Ip { get; set; }
[JsonProperty("port")]
public int Port { get; set; }
[JsonProperty("modes")]
public string[] Modes { get; set; }
[JsonProperty("heartbeat_interval")]
public string HeartbeatInterval { get; set; }
}
}

View file

@ -0,0 +1,16 @@
using Newtonsoft.Json;
namespace Ayu.Discord.Voice.Models
{
public sealed class VoiceResume
{
[JsonProperty("server_id")]
public string ServerId { get; set; }
[JsonProperty("session_id")]
public string SessionId { get; set; }
[JsonProperty("token")]
public string Token { get; set; }
}
}

View file

@ -0,0 +1,13 @@
using Newtonsoft.Json;
namespace Ayu.Discord.Voice.Models
{
public sealed class VoiceSessionDescription
{
[JsonProperty("mode")]
public string Mode { get; set; }
[JsonProperty("secret_key")]
public byte[] SecretKey { get; set; }
}
}

View file

@ -0,0 +1,26 @@
using Newtonsoft.Json;
using System;
namespace Ayu.Discord.Voice.Models
{
public sealed class VoiceSpeaking
{
[JsonProperty("speaking")]
public int Speaking { get; set; }
[JsonProperty("delay")]
public int Delay { get; set; }
[JsonProperty("ssrc")]
public uint Ssrc { get; set; }
[Flags]
public enum State
{
None = 0,
Microphone = 1 << 0,
Soundshare = 1 << 1,
Priority = 1 << 2
}
}
}

View file

@ -0,0 +1,136 @@
#nullable enable
using System;
using System.Buffers;
using System.Threading;
using System.Threading.Tasks;
namespace Ayu.Discord.Voice
{
public sealed class PoopyBufferImmortalized : ISongBuffer
{
private readonly byte[] _buffer;
private readonly byte[] _outputArray;
private CancellationToken _cancellationToken;
private bool _isStopped;
public int ReadPosition { get; private set; }
public int WritePosition { get; private set; }
public int ContentLength => WritePosition >= ReadPosition
? WritePosition - ReadPosition
: (_buffer.Length - ReadPosition) + WritePosition;
public int FreeSpace => _buffer.Length - ContentLength;
public bool Stopped => _cancellationToken.IsCancellationRequested || _isStopped;
public PoopyBufferImmortalized(int frameSize)
{
_buffer = ArrayPool<byte>.Shared.Rent(1_000_000);
_outputArray = new byte[frameSize];
ReadPosition = 0;
WritePosition = 0;
}
public void Stop()
=> _isStopped = true;
// this method needs a rewrite
public Task<bool> BufferAsync(ITrackDataSource source, CancellationToken cancellationToken)
{
_cancellationToken = cancellationToken;
var bufferingCompleted = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
Task.Run(async () =>
{
var output = ArrayPool<byte>.Shared.Rent(38400);
try
{
int read;
while (!Stopped && (read = source.Read(output)) > 0)
{
while (!Stopped && FreeSpace <= read)
{
bufferingCompleted.TrySetResult(true);
await Task.Delay(100, cancellationToken).ConfigureAwait(false);
}
if (Stopped)
break;
Write(output, read);
}
}
finally
{
ArrayPool<byte>.Shared.Return(output);
bufferingCompleted.TrySetResult(true);
}
}, cancellationToken);
return bufferingCompleted.Task;
}
private void Write(byte[] input, int writeCount)
{
if (WritePosition + writeCount < _buffer.Length)
{
Buffer.BlockCopy(input, 0, _buffer, WritePosition, writeCount);
WritePosition += writeCount;
return;
}
var wroteNormally = _buffer.Length - WritePosition;
Buffer.BlockCopy(input, 0, _buffer, WritePosition, wroteNormally);
var wroteFromStart = writeCount - wroteNormally;
Buffer.BlockCopy(input, wroteNormally, _buffer, 0, wroteFromStart);
WritePosition = wroteFromStart;
}
public Span<byte> Read(int count, out int length)
{
var toRead = Math.Min(ContentLength, count);
var wp = WritePosition;
if (ContentLength == 0)
{
length = 0;
return Span<byte>.Empty;
}
if (wp > ReadPosition || ReadPosition + toRead <= _buffer.Length)
{
// thsi can be achieved without copying if
// writer never writes until the end,
// but leaves a single chunk free
Span<byte> toReturn = _outputArray;
((Span<byte>)_buffer).Slice(ReadPosition, toRead).CopyTo(toReturn);
ReadPosition += toRead;
length = toRead;
return toReturn;
}
else
{
Span<byte> toReturn = _outputArray;
var toEnd = _buffer.Length - ReadPosition;
var bufferSpan = (Span<byte>)_buffer;
bufferSpan.Slice(ReadPosition, toEnd).CopyTo(toReturn);
var fromStart = toRead - toEnd;
bufferSpan.Slice(0, fromStart).CopyTo(toReturn.Slice(toEnd));
ReadPosition = fromStart;
length = toEnd + fromStart;
return toReturn;
}
}
public void Dispose()
=> ArrayPool<byte>.Shared.Return(_buffer);
public void Reset()
{
ReadPosition = 0;
WritePosition = 0;
}
}
}

View file

@ -0,0 +1,154 @@
using Serilog;
using System;
using System.Buffers;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
namespace Ayu.Discord.Gateway
{
public class SocketClient : IDisposable
{
private ClientWebSocket? _ws;
public event Func<byte[], Task>? PayloadReceived = delegate { return Task.CompletedTask; };
public event Func<string, Task>? WebsocketClosed = delegate { return Task.CompletedTask; };
const int CHUNK_SIZE = 1024 * 16;
public async Task RunAndBlockAsync(Uri url, CancellationToken cancel)
{
var error = "Error.";
var bufferWriter = new ArrayBufferWriter<byte>(CHUNK_SIZE);
try
{
using (_ws = new())
{
await _ws.ConnectAsync(url, cancel).ConfigureAwait(false);
// WebsocketConnected!.Invoke(this);
while (true)
{
var result = await _ws.ReceiveAsync(bufferWriter.GetMemory(CHUNK_SIZE), cancel);
bufferWriter.Advance(result.Count);
if (result.MessageType == WebSocketMessageType.Close)
{
var closeMessage = CloseCodes.GetErrorCodeMessage((int?)_ws.CloseStatus ?? 0).Message;
error = $"Websocket closed ({_ws.CloseStatus}): {_ws.CloseStatusDescription} {closeMessage}";
break;
}
if (result.EndOfMessage)
{
var pr = PayloadReceived;
var data = bufferWriter.WrittenMemory.ToArray();
bufferWriter.Clear();
if (pr is not null)
{
await pr.Invoke(data);
}
}
}
}
}
catch (WebSocketException ex)
{
Log.Warning("Disconnected, check your internet connection...");
Log.Debug(ex, "Websocket Exception in websocket client");
}
catch (OperationCanceledException)
{
// ignored
}
catch (Exception ex)
{
Log.Error(ex, "Error in websocket client. {Message}", ex.Message);
}
finally
{
bufferWriter.Clear();
_ws = null;
await ClosedAsync(error).ConfigureAwait(false);
}
}
private async Task ClosedAsync(string msg = "Error")
{
try
{
await WebsocketClosed!.Invoke(msg).ConfigureAwait(false);
}
catch
{
}
}
private readonly SemaphoreSlim _sendLock = new SemaphoreSlim(1, 1);
public async Task SendAsync(byte[] data)
{
await _sendLock.WaitAsync().ConfigureAwait(false);
try
{
var ws = _ws;
if (ws is null)
throw new WebSocketException("Websocket is disconnected.");
for (var i = 0; i < data.Length; i += 4096)
{
var count = i + 4096 > data.Length ? data.Length - i : 4096;
await ws.SendAsync(new(data, i, count),
WebSocketMessageType.Text,
i + count >= data.Length,
CancellationToken.None).ConfigureAwait(false);
}
}
finally
{
_sendLock.Release();
}
}
public async Task SendBulkAsync(byte[] data)
{
var ws = _ws;
if (ws is null)
throw new WebSocketException("Websocket is disconnected.");
await ws.SendAsync(new(data, 0, data.Length),
WebSocketMessageType.Binary,
true,
CancellationToken.None).ConfigureAwait(false);
}
public async Task<bool> CloseAsync(string msg = "Stop")
{
if (_ws is not null && _ws.State != WebSocketState.Closed)
{
try
{
await _ws.CloseAsync(WebSocketCloseStatus.InternalServerError, msg, CancellationToken.None)
.ConfigureAwait(false);
return true;
}
catch
{
}
}
return false;
}
public void Dispose()
{
PayloadReceived = null;
WebsocketClosed = null;
var ws = _ws;
if (ws is null)
return;
ws.Dispose();
}
}
}

View file

@ -0,0 +1,95 @@
using Serilog;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace Ayu.Discord.Voice
{
public interface ISongBuffer : IDisposable
{
Span<byte> Read(int toRead, out int read);
Task<bool> BufferAsync(ITrackDataSource source, CancellationToken cancellationToken);
void Reset();
void Stop();
}
public interface ITrackDataSource
{
public int Read(byte[] output);
}
public sealed class FfmpegTrackDataSource : ITrackDataSource, IDisposable
{
private Process _p;
private readonly string _streamUrl;
private readonly bool _isLocal;
private readonly string _pcmType;
private FfmpegTrackDataSource(int bitDepth, string streamUrl, bool isLocal)
{
this._pcmType = bitDepth == 16 ? "s16le" : "f32le";
this._streamUrl = streamUrl;
this._isLocal = isLocal;
}
public static FfmpegTrackDataSource CreateAsync(int bitDepth, string streamUrl, bool isLocal)
{
try
{
var source = new FfmpegTrackDataSource(bitDepth, streamUrl, isLocal);
source.StartFFmpegProcess();
return source;
}
catch (System.ComponentModel.Win32Exception)
{
Log.Error(@"You have not properly installed or configured FFMPEG.
Please install and configure FFMPEG to play music.
Check the guides for your platform on how to setup ffmpeg correctly:
Windows Guide: https://goo.gl/OjKk8F
Linux Guide: https://goo.gl/ShjCUo");
throw;
}
catch (OperationCanceledException)
{
}
catch (InvalidOperationException)
{
}
catch (Exception ex)
{
Log.Information(ex, "Error starting ffmpeg: {ErrorMessage}", ex.Message);
}
return null;
}
private Process StartFFmpegProcess()
{
var args = $"-err_detect ignore_err -i {_streamUrl} -f {_pcmType} -ar 48000 -vn -ac 2 pipe:1 -loglevel error";
if (!_isLocal)
args = $"-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5 {args}";
return _p = Process.Start(new ProcessStartInfo
{
FileName = "ffmpeg",
Arguments = args,
UseShellExecute = false,
RedirectStandardOutput = true,
RedirectStandardError = false,
CreateNoWindow = true,
});
}
public int Read(byte[] output)
=> _p.StandardOutput.BaseStream.Read(output);
public void Dispose()
{
try { _p?.Kill(); } catch { }
try { _p?.Dispose(); } catch { }
}
}
}

View file

@ -0,0 +1,207 @@
using System;
using System.Buffers;
namespace Ayu.Discord.Voice
{
public sealed class VoiceClient : IDisposable
{
delegate int EncodeDelegate(Span<byte> input, byte[] output);
private readonly int sampleRate;
private readonly int bitRate;
private readonly int channels;
private readonly int frameDelay;
private readonly int bitDepth;
public LibOpusEncoder Encoder { get; }
private readonly ArrayPool<byte> _arrayPool;
public int BitDepth => bitDepth * 8;
public int Delay => frameDelay;
private int FrameSizePerChannel => Encoder.FrameSizePerChannel;
public int InputLength => FrameSizePerChannel * channels * bitDepth;
EncodeDelegate Encode;
// https://github.com/xiph/opus/issues/42 w
public VoiceClient(SampleRate sampleRate = SampleRate._48k,
Bitrate bitRate = Bitrate._192k,
Channels channels = Channels.Two,
FrameDelay frameDelay = FrameDelay.Delay20,
BitDepthEnum bitDepthEnum = BitDepthEnum.Float32)
{
this.frameDelay = (int)frameDelay;
this.sampleRate = (int)sampleRate;
this.bitRate = (int)bitRate;
this.channels = (int)channels;
this.bitDepth = (int)bitDepthEnum;
this.Encoder = new(this.sampleRate, this.channels, this.bitRate, this.frameDelay);
Encode = bitDepthEnum switch
{
BitDepthEnum.Float32 => Encoder.EncodeFloat,
BitDepthEnum.UInt16 => Encoder.Encode,
_ => throw new NotSupportedException(nameof(BitDepth))
};
if (bitDepthEnum == BitDepthEnum.Float32)
{
Encode = Encoder.EncodeFloat;
}
else
{
Encode = Encoder.Encode;
}
_arrayPool = ArrayPool<byte>.Shared;
}
public int SendPcmFrame(VoiceGateway gw, Span<byte> data, int offset, int count)
{
var secretKey = gw.SecretKey;
if (secretKey.Length == 0)
{
return (int)SendPcmError.SecretKeyUnavailable;
}
// encode using opus
var encodeOutput = _arrayPool.Rent(LibOpusEncoder.MaxData);
try
{
var encodeOutputLength = Encode(data, encodeOutput);
return SendOpusFrame(gw, encodeOutput, 0, encodeOutputLength);
}
finally
{
_arrayPool.Return(encodeOutput);
}
}
public int SendOpusFrame(VoiceGateway gw, byte[] data, int offset, int count)
{
var secretKey = gw.SecretKey;
if (secretKey is null)
{
return (int)SendPcmError.SecretKeyUnavailable;
}
// form RTP header
var headerLength = 1 // version + flags
+ 1 // payload type
+ 2 // sequence
+ 4 // timestamp
+ 4; // ssrc
var header = new byte[headerLength];
header[0] = 0x80; // version + flags
header[1] = 0x78; // payload type
// get byte values for header data
var seqBytes = BitConverter.GetBytes(gw.Sequence); // 2
var nonceBytes = BitConverter.GetBytes(gw.NonceSequence); // 2
var timestampBytes = BitConverter.GetBytes(gw.Timestamp); // 4
var ssrcBytes = BitConverter.GetBytes(gw.Ssrc); // 4
gw.Timestamp += (uint)FrameSizePerChannel;
gw.Sequence++;
gw.NonceSequence++;
if (BitConverter.IsLittleEndian)
{
Array.Reverse(seqBytes);
Array.Reverse(nonceBytes);
Array.Reverse(timestampBytes);
Array.Reverse(ssrcBytes);
}
// copy headers
Buffer.BlockCopy(seqBytes, 0, header, 2, 2);
Buffer.BlockCopy(timestampBytes, 0, header, 4, 4);
Buffer.BlockCopy(ssrcBytes, 0, header, 8, 4);
//// encryption part
//// create a byte array where to store the encrypted data
//// it has to be inputLength + crypto_secretbox_MACBYTES (constant with value 16)
var encryptedBytes = new byte[count + 16];
//// form nonce with header + 12 empty bytes
//var nonce = new byte[24];
//Buffer.BlockCopy(rtpHeader, 0, nonce, 0, rtpHeader.Length);
var nonce = new byte[4];
Buffer.BlockCopy(seqBytes, 0, nonce, 2, 2);
Sodium.Encrypt(data, 0, count, encryptedBytes, 0, nonce, secretKey);
var rtpDataLength = headerLength + encryptedBytes.Length + nonce.Length;
var rtpData = _arrayPool.Rent(rtpDataLength);
try
{
//copy headers
Buffer.BlockCopy(header, 0, rtpData, 0, header.Length);
//copy audio data
Buffer.BlockCopy(encryptedBytes, 0, rtpData, header.Length, encryptedBytes.Length);
Buffer.BlockCopy(nonce, 0, rtpData, rtpDataLength - 4, 4);
gw.SendRtpData(rtpData, rtpDataLength);
// FUTURE When there's a break in the sent data,
// the packet transmission shouldn't simply stop.
// Instead, send five frames of silence (0xF8, 0xFF, 0xFE)
// before stopping to avoid unintended Opus interpolation
// with subsequent transmissions.
return rtpDataLength;
}
finally
{
_arrayPool.Return(rtpData);
}
}
public void Dispose()
=> Encoder.Dispose();
}
public enum SendPcmError
{
SecretKeyUnavailable = -1,
}
public enum FrameDelay
{
Delay5 = 5,
Delay10 = 10,
Delay20 = 20,
Delay40 = 40,
Delay60 = 60,
}
public enum BitDepthEnum
{
UInt16 = sizeof(UInt16),
Float32 = sizeof(float),
}
public enum SampleRate
{
_48k = 48_000,
}
public enum Bitrate
{
_64k = 64 * 1024,
_96k = 96 * 1024,
_128k = 128 * 1024,
_192k = 192 * 1024,
}
public enum Channels
{
One = 1,
Two = 2,
}
}

View file

@ -0,0 +1,375 @@
using Ayu.Discord.Voice.Models;
using Discord.Models.Gateway;
using Newtonsoft.Json.Linq;
using Serilog;
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Ayu.Discord.Gateway;
using Newtonsoft.Json;
namespace Ayu.Discord.Voice
{
public class VoiceGateway
{
private class QueueItem
{
public VoicePayload Payload { get; }
public TaskCompletionSource<bool> Result { get; }
public QueueItem(VoicePayload payload, TaskCompletionSource<bool> result)
{
Payload = payload;
Result = result;
}
}
private readonly ulong _guildId;
private readonly ulong _userId;
private readonly string _sessionId;
private readonly string _token;
private readonly string _endpoint;
private readonly Uri _websocketUrl;
private readonly Channel<QueueItem> _channel;
public TaskCompletionSource<bool> ConnectingFinished { get; }
private readonly Random _rng;
private readonly SocketClient _ws;
private readonly UdpClient _udpClient;
private Timer? _heartbeatTimer;
private bool _receivedAck;
private IPEndPoint? _udpEp;
public uint Ssrc { get; private set; }
public string Ip { get; private set; } = string.Empty;
public int Port { get; private set; } = 0;
public byte[] SecretKey { get; private set; } = Array.Empty<byte>();
public string Mode { get; private set; } = string.Empty;
public ushort Sequence { get; set; }
public uint NonceSequence { get; set; }
public uint Timestamp { get; set; }
public string MyIp { get; private set; } = string.Empty;
public ushort MyPort { get; private set; }
private bool _shouldResume;
private readonly CancellationTokenSource _stopCancellationSource;
private readonly CancellationToken _stopCancellationToken;
public bool Stopped => _stopCancellationToken.IsCancellationRequested;
public event Func<VoiceGateway, Task> OnClosed = delegate { return Task.CompletedTask; };
public VoiceGateway(ulong guildId, ulong userId, string session, string token, string endpoint)
{
this._guildId = guildId;
this._userId = userId;
this._sessionId = session;
this._token = token;
this._endpoint = endpoint;
//Log.Information("g: {GuildId} u: {UserId} sess: {Session} tok: {Token} ep: {Endpoint}",
// guildId, userId, session, token, endpoint);
this._websocketUrl = new($"wss://{_endpoint.Replace(":80", "")}?v=4");
this._channel = Channel.CreateUnbounded<QueueItem>(new()
{
SingleReader = true,
SingleWriter = false,
AllowSynchronousContinuations = false,
});
ConnectingFinished = new();
_rng = new();
_ws = new();
_udpClient = new();
_stopCancellationSource = new();
_stopCancellationToken = _stopCancellationSource.Token;
_ws.PayloadReceived += _ws_PayloadReceived;
_ws.WebsocketClosed += _ws_WebsocketClosed;
}
public Task WaitForReadyAsync()
=> ConnectingFinished.Task;
private async Task SendLoop()
{
while (!_stopCancellationToken.IsCancellationRequested)
{
try
{
var qi = await _channel.Reader.ReadAsync(_stopCancellationToken);
//Log.Information("Sending payload with opcode {OpCode}", qi.Payload.OpCode);
var json = JsonConvert.SerializeObject(qi.Payload);
if (!_stopCancellationToken.IsCancellationRequested)
await _ws.SendAsync(Encoding.UTF8.GetBytes(json));
_ = Task.Run(() => qi.Result.TrySetResult(true));
}
catch (ChannelClosedException)
{
Log.Warning("Voice gateway send channel is closed");
}
}
}
private async Task _ws_PayloadReceived(byte[] arg)
{
var payload = JsonConvert.DeserializeObject<VoicePayload>(Encoding.UTF8.GetString(arg));
if (payload is null)
return;
try
{
//Log.Information("Received payload with opcode {OpCode}", payload.OpCode);
switch (payload.OpCode)
{
case VoiceOpCode.Identify:
// sent, not received.
break;
case VoiceOpCode.SelectProtocol:
// sent, not received
break;
case VoiceOpCode.Ready:
var ready = payload.Data.ToObject<VoiceReady>();
await HandleReadyAsync(ready!);
_shouldResume = true;
break;
case VoiceOpCode.Heartbeat:
// sent, not received
break;
case VoiceOpCode.SessionDescription:
var sd = payload.Data.ToObject<VoiceSessionDescription>();
await HandleSessionDescription(sd!);
break;
case VoiceOpCode.Speaking:
// ignore for now
break;
case VoiceOpCode.HeartbeatAck:
_receivedAck = true;
break;
case VoiceOpCode.Resume:
// sent, not received
break;
case VoiceOpCode.Hello:
var hello = payload.Data.ToObject<VoiceHello>();
await HandleHelloAsync(hello!);
break;
case VoiceOpCode.Resumed:
_shouldResume = true;
break;
case VoiceOpCode.ClientDisconnect:
break;
}
}
catch (Exception ex)
{
Log.Error(ex, "Error handling payload with opcode {OpCode}: {Message}", payload.OpCode, ex.Message);
}
}
private Task _ws_WebsocketClosed(string arg)
{
if (!string.IsNullOrWhiteSpace(arg))
{
Log.Warning("Voice Websocket closed: {Arg}", arg);
}
var hbt = _heartbeatTimer;
hbt?.Change(Timeout.Infinite, Timeout.Infinite);
_heartbeatTimer = null;
if (!_stopCancellationToken.IsCancellationRequested && _shouldResume)
{
_ = _ws.RunAndBlockAsync(_websocketUrl, _stopCancellationToken);
return Task.CompletedTask;
}
_ws.WebsocketClosed -= _ws_WebsocketClosed;
_ws.PayloadReceived -= _ws_PayloadReceived;
if (!_stopCancellationToken.IsCancellationRequested)
_stopCancellationSource.Cancel();
return this.OnClosed(this);
}
public void SendRtpData(byte[] rtpData, int length)
=> _udpClient.Send(rtpData, length, _udpEp);
private Task HandleSessionDescription(VoiceSessionDescription sd)
{
SecretKey = sd.SecretKey;
Mode = sd.Mode;
_ = Task.Run(() => ConnectingFinished.TrySetResult(true));
return Task.CompletedTask;
}
private Task ResumeAsync()
{
_shouldResume = false;
return SendCommandPayloadAsync(new()
{
OpCode = VoiceOpCode.Resume,
Data = JToken.FromObject(new VoiceResume
{
ServerId = this._guildId.ToString(),
SessionId = this._sessionId,
Token = this._token,
})
});
}
private async Task HandleReadyAsync(VoiceReady ready)
{
Ssrc = ready.Ssrc;
//Log.Information("Received ready {GuildId}, {Session}, {Token}", guildId, session, token);
_udpEp = new(IPAddress.Parse(ready.Ip), ready.Port);
var ssrcBytes = BitConverter.GetBytes(Ssrc);
Array.Reverse(ssrcBytes);
var ipDiscoveryData = new byte[74];
Buffer.BlockCopy(ssrcBytes, 0, ipDiscoveryData, 4, ssrcBytes.Length);
ipDiscoveryData[0] = 0x00;
ipDiscoveryData[1] = 0x01;
ipDiscoveryData[2] = 0x00;
ipDiscoveryData[3] = 0x46;
await _udpClient.SendAsync(ipDiscoveryData, ipDiscoveryData.Length, _udpEp);
while (true)
{
var buffer = _udpClient.Receive(ref _udpEp);
if (buffer.Length == 74)
{
//Log.Information("Received IP discovery data.");
var myIp = Encoding.UTF8.GetString(buffer, 8, buffer.Length - 10);
MyIp = myIp.TrimEnd('\0');
MyPort = (ushort)((buffer[^2] << 8) | buffer[^1]);
//Log.Information("{MyIp}:{MyPort}", MyIp, MyPort);
await SelectProtocol();
return;
}
//Log.Information("Received voice data");
}
}
private Task HandleHelloAsync(VoiceHello data)
{
_receivedAck = true;
_heartbeatTimer = new(async _ =>
{
await SendHeartbeatAsync();
}, default, data.HeartbeatInterval, data.HeartbeatInterval);
if (_shouldResume)
{
return ResumeAsync();
}
return IdentifyAsync();
}
private Task IdentifyAsync()
=> SendCommandPayloadAsync(new()
{
OpCode = VoiceOpCode.Identify,
Data = JToken.FromObject(new VoiceIdentify
{
ServerId = _guildId.ToString(),
SessionId = _sessionId,
Token = _token,
UserId = _userId.ToString(),
})
});
private Task SelectProtocol()
=> SendCommandPayloadAsync(new()
{
OpCode = VoiceOpCode.SelectProtocol,
Data = JToken.FromObject(new SelectProtocol
{
Protocol = "udp",
Data = new()
{
Address = MyIp,
Port = MyPort,
Mode = "xsalsa20_poly1305_lite",
}
})
});
private async Task SendHeartbeatAsync()
{
if (!_receivedAck)
{
Log.Warning("Voice gateway didn't receive HearbeatAck - closing");
var success = await _ws.CloseAsync();
if (!success)
await _ws_WebsocketClosed(null);
return;
}
_receivedAck = false;
await SendCommandPayloadAsync(new()
{
OpCode = VoiceOpCode.Heartbeat,
Data = JToken.FromObject(_rng.Next())
});
}
public Task SendSpeakingAsync(VoiceSpeaking.State speaking)
=> SendCommandPayloadAsync(new()
{
OpCode = VoiceOpCode.Speaking,
Data = JToken.FromObject(new VoiceSpeaking
{
Delay = 0,
Ssrc = Ssrc,
Speaking = (int)speaking
})
});
public Task StopAsync()
{
Started = false;
_shouldResume = false;
if (!_stopCancellationSource.IsCancellationRequested)
try { _stopCancellationSource.Cancel(); } catch { }
return _ws.CloseAsync("Stopped by the user.");
}
public Task Start()
{
Started = true;
_ = SendLoop();
return _ws.RunAndBlockAsync(_websocketUrl, _stopCancellationToken);
}
public bool Started { get; set; }
public async Task SendCommandPayloadAsync(VoicePayload payload)
{
var complete = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var queueItem = new QueueItem(payload, complete);
if (!_channel.Writer.TryWrite(queueItem))
await _channel.Writer.WriteAsync(queueItem);
await complete.Task;
}
}
}