package com.linkedin.databus2.producers.db;
/*
* Copyright 2013 LinkedIn Corp. All rights reserved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Reader;
import java.io.StringWriter;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.sql.Array;
import java.sql.Blob;
import java.sql.Clob;
import java.sql.Date;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLXML;
import java.sql.Struct;
import java.sql.Timestamp;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Encoder;
import org.apache.log4j.Logger;
import com.linkedin.databus.core.DbusEventBufferAppendable;
import com.linkedin.databus.core.DbusEventKey;
import com.linkedin.databus.core.UnsupportedKeyException;
import com.linkedin.databus.core.monitoring.mbean.DbusEventsStatisticsCollector;
import com.linkedin.databus2.producers.EventCreationException;
import com.linkedin.databus2.producers.PartitionFunction;
import com.linkedin.databus2.relay.OracleJarUtils;
import com.linkedin.databus2.relay.config.ReplicationBitSetterStaticConfig;
import com.linkedin.databus2.relay.config.ReplicationBitSetterStaticConfig.SourceType;
import com.linkedin.databus2.schemas.utils.SchemaHelper;
/**
 * Reusable EventFactory which generates Avro-serialized records from JDBC result-set rows, based
 * on an Avro schema and using Avro's GenericData facilities.
 *
 * <p>This EventFactory requires specific metadata to be included in the Avro schema: the
 * database column name of each field ({@code dbFieldName}), the attribute position of fields
 * nested in Oracle STRUCTs ({@code dbFieldPosition}) and, optionally, a primary-key override
 * ({@code pk}). Ideally the schema is generated by the Databus2 SchemaGenerator command-line
 * tool, which generates this metadata automatically.
 */
