mirror of
https://github.com/openjdk/jdk.git
synced 2026-04-14 08:58:46 +00:00
6747411: EventClient causes thread leaks
Reworked thread management in EventClient and related classes. Reviewed-by: sjiang, dfuchs
This commit is contained in:
parent
3020470ba9
commit
a1e4e3ec94
@ -27,7 +27,6 @@ package com.sun.jmx.event;
|
||||
|
||||
import com.sun.jmx.remote.util.ClassLogger;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.FutureTask;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -115,6 +114,7 @@ public class LeaseManager {
|
||||
scheduled = null;
|
||||
}
|
||||
callback.run();
|
||||
executor.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@ -131,6 +131,13 @@ public class LeaseManager {
|
||||
logger.trace("stop", "canceling lease");
|
||||
scheduled.cancel(false);
|
||||
scheduled = null;
|
||||
try {
|
||||
executor.shutdown();
|
||||
} catch (SecurityException e) {
|
||||
// OK: caller doesn't have RuntimePermission("modifyThread")
|
||||
// which is unlikely in reality but triggers a test failure otherwise
|
||||
logger.trace("stop", "exception from executor.shutdown", e);
|
||||
}
|
||||
}
|
||||
|
||||
private final Runnable callback;
|
||||
@ -138,7 +145,7 @@ public class LeaseManager {
|
||||
|
||||
private final ScheduledExecutorService executor
|
||||
= Executors.newScheduledThreadPool(1,
|
||||
new DaemonThreadFactory("LeaseManager"));
|
||||
new DaemonThreadFactory("JMX LeaseManager %d"));
|
||||
|
||||
private static final ClassLogger logger =
|
||||
new ClassLogger("javax.management.event", "LeaseManager");
|
||||
|
||||
@ -95,7 +95,9 @@ public abstract class RepeatedSingletonJob implements Runnable {
|
||||
executor.execute(this);
|
||||
} catch (RejectedExecutionException e) {
|
||||
logger.warning(
|
||||
"setEventReceiver", "Executor threw exception", e);
|
||||
"execute",
|
||||
"Executor threw exception (" + this.getClass().getName() + ")",
|
||||
e);
|
||||
throw new RejectedExecutionException(
|
||||
"Executor.execute threw exception -" +
|
||||
"should not be possible", e);
|
||||
|
||||
@ -32,13 +32,15 @@ import com.sun.jmx.remote.util.ClassLogger;
|
||||
import com.sun.jmx.remote.util.EnvHelp;
|
||||
|
||||
public abstract class ClientCommunicatorAdmin {
|
||||
private static volatile long threadNo = 1;
|
||||
|
||||
public ClientCommunicatorAdmin(long period) {
|
||||
this.period = period;
|
||||
|
||||
if (period > 0) {
|
||||
checker = new Checker();
|
||||
|
||||
Thread t = new Thread(checker);
|
||||
Thread t = new Thread(checker, "JMX client heartbeat " + ++threadNo);
|
||||
t.setDaemon(true);
|
||||
t.start();
|
||||
} else
|
||||
|
||||
@ -264,11 +264,12 @@ public class EventClient implements EventConsumer, NotificationManager {
|
||||
new PerThreadGroupPool.Create<ScheduledThreadPoolExecutor>() {
|
||||
public ScheduledThreadPoolExecutor createThreadPool(ThreadGroup group) {
|
||||
ThreadFactory daemonThreadFactory = new DaemonThreadFactory(
|
||||
"EventClient lease renewer %d");
|
||||
"JMX EventClient lease renewer %d");
|
||||
ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(
|
||||
20, daemonThreadFactory);
|
||||
exec.setKeepAliveTime(3, TimeUnit.SECONDS);
|
||||
exec.setKeepAliveTime(1, TimeUnit.SECONDS);
|
||||
exec.allowCoreThreadTimeOut(true);
|
||||
exec.setRemoveOnCancelPolicy(true);
|
||||
return exec;
|
||||
}
|
||||
};
|
||||
|
||||
@ -31,10 +31,8 @@ import com.sun.jmx.remote.util.ClassLogger;
|
||||
import java.io.IOException;
|
||||
import java.io.NotSerializableException;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import javax.management.MBeanException;
|
||||
@ -215,50 +213,47 @@ public class FetchingEventRelay implements EventRelay {
|
||||
this.maxNotifs = maxNotifs;
|
||||
|
||||
if (executor == null) {
|
||||
executor = Executors.newSingleThreadScheduledExecutor(
|
||||
ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(1,
|
||||
daemonThreadFactory);
|
||||
}
|
||||
stpe.setKeepAliveTime(1, TimeUnit.SECONDS);
|
||||
stpe.allowCoreThreadTimeOut(true);
|
||||
executor = stpe;
|
||||
this.defaultExecutor = stpe;
|
||||
} else
|
||||
this.defaultExecutor = null;
|
||||
this.executor = executor;
|
||||
if (executor instanceof ScheduledExecutorService)
|
||||
leaseScheduler = (ScheduledExecutorService) executor;
|
||||
else {
|
||||
leaseScheduler = Executors.newSingleThreadScheduledExecutor(
|
||||
daemonThreadFactory);
|
||||
}
|
||||
|
||||
startSequenceNumber = 0;
|
||||
fetchingJob = new MyJob();
|
||||
}
|
||||
|
||||
public void setEventReceiver(EventReceiver eventReceiver) {
|
||||
public synchronized void setEventReceiver(EventReceiver eventReceiver) {
|
||||
if (logger.traceOn()) {
|
||||
logger.trace("setEventReceiver", ""+eventReceiver);
|
||||
}
|
||||
|
||||
EventReceiver old = this.eventReceiver;
|
||||
synchronized(fetchingJob) {
|
||||
this.eventReceiver = eventReceiver;
|
||||
if (old == null && eventReceiver != null)
|
||||
fetchingJob.resume();
|
||||
}
|
||||
this.eventReceiver = eventReceiver;
|
||||
if (old == null && eventReceiver != null)
|
||||
fetchingJob.resume();
|
||||
}
|
||||
|
||||
public String getClientId() {
|
||||
return clientId;
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
public synchronized void stop() {
|
||||
if (logger.traceOn()) {
|
||||
logger.trace("stop", "");
|
||||
}
|
||||
synchronized(fetchingJob) {
|
||||
if (stopped) {
|
||||
return;
|
||||
}
|
||||
|
||||
stopped = true;
|
||||
clientId = null;
|
||||
if (stopped) {
|
||||
return;
|
||||
}
|
||||
|
||||
stopped = true;
|
||||
clientId = null;
|
||||
if (defaultExecutor != null)
|
||||
defaultExecutor.shutdown();
|
||||
}
|
||||
|
||||
private class MyJob extends RepeatedSingletonJob {
|
||||
@ -372,10 +367,9 @@ public class FetchingEventRelay implements EventRelay {
|
||||
private final EventClientDelegateMBean delegate;
|
||||
private String clientId;
|
||||
private boolean stopped = false;
|
||||
private volatile ScheduledFuture<?> leaseRenewalFuture;
|
||||
|
||||
private final Executor executor;
|
||||
private final ScheduledExecutorService leaseScheduler;
|
||||
private final ExecutorService defaultExecutor;
|
||||
private final MyJob fetchingJob;
|
||||
|
||||
private final long timeout;
|
||||
@ -385,5 +379,5 @@ public class FetchingEventRelay implements EventRelay {
|
||||
new ClassLogger("javax.management.event",
|
||||
"FetchingEventRelay");
|
||||
private static final ThreadFactory daemonThreadFactory =
|
||||
new DaemonThreadFactory("FetchingEventRelay-executor");
|
||||
new DaemonThreadFactory("JMX FetchingEventRelay executor %d");
|
||||
}
|
||||
|
||||
@ -185,7 +185,7 @@ public class RMIPushEventForwarder implements EventForwarder {
|
||||
|
||||
private static final ExecutorService executor =
|
||||
Executors.newCachedThreadPool(
|
||||
new DaemonThreadFactory("RMIEventForwarder Executor"));
|
||||
new DaemonThreadFactory("JMX RMIEventForwarder Executor"));
|
||||
private final SendingJob sendingJob = new SendingJob();
|
||||
|
||||
private final BlockingQueue<TargetedNotification> buffer;
|
||||
|
||||
@ -420,7 +420,7 @@ public class RMIConnector implements JMXConnector, Serializable, JMXAddressable
|
||||
new PerThreadGroupPool.Create<ThreadPoolExecutor>() {
|
||||
public ThreadPoolExecutor createThreadPool(ThreadGroup group) {
|
||||
ThreadFactory daemonThreadFactory = new DaemonThreadFactory(
|
||||
"RMIConnector listener dispatch %d");
|
||||
"JMX RMIConnector listener dispatch %d");
|
||||
ThreadPoolExecutor exec = new ThreadPoolExecutor(
|
||||
1, 10, 1, TimeUnit.SECONDS,
|
||||
new LinkedBlockingDeque<Runnable>(),
|
||||
|
||||
@ -0,0 +1,176 @@
|
||||
/*
|
||||
* Copyright 2008 Sun Microsystems, Inc. All Rights Reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
|
||||
* CA 95054 USA or visit www.sun.com if you need additional information or
|
||||
* have any questions.
|
||||
*/
|
||||
|
||||
/*
|
||||
* @test
|
||||
* @bug 6747411
|
||||
* @summary Check that EventClient instances don't leak threads.
|
||||
* @author Eamonn McManus
|
||||
*/
|
||||
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.lang.management.ThreadInfo;
|
||||
import java.lang.management.ThreadMXBean;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import javax.management.MBeanServer;
|
||||
import javax.management.MBeanServerConnection;
|
||||
import javax.management.MBeanServerDelegate;
|
||||
import javax.management.MBeanServerNotification;
|
||||
import javax.management.Notification;
|
||||
import javax.management.NotificationFilter;
|
||||
import javax.management.NotificationListener;
|
||||
import javax.management.ObjectName;
|
||||
import javax.management.event.EventClient;
|
||||
import javax.management.remote.JMXConnector;
|
||||
import javax.management.remote.JMXConnectorFactory;
|
||||
import javax.management.remote.JMXConnectorServer;
|
||||
import javax.management.remote.JMXConnectorServerFactory;
|
||||
import javax.management.remote.JMXServiceURL;
|
||||
|
||||
public class EventClientThreadTest {
|
||||
private static final int MAX_TIME_SECONDS = 20;
|
||||
|
||||
private static final BlockingQueue<Notification> queue =
|
||||
new ArrayBlockingQueue(100);
|
||||
|
||||
private static final NotificationListener queueListener =
|
||||
new NotificationListener() {
|
||||
public void handleNotification(Notification notification,
|
||||
Object handback) {
|
||||
queue.add(notification);
|
||||
}
|
||||
};
|
||||
|
||||
private static final NotificationFilter dummyFilter =
|
||||
new NotificationFilter() {
|
||||
public boolean isNotificationEnabled(Notification notification) {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
long start = System.currentTimeMillis();
|
||||
long deadline = start + MAX_TIME_SECONDS * 1000;
|
||||
|
||||
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
|
||||
JMXServiceURL url = new JMXServiceURL("service:jmx:rmi://");
|
||||
JMXConnectorServer cs = JMXConnectorServerFactory.newJMXConnectorServer(
|
||||
url, null, mbs);
|
||||
cs.start();
|
||||
JMXServiceURL addr = cs.getAddress();
|
||||
JMXConnector cc = JMXConnectorFactory.connect(addr);
|
||||
MBeanServerConnection mbsc = cc.getMBeanServerConnection();
|
||||
|
||||
ThreadMXBean threads = ManagementFactory.getThreadMXBean();
|
||||
|
||||
System.out.println("Opening and closing some EventClients...");
|
||||
// If we create a connection, then create and destroy EventClients
|
||||
// over it, then close it, there should be no "JMX *" threads left.
|
||||
for (int i = 0; i < 5; i++)
|
||||
test(mbsc);
|
||||
|
||||
cc.close();
|
||||
|
||||
showTime("opening and closing initial EventClients", start);
|
||||
|
||||
Set<String> jmxThreads = threadsMatching("JMX .*");
|
||||
while (!jmxThreads.isEmpty() && System.currentTimeMillis() < deadline) {
|
||||
Set<String> jmxThreadsNow = threadsMatching("JMX .*");
|
||||
Set<String> gone = new TreeSet<String>(jmxThreads);
|
||||
gone.removeAll(jmxThreadsNow);
|
||||
for (String s : gone)
|
||||
showTime("expiry of \"" + s + "\"", start);
|
||||
jmxThreads = jmxThreadsNow;
|
||||
Thread.sleep(10);
|
||||
}
|
||||
if (System.currentTimeMillis() >= deadline) {
|
||||
showThreads(threads);
|
||||
throw new Exception("Timed out waiting for JMX threads to expire");
|
||||
}
|
||||
|
||||
showTime("waiting for JMX threads to expire", start);
|
||||
|
||||
System.out.println("TEST PASSED");
|
||||
}
|
||||
|
||||
static void showThreads(ThreadMXBean threads) throws Exception {
|
||||
long[] ids = threads.getAllThreadIds();
|
||||
for (long id : ids) {
|
||||
ThreadInfo ti = threads.getThreadInfo(id);
|
||||
String name = (ti == null) ? "(defunct)" : ti.getThreadName();
|
||||
System.out.printf("%4d %s\n", id, name);
|
||||
}
|
||||
}
|
||||
|
||||
static void showTime(String what, long start) {
|
||||
long elapsed = System.currentTimeMillis() - start;
|
||||
System.out.printf("Time after %s: %.3f s\n", what, elapsed / 1000.0);
|
||||
}
|
||||
|
||||
static Set<String> threadsMatching(String pattern) {
|
||||
Set<String> matching = new TreeSet<String>();
|
||||
ThreadMXBean threads = ManagementFactory.getThreadMXBean();
|
||||
long[] ids = threads.getAllThreadIds();
|
||||
for (long id : ids) {
|
||||
ThreadInfo ti = threads.getThreadInfo(id);
|
||||
String name = (ti == null) ? "(defunct)" : ti.getThreadName();
|
||||
if (name.matches(pattern))
|
||||
matching.add(name);
|
||||
}
|
||||
return matching;
|
||||
}
|
||||
|
||||
static void test(MBeanServerConnection mbsc) throws Exception {
|
||||
final ObjectName delegateName = MBeanServerDelegate.DELEGATE_NAME;
|
||||
final ObjectName testName = new ObjectName("test:type=Test");
|
||||
EventClient ec = new EventClient(mbsc);
|
||||
ec.addNotificationListener(delegateName, queueListener, null, null);
|
||||
mbsc.createMBean(MBeanServerDelegate.class.getName(), testName);
|
||||
mbsc.unregisterMBean(testName);
|
||||
final String[] expectedTypes = {
|
||||
MBeanServerNotification.REGISTRATION_NOTIFICATION,
|
||||
MBeanServerNotification.UNREGISTRATION_NOTIFICATION,
|
||||
};
|
||||
for (String s : expectedTypes) {
|
||||
Notification n = queue.poll(3, TimeUnit.SECONDS);
|
||||
if (n == null)
|
||||
throw new Exception("Timed out waiting for notif: " + s);
|
||||
if (!(n instanceof MBeanServerNotification))
|
||||
throw new Exception("Got notif of wrong class: " + n.getClass());
|
||||
if (!n.getType().equals(s)) {
|
||||
throw new Exception("Got notif of wrong type: " + n.getType() +
|
||||
" (expecting " + s + ")");
|
||||
}
|
||||
}
|
||||
ec.removeNotificationListener(delegateName, queueListener);
|
||||
|
||||
ec.addNotificationListener(delegateName, queueListener, dummyFilter, "foo");
|
||||
ec.removeNotificationListener(delegateName, queueListener, dummyFilter, "foo");
|
||||
|
||||
ec.close();
|
||||
}
|
||||
}
|
||||
@ -1,4 +1,4 @@
|
||||
/*/*
|
||||
/*
|
||||
* Copyright 2007 Sun Microsystems, Inc. All Rights Reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user