Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions examples/WebSocket/WebSocket.ino
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,13 @@ void setup() {
server.begin();
}

static uint32_t lastWS = 0;
static uint32_t deltaWS = 500;
#ifdef ESP32
static const uint32_t deltaWS = 50;
#else
static const uint32_t deltaWS = 200;
#endif

static uint32_t lastWS = 0;
static uint32_t lastHeap = 0;

void loop() {
Expand All @@ -186,6 +190,10 @@ void loop() {
// this can be called to also set a soft limit on the number of connected clients
ws.cleanupClients(2); // no more than 2 clients

// ping twice (2 control frames)
ws.pingAll();
ws.pingAll();

#ifdef ESP32
Serial.printf("Free heap: %" PRIu32 "\n", ESP.getFreeHeap());
#endif
Expand Down
91 changes: 71 additions & 20 deletions src/AsyncWebSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,62 +164,67 @@ bool AsyncWebSocketMessageBuffer::reserve(size_t size) {
AsyncWebSocketMessage::AsyncWebSocketMessage(AsyncWebSocketSharedBuffer buffer, uint8_t opcode, bool mask)
: _WSbuffer{buffer}, _opcode(opcode & 0x07), _mask{mask}, _status{_WSbuffer ? WS_MSG_SENDING : WS_MSG_ERROR} {}

void AsyncWebSocketMessage::ack(size_t len, uint32_t time) {
size_t AsyncWebSocketMessage::ack(size_t len, uint32_t time) {
(void)time;
_acked += len;
const size_t pending = std::min(len, _ack - _acked);
_acked += pending;
if (_sent >= _WSbuffer->size() && _acked >= _ack) {
_status = WS_MSG_SENT;
}
// ets_printf("A: %u\n", len);
async_ws_log_v("msg code: %" PRIu8 ", ack: %u/%u, remain=%u/%u, status: %d", _opcode, _acked, _ack, len - pending, len, static_cast<int>(_status));
return len - pending;
}

size_t AsyncWebSocketMessage::send(AsyncClient *client) {
if (!client) {
async_ws_log_v("No client");
return 0;
}

if (_status != WS_MSG_SENDING) {
async_ws_log_v("C[%" PRIu16 "] Wrong status: got: %d, expected: %d", client->remotePort(), static_cast<int>(_status), static_cast<int>(WS_MSG_SENDING));
return 0;
}
if (_acked < _ack) {
return 0;
}

if (_sent == _WSbuffer->size()) {
if (_acked == _ack) {
_status = WS_MSG_SENT;
}
async_ws_log_v("C[%" PRIu16 "] Already sent: %u/%u", client->remotePort(), _sent, _WSbuffer->size());
return 0;
}
if (_sent > _WSbuffer->size()) {
_status = WS_MSG_ERROR;
// ets_printf("E: %u > %u\n", _sent, _WSbuffer->length());
async_ws_log_v("C[%" PRIu16 "] Error, sent more: %u/%u", client->remotePort(), _sent, _WSbuffer->size());
return 0;
}

size_t toSend = _WSbuffer->size() - _sent;
size_t window = webSocketSendFrameWindow(client);
const size_t window = webSocketSendFrameWindow(client);

if (window < toSend) {
toSend = window;
// not enough space in lwip buffer ?
if (!window) {
async_ws_log_v("C[%" PRIu16 "] No space left to send more data: acked: %u, sent: %u, remaining: %u", client->remotePort(), _acked, _sent, toSend);
return 0;
}

toSend = std::min(toSend, window);

_sent += toSend;
_ack += toSend + ((toSend < 126) ? 2 : 4) + (_mask * 4);

// ets_printf("W: %u %u\n", _sent - toSend, toSend);

bool final = (_sent == _WSbuffer->size());
uint8_t *dPtr = (uint8_t *)(_WSbuffer->data() + (_sent - toSend));
uint8_t opCode = (toSend && _sent == toSend) ? _opcode : (uint8_t)WS_CONTINUATION;

size_t sent = webSocketSendFrame(client, final, opCode, _mask, dPtr, toSend);
_status = WS_MSG_SENDING;
if (toSend && sent != toSend) {
// ets_printf("E: %u != %u\n", toSend, sent);
_sent -= (toSend - sent);
_ack -= (toSend - sent);
}
// ets_printf("S: %u %u\n", _sent, sent);

async_ws_log_v("C[%" PRIu16 "] Sent %u/%u, ack: %u/%u, final: %d", client->remotePort(), _sent, _WSbuffer->size(), _acked, _ack, final);
return sent;
}

Expand Down Expand Up @@ -328,7 +333,12 @@ void AsyncWebSocketClient::_onAck(size_t len, uint32_t time) {
}

if (len && !_messageQueue.empty()) {
_messageQueue.front().ack(len, time);
for (auto &msg : _messageQueue) {
len = msg.ack(len, time);
if (len == 0) {
break;
}
}
}

_clearQueue();
Expand Down Expand Up @@ -362,11 +372,52 @@ void AsyncWebSocketClient::_runQueue() {

_clearQueue();

if (!_controlQueue.empty() && (_messageQueue.empty() || _messageQueue.front().betweenFrames())
&& webSocketSendFrameWindow(_client) > (size_t)(_controlQueue.front().len() - 1)) {
_controlQueue.front().send(_client);
} else if (!_messageQueue.empty() && _messageQueue.front().betweenFrames() && webSocketSendFrameWindow(_client)) {
_messageQueue.front().send(_client);
size_t space = webSocketSendFrameWindow(_client);

if (space) {
// control frames have priority over message frames
// we can send a control frame if:
// - there is no message frame in the queue, or the first message frame is between frames (all bytes sent are acked)
// - the control frame is not finished (not sent yet)
// - there is enough space to send the control frame (control frames are small, at most 129 bytes, so we can assume that if there is space to send it, it can be sent in one go)
if (_messageQueue.empty() || _messageQueue.front().betweenFrames()) {
for (auto &ctrl : _controlQueue) {
if (ctrl.finished()) {
continue;
}
if (space > (size_t)(ctrl.len() - 1)) {
async_ws_log_v("WS[%" PRIu32 "] Sending control frame: %" PRIu8 ", len: %" PRIu8, _clientId, ctrl.opcode(), ctrl.len());
ctrl.send(_client);
space = webSocketSendFrameWindow(_client);
}
}
}

// then we can send message frames if there is space
if (space) {
for (auto &msg : _messageQueue) {
if (msg._remainingBytesToSend()) {
async_ws_log_v(
"WS[%" PRIu32 "] Send message fragment: %u/%u, acked: %u/%u", _clientId, msg._remainingBytesToSend(), msg._sent + msg._remainingBytesToSend(),
msg._acked, msg._ack
);
// will use all the remaining space, or all the remaining bytes to send, whichever is smaller
msg.send(_client);
space = webSocketSendFrameWindow(_client);

// If we haven't finished sending this message, we must stop here to preserve WebSocket ordering.
// We can only pipeline subsequent messages if the current one is fully passed to TCP buffer.
if (msg._remainingBytesToSend()) {
break;
}
}

// not enough space for another message
if (!space) {
break;
}
}
}
}
}

Expand Down
8 changes: 7 additions & 1 deletion src/AsyncWebSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,13 @@ class AsyncWebSocketMessageBuffer {
};

class AsyncWebSocketMessage {
friend AsyncWebSocketClient;

private:
size_t _remainingBytesToSend() const {
return _WSbuffer->size() - _sent;
}

AsyncWebSocketSharedBuffer _WSbuffer;
uint8_t _opcode{WS_TEXT};
bool _mask{false};
Expand All @@ -203,7 +209,7 @@ class AsyncWebSocketMessage {
return _acked == _ack;
}

void ack(size_t len, uint32_t time);
size_t ack(size_t len, uint32_t time);
size_t send(AsyncClient *client);
};

Expand Down