Package org.mule.transport.jdbc

Source Code of org.mule.transport.jdbc.JdbcConnector

/*
* Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
* The software in this package is published under the terms of the CPAL v1.0
* license, a copy of which has been included with this distribution in the
* LICENSE.txt file.
*/
package org.mule.transport.jdbc;

import org.mule.api.DefaultMuleException;
import org.mule.api.MuleContext;
import org.mule.api.MuleException;
import org.mule.api.MuleMessage;
import org.mule.api.config.MuleProperties;
import org.mule.api.construct.FlowConstruct;
import org.mule.api.endpoint.ImmutableEndpoint;
import org.mule.api.endpoint.InboundEndpoint;
import org.mule.api.endpoint.OutboundEndpoint;
import org.mule.api.expression.ExpressionManager;
import org.mule.api.expression.ExpressionRuntimeException;
import org.mule.api.lifecycle.Disposable;
import org.mule.api.lifecycle.InitialisationException;
import org.mule.api.processor.MessageProcessor;
import org.mule.api.retry.RetryContext;
import org.mule.api.transaction.Transaction;
import org.mule.api.transaction.TransactionException;
import org.mule.api.transport.MessageDispatcher;
import org.mule.api.transport.MessageReceiver;
import org.mule.common.DefaultTestResult;
import org.mule.common.FailureType;
import org.mule.common.TestResult;
import org.mule.common.Testable;
import org.mule.config.ExceptionHelper;
import org.mule.config.i18n.MessageFactory;
import org.mule.transaction.TransactionCoordination;
import org.mule.transport.AbstractConnector;
import org.mule.transport.ConnectException;
import org.mule.transport.MessageDispatcherUtils;
import org.mule.transport.jdbc.sqlstrategy.DefaultSqlStatementStrategyFactory;
import org.mule.transport.jdbc.sqlstrategy.SqlStatementStrategyFactory;
import org.mule.transport.jdbc.xa.CompositeDataSourceDecorator;
import org.mule.util.StringUtils;
import org.mule.util.TemplateParser;

import java.sql.Connection;
import java.sql.SQLException;
import java.text.MessageFormat;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.sql.DataSource;

import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
import org.apache.commons.lang.BooleanUtils;

public class JdbcConnector extends AbstractConnector implements Testable
{
    /** Protocol name of this connector; returned by {@link #getProtocol()}. */
    public static final String JDBC = "jdbc";

    // These are properties that can be overridden on the Receiver by the endpoint
    // declaration
    /** Endpoint property key read by {@code createReceiver} to override the polling frequency. */
    public static final String PROPERTY_POLLING_FREQUENCY = "pollingFrequency";
    /** Fallback used by {@code createReceiver} when the polling frequency is zero or negative. */
    // NOTE(review): unit is presumably milliseconds - confirm against the receiver implementation.
    public static final long DEFAULT_POLLING_FREQUENCY = 1000;

    /** Pattern used by {@code parseStatement} to locate parameter placeholders inside a statement. */
    private static final Pattern STATEMENT_ARGS = TemplateParser.WIGGLY_MULE_TEMPLATE_PATTERN;

    /** Name of the system property read in the constructor to enable the dispatcher pool. */
    public static final String USE_DISPATCHER_POOL_SYSTEM_PROPERTY = MuleProperties.SYSTEM_PROPERTY_PREFIX
                                                                                + "transport."
                                                                                + JDBC + ".useDispatcherPool";

    /** Decorator applied to the configured data source during {@code doInitialise}. */
    private final CompositeDataSourceDecorator databaseDecorator = new CompositeDataSourceDecorator();
    /** Factory exposed through its getter/setter; not used directly inside this class. */
    private SqlStatementStrategyFactory sqlStatementStrategyFactory = new DefaultSqlStatementStrategyFactory();

    /* Register the SQL Exception reader if this class gets loaded */
    static
    {
        ExceptionHelper.registerExceptionReader(new SQLExceptionReader());
    }

    /** Polling frequency; {@code createReceiver} may overwrite it from an endpoint property. */
    protected long pollingFrequency = 0;
    /** Connector-level map of named queries consulted by {@code getQuery} (raw, untyped map). */
    protected Map queries;

    /** Data source that connections are obtained from; must be set before initialisation. */
    protected DataSource dataSource;
    /** Handler for result sets; a default is installed in {@code doInitialise} when left null. */
    protected ResultSetHandler resultSetHandler;
    /** Query runner; a default is created in {@code doInitialise} when left null. */
    protected QueryRunner queryRunner;

