Package io.vertx.core.impl

Source Code of io.vertx.core.impl.ContextImpl

/*
* Copyright (c) 2011-2013 The original author or authors
* ------------------------------------------------------
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
*     The Eclipse Public License is available at
*     http://www.eclipse.org/legal/epl-v10.html
*
*     The Apache License v2.0 is available at
*     http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/

package io.vertx.core.impl;

import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Starter;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.impl.LoggerFactory;
import io.vertx.core.spi.cluster.Action;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public abstract class ContextImpl implements Context {

  private static final Logger log = LoggerFactory.getLogger(ContextImpl.class);

  protected final String deploymentID;
  protected final JsonObject config;
  protected final VertxInternal vertx;
  private Deployment deployment;
  private Set<Closeable> closeHooks;
  private final ClassLoader tccl;
  private boolean closed;
  private final EventLoop eventLoop;
  protected final Executor orderedInternalPoolExec;
  protected VertxThread contextThread;

  protected ContextImpl(VertxInternal vertx, Executor orderedInternalPoolExec, String deploymentID, JsonObject config,
                        ClassLoader tccl) {
    this.vertx = vertx;
    this.orderedInternalPoolExec = orderedInternalPoolExec;
    this.deploymentID = deploymentID;
    this.config = config;
    EventLoopGroup group = vertx.getEventLoopGroup();
    if (group != null) {
      this.eventLoop = group.next();
    } else {
      this.eventLoop = null;
    }
    this.tccl = tccl;
  }

  public void setTCCL() {
    Thread.currentThread().setContextClassLoader(tccl);
  }

  public void setDeployment(Deployment deployment) {
    this.deployment = deployment;
  }

  public Deployment getDeployment() {
    return deployment;
  }

  public void addCloseHook(Closeable hook) {
    if (closeHooks == null) {
      closeHooks = new HashSet<>();
    }
    closeHooks.add(hook);
  }

  public void removeCloseHook(Closeable hook) {
    if (closeHooks != null) {
      closeHooks.remove(hook);
    }
  }

  public void runCloseHooks(Handler<AsyncResult<Void>> completionHandler) {
    if (closeHooks != null && !closeHooks.isEmpty()) {
      final int num = closeHooks.size();
      AtomicInteger count = new AtomicInteger();
      AtomicBoolean failed = new AtomicBoolean();
      // Copy to avoid ConcurrentModificationException
      for (Closeable hook: new HashSet<>(closeHooks)) {
        try {
          hook.close(ar -> {
            if (ar.failed()) {
              if (failed.compareAndSet(false, true)) {
                // Only report one failure
                completionHandler.handle(Future.completedFuture(ar.cause()));
              }
            } else {
              if (count.incrementAndGet() == num) {
                completionHandler.handle(Future.completedFuture());
              }
            }
          });
        } catch (Throwable t) {
          log.warn("Failed to run close hooks", t);
        }
      }
    } else {
      completionHandler.handle(Future.completedFuture());
    }
  }

  protected abstract void executeAsync(Handler<Void> task);

  public abstract boolean isEventLoopContext();

  public abstract boolean isMultiThreaded();

  public boolean isWorker() {
    return !isEventLoopContext();
  }

  // We should already be on the event loop, but check this anyway, then execute directly
  public void executeSync(ContextTask task) {
    checkCorrectThread();
    wrapTask(task, null, true).run();
  }

  protected abstract void checkCorrectThread();

  // Run the task asynchronously on this same context
  public void runOnContext(Handler<Void> task) {
    try {
      executeAsync(task);
    } catch (RejectedExecutionException ignore) {
      // Pool is already shut down
    }
  }

  @Override
  public String deploymentID() {
    return deploymentID;
  }

  @Override
  public JsonObject config() {
    return config;
  }

  @Override
  public List<String> processArgs() {
    return Starter.PROCESS_ARGS;
  }

  public EventLoop getEventLoop() {
    return eventLoop;
  }

  // Execute an internal task on the internal blocking ordered executor
  public <T> void executeBlocking(Action<T> action, Handler<AsyncResult<T>> resultHandler) {
    try {
      orderedInternalPoolExec.execute(() -> {
        Future<T> res = Future.future();
        try {
          T result = action.perform();
          res.complete(result);
        } catch (Throwable e) {
          res.fail(e);
        }
        if (resultHandler != null) {
          runOnContext(v -> res.setHandler(resultHandler));
        }
      });
    } catch (RejectedExecutionException ignore) {
      // Pool is already shut down
    }
  }

  public void close() {
    unsetContext();
    closed = true;
  }

  private void unsetContext() {
    vertx.setContext(null);
  }

  protected void executeStart() {
    Thread thread = Thread.currentThread();
    // Sanity check - make sure Netty is really delivering events on the correct thread
    if (this.contextThread == null) {
      if (thread instanceof VertxThread) {
        this.contextThread = (VertxThread)thread;
      } else {
        throw new IllegalStateException("Not a vert.x thread!");
      }
    } else if (this.contextThread != thread && !this.contextThread.isWorker()) {
      throw new IllegalStateException("Uh oh! Event loop context executing with wrong thread! Expected " + this.contextThread + " got " + thread);
    }
    this.contextThread.executeStart();
  }

  protected void executeEnd() {
    contextThread.executeEnd();
  }

  protected Runnable wrapTask(ContextTask cTask, Handler<Void> hTask, boolean checkThread) {
    return () -> {
      if (checkThread) {
        executeStart();
      }
      try {
        vertx.setContext(ContextImpl.this);
        if (cTask != null) {
          cTask.run();
        } else {
          hTask.handle(null);
        }
      } catch (Throwable t) {
        log.error("Unhandled exception", t);
      } finally {
        // TODO - we might have to restore the thread name in case it's been changed during the execution
        if (checkThread) {
          executeEnd();
        }
      }
      if (closed) {
        // We allow tasks to be run after the context is closed but we make sure we unset the context afterwards
        // to avoid any leaks
        unsetContext();
      }
    };
  }

}
TOP

Related Classes of io.vertx.core.impl.ContextImpl

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.