Package org.hive2hive.core.network.data.futures

Source Code of org.hive2hive.core.network.data.futures.FuturePutListener

package org.hive2hive.core.network.data.futures;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.CountDownLatch;

import net.tomp2p.futures.BaseFutureAdapter;
import net.tomp2p.futures.FutureDigest;
import net.tomp2p.futures.FuturePut;
import net.tomp2p.futures.FutureRemove;
import net.tomp2p.peers.Number160;
import net.tomp2p.peers.Number640;
import net.tomp2p.peers.PeerAddress;
import net.tomp2p.rpc.DigestResult;

import org.hive2hive.core.H2HConstants;
import org.hive2hive.core.model.NetworkContent;
import org.hive2hive.core.network.H2HStorageMemory.PutStatusH2H;
import org.hive2hive.core.network.data.DataManager;
import org.hive2hive.core.network.data.parameters.IParameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A put future adapter for verifying a put of a {@link NetworkContent} object. Provides failure handling and
* a blocking wait.</br></br>
*
* <b>Failure Handling</b></br>
* Putting can fail when the future object failed, when the future object contains wrong data or the
* responding node detected a failure. See {@link PutStatusH2H} for possible failures. If putting fails the
* adapter retries it to a certain threshold (see {@link H2HConstants.PUT_RETRIES}). After a successful put
* the adapter verifies with a digest if no concurrent modification happened. All puts are asynchronous.
* That's why the future listener attaches himself to the new future objects so that the adapter can finally
* notify his/her listener about a success or failure.
*
* @author Seppi, Nico
*/
public class FuturePutListener extends BaseFutureAdapter<FuturePut> {

  private final static Logger logger = LoggerFactory.getLogger(FuturePutListener.class);

  private final IParameters parameters;
  private final DataManager dataManager;
  private final CountDownLatch latch;

  // used to count put retries
  private int putTries = 0;
  private boolean success = false;

  public FuturePutListener(IParameters parameters, DataManager dataManager) {
    this.parameters = parameters;
    this.dataManager = dataManager;
    this.latch = new CountDownLatch(1);
  }

  /**
   * Wait (blocking) until the put is done
   *
   * @return true if successful, false if not successful
   */
  public boolean await() {
    try {
      latch.await();
    } catch (InterruptedException e) {
      logger.error("Could not wait until put has finished.", e);
    }

    return success;
  }

  @Override
  public void operationComplete(FuturePut future) throws Exception {
    logger.trace("Start verification of put. '{}'", parameters.toString());

    if (future.isFailed()) {
      logger.warn("Put future was not successful. '{}'", parameters.toString());
      retryPut();
      return;
    } else if (future.getRawResult().isEmpty()) {
      logger.warn("Returned raw results are empty.");
      retryPut();
      return;
    }

    // analyze returned put status
    final List<PeerAddress> versionConflict = new ArrayList<PeerAddress>();
    List<PeerAddress> fail = new ArrayList<PeerAddress>();
    for (PeerAddress peeradress : future.getRawResult().keySet()) {
      Map<Number640, Byte> map = future.getRawResult().get(peeradress);
      if (map == null) {
        logger.warn("A node gave no status (null) back. '{}'", parameters.toString());
        fail.add(peeradress);
      } else {
        for (Number640 key : future.getRawResult().get(peeradress).keySet()) {
          byte status = future.getRawResult().get(peeradress).get(key);
          switch (PutStatusH2H.values()[status]) {
            case OK:
              break;
            case FAILED:
            case FAILED_NOT_ABSENT:
            case FAILED_SECURITY:
              logger.warn("A node denied putting data. Reason = '{}'. '{}'",
                  PutStatusH2H.values()[status], parameters.toString());
              fail.add(peeradress);
              break;
            case VERSION_CONFLICT:
            case VERSION_CONFLICT_NO_BASED_ON:
            case VERSION_CONFLICT_NO_VERSION_KEY:
            case VERSION_CONFLICT_OLD_TIMESTAMP:
              logger.warn("Version conflict detected. Reason = '{}'. '{}'",
                  PutStatusH2H.values()[status], parameters.toString());
              versionConflict.add(peeradress);
              break;
          }
        }
      }
    }

    if (!versionConflict.isEmpty()) {
      logger.warn("Put verification failed. Version conflict! '{}'", parameters.toString());
      notifyFailure();
    } else if ((double) fail.size() < ((double) future.getRawResult().size()) / 2.0) {
      // majority of the contacted nodes responded with ok
      verifyPut();
    } else {
      logger.warn("{} of {} contacted nodes failed.", fail.size(), future.getRawResult().size());
      retryPut();
    }
  }

  /**
   * Retries a put till a certain threshold is reached (see {@link H2HConstants.PUT_RETRIES}). Removes first
   * the possibly succeeded puts. A {@link RetryPutListener} tries to put again the given content.
   */
  private void retryPut() {
    if (putTries++ < H2HConstants.PUT_RETRIES) {
      logger.warn("Put retry #{}. '{}'", putTries, parameters.toString());
      // remove succeeded puts
      FutureRemove futureRemove = dataManager.removeVersionUnblocked(parameters);
      futureRemove.addListener(new BaseFutureAdapter<FutureRemove>() {
        @Override
        public void operationComplete(FutureRemove future) {
          if (future.isFailed())
            logger.warn("Put retry: Could not delete the newly put content. '{}'",
                parameters.toString());

          dataManager.putUnblocked(parameters).addListener(FuturePutListener.this);
        }
      });
    } else {
      logger.error("Put verification failed. Could not put data after {} tries. '{}'", putTries,
          parameters.toString());
      notifyFailure();
    }
  }

