Package com.linkedin.databus.bootstrap.utils

Source Code of com.linkedin.databus.bootstrap.utils.BootstrapSrcDBEventReader$PrimaryKeyTxn

package com.linkedin.databus.bootstrap.utils;
/*
*
* Copyright 2013 LinkedIn Corp. All rights reserved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/


import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import javax.sql.DataSource;

import org.apache.log4j.Logger;

import com.linkedin.databus.core.DbusEventKey;
import com.linkedin.databus.core.DbusEventKey.KeyType;
import com.linkedin.databus.core.UnsupportedKeyException;
import com.linkedin.databus.core.util.ConfigBuilder;
import com.linkedin.databus.core.util.InvalidConfigException;
import com.linkedin.databus.core.util.RateMonitor;
import com.linkedin.databus.core.util.StringUtils;
import com.linkedin.databus2.core.DatabusException;
import com.linkedin.databus2.producers.EventCreationException;
import com.linkedin.databus2.producers.db.EventReaderSummary;
import com.linkedin.databus2.producers.db.OracleTriggerMonitoredSourceInfo;
import com.linkedin.databus2.producers.db.ReadEventCycleSummary;
import com.linkedin.databus2.producers.db.SourceDBEventReader;
import com.linkedin.databus2.util.DBHelper;

public class BootstrapSrcDBEventReader
      extends DbusSeederBaseThread
      implements SourceDBEventReader
{
  private static final Logger LOG = Logger.getLogger(BootstrapSrcDBEventReader.class);
  private static final long MILLISEC_TO_MIN = (1000 * 60);

  private final List<OracleTriggerMonitoredSourceInfo> _sources;
  private final DataSource _dataSource;
  private final BootstrapEventBuffer _bootstrapSeedWriter;
  private final int _numRetries;
  private final int _numRowsPrefetch;
  private final int _LOBPrefetchSize;
  private final int _commitInterval;
  private final long _sinceSCN;
  private final int _numRowsPerQuery;
  private final boolean _enableNumRowsQuery;
  private final Map<String, Long> _lastRows;
  private final Map<String, String> _lastKeys;
  private final Map<String, File> _keyTxnFilesMap;
  private final Map<String, Integer> _keyTxnBufferSizeMap;
  private final Map<String, String> _pKeyNameMap;
  private final Map<String,DbusEventKey.KeyType> _pKeyTypeMap;
  private final Map<String, String> _pKeyIndexMap;
  private final Map<String, String> _queryHintMap;
  private final Map<String,String> _eventQueryMap;
  private final Map<String,String> _beginSrcKeyMap;
  private final Map<String,String> _endSrcKeyMap;
    private final Method _setLobPrefetchSizeMethod;
    private final Class<?> _oraclePreparedStatementClass;

  public Map<String, File> getKeyTxnFilesMap() {
    return _keyTxnFilesMap;
  }

  public Map<String, Integer> getKeyTxnBufferSizeMap() {
    return _keyTxnBufferSizeMap;
  }

  public Map<String, String> getpKeyNameMap() {
    return _pKeyNameMap;
  }

  public Map<String, DbusEventKey.KeyType> getpKeyTypeMap() {
    return _pKeyTypeMap;
  }

  public BootstrapSrcDBEventReader(DataSource dataSource,
                   BootstrapEventBuffer eventBuffer,
                               StaticConfig   config,
                               List<OracleTriggerMonitoredSourceInfo> sources,
                               Map<String, Long> lastRows,
                               Map<String, String> lastKeys,
                               long           sinceSCN
                               )
    throws Exception
  {
      super("BootstrapSrcDBEventReader");
      List<OracleTriggerMonitoredSourceInfo> sourcesTemp = new ArrayList<OracleTriggerMonitoredSourceInfo>();
      sourcesTemp.addAll(sources);
      _sources = Collections.unmodifiableList(sourcesTemp);
    _dataSource = dataSource;
    _bootstrapSeedWriter = eventBuffer;
    _keyTxnFilesMap = new HashMap<String, File>();
    _numRowsPerQuery = config.getNumRowsPerQuery();
    _keyTxnBufferSizeMap = config.getKeyTxnBufferSizeMap();

    Map<String,String> keyTxnFiles = config.getKeyTxnFilesMap();

    Iterator<Entry<String, String>> itr = keyTxnFiles.entrySet().iterator();

    while (itr.hasNext())
    {
      Entry<String,String> entry = itr.next();
      LOG.info("Adding KeyTxnMapFile Entry :" + entry);
      _keyTxnFilesMap.put(entry.getKey(),new File(entry.getValue()));
    }

    _enableNumRowsQuery = config.isEnableNumRowsQuery();
    _commitInterval = config.getCommitInterval();
    _numRowsPrefetch = config.getNumRowsPrefetch();
    _LOBPrefetchSize = config.getLOBPrefetchSize();
    _numRetries      = config.getNumRetries();
    _sinceSCN = sinceSCN;
    _lastRows = new HashMap<String,Long>(lastRows);
    _lastKeys = new HashMap<String,String>(lastKeys);
    _pKeyNameMap = config.getPKeyNameMap();
    _pKeyTypeMap = config.getPKeyTypeMap();
    _pKeyIndexMap = config.getPKeyIndexMap();
    _queryHintMap = config.getQueryHintMap();
    _eventQueryMap = config.getEventQueryMap();
    _beginSrcKeyMap = config.getBeginSrcKeyMap();
    _endSrcKeyMap = config.getEndSrcKeyMap();

        File file = new File("ojdbc6-11.2.0.2.0.jar");
    URL ojdbcJarFile = file.toURI().toURL();
    URLClassLoader cl = URLClassLoader.newInstance(new URL[]{ojdbcJarFile});
    _oraclePreparedStatementClass = cl.loadClass("oracle.jdbc.OraclePreparedStatement");
    _setLobPrefetchSizeMethod = _oraclePreparedStatementClass.getMethod("setLobPrefetchSize", int.class);

    validate();
  }


  public void validate()
    throws Exception
  {

    if ( null == _pKeyTypeMap)
      throw new Exception("_pKeyTypeMap cannot be null !!");

    if ( null == _pKeyNameMap)
      throw new Exception("_pKeyNameMap cannot be null !!");

    boolean isNullIndex = (null == _pKeyIndexMap);
    boolean isNullQueryHint = ( null == _queryHintMap);

    if ( isNullIndex && isNullQueryHint)
      throw new Exception("Index and Query Hint both cannot be null !!");

    for (OracleTriggerMonitoredSourceInfo s : _sources)
    {
      if ( null == _pKeyTypeMap.get(s.getEventView()))
          throw new Exception("pKey Type for Source (" + s.getEventView() + ") not provided !!");

      if ( null == _pKeyNameMap.get(s.getEventView()))
        throw new Exception("pKey Name for Source (" + s.getEventView() + ") not provided !!");

      if ( (isNullIndex || (null == _pKeyIndexMap.get(s.getEventView())))
          && (isNullQueryHint || ( null == _queryHintMap.get(s.getEventView()))))
      {
        throw new Exception("Both pkey Index and Query Hint for source (" + s.getEventView() + ") not provided !!");
      }
    }
  }

  @Override
  public void run()
  {
    try
    {
      readEventsFromAllSources(_sinceSCN);
    } catch (Exception ex) {
     LOG.error("Got Error when executing readEventsFromAllSources !!",ex);
    }
    LOG.info(Thread.currentThread().getName() + " done seeding ||");
  }

  @Override
  public ReadEventCycleSummary readEventsFromAllSources(long sinceSCN)
      throws DatabusException, EventCreationException,
      UnsupportedKeyException
  {
      List<EventReaderSummary> summaries = new ArrayList<EventReaderSummary>();
      long maxScn = EventReaderSummary.NO_EVENTS_SCN;
      long endScn = maxScn;
      boolean error = false;

      long startTS = System.currentTimeMillis();
      try
      {
          _rate.start();
          _rate.suspend();


          Connection conn = null;
          try
          {
            conn = _dataSource.getConnection();
            LOG.info("Oracle JDBC Version :" + conn.getMetaData().getDriverVersion());
          } finally {
            DBHelper.close(conn);
          }

        if ( ! _sources.isEmpty())
        {
          // Script assumes seeding is done for one schema at a time
          // just use one source to get the schema name for sy$txlog
          maxScn = getMaxScn(_sources.get(0));
        }

        for ( OracleTriggerMonitoredSourceInfo sourceInfo : _sources)
        {
          LOG.info("Bootstrapping " + sourceInfo.getEventView());
          _bootstrapSeedWriter.start(maxScn);
          EventReaderSummary summary = readEventsForSource(sourceInfo, maxScn);
          // Script assumes seeding is done for one schema at a time
          // just use one source to get the schema name for sy$txlog
          endScn = getMaxScn(_sources.get(0));
          _bootstrapSeedWriter.endEvents(BootstrapEventBuffer.END_OF_SOURCE, endScn, null);
          summaries.add(summary);
        }
      } catch (Exception ex) {
        error = true;
        throw new DatabusException(ex);
      } finally {
        // Notify writer that I am done
        if ( error )
        {
          _bootstrapSeedWriter.endEvents(BootstrapEventBuffer.ERROR_CODE, endScn,null);
          LOG.error("Seeder stopping unexpectedly !!");
        } else {
          _bootstrapSeedWriter.endEvents(BootstrapEventBuffer.END_OF_FILE, endScn,null);
          LOG.info("Completed Seeding !!");
        }
        LOG.info("Start SCN :" + maxScn);
        LOG.info("End SCN :" + endScn);
      }
        long endTS = System.currentTimeMillis();

      ReadEventCycleSummary cycleSummary = new ReadEventCycleSummary("seeder",
                                                                     summaries, maxScn,
                                                                     (endTS - startTS));
    return cycleSummary;
  }

  public long getNumRows(Connection conn, String table)
    throws SQLException
  {
    String sql = generateCountQuery(table);
      PreparedStatement pstmt = null;
      ResultSet rs = null;
      long numRows = 0;
    try
    {
      conn  = _dataSource.getConnection();
      LOG.info("NumRows Query :" + sql);
      pstmt = conn.prepareStatement(sql);
      rs = pstmt.executeQuery();
      rs.next();
      numRows = rs.getLong(1);
    } finally {
      DBHelper.close(rs,pstmt,conn);
    }
    return numRows;
  }

  private EventReaderSummary readEventsForSource(OracleTriggerMonitoredSourceInfo sourceInfo, long maxScn)
  throws DatabusException, EventCreationException,
      UnsupportedKeyException,SQLException, IOException
  {
    int retryMax = _numRetries;
    int numRetry = 0;
      Connection conn = null;
      PreparedStatement pstmt = null;
      ResultSet rs = null;
    KeyType keyType = _pKeyTypeMap.get(sourceInfo.getEventView());
    String keyName = _pKeyNameMap.get(sourceInfo.getEventView());
    String sql = _eventQueryMap.get(sourceInfo.getEventView());
    String endSrcKey = _endSrcKeyMap.get(sourceInfo.getEventView());

    if (sql == null)
    {
      sql = generateEventQuery2(sourceInfo,keyName, keyType, getPKIndex(sourceInfo), getQueryHint(sourceInfo));
    }
    LOG.info("Chunked  Query for Source (" + sourceInfo + ") is :" + sql);
    LOG.info("EndSrcKey for source (" + sourceInfo +") is :" + endSrcKey);

    PrimaryKeyTxn endKeyTxn = null;
    if ((null != endSrcKey) && (! endSrcKey.trim().isEmpty()) )
    {
      if ( KeyType.LONG == keyType )
        endKeyTxn = new PrimaryKeyTxn(new Long(endSrcKey));
      else
        endKeyTxn = new PrimaryKeyTxn(endSrcKey);
    }



    long timestamp = System.currentTimeMillis();
    int numRowsFetched = 0;
    long totalEventSize = 0;
    long timeStart = System.currentTimeMillis();
    long checkpointInterval = _commitInterval;
    boolean done = false;
    long lastTime = timeStart;
    long numRows = 0;
    PrimaryKeyTxn pKey = null;

    String minKeySQL = generateMinKeyQuery(sourceInfo, keyName);

    String srcName = sourceInfo.getEventView();
    LOG.info("Bootstrapping for Source :" + srcName);
    String lastKey = _lastKeys.get(sourceInfo.getEventView());
    File f = _keyTxnFilesMap.get(srcName);
    FileWriter oStream = new FileWriter(f,f.exists());
    BufferedWriter keyTxnWriter = new BufferedWriter(oStream,_keyTxnBufferSizeMap.get(srcName));

    _bootstrapSeedWriter.startEvents();
    RateMonitor seedingRate = new RateMonitor("Seeding Rate");
    RateMonitor queryRate = new RateMonitor("Query Rate");
    seedingRate.start();
    seedingRate.suspend();
    queryRate.start();
    queryRate.suspend();
    boolean isException = false;
    long totProcessTime =0;
    try
    {
      conn  = _dataSource.getConnection();
      pstmt = conn.prepareStatement(sql);

      if ( _enableNumRowsQuery )
        numRows = getNumRows(conn, getTableName(sourceInfo));
      else
        numRows = -1;

      long currRowId = _lastRows.get(sourceInfo.getEventView());

      /**
       * First Key to be seeded will be decided in the following order:
       * 1. Use bootstrap_seeder_state's last srcKey as the key for the first chunk.
       * 2. If (1) is empty, use passed-in begin srcKey.
       * 3. If (2) is also empty, use Oracle's minKey as the first Chunk Key.
       */
      if ( null == lastKey )
      {
        lastKey = _beginSrcKeyMap.get(sourceInfo.getEventView());
        LOG.info("No last Src Key available in bootstrap_seeder_state for source ("  + sourceInfo + ". Trying beginSrc Key from config :" + lastKey);
      }

      if ( (null == lastKey) || (lastKey.trim().isEmpty()) )
      {
        if ( KeyType.LONG == keyType )
          pKey = new PrimaryKeyTxn(executeAndGetLong(minKeySQL));
        else
          pKey = new PrimaryKeyTxn(executeAndGetString(minKeySQL));
      } else {
        if ( KeyType.LONG == keyType )
          pKey = new PrimaryKeyTxn(Long.parseLong(lastKey));
        else
          pKey = new PrimaryKeyTxn(lastKey);
      }

      PrimaryKeyTxn lastRoundKeyTxn = new PrimaryKeyTxn(pKey);
      PrimaryKeyTxn lastKeyTxn = new PrimaryKeyTxn(pKey);
      long numUniqueKeysThisRound = 0;

      boolean first  = true;
      _rate.resume();
      while ( ! done )
      {
        LOG.info("MinKey being used for this round:" + pKey );
        numUniqueKeysThisRound = 0;
        try
        {
          lastRoundKeyTxn.copyFrom(pKey);

          if ( KeyType.LONG == keyType )
          {
            pstmt.setLong(1,pKey.getKey());
          } else {
            String key = pKey.getKeyStr();
            pstmt.setString(1, key);
          }

          pstmt.setLong(2, _numRowsPerQuery);
          pstmt.setFetchSize(_numRowsPrefetch);

          if ( _oraclePreparedStatementClass.isInstance(pstmt))
          {
            try
            {
              _setLobPrefetchSizeMethod.invoke(pstmt, _LOBPrefetchSize);
            } catch (Exception e)
            {
              throw new EventCreationException("Unable to set Lob Prefetch size" + e.getMessage());
            }
          }

          LOG.info("Executing Oracle Query :" + sql + ". Key: " + pKey + ",NumRows: " +  _numRowsPerQuery);
          queryRate.resume();
          rs = pstmt.executeQuery();
          queryRate.suspend();

          LOG.info("Total Query Latency :" + queryRate.getDuration()/1000000000L);
          long totLatency = 0;
          long txnId = 0;
          int numRowsThisRound = 0;
          seedingRate.resume();
          while (rs.next())
          {
            _rate.tick();
            seedingRate.tick();
            currRowId++;
            txnId = rs.getLong(2);
            if ( KeyType.LONG == keyType )
            {
              pKey.setKeyTxn(rs.getLong(1),txnId);
            } else {
              String key = rs.getString(1);
              pKey.setKeyStrTxn(key,txnId);
            }

            //Write TXN to file
            pKey.writeTo(keyTxnWriter);

            //LOG.info("TXNId is :" + txnId + ",RowId is :" + currRowId);
            long start = System.nanoTime();
            long eventSize = sourceInfo.getFactory().createAndAppendEvent(maxScn, timestamp, rs,
                _bootstrapSeedWriter, false, null);
            long latency = System.nanoTime() - start;
            totLatency += latency;
            totalEventSize += eventSize;
            totProcessTime += (totLatency/1000*1000);
            numRowsFetched++;
            numRowsThisRound++;

            if ( lastKeyTxn.compareKey(pKey) != 0)
            {
              numUniqueKeysThisRound++;
              lastKeyTxn.copyFrom(pKey);
            }

            if ( numRowsFetched%checkpointInterval == 0 )
            {
              // Commit this batch and reinit
              _bootstrapSeedWriter.endEvents(currRowId,timestamp,null);
              keyTxnWriter.flush();
              _bootstrapSeedWriter.startEvents();
              long procTime = totLatency/1000000000;
              long currTime = System.currentTimeMillis();
              long diff = (currTime - lastTime)/1000;
              long timeSinceStart = (currTime - timeStart)/1000;
              double currRate = _rate.getRate();
              currRate = (currRate <= 0) ? 1 : currRate;

              if ( _enableNumRowsQuery)
              {
                double remTime = (numRows - currRowId)/(currRate);

                LOG.info("Processed " + checkpointInterval + " rows in " + diff
                    + " seconds, Processing Time (seconds) so far :" + (procTime)
                    + ",Seconds elapsed since start :" + (timeSinceStart)
                    + ",Approx Seconds remaining :" + remTime
                    + ",Overall Row Rate:" + _rate.getRate() + "(" + seedingRate.getRate() + ")" +
                    ",NumRows Fetched so far:" + numRowsFetched +
                    ". TotalEventSize :" + totalEventSize);
              } else {
                LOG.info("Processed " + checkpointInterval + " rows in " + diff
                    + " seconds, Processing Time (seconds) so far :" + (procTime)
                    + ",Seconds elapsed since start :" + (timeSinceStart)
                    + ",Overall Row Rate:" + _rate.getRate() + "(" + seedingRate.getRate() + ")" +
                    ",NumRows Fetched so far:" + numRowsFetched +
                    ". TotalEventSize :" + totalEventSize);
              }
              lastTime = currTime;
            }

            if ( (null != endKeyTxn) &&  (endKeyTxn.compareKey(lastKeyTxn) < 0) )
            {
              LOG.info("Seeding to be stopped for current source as it has completed seeding upto endSrckey :" + endKeyTxn
                  + ", Current SrcKey :" + lastKeyTxn);
              break;
            }

          }
          seedingRate.suspend();

          if ( (numRowsThisRound <= 1) ||
              ((numRowsThisRound < _numRowsPerQuery) && (numUniqueKeysThisRound <= 1)))
          {
            LOG.info("Seeding Done for source :" + sourceInfo.getEventView() + ", numRowsThisRound :"
                         + numRowsThisRound + ", _numRowsPerQuery :" + _numRowsPerQuery
                         + ", numUniqueKeys :" + numUniqueKeysThisRound);
            done = true;
          } else if ((numRowsThisRound == _numRowsPerQuery) && (numUniqueKeysThisRound <= 1)) {
            String msg = "Seeding stuck at infinte loop for source : " + sourceInfo.getEventView() + ", numRowsThisRound :"
                         + numRowsThisRound + ", _numRowsPerQuery :" + _numRowsPerQuery
                         + ", numUniqueKeys :" + numUniqueKeysThisRound + ", lastChunkKey :" + lastRoundKeyTxn;
            LOG.error(msg);
            throw new DatabusException(msg);
          } else if ( null != endKeyTxn) {
            if ( endKeyTxn.compareKey(lastKeyTxn) < 0 ) {
              LOG.info("Seeding stopped for source :" + sourceInfo.getEventView()
                    + ", as it has completed seeding upto the endSrckey :" + endKeyTxn + ", numRowsThisRound :"
                           + numRowsThisRound + ", _numRowsPerQuery :" + _numRowsPerQuery
                           + ", numUniqueKeys :" + numUniqueKeysThisRound + " , Current SrcKey :" + lastKeyTxn);
              done = true;
            }
          }

          if (currRowId > 0 && (!first || done))
          {
            currRowId--; //Since next time, we will read the last seen record again
          }
          LOG.info("about to call end events with currRowId = " + currRowId);
          first = false;
          _bootstrapSeedWriter.endEvents(currRowId,timestamp,null);
          isException = false;
        } catch (SQLException ex) {
          LOG.error("Got SQLException for source (" + sourceInfo + ")", ex);

          _bootstrapSeedWriter.rollbackEvents();

          numRetry++;
          isException = true;

          if (numRetry >= retryMax)
          {
            throw new DatabusException("Error: Reached max retries for reading/processing bootstrap", ex);
          }
        } finally {
          DBHelper.close(rs);
          rs = null;
        }
      }
    }  catch (DatabusException ex) {

      isException = true;
      throw ex;

    finally {

      DBHelper.close(rs,pstmt,conn);
      keyTxnWriter.close();
      rs = null;
      _rate.suspend();

      if ( ! isException)
      {
        dedupeKeyTxnFile(_keyTxnFilesMap.get(srcName), keyType);
      }
    }
    long timeEnd = System.currentTimeMillis();
    long elapsedMin = (timeEnd - timeStart)/(MILLISEC_TO_MIN);
    LOG.info("Processed " + numRowsFetched + " rows of Source: " +  sourceInfo.getSourceName() + " in " + elapsedMin + " minutes" );
    return new EventReaderSummary(sourceInfo.getSourceId(), sourceInfo.getSourceName(), -1,
                                  numRowsFetched, totalEventSize, (timeEnd - timeStart),totProcessTime,0,0,0);
  }

  private long getMaxScn(OracleTriggerMonitoredSourceInfo sourceInfo)
    throws SQLException
  {
    String schema = ( sourceInfo.getEventSchema() == null) ? "" :
                                   sourceInfo.getEventSchema() + ".";
    String table = schema + "sy$txlog";
    /*
      StringBuilder sql = new StringBuilder();
      sql.append("select max(scn) from ");
      sql.append(table);
      String query = sql.toString();
      */
      String query = "select " +
          "max(" + schema + "sync_core.getScn(scn,ora_rowscn)) " +
          "from " + table + " where " +
          "scn >= (select max(scn) from " + table + ")";
      long maxScn = executeAndGetLong(query);
      return maxScn;
  }

  private long executeAndGetLong(String query)
     throws SQLException
  {
    long val = -1;
      Connection conn = null;
      PreparedStatement pstmt = null;
      ResultSet rs = null;
      try
      {
          conn = _dataSource.getConnection();
          pstmt = conn.prepareStatement(query);
          rs = pstmt.executeQuery();
          boolean ret = rs.next();
          assert(ret);
          val = rs.getLong(1);

          LOG.info("Query:" + query + ",Result is :" + val);

      } catch ( SQLException sqlEx) {
        LOG.error("Got error while executing query:" + query, sqlEx);
        throw sqlEx;
      } finally {
          DBHelper.close(rs, pstmt, conn);
      }
    return val;
  }

  private String executeAndGetString(String query)
     throws SQLException
  {
    String val = null;
      Connection conn = null;
      PreparedStatement pstmt = null;
      ResultSet rs = null;
      try
      {
          conn = _dataSource.getConnection();
          pstmt = conn.prepareStatement(query);
          rs = pstmt.executeQuery();
          boolean ret = rs.next();
          assert(ret);
          val = rs.getString(1);

          LOG.info("Query:" + query + ",Result is :" + val);

      } catch ( SQLException sqlEx) {
        LOG.error("Got error while executing query:" + query, sqlEx);
        throw sqlEx;
      } finally {
          DBHelper.close(rs, pstmt, conn);
      }
    return val;
  }

  public static String getTableName(OracleTriggerMonitoredSourceInfo sourceInfo)
  {
    String schema = ( sourceInfo.getEventSchema() == null) ? "" :
                    sourceInfo.getEventSchema() + ".";
    String table = schema + "sy$" + sourceInfo.getEventView();
    return table;
  }

  public String getPKIndex(OracleTriggerMonitoredSourceInfo sourceInfo)
  {
    String index = _pKeyIndexMap.get(sourceInfo.getEventView());

    if ( null == index)
    {
      index = sourceInfo.getEventView() "_pk";
    }
    return index;
  }

  public String getQueryHint(OracleTriggerMonitoredSourceInfo sourceInfo)
  {
     String index = _queryHintMap.get(sourceInfo.getEventView());

     if ( null == index)
     {
       return null;
     }

     return index;
  }
  /*
  private String generateEventQuery(MonitoredSourceInfo sourceInfo)
  {
    String table = getTableName(sourceInfo);
      StringBuilder sql = new StringBuilder();
      sql.append("select txn, src.* from ");
      sql.append(table);
      sql.append(" src");
      sql.append(" where txn > ? and txn <= ?");
    return sql.toString();
  }
  */

  public static String generateEventQuery2(OracleTriggerMonitoredSourceInfo sourceInfo, String keyName, KeyType keyType, String pkIndex, String queryHint)
  {
    return generateEventQuery2(getTableName(sourceInfo),keyName, keyType, pkIndex, queryHint);
  }

  public static String generateEventQuery2(String table, String keyName, KeyType keyType, String pkIndex, String queryHint)
  {
      StringBuilder sql = new StringBuilder();

      sql.append("select * from (");
      if ( (null == queryHint) || ( queryHint.isEmpty()))
        sql.append("select /*+ INDEX(src ").append(pkIndex).append(") */ ");
      else
         sql.append("select /*+ " + queryHint + " */ ");

      sql.append(keyName).append( " keyn,");
      sql.append(" txn txnid, src.*, ROW_NUMBER() OVER(order by src.").append(keyName).append(" asc) as row_counter from ");
      sql.append(table);
      sql.append(" src");
    sql.append(" where src." + keyName + " >= ?");
      sql.append(" ) where row_counter <= ?");
    return sql.toString();
  }

  /**
   * Same as previous query; but doesn't include keyName in query result; all keys greater than keyName
   * @param table
   * @param keyName : databus schema's key - e.g. 'key' or 'id' or 'memberId'
   * @param keyType : TODO: remove - unused here
   * @param pkIndex : used to develop hint used in oracle query
   * @param queryHint: other hints
   * @return
   */
   public static String generateEventQueryAudit(String table, String keyName, KeyType keyType, String pkIndex, String queryHint)
    {
        StringBuilder sql = new StringBuilder();

        sql.append("select * from (");
        if ( (null == queryHint) || ( queryHint.isEmpty()))
          sql.append("select /*+ INDEX(src ").append(pkIndex).append(") */ ");
        else
           sql.append("select /*+ " + queryHint + " */ ");

        sql.append(keyName).append( " keyn,");
        sql.append(" txn txnid, src.*, ROW_NUMBER() OVER(order by src.").append(keyName).append(" asc) as row_counter from ");
        sql.append(table);
        sql.append(" src");
      sql.append(" where src." + keyName + " > ?");
        sql.append(" ) where row_counter <= ?");
      return sql.toString();
    }


  public static String generateMinKeyQuery(OracleTriggerMonitoredSourceInfo sourceInfo, String keyName)
  {
    return generateMinKeyQuery(getTableName(sourceInfo), keyName );
  }

  public static String generateMinKeyQuery(String table, String keyName)
  {
      StringBuilder sql = new StringBuilder();
      sql.append("select min(" + keyName + ") ");
      sql.append("from " + table);
    return sql.toString();
  }

  private static String generateCountQuery(String table)
  {
      StringBuilder sql = new StringBuilder();
      sql.append("select count(*) from ");
      sql.append(table);
      return sql.toString();
  }

  private void dedupe(String file, String tmpFile, KeyType type)
    throws Exception
  {
    BufferedReader reader = null;
    BufferedWriter writer = null;
    PrimaryKeyTxn keyTxn = null;
    PrimaryKeyTxn oldKeyTxn = null;
    try
    {
      reader = new BufferedReader(StringUtils.createFileReader(file));
      writer = new BufferedWriter(StringUtils.createFileWriter(tmpFile));

      keyTxn = new PrimaryKeyTxn(Long.MIN_VALUE);
      oldKeyTxn = new PrimaryKeyTxn(Long.MIN_VALUE);
      keyTxn.setType(type);
      oldKeyTxn.setType(type);
      boolean first = true;
      while (true)
      {
        String line = reader.readLine();

        if (null == line)
          break;

        keyTxn.readFrom(line);

        long cmp = keyTxn.compareKey(oldKeyTxn);

        if ( cmp != 0)
        {
          if ( ! first )
          {
            oldKeyTxn.writeTo(writer);
          }
          oldKeyTxn.copyFrom(keyTxn);
        }
        first = false;
      }
      oldKeyTxn.writeTo(writer);
    } catch ( Exception ex ) {
      LOG.error("Got exception while deduping key_txns", ex);
    } finally {
      if ( null != reader)
        reader.close();

      if ( null != writer)
        writer.close();
    }
  }

  private void dedupeKeyTxnFile(File keyTxnFile, KeyType keyType)
  {
    boolean numericKey = (keyType == KeyType.LONG);
    StringBuilder cmd = new StringBuilder();
    String tmpFile = keyTxnFile.getAbsolutePath() + ".tmp";

    String backupFile = keyTxnFile.getAbsolutePath() + ".pre";

    LOG.info("Post Processing the KeyTXN File :" + keyTxnFile.toString());
    cmd.append("sort -t ");
    cmd.append(PrimaryKeyTxn.DELIMITER);
    cmd.append(" -k 1");
    if ( numericKey ) cmd.append("n");
    cmd.append(" -k 2nr ");
    cmd.append(keyTxnFile.getAbsolutePath());
    cmd.append(" -o ").append(tmpFile);

    String cmd1 = "cp " + keyTxnFile.getAbsolutePath() + " " + backupFile;
    String cmd2 = cmd.toString();
    String cmd3 = "rm " + tmpFile + " " ;
    try
    {
      Runtime rt = Runtime.getRuntime();

      LOG.info("Executing command :" + cmd1);
      Process pr1 = rt.exec(cmd1);

      int res = pr1.waitFor();

      if ( res != 0)
      {
              LOG.error("**********************");
              LOG.error("Error Executing CMD (" + cmd1 + "), Error: (STDOUT=" + getStream(pr1.getInputStream()) + ") (STDERR=" +  getStream(pr1.getErrorStream()) + "). Result was :" + res);
              LOG.error("**********************");
      }

      LOG.info("Executing command :" + cmd2);
      Process pr2 = rt.exec(cmd2);
      res = pr2.waitFor();
      if ( res != 0)
      {
          LOG.error("**********************");
        LOG.error("Error Executing CMD (" + cmd2 + "), Error: (STDOUT=" + getStream(pr2.getInputStream()) + ") (STDERR=" +  getStream(pr2.getErrorStream()) + "). Result was :" + res);
                LOG.error("**********************");
      }

      LOG.info("Removing duplicate Entries from the sorted list");
      dedupe(tmpFile,keyTxnFile.getAbsolutePath(), keyType);

      LOG.info("Executing command :" + cmd3);
      Process pr3 = rt.exec(cmd3);
      res = pr3.waitFor();
      if ( res != 0)
      {
              LOG.error("**********************");
              LOG.error("Error Executing CMD (" + cmd3 + "), Error: (STDOUT=" + getStream(pr3.getInputStream()) + ") (STDERR=" +  getStream(pr3.getErrorStream()) + "). Result was :" + res);
              LOG.error("**********************");
      }

      LOG.info("Post Processing the KeyTXN File done successfully :" + keyTxnFile.toString());

    } catch ( Exception io) {
      LOG.error("Postprocessing the KeyTXNFile :" + keyTxnFile + " failed !!", io);
      LOG.info("CMD1 :" + cmd1);
      LOG.info("CMD2 :" + cmd2);
      LOG.info("CMD3 :" + cmd3);
    }
  }

  private String getStream(InputStream stream)
    throws IOException
  {
    BufferedReader reader =
        new BufferedReader(new InputStreamReader(stream, StringUtils.DEFAULT_CHARSET));
    StringBuilder str = new StringBuilder();

    while (true)
    {
      String s = reader.readLine();

      if ( null == s)
        break;

      str.append(s);
    }
    return str.toString();
  }

  public static class StaticConfig
  {

    public int getNumRowsPrefetch() {
      return _numRowsPrefetch;
    }

    public Map<String, String> getEventQueryMap() {
      // TODO Auto-generated method stub
      return _eventQueryMap;
    }

    public int getLOBPrefetchSize() {
      return _LOBPrefetchSize;
    }

    public int getCommitInterval() {
      return _commitInterval;
    }

    public int getNumRetries() {
      return _numRetries;
    }

    public Map<String, String> getKeyTxnFilesMap()
    {
      return _keyTxnFilesMap;
    }

    public Map<String, Integer> getKeyTxnBufferSizeMap()
    {
      return _keyTxnBufferSizeMap;
    }

    public Map<String, String> getPKeyNameMap()
    {
      return _pKeyNameMap;
    }

        public Map<String, String> getQueryHintMap()
        {
            return _queryHintMap;
        }


    public Map<String, DbusEventKey.KeyType> getPKeyTypeMap()
    {
      return _pKeyTypeMap;
    }

    public Map<String, String> getPKeyIndexMap()
    {
      return _pKeyIndexMap;
    }

    public int getNumRowsPerQuery()
    {
      return _numRowsPerQuery;
    }

    public boolean isEnableNumRowsQuery() {
      return _enableNumRowsQuery;
    }

        public Map<String, String> getBeginSrcKeyMap()
        {
            return _beginSrcKeyMap;
        }

        public Map<String, String> getEndSrcKeyMap()
        {
            return _endSrcKeyMap;
        }

    public StaticConfig(boolean enableNumRowsQuery,
        int numRowsPrefetch, int LOBPrefetchSize,
        int commitInterval, int numRetries,
        int numRowsPerQuery,
        Map<String, String> keyTxnFilesMap,
        Map<String, Integer> keyTxnBufferSizeMap,
        Map<String, String> pKeyNameMap,
        Map<String, DbusEventKey.KeyType> pKeyTypeMap,
        Map<String, String> pKeyIndexMap,
        Map<String, String> queryHintMap,
        Map<String, String> eventQueryMap,
        Map<String, String> beginSrcKeyMap,
        Map<String, String> endSrcKeyMap)
    {
      super();
      this._enableNumRowsQuery = enableNumRowsQuery;
      this._numRowsPrefetch = numRowsPrefetch;
      this._LOBPrefetchSize = LOBPrefetchSize;
      this._commitInterval = commitInterval;
      this._numRetries = numRetries;
      this._numRowsPerQuery = numRowsPerQuery;
      this._keyTxnFilesMap = keyTxnFilesMap;
      this._keyTxnBufferSizeMap = keyTxnBufferSizeMap;
      this._pKeyNameMap = pKeyNameMap;
      this._pKeyTypeMap = pKeyTypeMap;
      this._pKeyIndexMap = pKeyIndexMap;
      this._queryHintMap = queryHintMap;
      this._eventQueryMap = eventQueryMap;
      this._beginSrcKeyMap = beginSrcKeyMap;
      this._endSrcKeyMap = endSrcKeyMap;

    }

    private final boolean _enableNumRowsQuery;
    private final int _numRowsPrefetch;
    private final int _LOBPrefetchSize;
    private final int _commitInterval;
    private final int _numRetries;
    private final int _numRowsPerQuery;
    private final Map<String, String> _keyTxnFilesMap;
    private final Map<String, Integer> _keyTxnBufferSizeMap;
    private final Map<String, String> _pKeyNameMap;
    private final Map<String, DbusEventKey.KeyType> _pKeyTypeMap;
    private final Map<String, String> _pKeyIndexMap;
    private final Map<String, String> _queryHintMap;
    private final Map<String, String> _eventQueryMap;
    private final Map<String, String> _beginSrcKeyMap;
    private final Map<String, String> _endSrcKeyMap;
  }

  public static class Config implements ConfigBuilder<StaticConfig>
  {
    private static final boolean DEFAULT_ENABLE_NUM_ROWS_QUERY = false;
    private static final int DEFAULT_NUM_ROWS_PREFETCH = 10;
    private static final int DEFAULT_LOB_PREFETCH_SIZE = 4000;
    private static final int DEFAULT_COMMIT_INTERVAL = 10000;
    private static final int DEFAULT_NUM_ROWS_PER_QUERY = 100000;
    private static final int DEFAULT_NUM_RETRIES = 2;
    private static final String DEFAULT_KEYTXN_MAP_FILES = "keyTxnMapFile.txt" ;
    private static final Integer DEFAULT_BUFFER_SIZE  = 0;
    private static final String DEFAULT_PKEY_NAME = "key";
    private static final String DEFAULT_PKEY_TYPE = "LONG";
      private static final String DEFAULT_QUERY_HINT = "";
      private static final String DEFAULT_BEGINSRC_KEY = "";
      private static final String DEFAULT_ENDSRC_KEY = "";


    public Config()
    {
      _enableNumRowsQuery = DEFAULT_ENABLE_NUM_ROWS_QUERY;
      _numRowsPrefetch = DEFAULT_NUM_ROWS_PREFETCH;
      _LOBPrefetchSize = DEFAULT_LOB_PREFETCH_SIZE;
      _commitInterval = DEFAULT_COMMIT_INTERVAL ;
      _numRetries = DEFAULT_NUM_RETRIES;
      _numRowsPerQuery = DEFAULT_NUM_ROWS_PER_QUERY;
      _keyTxnFilesMap = new HashMap<String, String>();
      _keyTxnBufferSizeMap = new HashMap<String, Integer>();
      _pKeyNameMap = new HashMap<String, String>();
      _pKeyTypeMap = new HashMap<String, String>();
      _pKeyIndexMap = new HashMap<String, String>();
      _queryHintMap = new HashMap<String, String>();
      _eventQueryMap = new HashMap<String, String>();
      _beginSrcKeyMap = new HashMap<String, String>();
      _endSrcKeyMap = new HashMap<String, String>();
    }

    @Override
    public StaticConfig build() throws InvalidConfigException
    {
      LOG.info("enableNumRowsQuery:" + _enableNumRowsQuery);
      LOG.info("NumRowsPrefetch:" + _numRowsPrefetch);
      LOG.info("_LOBPrefetchSize:" + _LOBPrefetchSize);
      LOG.info("Commit Interval:" + _commitInterval);
      LOG.info("Num Retries:" + _numRetries);
      LOG.info("_numRowsPerQuery:" + _numRowsPerQuery);
      LOG.info("_keyTxnFilesMap:" + _keyTxnFilesMap);
      LOG.info("_keyTxnBufferSizeMap:" + _keyTxnBufferSizeMap);
      LOG.info("_pKeyNameMap:" + _pKeyNameMap);
      LOG.info("_pKeyTypeMap:" + _pKeyTypeMap);
      LOG.info("_pKeyIndexMap:" + _pKeyIndexMap);
      LOG.info("_queryHintMap:" + _queryHintMap);
      LOG.info("_beginSrcKeyMap:" + _beginSrcKeyMap);
      LOG.info("_endSrcKeyMap:" + _endSrcKeyMap);


      HashMap<String, DbusEventKey.KeyType> pKeyTypeMap = new HashMap<String, DbusEventKey.KeyType>();
      Iterator<Entry<String, String>> itr = _pKeyTypeMap.entrySet().iterator();
      while ( itr.hasNext())
      {
        Entry<String, String> entry = itr.next();
        DbusEventKey.KeyType kType = DbusEventKey.KeyType.valueOf(entry.getValue());
        pKeyTypeMap.put(entry.getKey(), kType);
      }

      return new StaticConfig(_enableNumRowsQuery, _numRowsPrefetch,_LOBPrefetchSize,
                  _commitInterval,_numRetries, _numRowsPerQuery,
                  _keyTxnFilesMap, _keyTxnBufferSizeMap,
                  _pKeyNameMap, pKeyTypeMap, _pKeyIndexMap, _queryHintMap,_eventQueryMap, _beginSrcKeyMap, _endSrcKeyMap);
    }

    public int getNumRowsPrefetch() {
      return _numRowsPrefetch;
    }

    public void setNumRowsPrefetch(int numRowsPrefetch) {
      this._numRowsPrefetch = numRowsPrefetch;
    }

    public int getLOBPrefetchSize() {
      return _LOBPrefetchSize;
    }

    public void setLOBPrefetchSize(int lOBPrefetchSize) {
      _LOBPrefetchSize = lOBPrefetchSize;
    }

    public int getCommitInterval() {
      return _commitInterval;
    }

    public void setCommitInterval(int commitInterval) {
      this._commitInterval = commitInterval;
    }

    public int getNumRetries() {
      return _numRetries;
    }

    public void setNumRetries(int numRetries) {
      this._numRetries = numRetries;
    }

    public String getKeyTxnMapFile(String sourceName)
    {
      String file = _keyTxnFilesMap.get(sourceName);

      if ( null == file)
      {
        _keyTxnFilesMap.put(sourceName, DEFAULT_KEYTXN_MAP_FILES);
        return DEFAULT_KEYTXN_MAP_FILES;
      }
      return file;
    }

    public void setKeyTxnMapFile(String sourceName, String file)
    {
      _keyTxnFilesMap.put(sourceName, file);
    }

    public  String getPKeyName(String srcName)
    {
      String key = _pKeyNameMap.get(srcName);
      if ( null == key)
      {
        _pKeyNameMap.put(srcName, DEFAULT_PKEY_NAME);
        return DEFAULT_PKEY_NAME;
      }
      return key;
    }

    public void setBeginSrcKey(String srcName, String key)
    {
      _beginSrcKeyMap.put(srcName, key);
    }

    public String getBeginSrcKey(String srcName)
    {
          String key = _beginSrcKeyMap.get(srcName);

          if ( null == key)
          {
          key = DEFAULT_BEGINSRC_KEY;
          _beginSrcKeyMap.put(srcName, key);
          }

          return key;
    }

    public void setEndSrcKey(String srcName, String key)
    {
      _endSrcKeyMap.put(srcName, key);
    }

    public String getEndSrcKey(String srcName)
    {
          String key = _endSrcKeyMap.get(srcName);

          if ( null == key)
          {
          key = DEFAULT_ENDSRC_KEY;
          _endSrcKeyMap.put(srcName, key);
          }

          return key;
    }

    public void setQueryHint(String srcName, String key)
    {
      _queryHintMap.put(srcName, key);
    }

      public  String getQueryHint(String srcName)
      {
        String key = _queryHintMap.get(srcName);

        if ( null == key)
        {

          _queryHintMap.put(srcName, DEFAULT_QUERY_HINT);

          return DEFAULT_QUERY_HINT;
        }

        return key;
      }

      public void setPKeyName(String srcName, String key)
      {
         _pKeyNameMap.put(srcName, key);
      }

    public String getPKeyType(String srcName)
    {
      String type = _pKeyTypeMap.get(srcName);
      if ( null == type)
      {
        _pKeyTypeMap.put(srcName, DEFAULT_PKEY_NAME);
        return DEFAULT_PKEY_TYPE;
      }
      return type;
    }

    public void setPKeyType(String srcName, String type)
    {
      _pKeyTypeMap.put(srcName, type);
    }

    public String getPKeyIndex(String srcName)
    {
      String index = _pKeyIndexMap.get(srcName);
      if ( null == index)
      {
        String index2 =  srcName + "_pk";
        _pKeyIndexMap.put(srcName, index2);
        return index2;
      }
      return index;
    }

    public void setPKeyIndex(String srcName, String index)
    {
      _pKeyIndexMap.put(srcName, index);
    }

    public Integer getKeyTxnFileBufferSize(String sourceName)
    {
      Integer size = _keyTxnBufferSizeMap.get(sourceName);

      if ( null == size)
      {
        _keyTxnBufferSizeMap.put(sourceName, DEFAULT_BUFFER_SIZE);
        return DEFAULT_BUFFER_SIZE;
      }
      return size;
    }

    public void setKeyTxnFileBufferSize(String sourceName, Integer size)
    {
      _keyTxnBufferSizeMap.put(sourceName, size);
    }

    public int getNumRowsPerQuery() {
      return _numRowsPerQuery;
    }

    public void setNumRowsPerQuery(int numRowsPerQuery) {
      this._numRowsPerQuery = numRowsPerQuery;
    }


    public boolean isEnableNumRowsQuery() {
      return _enableNumRowsQuery;
    }

    public void setEnableNumRowsQuery(boolean enableNumRowsQuery) {
      this._enableNumRowsQuery = enableNumRowsQuery;
    }

    public void setEventQuery(String name,String value)
    {
      _eventQueryMap.put(name, value);
    }

    public String getEventQuery(String src)
    {
      return _eventQueryMap.get(src);
    }


    private  boolean _enableNumRowsQuery;
    private  int _numRowsPrefetch;
    private  int _LOBPrefetchSize;
    private  int _commitInterval;
    private  int _numRetries;
    private  int _numRowsPerQuery;
    private final Map<String, String> _keyTxnFilesMap;
    private final Map<String, Integer> _keyTxnBufferSizeMap;
    private final Map<String, String> _pKeyNameMap;
    private final Map<String, String> _pKeyTypeMap;
    private final Map<String, String> _pKeyIndexMap;
        private final Map<String, String> _queryHintMap;
        private final Map<String, String> _eventQueryMap;
        private final Map<String, String> _beginSrcKeyMap;
        private final Map<String, String> _endSrcKeyMap;

  }

  public static class PrimaryKeyTxn
  {
    private long key;
    private String keyStr;
    private KeyType keyType;
    private long txn;

    public static final String DELIMITER = "@";


    @Override
    public String toString() {
      return "PrimaryKeyTxn [key=" + key + ", keyStr=" + keyStr
          + ", keyType=" + keyType + ", txn=" + txn + "]";
    }

    public long getKey() {
      return key;
    }

    public long getTxn() {
      return txn;
    }

    public void setType(KeyType t)
    {
      keyType = t;
    }

    public void setKeyTxn(long key, long txn) {
      this.keyType = KeyType.LONG;
      this.txn = txn;
      this.key = key;
    }

    public String getKeyStr() {
      return keyStr;
    }

    public void setKeyStrTxn(String keyStr, long txn) {
      this.keyType = KeyType.STRING;
      this.txn = txn;
      this.keyStr = keyStr;
    }

    public KeyType getKeyType() {
      return keyType;
    }

    public PrimaryKeyTxn(PrimaryKeyTxn keyTxn)
    {
      super();
      copyFrom(keyTxn);
    }

    public PrimaryKeyTxn(long key) {
      super();
      this.key = key;
      this.keyStr = null;
      this.keyType = KeyType.LONG;
      this.txn = -1;
    }

    public PrimaryKeyTxn(String key)
    {
      super();
      this.keyStr = key;
      this.keyType = KeyType.STRING;
      this.key = -1;
      this.txn = -1;
    }

    public void writeTo(BufferedWriter writer)
      throws IOException
    {
      StringBuffer buffer = new StringBuffer();
      if ( KeyType.LONG == keyType)
      {
        buffer.append(key).append(DELIMITER);
      } else {
        buffer.append(keyStr).append(DELIMITER);
      }
      buffer.append(txn).append("\n");
      writer.write(buffer.toString());
    }

    public void readFrom(String line)
      throws IOException
    {
      String[] toks = line.split(DELIMITER);

      if ( KeyType.LONG == keyType)
      {
        key = Long.parseLong(toks[0]);
      } else {
        keyStr = toks[0];
      }
      txn = Long.parseLong(toks[1]);
    }

    public void readFrom(ResultSet rs, String keyName, KeyType keyType)
      throws SQLException
    {
      try
      {
        this.keyType = keyType;
        if (KeyType.LONG == keyType)
        {
          this.key = rs.getLong(keyName);
        } else {
          this.keyStr = rs.getString(keyName);
        }
      } finally {
      }
    }

    public long compareKey(PrimaryKeyTxn key2)
    {
      if ( this.keyType == KeyType.LONG)
      {
        return this.key - key2.getKey();
      } else {
        return this.keyStr.compareTo(key2.getKeyStr());
      }
    }

    public void copyFrom(PrimaryKeyTxn entry)
    {
      keyType = entry.getKeyType();
      key = entry.getKey();
      keyStr = entry.getKeyStr();
      txn = entry.getTxn();
    }

    public long compareTxn(PrimaryKeyTxn txn)
    {
      return this.txn - txn.getTxn();
    }

  }

  @Override
  public List<OracleTriggerMonitoredSourceInfo> getSources() {
    return _sources;
  }

}
TOP

Related Classes of com.linkedin.databus.bootstrap.utils.BootstrapSrcDBEventReader$PrimaryKeyTxn

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.