/**
* Copyright 2011-2014 Asakusa Framework Team.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.asakusafw.runtime.stage.resource;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import com.asakusafw.runtime.compatibility.JobCompatibility;
import com.asakusafw.runtime.stage.temporary.TemporaryStorage;
/**
* ステージリソースを利用するためのドライバ。
*/
public class StageResourceDriver implements Closeable {
static final Log LOG = LogFactory.getLog(StageResourceDriver.class);
private static final String PREFIX_LOCAL_CACHE_NAME = "com.asakusafw.cache.";
private final Configuration configuration;
private final FileSystem localFileSystem;
/**
* インスタンスを生成する。
* @param configuration 設定情報
* @throws IOException ファイルシステムの利用に失敗した場合
* @throws IllegalArgumentException 引数に{@code null}が含まれる場合
*/
public StageResourceDriver(Configuration configuration) throws IOException {
if (configuration == null) {
throw new IllegalArgumentException("configuration must not be null"); //$NON-NLS-1$
}
this.configuration = configuration;
this.localFileSystem = FileSystem.getLocal(configuration);
}
/**
* このオブジェクトが利用する設定情報の一覧を返す。
* @return 設定情報の一覧
*/
public Configuration getConfiguration() {
return configuration;
}
/**
* このドライバに登録されたリソースへのパスを返す。
* @param resourceName リソースの名前
* @return 対応するリソースへのパス一覧
* @throws IOException リソースの検索に失敗した場合
* @throws IllegalArgumentException 引数に{@code null}が含まれる場合
*/
public List<Path> findCache(String resourceName) throws IOException {
if (resourceName == null) {
throw new IllegalArgumentException("cacheName must not be null"); //$NON-NLS-1$
}
if (LOG.isDebugEnabled()) {
LOG.debug("finding cache: " + resourceName);
}
String[] localNames = getConfiguration().getStrings(getLocalCacheNameKey(resourceName));
List<Path> results = new ArrayList<Path>();
for (String localName : localNames) {
Path resolvedPath = findLocalCache(resourceName, localName);
if (resolvedPath == null) {
return Collections.emptyList();
}
results.add(resolvedPath);
}
if (LOG.isDebugEnabled()) {
LOG.debug(MessageFormat.format(
"Cache file resolved: resource={0}, paths={1}",
resourceName,
results));
}
return results;
}
private Path findLocalCache(String resourceName, String localName) throws IOException {
assert localName != null;
Path cache = new Path(localName);
if (localFileSystem.exists(cache)) {
if (LOG.isDebugEnabled()) {
LOG.debug("symlink found: " + cache);
}
return localFileSystem.makeQualified(cache);
}
if (LOG.isDebugEnabled()) {
LOG.debug("symlink not found: " + localName);
}
Path directPath = findCacheForLocalMode(resourceName, localName);
return directPath;
}
private Path findCacheForLocalMode(String resourceName, String localName) throws IOException {
assert resourceName != null;
assert localName != null;
Path remotePath = null;
String remoteName = null;
for (URI uri : DistributedCache.getCacheFiles(configuration)) {
if (localName.equals(uri.getFragment())) {
if (LOG.isDebugEnabled()) {
LOG.debug("fragment matched: " + uri);
}
String rpath = uri.getPath();
remotePath = new Path(uri);
remoteName = rpath.substring(rpath.lastIndexOf('/') + 1);
break;
}
}
if (remoteName == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("fragment not matched: " + resourceName);
}
return null;
}
assert remotePath != null;
for (Path path : DistributedCache.getLocalCacheFiles(configuration)) {
String localFileName = path.getName();
if (remoteName.equals(localFileName) == false) {
continue;
}
if (localFileSystem.exists(path) == false) {
continue;
}
if (LOG.isDebugEnabled()) {
LOG.debug("local path matched: " + path);
}
return localFileSystem.makeQualified(path);
}
FileSystem remoteFileSystem = remotePath.getFileSystem(configuration);
remotePath = remoteFileSystem.makeQualified(remotePath);
if (LOG.isDebugEnabled()) {
LOG.debug("distributed cache is not localized explicitly: " + remotePath);
}
if (isLocal(remoteFileSystem) == false) {
LOG.warn(MessageFormat.format(
"Failed to resolve stage resource in local cache \"{1}\" (resource={0})",
resourceName,
localName));
}
return remotePath;
}
private boolean isLocal(FileSystem fs) {
assert fs != null;
if (fs == localFileSystem) {
return true;
}
// TODO user getCanonicalUri() on 1.0.0
return fs.getUri().equals(localFileSystem.getUri());
}
@Override
public void close() throws IOException {
// do not close local file system
return;
}
/**
* 指定のジョブにリソースの情報を追加する。
* @param job 対象の情報
* @param resourcePath リソースへのパス (for temporary storage)
* @param resourceName リソースの名前
* @throws IOException リソースの情報が不明であった場合
* @throws IllegalArgumentException 引数に{@code null}が含まれる場合
*/
public static void add(Job job, String resourcePath, String resourceName) throws IOException {
if (job == null) {
throw new IllegalArgumentException("job must not be null"); //$NON-NLS-1$
}
if (resourcePath == null) {
throw new IllegalArgumentException("resourcePath must not be null"); //$NON-NLS-1$
}
if (resourceName == null) {
throw new IllegalArgumentException("resourceName must not be null"); //$NON-NLS-1$
}
List<Path> list = TemporaryStorage.list(job.getConfiguration(), new Path(resourcePath));
if (list.isEmpty()) {
throw new IOException(MessageFormat.format(
"Resource not found: {0}",
resourcePath));
}
String[] added = job.getConfiguration().getStrings(getLocalCacheNameKey(resourceName));
List<String> localNames = new ArrayList<String>();
if (added != null && added.length >= 1) {
Collections.addAll(localNames, added);
}
int index = localNames.size();
for (Path path : list) {
String name = String.format("%s-%04d", resourceName, index++);
StringBuilder buf = new StringBuilder();
buf.append(path.toString());
buf.append('#');
buf.append(name);
localNames.add(name);
try {
URI uri = new URI(buf.toString());
DistributedCache.addCacheFile(uri, job.getConfiguration());
} catch (URISyntaxException e) {
throw new IllegalStateException(e);
}
}
job.getConfiguration().setStrings(
getLocalCacheNameKey(resourceName),
localNames.toArray(new String[localNames.size()]));
if (JobCompatibility.isLocalMode(job)) {
LOG.info("symlinks for distributed cache will not be created in standalone mode");
} else {
DistributedCache.createSymlink(job.getConfiguration());
}
}
private static String getLocalCacheNameKey(String resourceName) {
assert resourceName != null;
return PREFIX_LOCAL_CACHE_NAME + resourceName;
}
}