Pretty much complete websocket implementation.

This commit is contained in:
flash 2022-04-10 18:06:56 +02:00
parent 79591f27b7
commit 0e1fb36721
11 changed files with 434 additions and 216 deletions

View file

@ -99,38 +99,83 @@ namespace Hamakaze {
Action<WsClient> onOpen, Action<WsClient> onOpen,
Action<WsMessage> onMessage, Action<WsMessage> onMessage,
Action<Exception> onError, Action<Exception> onError,
IEnumerable<string> protocols = null IEnumerable<string> protocols = null,
) { Action<HttpResponseMessage> onResponse = null,
CreateWsConnection( bool disposeRequest = true,
bool disposeResponse = true
) => CreateWsConnection(
url, url,
conn => onOpen(new WsClient(conn, onMessage, onError)), conn => onOpen(new WsClient(conn, onMessage, onError)),
onError, onError,
protocols protocols,
onResponse,
disposeRequest,
disposeResponse
);
public void CreateWsClient(
HttpRequestMessage request,
Action<WsClient> onOpen,
Action<WsMessage> onMessage,
Action<Exception> onError,
IEnumerable<string> protocols = null,
Action<HttpResponseMessage> onResponse = null,
bool disposeRequest = true,
bool disposeResponse = true
) => CreateWsConnection(
request,
conn => onOpen(new WsClient(conn, onMessage, onError)),
onError,
protocols,
onResponse,
disposeRequest,
disposeResponse
); );
}
public void CreateWsConnection( public void CreateWsConnection(
string url, string url,
Action<WsConnection> onOpen, Action<WsConnection> onOpen,
Action<Exception> onError, Action<Exception> onError,
IEnumerable<string> protocols = null IEnumerable<string> protocols = null,
Action<HttpResponseMessage> onResponse = null,
bool disposeRequest = true,
bool disposeResponse = true
) => CreateWsConnection(
new HttpRequestMessage(@"GET", url),
onOpen,
onError,
protocols,
onResponse,
disposeRequest,
disposeResponse
);
public void CreateWsConnection(
HttpRequestMessage request,
Action<WsConnection> onOpen,
Action<Exception> onError,
IEnumerable<string> protocols = null,
Action<HttpResponseMessage> onResponse = null,
bool disposeRequest = true,
bool disposeResponse = true
) { ) {
string key = Convert.ToBase64String(RandomNumberGenerator.GetBytes(WS_RNG)); string key = Convert.ToBase64String(RandomNumberGenerator.GetBytes(WS_RNG));
HttpRequestMessage req = new HttpRequestMessage(@"GET", url); request.Connection = HttpConnectionHeader.UPGRADE;
req.Connection = HttpConnectionHeader.UPGRADE; request.SetHeader(@"Cache-Control", @"no-cache");
req.SetHeader(@"Cache-Control", @"no-cache"); request.SetHeader(@"Upgrade", WS_PROTO);
req.SetHeader(@"Upgrade", WS_PROTO); request.SetHeader(@"Sec-WebSocket-Key", key);
req.SetHeader(@"Sec-WebSocket-Key", key); request.SetHeader(@"Sec-WebSocket-Version", @"13");
req.SetHeader(@"Sec-WebSocket-Version", @"13");
if(protocols?.Any() == true) if(protocols?.Any() == true)
req.SetHeader(@"Sec-WebSocket-Protocol", string.Join(@", ", protocols)); request.SetHeader(@"Sec-WebSocket-Protocol", string.Join(@", ", protocols));
SendRequest( SendRequest(
req, request,
(t, res) => { (t, res) => {
try { try {
onResponse?.Invoke(res);
if(res.ProtocolVersion.CompareTo(@"1.1") < 0) if(res.ProtocolVersion.CompareTo(@"1.1") < 0)
throw new HttpUpgradeProtocolVersionException(@"1.1", res.ProtocolVersion); throw new HttpUpgradeProtocolVersionException(@"1.1", res.ProtocolVersion);
@ -168,7 +213,9 @@ namespace Hamakaze {
onError(ex); onError(ex);
} }
}, },
(t, ex) => onError(ex) (t, ex) => onError(ex),
disposeRequest: disposeRequest,
disposeResponse: disposeResponse
); );
} }
@ -182,9 +229,57 @@ namespace Hamakaze {
Action<HttpTask, HttpTask.TaskState> onStateChange = null, Action<HttpTask, HttpTask.TaskState> onStateChange = null,
bool disposeRequest = true, bool disposeRequest = true,
bool disposeResponse = true bool disposeResponse = true
) { ) => Instance.SendRequest(
Instance.SendRequest(request, onComplete, onError, onCancel, onDownloadProgress, onUploadProgress, onStateChange, disposeRequest, disposeResponse); request,
} onComplete,
onError,
onCancel,
onDownloadProgress,
onUploadProgress,
onStateChange,
disposeRequest,
disposeResponse
);
public static void Connect(
string url,
Action<WsClient> onOpen,
Action<WsMessage> onMessage,
Action<Exception> onError,
IEnumerable<string> protocols = null,
Action<HttpResponseMessage> onResponse = null,
bool disposeRequest = true,
bool disposeResponse = true
) => Instance.CreateWsClient(
url,
onOpen,
onMessage,
onError,
protocols,
onResponse,
disposeRequest,
disposeResponse
);
public static void Connect(
HttpRequestMessage request,
Action<WsClient> onOpen,
Action<WsMessage> onMessage,
Action<Exception> onError,
IEnumerable<string> protocols = null,
Action<HttpResponseMessage> onResponse = null,
bool disposeRequest = true,
bool disposeResponse = true
) => Instance.CreateWsClient(
request,
onOpen,
onMessage,
onError,
protocols,
onResponse,
disposeRequest,
disposeResponse
);
private bool IsDisposed; private bool IsDisposed;
~HttpClient() ~HttpClient()

