From 293944d9d1cc251a4ab2fcab2cfc8c183cafef33 Mon Sep 17 00:00:00 2001 From: Barry Broderick Date: Tue, 10 Feb 2026 23:10:23 -0500 Subject: [PATCH 1/3] Increase WebSocket throughput by reworking ack handling Enable processing of cumulative ACKs across multiple queued messages Larger window size for reduced fragmentation Update src/AsyncWebSocket.cpp Co-authored-by: Mathieu Carbou Revert threshold to 9 improve readability Fix loop exit condition Co-authored-by: Mathieu Carbou Replace public sent() with private _remainingBytesToSend() Leverage _remainingBytesToSend() and tidy messageQueue sending --- src/AsyncWebSocket.cpp | 55 +++++++++++++++++++++++++++++++----------- src/AsyncWebSocket.h | 10 ++++++-- 2 files changed, 49 insertions(+), 16 deletions(-) diff --git a/src/AsyncWebSocket.cpp b/src/AsyncWebSocket.cpp index f30c79dd..e2cdab6e 100644 --- a/src/AsyncWebSocket.cpp +++ b/src/AsyncWebSocket.cpp @@ -164,13 +164,16 @@ bool AsyncWebSocketMessageBuffer::reserve(size_t size) { AsyncWebSocketMessage::AsyncWebSocketMessage(AsyncWebSocketSharedBuffer buffer, uint8_t opcode, bool mask) : _WSbuffer{buffer}, _opcode(opcode & 0x07), _mask{mask}, _status{_WSbuffer ? WS_MSG_SENDING : WS_MSG_ERROR} {} -void AsyncWebSocketMessage::ack(size_t len, uint32_t time) { +size_t AsyncWebSocketMessage::ack(size_t len, uint32_t time) { (void)time; - _acked += len; - if (_sent >= _WSbuffer->size() && _acked >= _ack) { + const size_t pending = _ack - _acked; + const size_t received = std::min(len, pending); + _acked += received; + if (_sent >= _WSbuffer->size() && !pending) { _status = WS_MSG_SENT; } - // ets_printf("A: %u\n", len); + async_ws_log_v("status: %d, ack: %u/%u\n", static_cast(_status), _acked, _ack); + return len - received; } size_t AsyncWebSocketMessage::send(AsyncClient *client) { @@ -181,9 +184,7 @@ size_t AsyncWebSocketMessage::send(AsyncClient *client) { if (_status != WS_MSG_SENDING) { return 0; } - if (_acked < _ack) { - return 0; - } + if (_sent == _WSbuffer->size()) { if (_acked == _ack) { _status = WS_MSG_SENT; @@ -197,11 +198,14 @@ size_t AsyncWebSocketMessage::send(AsyncClient *client) { } size_t toSend = _WSbuffer->size() - _sent; - size_t window = webSocketSendFrameWindow(client); + const size_t window = webSocketSendFrameWindow(client); - if (window < toSend) { - toSend = window; + // not enough space in lwip buffer ? + if (!window) { + return 0; } + + toSend = std::min(toSend, window); _sent += toSend; _ack += toSend + ((toSend < 126) ? 2 : 4) + (_mask * 4); @@ -328,7 +332,12 @@ void AsyncWebSocketClient::_onAck(size_t len, uint32_t time) { } if (len && !_messageQueue.empty()) { - _messageQueue.front().ack(len, time); + for (auto &msg : _messageQueue) { + len = msg.ack(len, time); + if (len == 0) { + break; + } + } } _clearQueue(); @@ -336,6 +345,7 @@ void AsyncWebSocketClient::_onAck(size_t len, uint32_t time) { _runQueue(); } + void AsyncWebSocketClient::_onPoll() { if (!_client) { return; @@ -362,11 +372,28 @@ void AsyncWebSocketClient::_runQueue() { _clearQueue(); - if (!_controlQueue.empty() && (_messageQueue.empty() || _messageQueue.front().betweenFrames()) + if (!_controlQueue.empty() && !_controlQueue.front().finished() && (_messageQueue.empty() || _messageQueue.front().betweenFrames()) && webSocketSendFrameWindow(_client) > (size_t)(_controlQueue.front().len() - 1)) { _controlQueue.front().send(_client); - } else if (!_messageQueue.empty() && _messageQueue.front().betweenFrames() && webSocketSendFrameWindow(_client)) { - _messageQueue.front().send(_client); + } + + if (webSocketSendFrameWindow(_client)) { + for (auto &msg : _messageQueue) { + if (msg._remainingBytesToSend()) { + msg.send(_client); + } + + // If we haven't finished sending this message, we must stop here to preserve WebSocket ordering. + // We can only pipeline subsequent messages if the current one is fully passed to TCP buffer. + if (msg._remainingBytesToSend()) { + break; + } + + // not enough space for another message + if(!webSocketSendFrameWindow(_client)) { + return; + } + } } } diff --git a/src/AsyncWebSocket.h b/src/AsyncWebSocket.h index 2e67760f..430241a2 100644 --- a/src/AsyncWebSocket.h +++ b/src/AsyncWebSocket.h @@ -184,7 +184,13 @@ class AsyncWebSocketMessageBuffer { }; class AsyncWebSocketMessage { + friend AsyncWebSocketClient; + private: + size_t _remainingBytesToSend() const { + return _WSbuffer->size() - _sent; + } + AsyncWebSocketSharedBuffer _WSbuffer; uint8_t _opcode{WS_TEXT}; bool _mask{false}; @@ -202,8 +208,8 @@ class AsyncWebSocketMessage { bool betweenFrames() const { return _acked == _ack; } - - void ack(size_t len, uint32_t time); + + size_t ack(size_t len, uint32_t time); size_t send(AsyncClient *client); }; From 7f177230c7aa16963e985f409695b9db7419da94 Mon Sep 17 00:00:00 2001 From: Mathieu Carbou Date: Thu, 12 Feb 2026 10:26:38 +0100 Subject: [PATCH 2/3] PR Review & Testing - Increase throughput of WS example - Remove \n at the end of log line - Fix status that was incorrectly kept to 0 even when all bytes acked - Added traces in sent method --- examples/WebSocket/WebSocket.ino | 12 ++++++++-- src/AsyncWebSocket.cpp | 38 ++++++++++++++++---------------- src/AsyncWebSocket.h | 4 ++-- 3 files changed, 31 insertions(+), 23 deletions(-) diff --git a/examples/WebSocket/WebSocket.ino b/examples/WebSocket/WebSocket.ino index c82f2d30..010ec194 100644 --- a/examples/WebSocket/WebSocket.ino +++ b/examples/WebSocket/WebSocket.ino @@ -167,9 +167,13 @@ void setup() { server.begin(); } -static uint32_t lastWS = 0; -static uint32_t deltaWS = 500; +#ifdef ESP32 +static const uint32_t deltaWS = 50; +#else +static const uint32_t deltaWS = 200; +#endif +static uint32_t lastWS = 0; static uint32_t lastHeap = 0; void loop() { @@ -186,6 +190,10 @@ void loop() { // this can be called to also set a soft limit on the number of connected clients ws.cleanupClients(2); // no more than 2 clients + // ping twice (2 control frames) + ws.pingAll(); + ws.pingAll(); + #ifdef ESP32 Serial.printf("Free heap: %" PRIu32 "\n", ESP.getFreeHeap()); #endif diff --git a/src/AsyncWebSocket.cpp b/src/AsyncWebSocket.cpp index e2cdab6e..9fac47c5 100644 --- a/src/AsyncWebSocket.cpp +++ b/src/AsyncWebSocket.cpp @@ -166,22 +166,23 @@ AsyncWebSocketMessage::AsyncWebSocketMessage(AsyncWebSocketSharedBuffer buffer, size_t AsyncWebSocketMessage::ack(size_t len, uint32_t time) { (void)time; - const size_t pending = _ack - _acked; - const size_t received = std::min(len, pending); - _acked += received; - if (_sent >= _WSbuffer->size() && !pending) { + const size_t pending = std::min(len, _ack - _acked); + _acked += pending; + if (_sent >= _WSbuffer->size() && _acked >= _ack) { _status = WS_MSG_SENT; } - async_ws_log_v("status: %d, ack: %u/%u\n", static_cast(_status), _acked, _ack); - return len - received; + async_ws_log_v("msg code: %" PRIu8 ", ack: %u/%u, remain=%u/%u, status: %d", _opcode, _acked, _ack, len - pending, len, static_cast(_status)); + return len - pending; } size_t AsyncWebSocketMessage::send(AsyncClient *client) { if (!client) { + async_ws_log_v("No client"); return 0; } if (_status != WS_MSG_SENDING) { + async_ws_log_v("C[%" PRIu16 "] Wrong status: got: %d, expected: %d", client->remotePort(), static_cast(_status), static_cast(WS_MSG_SENDING)); return 0; } @@ -189,29 +190,29 @@ size_t AsyncWebSocketMessage::send(AsyncClient *client) { if (_acked == _ack) { _status = WS_MSG_SENT; } + async_ws_log_v("C[%" PRIu16 "] Already sent: %u/%u", client->remotePort(), _sent, _WSbuffer->size()); return 0; } if (_sent > _WSbuffer->size()) { _status = WS_MSG_ERROR; - // ets_printf("E: %u > %u\n", _sent, _WSbuffer->length()); + async_ws_log_v("C[%" PRIu16 "] Error, sent more: %u/%u", client->remotePort(), _sent, _WSbuffer->size()); return 0; } size_t toSend = _WSbuffer->size() - _sent; const size_t window = webSocketSendFrameWindow(client); - // not enough space in lwip buffer ? + // not enough space in lwip buffer ? if (!window) { + async_ws_log_v("C[%" PRIu16 "] No space left to send more data: acked: %u, sent: %u, remaining: %u", client->remotePort(), _acked, _sent, toSend); return 0; } - + toSend = std::min(toSend, window); _sent += toSend; _ack += toSend + ((toSend < 126) ? 2 : 4) + (_mask * 4); - // ets_printf("W: %u %u\n", _sent - toSend, toSend); - bool final = (_sent == _WSbuffer->size()); uint8_t *dPtr = (uint8_t *)(_WSbuffer->data() + (_sent - toSend)); uint8_t opCode = (toSend && _sent == toSend) ? _opcode : (uint8_t)WS_CONTINUATION; @@ -219,11 +220,11 @@ size_t AsyncWebSocketMessage::send(AsyncClient *client) { size_t sent = webSocketSendFrame(client, final, opCode, _mask, dPtr, toSend); _status = WS_MSG_SENDING; if (toSend && sent != toSend) { - // ets_printf("E: %u != %u\n", toSend, sent); _sent -= (toSend - sent); _ack -= (toSend - sent); } - // ets_printf("S: %u %u\n", _sent, sent); + + async_ws_log_v("C[%" PRIu16 "] Sent %u/%u, ack: %u/%u, final: %d", client->remotePort(), _sent, _WSbuffer->size(), _acked, _ack, final); return sent; } @@ -345,7 +346,6 @@ void AsyncWebSocketClient::_onAck(size_t len, uint32_t time) { _runQueue(); } - void AsyncWebSocketClient::_onPoll() { if (!_client) { return; @@ -375,22 +375,22 @@ void AsyncWebSocketClient::_runQueue() { if (!_controlQueue.empty() && !_controlQueue.front().finished() && (_messageQueue.empty() || _messageQueue.front().betweenFrames()) && webSocketSendFrameWindow(_client) > (size_t)(_controlQueue.front().len() - 1)) { _controlQueue.front().send(_client); - } - + } + if (webSocketSendFrameWindow(_client)) { for (auto &msg : _messageQueue) { if (msg._remainingBytesToSend()) { msg.send(_client); } - + // If we haven't finished sending this message, we must stop here to preserve WebSocket ordering. // We can only pipeline subsequent messages if the current one is fully passed to TCP buffer. if (msg._remainingBytesToSend()) { break; } - + // not enough space for another message - if(!webSocketSendFrameWindow(_client)) { + if (!webSocketSendFrameWindow(_client)) { return; } } diff --git a/src/AsyncWebSocket.h b/src/AsyncWebSocket.h index 430241a2..33b04ba5 100644 --- a/src/AsyncWebSocket.h +++ b/src/AsyncWebSocket.h @@ -185,7 +185,7 @@ class AsyncWebSocketMessageBuffer { class AsyncWebSocketMessage { friend AsyncWebSocketClient; - + private: size_t _remainingBytesToSend() const { return _WSbuffer->size() - _sent; @@ -208,7 +208,7 @@ class AsyncWebSocketMessage { bool betweenFrames() const { return _acked == _ack; } - + size_t ack(size_t len, uint32_t time); size_t send(AsyncClient *client); }; From a3a1466bf1d67c1c3c1cd8b21cea440af78ffd77 Mon Sep 17 00:00:00 2001 From: Mathieu Carbou Date: Thu, 12 Feb 2026 12:37:05 +0100 Subject: [PATCH 3/3] Reworked _runQueue to prioritize control frames --- src/AsyncWebSocket.cpp | 58 +++++++++++++++++++++++++++++------------- 1 file changed, 41 insertions(+), 17 deletions(-) diff --git a/src/AsyncWebSocket.cpp b/src/AsyncWebSocket.cpp index 9fac47c5..04f2703c 100644 --- a/src/AsyncWebSocket.cpp +++ b/src/AsyncWebSocket.cpp @@ -372,26 +372,50 @@ void AsyncWebSocketClient::_runQueue() { _clearQueue(); - if (!_controlQueue.empty() && !_controlQueue.front().finished() && (_messageQueue.empty() || _messageQueue.front().betweenFrames()) - && webSocketSendFrameWindow(_client) > (size_t)(_controlQueue.front().len() - 1)) { - _controlQueue.front().send(_client); - } - - if (webSocketSendFrameWindow(_client)) { - for (auto &msg : _messageQueue) { - if (msg._remainingBytesToSend()) { - msg.send(_client); + size_t space = webSocketSendFrameWindow(_client); + + if (space) { + // control frames have priority over message frames + // we can send a control frame if: + // - there is no message frame in the queue, or the first message frame is between frames (all bytes sent are acked) + // - the control frame is not finished (not sent yet) + // - there is enough space to send the control frame (control frames are small, at most 129 bytes, so we can assume that if there is space to send it, it can be sent in one go) + if (_messageQueue.empty() || _messageQueue.front().betweenFrames()) { + for (auto &ctrl : _controlQueue) { + if (ctrl.finished()) { + continue; + } + if (space > (size_t)(ctrl.len() - 1)) { + async_ws_log_v("WS[%" PRIu32 "] Sending control frame: %" PRIu8 ", len: %" PRIu8, _clientId, ctrl.opcode(), ctrl.len()); + ctrl.send(_client); + space = webSocketSendFrameWindow(_client); + } } + } - // If we haven't finished sending this message, we must stop here to preserve WebSocket ordering. - // We can only pipeline subsequent messages if the current one is fully passed to TCP buffer. - if (msg._remainingBytesToSend()) { - break; - } + // then we can send message frames if there is space + if (space) { + for (auto &msg : _messageQueue) { + if (msg._remainingBytesToSend()) { + async_ws_log_v( + "WS[%" PRIu32 "] Send message fragment: %u/%u, acked: %u/%u", _clientId, msg._remainingBytesToSend(), msg._sent + msg._remainingBytesToSend(), + msg._acked, msg._ack + ); + // will use all the remaining space, or all the remaining bytes to send, whichever is smaller + msg.send(_client); + space = webSocketSendFrameWindow(_client); + + // If we haven't finished sending this message, we must stop here to preserve WebSocket ordering. + // We can only pipeline subsequent messages if the current one is fully passed to TCP buffer. + if (msg._remainingBytesToSend()) { + break; + } + } - // not enough space for another message - if (!webSocketSendFrameWindow(_client)) { - return; + // not enough space for another message + if (!space) { + break; + } } } }