Package org.xmlBlaster.contrib.replication.impl

Source Code of org.xmlBlaster.contrib.replication.impl.InitialUpdater$ConnectionInfo

/*------------------------------------------------------------------------------
Name:      InitialUpdater.java
Project:   xmlBlaster.org
Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
------------------------------------------------------------------------------*/

package org.xmlBlaster.contrib.replication.impl;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.TextMessage;

import org.xmlBlaster.util.ReplaceVariable;
import org.xmlBlaster.client.I_ConnectionStateListener;
import org.xmlBlaster.client.I_XmlBlasterAccess;
import org.xmlBlaster.contrib.ContribConstants;
import org.xmlBlaster.contrib.I_ChangePublisher;
import org.xmlBlaster.contrib.I_ContribPlugin;
import org.xmlBlaster.contrib.I_Info;
import org.xmlBlaster.contrib.I_Update;
import org.xmlBlaster.contrib.InfoHelper;
import org.xmlBlaster.contrib.PropertiesInfo;
import org.xmlBlaster.contrib.VersionTransformerCache;
import org.xmlBlaster.contrib.dbwatcher.DbWatcher;
import org.xmlBlaster.contrib.dbwriter.info.SqlInfo;
import org.xmlBlaster.contrib.dbwriter.info.SqlDescription;
import org.xmlBlaster.contrib.replication.I_DbSpecific;
import org.xmlBlaster.contrib.replication.I_ReplSource;
import org.xmlBlaster.contrib.replication.ReplSourceEngine;
import org.xmlBlaster.contrib.replication.ReplicationConstants;
import org.xmlBlaster.jms.XBConnectionMetaData;
import org.xmlBlaster.jms.XBDestination;
import org.xmlBlaster.jms.XBMessage;
import org.xmlBlaster.jms.XBMessageProducer;
import org.xmlBlaster.jms.XBSession;
import org.xmlBlaster.jms.XBStreamingMessage;
import org.xmlBlaster.util.Execute;
import org.xmlBlaster.util.I_ExecuteListener;
import org.xmlBlaster.util.I_ReplaceContent;
import org.xmlBlaster.util.Timestamp;
import org.xmlBlaster.util.def.PriorityEnum;
import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
import org.xmlBlaster.util.qos.ClientProperty;

public class InitialUpdater implements I_Update, I_ContribPlugin, I_ConnectionStateListener, I_ReplaceContent, ReplicationConstants, I_ReplSource {

   public class ConnectionInfo {
      private Connection connection;
      private boolean committed;

      public ConnectionInfo(Connection conn) {
         this.connection = conn;
      }
     
      /**
       * @return Returns the connection.
       */
      public Connection getConnection() {
         return connection;
      }
     
      /**
       * @return Returns the committed.
       */
      public boolean isCommitted() {
         return committed;
      }
     
      /**
       * @param committed The committed to set.
       */
      public synchronized void commit() {
         if (this.connection == null)
            return;
         try {
            this.connection.commit();
            this.committed = true;
         }
         catch (SQLException ex) {
            ex.printStackTrace();
         }
      }
   }
  
   class ExecuteListener implements I_ExecuteListener {

      StringBuffer errors = new StringBuffer();
      long sleepTime = 0L;
      ConnectionInfo connInfo;

      public ExecuteListener(String stringToCheck, ConnectionInfo connInfo) {
         this.connInfo = connInfo;
      }
     
      public void stderr(String data) {
         log.warning(data);
         // log.info(data);
         // this.errors.append(data).append("\n");
         // sleep(this.sleepTime);
         // checkForCommit(data);
      }

      public void stdout(String data) {
         log.info(data);
         // sleep(this.sleepTime);
         // checkForCommit(data);
      }

      String getErrors() {
         return this.errors.toString();
      }
   }

  
  
   class ExecutionThread extends Thread {
     
      private String replTopic;
      private I_DbSpecific dbSpec;
      private String initialFilesLocation;

