Package edu.cmu.graphchi.apps.kcore

Source Code of edu.cmu.graphchi.apps.kcore.KCoreDecomposer

package edu.cmu.graphchi.apps.kcore;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.logging.Logger;

import edu.cmu.graphchi.ChiFilenames;
import edu.cmu.graphchi.ChiLogger;
import edu.cmu.graphchi.ChiVertex;
import edu.cmu.graphchi.GraphChiContext;
import edu.cmu.graphchi.GraphChiProgram;
import edu.cmu.graphchi.datablocks.IntConverter;
import edu.cmu.graphchi.engine.GraphChiEngine;
import edu.cmu.graphchi.engine.VertexInterval;
import edu.cmu.graphchi.io.CompressedIO;
import edu.cmu.graphchi.preprocessing.EdgeProcessor;
import edu.cmu.graphchi.preprocessing.FastSharder;
import edu.cmu.graphchi.preprocessing.VertexIdTranslate;
import edu.cmu.graphchi.preprocessing.VertexProcessor;
import edu.cmu.graphchi.util.IdInt;
import edu.cmu.graphchi.util.Toplist;

/**
* K-core decomposition algorithm
*
* Outputs: a file containing key-value pairs: vertexId, coreness
*
* How does it work?
* 1 - Each vertex initializes its value to its degree and communicates that value to its neighbors.
* 2 - For each vertex v, an upper bound on its coreness is computed from the values received from its neighbors.
* 3 - If the upper bound is lower than v's current value, v updates its value to the upper bound and broadcasts it again.
* 4 - Steps 2 and 3 are repeated until an iteration produces no value updates.
*
* For correct results, run your input graph through GraphTransformer first.
* Also, make sure to delete the preprocessed shard files created by GraphTransformer prior to running KCoreDecomposer.
*
* KCoreDecomposer is inspired by the algorithm presented in the following paper:
*     Distributed K-Core Decomposition
*     Alberto Montresor, Francesco De Pellegrini, Daniele Miorandi
*     http://ieeexplore.ieee.org/stamp/stamp.jsp?arnumber=6189336
*
* <i>Note</i>: You may change output and input directory path based on your needs.
*
* @author Wissam Khaouid, wissamk@uvic.ca, 2014
*/

public class KCoreDecomposer implements GraphChiProgram<Integer, Integer> {

    public static final int INFINITY = Integer.MAX_VALUE;

    protected int vertexValuesUpdated;
    protected static int nVertexes = 0;

    private static int nIterations = 0;
    protected static BufferedWriter bw;

    private static Logger logger = ChiLogger.getLogger("kCoreDecomposition");

    public static void startWriting(File file, boolean append) throws IOException {
        FileWriter fw = new FileWriter(file, append);
        bw = new BufferedWriter(fw);
    }

    public static void stopWriting() throws IOException {
        bw.close();
    }

    public void update(ChiVertex<Integer, Integer> vertex, GraphChiContext context) {

        int iteration = context.getIteration();
        int numOutEdges = vertex.numOutEdges();

        if (iteration == 0) {
            vertex.setValue(numOutEdges);
            broadcastValue(vertex, numOutEdges);
            nVertexes++;
            vertexValuesUpdated++;
        } else {
            int topDrawer = vertex.getValue() + 1,
                    topDrawerCount = 0,
                    localEstimate = 0;

            SortedMap<Integer, Integer> inEdgeValueCounts =
                    Collections.synchronizedSortedMap(
                            new TreeMap<Integer, Integer>(Collections.reverseOrder()));

            for(int i = 0; i <= vertex.numOutEdges(); i++) {
                inEdgeValueCounts.put(i, 0);
            }

            for(int i = 0; i < vertex.numInEdges(); i++) {
                int inEdgeValue = vertex.inEdge(i).getValue();
                if( inEdgeValue >= topDrawer ) {
                    topDrawerCount ++;
                } else {
                    try {
                        int currentValue = inEdgeValueCounts.get(inEdgeValue);
                        inEdgeValueCounts.put(inEdgeValue, currentValue + 1);
                    }
                    catch(Exception e) {
                        e.printStackTrace();
                        System.exit(0);
                    }

                }
            }

            inEdgeValueCounts.put(topDrawer, topDrawerCount);
            localEstimate = computeLeastValue(inEdgeValueCounts);

            if( localEstimate < vertex.getValue() ) {
                vertex.setValue(localEstimate);
                broadcastValue(vertex, localEstimate);
                vertexValuesUpdated ++;
            }
        }

        context.getScheduler().addTask(vertex.getId());

    }

