/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea, Bill Scherer, and Michael Scott with
 * assistance from members of JCP JSR-166 Expert Group and released to
 * the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

import jdk.internal.invoke.MhUtil;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.concurrent.locks.LockSupport;

/**
 * A synchronization point at which threads can pair and swap elements
 * within pairs.  Each thread presents some object on entry to the
 * {@link #exchange exchange} method, matches with a partner thread,
 * and receives its partner's object on return.  An Exchanger may be
 * viewed as a bidirectional form of a {@link SynchronousQueue}.
 * Exchangers may be useful in applications such as genetic algorithms
 * and pipeline designs.
 *
 * <p><b>Sample Usage:</b>
 * Here are the highlights of a class that uses an {@code Exchanger}
 * to swap buffers between threads so that the thread filling the
 * buffer gets a freshly emptied one when it needs it, handing off the
 * filled one to the thread emptying the buffer.
 * <pre> {@code
 * class FillAndEmpty {
 *   Exchanger<DataBuffer> exchanger = new Exchanger<>();
 *   DataBuffer initialEmptyBuffer = ...; // a made-up type
 *   DataBuffer initialFullBuffer = ...;
 *
 *   class FillingLoop implements Runnable {
 *     public void run() {
 *       DataBuffer currentBuffer = initialEmptyBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           addToBuffer(currentBuffer);
 *           if (currentBuffer.isFull())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ...}
 *     }
 *   }
 *
 *   class EmptyingLoop implements Runnable {
 *     public void run() {
 *       DataBuffer currentBuffer = initialFullBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           takeFromBuffer(currentBuffer);
 *           if (currentBuffer.isEmpty())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ...}
 *     }
 *   }
 *
 *   void start() {
 *     new Thread(new FillingLoop()).start();
 *     new Thread(new EmptyingLoop()).start();
 *   }
 * }}</pre>
 *
 * <p>Memory consistency effects: For each pair of threads that
 * successfully exchange objects via an {@code Exchanger}, actions
 * prior to the {@code exchange()} in each thread
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * those subsequent to a return from the corresponding {@code exchange()}
 * in the other thread.
 *
 * @since 1.5
 * @author Doug Lea and Bill Scherer and Michael Scott
 * @param <V> The type of objects that may be exchanged
 */
public class Exchanger<V> {

    /*
     * Overview: The core algorithm is, for an exchange "slot",
     * and a participant (caller) with an item:
     *
     * for (;;) {
     *   if (slot is empty) {                        // offer
     *     place item in a Node;
     *     if (can CAS slot from empty to node) {
     *       wait for release;
     *       return matching item in node;
     *     }
     *   }
     *   else if (can CAS slot from node to empty) { // release
     *     get the item in node;
     *     set matching item in node;
     *     release waiting thread;
     *   }
     *   // else retry on CAS failure
     * }
     *
     * This is among the simplest forms of a "dual data structure" --
     * see Scott and Scherer's DISC 04 paper and
     * http://www.cs.rochester.edu/research/synchronization/pseudocode/duals.html
     *
     * This works great in principle. But in practice, like many
     * algorithms centered on atomic updates to a single location, it
     * scales horribly when there are more than a few participants
     * using the same Exchanger. So the implementation instead uses a
     * form of elimination arena that spreads out this contention by
     * arranging that some threads typically use different slots,
     * while still ensuring that eventually, any two parties will be
     * able to exchange items. That is, we cannot completely partition
     * across threads, but instead give threads arena indices that
     * will on average grow under contention and shrink under lack of
     * contention.
     *
     * We approach this by defining the Nodes holding references to
     * transferred items as ThreadLocals, and include in them
     * per-thread index and related bookkeeping state. We can safely
     * reuse per-thread nodes rather than creating them fresh each
     * time because slots alternate between pointing to a node vs
     * null, so cannot encounter ABA problems. However, we must ensure
     * that object transfer fields are reset between uses. Given this,
     * Participant nodes can be defined as static ThreadLocals. As
     * seen for example in class Striped64, using indices established
     * in one instance across others usually improves overall
     * performance. Nodes also include a participant-local random
     * number generator.
     *
     * Spreading out contention requires that the memory locations
     * used by the arena slots don't share a cache line -- otherwise,
     * the arena would have almost no benefit. We arrange this by
     * adding another level of indirection: the arena elements point
     * to "Slots", each of which is padded using @Contended. We only
     * create a single Slot on initialization, adding more when
     * needed. The per-thread Participant Nodes may also be subject to
     * false-sharing contention, but tend to be more scattered in
     * memory, so are unpadded, with some occasional performance impact.
     *
     * The arena starts out with only one used slot. We expand the
     * effective arena size by tracking collisions, i.e., failed CASes
     * while trying to exchange, and shrink it via "spinouts" in which
     * threads give up waiting at a slot. By nature of the above
     * algorithm, the only kinds of collision that reliably indicate
     * contention are when two attempted releases collide -- one of
     * two attempted offers can legitimately fail to CAS without
     * indicating contention by more than one other thread.
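     *
     * As a hedged, illustrative sketch only (not code used by this
     * class), the single-slot algorithm above can be written with an
     * AtomicReference slot and pure spin-waiting, ignoring the arena,
     * blocking, timeouts, interrupts, and null items; all names here
     * are made up:
     *
     *   class OneSlotExchanger<T> {
     *     static final class Node<U> {
     *       final U item;          // offered item
     *       volatile U match;      // set by the releasing thread
     *       Node(U item) { this.item = item; }
     *     }
     *     final java.util.concurrent.atomic.AtomicReference<Node<T>>
     *       slot = new java.util.concurrent.atomic.AtomicReference<>();
     *
     *     T exchange(T item) {     // item must be non-null here
     *       Node<T> n = new Node<>(item);
     *       for (;;) {
     *         Node<T> q = slot.get();
     *         if (q == null) {                        // offer
     *           if (slot.compareAndSet(null, n)) {
     *             T m;
     *             while ((m = n.match) == null)       // wait for release
     *               Thread.onSpinWait();
     *             return m;
     *           }
     *         }
     *         else if (slot.compareAndSet(q, null)) { // release
     *           q.match = item;
     *           return q.item;
     *         }
     *         // else retry on CAS failure
     *       }
     *     }
     *   }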
     *
     * Arena size (the value of field "bound") is controlled by random
     * sampling. On each miss (collision or spinout), a thread chooses
     * a new random index within the arena. Upon the third collision
     * with the same current bound, it tries to grow the arena. And
     * upon the second spinout, it tries to shrink. The asymmetry in
     * part reflects relative costs, and reduces flailing. Because
     * they cannot be changed without also changing the sampling
     * strategy, these rules are directly incorporated into uses of
     * the xchg "misses" variable. The bound field is tagged with
     * sequence numbers to reduce stale decisions. Uniform random
     * indices are generated using XorShift with enough bits so that
     * bias (see Knuth, TAoCP vol 2) is negligible for the moduli used
     * here (at most 256) without requiring rejection tests. Using
     * nonuniform randoms with greater weight to higher indices is
     * also possible but does not seem worthwhile in practice.
     *
     * These mechanics rely on a reasonable choice of constant SPINS.
     * The time cost of SPINS * Thread.onSpinWait() should be at least
     * the expected cost of a park/unpark context switch, and larger
     * than that of two failed CASes, but still small enough to avoid
     * excessive delays during arena shrinkage. We also deal with the
     * possibility that when an offering thread waits for a release,
     * spin-waiting would be useless because the releasing thread is
     * descheduled. On multiprocessors, we cannot know this in
     * general. But when Virtual Threads are used, method
     * ForkJoinWorkerThread.hasKnownQueuedWork serves as a guide to
     * whether to spin or immediately block, allowing a context switch
     * that may enable a releaser. Note also that when many threads
     * are being run on few cores, encountering enough collisions to
     * trigger arena growth is rare, and soon followed by shrinkage,
     * so this doesn't require special handling.
     *
     * The basic exchange mechanics rely on checks that Node item
     * fields are not null, which doesn't work when offered items are
     * null. We trap this case by translating nulls to the
     * (un-Exchangeable) value of the static Participant
     * reference.
     *
     * Essentially all of the implementation is in method xchg. As is
     * too common in this sort of code, most of the logic relies on
     * reads of fields that are maintained as local variables, so
     * cannot be nicely factored. It is structured as a main loop with
     * a leading volatile read (of field bound) that causes the others
     * to be freshly read even though declared in plain mode. We don't
     * use compareAndExchange, which would otherwise save some
     * re-reads, because of the need to recheck indices and bounds on
     * failures.
     *
     * Support for optional timeouts in a single method adds further
     * complexity. Note that for the sake of arena bounds control,
     * time bounds must be ignored during spinouts, which may delay
     * TimeoutExceptions (but no more so than would the excessive
     * context switching that could occur otherwise). Responses to
     * interruption are handled similarly, postponing commitment to
     * throw InterruptedException until successfully cancelled.
     *
     * Design differences from previous releases include:
     *
     * * Accommodation of VirtualThreads.
     *
     * * Use of Slots vs spaced indices for the arena and static
     *   ThreadLocals, avoiding separate arena vs non-arena modes.
     *
     * * Use of random sampling for grow/shrink decisions, with
     *   typically faster and more stable adaptation (as was mentioned
     *   as a possible improvement in the previous version).
     */
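
    /*
     * Hedged illustration (not code used by this class) of the two
     * sampling ingredients described above; variable names are made
     * up. The XorShift step produces the next per-thread random
     * index, and the bound field packs the current arena index cap
     * into its low bits (MMASK) with a version stamp in the bits
     * above them (multiples of SEQ):
     *
     *   long r = seed;
     *   r ^= r << 13; r ^= r >>> 7; r ^= r << 17; // xorshift step
     *   seed = r;
     *   int i = (int)(r % (m + 1));  // m = bound & MMASK <= 255; a
     *                                // negative result just forces
     *                                // another move
     *
     *   int b = bound;
     *   int grow   = b + 1 + SEQ;    // cap + 1, version + 1
     *   int shrink = b - 1 + SEQ;    // cap - 1, version + 1
     */
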
    /**
     * The maximum supported arena index. The maximum allocatable
     * arena size is MMASK + 1. Must be a power of two minus one. The
     * cap of 255 (0xff) more than suffices for the expected scaling
     * limits of the main algorithms.
     */
    private static final int MMASK = 0xff;

    /**
     * Unit for sequence/version bits of the bound field. Each
     * successful change to the bound also adds SEQ.
     */
    private static final int SEQ = MMASK + 1;

    /**
     * The bound for spins while waiting for a match before either
     * blocking or possibly shrinking the arena.
     */
    private static final int SPINS = 1 << 10;

    /**
     * Padded arena cells to avoid false-sharing memory contention.
     */
    @jdk.internal.vm.annotation.Contended
    static final class Slot {
        Node entry;
    }

    /**
     * Nodes hold partially exchanged data, plus other per-thread
     * bookkeeping.
     */
    static final class Node {
        long seed;              // Random seed
        int index;              // Arena index
        Object item;            // This thread's current item
        volatile Object match;  // Item provided by releasing thread
        volatile Thread parked; // Set to this thread when parked, else null
        Node() {
            index = -1;         // initialize on first use
            seed = Thread.currentThread().threadId();
        }
    }

    /** The corresponding thread local class */
    static final class Participant extends ThreadLocal<Node> {
        public Node initialValue() { return new Node(); }
    }

    /**
     * The participant thread-locals. Because it is impossible for
     * callers to exchange this reference, we also use it for dealing
     * with null user arguments, which are translated in and out of
     * this value surrounding use.
     */
    private static final Participant participant = new Participant();

    /**
     * Elimination array; element accesses use emulation of volatile
     * gets and CAS.
     */
    private final Slot[] arena;

    /**
     * Number of cores, for sizing and spin control. Computed only
     * upon construction.
     */
    private final int ncpu;

    /**
     * The index of the largest valid arena position.
     */
    private volatile int bound;
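
    /*
     * A minimal sketch of the null-translation convention described
     * in the overview, mirroring the code in xchg below: nulls are
     * mapped to the (un-exchangeable) participant reference on the
     * way in, and back to null on the way out.
     *
     *   Object item = (x == null) ? participant : x; // in
     *   ...
     *   return (v == participant) ? null : (V)v;     // out
     */
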
    /**
     * Exchange function. See above for explanation.
     *
     * @param x the item to exchange
     * @param deadline if zero, untimed, else timeout deadline
     * @return the other thread's item
     * @throws InterruptedException if interrupted while waiting
     * @throws TimeoutException if deadline nonzero and timed out
     */
    private final V xchg(V x, long deadline)
        throws InterruptedException, TimeoutException {
        Slot[] a = arena;
        int alen = a.length;
        Participant ps = participant;
        Object item = (x == null) ? ps : x;  // translate nulls
        Node p = ps.get();
        int i = p.index;                     // if < 0, move
        int misses = 0;                      // ++ on collide, -- on spinout
        Object offered = null;               // for cleanup
        Object v = null;
        outer: for (;;) {
            int b, m; Slot s; Node q;
            if ((m = (b = bound) & MMASK) == 0)  // volatile read
                i = 0;
            if (i < 0 || i > m || i >= alen || (s = a[i]) == null) {
                long r = p.seed;                 // randomly move
                r ^= r << 13; r ^= r >>> 7; r ^= r << 17; // xorShift
                i = p.index = (int)((p.seed = r) % (m + 1));
            }
            else if ((q = s.entry) != null) {    // try release
                if (ENTRY.compareAndSet(s, q, null)) {
                    Thread w;
                    v = q.item;
                    q.match = item;
                    if (i == 0 && (w = q.parked) != null)
                        LockSupport.unpark(w);
                    break;
                }
                else {                           // collision
                    int nb;
                    i = -1;                      // move index
                    if (b != bound)              // stale
                        misses = 0;
                    else if (misses <= 2)        // continue sampling
                        ++misses;
                    else if ((nb = (b + 1) & MMASK) < alen) {
                        misses = 0;              // try to grow
                        if (BOUND.compareAndSet(this, b, b + 1 + SEQ) &&
                            a[i = p.index = nb] == null)
                            AA.compareAndSet(a, nb, null, new Slot());
                    }
                }
            }
            else {                               // try offer
                if (offered == null)
                    offered = p.item = item;
                if (ENTRY.compareAndSet(s, null, p)) {
                    boolean tryCancel;           // true if interrupted
                    Thread t = Thread.currentThread();
                    if (!(tryCancel = t.isInterrupted()) && ncpu > 1 &&
                        (i != 0 ||               // check for busy VTs
                         !ForkJoinWorkerThread.hasKnownQueuedWork())) {
                        for (int j = SPINS; j > 0; --j) { // spin wait
                            if ((v = p.match) != null) {
                                MATCH.set(p, null);
                                break outer;
                            }
                            Thread.onSpinWait();
                        }
                    }
                    for (long ns = 1L;;) {       // block or cancel offer
                        if ((v = p.match) != null) {
                            MATCH.set(p, null);
                            break outer;
                        }
                        if (i == 0 && !tryCancel &&
                            (deadline == 0L ||
                             (ns = deadline - System.nanoTime()) > 0L)) {
                            p.parked = t;        // enable unpark and recheck
                            if (p.match == null) {
                                if (deadline == 0L)
                                    LockSupport.park(this);
                                else
                                    LockSupport.parkNanos(this, ns);
                                tryCancel = t.isInterrupted();
                            }
                            p.parked = null;
                        }
                        else if (ENTRY.compareAndSet(s, p, null)) { // cancel
                            offered = p.item = null;
                            if (Thread.interrupted())
                                throw new InterruptedException();
                            if (deadline != 0L && ns <= 0L)
                                throw new TimeoutException();
                            i = -1;              // move and restart
                            if (bound != b)
                                misses = 0;      // stale
                            else if (misses >= 0)
                                --misses;        // continue sampling
                            else if ((b & MMASK) != 0) {
                                misses = 0;      // try to shrink
                                BOUND.compareAndSet(this, b, b - 1 + SEQ);
                            }
                            continue outer;
                        }
                    }
                }
            }
        }
        if (offered != null)                     // cleanup
            p.item = null;
        @SuppressWarnings("unchecked") V ret = (v == participant) ? null : (V)v;
        return ret;
    }

    /**
     * Creates a new Exchanger.
     */
    public Exchanger() {
        int h = (ncpu = Runtime.getRuntime().availableProcessors()) >>> 1;
        int size = (h == 0) ? 1 : (h > MMASK) ? MMASK + 1 : h;
        (arena = new Slot[size])[0] = new Slot();
    }
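
    /*
     * Sizing note (a worked example, not additional configuration):
     * the constructor above allocates an arena of
     * min(max(ncpu >>> 1, 1), MMASK + 1) slots, so for example
     * ncpu = 8 yields a 4-element array and ncpu = 1 yields a single
     * element; only arena[0] receives a Slot eagerly, and the rest
     * are filled on demand as the bound grows.
     */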

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * the current thread is {@linkplain Thread#interrupt interrupted}),
     * and then transfers the given object to it, receiving its object
     * in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of two things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @param x the object to exchange
     * @return the object provided by the other thread
     * @throws InterruptedException if the current thread was
     *         interrupted while waiting
     */
    public V exchange(V x) throws InterruptedException {
        try {
            return xchg(x, 0L);
        } catch (TimeoutException cannotHappen) {
            return null; // not reached
        }
    }
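
    /*
     * A hedged usage sketch (illustrative only, not part of this
     * API): two threads rendezvous once, each receiving the other's
     * string; the enclosing code is assumed to handle
     * InterruptedException.
     *
     *   Exchanger<String> ex = new Exchanger<>();
     *   new Thread(() -> {
     *     try {
     *       String got = ex.exchange("from-worker"); // "from-main"
     *     } catch (InterruptedException ie) {
     *       Thread.currentThread().interrupt();
     *     }
     *   }).start();
     *   String got = ex.exchange("from-main");       // "from-worker"
     */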

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * the current thread is {@linkplain Thread#interrupt interrupted} or
     * the specified waiting time elapses), and then transfers the given
     * object to it, receiving its object in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of three things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link
     * TimeoutException} is thrown. If the time is less than or equal
     * to zero, the method will not wait at all.
     *
     * @param x the object to exchange
     * @param timeout the maximum time to wait
     * @param unit the time unit of the {@code timeout} argument
     * @return the object provided by the other thread
     * @throws InterruptedException if the current thread was
     *         interrupted while waiting
     * @throws TimeoutException if the specified waiting time elapses
     *         before another thread enters the exchange
     */
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        long d = unit.toNanos(timeout) + System.nanoTime();
        return xchg(x, (d == 0L) ? 1L : d); // avoid zero deadline
    }

    // VarHandle mechanics
    private static final VarHandle BOUND;
    private static final VarHandle MATCH;
    private static final VarHandle ENTRY;
    private static final VarHandle AA;
    static {
        MethodHandles.Lookup l = MethodHandles.lookup();
        BOUND = MhUtil.findVarHandle(l, "bound", int.class);
        MATCH = MhUtil.findVarHandle(l, Node.class, "match", Object.class);
        ENTRY = MhUtil.findVarHandle(l, Slot.class, "entry", Node.class);
        AA = MethodHandles.arrayElementVarHandle(Slot[].class);
    }
}