      private List slaveNamesList;
      private String replManagerAddress;
      private String version;
     
      /**
       *
       * @param replTopic The topic to use to publish the initial data
       * @param replManagerAddress The address to which to send the end-of-data message
       * @param dbSpecific
       * @param initialFilesLocation
       */
      public ExecutionThread(String replTopic, String replManagerAddress, I_DbSpecific dbSpecific, String initialFilesLocation) {
         this.slaveNamesList = new ArrayList();
         this.replManagerAddress = replManagerAddress;
         this.replTopic = replTopic;
         this.dbSpec = dbSpecific;
         this.initialFilesLocation = initialFilesLocation;
      }
     
      /**
       * Adds a destination to this initial update (so that it is possible to perform several I.U.
       * with the same data.
       *
       * @param destination The destination (PtP) of this initial Update
       * @param slaveName The name of the slave
       * @param version The version for this update.
       * @return true if the entry has the correct version, false otherwise (in which case it
       * will not be added).
       */
      public boolean add(String slaveName, String replManagerAddress_, String ver) {
         if (this.version == null)
            this.version = ver;
         else {
            if (!this.version.equals(ver)) {
               return false;
            }
         }
         if (this.replManagerAddress == null)
            this.replManagerAddress = replManagerAddress_;
         else {
            if (!this.replManagerAddress.equals(replManagerAddress_)) {
               return false;
            }
         }
         this.slaveNamesList.add(slaveName);
         return true;
      }
     
      public void process() {
         start();
      }
     
      public void run() {
         String[] slaveNames = (String[])this.slaveNamesList.toArray(new String[this.slaveNamesList.size()]);
         try {
            this.dbSpec.initiateUpdate(replTopic, this.replManagerAddress, slaveNames, this.version, this.initialFilesLocation);
         }
         catch (Exception ex) {
            log.severe("An Exception occured when running intial update for '" + replTopic + "' for '" + this.replManagerAddress + "' as slave '" + SpecificDefault.toString(slaveNames) + "'");
            ex.printStackTrace();
         }
      }
   };

  
   private String CREATE_COUNTER_KEY = "_createCounter";
   private static Logger log = Logger.getLogger(InitialUpdater.class.getName());

   /** used to publish CREATE changes */
   protected I_ChangePublisher publisher;
   protected I_Info info;
   private String initialCmd;
   private String initialCmdPre;
   private String initialCmdPath;
   private boolean keepDumpFiles;
   private String replPrefix;
   private I_DbSpecific dbSpecific;
   private String stringToCheck;
   private Map runningExecutes = new HashMap();
   private String initialDataTopic;
   /** Contains updates to be executed where the key is the version */
   private Map preparedUpdates = new HashMap();
   private boolean collectInitialUpdates;
   private boolean initialDumpAsXml;
   private int initialDumpMaxSize = 1048576;
   private long initialCmdSleepDelay = 10L;
   private ReplSourceEngine replSourceEngine;
  
   /**
    * Not doing anything.
    */
   public InitialUpdater(I_DbSpecific dbSpecific) {
      this.dbSpecific = dbSpecific;
   }

   /**
    * @see org.xmlBlaster.contrib.I_ContribPlugin#getUsedPropertyKeys()
    */
   public final Set getUsedPropertyKeys() {
      Set set = new HashSet();
      set.add("replication.prefix");
      set.add("maxRowsOnCreate");
      if (this.publisher != null)
         PropertiesInfo.addSet(set, this.publisher.getUsedPropertyKeys());
      return set;
   }

   public ConnectionInfo getConnectionInfo(Connection conn) {
      return new ConnectionInfo(conn);
   }
  
