8374170: I/O Poller updates

Reviewed-by: michaelm
This commit is contained in:
Alan Bateman 2026-02-06 15:19:01 +00:00
parent 9f13ec1ccb
commit cd5256d5a6
33 changed files with 1043 additions and 511 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2022, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -30,15 +30,28 @@ import java.io.IOException;
* Default PollerProvider for AIX.
*/
class DefaultPollerProvider extends PollerProvider {
DefaultPollerProvider() { }
DefaultPollerProvider(Poller.Mode mode) {
if (mode != Poller.Mode.SYSTEM_THREADS) {
throw new UnsupportedOperationException();
}
super(mode);
}
DefaultPollerProvider() {
this(Poller.Mode.SYSTEM_THREADS);
}
@Override
Poller readPoller(boolean subPoller) throws IOException {
if (subPoller)
throw new UnsupportedOperationException();
return new PollsetPoller(true);
}
@Override
Poller writePoller(boolean subPoller) throws IOException {
if (subPoller)
throw new UnsupportedOperationException();
return new PollsetPoller(false);
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2022, 2026, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2023, IBM Corp.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
@ -52,13 +52,19 @@ class PollsetPoller extends Poller {
this.pollBuffer = Pollset.allocatePollArray(MAX_EVENTS_TO_POLL);
}
@Override
void close() {
Pollset.pollsetDestroy(setid);
Pollset.freePollArray(pollBuffer);
}
@Override
int fdVal() {
return setid;
}
@Override
void implRegister(int fd) throws IOException {
void implStartPoll(int fd) throws IOException {
int ret = Pollset.pollsetCtl(setid, Pollset.PS_MOD, fd, Pollset.PS_POLLPRI | event);
if (ret != 0) {
throw new IOException("Unable to register fd " + fd);
@ -66,7 +72,7 @@ class PollsetPoller extends Poller {
}
@Override
void implDeregister(int fd, boolean polled) {
void implStopPoll(int fd, boolean polled) {
int ret = Pollset.pollsetCtl(setid, Pollset.PS_DELETE, fd, 0);
assert ret == 0;
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -31,34 +31,34 @@ import jdk.internal.vm.ContinuationSupport;
* Default PollerProvider for Linux.
*/
class DefaultPollerProvider extends PollerProvider {
DefaultPollerProvider() { }
DefaultPollerProvider(Poller.Mode mode) {
super(mode);
}
@Override
Poller.Mode defaultPollerMode() {
if (ContinuationSupport.isSupported()) {
return Poller.Mode.VTHREAD_POLLERS;
} else {
return Poller.Mode.SYSTEM_THREADS;
}
DefaultPollerProvider() {
var mode = ContinuationSupport.isSupported()
? Poller.Mode.VTHREAD_POLLERS
: Poller.Mode.SYSTEM_THREADS;
this(mode);
}
@Override
int defaultReadPollers(Poller.Mode mode) {
int defaultReadPollers() {
int ncpus = Runtime.getRuntime().availableProcessors();
if (mode == Poller.Mode.VTHREAD_POLLERS) {
return Math.min(Integer.highestOneBit(ncpus), 32);
} else {
return Math.max(Integer.highestOneBit(ncpus / 4), 1);
}
return switch (pollerMode()) {
case SYSTEM_THREADS -> Math.max(Integer.highestOneBit(ncpus / 4), 1);
case VTHREAD_POLLERS -> Math.min(Integer.highestOneBit(ncpus), 32);
default -> super.defaultReadPollers();
};
}
@Override
Poller readPoller(boolean subPoller) throws IOException {
return new EPollPoller(subPoller, true);
return new EPollPoller(pollerMode(), subPoller, true);
}
@Override
Poller writePoller(boolean subPoller) throws IOException {
return new EPollPoller(subPoller, false);
return new EPollPoller(pollerMode(), subPoller, false);
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -25,6 +25,8 @@
package sun.nio.ch;
import java.io.IOException;
import java.lang.ref.Cleaner.Cleanable;
import jdk.internal.ref.CleanerFactory;
import static sun.nio.ch.EPoll.*;
/**
@ -38,12 +40,70 @@ class EPollPoller extends Poller {
private final int event;
private final int maxEvents;
private final long address;
private final EventFD eventfd; // wakeup event, used for shutdown
EPollPoller(boolean subPoller, boolean read) throws IOException {
this.epfd = EPoll.create();
// close action, and cleaner if this is subpoller
private final Runnable closer;
private final Cleanable cleaner;
EPollPoller(Poller.Mode mode, boolean subPoller, boolean read) throws IOException {
boolean wakeable = (mode == Mode.POLLER_PER_CARRIER) && subPoller;
int maxEvents = (subPoller) ? 16 : 64;
int epfd = EPoll.create();
long address = 0L;
EventFD eventfd = null;
try {
address = EPoll.allocatePollArray(maxEvents);
// register one end of the pipe with epoll to allow for wakeup
if (wakeable) {
eventfd = new EventFD();
IOUtil.configureBlocking(eventfd.efd(), false);
EPoll.ctl(epfd, EPOLL_CTL_ADD, eventfd.efd(), EPOLLIN);
}
} catch (Throwable e) {
FileDispatcherImpl.closeIntFD(epfd);
if (address != 0L) EPoll.freePollArray(address);
if (eventfd != null) eventfd.close();
throw e;
}
this.epfd = epfd;
this.event = (read) ? EPOLLIN : EPOLLOUT;
this.maxEvents = (subPoller) ? 64 : 512;
this.address = EPoll.allocatePollArray(maxEvents);
this.maxEvents = maxEvents;
this.address = address;
this.eventfd = eventfd;
// create action to close epoll instance, register cleaner when wakeable
this.closer = closer(epfd, address, eventfd);
if (wakeable) {
this.cleaner = CleanerFactory.cleaner().register(this, closer);
} else {
this.cleaner = null;
}
}
/**
* Returns an action to close the epoll instance and release other resources.
*/
private static Runnable closer(int epfd, long address, EventFD eventfd) {
return () -> {
try {
FileDispatcherImpl.closeIntFD(epfd);
EPoll.freePollArray(address);
if (eventfd != null) eventfd.close();
} catch (IOException _) { }
};
}
@Override
void close() {
if (cleaner != null) {
cleaner.clean();
} else {
closer.run();
}
}
@Override
@ -52,8 +112,8 @@ class EPollPoller extends Poller {
}
@Override
void implRegister(int fdVal) throws IOException {
// re-arm
void implStartPoll(int fdVal) throws IOException {
// re-enable if already registered but disabled (previously polled)
int err = EPoll.ctl(epfd, EPOLL_CTL_MOD, fdVal, (event | EPOLLONESHOT));
if (err == ENOENT)
err = EPoll.ctl(epfd, EPOLL_CTL_ADD, fdVal, (event | EPOLLONESHOT));
@ -62,24 +122,36 @@ class EPollPoller extends Poller {
}
@Override
void implDeregister(int fdVal, boolean polled) {
void implStopPoll(int fdVal, boolean polled) {
// event is disabled if already polled
if (!polled) {
EPoll.ctl(epfd, EPOLL_CTL_DEL, fdVal, 0);
}
}
@Override
void wakeupPoller() throws IOException {
if (eventfd == null) {
throw new UnsupportedOperationException();
}
eventfd.set();
}
@Override
int poll(int timeout) throws IOException {
int n = EPoll.wait(epfd, address, maxEvents, timeout);
int polled = 0;
int i = 0;
while (i < n) {
long eventAddress = EPoll.getEvent(address, i);
int fdVal = EPoll.getDescriptor(eventAddress);
polled(fdVal);
int fd = EPoll.getDescriptor(eventAddress);
if (eventfd == null || fd != eventfd.efd()) {
polled(fd);
polled++;
}
i++;
}
return n;
return polled;
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -30,15 +30,21 @@ import java.io.IOException;
* Default PollerProvider for macOS.
*/
class DefaultPollerProvider extends PollerProvider {
DefaultPollerProvider() { }
DefaultPollerProvider(Poller.Mode mode) {
super(mode);
}
DefaultPollerProvider() {
this(Poller.Mode.SYSTEM_THREADS);
}
@Override
Poller readPoller(boolean subPoller) throws IOException {
return new KQueuePoller(subPoller, true);
return new KQueuePoller(pollerMode(), subPoller, true);
}
@Override
Poller writePoller(boolean subPoller) throws IOException {
return new KQueuePoller(subPoller, false);
return new KQueuePoller(pollerMode(), subPoller, false);
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -25,6 +25,8 @@
package sun.nio.ch;
import java.io.IOException;
import java.lang.ref.Cleaner.Cleanable;
import jdk.internal.ref.CleanerFactory;
import static sun.nio.ch.KQueue.*;
/**
@ -36,11 +38,77 @@ class KQueuePoller extends Poller {
private final int maxEvents;
private final long address;
KQueuePoller(boolean subPoller, boolean read) throws IOException {
this.kqfd = KQueue.create();
// file descriptors used for wakeup during shutdown
private final int fd0;
private final int fd1;
// close action, and cleaner if this is subpoller
private final Runnable closer;
private final Cleanable cleaner;
KQueuePoller(Poller.Mode mode, boolean subPoller, boolean read) throws IOException {
boolean wakeable = (mode == Mode.POLLER_PER_CARRIER) && subPoller;
int maxEvents = (subPoller) ? 16 : 64;
int kqfd = KQueue.create();
long address = 0L;
int fd0 = -1;
int fd1 = -1;
try {
address = KQueue.allocatePollArray(maxEvents);
// register one end of the pipe with kqueue to allow for wakeup
if (wakeable) {
long fds = IOUtil.makePipe(false);
fd0 = (int) (fds >>> 32);
fd1 = (int) fds;
KQueue.register(kqfd, fd0, EVFILT_READ, EV_ADD);
}
} catch (Throwable e) {
FileDispatcherImpl.closeIntFD(kqfd);
if (address != 0L) KQueue.freePollArray(address);
if (fd0 >= 0) FileDispatcherImpl.closeIntFD(fd0);
if (fd1 >= 0) FileDispatcherImpl.closeIntFD(fd1);
throw e;
}
this.kqfd = kqfd;
this.filter = (read) ? EVFILT_READ : EVFILT_WRITE;
this.maxEvents = (subPoller) ? 64 : 512;
this.address = KQueue.allocatePollArray(maxEvents);
this.maxEvents = maxEvents;
this.address = address;
this.fd0 = fd0;
this.fd1 = fd1;
// create action to close kqueue, register cleaner when wakeable
this.closer = closer(kqfd, address, fd0, fd1);
if (wakeable) {
this.cleaner = CleanerFactory.cleaner().register(this, closer);
} else {
this.cleaner = null;
}
}
/**
* Returns an action to close the kqueue and release other resources.
*/
private static Runnable closer(int kqfd, long address, int fd0, int fd1) {
return () -> {
try {
FileDispatcherImpl.closeIntFD(kqfd);
KQueue.freePollArray(address);
if (fd0 >= 0) FileDispatcherImpl.closeIntFD(fd0);
if (fd1 >= 0) FileDispatcherImpl.closeIntFD(fd1);
} catch (IOException _) { }
};
}
@Override
void close() {
if (cleaner != null) {
cleaner.clean();
} else {
closer.run();
}
}
@Override
@ -49,30 +117,42 @@ class KQueuePoller extends Poller {
}
@Override
void implRegister(int fdVal) throws IOException {
void implStartPoll(int fdVal) throws IOException {
int err = KQueue.register(kqfd, fdVal, filter, (EV_ADD|EV_ONESHOT));
if (err != 0)
throw new IOException("kevent failed: " + err);
}
@Override
void implDeregister(int fdVal, boolean polled) {
void implStopPoll(int fdVal, boolean polled) {
// event was deleted if already polled
if (!polled) {
KQueue.register(kqfd, fdVal, filter, EV_DELETE);
}
}
@Override
void wakeupPoller() throws IOException {
if (fd1 < 0) {
throw new UnsupportedOperationException();
}
IOUtil.write1(fd1, (byte)0);
}
@Override
int poll(int timeout) throws IOException {
int n = KQueue.poll(kqfd, address, maxEvents, timeout);
int polled = 0;
int i = 0;
while (i < n) {
long keventAddress = KQueue.getEvent(address, i);
int fdVal = KQueue.getDescriptor(keventAddress);
polled(fdVal);
if (fdVal != fd0) {
polled(fdVal);
polled++;
}
i++;
}
return n;
return polled;
}
}

View File

@ -2277,6 +2277,14 @@ public final class System {
return Thread.scopedValueBindings();
}
public long nativeThreadID(Thread thread) {
return thread.nativeThreadID();
}
public void setThreadNativeID(long id) {
Thread.currentThread().setNativeThreadID(id);
}
public Continuation getContinuation(Thread thread) {
return thread.getContinuation();
}
@ -2311,7 +2319,7 @@ public final class System {
if (thread instanceof BaseVirtualThread vthread) {
vthread.unpark();
} else {
throw new WrongThreadException();
throw new IllegalArgumentException();
}
}

View File

@ -286,6 +286,9 @@ public class Thread implements Runnable {
volatile boolean daemon;
volatile int threadStatus;
// Used by NativeThread for signalling
@Stable long nativeThreadID;
// This map is maintained by the ThreadLocal class
ThreadLocal.ThreadLocalMap terminatingThreadLocals;
@ -312,6 +315,14 @@ public class Thread implements Runnable {
holder.terminatingThreadLocals = map;
}
long nativeThreadID() {
return holder.nativeThreadID;
}
void setNativeThreadID(long id) {
holder.nativeThreadID = id;
}
/*
* ThreadLocal values pertaining to this thread. This map is maintained
* by the ThreadLocal class.

View File

@ -581,6 +581,16 @@ public interface JavaLangAccess {
*/
Object scopedValueBindings();
/**
* Returns the native thread ID for the given platform thread or 0 if not set.
*/
long nativeThreadID(Thread thread);
/**
* Sets the native thread ID for the current platform thread.
*/
void setThreadNativeID(long id);
/**
* Returns the innermost mounted continuation
*/

View File

@ -134,9 +134,9 @@ class DatagramChannelImpl
private static final int ST_CLOSED = 3;
private int state;
// IDs of native threads doing reads and writes, for signalling
private long readerThread;
private long writerThread;
// Threads doing reads and writes, for signalling
private Thread readerThread;
private Thread writerThread;
// Local and remote (connected) address
private InetSocketAddress localAddress;
@ -523,7 +523,7 @@ class DatagramChannelImpl
if (localAddress == null)
bindInternal(null);
if (blocking)
readerThread = NativeThread.current();
readerThread = NativeThread.threadToSignal();
}
return remote;
}
@ -538,7 +538,7 @@ class DatagramChannelImpl
{
if (blocking) {
synchronized (stateLock) {
readerThread = 0;
readerThread = null;
if (state == ST_CLOSING) {
tryFinishClose();
}
@ -1030,7 +1030,7 @@ class DatagramChannelImpl
if (localAddress == null)
bindInternal(null);
if (blocking)
writerThread = NativeThread.current();
writerThread = NativeThread.threadToSignal();
}
return remote;
}
@ -1045,7 +1045,7 @@ class DatagramChannelImpl
{
if (blocking) {
synchronized (stateLock) {
writerThread = 0;
writerThread = null;
if (state == ST_CLOSING) {
tryFinishClose();
}
@ -1714,7 +1714,7 @@ class DatagramChannelImpl
*/
private boolean tryClose() throws IOException {
assert Thread.holdsLock(stateLock) && state == ST_CLOSING;
if ((readerThread == 0) && (writerThread == 0) && !isRegistered()) {
if ((readerThread == null) && (writerThread == null) && !isRegistered()) {
state = ST_CLOSED;
try {
// close socket

View File

@ -587,9 +587,11 @@ public final class IOUtil {
*/
static native int drain1(int fd) throws IOException;
public static native void configureBlocking(FileDescriptor fd,
boolean blocking)
throws IOException;
static native void configureBlocking(int fd, boolean blocking) throws IOException;
public static void configureBlocking(FileDescriptor fd, boolean blocking) throws IOException {
configureBlocking(fdVal(fd), blocking);
}
public static native int fdVal(FileDescriptor fd);

View File

@ -27,8 +27,6 @@ package sun.nio.ch;
import java.io.FileDescriptor;
import java.io.IOException;
import jdk.internal.access.JavaIOFileDescriptorAccess;
import jdk.internal.access.SharedSecrets;
/**
* Allows different platforms to call different native methods
@ -36,7 +34,6 @@ import jdk.internal.access.SharedSecrets;
*/
abstract class NativeDispatcher {
private static final JavaIOFileDescriptorAccess JIOFDA = SharedSecrets.getJavaIOFileDescriptorAccess();
abstract int read(FileDescriptor fd, long address, int len)
throws IOException;
@ -78,12 +75,17 @@ abstract class NativeDispatcher {
* if a platform thread is blocked on the file descriptor then the file descriptor is
* dup'ed to a special fd and the thread signalled so that the syscall fails with EINTR.
*/
final void preClose(FileDescriptor fd, long reader, long writer) throws IOException {
if (NativeThread.isVirtualThread(reader) || NativeThread.isVirtualThread(writer)) {
int fdVal = JIOFDA.get(fd);
Poller.stopPoll(fdVal);
final void preClose(FileDescriptor fd, Thread reader, Thread writer) throws IOException {
if (reader != null && reader.isVirtual()) {
NativeThread.signal(reader); // unparks virtual thread
reader = null;
}
if (NativeThread.isNativeThread(reader) || NativeThread.isNativeThread(writer)) {
if (writer != null && writer.isVirtual()) {
NativeThread.signal(writer); // unparks virtual thread
writer = null;
}
// dup2 and signal platform threads
if (reader != null || writer != null) {
implPreClose(fd, reader, writer);
}
}
@ -92,8 +94,7 @@ abstract class NativeDispatcher {
* This method does nothing by default. On Unix systems the file descriptor is dup'ed
* to a special fd and native threads signalled.
*/
void implPreClose(FileDescriptor fd, long reader, long writer) throws IOException {
void implPreClose(FileDescriptor fd, Thread reader, Thread writer) throws IOException {
// Do nothing by default; this is only needed on Unix
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2002, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2002, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -31,9 +31,9 @@ class NativeThreadSet {
private static final int OTHER_THREAD_INDEX = -99;
private final int initialCapacity;
private long[] threads; // array of thread handles, created lazily
private int used; // number of thread handles in threads array
private int otherThreads; // count of threads without a native thread handle
private Thread[] threads; // array of platform threads, created lazily
private int used; // number of elements in threads array
private int otherThreads; // additional threads that can't be signalled
private boolean waitingToEmpty;
NativeThreadSet(int n) {
@ -45,28 +45,28 @@ class NativeThreadSet {
* it can efficiently be removed later.
*/
int add() {
long th = NativeThread.current();
synchronized (this) {
if (!NativeThread.isNativeThread(th)) {
final Thread t = NativeThread.threadToSignal();
if (t == null || t.isVirtual()) {
otherThreads++;
return OTHER_THREAD_INDEX;
}
// add native thread handle to array, creating or growing array if needed
// add platform threads to array, creating or growing array if needed
int start = 0;
if (threads == null) {
threads = new long[initialCapacity];
threads = new Thread[initialCapacity];
} else if (used >= threads.length) {
int on = threads.length;
int nn = on * 2;
long[] nthreads = new long[nn];
Thread[] nthreads = new Thread[nn];
System.arraycopy(threads, 0, nthreads, 0, on);
threads = nthreads;
start = on;
}
for (int i = start; i < threads.length; i++) {
if (threads[i] == 0) {
threads[i] = th;
if (threads[i] == null) {
threads[i] = t;
used++;
return i;
}
@ -81,8 +81,7 @@ class NativeThreadSet {
void remove(int i) {
synchronized (this) {
if (i >= 0) {
assert threads[i] == NativeThread.current();
threads[i] = 0;
threads[i] = null;
used--;
} else if (i == OTHER_THREAD_INDEX) {
otherThreads--;
@ -104,9 +103,9 @@ class NativeThreadSet {
while (used > 0 || otherThreads > 0) {
int u = used, i = 0;
while (u > 0 && i < threads.length) {
long th = threads[i];
if (th != 0) {
NativeThread.signal(th);
Thread t = threads[i];
if (t != null) {
NativeThread.signal(t);
u--;
}
i++;

View File

@ -103,8 +103,8 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
private volatile boolean nonBlocking;
// used by connect/read/write/accept, protected by stateLock
private long readerThread;
private long writerThread;
private Thread readerThread;
private Thread writerThread;
// used when SO_REUSEADDR is emulated, protected by stateLock
private boolean isReuseAddress;
@ -218,7 +218,7 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
private FileDescriptor beginRead() throws SocketException {
synchronized (stateLock) {
ensureOpenAndConnected();
readerThread = NativeThread.current();
readerThread = NativeThread.threadToSignal();
return fd;
}
}
@ -229,7 +229,7 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
*/
private void endRead(boolean completed) throws SocketException {
synchronized (stateLock) {
readerThread = 0;
readerThread = null;
int state = this.state;
if (state == ST_CLOSING)
tryFinishClose();
@ -370,7 +370,7 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
private FileDescriptor beginWrite() throws SocketException {
synchronized (stateLock) {
ensureOpenAndConnected();
writerThread = NativeThread.current();
writerThread = NativeThread.threadToSignal();
return fd;
}
}
@ -381,7 +381,7 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
*/
private void endWrite(boolean completed) throws SocketException {
synchronized (stateLock) {
writerThread = 0;
writerThread = null;
int state = this.state;
if (state == ST_CLOSING)
tryFinishClose();
@ -511,7 +511,7 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
this.address = address;
this.port = port;
readerThread = NativeThread.current();
readerThread = NativeThread.threadToSignal();
return fd;
}
}
@ -522,7 +522,7 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
*/
private void endConnect(FileDescriptor fd, boolean completed) throws IOException {
synchronized (stateLock) {
readerThread = 0;
readerThread = null;
int state = this.state;
if (state == ST_CLOSING)
tryFinishClose();
@ -666,7 +666,7 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
ensureOpen();
if (localport == 0)
throw new SocketException("Not bound");
readerThread = NativeThread.current();
readerThread = NativeThread.threadToSignal();
return fd;
}
}
@ -678,7 +678,7 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
private void endAccept(boolean completed) throws SocketException {
synchronized (stateLock) {
int state = this.state;
readerThread = 0;
readerThread = null;
if (state == ST_CLOSING)
tryFinishClose();
if (!completed && state >= ST_CLOSING)
@ -844,7 +844,7 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
*/
private boolean tryClose() throws IOException {
assert Thread.holdsLock(stateLock) && state == ST_CLOSING;
if (readerThread == 0 && writerThread == 0) {
if (readerThread == null && writerThread == null) {
try {
cleaner.clean();
} catch (UncheckedIOException ioe) {
@ -1143,8 +1143,8 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
ensureOpenAndConnected();
if (!isInputClosed) {
Net.shutdown(fd, Net.SHUT_RD);
if (NativeThread.isVirtualThread(readerThread)) {
Poller.stopPoll(fdVal(fd), Net.POLLIN);
if (readerThread != null && readerThread.isVirtual()) {
Poller.stopPoll(readerThread);
}
isInputClosed = true;
}
@ -1157,8 +1157,8 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
ensureOpenAndConnected();
if (!isOutputClosed) {
Net.shutdown(fd, Net.SHUT_WR);
if (NativeThread.isVirtualThread(writerThread)) {
Poller.stopPoll(fdVal(fd), Net.POLLOUT);
if (writerThread != null && writerThread.isVirtual()) {
Poller.stopPoll(writerThread);
}
isOutputClosed = true;
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2024, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -25,61 +25,107 @@
package sun.nio.ch;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.ref.Reference;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;
import jdk.internal.access.JavaLangAccess;
import jdk.internal.access.SharedSecrets;
import jdk.internal.misc.InnocuousThread;
import jdk.internal.misc.TerminatingThreadLocal;
import jdk.internal.vm.Continuation;
import jdk.internal.vm.ContinuationSupport;
import jdk.internal.vm.annotation.Stable;
/**
* Polls file descriptors. Virtual threads invoke the poll method to park
* until a given file descriptor is ready for I/O.
* I/O poller to allow virtual threads park until a file descriptor is ready for I/O.
*/
public abstract class Poller {
private static final Pollers POLLERS;
static {
try {
var pollers = new Pollers();
pollers.start();
POLLERS = pollers;
} catch (IOException ioe) {
throw new ExceptionInInitializerError(ioe);
}
}
private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
// the poller or sub-poller thread
// the poller group for the I/O pollers and poller threads
private static final PollerGroup POLLER_GROUP = createPollerGroup();
// the poller or sub-poller thread (used for observability only)
private @Stable Thread owner;
// maps file descriptors to parked Thread
private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
// shutdown (if supported by poller group)
private volatile boolean shutdown;
/**
* Poller mode.
*/
enum Mode {
/**
* ReadPoller and WritePoller are dedicated platform threads that block waiting
* for events and unpark virtual threads when file descriptors are ready for I/O.
* Read and write pollers are platform threads that block waiting for events and
* unpark virtual threads when file descriptors are ready for I/O.
*/
SYSTEM_THREADS,
/**
* ReadPoller and WritePoller threads are virtual threads that poll for events,
* yielding between polls and unparking virtual threads when file descriptors are
* Read and write pollers are virtual threads that poll for events, yielding
* between polls and unparking virtual threads when file descriptors are
* ready for I/O. If there are no events then the poller threads park until there
* are I/O events to poll. This mode helps to integrate polling with virtual
* thread scheduling. The approach is similar to the default scheme in "User-level
* Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020
* (https://dl.acm.org/doi/10.1145/3379483).
*/
VTHREAD_POLLERS
VTHREAD_POLLERS,
/**
* Read pollers are per-carrier virtual threads that poll for events, yielding
* between polls and unparking virtual threads when file descriptors are ready
* for I/O. If there are no events then the poller threads park until there
* are I/O events to poll. The write poller is a system-wide platform thread.
*/
POLLER_PER_CARRIER
}
/**
* Create and return the PollerGroup.
*/
private static PollerGroup createPollerGroup() {
try {
PollerProvider provider;
if (System.getProperty("jdk.pollerMode") instanceof String s) {
Mode mode = switch (s) {
case "1" -> Mode.SYSTEM_THREADS;
case "2" -> Mode.VTHREAD_POLLERS;
case "3" -> Mode.POLLER_PER_CARRIER;
default -> {
throw new RuntimeException(s + " is not a valid polling mode");
}
};
provider = PollerProvider.createProvider(mode);
} else {
provider = PollerProvider.createProvider();
}
int readPollers = pollerCount("jdk.readPollers", provider.defaultReadPollers());
int writePollers = pollerCount("jdk.writePollers", provider.defaultWritePollers());
PollerGroup group = switch (provider.pollerMode()) {
case SYSTEM_THREADS -> new SystemThreadsPollerGroup(provider, readPollers, writePollers);
case VTHREAD_POLLERS -> new VThreadsPollerGroup(provider, readPollers, writePollers);
case POLLER_PER_CARRIER -> new PollerPerCarrierPollerGroup(provider, writePollers);
};
group.start();
return group;
} catch (IOException ioe) {
throw new UncheckedIOException(ioe);
}
}
/**
@ -89,9 +135,34 @@ public abstract class Poller {
}
/**
* Returns the poller's file descriptor, used when the read and write poller threads
* are virtual threads.
*
* Closes the poller and release resources. This method can only be used to cleanup
* when creating a poller group fails.
*/
abstract void close() throws IOException;
/**
* Sets the poller's thread owner.
*/
private void setOwner() {
owner = Thread.currentThread();
}
/**
* Returns true if this poller is marked for shutdown.
*/
boolean isShutdown() {
return shutdown;
}
/**
* Marks this poller for shutdown.
*/
private void setShutdown() {
shutdown = true;
}
/**
* Returns the poller's file descriptor to use when polling with the master poller.
* @throws UnsupportedOperationException if not supported
*/
int fdVal() {
@ -99,16 +170,18 @@ public abstract class Poller {
}
/**
* Register the file descriptor. The registration is "one shot", meaning it should
* be polled at most once.
* Register the file descriptor with the I/O event management facility so that it is
* polled when the file descriptor is ready for I/O. The registration is "one shot",
* meaning it should be polled at most once.
*/
abstract void implRegister(int fdVal) throws IOException;
abstract void implStartPoll(int fdVal) throws IOException;
/**
* Deregister the file descriptor.
* Deregister a file descriptor from the I/O event management facility. This may be
* a no-op in some implementations when the file descriptor has already been polled.
* @param polled true if the file descriptor has already been polled
*/
abstract void implDeregister(int fdVal, boolean polled);
abstract void implStopPoll(int fdVal, boolean polled) throws IOException;
/**
* Poll for events. The {@link #polled(int)} method is invoked for each
@ -116,15 +189,26 @@ public abstract class Poller {
*
* @param timeout if positive then block for up to {@code timeout} milliseconds,
* if zero then don't block, if -1 then block indefinitely
* @return the number of file descriptors polled
* @return >0 if file descriptors are polled, 0 if no file descriptor polled
*/
abstract int poll(int timeout) throws IOException;
/**
* Wakeup the poller thread if blocked in poll so it can shutdown.
* @throws UnsupportedOperationException if not supported
*/
void wakeupPoller() throws IOException {
throw new UnsupportedOperationException();
}
/**
* Callback by the poll method when a file descriptor is polled.
*/
final void polled(int fdVal) {
wakeup(fdVal);
Thread t = map.remove(fdVal);
if (t != null) {
LockSupport.unpark(t);
}
}
/**
@ -132,19 +216,10 @@ public abstract class Poller {
* @param fdVal the file descriptor
* @param event POLLIN or POLLOUT
* @param nanos the waiting time or 0 to wait indefinitely
* @param supplier supplies a boolean to indicate if the enclosing object is open
* @param isOpen supplies a boolean to indicate if the enclosing object is open
*/
static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier)
throws IOException
{
assert nanos >= 0L;
if (event == Net.POLLIN) {
POLLERS.readPoller(fdVal).poll(fdVal, nanos, supplier);
} else if (event == Net.POLLOUT) {
POLLERS.writePoller(fdVal).poll(fdVal, nanos, supplier);
} else {
assert false;
}
public static void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
POLLER_GROUP.poll(fdVal, event, nanos, isOpen);
}
/**
@ -152,45 +227,24 @@ public abstract class Poller {
* @param fdVal the Selector's file descriptor
* @param nanos the waiting time or 0 to wait indefinitely
*/
static void pollSelector(int fdVal, long nanos) throws IOException {
assert nanos >= 0L;
Poller poller = POLLERS.masterPoller();
if (poller == null) {
poller = POLLERS.readPoller(fdVal);
}
poller.poll(fdVal, nanos, () -> true);
public static void pollSelector(int fdVal, long nanos) throws IOException {
POLLER_GROUP.pollSelector(fdVal, nanos);
}
/**
* If there is a thread polling the given file descriptor for the given event then
* the thread is unparked.
* Unpark the given thread so that it stops polling.
*/
static void stopPoll(int fdVal, int event) {
if (event == Net.POLLIN) {
POLLERS.readPoller(fdVal).wakeup(fdVal);
} else if (event == Net.POLLOUT) {
POLLERS.writePoller(fdVal).wakeup(fdVal);
} else {
throw new IllegalArgumentException();
}
}
/**
* If there are any threads polling the given file descriptor then they are unparked.
*/
static void stopPoll(int fdVal) {
stopPoll(fdVal, Net.POLLIN);
stopPoll(fdVal, Net.POLLOUT);
public static void stopPoll(Thread thread) {
LockSupport.unpark(thread);
}
/**
* Parks the current thread until a file descriptor is ready.
*/
private void poll(int fdVal, long nanos, BooleanSupplier supplier) throws IOException {
register(fdVal);
private void poll(int fdVal, long nanos, BooleanSupplier isOpen) throws IOException {
startPoll(fdVal);
try {
boolean isOpen = supplier.getAsBoolean();
if (isOpen) {
if (isOpen.getAsBoolean() && !isShutdown()) {
if (nanos > 0) {
LockSupport.parkNanos(nanos);
} else {
@ -198,42 +252,38 @@ public abstract class Poller {
}
}
} finally {
deregister(fdVal);
stopPoll(fdVal);
}
}
/**
* Registers the file descriptor to be polled at most once when the file descriptor
* is ready for I/O.
* Register a file descriptor with the I/O event management facility so that it is
* polled when the file descriptor is ready for I/O.
*/
private void register(int fdVal) throws IOException {
private void startPoll(int fdVal) throws IOException {
Thread previous = map.put(fdVal, Thread.currentThread());
assert previous == null;
try {
implRegister(fdVal);
implStartPoll(fdVal);
} catch (Throwable t) {
map.remove(fdVal);
throw t;
} finally {
Reference.reachabilityFence(this);
}
}
/**
* Deregister the file descriptor so that the file descriptor is not polled.
* Deregister a file descriptor from the I/O event management facility.
*/
private void deregister(int fdVal) {
private void stopPoll(int fdVal) throws IOException {
Thread previous = map.remove(fdVal);
boolean polled = (previous == null);
assert polled || previous == Thread.currentThread();
implDeregister(fdVal, polled);
}
/**
* Unparks any thread that is polling the given file descriptor.
*/
private void wakeup(int fdVal) {
Thread t = map.remove(fdVal);
if (t != null) {
LockSupport.unpark(t);
try {
implStopPoll(fdVal, polled);
} finally {
Reference.reachabilityFence(this);
}
}
@ -242,9 +292,9 @@ public abstract class Poller {
* descriptor that is polled.
*/
private void pollerLoop() {
owner = Thread.currentThread();
setOwner();
try {
for (;;) {
while (!isShutdown()) {
poll(-1);
}
} catch (Exception e) {
@ -263,10 +313,10 @@ public abstract class Poller {
*/
private void subPollerLoop(Poller masterPoller) {
assert Thread.currentThread().isVirtual();
owner = Thread.currentThread();
setOwner();
try {
int polled = 0;
for (;;) {
while (!isShutdown()) {
if (polled == 0) {
masterPoller.poll(fdVal(), 0, () -> true); // park
} else {
@ -280,194 +330,463 @@ public abstract class Poller {
}
/**
* Returns the number I/O operations currently registered with this poller.
* Unparks all threads waiting on a file descriptor registered with this poller.
*/
public int registered() {
return map.size();
private void wakeupAll() {
map.values().forEach(LockSupport::unpark);
}
@Override
public String toString() {
return String.format("%s [registered = %d, owner = %s]",
Objects.toIdentityString(this), registered(), owner);
Objects.toIdentityString(this), map.size(), owner);
}
/**
* The Pollers used for read and write events.
* A group of poller threads that support virtual threads polling file descriptors.
*/
private static class Pollers {
private static abstract class PollerGroup {
private final PollerProvider provider;
private final Poller.Mode pollerMode;
private final Poller masterPoller;
private final Poller[] readPollers;
private final Poller[] writePollers;
// used by start method to executor is kept alive
private Executor executor;
/**
* Creates the Poller instances based on configuration.
*/
Pollers() throws IOException {
PollerProvider provider = PollerProvider.provider();
Poller.Mode mode;
String s = System.getProperty("jdk.pollerMode");
if (s != null) {
if (s.equalsIgnoreCase(Mode.SYSTEM_THREADS.name()) || s.equals("1")) {
mode = Mode.SYSTEM_THREADS;
} else if (s.equalsIgnoreCase(Mode.VTHREAD_POLLERS.name()) || s.equals("2")) {
mode = Mode.VTHREAD_POLLERS;
} else {
throw new RuntimeException("Can't parse '" + s + "' as polling mode");
}
} else {
mode = provider.defaultPollerMode();
}
// vthread poller mode needs a master poller
Poller masterPoller = (mode == Mode.VTHREAD_POLLERS)
? provider.readPoller(false)
: null;
// read pollers (or sub-pollers)
int readPollerCount = pollerCount("jdk.readPollers", provider.defaultReadPollers(mode));
Poller[] readPollers = new Poller[readPollerCount];
for (int i = 0; i < readPollerCount; i++) {
readPollers[i] = provider.readPoller(mode == Mode.VTHREAD_POLLERS);
}
// write pollers (or sub-pollers)
int writePollerCount = pollerCount("jdk.writePollers", provider.defaultWritePollers(mode));
Poller[] writePollers = new Poller[writePollerCount];
for (int i = 0; i < writePollerCount; i++) {
writePollers[i] = provider.writePoller(mode == Mode.VTHREAD_POLLERS);
}
PollerGroup(PollerProvider provider) {
this.provider = provider;
this.pollerMode = mode;
this.masterPoller = masterPoller;
this.readPollers = readPollers;
this.writePollers = writePollers;
}
final PollerProvider provider() {
return provider;
}
/**
* Starts the Poller threads.
* Starts the poller group and any system-wide poller threads.
*/
void start() {
if (pollerMode == Mode.VTHREAD_POLLERS) {
startPlatformThread("MasterPoller", masterPoller::pollerLoop);
ThreadFactory factory = Thread.ofVirtual()
.inheritInheritableThreadLocals(false)
.name("SubPoller-", 0)
.uncaughtExceptionHandler((t, e) -> e.printStackTrace())
.factory();
executor = Executors.newThreadPerTaskExecutor(factory);
Arrays.stream(readPollers).forEach(p -> {
executor.execute(() -> p.subPollerLoop(masterPoller));
});
Arrays.stream(writePollers).forEach(p -> {
executor.execute(() -> p.subPollerLoop(masterPoller));
});
} else {
Arrays.stream(readPollers).forEach(p -> {
startPlatformThread("Read-Poller", p::pollerLoop);
});
Arrays.stream(writePollers).forEach(p -> {
startPlatformThread("Write-Poller", p::pollerLoop);
});
}
}
abstract void start();
/**
* Returns the master poller, or null if there is no master poller.
* Parks the current thread until a file descriptor is ready for the given op.
*/
Poller masterPoller() {
return masterPoller;
}
abstract void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException;
/**
* Returns the read poller for the given file descriptor.
* Parks the current thread until a Selector's file descriptor is ready.
*/
Poller readPoller(int fdVal) {
int index = provider.fdValToIndex(fdVal, readPollers.length);
return readPollers[index];
}
/**
* Returns the write poller for the given file descriptor.
*/
Poller writePoller(int fdVal) {
int index = provider.fdValToIndex(fdVal, writePollers.length);
return writePollers[index];
}
/**
* Return the list of read pollers.
*/
List<Poller> readPollers() {
return List.of(readPollers);
}
/**
* Return the list of write pollers.
*/
List<Poller> writePollers() {
return List.of(writePollers);
}
/**
* Reads the given property name to get the poller count. If the property is
* set then the value must be a power of 2. Returns 1 if the property is not
* set.
* @throws IllegalArgumentException if the property is set to a value that
* is not a power of 2.
*/
private static int pollerCount(String propName, int defaultCount) {
String s = System.getProperty(propName);
int count = (s != null) ? Integer.parseInt(s) : defaultCount;
// check power of 2
if (count != Integer.highestOneBit(count)) {
String msg = propName + " is set to a value that is not a power of 2";
throw new IllegalArgumentException(msg);
}
return count;
void pollSelector(int fdVal, long nanos) throws IOException {
poll(fdVal, Net.POLLIN, nanos, () -> true);
}
/**
* Starts a platform thread to run the given task.
*/
private void startPlatformThread(String name, Runnable task) {
try {
Thread thread = InnocuousThread.newSystemThread(name, task);
thread.setDaemon(true);
thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
thread.start();
} catch (Exception e) {
throw new InternalError(e);
protected final void startPlatformThread(String name, Runnable task) {
Thread thread = InnocuousThread.newSystemThread(name, task);
thread.setDaemon(true);
thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
thread.start();
}
/**
* Return the master poller, or null if no master poller.
*/
abstract Poller masterPoller();
/**
* Return the read pollers.
*/
abstract List<Poller> readPollers();
/**
* Return the write pollers.
*/
abstract List<Poller> writePollers();
/**
* Close the given pollers.
*/
static void closeAll(Poller... pollers) {
for (Poller poller : pollers) {
if (poller != null) {
try {
poller.close();
} catch (IOException _) { }
}
}
}
}
/**
* SYSTEM_THREADS poller group. The read and write pollers are system-wide platform threads.
*/
private static class SystemThreadsPollerGroup extends PollerGroup {
// system-wide read and write pollers
private final Poller[] readPollers;
private final Poller[] writePollers;
SystemThreadsPollerGroup(PollerProvider provider,
int readPollerCount,
int writePollerCount) throws IOException {
super(provider);
Poller[] readPollers = new Poller[readPollerCount];
Poller[] writePollers = new Poller[writePollerCount];
try {
for (int i = 0; i < readPollerCount; i++) {
readPollers[i] = provider.readPoller(false);
}
for (int i = 0; i < writePollerCount; i++) {
writePollers[i] = provider.writePoller(false);
}
} catch (Throwable e) {
closeAll(readPollers);
closeAll(writePollers);
throw e;
}
this.readPollers = readPollers;
this.writePollers = writePollers;
}
@Override
void start() {
Arrays.stream(readPollers).forEach(p -> {
startPlatformThread("Read-Poller", p::pollerLoop);
});
Arrays.stream(writePollers).forEach(p -> {
startPlatformThread("Write-Poller", p::pollerLoop);
});
}
private Poller readPoller(int fdVal) {
int index = provider().fdValToIndex(fdVal, readPollers.length);
return readPollers[index];
}
private Poller writePoller(int fdVal) {
int index = provider().fdValToIndex(fdVal, writePollers.length);
return writePollers[index];
}
@Override
void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
Poller poller = (event == Net.POLLIN)
? readPoller(fdVal)
: writePoller(fdVal);
poller.poll(fdVal, nanos, isOpen);
}
@Override
Poller masterPoller() {
return null;
}
@Override
List<Poller> readPollers() {
return List.of(readPollers);
}
@Override
List<Poller> writePollers() {
return List.of(writePollers);
}
}
/**
* VTHREAD_POLLERS poller group. The read and write pollers are virtual threads.
* When read and write pollers need to block then they register with a system-wide
* "master poller" that runs in a dedicated platform thread.
*/
private static class VThreadsPollerGroup extends PollerGroup {
private final Poller masterPoller;
private final Poller[] readPollers;
private final Poller[] writePollers;
// keep virtual thread pollers alive
private final Executor executor;
VThreadsPollerGroup(PollerProvider provider,
int readPollerCount,
int writePollerCount) throws IOException {
super(provider);
Poller masterPoller = provider.readPoller(false);
Poller[] readPollers = new Poller[readPollerCount];
Poller[] writePollers = new Poller[writePollerCount];
try {
for (int i = 0; i < readPollerCount; i++) {
readPollers[i] = provider.readPoller(true);
}
for (int i = 0; i < writePollerCount; i++) {
writePollers[i] = provider.writePoller(true);
}
} catch (Throwable e) {
masterPoller.close();
closeAll(readPollers);
closeAll(writePollers);
throw e;
}
this.masterPoller = masterPoller;
this.readPollers = readPollers;
this.writePollers = writePollers;
ThreadFactory factory = Thread.ofVirtual()
.inheritInheritableThreadLocals(false)
.name("SubPoller-", 0)
.uncaughtExceptionHandler((_, e) -> e.printStackTrace())
.factory();
this.executor = Executors.newThreadPerTaskExecutor(factory);
}
@Override
void start() {
startPlatformThread("Master-Poller", masterPoller::pollerLoop);
Arrays.stream(readPollers).forEach(p -> {
executor.execute(() -> p.subPollerLoop(masterPoller));
});
Arrays.stream(writePollers).forEach(p -> {
executor.execute(() -> p.subPollerLoop(masterPoller));
});
}
private Poller readPoller(int fdVal) {
int index = provider().fdValToIndex(fdVal, readPollers.length);
return readPollers[index];
}
private Poller writePoller(int fdVal) {
int index = provider().fdValToIndex(fdVal, writePollers.length);
return writePollers[index];
}
@Override
void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
Poller poller = (event == Net.POLLIN)
? readPoller(fdVal)
: writePoller(fdVal);
poller.poll(fdVal, nanos, isOpen);
}
@Override
void pollSelector(int fdVal, long nanos) throws IOException {
masterPoller.poll(fdVal, nanos, () -> true);
}
@Override
Poller masterPoller() {
return masterPoller;
}
@Override
List<Poller> readPollers() {
return List.of(readPollers);
}
@Override
List<Poller> writePollers() {
return List.of(writePollers);
}
}
/**
* POLLER_PER_CARRIER poller group. The read poller is a per-carrier virtual thread.
* When a virtual thread polls a file descriptor for POLLIN, then it will use (almost
* always, not guaranteed) the read poller for its carrier. When a read poller needs
* to block then it registers with a system-wide "master poller" that runs in a
* dedicated platform thread. The read poller terminates if the carrier terminates.
* The write pollers are system-wide platform threads (usually one).
*/
private static class PollerPerCarrierPollerGroup extends PollerGroup {
private record CarrierPoller(PollerPerCarrierPollerGroup group, Poller readPoller) { }
private static final TerminatingThreadLocal<CarrierPoller> CARRIER_POLLER =
new TerminatingThreadLocal<>() {
@Override
protected void threadTerminated(CarrierPoller carrierPoller) {
Poller readPoller = carrierPoller.readPoller();
carrierPoller.group().carrierTerminated(readPoller);
}
};
private final Poller masterPoller;
private final Set<Poller> readPollers;
private final Poller[] writePollers;
/**
* Create a PollerPerCarrierPollerGroup with the given number of write pollers.
*/
PollerPerCarrierPollerGroup(PollerProvider provider,
int writePollerCount) throws IOException {
super(provider);
Poller masterPoller = provider.readPoller(false);
Poller[] writePollers = new Poller[writePollerCount];
try {
for (int i = 0; i < writePollerCount; i++) {
writePollers[i] = provider.writePoller(false);
}
} catch (Throwable e) {
masterPoller.close();
closeAll(writePollers);
throw e;
}
this.masterPoller = masterPoller;
this.readPollers = ConcurrentHashMap.newKeySet();;
this.writePollers = writePollers;
}
@Override
void start() {
startPlatformThread("Master-Poller", masterPoller::pollerLoop);
Arrays.stream(writePollers).forEach(p -> {
startPlatformThread("Write-Poller", p::pollerLoop);
});
}
private Poller writePoller(int fdVal) {
int index = provider().fdValToIndex(fdVal, writePollers.length);
return writePollers[index];
}
/**
* Starts a read sub-poller in a virtual thread.
*/
private Poller startReadPoller() throws IOException {
assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
// create read sub-poller
Poller readPoller = provider().readPoller(true);
readPollers.add(readPoller);
// start virtual thread to execute sub-polling loop
Thread carrier = JLA.currentCarrierThread();
Thread.ofVirtual()
.inheritInheritableThreadLocals(false)
.name(carrier.getName() + "-Read-Poller")
.uncaughtExceptionHandler((_, e) -> e.printStackTrace())
.start(() -> subPollerLoop(readPoller));
return readPoller;
}
/**
* Returns the read poller for the current carrier, starting it if required.
*/
private Poller readPoller() throws IOException {
assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported();
Continuation.pin();
try {
CarrierPoller carrierPoller = CARRIER_POLLER.get();
if (carrierPoller != null) {
return carrierPoller.readPoller();
} else {
// first poll on this carrier will start poller
Poller readPoller = startReadPoller();
CARRIER_POLLER.set(new CarrierPoller(this, readPoller));
return readPoller;
}
} finally {
Continuation.unpin();
}
}
@Override
void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException {
// for POLLIN, get the read poller for this carrier
if (event == Net.POLLIN
&& Thread.currentThread().isVirtual()
&& ContinuationSupport.isSupported()) {
readPoller().poll(fdVal, nanos, isOpen);
return;
}
// -XX:-VMContinuations or POLLIN from platform thread does master poller
if (event == Net.POLLIN) {
masterPoller.poll(fdVal, nanos, isOpen);
} else {
writePoller(fdVal).poll(fdVal, nanos, isOpen);
}
}
@Override
void pollSelector(int fdVal, long nanos) throws IOException {
masterPoller.poll(fdVal, nanos, () -> true);
}
/**
* Sub-poller polling loop.
*/
private void subPollerLoop(Poller readPoller) {
try {
readPoller.subPollerLoop(masterPoller);
} finally {
// wakeup all threads waiting on file descriptors registered with the
// read poller, these I/O operation will migrate to another carrier.
readPoller.wakeupAll();
// remove from serviceability view
readPollers.remove(readPoller);
}
}
/**
* Invoked by the carrier thread before it terminates.
*/
private void carrierTerminated(Poller readPoller) {
readPoller.setShutdown();
try {
readPoller.wakeupPoller();
} catch (Throwable e) {
e.printStackTrace();
}
}
@Override
Poller masterPoller() {
return masterPoller;
}
@Override
List<Poller> readPollers() {
return readPollers.stream().toList();
}
@Override
List<Poller> writePollers() {
return List.of(writePollers);
}
}
/**
* Reads the given property name to get the poller count. If the property is
* set then the value must be a power of 2. Returns 1 if the property is not
* set.
* @throws IllegalArgumentException if the property is set to a value that
* is not a power of 2.
*/
private static int pollerCount(String propName, int defaultCount) {
String s = System.getProperty(propName);
int count = (s != null) ? Integer.parseInt(s) : defaultCount;
// check power of 2
if (count != Integer.highestOneBit(count)) {
String msg = propName + " is set to a value that is not a power of 2";
throw new IllegalArgumentException(msg);
}
return count;
}
/**
* Return the master poller or null if there is no master poller.
*/
public static Poller masterPoller() {
return POLLERS.masterPoller();
return POLLER_GROUP.masterPoller();
}
/**
* Return the list of read pollers.
*/
public static List<Poller> readPollers() {
return POLLERS.readPollers();
return POLLER_GROUP.readPollers();
}
/**
* Return the list of write pollers.
*/
public static List<Poller> writePollers() {
return POLLERS.writePollers();
return POLLER_GROUP.writePollers();
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -30,38 +30,43 @@ import java.io.IOException;
* Provider class for Poller implementations.
*/
abstract class PollerProvider {
private static final PollerProvider INSTANCE = new DefaultPollerProvider();
private final Poller.Mode mode;
PollerProvider() { }
PollerProvider(Poller.Mode mode) {
this.mode = mode;
}
/**
* Returns the system-wide PollerProvider.
*/
static PollerProvider provider() {
return INSTANCE;
final Poller.Mode pollerMode() {
return mode;
}
/**
* Returns the default poller mode.
* @implSpec The default implementation uses system threads.
* Creates a PollerProvider that uses its preferred/default poller mode.
*/
Poller.Mode defaultPollerMode() {
return Poller.Mode.SYSTEM_THREADS;
static PollerProvider createProvider() {
return new DefaultPollerProvider();
}
/**
* Default number of read pollers for the given mode. The count must be a power of 2.
* Creates a PollerProvider that uses the given poller mode.
*/
static PollerProvider createProvider(Poller.Mode mode) {
return new DefaultPollerProvider(mode);
}
/**
* Default number of read pollers. The count must be a power of 2.
* @implSpec The default implementation returns 1.
*/
int defaultReadPollers(Poller.Mode mode) {
int defaultReadPollers() {
return 1;
}
/**
* Default number of write pollers for the given mode. The count must be a power of 2.
* Default number of write pollers. The count must be a power of 2.
* @implSpec The default implementation returns 1.
*/
int defaultWritePollers(Poller.Mode mode) {
int defaultWritePollers() {
return 1;
}
@ -74,13 +79,13 @@ abstract class PollerProvider {
}
/**
* Creates a Poller for read ops.
* Creates a Poller for POLLIN polling.
* @param subPoller true to create a sub-poller
*/
abstract Poller readPoller(boolean subPoller) throws IOException;
/**
* Creates a Poller for write ops.
* Creates a Poller for POLLOUT polling.
* @param subPoller true to create a sub-poller
*/
abstract Poller writePoller(boolean subPoller) throws IOException;

View File

@ -90,8 +90,8 @@ class ServerSocketChannelImpl
private static final int ST_CLOSED = 2;
private int state;
// ID of native thread currently blocked in this channel, for signalling
private long thread;
// Thread currently blocked in this channel, for signalling
private Thread thread;
// Binding
private SocketAddress localAddress; // null => unbound
@ -349,7 +349,7 @@ class ServerSocketChannelImpl
if (localAddress == null)
throw new NotYetBoundException();
if (blocking)
thread = NativeThread.current();
thread = NativeThread.threadToSignal();
}
}
@ -364,7 +364,7 @@ class ServerSocketChannelImpl
{
if (blocking) {
synchronized (stateLock) {
thread = 0;
thread = null;
if (state == ST_CLOSING) {
tryFinishClose();
}
@ -551,7 +551,7 @@ class ServerSocketChannelImpl
*/
private boolean tryClose() throws IOException {
assert Thread.holdsLock(stateLock) && state == ST_CLOSING;
if ((thread == 0) && !isRegistered()) {
if ((thread == null) && !isRegistered()) {
state = ST_CLOSED;
nd.close(fd);
return true;
@ -583,7 +583,7 @@ class ServerSocketChannelImpl
assert state < ST_CLOSING;
state = ST_CLOSING;
if (!tryClose()) {
nd.preClose(fd, thread, 0);
nd.preClose(fd, thread, null);
}
}
}

View File

@ -113,9 +113,9 @@ class SocketChannelImpl
private static final int ST_CLOSED = 4;
private volatile int state; // need stateLock to change
// IDs of native threads doing reads and writes, for signalling
private long readerThread;
private long writerThread;
// Threads doing reads and writes, for signalling
private Thread readerThread;
private Thread writerThread;
// Binding
private SocketAddress localAddress;
@ -368,7 +368,7 @@ class SocketChannelImpl
synchronized (stateLock) {
ensureOpen();
// record thread so it can be signalled if needed
readerThread = NativeThread.current();
readerThread = NativeThread.threadToSignal();
}
}
}
@ -384,7 +384,7 @@ class SocketChannelImpl
{
if (blocking) {
synchronized (stateLock) {
readerThread = 0;
readerThread = null;
if (state == ST_CLOSING) {
tryFinishClose();
}
@ -522,7 +522,7 @@ class SocketChannelImpl
if (isOutputClosed)
throw new ClosedChannelException();
// record thread so it can be signalled if needed
writerThread = NativeThread.current();
writerThread = NativeThread.threadToSignal();
}
}
}
@ -538,7 +538,7 @@ class SocketChannelImpl
{
if (blocking) {
synchronized (stateLock) {
writerThread = 0;
writerThread = null;
if (state == ST_CLOSING) {
tryFinishClose();
}
@ -673,7 +673,7 @@ class SocketChannelImpl
ensureOpenAndConnected();
if (isOutputClosed)
throw new ClosedChannelException();
writerThread = NativeThread.current();
writerThread = NativeThread.threadToSignal();
completed = true;
}
} finally {
@ -689,7 +689,7 @@ class SocketChannelImpl
*/
void afterTransferTo(boolean completed) throws AsynchronousCloseException {
synchronized (stateLock) {
writerThread = 0;
writerThread = null;
if (state == ST_CLOSING) {
tryFinishClose();
}
@ -874,7 +874,7 @@ class SocketChannelImpl
if (blocking) {
// record thread so it can be signalled if needed
readerThread = NativeThread.current();
readerThread = NativeThread.threadToSignal();
}
}
}
@ -993,7 +993,7 @@ class SocketChannelImpl
throw new NoConnectionPendingException();
if (blocking) {
// record thread so it can be signalled if needed
readerThread = NativeThread.current();
readerThread = NativeThread.threadToSignal();
}
}
}
@ -1072,7 +1072,7 @@ class SocketChannelImpl
*/
private boolean tryClose() throws IOException {
assert Thread.holdsLock(stateLock) && state == ST_CLOSING;
if ((readerThread == 0) && (writerThread == 0) && !isRegistered()) {
if ((readerThread == null) && (writerThread == null) && !isRegistered()) {
state = ST_CLOSED;
nd.close(fd);
return true;
@ -1215,11 +1215,8 @@ class SocketChannelImpl
throw new NotYetConnectedException();
if (!isInputClosed) {
Net.shutdown(fd, Net.SHUT_RD);
long reader = readerThread;
if (NativeThread.isVirtualThread(reader)) {
Poller.stopPoll(fdVal, Net.POLLIN);
} else if (NativeThread.isNativeThread(reader)) {
NativeThread.signal(reader);
if (readerThread != null && readerThread.isVirtual()) {
Poller.stopPoll(readerThread);
}
isInputClosed = true;
}
@ -1235,11 +1232,8 @@ class SocketChannelImpl
throw new NotYetConnectedException();
if (!isOutputClosed) {
Net.shutdown(fd, Net.SHUT_WR);
long writer = writerThread;
if (NativeThread.isVirtualThread(writer)) {
Poller.stopPoll(fdVal, Net.POLLOUT);
} else if (NativeThread.isNativeThread(writer)) {
NativeThread.signal(writer);
if (writerThread != null && writerThread.isVirtual()) {
Poller.stopPoll(writerThread);
}
isOutputClosed = true;
}

View File

@ -25,6 +25,9 @@
package sun.nio.ch;
import java.util.concurrent.locks.LockSupport;
import jdk.internal.access.JavaLangAccess;
import jdk.internal.access.SharedSecrets;
// Signalling operations on native threads
//
@ -37,45 +40,38 @@ package sun.nio.ch;
// always returns -1 and the signal(long) method has no effect.
public class NativeThread {
private static final long VIRTUAL_THREAD_ID = -1L;
private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
private NativeThread() { }
/**
* Returns the id of the current native thread if the platform can signal
* native threads, 0 if the platform can not signal native threads, or
* -1L if the current thread is a virtual thread.
*/
public static long current() {
if (Thread.currentThread().isVirtual()) {
return VIRTUAL_THREAD_ID;
} else {
return current0();
}
}
/**
* Signals the given native thread.
* Returns the Thread to signal the current thread.
*
* @throws IllegalArgumentException if tid is not a token to a native thread
* The first use of this method on a platform thread will capture the thread's
* native thread ID.
*/
public static void signal(long tid) {
if (tid == 0 || tid == VIRTUAL_THREAD_ID)
throw new IllegalArgumentException();
signal0(tid);
public static Thread threadToSignal() {
Thread t = Thread.currentThread();
if (!t.isVirtual() && JLA.nativeThreadID(t) == 0) {
JLA.setThreadNativeID(current0());
}
return t;
}
/**
* Returns true the tid is the id of a native thread.
* Signals the given thread. For a platform thread it sends a signal to the thread.
* For a virtual thread it just unparks it.
* @throws IllegalStateException if the thread is a platform thread that hasn't set its native ID
*/
static boolean isNativeThread(long tid) {
return (tid != 0 && tid != VIRTUAL_THREAD_ID);
}
/**
* Returns true if tid is -1L.
* @see #current()
*/
static boolean isVirtualThread(long tid) {
return (tid == VIRTUAL_THREAD_ID);
public static void signal(Thread thread) {
if (thread.isVirtual()) {
LockSupport.unpark(thread);
} else {
long id = JLA.nativeThreadID(thread);
if (id == 0)
throw new IllegalStateException("Native thread ID not set");
signal0(id);
}
}
/**

View File

@ -63,8 +63,8 @@ class SinkChannelImpl
private static final int ST_CLOSED = 2;
private int state;
// ID of native thread doing write, for signalling
private long thread;
// Thread doing write, for signalling
private Thread writerThread;
// True if the channel's socket has been forced into non-blocking mode
// by a virtual thread. It cannot be reset. When the channel is in
@ -120,7 +120,7 @@ class SinkChannelImpl
*/
private boolean tryClose() throws IOException {
assert Thread.holdsLock(stateLock) && state == ST_CLOSING;
if (thread == 0 && !isRegistered()) {
if (writerThread == null && !isRegistered()) {
state = ST_CLOSED;
nd.close(fd);
return true;
@ -152,7 +152,7 @@ class SinkChannelImpl
assert state < ST_CLOSING;
state = ST_CLOSING;
if (!tryClose()) {
nd.preClose(fd, thread, 0);
nd.preClose(fd, null, writerThread);
}
}
}
@ -270,7 +270,7 @@ class SinkChannelImpl
synchronized (stateLock) {
ensureOpen();
if (blocking)
thread = NativeThread.current();
writerThread = NativeThread.threadToSignal();
}
}
@ -285,7 +285,7 @@ class SinkChannelImpl
{
if (blocking) {
synchronized (stateLock) {
thread = 0;
writerThread = null;
if (state == ST_CLOSING) {
tryFinishClose();
}

View File

@ -63,8 +63,8 @@ class SourceChannelImpl
private static final int ST_CLOSED = 2;
private int state;
// ID of native thread doing read, for signalling
private long thread;
// Thread doing read, for signalling
private Thread readerThread;
// True if the channel's socket has been forced into non-blocking mode
// by a virtual thread. It cannot be reset. When the channel is in
@ -120,7 +120,7 @@ class SourceChannelImpl
*/
private boolean tryClose() throws IOException {
assert Thread.holdsLock(stateLock) && state == ST_CLOSING;
if (thread == 0 && !isRegistered()) {
if (readerThread == null && !isRegistered()) {
state = ST_CLOSED;
nd.close(fd);
return true;
@ -152,7 +152,7 @@ class SourceChannelImpl
assert state < ST_CLOSING;
state = ST_CLOSING;
if (!tryClose()) {
nd.preClose(fd, thread, 0);
nd.preClose(fd, readerThread, null);
}
}
}
@ -269,8 +269,9 @@ class SourceChannelImpl
}
synchronized (stateLock) {
ensureOpen();
if (blocking)
thread = NativeThread.current();
if (blocking) {
readerThread = NativeThread.threadToSignal();
}
}
}
@ -285,7 +286,7 @@ class SourceChannelImpl
{
if (blocking) {
synchronized (stateLock) {
thread = 0;
readerThread = null;
if (state == ST_CLOSING) {
tryFinishClose();
}

View File

@ -36,15 +36,17 @@ abstract class UnixDispatcher extends NativeDispatcher {
close0(fd);
}
private void signalThreads(long reader, long writer) {
if (NativeThread.isNativeThread(reader))
private void signalThreads(Thread reader, Thread writer) {
if (reader != null) {
NativeThread.signal(reader);
if (NativeThread.isNativeThread(writer))
}
if (writer != null) {
NativeThread.signal(writer);
}
}
@Override
void implPreClose(FileDescriptor fd, long reader, long writer) throws IOException {
void implPreClose(FileDescriptor fd, Thread reader, Thread writer) throws IOException {
if (SUPPORTS_PENDING_SIGNALS) {
signalThreads(reader, writer);
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2000, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2000, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -77,10 +77,9 @@ configureBlocking(int fd, jboolean blocking)
}
JNIEXPORT void JNICALL
Java_sun_nio_ch_IOUtil_configureBlocking(JNIEnv *env, jclass clazz,
jobject fdo, jboolean blocking)
Java_sun_nio_ch_IOUtil_configureBlocking(JNIEnv *env, jclass clazz, jint fd, jboolean blocking)
{
if (configureBlocking(fdval(env, fdo), blocking) < 0)
if (configureBlocking(fd, blocking) < 0)
JNU_ThrowIOExceptionWithLastError(env, "Configure blocking failed");
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2019, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -30,11 +30,19 @@ import java.io.IOException;
* Default PollerProvider for Windows based on wepoll.
*/
class DefaultPollerProvider extends PollerProvider {
DefaultPollerProvider() { }
DefaultPollerProvider(Poller.Mode mode) {
if (mode != Poller.Mode.SYSTEM_THREADS) {
throw new UnsupportedOperationException();
}
super(mode);
}
DefaultPollerProvider() {
this(Poller.Mode.SYSTEM_THREADS);
}
@Override
int defaultReadPollers(Poller.Mode mode) {
assert mode == Poller.Mode.SYSTEM_THREADS;
int defaultReadPollers() {
int ncpus = Runtime.getRuntime().availableProcessors();
return Math.max(Integer.highestOneBit(ncpus / 8), 1);
}
@ -46,13 +54,15 @@ class DefaultPollerProvider extends PollerProvider {
@Override
Poller readPoller(boolean subPoller) throws IOException {
assert !subPoller;
if (subPoller)
throw new UnsupportedOperationException();
return new WEPollPoller(true);
}
@Override
Poller writePoller(boolean subPoller) throws IOException {
assert !subPoller;
if (subPoller)
throw new UnsupportedOperationException();
return new WEPollPoller(false);
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2002, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2002, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -25,47 +25,29 @@
package sun.nio.ch;
// Signalling operations on native threads
import java.util.concurrent.locks.LockSupport;
public class NativeThread {
private static final long VIRTUAL_THREAD_ID = -1L;
private NativeThread() { }
/**
* Returns the id of the current native thread if the platform can signal
* native threads, 0 if the platform can not signal native threads, or
* -1L if the current thread is a virtual thread.
* Returns the Thread to signal the current thread or {@code null} if the current
* thread cannot be signalled.
*/
public static long current() {
if (Thread.currentThread().isVirtual()) {
return VIRTUAL_THREAD_ID;
public static Thread threadToSignal() {
Thread thread = Thread.currentThread();
return thread.isVirtual() ? thread : null;
}
/**
* Signals the given thread.
* @throws UnsupportedOperationException is not supported
*/
public static void signal(Thread thread) {
if (thread.isVirtual()) {
LockSupport.unpark(thread);
} else {
// no support for signalling threads on Windows
return 0;
throw new UnsupportedOperationException();
}
}
/**
* Signals the given native thread.
*
* @throws IllegalArgumentException if tid is not a token to a native thread
*/
static void signal(long tid) {
throw new UnsupportedOperationException();
}
/**
* Returns true the tid is the id of a native thread.
*/
static boolean isNativeThread(long tid) {
return false;
}
/**
* Returns true if tid is -1L.
* @see #current()
*/
static boolean isVirtualThread(long tid) {
return (tid == VIRTUAL_THREAD_ID);
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2020, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -32,27 +32,41 @@ import static sun.nio.ch.WEPoll.*;
*/
class WEPollPoller extends Poller {
private static final int MAX_EVENTS_TO_POLL = 256;
private static final int ENOENT = 2;
private final long handle;
private final int event;
private final long address;
WEPollPoller(boolean read) throws IOException {
this.handle = WEPoll.create();
long handle = WEPoll.create();
long address;
try {
address = WEPoll.allocatePollArray(MAX_EVENTS_TO_POLL);
} catch (Throwable e) {
WEPoll.close(handle);
throw e;
}
this.event = (read) ? EPOLLIN : EPOLLOUT;
this.address = WEPoll.allocatePollArray(MAX_EVENTS_TO_POLL);
this.handle = handle;
this.address = address;
}
@Override
void implRegister(int fdVal) throws IOException {
void close() {
WEPoll.close(handle);
WEPoll.freePollArray(address);
}
@Override
void implStartPoll(int fdVal) throws IOException {
int err = WEPoll.ctl(handle, EPOLL_CTL_ADD, fdVal, (event | EPOLLONESHOT));
if (err != 0)
throw new IOException("epoll_ctl failed: " + err);
}
@Override
void implDeregister(int fdVal, boolean polled) {
void implStopPoll(int fdVal, boolean polled) {
WEPoll.ctl(handle, EPOLL_CTL_DEL, fdVal, 0);
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2000, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2000, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -134,11 +134,10 @@ Java_sun_nio_ch_IOUtil_setfdVal(JNIEnv *env, jclass clazz, jobject fdo, jint val
JNIEXPORT void JNICALL
Java_sun_nio_ch_IOUtil_configureBlocking(JNIEnv *env, jclass clazz,
jobject fdo, jboolean blocking)
jint fd, jboolean blocking)
{
u_long argp;
int result = 0;
jint fd = fdval(env, fdo);
if (blocking == JNI_FALSE) {
argp = SET_NONBLOCKING;

View File

@ -84,9 +84,9 @@ public class SctpChannelImpl extends SctpChannel
private final int fdVal;
/* IDs of native threads doing send and receive, for signalling */
private volatile long receiverThread;
private volatile long senderThread;
/* Threads doing send and receive, for signalling */
private volatile Thread receiverThread;
private volatile Thread senderThread;
/* Lock held by current receiving or connecting thread */
private final Object receiveLock = new Object();
@ -326,7 +326,7 @@ public class SctpChannelImpl extends SctpChannel
private void receiverCleanup() throws IOException {
synchronized (stateLock) {
receiverThread = 0;
receiverThread = null;
if (state == ChannelState.KILLPENDING)
kill();
}
@ -334,7 +334,7 @@ public class SctpChannelImpl extends SctpChannel
private void senderCleanup() throws IOException {
synchronized (stateLock) {
senderThread = 0;
senderThread = null;
if (state == ChannelState.KILLPENDING)
kill();
}
@ -367,7 +367,7 @@ public class SctpChannelImpl extends SctpChannel
if (!isOpen()) {
return false;
}
receiverThread = NativeThread.current();
receiverThread = NativeThread.threadToSignal();
}
for (;;) {
InetAddress ia = isa.getAddress();
@ -472,7 +472,7 @@ public class SctpChannelImpl extends SctpChannel
if (!isOpen()) {
return false;
}
receiverThread = NativeThread.current();
receiverThread = NativeThread.threadToSignal();
}
if (!isBlocking()) {
connected = Net.pollConnect(fd, 0);
@ -484,7 +484,7 @@ public class SctpChannelImpl extends SctpChannel
}
} finally {
synchronized (stateLock) {
receiverThread = 0;
receiverThread = null;
if (state == ChannelState.KILLPENDING) {
kill();
connected = false;
@ -541,10 +541,10 @@ public class SctpChannelImpl extends SctpChannel
if (state != ChannelState.KILLED)
SctpNet.preClose(fdVal);
if (receiverThread != 0)
if (receiverThread != null)
NativeThread.signal(receiverThread);
if (senderThread != 0)
if (senderThread != null)
NativeThread.signal(senderThread);
if (!isRegistered())
@ -644,7 +644,7 @@ public class SctpChannelImpl extends SctpChannel
/* Postpone the kill if there is a waiting reader
* or writer thread. */
if (receiverThread == 0 && senderThread == 0) {
if (receiverThread == null && senderThread == null) {
state = ChannelState.KILLED;
SctpNet.close(fdVal);
} else {
@ -743,7 +743,7 @@ public class SctpChannelImpl extends SctpChannel
synchronized (stateLock) {
if(!isOpen())
return null;
receiverThread = NativeThread.current();
receiverThread = NativeThread.threadToSignal();
}
do {
@ -936,7 +936,7 @@ public class SctpChannelImpl extends SctpChannel
synchronized (stateLock) {
if(!isOpen())
return 0;
senderThread = NativeThread.current();
senderThread = NativeThread.threadToSignal();
}
do {
@ -1031,7 +1031,7 @@ public class SctpChannelImpl extends SctpChannel
ensureSendOpen();
SctpNet.shutdown(fdVal, -1);
if (senderThread != 0)
if (senderThread != null)
NativeThread.signal(senderThread);
isShutdown = true;
}

View File

@ -81,9 +81,9 @@ public class SctpMultiChannelImpl extends SctpMultiChannel
private final int fdVal;
/* IDs of native threads doing send and receives, for signalling */
private volatile long receiverThread;
private volatile long senderThread;
/* Threads doing send and receives, for signalling */
private volatile Thread receiverThread;
private volatile Thread senderThread;
/* Lock held by current receiving thread */
private final Object receiveLock = new Object();
@ -265,7 +265,7 @@ public class SctpMultiChannelImpl extends SctpMultiChannel
private void receiverCleanup() throws IOException {
synchronized (stateLock) {
receiverThread = 0;
receiverThread = null;
if (state == ChannelState.KILLPENDING)
kill();
}
@ -273,7 +273,7 @@ public class SctpMultiChannelImpl extends SctpMultiChannel
private void senderCleanup() throws IOException {
synchronized (stateLock) {
senderThread = 0;
senderThread = null;
if (state == ChannelState.KILLPENDING)
kill();
}
@ -290,10 +290,10 @@ public class SctpMultiChannelImpl extends SctpMultiChannel
if (state != ChannelState.KILLED)
SctpNet.preClose(fdVal);
if (receiverThread != 0)
if (receiverThread != null)
NativeThread.signal(receiverThread);
if (senderThread != 0)
if (senderThread != null)
NativeThread.signal(senderThread);
if (!isRegistered())
@ -378,7 +378,7 @@ public class SctpMultiChannelImpl extends SctpMultiChannel
assert !isOpen() && !isRegistered();
/* Postpone the kill if there is a thread sending or receiving. */
if (receiverThread == 0 && senderThread == 0) {
if (receiverThread == null && senderThread == null) {
state = ChannelState.KILLED;
SctpNet.close(fdVal);
} else {
@ -484,7 +484,7 @@ public class SctpMultiChannelImpl extends SctpMultiChannel
synchronized (stateLock) {
if(!isOpen())
return null;
receiverThread = NativeThread.current();
receiverThread = NativeThread.threadToSignal();
}
do {
@ -765,7 +765,7 @@ public class SctpMultiChannelImpl extends SctpMultiChannel
synchronized (stateLock) {
if(!isOpen())
return 0;
senderThread = NativeThread.current();
senderThread = NativeThread.threadToSignal();
/* Determine what address or association to send to */
Association assoc = messageInfo.association();

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2009, 2024, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2009, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -58,8 +58,8 @@ public class SctpServerChannelImpl extends SctpServerChannel
private final int fdVal;
/* IDs of native thread doing accept, for signalling */
private volatile long thread;
/* thread doing accept, for signalling */
private volatile Thread thread;
/* Lock held by thread currently blocked in this channel */
private final Object lock = new Object();
@ -200,7 +200,7 @@ public class SctpServerChannelImpl extends SctpServerChannel
private void acceptCleanup() throws IOException {
synchronized (stateLock) {
thread = 0;
thread = null;
if (state == ChannelState.KILLPENDING)
kill();
}
@ -222,7 +222,7 @@ public class SctpServerChannelImpl extends SctpServerChannel
begin();
if (!isOpen())
return null;
thread = NativeThread.current();
thread = NativeThread.threadToSignal();
for (;;) {
n = Net.accept(fd, newfd, isaa);
if ((n == IOStatus.INTERRUPTED) && isOpen())
@ -253,7 +253,7 @@ public class SctpServerChannelImpl extends SctpServerChannel
synchronized (stateLock) {
if (state != ChannelState.KILLED)
SctpNet.preClose(fdVal);
if (thread != 0)
if (thread != null)
NativeThread.signal(thread);
if (!isRegistered())
kill();
@ -273,7 +273,7 @@ public class SctpServerChannelImpl extends SctpServerChannel
assert !isOpen() && !isRegistered();
// Postpone the kill if there is a thread in accept
if (thread == 0) {
if (thread == null) {
state = ChannelState.KILLED;
SctpNet.close(fdVal);
} else {

View File

@ -35,6 +35,7 @@
* @library /test/lib
* @run junit/othervm -Djdk.pollerMode=1 BlockingSocketOps
* @run junit/othervm -Djdk.pollerMode=2 BlockingSocketOps
* @run junit/othervm -Djdk.pollerMode=3 BlockingSocketOps
*/
/*

View File

@ -35,6 +35,7 @@
* @library /test/lib
* @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps
* @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps
* @run junit/othervm/timeout=480 -Djdk.pollerMode=3 BlockingChannelOps
*/
/*

View File

@ -34,6 +34,7 @@
* @library /test/lib
* @run junit/othervm/native -Djdk.pollerMode=1 --enable-native-access=ALL-UNNAMED SelectorOps
* @run junit/othervm/native -Djdk.pollerMode=2 --enable-native-access=ALL-UNNAMED SelectorOps
* @run junit/othervm/native -Djdk.pollerMode=3 --enable-native-access=ALL-UNNAMED SelectorOps
*/
/*