org.apache.http.impl.nio.reactor
Class AbstractMultiworkerIOReactor

java.lang.Object
  extended by org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor
All Implemented Interfaces:
IOReactor
Direct Known Subclasses:
DefaultConnectingIOReactor, DefaultListeningIOReactor

public abstract class AbstractMultiworkerIOReactor
extends Object
implements IOReactor

Generic implementation of IOReactor that can run multiple BaseIOReactor instance in separate worker threads and distribute newly created I/O session equally across those I/O reactors for a more optimal resource utilization and a better I/O performance. Usually it is recommended to have one worker I/O reactor per physical CPU core.

Important note about exception handling

Protocol specific exceptions as well as those I/O exceptions thrown in the course of interaction with the session's channel are to be expected are to be dealt with by specific protocol handlers. These exceptions may result in termination of an individual session but should not affect the I/O reactor and all other active sessions. There are situations, however, when the I/O reactor itself encounters an internal problem such as an I/O exception in the underlying NIO classes or an unhandled runtime exception. Those types of exceptions are usually fatal and will cause the I/O reactor to shut down automatically.

There is a possibility to override this behavior and prevent I/O reactors from shutting down automatically in case of a runtime exception or an I/O exception in internal classes. This can be accomplished by providing a custom implementation of the IOReactorExceptionHandler interface.

If an I/O reactor is unable to automatically recover from an I/O or a runtime exception it will enter the shutdown mode. First off, it cancel all pending new session requests. Then it will attempt to close all active I/O sessions gracefully giving them some time to flush pending output data and terminate cleanly. Lastly, it will forcibly shut down those I/O sessions that still remain active after the grace period. This is a fairly complex process, where many things can fail at the same time and many different exceptions can be thrown in the course of the shutdown process. The I/O reactor will record all exceptions thrown during the shutdown process, including the original one that actually caused the shutdown in the first place, in an audit log. One can obtain the audit log using getAuditLog(), examine exceptions thrown by the I/O reactor prior and in the course of the reactor shutdown and decide whether it is safe to restart the I/O reactor.

Since:
4.0
Version:
$Revision: 744539 $

Field Summary
protected  List<ExceptionEvent> auditLog
           
protected  IOReactorExceptionHandler exceptionHandler
           
protected  HttpParams params
           
protected  Selector selector
           
protected  long selectTimeout
           
protected  IOReactorStatus status
           
 
Constructor Summary
AbstractMultiworkerIOReactor(int workerCount, ThreadFactory threadFactory, HttpParams params)
          Creates an instance of AbstractMultiworkerIOReactor.
 
Method Summary
protected  void addChannel(ChannelEntry entry)
          Assigns the given channel entry to one of the worker I/O reactors.
protected  void addExceptionEvent(Throwable ex)
          Adds the given Throwable object to the audit log.
protected  void addExceptionEvent(Throwable ex, Date timestamp)
          Adds the given Throwable object with the given time stamp to the audit log.
protected  void awaitShutdown(long timeout)
          Blocks for the given period of time in milliseconds awaiting the completion of the reactor shutdown.
protected abstract  void cancelRequests()
          Triggered to cancel pending session requests.
protected  void doShutdown()
          Activates the shutdown sequence for this reactor.
 void execute(IOEventDispatch eventDispatch)
          Activates the main I/O reactor as well as all worker I/O reactors.
 List<ExceptionEvent> getAuditLog()
          Returns the audit log containing exceptions thrown by the I/O reactor prior and in the course of the reactor shutdown.
 IOReactorStatus getStatus()
          Returns the current status of the reactor.
protected  void prepareSocket(Socket socket)
          Prepares the given Socket by resetting some of its properties.
protected abstract  void processEvents(int count)
          Triggered to process I/O events registered by the main Selector.
protected  SelectionKey registerChannel(SelectableChannel channel, int ops)
          Registers the given channel with the main Selector.
 void setExceptionHandler(IOReactorExceptionHandler exceptionHandler)
          Sets exception handler for this I/O reactor.
 void shutdown()
          Initiates shutdown of the reactor and blocks for a default period of time waiting for the reactor to terminate all active connections, to shut down itself and to release system resources it currently holds.
 void shutdown(long waitMs)
          Initiates shutdown of the reactor and blocks approximately for the given period of time in milliseconds waiting for the reactor to terminate all active connections, to shut down itself and to release system resources it currently holds.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

status

protected volatile IOReactorStatus status

params

protected final HttpParams params

selector

protected final Selector selector

selectTimeout

protected final long selectTimeout

exceptionHandler

protected IOReactorExceptionHandler exceptionHandler

auditLog

protected List<ExceptionEvent> auditLog
Constructor Detail

AbstractMultiworkerIOReactor

public AbstractMultiworkerIOReactor(int workerCount,
                                    ThreadFactory threadFactory,
                                    HttpParams params)
                             throws IOReactorException
Creates an instance of AbstractMultiworkerIOReactor.

Parameters:
workerCount - number of worker I/O reactors.
threadFactory - the factory to create threads. Can be null.
params - HTTP parameters.
Throws:
IOReactorException - in case if a non-recoverable I/O error.
Method Detail

getStatus

public IOReactorStatus getStatus()
Description copied from interface: IOReactor
Returns the current status of the reactor.

Specified by:
getStatus in interface IOReactor
Returns:
reactor status.

getAuditLog

public List<ExceptionEvent> getAuditLog()
Returns the audit log containing exceptions thrown by the I/O reactor prior and in the course of the reactor shutdown.

Returns:
audit log.

addExceptionEvent

protected void addExceptionEvent(Throwable ex,
                                 Date timestamp)
