// Package org.apache.cassandra.hadoop
//
// Source code of org.apache.cassandra.hadoop.ColumnFamilyOutputFormatTest

package org.apache.cassandra.hadoop;

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import java.io.IOError;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import org.apache.cassandra.EmbeddedServer;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.thrift.KeySlice;
import org.apache.cassandra.thrift.KsDef;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
* A test case for the {@link ColumnFamilyOutputFormat}, which reads each
* <key, value> pair from a sequence file, maps them to a <key,
* column> pair, and then reduces it by aggregating the columns that
* correspond to the same key. Finally, the output <key, columns> pairs
* are written into the (Cassandra) column family associated with this
* {@link OutputFormat}.
*
* @author Karthick Sankarachary
*
*/

public class ColumnFamilyOutputFormatTest extends EmbeddedServer
{
    static final String KEYSPACE = "ColumnFamilyOutputFormatTestKeyspace";
    static final String COLUMN_FAMILY = "outputColumnFamily";

    private static final String INPUT_FOLDER = "columnfamily.outputtest";

    private static final String INPUT_FILE = "rows.txt";

    private static final int NUMBER_OF_ROWS = 3;
    private static final int NUMBER_OF_COLUMNS = 4;

    List<Integer> rowKeys;
    List<Integer> columnValues;

    private static FileSystem fs;

    static
    {
        try
        {
            fs = FileSystem.getLocal(new Configuration());
        }
        catch (IOException ioe)
        {
            throw new IOError(ioe);
        }
    }

    private Cassandra.Client thriftClient;

    @Before
    public void setup() throws Exception
    {
        setupCassandra();
        defineRows();
        createInputFile();
    }

    @After
    public void tearDown() throws Exception
    {
        deleteInputFile();
        deleteRows();
    }

    private void deleteRows() throws InvalidRequestException, TException
    {
        thriftClient.system_drop_column_family(COLUMN_FAMILY);
    }

    private void deleteInputFile() throws IOException
    {
        inputdir = new Path(INPUT_FOLDER);
        if (!fs.delete(inputdir, true))
            throw new IOException("Delete failed to remove " + inputdir.toString());
    }

    private void setupCassandra() throws TException, InvalidRequestException
    {
        /* Establish a thrift connection to the cassandra instance */
        TSocket socket = new TSocket(DatabaseDescriptor.getListenAddress().getHostName(), DatabaseDescriptor.getRpcPort());
        TTransport transport = new TFramedTransport(socket);
        TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport);
        Cassandra.Client cassandraClient = new Cassandra.Client(binaryProtocol);
        transport.open();
        thriftClient = cassandraClient;
        Set<String> keyspaces = thriftClient.describe_keyspaces();
        if (!keyspaces.contains(KEYSPACE))
        {
            List<CfDef> cfDefs = new ArrayList<CfDef>();
            thriftClient.system_add_keyspace(new KsDef(KEYSPACE, "org.apache.cassandra.locator.RackUnawareStrategy", 1, cfDefs));
        }
        thriftClient.set_keyspace(KEYSPACE);

        CfDef cfDef = new CfDef(KEYSPACE, COLUMN_FAMILY);
        try
        {
            thriftClient.system_add_column_family(cfDef);
        }
        catch (InvalidRequestException e)
        {
            throw new RuntimeException(e);
        }

    }

    private void defineRows()
    {
        rowKeys = new ArrayList<Integer>();
        columnValues = new ArrayList<Integer>();
        for (int key = 0; key < NUMBER_OF_ROWS; key++)
        {
            for (int columnValue = 0; columnValue < NUMBER_OF_COLUMNS; columnValue++)
            {
                rowKeys.add(key);
                columnValues.add(columnValue);
            }
        }
    }

    Path inputdir;

    private void createInputFile() throws IOException
    {
        inputdir = new Path(INPUT_FOLDER);
        if (!fs.mkdirs(inputdir))
        {
            throw new IOException("Mkdirs failed to create " + inputdir.toString());
        }

        Path inputRows = new Path(inputdir, INPUT_FILE);
        SequenceFile.Writer writer = SequenceFile.createWriter(fs, fs.getConf(), inputRows, IntWritable.class, IntWritable.class);
        // OutputStream os = fs.create(inputRows);
        // Writer writer = new OutputStreamWriter(os);
        for (int keyValuePair = 0; keyValuePair < NUMBER_OF_ROWS * NUMBER_OF_COLUMNS; keyValuePair++)
            writer.append(new IntWritable(rowKeys.get(keyValuePair)), new IntWritable(columnValues.get(keyValuePair)));
        writer.close();
    }

    @Test
    public void testCopyCF() throws Exception
    {
        ToolRunner.run(new Configuration(), new SampleColumnFamilyOutputTool(inputdir, COLUMN_FAMILY), new String[] {});
        verifyOutput();

    }

    public void verifyOutput() throws Exception
    {
        List<KeySlice> keySlices = thriftClient.get_range_slices(new ColumnParent().setColumn_family(COLUMN_FAMILY),
                                                                 new SlicePredicate().setColumn_names(new ArrayList<byte[]>()),
                                                                 new KeyRange().setStart_key("".getBytes()).setEnd_key("".getBytes()),
                                                                 ConsistencyLevel.ONE);
        for (KeySlice keySlice : keySlices)
        {
            Integer key = Integer.parseInt(new String(keySlice.getKey()));
            List<Integer> columnValues = new ArrayList<Integer>();
            for (ColumnOrSuperColumn cosc : keySlice.getColumns())
                columnValues.add(Integer.parseInt(new String(cosc.getColumn().getValue())));
            verifyKeyValues(key, columnValues);
        }
    }

    private void verifyKeyValues(Integer key, List<Integer> columnValues) throws Exception
    {
        List<Integer> outputColumnValues = new ArrayList<Integer>();
        for (int i = 0; i < rowKeys.size(); i++)
        {
            if (rowKeys.get(i).equals(key))
                outputColumnValues.add(this.columnValues.get(i));
        }
        columnValues.removeAll(outputColumnValues);
        assert columnValues.isEmpty() : "Some of the input columns could not be found in the column family";
    }
}
// TOP
//
// Related Classes of org.apache.cassandra.hadoop.ColumnFamilyOutputFormatTest
//
// TOP
// Copyright © 2018 www.massapi.com. All rights reserved.
// All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by ORACLE Inc. Contact coftware#gmail.com.