Netty之NIO基础
本博客是根据黑马程序员Netty实战 学习时所做的笔记
一、三大组件简介 Channel与Buffer
Java NIO系统的核心 在于:通道(Channel)和缓冲区(Buffer)。通道表示打开到 IO 设备(例如:文件、套接字)的连接。若需要使用 NIO 系统,需要获取用于 连接 IO 设备的通道 以及用于容纳数据的缓冲区 。然后操作缓冲区,对数据进行处理
简而言之,通道负责传输,缓冲区负责存储
常见的Channel有以下四种 ,其中FileChannel主要用于文件传输,其余三种用于网络通信
FileChannel
DatagramChannel
SocketChannel
ServerSocketChannel
Buffer有以下几种 ,其中使用较多的是ByteBuffer
ByteBuffer
MappedByteBuffer
DirectByteBuffer
HeapByteBuffer
ShortBuffer
IntBuffer
LongBuffer
FloatBuffer
DoubleBuffer
CharBuffer
1、Selector 在使用Selector之前,处理socket连接还有以下两种方法
使用多线程技术
为每个连接分别开辟一个线程,分别去处理对应的 socket 连接
这种方法存在以下几个问题
内存占用高
每个线程都需要占用一定的内存,当连接较多时,会开辟大量线程,导致占用大量内存
线程上下文切换成本高
只适合连接数少的场景
使用线程池技术
使用线程池,让线程池中的线程去处理连接
这种方法存在以下几个问题
阻塞模式下,线程仅能处理一个连接
线程池中的线程获取任务(task)后,只有当其执行完任务之后(断开连接后),才会去获取并执行下一个任务
若socke连接一直未断开,则其对应的线程无法处理其他socke连接
仅适合短连接场景
短连接即建立连接发送请求并响应后就立即断开,使得线程池中的线程可以快速处理其他连接
使用选择器
selector 的作用就是配合一个线程来管理多个 channel(fileChannel因为是阻塞式的,所以无法使用selector) ,获取这些 channel 上发生的事件 ,这些 channel 工作在非阻塞模式 下,当一个channel中没有执行任务时,可以去执行其他channel中的任务。适合连接数多,但流量较少的场景
若事件未就绪,调用 selector 的 select() 方法会阻塞线程,直到 channel 发生了就绪事件。这些事件就绪后,select 方法就会返回这些事件交给 thread 来处理
2、ByteBuffer 使用案例 使用方式
向 buffer 写入数据,例如调用 channel.read(buffer)
调用 flip() 切换至
读模式
flip会使得buffer中的limit变为position,position变为0
从 buffer 读取数据,例如调用 buffer.get()
调用 clear() 或者compact()切换至
写模式
调用clear()方法时position=0,limit变为capacity
调用compact()方法时,会将缓冲区中的未读数据压缩到缓冲区前面
重复以上步骤
使用ByteBuffer读取文件中的内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class TestByteBuffer { public static void main (String[] args) { try (FileChannel channel = new FileInputStream ("stu.txt" ).getChannel()){ ByteBuffer buffer = ByteBuffer.allocate(10 ); int read = 0 ; StringBuilder builder = new StringBuilder (); while ((read =channel.read(buffer))>0 ){ buffer.flip(); while (buffer.hasRemaining()){ builder.append((char )buffer.get()); } buffer.clear(); } System.out.println(builder.toString()); } catch (Exception e) { e.printStackTrace(); } finally { } } }
打印结果
核心属性 字节缓冲区的父类Buffer中有几个核心属性,如下
1 2 3 4 5 private int mark = -1 ;private int position = 0 ;private int limit;private int capacity;
capacity :缓冲区的容量。通过构造函数赋予,一旦设置,无法更改
limit :缓冲区的界限。位于limit 后的数据不可读写。缓冲区的限制不能为负,并且不能大于其容量
position :下一个 读写位置的索引(类似PC)。缓冲区的位置不能为负,并且不能大于limit
mark :记录当前position的值。position被改变后,可以通过调用reset() 方法恢复到mark的位置。
以上四个属性必须满足以下要求
mark <= position <= limit <= capacity
核心方法 put()方法
put()方法可以将一个数据放入到缓冲区中。
进行该操作后,postition 的值会 +1,指向下一个可以放入的位置。capacity = limit ,为缓冲区容量的值。
flip()方法
flip()方法会 切换对缓冲区的操作模式 ,由 写->读 / 读->写
进行该操作后
如果是 写模式 -> 读模式,position = 0 , limit 指向最后一个元素的下一个位置,capacity不变
如果是读 -> 写 ,则恢复为put()方法中的值
get()方法
get()方法会读取缓冲区中的一个值
进行该操作后,position 会 +1 ,如果超过了limit则会抛出异常
注意:get(i)方法不会改变position的值
rewind()方法
该方法 只能在读模式下使用
rewind()方法后,会恢复position、limit和capacity的值,变为进行get()前的值
clear()方法
clear()方法会将缓冲区中的各个属性恢复为最初的状态,position = 0, capacity = limit
此时缓冲区的数据依然存在 ,处于“被遗忘”状态,下次进行写操作时会覆盖这些数据
mark()和reset()方法
mark()方法会将postion的值保存到mark属性中
reset()方法会将position的值改为mark中保存的值
compact()方法 此方法为ByteBuffer的方法,而不是Buffer的方法
compact会把未读完的数据向前压缩,然后切换到写模式
数据前移后,原位置的值并未清零,写时会覆盖 之前的值
clear() VS compact() clear 只是对position、limit、mark进行重置,而 compact 在对position进行设置,以及limit、mark进行重置的同时,还涉及到数据在内存中拷贝(会调用array)。所以compact比clear更耗性能。 但 compact 能保存你未读取的数据,将新数据追加到为读取的数据之后;而 clear 则不行,若你调用了clear,则未读取的数据就无法再读取到了
所以需要根据情况来判断使用哪种方法进行模式切换
方法调用及演示 ByteBuffer调试工具类 需要先导入netty依赖
1 2 3 4 5 <dependency > <groupId > io.netty</groupId > <artifactId > netty-all</artifactId > <version > 4.1.51.Final</version > </dependency >
import java.nio.ByteBuffer;import io.netty.util.internal.MathUtil;import io.netty.util.internal.StringUtil;import io.netty.util.internal.MathUtil.*;public class ByteBufferUtil { private static final char [] BYTE2CHAR = new char [256 ]; private static final char [] HEXDUMP_TABLE = new char [256 * 4 ]; private static final String[] HEXPADDING = new String [16 ]; private static final String[] HEXDUMP_ROWPREFIXES = new String [65536 >>> 4 ]; private static final String[] BYTE2HEX = new String [256 ]; private static final String[] BYTEPADDING = new String [16 ]; static { final char [] DIGITS = "0123456789abcdef" .toCharArray(); for (int i = 0 ; i < 256 ; i++) { HEXDUMP_TABLE[i << 1 ] = DIGITS[i >>> 4 & 0x0F ]; HEXDUMP_TABLE[(i << 1 ) + 1 ] = DIGITS[i & 0x0F ]; } int i; for (i = 0 ; i < HEXPADDING.length; i++) { int padding = HEXPADDING.length - i; StringBuilder buf = new StringBuilder (padding * 3 ); for (int j = 0 ; j < padding; j++) { buf.append(" " ); } HEXPADDING[i] = buf.toString(); } for (i = 0 ; i < HEXDUMP_ROWPREFIXES.length; i++) { StringBuilder buf = new StringBuilder (12 ); buf.append(StringUtil.NEWLINE); buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L )); buf.setCharAt(buf.length() - 9 , '|' ); buf.append('|' ); HEXDUMP_ROWPREFIXES[i] = buf.toString(); } for (i = 0 ; i < BYTE2HEX.length; i++) { BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i); } for (i = 0 ; i < BYTEPADDING.length; i++) { int padding = BYTEPADDING.length - i; StringBuilder buf = new StringBuilder (padding); for (int j = 0 ; j < padding; j++) { buf.append(' ' ); } BYTEPADDING[i] = buf.toString(); } for (i = 0 ; i < BYTE2CHAR.length; i++) { if (i <= 0x1f || i >= 0x7f ) { BYTE2CHAR[i] = '.' ; } else { BYTE2CHAR[i] = (char ) i; } } } public static void debugAll (ByteBuffer buffer) { int oldlimit = buffer.limit(); buffer.limit(buffer.capacity()); StringBuilder origin = new StringBuilder (256 ); appendPrettyHexDump(origin, buffer, 0 , buffer.capacity()); System.out.println("+--------+-------------------- all ------------------------+----------------+" ); System.out.printf("position: [%d], limit: [%d]\n" , buffer.position(), oldlimit); System.out.println(origin); buffer.limit(oldlimit); } public static void debugRead (ByteBuffer buffer) { StringBuilder builder = new StringBuilder (256 ); appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position()); System.out.println("+--------+-------------------- read -----------------------+----------------+" ); System.out.printf("position: [%d], limit: [%d]\n" , buffer.position(), buffer.limit()); System.out.println(builder); } private static void appendPrettyHexDump (StringBuilder dump, ByteBuffer buf, int offset, int length) { if (MathUtil.isOutOfBounds(offset, length, buf.capacity())) { throw new IndexOutOfBoundsException ( "expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length + ") <= " + "buf.capacity(" + buf.capacity() + ')' ); } if (length == 0 ) { return ; } dump.append( " +-------------------------------------------------+" + StringUtil.NEWLINE + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |" + StringUtil.NEWLINE + "+--------+-------------------------------------------------+----------------+" ); final int startIndex = offset; final int fullRows = length >>> 4 ; final int remainder = length & 0xF ; for (int row = 0 ; row < fullRows; row++) { int rowStartIndex = (row << 4 ) + startIndex; appendHexDumpRowPrefix(dump, row, rowStartIndex); int rowEndIndex = rowStartIndex + 16 ; for (int j = rowStartIndex; j < rowEndIndex; j++) { dump.append(BYTE2HEX[getUnsignedByte(buf, j)]); } dump.append(" |" ); for (int j = rowStartIndex; j < rowEndIndex; j++) { dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]); } dump.append('|' ); } if (remainder != 0 ) { int rowStartIndex = (fullRows << 4 ) + startIndex; appendHexDumpRowPrefix(dump, fullRows, rowStartIndex); int rowEndIndex = rowStartIndex + remainder; for (int j = rowStartIndex; j < rowEndIndex; j++) { dump.append(BYTE2HEX[getUnsignedByte(buf, j)]); } dump.append(HEXPADDING[remainder]); dump.append(" |" ); for (int j = rowStartIndex; j < rowEndIndex; j++) { dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]); } dump.append(BYTEPADDING[remainder]); dump.append('|' ); } dump.append(StringUtil.NEWLINE + "+--------+-------------------------------------------------+----------------+" ); } private static void appendHexDumpRowPrefix (StringBuilder dump, int row, int rowStartIndex) { if (row < HEXDUMP_ROWPREFIXES.length) { dump.append(HEXDUMP_ROWPREFIXES[row]); } else { dump.append(StringUtil.NEWLINE); dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L )); dump.setCharAt(dump.length() - 9 , '|' ); dump.append('|' ); } } public static short getUnsignedByte (ByteBuffer buffer, int index) { return (short ) (buffer.get(index) & 0xFF ); } }Copy
调用ByteBuffer的方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class TestByteBuffer { public static void main (String[] args) { ByteBuffer buffer = ByteBuffer.allocate(10 ); buffer.put((byte )97 ); ByteBufferUtil.debugAll(buffer); buffer.put(new byte []{98 , 99 , 100 , 101 }); ByteBufferUtil.debugAll(buffer); buffer.flip(); ByteBufferUtil.debugAll(buffer); System.out.println(buffer.get()); System.out.println(buffer.get()); ByteBufferUtil.debugAll(buffer); buffer.compact(); ByteBufferUtil.debugAll(buffer); buffer.put((byte )102 ); buffer.put((byte )103 ); ByteBufferUtil.debugAll(buffer); } }
运行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 +--------+-------------------- all ------------------------+----------------+ position: [1 ], limit: [10 ] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 61 00 00 00 00 00 00 00 00 00 |a......... | +--------+-------------------------------------------------+----------------+ +--------+-------------------- all ------------------------+----------------+ position: [5 ], limit: [10 ] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 61 62 63 64 65 00 00 00 00 00 |abcde..... | +--------+-------------------------------------------------+----------------+ +--------+-------------------- all ------------------------+----------------+ position: [0 ], limit: [5 ] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 61 62 63 64 65 00 00 00 00 00 |abcde..... | +--------+-------------------------------------------------+----------------+ 97 98 +--------+-------------------- all ------------------------+----------------+ position: [2 ], limit: [5 ] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 61 62 63 64 65 00 00 00 00 00 |abcde..... | +--------+-------------------------------------------------+----------------+ +--------+-------------------- all ------------------------+----------------+ position: [3 ], limit: [10 ] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 63 64 65 64 65 00 00 00 00 00 |cdede..... | +--------+-------------------------------------------------+----------------+ +--------+-------------------- all ------------------------+----------------+ position: [5 ], limit: [10 ] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 63 64 65 66 67 00 00 00 00 00 |cdefg..... | +--------+-------------------------------------------------+----------------+
字符串与ByteBuffer的相互转换 方法一 编码 :字符串调用getByte方法获得byte数组,将byte数组放入ByteBuffer中
解码 :先调用ByteBuffer的flip方法,然后通过StandardCharsets的decoder方法解码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class Translate { public static void main (String[] args) { String str1 = "hello" ; String str2 = "" ; ByteBuffer buffer1 = ByteBuffer.allocate(16 ); buffer1.put(str1.getBytes()); ByteBufferUtil.debugAll(buffer1); buffer1.flip(); str2 = StandardCharsets.UTF_8.decode(buffer1).toString(); System.out.println(str2); ByteBufferUtil.debugAll(buffer1); } }
运行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 +--------+-------------------- all ------------------------+----------------+ position: [5 ], limit: [16 ] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 68 65 6c 6c 6f 00 00 00 00 00 00 00 00 00 00 00 |hello...........| +--------+-------------------------------------------------+----------------+ hello +--------+-------------------- all ------------------------+----------------+ position: [5 ], limit: [5 ] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 68 65 6c 6c 6f 00 00 00 00 00 00 00 00 00 00 00 |hello...........| +--------+-------------------------------------------------+----------------+
方法二 编码 :通过 StandardCharsets 的 encode 方法获得ByteBuffer,此时获得的ByteBuffer为读模式,无需通过flip切换模式
解码 :通过 StandardCharsets 的 decoder 方法解码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class Translate { public static void main (String[] args) { String str1 = "hello" ; String str2 = "" ; ByteBuffer buffer1 = StandardCharsets.UTF_8.encode(str1); ByteBufferUtil.debugAll(buffer1); str2 = StandardCharsets.UTF_8.decode(buffer1).toString(); System.out.println(str2); ByteBufferUtil.debugAll(buffer1); } }
运行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 +--------+-------------------- all ------------------------+----------------+ position: [0 ], limit: [5 ] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 68 65 6c 6c 6f |hello | +--------+-------------------------------------------------+----------------+ hello +--------+-------------------- all ------------------------+----------------+ position: [5 ], limit: [5 ] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 68 65 6c 6c 6f |hello | +--------+-------------------------------------------------+----------------+
方法三 编码 :字符串调用 getByte() 方法获得字节数组,将字节数组传给 ByteBuffer的wrap() 方法 ,通过该方法获得ByteBuffer。同样无需调用flip方法切换为读模式
解码 :通过 StandardCharsets 的 decoder 方法解码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class Translate { public static void main (String[] args) { String str1 = "hello" ; String str2 = "" ; ByteBuffer buffer1 = ByteBuffer.wrap(str1.getBytes()); ByteBufferUtil.debugAll(buffer1); str2 = StandardCharsets.UTF_8.decode(buffer1).toString(); System.out.println(str2); ByteBufferUtil.debugAll(buffer1); } }
运行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 +--------+-------------------- all ------------------------+----------------+ position: [0 ], limit: [5 ] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 68 65 6c 6c 6f |hello | +--------+-------------------------------------------------+----------------+ hello +--------+-------------------- all ------------------------+----------------+ position: [5 ], limit: [5 ] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 68 65 6c 6c 6f |hello | +--------+-------------------------------------------------+----------------+
粘包与半包 现象 网络上有多条数据发送给服务端,数据之间使用 \n 进行分隔 但由于某种原因这些数据在接收时,被进行了重新组合,例如原始数据有3条为
Hello,world\n
I’m Nyima\n
How are you?\n
变成了下面的两个 byteBuffer (粘包,半包)
Hello,world\nI’m Nyima\nHo
w are you?\n
出现原因 粘包
发送方
在发送数据时,并不是一条一条地发送数据,而是将数据整合在一起 ,当数据达到一定的数量后再一起发送。这就会导致多条信息被放在一个缓冲区中被一起发送出去
半包
接收方
的缓冲区的大小是有限的,当接收方的缓冲区满了以后,就需要将信息截断 ,等缓冲区空了以后再继续放入数据。这就会发生一段完整的数据最后被截断的现象
解决办法
通过get(index)方法遍历ByteBuffer,遇到分隔符时进行处理。
注意
:get(index)不会改变position的值
记录该段数据长度,以便于申请对应大小的缓冲区
将缓冲区的数据通过get()方法写入到target中
调用compact方法 切换模式,因为缓冲区中可能还有未读的数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public class ByteBufferDemo { public static void main (String[] args) { ByteBuffer buffer = ByteBuffer.allocate(32 ); buffer.put("Hello,world\nI'm Nyima\nHo" .getBytes()); split(buffer); buffer.put("w are you?\n" .getBytes()); split(buffer); } private static void split (ByteBuffer buffer) { buffer.flip(); for (int i = 0 ; i < buffer.limit(); i++) { if (buffer.get(i) == '\n' ) { int length = i+1 -buffer.position(); ByteBuffer target = ByteBuffer.allocate(length); for (int j = 0 ; j < length; j++) { target.put(buffer.get()); } ByteBufferUtil.debugAll(target); } } buffer.compact(); } }
运行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 +--------+-------------------- all ------------------------+----------------+ position: [12], limit: [12] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 48 65 6c 6c 6f 2c 77 6f 72 6c 64 0a |Hello,world. | +--------+-------------------------------------------------+----------------+ +--------+-------------------- all ------------------------+----------------+ position: [10], limit: [10] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 49 27 6d 20 4e 79 69 6d 61 0a |I'm Nyima. | +--------+-------------------------------------------------+----------------+ +--------+-------------------- all ------------------------+----------------+ position: [13], limit: [13] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 48 6f 77 20 61 72 65 20 79 6f 75 3f 0a |How are you?. | +--------+-------------------------------------------------+----------------+
二、文件编程 1、FileChannel 工作模式 FileChannel只能在阻塞模式下工作 ,所以无法搭配Selector
获取 不能直接打开 FileChannel,必须 通过 FileInputStream、FileOutputStream 或者 RandomAccessFile 来获取 FileChannel,它们都有 getChannel 方法
通过 FileInputStream 获取的 channel 只能读
通过 FileOutputStream 获取的 channel 只能写
通过 RandomAccessFile 是否能读写根据构造 RandomAccessFile 时的读写模式决定
读取 通过 FileInputStream 获取channel,通过read方法将数据写入到ByteBuffer中
read方法的返回值表示读到了多少字节,若读到了文件末尾则返回-1
1 int readBytes = channel.read(buffer);
可根据返回值判断是否读取完毕
1 2 3 4 while (channel.read(buffer) > 0 ) { ... }
写入 因为channel也是有大小的,所以 write 方法并不能保证一次将 buffer 中的内容全部写入 channel。必须需要按照以下规则进行写入
1 2 3 4 while (buffer.hasRemaining()) { channel.write(buffer); }
关闭 通道需要close,一般情况通过try-with-resource进行关闭,最好使用以下方法获取strea以及channel,避免某些原因使得资源未被关闭
1 2 3 4 5 6 7 8 9 10 11 12 13 public class TestChannel { public static void main (String[] args) throws IOException { try (FileInputStream fis = new FileInputStream ("stu.txt" ); FileOutputStream fos = new FileOutputStream ("student.txt" ); FileChannel inputChannel = fis.getChannel(); FileChannel outputChannel = fos.getChannel()) { ... } } }
位置 position
channel也拥有一个保存读取数据位置的属性,即position
1 long pos = channel.position();
可以通过position(int pos)设置channel中position的值
1 2 long newPos = ...;channel.position(newPos);
设置当前位置时,如果设置为文件的末尾
这时读取会返回 -1
这时写入,会追加内容,但要注意如果 position 超过了文件末尾,再写入时在新内容和原末尾之间会有空洞(00)
强制写入 操作系统出于性能的考虑,会将数据缓存,不是立刻写入磁盘,而是等到缓存满了以后将所有数据一次性的写入磁盘。可以调用 force(true) 方法将文件内容和元数据(文件的权限等信息)立刻写入磁盘
2、两个Channel传输数据 transferTo方法 使用transferTo方法可以快速、高效地将一个channel中的数据传输到另一个channel中,但一次只能传输2G的内容
transferTo底层使用了零拷贝技术
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class TestChannel { public static void main (String[] args) { try (FileInputStream fis = new FileInputStream ("stu.txt" ); FileOutputStream fos = new FileOutputStream ("student.txt" ); FileChannel inputChannel = fis.getChannel(); FileChannel outputChannel = fos.getChannel()) { inputChannel.transferTo(0 , inputChannel.size(), outputChannel); } catch (IOException e) { e.printStackTrace(); } } }
当传输的文件大于2G 时,需要使用以下方法进行多次传输
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class TestChannel { public static void main (String[] args) { try (FileInputStream fis = new FileInputStream ("stu.txt" ); FileOutputStream fos = new FileOutputStream ("student.txt" ); FileChannel inputChannel = fis.getChannel(); FileChannel outputChannel = fos.getChannel()) { long size = inputChannel.size(); long capacity = inputChannel.size(); while (capacity > 0 ) { capacity -= inputChannel.transferTo(size-capacity, capacity, outputChannel); } } catch (IOException e) { e.printStackTrace(); } } }
3、Path与Paths
Path 用来表示文件路径
Paths 是工具类,用来获取 Path 实例
1 2 3 4 5 6 7 Path source = Paths.get("1.txt" ); Path source = Paths.get("d:\\1.txt" ); Path source = Paths.get("d:/1.txt" ); Path projects = Paths.get("d:\\data" , "projects" );
例如目录结构如下
1 2 3 4 5 d: |- data |- projects |- a |- b
代码
1 2 3 Path path = Paths.get("d:\\data\\projects\\a\\..\\b" );System.out.println(path); System.out.println(path.normalize());
输出结果为
1 2 d:\data\projects\a\..\b d:\data\projects\b
4、Files 查找 检查文件是否存在
1 2 Path path = Paths.get("helloword/data.txt" );System.out.println(Files.exists(path));
创建 创建一级目录
1 2 Path path = Paths.get("helloword/d1" );Files.createDirectory(path);
如果目录已存在,会抛异常 FileAlreadyExistsException
不能一次创建多级目录,否则会抛异常 NoSuchFileException
创建多级目录用
1 2 Path path = Paths.get("helloword/d1/d2" );Files.createDirectories(path);
拷贝及移动 拷贝文件
1 2 3 4 Path source = Paths.get("helloword/data.txt" );Path target = Paths.get("helloword/target.txt" );Files.copy(source, target);
如果文件已存在,会抛异常 FileAlreadyExistsException
如果希望用 source 覆盖 掉 target,需要用 StandardOption 来控制
1 Files.copy(source, target, StandardOption.REPLACE_EXISTING);
移动文件
1 2 3 4 Path source = Paths.get("helloword/data.txt" );Path target = Paths.get("helloword/data.txt" );Files.move(source, target, StandardOption.ATOMIC_MOVE);
StandardOption.ATOMIC_MOVE 保证文件移动的原子性
删除 删除文件
1 2 3 Path target = Paths.get("helloword/target.txt" );Files.delete(target);
如果文件不存在,会抛异常 NoSuchFileException
删除目录
1 2 3 Path target = Paths.get("helloword/d1" );Files.delete(target);
如果目录还有内容 ,会抛异常 DirectoryNotEmptyException
遍历 可以使用Files工具类中的walkFileTree(Path, FileVisitor)方法 ,其中需要传入两个参数
Path:文件起始路径
FileVisitor:文件访问器,
使用访问者模式
接口的实现类
SimpleFileVisitor
有四个方法
preVisitDirectory:访问目录前的操作
visitFile:访问文件的操作
visitFileFailed:访问文件失败时的操作
postVisitDirectory:访问目录后的操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class TestFiles { public static void main (String[] args) throws IOException { AtomicInteger ditCount = new AtomicInteger (); AtomicInteger fileCount = new AtomicInteger (); Files.walkFileTree(Paths.get("D:\\Program Files\\jdk7" ),new SimpleFileVisitor <Path>(){ @Override public FileVisitResult preVisitDirectory (Path dir, BasicFileAttributes attrs) throws IOException { System.err.println("=====>" +dir); ditCount.incrementAndGet(); return super .preVisitDirectory(dir, attrs); } @Override public FileVisitResult visitFile (Path file, BasicFileAttributes attrs) throws IOException { System.out.println("=====>" +file); fileCount.incrementAndGet(); return super .visitFile(file, attrs); } }); System.out.println("dir count :" +ditCount); System.out.println("file count :" +fileCount); } }
运行结果如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 ... =====>D:\Program Files\jdk7\jre7\lib\zi\SystemV\EST5EDT =====>D:\Program Files\jdk7\jre7\lib\zi\SystemV\HST10 =====>D:\Program Files\jdk7\jre7\lib\zi\SystemV\MST7 =====>D:\Program Files\jdk7\jre7\lib\zi\SystemV\MST7MDT =====>D:\Program Files\jdk7\jre7\lib\zi\SystemV\PST8 =====>D:\Program Files\jdk7\jre7\lib\zi\SystemV\PST8PDT =====>D:\Program Files\jdk7\jre7\lib\zi\SystemV\YST9 =====>D:\Program Files\jdk7\jre7\lib\zi\SystemV\YST9YDT =====>D:\Program Files\jdk7\jre7\lib\zi\WET =====>D:\Program Files\jdk7\jre7\lib\zi\ZoneInfoMappings =====>D:\Program Files\jdk7\jre7\LICENSE =====>D:\Program Files\jdk7\jre7\README.txt =====>D:\Program Files\jdk7\jre7\release =====>D:\Program Files\jdk7\jre7\THIRDPARTYLICENSEREADME-JAVAFX.txt =====>D:\Program Files\jdk7\jre7\THIRDPARTYLICENSEREADME.txt =====>D:\Program Files\jdk7\jre7\Welcome.html dir count :183 file count :2437
三、网络编程 1、阻塞
阻塞模式下,相关方法都会导致线程暂停
ServerSocketChannel.accept 会在没有连接建立时 让线程暂停
SocketChannel.read 会在通道中没有数据可读时 让线程暂停
阻塞的表现其实就是线程暂停了,暂停期间不会占用 cpu,但线程相当于闲置
单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程支持
但多线程下,有新的问题,体现在以下方面
32 位 jvm 一个线程 320k,64 位 jvm 一个线程 1024k,如果连接数过多,必然导致 OOM,并且线程太多,反而会因为频繁上下文切换导致性能降低
可以采用线程池技术来减少线程数和线程上下文切换,但治标不治本,如果有很多连接建立,但长时间 inactive,会阻塞线程池中所有线程,因此不适合长连接,只适合短连接
服务端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class Server { public static void main (String[] args) { ByteBuffer buffer = ByteBuffer.allocate(16 ); try (ServerSocketChannel server = ServerSocketChannel.open()) { server.bind(new InetSocketAddress (8080 )); ArrayList<SocketChannel> channels = new ArrayList <>(); while (true ) { System.out.println("before connecting..." ); SocketChannel socketChannel = server.accept(); System.out.println("after connecting..." ); channels.add(socketChannel); for (SocketChannel channel : channels) { System.out.println("before reading" ); channel.read(buffer); buffer.flip(); ByteBufferUtil.debugRead(buffer); buffer.clear(); System.out.println("after reading" ); } } } catch (IOException e) { e.printStackTrace(); } } }
客户端代码
1 2 3 4 5 6 7 8 9 10 11 public class Client { public static void main (String[] args) { try (SocketChannel socketChannel = SocketChannel.open()) { socketChannel.connect(new InetSocketAddress ("localhost" , 8080 )); System.out.println("waiting..." ); } catch (IOException e) { e.printStackTrace(); } } }
运行结果
客户端-服务器建立连接前:服务器端因accept阻塞
客户端-服务器建立连接后,客户端发送消息前:服务器端因通道为空被阻塞
客户端发送数据后,服务器处理通道中的数据。再次进入循环时,再次被accept阻塞
之前的客户端再次发送消息,服务器端因为被accept阻塞 ,无法处理之前客户端发送到通道中的信息
2、非阻塞
可以通过ServerSocketChannel的configureBlocking(false )方法将获得连接设置为非阻塞的 。此时若没有连接,accept会返回null
可以通过SocketChannel的configureBlocking(false )方法将从通道中读取数据设置为非阻塞的 。若此时通道中没有数据可读,read会返回-1
服务器代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public class Server { public static void main (String[] args) { ByteBuffer buffer = ByteBuffer.allocate(16 ); try (ServerSocketChannel server = ServerSocketChannel.open()) { server.configureBlocking(false ); server.bind(new InetSocketAddress (8080 )); ArrayList<SocketChannel> channels = new ArrayList <>(); while (true ) { SocketChannel socketChannel = server.accept(); if (socketChannel != null ) { System.out.println("after connecting..." ); channels.add(socketChannel); } for (SocketChannel channel : channels) { channel.configureBlocking(false ); int read = channel.read(buffer); if (read > 0 ) { buffer.flip(); ByteBufferUtil.debugRead(buffer); buffer.clear(); System.out.println("after reading" ); } } } } catch (IOException e) { e.printStackTrace(); } } }
这样写存在一个问题,因为设置为了非阻塞,会一直执行while(true)中的代码,CPU一直处于忙碌状态,会使得性能变低,所以实际情况中不使用这种方法处理请求
3、Selector 多路复用 单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控,这称之为多路复用
多路复用仅针对网络 IO ,普通文件 IO 无法 利用多路复用
如果不用 Selector 的非阻塞模式,线程大部分时间都在做无用功,而 Selector 能够保证
有可连接事件时才去连接
有可读事件才去读取
有可写事件才去写入
限于网络传输能力,Channel 未必时时可写,一旦 Channel 可写,会触发 Selector 的可写事件
4、使用及Accpet事件 要使用Selector实现多路复用,服务端代码如下改进
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public class SelectServer { public static void main (String[] args) { ByteBuffer buffer = ByteBuffer.allocate(16 ); try (ServerSocketChannel server = ServerSocketChannel.open()) { server.bind(new InetSocketAddress (8080 )); Selector selector = Selector.open(); server.configureBlocking(false ); server.register(selector, SelectionKey.OP_ACCEPT); while (true ) { int ready = selector.select(); System.out.println("selector ready counts : " + ready); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isAcceptable()) { ServerSocketChannel channel = (ServerSocketChannel) key.channel(); System.out.println("before accepting..." ); SocketChannel socketChannel = channel.accept(); System.out.println("after accepting..." ); iterator.remove(); } } } } catch (IOException e) { e.printStackTrace(); } } }
步骤解析
1 Selector selector = Selector.open();
1 2 3 4 server.configureBlocking(false ); server.register(selector, SelectionKey.OP_ACCEPT);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isAcceptable()) { ServerSocketChannel channel = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = channel.accept(); iterator.remove(); } }
事件发生后能否不处理
事件发生后,要么处理,要么取消(cancel) ,不能什么都不做,否则下次该事件仍会触发 ,这是因为 nio 底层使用的是水平触发
5、Read事件
在Accept事件中,若有客户端与服务器端建立了连接,需要将其对应的SocketChannel设置为非阻塞,并注册到选择其中
添加Read事件,触发后进行读取操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 public class SelectServer { public static void main (String[] args) { ByteBuffer buffer = ByteBuffer.allocate(16 ); try (ServerSocketChannel server = ServerSocketChannel.open()) { server.bind(new InetSocketAddress (8080 )); Selector selector = Selector.open(); server.configureBlocking(false ); server.register(selector, SelectionKey.OP_ACCEPT); while (true ) { int ready = selector.select(); System.out.println("selector ready counts : " + ready); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isAcceptable()) { ServerSocketChannel channel = (ServerSocketChannel) key.channel(); System.out.println("before accepting..." ); SocketChannel socketChannel = channel.accept(); System.out.println("after accepting..." ); socketChannel.configureBlocking(false ); socketChannel.register(selector, SelectionKey.OP_READ); iterator.remove(); } else if (key.isReadable()) { SocketChannel channel = (SocketChannel) key.channel(); System.out.println("before reading..." ); channel.read(buffer); System.out.println("after reading..." ); buffer.flip(); ByteBufferUtil.debugRead(buffer); buffer.clear(); iterator.remove(); } } } } catch (IOException e) { e.printStackTrace(); } } }
删除事件
当处理完一个事件后,一定要调用迭代器的remove方法移除对应事件,否则会出现错误 。原因如下
以我们上面的 Read事件 的代码为例
当调用了 server.register(selector, SelectionKey.OP_ACCEPT)后,Selector中维护了一个集合,用于存放SelectionKey以及其对应的通道
1 2 private SelectionKeyImpl[] channelArray = new SelectionKeyImpl [8 ];
1 2 3 4 5 public class SelectionKeyImpl extends AbstractSelectionKey { final SelChImpl channel; ... }
当选择器中的通道对应的事件发生后 ,selecionKey会被放到另一个集合中,但是selecionKey不会自动移除 ,所以需要我们在处理完一个事件后,通过迭代器手动移除其中的selecionKey。否则会导致已被处理过的事件再次被处理,就会引发错误
断开处理 当客户端与服务器之间的连接断开时,会给服务器端发送一个读事件 ,对异常断开和正常断开需要加以不同的方式进行处理
正常断开
异常断开
异常断开时,会抛出IOException异常, 在try-catch的catch块中捕获异常并调用key的cancel方法即可
消息边界 不处理消息边界存在的问题
将缓冲区的大小设置为4个字节,发送2个汉字(你好),通过decode解码并打印时,会出现乱码
1 2 3 4 5 ByteBuffer buffer = ByteBuffer.allocate(4 );System.out.println(StandardCharsets.UTF_8.decode(buffer)); 你� ��
这是因为UTF-8字符集下,1个汉字占用3个字节,此时缓冲区大小为4个字节,一次读时间无法处理完通道中的所有数据,所以一共会触发两次读事件 。这就导致 你好
的 好
字被拆分为了前半部分和后半部分发送,解码时就会出现问题
处理消息边界
传输的文本可能有以下三种情况
解决思路大致有以下三种
固定消息长度 ,数据包大小一样,服务器按预定长度读取,当发送的数据较少时,需要将数据进行填充,直到长度与消息规定长度一致。缺点是浪费带宽
另一种思路是按分隔符拆分,缺点是效率低,需要一个一个字符地去匹配分隔符
TLV 格式,即 Type 类型、Length 长度、Value 数据
(也就是在消息开头用一些空间存放后面数据的长度),如HTTP请求头中的 Content-Type 与 Content-Length
。类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量
Http 2.0 是 LTV 格式
下文的消息边界处理方式为第二种:按分隔符拆分
附件与扩容
Channel的register方法还有第三个参数 :附件
,可以向其中放入一个Object类型的对象,该对象会与登记的Channel以及其对应的SelectionKey绑定,可以从SelectionKey获取到对应通道的附件
1 public final SelectionKey register (Selector sel, int ops, Object att)
可通过SelectionKey的attachment()方法获得附件
1 ByteBuffer buffer = (ByteBuffer) key.attachment();
我们需要在Accept事件发生后,将通道注册到Selector中时,对每个通道添加一个ByteBuffer附件 ,让每个通道发生读事件时都使用自己的通道,避免与其他通道发生冲突而导致问题
1 2 3 4 5 socketChannel.configureBlocking(false ); ByteBuffer buffer = ByteBuffer.allocate(16 );socketChannel.register(selector, SelectionKey.OP_READ, buffer);
当Channel中的数据大于缓冲区时,需要对缓冲区进行扩容 操作。此代码中的扩容的判定方法:Channel调用compact方法后,的position与limit相等,说明缓冲区中的数据并未被读取(容量太小),此时创建新的缓冲区,其大小扩大为两倍。同时还要将旧缓冲区中的数据拷贝到新的缓冲区中,同时调用SelectionKey的attach方法将新的缓冲区作为新的附件放入SelectionKey中
1 2 3 4 5 6 7 8 if (buffer.position() == buffer.limit()) { ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2 ); ewBuffer.put(buffer); key.attach(newBuffer); }
改造后的服务器代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 public class SelectServer { public static void main (String[] args) { try (ServerSocketChannel server = ServerSocketChannel.open()) { server.bind(new InetSocketAddress (8080 )); Selector selector = Selector.open(); server.configureBlocking(false ); server.register(selector, SelectionKey.OP_ACCEPT); while (true ) { int ready = selector.select(); System.out.println("selector ready counts : " + ready); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isAcceptable()) { ServerSocketChannel channel = (ServerSocketChannel) key.channel(); System.out.println("before accepting..." ); SocketChannel socketChannel = channel.accept(); System.out.println("after accepting..." ); socketChannel.configureBlocking(false ); ByteBuffer buffer = ByteBuffer.allocate(16 ); socketChannel.register(selector, SelectionKey.OP_READ, buffer); } else if (key.isReadable()) { SocketChannel channel = (SocketChannel) key.channel(); System.out.println("before reading..." ); ByteBuffer buffer = (ByteBuffer) key.attachment(); int read = channel.read(buffer); if (read == -1 ) { key.cancel(); channel.close(); } else { split(buffer); if (buffer.position() == buffer.limit()) { ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2 ); buffer.flip(); newBuffer.put(buffer); key.attach(newBuffer); } } System.out.println("after reading..." ); } } } } catch (IOException e) { e.printStackTrace(); } } private static void split (ByteBuffer buffer) { buffer.flip(); for (int i = 0 ; i < buffer.limit(); i++) { if (buffer.get(i) == '\n' ) { int length = i+1 -buffer.position(); ByteBuffer target = ByteBuffer.allocate(length); for (int j = 0 ; j < length; j++) { target.put(buffer.get()); } ByteBufferUtil.debugAll(target); } } buffer.compact(); } }
ByteBuffer的大小分配
每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用 ,因此需要为每个 channel 维护一个独立的 ByteBuffer
ByteBuffer 不能太大,比如一个 ByteBuffer 1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的 ByteBuffer
分配思路可以参考
一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能
另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗
6、Write事件 服务器通过Buffer向通道中写入数据时,可能因为通道容量小于Buffer中的数据大小,导致无法一次性将Buffer中的数据全部写入到Channel中,这时便需要分多次写入 ,具体步骤如下
执行一次写操作,向将buffer中的内容写入到SocketChannel中,然后判断Buffer中是否还有数据
若Buffer中还有数据,则需要将SockerChannel注册到Seletor中,并关注写事件,同时将未写完的Buffer作为附件一起放入到SelectionKey中
1 2 3 4 5 6 7 int write = socket.write(buffer); if (buffer.hasRemaining()) { socket.configureBlocking(false ); socket.register(selector, SelectionKey.OP_WRITE, buffer); }
添加写事件的相关操作key.isWritable()
,对Buffer再次进行写操作
每次写后需要判断Buffer中是否还有数据(是否写完)。若写完,需要移除SelecionKey中的Buffer附件,避免其占用过多内存,同时还需移除对写事件的关注
1 2 3 4 5 6 7 8 9 10 11 SocketChannel socket = (SocketChannel) key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();int write = socket.write(buffer);System.out.println(write); if (!buffer.hasRemaining()) { key.attach(null ); key.interestOps(0 ); }
整体代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 public class WriteServer { public static void main (String[] args) { try (ServerSocketChannel server = ServerSocketChannel.open()) { server.bind(new InetSocketAddress (8080 )); server.configureBlocking(false ); Selector selector = Selector.open(); server.register(selector, SelectionKey.OP_ACCEPT); while (true ) { selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isAcceptable()) { SocketChannel socket = server.accept(); StringBuilder builder = new StringBuilder (); for (int i = 0 ; i < 500000000 ; i++) { builder.append("a" ); } ByteBuffer buffer = StandardCharsets.UTF_8.encode(builder.toString()); int write = socket.write(buffer); System.out.println(write); if (buffer.hasRemaining()) { socket.configureBlocking(false ); socket.register(selector, SelectionKey.OP_WRITE, buffer); } } else if (key.isWritable()) { SocketChannel socket = (SocketChannel) key.channel(); ByteBuffer buffer = (ByteBuffer) key.attachment(); int write = socket.write(buffer); System.out.println(write); if (!buffer.hasRemaining()) { key.attach(null ); key.interestOps(0 ); } } } } } catch (IOException e) { e.printStackTrace(); } } }
7、优化 多线程优化 充分利用多核CPU,分两组选择器
单线程配一个选择器(Boss),专门处理 accept 事件
创建 cpu 核心数的线程(Worker),每个线程配一个选择器,轮流处理 read 事件
实现思路
实现代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 public class ThreadsServer { public static void main (String[] args) { try (ServerSocketChannel server = ServerSocketChannel.open()) { Thread.currentThread().setName("Boss" ); server.bind(new InetSocketAddress (8080 )); Selector boss = Selector.open(); server.configureBlocking(false ); server.register(boss, SelectionKey.OP_ACCEPT); Worker[] workers = new Worker [4 ]; AtomicInteger robin = new AtomicInteger (0 ); for (int i = 0 ; i < workers.length; i++) { workers[i] = new Worker ("worker-" +i); } while (true ) { boss.select(); Set<SelectionKey> selectionKeys = boss.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isAcceptable()) { SocketChannel socket = server.accept(); System.out.println("connected... " ); socket.configureBlocking(false ); System.out.println("before read..." ); workers[robin.getAndIncrement()% workers.length].register(socket); System.out.println("after read..." ); } } } } catch (IOException e) { e.printStackTrace(); } } static class Worker implements Runnable { private Thread thread; private volatile Selector selector; private String name; private volatile boolean started = false ; private ConcurrentLinkedQueue<Runnable> queue; public Worker (String name) { this .name = name; } public void register (final SocketChannel socket) throws IOException { if (!started) { thread = new Thread (this , name); selector = Selector.open(); queue = new ConcurrentLinkedQueue <>(); thread.start(); started = true ; } queue.add(new Runnable () { @Override public void run () { try { socket.register(selector, SelectionKey.OP_READ); } catch (IOException e) { e.printStackTrace(); } } }); selector.wakeup(); } @Override public void run () { while (true ) { try { selector.select(); Runnable task = queue.poll(); if (task != null ) { task.run(); } Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isReadable()) { SocketChannel socket = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(16 ); socket.read(buffer); buffer.flip(); ByteBufferUtil.debugAll(buffer); } } } catch (IOException e) { e.printStackTrace(); } } } } }
四、NIO与BIO 1、Stream与Channel
2、IO模型
当调用一次 channel.read 或 stream.read 后,会由用户态切换至操作系统内核态来完成真正数据读取,而读取又分为两个阶段,分别为:
等待数据阶段
复制数据阶段
根据UNIX 网络编程 - 卷 I,IO模型主要有以下几种
阻塞IO
用户线程进行read操作时,需要等待操作系统执行实际的read操作 ,此期间用户线程是被阻塞的,无法执行其他操作
非阻塞IO
多路复用
Java中通过Selector实现多路复用
当没有事件是,调用select方法会被阻塞住
一旦有一个或多个事件发生后,就会处理对应的事件,从而实现多路复用
多路复用与阻塞IO的区别
阻塞IO模式下,若线程因accept事件被阻塞,发生read事件后,仍需等待accept事件执行完成后 ,才能去处理read事件
多路复用模式下,一个事件发生后,若另一个事件处于阻塞状态,不会影响该事件的执行
异步IO
线程1调用方法后理解返回,不会被阻塞也不需要立即获取结果
当方法的运行结果出来以后,由线程2将结果返回给线程1
3、零拷贝 零拷贝指的是数据无需拷贝到 JVM 内存中 ,同时具有以下三个优点
更少的用户态与内核态的切换
不利用 cpu 计算,减少 cpu 缓存伪共享
零拷贝适合小文件传输
传统 IO 问题 传统的 IO 将一个文件通过 socket 写出
1 2 3 4 5 6 7 8 File f = new File("helloword/data.txt"); RandomAccessFile file = new RandomAccessFile(file, "r"); byte[] buf = new byte[(int)f.length()]; file.read(buf); Socket socket = ...; socket.getOutputStream().write(buf);
内部工作流如下
Java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 Java 程序的用户态切换至内核态 ,去调用操作系统(Kernel)的读能力,将数据读入内核缓冲区 。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access)来实现文件读,其间也不会使用 CPU
DMA 也可以理解为硬件单元,用来解放 cpu 完成文件 IO
从内核态 切换回用户态 ,将数据从内核缓冲区 读入用户缓冲区 (即 byte[] buf),这期间 CPU 会参与拷贝 ,无法利用 DMA
调用 write 方法,这时将数据从用户缓冲区 (byte[] buf)写入 socket 缓冲区,CPU 会参与拷贝
接下来要向网卡写数据,这项能力 Java 又不具备,因此又得从用户态 切换至内核态 ,调用操作系统的写能力,使用 DMA 将 socket 缓冲区 的数据写入网卡,不会使用 CPU
可以看到中间环节较多,java 的 IO 实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统来完成的
用户态与内核态的切换发生了 3 次,这个操作比较重量级
数据拷贝了共 4 次
NIO 优化 通过 DirectByteBuf
大部分步骤与优化前相同,唯有一点:Java 可以使用 DirectByteBuffer 将堆外内存映射到 JVM 内存中来直接访问使用
这块内存不受 JVM 垃圾回收的影响,因此内存地址固定,有助于 IO 读写
Java 中的 DirectByteBuf 对象仅维护了此内存的虚引用,内存回收分成两步
DirectByteBuffer 对象被垃圾回收,将虚引用加入引用队列
当引用的对象ByteBuffer被垃圾回收以后,虚引用对象Cleaner就会被放入引用队列中,然后调用Cleaner的clean方法来释放直接内存
DirectByteBuffer 的释放底层调用的是 Unsafe 的 freeMemory 方法
通过专门线程访问引用队列,根据虚引用释放堆外内存
减少了一次数据拷贝,用户态与内核态的切换次数没有减少
进一步优化1 以下两种方式都是零拷贝 ,即无需将数据拷贝到用户缓冲区中(JVM内存中)
底层采用了 linux 2.1 后提供的 sendFile 方法,Java 中对应着两个 channel 调用 transferTo/transferFrom 方法拷贝数据
Java 调用 transferTo 方法后,要从 Java 程序的用户态 切换至内核态 ,使用 DMA将数据读入内核缓冲区 ,不会使用 CPU
数据从内核缓冲区 传输到 socket 缓冲区 ,CPU 会参与拷贝
最后使用 DMA 将 socket 缓冲区 的数据写入网卡,不会使用 CPU
这种方法下
只发生了1次用户态与内核态的切换
数据拷贝了 3 次
进一步优化2 linux 2.4 对上述方法再次进行了优化
Java 调用 transferTo 方法后,要从 Java 程序的用户态 切换至内核态 ,使用 DMA将数据读入内核缓冲区 ,不会使用 CPU
只会将一些 offset 和 length 信息拷入 socket 缓冲区 ,几乎无消耗
使用 DMA 将 内核缓冲区 的数据写入网卡,不会使用 CPU
整个过程仅只发生了1次用户态与内核态的切换,数据拷贝了 2 次
4、AIO AIO 用来解决数据复制阶段的阻塞问题
同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置
异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果
异步模型需要底层操作系统(Kernel)提供支持
Windows 系统通过 IOCP 实现了真正的异步 IO
Linux 系统异步 IO 在 2.6 版本引入,但其底层实现还是用多路复用模拟了异步 IO,性能没有优势