diff --git a/src/hotspot/os/aix/os_aix.cpp b/src/hotspot/os/aix/os_aix.cpp
index 44b9571e102..26627c2f8fb 100644
--- a/src/hotspot/os/aix/os_aix.cpp
+++ b/src/hotspot/os/aix/os_aix.cpp
@@ -856,9 +856,9 @@ void os::pd_start_thread(Thread* thread) {
 void os::free_thread(OSThread* osthread) {
   assert(osthread != nullptr, "osthread not set");
 
-  // We are told to free resources of the argument thread,
-  // but we can only really operate on the current thread.
-  assert(Thread::current()->osthread() == osthread,
+  // We are told to free resources of the argument thread, but we can only really operate
+  // on the current thread. The current thread may be already detached at this point.
+  assert(Thread::current_or_null() == nullptr || Thread::current()->osthread() == osthread,
          "os::free_thread but not current thread");
 
   // Restore caller's signal mask
diff --git a/src/hotspot/os/bsd/os_bsd.cpp b/src/hotspot/os/bsd/os_bsd.cpp
index 8185797563c..5db846275d4 100644
--- a/src/hotspot/os/bsd/os_bsd.cpp
+++ b/src/hotspot/os/bsd/os_bsd.cpp
@@ -763,9 +763,9 @@ void os::pd_start_thread(Thread* thread) {
 void os::free_thread(OSThread* osthread) {
   assert(osthread != nullptr, "osthread not set");
 
-  // We are told to free resources of the argument thread,
-  // but we can only really operate on the current thread.
-  assert(Thread::current()->osthread() == osthread,
+  // We are told to free resources of the argument thread, but we can only really operate
+  // on the current thread. The current thread may be already detached at this point.
+  assert(Thread::current_or_null() == nullptr || Thread::current()->osthread() == osthread,
          "os::free_thread but not current thread");
 
   // Restore caller's signal mask
diff --git a/src/hotspot/os/linux/os_linux.cpp b/src/hotspot/os/linux/os_linux.cpp
index d85120f3f1d..d159118016a 100644
--- a/src/hotspot/os/linux/os_linux.cpp
+++ b/src/hotspot/os/linux/os_linux.cpp
@@ -1190,9 +1190,9 @@ void os::pd_start_thread(Thread* thread) {
 void os::free_thread(OSThread* osthread) {
   assert(osthread != nullptr, "osthread not set");
 
-  // We are told to free resources of the argument thread,
-  // but we can only really operate on the current thread.
-  assert(Thread::current()->osthread() == osthread,
+  // We are told to free resources of the argument thread, but we can only really operate
+  // on the current thread. The current thread may be already detached at this point.
+  assert(Thread::current_or_null() == nullptr || Thread::current()->osthread() == osthread,
          "os::free_thread but not current thread");
 
 #ifdef ASSERT
diff --git a/src/hotspot/os/windows/os_windows.cpp b/src/hotspot/os/windows/os_windows.cpp
index 849fc0c29f0..afd8fe01752 100644
--- a/src/hotspot/os/windows/os_windows.cpp
+++ b/src/hotspot/os/windows/os_windows.cpp
@@ -788,9 +788,9 @@ bool os::create_thread(Thread* thread, ThreadType thr_type,
 void os::free_thread(OSThread* osthread) {
   assert(osthread != nullptr, "osthread not set");
 
-  // We are told to free resources of the argument thread,
-  // but we can only really operate on the current thread.
-  assert(Thread::current()->osthread() == osthread,
+  // We are told to free resources of the argument thread, but we can only really operate
+  // on the current thread. The current thread may be already detached at this point.
+  assert(Thread::current_or_null() == nullptr || Thread::current()->osthread() == osthread,
          "os::free_thread but not current thread");
 
   CloseHandle(osthread->thread_handle());
diff --git a/src/hotspot/share/cds/archiveUtils.cpp b/src/hotspot/share/cds/archiveUtils.cpp
index 4d717879e0f..3530fcff2b3 100644
--- a/src/hotspot/share/cds/archiveUtils.cpp
+++ b/src/hotspot/share/cds/archiveUtils.cpp
@@ -45,6 +45,7 @@
 #include "utilities/debug.hpp"
 #include "utilities/formatBuffer.hpp"
 #include "utilities/globalDefinitions.hpp"
+#include "utilities/spinYield.hpp"
 
 CHeapBitMap* ArchivePtrMarker::_ptrmap = nullptr;
 CHeapBitMap* ArchivePtrMarker::_rw_ptrmap = nullptr;
@@ -399,3 +400,162 @@ size_t HeapRootSegments::segment_offset(size_t seg_idx) {
   return _base_offset + seg_idx * _max_size_in_bytes;
 }
 
+ArchiveWorkers::ArchiveWorkers() :
+  _end_semaphore(0),
+  _num_workers(max_workers()),
+  _started_workers(0),
+  _finish_tokens(0),
+  _state(UNUSED),
+  _task(nullptr) {}
+
+ArchiveWorkers::~ArchiveWorkers() {
+  assert(Atomic::load(&_state) != WORKING, "Should not be working");
+}
+
+int ArchiveWorkers::max_workers() {
+  // The pool is used for short-lived bursty tasks. We do not want to spend
+  // too much time creating and waking up threads unnecessarily. Plus, we do
+  // not want to overwhelm large machines. This is why we want to be very
+  // conservative about the number of workers actually needed.
+  return MAX2(0, log2i_graceful(os::active_processor_count()));
+}
+
+bool ArchiveWorkers::is_parallel() {
+  return _num_workers > 0;
+}
+
+void ArchiveWorkers::start_worker_if_needed() {
+  while (true) {
+    int cur = Atomic::load(&_started_workers);
+    if (cur >= _num_workers) {
+      return;
+    }
+    if (Atomic::cmpxchg(&_started_workers, cur, cur + 1, memory_order_relaxed) == cur) {
+      new ArchiveWorkerThread(this);
+      return;
+    }
+  }
+}
+
+void ArchiveWorkers::run_task(ArchiveWorkerTask* task) {
+  assert(Atomic::load(&_state) == UNUSED, "Should be unused yet");
+  assert(Atomic::load(&_task) == nullptr, "Should not have running tasks");
+  Atomic::store(&_state, WORKING);
+
+  if (is_parallel()) {
+    run_task_multi(task);
+  } else {
+    run_task_single(task);
+  }
+
+  assert(Atomic::load(&_state) == WORKING, "Should be working");
+  Atomic::store(&_state, SHUTDOWN);
+}
+
+void ArchiveWorkers::run_task_single(ArchiveWorkerTask* task) {
+  // Single thread needs no chunking.
+  task->configure_max_chunks(1);
+
+  // Execute the task ourselves, as there are no workers.
+  task->work(0, 1);
+}
+
+void ArchiveWorkers::run_task_multi(ArchiveWorkerTask* task) {
+  // Multiple threads can work with multiple chunks.
+  task->configure_max_chunks(_num_workers * CHUNKS_PER_WORKER);
+
+  // Set up the run and publish the task. Issue one additional finish token
+  // to cover the semaphore shutdown path, see below.
+  Atomic::store(&_finish_tokens, _num_workers + 1);
+  Atomic::release_store(&_task, task);
+
+  // Kick off pool startup by starting a single worker, and proceed
+  // immediately to executing the task locally.
+  start_worker_if_needed();
+
+  // Execute the task ourselves, while workers are catching up.
+  // This allows us to hide parts of task handoff latency.
+  task->run();
+
+  // Done executing task locally, wait for any remaining workers to complete.
+  // Once all workers report, we can proceed to termination. To do this safely,
+  // we need to make sure every worker has left. A spin-wait alone would suffice,
+  // but we do not want to burn cycles on it. A semaphore alone would not be safe,
+  // since workers can still be inside it as we proceed from wait here. So we block
+  // on semaphore first, and then spin-wait for all workers to terminate.
+  _end_semaphore.wait();
+  SpinYield spin;
+  while (Atomic::load(&_finish_tokens) != 0) {
+    spin.wait();
+  }
+
+  OrderAccess::fence();
+
+  assert(Atomic::load(&_finish_tokens) == 0, "All tokens are consumed");
+}
+
+void ArchiveWorkers::run_as_worker() {
+  assert(is_parallel(), "Should be in parallel mode");
+
+  ArchiveWorkerTask* task = Atomic::load_acquire(&_task);
+  task->run();
+
+  // All work done in threads should be visible to caller.
+  OrderAccess::fence();
+
+  // Signal the pool the work is complete, and we are exiting.
+  // Worker cannot do anything else with the pool after this.
+  if (Atomic::sub(&_finish_tokens, 1, memory_order_relaxed) == 1) {
+    // Last worker leaving. Notify the pool it can unblock to spin-wait.
+    // Then consume the last token and leave.
+    _end_semaphore.signal();
+    int last = Atomic::sub(&_finish_tokens, 1, memory_order_relaxed);
+    assert(last == 0, "Should be");
+  }
+}
+
+void ArchiveWorkerTask::run() {
+  while (true) {
+    int chunk = Atomic::load(&_chunk);
+    if (chunk >= _max_chunks) {
+      return;
+    }
+    if (Atomic::cmpxchg(&_chunk, chunk, chunk + 1, memory_order_relaxed) == chunk) {
+      assert(0 <= chunk && chunk < _max_chunks, "Sanity");
+      work(chunk, _max_chunks);
+    }
+  }
+}
+
+void ArchiveWorkerTask::configure_max_chunks(int max_chunks) {
+  if (_max_chunks == 0) {
+    _max_chunks = max_chunks;
+  }
+}
+
+ArchiveWorkerThread::ArchiveWorkerThread(ArchiveWorkers* pool) : NamedThread(), _pool(pool) {
+  set_name("ArchiveWorkerThread");
+  if (os::create_thread(this, os::os_thread)) {
+    os::start_thread(this);
+  } else {
+    vm_exit_during_initialization("Unable to create archive worker",
+                                  os::native_thread_creation_failed_msg());
+  }
+}
+
+void ArchiveWorkerThread::run() {
+  // Avalanche startup: each worker starts two others.
+  _pool->start_worker_if_needed();
+  _pool->start_worker_if_needed();
+
+  // Set ourselves up.
+  os::set_priority(this, NearMaxPriority);
+
+  // Work.
+  _pool->run_as_worker();
+}
+
+void ArchiveWorkerThread::post_run() {
+  this->NamedThread::post_run();
+  delete this;
+}
diff --git a/src/hotspot/share/cds/archiveUtils.hpp b/src/hotspot/share/cds/archiveUtils.hpp
index 723332c40e5..a10117e9f9a 100644
--- a/src/hotspot/share/cds/archiveUtils.hpp
+++ b/src/hotspot/share/cds/archiveUtils.hpp
@@ -33,6 +33,8 @@
 #include "utilities/bitMap.hpp"
 #include "utilities/exceptions.hpp"
 #include "utilities/macros.hpp"
+#include "runtime/nonJavaThread.hpp"
+#include "runtime/semaphore.hpp"
 
 class BootstrapInfo;
 class ReservedSpace;
@@ -344,4 +346,74 @@ public:
   HeapRootSegments& operator=(const HeapRootSegments&) = default;
 };
 
+class ArchiveWorkers;
+
+// A task to be worked on by worker threads
+class ArchiveWorkerTask : public CHeapObj<mtInternal> {
+  friend class ArchiveWorkers;
+private:
+  const char* _name;
+  int _max_chunks;
+  volatile int _chunk;
+
+  void run();
+
+  void configure_max_chunks(int max_chunks);
+
+public:
+  ArchiveWorkerTask(const char* name) :
+    _name(name), _max_chunks(0), _chunk(0) {}
+  const char* name() const { return _name; }
+  virtual void work(int chunk, int max_chunks) = 0;
+};
+
+class ArchiveWorkerThread : public NamedThread {
+  friend class ArchiveWorkers;
+private:
+  ArchiveWorkers* const _pool;
+
+  void post_run() override;
+
+public:
+  ArchiveWorkerThread(ArchiveWorkers* pool);
+  const char* type_name() const override { return "Archive Worker Thread"; }
+  void run() override;
+};
+
+// Special archive workers. The goal for this implementation is to startup fast,
+// distribute spiky workloads efficiently, and shutdown immediately after use.
+// This makes the implementation quite different from the normal GC worker pool.
+class ArchiveWorkers : public StackObj {
+  friend class ArchiveWorkerThread;
+private:
+  // Target number of chunks per worker. This should be large enough to even
+  // out work imbalance, and small enough to keep bookkeeping overheads low.
+  static constexpr int CHUNKS_PER_WORKER = 4;
+  static int max_workers();
+
+  Semaphore _end_semaphore;
+
+  int _num_workers;
+  int _started_workers;
+  int _finish_tokens;
+
+  typedef enum { UNUSED, WORKING, SHUTDOWN } State;
+  volatile State _state;
+
+  ArchiveWorkerTask* _task;
+
+  void run_as_worker();
+  void start_worker_if_needed();
+
+  void run_task_single(ArchiveWorkerTask* task);
+  void run_task_multi(ArchiveWorkerTask* task);
+
+  bool is_parallel();
+
+public:
+  ArchiveWorkers();
+  ~ArchiveWorkers();
+  void run_task(ArchiveWorkerTask* task);
+};
+
 #endif // SHARE_CDS_ARCHIVEUTILS_HPP
diff --git a/src/hotspot/share/cds/cds_globals.hpp b/src/hotspot/share/cds/cds_globals.hpp
index 38f5d8f46a6..811740cfbcb 100644
--- a/src/hotspot/share/cds/cds_globals.hpp
+++ b/src/hotspot/share/cds/cds_globals.hpp
@@ -117,7 +117,10 @@
   product(bool, AOTClassLinking, false,                                   \
           "Load/link all archived classes for the boot/platform/app "     \
           "loaders before application main")                              \
-                                                                          \
+                                                                          \
+  product(bool, AOTCacheParallelRelocation, true, DIAGNOSTIC,             \
+          "Use parallel relocation code to speed up startup.")            \
+                                                                          \
   // end of CDS_FLAGS
 
 DECLARE_FLAGS(CDS_FLAGS)
diff --git a/src/hotspot/share/cds/filemap.cpp b/src/hotspot/share/cds/filemap.cpp
index 91a2a57dee5..b7b08009dcc 100644
--- a/src/hotspot/share/cds/filemap.cpp
+++ b/src/hotspot/share/cds/filemap.cpp
@@ -1975,6 +1975,32 @@ char* FileMapInfo::map_bitmap_region() {
   return bitmap_base;
 }
 
+class SharedDataRelocationTask : public ArchiveWorkerTask {
+private:
+  BitMapView* const _rw_bm;
+  BitMapView* const _ro_bm;
+  SharedDataRelocator* const _rw_reloc;
+  SharedDataRelocator* const _ro_reloc;
+
+public:
+  SharedDataRelocationTask(BitMapView* rw_bm, BitMapView* ro_bm, SharedDataRelocator* rw_reloc, SharedDataRelocator* ro_reloc) :
+    ArchiveWorkerTask("Shared Data Relocation"),
+    _rw_bm(rw_bm), _ro_bm(ro_bm), _rw_reloc(rw_reloc), _ro_reloc(ro_reloc) {}
+
+  void work(int chunk, int max_chunks) override {
+    work_on(chunk, max_chunks, _rw_bm, _rw_reloc);
+    work_on(chunk, max_chunks, _ro_bm, _ro_reloc);
+  }
+
+  void work_on(int chunk, int max_chunks, BitMapView* bm, SharedDataRelocator* reloc) {
+    BitMap::idx_t size = bm->size();
+    BitMap::idx_t start = MIN2(size, size * chunk / max_chunks);
+    BitMap::idx_t end = MIN2(size, size * (chunk + 1) / max_chunks);
+    assert(end > start, "Sanity: no empty slices");
+    bm->iterate(reloc, start, end);
+  }
+};
+
 // This is called when we cannot map the archive at the requested base address (usually 0x800000000).
 // We relocate all pointers in the 2 core regions (ro, rw).
 bool FileMapInfo::relocate_pointers_in_core_regions(intx addr_delta) {
@@ -2013,8 +2039,15 @@
                                  valid_new_base, valid_new_end, addr_delta);
   SharedDataRelocator ro_patcher((address*)ro_patch_base + header()->ro_ptrmap_start_pos(), (address*)ro_patch_end,
                                  valid_old_base, valid_old_end,
                                  valid_new_base, valid_new_end, addr_delta);
-  rw_ptrmap.iterate(&rw_patcher);
-  ro_ptrmap.iterate(&ro_patcher);
+
+  if (AOTCacheParallelRelocation) {
+    ArchiveWorkers workers;
+    SharedDataRelocationTask task(&rw_ptrmap, &ro_ptrmap, &rw_patcher, &ro_patcher);
+    workers.run_task(&task);
+  } else {
+    rw_ptrmap.iterate(&rw_patcher);
+    ro_ptrmap.iterate(&ro_patcher);
+  }
 
   // The MetaspaceShared::bm region will be unmapped in MetaspaceShared::initialize_shared_spaces().
diff --git a/test/hotspot/gtest/cds/test_archiveWorkers.cpp b/test/hotspot/gtest/cds/test_archiveWorkers.cpp
new file mode 100644
index 00000000000..eb5a4ad83e9
--- /dev/null
+++ b/test/hotspot/gtest/cds/test_archiveWorkers.cpp
@@ -0,0 +1,69 @@
+/*
+ * Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ *
+ */
+
+#include "precompiled.hpp"
+#include "cds/archiveUtils.hpp"
+#include "unittest.hpp"
+
+class TestArchiveWorkerTask : public ArchiveWorkerTask {
+private:
+  volatile int _sum;
+  int _max;
+
+public:
+  TestArchiveWorkerTask() : ArchiveWorkerTask("Test"), _sum(0), _max(0) {}
+  void work(int chunk, int max_chunks) override {
+    Atomic::add(&_sum, chunk);
+    Atomic::store(&_max, max_chunks);
+  }
+  int sum() { return Atomic::load(&_sum); }
+  int max() { return Atomic::load(&_max); }
+};
+
+// Test a repeated cycle of workers init/shutdown without task works.
+TEST_VM(ArchiveWorkersTest, continuous_restart) {
+  for (int c = 0; c < 1000; c++) {
+    ArchiveWorkers workers;
+  }
+}
+
+// Test a repeated cycle of sample task works.
+TEST_VM(ArchiveWorkersTest, single_task) {
+  for (int c = 0; c < 1000; c++) {
+    TestArchiveWorkerTask task;
+    ArchiveWorkers workers;
+    workers.run_task(&task);
+    ASSERT_EQ(task.max() * (task.max() - 1) / 2, task.sum());
+  }
+}
+
+// Test that reusing the workers fails.
+#ifdef ASSERT
+TEST_VM_ASSERT_MSG(ArchiveWorkersTest, multiple_tasks, ".* Should be unused yet") {
+  TestArchiveWorkerTask task;
+  ArchiveWorkers workers;
+  workers.run_task(&task);
+  workers.run_task(&task);
+}
+#endif // ASSERT