   /**
    * @see I_DbSpecific#init(I_Info)
    *
    */
   public final void init(I_Info info_) throws Exception {
      log.info("going to initialize the resources");
      this.info = info_;
      this.replPrefix = SpecificDefault.getReplPrefix(this.info);

      this.initialCmdPath = this.info.get("replication.path", "${user.home}/tmp");
      log.fine("replication.path='" + this.initialCmdPath + "'");
      this.initialCmd = this.info.get("replication.initialCmd", null);
      if (this.initialCmd != null && this.initialCmd.trim().length() < 1) // if emtpy
         this.initialCmd = null;
      this.initialCmdPre = info_.get("replication.initialCmdPre", null);
      this.keepDumpFiles = info_.getBoolean("replication.keepDumpFiles", false);
      // this.stringToCheck = info.get("replication.initial.stringToCheck", "rows exported");
      this.stringToCheck = info_.get("replication.initial.stringToCheck", null);
      this.initialDataTopic = info_.get("replication.initialDataTopic", "replication.initialData");
      String currentVersion = this.info.get("replication.version", "0.0");
      // this is only needed on the master side
      this.info.put(SUPPORTED_VERSIONS, getSupportedVersions(currentVersion));
      this.initialDumpAsXml = this.info.getBoolean("replication.initialDumpAsXml", false);
      this.initialDumpMaxSize = this.info.getInt("replication.initialDumpMaxSize", 1048576);      
      if (this.initialDumpAsXml)
         this.initialDumpMaxSize = (int)(0.666 * this.initialDumpMaxSize);
        
      boolean needsPublisher = this.info.getBoolean(I_DbSpecific.NEEDS_PUBLISHER_KEY, true);
      if (needsPublisher) {
         this.info.putObject("_connectionStateListener", this);
         this.publisher = DbWatcher.getChangePublisher(this.info);
      }
     
      // registering this instance to the Replication Manager
      HashMap subscriptionMap = new HashMap();
      subscriptionMap.put("ptp", "true");
      if (this.publisher != null) {
         replSourceEngine = new ReplSourceEngine(replPrefix, publisher, this);
         this.publisher.registerAlertListener(this, subscriptionMap);
      }
      this.initialCmdSleepDelay = this.info.getLong("replication.initialCmd.sleepDelay", 10L);
     
      // rewrite the default behaviour of the timestamp detector to detect even UPDATES (deletes are also updates)
      /*
      boolean detectUpdates = this.info.getBoolean("detector.detectUpdates", false);
      if (detectUpdates)
         throw new Exception("You have configured the DbWatcher to have 'detector.detectUpdates=true'. This is not allowed in replication");
      log.info("overwriting the default for 'detector.detectUpdates' from 'true' to 'false' since we are in replication");
      this.info.put("detector.detectUpdates", "" + false);
      */

      // used for versioning (shall be passed to the ConnectQos when connecting (make sure this is
      // invoked before the mom connects
   }

   /**
    * @see I_DbSpecific#shutdown()
    */
   public final void shutdown() throws Exception {
      try {
         if (this.publisher != null) {
            log.info("going to shutdown: cleaning up resources");
            this.publisher.shutdown();
            this.publisher = null;
         }
      }
      catch (Throwable e) {
         e.printStackTrace();
         log.warning(e.toString());
      }
   }

