Package org.objectweb.joram.mom.proxies

Source Code of org.objectweb.joram.mom.proxies.ClientContext$MultiReplyContext

/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2003 - 200 ScalAgent Distributed Technologies
* Copyright (C) 2004 France Telecom R&D
* Copyright (C) 2003 - 2004 Bull SA
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
* USA.
*
* Initial developer(s): Frederic Maistre (Bull)
* Contributor(s): ScalAgent Distributed Technologies
*/
package org.objectweb.joram.mom.proxies;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Vector;

import org.objectweb.joram.shared.client.AbstractJmsReply;
import org.objectweb.joram.shared.client.XACnxPrepare;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;

import fr.dyade.aaa.agent.AgentId;
import fr.dyade.aaa.common.Debug;

/**
* The <code>ClientContext</code> class holds the data related to a client
* context.
*/
class ClientContext implements java.io.Serializable {
  /** define serialVersionUID for interoperability */
  private static final long serialVersionUID = 1L;
 
  /** logger */
  public static Logger logger = Debug.getLogger(ClientContext.class.getName());

  /** The proxy's agent identifier. */
  private AgentId proxyId;

  /** Context identifier. */
  private int id;
  /** Vector of temporary destinations. */
  private Vector tempDestinations;
  /** Identifiers of queues delivering messages. */
  private Hashtable deliveringQueues;
  /** Prepared transactions objects waiting for commit. */
  private Hashtable transactionsTable;

  /** <code>true</code> if the context is activated. */
  private transient boolean started;
  /**
   * Identifier of a cancelled "receive" request, set when a PTP listener has
   * been unset.
   */
  private transient int cancelledRequestId;
  /** Vector of active subscriptions' names. */
  private transient Vector activeSubs;
  /** Pending replies waiting for the context to be activated. */
  private transient Vector repliesBuffer;
  /** Contexts waiting for the replies from some local agents*/
  private transient Hashtable commitTable;
 
  private transient ProxyAgentItf proxy;

  /**
   * Constructs a <code>ClientContext</code> instance.
   *
   * @param proxyId  The proxy's agent identifier.
   * @param id  Identifier of the context.
   */
  ClientContext(AgentId proxyId, int id) {
    this.proxyId = proxyId;
    this.id = id;

    tempDestinations = new Vector();
    deliveringQueues = new Hashtable();

    started = false;
    cancelledRequestId = -1;
    activeSubs = new Vector();
    repliesBuffer = new Vector();
  }
 
  void setProxyAgent(ProxyAgentItf px) {
    proxy = px;
  }
  /** Returns the identifier of the context. */
  int getId() {
    return id;
  }

  /** Sets the activation status of the context. */
  void setActivated(boolean started) {
    if (logger.isLoggable(BasicLevel.DEBUG))
      logger.log(BasicLevel.DEBUG,
                 "ClientContext[" + proxyId + ',' + id + "].setActivated(" + started + ')');
    this.started = started;
  }

  /** Returns <code>true</code> if the context is activated. */
  boolean getActivated()
  {
    return started;
  }

  /** Adds a temporary destination identifier. */
  void addTemporaryDestination(AgentId destId) {
    tempDestinations.add(destId);
    proxy.setSave();
  }
  
  Iterator getTempDestinations() {
    //Creates an enumeration not backed by tempDestinations
    Vector tempDests = new Vector();
    for (Iterator dests = tempDestinations.iterator(); dests.hasNext();) {
      tempDests.addElement(dests.next());
    }
    return tempDests.iterator();
  }

  /** Removes a temporary destination identifier. */
  void removeTemporaryDestination(AgentId destId) {
    deliveringQueues.remove(destId);
    tempDestinations.remove(destId);
    proxy.setSave();
  }

  /** Adds a pending delivery. */
  void addPendingDelivery(AbstractJmsReply reply) {
    if (logger.isLoggable(BasicLevel.DEBUG))
      logger.log(BasicLevel.DEBUG,
                 "ClientContext[" + proxyId + ',' + id + "].addPendingDelivery(" + reply + ')');
    repliesBuffer.add(reply);
  }

  /** Returns the pending deliveries. */
  Iterator getPendingDeliveries() {
    return repliesBuffer.iterator();
  }

  /** Clears the pending deliveries buffer. */
  void clearPendingDeliveries() {
    repliesBuffer.clear();
  }

  /** Adds an active subscription name. */
  void addSubName(String subName) {
    activeSubs.add(subName);
  }

  /** Returns the active subscriptions' names. */
  Iterator getActiveSubs() {
    return activeSubs.iterator();
  }

  /** Removes an active subscription name. */
  void removeSubName(String subName) {
    activeSubs.remove(subName);
  }
 
  /** Cancels a "receive" request. */
  void cancelReceive(int cancelledRequestId) {
    if (logger.isLoggable(BasicLevel.DEBUG))
      logger.log(BasicLevel.DEBUG,
                 "ClientContext[" + proxyId + ':' + id + "].cancelReceive(" + cancelledRequestId + ')');
    this.cancelledRequestId = cancelledRequestId;
  }

