优化
本博客是根据黑马程序员Netty实战 学习时所做的笔记
1、拓展序列化算法 序列化接口 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public interface Serializer { <T> byte [] serialize(T object); <T> T deserialize (Class<T> clazz, byte [] bytes) ; }
枚举实现类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public enum SerializerAlgorithm implements Serializer { Java { @Override public <T> byte [] serialize(T object) { byte [] bytes = null ; try (ByteArrayOutputStream bos = new ByteArrayOutputStream (); ObjectOutputStream oos = new ObjectOutputStream (bos)) { oos.writeObject(object); bytes = bos.toByteArray(); } catch (IOException e) { e.printStackTrace(); } return bytes; } @Override public <T> T deserialize (Class<T> clazz, byte [] bytes) { T target = null ; System.out.println(Arrays.toString(bytes)); try (ByteArrayInputStream bis = new ByteArrayInputStream (bytes); ObjectInputStream ois = new ObjectInputStream (bis)) { target = (T) ois.readObject(); } catch (IOException | ClassNotFoundException e) { e.printStackTrace(); } return target; } } Json { @Override public <T> byte [] serialize(T object) { String s = new Gson ().toJson(object); System.out.println(s); return s.getBytes(StandardCharsets.UTF_8); } @Override public <T> T deserialize (Class<T> clazz, byte [] bytes) { String s = new String (bytes, StandardCharsets.UTF_8); System.out.println(s); return new Gson ().fromJson(s, clazz); } } }
修改原编解码器 编码
1 2 3 4 5 SerializerAlgorithm[] values = SerializerAlgorithm.values(); byte [] bytes = values[out.getByte(5 )-1 ].serialize(msg);
解码
1 2 3 4 5 SerializerAlgorithm[] values = SerializerAlgorithm.values(); Message message = values[seqType-1 ].deserialize(Message.getMessageClass(messageType), bytes);
2、参数调优 CONNECT_TIMEOUT_MILLIS
属于 SocketChannal 的参数
用在客户端建立连接 时,如果在指定毫秒内无法连接,会抛出 timeout 异常
注意 :Netty 中不要用成了SO_TIMEOUT 主要用在阻塞 IO,而 Netty 是非阻塞 IO
使用 1 2 3 4 5 6 7 8 9 10 11 public class TestParam { public static void main (String[] args) { new Bootstrap ().option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000 ); new ServerBootstrap ().option(ChannelOption.CONNECT_TIMEOUT_MILLIS,5000 ); new ServerBootstrap ().childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000 ); } }
客户端通过 Bootstrap.option
函数来配置参数,配置参数作用于 SocketChannel
服务器通过ServerBootstrap
来配置参数,但是对于不同的 Channel 需要选择不同的方法
通过 option
来配置 ServerSocketChannel 上的参数
通过 childOption
来配置 SocketChannel 上的参数
源码分析 客户端中连接服务器的线程是 NIO 线程,抛出异常的是主线程。这是如何做到超时判断以及线程通信的呢?
AbstractNioChannel.AbstractNioUnsafe.connect
方法中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public final void connect ( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { ... int connectTimeoutMillis = config().getConnectTimeoutMillis(); if (connectTimeoutMillis > 0 ) { connectTimeoutFuture = eventLoop().schedule(new Runnable () { @Override public void run () { ChannelPromise connectPromise = AbstractNioChannel.this .connectPromise; ConnectTimeoutException cause = new ConnectTimeoutException ("connection timed out: " + remoteAddress); if (connectPromise != null && connectPromise.tryFailure(cause)) { close(voidPromise()); } } }, connectTimeoutMillis, TimeUnit.MILLISECONDS); } ... }
超时的判断主要是通过 Eventloop 的 schedule 方法和 Promise 共同实现的
schedule 设置了一个定时任务,延迟connectTimeoutMillis
秒后执行该方法
如果指定时间内没有建立连接,则会执行其中的任务
任务负责创建 ConnectTimeoutException
异常,并将异常通过 Pormise 传给主线程并抛出
SO_BACKLOG 该参数是 ServerSocketChannel 的参数
三次握手与连接队列 第一次握手时,因为客户端与服务器之间的连接还未完全建立,连接会被放入半连接队列 中
当完成三次握手以后,连接会被放入全连接队列中
服务器处理Accept事件是在TCP三次握手,也就是建立连接之后。服务器会从全连接队列中获取连接并进行处理
在 linux 2.2 之前,backlog 大小包括了两个队列的大小,在 linux 2.2 之后,分别用下面两个参数来控制
半连接队列 - sync queue
大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在 syncookies
启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
全连接队列 - accept queue
其大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数,取二者的较小值
如果 accpet queue 队列满了,server 将发送一个拒绝连接的错误信息到 client
作用 在Netty中,SO_BACKLOG
主要用于设置全连接队列的大小。当处理Accept的速率小于连接建立的速率时,全连接队列中堆积的连接数大于SO_BACKLOG设置的值是,便会抛出异常
设置方式如下
1 2 new ServerBootstrap ().option(ChannelOption.SO_BACKLOG, 2 );
默认值 backlog参数在NioSocketChannel.doBind
方法被使用
1 2 3 4 5 6 7 8 @Override protected void doBind (SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7 ) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }
其中backlog被保存在了DefaultServerSocketChannelConfig
配置类中
1 private volatile int backlog = NetUtil.SOMAXCONN;
具体的赋值操作如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 SOMAXCONN = AccessController.doPrivileged(new PrivilegedAction <Integer>() { @Override public Integer run () { int somaxconn = PlatformDependent.isWindows() ? 200 : 128 ; File file = new File ("/proc/sys/net/core/somaxconn" ); BufferedReader in = null ; try { if (file.exists()) { in = new BufferedReader (new FileReader (file)); somaxconn = Integer.parseInt(in.readLine()); if (logger.isDebugEnabled()) { logger.debug("{}: {}" , file, somaxconn); } } else { ... } ... } return somaxconn; } }
TCP_NODELAY
属于 SocketChannal 参数
因为 Nagle 算法,数据包会堆积到一定的数量后一起发送,这就可能导致数据的发送存在一定的延时
该参数默认为false ,如果不希望的发送被延时,则需要将该值设置为true
SO_SNDBUF & SO_RCVBUF
SO_SNDBUF 属于 SocketChannal 参数
SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数 (建议设置到 ServerSocketChannal 上)
该参数用于指定接收方与发送方的滑动窗口大小
ALLOCATOR
属于 SocketChannal 参数
用来配置 ByteBuf 是池化还是非池化,是直接内存还是堆内存
使用 1 2 3 new ServerBootstrap ().childOption(ChannelOption.ALLOCATOR, new PooledByteBufAllocator ());
ByteBufAllocator类型
池化并使用直接内存
1 2 new PooledByteBufAllocator (true );
池化并使用堆内存
1 2 new PooledByteBufAllocator (false );
非池化并使用直接内存
1 2 new UnpooledByteBufAllocator (true );
非池化并使用堆内存
1 2 new UnpooledByteBufAllocator (false );
RCVBUF_ALLOCATOR
属于 SocketChannal 参数
控制 Netty 接收缓冲区大小
负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存 ,具体池化还是非池化由 allocator 决定
3、RPC框架 准备工作 在聊天室代码的基础上进行一定的改进
Message 中添加如下代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public abstract class Message implements Serializable { ... public static final int RPC_MESSAGE_TYPE_REQUEST = 101 ; public static final int RPC_MESSAGE_TYPE_RESPONSE = 102 ; static { messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class); messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class); } }
RPC请求消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 public class RpcRequestMessage extends Message { private String interfaceName; private String methodName; private Class<?> returnType; private Class[] parameterTypes; private Object[] parameterValue; public RpcRequestMessage (int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) { super .setSequenceId(sequenceId); this .interfaceName = interfaceName; this .methodName = methodName; this .returnType = returnType; this .parameterTypes = parameterTypes; this .parameterValue = parameterValue; } @Override public int getMessageType () { return RPC_MESSAGE_TYPE_REQUEST; } public String getInterfaceName () { return interfaceName; } public String getMethodName () { return methodName; } public Class<?> getReturnType() { return returnType; } public Class[] getParameterTypes() { return parameterTypes; } public Object[] getParameterValue() { return parameterValue; } @Override public String toString () { return "RpcRequestMessage{" + "interfaceName='" + interfaceName + '\'' + ", methodName='" + methodName + '\'' + ", returnType=" + returnType + ", parameterTypes=" + Arrays.toString(parameterTypes) + ", parameterValue=" + Arrays.toString(parameterValue) + '}' ; } }
想要远程调用一个方法,必须知道以下五个信息
方法所在的全限定类名
方法名
方法返回值类型
方法参数类型
方法参数值
RPC响应消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class RpcResponseMessage extends Message { private Object returnValue; private Exception exceptionValue; @Override public int getMessageType () { return RPC_MESSAGE_TYPE_RESPONSE; } public void setReturnValue (Object returnValue) { this .returnValue = returnValue; } public void setExceptionValue (Exception exceptionValue) { this .exceptionValue = exceptionValue; } public Object getReturnValue () { return returnValue; } public Exception getExceptionValue () { return exceptionValue; } @Override public String toString () { return "RpcResponseMessage{" + "returnValue=" + returnValue + ", exceptionValue=" + exceptionValue + '}' ; } }
响应消息中只需要获取返回结果和异常值
服务器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class RPCServer { public static void main (String[] args) { NioEventLoopGroup boss = new NioEventLoopGroup (); NioEventLoopGroup worker = new NioEventLoopGroup (); LoggingHandler loggingHandler = new LoggingHandler (LogLevel.DEBUG); MessageSharableCodec messageSharableCodec = new MessageSharableCodec (); RpcRequestMessageHandler rpcRequestMessageHandler = new RpcRequestMessageHandler (); try { ServerBootstrap serverBootstrap = new ServerBootstrap (); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.group(boss, worker); serverBootstrap.childHandler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProtocolFrameDecoder ()); ch.pipeline().addLast(loggingHandler); ch.pipeline().addLast(messageSharableCodec); ch.pipeline().addLast(rpcRequestMessageHandler); } }); Channel channel = serverBootstrap.bind(8080 ).sync().channel(); channel.closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } }
服务器中添加了处理RPCRequest消息的handler
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class RPCClient { public static void main (String[] args) { NioEventLoopGroup group = new NioEventLoopGroup (); LoggingHandler loggingHandler = new LoggingHandler (LogLevel.DEBUG); MessageSharableCodec messageSharableCodec = new MessageSharableCodec (); RpcResponseMessageHandler rpcResponseMessageHandler = new RpcResponseMessageHandler (); try { Bootstrap bootstrap = new Bootstrap (); bootstrap.channel(NioSocketChannel.class); bootstrap.group(group); bootstrap.handler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProtocolFrameDecoder ()); ch.pipeline().addLast(loggingHandler); ch.pipeline().addLast(messageSharableCodec); ch.pipeline().addLast(rpcResponseMessageHandler); } }); Channel channel = bootstrap.connect(new InetSocketAddress ("localhost" , 8080 )).sync().channel(); channel.closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } }
通过接口Class获取实例对象的Factory
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class ServicesFactory { static HashMap<Class<?>, Object> map = new HashMap <>(16 ); public static Object getInstance (Class<?> interfaceClass) throws ClassNotFoundException, IllegalAccessException, InstantiationException { try { Class<?> clazz = Class.forName("cn.nyimac.study.day8.server.service.HelloService" ); Object instance = Class.forName("cn.nyimac.study.day8.server.service.HelloServiceImpl" ).newInstance(); map.put(clazz, instance); } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { e.printStackTrace(); } return map.get(interfaceClass); } }
RpcRequestMessageHandler 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @ChannelHandler .Sharablepublic class RpcRequestMessageHandler extends SimpleChannelInboundHandler <RpcRequestMessage> { @Override protected void channelRead0 (ChannelHandlerContext ctx, RpcRequestMessage rpcMessage) { RpcResponseMessage rpcResponseMessage = new RpcResponseMessage (); try { rpcResponseMessage.setSequenceId(rpcMessage.getSequenceId()); HelloService service = (HelloService) ServicesFactory.getInstance(Class.forName(rpcMessage.getInterfaceName())); Method method = service.getClass().getMethod(rpcMessage.getMethodName(), rpcMessage.getParameterTypes()); Object invoke = method.invoke(service, rpcMessage.getParameterValue()); rpcResponseMessage.setReturnValue(invoke); } catch (Exception e) { e.printStackTrace(); rpcResponseMessage.setExceptionValue(e); } } ctx.writeAndFlush(rpcResponseMessage); }
远程调用方法主要是通过反射实现的,大致步骤如下
通过请求消息传入被调入方法的各个参数
通过全限定接口名,在map中查询到对应的类并实例化对象
通过反射获取Method,并调用其invoke方法的返回值,并放入响应消息中
若有异常需要捕获,并放入响应消息中
RpcResponseMessageHandler 1 2 3 4 5 6 7 8 9 10 @ChannelHandler .Sharablepublic class RpcResponseMessageHandler extends SimpleChannelInboundHandler <RpcResponseMessage> { static final Logger log = LoggerFactory.getLogger(ChatServer.class); @Override protected void channelRead0 (ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception { log.debug("{}" , msg); System.out.println((String)msg.getReturnValue()); } }
客户端发送消息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class RPCClient { public static void main (String[] args) { ... RpcRequestMessage message = new RpcRequestMessage (1 , "cn.nyimac.study.day8.server.service.HelloService" , "sayHello" , String.class, new Class []{String.class}, new Object []{"Nyima" }); channel.writeAndFlush(message); ... } }
运行结果
客户端
1 1606 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.server.ChatServer - RpcResponseMessage{returnValue=你好,Nyima, exceptionValue=null}
改进客户端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 public class RPCClientManager { private static AtomicInteger sequenceId = new AtomicInteger (0 ); private static volatile Channel channel = null ; private static final Object lock = new Object (); public static void main (String[] args) { HelloService service = (HelloService) getProxy(HelloService.class); System.out.println(service.sayHello("Nyima" )); System.out.println(service.sayHello("Hulu" )); } public static Channel getChannel () { if (channel == null ) { synchronized (lock) { if (channel == null ) { init(); } } } return channel; } public static Object getProxy (Class<?> serviceClass) { Class<?>[] classes = new Class <?>[]{serviceClass}; Object o = Proxy.newProxyInstance(serviceClass.getClassLoader(), classes, new InvocationHandler () { @Override public Object invoke (Object proxy, Method method, Object[] args) throws Throwable { int id = sequenceId.getAndIncrement(); RpcRequestMessage message = new RpcRequestMessage (id, serviceClass.getName(), method.getName(), method.getReturnType(), method.getParameterTypes(), args); getChannel().writeAndFlush(message); DefaultPromise<Object> promise = new DefaultPromise <>(getChannel().eventLoop()); RpcResponseMessageHandler.promiseMap.put(id, promise); promise.await(); if (promise.isSuccess()) { return promise.getNow(); } else { throw new RuntimeException (promise.cause()); } } }); return o; } private static void init () { NioEventLoopGroup group = new NioEventLoopGroup (); LoggingHandler loggingHandler = new LoggingHandler (LogLevel.DEBUG); MessageSharableCodec messageSharableCodec = new MessageSharableCodec (); RpcResponseMessageHandler rpcResponseMessageHandler = new RpcResponseMessageHandler (); Bootstrap bootstrap = new Bootstrap (); bootstrap.channel(NioSocketChannel.class); bootstrap.group(group); bootstrap.handler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProtocolFrameDecoder ()); ch.pipeline().addLast(loggingHandler); ch.pipeline().addLast(messageSharableCodec); ch.pipeline().addLast(rpcResponseMessageHandler); } }); try { channel = bootstrap.connect(new InetSocketAddress ("localhost" , 8080 )).sync().channel(); channel.closeFuture().addListener(future -> { group.shutdownGracefully(); }); } catch (InterruptedException e) { e.printStackTrace(); } } }
获得Channel
建立连接,获取Channel的操作被封装到了init
方法中,当连接断开时,通过addListener
方法异步关闭group
通过单例模式 创建与获取Channel
远程调用方法
为了让方法的调用变得简洁明了,将RpcRequestMessage
的创建与发送过程通过JDK的动态代理来完成
通过返回的代理对象调用方法即可,方法参数为被调用方法接口的Class类
远程调用方法返回值获取
调用方法的是主线程,处理返回结果的是NIO线程(RpcResponseMessageHandler)。要在不同线程中进行返回值的传递,需要用到Promise
在RpcResponseMessageHandler
中创建一个Map
Key为SequenceId
Value为对应的Promise
主线程 的代理类将RpcResponseMessage发送给服务器后,需要创建Promise对象,并将其放入到RpcResponseMessageHandler的Map中。需要使用await等待结果被放入Promise中 。获取结果后,根据结果类型(判断是否成功)来返回结果或抛出异常
1 2 3 4 5 6 7 8 9 10 11 12 13 DefaultPromise<Object> promise = new DefaultPromise <>(getChannel().eventLoop()); RpcResponseMessageHandler.promiseMap.put(id, promise); promise.await(); if (promise.isSuccess()) { return promise.getNow(); } else { throw new RuntimeException (promise.cause()); }
NIO线程 负责通过SequenceId获取并移除(remove) 对应的Promise,然后根据RpcResponseMessage中的结果,向Promise中放入不同的值
如果没有异常信息 (ExceptionValue),就调用promise.setSuccess(returnValue)
放入方法返回值
如果有异常信息 ,就调用promise.setFailure(exception)
放入异常信息
1 2 3 4 5 6 7 8 9 10 11 12 13 Promise<Object> promise = promiseMap.remove(msg.getSequenceId()); Object returnValue = msg.getReturnValue();Exception exception = msg.getExceptionValue();if (promise != null ) { if (exception != null ) { promise.setFailure(exception); } else { promise.setSuccess(returnValue); } }
改进RpcResponseMessageHandler 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @ChannelHandler .Sharablepublic class RpcResponseMessageHandler extends SimpleChannelInboundHandler <RpcResponseMessage> { static final Logger log = LoggerFactory.getLogger(ChatServer.class); public static Map<Integer, Promise<Object>> promiseMap = new ConcurrentHashMap <>(16 ); @Override protected void channelRead0 (ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception { Promise<Object> promise = promiseMap.remove(msg.getSequenceId()); Object returnValue = msg.getReturnValue(); Exception exception = msg.getExceptionValue(); if (promise != null ) { if (exception != null ) { promise.setFailure(exception); } else { promise.setSuccess(returnValue); } } log.debug("{}" , msg); } }