diff --git a/h2o-hadoop/driverjar.gradle b/h2o-hadoop/driverjar.gradle
index 206d07a4626..fea37fd2783 100644
--- a/h2o-hadoop/driverjar.gradle
+++ b/h2o-hadoop/driverjar.gradle
@@ -13,10 +13,18 @@ sourceSets {
   main {
     java {
       if (project.hasProperty('notYarn')) {
-        srcDir '../h2o-mapreduce-generic/src/main/java'
+        srcDirs '../h2o-mapreduce-generic/src/main/java'
+      } else {
+        srcDirs '../h2o-mapreduce-generic/src/main/java',
+                '../h2o-yarn-generic/src/main/java'
       }
-      else {
-        srcDirs '../h2o-mapreduce-generic/src/main/java', '../h2o-yarn-generic/src/main/java'
+    }
+    resources {
+      if (project.hasProperty('notYarn')) {
+        srcDirs '../h2o-mapreduce-generic/src/main/resources'
+      } else {
+        srcDirs '../h2o-mapreduce-generic/src/main/resources',
+                '../h2o-yarn-generic/src/main/resources'
       }
     }
   }
diff --git a/h2o-hadoop/h2o-mapreduce-generic/src/main/java/water/hadoop/mapred/H2OYARNRunner.java b/h2o-hadoop/h2o-mapreduce-generic/src/main/java/water/hadoop/mapred/H2OYARNRunner.java
new file mode 100644
index 00000000000..85401796510
--- /dev/null
+++ b/h2o-hadoop/h2o-mapreduce-generic/src/main/java/water/hadoop/mapred/H2OYARNRunner.java
@@ -0,0 +1,109 @@
+package water.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.ClientCache;
+import org.apache.hadoop.mapred.ResourceMgrDelegate;
+import org.apache.hadoop.mapred.YARNRunner;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Vector;
+
+import water.util.CollectionUtils;
+
+/**
+ * An H2O-specific {@code ClientProtocol} implementation.
+ *
+ * It reuses the standard {@link YARNRunner} and only overrides the application type
+ * of the submitted application, setting it to {@code "H2O"}.
+ */
+public class H2OYARNRunner extends YARNRunner {
+
+  private static final Log LOG = LogFactory.getLog(H2OYARNRunner.class);
+
+  // Change NEW_APP_MASTER_CLASS here to replace the default application master
+  // with an H2O-specific one; while both constants are equal, nothing is replaced.
+  private static final Class<?> NEW_APP_MASTER_CLASS = MRAppMaster.class;
+  private static final Class<?> DEFAULT_APP_MASTER_CLASS = MRAppMaster.class;
+
+  public H2OYARNRunner(Configuration conf) {
+    super(conf);
+  }
+
+  public H2OYARNRunner(Configuration conf,
+                       ResourceMgrDelegate resMgrDelegate) {
+    super(conf, resMgrDelegate);
+  }
+
+  public H2OYARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
+                       ClientCache clientCache) {
+    super(conf, resMgrDelegate, clientCache);
+  }
+
+  /**
+   * Builds the submission context via the parent implementation, then sets the
+   * application type to {@code "H2O"} and, if a different application master class
+   * is configured, rewrites the AM container launch commands to reference it.
+   *
+   * @param jobConf job configuration, passed through to the parent implementation
+   * @param jobSubmitDir job submission directory, passed through to the parent implementation
+   * @param ts credentials, passed through to the parent implementation
+   * @return the modified application submission context
+   * @throws IOException propagated from the parent implementation
+   */
+  @Override
+  public ApplicationSubmissionContext createApplicationSubmissionContext(Configuration jobConf,
+      String jobSubmitDir,
+      Credentials ts)
+      throws IOException {
+    // Change created app context
+    LOG.info("Setting application type to H2O");
+    ApplicationSubmissionContext appContext = super.createApplicationSubmissionContext(jobConf, jobSubmitDir, ts);
+    appContext.setApplicationType("H2O");
+    // Modify MRAppMaster commands to point to our master
+    if (replaceDefaultAppMaster()) {
+      LOG.info("Setting MRAppMaster to " + NEW_APP_MASTER_CLASS.getName());
+      ContainerLaunchContext origClc = appContext.getAMContainerSpec();
+      // NOTE(review): the 4th argument (service data) is passed as null instead of being
+      // copied from the original spec - confirm that dropping it is intended.
+      ContainerLaunchContext newClc = ContainerLaunchContext.newInstance(
+          origClc.getLocalResources(), origClc.getEnvironment(),
+          replaceMRAppMaster(origClc.getCommands()),
+          null, origClc.getTokens(), origClc.getApplicationACLs());
+      LOG.info(newClc);
+      appContext.setAMContainerSpec(newClc);
+    }
+    // And return modified context
+    return appContext;
+  }
+
+  /**
+   * Returns a copy of the given commands in which every occurrence of
+   * {@link MRJobConfig#APPLICATION_MASTER_CLASS} is replaced by the name of
+   * {@code NEW_APP_MASTER_CLASS}.
+   */
+  private List<String> replaceMRAppMaster(List<String> commands) {
+    Vector<String> args = new Vector<String>(8);
+    for (String cmd : commands) {
+      if (cmd.contains(MRJobConfig.APPLICATION_MASTER_CLASS)) {
+        cmd = cmd.replace(MRJobConfig.APPLICATION_MASTER_CLASS, NEW_APP_MASTER_CLASS.getName());
+      }
+      args.add(cmd);
+    }
+    return args;
+  }
+
+  /** Tells whether an application master other than the default one is configured. */
+  private static boolean replaceDefaultAppMaster() {
+    return NEW_APP_MASTER_CLASS != DEFAULT_APP_MASTER_CLASS;
+  }
+}
+
diff --git a/h2o-hadoop/h2o-mapreduce-generic/src/main/java/water/hadoop/mapred/H2OYarnClientProtocolProvider.java b/h2o-hadoop/h2o-mapreduce-generic/src/main/java/water/hadoop/mapred/H2OYarnClientProtocolProvider.java
new file mode 100644
index 00000000000..9c2135dc0bd
--- /dev/null
+++ b/h2o-hadoop/h2o-mapreduce-generic/src/main/java/water/hadoop/mapred/H2OYarnClientProtocolProvider.java
@@ -0,0 +1,42 @@
+package water.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * H2O-specific YARN client protocol provider.
+ *
+ * The provider is selected by passing {@code -Dmapreduce.framework.name=h2o-yarn}.
+ */
+public class H2OYarnClientProtocolProvider extends ClientProtocolProvider {
+
+  /**
+   * Creates an {@link H2OYARNRunner} when the configured framework name is {@code h2o-yarn}.
+   *
+   * @return a new runner, or {@code null} if a different framework name is configured
+   */
+  @Override
+  public ClientProtocol create(Configuration conf) throws IOException {
+    if ("h2o-yarn".equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
+      return new H2OYARNRunner(conf);
+    }
+    return null;
+  }
+
+  /** Same as {@link #create(Configuration)}; the address argument is ignored. */
+  @Override
+  public ClientProtocol create(InetSocketAddress addr, Configuration conf)
+      throws IOException {
+    return create(conf);
+  }
+
+  @Override
+  public void close(ClientProtocol clientProtocol) throws IOException {
+    // nothing to do
+  }
+}
diff --git a/h2o-hadoop/h2o-mapreduce-generic/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider b/h2o-hadoop/h2o-mapreduce-generic/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
new file mode 100644
index 00000000000..5c5910cadfd
--- /dev/null
+++ b/h2o-hadoop/h2o-mapreduce-generic/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
@@ -0,0 +1,14 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+water.hadoop.mapred.H2OYarnClientProtocolProvider