    // Left at the int default of 0 unless setQueryTimeout is called, so doInitialise takes the
    // ">= 0" branch (ExtendedQueryRunner) unless a negative value is configured explicitly.
    private int queryTimeout;

    /**
     * Should each DB record be received in a separate transaction or should there be a single transaction for
     * the entire ResultSet?
     */
    protected boolean transactionPerMessage = true;

    /** When false (the default) one dispatcher per endpoint is cached in {@link #endpointDispatchers}. */
    private boolean useDispatcherPool = false;
    /** Dispatchers created eagerly per outbound endpoint; only populated when the pool is not used. */
    protected Map<OutboundEndpoint, MessageDispatcher> endpointDispatchers = new ConcurrentHashMap<OutboundEndpoint, MessageDispatcher>();

    public JdbcConnector(MuleContext context)
    {
        super(context);
        useDispatcherPool = BooleanUtils.toBoolean(System.getProperty(USE_DISPATCHER_POOL_SYSTEM_PROPERTY));
    }

    @Override
    protected void doInitialise() throws InitialisationException
    {
        databaseDecorator.init(muleContext);
        createMultipleTransactedReceivers = false;

        if (dataSource == null)
        {
            throw new InitialisationException(MessageFactory.createStaticMessage("Missing data source"), this);
        }
        if (resultSetHandler == null)
        {
            resultSetHandler = new org.apache.commons.dbutils.handlers.MapListHandler(
                new ColumnAliasRowProcessor());
        }
        decorateDataSourceIfRequired();
        if (queryRunner == null)
        {
            if (this.queryTimeout >= 0)
            {
                queryRunner = new ExtendedQueryRunner(dataSource, this.queryTimeout);
            }
            else
            {
                // Fix for MULE-5825 (Mariano Capurro) -> Adding dataSource parameter
                queryRunner = new QueryRunner(dataSource);
            }
        }
    }

    /**
     * Replaces {@link #dataSource} with whatever the data source decorator returns for it,
     * passing the connector name and the Mule context along.
     */
    private void decorateDataSourceIfRequired()
    {
        dataSource = databaseDecorator.decorate(dataSource, getName(), muleContext);
    }

    @Override
    public MessageReceiver createReceiver(FlowConstruct flowConstruct, InboundEndpoint endpoint)
            throws Exception
    {
        Map props = endpoint.getProperties();
        if (props != null)
        {
            String tempPolling = (String) props.get(PROPERTY_POLLING_FREQUENCY);
            if (tempPolling != null)
            {
                pollingFrequency = Long.parseLong(tempPolling);
            }
        }

        if (pollingFrequency <= 0)
        {
            pollingFrequency = DEFAULT_POLLING_FREQUENCY;
        }

        String[] params = getReadAndAckStatements(endpoint);
        return getServiceDescriptor().createMessageReceiver(this, flowConstruct, endpoint, params);
    }

    public String[] getReadAndAckStatements(ImmutableEndpoint endpoint)
    {
        String str;

        // Find read statement
        String readStmt;
        if ((str = (String) endpoint.getProperty("sql")) != null)
        {
            readStmt = str;
        }
        else
        {
            readStmt = endpoint.getEndpointURI().getAddress();
        }

        // Find ack statement
        String ackStmt;
        if ((str = (String) endpoint.getProperty("ack")) != null)
        {
            ackStmt = str;
            if ((str = getQuery(endpoint, ackStmt)) != null)
            {
                ackStmt = str;
            }
            ackStmt = ackStmt.trim();
        }
        else
        {
            ackStmt = readStmt + ".ack";
            if ((str = getQuery(endpoint, ackStmt)) != null)
            {
                ackStmt = str.trim();
            }
            else
            {
                ackStmt = null;
            }
        }

        // Translate both using queries map
        if ((str = getQuery(endpoint, readStmt)) != null)
        {
            readStmt = str;
        }
        if (readStmt == null)
        {
            throw new IllegalArgumentException("Read statement should not be null");
        }
        else
        {
            // MULE-3109: trim the readStatement for better user experience
            readStmt = readStmt.trim();
        }

        if (!"select".equalsIgnoreCase(readStmt.substring(0, 6))
            && !"call".equalsIgnoreCase(readStmt.substring(0, 4)))
        {
            throw new IllegalArgumentException(
                "Read statement should be a select sql statement or a stored procedure");
        }
        if (ackStmt != null)
        {
            if (!"insert".equalsIgnoreCase(ackStmt.substring(0, 6))
                && !"update".equalsIgnoreCase(ackStmt.substring(0, 6))
                && !"delete".equalsIgnoreCase(ackStmt.substring(0, 6)))
            {
                throw new IllegalArgumentException(
                    "Ack statement should be an insert / update / delete sql statement");
            }
        }
        return new String[]{readStmt, ackStmt};
    }

