Package org.mule.module.db.internal.processor

Source Code of org.mule.module.db.internal.processor.AbstractDbMessageProcessor

/*
* Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
* The software in this package is published under the terms of the CPAL v1.0
* license, a copy of which has been included with this distribution in the
* LICENSE.txt file.
*/

package org.mule.module.db.internal.processor;

import org.mule.DefaultMuleEvent;
import org.mule.DefaultMuleMessage;
import org.mule.api.MessagingException;
import org.mule.api.MuleEvent;
import org.mule.api.MuleException;
import org.mule.api.MuleMessage;
import org.mule.api.lifecycle.Initialisable;
import org.mule.api.lifecycle.InitialisationException;
import org.mule.api.processor.InterceptingMessageProcessor;
import org.mule.common.Result;
import org.mule.common.metadata.MetaData;
import org.mule.common.metadata.OperationMetaDataEnabled;
import org.mule.module.db.internal.domain.query.QueryTemplate;
import org.mule.module.db.internal.domain.query.QueryType;
import org.mule.module.db.internal.domain.database.DbConfig;
import org.mule.module.db.internal.domain.connection.DbConnection;
import org.mule.processor.AbstractInterceptingMessageProcessor;

import org.mule.module.db.internal.metadata.QueryMetadataProvider;
import org.mule.module.db.internal.metadata.NullMetadataProvider;
import org.mule.module.db.internal.resolver.database.DbConfigResolver;
import org.mule.module.db.internal.domain.transaction.TransactionalAction;
import org.mule.util.StringUtils;

import java.sql.SQLException;
import java.util.List;

/**
* Base class for database message processors.
*/
public abstract class AbstractDbMessageProcessor extends AbstractInterceptingMessageProcessor implements Initialisable, InterceptingMessageProcessor, OperationMetaDataEnabled
{

    protected final DbConfigResolver dbConfigResolver;
    private final TransactionalAction transactionalAction;
    private QueryMetadataProvider queryMetadataProvider = new NullMetadataProvider();
    private String source;
    private String target;

    public AbstractDbMessageProcessor(DbConfigResolver dbConfigResolver, TransactionalAction transactionalAction)
    {
        this.dbConfigResolver = dbConfigResolver;
        this.transactionalAction = transactionalAction;
    }

    @Override
    public MuleEvent process(MuleEvent muleEvent) throws MuleException
    {
        DbConnection connection = null;

        DbConfig dbConfig = dbConfigResolver.resolve(muleEvent);

        try
        {
            connection = dbConfig.getConnectionFactory().createConnection(transactionalAction);

            Object result = executeQuery(connection, muleEvent);

            if (mustCloseConnection())
            {
                try
                {
                    dbConfig.getConnectionFactory().releaseConnection(connection);
                }
                finally
                {
                    connection = null;
                }
            }

            if (target == null || "".equals(target) || "#[payload]".equals(target))
            {
                muleEvent.getMessage().setPayload(result);
            }
            else
            {
                muleContext.getExpressionManager().enrich(target, muleEvent, result);
            }

            return processNext(muleEvent);
        }
        catch (SQLException e)
        {
            throw new MessagingException(muleEvent, e);
        }
        finally
        {
            if (connection != null && mustCloseConnection())
            {
                dbConfig.getConnectionFactory().releaseConnection(connection);
            }
        }
    }

    protected boolean mustCloseConnection()
    {
        return true;
    }

    protected MuleEvent resolveSource(MuleEvent muleEvent)
    {
        MuleEvent eventToUse = muleEvent;

        if (!StringUtils.isEmpty(source) && !("#[payload]".equals(source)))
        {
            Object payload = muleContext.getExpressionManager().evaluate(source, muleEvent);
            MuleMessage itemMessage = new DefaultMuleMessage(payload, muleContext);
            eventToUse = new DefaultMuleEvent(itemMessage, muleEvent);
        }
        return eventToUse;
    }

    protected abstract Object executeQuery(DbConnection connection, MuleEvent muleEvent) throws SQLException;

    @Override
    public void initialise() throws InitialisationException
    {
    }

    public void setQueryMetadataProvider(QueryMetadataProvider queryMetadataProvider)
    {
        this.queryMetadataProvider = queryMetadataProvider;
    }

    public QueryMetadataProvider getQueryMetadataProvider()
    {
        return queryMetadataProvider;
    }

    @Override
    public Result<MetaData> getOutputMetaData(MetaData metaData)
    {
        return queryMetadataProvider.getOutputMetaData(metaData);
    }

    @Override
    public Result<MetaData> getInputMetaData()
    {
        return queryMetadataProvider.getInputMetaData();
    }

    public String getSource()
    {
        return source;
    }

    public void setSource(String source)
    {
        this.source = source;
    }

    public String getTarget()
    {
        return target;
    }

    public void setTarget(String target)
    {
        this.target = target;
    }

    protected void validateQueryType(QueryTemplate queryTemplate)
    {
        List<QueryType> validTypes = getValidQueryTypes();
        if (validTypes == null || !validTypes.contains(queryTemplate.getType()))
        {
            throw new IllegalArgumentException(String.format("Query type must be one of '%s' but was '%s'", validTypes, queryTemplate.getType()));
        }
    }

    protected abstract List<QueryType> getValidQueryTypes();
}
TOP

Related Classes of org.mule.module.db.internal.processor.AbstractDbMessageProcessor

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.