package org.apache.hedwig.server.topics;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import org.apache.hedwig.protocol.PubSubProtocol.HubInfoData;
import org.apache.hedwig.util.HedwigSocketAddress;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.TextFormat;
/**
* Info identifies a hub server.
*/
public class HubInfo {
public static class InvalidHubInfoException extends Exception {
public InvalidHubInfoException(String msg) {
super(msg);
}
public InvalidHubInfoException(String msg, Throwable t) {
super(msg, t);
}
}
// address identify a hub server
final HedwigSocketAddress addr;
// its znode czxid
final long czxid;
// protobuf encoded hub info data to be serialized
HubInfoData hubInfoData;
public HubInfo(HedwigSocketAddress addr, long czxid) {
this(addr, czxid, null);
}
protected HubInfo(HedwigSocketAddress addr, long czxid,
HubInfoData data) {
this.addr = addr;
this.czxid = czxid;
this.hubInfoData = data;
}
public HedwigSocketAddress getAddress() {
return addr;
}
public long getZxid() {
return czxid;
}
private synchronized HubInfoData getHubInfoData() {
if (null == hubInfoData) {
hubInfoData = HubInfoData.newBuilder().setHostname(addr.toString())
.setCzxid(czxid).build();
}
return hubInfoData;
}
@Override
public String toString() {
return TextFormat.printToString(getHubInfoData());
}
@Override
public boolean equals(Object o) {
if (null == o) {
return false;
}
if (!(o instanceof HubInfo)) {
return false;
}
HubInfo other = (HubInfo)o;
if (null == addr) {
if (null == other.addr) {
return true;
} else {
return czxid == other.czxid;
}
} else {
if (addr.equals(other.addr)) {
return czxid == other.czxid;
} else {
return false;
}
}
}
@Override
public int hashCode() {
return addr.hashCode();
}
/**
* Parse hub info from a string.
*
* @param hubInfoStr
* String representation of hub info
* @return hub info
* @throws InvalidHubInfoException when <code>hubInfoStr</code> is not a valid
* string representation of hub info.
*/
public static HubInfo parse(String hubInfoStr) throws InvalidHubInfoException {
// it is not protobuf encoded hub info, it might be generated by ZkTopicManager
if (!hubInfoStr.startsWith("hostname")) {
final HedwigSocketAddress owner;
try {
owner = new HedwigSocketAddress(hubInfoStr);
} catch (Exception e) {
throw new InvalidHubInfoException("Corrupted hub server address : " + hubInfoStr, e);
}
return new HubInfo(owner, 0L);
}
// it is a protobuf encoded hub info.
HubInfoData hubInfoData;
try {
BufferedReader reader = new BufferedReader(
new StringReader(hubInfoStr));
HubInfoData.Builder dataBuilder = HubInfoData.newBuilder();
TextFormat.merge(reader, dataBuilder);
hubInfoData = dataBuilder.build();
} catch (InvalidProtocolBufferException ipbe) {
throw new InvalidHubInfoException("Corrupted hub info : " + hubInfoStr, ipbe);
} catch (IOException ie) {
throw new InvalidHubInfoException("Corrupted hub info : " + hubInfoStr, ie);
}
final HedwigSocketAddress owner;
try {
owner = new HedwigSocketAddress(hubInfoData.getHostname().trim());
} catch (Exception e) {
throw new InvalidHubInfoException("Corrupted hub server address : " + hubInfoData.getHostname(), e);
}
long ownerZxid = hubInfoData.getCzxid();
return new HubInfo(owner, ownerZxid, hubInfoData);
}
}