tigase.server
Class AbstractMessageReceiver

java.lang.Object
  extended by tigase.server.BasicComponent
      extended by tigase.server.AbstractMessageReceiver
All Implemented Interfaces:
Configurable, XMPPService, MessageReceiver, ServerComponent, StatisticsContainer, VHostListener
Direct Known Subclasses:
ConnectionManager, Gateway, MessageRouter, SessionManager, StanzaReceiver, StanzaSender, TestComponent

public abstract class AbstractMessageReceiver
extends BasicComponent
implements StatisticsContainer, MessageReceiver

This is an archetype for all classes processing user-level packets. The implementation is designed for a heavy packets processing with internal queues and number of separate threads depending on number of CPUs. Extensions of the class can process normall user packets and administrator packets via ad-hoc commands. Good examples of such components are MUC, PubSub, SessionManager.

The class offers scripting API for administrator ad-hoc commands.

By default it internally uses priority queues which in some rare cases may lead to packets reordering. When this happens and it is unacceptable for the deployment non-priority queues can be used. The queues size is limited and depends on the available memory size.

Packets are processed by processPacket(Packet packet) method which is concurrently called from multiple threads. Created: Tue Nov 22 07:07:11 2005

Version:
$Rev: 2132 $
Author:
Artur Hefczyc

Field Summary
protected static long HOUR
          Constant used in time calculation procedures.
static String INCOMING_FILTERS_PROP_KEY
          Configuration property key for setting incoming packets filters on the component level.
static String INCOMING_FILTERS_PROP_VAL
          Configuration property default vakue with a default incoming packet filter loaded by Tigase server.

This is a comma-separated list of classes which should be loaded as packet filters.

static String MAX_QUEUE_SIZE_PROP_KEY
          Configuration property key allowing to overwrite a default (memory size dependent) size for the component internal queues.
static Integer MAX_QUEUE_SIZE_PROP_VAL
          A default value for max queue size property.
protected  int maxQueueSize
           
protected static long MINUTE
          Constant used in time calculation procedures.
static String OUTGOING_FILTERS_PROP_KEY
          Configuration property key for setting outgoing packets filters on the component level.
static String OUTGOING_FILTERS_PROP_VAL
          Configuration property default vakue with a default outgoing packet filter loaded by Tigase server.

This is a comma-separated list of classes which should be loaded as packet filters.

protected static long SECOND
          Constant used in time calculation procedures.
 
Fields inherited from class tigase.server.BasicComponent
admins, SCRIPTS_DIR_PROP_DEF, SCRIPTS_DIR_PROP_KEY, vHostManager
 