    public String getQuery(ImmutableEndpoint endpoint, String stmt)
    {
        Object query = null;
        if (endpoint != null && endpoint.getProperties() != null)
        {
            Object queries = endpoint.getProperties().get("queries");
            if (queries instanceof Map)
            {
                query = ((Map) queries).get(stmt);
            }
        }
        if (query == null)
        {
            if (this.queries != null)
            {
                query = this.queries.get(stmt);
            }
        }
        return query == null ? null : query.toString();
    }

    public Connection getConnection() throws Exception
    {
        Transaction tx = TransactionCoordination.getInstance().getTransaction();
        if (tx != null)
        {
            if (tx.hasResource(dataSource))
            {
                logger.debug("Retrieving connection from current transaction: " + tx);
                return (Connection) tx.getResource(dataSource);
            }
        }
        logger.debug("Retrieving new connection from data source");

        Connection con;
        try
        {
            con = dataSource.getConnection();
        }
        catch (Exception e)
        {
            throw new ConnectException(e, this);
        }

        if (tx != null)
        {
            logger.debug("Binding connection " + con + " to current transaction: " + tx);
            try
            {
                tx.bindResource(dataSource, con);
            }
            catch (TransactionException e)
            {
                JdbcUtils.close(con);
                throw new RuntimeException("Could not bind connection to current transaction: " + tx, e);
            }
        }
        return con;
    }

    /**
     * @return the current value of the {@code transactionPerMessage} flag ({@code true} unless
     *         changed through the setter)
     */
    public boolean isTransactionPerMessage()
    {
        return transactionPerMessage;
    }

    public void setTransactionPerMessage(boolean transactionPerMessage)
    {
        this.transactionPerMessage = transactionPerMessage;
        if (!transactionPerMessage)
        {
            logger.warn("transactionPerMessage property is set to false so setting createMultipleTransactedReceivers "
                        + "to false also to prevent creation of multiple JdbcMessageReceivers");
            setCreateMultipleTransactedReceivers(transactionPerMessage);
        }
    }

    /**
     * Parse the given statement filling the parameter list and return the ready to use statement.
     *
     * @param stmt
     * @param params
     */
    public String parseStatement(String stmt, List<String> params)
    {
        if (stmt == null)
        {
            return stmt;
        }
        Matcher m = STATEMENT_ARGS.matcher(stmt);
        StringBuffer sb = new StringBuffer(200);
        while (m.find())
        {
            String key = m.group();
            m.appendReplacement(sb, "?");
            params.add(key);
        }
        m.appendTail(sb);
        return sb.toString();
    }

    public Object[] getParams(ImmutableEndpoint endpoint, List paramNames, MuleMessage message, String query)
        throws Exception
    {
        Object[] params = new Object[paramNames.size()];
        for (int i = 0; i < paramNames.size(); i++)
        {
            String param = (String) paramNames.get(i);
            Object value = getParamValue(endpoint, message, param);

            params[i] = value;
        }
        return params;
    }

    protected Object getParamValue(ImmutableEndpoint endpoint, MuleMessage message, String param)
    {
        Object value = null;
        if (muleContext.getExpressionManager().isValidExpression(param))
        {
            try
            {
                value = muleContext.getExpressionManager().evaluate(param, message);
            }
            catch (ExpressionRuntimeException e)
            {
                // If expression evaluation fails then give the legacy approach a chance.
                logger.warn(MessageFormat.format(
                    "Config is using the legacy param format {0} (no evaluator defined)."
                                    + " This expression can be replaced with {1}header:{2}{3}", param,
                    ExpressionManager.DEFAULT_EXPRESSION_PREFIX, name,
                    ExpressionManager.DEFAULT_EXPRESSION_POSTFIX));
                value = endpoint.getProperty(getNameFromParam(param));
            }
        }
        return value;
    }

    protected String getNameFromParam(String param)
    {
        return param.substring(2, param.length() - 1);
    }

    @Override
    protected void doDispose()
    {
        if (dataSource instanceof Disposable)
        {
            ((Disposable) dataSource).dispose();
        }
    }

