Package org.archive.crawler.util

Source Code of org.archive.crawler.util.BdbUriUniqFilter

/*
*  This file is part of the Heritrix web crawler (crawler.archive.org).
*
*  Licensed to the Internet Archive (IA) by one or more individual
*  contributors.
*
*  The IA licenses this file to You under the Apache License, Version 2.0
*  (the "License"); you may not use this file except in compliance with
*  the License.  You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
*  Unless required by applicable law or agreed to in writing, software
*  distributed under the License is distributed on an "AS IS" BASIS,
*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*  See the License for the specific language governing permissions and
*  limitations under the License.
*/
package org.archive.crawler.util;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.archive.bdb.BdbModule;
import org.archive.checkpointing.Checkpoint;
import org.archive.checkpointing.Checkpointable;
import org.archive.util.FileUtils;
import org.json.JSONException;
import org.json.JSONObject;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.Lifecycle;

import st.ata.util.FPGenerator;

import com.sleepycat.bind.tuple.LongBinding;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.DatabaseNotFoundException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.OperationStatus;


/**
* A BDB implementation of an AlreadySeen list.
*
* This implementation performs adequately without blowing out
* the heap. See
* <a href="http://crawler.archive.org/cgi-bin/wiki.pl?AlreadySeen">AlreadySeen</a>.
*
* <p>Makes keys that have URIs from same server close to each other.  Mercator
* and 2.3.5 'Elminating Already-Visited URLs' in 'Mining the Web' by Soumen
* Chakrabarti talk of a two-level key with the first 24 bits a hash of the
* host plus port and with the last 40 as a hash of the path.  Testing
* showed adoption of such a scheme halving lookup times (Tutilhis implementation
* actually concatenates scheme + host in first 24 bits and path + query in
* trailing 40 bits).
*
* @author stack
* @version $Date$, $Revision$
*/
public class BdbUriUniqFilter extends SetBasedUriUniqFilter
implements Lifecycle, Checkpointable, BeanNameAware, DisposableBean {
    private static final long serialVersionUID = -8099357538178524011L;

    private static Logger logger =
        Logger.getLogger(BdbUriUniqFilter.class.getName());

    protected boolean createdEnvironment = false;
    protected long lastCacheMiss = 0;
    protected long lastCacheMissDiff = 0;
    protected transient Database alreadySeen = null;
    protected transient DatabaseEntry value = null;
    static protected DatabaseEntry ZERO_LENGTH_ENTRY =
        new DatabaseEntry(new byte[0]);
    private static final String DB_NAME = "alreadySeenUrl";
    protected AtomicLong count = new AtomicLong(0);
    private long aggregatedLookupTime = 0;
   
    private static final String COLON_SLASH_SLASH = "://";
   
    protected BdbModule bdb;
    @Autowired
    public void setBdbModule(BdbModule bdb) {
        this.bdb = bdb;
    }
   
    protected String beanName;
    public void setBeanName(String name) {
        this.beanName = name;
    }
   
    public BdbUriUniqFilter() {
    }
   
    protected boolean isRunning = false;
    public void start() {
        if(isRunning()) {
            return;
        }
        boolean isRecovery = (recoveryCheckpoint != null);
        try {
            BdbModule.BdbConfig config = getDatabaseConfig();
            config.setAllowCreate(!isRecovery);
            initialize(bdb.openDatabase(DB_NAME, config, isRecovery));
        } catch (DatabaseException e) {
            throw new IllegalStateException(e);
        }
        if(isRecovery) {
            JSONObject json = recoveryCheckpoint.loadJson(beanName);
            try {
                count.set(json.getLong("count"));
            } catch (JSONException e) {
                throw new RuntimeException(e);
            }          
        }
        isRunning = true;
    }
   
    public boolean isRunning() {
        return isRunning;
    }

    public void stop() {
        if(!isRunning()) {
            return;
        }
        // XXX do sync? currently happens in close()
        isRunning = false;
    }
   
    public void destroy() {
        close();
    }
   
    /**
     * Constructor.
     *
     * Only used for testing; usually no-arg constructor is used, and
     * environment provided by injected BdbModule.
     *
     * @param bdbEnv The directory that holds the bdb environment. Will
     * make a database under here if doesn't already exit.  Otherwise
     * reopens any existing dbs.
     * @throws IOException
     */
    public BdbUriUniqFilter(File bdbEnv)
    throws IOException {
        this(bdbEnv, -1);
    }
   
    /**
     * Constructor.
     *
     * Only used for testing; usually no-arg constructor is used, and
     * environment provided by injected BdbModule.
     *
     * @param bdbEnv The directory that holds the bdb environment. Will
     * make a database under here if doesn't already exit.  Otherwise
     * reopens any existing dbs.
     * @param cacheSizePercentage Percentage of JVM bdb allocates as
     * its cache.  Pass -1 to get default cache size.
     * @throws IOException
     */
    public BdbUriUniqFilter(File bdbEnv, final int cacheSizePercentage)
    throws IOException {
        super();
        FileUtils.ensureWriteableDirectory(bdbEnv);
        EnvironmentConfig envConfig = new EnvironmentConfig();
        envConfig.setAllowCreate(true);
        if (cacheSizePercentage > 0 && cacheSizePercentage < 100) {
            envConfig.setCachePercent(cacheSizePercentage);
        }
        try {
            createdEnvironment = true;
            Environment env = new Environment(bdbEnv, envConfig);
            BdbModule.BdbConfig config = getDatabaseConfig();
            config.setAllowCreate(true);
            try {
                env.truncateDatabase(null, DB_NAME, false);
            } catch (DatabaseNotFoundException e) {
                // ignored
            }
            Database db = env.openDatabase(null, DB_NAME, config.toDatabaseConfig());
            initialize(db);
        } catch (DatabaseException e) {
            IOException io = new IOException();
            io.initCause(e);
            throw io;
        }
    }


    /**
     * Method shared by constructors.
     * @param env Environment to use.
     * @throws DatabaseException
     */
    protected void initialize(Database db) throws DatabaseException {
        open(db);
    }

    /**
     * @return DatabaseConfig to use
     */
    protected BdbModule.BdbConfig getDatabaseConfig() {
        BdbModule.BdbConfig dbConfig = new BdbModule.BdbConfig();
        return dbConfig;
    }
   
    /**
     * Call after deserializing an instance of this class.  Will open the
     * already seen in passed environment.
     * @param env DB Environment to use.
     * @throws DatabaseException
     */
    public void reopen(Database db)
    throws DatabaseException {
        open(db);
    }
   
    protected void open(final Database db)
    throws DatabaseException {
        this.alreadySeen = db;
        this.value = new DatabaseEntry("".getBytes());
    }
   
    public synchronized void close() {
        logger.fine("Count of alreadyseen on close " + count.get());
        Environment env = null;
        if (this.alreadySeen != null) {
            try {
                env = this.alreadySeen.getEnvironment();
                alreadySeen.sync();
            } catch (DatabaseException e) {
                logger.severe(e.getMessage());
            }
        }
        if (env != null) {
            try {
                // This sync flushes whats in RAM. Its expensive operation.
                // Without, data can be lost. Not for transactional operation.
                env.sync();
            } catch (DatabaseException e) {
                logger.severe(e.getMessage());
            }
        }
       
        if (createdEnvironment) {
            // Only manually close database if it were created via a
            // constructor, and not via a BdbModule. Databases created by a
            // BdbModule will be closed by that BdbModule.
            if (this.alreadySeen != null) {
                try {
                    alreadySeen.close();
                } catch (DatabaseException e) {
                    logger.severe(e.getMessage());
                }
            }
            if (env != null) {
                try {
                    env.close();
                } catch (DatabaseException e) {
                    logger.severe(e.getMessage());
                }
            }
        }
    }
   
    public synchronized long getCacheMisses() {
        if(alreadySeen==null) {
            return 0;
        }
        try {
            long cacheMiss = this.alreadySeen.getEnvironment().
            getStats(null).getNCacheMiss();
            // FIXME: get shouldn't define intervals (should be idempotent)
            this.lastCacheMissDiff = cacheMiss - this.lastCacheMiss;
            this.lastCacheMiss = cacheMiss;
            return this.lastCacheMiss;
        } catch (DatabaseException de) {
            return 0;
        }
       
    }
   
    public long getLastCacheMissDiff() {
        return this.lastCacheMissDiff;
    }
   
    /**
     * Create fingerprint.
     * Pubic access so test code can access createKey.
     * @param uri URI to fingerprint.
     * @return Fingerprint of passed <code>url</code>.
     */
    public static long createKey(CharSequence uri) {
        String url = uri.toString();
        long schemeAuthorityKeyPart = calcSchemeAuthorityKeyBytes(url);
        return schemeAuthorityKeyPart | (FPGenerator.std40.fp(url) >>> 24);
    }

    protected static long calcSchemeAuthorityKeyBytes(String url) {
        int index = url.indexOf(COLON_SLASH_SLASH);
        if (index > 0) {
            index = url.indexOf('/', index + COLON_SLASH_SLASH.length());
        }
        CharSequence schemeAuthority = (index == -1)? url: url.subSequence(0, index);
        return FPGenerator.std24.fp(schemeAuthority);
    }

    protected boolean setAdd(CharSequence uri) {
        DatabaseEntry key = new DatabaseEntry();
        LongBinding.longToEntry(createKey(uri), key);
        long started = 0;
       
        OperationStatus status = null;
        try {
            if (logger.isLoggable(Level.FINE)) {
                started = System.currentTimeMillis();
            }
            status = alreadySeen.putNoOverwrite(null, key, ZERO_LENGTH_ENTRY);
            if (logger.isLoggable(Level.FINE)) {
                aggregatedLookupTime +=
                    (System.currentTimeMillis() - started);
            }
        } catch (DatabaseException e) {
            logger.severe(e.getMessage());
        }
        if (status == OperationStatus.SUCCESS) {
            count.incrementAndGet();
            if (logger.isLoggable(Level.FINE)) {
                final int logAt = 10000;
                if (count.get() > 0 && ((count.get() % logAt) == 0)) {
                    logger.fine("Average lookup " +
                        (aggregatedLookupTime / logAt) + "ms.");
                    aggregatedLookupTime = 0;
                }
            }
        }
        if(status == OperationStatus.KEYEXIST) {
            return false; // not added
        } else {
            return true;
        }
    }

    protected long setCount() {
        return count.get();
    }

    protected boolean setRemove(CharSequence uri) {
        DatabaseEntry key = new DatabaseEntry();
        LongBinding.longToEntry(createKey(uri), key);
            OperationStatus status = null;
        try {
            status = alreadySeen.delete(null, key);
        } catch (DatabaseException e) {
            logger.severe(e.getMessage());
        }
        if (status == OperationStatus.SUCCESS) {
            count.decrementAndGet();
            return true; // removed
        } else {
            return false; // not present
        }
    }
   
    public long flush() {
        // We always write but this might be place to do the sync
        // when checkpointing?  TODO.
        return 0;
    }

    // Checkpointable
    // CrawlController's only interest is in knowing that a Checkpoint is
    // being recovered
    public void startCheckpoint(Checkpoint checkpointInProgress) {}
    public void doCheckpoint(Checkpoint checkpointInProgress) throws IOException {
        JSONObject json = new JSONObject();
        try {
            json.put("count", setCount());
            checkpointInProgress.saveJson(beanName, json);
        } catch (JSONException e) {
            // impossible
            throw new RuntimeException(e);
        }
    }
    public void finishCheckpoint(Checkpoint checkpointInProgress) {}
    protected Checkpoint recoveryCheckpoint;
    public void setRecoveryCheckpoint(Checkpoint recoveryCheckpoint) {
        this.recoveryCheckpoint = recoveryCheckpoint;
    }

    /**
     * Forget all entries that match the scheme+host+port of the given url, so
     * that they can be crawled again if discovered again. Expensive operation.
     *
     * <p>
     * Because of the way keys are calculated, scheme+host+port is the only
     * grouping of urls that is feasible to forget in bulk. See
     * {@link #createKey(CharSequence)}
     *
     * <p>
     * WARNING: Value collisions in this 24-bit schemeAuthority part are going
     * to be fairly common, by 'birthday problem' over 50% likely to show up
     * with as few as 2^12 unique schemeAuthority strings. So the forgetting may
     * forget other hosts.
     *
     * @param url
     *            whose scheme+host+port should be forgotten (remainder of url
     *            is ignored)
     */
    public void forgetAllSchemeAuthorityMatching(String url) {
        long schemeAuthorityKeyLong = calcSchemeAuthorityKeyBytes(url);
       
        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry value = new DatabaseEntry();
       
        LongBinding.longToEntry(schemeAuthorityKeyLong, key);
        byte[] schemeAuthorityKeyBytes = key.getData();
       
        Cursor cursor = alreadySeen.openCursor(null, null);
        long forgottenCount = 0l;
       
        for (OperationStatus status = cursor.getSearchKeyRange(key, value, null);
                status == OperationStatus.SUCCESS;
                status = cursor.getNext(key, value, null)) {

            byte[] keyData = key.getData();
            if (keyData[0] == schemeAuthorityKeyBytes[0]
                    && keyData[1] == schemeAuthorityKeyBytes[1]
                    && keyData[2] == schemeAuthorityKeyBytes[2]) {
                cursor.delete();
                forgottenCount++;
            } else {
                break;
            }
        }
       
        cursor.close();
        long newCount = count.addAndGet(-forgottenCount);
        logger.info("forgot " + forgottenCount + " urls from scheme+authority of url " + url + " (leaving " + newCount + " urls from other scheme+authorities)");
    }
   
} //EOC
TOP

Related Classes of org.archive.crawler.util.BdbUriUniqFilter

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.