Package com.orientechnologies.orient.server.distributed.task

Source Code of com.orientechnologies.orient.server.distributed.task.ODeployClusterTask

/*
*
*  *  Copyright 2014 Orient Technologies LTD (info(at)orientechnologies.com)
*  *
*  *  Licensed under the Apache License, Version 2.0 (the "License");
*  *  you may not use this file except in compliance with the License.
*  *  You may obtain a copy of the License at
*  *
*  *       http://www.apache.org/licenses/LICENSE-2.0
*  *
*  *  Unless required by applicable law or agreed to in writing, software
*  *  distributed under the License is distributed on an "AS IS" BASIS,
*  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*  *  See the License for the specific language governing permissions and
*  *  limitations under the License.
*  *
*  * For more information: http://www.orientechnologies.com
*/
package com.orientechnologies.orient.server.distributed.task;

import com.orientechnologies.common.io.OFileUtils;
import com.orientechnologies.common.log.OLogManager;
import com.orientechnologies.orient.core.Orient;
import com.orientechnologies.orient.core.command.OCommandOutputListener;
import com.orientechnologies.orient.core.config.OGlobalConfiguration;
import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx;
import com.orientechnologies.orient.core.storage.OCluster;
import com.orientechnologies.orient.server.OServer;
import com.orientechnologies.orient.server.distributed.ODistributedConfiguration;
import com.orientechnologies.orient.server.distributed.ODistributedDatabaseChunk;
import com.orientechnologies.orient.server.distributed.ODistributedException;
import com.orientechnologies.orient.server.distributed.ODistributedServerLog;
import com.orientechnologies.orient.server.distributed.ODistributedServerLog.DIRECTION;
import com.orientechnologies.orient.server.distributed.ODistributedServerManager;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

/**
* Ask for deployment of single cluster from a remote node.
*
* @author Luca Garulli (l.garulli--at--orientechnologies.com)
*
*/
public class ODeployClusterTask extends OAbstractReplicatedTask implements OCommandOutputListener {
  public final static int    CHUNK_MAX_SIZE = 1048576;         // 1MB
  public static final String DEPLOYCLUSTER  = "deploycluster.";
  protected long             random;

  protected String           clusterName;

  public ODeployClusterTask() {
  }

  public ODeployClusterTask(final String iName) {
    random = UUID.randomUUID().getLeastSignificantBits();
    clusterName = iName;
  }

  @Override
  public Object execute(final OServer iServer, ODistributedServerManager iManager, final ODatabaseDocumentTx database)
      throws Exception {

    if (!getNodeSource().equals(iManager.getLocalNodeName())) {
      if (database == null)
        throw new ODistributedException("Database instance is null");

      final String databaseName = database.getName();

      final ODistributedConfiguration dCfg = iManager.getDatabaseConfiguration(databaseName);
      if (!clusterName.equalsIgnoreCase(dCfg.getMasterServer(clusterName)))
        // NOT MASTER SERVER FOR THIS CLUSTER, SKIP IT
        return Boolean.FALSE;

      final Long lastDeployment = (Long) iManager.getConfigurationMap().get(DEPLOYCLUSTER + databaseName + "." + clusterName);
      if (lastDeployment != null && lastDeployment.longValue() == random) {
        // SKIP IT
        ODistributedServerLog.debug(this, iManager.getLocalNodeName(), getNodeSource(), DIRECTION.NONE,
            "skip deploying cluster '%s' because already executed", clusterName);
        return Boolean.FALSE;
      }

      iManager.getConfigurationMap().put(DEPLOYCLUSTER + databaseName + "." + clusterName, random);

      ODistributedServerLog.warn(this, iManager.getLocalNodeName(), getNodeSource(), DIRECTION.OUT, "deploying cluster %s...",
          databaseName);

      final File f = new File(Orient.getTempPath() + "/backup_" + database.getName() + "_" + clusterName + ".zip");
      if (f.exists())
        f.delete();
      else
        f.getParentFile().mkdirs();
      f.createNewFile();

      ODistributedServerLog.info(this, iManager.getLocalNodeName(), getNodeSource(), DIRECTION.OUT,
          "creating backup of cluster '%s' in directory: %s...", databaseName, f.getAbsolutePath());

      final AtomicLong lastOperationId = new AtomicLong(-1);

      FileOutputStream fileOutputStream = new FileOutputStream(f);
      try {
        final OCluster cluster = database.getStorage().getClusterByName(clusterName);

        // cluster.backup(fileOutputStream, null, null, this,
        // OGlobalConfiguration.DISTRIBUTED_DEPLOYDB_TASK_COMPRESSION.getValueAsInteger(), CHUNK_MAX_SIZE);

        final long fileSize = f.length();

        ODistributedServerLog.info(this, iManager.getLocalNodeName(), getNodeSource(), DIRECTION.OUT,
            "sending the compressed cluster '%s.%s' over the NETWORK to node '%s', size=%s, lastOperationId=%d...", databaseName,
            clusterName, getNodeSource(), OFileUtils.getSizeAsString(fileSize), lastOperationId.get());

        final ODistributedDatabaseChunk chunk = new ODistributedDatabaseChunk(lastOperationId.get(), f, 0, CHUNK_MAX_SIZE);

        ODistributedServerLog.info(this, iManager.getLocalNodeName(), getNodeSource(), DIRECTION.OUT,
            "- transferring chunk #%d offset=%d size=%s...", 1, 0, OFileUtils.getSizeAsNumber(chunk.buffer.length));

        if (chunk.last)
          // NO MORE CHUNKS: SET THE NODE ONLINE (SYNCHRONIZING ENDED)
          iManager.setDatabaseStatus(iManager.getLocalNodeName(), databaseName, ODistributedServerManager.DB_STATUS.ONLINE);

        return chunk;

      } finally {
        fileOutputStream.close();
      }

    } else
      ODistributedServerLog.debug(this, iManager.getLocalNodeName(), getNodeSource(), DIRECTION.NONE,
          "skip deploying cluster %s.%s from the same node");

    return Boolean.FALSE;
  }

  @Override
  public RESULT_STRATEGY getResultStrategy() {
    return RESULT_STRATEGY.UNION;
  }

  @Override
  public QUORUM_TYPE getQuorumType() {
    return QUORUM_TYPE.NONE;
  }

  @Override
  public boolean isRequireNodeOnline() {
    return false;
  }

  @Override
  public long getTimeout() {
    return OGlobalConfiguration.DISTRIBUTED_DEPLOYDB_TASK_SYNCH_TIMEOUT.getValueAsLong();
  }

  @Override
  public String getPayload() {
    return null;
  }

  @Override
  public String getName() {
    return "deploy_cluster";
  }

  @Override
  public void writeExternal(final ObjectOutput out) throws IOException {
    out.writeLong(random);
    out.writeUTF(clusterName);
  }

  @Override
  public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
    random = in.readLong();
    clusterName = in.readUTF();
  }

  @Override
  public void onMessage(String iText) {
    if (iText.startsWith("\n"))
      iText = iText.substring(1);

    OLogManager.instance().info(this, iText);
  }

  @Override
  public boolean isRequiredOpenDatabase() {
    return true;
  }

}
TOP

Related Classes of com.orientechnologies.orient.server.distributed.task.ODeployClusterTask

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.