diff --git a/Hamakaze/HttpClient.cs b/Hamakaze/HttpClient.cs index 24f05d3..5d694a9 100644 --- a/Hamakaze/HttpClient.cs +++ b/Hamakaze/HttpClient.cs @@ -99,38 +99,83 @@ namespace Hamakaze { Action onOpen, Action onMessage, Action onError, - IEnumerable protocols = null - ) { - CreateWsConnection( + IEnumerable protocols = null, + Action onResponse = null, + bool disposeRequest = true, + bool disposeResponse = true + ) => CreateWsConnection( url, conn => onOpen(new WsClient(conn, onMessage, onError)), onError, - protocols + protocols, + onResponse, + disposeRequest, + disposeResponse + ); + + public void CreateWsClient( + HttpRequestMessage request, + Action onOpen, + Action onMessage, + Action onError, + IEnumerable protocols = null, + Action onResponse = null, + bool disposeRequest = true, + bool disposeResponse = true + ) => CreateWsConnection( + request, + conn => onOpen(new WsClient(conn, onMessage, onError)), + onError, + protocols, + onResponse, + disposeRequest, + disposeResponse ); - } public void CreateWsConnection( string url, Action onOpen, Action onError, - IEnumerable protocols = null + IEnumerable protocols = null, + Action onResponse = null, + bool disposeRequest = true, + bool disposeResponse = true + ) => CreateWsConnection( + new HttpRequestMessage(@"GET", url), + onOpen, + onError, + protocols, + onResponse, + disposeRequest, + disposeResponse + ); + + public void CreateWsConnection( + HttpRequestMessage request, + Action onOpen, + Action onError, + IEnumerable protocols = null, + Action onResponse = null, + bool disposeRequest = true, + bool disposeResponse = true ) { string key = Convert.ToBase64String(RandomNumberGenerator.GetBytes(WS_RNG)); - HttpRequestMessage req = new HttpRequestMessage(@"GET", url); - req.Connection = HttpConnectionHeader.UPGRADE; - req.SetHeader(@"Cache-Control", @"no-cache"); - req.SetHeader(@"Upgrade", WS_PROTO); - req.SetHeader(@"Sec-WebSocket-Key", key); - req.SetHeader(@"Sec-WebSocket-Version", @"13"); + request.Connection = HttpConnectionHeader.UPGRADE; + request.SetHeader(@"Cache-Control", @"no-cache"); + request.SetHeader(@"Upgrade", WS_PROTO); + request.SetHeader(@"Sec-WebSocket-Key", key); + request.SetHeader(@"Sec-WebSocket-Version", @"13"); if(protocols?.Any() == true) - req.SetHeader(@"Sec-WebSocket-Protocol", string.Join(@", ", protocols)); + request.SetHeader(@"Sec-WebSocket-Protocol", string.Join(@", ", protocols)); SendRequest( - req, + request, (t, res) => { try { + onResponse?.Invoke(res); + if(res.ProtocolVersion.CompareTo(@"1.1") < 0) throw new HttpUpgradeProtocolVersionException(@"1.1", res.ProtocolVersion); @@ -168,7 +213,9 @@ namespace Hamakaze { onError(ex); } }, - (t, ex) => onError(ex) + (t, ex) => onError(ex), + disposeRequest: disposeRequest, + disposeResponse: disposeResponse ); } @@ -182,9 +229,57 @@ namespace Hamakaze { Action onStateChange = null, bool disposeRequest = true, bool disposeResponse = true - ) { - Instance.SendRequest(request, onComplete, onError, onCancel, onDownloadProgress, onUploadProgress, onStateChange, disposeRequest, disposeResponse); - } + ) => Instance.SendRequest( + request, + onComplete, + onError, + onCancel, + onDownloadProgress, + onUploadProgress, + onStateChange, + disposeRequest, + disposeResponse + ); + + public static void Connect( + string url, + Action onOpen, + Action onMessage, + Action onError, + IEnumerable protocols = null, + Action onResponse = null, + bool disposeRequest = true, + bool disposeResponse = true + ) => Instance.CreateWsClient( + url, + onOpen, + onMessage, + onError, + protocols, + onResponse, + disposeRequest, + disposeResponse + ); + + public static void Connect( + HttpRequestMessage request, + Action onOpen, + Action onMessage, + Action onError, + IEnumerable protocols = null, + Action onResponse = null, + bool disposeRequest = true, + bool disposeResponse = true + ) => Instance.CreateWsClient( + request, + onOpen, + onMessage, + onError, + protocols, + onResponse, + disposeRequest, + disposeResponse + ); private bool IsDisposed; ~HttpClient() diff --git a/Hamakaze/WebSocket/IHasBinaryData.cs b/Hamakaze/WebSocket/IHasBinaryData.cs new file mode 100644 index 0000000..bd994b7 --- /dev/null +++ b/Hamakaze/WebSocket/IHasBinaryData.cs @@ -0,0 +1,5 @@ +namespace Hamakaze.WebSocket { + public interface IHasBinaryData { + byte[] Data { get; } + } +} diff --git a/Hamakaze/WebSocket/WsBinaryMessage.cs b/Hamakaze/WebSocket/WsBinaryMessage.cs index 172ba4f..a0be483 100644 --- a/Hamakaze/WebSocket/WsBinaryMessage.cs +++ b/Hamakaze/WebSocket/WsBinaryMessage.cs @@ -1,7 +1,7 @@ using System; namespace Hamakaze.WebSocket { - public class WsBinaryMessage : WsMessage { + public class WsBinaryMessage : WsMessage, IHasBinaryData { public byte[] Data { get; } public WsBinaryMessage(byte[] data) { diff --git a/Hamakaze/WebSocket/WsBufferedSend.cs b/Hamakaze/WebSocket/WsBufferedSend.cs index b5e7c2d..9c628be 100644 --- a/Hamakaze/WebSocket/WsBufferedSend.cs +++ b/Hamakaze/WebSocket/WsBufferedSend.cs @@ -8,7 +8,11 @@ namespace Hamakaze.WebSocket { Connection = conn ?? throw new ArgumentNullException(nameof(conn)); } - // + public void SendPart(ReadOnlySpan data) + => Connection.WriteFrame(WsOpcode.DataBinary, data, false); + + public void SendFinalPart(ReadOnlySpan data) + => Connection.WriteFrame(WsOpcode.DataBinary, data, true); private bool IsDisposed; @@ -26,6 +30,7 @@ namespace Hamakaze.WebSocket { return; IsDisposed = true; + Connection.EndBufferedSend(); } } } diff --git a/Hamakaze/WebSocket/WsClient.cs b/Hamakaze/WebSocket/WsClient.cs index 5bb788e..d626c71 100644 --- a/Hamakaze/WebSocket/WsClient.cs +++ b/Hamakaze/WebSocket/WsClient.cs @@ -1,6 +1,8 @@ using System; using System.Threading; +// todo: sending errors as fake close messages + namespace Hamakaze.WebSocket { public class WsClient : IDisposable { public WsConnection Connection { get; } @@ -11,6 +13,7 @@ namespace Hamakaze.WebSocket { private Action ExceptionHandler { get; } private Mutex SendLock { get; } + private const int TIMEOUT = 60000; public WsClient( WsConnection connection, @@ -38,81 +41,246 @@ namespace Hamakaze.WebSocket { } public void Send(string text) { - Connection.Send(text); + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Send(text); + } finally { + SendLock.ReleaseMutex(); + } } public void Send(object obj) { if(obj == null) throw new ArgumentNullException(nameof(obj)); - Connection.Send(obj.ToString()); + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Send(obj.ToString()); + } finally { + SendLock.ReleaseMutex(); + } } public void Send(ReadOnlySpan data) { - Connection.Send(data); + if(data == null) + throw new ArgumentNullException(nameof(data)); + + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Send(data); + } finally { + SendLock.ReleaseMutex(); + } } public void Send(byte[] buffer, int offset, int count) { if(buffer == null) throw new ArgumentNullException(nameof(buffer)); - Connection.Send(buffer.AsSpan(offset, count)); + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Send(buffer.AsSpan(offset, count)); + } finally { + SendLock.ReleaseMutex(); + } + } + + public void Send(Action handler) { + if(handler == null) + throw new ArgumentNullException(nameof(handler)); + + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + using(WsBufferedSend bs = Connection.BeginBufferedSend()) + handler(bs); + } finally { + SendLock.ReleaseMutex(); + } } public void Ping() { - Connection.Ping(); + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Ping(); + } finally { + SendLock.ReleaseMutex(); + } } public void Ping(ReadOnlySpan data) { - Connection.Ping(data); + if(data == null) + throw new ArgumentNullException(nameof(data)); + + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Ping(data); + } finally { + SendLock.ReleaseMutex(); + } } public void Ping(byte[] buffer, int offset, int length) { if(buffer == null) throw new ArgumentNullException(nameof(buffer)); - Connection.Ping(buffer.AsSpan(offset, length)); + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Ping(buffer.AsSpan(offset, length)); + } finally { + SendLock.ReleaseMutex(); + } } public void Pong() { - Connection.Pong(); + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Pong(); + } finally { + SendLock.ReleaseMutex(); + } } public void Pong(ReadOnlySpan data) { - Connection.Pong(data); + if(data == null) + throw new ArgumentNullException(nameof(data)); + + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Pong(data); + } finally { + SendLock.ReleaseMutex(); + } } public void Pong(byte[] buffer, int offset, int length) { if(buffer == null) throw new ArgumentNullException(nameof(buffer)); - Pong(buffer.AsSpan(offset, length)); + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Pong(buffer.AsSpan(offset, length)); + } finally { + SendLock.ReleaseMutex(); + } } public void Close() { - Connection.Close(WsCloseReason.NormalClosure); + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Close(WsCloseReason.NormalClosure); + } finally { + SendLock.ReleaseMutex(); + } } public void CloseEmpty() { - Connection.CloseEmpty(); + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.CloseEmpty(); + } finally { + SendLock.ReleaseMutex(); + } + } + + public void Close(WsCloseReason opcode) { + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Close(opcode); + } finally { + SendLock.ReleaseMutex(); + } } public void Close(string reason) { - Connection.Close(WsCloseReason.NormalClosure, reason); + if(reason == null) + throw new ArgumentNullException(nameof(reason)); + + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Close(WsCloseReason.NormalClosure, reason); + } finally { + SendLock.ReleaseMutex(); + } + } + + public void Close(WsCloseReason opcode, string reason) { + if(reason == null) + throw new ArgumentNullException(nameof(reason)); + + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Close(opcode, reason); + } finally { + SendLock.ReleaseMutex(); + } + } + + public void Close(ReadOnlySpan data) { + if(data == null) + throw new ArgumentNullException(nameof(data)); + + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Close(data); + } finally { + SendLock.ReleaseMutex(); + } } public void Close(byte[] buffer, int offset, int length) { if(buffer == null) throw new ArgumentNullException(nameof(buffer)); - Connection.Close(buffer.AsSpan(offset, length)); + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Close(buffer.AsSpan(offset, length)); + } finally { + SendLock.ReleaseMutex(); + } + } + + public void Close(WsCloseReason opcode, ReadOnlySpan data) { + if(data == null) + throw new ArgumentNullException(nameof(data)); + + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Close(opcode, data); + } finally { + SendLock.ReleaseMutex(); + } } public void Close(WsCloseReason code, byte[] buffer, int offset, int length) { if(buffer == null) throw new ArgumentNullException(nameof(buffer)); - Connection.Close(code, buffer.AsSpan(offset, length)); + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Close(code, buffer.AsSpan(offset, length)); + } finally { + SendLock.ReleaseMutex(); + } } private bool IsDisposed; diff --git a/Hamakaze/WebSocket/WsCloseMessage.cs b/Hamakaze/WebSocket/WsCloseMessage.cs index e0a584a..9989046 100644 --- a/Hamakaze/WebSocket/WsCloseMessage.cs +++ b/Hamakaze/WebSocket/WsCloseMessage.cs @@ -2,7 +2,7 @@ using System; using System.Text; namespace Hamakaze.WebSocket { - public class WsCloseMessage : WsMessage { + public class WsCloseMessage : WsMessage, IHasBinaryData { public WsCloseReason Reason { get; } public string ReasonPhrase { get; } public byte[] Data { get; } diff --git a/Hamakaze/WebSocket/WsConnection.cs b/Hamakaze/WebSocket/WsConnection.cs index 164dc13..a1f42c3 100644 --- a/Hamakaze/WebSocket/WsConnection.cs +++ b/Hamakaze/WebSocket/WsConnection.cs @@ -4,17 +4,6 @@ using System.Net.Security; using System.Security.Cryptography; using System.Text; -// TODO: optimisations with newer .net feature to reduce memory copying -// i think we're generally aware of how much data we're shoving around -// so memorystream can be considered overkill - -// Should there be internal mutexing on the socket? (leaning towards no) - -// Should all external stream handling be moved to WsClient? -// - IDEA: Buffered send "session" class. -// Would require exposing the raw Write methods -// but i suppose that's what "internal" exists for - namespace Hamakaze.WebSocket { public class WsConnection : IDisposable { public Stream Stream { get; } @@ -22,12 +11,13 @@ namespace Hamakaze.WebSocket { public bool IsSecure { get; } public bool IsClosed { get; private set; } - private const int BUFFER_SIZE = 0x2000; private const byte MASK_FLAG = 0x80; private const int MASK_SIZE = 4; - private WsOpcode FragmentedType = 0; - private MemoryStream FragmentedStream; + private WsOpcode FragmentType = 0; + private MemoryStream FragmentStream; + + private WsBufferedSend BufferedSend; public WsConnection(Stream stream) { Stream = stream ?? throw new ArgumentNullException(nameof(stream)); @@ -44,7 +34,7 @@ namespace Hamakaze.WebSocket { throw new Exception(@"Was unable to read the requested amount of data."); } - private (WsOpcode opcode, long length, bool isFinal, byte[] mask) ReadFrameHeader() { + private (WsOpcode opcode, int length, bool isFinal, byte[] mask) ReadFrameHeader() { byte[] buffer = new byte[8]; StrictRead(buffer, 0, 2); @@ -81,7 +71,11 @@ namespace Hamakaze.WebSocket { // i seriously don't understand the rationale behind both // having a framing system but then also supporting frame lengths // of 2^63, feels like 2^16 per frame would be a fine max. - if(length < 0 || length > long.MaxValue) + // UPDATE: decided to put the max at 2^32-1 + // it's still more than you should ever need for a single frame + // and it makes working with the number within a .NET context + // less of a bother. + if(length < 0 || length > int.MaxValue) throw new WsInvalidFrameSizeException(length); byte[] mask = null; @@ -91,32 +85,30 @@ namespace Hamakaze.WebSocket { mask = buffer; } - return (opcode, length, isFinal, mask); + return (opcode, (int)length, isFinal, mask); } - private long ReadFrameBody(Stream target, long length, byte[] mask, long offset = 0) { + private int ReadFrameBody(byte[] target, int length, byte[] mask, int offset = 0) { if(target == null) throw new ArgumentNullException(nameof(target)); - if(!target.CanWrite) - throw new ArgumentException(@"Target stream is not writable.", nameof(target)); bool isMasked = mask != null; int read; - int take = length > BUFFER_SIZE ? BUFFER_SIZE : (int)length; - byte[] buffer = new byte[take]; + const int bufferSize = 0x1000; + int take = length > bufferSize ? bufferSize : (int)length; while(length > 0) { - read = Stream.Read(buffer, 0, take); + read = Stream.Read(target, offset, take); if(isMasked) - for(int i = 0; i < read; ++i) - buffer[i] ^= mask[offset++ % MASK_SIZE]; + for(int i = 0; i < read; ++i) { + int o = offset + i; + target[o] ^= mask[o % MASK_SIZE]; + } - target.Write(buffer, 0, read); - - offset += read; length -= read; + offset += read; if(take > length) take = (int)length; @@ -126,7 +118,7 @@ namespace Hamakaze.WebSocket { } private WsMessage ReadFrame() { - (WsOpcode opcode, long length, bool isFinal, byte[] mask) = ReadFrameHeader(); + (WsOpcode opcode, int length, bool isFinal, byte[] mask) = ReadFrameHeader(); if(opcode is not WsOpcode.DataContinue and not WsOpcode.DataBinary @@ -140,77 +132,55 @@ namespace Hamakaze.WebSocket { bool isContinue = opcode == WsOpcode.DataContinue; bool canFragment = (opcode & WsOpcode.CtrlClose) == 0; - MemoryStream bodyStream = null; + byte[] body = length < 1 ? null : new byte[length]; if(hasBody) { + ReadFrameBody(body, length, mask); + if(canFragment) { if(isContinue) { - if(FragmentedType == 0) + if(FragmentType == 0) throw new WsUnexpectedContinueException(); - opcode = FragmentedType; + opcode = FragmentType; - if(FragmentedStream == null) - FragmentedStream = bodyStream = new(); - else - bodyStream = FragmentedStream; + FragmentStream ??= new(); + FragmentStream.Write(body, 0, length); } else { - if(FragmentedType != 0) + if(FragmentType != 0) throw new WsUnexpectedDataException(); - if(isFinal) - bodyStream = new(); - else { - FragmentedType = opcode; - FragmentedStream = bodyStream = new(); + if(!isFinal) { + FragmentType = opcode; + FragmentStream = new(); + FragmentStream.Write(body, 0, length); } } - } else - bodyStream = new(); - - ReadFrameBody(bodyStream, length, mask); + } } WsMessage msg; if(isFinal) { if(canFragment && isContinue) { - FragmentedType = 0; - FragmentedStream = null; + FragmentType = 0; + + body = FragmentStream.ToArray(); + FragmentStream.Dispose(); + FragmentStream = null; } - byte[] body = null; + msg = opcode switch { + WsOpcode.DataText => new WsTextMessage(body), + WsOpcode.DataBinary => new WsBinaryMessage(body), - if(bodyStream != null) { - if(bodyStream.Length > 0) - body = bodyStream.ToArray(); - bodyStream.Dispose(); - } + WsOpcode.CtrlClose => new WsCloseMessage(body), + WsOpcode.CtrlPing => new WsPingMessage(body), + WsOpcode.CtrlPong => new WsPongMessage(body), - switch(opcode) { - case WsOpcode.DataText: - msg = new WsTextMessage(body); - break; - - case WsOpcode.DataBinary: - msg = new WsBinaryMessage(body); - break; - - case WsOpcode.CtrlClose: - msg = new WsCloseMessage(body); - break; - - case WsOpcode.CtrlPing: - msg = new WsPingMessage(body); - break; - - case WsOpcode.CtrlPong: - msg = new WsPongMessage(body); - break; - - default: // fallback, if we end up here something is very fucked - throw new WsUnsupportedOpcodeException(opcode); - } + // fallback, if we end up here something is very fucked + _ => throw new WsUnsupportedOpcodeException(opcode), + }; } else msg = null; return msg; @@ -222,7 +192,10 @@ namespace Hamakaze.WebSocket { return msg; } - private void WriteFrameHeader(WsOpcode opcode, long length, bool isFinal, byte[] mask = null) { + private void WriteFrameHeader(WsOpcode opcode, int length, bool isFinal, byte[] mask = null) { + if(length < 0 || length > int.MaxValue) + throw new WsInvalidFrameSizeException(length); + bool shouldMask = mask != null; if(isFinal) @@ -247,9 +220,13 @@ namespace Hamakaze.WebSocket { if(shouldMask) Stream.Write(mask, 0, MASK_SIZE); + Stream.Flush(); } - private long WriteFrameBody(ReadOnlySpan body, byte[] mask = null, long offset = 0) { + private int WriteFrameBody(ReadOnlySpan body, byte[] mask = null, int offset = 0) { + if(body == null) + throw new ArgumentNullException(nameof(body)); + if(mask != null) { byte[] masked = new byte[body.Length]; @@ -260,37 +237,34 @@ namespace Hamakaze.WebSocket { } Stream.Write(body); + Stream.Flush(); return offset; } - private long WriteFrameBody(Stream body, byte[] mask = null, long offset = 0) { - bool shouldMask = mask != null; + internal void WriteFrame(WsOpcode opcode, ReadOnlySpan body, bool isFinal) { + if(body == null) + throw new ArgumentNullException(nameof(body)); - int read; - byte[] buffer = new byte[BUFFER_SIZE]; - while((read = body.Read(buffer, 0, BUFFER_SIZE)) > 0) - offset = WriteFrameBody(buffer.AsSpan(0, read), mask, offset); - - return offset; - } - - private void WriteFrame(WsOpcode opcode, ReadOnlySpan body, bool isFinal) { byte[] mask = GenerateMask(); WriteFrameHeader(opcode, body.Length, isFinal, mask); if(body.Length > 0) WriteFrameBody(body, mask); - Stream.Flush(); } - private void Write(WsOpcode opcode, ReadOnlySpan body) { - if(body.Length > 0xFFFF) { - WriteFrame(opcode, body.Slice(0, 0xFFFF), false); - body = body.Slice(0xFFFF); + private void WriteData(WsOpcode opcode, ReadOnlySpan body) { + if(body == null) + throw new ArgumentNullException(nameof(body)); + if(BufferedSend != null) + throw new WsBufferedSendInSessionException(); - while(body.Length > 0xFFFF) { - WriteFrame(WsOpcode.DataContinue, body.Slice(0, 0xFFFF), false); - body = body.Slice(0xFFFF); + if(body.Length > ushort.MaxValue) { + WriteFrame(opcode, body.Slice(0, ushort.MaxValue), false); + body = body.Slice(ushort.MaxValue); + + while(body.Length > ushort.MaxValue) { + WriteFrame(WsOpcode.DataContinue, body.Slice(0, ushort.MaxValue), false); + body = body.Slice(ushort.MaxValue); } WriteFrame(WsOpcode.DataContinue, body, true); @@ -298,119 +272,74 @@ namespace Hamakaze.WebSocket { WriteFrame(opcode, body, true); } - private void Write(WsOpcode opcode, Stream stream) { - if(stream == null) - throw new ArgumentNullException(nameof(stream)); - if(!stream.CanRead) - throw new ArgumentException(@"Provided stream cannot be read.", nameof(stream)); - - int read; - byte[] buffer = new byte[BUFFER_SIZE]; - - while((read = stream.Read(buffer, 0, BUFFER_SIZE)) > 0) { - WriteFrame(opcode, buffer.AsSpan(0, read), false); - - if(opcode != WsOpcode.DataContinue) - opcode = WsOpcode.DataContinue; - } - - // this kinda fucking sucks - WriteFrame(WsOpcode.CtrlClose, ReadOnlySpan.Empty, true); - } - - private void Write(WsOpcode opcode, Stream stream, int length) { - if(stream == null) - throw new ArgumentNullException(nameof(stream)); - if(!stream.CanRead) - throw new ArgumentException(@"Provided stream cannot be read.", nameof(stream)); - - int read; - byte[] buffer = new byte[BUFFER_SIZE]; - - if(length > BUFFER_SIZE) { - int take = BUFFER_SIZE; - - while((read = stream.Read(buffer, 0, take)) > 0) { - WriteFrame(opcode, buffer.AsSpan(0, read), false); - - if(opcode != WsOpcode.DataContinue) - opcode = WsOpcode.DataContinue; - - length -= read; - if(take > length) - take = length; - } - - // feel like there'd be a better way to do this - // but i feel like assuming that any successful read with something - // still coming (read == BUFFER_SIZE) will bite me in the ass later somehow - WriteFrame(WsOpcode.CtrlClose, Span.Empty, true); - } else { - read = stream.Read(buffer, 0, BUFFER_SIZE); - if(read > 0) - WriteFrame(WsOpcode.DataBinary, buffer.AsSpan(0, read), true); - } - } - public void Send(string text) - => Write(WsOpcode.DataText, Encoding.UTF8.GetBytes(text)); + => WriteData(WsOpcode.DataText, Encoding.UTF8.GetBytes(text)); public void Send(ReadOnlySpan buffer) - => Write(WsOpcode.DataBinary, buffer); + => WriteData(WsOpcode.DataBinary, buffer); - public void Send(Stream source) - => Write(WsOpcode.DataBinary, source); - - public void Send(Stream source, int count) - => Write(WsOpcode.DataBinary, source, count); - - private void WriteControlFrame(WsOpcode opcode) { - WriteFrameHeader(opcode, 0, true, GenerateMask()); - Stream.Flush(); + public WsBufferedSend BeginBufferedSend() { + if(BufferedSend != null) + throw new WsBufferedSendAlreadyActiveException(); + return BufferedSend = new(this); } - private void WriteControlFrame(WsOpcode opcode, ReadOnlySpan buffer) { + // this method should only be called from within WsBufferedSend.Dispose + internal void EndBufferedSend() { + BufferedSend = null; + } + + private void WriteControl(WsOpcode opcode) + => WriteFrameHeader(opcode, 0, true, GenerateMask()); + + private void WriteControl(WsOpcode opcode, ReadOnlySpan buffer) { + if(buffer == null) + throw new ArgumentNullException(nameof(buffer)); if(buffer.Length > 125) throw new ArgumentException(@"Data may not be more than 125 bytes.", nameof(buffer)); byte[] mask = GenerateMask(); WriteFrameHeader(opcode, buffer.Length, true, mask); WriteFrameBody(buffer, mask); - Stream.Flush(); } public void Ping() - => WriteControlFrame(WsOpcode.CtrlPing); + => WriteControl(WsOpcode.CtrlPing); public void Ping(ReadOnlySpan buffer) - => WriteControlFrame(WsOpcode.CtrlPing, buffer); + => WriteControl(WsOpcode.CtrlPing, buffer); public void Pong() - => WriteControlFrame(WsOpcode.CtrlPong); + => WriteControl(WsOpcode.CtrlPong); public void Pong(ReadOnlySpan buffer) - => WriteControlFrame(WsOpcode.CtrlPong, buffer); + => WriteControl(WsOpcode.CtrlPong, buffer); public void CloseEmpty() { if(IsClosed) return; IsClosed = true; - WriteControlFrame(WsOpcode.CtrlClose); + WriteControl(WsOpcode.CtrlClose); } public void Close(ReadOnlySpan buffer) { + if(buffer == null) + throw new ArgumentNullException(nameof(buffer)); + if(IsClosed) return; IsClosed = true; - WriteControlFrame(WsOpcode.CtrlClose, buffer); + WriteControl(WsOpcode.CtrlClose, buffer); } public void Close(WsCloseReason code) => Close(WsUtils.FromU16((ushort)code)); public void Close(WsCloseReason code, ReadOnlySpan reason) { + if(reason == null) + throw new ArgumentNullException(nameof(reason)); if(reason.Length > 123) throw new ArgumentException(@"Reason may not be more than 123 bytes.", nameof(reason)); @@ -422,14 +351,11 @@ namespace Hamakaze.WebSocket { WriteFrameHeader(WsOpcode.CtrlClose, 2 + reason.Length, true, mask); WriteFrameBody(WsUtils.FromU16((ushort)code), mask); WriteFrameBody(reason, mask, 2); - Stream.Flush(); } public void Close(WsCloseReason code, string reason) { - if(string.IsNullOrEmpty(reason)) { - Close(code); - return; - } + if(reason == null) + throw new ArgumentNullException(nameof(reason)); int length = Encoding.UTF8.GetByteCount(reason); if(length > 123) @@ -443,7 +369,6 @@ namespace Hamakaze.WebSocket { WriteFrameHeader(WsOpcode.CtrlClose, 2 + reason.Length, true, mask); WriteFrameBody(WsUtils.FromU16((ushort)code), mask); WriteFrameBody(Encoding.UTF8.GetBytes(reason), mask, 2); - Stream.Flush(); } private bool IsDisposed; @@ -462,6 +387,8 @@ namespace Hamakaze.WebSocket { return; IsDisposed = true; + BufferedSend?.Dispose(); + FragmentStream?.Dispose(); Stream.Dispose(); } } diff --git a/Hamakaze/WebSocket/WsException.cs b/Hamakaze/WebSocket/WsException.cs index 50f0f5a..fc17bf6 100644 --- a/Hamakaze/WebSocket/WsException.cs +++ b/Hamakaze/WebSocket/WsException.cs @@ -26,4 +26,16 @@ namespace Hamakaze.WebSocket { public class WsInvalidControlFrameException : WsException { public WsInvalidControlFrameException(string variant) : base($@"An invalid WebSocket control frame was encountered: {variant}") { } } + + public class WsClientMutexFailedException : WsException { + public WsClientMutexFailedException() : base(@"Failed to acquire send mutex.") { } + } + + public class WsBufferedSendAlreadyActiveException : WsException { + public WsBufferedSendAlreadyActiveException() : base(@"A buffered websocket send is already in session.") { } + } + + public class WsBufferedSendInSessionException : WsException { + public WsBufferedSendInSessionException() : base(@"Cannot send data while a buffered send is in session.") { } + } } diff --git a/Hamakaze/WebSocket/WsPingMessage.cs b/Hamakaze/WebSocket/WsPingMessage.cs index 066d199..15548d1 100644 --- a/Hamakaze/WebSocket/WsPingMessage.cs +++ b/Hamakaze/WebSocket/WsPingMessage.cs @@ -1,7 +1,7 @@ using System; namespace Hamakaze.WebSocket { - public class WsPingMessage : WsMessage { + public class WsPingMessage : WsMessage, IHasBinaryData { public byte[] Data { get; } public WsPingMessage(byte[] data) { diff --git a/Hamakaze/WebSocket/WsPongMessage.cs b/Hamakaze/WebSocket/WsPongMessage.cs index 54d44bd..96218e7 100644 --- a/Hamakaze/WebSocket/WsPongMessage.cs +++ b/Hamakaze/WebSocket/WsPongMessage.cs @@ -1,7 +1,7 @@ using System; namespace Hamakaze.WebSocket { - public class WsPongMessage : WsMessage { + public class WsPongMessage : WsMessage, IHasBinaryData { public byte[] Data { get; } public WsPongMessage(byte[] data) { diff --git a/Hamakaze/WebSocket/WsTextMessage.cs b/Hamakaze/WebSocket/WsTextMessage.cs index fb41d76..13b050d 100644 --- a/Hamakaze/WebSocket/WsTextMessage.cs +++ b/Hamakaze/WebSocket/WsTextMessage.cs @@ -10,5 +10,11 @@ namespace Hamakaze.WebSocket { else Text = string.Empty; } + + public static implicit operator string(WsTextMessage msg) => msg.Text; + + public override string ToString() { + return Text; + } } }