通道基础
基本
- 顶层的Channel接口,只有两个操作: 检查一个通道是否打开(isOpen())和关闭一个打开的通道(close())
public interface Channel{ public boolean isOpen();//通道是否开启 public void close() throws IOException;//关闭打开的通道 }
- 通道是访问I/O服务的导管,I/O可以分为广义的两大类:File I/O与Stream I/O,相应的有两种类型的通道与之对应,FileChannel类和socket通道类:SocketChannel丶ServerSocketChannel丶DatagramChannel(UDP)
- 顶层的Channel接口,只有两个操作: 检查一个通道是否打开(isOpen())和关闭一个打开的通道(close())
打开通道
//socket通道可以直接使用其工厂方法open()打开 SocketChannel sc = SocketChannel.open(); sc.connect (new InetSocketAddress ("somehost", someport)); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.socket().bind (new InetSocketAddress (somelocalport)); DatagramChannel dc = DatagramChannel.open(); //FileChannel只能通过RandomAccessFile丶FileInputStream丶FileOutputStream获取 RandomAccessFile raf = new RandomAccessFile ("somefile", "r"); FileChannel fc = raf.getChannel( ); //Channels工厂类 Channels.newChannel(System.in);
使用通道
- 通道可以是单向的(unidirectional)或者双向的(bidirectional),一个类实现ReadableByteChannel接口或者WritableByteChannel接口其中一个则为单向,全部实现则为双向(可以双向传输数据),
- 对于file文件可以在不同的时候以不同的权限打开,FileInputStream对象总是以read-only的权限打开文件,因此此对象的getChannel()方法虽然实现了两个接口,但是却是只读的,如果进行write()操作就会抛出异常
- ByteChannel的read()与write()方法使用ByteBuffer对象作为参数,两种方法均返回已传输的字节数,如果缓冲区没有满,则可重新提交给通道从上次中断的地方继续传输,直到缓冲区hasRemaining()方法返回false
- 通道可以以阻塞(blocking)或非阻塞(nonblocking)模式运行,非阻塞模式的通道不会让调用线程休眠,请求的操作要么立即完成,要么返回一个未进行任何操作的结果,只有面向流的通道,如sockets和pipes才能使用非阻塞模式
//读取接口 public interface ReadableByteChannel extends Channel{ public int read (ByteBuffer dst) throws IOException; } //写入接口 public interface WritableByteChannel extends Channel{ public int write (ByteBuffer src) throws IOException; }
关闭通道
- 通道不能被重复使用,当通道关闭时,与特定I/O服务的特定连接就会丢失,然后通道将不再连接任何东西
- 调用通道的close()方法时,可能会导致在关闭底层I/O服务的过程中线程暂时阻塞(Socket通道关闭花费时间较长),哪怕该通道是非阻塞模式,多线程同时close()时,第一个阻塞,后面会跟着阻塞,直到已关闭,再调用close()不会产生任何操作
- isOpen()方法检测通道的开放状态
- 通道引入了一些与关闭和中断有关的新行为,对于一个实现了InterruptibleChannel接口的通道,如果一个线程在该通道上被阻塞同时被中断,那么该通道将被关闭,被阻塞线程会产生ClosedByInterruptException异常
- 一个被设置中断状态(interrupt status)的线程试图访问一个通道时,那么这个通道将立即被关闭,同时抛出ClosedByInterruptException异常
- 可中断的通道是可以异步关闭的,当一个通道被关闭时,休眠在该通道上的所有线程都将被唤醒并受到一个AsynchronousCloseException异常,接着通道就被关闭将不再可用
Scatter/Gather(分散/聚集)
- Scatter/Gather是一个简单却强大的概念,对于read(ByteBuffer [] dsts),从通道中读取的数据会按顺序分散(scatter)到多个缓冲区;对于write(ByteBuffer [] dsts),多个缓冲区中的数据会按顺序进入通道(gather)
public interface ScatteringByteChannel extends ReadableByteChannel{ //scatter: 从通道中读取的数据按顺序放入多个缓冲区中 public long read (ByteBuffer [] dsts)throws IOException; //offset为数组索引,指明从哪个缓冲区开始,length为要使用的缓冲区数量 public long read (ByteBuffer [] dsts, int offset, int length)throws IOException; } public interface GatheringByteChannel extends WritableByteChannel{ //gather: 多个缓冲区中的数据按顺序进入通道中 public long write(ByteBuffer[] srcs)throws IOException; public long write(ByteBuffer[] srcs, int offset, int length) throws IOException; }
- Scatter/Gather是一个简单却强大的概念,对于read(ByteBuffer [] dsts),从通道中读取的数据会按顺序分散(scatter)到多个缓冲区;对于write(ByteBuffer [] dsts),多个缓冲区中的数据会按顺序进入通道(gather)
文件通道
基本
文件通道总是阻塞式的,FileChannel实例只能通过一个打开的file对象(RandomAccessFile丶FileInputStream丶FileOutputStream)上调用getChannel()方法获取,获取到的对象与file对象访问权限相同
FileChannel对象是线程安全的,多个进程可以在同一个实例上并发调用方法而不会引起任何问题,但是有些文件操作是单线程的,会阻塞其他线程
public abstract class FileChannel extends AbstractChannel implements ByteChannel, GatheringByteChannel, ScatteringByteChannel { //...部分方法列表 //从通道的position位置读取字节序列放入dst缓冲区 public abstract int read (ByteBuffer dst, long position) //从给定缓冲区向通道从position位置写入字节序列 public abstract int write (ByteBuffer src, long position) public abstract long size()//返回此通道文件的当前大小字节 public abstract long position()//返回此通道的文件位置 public abstract void position (long newPosition)//设置此通道的文件位置 public abstract void truncate (long size)//截断: 将此通道文件截断为指定大小 //参数metadata表示在方法返回值前文件的元数据(metadate)是否也要同步更新到磁盘 public abstract void force (boolean metaData)//强制文件的所有待定修改立即同步到磁盘 public final FileLock lock()//获取此通道文件的排它锁 //获取此通道给定区域的锁定,position:锁定区域起始位置,size:锁的区域大小,shared:true=共享锁,false=独占锁 //获取共享锁时必须以只读的权限打开,获取独占锁时则需要写的权限 public abstract FileLock lock (long position, long size, boolean shared) public final FileLock tryLock()//尝试获取此通道文件的排它锁,会立即返回结果(成功或者失败) //尝试获取此通道给定区域的排它锁 public abstract FileLock tryLock (long position, long size, boolean shared) //将此通道文件的指定区域直接映射到内存中,position:起始位置,size大小 public abstract MappedByteBuffer map (MapMode mode, long position, long size) public static class MapMode { public static final MapMode READ_ONLY //只读映射模式 public static final MapMode READ_WRITE //读/写映射模式 public static final MapMode PRIVATE //私有(写时复制)映射模式 } //将该通道文件直接传输到给定的可写字节通道,position:起始位置,count:传输最大字节数,target:目标通道 public abstract long transferTo (long position, long count, WritableByteChannel target) //从给定的可读字节通道将字节传输到当前通道中,src:源通道,position:起始位置,count:要传输的最大自己数 public abstract long transferFrom (ReadableByteChannel src, long position, long count) }
访问文件
- 每个FileChannel都有一个”file position”的概念,这个position值决定文件中从哪一处的数据接下来将被读或写
- truncate()方法会砍掉你所指定的新size值之外的所有数据,同时position会被设置为新size值,因此无论传入size大于或者小于原始大小都会产生副作用
- 对于position (long newPosition)方法设置通道的position值,如果设置负值会抛异常,如果设置的值超出文件尾,那么再使用read()方法时会返回一个文件尾,使用write()方法会引起文件增长以容纳写入的字节,并可能出现文件空洞(文件尾到写入的位置position之间的空间被称为文件空洞)
- 文件空洞并不占据磁盘空间,直到后续真正写入数据才存入磁盘,在此之前,文件系统会将其解释为0的子串,所以建立文件空洞时速度非常快,没有磁盘I/O
- 文件空洞虽然没有占用磁盘空间,但是文件系统会扣减程序可用磁盘空间数值大小,做到预留,因此基于文件系统的程序会看到文件空洞占据磁盘空间的表象
- 文件空洞可以预先空间占用,如下载10G的内容需要1小时,使用文件空洞占用10G大小,其他程序将无法在此期间使用该空间
文件锁定
public abstract class FileChannel extends AbstractChannel implements ByteChannel, GatheringByteChannel, ScatteringByteChannel { //...other public final FileLock lock()//获取此通道文件的排它锁 //获取此通道给定区域的锁定,position:锁定区域起始位置,size:锁的区域大小,shared:true=共享锁,false=独占锁 //获取共享锁时必须以只读的权限打开,获取独占锁时则需要写的权限 public abstract FileLock lock (long position, long size, boolean shared) public final FileLock tryLock()//尝试获取此通道文件的排它锁,会立即返回结果(成功或者失败) //尝试获取此通道给定区域的排它锁 public abstract FileLock tryLock (long position, long size, boolean shared) }
- 文件锁与文件关联,而不是与通道关联,锁的是外部进程,而不是同一个Java虚拟机上的线程
- 锁定区域的范围不一定限制在文件的size以内,可以超出文件尾,因此,我们可以提前把待写入数据的区域锁定,我们也可以锁定一个不包含任何文件内容的区域,以备后续增长需要,相反如果锁定了文件的某块区域,然后文件增长超出了那块区域,那么新增加的文件内容将不受到锁保护
//文件锁API public abstract class FileLock { public final FileChannel channel()//锁是由哪个锁创建的 public final long position()//锁定文件的起始位置 public final long size()//锁定文件的大小 public final boolean isShared()//是否为共享锁 public final boolean overlaps (long position, long size)//锁是否与给定范围重叠 public abstract boolean isValid();//锁是否有效 public abstract void release( ) throws IOException;//释放锁 }
内存映射文件
- FileChannel上调用map()方法会创建一个由磁盘文件支持的虚拟内存映射,并在那块虚拟内存空间外部封装一个MappedByteBuffer对象
- MappedByteBuffer对象的行为大体类似一个基于内存的缓冲区,只不过该对象的数据元素存储在磁盘上的 一个文件中(文件数据会缓存到内存中)
- get()方法从磁盘文件中获取数据,put()方法会更新磁盘上的文件,并且做出的修改对于该文件的其他阅读者可见
- 内存映射机制来访问一个文件会比常规方法读写高效得多,甚至比使用通道的效率都高
- MapMode.PRIVATE表示写时复制模式,通过put()方法所做的修改会导致产生一个私有的数据拷贝并且该拷贝中的数据只有MappedByteBuffer实例可以看到,该过程不会对底层文件做任何修改,一旦缓冲区被施以垃圾回收,那么修改就会丢失,必须以read/wirte权限打开文件才能建立MapMode.PRIVATE映射
- MapMode.PRIVATE模式并不会导致你的缓冲区看不到通过其他方式对文件所做的修改,对文件某个区域的修改再使用MapMode.PRIVATE模式的缓冲区中都能反映出来,除非该缓冲区已经修改了文件上的同一个区域
- MappedByteBuffer对象一旦建立之后就保持有效,直到被垃圾收集为止,没有绑定在创建他们的通道上,关闭相关联的FileChannel不会破坏映射(易导致文件被占用无法及时释放)
- MemoryMappedBuffer直接反映它所关联的磁盘文件,如果映射有效时文件被在结构上(大小)修改,那么缓冲区部分可能无法访问,并抛出未检查异常
//MappedByteBuffer创建 public abstract class FileChannel extends AbstractChannel implements ByteChannel, GatheringByteChannel, ScatteringByteChannel { // other //将此通道文件的指定区域直接映射到内存中,position:起始位置,size大小 public abstract MappedByteBuffer map (MapMode mode, long position,long size) public static class MapMode { public static final MapMode READ_ONLY //只读映射模式 public static final MapMode READ_WRITE //读/写映射模式 public static final MapMode PRIVATE //私有(写时复制)映射模式 } }
- 当为一个文件建立虚拟内存映射之后,文件数据通常不会因此被从磁盘读取到内存,对于映射缓冲区,虚拟内存系统将根据需要来把文件中相应区块的数据读进来,这个过程是需要时间的,因此可以使用load()提前加载
- load()方法会加载整个文件以使它常驻内存,这样后续的操作基于内存,访问速度将更高
- 但是load()方法是一个代价高的操作,因为它会导致大量的页调入,load()方法返回并不能保证文件就会完全常驻内存,由于请求页面调入是动态的,该方法主要作用是提前加载文件进入内存,以便后续的访问速度尽可能快
- 对于大多数程序,提前加载文件消耗资源是不划算的,在实际访问时分摊页调入开销才是更好的选择
- 应该总是使用MappedByteBuffer.force()而非FileChannel.force()方法,因为通道对象可能不清楚通过映射缓冲区做出的文件的全部更改
public abstract class MappedByteBuffer extends ByteBuffer { // other //MappedByteBuffer特别的方法 public final MappedByteBuffer load()//加载整个文件使他常驻内存(相当于提前加载) public final boolean isLoaded()//判断被映射的文件是否常驻内存了(不绝对,只是一个暗示) //应使用MappedByteBuffer的force()方法,而非FileChannel的force()方法, //因为通道对象可能不清楚通过映射缓冲区做出的全部更改 public final MappedByteBuffer force()//强制将缓冲区更改反映到磁盘上 }
Channel-to-Channel传输
- 从一个位置将文件数据批量传输到另一个位置,FileChannel类添加了一些优化方法来提高传输过程的效率
- transferTo()和transferFrom()方法允许将一个通道交叉连接到另一个通道,而不需要通过一个中间缓冲区来传递数据,只有FileChannel类有这两个方法
- 直接的通道传输不会更新与某个FileChannel关联的position值,会从position位置开始,传输的字节数不超过count参数值,实际传输的字节数会有方法返回
- 对于transferTo()方法,如果position+count的值大于文件的size值,传输会在文件尾的位置终止;类似的,对于transferFrom()方法: 如果来源src是另一个FileChannel并且已经到达文件尾,那么传输将提前终止
- Channel-to-Channel传输是可以极其快速的,特别是在底层操作系统提供本地支持的时候,对于大量的数据传输,会是一个巨大的帮助
Socket通道
基本
- 全部的socket通道类(DatagramChannel丶SocketChannel和ServerSocketChannel)在被实例化时都会创建一个对等socket对象(java.net类中的Socket丶ServerSocket丶DatagramSocket,它们已经被更新以识别通道),对等的socket可以通过调用socket()方法从一个通道上获取,此外这三个java.net类现在都有getChannel()方法
- 虽然每个socket通道(java.nio.channel包中)都有一个关联的java.net.socket对象,却并非所有的socket都有一个关联的通道,如果使用传统方式(直接实例化)创建了一个Socket对象,它就不会有关联的SocketChannel并且它的getChannel()方法将总是返回null
- 方法
public abstract class SelectableChannel extends AbstractChannel implements Channel { // 部分API列表 //设置通道的阻塞模式:阻塞/非阻塞 public abstract void configureBlocking (boolean block) throws IOException; public abstract boolean isBlocking( ); //判断通道是那种阻塞模式 //防止其他对象修改阻塞模式,只有拥有该对象锁的线程才能修改阻塞模式 public abstract Object blockingLock( ); }
ServerSocketChannel
- 完整API
public abstract class ServerSocketChannel extends AbstractSelectableChannel { /** *工厂方法创建一个新的ServerSocketChannel对象, *返回同一个未绑定的java.net.ServerSocket关联的通道 *该对等ServerSocket可以通过ServerSocketChannel对象调用socket()获取 *通过对等的socket的bind()方法绑定到一个端口开始监听 **/ public static ServerSocketChannel open( ) throws IOException //返回对等的Socket对象 public abstract ServerSocket socket( ); //返回的对象可以在非阻塞模式下运行,非阻塞模式下调用,没有传入链接在等待时,会立即返回null public abstract ServerSocket accept( ) throws IOException; //返回此频道支持操作的操作集 public final int validOps( ) }
- 完整API
SocketChannel
- API
public abstract class SocketChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel { //部分API列表 //打开链接通道 public static SocketChannel open( ) throws IOException //打开链接通道并链接到远程地址(相当于open()+connect(remote)) public static SocketChannel open (InetSocketAddress remote) throws IOException //获取与此通道对等的Socket public abstract Socket socket( ); //将此通道链接到远程地址 public abstract boolean connect (SocketAddress remote) throws IOException; //判断是否正在进行链接操作 public abstract boolean isConnectionPending( ); //判断是否完成链接操作 public abstract boolean finishConnect( ) throws IOException; //判断是否已经完成链接 public abstract boolean isConnected( ); //确定支持的操作的操作集 public final int validOps( ) }
- Socket通道是线程安全的,但是任何时候都只有一个读操作和一个写操作在进行中,sockets是面向流而非包导向的,他们可以保证发送的字节会按照顺序到达但无法承诺维持字节分组,例如某个发送器可能给一个socket写入20字节而接收器调用read()方法却只收到了其中3个字节,剩下的17个字节还在传输中,因此让多个不配合的线程共享某个流socket的同一侧绝非一个好的涉及选择
- connect()和finishConnect()方法是互相同步的,并且只要其中一个操作正在进行,任何读或写方法调用都会阻塞,即使是在非阻塞模式下
- API
DatagramSocket
- API
public abstract class DatagramChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel { //部分方法列表 //打开数据通道 public static DatagramChannel open( ) throws IOException //获取与通道等价的socket public abstract DatagramSocket socket( ); //指定链接到的地址,除了此地址之外任何其他源地址的数据包都会被忽略 public abstract DatagramChannel connect (SocketAddress remote) throws IOException; //判断是否指定连接地址 public abstract boolean isConnected( ); //取消指定的连接地址 public abstract DatagramChannel disconnect( ) throws IOException; //接收数据放入ByteBuffer中,非阻塞模式没有可接受包时会返回null, //包内数据超出byteBuffer范围则会被悄悄丢弃 public abstract SocketAddress receive (ByteBuffer dst) throws IOException; //发送数据包,非阻塞模式返回发送字节数或者0, //发送数据包是一个全有全无的行为,如果超出数据包承载能力,就什么都不发送 public abstract int send (ByteBuffer src, SocketAddress target) //返回读取字节的数量,非阻塞模式返回值可能为0 public abstract int read (ByteBuffer dst) throws IOException; public abstract int write (ByteBuffer src) throws IOException; }
- DatagramChannel(数据包通道UDP)不同与流socket(TCP/IP),流socket具有有序而可靠的数据传输特性,但是该协议为了在包导向的互联网基础设施上维护流的语义必然会产生巨大的开销,并且流语义不能适用所有情形,数据包的吞吐量要比流协议高很多
- API
管道
- 基本
- API
public abstract class Pipe { //打开管道 public static Pipe open( ) throws IOException //返回此通道的源通道 public abstract SourceChannel source( ); //返回此通道的接收通道 public abstract SinkChannel sink( ); //Pipe可写端的通道 public static abstract class SourceChannel extends AbstractSelectableChannel implements ReadableByteChannel, ScatteringByteChannel //Pipe可读端的通道 public static abstract class SinkChannel extends AbstractSelectableChannel implements WritableByteChannel, GatheringByteChannel }
- Pipe实例是通过调用不带参数的Pipe.open()工厂方法来创建的,Pipe类定义了两个嵌套的通道类来实现管路,Pipe.SourceChannel(管道负责读的一端)和Pipe.SinkChannel(管道负责写的一端),这两个通道实例是在Pipe对象创建的同时被创建的,可以通过在Pipe对象上分别调用source()与sink()方法来获取
- 管道所能承载的数据量是依赖实现的,唯一可保证的是写到SinkChannel中的字节都能按照同样的顺序在SourceChannel上重现
- API
通道工具类
- API
public final class Channels{ //返回一个将从给定的通道读取字节的流 public static InputStream newInputStream(ReadableByteChannel ch) //返回一个将给定的通道写入字节的流 public static OutputStream newOutputStream(final WritableByteChannel ch) //返回一个给定的输入流读取数据的通道 public static ReadableByteChannel newChannel(final InputStream in) //返回一个将向给定的输出流写入数据的通道 public static WritableByteChannel newChannel(final OutputStream out) //返回一个reader,它将从给定的通道读取字节并依据提供的CharsetDecoder对读取的字节进行解码 public static Reader newReader(ReadableByteChannel ch, CharsetDecoder dec, int minBufferCap) //返回一个reader,它将从给定的通道读取字节并依据提供的字符集名称将读取到的字节解码为字符 public static Reader newReader(ReadableByteChannel ch, String csName) //返回一个writer,它将使用提供的CharsetEncoder对象对字符编码并写到给定的通道中 public static Writer newWriter(final WritableByteChannel ch, final CharsetEncoder enc, final int minBufferCap) //返回一个writer,它将依据提供的字符集名称对字符编码并写到给定的通道中 public static Writer newWriter(WritableByteChannel ch, String csName) }