package org.wso2.carbon.cep.core.internal.registry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.cep.core.*;
import org.wso2.carbon.cep.core.exception.CEPConfigurationException;
import org.wso2.carbon.cep.core.internal.ds.CEPServiceValueHolder;
import org.wso2.carbon.cep.core.internal.util.CEPConstants;
import org.wso2.carbon.registry.core.Collection;
import org.wso2.carbon.registry.core.Registry;
import org.wso2.carbon.registry.core.Resource;
import org.wso2.carbon.registry.core.exceptions.RegistryException;
import java.util.*;
/**
* this class is used to invoke the registry
* for CEP Operations
*/
public class CEPRegistryInvoker {
private static final Log log = LogFactory.getLog(CEPRegistryInvoker.class);
/**
* This method is used to load the buckets from registry
*
* @return Array of buckets
*/
public static Bucket[] loadBucketsFromRegistry(int tenantId) throws CEPConfigurationException {
Bucket[] buckets = null;
Registry registry = null;
try {
registry = CEPServiceValueHolder.getInstance().getRegistry(tenantId);
} catch (RegistryException e) {
String errorMessage = "Error in getting registry specific to tenant :" + tenantId;
log.error(errorMessage, e);
throw new CEPConfigurationException(errorMessage, e);
}
String parentCollectionPath = CEPConstants.CEP_CONF_ELE_CEP_BUCKETS + CEPConstants.CEP_REGISTRY_BS;
try {
if (registry.resourceExists(parentCollectionPath)) {
if (registry.get(parentCollectionPath) instanceof Collection) {
Collection cepBucketsCollection = (Collection) registry.get(parentCollectionPath);
buckets = new Bucket[cepBucketsCollection.getChildCount()];
int bucketCount = 0;
for (String bucketName : cepBucketsCollection.getChildren()) {
Bucket bucket = new Bucket();
if (registry.get(bucketName) instanceof Collection) {
Collection bucketDetailsCollection = (Collection) registry.get(bucketName);
for (String attirbute : bucketDetailsCollection.getChildren()) {
if (registry.get(attirbute) instanceof Collection) {
Input input;
Query query;
Collection attributeCollection = (Collection) registry.get(attirbute);
for (String names : attributeCollection.getChildren()) {
if (registry.get(names) instanceof Collection) {
if ((CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_INPUTS)
.equals(attirbute.substring(attirbute.lastIndexOf(CEPConstants.CEP_REGISTRY_BS)))) {
input = new Input();
Collection collection3 = (Collection) registry.get(names);
for (String names2 : collection3.getChildren()) {
if (registry.get(names2) instanceof Collection) {
Collection mappingCollection = (Collection) registry.get(names2);
Mapping mapping = new Mapping();
for (String mappingChild : mappingCollection.getChildren()) {
if (registry.get(mappingChild) instanceof Collection) {
Collection mapCollection = (Collection) registry.get(mappingChild);
if ((CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_XPATH_DEFS)
.equals(mappingChild.substring(mappingChild.lastIndexOf(CEPConstants.CEP_REGISTRY_BS)))) {
for (String defs : mapCollection.getChildren()) {
Resource xpathDefResource = registry.get(defs);
Hashtable propertiesHashtable = xpathDefResource.getProperties();
Enumeration e = propertiesHashtable.keys();
while (e.hasMoreElements()) {
String key = (String) e.nextElement();
ArrayList values = (ArrayList) propertiesHashtable.get(key);
XpathDefinition xpathDefinition = new XpathDefinition();
xpathDefinition.setPrefix(key);
xpathDefinition.setNamespace(values.get(0).toString());
mapping.addXpathDefinition(xpathDefinition);
}
}
} else if (
(CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_PROPERTIES)
.equals(mappingChild.substring(mappingChild.lastIndexOf(CEPConstants.CEP_REGISTRY_BS)))) {
for (String defs : mapCollection.getChildren()) {
Resource xpathDefResource = registry.get(defs);
Hashtable propertiesHashtable = xpathDefResource.getProperties();
Enumeration e = propertiesHashtable.keys();
Property property = new Property();
while (e.hasMoreElements()) {
String key = (String) e.nextElement();
ArrayList values = (ArrayList) propertiesHashtable.get(key);
if (CEPConstants.CEP_REGISTRY_NAME.equals(key)) {
property.setName(values.get(0).toString());
} else if (CEPConstants.CEP_REGISTRY_TYPE.equals(key)) {
property.setType(values.get(0).toString());
} else if (CEPConstants.CEP_REGISTRY_XPATH.equals(key)) {
property.setXpath(values.get(0).toString());
}
}
mapping.addProperty(property);
}
}
} else {
Resource resource = registry.get(mappingChild);
Hashtable propertiesHashtable = resource.getProperties();
Enumeration e = propertiesHashtable.keys();
while (e.hasMoreElements()) {
String key = (String) e.nextElement();
ArrayList values = (ArrayList) propertiesHashtable.get(key);
if (CEPConstants.CEP_REGISTRY_STREAM.equals(key)) {
mapping.setStream(values.get(0).toString());
}
}
}
}
input.setMapping(mapping);
} else {
if ((CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_DETAILS)
.equals(names2.substring(names2.lastIndexOf(CEPConstants.CEP_REGISTRY_BS)))) {
Resource resource = registry.get(names2);
Hashtable propertiesHashtable = resource.getProperties();
Enumeration e = propertiesHashtable.keys();
while (e.hasMoreElements()) {
String key = (String) e.nextElement();
ArrayList values = (ArrayList) propertiesHashtable.get(key);
if (CEPConstants.CEP_REGISTRY_TOPIC.equals(key)) {
input.setTopic(values.get(0).toString());
} else if (CEPConstants.CEP_CONF_ELE_BROKER_NAME.equals(key)) {
input.setBrokerName(values.get(0).toString());
}
}
}
}
}
bucket.addInput(input);
} else if ((CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_QUERIES)
.equals(attirbute.substring(attirbute.lastIndexOf(CEPConstants.CEP_REGISTRY_BS)))) {
query = new Query();
Expression expression = new Expression();
Output output = new Output();
Collection collection3 = (Collection) registry.get(names);
for (String names2 : collection3.getChildren()) {
if (registry.get(names2) instanceof Collection) {
Collection outputCollection = (Collection) registry.get(names2);
for (String outputS : outputCollection.getChildren()) {
if (registry.get(outputS) instanceof Collection) {
Collection outputMapping = (Collection) registry.get(outputS);
if ((CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_ELEMENT_MAPPING)
.equals(outputS.substring(outputS.lastIndexOf(CEPConstants.CEP_REGISTRY_BS)))) {
ElementMapping elementMapping = new ElementMapping();
for (String mappingName : outputMapping.getChildren()) {
if (registry.get(mappingName) instanceof Collection) {
Collection propertyCollection = (Collection) registry.get(mappingName);
for (String propertyName : propertyCollection.getChildren()) {
Resource propertyResource = registry.get(propertyName);
Hashtable propertiesHashtable = propertyResource.getProperties();
Enumeration e = propertiesHashtable.keys();
Property property = new Property();
while (e.hasMoreElements()) {
String key = (String) e.nextElement();
ArrayList values = (ArrayList) propertiesHashtable.get(key);
if (CEPConstants.CEP_REGISTRY_NAME.equals(key)) {
property.setName(values.get(0).toString());
} else if (CEPConstants.CEP_REGISTRY_XML_FIELD_NAME.equals(key)) {
property.setXmlFieldName(values.get(0).toString());
} else if (CEPConstants.CEP_REGISTRY_XML_FIELD_TYPE.equals(key)) {
property.setXmlFieldType(values.get(0).toString());
}
}
elementMapping.addProperty(property);
}
} else {
Resource outputdetailsResource = registry.get(mappingName);
Hashtable propertiesHashtable = outputdetailsResource.getProperties();
Enumeration e = propertiesHashtable.keys();
while (e.hasMoreElements()) {
String key = (String) e.nextElement();
ArrayList values = (ArrayList) propertiesHashtable.get(key);
if (CEPConstants.CEP_REGISTRY_DOC_ELEMENT.equals(key)) {
elementMapping.setDocumentElement(values.get(0).toString());
} else if (CEPConstants.CEP_REGISTRY_NS.equals(key)) {
elementMapping.setNamespace(values.get(0).toString());
}
}
}
output.setElementMapping(elementMapping);
}
} else {
XMLMapping xmlMapping = null;
for (String mappingName : outputMapping.getChildren()) {
xmlMapping = new XMLMapping();
if (registry.get(mappingName) instanceof Collection) {
} else {
Resource outputdetailsResource = registry.get(mappingName);
Hashtable propertiesHashtable = outputdetailsResource.getProperties();
Enumeration e = propertiesHashtable.keys();
while (e.hasMoreElements()) {
String key = (String) e.nextElement();
ArrayList values = (ArrayList) propertiesHashtable.get(key);
if (CEPConstants.CEP_REGISTRY_TEXT.equals(key)) {
xmlMapping.setMappingXMLText(values.get(0).toString());
}
}
}
}
output.setXmlMapping(xmlMapping);
}
} else {
Resource outputdetailsResource = registry.get(outputS);
Hashtable propertiesHashtable = outputdetailsResource.getProperties();
Enumeration e = propertiesHashtable.keys();
while (e.hasMoreElements()) {
String key = (String) e.nextElement();
ArrayList values = (ArrayList) propertiesHashtable.get(key);
if (CEPConstants.CEP_REGISTRY_TOPIC.equals(key)) {
output.setTopic(values.get(0).toString());
} else if (CEPConstants.CEP_CONF_ELE_BROKER_NAME.equals(key)) {
output.setBrokerName(values.get(0).toString());
}
}
}
}
} else {
Resource detailsResource = registry.get(names2);
Hashtable propertiesHashtable = detailsResource.getProperties();
Enumeration e = propertiesHashtable.keys();
while (e.hasMoreElements()) {
String key = (String) e.nextElement();
ArrayList values = (ArrayList) propertiesHashtable.get(key);
if (CEPConstants.CEP_REGISTRY_NAME.equals(key)) {
query.setName(values.get(0).toString());
} else if (CEPConstants.CEP_REGISTRY_TYPE.equals(key)) {
expression.setType(values.get(0).toString());
} /*else if (CEPConstants.CEP_REGISTRY_EXPRESSION.equals(key)) {
expression.setText(values.get(0).toString());
}*/
}
String content = new String((byte[]) detailsResource.getContent());
expression.setText(content);
}
}
query.setExpression(expression);
query.setOutput(output);
bucket.addQuery(query);
}
}
}
} else {
Resource propertyResource = registry.get(attirbute);
Hashtable propeHashtable = propertyResource.getProperties();
Enumeration e = propeHashtable.keys();
while (e.hasMoreElements()) {
String key = (String) e.nextElement();
ArrayList values = (ArrayList) propeHashtable.get(key);
if (CEPConstants.CEP_CONF_ELE_NAME.equals(key)) {
bucket.setName(values.get(0).toString().trim());
} else if (CEPConstants.CEP_CONF_ELE_DESCRIPTION.equals(key)) {
if(values.get(0) != null){
bucket.setDescription(values.get(0).toString().trim());
}else {
bucket.setDescription("");
}
} else if (CEPConstants.CEP_CONF_ELE_CEP_ENGINE_PROVIDER.equals(key)) {
bucket.setEngineProvider(values.get(0).toString().trim());
} else if (CEPConstants.CEP_CONF_ELE_CEP_BUCKET_OWNER.equals(key)){
bucket.setOwner(values.get(0).toString());
}
}
}
}
}
buckets[bucketCount] = bucket;
bucketCount++;
}
}
}
} catch (RegistryException e) {
String errorMessage = "Unable to load buckets from registry ";
log.error(errorMessage, e);
throw new CEPConfigurationException(errorMessage, e);
}
log.debug("Loaded buckets from the registry successfully");
return buckets;
}
/**
* this method is used to add a given
* bucket to the config registry
*
* @param bucket
*/
public static void addBucketsToRegistry(Bucket bucket, int tenantId) throws CEPConfigurationException {
try {
Registry registry = CEPServiceValueHolder.getInstance().getRegistry(tenantId);
String parentCollectionPath = CEPConstants.CEP_CONF_ELE_CEP_BUCKETS + CEPConstants.CEP_REGISTRY_BS + bucket.getName();
if (registry.resourceExists(parentCollectionPath) == true) {
return;
}
registry.put(parentCollectionPath, registry.newCollection());
Resource bucketProperties = registry.newResource();
bucketProperties.addProperty(CEPConstants.CEP_CONF_ELE_NAME, bucket.getName());
bucketProperties.addProperty(CEPConstants.CEP_CONF_ELE_DESCRIPTION, bucket.getDescription());
bucketProperties.addProperty(CEPConstants.CEP_CONF_ELE_CEP_ENGINE_PROVIDER, bucket.getEngineProvider());
bucketProperties.addProperty(CEPConstants.CEP_CONF_ELE_CEP_BUCKET_OWNER, bucket.getOwner());
registry.put(parentCollectionPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_PROPERTIES, bucketProperties);
String inputsCollectionPath = parentCollectionPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_INPUTS;
registry.put(inputsCollectionPath, registry.newCollection());
for (Input input : bucket.getInputs()) {
String inputResourcePath = inputsCollectionPath + "/" + input.getTopic();
registry.put(inputResourcePath, registry.newCollection());
Resource inputTopic = registry.newResource();
inputTopic.addProperty(CEPConstants.CEP_CONF_ELE_TOPIC, input.getTopic());
inputTopic.addProperty(CEPConstants.CEP_CONF_ELE_BROKER_NAME, input.getBrokerName());
registry.put(inputResourcePath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_DETAILS, inputTopic);
registry.put(inputResourcePath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_MAPPING, registry.newCollection());
String mappingPath = inputResourcePath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_MAPPING + CEPConstants.CEP_REGISTRY_BS;
Resource streamResource = registry.newResource();
streamResource.addProperty(CEPConstants.CEP_REGISTRY_STREAM, input.getMapping().getStream());
registry.put(mappingPath + CEPConstants.CEP_REGISTRY_STREAM, streamResource);
/* Map<String, String> xpathDefinitionsTable = input.getMapping().getXpathNamespacePrefixes();
if (xpathDefinitionsTable != null && xpathDefinitionsTable.size() > 0) {
Set set = xpathDefinitionsTable.keySet();
Iterator it = set.iterator();
while (it.hasNext()) {
String key = (String) it.next();
String value = xpathDefinitionsTable.get(key);
Resource xpathDef = registry.newResource();
xpathDef.addProperty(key, value);
registry.put(mappingPath + CEPConstants.CEP_REGISTRY_XPATH_DEFS + CEPConstants.CEP_REGISTRY_BS + key, xpathDef);
}
}*/
List<XpathDefinition> xpathDefinitionList = input.getMapping().getXpathNamespacePrefixes();
if(xpathDefinitionList != null && xpathDefinitionList.size() >0){
for(XpathDefinition xpathDefinition : xpathDefinitionList){
String key = xpathDefinition.getPrefix();
String value = xpathDefinition.getNamespace();
Resource xpathDef = registry.newResource();
xpathDef.addProperty(key, value);
registry.put(mappingPath + CEPConstants.CEP_REGISTRY_XPATH_DEFS + CEPConstants.CEP_REGISTRY_BS + key, xpathDef);
}
}
if (input.getMapping().getProperties() != null) {
for (Property property : input.getMapping().getProperties()) {
Resource propertyResource = registry.newResource();
propertyResource.addProperty(CEPConstants.CEP_REGISTRY_NAME, property.getName());
propertyResource.addProperty(CEPConstants.CEP_REGISTRY_TYPE, property.getType());
propertyResource.addProperty(CEPConstants.CEP_REGISTRY_XPATH, property.getXpath());
registry.put(mappingPath + CEPConstants.CEP_REGISTRY_PROPERTIES + CEPConstants.CEP_REGISTRY_BS + property.getName(), propertyResource);
}
}
}
String queriesCollectionPath = parentCollectionPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_QUERIES;
registry.put(queriesCollectionPath, registry.newCollection());
for (Query query : bucket.getQueries()) {
String queryPath = queriesCollectionPath + "/" + query.getName();
registry.put(queryPath, registry.newCollection());
Resource queryResource = registry.newResource();
queryResource.setProperty(CEPConstants.CEP_REGISTRY_NAME, query.getName());
queryResource.setProperty(CEPConstants.CEP_REGISTRY_TYPE, query.getExpression().getType());
// queryResource.setProperty(CEPConstants.CEP_REGISTRY_EXPRESSION,query.getExpression().getText());
queryResource.setContent(query.getExpression().getText());
registry.put(queryPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_DETAILS, queryResource);
registry.put(queryPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_OUTPUT, registry.newCollection());
Output output = query.getOutput();
Resource outputResource = registry.newResource();
outputResource.addProperty(CEPConstants.CEP_REGISTRY_TOPIC, output.getTopic());
outputResource.addProperty(CEPConstants.CEP_CONF_ELE_BROKER_NAME, output.getBrokerName());
outputResource.addProperty(CEPConstants.CEP_REGISTRY_TYPE,
output.getElementMapping() == null ? CEPConstants.CEP_REGISTRY_XML_MAPPING : CEPConstants.CEP_REGISTRY_ELEMENT_MAPPING);
registry.put(queryPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_OUTPUT +
CEPConstants.CEP_REGISTRY_BS + output.getTopic(), outputResource);
ElementMapping elementMapping = output.getElementMapping();
if (elementMapping != null) {
String elementMappingPathString = CEPConstants.CEP_REGISTRY_BS +
CEPConstants.CEP_REGISTRY_OUTPUT +
CEPConstants.CEP_REGISTRY_BS +
CEPConstants.CEP_REGISTRY_ELEMENT_MAPPING;
registry.put(queryPath + elementMappingPathString, registry.newCollection());
Resource elementMappingResource = registry.newResource();
elementMappingResource.addProperty(CEPConstants.CEP_REGISTRY_DOC_ELEMENT, elementMapping.getDocumentElement());
elementMappingResource.addProperty(CEPConstants.CEP_REGISTRY_NS, elementMapping.getNamespace());
registry.put(queryPath + elementMappingPathString +
CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_DETAILS, elementMappingResource);
registry.put(queryPath + elementMappingPathString +
CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_PROPERTIES, registry.newCollection());
for (Property property : elementMapping.getProperties()) {
Resource elementMappingProperties = registry.newResource();
elementMappingProperties.addProperty(CEPConstants.CEP_REGISTRY_NAME, property.getName());
elementMappingProperties.addProperty(CEPConstants.CEP_REGISTRY_XML_FIELD_NAME, property.getXmlFieldName());
elementMappingProperties.addProperty(CEPConstants.CEP_REGISTRY_XML_FIELD_TYPE, property.getXmlFieldType());
registry.put(queryPath + elementMappingPathString +
CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_PROPERTIES +
CEPConstants.CEP_REGISTRY_BS + property.getName(), elementMappingProperties);
}
} else {
registry.put(queryPath +
CEPConstants.CEP_REGISTRY_BS +
CEPConstants.CEP_REGISTRY_OUTPUT +
CEPConstants.CEP_REGISTRY_BS +
CEPConstants.CEP_REGISTRY_XML_MAPPING, registry.newCollection());
Resource xmlMappingResource = registry.newResource();
xmlMappingResource.addProperty(CEPConstants.CEP_REGISTRY_TEXT, output.getXmlMapping().getMappingXMLText());
registry.put(queryPath +
CEPConstants.CEP_REGISTRY_BS +
CEPConstants.CEP_REGISTRY_OUTPUT +
CEPConstants.CEP_REGISTRY_BS +
CEPConstants.CEP_REGISTRY_XML_MAPPING +
CEPConstants.CEP_REGISTRY_BS +
CEPConstants.CEP_REGISTRY_DETAILS
, xmlMappingResource);
}
}
} catch (RegistryException e) {
String errorMessage = "Can not add bucket to the registry ";
log.error(errorMessage, e);
throw new CEPConfigurationException(errorMessage, e);
}
log.debug("Added the bucket to the registry successfully");
}
/**
* When there are new queries or inputs for an existing bucket
* This method will modify the registry entire related to that existing bucket by
* add new Inputs and New Queries
*
* @param bucket
*/
public static void modifyBucketInRegistry(Bucket bucket, int tenantId) throws CEPConfigurationException {
try {
Registry registry = CEPServiceValueHolder.getInstance().getRegistry(tenantId);
String parentCollectionPath = CEPConstants.CEP_CONF_ELE_CEP_BUCKETS + CEPConstants.CEP_REGISTRY_BS + bucket.getName();
if (registry.resourceExists(parentCollectionPath)) {
registry.put(parentCollectionPath, registry.newCollection());
Resource bucketProperties = registry.newResource();
bucketProperties.addProperty(CEPConstants.CEP_CONF_ELE_NAME, bucket.getName());
bucketProperties.addProperty(CEPConstants.CEP_CONF_ELE_DESCRIPTION, bucket.getDescription());
bucketProperties.addProperty(CEPConstants.CEP_CONF_ELE_CEP_ENGINE_PROVIDER, bucket.getEngineProvider());
// registry.put(parentCollectionPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_PROPERTIES, bucketProperties);
String inputsCollectionPath = parentCollectionPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_INPUTS;
if (!registry.resourceExists(inputsCollectionPath)) {
registry.put(inputsCollectionPath, registry.newCollection());
}
for (Input input : bucket.getInputs()) {
String inputResourcePath = inputsCollectionPath + "/" + input.getTopic();
registry.put(inputResourcePath, registry.newCollection());
Resource inputTopic = registry.newResource();
inputTopic.addProperty(CEPConstants.CEP_CONF_ELE_TOPIC, input.getTopic());
inputTopic.addProperty(CEPConstants.CEP_CONF_ELE_BROKER_NAME, input.getBrokerName());
registry.put(inputResourcePath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_DETAILS, inputTopic);
registry.put(inputResourcePath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_MAPPING, registry.newCollection());
String mappingPath = inputResourcePath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_MAPPING + CEPConstants.CEP_REGISTRY_BS;
Resource streamResource = registry.newResource();
streamResource.addProperty(CEPConstants.CEP_REGISTRY_STREAM, input.getMapping().getStream());
registry.put(mappingPath + CEPConstants.CEP_REGISTRY_STREAM, streamResource);
/* Map<String, String> xpathDefinitionsTable = input.getMapping().getXpathNamespacePrefixes();
if (xpathDefinitionsTable != null && xpathDefinitionsTable.size() > 0) {
Set set = xpathDefinitionsTable.keySet();
Iterator it = set.iterator();
while (it.hasNext()) {
String key = (String) it.next();
String value = xpathDefinitionsTable.get(key);
Resource xpathDef = registry.newResource();
xpathDef.addProperty(key, value);
registry.put(mappingPath + CEPConstants.CEP_REGISTRY_XPATH_DEFS + CEPConstants.CEP_REGISTRY_BS + key, xpathDef);
}
}*/
List<XpathDefinition> xpathDefinitionList = input.getMapping().getXpathNamespacePrefixes();
if(xpathDefinitionList != null && xpathDefinitionList.size()>0){
for(XpathDefinition xpathDefinition : xpathDefinitionList){
String key = xpathDefinition.getPrefix();
String value = xpathDefinition.getNamespace();
Resource xpathDef = registry.newResource();
xpathDef.addProperty(key, value);
registry.put(mappingPath + CEPConstants.CEP_REGISTRY_XPATH_DEFS + CEPConstants.CEP_REGISTRY_BS + key, xpathDef);
}
}
if (input.getMapping().getProperties() != null) {
for (Property property : input.getMapping().getProperties()) {
Resource propertyResource = registry.newResource();
propertyResource.addProperty(CEPConstants.CEP_REGISTRY_NAME, property.getName());
propertyResource.addProperty(CEPConstants.CEP_REGISTRY_TYPE, property.getType());
propertyResource.addProperty(CEPConstants.CEP_REGISTRY_XPATH, property.getXpath());
registry.put(mappingPath + CEPConstants.CEP_REGISTRY_PROPERTIES + CEPConstants.CEP_REGISTRY_BS + property.getName(), propertyResource);
}
}
}
String queriesCollectionPath = parentCollectionPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_QUERIES;
if (registry.resourceExists(queriesCollectionPath)) {
registry.put(queriesCollectionPath, registry.newCollection());
}
for (Query query : bucket.getQueries()) {
String queryPath = queriesCollectionPath + "/" + query.getName();
registry.put(queryPath, registry.newCollection());
Resource queryResource = registry.newResource();
queryResource.setProperty(CEPConstants.CEP_REGISTRY_NAME, query.getName());
queryResource.setProperty(CEPConstants.CEP_REGISTRY_TYPE, query.getExpression().getType());
// queryResource.setProperty(CEPConstants.CEP_REGISTRY_EXPRESSION, query.getExpression().getText());
queryResource.setContent(query.getExpression().getText());
registry.put(queryPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_DETAILS, queryResource);
registry.put(queryPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_OUTPUT, registry.newCollection());
Output output = query.getOutput();
Resource outputResource = registry.newResource();
outputResource.addProperty(CEPConstants.CEP_REGISTRY_TOPIC, output.getTopic());
outputResource.addProperty(CEPConstants.CEP_CONF_ELE_BROKER_NAME, output.getBrokerName());
outputResource.addProperty(CEPConstants.CEP_REGISTRY_TYPE,
output.getElementMapping() == null ? CEPConstants.CEP_REGISTRY_XML_MAPPING : CEPConstants.CEP_REGISTRY_ELEMENT_MAPPING);
registry.put(queryPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_OUTPUT +
CEPConstants.CEP_REGISTRY_BS + output.getTopic(), outputResource);
ElementMapping elementMapping = output.getElementMapping();
if (elementMapping != null) {
String elementMappingPathString = CEPConstants.CEP_REGISTRY_BS +
CEPConstants.CEP_REGISTRY_OUTPUT +
CEPConstants.CEP_REGISTRY_BS +
CEPConstants.CEP_REGISTRY_ELEMENT_MAPPING;
registry.put(queryPath + elementMappingPathString, registry.newCollection());
Resource elementMappingResource = registry.newResource();
elementMappingResource.addProperty(CEPConstants.CEP_REGISTRY_DOC_ELEMENT, elementMapping.getDocumentElement());
elementMappingResource.addProperty(CEPConstants.CEP_REGISTRY_NS, elementMapping.getNamespace());
registry.put(queryPath + elementMappingPathString +
CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_DETAILS, elementMappingResource);
registry.put(queryPath + elementMappingPathString +
CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_PROPERTIES, registry.newCollection());
for (Property property : elementMapping.getProperties()) {
Resource elementMappingProperties = registry.newResource();
elementMappingProperties.addProperty(CEPConstants.CEP_REGISTRY_NAME, property.getName());
elementMappingProperties.addProperty(CEPConstants.CEP_REGISTRY_XML_FIELD_NAME, property.getXmlFieldName());
elementMappingProperties.addProperty(CEPConstants.CEP_REGISTRY_XML_FIELD_TYPE, property.getXmlFieldType());
registry.put(queryPath + elementMappingPathString +
CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_PROPERTIES +
CEPConstants.CEP_REGISTRY_BS + property.getName(), elementMappingProperties);
}
} else {
registry.put(queryPath +
CEPConstants.CEP_REGISTRY_BS +
CEPConstants.CEP_REGISTRY_OUTPUT +
CEPConstants.CEP_REGISTRY_BS +
CEPConstants.CEP_REGISTRY_XML_MAPPING, registry.newCollection());
Resource xmlMappingResource = registry.newResource();
xmlMappingResource.addProperty(CEPConstants.CEP_REGISTRY_TEXT, output.getXmlMapping().getMappingXMLText());
registry.put(queryPath +
CEPConstants.CEP_REGISTRY_BS +
CEPConstants.CEP_REGISTRY_OUTPUT +
CEPConstants.CEP_REGISTRY_BS +
CEPConstants.CEP_REGISTRY_XML_MAPPING +
CEPConstants.CEP_REGISTRY_BS +
CEPConstants.CEP_REGISTRY_DETAILS
, xmlMappingResource);
}
}
}
} catch (RegistryException e) {
String errorMessage = "Can not modify the bucket in registry ";
log.error(errorMessage, e);
throw new CEPConfigurationException(errorMessage, e);
}
log.debug("Modified the bucket successfully");
}
/**
* this method is used to remove all the buckets
* from registry
*/
public static void removeAllBucketsFromRegistry(int tenantId) throws CEPConfigurationException {
try {
Registry registry = CEPServiceValueHolder.getInstance().getRegistry(tenantId);
registry.delete(CEPConstants.CEP_CONF_ELE_CEP_BUCKETS);
} catch (RegistryException e) {
String errorMessage = "Error in removing all buckets from registry ";
log.error(errorMessage, e);
throw new CEPConfigurationException(errorMessage, e);
}
}
/**
* This method is used to remove a given bucket
* from registry
*
* @param bucketName
*/
public static void removeBucketFromRegistry(String bucketName, int tenantId) throws CEPConfigurationException {
try {
Registry registry = CEPServiceValueHolder.getInstance().getRegistry(tenantId);
registry.delete(CEPConstants.CEP_CONF_ELE_CEP_BUCKETS + CEPConstants.CEP_REGISTRY_BS + bucketName);
} catch (RegistryException e) {
String errorMessage = "Error in removing bucket :" + bucketName + " from registry ";
log.error(errorMessage, e);
throw new CEPConfigurationException(errorMessage, e);
}
}
/**
* This method will remove the specified query from registry if exists
*
* @param bucketName -Name of the bucket which the query exists
* @param queryName - Name of the query to be deleted
*/
public static void removeQueryFromRegistry(String bucketName, String queryName, int tenantId) throws CEPConfigurationException {
String parentCollectionPath = CEPConstants.CEP_CONF_ELE_CEP_BUCKETS + CEPConstants.CEP_REGISTRY_BS + bucketName;
String queriesCollectionPath = parentCollectionPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_QUERIES;
String queryPath = queriesCollectionPath + "/" + queryName;
try {
Registry registry = CEPServiceValueHolder.getInstance().getRegistry(tenantId);
if (registry.resourceExists(queryPath)) {
registry.delete(queryPath);
}
} catch (RegistryException e) {
String errorMessage = "Error in deleting the query to be deleted ";
log.error(errorMessage, e);
throw new CEPConfigurationException(errorMessage, e);
}
}
/**
* This method will remove the specified Input from registry if exists
*
* @param bucketName - Name of the bucket which input exists
* @param inputTopic - topic to be deleted from registry
*/
public static void removeInputFromRegistry(String bucketName, String inputTopic, int tenantId) throws CEPConfigurationException {
String parentCollectionPath = CEPConstants.CEP_CONF_ELE_CEP_BUCKETS + CEPConstants.CEP_REGISTRY_BS + bucketName;
String inputsCollectionPath = parentCollectionPath + CEPConstants.CEP_REGISTRY_BS + CEPConstants.CEP_REGISTRY_INPUTS;
String inputPath = inputsCollectionPath + "/" + inputTopic;
try {
Registry registry = CEPServiceValueHolder.getInstance().getRegistry(tenantId);
if (registry.resourceExists(inputPath)) {
registry.delete(inputPath);
}
} catch (RegistryException e) {
String errorMessage = "Error in deleting the input to be deleted ";
log.error(errorMessage, e);
throw new CEPConfigurationException(errorMessage, e);
}
}
}