Package org.apache.tez.dag.history.recovery

Source Code of org.apache.tez.dag.history.recovery.RecoveryService
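
RecoveryService persists DAG history events under the application's recovery
directory so that a restarted ApplicationMaster attempt can rebuild DAG state.
A minimal usage sketch follows (not part of the original file); the appContext
and event variables are placeholders for objects supplied by the Tez DAG
AppMaster at runtime:

    // Sketch only: appContext implements AppContext, event is a DAGHistoryEvent.
    RecoveryService recoveryService = new RecoveryService(appContext);
    recoveryService.init(new Configuration()); // reads buffer size and flush settings
    recoveryService.start();                   // spawns the event-handling thread
    recoveryService.handle(event);             // summary events are written synchronously;
                                               // all others are queued for the handler thread
    recoveryService.stop();                    // flushes and closes all open streams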

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.dag.history.recovery;

import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.AbstractService;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.SummaryEvent;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.records.TezDAGID;

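/**
 * Persists {@link DAGHistoryEvent}s to recovery files in the application's
 * recovery directory so that a subsequent ApplicationMaster attempt can
 * reconstruct DAG state. Events implementing {@link SummaryEvent} are written
 * synchronously to a shared summary stream; all other events are queued and
 * written asynchronously to per-DAG recovery files by a dedicated thread.
 */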
public class RecoveryService extends AbstractService {

  private static final Log LOG = LogFactory.getLog(RecoveryService.class);
  private final AppContext appContext;

  public static final String RECOVERY_FATAL_OCCURRED_DIR =
      "RecoveryFatalErrorOccurred";

  private LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
      new LinkedBlockingQueue<DAGHistoryEvent>();
  private Set<TezDAGID> completedDAGs = new HashSet<TezDAGID>();
  private Set<TezDAGID> skippedDAGs = new HashSet<TezDAGID>();

  private Thread eventHandlingThread;
  private AtomicBoolean stopped = new AtomicBoolean(false);
  private AtomicBoolean started = new AtomicBoolean(false);
  private int eventCounter = 0;
  private int eventsProcessed = 0;
  private final Object lock = new Object();
  private FileSystem recoveryDirFS; // FS where staging dir exists
  Path recoveryPath;
  Map<TezDAGID, FSDataOutputStream> outputStreamMap = new
      HashMap<TezDAGID, FSDataOutputStream>();
  private int bufferSize;
  private FSDataOutputStream summaryStream;
  private int unflushedEventsCount = 0;
  private long lastFlushTime = -1;
  private int maxUnflushedEvents;
  private int flushInterval;
  private AtomicBoolean recoveryFatalErrorOccurred = new AtomicBoolean(false);

  public RecoveryService(AppContext appContext) {
    super(RecoveryService.class.getName());
    this.appContext = appContext;
  }

  @Override
  public void serviceInit(Configuration conf) throws Exception {
    LOG.info("Initializing RecoveryService");
    recoveryPath = appContext.getCurrentRecoveryDir();
    recoveryDirFS = FileSystem.get(recoveryPath.toUri(), conf);
    bufferSize = conf.getInt(TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE,
        TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE_DEFAULT);

    flushInterval = conf.getInt(TezConfiguration.DAG_RECOVERY_FLUSH_INTERVAL_SECS,
        TezConfiguration.DAG_RECOVERY_FLUSH_INTERVAL_SECS_DEFAULT);
    maxUnflushedEvents = conf.getInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS,
        TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS_DEFAULT);
  }

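  /**
   * Starts the handler thread that drains the event queue. The thread exits
   * when the service is stopped, when it is interrupted, or once a fatal
   * recovery error has been flagged.
   */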
  @Override
  public void serviceStart() {
    LOG.info("Starting RecoveryService");
    lastFlushTime = appContext.getClock().getTime();
    eventHandlingThread = new Thread(new Runnable() {
      @Override
      public void run() {
        DAGHistoryEvent event;
        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {

          if (recoveryFatalErrorOccurred.get()) {
            LOG.error("Recovery failure occurred. Stopping recovery thread."
                + " Current eventQueueSize=" + eventQueue.size());
            eventQueue.clear();
            return;
          }

          // Log the size of the event-queue every so often.
          if (eventCounter != 0 && eventCounter % 1000 == 0) {
            LOG.info("Event queue stats"
                + ", eventsProcessedSinceLastUpdate=" + eventsProcessed
                + ", eventQueueSize=" + eventQueue.size());
            eventCounter = 0;
            eventsProcessed = 0;
          } else {
            ++eventCounter;
          }

          try {
            event = eventQueue.take();
          } catch (InterruptedException e) {
            LOG.info("EventQueue take interrupted. Returning");
            return;
          }

          synchronized (lock) {
            try {
              ++eventsProcessed;
              handleRecoveryEvent(event);
            } catch (Exception e) {
              // For now, ignore any such errors as these are non-critical.
              // All summary-event-related errors are handled as critical.
              LOG.warn("Error handling recovery event", e);
            }
          }
        }
      }
    }, "RecoveryEventHandlingThread");
    eventHandlingThread.start();
    started.set(true);
  }

  @Override
  public void serviceStop() {
    LOG.info("Stopping RecoveryService");
    stopped.set(true);
    if (eventHandlingThread != null) {
      eventHandlingThread.interrupt();
    }

    if (summaryStream != null) {
      try {
        LOG.info("Closing Summary Stream");
        summaryStream.hflush();
        summaryStream.close();
      } catch (IOException ioe) {
        LOG.warn("Error when closing summary stream", ioe);
      }
    }
    for (Entry<TezDAGID, FSDataOutputStream> entry : outputStreamMap.entrySet()) {
      try {
        LOG.info("Closing Output Stream for DAG " + entry.getKey());
        entry.getValue().hflush();
        entry.getValue().close();
      } catch (IOException ioe) {
        LOG.warn("Error when closing output stream", ioe);
      }
    }
  }

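  /**
   * Entry point for persisting a history event. Pre-warm DAG events and
   * events for skipped DAGs are dropped. Summary events are written to the
   * summary stream synchronously under the lock; if that write fails, a
   * fatal-error flag directory is created so that the next AM attempt does
   * not start up against partial recovery data. All other events are queued
   * for the handler thread.
   */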
  public void handle(DAGHistoryEvent event) throws IOException {
    if (stopped.get()) {
      LOG.warn("Igoring event as service stopped, eventType"
          + event.getHistoryEvent().getEventType());
      return;
    }
    HistoryEventType eventType = event.getHistoryEvent().getEventType();

    if (recoveryFatalErrorOccurred.get()) {
      return;
    }

    if (!started.get()) {
      LOG.warn("Adding event of type " + eventType
          + " to queue as service not started");
      eventQueue.add(event);
      return;
    }

    TezDAGID dagId = event.getDagID();
    if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
      DAGSubmittedEvent dagSubmittedEvent =
          (DAGSubmittedEvent) event.getHistoryEvent();
      String dagName = dagSubmittedEvent.getDAGName();
      if (dagName != null
          && dagName.startsWith(
              TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) {
        // Skip recording pre-warm DAG events
        skippedDAGs.add(dagId);
        return;
      }
    }
    if (dagId == null || skippedDAGs.contains(dagId)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Skipping event for DAG"
            + ", eventType=" + eventType
            + ", dagId=" + (dagId == null ? "null" : dagId.toString())
            + ", isSkippedDAG=" + (dagId == null ? "null"
            : skippedDAGs.contains(dagId)));
      }
      return;
    }

    if (event.getHistoryEvent() instanceof SummaryEvent) {
      synchronized (lock) {
        try {
          SummaryEvent summaryEvent = (SummaryEvent) event.getHistoryEvent();
          handleSummaryEvent(dagId, eventType, summaryEvent);
          summaryStream.hflush();
          if (summaryEvent.writeToRecoveryImmediately()) {
            handleRecoveryEvent(event);
            doFlush(outputStreamMap.get(event.getDagID()),
                appContext.getClock().getTime());
          } else {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Queueing Non-immediate Summary/Recovery event of type"
                  + eventType.name());
            }
            eventQueue.add(event);
          }
          if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
            LOG.info("DAG completed"
                + ", dagId=" + event.getDagID()
                + ", queueSize=" + eventQueue.size());
            completedDAGs.add(dagId);
            if (outputStreamMap.containsKey(dagId)) {
              try {
                outputStreamMap.get(dagId).close();
                outputStreamMap.remove(dagId);
              } catch (IOException ioe) {
                LOG.warn("Error when trying to flush/close recovery file for"
                    + " dag, dagId=" + event.getDagID());
              }
            }
          }
        } catch (IOException ioe) {
          LOG.error("Error handling summary event"
              + ", eventType=" + event.getHistoryEvent().getEventType(), ioe);
          Path fatalErrorDir = new Path(recoveryPath, RECOVERY_FATAL_OCCURRED_DIR);
          try {
            LOG.error("Adding a flag to ensure next AM attempt does not start up"
                + ", flagFile=" + fatalErrorDir.toString());
            recoveryFatalErrorOccurred.set(true);
            recoveryDirFS.mkdirs(fatalErrorDir);
            if (recoveryDirFS.exists(fatalErrorDir)) {
              LOG.error("Recovery failure occurred. Skipping all events");
            } else {
              // throw error if fatal error flag could not be set
              throw ioe;
            }
          } catch (IOException e) {
            LOG.fatal("Failed to create fatal error flag dir "
                + fatalErrorDir.toString(), e);
            throw ioe;
          }
          if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
            // Throw error to tell client that dag submission failed
            throw ioe;
          }
        }
      }
    } else {
      // All other events just get queued
      if (LOG.isDebugEnabled()) {
        LOG.debug("Queueing Non-Summary Recovery event of type " + eventType.name());
      }
      eventQueue.add(event);
    }
  }

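  /**
   * Writes a summary event to the shared summary file, creating the file on
   * first use or appending to it if it already exists.
   */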
  private void handleSummaryEvent(TezDAGID dagID,
      HistoryEventType eventType,
      SummaryEvent summaryEvent) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Handling summary event"
          + ", dagID=" + dagID
          + ", eventType=" + eventType);
    }
    if (summaryStream == null) {
      Path summaryPath = TezCommonUtils.getSummaryRecoveryPath(recoveryPath);
      if (LOG.isDebugEnabled()) {
        LOG.debug("AppId :" + appContext.getApplicationID() + " summaryPath " + summaryPath);
      }
      if (!recoveryDirFS.exists(summaryPath)) {
        summaryStream = recoveryDirFS.create(summaryPath, false,
            bufferSize);
      } else {
        summaryStream = recoveryDirFS.append(summaryPath, bufferSize);
      }
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Writing recovery event to summary stream"
          + ", dagId=" + dagID
          + ", eventType=" + eventType);
    }
    summaryEvent.toSummaryProtoStream(summaryStream);
  }

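  /**
   * Writes an event to the per-DAG recovery file, creating or appending to
   * the file on first use. Events for already-completed DAGs are skipped,
   * and for most event types the stream is then flushed opportunistically
   * via maybeFlush.
   */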
  private void handleRecoveryEvent(DAGHistoryEvent event) throws IOException {
    HistoryEventType eventType = event.getHistoryEvent().getEventType();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Handling recovery event of type "
          + event.getHistoryEvent().getEventType());
    }
    TezDAGID dagID = event.getDagID();

    if (completedDAGs.contains(dagID)) {
      // no need to recover completed DAGs
      if (LOG.isDebugEnabled()) {
        LOG.debug("Skipping Recovery Event as DAG completed"
            + ", dagId=" + dagID
            + ", completed=" + completedDAGs.contains(dagID)
            + ", skipped=" + skippedDAGs.contains(dagID)
            + ", eventType=" + eventType);
      }
      return;
    }

    if (!outputStreamMap.containsKey(dagID)) {
      Path dagFilePath = TezCommonUtils.getDAGRecoveryPath(recoveryPath, dagID.toString());
      FSDataOutputStream outputStream;
      if (recoveryDirFS.exists(dagFilePath)) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Opening DAG recovery file in append mode"
              + ", filePath=" + dagFilePath);
        }
        outputStream = recoveryDirFS.append(dagFilePath, bufferSize);
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Opening DAG recovery file in create mode"
              + ", filePath=" + dagFilePath);
        }
        outputStream = recoveryDirFS.create(dagFilePath, false, bufferSize);
      }
      outputStreamMap.put(dagID, outputStream);
    }

    FSDataOutputStream outputStream = outputStreamMap.get(dagID);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Writing recovery event to output stream"
          + ", dagId=" + dagID
          + ", eventType=" + eventType);
    }
    ++unflushedEventsCount;
    outputStream.writeInt(event.getHistoryEvent().getEventType().ordinal());
    event.getHistoryEvent().toProtoStream(outputStream);
    if (!EnumSet.of(HistoryEventType.DAG_SUBMITTED,
        HistoryEventType.DAG_FINISHED).contains(eventType)) {
      maybeFlush(outputStream);
    }
  }

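  /**
   * Flushes the stream if the number of unflushed events or the time since
   * the last flush has crossed its configured threshold. A negative
   * threshold disables the corresponding check.
   */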
  private void maybeFlush(FSDataOutputStream outputStream) throws IOException {
    long currentTime = appContext.getClock().getTime();
    boolean doFlush = false;
    if (maxUnflushedEvents >= 0
        && unflushedEventsCount >= maxUnflushedEvents) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Max unflushed events count reached. Flushing recovery data"
            + ", unflushedEventsCount=" + unflushedEventsCount
            + ", maxUnflushedEvents=" + maxUnflushedEvents);
      }
      doFlush = true;
    } else if (flushInterval >= 0
        && ((currentTime - lastFlushTime) >= (flushInterval * 1000))) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Flush interval time period elapsed. Flushing recovery data"
            + ", lastFlushTime=" + lastFlushTime
            + ", timeSinceLastFlush=" + (currentTime - lastFlushTime));
      }
      doFlush = true;
    }
    if (!doFlush) {
      return;
    }
    doFlush(outputStream, currentTime);
  }

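  /**
   * Flushes the stream via hflush and resets the unflushed-event counter
   * and the last-flush timestamp.
   */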
  private void doFlush(FSDataOutputStream outputStream,
      long currentTime) throws IOException {
    outputStream.hflush();

    if (LOG.isDebugEnabled()) {
      LOG.debug("Flushing output stream"
          + ", lastTimeSinceFLush=" + lastFlushTime
          + ", timeSinceLastFlush=" + (currentTime - lastFlushTime)
          + ", unflushedEventsCount=" + unflushedEventsCount
          + ", maxUnflushedEvents=" + maxUnflushedEvents);
    }

    unflushedEventsCount = 0;
    lastFlushTime = currentTime;
  }

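  /**
   * @return true if a fatal error occurred while persisting recovery data.
   */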
  public boolean hasRecoveryFailed() {
    return recoveryFatalErrorOccurred.get();
  }

}