From 18bb707985b119d98eaa14f6b8911595dc749bd2 Mon Sep 17 00:00:00 2001 From: Duncan Mackintosh Date: Thu, 14 Jul 2011 16:30:28 +0100 Subject: [PATCH] Refactored provisiond's AsyncBasicDetector to use far fewer NioSocketConnectors for service detection, which should substantially reduce file handle consumption and eliminate a crashed state where thousands of file handles are never released --- .../provision/support/AsyncBasicDetector.java | 77 ++++++---- .../provision/support/ConnectionFactory.java | 167 ++++++++++++++++++++ .../netmgt/provision/support/ConnectorFactory.java | 77 --------- 3 files changed, 214 insertions(+), 107 deletions(-) create mode 100644 opennms-provision/opennms-provision-api/src/main/java/org/opennms/netmgt/provision/support/ConnectionFactory.java delete mode 100644 opennms-provision/opennms-provision-api/src/main/java/org/opennms/netmgt/provision/support/ConnectorFactory.java diff --git a/opennms-provision/opennms-provision-api/src/main/java/org/opennms/netmgt/provision/support/AsyncBasicDetector.java b/opennms-provision/opennms-provision-api/src/main/java/org/opennms/netmgt/provision/support/AsyncBasicDetector.java index bcfe6d6..327eaf0 100644 --- a/opennms-provision/opennms-provision-api/src/main/java/org/opennms/netmgt/provision/support/AsyncBasicDetector.java +++ b/opennms-provision/opennms-provision-api/src/main/java/org/opennms/netmgt/provision/support/AsyncBasicDetector.java @@ -46,11 +46,12 @@ import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.core.future.IoFutureListener; import org.apache.mina.core.service.IoHandler; import org.apache.mina.core.session.IdleStatus; +import org.apache.mina.core.session.IoSession; +import org.apache.mina.core.session.IoSessionInitializer; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.filter.ssl.SslFilter; -import 
org.apache.mina.transport.socket.SocketConnector; import org.opennms.core.utils.LogUtils; import org.opennms.netmgt.provision.DetectFuture; import org.opennms.netmgt.provision.DetectorMonitor; @@ -74,8 +75,9 @@ public abstract class AsyncBasicDetector extends AsyncAbstrac private AsyncClientConversation m_conversation = new AsyncClientConversation(); private boolean useSSLFilter = false; - private ConnectorFactory s_connectorFactory = new ConnectorFactory(); - private SocketConnector m_connector; + + private ConnectFuture m_connection; + private ConnectionFactory m_connectionFactory; /** *
<p>Constructor for AsyncBasicDetector.</p>
@@ -109,30 +111,43 @@ public abstract class AsyncBasicDetector extends AsyncAbstrac /** {@inheritDoc} */ @Override public DetectFuture isServiceDetected(final InetAddress address, final DetectorMonitor monitor) throws Exception { - m_connector = s_connectorFactory.getConnector(); + + final DetectFuture detectFuture = new DefaultDetectFuture(this); - final DetectFuture future = new DefaultDetectFuture(this); + // Only build the SSL context when SSL is enabled. Set it up here because it + // can throw an Exception, which we want to throw now, not in initializeSession + final SSLContext c = isUseSSLFilter() ? createClientSSLContext() : null; - // Set connect timeout. - m_connector.setConnectTimeoutMillis( getTimeout() ); - m_connector.setHandler( createDetectorHandler(future) ); - - if(isUseSSLFilter()) { - final SslFilter filter = new SslFilter(createClientSSLContext()); - filter.setUseClientMode(true); - m_connector.getFilterChain().addFirst("SSL", filter); - } - - m_connector.getFilterChain().addLast( "logger", getLoggingFilter() != null ? getLoggingFilter() : new LoggingFilter() ); - m_connector.getFilterChain().addLast( "codec", getProtocolCodecFilter()); - m_connector.getSessionConfig().setIdleTime( IdleStatus.READER_IDLE, getIdleTime() ); + // Create an IoSessionInitializer that will configure this individual + // session. Previously, all this was done on a new Connector each time + // but that was leaking file handles all over the place.
This way gives + // us per-connection settings without the overhead of creating new + // Connectors each time + IoSessionInitializer<ConnectFuture> init = new IoSessionInitializer<ConnectFuture>() { - // Start communication - final InetSocketAddress socketAddress = new InetSocketAddress(address, getPort()); - final ConnectFuture cf = m_connector.connect( socketAddress ); - cf.addListener(retryAttemptListener( m_connector, future, socketAddress, getRetries() )); - - return future; + public void initializeSession(IoSession session, ConnectFuture future) { + // Add filters to the session + if(isUseSSLFilter()) { + final SslFilter filter = new SslFilter(c); + filter.setUseClientMode(true); + session.getFilterChain().addFirst("SSL", filter); + } + session.getFilterChain().addLast( "logger", getLoggingFilter() != null ? getLoggingFilter() : new LoggingFilter() ); + session.getFilterChain().addLast( "codec", getProtocolCodecFilter()); + session.getConfig().setIdleTime(IdleStatus.READER_IDLE, getIdleTime()); + // Give the session an IoHandler that will get everything delegated to it + // by the SessionDelegateIoHandler + session.setAttribute( IoHandler.class, createDetectorHandler(detectFuture) ); + } + }; + + // Start communication + final InetSocketAddress socketAddress = new InetSocketAddress(address, getPort()); + m_connectionFactory = ConnectionFactory.getFactory(getTimeout()); + final ConnectFuture cf = m_connectionFactory.connect(socketAddress, init); + cf.addListener(retryAttemptListener( m_connectionFactory, detectFuture, socketAddress, init, getRetries() )); + + return detectFuture; } /** @@ -140,8 +155,9 @@ public abstract class AsyncBasicDetector extends AsyncAbstrac */ public void dispose(){ LogUtils.debugf(this, "calling dispose on detector %s", getServiceName()); - s_connectorFactory.dispose(m_connector); - m_connector = null; + ConnectionFactory.dispose(m_connectionFactory, m_connection); + m_connectionFactory = null; + m_connection = null; } /** @@ -166,7 +182,7 @@ public abstract
class AsyncBasicDetector extends AsyncAbstrac * @param retryAttempt * @return IoFutureListener */ - private IoFutureListener<ConnectFuture> retryAttemptListener(final SocketConnector connector,final DetectFuture detectFuture, final InetSocketAddress address, final int retryAttempt) { + private IoFutureListener<ConnectFuture> retryAttemptListener(final ConnectionFactory connector, final DetectFuture detectFuture, final InetSocketAddress address, final IoSessionInitializer<ConnectFuture> init, final int retryAttempt) { return new IoFutureListener<ConnectFuture>() { public void operationComplete(ConnectFuture future) { @@ -178,8 +194,9 @@ public abstract class AsyncBasicDetector extends AsyncAbstrac detectFuture.setServiceDetected(false); }else { LogUtils.infof(this, "Connection exception occurred %s for service %s retrying attempt: ", cause, getServiceName()); - future = connector.connect(address); - future.addListener(retryAttemptListener(connector, detectFuture, address, retryAttempt -1)); + // Retry on the slot taken by the first attempt: reConnect() acquires no semaphore + future = connector.reConnect(address, init); + future.addListener(retryAttemptListener(connector, detectFuture, address, init, retryAttempt -1)); } }else if(cause instanceof Throwable) { LogUtils.infof(this, "Threw a Throwable and detection is false for service %s", getServiceName()); @@ -227,7 +244,7 @@ public abstract class AsyncBasicDetector extends AsyncAbstrac * @return a {@link org.apache.mina.core.service.IoHandler} object.
*/ protected IoHandler createDetectorHandler(final DetectFuture future) { - ((BaseDetectorHandler) m_detectorHandler).setConversation(getConversation()); + m_detectorHandler.setConversation(getConversation()); m_detectorHandler.setFuture(future); return m_detectorHandler; }
diff --git a/opennms-provision/opennms-provision-api/src/main/java/org/opennms/netmgt/provision/support/ConnectionFactory.java b/opennms-provision/opennms-provision-api/src/main/java/org/opennms/netmgt/provision/support/ConnectionFactory.java
new file mode 100644
index 0000000..e905143
--- /dev/null
+++ b/opennms-provision/opennms-provision-api/src/main/java/org/opennms/netmgt/provision/support/ConnectionFactory.java
@@ -0,0 +1,235 @@
+package org.opennms.netmgt.provision.support;
+
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Semaphore;
+
+import org.apache.mina.core.future.ConnectFuture;
+import org.apache.mina.core.session.IoSessionInitializer;
+import org.apache.mina.transport.socket.nio.NioSocketConnector;
+import org.opennms.core.utils.LogUtils;
+
+/**
+ * Factory for encapsulating a NioSocketConnector in such a way as to allow us
+ * to have a Semaphore limiting the number of active Connections across all
+ * Connectors.
+ * <p>
+ * There will be one ConnectionFactory for each discrete connection timeout
+ * value. Factories are reference counted: every call to
+ * {@link #getFactory(int)} must be balanced by exactly one call to
+ * {@link #dispose(ConnectionFactory, ConnectFuture)}.
+ * <p>
+ * Adapted from original ConnectorFactory.
+ *
+ * @author ranger, Duncan Mackintosh
+ * @version $Id: $
+ */
+public class ConnectionFactory {
+
+    /** Limits the number of live connectors; unset or non-positive means unlimited. */
+    private static final String MAX_CONNECTORS_PROPERTY = "org.opennms.netmgt.provision.maxConcurrentConnectors";
+
+    /** Limits the number of in-flight connections; unset or non-positive means unlimited. */
+    private static final String MAX_CONNECTIONS_PROPERTY = "org.opennms.netmgt.provision.maxConcurrentConnections";
+
+    /**
+     * Map of timeoutInMillis to a ConnectionFactory with that timeout. Also
+     * the lock that guards every factory's reference count.
+     */
+    private static final Map<Integer, ConnectionFactory> s_connectorPool
+        = new HashMap<Integer, ConnectionFactory>();
+
+    // Semaphores for number of connectors (small, probably ~ 20) and
+    // connections (large, in the hundreds). null means "no limit".
+    private static final Semaphore s_availableConnectors = createLimit(MAX_CONNECTORS_PROPERTY);
+    private static final Semaphore s_availableConnections = createLimit(MAX_CONNECTIONS_PROPERTY);
+
+    /**
+     * Count the number of references to this Factory so we can dispose it
+     * when there are no active references. Guarded by s_connectorPool.
+     */
+    private int m_references;
+
+    /**
+     * The actual connector
+     */
+    private final NioSocketConnector m_connector;
+
+    /**
+     * Create a new factory. Private because one should use {@link #getFactory(int)}
+     */
+    private ConnectionFactory(int timeoutInMillis) {
+        m_connector = new NioSocketConnector();
+        m_connector.setHandler(new SessionDelegateIoHandler());
+        m_connector.setConnectTimeoutMillis(timeoutInMillis);
+    }
+
+    /**
+     * Build the Semaphore for a limit configured through a system property.
+     *
+     * @param property
+     *            name of the system property holding the limit
+     * @return
+     *            a Semaphore with that many permits, or null (no limit) when
+     *            the property is unset, 0 or negative
+     * @throws NumberFormatException
+     *            if the property is set to something other than an integer
+     */
+    private static Semaphore createLimit(String property) {
+        final String value = System.getProperty(property);
+        if (value == null) {
+            return null;
+        }
+        final int permits = Integer.parseInt(value.trim());
+        // 0 is the documented "unlimited" value; a negative count would create
+        // a Semaphore that can never be acquired, so treat it the same way
+        return permits > 0 ? new Semaphore(permits) : null;
+    }
+
+    /** Give back one connector slot, if connector limiting is enabled. */
+    private static void releaseConnectorSlot() {
+        if (s_availableConnectors != null) {
+            s_availableConnectors.release();
+        }
+    }
+
+    /**
+     * Get a ConnectionFactory. If there is already a Factory with the
+     * desired timeout, you will get that one; otherwise a new one is created.
+     * <p>
+     * If org.opennms.netmgt.provision.maxConcurrentConnectors is set, creating
+     * a new Factory may block until a connector slot is available. Reusing an
+     * existing Factory never blocks and does not take another slot.
+     *
+     * @param timeoutInMillis
+     *            Connection timeout
+     * @return
+     *            An appropriate Factory; release it with
+     *            {@link #dispose(ConnectionFactory, ConnectFuture)}
+     */
+    public static ConnectionFactory getFactory(int timeoutInMillis) {
+        synchronized (s_connectorPool) {
+            final ConnectionFactory existing = s_connectorPool.get(timeoutInMillis);
+            if (existing != null) {
+                existing.m_references++;
+                return existing;
+            }
+        }
+
+        // A new connector is needed. Wait for a slot outside the lock: dispose()
+        // needs this lock before it can give a slot back.
+        if (s_availableConnectors != null) {
+            s_availableConnectors.acquireUninterruptibly();
+        }
+
+        synchronized (s_connectorPool) {
+            ConnectionFactory factory = s_connectorPool.get(timeoutInMillis);
+            if (factory != null) {
+                // Another thread created it while we waited: share it and
+                // hand back the slot we no longer need.
+                releaseConnectorSlot();
+            } else {
+                LogUtils.debugf(ConnectionFactory.class, "Creating a ConnectionFactory for timeout %d, there are already %d factories", timeoutInMillis, s_connectorPool.size());
+                try {
+                    factory = new ConnectionFactory(timeoutInMillis);
+                } catch (RuntimeException e) {
+                    // e.g. the selector could not be opened; don't leak the slot
+                    releaseConnectorSlot();
+                    throw e;
+                }
+                s_connectorPool.put(timeoutInMillis, factory);
+            }
+            factory.m_references++;
+            return factory;
+        }
+    }
+
+    /**
+     * Connect to a remote socket. If org.opennms.netmgt.provision.maxConcurrentConnections
+     * is set, this may block until a connection slot is available.
+     * <p>
+     * You must dispose both the ConnectionFactory and ConnectFuture when done
+     * by calling {@link #dispose(ConnectionFactory, ConnectFuture)}, which also
+     * gives the connection slot back.
+     *
+     * @param destination
+     *            Destination address
+     * @param init
+     *            Initialiser for the IoSession
+     * @return
+     *            ConnectFuture from a Mina connect call
+     */
+    public ConnectFuture connect(SocketAddress destination, IoSessionInitializer<? extends ConnectFuture> init) {
+        if (s_availableConnections != null) {
+            s_availableConnections.acquireUninterruptibly();
+        }
+        return m_connector.connect(destination, init);
+    }
+
+    /**
+     * Retry a connection. This does not consume a connection slot, so it will
+     * not block. Use only if you have already acquired a connection slot using
+     * {@link #connect(SocketAddress, IoSessionInitializer)}.
+     *
+     * @param destination
+     *            Destination address
+     * @param init
+     *            Initialiser for the IoSession
+     * @return
+     *            ConnectFuture from a Mina connect call
+     */
+    public ConnectFuture reConnect(SocketAddress destination, IoSessionInitializer<? extends ConnectFuture> init) {
+        return m_connector.connect(destination, init);
+    }
+
+    /**
+     * Free up the resources used by a connection and connection factory: the
+     * connection slot is returned and one reference to the factory is dropped.
+     * When the last reference goes, the connector is disposed and its
+     * connector slot is returned.
+     *
+     * @param factory
+     *            factory from {@link #getFactory(int)}; null is ignored because
+     *            nothing was acquired
+     * @param connection
+     *            the connection made through the factory (not currently used)
+     */
+    public static void dispose(ConnectionFactory factory, ConnectFuture connection) {
+        if (factory == null) {
+            return;
+        }
+
+        final boolean lastReference;
+        synchronized (s_connectorPool) {
+            if (factory.m_references <= 0) {
+                // Already fully disposed: a repeated call must not release permits again
+                return;
+            }
+            lastReference = --factory.m_references == 0;
+            if (lastReference) {
+                final Iterator<Entry<Integer, ConnectionFactory>> i = s_connectorPool.entrySet().iterator();
+                while (i.hasNext()) {
+                    if (i.next().getValue() == factory) {
+                        i.remove();
+                    }
+                }
+            }
+        }
+
+        if (s_availableConnections != null) {
+            s_availableConnections.release();
+        }
+
+        if (lastReference) {
+            try {
+                factory.m_connector.dispose();
+            } finally {
+                releaseConnectorSlot();
+            }
+        }
+    }
+
+}
diff --git a/opennms-provision/opennms-provision-api/src/main/java/org/opennms/netmgt/provision/support/ConnectorFactory.java b/opennms-provision/opennms-provision-api/src/main/java/org/opennms/netmgt/provision/support/ConnectorFactory.java deleted file mode 100644 index 2db9545..0000000 --- a/opennms-provision/opennms-provision-api/src/main/java/org/opennms/netmgt/provision/support/ConnectorFactory.java +++ /dev/null @@ -1,77 +0,0 @@ -package org.opennms.netmgt.provision.support; - -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.Semaphore; - -import org.apache.mina.transport.socket.SocketConnector; -import org.apache.mina.transport.socket.nio.NioSocketConnector; -import org.opennms.core.utils.ThreadCategory; - -/** - *
<p>ConnectorFactory class.</p>
- * - * @author ranger - * @version $Id: $ - */ -public class ConnectorFactory { - - - private static Semaphore s_available; - - static{ - if(System.getProperty("org.opennms.netmgt.provision.maxConcurrentConnectors") != null){ - - if(Integer.parseInt(System.getProperty("org.opennms.netmgt.provision.maxConcurrentConnectors")) == 0){ - s_available = null; - }else{ - s_available = new Semaphore(Integer.parseInt(System.getProperty("org.opennms.netmgt.provision.maxConcurrentConnectors", "2000"))); - } - } - } - - private static Executor s_executor = Executors.newSingleThreadExecutor(); - - /** - *
<p>getConnector</p>
- * - * @return a {@link org.apache.mina.transport.socket.SocketConnector} object. - * @throws java.lang.InterruptedException if any. - */ - public SocketConnector getConnector() throws InterruptedException { - if(s_available != null){ - s_available.acquire(); - } - return createConnector(); - } - - /** - *
<p>dispose</p>
- * - * @param connector a {@link org.apache.mina.transport.socket.SocketConnector} object. - */ - public void dispose(final SocketConnector connector) { - Runnable r = new Runnable(){ - - public void run() { - ThreadCategory.getInstance(ConnectorFactory.class).debug("Disposing the connector"); - try{ - connector.dispose(); - }finally{ - if(s_available != null){ - s_available.release(); - } - } - - } - - }; - - s_executor.execute(r); - } - - private SocketConnector createConnector() throws InterruptedException{ - return new NioSocketConnector(); //m_socketPool.getItem(); - } - -} -- 1.7.1