mirror of
https://github.com/openjdk/jdk.git
synced 2026-03-14 09:53:18 +00:00
8043575: Dynamically parallelize reference processing work
In G1 automatically set the number of parallel Reference processing threads Co-authored-by: Sangheon Kim <sangheon.kim@oracle.com> Reviewed-by: sangheki, kbarrett
This commit is contained in:
parent
1a0553e4eb
commit
7f9bbfa767
@ -300,7 +300,8 @@ void CMSCollector::ref_processor_init() {
|
||||
_cmsGen->refs_discovery_is_mt(), // mt discovery
|
||||
MAX2(ConcGCThreads, ParallelGCThreads), // mt discovery degree
|
||||
_cmsGen->refs_discovery_is_atomic(), // discovery is not atomic
|
||||
&_is_alive_closure); // closure for liveness info
|
||||
&_is_alive_closure, // closure for liveness info
|
||||
false); // disable adjusting number of processing threads
|
||||
// Initialize the _ref_processor field of CMSGen
|
||||
_cmsGen->set_ref_processor(_ref_processor);
|
||||
|
||||
@ -5126,16 +5127,18 @@ void CMSRefProcTaskProxy::do_work_steal(int i,
|
||||
log_develop_trace(gc, task)("\t(%d: stole %d oops)", i, num_steals);
|
||||
}
|
||||
|
||||
void CMSRefProcTaskExecutor::execute(ProcessTask& task)
|
||||
{
|
||||
void CMSRefProcTaskExecutor::execute(ProcessTask& task, uint ergo_workers) {
|
||||
CMSHeap* heap = CMSHeap::heap();
|
||||
WorkGang* workers = heap->workers();
|
||||
assert(workers != NULL, "Need parallel worker threads.");
|
||||
assert(workers->active_workers() == ergo_workers,
|
||||
"Ergonomically chosen workers (%u) must be equal to active workers (%u)",
|
||||
ergo_workers, workers->active_workers());
|
||||
CMSRefProcTaskProxy rp_task(task, &_collector,
|
||||
_collector.ref_processor_span(),
|
||||
_collector.markBitMap(),
|
||||
workers, _collector.task_queues());
|
||||
workers->run_task(&rp_task);
|
||||
workers->run_task(&rp_task, workers->active_workers());
|
||||
}
|
||||
|
||||
void CMSCollector::refProcessingWork() {
|
||||
|
||||
@ -486,7 +486,7 @@ public:
|
||||
{ }
|
||||
|
||||
// Executes a task using worker threads.
|
||||
virtual void execute(ProcessTask& task);
|
||||
virtual void execute(ProcessTask& task, uint ergo_workers);
|
||||
private:
|
||||
CMSCollector& _collector;
|
||||
};
|
||||
|
||||
@ -793,14 +793,17 @@ void ParNewRefProcTaskProxy::work(uint worker_id) {
|
||||
par_scan_state.evacuate_followers_closure());
|
||||
}
|
||||
|
||||
void ParNewRefProcTaskExecutor::execute(ProcessTask& task) {
|
||||
void ParNewRefProcTaskExecutor::execute(ProcessTask& task, uint ergo_workers) {
|
||||
CMSHeap* gch = CMSHeap::heap();
|
||||
WorkGang* workers = gch->workers();
|
||||
assert(workers != NULL, "Need parallel worker threads.");
|
||||
assert(workers->active_workers() == ergo_workers,
|
||||
"Ergonomically chosen workers (%u) must be equal to active workers (%u)",
|
||||
ergo_workers, workers->active_workers());
|
||||
_state_set.reset(workers->active_workers(), _young_gen.promotion_failed());
|
||||
ParNewRefProcTaskProxy rp_task(task, _young_gen, _old_gen,
|
||||
_young_gen.reserved().end(), _state_set);
|
||||
workers->run_task(&rp_task);
|
||||
workers->run_task(&rp_task, workers->active_workers());
|
||||
_state_set.reset(0 /* bad value in debug if not reset */,
|
||||
_young_gen.promotion_failed());
|
||||
}
|
||||
@ -1450,7 +1453,8 @@ void ParNewGeneration::ref_processor_init() {
|
||||
refs_discovery_is_mt(), // mt discovery
|
||||
ParallelGCThreads, // mt discovery degree
|
||||
refs_discovery_is_atomic(), // atomic_discovery
|
||||
NULL); // is_alive_non_header
|
||||
NULL, // is_alive_non_header
|
||||
false); // disable adjusting number of processing threads
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -298,7 +298,7 @@ class ParNewRefProcTaskExecutor: public AbstractRefProcTaskExecutor {
|
||||
{ }
|
||||
|
||||
// Executes a task using worker threads.
|
||||
virtual void execute(ProcessTask& task);
|
||||
virtual void execute(ProcessTask& task, uint ergo_workers);
|
||||
// Switch to single threaded mode.
|
||||
virtual void set_single_threaded_mode();
|
||||
};
|
||||
|
||||
@ -1815,7 +1815,8 @@ void G1CollectedHeap::ref_processing_init() {
|
||||
(ParallelGCThreads > 1) || (ConcGCThreads > 1), // mt discovery
|
||||
MAX2(ParallelGCThreads, ConcGCThreads), // degree of mt discovery
|
||||
false, // Reference discovery is not atomic
|
||||
&_is_alive_closure_cm); // is alive closure
|
||||
&_is_alive_closure_cm, // is alive closure
|
||||
true); // allow changes to number of processing threads
|
||||
|
||||
// STW ref processor
|
||||
_ref_processor_stw =
|
||||
@ -1825,7 +1826,8 @@ void G1CollectedHeap::ref_processing_init() {
|
||||
(ParallelGCThreads > 1), // mt discovery
|
||||
ParallelGCThreads, // degree of mt discovery
|
||||
true, // Reference discovery is atomic
|
||||
&_is_alive_closure_stw); // is alive closure
|
||||
&_is_alive_closure_stw, // is alive closure
|
||||
true); // allow changes to number of processing threads
|
||||
}
|
||||
|
||||
CollectorPolicy* G1CollectedHeap::collector_policy() const {
|
||||
@ -3791,25 +3793,22 @@ private:
|
||||
G1ParScanThreadStateSet* _pss;
|
||||
RefToScanQueueSet* _queues;
|
||||
WorkGang* _workers;
|
||||
uint _active_workers;
|
||||
|
||||
public:
|
||||
G1STWRefProcTaskExecutor(G1CollectedHeap* g1h,
|
||||
G1ParScanThreadStateSet* per_thread_states,
|
||||
WorkGang* workers,
|
||||
RefToScanQueueSet *task_queues,
|
||||
uint n_workers) :
|
||||
RefToScanQueueSet *task_queues) :
|
||||
_g1h(g1h),
|
||||
_pss(per_thread_states),
|
||||
_queues(task_queues),
|
||||
_workers(workers),
|
||||
_active_workers(n_workers)
|
||||
_workers(workers)
|
||||
{
|
||||
g1h->ref_processor_stw()->set_active_mt_degree(n_workers);
|
||||
g1h->ref_processor_stw()->set_active_mt_degree(workers->active_workers());
|
||||
}
|
||||
|
||||
// Executes the given task using concurrent marking worker threads.
|
||||
virtual void execute(ProcessTask& task);
|
||||
virtual void execute(ProcessTask& task, uint ergo_workers);
|
||||
};
|
||||
|
||||
// Gang task for possibly parallel reference processing
|
||||
@ -3865,13 +3864,16 @@ public:
|
||||
// Driver routine for parallel reference processing.
|
||||
// Creates an instance of the ref processing gang
|
||||
// task and has the worker threads execute it.
|
||||
void G1STWRefProcTaskExecutor::execute(ProcessTask& proc_task) {
|
||||
void G1STWRefProcTaskExecutor::execute(ProcessTask& proc_task, uint ergo_workers) {
|
||||
assert(_workers != NULL, "Need parallel worker threads.");
|
||||
|
||||
ParallelTaskTerminator terminator(_active_workers, _queues);
|
||||
assert(_workers->active_workers() >= ergo_workers,
|
||||
"Ergonomically chosen workers (%u) should be less than or equal to active workers (%u)",
|
||||
ergo_workers, _workers->active_workers());
|
||||
ParallelTaskTerminator terminator(ergo_workers, _queues);
|
||||
G1STWRefProcTaskProxy proc_task_proxy(proc_task, _g1h, _pss, _queues, &terminator);
|
||||
|
||||
_workers->run_task(&proc_task_proxy);
|
||||
_workers->run_task(&proc_task_proxy, ergo_workers);
|
||||
}
|
||||
|
||||
// End of weak reference support closures
|
||||
@ -3922,7 +3924,7 @@ void G1CollectedHeap::process_discovered_references(G1ParScanThreadStateSet* per
|
||||
"Mismatch between the number of GC workers %u and the maximum number of Reference process queues %u",
|
||||
no_of_gc_workers, rp->max_num_queues());
|
||||
|
||||
G1STWRefProcTaskExecutor par_task_executor(this, per_thread_states, workers(), _task_queues, no_of_gc_workers);
|
||||
G1STWRefProcTaskExecutor par_task_executor(this, per_thread_states, workers(), _task_queues);
|
||||
stats = rp->process_discovered_references(&is_alive,
|
||||
&keep_alive,
|
||||
&drain_queue,
|
||||
|
||||
@ -1518,8 +1518,7 @@ public:
|
||||
_g1h(g1h), _cm(cm),
|
||||
_workers(workers), _active_workers(n_workers) { }
|
||||
|
||||
// Executes the given task using concurrent marking worker threads.
|
||||
virtual void execute(ProcessTask& task);
|
||||
virtual void execute(ProcessTask& task, uint ergo_workers);
|
||||
};
|
||||
|
||||
class G1CMRefProcTaskProxy : public AbstractGangTask {
|
||||
@ -1550,9 +1549,12 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
void G1CMRefProcTaskExecutor::execute(ProcessTask& proc_task) {
|
||||
void G1CMRefProcTaskExecutor::execute(ProcessTask& proc_task, uint ergo_workers) {
|
||||
assert(_workers != NULL, "Need parallel worker threads.");
|
||||
assert(_g1h->ref_processor_cm()->processing_is_mt(), "processing is not MT");
|
||||
assert(_workers->active_workers() >= ergo_workers,
|
||||
"Ergonomically chosen workers(%u) should be less than or equal to active workers(%u)",
|
||||
ergo_workers, _workers->active_workers());
|
||||
|
||||
G1CMRefProcTaskProxy proc_task_proxy(proc_task, _g1h, _cm);
|
||||
|
||||
@ -1560,8 +1562,8 @@ void G1CMRefProcTaskExecutor::execute(ProcessTask& proc_task) {
|
||||
// proxy task execution, so that the termination protocol
|
||||
// and overflow handling in G1CMTask::do_marking_step() knows
|
||||
// how many workers to wait for.
|
||||
_cm->set_concurrency(_active_workers);
|
||||
_workers->run_task(&proc_task_proxy);
|
||||
_cm->set_concurrency(ergo_workers);
|
||||
_workers->run_task(&proc_task_proxy, ergo_workers);
|
||||
}
|
||||
|
||||
void G1ConcurrentMark::weak_refs_work(bool clear_all_soft_refs) {
|
||||
|
||||
@ -67,9 +67,13 @@ void G1FullGCReferenceProcessingExecutor::run_task(AbstractGangTask* task) {
|
||||
G1CollectedHeap::heap()->workers()->run_task(task, _collector->workers());
|
||||
}
|
||||
|
||||
void G1FullGCReferenceProcessingExecutor::execute(ProcessTask& proc_task) {
|
||||
void G1FullGCReferenceProcessingExecutor::run_task(AbstractGangTask* task, uint workers) {
|
||||
G1CollectedHeap::heap()->workers()->run_task(task, workers);
|
||||
}
|
||||
|
||||
void G1FullGCReferenceProcessingExecutor::execute(ProcessTask& proc_task, uint ergo_workers) {
|
||||
G1RefProcTaskProxy proc_task_proxy(proc_task, _collector);
|
||||
run_task(&proc_task_proxy);
|
||||
run_task(&proc_task_proxy, ergo_workers);
|
||||
}
|
||||
|
||||
void G1FullGCReferenceProcessingExecutor::execute(STWGCTimer* timer, G1FullGCTracer* tracer) {
|
||||
|
||||
@ -50,10 +50,11 @@ public:
|
||||
void execute(STWGCTimer* timer, G1FullGCTracer* tracer);
|
||||
|
||||
// Executes the given task using concurrent marking worker threads.
|
||||
virtual void execute(ProcessTask& task);
|
||||
virtual void execute(ProcessTask& task, uint ergo_workers);
|
||||
|
||||
private:
|
||||
void run_task(AbstractGangTask* task);
|
||||
void run_task(AbstractGangTask* task, uint workers);
|
||||
|
||||
class G1RefProcTaskProxy : public AbstractGangTask {
|
||||
typedef AbstractRefProcTaskExecutor::ProcessTask ProcessTask;
|
||||
|
||||
@ -146,10 +146,13 @@ void RefProcTaskProxy::do_it(GCTaskManager* manager, uint which)
|
||||
// RefProcTaskExecutor
|
||||
//
|
||||
|
||||
void RefProcTaskExecutor::execute(ProcessTask& task)
|
||||
void RefProcTaskExecutor::execute(ProcessTask& task, uint ergo_workers)
|
||||
{
|
||||
ParallelScavengeHeap* heap = ParallelScavengeHeap::heap();
|
||||
uint active_gc_threads = heap->gc_task_manager()->active_workers();
|
||||
assert(active_gc_threads == ergo_workers,
|
||||
"Ergonomically chosen workers (%u) must be equal to active workers (%u)",
|
||||
ergo_workers, active_gc_threads);
|
||||
OopTaskQueueSet* qset = ParCompactionManager::stack_array();
|
||||
ParallelTaskTerminator terminator(active_gc_threads, qset);
|
||||
GCTaskQueue* q = GCTaskQueue::create();
|
||||
|
||||
@ -140,7 +140,7 @@ private:
|
||||
//
|
||||
|
||||
class RefProcTaskExecutor: public AbstractRefProcTaskExecutor {
|
||||
virtual void execute(ProcessTask& task);
|
||||
virtual void execute(ProcessTask& task, uint ergo_workers);
|
||||
};
|
||||
|
||||
|
||||
|
||||
@ -852,7 +852,8 @@ void PSParallelCompact::post_initialize() {
|
||||
true, // mt discovery
|
||||
ParallelGCThreads, // mt discovery degree
|
||||
true, // atomic_discovery
|
||||
&_is_alive_closure); // non-header is alive closure
|
||||
&_is_alive_closure, // non-header is alive closure
|
||||
false); // disable adjusting number of processing threads
|
||||
_counters = new CollectorCounters("PSParallelCompact", 1);
|
||||
|
||||
// Initialize static fields in ParCompactionManager.
|
||||
|
||||
@ -150,20 +150,26 @@ void PSRefProcTaskProxy::do_it(GCTaskManager* manager, uint which)
|
||||
}
|
||||
|
||||
class PSRefProcTaskExecutor: public AbstractRefProcTaskExecutor {
|
||||
virtual void execute(ProcessTask& task);
|
||||
virtual void execute(ProcessTask& task, uint ergo_workers);
|
||||
};
|
||||
|
||||
void PSRefProcTaskExecutor::execute(ProcessTask& task)
|
||||
void PSRefProcTaskExecutor::execute(ProcessTask& task, uint ergo_workers)
|
||||
{
|
||||
GCTaskQueue* q = GCTaskQueue::create();
|
||||
GCTaskManager* manager = ParallelScavengeHeap::gc_task_manager();
|
||||
for(uint i=0; i < manager->active_workers(); i++) {
|
||||
uint active_workers = manager->active_workers();
|
||||
|
||||
assert(active_workers == ergo_workers,
|
||||
"Ergonomically chosen workers (%u) must be equal to active workers (%u)",
|
||||
ergo_workers, active_workers);
|
||||
|
||||
for(uint i=0; i < active_workers; i++) {
|
||||
q->enqueue(new PSRefProcTaskProxy(task, i));
|
||||
}
|
||||
ParallelTaskTerminator terminator(manager->active_workers(),
|
||||
ParallelTaskTerminator terminator(active_workers,
|
||||
(TaskQueueSetSuper*) PSPromotionManager::stack_array_depth());
|
||||
if (task.marks_oops_alive() && manager->active_workers() > 1) {
|
||||
for (uint j = 0; j < manager->active_workers(); j++) {
|
||||
if (task.marks_oops_alive() && active_workers > 1) {
|
||||
for (uint j = 0; j < active_workers; j++) {
|
||||
q->enqueue(new StealTask(&terminator));
|
||||
}
|
||||
}
|
||||
@ -748,7 +754,8 @@ void PSScavenge::initialize() {
|
||||
true, // mt discovery
|
||||
ParallelGCThreads, // mt discovery degree
|
||||
true, // atomic_discovery
|
||||
NULL); // header provides liveness info
|
||||
NULL, // header provides liveness info
|
||||
false);
|
||||
|
||||
// Cache the cardtable
|
||||
_card_table = heap->card_table();
|
||||
|
||||
@ -307,6 +307,12 @@
|
||||
product(bool, ParallelRefProcBalancingEnabled, true, \
|
||||
"Enable balancing of reference processing queues") \
|
||||
\
|
||||
experimental(size_t, ReferencesPerThread, 1000, \
|
||||
"Ergonomically start one thread for this amount of " \
|
||||
"references for reference processing if " \
|
||||
"ParallelRefProcEnabled is true. Specify 0 to disable and " \
|
||||
"use all threads.") \
|
||||
\
|
||||
product(uintx, InitiatingHeapOccupancyPercent, 45, \
|
||||
"The percent occupancy (IHOP) of the current old generation " \
|
||||
"capacity above which a concurrent mark cycle will be initiated " \
|
||||
|
||||
@ -99,13 +99,15 @@ ReferenceProcessor::ReferenceProcessor(BoolObjectClosure* is_subject_to_discover
|
||||
bool mt_discovery,
|
||||
uint mt_discovery_degree,
|
||||
bool atomic_discovery,
|
||||
BoolObjectClosure* is_alive_non_header) :
|
||||
BoolObjectClosure* is_alive_non_header,
|
||||
bool adjust_no_of_processing_threads) :
|
||||
_is_subject_to_discovery(is_subject_to_discovery),
|
||||
_discovering_refs(false),
|
||||
_enqueuing_is_done(false),
|
||||
_is_alive_non_header(is_alive_non_header),
|
||||
_processing_is_mt(mt_processing),
|
||||
_next_id(0)
|
||||
_next_id(0),
|
||||
_adjust_no_of_processing_threads(adjust_no_of_processing_threads)
|
||||
{
|
||||
assert(is_subject_to_discovery != NULL, "must be set");
|
||||
|
||||
@ -679,7 +681,8 @@ bool ReferenceProcessor::need_balance_queues(DiscoveredList refs_lists[]) {
|
||||
}
|
||||
|
||||
void ReferenceProcessor::maybe_balance_queues(DiscoveredList refs_lists[]) {
|
||||
if (_processing_is_mt && need_balance_queues(refs_lists)) {
|
||||
assert(_processing_is_mt, "Should not call this otherwise");
|
||||
if (need_balance_queues(refs_lists)) {
|
||||
balance_queues(refs_lists);
|
||||
}
|
||||
}
|
||||
@ -768,22 +771,30 @@ void ReferenceProcessor::balance_queues(DiscoveredList ref_lists[])
|
||||
#endif
|
||||
}
|
||||
|
||||
bool ReferenceProcessor::is_mt_processing_set_up(AbstractRefProcTaskExecutor* task_executor) const {
|
||||
return task_executor != NULL && _processing_is_mt;
|
||||
}
|
||||
|
||||
void ReferenceProcessor::process_soft_ref_reconsider(BoolObjectClosure* is_alive,
|
||||
OopClosure* keep_alive,
|
||||
VoidClosure* complete_gc,
|
||||
AbstractRefProcTaskExecutor* task_executor,
|
||||
AbstractRefProcTaskExecutor* task_executor,
|
||||
ReferenceProcessorPhaseTimes* phase_times) {
|
||||
assert(!_processing_is_mt || task_executor != NULL, "Task executor must not be NULL when mt processing is set.");
|
||||
|
||||
phase_times->set_ref_discovered(REF_SOFT, total_count(_discoveredSoftRefs));
|
||||
|
||||
if (_current_soft_ref_policy == NULL) {
|
||||
return;
|
||||
}
|
||||
size_t const num_soft_refs = total_count(_discoveredSoftRefs);
|
||||
phase_times->set_ref_discovered(REF_SOFT, num_soft_refs);
|
||||
|
||||
phase_times->set_processing_is_mt(_processing_is_mt);
|
||||
|
||||
{
|
||||
if (num_soft_refs == 0 || _current_soft_ref_policy == NULL) {
|
||||
log_debug(gc, ref)("Skipped phase1 of Reference Processing due to unavailable references");
|
||||
return;
|
||||
}
|
||||
|
||||
RefProcMTDegreeAdjuster a(this, RefPhase1, num_soft_refs);
|
||||
|
||||
if (_processing_is_mt) {
|
||||
RefProcBalanceQueuesTimeTracker tt(RefPhase1, phase_times);
|
||||
maybe_balance_queues(_discoveredSoftRefs);
|
||||
}
|
||||
@ -793,7 +804,7 @@ void ReferenceProcessor::process_soft_ref_reconsider(BoolObjectClosure* is_alive
|
||||
log_reflist("Phase1 Soft before", _discoveredSoftRefs, _max_num_queues);
|
||||
if (_processing_is_mt) {
|
||||
RefProcPhase1Task phase1(*this, phase_times, _current_soft_ref_policy);
|
||||
task_executor->execute(phase1);
|
||||
task_executor->execute(phase1, num_queues());
|
||||
} else {
|
||||
size_t removed = 0;
|
||||
|
||||
@ -815,12 +826,23 @@ void ReferenceProcessor::process_soft_weak_final_refs(BoolObjectClosure* is_aliv
|
||||
ReferenceProcessorPhaseTimes* phase_times) {
|
||||
assert(!_processing_is_mt || task_executor != NULL, "Task executor must not be NULL when mt processing is set.");
|
||||
|
||||
phase_times->set_ref_discovered(REF_WEAK, total_count(_discoveredWeakRefs));
|
||||
phase_times->set_ref_discovered(REF_FINAL, total_count(_discoveredFinalRefs));
|
||||
size_t const num_soft_refs = total_count(_discoveredSoftRefs);
|
||||
size_t const num_weak_refs = total_count(_discoveredWeakRefs);
|
||||
size_t const num_final_refs = total_count(_discoveredFinalRefs);
|
||||
size_t const num_total_refs = num_soft_refs + num_weak_refs + num_final_refs;
|
||||
phase_times->set_ref_discovered(REF_WEAK, num_weak_refs);
|
||||
phase_times->set_ref_discovered(REF_FINAL, num_final_refs);
|
||||
|
||||
phase_times->set_processing_is_mt(_processing_is_mt);
|
||||
|
||||
{
|
||||
if (num_total_refs == 0) {
|
||||
log_debug(gc, ref)("Skipped phase2 of Reference Processing due to unavailable references");
|
||||
return;
|
||||
}
|
||||
|
||||
RefProcMTDegreeAdjuster a(this, RefPhase2, num_total_refs);
|
||||
|
||||
if (_processing_is_mt) {
|
||||
RefProcBalanceQueuesTimeTracker tt(RefPhase2, phase_times);
|
||||
maybe_balance_queues(_discoveredSoftRefs);
|
||||
maybe_balance_queues(_discoveredWeakRefs);
|
||||
@ -834,7 +856,7 @@ void ReferenceProcessor::process_soft_weak_final_refs(BoolObjectClosure* is_aliv
|
||||
log_reflist("Phase2 Final before", _discoveredFinalRefs, _max_num_queues);
|
||||
if (_processing_is_mt) {
|
||||
RefProcPhase2Task phase2(*this, phase_times);
|
||||
task_executor->execute(phase2);
|
||||
task_executor->execute(phase2, num_queues());
|
||||
} else {
|
||||
RefProcWorkerTimeTracker t(phase_times->phase2_worker_time_sec(), 0);
|
||||
{
|
||||
@ -880,9 +902,18 @@ void ReferenceProcessor::process_final_keep_alive(OopClosure* keep_alive,
|
||||
ReferenceProcessorPhaseTimes* phase_times) {
|
||||
assert(!_processing_is_mt || task_executor != NULL, "Task executor must not be NULL when mt processing is set.");
|
||||
|
||||
size_t const num_final_refs = total_count(_discoveredFinalRefs);
|
||||
|
||||
phase_times->set_processing_is_mt(_processing_is_mt);
|
||||
|
||||
{
|
||||
if (num_final_refs == 0) {
|
||||
log_debug(gc, ref)("Skipped phase3 of Reference Processing due to unavailable references");
|
||||
return;
|
||||
}
|
||||
|
||||
RefProcMTDegreeAdjuster a(this, RefPhase3, num_final_refs);
|
||||
|
||||
if (_processing_is_mt) {
|
||||
RefProcBalanceQueuesTimeTracker tt(RefPhase3, phase_times);
|
||||
maybe_balance_queues(_discoveredFinalRefs);
|
||||
}
|
||||
@ -893,7 +924,7 @@ void ReferenceProcessor::process_final_keep_alive(OopClosure* keep_alive,
|
||||
|
||||
if (_processing_is_mt) {
|
||||
RefProcPhase3Task phase3(*this, phase_times);
|
||||
task_executor->execute(phase3);
|
||||
task_executor->execute(phase3, num_queues());
|
||||
} else {
|
||||
RefProcSubPhasesWorkerTimeTracker tt2(FinalRefSubPhase3, phase_times, 0);
|
||||
for (uint i = 0; i < _max_num_queues; i++) {
|
||||
@ -910,11 +941,19 @@ void ReferenceProcessor::process_phantom_refs(BoolObjectClosure* is_alive,
|
||||
ReferenceProcessorPhaseTimes* phase_times) {
|
||||
assert(!_processing_is_mt || task_executor != NULL, "Task executor must not be NULL when mt processing is set.");
|
||||
|
||||
phase_times->set_ref_discovered(REF_PHANTOM, total_count(_discoveredPhantomRefs));
|
||||
size_t const num_phantom_refs = total_count(_discoveredPhantomRefs);
|
||||
phase_times->set_ref_discovered(REF_PHANTOM, num_phantom_refs);
|
||||
|
||||
phase_times->set_processing_is_mt(_processing_is_mt);
|
||||
|
||||
{
|
||||
if (num_phantom_refs == 0) {
|
||||
log_debug(gc, ref)("Skipped phase4 of Reference Processing due to unavailable references");
|
||||
return;
|
||||
}
|
||||
|
||||
RefProcMTDegreeAdjuster a(this, RefPhase4, num_phantom_refs);
|
||||
|
||||
if (_processing_is_mt) {
|
||||
RefProcBalanceQueuesTimeTracker tt(RefPhase4, phase_times);
|
||||
maybe_balance_queues(_discoveredPhantomRefs);
|
||||
}
|
||||
@ -925,7 +964,7 @@ void ReferenceProcessor::process_phantom_refs(BoolObjectClosure* is_alive,
|
||||
log_reflist("Phase4 Phantom before", _discoveredPhantomRefs, _max_num_queues);
|
||||
if (_processing_is_mt) {
|
||||
RefProcPhase4Task phase4(*this, phase_times);
|
||||
task_executor->execute(phase4);
|
||||
task_executor->execute(phase4, num_queues());
|
||||
} else {
|
||||
size_t removed = 0;
|
||||
|
||||
@ -1308,3 +1347,45 @@ const char* ReferenceProcessor::list_name(uint i) {
|
||||
ShouldNotReachHere();
|
||||
return NULL;
|
||||
}
|
||||
|
||||
uint RefProcMTDegreeAdjuster::ergo_proc_thread_count(size_t ref_count,
|
||||
uint max_threads,
|
||||
RefProcPhases phase) const {
|
||||
assert(0 < max_threads, "must allow at least one thread");
|
||||
|
||||
if (use_max_threads(phase) || (ReferencesPerThread == 0)) {
|
||||
return max_threads;
|
||||
}
|
||||
|
||||
size_t thread_count = 1 + (ref_count / ReferencesPerThread);
|
||||
return (uint)MIN3(thread_count,
|
||||
static_cast<size_t>(max_threads),
|
||||
(size_t)os::active_processor_count());
|
||||
}
|
||||
|
||||
bool RefProcMTDegreeAdjuster::use_max_threads(RefProcPhases phase) const {
|
||||
// Even a small number of references in either of those cases could produce large amounts of work.
|
||||
return (phase == ReferenceProcessor::RefPhase1 || phase == ReferenceProcessor::RefPhase3);
|
||||
}
|
||||
|
||||
RefProcMTDegreeAdjuster::RefProcMTDegreeAdjuster(ReferenceProcessor* rp,
|
||||
RefProcPhases phase,
|
||||
size_t ref_count):
|
||||
_rp(rp),
|
||||
_saved_mt_processing(_rp->processing_is_mt()),
|
||||
_saved_num_queues(_rp->num_queues()) {
|
||||
if (!_rp->processing_is_mt() || !_rp->adjust_no_of_processing_threads() || (ReferencesPerThread == 0)) {
|
||||
return;
|
||||
}
|
||||
|
||||
uint workers = ergo_proc_thread_count(ref_count, _rp->num_queues(), phase);
|
||||
|
||||
_rp->set_mt_processing(workers > 1);
|
||||
_rp->set_active_mt_degree(workers);
|
||||
}
|
||||
|
||||
RefProcMTDegreeAdjuster::~RefProcMTDegreeAdjuster() {
|
||||
// Revert to previous status.
|
||||
_rp->set_mt_processing(_saved_mt_processing);
|
||||
_rp->set_active_mt_degree(_saved_num_queues);
|
||||
}
|
||||
|
||||
@ -213,6 +213,7 @@ private:
|
||||
uint _next_id; // round-robin mod _num_queues counter in
|
||||
// support of work distribution
|
||||
|
||||
bool _adjust_no_of_processing_threads; // allow dynamic adjustment of processing threads
|
||||
// For collectors that do not keep GC liveness information
|
||||
// in the object header, this field holds a closure that
|
||||
// helps the reference processor determine the reachability
|
||||
@ -378,13 +379,16 @@ private:
|
||||
|
||||
bool is_subject_to_discovery(oop const obj) const;
|
||||
|
||||
bool is_mt_processing_set_up(AbstractRefProcTaskExecutor* task_executor) const;
|
||||
|
||||
public:
|
||||
// Default parameters give you a vanilla reference processor.
|
||||
ReferenceProcessor(BoolObjectClosure* is_subject_to_discovery,
|
||||
bool mt_processing = false, uint mt_processing_degree = 1,
|
||||
bool mt_discovery = false, uint mt_discovery_degree = 1,
|
||||
bool atomic_discovery = true,
|
||||
BoolObjectClosure* is_alive_non_header = NULL);
|
||||
BoolObjectClosure* is_alive_non_header = NULL,
|
||||
bool adjust_no_of_processing_threads = false);
|
||||
|
||||
// RefDiscoveryPolicy values
|
||||
enum DiscoveryPolicy {
|
||||
@ -457,6 +461,8 @@ public:
|
||||
// debugging
|
||||
void verify_no_references_recorded() PRODUCT_RETURN;
|
||||
void verify_referent(oop obj) PRODUCT_RETURN;
|
||||
|
||||
bool adjust_no_of_processing_threads() const { return _adjust_no_of_processing_threads; }
|
||||
};
|
||||
|
||||
// A subject-to-discovery closure that uses a single memory span to determine the area that
|
||||
@ -634,7 +640,7 @@ public:
|
||||
class ProcessTask;
|
||||
|
||||
// Executes a task using worker threads.
|
||||
virtual void execute(ProcessTask& task) = 0;
|
||||
virtual void execute(ProcessTask& task, uint ergo_workers) = 0;
|
||||
|
||||
// Switch to single threaded mode.
|
||||
virtual void set_single_threaded_mode() { };
|
||||
@ -666,4 +672,27 @@ public:
|
||||
bool marks_oops_alive() const { return _marks_oops_alive; }
|
||||
};
|
||||
|
||||
// Temporarily change the number of workers based on given reference count.
|
||||
// This ergonomically decided worker count will be used to activate worker threads.
|
||||
class RefProcMTDegreeAdjuster : public StackObj {
|
||||
typedef ReferenceProcessor::RefProcPhases RefProcPhases;
|
||||
|
||||
ReferenceProcessor* _rp;
|
||||
bool _saved_mt_processing;
|
||||
uint _saved_num_queues;
|
||||
|
||||
// Calculate based on total of references.
|
||||
uint ergo_proc_thread_count(size_t ref_count,
|
||||
uint max_threads,
|
||||
RefProcPhases phase) const;
|
||||
|
||||
bool use_max_threads(RefProcPhases phase) const;
|
||||
|
||||
public:
|
||||
RefProcMTDegreeAdjuster(ReferenceProcessor* rp,
|
||||
RefProcPhases phase,
|
||||
size_t ref_count);
|
||||
~RefProcMTDegreeAdjuster();
|
||||
};
|
||||
|
||||
#endif // SHARE_VM_GC_SHARED_REFERENCEPROCESSOR_HPP
|
||||
|
||||
@ -378,9 +378,13 @@ void ReferenceProcessorPhaseTimes::print_worker_time(LogStream* ls, WorkerDataAr
|
||||
worker_time->print_details_on(&ls2);
|
||||
}
|
||||
} else {
|
||||
ls->print_cr("%s " TIME_FORMAT,
|
||||
ser_title,
|
||||
worker_time->get(0) * MILLIUNITS);
|
||||
if (worker_time->get(0) != uninitialized()) {
|
||||
ls->print_cr("%s " TIME_FORMAT,
|
||||
ser_title,
|
||||
worker_time->get(0) * MILLIUNITS);
|
||||
} else {
|
||||
ls->print_cr("%s skipped", ser_title);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -121,10 +121,10 @@ public class TestPrintReferences {
|
||||
if (parallelRefProcEnabled) {
|
||||
final String timeInParRegex = timeRegex +",\\s";
|
||||
return gcLogTimeRegex + indent(8) + subphaseName +
|
||||
" \\(ms\\):\\s+Min: " + timeInParRegex + "Avg: " + timeInParRegex + "Max: " + timeInParRegex + "Diff: " + timeInParRegex + "Sum: " + timeInParRegex +
|
||||
"Workers: [0-9]+" + "\n";
|
||||
" \\(ms\\):\\s+(Min:" + timeInParRegex + "Avg:" + timeInParRegex + "Max:" + timeInParRegex + "Diff:" + timeInParRegex + "Sum:" + timeInParRegex +
|
||||
"Workers: [0-9]+|skipped)" + "\n";
|
||||
} else {
|
||||
return gcLogTimeRegex + indent(8) + subphaseName + ":" + timeRegex + "ms\n";
|
||||
return gcLogTimeRegex + indent(8) + subphaseName + ":(" + timeRegex + "ms|\\s+skipped)\n";
|
||||
}
|
||||
}
|
||||
|
||||
@ -136,7 +136,7 @@ public class TestPrintReferences {
|
||||
/* Total Reference processing time */
|
||||
String totalRegex = gcLogTimeRegex + indent(4) + referenceProcessing + ": " + timeRegex + "\n";
|
||||
|
||||
String balanceRegex = parallelRefProcEnabled ? gcLogTimeRegex + indent(8) + "Balance queues: " + timeRegex + "\n" : "";
|
||||
String balanceRegex = parallelRefProcEnabled ? "(" + gcLogTimeRegex + indent(8) + "Balance queues: " + timeRegex + "\n)??" : "";
|
||||
|
||||
final boolean p = parallelRefProcEnabled;
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user