Fields inherited from interface tigase.conf.Configurable
ADMINS_PROP_KEY, AUTH_REPO_CLASS_PROP_KEY, AUTH_REPO_PARAMS_NODE, AUTH_REPO_URL_PROP_KEY, BOSH_COMP_CLASS_NAME, C2S_CLUST_COMP_CLASS_NAME, C2S_COMP_CLASS_NAME, CL_COMP_CLASS_NAME, CLUSTER_CONECT, CLUSTER_CONTR_CLASS_NAME, CLUSTER_LISTEN, CLUSTER_MODE, CLUSTER_NODES, CLUSTER_NODES_PROP_KEY, COMP_PROT_CLASS_NAME, COMPONENT_ID_PROP_KEY, DEF_BOSH_NAME, DEF_C2S_NAME, DEF_CL_COMP_NAME, DEF_CLUST_CONTR_NAME, DEF_COMP_PROT_NAME, DEF_EXT_COMP_NAME, DEF_HOSTNAME_PROP_KEY, DEF_S2S_NAME, DEF_SM_NAME, DEF_SRECV_NAME, DEF_SSEND_NAME, DEF_STATS_NAME, DEF_VHOST_MAN_NAME, DERBY_REPO_CLASS_PROP_VAL, DERBY_REPO_URL_PROP_VAL, DRUPAL_REPO_CLASS_PROP_VAL, DRUPAL_REPO_URL_PROP_VAL, EXT_COMP_CLASS_NAME, GEN_ADMINS, GEN_AUTH_DB, GEN_AUTH_DB_URI, GEN_COMP_CLASS, GEN_COMP_NAME, GEN_CONF, GEN_CONFIG, GEN_CONFIG_ALL, GEN_CONFIG_COMP, GEN_CONFIG_CS, GEN_CONFIG_DEF, GEN_CONFIG_SM, GEN_DEBUG, GEN_DEBUG_PACKAGES, GEN_EXT_COMP, GEN_MAX_QUEUE_SIZE, GEN_SCRIPT_DIR, GEN_SM_PLUGINS, GEN_SREC_ADMINS, GEN_SREC_DB, GEN_SREC_DB_URI, GEN_TEST, GEN_TRUSTED, GEN_USER_DB, GEN_USER_DB_URI, GEN_VIRT_HOSTS, HOSTNAMES_PROP_KEY, LIBRESOURCE_REPO_CLASS_PROP_VAL, LIBRESOURCE_REPO_URL_PROP_VAL, MONITORING, MYSQL_REPO_CLASS_PROP_VAL, MYSQL_REPO_URL_PROP_VAL, PGSQL_REPO_CLASS_PROP_VAL, PGSQL_REPO_URL_PROP_VAL, ROUTER_COMP_CLASS_NAME, S2S_CLUST_COMP_CLASS_NAME, S2S_COMP_CLASS_NAME, SHARED_AUTH_REPO_PARAMS_PROP_KEY, SHARED_AUTH_REPO_PROP_KEY, SHARED_USER_REPO_PARAMS_PROP_KEY, SHARED_USER_REPO_POOL_PROP_KEY, SHARED_USER_REPO_PROP_KEY, SM_CLUS_COMP_CLASS_NAME, SM_COMP_CLASS_NAME, SRECV_COMP_CLASS_NAME, SSEND_COMP_CLASS_NAME, STATS_CLASS_NAME, STRINGPREP_PROCESSOR, TIGASE_AUTH_REPO_CLASS_PROP_VAL, TIGASE_AUTH_REPO_URL_PROP_VAL, TIGASE_CUSTOM_AUTH_REPO_CLASS_PROP_VAL, TRUSTED_PROP_KEY, USER_REPO_CLASS_PROP_KEY, USER_REPO_PARAMS_NODE, USER_REPO_POOL_SIZE, USER_REPO_POOL_SIZE_PROP_KEY, USER_REPO_URL_PROP_KEY, VHOST_MAN_CLASS_NAME, XML_REPO_CLASS_PROP_VAL, XML_REPO_URL_PROP_VAL
 
Fields inherited from interface tigase.disco.XMPPService
CMD_FEATURES, DEF_FEATURES, INFO_XMLNS, ITEMS_XMLNS
 
Constructor Summary
AbstractMessageReceiver()
           
 
Method Summary
protected  boolean addOutPacket(Packet packet)
           
protected  boolean addOutPacketNB(Packet packet)
          Non blocking version of addOutPacket.
protected  boolean addOutPackets(Queue<Packet> packets)
           
protected  boolean addOutPacketWithTimeout(Packet packet, ReceiverTimeoutHandler handler, long delay, TimeUnit unit)
           
 boolean addPacket(Packet packet)
          Method adds a Packet object to the internal input queue.
 boolean addPacketNB(Packet packet)
          This is a variant of addPacket(Packet) method which adds Packet to in the internal input queue without blocking.

The method returns a boolean value of true if the packet has been successfuly added to the queue and false otherwise.

Use of the non-blocking methods is not recommended for most of the components implementations.

 boolean addPackets(Queue<Packet> packets)
          This is a convenience method for adding all packets stored in given queue to the component's internal input queue.
The method calls addPacket(Packet) in a loop for each packet in the queue.
 void addRegexRouting(String address)
          Method adds a new routing address for the component.
protected  void addTimerTask(TimerTask task, long delay)
           
protected  void addTimerTask(TimerTask task, long delay, TimeUnit unit)
           
 void clearRegexRoutings()
          Method clears, removes all the component routing addresses.
 void everyHour()
          Utility method executed precisely every hour.
 void everyMinute()
          Utility method executed precisely every minute.
 void everySecond()
          Utility method executed precisely every second.
 Map<String,Object> getDefaults(Map<String,Object> params)
          Returns defualt configuration settings for the component as a Map with keys as configuration property IDs and values as the configuration property values.
protected  Integer getMaxQueueSize(int def)
           
 Set<Pattern> getRegexRoutings()
          Method returns a Set with all component's routings as a precompiled regular expression patterns.
 void getStatistics(StatisticsList list)
          Method returns component statistics.
 int hashCodeForPacket(Packet packet)
          This method decides how incoming packets are distributed among processing threads.
 boolean isInRegexRoutings(String address)
          Method description
 String newPacketId(String prefix)
          Method description
 int processingThreads()
          Method description
