Package org.apache.marmotta.platform.sparql.services.sparql

Source Code of org.apache.marmotta.platform.sparql.services.sparql.SparqlServiceImpl

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.marmotta.platform.sparql.services.sparql;

import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

import javax.annotation.PostConstruct;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.apache.marmotta.platform.core.api.config.ConfigurationService;
import org.apache.marmotta.platform.core.api.templating.TemplatingService;
import org.apache.marmotta.platform.core.api.triplestore.SesameService;
import org.apache.marmotta.platform.core.exception.InvalidArgumentException;
import org.apache.marmotta.platform.core.exception.MarmottaException;
import org.apache.marmotta.platform.sparql.api.sparql.SparqlService;
import org.apache.marmotta.platform.sparql.services.sparqlio.rdf.SPARQLGraphResultWriter;
import org.apache.marmotta.platform.sparql.services.sparqlio.sparqlhtml.SPARQLBooleanHTMLWriter;
import org.apache.marmotta.platform.sparql.services.sparqlio.sparqlhtml.SPARQLResultsHTMLWriter;
import org.openrdf.model.Value;
import org.openrdf.query.Binding;
import org.openrdf.query.BindingSet;
import org.openrdf.query.BooleanQuery;
import org.openrdf.query.GraphQuery;
import org.openrdf.query.MalformedQueryException;
import org.openrdf.query.Query;
import org.openrdf.query.QueryEvaluationException;
import org.openrdf.query.QueryLanguage;
import org.openrdf.query.TupleQuery;
import org.openrdf.query.TupleQueryResult;
import org.openrdf.query.TupleQueryResultHandlerException;
import org.openrdf.query.Update;
import org.openrdf.query.UpdateExecutionException;
import org.openrdf.query.resultio.BooleanQueryResultFormat;
import org.openrdf.query.resultio.BooleanQueryResultWriter;
import org.openrdf.query.resultio.QueryResultIO;
import org.openrdf.query.resultio.TupleQueryResultFormat;
import org.openrdf.query.resultio.TupleQueryResultWriter;
import org.openrdf.repository.RepositoryConnection;
import org.openrdf.repository.RepositoryException;
import org.slf4j.Logger;

/**
* Sparql Service implementation
*
* @author Sebastian Schaffert
* @author Sergio Fernández
*/
@ApplicationScoped
public class SparqlServiceImpl implements SparqlService {

    /**
     * Get the seam logger for issuing logging statements.
     */
    @Inject
    private Logger log;

    @Inject
    private ConfigurationService configurationService;

    @Inject
    private TemplatingService templatingService;

    @Inject
    private SesameService sesameService;

    private ExecutorService executorService;

    private long queryId = 0;

