Remove excessive locking

This commit is contained in:
Daniel Jelinski 2026-01-27 11:48:42 +01:00
parent 38b66b1258
commit 2504e17096

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2022, 2024, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2022, 2026, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -292,45 +292,46 @@ final class Http3ServerStreamImpl {
} }
class RequestBodyInputStream extends InputStream { class RequestBodyInputStream extends InputStream {
// non-null if the QUIC stream was reset, or an exception occurred
volatile IOException error; volatile IOException error;
// true if close() was called
volatile boolean closed; volatile boolean closed;
// uses an unbounded blocking queue in which the readrLoop // uses an unbounded blocking queue in which the readrLoop
// publishes the DataFrames payload... // publishes the DataFrames payload...
ByteBuffer current; ByteBuffer current;
// Use lock to avoid pinned threads on the blocking queue
final ReentrantLock lock = new ReentrantLock();
ByteBuffer current() throws IOException { ByteBuffer current() throws IOException {
lock.lock(); while (true) {
try { if (current != null && current.hasRemaining()) {
while (true) { return current;
if (current != null && current.hasRemaining()) { }
return current; if (current == QuicStreamReader.EOF) return current;
} try {
if (current == QuicStreamReader.EOF) return current; if (debug.on())
try { debug.log("Taking buffer from queue");
if (debug.on()) // Blocking call
debug.log("Taking buffer from queue"); current = requestBodyQueue.take();
// Blocking call } catch (InterruptedException e) {
current = requestBodyQueue.take(); var io = new InterruptedIOException();
} catch (InterruptedException e) { Thread.currentThread().interrupt();
var io = new InterruptedIOException(); io.initCause(e);
Thread.currentThread().interrupt(); close(io);
io.initCause(e); var error = this.error;
close(io); if (error != null) throw error;
var error = this.error;
if (error != null) throw error;
}
} }
} finally {
lock.unlock();
} }
} }
@Override @Override
public int read() throws IOException { public int read() throws IOException {
if (closed) {
throw new IOException("Stream is closed");
}
ByteBuffer buffer = current(); ByteBuffer buffer = current();
if (buffer == QuicStreamReader.EOF) { if (buffer == QuicStreamReader.EOF) {
if (closed) {
throw new IOException("Stream is closed");
}
var error = this.error; var error = this.error;
if (error == null) return -1; if (error == null) return -1;
throw error; throw error;
@ -341,11 +342,17 @@ final class Http3ServerStreamImpl {
@Override @Override
public int read(byte[] b, int off, int len) throws IOException { public int read(byte[] b, int off, int len) throws IOException {
Objects.checkFromIndexSize(off, len, b.length); Objects.checkFromIndexSize(off, len, b.length);
if (closed) {
throw new IOException("Stream is closed");
}
int remaining = len; int remaining = len;
while (remaining > 0) { while (remaining > 0) {
ByteBuffer buffer = current(); ByteBuffer buffer = current();
if (buffer == QuicStreamReader.EOF) { if (buffer == QuicStreamReader.EOF) {
if (len == remaining) { if (len == remaining) {
if (closed) {
throw new IOException("Stream is closed");
}
var error = this.error; var error = this.error;
if (error == null) return -1; if (error == null) return -1;
throw error; throw error;
@ -360,32 +367,19 @@ final class Http3ServerStreamImpl {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
lock.lock(); if (closed) return;
try { closed = true;
if (closed) return;
closed = true;
} finally {
lock.unlock();
}
if (debug.on()) if (debug.on())
debug.log("Closing request body input stream"); debug.log("Closing request body input stream");
requestBodyQueue.add(QuicStreamReader.EOF); requestBodyQueue.add(QuicStreamReader.EOF);
} }
void close(IOException io) { void close(IOException io) {
lock.lock(); if (error != null) return;
try { error = io;
if (closed) return;
closed = true;
error = io;
} finally {
lock.unlock();
}
if (debug.on()) { if (debug.on()) {
debug.log("Closing request body input stream: " + io); debug.log("Closing request body input stream: " + io);
} }
requestBodyQueue.clear();
requestBodyQueue.add(QuicStreamReader.EOF); requestBodyQueue.add(QuicStreamReader.EOF);
} }
} }