Package org.hive2hive.core.processes.implementations.login

Source Code of org.hive2hive.core.processes.implementations.login.ContactOtherClientsStep

package org.hive2hive.core.processes.implementations.login;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import net.tomp2p.peers.PeerAddress;

import org.hive2hive.core.H2HConstants;
import org.hive2hive.core.exceptions.NoPeerConnectionException;
import org.hive2hive.core.exceptions.NoSessionException;
import org.hive2hive.core.model.Locations;
import org.hive2hive.core.network.NetworkManager;
import org.hive2hive.core.network.NetworkUtils;
import org.hive2hive.core.network.data.PublicKeyManager;
import org.hive2hive.core.network.messages.MessageManager;
import org.hive2hive.core.network.messages.direct.ContactPeerMessage;
import org.hive2hive.core.network.messages.direct.response.IResponseCallBackHandler;
import org.hive2hive.core.network.messages.direct.response.ResponseMessage;
import org.hive2hive.core.processes.framework.abstracts.ProcessStep;
import org.hive2hive.core.processes.framework.exceptions.InvalidProcessStateException;
import org.hive2hive.core.processes.framework.exceptions.ProcessExecutionException;
import org.hive2hive.core.processes.implementations.context.LoginProcessContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// TODO this class should be split up into multiple steps
public class ContactOtherClientsStep extends ProcessStep implements IResponseCallBackHandler {

  private static final Logger logger = LoggerFactory.getLogger(ContactOtherClientsStep.class);

  private final ConcurrentHashMap<PeerAddress, String> evidences = new ConcurrentHashMap<PeerAddress, String>();
  private final ConcurrentHashMap<PeerAddress, Boolean> responses = new ConcurrentHashMap<PeerAddress, Boolean>();
  private CountDownLatch waitForResponses;
  private boolean isUpdated = false;

  private final LoginProcessContext context;
  private final MessageManager messageManager;
  private final NetworkManager networkManager;

  public ContactOtherClientsStep(LoginProcessContext context, NetworkManager networkManager)
      throws NoPeerConnectionException {
    this.context = context;
    this.networkManager = networkManager;
    this.messageManager = networkManager.getMessageManager();
  }

  @Override
  protected void doExecute() throws InvalidProcessStateException, ProcessExecutionException {
    PublicKeyManager keyManager;
    try {
      keyManager = networkManager.getSession().getKeyManager();
    } catch (NoSessionException e) {
      throw new ProcessExecutionException("No session yet");
    }

    Locations locations = context.consumeLocations();
    waitForResponses = new CountDownLatch(locations.getPeerAddresses().size());
    if (!locations.getPeerAddresses().isEmpty()) {
      for (PeerAddress address : locations.getPeerAddresses()) {
        // contact all other clients (exclude self)
        if (!address.equals(networkManager.getConnection().getPeer().getPeerAddress())) {
          String evidence = UUID.randomUUID().toString();
          evidences.put(address, evidence);

          ContactPeerMessage message = new ContactPeerMessage(address, evidence);
          message.setCallBackHandler(this);

          // TODO this is blocking, should be parallel (asynchronous)
          boolean success = messageManager.sendDirect(message, keyManager.getOwnPublicKey());
          if (!success) {
            responses.put(address, false);
          }
        }
      }
    }

    // wait (blocking) until all responses are here or the time's up
    try {
      waitForResponses.await(H2HConstants.CONTACT_PEERS_AWAIT_MS, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      logger.error("Could not wait the given time for the clients to respond.", e);
    }
    updateLocations();
  }

  @Override
  public void handleResponseMessage(ResponseMessage responseMessage) {
    if (isUpdated) {
      // TODO notify delayed response client nodes about removing him from location map
      logger.warn(
          "Received a delayed contact peer response message, which gets ignored. Peer address = '{}'.",
          responseMessage.getSenderAddress());
      return;
    }

    // verify response
    if (evidences.get(responseMessage.getSenderAddress()).equals((String) responseMessage.getContent())) {
      responses.put(responseMessage.getSenderAddress(), true);
      waitForResponses.countDown();
    } else {
      logger.error(
          "Received during liveness check of other clients a wrong evidence content. Responding node = '{}'.",
          responseMessage.getSenderAddress());
    }
  }

  private void updateLocations() {
    isUpdated = true;

    Locations updatedLocations = new Locations(context.consumeLocations().getUserId());
    updatedLocations.setBasedOnKey(context.consumeLocations().getBasedOnKey());
    updatedLocations.setVersionKey(context.consumeLocations().getVersionKey());

    // add addresses that responded and self
    for (PeerAddress address : responses.keySet()) {
      if (responses.get(address)) {
        updatedLocations.addPeerAddress(address);
      }
    }
    updatedLocations.addPeerAddress(networkManager.getConnection().getPeer().getPeerAddress());
    context.provideLocations(updatedLocations);

    // evaluate if initial
    List<PeerAddress> clientAddresses = new ArrayList<PeerAddress>(updatedLocations.getPeerAddresses());

    if (NetworkUtils.choseFirstPeerAddress(clientAddresses).equals(
        networkManager.getConnection().getPeer().getPeerAddress())) {
      context.setIsInitial(true);
    } else {
      context.setIsInitial(false);
    }
  }

}
TOP

Related Classes of org.hive2hive.core.processes.implementations.login.ContactOtherClientsStep

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by ORACLE Inc. Contact coftware#gmail.com.