    @Override
    protected void doConnect() throws Exception
    {
        Connection connection = null;

        try
        {
            connection = getConnection();
        }
        finally
        {
            if (connection != null)
            {
                connection.close();
            }
        }
    }

    /**
     * Verify that we are able to connect to the DataSource (needed for retry policies)
     *
     * @param retryContext
     */
    @Override
    public RetryContext validateConnection(RetryContext retryContext)
    {
        Connection con;
        try
        {
            con = getConnection();
            if (con != null)
            {
                con.close();
            }
            retryContext.setOk();
        }
        catch (Exception ex)
        {
            retryContext.setFailed(ex);
        }
        finally
        {
            con = null;
        }

        return retryContext;
    }

    /**
     * Lifecycle hook for disconnecting; intentionally empty in this connector.
     */
    @Override
    protected void doDisconnect() throws Exception
    {
        // template method: nothing to do here
    }

    /**
     * Lifecycle hook for starting; intentionally empty in this connector.
     */
    @Override
    protected void doStart() throws MuleException
    {
        // template method: nothing to do here
    }

    /**
     * Lifecycle hook for stopping; intentionally empty in this connector.
     */
    @Override
    protected void doStop() throws MuleException
    {
        // template method: nothing to do here
    }

    // ////////////////////////////////////////////////////////////////////////////////////
    // Getters and Setters
    // ////////////////////////////////////////////////////////////////////////////////////

    /**
     * @return the protocol name of this connector, i.e. the {@link #JDBC} constant
     */
    public String getProtocol()
    {
        return JDBC;
    }

    /**
     * @return the data source currently held by the connector; after initialisation this is
     *         the instance returned by the data source decorator
     */
    public DataSource getDataSource()
    {
        return dataSource;
    }

    /**
     * @param dataSource the data source to use; initialisation fails if none has been set
     */
    public void setDataSource(DataSource dataSource)
    {
        this.dataSource = dataSource;
    }

    /**
     * @return the result set handler held by the connector
     */
    public ResultSetHandler getResultSetHandler()
    {
        return resultSetHandler;
    }

    /**
     * @param resultSetHandler the handler to use; when left {@code null} a default handler is
     *            installed during initialisation
     */
    public void setResultSetHandler(ResultSetHandler resultSetHandler)
    {
        this.resultSetHandler = resultSetHandler;
    }

    public QueryRunner getQueryRunnerFor(ImmutableEndpoint endpoint)
    {
        String queryTimeoutAsString = (String) endpoint.getProperty("queryTimeout");
        Integer queryTimeout = -1;

        try
        {
            queryTimeout = Integer.valueOf(queryTimeoutAsString);
        }
        catch (NumberFormatException e)
        {

        }

        if (queryTimeout >= 0)
        {
            ExtendedQueryRunner extendedQueryRunner = new ExtendedQueryRunner(
                    this.queryRunner.getDataSource(), queryTimeout);
            return extendedQueryRunner;
        }
        else
        {
            return queryRunner;
        }
    }

    /**
     * @return the connector-level query runner
     */
    public QueryRunner getQueryRunner()
    {
        return queryRunner;
    }

    /**
     * @param queryRunner the query runner to use; when left {@code null} one is created during
     *            initialisation
     */
    public void setQueryRunner(QueryRunner queryRunner)
    {
        this.queryRunner = queryRunner;
    }

    /**
     * @return Returns the pollingFrequency. Note that {@code createReceiver} may overwrite this
     *         value from an endpoint property and replaces non-positive values with
     *         {@link #DEFAULT_POLLING_FREQUENCY}.
     */
    public long getPollingFrequency()
    {
        return pollingFrequency;
    }

    /**
     * @param pollingFrequency The pollingFrequency to set. Zero or a negative value is replaced
     *            with {@link #DEFAULT_POLLING_FREQUENCY} when a receiver is created.
     */
    public void setPollingFrequency(long pollingFrequency)
    {
        this.pollingFrequency = pollingFrequency;
    }

    /**
     * @return Returns the connector-level queries map (may be {@code null} if never set).
     */
    public Map getQueries()
    {
        return queries;
    }

    /**
     * @param queries The queries to set; consulted by {@code getQuery} when the endpoint does
     *            not define the requested key itself.
     */
    public void setQueries(Map queries)
    {
        this.queries = queries;
    }

    /**
     * @return the SQL statement strategy factory; a {@code DefaultSqlStatementStrategyFactory}
     *         unless replaced through the setter
     */
    public SqlStatementStrategyFactory getSqlStatementStrategyFactory()
    {
        return sqlStatementStrategyFactory;
    }