Adds the given Throwable object with the given time stamp to the audit log.

Parameters:
ex - the exception thrown by the I/O reactor.
timestamp - the time stamp of the exception. Can be null in which case the current date / time will be used.

addExceptionEvent

protected void addExceptionEvent(Throwable ex)
Adds the given Throwable object to the audit log.

Parameters:
ex - the exception thrown by the I/O reactor.

setExceptionHandler

public void setExceptionHandler(IOReactorExceptionHandler exceptionHandler)
Sets exception handler for this I/O reactor.

Parameters:
exceptionHandler - the exception handler.

processEvents

protected abstract void processEvents(int count)
                               throws IOReactorException
Triggered to process I/O events registered by the main Selector.

Super-classes can implement this method to react to the event.

Parameters:
count - event count.
Throws:
IOReactorException - in case if a non-recoverable I/O error.

cancelRequests

protected abstract void cancelRequests()
                                throws IOReactorException
Triggered to cancel pending session requests.

Super-classes can implement this method to react to the event.

Throws:
IOReactorException - in case if a non-recoverable I/O error.

execute

public void execute(IOEventDispatch eventDispatch)
             throws InterruptedIOException,
                    IOReactorException
Activates the main I/O reactor as well as all worker I/O reactors. The I/O main reactor will start reacting to I/O events and triggering notification methods. The worker I/O reactor in their turn will start reacting to I/O events and dispatch I/O event notifications to the given IOEventDispatch interface.

This method will enter the infinite I/O select loop on the Selector instance associated with this I/O reactor and used to manage creation of new I/O channels. Once a new I/O channel has been created the processing of I/O events on that channel will be delegated to one of the worker I/O reactors.

The method will remain blocked unto the I/O reactor is shut down or the execution thread is interrupted.

The following HTTP parameters affect execution of this method:

The NIOReactorPNames.SELECT_INTERVAL parameter determines the time interval in milliseconds at which the I/O reactor wakes up to check for timed out sessions and session requests.

Specified by:
execute in interface IOReactor
Parameters:
eventDispatch - the I/O event dispatch.
Throws:
InterruptedIOException - if the dispatch thread is interrupted.
IOReactorException - in case if a non-recoverable I/O error.
See Also:
processEvents(int), cancelRequests()

doShutdown

protected void doShutdown()
                   throws InterruptedIOException
Activates the shutdown sequence for this reactor. This method will cancel all pending session requests, close out all active I/O channels, make an attempt to terminate all worker I/O reactors gracefully, and finally force-terminate those I/O reactors that failed to terminate after the specified grace period.

The following HTTP parameters affect execution of this method:

The NIOReactorPNames.GRACE_PERIOD parameter determines the grace period the I/O reactors are expected to block waiting for individual worker threads to terminate cleanly.

Throws:
InterruptedIOException - if the shutdown sequence has been interrupted.

addChannel

protected void addChannel(ChannelEntry entry)
Assigns the given channel entry to one of the worker I/O reactors.

Parameters:
entry - the channel entry.

registerChannel

protected SelectionKey registerChannel(SelectableChannel channel,
                                       int ops)
                                throws ClosedChannelException
Registers the given channel with the main Selector.

Parameters:
channel - the channel.
ops - interest ops.
Returns:
selection key.
Throws:
ClosedChannelException - if the channel has been already closed.

prepareSocket

protected void prepareSocket(Socket socket)
                      throws IOException
Prepares the given Socket by resetting some of its properties.

The following HTTP parameters affect execution of this method:

CoreConnectionPNames.TCP_NODELAY parameter determines whether Nagle's algorithm is to be used. The Nagle's algorithm tries to conserve bandwidth by minimizing the number of segments that are sent. When applications wish to decrease network latency and increase performance, they can disable Nagle's algorithm (that is enable TCP_NODELAY). Data will be sent earlier, at the cost of an increase in bandwidth consumption.

CoreConnectionPNames.SO_TIMEOUT parameter defines the socket timeout in milliseconds, which is the timeout for waiting for data. A timeout value of zero is interpreted as an infinite timeout.

CoreConnectionPNames.SO_LINGER parameter defines linger time in seconds. The maximum timeout value is platform specific. Value 0 implies that the option is disabled. Value -1 implies that the JRE default is to be used. The setting only affects socket close.

Parameters:
socket - the socket
Throws:
IOException - in case of an I/O error.

awaitShutdown

protected void awaitShutdown(long timeout)
                      throws InterruptedException
Blocks for the given period of time in milliseconds awaiting the completion of the reactor shutdown. If the value of timeout is set to 0 this method blocks indefinitely.

Parameters:
timeout - the maximum wait time.
Throws:
InterruptedException - if interrupted.

shutdown

public void shutdown()
              throws IOException
Description copied from interface: IOReactor
Initiates shutdown of the reactor and blocks for a default period of time waiting for the reactor to terminate all active connections, to shut down itself and to release system resources it currently holds. It is up to individual implementations to decide for how long this method can remain blocked.

Specified by:
shutdown in interface IOReactor
Throws:
IOException - in case of an I/O error.

shutdown

public void shutdown(long waitMs)
              throws IOException
Description copied from interface: IOReactor
Initiates shutdown of the reactor and blocks approximately for the given period of time in milliseconds waiting for the reactor to terminate all active connections, to shut down itself and to release system resources it currently holds.

Specified by:
shutdown in interface IOReactor
Parameters:
waitMs - wait time in milliseconds.
Throws:
IOException - in case of an I/O error.


Copyright © 2005-2009 Apache Software Foundation. All Rights Reserved.