    @PostConstruct
    public void initialize() {
        executorService = Executors.newCachedThreadPool(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "SPARQL Query Thread " + (++queryId));
            }
        });
    }

    @Override
    public Query parseQuery(QueryLanguage language, String query) throws RepositoryException, MalformedQueryException {
        Query sparqlQuery = null;
        RepositoryConnection connection = sesameService.getConnection();
        try {
            connection.begin();
            sparqlQuery = connection.prepareQuery(language, query);
            connection.commit();
        } finally {
            connection.close();
        }
        return sparqlQuery;
    }

    @Override
    public void query(final QueryLanguage queryLanguage, final String query, final TupleQueryResultWriter tupleWriter, final BooleanQueryResultWriter booleanWriter, final SPARQLGraphResultWriter graphWriter, int timeoutInSeconds) throws MarmottaException, MalformedQueryException, QueryEvaluationException, TimeoutException {

        log.debug("executing SPARQL query:\n{}", query);


        Future<Boolean> future = executorService.submit(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                long start = System.currentTimeMillis();
                try {
                    RepositoryConnection connection = sesameService.getConnection();
                    try {
                        connection.begin();
                        Query sparqlQuery = connection.prepareQuery(queryLanguage, query);

                        if (sparqlQuery instanceof TupleQuery) {
                            query((TupleQuery) sparqlQuery, tupleWriter);
                        } else if (sparqlQuery instanceof BooleanQuery) {
                            query((BooleanQuery) sparqlQuery, booleanWriter);
                        } else if (sparqlQuery instanceof GraphQuery) {
                            query((GraphQuery) sparqlQuery, graphWriter);
                        } else {
                            connection.rollback();
                            throw new InvalidArgumentException("SPARQL query type " + sparqlQuery.getClass() + " not supported!");
                        }

                        connection.commit();
                    } catch (Exception ex) {
                        connection.rollback();
                        throw ex;
                    } finally {
                        connection.close();
                    }
                } catch(RepositoryException e) {
                    log.error("error while getting repository connection: {}", e);
                    throw new MarmottaException("error while getting repository connection", e);
                } catch (QueryEvaluationException e) {
                    log.error("error while evaluating query: {}", e.getMessage());
                    throw new MarmottaException("error while writing query result in format ", e);
                }

                log.debug("SPARQL execution took {}ms", System.currentTimeMillis()-start);

                return Boolean.TRUE;
            }
        });

        try {
            Boolean result = future.get(timeoutInSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException | TimeoutException e) {
            log.info("SPARQL query execution aborted due to timeout");
            future.cancel(true);

            throw new TimeoutException("SPARQL query execution aborted due to timeout (" + configurationService.getIntConfiguration("sparql.timeout",60)+"s)");
        } catch (ExecutionException e) {
            log.info("SPARQL query execution aborted due to exception");
            if(e.getCause() instanceof MarmottaException) {
                throw (MarmottaException)e.getCause();
            } else if(e.getCause() instanceof MalformedQueryException) {
                throw (MalformedQueryException)e.getCause();
            } else {
                throw new MarmottaException("unknown exception while evaluating SPARQL query",e.getCause());
            }
        }
    }

    @Override
    public void query(final QueryLanguage language, final String query, final OutputStream output, final String format, int timeoutInSeconds) throws MarmottaException, TimeoutException, MalformedQueryException {
        log.debug("executing SPARQL query:\n{}", query);



        Future<Boolean> future = executorService.submit(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                long start = System.currentTimeMillis();
                try {
                    RepositoryConnection connection = sesameService.getConnection();
                    try {
                        connection.begin();
                        Query sparqlQuery = connection.prepareQuery(language, query);

                        if (sparqlQuery instanceof TupleQuery) {
                            query((TupleQuery)sparqlQuery, output, format);
                        } else if (sparqlQuery instanceof BooleanQuery) {
                            query((BooleanQuery)sparqlQuery, output, format);
                        } else if (sparqlQuery instanceof GraphQuery) {
                            query((GraphQuery)sparqlQuery, output, format);
                        } else {
                            throw new InvalidArgumentException("SPARQL query type " + sparqlQuery.getClass() + " not supported!");
                        }

                        connection.commit();
                    } catch (Exception ex) {
                        connection.rollback();
                        throw ex;
                    } finally {
                        connection.close();
                    }
                } catch(RepositoryException e) {
                    log.error("error while getting repository connection: {}", e);
                    throw new MarmottaException("error while getting repository connection", e);
                } catch (QueryEvaluationException e) {
                    log.error("error while evaluating query: {}", e);
                    throw new MarmottaException("error while writing query result in format ", e);
                } catch (MalformedQueryException e) {
                    log.error("error because malformed query: {}", e);
                    throw new MarmottaException("error because malformed query result in format ", e);
                }

                log.debug("SPARQL execution took {}ms", System.currentTimeMillis()-start);
                return Boolean.TRUE;
            }
        });

        try {
            Boolean result = future.get(timeoutInSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException | TimeoutException e) {
            log.info("SPARQL query execution aborted due to timeout");
            future.cancel(true);

            throw new TimeoutException("SPARQL query execution aborted due to timeout (" + configurationService.getIntConfiguration("sparql.timeout",60)+"s)");
        } catch (ExecutionException e) {
            log.info("SPARQL query execution aborted due to exception");
            if(e.getCause() instanceof MarmottaException) {
                throw (MarmottaException)e.getCause();
            } else if(e.getCause() instanceof MalformedQueryException) {
                throw (MalformedQueryException)e.getCause();
            } else {
                throw new MarmottaException("unknown exception while evaluating SPARQL query",e.getCause());
            }
        }

    }

    private void query(TupleQuery query, TupleQueryResultWriter writer) throws QueryEvaluationException {
        try {
            query.evaluate(writer);
        } catch (TupleQueryResultHandlerException e) {
            throw new QueryEvaluationException("error while writing query tuple result: ",e);
        }
    }

    private void query(TupleQuery query, OutputStream output, String format) throws QueryEvaluationException {
        query(query, getTupleResultWriter(format, output));
    }

    private void query(BooleanQuery query, BooleanQueryResultWriter writer) throws QueryEvaluationException {
        try {
            writer.write(query.evaluate());
        } catch (IOException e) {
            throw new QueryEvaluationException("error while writing query boolean result: ",e);
        }
    }

    private void query(BooleanQuery query, OutputStream output, String format) throws QueryEvaluationException {
        query(query, getBooleanResultWriter(format, output));
    }

    private void query(GraphQuery query, SPARQLGraphResultWriter writer) throws QueryEvaluationException {
        try {
            writer.write(query.evaluate());
        } catch (IOException e) {
            throw new QueryEvaluationException("error while writing query graph result: ",e);
        }
    }

    private void query(GraphQuery query, OutputStream output, String format) throws QueryEvaluationException {
        query(query, getGraphResultWriter(format, output));
    }

    /**
     * Evaluate a SPARQL query on the LMF TripleStore. Returns the results as a list of result maps, each element
     * a KiWiNode.
     * <p/>
     * see http://www.w3.org/TR/sparql11-query/
     *
     * @param queryLanguage the query language to use
     * @param query         the SPARQL query to evaluate in SPARQL 1.1 syntax
     */
    @Override
    public List<Map<String, Value>> query(QueryLanguage queryLanguage, String query) throws MarmottaException {
        long start = System.currentTimeMillis();

        log.debug("executing {} query:\n{}", queryLanguage.getName(), query);

        List<Map<String,Value>> result = new LinkedList<Map<String, Value>>();

        try {
            RepositoryConnection connection = sesameService.getConnection();
            try {
                connection.begin();
                TupleQuery tupleQuery = connection.prepareTupleQuery(queryLanguage, query);
                TupleQueryResult r = tupleQuery.evaluate();
                try {
                    while (r.hasNext()) {
                        BindingSet s = r.next();
                        Map<String, Value> map = new HashMap<String, Value>();
                        for (Binding binding : s) {
                            map.put(binding.getName(), binding.getValue());
                        }
                        result.add(map);
                    }
                } finally {
                    r.close();
                }
                //
                connection.commit();
            } finally {
                connection.close();
            }
        } catch(RepositoryException ex) {
            log.error("error while getting repository connection");
            throw new MarmottaException("error while getting repository connection",ex);
        } catch (QueryEvaluationException e) {
            throw new MarmottaException("error while evaluating SPARQL query "+query,e);
        } catch (MalformedQueryException e) {
            throw new InvalidArgumentException("malformed SPARQL query ("+query+") for language "+queryLanguage,e);
        }

        log.debug("SPARQL execution took {}ms",System.currentTimeMillis()-start);
        return result;
    }

    /**
     * Execute a SPARQL update on the LMF TripleStore. Throws a KiWiException in case the update execution fails.
     * <p/>
     * see http://www.w3.org/TR/sparql11-update/
     *
     * @param queryLanguage
     * @param query         a string representing the update query in SPARQL Update 1.1 syntax
     * @throws Exception
     */
    @Override
    public void update(QueryLanguage queryLanguage, String query) throws MarmottaException {
        long start = System.currentTimeMillis();

        log.debug("executing SPARQL update:\n{}", query);

        try {
            RepositoryConnection connection = sesameService.getConnection();
            try {
                connection.begin();
                Update update = connection.prepareUpdate(queryLanguage,query,configurationService.getBaseUri());
                update.execute();
                connection.commit();
            } catch (UpdateExecutionException e) {
                connection.rollback();
                throw new MarmottaException("error while executing update",e);
            } catch (MalformedQueryException e) {
                connection.rollback();
                throw new MarmottaException("malformed query, update failed",e);
            } finally {
                connection.close();
            }
        } catch(RepositoryException ex) {
            log.error("error while getting repository connection", ex);
            throw new MarmottaException("error while getting repository connection",ex);
        }
        log.debug("SPARQL update execution took {}ms",System.currentTimeMillis()-start);

    }

    @Override
    public boolean ask(QueryLanguage queryLanguage, String query)
            throws MarmottaException {
        long start = System.currentTimeMillis();

        log.debug("executing SPARQL ask:\n{}", query);

        boolean result = false;
        try {
            RepositoryConnection connection = sesameService.getConnection();
            try {
                connection.begin();
                BooleanQuery ask = connection.prepareBooleanQuery(queryLanguage, query);
                result = ask.evaluate();
                connection.commit();
            } catch (MalformedQueryException e) {
                throw new MarmottaException("malformed query, update failed",e);
            } catch (QueryEvaluationException e) {
                throw new MarmottaException("error evaluating querry",e);
            } finally {
                connection.close();
            }
        } catch(RepositoryException ex) {
            log.error("error while getting repository connection", ex);
            throw new MarmottaException("error while getting repository connection",ex);
        }
        log.debug("SPARQL update execution took {}ms",System.currentTimeMillis()-start);
        return result;
    }

    private TupleQueryResultWriter getTupleResultWriter(String format, OutputStream os) {
        //build outputwriter
        final TupleQueryResultWriter out;
        if(format == null) {
            out = QueryResultIO.createWriter(TupleQueryResultFormat.SPARQL, os);
        } else if(SparqlWritersHelper.parseSubType(format).equals("html")) {
            out = new SPARQLResultsHTMLWriter(os, templatingService);
        } else if(SparqlWritersHelper.parseSubType(format).equals("json")) {
            out = QueryResultIO.createWriter(TupleQueryResultFormat.JSON, os);
        } else if(SparqlWritersHelper.parseSubType(format).equals("xml")) {
            out = QueryResultIO.createWriter(TupleQueryResultFormat.SPARQL, os);
        } else if(SparqlWritersHelper.parseSubType(format).equals("csv")) {
            out = QueryResultIO.createWriter(TupleQueryResultFormat.CSV, os);
        } else throw new InvalidArgumentException("could not produce format "+format);
        return out;
    }

    private BooleanQueryResultWriter getBooleanResultWriter(String format, OutputStream os) {
        //build outputwriter
        final BooleanQueryResultWriter out;
        if(format == null) {
            out = QueryResultIO.createWriter(BooleanQueryResultFormat.SPARQL, os);
        } else if(SparqlWritersHelper.parseSubType(format).equals("html")) {
            out = new SPARQLBooleanHTMLWriter(os);
        } else if(SparqlWritersHelper.parseSubType(format).equals("json")) {
            out = QueryResultIO.createWriter(BooleanQueryResultFormat.JSON, os);
        } else if(SparqlWritersHelper.parseSubType(format).equals("xml")) {
            out = QueryResultIO.createWriter(BooleanQueryResultFormat.SPARQL, os);
        } else if(SparqlWritersHelper.parseSubType(format).equals("csv")) {
            out = QueryResultIO.createWriter(BooleanQueryResultFormat.TEXT, os);
        } else throw new InvalidArgumentException("could not produce format "+format);
        return out;
    }

    private SPARQLGraphResultWriter getGraphResultWriter(String format, OutputStream os) {
        return new SPARQLGraphResultWriter(os,format);
    }

}
TOP

Related Classes of org.apache.marmotta.platform.sparql.services.sparql.SparqlServiceImpl

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.