8323807: Async UL: Add a stalling mode to async UL

Reviewed-by: dholmes, aboldtch
This commit is contained in:
Johan Sjölen 2025-02-26 12:51:35 +00:00
parent e7d4b360fe
commit ea2c923849
9 changed files with 241 additions and 61 deletions

View File

@ -26,45 +26,63 @@
#include "logging/logConfiguration.hpp"
#include "logging/logFileOutput.hpp"
#include "logging/logFileStreamOutput.hpp"
#include "logging/logHandle.hpp"
#include "memory/allocation.hpp"
#include "memory/resourceArea.hpp"
#include "runtime/atomic.hpp"
#include "runtime/os.inline.hpp"
#include "runtime/globals.hpp"
class AsyncLogWriter::Locker : public StackObj {
Thread*& _holder;
PlatformMonitor& _lock;
class AsyncLogWriter::AsyncLogLocker : public StackObj {
static Thread* _holder;
public:
static Thread* current_holder() { return _holder; }
AsyncLogLocker() {
assert(_instance != nullptr, "AsyncLogWriter::_lock is unavailable");
_instance->_lock.lock();
Locker(Thread*& holder, PlatformMonitor& lock)
: _holder(holder),
_lock(lock) {
_lock.lock();
_holder = Thread::current_or_null();
}
~AsyncLogLocker() {
~Locker() {
assert(_holder == Thread::current_or_null(), "must be");
_holder = nullptr;
_instance->_lock.unlock();
_lock.unlock();
}
void notify() {
_lock.notify();
}
void wait() {
Thread* saved_holder = _holder;
_holder = nullptr;
_instance->_lock.wait(0/* no timeout */);
_lock.wait(0 /* no timeout */);
_holder = saved_holder;
}
};
Thread* AsyncLogWriter::AsyncLogLocker::_holder = nullptr;
class AsyncLogWriter::ProducerLocker : public Locker {
static Thread* _holder;
public:
static Thread* current_holder() { return _holder; }
ProducerLocker() : Locker(_holder, _instance->_producer_lock) {}
};
class AsyncLogWriter::ConsumerLocker : public Locker {
static Thread* _holder;
public:
static Thread* current_holder() { return _holder; }
ConsumerLocker() : Locker(_holder, _instance->_consumer_lock) {}
};
Thread* AsyncLogWriter::ProducerLocker::_holder = nullptr;
Thread* AsyncLogWriter::ConsumerLocker::_holder = nullptr;
// LogDecorator::None applies to 'constant initialization' because of its constexpr constructor.
const LogDecorations& AsyncLogWriter::None = LogDecorations(LogLevel::Warning, LogTagSetMapping<LogTag::__NO_TAG>::tagset(),
LogDecorators::None);
bool AsyncLogWriter::Buffer::push_back(LogFileStreamOutput* output, const LogDecorations& decorations, const char* msg) {
const size_t len = strlen(msg);
bool AsyncLogWriter::Buffer::push_back(LogFileStreamOutput* output, const LogDecorations& decorations, const char* msg, const size_t msg_len) {
const size_t len = msg_len;
const size_t sz = Message::calc_size(len);
const bool is_token = output == nullptr;
// Always leave headroom for the flush token. Pushing a token must succeed.
@ -80,7 +98,7 @@ bool AsyncLogWriter::Buffer::push_back(LogFileStreamOutput* output, const LogDec
}
void AsyncLogWriter::Buffer::push_flush_token() {
bool result = push_back(nullptr, AsyncLogWriter::None, "");
bool result = push_back(nullptr, AsyncLogWriter::None, "", 0);
assert(result, "fail to enqueue the flush token.");
}
@ -89,22 +107,45 @@ void AsyncLogWriter::enqueue_locked(LogFileStreamOutput* output, const LogDecora
// client should use "" instead.
assert(msg != nullptr, "enqueuing a null message!");
if (!_buffer->push_back(output, decorations, msg)) {
bool p_created;
uint32_t* counter = _stats.put_if_absent(output, 0, &p_created);
*counter = *counter + 1;
return;
}
size_t msg_len = strlen(msg);
void* stalled_message = nullptr;
{
ConsumerLocker clocker;
if (_buffer->push_back(output, decorations, msg, msg_len)) {
_data_available = true;
clocker.notify();
return;
}
_data_available = true;
_lock.notify();
if (LogConfiguration::async_mode() == LogConfiguration::AsyncMode::Stall) {
size_t size = Message::calc_size(msg_len);
stalled_message = os::malloc(size, mtLogging);
if (stalled_message == nullptr) {
// Out of memory. We bail without any notice.
// Some other part of the system will probably fail later.
return;
}
_stalled_message = new (stalled_message) Message(output, decorations, msg, msg_len);
_data_available = true;
clocker.notify();
// Note: we still hold the producer lock so cannot race against other threads trying to log a message
while (_stalled_message != nullptr) {
clocker.wait();
}
} else {
bool p_created;
uint32_t* counter = _stats.put_if_absent(output, 0, &p_created);
*counter = *counter + 1;
}
} // ConsumerLocker out of scope
os::free(stalled_message);
}
// This function checks for cases where continuing with asynchronous logging may lead to stability issues, such as a deadlock.
// If this returns false then we give up on logging asynchronously and do so synchronously instead.
bool AsyncLogWriter::is_enqueue_allowed() {
AsyncLogWriter* alw = AsyncLogWriter::instance();
Thread* holding_thread = AsyncLogWriter::AsyncLogLocker::current_holder();
Thread* holding_thread = AsyncLogWriter::ProducerLocker::current_holder();
Thread* this_thread = Thread::current_or_null();
if (this_thread == nullptr) {
// The current thread is unattached.
@ -142,7 +183,7 @@ bool AsyncLogWriter::enqueue(LogFileStreamOutput& output, const LogDecorations&
return false;
}
AsyncLogLocker locker;
ProducerLocker plocker;
#ifdef ASSERT
if (TestingAsyncLoggingDeathTest || TestingAsyncLoggingDeathTestNoCrash) {
@ -162,7 +203,7 @@ bool AsyncLogWriter::enqueue(LogFileStreamOutput& output, LogMessageBuffer::Iter
}
// If we get here we know the AsyncLogWriter is initialized.
AsyncLogLocker locker;
ProducerLocker plocker;
for (; !msg_iterator.is_at_end(); msg_iterator++) {
AsyncLogWriter::instance()->enqueue_locked(&output, msg_iterator.decorations(), msg_iterator.message());
}
@ -170,9 +211,13 @@ bool AsyncLogWriter::enqueue(LogFileStreamOutput& output, LogMessageBuffer::Iter
}
AsyncLogWriter::AsyncLogWriter()
: _flush_sem(0), _lock(), _data_available(false),
_initialized(false),
_stats() {
: _flush_sem(0),
_producer_lock(),
_consumer_lock(),
_data_available(false),
_initialized(false),
_stats(),
_stalled_message(nullptr) {
size_t size = AsyncLogBufferSize / 2;
_buffer = new Buffer(size);
@ -185,7 +230,7 @@ AsyncLogWriter::AsyncLogWriter()
}
}
void AsyncLogWriter::write(AsyncLogMap<AnyObj::RESOURCE_AREA>& snapshot) {
bool AsyncLogWriter::write(AsyncLogMap<AnyObj::RESOURCE_AREA>& snapshot) {
int req = 0;
auto it = _buffer_staging->iterator();
while (it.hasNext()) {
@ -213,8 +258,9 @@ void AsyncLogWriter::write(AsyncLogMap<AnyObj::RESOURCE_AREA>& snapshot) {
if (req > 0) {
assert(req == 1, "Only one token is allowed in queue. AsyncLogWriter::flush() is NOT MT-safe!");
_flush_sem.signal(req);
return true;
}
return false;
}
void AsyncLogWriter::run() {
@ -222,11 +268,11 @@ void AsyncLogWriter::run() {
ResourceMark rm;
AsyncLogMap<AnyObj::RESOURCE_AREA> snapshot;
{
AsyncLogLocker locker;
ConsumerLocker clocker;
while (!_data_available) {
locker.wait();
clocker.wait();
}
// Only doing a swap and statistics under the lock to
// guarantee that I/O jobs don't block logsites.
_buffer_staging->reset();
@ -243,7 +289,23 @@ void AsyncLogWriter::run() {
});
_data_available = false;
}
write(snapshot);
bool saw_flush_token = write(snapshot);
// Any stalled message must be written *after* the buffer has been written.
// This is because we try hard to output messages in program-order.
if (_stalled_message != nullptr) {
assert(LogConfiguration::async_mode() == LogConfiguration::AsyncMode::Stall, "must be");
ConsumerLocker clocker;
Message* m = (Message*)_stalled_message;
m->output()->write_blocking(m->decorations(), m->message());
_stalled_message = nullptr;
clocker.notify();
}
if (saw_flush_token) {
_flush_sem.signal(1);
}
}
}
@ -281,11 +343,12 @@ AsyncLogWriter* AsyncLogWriter::instance() {
void AsyncLogWriter::flush() {
if (_instance != nullptr) {
{
AsyncLogLocker locker;
ProducerLocker plocker;
ConsumerLocker clocker;
// Push directly in-case we are at logical max capacity, as this must not get dropped.
_instance->_buffer->push_flush_token();
_instance->_data_available = true;
_instance->_lock.notify();
clocker.notify();
}
_instance->_flush_sem.wait();
@ -293,7 +356,7 @@ void AsyncLogWriter::flush() {
}
AsyncLogWriter::BufferUpdater::BufferUpdater(size_t newsize) {
AsyncLogLocker locker;
ConsumerLocker clocker;
auto p = AsyncLogWriter::_instance;
_buf1 = p->_buffer;
@ -307,7 +370,7 @@ AsyncLogWriter::BufferUpdater::~BufferUpdater() {
auto p = AsyncLogWriter::_instance;
{
AsyncLogLocker locker;
ConsumerLocker clocker;
delete p->_buffer;
delete p->_buffer_staging;

View File

@ -29,6 +29,7 @@
#include "logging/logMessageBuffer.hpp"
#include "memory/allocation.hpp"
#include "runtime/mutex.hpp"
#include "runtime/os.inline.hpp"
#include "runtime/nonJavaThread.hpp"
#include "runtime/semaphore.hpp"
#include "utilities/resourceHash.hpp"
@ -59,7 +60,9 @@ class LogFileStreamOutput;
class AsyncLogWriter : public NonJavaThread {
friend class AsyncLogTest;
friend class AsyncLogTest_logBuffer_vm_Test;
class AsyncLogLocker;
class Locker;
class ProducerLocker;
class ConsumerLocker;
// account for dropped messages
template <AnyObj::allocation_type ALLOC_TYPE>
@ -125,7 +128,7 @@ class AsyncLogWriter : public NonJavaThread {
}
void push_flush_token();
bool push_back(LogFileStreamOutput* output, const LogDecorations& decorations, const char* msg);
bool push_back(LogFileStreamOutput* output, const LogDecorations& decorations, const char* msg, const size_t msg_len);
void reset() {
// Ensure _pos is Message-aligned
@ -159,8 +162,13 @@ class AsyncLogWriter : public NonJavaThread {
static AsyncLogWriter* _instance;
Semaphore _flush_sem;
// Can't use a Monitor here as we need a low-level API that can be used without Thread::current().
PlatformMonitor _lock;
// Producers take both locks in the order producer lock and then consumer lock.
// The consumer protects the buffers and performs all communication between producer and consumer via wait/notify.
// This allows a producer to await progress from the consumer thread (by only releasing the producer lock)), whilst preventing all other producers from progressing.
PlatformMonitor _producer_lock;
PlatformMonitor _consumer_lock;
bool _data_available;
// _initialized is set to true if the constructor succeeds
volatile bool _initialized;
AsyncLogMap<AnyObj::C_HEAP> _stats;
@ -168,11 +176,17 @@ class AsyncLogWriter : public NonJavaThread {
Buffer* _buffer;
Buffer* _buffer_staging;
// Stalled message
// Stalling is implemented by the producer writing to _stalled_message, notifying the consumer lock and releasing it.
// The consumer will then write all of the current buffers' content and then write the stalled message, at the end notifying the consumer lock and releasing it for the
// owning producer thread of the stalled message. This thread will finally release both locks in order, allowing for other producers to continue.
volatile Message* _stalled_message;
static const LogDecorations& None;
AsyncLogWriter();
void enqueue_locked(LogFileStreamOutput* output, const LogDecorations& decorations, const char* msg);
void write(AsyncLogMap<AnyObj::RESOURCE_AREA>& snapshot);
bool write(AsyncLogMap<AnyObj::RESOURCE_AREA>& snapshot);
void run() override;
void pre_run() override {
NonJavaThread::pre_run();

View File

@ -636,10 +636,15 @@ void LogConfiguration::print_command_line_help(outputStream* out) {
out->cr();
out->print_cr("Asynchronous logging (off by default):");
out->print_cr(" -Xlog:async");
out->print_cr(" -Xlog:async[:[mode]]");
out->print_cr(" All log messages are written to an intermediate buffer first and will then be flushed"
" to the corresponding log outputs by a standalone thread. Write operations at logsites are"
" guaranteed non-blocking.");
out->print_cr(" A mode, either 'drop' or 'stall', may be provided. If 'drop' is provided then"
" messages will be dropped if there is no room in the intermediate buffer."
" If 'stall' is provided then the log operation will wait for room to be made by the output thread, without dropping any messages."
" The default mode is 'drop'.");
out->cr();
out->print_cr("Some examples:");
@ -715,4 +720,20 @@ void LogConfiguration::notify_update_listeners() {
}
}
bool LogConfiguration::_async_mode = false;
LogConfiguration::AsyncMode LogConfiguration::_async_mode = AsyncMode::Off;
bool LogConfiguration::parse_async_argument(const char* async_tail) {
bool ret = true;
if (*async_tail == '\0') {
// Default is to drop.
LogConfiguration::set_async_mode(LogConfiguration::AsyncMode::Drop);
} else if (strcmp(async_tail, ":stall") == 0) {
LogConfiguration::set_async_mode(LogConfiguration::AsyncMode::Stall);
} else if (strcmp(async_tail, ":drop") == 0) {
LogConfiguration::set_async_mode(LogConfiguration::AsyncMode::Drop);
} else {
// User provided unknown async option
ret = false;
}
return ret;
}

View File

@ -62,7 +62,14 @@ class LogConfiguration : public AllStatic {
static UpdateListenerFunction* _listener_callbacks;
static size_t _n_listener_callbacks;
static bool _async_mode;
public:
enum class AsyncMode {
Off, Stall, Drop
};
private:
static AsyncMode _async_mode;
// Create a new output. Returns null if failed.
static LogOutput* new_output(const char* name, const char* options, outputStream* errstream);
@ -120,6 +127,8 @@ class LogConfiguration : public AllStatic {
const char* output_options,
outputStream* errstream);
static bool parse_async_argument(const char* async_tail);
// Prints log configuration to outputStream, used by JCmd/MBean.
static void describe(outputStream* out);
@ -129,9 +138,10 @@ class LogConfiguration : public AllStatic {
// Rotates all LogOutput
static void rotate_all_outputs();
static bool is_async_mode() { return _async_mode; }
static void set_async_mode(bool value) {
_async_mode = value;
static AsyncMode async_mode() { return _async_mode; }
static bool is_async_mode() { return _async_mode != AsyncMode::Off; }
static void set_async_mode(AsyncMode mode) {
_async_mode = mode;
}
};

View File

@ -2605,9 +2605,9 @@ jint Arguments::parse_each_vm_init_arg(const JavaVMInitArgs* args, JVMFlagOrigin
} else if (strcmp(tail, ":disable") == 0) {
LogConfiguration::disable_logging();
ret = true;
} else if (strcmp(tail, ":async") == 0) {
LogConfiguration::set_async_mode(true);
ret = true;
} else if (strncmp(tail, ":async", strlen(":async")) == 0) {
const char* async_tail = tail + strlen(":async");
ret = LogConfiguration::parse_async_argument(async_tail);
} else if (*tail == '\0') {
ret = LogConfiguration::parse_command_line_arguments();
assert(ret, "-Xlog without arguments should never fail to parse");

View File

@ -1871,7 +1871,7 @@ const int ObjectAlignmentInBytes = 8;
product(size_t, AsyncLogBufferSize, 2*M, \
"Memory budget (in bytes) for the buffer of Asynchronous " \
"Logging (-Xlog:async).") \
range(100*K, 50*M) \
range(DEBUG_ONLY(192) NOT_DEBUG(100*K), 50*M) \
\
product(bool, CheckIntrinsics, true, DIAGNOSTIC, \
"When a class C is loaded, check that " \

View File

@ -3357,16 +3357,18 @@ getting overwritten.
### -Xlog Output Mode
By default logging messages are output synchronously - each log message is written to
the designated output when the logging call is made. But you can instead use asynchronous
the designated output when the logging call is made. You can instead use asynchronous
logging mode by specifying:
`-Xlog:async`
`-Xlog:async[:[stall|drop]]`
: Write all logging asynchronously.
In asynchronous logging mode, log sites enqueue all logging messages to an intermediate buffer
and a standalone thread is responsible for flushing them to the corresponding outputs. The
intermediate buffer is bounded and on buffer exhaustion the enqueuing message is discarded.
Log entry write operations are guaranteed non-blocking.
intermediate buffer is bounded. On buffer exhaustion the enqueuing message is either discarded (`async:drop`),
or logging threads are stalled until the flushing thread catches up (`async:stall`).
If no specific mode is chosen, then `async:drop` is chosen by default.
Log entry write operations are guaranteed to be non-blocking in the `async:drop` case.
The option `-XX:AsyncLogBufferSize=N` specifies the memory budget in bytes for the intermediate buffer.
The default value should be big enough to cater for most cases. Users can provide a custom value to

View File

@ -25,6 +25,7 @@
#include "jvm.h"
#include "logging/log.hpp"
#include "logging/logAsyncWriter.hpp"
#include "logging/logConfiguration.hpp"
#include "logging/logFileOutput.hpp"
#include "logging/logMessage.hpp"
#include "logTestFixture.hpp"
@ -173,10 +174,10 @@ TEST_VM_F(AsyncLogTest, logBuffer) {
const uintptr_t mask = (uintptr_t)(sizeof(void*) - 1);
bool res;
res = buffer->push_back(output, Default, "a log line");
res = buffer->push_back(output, Default, "a log line", strlen("a log line"));
EXPECT_TRUE(res) << "first message should succeed.";
line++;
res = buffer->push_back(output, Default, "yet another");
res = buffer->push_back(output, Default, "yet another", strlen("yet another"));
EXPECT_TRUE(res) << "second message should succeed.";
line++;
@ -201,7 +202,7 @@ TEST_VM_F(AsyncLogTest, logBuffer) {
written = e->output()->write_blocking(e->decorations(), e->message());
EXPECT_GT(written, 0);
while (buffer->push_back(output, Default, "0123456789abcdef")) {
while (buffer->push_back(output, Default, "0123456789abcdef", strlen("0123456789abcdef"))) {
line++;
}
@ -252,6 +253,18 @@ TEST_VM_F(AsyncLogTest, droppingMessage) {
EXPECT_TRUE(file_contains_substring(TestLogFileName, "messages dropped due to async logging"));
}
TEST_VM_F(AsyncLogTest, StallingModePreventsDroppedMessages) {
if (AsyncLogWriter::instance() == nullptr) {
return;
}
set_log_config(TestLogFileName, "logging=debug");
LogConfiguration::AsyncMode prev_mode = LogConfiguration::async_mode();
LogConfiguration::set_async_mode(LogConfiguration::AsyncMode::Off);
test_asynclog_drop_messages();
EXPECT_FALSE(file_contains_substring(TestLogFileName, "messages dropped due to async logging"));
LogConfiguration::set_async_mode(prev_mode);
}
TEST_VM_F(AsyncLogTest, stdoutOutput) {
testing::internal::CaptureStdout();

View File

@ -0,0 +1,57 @@
/*
* Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* @test
* @summary Stress test async UL in dropping and stalling mode
* @requires vm.flagless
* @requires vm.debug
* @library /test/lib
* @modules java.base/jdk.internal.misc
* @run driver StressAsyncUL
*/
import jdk.test.lib.process.OutputAnalyzer;
import jdk.test.lib.process.ProcessTools;
public class StressAsyncUL {
static void analyze_output(String... args) throws Exception {
ProcessBuilder pb =
ProcessTools.createLimitedTestJavaProcessBuilder(args);
OutputAnalyzer output = new OutputAnalyzer(pb.start());
output.shouldHaveExitValue(0);
}
public static void main(String[] args) throws Exception {
analyze_output("-Xlog:async:drop", "-Xlog:all=trace", InnerClass.class.getName());
analyze_output("-Xlog:async:stall", "-Xlog:all=trace", InnerClass.class.getName());
// Stress test with a very small buffer. Note: Any valid buffer size must be able to hold a flush token.
// Therefore the size of the buffer cannot be zero.
analyze_output("-Xlog:async:drop", "-Xlog:all=trace", "-XX:AsyncLogBufferSize=192", InnerClass.class.getName());
analyze_output("-Xlog:async:stall", "-Xlog:all=trace", "-XX:AsyncLogBufferSize=192", InnerClass.class.getName());
}
public static class InnerClass {
public static void main(String[] args) {
}
}
}