8373649: Convert simple AtomicAccess usage in ConcurrentHashTable to use Atomic<T>

Reviewed-by: tschatzl, iwalulya
Kim Barrett 2025-12-16 04:03:12 +00:00
parent 1748737b99
commit 3f33eaa42a
3 changed files with 58 additions and 55 deletions
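For orientation, here is a minimal sketch of the conversion pattern this change applies, assuming the Atomic<T> template from runtime/atomic.hpp provides the member functions that appear in the hunks below (load_relaxed, store_relaxed, load_acquire, release_store, release_store_fence, fetch_then_add). The type and field names in the sketch are illustrative, and the snippet only compiles inside HotSpot.

    #include "runtime/atomic.hpp"

    struct ConversionSketch {
      // Before: a raw volatile field, accessed with plain loads/stores or the
      // AtomicAccess free functions, so the intended ordering is implicit.
      volatile bool _flag_old;
      bool flag_old() { return _flag_old; }

      // After: the field itself is an Atomic<T>, and every access spells out
      // its ordering at the call site.
      Atomic<bool> _flag_new;
      bool flag_new() { return _flag_new.load_relaxed(); }
      void set_flag_new(bool v) { _flag_new.store_relaxed(v); }
    };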

src/hotspot/share/utilities/concurrentHashTable.hpp

@@ -26,6 +26,7 @@
#define SHARE_UTILITIES_CONCURRENTHASHTABLE_HPP
#include "memory/allocation.hpp"
#include "runtime/atomic.hpp"
#include "runtime/mutex.hpp"
#include "utilities/globalCounter.hpp"
#include "utilities/globalDefinitions.hpp"
@@ -246,7 +247,7 @@ class ConcurrentHashTable : public CHeapObj<MT> {
const size_t _log2_start_size; // Start size.
const size_t _grow_hint; // Number of linked items
volatile bool _size_limit_reached;
Atomic<bool> _size_limit_reached;
// We serialize resizers and other bulk operations which do not support
// concurrent resize with this lock.
@@ -255,7 +256,7 @@ class ConcurrentHashTable : public CHeapObj<MT> {
// taking the mutex after a safepoint this bool is the actual state. After
// acquiring the mutex you must check if this is already locked. If so you
// must drop the mutex until the real lock holder grabs the mutex.
volatile Thread* _resize_lock_owner;
Atomic<Thread*> _resize_lock_owner;
// Return true if lock mutex/state succeeded.
bool try_resize_lock(Thread* locker);
@@ -273,7 +274,7 @@ class ConcurrentHashTable : public CHeapObj<MT> {
// this field keeps track of whether a version of the hash-table has ever been seen.
// We use the working thread pointer as a tag for debugging. The _invisible_epoch
// can only be used by the owner of _resize_lock.
volatile Thread* _invisible_epoch;
Atomic<Thread*> _invisible_epoch;
// Scoped critical section, which also handles the invisible epochs.
// An invisible epoch/version does not need a write_synchronize().
@@ -435,11 +436,11 @@ class ConcurrentHashTable : public CHeapObj<MT> {
size_t get_size_log2(Thread* thread);
static size_t get_node_size() { return sizeof(Node); }
static size_t get_dynamic_node_size(size_t value_size);
bool is_max_size_reached() { return _size_limit_reached; }
bool is_max_size_reached() { return _size_limit_reached.load_relaxed(); }
// This means no paused bucket resize operation is going to resume
// on this table.
bool is_safepoint_safe() { return _resize_lock_owner == nullptr; }
bool is_safepoint_safe() { return _resize_lock_owner.load_relaxed() == nullptr; }
// Re-size operations.
bool shrink(Thread* thread, size_t size_limit_log2 = 0);
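For readers more used to std::atomic, a rough mapping of the accessors used in this patch; this is an assumption about the intended semantics, not a statement about HotSpot's implementation. The struct below is a standalone, illustrative analogue of the two relaxed predicates above.

    #include <atomic>

    // load_relaxed()         ~ load(std::memory_order_relaxed)
    // store_relaxed(v)       ~ store(v, std::memory_order_relaxed)
    // load_acquire()         ~ load(std::memory_order_acquire)
    // release_store(v)       ~ store(v, std::memory_order_release)
    // release_store_fence(v) ~ release store followed by a full fence
    // fetch_then_add(n)      ~ fetch_add(n), returning the previous value

    struct RelaxedPredicates {
      std::atomic<bool>  size_limit_reached{false};
      std::atomic<void*> resize_lock_owner{nullptr};

      bool is_max_size_reached() {
        return size_limit_reached.load(std::memory_order_relaxed);
      }
      bool is_safepoint_safe() {
        return resize_lock_owner.load(std::memory_order_relaxed) == nullptr;
      }
    };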

src/hotspot/share/utilities/concurrentHashTable.inline.hpp

@@ -29,6 +29,7 @@
#include "cppstdlib/type_traits.hpp"
#include "memory/allocation.inline.hpp"
#include "runtime/atomic.hpp"
#include "runtime/atomicAccess.hpp"
#include "runtime/orderAccess.hpp"
#include "runtime/prefetch.inline.hpp"
@@ -221,8 +222,8 @@ inline ConcurrentHashTable<CONFIG, MT>::
_cs_context(GlobalCounter::critical_section_begin(_thread))
{
// This version is published now.
if (AtomicAccess::load_acquire(&_cht->_invisible_epoch) != nullptr) {
AtomicAccess::release_store_fence(&_cht->_invisible_epoch, (Thread*)nullptr);
if (_cht->_invisible_epoch.load_acquire() != nullptr) {
_cht->_invisible_epoch.release_store_fence(nullptr);
}
}
@@ -282,16 +283,16 @@ template <typename CONFIG, MemTag MT>
inline void ConcurrentHashTable<CONFIG, MT>::
write_synchonize_on_visible_epoch(Thread* thread)
{
assert(_resize_lock_owner == thread, "Re-size lock not held");
assert(_resize_lock_owner.load_relaxed() == thread, "Re-size lock not held");
OrderAccess::fence(); // Prevent below load from floating up.
// If no reader saw this version we can skip write_synchronize.
if (AtomicAccess::load_acquire(&_invisible_epoch) == thread) {
if (_invisible_epoch.load_acquire() == thread) {
return;
}
assert(_invisible_epoch == nullptr, "Two thread doing bulk operations");
assert(_invisible_epoch.load_relaxed() == nullptr, "Two thread doing bulk operations");
// We mark this/next version that we are synchronizing for as not published.
// A reader will zero this flag if it reads this/next version.
AtomicAccess::release_store(&_invisible_epoch, thread);
_invisible_epoch.release_store(thread);
GlobalCounter::write_synchronize();
}
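The two hunks above implement the invisible-epoch handshake: a reader entering a scoped critical section zeroes the tag, telling a later bulk operation that the version was seen; if the tag survives untouched, write_synchronize() can be skipped. A standalone std::atomic sketch of that handshake, under the ordering assumptions noted earlier (names illustrative, not HotSpot code):

    #include <atomic>

    static std::atomic<const void*> invisible_epoch{nullptr};

    // Reader side (~ the scoped critical section constructor): publish the
    // version by clearing the tag if one is set.
    void reader_enters_critical_section() {
      if (invisible_epoch.load(std::memory_order_acquire) != nullptr) {
        invisible_epoch.store(nullptr, std::memory_order_release);
        std::atomic_thread_fence(std::memory_order_seq_cst); // ~ release_store_fence
      }
    }

    // Writer side (~ write_synchonize_on_visible_epoch): returns true when the
    // tag is still ours, i.e. no reader saw this version and the caller may
    // skip the equivalent of GlobalCounter::write_synchronize().
    bool writer_may_skip_synchronize(const void* self) {
      std::atomic_thread_fence(std::memory_order_seq_cst); // ~ OrderAccess::fence()
      if (invisible_epoch.load(std::memory_order_acquire) == self) {
        return true;
      }
      invisible_epoch.store(self, std::memory_order_release); // next version unpublished
      return false;
    }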
@@ -300,8 +301,8 @@ inline bool ConcurrentHashTable<CONFIG, MT>::
try_resize_lock(Thread* locker)
{
if (_resize_lock->try_lock()) {
if (_resize_lock_owner != nullptr) {
assert(locker != _resize_lock_owner, "Already own lock");
if (_resize_lock_owner.load_relaxed() != nullptr) {
assert(locker != _resize_lock_owner.load_relaxed(), "Already own lock");
// We got mutex but internal state is locked.
_resize_lock->unlock();
return false;
@@ -309,8 +310,8 @@ inline bool ConcurrentHashTable<CONFIG, MT>::
} else {
return false;
}
_invisible_epoch = nullptr;
_resize_lock_owner = locker;
_invisible_epoch.store_relaxed(nullptr);
_resize_lock_owner.store_relaxed(locker);
return true;
}
@@ -326,8 +327,8 @@ inline void ConcurrentHashTable<CONFIG, MT>::
_resize_lock->lock_without_safepoint_check();
// If the holder of the lock dropped the mutex for a safepoint, the mutex might be
// unlocked and _resize_lock_owner will still contain the owner.
if (_resize_lock_owner != nullptr) {
assert(locker != _resize_lock_owner, "Already own lock");
if (_resize_lock_owner.load_relaxed() != nullptr) {
assert(locker != _resize_lock_owner.load_relaxed(), "Already own lock");
// We got mutex but internal state is locked.
_resize_lock->unlock();
yield.wait();
@@ -335,17 +336,17 @@ inline void ConcurrentHashTable<CONFIG, MT>::
break;
}
} while(true);
_resize_lock_owner = locker;
_invisible_epoch = nullptr;
_resize_lock_owner.store_relaxed(locker);
_invisible_epoch.store_relaxed(nullptr);
}
template <typename CONFIG, MemTag MT>
inline void ConcurrentHashTable<CONFIG, MT>::
unlock_resize_lock(Thread* locker)
{
_invisible_epoch = nullptr;
assert(locker == _resize_lock_owner, "Not unlocked by locker.");
_resize_lock_owner = nullptr;
_invisible_epoch.store_relaxed(nullptr);
assert(locker == _resize_lock_owner.load_relaxed(), "Not unlocked by locker.");
_resize_lock_owner.store_relaxed(nullptr);
_resize_lock->unlock();
}
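The resize lock above is two-level: a Mutex serializes contenders, while the separate owner field can outlive a mutex hold across safepoints. Because the owner field is only written while holding the mutex, and read elsewhere mostly in asserts, relaxed accesses are sufficient. A standalone analogue under those assumptions, with std::mutex and std::atomic standing in for HotSpot's Mutex and Atomic<Thread*> (names illustrative):

    #include <atomic>
    #include <cassert>
    #include <mutex>

    static std::mutex resize_mutex;
    static std::atomic<const void*> resize_owner{nullptr}; // ~ Atomic<Thread*> _resize_lock_owner

    // ~ try_resize_lock(): the mutex may be free while the logical lock is
    // still held by a thread that dropped the mutex across a safepoint.
    bool try_resize_lock(const void* locker) {
      if (!resize_mutex.try_lock()) {
        return false;
      }
      if (resize_owner.load(std::memory_order_relaxed) != nullptr) {
        resize_mutex.unlock(); // got the mutex, but the internal state is locked
        return false;
      }
      resize_owner.store(locker, std::memory_order_relaxed);
      return true; // mutex stays held by the caller
    }

    // ~ unlock_resize_lock(): clear the owner before releasing the mutex.
    void unlock_resize_lock(const void* locker) {
      assert(resize_owner.load(std::memory_order_relaxed) == locker);
      resize_owner.store(nullptr, std::memory_order_relaxed);
      resize_mutex.unlock();
    }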
@@ -477,8 +478,8 @@ inline void ConcurrentHashTable<CONFIG, MT>::
{
// Here we have resize lock so table is SMR safe, and there is no new
// table. Can do this in parallel if we want.
assert((is_mt && _resize_lock_owner != nullptr) ||
(!is_mt && _resize_lock_owner == thread), "Re-size lock not held");
assert((is_mt && _resize_lock_owner.load_relaxed() != nullptr) ||
(!is_mt && _resize_lock_owner.load_relaxed() == thread), "Re-size lock not held");
Node* ndel_stack[StackBufferSize];
InternalTable* table = get_table();
assert(start_idx < stop_idx, "Must be");
@@ -696,7 +697,7 @@ inline bool ConcurrentHashTable<CONFIG, MT>::
if (!try_resize_lock(thread)) {
return false;
}
assert(_resize_lock_owner == thread, "Re-size lock not held");
assert(_resize_lock_owner.load_relaxed() == thread, "Re-size lock not held");
if (_table->_log2_size == _log2_start_size ||
_table->_log2_size <= log2_size) {
unlock_resize_lock(thread);
@@ -710,10 +711,10 @@ template <typename CONFIG, MemTag MT>
inline void ConcurrentHashTable<CONFIG, MT>::
internal_shrink_epilog(Thread* thread)
{
assert(_resize_lock_owner == thread, "Re-size lock not held");
assert(_resize_lock_owner.load_relaxed() == thread, "Re-size lock not held");
InternalTable* old_table = set_table_from_new();
_size_limit_reached = false;
_size_limit_reached.store_relaxed(false);
unlock_resize_lock(thread);
#ifdef ASSERT
for (size_t i = 0; i < old_table->_size; i++) {
@@ -767,13 +768,13 @@ inline bool ConcurrentHashTable<CONFIG, MT>::
internal_shrink(Thread* thread, size_t log2_size)
{
if (!internal_shrink_prolog(thread, log2_size)) {
assert(_resize_lock_owner != thread, "Re-size lock held");
assert(_resize_lock_owner.load_relaxed() != thread, "Re-size lock held");
return false;
}
assert(_resize_lock_owner == thread, "Should be locked by me");
assert(_resize_lock_owner.load_relaxed() == thread, "Should be locked by me");
internal_shrink_range(thread, 0, _new_table->_size);
internal_shrink_epilog(thread);
assert(_resize_lock_owner != thread, "Re-size lock held");
assert(_resize_lock_owner.load_relaxed() != thread, "Re-size lock held");
return true;
}
@@ -787,7 +788,7 @@ inline void ConcurrentHashTable<CONFIG, MT>::
delete _table;
// Create and publish a new table
InternalTable* table = new InternalTable(log2_size);
_size_limit_reached = (log2_size == _log2_size_limit);
_size_limit_reached.store_relaxed(log2_size == _log2_size_limit);
AtomicAccess::release_store(&_table, table);
}
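Note that the new table pointer above is still published with AtomicAccess::release_store, while the flag moves to a relaxed store: the pointer must publish the freshly built InternalTable to acquire-loading readers, whereas the flag carries no such obligation. A standalone sketch of that split (illustrative names, not HotSpot code):

    #include <atomic>
    #include <cstddef>

    struct TableLike { size_t size; };

    static std::atomic<TableLike*> table{nullptr};       // ~ _table
    static std::atomic<bool> size_limit_reached{false};  // ~ _size_limit_reached

    void publish_new_table(TableLike* t, bool at_limit) {
      // Plain flag: no publication obligation, relaxed is enough.
      size_limit_reached.store(at_limit, std::memory_order_relaxed);
      // Pointer publication: release store pairs with acquire loads on the
      // reader side so the new table's contents are visible before the pointer.
      table.store(t, std::memory_order_release);
    }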
@@ -812,7 +813,7 @@ inline bool ConcurrentHashTable<CONFIG, MT>::
}
_new_table = new InternalTable(_table->_log2_size + 1);
_size_limit_reached = _new_table->_log2_size == _log2_size_limit;
_size_limit_reached.store_relaxed(_new_table->_log2_size == _log2_size_limit);
return true;
}
@@ -821,7 +822,7 @@ template <typename CONFIG, MemTag MT>
inline void ConcurrentHashTable<CONFIG, MT>::
internal_grow_epilog(Thread* thread)
{
assert(_resize_lock_owner == thread, "Should be locked");
assert(_resize_lock_owner.load_relaxed() == thread, "Should be locked");
InternalTable* old_table = set_table_from_new();
unlock_resize_lock(thread);
@@ -840,13 +841,13 @@ inline bool ConcurrentHashTable<CONFIG, MT>::
internal_grow(Thread* thread, size_t log2_size)
{
if (!internal_grow_prolog(thread, log2_size)) {
assert(_resize_lock_owner != thread, "Re-size lock held");
assert(_resize_lock_owner.load_relaxed() != thread, "Re-size lock held");
return false;
}
assert(_resize_lock_owner == thread, "Should be locked by me");
assert(_resize_lock_owner.load_relaxed() == thread, "Should be locked by me");
internal_grow_range(thread, 0, _table->_size);
internal_grow_epilog(thread);
assert(_resize_lock_owner != thread, "Re-size lock held");
assert(_resize_lock_owner.load_relaxed() != thread, "Re-size lock held");
return true;
}
@@ -961,7 +962,7 @@ template <typename FUNC>
inline void ConcurrentHashTable<CONFIG, MT>::
do_scan_locked(Thread* thread, FUNC& scan_f)
{
assert(_resize_lock_owner == thread, "Re-size lock not held");
assert(_resize_lock_owner.load_relaxed() == thread, "Re-size lock not held");
// We can do a critical section over the entire loop but that would block
// updates for a long time. Instead we choose to block resizes.
InternalTable* table = get_table();
@@ -1020,7 +1021,7 @@ ConcurrentHashTable(size_t log2size, size_t log2size_limit, size_t grow_hint, bo
_resize_lock = new Mutex(rank, "ConcurrentHashTableResize_lock");
_table = new InternalTable(log2size);
assert(log2size_limit >= log2size, "bad ergo");
_size_limit_reached = _table->_log2_size == _log2_size_limit;
_size_limit_reached.store_relaxed(_table->_log2_size == _log2_size_limit);
}
template <typename CONFIG, MemTag MT>
@@ -1139,11 +1140,11 @@ inline void ConcurrentHashTable<CONFIG, MT>::
{
assert(!SafepointSynchronize::is_at_safepoint(),
"must be outside a safepoint");
assert(_resize_lock_owner != thread, "Re-size lock held");
assert(_resize_lock_owner.load_relaxed() != thread, "Re-size lock held");
lock_resize_lock(thread);
do_scan_locked(thread, scan_f);
unlock_resize_lock(thread);
assert(_resize_lock_owner != thread, "Re-size lock held");
assert(_resize_lock_owner.load_relaxed() != thread, "Re-size lock held");
}
template <typename CONFIG, MemTag MT>
@@ -1205,7 +1206,7 @@ inline bool ConcurrentHashTable<CONFIG, MT>::
}
do_bulk_delete_locked(thread, eval_f, del_f);
unlock_resize_lock(thread);
assert(_resize_lock_owner != thread, "Re-size lock held");
assert(_resize_lock_owner.load_relaxed() != thread, "Re-size lock held");
return true;
}

src/hotspot/share/utilities/concurrentHashTableTasks.inline.hpp

@@ -27,6 +27,7 @@
// No concurrentHashTableTasks.hpp
#include "runtime/atomic.hpp"
#include "runtime/atomicAccess.hpp"
#include "utilities/concurrentHashTable.inline.hpp"
#include "utilities/globalDefinitions.hpp"
@@ -41,7 +42,7 @@ class ConcurrentHashTable<CONFIG, MT>::BucketsOperation {
ConcurrentHashTable<CONFIG, MT>* _cht;
class InternalTableClaimer {
volatile size_t _next;
Atomic<size_t> _next;
size_t _limit;
size_t _size;
@@ -56,14 +57,14 @@ public:
void set(size_t claim_size, InternalTable* table) {
assert(table != nullptr, "precondition");
_next = 0;
_next.store_relaxed(0);
_limit = table->_size;
_size = MIN2(claim_size, _limit);
}
bool claim(size_t* start, size_t* stop) {
if (AtomicAccess::load(&_next) < _limit) {
size_t claimed = AtomicAccess::fetch_then_add(&_next, _size);
if (_next.load_relaxed() < _limit) {
size_t claimed = _next.fetch_then_add(_size);
if (claimed < _limit) {
*start = claimed;
*stop = MIN2(claimed + _size, _limit);
@@ -78,7 +79,7 @@ public:
}
bool have_more_work() {
return AtomicAccess::load_acquire(&_next) >= _limit;
return _next.load_acquire() >= _limit;
}
};
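The claimer above hands out [start, stop) chunks with a single fetch_then_add; relaxed ordering is enough because the claimed indices only partition work, they do not publish data. A standalone sketch of the same scheme (illustrative, not the HotSpot class):

    #include <algorithm>
    #include <atomic>
    #include <cstddef>

    struct RangeClaimer {
      std::atomic<size_t> next{0};
      size_t limit = 0;  // one past the last claimable index
      size_t chunk = 0;  // claim granularity

      bool claim(size_t* start, size_t* stop) {
        if (next.load(std::memory_order_relaxed) >= limit) {
          return false;                       // cheap early-out before the RMW
        }
        size_t claimed = next.fetch_add(chunk, std::memory_order_relaxed);
        if (claimed >= limit) {
          return false;                       // another worker claimed the tail
        }
        *start = claimed;
        *stop  = std::min(claimed + chunk, limit);
        return true;
      }
    };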
@@ -108,13 +109,13 @@ public:
}
void thread_owns_resize_lock(Thread* thread) {
assert(BucketsOperation::_cht->_resize_lock_owner == thread,
assert(BucketsOperation::_cht->_resize_lock_owner.load_relaxed() == thread,
"Should be locked by me");
assert(BucketsOperation::_cht->_resize_lock->owned_by_self(),
"Operations lock not held");
}
void thread_owns_only_state_lock(Thread* thread) {
assert(BucketsOperation::_cht->_resize_lock_owner == thread,
assert(BucketsOperation::_cht->_resize_lock_owner.load_relaxed() == thread,
"Should be locked by me");
assert(!BucketsOperation::_cht->_resize_lock->owned_by_self(),
"Operations lock held");
@@ -122,7 +123,7 @@ public:
void thread_do_not_own_resize_lock(Thread* thread) {
assert(!BucketsOperation::_cht->_resize_lock->owned_by_self(),
"Operations lock held");
assert(BucketsOperation::_cht->_resize_lock_owner != thread,
assert(BucketsOperation::_cht->_resize_lock_owner.load_relaxed() != thread,
"Should not be locked by me");
}
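The three assertion helpers above carve the resize-lock state, as seen from one worker thread, into three cases; a small illustrative sketch of that state space (names are not from the patch):

    // ~ thread_do_not_own_resize_lock: neither the mutex nor the owner field.
    // ~ thread_owns_only_state_lock:   owner field set, mutex currently released.
    // ~ thread_owns_resize_lock:       owner field set and the mutex held.
    enum class ResizeLockState {
      NotOwned,
      StateOnly,
      MutexAndState
    };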
@@ -169,7 +170,7 @@ class ConcurrentHashTable<CONFIG, MT>::BulkDeleteTask :
template <typename EVALUATE_FUNC, typename DELETE_FUNC>
bool do_task(Thread* thread, EVALUATE_FUNC& eval_f, DELETE_FUNC& del_f) {
size_t start, stop;
assert(BucketsOperation::_cht->_resize_lock_owner != nullptr,
assert(BucketsOperation::_cht->_resize_lock_owner.load_relaxed() != nullptr,
"Should be locked");
if (!this->claim(&start, &stop)) {
return false;
@@ -177,7 +178,7 @@ class ConcurrentHashTable<CONFIG, MT>::BulkDeleteTask :
BucketsOperation::_cht->do_bulk_delete_locked_for(thread, start, stop,
eval_f, del_f,
BucketsOperation::_is_mt);
assert(BucketsOperation::_cht->_resize_lock_owner != nullptr,
assert(BucketsOperation::_cht->_resize_lock_owner.load_relaxed() != nullptr,
"Should be locked");
return true;
}
@@ -210,13 +211,13 @@ class ConcurrentHashTable<CONFIG, MT>::GrowTask :
// Re-sizes a portion of the table. Returns true if there is more work.
bool do_task(Thread* thread) {
size_t start, stop;
assert(BucketsOperation::_cht->_resize_lock_owner != nullptr,
assert(BucketsOperation::_cht->_resize_lock_owner.load_relaxed() != nullptr,
"Should be locked");
if (!this->claim(&start, &stop)) {
return false;
}
BucketsOperation::_cht->internal_grow_range(thread, start, stop);
assert(BucketsOperation::_cht->_resize_lock_owner != nullptr,
assert(BucketsOperation::_cht->_resize_lock_owner.load_relaxed() != nullptr,
"Should be locked");
return true;
}
@@ -253,13 +254,13 @@ class ConcurrentHashTable<CONFIG, MT>::StatisticsTask :
template <typename VALUE_SIZE_FUNC>
bool do_task(Thread* thread, VALUE_SIZE_FUNC& sz) {
size_t start, stop;
assert(BucketsOperation::_cht->_resize_lock_owner != nullptr,
assert(BucketsOperation::_cht->_resize_lock_owner.load_relaxed() != nullptr,
"Should be locked");
if (!this->claim(&start, &stop)) {
return false;
}
BucketsOperation::_cht->internal_statistics_range(thread, start, stop, sz, _summary, _literal_bytes);
assert(BucketsOperation::_cht->_resize_lock_owner != nullptr,
assert(BucketsOperation::_cht->_resize_lock_owner.load_relaxed() != nullptr,
"Should be locked");
return true;
}