8344831: [REDO] CDS: Parallel relocation

Reviewed-by: dholmes, stuefe
This commit is contained in:
Aleksey Shipilev 2024-12-05 12:37:54 +00:00
parent 92e9ac6dc7
commit 84240cc8e0
9 changed files with 352 additions and 15 deletions

View File

@ -856,9 +856,9 @@ void os::pd_start_thread(Thread* thread) {
void os::free_thread(OSThread* osthread) {
assert(osthread != nullptr, "osthread not set");
// We are told to free resources of the argument thread,
// but we can only really operate on the current thread.
assert(Thread::current()->osthread() == osthread,
// We are told to free resources of the argument thread, but we can only really operate
// on the current thread. The current thread may be already detached at this point.
assert(Thread::current_or_null() == nullptr || Thread::current()->osthread() == osthread,
"os::free_thread but not current thread");
// Restore caller's signal mask

View File

@ -763,9 +763,9 @@ void os::pd_start_thread(Thread* thread) {
void os::free_thread(OSThread* osthread) {
assert(osthread != nullptr, "osthread not set");
// We are told to free resources of the argument thread,
// but we can only really operate on the current thread.
assert(Thread::current()->osthread() == osthread,
// We are told to free resources of the argument thread, but we can only really operate
// on the current thread. The current thread may be already detached at this point.
assert(Thread::current_or_null() == nullptr || Thread::current()->osthread() == osthread,
"os::free_thread but not current thread");
// Restore caller's signal mask

View File

@ -1190,9 +1190,9 @@ void os::pd_start_thread(Thread* thread) {
void os::free_thread(OSThread* osthread) {
assert(osthread != nullptr, "osthread not set");
// We are told to free resources of the argument thread,
// but we can only really operate on the current thread.
assert(Thread::current()->osthread() == osthread,
// We are told to free resources of the argument thread, but we can only really operate
// on the current thread. The current thread may be already detached at this point.
assert(Thread::current_or_null() == nullptr || Thread::current()->osthread() == osthread,
"os::free_thread but not current thread");
#ifdef ASSERT

View File

@ -788,9 +788,9 @@ bool os::create_thread(Thread* thread, ThreadType thr_type,
void os::free_thread(OSThread* osthread) {
assert(osthread != nullptr, "osthread not set");
// We are told to free resources of the argument thread,
// but we can only really operate on the current thread.
assert(Thread::current()->osthread() == osthread,
// We are told to free resources of the argument thread, but we can only really operate
// on the current thread. The current thread may be already detached at this point.
assert(Thread::current_or_null() == nullptr || Thread::current()->osthread() == osthread,
"os::free_thread but not current thread");
CloseHandle(osthread->thread_handle());

View File

@ -45,6 +45,7 @@
#include "utilities/debug.hpp"
#include "utilities/formatBuffer.hpp"
#include "utilities/globalDefinitions.hpp"
#include "utilities/spinYield.hpp"
CHeapBitMap* ArchivePtrMarker::_ptrmap = nullptr;
CHeapBitMap* ArchivePtrMarker::_rw_ptrmap = nullptr;
@ -399,3 +400,162 @@ size_t HeapRootSegments::segment_offset(size_t seg_idx) {
return _base_offset + seg_idx * _max_size_in_bytes;
}
// Construct an idle pool: worker count is computed up front, but no threads
// are started and no task is installed until run_task() is called.
ArchiveWorkers::ArchiveWorkers() :
_end_semaphore(0),
_num_workers(max_workers()),
_started_workers(0),
_finish_tokens(0),
_state(UNUSED),
_task(nullptr) {}
ArchiveWorkers::~ArchiveWorkers() {
// The pool may be destroyed only before (UNUSED) or after (SHUTDOWN) a task
// runs; destroying it mid-task would pull state out from under the workers.
assert(Atomic::load(&_state) != WORKING, "Should not be working");
}
// Decide how many workers this pool may start.
// The pool serves short-lived bursty tasks, so we stay very conservative:
// spawning and waking threads costs more than it saves for small work items,
// and we do not want to overwhelm large machines either. Hence the
// logarithmic scaling with the processor count. A result of 0 means
// "no workers": the caller then runs the task single-threaded.
int ArchiveWorkers::max_workers() {
const int log_cpus = log2i_graceful(os::active_processor_count());
return MAX2(0, log_cpus);
}
// True when the pool runs tasks with dedicated worker threads;
// false when max_workers() decided on single-threaded execution.
bool ArchiveWorkers::is_parallel() {
return _num_workers > 0;
}
// Start at most one new worker thread, unless _num_workers have already been
// started. Multiple threads race here (see the avalanche startup in
// ArchiveWorkerThread::run), so the started-count is claimed with CAS:
// only the thread that wins the CAS creates the new worker.
void ArchiveWorkers::start_worker_if_needed() {
while (true) {
int cur = Atomic::load(&_started_workers);
if (cur >= _num_workers) {
// Pool is fully staffed; nothing to do.
return;
}
if (Atomic::cmpxchg(&_started_workers, cur, cur + 1, memory_order_relaxed) == cur) {
// We won the slot: create (and implicitly start) one worker thread.
new ArchiveWorkerThread(this);
return;
}
// Lost the race; reload the count and retry.
}
}
// Run a single task to completion, then shut the pool down. This is a
// one-shot API: the UNUSED -> WORKING -> SHUTDOWN state progression means a
// pool instance can never run a second task (asserted below).
void ArchiveWorkers::run_task(ArchiveWorkerTask* task) {
assert(Atomic::load(&_state) == UNUSED, "Should be unused yet");
assert(Atomic::load(&_task) == nullptr, "Should not have running tasks");
Atomic::store(&_state, WORKING);
// Dispatch on parallelism decided at construction time.
if (is_parallel()) {
run_task_multi(task);
} else {
run_task_single(task);
}
// Both paths return only after the task is fully complete.
assert(Atomic::load(&_state) == WORKING, "Should be working");
Atomic::store(&_state, SHUTDOWN);
}
// Degenerate path for _num_workers == 0: the caller runs the whole task.
void ArchiveWorkers::run_task_single(ArchiveWorkerTask* task) {
// Single thread needs no chunking.
task->configure_max_chunks(1);
// Execute the task ourselves, as there are no workers.
task->work(0, 1);
}
// Parallel path: publish the task, start the worker avalanche, help execute
// the task locally, then wait for all workers to fully terminate.
void ArchiveWorkers::run_task_multi(ArchiveWorkerTask* task) {
// Multiple threads can work with multiple chunks.
task->configure_max_chunks(_num_workers * CHUNKS_PER_WORKER);
// Set up the run and publish the task. Issue one additional finish token
// to cover the semaphore shutdown path, see below. The release_store pairs
// with the load_acquire in run_as_worker(), so workers observe a fully
// configured task.
Atomic::store(&_finish_tokens, _num_workers + 1);
Atomic::release_store(&_task, task);
// Kick off pool startup by starting a single worker, and proceed
// immediately to executing the task locally.
start_worker_if_needed();
// Execute the task ourselves, while workers are catching up.
// This allows us to hide parts of task handoff latency.
task->run();
// Done executing task locally, wait for any remaining workers to complete.
// Once all workers report, we can proceed to termination. To do this safely,
// we need to make sure every worker has left. A spin-wait alone would suffice,
// but we do not want to burn cycles on it. A semaphore alone would not be safe,
// since workers can still be inside it as we proceed from wait here. So we block
// on semaphore first, and then spin-wait for all workers to terminate.
_end_semaphore.wait();
SpinYield spin;
while (Atomic::load(&_finish_tokens) != 0) {
spin.wait();
}
// Full fence so that everything the workers wrote is visible to the caller.
OrderAccess::fence();
assert(Atomic::load(&_finish_tokens) == 0, "All tokens are consumed");
}
// Body executed by each worker thread: pick up the published task, help run
// it, then participate in the token-based shutdown handshake with
// run_task_multi().
void ArchiveWorkers::run_as_worker() {
assert(is_parallel(), "Should be in parallel mode");
// Acquire pairs with the release_store of _task in run_task_multi().
ArchiveWorkerTask* task = Atomic::load_acquire(&_task);
task->run();
// All work done in threads should be visible to caller.
OrderAccess::fence();
// Signal the pool the work is complete, and we are exiting.
// Worker cannot do anything else with the pool after this.
// _finish_tokens was seeded with _num_workers + 1; every worker consumes
// one token, and the last worker to leave also consumes the extra token
// after signaling the semaphore, unblocking the pool's spin-wait.
if (Atomic::sub(&_finish_tokens, 1, memory_order_relaxed) == 1) {
// Last worker leaving. Notify the pool it can unblock to spin-wait.
// Then consume the last token and leave.
_end_semaphore.signal();
int last = Atomic::sub(&_finish_tokens, 1, memory_order_relaxed);
assert(last == 0, "Should be");
}
}
// Work-stealing style chunk loop: every participating thread (workers and
// the pool's caller) claims chunks one at a time with CAS until all
// _max_chunks chunks have been handed out.
void ArchiveWorkerTask::run() {
while (true) {
int chunk = Atomic::load(&_chunk);
if (chunk >= _max_chunks) {
// Every chunk is claimed (possibly still being worked on by others).
return;
}
if (Atomic::cmpxchg(&_chunk, chunk, chunk + 1, memory_order_relaxed) == chunk) {
// Won the claim: this chunk is exclusively ours.
assert(0 <= chunk && chunk < _max_chunks, "Sanity");
work(chunk, _max_chunks);
}
// On CAS failure, retry with the refreshed chunk counter.
}
}
// Set the chunk count for this task. Only the first configuration takes
// effect (tasks start with _max_chunks == 0); subsequent calls are no-ops,
// keeping the chunking stable once workers may already be looking at it.
void ArchiveWorkerTask::configure_max_chunks(int max_chunks) {
if (_max_chunks != 0) {
return;
}
_max_chunks = max_chunks;
}
// Create and immediately start one worker thread attached to the given pool.
// Thread creation failure is fatal: this runs during VM initialization.
ArchiveWorkerThread::ArchiveWorkerThread(ArchiveWorkers* pool) : NamedThread(), _pool(pool) {
set_name("ArchiveWorkerThread");
if (os::create_thread(this, os::os_thread)) {
os::start_thread(this);
} else {
vm_exit_during_initialization("Unable to create archive worker",
os::native_thread_creation_failed_msg());
}
}
// Worker thread entry point.
void ArchiveWorkerThread::run() {
// Avalanche startup: each worker starts two others. This gets the pool to
// full strength exponentially fast without serializing startup on one thread.
_pool->start_worker_if_needed();
_pool->start_worker_if_needed();
// Set ourselves up.
os::set_priority(this, NearMaxPriority);
// Work.
_pool->run_as_worker();
}
// Workers are self-reclaiming: after the base class finishes its post-run
// bookkeeping, the thread object deletes itself. Nothing may touch `this`
// after the delete.
void ArchiveWorkerThread::post_run() {
this->NamedThread::post_run();
delete this;
}

View File

@ -33,6 +33,8 @@
#include "utilities/bitMap.hpp"
#include "utilities/exceptions.hpp"
#include "utilities/macros.hpp"
#include "runtime/nonJavaThread.hpp"
#include "runtime/semaphore.hpp"
class BootstrapInfo;
class ReservedSpace;
@ -344,4 +346,74 @@ public:
HeapRootSegments& operator=(const HeapRootSegments&) = default;
};
class ArchiveWorkers;
// A task to be worked on by worker threads. Subclasses implement work(),
// which must be safe to call from multiple threads concurrently: each call
// receives a distinct chunk index in [0, max_chunks).
class ArchiveWorkerTask : public CHeapObj<mtInternal> {
friend class ArchiveWorkers;
private:
const char* _name;
// Total number of chunks; set once by configure_max_chunks(), 0 until then.
int _max_chunks;
// Next chunk to claim; advanced with CAS by all participating threads.
volatile int _chunk;
// Claims and processes chunks until all are handed out (called by the pool
// and by every worker).
void run();
// First call wins; later calls do not change the chunk count.
void configure_max_chunks(int max_chunks);
public:
ArchiveWorkerTask(const char* name) :
_name(name), _max_chunks(0), _chunk(0) {}
const char* name() const { return _name; }
// Process one chunk out of max_chunks. Implementations partition their data
// by (chunk, max_chunks) and must not overlap between chunks.
virtual void work(int chunk, int max_chunks) = 0;
};
// A self-starting, self-reclaiming worker thread bound to one ArchiveWorkers
// pool. Constructed with operator new; deletes itself in post_run().
class ArchiveWorkerThread : public NamedThread {
friend class ArchiveWorkers;
private:
ArchiveWorkers* const _pool;
// Runs base-class teardown, then `delete this`.
void post_run() override;
public:
ArchiveWorkerThread(ArchiveWorkers* pool);
const char* type_name() const override { return "Archive Worker Thread"; }
// Starts two more workers (avalanche), then helps run the pool's task.
void run() override;
};
// Special archive workers. The goal for this implementation is to startup fast,
// distribute spiky workloads efficiently, and shutdown immediately after use.
// This makes the implementation quite different from the normal GC worker pool.
// One-shot: a pool instance can run exactly one task (UNUSED -> WORKING ->
// SHUTDOWN), asserted in run_task().
class ArchiveWorkers : public StackObj {
friend class ArchiveWorkerThread;
private:
// Target number of chunks per worker. This should be large enough to even
// out work imbalance, and small enough to keep bookkeeping overheads low.
static constexpr int CHUNKS_PER_WORKER = 4;
// Conservative worker count (log2 of active processors); 0 means the
// caller runs tasks single-threaded.
static int max_workers();
// Signaled once by the last worker to leave; first half of the two-phase
// shutdown (semaphore wait, then spin on _finish_tokens).
Semaphore _end_semaphore;
int _num_workers;
// Number of workers created so far; claimed with CAS during avalanche startup.
int _started_workers;
// Seeded to _num_workers + 1 per run; each worker consumes one token, the
// last worker consumes the extra one after signaling _end_semaphore.
int _finish_tokens;
typedef enum { UNUSED, WORKING, SHUTDOWN } State;
volatile State _state;
// The currently published task; set with release-store, read with acquire-load.
ArchiveWorkerTask* _task;
void run_as_worker();
void start_worker_if_needed();
void run_task_single(ArchiveWorkerTask* task);
void run_task_multi(ArchiveWorkerTask* task);
bool is_parallel();
public:
ArchiveWorkers();
~ArchiveWorkers();
// Runs the task to completion (possibly in parallel) and shuts the pool down.
void run_task(ArchiveWorkerTask* task);
};
#endif // SHARE_CDS_ARCHIVEUTILS_HPP

View File

@ -117,7 +117,10 @@
product(bool, AOTClassLinking, false, \
"Load/link all archived classes for the boot/platform/app " \
"loaders before application main") \
\
product(bool, AOTCacheParallelRelocation, true, DIAGNOSTIC, \
"Use parallel relocation code to speed up startup.") \
\
// end of CDS_FLAGS
DECLARE_FLAGS(CDS_FLAGS)

View File

@ -1975,6 +1975,32 @@ char* FileMapInfo::map_bitmap_region() {
return bitmap_base;
}
// Parallel task that patches archived pointers in the rw and ro core regions.
// Each chunk covers a disjoint slice of both pointer bitmaps, so concurrent
// work() calls never touch the same bits.
class SharedDataRelocationTask : public ArchiveWorkerTask {
private:
BitMapView* const _rw_bm;
BitMapView* const _ro_bm;
SharedDataRelocator* const _rw_reloc;
SharedDataRelocator* const _ro_reloc;
public:
SharedDataRelocationTask(BitMapView* rw_bm, BitMapView* ro_bm, SharedDataRelocator* rw_reloc, SharedDataRelocator* ro_reloc) :
ArchiveWorkerTask("Shared Data Relocation"),
_rw_bm(rw_bm), _ro_bm(ro_bm), _rw_reloc(rw_reloc), _ro_reloc(ro_reloc) {}
void work(int chunk, int max_chunks) override {
work_on(chunk, max_chunks, _rw_bm, _rw_reloc);
work_on(chunk, max_chunks, _ro_bm, _ro_reloc);
}
// Iterate the relocator over this chunk's proportional slice of the bitmap.
void work_on(int chunk, int max_chunks, BitMapView* bm, SharedDataRelocator* reloc) {
BitMap::idx_t size = bm->size();
// Proportional split: slice boundaries are size*k/max_chunks, clamped to size.
BitMap::idx_t start = MIN2(size, size * chunk / max_chunks);
BitMap::idx_t end = MIN2(size, size * (chunk + 1) / max_chunks);
// NOTE(review): this assert presumes size >= max_chunks; a bitmap smaller
// than the chunk count would produce empty slices — confirm callers
// guarantee large enough bitmaps.
assert(end > start, "Sanity: no empty slices");
bm->iterate(reloc, start, end);
}
};
// This is called when we cannot map the archive at the requested base address (usually 0x800000000).
// We relocate all pointers in the 2 core regions (ro, rw).
bool FileMapInfo::relocate_pointers_in_core_regions(intx addr_delta) {
@ -2013,8 +2039,15 @@ bool FileMapInfo::relocate_pointers_in_core_regions(intx addr_delta) {
valid_new_base, valid_new_end, addr_delta);
SharedDataRelocator ro_patcher((address*)ro_patch_base + header()->ro_ptrmap_start_pos(), (address*)ro_patch_end, valid_old_base, valid_old_end,
valid_new_base, valid_new_end, addr_delta);
rw_ptrmap.iterate(&rw_patcher);
ro_ptrmap.iterate(&ro_patcher);
if (AOTCacheParallelRelocation) {
ArchiveWorkers workers;
SharedDataRelocationTask task(&rw_ptrmap, &ro_ptrmap, &rw_patcher, &ro_patcher);
workers.run_task(&task);
} else {
rw_ptrmap.iterate(&rw_patcher);
ro_ptrmap.iterate(&ro_patcher);
}
// The MetaspaceShared::bm region will be unmapped in MetaspaceShared::initialize_shared_spaces().

View File

@ -0,0 +1,69 @@
/*
* Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*
*/
#include "precompiled.hpp"
#include "cds/archiveUtils.hpp"
#include "unittest.hpp"
// Test task: sums the chunk indices it is given and records the chunk count.
// After a full run, sum() equals 0 + 1 + ... + (max-1) = max*(max-1)/2,
// which lets tests verify every chunk was processed exactly once.
class TestArchiveWorkerTask : public ArchiveWorkerTask {
private:
volatile int _sum;
int _max;
public:
TestArchiveWorkerTask() : ArchiveWorkerTask("Test"), _sum(0), _max(0) {}
void work(int chunk, int max_chunks) override {
// Atomic add: work() runs concurrently on multiple workers.
Atomic::add(&_sum, chunk);
Atomic::store(&_max, max_chunks);
}
int sum() { return Atomic::load(&_sum); }
int max() { return Atomic::load(&_max); }
};
// Test a repeated cycle of workers init/shutdown without task works.
// Exercises pool construction/destruction alone (no task ever runs).
TEST_VM(ArchiveWorkersTest, continuous_restart) {
for (int c = 0; c < 1000; c++) {
ArchiveWorkers workers;
}
}
// Test a repeated cycle of sample task works.
// Each iteration uses a fresh pool (pools are one-shot) and checks that the
// chunk-index sum matches the closed form max*(max-1)/2, i.e. every chunk
// was processed exactly once.
TEST_VM(ArchiveWorkersTest, single_task) {
for (int c = 0; c < 1000; c++) {
TestArchiveWorkerTask task;
ArchiveWorkers workers;
workers.run_task(&task);
ASSERT_EQ(task.max() * (task.max() - 1) / 2, task.sum());
}
}
// Test that reusing the workers fails.
// Pools are one-shot: the second run_task() must hit the "Should be unused
// yet" assert in ArchiveWorkers::run_task (debug builds only).
#ifdef ASSERT
TEST_VM_ASSERT_MSG(ArchiveWorkersTest, multiple_tasks, ".* Should be unused yet") {
TestArchiveWorkerTask task;
ArchiveWorkers workers;
workers.run_task(&task);
workers.run_task(&task);
}
#endif // ASSERT