博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【从入门到放弃-Java】并发编程-NIO-Channel
阅读量:2402 次
发布时间:2019-05-10

本文共 28242 字,大约阅读时间需要 94 分钟。

前言

上篇[【从入门到放弃-Java】并发编程-NIO使用]()简单介绍了nio的基础使用,本篇将深入源码分析nio中channel的实现。

简介

channel即通道,可以用来读、写数据,它是全双工的可以同时用来读写操作。这也是它与stream流的最大区别。

channel需要与buffer配合使用,channel通道的一端是buffer,一端是数据源实体,如文件、socket等。在nio中,通过channel的不同实现来处理 不同实体与数据buffer中的数据传输。

channel接口:

package java.nio.channels;import java.io.IOException;import java.io.Closeable;/** * A nexus for I/O operations. * * 

A channel represents an open connection to an entity such as a hardware * device, a file, a network socket, or a program component that is capable of * performing one or more distinct I/O operations, for example reading or * writing. * *

A channel is either open or closed. A channel is open upon creation, * and once closed it remains closed. Once a channel is closed, any attempt to * invoke an I/O operation upon it will cause a {@link ClosedChannelException} * to be thrown. Whether or not a channel is open may be tested by invoking * its {@link #isOpen isOpen} method. * *

Channels are, in general, intended to be safe for multithreaded access * as described in the specifications of the interfaces and classes that extend * and implement this interface. * * * @author Mark Reinhold * @author JSR-51 Expert Group * @since 1.4 */public interface Channel extends Closeable { /** * Tells whether or not this channel is open. * * @return true if, and only if, this channel is open */ public boolean isOpen(); /** * Closes this channel. * *

After a channel is closed, any further attempt to invoke I/O * operations upon it will cause a {@link ClosedChannelException} to be * thrown. * *

If this channel is already closed then invoking this method has no * effect. * *

This method may be invoked at any time. If some other thread has * already invoked it, however, then another invocation will block until * the first invocation is complete, after which it will return without * effect.

* * @throws IOException If an I/O error occurs */ public void close() throws IOException;}

常见的channel实现有:

  • FileChannel:文件读写数据通道
  • SocketChannel:TCP读写网络数据通道
  • ServerSocketChannel:服务端网络数据读写通道,可以监听TCP连接。对每一个新进来的连接都会创建一个SocketChannel。
  • DatagramChannel:UDP读写网络数据通道

FileChannel

FileChannel是一个抽象类,它继承了AbstractInterruptibleChannel类,并实现了 SeekableByteChannel, GatheringByteChannel, ScatteringByteChannel接口。
具体的实现类主要是sun.nio.ch.FileChannelImpl。下面详细分析下FileChannelImpl中每个方法的具体实现。

open

