Package org.apache.hadoop.hdfs.qjournal.server

Source Code of org.apache.hadoop.hdfs.qjournal.server.TestJournal

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.qjournal.server;

import static org.junit.Assert.*;

import java.io.File;
import java.io.IOException;

import junit.framework.TestCase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
import org.apache.hadoop.hdfs.qjournal.server.Journal;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.Transition;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
import org.apache.hadoop.hdfs.server.namenode.NNStorageDirectoryRetentionManager;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

public class TestJournal {
  private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(12345, 0L,
      0);
  private static final NamespaceInfo FAKE_NSINFO_2 = new NamespaceInfo(6789,
      0L, 0);

  private static final String JID = "test-journal";
  private static final byte[] JIDB = QuorumJournalManager
      .journalIdStringToBytes(JID);

  private static final File TEST_LOG_DIR = new File(new File(
      MiniDFSCluster.getBaseDirectory(null), "edits"), "TestJournal");
  private static final File TEST_IMG_DIR = new File(new File(
      MiniDFSCluster.getBaseDirectory(null), "image"), "TestJournal");

  private StorageErrorReporter mockErrorReporter = Mockito
      .mock(StorageErrorReporter.class);
  private JournalNode mockJournalNode = Mockito.mock(JournalNode.class);
 
  {
    Configuration conf = new Configuration();
    Mockito.doReturn(conf).when(mockJournalNode).getConf();
  }
 
  private static final String STORAGE_FILE_LOCK = "in_use.lock";

  private Journal journal;

  @Before
  public void setup() throws Exception {
    FileUtil.fullyDelete(TEST_LOG_DIR);
    FileUtil.fullyDelete(TEST_IMG_DIR);
    journal = new Journal(TEST_LOG_DIR, TEST_IMG_DIR, JID, mockErrorReporter,
        mockJournalNode);
    journal.transitionJournal(FAKE_NSINFO, Transition.FORMAT, null);
    journal.transitionImage(FAKE_NSINFO, Transition.FORMAT, null);
  }

  @After
  public void verifyNoStorageErrors() throws Exception {
    Mockito.verify(mockErrorReporter, Mockito.never()).reportErrorOnFile(
        Mockito.<File> any());
  }

  @After
  public void cleanup() {
    IOUtils.closeStream(journal);
  }
 
  @Test
  public void testEpochHandling() throws Exception {
    assertEquals(0, journal.getLastPromisedEpoch());
    NewEpochResponseProto newEpoch = journal.newEpoch(FAKE_NSINFO, 1);
    assertFalse(newEpoch.hasLastSegmentTxId());
    assertEquals(1, journal.getLastPromisedEpoch());
    journal.newEpoch(FAKE_NSINFO, 3);
    assertFalse(newEpoch.hasLastSegmentTxId());
    assertEquals(3, journal.getLastPromisedEpoch());
    try {
      journal.newEpoch(FAKE_NSINFO, 3);
      fail("Should have failed to promise same epoch twice");
    } catch (IOException ioe) {
      GenericTestUtils.assertExceptionContains(
          "Proposed epoch 3 <= last promise 3", ioe);
    }
    try {
      journal.startLogSegment(makeRI(1), 12345L);
      fail("Should have rejected call from prior epoch");
    } catch (IOException ioe) {
      GenericTestUtils.assertExceptionContains(
          "epoch 1 is less than the last promised epoch 3", ioe);
    }
    try {
      journal.journal(makeRI(1), 12345L, 100L, 0, new byte[0]);
      fail("Should have rejected call from prior epoch");
    } catch (IOException ioe) {
      GenericTestUtils.assertExceptionContains(
          "epoch 1 is less than the last promised epoch 3", ioe);
    }
  }

  @Test
  public void testMaintainCommittedTxId() throws Exception {
    journal.newEpoch(FAKE_NSINFO, 1);
    journal.startLogSegment(makeRI(1), 1);
    // Send txids 1-3, with a request indicating only 0 committed
    journal.journal(new RequestInfo(JIDB, 1, 2, 0), 1, 1, 3,
        QJMTestUtil.createTxnData(1, 3));
    assertEquals(0, journal.getCommittedTxnId());

    // Send 4-6, with request indicating that through 3 is committed.
    journal.journal(new RequestInfo(JIDB, 1, 3, 3), 1, 4, 3,
        QJMTestUtil.createTxnData(4, 6));
    assertEquals(3, journal.getCommittedTxnId());
  }

