From cd5256d5a654d436e5ef926f6afb1bcbfc7a8bd1 Mon Sep 17 00:00:00 2001 From: Alan Bateman Date: Fri, 6 Feb 2026 15:19:01 +0000 Subject: [PATCH] 8374170: I/O Poller updates Reviewed-by: michaelm --- .../sun/nio/ch/DefaultPollerProvider.java | 17 +- .../aix/classes/sun/nio/ch/PollsetPoller.java | 12 +- .../sun/nio/ch/DefaultPollerProvider.java | 34 +- .../linux/classes/sun/nio/ch/EPollPoller.java | 94 ++- .../sun/nio/ch/DefaultPollerProvider.java | 14 +- .../classes/sun/nio/ch/KQueuePoller.java | 98 ++- .../share/classes/java/lang/System.java | 10 +- .../share/classes/java/lang/Thread.java | 11 + .../jdk/internal/access/JavaLangAccess.java | 10 + .../sun/nio/ch/DatagramChannelImpl.java | 16 +- .../share/classes/sun/nio/ch/IOUtil.java | 8 +- .../classes/sun/nio/ch/NativeDispatcher.java | 21 +- .../classes/sun/nio/ch/NativeThreadSet.java | 31 +- .../classes/sun/nio/ch/NioSocketImpl.java | 30 +- .../share/classes/sun/nio/ch/Poller.java | 783 ++++++++++++------ .../classes/sun/nio/ch/PollerProvider.java | 41 +- .../sun/nio/ch/ServerSocketChannelImpl.java | 12 +- .../classes/sun/nio/ch/SocketChannelImpl.java | 38 +- .../unix/classes/sun/nio/ch/NativeThread.java | 58 +- .../classes/sun/nio/ch/SinkChannelImpl.java | 12 +- .../classes/sun/nio/ch/SourceChannelImpl.java | 15 +- .../classes/sun/nio/ch/UnixDispatcher.java | 10 +- src/java.base/unix/native/libnio/ch/IOUtil.c | 7 +- .../sun/nio/ch/DefaultPollerProvider.java | 22 +- .../classes/sun/nio/ch/NativeThread.java | 54 +- .../classes/sun/nio/ch/WEPollPoller.java | 26 +- .../windows/native/libnio/ch/IOUtil.c | 5 +- .../sun/nio/ch/sctp/SctpChannelImpl.java | 28 +- .../sun/nio/ch/sctp/SctpMultiChannelImpl.java | 20 +- .../nio/ch/sctp/SctpServerChannelImpl.java | 14 +- .../java/net/vthread/BlockingSocketOps.java | 1 + .../channels/vthread/BlockingChannelOps.java | 1 + .../nio/channels/vthread/SelectorOps.java | 1 + 33 files changed, 1043 insertions(+), 511 deletions(-) diff --git a/src/java.base/aix/classes/sun/nio/ch/DefaultPollerProvider.java b/src/java.base/aix/classes/sun/nio/ch/DefaultPollerProvider.java index b645b735533..eb895d5a3a1 100644 --- a/src/java.base/aix/classes/sun/nio/ch/DefaultPollerProvider.java +++ b/src/java.base/aix/classes/sun/nio/ch/DefaultPollerProvider.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2022, 2025, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -30,15 +30,28 @@ import java.io.IOException; * Default PollerProvider for AIX. */ class DefaultPollerProvider extends PollerProvider { - DefaultPollerProvider() { } + DefaultPollerProvider(Poller.Mode mode) { + if (mode != Poller.Mode.SYSTEM_THREADS) { + throw new UnsupportedOperationException(); + } + super(mode); + } + + DefaultPollerProvider() { + this(Poller.Mode.SYSTEM_THREADS); + } @Override Poller readPoller(boolean subPoller) throws IOException { + if (subPoller) + throw new UnsupportedOperationException(); return new PollsetPoller(true); } @Override Poller writePoller(boolean subPoller) throws IOException { + if (subPoller) + throw new UnsupportedOperationException(); return new PollsetPoller(false); } } diff --git a/src/java.base/aix/classes/sun/nio/ch/PollsetPoller.java b/src/java.base/aix/classes/sun/nio/ch/PollsetPoller.java index 724f14495a8..3eee39906ee 100644 --- a/src/java.base/aix/classes/sun/nio/ch/PollsetPoller.java +++ b/src/java.base/aix/classes/sun/nio/ch/PollsetPoller.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, 2023, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2022, 2026, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2023, IBM Corp. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * @@ -52,13 +52,19 @@ class PollsetPoller extends Poller { this.pollBuffer = Pollset.allocatePollArray(MAX_EVENTS_TO_POLL); } + @Override + void close() { + Pollset.pollsetDestroy(setid); + Pollset.freePollArray(pollBuffer); + } + @Override int fdVal() { return setid; } @Override - void implRegister(int fd) throws IOException { + void implStartPoll(int fd) throws IOException { int ret = Pollset.pollsetCtl(setid, Pollset.PS_MOD, fd, Pollset.PS_POLLPRI | event); if (ret != 0) { throw new IOException("Unable to register fd " + fd); @@ -66,7 +72,7 @@ class PollsetPoller extends Poller { } @Override - void implDeregister(int fd, boolean polled) { + void implStopPoll(int fd, boolean polled) { int ret = Pollset.pollsetCtl(setid, Pollset.PS_DELETE, fd, 0); assert ret == 0; } diff --git a/src/java.base/linux/classes/sun/nio/ch/DefaultPollerProvider.java b/src/java.base/linux/classes/sun/nio/ch/DefaultPollerProvider.java index a9b169a4657..b53a90e4f7f 100644 --- a/src/java.base/linux/classes/sun/nio/ch/DefaultPollerProvider.java +++ b/src/java.base/linux/classes/sun/nio/ch/DefaultPollerProvider.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2023, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2017, 2025, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -31,34 +31,34 @@ import jdk.internal.vm.ContinuationSupport; * Default PollerProvider for Linux. */ class DefaultPollerProvider extends PollerProvider { - DefaultPollerProvider() { } + DefaultPollerProvider(Poller.Mode mode) { + super(mode); + } - @Override - Poller.Mode defaultPollerMode() { - if (ContinuationSupport.isSupported()) { - return Poller.Mode.VTHREAD_POLLERS; - } else { - return Poller.Mode.SYSTEM_THREADS; - } + DefaultPollerProvider() { + var mode = ContinuationSupport.isSupported() + ? Poller.Mode.VTHREAD_POLLERS + : Poller.Mode.SYSTEM_THREADS; + this(mode); } @Override - int defaultReadPollers(Poller.Mode mode) { + int defaultReadPollers() { int ncpus = Runtime.getRuntime().availableProcessors(); - if (mode == Poller.Mode.VTHREAD_POLLERS) { - return Math.min(Integer.highestOneBit(ncpus), 32); - } else { - return Math.max(Integer.highestOneBit(ncpus / 4), 1); - } + return switch (pollerMode()) { + case SYSTEM_THREADS -> Math.max(Integer.highestOneBit(ncpus / 4), 1); + case VTHREAD_POLLERS -> Math.min(Integer.highestOneBit(ncpus), 32); + default -> super.defaultReadPollers(); + }; } @Override Poller readPoller(boolean subPoller) throws IOException { - return new EPollPoller(subPoller, true); + return new EPollPoller(pollerMode(), subPoller, true); } @Override Poller writePoller(boolean subPoller) throws IOException { - return new EPollPoller(subPoller, false); + return new EPollPoller(pollerMode(), subPoller, false); } } diff --git a/src/java.base/linux/classes/sun/nio/ch/EPollPoller.java b/src/java.base/linux/classes/sun/nio/ch/EPollPoller.java index cdebff7c766..5c45393bd62 100644 --- a/src/java.base/linux/classes/sun/nio/ch/EPollPoller.java +++ b/src/java.base/linux/classes/sun/nio/ch/EPollPoller.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2023, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2017, 2025, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -25,6 +25,8 @@ package sun.nio.ch; import java.io.IOException; +import java.lang.ref.Cleaner.Cleanable; +import jdk.internal.ref.CleanerFactory; import static sun.nio.ch.EPoll.*; /** @@ -38,12 +40,70 @@ class EPollPoller extends Poller { private final int event; private final int maxEvents; private final long address; + private final EventFD eventfd; // wakeup event, used for shutdown - EPollPoller(boolean subPoller, boolean read) throws IOException { - this.epfd = EPoll.create(); + // close action, and cleaner if this is subpoller + private final Runnable closer; + private final Cleanable cleaner; + + EPollPoller(Poller.Mode mode, boolean subPoller, boolean read) throws IOException { + boolean wakeable = (mode == Mode.POLLER_PER_CARRIER) && subPoller; + int maxEvents = (subPoller) ? 16 : 64; + + int epfd = EPoll.create(); + long address = 0L; + EventFD eventfd = null; + try { + address = EPoll.allocatePollArray(maxEvents); + + // register one end of the pipe with epoll to allow for wakeup + if (wakeable) { + eventfd = new EventFD(); + IOUtil.configureBlocking(eventfd.efd(), false); + EPoll.ctl(epfd, EPOLL_CTL_ADD, eventfd.efd(), EPOLLIN); + } + } catch (Throwable e) { + FileDispatcherImpl.closeIntFD(epfd); + if (address != 0L) EPoll.freePollArray(address); + if (eventfd != null) eventfd.close(); + throw e; + } + + this.epfd = epfd; this.event = (read) ? EPOLLIN : EPOLLOUT; - this.maxEvents = (subPoller) ? 64 : 512; - this.address = EPoll.allocatePollArray(maxEvents); + this.maxEvents = maxEvents; + this.address = address; + this.eventfd = eventfd; + + // create action to close epoll instance, register cleaner when wakeable + this.closer = closer(epfd, address, eventfd); + if (wakeable) { + this.cleaner = CleanerFactory.cleaner().register(this, closer); + } else { + this.cleaner = null; + } + } + + /** + * Returns an action to close the epoll instance and release other resources. + */ + private static Runnable closer(int epfd, long address, EventFD eventfd) { + return () -> { + try { + FileDispatcherImpl.closeIntFD(epfd); + EPoll.freePollArray(address); + if (eventfd != null) eventfd.close(); + } catch (IOException _) { } + }; + } + + @Override + void close() { + if (cleaner != null) { + cleaner.clean(); + } else { + closer.run(); + } } @Override @@ -52,8 +112,8 @@ class EPollPoller extends Poller { } @Override - void implRegister(int fdVal) throws IOException { - // re-arm + void implStartPoll(int fdVal) throws IOException { + // re-enable if already registered but disabled (previously polled) int err = EPoll.ctl(epfd, EPOLL_CTL_MOD, fdVal, (event | EPOLLONESHOT)); if (err == ENOENT) err = EPoll.ctl(epfd, EPOLL_CTL_ADD, fdVal, (event | EPOLLONESHOT)); @@ -62,24 +122,36 @@ class EPollPoller extends Poller { } @Override - void implDeregister(int fdVal, boolean polled) { + void implStopPoll(int fdVal, boolean polled) { // event is disabled if already polled if (!polled) { EPoll.ctl(epfd, EPOLL_CTL_DEL, fdVal, 0); } } + @Override + void wakeupPoller() throws IOException { + if (eventfd == null) { + throw new UnsupportedOperationException(); + } + eventfd.set(); + } + @Override int poll(int timeout) throws IOException { int n = EPoll.wait(epfd, address, maxEvents, timeout); + int polled = 0; int i = 0; while (i < n) { long eventAddress = EPoll.getEvent(address, i); - int fdVal = EPoll.getDescriptor(eventAddress); - polled(fdVal); + int fd = EPoll.getDescriptor(eventAddress); + if (eventfd == null || fd != eventfd.efd()) { + polled(fd); + polled++; + } i++; } - return n; + return polled; } } diff --git a/src/java.base/macosx/classes/sun/nio/ch/DefaultPollerProvider.java b/src/java.base/macosx/classes/sun/nio/ch/DefaultPollerProvider.java index dc32c2cd90c..6349ae503e4 100644 --- a/src/java.base/macosx/classes/sun/nio/ch/DefaultPollerProvider.java +++ b/src/java.base/macosx/classes/sun/nio/ch/DefaultPollerProvider.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2023, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2017, 2025, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -30,15 +30,21 @@ import java.io.IOException; * Default PollerProvider for macOS. */ class DefaultPollerProvider extends PollerProvider { - DefaultPollerProvider() { } + DefaultPollerProvider(Poller.Mode mode) { + super(mode); + } + + DefaultPollerProvider() { + this(Poller.Mode.SYSTEM_THREADS); + } @Override Poller readPoller(boolean subPoller) throws IOException { - return new KQueuePoller(subPoller, true); + return new KQueuePoller(pollerMode(), subPoller, true); } @Override Poller writePoller(boolean subPoller) throws IOException { - return new KQueuePoller(subPoller, false); + return new KQueuePoller(pollerMode(), subPoller, false); } } diff --git a/src/java.base/macosx/classes/sun/nio/ch/KQueuePoller.java b/src/java.base/macosx/classes/sun/nio/ch/KQueuePoller.java index 6a1c771820e..69c191913a9 100644 --- a/src/java.base/macosx/classes/sun/nio/ch/KQueuePoller.java +++ b/src/java.base/macosx/classes/sun/nio/ch/KQueuePoller.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2023, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2017, 2025, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -25,6 +25,8 @@ package sun.nio.ch; import java.io.IOException; +import java.lang.ref.Cleaner.Cleanable; +import jdk.internal.ref.CleanerFactory; import static sun.nio.ch.KQueue.*; /** @@ -36,11 +38,77 @@ class KQueuePoller extends Poller { private final int maxEvents; private final long address; - KQueuePoller(boolean subPoller, boolean read) throws IOException { - this.kqfd = KQueue.create(); + // file descriptors used for wakeup during shutdown + private final int fd0; + private final int fd1; + + // close action, and cleaner if this is subpoller + private final Runnable closer; + private final Cleanable cleaner; + + KQueuePoller(Poller.Mode mode, boolean subPoller, boolean read) throws IOException { + boolean wakeable = (mode == Mode.POLLER_PER_CARRIER) && subPoller; + int maxEvents = (subPoller) ? 16 : 64; + + int kqfd = KQueue.create(); + long address = 0L; + int fd0 = -1; + int fd1 = -1; + try { + address = KQueue.allocatePollArray(maxEvents); + + // register one end of the pipe with kqueue to allow for wakeup + if (wakeable) { + long fds = IOUtil.makePipe(false); + fd0 = (int) (fds >>> 32); + fd1 = (int) fds; + KQueue.register(kqfd, fd0, EVFILT_READ, EV_ADD); + } + } catch (Throwable e) { + FileDispatcherImpl.closeIntFD(kqfd); + if (address != 0L) KQueue.freePollArray(address); + if (fd0 >= 0) FileDispatcherImpl.closeIntFD(fd0); + if (fd1 >= 0) FileDispatcherImpl.closeIntFD(fd1); + throw e; + } + + this.kqfd = kqfd; this.filter = (read) ? EVFILT_READ : EVFILT_WRITE; - this.maxEvents = (subPoller) ? 64 : 512; - this.address = KQueue.allocatePollArray(maxEvents); + this.maxEvents = maxEvents; + this.address = address; + this.fd0 = fd0; + this.fd1 = fd1; + + // create action to close kqueue, register cleaner when wakeable + this.closer = closer(kqfd, address, fd0, fd1); + if (wakeable) { + this.cleaner = CleanerFactory.cleaner().register(this, closer); + } else { + this.cleaner = null; + } + } + + /** + * Returns an action to close the kqueue and release other resources. + */ + private static Runnable closer(int kqfd, long address, int fd0, int fd1) { + return () -> { + try { + FileDispatcherImpl.closeIntFD(kqfd); + KQueue.freePollArray(address); + if (fd0 >= 0) FileDispatcherImpl.closeIntFD(fd0); + if (fd1 >= 0) FileDispatcherImpl.closeIntFD(fd1); + } catch (IOException _) { } + }; + } + + @Override + void close() { + if (cleaner != null) { + cleaner.clean(); + } else { + closer.run(); + } } @Override @@ -49,30 +117,42 @@ class KQueuePoller extends Poller { } @Override - void implRegister(int fdVal) throws IOException { + void implStartPoll(int fdVal) throws IOException { int err = KQueue.register(kqfd, fdVal, filter, (EV_ADD|EV_ONESHOT)); if (err != 0) throw new IOException("kevent failed: " + err); } @Override - void implDeregister(int fdVal, boolean polled) { + void implStopPoll(int fdVal, boolean polled) { // event was deleted if already polled if (!polled) { KQueue.register(kqfd, fdVal, filter, EV_DELETE); } } + @Override + void wakeupPoller() throws IOException { + if (fd1 < 0) { + throw new UnsupportedOperationException(); + } + IOUtil.write1(fd1, (byte)0); + } + @Override int poll(int timeout) throws IOException { int n = KQueue.poll(kqfd, address, maxEvents, timeout); + int polled = 0; int i = 0; while (i < n) { long keventAddress = KQueue.getEvent(address, i); int fdVal = KQueue.getDescriptor(keventAddress); - polled(fdVal); + if (fdVal != fd0) { + polled(fdVal); + polled++; + } i++; } - return n; + return polled; } } diff --git a/src/java.base/share/classes/java/lang/System.java b/src/java.base/share/classes/java/lang/System.java index bd684fab629..2f772e4d065 100644 --- a/src/java.base/share/classes/java/lang/System.java +++ b/src/java.base/share/classes/java/lang/System.java @@ -2277,6 +2277,14 @@ public final class System { return Thread.scopedValueBindings(); } + public long nativeThreadID(Thread thread) { + return thread.nativeThreadID(); + } + + public void setThreadNativeID(long id) { + Thread.currentThread().setNativeThreadID(id); + } + public Continuation getContinuation(Thread thread) { return thread.getContinuation(); } @@ -2311,7 +2319,7 @@ public final class System { if (thread instanceof BaseVirtualThread vthread) { vthread.unpark(); } else { - throw new WrongThreadException(); + throw new IllegalArgumentException(); } } diff --git a/src/java.base/share/classes/java/lang/Thread.java b/src/java.base/share/classes/java/lang/Thread.java index bb1292b374a..57d28aca5f4 100644 --- a/src/java.base/share/classes/java/lang/Thread.java +++ b/src/java.base/share/classes/java/lang/Thread.java @@ -286,6 +286,9 @@ public class Thread implements Runnable { volatile boolean daemon; volatile int threadStatus; + // Used by NativeThread for signalling + @Stable long nativeThreadID; + // This map is maintained by the ThreadLocal class ThreadLocal.ThreadLocalMap terminatingThreadLocals; @@ -312,6 +315,14 @@ public class Thread implements Runnable { holder.terminatingThreadLocals = map; } + long nativeThreadID() { + return holder.nativeThreadID; + } + + void setNativeThreadID(long id) { + holder.nativeThreadID = id; + } + /* * ThreadLocal values pertaining to this thread. This map is maintained * by the ThreadLocal class. diff --git a/src/java.base/share/classes/jdk/internal/access/JavaLangAccess.java b/src/java.base/share/classes/jdk/internal/access/JavaLangAccess.java index 4ae1905aa10..9d980c3ba3b 100644 --- a/src/java.base/share/classes/jdk/internal/access/JavaLangAccess.java +++ b/src/java.base/share/classes/jdk/internal/access/JavaLangAccess.java @@ -581,6 +581,16 @@ public interface JavaLangAccess { */ Object scopedValueBindings(); + /** + * Returns the native thread ID for the given platform thread or 0 if not set. + */ + long nativeThreadID(Thread thread); + + /** + * Sets the native thread ID for the current platform thread. + */ + void setThreadNativeID(long id); + /** * Returns the innermost mounted continuation */ diff --git a/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java b/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java index afb312ed722..f5002e8b716 100644 --- a/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java +++ b/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java @@ -134,9 +134,9 @@ class DatagramChannelImpl private static final int ST_CLOSED = 3; private int state; - // IDs of native threads doing reads and writes, for signalling - private long readerThread; - private long writerThread; + // Threads doing reads and writes, for signalling + private Thread readerThread; + private Thread writerThread; // Local and remote (connected) address private InetSocketAddress localAddress; @@ -523,7 +523,7 @@ class DatagramChannelImpl if (localAddress == null) bindInternal(null); if (blocking) - readerThread = NativeThread.current(); + readerThread = NativeThread.threadToSignal(); } return remote; } @@ -538,7 +538,7 @@ class DatagramChannelImpl { if (blocking) { synchronized (stateLock) { - readerThread = 0; + readerThread = null; if (state == ST_CLOSING) { tryFinishClose(); } @@ -1030,7 +1030,7 @@ class DatagramChannelImpl if (localAddress == null) bindInternal(null); if (blocking) - writerThread = NativeThread.current(); + writerThread = NativeThread.threadToSignal(); } return remote; } @@ -1045,7 +1045,7 @@ class DatagramChannelImpl { if (blocking) { synchronized (stateLock) { - writerThread = 0; + writerThread = null; if (state == ST_CLOSING) { tryFinishClose(); } @@ -1714,7 +1714,7 @@ class DatagramChannelImpl */ private boolean tryClose() throws IOException { assert Thread.holdsLock(stateLock) && state == ST_CLOSING; - if ((readerThread == 0) && (writerThread == 0) && !isRegistered()) { + if ((readerThread == null) && (writerThread == null) && !isRegistered()) { state = ST_CLOSED; try { // close socket diff --git a/src/java.base/share/classes/sun/nio/ch/IOUtil.java b/src/java.base/share/classes/sun/nio/ch/IOUtil.java index 45f8cb2e588..df6677ab94d 100644 --- a/src/java.base/share/classes/sun/nio/ch/IOUtil.java +++ b/src/java.base/share/classes/sun/nio/ch/IOUtil.java @@ -587,9 +587,11 @@ public final class IOUtil { */ static native int drain1(int fd) throws IOException; - public static native void configureBlocking(FileDescriptor fd, - boolean blocking) - throws IOException; + static native void configureBlocking(int fd, boolean blocking) throws IOException; + + public static void configureBlocking(FileDescriptor fd, boolean blocking) throws IOException { + configureBlocking(fdVal(fd), blocking); + } public static native int fdVal(FileDescriptor fd); diff --git a/src/java.base/share/classes/sun/nio/ch/NativeDispatcher.java b/src/java.base/share/classes/sun/nio/ch/NativeDispatcher.java index 9b65310784a..63009b407ac 100644 --- a/src/java.base/share/classes/sun/nio/ch/NativeDispatcher.java +++ b/src/java.base/share/classes/sun/nio/ch/NativeDispatcher.java @@ -27,8 +27,6 @@ package sun.nio.ch; import java.io.FileDescriptor; import java.io.IOException; -import jdk.internal.access.JavaIOFileDescriptorAccess; -import jdk.internal.access.SharedSecrets; /** * Allows different platforms to call different native methods @@ -36,7 +34,6 @@ import jdk.internal.access.SharedSecrets; */ abstract class NativeDispatcher { - private static final JavaIOFileDescriptorAccess JIOFDA = SharedSecrets.getJavaIOFileDescriptorAccess(); abstract int read(FileDescriptor fd, long address, int len) throws IOException; @@ -78,12 +75,17 @@ abstract class NativeDispatcher { * if a platform thread is blocked on the file descriptor then the file descriptor is * dup'ed to a special fd and the thread signalled so that the syscall fails with EINTR. */ - final void preClose(FileDescriptor fd, long reader, long writer) throws IOException { - if (NativeThread.isVirtualThread(reader) || NativeThread.isVirtualThread(writer)) { - int fdVal = JIOFDA.get(fd); - Poller.stopPoll(fdVal); + final void preClose(FileDescriptor fd, Thread reader, Thread writer) throws IOException { + if (reader != null && reader.isVirtual()) { + NativeThread.signal(reader); // unparks virtual thread + reader = null; } - if (NativeThread.isNativeThread(reader) || NativeThread.isNativeThread(writer)) { + if (writer != null && writer.isVirtual()) { + NativeThread.signal(writer); // unparks virtual thread + writer = null; + } + // dup2 and signal platform threads + if (reader != null || writer != null) { implPreClose(fd, reader, writer); } } @@ -92,8 +94,7 @@ abstract class NativeDispatcher { * This method does nothing by default. On Unix systems the file descriptor is dup'ed * to a special fd and native threads signalled. */ - - void implPreClose(FileDescriptor fd, long reader, long writer) throws IOException { + void implPreClose(FileDescriptor fd, Thread reader, Thread writer) throws IOException { // Do nothing by default; this is only needed on Unix } diff --git a/src/java.base/share/classes/sun/nio/ch/NativeThreadSet.java b/src/java.base/share/classes/sun/nio/ch/NativeThreadSet.java index 079291572a8..c5423141789 100644 --- a/src/java.base/share/classes/sun/nio/ch/NativeThreadSet.java +++ b/src/java.base/share/classes/sun/nio/ch/NativeThreadSet.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2002, 2023, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2002, 2025, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -31,9 +31,9 @@ class NativeThreadSet { private static final int OTHER_THREAD_INDEX = -99; private final int initialCapacity; - private long[] threads; // array of thread handles, created lazily - private int used; // number of thread handles in threads array - private int otherThreads; // count of threads without a native thread handle + private Thread[] threads; // array of platform threads, created lazily + private int used; // number of elements in threads array + private int otherThreads; // additional threads that can't be signalled private boolean waitingToEmpty; NativeThreadSet(int n) { @@ -45,28 +45,28 @@ class NativeThreadSet { * it can efficiently be removed later. */ int add() { - long th = NativeThread.current(); synchronized (this) { - if (!NativeThread.isNativeThread(th)) { + final Thread t = NativeThread.threadToSignal(); + if (t == null || t.isVirtual()) { otherThreads++; return OTHER_THREAD_INDEX; } - // add native thread handle to array, creating or growing array if needed + // add platform threads to array, creating or growing array if needed int start = 0; if (threads == null) { - threads = new long[initialCapacity]; + threads = new Thread[initialCapacity]; } else if (used >= threads.length) { int on = threads.length; int nn = on * 2; - long[] nthreads = new long[nn]; + Thread[] nthreads = new Thread[nn]; System.arraycopy(threads, 0, nthreads, 0, on); threads = nthreads; start = on; } for (int i = start; i < threads.length; i++) { - if (threads[i] == 0) { - threads[i] = th; + if (threads[i] == null) { + threads[i] = t; used++; return i; } @@ -81,8 +81,7 @@ class NativeThreadSet { void remove(int i) { synchronized (this) { if (i >= 0) { - assert threads[i] == NativeThread.current(); - threads[i] = 0; + threads[i] = null; used--; } else if (i == OTHER_THREAD_INDEX) { otherThreads--; @@ -104,9 +103,9 @@ class NativeThreadSet { while (used > 0 || otherThreads > 0) { int u = used, i = 0; while (u > 0 && i < threads.length) { - long th = threads[i]; - if (th != 0) { - NativeThread.signal(th); + Thread t = threads[i]; + if (t != null) { + NativeThread.signal(t); u--; } i++; diff --git a/src/java.base/share/classes/sun/nio/ch/NioSocketImpl.java b/src/java.base/share/classes/sun/nio/ch/NioSocketImpl.java index 01f894be227..58b3bc7aaba 100644 --- a/src/java.base/share/classes/sun/nio/ch/NioSocketImpl.java +++ b/src/java.base/share/classes/sun/nio/ch/NioSocketImpl.java @@ -103,8 +103,8 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp private volatile boolean nonBlocking; // used by connect/read/write/accept, protected by stateLock - private long readerThread; - private long writerThread; + private Thread readerThread; + private Thread writerThread; // used when SO_REUSEADDR is emulated, protected by stateLock private boolean isReuseAddress; @@ -218,7 +218,7 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp private FileDescriptor beginRead() throws SocketException { synchronized (stateLock) { ensureOpenAndConnected(); - readerThread = NativeThread.current(); + readerThread = NativeThread.threadToSignal(); return fd; } } @@ -229,7 +229,7 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp */ private void endRead(boolean completed) throws SocketException { synchronized (stateLock) { - readerThread = 0; + readerThread = null; int state = this.state; if (state == ST_CLOSING) tryFinishClose(); @@ -370,7 +370,7 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp private FileDescriptor beginWrite() throws SocketException { synchronized (stateLock) { ensureOpenAndConnected(); - writerThread = NativeThread.current(); + writerThread = NativeThread.threadToSignal(); return fd; } } @@ -381,7 +381,7 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp */ private void endWrite(boolean completed) throws SocketException { synchronized (stateLock) { - writerThread = 0; + writerThread = null; int state = this.state; if (state == ST_CLOSING) tryFinishClose(); @@ -511,7 +511,7 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp this.address = address; this.port = port; - readerThread = NativeThread.current(); + readerThread = NativeThread.threadToSignal(); return fd; } } @@ -522,7 +522,7 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp */ private void endConnect(FileDescriptor fd, boolean completed) throws IOException { synchronized (stateLock) { - readerThread = 0; + readerThread = null; int state = this.state; if (state == ST_CLOSING) tryFinishClose(); @@ -666,7 +666,7 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp ensureOpen(); if (localport == 0) throw new SocketException("Not bound"); - readerThread = NativeThread.current(); + readerThread = NativeThread.threadToSignal(); return fd; } } @@ -678,7 +678,7 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp private void endAccept(boolean completed) throws SocketException { synchronized (stateLock) { int state = this.state; - readerThread = 0; + readerThread = null; if (state == ST_CLOSING) tryFinishClose(); if (!completed && state >= ST_CLOSING) @@ -844,7 +844,7 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp */ private boolean tryClose() throws IOException { assert Thread.holdsLock(stateLock) && state == ST_CLOSING; - if (readerThread == 0 && writerThread == 0) { + if (readerThread == null && writerThread == null) { try { cleaner.clean(); } catch (UncheckedIOException ioe) { @@ -1143,8 +1143,8 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp ensureOpenAndConnected(); if (!isInputClosed) { Net.shutdown(fd, Net.SHUT_RD); - if (NativeThread.isVirtualThread(readerThread)) { - Poller.stopPoll(fdVal(fd), Net.POLLIN); + if (readerThread != null && readerThread.isVirtual()) { + Poller.stopPoll(readerThread); } isInputClosed = true; } @@ -1157,8 +1157,8 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp ensureOpenAndConnected(); if (!isOutputClosed) { Net.shutdown(fd, Net.SHUT_WR); - if (NativeThread.isVirtualThread(writerThread)) { - Poller.stopPoll(fdVal(fd), Net.POLLOUT); + if (writerThread != null && writerThread.isVirtual()) { + Poller.stopPoll(writerThread); } isOutputClosed = true; } diff --git a/src/java.base/share/classes/sun/nio/ch/Poller.java b/src/java.base/share/classes/sun/nio/ch/Poller.java index 4a2cb4d8fdf..9360bcc8327 100644 --- a/src/java.base/share/classes/sun/nio/ch/Poller.java +++ b/src/java.base/share/classes/sun/nio/ch/Poller.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2024, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2017, 2025, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -25,61 +25,107 @@ package sun.nio.ch; import java.io.IOException; +import java.io.UncheckedIOException; +import java.lang.ref.Reference; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.locks.LockSupport; import java.util.function.BooleanSupplier; +import jdk.internal.access.JavaLangAccess; +import jdk.internal.access.SharedSecrets; import jdk.internal.misc.InnocuousThread; +import jdk.internal.misc.TerminatingThreadLocal; +import jdk.internal.vm.Continuation; +import jdk.internal.vm.ContinuationSupport; import jdk.internal.vm.annotation.Stable; /** - * Polls file descriptors. Virtual threads invoke the poll method to park - * until a given file descriptor is ready for I/O. + * I/O poller to allow virtual threads park until a file descriptor is ready for I/O. */ public abstract class Poller { - private static final Pollers POLLERS; - static { - try { - var pollers = new Pollers(); - pollers.start(); - POLLERS = pollers; - } catch (IOException ioe) { - throw new ExceptionInInitializerError(ioe); - } - } + private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess(); - // the poller or sub-poller thread + // the poller group for the I/O pollers and poller threads + private static final PollerGroup POLLER_GROUP = createPollerGroup(); + + // the poller or sub-poller thread (used for observability only) private @Stable Thread owner; // maps file descriptors to parked Thread private final Map map = new ConcurrentHashMap<>(); + // shutdown (if supported by poller group) + private volatile boolean shutdown; + /** * Poller mode. */ enum Mode { /** - * ReadPoller and WritePoller are dedicated platform threads that block waiting - * for events and unpark virtual threads when file descriptors are ready for I/O. + * Read and write pollers are platform threads that block waiting for events and + * unpark virtual threads when file descriptors are ready for I/O. */ SYSTEM_THREADS, /** - * ReadPoller and WritePoller threads are virtual threads that poll for events, - * yielding between polls and unparking virtual threads when file descriptors are + * Read and write pollers are virtual threads that poll for events, yielding + * between polls and unparking virtual threads when file descriptors are * ready for I/O. If there are no events then the poller threads park until there * are I/O events to poll. This mode helps to integrate polling with virtual * thread scheduling. The approach is similar to the default scheme in "User-level * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020 * (https://dl.acm.org/doi/10.1145/3379483). */ - VTHREAD_POLLERS + VTHREAD_POLLERS, + + /** + * Read pollers are per-carrier virtual threads that poll for events, yielding + * between polls and unparking virtual threads when file descriptors are ready + * for I/O. If there are no events then the poller threads park until there + * are I/O events to poll. The write poller is a system-wide platform thread. + */ + POLLER_PER_CARRIER + } + + /** + * Create and return the PollerGroup. + */ + private static PollerGroup createPollerGroup() { + try { + PollerProvider provider; + if (System.getProperty("jdk.pollerMode") instanceof String s) { + Mode mode = switch (s) { + case "1" -> Mode.SYSTEM_THREADS; + case "2" -> Mode.VTHREAD_POLLERS; + case "3" -> Mode.POLLER_PER_CARRIER; + default -> { + throw new RuntimeException(s + " is not a valid polling mode"); + } + }; + provider = PollerProvider.createProvider(mode); + } else { + provider = PollerProvider.createProvider(); + } + + int readPollers = pollerCount("jdk.readPollers", provider.defaultReadPollers()); + int writePollers = pollerCount("jdk.writePollers", provider.defaultWritePollers()); + PollerGroup group = switch (provider.pollerMode()) { + case SYSTEM_THREADS -> new SystemThreadsPollerGroup(provider, readPollers, writePollers); + case VTHREAD_POLLERS -> new VThreadsPollerGroup(provider, readPollers, writePollers); + case POLLER_PER_CARRIER -> new PollerPerCarrierPollerGroup(provider, writePollers); + }; + group.start(); + return group; + } catch (IOException ioe) { + throw new UncheckedIOException(ioe); + } } /** @@ -89,9 +135,34 @@ public abstract class Poller { } /** - * Returns the poller's file descriptor, used when the read and write poller threads - * are virtual threads. - * + * Closes the poller and release resources. This method can only be used to cleanup + * when creating a poller group fails. + */ + abstract void close() throws IOException; + + /** + * Sets the poller's thread owner. + */ + private void setOwner() { + owner = Thread.currentThread(); + } + + /** + * Returns true if this poller is marked for shutdown. + */ + boolean isShutdown() { + return shutdown; + } + + /** + * Marks this poller for shutdown. + */ + private void setShutdown() { + shutdown = true; + } + + /** + * Returns the poller's file descriptor to use when polling with the master poller. * @throws UnsupportedOperationException if not supported */ int fdVal() { @@ -99,16 +170,18 @@ public abstract class Poller { } /** - * Register the file descriptor. The registration is "one shot", meaning it should - * be polled at most once. + * Register the file descriptor with the I/O event management facility so that it is + * polled when the file descriptor is ready for I/O. The registration is "one shot", + * meaning it should be polled at most once. */ - abstract void implRegister(int fdVal) throws IOException; + abstract void implStartPoll(int fdVal) throws IOException; /** - * Deregister the file descriptor. + * Deregister a file descriptor from the I/O event management facility. This may be + * a no-op in some implementations when the file descriptor has already been polled. * @param polled true if the file descriptor has already been polled */ - abstract void implDeregister(int fdVal, boolean polled); + abstract void implStopPoll(int fdVal, boolean polled) throws IOException; /** * Poll for events. The {@link #polled(int)} method is invoked for each @@ -116,15 +189,26 @@ public abstract class Poller { * * @param timeout if positive then block for up to {@code timeout} milliseconds, * if zero then don't block, if -1 then block indefinitely - * @return the number of file descriptors polled + * @return >0 if file descriptors are polled, 0 if no file descriptor polled */ abstract int poll(int timeout) throws IOException; + /** + * Wakeup the poller thread if blocked in poll so it can shutdown. + * @throws UnsupportedOperationException if not supported + */ + void wakeupPoller() throws IOException { + throw new UnsupportedOperationException(); + } + /** * Callback by the poll method when a file descriptor is polled. */ final void polled(int fdVal) { - wakeup(fdVal); + Thread t = map.remove(fdVal); + if (t != null) { + LockSupport.unpark(t); + } } /** @@ -132,19 +216,10 @@ public abstract class Poller { * @param fdVal the file descriptor * @param event POLLIN or POLLOUT * @param nanos the waiting time or 0 to wait indefinitely - * @param supplier supplies a boolean to indicate if the enclosing object is open + * @param isOpen supplies a boolean to indicate if the enclosing object is open */ - static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier) - throws IOException - { - assert nanos >= 0L; - if (event == Net.POLLIN) { - POLLERS.readPoller(fdVal).poll(fdVal, nanos, supplier); - } else if (event == Net.POLLOUT) { - POLLERS.writePoller(fdVal).poll(fdVal, nanos, supplier); - } else { - assert false; - } + public static void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException { + POLLER_GROUP.poll(fdVal, event, nanos, isOpen); } /** @@ -152,45 +227,24 @@ public abstract class Poller { * @param fdVal the Selector's file descriptor * @param nanos the waiting time or 0 to wait indefinitely */ - static void pollSelector(int fdVal, long nanos) throws IOException { - assert nanos >= 0L; - Poller poller = POLLERS.masterPoller(); - if (poller == null) { - poller = POLLERS.readPoller(fdVal); - } - poller.poll(fdVal, nanos, () -> true); + public static void pollSelector(int fdVal, long nanos) throws IOException { + POLLER_GROUP.pollSelector(fdVal, nanos); } /** - * If there is a thread polling the given file descriptor for the given event then - * the thread is unparked. + * Unpark the given thread so that it stops polling. */ - static void stopPoll(int fdVal, int event) { - if (event == Net.POLLIN) { - POLLERS.readPoller(fdVal).wakeup(fdVal); - } else if (event == Net.POLLOUT) { - POLLERS.writePoller(fdVal).wakeup(fdVal); - } else { - throw new IllegalArgumentException(); - } - } - - /** - * If there are any threads polling the given file descriptor then they are unparked. - */ - static void stopPoll(int fdVal) { - stopPoll(fdVal, Net.POLLIN); - stopPoll(fdVal, Net.POLLOUT); + public static void stopPoll(Thread thread) { + LockSupport.unpark(thread); } /** * Parks the current thread until a file descriptor is ready. */ - private void poll(int fdVal, long nanos, BooleanSupplier supplier) throws IOException { - register(fdVal); + private void poll(int fdVal, long nanos, BooleanSupplier isOpen) throws IOException { + startPoll(fdVal); try { - boolean isOpen = supplier.getAsBoolean(); - if (isOpen) { + if (isOpen.getAsBoolean() && !isShutdown()) { if (nanos > 0) { LockSupport.parkNanos(nanos); } else { @@ -198,42 +252,38 @@ public abstract class Poller { } } } finally { - deregister(fdVal); + stopPoll(fdVal); } } /** - * Registers the file descriptor to be polled at most once when the file descriptor - * is ready for I/O. + * Register a file descriptor with the I/O event management facility so that it is + * polled when the file descriptor is ready for I/O. */ - private void register(int fdVal) throws IOException { + private void startPoll(int fdVal) throws IOException { Thread previous = map.put(fdVal, Thread.currentThread()); assert previous == null; try { - implRegister(fdVal); + implStartPoll(fdVal); } catch (Throwable t) { map.remove(fdVal); throw t; + } finally { + Reference.reachabilityFence(this); } } /** - * Deregister the file descriptor so that the file descriptor is not polled. + * Deregister a file descriptor from the I/O event management facility. */ - private void deregister(int fdVal) { + private void stopPoll(int fdVal) throws IOException { Thread previous = map.remove(fdVal); boolean polled = (previous == null); assert polled || previous == Thread.currentThread(); - implDeregister(fdVal, polled); - } - - /** - * Unparks any thread that is polling the given file descriptor. - */ - private void wakeup(int fdVal) { - Thread t = map.remove(fdVal); - if (t != null) { - LockSupport.unpark(t); + try { + implStopPoll(fdVal, polled); + } finally { + Reference.reachabilityFence(this); } } @@ -242,9 +292,9 @@ public abstract class Poller { * descriptor that is polled. */ private void pollerLoop() { - owner = Thread.currentThread(); + setOwner(); try { - for (;;) { + while (!isShutdown()) { poll(-1); } } catch (Exception e) { @@ -263,10 +313,10 @@ public abstract class Poller { */ private void subPollerLoop(Poller masterPoller) { assert Thread.currentThread().isVirtual(); - owner = Thread.currentThread(); + setOwner(); try { int polled = 0; - for (;;) { + while (!isShutdown()) { if (polled == 0) { masterPoller.poll(fdVal(), 0, () -> true); // park } else { @@ -280,194 +330,463 @@ public abstract class Poller { } /** - * Returns the number I/O operations currently registered with this poller. + * Unparks all threads waiting on a file descriptor registered with this poller. */ - public int registered() { - return map.size(); + private void wakeupAll() { + map.values().forEach(LockSupport::unpark); } @Override public String toString() { return String.format("%s [registered = %d, owner = %s]", - Objects.toIdentityString(this), registered(), owner); + Objects.toIdentityString(this), map.size(), owner); } /** - * The Pollers used for read and write events. + * A group of poller threads that support virtual threads polling file descriptors. */ - private static class Pollers { + private static abstract class PollerGroup { private final PollerProvider provider; - private final Poller.Mode pollerMode; - private final Poller masterPoller; - private final Poller[] readPollers; - private final Poller[] writePollers; - - // used by start method to executor is kept alive - private Executor executor; - - /** - * Creates the Poller instances based on configuration. - */ - Pollers() throws IOException { - PollerProvider provider = PollerProvider.provider(); - Poller.Mode mode; - String s = System.getProperty("jdk.pollerMode"); - if (s != null) { - if (s.equalsIgnoreCase(Mode.SYSTEM_THREADS.name()) || s.equals("1")) { - mode = Mode.SYSTEM_THREADS; - } else if (s.equalsIgnoreCase(Mode.VTHREAD_POLLERS.name()) || s.equals("2")) { - mode = Mode.VTHREAD_POLLERS; - } else { - throw new RuntimeException("Can't parse '" + s + "' as polling mode"); - } - } else { - mode = provider.defaultPollerMode(); - } - - // vthread poller mode needs a master poller - Poller masterPoller = (mode == Mode.VTHREAD_POLLERS) - ? provider.readPoller(false) - : null; - - // read pollers (or sub-pollers) - int readPollerCount = pollerCount("jdk.readPollers", provider.defaultReadPollers(mode)); - Poller[] readPollers = new Poller[readPollerCount]; - for (int i = 0; i < readPollerCount; i++) { - readPollers[i] = provider.readPoller(mode == Mode.VTHREAD_POLLERS); - } - - // write pollers (or sub-pollers) - int writePollerCount = pollerCount("jdk.writePollers", provider.defaultWritePollers(mode)); - Poller[] writePollers = new Poller[writePollerCount]; - for (int i = 0; i < writePollerCount; i++) { - writePollers[i] = provider.writePoller(mode == Mode.VTHREAD_POLLERS); - } + PollerGroup(PollerProvider provider) { this.provider = provider; - this.pollerMode = mode; - this.masterPoller = masterPoller; - this.readPollers = readPollers; - this.writePollers = writePollers; + } + + final PollerProvider provider() { + return provider; } /** - * Starts the Poller threads. + * Starts the poller group and any system-wide poller threads. */ - void start() { - if (pollerMode == Mode.VTHREAD_POLLERS) { - startPlatformThread("MasterPoller", masterPoller::pollerLoop); - ThreadFactory factory = Thread.ofVirtual() - .inheritInheritableThreadLocals(false) - .name("SubPoller-", 0) - .uncaughtExceptionHandler((t, e) -> e.printStackTrace()) - .factory(); - executor = Executors.newThreadPerTaskExecutor(factory); - Arrays.stream(readPollers).forEach(p -> { - executor.execute(() -> p.subPollerLoop(masterPoller)); - }); - Arrays.stream(writePollers).forEach(p -> { - executor.execute(() -> p.subPollerLoop(masterPoller)); - }); - } else { - Arrays.stream(readPollers).forEach(p -> { - startPlatformThread("Read-Poller", p::pollerLoop); - }); - Arrays.stream(writePollers).forEach(p -> { - startPlatformThread("Write-Poller", p::pollerLoop); - }); - } - } + abstract void start(); /** - * Returns the master poller, or null if there is no master poller. + * Parks the current thread until a file descriptor is ready for the given op. */ - Poller masterPoller() { - return masterPoller; - } + abstract void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException; /** - * Returns the read poller for the given file descriptor. + * Parks the current thread until a Selector's file descriptor is ready. */ - Poller readPoller(int fdVal) { - int index = provider.fdValToIndex(fdVal, readPollers.length); - return readPollers[index]; - } - - /** - * Returns the write poller for the given file descriptor. - */ - Poller writePoller(int fdVal) { - int index = provider.fdValToIndex(fdVal, writePollers.length); - return writePollers[index]; - } - - /** - * Return the list of read pollers. - */ - List readPollers() { - return List.of(readPollers); - } - - /** - * Return the list of write pollers. - */ - List writePollers() { - return List.of(writePollers); - } - - - /** - * Reads the given property name to get the poller count. If the property is - * set then the value must be a power of 2. Returns 1 if the property is not - * set. - * @throws IllegalArgumentException if the property is set to a value that - * is not a power of 2. - */ - private static int pollerCount(String propName, int defaultCount) { - String s = System.getProperty(propName); - int count = (s != null) ? Integer.parseInt(s) : defaultCount; - - // check power of 2 - if (count != Integer.highestOneBit(count)) { - String msg = propName + " is set to a value that is not a power of 2"; - throw new IllegalArgumentException(msg); - } - return count; + void pollSelector(int fdVal, long nanos) throws IOException { + poll(fdVal, Net.POLLIN, nanos, () -> true); } /** * Starts a platform thread to run the given task. */ - private void startPlatformThread(String name, Runnable task) { - try { - Thread thread = InnocuousThread.newSystemThread(name, task); - thread.setDaemon(true); - thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace()); - thread.start(); - } catch (Exception e) { - throw new InternalError(e); + protected final void startPlatformThread(String name, Runnable task) { + Thread thread = InnocuousThread.newSystemThread(name, task); + thread.setDaemon(true); + thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace()); + thread.start(); + } + + /** + * Return the master poller, or null if no master poller. + */ + abstract Poller masterPoller(); + + /** + * Return the read pollers. + */ + abstract List readPollers(); + + /** + * Return the write pollers. + */ + abstract List writePollers(); + + /** + * Close the given pollers. + */ + static void closeAll(Poller... pollers) { + for (Poller poller : pollers) { + if (poller != null) { + try { + poller.close(); + } catch (IOException _) { } + } } } } + /** + * SYSTEM_THREADS poller group. The read and write pollers are system-wide platform threads. + */ + private static class SystemThreadsPollerGroup extends PollerGroup { + // system-wide read and write pollers + private final Poller[] readPollers; + private final Poller[] writePollers; + + SystemThreadsPollerGroup(PollerProvider provider, + int readPollerCount, + int writePollerCount) throws IOException { + super(provider); + Poller[] readPollers = new Poller[readPollerCount]; + Poller[] writePollers = new Poller[writePollerCount]; + try { + for (int i = 0; i < readPollerCount; i++) { + readPollers[i] = provider.readPoller(false); + } + for (int i = 0; i < writePollerCount; i++) { + writePollers[i] = provider.writePoller(false); + } + } catch (Throwable e) { + closeAll(readPollers); + closeAll(writePollers); + throw e; + } + + this.readPollers = readPollers; + this.writePollers = writePollers; + } + + @Override + void start() { + Arrays.stream(readPollers).forEach(p -> { + startPlatformThread("Read-Poller", p::pollerLoop); + }); + Arrays.stream(writePollers).forEach(p -> { + startPlatformThread("Write-Poller", p::pollerLoop); + }); + } + + private Poller readPoller(int fdVal) { + int index = provider().fdValToIndex(fdVal, readPollers.length); + return readPollers[index]; + } + + private Poller writePoller(int fdVal) { + int index = provider().fdValToIndex(fdVal, writePollers.length); + return writePollers[index]; + } + + @Override + void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException { + Poller poller = (event == Net.POLLIN) + ? readPoller(fdVal) + : writePoller(fdVal); + poller.poll(fdVal, nanos, isOpen); + } + + @Override + Poller masterPoller() { + return null; + } + + @Override + List readPollers() { + return List.of(readPollers); + } + + @Override + List writePollers() { + return List.of(writePollers); + } + } + + /** + * VTHREAD_POLLERS poller group. The read and write pollers are virtual threads. + * When read and write pollers need to block then they register with a system-wide + * "master poller" that runs in a dedicated platform thread. + */ + private static class VThreadsPollerGroup extends PollerGroup { + private final Poller masterPoller; + private final Poller[] readPollers; + private final Poller[] writePollers; + + // keep virtual thread pollers alive + private final Executor executor; + + VThreadsPollerGroup(PollerProvider provider, + int readPollerCount, + int writePollerCount) throws IOException { + super(provider); + Poller masterPoller = provider.readPoller(false); + Poller[] readPollers = new Poller[readPollerCount]; + Poller[] writePollers = new Poller[writePollerCount]; + + try { + for (int i = 0; i < readPollerCount; i++) { + readPollers[i] = provider.readPoller(true); + } + for (int i = 0; i < writePollerCount; i++) { + writePollers[i] = provider.writePoller(true); + } + } catch (Throwable e) { + masterPoller.close(); + closeAll(readPollers); + closeAll(writePollers); + throw e; + } + + this.masterPoller = masterPoller; + this.readPollers = readPollers; + this.writePollers = writePollers; + + ThreadFactory factory = Thread.ofVirtual() + .inheritInheritableThreadLocals(false) + .name("SubPoller-", 0) + .uncaughtExceptionHandler((_, e) -> e.printStackTrace()) + .factory(); + this.executor = Executors.newThreadPerTaskExecutor(factory); + } + + @Override + void start() { + startPlatformThread("Master-Poller", masterPoller::pollerLoop); + Arrays.stream(readPollers).forEach(p -> { + executor.execute(() -> p.subPollerLoop(masterPoller)); + }); + Arrays.stream(writePollers).forEach(p -> { + executor.execute(() -> p.subPollerLoop(masterPoller)); + }); + } + + private Poller readPoller(int fdVal) { + int index = provider().fdValToIndex(fdVal, readPollers.length); + return readPollers[index]; + } + + private Poller writePoller(int fdVal) { + int index = provider().fdValToIndex(fdVal, writePollers.length); + return writePollers[index]; + } + + @Override + void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException { + Poller poller = (event == Net.POLLIN) + ? readPoller(fdVal) + : writePoller(fdVal); + poller.poll(fdVal, nanos, isOpen); + } + + @Override + void pollSelector(int fdVal, long nanos) throws IOException { + masterPoller.poll(fdVal, nanos, () -> true); + } + + @Override + Poller masterPoller() { + return masterPoller; + } + + @Override + List readPollers() { + return List.of(readPollers); + } + + @Override + List writePollers() { + return List.of(writePollers); + } + } + + /** + * POLLER_PER_CARRIER poller group. The read poller is a per-carrier virtual thread. + * When a virtual thread polls a file descriptor for POLLIN, then it will use (almost + * always, not guaranteed) the read poller for its carrier. When a read poller needs + * to block then it registers with a system-wide "master poller" that runs in a + * dedicated platform thread. The read poller terminates if the carrier terminates. + * The write pollers are system-wide platform threads (usually one). + */ + private static class PollerPerCarrierPollerGroup extends PollerGroup { + private record CarrierPoller(PollerPerCarrierPollerGroup group, Poller readPoller) { } + private static final TerminatingThreadLocal CARRIER_POLLER = + new TerminatingThreadLocal<>() { + @Override + protected void threadTerminated(CarrierPoller carrierPoller) { + Poller readPoller = carrierPoller.readPoller(); + carrierPoller.group().carrierTerminated(readPoller); + } + }; + + private final Poller masterPoller; + private final Set readPollers; + private final Poller[] writePollers; + + /** + * Create a PollerPerCarrierPollerGroup with the given number of write pollers. + */ + PollerPerCarrierPollerGroup(PollerProvider provider, + int writePollerCount) throws IOException { + super(provider); + Poller masterPoller = provider.readPoller(false); + Poller[] writePollers = new Poller[writePollerCount]; + try { + for (int i = 0; i < writePollerCount; i++) { + writePollers[i] = provider.writePoller(false); + } + } catch (Throwable e) { + masterPoller.close(); + closeAll(writePollers); + throw e; + } + this.masterPoller = masterPoller; + this.readPollers = ConcurrentHashMap.newKeySet();; + this.writePollers = writePollers; + } + + @Override + void start() { + startPlatformThread("Master-Poller", masterPoller::pollerLoop); + Arrays.stream(writePollers).forEach(p -> { + startPlatformThread("Write-Poller", p::pollerLoop); + }); + } + + private Poller writePoller(int fdVal) { + int index = provider().fdValToIndex(fdVal, writePollers.length); + return writePollers[index]; + } + + /** + * Starts a read sub-poller in a virtual thread. + */ + private Poller startReadPoller() throws IOException { + assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported(); + + // create read sub-poller + Poller readPoller = provider().readPoller(true); + readPollers.add(readPoller); + + // start virtual thread to execute sub-polling loop + Thread carrier = JLA.currentCarrierThread(); + Thread.ofVirtual() + .inheritInheritableThreadLocals(false) + .name(carrier.getName() + "-Read-Poller") + .uncaughtExceptionHandler((_, e) -> e.printStackTrace()) + .start(() -> subPollerLoop(readPoller)); + return readPoller; + } + + /** + * Returns the read poller for the current carrier, starting it if required. + */ + private Poller readPoller() throws IOException { + assert Thread.currentThread().isVirtual() && ContinuationSupport.isSupported(); + Continuation.pin(); + try { + CarrierPoller carrierPoller = CARRIER_POLLER.get(); + if (carrierPoller != null) { + return carrierPoller.readPoller(); + } else { + // first poll on this carrier will start poller + Poller readPoller = startReadPoller(); + CARRIER_POLLER.set(new CarrierPoller(this, readPoller)); + return readPoller; + } + } finally { + Continuation.unpin(); + } + } + + @Override + void poll(int fdVal, int event, long nanos, BooleanSupplier isOpen) throws IOException { + // for POLLIN, get the read poller for this carrier + if (event == Net.POLLIN + && Thread.currentThread().isVirtual() + && ContinuationSupport.isSupported()) { + readPoller().poll(fdVal, nanos, isOpen); + return; + } + + // -XX:-VMContinuations or POLLIN from platform thread does master poller + if (event == Net.POLLIN) { + masterPoller.poll(fdVal, nanos, isOpen); + } else { + writePoller(fdVal).poll(fdVal, nanos, isOpen); + } + } + + @Override + void pollSelector(int fdVal, long nanos) throws IOException { + masterPoller.poll(fdVal, nanos, () -> true); + } + + /** + * Sub-poller polling loop. + */ + private void subPollerLoop(Poller readPoller) { + try { + readPoller.subPollerLoop(masterPoller); + } finally { + // wakeup all threads waiting on file descriptors registered with the + // read poller, these I/O operation will migrate to another carrier. + readPoller.wakeupAll(); + + // remove from serviceability view + readPollers.remove(readPoller); + } + } + + /** + * Invoked by the carrier thread before it terminates. + */ + private void carrierTerminated(Poller readPoller) { + readPoller.setShutdown(); + try { + readPoller.wakeupPoller(); + } catch (Throwable e) { + e.printStackTrace(); + } + } + + @Override + Poller masterPoller() { + return masterPoller; + } + + @Override + List readPollers() { + return readPollers.stream().toList(); + } + + @Override + List writePollers() { + return List.of(writePollers); + } + } + + /** + * Reads the given property name to get the poller count. If the property is + * set then the value must be a power of 2. Returns 1 if the property is not + * set. + * @throws IllegalArgumentException if the property is set to a value that + * is not a power of 2. + */ + private static int pollerCount(String propName, int defaultCount) { + String s = System.getProperty(propName); + int count = (s != null) ? Integer.parseInt(s) : defaultCount; + + // check power of 2 + if (count != Integer.highestOneBit(count)) { + String msg = propName + " is set to a value that is not a power of 2"; + throw new IllegalArgumentException(msg); + } + return count; + } + /** * Return the master poller or null if there is no master poller. */ public static Poller masterPoller() { - return POLLERS.masterPoller(); + return POLLER_GROUP.masterPoller(); } /** * Return the list of read pollers. */ public static List readPollers() { - return POLLERS.readPollers(); + return POLLER_GROUP.readPollers(); } /** * Return the list of write pollers. */ public static List writePollers() { - return POLLERS.writePollers(); + return POLLER_GROUP.writePollers(); } } diff --git a/src/java.base/share/classes/sun/nio/ch/PollerProvider.java b/src/java.base/share/classes/sun/nio/ch/PollerProvider.java index b10ec309265..7d19b72d2fc 100644 --- a/src/java.base/share/classes/sun/nio/ch/PollerProvider.java +++ b/src/java.base/share/classes/sun/nio/ch/PollerProvider.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2023, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2017, 2025, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -30,38 +30,43 @@ import java.io.IOException; * Provider class for Poller implementations. */ abstract class PollerProvider { - private static final PollerProvider INSTANCE = new DefaultPollerProvider(); + private final Poller.Mode mode; - PollerProvider() { } + PollerProvider(Poller.Mode mode) { + this.mode = mode; + } - /** - * Returns the system-wide PollerProvider. - */ - static PollerProvider provider() { - return INSTANCE; + final Poller.Mode pollerMode() { + return mode; } /** - * Returns the default poller mode. - * @implSpec The default implementation uses system threads. + * Creates a PollerProvider that uses its preferred/default poller mode. */ - Poller.Mode defaultPollerMode() { - return Poller.Mode.SYSTEM_THREADS; + static PollerProvider createProvider() { + return new DefaultPollerProvider(); } /** - * Default number of read pollers for the given mode. The count must be a power of 2. + * Creates a PollerProvider that uses the given poller mode. + */ + static PollerProvider createProvider(Poller.Mode mode) { + return new DefaultPollerProvider(mode); + } + + /** + * Default number of read pollers. The count must be a power of 2. * @implSpec The default implementation returns 1. */ - int defaultReadPollers(Poller.Mode mode) { + int defaultReadPollers() { return 1; } /** - * Default number of write pollers for the given mode. The count must be a power of 2. + * Default number of write pollers. The count must be a power of 2. * @implSpec The default implementation returns 1. */ - int defaultWritePollers(Poller.Mode mode) { + int defaultWritePollers() { return 1; } @@ -74,13 +79,13 @@ abstract class PollerProvider { } /** - * Creates a Poller for read ops. + * Creates a Poller for POLLIN polling. * @param subPoller true to create a sub-poller */ abstract Poller readPoller(boolean subPoller) throws IOException; /** - * Creates a Poller for write ops. + * Creates a Poller for POLLOUT polling. * @param subPoller true to create a sub-poller */ abstract Poller writePoller(boolean subPoller) throws IOException; diff --git a/src/java.base/share/classes/sun/nio/ch/ServerSocketChannelImpl.java b/src/java.base/share/classes/sun/nio/ch/ServerSocketChannelImpl.java index e75110a76ad..98f79e6671b 100644 --- a/src/java.base/share/classes/sun/nio/ch/ServerSocketChannelImpl.java +++ b/src/java.base/share/classes/sun/nio/ch/ServerSocketChannelImpl.java @@ -90,8 +90,8 @@ class ServerSocketChannelImpl private static final int ST_CLOSED = 2; private int state; - // ID of native thread currently blocked in this channel, for signalling - private long thread; + // Thread currently blocked in this channel, for signalling + private Thread thread; // Binding private SocketAddress localAddress; // null => unbound @@ -349,7 +349,7 @@ class ServerSocketChannelImpl if (localAddress == null) throw new NotYetBoundException(); if (blocking) - thread = NativeThread.current(); + thread = NativeThread.threadToSignal(); } } @@ -364,7 +364,7 @@ class ServerSocketChannelImpl { if (blocking) { synchronized (stateLock) { - thread = 0; + thread = null; if (state == ST_CLOSING) { tryFinishClose(); } @@ -551,7 +551,7 @@ class ServerSocketChannelImpl */ private boolean tryClose() throws IOException { assert Thread.holdsLock(stateLock) && state == ST_CLOSING; - if ((thread == 0) && !isRegistered()) { + if ((thread == null) && !isRegistered()) { state = ST_CLOSED; nd.close(fd); return true; @@ -583,7 +583,7 @@ class ServerSocketChannelImpl assert state < ST_CLOSING; state = ST_CLOSING; if (!tryClose()) { - nd.preClose(fd, thread, 0); + nd.preClose(fd, thread, null); } } } diff --git a/src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java b/src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java index 868ed3b64bc..010b1832190 100644 --- a/src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java +++ b/src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java @@ -113,9 +113,9 @@ class SocketChannelImpl private static final int ST_CLOSED = 4; private volatile int state; // need stateLock to change - // IDs of native threads doing reads and writes, for signalling - private long readerThread; - private long writerThread; + // Threads doing reads and writes, for signalling + private Thread readerThread; + private Thread writerThread; // Binding private SocketAddress localAddress; @@ -368,7 +368,7 @@ class SocketChannelImpl synchronized (stateLock) { ensureOpen(); // record thread so it can be signalled if needed - readerThread = NativeThread.current(); + readerThread = NativeThread.threadToSignal(); } } } @@ -384,7 +384,7 @@ class SocketChannelImpl { if (blocking) { synchronized (stateLock) { - readerThread = 0; + readerThread = null; if (state == ST_CLOSING) { tryFinishClose(); } @@ -522,7 +522,7 @@ class SocketChannelImpl if (isOutputClosed) throw new ClosedChannelException(); // record thread so it can be signalled if needed - writerThread = NativeThread.current(); + writerThread = NativeThread.threadToSignal(); } } } @@ -538,7 +538,7 @@ class SocketChannelImpl { if (blocking) { synchronized (stateLock) { - writerThread = 0; + writerThread = null; if (state == ST_CLOSING) { tryFinishClose(); } @@ -673,7 +673,7 @@ class SocketChannelImpl ensureOpenAndConnected(); if (isOutputClosed) throw new ClosedChannelException(); - writerThread = NativeThread.current(); + writerThread = NativeThread.threadToSignal(); completed = true; } } finally { @@ -689,7 +689,7 @@ class SocketChannelImpl */ void afterTransferTo(boolean completed) throws AsynchronousCloseException { synchronized (stateLock) { - writerThread = 0; + writerThread = null; if (state == ST_CLOSING) { tryFinishClose(); } @@ -874,7 +874,7 @@ class SocketChannelImpl if (blocking) { // record thread so it can be signalled if needed - readerThread = NativeThread.current(); + readerThread = NativeThread.threadToSignal(); } } } @@ -993,7 +993,7 @@ class SocketChannelImpl throw new NoConnectionPendingException(); if (blocking) { // record thread so it can be signalled if needed - readerThread = NativeThread.current(); + readerThread = NativeThread.threadToSignal(); } } } @@ -1072,7 +1072,7 @@ class SocketChannelImpl */ private boolean tryClose() throws IOException { assert Thread.holdsLock(stateLock) && state == ST_CLOSING; - if ((readerThread == 0) && (writerThread == 0) && !isRegistered()) { + if ((readerThread == null) && (writerThread == null) && !isRegistered()) { state = ST_CLOSED; nd.close(fd); return true; @@ -1215,11 +1215,8 @@ class SocketChannelImpl throw new NotYetConnectedException(); if (!isInputClosed) { Net.shutdown(fd, Net.SHUT_RD); - long reader = readerThread; - if (NativeThread.isVirtualThread(reader)) { - Poller.stopPoll(fdVal, Net.POLLIN); - } else if (NativeThread.isNativeThread(reader)) { - NativeThread.signal(reader); + if (readerThread != null && readerThread.isVirtual()) { + Poller.stopPoll(readerThread); } isInputClosed = true; } @@ -1235,11 +1232,8 @@ class SocketChannelImpl throw new NotYetConnectedException(); if (!isOutputClosed) { Net.shutdown(fd, Net.SHUT_WR); - long writer = writerThread; - if (NativeThread.isVirtualThread(writer)) { - Poller.stopPoll(fdVal, Net.POLLOUT); - } else if (NativeThread.isNativeThread(writer)) { - NativeThread.signal(writer); + if (writerThread != null && writerThread.isVirtual()) { + Poller.stopPoll(writerThread); } isOutputClosed = true; } diff --git a/src/java.base/unix/classes/sun/nio/ch/NativeThread.java b/src/java.base/unix/classes/sun/nio/ch/NativeThread.java index 8d0bcea48d9..75cf36d6d52 100644 --- a/src/java.base/unix/classes/sun/nio/ch/NativeThread.java +++ b/src/java.base/unix/classes/sun/nio/ch/NativeThread.java @@ -25,6 +25,9 @@ package sun.nio.ch; +import java.util.concurrent.locks.LockSupport; +import jdk.internal.access.JavaLangAccess; +import jdk.internal.access.SharedSecrets; // Signalling operations on native threads // @@ -37,45 +40,38 @@ package sun.nio.ch; // always returns -1 and the signal(long) method has no effect. public class NativeThread { - private static final long VIRTUAL_THREAD_ID = -1L; + private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess(); + + private NativeThread() { } /** - * Returns the id of the current native thread if the platform can signal - * native threads, 0 if the platform can not signal native threads, or - * -1L if the current thread is a virtual thread. - */ - public static long current() { - if (Thread.currentThread().isVirtual()) { - return VIRTUAL_THREAD_ID; - } else { - return current0(); - } - } - - /** - * Signals the given native thread. + * Returns the Thread to signal the current thread. * - * @throws IllegalArgumentException if tid is not a token to a native thread + * The first use of this method on a platform thread will capture the thread's + * native thread ID. */ - public static void signal(long tid) { - if (tid == 0 || tid == VIRTUAL_THREAD_ID) - throw new IllegalArgumentException(); - signal0(tid); + public static Thread threadToSignal() { + Thread t = Thread.currentThread(); + if (!t.isVirtual() && JLA.nativeThreadID(t) == 0) { + JLA.setThreadNativeID(current0()); + } + return t; } /** - * Returns true the tid is the id of a native thread. + * Signals the given thread. For a platform thread it sends a signal to the thread. + * For a virtual thread it just unparks it. + * @throws IllegalStateException if the thread is a platform thread that hasn't set its native ID */ - static boolean isNativeThread(long tid) { - return (tid != 0 && tid != VIRTUAL_THREAD_ID); - } - - /** - * Returns true if tid is -1L. - * @see #current() - */ - static boolean isVirtualThread(long tid) { - return (tid == VIRTUAL_THREAD_ID); + public static void signal(Thread thread) { + if (thread.isVirtual()) { + LockSupport.unpark(thread); + } else { + long id = JLA.nativeThreadID(thread); + if (id == 0) + throw new IllegalStateException("Native thread ID not set"); + signal0(id); + } } /** diff --git a/src/java.base/unix/classes/sun/nio/ch/SinkChannelImpl.java b/src/java.base/unix/classes/sun/nio/ch/SinkChannelImpl.java index b073c287bfb..09d280b370c 100644 --- a/src/java.base/unix/classes/sun/nio/ch/SinkChannelImpl.java +++ b/src/java.base/unix/classes/sun/nio/ch/SinkChannelImpl.java @@ -63,8 +63,8 @@ class SinkChannelImpl private static final int ST_CLOSED = 2; private int state; - // ID of native thread doing write, for signalling - private long thread; + // Thread doing write, for signalling + private Thread writerThread; // True if the channel's socket has been forced into non-blocking mode // by a virtual thread. It cannot be reset. When the channel is in @@ -120,7 +120,7 @@ class SinkChannelImpl */ private boolean tryClose() throws IOException { assert Thread.holdsLock(stateLock) && state == ST_CLOSING; - if (thread == 0 && !isRegistered()) { + if (writerThread == null && !isRegistered()) { state = ST_CLOSED; nd.close(fd); return true; @@ -152,7 +152,7 @@ class SinkChannelImpl assert state < ST_CLOSING; state = ST_CLOSING; if (!tryClose()) { - nd.preClose(fd, thread, 0); + nd.preClose(fd, null, writerThread); } } } @@ -270,7 +270,7 @@ class SinkChannelImpl synchronized (stateLock) { ensureOpen(); if (blocking) - thread = NativeThread.current(); + writerThread = NativeThread.threadToSignal(); } } @@ -285,7 +285,7 @@ class SinkChannelImpl { if (blocking) { synchronized (stateLock) { - thread = 0; + writerThread = null; if (state == ST_CLOSING) { tryFinishClose(); } diff --git a/src/java.base/unix/classes/sun/nio/ch/SourceChannelImpl.java b/src/java.base/unix/classes/sun/nio/ch/SourceChannelImpl.java index 571d7f483d2..52188c12b17 100644 --- a/src/java.base/unix/classes/sun/nio/ch/SourceChannelImpl.java +++ b/src/java.base/unix/classes/sun/nio/ch/SourceChannelImpl.java @@ -63,8 +63,8 @@ class SourceChannelImpl private static final int ST_CLOSED = 2; private int state; - // ID of native thread doing read, for signalling - private long thread; + // Thread doing read, for signalling + private Thread readerThread; // True if the channel's socket has been forced into non-blocking mode // by a virtual thread. It cannot be reset. When the channel is in @@ -120,7 +120,7 @@ class SourceChannelImpl */ private boolean tryClose() throws IOException { assert Thread.holdsLock(stateLock) && state == ST_CLOSING; - if (thread == 0 && !isRegistered()) { + if (readerThread == null && !isRegistered()) { state = ST_CLOSED; nd.close(fd); return true; @@ -152,7 +152,7 @@ class SourceChannelImpl assert state < ST_CLOSING; state = ST_CLOSING; if (!tryClose()) { - nd.preClose(fd, thread, 0); + nd.preClose(fd, readerThread, null); } } } @@ -269,8 +269,9 @@ class SourceChannelImpl } synchronized (stateLock) { ensureOpen(); - if (blocking) - thread = NativeThread.current(); + if (blocking) { + readerThread = NativeThread.threadToSignal(); + } } } @@ -285,7 +286,7 @@ class SourceChannelImpl { if (blocking) { synchronized (stateLock) { - thread = 0; + readerThread = null; if (state == ST_CLOSING) { tryFinishClose(); } diff --git a/src/java.base/unix/classes/sun/nio/ch/UnixDispatcher.java b/src/java.base/unix/classes/sun/nio/ch/UnixDispatcher.java index 4cdd0c400ec..3656b172822 100644 --- a/src/java.base/unix/classes/sun/nio/ch/UnixDispatcher.java +++ b/src/java.base/unix/classes/sun/nio/ch/UnixDispatcher.java @@ -36,15 +36,17 @@ abstract class UnixDispatcher extends NativeDispatcher { close0(fd); } - private void signalThreads(long reader, long writer) { - if (NativeThread.isNativeThread(reader)) + private void signalThreads(Thread reader, Thread writer) { + if (reader != null) { NativeThread.signal(reader); - if (NativeThread.isNativeThread(writer)) + } + if (writer != null) { NativeThread.signal(writer); + } } @Override - void implPreClose(FileDescriptor fd, long reader, long writer) throws IOException { + void implPreClose(FileDescriptor fd, Thread reader, Thread writer) throws IOException { if (SUPPORTS_PENDING_SIGNALS) { signalThreads(reader, writer); } diff --git a/src/java.base/unix/native/libnio/ch/IOUtil.c b/src/java.base/unix/native/libnio/ch/IOUtil.c index dfa99658fa6..3a7693b2ee0 100644 --- a/src/java.base/unix/native/libnio/ch/IOUtil.c +++ b/src/java.base/unix/native/libnio/ch/IOUtil.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2000, 2021, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2000, 2025, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -77,10 +77,9 @@ configureBlocking(int fd, jboolean blocking) } JNIEXPORT void JNICALL -Java_sun_nio_ch_IOUtil_configureBlocking(JNIEnv *env, jclass clazz, - jobject fdo, jboolean blocking) +Java_sun_nio_ch_IOUtil_configureBlocking(JNIEnv *env, jclass clazz, jint fd, jboolean blocking) { - if (configureBlocking(fdval(env, fdo), blocking) < 0) + if (configureBlocking(fd, blocking) < 0) JNU_ThrowIOExceptionWithLastError(env, "Configure blocking failed"); } diff --git a/src/java.base/windows/classes/sun/nio/ch/DefaultPollerProvider.java b/src/java.base/windows/classes/sun/nio/ch/DefaultPollerProvider.java index abd2f34a229..d1af62fbd73 100644 --- a/src/java.base/windows/classes/sun/nio/ch/DefaultPollerProvider.java +++ b/src/java.base/windows/classes/sun/nio/ch/DefaultPollerProvider.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019, 2023, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2019, 2025, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -30,11 +30,19 @@ import java.io.IOException; * Default PollerProvider for Windows based on wepoll. */ class DefaultPollerProvider extends PollerProvider { - DefaultPollerProvider() { } + DefaultPollerProvider(Poller.Mode mode) { + if (mode != Poller.Mode.SYSTEM_THREADS) { + throw new UnsupportedOperationException(); + } + super(mode); + } + + DefaultPollerProvider() { + this(Poller.Mode.SYSTEM_THREADS); + } @Override - int defaultReadPollers(Poller.Mode mode) { - assert mode == Poller.Mode.SYSTEM_THREADS; + int defaultReadPollers() { int ncpus = Runtime.getRuntime().availableProcessors(); return Math.max(Integer.highestOneBit(ncpus / 8), 1); } @@ -46,13 +54,15 @@ class DefaultPollerProvider extends PollerProvider { @Override Poller readPoller(boolean subPoller) throws IOException { - assert !subPoller; + if (subPoller) + throw new UnsupportedOperationException(); return new WEPollPoller(true); } @Override Poller writePoller(boolean subPoller) throws IOException { - assert !subPoller; + if (subPoller) + throw new UnsupportedOperationException(); return new WEPollPoller(false); } } diff --git a/src/java.base/windows/classes/sun/nio/ch/NativeThread.java b/src/java.base/windows/classes/sun/nio/ch/NativeThread.java index 1870c95494f..28d8c6303d9 100644 --- a/src/java.base/windows/classes/sun/nio/ch/NativeThread.java +++ b/src/java.base/windows/classes/sun/nio/ch/NativeThread.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2002, 2023, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2002, 2025, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -25,47 +25,29 @@ package sun.nio.ch; - -// Signalling operations on native threads +import java.util.concurrent.locks.LockSupport; public class NativeThread { - private static final long VIRTUAL_THREAD_ID = -1L; + private NativeThread() { } /** - * Returns the id of the current native thread if the platform can signal - * native threads, 0 if the platform can not signal native threads, or - * -1L if the current thread is a virtual thread. + * Returns the Thread to signal the current thread or {@code null} if the current + * thread cannot be signalled. */ - public static long current() { - if (Thread.currentThread().isVirtual()) { - return VIRTUAL_THREAD_ID; + public static Thread threadToSignal() { + Thread thread = Thread.currentThread(); + return thread.isVirtual() ? thread : null; + } + + /** + * Signals the given thread. + * @throws UnsupportedOperationException is not supported + */ + public static void signal(Thread thread) { + if (thread.isVirtual()) { + LockSupport.unpark(thread); } else { - // no support for signalling threads on Windows - return 0; + throw new UnsupportedOperationException(); } } - - /** - * Signals the given native thread. - * - * @throws IllegalArgumentException if tid is not a token to a native thread - */ - static void signal(long tid) { - throw new UnsupportedOperationException(); - } - - /** - * Returns true the tid is the id of a native thread. - */ - static boolean isNativeThread(long tid) { - return false; - } - - /** - * Returns true if tid is -1L. - * @see #current() - */ - static boolean isVirtualThread(long tid) { - return (tid == VIRTUAL_THREAD_ID); - } } diff --git a/src/java.base/windows/classes/sun/nio/ch/WEPollPoller.java b/src/java.base/windows/classes/sun/nio/ch/WEPollPoller.java index 3db8d67acc6..9f575072f22 100644 --- a/src/java.base/windows/classes/sun/nio/ch/WEPollPoller.java +++ b/src/java.base/windows/classes/sun/nio/ch/WEPollPoller.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, 2023, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2020, 2025, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -32,27 +32,41 @@ import static sun.nio.ch.WEPoll.*; */ class WEPollPoller extends Poller { private static final int MAX_EVENTS_TO_POLL = 256; - private static final int ENOENT = 2; private final long handle; private final int event; private final long address; WEPollPoller(boolean read) throws IOException { - this.handle = WEPoll.create(); + long handle = WEPoll.create(); + long address; + try { + address = WEPoll.allocatePollArray(MAX_EVENTS_TO_POLL); + } catch (Throwable e) { + WEPoll.close(handle); + throw e; + } + this.event = (read) ? EPOLLIN : EPOLLOUT; - this.address = WEPoll.allocatePollArray(MAX_EVENTS_TO_POLL); + this.handle = handle; + this.address = address; } @Override - void implRegister(int fdVal) throws IOException { + void close() { + WEPoll.close(handle); + WEPoll.freePollArray(address); + } + + @Override + void implStartPoll(int fdVal) throws IOException { int err = WEPoll.ctl(handle, EPOLL_CTL_ADD, fdVal, (event | EPOLLONESHOT)); if (err != 0) throw new IOException("epoll_ctl failed: " + err); } @Override - void implDeregister(int fdVal, boolean polled) { + void implStopPoll(int fdVal, boolean polled) { WEPoll.ctl(handle, EPOLL_CTL_DEL, fdVal, 0); } diff --git a/src/java.base/windows/native/libnio/ch/IOUtil.c b/src/java.base/windows/native/libnio/ch/IOUtil.c index 850c237d9e9..a6b81b7afec 100644 --- a/src/java.base/windows/native/libnio/ch/IOUtil.c +++ b/src/java.base/windows/native/libnio/ch/IOUtil.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2000, 2023, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2000, 2025, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -134,11 +134,10 @@ Java_sun_nio_ch_IOUtil_setfdVal(JNIEnv *env, jclass clazz, jobject fdo, jint val JNIEXPORT void JNICALL Java_sun_nio_ch_IOUtil_configureBlocking(JNIEnv *env, jclass clazz, - jobject fdo, jboolean blocking) + jint fd, jboolean blocking) { u_long argp; int result = 0; - jint fd = fdval(env, fdo); if (blocking == JNI_FALSE) { argp = SET_NONBLOCKING; diff --git a/src/jdk.sctp/unix/classes/sun/nio/ch/sctp/SctpChannelImpl.java b/src/jdk.sctp/unix/classes/sun/nio/ch/sctp/SctpChannelImpl.java index 485ebf13f2c..263aa58e098 100644 --- a/src/jdk.sctp/unix/classes/sun/nio/ch/sctp/SctpChannelImpl.java +++ b/src/jdk.sctp/unix/classes/sun/nio/ch/sctp/SctpChannelImpl.java @@ -84,9 +84,9 @@ public class SctpChannelImpl extends SctpChannel private final int fdVal; - /* IDs of native threads doing send and receive, for signalling */ - private volatile long receiverThread; - private volatile long senderThread; + /* Threads doing send and receive, for signalling */ + private volatile Thread receiverThread; + private volatile Thread senderThread; /* Lock held by current receiving or connecting thread */ private final Object receiveLock = new Object(); @@ -326,7 +326,7 @@ public class SctpChannelImpl extends SctpChannel private void receiverCleanup() throws IOException { synchronized (stateLock) { - receiverThread = 0; + receiverThread = null; if (state == ChannelState.KILLPENDING) kill(); } @@ -334,7 +334,7 @@ public class SctpChannelImpl extends SctpChannel private void senderCleanup() throws IOException { synchronized (stateLock) { - senderThread = 0; + senderThread = null; if (state == ChannelState.KILLPENDING) kill(); } @@ -367,7 +367,7 @@ public class SctpChannelImpl extends SctpChannel if (!isOpen()) { return false; } - receiverThread = NativeThread.current(); + receiverThread = NativeThread.threadToSignal(); } for (;;) { InetAddress ia = isa.getAddress(); @@ -472,7 +472,7 @@ public class SctpChannelImpl extends SctpChannel if (!isOpen()) { return false; } - receiverThread = NativeThread.current(); + receiverThread = NativeThread.threadToSignal(); } if (!isBlocking()) { connected = Net.pollConnect(fd, 0); @@ -484,7 +484,7 @@ public class SctpChannelImpl extends SctpChannel } } finally { synchronized (stateLock) { - receiverThread = 0; + receiverThread = null; if (state == ChannelState.KILLPENDING) { kill(); connected = false; @@ -541,10 +541,10 @@ public class SctpChannelImpl extends SctpChannel if (state != ChannelState.KILLED) SctpNet.preClose(fdVal); - if (receiverThread != 0) + if (receiverThread != null) NativeThread.signal(receiverThread); - if (senderThread != 0) + if (senderThread != null) NativeThread.signal(senderThread); if (!isRegistered()) @@ -644,7 +644,7 @@ public class SctpChannelImpl extends SctpChannel /* Postpone the kill if there is a waiting reader * or writer thread. */ - if (receiverThread == 0 && senderThread == 0) { + if (receiverThread == null && senderThread == null) { state = ChannelState.KILLED; SctpNet.close(fdVal); } else { @@ -743,7 +743,7 @@ public class SctpChannelImpl extends SctpChannel synchronized (stateLock) { if(!isOpen()) return null; - receiverThread = NativeThread.current(); + receiverThread = NativeThread.threadToSignal(); } do { @@ -936,7 +936,7 @@ public class SctpChannelImpl extends SctpChannel synchronized (stateLock) { if(!isOpen()) return 0; - senderThread = NativeThread.current(); + senderThread = NativeThread.threadToSignal(); } do { @@ -1031,7 +1031,7 @@ public class SctpChannelImpl extends SctpChannel ensureSendOpen(); SctpNet.shutdown(fdVal, -1); - if (senderThread != 0) + if (senderThread != null) NativeThread.signal(senderThread); isShutdown = true; } diff --git a/src/jdk.sctp/unix/classes/sun/nio/ch/sctp/SctpMultiChannelImpl.java b/src/jdk.sctp/unix/classes/sun/nio/ch/sctp/SctpMultiChannelImpl.java index 31e83d72f96..c08c6dc88d0 100644 --- a/src/jdk.sctp/unix/classes/sun/nio/ch/sctp/SctpMultiChannelImpl.java +++ b/src/jdk.sctp/unix/classes/sun/nio/ch/sctp/SctpMultiChannelImpl.java @@ -81,9 +81,9 @@ public class SctpMultiChannelImpl extends SctpMultiChannel private final int fdVal; - /* IDs of native threads doing send and receives, for signalling */ - private volatile long receiverThread; - private volatile long senderThread; + /* Threads doing send and receives, for signalling */ + private volatile Thread receiverThread; + private volatile Thread senderThread; /* Lock held by current receiving thread */ private final Object receiveLock = new Object(); @@ -265,7 +265,7 @@ public class SctpMultiChannelImpl extends SctpMultiChannel private void receiverCleanup() throws IOException { synchronized (stateLock) { - receiverThread = 0; + receiverThread = null; if (state == ChannelState.KILLPENDING) kill(); } @@ -273,7 +273,7 @@ public class SctpMultiChannelImpl extends SctpMultiChannel private void senderCleanup() throws IOException { synchronized (stateLock) { - senderThread = 0; + senderThread = null; if (state == ChannelState.KILLPENDING) kill(); } @@ -290,10 +290,10 @@ public class SctpMultiChannelImpl extends SctpMultiChannel if (state != ChannelState.KILLED) SctpNet.preClose(fdVal); - if (receiverThread != 0) + if (receiverThread != null) NativeThread.signal(receiverThread); - if (senderThread != 0) + if (senderThread != null) NativeThread.signal(senderThread); if (!isRegistered()) @@ -378,7 +378,7 @@ public class SctpMultiChannelImpl extends SctpMultiChannel assert !isOpen() && !isRegistered(); /* Postpone the kill if there is a thread sending or receiving. */ - if (receiverThread == 0 && senderThread == 0) { + if (receiverThread == null && senderThread == null) { state = ChannelState.KILLED; SctpNet.close(fdVal); } else { @@ -484,7 +484,7 @@ public class SctpMultiChannelImpl extends SctpMultiChannel synchronized (stateLock) { if(!isOpen()) return null; - receiverThread = NativeThread.current(); + receiverThread = NativeThread.threadToSignal(); } do { @@ -765,7 +765,7 @@ public class SctpMultiChannelImpl extends SctpMultiChannel synchronized (stateLock) { if(!isOpen()) return 0; - senderThread = NativeThread.current(); + senderThread = NativeThread.threadToSignal(); /* Determine what address or association to send to */ Association assoc = messageInfo.association(); diff --git a/src/jdk.sctp/unix/classes/sun/nio/ch/sctp/SctpServerChannelImpl.java b/src/jdk.sctp/unix/classes/sun/nio/ch/sctp/SctpServerChannelImpl.java index 4b2be742c6d..f72e0938eb5 100644 --- a/src/jdk.sctp/unix/classes/sun/nio/ch/sctp/SctpServerChannelImpl.java +++ b/src/jdk.sctp/unix/classes/sun/nio/ch/sctp/SctpServerChannelImpl.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2009, 2024, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2009, 2025, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -58,8 +58,8 @@ public class SctpServerChannelImpl extends SctpServerChannel private final int fdVal; - /* IDs of native thread doing accept, for signalling */ - private volatile long thread; + /* thread doing accept, for signalling */ + private volatile Thread thread; /* Lock held by thread currently blocked in this channel */ private final Object lock = new Object(); @@ -200,7 +200,7 @@ public class SctpServerChannelImpl extends SctpServerChannel private void acceptCleanup() throws IOException { synchronized (stateLock) { - thread = 0; + thread = null; if (state == ChannelState.KILLPENDING) kill(); } @@ -222,7 +222,7 @@ public class SctpServerChannelImpl extends SctpServerChannel begin(); if (!isOpen()) return null; - thread = NativeThread.current(); + thread = NativeThread.threadToSignal(); for (;;) { n = Net.accept(fd, newfd, isaa); if ((n == IOStatus.INTERRUPTED) && isOpen()) @@ -253,7 +253,7 @@ public class SctpServerChannelImpl extends SctpServerChannel synchronized (stateLock) { if (state != ChannelState.KILLED) SctpNet.preClose(fdVal); - if (thread != 0) + if (thread != null) NativeThread.signal(thread); if (!isRegistered()) kill(); @@ -273,7 +273,7 @@ public class SctpServerChannelImpl extends SctpServerChannel assert !isOpen() && !isRegistered(); // Postpone the kill if there is a thread in accept - if (thread == 0) { + if (thread == null) { state = ChannelState.KILLED; SctpNet.close(fdVal); } else { diff --git a/test/jdk/java/net/vthread/BlockingSocketOps.java b/test/jdk/java/net/vthread/BlockingSocketOps.java index ef58e06b915..d3db734f36a 100644 --- a/test/jdk/java/net/vthread/BlockingSocketOps.java +++ b/test/jdk/java/net/vthread/BlockingSocketOps.java @@ -35,6 +35,7 @@ * @library /test/lib * @run junit/othervm -Djdk.pollerMode=1 BlockingSocketOps * @run junit/othervm -Djdk.pollerMode=2 BlockingSocketOps + * @run junit/othervm -Djdk.pollerMode=3 BlockingSocketOps */ /* diff --git a/test/jdk/java/nio/channels/vthread/BlockingChannelOps.java b/test/jdk/java/nio/channels/vthread/BlockingChannelOps.java index eb2229d927a..7e934301892 100644 --- a/test/jdk/java/nio/channels/vthread/BlockingChannelOps.java +++ b/test/jdk/java/nio/channels/vthread/BlockingChannelOps.java @@ -35,6 +35,7 @@ * @library /test/lib * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps + * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 BlockingChannelOps */ /* diff --git a/test/jdk/java/nio/channels/vthread/SelectorOps.java b/test/jdk/java/nio/channels/vthread/SelectorOps.java index 81821a85791..11fe98a3a01 100644 --- a/test/jdk/java/nio/channels/vthread/SelectorOps.java +++ b/test/jdk/java/nio/channels/vthread/SelectorOps.java @@ -34,6 +34,7 @@ * @library /test/lib * @run junit/othervm/native -Djdk.pollerMode=1 --enable-native-access=ALL-UNNAMED SelectorOps * @run junit/othervm/native -Djdk.pollerMode=2 --enable-native-access=ALL-UNNAMED SelectorOps + * @run junit/othervm/native -Djdk.pollerMode=3 --enable-native-access=ALL-UNNAMED SelectorOps */ /*