Package org.hive2hive.core.network.data

Source Code of org.hive2hive.core.network.data.UserProfileManager$QueueWorker

package org.hive2hive.core.network.data;

import java.security.KeyPair;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.hive2hive.core.exceptions.GetFailedException;
import org.hive2hive.core.exceptions.PutFailedException;
import org.hive2hive.core.model.UserProfile;
import org.hive2hive.core.security.UserCredentials;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Manages the user profile resource. Each process waiting for get / put is added to a queue and delivered in
* order.
*
* @author Nico, Seppi
*
*/
public class UserProfileManager {

  private final static Logger logger = LoggerFactory.getLogger(UserProfileManager.class);
  private static final long MAX_MODIFICATION_TIME = 1000;

  private final UserProfileHolder profileHolder;
  private final UserCredentials credentials;

  private final Object queueWaiter = new Object();
  private final QueueWorker worker = new QueueWorker();
  private final Queue<QueueEntry> readOnlyQueue = new ConcurrentLinkedQueue<QueueEntry>();
  private final Queue<PutQueueEntry> modifyQueue = new ConcurrentLinkedQueue<PutQueueEntry>();

  private volatile PutQueueEntry modifying;

  private final AtomicBoolean running;;
  private KeyPair defaultProtectionKey = null;

  public UserProfileManager(DataManager dataManager, UserCredentials credentials) {
    this.credentials = credentials;
    profileHolder = new UserProfileHolder(credentials, dataManager);
    running = new AtomicBoolean(true);

    Thread thread = new Thread(worker);
    thread.setName("UP queue");
    thread.start();
  }

  public void stopQueueWorker() {
    running.set(false);
    synchronized (queueWaiter) {
      queueWaiter.notify();
    }
  }

  public UserCredentials getUserCredentials() {
    return credentials;
  }

  /**
   * Gets the user profile. The call blocks until the most recent profile is here.
   *
   * @param pid the process identifier
   * @param intendsToPut whether the process intends modifying and putting the user profile. After the
   *            get-call, the profile has a given time to make its modification.
   * @param the user profile
   * @throws GetFailedException if the profile cannot be fetched
   */
  public UserProfile getUserProfile(String pid, boolean intendsToPut) throws GetFailedException {
    QueueEntry entry;

    if (intendsToPut) {
      PutQueueEntry putEntry = new PutQueueEntry(pid);
      modifyQueue.add(putEntry);
      entry = putEntry;
    } else {
      entry = new QueueEntry(pid);
      readOnlyQueue.add(entry);
    }

    synchronized (queueWaiter) {
      queueWaiter.notify();
    }

    try {
      entry.waitForGet();
    } catch (GetFailedException e) {
      // just stop the modification if an error occurs.
      if (intendsToPut)
        stopModification(pid);
      throw e;
    }

    UserProfile profile = entry.getUserProfile();
    if (profile == null)
      throw new GetFailedException("User Profile not found");
    return profile;
  }

  /**
   * A process notifies that he is ready to put the new profile. Note that the profile in the argument must
   * be a modification of the profile in the DHT.
   *
   * @param profile the modified user profile
   * @param pid the process identifier
   * @throws PutFailedException if putting has failed (because of network errors or the profile is invalid).
   *             An error is also thrown when the process is not allowed to put (because he did not register
   *             himself as intending to put)
   */
  public void readyToPut(UserProfile profile, String pid) throws PutFailedException {
    if (modifying != null && modifying.equals(pid)) {
      modifying.setUserProfile(profile);
      modifying.readyToPut();
      modifying.waitForPut();
    } else {
      throw new PutFailedException("Not allowed to put anymore");
    }
  }

  /**
   * Notifies that a process is done with a modification on the user profile.
   */
  private void stopModification(String pid) {
    // test whether is the current modifying process
    if (modifying != null && modifying.equals(pid)) {
      modifying.abort();
    }
  }

  /**
   * Get the default content protection keys. If called first time the method is called first time the user
   * profile gets loaded from network and the default protection key temporally gets stored for further
   * gets.
   *
   * @return the default content protection keys
   * @throws GetFailedException
   */
  public KeyPair getDefaultProtectionKey() throws GetFailedException {
    if (defaultProtectionKey == null) {
      UserProfile userProfile = getUserProfile(UUID.randomUUID().toString(), false);
      defaultProtectionKey = userProfile.getProtectionKeys();
    }
    return defaultProtectionKey;
  }

  private class QueueWorker implements Runnable {

    @Override
    public void run() {
      while (running.get()) { // run forever
        // modifying processes have advantage here because the read-only processes can profit
        if (modifyQueue.isEmpty() && readOnlyQueue.isEmpty()) {
          synchronized (queueWaiter) {
            try {
              queueWaiter.wait();
            } catch (InterruptedException e) {
              // ignore
            }
          }
        } else if (modifyQueue.isEmpty()) {
          logger.trace("{} process(es) are waiting for read-only access.", readOnlyQueue.size());
          // a process wants to read
          QueueEntry entry = readOnlyQueue.peek();

          profileHolder.get(entry);

          logger.trace("Notifying {} processes that newest profile is ready.", readOnlyQueue.size());
          // notify all read only processes
          while (!readOnlyQueue.isEmpty()) {
            // copy user profile and errors to other entries
            QueueEntry readOnly = readOnlyQueue.poll();
            readOnly.setUserProfile(entry.getUserProfile());
            readOnly.setGetError(entry.getGetError());
            readOnly.notifyGet();
          }
        } else {
          // a process wants to modify
          modifying = modifyQueue.poll();
          logger.trace("Process {} is waiting to make profile modifications.", modifying.getPid());
          profileHolder.get(modifying);
          logger.trace("Notifying {} processes (inclusive process {}) to get newest profile.",
              readOnlyQueue.size() + 1, modifying.getPid());

          modifying.notifyGet();
          // notify all read only processes
          while (!readOnlyQueue.isEmpty()) {
            // copy user profile and errors to other entries
            QueueEntry readOnly = readOnlyQueue.poll();
            readOnly.setUserProfile(modifying.getUserProfile());
            readOnly.setGetError(modifying.getGetError());
            readOnly.notifyGet();
          }

          int counter = 0;
          long sleepTime = MAX_MODIFICATION_TIME / 10;
          while (counter < 10 && !modifying.isReadyToPut() && !modifying.isAborted()) {
            try {
              Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
              // ignore
            } finally {
              counter++;
            }
          }

          if (modifying.isReadyToPut()) {
            // is ready to put
            logger.trace("Process {} made modifcations and uploads them now.", modifying.getPid());
            profileHolder.put(modifying);
          } else if (!modifying.isAborted()) {
            // request is not ready to put and has not been aborted
            logger.warn("Process {} never finished doing modifications. Abort the put request.",
                modifying.getPid());
            modifying.abort();
            modifying.setPutError(new PutFailedException("Too long modification. Only " + MAX_MODIFICATION_TIME
                + "ms are allowed."));
            modifying.notifyPut();
          }
        }
      }

      logger.debug("Queue worker stopped. user id = '{}'", credentials.getUserId());
    }

  }
}
TOP

Related Classes of org.hive2hive.core.network.data.UserProfileManager$QueueWorker

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.