abstract  void processPacket(Packet packet)
          This is the main Packet processing method.
 void processPacket(Packet packet, Queue<Packet> results)
          Method description
 void release()
          Method description
 boolean removeRegexRouting(String address)
          Method description
 void setMaxQueueSize(int maxQueueSize)
          Method description
 void setName(String name)
          Method description
 void setParent(MessageReceiver parent)
          Method description
 void setProperties(Map<String,Object> props)
          Sets all configuration properties for object.
 void start()
          Method description
 void stop()
          Method description
 
Methods inherited from class tigase.server.BasicComponent
addComponentDomain, getComponentId, getDefHostName, getDiscoCategoryType, getDiscoDescription, getDiscoFeatures, getDiscoFeatures, getDiscoInfo, getDiscoInfo, getDiscoItems, getDiscoItems, getName, getVHostItem, handlesLocalDomains, handlesNameSubdomains, handlesNonLocalDomains, initBindings, initializationCompleted, isAdmin, isLocalDomain, isLocalDomainOrComponent, processScriptCommand, removeComponentDomain, removeServiceDiscoveryItem, setVHostManager, updateServiceDiscoveryItem, updateServiceDiscoveryItem, updateServiceDiscoveryItem
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 
Methods inherited from interface tigase.server.MessageReceiver
getDefHostName
 
Methods inherited from interface tigase.server.ServerComponent
getComponentId, getName, initializationCompleted
 

Field Detail

INCOMING_FILTERS_PROP_KEY

public static final String INCOMING_FILTERS_PROP_KEY
Configuration property key for setting incoming packets filters on the component level.

See Also:
Constant Field Values

INCOMING_FILTERS_PROP_VAL

public static final String INCOMING_FILTERS_PROP_VAL
Configuration property default vakue with a default incoming packet filter loaded by Tigase server.

This is a comma-separated list of classes which should be loaded as packet filters. The classes must implement PacketFilterIfc interface.

See Also:
Constant Field Values

MAX_QUEUE_SIZE_PROP_KEY

public static final String MAX_QUEUE_SIZE_PROP_KEY
Configuration property key allowing to overwrite a default (memory size dependent) size for the component internal queues. By default the queue size is adjusted to the available memory size to avoid out of memory errors.

See Also:
Constant Field Values

MAX_QUEUE_SIZE_PROP_VAL

public static final Integer MAX_QUEUE_SIZE_PROP_VAL
A default value for max queue size property. The value is calculated at the server startup time using following formula:
Runtime.getRuntime().maxMemory() / 400000L You can change the default queue size by setting a different value for the MAX_QUEUE_SIZE_PROP_KEY property in the server configuration.


OUTGOING_FILTERS_PROP_KEY

public static final String OUTGOING_FILTERS_PROP_KEY
Configuration property key for setting outgoing packets filters on the component level. This is a comma-separated list of classes which should be loaded as packet filters. The classes must implement PacketFilterIfc interface.

See Also:
Constant Field Values

OUTGOING_FILTERS_PROP_VAL

public static final String OUTGOING_FILTERS_PROP_VAL
Configuration property default vakue with a default outgoing packet filter loaded by Tigase server.

This is a comma-separated list of classes which should be loaded as packet filters. The classes must implement PacketFilterIfc interface.

See Also:
Constant Field Values

SECOND

protected static final long SECOND
Constant used in time calculation procedures. Indicates a second that is 1000 milliseconds.

See Also:
Constant Field Values

MINUTE

protected static final long MINUTE
Constant used in time calculation procedures. Indicates a minute that is 60 SECONDs.

See Also:
Constant Field Values

HOUR

protected static final long HOUR
Constant used in time calculation procedures. Indicates a hour that is 60 MINUTEs.

See Also:
Constant Field Values

maxQueueSize

protected int maxQueueSize
Constructor Detail

AbstractMessageReceiver

public AbstractMessageReceiver()
Method Detail

processPacket

public abstract void processPacket(Packet packet)
This is the main Packet processing method. It is called concurrently from many threads so implementing it in thread save manner is essential. The method is called for each packet addressed to the component.

Please note, the Packet instance may be processed by different parts of the server, different components or plugins at the same time. Therefore this is very important to tread the Packet instance as unmodifiable object.

Processing in this method is asynchronous, therefore there is no result value. If there are some 'result' packets generated during processing, they should be passed back using addOutPacket(Packet) method.

Parameters:
packet - is an instance of the Packet class passed for processing.

addPacket

public boolean addPacket(Packet packet)
Method adds a Packet object to the internal input queue. Packets from the input queue are later passed to the processPacket(Packet) method. This is a blocking method waiting if necessary for the room if the queue is full.

