/*------------------------------------------------------------------------------
Name: AddressBase.java
Project: xmlBlaster.org
Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
Comment: Holding connect address and callback address string including protocol
------------------------------------------------------------------------------*/
package org.xmlBlaster.util.qos.address;
import java.util.Iterator;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.xml.sax.Attributes;
import org.xmlBlaster.contrib.ClientPropertiesInfo;
import org.xmlBlaster.util.EncodableData;
import org.xmlBlaster.util.Global;
import org.xmlBlaster.util.SessionName;
import org.xmlBlaster.util.XmlBuffer;
import org.xmlBlaster.util.def.Constants;
import org.xmlBlaster.util.plugin.PluginManagerBase;
import org.xmlBlaster.util.property.PropBoolean;
import org.xmlBlaster.util.property.PropEntry;
import org.xmlBlaster.util.property.PropInt;
import org.xmlBlaster.util.property.PropLong;
import org.xmlBlaster.util.property.PropString;
import org.xmlBlaster.util.qos.ClientProperty;
/**
* Abstract helper class holding connect address and callback address string
* and protocol string.
* <p />
* See examples in the implementing classes
* @see Address
* @see CallbackAddress
* @see org.xmlBlaster.test.classtest.qos.AddressBaseTest
*/
public abstract class AddressBase implements Cloneable
{
protected final Global glob;
private static Logger log = Logger.getLogger(AddressBase.class.getName());
// Are strongest, usually set by programmer
private /*I_Info*/ClientPropertiesInfo pluginAttributes;
//private Hashtable pluginAttributes;
private Properties pluginInfoParameters = new Properties();
/** The root xml element: <callback> or <address>, is set from the derived class */
protected String rootTag = null;
protected String instanceName;
protected final String className = "dispatch";
protected final String context = null;
/** For example "plugin/socket/" */
protected String envPrefix = "";
/** The unique address, e.g. the CORBA IOR string */
private PropString rawAddress = new PropString("");
private PropString bootstrapHostname = new PropString(""); // initially not "localhost" to ask bootstrap hostname
public static final int DEFAULT_bootstrapPort = Constants.XMLBLASTER_PORT; // 3412
private PropInt bootstrapPort = new PropInt(DEFAULT_bootstrapPort);
/** The unique protocol type, defaults to SOCKET */
public static final String DEFAULT_type = "SOCKET";
protected PropString type = new PropString(DEFAULT_type);
/** The protocol version, e.g. "1.0" */
public static final String DEFAULT_version = "1.0";
protected PropString version = new PropString(DEFAULT_version);
/** BurstMode: The time to collect messages for publish/update */
public static final long DEFAULT_collectTime = 0L;
protected PropLong collectTime = new PropLong(DEFAULT_collectTime);
/** Ping interval: pinging every given milliseconds */
abstract public long getDefaultPingInterval();
protected PropLong pingInterval = new PropLong(getDefaultPingInterval());
/** How often to retry if connection fails */
abstract public int getDefaultRetries();
protected PropInt retries = new PropInt(getDefaultRetries());
/** Delay between connection retries in milliseconds */
abstract public long getDefaultDelay();
protected PropLong delay = new PropLong(getDefaultDelay());
/**
* Shall the update() or publish() messages be send oneway (no application level ACK).
* <p />
* For more info read the CORBA spec. Only CORBA and our native SOCKET protocol support oneway.
* Defaults to false (the update() or publish() has a return value and can throw an exception).
*/
public static final boolean DEFAULT_oneway = false;
protected PropBoolean oneway = new PropBoolean(DEFAULT_oneway);
public static final boolean DEFAULT_dispatcherActive = true;
/**
* Control if the dispatcher is activated on login, i.e. if it is
* able to deliver asynchronous messages from the callback queue.
* defaults to true
*/
protected PropBoolean dispatcherActive = new PropBoolean(DEFAULT_dispatcherActive);
/** Compress messages if set to "gzip" or "zip" */
public static final String DEFAULT_compressType = "";
protected PropString compressType = new PropString("compressType", DEFAULT_compressType);
/** Messages bigger this size in bytes are compressed */
public static final long DEFAULT_minSize = 0L;
protected PropLong minSize = new PropLong("minSize", DEFAULT_minSize);
public static final int DEFAULT_burstModeMaxEntries = 1;
protected PropInt burstModeMaxEntries = new PropInt(DEFAULT_burstModeMaxEntries);
public static final long DEFAULT_burstModeMaxBytes = -1L;
protected PropLong burstModeMaxBytes = new PropLong(DEFAULT_burstModeMaxBytes);
/** PtP messages wanted? Defaults to true, false prevents spamming */
public static final boolean DEFAULT_ptpAllowed = true;
protected PropBoolean ptpAllowed = new PropBoolean(DEFAULT_ptpAllowed);
/** The identifier sent to the callback client, the client can decide if he trusts this invocation */
public static final String DEFAULT_sessionId = "unknown";
protected PropString sessionId = new PropString(DEFAULT_sessionId);
/** Shall this session callback be used for subjectQueue messages as well? For <callback> only */
public static final boolean DEFAULT_useForSubjectQueue = true;
protected PropBoolean useForSubjectQueue = new PropBoolean(DEFAULT_useForSubjectQueue);
protected SessionName sessionName;
/**
* Does client whish a dispatcher plugin.
* <p>
* Set to "undef" forces to switch off, or e.g. "Priority,1.0" to access the PriorizedDispatchPlugin
* </p>
* <p>
* Setting it to 'null' (which is the default) lets the server choose the plugin
* </p>
* @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/dispatch.control.plugin.html">The dispatch.control.plugin requirement</a>
*/
public String DEFAULT_dispatchPlugin = PluginManagerBase.NO_PLUGIN_TYPE; // "undef";
protected PropString dispatchPlugin = new PropString(DEFAULT_dispatchPlugin);
/**
* Setting this property will not throw an exception on pings when a ping response timeout is detected. Instead it
* will just set the 'stalled' flag in the dispatch statistics.
*/
public boolean DEFAULT_stallOnPingTimeout = false;
protected PropBoolean stallOnPingTimeout = new PropBoolean(DEFAULT_stallOnPingTimeout);
/** The node id to which we want to connect */
protected String nodeId;
/**
* Marker if message comes from persistent store and is recovered after a server restart.
* NOTE: This information is for server side usage only and is NOT dumped to XML!
*/
protected transient boolean fromPersistenceRecovery = false;
/**
*/
public AddressBase(Global glob, String rootTag) {
this.glob = glob;
setRootTag(rootTag);
}
/*
* @throws IllegalArgumentException Not implemented.
public Object clone() {
throw new IllegalArgumentException("AddressBase.clone() is not implemented");
}
*/
public AddressBase getClone() {
try {
AddressBase s = (AddressBase)super.clone();
s.callbackDriver = null;
return s;
} catch (CloneNotSupportedException e) {
//This shouldn't happen because we implement Cloneable.
throw new AssertionError();
}
}
/**
* Configure property settings.
* "-/node/heron/dispatch/connection/delay 20" has precedence over "-delay 10"
*/
protected void initialize()
{
/* This is always set on server side from ServerAddress.java
but not always on client side
Shall we switch it on always here?
if (this.nodeId == null) {
this.nodeId = glob.getId();
}
*/
// SOCKET, IOR, XMLRPC, RMI, ...
this.type.setFromEnv(this.glob, this.nodeId, context, className, this.instanceName, "protocol");
// dispatch/callback/plugin/socket/hostname
// dispatch/connection/plugin/ior/localPort
this.envPrefix = "plugin/"+this.type.getValue().toLowerCase()+"/";
if (log.isLoggable(Level.FINE)) log.fine("type=" + this.type.getValue() + " nodeId=" + this.nodeId + " context=" + context +
" className=" + className + " instanceName=" + this.instanceName + " envPrefix=" + this.envPrefix);
// On server side for SOCKET protocol we support compression types:
// Constants.COMPRESS_ZLIB_STREAM="zlib:stream" or "zlib" with minSize=1234 bytes
// This default setting comes from environment or protocol plugin property
// None stream compressions can be overwritten by CallbackAddress for each client individually
// Here follows the plugin initialization, further down we overwrite this with Address specific settings
// Example on server side: "-plugin/socket/compress/type stream"
this.compressType = getEnv("compress/type", this.compressType.getValue());
this.minSize = getEnv("compress/minSize", this.minSize.getValue());
this.bootstrapHostname.setFromEnv(this.glob, this.nodeId, context, className, this.instanceName, "bootstrapHostname");
this.bootstrapPort.setFromEnv(this.glob, this.nodeId, context, className, this.instanceName, "bootstrapPort");
//this.bootstrapHostname.setFromEnv(this.glob, this.nodeId, context, className, this.instanceName, envPrefix+"bootstrapHostname");
//this.bootstrapPort.setFromEnv(this.glob, this.nodeId, context, className, this.instanceName, envPrefix+"bootstrapPort");
//log.error(ME, "DEBUG ONLY: Checking " + this.instanceName + ": " + envPrefix+"port to result=" + this.bootstrapPort.getValue() );
// These are protocol unspecific values
this.burstModeMaxEntries.setFromEnv(this.glob, this.nodeId, context, className, this.instanceName, "burstMode/maxEntries");
this.burstModeMaxBytes.setFromEnv(this.glob, this.nodeId, context, className, this.instanceName, "burstMode/maxBytes");
this.collectTime.setFromEnv(this.glob, this.nodeId, context, className, this.instanceName, "burstMode/collectTime");
this.pingInterval.setFromEnv(this.glob, this.nodeId, context, className, this.instanceName, "pingInterval");
this.retries.setFromEnv(this.glob, this.nodeId, context, className, this.instanceName, "retries");
this.delay.setFromEnv(this.glob, this.nodeId, context, className, this.instanceName, "delay");
this.oneway.setFromEnv(this.glob, this.nodeId, context, className, this.instanceName, "oneway");
this.dispatcherActive.setFromEnv(this.glob, this.nodeId, context, className, this.instanceName, "dispatcherActive");
this.compressType.setFromEnv(this.glob, this.nodeId, context, className, this.instanceName, "compress/type");
this.minSize.setFromEnv(this.glob, this.nodeId, context, className, this.instanceName, "compress/minSize");
this.ptpAllowed.setFromEnv(this.glob, this.nodeId, context, className, this.instanceName, "ptpAllowed");
this.sessionId.setFromEnv(this.glob, this.nodeId, context, className, this.instanceName, "sessionId");
this.dispatchPlugin.setFromEnv(this.glob, this.nodeId, context, className, this.instanceName, "DispatchPlugin/defaultPlugin");
this.stallOnPingTimeout.setFromEnv(this.glob, this.nodeId, context, className, this.instanceName, "stallOnPingTimeout");
//log.error(ME, getType() + " " + "DEBUG ONLY " + this.compressType + " " + this.minSize + toXml());
}
/**
* Set a protocol specific property.
* <p>
* Setting a property here forces the setting in the plugin, it
* has precedence over any environment, xmlBlaster.properties or command line setting.
* <br />
* You typically use this method in your client code to overwrite settings,.
* please check the protocol specific documentation about the supported settings.
* </p>
* @param key The property, e.g. "SOLingerTimeout" (WITHOUT any prefix like "plugin/socket/")
* The searched property is depending on the type (here "socket")
* and instance (here "connection") e.g. "plugin/socket/SOLingerTimeout"
* and with higher precedence "dispatch/connection/plugin/socket/SOLingerTimeout"
* @param value The value, e.g. "10000"
*/
public void setPluginProperty(String key, String value) {
if (this.pluginAttributes == null) this.pluginAttributes = new ClientPropertiesInfo(null);
this.pluginAttributes.put(key, value);
// refresh compressType or minSize: Those attributes are double used:
// Once SOCKET specific and again as a common setting in <address ...>
// TODO: clean up this mess: no SOCKET specific code in here!
if ("compress/type".equals(key) || "compress/minSize".equals(key)) {
initialize();
}
}
public void addClientProperty(ClientProperty clientProperty) {
if (this.pluginAttributes == null) this.pluginAttributes = new ClientPropertiesInfo(null);
this.pluginAttributes.put(clientProperty.getName(), clientProperty);
}
/**
* Set the PluginInfo parameters (derived from xmlBlasterPlugins.xml or xmlBlaster.properties).
* <br />
* As a protocol plugin developer you should call this method if you have a PluginInfo instance
* to use the default paramaters of the plugin.
* <br />
* Example from xmlBlasterPlugins.xml:
* <br />
* <plugin id='SOCKET_UDP' className='org.xmlBlaster.protocol.socket.SocketDriver'>
* ...
* <attribute id='useUdpForOneway'>true</attribute>
* </plugin>
* <p/>
* These settings are used as default settings for the plugin with lowest priority
* <p/>
* Calls initialize() to reinitialize compression.
*/
public void setPluginInfoParameters(Properties parameters) {
if (parameters == null) {
this.pluginInfoParameters = new Properties();
}
else {
this.pluginInfoParameters = parameters;
}
initialize();
}
public String getEnvPrefix() {
return this.envPrefix;
}
public boolean hasAttributeEnv(String key) {
if (this.pluginAttributes != null) {
Object val = this.pluginAttributes.get(key, (String)null);
if (val != null)
return true;
}
return false;
}
/**
* Plugins may query their properties here
* @param key The property, e.g. "SOLingerTimeout" (WITHOUT any prefix like "plugin/socket/")
* @return never null
*/
public PropString getEnv(String key, String defaultValue) {
PropString tmp = new PropString(key, this.pluginInfoParameters.getProperty(key,defaultValue));
if (this.pluginAttributes != null) {
Object val = this.pluginAttributes.get(key, (String)null);
if (val != null) {
tmp.setValue((String)val, PropEntry.CREATED_BY_SETTER); // or by client's ConnectQos
return tmp;
}
}
tmp.setFromEnv(this.glob, this.nodeId, context, className, this.instanceName, this.envPrefix+key);
return tmp;
}
/**
* Plugins may query their properties here
* @param key The property, e.g. "SOLingerTimeout" (WITHOUT any prefix like "plugin/socket/")
* @return never null
*/
public PropInt getEnv(String key, int defaultValue) {
String defaultStr = this.pluginInfoParameters.getProperty(key,""+defaultValue);
defaultValue = Integer.valueOf(defaultStr).intValue();
PropInt tmp = new PropInt(key, defaultValue);
if (this.pluginAttributes != null) {
Object val = this.pluginAttributes.get(key, (String)null);
if (val != null) {
tmp.setValue((String)val, PropEntry.CREATED_BY_SETTER);
return tmp;
}
}
tmp.setFromEnv(this.glob, this.nodeId, context, className, this.instanceName, this.envPrefix+key);
return tmp;
}
/**
* Plugins may query their properties here
* @param key The property, e.g. "SOLingerTimeout" (WITHOUT any prefix like "plugin/socket/")
* @return never null
*/
public PropLong getEnv(String key, long defaultValue) {
String defaultStr = this.pluginInfoParameters.getProperty(key,""+defaultValue);
defaultValue = Long.valueOf(defaultStr).longValue();
PropLong tmp = new PropLong(key, defaultValue);
if (this.pluginAttributes != null) {
Object val = this.pluginAttributes.get(key, (String)null);
if (val != null) {
tmp.setValue((String)val, PropEntry.CREATED_BY_SETTER);
return tmp;
}
}
tmp.setFromEnv(this.glob, this.nodeId, context, className, this.instanceName, this.envPrefix+key);
return tmp;
}
/**
* Plugins may query their properties here
* @param key The property, e.g. "SOLingerTimeout" (WITHOUT any prefix like "plugin/socket/")
* @return never null
*/
public PropBoolean getEnv(String key, boolean defaultValue) {
String defaultStr = this.pluginInfoParameters.getProperty(key,""+defaultValue);
defaultValue = Boolean.valueOf(defaultStr).booleanValue();
PropBoolean tmp = new PropBoolean(key, defaultValue);
if (this.pluginAttributes != null) {
Object val = this.pluginAttributes.get(key, (String)null);
if (val != null) {
tmp.setValue((String)val, PropEntry.CREATED_BY_SETTER);
return tmp;
}
}
tmp.setFromEnv(this.glob, this.nodeId, context, className, this.instanceName, this.envPrefix+key);
return tmp;
}
/**
* Returns the completed key which was found and chosen.
* @return For "responseTimeout" it could be "dispatch/connection/plugin/socket/responseTimeout"
*/
public String getEnvLookupKey(String key) {
PropString tmp = new PropString("");
String k = tmp.setFromEnv(this.glob, this.nodeId, context, className, this.instanceName, this.envPrefix+key);
if (k != null && k.length() > 0)
return k;
return key;
}
/**
* A nice human readable name for this address (used for logging)
*/
public final String getName() {
return getLogId();
}
/**
* Check if supplied address would connect to the address of this instance
*/
public final boolean isSameAddress(AddressBase other) {
if (other.getType().equals(getType())) { // what about two different SOCKET connections??
String or = other.getRawAddress();
if (or != null && or.length() > 0) {
if (or.equals(getRawAddress())) {
return true;
}
else {
return false;
}
}
String oh = other.getBootstrapHostname();
int op = other.getBootstrapPort();
if (op > 0 && oh != null) {
if (op == getBootstrapPort() && oh.equals(getBootstrapHostname()))
return true;
else
return false;
}
}
return false;
}
/**
* Sets the root xml tag, <callback> or <address>
*/
private final void setRootTag(String rootTag) {
this.rootTag = rootTag;
}
/**
* Show some important settings for logging
*/
public String getSettings() {
StringBuffer buf = new StringBuffer(126);
buf.append("type=").append(getType()).append(" oneway=").append(oneway()).append(" dispatcherActive=").append(isDispatcherActive()).append(" burstMode.collectTime=").append(getCollectTime());
return buf.toString();
}
/**
* NOTE: This setting has precedence over all environment or command line settings
* @param type The protocol type, e.g. "IOR", "EMAIL", "XMLRPC"
* If you pass null the value is reset to its default setting
*/
public final void setType(String type) {
if (type == null) this.type.setValue(this.type.getDefaultValue(), PropEntry.CREATED_BY_DEFAULT);
else this.type.setValue(type);
this.envPrefix = "plugin/"+this.type.getValue().toLowerCase()+"/";
}
/**
* @param version The protocol version, e.g. "1.0"
*/
public final void setVersion(String version) {
this.version.setValue(version);
}
/**
* @return A human readable address for logging only
*/
public String getLogId() {
if (getRawAddress() != null && getRawAddress().length() > 0 && getRawAddress().length() < 50) {
return getRawAddress();
}
return getBootstrapUrl();
}
/**
* Updates the internal address as well.
* <p>NOTE:</p>
* <p>This bootstrapping bootstrapPort is currently only used by the CORBA plugin.</p>
* <p>To set other protocols try e.g.:</p>
* <pre>
* String[] args = { "-protocol", "SOCKET",
* "-dispatch/connection/plugin/socket/hostname", "myHost",
* "-dispatch/connection/plugin/socket/port", "7666",
* "-dispatch/connection/plugin/socket/localHostname", "myHost", // optional
* "-dispatch/connection/plugin/socket/localPort", "8888" }; // optional
* glob.init(args);
* </pre>
* @param host An IP or DNS
*/
public final void setBootstrapHostname(String host) {
if (host == null) this.bootstrapHostname.setValue("");
else this.bootstrapHostname.setValue(host);
}
public final void setDefaultBootstrapHostname(String host) {
if (host == null) this.bootstrapHostname.setValue("", PropEntry.CREATED_BY_DEFAULT);
else this.bootstrapHostname.setValue(host, PropEntry.CREATED_BY_DEFAULT);
}
/**
* Check if a bootstrapHostname is set already
*/
public boolean hasBootstrapHostname() {
return (this.bootstrapHostname.getValue().length() > 0);
}
/**
* @return The Hostname, IP or "" if not known
*/
public final String getBootstrapHostname() {
if (!hasBootstrapHostname()) {
this.bootstrapHostname.setValue(glob.getLocalIP(), PropEntry.CREATED_BY_DEFAULT);
}
return this.bootstrapHostname.getValue();
}
/**
* Returns a URL markup of the bootstrap server, currently it looks like
* <i>xmlBlaster://myServer.com:3412</i> but will probably change in a future release.
*/
public final String getBootstrapUrl() {
return "xmlBlaster://" + getBootstrapHostname() + ":" + getBootstrapPort(); // + "/" + getType();
}
/**
* Set the bootstrapping port.
* Updates the internal address as well.
* <p>NOTE:</p>
* <p>This bootstrapping bootstrapPort is currently only used by the CORBA plugin.</p>
* <p>To set other protocols try e.g.:</p>
* <pre>
* String[] args = { "-protocol", "SOCKET",
* "-dispatch/connection/plugin/socket/hostname", "myHost",
* "-dispatch/connection/plugin/socket/port", "7666",
* "-dispatch/connection/plugin/socket/localHostname", "myHost", // optional
* "-dispatch/connection/plugin/socket/localPort", "8888" }; // optional
* glob.init(args);
* </pre>
*/
public final void setBootstrapPort(int bootstrapPort) {
this.bootstrapPort.setValue(bootstrapPort);
}
public final int getBootstrapPort() {
return this.bootstrapPort.getValue();
}
/**
* The creation default will be overwritten by the given defaultPort.
* <p>
* If the bootstrapPort was changed by environment setting, this setting has precedence
* over the given defaultPort and nothing happens.
* </p>
* <p>
* This is used by the protocol plugins which all have different defaults
* </p>
*/
public final void setDefaultBootstrapPort(int defaultBootstrapPort) {
if (this.bootstrapPort.isDefault()) {
this.bootstrapPort.setValue(defaultBootstrapPort, PropEntry.CREATED_BY_DEFAULT);
}
}
/**
* Set the callback address, it should fit to the protocol-type.
*
* <p>
* If you set an address here you need to set it compatible to the
* protocol from getType().<br />
* For XmlRpc it looks typically like <i>http://myServer:8080</i>
* for CORBA like <i>IOR:00005395....</i> and
* for SOCKET like <i>socket://128.56.44.12:7608</i>
* </p>
* <p>
* Setting the address here has precedence over any environment settings
* like <i>-dispatch/connection/plugin/socket/port 7666</i> on command line
* or
* <pre>
* String[] args = { "-protocol", "SOCKET",
* "-dispatch/connection/plugin/socket/hostname", "myHost",
* "-dispatch/connection/plugin/socket/port", "7666",
* "-dispatch/connection/plugin/socket/localHostname", "myHost", // optional
* "-dispatch/connection/plugin/socket/localPort", "8888" }; // optional
* glob.init(args);
* </pre>
* @param rawAddress The address specific for the protocol, e.g. "et@mars.univers" for EMAIL
*/
public final void setRawAddress(String rawAddress) {
if (rawAddress == null) rawAddress = "";
this.rawAddress.setValue(rawAddress);
if (log.isLoggable(Level.FINE)) log.fine("setRawAddress=" + this.rawAddress.getValue());
}
/**
* Returns the rawAddress which is specific for each protocol.
* @return e.g. "IOR:00001100022...." or "et@universe.com" or "" (never null)
*/
public final String getRawAddress() {
return this.rawAddress.getValue();
}
// used by SOCKET as the same socket is used for callback tunneling
private Object/*I_CallbackDriver*/ callbackDriver = null;
public final void setCallbackDriver(Object/*I_CallbackDriver*/ callbackDriver) {
if (callbackDriver != null && !(callbackDriver instanceof org.xmlBlaster.protocol.socket.CallbackSocketDriver)) {
log.severe("Unexpected " + callbackDriver);
Thread.dumpStack(); //assert
}
this.callbackDriver = callbackDriver;
}
public final Object getCallbackDriver() {
return this.callbackDriver;
}
/**
* Returns the protocol type.
* @return e.g. "EMAIL" or "IOR" (never null).
*/
public final String getType() {
return type.getValue();
}
/**
* Returns the protocol version.
* @return e.g. "1.0" or null
*/
public final String getVersion() {
return version.getValue();
}
/**
* What to do if max retries is exhausted.
* <p />
* This mode is currently not configurable, we always destroy the login session.
* This is interpreted only server side if callback fails.
* @return Constants.ONEXHAUST_KILL_SESSION="killSession"
*/
public final String getOnExhaust() {
return Constants.ONEXHAUST_KILL_SESSION; // in future possibly Constants.ONEXHAUST_KILL_CALLBACK
}
/**
* Kill login session if max callback retries is exhausted?
*/
public final boolean getOnExhaustKillSession() {
return getOnExhaust().equalsIgnoreCase(Constants.ONEXHAUST_KILL_SESSION);
}
/**
* BurstMode: The time span to collect messages before sending.
* @return The time to collect in milliseconds (default is 0 == switched off)
*/
public final long getCollectTime() {
return this.collectTime.getValue();
}
/**
* BurstMode: The time to collect messages for sending in a bulk.
* @param The time to collect in milliseconds
*/
public void setCollectTime(long collectTime) {
if (collectTime < 0L) this.collectTime.setValue(0L);
else this.collectTime.setValue(collectTime);
}
/**
* How many messages maximum shall the callback thread take in one bulk out of the
* callback queue and deliver to the client in one bulk.
*/
public int getBurstModeMaxEntries() {
return this.burstModeMaxEntries.getValue();
}
public void setBurstModeMaxEntries(int burstModeMaxEntries) {
if (burstModeMaxEntries == 0)
log.warning("<burstMode maxEntries='" + burstModeMaxEntries + "'> is not supported and may cause strange behavior");
else if (burstModeMaxEntries < -1)
burstModeMaxEntries = -1;
this.burstModeMaxEntries.setValue(burstModeMaxEntries);
}
/**
* How many bytes maximum shall the callback thread take in one bulk out of the
* callback queue and deliver to the client in one bulk.
*/
public long getBurstModeMaxBytes() {
return this.burstModeMaxBytes.getValue();
}
public void setBurstModeMaxBytes(long burstModeMaxBytes) {
if (burstModeMaxBytes == 0)
log.warning("<burstMode maxBytes='" + burstModeMaxBytes + "'> is not supported and may cause strange behavior");
else if (burstModeMaxBytes < -1L)
burstModeMaxBytes = -1L;
this.burstModeMaxBytes.setValue(burstModeMaxBytes);
}
/**
* How long to wait between pings to the callback server.
* @return The pause time between pings in millis
*/
public final long getPingInterval() {
return pingInterval.getValue();
}
/**
* How long to wait between pings to the callback server.
* @param pingInterval The pause time between pings in millis
*/
public void setPingInterval(long pingInterval) {
if (pingInterval <= 0L) this.pingInterval.setValue(0L);
else if (pingInterval < 10L) {
log.warning("pingInterval=" + pingInterval + " msec is too short, setting it to 10 millis");
this.pingInterval.setValue(10L);
}
else
this.pingInterval.setValue(pingInterval);
}
/**
* How often shall we retry callback attempt on callback failure
* @return -1 forever, 0 no retry, > 0 number of retries
*/
public final int getRetries() {
return retries.getValue();
}
/**
* How often shall we retry callback attempt on callback failure
* @param -1 forever, 0 no retry, > 0 number of retries
*/
public void setRetries(int retries) {
if (retries < -1) this.retries.setValue(-1);
else this.retries.setValue(retries);
}
/**
* Delay between callback retries in milliseconds, defaults to one minute
* @return The delay in millisconds
*/
public final long getDelay() {
return delay.getValue();
}
/**
* Delay between callback retries in milliseconds, defaults to one minute
*/
public void setDelay(long delay) {
if (delay <= 0L) this.delay.setValue(getDefaultDelay());
else this.delay.setValue(delay);
}
/**
* Shall the publish() or callback update() message be oneway.
* Is only with CORBA and our native SOCKET protocol supported
* @return true if you want to force oneway sending
*/
public final boolean oneway() {
return oneway.getValue();
}
/**
* Shall the publish() or callback update() message be oneway.
* Is only with CORBA and our native SOCKET protocol supported
* @param oneway false is default
*/
public void setOneway(boolean oneway) {
this.oneway.setValue(oneway);
}
/**
* Inhibits/activates the delivery of asynchronous dispatches of messages.
* @param dispatcherActive
*/
public void setDispatcherActive(boolean dispatcherActive) {
this.dispatcherActive.setValue(dispatcherActive);
}
/**
* @return true if the dispatcher is currently activated, i.e. if it is
* able to deliver asynchronous messages from the queue.
*/
public boolean isDispatcherActive() {
return this.dispatcherActive.getValue();
}
/**
* @param Set if we accept point to point messages
*/
public void setPtpAllowed(boolean ptpAllowed) {
this.ptpAllowed.setValue(ptpAllowed);
}
/**
* @return true if we may send PtP messages
*/
public final boolean isPtpAllowed() {
return this.ptpAllowed.getValue();
}
public void setCompressType(String compressType) {
if (compressType == null) this.compressType.setValue("");
this.compressType.setValue(compressType);
}
/**
* The identifier sent to the callback client, the client can decide if he trusts this invocation
* @return never null
*/
public final String getSecretSessionId() {
return sessionId.getValue();
}
/** The identifier sent to the callback client, the client can decide if he trusts this invocation */
public void setSecretSessionId(String sessionId) {
this.sessionId.setValue(sessionId);
}
/**
* Get the compression method.
* @return "" No compression
*/
public final String getCompressType() {
return this.compressType.getValue();
}
/**
* Messages bigger this size in bytes are compressed.
* <br />
* Note: This value is only used if compressType is set to a supported value
* @return size in bytes
*/
public long getMinSize() {
return this.minSize.getValue();
}
/**
* Messages bigger this size in bytes are compressed.
* <br />
* Note: This value is only evaluated if compressType is set to a supported value
* @return size in bytes
*/
public void setMinSize(long minSize) {
this.minSize.setValue(minSize);
}
/**
* Specify your dispatcher plugin configuration.
* <p>
* Set to "undef" to switch off, or to e.g. "Priority,1.0" to access the PriorizedDispatchPlugin
* </p>
* <p>
* This overwrites the xmlBlaster.properties default setting e.g.:
* <pre>
* DispatchPlugin[Priority][1.0]=org.xmlBlaster.util.dispatch.plugins.prio.PriorizedDispatchPlugin
* DispatchPlugin[SlowMotion][1.0]=org.xmlBlaster.util.dispatch.plugins.motion.SlowMotion
* DispatchPlugin/defaultPlugin=Priority,1.0
* </pre>
* </p>
* @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/dispatch.control.plugin.html">The dispatch.control.plugin requirement</a>
*/
public void setDispatchPlugin(String dispatchPlugin) {
this.dispatchPlugin.setValue(dispatchPlugin);
}
/**
* @return "undef" or e.g. "Priority,1.0"
*/
public String getDispatchPlugin() {
return this.dispatchPlugin.getValue();
}
public final boolean isStallOnPingTimeout() {
return this.stallOnPingTimeout.getValue();
}
public final void setStallOnPingTimeout(boolean stallOnPingTimeout) {
this.stallOnPingTimeout.setValue(stallOnPingTimeout);
}
/**
* Called for SAX callback start tag
*/
public final void startElement(String uri, String localName, String name, StringBuffer character, Attributes attrs) {
// log.info(ME, "startElement(rootTag=" + rootTag + "): name=" + name + " character='" + character.toString() + "'");
String tmp = character.toString().trim(); // The address
if (tmp.length() > 0) {
setRawAddress(tmp);
}
character.setLength(0);
if (name.equalsIgnoreCase(rootTag)) { // "callback"
if (attrs != null) {
int len = attrs.getLength();
for (int i = 0; i < len; i++) {
if( attrs.getQName(i).equalsIgnoreCase("type") ) {
setType(attrs.getValue(i).trim());
}
else if( attrs.getQName(i).equalsIgnoreCase("version") ) {
setVersion(attrs.getValue(i).trim());
}
else if( attrs.getQName(i).equalsIgnoreCase("bootstrapHostname") ) {
setBootstrapHostname(attrs.getValue(i).trim());
}
else if( attrs.getQName(i).equalsIgnoreCase("hostname") ) { // deprecated -> use bootstrapHostname
setBootstrapHostname(attrs.getValue(i).trim());
}
else if( attrs.getQName(i).equalsIgnoreCase("bootstrapPort") ) {
String ll = attrs.getValue(i).trim();
try {
setBootstrapPort(new Integer(ll).intValue());
} catch (NumberFormatException e) {
log.severe("Wrong format of <" + rootTag + " bootstrapPort='" + ll + "'>, expected an integer number.");
}
}
else if( attrs.getQName(i).equalsIgnoreCase("port") ) { // deprecated -> use bootstrapPort
String ll = attrs.getValue(i).trim();
try {
setBootstrapPort(new Integer(ll).intValue());
} catch (NumberFormatException e) {
log.severe("Wrong format of <" + rootTag + " port='" + ll + "'>, expected an integer number.");
}
}
else if( attrs.getQName(i).equalsIgnoreCase("sessionId") ) {
setSecretSessionId(attrs.getValue(i).trim());
}
else if( attrs.getQName(i).equalsIgnoreCase("pingInterval") ) {
String ll = attrs.getValue(i).trim();
try {
setPingInterval(new Long(ll).longValue());
} catch (NumberFormatException e) {
log.severe("Wrong format of <" + rootTag + " pingInterval='" + ll + "'>, expected a long in milliseconds.");
}
}
else if( attrs.getQName(i).equalsIgnoreCase("retries") ) {
String ll = attrs.getValue(i).trim();
try {
setRetries(new Integer(ll).intValue());
} catch (NumberFormatException e) {
log.severe("Wrong format of <" + rootTag + " retries='" + ll + "'>, expected an integer number.");
}
}
else if( attrs.getQName(i).equalsIgnoreCase("delay") ) {
String ll = attrs.getValue(i).trim();
try {
setDelay(new Long(ll).longValue());
} catch (NumberFormatException e) {
log.severe("Wrong format of <" + rootTag + " delay='" + ll + "'>, expected a long in milliseconds.");
}
}
else if( attrs.getQName(i).equalsIgnoreCase("oneway") ) {
setOneway(new Boolean(attrs.getValue(i).trim()).booleanValue());
}
else if( attrs.getQName(i).equalsIgnoreCase("dispatcherActive") ) {
setDispatcherActive(new Boolean(attrs.getValue(i).trim()).booleanValue());
}
else if( attrs.getQName(i).equalsIgnoreCase("useForSubjectQueue") ) {
this.useForSubjectQueue.setValue(new Boolean(attrs.getValue(i).trim()).booleanValue());
}
else if( attrs.getQName(i).equalsIgnoreCase("dispatchPlugin") ) {
this.dispatchPlugin.setValue(attrs.getValue(i).trim());
}
else if( attrs.getQName(i).equalsIgnoreCase("stallOnPingTimeout") ) {
this.stallOnPingTimeout.setValue(new Boolean(attrs.getValue(i).trim()).booleanValue());
}
else {
log.severe("Ignoring unknown attribute " + attrs.getQName(i) + " in " + rootTag + " section.");
}
}
}
if (getType() == null) {
log.severe("Missing '" + rootTag + "' attribute 'type' in QoS");
setType("IOR");
}
if (getSecretSessionId() == null) {
log.warning("Missing '" + rootTag + "' attribute 'sessionId' QoS");
}
return;
}
if (name.equalsIgnoreCase("burstMode")) {
if (attrs != null) {
int len = attrs.getLength();
int ii=0;
for (ii = 0; ii < len; ii++) {
if (attrs.getQName(ii).equalsIgnoreCase("collectTime")) {
String ll = attrs.getValue(ii).trim();
try {
setCollectTime(new Long(ll).longValue());
} catch (NumberFormatException e) {
log.severe("Wrong format of <burstMode collectTime='" + ll + "'>, expected a long in milliseconds, burst mode is switched off sync messages.");
}
}
else if( attrs.getQName(ii).equalsIgnoreCase("maxEntries") ) {
String ll = attrs.getValue(ii).trim();
try {
setBurstModeMaxEntries(new Integer(ll).intValue());
} catch (NumberFormatException e) {
log.severe("Wrong format of <burstMode maxEntries='" + ll + "'>, expected an integer number.");
}
}
else if( attrs.getQName(ii).equalsIgnoreCase("maxBytes") ) {
String ll = attrs.getValue(ii).trim();
try {
setBurstModeMaxBytes(new Long(ll).longValue());
} catch (NumberFormatException e) {
log.severe("Wrong format of <burstMode maxBytes='" + ll + "'>, expected a long in bytes.");
}
}
}
}
else {
log.severe("Missing 'collectTime' attribute in login-qos <burstMode>");
}
return;
}
if (name.equalsIgnoreCase("compress")) {
if (attrs != null) {
int len = attrs.getLength();
for (int ii = 0; ii < len; ii++) {
if (attrs.getQName(ii).equalsIgnoreCase("type")) {
setCompressType(attrs.getValue(ii).trim());
}
else if (attrs.getQName(ii).equalsIgnoreCase("minSize")) {
String ll = attrs.getValue(ii).trim();
try {
setMinSize(new Long(ll).longValue());
} catch (NumberFormatException e) {
log.severe("Wrong format of <compress minSize='" + ll + "'>, expected a long in bytes, compress is switched off.");
}
}
}
}
else {
log.severe("Missing 'type' attribute in qos <compress>");
}
return;
}
if (name.equalsIgnoreCase("ptp")) {
setPtpAllowed(true);
character.setLength(0);
return;
}
}
/**
* Handle SAX parsed end element
*/
public final void endElement(String uri, String localName, String name, StringBuffer character) {
if (name.equalsIgnoreCase(rootTag)) { // "callback"
String tmp = character.toString().trim(); // The address (if after inner tags)
if (tmp.length() > 0)
setRawAddress(tmp);
else if (getRawAddress() == null)
log.severe(rootTag + " QoS contains no rawAddress data");
}
else if (name.equalsIgnoreCase("burstMode")) {
}
else if (name.equalsIgnoreCase("compress")) {
}
else if (name.equalsIgnoreCase("ptp")) {
String tmp = character.toString().trim();
if (tmp.length() > 0)
setPtpAllowed(new Boolean(tmp).booleanValue());
return;
}
character.setLength(0);
}
/**
* Dump state of this object into a XML ASCII string.
*/
public final String toXml() {
return toXml((String)null);
}
/**
* Dump state of this object into a XML ASCII string.
* <br>
* Only none default values are dumped for performance reasons
* @param extraOffset indenting of tags for nice output
* @return The xml representation
*/
public final String toXml(String extraOffset) {
XmlBuffer sb = new XmlBuffer(1200);
if (extraOffset == null) extraOffset = "";
String offset = Constants.OFFSET + extraOffset;
sb.append(offset).append("<").append(rootTag).append(" type='").appendAttributeEscaped(getType()).append("'");
// For debugging only:
//sb.append(" nodeId='").append(this.nodeId).append("'");
//sb.append(" context='").append(this.context).append("'");
//sb.append(" className='").append(this.className).append("'");
//sb.append(" instanceName='").append(this.instanceName).append("'");
//sb.append(" envPrefix='").append(this.envPrefix).append("'");
if (this.version.isModified())
sb.append(" version='").appendAttributeEscaped(getVersion()).append("'");
if (this.bootstrapHostname.isModified())
sb.append(" bootstrapHostname='").appendAttributeEscaped(getBootstrapHostname()).append("'");
if (this.bootstrapPort.isModified())
sb.append(" bootstrapPort='").append(getBootstrapPort()).append("'");
if (this.sessionId.isModified())
sb.append(" sessionId='").appendAttributeEscaped(getSecretSessionId()).append("'");
if (this.pingInterval.isModified())
sb.append(" pingInterval='").append(getPingInterval()).append("'");
if (this.retries.isModified())
sb.append(" retries='").append(getRetries()).append("'");
if (this.delay.isModified())
sb.append(" delay='").append(getDelay()).append("'");
if (this.oneway.isModified())
sb.append(" oneway='").append(oneway()).append("'");
if (this.dispatcherActive.isModified())
sb.append(" dispatcherActive='").append(isDispatcherActive()).append("'");
if (this.useForSubjectQueue.isModified())
sb.append(" useForSubjectQueue='").append(this.useForSubjectQueue.getValue()).append("'");
if (this.dispatchPlugin.isModified())
sb.append(" dispatchPlugin='").appendAttributeEscaped(this.dispatchPlugin.getValue()).append("'");
if (this.stallOnPingTimeout.isModified())
sb.append(" stallOnPingTimeout='").append(isStallOnPingTimeout()).append("'");
sb.append(">");
sb.append(offset).append(" ").append(getRawAddress());
if (this.collectTime.isModified() || this.burstModeMaxEntries.isModified() || this.burstModeMaxBytes.isModified()) {
sb.append(offset).append(" ").append("<burstMode");
if (this.collectTime.isModified())
sb.append(" collectTime='").append(getCollectTime()).append("'");
if (this.burstModeMaxEntries.isModified())
sb.append(" maxEntries='").append(getBurstModeMaxEntries()).append("'");
if (this.burstModeMaxBytes.isModified())
sb.append(" maxBytes='").append(getBurstModeMaxBytes()).append("'");
sb.append("/>");
}
if (this.compressType.isModified())
sb.append(offset).append(" ").append("<compress type='").appendAttributeEscaped(getCompressType()).append("' minSize='").append(getMinSize()).append("'/>");
if (this.ptpAllowed.isModified()) {
if (this.ptpAllowed.getValue()) {
sb.append(offset).append(" ").append("<ptp/>");
}
else {
sb.append(offset).append(" ").append("<ptp>").append(this.ptpAllowed.getValue()).append("</ptp>");
}
}
if (this.pluginAttributes != null) {
Iterator it = this.pluginAttributes.getClientPropertyMap().values().iterator();
while (it.hasNext()) {
Object obj = it.next();
EncodableData cp = (EncodableData)obj;
sb.append(cp.toXml(offset+" ", ClientProperty.ATTRIBUTE_TAG));
}
}
sb.append(offset).append("</").append(rootTag).append(">");
return sb.toString();
}
/**
* Get a usage string for the connection parameters.
* Currently only for client side usage
*/
public String usage()
{
String text = "";
//text += " -oneway Shall the publish() messages be send oneway (no application level ACK) [" + Address.DEFAULT_oneway + "]\n";
text += " -dispatch/" + this.instanceName + "/protocol\n";
text += " Protocol to use [" + DEFAULT_type + "]\n";
// text += " -dispatch/" + this.instanceName + "/plugin/" + this.type + "/port\n";
// text += " Port to use for the protocol [" + DEFAULT_port + "]\n";
text += " -dispatch/" + this.instanceName + "/pingInterval\n";
text += " Pinging every given milliseconds [" + getDefaultPingInterval() + "]\n";
text += " 0 switches pinging off\n";
text += " -dispatch/" + this.instanceName + "/retries\n";
text += " How often to retry if connection fails (-1 is forever) [" + getDefaultRetries() + "]\n";
text += " Set to -1 for failsafe operation\n";
text += " -dispatch/" + this.instanceName + "/delay\n";
text += " Delay between connection retries in milliseconds [" + getDefaultDelay() + "]\n";
text += " A delay value > 0 switches fails save mode on, 0 switches it off\n";
text += " -dispatch/" + this.instanceName + "/stallOnPingTimeout\n";
text += " if true it will set the 'stalled' flag in the dispatch statistics instead of\n";
text += " going in polling mode when a ping response timeout occurs. Default is false\n";
// other settings like burstMode are in the derived classes
return text;
}
/**
*
* @return Can be null
*/
public SessionName getSessionName() {
return sessionName;
}
public void setSessionName(SessionName sessionName) {
this.sessionName = sessionName;
}
public void shutdown() {
this.callbackDriver = null;
this.sessionName = null;
this.pluginInfoParameters.clear();
}
public boolean isFromPersistenceRecovery() {
// QosData contains the same TODO: assure they are in sync or remove one
return fromPersistenceRecovery;
}
public void setFromPersistenceRecovery(boolean fromPersistenceRecovery) {
this.fromPersistenceRecovery = fromPersistenceRecovery;
}
}