public class OracleAvroGenericEventFactory
    implements EventFactory
{
  /** Avro schema for the generated events. */
  protected final Schema _eventSchema;
  /** Unique schema ID for the _eventSchema. */
  protected final byte[] _schemaId;
  /** Source ID for this event source. */
  protected final short _sourceId;
  /** Physical source id. */
  protected final short _pSourceId;
  /** PartitionFunction used to generate the event partition based on event key. */
  protected final PartitionFunction _partitionFunction;
  /** Logger for error and debug messages (named after the runtime class). */
  private final Logger _log = Logger.getLogger(getClass());
  /** Name of the schema field that holds the event key. */
  protected String keyColumnName = "key";
  /** Replication BitSetter static config; may be null. */
  private final ReplicationBitSetterStaticConfig _replSetterConfig;
  /** Compiled remote-update regex; non-null only for a COLUMN-based bit setter config. */
  private final Pattern _replBitSetterPattern;

  public static final String MODULE = OracleAvroGenericEventFactory.class.getName();
  public static final Logger LOG = Logger.getLogger(MODULE);

  /**
   * Creates a factory for one source and its Avro event schema.
   *
   * @param sourceId          logical source id written on every event
   * @param pSourceId         physical source id written on every event
   * @param eventSchema       the Avro schema of the events, as schema text
   * @param partitionFunction maps an event key to its logical partition id
   * @param replSetterConfig  replication bit-setter config; may be null
   * @throws EventCreationException if the schema has neither the key field nor an "id" field
   * @throws UnsupportedKeyException not thrown directly by this constructor
   */
  public OracleAvroGenericEventFactory(short sourceId, short pSourceId,
                                       String eventSchema, PartitionFunction partitionFunction,
                                       ReplicationBitSetterStaticConfig replSetterConfig)
      throws EventCreationException, UnsupportedKeyException
  {
    // TODO: Constructor should validate that eventSchema has all required metadata
    _sourceId = sourceId;
    _pSourceId = pSourceId;
    _eventSchema = Schema.parse(eventSchema);
    _schemaId = SchemaHelper.getSchemaId(eventSchema);
    _partitionFunction = partitionFunction;
    _replSetterConfig = replSetterConfig;

    if ((null != _replSetterConfig) && (SourceType.COLUMN.equals(_replSetterConfig.getSourceType())))
    {
      _replBitSetterPattern = Pattern.compile(replSetterConfig.getRemoteUpdateValueRegex());
    }
    else
    {
      _replBitSetterPattern = null;
    }

    String keyNameOverride = SchemaHelper.getMetaField(_eventSchema, "pk");
    if (null != keyNameOverride)
    {
      keyColumnName = keyNameOverride;
      _log.info(_eventSchema.getFullName() + ": using primary key override:" + keyColumnName);
    }

    // Locate the key field. Its type is expected to be String, int, or long, but that is not
    // checked here.
    Field keyField = _eventSchema.getField(keyColumnName);
    // Work-around for bizfollow: fall back to a field named "id" when the key field is absent.
    if (keyField == null)
    {
      keyField = _eventSchema.getField("id");
      if (keyField != null)
      {
        keyColumnName = "id";
      }
    }
    if (keyField == null)
    {
      throw new EventCreationException("The event schema is missing the required field \""
                                       + keyColumnName + "\".");
    }
  }

  /**
   * Serializes a GenericRecord to Avro binary format using the record's own schema.
   *
   * <p>Only {@code record} is serialized; {@code row} is used solely in error messages. The other
   * parameters are not used here (presumably they exist for subclasses overriding this hook).
   *
   * @return the Avro binary encoding of {@code record}
   * @throws EventCreationException if Avro fails to serialize the record
   */
  protected byte[] serializeEvent(GenericRecord record,
                                  long scn,
                                  long timestamp,
                                  ResultSet row,
                                  DbusEventBufferAppendable eventBuffer,
                                  boolean enableTracing,
                                  DbusEventsStatisticsCollector dbusEventsStatisticsCollector)
      throws EventCreationException, UnsupportedKeyException
  {
    try
    {
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      Encoder encoder = new BinaryEncoder(bos);
      GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(record.getSchema());
      writer.write(record, encoder);
      // Push anything the encoder may have buffered into the stream before reading it back.
      encoder.flush();
      return bos.toByteArray();
    }
    catch(IOException ex)
    {
      throw new EventCreationException("Failed to serialize the Avro GenericRecord. ResultSet was: (" + row + ")", ex);
    }
    catch(RuntimeException ex)
    {
      // Avro likes to throw RuntimeExceptions instead of checked exceptions when serialization fails.
      throw new EventCreationException("Failed to serialize the Avro GenericRecord. ResultSet was: (" + row + ")", ex);
    }
  }

  /**
   * Converts the current row of {@code row} into an Avro record, determines whether the row was
   * externally replicated, and appends the resulting event to {@code eventBuffer}.
   *
   * @return the size in bytes of the serialized event payload
   */
  @Override
  public long createAndAppendEvent(long scn,
                                   long timestamp,
                                   ResultSet row,
                                   DbusEventBufferAppendable eventBuffer,
                                   boolean enableTracing,
                                   DbusEventsStatisticsCollector dbusEventsStatisticsCollector)
      throws SQLException, EventCreationException, UnsupportedKeyException
  {
    GenericRecord record = buildGenericRecord(row);
    boolean isReplicated = isReplicatedEvent(row);
    return createAndAppendEvent(scn, timestamp, record, row, eventBuffer, enableTracing,
                                isReplicated, dbusEventsStatisticsCollector);
  }

  /**
   * Inspects the row to identify if the event is replicated or not.
   *
   * <p>Only a COLUMN-based bit setter config is consulted: the configured column is read as a
   * string and matched in full against the configured regex.
   *
   * @param row ResultSet positioned on the row to inspect; may be null
   * @return true if the row is deemed replicated from external DB, false otherwise
   * @throws SQLException if the configured column cannot be read (logged, then rethrown)
   */
  private boolean isReplicatedEvent(ResultSet row)
      throws SQLException
  {
    boolean replicated = false;
    try
    {
      if ((null != row) &&
          (_replSetterConfig != null) &&
          (SourceType.COLUMN.equals(_replSetterConfig.getSourceType())))
      {
        String value = row.getString(_replSetterConfig.getFieldName());
        if ((null != value) && (null != _replBitSetterPattern))
        {
          replicated = _replBitSetterPattern.matcher(value).matches();
        }
      }
    }
    catch (SQLException sqlEx)
    {
      LOG.error("Unable to identify if this row was externally replicated. Config Used :" + _replSetterConfig, sqlEx);
      throw sqlEx;
    }
    return replicated;
  }

  /**
   * Serializes an already-built Avro record and appends it to the buffer as a non-replicated
   * event.
   *
   * @return the size in bytes of the serialized event payload
   */
  @Override
  public long createAndAppendEvent(long scn, long timestamp,
                                   GenericRecord record, DbusEventBufferAppendable eventBuffer,
                                   boolean enableTracing,
                                   DbusEventsStatisticsCollector dbusEventsStatisticsCollector)
      throws EventCreationException, UnsupportedKeyException
  {
    return createAndAppendEvent(scn, timestamp, record, null, eventBuffer, enableTracing,
                                false, dbusEventsStatisticsCollector);
  }

  /**
   * Serializes {@code record}, derives its key and logical partition, and appends it to the buffer.
   *
   * @param row          the source row; used only in error messages and may be null
   * @param isReplicated whether the event is flagged as externally replicated
   * @return the size in bytes of the serialized event payload
   */
  public long createAndAppendEvent(long scn,
                                   long timestamp,
                                   GenericRecord record,
                                   ResultSet row,
                                   DbusEventBufferAppendable eventBuffer,
                                   boolean enableTracing,
                                   boolean isReplicated,
                                   DbusEventsStatisticsCollector dbusEventsStatisticsCollector)
      throws EventCreationException, UnsupportedKeyException
  {
    byte[] serializedValue = serializeEvent(record, scn, timestamp, row, eventBuffer, enableTracing,
                                            dbusEventsStatisticsCollector);

    DbusEventKey eventKey = new DbusEventKey(record.get(keyColumnName));
    short lPartitionId = _partitionFunction.getPartition(eventKey);
    // timestamp is scaled by 10^6 (presumably milliseconds -> nanoseconds) for the buffer.
    eventBuffer.appendEvent(eventKey, _pSourceId, lPartitionId, timestamp * 1000000, _sourceId,
                            _schemaId, serializedValue, enableTracing, isReplicated,
                            dbusEventsStatisticsCollector);
    return serializedValue.length;
  }

  /**
   * Builds a GenericRecord from the contents of the current ResultSet row.
   *
   * <p>Each schema field is read from the column named by its {@code dbFieldName} metadata;
   * ARRAY fields are read via {@link ResultSet#getArray} using the element schema's
   * {@code dbFieldName}.
   *
   * @param rs result set positioned on the row to convert
   * @return the populated Avro record
   * @throws SQLException if a column cannot be read
   * @throws EventCreationException if a value cannot be converted to its Avro field type, or a
   *         non-nullable field is NULL
   */
  protected GenericRecord buildGenericRecord(ResultSet rs)
      throws SQLException, EventCreationException
  {
    if (_log.isTraceEnabled())
    {
      _log.trace("--- New Record ---");
    }

    GenericRecord record = new GenericData.Record(_eventSchema);
    for (Field field : _eventSchema.getFields())
    {
      String schemaFieldName = field.name();
      // This is just field.schema() if field is not a union; but if it IS one,
      // this is the schema of the first non-null type within the union:
      Schema fieldSchema = SchemaHelper.unwindUnionSchema(field);

      if (fieldSchema.getType() == Type.ARRAY)
      {
        // Note that we're encoding to Avro's internal representation rather than to Avro binary
        // format, which is what allows us to directly encode one of the union's inner types
        // (here as well as in put()) instead of wrapping the inner type in a union. (Avro's
        // binary encoding for unions includes an additional long index value before the
        // encoding of the selected inner type.)
        Array array = getJdbcArray(rs, fieldSchema);
        if (null == array)
        {
          // SQL NULL collection: same rule as put() applies to any other NULL value.
          if (!SchemaHelper.isNullable(field))
          {
            throw new EventCreationException("Null value not allowed for field " + schemaFieldName);
          }
        }
        else
        {
          putArray(record, schemaFieldName, fieldSchema, array);
        }
      }
      else
      {
        String databaseFieldName = SchemaHelper.getMetaField(field, "dbFieldName");
        try
        {
          Object databaseFieldValue = rs.getObject(databaseFieldName);
          put(record, field, databaseFieldValue);
        }
        catch (SQLException ex)
        {
          _log.error("Failed to read column (" + databaseFieldName + ") for source (" + _sourceId + ")");
          throw ex;
        }
      }
    }
    return record;
  }

  /**
   * Reads the JDBC array column backing an Avro ARRAY field.
   *
   * @param rs     result set positioned on the current row
   * @param schema the (non-union) ARRAY schema of the field
   * @return the JDBC array, or null if the column is SQL NULL
   * @throws EventCreationException if the element schema lacks {@code dbFieldName} metadata or
   *         the column cannot be read
   */
  private static Array getJdbcArray(ResultSet rs, Schema schema)
      throws EventCreationException
  {
    Schema elementSchema = schema.getElementType(); // fails if schema isn't for array type
    String dbFieldName = SchemaHelper.getMetaField(elementSchema, "dbFieldName");
    if (dbFieldName == null)
    {
      throw new EventCreationException("array field is missing required metadata dbFieldName. "
                                       + schema.getName());
    }
    try
    {
      return rs.getArray(dbFieldName);
    }
    catch (SQLException e)
    {
      throw new EventCreationException("unable to read array field: " + dbFieldName + ": "
                                       + e.getMessage(), e);
    }
  }

  /**
   * Converts a JDBC array of Oracle STRUCTs into an Avro array of records and stores it in
   * {@code record} under {@code arrayFieldName}.
   *
   * @param schema the ARRAY schema; its element type must be a record
   * @param array  the JDBC array (must not be null)
   * @throws EventCreationException if the schema is not an array, the array or one of its
   *         elements is null, or an element cannot be converted
   */
  private void putArray(GenericRecord record,
                        String arrayFieldName,
                        Schema schema,
                        Array array)
      throws EventCreationException
  {
    if (schema.getType() != Type.ARRAY)
    {
      throw new EventCreationException("Not an array type. " + schema.getName());
    }
    if (null == array)
    {
      throw new EventCreationException("Null array value for field " + arrayFieldName);
    }

    Schema elementSchema = schema.getElementType();
    GenericArray<GenericRecord> avroArray = new GenericData.Array<GenericRecord>(0, schema);
    try
    {
      ResultSet arrayResultSet = array.getResultSet();
      try
      {
        while (arrayResultSet.next())
        {
          // Per the JDBC Array.getResultSet() contract, column 1 is the element index and
          // column 2 the element value (here, an Oracle STRUCT).
          Struct struct = (Struct) arrayResultSet.getObject(2);
          if (null == struct)
          {
            throw new EventCreationException("Null element in array field " + arrayFieldName);
          }
          GenericRecord elemRecord = new GenericData.Record(elementSchema);
          putOracleRecord(elemRecord, elementSchema, struct);
          avroArray.add(elemRecord);
        }
      }
      finally
      {
        arrayResultSet.close();
      }
    }
    catch (SQLException e)
    {
      throw new EventCreationException("putArray error: " + e.getMessage(), e);
    }
    record.put(arrayFieldName, avroArray);
  }

  /**
   * Converts an Oracle STRUCT into a new Avro child record and stores it in the parent record.
   *
   * @param parentRecord the parent Avro record to which to add the generated child
   * @param fieldName    the name of the Avro field
   * @param fieldSchema  the schema of the Avro field (must be a record)
   * @param dbFieldValue the DB field value from the result set
   * @throws EventCreationException if conversion from the STRUCT type to the Avro type failed
   */
  private void addOracleRecordToParent(GenericRecord parentRecord,
                                       String fieldName,
                                       Schema fieldSchema,
                                       Struct dbFieldValue)
      throws EventCreationException
  {
    GenericRecord fieldRecord = new GenericData.Record(fieldSchema);
    putOracleRecord(fieldRecord, fieldSchema, dbFieldValue);
    parentRecord.put(fieldName, fieldRecord);
  }

  /**
   * Copies one STRUCT attribute into the matching field of an Avro record.
   *
   * <p>The attribute is selected by the field's {@code dbFieldPosition} metadata (0 when absent
   * or empty). A null attribute leaves the field unset if the schema allows null.
   *
   * @throws EventCreationException if the position is malformed or out of range, a non-nullable
   *         attribute is null, or the value cannot be converted
   */
  private void processRecordField(GenericRecord fieldRecord, Field field, Object[] structAttribs)
      throws EventCreationException
  {
    String recordFieldName = field.name();
    String dbFieldPositionStr = SchemaHelper.getMetaField(field, "dbFieldPosition");
    int dbFieldPosition = 0;
    if (null != dbFieldPositionStr && !dbFieldPositionStr.isEmpty())
    {
      try
      {
        dbFieldPosition = Integer.parseInt(dbFieldPositionStr);
      }
      catch (NumberFormatException e)
      {
        throw new EventCreationException("Invalid dbFieldPosition \"" + dbFieldPositionStr +
                                         "\" for field " + recordFieldName, e);
      }
    }
    if (dbFieldPosition < 0 || dbFieldPosition >= structAttribs.length)
    {
      throw new EventCreationException("dbFieldPosition " + dbFieldPosition + " out of range for field " +
                                       recordFieldName + "; struct has " + structAttribs.length +
                                       " attributes");
    }

    Object structAttribValue = structAttribs[dbFieldPosition];
    if (structAttribValue == null)
    {
      // The field value was null. If the field is nullable then we do nothing. Otherwise this is an error.
      if (!SchemaHelper.isNullable(field.schema()))
      {
        throw new EventCreationException("Null value not allowed for field " + recordFieldName +
                                         ":" + field.schema());
      }
      return;
    }

    Schema recordSchema = SchemaHelper.unwindUnionSchema(field); // == field.schema() if not a union
    Type recordFieldType = recordSchema.getType();
    switch (recordFieldType)
    {
      case BOOLEAN:
      case BYTES:
      case DOUBLE:
      case FLOAT:
      case INT:
      case LONG:
      case STRING:
      case NULL:
        putSimpleValue(fieldRecord, recordFieldName, recordFieldType, structAttribValue);
        break;
      case RECORD:
        addOracleRecordToParent(fieldRecord, recordFieldName, recordSchema, (Struct)structAttribValue);
        break;
      case ARRAY:
        putArray(fieldRecord, recordFieldName, recordSchema, (Array)structAttribValue);
        break;
      case ENUM:
      case FIXED:
      case MAP:
      case UNION:
      default:
        throw new EventCreationException("unknown struct field type: " + recordFieldName + ":" +
                                         recordFieldType);
    }
  }

  /**
   * Copies every attribute of an Oracle STRUCT into the fields of an Avro record.
   *
   * @param fieldRecord  the Avro record to populate
   * @param fieldSchema  the schema of the Avro field (must be a record)
   * @param dbFieldValue the DB field value from the result set (cannot be null)
   * @throws EventCreationException if the field counts differ or conversion of any attribute
   *         failed
   */
  private void putOracleRecord(GenericRecord fieldRecord,
                               Schema fieldSchema,
                               Struct dbFieldValue)
      throws EventCreationException
  {
    assert fieldSchema.getType() == Type.RECORD;
    assert null != dbFieldValue;

    try
    {
      List<Field> fields = fieldSchema.getFields();
      Object[] structAttribs = dbFieldValue.getAttributes();
      if (fields.size() != structAttribs.length)
      {
        throw new EventCreationException("Avro field number mismatch: avro schema field# =" +
                                         fields.size() + " ; struct " +
                                         dbFieldValue.getSQLTypeName() + " field# = " +
                                         structAttribs.length);
      }
      for (Field field : fields)
      {
        processRecordField(fieldRecord, field, structAttribs);
      }
    }
    catch (SQLException e)
    {
      throw new EventCreationException("creation of field " + fieldSchema.getFullName(), e);
    }
  }

  /**
   * Copies the value of a simple-type event field from DB field value to an Avro record.
   *
   * @param record             the Avro record to populate
   * @param schemaFieldName    the name of the Avro field
   * @param avroFieldType      the type of the Avro field
   * @param databaseFieldValue the JDBC field value from the ResultSet (cannot be null)
   * @throws EventCreationException if the conversion from JDBC type to Avro type failed
   */
  private void putSimpleValue(GenericRecord record,
                              String schemaFieldName,
                              Type avroFieldType,
                              Object databaseFieldValue)
      throws EventCreationException
  {
    assert null != databaseFieldValue;

    switch(avroFieldType)
    {
      case BOOLEAN:
        record.put(schemaFieldName, ((Boolean)databaseFieldValue).booleanValue());
        break;
      case BYTES:
        if (databaseFieldValue instanceof byte[])
        {
          record.put(schemaFieldName, ByteBuffer.wrap((byte[]) databaseFieldValue));
        }
        else
        {
          record.put(schemaFieldName, extractBlobBytes((Blob)databaseFieldValue, schemaFieldName));
        }
        break;
      case DOUBLE:
        record.put(schemaFieldName, ((Number)databaseFieldValue).doubleValue());
        break;
      case FLOAT:
        record.put(schemaFieldName, ((Number)databaseFieldValue).floatValue());
        break;
      case INT:
        record.put(schemaFieldName, ((Number)databaseFieldValue).intValue());
        break;
      case LONG:
        record.put(schemaFieldName, convertToLong(databaseFieldValue, schemaFieldName));
        break;
      case STRING:
        if (databaseFieldValue instanceof Clob)
        {
          record.put(schemaFieldName, extractClobText((Clob)databaseFieldValue, schemaFieldName));
        }
        else if (databaseFieldValue instanceof SQLXML)
        {
          SQLXML xmlInst = (SQLXML) databaseFieldValue;
          try
          {
            record.put(schemaFieldName, xmlInst.getString());
          }
          catch (SQLException e)
          {
            throw new EventCreationException("Cannot convert " + databaseFieldValue.getClass() +
                                             " to string field " + schemaFieldName + " cause: " + e, e);
          }
        }
        else
        {
          record.put(schemaFieldName, databaseFieldValue.toString());
        }
        break;
      case NULL:
        record.put(schemaFieldName, null);
        break;
      default:
        throw new EventCreationException("unknown simple type " + avroFieldType.toString() +
                                         " for field " + schemaFieldName);
    }
  }

  /**
   * Converts a JDBC value destined for an Avro LONG field.
   *
   * <p>{@code java.sql.Timestamp}/{@code java.sql.Date} and Oracle {@code TIMESTAMP}/{@code DATE}
   * values become {@code getTime()} of the timestamp; other {@link Number}s become
   * {@code longValue()}.
   *
   * @throws EventCreationException if the value has an unsupported type, or the Oracle classes
   *         are needed but cannot be loaded
   */
  private static long convertToLong(Object databaseFieldValue, String schemaFieldName)
      throws EventCreationException
  {
    // Standard JDBC temporal types need no Oracle classes.
    if (databaseFieldValue instanceof Timestamp)
    {
      return ((Timestamp) databaseFieldValue).getTime();
    }
    if (databaseFieldValue instanceof Date)
    {
      return ((Date) databaseFieldValue).getTime();
    }

    Class<?> timestampClass;
    Class<?> dateClass;
    try
    {
      timestampClass = OracleJarUtils.loadClass("oracle.sql.TIMESTAMP");
      dateClass = OracleJarUtils.loadClass("oracle.sql.DATE");
    }
    catch (Exception e)
    {
      String errMsg = "Cannot convert " + databaseFieldValue.getClass()
          + " to long for field " + schemaFieldName + " Unable to get oracle datatypes " + e.getMessage();
      throw new EventCreationException(errMsg, e);
    }

    if (timestampClass.isInstance(databaseFieldValue))
    {
      return oracleTimestampMillis(timestampClass, databaseFieldValue, schemaFieldName);
    }
    if (dateClass.isInstance(databaseFieldValue))
    {
      return oracleTimestampMillis(dateClass, databaseFieldValue, schemaFieldName);
    }
    // This must stay after the oracle.sql.TIMESTAMP check: per the original author, that class
    // also passes the Number instanceof check, so checking Number first would misconvert it.
    if (databaseFieldValue instanceof Number)
    {
      return ((Number) databaseFieldValue).longValue();
    }
    throw new EventCreationException("Cannot convert " + databaseFieldValue.getClass()
        + " to long for field " + schemaFieldName);
  }

  /**
   * Calls {@code timestampValue()} on an Oracle datum and returns the timestamp's
   * {@code getTime()}. The method is looked up on {@code oracleClass} itself, so that a DATE is
   * never invoked through TIMESTAMP's method.
   */
  private static long oracleTimestampMillis(Class<?> oracleClass, Object value, String schemaFieldName)
      throws EventCreationException
  {
    try
    {
      Method timestampValueMethod = oracleClass.getMethod("timestampValue");
      Timestamp tsValue = (Timestamp) timestampValueMethod.invoke(value);
      return tsValue.getTime();
    }
    catch (Exception ex)
    {
      throw new EventCreationException("SQLException reading " + oracleClass.getName()
          + " value for field " + schemaFieldName, ex);
    }
  }

  /**
   * Stores a JDBC column value into the matching Avro field of {@code record}.
   *
   * <p>A NULL value leaves the field unset if the schema allows null. Otherwise, the value is
   * dispatched by the (union-unwound) Avro type of the field.
   *
   * @throws EventCreationException if a non-nullable field is NULL, the Avro type is
   *         unsupported, or the JDBC value has an incompatible type
   */
  private void put(GenericRecord record, Field field, Object databaseFieldValue)
      throws EventCreationException
  {
    String schemaFieldName = field.name();
    Schema fieldSchema = SchemaHelper.unwindUnionSchema(field); // == field.schema() if not a union
    Type avroFieldType = fieldSchema.getType();

    if (databaseFieldValue == null)
    {
      // The field value was null. If the field is nullable then we do nothing. Otherwise this is an error.
      if (!SchemaHelper.isNullable(field))
      {
        throw new EventCreationException("Null value not allowed for field " + schemaFieldName);
      }
      return;
    }

    if (_log.isTraceEnabled())
    {
      _log.trace("record.put(\"" + schemaFieldName + "\", (" + avroFieldType + ") \"" + databaseFieldValue + "\"");
    }

    try
    {
      switch (avroFieldType)
      {
        case BOOLEAN:
        case BYTES:
        case DOUBLE:
        case FLOAT:
        case INT:
        case LONG:
        case STRING:
        case NULL:
          putSimpleValue(record, schemaFieldName, avroFieldType, databaseFieldValue);
          break;
        case RECORD:
          addOracleRecordToParent(record, schemaFieldName, fieldSchema, (Struct)databaseFieldValue);
          break;
        case ARRAY:
          putArray(record, schemaFieldName, fieldSchema, (Array)databaseFieldValue);
          break;
        case ENUM:   // exists in some Espresso schemas: don't blindly cut and paste!
        case MAP:    // ditto
        case FIXED:
        case UNION:  // shouldn't be possible, given unwindUnionSchema() call above
        default:
          throw new EventCreationException("Don't know how to populate this type of field: " + avroFieldType);
      }
    }
    catch(ClassCastException ex)
    {
      throw new EventCreationException("Type conversion error for field name (" + field.name() +
                                       ") in source " + _sourceId + ". Value was: " + databaseFieldValue +
                                       " avro field was: " + avroFieldType, ex);
    }
  }

  /**
   * Reads the BLOB passed and returns its content wrapped in a ByteBuffer.
   *
   * @return the BLOB bytes, or null if {@code blob} is null
   * @throws EventCreationException if the BLOB cannot be read or exceeds the maximum size of a
   *         Java byte array
   */
  public static ByteBuffer extractBlobBytes(Blob blob, String fieldName)
      throws EventCreationException
  {
    if (blob == null)
    {
      return null;
    }
    try
    {
      long length = blob.length();
      // Guard the int cast below, which would otherwise silently truncate/wrap.
      if (length > Integer.MAX_VALUE)
      {
        throw new EventCreationException("BLOB value for field " + fieldName + " is too large (" +
                                         length + " bytes) to read into a byte array");
      }
      byte[] bytes = blob.getBytes(1, (int) length);
      return ByteBuffer.wrap(bytes);
    }
    catch(SQLException ex)
    {
      throw new EventCreationException("SQLException reading BLOB value for field " + fieldName, ex);
    }
  }

  /**
   * Reads the CLOB passed and returns the value as a String.
   *
   * <p>CLOBs whose length fits in an int are read with {@code getSubString}; longer ones are
   * streamed through the CLOB's character stream.
   *
   * @return the CLOB text, or null if {@code clob} is null
   * @throws EventCreationException if the CLOB cannot be read
   */
  public static String extractClobText(Clob clob, String fieldName)
      throws EventCreationException
  {
    if (clob == null)
    {
      return null;
    }
    try
    {
      long length = clob.length();
      if (length <= Integer.MAX_VALUE)
      {
        return clob.getSubString(1, (int) length);
      }

      Reader reader = null;
      try
      {
        reader = clob.getCharacterStream();
        StringWriter writer = new StringWriter();
        char[] buffer = new char[1024];
        int n;
        while ((n = reader.read(buffer)) != -1)
        {
          writer.write(buffer, 0, n);
        }
        return writer.toString();
      }
      catch(IOException ex)
      {
        throw new SQLException("IOException reading from CLOB column.", ex);
      }
      finally
      {
        if (reader != null)
        {
          try
          {
            reader.close();
          }
          catch(IOException ignored)
          {
            // Best-effort close: the text (or the original failure) is already determined.
          }
        }
      }
    }
    catch(SQLException ex)
    {
      throw new EventCreationException("SQLException reading CLOB value for field " + fieldName, ex);
    }
  }
}