Package com.asakusafw.windgate.hadoopfs.jsch

Source Code of com.asakusafw.windgate.hadoopfs.jsch.JschConnection

/**
* Copyright 2011-2014 Asakusa Framework Team.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.asakusafw.windgate.hadoopfs.jsch;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.asakusafw.runtime.core.context.RuntimeContext;
import com.asakusafw.windgate.core.WindGateLogger;
import com.asakusafw.windgate.hadoopfs.HadoopFsLogger;
import com.asakusafw.windgate.hadoopfs.ssh.SshConnection;
import com.asakusafw.windgate.hadoopfs.ssh.SshProfile;
import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;

/**
* A remote command execution via SSH.
* @since 0.2.2
* @version 0.4.0
*/
class JschConnection implements SshConnection {

    static final WindGateLogger WGLOG = new HadoopFsLogger(JschConnection.class);

    static final Logger LOG = LoggerFactory.getLogger(JschConnection.class);

    private static final Pattern SH_NAME = Pattern.compile("[A-Za-z_][0-9A-Za-z_]*");

    private static final Pattern SH_METACHARACTERS = Pattern.compile("[\\$`\"\\\\\n]");

    private final Session session;

    private final ChannelExec channel;

    private final SshProfile profile;

    private final String command;

    /**
     * Creates a new instance.
     * @param profile target profile
     * @param commandLineTokens the command line tokens
     * @throws IOException if failed to create a new connection
     * @throws IllegalArgumentException if any parameter is {@code null}
     */
    public JschConnection(SshProfile profile, List<String> commandLineTokens) throws IOException {
        if (profile == null) {
            throw new IllegalArgumentException("profile must not be null"); //$NON-NLS-1$
        }
        if (commandLineTokens == null) {
            throw new IllegalArgumentException("commandLineTokens must not be null"); //$NON-NLS-1$
        }
        this.profile = profile;
        this.command = buildCommand(commandLineTokens, profile.getEnvironmentVariables());
        try {
            JSch jsch = new JSch();
            jsch.addIdentity(profile.getPrivateKey(), profile.getPassPhrase());
            session = jsch.getSession(profile.getUser(), profile.getHost(), profile.getPort());
            session.setConfig("StrictHostKeyChecking", "no");
            session.setServerAliveInterval((int) TimeUnit.SECONDS.toMillis(10));
            session.setTimeout((int) TimeUnit.SECONDS.toMillis(60));

            WGLOG.info("I30001",
                    profile.getResourceName(),
                    profile.getUser(),
                    profile.getHost(),
                    profile.getPort());
            session.connect();
            WGLOG.info("I30002",
                    profile.getResourceName(),
                    profile.getUser(),
                    profile.getHost(),
                    profile.getPort());

            boolean succeeded = false;
            try {
                channel = (ChannelExec) session.openChannel("exec");
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Starting command via Jsch: {}", command);
                }
                channel.setCommand(command);
                channel.setErrStream(System.err, true);
                succeeded = true;
            } finally {
                if (succeeded == false) {
                    LOG.debug("Disconnecting SSH session (failed to initialize command channel)");
                    session.disconnect();
                }
            }
        } catch (JSchException e) {
            WGLOG.error("E30001",
                    profile.getResourceName(),
                    profile.getUser(),
                    profile.getHost(),
                    profile.getPort());
            throw new IOException(MessageFormat.format(
                    "Failed to open ssh session: {0}@{1}:{2} - {3}",
                    profile.getUser(),
                    profile.getHost(),
                    String.valueOf(profile.getPort()),
                    command), e);
        }
    }

    private String buildCommand(List<String> commandLineTokens, Map<String, String> environmentVariables) {
        assert commandLineTokens != null;
        assert environmentVariables != null;

        Map<String, String> env = new HashMap<String, String>();
        env.putAll(environmentVariables);
        env.putAll(RuntimeContext.get().unapply());

        // FIXME for bsh only
        StringBuilder buf = new StringBuilder();
        for (Map.Entry<String, String> entry : env.entrySet()) {
            if (SH_NAME.matcher(entry.getKey()).matches() == false) {
                WGLOG.warn("W30001",
                        profile.getResourceName(),
                        profile.getUser(),
                        profile.getHost(),
                        String.valueOf(profile.getPort()),
                        entry.getKey(),
                        entry.getValue());
                continue;
            }
            if (buf.length() > 0) {
                buf.append(' ');
            }
            buf.append(entry.getKey());
            String replaced = SH_METACHARACTERS.matcher(entry.getValue()).replaceAll("\\\\$0");
            buf.append('=');
            buf.append('"');
            buf.append(replaced);
            buf.append('"');
        }
        for (String token : commandLineTokens) {
            if (buf.length() > 0) {
                buf.append(' ');
            }
            String replaced = SH_METACHARACTERS.matcher(token).replaceAll("\\\\$0");
            buf.append('"');
            buf.append(replaced);
            buf.append('"');
        }
        return buf.toString();
    }

    @Override
    public void connect() throws IOException {
        try {
            WGLOG.info("I30003",
                    profile.getResourceName(),
                    profile.getUser(),
                    profile.getHost(),
                    profile.getPort(),
                    command);
            channel.connect((int) TimeUnit.SECONDS.toMillis(60));
            WGLOG.info("I30004",
                    profile.getResourceName(),
                    profile.getUser(),
                    profile.getHost(),
                    profile.getPort(),
                    command);
        } catch (JSchException e) {
            WGLOG.error("E30002",
                    profile.getResourceName(),
                    profile.getUser(),
                    profile.getHost(),
                    profile.getPort(),
                    command);
            throw new IOException(MessageFormat.format(
                    "Failed to open ssh session: {0}@{1}:{2}",
                    profile.getUser(),
                    profile.getHost(),
                    String.valueOf(profile.getPort())), e);
        }
    }

    @Override
    public OutputStream openStandardInput() throws IOException {
        LOG.debug("Opening remote standard input: {}",
                command);
        return channel.getOutputStream();
    }

    @Override
    public InputStream openStandardOutput() throws IOException {
        LOG.debug("Opening remote standard output: {}",
                command);
        return channel.getInputStream();
    }

    @Override
    public void redirectStandardOutput(OutputStream output, boolean dontClose) {
        LOG.debug("Redirecting remote standard output: {}",
                command);
        channel.setOutputStream(output, dontClose);
    }

    @Override
    public int waitForExit(long timeout) throws InterruptedException, IOException {
        LOG.debug("Waiting for remote command exit: {}",
                command);
        long until = System.currentTimeMillis() + timeout;
        while (until > System.currentTimeMillis()) {
            if (channel.isClosed()) {
                break;
            }
            Thread.sleep(100);
        }
        if (channel.isClosed() == false) {
            WGLOG.error("E30003",
                    profile.getResourceName(),
                    profile.getUser(),
                    profile.getHost(),
                    profile.getPort(),
                    command);
            throw new IOException(MessageFormat.format(
                    "Failed to wait for exit remote command: {0}@{1}:{2} - {3}",
                    profile.getUser(),
                    profile.getHost(),
                    String.valueOf(profile.getPort()),
                    command));
        }
        return channel.getExitStatus();
    }

    @Override
    public void close() throws IOException {
        LOG.debug("Closing SSH connection: {}",
                command);
        try {
            channel.disconnect();
        } finally {
            session.disconnect();
        }
    }
}
TOP

Related Classes of com.asakusafw.windgate.hadoopfs.jsch.JschConnection

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.