Package com.alibaba.otter.node.etl.common.db

Source Code of com.alibaba.otter.node.etl.common.db.DbDialectTest

/*
* Copyright (C) 2010-2101 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.otter.node.etl.common.db;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;

import org.jtester.annotations.SpringBeanByName;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementCallback;
import org.springframework.jdbc.core.StatementCreatorUtils;
import org.springframework.jdbc.support.lob.LobCreator;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
import org.testng.annotations.Test;

import com.alibaba.otter.node.etl.BaseDbTest;
import com.alibaba.otter.node.etl.common.db.dialect.DbDialect;
import com.alibaba.otter.node.etl.common.db.dialect.DbDialectFactory;
import com.alibaba.otter.node.etl.common.db.dialect.SqlTemplate;
import com.alibaba.otter.node.etl.common.db.dialect.mysql.MysqlDialect;
import com.alibaba.otter.node.etl.common.db.dialect.oracle.OracleDialect;
import com.alibaba.otter.node.etl.common.db.utils.SqlUtils;
import com.alibaba.otter.shared.common.model.config.data.db.DbDataMedia;

public class DbDialectTest extends BaseDbTest {

    private static final String MYSQL_SCHEMA_NAME  = "srf";
    private static final String ORACLE_SCHEMA_NAME = "srf";
    private static final String TABLE_NAME         = "columns";
    @SpringBeanByName
    private DbDialectFactory    dbDialectFactory;
    // private String[] allColumns = { "alias_name", "amount", "text_b",
    // "text_c", "curr_date",
    // "gmt_create", "gmt_modify", "id", "name" };

    private String[]            pkColumns          = { "id", "name" };
    private String[]            columns            = { "alias_name", "amount", "text_b", "text_c", "curr_date",
            "gmt_create", "gmt_modify"            };

    private String[]            pkColumnValues     = { "1", "ljh" };

    // [116,101,120,116,95,98]
    private String[]            columnValues       = { "hello", "100.01", "text_b", "text_c", "2011-01-01",
            "2011-01-01 11:11:11", "2011-01-01 11:11:11" };

    @Test(expectedExceptions = RuntimeException.class)
    public void test_mysql() {
        DbDataMedia media = getMysqlMedia();
        final DbDialect dbDialect = dbDialectFactory.getDbDialect(2L, media.getSource());
        want.object(dbDialect).clazIs(MysqlDialect.class);

        final SqlTemplate sqlTemplate = dbDialect.getSqlTemplate();
        final JdbcTemplate jdbcTemplate = dbDialect.getJdbcTemplate();
        final TransactionTemplate transactionTemplate = dbDialect.getTransactionTemplate();
        final int[] pkColumnTypes = { Types.INTEGER, Types.VARCHAR };
        final int[] columnTypes = { Types.CHAR, Types.DECIMAL, Types.BLOB, Types.CLOB, Types.DATE, Types.TIMESTAMP,
                Types.TIMESTAMP };
        transactionTemplate.execute(new TransactionCallback() {

            public Object doInTransaction(TransactionStatus status) {
                int affect = 0;
                String sql = null;
                // 执行insert
                sql = sqlTemplate.getInsertSql(MYSQL_SCHEMA_NAME, TABLE_NAME, pkColumns, columns);
                System.out.println(sql);
                affect = (Integer) jdbcTemplate.execute(sql, new PreparedStatementCallback() {

                    public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException {
                        doPreparedStatement(ps,
                            dbDialect,
                            toTypes(columnTypes, pkColumnTypes),
                            toValues(columnValues, pkColumnValues));
                        return ps.executeUpdate();
                    }

                });
                want.number(affect).isEqualTo(1);
                // 执行update
                sql = sqlTemplate.getUpdateSql(MYSQL_SCHEMA_NAME, TABLE_NAME, pkColumns, columns);
                System.out.println(sql);
                affect = (Integer) jdbcTemplate.execute(sql, new PreparedStatementCallback() {

                    public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException {
                        doPreparedStatement(ps,
                            dbDialect,
                            toTypes(columnTypes, pkColumnTypes),
                            toValues(columnValues, pkColumnValues));
                        return ps.executeUpdate();
                    }

                });
                want.number(affect).isEqualTo(1);
                // 执行deleate
                sql = sqlTemplate.getDeleteSql(MYSQL_SCHEMA_NAME, TABLE_NAME, pkColumns);
                System.out.println(sql);
                affect = (Integer) jdbcTemplate.execute(sql, new PreparedStatementCallback() {

                    public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException {
                        doPreparedStatement(ps, dbDialect, toTypes(pkColumnTypes), toValues(pkColumnValues));
                        return ps.executeUpdate();
                    }

                });
                want.number(affect).isEqualTo(1);
                // 执行merge
                sql = sqlTemplate.getMergeSql(MYSQL_SCHEMA_NAME, TABLE_NAME, pkColumns, columns, null);
                System.out.println(sql);
                affect = (Integer) jdbcTemplate.execute(sql, new PreparedStatementCallback() {

                    public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException {
                        doPreparedStatement(ps,
                            dbDialect,
                            toTypes(columnTypes, pkColumnTypes),
                            toValues(columnValues, pkColumnValues));
                        return ps.executeUpdate();
                    }

                });
                want.number(affect).isEqualTo(1);
                throw new RuntimeException("rollback");
            }
        });

    }

    @Test(expectedExceptions = RuntimeException.class)
    public void test_oracle() {
        DbDataMedia media = getOracleMedia();
        final DbDialect dbDialect = dbDialectFactory.getDbDialect(1L, media.getSource());

        want.object(dbDialect).clazIs(OracleDialect.class);
        final SqlTemplate sqlTemplate = dbDialect.getSqlTemplate();
        final JdbcTemplate jdbcTemplate = dbDialect.getJdbcTemplate();
        final TransactionTemplate transactionTemplate = dbDialect.getTransactionTemplate();
        final int[] pkColumnTypes = { Types.NUMERIC, Types.VARCHAR };
        final int[] columnTypes = { Types.CHAR, Types.NUMERIC, Types.BLOB, Types.CLOB, Types.DATE, Types.DATE,
                Types.DATE };
        transactionTemplate.execute(new TransactionCallback() {

            public Object doInTransaction(TransactionStatus status) {
                int affect = 0;
                String sql = null;
                // 执行insert
                sql = sqlTemplate.getInsertSql(ORACLE_SCHEMA_NAME, TABLE_NAME, pkColumns, columns);
                System.out.println(sql);
                affect = (Integer) jdbcTemplate.execute(sql, new PreparedStatementCallback() {

                    public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException {
                        doPreparedStatement(ps,
                            dbDialect,
                            toTypes(columnTypes, pkColumnTypes),
                            toValues(columnValues, pkColumnValues));
                        return ps.executeUpdate();
                    }

                });
                want.number(affect).isEqualTo(1);
                // 执行update
                sql = sqlTemplate.getUpdateSql(ORACLE_SCHEMA_NAME, TABLE_NAME, pkColumns, columns);
                System.out.println(sql);
                affect = (Integer) jdbcTemplate.execute(sql, new PreparedStatementCallback() {

                    public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException {
                        doPreparedStatement(ps,
                            dbDialect,
                            toTypes(columnTypes, pkColumnTypes),
                            toValues(columnValues, pkColumnValues));
                        return ps.executeUpdate();
                    }

                });
                want.number(affect).isEqualTo(1);
                // 执行deleate
                sql = sqlTemplate.getDeleteSql(ORACLE_SCHEMA_NAME, TABLE_NAME, pkColumns);
                System.out.println(sql);
                affect = (Integer) jdbcTemplate.execute(sql, new PreparedStatementCallback() {

                    public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException {
                        doPreparedStatement(ps, dbDialect, toTypes(pkColumnTypes), toValues(pkColumnValues));
                        return ps.executeUpdate();
                    }

                });
                want.number(affect).isEqualTo(1);
                // 执行merge
                sql = sqlTemplate.getMergeSql(ORACLE_SCHEMA_NAME, TABLE_NAME, pkColumns, columns, null);
                System.out.println(sql);

                affect = (Integer) jdbcTemplate.execute(sql, new PreparedStatementCallback() {

                    public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException {
                        doPreparedStatement(ps,
                            dbDialect,
                            toTypes(columnTypes, pkColumnTypes),
                            toValues(columnValues, pkColumnValues));
                        return ps.executeUpdate();
                    }

                });
                want.number(affect).isEqualTo(1);
                throw new RuntimeException("rollback");
            }
        });
    }

    private Integer[] toTypes(int[]... types) {
        List<Integer> result = new ArrayList<Integer>();
        for (int[] type : types) {
            for (int t : type) {
                result.add(t);
            }
        }

        return result.toArray(new Integer[result.size()]);
    }

    private String[] toValues(String[]... values) {
        List<String> result = new ArrayList<String>();
        for (String[] value : values) {
            for (String v : value) {
                result.add(v);
            }
        }

        return result.toArray(new String[result.size()]);
    }

    private void doPreparedStatement(PreparedStatement ps, final DbDialect dbDialect, final Integer[] columnTypes,
                                     final String[] columnValues) throws SQLException {
        LobCreator lobCreator = null;
        for (int i = 0; i < columnTypes.length; i++) {
            int paramIndex = i + 1;
            String sqlValue = columnValues[i];
            int sqlType = columnTypes[i];
            Object param = SqlUtils.stringToSqlValue(sqlValue,
                sqlType,
                SqlUtils.isTextType(sqlType),
                dbDialect.isEmptyStringNulled());
            switch (sqlType) {
                case Types.CLOB:
                    if (lobCreator == null) {
                        lobCreator = dbDialect.getLobHandler().getLobCreator();
                    }

                    lobCreator.setClobAsString(ps, paramIndex, (String) param);
                    break;

                case Types.BLOB:
                    if (lobCreator == null) {
                        lobCreator = dbDialect.getLobHandler().getLobCreator();
                    }

                    lobCreator.setBlobAsBytes(ps, paramIndex, (byte[]) param);
                    break;

                default:
                    StatementCreatorUtils.setParameterValue(ps, paramIndex, sqlType, null, param);
                    break;
            }
        }
    }
}
TOP

Related Classes of com.alibaba.otter.node.etl.common.db.DbDialectTest

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.