   /**
    * Publishes a 'CREATE TABLE' operation to the XmlBlaster. It is used on the
    * DbWatcher side. Note that it is also used to publish the INSERT commands
    * related to a CREATE TABLE operation, i.e. if on a CREATE TABLE operation
    * it is found that the table is already populated when reading it, then
    * these INSERT operations are published with this method.
    *
    * @param counter
    *           The counter indicating which message number it is. The create
    *           opeation itself will have '0', the subsequent associated INSERT
    *           operations will have an increasing number (it is the number of
    *           the message not the number of the associated INSERT operation).
    * @param destination in case it is a ptp it is sent only to that destination,
    *         otherwise it is sent as a pub/sub. This parameter also determines on
    *         which topic to publish.
    * @return a uniqueId identifying this publish operation.
    *
    * @throws Exception
    */
   public final String publishCreate(int counter, SqlInfo updateInfo, long newReplKey, String destination) throws Exception {
      log.info("publishCreate invoked for counter '" + counter + "'");
      SqlDescription description = updateInfo.getDescription();
     
      description.setAttribute(new ClientProperty(CREATE_COUNTER_KEY, "int",
            null, "" + counter));
      description.setAttribute(new ClientProperty(EXTRA_REPL_KEY_ATTR, null, null, "" + newReplKey));
      if (counter == 0) {
         description.setCommand(CREATE_ACTION);
         description.setAttribute(new ClientProperty(
               ACTION_ATTR, null, null,
               CREATE_ACTION));
      } else {
         description.setCommand(REPLICATION_CMD);
         description.setAttribute(new ClientProperty(
               ACTION_ATTR, null, null,
               INSERT_ACTION));
      }

      Map map = new HashMap();
      map.put("_command", "CREATE");
      if (destination != null)
         map.put("_destination", destination);
      // and later put the part number inside
      // this is implicit association: if a destination is null, then it is ment for everybody,
      // i.e. it must be published on the replication topic, otherwise it is published on the
      // initial update topic.
     
      if (destination != null)
         map.put(ContribConstants.TOPIC_NAME, this.initialDataTopic);
      else {
         String topic = this.info.get("mom.topicName", null);
         if (topic != null)
            map.put(ContribConstants.TOPIC_NAME, topic);
      }

      if (this.publisher == null) {
         log.warning("SpecificDefaut.publishCreate publisher is null, can not publish. Check your configuration");
         return null;
      }
      else
         return this.publisher.publish("createTableMsg", updateInfo.toXml("").getBytes(), map);
   }

   /**
    * Sending this message will reactivate the Dispatcher of the associated slave
    * @param topic
    * @param filename
    * @param replManagerAddress
    * @param slaveName
    * @param minKey
    * @param maxKey
    * @throws Exception
    */
   public final void sendInitialDataResponseOnly(String[] slaveSessionNames, String replManagerAddress, long minKey, long maxKey) throws Exception {
      if (replSourceEngine != null)
         replSourceEngine.sendInitialDataResponse(slaveSessionNames, replManagerAddress, minKey, maxKey);
   }
  
