/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.core.persistence.db;
import org.apache.jackrabbit.core.NodeId;
import org.apache.jackrabbit.core.PropertyId;
import org.apache.jackrabbit.core.fs.FileSystem;
import org.apache.jackrabbit.core.fs.local.LocalFileSystem;
import org.apache.jackrabbit.core.persistence.AbstractPersistenceManager;
import org.apache.jackrabbit.core.persistence.PMContext;
import org.apache.jackrabbit.core.persistence.util.BLOBStore;
import org.apache.jackrabbit.core.persistence.util.FileSystemBLOBStore;
import org.apache.jackrabbit.core.persistence.util.Serializer;
import org.apache.jackrabbit.core.state.ChangeLog;
import org.apache.jackrabbit.core.state.ItemState;
import org.apache.jackrabbit.core.state.ItemStateException;
import org.apache.jackrabbit.core.state.NoSuchItemStateException;
import org.apache.jackrabbit.core.state.NodeReferences;
import org.apache.jackrabbit.core.state.NodeReferencesId;
import org.apache.jackrabbit.core.state.NodeState;
import org.apache.jackrabbit.core.state.PropertyState;
import org.apache.jackrabbit.core.value.BLOBFileValue;
import org.apache.jackrabbit.core.value.InternalValue;
import org.apache.jackrabbit.util.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jcr.PropertyType;
import javax.jcr.RepositoryException;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Iterator;
/**
* Abstract base class for database persistence managers. This class
* contains common functionality for database persistence manager subclasses
* that normally differ only in the way the database connection is acquired.
* Subclasses should override the {@link #getConnection()} method to return
* the configured database connection.
* <p>
* See the {@link SimpleDbPersistenceManager} for a detailed description
* of the available configuration options and database behaviour.
*/
public abstract class DatabasePersistenceManager extends AbstractPersistenceManager {
/**
* Logger instance
*/
private static Logger log = LoggerFactory.getLogger(DatabasePersistenceManager.class);
protected static final String SCHEMA_OBJECT_PREFIX_VARIABLE =
"${schemaObjectPrefix}";
protected boolean initialized;
protected String schema;
protected String schemaObjectPrefix;
protected boolean externalBLOBs;
// initial size of buffer used to serialize objects
protected static final int INITIAL_BUFFER_SIZE = 1024;
// jdbc connection
protected Connection con;
// internal flag governing whether an automatic reconnect should be
// attempted after a SQLException had been encountered
protected boolean autoReconnect = true;
// time to sleep in ms before a reconnect is attempted
protected static final int SLEEP_BEFORE_RECONNECT = 10000;
// the map of prepared statements (key: sql stmt, value: prepared stmt)
private HashMap preparedStatements = new HashMap();
// SQL statements for NodeState management
protected String nodeStateInsertSQL;
protected String nodeStateUpdateSQL;
protected String nodeStateSelectSQL;
protected String nodeStateSelectExistSQL;
protected String nodeStateDeleteSQL;
// SQL statements for PropertyState management
protected String propertyStateInsertSQL;
protected String propertyStateUpdateSQL;
protected String propertyStateSelectSQL;
protected String propertyStateSelectExistSQL;
protected String propertyStateDeleteSQL;
// SQL statements for NodeReference management
protected String nodeReferenceInsertSQL;
protected String nodeReferenceUpdateSQL;
protected String nodeReferenceSelectSQL;
protected String nodeReferenceSelectExistSQL;
protected String nodeReferenceDeleteSQL;
// SQL statements for BLOB management
// (if <code>externalBLOBs==false</code>)
protected String blobInsertSQL;
protected String blobUpdateSQL;
protected String blobSelectSQL;
protected String blobSelectExistSQL;
protected String blobDeleteSQL;
/**
* file system where BLOB data is stored
* (if <code>externalBLOBs==true</code>)
*/
protected FileSystem blobFS;
/**
* BLOBStore that manages BLOB data in the file system
* (if <code>externalBLOBs==true</code>)
*/
protected BLOBStore blobStore;
/**
* Creates a new <code>DatabasePersistenceManager</code> instance.
*/
public DatabasePersistenceManager() {
schema = "default";
schemaObjectPrefix = "";
externalBLOBs = true;
initialized = false;
}
//----------------------------------------------------< setters & getters >
public String getSchemaObjectPrefix() {
return schemaObjectPrefix;
}
public void setSchemaObjectPrefix(String schemaObjectPrefix) {
// make sure prefix is all uppercase
this.schemaObjectPrefix = schemaObjectPrefix.toUpperCase();
}
public String getSchema() {
return schema;
}
public void setSchema(String schema) {
this.schema = schema;
}
public boolean isExternalBLOBs() {
return externalBLOBs;
}
public void setExternalBLOBs(boolean externalBLOBs) {
this.externalBLOBs = externalBLOBs;
}
public void setExternalBLOBs(String externalBLOBs) {
this.externalBLOBs = Boolean.valueOf(externalBLOBs).booleanValue();
}
//---------------------------------------------------< PersistenceManager >
/**
* {@inheritDoc}
*/
public void init(PMContext context) throws Exception {
if (initialized) {
throw new IllegalStateException("already initialized");
}
// setup jdbc connection
initConnection();
// make sure schemaObjectPrefix consists of legal name characters only
prepareSchemaObjectPrefix();
// check if schema objects exist and create them if necessary
checkSchema();
// build sql statements
buildSQLStatements();
// prepare statements
initPreparedStatements();
if (externalBLOBs) {
/**
* store BLOBs in local file system in a sub directory
* of the workspace home directory
*/
LocalFileSystem blobFS = new LocalFileSystem();
blobFS.setRoot(new File(context.getHomeDir(), "blobs"));
blobFS.init();
this.blobFS = blobFS;
blobStore = new FileSystemBLOBStore(blobFS);
} else {
/**
* store BLOBs in db
*/
blobStore = new DbBLOBStore();
}
initialized = true;
}
/**
* {@inheritDoc}
*/
public synchronized void close() throws Exception {
if (!initialized) {
throw new IllegalStateException("not initialized");
}
try {
// close shared prepared statements
for (Iterator it = preparedStatements.values().iterator(); it.hasNext(); ) {
closeStatement((PreparedStatement) it.next());
}
preparedStatements.clear();
if (externalBLOBs) {
// close BLOB file system
blobFS.close();
blobFS = null;
}
blobStore = null;
// close jdbc connection
closeConnection(con);
} finally {
initialized = false;
}
}
/**
* {@inheritDoc}
*/
public synchronized void store(ChangeLog changeLog)
throws ItemStateException {
// temporarily disable automatic reconnect feature
// since the changes need to be persisted atomically
autoReconnect = false;
try {
ItemStateException ise = null;
// number of attempts to store the changes
int trials = 2;
while (true) {
try {
super.store(changeLog);
break;
} catch (ItemStateException e) {
// catch exception and fall through...
ise = e;
}
if (ise != null && ise.getCause() instanceof SQLException
&& --trials > 0) {
// a SQLException has been thrown, try to reconnect
log.warn("storing changes failed, about to reconnect...", ise.getCause());
// try to reconnect
if (reestablishConnection()) {
// now let's give it another try
ise = null;
continue;
} else {
// reconnect failed, proceed with error processing
break;
}
}
}
if (ise == null) {
// storing the changes succeeded, now commit the changes
try {
con.commit();
} catch (SQLException e) {
String msg = "committing change log failed";
log.error(msg, e);
throw new ItemStateException(msg, e);
}
} else {
// storing the changes failed, rollback changes
try {
con.rollback();
} catch (SQLException e) {
String msg = "rollback of change log failed";
log.error(msg, e);
}
// re-throw original exception
throw ise;
}
} finally {
// re-enable automatic reconnect feature
autoReconnect = true;
}
}
/**
* {@inheritDoc}
*/
public NodeState load(NodeId id)
throws NoSuchItemStateException, ItemStateException {
if (!initialized) {
throw new IllegalStateException("not initialized");
}
synchronized (nodeStateSelectSQL) {
ResultSet rs = null;
InputStream in = null;
try {
Statement stmt = executeStmt(nodeStateSelectSQL, new Object[]{id.toString()});
rs = stmt.getResultSet();
if (!rs.next()) {
throw new NoSuchItemStateException(id.toString());
}
in = rs.getBinaryStream(1);
NodeState state = createNew(id);
Serializer.deserialize(state, in);
return state;
} catch (Exception e) {
if (e instanceof NoSuchItemStateException) {
throw (NoSuchItemStateException) e;
}
String msg = "failed to read node state: " + id;
log.error(msg, e);
throw new ItemStateException(msg, e);
} finally {
closeStream(in);
closeResultSet(rs);
}
}
}
/**
* {@inheritDoc}
*/
public PropertyState load(PropertyId id)
throws NoSuchItemStateException, ItemStateException {
if (!initialized) {
throw new IllegalStateException("not initialized");
}
synchronized (propertyStateSelectSQL) {
ResultSet rs = null;
InputStream in = null;
try {
Statement stmt = executeStmt(propertyStateSelectSQL, new Object[]{id.toString()});
rs = stmt.getResultSet();
if (!rs.next()) {
throw new NoSuchItemStateException(id.toString());
}
in = rs.getBinaryStream(1);
PropertyState state = createNew(id);
Serializer.deserialize(state, in, blobStore);
return state;
} catch (Exception e) {
if (e instanceof NoSuchItemStateException) {
throw (NoSuchItemStateException) e;
}
String msg = "failed to read property state: " + id;
log.error(msg, e);
throw new ItemStateException(msg, e);
} finally {
closeStream(in);
closeResultSet(rs);
}
}
}
/**
* {@inheritDoc}
* <p/>
* This method uses shared <code>PreparedStatement</code>s which must
* be executed strictly sequentially. Because this method synchronizes on
* the persistence manager instance there is no need to synchronize on the
* shared statement. If the method would not be sychronized the shared
* statements would have to be synchronized.
*/
public synchronized void store(NodeState state) throws ItemStateException {
if (!initialized) {
throw new IllegalStateException("not initialized");
}
// check if insert or update
boolean update = state.getStatus() != ItemState.STATUS_NEW;
//boolean update = exists(state.getId());
String sql = (update) ? nodeStateUpdateSQL : nodeStateInsertSQL;
try {
ByteArrayOutputStream out =
new ByteArrayOutputStream(INITIAL_BUFFER_SIZE);
// serialize node state
Serializer.serialize(state, out);
// we are synchronized on this instance, therefore we do not
// not have to additionally synchronize on the sql statement
executeStmt(sql, new Object[]{out.toByteArray(), state.getNodeId().toString()});
// there's no need to close a ByteArrayOutputStream
//out.close();
} catch (Exception e) {
String msg = "failed to write node state: " + state.getNodeId();
log.error(msg, e);
throw new ItemStateException(msg, e);
}
}
/**
* {@inheritDoc}
* <p/>
* This method uses shared <code>PreparedStatement</code>s which must
* be executed strictly sequentially. Because this method synchronizes on
* the persistence manager instance there is no need to synchronize on the
* shared statement. If the method would not be sychronized the shared
* statements would have to be synchronized.
*/
public synchronized void store(PropertyState state)
throws ItemStateException {
if (!initialized) {
throw new IllegalStateException("not initialized");
}
// check if insert or update
boolean update = state.getStatus() != ItemState.STATUS_NEW;
//boolean update = exists(state.getId());
String sql = (update) ? propertyStateUpdateSQL : propertyStateInsertSQL;
try {
ByteArrayOutputStream out =
new ByteArrayOutputStream(INITIAL_BUFFER_SIZE);
// serialize property state
Serializer.serialize(state, out, blobStore);
// we are synchronized on this instance, therefore we do not
// not have to additionally synchronize on the sql statement
executeStmt(sql, new Object[]{out.toByteArray(), state.getPropertyId().toString()});
// there's no need to close a ByteArrayOutputStream
//out.close();
} catch (Exception e) {
String msg = "failed to write property state: " + state.getPropertyId();
log.error(msg, e);
throw new ItemStateException(msg, e);
}
}
/**
* {@inheritDoc}
*/
public synchronized void destroy(NodeState state)
throws ItemStateException {
if (!initialized) {
throw new IllegalStateException("not initialized");
}
try {
// we are synchronized on this instance, therefore we do not
// not have to additionally synchronize on the sql statement
executeStmt(nodeStateDeleteSQL, new Object[]{state.getNodeId().toString()});
} catch (Exception e) {
String msg = "failed to delete node state: " + state.getNodeId();
log.error(msg, e);
throw new ItemStateException(msg, e);
}
}
/**
* {@inheritDoc}
*/
public synchronized void destroy(PropertyState state)
throws ItemStateException {
if (!initialized) {
throw new IllegalStateException("not initialized");
}
// make sure binary values (BLOBs) are properly removed
InternalValue[] values = state.getValues();
if (values != null) {
for (int i = 0; i < values.length; i++) {
InternalValue val = values[i];
if (val != null) {
if (val.getType() == PropertyType.BINARY) {
BLOBFileValue blobVal = (BLOBFileValue) val.internalValue();
// delete internal resource representation of BLOB value
blobVal.delete(true);
// also remove from BLOBStore
String blobId = blobStore.createId(state.getPropertyId(), i);
try {
blobStore.remove(blobId);
} catch (Exception e) {
log.warn("failed to remove from BLOBStore: " + blobId, e);
}
}
}
}
}
try {
// we are synchronized on this instance, therefore we do not
// not have to additionally synchronize on the sql statement
executeStmt(propertyStateDeleteSQL, new Object[]{state.getPropertyId().toString()});
} catch (Exception e) {
String msg = "failed to delete property state: " + state.getPropertyId();
log.error(msg, e);
throw new ItemStateException(msg, e);
}
}
/**
* {@inheritDoc}
*/
public NodeReferences load(NodeReferencesId targetId)
throws NoSuchItemStateException, ItemStateException {
if (!initialized) {
throw new IllegalStateException("not initialized");
}
synchronized (nodeReferenceSelectSQL) {
ResultSet rs = null;
InputStream in = null;
try {
Statement stmt = executeStmt(
nodeReferenceSelectSQL, new Object[]{targetId.toString()});
rs = stmt.getResultSet();
if (!rs.next()) {
throw new NoSuchItemStateException(targetId.toString());
}
in = rs.getBinaryStream(1);
NodeReferences refs = new NodeReferences(targetId);
Serializer.deserialize(refs, in);
return refs;
} catch (Exception e) {
if (e instanceof NoSuchItemStateException) {
throw (NoSuchItemStateException) e;
}
String msg = "failed to read node references: " + targetId;
log.error(msg, e);
throw new ItemStateException(msg, e);
} finally {
closeStream(in);
closeResultSet(rs);
}
}
}
/**
* {@inheritDoc}
* <p/>
* This method uses shared <code>PreparedStatement</code>s which must
* be executed strictly sequentially. Because this method synchronizes on
* the persistence manager instance there is no need to synchronize on the
* shared statement. If the method would not be sychronized the shared
* statements would have to be synchronized.
*/
public synchronized void store(NodeReferences refs)
throws ItemStateException {
if (!initialized) {
throw new IllegalStateException("not initialized");
}
// check if insert or update
boolean update = exists(refs.getId());
String sql = (update) ? nodeReferenceUpdateSQL : nodeReferenceInsertSQL;
try {
ByteArrayOutputStream out =
new ByteArrayOutputStream(INITIAL_BUFFER_SIZE);
// serialize references
Serializer.serialize(refs, out);
// we are synchronized on this instance, therefore we do not
// not have to additionally synchronize on the sql statement
executeStmt(sql, new Object[]{out.toByteArray(), refs.getId().toString()});
// there's no need to close a ByteArrayOutputStream
//out.close();
} catch (Exception e) {
String msg = "failed to write node references: " + refs.getId();
log.error(msg, e);
throw new ItemStateException(msg, e);
}
}
/**
* {@inheritDoc}
*/
public synchronized void destroy(NodeReferences refs)
throws ItemStateException {
if (!initialized) {
throw new IllegalStateException("not initialized");
}
try {
// we are synchronized on this instance, therefore we do not
// not have to additionally synchronize on the sql statement
executeStmt(nodeReferenceDeleteSQL, new Object[]{refs.getId().toString()});
} catch (Exception e) {
String msg = "failed to delete node references: " + refs.getId();
log.error(msg, e);
throw new ItemStateException(msg, e);
}
}
/**
* {@inheritDoc}
*/
public boolean exists(NodeId id) throws ItemStateException {
if (!initialized) {
throw new IllegalStateException("not initialized");
}
synchronized (nodeStateSelectExistSQL) {
ResultSet rs = null;
try {
Statement stmt = executeStmt(nodeStateSelectExistSQL, new Object[]{id.toString()});
rs = stmt.getResultSet();
// a node state exists if the result has at least one entry
return rs.next();
} catch (Exception e) {
String msg = "failed to check existence of node state: " + id;
log.error(msg, e);
throw new ItemStateException(msg, e);
} finally {
closeResultSet(rs);
}
}
}
/**
* {@inheritDoc}
*/
public boolean exists(PropertyId id) throws ItemStateException {
if (!initialized) {
throw new IllegalStateException("not initialized");
}
synchronized (propertyStateSelectExistSQL) {
ResultSet rs = null;
try {
Statement stmt = executeStmt(
propertyStateSelectExistSQL, new Object[]{id.toString()});
rs = stmt.getResultSet();
// a property state exists if the result has at least one entry
return rs.next();
} catch (Exception e) {
String msg = "failed to check existence of property state: " + id;
log.error(msg, e);
throw new ItemStateException(msg, e);
} finally {
closeResultSet(rs);
}
}
}
/**
* {@inheritDoc}
*/
public boolean exists(NodeReferencesId targetId) throws ItemStateException {
if (!initialized) {
throw new IllegalStateException("not initialized");
}
synchronized (nodeReferenceSelectExistSQL) {
ResultSet rs = null;
try {
Statement stmt = executeStmt(
nodeReferenceSelectExistSQL, new Object[]{targetId.toString()});
rs = stmt.getResultSet();
// a reference exists if the result has at least one entry
return rs.next();
} catch (Exception e) {
String msg = "failed to check existence of node references: "
+ targetId;
log.error(msg, e);
throw new ItemStateException(msg, e);
} finally {
closeResultSet(rs);
}
}
}
//----------------------------------< misc. helper methods & overridables >
/**
* Initializes the database connection used by this persistence manager.
* <p>
* Subclasses should normally override the {@link #getConnection()}
* method instead of this one. The default implementation calls
* {@link #getConnection()} to get the database connection and disables
* the autocommit feature.
*
* @throws Exception if an error occurs
*/
protected void initConnection() throws Exception {
con = getConnection();
con.setAutoCommit(false);
}
/**
* Abstract factory method for creating a new database connection. This
* method is called by {@link #init(PMContext)} when the persistence
* manager is started. The returned connection should come with the default
* JDBC settings, as the {@link #init(PMContext)} method will explicitly
* set the <code>autoCommit</code> and other properties as needed.
* <p>
* Note that the returned database connection is kept during the entire
* lifetime of the persistence manager, after which it is closed by
* {@link #close()} using the {@link #closeConnection(Connection)} method.
*
* @return new connection
* @throws Exception if an error occurs
*/
protected Connection getConnection() throws Exception {
throw new UnsupportedOperationException("Override in a subclass!");
}
/**
* Closes the given database connection. This method is called by
* {@link #close()} to close the connection acquired using
* {@link #getConnection()} when the persistence manager was started.
* <p>
* The default implementation just calls the {@link Connection#close()}
* method of the given connection, but subclasses can override this
* method to provide more extensive database and connection cleanup.
*
* @param connection database connection
* @throws Exception if an error occurs
*/
protected void closeConnection(Connection connection) throws Exception {
connection.close();
}
/**
* Re-establishes the database connection. This method is called by
* {@link #store(ChangeLog)} and {@link #executeStmt(String, Object[])}
* after a <code>SQLException</code> had been encountered.
* @return true if the connection could be successfully re-established,
* false otherwise.
*/
protected synchronized boolean reestablishConnection() {
// in any case try to shut down current connection
// gracefully in order to avoid potential memory leaks
// close shared prepared statements
for (Iterator it = preparedStatements.values().iterator(); it.hasNext(); ) {
closeStatement((PreparedStatement) it.next());
}
preparedStatements.clear();
try {
closeConnection(con);
} catch (Exception ignore) {
}
// sleep for a while to give database a chance
// to restart before a reconnect is attempted
try {
Thread.sleep(SLEEP_BEFORE_RECONNECT);
} catch (InterruptedException ignore) {
}
// now try to re-establish connection
try {
initConnection();
initPreparedStatements();
return true;
} catch (Exception e) {
log.error("failed to re-establish connection", e);
// reconnect failed
return false;
}
}
/**
* Executes the given SQL statement with the specified parameters.
* If a <code>SQLException</code> is encountered and
* <code>autoReconnect==true</code> <i>one</i> attempt is made to re-establish
* the database connection and re-execute the statement.
*
* @param sql statement to execute
* @param params parameters to set
* @return the <code>Statement</code> object that had been executed
* @throws SQLException if an error occurs
*/
protected Statement executeStmt(String sql, Object[] params)
throws SQLException {
int trials = autoReconnect ? 2 : 1;
while (true) {
PreparedStatement stmt = (PreparedStatement) preparedStatements.get(sql);
try {
for (int i = 0; i < params.length; i++) {
if (params[i] instanceof SizedInputStream) {
SizedInputStream in = (SizedInputStream) params[i];
stmt.setBinaryStream(i + 1, in, (int) in.getSize());
} else {
stmt.setObject(i + 1, params[i]);
}
}
stmt.execute();
resetStatement(stmt);
return stmt;
} catch (SQLException se) {
if (--trials == 0) {
// no more trials, re-throw
throw se;
}
log.warn("execute failed, about to reconnect...", se.getMessage());
// try to reconnect
if (reestablishConnection()) {
// reconnect succeeded; check whether it's possible to
// re-execute the prepared stmt with the given parameters
for (int i = 0; i < params.length; i++) {
if (params[i] instanceof SizedInputStream) {
SizedInputStream in = (SizedInputStream) params[i];
if (in.isConsumed()) {
// we're unable to re-execute the prepared stmt
// since an InputStream paramater has already
// been 'consumed';
// re-throw previous SQLException
throw se;
}
}
}
// try again to execute the statement
continue;
} else {
// reconnect failed, re-throw previous SQLException
throw se;
}
}
}
}
/**
* Resets the given <code>PreparedStatement</code> by clearing the parameters
* and warnings contained.
* <p/>
* NOTE: This method MUST be called in a synchronized context as neither
* this method nor the <code>PreparedStatement</code> instance on which it
* operates are thread safe.
*
* @param stmt The <code>PreparedStatement</code> to reset. If
* <code>null</code> this method does nothing.
*/
protected void resetStatement(PreparedStatement stmt) {
if (stmt != null) {
try {
stmt.clearParameters();
stmt.clearWarnings();
} catch (SQLException se) {
logException("failed resetting PreparedStatement", se);
}
}
}
protected void closeResultSet(ResultSet rs) {
if (rs != null) {
try {
rs.close();
} catch (SQLException se) {
logException("failed closing ResultSet", se);
}
}
}
protected void closeStream(InputStream in) {
if (in != null) {
try {
in.close();
} catch (IOException ignore) {
}
}
}
protected void closeStatement(Statement stmt) {
if (stmt != null) {
try {
stmt.close();
} catch (SQLException se) {
logException("failed closing Statement", se);
}
}
}
protected void logException(String message, SQLException se) {
if (message != null) {
log.error(message);
}
log.error(" reason: " + se.getMessage());
log.error("state/code: " + se.getSQLState() + "/" + se.getErrorCode());
log.debug(" dump:", se);
}
/**
* Makes sure that <code>schemaObjectPrefix</code> does only consist of
* characters that are allowed in names on the target database. Illegal
* characters will be escaped as necessary.
*
* @throws Exception if an error occurs
*/
protected void prepareSchemaObjectPrefix() throws Exception {
DatabaseMetaData metaData = con.getMetaData();
String legalChars = metaData.getExtraNameCharacters();
legalChars += "ABCDEFGHIJKLMNOPQRSTUVWXZY0123456789_";
String prefix = schemaObjectPrefix.toUpperCase();
StringBuffer escaped = new StringBuffer();
for (int i = 0; i < prefix.length(); i++) {
char c = prefix.charAt(i);
if (legalChars.indexOf(c) == -1) {
escaped.append("_x");
String hex = Integer.toHexString(c);
escaped.append("0000".toCharArray(), 0, 4 - hex.length());
escaped.append(hex);
escaped.append("_");
} else {
escaped.append(c);
}
}
schemaObjectPrefix = escaped.toString();
}
/**
* Checks if the required schema objects exist and creates them if they
* don't exist yet.
*
* @throws Exception if an error occurs
*/
protected void checkSchema() throws Exception {
DatabaseMetaData metaData = con.getMetaData();
String tableName = schemaObjectPrefix + "NODE";
if (metaData.storesLowerCaseIdentifiers()) {
tableName = tableName.toLowerCase();
} else if (metaData.storesUpperCaseIdentifiers()) {
tableName = tableName.toUpperCase();
}
ResultSet rs = metaData.getTables(null, null, tableName, null);
boolean schemaExists;
try {
schemaExists = rs.next();
} finally {
rs.close();
}
if (!schemaExists) {
// read ddl from resources
InputStream in = getSchemaDDL();
if (in == null) {
String msg = "Configuration error: unknown schema '" + schema + "'";
log.debug(msg);
throw new RepositoryException(msg);
}
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
Statement stmt = con.createStatement();
try {
String sql = reader.readLine();
while (sql != null) {
// Skip comments and empty lines
if (!sql.startsWith("#") && sql.length() > 0) {
// replace prefix variable
sql = Text.replace(sql, SCHEMA_OBJECT_PREFIX_VARIABLE, schemaObjectPrefix);
// execute sql stmt
stmt.executeUpdate(sql);
}
// read next sql stmt
sql = reader.readLine();
}
// commit the changes
con.commit();
} finally {
closeStream(in);
closeStatement(stmt);
}
}
}
/**
* Returns an input stream to the schema DDL resource.
* @return an input stream to the schema DDL resource.
*/
protected InputStream getSchemaDDL() {
// JCR-595: Use the class explicitly instead of using getClass()
// to avoid problems when subclassed in a different package
return DatabasePersistenceManager.class.getResourceAsStream(schema + ".ddl");
}
/**
* Builds the SQL statements
*/
protected void buildSQLStatements() {
nodeStateInsertSQL = "insert into "
+ schemaObjectPrefix + "NODE (NODE_DATA, NODE_ID) values (?, ?)";
nodeStateUpdateSQL = "update "
+ schemaObjectPrefix + "NODE set NODE_DATA = ? where NODE_ID = ?";
nodeStateSelectSQL = "select NODE_DATA from "
+ schemaObjectPrefix + "NODE where NODE_ID = ?";
nodeStateSelectExistSQL = "select 1 from "
+ schemaObjectPrefix + "NODE where NODE_ID = ?";
nodeStateDeleteSQL = "delete from "
+ schemaObjectPrefix + "NODE where NODE_ID = ?";
propertyStateInsertSQL = "insert into "
+ schemaObjectPrefix + "PROP (PROP_DATA, PROP_ID) values (?, ?)";
propertyStateUpdateSQL = "update "
+ schemaObjectPrefix + "PROP set PROP_DATA = ? where PROP_ID = ?";
propertyStateSelectSQL = "select PROP_DATA from "
+ schemaObjectPrefix + "PROP where PROP_ID = ?";
propertyStateSelectExistSQL = "select 1 from "
+ schemaObjectPrefix + "PROP where PROP_ID = ?";
propertyStateDeleteSQL = "delete from "
+ schemaObjectPrefix + "PROP where PROP_ID = ?";
nodeReferenceInsertSQL = "insert into "
+ schemaObjectPrefix + "REFS (REFS_DATA, NODE_ID) values (?, ?)";
nodeReferenceUpdateSQL = "update "
+ schemaObjectPrefix + "REFS set REFS_DATA = ? where NODE_ID = ?";
nodeReferenceSelectSQL = "select REFS_DATA from "
+ schemaObjectPrefix + "REFS where NODE_ID = ?";
nodeReferenceSelectExistSQL = "select 1 from "
+ schemaObjectPrefix + "REFS where NODE_ID = ?";
nodeReferenceDeleteSQL = "delete from "
+ schemaObjectPrefix + "REFS where NODE_ID = ?";
if (!externalBLOBs) {
blobInsertSQL = "insert into "
+ schemaObjectPrefix + "BINVAL (BINVAL_DATA, BINVAL_ID) values (?, ?)";
blobUpdateSQL = "update "
+ schemaObjectPrefix + "BINVAL set BINVAL_DATA = ? where BINVAL_ID = ?";
blobSelectSQL =
"select BINVAL_DATA from "
+ schemaObjectPrefix + "BINVAL where BINVAL_ID = ?";
blobSelectExistSQL =
"select 1 from "
+ schemaObjectPrefix + "BINVAL where BINVAL_ID = ?";
blobDeleteSQL = "delete from "
+ schemaObjectPrefix + "BINVAL where BINVAL_ID = ?";
}
}
/**
* Initializes the map of prepared statements.
*
* @throws SQLException if an error occurs
*/
protected void initPreparedStatements() throws SQLException {
preparedStatements.put(
nodeStateInsertSQL, con.prepareStatement(nodeStateInsertSQL));
preparedStatements.put(
nodeStateUpdateSQL, con.prepareStatement(nodeStateUpdateSQL));
preparedStatements.put(
nodeStateSelectSQL, con.prepareStatement(nodeStateSelectSQL));
preparedStatements.put(
nodeStateSelectExistSQL, con.prepareStatement(nodeStateSelectExistSQL));
preparedStatements.put(
nodeStateDeleteSQL, con.prepareStatement(nodeStateDeleteSQL));
preparedStatements.put(
propertyStateInsertSQL, con.prepareStatement(propertyStateInsertSQL));
preparedStatements.put(
propertyStateUpdateSQL, con.prepareStatement(propertyStateUpdateSQL));
preparedStatements.put(
propertyStateSelectSQL, con.prepareStatement(propertyStateSelectSQL));
preparedStatements.put(
propertyStateSelectExistSQL, con.prepareStatement(propertyStateSelectExistSQL));
preparedStatements.put(
propertyStateDeleteSQL, con.prepareStatement(propertyStateDeleteSQL));
preparedStatements.put(
nodeReferenceInsertSQL, con.prepareStatement(nodeReferenceInsertSQL));
preparedStatements.put(
nodeReferenceUpdateSQL, con.prepareStatement(nodeReferenceUpdateSQL));
preparedStatements.put(
nodeReferenceSelectSQL, con.prepareStatement(nodeReferenceSelectSQL));
preparedStatements.put(
nodeReferenceSelectExistSQL, con.prepareStatement(nodeReferenceSelectExistSQL));
preparedStatements.put(
nodeReferenceDeleteSQL, con.prepareStatement(nodeReferenceDeleteSQL));
if (!externalBLOBs) {
preparedStatements.put(blobInsertSQL, con.prepareStatement(blobInsertSQL));
preparedStatements.put(blobUpdateSQL, con.prepareStatement(blobUpdateSQL));
preparedStatements.put(blobSelectSQL, con.prepareStatement(blobSelectSQL));
preparedStatements.put(blobSelectExistSQL, con.prepareStatement(blobSelectExistSQL));
preparedStatements.put(blobDeleteSQL, con.prepareStatement(blobDeleteSQL));
}
}
//--------------------------------------------------------< inner classes >
class SizedInputStream extends FilterInputStream {
private final long size;
private boolean consumed = false;
SizedInputStream(InputStream in, long size) {
super(in);
this.size = size;
}
long getSize() {
return size;
}
boolean isConsumed() {
return consumed;
}
public int read() throws IOException {
consumed = true;
return super.read();
}
public long skip(long n) throws IOException {
consumed = true;
return super.skip(n);
}
public int read(byte b[]) throws IOException {
consumed = true;
return super.read(b);
}
public int read(byte b[], int off, int len) throws IOException {
consumed = true;
return super.read(b, off, len);
}
}
class DbBLOBStore implements BLOBStore {
/**
* {@inheritDoc}
*/
public String createId(PropertyId id, int index) {
// the blobId is a simple string concatenation of id plus index
StringBuffer sb = new StringBuffer();
sb.append(id.toString());
sb.append('[');
sb.append(index);
sb.append(']');
return sb.toString();
}
/**
* {@inheritDoc}
*/
public InputStream get(String blobId) throws Exception {
synchronized (blobSelectSQL) {
Statement stmt = executeStmt(blobSelectSQL, new Object[]{blobId});
final ResultSet rs = stmt.getResultSet();
if (!rs.next()) {
closeResultSet(rs);
throw new Exception("no such BLOB: " + blobId);
}
InputStream in = rs.getBinaryStream(1);
if (in == null) {
// some databases treat zero-length values as NULL;
// return empty InputStream in such a case
closeResultSet(rs);
return new ByteArrayInputStream(new byte[0]);
}
/**
* return an InputStream wrapper in order to
* close the ResultSet when the stream is closed
*/
return new FilterInputStream(in) {
public void close() throws IOException {
in.close();
// now it's safe to close ResultSet
closeResultSet(rs);
}
};
}
}
/**
* {@inheritDoc}
*/
public synchronized void put(String blobId, InputStream in, long size)
throws Exception {
Statement stmt = executeStmt(blobSelectExistSQL, new Object[]{blobId});
ResultSet rs = stmt.getResultSet();
// a BLOB exists if the result has at least one entry
boolean exists = rs.next();
closeResultSet(rs);
String sql = (exists) ? blobUpdateSQL : blobInsertSQL;
executeStmt(sql, new Object[]{new SizedInputStream(in, size), blobId});
}
/**
* {@inheritDoc}
*/
public synchronized boolean remove(String blobId) throws Exception {
Statement stmt = executeStmt(blobDeleteSQL, new Object[]{blobId});
return stmt.getUpdateCount() == 1;
}
}
}