  @Test
  public void testRestartJournal() throws Exception {
    journal.newEpoch(FAKE_NSINFO, 1);
    journal.startLogSegment(makeRI(1), 1);
    journal.journal(makeRI(2), 1, 1, 2, QJMTestUtil.createTxnData(1, 2));
    // Don't finalize.

    String storageString = journal.getJournalStorage().toColonSeparatedString();
    System.err.println("storage string: " + storageString);
    journal.close(); // close to unlock the storage dir

    // Now re-instantiate, make sure history is still there
    journal = new Journal(TEST_LOG_DIR, TEST_IMG_DIR, JID, mockErrorReporter,
        mockJournalNode);

    // The storage info should be read, even if no writer has taken over.
    assertEquals(storageString, journal.getJournalStorage().toColonSeparatedString());

    assertEquals(1, journal.getLastPromisedEpoch());
    NewEpochResponseProto newEpoch = journal.newEpoch(FAKE_NSINFO, 2);
    assertEquals(1, newEpoch.getLastSegmentTxId());
  }

  @Test
  public void testFormatResetsCachedValues() throws Exception {
    journal.newEpoch(FAKE_NSINFO, 12345L);
    journal.startLogSegment(new RequestInfo(JIDB, 12345L, 1L, 0L), 1L);

    assertEquals(12345L, journal.getLastPromisedEpoch());
    assertEquals(12345L, journal.getLastWriterEpoch());
    assertTrue(journal.isJournalFormatted());

    journal.transitionJournal(FAKE_NSINFO_2, Transition.FORMAT, null);

    assertEquals(0, journal.getLastPromisedEpoch());
    assertEquals(0, journal.getLastWriterEpoch());
    assertTrue(journal.isJournalFormatted());
  }

  /**
   * Test that, if the writer crashes at the very beginning of a segment, before
   * any transactions are written, that the next newEpoch() call returns the
   * prior segment txid as its most recent segment.
   */
  @Test
  public void testNewEpochAtBeginningOfSegment() throws Exception {
    journal.newEpoch(FAKE_NSINFO, 1);
    journal.startLogSegment(makeRI(1), 1);
    journal.journal(makeRI(2), 1, 1, 2, QJMTestUtil.createTxnData(1, 2));
    journal.finalizeLogSegment(makeRI(3), 1, 2);
    journal.startLogSegment(makeRI(4), 3);
    NewEpochResponseProto resp = journal.newEpoch(FAKE_NSINFO, 2);
    assertEquals(1, resp.getLastSegmentTxId());
  }

  @Test
  public void testJournalLocking() throws Exception {
    Assume.assumeTrue(journal.getJournalStorage().isLockSupported(0));
    StorageDirectory sd = journal.getJournalStorage().getStorageDir(0);
    File lockFile = new File(sd.getRoot(), STORAGE_FILE_LOCK);

    // Journal should be locked, since the format() call locks it.
    GenericTestUtils.assertExists(lockFile);

    journal.newEpoch(FAKE_NSINFO, 1);
    try {
      new Journal(TEST_LOG_DIR, TEST_IMG_DIR, JID, mockErrorReporter, mockJournalNode);
      fail("Did not fail to create another journal in same dir");
    } catch (IOException ioe) {
      GenericTestUtils.assertExceptionContains("Cannot lock storage", ioe);
    }

    journal.close();

    // Journal should no longer be locked after the close() call.
    // Hence, should be able to create a new Journal in the same dir.
    Journal journal2 = new Journal(TEST_LOG_DIR, TEST_IMG_DIR, JID,
        mockErrorReporter, mockJournalNode);
    journal2.newEpoch(FAKE_NSINFO, 2);
    journal2.close();
  }

  /**
   * Test finalizing a segment after some batch of edits were missed. This
   * should fail, since we validate the log before finalization.
   */
  @Test
  public void testFinalizeWhenEditsAreMissed() throws Exception {
    journal.newEpoch(FAKE_NSINFO, 1);
    journal.startLogSegment(makeRI(1), 1);
    journal.journal(makeRI(2), 1, 1, 3, QJMTestUtil.createTxnData(1, 3));

    // Try to finalize up to txn 6, even though we only wrote up to txn 3.
    try {
      journal.finalizeLogSegment(makeRI(3), 1, 6);
      fail("did not fail to finalize");
    } catch (JournalOutOfSyncException e) {
      GenericTestUtils.assertExceptionContains("but only written up to txid 3",
          e);
    }

    // Check that, even if we re-construct the journal by scanning the
    // disk, we don't allow finalizing incorrectly.
    journal.close();
    journal = new Journal(TEST_LOG_DIR, TEST_IMG_DIR, JID, mockErrorReporter,
        mockJournalNode);

    try {
      journal.finalizeLogSegment(makeRI(4), 1, 6);
      fail("did not fail to finalize");
    } catch (JournalOutOfSyncException e) {
      GenericTestUtils.assertExceptionContains(
          "disk only contains up to txid 3", e);
    }
  }