   public final void sendInitialDataResponse(String[] slaveSessionNames, String shortFilename, String replManagerAddress, long minKey, long maxKey, String requestedVersion, String currentVersion, String initialFilesLocation) throws Exception {
      // in this case they are just decorators around I_ChangePublisher
      if (this.publisher == null) {
         if (shortFilename == null)
            shortFilename = "noFileSinceNoInitialData";
         log.warning("The publisher has not been initialized, can not publish message for '" + shortFilename + "'");
         return;
      }
      XBSession session = this.publisher.getJmsSession();
      // XBMessageProducer producer = new XBMessageProducer(session, new XBDestination(topic, null));
     
      XBDestination dest = new XBDestination(this.initialDataTopic, SpecificDefault.toString(slaveSessionNames));
     
      XBMessageProducer producer = new XBMessageProducer(session, dest);
      producer.setPriority(PriorityEnum.HIGH_PRIORITY.getInt());
      producer.setDeliveryMode(DeliveryMode.PERSISTENT);
     
      String dumpId = "" + new Timestamp().getTimestamp();
      // now read the file which has been generated
      String filename = null;
      if (shortFilename != null) {
         log.info("sending initial file '" + shortFilename + "' for user '" + SpecificDefault.toString(slaveSessionNames+ "'");
         if (this.initialCmdPath != null)
            filename = this.initialCmdPath + File.separator + shortFilename;
         else
            filename = shortFilename;
         File file = new File(filename);
        
         FileInputStream fis = new FileInputStream(file);
        
         XBStreamingMessage msg = session.createStreamingMessage(this);
         msg.setIntProperty(XBConnectionMetaData.JMSX_MAX_CHUNK_SIZE, this.initialDumpMaxSize);
         msg.setStringProperty(FILENAME_ATTR, shortFilename);
         msg.setLongProperty(REPL_KEY_ATTR, minKey);
         msg.setStringProperty(DUMP_ACTION, "true");
         if (initialFilesLocation != null) {
            msg.setStringProperty(INITIAL_FILES_LOCATION, initialFilesLocation);
            msg.setStringProperty(INITIAL_DATA_ID, dumpId);
         }
         msg.setInputStream(fis);
         producer.send(msg);
         // make a version copy if none exists yet
         boolean doDelete = true;
         if (currentVersion != null) {
            String backupFileName = this.initialCmdPath + File.separator + VersionTransformerCache.buildFilename(this.replPrefix, currentVersion);
            File backupFile = new File(backupFileName);
            if (!backupFile.exists()) {
               final boolean copy = true;
               if (copy) {
                  BufferedInputStream bis = new BufferedInputStream(file.toURL().openStream());
                  FileOutputStream os = new FileOutputStream(backupFileName);
                  long length = file.length();
                  long remaining = length;
                  byte[] buf = new byte[this.initialDumpMaxSize];
                  while (remaining > 0) {
                     int tot = bis.read(buf);
                     remaining -= tot;
                     os.write(buf, 0, tot);
                  }
                  bis.close();
                  os.close();
               }
               else {
                  boolean ret = file.renameTo(backupFile);
                  if (!ret)
                     log.severe("could not move the file '" + filename + "' to '" + backupFileName + "' reason: could it be that the destination is not a local file system ? try the flag 'copyOnMove='true' (see http://www.xmlblaster.org/xmlBlaster/doc/requirements/client.filepoller.html");
                  else
                     doDelete = false;
               }
            }
         }
         else
            log.severe("The version is not set. Can not make a backup copy of the version file");
       
         boolean isRequestingCurrentVersion = currentVersion.equalsIgnoreCase(requestedVersion);
         if (!this.keepDumpFiles && doDelete && isRequestingCurrentVersion) {
            if (file.exists()) {
               boolean ret = file.delete();
               if (!ret)
                  log.warning("could not delete the file '" + filename + "'");
            }
         }
         fis.close();
      }
      else
         log.info("initial update requested with no real initial data for '" + SpecificDefault.toString(slaveSessionNames) + "' and for replication '" + this.replPrefix + "'");

      // send the message for the status change
      if (initialFilesLocation != null) {
         // then we save it in a file but we must tell it is finished now
         TextMessage  endMsg = session.createTextMessage();
         endMsg.setText("INITIAL UPDATE WILL BE STORED UNDER '" + initialFilesLocation + "'");
         endMsg.setBooleanProperty(INITIAL_DATA_END, true);
         endMsg.setStringProperty(INITIAL_DATA_ID, dumpId);
         endMsg.setStringProperty(INITIAL_FILES_LOCATION, initialFilesLocation);
         producer.send(endMsg);
         endMsg = session.createTextMessage();
         endMsg.setText("INITIAL UPDATE WILL BE STORED UNDER '" + initialFilesLocation + "' (going to remote)");
         endMsg.setBooleanProperty(INITIAL_DATA_END_TO_REMOTE, true);
         endMsg.setStringProperty(INITIAL_DATA_ID, dumpId);
         endMsg.setStringProperty(INITIAL_FILES_LOCATION, initialFilesLocation);
         producer.send(endMsg);
      }
      sendInitialDataResponseOnly(slaveSessionNames, replManagerAddress, minKey, maxKey);
      if (replSourceEngine != null)
         replSourceEngine.sendEndOfTransitionMessage(info, session, initialFilesLocation, shortFilename, dumpId, producer);
   }
  
  
   /**
    * This method is used where the end of transition message has to be sent separately (for example for read-only applications without triggers)
    * @param slaveSessionNames
    * @throws JMSException
    */
   public void sendEndOfTransitionMessage(String[] slaveSessionNames) throws JMSException {
      if (replSourceEngine != null)
         replSourceEngine.sendEndOfTransitionMessage(info, initialDataTopic, slaveSessionNames);
   }
  
   /**
    * Executes an Operating System command.
    *
    * @param cmd
    * @throws Exception
    */
   private void osExecute(String[] slaveNames, String cmd, ConnectionInfo connInfo) throws Exception {
      try {
         // if (Execute.isWindows()) cmd = "cmd " + cmd;
         String[] args = ReplaceVariable.toArray(cmd, " ");
         log.info("running for '" + SpecificDefault.toString(slaveNames) + "' for cmd '" + cmd + "'");
         Execute execute = new Execute(args, null, this.initialCmdSleepDelay);
         synchronized (this) {
            if (slaveNames != null) {
               for (int i=0; i < slaveNames.length; i++) {
                  String slaveName = slaveNames[i];
                  if (slaveName != null) {
                     Execute oldExecute = (Execute)this.runningExecutes.remove(slaveName);
                     if (oldExecute != null) {
                        log.warning("A new request for an initial update has come for '" + slaveName + "' but there is one already running. Will shut down the running one first");
                        oldExecute.stop();
                        log.info("old initial request for '" + slaveName + "' has been shut down");
                        this.runningExecutes.put(slaveName, execute);
                     }
                  }
               }
            }
         }
         ExecuteListener listener = new ExecuteListener(this.stringToCheck, connInfo);
         execute.setExecuteListener(listener);
         execute.run(); // blocks until finished
         if (execute.getExitValue() != 0) {
            throw new Exception("Exception occured on executing '" + cmd + "': " + listener.getErrors());
         }
      }
      finally {
         synchronized (this) {
            if (slaveNames != null) {
               for (int i=0; i < slaveNames.length; i++) {
                  String slaveName = slaveNames[i];
                  if (slaveName != null)
                     this.runningExecutes.remove(slaveName);
               }
            }
         }
      }
   }
  
   private String getSupportedVersions(String currentReplVersion) throws Exception {
      if (this.initialCmdPath == null)
         throw new Exception("InitialUpdater.getSupportedVersions invoked with no initialCmdPath specified");
      File dir = new File(this.initialCmdPath);
      if (!dir.exists())
         throw new Exception("InitialUpdater.getSupportedVersions invoked but the directory '" + this.initialCmdPath + "' does not exist");
      if (!dir.isDirectory())
         throw new Exception("InitialUpdater.getSupportedVersions invoked but '" + this.initialCmdPath + "' is not a directory");
      File[] childs = dir.listFiles();
      TreeSet set = new TreeSet();
      for (int i=0; i < childs.length; i++) {
         if (childs[i].isDirectory())
            continue;
         if (!childs[i].canRead())
            continue;
         String filename = childs[i].getName();
         String prefix = VersionTransformerCache.stripReplicationPrefix(filename).trim();
         String version = VersionTransformerCache.stripReplicationVersion(filename);
         if (version != null && prefix.equals(this.replPrefix)) {
            log.info("added version='" + version + "' for prefix='" + prefix + "' when encountering file='" + filename + "'");
            set.add(prefix + VERSION_TOKEN + version.trim());
         }
      }
      if (currentReplVersion != null) {
         String txt = this.replPrefix + VERSION_TOKEN + currentReplVersion;
         set.add(txt);
         log.info("added default version '" + txt + "'");
      }
      return InfoHelper.getIteratorAsString(set.iterator());
   }
  
  
   /**
    * This is the intial command which is invoked on the OS. It is basically used for the
    * import and export of the DB. Could also be used for other operations on the OS.
    * It is a helper method. If the initialCmd (the 'replication.initialCmd' property) is null,
    * then it silently returns null as the filename.
    *
    * @param argument the argument to execute. It is normally the absolute file name to be
    * exported/imported. Can be null, if null, one is generated by using the current timestamp.
    * @param conn the connection to perform a commit on. Can be null, if null, no commit
    * is done asynchronously.
    *
    * @throws Exception
    */
   public final String initialCommand(String[] slaveNames, String completeFilename, ConnectionInfo connInfo, String version) throws Exception {
      if (this.initialCmd == null)
         return null;
      String filename = null;
      if (completeFilename == null) {
         filename = "" + (new Timestamp()).getTimestamp() + ".dmp";
         completeFilename = this.initialCmdPath + File.separator + filename;
      }
      // String cmd = this.initialCmd + " \"" + completeFilename + "\"";
      String cmd = this.initialCmd + " " + completeFilename;
      if (version != null)
         cmd += " " + version;
      osExecute(slaveNames, cmd, connInfo);
      return filename;
   }

   /**
    * It builds the name to return in case the version is different from the current version.
    * If initialCmd has been defined as null, null is silenty returned.
    * @param replPrefix_
    * @param requestedVersion
    * @return
    */
   public String buildFilename(String replPrefix_, String requestedVersion) {
      if (initialCmd == null)
         return null;
      return VersionTransformerCache.buildFilename(replPrefix_, requestedVersion);
   }

  
   public final void initialCommandPre() throws Exception {
      if (this.initialCmdPre == null)
         return;
      osExecute(null, this.initialCmdPre, null);
   }
  
   /**
    * Sends a new registration message
    * @see org.xmlBlaster.client.I_ConnectionStateListener#reachedAlive(org.xmlBlaster.util.dispatch.ConnectionStateEnum, org.xmlBlaster.client.I_XmlBlasterAccess)
    */
   public void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
      try {
         log.info("connection is going from '" + oldState + " to 'ALIVE'");
      }
      catch (Exception ex) {
         ex.printStackTrace();
      }
   }