The method returns a boolean value of true if the packet has been successfuly added to the queue and false otherwise.

There can be many queues and many threads processing packets for the component, however the method makes the best effort to quarantee that packets are later processed in the correct order. For example that packets for a single user always end up in the same exact queue. You can tweak the packets ditribution amongs threads by overwriting hashCodeForPacket(Packet) method.
If there is N threads the packets are distributed among thread using following logic:

 int threadNo = Math.abs(hashCodeForPacket(packet) % N);
 
This is a preferred method to be used by most Tigase components. If the queues are full the component should stop and wait for more room. The blocking methods aim to prevent from the system overloading or wasting resources for generating packets which can't be processed anyway.

Specified by:
addPacket in interface MessageReceiver
Parameters:
packet - is a Packet instance being put to the component internal input queue.
Returns:
a boolean value of true if the packet has been successfuly added to the queue and false otherwise.

addPacketNB

public boolean addPacketNB(Packet packet)
This is a variant of addPacket(Packet) method which adds Packet to in the internal input queue without blocking.

The method returns a boolean value of true if the packet has been successfuly added to the queue and false otherwise.

Use of the non-blocking methods is not recommended for most of the components implementations. The only component which is allowed to use them is the server MessageRouter implementation which can not hang on any method. This would cause a dead-lock in the application. All other components must use blocking methods and wait if the system is under so high load that it's queues are full.

See addPacket(Packet) method's documentation for some more details.

Specified by:
addPacketNB in interface MessageReceiver
Parameters:
packet - is a Packet instance being put to the component internal input queue.
Returns:
a boolean value of true if the packet has been successfuly added to the queue and false otherwise.
See Also:
AbstractMessageReceiver.addPacket(Packet packet)

addPackets

public boolean addPackets(Queue<Packet> packets)
This is a convenience method for adding all packets stored in given queue to the component's internal input queue.
The method calls addPacket(Packet) in a loop for each packet in the queue. If the call returns true then the packet is removed from the given queue, otherwise the methods ends the loop and returns false.

Please note, if the method returns true it means that all the packets from the queue passed as a parameter have been successfuly run through the addPacket(Packet) method and the queue passed as a parameter should be empty. If the method returns false then at least one packet from the parameter queue wasn't successfully run through the addPacket(Packet) method. If the method returns false then the queue passed as a parameter is not empty and it contains packet which was unseccessfully run through the addPacket(Packet) method and all the packets which were not run at all.

Specified by:
addPackets in interface MessageReceiver
Parameters:
packets - is a Queue of packets for adding to the component internal input queue. All the packets are later processed by processPacket(Packet) method in the same exact order if they are processed by the same thread. See documentation hashCodeForPacket(Packet) method how to control assiging packets to particular threads.
Returns:
a boolean value of true if all packets has been successfully added to the component's internal input queue and false otherwise.
See Also:
AbstractMessageReceiver.hashCodeForPacket(Packet packet)

addRegexRouting

public void addRegexRouting(String address)
Method adds a new routing address for the component. Routing addresses are used by the MessageRouter to calculate packet's destination. If the packet's destination address matches one of the component's routing addresses the packet is added to the component's internal input queue.

By default all components accept packets addressed to the componentId and to:

 component.getName() + '@' + any virtual domain
 

Parameters:
address - is a Java regular expression string for the packet's destination address accepted by this component.

clearRegexRoutings

public void clearRegexRoutings()
Method clears, removes all the component routing addresses. After this method call the component accepts only packets addressed to default routings that is component ID or the component name + '@' + virtual domains


everyHour

public void everyHour()
Utility method executed precisely every hour. A component can overwrite the method to put own code to be executed at the regular intervals of time.

Note, no extensive calculations should happen in this method nor long lasting operations. It is essential that the method processing does not exceed 1 hour. The overiding method must call the the super method first and only then run own code.


everyMinute

public void everyMinute()
Utility method executed precisely every minute. A component can overwrite the method to put own code to be executed at the regular intervals of time.

Note, no extensive calculations should happen in this method nor long lasting operations. It is essential that the method processing does not exceed 1 minute. The overiding method must call the the super method first and only then run own code.


everySecond

public void everySecond()
Utility method executed precisely every second. A component can overwrite the method to put own code to be executed at the regular intervals of time.

Note, no extensive calculations should happen in this method nor long lasting operations. It is essential that the method processing does not exceed 1 second. The overiding method must call the the super method first and only then run own code.


getDefaults

