From 01a5dbccd65a9bf42aa9290ddda400b86229d14b Mon Sep 17 00:00:00 2001 From: flashwave Date: Wed, 6 Apr 2022 13:59:43 +0200 Subject: [PATCH 1/6] Bump to .NET 6.0. --- Hamakaze/Hamakaze.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Hamakaze/Hamakaze.csproj b/Hamakaze/Hamakaze.csproj index f208d30..dbc1517 100644 --- a/Hamakaze/Hamakaze.csproj +++ b/Hamakaze/Hamakaze.csproj @@ -1,7 +1,7 @@ - net5.0 + net6.0 From 5f479a17e0ea21249b05e1abb0cfa8c68ea4a899 Mon Sep 17 00:00:00 2001 From: flashwave Date: Thu, 7 Apr 2022 20:59:52 +0200 Subject: [PATCH 2/6] Fixed connection issues. --- Hamakaze/HttpConnection.cs | 14 +++++++++----- Hamakaze/HttpException.cs | 7 +++++++ Hamakaze/HttpRequestMessage.cs | 3 +++ Hamakaze/HttpResponseMessage.cs | 2 +- Hamakaze/HttpTask.cs | 5 +++-- 5 files changed, 23 insertions(+), 8 deletions(-) diff --git a/Hamakaze/HttpConnection.cs b/Hamakaze/HttpConnection.cs index 8bb90d0..d5b2c9e 100644 --- a/Hamakaze/HttpConnection.cs +++ b/Hamakaze/HttpConnection.cs @@ -4,7 +4,6 @@ using System.Net; using System.Net.Security; using System.Net.Sockets; using System.Security.Authentication; -using System.Threading; namespace Hamakaze { public class HttpConnection : IDisposable { @@ -18,9 +17,9 @@ namespace Hamakaze { public string Host { get; } public bool IsSecure { get; } - public bool HasTimedOut => MaxRequests == 0 || (DateTimeOffset.Now - LastOperation) > MaxIdle; + public bool HasTimedOut => MaxRequests < 1 || (DateTimeOffset.Now - LastOperation) > MaxIdle; - public int MaxRequests { get; set; } = -1; + public int? MaxRequests { get; set; } = null; public TimeSpan MaxIdle { get; set; } = TimeSpan.MaxValue; public DateTimeOffset LastOperation { get; private set; } = DateTimeOffset.Now; @@ -45,14 +44,19 @@ namespace Hamakaze { if(IsSecure) { SslStream = new SslStream(NetworkStream, false, (s, ce, ch, e) => e == SslPolicyErrors.None, null); Stream = SslStream; - SslStream.AuthenticateAsClient(Host, null, SslProtocols.Tls11 | SslProtocols.Tls12 | SslProtocols.Tls13, true); + SslStream.AuthenticateAsClient( + Host, + null, + SslProtocols.Tls11 | SslProtocols.Tls12 | SslProtocols.Tls13, + true + ); } else Stream = NetworkStream; } public void MarkUsed() { LastOperation = DateTimeOffset.Now; - if(MaxRequests > 0) + if(MaxRequests != null) --MaxRequests; } diff --git a/Hamakaze/HttpException.cs b/Hamakaze/HttpException.cs index d6e0bce..6219e57 100644 --- a/Hamakaze/HttpException.cs +++ b/Hamakaze/HttpException.cs @@ -12,6 +12,13 @@ namespace Hamakaze { public HttpConnectionManagerLockException() : base(@"Failed to lock the connection manager in time.") { } } + public class HttpRequestMessageException : HttpException { + public HttpRequestMessageException(string message) : base(message) { } + } + public class HttpRequestMessageStreamException : HttpRequestMessageException { + public HttpRequestMessageStreamException() : base(@"Provided Stream is not writable.") { } + } + public class HttpTaskException : HttpException { public HttpTaskException(string message) : base(message) { } } diff --git a/Hamakaze/HttpRequestMessage.cs b/Hamakaze/HttpRequestMessage.cs index 6ce3ce3..77a7907 100644 --- a/Hamakaze/HttpRequestMessage.cs +++ b/Hamakaze/HttpRequestMessage.cs @@ -157,6 +157,9 @@ namespace Hamakaze { } public void WriteTo(Stream stream, Action onProgress = null) { + if(!stream.CanWrite) + throw new HttpRequestMessageStreamException(); + using(StreamWriter sw = new(stream, new ASCIIEncoding(), leaveOpen: true)) { sw.NewLine = "\r\n"; sw.Write(Method); diff --git a/Hamakaze/HttpResponseMessage.cs b/Hamakaze/HttpResponseMessage.cs index c041e93..c401ed1 100644 --- a/Hamakaze/HttpResponseMessage.cs +++ b/Hamakaze/HttpResponseMessage.cs @@ -127,7 +127,7 @@ namespace Hamakaze { for(; ; ) { byt = stream.ReadByte(); if(byt == -1 && ms.Length == 0) - return null; + throw new IOException(@"readLine: There is no data."); ms.WriteByte((byte)byt); diff --git a/Hamakaze/HttpTask.cs b/Hamakaze/HttpTask.cs index ddcd212..e3ae06e 100644 --- a/Hamakaze/HttpTask.cs +++ b/Hamakaze/HttpTask.cs @@ -131,7 +131,7 @@ namespace Hamakaze { try { Request.WriteTo(Connection.Stream, (p, t) => OnUploadProgress?.Invoke(this, p, t)); break; - } catch(IOException ex) { + } catch(HttpRequestMessageStreamException ex) { Connection.Dispose(); Connection = Connections.GetConnection(Request.Host, endPoint, Request.IsSecure); @@ -162,7 +162,8 @@ namespace Hamakaze { return; } - if(Response.Connection == HttpConnectionHeader.CLOSE) + if(Response.Connection == HttpConnectionHeader.CLOSE + || Response.ProtocolVersion.CompareTo(@"1.1") < 0) Connection.Dispose(); if(Response == null) Error(new HttpTaskRequestFailedException()); From b76202695824dd5ce034f8ec11d2ca007931dec5 Mon Sep 17 00:00:00 2001 From: flashwave Date: Sun, 10 Apr 2022 00:34:05 +0200 Subject: [PATCH 3/6] WIP WebSocket implementation. --- Hamakaze/Headers/HttpConnectionHeader.cs | 3 +- Hamakaze/HttpClient.cs | 91 ++++- Hamakaze/HttpConnection.cs | 30 +- Hamakaze/HttpException.cs | 26 ++ Hamakaze/HttpRequestMessage.cs | 3 +- Hamakaze/HttpResponseMessage.cs | 4 +- Hamakaze/HttpTask.cs | 130 +++---- Hamakaze/WebSocket/WsBinaryMessage.cs | 11 + Hamakaze/WebSocket/WsBufferedSend.cs | 31 ++ Hamakaze/WebSocket/WsClient.cs | 138 +++++++ Hamakaze/WebSocket/WsCloseMessage.cs | 36 ++ Hamakaze/WebSocket/WsCloseReason.cs | 16 + Hamakaze/WebSocket/WsConnection.cs | 468 +++++++++++++++++++++++ Hamakaze/WebSocket/WsException.cs | 29 ++ Hamakaze/WebSocket/WsMessage.cs | 5 + Hamakaze/WebSocket/WsOpcode.cs | 13 + Hamakaze/WebSocket/WsPingMessage.cs | 11 + Hamakaze/WebSocket/WsPongMessage.cs | 11 + Hamakaze/WebSocket/WsTextMessage.cs | 14 + Hamakaze/WebSocket/WsUtils.cs | 38 ++ 20 files changed, 1019 insertions(+), 89 deletions(-) create mode 100644 Hamakaze/WebSocket/WsBinaryMessage.cs create mode 100644 Hamakaze/WebSocket/WsBufferedSend.cs create mode 100644 Hamakaze/WebSocket/WsClient.cs create mode 100644 Hamakaze/WebSocket/WsCloseMessage.cs create mode 100644 Hamakaze/WebSocket/WsCloseReason.cs create mode 100644 Hamakaze/WebSocket/WsConnection.cs create mode 100644 Hamakaze/WebSocket/WsException.cs create mode 100644 Hamakaze/WebSocket/WsMessage.cs create mode 100644 Hamakaze/WebSocket/WsOpcode.cs create mode 100644 Hamakaze/WebSocket/WsPingMessage.cs create mode 100644 Hamakaze/WebSocket/WsPongMessage.cs create mode 100644 Hamakaze/WebSocket/WsTextMessage.cs create mode 100644 Hamakaze/WebSocket/WsUtils.cs diff --git a/Hamakaze/Headers/HttpConnectionHeader.cs b/Hamakaze/Headers/HttpConnectionHeader.cs index 50b1318..ff406ec 100644 --- a/Hamakaze/Headers/HttpConnectionHeader.cs +++ b/Hamakaze/Headers/HttpConnectionHeader.cs @@ -9,9 +9,10 @@ namespace Hamakaze.Headers { public const string CLOSE = @"close"; public const string KEEP_ALIVE = @"keep-alive"; + public const string UPGRADE = @"upgrade"; public HttpConnectionHeader(string mode) { - Value = mode ?? throw new ArgumentNullException(nameof(mode)); + Value = (mode ?? throw new ArgumentNullException(nameof(mode))).ToLowerInvariant(); } } } diff --git a/Hamakaze/HttpClient.cs b/Hamakaze/HttpClient.cs index be009b5..24f05d3 100644 --- a/Hamakaze/HttpClient.cs +++ b/Hamakaze/HttpClient.cs @@ -1,14 +1,22 @@ using Hamakaze.Headers; +using Hamakaze.WebSocket; using System; using System.Collections.Generic; +using System.Linq; +using System.Security.Cryptography; +using System.Text; namespace Hamakaze { public class HttpClient : IDisposable { public const string PRODUCT_STRING = @"HMKZ"; public const string VERSION_MAJOR = @"1"; - public const string VERSION_MINOR = @"0"; + public const string VERSION_MINOR = @"1"; public const string USER_AGENT = PRODUCT_STRING + @"/" + VERSION_MAJOR + @"." + VERSION_MINOR; + private const string WS_GUID = @"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + private const string WS_PROTO = @"websocket"; + private const int WS_RNG = 16; + private static HttpClient InstanceValue { get; set; } public static HttpClient Instance { get { @@ -47,7 +55,8 @@ namespace Hamakaze { request.UserAgent = DefaultUserAgent; if(!request.HasHeader(HttpAcceptEncodingHeader.NAME)) request.AcceptedEncodings = AcceptedEncodings; - request.Connection = ReuseConnections ? HttpConnectionHeader.KEEP_ALIVE : HttpConnectionHeader.CLOSE; + if(!request.HasHeader(HttpConnectionHeader.NAME)) + request.Connection = ReuseConnections ? HttpConnectionHeader.KEEP_ALIVE : HttpConnectionHeader.CLOSE; HttpTask task = new(Connections, request, disposeRequest, disposeResponse); @@ -85,6 +94,84 @@ namespace Hamakaze { RunTask(CreateTask(request, onComplete, onError, onCancel, onDownloadProgress, onUploadProgress, onStateChange, disposeRequest, disposeResponse)); } + public void CreateWsClient( + string url, + Action onOpen, + Action onMessage, + Action onError, + IEnumerable protocols = null + ) { + CreateWsConnection( + url, + conn => onOpen(new WsClient(conn, onMessage, onError)), + onError, + protocols + ); + } + + public void CreateWsConnection( + string url, + Action onOpen, + Action onError, + IEnumerable protocols = null + ) { + string key = Convert.ToBase64String(RandomNumberGenerator.GetBytes(WS_RNG)); + + HttpRequestMessage req = new HttpRequestMessage(@"GET", url); + req.Connection = HttpConnectionHeader.UPGRADE; + req.SetHeader(@"Cache-Control", @"no-cache"); + req.SetHeader(@"Upgrade", WS_PROTO); + req.SetHeader(@"Sec-WebSocket-Key", key); + req.SetHeader(@"Sec-WebSocket-Version", @"13"); + + if(protocols?.Any() == true) + req.SetHeader(@"Sec-WebSocket-Protocol", string.Join(@", ", protocols)); + + SendRequest( + req, + (t, res) => { + try { + if(res.ProtocolVersion.CompareTo(@"1.1") < 0) + throw new HttpUpgradeProtocolVersionException(@"1.1", res.ProtocolVersion); + + if(res.StatusCode != 101) + throw new HttpUpgradeUnexpectedStatusException(res.StatusCode); + + if(res.Connection != HttpConnectionHeader.UPGRADE) + throw new HttpUpgradeUnexpectedHeaderException( + @"Connection", + HttpConnectionHeader.UPGRADE, + res.Connection + ); + + string hUpgrade = res.GetHeaderLine(@"Upgrade"); + if(hUpgrade != WS_PROTO) + throw new HttpUpgradeUnexpectedHeaderException(@"Upgrade", WS_PROTO, hUpgrade); + + string serverHashStr = res.GetHeaderLine(@"Sec-WebSocket-Accept"); + byte[] expectHash = SHA1.HashData(Encoding.ASCII.GetBytes(key + WS_GUID)); + + if(string.IsNullOrWhiteSpace(serverHashStr)) + throw new HttpUpgradeUnexpectedHeaderException( + @"Sec-WebSocket-Accept", + Convert.ToBase64String(expectHash), + serverHashStr + ); + + byte[] givenHash = Convert.FromBase64String(serverHashStr.Trim()); + + if(!expectHash.SequenceEqual(givenHash)) + throw new HttpUpgradeInvalidHashException(Convert.ToBase64String(expectHash), serverHashStr); + + onOpen(t.Connection.ToWebSocket()); + } catch(Exception ex) { + onError(ex); + } + }, + (t, ex) => onError(ex) + ); + } + public static void Send( HttpRequestMessage request, Action onComplete = null, diff --git a/Hamakaze/HttpConnection.cs b/Hamakaze/HttpConnection.cs index d5b2c9e..509a6b6 100644 --- a/Hamakaze/HttpConnection.cs +++ b/Hamakaze/HttpConnection.cs @@ -4,15 +4,14 @@ using System.Net; using System.Net.Security; using System.Net.Sockets; using System.Security.Authentication; +using Hamakaze.WebSocket; namespace Hamakaze { public class HttpConnection : IDisposable { public IPEndPoint EndPoint { get; } public Stream Stream { get; } - public Socket Socket { get; } - public NetworkStream NetworkStream { get; } - public SslStream SslStream { get; } + private Socket Socket { get; } public string Host { get; } public bool IsSecure { get; } @@ -24,6 +23,7 @@ namespace Hamakaze { public DateTimeOffset LastOperation { get; private set; } = DateTimeOffset.Now; public bool InUse { get; private set; } + public bool HasUpgraded { get; private set; } public HttpConnection(string host, IPEndPoint endPoint, bool secure) { Host = host ?? throw new ArgumentNullException(nameof(host)); @@ -39,19 +39,19 @@ namespace Hamakaze { }; Socket.Connect(endPoint); - NetworkStream = new NetworkStream(Socket, true); + Stream stream = new NetworkStream(Socket, true); if(IsSecure) { - SslStream = new SslStream(NetworkStream, false, (s, ce, ch, e) => e == SslPolicyErrors.None, null); - Stream = SslStream; - SslStream.AuthenticateAsClient( + SslStream sslStream = new SslStream(stream, false, (s, ce, ch, e) => e == SslPolicyErrors.None, null); + Stream = sslStream; + sslStream.AuthenticateAsClient( Host, null, SslProtocols.Tls11 | SslProtocols.Tls12 | SslProtocols.Tls13, true ); } else - Stream = NetworkStream; + Stream = stream; } public void MarkUsed() { @@ -61,13 +61,21 @@ namespace Hamakaze { } public bool Acquire() { - return !InUse && (InUse = true); + return !HasUpgraded && !InUse && (InUse = true); } public void Release() { InUse = false; } + public WsConnection ToWebSocket() { + if(HasUpgraded) + throw new HttpConnectionAlreadyUpgradedException(); + HasUpgraded = true; + + return new WsConnection(Stream); + } + private bool IsDisposed; ~HttpConnection() => DoDispose(); @@ -79,7 +87,9 @@ namespace Hamakaze { if(IsDisposed) return; IsDisposed = true; - Stream.Dispose(); + + if(!HasUpgraded) + Stream.Dispose(); } } } diff --git a/Hamakaze/HttpException.cs b/Hamakaze/HttpException.cs index 6219e57..726b9ab 100644 --- a/Hamakaze/HttpException.cs +++ b/Hamakaze/HttpException.cs @@ -5,6 +5,32 @@ namespace Hamakaze { public HttpException(string message) : base(message) { } } + public class HttpUpgradeException : HttpException { + public HttpUpgradeException(string message) : base(message) { } + } + public class HttpUpgradeProtocolVersionException : HttpUpgradeException { + public HttpUpgradeProtocolVersionException(string expectedVersion, string givenVersion) + : base($@"Server HTTP version ({givenVersion}) is lower than what is expected {expectedVersion}.") { } + } + public class HttpUpgradeUnexpectedStatusException : HttpUpgradeException { + public HttpUpgradeUnexpectedStatusException(int statusCode) : base($@"Expected HTTP status code 101, got {statusCode}.") { } + } + public class HttpUpgradeUnexpectedHeaderException : HttpUpgradeException { + public HttpUpgradeUnexpectedHeaderException(string header, string expected, string given) + : base($@"Unexpected {header} header value ""{given}"", expected ""{expected}"".") { } + } + public class HttpUpgradeInvalidHashException : HttpUpgradeException { + public HttpUpgradeInvalidHashException(string expected, string given) + : base($@"Server sent invalid hash ""{given}"", expected ""{expected}"".") { } + } + + public class HttpConnectionException : HttpException { + public HttpConnectionException(string message) : base(message) { } + } + public class HttpConnectionAlreadyUpgradedException : HttpConnectionException { + public HttpConnectionAlreadyUpgradedException() : base(@"This connection has already been upgraded.") { } + } + public class HttpConnectionManagerException : HttpException { public HttpConnectionManagerException(string message) : base(message) { } } diff --git a/Hamakaze/HttpRequestMessage.cs b/Hamakaze/HttpRequestMessage.cs index 77a7907..ba01492 100644 --- a/Hamakaze/HttpRequestMessage.cs +++ b/Hamakaze/HttpRequestMessage.cs @@ -92,7 +92,8 @@ namespace Hamakaze { public HttpRequestMessage(string method, Uri uri) { Method = method ?? throw new ArgumentNullException(nameof(method)); RequestTarget = uri.PathAndQuery; - IsSecure = uri.Scheme.Equals(@"https", StringComparison.InvariantCultureIgnoreCase); + IsSecure = uri.Scheme.Equals(@"https", StringComparison.InvariantCultureIgnoreCase) + || uri.Scheme.Equals(@"wss", StringComparison.InvariantCultureIgnoreCase); Host = uri.Host; ushort defaultPort = (IsSecure ? HTTPS : HTTP); Port = uri.Port == -1 ? defaultPort : (ushort)uri.Port; diff --git a/Hamakaze/HttpResponseMessage.cs b/Hamakaze/HttpResponseMessage.cs index c401ed1..b2d7abb 100644 --- a/Hamakaze/HttpResponseMessage.cs +++ b/Hamakaze/HttpResponseMessage.cs @@ -124,7 +124,7 @@ namespace Hamakaze { using MemoryStream ms = new(); int byt; ushort lastTwo = 0; - for(; ; ) { + for(;;) { byt = stream.ReadByte(); if(byt == -1 && ms.Length == 0) throw new IOException(@"readLine: There is no data."); @@ -238,11 +238,9 @@ namespace Hamakaze { readBuffer(chunkLength); readLine(); } - readLine(); } else if(contentLength != 0) { body = new MemoryStream(); readBuffer(contentLength); - readLine(); } if(body != null) diff --git a/Hamakaze/HttpTask.cs b/Hamakaze/HttpTask.cs index e3ae06e..5bd604f 100644 --- a/Hamakaze/HttpTask.cs +++ b/Hamakaze/HttpTask.cs @@ -1,7 +1,6 @@ using Hamakaze.Headers; using System; using System.Collections.Generic; -using System.IO; using System.Linq; using System.Net; @@ -25,7 +24,7 @@ namespace Hamakaze { private HttpConnectionManager Connections { get; } private IEnumerable Addresses { get; set; } - private HttpConnection Connection { get; set; } + public HttpConnection Connection { get; private set; } public bool DisposeRequest { get; set; } public bool DisposeResponse { get; set; } @@ -70,103 +69,90 @@ namespace Hamakaze { if(IsCancelled) return false; - switch(State) { - case TaskState.Initial: - State = TaskState.Lookup; - OnStateChange?.Invoke(this, State); - DoLookup(); - break; - case TaskState.Lookup: - State = TaskState.Request; - OnStateChange?.Invoke(this, State); - DoRequest(); - break; - case TaskState.Request: - State = TaskState.Response; - OnStateChange?.Invoke(this, State); - DoResponse(); - break; - case TaskState.Response: - State = TaskState.Finished; - OnStateChange?.Invoke(this, State); - OnComplete?.Invoke(this, Response); - if(DisposeResponse) - Response?.Dispose(); - if(DisposeRequest) - Request?.Dispose(); - return false; - default: - Error(new HttpTaskInvalidStateException()); - return false; + try { + switch(State) { + case TaskState.Initial: + State = TaskState.Lookup; + OnStateChange?.Invoke(this, State); + DoLookup(); + break; + case TaskState.Lookup: + State = TaskState.Request; + OnStateChange?.Invoke(this, State); + DoRequest(); + break; + case TaskState.Request: + State = TaskState.Response; + OnStateChange?.Invoke(this, State); + DoResponse(); + break; + case TaskState.Response: + State = TaskState.Finished; + OnStateChange?.Invoke(this, State); + OnComplete?.Invoke(this, Response); + if(DisposeResponse) + Response?.Dispose(); + if(DisposeRequest) + Request?.Dispose(); + return false; + default: + throw new HttpTaskInvalidStateException(); + } + } catch(Exception ex) { + Error(ex); + return false; } return true; } private void DoLookup() { - try { - Addresses = Dns.GetHostAddresses(Request.Host); - } catch(Exception ex) { - Error(ex); - return; - } + Addresses = Dns.GetHostAddresses(Request.Host); if(!Addresses.Any()) - Error(new HttpTaskNoAddressesException()); + throw new HttpTaskNoAddressesException(); } private void DoRequest() { - Exception exception = null; + Queue addresses = new(Addresses); - try { - foreach(IPAddress addr in Addresses) { - int tries = 0; - IPEndPoint endPoint = new(addr, Request.Port); + while(addresses.TryDequeue(out IPAddress addr)) { + int tries = 0; + IPEndPoint endPoint = new(addr, Request.Port); - exception = null; + Connection = Connections.GetConnection(Request.Host, endPoint, Request.IsSecure); + + retry: + ++tries; + try { + Request.WriteTo(Connection.Stream, (p, t) => OnUploadProgress?.Invoke(this, p, t)); + break; + } catch(HttpRequestMessageStreamException) { + Connection.Dispose(); Connection = Connections.GetConnection(Request.Host, endPoint, Request.IsSecure); - retry: - ++tries; - try { - Request.WriteTo(Connection.Stream, (p, t) => OnUploadProgress?.Invoke(this, p, t)); - break; - } catch(HttpRequestMessageStreamException ex) { - Connection.Dispose(); - Connection = Connections.GetConnection(Request.Host, endPoint, Request.IsSecure); + if(tries < 2) + goto retry; - if(tries < 2) - goto retry; - - exception = ex; - continue; - } finally { - Connection.MarkUsed(); - } + if(!addresses.Any()) + throw; + } finally { + Connection.MarkUsed(); } - } catch(Exception ex) { - Error(ex); } - if(exception != null) - Error(exception); - else if(Connection == null) - Error(new HttpTaskNoConnectionException()); + if(Connection == null) + throw new HttpTaskNoConnectionException(); } private void DoResponse() { - try { - Response = HttpResponseMessage.ReadFrom(Connection.Stream, (p, t) => OnDownloadProgress?.Invoke(this, p, t)); - } catch(Exception ex) { - Error(ex); - return; - } + Response = HttpResponseMessage.ReadFrom(Connection.Stream, (p, t) => OnDownloadProgress?.Invoke(this, p, t)); if(Response.Connection == HttpConnectionHeader.CLOSE || Response.ProtocolVersion.CompareTo(@"1.1") < 0) Connection.Dispose(); if(Response == null) - Error(new HttpTaskRequestFailedException()); + throw new HttpTaskRequestFailedException(); HttpKeepAliveHeader hkah = Response.Headers.Where(x => x.Name == HttpKeepAliveHeader.NAME).Cast().FirstOrDefault(); if(hkah != null) { diff --git a/Hamakaze/WebSocket/WsBinaryMessage.cs b/Hamakaze/WebSocket/WsBinaryMessage.cs new file mode 100644 index 0000000..172ba4f --- /dev/null +++ b/Hamakaze/WebSocket/WsBinaryMessage.cs @@ -0,0 +1,11 @@ +using System; + +namespace Hamakaze.WebSocket { + public class WsBinaryMessage : WsMessage { + public byte[] Data { get; } + + public WsBinaryMessage(byte[] data) { + Data = data ?? Array.Empty(); + } + } +} diff --git a/Hamakaze/WebSocket/WsBufferedSend.cs b/Hamakaze/WebSocket/WsBufferedSend.cs new file mode 100644 index 0000000..b5e7c2d --- /dev/null +++ b/Hamakaze/WebSocket/WsBufferedSend.cs @@ -0,0 +1,31 @@ +using System; + +namespace Hamakaze.WebSocket { + public class WsBufferedSend : IDisposable { + private WsConnection Connection { get; } + + internal WsBufferedSend(WsConnection conn) { + Connection = conn ?? throw new ArgumentNullException(nameof(conn)); + } + + // + + private bool IsDisposed; + + ~WsBufferedSend() { + DoDispose(); + } + + public void Dispose() { + DoDispose(); + GC.SuppressFinalize(this); + } + + private void DoDispose() { + if(IsDisposed) + return; + IsDisposed = true; + + } + } +} diff --git a/Hamakaze/WebSocket/WsClient.cs b/Hamakaze/WebSocket/WsClient.cs new file mode 100644 index 0000000..5bb788e --- /dev/null +++ b/Hamakaze/WebSocket/WsClient.cs @@ -0,0 +1,138 @@ +using System; +using System.Threading; + +namespace Hamakaze.WebSocket { + public class WsClient : IDisposable { + public WsConnection Connection { get; } + public bool IsRunning { get; private set; } = true; + + private Thread ReadThread { get; } + private Action MessageHandler { get; } + private Action ExceptionHandler { get; } + + private Mutex SendLock { get; } + + public WsClient( + WsConnection connection, + Action messageHandler, + Action exceptionHandler + ) { + Connection = connection ?? throw new ArgumentNullException(nameof(connection)); + MessageHandler = messageHandler ?? throw new ArgumentNullException(nameof(messageHandler)); + ExceptionHandler = exceptionHandler ?? throw new ArgumentNullException(nameof(exceptionHandler)); + + SendLock = new(); + + ReadThread = new(ReadThreadBody) { IsBackground = true }; + ReadThread.Start(); + } + + private void ReadThreadBody() { + try { + while(IsRunning) + MessageHandler(Connection.Receive()); + } catch(Exception ex) { + IsRunning = false; + ExceptionHandler(ex); + } + } + + public void Send(string text) { + Connection.Send(text); + } + + public void Send(object obj) { + if(obj == null) + throw new ArgumentNullException(nameof(obj)); + + Connection.Send(obj.ToString()); + } + + public void Send(ReadOnlySpan data) { + Connection.Send(data); + } + + public void Send(byte[] buffer, int offset, int count) { + if(buffer == null) + throw new ArgumentNullException(nameof(buffer)); + + Connection.Send(buffer.AsSpan(offset, count)); + } + + public void Ping() { + Connection.Ping(); + } + + public void Ping(ReadOnlySpan data) { + Connection.Ping(data); + } + + public void Ping(byte[] buffer, int offset, int length) { + if(buffer == null) + throw new ArgumentNullException(nameof(buffer)); + + Connection.Ping(buffer.AsSpan(offset, length)); + } + + public void Pong() { + Connection.Pong(); + } + + public void Pong(ReadOnlySpan data) { + Connection.Pong(data); + } + + public void Pong(byte[] buffer, int offset, int length) { + if(buffer == null) + throw new ArgumentNullException(nameof(buffer)); + + Pong(buffer.AsSpan(offset, length)); + } + + public void Close() { + Connection.Close(WsCloseReason.NormalClosure); + } + + public void CloseEmpty() { + Connection.CloseEmpty(); + } + + public void Close(string reason) { + Connection.Close(WsCloseReason.NormalClosure, reason); + } + + public void Close(byte[] buffer, int offset, int length) { + if(buffer == null) + throw new ArgumentNullException(nameof(buffer)); + + Connection.Close(buffer.AsSpan(offset, length)); + } + + public void Close(WsCloseReason code, byte[] buffer, int offset, int length) { + if(buffer == null) + throw new ArgumentNullException(nameof(buffer)); + + Connection.Close(code, buffer.AsSpan(offset, length)); + } + + private bool IsDisposed; + + ~WsClient() { + DoDispose(); + } + + public void Dispose() { + DoDispose(); + GC.SuppressFinalize(this); + } + + private void DoDispose() { + if(IsDisposed) + return; + IsDisposed = true; + + SendLock.Dispose(); + Connection.Dispose(); + } + } +} diff --git a/Hamakaze/WebSocket/WsCloseMessage.cs b/Hamakaze/WebSocket/WsCloseMessage.cs new file mode 100644 index 0000000..e0a584a --- /dev/null +++ b/Hamakaze/WebSocket/WsCloseMessage.cs @@ -0,0 +1,36 @@ +using System; +using System.Text; + +namespace Hamakaze.WebSocket { + public class WsCloseMessage : WsMessage { + public WsCloseReason Reason { get; } + public string ReasonPhrase { get; } + public byte[] Data { get; } + + public WsCloseMessage(WsCloseReason reason) { + Reason = reason; + ReasonPhrase = string.Empty; + Data = Array.Empty(); + } + + public WsCloseMessage(byte[] data) { + if(data == null) { + Reason = WsCloseReason.NoStatus; + ReasonPhrase = string.Empty; + Data = Array.Empty(); + } else { + Reason = (WsCloseReason)WsUtils.ToU16(data); + Data = data; + + if(data.Length > 2) + try { + ReasonPhrase = Encoding.UTF8.GetString(data, 2, data.Length - 2); + } catch { + ReasonPhrase = string.Empty; + } + else + ReasonPhrase = string.Empty; + } + } + } +} diff --git a/Hamakaze/WebSocket/WsCloseReason.cs b/Hamakaze/WebSocket/WsCloseReason.cs new file mode 100644 index 0000000..5b6e5a0 --- /dev/null +++ b/Hamakaze/WebSocket/WsCloseReason.cs @@ -0,0 +1,16 @@ +namespace Hamakaze.WebSocket { + public enum WsCloseReason : ushort { + NormalClosure = 1000, + GoingAway = 1001, + ProtocolError = 1002, + InvalidData = 1003, + NoStatus = 1005, // virtual -> no data in close frame + AbnormalClosure = 1006, // virtual -> connection dropped + MalformedData = 1007, + PolicyViolation = 1008, + FrameTooLarge = 1009, + MissingExtension = 1010, + UnexpectedCondition = 1011, + TlsHandshakeFailed = 1015, // virtual -> obvious + } +} diff --git a/Hamakaze/WebSocket/WsConnection.cs b/Hamakaze/WebSocket/WsConnection.cs new file mode 100644 index 0000000..164dc13 --- /dev/null +++ b/Hamakaze/WebSocket/WsConnection.cs @@ -0,0 +1,468 @@ +using System; +using System.IO; +using System.Net.Security; +using System.Security.Cryptography; +using System.Text; + +// TODO: optimisations with newer .net feature to reduce memory copying +// i think we're generally aware of how much data we're shoving around +// so memorystream can be considered overkill + +// Should there be internal mutexing on the socket? (leaning towards no) + +// Should all external stream handling be moved to WsClient? +// - IDEA: Buffered send "session" class. +// Would require exposing the raw Write methods +// but i suppose that's what "internal" exists for + +namespace Hamakaze.WebSocket { + public class WsConnection : IDisposable { + public Stream Stream { get; } + + public bool IsSecure { get; } + public bool IsClosed { get; private set; } + + private const int BUFFER_SIZE = 0x2000; + private const byte MASK_FLAG = 0x80; + private const int MASK_SIZE = 4; + + private WsOpcode FragmentedType = 0; + private MemoryStream FragmentedStream; + + public WsConnection(Stream stream) { + Stream = stream ?? throw new ArgumentNullException(nameof(stream)); + IsSecure = stream is SslStream; + } + + private static byte[] GenerateMask() { + return RandomNumberGenerator.GetBytes(MASK_SIZE); + } + + private void StrictRead(byte[] buffer, int offset, int length) { + int read = Stream.Read(buffer, offset, length); + if(read < length) + throw new Exception(@"Was unable to read the requested amount of data."); + } + + private (WsOpcode opcode, long length, bool isFinal, byte[] mask) ReadFrameHeader() { + byte[] buffer = new byte[8]; + StrictRead(buffer, 0, 2); + + WsOpcode opcode = (WsOpcode)(buffer[0] & 0x0F); + bool isFinal = (buffer[0] & (byte)WsOpcode.FlagFinal) > 0; + + if(opcode >= WsOpcode.CtrlClose && !isFinal) + throw new WsInvalidOpcodeException((WsOpcode)buffer[0]); + + bool isControl = (opcode & WsOpcode.CtrlClose) > 0; + + if(isControl && !isFinal) + throw new WsInvalidControlFrameException(@"fragmented"); + + bool isMasked = (buffer[1] & MASK_FLAG) > 0; + + // this may look stupid and you'd be correct but it's better than the stack of casts + // i'd otherwise have to do otherwise because c# converts everything back to int32 + buffer[1] &= 0x7F; + long length = buffer[1]; + + if(length == 126) { + StrictRead(buffer, 0, 2); + length = WsUtils.ToU16(buffer); + } else if(length == 127) { + StrictRead(buffer, 0, 8); + length = WsUtils.ToI64(buffer); + } + + if(isControl && length > 125) + throw new WsInvalidControlFrameException(@"too large"); + + // should there be a sanity check on the length of frames? + // i seriously don't understand the rationale behind both + // having a framing system but then also supporting frame lengths + // of 2^63, feels like 2^16 per frame would be a fine max. + if(length < 0 || length > long.MaxValue) + throw new WsInvalidFrameSizeException(length); + + byte[] mask = null; + + if(isMasked) { + StrictRead(buffer, 0, MASK_SIZE); + mask = buffer; + } + + return (opcode, length, isFinal, mask); + } + + private long ReadFrameBody(Stream target, long length, byte[] mask, long offset = 0) { + if(target == null) + throw new ArgumentNullException(nameof(target)); + if(!target.CanWrite) + throw new ArgumentException(@"Target stream is not writable.", nameof(target)); + + bool isMasked = mask != null; + + int read; + int take = length > BUFFER_SIZE ? BUFFER_SIZE : (int)length; + byte[] buffer = new byte[take]; + + while(length > 0) { + read = Stream.Read(buffer, 0, take); + + if(isMasked) + for(int i = 0; i < read; ++i) + buffer[i] ^= mask[offset++ % MASK_SIZE]; + + target.Write(buffer, 0, read); + + offset += read; + length -= read; + + if(take > length) + take = (int)length; + } + + return offset; + } + + private WsMessage ReadFrame() { + (WsOpcode opcode, long length, bool isFinal, byte[] mask) = ReadFrameHeader(); + + if(opcode is not WsOpcode.DataContinue + and not WsOpcode.DataBinary + and not WsOpcode.DataText + and not WsOpcode.CtrlClose + and not WsOpcode.CtrlPing + and not WsOpcode.CtrlPong) + throw new WsUnsupportedOpcodeException(opcode); + + bool hasBody = length > 0; + bool isContinue = opcode == WsOpcode.DataContinue; + bool canFragment = (opcode & WsOpcode.CtrlClose) == 0; + + MemoryStream bodyStream = null; + + if(hasBody) { + if(canFragment) { + if(isContinue) { + if(FragmentedType == 0) + throw new WsUnexpectedContinueException(); + + opcode = FragmentedType; + + if(FragmentedStream == null) + FragmentedStream = bodyStream = new(); + else + bodyStream = FragmentedStream; + } else { + if(FragmentedType != 0) + throw new WsUnexpectedDataException(); + + if(isFinal) + bodyStream = new(); + else { + FragmentedType = opcode; + FragmentedStream = bodyStream = new(); + } + } + } else + bodyStream = new(); + + ReadFrameBody(bodyStream, length, mask); + } + + WsMessage msg; + + if(isFinal) { + if(canFragment && isContinue) { + FragmentedType = 0; + FragmentedStream = null; + } + + byte[] body = null; + + if(bodyStream != null) { + if(bodyStream.Length > 0) + body = bodyStream.ToArray(); + bodyStream.Dispose(); + } + + switch(opcode) { + case WsOpcode.DataText: + msg = new WsTextMessage(body); + break; + + case WsOpcode.DataBinary: + msg = new WsBinaryMessage(body); + break; + + case WsOpcode.CtrlClose: + msg = new WsCloseMessage(body); + break; + + case WsOpcode.CtrlPing: + msg = new WsPingMessage(body); + break; + + case WsOpcode.CtrlPong: + msg = new WsPongMessage(body); + break; + + default: // fallback, if we end up here something is very fucked + throw new WsUnsupportedOpcodeException(opcode); + } + } else msg = null; + + return msg; + } + + public WsMessage Receive() { + WsMessage msg; + while((msg = ReadFrame()) == null); + return msg; + } + + private void WriteFrameHeader(WsOpcode opcode, long length, bool isFinal, byte[] mask = null) { + bool shouldMask = mask != null; + + if(isFinal) + opcode |= WsOpcode.FlagFinal; + + Stream.WriteByte((byte)opcode); + + byte bLen1 = 0; + if(shouldMask) + bLen1 |= MASK_FLAG; + + byte[] bLenBuff = WsUtils.FromI64(length); + if(length < 126) { + Stream.WriteByte((byte)(bLen1 | bLenBuff[7])); + } else if(length <= ushort.MaxValue) { + Stream.WriteByte((byte)(bLen1 | 126)); + Stream.Write(bLenBuff, 6, 2); + } else { + Stream.WriteByte((byte)(bLen1 | 127)); + Stream.Write(bLenBuff, 0, 8); + } + + if(shouldMask) + Stream.Write(mask, 0, MASK_SIZE); + } + + private long WriteFrameBody(ReadOnlySpan body, byte[] mask = null, long offset = 0) { + if(mask != null) { + byte[] masked = new byte[body.Length]; + + for(int i = 0; i < body.Length; ++i) + masked[i] = (byte)(body[i] ^ mask[offset++ % MASK_SIZE]); + + body = masked; + } + + Stream.Write(body); + + return offset; + } + + private long WriteFrameBody(Stream body, byte[] mask = null, long offset = 0) { + bool shouldMask = mask != null; + + int read; + byte[] buffer = new byte[BUFFER_SIZE]; + while((read = body.Read(buffer, 0, BUFFER_SIZE)) > 0) + offset = WriteFrameBody(buffer.AsSpan(0, read), mask, offset); + + return offset; + } + + private void WriteFrame(WsOpcode opcode, ReadOnlySpan body, bool isFinal) { + byte[] mask = GenerateMask(); + WriteFrameHeader(opcode, body.Length, isFinal, mask); + if(body.Length > 0) + WriteFrameBody(body, mask); + Stream.Flush(); + } + + private void Write(WsOpcode opcode, ReadOnlySpan body) { + if(body.Length > 0xFFFF) { + WriteFrame(opcode, body.Slice(0, 0xFFFF), false); + body = body.Slice(0xFFFF); + + while(body.Length > 0xFFFF) { + WriteFrame(WsOpcode.DataContinue, body.Slice(0, 0xFFFF), false); + body = body.Slice(0xFFFF); + } + + WriteFrame(WsOpcode.DataContinue, body, true); + } else + WriteFrame(opcode, body, true); + } + + private void Write(WsOpcode opcode, Stream stream) { + if(stream == null) + throw new ArgumentNullException(nameof(stream)); + if(!stream.CanRead) + throw new ArgumentException(@"Provided stream cannot be read.", nameof(stream)); + + int read; + byte[] buffer = new byte[BUFFER_SIZE]; + + while((read = stream.Read(buffer, 0, BUFFER_SIZE)) > 0) { + WriteFrame(opcode, buffer.AsSpan(0, read), false); + + if(opcode != WsOpcode.DataContinue) + opcode = WsOpcode.DataContinue; + } + + // this kinda fucking sucks + WriteFrame(WsOpcode.CtrlClose, ReadOnlySpan.Empty, true); + } + + private void Write(WsOpcode opcode, Stream stream, int length) { + if(stream == null) + throw new ArgumentNullException(nameof(stream)); + if(!stream.CanRead) + throw new ArgumentException(@"Provided stream cannot be read.", nameof(stream)); + + int read; + byte[] buffer = new byte[BUFFER_SIZE]; + + if(length > BUFFER_SIZE) { + int take = BUFFER_SIZE; + + while((read = stream.Read(buffer, 0, take)) > 0) { + WriteFrame(opcode, buffer.AsSpan(0, read), false); + + if(opcode != WsOpcode.DataContinue) + opcode = WsOpcode.DataContinue; + + length -= read; + if(take > length) + take = length; + } + + // feel like there'd be a better way to do this + // but i feel like assuming that any successful read with something + // still coming (read == BUFFER_SIZE) will bite me in the ass later somehow + WriteFrame(WsOpcode.CtrlClose, Span.Empty, true); + } else { + read = stream.Read(buffer, 0, BUFFER_SIZE); + if(read > 0) + WriteFrame(WsOpcode.DataBinary, buffer.AsSpan(0, read), true); + } + } + + public void Send(string text) + => Write(WsOpcode.DataText, Encoding.UTF8.GetBytes(text)); + + public void Send(ReadOnlySpan buffer) + => Write(WsOpcode.DataBinary, buffer); + + public void Send(Stream source) + => Write(WsOpcode.DataBinary, source); + + public void Send(Stream source, int count) + => Write(WsOpcode.DataBinary, source, count); + + private void WriteControlFrame(WsOpcode opcode) { + WriteFrameHeader(opcode, 0, true, GenerateMask()); + Stream.Flush(); + } + + private void WriteControlFrame(WsOpcode opcode, ReadOnlySpan buffer) { + if(buffer.Length > 125) + throw new ArgumentException(@"Data may not be more than 125 bytes.", nameof(buffer)); + + byte[] mask = GenerateMask(); + WriteFrameHeader(opcode, buffer.Length, true, mask); + WriteFrameBody(buffer, mask); + Stream.Flush(); + } + + public void Ping() + => WriteControlFrame(WsOpcode.CtrlPing); + + public void Ping(ReadOnlySpan buffer) + => WriteControlFrame(WsOpcode.CtrlPing, buffer); + + public void Pong() + => WriteControlFrame(WsOpcode.CtrlPong); + + public void Pong(ReadOnlySpan buffer) + => WriteControlFrame(WsOpcode.CtrlPong, buffer); + + public void CloseEmpty() { + if(IsClosed) + return; + IsClosed = true; + + WriteControlFrame(WsOpcode.CtrlClose); + } + + public void Close(ReadOnlySpan buffer) { + if(IsClosed) + return; + IsClosed = true; + + WriteControlFrame(WsOpcode.CtrlClose, buffer); + } + + public void Close(WsCloseReason code) + => Close(WsUtils.FromU16((ushort)code)); + + public void Close(WsCloseReason code, ReadOnlySpan reason) { + if(reason.Length > 123) + throw new ArgumentException(@"Reason may not be more than 123 bytes.", nameof(reason)); + + if(IsClosed) + return; + IsClosed = true; + + byte[] mask = GenerateMask(); + WriteFrameHeader(WsOpcode.CtrlClose, 2 + reason.Length, true, mask); + WriteFrameBody(WsUtils.FromU16((ushort)code), mask); + WriteFrameBody(reason, mask, 2); + Stream.Flush(); + } + + public void Close(WsCloseReason code, string reason) { + if(string.IsNullOrEmpty(reason)) { + Close(code); + return; + } + + int length = Encoding.UTF8.GetByteCount(reason); + if(length > 123) + throw new ArgumentException(@"Reason string may not exceed 123 bytes in length.", nameof(reason)); + + if(IsClosed) + return; + IsClosed = true; + + byte[] mask = GenerateMask(); + WriteFrameHeader(WsOpcode.CtrlClose, 2 + reason.Length, true, mask); + WriteFrameBody(WsUtils.FromU16((ushort)code), mask); + WriteFrameBody(Encoding.UTF8.GetBytes(reason), mask, 2); + Stream.Flush(); + } + + private bool IsDisposed; + + ~WsConnection() { + DoDispose(); + } + + public void Dispose() { + DoDispose(); + GC.SuppressFinalize(this); + } + + private void DoDispose() { + if(IsDisposed) + return; + IsDisposed = true; + + Stream.Dispose(); + } + } +} diff --git a/Hamakaze/WebSocket/WsException.cs b/Hamakaze/WebSocket/WsException.cs new file mode 100644 index 0000000..50f0f5a --- /dev/null +++ b/Hamakaze/WebSocket/WsException.cs @@ -0,0 +1,29 @@ +namespace Hamakaze.WebSocket { + public class WsException : HttpException { + public WsException(string message) : base(message) { } + } + + public class WsInvalidOpcodeException : WsException { + public WsInvalidOpcodeException(WsOpcode opcode) : base($@"An invalid WebSocket opcode was encountered: {opcode}.") { } + } + + public class WsUnsupportedOpcodeException : WsException { + public WsUnsupportedOpcodeException(WsOpcode opcode) : base($@"An unsupported WebSocket opcode was encountered: {opcode}.") { } + } + + public class WsInvalidFrameSizeException : WsException { + public WsInvalidFrameSizeException(long size) : base($@"WebSocket frame size is too large: {size} bytes.") { } + } + + public class WsUnexpectedContinueException : WsException { + public WsUnexpectedContinueException() : base(@"A WebSocket continue frame was issued but there is nothing to continue.") { } + } + + public class WsUnexpectedDataException : WsException { + public WsUnexpectedDataException() : base(@"A WebSocket data frame was issued while a fragmented frame is being constructed.") { } + } + + public class WsInvalidControlFrameException : WsException { + public WsInvalidControlFrameException(string variant) : base($@"An invalid WebSocket control frame was encountered: {variant}") { } + } +} diff --git a/Hamakaze/WebSocket/WsMessage.cs b/Hamakaze/WebSocket/WsMessage.cs new file mode 100644 index 0000000..ebb9344 --- /dev/null +++ b/Hamakaze/WebSocket/WsMessage.cs @@ -0,0 +1,5 @@ +namespace Hamakaze.WebSocket { + public abstract class WsMessage { + // nothing, lol + } +} diff --git a/Hamakaze/WebSocket/WsOpcode.cs b/Hamakaze/WebSocket/WsOpcode.cs new file mode 100644 index 0000000..4491160 --- /dev/null +++ b/Hamakaze/WebSocket/WsOpcode.cs @@ -0,0 +1,13 @@ +namespace Hamakaze.WebSocket { + public enum WsOpcode : byte { + DataContinue = 0x00, + DataText = 0x01, + DataBinary = 0x02, + + CtrlClose = 0x08, + CtrlPing = 0x09, + CtrlPong = 0x0A, + + FlagFinal = 0x80, + } +} diff --git a/Hamakaze/WebSocket/WsPingMessage.cs b/Hamakaze/WebSocket/WsPingMessage.cs new file mode 100644 index 0000000..066d199 --- /dev/null +++ b/Hamakaze/WebSocket/WsPingMessage.cs @@ -0,0 +1,11 @@ +using System; + +namespace Hamakaze.WebSocket { + public class WsPingMessage : WsMessage { + public byte[] Data { get; } + + public WsPingMessage(byte[] data) { + Data = data ?? Array.Empty(); + } + } +} diff --git a/Hamakaze/WebSocket/WsPongMessage.cs b/Hamakaze/WebSocket/WsPongMessage.cs new file mode 100644 index 0000000..54d44bd --- /dev/null +++ b/Hamakaze/WebSocket/WsPongMessage.cs @@ -0,0 +1,11 @@ +using System; + +namespace Hamakaze.WebSocket { + public class WsPongMessage : WsMessage { + public byte[] Data { get; } + + public WsPongMessage(byte[] data) { + Data = data ?? Array.Empty(); + } + } +} diff --git a/Hamakaze/WebSocket/WsTextMessage.cs b/Hamakaze/WebSocket/WsTextMessage.cs new file mode 100644 index 0000000..fb41d76 --- /dev/null +++ b/Hamakaze/WebSocket/WsTextMessage.cs @@ -0,0 +1,14 @@ +using System.Text; + +namespace Hamakaze.WebSocket { + public class WsTextMessage : WsMessage { + public string Text { get; } + + public WsTextMessage(byte[] data) { + if(data?.Length > 0) + Text = Encoding.UTF8.GetString(data); + else + Text = string.Empty; + } + } +} diff --git a/Hamakaze/WebSocket/WsUtils.cs b/Hamakaze/WebSocket/WsUtils.cs new file mode 100644 index 0000000..ce2b319 --- /dev/null +++ b/Hamakaze/WebSocket/WsUtils.cs @@ -0,0 +1,38 @@ +using System; + +namespace Hamakaze.WebSocket { + internal static class WsUtils { + public static byte[] FromU16(ushort num) { + byte[] buff = BitConverter.GetBytes(num); + if(BitConverter.IsLittleEndian) + Array.Reverse(buff); + return buff; + } + + public static ushort ToU16(ReadOnlySpan buffer) { + if(BitConverter.IsLittleEndian) + buffer = new byte[2] { + buffer[1], buffer[0], + }; + + return BitConverter.ToUInt16(buffer); + } + + public static byte[] FromI64(long num) { + byte[] buff = BitConverter.GetBytes(num); + if(BitConverter.IsLittleEndian) + Array.Reverse(buff); + return buff; + } + + public static long ToI64(ReadOnlySpan buffer) { + if(BitConverter.IsLittleEndian) + buffer = new byte[8] { + buffer[7], buffer[6], buffer[5], buffer[4], + buffer[3], buffer[2], buffer[1], buffer[0], + }; + + return BitConverter.ToInt64(buffer); + } + } +} From 79591f27b7877cd4aa1e3dd4a215a90d0a48e540 Mon Sep 17 00:00:00 2001 From: flashwave Date: Sun, 10 Apr 2022 00:42:53 +0200 Subject: [PATCH 4/6] Fix chunked transfers for real this time. --- Hamakaze/HttpResponseMessage.cs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Hamakaze/HttpResponseMessage.cs b/Hamakaze/HttpResponseMessage.cs index b2d7abb..a7bfb95 100644 --- a/Hamakaze/HttpResponseMessage.cs +++ b/Hamakaze/HttpResponseMessage.cs @@ -151,7 +151,7 @@ namespace Hamakaze { if(line == null) throw new IOException(@"Failed to read initial HTTP header."); if(!line.StartsWith(@"HTTP/")) - throw new IOException(@"Response is not a valid HTTP message."); + throw new IOException($@"Response is not a valid HTTP message: {line}."); string[] parts = line[5..].Split(' ', 3); if(!int.TryParse(parts.ElementAtOrDefault(1), out int statusCode)) throw new IOException(@"Invalid HTTP status code format."); @@ -238,6 +238,8 @@ namespace Hamakaze { readBuffer(chunkLength); readLine(); } + + readLine(); } else if(contentLength != 0) { body = new MemoryStream(); readBuffer(contentLength); From 0e1fb36721655a23d6242b8a488a77d9f76d0f3f Mon Sep 17 00:00:00 2001 From: flashwave Date: Sun, 10 Apr 2022 18:06:56 +0200 Subject: [PATCH 5/6] Pretty much complete websocket implementation. --- Hamakaze/HttpClient.cs | 131 ++++++++++-- Hamakaze/WebSocket/IHasBinaryData.cs | 5 + Hamakaze/WebSocket/WsBinaryMessage.cs | 2 +- Hamakaze/WebSocket/WsBufferedSend.cs | 7 +- Hamakaze/WebSocket/WsClient.cs | 198 ++++++++++++++++-- Hamakaze/WebSocket/WsCloseMessage.cs | 2 +- Hamakaze/WebSocket/WsConnection.cs | 283 ++++++++++---------------- Hamakaze/WebSocket/WsException.cs | 12 ++ Hamakaze/WebSocket/WsPingMessage.cs | 2 +- Hamakaze/WebSocket/WsPongMessage.cs | 2 +- Hamakaze/WebSocket/WsTextMessage.cs | 6 + 11 files changed, 434 insertions(+), 216 deletions(-) create mode 100644 Hamakaze/WebSocket/IHasBinaryData.cs diff --git a/Hamakaze/HttpClient.cs b/Hamakaze/HttpClient.cs index 24f05d3..5d694a9 100644 --- a/Hamakaze/HttpClient.cs +++ b/Hamakaze/HttpClient.cs @@ -99,38 +99,83 @@ namespace Hamakaze { Action onOpen, Action onMessage, Action onError, - IEnumerable protocols = null - ) { - CreateWsConnection( + IEnumerable protocols = null, + Action onResponse = null, + bool disposeRequest = true, + bool disposeResponse = true + ) => CreateWsConnection( url, conn => onOpen(new WsClient(conn, onMessage, onError)), onError, - protocols + protocols, + onResponse, + disposeRequest, + disposeResponse + ); + + public void CreateWsClient( + HttpRequestMessage request, + Action onOpen, + Action onMessage, + Action onError, + IEnumerable protocols = null, + Action onResponse = null, + bool disposeRequest = true, + bool disposeResponse = true + ) => CreateWsConnection( + request, + conn => onOpen(new WsClient(conn, onMessage, onError)), + onError, + protocols, + onResponse, + disposeRequest, + disposeResponse ); - } public void CreateWsConnection( string url, Action onOpen, Action onError, - IEnumerable protocols = null + IEnumerable protocols = null, + Action onResponse = null, + bool disposeRequest = true, + bool disposeResponse = true + ) => CreateWsConnection( + new HttpRequestMessage(@"GET", url), + onOpen, + onError, + protocols, + onResponse, + disposeRequest, + disposeResponse + ); + + public void CreateWsConnection( + HttpRequestMessage request, + Action onOpen, + Action onError, + IEnumerable protocols = null, + Action onResponse = null, + bool disposeRequest = true, + bool disposeResponse = true ) { string key = Convert.ToBase64String(RandomNumberGenerator.GetBytes(WS_RNG)); - HttpRequestMessage req = new HttpRequestMessage(@"GET", url); - req.Connection = HttpConnectionHeader.UPGRADE; - req.SetHeader(@"Cache-Control", @"no-cache"); - req.SetHeader(@"Upgrade", WS_PROTO); - req.SetHeader(@"Sec-WebSocket-Key", key); - req.SetHeader(@"Sec-WebSocket-Version", @"13"); + request.Connection = HttpConnectionHeader.UPGRADE; + request.SetHeader(@"Cache-Control", @"no-cache"); + request.SetHeader(@"Upgrade", WS_PROTO); + request.SetHeader(@"Sec-WebSocket-Key", key); + request.SetHeader(@"Sec-WebSocket-Version", @"13"); if(protocols?.Any() == true) - req.SetHeader(@"Sec-WebSocket-Protocol", string.Join(@", ", protocols)); + request.SetHeader(@"Sec-WebSocket-Protocol", string.Join(@", ", protocols)); SendRequest( - req, + request, (t, res) => { try { + onResponse?.Invoke(res); + if(res.ProtocolVersion.CompareTo(@"1.1") < 0) throw new HttpUpgradeProtocolVersionException(@"1.1", res.ProtocolVersion); @@ -168,7 +213,9 @@ namespace Hamakaze { onError(ex); } }, - (t, ex) => onError(ex) + (t, ex) => onError(ex), + disposeRequest: disposeRequest, + disposeResponse: disposeResponse ); } @@ -182,9 +229,57 @@ namespace Hamakaze { Action onStateChange = null, bool disposeRequest = true, bool disposeResponse = true - ) { - Instance.SendRequest(request, onComplete, onError, onCancel, onDownloadProgress, onUploadProgress, onStateChange, disposeRequest, disposeResponse); - } + ) => Instance.SendRequest( + request, + onComplete, + onError, + onCancel, + onDownloadProgress, + onUploadProgress, + onStateChange, + disposeRequest, + disposeResponse + ); + + public static void Connect( + string url, + Action onOpen, + Action onMessage, + Action onError, + IEnumerable protocols = null, + Action onResponse = null, + bool disposeRequest = true, + bool disposeResponse = true + ) => Instance.CreateWsClient( + url, + onOpen, + onMessage, + onError, + protocols, + onResponse, + disposeRequest, + disposeResponse + ); + + public static void Connect( + HttpRequestMessage request, + Action onOpen, + Action onMessage, + Action onError, + IEnumerable protocols = null, + Action onResponse = null, + bool disposeRequest = true, + bool disposeResponse = true + ) => Instance.CreateWsClient( + request, + onOpen, + onMessage, + onError, + protocols, + onResponse, + disposeRequest, + disposeResponse + ); private bool IsDisposed; ~HttpClient() diff --git a/Hamakaze/WebSocket/IHasBinaryData.cs b/Hamakaze/WebSocket/IHasBinaryData.cs new file mode 100644 index 0000000..bd994b7 --- /dev/null +++ b/Hamakaze/WebSocket/IHasBinaryData.cs @@ -0,0 +1,5 @@ +namespace Hamakaze.WebSocket { + public interface IHasBinaryData { + byte[] Data { get; } + } +} diff --git a/Hamakaze/WebSocket/WsBinaryMessage.cs b/Hamakaze/WebSocket/WsBinaryMessage.cs index 172ba4f..a0be483 100644 --- a/Hamakaze/WebSocket/WsBinaryMessage.cs +++ b/Hamakaze/WebSocket/WsBinaryMessage.cs @@ -1,7 +1,7 @@ using System; namespace Hamakaze.WebSocket { - public class WsBinaryMessage : WsMessage { + public class WsBinaryMessage : WsMessage, IHasBinaryData { public byte[] Data { get; } public WsBinaryMessage(byte[] data) { diff --git a/Hamakaze/WebSocket/WsBufferedSend.cs b/Hamakaze/WebSocket/WsBufferedSend.cs index b5e7c2d..9c628be 100644 --- a/Hamakaze/WebSocket/WsBufferedSend.cs +++ b/Hamakaze/WebSocket/WsBufferedSend.cs @@ -8,7 +8,11 @@ namespace Hamakaze.WebSocket { Connection = conn ?? throw new ArgumentNullException(nameof(conn)); } - // + public void SendPart(ReadOnlySpan data) + => Connection.WriteFrame(WsOpcode.DataBinary, data, false); + + public void SendFinalPart(ReadOnlySpan data) + => Connection.WriteFrame(WsOpcode.DataBinary, data, true); private bool IsDisposed; @@ -26,6 +30,7 @@ namespace Hamakaze.WebSocket { return; IsDisposed = true; + Connection.EndBufferedSend(); } } } diff --git a/Hamakaze/WebSocket/WsClient.cs b/Hamakaze/WebSocket/WsClient.cs index 5bb788e..d626c71 100644 --- a/Hamakaze/WebSocket/WsClient.cs +++ b/Hamakaze/WebSocket/WsClient.cs @@ -1,6 +1,8 @@ using System; using System.Threading; +// todo: sending errors as fake close messages + namespace Hamakaze.WebSocket { public class WsClient : IDisposable { public WsConnection Connection { get; } @@ -11,6 +13,7 @@ namespace Hamakaze.WebSocket { private Action ExceptionHandler { get; } private Mutex SendLock { get; } + private const int TIMEOUT = 60000; public WsClient( WsConnection connection, @@ -38,81 +41,246 @@ namespace Hamakaze.WebSocket { } public void Send(string text) { - Connection.Send(text); + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Send(text); + } finally { + SendLock.ReleaseMutex(); + } } public void Send(object obj) { if(obj == null) throw new ArgumentNullException(nameof(obj)); - Connection.Send(obj.ToString()); + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Send(obj.ToString()); + } finally { + SendLock.ReleaseMutex(); + } } public void Send(ReadOnlySpan data) { - Connection.Send(data); + if(data == null) + throw new ArgumentNullException(nameof(data)); + + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Send(data); + } finally { + SendLock.ReleaseMutex(); + } } public void Send(byte[] buffer, int offset, int count) { if(buffer == null) throw new ArgumentNullException(nameof(buffer)); - Connection.Send(buffer.AsSpan(offset, count)); + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Send(buffer.AsSpan(offset, count)); + } finally { + SendLock.ReleaseMutex(); + } + } + + public void Send(Action handler) { + if(handler == null) + throw new ArgumentNullException(nameof(handler)); + + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + using(WsBufferedSend bs = Connection.BeginBufferedSend()) + handler(bs); + } finally { + SendLock.ReleaseMutex(); + } } public void Ping() { - Connection.Ping(); + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Ping(); + } finally { + SendLock.ReleaseMutex(); + } } public void Ping(ReadOnlySpan data) { - Connection.Ping(data); + if(data == null) + throw new ArgumentNullException(nameof(data)); + + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Ping(data); + } finally { + SendLock.ReleaseMutex(); + } } public void Ping(byte[] buffer, int offset, int length) { if(buffer == null) throw new ArgumentNullException(nameof(buffer)); - Connection.Ping(buffer.AsSpan(offset, length)); + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Ping(buffer.AsSpan(offset, length)); + } finally { + SendLock.ReleaseMutex(); + } } public void Pong() { - Connection.Pong(); + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Pong(); + } finally { + SendLock.ReleaseMutex(); + } } public void Pong(ReadOnlySpan data) { - Connection.Pong(data); + if(data == null) + throw new ArgumentNullException(nameof(data)); + + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Pong(data); + } finally { + SendLock.ReleaseMutex(); + } } public void Pong(byte[] buffer, int offset, int length) { if(buffer == null) throw new ArgumentNullException(nameof(buffer)); - Pong(buffer.AsSpan(offset, length)); + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Pong(buffer.AsSpan(offset, length)); + } finally { + SendLock.ReleaseMutex(); + } } public void Close() { - Connection.Close(WsCloseReason.NormalClosure); + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Close(WsCloseReason.NormalClosure); + } finally { + SendLock.ReleaseMutex(); + } } public void CloseEmpty() { - Connection.CloseEmpty(); + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.CloseEmpty(); + } finally { + SendLock.ReleaseMutex(); + } + } + + public void Close(WsCloseReason opcode) { + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Close(opcode); + } finally { + SendLock.ReleaseMutex(); + } } public void Close(string reason) { - Connection.Close(WsCloseReason.NormalClosure, reason); + if(reason == null) + throw new ArgumentNullException(nameof(reason)); + + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Close(WsCloseReason.NormalClosure, reason); + } finally { + SendLock.ReleaseMutex(); + } + } + + public void Close(WsCloseReason opcode, string reason) { + if(reason == null) + throw new ArgumentNullException(nameof(reason)); + + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Close(opcode, reason); + } finally { + SendLock.ReleaseMutex(); + } + } + + public void Close(ReadOnlySpan data) { + if(data == null) + throw new ArgumentNullException(nameof(data)); + + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Close(data); + } finally { + SendLock.ReleaseMutex(); + } } public void Close(byte[] buffer, int offset, int length) { if(buffer == null) throw new ArgumentNullException(nameof(buffer)); - Connection.Close(buffer.AsSpan(offset, length)); + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Close(buffer.AsSpan(offset, length)); + } finally { + SendLock.ReleaseMutex(); + } + } + + public void Close(WsCloseReason opcode, ReadOnlySpan data) { + if(data == null) + throw new ArgumentNullException(nameof(data)); + + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Close(opcode, data); + } finally { + SendLock.ReleaseMutex(); + } } public void Close(WsCloseReason code, byte[] buffer, int offset, int length) { if(buffer == null) throw new ArgumentNullException(nameof(buffer)); - Connection.Close(code, buffer.AsSpan(offset, length)); + try { + if(!SendLock.WaitOne(TIMEOUT)) + throw new WsClientMutexFailedException(); + Connection.Close(code, buffer.AsSpan(offset, length)); + } finally { + SendLock.ReleaseMutex(); + } } private bool IsDisposed; diff --git a/Hamakaze/WebSocket/WsCloseMessage.cs b/Hamakaze/WebSocket/WsCloseMessage.cs index e0a584a..9989046 100644 --- a/Hamakaze/WebSocket/WsCloseMessage.cs +++ b/Hamakaze/WebSocket/WsCloseMessage.cs @@ -2,7 +2,7 @@ using System; using System.Text; namespace Hamakaze.WebSocket { - public class WsCloseMessage : WsMessage { + public class WsCloseMessage : WsMessage, IHasBinaryData { public WsCloseReason Reason { get; } public string ReasonPhrase { get; } public byte[] Data { get; } diff --git a/Hamakaze/WebSocket/WsConnection.cs b/Hamakaze/WebSocket/WsConnection.cs index 164dc13..a1f42c3 100644 --- a/Hamakaze/WebSocket/WsConnection.cs +++ b/Hamakaze/WebSocket/WsConnection.cs @@ -4,17 +4,6 @@ using System.Net.Security; using System.Security.Cryptography; using System.Text; -// TODO: optimisations with newer .net feature to reduce memory copying -// i think we're generally aware of how much data we're shoving around -// so memorystream can be considered overkill - -// Should there be internal mutexing on the socket? (leaning towards no) - -// Should all external stream handling be moved to WsClient? -// - IDEA: Buffered send "session" class. -// Would require exposing the raw Write methods -// but i suppose that's what "internal" exists for - namespace Hamakaze.WebSocket { public class WsConnection : IDisposable { public Stream Stream { get; } @@ -22,12 +11,13 @@ namespace Hamakaze.WebSocket { public bool IsSecure { get; } public bool IsClosed { get; private set; } - private const int BUFFER_SIZE = 0x2000; private const byte MASK_FLAG = 0x80; private const int MASK_SIZE = 4; - private WsOpcode FragmentedType = 0; - private MemoryStream FragmentedStream; + private WsOpcode FragmentType = 0; + private MemoryStream FragmentStream; + + private WsBufferedSend BufferedSend; public WsConnection(Stream stream) { Stream = stream ?? throw new ArgumentNullException(nameof(stream)); @@ -44,7 +34,7 @@ namespace Hamakaze.WebSocket { throw new Exception(@"Was unable to read the requested amount of data."); } - private (WsOpcode opcode, long length, bool isFinal, byte[] mask) ReadFrameHeader() { + private (WsOpcode opcode, int length, bool isFinal, byte[] mask) ReadFrameHeader() { byte[] buffer = new byte[8]; StrictRead(buffer, 0, 2); @@ -81,7 +71,11 @@ namespace Hamakaze.WebSocket { // i seriously don't understand the rationale behind both // having a framing system but then also supporting frame lengths // of 2^63, feels like 2^16 per frame would be a fine max. - if(length < 0 || length > long.MaxValue) + // UPDATE: decided to put the max at 2^32-1 + // it's still more than you should ever need for a single frame + // and it makes working with the number within a .NET context + // less of a bother. + if(length < 0 || length > int.MaxValue) throw new WsInvalidFrameSizeException(length); byte[] mask = null; @@ -91,32 +85,30 @@ namespace Hamakaze.WebSocket { mask = buffer; } - return (opcode, length, isFinal, mask); + return (opcode, (int)length, isFinal, mask); } - private long ReadFrameBody(Stream target, long length, byte[] mask, long offset = 0) { + private int ReadFrameBody(byte[] target, int length, byte[] mask, int offset = 0) { if(target == null) throw new ArgumentNullException(nameof(target)); - if(!target.CanWrite) - throw new ArgumentException(@"Target stream is not writable.", nameof(target)); bool isMasked = mask != null; int read; - int take = length > BUFFER_SIZE ? BUFFER_SIZE : (int)length; - byte[] buffer = new byte[take]; + const int bufferSize = 0x1000; + int take = length > bufferSize ? bufferSize : (int)length; while(length > 0) { - read = Stream.Read(buffer, 0, take); + read = Stream.Read(target, offset, take); if(isMasked) - for(int i = 0; i < read; ++i) - buffer[i] ^= mask[offset++ % MASK_SIZE]; + for(int i = 0; i < read; ++i) { + int o = offset + i; + target[o] ^= mask[o % MASK_SIZE]; + } - target.Write(buffer, 0, read); - - offset += read; length -= read; + offset += read; if(take > length) take = (int)length; @@ -126,7 +118,7 @@ namespace Hamakaze.WebSocket { } private WsMessage ReadFrame() { - (WsOpcode opcode, long length, bool isFinal, byte[] mask) = ReadFrameHeader(); + (WsOpcode opcode, int length, bool isFinal, byte[] mask) = ReadFrameHeader(); if(opcode is not WsOpcode.DataContinue and not WsOpcode.DataBinary @@ -140,77 +132,55 @@ namespace Hamakaze.WebSocket { bool isContinue = opcode == WsOpcode.DataContinue; bool canFragment = (opcode & WsOpcode.CtrlClose) == 0; - MemoryStream bodyStream = null; + byte[] body = length < 1 ? null : new byte[length]; if(hasBody) { + ReadFrameBody(body, length, mask); + if(canFragment) { if(isContinue) { - if(FragmentedType == 0) + if(FragmentType == 0) throw new WsUnexpectedContinueException(); - opcode = FragmentedType; + opcode = FragmentType; - if(FragmentedStream == null) - FragmentedStream = bodyStream = new(); - else - bodyStream = FragmentedStream; + FragmentStream ??= new(); + FragmentStream.Write(body, 0, length); } else { - if(FragmentedType != 0) + if(FragmentType != 0) throw new WsUnexpectedDataException(); - if(isFinal) - bodyStream = new(); - else { - FragmentedType = opcode; - FragmentedStream = bodyStream = new(); + if(!isFinal) { + FragmentType = opcode; + FragmentStream = new(); + FragmentStream.Write(body, 0, length); } } - } else - bodyStream = new(); - - ReadFrameBody(bodyStream, length, mask); + } } WsMessage msg; if(isFinal) { if(canFragment && isContinue) { - FragmentedType = 0; - FragmentedStream = null; + FragmentType = 0; + + body = FragmentStream.ToArray(); + FragmentStream.Dispose(); + FragmentStream = null; } - byte[] body = null; + msg = opcode switch { + WsOpcode.DataText => new WsTextMessage(body), + WsOpcode.DataBinary => new WsBinaryMessage(body), - if(bodyStream != null) { - if(bodyStream.Length > 0) - body = bodyStream.ToArray(); - bodyStream.Dispose(); - } + WsOpcode.CtrlClose => new WsCloseMessage(body), + WsOpcode.CtrlPing => new WsPingMessage(body), + WsOpcode.CtrlPong => new WsPongMessage(body), - switch(opcode) { - case WsOpcode.DataText: - msg = new WsTextMessage(body); - break; - - case WsOpcode.DataBinary: - msg = new WsBinaryMessage(body); - break; - - case WsOpcode.CtrlClose: - msg = new WsCloseMessage(body); - break; - - case WsOpcode.CtrlPing: - msg = new WsPingMessage(body); - break; - - case WsOpcode.CtrlPong: - msg = new WsPongMessage(body); - break; - - default: // fallback, if we end up here something is very fucked - throw new WsUnsupportedOpcodeException(opcode); - } + // fallback, if we end up here something is very fucked + _ => throw new WsUnsupportedOpcodeException(opcode), + }; } else msg = null; return msg; @@ -222,7 +192,10 @@ namespace Hamakaze.WebSocket { return msg; } - private void WriteFrameHeader(WsOpcode opcode, long length, bool isFinal, byte[] mask = null) { + private void WriteFrameHeader(WsOpcode opcode, int length, bool isFinal, byte[] mask = null) { + if(length < 0 || length > int.MaxValue) + throw new WsInvalidFrameSizeException(length); + bool shouldMask = mask != null; if(isFinal) @@ -247,9 +220,13 @@ namespace Hamakaze.WebSocket { if(shouldMask) Stream.Write(mask, 0, MASK_SIZE); + Stream.Flush(); } - private long WriteFrameBody(ReadOnlySpan body, byte[] mask = null, long offset = 0) { + private int WriteFrameBody(ReadOnlySpan body, byte[] mask = null, int offset = 0) { + if(body == null) + throw new ArgumentNullException(nameof(body)); + if(mask != null) { byte[] masked = new byte[body.Length]; @@ -260,37 +237,34 @@ namespace Hamakaze.WebSocket { } Stream.Write(body); + Stream.Flush(); return offset; } - private long WriteFrameBody(Stream body, byte[] mask = null, long offset = 0) { - bool shouldMask = mask != null; + internal void WriteFrame(WsOpcode opcode, ReadOnlySpan body, bool isFinal) { + if(body == null) + throw new ArgumentNullException(nameof(body)); - int read; - byte[] buffer = new byte[BUFFER_SIZE]; - while((read = body.Read(buffer, 0, BUFFER_SIZE)) > 0) - offset = WriteFrameBody(buffer.AsSpan(0, read), mask, offset); - - return offset; - } - - private void WriteFrame(WsOpcode opcode, ReadOnlySpan body, bool isFinal) { byte[] mask = GenerateMask(); WriteFrameHeader(opcode, body.Length, isFinal, mask); if(body.Length > 0) WriteFrameBody(body, mask); - Stream.Flush(); } - private void Write(WsOpcode opcode, ReadOnlySpan body) { - if(body.Length > 0xFFFF) { - WriteFrame(opcode, body.Slice(0, 0xFFFF), false); - body = body.Slice(0xFFFF); + private void WriteData(WsOpcode opcode, ReadOnlySpan body) { + if(body == null) + throw new ArgumentNullException(nameof(body)); + if(BufferedSend != null) + throw new WsBufferedSendInSessionException(); - while(body.Length > 0xFFFF) { - WriteFrame(WsOpcode.DataContinue, body.Slice(0, 0xFFFF), false); - body = body.Slice(0xFFFF); + if(body.Length > ushort.MaxValue) { + WriteFrame(opcode, body.Slice(0, ushort.MaxValue), false); + body = body.Slice(ushort.MaxValue); + + while(body.Length > ushort.MaxValue) { + WriteFrame(WsOpcode.DataContinue, body.Slice(0, ushort.MaxValue), false); + body = body.Slice(ushort.MaxValue); } WriteFrame(WsOpcode.DataContinue, body, true); @@ -298,119 +272,74 @@ namespace Hamakaze.WebSocket { WriteFrame(opcode, body, true); } - private void Write(WsOpcode opcode, Stream stream) { - if(stream == null) - throw new ArgumentNullException(nameof(stream)); - if(!stream.CanRead) - throw new ArgumentException(@"Provided stream cannot be read.", nameof(stream)); - - int read; - byte[] buffer = new byte[BUFFER_SIZE]; - - while((read = stream.Read(buffer, 0, BUFFER_SIZE)) > 0) { - WriteFrame(opcode, buffer.AsSpan(0, read), false); - - if(opcode != WsOpcode.DataContinue) - opcode = WsOpcode.DataContinue; - } - - // this kinda fucking sucks - WriteFrame(WsOpcode.CtrlClose, ReadOnlySpan.Empty, true); - } - - private void Write(WsOpcode opcode, Stream stream, int length) { - if(stream == null) - throw new ArgumentNullException(nameof(stream)); - if(!stream.CanRead) - throw new ArgumentException(@"Provided stream cannot be read.", nameof(stream)); - - int read; - byte[] buffer = new byte[BUFFER_SIZE]; - - if(length > BUFFER_SIZE) { - int take = BUFFER_SIZE; - - while((read = stream.Read(buffer, 0, take)) > 0) { - WriteFrame(opcode, buffer.AsSpan(0, read), false); - - if(opcode != WsOpcode.DataContinue) - opcode = WsOpcode.DataContinue; - - length -= read; - if(take > length) - take = length; - } - - // feel like there'd be a better way to do this - // but i feel like assuming that any successful read with something - // still coming (read == BUFFER_SIZE) will bite me in the ass later somehow - WriteFrame(WsOpcode.CtrlClose, Span.Empty, true); - } else { - read = stream.Read(buffer, 0, BUFFER_SIZE); - if(read > 0) - WriteFrame(WsOpcode.DataBinary, buffer.AsSpan(0, read), true); - } - } - public void Send(string text) - => Write(WsOpcode.DataText, Encoding.UTF8.GetBytes(text)); + => WriteData(WsOpcode.DataText, Encoding.UTF8.GetBytes(text)); public void Send(ReadOnlySpan buffer) - => Write(WsOpcode.DataBinary, buffer); + => WriteData(WsOpcode.DataBinary, buffer); - public void Send(Stream source) - => Write(WsOpcode.DataBinary, source); - - public void Send(Stream source, int count) - => Write(WsOpcode.DataBinary, source, count); - - private void WriteControlFrame(WsOpcode opcode) { - WriteFrameHeader(opcode, 0, true, GenerateMask()); - Stream.Flush(); + public WsBufferedSend BeginBufferedSend() { + if(BufferedSend != null) + throw new WsBufferedSendAlreadyActiveException(); + return BufferedSend = new(this); } - private void WriteControlFrame(WsOpcode opcode, ReadOnlySpan buffer) { + // this method should only be called from within WsBufferedSend.Dispose + internal void EndBufferedSend() { + BufferedSend = null; + } + + private void WriteControl(WsOpcode opcode) + => WriteFrameHeader(opcode, 0, true, GenerateMask()); + + private void WriteControl(WsOpcode opcode, ReadOnlySpan buffer) { + if(buffer == null) + throw new ArgumentNullException(nameof(buffer)); if(buffer.Length > 125) throw new ArgumentException(@"Data may not be more than 125 bytes.", nameof(buffer)); byte[] mask = GenerateMask(); WriteFrameHeader(opcode, buffer.Length, true, mask); WriteFrameBody(buffer, mask); - Stream.Flush(); } public void Ping() - => WriteControlFrame(WsOpcode.CtrlPing); + => WriteControl(WsOpcode.CtrlPing); public void Ping(ReadOnlySpan buffer) - => WriteControlFrame(WsOpcode.CtrlPing, buffer); + => WriteControl(WsOpcode.CtrlPing, buffer); public void Pong() - => WriteControlFrame(WsOpcode.CtrlPong); + => WriteControl(WsOpcode.CtrlPong); public void Pong(ReadOnlySpan buffer) - => WriteControlFrame(WsOpcode.CtrlPong, buffer); + => WriteControl(WsOpcode.CtrlPong, buffer); public void CloseEmpty() { if(IsClosed) return; IsClosed = true; - WriteControlFrame(WsOpcode.CtrlClose); + WriteControl(WsOpcode.CtrlClose); } public void Close(ReadOnlySpan buffer) { + if(buffer == null) + throw new ArgumentNullException(nameof(buffer)); + if(IsClosed) return; IsClosed = true; - WriteControlFrame(WsOpcode.CtrlClose, buffer); + WriteControl(WsOpcode.CtrlClose, buffer); } public void Close(WsCloseReason code) => Close(WsUtils.FromU16((ushort)code)); public void Close(WsCloseReason code, ReadOnlySpan reason) { + if(reason == null) + throw new ArgumentNullException(nameof(reason)); if(reason.Length > 123) throw new ArgumentException(@"Reason may not be more than 123 bytes.", nameof(reason)); @@ -422,14 +351,11 @@ namespace Hamakaze.WebSocket { WriteFrameHeader(WsOpcode.CtrlClose, 2 + reason.Length, true, mask); WriteFrameBody(WsUtils.FromU16((ushort)code), mask); WriteFrameBody(reason, mask, 2); - Stream.Flush(); } public void Close(WsCloseReason code, string reason) { - if(string.IsNullOrEmpty(reason)) { - Close(code); - return; - } + if(reason == null) + throw new ArgumentNullException(nameof(reason)); int length = Encoding.UTF8.GetByteCount(reason); if(length > 123) @@ -443,7 +369,6 @@ namespace Hamakaze.WebSocket { WriteFrameHeader(WsOpcode.CtrlClose, 2 + reason.Length, true, mask); WriteFrameBody(WsUtils.FromU16((ushort)code), mask); WriteFrameBody(Encoding.UTF8.GetBytes(reason), mask, 2); - Stream.Flush(); } private bool IsDisposed; @@ -462,6 +387,8 @@ namespace Hamakaze.WebSocket { return; IsDisposed = true; + BufferedSend?.Dispose(); + FragmentStream?.Dispose(); Stream.Dispose(); } } diff --git a/Hamakaze/WebSocket/WsException.cs b/Hamakaze/WebSocket/WsException.cs index 50f0f5a..fc17bf6 100644 --- a/Hamakaze/WebSocket/WsException.cs +++ b/Hamakaze/WebSocket/WsException.cs @@ -26,4 +26,16 @@ namespace Hamakaze.WebSocket { public class WsInvalidControlFrameException : WsException { public WsInvalidControlFrameException(string variant) : base($@"An invalid WebSocket control frame was encountered: {variant}") { } } + + public class WsClientMutexFailedException : WsException { + public WsClientMutexFailedException() : base(@"Failed to acquire send mutex.") { } + } + + public class WsBufferedSendAlreadyActiveException : WsException { + public WsBufferedSendAlreadyActiveException() : base(@"A buffered websocket send is already in session.") { } + } + + public class WsBufferedSendInSessionException : WsException { + public WsBufferedSendInSessionException() : base(@"Cannot send data while a buffered send is in session.") { } + } } diff --git a/Hamakaze/WebSocket/WsPingMessage.cs b/Hamakaze/WebSocket/WsPingMessage.cs index 066d199..15548d1 100644 --- a/Hamakaze/WebSocket/WsPingMessage.cs +++ b/Hamakaze/WebSocket/WsPingMessage.cs @@ -1,7 +1,7 @@ using System; namespace Hamakaze.WebSocket { - public class WsPingMessage : WsMessage { + public class WsPingMessage : WsMessage, IHasBinaryData { public byte[] Data { get; } public WsPingMessage(byte[] data) { diff --git a/Hamakaze/WebSocket/WsPongMessage.cs b/Hamakaze/WebSocket/WsPongMessage.cs index 54d44bd..96218e7 100644 --- a/Hamakaze/WebSocket/WsPongMessage.cs +++ b/Hamakaze/WebSocket/WsPongMessage.cs @@ -1,7 +1,7 @@ using System; namespace Hamakaze.WebSocket { - public class WsPongMessage : WsMessage { + public class WsPongMessage : WsMessage, IHasBinaryData { public byte[] Data { get; } public WsPongMessage(byte[] data) { diff --git a/Hamakaze/WebSocket/WsTextMessage.cs b/Hamakaze/WebSocket/WsTextMessage.cs index fb41d76..13b050d 100644 --- a/Hamakaze/WebSocket/WsTextMessage.cs +++ b/Hamakaze/WebSocket/WsTextMessage.cs @@ -10,5 +10,11 @@ namespace Hamakaze.WebSocket { else Text = string.Empty; } + + public static implicit operator string(WsTextMessage msg) => msg.Text; + + public override string ToString() { + return Text; + } } } From ad80ef21c87a37337f907364355a62bff13a2022 Mon Sep 17 00:00:00 2001 From: flashwave Date: Sun, 10 Apr 2022 18:49:40 +0200 Subject: [PATCH 6/6] Fixed read timeout issue. --- Hamakaze/HttpConnection.cs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/Hamakaze/HttpConnection.cs b/Hamakaze/HttpConnection.cs index 509a6b6..c7eda62 100644 --- a/Hamakaze/HttpConnection.cs +++ b/Hamakaze/HttpConnection.cs @@ -13,6 +13,9 @@ namespace Hamakaze { private Socket Socket { get; } + private NetworkStream NetworkStream { get; } + private SslStream SslStream { get; } + public string Host { get; } public bool IsSecure { get; } @@ -39,19 +42,19 @@ namespace Hamakaze { }; Socket.Connect(endPoint); - Stream stream = new NetworkStream(Socket, true); + NetworkStream = new NetworkStream(Socket, true); if(IsSecure) { - SslStream sslStream = new SslStream(stream, false, (s, ce, ch, e) => e == SslPolicyErrors.None, null); - Stream = sslStream; - sslStream.AuthenticateAsClient( + SslStream = new SslStream(NetworkStream, false, (s, ce, ch, e) => e == SslPolicyErrors.None, null); + Stream = SslStream; + SslStream.AuthenticateAsClient( Host, null, SslProtocols.Tls11 | SslProtocols.Tls12 | SslProtocols.Tls13, true ); } else - Stream = stream; + Stream = NetworkStream; } public void MarkUsed() { @@ -73,6 +76,9 @@ namespace Hamakaze { throw new HttpConnectionAlreadyUpgradedException(); HasUpgraded = true; + NetworkStream.ReadTimeout = -1; + SslStream.ReadTimeout = -1; + return new WsConnection(Stream); }