   public void reachedAliveSync(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
   }

   /**
    * @see org.xmlBlaster.client.I_ConnectionStateListener#reachedDead(org.xmlBlaster.util.dispatch.ConnectionStateEnum, org.xmlBlaster.client.I_XmlBlasterAccess)
    */
   public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
      log.info("connection is going in DEAD from '" + oldState + "'");
   }

   /**
    * @see org.xmlBlaster.client.I_ConnectionStateListener#reachedPolling(org.xmlBlaster.util.dispatch.ConnectionStateEnum, org.xmlBlaster.client.I_XmlBlasterAccess)
    */
   public synchronized void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
      log.info("connection is going in POLLING from '" + oldState + "'");
      this.info.put("_InitialUpdaterRegistered", "false");
   }

   // enforced by I_ReplaceContent
   /**
    *
    */
   public byte[] replace(byte[] oldContent, Map clientProperties) {
      if (!this.initialDumpAsXml)
      return oldContent;
      SqlInfo sqlInfo = new SqlInfo(this.info);
      SqlDescription description = new SqlDescription(this.info);
      description.setCommand(INITIAL_XML_CMD);
      ClientProperty prop = (ClientProperty)clientProperties.get(FILENAME_ATTR);
     
      if (prop != null)
         description.setAttribute(prop);
      prop = (ClientProperty)clientProperties.get(TIMESTAMP_ATTR);
     
      if (prop != null)
         description.setAttribute(prop);

      prop = XBMessage.get(XBConnectionMetaData.JMSX_GROUP_SEQ, clientProperties);
      if (prop != null) {
         prop = new ClientProperty(XBConnectionMetaData.JMSX_GROUP_SEQ, null, null, prop.getStringValue());
         description.setAttribute(prop);
      }
     
      prop = XBMessage.get(XBConnectionMetaData.JMSX_GROUP_EOF, clientProperties);
      if (prop != null) {
         prop = new ClientProperty(XBConnectionMetaData.JMSX_GROUP_EOF, null, null, prop.getStringValue());
         description.setAttribute(prop);
      }
     
      prop = XBMessage.get(XBConnectionMetaData.JMSX_GROUP_EX, clientProperties);
      if (prop != null) {
         prop = new ClientProperty(XBConnectionMetaData.JMSX_GROUP_EX, null, null, prop.getStringValue());
         description.setAttribute(prop);
      }
     
      prop = new ClientProperty(DUMP_CONTENT_ATTR, oldContent);
      description.setAttribute(prop);
      sqlInfo.setDescription(description);
      String ret = sqlInfo.toXml("");
      if (log.isLoggable(Level.FINEST))
         log.finest(ret);
      return ret.getBytes();
   }

  
   /**
    * @see org.xmlBlaster.contrib.I_Update#update(java.lang.String, byte[], java.util.Map)
    */
   public final void update(String topic, InputStream is, Map attrMap) {
      if (replSourceEngine != null)
         replSourceEngine.update(topic, is, attrMap);
   }

   public void collectInitialUpdate() {
      synchronized(this.preparedUpdates) {
         this.collectInitialUpdates = true;
         log.info("Will collect initial updates until message '" + INITIAL_UPDATE_START_BATCH + "' comes");
      }
   }

   public byte[] executeStatement(String sql, long maxResponseEntries, boolean isHighPrio, boolean isMaster, String sqlTopic, String statementId) throws Exception {
      return this.dbSpecific.broadcastStatement(sql, maxResponseEntries, isHighPrio, isMaster, sqlTopic, statementId);
   }
  
   public void recreateTriggers() throws Exception {
      final boolean force = true;
      final boolean forceSend = false;
      this.dbSpecific.addTriggersIfNeeded(force, null, forceSend);
   }
  
   public void cancelUpdate(String slaveName) {
      this.dbSpecific.cancelUpdate(slaveName);
      synchronized (this) {
         Execute exec = (Execute)this.runningExecutes.remove(slaveName);
         if (exec != null)
            exec.stop();
      }
   }

  
   public void initialUpdate(String replTopic, String replManagerAddress, String slaveName, String requestedVersion, String initialFilesLocation, boolean onlyRegister) {
      this.dbSpecific.clearCancelUpdate(slaveName);
      if (onlyRegister || this.collectInitialUpdates) {
         log.info("The update for slave='" + slaveName + "' and version='" + requestedVersion + "' will be queued since onlyRegister='" + onlyRegister + "' and collectInitialUpdates='" + this.collectInitialUpdates + "'");
         String key = "__default";
         if (requestedVersion != null)
            key = requestedVersion;
         synchronized(this.preparedUpdates) {
            ExecutionThread executionThread = (ExecutionThread)this.preparedUpdates.get(key);
            if (executionThread == null) {
               executionThread = new ExecutionThread(replTopic, replManagerAddress, this.dbSpecific, initialFilesLocation);
               this.preparedUpdates.put(key, executionThread);
            }
            executionThread.add(slaveName, replManagerAddress, requestedVersion);
         }
      }
      else {
         ExecutionThread executionThread = new ExecutionThread(replTopic, replManagerAddress, this.dbSpecific, initialFilesLocation);
         executionThread.add(slaveName, replManagerAddress, requestedVersion);
         executionThread.process();
      }
   }
  
   public String getTopic() {
      return this.info.get("mom.topicName", null);     
   }
  
   public void startInitialUpdateBatch() {
      synchronized (this.preparedUpdates) {
         this.collectInitialUpdates = false; // reset the flag
         try {
            ExecutionThread[] threads = (ExecutionThread[])this.preparedUpdates.values().toArray(new ExecutionThread[this.preparedUpdates.size()]);
            for (int i=0; i < threads.length; i++)
               threads[i].process();
         }
         finally {
            this.preparedUpdates.clear();
         }
      }
   }
  
}
TOP

Related Classes of org.xmlBlaster.contrib.replication.impl.InitialUpdater$ConnectionInfo

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.