public Map<String,Object> getDefaults(Map<String,Object> params)
Returns defualt configuration settings for the component as a Map with keys as configuration property IDs and values as the configuration property values. All the default parameters returned from this method are later passed to the setProperties(...) method. Some of them may have changed value if they have been overwriten in the server configurtion. The configuration property value can be of any of the basic types: int, long, boolean, String.

Specified by:
getDefaults in interface Configurable
Overrides:
getDefaults in class BasicComponent
Parameters:
params - is a Map with some initial properties set for the starting up server. These parameters can be used as a hints to generate component's default configuration.
Returns:
a Map with the component default configuration.

getRegexRoutings

public Set<Pattern> getRegexRoutings()
Method returns a Set with all component's routings as a precompiled regular expression patterns. The Set can be empty but it can not be null.

Returns:
a Set with all component's routings as a precompiled regular expression patterns.

getStatistics

public void getStatistics(StatisticsList list)
Method returns component statistics. Please note, the method can be called every second by the server monitoring system therefore no extensive or lengthy calculations are allowed. If there are some statistics requiring lengthy operations like database access they must have Level.FINEST assigned and must be put inside the level guard to prevent generating them by the system monitor. The system monitor does not collect FINEST statistics.

Level guard code looks like the example below:

 if (list.checkLevel(Level.FINEST)) {
   // Some CPU intensive calculations or lengthy operations
   list.add(getName(), "Statistic description", stat_value, Level.FINEST);
 }
 
 This way you make sure your extensive operation is not executed every second by the
 monitoing system and does not affect the server performance.

Specified by:
getStatistics in interface StatisticsContainer
Parameters:
list - is a StatistcsList where all statistics are stored.

hashCodeForPacket

public int hashCodeForPacket(Packet packet)
This method decides how incoming packets are distributed among processing threads. Different components needs different distribution to efficient use all threads and avoid packets re-ordering.

If there are N processing threads, packets are distributed among threads using following code:

 int threadNo = Math.abs(hashCodeForPacket(packet) % N);
 
For a PubSub component, for example, a better packets distribution would be based on the PubSub channel name, for SM a better distribution is based on the destination address, etc....

Parameters:
packet - is a Packet which needs to be processed by some thread.
Returns:
a hash code generated for the input thread.

isInRegexRoutings

public boolean isInRegexRoutings(String address)
Method description

Specified by:
isInRegexRoutings in interface MessageReceiver
Parameters:
address -
Returns:

newPacketId

public String newPacketId(String prefix)
Method description

Parameters:
prefix -
Returns:

processPacket

public final void processPacket(Packet packet,
                                Queue<Packet> results)
Method description

Specified by:
processPacket in interface ServerComponent
Overrides:
processPacket in class BasicComponent
Parameters:
packet -
results -

processingThreads

public int processingThreads()
Method description

Returns:

release

public void release()
Method description

Specified by:
release in interface ServerComponent
Overrides:
release in class BasicComponent

removeRegexRouting

public boolean removeRegexRouting(String address)
Method description

Parameters:
address -
Returns:

setMaxQueueSize

public void setMaxQueueSize(int maxQueueSize)
Method description

Parameters:
maxQueueSize -

setName

public void setName(String name)
Method description

Specified by:
setName in interface ServerComponent
Overrides:
setName in class BasicComponent
Parameters:
name -

setParent

public void setParent(MessageReceiver parent)
Method description

Specified by:
setParent in interface MessageReceiver
Parameters:
parent -

setProperties

@TODO(note="Replace fixed filers loading with configurable options for that")
public void setProperties(Map<String,Object> props)
Sets all configuration properties for object.

Specified by:
setProperties in interface Configurable
Overrides:
setProperties in class BasicComponent
Parameters:
props -

start

public void start()
Method description

Specified by:
start in interface MessageReceiver

stop

public void stop()
Method description


addOutPacket

protected boolean addOutPacket(Packet packet)

addOutPacketNB

protected boolean addOutPacketNB(Packet packet)
Non blocking version of addOutPacket.

Parameters:
packet - a Packet value
Returns:
a boolean value

addOutPacketWithTimeout

protected boolean addOutPacketWithTimeout(Packet packet,
                                          ReceiverTimeoutHandler handler,
                                          long delay,
                                          TimeUnit unit)

addOutPackets

protected boolean addOutPackets(Queue<Packet> packets)

addTimerTask

protected void addTimerTask(TimerTask task,
                            long delay,
                            TimeUnit unit)

addTimerTask

protected void addTimerTask(TimerTask task,
                            long delay)

getMaxQueueSize

protected Integer getMaxQueueSize(int def)


Copyright © 2001-2006 Tigase Developers Team. All rights Reserved.