  /**
   * Ensure that finalizing a segment which doesn't exist throws the appropriate
   * exception.
   */
  @Test
  public void testFinalizeMissingSegment() throws Exception {
    journal.newEpoch(FAKE_NSINFO, 1);
    try {
      journal.finalizeLogSegment(makeRI(1), 1000, 1001);
      fail("did not fail to finalize");
    } catch (JournalOutOfSyncException e) {
      GenericTestUtils.assertExceptionContains(
          "No log file to finalize at transaction ID 1000", e);
    }
  }

  /**
   * Assume that a client is writing to a journal, but loses its connection in
   * the middle of a segment. Thus, any future journal() calls in that segment
   * may fail, because some txns were missed while the connection was down.
   *
   * Eventually, the connection comes back, and the NN tries to start a new
   * segment at a higher txid. This should abort the old one and succeed.
   */
  @Test
  public void testAbortOldSegmentIfFinalizeIsMissed() throws Exception {
    journal.newEpoch(FAKE_NSINFO, 1);

    // Start a segment at txid 1, and write a batch of 3 txns.
    journal.startLogSegment(makeRI(1), 1);
    journal.journal(makeRI(2), 1, 1, 3, QJMTestUtil.createTxnData(1, 3));

    GenericTestUtils.assertExists(journal.getJournalStorage().getInProgressEditLog(1));

    // Try to start new segment at txid 6, this should abort old segment and
    // then succeed, allowing us to write txid 6-9.
    journal.startLogSegment(makeRI(3), 6);
    journal.journal(makeRI(4), 6, 6, 3, QJMTestUtil.createTxnData(6, 3));

    // The old segment should *not* be finalized.
    GenericTestUtils.assertExists(journal.getJournalStorage().getInProgressEditLog(1));
    GenericTestUtils.assertExists(journal.getJournalStorage().getInProgressEditLog(6));
  }

  /**
   * Test behavior of startLogSegment() when a segment with the same transaction
   * ID already exists.
   */
  @Test
  public void testStartLogSegmentWhenAlreadyExists() throws Exception {
    journal.newEpoch(FAKE_NSINFO, 1);

    // Start a segment at txid 1, and write just 1 transaction. This
    // would normally be the START_LOG_SEGMENT transaction.
    journal.startLogSegment(makeRI(1), 1);
    journal.journal(makeRI(2), 1, 1, 1, QJMTestUtil.createTxnData(1, 1));

    // Try to start new segment at txid 1, this should succeed, because
    // we are allowed to re-start a segment if we only ever had the
    // START_LOG_SEGMENT transaction logged.
    journal.startLogSegment(makeRI(3), 1);
    journal.journal(makeRI(4), 1, 1, 1, QJMTestUtil.createTxnData(1, 1));

    // This time through, write more transactions afterwards, simulating
    // real user transactions.
    journal.journal(makeRI(5), 1, 2, 3, QJMTestUtil.createTxnData(2, 3));

    try {
      journal.startLogSegment(makeRI(6), 1);
      fail("Did not fail to start log segment which would overwrite "
          + "an existing one");
    } catch (IllegalStateException ise) {
      GenericTestUtils.assertExceptionContains(
          "seems to contain valid transactions", ise);
    }

    journal.finalizeLogSegment(makeRI(7), 1, 4);

    // Ensure that we cannot overwrite a finalized segment
    try {
      journal.startLogSegment(makeRI(8), 1);
      fail("Did not fail to start log segment which would overwrite "
          + "an existing one");
    } catch (IllegalStateException ise) {
      GenericTestUtils.assertExceptionContains("have a finalized segment", ise);
    }

  }
 
  @Test
  public void testPurgeAllLogs() throws Exception {
    journal.newEpoch(FAKE_NSINFO, 1);
    journal.startLogSegment(makeRI(1), 1);
    journal.journal(makeRI(2), 1, 1, 2, QJMTestUtil.createTxnData(1, 2));
    journal.finalizeLogSegment(makeRI(3), 1, 2);
    TestCase.assertEquals(1, journal.getEditLogManifest(Long.MIN_VALUE).getLogs().size());
    journal.purgeLogsOlderThan(makeRI(4), Long.MAX_VALUE);
    TestCase.assertEquals(0, journal.getEditLogManifest(Long.MIN_VALUE).getLogs().size());
    TestCase.assertEquals(0, journal.getCommittedTxnId());
  }

  private static RequestInfo makeRI(int serial) {
    return new RequestInfo(JIDB, 1, serial, 0);
  }

  @Test
  public void testNamespaceVerification() throws Exception {
    journal.newEpoch(FAKE_NSINFO, 1);

    try {
      journal.newEpoch(FAKE_NSINFO_2, 2);
      fail("Did not fail newEpoch() when namespaces mismatched");
    } catch (IOException ioe) {
      GenericTestUtils.assertExceptionContains("Incompatible namespaceID", ioe);
    }
  }

}
TOP

Related Classes of org.apache.hadoop.hdfs.qjournal.server.TestJournal

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by ORACLE Inc. Contact coftware#gmail.com.