    /**
     * Computes the greatest x among a list of values, such that at least x values are greater than x
     * For now, the array is instantiated and filled up elsewhere
     */
    public int computeLeastValue(SortedMap<Integer, Integer> map) {
        int cumulCount = 0;
        int key, count;
        for(Map.Entry<Integer, Integer> entry : map.entrySet()) {
            key = entry.getKey();
            count = entry.getValue();
            cumulCount += count;
            if(cumulCount >= key) {
                return key;
            }
        }
        return 1;
    }

    /**
     * Broadcasts a value to the neighbors by writing it to the out-edges
     */

    public void broadcastValue(ChiVertex<Integer, Integer> vertex, int value) {
        for(int i = 0; i < vertex.numOutEdges(); i++) {
            vertex.outEdge(i).setValue(value);
        }
    }

    /**
     * Invoked with the start of a new iteration
     */
    public void beginIteration(GraphChiContext ctx) {
        vertexValuesUpdated = 0;
    }

    /**
     * Invoked at the end of every iteration
     */
    public void endIteration(GraphChiContext ctx) {
        System.out.println(vertexValuesUpdated + " updates.");
        System.out.println("iteration " + ctx.getIteration() + " ends.");

        nIterations ++;
        if( vertexValuesUpdated == 0 ) {
            System.out.println("no updates in this round. No more rounds .. KCore-montresor terminates!");
            ctx.getScheduler().removeAllTasks();
        }
    }

    public void beginInterval(GraphChiContext ctx, VertexInterval interval) {}

    public void endInterval(GraphChiContext ctx, VertexInterval interval) {}

    public void beginSubInterval(GraphChiContext ctx, VertexInterval interval) {}

    public void endSubInterval(GraphChiContext ctx, VertexInterval interval) {}

    protected static FastSharder createSharder(String graphName, int numShards) throws IOException {
        return new FastSharder<Integer, Integer>(graphName, numShards, new VertexProcessor<Integer>() {
            public Integer receiveVertexValue(int vertexId, String token) {
                return 0;
            }
        }, new EdgeProcessor<Integer>() {
            public Integer receiveEdge(int from, int to, String token) {
                return 0;
            }
        }, new IntConverter(), new IntConverter());
    }

    public static void main(String[] args) throws IOException {

        /** Run from command line (Example)
         *  java -Xmx2048m -cp bin:gchi-libs/* -Dnum_threads=4 edu.cmu.graphchi.apps.kcore.KCoreDecomposition filename nbrOfShards filetype memoryBudget
         *
         * Assuming GraphChi jar files are saved in ./gchi-libs/
         */

        String inputDirectory = "./datasets/";
        String outputDirectory = "./output/";

        String fileName = args[0];
        int nShards = Integer.parseInt(args[1]);
        String fileType = args[2];
        int memBudget = (args.length >= 4 ? Integer.parseInt(args[3]) : null);

        CompressedIO.disableCompression();

        String inputFilePath = inputDirectory + fileName;
       
        /* Preprocessing graph : Making shards */

        FastSharder sharder = createSharder(inputFilePath, nShards);
        if (inputFilePath.equals("pipein")) {     // Allow piping graph in
            sharder.shard(System.in, fileType);
        } else {
            if (!new File(ChiFilenames.getFilenameIntervals(inputFilePath, nShards)).exists()) {
                sharder.shard(new FileInputStream(new File(inputFilePath)), fileType);
            } else {
                logger.info("Found shards -- no need to preprocess");
            }
        }

        /* Running GraphChi */
        GraphChiEngine<Integer, Integer> engine =
                new GraphChiEngine<Integer, Integer>(inputFilePath, nShards);
        engine.setMemoryBudgetMb(memBudget);
        engine.setSkipZeroDegreeVertices(true);
        engine.setEnableScheduler(true);
        engine.setEdataConverter(new IntConverter());
        engine.setVertexDataConverter(new IntConverter());

        engine.run(new KCoreDecomposer(), INFINITY);

        logger.info("Ready.");

        /* Outputting Core Values */
        startWriting(new File(outputDirectory + "out-cores-" + fileName), false);
        bw.write(nVertexes + "\n");

        VertexIdTranslate trans = engine.getVertexIdTranslate();
        TreeSet<IdInt> topToBottom = Toplist.topListInt(inputFilePath,
                engine.numVertices(), engine.numVertices());

        for(IdInt walker : topToBottom) {
            float coreValue = walker.getValue();
            bw.write(trans.backward(walker.getVertexId()) + ", " + String.valueOf((int)coreValue) + "\n");
        }

        stopWriting();

        System.out.println("Vertexes Processed: " + engine.numVertices());
        System.out.println("Edges Processed: " + engine.numEdges()) ;

        System.out.println("nIterations: " + nIterations);
        System.out.println("Success!");

    }

}
TOP

Related Classes of edu.cmu.graphchi.apps.kcore.KCoreDecomposer

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.