From 193cf2ead145d9da98ec0c3fbeb4b06895effee5 Mon Sep 17 00:00:00 2001 From: Chris Hegarty Date: Thu, 23 Jul 2009 14:06:51 +0100 Subject: [PATCH] 6863110: Newly connected/accepted SctpChannel should fire OP_READ if registered with a Selector Reviewed-by: jccollet --- .../classes/sun/nio/ch/SctpChannelImpl.java | 30 +- .../sun/nio/ch/SctpMultiChannelImpl.java | 12 +- .../native/sun/nio/ch/SctpChannelImpl.c | 9 +- .../com/sun/nio/sctp/SctpChannel/CommUp.java | 364 ++++++++++++++++++ .../sun/nio/sctp/SctpMultiChannel/Branch.java | 1 - .../SctpMultiChannel/SocketOptionTests.java | 2 +- 6 files changed, 390 insertions(+), 28 deletions(-) create mode 100644 jdk/test/com/sun/nio/sctp/SctpChannel/CommUp.java diff --git a/jdk/src/solaris/classes/sun/nio/ch/SctpChannelImpl.java b/jdk/src/solaris/classes/sun/nio/ch/SctpChannelImpl.java index 366ccc52cec..d7842570a2c 100644 --- a/jdk/src/solaris/classes/sun/nio/ch/SctpChannelImpl.java +++ b/jdk/src/solaris/classes/sun/nio/ch/SctpChannelImpl.java @@ -127,8 +127,6 @@ public class SctpChannelImpl extends SctpChannel /* -- End of fields protected by stateLock -- */ - private SctpResultContainer commUpResultContainer; /* null */ - /** * Constructor for normal connecting sockets */ @@ -761,12 +759,6 @@ public class SctpChannelImpl extends SctpChannel if (!ensureReceiveOpen()) return null; - if (commUpResultContainer != null) { - resultContainer = commUpResultContainer; - commUpResultContainer = null; - continue; - } - int n = 0; try { begin(); @@ -778,7 +770,7 @@ public class SctpChannelImpl extends SctpChannel } do { - n = receive(fdVal, buffer, resultContainer); + n = receive(fdVal, buffer, resultContainer, fromConnect); } while ((n == IOStatus.INTERRUPTED) && isOpen()); } finally { receiverCleanup(); @@ -809,9 +801,9 @@ public class SctpChannelImpl extends SctpChannel if (fromConnect) { /* If we reach here, then it was connect that invoked - * receive an received the COMM_UP. Save it and allow - * the user handler to process it upon next receive. */ - commUpResultContainer = resultContainer; + * receive and received the COMM_UP. We have already + * handled the COMM_UP with the internal notification + * handler. Simply return. */ return null; } } /* receiveLock */ @@ -827,20 +819,21 @@ public class SctpChannelImpl extends SctpChannel private int receive(int fd, ByteBuffer dst, - SctpResultContainer resultContainer) + SctpResultContainer resultContainer, + boolean peek) throws IOException { int pos = dst.position(); int lim = dst.limit(); assert (pos <= lim); int rem = (pos <= lim ? lim - pos : 0); if (dst instanceof DirectBuffer && rem > 0) - return receiveIntoNativeBuffer(fd, resultContainer, dst, rem, pos); + return receiveIntoNativeBuffer(fd, resultContainer, dst, rem, pos, peek); /* Substitute a native buffer */ int newSize = Math.max(rem, 1); ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize); try { - int n = receiveIntoNativeBuffer(fd, resultContainer, bb, newSize, 0); + int n = receiveIntoNativeBuffer(fd, resultContainer, bb, newSize, 0, peek); bb.flip(); if (n > 0 && rem > 0) dst.put(bb); @@ -854,10 +847,11 @@ public class SctpChannelImpl extends SctpChannel SctpResultContainer resultContainer, ByteBuffer bb, int rem, - int pos) + int pos, + boolean peek) throws IOException { - int n = receive0(fd, resultContainer, ((DirectBuffer)bb).address() + pos, rem); + int n = receive0(fd, resultContainer, ((DirectBuffer)bb).address() + pos, rem, peek); if (n > 0) bb.position(pos + n); @@ -1089,7 +1083,7 @@ public class SctpChannelImpl extends SctpChannel private static native void initIDs(); static native int receive0(int fd, SctpResultContainer resultContainer, - long address, int length) throws IOException; + long address, int length, boolean peek) throws IOException; static native int send0(int fd, long address, int length, SocketAddress target, int assocId, int streamNumber, diff --git a/jdk/src/solaris/classes/sun/nio/ch/SctpMultiChannelImpl.java b/jdk/src/solaris/classes/sun/nio/ch/SctpMultiChannelImpl.java index 555ce0e5971..b8457fdba27 100644 --- a/jdk/src/solaris/classes/sun/nio/ch/SctpMultiChannelImpl.java +++ b/jdk/src/solaris/classes/sun/nio/ch/SctpMultiChannelImpl.java @@ -31,6 +31,8 @@ import java.net.InetSocketAddress; import java.io.FileDescriptor; import java.io.IOException; import java.util.Collections; +import java.util.Map.Entry; +import java.util.Iterator; import java.util.Set; import java.util.HashSet; import java.util.HashMap; @@ -702,7 +704,7 @@ public class SctpMultiChannelImpl extends SctpMultiChannel int assocId = association.associationID(); Set addresses = null; - try { + try { addresses = SctpNet.getRemoteAddresses(fdVal, assocId); } catch (IOException unused) { /* OK, determining connected addresses may not be possible @@ -723,9 +725,11 @@ public class SctpMultiChannelImpl extends SctpMultiChannel /* We cannot determine the connected addresses */ Set> addrAssocs = addressMap.entrySet(); - for (java.util.Map.Entry entry : addrAssocs) { + Iterator> iterator = addrAssocs.iterator(); + while (iterator.hasNext()) { + Entry entry = iterator.next(); if (entry.getValue().equals(association)) { - addressMap.remove(entry.getKey()); + iterator.remove(); } } } @@ -957,7 +961,7 @@ public class SctpMultiChannelImpl extends SctpMultiChannel int length) throws IOException{ return SctpChannelImpl.receive0(fd, resultContainer, address, - length); + length, false /*peek */); } private static int send0(int fd, diff --git a/jdk/src/solaris/native/sun/nio/ch/SctpChannelImpl.c b/jdk/src/solaris/native/sun/nio/ch/SctpChannelImpl.c index 7bb312a6ba5..0cc1b6bf626 100644 --- a/jdk/src/solaris/native/sun/nio/ch/SctpChannelImpl.c +++ b/jdk/src/solaris/native/sun/nio/ch/SctpChannelImpl.c @@ -417,11 +417,11 @@ void handleMessage /* * Class: sun_nio_ch_SctpChannelImpl * Method: receive0 - * Signature: (ILsun/nio/ch/SctpResultContainer;JI)I + * Signature: (ILsun/nio/ch/SctpResultContainer;JIZ)I */ JNIEXPORT jint JNICALL Java_sun_nio_ch_SctpChannelImpl_receive0 (JNIEnv *env, jclass klass, jint fd, jobject resultContainerObj, - jlong address, jint length) { + jlong address, jint length, jboolean peek) { SOCKADDR sa; int sa_len = sizeof(sa); ssize_t rv = 0; @@ -429,6 +429,7 @@ JNIEXPORT jint JNICALL Java_sun_nio_ch_SctpChannelImpl_receive0 struct iovec iov[1]; struct msghdr msg[1]; char cbuf[CMSG_SPACE(sizeof (struct sctp_sndrcvinfo))]; + int flags = peek == JNI_TRUE ? MSG_PEEK : 0; /* Set up the msghdr structure for receiving */ memset(msg, 0, sizeof (*msg)); @@ -443,7 +444,7 @@ JNIEXPORT jint JNICALL Java_sun_nio_ch_SctpChannelImpl_receive0 msg->msg_flags = 0; do { - if ((rv = recvmsg(fd, msg, 0)) < 0) { + if ((rv = recvmsg(fd, msg, flags)) < 0) { if (errno == EWOULDBLOCK) { return IOS_UNAVAILABLE; } else if (errno == EINTR) { @@ -473,7 +474,7 @@ JNIEXPORT jint JNICALL Java_sun_nio_ch_SctpChannelImpl_receive0 memcpy(buf, addr, rv); iov->iov_base = buf + rv; iov->iov_len = NOTIFICATION_BUFFER_SIZE - rv; - if ((rv = recvmsg(fd, msg, 0)) < 0) { + if ((rv = recvmsg(fd, msg, flags)) < 0) { handleSocketError(env, errno); return 0; } diff --git a/jdk/test/com/sun/nio/sctp/SctpChannel/CommUp.java b/jdk/test/com/sun/nio/sctp/SctpChannel/CommUp.java new file mode 100644 index 00000000000..ec2e68ac4d2 --- /dev/null +++ b/jdk/test/com/sun/nio/sctp/SctpChannel/CommUp.java @@ -0,0 +1,364 @@ +/* + * Copyright 2009 Sun Microsystems, Inc. All Rights Reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + */ + +/* @test + * @bug 6863110 + * @summary Newly connected/accepted SctpChannel should fire OP_READ if registered with a Selector + * @author chegar + */ + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.io.IOException; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.nio.ByteBuffer; +import java.nio.channels.Selector; +import java.nio.channels.SelectionKey; +import com.sun.nio.sctp.AbstractNotificationHandler; +import com.sun.nio.sctp.AssociationChangeNotification; +import com.sun.nio.sctp.AssociationChangeNotification.AssocChangeEvent; +import com.sun.nio.sctp.HandlerResult; +import com.sun.nio.sctp.Notification; +import com.sun.nio.sctp.SctpChannel; +import com.sun.nio.sctp.SctpServerChannel; +import com.sun.nio.sctp.ShutdownNotification; +import static java.lang.System.out; +import static java.lang.System.err; +import static java.nio.channels.SelectionKey.OP_CONNECT; +import static java.nio.channels.SelectionKey.OP_READ; + +public class CommUp { + static CountDownLatch acceptLatch = new CountDownLatch(1); + static final int TIMEOUT = 10000; + + CommUpNotificationHandler clientHandler = new CommUpNotificationHandler(); + CommUpNotificationHandler serverHandler = new CommUpNotificationHandler(); + CommUpServer server; + Thread clientThread; + + void test(String[] args) { + SocketAddress address = null; + + if (!Util.isSCTPSupported()) { + out.println("SCTP protocol is not supported"); + out.println("Test cannot be run"); + return; + } + + if (args.length == 2) { + /* requested to connecct to a specific address */ + try { + int port = Integer.valueOf(args[1]); + address = new InetSocketAddress(args[0], port); + } catch (NumberFormatException nfe) { + err.println(nfe); + } + } else { + /* start server on local machine, default */ + try { + server = new CommUpServer(); + server.start(); + address = server.address(); + debug("Server started and listening on " + address); + } catch (IOException ioe) { + ioe.printStackTrace(); + return; + } + } + + /* store the main thread so that the server can interrupt it, if necessary */ + clientThread = Thread.currentThread(); + + doClient(address); + } + + void doClient(SocketAddress peerAddress) { + SctpChannel sc = null; + try { + debug("connecting to " + peerAddress); + sc = SctpChannel.open(); + sc.configureBlocking(false); + check(sc.isBlocking() == false, "Should be in non-blocking mode"); + sc.connect(peerAddress); + + Selector selector = Selector.open(); + SelectionKey selectiontKey = sc.register(selector, OP_CONNECT); + + /* Expect two interest Ops */ + boolean opConnectReceived = false; + boolean opReadReceived = false; + for (int z=0; z<2; z++) { + debug("select " + z); + int keysAdded = selector.select(TIMEOUT); + debug("returned " + keysAdded + " keys"); + if (keysAdded > 0) { + Set keys = selector.selectedKeys(); + Iterator i = keys.iterator(); + while(i.hasNext()) { + SelectionKey sk = i.next(); + i.remove(); + SctpChannel readyChannel = + (SctpChannel)sk.channel(); + + /* OP_CONNECT */ + if (sk.isConnectable()) { + /* some trivial checks */ + check(opConnectReceived == false, + "should only received one OP_CONNECT"); + check(opReadReceived == false, + "should not receive OP_READ before OP_CONNECT"); + check(readyChannel.equals(sc), + "channels should be equal"); + check(!sk.isAcceptable(), + "key should not be acceptable"); + check(!sk.isReadable(), + "key should not be readable"); + check(!sk.isWritable(), + "key should not be writable"); + + /* now process the OP_CONNECT */ + opConnectReceived = true; + check((sk.interestOps() & OP_CONNECT) == OP_CONNECT, + "selection key interest ops should contain OP_CONNECT"); + sk.interestOps(OP_READ); + check((sk.interestOps() & OP_CONNECT) != OP_CONNECT, + "selection key interest ops should not contain OP_CONNECT"); + check(sc.finishConnect(), + "finishConnect should return true"); + } /* OP_READ */ + else if (sk.isReadable()) { + /* some trivial checks */ + check(opConnectReceived == true, + "should receive one OP_CONNECT before OP_READ"); + check(opReadReceived == false, + "should not receive OP_READ before OP_CONNECT"); + check(readyChannel.equals(sc), + "channels should be equal"); + check(!sk.isAcceptable(), + "key should not be acceptable"); + check(sk.isReadable(), + "key should be readable"); + check(!sk.isWritable(), + "key should not be writable"); + check(!sk.isConnectable(), + "key should not be connectable"); + + /* now process the OP_READ */ + opReadReceived = true; + selectiontKey.cancel(); + + /* try with small buffer to see if native + * implementation can handle this */ + ByteBuffer buffer = ByteBuffer.allocateDirect(1); + readyChannel.receive(buffer, null, clientHandler); + check(clientHandler.receivedCommUp(), + "Client should have received COMM_UP"); + + /* dont close (or put anything on) the channel until + * we check that the server's accepted channel also + * received COMM_UP */ + serverHandler.waitForCommUp(); + } else { + fail("Unexpected selection key"); + } + } + } else { + fail("Client selector returned 0 ready keys"); + /* stop the server */ + server.thread().interrupt(); + } + } //for + + } catch (IOException ioe) { + unexpected(ioe); + } catch (InterruptedException ie) { + unexpected(ie); + } + } + + class CommUpServer implements Runnable + { + final InetSocketAddress serverAddr; + private SctpServerChannel ssc; + private Thread serverThread; + + public CommUpServer() throws IOException { + ssc = SctpServerChannel.open().bind(null); + java.util.Set addrs = ssc.getAllLocalAddresses(); + if (addrs.isEmpty()) + debug("addrs should not be empty"); + + serverAddr = (InetSocketAddress) addrs.iterator().next(); + } + + void start() { + serverThread = new Thread(this, "CommUpServer-" + + serverAddr.getPort()); + serverThread.start(); + } + + InetSocketAddress address () { + return serverAddr; + } + + Thread thread() { + return serverThread; + } + + @Override + public void run() { + Selector selector = null; + SctpChannel sc = null; + SelectionKey readKey = null; + try { + sc = ssc.accept(); + debug("accepted " + sc); + + selector = Selector.open(); + sc.configureBlocking(false); + check(sc.isBlocking() == false, "Should be in non-blocking mode"); + readKey = sc.register(selector, SelectionKey.OP_READ); + + debug("select"); + int keysAdded = selector.select(TIMEOUT); + debug("returned " + keysAdded + " keys"); + if (keysAdded > 0) { + Set keys = selector.selectedKeys(); + Iterator i = keys.iterator(); + while(i.hasNext()) { + SelectionKey sk = i.next(); + i.remove(); + SctpChannel readyChannel = + (SctpChannel)sk.channel(); + check(readyChannel.equals(sc), + "channels should be equal"); + check(!sk.isAcceptable(), + "key should not be acceptable"); + check(sk.isReadable(), + "key should be readable"); + check(!sk.isWritable(), + "key should not be writable"); + check(!sk.isConnectable(), + "key should not be connectable"); + + /* block until we check if the client has received its COMM_UP*/ + clientHandler.waitForCommUp(); + + ByteBuffer buffer = ByteBuffer.allocateDirect(1); + sc.receive(buffer, null, serverHandler); + check(serverHandler.receivedCommUp(), + "Accepted channel should have received COMM_UP"); + } + } else { + fail("Server selector returned 0 ready keys"); + /* stop the client */ + clientThread.interrupt(); + } + } catch (IOException ioe) { + ioe.printStackTrace(); + } catch (InterruptedException unused) { + } finally { + if (readKey != null) readKey.cancel(); + try { if (selector != null) selector.close(); } + catch (IOException ioe) { unexpected(ioe); } + try { if (ssc != null) ssc.close(); } + catch (IOException ioe) { unexpected(ioe); } + try { if (sc != null) sc.close(); } + catch (IOException ioe) { unexpected(ioe); } + } + } + } + + class CommUpNotificationHandler extends AbstractNotificationHandler + { + private boolean receivedCommUp; // false + + public synchronized boolean receivedCommUp() { + return receivedCommUp; + } + + public synchronized boolean waitForCommUp() throws InterruptedException { + while (receivedCommUp == false) { + wait(); + } + + return false; + } + + @Override + public HandlerResult handleNotification( + Notification notification, Object attachment) { + fail("Unknown notification type"); + return HandlerResult.CONTINUE; + } + + @Override + public synchronized HandlerResult handleNotification( + AssociationChangeNotification notification, Object attachment) { + AssocChangeEvent event = notification.event(); + debug("AssociationChangeNotification"); + debug(" Association: " + notification.association()); + debug(" Event: " + event); + + if (event.equals(AssocChangeEvent.COMM_UP)) { + receivedCommUp = true; + notifyAll(); + } + + return HandlerResult.RETURN; + } + + @Override + public HandlerResult handleNotification( + ShutdownNotification notification, Object attachment) { + debug("ShutdownNotification"); + debug(" Association: " + notification.association()); + return HandlerResult.RETURN; + } + } + + //--------------------- Infrastructure --------------------------- + boolean debug = true; + volatile int passed = 0, failed = 0; + void pass() {passed++;} + void fail() {failed++; Thread.dumpStack();} + void fail(String msg) {err.println(msg); fail();} + void unexpected(Throwable t) {failed++; t.printStackTrace();} + void check(boolean cond) {if (cond) pass(); else fail();} + void check(boolean cond, String failMessage) {if (cond) pass(); else fail(failMessage);} + void debug(String message) {if(debug) { out.println(Thread.currentThread().getName() + ": " + message); } } + void sleep(long millis) { try { Thread.currentThread().sleep(millis); } + catch(InterruptedException ie) { unexpected(ie); }} + public static void main(String[] args) throws Throwable { + Class k = new Object(){}.getClass().getEnclosingClass(); + try {k.getMethod("instanceMain",String[].class) + .invoke( k.newInstance(), (Object) args);} + catch (Throwable e) {throw e.getCause();}} + public void instanceMain(String[] args) throws Throwable { + try {test(args);} catch (Throwable t) {unexpected(t);} + out.printf("%nPassed = %d, failed = %d%n%n", passed, failed); + if (failed > 0) throw new AssertionError("Some tests failed");} + +} diff --git a/jdk/test/com/sun/nio/sctp/SctpMultiChannel/Branch.java b/jdk/test/com/sun/nio/sctp/SctpMultiChannel/Branch.java index eb75415614f..7db79e341d1 100644 --- a/jdk/test/com/sun/nio/sctp/SctpMultiChannel/Branch.java +++ b/jdk/test/com/sun/nio/sctp/SctpMultiChannel/Branch.java @@ -115,7 +115,6 @@ public class Branch { /* Receive the COMM_UP */ buffer.clear(); BranchNotificationHandler handler = new BranchNotificationHandler(); - channel.configureBlocking(false); info = channel.receive(buffer, null, handler); check(handler.receivedCommUp(), "COMM_UP no received"); Set associations = channel.associations(); diff --git a/jdk/test/com/sun/nio/sctp/SctpMultiChannel/SocketOptionTests.java b/jdk/test/com/sun/nio/sctp/SctpMultiChannel/SocketOptionTests.java index 1495f5399e3..e4292dd2339 100644 --- a/jdk/test/com/sun/nio/sctp/SctpMultiChannel/SocketOptionTests.java +++ b/jdk/test/com/sun/nio/sctp/SctpMultiChannel/SocketOptionTests.java @@ -181,7 +181,6 @@ public class SocketOptionTests { /* Receive the COMM_UP */ buffer.clear(); SOTNotificationHandler handler = new SOTNotificationHandler(); - smc.configureBlocking(false); info = smc.receive(buffer, null, handler); check(handler.receivedCommUp(), "COMM_UP no received"); Set associations = smc.associations(); @@ -220,6 +219,7 @@ public class SocketOptionTests { } check(found, "SCTP_PRIMARY_ADDR returned bogus address!"); + System.out.println("Try SCTP_PRIMARY_ADDR set to: " + addrToSet); smc.setOption(SCTP_PRIMARY_ADDR, addrToSet, assoc); System.out.println("SCTP_PRIMARY_ADDR set to: " + addrToSet); primaryAddr = smc.getOption(SCTP_PRIMARY_ADDR, assoc);