Package ke.go.moh.oec.oecsm.sync.data

Source Code of ke.go.moh.oec.oecsm.sync.data.DataSynchronizer

/* ***** BEGIN LICENSE BLOCK *****
* Version: MPL 1.1
*
* The contents of this file are subject to the Mozilla Public License Version
* 1.1 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS" basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Original Code is OpenEMRConnect.
*
* The Initial Developer of the Original Code is International Training &
* Education Center for Health (I-TECH) <http://www.go2itech.org/>
*
* Portions created by the Initial Developer are Copyright (C) 2011
* the Initial Developer. All Rights Reserved.
*
* Contributor(s):
*
* ***** END LICENSE BLOCK ***** */
package ke.go.moh.oec.oecsm.sync.data;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import ke.go.moh.oec.lib.Mediator;
import ke.go.moh.oec.oecsm.bridge.DatabaseConnector;
import ke.go.moh.oec.oecsm.bridge.Transaction;
import ke.go.moh.oec.oecsm.bridge.TransactionConverter;
import ke.go.moh.oec.oecsm.data.Cell;
import ke.go.moh.oec.oecsm.data.Column;
import ke.go.moh.oec.oecsm.data.DataTransaction;
import ke.go.moh.oec.oecsm.data.Database;
import ke.go.moh.oec.oecsm.data.LoggableTransaction;
import ke.go.moh.oec.oecsm.data.LoggableTransactionDatum;
import ke.go.moh.oec.oecsm.data.Table;
import ke.go.moh.oec.oecsm.data.TransactionType;
import ke.go.moh.oec.oecsm.exceptions.DriverNotFoundException;
import ke.go.moh.oec.oecsm.exceptions.InaccessibleConfigurationFileException;
import ke.go.moh.oec.oecsm.sync.data.resultsets.ShadowResultSet;
import ke.go.moh.oec.oecsm.sync.data.resultsets.SourceResultSet;
import ke.go.moh.oec.oecsm.sync.schema.ShadowSchemaMiner;

/**
* @date Aug 19, 2010
*
* @author Gitahi Ng'ang'a
*/
public class DataSynchronizer extends DatabaseConnector {

    private boolean sourceRsHasRecords = false;
    private boolean shadowRsHasRecords = false;

    public void synchronize() throws InaccessibleConfigurationFileException, DriverNotFoundException, SQLException {
        SourceDataMiner sourceDataMiner = new SourceDataMiner();
        ShadowDataMiner shadowDataMiner = new ShadowDataMiner();
        sourceDataMiner.start();
        shadowDataMiner.start();
        Database shadowDb = new ShadowSchemaMiner().mine(true);

        for (Table table : shadowDb.getTableList()) {
            SourceResultSet sourceRs = sourceDataMiner.mine(table);
            ShadowResultSet shadowRs = shadowDataMiner.mine(table);

            sourceRsHasRecords = sourceRs.next();
            shadowRsHasRecords = shadowRs.next();

            String tempSourcePk = null;
            boolean sourceRsMovedNext = true;
            while (sourceRsHasRecords || shadowRsHasRecords) {
                if (sourceRsHasRecords && !shadowRsHasRecords) {
                    String sourcePk = sourceRs.getString("PK");
                    insert(table, sourcePk, sourceRs);
                } else if (!sourceRsHasRecords && shadowRsHasRecords) {
                    delete(table, shadowRs);
                } else if (sourceRsHasRecords && shadowRsHasRecords) {
                    String sourcePk;
                    if (!sourceRsMovedNext) {
                        sourcePk = tempSourcePk;
                    } else {
                        sourcePk = sourceRs.getString("PK");
                    }
                    tempSourcePk = sourcePk;

                    String shadowPk = shadowRs.getCell("PK").getData();
                    if (sourcePk.compareTo(shadowPk) < 0) {
                        insert(table, sourcePk, sourceRs);
                        sourceRsMovedNext = true;
                    } else if (sourcePk.compareTo(shadowPk) > 0) {
                        delete(table, shadowRs);
                        sourceRsMovedNext = false;
                    } else if (sourcePk.compareTo(shadowPk) == 0) {
                        update(table, sourcePk, sourceRs, shadowRs);
                        sourceRsMovedNext = true;
                    }
                }
            }
            sourceRs.close();
            shadowRs.close();
        }
        sourceDataMiner.finish();
        shadowDataMiner.finish();
    }

    private String curePk(String pk) {
        return pk.replace(" ", "#").replace("-", "_").replace(".", "?");
    }

    private void insert(Table table, String sourcePk, SourceResultSet sourceRs) throws SQLException {
        List<Transaction> transactionList = new ArrayList<Transaction>();
        LoggableTransaction loggableTransaction = new LoggableTransaction(table, TransactionType.INSERT);
        List<LoggableTransactionDatum> loggableTransactionDatumList = new ArrayList<LoggableTransactionDatum>();
        String pk = null;
        for (Column column : table.getColumnList()) {
//            if (pk == null) {
//                pk = sourceRs.getString("PK");
//            }
            Cell cell = new Cell(sourcePk, sourceRs.getString(column));
            cell.setColumn(column);
            transactionList.add(new DataTransaction(cell, TransactionType.INSERT));
            loggableTransactionDatumList.add(new LoggableTransactionDatum(cell, loggableTransaction));
        }
        loggableTransaction.setLoggableTransactionDatumList(loggableTransactionDatumList);
        transactionList.add(loggableTransaction);
        processTransactions(transactionList);
        sourceRsHasRecords = sourceRs.next();
    }

