本文来自DoubleH的BlogJava博客,原文标题为《JDK7 NIO2 实践: 增加 TransmitFile支持》。对于Java 7 NIO2特性的更多描述,可参考以前Google的一次技术演讲。
#t#JDK7的NIO2特性或许是我最期待的,我一直想基于它写一个高性能的Java Http Server.现在这个想法终于可以实施了。
本人基于目前最新的JDK7 b76开发了一个HTTP Server性能确实不错。
在windows平台上NIO2采用AccpetEx来异步接受连接,并且读写全都关联到IOCP完成端口。不仅如此,为了方便开发者使用,连IOCP工作线程都封装好了,你只要提供线程池就OK。
但是要注意,IOCP工作线程的线程池必须是 Fix的,因为你发出的读写请求都关联到相应的线程上,如果线程死了,那读写完成情况是不知道的。
作为一个Http Server,传送文件是必不可少的功能,那一般文件的传送都是要把程序里的buffer拷贝到内核的buffer,由内核发送出去的。windows平台上为这种情况提供了很好的解决方案,使用TransmitFile接口
复制
BOOL TransmitFile( SOCKET hSocket, HANDLE hFile, DWORD nNumberOfBytesToWrite, DWORD nNumberOfBytesPerSend, LPOVERLAPPED lpOverlapped, LPTRANSMIT_FILE_BUFFERS lpTransmitBuffers, DWORD dwFlags );
1.
2.
3.
4.
5.
6.
7.
8.
9.
你只要把文件句柄发送给内核就行了,内核帮你搞定其余的,真正做到Zero-Copy.
但是很不幸,NIO2里AsynchronousSocketChannel没有提供这样的支持。而为HTTP Server的性能考量,本人只好自己增加这个支持。
要无缝支持,这个必须得表现的跟 Read /Write一样,有完成的通知,通知传送多少数据,等等。
仔细读完sun的IOCP实现以后发现这部分工作他们封装得很好,基本只要往他们的框架里加东西就好了。
为了能访问他们的框架代码,我定义自己的TransmitFile支持类在sun.nio.ch包里,以获得最大的权限。
复制
package sun.nio.ch; import java.io.IOException; import java.lang.reflect.Field; import java.nio.channels.AsynchronousCloseException; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.ClosedChannelException; import java.nio.channels.CompletionHandler; import java.nio.channels.NotYetConnectedException; import java.nio.channels.WritePendingException; import java.util.concurrent.Future; /** * @author Yvon * */ public class WindowsTransmitFileSupport { //Sun's NIO2 channel implementation class private WindowsAsynchronousSocketChannelImpl channel; //nio2 framework core data structure PendingIoCache ioCache; //some field retrieve from sun channel implementation class private Object writeLock; private Field writingF; private Field writeShutdownF; private Field writeKilledF; // f WindowsTransmitFileSupport() { //dummy one for JNI code } /** * */ public WindowsTransmitFileSupport( AsynchronousSocketChannel channel) { this.channel = (WindowsAsynchronousSocketChannelImpl)channel; try { // Initialize the fields Field f = WindowsAsynchronousSocketChannelImpl.class .getDeclaredField("ioCache"); f.setAccessible(true); ioCache = (PendingIoCache) f.get(channel); f = AsynchronousSocketChannelImpl.class .getDeclaredField("writeLock"); f.setAccessible(true); writeLock = f.get(channel); writingF = AsynchronousSocketChannelImpl.class .getDeclaredField("writing"); writingF.setAccessible(true); writeShutdownF = AsynchronousSocketChannelImpl.class .getDeclaredField("writeShutdown"); writeShutdownF.setAccessible(true); writeKilledF = AsynchronousSocketChannelImpl.class .getDeclaredField("writeKilled"); writeKilledF.setAccessible(true); } catch (NoSuchFieldException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (SecurityException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IllegalArgumentException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IllegalAccessException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * Implements the task to initiate a write and the handler to consume the * result when the send file completes. */ private class SendFileTask implements Runnable, Iocp.ResultHandler { private final PendingFuture result; private final long file;//file is windows file HANDLE SendFileTask(long file, PendingFuture result) { this.result = result; this.file = file; } @Override // @SuppressWarnings("unchecked") public void run() { long overlapped = 0L; boolean pending = false; boolean shutdown = false; try { channel.begin(); // get an OVERLAPPED structure (from the cache or allocate) overlapped = ioCache.add(result); int n = transmitFile0(channel.handle, file, overlapped); if (n == IOStatus.UNAVAILABLE) { // I/O is pending pending = true; return; } if (n == IOStatus.EOF) { // special case for shutdown output shutdown = true; throw new ClosedChannelException(); } // write completed immediately throw new InternalError("Write completed immediately"); } catch (Throwable x) { // write failed. Enable writing before releasing waiters. channel.enableWriting(); if (!shutdown && (x instanceof ClosedChannelException)) x = new AsynchronousCloseException(); if (!(x instanceof IOException)) x = new IOException(x); result.setFailure(x); } finally { // release resources if I/O not pending if (!pending) { if (overlapped != 0L) ioCache.remove(overlapped); } channel.end(); } // invoke completion handler Invoker.invoke(result); } /** * Executed when the I/O has completed */ @Override @SuppressWarnings("unchecked") public void completed(int bytesTransferred, boolean canInvokeDirect) { // release waiters if not already released by timeout synchronized (result) { if (result.isDone()) return; channel.enableWriting(); result.setResult((V) Integer.valueOf(bytesTransferred)); } if (canInvokeDirect) { Invoker.invokeUnchecked(result); } else { Invoker.invoke(result); } } @Override public void failed(int error, IOException x) { // return direct buffer to cache if substituted // release waiters if not already released by timeout if (!channel.isOpen()) x = new AsynchronousCloseException(); synchronized (result) { if (result.isDone()) return; channel.enableWriting(); result.setFailure(x); } Invoker.invoke(result); } } public extends Number, A> Future sendFile(long file, A att, CompletionHandlersuper A> handler) { boolean closed = false; if (channel.isOpen()) { if (channel.remoteAddress == null) throw new NotYetConnectedException(); // check and update state synchronized (writeLock) { try{ if (writeKilledF.getBoolean(channel)) throw new IllegalStateException( "Writing not allowed due to timeout or cancellation"); if (writingF.getBoolean(channel)) throw new WritePendingException(); if (writeShutdownF.getBoolean(channel)) { closed = true; } else { writingF.setBoolean(channel, true); } }catch(Exception e) { IllegalStateException ise=new IllegalStateException(" catch exception when write"); ise.initCause(e); throw ise; } } } else { closed = true; } // channel is closed or shutdown for write if (closed) { Throwable e = new ClosedChannelException(); if (handler == null) return CompletedFuture.withFailure(e); Invoker.invoke(channel, handler, att, null, e); return null; } return implSendFile(file,att,handler); } extends Number, A> Future implSendFile(long file, A attachment, CompletionHandlersuper A> handler) { // setup task PendingFuture result = new PendingFuture(channel, handler, attachment); SendFileTask sendTask=new SendFileTask(file,result); result.setContext(sendTask); // initiate I/O (can only be done from thread in thread pool) // initiate I/O if (Iocp.supportsThreadAgnosticIo()) { sendTask.run(); } else { Invoker.invokeOnThreadInThreadPool(channel, sendTask); } return result; } private native int transmitFile0(long handle, long file, long overlapped); }
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
139.
140.
141.
142.
143.
144.
145.
146.
147.
148.
149.
150.
151.
152.
153.
154.
155.
156.
157.
158.
159.
160.
161.
162.
163.
164.
165.
166.
167.
168.
169.
170.
171.
172.
173.
174.
175.
176.
177.
178.
179.
180.
181.
182.
183.
184.
185.
186.
187.
188.
189.
190.
191.
192.
193.
194.
195.
196.
197.
198.
199.
200.
201.
202.
203.
204.
205.
206.
207.
208.
209.
210.
211.
212.
213.
214.
215.
216.
217.
218.
219.
220.
221.
222.
223.
224.
225.
226.
227.
228.
229.
230.
231.
232.
233.
234.
235.
236.
237.
238.
239.
240.
241.
242.
243.
244.
245.
246.
247.
248.
249.
250.
251.
252.
253.
254.
255.
256.
257.
258.
259.
260.
261.
262.
这个操作跟默认实现的里的write操作是很像的,只是最后调用的本地方法不一样。。
接下来,我们怎么使用呢,这个类是定义在sun的包里的,直接用的话,会报IllegalAccessError,因为我们的类加载器跟初始化加载器是不一样的。
解决办法一个是通过启动参数-Xbootclasspath,让我们的包被初始加载器加载。我个人不喜欢这种办法,所以就采用JNI来定义我们的windows TransmitFile支持类。
这样我们的工作算是完成了,注意,发送文件的时候传得是文件句柄,这样做的好处是你可以更好的控制,一般是在发送前,打开文件句柄,完成后在回调通知方法里关闭文件句柄。
有兴趣的同学可以看看我的HTTP server项目:
http://code.google.com/p/jabhttpd/
目前基本功能实现得差不多,做了些简单的测试,性能比较满意。这个服务器不打算支持servlet api,基本是专门给做基于长连接模式通信的定做的。