    /**
     * @param sqlStatementStrategyFactory the factory that replaces the default one
     */
    public void setSqlStatementStrategyFactory(SqlStatementStrategyFactory sqlStatementStrategyFactory)
    {
        this.sqlStatementStrategyFactory = sqlStatementStrategyFactory;
    }

    public String getStatement(ImmutableEndpoint endpoint)
    {
        String writeStmt = endpoint.getEndpointURI().getAddress();
        String str;
        if ((str = getQuery(endpoint, writeStmt)) != null)
        {
            writeStmt = str;
        }
        writeStmt = StringUtils.trimToEmpty(writeStmt);
        if (StringUtils.isBlank(writeStmt))
        {
            throw new IllegalArgumentException("Missing statement");
        }

        return writeStmt;
    }

    /**
     * @return the connector-level query timeout (0 unless changed through the setter)
     */
    public int getQueryTimeout()
    {
        return queryTimeout;
    }

    /**
     * @param queryTimeout the connector-level query timeout; a negative value makes
     *            initialisation create a plain {@code QueryRunner} instead of an
     *            {@code ExtendedQueryRunner}
     */
    public void setQueryTimeout(int queryTimeout)
    {
        this.queryTimeout = queryTimeout;
    }

    @Override
    protected <T> T getOperationResourceFactory()
    {
        return (T) getDataSource();
    }

    @Override
    protected <T> T createOperationResource(ImmutableEndpoint endpoint) throws MuleException
    {
        try
        {
            return (T) getDataSource().getConnection();
        }
        catch (SQLException e)
        {
            throw new DefaultMuleException(e);
        }
    }

    @Override
    public TestResult test()
    {
        if (isConnected())
        {
            return new DefaultTestResult(TestResult.Status.SUCCESS);
        }
        Connection con = null;
        try
        {
            con = dataSource.getConnection();
            return new DefaultTestResult(TestResult.Status.SUCCESS);
        }
        catch (Exception e)
        {
            // this surely doesn't cover all cases for all kinds of jdbc drivers but it is better than nothing
            FailureType failureType = FailureType.UNSPECIFIED;
            String msg = e.getMessage();
            if (msg != null && msg.contains("Communications link failure"))
            {
                failureType = FailureType.CONNECTION_FAILURE;
            }
            else if (msg != null && msg.contains("Access denied for user"))
            {
                failureType = FailureType.INVALID_CREDENTIALS;
            }
            return new DefaultTestResult(TestResult.Status.FAILURE, e.getMessage(), failureType, e);
        }
        finally
        {
            if (con != null)
            {
                try
                {
                    con.close();
                }
                catch (SQLException e)
                {
                    // eat exception
                }
            }
        }
    }

    @Override
    public MessageProcessor createDispatcherMessageProcessor(OutboundEndpoint endpoint) throws MuleException
    {
        if (!useDispatcherPool)
        {
            // Avoid lazy initialization of dispatcher in borrow method which would be less performant by
            // creating the dispatcher instance when DispatcherMessageProcessor is created.
            MessageDispatcher dispatcher = dispatcherFactory.create(endpoint);
            applyDispatcherLifecycle(dispatcher);
            endpointDispatchers.put(endpoint, dispatcher);
        }
        return super.createDispatcherMessageProcessor(endpoint);
    }

    @Override
    protected MessageDispatcher borrowDispatcher(OutboundEndpoint endpoint) throws MuleException
    {
        if (useDispatcherPool)
        {
            return super.borrowDispatcher(endpoint);
        }
        else
        {
            return endpointDispatchers.get(endpoint);
        }
    }

    @Override
    protected void returnDispatcher(OutboundEndpoint endpoint, MessageDispatcher dispatcher)
    {
        if (useDispatcherPool)
        {
            super.returnDispatcher(endpoint, dispatcher);
        }
        else
        {
            // Nothing to do because implementation of borrowDispatcher doesn't use dispatcher pool
        }
    }

    /**
     * Applies the lifecycle to a newly created dispatcher by delegating to
     * {@code MessageDispatcherUtils.applyLifecycle}. Kept as a separate protected method so
     * subclasses can override it.
     *
     * @param dispatcher the dispatcher to apply the lifecycle to
     * @throws MuleException if the delegate fails
     */
    protected void applyDispatcherLifecycle(MessageDispatcher dispatcher) throws MuleException
    {
        MessageDispatcherUtils.applyLifecycle(dispatcher);
    }

}
TOP

Related Classes of org.mule.transport.jdbc.JdbcConnector

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.