diff --git a/examples/WebSocket/WebSocket.ino b/examples/WebSocket/WebSocket.ino index c82f2d30..010ec194 100644 --- a/examples/WebSocket/WebSocket.ino +++ b/examples/WebSocket/WebSocket.ino @@ -167,9 +167,13 @@ void setup() { server.begin(); } -static uint32_t lastWS = 0; -static uint32_t deltaWS = 500; +#ifdef ESP32 +static const uint32_t deltaWS = 50; +#else +static const uint32_t deltaWS = 200; +#endif +static uint32_t lastWS = 0; static uint32_t lastHeap = 0; void loop() { @@ -186,6 +190,10 @@ void loop() { // this can be called to also set a soft limit on the number of connected clients ws.cleanupClients(2); // no more than 2 clients + // ping twice (2 control frames) + ws.pingAll(); + ws.pingAll(); + #ifdef ESP32 Serial.printf("Free heap: %" PRIu32 "\n", ESP.getFreeHeap()); #endif diff --git a/src/AsyncWebSocket.cpp b/src/AsyncWebSocket.cpp index f30c79dd..04f2703c 100644 --- a/src/AsyncWebSocket.cpp +++ b/src/AsyncWebSocket.cpp @@ -164,50 +164,55 @@ bool AsyncWebSocketMessageBuffer::reserve(size_t size) { AsyncWebSocketMessage::AsyncWebSocketMessage(AsyncWebSocketSharedBuffer buffer, uint8_t opcode, bool mask) : _WSbuffer{buffer}, _opcode(opcode & 0x07), _mask{mask}, _status{_WSbuffer ? WS_MSG_SENDING : WS_MSG_ERROR} {} -void AsyncWebSocketMessage::ack(size_t len, uint32_t time) { +size_t AsyncWebSocketMessage::ack(size_t len, uint32_t time) { (void)time; - _acked += len; + const size_t pending = std::min(len, _ack - _acked); + _acked += pending; if (_sent >= _WSbuffer->size() && _acked >= _ack) { _status = WS_MSG_SENT; } - // ets_printf("A: %u\n", len); + async_ws_log_v("msg code: %" PRIu8 ", ack: %u/%u, remain=%u/%u, status: %d", _opcode, _acked, _ack, len - pending, len, static_cast(_status)); + return len - pending; } size_t AsyncWebSocketMessage::send(AsyncClient *client) { if (!client) { + async_ws_log_v("No client"); return 0; } if (_status != WS_MSG_SENDING) { + async_ws_log_v("C[%" PRIu16 "] Wrong status: got: %d, expected: %d", client->remotePort(), static_cast(_status), static_cast(WS_MSG_SENDING)); return 0; } - if (_acked < _ack) { - return 0; - } + if (_sent == _WSbuffer->size()) { if (_acked == _ack) { _status = WS_MSG_SENT; } + async_ws_log_v("C[%" PRIu16 "] Already sent: %u/%u", client->remotePort(), _sent, _WSbuffer->size()); return 0; } if (_sent > _WSbuffer->size()) { _status = WS_MSG_ERROR; - // ets_printf("E: %u > %u\n", _sent, _WSbuffer->length()); + async_ws_log_v("C[%" PRIu16 "] Error, sent more: %u/%u", client->remotePort(), _sent, _WSbuffer->size()); return 0; } size_t toSend = _WSbuffer->size() - _sent; - size_t window = webSocketSendFrameWindow(client); + const size_t window = webSocketSendFrameWindow(client); - if (window < toSend) { - toSend = window; + // not enough space in lwip buffer ? + if (!window) { + async_ws_log_v("C[%" PRIu16 "] No space left to send more data: acked: %u, sent: %u, remaining: %u", client->remotePort(), _acked, _sent, toSend); + return 0; } + toSend = std::min(toSend, window); + _sent += toSend; _ack += toSend + ((toSend < 126) ? 2 : 4) + (_mask * 4); - // ets_printf("W: %u %u\n", _sent - toSend, toSend); - bool final = (_sent == _WSbuffer->size()); uint8_t *dPtr = (uint8_t *)(_WSbuffer->data() + (_sent - toSend)); uint8_t opCode = (toSend && _sent == toSend) ? _opcode : (uint8_t)WS_CONTINUATION; @@ -215,11 +220,11 @@ size_t AsyncWebSocketMessage::send(AsyncClient *client) { size_t sent = webSocketSendFrame(client, final, opCode, _mask, dPtr, toSend); _status = WS_MSG_SENDING; if (toSend && sent != toSend) { - // ets_printf("E: %u != %u\n", toSend, sent); _sent -= (toSend - sent); _ack -= (toSend - sent); } - // ets_printf("S: %u %u\n", _sent, sent); + + async_ws_log_v("C[%" PRIu16 "] Sent %u/%u, ack: %u/%u, final: %d", client->remotePort(), _sent, _WSbuffer->size(), _acked, _ack, final); return sent; } @@ -328,7 +333,12 @@ void AsyncWebSocketClient::_onAck(size_t len, uint32_t time) { } if (len && !_messageQueue.empty()) { - _messageQueue.front().ack(len, time); + for (auto &msg : _messageQueue) { + len = msg.ack(len, time); + if (len == 0) { + break; + } + } } _clearQueue(); @@ -362,11 +372,52 @@ void AsyncWebSocketClient::_runQueue() { _clearQueue(); - if (!_controlQueue.empty() && (_messageQueue.empty() || _messageQueue.front().betweenFrames()) - && webSocketSendFrameWindow(_client) > (size_t)(_controlQueue.front().len() - 1)) { - _controlQueue.front().send(_client); - } else if (!_messageQueue.empty() && _messageQueue.front().betweenFrames() && webSocketSendFrameWindow(_client)) { - _messageQueue.front().send(_client); + size_t space = webSocketSendFrameWindow(_client); + + if (space) { + // control frames have priority over message frames + // we can send a control frame if: + // - there is no message frame in the queue, or the first message frame is between frames (all bytes sent are acked) + // - the control frame is not finished (not sent yet) + // - there is enough space to send the control frame (control frames are small, at most 129 bytes, so we can assume that if there is space to send it, it can be sent in one go) + if (_messageQueue.empty() || _messageQueue.front().betweenFrames()) { + for (auto &ctrl : _controlQueue) { + if (ctrl.finished()) { + continue; + } + if (space > (size_t)(ctrl.len() - 1)) { + async_ws_log_v("WS[%" PRIu32 "] Sending control frame: %" PRIu8 ", len: %" PRIu8, _clientId, ctrl.opcode(), ctrl.len()); + ctrl.send(_client); + space = webSocketSendFrameWindow(_client); + } + } + } + + // then we can send message frames if there is space + if (space) { + for (auto &msg : _messageQueue) { + if (msg._remainingBytesToSend()) { + async_ws_log_v( + "WS[%" PRIu32 "] Send message fragment: %u/%u, acked: %u/%u", _clientId, msg._remainingBytesToSend(), msg._sent + msg._remainingBytesToSend(), + msg._acked, msg._ack + ); + // will use all the remaining space, or all the remaining bytes to send, whichever is smaller + msg.send(_client); + space = webSocketSendFrameWindow(_client); + + // If we haven't finished sending this message, we must stop here to preserve WebSocket ordering. + // We can only pipeline subsequent messages if the current one is fully passed to TCP buffer. + if (msg._remainingBytesToSend()) { + break; + } + } + + // not enough space for another message + if (!space) { + break; + } + } + } } } diff --git a/src/AsyncWebSocket.h b/src/AsyncWebSocket.h index 2e67760f..33b04ba5 100644 --- a/src/AsyncWebSocket.h +++ b/src/AsyncWebSocket.h @@ -184,7 +184,13 @@ class AsyncWebSocketMessageBuffer { }; class AsyncWebSocketMessage { + friend AsyncWebSocketClient; + private: + size_t _remainingBytesToSend() const { + return _WSbuffer->size() - _sent; + } + AsyncWebSocketSharedBuffer _WSbuffer; uint8_t _opcode{WS_TEXT}; bool _mask{false}; @@ -203,7 +209,7 @@ class AsyncWebSocketMessage { return _acked == _ack; } - void ack(size_t len, uint32_t time); + size_t ack(size_t len, uint32_t time); size_t send(AsyncClient *client); };