private FileChannelImpl(FileDescriptor var1, String var2, boolean var3, boolean var4, boolean var5, Object var6) {    //主要记载操作系统维护的文件描述符    this.fd = var1;        //是否可读    this.readable = var3;        //是否可写    this.writable = var4;        //是否以追加的方式打开    this.append = var5;        this.parent = var6;    this.path = var2;        //底层使用native的read和write来处理文件的    this.nd = new FileDispatcherImpl(var5);}//FileInputStream::getChannel 调用 FileChannelImpl.open(fd, path, true, false, this) 获取只读channelpublic static FileChannel open(FileDescriptor var0, String var1, boolean var2, boolean var3, Object var4) {    return new FileChannelImpl(var0, var1, var2, var3, false, var4);}//FileOutputStream::getChannel 调用 FileChannelImpl.open(fd, path, false, true, append, this) 获取只写channelpublic static FileChannel open(FileDescriptor var0, String var1, boolean var2, boolean var3, boolean var4, Object var5) {    return new FileChannelImpl(var0, var1, var2, var3, var4, var5);}private FileChannelImpl(FileDescriptor fd, String path, boolean readable,                        boolean writable, boolean direct, Object parent){    this.fd = fd;        //是否可读    this.readable = readable;        //是否可写    this.writable = writable;        //对于从流创建的channel,在结束时要做不同的清理动作,(openJDK中才有,sun的jdk中没有)    this.parent = parent;        //源文件的path    this.path = path;        //是否使用DirectIO    this.direct = direct;        this.nd = new FileDispatcherImpl();    if (direct) {        assert path != null;        this.alignment = nd.setDirectIO(fd, path);    } else {        this.alignment = -1;    }    //当parent不存在时,则注册一个cleaner,否则交由parent做清理动作。    // Register a cleaning action if and only if there is no parent    // as the parent will take care of closing the file descriptor.    // FileChannel is used by the LambdaMetaFactory so a lambda cannot    // be used here hence we use a nested class instead.    this.closer = parent != null ? null :        CleanerFactory.cleaner().register(this, new Closer(fd));}// Used by FileInputStream.getChannel(), FileOutputStream.getChannel// and RandomAccessFile.getChannel()public static FileChannel open(FileDescriptor fd, String path,                               boolean readable, boolean writable,                               boolean direct, Object parent){    return new FileChannelImpl(fd, path, readable, writable, direct, parent);}
  • open方法主要是返回一个新new的FileChannelImpl对象,初始化时设置fileDescriptor、readable、writable、append、parent、path等属性,看变量名很容易理解,在此不赘述变量含义。

read

//实现自SeekableByteChannel接口的方法,将文件中的内容读取到给定的byteBuffer中public int read(ByteBuffer dst) throws IOException {    //保证读写时,channel处于开启状态    ensureOpen();        //判断是否可读    if (!readable)        throw new NonReadableChannelException();    synchronized (positionLock) {        if (direct)            Util.checkChannelPositionAligned(position(), alignment);        int n = 0;        int ti = -1;        try {                    //开始阻塞,并注册为Interruptible,可以被中断            beginBlocking();                        //将当前线程添加到NativeThreadSet中,并返回索引,方便后续操作。            //NativeThreadSet是一个线程安全的本地线程集合,方便管理,用来发送信号            ti = threads.add();            if (!isOpen())                return 0;            do {                //当未被系统中断(即读取完毕)或channel未被关闭,则一直读,将内容写入到byteBuffer(dst)中                n = IOUtil.read(fd, dst, -1, direct, alignment, nd);            } while ((n == IOStatus.INTERRUPTED) && isOpen());            return IOStatus.normalize(n);        } finally {            //把当前线程从set中移出            threads.remove(ti);                        //结束,释放锁            endBlocking(n > 0);            assert IOStatus.check(n);        }    }}//实现自ScatteringByteChannel接口的方法,将文件中的内容依次读取到给定的byteBuffer数组中。public long read(ByteBuffer[] dsts, int offset, int length)    throws IOException{    if ((offset < 0) || (length < 0) || (offset > dsts.length - length))        throw new IndexOutOfBoundsException();    //保证读写时,channel处于开启状态    ensureOpen();        //判断是否可读    if (!readable)        throw new NonReadableChannelException();    synchronized (positionLock) {        if (direct)            Util.checkChannelPositionAligned(position(), alignment);        long n = 0;        int ti = -1;        try {            //开始阻塞,并注册为Interruptible,可以被中断            beginBlocking();                        //将当前线程添加到NativeThreadSet中,并返回索引,方便后续操作。            //NativeThreadSet是一个线程安全的本地线程集合,方便管理,用来发送信号            ti = threads.add();            if (!isOpen())                return 0;            do {                //当未被系统中断(即读取完毕)或channel未被关闭,则一直读,将内容写入到byteBuffer(dst)中                n = IOUtil.read(fd, dsts, offset, length,                        direct, alignment, nd);            } while ((n == IOStatus.INTERRUPTED) && isOpen());            return IOStatus.normalize(n);        } finally {            //把当前线程从set中移出            threads.remove(ti);                        //结束,释放锁            endBlocking(n > 0);            assert IOStatus.check(n);        }    }}

write

//实现自SeekableByteChannel接口的方法,将byteBuffer中的内容写入到文件中public int write(ByteBuffer src) throws IOException {    //保证写时,channel处于开启状态    ensureOpen();        //判断是否可写    if (!writable)        throw new NonWritableChannelException();    synchronized (positionLock) {        if (direct)            Util.checkChannelPositionAligned(position(), alignment);        int n = 0;        int ti = -1;        try {            //开始阻塞,并注册为Interruptible,可以被中断            beginBlocking();                        //将当前线程添加到NativeThreadSet中,并返回索引,方便后续操作。            //NativeThreadSet是一个线程安全的本地线程集合,方便管理,用来发送信号            ti = threads.add();            if (!isOpen())                return 0;            do {                //当未被系统中断(即写入完毕)或channel未被关闭,则一直写,将内容写入到文件中                n = IOUtil.write(fd, src, -1, direct, alignment, nd);            } while ((n == IOStatus.INTERRUPTED) && isOpen());            return IOStatus.normalize(n);        } finally {            //把当前线程从set中移出            threads.remove(ti);                        //结束,释放锁            assert IOStatus.check(n);        }    }}//实现自GatheringByteChannel接口的方法,将byteBuffer数组中的内容依次写入到文件中public long write(ByteBuffer[] srcs, int offset, int length)    throws IOException{    if ((offset < 0) || (length < 0) || (offset > srcs.length - length))        throw new IndexOutOfBoundsException();        //保证写时,channel处于开启状态        ensureOpen();                //判断是否可写    if (!writable)        throw new NonWritableChannelException();    synchronized (positionLock) {        if (direct)            Util.checkChannelPositionAligned(position(), alignment);        long n = 0;        int ti = -1;        try {            //开始阻塞,并注册为Interruptible,可以被中断            beginBlocking();                        //将当前线程添加到NativeThreadSet中,并返回索引,方便后续操作。            //NativeThreadSet是一个线程安全的本地线程集合,方便管理,用来发送信号            ti = threads.add();            if (!isOpen())                return 0;            do {                //当未被系统中断(即写入完毕)或channel未被关闭,则一直写,将内容写入到文件中                n = IOUtil.write(fd, srcs, offset, length,                        direct, alignment, nd);            } while ((n == IOStatus.INTERRUPTED) && isOpen());            return IOStatus.normalize(n);        } finally {            //把当前线程从set中移出            threads.remove(ti);                        //结束,释放锁            assert IOStatus.check(n);        }    }}

position

//实现自SeekableByteChannel接口的方法,获取当前channel的positionpublic long position() throws IOException {    ensureOpen();    synchronized (positionLock) {        long p = -1;        int ti = -1;        try {            beginBlocking();            ti = threads.add();            if (!isOpen())                return 0;            boolean append = fdAccess.getAppend(fd);            do {                //append模式下,position在channel的末尾                // in append-mode then position is advanced to end before writing                p = (append) ? nd.size(fd) : nd.seek(fd, -1);            } while ((p == IOStatus.INTERRUPTED) && isOpen());            return IOStatus.normalize(p);        } finally {            threads.remove(ti);            endBlocking(p > -1);            assert IOStatus.check(p);        }    }}//实现自SeekableByteChannel接口的方法,设置当前channel的position为newPositionpublic FileChannel position(long newPosition) throws IOException {    ensureOpen();    if (newPosition < 0)        throw new IllegalArgumentException();    synchronized (positionLock) {        long p = -1;        int ti = -1;        try {            beginBlocking();            ti = threads.add();            if (!isOpen())                return null;            do {                //设置当前position为newPosition                p = nd.seek(fd, newPosition);            } while ((p == IOStatus.INTERRUPTED) && isOpen());            return this;        } finally {            threads.remove(ti);            endBlocking(p > -1);            assert IOStatus.check(p);        }    }}

size

实现自SeekableByteChannel接口的方法,返回当前实体(文件)的大小

truncate

实现自SeekableByteChannel接口的方法,用来截取文件至newSize大小

force

实现自SeekableByteChannel接口的方法,用来将channel中尚未写入磁盘的数据强制落盘

transferTo

将fileChannel中的数据传递至另一个channel

transferFrom

从其它channel读取数据至fileChannel

SocketChannel

open

/** * Opens a socket channel. * * 

The new channel is created by invoking the {@link * java.nio.channels.spi.SelectorProvider#openSocketChannel * openSocketChannel} method of the system-wide default {@link * java.nio.channels.spi.SelectorProvider} object.

* * @return A new socket channel * * @throws IOException * If an I/O error occurs */public static SocketChannel open() throws IOException { return SelectorProvider.provider().openSocketChannel();}

open方法是调用SelectorProvider中实现了java.nio.channels.spi.SelectorProvider#openSocketChannel的方法,底层实际是new SocketChannelImpl,调用native方法创建socket

connect

public boolean connect(SocketAddress sa) throws IOException {    //校验Address是否合法    InetSocketAddress isa = Net.checkAddress(sa);        //获取系统安全管理器    SecurityManager sm = System.getSecurityManager();    if (sm != null)                //校验IP和端口是否被允许连接        sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());    InetAddress ia = isa.getAddress();        //如果是本机地址,则获取本机的host    if (ia.isAnyLocalAddress())        ia = InetAddress.getLocalHost();    try {        //加读锁        readLock.lock();        try {            //加写锁            writeLock.lock();            try {                int n = 0;                                //是否阻塞                boolean blocking = isBlocking();                try {                    //开启connect前的校验并设置为ST_CONNECTIONPENDING,如果blocking是true 即阻塞模式,则记录当前线程的ID,以便接收信号处理。                    beginConnect(blocking, isa);                    do {                        //调用native connect方法                        n = Net.connect(fd, ia, isa.getPort());                    } while (n == IOStatus.INTERRUPTED && isOpen());                } finally {                    //结束连接                    endConnect(blocking, (n > 0));                }                assert IOStatus.check(n);                return n > 0;            } finally {                //释放写锁                writeLock.unlock();            }        } finally {            //释放读锁            readLock.unlock();        }    } catch (IOException ioe) {        // connect failed, close the channel        close();        throw SocketExceptions.of(ioe, isa);    }}

configureBlocking

实现自SelectableChannel的接口方法,调用native方法设置socket的阻塞状态

register

在AbstractSelectableChannel中定义,注册要监听的事件。

public final SelectionKey register(Selector sel, int ops, Object att)        throws ClosedChannelException{    if ((ops & ~validOps()) != 0)        throw new IllegalArgumentException();    if (!isOpen())        throw new ClosedChannelException();    synchronized (regLock) {        if (isBlocking())            throw new IllegalBlockingModeException();        synchronized (keyLock) {            // re-check if channel has been closed            if (!isOpen())                throw new ClosedChannelException();            SelectionKey k = findKey(sel);            if (k != null) {                k.attach(att);                k.interestOps(ops);            } else {                // 向Selector中注册事件                // New registration                k = ((AbstractSelector)sel).register(this, ops, att);                addKey(k);            }            return k;        }    }}

read

//实现自ReadableByteChannel接口的方法,从socket中读取数据至ByteBuffer@Overridepublic int read(ByteBuffer buf) throws IOException {    Objects.requireNonNull(buf);    readLock.lock();    try {        boolean blocking = isBlocking();        int n = 0;        try {            //检查channel是否开启并已经是connected的状态。如果blocking是true 即阻塞模式,则记录当前线程的ID,以便接收信号处理。            beginRead(blocking);            // check if input is shutdown            if (isInputClosed)                return IOStatus.EOF;            //如果是阻塞模式,则一直读取直到数据读取完毕;非阻塞模式则直接调用native方法不需要等待。            if (blocking) {                do {                    n = IOUtil.read(fd, buf, -1, nd);                } while (n == IOStatus.INTERRUPTED && isOpen());            } else {                n = IOUtil.read(fd, buf, -1, nd);            }        } finally {            endRead(blocking, n > 0);            if (n <= 0 && isInputClosed)                return IOStatus.EOF;        }        return IOStatus.normalize(n);    } finally {        readLock.unlock();    }}//实现自ScatteringByteChannel接口的方法,从socket中依次读取数据至ByteBuffer数组@Overridepublic long read(ByteBuffer[] dsts, int offset, int length)    throws IOException{    Objects.checkFromIndexSize(offset, length, dsts.length);    readLock.lock();    try {        boolean blocking = isBlocking();        long n = 0;        try {            beginRead(blocking);            // check if input is shutdown            if (isInputClosed)                return IOStatus.EOF;            //如果是阻塞模式,则一直读取直到数据读取完毕;非阻塞模式则直接调用native方法不需要等待。            if (blocking) {                do {                    n = IOUtil.read(fd, dsts, offset, length, nd);                } while (n == IOStatus.INTERRUPTED && isOpen());            } else {                n = IOUtil.read(fd, dsts, offset, length, nd);            }        } finally {            endRead(blocking, n > 0);            if (n <= 0 && isInputClosed)                return IOStatus.EOF;        }        return IOStatus.normalize(n);    } finally {        readLock.unlock();    }}

write

//实现自ReadableByteChannel接口的方法,将ByteBuffer中的数据写入socket@Overridepublic int write(ByteBuffer buf) throws IOException {    Objects.requireNonNull(buf);    writeLock.lock();    try {        boolean blocking = isBlocking();        int n = 0;        try {            beginWrite(blocking);            //如果是阻塞模式,则一直读取直到数据读取完毕;非阻塞模式则直接调用native方法不需要等待。            if (blocking) {                do {                    n = IOUtil.write(fd, buf, -1, nd);                } while (n == IOStatus.INTERRUPTED && isOpen());            } else {                n = IOUtil.write(fd, buf, -1, nd);            }        } finally {            endWrite(blocking, n > 0);            if (n <= 0 && isOutputClosed)                throw new AsynchronousCloseException();        }        return IOStatus.normalize(n);    } finally {        writeLock.unlock();    }}@Overridepublic long write(ByteBuffer[] srcs, int offset, int length)    throws IOException{    Objects.checkFromIndexSize(offset, length, srcs.length);    writeLock.lock();    try {        boolean blocking = isBlocking();        long n = 0;        try {            beginWrite(blocking);            //如果是阻塞模式,则一直等待直到数据写入完毕;非阻塞模式则直接调用native方法不需要等待。            if (blocking) {                do {                    n = IOUtil.write(fd, srcs, offset, length, nd);                } while (n == IOStatus.INTERRUPTED && isOpen());            } else {                n = IOUtil.write(fd, srcs, offset, length, nd);            }        } finally {            endWrite(blocking, n > 0);            if (n <= 0 && isOutputClosed)                throw new AsynchronousCloseException();        }        return IOStatus.normalize(n);    } finally {        writeLock.unlock();    }}//实现自ReadableByteChannel接口的方法,将ByteBuffer数组中的数据依次写入socket/** * Writes a byte of out of band data. */int sendOutOfBandData(byte b) throws IOException {    writeLock.lock();    try {        boolean blocking = isBlocking();        int n = 0;        try {            beginWrite(blocking);            //如果是阻塞模式,则一直等待直到数据写入完毕;非阻塞模式则直接调用native方法不需要等待。            if (blocking) {                do {                    n = sendOutOfBandData(fd, b);                } while (n == IOStatus.INTERRUPTED && isOpen());            } else {                n = sendOutOfBandData(fd, b);            }        } finally {            endWrite(blocking, n > 0);            if (n <= 0 && isOutputClosed)                throw new AsynchronousCloseException();        }        return IOStatus.normalize(n);    } finally {        writeLock.unlock();    }}

ServerSocketChannel

socket

@Overridepublic ServerSocket socket() {    synchronized (stateLock) {        if (socket == null)            socket = ServerSocketAdaptor.create(this);        return socket;    }}

bind

@Overridepublic ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {    synchronized (stateLock) {        ensureOpen();        if (localAddress != null)            throw new AlreadyBoundException();        InetSocketAddress isa = (local == null)                                ? new InetSocketAddress(0)                                : Net.checkAddress(local);        SecurityManager sm = System.getSecurityManager();        if (sm != null)            sm.checkListen(isa.getPort());                    //绑定前做一些前置处理,如将tcp socket文件描述符转换成SDP        NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());                //绑定IP和地址        Net.bind(fd, isa.getAddress(), isa.getPort());                //开始监听,设置socket上最多可以挂起backlog个连接,若backlog小于1 则默认设置50个        Net.listen(fd, backlog < 1 ? 50 : backlog);                localAddress = Net.localAddress(fd);    }    return this;}

accept

@Overridepublic SocketChannel accept() throws IOException {    acceptLock.lock();    try {        int n = 0;        FileDescriptor newfd = new FileDescriptor();        InetSocketAddress[] isaa = new InetSocketAddress[1];        boolean blocking = isBlocking();        try {            begin(blocking);            do {                //阻塞等待接收客户端链接                n = accept(this.fd, newfd, isaa);            } while (n == IOStatus.INTERRUPTED && isOpen());        } finally {            end(blocking, n > 0);            assert IOStatus.check(n);        }        if (n < 1)            return null;        //新接收的socket初始设置为阻塞模式(因此非阻塞模式的每次需要显示设置)        // newly accepted socket is initially in blocking mode        IOUtil.configureBlocking(newfd, true);        InetSocketAddress isa = isaa[0];                //用新接收的socket创建SocketChannel        SocketChannel sc = new SocketChannelImpl(provider(), newfd, isa);        // check permitted to accept connections from the remote address        SecurityManager sm = System.getSecurityManager();        if (sm != null) {            try {                sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort());            } catch (SecurityException x) {                sc.close();                throw x;            }        }        return sc;    } finally {        acceptLock.unlock();    }}

ServerSocketChannel并没有read和write方法,只是继承了AbstractSelectableChannel,以便在selector中使用

DatagramChannel

open

public DatagramChannelImpl(SelectorProvider sp)    throws IOException{    super(sp);    ResourceManager.beforeUdpCreate();    try {        //如果不支持IPv6则使用IPv4        this.family = Net.isIPv6Available()                ? StandardProtocolFamily.INET6                : StandardProtocolFamily.INET;                        //设置非流式的socket(tcp是流模式协议,udp是数据报模式协议)        this.fd = Net.socket(family, false);        this.fdVal = IOUtil.fdVal(fd);    } catch (IOException ioe) {        ResourceManager.afterUdpClose();        throw ioe;    }}

receive

public SocketAddress receive(ByteBuffer dst) throws IOException {    if (dst.isReadOnly())        throw new IllegalArgumentException("Read-only buffer");    readLock.lock();    try {        boolean blocking = isBlocking();        int n = 0;        ByteBuffer bb = null;        try {            SocketAddress remote = beginRead(blocking, false);            boolean connected = (remote != null);            SecurityManager sm = System.getSecurityManager();            if (connected || (sm == null)) {                // connected or no security manager                do {                    n = receive(fd, dst, connected);                } while ((n == IOStatus.INTERRUPTED) && isOpen());                if (n == IOStatus.UNAVAILABLE)                    return null;            } else {                // Cannot receive into user's buffer when running with a                // security manager and not connected                bb = Util.getTemporaryDirectBuffer(dst.remaining());                for (;;) {                    do {                        n = receive(fd, bb, connected);                    } while ((n == IOStatus.INTERRUPTED) && isOpen());                    if (n == IOStatus.UNAVAILABLE)                        return null;                    InetSocketAddress isa = (InetSocketAddress)sender;                    try {                        sm.checkAccept(isa.getAddress().getHostAddress(),                                       isa.getPort());                    } catch (SecurityException se) {                        // Ignore packet                        bb.clear();                        n = 0;                        continue;                    }                    bb.flip();                    dst.put(bb);                    break;                }            }                        //sender:发送方地址, Set by receive0 (## ugh)            assert sender != null;            return sender;        } finally {            if (bb != null)                Util.releaseTemporaryDirectBuffer(bb);            endRead(blocking, n > 0);            assert IOStatus.check(n);        }    } finally {        readLock.unlock();    }}

send

public int send(ByteBuffer src, SocketAddress target)        throws IOException{    Objects.requireNonNull(src);    InetSocketAddress isa = Net.checkAddress(target, family);    writeLock.lock();    try {        boolean blocking = isBlocking();        int n = 0;        try {            //当connect后,remote会设置为连接的地址            SocketAddress remote = beginWrite(blocking, false);            if (remote != null) {                // connected                if (!target.equals(remote)) {                    throw new AlreadyConnectedException();                }                do {                    n = IOUtil.write(fd, src, -1, nd);                } while ((n == IOStatus.INTERRUPTED) && isOpen());            } else {                // not connected                SecurityManager sm = System.getSecurityManager();                if (sm != null) {                    InetAddress ia = isa.getAddress();                    if (ia.isMulticastAddress()) {                        sm.checkMulticast(ia);                    } else {                        sm.checkConnect(ia.getHostAddress(), isa.getPort());                    }                }                do {                    n = send(fd, src, isa);                } while ((n == IOStatus.INTERRUPTED) && isOpen());            }        } finally {            endWrite(blocking, n > 0);            assert IOStatus.check(n);        }        return IOStatus.normalize(n);    } finally {        writeLock.unlock();    }}

connect

@Overridepublic DatagramChannel connect(SocketAddress sa) throws IOException {    InetSocketAddress isa = Net.checkAddress(sa, family);    SecurityManager sm = System.getSecurityManager();    if (sm != null) {        InetAddress ia = isa.getAddress();        if (ia.isMulticastAddress()) {            sm.checkMulticast(ia);        } else {            sm.checkConnect(ia.getHostAddress(), isa.getPort());            sm.checkAccept(ia.getHostAddress(), isa.getPort());        }    }    readLock.lock();    try {        writeLock.lock();        try {            synchronized (stateLock) {                ensureOpen();                if (state == ST_CONNECTED)                    throw new AlreadyConnectedException();                int n = Net.connect(family,                                    fd,                                    isa.getAddress(),                                    isa.getPort());                if (n <= 0)                    throw new Error();      // Can't happen                // connected                remoteAddress = isa;                state = ST_CONNECTED;                // refresh local address                localAddress = Net.localAddress(fd);                // flush any packets already received.                boolean blocking = isBlocking();                if (blocking) {                    IOUtil.configureBlocking(fd, false);                }                try {                    ByteBuffer buf = ByteBuffer.allocate(100);                    while (receive(buf) != null) {                        buf.clear();                    }                } finally {                    if (blocking) {                        IOUtil.configureBlocking(fd, true);                    }                }            }        } finally {            writeLock.unlock();        }    } finally {        readLock.unlock();    }    return this;}

udp是数据报模式的协议,是没有connect的。这里的connect实际上是在底层忽略了与其他地址的数据传输。

在connect后,就可以像socketChannel似得使用read和write了

总结

本文学习了各种channel的实现,主要是对底层native方法的一些封装,针对不同属性的实体(文件、socket),使用对应的channel与byteBuffer传输数据。再通过byteBuffer与byte数据进行转换。

channel的实现中,封装了大量的native方法,重要的底层实现全在native中,后续可以深入学习下。

本文中出现的byteBuffer和selector将在接下来的文章中,单独分析。

本文为云栖社区原创内容,未经允许不得转载。

转载于:https://my.oschina.net/u/1464083/blog/3070910

你可能感兴趣的文章
打造自己的起高速硬盘--双硬盘RAID0全攻略(转)
查看>>
新手入门:忘记密码并不是件可怕的事(转)
查看>>
ADO对象的关闭以及数据库压缩(转)
查看>>
Win2K无盘终端网组建全攻略(3)(转)
查看>>
让Windows2000飞起来(转)
查看>>
妙用Windows系统任务实现定时提醒(转)
查看>>
启用Win2k的多设备拨号功能(转)
查看>>
轻松将NTFS格式转换FAT32格式(转)
查看>>
在Win2000中妙用系统审核(转)
查看>>
Win2K无盘终端网组建全攻略(5)(转)
查看>>
WindowsXP救急小贴示(转)
查看>>
Win2K无盘终端网组建全攻略(4)(转)
查看>>
Win2K无盘终端网组建全攻略(2)(转)
查看>>
[灾难备份]拿什么拯救你我的金融数据(转)
查看>>
忘记XP登录口令后的应急办法(转)
查看>>
Win2K无盘终端网组建全攻略(1)(转)
查看>>
WIN2000/NT密码全攻略(转)
查看>>
XP命令行加密功能(转)
查看>>
轻轻松松更改NT超级用户密码 (转)
查看>>
给Win2000减减肥(转)
查看>>