mirror of
https://github.com/openjdk/jdk.git
synced 2026-02-13 20:05:31 +00:00
6834246: (ch) AsynchronousSocketChannel#write completes with wrong number of bytes written under load (win)
Reviewed-by: sherman
This commit is contained in:
parent
2c08c535e0
commit
e4f45d0a0b
@ -475,49 +475,40 @@ class WindowsAsynchronousSocketChannelImpl
|
||||
// get an OVERLAPPED structure (from the cache or allocate)
|
||||
overlapped = ioCache.add(result);
|
||||
|
||||
// synchronize on result to allow this thread handle the case
|
||||
// where the read completes immediately.
|
||||
synchronized (result) {
|
||||
int n = read0(handle, numBufs, readBufferArray, overlapped);
|
||||
if (n == IOStatus.UNAVAILABLE) {
|
||||
// I/O is pending
|
||||
pending = true;
|
||||
return;
|
||||
}
|
||||
// read completed immediately:
|
||||
// 1. update buffer position
|
||||
// 2. reset read flag
|
||||
// 3. release waiters
|
||||
if (n == 0) {
|
||||
n = -1;
|
||||
} else {
|
||||
updateBuffers(n);
|
||||
}
|
||||
// initiate read
|
||||
int n = read0(handle, numBufs, readBufferArray, overlapped);
|
||||
if (n == IOStatus.UNAVAILABLE) {
|
||||
// I/O is pending
|
||||
pending = true;
|
||||
return;
|
||||
}
|
||||
if (n == IOStatus.EOF) {
|
||||
// input shutdown
|
||||
enableReading();
|
||||
|
||||
if (scatteringRead) {
|
||||
result.setResult((V)Long.valueOf(n));
|
||||
result.setResult((V)Long.valueOf(-1L));
|
||||
} else {
|
||||
result.setResult((V)Integer.valueOf(n));
|
||||
result.setResult((V)Integer.valueOf(-1));
|
||||
}
|
||||
} else {
|
||||
throw new InternalError("Read completed immediately");
|
||||
}
|
||||
} catch (Throwable x) {
|
||||
// failed to initiate read:
|
||||
// 1. reset read flag
|
||||
// 2. free resources
|
||||
// 3. release waiters
|
||||
// failed to initiate read
|
||||
// reset read flag before releasing waiters
|
||||
enableReading();
|
||||
if (overlapped != 0L)
|
||||
ioCache.remove(overlapped);
|
||||
if (x instanceof ClosedChannelException)
|
||||
x = new AsynchronousCloseException();
|
||||
if (!(x instanceof IOException))
|
||||
x = new IOException(x);
|
||||
result.setFailure(x);
|
||||
} finally {
|
||||
if (prepared && !pending) {
|
||||
// return direct buffer(s) to cache if substituted
|
||||
releaseBuffers();
|
||||
// release resources if I/O not pending
|
||||
if (!pending) {
|
||||
if (overlapped != 0L)
|
||||
ioCache.remove(overlapped);
|
||||
if (prepared)
|
||||
releaseBuffers();
|
||||
}
|
||||
end();
|
||||
}
|
||||
@ -721,7 +712,6 @@ class WindowsAsynchronousSocketChannelImpl
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public void run() {
|
||||
int n = -1;
|
||||
long overlapped = 0L;
|
||||
boolean prepared = false;
|
||||
boolean pending = false;
|
||||
@ -736,56 +726,34 @@ class WindowsAsynchronousSocketChannelImpl
|
||||
|
||||
// get an OVERLAPPED structure (from the cache or allocate)
|
||||
overlapped = ioCache.add(result);
|
||||
|
||||
// synchronize on result to allow this thread handle the case
|
||||
// where the read completes immediately.
|
||||
synchronized (result) {
|
||||
n = write0(handle, numBufs, writeBufferArray, overlapped);
|
||||
if (n == IOStatus.UNAVAILABLE) {
|
||||
// I/O is pending
|
||||
pending = true;
|
||||
return;
|
||||
}
|
||||
|
||||
enableWriting();
|
||||
|
||||
if (n == IOStatus.EOF) {
|
||||
// special case for shutdown output
|
||||
shutdown = true;
|
||||
throw new ClosedChannelException();
|
||||
}
|
||||
|
||||
// write completed immediately:
|
||||
// 1. enable writing
|
||||
// 2. update buffer position
|
||||
// 3. release waiters
|
||||
updateBuffers(n);
|
||||
|
||||
// result is a Long or Integer
|
||||
if (gatheringWrite) {
|
||||
result.setResult((V)Long.valueOf(n));
|
||||
} else {
|
||||
result.setResult((V)Integer.valueOf(n));
|
||||
}
|
||||
int n = write0(handle, numBufs, writeBufferArray, overlapped);
|
||||
if (n == IOStatus.UNAVAILABLE) {
|
||||
// I/O is pending
|
||||
pending = true;
|
||||
return;
|
||||
}
|
||||
if (n == IOStatus.EOF) {
|
||||
// special case for shutdown output
|
||||
shutdown = true;
|
||||
throw new ClosedChannelException();
|
||||
}
|
||||
// write completed immediately
|
||||
throw new InternalError("Write completed immediately");
|
||||
} catch (Throwable x) {
|
||||
// write failed. Enable writing before releasing waiters.
|
||||
enableWriting();
|
||||
|
||||
// failed to initiate read:
|
||||
if (!shutdown && (x instanceof ClosedChannelException))
|
||||
x = new AsynchronousCloseException();
|
||||
if (!(x instanceof IOException))
|
||||
x = new IOException(x);
|
||||
result.setFailure(x);
|
||||
|
||||
// release resources
|
||||
if (overlapped != 0L)
|
||||
ioCache.remove(overlapped);
|
||||
|
||||
} finally {
|
||||
if (prepared && !pending) {
|
||||
// return direct buffer(s) to cache if substituted
|
||||
releaseBuffers();
|
||||
// release resources if I/O not pending
|
||||
if (!pending) {
|
||||
if (overlapped != 0L)
|
||||
ioCache.remove(overlapped);
|
||||
if (prepared)
|
||||
releaseBuffers();
|
||||
}
|
||||
end();
|
||||
}
|
||||
|
||||
@ -157,14 +157,13 @@ Java_sun_nio_ch_WindowsAsynchronousSocketChannelImpl_read0(JNIEnv* env, jclass t
|
||||
WSABUF* lpWsaBuf = (WSABUF*) jlong_to_ptr(address);
|
||||
OVERLAPPED* lpOverlapped = (OVERLAPPED*) jlong_to_ptr(ov);
|
||||
BOOL res;
|
||||
DWORD nread = 0;
|
||||
DWORD flags = 0;
|
||||
|
||||
ZeroMemory((PVOID)lpOverlapped, sizeof(OVERLAPPED));
|
||||
res = WSARecv(s,
|
||||
lpWsaBuf,
|
||||
(DWORD)count,
|
||||
&nread,
|
||||
NULL,
|
||||
&flags,
|
||||
lpOverlapped,
|
||||
NULL);
|
||||
@ -175,17 +174,12 @@ Java_sun_nio_ch_WindowsAsynchronousSocketChannelImpl_read0(JNIEnv* env, jclass t
|
||||
return IOS_UNAVAILABLE;
|
||||
}
|
||||
if (error == WSAESHUTDOWN) {
|
||||
return 0; // input shutdown
|
||||
return IOS_EOF; // input shutdown
|
||||
}
|
||||
JNU_ThrowIOExceptionWithLastError(env, "WSARecv failed");
|
||||
return IOS_THROWN;
|
||||
}
|
||||
if (nread == 0) {
|
||||
// Handle graceful close or bytes not yet available cases
|
||||
// via completion port notification.
|
||||
return IOS_UNAVAILABLE;
|
||||
}
|
||||
return (jint)nread;
|
||||
return IOS_UNAVAILABLE;
|
||||
}
|
||||
|
||||
JNIEXPORT jint JNICALL
|
||||
@ -196,13 +190,12 @@ Java_sun_nio_ch_WindowsAsynchronousSocketChannelImpl_write0(JNIEnv* env, jclass
|
||||
WSABUF* lpWsaBuf = (WSABUF*) jlong_to_ptr(address);
|
||||
OVERLAPPED* lpOverlapped = (OVERLAPPED*) jlong_to_ptr(ov);
|
||||
BOOL res;
|
||||
DWORD nwritten;
|
||||
|
||||
ZeroMemory((PVOID)lpOverlapped, sizeof(OVERLAPPED));
|
||||
res = WSASend(s,
|
||||
lpWsaBuf,
|
||||
(DWORD)count,
|
||||
&nwritten,
|
||||
NULL,
|
||||
0,
|
||||
lpOverlapped,
|
||||
NULL);
|
||||
@ -218,5 +211,5 @@ Java_sun_nio_ch_WindowsAsynchronousSocketChannelImpl_write0(JNIEnv* env, jclass
|
||||
JNU_ThrowIOExceptionWithLastError(env, "WSASend failed");
|
||||
return IOS_THROWN;
|
||||
}
|
||||
return (jint)nwritten;
|
||||
return IOS_UNAVAILABLE;
|
||||
}
|
||||
|
||||
@ -0,0 +1,183 @@
|
||||
/*
|
||||
* Copyright 2008-2009 Sun Microsystems, Inc. All Rights Reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
|
||||
* CA 95054 USA or visit www.sun.com if you need additional information or
|
||||
* have any questions.
|
||||
*/
|
||||
|
||||
/* @test
|
||||
* @bug 6834246
|
||||
* @summary Stress test connections through the loopback interface
|
||||
*/
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.net.*;
|
||||
import java.nio.channels.*;
|
||||
import java.util.Random;
|
||||
import java.io.IOException;
|
||||
|
||||
public class StressLoopback {
|
||||
static final Random rand = new Random();
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
// setup listener
|
||||
AsynchronousServerSocketChannel listener =
|
||||
AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(0));
|
||||
int port =((InetSocketAddress)(listener.getLocalAddress())).getPort();
|
||||
InetAddress lh = InetAddress.getLocalHost();
|
||||
SocketAddress remote = new InetSocketAddress(lh, port);
|
||||
|
||||
// create sources and sinks
|
||||
int count = 2 + rand.nextInt(9);
|
||||
Source[] source = new Source[count];
|
||||
Sink[] sink = new Sink[count];
|
||||
for (int i=0; i<count; i++) {
|
||||
AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
|
||||
ch.connect(remote).get();
|
||||
source[i] = new Source(ch);
|
||||
sink[i] = new Sink(listener.accept().get());
|
||||
}
|
||||
|
||||
// start the sinks and sources
|
||||
for (int i=0; i<count; i++) {
|
||||
sink[i].start();
|
||||
source[i].start();
|
||||
}
|
||||
|
||||
// let the test run for a while
|
||||
Thread.sleep(20*1000);
|
||||
|
||||
// wait until everyone is done
|
||||
boolean failed = false;
|
||||
long total = 0L;
|
||||
for (int i=0; i<count; i++) {
|
||||
long nwrote = source[i].finish();
|
||||
long nread = sink[i].finish();
|
||||
if (nread != nwrote)
|
||||
failed = true;
|
||||
System.out.format("%d -> %d (%s)\n",
|
||||
nwrote, nread, (failed) ? "FAIL" : "PASS");
|
||||
total += nwrote;
|
||||
}
|
||||
if (failed)
|
||||
throw new RuntimeException("Test failed - see log for details");
|
||||
System.out.format("Total sent %d MB\n", total / (1024L * 1024L));
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes bytes to a channel until "done". When done the channel is closed.
|
||||
*/
|
||||
static class Source {
|
||||
private final AsynchronousByteChannel channel;
|
||||
private final ByteBuffer sentBuffer;
|
||||
private volatile long bytesSent;
|
||||
private volatile boolean finished;
|
||||
|
||||
Source(AsynchronousByteChannel channel) {
|
||||
this.channel = channel;
|
||||
int size = 1024 + rand.nextInt(10000);
|
||||
this.sentBuffer = (rand.nextBoolean()) ?
|
||||
ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
|
||||
}
|
||||
|
||||
void start() {
|
||||
sentBuffer.position(0);
|
||||
sentBuffer.limit(sentBuffer.capacity());
|
||||
channel.write(sentBuffer, null, new CompletionHandler<Integer,Void> () {
|
||||
public void completed(Integer nwrote, Void att) {
|
||||
bytesSent += nwrote;
|
||||
if (finished) {
|
||||
closeUnchecked(channel);
|
||||
} else {
|
||||
sentBuffer.position(0);
|
||||
sentBuffer.limit(sentBuffer.capacity());
|
||||
channel.write(sentBuffer, null, this);
|
||||
}
|
||||
}
|
||||
public void failed(Throwable exc, Void att) {
|
||||
exc.printStackTrace();
|
||||
closeUnchecked(channel);
|
||||
}
|
||||
public void cancelled(Void att) {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
long finish() {
|
||||
finished = true;
|
||||
waitUntilClosed(channel);
|
||||
return bytesSent;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Read bytes from a channel until EOF is received.
|
||||
*/
|
||||
static class Sink {
|
||||
private final AsynchronousByteChannel channel;
|
||||
private final ByteBuffer readBuffer;
|
||||
private volatile long bytesRead;
|
||||
|
||||
Sink(AsynchronousByteChannel channel) {
|
||||
this.channel = channel;
|
||||
int size = 1024 + rand.nextInt(10000);
|
||||
this.readBuffer = (rand.nextBoolean()) ?
|
||||
ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
|
||||
}
|
||||
|
||||
void start() {
|
||||
channel.read(readBuffer, null, new CompletionHandler<Integer,Void> () {
|
||||
public void completed(Integer nread, Void att) {
|
||||
if (nread < 0) {
|
||||
closeUnchecked(channel);
|
||||
} else {
|
||||
bytesRead += nread;
|
||||
readBuffer.clear();
|
||||
channel.read(readBuffer, null, this);
|
||||
}
|
||||
}
|
||||
public void failed(Throwable exc, Void att) {
|
||||
exc.printStackTrace();
|
||||
closeUnchecked(channel);
|
||||
}
|
||||
public void cancelled(Void att) {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
long finish() {
|
||||
waitUntilClosed(channel);
|
||||
return bytesRead;
|
||||
}
|
||||
}
|
||||
|
||||
static void waitUntilClosed(Channel c) {
|
||||
while (c.isOpen()) {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException ignore) { }
|
||||
}
|
||||
}
|
||||
|
||||
static void closeUnchecked(Channel c) {
|
||||
try {
|
||||
c.close();
|
||||
} catch (IOException ignore) { }
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user