View file

@ -0,0 +1,5 @@
namespace Hamakaze.WebSocket {
public interface IHasBinaryData {
byte[] Data { get; }
}
}

View file

@ -1,7 +1,7 @@
using System; using System;
namespace Hamakaze.WebSocket { namespace Hamakaze.WebSocket {
public class WsBinaryMessage : WsMessage { public class WsBinaryMessage : WsMessage, IHasBinaryData {
public byte[] Data { get; } public byte[] Data { get; }
public WsBinaryMessage(byte[] data) { public WsBinaryMessage(byte[] data) {

View file

@ -8,7 +8,11 @@ namespace Hamakaze.WebSocket {
Connection = conn ?? throw new ArgumentNullException(nameof(conn)); Connection = conn ?? throw new ArgumentNullException(nameof(conn));
} }
// public void SendPart(ReadOnlySpan<byte> data)
=> Connection.WriteFrame(WsOpcode.DataBinary, data, false);
public void SendFinalPart(ReadOnlySpan<byte> data)
=> Connection.WriteFrame(WsOpcode.DataBinary, data, true);
private bool IsDisposed; private bool IsDisposed;
@ -26,6 +30,7 @@ namespace Hamakaze.WebSocket {
return; return;
IsDisposed = true; IsDisposed = true;
Connection.EndBufferedSend();
} }
} }
} }

View file

