Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,12 @@
*/
package com.alibaba.csp.sentinel.cluster.server;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.csp.sentinel.cluster.server.codec.netty.NettyRequestDecoder;
import com.alibaba.csp.sentinel.cluster.server.codec.netty.NettyResponseEncoder;
import com.alibaba.csp.sentinel.cluster.server.connection.Connection;
import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionPool;
import com.alibaba.csp.sentinel.cluster.server.handler.TokenServerHandler;
import com.alibaba.csp.sentinel.log.RecordLog;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
Expand All @@ -42,7 +37,14 @@
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.SystemPropertyUtil;

import static com.alibaba.csp.sentinel.cluster.server.ServerConstants.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static com.alibaba.csp.sentinel.cluster.server.ServerConstants.NETTY_MAX_FRAME_LENGTH;
import static com.alibaba.csp.sentinel.cluster.server.ServerConstants.SERVER_STATUS_OFF;
import static com.alibaba.csp.sentinel.cluster.server.ServerConstants.SERVER_STATUS_STARTED;
import static com.alibaba.csp.sentinel.cluster.server.ServerConstants.SERVER_STATUS_STARTING;

/**
* @author Eric Zhao
Expand All @@ -51,7 +53,7 @@
public class NettyTransportServer implements ClusterTokenServer {

private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1,
SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
private static final int MAX_RETRY_TIMES = 3;
private static final int RETRY_SLEEP_MS = 2000;

Expand Down Expand Up @@ -79,32 +81,32 @@ public void start() {
this.bossGroup = new NioEventLoopGroup(1);
this.workerGroup = new NioEventLoopGroup(DEFAULT_EVENT_LOOP_THREADS);
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
p.addLast(new NettyRequestDecoder());
p.addLast(new LengthFieldPrepender(2));
p.addLast(new NettyResponseEncoder());
p.addLast(new TokenServerHandler(connectionPool));
}
})
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.SO_SNDBUF, 32 * 1024)
.childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.childOption(ChannelOption.SO_TIMEOUT, 10)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_RCVBUF, 32 * 1024);
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LengthFieldBasedFrameDecoder(NETTY_MAX_FRAME_LENGTH, 0, 2, 0, 2));
p.addLast(new NettyRequestDecoder());
p.addLast(new LengthFieldPrepender(2));
p.addLast(new NettyResponseEncoder());
p.addLast(new TokenServerHandler(connectionPool));
}
})
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.SO_SNDBUF, 32 * 1024)
.childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.childOption(ChannelOption.SO_TIMEOUT, 10)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_RCVBUF, 32 * 1024);
b.bind(port).addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.cause() != null) {
RecordLog.info("[NettyTransportServer] Token server start failed (port=" + port + "), failedTimes: " + failedTimes.get(),
future.cause());
future.cause());
currentState.compareAndSet(SERVER_STATUS_STARTING, SERVER_STATUS_OFF);
int failCount = failedTimes.incrementAndGet();
if (failCount > MAX_RETRY_TIMES) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,8 @@ public final class ServerConstants {

public static final String DEFAULT_NAMESPACE = "default";

private ServerConstants() {}
public static final int NETTY_MAX_FRAME_LENGTH = 1024;

private ServerConstants() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
*/
package com.alibaba.csp.sentinel.cluster.server.codec.data;

import java.util.ArrayList;
import java.util.List;

import com.alibaba.csp.sentinel.cluster.ClusterConstants;
import com.alibaba.csp.sentinel.cluster.codec.EntityDecoder;
import com.alibaba.csp.sentinel.cluster.request.data.ParamFlowRequestData;

import com.alibaba.csp.sentinel.cluster.server.ServerConstants;
import io.netty.buffer.ByteBuf;

import java.util.ArrayList;
import java.util.List;

/**
* @author jialiang.linjl
* @author Eric Zhao
Expand All @@ -35,12 +35,12 @@ public class ParamFlowRequestDataDecoder implements EntityDecoder<ByteBuf, Param
public ParamFlowRequestData decode(ByteBuf source) {
if (source.readableBytes() >= 16) {
ParamFlowRequestData requestData = new ParamFlowRequestData()
.setFlowId(source.readLong())
.setCount(source.readInt());
.setFlowId(source.readLong())
.setCount(source.readInt());

int amount = source.readInt();
if (amount > 0) {
List<Object> params = new ArrayList<>(amount);
List<Object> params = new ArrayList<>(16);
for (int i = 0; i < amount; i++) {
decodeParam(source, params);
}
Expand All @@ -61,6 +61,9 @@ private boolean decodeParam(ByteBuf source, List<Object> params) {
return true;
case ClusterConstants.PARAM_TYPE_STRING:
int length = source.readInt();
if (length > ServerConstants.NETTY_MAX_FRAME_LENGTH) {
throw new IllegalStateException("[ParamFlowRequestDataDecoder] String param length (" + length + ") exceeds max frame length (" + ServerConstants.NETTY_MAX_FRAME_LENGTH + ")");
}
byte[] bytes = new byte[length];
source.readBytes(bytes);
// TODO: take care of charset?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com.alibaba.csp.sentinel.cluster.server.codec.data;

import com.alibaba.csp.sentinel.cluster.codec.EntityDecoder;

import com.alibaba.csp.sentinel.log.RecordLog;
import io.netty.buffer.ByteBuf;

/**
Expand All @@ -27,13 +27,16 @@ public class PingRequestDataDecoder implements EntityDecoder<ByteBuf, String> {

@Override
public String decode(ByteBuf source) {
if (source.readableBytes() >= 4) {
int length = source.readInt();
if (length > 0 && source.readableBytes() > 0) {
byte[] bytes = new byte[length];
source.readBytes(bytes);
return new String(bytes);
int readableBytes = source.readableBytes();
if (readableBytes >= 4) {
int packetLen = source.readInt();
if (readableBytes != packetLen) { // 畸形报文,可能是端口扫描
RecordLog.warn("[NettyTransportServer] Abnormal packet detected: expected length = {}, actual readableBytes = {}", packetLen, readableBytes);
return null;
}
byte[] bytes = new byte[packetLen];
source.readBytes(bytes);
return new String(bytes);
}
return null;
}
Expand Down