    private void delete(Table table, ShadowResultSet shadowRs) throws SQLException {
        List<Transaction> transactionList = new ArrayList<Transaction>();
        LoggableTransaction loggableTransaction = new LoggableTransaction(table, TransactionType.DELETE);
        List<LoggableTransactionDatum> loggableTransactionDatumList = new ArrayList<LoggableTransactionDatum>();
        for (Column column : table.getColumnList()) {
            Cell cell = shadowRs.getCell(column);
            cell.setColumn(column);
            transactionList.add(new DataTransaction(cell, TransactionType.DELETE));
            loggableTransactionDatumList.add(new LoggableTransactionDatum(cell, loggableTransaction));
        }
        loggableTransaction.setLoggableTransactionDatumList(loggableTransactionDatumList);
        transactionList.add(loggableTransaction);
        processTransactions(transactionList);
        shadowRsHasRecords = shadowRs.next();
    }

    private void update(Table table, String sourcePk, SourceResultSet sourceRs, ShadowResultSet shadowRs) throws SQLException {
        List<Transaction> transactionList = new ArrayList<Transaction>();
        LoggableTransaction loggableTransaction = new LoggableTransaction(table, TransactionType.UPDATE);
        List<LoggableTransactionDatum> loggableTransactionDatumList = new ArrayList<LoggableTransactionDatum>();
//        String pk = null;
        for (Column column : table.getColumnList()) {
            /*
             * Ensure cells associated with new columns are created
             */
//            if (pk == null) {
//                pk = sourceRs.getString("PK");              
//            }
            Cell shadowCell = shadowRs.getCell(column);
            String sourceColumnValue = sourceRs.getString(column);
            if (shadowCell == null) {
                Cell cell = new Cell(shadowRs.getCell("PK").getData(), sourceColumnValue);
                cell.setColumn(column);
                transactionList.add(new DataTransaction(cell, TransactionType.INSERT));
                loggableTransactionDatumList.add(new LoggableTransactionDatum(cell, loggableTransaction));
                continue;
            }
            /*
             * Judge which columns we need to compare
             */
            String shadowColumnValue = shadowRs.getCell(column).getData();
            boolean pkColumn = table.getPk().contains(column.getName() + ",");
            boolean compare = (shadowColumnValue != null && sourceColumnValue != null)//compare only if both values are not null and this column is not (part of) the primary key
                    && !pkColumn;
            boolean ignore = (shadowColumnValue == null && sourceColumnValue == null);//if both values are null, no update has taken place
            boolean update = false;
            if (compare) {
                update = !shadowColumnValue.equals(sourceColumnValue);//if both values are not equal then an update has taken place
            } else {
                if (!pkColumn) {
                    update = !ignore;//if only one value is null then an update has taken place
                }
            }
            if (update) {
                Cell cell = shadowRs.getCell(column);
                cell.setData(sourceColumnValue);
                cell.setColumn(column);
                transactionList.add(new DataTransaction(cell, TransactionType.UPDATE));
                loggableTransactionDatumList.add(new LoggableTransactionDatum(cell, loggableTransaction));
            }
        }
        if (!loggableTransactionDatumList.isEmpty()) {
            // String pk = null;
            Cell cell = shadowRs.getCell("PK");
//            cell.setData(sourceRs.getString("PK"));
            cell.setData(sourcePk);

            //TODO: Accommodate cases where the pk is not the first column of the table
            cell.setColumn(table.getColumnList().get(0));
            loggableTransactionDatumList.add(new LoggableTransactionDatum(cell, loggableTransaction));

            loggableTransaction.setLoggableTransactionDatumList(loggableTransactionDatumList);
            transactionList.add(loggableTransaction);
        }
        processTransactions(transactionList);
        sourceRsHasRecords = sourceRs.next();
        shadowRsHasRecords = shadowRs.next();
    }
    /*
     * Applies a set of transactions to the database
     */

    private void processTransactions(List<Transaction> dataTransactionList) throws SQLException {
        try {
            connectToShadow();
            connection.setAutoCommit(false);
            Statement statement = connection.createStatement();
            for (Transaction dataTransaction : dataTransactionList) {
                Mediator.getLogger(DataSynchronizer.class.getName()).log(Level.FINEST, TransactionConverter.convertToSQL(dataTransaction));
                if (statement.executeUpdate(TransactionConverter.convertToSQL(dataTransaction), Statement.RETURN_GENERATED_KEYS) == 1) {
                    if (dataTransaction.getClass() == LoggableTransaction.class) {
                        ResultSet rs = statement.getGeneratedKeys();
                        LoggableTransaction loggableTransaction = (LoggableTransaction) dataTransaction;
                        if (rs.next()) {
                            loggableTransaction.setId(rs.getInt(1));
                        }
                        for (LoggableTransactionDatum loggableTransactionDatum : loggableTransaction.getLoggableTransactionDatumList()) {
                            Mediator.getLogger(DataSynchronizer.class.getName()).log(Level.FINEST, TransactionConverter.convertToSQL(loggableTransactionDatum));
                            statement.executeUpdate(TransactionConverter.convertToSQL(loggableTransactionDatum));
                        }
                    }
                }
            }
            connection.commit();
            statement.close();
        } catch (Exception ex) {
            Mediator.getLogger(DataSynchronizer.class.getName()).log(Level.SEVERE, null, ex);
        } finally {
            disconnectFromShadow();
        }
    }
}
TOP

Related Classes of ke.go.moh.oec.oecsm.sync.data.DataSynchronizer

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.