  /**
   * Loads digest and triggers a check.
   */
  private void verifyPut() {
    // get data to verify if everything went correct
    FutureDigest digestFuture = dataManager.getDigestUnblocked(parameters);
    digestFuture.addListener(new BaseFutureAdapter<FutureDigest>() {
      @Override
      public void operationComplete(FutureDigest future) throws Exception {
        if (future.isFailed() || future.getRawDigest() == null || future.getRawDigest().isEmpty()) {
          logger.error("Put verification failed. Could not get digest. '{}'", parameters.toString());
          notifyFailure();
        } else {
          checkVersionKey(future.getRawDigest());
        }
      }
    });
  }

  /**
   * Checks if newly put version is listened in the digest. If yes everything is fine. If one peer doesn't
   * contain the new version key it is a sign for a concurrent modification. In this case we have to figure
   * out which newly put version wins.
   *
   * @param rawDigest
   *            raw digest data set
   */
  private void checkVersionKey(Map<PeerAddress, DigestResult> rawDigest) {
    for (PeerAddress peerAddress : rawDigest.keySet()) {
      if (rawDigest.get(peerAddress) == null || rawDigest.get(peerAddress).keyDigest() == null
          || rawDigest.get(peerAddress).keyDigest().isEmpty()) {
        logger.warn("Put verification: Received no digest from peer '{}'. '{}'", peerAddress,
            parameters.toString());
      } else {
        NavigableMap<Number640, Number160> keyDigest = rawDigest.get(peerAddress).keyDigest();

        if (keyDigest.firstEntry().getKey().getVersionKey().equals(parameters.getVersionKey())) {
          logger.trace("Put verification: On peer '{}' entry is newest. '{}'",
              peerAddress, parameters.toString());

        } else if (keyDigest.containsKey(parameters.getKey())) {
          logger.trace("Put verification: On peer '{}' entry exists in history. '{}'",
              peerAddress, parameters.toString());

        } else {
          logger.warn("Put verification: Concurrent modification on peer '{}' happened. '{}'",
              peerAddress, parameters.toString());

          // if version key is older than the other, the version wins
          if (!checkIfMyVerisonWins(keyDigest, peerAddress)) {
            notifyFailure();
            return;
          }
        }
      }
    }
    notifySuccess();
  }

  /**
   * Checks if the new version key is older than the one listened in the digest.
   *
   * @param keyDigest
   *            digest of a peer
   * @param peerAddress
   *            the owner of the digest
   * @return
   *         <code>true</code> if the new version key has precedence to the one listened in the digest,
   *         otherwise <code>false</code>
   */
  protected boolean checkIfMyVerisonWins(NavigableMap<Number640, Number160> keyDigest,
      PeerAddress peerAddress) {
    /* Check if based on entry exists */
    if (!keyDigest.containsKey(new Number640(parameters.getLKey(), parameters.getDKey(), parameters
        .getCKey(), parameters.getData().getBasedOnKey()))) {
      logger.warn("Put verification: Peer '{}' does not contain based on version. '{}'",
          peerAddress, parameters.toString());
      // something is definitely wrong with this peer
      return true;
    } else {
      // figure out the next version based on same version
      Number640 entryBasingOnSameParent = getSuccessor(keyDigest);
      if (entryBasingOnSameParent == null) {
        if (keyDigest.firstKey().getVersionKey().equals(parameters.getData().getBasedOnKey())) {
          logger.error("Put verification: Peer '{}' has no successor version. '{}'",
              peerAddress, parameters.toString());
          // this peer doesn't contain any successor version, with this peer is something wrong
          return true;
        } else {
          logger.error("Put verification: Peer '{}' has a corrupt version history. '{}'", peerAddress,
              parameters.toString());
          return true;
        }
      } else {
        int compare = entryBasingOnSameParent.getVersionKey().compareTo(parameters.getVersionKey());
        if (compare == 0) {
          logger.error("Put verification: Peer '{}' has same version. '{}'",
              peerAddress, parameters.toString());
          return true;
        } else if (compare < 0) {
          logger.warn("Put verification: Peer '{}' has older version. '{}'",
              peerAddress, parameters.toString());
          return false;
        } else {
          logger.warn("Put verification: Peer '{}' has newer version. '{}'",
              peerAddress, parameters.toString());
          return true;
        }
      }
    }
  }

  /**
   * Get the entry which is the parent of the new version.
   *
   * @param keyDigest
   *            a digest containing the parent version (based on)
   * @return the parent of the new version
   */
  private Number640 getSuccessor(NavigableMap<Number640, Number160> keyDigest) {
    Number640 entryBasingOnSameParent = null;
    for (Number640 key : keyDigest.keySet()) {
      if (keyDigest.get(key).equals(parameters.getData().getBasedOnKey())) {
        entryBasingOnSameParent = key;
        break;
      }
    }
    return entryBasingOnSameParent;
  }

  private void notifySuccess() {
    logger.trace("Verification for put completed. '{}'", parameters.toString());
    // everything is ok
    success = true;
    latch.countDown();
  }

  /**
   * Remove first potentially successful puts. Then notify the listener about the fail.
   */
  private void notifyFailure() {
    // remove succeeded puts
    FutureRemove futureRemove = dataManager.removeVersionUnblocked(parameters);
    futureRemove.addListener(new BaseFutureAdapter<FutureRemove>() {
      @Override
      public void operationComplete(FutureRemove future) {
        if (future.isFailed())
          logger.warn("Put retry: Could not delete the newly put content. '{}'",
              parameters.toString());

        success = false;
        latch.countDown();
      }
    });
  }
}
TOP

Related Classes of org.hive2hive.core.network.data.futures.FuturePutListener

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.