Package org.apache.accumulo.server.logger

Source Code of org.apache.accumulo.server.logger.TestLogWriter

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.server.logger;

import static org.apache.accumulo.server.logger.LogEvents.COMPACTION_FINISH;
import static org.apache.accumulo.server.logger.LogEvents.COMPACTION_START;
import static org.apache.accumulo.server.logger.LogEvents.DEFINE_TABLET;
import static org.apache.accumulo.server.logger.LogEvents.MANY_MUTATIONS;
import static org.apache.accumulo.server.logger.LogEvents.MUTATION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

import org.apache.accumulo.core.client.impl.HdfsZooInstance;
import org.apache.accumulo.core.conf.ZooConfiguration;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.data.thrift.TMutation;
import org.apache.accumulo.core.security.thrift.AuthInfo;
import org.apache.accumulo.core.tabletserver.thrift.LogFile;
import org.apache.accumulo.core.tabletserver.thrift.NoSuchLogIDException;
import org.apache.accumulo.core.tabletserver.thrift.TabletMutations;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.server.logger.LogEvents;
import org.apache.accumulo.server.logger.LogFileKey;
import org.apache.accumulo.server.logger.LogFileValue;
import org.apache.accumulo.server.logger.LogWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class TestLogWriter {
  private static final AuthInfo CREDENTIALS = new AuthInfo();
 
  static final String INSTANCE_ID = "SomeInstance123";
  static FileSystem fs = null;
  static Configuration conf = null;
 
  LogWriter writer;
 
  @Before
  public void setUp() throws Exception {
    // supress log messages having to do with not having an instance
    Logger.getLogger(ZooConfiguration.class).setLevel(Level.OFF);
    Logger.getLogger(HdfsZooInstance.class).setLevel(Level.OFF);
    conf = CachedConfiguration.getInstance();
    if (fs == null) {
      fs = FileSystem.getLocal(conf);
    }
    writer = new LogWriter(conf, fs, "target", INSTANCE_ID);
    Logger.getLogger(LogWriter.class).setLevel(Level.FATAL);
  }
 
  @After
  public void tearDown() throws Exception {
    writer.shutdown();
    writer = null;
  }
 
  @Test
  public void testClose() throws Exception {
    LogFile logFile = writer.create(null, CREDENTIALS, "");
    writer.close(null, logFile.id);
    try {
      writer.close(null, logFile.id);
      fail("close didn't throw exception");
    } catch (NoSuchLogIDException ex) {
      // ignored
    }
    cleanup(logFile);
  }
 
  private void cleanup(LogFile logFile) throws Exception {
    new File("./" + logFile.name).delete();
  }
 
  @Test
  public void testCopy() throws Exception {
    LogFile logFile = writer.create(null, CREDENTIALS, "");
    Path mylog = new Path("mylog");
    fs.delete(mylog, true);
    assertTrue(!fs.exists(mylog));
    writer.startCopy(null, CREDENTIALS, logFile.name, "mylog", false);
    UtilWaitThread.sleep(500);
    assertTrue(fs.exists(mylog));
    Mutation m = new Mutation(new Text("row1"));
    m.put(new Text("cf"), new Text("cq"), new Value("value".getBytes()));
    try {
      writer.log(null, logFile.id, 1, 0, m.toThrift());
      fail("writing to a log after it has been copied to hdfs should fail");
    } catch (NoSuchLogIDException ex) {
      // ignored
    }
    fs.delete(mylog, true);
    cleanup(logFile);
  }
 
  private SequenceFile.Reader readOpen(LogFile logFile) throws Exception {
    String path = "./target/" + logFile.name;
    assertTrue(fs.exists(new Path(path)));
    SequenceFile.Reader result = new SequenceFile.Reader(fs, new Path(path), conf);
    LogFileKey key = new LogFileKey();
    LogFileValue value = new LogFileValue();
    assertTrue(result.next(key, value));
    assertTrue(key.event == LogEvents.OPEN);
    assertTrue(key.tid == LogFileKey.VERSION);
    return result;
  }
 
  @Test
  public void testCreate() throws Exception {
    LogFile logFile = writer.create(null, CREDENTIALS, "");
    writer.close(null, logFile.id);
    readOpen(logFile);
    cleanup(logFile);
  }
 
  @Test
  public void testLog() throws Exception {
    LogFile logFile = writer.create(null, CREDENTIALS, "");
    Mutation m = new Mutation(new Text("somerow"));
    m.put(new Text("cf1"), new Text("cq1"), new Value("value1".getBytes()));
    writer.log(null, logFile.id, 2, 42, m.toThrift());
    writer.close(null, logFile.id);
    SequenceFile.Reader dis = readOpen(logFile);
    LogFileKey key = new LogFileKey();
    LogFileValue value = new LogFileValue();
    assertTrue(dis.next(key, value));
    assertTrue(key.event == MUTATION);
    assertEquals(key.seq, 2);
    assertEquals(key.tid, 42);
    assertEquals(value.mutations.length, 1);
    assertTrue(m.equals(value.mutations[0]));
    cleanup(logFile);
  }
 
  @Test
  public void testLogManyTablets() throws Exception {
    LogFile logFile = writer.create(null, CREDENTIALS, "");
    List<TMutation> all = new ArrayList<TMutation>();
    for (int i = 0; i < 10; i++) {
      Mutation m = new Mutation(new Text("somerow"));
      m.put(new Text("cf" + i), new Text("cq" + i), new Value(("value" + i).getBytes()));
      all.add(m.toThrift());
    }
    List<TabletMutations> updates = new ArrayList<TabletMutations>();
    updates.add(new TabletMutations(13, 3, all));
    writer.logManyTablets(null, logFile.id, updates);
    writer.close(null, logFile.id);
    SequenceFile.Reader dis = readOpen(logFile);
    LogFileKey key = new LogFileKey();
    LogFileValue value = new LogFileValue();
    assertTrue(dis.next(key, value));
    assertTrue(key.event == MANY_MUTATIONS);
    assertEquals(key.seq, 3);
    assertEquals(key.tid, 13);
    assertEquals(value.mutations.length, 10);
    for (int i = 0; i < 10; i++) {
      assertTrue(new Mutation(all.get(i)).equals(value.mutations[i]));
    }
    cleanup(logFile);
  }
 
  @Test
  public void testMinorCompactionFinished() throws Exception {
    LogFile logFile = writer.create(null, CREDENTIALS, "");
    String fqfn = "/foo/bar";
    writer.minorCompactionFinished(null, logFile.id, 4, 17, fqfn);
    writer.close(null, logFile.id);
    SequenceFile.Reader dis = readOpen(logFile);
    LogFileKey key = new LogFileKey();
    LogFileValue value = new LogFileValue();
    assertTrue(dis.next(key, value));
    assertTrue(key.event == COMPACTION_FINISH);
    assertEquals(key.seq, 4);
    assertEquals(key.tid, 17);
    cleanup(logFile);
  }
 
  @Test
  public void testMinorCompactionStarted() throws Exception {
    LogFile logFile = writer.create(null, CREDENTIALS, "");
    String fqfn = "/foo/bar";
    writer.minorCompactionStarted(null, logFile.id, 5, 23, fqfn);
    writer.close(null, logFile.id);
    SequenceFile.Reader dis = readOpen(logFile);
    LogFileKey key = new LogFileKey();
    LogFileValue value = new LogFileValue();
    assertTrue(dis.next(key, value));
    assertTrue(key.event == COMPACTION_START);
    assertEquals(key.seq, 5);
    assertEquals(key.tid, 23);
    assertEquals(key.filename, "/foo/bar");
    cleanup(logFile);
  }
 
  @Test
  public void testDefineTablet() throws Exception {
    LogFile logFile = writer.create(null, CREDENTIALS, "");
    KeyExtent ke = new KeyExtent(new Text("table1"), new Text("zzzz"), new Text("aaaaa"));
    writer.defineTablet(null, logFile.id, 6, 31, ke.toThrift());
    writer.close(null, logFile.id);
    SequenceFile.Reader dis = readOpen(logFile);
    LogFileKey key = new LogFileKey();
    LogFileValue value = new LogFileValue();
    assertTrue(dis.next(key, value));
    assertTrue(key.event == DEFINE_TABLET);
    assertEquals(key.seq, 6);
    assertEquals(key.tid, 31);
    assertEquals(ke, key.tablet);
  }
 
  @Test
  public void testNothing() {}
}
TOP

Related Classes of org.apache.accumulo.server.logger.TestLogWriter

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact coftware#gmail.com.