@ -1,6 +1,8 @@
using System; using System;
using System.Threading; using System.Threading;
// todo: sending errors as fake close messages
namespace Hamakaze.WebSocket { namespace Hamakaze.WebSocket {
public class WsClient : IDisposable { public class WsClient : IDisposable {
public WsConnection Connection { get; } public WsConnection Connection { get; }
@ -11,6 +13,7 @@ namespace Hamakaze.WebSocket {
private Action<Exception> ExceptionHandler { get; } private Action<Exception> ExceptionHandler { get; }
private Mutex SendLock { get; } private Mutex SendLock { get; }
private const int TIMEOUT = 60000;
public WsClient( public WsClient(
WsConnection connection, WsConnection connection,
@ -38,81 +41,246 @@ namespace Hamakaze.WebSocket {
} }
public void Send(string text) { public void Send(string text) {
Connection.Send(text); try {
if(!SendLock.WaitOne(TIMEOUT))
throw new WsClientMutexFailedException();
Connection.Send(text);
} finally {
SendLock.ReleaseMutex();
}
} }
public void Send(object obj) { public void Send(object obj) {
if(obj == null) if(obj == null)
throw new ArgumentNullException(nameof(obj)); throw new ArgumentNullException(nameof(obj));
Connection.Send(obj.ToString()); try {
if(!SendLock.WaitOne(TIMEOUT))
throw new WsClientMutexFailedException();
Connection.Send(obj.ToString());
} finally {
SendLock.ReleaseMutex();
}
} }
public void Send(ReadOnlySpan<byte> data) { public void Send(ReadOnlySpan<byte> data) {
Connection.Send(data); if(data == null)
throw new ArgumentNullException(nameof(data));
try {
if(!SendLock.WaitOne(TIMEOUT))
throw new WsClientMutexFailedException();
Connection.Send(data);
} finally {
SendLock.ReleaseMutex();
}
} }
public void Send(byte[] buffer, int offset, int count) { public void Send(byte[] buffer, int offset, int count) {
if(buffer == null) if(buffer == null)
throw new ArgumentNullException(nameof(buffer)); throw new ArgumentNullException(nameof(buffer));
Connection.Send(buffer.AsSpan(offset, count)); try {
if(!SendLock.WaitOne(TIMEOUT))
throw new WsClientMutexFailedException();
Connection.Send(buffer.AsSpan(offset, count));
} finally {
SendLock.ReleaseMutex();
}
}
public void Send(Action<WsBufferedSend> handler) {
if(handler == null)
throw new ArgumentNullException(nameof(handler));
try {
if(!SendLock.WaitOne(TIMEOUT))
throw new WsClientMutexFailedException();
using(WsBufferedSend bs = Connection.BeginBufferedSend())
handler(bs);
} finally {
SendLock.ReleaseMutex();
}
} }
public void Ping() { public void Ping() {
Connection.Ping(); try {
if(!SendLock.WaitOne(TIMEOUT))
throw new WsClientMutexFailedException();
Connection.Ping();
} finally {
SendLock.ReleaseMutex();
}
} }
public void Ping(ReadOnlySpan<byte> data) { public void Ping(ReadOnlySpan<byte> data) {
Connection.Ping(data); if(data == null)
throw new ArgumentNullException(nameof(data));
try {
if(!SendLock.WaitOne(TIMEOUT))
throw new WsClientMutexFailedException();
Connection.Ping(data);
} finally {
SendLock.ReleaseMutex();
}
} }
public void Ping(byte[] buffer, int offset, int length) { public void Ping(byte[] buffer, int offset, int length) {
if(buffer == null) if(buffer == null)
throw new ArgumentNullException(nameof(buffer)); throw new ArgumentNullException(nameof(buffer));
Connection.Ping(buffer.AsSpan(offset, length)); try {
if(!SendLock.WaitOne(TIMEOUT))
throw new WsClientMutexFailedException();
Connection.Ping(buffer.AsSpan(offset, length));
} finally {
SendLock.ReleaseMutex();
}
} }
public void Pong() { public void Pong() {
Connection.Pong(); try {
if(!SendLock.WaitOne(TIMEOUT))
throw new WsClientMutexFailedException();
Connection.Pong();
} finally {
SendLock.ReleaseMutex();
}
} }
public void Pong(ReadOnlySpan<byte> data) { public void Pong(ReadOnlySpan<byte> data) {
Connection.Pong(data); if(data == null)
throw new ArgumentNullException(nameof(data));
try {
if(!SendLock.WaitOne(TIMEOUT))
throw new WsClientMutexFailedException();
Connection.Pong(data);
} finally {
SendLock.ReleaseMutex();
}
} }
public void Pong(byte[] buffer, int offset, int length) { public void Pong(byte[] buffer, int offset, int length) {
if(buffer == null) if(buffer == null)
throw new ArgumentNullException(nameof(buffer)); throw new ArgumentNullException(nameof(buffer));
Pong(buffer.AsSpan(offset, length)); try {
if(!SendLock.WaitOne(TIMEOUT))
throw new WsClientMutexFailedException();
Connection.Pong(buffer.AsSpan(offset, length));
} finally {
SendLock.ReleaseMutex();
}
} }
public void Close() { public void Close() {
Connection.Close(WsCloseReason.NormalClosure); try {
if(!SendLock.WaitOne(TIMEOUT))
throw new WsClientMutexFailedException();
Connection.Close(WsCloseReason.NormalClosure);
} finally {
SendLock.ReleaseMutex();
}
} }
public void CloseEmpty() { public void CloseEmpty() {
Connection.CloseEmpty(); try {
if(!SendLock.WaitOne(TIMEOUT))
throw new WsClientMutexFailedException();
Connection.CloseEmpty();
} finally {
SendLock.ReleaseMutex();
}
}
public void Close(WsCloseReason opcode) {
try {
if(!SendLock.WaitOne(TIMEOUT))
throw new WsClientMutexFailedException();
Connection.Close(opcode);
} finally {
SendLock.ReleaseMutex();
}
} }
public void Close(string reason) { public void Close(string reason) {
Connection.Close(WsCloseReason.NormalClosure, reason); if(reason == null)
throw new ArgumentNullException(nameof(reason));
try {
if(!SendLock.WaitOne(TIMEOUT))
throw new WsClientMutexFailedException();
Connection.Close(WsCloseReason.NormalClosure, reason);
} finally {
SendLock.ReleaseMutex();
}
}
public void Close(WsCloseReason opcode, string reason) {
if(reason == null)
throw new ArgumentNullException(nameof(reason));
try {
if(!SendLock.WaitOne(TIMEOUT))
throw new WsClientMutexFailedException();
Connection.Close(opcode, reason);
} finally {
SendLock.ReleaseMutex();
}
}
public void Close(ReadOnlySpan<byte> data) {
if(data == null)
throw new ArgumentNullException(nameof(data));
try {
if(!SendLock.WaitOne(TIMEOUT))
throw new WsClientMutexFailedException();
Connection.Close(data);
} finally {
SendLock.ReleaseMutex();
}
} }
public void Close(byte[] buffer, int offset, int length) { public void Close(byte[] buffer, int offset, int length) {
if(buffer == null) if(buffer == null)
throw new ArgumentNullException(nameof(buffer)); throw new ArgumentNullException(nameof(buffer));
Connection.Close(buffer.AsSpan(offset, length)); try {
if(!SendLock.WaitOne(TIMEOUT))
throw new WsClientMutexFailedException();
Connection.Close(buffer.AsSpan(offset, length));
} finally {
SendLock.ReleaseMutex();
}
}
public void Close(WsCloseReason opcode, ReadOnlySpan<byte> data) {
if(data == null)
throw new ArgumentNullException(nameof(data));
try {
if(!SendLock.WaitOne(TIMEOUT))
throw new WsClientMutexFailedException();
Connection.Close(opcode, data);
} finally {
SendLock.ReleaseMutex();
}
} }
public void Close(WsCloseReason code, byte[] buffer, int offset, int length) { public void Close(WsCloseReason code, byte[] buffer, int offset, int length) {
if(buffer == null) if(buffer == null)
throw new ArgumentNullException(nameof(buffer)); throw new ArgumentNullException(nameof(buffer));
Connection.Close(code, buffer.AsSpan(offset, length)); try {
if(!SendLock.WaitOne(TIMEOUT))
throw new WsClientMutexFailedException();
Connection.Close(code, buffer.AsSpan(offset, length));
} finally {
SendLock.ReleaseMutex();
}
} }
private bool IsDisposed; private bool IsDisposed;

View file

@ -2,7 +2,7 @@ using System;
using System.Text; using System.Text;
namespace Hamakaze.WebSocket { namespace Hamakaze.WebSocket {
public class WsCloseMessage : WsMessage { public class WsCloseMessage : WsMessage, IHasBinaryData {
public WsCloseReason Reason { get; } public WsCloseReason Reason { get; }
public string ReasonPhrase { get; } public string ReasonPhrase { get; }
public byte[] Data { get; } public byte[] Data { get; }

View file

@ -4,17 +4,6 @@ using System.Net.Security;
using System.Security.Cryptography; using System.Security.Cryptography;
using System.Text; using System.Text;
// TODO: optimisations with newer .net feature to reduce memory copying
// i think we're generally aware of how much data we're shoving around
// so memorystream can be considered overkill
// Should there be internal mutexing on the socket? (leaning towards no)
// Should all external stream handling be moved to WsClient?
// - IDEA: Buffered send "session" class.
// Would require exposing the raw Write methods
// but i suppose that's what "internal" exists for
namespace Hamakaze.WebSocket { namespace Hamakaze.WebSocket {
public class WsConnection : IDisposable { public class WsConnection : IDisposable {
public Stream Stream { get; } public Stream Stream { get; }
@ -22,12 +11,13 @@ namespace Hamakaze.WebSocket {
public bool IsSecure { get; } public bool IsSecure { get; }
public bool IsClosed { get; private set; } public bool IsClosed { get; private set; }
private const int BUFFER_SIZE = 0x2000;
private const byte MASK_FLAG = 0x80; private const byte MASK_FLAG = 0x80;
private const int MASK_SIZE = 4; private const int MASK_SIZE = 4;
private WsOpcode FragmentedType = 0; private WsOpcode FragmentType = 0;
private MemoryStream FragmentedStream; private MemoryStream FragmentStream;
private WsBufferedSend BufferedSend;
public WsConnection(Stream stream) { public WsConnection(Stream stream) {
Stream = stream ?? throw new ArgumentNullException(nameof(stream)); Stream = stream ?? throw new ArgumentNullException(nameof(stream));
@ -44,7 +34,7 @@ namespace Hamakaze.WebSocket {
throw new Exception(@"Was unable to read the requested amount of data."); throw new Exception(@"Was unable to read the requested amount of data.");
} }
private (WsOpcode opcode, long length, bool isFinal, byte[] mask) ReadFrameHeader() { private (WsOpcode opcode, int length, bool isFinal, byte[] mask) ReadFrameHeader() {
byte[] buffer = new byte[8]; byte[] buffer = new byte[8];
StrictRead(buffer, 0, 2); StrictRead(buffer, 0, 2);
@ -81,7 +71,11 @@ namespace Hamakaze.WebSocket {
// i seriously don't understand the rationale behind both // i seriously don't understand the rationale behind both
// having a framing system but then also supporting frame lengths // having a framing system but then also supporting frame lengths
// of 2^63, feels like 2^16 per frame would be a fine max. // of 2^63, feels like 2^16 per frame would be a fine max.
if(length < 0 || length > long.MaxValue) // UPDATE: decided to put the max at 2^32-1
// it's still more than you should ever need for a single frame
// and it makes working with the number within a .NET context
// less of a bother.
if(length < 0 || length > int.MaxValue)
throw new WsInvalidFrameSizeException(length); throw new WsInvalidFrameSizeException(length);
byte[] mask = null; byte[] mask = null;
@ -91,32 +85,30 @@ namespace Hamakaze.WebSocket {
mask = buffer; mask = buffer;
} }
return (opcode, length, isFinal, mask); return (opcode, (int)length, isFinal, mask);
} }
private long ReadFrameBody(Stream target, long length, byte[] mask, long offset = 0) { private int ReadFrameBody(byte[] target, int length, byte[] mask, int offset = 0) {
if(target == null) if(target == null)
throw new ArgumentNullException(nameof(target)); throw new ArgumentNullException(nameof(target));
if(!target.CanWrite)
throw new ArgumentException(@"Target stream is not writable.", nameof(target));
bool isMasked = mask != null; bool isMasked = mask != null;
int read; int read;
int take = length > BUFFER_SIZE ? BUFFER_SIZE : (int)length; const int bufferSize = 0x1000;
byte[] buffer = new byte[take]; int take = length > bufferSize ? bufferSize : (int)length;
while(length > 0) { while(length > 0) {
read = Stream.Read(buffer, 0, take); read = Stream.Read(target, offset, take);
if(isMasked) if(isMasked)
for(int i = 0; i < read; ++i) for(int i = 0; i < read; ++i) {
buffer[i] ^= mask[offset++ % MASK_SIZE]; int o = offset + i;
target[o] ^= mask[o % MASK_SIZE];
}
target.Write(buffer, 0, read);
offset += read;
length -= read; length -= read;
offset += read;
if(take > length) if(take > length)
take = (int)length; take = (int)length;
@ -126,7 +118,7 @@ namespace Hamakaze.WebSocket {
} }
private WsMessage ReadFrame() { private WsMessage ReadFrame() {
(WsOpcode opcode, long length, bool isFinal, byte[] mask) = ReadFrameHeader(); (WsOpcode opcode, int length, bool isFinal, byte[] mask) = ReadFrameHeader();
if(opcode is not WsOpcode.DataContinue if(opcode is not WsOpcode.DataContinue
and not WsOpcode.DataBinary and not WsOpcode.DataBinary
@ -140,77 +132,55 @@ namespace Hamakaze.WebSocket {
bool isContinue = opcode == WsOpcode.DataContinue; bool isContinue = opcode == WsOpcode.DataContinue;
bool canFragment = (opcode & WsOpcode.CtrlClose) == 0; bool canFragment = (opcode & WsOpcode.CtrlClose) == 0;
MemoryStream bodyStream = null; byte[] body = length < 1 ? null : new byte[length];
if(hasBody) { if(hasBody) {
ReadFrameBody(body, length, mask);
if(canFragment) { if(canFragment) {
if(isContinue) { if(isContinue) {
if(FragmentedType == 0) if(FragmentType == 0)
throw new WsUnexpectedContinueException(); throw new WsUnexpectedContinueException();
opcode = FragmentedType; opcode = FragmentType;
if(FragmentedStream == null) FragmentStream ??= new();
FragmentedStream = bodyStream = new(); FragmentStream.Write(body, 0, length);
else
bodyStream = FragmentedStream;
} else { } else {
if(FragmentedType != 0) if(FragmentType != 0)
throw new WsUnexpectedDataException(); throw new WsUnexpectedDataException();
if(isFinal) if(!isFinal) {
bodyStream = new(); FragmentType = opcode;
else { FragmentStream = new();
FragmentedType = opcode; FragmentStream.Write(body, 0, length);
FragmentedStream = bodyStream = new();
} }
} }
} else }
bodyStream = new();
ReadFrameBody(bodyStream, length, mask);
} }
WsMessage msg; WsMessage msg;
if(isFinal) { if(isFinal) {
if(canFragment && isContinue) { if(canFragment && isContinue) {
FragmentedType = 0; FragmentType = 0;
FragmentedStream = null;
body = FragmentStream.ToArray();
FragmentStream.Dispose();
FragmentStream = null;
} }
byte[] body = null; msg = opcode switch {
WsOpcode.DataText => new WsTextMessage(body),
WsOpcode.DataBinary => new WsBinaryMessage(body),
if(bodyStream != null) { WsOpcode.CtrlClose => new WsCloseMessage(body),
if(bodyStream.Length > 0) WsOpcode.CtrlPing => new WsPingMessage(body),
body = bodyStream.ToArray(); WsOpcode.CtrlPong => new WsPongMessage(body),
bodyStream.Dispose();
}
switch(opcode) { // fallback, if we end up here something is very fucked
case WsOpcode.DataText: _ => throw new WsUnsupportedOpcodeException(opcode),
msg = new WsTextMessage(body); };
break;
case WsOpcode.DataBinary:
msg = new WsBinaryMessage(body);
break;
case WsOpcode.CtrlClose:
msg = new WsCloseMessage(body);
break;
case WsOpcode.CtrlPing:
msg = new WsPingMessage(body);
break;
case WsOpcode.CtrlPong:
msg = new WsPongMessage(body);
break;
default: // fallback, if we end up here something is very fucked
throw new WsUnsupportedOpcodeException(opcode);
}
} else msg = null; } else msg = null;
return msg; return msg;
@ -222,7 +192,10 @@ namespace Hamakaze.WebSocket {
return msg; return msg;
} }
private void WriteFrameHeader(WsOpcode opcode, long length, bool isFinal, byte[] mask = null) { private void WriteFrameHeader(WsOpcode opcode, int length, bool isFinal, byte[] mask = null) {
if(length < 0 || length > int.MaxValue)
throw new WsInvalidFrameSizeException(length);
bool shouldMask = mask != null; bool shouldMask = mask != null;
if(isFinal) if(isFinal)
@ -247,9 +220,13 @@ namespace Hamakaze.WebSocket {
if(shouldMask) if(shouldMask)
Stream.Write(mask, 0, MASK_SIZE); Stream.Write(mask, 0, MASK_SIZE);
Stream.Flush();
} }
private long WriteFrameBody(ReadOnlySpan<byte> body, byte[] mask = null, long offset = 0) { private int WriteFrameBody(ReadOnlySpan<byte> body, byte[] mask = null, int offset = 0) {
if(body == null)
throw new ArgumentNullException(nameof(body));
if(mask != null) { if(mask != null) {
byte[] masked = new byte[body.Length]; byte[] masked = new byte[body.Length];
@ -260,37 +237,34 @@ namespace Hamakaze.WebSocket {
} }
Stream.Write(body); Stream.Write(body);
Stream.Flush();
return offset; return offset;
} }
private long WriteFrameBody(Stream body, byte[] mask = null, long offset = 0) { internal void WriteFrame(WsOpcode opcode, ReadOnlySpan<byte> body, bool isFinal) {
bool shouldMask = mask != null; if(body == null)
throw new ArgumentNullException(nameof(body));
int read;
byte[] buffer = new byte[BUFFER_SIZE];
while((read = body.Read(buffer, 0, BUFFER_SIZE)) > 0)
offset = WriteFrameBody(buffer.AsSpan(0, read), mask, offset);
return offset;
}
private void WriteFrame(WsOpcode opcode, ReadOnlySpan<byte> body, bool isFinal) {
byte[] mask = GenerateMask(); byte[] mask = GenerateMask();
WriteFrameHeader(opcode, body.Length, isFinal, mask); WriteFrameHeader(opcode, body.Length, isFinal, mask);
if(body.Length > 0) if(body.Length > 0)
WriteFrameBody(body, mask); WriteFrameBody(body, mask);
Stream.Flush();
} }
private void Write(WsOpcode opcode, ReadOnlySpan<byte> body) { private void WriteData(WsOpcode opcode, ReadOnlySpan<byte> body) {
if(body.Length > 0xFFFF) { if(body == null)
WriteFrame(opcode, body.Slice(0, 0xFFFF), false); throw new ArgumentNullException(nameof(body));
body = body.Slice(0xFFFF); if(BufferedSend != null)
throw new WsBufferedSendInSessionException();
while(body.Length > 0xFFFF) { if(body.Length > ushort.MaxValue) {
WriteFrame(WsOpcode.DataContinue, body.Slice(0, 0xFFFF), false); WriteFrame(opcode, body.Slice(0, ushort.MaxValue), false);
body = body.Slice(0xFFFF); body = body.Slice(ushort.MaxValue);
while(body.Length > ushort.MaxValue) {
WriteFrame(WsOpcode.DataContinue, body.Slice(0, ushort.MaxValue), false);
body = body.Slice(ushort.MaxValue);
} }
WriteFrame(WsOpcode.DataContinue, body, true); WriteFrame(WsOpcode.DataContinue, body, true);
@ -298,119 +272,74 @@ namespace Hamakaze.WebSocket {
WriteFrame(opcode, body, true); WriteFrame(opcode, body, true);
} }
private void Write(WsOpcode opcode, Stream stream) {
if(stream == null)
throw new ArgumentNullException(nameof(stream));
if(!stream.CanRead)
throw new ArgumentException(@"Provided stream cannot be read.", nameof(stream));
int read;
byte[] buffer = new byte[BUFFER_SIZE];
while((read = stream.Read(buffer, 0, BUFFER_SIZE)) > 0) {
WriteFrame(opcode, buffer.AsSpan(0, read), false);
if(opcode != WsOpcode.DataContinue)
opcode = WsOpcode.DataContinue;
}
// this kinda fucking sucks
WriteFrame(WsOpcode.CtrlClose, ReadOnlySpan<byte>.Empty, true);
}
private void Write(WsOpcode opcode, Stream stream, int length) {
if(stream == null)
throw new ArgumentNullException(nameof(stream));
if(!stream.CanRead)
throw new ArgumentException(@"Provided stream cannot be read.", nameof(stream));
int read;
byte[] buffer = new byte[BUFFER_SIZE];
if(length > BUFFER_SIZE) {
int take = BUFFER_SIZE;
while((read = stream.Read(buffer, 0, take)) > 0) {
WriteFrame(opcode, buffer.AsSpan(0, read), false);
if(opcode != WsOpcode.DataContinue)
opcode = WsOpcode.DataContinue;
length -= read;
if(take > length)
take = length;
}
// feel like there'd be a better way to do this
// but i feel like assuming that any successful read with something
// still coming (read == BUFFER_SIZE) will bite me in the ass later somehow
WriteFrame(WsOpcode.CtrlClose, Span<byte>.Empty, true);
} else {
read = stream.Read(buffer, 0, BUFFER_SIZE);
if(read > 0)
WriteFrame(WsOpcode.DataBinary, buffer.AsSpan(0, read), true);
}
}
public void Send(string text) public void Send(string text)
=> Write(WsOpcode.DataText, Encoding.UTF8.GetBytes(text)); => WriteData(WsOpcode.DataText, Encoding.UTF8.GetBytes(text));
public void Send(ReadOnlySpan<byte> buffer) public void Send(ReadOnlySpan<byte> buffer)
=> Write(WsOpcode.DataBinary, buffer); => WriteData(WsOpcode.DataBinary, buffer);
public void Send(Stream source) public WsBufferedSend BeginBufferedSend() {
=> Write(WsOpcode.DataBinary, source); if(BufferedSend != null)
throw new WsBufferedSendAlreadyActiveException();
public void Send(Stream source, int count) return BufferedSend = new(this);
=> Write(WsOpcode.DataBinary, source, count);
private void WriteControlFrame(WsOpcode opcode) {
WriteFrameHeader(opcode, 0, true, GenerateMask());
Stream.Flush();
} }
private void WriteControlFrame(WsOpcode opcode, ReadOnlySpan<byte> buffer) { // this method should only be called from within WsBufferedSend.Dispose
internal void EndBufferedSend() {
BufferedSend = null;
}
private void WriteControl(WsOpcode opcode)
=> WriteFrameHeader(opcode, 0, true, GenerateMask());
private void WriteControl(WsOpcode opcode, ReadOnlySpan<byte> buffer) {
if(buffer == null)
throw new ArgumentNullException(nameof(buffer));
if(buffer.Length > 125) if(buffer.Length > 125)
throw new ArgumentException(@"Data may not be more than 125 bytes.", nameof(buffer)); throw new ArgumentException(@"Data may not be more than 125 bytes.", nameof(buffer));
byte[] mask = GenerateMask(); byte[] mask = GenerateMask();
WriteFrameHeader(opcode, buffer.Length, true, mask); WriteFrameHeader(opcode, buffer.Length, true, mask);
WriteFrameBody(buffer, mask); WriteFrameBody(buffer, mask);
Stream.Flush();
} }
public void Ping() public void Ping()
=> WriteControlFrame(WsOpcode.CtrlPing); => WriteControl(WsOpcode.CtrlPing);
public void Ping(ReadOnlySpan<byte> buffer) public void Ping(ReadOnlySpan<byte> buffer)
=> WriteControlFrame(WsOpcode.CtrlPing, buffer); => WriteControl(WsOpcode.CtrlPing, buffer);
public void Pong() public void Pong()
=> WriteControlFrame(WsOpcode.CtrlPong); => WriteControl(WsOpcode.CtrlPong);
public void Pong(ReadOnlySpan<byte> buffer) public void Pong(ReadOnlySpan<byte> buffer)
=> WriteControlFrame(WsOpcode.CtrlPong, buffer); => WriteControl(WsOpcode.CtrlPong, buffer);
public void CloseEmpty() { public void CloseEmpty() {
if(IsClosed) if(IsClosed)
return; return;
IsClosed = true; IsClosed = true;
WriteControlFrame(WsOpcode.CtrlClose); WriteControl(WsOpcode.CtrlClose);
} }
public void Close(ReadOnlySpan<byte> buffer) { public void Close(ReadOnlySpan<byte> buffer) {
if(buffer == null)
throw new ArgumentNullException(nameof(buffer));
if(IsClosed) if(IsClosed)
return; return;
IsClosed = true; IsClosed = true;
WriteControlFrame(WsOpcode.CtrlClose, buffer); WriteControl(WsOpcode.CtrlClose, buffer);
} }
public void Close(WsCloseReason code) public void Close(WsCloseReason code)
=> Close(WsUtils.FromU16((ushort)code)); => Close(WsUtils.FromU16((ushort)code));
public void Close(WsCloseReason code, ReadOnlySpan<byte> reason) { public void Close(WsCloseReason code, ReadOnlySpan<byte> reason) {
if(reason == null)
throw new ArgumentNullException(nameof(reason));
if(reason.Length > 123) if(reason.Length > 123)
throw new ArgumentException(@"Reason may not be more than 123 bytes.", nameof(reason)); throw new ArgumentException(@"Reason may not be more than 123 bytes.", nameof(reason));
@ -422,14 +351,11 @@ namespace Hamakaze.WebSocket {
WriteFrameHeader(WsOpcode.CtrlClose, 2 + reason.Length, true, mask); WriteFrameHeader(WsOpcode.CtrlClose, 2 + reason.Length, true, mask);
WriteFrameBody(WsUtils.FromU16((ushort)code), mask); WriteFrameBody(WsUtils.FromU16((ushort)code), mask);
WriteFrameBody(reason, mask, 2); WriteFrameBody(reason, mask, 2);
Stream.Flush();
} }
public void Close(WsCloseReason code, string reason) { public void Close(WsCloseReason code, string reason) {
if(string.IsNullOrEmpty(reason)) { if(reason == null)
Close(code); throw new ArgumentNullException(nameof(reason));
return;
}
int length = Encoding.UTF8.GetByteCount(reason); int length = Encoding.UTF8.GetByteCount(reason);
if(length > 123) if(length > 123)
@ -443,7 +369,6 @@ namespace Hamakaze.WebSocket {
WriteFrameHeader(WsOpcode.CtrlClose, 2 + reason.Length, true, mask); WriteFrameHeader(WsOpcode.CtrlClose, 2 + reason.Length, true, mask);
WriteFrameBody(WsUtils.FromU16((ushort)code), mask); WriteFrameBody(WsUtils.FromU16((ushort)code), mask);
WriteFrameBody(Encoding.UTF8.GetBytes(reason), mask, 2); WriteFrameBody(Encoding.UTF8.GetBytes(reason), mask, 2);
Stream.Flush();
} }
private bool IsDisposed; private bool IsDisposed;
@ -462,6 +387,8 @@ namespace Hamakaze.WebSocket {
return; return;
IsDisposed = true; IsDisposed = true;
BufferedSend?.Dispose();
FragmentStream?.Dispose();
Stream.Dispose(); Stream.Dispose();
} }
} }

View file

@ -26,4 +26,16 @@ namespace Hamakaze.WebSocket {
public class WsInvalidControlFrameException : WsException { public class WsInvalidControlFrameException : WsException {
public WsInvalidControlFrameException(string variant) : base($@"An invalid WebSocket control frame was encountered: {variant}") { } public WsInvalidControlFrameException(string variant) : base($@"An invalid WebSocket control frame was encountered: {variant}") { }
} }
public class WsClientMutexFailedException : WsException {
public WsClientMutexFailedException() : base(@"Failed to acquire send mutex.") { }
}
public class WsBufferedSendAlreadyActiveException : WsException {
public WsBufferedSendAlreadyActiveException() : base(@"A buffered websocket send is already in session.") { }
}
public class WsBufferedSendInSessionException : WsException {
public WsBufferedSendInSessionException() : base(@"Cannot send data while a buffered send is in session.") { }
}
} }

View file

@ -1,7 +1,7 @@
using System; using System;
namespace Hamakaze.WebSocket { namespace Hamakaze.WebSocket {
public class WsPingMessage : WsMessage { public class WsPingMessage : WsMessage, IHasBinaryData {
public byte[] Data { get; } public byte[] Data { get; }
public WsPingMessage(byte[] data) { public WsPingMessage(byte[] data) {

View file

@ -1,7 +1,7 @@
using System; using System;
namespace Hamakaze.WebSocket { namespace Hamakaze.WebSocket {
public class WsPongMessage : WsMessage { public class WsPongMessage : WsMessage, IHasBinaryData {
public byte[] Data { get; } public byte[] Data { get; }
public WsPongMessage(byte[] data) { public WsPongMessage(byte[] data) {

View file

@ -10,5 +10,11 @@ namespace Hamakaze.WebSocket {
else else
Text = string.Empty; Text = string.Empty;
} }
public static implicit operator string(WsTextMessage msg) => msg.Text;
public override string ToString() {
return Text;
}
} }
} }