  /** Returns the cancelled "receive" request identifier. */
  int getCancelledReceive() {
    return cancelledRequestId;
  }

  /** Adds the identifier of a delivering queue. */
  void addDeliveringQueue(AgentId queueId) {
    deliveringQueues.put(queueId, queueId);
    proxy.setSave();
  }

  /** Returns the identifiers of the delivering queues. */
  Iterator getDeliveringQueues() {
    return deliveringQueues.keySet().iterator();
  }
 
  /**
   * Some requests may require to wait for several
   * SendReplyNot notifications before replying to the client.
   *
   * @param requestId
   * @param asyncReplyCount
   */
  void addMultiReplyContext(int requestId, int asyncReplyCount) {
    if (logger.isLoggable(BasicLevel.DEBUG))
      logger.log(BasicLevel.DEBUG,
                 "ClientContext[" + proxyId + ':' + id + "].addMultiReplyContext(" + requestId + ',' + asyncReplyCount + ')');
    if (commitTable == null) commitTable = new Hashtable();
    commitTable.put(new Integer(requestId), new MultiReplyContext(asyncReplyCount));
    proxy.setSave();
  }
 
  /**
   * Called by UserAgent when a SendReplyNot
   * arrived.
   *
   * @param requestId
   * @return
   * > 0 if there are still some pending replies
   * 0 if all the replies arrived (the context is removed)
   * or if the context doesn't exist
   */
  int setReply(int requestId) {
    if (logger.isLoggable(BasicLevel.DEBUG))
      logger.log(BasicLevel.DEBUG,
                 "ClientContext[" + proxyId + ':' + id + "].setReply(" + requestId + ')');
    if (commitTable == null) return 0;
    Integer ctxKey = new Integer(requestId);
    MultiReplyContext ctx = (MultiReplyContext)commitTable.get(ctxKey);
    if (ctx == null) return 0;

    ctx.counter--;
    if (ctx.counter == 0) {
      commitTable.remove(ctxKey);
      proxy.setSave();
    }
    return ctx.counter;
  }
 
  static class MultiReplyContext {
    public int counter;
   
    MultiReplyContext(int c) {
      counter = c;
    }
  }

  /** Registers a given transaction "prepare". */
  void registerTxPrepare(Object key, XACnxPrepare prepare) throws Exception {
    if (transactionsTable == null)
      transactionsTable = new Hashtable();

    if (! transactionsTable.containsKey(key)) {
      transactionsTable.put(key, prepare);
      proxy.setSave();
    } else
      throw new Exception("Prepare request already received by "
                          + "TM for this transaction.");
  }

  /** Returns and deletes a given transaction "prepare". */
  XACnxPrepare getTxPrepare(Object key) {
    XACnxPrepare prepare = null;
    if (transactionsTable != null) {
      prepare = (XACnxPrepare) transactionsTable.remove(key);
      proxy.setSave();
    }
    return prepare;
  }

  /**
   *
   * @param key XID
   * @return true if prepared transation.
   */
  public boolean isPrepared(Object key) {
    return transactionsTable.containsKey(key);
  }
 
  /** Returns the identifiers of the prepared transactions. */
  Iterator getTxIds() {
    if (transactionsTable == null)
      return new Hashtable().keySet().iterator();
    return transactionsTable.keySet().iterator();
  }

  public void readBag(ObjectInputStream in)
    throws IOException, ClassNotFoundException {
    started = in.readBoolean();
    cancelledRequestId = in.readInt();
    activeSubs = (Vector)in.readObject();
    repliesBuffer = (Vector)in.readObject();
  }

  public void writeBag(ObjectOutputStream out) throws IOException {
    out.writeBoolean(started);
    out.writeInt(cancelledRequestId);
    out.writeObject(activeSubs);
    out.writeObject(repliesBuffer);
  }

  public String toString() {
    StringBuffer buff = new StringBuffer();
    buff.append("ClientContext (proxyId=");
    buff.append(proxyId);
    buff.append(",id=");
    buff.append(id);
    buff.append(",tempDestinations=");
    buff.append(tempDestinations);
    buff.append(",deliveringQueues=");
    buff.append(deliveringQueues);
    buff.append(",transactionsTable=");
    buff.append(transactionsTable);
    buff.append(",started=");
    buff.append(started);
    buff.append(",cancelledRequestId=");
    buff.append(cancelledRequestId);
    buff.append(",activeSubs=");
    buff.append(activeSubs);
    buff.append(",repliesBuffer=");
    buff.append(repliesBuffer);
    buff.append(')');
    return buff.toString();
  }
}
TOP

Related Classes of org.objectweb.joram.mom.proxies.ClientContext$MultiReplyContext

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and owned by ORACLE Inc. Contact coftware#gmail.com.