8248564: JFR: Remote Recording Stream

Reviewed-by: mgronlun
This commit is contained in:
Erik Gahlin 2020-11-30 08:19:08 +00:00
parent 9bcd2695c3
commit 738efea9c6
44 changed files with 3279 additions and 147 deletions

View File

@ -103,8 +103,8 @@ class JfrChunkHeadWriter : public StackObj {
_writer->be_write(PAD);
}
void write_next_generation() {
_writer->be_write(_chunk->next_generation());
void write_next_generation(bool finalize) {
_writer->be_write(finalize ? COMPLETE : _chunk->next_generation());
_writer->be_write(PAD);
}
@ -199,9 +199,9 @@ int64_t JfrChunkWriter::write_chunk_header_checkpoint(bool flushpoint) {
const int64_t chunk_size_offset = reserve(sizeof(int64_t)); // size to be decided when we are done
be_write(event_size_offset); // last checkpoint offset will be this checkpoint
head.write_metadata();
head.write_time(false);
head.write_time(!flushpoint);
head.write_cpu_frequency();
head.write_next_generation();
head.write_next_generation(!flushpoint);
head.write_flags();
assert(current_offset() - header_content_pos == HEADER_SIZE, "invariant");
const u4 checkpoint_size = current_offset() - event_size_offset;

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -28,6 +28,8 @@ package jdk.jfr;
import java.time.Duration;
import java.util.Map;
import jdk.jfr.internal.management.EventSettingsModifier;
/**
* Convenience class for applying event settings to a recording.
* <p>
@ -55,6 +57,26 @@ import java.util.Map;
*/
public abstract class EventSettings {
// Used to provide EventSettings for jdk.management.jfr module
static class DelegatedEventSettings extends EventSettings {
private final EventSettingsModifier delegate;
DelegatedEventSettings(EventSettingsModifier modifier) {
this.delegate = modifier;
}
@Override
public EventSettings with(String name, String value) {
delegate.with(name, value);
return this;
}
@Override
Map<String, String> toMap() {
return delegate.toMap();
}
}
// package private
EventSettings() {
}

View File

@ -37,6 +37,7 @@ import jdk.jfr.internal.PlatformRecording;
import jdk.jfr.internal.PrivateAccess;
import jdk.jfr.internal.Type;
import jdk.jfr.internal.Utils;
import jdk.jfr.internal.management.EventSettingsModifier;
/**
* Permission for controlling access to Flight Recorder.
@ -197,6 +198,11 @@ public final class FlightRecorderPermission extends java.security.BasicPermissio
public AccessControlContext getContext(SettingControl settingControl) {
return settingControl.getContext();
}
@Override
public EventSettings newEventSettings(EventSettingsModifier esm) {
return new EventSettings.DelegatedEventSettings(esm);
}
}
/**

View File

@ -31,6 +31,7 @@ import java.security.AccessControlContext;
import java.security.AccessController;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Objects;
import java.util.function.Consumer;
@ -137,7 +138,7 @@ public interface EventStream extends AutoCloseable {
*/
public static EventStream openRepository() throws IOException {
Utils.checkAccessFlightRecorder();
return new EventDirectoryStream(AccessController.getContext(), null, SecuritySupport.PRIVILEGED, null);
return new EventDirectoryStream(AccessController.getContext(), null, SecuritySupport.PRIVILEGED, null, Collections.emptyList());
}
/**
@ -160,7 +161,7 @@ public interface EventStream extends AutoCloseable {
public static EventStream openRepository(Path directory) throws IOException {
Objects.nonNull(directory);
AccessControlContext acc = AccessController.getContext();
return new EventDirectoryStream(acc, directory, FileAccess.UNPRIVILEGED, null);
return new EventDirectoryStream(acc, directory, FileAccess.UNPRIVILEGED, null, Collections.emptyList());
}
/**
@ -182,6 +183,22 @@ public interface EventStream extends AutoCloseable {
return new EventFileStream(AccessController.getContext(), file);
}
/**
* Registers an action to perform when new metadata arrives in the stream.
*
* The event type of an event always arrives sometime before the actual event.
* The action must be registered before the stream is started.
*
* @implSpec The default implementation of this method is empty.
*
* @param action to perform, not {@code null}
*
* @throws IllegalStateException if an action is added after the stream has
* started
*/
default void onMetadata(Consumer<MetadataEvent> action) {
}
/**
* Registers an action to perform on all events in the stream.
*

View File

@ -0,0 +1,118 @@
/*
* Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.jfr.consumer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import jdk.jfr.Configuration;
import jdk.jfr.EventType;
/**
* Event that contains information about event types and configurations.
*
* @since 16
*/
public final class MetadataEvent {
private final List<EventType> current;
private final List<EventType> previous;
private final List<Configuration> configurations;
private List<EventType> added;
private List<EventType> removed;
/* package private */
MetadataEvent(List<EventType> previous, List<EventType> current, List<Configuration> configs) {
this.previous = previous;
this.current = current;
this.configurations = configs;
}
/**
* Returns a list of the current event types being used.
*
* @return an immutable list of event types, not {@code null}
*/
public final List<EventType> getEventTypes() {
return Collections.unmodifiableList(current);
}
/**
* Returns a list of added event types since the last metadata event.
* <p>
* The delta will be from the last metadata event. If no metadata event has been
* emitted earlier, all known event types will be in the list.
*
* @return an immutable list of added event types, not {@code null}
*/
public final List<EventType> getAddedEventTypes() {
if (added == null) {
calculateDelta();
}
return added;
}
/**
* Returns a list of removed event types since the last metadata event.
* <p>
* The delta will be from the last metadata event. If no metadata event has been
* emitted earlier, the list will be empty.
*
* @return an immutable list of added event types, not {@code null}
*/
public final List<EventType> getRemovedEventTypes() {
if (removed == null) {
calculateDelta();
}
return removed;
}
/**
* Returns a list of configurations.
*
* @return an immutable list of configurations, not {@code null}
*/
public List<Configuration> getConfigurations() {
return configurations;
}
private void calculateDelta() {
List<EventType> added = new ArrayList<>();
Map<Long, EventType> previousSet = new HashMap<>(previous.size());
for (EventType eventType : previous) {
previousSet.put(eventType.getId(), eventType);
}
for (EventType eventType : current) {
EventType t = previousSet.remove(eventType.getId());
if (t == null) {
added.add(eventType);
}
}
this.removed = Collections.unmodifiableList(new ArrayList<>(previousSet.values()));
this.added = Collections.unmodifiableList(added);
}
}

View File

@ -34,15 +34,19 @@ import java.time.OffsetDateTime;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import jdk.jfr.Configuration;
import jdk.jfr.EventType;
import jdk.jfr.Timespan;
import jdk.jfr.Timestamp;
import jdk.jfr.ValueDescriptor;
import jdk.jfr.internal.consumer.JdkJfrConsumer;
import jdk.jfr.internal.consumer.ObjectFactory;
import jdk.jfr.internal.PrivateAccess;
import jdk.jfr.internal.Type;
import jdk.jfr.internal.consumer.EventDirectoryStream;
import jdk.jfr.internal.consumer.JdkJfrConsumer;
import jdk.jfr.internal.consumer.ObjectContext;
import jdk.jfr.internal.consumer.ObjectFactory;
import jdk.jfr.internal.tool.PrettyWriter;
/**
@ -136,6 +140,12 @@ public class RecordedObject {
public Object[] eventValues(RecordedEvent event) {
return event.objects;
}
@Override
public MetadataEvent newMetadataEvent(List<EventType> previous, List<EventType> current,
List<Configuration> configurations) {
return new MetadataEvent(previous, current, configurations);
}
};
JdkJfrConsumer.setAccess(access);
}

View File

@ -30,6 +30,8 @@ import java.security.AccessControlContext;
import java.security.AccessController;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
@ -43,6 +45,7 @@ import jdk.jfr.internal.PrivateAccess;
import jdk.jfr.internal.SecuritySupport;
import jdk.jfr.internal.Utils;
import jdk.jfr.internal.consumer.EventDirectoryStream;
import jdk.jfr.internal.consumer.JdkJfrConsumer;
/**
* A recording stream produces events from the current JVM (Java Virtual
@ -85,13 +88,21 @@ public final class RecordingStream implements AutoCloseable, EventStream {
this.recording = new Recording();
try {
PlatformRecording pr = PrivateAccess.getInstance().getPlatformRecording(recording);
this.directoryStream = new EventDirectoryStream(acc, null, SecuritySupport.PRIVILEGED, pr);
this.directoryStream = new EventDirectoryStream(acc, null, SecuritySupport.PRIVILEGED, pr, configurations());
} catch (IOException ioe) {
this.recording.close();
throw new IllegalStateException(ioe.getMessage());
}
}
private List<Configuration> configurations() {
try {
return Configuration.getConfigurations();
} catch (Exception e) {
return Collections.emptyList();
}
}
/**
* Creates a recording stream using settings from a configuration.
* <p>
@ -361,4 +372,9 @@ public final class RecordingStream implements AutoCloseable, EventStream {
public void awaitTermination() throws InterruptedException {
directoryStream.awaitTermination();
}
@Override
public void onMetadata(Consumer<MetadataEvent> action) {
directoryStream.onMetadata(action);
}
}

View File

@ -86,6 +86,7 @@ public final class PlatformRecording implements AutoCloseable {
private boolean shouldWriteActiveRecordingEvent = true;
private Duration flushInterval = Duration.ofSeconds(1);
private long finalStartChunkNanos = Long.MIN_VALUE;
private long startNanos = -1;
PlatformRecording(PlatformRecorder recorder, long id) {
// Typically the access control context is taken
@ -103,7 +104,6 @@ public final class PlatformRecording implements AutoCloseable {
public long start() {
RecordingState oldState;
RecordingState newState;
long startNanos = -1;
synchronized (recorder) {
oldState = getState();
if (!Utils.isBefore(state, RecordingState.RUNNING)) {
@ -826,6 +826,10 @@ public final class PlatformRecording implements AutoCloseable {
}
}
public long getStartNanos() {
return startNanos;
}
public long getFinalChunkStartNanos() {
return finalStartChunkNanos;
}
@ -833,4 +837,18 @@ public final class PlatformRecording implements AutoCloseable {
public void setFinalStartnanos(long chunkStartNanos) {
this.finalStartChunkNanos = chunkStartNanos;
}
public void removeBefore(Instant timestamp) {
synchronized (recorder) {
while (!chunks.isEmpty()) {
RepositoryChunk oldestChunk = chunks.peek();
if (!oldestChunk.getEndTime().isBefore(timestamp)) {
return;
}
chunks.removeFirst();
removed(oldestChunk);
}
}
}
}

View File

@ -31,12 +31,14 @@ import java.util.Map;
import jdk.jfr.AnnotationElement;
import jdk.jfr.Configuration;
import jdk.jfr.EventSettings;
import jdk.jfr.EventType;
import jdk.jfr.FlightRecorderPermission;
import jdk.jfr.Recording;
import jdk.jfr.SettingControl;
import jdk.jfr.SettingDescriptor;
import jdk.jfr.ValueDescriptor;
import jdk.jfr.internal.management.EventSettingsModifier;
/**
* Provides access to package private function in jdk.jfr.
@ -98,4 +100,6 @@ public abstract class PrivateAccess {
public abstract PlatformRecorder getPlatformRecorder();
public abstract AccessControlContext getContext(SettingControl sc);
public abstract EventSettings newEventSettings(EventSettingsModifier esm);
}

View File

@ -742,6 +742,12 @@ public final class Utils {
}
public static Instant epochNanosToInstant(long epochNanos) {
long epochSeconds = epochNanos / 1_000_000_000L;
long nanoAdjustment = epochNanos - 1_000_000_000L * epochSeconds;
return Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
}
public static long timeToNanos(Instant timestamp) {
return timestamp.getEpochSecond() * 1_000_000_000L + timestamp.getNano();
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2019, 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -31,11 +31,16 @@ import java.security.AccessController;
import java.security.PrivilegedAction;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import jdk.jfr.Configuration;
import jdk.jfr.EventType;
import jdk.jfr.consumer.EventStream;
import jdk.jfr.consumer.MetadataEvent;
import jdk.jfr.consumer.RecordedEvent;
import jdk.jfr.internal.LogLevel;
import jdk.jfr.internal.LogTag;
@ -47,23 +52,25 @@ import jdk.jfr.internal.SecuritySupport;
* Purpose of this class is to simplify the implementation of
* an event stream.
*/
abstract class AbstractEventStream implements EventStream {
public abstract class AbstractEventStream implements EventStream {
private final static AtomicLong counter = new AtomicLong();
private final Object terminated = new Object();
private final Runnable flushOperation = () -> dispatcher().runFlushActions();
private final AccessControlContext accessControllerContext;
private final StreamConfiguration configuration = new StreamConfiguration();
private final PlatformRecording recording;
private final StreamConfiguration streamConfiguration = new StreamConfiguration();
protected final PlatformRecording recording;
private final List<Configuration> configurations;
private volatile Thread thread;
private Dispatcher dispatcher;
private volatile boolean closed;
AbstractEventStream(AccessControlContext acc, PlatformRecording recording) throws IOException {
AbstractEventStream(AccessControlContext acc, PlatformRecording recording, List<Configuration> configurations) throws IOException {
this.accessControllerContext = Objects.requireNonNull(acc);
this.recording = recording;
this.configurations = configurations;
}
@Override
@ -76,10 +83,10 @@ abstract class AbstractEventStream implements EventStream {
abstract public void close();
protected final Dispatcher dispatcher() {
if (configuration.hasChanged()) { // quick check
synchronized (configuration) {
dispatcher = new Dispatcher(configuration);
configuration.setChanged(false);
if (streamConfiguration.hasChanged()) { // quick check
synchronized (streamConfiguration) {
dispatcher = new Dispatcher(streamConfiguration);
streamConfiguration.setChanged(false);
}
}
return dispatcher;
@ -87,74 +94,74 @@ abstract class AbstractEventStream implements EventStream {
@Override
public final void setOrdered(boolean ordered) {
configuration.setOrdered(ordered);
streamConfiguration.setOrdered(ordered);
}
@Override
public final void setReuse(boolean reuse) {
configuration.setReuse(reuse);
streamConfiguration.setReuse(reuse);
}
@Override
public final void setStartTime(Instant startTime) {
Objects.nonNull(startTime);
synchronized (configuration) {
if (configuration.started) {
synchronized (streamConfiguration) {
if (streamConfiguration.started) {
throw new IllegalStateException("Stream is already started");
}
if (startTime.isBefore(Instant.EPOCH)) {
startTime = Instant.EPOCH;
}
configuration.setStartTime(startTime);
streamConfiguration.setStartTime(startTime);
}
}
@Override
public final void setEndTime(Instant endTime) {
Objects.requireNonNull(endTime);
synchronized (configuration) {
if (configuration.started) {
synchronized (streamConfiguration) {
if (streamConfiguration.started) {
throw new IllegalStateException("Stream is already started");
}
configuration.setEndTime(endTime);
streamConfiguration.setEndTime(endTime);
}
}
@Override
public final void onEvent(Consumer<RecordedEvent> action) {
Objects.requireNonNull(action);
configuration.addEventAction(action);
streamConfiguration.addEventAction(action);
}
@Override
public final void onEvent(String eventName, Consumer<RecordedEvent> action) {
Objects.requireNonNull(eventName);
Objects.requireNonNull(action);
configuration.addEventAction(eventName, action);
streamConfiguration.addEventAction(eventName, action);
}
@Override
public final void onFlush(Runnable action) {
Objects.requireNonNull(action);
configuration.addFlushAction(action);
streamConfiguration.addFlushAction(action);
}
@Override
public final void onClose(Runnable action) {
Objects.requireNonNull(action);
configuration.addCloseAction(action);
streamConfiguration.addCloseAction(action);
}
@Override
public final void onError(Consumer<Throwable> action) {
Objects.requireNonNull(action);
configuration.addErrorAction(action);
streamConfiguration.addErrorAction(action);
}
@Override
public final boolean remove(Object action) {
Objects.requireNonNull(action);
return configuration.remove(action);
return streamConfiguration.remove(action);
}
@Override
@ -227,14 +234,14 @@ abstract class AbstractEventStream implements EventStream {
}
private void startInternal(long startNanos) {
synchronized (configuration) {
if (configuration.started) {
synchronized (streamConfiguration) {
if (streamConfiguration.started) {
throw new IllegalStateException("Event stream can only be started once");
}
if (recording != null && configuration.startTime == null) {
configuration.setStartNanos(startNanos);
if (recording != null && streamConfiguration.startTime == null) {
streamConfiguration.setStartNanos(startNanos);
}
configuration.setStarted(true);
streamConfiguration.setStarted(true);
}
}
@ -272,4 +279,29 @@ abstract class AbstractEventStream implements EventStream {
counter.incrementAndGet();
return "JFR Event Stream " + counter;
}
@Override
public void onMetadata(Consumer<MetadataEvent> action) {
Objects.requireNonNull(action);
synchronized (streamConfiguration) {
if (streamConfiguration.started) {
throw new IllegalStateException("Stream is already started");
}
}
streamConfiguration.addMetadataAction(action);
}
protected final void emitMetadataEvent(ChunkParser parser) {
if (parser.hasStaleMetadata()) {
if (dispatcher.hasMetadataHandler()) {
List<EventType> ce = parser.getEventTypes();
List<EventType> pe = parser.getPreviousEventTypes();
if (ce != pe) {
MetadataEvent me = JdkJfrConsumer.instance().newMetadataEvent(pe, ce, configurations);
dispatcher.runMetadataActions(me);
}
parser.setStaleMetadata(false);
}
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -34,15 +34,15 @@ import jdk.jfr.internal.MetadataDescriptor;
import jdk.jfr.internal.Utils;
public final class ChunkHeader {
private static final long HEADER_SIZE = 68;
private static final byte UPDATING_CHUNK_HEADER = (byte) 255;
private static final long CHUNK_SIZE_POSITION = 8;
private static final long DURATION_NANOS_POSITION = 40;
private static final long FILE_STATE_POSITION = 64;
private static final long FLAG_BYTE_POSITION = 67;
private static final long METADATA_TYPE_ID = 0;
private static final byte[] FILE_MAGIC = { 'F', 'L', 'R', '\0' };
private static final int MASK_FINAL_CHUNK = 1 << 1;
static final long HEADER_SIZE = 68;
static final byte UPDATING_CHUNK_HEADER = (byte) 255;
static final long CHUNK_SIZE_POSITION = 8;
static final long DURATION_NANOS_POSITION = 40;
static final long FILE_STATE_POSITION = 64;
static final long FLAG_BYTE_POSITION = 67;
static final long METADATA_TYPE_ID = 0;
static final byte[] FILE_MAGIC = { 'F', 'L', 'R', '\0' };
static final int MASK_FINAL_CHUNK = 1 << 1;
private final short major;
private final short minor;
@ -90,8 +90,8 @@ public final class ChunkHeader {
if (major != 1 && major != 2) {
throw new IOException("File version " + major + "." + minor + ". Only Flight Recorder files of version 1.x and 2.x can be read by this JDK.");
}
input.readRawLong(); // chunk size
Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Chunk: chunkSize=" + chunkSize);
long c = input.readRawLong(); // chunk size
Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Chunk: chunkSize=" + c);
input.readRawLong(); // constant pool position
Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Chunk: constantPoolPosition=" + constantPoolPosition);
input.readRawLong(); // metadata position
@ -109,7 +109,7 @@ public final class ChunkHeader {
input.position(absoluteEventStart);
}
void refresh() throws IOException {
public void refresh() throws IOException {
while (true) {
byte fileState1;
input.positionPhysical(absoluteChunkStart + FILE_STATE_POSITION);
@ -163,6 +163,14 @@ public final class ChunkHeader {
}
}
public boolean readHeader(byte[] bytes, int count) throws IOException {
input.position(absoluteChunkStart);
for (int i = 0; i< count; i++) {
bytes[i] = input.readPhysicalByte();
}
return bytes[(int)FILE_STATE_POSITION] != UPDATING_CHUNK_HEADER;
}
public void awaitFinished() throws IOException {
if (finished) {
return;

View File

@ -27,6 +27,7 @@ package jdk.jfr.internal.consumer;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.StringJoiner;
@ -95,9 +96,8 @@ public final class ChunkParser {
private static final String CHUNKHEADER = "jdk.types.ChunkHeader";
private final RecordingInput input;
private final ChunkHeader chunkHeader;
private final MetadataDescriptor metadata;
private final TimeConverter timeConverter;
private final MetadataDescriptor previousMetadata;
private final LongMap<ConstantLookup> constantLookups;
private LongMap<Type> typeMap;
@ -107,6 +107,9 @@ public final class ChunkParser {
private Runnable flushOperation;
private ParserConfiguration configuration;
private volatile boolean closed;
private MetadataDescriptor previousMetadata;
private MetadataDescriptor metadata;
private boolean staleMetadata = true;
public ChunkParser(RecordingInput input) throws IOException {
this(input, new ParserConfiguration());
@ -206,11 +209,13 @@ public final class ChunkParser {
// Read metadata and constant pools for the next segment
if (chunkHeader.getMetataPosition() != metadataPosition) {
Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new metadata in chunk. Rebuilding types and parsers");
MetadataDescriptor metadata = chunkHeader.readMetadata(previousMetadata);
this.previousMetadata = this.metadata;
this.metadata = chunkHeader.readMetadata(previousMetadata);
ParserFactory factory = new ParserFactory(metadata, constantLookups, timeConverter);
parsers = factory.getParsers();
typeMap = factory.getTypeMap();
updateConfiguration();
setStaleMetadata(true);
}
if (constantPosition != chunkHeader.getConstantPoolPosition()) {
Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new constant pool data. Filling up pools with new values");
@ -426,6 +431,14 @@ public final class ChunkParser {
return metadata.getEventTypes();
}
public List<EventType> getPreviousEventTypes() {
if (previousMetadata == null) {
return Collections.emptyList();
} else {
return previousMetadata.getEventTypes();
}
}
public boolean isLastChunk() throws IOException {
return chunkHeader.isLastChunk();
}
@ -456,7 +469,23 @@ public final class ChunkParser {
public void close() {
this.closed = true;
try {
input.close();
} catch(IOException e) {
// ignore
}
Utils.notifyFlush();
}
public long getEndNanos() {
return getStartNanos() + getChunkDuration();
}
public void setStaleMetadata(boolean stale) {
this.staleMetadata = stale;
}
public boolean hasStaleMetadata() {
return staleMetadata;
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2019, 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -31,12 +31,15 @@ import java.util.List;
import java.util.function.Consumer;
import jdk.jfr.EventType;
import jdk.jfr.consumer.MetadataEvent;
import jdk.jfr.consumer.RecordedEvent;
import jdk.jfr.internal.LongMap;
import jdk.jfr.internal.consumer.ChunkParser.ParserConfiguration;
final class Dispatcher {
public final static RecordedEvent FLUSH_MARKER = JdkJfrConsumer.instance().newRecordedEvent(null, null, 0L, 0L);
final static class EventDispatcher {
private final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
@ -62,6 +65,7 @@ final class Dispatcher {
}
private final Consumer<Throwable>[] errorActions;
private final Consumer<MetadataEvent>[] metadataActions;
private final Runnable[] flushActions;
private final Runnable[] closeActions;
private final EventDispatcher[] dispatchers;
@ -81,25 +85,42 @@ final class Dispatcher {
this.flushActions = c.flushActions.toArray(new Runnable[0]);
this.closeActions = c.closeActions.toArray(new Runnable[0]);
this.errorActions = c.errorActions.toArray(new Consumer[0]);
this.metadataActions = c.metadataActions.toArray(new Consumer[0]);
this.dispatchers = c.eventActions.toArray(new EventDispatcher[0]);
this.parserConfiguration = new ParserConfiguration(0, Long.MAX_VALUE, c.reuse, c.ordered, buildFilter(dispatchers));
this.startTime = c.startTime;
this.endTime = c.endTime;
this.startNanos = c.startNanos;
this.endNanos = c.endNanos;
EventDispatcher[] ed = new EventDispatcher[1];
ed[0] = new EventDispatcher(null, e -> {
runFlushActions();
});
dispatcherLookup.put(1L, ed);
}
public void runFlushActions() {
Runnable[] flushActions = this.flushActions;
for (int i = 0; i < flushActions.length; i++) {
public void runMetadataActions(MetadataEvent event) {
Consumer<MetadataEvent>[] metadataActions = this.metadataActions;
for (int i = 0; i < metadataActions.length; i++) {
try {
flushActions[i].run();
metadataActions[i].accept(event);
} catch (Exception e) {
handleError(e);
}
}
}
public void runFlushActions() {
Runnable[] flushActions = this.flushActions;
for (int i = 0; i < flushActions.length; i++) {
try {
flushActions[i].run();
} catch (Exception e) {
handleError(e);
}
}
}
public void runCloseActions() {
Runnable[] closeActions = this.closeActions;
for (int i = 0; i < closeActions.length; i++) {
@ -185,4 +206,8 @@ final class Dispatcher {
private void defaultErrorHandler(Throwable e) {
e.printStackTrace();
}
public boolean hasMetadataHandler() {
return metadataActions.length > 0;
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2019, 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -28,11 +28,15 @@ package jdk.jfr.internal.consumer;
import java.io.IOException;
import java.nio.file.Path;
import java.security.AccessControlContext;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import jdk.jfr.Configuration;
import jdk.jfr.consumer.RecordedEvent;
import jdk.jfr.internal.JVM;
import jdk.jfr.internal.PlatformRecording;
@ -49,18 +53,20 @@ public class EventDirectoryStream extends AbstractEventStream {
private final static Comparator<? super RecordedEvent> EVENT_COMPARATOR = JdkJfrConsumer.instance().eventComparator();
private final RepositoryFiles repositoryFiles;
private final PlatformRecording recording;
private final FileAccess fileAccess;
private ChunkParser currentParser;
private long currentChunkStartNanos;
private RecordedEvent[] sortedCache;
private int threadExclusionLevel = 0;
protected volatile long maxSize;
protected volatile Duration maxAge;
public EventDirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, PlatformRecording recording) throws IOException {
super(acc, recording);
private volatile Consumer<Long> onCompleteHandler;
public EventDirectoryStream(AccessControlContext acc, Path p, FileAccess fileAccess, PlatformRecording recording, List<Configuration> configurations) throws IOException {
super(acc, recording, configurations);
this.fileAccess = Objects.requireNonNull(fileAccess);
this.recording = recording;
this.repositoryFiles = new RepositoryFiles(fileAccess, p);
}
@ -71,6 +77,18 @@ public class EventDirectoryStream extends AbstractEventStream {
repositoryFiles.close();
if (currentParser != null) {
currentParser.close();
onComplete(currentParser.getEndNanos());
}
}
public void setChunkCompleteHandler(Consumer<Long> handler) {
onCompleteHandler = handler;
}
private void onComplete(long epochNanos) {
Consumer<Long> handler = onCompleteHandler;
if (handler != null) {
handler.accept(epochNanos);
}
}
@ -110,9 +128,9 @@ public class EventDirectoryStream extends AbstractEventStream {
Path path;
boolean validStartTime = recording != null || disp.startTime != null;
if (validStartTime) {
path = repositoryFiles.firstPath(disp.startNanos);
path = repositoryFiles.firstPath(disp.startNanos, true);
} else {
path = repositoryFiles.lastPath();
path = repositoryFiles.lastPath(true);
}
if (path == null) { // closed
return;
@ -122,9 +140,10 @@ public class EventDirectoryStream extends AbstractEventStream {
currentParser = new ChunkParser(input, disp.parserConfiguration);
long segmentStart = currentParser.getStartNanos() + currentParser.getChunkDuration();
long filterStart = validStartTime ? disp.startNanos : segmentStart;
long filterEnd = disp.endTime != null ? disp.endNanos: Long.MAX_VALUE;
long filterEnd = disp.endTime != null ? disp.endNanos : Long.MAX_VALUE;
while (!isClosed()) {
emitMetadataEvent(currentParser);
while (!isClosed() && !currentParser.isChunkFinished()) {
disp = dispatcher();
if (disp != lastDisp) {
@ -159,17 +178,19 @@ public class EventDirectoryStream extends AbstractEventStream {
return;
}
long durationNanos = currentParser.getChunkDuration();
long endChunkNanos = currentParser.getEndNanos();
if (durationNanos == 0) {
// Avoid reading the same chunk again and again if
// duration is 0 ns
durationNanos++;
}
path = repositoryFiles.nextPath(currentChunkStartNanos + durationNanos);
path = repositoryFiles.nextPath(currentChunkStartNanos + durationNanos, true);
if (path == null) {
return; // stream closed
}
currentChunkStartNanos = repositoryFiles.getTimestamp(path);
input.setFile(path);
onComplete(endChunkNanos);
currentParser = currentParser.newChunkParser();
// TODO: Optimization. No need filter when we reach new chunk
// Could set start = 0;
@ -199,6 +220,7 @@ public class EventDirectoryStream extends AbstractEventStream {
}
sortedCache[index++] = e;
}
emitMetadataEvent(currentParser);
// no events found
if (index == 0 && currentParser.isChunkFinished()) {
return;
@ -217,6 +239,7 @@ public class EventDirectoryStream extends AbstractEventStream {
while (true) {
RecordedEvent e = currentParser.readStreamingEvent();
if (e == null) {
emitMetadataEvent(currentParser);
return true;
} else {
c.dispatch(e);
@ -224,4 +247,11 @@ public class EventDirectoryStream extends AbstractEventStream {
}
}
public void setMaxSize(long maxSize) {
this.maxSize = maxSize;
}
public void setMaxAge(Duration maxAge) {
this.maxAge = maxAge;
}
}

View File

@ -29,9 +29,9 @@ import java.io.IOException;
import java.nio.file.Path;
import java.security.AccessControlContext;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Objects;
import jdk.jfr.consumer.RecordedEvent;
/**
@ -47,7 +47,7 @@ public final class EventFileStream extends AbstractEventStream {
private RecordedEvent[] cacheSorted;
public EventFileStream(AccessControlContext acc, Path path) throws IOException {
super(acc, null);
super(acc, null, Collections.emptyList());
Objects.requireNonNull(path);
this.input = new RecordingInput(path.toFile(), FileAccess.UNPRIVILEGED);
}
@ -87,6 +87,7 @@ public final class EventFileStream extends AbstractEventStream {
currentParser = new ChunkParser(input, disp.parserConfiguration);
while (!isClosed()) {
emitMetadataEvent(currentParser);
if (currentParser.getStartNanos() > end) {
close();
return;
@ -116,11 +117,16 @@ public final class EventFileStream extends AbstractEventStream {
int index = 0;
while (true) {
event = currentParser.readEvent();
if (event == Dispatcher.FLUSH_MARKER) {
emitMetadataEvent(currentParser);
dispatchOrdered(c, index);
index = 0;
continue;
}
if (event == null) {
Arrays.sort(cacheSorted, 0, index, EVENT_COMPARATOR);
for (int i = 0; i < index; i++) {
c.dispatch(cacheSorted[i]);
}
emitMetadataEvent(currentParser);
dispatchOrdered(c, index);
return;
}
if (index == cacheSorted.length) {
@ -132,13 +138,23 @@ public final class EventFileStream extends AbstractEventStream {
}
}
private void dispatchOrdered(Dispatcher c, int index) {
Arrays.sort(cacheSorted, 0, index, EVENT_COMPARATOR);
for (int i = 0; i < index; i++) {
c.dispatch(cacheSorted[i]);
}
}
private void processUnordered(Dispatcher c) throws IOException {
while (!isClosed()) {
RecordedEvent event = currentParser.readEvent();
if (event == null) {
emitMetadataEvent(currentParser);
return;
}
c.dispatch(event);
if (event != Dispatcher.FLUSH_MARKER) {
c.dispatch(event);
}
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -22,37 +22,25 @@
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.management.jfr;
package jdk.jfr.internal.consumer;
import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
final class Stream implements Closeable {
import jdk.jfr.internal.management.EventByteStream;
private final long identifier;
public final class FinishedStream extends EventByteStream {
private final BufferedInputStream inputStream;
private final byte[] buffer;
private volatile long time;
Stream(InputStream is, long identifier, int blockSize) {
public FinishedStream(InputStream is, int blockSize) {
super();
this.inputStream = new BufferedInputStream(is, 50000);
this.identifier = identifier;
this.buffer = new byte[blockSize];
}
private void touch() {
time = System.currentTimeMillis();
}
public long getLastTouched() {
return time;
}
public byte[] read() throws IOException {
public synchronized byte[] read() throws IOException {
// OK to reuse buffer since this
// is only used for serialization
touch();
@ -71,11 +59,8 @@ final class Stream implements Closeable {
}
@Override
public void close() throws IOException {
public synchronized void close() throws IOException {
inputStream.close();
}
public long getId() {
return identifier;
}
}

View File

@ -28,7 +28,12 @@ package jdk.jfr.internal.consumer;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;
import jdk.jfr.Configuration;
import jdk.jfr.EventType;
import jdk.jfr.consumer.EventStream;
import jdk.jfr.consumer.MetadataEvent;
import jdk.jfr.consumer.RecordedClass;
import jdk.jfr.consumer.RecordedClassLoader;
import jdk.jfr.consumer.RecordedEvent;
@ -40,6 +45,7 @@ import jdk.jfr.consumer.RecordedThread;
import jdk.jfr.consumer.RecordedThreadGroup;
import jdk.jfr.consumer.RecordingFile;
import jdk.jfr.internal.Type;
/*
* Purpose of this class is to give package private access to
* the jdk.jfr.consumer package
@ -98,4 +104,7 @@ public abstract class JdkJfrConsumer {
public abstract void setEndTicks(RecordedEvent event, long endTicks);
public abstract Object[] eventValues(RecordedEvent event);
public abstract MetadataEvent newMetadataEvent(List<EventType> previous, List<EventType> current, List<Configuration> configuration);
}

View File

@ -0,0 +1,239 @@
/*
* Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.jfr.internal.consumer;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.time.Instant;
import jdk.jfr.Recording;
import jdk.jfr.RecordingState;
import jdk.jfr.internal.Utils;
import jdk.jfr.internal.SecuritySupport;
import jdk.jfr.internal.SecuritySupport.SafePath;
import jdk.jfr.internal.management.EventByteStream;
import jdk.jfr.internal.management.ManagementSupport;
public final class OngoingStream extends EventByteStream {
private static final byte[] EMPTY_ARRAY = new byte[0];
private static final int HEADER_SIZE = (int)ChunkHeader.HEADER_SIZE;
private static final int HEADER_FILE_STATE_POSITION = (int)ChunkHeader.FILE_STATE_POSITION;
private static final byte MODIFYING_STATE = ChunkHeader.UPDATING_CHUNK_HEADER;
private final RepositoryFiles repositoryFiles;
private final Recording recording;
private final int blockSize;
private final long endTimeNanos;
private final byte[] headerBytes = new byte[HEADER_SIZE];
private RecordingInput input;
private ChunkHeader header;
private long position;
private long startTimeNanos;
private Path path;
private boolean first = true;
public OngoingStream(Recording recording, int blockSize, long startTimeNanos, long endTimeNanos) {
super();
this.recording = recording;
this.blockSize = blockSize;
this.startTimeNanos = startTimeNanos;
this.endTimeNanos = endTimeNanos;
this.repositoryFiles = new RepositoryFiles(SecuritySupport.PRIVILEGED, null);
}
public synchronized byte[] read() throws IOException {
try {
return readBytes();
} catch (IOException ioe) {
if (recording.getState() == RecordingState.CLOSED) {
// Recording closed, return null;
return null;
}
// Something unexpected has happened.
throw ioe;
}
}
private byte[] readBytes() throws IOException {
touch();
if (recording.getState() == RecordingState.NEW) {
return EMPTY_ARRAY;
}
if (recording.getState() == RecordingState.DELAYED) {
return EMPTY_ARRAY;
}
if (first) {
// In case stream starts before recording
long s = ManagementSupport.getStartTimeNanos(recording);
startTimeNanos = Math.max(s, startTimeNanos);
first = false;
}
while (true) {
if (startTimeNanos > endTimeNanos) {
return null;
}
if (isRecordingClosed()) {
closeInput();
return null;
}
if (!ensurePath()) {
return EMPTY_ARRAY;
}
if (!ensureInput()) {
return EMPTY_ARRAY;
}
if (position < header.getChunkSize()) {
long size = Math.min(header.getChunkSize() - position, blockSize);
return readBytes((int) size);
}
if (header.isFinished()) {
if (header.getDurationNanos() < 1) {
throw new IOException("No progress");
}
startTimeNanos += header.getDurationNanos();
Instant timestamp = Utils.epochNanosToInstant(startTimeNanos);
ManagementSupport.removeBefore(recording, timestamp);
closeInput();
} else {
header.refresh();
if (position >= header.getChunkSize()) {
return EMPTY_ARRAY;
}
}
}
}
private boolean isRecordingClosed() {
return recording != null && recording.getState() == RecordingState.CLOSED;
}
private void closeInput() {
if (input != null) {
try {
input.close();
} catch (IOException ioe) {
// ignore
}
input = null;
position = 0;
path = null;
}
}
private byte[] readBytes(int size) throws IOException {
if (position == 0) {
return readWithHeader(size);
} else {
return readNonHeader(size);
}
}
private byte[] readNonHeader(int size) throws IOException {
byte[] result = new byte[size];
input.readFully(result);
position += size;
return result;
}
private byte[] readWithHeader(int size) throws IOException {
byte[] bytes = new byte[Math.max(HEADER_SIZE, size)];
for (int attempts = 0; attempts < 25; attempts++) {
// read twice and check files state to avoid simultaneous change by JVM
input.position(0);
input.readFully(bytes, 0, HEADER_SIZE);
input.position(0);
input.readFully(headerBytes);
if (bytes[HEADER_FILE_STATE_POSITION] != MODIFYING_STATE) {
if (bytes[HEADER_FILE_STATE_POSITION] == headerBytes[HEADER_FILE_STATE_POSITION]) {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
// 0-3: magic
// 4-5: major
// 6-7: minor
// 8-15: chunk size
buffer.putLong(8, HEADER_SIZE);
// 16-23: constant pool offset
buffer.putLong(16, 0);
// 24-31: metadata offset
buffer.putLong(24, 0);
// 32-39: chunk start nanos
// 40-47 duration
buffer.putLong(40, 0);
// 48-55: chunk start ticks
// 56-63: ticks per second
// 64: file state
buffer.put(64, (byte) 1);
// 65-67: extension bit
int left = bytes.length - HEADER_SIZE;
input.readFully(bytes, HEADER_SIZE, left);
position += bytes.length;
return bytes;
}
}
takeNap();
}
return EMPTY_ARRAY;
}
private void takeNap() throws IOException {
try {
Thread.sleep(10);
} catch (InterruptedException ie) {
throw new IOException("Read operation interrupted", ie);
}
}
private boolean ensureInput() throws IOException {
if (input == null) {
if (SecuritySupport.getFileSize(new SafePath(path)) < HEADER_SIZE) {
return false;
}
input = new RecordingInput(path.toFile(), SecuritySupport.PRIVILEGED);
header = new ChunkHeader(input);
}
return true;
}
private boolean ensurePath() {
if (path == null) {
path = repositoryFiles.nextPath(startTimeNanos, false);
}
return path != null;
}
@Override
public synchronized void close() throws IOException {
closeInput();
// Close recording if stream times out.
if (recording.getName().startsWith(EventByteStream.NAME)) {
recording.close();
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -218,7 +218,10 @@ public final class RecordingInput implements DataInput, AutoCloseable {
@Override
public void close() throws IOException {
file.close();
RandomAccessFile ra = file;
if (ra != null) {
ra.close();
}
}
@Override
@ -426,13 +429,5 @@ public final class RecordingInput implements DataInput, AutoCloseable {
file = null;
initialize(path.toFile());
}
/*
*
*
*/
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2019, 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -61,7 +61,7 @@ public final class RepositoryFiles {
private volatile boolean closed;
RepositoryFiles(FileAccess fileAccess, Path repository) {
public RepositoryFiles(FileAccess fileAccess, Path repository) {
this.repository = repository;
this.fileAccess = fileAccess;
this.waitObject = repository == null ? WAIT_OBJECT : new Object();
@ -71,26 +71,27 @@ public final class RepositoryFiles {
return pathLookup.get(p);
}
Path lastPath() {
if (waitForPaths()) {
public Path lastPath(boolean wait) {
if (updatePaths(wait)) {
return pathSet.lastEntry().getValue();
}
return null; // closed
}
Path firstPath(long startTimeNanos) {
if (waitForPaths()) {
public Path firstPath(long startTimeNanos, boolean wait) {
if (updatePaths(wait)) {
// Pick closest chunk before timestamp
Long time = pathSet.floorKey(startTimeNanos);
if (time != null) {
startTimeNanos = time;
}
return path(startTimeNanos);
return path(startTimeNanos, wait);
}
return null; // closed
}
private boolean waitForPaths() {
private boolean updatePaths(boolean wait) {
int beforeSize = pathLookup.size();
while (!closed) {
try {
if (updatePaths()) {
@ -103,12 +104,16 @@ public final class RepositoryFiles {
// was accessed, or if new file has been written yet
// Just ignore, and retry later.
}
nap();
if (wait) {
nap();
} else {
return pathLookup.size() > beforeSize;
}
}
return !closed;
}
Path nextPath(long startTimeNanos) {
public Path nextPath(long startTimeNanos, boolean wait) {
if (closed) {
return null;
}
@ -127,10 +132,10 @@ public final class RepositoryFiles {
// ignore
}
// try to get the next file
return path(startTimeNanos);
return path(startTimeNanos, wait);
}
private Path path(long timestamp) {
private Path path(long timestamp, boolean wait) {
if (closed) {
return null;
}
@ -143,7 +148,7 @@ public final class RepositoryFiles {
}
return path;
}
if (!waitForPaths()) {
if (!updatePaths(wait)) {
return null; // closed
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2019, 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -30,6 +30,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import jdk.jfr.consumer.MetadataEvent;
import jdk.jfr.consumer.RecordedEvent;
import jdk.jfr.internal.Utils;
import jdk.jfr.internal.consumer.Dispatcher.EventDispatcher;
@ -39,6 +40,7 @@ final class StreamConfiguration {
final List<Runnable> flushActions = new ArrayList<>();
final List<EventDispatcher> eventActions = new ArrayList<>();
final List<Consumer<Throwable>> errorActions = new ArrayList<>();
final List<Consumer<MetadataEvent>> metadataActions = new ArrayList<>();
boolean reuse = true;
boolean ordered = true;
@ -56,6 +58,7 @@ final class StreamConfiguration {
removed |= closeActions.removeIf(e -> e == action);
removed |= errorActions.removeIf(e -> e == action);
removed |= eventActions.removeIf(e -> e.getAction() == action);
removed |= metadataActions.removeIf(e -> e == action);
if (removed) {
changed = true;
}
@ -86,6 +89,11 @@ final class StreamConfiguration {
changed = true;
}
public synchronized void addMetadataAction(Consumer<MetadataEvent> action) {
metadataActions.add(action);
changed = true;
}
public synchronized void setReuse(boolean reuse) {
this.reuse = reuse;
changed = true;

View File

@ -0,0 +1,71 @@
/*
* Copyright (c) 2016, 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.jfr.internal.management;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicLong;
import jdk.jfr.Recording;
import jdk.jfr.internal.consumer.FinishedStream;
import jdk.jfr.internal.consumer.OngoingStream;
// abstract class that hides if a recording is ongoing or finished.
public abstract class EventByteStream implements Closeable {
public static final String NAME = "Remote Recording Stream";
private static AtomicLong idCounter = new AtomicLong();
private final long identifier;
private volatile long time;
public EventByteStream() {
this.identifier = idCounter.incrementAndGet();
}
public static EventByteStream newOngoingStream(Recording recording, int blockSize, long startTimeNanos,long endTimeNanos) {
return new OngoingStream(recording, blockSize, startTimeNanos, endTimeNanos);
}
public static EventByteStream newFinishedStream(InputStream is, int blockSize) {
return new FinishedStream(is, blockSize);
}
final protected void touch() {
time = System.currentTimeMillis();
}
final public long getLastTouched() {
return time;
}
abstract public byte[] read() throws IOException;
final public long getId() {
return identifier;
}
}

View File

@ -0,0 +1,40 @@
/*
* Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.jfr.internal.management;
import java.util.Map;
/**
* Purpose of the interface is to be able to provide an implementation of
* EventSettings in the jdk.management.jfr module.
*
*/
public interface EventSettingsModifier {
void with(String name, String value);
Map<String, String> toMap();
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2020, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -26,14 +26,22 @@
package jdk.jfr.internal.management;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.security.AccessControlContext;
import jdk.jfr.Configuration;
import jdk.jfr.EventSettings;
import jdk.jfr.EventType;
import jdk.jfr.Recording;
import jdk.jfr.consumer.EventStream;
import jdk.jfr.internal.JVMSupport;
import jdk.jfr.internal.LogLevel;
import jdk.jfr.internal.LogTag;
@ -43,6 +51,9 @@ import jdk.jfr.internal.PlatformRecording;
import jdk.jfr.internal.PrivateAccess;
import jdk.jfr.internal.Utils;
import jdk.jfr.internal.WriteableUserPath;
import jdk.jfr.internal.consumer.EventDirectoryStream;
import jdk.jfr.internal.consumer.FileAccess;
import jdk.jfr.internal.consumer.JdkJfrConsumer;
import jdk.jfr.internal.instrument.JDKEvents;
/**
@ -83,6 +94,11 @@ public final class ManagementSupport {
return Utils.parseTimespan(s);
}
// Reuse internal code for converting nanoseconds since epoch to Instant
public static Instant epochNanosToInstant(long epochNanos) {
return Utils.epochNanosToInstant(epochNanos);
}
// Reuse internal code for formatting settings
public static final String formatTimespan(Duration dValue, String separation) {
return Utils.formatTimespan(dValue, separation);
@ -93,6 +109,11 @@ public final class ManagementSupport {
Logger.log(LogTag.JFR, LogLevel.ERROR, message);
}
// Reuse internal logging mechanism
public static void logDebug(String message) {
Logger.log(LogTag.JFR, LogLevel.DEBUG, message);
}
// Get the textual representation when the destination was set, which
// requires access to jdk.jfr.internal.PlatformRecording
public static String getDestinationOriginalText(Recording recording) {
@ -101,6 +122,8 @@ public final class ManagementSupport {
return wup == null ? null : wup.getOriginalText();
}
// Needed to check if destination can be set, so FlightRecorderMXBean::setRecordingOption
// can abort if not all data is valid
public static void checkSetDestination(Recording recording, String destination) throws IOException{
PlatformRecording pr = PrivateAccess.getInstance().getPlatformRecording(recording);
if(destination != null){
@ -108,4 +131,45 @@ public final class ManagementSupport {
pr.checkSetDestination(wup);
}
}
// Needed to modify setting using fluent API.
public static EventSettings newEventSettings(EventSettingsModifier esm) {
return PrivateAccess.getInstance().newEventSettings(esm);
}
// When streaming an ongoing recording, consumed chunks should be removed
public static void removeBefore(Recording recording, Instant timestamp) {
PlatformRecording pr = PrivateAccess.getInstance().getPlatformRecording(recording);
pr.removeBefore(timestamp);
}
// Needed callback to detect when a chunk has been parsed.
public static void setOnChunkCompleteHandler(EventStream stream, Consumer<Long> consumer) {
EventDirectoryStream eds = (EventDirectoryStream) stream;
eds.setChunkCompleteHandler(consumer);
}
// Needed to start an ongoing stream at the right chunk, which
// can be identified by the start time with nanosecond precision.
public static long getStartTimeNanos(Recording recording) {
PlatformRecording pr = PrivateAccess.getInstance().getPlatformRecording(recording);
return pr.getStartNanos();
}
// Needed to produce Configuration objects for MetadataEvent
public static Configuration newConfiguration(String name, String label, String description, String provider,
Map<String, String> settings, String contents) {
return PrivateAccess.getInstance().newConfiguration(name, label, description, provider, settings, contents);
}
// Can't use EventStream.openRepository(...) because
// EventStream::onMetadataData need to supply MetadataEvent
// with configuration objects
public static EventStream newEventDirectoryStream(
AccessControlContext acc,
Path directory,
List<Configuration> confs) throws IOException {
return new EventDirectoryStream(acc, directory, FileAccess.UNPRIVILEGED, null, confs);
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -23,16 +23,17 @@
* questions.
*/
package jdk.management.jfr;
package jdk.jfr.internal.management;
import java.util.TimerTask;
// Helper class to StreamManager
final class StreamCleanupTask extends TimerTask {
private final Stream stream;
private final EventByteStream stream;
private final StreamManager manager;
StreamCleanupTask(StreamManager streamManager, Stream stream) {
StreamCleanupTask(StreamManager streamManager, EventByteStream stream) {
this.stream = stream;
this.manager = streamManager;
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -23,44 +23,64 @@
* questions.
*/
package jdk.management.jfr;
package jdk.jfr.internal.management;
import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.concurrent.TimeUnit;
final class StreamManager {
import jdk.jfr.Recording;
import jdk.jfr.internal.consumer.FinishedStream;
import jdk.jfr.internal.consumer.OngoingStream;
public static final long TIME_OUT = TimeUnit.MINUTES.toMillis(2);
// Exposes EventByteStreams to the FlightRecorderMXBean
public final class StreamManager {
public static final long TIME_OUT = TimeUnit.MINUTES.toMillis(200);
public static final int DEFAULT_BLOCK_SIZE = 50000;
private static long idCounter = 0;
private final Map<Long, Stream> streams = new HashMap<>();
private final Map<Long, EventByteStream> streams = new HashMap<>();
private Timer timer;
public synchronized Stream getStream(long streamIdentifer) {
Stream stream = streams.get(streamIdentifer);
public synchronized EventByteStream getStream(long streamIdentifer) {
EventByteStream stream = streams.get(streamIdentifer);
if (stream == null) {
throw new IllegalArgumentException("Unknown stream identifier " + streamIdentifer);
}
return stream;
}
public synchronized Stream create(InputStream is, int blockSize) {
idCounter++;
Stream stream = new Stream(is, idCounter, blockSize);
public synchronized EventByteStream createOngoing(Recording recording, int blockSize, Instant startTime, Instant endTime) {
long startTimeNanos = 0;
long endTimeNanos = Long.MAX_VALUE;
if (!startTime.equals(Instant.MIN)) {
startTimeNanos = startTime.getEpochSecond() * 1_000_000_000L;
startTimeNanos += startTime.getNano();
}
if (!endTime.equals(Instant.MAX)) {
endTimeNanos = endTime.getEpochSecond() * 1_000_000_000L;
endTimeNanos+= endTime.getNano();
}
EventByteStream stream = EventByteStream.newOngoingStream(recording, blockSize, startTimeNanos, endTimeNanos);
streams.put(stream.getId(), stream);
scheduleAbort(stream, System.currentTimeMillis() + TIME_OUT);
return stream;
}
public synchronized EventByteStream create(InputStream is, int blockSize) {
EventByteStream stream = EventByteStream.newFinishedStream(is, blockSize);
streams.put(stream.getId(), stream);
scheduleAbort(stream, System.currentTimeMillis() + TIME_OUT);
return stream;
}
public synchronized void destroy(Stream stream) {
public synchronized void destroy(EventByteStream stream) {
try {
stream.close();
} catch (IOException e) {
@ -73,7 +93,7 @@ final class StreamManager {
}
}
public synchronized void scheduleAbort(Stream s, long when) {
public synchronized void scheduleAbort(EventByteStream s, long when) {
if (timer == null) {
timer = new Timer(true);
}

View File

@ -0,0 +1,482 @@
/*
* Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.management.jfr;
import java.io.Closeable;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.Objects;
import jdk.jfr.internal.management.ManagementSupport;
final class DiskRepository implements Closeable {
final static class DiskChunk {
final Path path;
final Instant startTime;
Instant endTime;
long size;
DiskChunk(Path path, long startNanos) {
this.path = path;
this.startTime = ManagementSupport.epochNanosToInstant(startNanos);
}
}
enum State {
HEADER, EVENT_SIZE, EVENT_TYPE, CHECKPOINT_EVENT_TIMESTAMP, CHECKPOINT_EVENT_DURATION, CHECKPOINT_EVENT_DELTA,
CHECKPOINT_EVENT_FLUSH_TYPE, CHECKPOINT_EVENT_POOL_COUNT, CHECKPOINT_EVENT_HEADER_TYPE,
CHECKPOINT_EVENT_HEADER_ITEM_COUNT, CHECKPOINT_EVENT_HEADER_KEY, CHECKPOINT_EVENT_HEADER_BYTE_ARRAY_LENGTH,
CHECKPOINT_EVENT_HEADER_BYTE_ARRAY_CONTENT, EVENT_PAYLOAD;
public State next() {
return State.values()[ordinal() + 1];
}
}
static final byte CHECKPOINT_WITH_HEADER = (byte) 2;
static final byte MODIFYING_STATE = (byte) 255;
static final byte COMPLETE_STATE = (byte) 0;
static final int HEADER_FILE_STATE_POSITION = 64;
static final int HEADER_START_NANOS_POSITION = 32;
static final int HEADER_SIZE = 68;
static final int HEADER_FILE_DURATION = 40;
private final Deque<DiskChunk> activeChunks = new ArrayDeque<>();
private final Deque<DiskChunk> deadChunks = new ArrayDeque<>();
private final boolean deleteDirectory;
private final ByteBuffer buffer = ByteBuffer.allocate(256);
private final Path directory;
private RandomAccessFile raf;
private RandomAccessFile previousRAF;
private byte previousRAFstate;
private int index;
private int bufferIndex;
private State state = State.HEADER;
private byte[] currentByteArray;
private int typeId;
private int typeIdshift;
private int sizeShift;
private int payLoadSize;
private int longValueshift;
private int eventFieldSize;
private int lastFlush;
private DiskChunk currentChunk;
private Duration maxAge;
private long maxSize;
private long size;
public DiskRepository(Path path, boolean deleteDirectory) throws IOException {
this.directory = path;
this.deleteDirectory = deleteDirectory;
}
public synchronized void write(byte[] bytes) throws IOException {
index = 0;
lastFlush = 0;
currentByteArray = bytes;
while (index < bytes.length) {
switch (state) {
case HEADER:
processInitialHeader();
break;
case EVENT_SIZE:
processEventSize();
break;
case EVENT_TYPE:
processEventTypeId();
break;
case CHECKPOINT_EVENT_TIMESTAMP:
case CHECKPOINT_EVENT_DURATION:
case CHECKPOINT_EVENT_DELTA:
case CHECKPOINT_EVENT_POOL_COUNT:
case CHECKPOINT_EVENT_HEADER_TYPE:
case CHECKPOINT_EVENT_HEADER_ITEM_COUNT:
case CHECKPOINT_EVENT_HEADER_KEY:
case CHECKPOINT_EVENT_HEADER_BYTE_ARRAY_LENGTH:
processNumericValueInEvent();
bufferIndex = 0;
break;
case CHECKPOINT_EVENT_HEADER_BYTE_ARRAY_CONTENT:
processCheckPointHeader();
break;
case CHECKPOINT_EVENT_FLUSH_TYPE:
processFlush();
break;
case EVENT_PAYLOAD:
processEvent();
break;
default:
break;
}
}
// Don't write before header/file is complete
if (raf == null) {
return;
}
flush();
}
private void processFlush() throws IOException {
byte b = nextByte(true);
if ((b & CHECKPOINT_WITH_HEADER) != 0) {
state = State.CHECKPOINT_EVENT_POOL_COUNT;
} else {
state = State.EVENT_PAYLOAD;
}
}
private void processNumericValueInEvent() {
int b = nextByte(true);
// longValue += (((long) (b & 0x7FL)) << longValueshift);
if (b >= 0 || longValueshift == 56) {
state = state.next();
// longValue = 0;
longValueshift = 0;
} else {
longValueshift += 7;
}
}
private void processEvent() {
int left = currentByteArray.length - index;
if (left >= payLoadSize) {
index += payLoadSize;
payLoadSize = 0;
state = State.EVENT_SIZE;
} else {
index += left;
payLoadSize -= left;
}
}
private void processEventTypeId() {
byte b = nextByte(true);
long v = (b & 0x7FL);
typeId += (v << typeIdshift);
if (b >= 0) {
if (typeId == 1) {
state = State.CHECKPOINT_EVENT_TIMESTAMP;
} else {
state = State.EVENT_PAYLOAD;
}
typeIdshift = 0;
typeId = 0;
} else {
typeIdshift += 7;
}
}
private void processEventSize() throws IOException {
// End of chunk
if (previousRAF != null) {
flush();
state = State.HEADER;
return;
}
eventFieldSize++;
byte b = nextByte(false);
long v = (b & 0x7FL);
payLoadSize += (v << sizeShift);
if (b >= 0) {
if (payLoadSize == 0) {
throw new IOException("Event size can't be null." + index);
}
state = State.EVENT_TYPE;
sizeShift = 0;
payLoadSize -= eventFieldSize;
eventFieldSize = 0;
} else {
sizeShift += 7;
}
}
private void processInitialHeader() throws IOException {
buffer.put(bufferIndex, nextByte(false));
if (bufferIndex == HEADER_SIZE) {
writeInitialHeader();
state = State.EVENT_SIZE;
bufferIndex = 0;
if (index != lastFlush + HEADER_SIZE) {
throw new IOException("Expected data before header to be flushed");
}
lastFlush = index;
}
}
private void processCheckPointHeader() throws IOException {
buffer.put(bufferIndex, nextByte(true));
if (bufferIndex == HEADER_SIZE) {
writeCheckPointHeader();
state = State.EVENT_PAYLOAD;
bufferIndex = 0;
}
}
private void writeInitialHeader() throws IOException {
DiskChunk previous = currentChunk;
currentChunk = nextChunk();
raf = new RandomAccessFile(currentChunk.path.toFile(), "rw");
byte fileState = buffer.get(HEADER_FILE_STATE_POSITION);
buffer.put(HEADER_FILE_STATE_POSITION, MODIFYING_STATE);
raf.write(buffer.array(), 0, HEADER_SIZE);
// Complete previous chunk
completePrevious(previous);
raf.seek(HEADER_FILE_STATE_POSITION);
raf.writeByte(fileState);
raf.seek(HEADER_SIZE);
}
private void completePrevious(DiskChunk previous) throws IOException {
if (previousRAF != null) {
previousRAF.seek(HEADER_FILE_STATE_POSITION);
previousRAF.writeByte(previousRAFstate);
previousRAF.close();
addChunk(previous);
previousRAF = null;
previousRAFstate = (byte) 0;
}
}
private void writeCheckPointHeader() throws IOException {
Objects.requireNonNull(raf);
byte state = buffer.get(HEADER_FILE_STATE_POSITION);
boolean complete = state == COMPLETE_STATE;
buffer.put(HEADER_FILE_STATE_POSITION, MODIFYING_STATE);
flush();
long position = raf.getFilePointer();
raf.seek(HEADER_FILE_STATE_POSITION);
raf.writeByte(MODIFYING_STATE);
raf.seek(0);
raf.write(buffer.array(), 0, HEADER_SIZE);
if (!complete) {
raf.seek(HEADER_FILE_STATE_POSITION);
raf.writeByte(state);
} else {
// will set state to complete when
// header of next file is created.
previousRAF = raf;
previousRAFstate = state;
currentChunk.size = Files.size(currentChunk.path);
long durationNanos = buffer.getLong(HEADER_FILE_DURATION);
Duration d = Duration.ofNanos(durationNanos);
currentChunk.endTime = currentChunk.startTime.plus(d);
}
raf.seek(position);
}
private void flush() throws IOException {
int length = index - lastFlush;
if (length != 0) {
raf.write(currentByteArray, lastFlush, length);
lastFlush = index;
}
}
private byte nextByte(boolean inEvent) {
byte b = currentByteArray[index];
index++;
bufferIndex++;
if (inEvent) {
payLoadSize--;
}
return b;
}
private DiskChunk nextChunk() throws IOException {
long nanos = buffer.getLong(HEADER_START_NANOS_POSITION);
long epochSecond = nanos / 1_000_000_000;
int nanoOfSecond = (int) (nanos % 1_000_000_000);
ZoneOffset z = OffsetDateTime.now().getOffset();
LocalDateTime d = LocalDateTime.ofEpochSecond(epochSecond, nanoOfSecond, z);
String filename = formatDateTime(d);
Path p1 = directory.resolve(filename + ".jfr");
if (!Files.exists(p1)) {
return new DiskChunk(p1, nanos);
}
for (int i = 1; i < 100; i++) {
String s = Integer.toString(i);
if (i < 10) {
s = "0" + s;
}
Path p2 = directory.resolve(filename + "_" + s + ".jfr");
if (!Files.exists(p2)) {
return new DiskChunk(p2, nanos);
}
}
throw new IOException("Could not create chunk for path " + p1);
}
static String formatDateTime(LocalDateTime time) {
StringBuilder sb = new StringBuilder(19);
sb.append(time.getYear() / 100);
appendPadded(sb, time.getYear() % 100, true);
appendPadded(sb, time.getMonth().getValue(), true);
appendPadded(sb, time.getDayOfMonth(), true);
appendPadded(sb, time.getHour(), true);
appendPadded(sb, time.getMinute(), true);
appendPadded(sb, time.getSecond(), false);
return sb.toString();
}
private static void appendPadded(StringBuilder text, int number, boolean separator) {
if (number < 10) {
text.append('0');
}
text.append(number);
if (separator) {
text.append('_');
}
}
@Override
public synchronized void close() throws IOException {
completePrevious(currentChunk);
if (raf != null) {
raf.close();
}
deadChunks.addAll(activeChunks);
if (currentChunk != null) {
deadChunks.add(currentChunk);
}
cleanUpDeadChunk(Integer.MAX_VALUE);
if (deleteDirectory) {
try {
Files.delete(directory);
} catch (IOException ioe) {
ManagementSupport.logDebug("Could not delete temp stream repository: " + ioe.getMessage());
}
}
}
public synchronized void setMaxAge(Duration maxAge) {
this.maxAge = maxAge;
trimToAge(Instant.now().minus(maxAge));
}
public synchronized void setMaxSize(long maxSize) {
this.maxSize = maxSize;
trimToSize();
}
private void trimToSize() {
if (maxSize == 0) {
return;
}
int count = 0;
while (size > maxSize && activeChunks.size() > 1) {
removeOldestChunk();
count++;
}
cleanUpDeadChunk(count + 10);
}
private void trimToAge(Instant oldest) {
if (maxAge == null) {
return;
}
int count = 0;
while (activeChunks.size() > 1) {
DiskChunk oldestChunk = activeChunks.getLast();
if (oldestChunk.endTime.isAfter(oldest)) {
return;
}
removeOldestChunk();
count++;
}
cleanUpDeadChunk(count + 10);
}
public synchronized void onChunkComplete(Instant timestamp) {
int count = 0;
while (!activeChunks.isEmpty()) {
DiskChunk oldestChunk = activeChunks.peek();
if (oldestChunk.startTime.isBefore(timestamp)) {
removeOldestChunk();
count++;
} else {
break;
}
}
cleanUpDeadChunk(count + 10);
}
private void addChunk(DiskChunk chunk) {
if (maxAge != null) {
trimToAge(chunk.endTime.minus(maxAge));
}
activeChunks.push(chunk);
size += chunk.size;
trimToSize();
}
private void removeOldestChunk() {
DiskChunk chunk = activeChunks.poll();
deadChunks.add(chunk);
size -= chunk.size;
}
private void cleanUpDeadChunk(int maxCount) {
int count = 0;
Iterator<DiskChunk> iterator = deadChunks.iterator();
while (iterator.hasNext()) {
DiskChunk chunk = iterator.next();
try {
Files.delete(chunk.path);
iterator.remove();
} catch (IOException e) {
// ignore
}
count++;
if (count == maxCount) {
return;
}
}
}
public synchronized void complete() {
if (currentChunk != null) {
try {
completePrevious(currentChunk);
} catch (IOException ioe) {
ManagementSupport.logDebug("Could not complete chunk " + currentChunk.path + " : " + ioe.getMessage());
}
}
}
}

View File

@ -0,0 +1,81 @@
/*
* Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.management.jfr;
import java.io.IOException;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
final class DownLoadThread extends Thread {
private final RemoteRecordingStream stream;
private final Instant startTime;
private final Instant endTime;
private final DiskRepository diskRepository;
DownLoadThread(RemoteRecordingStream stream) {
this.stream = stream;
this.startTime = stream.startTime;
this.endTime = stream.endTime;
this.diskRepository = stream.repository;
}
public void run() {
try {
Map<String, String> options = new HashMap<>();
if (startTime != null) {
options.put("startTime", startTime.toString());
}
if (endTime != null) {
options.put("endTime", endTime.toString());
}
options.put("streamVersion", "1.0");
long streamId = this.stream.mbean.openStream(stream.recordingId, options);
while (!stream.isClosed()) {
byte[] bytes = stream.mbean.readStream(streamId);
if (bytes == null) {
return;
}
if (bytes.length != 0) {
diskRepository.write(bytes);
} else {
takeNap();
}
}
} catch (IOException ioe) {
// ignore
} finally {
diskRepository.complete();
}
}
private void takeNap() {
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
// ignore
}
}
}

View File

@ -346,12 +346,22 @@ public interface FlightRecorderMXBean extends PlatformManagedObject {
* <td>{@code "50000"},<br>
* {@code "1000000"},<br>
* </tr>
* <tr>
* <th scope="row">{@code streamVersion}</th>
* <td>Specifies format to use when reading data from a running recording
* </td>
* <td>{@code "1.0"}</td>
* <td>A version number with a major and minor.<br>
* <br>
* To be able to read from a running recording the value must be set</td>
* <td>{@code "1.0"}
* </tr>
* </tbody>
* </table>
* If an option is omitted from the map the default value is used.
* <p>
* The recording with the specified ID must be stopped before a stream can
* be opened. This restriction might be lifted in future releases.
* be opened, unless the option {@code "streamVersion"} is specified.
*
* @param recordingId ID of the recording to open the stream for
*

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -71,6 +71,7 @@ import jdk.jfr.FlightRecorderPermission;
import jdk.jfr.Recording;
import jdk.jfr.RecordingState;
import jdk.jfr.internal.management.ManagementSupport;
import jdk.jfr.internal.management.StreamManager;
// Instantiated by service provider
final class FlightRecorderMXBeanImpl extends StandardEmitterMBean implements FlightRecorderMXBean, NotificationEmitter {
@ -147,6 +148,15 @@ final class FlightRecorderMXBeanImpl extends StandardEmitterMBean implements Fli
Instant starttime = MBeanUtils.parseTimestamp(s.get("startTime"), Instant.MIN);
Instant endtime = MBeanUtils.parseTimestamp(s.get("endTime"), Instant.MAX);
int blockSize = MBeanUtils.parseBlockSize(s.get("blockSize"), StreamManager.DEFAULT_BLOCK_SIZE);
String version = s.get("streamVersion");
if (version != null) {
if ("1.0".equals(version)) {
Recording r = getRecording(id);
return streamHandler.createOngoing(r, blockSize, starttime, endtime).getId();
}
throw new IllegalArgumentException("Unsupported stream version " + version);
}
InputStream is = getExistingRecording(id).getStream(starttime, endtime);
if (is == null) {
throw new IOException("No recording data available");

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -140,4 +140,3 @@ final class MBeanUtils {
}
}
}

View File

@ -0,0 +1,563 @@
/*
* Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.management.jfr;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.security.AccessControlException;
import javax.management.JMX;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import jdk.jfr.Configuration;
import jdk.jfr.EventSettings;
import jdk.jfr.EventType;
import jdk.jfr.Recording;
import jdk.jfr.consumer.EventStream;
import jdk.jfr.consumer.MetadataEvent;
import jdk.jfr.consumer.RecordedEvent;
import jdk.jfr.internal.management.EventSettingsModifier;
import jdk.jfr.internal.management.ManagementSupport;
import jdk.jfr.internal.management.EventByteStream;
/**
* An implementation of an {@link EventStream} that can serialize events over
* the network using an {@link MBeanServerConnection}.
* <p>
* The following example shows how to record garbage collection pauses and CPU
* usage on a remote host and print the events to standard out.
*
* <pre>
* {
* {@literal
* String host = "com.example";
* int port = 4711;
*
* String url = "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi";
*
* JMXServiceURL u = new JMXServiceURL(url);
* JMXConnector c = JMXConnectorFactory.connect(u);
* MBeanServerConnection conn = c.getMBeanServerConnection();
*
* try (var rs = new RemoteRecordingStream(conn)) {
* rs.enable("jdk.GCPhasePause").withoutThreshold();
* rs.enable("jdk.CPULoad").withPeriod(Duration.ofSeconds(1));
* rs.onEvent("jdk.CPULoad", System.out::println);
* rs.onEvent("jdk.GCPhasePause", System.out::println);
* rs.start();
* }
* }
* </pre>
*
* @since 16
*/
public final class RemoteRecordingStream implements EventStream {
private static final String ENABLED = "enabled";
static final class RemoteSettings implements EventSettingsModifier {
private final FlightRecorderMXBean mbean;
private final long recordingId;
RemoteSettings(FlightRecorderMXBean mbean, long recordingId) {
this.mbean = mbean;
this.recordingId = recordingId;
}
@Override
public void with(String name, String value) {
Objects.requireNonNull(name);
Objects.requireNonNull(value);
// FlightRecorderMXBean implementation always returns
// new instance of Map so no need to create new here.
Map<String, String> newSettings = getEventSettings();
newSettings.put(name, value);
mbean.setRecordingSettings(recordingId, newSettings);
}
@Override
public Map<String, String> toMap() {
return getEventSettings();
}
private Map<String, String> getEventSettings() {
return mbean.getRecordingSettings(recordingId);
}
}
// Reference to stream is released when EventStream::close is called
final static class ChunkConsumer implements Consumer<Long> {
private final DiskRepository repository;
ChunkConsumer(DiskRepository repository) {
this.repository = repository;
}
@Override
public void accept(Long endNanos) {
Instant t = ManagementSupport.epochNanosToInstant(endNanos);
repository.onChunkComplete(t);
}
}
private static final ObjectName OBJECT_NAME = MBeanUtils.createObjectName();
final Path path;
final FlightRecorderMXBean mbean;
final long recordingId;
final EventStream stream;
final AccessControlContext accessControllerContext;
final DiskRepository repository;
final Instant creationTime;
volatile Instant startTime;
volatile Instant endTime;
volatile boolean closed;
/**
* Creates an event stream that operates against a {@link MBeanServerConnection}
* that has a registered {@link FlightRecorderMXBean}.
* <p>
* To configure event settings, use {@link #setSettings(Map)}.
*
* @param connection the {@code MBeanServerConnection} where the
* {@code FlightRecorderMXBean} is registered, not
* {@code null}
*
* @throws IOException if a stream can't be opened, an I/O error occurs
* when trying to access the repository or the
* {@code FlightRecorderMXBean}
*
* @throws SecurityException if a security manager exists and its
* {@code checkRead} method denies read access to the
* directory, or files in the directory.
*/
public RemoteRecordingStream(MBeanServerConnection connection) throws IOException {
this(connection, makeTempDirectory(), true);
}
/**
* Creates an event stream that operates against a {@link MBeanServerConnection}
* that has a registered {@link FlightRecorderMXBean}.
* <p>
* To configure event settings, use {@link #setSettings(Map)}.
*
* @param connection the {@code MBeanServerConnection} where the
* {@code FlightRecorderMXBean} is registered, not
* {@code null}
*
* @param directory the directory to store event data that is downloaded, not
* {@code null}
*
* @throws IOException if a stream can't be opened, an I/O error occurs
* when trying to access the repository or the
* {@code FlightRecorderMXBean}
*
* @throws SecurityException if a security manager exists and its
* {@code checkRead} method denies read access to the
* directory, or files in the directory.
*/
public RemoteRecordingStream(MBeanServerConnection connection, Path directory) throws IOException {
this(connection, directory, false);
}
private RemoteRecordingStream(MBeanServerConnection connection, Path dir, boolean delete) throws IOException {
Objects.requireNonNull(connection);
Objects.requireNonNull(dir);
accessControllerContext = AccessController.getContext();
// Make sure users can't implement malicious version of a Path object.
path = Paths.get(dir.toString());
if (!Files.exists(path)) {
throw new IOException("Download directory doesn't exist");
}
if (!Files.isDirectory(path)) {
throw new IOException("Download location must be a directory");
}
checkFileAccess(path);
creationTime = Instant.now();
mbean = createProxy(connection);
recordingId = createRecording();
stream = ManagementSupport.newEventDirectoryStream(accessControllerContext, path, configurations(mbean));
stream.setStartTime(Instant.MIN);
repository = new DiskRepository(path, delete);
ManagementSupport.setOnChunkCompleteHandler(stream, new ChunkConsumer(repository));
}
private List<Configuration> configurations(FlightRecorderMXBean mbean) {
List<ConfigurationInfo> cis = mbean.getConfigurations();
List<Configuration> confs = new ArrayList<>(cis.size());
for (ConfigurationInfo ci : cis) {
confs.add(ManagementSupport.newConfiguration(ci.getName(), ci.getLabel(), ci.getDescription(),
ci.getProvider(), ci.getSettings(), ci.getContents()));
}
return Collections.unmodifiableList(confs);
}
@Override
public void onMetadata(Consumer<MetadataEvent> action) {
stream.onMetadata(action);
}
private static void checkFileAccess(Path directory) throws IOException {
RandomAccessFile f = null;
try {
Path testFile = directory.resolve("test-access");
f = new RandomAccessFile(testFile.toFile(), "rw");
f.write(0);
f.seek(0);
f.read();
f.close();
Files.delete(testFile);
} catch (Exception e) {
closeSilently(f);
throw new IOException("Could not read/write/delete in directory" + directory + " :" + e.getMessage());
}
}
private static void closeSilently(RandomAccessFile f) {
if (f == null) {
return;
}
try {
f.close();
} catch (IOException ioe) {
// ignore
}
}
private static FlightRecorderMXBean createProxy(MBeanServerConnection connection) throws IOException {
try {
return JMX.newMXBeanProxy(connection, OBJECT_NAME, FlightRecorderMXBean.class);
} catch (Exception e) {
throw new IOException("Could not create proxy for FlightRecorderMXBean: " + e.getMessage(), e);
}
}
private long createRecording() throws IOException {
try {
long id = mbean.newRecording();
Map<String, String> options = new HashMap<>();
options.put("name", EventByteStream.NAME + ": " + creationTime);
mbean.setRecordingOptions(id, options);
return id;
} catch (Exception e) {
throw new IOException("Could not create new recording: " + e.getMessage(), e);
}
}
private Map<String, String> getRecordingOptions() throws IOException {
try {
return mbean.getRecordingOptions(recordingId);
} catch (Exception e) {
throw new IOException("Could not get recording options: " + e.getMessage(), e);
}
}
/**
* Replaces all settings for this recording stream.
* <p>
* The following example connects to a remote host and stream events using
* settings from the "default" configuration.
*
* <pre>
* {
* {@literal
*
* String host = "com.example";
* int port = 4711;
*
* String url = "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi";
*
* JMXServiceURL u = new JMXServiceURL(url);
* JMXConnector c = JMXConnectorFactory.connect(u);
* MBeanServerConnection conn = c.getMBeanServerConnection();
*
* try (final var rs = new RemoteRecordingStream(conn)) {
* rs.onMetadata(e -> {
* for (Configuration c : e.getConfigurations()) {
* if (c.getName().equals("default")) {
* rs.setSettings(c.getSettings());
* }
* }
* });
* rs.onEvent(System.out::println);
* rs.start();
* }
*
* }
* </pre>
*
* @param settings the settings to set, not {@code null}
*
* @see Recording#setSettings(Map)
*/
public void setSettings(Map<String, String> settings) {
Objects.requireNonNull(settings);
try {
mbean.setRecordingSettings(recordingId, settings);
} catch (Exception e) {
ManagementSupport.logDebug(e.getMessage());
close();
}
};
/**
* Disables event with the specified name.
* <p>
* If multiple events with same name (for example, the same class is loaded in
* different class loaders), then all events that match the name are disabled.
*
* @param name the settings for the event, not {@code null}
*
* @return an event setting for further configuration, not {@code null}
*
*/
public EventSettings disable(String name) {
Objects.requireNonNull(name);
EventSettings s = ManagementSupport.newEventSettings(new RemoteSettings(mbean, recordingId));
try {
return s.with(name + "#" + ENABLED, "false");
} catch (Exception e) {
ManagementSupport.logDebug(e.getMessage());
close();
return s;
}
}
/**
* Enables the event with the specified name.
* <p>
* If multiple events have the same name (for example, the same class is loaded
* in different class loaders), then all events that match the name are enabled.
*
* @param name the settings for the event, not {@code null}
*
* @return an event setting for further configuration, not {@code null}
*
* @see EventType
*/
public EventSettings enable(String name) {
Objects.requireNonNull(name);
EventSettings s = ManagementSupport.newEventSettings(new RemoteSettings(mbean, recordingId));
try {
return s.with(name + "#" + ENABLED, "true");
} catch (Exception e) {
ManagementSupport.logDebug(e.getMessage());
close();
return s;
}
}
/**
* Determines how far back data is kept for the stream.
* <p>
* To control the amount of recording data stored on disk, the maximum length of
* time to retain the data can be specified. Data stored on disk that is older
* than the specified length of time is removed by the Java Virtual Machine
* (JVM).
* <p>
* If neither maximum limit or the maximum age is set, the size of the recording
* may grow indefinitely if events are not consumed.
*
* @param maxAge the length of time that data is kept, or {@code null} if
* infinite
*
* @throws IllegalArgumentException if {@code maxAge} is negative
*
* @throws IllegalStateException if the recording is in the {@code CLOSED}
* state
*/
public void setMaxAge(Duration maxAge) {
Objects.requireNonNull(maxAge);
repository.setMaxAge(maxAge);
}
/**
* Determines how much data is kept for the stream.
* <p>
* To control the amount of recording data that is stored on disk, the maximum
* amount of data to retain can be specified. When the maximum limit is
* exceeded, the Java Virtual Machine (JVM) removes the oldest chunk to make
* room for a more recent chunk.
* <p>
* If neither maximum limit or the maximum age is set, the size of the recording
* may grow indefinitely if events are not consumed.
* <p>
* The size is measured in bytes.
*
* @param maxSize the amount of data to retain, {@code 0} if infinite
*
* @throws IllegalArgumentException if {@code maxSize} is negative
*
* @throws IllegalStateException if the recording is in {@code CLOSED} state
*/
public void setMaxSize(long maxSize) {
if (maxSize < 0) {
throw new IllegalArgumentException("Max size of recording can't be negative");
}
repository.setMaxSize(maxSize);
}
@Override
public void onEvent(Consumer<RecordedEvent> action) {
stream.onEvent(action);
}
@Override
public void onEvent(String eventName, Consumer<RecordedEvent> action) {
stream.onEvent(eventName, action);
}
@Override
public void onFlush(Runnable action) {
stream.onFlush(action);
}
@Override
public void onError(Consumer<Throwable> action) {
stream.onError(action);
}
@Override
public void onClose(Runnable action) {
stream.onClose(action);
}
@Override
public void close() {
if (closed) {
return;
}
closed = true;
ManagementSupport.setOnChunkCompleteHandler(stream, null);
stream.close();
try {
mbean.closeRecording(recordingId);
} catch (IOException e) {
ManagementSupport.logDebug(e.getMessage());
}
try {
repository.close();
} catch (IOException e) {
ManagementSupport.logDebug(e.getMessage());
}
}
@Override
public boolean remove(Object action) {
return stream.remove(action);
}
@Override
public void setReuse(boolean reuse) {
stream.setReuse(reuse);
}
@Override
public void setOrdered(boolean ordered) {
stream.setOrdered(ordered);
}
@Override
public void setStartTime(Instant startTime) {
stream.setStartTime(startTime);
this.startTime = startTime;
}
@Override
public void setEndTime(Instant endTime) {
stream.setEndTime(endTime);
this.endTime = endTime;
}
@Override
public void start() {
try {
try {
mbean.startRecording(recordingId);
} catch (IllegalStateException ise) {
throw ise;
}
startDownload();
} catch (Exception e) {
ManagementSupport.logDebug(e.getMessage());
close();
return;
}
stream.start();
}
@Override
public void startAsync() {
stream.startAsync();
try {
mbean.startRecording(recordingId);
startDownload();
} catch (Exception e) {
ManagementSupport.logDebug(e.getMessage());
close();
}
}
@Override
public void awaitTermination(Duration timeout) throws InterruptedException {
stream.awaitTermination(timeout);
}
@Override
public void awaitTermination() throws InterruptedException {
stream.awaitTermination();
}
private static Path makeTempDirectory() throws IOException {
return Files.createTempDirectory("jfr-streaming");
}
private void startDownload() {
Thread downLoadThread = new DownLoadThread(this);
downLoadThread.setName("JFR: Download Thread " + creationTime);
downLoadThread.start();
}
boolean isClosed() {
return closed;
}
}

View File

@ -30,7 +30,7 @@
* @since 9
*/
module jdk.management.jfr {
requires jdk.jfr;
requires transitive jdk.jfr;
requires jdk.management;
requires transitive java.management;

View File

@ -843,6 +843,7 @@ jdk/jfr/api/recording/event/TestReEnableName.java 8256482 windows-
jdk/jfr/event/runtime/TestNetworkUtilizationEvent.java 8228990,8229370 generic-all
jdk/jfr/event/compiler/TestCodeSweeper.java 8225209 generic-all
jdk/jfr/event/os/TestThreadContextSwitches.java 8247776 windows-all
jdk/jfr/jmx/streaming/TestRotate.java 8257215 generic-all
############################################################################

View File

@ -0,0 +1,314 @@
/*
* Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.jfr.api.consumer.recordingstream;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import jdk.jfr.Event;
import jdk.jfr.EventType;
import jdk.jfr.FlightRecorder;
import jdk.jfr.Recording;
import jdk.jfr.Registered;
import jdk.jfr.consumer.EventStream;
import jdk.jfr.consumer.MetadataEvent;
import jdk.jfr.consumer.RecordingStream;
/**
* @test
* @summary Tests RecordingStream::onMetadata(...)
* @key jfr
* @requires vm.hasJFR
* @library /test/lib /test/jdk
* @run main/othervm jdk.jfr.api.consumer.recordingstream.TestOnMetadata
*/
public class TestOnMetadata {
public static void main(String... args) throws Throwable {
testDirectoryStream(true, false);
testFileStream(true, false);
testAddAfterStart();
testRemove();
testNull();
testUnmodifiable();
}
private static void testUnmodifiable() throws Exception {
AtomicBoolean fail = new AtomicBoolean();
try (RecordingStream r = new RecordingStream()) {
r.onMetadata(m -> {
EventType t = FlightRecorder.getFlightRecorder().getEventTypes().get(0);
try {
m.getEventTypes().add(t);
System.out.println("Should not be able to modify getEventTypes()");
fail.set(true);
} catch (UnsupportedOperationException uoe) {
// as expected;
}
try {
m.getRemovedEventTypes().add(t);
System.out.println("Should not be able to modify getRemovedEventTypes()");
fail.set(true);
} catch (UnsupportedOperationException uoe) {
// as expected;
}
try {
m.getAddedEventTypes().add(t);
System.out.println("Should not be able to modify getAddedEventTypes()");
fail.set(true);
} catch (UnsupportedOperationException uoe) {
// as expected;
}
r.close();
});
r.start();
r.awaitTermination();
if (fail.get()) {
throw new Exception("Metadata event could be mofified");
}
}
}
private static void testNull() throws Exception {
try (RecordingStream r = new RecordingStream()) {
try {
r.onMetadata(null);
throw new Exception("Expected NullPointerException");
} catch (NullPointerException e) {
// as expected;
}
}
}
private static void testRemove() throws Exception {
class RemoveEvent extends Event {
}
AtomicBoolean receviedMetadata = new AtomicBoolean();
try (RecordingStream r = new RecordingStream()) {
Consumer<MetadataEvent> m = e -> {
receviedMetadata.set(true);
};
r.onMetadata(m);
r.remove(m);
r.onEvent(e -> {
r.close();
});
r.remove(m);
r.startAsync();
RemoveEvent t = new RemoveEvent();
t.commit();
r.awaitTermination();
if (receviedMetadata.get()) {
throw new Exception("Unexpected MetadataEvent!");
}
}
}
private static void testAddAfterStart() throws Exception {
try (RecordingStream rs = new RecordingStream()) {
rs.startAsync();
rs.onMetadata(m -> {
});
throw new Exception("Expected exception if handler is added after start");
} catch (IllegalStateException ise) {
// as expected
}
}
private static void testFileStream(boolean ordered, boolean reuse) throws Exception {
class Spider extends Event {
}
AtomicInteger counter = new AtomicInteger();
try (Recording rs = new Recording()) {
rs.start(); // event 1
rotateChunk();
FlightRecorder.register(Spider.class);
final EventType spiderType = EventType.getEventType(Spider.class);
// event 2
rotateChunk();
FlightRecorder.unregister(Spider.class);
// event 3
rs.stop();
Path p = Paths.get("test-file-stream-jfr");
rs.dump(p);
try (EventStream s = EventStream.openFile(p)) {
System.out.println("Testing file: ordered=" + ordered + " reuse=" + reuse);
s.setOrdered(ordered);
s.setReuse(reuse);
s.onMetadata(e -> {
int count = counter.get();
if (count == 1) {
assertinitialEventypes(e);
}
if (count == 2) {
assertAddedEventType(e, spiderType);
}
if (count == 3) {
assertRemovedEventType(e, spiderType);
}
});
s.start();
if (counter.get() > 3) {
throw new Exception("Unexpected number of Metadata events");
}
}
}
}
private static void testDirectoryStream(boolean ordered, boolean reuse) throws Throwable {
@Registered(false)
class Turtle extends Event {
}
class AssertEventTypes implements Consumer<MetadataEvent> {
private final Semaphore semaphore = new Semaphore(0);
private volatile Throwable exception;
private volatile Consumer<MetadataEvent> assertMethod;
@Override
public void accept(MetadataEvent t) {
try {
assertMethod.accept(t);
} catch (Throwable e) {
this.exception = e;
e.printStackTrace();
}
semaphore.release();
}
public void await() throws Throwable {
semaphore.acquire();
if (exception != null) {
throw exception;
}
}
}
try (RecordingStream rs = new RecordingStream()) {
System.out.println("Testing directory: ordered=" + ordered + " reuse=" + reuse);
rs.setOrdered(ordered);
rs.setReuse(reuse);
AssertEventTypes assertion = new AssertEventTypes();
// Check initial event types
assertion.assertMethod = e -> assertinitialEventypes(e);
rs.onMetadata(assertion);
rs.startAsync();
assertion.await();
// Check added event type
assertion.assertMethod = e -> assertAddedEventType(e, EventType.getEventType(Turtle.class));
FlightRecorder.register(Turtle.class);
final EventType turtleType = EventType.getEventType(Turtle.class);
assertion.await();
// Check removal of turtle event
assertion.assertMethod = e -> assertRemovedEventType(e, turtleType);
FlightRecorder.unregister(Turtle.class);
rotateChunk();
assertion.await();
}
}
private static void assertRemovedEventType(MetadataEvent m, EventType removedType) {
List<EventType> eventTypes = FlightRecorder.getFlightRecorder().getEventTypes();
List<EventType> added = m.getAddedEventTypes();
List<EventType> all = m.getEventTypes();
List<EventType> removed = m.getRemovedEventTypes();
assertEventTypes(all, eventTypes);
assertEventTypes(added, Collections.emptyList());
assertEventTypes(removed, List.of(removedType));
}
private static void assertAddedEventType(MetadataEvent m, EventType addedType) {
List<EventType> eventTypes = FlightRecorder.getFlightRecorder().getEventTypes();
List<EventType> added = m.getAddedEventTypes();
List<EventType> all = m.getEventTypes();
List<EventType> removed = m.getRemovedEventTypes();
assertEventTypes(all, eventTypes);
assertEventTypes(added, List.of(addedType));
assertEventTypes(removed, Collections.emptyList());
}
private static void assertinitialEventypes(MetadataEvent m) {
List<EventType> added = m.getAddedEventTypes();
List<EventType> all = m.getEventTypes();
List<EventType> removed = m.getRemovedEventTypes();
List<EventType> eventTypes = FlightRecorder.getFlightRecorder().getEventTypes();
assertEventTypes(all, eventTypes);
assertEventTypes(added, eventTypes);
assertEventTypes(removed, Collections.emptyList());
}
private static void assertEventTypes(List<EventType> eventTypes, List<EventType> expected) {
if (eventTypes.size() != expected.size()) {
fail(eventTypes, expected);
}
Set<Long> set = new HashSet<>();
for (EventType eb : expected) {
set.add(eb.getId());
}
for (EventType ea : eventTypes) {
if (!set.contains(ea.getId())) {
fail(eventTypes, expected);
}
}
}
private static void fail(List<EventType> evenTypes, List<EventType> expected) {
System.out.println("Event types don't match");
System.out.println("Expected:");
for (EventType t : expected) {
System.out.println(t.getName());
}
System.out.println("Got:");
for (EventType t : expected) {
System.out.println(t.getName());
}
throw new RuntimeException("EventTypes don't match!");
}
private static void rotateChunk() {
try (Recording r = new Recording()) {
r.start();
}
}
}

View File

@ -0,0 +1,74 @@
/*
* Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.jfr.jmx.streaming;
import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import javax.management.MBeanServerConnection;
import jdk.jfr.Event;
import jdk.management.jfr.RemoteRecordingStream;
/**
* @test
* @key jfr
* @summary Tests that a RemoteRecordingStream can be closed
* @requires vm.hasJFR
* @library /test/lib /test/jdk
* @run main/othervm jdk.jfr.jmx.streaming.TestClose
*/
public class TestClose {
static class TestCloseEvent extends Event {
}
public static void main(String... args) throws Exception {
MBeanServerConnection conn = ManagementFactory.getPlatformMBeanServer();
Path p = Files.createDirectory(Paths.get("test-close-" + System.currentTimeMillis()));
RemoteRecordingStream e = new RemoteRecordingStream(conn, p);
e.startAsync();
// Produce enough to generate multiple chunks
for (int i = 0; i < 200_000; i++) {
TestCloseEvent event = new TestCloseEvent();
event.commit();
}
e.onFlush(() -> {
e.close(); // <- should clean up files.
});
e.awaitTermination();
int count = 0;
for (Object path : Files.list(p).toArray()) {
System.out.println(path);
count++;
}
if (count > 0) {
throw new Exception("Expected repository to be empty");
}
}
}

View File

@ -0,0 +1,203 @@
/*
* Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.jfr.jmx.streaming;
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import javax.management.MBeanServerConnection;
import jdk.jfr.Event;
import jdk.management.jfr.RemoteRecordingStream;
/**
* @test
* @key jfr
* @summary Sanity test methods that delegates to an ordinary stream
* @requires vm.hasJFR
* @library /test/lib /test/jdk
* @run main/othervm jdk.jfr.jmx.streaming.TestDelegated
*/
public class TestDelegated {
private static MBeanServerConnection CONNECTION = ManagementFactory.getPlatformMBeanServer();
static class TestDelegatedEvent extends Event {
}
// The assumption here is that the following methods don't
// need t be tested fully since they all delegate to the
// same implementation class that is tested elsewhere.
public static void main(String[] args) throws Exception {
testRemove();
testReuse();
testOrdered();
testOnEvent();
testOnEventName();
testOnFlush();
testOnError();
testOnClose();
testSetMaxAge();
testAwaitTermination();
testAwaitTerminationWithDuration();
}
private static void testSetMaxAge() throws Exception {
try (RemoteRecordingStream stream = new RemoteRecordingStream(CONNECTION)) {
try {
stream.setMaxAge(null);
throw new Exception("Expected NullPointerException");
} catch (NullPointerException npe) {
// As expected
}
}
}
private static void testAwaitTerminationWithDuration() throws Exception {
try (RemoteRecordingStream rs = new RemoteRecordingStream(CONNECTION)) {
rs.onEvent(e -> {
rs.close();
});
rs.startAsync();
TestDelegatedEvent e = new TestDelegatedEvent();
e.commit();
rs.awaitTermination(Duration.ofDays(1));
}
}
private static void testAwaitTermination() throws Exception {
try (RemoteRecordingStream rs = new RemoteRecordingStream(CONNECTION)) {
rs.onEvent(e -> {
rs.close();
});
rs.startAsync();
TestDelegatedEvent e = new TestDelegatedEvent();
e.commit();
rs.awaitTermination();
}
}
private static void testOnClose() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
try (RemoteRecordingStream rs = new RemoteRecordingStream(CONNECTION)) {
rs.onClose(() -> {
latch.countDown();
});
rs.startAsync();
rs.close();
latch.await();
}
}
private static void testOnError() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
try (RemoteRecordingStream rs = new RemoteRecordingStream(CONNECTION)) {
rs.onEvent(TestDelegatedEvent.class.getName(), e -> {
throw new RuntimeException("Testing");
});
rs.onError(t -> {
latch.countDown();
});
rs.startAsync();
TestDelegatedEvent e = new TestDelegatedEvent();
e.commit();
latch.await();
}
}
private static void testOnFlush() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
try (RemoteRecordingStream rs = new RemoteRecordingStream(CONNECTION)) {
rs.onFlush(() -> {
latch.countDown();
});
rs.startAsync();
TestDelegatedEvent e = new TestDelegatedEvent();
e.commit();
latch.await();
}
}
private static void testOnEventName() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
try (RemoteRecordingStream rs = new RemoteRecordingStream(CONNECTION)) {
rs.onEvent(TestDelegatedEvent.class.getName(), e -> {
latch.countDown();
});
rs.startAsync();
TestDelegatedEvent e = new TestDelegatedEvent();
e.commit();
latch.await();
}
}
private static void testOnEvent() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
try (RemoteRecordingStream rs = new RemoteRecordingStream(CONNECTION)) {
rs.onEvent(e -> {
System.out.println(e);
latch.countDown();
});
rs.startAsync();
TestDelegatedEvent e = new TestDelegatedEvent();
e.commit();
latch.await();
}
}
private static void testOrdered() throws Exception {
try (RemoteRecordingStream rs = new RemoteRecordingStream(CONNECTION)) {
rs.setOrdered(true);
rs.setOrdered(false);
}
}
private static void testReuse() throws Exception {
try (RemoteRecordingStream rs = new RemoteRecordingStream(CONNECTION)) {
rs.setReuse(true);
rs.setReuse(false);
}
}
private static void testRemove() throws Exception {
try (RemoteRecordingStream rs = new RemoteRecordingStream(CONNECTION)) {
Runnable r1 = () -> {
};
Runnable r2 = () -> {
};
rs.onFlush(r1);
if (!rs.remove(r1)) {
throw new Exception("Expected remove to return true");
}
if (rs.remove(r2)) {
throw new Exception("Expected remove to return false");
}
}
}
}

View File

@ -0,0 +1,85 @@
/*
* Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.jfr.jmx.streaming;
import java.lang.management.ManagementFactory;
import java.util.concurrent.CountDownLatch;
import javax.management.MBeanServerConnection;
import jdk.jfr.Enabled;
import jdk.jfr.Event;
import jdk.jfr.Name;
import jdk.management.jfr.RemoteRecordingStream;
/**
* @test
* @key jfr
* @summary Tests that event settings for a RemoteRecordingStream can be changed
* @requires vm.hasJFR
* @library /test/lib /test/jdk
* @run main/othervm jdk.jfr.jmx.streaming.TestEnableDisable
*/
public class TestEnableDisable {
@Name("Zebra")
@Enabled(false)
static class Zebra extends Event {
}
@Name("Tiger")
@Enabled(true)
static class Tiger extends Event {
}
public static void main(String... args) throws Exception {
CountDownLatch zebraLatch = new CountDownLatch(1);
MBeanServerConnection conn = ManagementFactory.getPlatformMBeanServer();
try (RemoteRecordingStream stream = new RemoteRecordingStream(conn)) {
stream.enable("Zebra");
stream.disable("Tiger");
stream.onEvent("Zebra", e -> {
System.out.println(e);
zebraLatch.countDown();
});
stream.onEvent("Tiger", e -> {
System.out.println(e);
throw new RuntimeException("Unexpected Tiger"); // will close stream
});
stream.startAsync();
Tiger t = new Tiger();
t.commit();
Zebra z = new Zebra();
z.commit();
zebraLatch.await();
}
}
}

View File

@ -0,0 +1,116 @@
/*
* Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.jfr.jmx.streaming;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.MBeanServerConnection;
import jdk.jfr.Event;
import jdk.management.jfr.RemoteRecordingStream;
/**
* @test
* @key jfr
* @summary Tests that max size can be set for a RemoteRecordingStream
* @requires vm.hasJFR
* @library /test/lib /test/jdk
* @run main/othervm jdk.jfr.jmx.streaming.TestMaxSize
*/
public class TestMaxSize {
static class Monkey extends Event {
}
public static void main(String... args) throws Exception {
MBeanServerConnection conn = ManagementFactory.getPlatformMBeanServer();
Path dir = Files.createDirectories(Paths.get("max-size-" + System.currentTimeMillis()));
System.out.println(dir);
AtomicBoolean finished = new AtomicBoolean();
try (RemoteRecordingStream e = new RemoteRecordingStream(conn, dir)) {
e.startAsync();
e.onEvent(ev -> {
if (finished.get()) {
return;
}
// Consume some events, but give back control
// to stream so it can be closed.
try {
Thread.sleep(10);
} catch (InterruptedException e1) {
// ignore
}
});
while (directorySize(dir) < 50_000_000) {
emitEvents(500_000);
}
e.setMaxSize(1_000_000);
long count = fileCount(dir);
if (count > 2) {
// Two chunks can happen when header of new chunk is written and previous
// chunk is not finalized.
throw new Exception("Expected only one or two chunks with setMaxSize(1_000_000). Found " + count);
}
finished.set(true);
}
}
private static void emitEvents(int count) throws InterruptedException {
for (int i = 0; i < count; i++) {
Monkey m = new Monkey();
m.commit();
}
System.out.println("Emitted " + count + " events");
Thread.sleep(1000);
}
private static int fileCount(Path dir) throws IOException {
System.out.println("Files:");
AtomicInteger count = new AtomicInteger();
Files.list(dir).forEach(p -> {
System.out.println(p);
count.incrementAndGet();
});
return count.get();
}
private static long directorySize(Path dir) throws IOException {
long p = Files.list(dir).mapToLong(f -> {
try {
return Files.size(f);
} catch (IOException e) {
return 0;
}
}).sum();
System.out.println("Directory size: " + p);
return p;
}
}

View File

@ -0,0 +1,69 @@
/*
* Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.jfr.jmx.streaming;
import java.lang.management.ManagementFactory;
import java.util.concurrent.CountDownLatch;
import javax.management.MBeanServerConnection;
import jdk.jfr.Event;
import jdk.jfr.Recording;
import jdk.management.jfr.RemoteRecordingStream;
/**
* @test
* @key jfr
* @summary Tests that a RemoteRecordingStream can stream over multiple chunks
* @requires vm.hasJFR
* @library /test/lib /test/jdk
* @run main/othervm jdk.jfr.jmx.streaming.TestMultipleChunks
*/
public class TestMultipleChunks {
static class Snake extends Event {
}
public static void main(String... args) throws Exception {
CountDownLatch latch = new CountDownLatch(5);
MBeanServerConnection conn = ManagementFactory.getPlatformMBeanServer();
try (RemoteRecordingStream s = new RemoteRecordingStream(conn)) {
s.onEvent(e -> latch.countDown());
s.startAsync();
for (int i = 0; i < 5; i++) {
Snake snake = new Snake();
snake.commit();
rotate();
}
}
}
private static void rotate() {
try (Recording r = new Recording()) {
r.start();
}
}
}

View File

@ -0,0 +1,137 @@
/*
* Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.jfr.jmx.streaming;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import jdk.management.jfr.FlightRecorderMXBean;
import jdk.management.jfr.RemoteRecordingStream;
/**
* @test
* @key jfr
* @summary Test constructors of RemoteRecordingStream
* @requires vm.hasJFR
* @library /test/lib /test/jdk
* @run main/othervm jdk.jfr.jmx.streaming.TestNew
*/
public class TestNew {
private final static ObjectName MXBEAN = createObjectName();
public static void main(String... args) throws Exception {
testNullArguments();
testMissingDirectory();
testNotDirectory();
testDefaultDIrectory();
TestUserDefinedDirectory();
testMissingFlightRecorderMXBean();
}
private static void TestUserDefinedDirectory() throws IOException {
Path p = Paths.get("user-repository-" + System.currentTimeMillis());
Files.createDirectory(p);
MBeanServerConnection conn = ManagementFactory.getPlatformMBeanServer();
try (RemoteRecordingStream s = new RemoteRecordingStream(conn, p)) {
// success
}
}
private static void testDefaultDIrectory() throws IOException {
MBeanServerConnection conn = ManagementFactory.getPlatformMBeanServer();
try (RemoteRecordingStream s = new RemoteRecordingStream(conn)) {
// success
}
}
private static void testNotDirectory() throws Exception {
Path p = Paths.get("file.txt");
RandomAccessFile raf = new RandomAccessFile(p.toFile(), "rw");
raf.close();
MBeanServerConnection conn = ManagementFactory.getPlatformMBeanServer();
try (var s = new RemoteRecordingStream(conn, p)) {
throw new Exception("Expected IOException");
} catch (IOException ioe) {
if (!ioe.getMessage().contains("Download location must be a directory")) {
throw new Exception("Unexpected message " + ioe.getMessage());
}
}
}
private static void testMissingDirectory() throws Exception {
Path p = Paths.get("/missing");
MBeanServerConnection conn = ManagementFactory.getPlatformMBeanServer();
try (var s = new RemoteRecordingStream(conn, p)) {
throw new Exception("Expected IOException");
} catch (IOException ioe) {
if (!ioe.getMessage().contains("Download directory doesn't exist")) {
throw new Exception("Unexpected message " + ioe.getMessage());
}
}
}
private static void testNullArguments() throws Exception {
try (var s = new RemoteRecordingStream(null)) {
throw new Exception("Expected NullPointerException");
} catch (NullPointerException npe) {
// as expected
}
MBeanServerConnection conn = ManagementFactory.getPlatformMBeanServer();
try (var s = new RemoteRecordingStream(conn, null)) {
throw new Exception("Expected NullPointerException");
} catch (NullPointerException npe) {
// as expected
}
}
private static void testMissingFlightRecorderMXBean() throws Exception {
MBeanServerConnection conn = ManagementFactory.getPlatformMBeanServer();
conn.unregisterMBean(MXBEAN);
try (var s = new RemoteRecordingStream(conn)) {
throw new Exception("Expected IOException");
} catch (IOException npe) {
// as expected
}
}
private static ObjectName createObjectName() {
try {
return new ObjectName(FlightRecorderMXBean.MXBEAN_NAME);
} catch (Exception e) {
throw new InternalError("Unexpected exception", e);
}
}
}

View File

@ -0,0 +1,85 @@
/*
* Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.jfr.jmx.streaming;
import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.CountDownLatch;
import javax.management.MBeanServerConnection;
import jdk.jfr.Event;
import jdk.jfr.Recording;
import jdk.jfr.StackTrace;
import jdk.management.jfr.RemoteRecordingStream;
/**
* @test
* @key jfr
* @summary Tests that streaming can work over chunk rotations
* @requires vm.hasJFR
* @library /test/lib /test/jdk
* @run main/othervm jdk.jfr.jmx.streaming.TestRotate
*/
public class TestRotate {
@StackTrace(false)
static class TestRotateEvent extends Event {
int value;
}
public static void main(String... args) throws Exception {
MBeanServerConnection conn = ManagementFactory.getPlatformMBeanServer();
Path p = Files.createDirectory(Paths.get("test-stream-rotate-" + System.currentTimeMillis()));
CountDownLatch latch = new CountDownLatch(100);
try (RemoteRecordingStream r = new RemoteRecordingStream(conn, p)) {
r.onEvent(e -> {
System.out.println(e);
latch.countDown();
});
r.startAsync();
for (int i = 1; i <= 100; i++) {
TestRotateEvent e = new TestRotateEvent();
e.value = i;
e.commit();
if (i % 30 == 0) {
rotate();
}
Thread.sleep(10);
}
System.out.println("Events generated. Awaiting consumption");
latch.await();
}
}
private static void rotate() {
try (Recording r = new Recording()) {
r.start();
}
}
}

View File

@ -0,0 +1,109 @@
/*
* Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.jfr.jmx.streaming;
import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import javax.management.MBeanServerConnection;
import jdk.jfr.Enabled;
import jdk.jfr.Event;
import jdk.jfr.Name;
import jdk.jfr.StackTrace;
import jdk.management.jfr.RemoteRecordingStream;
/**
* @test
* @key jfr
* @summary Tests that a RemoteRecordingStream can be configured using
* setSettings
* @requires vm.hasJFR
* @library /test/lib /test/jdk
* @run main/othervm jdk.jfr.jmx.streaming.TestSetSettings
*/
public class TestSetSettings {
@Enabled(false)
@StackTrace(false)
@Name("Event1")
static class Event1 extends Event {
}
@Enabled(false)
@StackTrace(false)
@Name("Event2")
static class Event2 extends Event {
}
public static void main(String... args) throws Exception {
CountDownLatch latch1 = new CountDownLatch(1);
CountDownLatch latch2 = new CountDownLatch(1);
MBeanServerConnection conn = ManagementFactory.getPlatformMBeanServer();
try (RemoteRecordingStream r = new RemoteRecordingStream(conn)) {
r.onEvent("Event1", e -> {
System.out.println("Found event: " + e.getEventType().getName());
if (e.getStackTrace() == null) {
System.out.println("Missing strack trace");
return;
}
latch1.countDown();
});
r.onEvent("Event2", e -> {
System.out.println("Found event: " + e.getEventType().getName());
if (e.getStackTrace() == null) {
System.out.println("Missing strack trace");
return;
}
latch2.countDown();
});
// Set settings before start
Map<String, String> settings = new HashMap<>();
settings.put("Event1#enabled", "true");
settings.put("Event1#stackTrace", "true");
r.setSettings(settings);
r.startAsync();
Event1 e1 = new Event1();
e1.commit();
System.out.println("Awaiting latch 1");
latch1.await();
// Set settings when running
settings = new HashMap<>();
settings.put("Event2#enabled", "true");
settings.put("Event2#stackTrace", "true");
r.setSettings(settings);
Event2 e2 = new Event2();
e2.commit();
System.out.println("Awaiting latch 2");
latch2.await();
}
}
}