Package org.apache.flume

Examples of org.apache.flume.EventDeliveryException


      transaction.rollback();
      LOG.error("process failed", th);
      if (th instanceof Error) {
        throw (Error) th;
      } else {
        throw new EventDeliveryException(th);
      }
    } finally {
      transaction.close();
    }
  }
View Full Code Here


     * the next even if errbacks for the current txn get called while
     * the next one is being processed.
     *
     */
    if(!open){
      throw new EventDeliveryException("Sink was never opened. " +
          "Please fix the configuration.");
    }
    AtomicBoolean txnFail = new AtomicBoolean(false);
    Status status = Status.READY;
    Channel channel = getChannel();
    txn = channel.getTransaction();
    txn.begin();
    List<PutRequest> actions = new LinkedList<PutRequest>();
    List<AtomicIncrementRequest> increments =
        new LinkedList<AtomicIncrementRequest>();
    for(int i = 0; i < batchSize; i++){
      Event event = channel.take();
      if(event == null){
        status = Status.BACKOFF;
        counterGroup.incrementAndGet("channel.underflow");
        break;
      } else {
        serializer.setEvent(event);
        actions.addAll(serializer.getActions());
        increments.addAll(serializer.getIncrements());
      }
    }
    CountDownLatch latch =
        new CountDownLatch(actions.size() + increments.size());
    for(PutRequest action : actions) {
      Callback<Object, Object> callback =
          new SuccessCallback<Object, Object>(latch);
      Callback<Object, Object> errback =
          new ErrBack<Object, Object>(latch, txnFail);
      client.put(action).addCallbacks(callback, errback);
    }
    for(AtomicIncrementRequest increment : increments) {
      Callback<Long, Long> callback =
          new SuccessCallback<Long, Long>(latch);
      Callback<Long, Long> errback = new ErrBack<Long, Long>(latch, txnFail);
      client.atomicIncrement(increment).addCallbacks(callback, errback);
    }

    try {
      latch.await();
    } catch (InterruptedException e1) {
      this.handleTransactionFailure(txn);
      throw new EventDeliveryException("Sink interrupted while waiting" +
          "for Hbase callbacks. Exception follows.", e1);
    }
    /*
     * At this point, either the txn has failed
     * or all callbacks received and txn is successful.
     *
     * This need not be in the monitor, since all callbacks for this txn
     * have been received. So txnFail will not be modified any more (even if
     * it is, it is set from true to true only - false happens only
     * in the next process call).
     *
     */
    if (txnFail.get()) {
      this.handleTransactionFailure(txn);
      throw new EventDeliveryException("Could not write events to Hbase. " +
          "Transaction failed, and rolled back.");
    } else {
      try{
        txn.commit();
      } catch (Throwable e) {
        try{
          txn.rollback();
        } catch (Exception e2) {
          logger.error("Exception in rollback. Rollback might not have been" +
              "successful." , e2);
        }
        counterGroup.incrementAndGet("transaction.rollback");
        logger.error("Failed to commit transaction." +
            "Transaction rolled back.", e);
        if(e instanceof Error || e instanceof RuntimeException){
          logger.error("Failed to commit transaction." +
              "Transaction rolled back.", e);
          Throwables.propagate(e);
        } else {
          logger.error("Failed to commit transaction." +
              "Transaction rolled back.", e);
          throw new EventDeliveryException("Failed to commit transaction." +
              "Transaction rolled back.", e);
        }
      } finally {
        txn.close();
      }
View Full Code Here

            "Transaction rolled back.", e);
        Throwables.propagate(e);
      } else {
        logger.error("Failed to commit transaction." +
            "Transaction rolled back.", e);
        throw new EventDeliveryException("Failed to commit transaction." +
            "Transaction rolled back.", e);
      }
    } finally {
      txn.close();
    }
View Full Code Here

            "Transaction rolled back.", e);
        Throwables.propagate(e);
      } else {
        logger.error("Failed to commit transaction." +
            "Transaction rolled back.", e);
        throw new EventDeliveryException("Failed to commit transaction." +
            "Transaction rolled back.", e);
      }
    } finally {
      txn.close();
    }
View Full Code Here

          serializer.beforeClose();
          outputStream.flush();
          outputStream.close();
          shouldRotate = false;
        } catch (IOException e) {
          throw new EventDeliveryException("Unable to rotate file "
              + pathController.getCurrentFile() + " while delivering event", e);
        }

        serializer = null;
        outputStream = null;
        pathController.rotate();
      }
    }

    if (outputStream == null) {
      File currentFile = pathController.getCurrentFile();
      logger.debug("Opening output stream for file {}", currentFile);
      try {
        outputStream = new BufferedOutputStream(
            new FileOutputStream(currentFile));
        serializer = EventSerializerFactory.getInstance(
            serializerType, serializerContext, outputStream);
        serializer.afterCreate();
      } catch (IOException e) {
        throw new EventDeliveryException("Failed to open file "
            + pathController.getCurrentFile() + " while delivering event", e);
      }
    }

    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    Event event = null;
    Status result = Status.READY;

    try {
      transaction.begin();
      event = channel.take();

      if (event != null) {
        serializer.write(event);

        /*
         * FIXME: Feature: Rotate on size and time by checking bytes written and
         * setting shouldRotate = true if we're past a threshold.
         */

        /*
         * FIXME: Feature: Control flush interval based on time or number of
         * events. For now, we're super-conservative and flush on each write.
         */
        serializer.flush();
        outputStream.flush();
      } else {
        // No events found, request back-off semantics from runner
        result = Status.BACKOFF;
      }
      transaction.commit();
    } catch (Exception ex) {
      transaction.rollback();
      throw new EventDeliveryException("Failed to process event: " + event, ex);
    } finally {
      transaction.close();
    }

    return result;
View Full Code Here

      counterGroup.incrementAndGet("transaction.success");
    } catch (Exception ex) {
      transaction.rollback();
      counterGroup.incrementAndGet("transaction.failed");
      logger.error("Failed to deliver event. Exception follows.", ex);
      throw new EventDeliveryException("Failed to deliver event: " + event, ex);
    } finally {
      transaction.close();
    }

    return status;
View Full Code Here

            + "Attempting next sink if available.", ex);
      }
    }

    if (status == null) {
      throw new EventDeliveryException("All configured sinks have failed");
    }

    return status;
  }
View Full Code Here

    }

    @Override
    public Status process() throws EventDeliveryException {
      if (fail) {
        throw new EventDeliveryException("failed");
      }
      Event e = this.getChannel().take();
      if (e == null)
        return Status.BACKOFF;
View Full Code Here

        LOGGER.warn("Failed to send event to host " + host, ex);
      }
    }

    if (!eventSent) {
      throw new EventDeliveryException("Unable to send event to any host");
    }
  }
View Full Code Here

        LOGGER.warn("Failed to send batch to host " + host, ex);
      }
    }

    if (!batchSent) {
      throw new EventDeliveryException("Unable to send batch to any host");
    }
  }
View Full Code Here

TOP

Related Classes of org.apache.flume.EventDeliveryException

Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and owned by ORACLE Inc. Contact coftware#gmail.com.