Package com.orientechnologies.orient.enterprise.channel.binary

Source Code of com.orientechnologies.orient.enterprise.channel.binary.OChannelBinary

/*
  *
  *  *  Copyright 2014 Orient Technologies LTD (info(at)orientechnologies.com)
  *  *
  *  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  *  you may not use this file except in compliance with the License.
  *  *  You may obtain a copy of the License at
  *  *
  *  *       http://www.apache.org/licenses/LICENSE-2.0
  *  *
  *  *  Unless required by applicable law or agreed to in writing, software
  *  *  distributed under the License is distributed on an "AS IS" BASIS,
  *  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  *  *  See the License for the specific language governing permissions and
  *  *  limitations under the License.
  *  *
  *  * For more information: http://www.orientechnologies.com
  *
  */
package com.orientechnologies.orient.enterprise.channel.binary;

import com.orientechnologies.common.io.OIOException;
import com.orientechnologies.common.log.OLogManager;
import com.orientechnologies.orient.core.config.OContextConfiguration;
import com.orientechnologies.orient.core.config.OGlobalConfiguration;
import com.orientechnologies.orient.core.id.OClusterPosition;
import com.orientechnologies.orient.core.id.OClusterPositionFactory;
import com.orientechnologies.orient.core.id.ORID;
import com.orientechnologies.orient.core.id.ORecordId;
import com.orientechnologies.orient.core.serialization.OBinaryProtocol;
import com.orientechnologies.orient.core.version.ODistributedVersion;
import com.orientechnologies.orient.core.version.ORecordVersion;
import com.orientechnologies.orient.core.version.OVersionFactory;
import com.orientechnologies.orient.enterprise.channel.OChannel;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public abstract class OChannelBinary extends OChannel {
  private static final int MAX_LENGTH_DEBUG = 150;
  protected final boolean  debug;
  private final int        maxChunkSize;
  private final byte[]     buffer;
  public DataInputStream   in;
  public DataOutputStream  out;

  public OChannelBinary(final Socket iSocket, final OContextConfiguration iConfig) throws IOException {
    super(iSocket, iConfig);

    maxChunkSize = iConfig.getValueAsInteger(OGlobalConfiguration.NETWORK_BINARY_MAX_CONTENT_LENGTH);
    debug = iConfig.getValueAsBoolean(OGlobalConfiguration.NETWORK_BINARY_DEBUG);
    buffer = new byte[maxChunkSize];

    if (debug)
      OLogManager.instance().info(this, "%s - Connected", socket.getRemoteSocketAddress());
  }

  public byte readByte() throws IOException {
    updateMetricReceivedBytes(OBinaryProtocol.SIZE_BYTE);

    if (debug) {
      OLogManager.instance().info(this, "%s - Reading byte (1 byte)...", socket.getRemoteSocketAddress());
      final byte value = in.readByte();
      OLogManager.instance().info(this, "%s - Read byte: %d", socket.getRemoteSocketAddress(), (int) value);
      return value;
    }

    return in.readByte();
  }

  public boolean readBoolean() throws IOException {
    updateMetricReceivedBytes(OBinaryProtocol.SIZE_BYTE);

    if (debug) {
      OLogManager.instance().info(this, "%s - Reading boolean (1 byte)...", socket.getRemoteSocketAddress());
      final boolean value = in.readBoolean();
      OLogManager.instance().info(this, "%s - Read boolean: %b", socket.getRemoteSocketAddress(), value);
      return value;
    }

    return in.readBoolean();
  }

  public int readInt() throws IOException {
    updateMetricReceivedBytes(OBinaryProtocol.SIZE_INT);

    if (debug) {
      OLogManager.instance().info(this, "%s - Reading int (4 bytes)...", socket.getRemoteSocketAddress());
      final int value = in.readInt();
      OLogManager.instance().info(this, "%s - Read int: %d", socket.getRemoteSocketAddress(), value);
      return value;
    }

    return in.readInt();
  }

  public long readLong() throws IOException {
    updateMetricReceivedBytes(OBinaryProtocol.SIZE_LONG);

    if (debug) {
      OLogManager.instance().info(this, "%s - Reading long (8 bytes)...", socket.getRemoteSocketAddress());
      final long value = in.readLong();
      OLogManager.instance().info(this, "%s - Read long: %d", socket.getRemoteSocketAddress(), value);
      return value;
    }

    return in.readLong();
  }

  public short readShort() throws IOException {
    updateMetricReceivedBytes(OBinaryProtocol.SIZE_SHORT);

    if (debug) {
      OLogManager.instance().info(this, "%s - Reading short (2 bytes)...", socket.getRemoteSocketAddress());
      final short value = in.readShort();
      OLogManager.instance().info(this, "%s - Read short: %d", socket.getRemoteSocketAddress(), value);
      return value;
    }

    return in.readShort();
  }

  public String readString() throws IOException {
    if (debug) {
      OLogManager.instance().info(this, "%s - Reading string (4+N bytes)...", socket.getRemoteSocketAddress());
      final int len = in.readInt();
      if (len < 0)
        return null;

      // REUSE STATIC BUFFER?
      final byte[] tmp = new byte[len];
      in.readFully(tmp);

      updateMetricReceivedBytes(OBinaryProtocol.SIZE_INT + len);

      final String value = new String(tmp);
      OLogManager.instance().info(this, "%s - Read string: %s", socket.getRemoteSocketAddress(), value);
      return value;
    }

    final int len = in.readInt();
    if (len < 0)
      return null;

    final byte[] tmp = new byte[len];
    in.readFully(tmp);

    updateMetricReceivedBytes(OBinaryProtocol.SIZE_INT + len);

    return new String(tmp);
  }

  public byte[] readBytes() throws IOException {
    if (debug)
      OLogManager.instance().info(this, "%s - Reading chunk of bytes. Reading chunk length as int (4 bytes)...",
          socket.getRemoteSocketAddress());

    final int len = in.readInt();
    updateMetricReceivedBytes(OBinaryProtocol.SIZE_INT + len);

    if (debug)
      OLogManager.instance().info(this, "%s - Read chunk lenght: %d", socket.getRemoteSocketAddress(), len);

    if (len < 0)
      return null;

    if (debug)
      OLogManager.instance().info(this, "%s - Reading %d bytes...", socket.getRemoteSocketAddress(), len);

    // REUSE STATIC BUFFER?
    final byte[] tmp = new byte[len];
    in.readFully(tmp);

    if (debug)
      OLogManager.instance().info(this, "%s - Read %d bytes: %s", socket.getRemoteSocketAddress(), len, new String(tmp));

    return tmp;
  }

  public List<String> readStringList() throws IOException {
    if (debug)
      OLogManager.instance().info(this, "%s - Reading string list. Reading string list items as int (4 bytes)...",
          socket.getRemoteSocketAddress());

    final int items = in.readInt();
    updateMetricReceivedBytes(OBinaryProtocol.SIZE_INT);

    if (debug)
      OLogManager.instance().info(this, "%s - Read string list items: %d", socket.getRemoteSocketAddress(), items);

    if (items < 0)
      return null;

    List<String> result = new ArrayList<String>();
    for (int i = 0; i < items; ++i)
      result.add(readString());

    if (debug)
      OLogManager.instance().info(this, "%s - Read string list with %d items: %d", socket.getRemoteSocketAddress(), items);

    return result;
  }

  public Set<String> readStringSet() throws IOException {
    if (debug)
      OLogManager.instance().info(this, "%s - Reading string set. Reading string set items as int (4 bytes)...",
          socket.getRemoteSocketAddress());

    int items = in.readInt();
    updateMetricReceivedBytes(OBinaryProtocol.SIZE_INT);

    if (debug)
      OLogManager.instance().info(this, "%s - Read string set items: %d", socket.getRemoteSocketAddress(), items);

    if (items < 0)
      return null;

    Set<String> result = new HashSet<String>();
    for (int i = 0; i < items; ++i)
      result.add(readString());

    if (debug)
      OLogManager.instance().info(this, "%s - Read string set with %d items: %d", socket.getRemoteSocketAddress(), items, result);

    return result;
  }

  public ORecordId readRID() throws IOException {
    final int clusterId = readShort();

    final OClusterPosition clusterPosition = readClusterPosition();
    return new ORecordId(clusterId, clusterPosition);
  }

  public OClusterPosition readClusterPosition() throws IOException {
    final int serializedSize = OClusterPositionFactory.INSTANCE.getSerializedSize();

    if (debug)
      OLogManager.instance().info(this, "%s - Reading cluster position (%d bytes)....", socket.getRemoteSocketAddress(),
          serializedSize);

    final OClusterPosition clusterPosition = OClusterPositionFactory.INSTANCE.fromStream((InputStream) in);

    updateMetricReceivedBytes(serializedSize);

    if (debug)
      OLogManager.instance().info(this, "%s - Read cluster position: %s", socket.getRemoteSocketAddress(), clusterPosition);

    return clusterPosition;
  }

  public OChannelBinary writeClusterPosition(final OClusterPosition clusterPosition) throws IOException {
    final int serializedSize = OClusterPositionFactory.INSTANCE.getSerializedSize();

    if (debug)
      OLogManager.instance().info(this, "%s - Writing cluster position (%d bytes) : %s....", socket.getRemoteSocketAddress(),
          serializedSize, clusterPosition);

    out.write(clusterPosition.toStream());

    updateMetricTransmittedBytes(serializedSize);

    return this;
  }

  public ORecordVersion readVersion() throws IOException {
    if (OVersionFactory.instance().isDistributed()) {
      final int recordVersion = readInt();
      final long timestamp = readLong();
      final long macAddress = readLong();
      return OVersionFactory.instance().createDistributedVersion(recordVersion, timestamp, macAddress);
    } else {
      final ORecordVersion version = OVersionFactory.instance().createVersion();
      version.setCounter(readInt());
      return version;
    }
  }

  public OChannelBinary writeByte(final byte iContent) throws IOException {
    if (debug)
      OLogManager.instance().info(this, "%s - Writing byte (1 byte): %d", socket.getRemoteSocketAddress(), iContent);

    out.write(iContent);
    updateMetricTransmittedBytes(OBinaryProtocol.SIZE_BYTE);
    return this;
  }

  public OChannelBinary writeBoolean(final boolean iContent) throws IOException {
    if (debug)
      OLogManager.instance().info(this, "%s - Writing boolean (1 byte): %b", socket.getRemoteSocketAddress(), iContent);

    out.writeBoolean(iContent);
    updateMetricTransmittedBytes(OBinaryProtocol.SIZE_BYTE);
    return this;
  }

  public OChannelBinary writeInt(final int iContent) throws IOException {
    if (debug)
      OLogManager.instance().info(this, "%s - Writing int (4 bytes): %d", socket.getRemoteSocketAddress(), iContent);

    out.writeInt(iContent);
    updateMetricTransmittedBytes(OBinaryProtocol.SIZE_INT);
    return this;
  }

  public OChannelBinary writeLong(final long iContent) throws IOException {
    if (debug)
      OLogManager.instance().info(this, "%s - Writing long (8 bytes): %d", socket.getRemoteSocketAddress(), iContent);

    out.writeLong(iContent);
    updateMetricTransmittedBytes(OBinaryProtocol.SIZE_LONG);
    return this;
  }

  public OChannelBinary writeShort(final short iContent) throws IOException {
    if (debug)
      OLogManager.instance().info(this, "%s - Writing short (2 bytes): %d", socket.getRemoteSocketAddress(), iContent);

    out.writeShort(iContent);
    updateMetricTransmittedBytes(OBinaryProtocol.SIZE_SHORT);
    return this;
  }

  public OChannelBinary writeString(final String iContent) throws IOException {
    if (debug)
      OLogManager.instance().info(this, "%s - Writing string (4+%d=%d bytes): %s", socket.getRemoteSocketAddress(),
          iContent != null ? iContent.length() : 0, iContent != null ? iContent.length() + 4 : 4, iContent);

    if (iContent == null) {
      out.writeInt(-1);
      updateMetricTransmittedBytes(OBinaryProtocol.SIZE_INT);
    } else {
      final byte[] buffer = iContent.getBytes();
      out.writeInt(buffer.length);
      out.write(buffer, 0, buffer.length);
      updateMetricTransmittedBytes(OBinaryProtocol.SIZE_INT + buffer.length);
    }

    return this;
  }

  public OChannelBinary writeBytes(final byte[] iContent) throws IOException {
    return writeBytes(iContent, iContent != null ? iContent.length : 0);
  }

  public OChannelBinary writeBytes(final byte[] iContent, final int iLength) throws IOException {
    if (debug)
      OLogManager.instance().info(this, "%s - Writing bytes (4+%d=%d bytes): %s", socket.getRemoteSocketAddress(), iLength,
          iLength + 4, Arrays.toString(iContent));

    if (iContent == null) {
      out.writeInt(-1);
      updateMetricTransmittedBytes(OBinaryProtocol.SIZE_INT);
    } else {
      out.writeInt(iLength);
      out.write(iContent, 0, iLength);
      updateMetricTransmittedBytes(OBinaryProtocol.SIZE_INT + iLength);
    }
    return this;
  }

  public OChannelBinary writeCollectionString(final Collection<String> iCollection) throws IOException {
    if (debug)
      OLogManager.instance().info(this, "%s - Writing strings (4+%d=%d items): %s", socket.getRemoteSocketAddress(),
          iCollection != null ? iCollection.size() : 0, iCollection != null ? iCollection.size() + 4 : 4, iCollection!=null? iCollection.toString():"null");

    updateMetricTransmittedBytes(OBinaryProtocol.SIZE_INT);
    if (iCollection == null)
      writeInt(-1);
    else {
      writeInt(iCollection.size());

      for (String s : iCollection)
        writeString(s);
    }

    return this;
  }

  public void writeRID(final ORID iRID) throws IOException {
    writeShort((short) iRID.getClusterId());
    writeClusterPosition(iRID.getClusterPosition());
  }

  public void writeVersion(final ORecordVersion version) throws IOException {
    if (version instanceof ODistributedVersion) {
      final ODistributedVersion v = (ODistributedVersion) version;
      writeInt(v.getCounter());
      writeLong(v.getTimestamp());
      writeLong(v.getMacAddress());
    } else {
      // Usual serialization
      writeInt(version.getCounter());
    }
  }

  public void clearInput() throws IOException {
    if (in == null)
      return;

    final StringBuilder dirtyBuffer = new StringBuilder(MAX_LENGTH_DEBUG);
    int i = 0;
    while (in.available() > 0) {
      char c = (char) in.read();
      ++i;

      if (dirtyBuffer.length() < MAX_LENGTH_DEBUG)
        dirtyBuffer.append(c);
    }
    updateMetricReceivedBytes(i);

    OLogManager.instance().error(
        this,
        "Received unread response from " + socket.getRemoteSocketAddress()
            + " probably corrupted data from the network connection. Cleared dirty data in the buffer (" + i + " bytes): ["
            + dirtyBuffer + (i > dirtyBuffer.length() ? "..." : "") + "]", OIOException.class);
  }

  @Override
  public void flush() throws IOException {
    if (debug)
      OLogManager.instance().info(this, "%s - Flush", socket.getRemoteSocketAddress());

    updateMetricFlushes();

    super.flush();
    if (out != null)
      out.flush();
  }

  @Override
  public void close() {
    if (debug)
      OLogManager.instance().info(this, "%s - Closing socket...", socket.getRemoteSocketAddress());

    try {
      if (in != null) {
        in.close();
        // in = null;
      }
    } catch (IOException e) {
    }

    try {
      if (out != null) {
        out.close();
        // out = null;
      }
    } catch (IOException e) {
    }

    super.close();
  }

  public byte[] getBuffer() {
    return buffer;
  }

  public int getMaxChunkSize() {
    return maxChunkSize;
  }
}
TOP

Related Classes of com.orientechnologies.orient.enterprise.channel.binary.OChannelBinary

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact coftware#gmail.com.