diff --git a/gradle.properties b/gradle.properties index 8d3ffc4639e..d9c8c330e1c 100644 --- a/gradle.properties +++ b/gradle.properties @@ -29,3 +29,6 @@ org.gradle.jvmargs='-XX:MaxPermSize=384m' # Used for h2o-bindings generation, to allow us to use an extended h2o.jar h2oJarfile='build/h2o.jar' + +# Run ssl tests +doTestSSL=false diff --git a/gradle/multiNodeTesting.gradle b/gradle/multiNodeTesting.gradle index d233f40c51a..b83656cffd1 100644 --- a/gradle/multiNodeTesting.gradle +++ b/gradle/multiNodeTesting.gradle @@ -22,6 +22,11 @@ task testMultiNode(type: Exec) { environment "BUILD_DIR", project.buildDir def args = ['bash', './testMultiNode.sh'] + + if(project.hasProperty('doTestSSL')) { + args << 'ssl' + } + if (project.hasProperty("jacocoCoverage")) { args << 'jacoco' } diff --git a/h2o-algos/build.gradle b/h2o-algos/build.gradle index d140f4a7b20..2daa0e2a927 100644 --- a/h2o-algos/build.gradle +++ b/h2o-algos/build.gradle @@ -11,6 +11,15 @@ dependencies { apply from: "${rootDir}/gradle/dataCheck.gradle" +task testSSLEncryption(type: Exec) { + dependsOn cpLibs, jar, testJar + if(project.hasProperty('doTestSSL')) { + commandLine 'bash', './testSSL.sh' + } else { + commandLine 'echo', 'SSL tests not enabled' + } +} + // The default 'test' behavior is broken in that it does not grok clusters. // For H2O, all tests need to be run on a cluster, where each JVM is // "free-running" - it's stdout/stderr are NOT hooked by another process. If @@ -22,10 +31,8 @@ apply from: "${rootDir}/gradle/dataCheck.gradle" // level) to files - then scrape the files later for test results. test { dependsOn ":h2o-core:testJar" - dependsOn smalldataCheck, cpLibs, jar, testJar, testSingleNode, testMultiNode + dependsOn smalldataCheck, cpLibs, jar, testJar, testSingleNode, testMultiNode, testSSLEncryption // Defeat task 'test' by running no tests. exclude '**' -} - -testMultiNode.shouldRunAfter testSingleNode +} \ No newline at end of file diff --git a/h2o-algos/src/test/java/water/network/SSLEncryptionTest.java b/h2o-algos/src/test/java/water/network/SSLEncryptionTest.java new file mode 100644 index 00000000000..9c95eb23ea9 --- /dev/null +++ b/h2o-algos/src/test/java/water/network/SSLEncryptionTest.java @@ -0,0 +1,84 @@ +package water.network; + +import hex.tree.gbm.GBM; +import hex.tree.gbm.GBMModel; +import org.junit.Assert; +import org.junit.Ignore; +import water.TestUtil; +import water.fvec.Frame; +import water.util.Log; + +import java.util.Date; + +import static hex.genmodel.utils.DistributionFamily.gaussian; + +/** + * This class is used to capture TCP packets while training a model + * The result is then used to check if SSL encryption is working properly + */ +@Ignore +public class SSLEncryptionTest extends TestUtil { + + public static void main(String[] args) { + if (args.length == 1) { + testGBMRegressionGaussianSSL(args[0]); + } else { + testGBMRegressionGaussianNonSSL(); + } + + System.exit(0); + } + + public static void testGBMRegressionGaussianNonSSL() { + stall_till_cloudsize(4); + testGBMRegressionGaussian(); + } + + public static void testGBMRegressionGaussianSSL(String prop) { + stall_till_cloudsize(new String[] {"-internal_security_conf", prop}, 4); + testGBMRegressionGaussian(); + } + + private static void testGBMRegressionGaussian() { + GBMModel gbm = null; + Frame fr = null, fr2 = null; + try { + Date start = new Date(); + + fr = parse_test_file("./smalldata/gbm_test/Mfgdata_gaussian_GBM_testing.csv"); + GBMModel.GBMParameters parms = new GBMModel.GBMParameters(); + parms._train = fr._key; + parms._distribution = gaussian; + parms._response_column = fr._names[1]; // Row in col 0, dependent in col 1, predictor in col 2 + parms._ntrees = 1; + parms._max_depth = 1; + parms._min_rows = 1; + parms._nbins = 20; + // Drop ColV2 0 (row), keep 1 (response), keep col 2 (only predictor), drop remaining cols + String[] xcols = parms._ignored_columns = new String[fr.numCols()-2]; + xcols[0] = fr._names[0]; + System.arraycopy(fr._names,3,xcols,1,fr.numCols()-3); + parms._learn_rate = 1.0f; + parms._score_each_iteration=true; + + GBM job = new GBM(parms); + gbm = job.trainModel().get(); + + Log.info(">>> GBM parsing and training took: " + (new Date().getTime() - start.getTime()) + " ms."); + + Assert.assertTrue(job.isStopped()); //HEX-1817 + + // Done building model; produce a score column with predictions + + Date scoringStart = new Date(); + + fr2 = gbm.score(fr); + + Log.info(">>> GBM scoring took: " + (new Date().getTime() - scoringStart.getTime()) + " ms."); + } finally { + if( fr != null ) fr .remove(); + if( fr2 != null ) fr2.remove(); + if( gbm != null ) gbm.remove(); + } + } +} diff --git a/h2o-algos/src/test/resources/ssl.properties b/h2o-algos/src/test/resources/ssl.properties new file mode 100644 index 00000000000..577a8ca3651 --- /dev/null +++ b/h2o-algos/src/test/resources/ssl.properties @@ -0,0 +1,5 @@ +h2o_ssl_protocol=TLSv1.2 +h2o_ssl_jks_internal=../h2o-core/src/test/resources/keystore.jks +h2o_ssl_jks_password=password +h2o_ssl_jts=../h2o-core/src/test/resources/cacerts.jks +h2o_ssl_jts_password=password \ No newline at end of file diff --git a/h2o-algos/src/test/resources/ssl2.properties b/h2o-algos/src/test/resources/ssl2.properties new file mode 100644 index 00000000000..bfc241edf60 --- /dev/null +++ b/h2o-algos/src/test/resources/ssl2.properties @@ -0,0 +1,5 @@ +h2o_ssl_protocol=TLSv1.2 +h2o_ssl_jks_internal=../../h2o-core/src/test/resources/keystore.jks +h2o_ssl_jks_password=password +h2o_ssl_jts=../../h2o-core/src/test/resources/cacerts.jks +h2o_ssl_jts_password=password \ No newline at end of file diff --git a/h2o-algos/src/test/resources/ssl3.properties b/h2o-algos/src/test/resources/ssl3.properties new file mode 100644 index 00000000000..b2af8be8a4c --- /dev/null +++ b/h2o-algos/src/test/resources/ssl3.properties @@ -0,0 +1,5 @@ +h2o_ssl_protocol=TLSv1.2 +h2o_ssl_jks_internal=../../../h2o-core/src/test/resources/keystore.jks +h2o_ssl_jks_password=password +h2o_ssl_jts=../../../h2o-core/src/test/resources/cacerts.jks +h2o_ssl_jts_password=password \ No newline at end of file diff --git a/h2o-algos/testMultiNode.sh b/h2o-algos/testMultiNode.sh index f5b49dfee9e..945b17e727d 100755 --- a/h2o-algos/testMultiNode.sh +++ b/h2o-algos/testMultiNode.sh @@ -1,4 +1,5 @@ #!/bin/bash +source ../multiNodeUtils.sh # Argument parsing if [ "$1" = "jacoco" ] @@ -109,21 +110,21 @@ CLUSTER_BASEPORT_2=45000 CLUSTER_BASEPORT_3=46000 CLUSTER_BASEPORT_4=47000 CLUSTER_BASEPORT_5=48000 -$JVM water.H2O -name $CLUSTER_NAME.1 -baseport $CLUSTER_BASEPORT_1 -ga_opt_out 1> $OUTDIR/out.1.1 2>&1 & PID_11=$! -$JVM water.H2O -name $CLUSTER_NAME.1 -baseport $CLUSTER_BASEPORT_1 -ga_opt_out 1> $OUTDIR/out.1.2 2>&1 & PID_12=$! -$JVM water.H2O -name $CLUSTER_NAME.1 -baseport $CLUSTER_BASEPORT_1 -ga_opt_out 1> $OUTDIR/out.1.3 2>&1 & PID_13=$! -$JVM water.H2O -name $CLUSTER_NAME.2 -baseport $CLUSTER_BASEPORT_2 -ga_opt_out 1> $OUTDIR/out.2.1 2>&1 & PID_21=$! -$JVM water.H2O -name $CLUSTER_NAME.2 -baseport $CLUSTER_BASEPORT_2 -ga_opt_out 1> $OUTDIR/out.2.2 2>&1 & PID_22=$! -$JVM water.H2O -name $CLUSTER_NAME.2 -baseport $CLUSTER_BASEPORT_2 -ga_opt_out 1> $OUTDIR/out.2.3 2>&1 & PID_23=$! -$JVM water.H2O -name $CLUSTER_NAME.3 -baseport $CLUSTER_BASEPORT_3 -ga_opt_out 1> $OUTDIR/out.3.1 2>&1 & PID_31=$! -$JVM water.H2O -name $CLUSTER_NAME.3 -baseport $CLUSTER_BASEPORT_3 -ga_opt_out 1> $OUTDIR/out.3.2 2>&1 & PID_32=$! -$JVM water.H2O -name $CLUSTER_NAME.3 -baseport $CLUSTER_BASEPORT_3 -ga_opt_out 1> $OUTDIR/out.3.3 2>&1 & PID_33=$! -$JVM water.H2O -name $CLUSTER_NAME.4 -baseport $CLUSTER_BASEPORT_4 -ga_opt_out 1> $OUTDIR/out.4.1 2>&1 & PID_41=$! -$JVM water.H2O -name $CLUSTER_NAME.4 -baseport $CLUSTER_BASEPORT_4 -ga_opt_out 1> $OUTDIR/out.4.2 2>&1 & PID_42=$! -$JVM water.H2O -name $CLUSTER_NAME.4 -baseport $CLUSTER_BASEPORT_4 -ga_opt_out 1> $OUTDIR/out.4.3 2>&1 & PID_43=$! -$JVM water.H2O -name $CLUSTER_NAME.5 -baseport $CLUSTER_BASEPORT_5 -ga_opt_out 1> $OUTDIR/out.5.1 2>&1 & PID_51=$! -$JVM water.H2O -name $CLUSTER_NAME.5 -baseport $CLUSTER_BASEPORT_5 -ga_opt_out 1> $OUTDIR/out.5.2 2>&1 & PID_52=$! -$JVM water.H2O -name $CLUSTER_NAME.5 -baseport $CLUSTER_BASEPORT_5 -ga_opt_out 1> $OUTDIR/out.5.3 2>&1 & PID_53=$! +$JVM water.H2O -name $CLUSTER_NAME.1 -baseport $CLUSTER_BASEPORT_1 -ga_opt_out $SSL 1> $OUTDIR/out.1.1 2>&1 & PID_11=$! +$JVM water.H2O -name $CLUSTER_NAME.1 -baseport $CLUSTER_BASEPORT_1 -ga_opt_out $SSL 1> $OUTDIR/out.1.2 2>&1 & PID_12=$! +$JVM water.H2O -name $CLUSTER_NAME.1 -baseport $CLUSTER_BASEPORT_1 -ga_opt_out $SSL 1> $OUTDIR/out.1.3 2>&1 & PID_13=$! +$JVM water.H2O -name $CLUSTER_NAME.2 -baseport $CLUSTER_BASEPORT_2 -ga_opt_out $SSL 1> $OUTDIR/out.2.1 2>&1 & PID_21=$! +$JVM water.H2O -name $CLUSTER_NAME.2 -baseport $CLUSTER_BASEPORT_2 -ga_opt_out $SSL 1> $OUTDIR/out.2.2 2>&1 & PID_22=$! +$JVM water.H2O -name $CLUSTER_NAME.2 -baseport $CLUSTER_BASEPORT_2 -ga_opt_out $SSL 1> $OUTDIR/out.2.3 2>&1 & PID_23=$! +$JVM water.H2O -name $CLUSTER_NAME.3 -baseport $CLUSTER_BASEPORT_3 -ga_opt_out $SSL 1> $OUTDIR/out.3.1 2>&1 & PID_31=$! +$JVM water.H2O -name $CLUSTER_NAME.3 -baseport $CLUSTER_BASEPORT_3 -ga_opt_out $SSL 1> $OUTDIR/out.3.2 2>&1 & PID_32=$! +$JVM water.H2O -name $CLUSTER_NAME.3 -baseport $CLUSTER_BASEPORT_3 -ga_opt_out $SSL 1> $OUTDIR/out.3.3 2>&1 & PID_33=$! +$JVM water.H2O -name $CLUSTER_NAME.4 -baseport $CLUSTER_BASEPORT_4 -ga_opt_out $SSL 1> $OUTDIR/out.4.1 2>&1 & PID_41=$! +$JVM water.H2O -name $CLUSTER_NAME.4 -baseport $CLUSTER_BASEPORT_4 -ga_opt_out $SSL 1> $OUTDIR/out.4.2 2>&1 & PID_42=$! +$JVM water.H2O -name $CLUSTER_NAME.4 -baseport $CLUSTER_BASEPORT_4 -ga_opt_out $SSL 1> $OUTDIR/out.4.3 2>&1 & PID_43=$! +$JVM water.H2O -name $CLUSTER_NAME.5 -baseport $CLUSTER_BASEPORT_5 -ga_opt_out $SSL 1> $OUTDIR/out.5.1 2>&1 & PID_51=$! +$JVM water.H2O -name $CLUSTER_NAME.5 -baseport $CLUSTER_BASEPORT_5 -ga_opt_out $SSL 1> $OUTDIR/out.5.2 2>&1 & PID_52=$! +$JVM water.H2O -name $CLUSTER_NAME.5 -baseport $CLUSTER_BASEPORT_5 -ga_opt_out $SSL 1> $OUTDIR/out.5.3 2>&1 & PID_53=$! # If coverage is being run, then pass a system variable flag so that timeout limits are increased. if [ $JACOCO_ENABLED = true ] @@ -136,11 +137,11 @@ fi # Launch last driver JVM. All output redir'd at the OS level to sandbox files. echo Running h2o-algos junit tests... -($JVM -Ddoonly.tests=$DOONLY -Dbuild.id=$BUILD_ID -Dignore.tests=$IGNORE -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME.1 -Dai.h2o.baseport=$CLUSTER_BASEPORT_1 -Dai.h2o.ga_opt_out=yes $JACOCO_FLAG $JUNIT_RUNNER $JUNIT_TESTS_BOOT `cat $OUTDIR/tests.txt | awk 'NR%5==0'` 2>&1 ; echo $? > $OUTDIR/status.1) 1> $OUTDIR/out.1 2>&1 & PID_1=$! -($JVM -Ddoonly.tests=$DOONLY -Dbuild.id=$BUILD_ID -Dignore.tests=$IGNORE -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME.2 -Dai.h2o.baseport=$CLUSTER_BASEPORT_2 -Dai.h2o.ga_opt_out=yes $JACOCO_FLAG $JUNIT_RUNNER $JUNIT_TESTS_BOOT `cat $OUTDIR/tests.txt | awk 'NR%5==1'` 2>&1 ; echo $? > $OUTDIR/status.2) 1> $OUTDIR/out.2 2>&1 & PID_2=$! -($JVM -Ddoonly.tests=$DOONLY -Dbuild.id=$BUILD_ID -Dignore.tests=$IGNORE -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME.3 -Dai.h2o.baseport=$CLUSTER_BASEPORT_3 -Dai.h2o.ga_opt_out=yes $JACOCO_FLAG $JUNIT_RUNNER $JUNIT_TESTS_BOOT `cat $OUTDIR/tests.txt | awk 'NR%5==2'` 2>&1 ; echo $? > $OUTDIR/status.3) 1> $OUTDIR/out.3 2>&1 & PID_3=$! -($JVM -Ddoonly.tests=$DOONLY -Dbuild.id=$BUILD_ID -Dignore.tests=$IGNORE -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME.4 -Dai.h2o.baseport=$CLUSTER_BASEPORT_4 -Dai.h2o.ga_opt_out=yes $JACOCO_FLAG $JUNIT_RUNNER $JUNIT_TESTS_BOOT `cat $OUTDIR/tests.txt | awk 'NR%5==3'` 2>&1 ; echo $? > $OUTDIR/status.4) 1> $OUTDIR/out.4 2>&1 & PID_4=$! -($JVM -Ddoonly.tests=$DOONLY -Dbuild.id=$BUILD_ID -Dignore.tests=$IGNORE -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME.5 -Dai.h2o.baseport=$CLUSTER_BASEPORT_5 -Dai.h2o.ga_opt_out=yes $JACOCO_FLAG $JUNIT_RUNNER $JUNIT_TESTS_BOOT `cat $OUTDIR/tests.txt | awk 'NR%5==4'` 2>&1 ; echo $? > $OUTDIR/status.5) 1> $OUTDIR/out.5 2>&1 & PID_5=$! +($JVM $TEST_SSL -Ddoonly.tests=$DOONLY -Dbuild.id=$BUILD_ID -Dignore.tests=$IGNORE -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME.1 -Dai.h2o.baseport=$CLUSTER_BASEPORT_1 -Dai.h2o.ga_opt_out=yes $JACOCO_FLAG $JUNIT_RUNNER $JUNIT_TESTS_BOOT `cat $OUTDIR/tests.txt | awk 'NR%5==0'` 2>&1 ; echo $? > $OUTDIR/status.1) 1> $OUTDIR/out.1 2>&1 & PID_1=$! +($JVM $TEST_SSL -Ddoonly.tests=$DOONLY -Dbuild.id=$BUILD_ID -Dignore.tests=$IGNORE -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME.2 -Dai.h2o.baseport=$CLUSTER_BASEPORT_2 -Dai.h2o.ga_opt_out=yes $JACOCO_FLAG $JUNIT_RUNNER $JUNIT_TESTS_BOOT `cat $OUTDIR/tests.txt | awk 'NR%5==1'` 2>&1 ; echo $? > $OUTDIR/status.2) 1> $OUTDIR/out.2 2>&1 & PID_2=$! +($JVM $TEST_SSL -Ddoonly.tests=$DOONLY -Dbuild.id=$BUILD_ID -Dignore.tests=$IGNORE -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME.3 -Dai.h2o.baseport=$CLUSTER_BASEPORT_3 -Dai.h2o.ga_opt_out=yes $JACOCO_FLAG $JUNIT_RUNNER $JUNIT_TESTS_BOOT `cat $OUTDIR/tests.txt | awk 'NR%5==2'` 2>&1 ; echo $? > $OUTDIR/status.3) 1> $OUTDIR/out.3 2>&1 & PID_3=$! +($JVM $TEST_SSL -Ddoonly.tests=$DOONLY -Dbuild.id=$BUILD_ID -Dignore.tests=$IGNORE -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME.4 -Dai.h2o.baseport=$CLUSTER_BASEPORT_4 -Dai.h2o.ga_opt_out=yes $JACOCO_FLAG $JUNIT_RUNNER $JUNIT_TESTS_BOOT `cat $OUTDIR/tests.txt | awk 'NR%5==3'` 2>&1 ; echo $? > $OUTDIR/status.4) 1> $OUTDIR/out.4 2>&1 & PID_4=$! +($JVM $TEST_SSL -Ddoonly.tests=$DOONLY -Dbuild.id=$BUILD_ID -Dignore.tests=$IGNORE -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME.5 -Dai.h2o.baseport=$CLUSTER_BASEPORT_5 -Dai.h2o.ga_opt_out=yes $JACOCO_FLAG $JUNIT_RUNNER $JUNIT_TESTS_BOOT `cat $OUTDIR/tests.txt | awk 'NR%5==4'` 2>&1 ; echo $? > $OUTDIR/status.5) 1> $OUTDIR/out.5 2>&1 & PID_5=$! wait ${PID_1} ${PID_2} ${PID_3} ${PID_4} ${PID_5} 1> /dev/null 2>&1 grep EXECUTION $OUTDIR/out.* | sed -e "s/.*TEST \(.*\) EXECUTION TIME: \(.*\) (Wall.*/\2 \1/" | sort -gr | head -n 10 >> $OUTDIR/out.0 diff --git a/h2o-algos/testSSL.sh b/h2o-algos/testSSL.sh new file mode 100755 index 00000000000..ca9f33a7eaa --- /dev/null +++ b/h2o-algos/testSSL.sh @@ -0,0 +1,137 @@ +#!/bin/bash + +# Clean out any old sandbox, make a new one +OUTDIR=sandbox +rm -fr $OUTDIR; mkdir -p $OUTDIR + +# Check for os +SEP=: +case "`uname`" in + CYGWIN* ) + SEP=";" + ;; +esac + +function cleanup () { + kill -9 ${PID_1} ${PID_2} ${PID_3} ${PID_4} 1> /dev/null 2>&1 + wait 1> /dev/null 2>&1 +} + +function countDataCells () { + # Number of tokens we didn't find + COUNT=0 + # Number of tokens we looked for + TOTAL=0 + FILE=../smalldata/gbm_test/Mfgdata_gaussian_GBM_testing.csv + while IFS= read -r line; do + IFS=',' read -r -a array <<< "$line" + for el in "${array[@]}"; do + # I don't check for "\d+" since things like "1" and "11" can appear as part of SSL encrypted gibberish + # and it's not trivial to distinguish it from actual data + if [[ ! $el =~ \"[0-9]+\" ]]; then + grep -q -- "$el" sandbox/test.out + COUNT=$((COUNT + $?)) + TOTAL=$((TOTAL+1)) + fi + done + # Because the column names are mostly one letter they might actually appear + # in the encrypted TCP gibberish so we'll skip them but check the actual data + done <<< "$(sed 1d $FILE)" + echo "Found $((TOTAL-COUNT)) tokens from a total of $TOTAL" 1>&2 + # Number of tokens we found + echo $((TOTAL-COUNT)) +} + +function testOutput () { + # Grab the nonSSL data field from the packet body in human readable format + tshark -x -r $OUTDIR/h2o-nonSSL.pcap -T text | awk -F " " '{print $3}' > $OUTDIR/test_tmp.out + # Remove all newlines and spaces for future grep + cat $OUTDIR/test_tmp.out | awk 1 RS='\n' ORS= | awk '{gsub(/ /,"")}1' > $OUTDIR/test.out + + # Check that all the data we used as input is in the TCP dump in not encrypted form! + FOUND=$(countDataCells) + if [[ $FOUND -eq 0 ]]; then + echo "Haven't found any of the original data in the nonSSL TCP dump." + echo h2o-algos junit tests FAILED + exit 1 + fi + + # Grab the SSL data field from the packet body in human readable format + tshark -x -r $OUTDIR/h2o-SSL.pcap -T text | awk -F " " '{print $3}' > $OUTDIR/test_tmp.out + cat $OUTDIR/test_tmp.out | awk 1 RS='\n' ORS= | awk '{gsub(/ /,"")}1' > $OUTDIR/test.out + + # Check that none of the data we used as input is in the TCP dump in notencrypted form! + FOUND=$(countDataCells) + if [[ $FOUND -ne 0 ]]; then + echo "Found original data in the SSL TCP dump." + echo h2o-algos junit tests FAILED + exit 1 + fi + + echo h2o-algos junit tests PASSED + exit 0 +} + +trap cleanup SIGTERM SIGINT + +# Find java command +if [ -z "$TEST_JAVA_HOME" ]; then + # Use default + JAVA_CMD="java" +else + # Use test java home + JAVA_CMD="$TEST_JAVA_HOME/bin/java" + # Increase XMX since JAVA_HOME can point to java6 + JAVA6_REGEXP=".*1\.6.*" + if [[ $TEST_JAVA_HOME =~ $JAVA6_REGEXP ]]; then + JAVA_CMD="${JAVA_CMD}" + fi +fi + +JVM="nice $JAVA_CMD -ea -Xmx3g -Xms3g -cp build/libs/h2o-algos-test.jar${SEP}build/libs/h2o-algos.jar${SEP}../h2o-core/build/libs/h2o-core-test.jar${SEP}../h2o-core/build/libs/h2o-core.jar${SEP}../h2o-genmodel/build/libs/h2o-genmodel.jar${SEP}../lib/*" +echo "$JVM" > $OUTDIR/jvm_cmd.txt + +SSL="" +# Launch 3 helper JVMs. All output redir'd at the OS level to sandbox files. +CLUSTER_NAME=junit_cluster_$$ +CLUSTER_BASEPORT=44000 +$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT -ga_opt_out $SSL 1> $OUTDIR/out.1 2>&1 & PID_1=$! +$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT -ga_opt_out $SSL 1> $OUTDIR/out.2 2>&1 & PID_2=$! +$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT -ga_opt_out $SSL 1> $OUTDIR/out.3 2>&1 & PID_3=$! + +INTERFACE=${TSHARK_INTERFACE:-"eth0"} + +echo Running nonSSL test on interface ${INTERFACE}... + +pwd + +tshark -i ${INTERFACE} -T fields -e data -w ${OUTDIR}/h2o-nonSSL.pcap 1> /dev/null 2>&1 & PID_4=$! + +java -Dai.h2o.name=$CLUSTER_NAME -ea \ + -cp "build/libs/h2o-algos-test.jar${SEP}build/libs/h2o-algos.jar${SEP}../h2o-core/build/libs/h2o-core.jar${SEP}../h2o-core/build/libs/h2o-core-test.jar${SEP}../h2o-genmodel/build/libs/h2o-genmodel.jar${SEP}../lib/*" \ + water.network.SSLEncryptionTest + +echo After test cleanup... + +cleanup + +SSL_CONFIG="src/test/resources/ssl.properties" +SSL="-internal_security_conf "$SSL_CONFIG +CLUSTER_NAME=$CLUSTER_NAME"_2" +$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT -ga_opt_out $SSL 1> $OUTDIR/out.1 2>&1 & PID_1=$! +$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT -ga_opt_out $SSL 1> $OUTDIR/out.2 2>&1 & PID_2=$! +$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT -ga_opt_out $SSL 1> $OUTDIR/out.3 2>&1 & PID_3=$! + +echo Running SSL test... + +tshark -i ${INTERFACE} -T fields -e data -w ${OUTDIR}/h2o-SSL.pcap 1> /dev/null 2>&1 & PID_4=$! + +java -Dai.h2o.name=$CLUSTER_NAME -ea \ + -cp "build/libs/h2o-algos-test.jar${SEP}build/libs/h2o-algos.jar${SEP}../h2o-core/build/libs/h2o-core.jar${SEP}../h2o-core/build/libs/h2o-core-test.jar${SEP}../h2o-genmodel/build/libs/h2o-genmodel.jar${SEP}../lib/*" \ + water.network.SSLEncryptionTest src/test/resources/ssl.properties + +echo After test cleanup... + +cleanup + +testOutput diff --git a/h2o-bindings/bin/gen_all.py b/h2o-bindings/bin/gen_all.py index 9df68fd39c3..ca31881ba8e 100755 --- a/h2o-bindings/bin/gen_all.py +++ b/h2o-bindings/bin/gen_all.py @@ -31,7 +31,8 @@ base_port=48000, xmx="4g", cp="", - output_dir=results_dir + output_dir=results_dir, + test_ssl=False ) cloud.start() cloud.wait_for_cloud_to_be_up() diff --git a/h2o-core/src/main/java/water/AutoBuffer.java b/h2o-core/src/main/java/water/AutoBuffer.java index fac54a828fc..bc34ebe760b 100644 --- a/h2o-core/src/main/java/water/AutoBuffer.java +++ b/h2o-core/src/main/java/water/AutoBuffer.java @@ -8,6 +8,7 @@ import java.util.ArrayList; import java.util.Random; +import water.network.SocketChannelUtils; import water.util.Log; import water.util.TwoDimTable; @@ -130,7 +131,7 @@ /** Incoming TCP request. Make a read-mode AutoBuffer from the open Channel, * figure the originating H2ONode from the first few bytes read. */ - AutoBuffer( SocketChannel sock ) throws IOException { + AutoBuffer( ByteChannel sock, InetAddress remoteAddress ) throws IOException { _chan = sock; raisePriority(); // Make TCP priority high _bb = BBP_BIG.make(); // Get a big / TPC-sized ByteBuffer @@ -138,7 +139,7 @@ _read = true; // Reading by default _firstPage = true; // Read Inet from socket, port from the stream, figure out H2ONode - _h2o = H2ONode.intern(sock.socket().getInetAddress(), getPort()); + _h2o = H2ONode.intern(remoteAddress, getPort()); _firstPage = true; // Yes, must reset this. _time_start_ms = System.currentTimeMillis(); _persist = Value.TCP; @@ -425,14 +426,14 @@ public final int close() { assert x == 0xab : "AB.close instead of 0xab sentinel got "+x+", "+this; assert _chan != null; // chan set by incoming reader, since we KNOW it is a TCP // Write the reader-handshake-byte. - ((SocketChannel)_chan).socket().getOutputStream().write(0xcd); + SocketChannelUtils.underlyingSocketChannel(_chan).socket().getOutputStream().write(0xcd); // do not close actually reader socket; recycle it in TCPReader thread } else { // Writer? put1(0xab); // Write one-more byte ; might set _chan from null to not-null sendPartial(); // Finish partial writes; might set _chan from null to not-null assert _chan != null; // _chan is set not-null now! // Read the writer-handshake-byte. - int x = ((SocketChannel)_chan).socket().getInputStream().read(); + int x = SocketChannelUtils.underlyingSocketChannel(_chan).socket().getInputStream().read(); // either TCP con was dropped or other side closed connection without reading/confirming (e.g. task was cancelled). if( x == -1 ) throw new IOException("Other side closed connection before handshake byte read"); assert x == 0xcd : "Handshake; writer expected a 0xcd from reader but got "+x; @@ -442,7 +443,7 @@ public final int close() { _chan = null; // No channel now, since i/o error throw ioe; // Rethrow after close } finally { - if( !_read ) _h2o.freeTCPSocket((SocketChannel)_chan); // Recycle writable TCP channel + if( !_read ) _h2o.freeTCPSocket(_chan); // Recycle writable TCP channel restorePriority(); // And if we raised priority, lower it back } @@ -486,7 +487,7 @@ void drainClose() { if( chan != null ) { // Channel assumed sick from prior IOException try { chan.close(); } catch( IOException ignore ) {} // Silently close _chan = null; // No channel now! - if( !_read && chan instanceof SocketChannel) _h2o.freeTCPSocket((SocketChannel)chan); // Recycle writable TCP channel + if( !_read && SocketChannelUtils.isSocketChannel(chan)) _h2o.freeTCPSocket(chan); // Recycle writable TCP channel } restorePriority(); // And if we raised priority, lower it back bbFree(); @@ -496,7 +497,7 @@ void drainClose() { } // True if we opened a TCP channel, or will open one to close-and-send - boolean hasTCP() { assert !isClosed(); return _chan instanceof SocketChannel || (_h2o!=null && _bb.position() >= MTU); } + boolean hasTCP() { assert !isClosed(); return SocketChannelUtils.isSocketChannel(_chan) || (_h2o!=null && _bb.position() >= MTU); } // Size in bytes sent, after a close() int size() { return _size; } @@ -527,7 +528,7 @@ void drainClose() { // over with. private void raisePriority() { if(_oldPrior == -1){ - assert _chan instanceof SocketChannel; + assert SocketChannelUtils.isSocketChannel(_chan); _oldPrior = Thread.currentThread().getPriority(); Thread.currentThread().setPriority(Thread.MAX_PRIORITY-1); } @@ -650,7 +651,7 @@ private ByteBuffer sendPartial() { long ns = System.nanoTime(); while( _bb.hasRemaining() ) { _chan.write(_bb); - if( RANDOM_TCP_DROP != null &&_chan instanceof SocketChannel && RANDOM_TCP_DROP.nextInt(100) == 0 ) + if( RANDOM_TCP_DROP != null && SocketChannelUtils.isSocketChannel(_chan) && RANDOM_TCP_DROP.nextInt(100) == 0 ) throw new IOException("Random TCP Write Fail"); } _time_io_ns += (System.nanoTime()-ns); diff --git a/h2o-core/src/main/java/water/H2O.java b/h2o-core/src/main/java/water/H2O.java index c7152566a2a..986ad39c76f 100644 --- a/h2o-core/src/main/java/water/H2O.java +++ b/h2o-core/src/main/java/water/H2O.java @@ -160,6 +160,9 @@ public static void printHelp() { " -login_conf \n" + " LoginService configuration file\n" + "\n" + + " -internal_security_conf \n" + + " Path (absolute or relative) to a file containing all internal security related configurations\n" + + "\n" + "Cloud formation behavior:\n" + "\n" + " New H2O nodes join together to form a cloud at startup time.\n" + @@ -298,6 +301,9 @@ public static void printHelp() { /** -login_conf is login configuration service file on local filesystem */ public String login_conf = null; + /** -internal_security_conf path (absolute or relative) to a file containing all internal security related configurations */ + public String internal_security_conf = null; + //----------------------------------------------------------------------------------- // Debugging //----------------------------------------------------------------------------------- @@ -521,6 +527,10 @@ else if (s.matches("login_conf")) { i = s.incrementAndCheck(i, args); ARGS.login_conf = args[i]; } + else if (s.matches("internal_security_conf")) { + i = s.incrementAndCheck(i, args); + ARGS.internal_security_conf = args[i]; + } else { parseFailed("Unknown argument (" + s + ")"); } diff --git a/h2o-core/src/main/java/water/H2ONode.java b/h2o-core/src/main/java/water/H2ONode.java index 2b45d6da5e3..460771633e9 100644 --- a/h2o-core/src/main/java/water/H2ONode.java +++ b/h2o-core/src/main/java/water/H2ONode.java @@ -4,6 +4,7 @@ import java.net.*; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.nio.channels.ByteChannel; import java.nio.channels.DatagramChannel; import java.nio.channels.SocketChannel; import java.util.*; @@ -14,6 +15,7 @@ import water.nbhm.NonBlockingHashMap; import water.nbhm.NonBlockingHashMapLong; import water.util.ArrayUtils; +import water.network.SocketChannelFactory; import water.util.Log; import water.util.MathUtils; import water.util.UnsafeUtils; @@ -28,6 +30,9 @@ */ public final class H2ONode extends Iced implements Comparable { + transient private SocketChannelFactory _socketFactory; + transient private H2OSecurityManager _security; + transient short _unique_idx; // Dense integer index, skipping 0. NOT cloud-wide unique. transient boolean _announcedLostContact; // True if heartbeat published a no-contact msg transient public long _last_heard_from; // Time in msec since we last heard from this Node @@ -110,6 +115,9 @@ private H2ONode( H2Okey key, short unique_idx ) { _unique_idx = unique_idx; _last_heard_from = System.currentTimeMillis(); _heartbeat = new HeartBeat(); + + _security = new H2OSecurityManager(); + _socketFactory = new SocketChannelFactory(_security); } // --------------- @@ -226,19 +234,19 @@ public static H2ONode self(InetAddress local) { // A queue of available TCP sockets // re-usable TCP socket opened to this node, or null. // This is essentially a BlockingQueue/Stack that allows null. - private transient SocketChannel _socks[] = new SocketChannel[2]; + private transient ByteChannel _socks[] = new ByteChannel[2]; private transient int _socksAvail=_socks.length; // Count of concurrent TCP requests both incoming and outgoing static final AtomicInteger TCPS = new AtomicInteger(0); - SocketChannel getTCPSocket() throws IOException { + ByteChannel getTCPSocket() throws IOException { // Under lock, claim an existing open socket if possible synchronized(this) { // Limit myself to the number of open sockets from node-to-node while( _socksAvail == 0 ) try { wait(1000); } catch( InterruptedException ignored ) { } // Claim an open socket - SocketChannel sock = _socks[--_socksAvail]; + ByteChannel sock = _socks[--_socksAvail]; if( sock != null ) { if( sock.isOpen() ) return sock; // Return existing socket! // Else it's an already-closed socket, lower open TCP count @@ -257,12 +265,14 @@ SocketChannel getTCPSocket() throws IOException { bb.putChar((char)H2O.H2O_PORT); bb.put((byte)0xef); bb.flip(); - while(bb.hasRemaining()) - sock2.write(bb); + ByteChannel wrappedSocket = _socketFactory.clientChannel(sock2, _key.getHostName(), _key.getPort()); + while(bb.hasRemaining()) { + wrappedSocket.write(bb); + } TCPS.incrementAndGet(); // Cluster-wide counting - return sock2; + return wrappedSocket; } - synchronized void freeTCPSocket( SocketChannel sock ) { + synchronized void freeTCPSocket( ByteChannel sock ) { assert 0 <= _socksAvail && _socksAvail < _socks.length; assert TCPS.get() > 0; if( sock != null && !sock.isOpen() ) sock = null; @@ -285,7 +295,7 @@ synchronized void freeTCPSocket( SocketChannel sock ) { // Buffers the small messages together and sends the bytes over via TCP channel. class UDP_TCP_SendThread extends Thread { - private SocketChannel _chan; // Lazily made on demand; closed & reopened on error + private ByteChannel _chan; // Lazily made on demand; closed & reopened on error private final ByteBuffer _bb; // Reusable output large buffer public UDP_TCP_SendThread(){ @@ -349,7 +359,7 @@ void sendBuffer(){ _bb.flip(); // limit set to old position; position set to 0 while( _bb.hasRemaining() ) { try { - SocketChannel chan = _chan == null ? (_chan=openChan()) : _chan; + ByteChannel chan = _chan == null ? (_chan=openChan()) : _chan; chan.write(_bb); } catch(IOException ioe) { @@ -376,7 +386,7 @@ void sendBuffer(){ } // Open channel on first write attempt - private SocketChannel openChan() throws IOException { + private ByteChannel openChan() throws IOException { // Must make a fresh socket SocketChannel sock = SocketChannel.open(); sock.socket().setReuseAddress(true); @@ -389,9 +399,11 @@ private SocketChannel openChan() throws IOException { sock.socket().setTcpNoDelay(true); ByteBuffer bb = ByteBuffer.allocate(4).order(ByteOrder.nativeOrder()); bb.put((byte) 1).putChar((char) H2O.H2O_PORT).put((byte) 0xef).flip(); - while (bb.hasRemaining()) // Write out magic startup sequence - sock.write(bb); - return sock; + ByteChannel wrappedSocket = _socketFactory.clientChannel(sock, isa.getHostName(), isa.getPort()); + while (bb.hasRemaining()) { // Write out magic startup sequence + wrappedSocket.write(bb); + } + return wrappedSocket; } } @@ -572,4 +584,13 @@ void rebooted() { public final H2ONode read_impl( AutoBuffer ab ) { return intern(H2Okey.read(ab)); } public final AutoBuffer writeJSON_impl(AutoBuffer ab) { return ab.putJSONStr("node",_key.toString()); } public final H2ONode readJSON_impl( AutoBuffer ab ) { throw H2O.fail(); } + + + public SocketChannelFactory getSocketFactory() { + return _socketFactory; + } + + public H2OSecurityManager getSecurityManager() { + return _security; + } } diff --git a/h2o-core/src/main/java/water/H2OSecurityManager.java b/h2o-core/src/main/java/water/H2OSecurityManager.java new file mode 100644 index 00000000000..8c882a6964c --- /dev/null +++ b/h2o-core/src/main/java/water/H2OSecurityManager.java @@ -0,0 +1,69 @@ +package water; + +import water.network.SSLContextException; +import water.network.SSLSocketChannelFactory; +import water.util.Log; + +import java.io.IOException; +import java.nio.channels.ByteChannel; +import java.nio.channels.SocketChannel; + +/** + * Takes care of security. + * + * In the long run this class should manage all security aspects of H2O but currently some parts are handled + * in other parts of the codebase. + * + * An instance of this class should be instantiated for each H2O object + * and should follow its lifecycle. + * + * At this stage we support a simple shared secret, handshake based, authentication, which can be turned + * on with the h2o_ssl_enabled parameter. Should the communicating nodes not share a common shared secret + * communication between them will not be possible. While using this parameter the user should *not* enable + * the useUDP parameter, we do not support UDP encryption at this stage and all UDP datagrams will be + * sent in an unencrypted form! + * + * Current state of data encryption: + * + * - HTTP for FlowUI - currently we rely on Jetty's SSL capabilities, authentication can be performed with + * hash login, ldap login or kerberos. The location of secret keys used byt Jetty's SSL server should be + * passed to the jks parameter. + * + * - inter node communication - all TCP based communication is being authenticated and encrypted using SSL + * using JSSE (Java Secure Socket Extension) when then h2o_ssl_enabled parameter is passed. Keystore related + * parameter should also be used as per the documentation. Secure UDP communication through DTLS is not supported + * at this point in time thus useUDP should not be used for SSL enabled clouds. + * + * - in-memory data encryption - currently not supported, using an encrypted drive is recommended + * at least for the swap partition. + * + * - data saved to disk - currently not supported, using an encrypted drive is recommended + * + */ +public class H2OSecurityManager { + + public boolean securityEnabled = false; + private SSLSocketChannelFactory sslSocketChannelFactory; + + H2OSecurityManager() { + try { + if (null != H2O.ARGS.internal_security_conf) { + this.sslSocketChannelFactory = new SSLSocketChannelFactory(); + this.securityEnabled = true; + } + } catch (SSLContextException e) { + Log.err("Node initialized with SSL enabled but failed to create SSLContext. " + + "Node initialization aborted."); + Log.err(e); + H2O.exit(1); + } + } + + public ByteChannel wrapServerChannel(SocketChannel channel) throws IOException { + return sslSocketChannelFactory.wrapServerChannel(channel); + } + + public ByteChannel wrapClientChannel(SocketChannel channel, String host, int port) throws IOException { + return sslSocketChannelFactory.wrapClientChannel(channel, host, port); + } +} diff --git a/h2o-core/src/main/java/water/TCPReceiverThread.java b/h2o-core/src/main/java/water/TCPReceiverThread.java index 675aae4aa1f..23f22d61237 100644 --- a/h2o-core/src/main/java/water/TCPReceiverThread.java +++ b/h2o-core/src/main/java/water/TCPReceiverThread.java @@ -1,11 +1,14 @@ package water; import java.io.IOException; +import java.net.InetAddress; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.nio.channels.ByteChannel; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; +import water.network.SocketChannelFactory; import water.util.Log; import water.util.SB; @@ -19,8 +22,14 @@ public class TCPReceiverThread extends Thread { private ServerSocketChannel SOCK; + private SocketChannelFactory socketChannelFactory; - public TCPReceiverThread(ServerSocketChannel sock) { super("TCP-Accept"); SOCK = sock; } + public TCPReceiverThread( + ServerSocketChannel sock) { + super("TCP-Accept"); + SOCK = sock; + this.socketChannelFactory = H2O.SELF.getSocketFactory(); + } // The Run Method. // Started by main() on a single thread, this code manages reading TCP requests @@ -50,22 +59,30 @@ public void run() { // Block for TCP connection and setup to read from it. SocketChannel sock = SOCK.accept(); ByteBuffer bb = ByteBuffer.allocate(4).order(ByteOrder.nativeOrder()); + ByteChannel wrappedSocket = socketChannelFactory.serverChannel(sock); bb.limit(bb.capacity()); bb.position(0); - while(bb.hasRemaining()) // read first 8 bytes - sock.read(bb); + while(bb.hasRemaining()) { // read first 8 bytes + wrappedSocket.read(bb); + } bb.flip(); int chanType = bb.get(); // 1 - small , 2 - big int port = bb.getChar(); int sentinel = (0xFF) & bb.get(); - if(sentinel != 0xef) - throw H2O.fail("missing eom sentinel when opening new tcp channel"); + if(sentinel != 0xef) { + if(H2O.SELF.getSecurityManager().securityEnabled) { + throw new IOException("Missing EOM sentinel when opening new SSL tcp channel."); + } else { + throw H2O.fail("missing eom sentinel when opening new tcp channel"); + } + } // todo compare against current cloud, refuse the con if no match - H2ONode h2o = H2ONode.intern(sock.socket().getInetAddress(),port); + InetAddress inetAddress = sock.socket().getInetAddress(); + H2ONode h2o = H2ONode.intern(inetAddress,port); // Pass off the TCP connection to a separate reader thread switch( chanType ) { - case 1: new UDP_TCP_ReaderThread(h2o, sock).start(); break; - case 2: new TCPReaderThread(sock,new AutoBuffer(sock)).start(); break; + case 1: new UDP_TCP_ReaderThread(h2o, wrappedSocket).start(); break; + case 2: new TCPReaderThread(wrappedSocket,new AutoBuffer(wrappedSocket, inetAddress), inetAddress).start(); break; default: throw H2O.fail("unexpected channel type " + chanType + ", only know 1 - Small and 2 - Big"); } } catch( java.nio.channels.AsynchronousCloseException ex ) { @@ -82,12 +99,15 @@ public void run() { // A private thread for reading from this open socket. static class TCPReaderThread extends Thread { - public SocketChannel _sock; + public ByteChannel _sock; public AutoBuffer _ab; - public TCPReaderThread(SocketChannel sock, AutoBuffer ab) { + private final InetAddress address; + + public TCPReaderThread(ByteChannel sock, AutoBuffer ab, InetAddress address) { super("TCP-"+ab._h2o+"-"+(ab._h2o._tcp_readers++)); _sock = sock; _ab = ab; + this.address = address; setPriority(MAX_PRIORITY-1); } @@ -119,7 +139,7 @@ public void run() { // Reuse open sockets for the next task try { if( !_sock.isOpen() ) break; - _ab = new AutoBuffer(_sock); + _ab = new AutoBuffer(_sock, address); } catch( Exception e ) { // Exceptions here are *normal*, this is an idle TCP connection and // either the OS can time it out, or the cloud might shutdown. We @@ -136,11 +156,11 @@ public void run() { * byte array which is than passed on to FJQ. Each message is expected to * be MSG_SZ(2B) MSG BODY(MSG_SZ*B) EOM MARKER (1B - 0xef). */ static class UDP_TCP_ReaderThread extends Thread { - private final SocketChannel _chan; + private final ByteChannel _chan; private final ByteBuffer _bb; private final H2ONode _h2o; - public UDP_TCP_ReaderThread(H2ONode h2o, SocketChannel chan) { + public UDP_TCP_ReaderThread(H2ONode h2o, ByteChannel chan) { super("UDP-TCP-READ-" + h2o); _h2o = h2o; _chan = chan; diff --git a/h2o-core/src/main/java/water/network/SSLContextException.java b/h2o-core/src/main/java/water/network/SSLContextException.java new file mode 100644 index 00000000000..e8490a85b71 --- /dev/null +++ b/h2o-core/src/main/java/water/network/SSLContextException.java @@ -0,0 +1,7 @@ +package water.network; + +public class SSLContextException extends Throwable { + SSLContextException(String msg, Throwable e) { + super(msg, e); + } +} diff --git a/h2o-core/src/main/java/water/network/SSLProperties.java b/h2o-core/src/main/java/water/network/SSLProperties.java new file mode 100644 index 00000000000..cf391ef8171 --- /dev/null +++ b/h2o-core/src/main/java/water/network/SSLProperties.java @@ -0,0 +1,22 @@ +package water.network; + +import java.util.Properties; + +class SSLProperties extends Properties { + + String[] h2o_ssl_enabled_algorithms() { + String algs = getProperty("h2o_ssl_enabled_algorithms"); + if(null != algs) { + return algs.split(","); + } + return null; + } + + String h2o_ssl_protocol() { return getProperty("h2o_ssl_protocol", "TLSv1.2"); } + + String h2o_ssl_jks_internal() { return getProperty("h2o_ssl_jks_internal"); } + String h2o_ssl_jks_password() { return getProperty("h2o_ssl_jks_password"); } + String h2o_ssl_jts() { return getProperty("h2o_ssl_jts") != null ? getProperty("h2o_ssl_jts") : getProperty("h2o_ssl_jks_internal"); } + String h2o_ssl_jts_password() { return getProperty("h2o_ssl_jts_password") != null ? getProperty("h2o_ssl_jts_password") : getProperty("h2o_ssl_jks_password"); } + +} diff --git a/h2o-core/src/main/java/water/network/SSLSocketChannel.java b/h2o-core/src/main/java/water/network/SSLSocketChannel.java new file mode 100644 index 00000000000..0323a16edb3 --- /dev/null +++ b/h2o-core/src/main/java/water/network/SSLSocketChannel.java @@ -0,0 +1,352 @@ +package water.network; + +import water.H2O; +import water.util.Log; + +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLEngineResult; +import javax.net.ssl.SSLException; +import javax.net.ssl.SSLSession; +import java.io.IOException; +import java.net.SocketException; +import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; +import java.nio.channels.SocketChannel; + +/** + * This class is based on: + * Oracle's JSSE guide. + * Oracle's SSLEngine demo. + * + * It's a simple wrapper around SocketChannels which enables SSL/TLS + * communication using {@link javax.net.ssl.SSLEngine}. + */ +class SSLSocketChannel implements ByteChannel { + + // Empty buffer for handshakes + private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0); + + // Buffer holding encrypted outgoing data + private ByteBuffer netInBuffer; + // Buffer holding encrypted incoming data + private ByteBuffer netOutBuffer; + + // Buffer holding decrypted incoming data + private ByteBuffer peerAppData; + + private SocketChannel channel = null; + private SSLEngine sslEngine = null; + + private boolean closing = false; + private boolean closed = false; + + private boolean handshakeComplete = false; + + SSLSocketChannel(SocketChannel channel, SSLEngine sslEngine) throws IOException { + this.channel = channel; + this.sslEngine = sslEngine; + + sslEngine.setEnableSessionCreation(true); + SSLSession session = sslEngine.getSession(); + prepareBuffers(session); + + handshake(); + } + + @Override + public boolean isOpen() { + return channel.isOpen(); + } + + @Override + public void close() throws IOException { + closing = true; + sslEngine.closeOutbound(); + sslEngine.getSession().invalidate(); + netOutBuffer.clear(); + channel.close(); + closed = true; + } + + private void prepareBuffers(SSLSession session) throws SocketException { + int appBufferSize = session.getApplicationBufferSize(); + // Less is not more. More is more. Bigger than the app buffer size so successful unwraps() don't cause BUFFER_OVERFLOW + // Value 64 was based on other frameworks using it and some manual testing. Might require tuning in the future. + peerAppData = ByteBuffer.allocate(appBufferSize + 64); + + int netBufferSize = session.getPacketBufferSize(); + netInBuffer = ByteBuffer.allocate(netBufferSize); + netOutBuffer = ByteBuffer.allocate(netBufferSize); + + } + + // ----------------------------------------------------------- + // HANDSHAKE + // ----------------------------------------------------------- + + private SSLEngineResult.HandshakeStatus hs; + + private void handshake() throws IOException { + Log.debug("Starting SSL handshake..."); + sslEngine.beginHandshake(); + + hs = sslEngine.getHandshakeStatus(); + SSLEngineResult initHandshakeStatus; + + while (!handshakeComplete) { + switch (hs) { + case NOT_HANDSHAKING: { + //should never happen + throw new IOException("NOT_HANDSHAKING during handshake"); + } + case FINISHED: + handshakeComplete = !netOutBuffer.hasRemaining(); + break; + case NEED_WRAP: { + initHandshakeStatus = handshakeWrap(); + if ( initHandshakeStatus.getStatus() == SSLEngineResult.Status.OK ){ + if (hs == SSLEngineResult.HandshakeStatus.NEED_TASK) { + tasks(); + } + } + break; + } + case NEED_UNWRAP: { + initHandshakeStatus = handshakeUnwrap(); + if ( initHandshakeStatus.getStatus() == SSLEngineResult.Status.OK ){ + if (hs == SSLEngineResult.HandshakeStatus.NEED_TASK) { + tasks(); + } + } + break; + } + // SSL needs to perform some delegating tasks before it can continue. + // Those tasks will be run in the same thread and can be blocking. + case NEED_TASK: + tasks(); + break; + } + } + Log.debug("SSL handshake finished successfully!"); + } + + private synchronized SSLEngineResult handshakeWrap() throws IOException { + netOutBuffer.clear(); + SSLEngineResult wrapResult = sslEngine.wrap(EMPTY_BUFFER, netOutBuffer); + netOutBuffer.flip(); + hs = wrapResult.getHandshakeStatus(); + channel.write(netOutBuffer); + return wrapResult; + } + + private synchronized SSLEngineResult handshakeUnwrap() throws IOException { + if (netInBuffer.position() == netInBuffer.limit()) { + netInBuffer.clear(); + } + + channel.read(netInBuffer); + + SSLEngineResult unwrapResult; + peerAppData.clear(); + + do { + netInBuffer.flip(); + + unwrapResult = sslEngine.unwrap(netInBuffer, peerAppData); + + netInBuffer.compact(); + + hs = unwrapResult.getHandshakeStatus(); + + switch (unwrapResult.getStatus()) { + case OK: + case BUFFER_UNDERFLOW: { + if (unwrapResult.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK) { + tasks(); + } + break; + } + case BUFFER_OVERFLOW: { + int applicationBufferSize = sslEngine.getSession().getApplicationBufferSize(); + if (applicationBufferSize > peerAppData.capacity()) { + ByteBuffer b = ByteBuffer.allocate(applicationBufferSize + peerAppData.position()); + peerAppData.flip(); + b.put(peerAppData); + peerAppData = b; + } else { + peerAppData.compact(); + } + break; + } + default: + throw new IOException("Failed to SSL unwrap with status " + unwrapResult.getStatus()); + } + } while(unwrapResult.getStatus() == SSLEngineResult.Status.OK && + hs == SSLEngineResult.HandshakeStatus.NEED_UNWRAP); + + return unwrapResult; + } + + // ----------------------------------------------------------- + // READ AND WRITE + // ----------------------------------------------------------- + + @Override + public int read(ByteBuffer dst) throws IOException { + if (closing || closed) return -1; + + return unwrap(dst); + } + + private synchronized int unwrap(ByteBuffer dst) throws IOException { + int read = 0; + // We have outstanding data in our incoming decrypted buffer, use that data first to fill dst + if(!dst.hasRemaining()) { + return 0; + } + + if(peerAppData.position() != 0) { + read += copy(peerAppData, dst); + return read; + } + + if(netInBuffer.position() == 0) { + channel.read(netInBuffer); + } + + while(netInBuffer.position() != 0) { + netInBuffer.flip(); + + // We still might have left data here if dst was smaller than the amount of data in peerAppData + if(peerAppData.position() != 0) { + peerAppData.compact(); + } + + SSLEngineResult unwrapResult = sslEngine.unwrap(netInBuffer, peerAppData); + + switch (unwrapResult.getStatus()) { + case OK: { + unwrapResult.bytesProduced(); + if (unwrapResult.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK) tasks(); + break; + } + case BUFFER_OVERFLOW: { + int applicationBufferSize = sslEngine.getSession().getApplicationBufferSize(); + if (applicationBufferSize > peerAppData.capacity()) { + int appSize = applicationBufferSize; + ByteBuffer b = ByteBuffer.allocate(appSize + peerAppData.position()); + peerAppData.flip(); + b.put(peerAppData); + peerAppData = b; + } else { + // We tried to unwrap data into peerAppData which means there's leftover in netInBuffer + // the upcoming read should read int potential new data after the leftover + netInBuffer.position(netInBuffer.limit()); + netInBuffer.limit(netInBuffer.capacity()); + peerAppData.compact(); + if(!dst.hasRemaining()) { + return read; + } + } + break; + } + case BUFFER_UNDERFLOW: { + int packetBufferSize = sslEngine.getSession().getPacketBufferSize(); + if (packetBufferSize > netInBuffer.capacity()) { + int netSize = packetBufferSize; + if (netSize > netInBuffer.capacity()) { + ByteBuffer b = ByteBuffer.allocate(netSize); + netInBuffer.flip(); + b.put(netInBuffer); + netInBuffer = b; + } + } else { + // We have some leftover data from unwrap but no enough. + // We need to read in more data from the socket AFTER the current data. + netInBuffer.position(netInBuffer.limit()); + netInBuffer.limit(netInBuffer.capacity()); + channel.read(netInBuffer); + continue; + } + break; + } + default: + throw new IOException("Failed to SSL unwrap with status " + unwrapResult.getStatus()); + } + + if (peerAppData != dst && dst.hasRemaining()) { + peerAppData.flip(); + read += copy(peerAppData, dst); + if(!dst.hasRemaining()) { + netInBuffer.compact(); + return read; + } + } + + netInBuffer.compact(); + } + return read; + } + + private int copy(ByteBuffer src, ByteBuffer dst) { + int toCopy = Math.min(src.remaining(), dst.remaining()); + + dst.put(src.array(), src.position(), toCopy); + src.position(src.position() + toCopy); + + if(!src.hasRemaining()) { + src.clear(); + } + return toCopy; + } + + @Override + public int write(ByteBuffer src) throws IOException { + if(closing || closed) { + throw new IOException("Cannot perform socket write, the socket is closed (or being closed)."); + } + + int wrote = 0; + // src can be much bigger than what our SSL session allows to send in one go + while (src.hasRemaining()) { + netOutBuffer.clear(); + + SSLEngineResult wrapResult = sslEngine.wrap(src, netOutBuffer); + netOutBuffer.flip(); + + if (wrapResult.getStatus() == SSLEngineResult.Status.OK) { + if (wrapResult.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK) tasks(); + } + + while (netOutBuffer.hasRemaining()) { + wrote += channel.write(netOutBuffer); + } + } + + return wrote; + } + + // ----------------------------------------------------------- + // MISC + // ----------------------------------------------------------- + + private void tasks() { + Runnable r; + while ( (r = sslEngine.getDelegatedTask()) != null) { + r.run(); + } + hs = sslEngine.getHandshakeStatus(); + } + + public SocketChannel channel() { + return channel; + } + + SSLEngine getEngine() { + return sslEngine; + } + + boolean isHandshakeComplete() { + return handshakeComplete; + } +} diff --git a/h2o-core/src/main/java/water/network/SSLSocketChannelFactory.java b/h2o-core/src/main/java/water/network/SSLSocketChannelFactory.java new file mode 100644 index 00000000000..da9543a272e --- /dev/null +++ b/h2o-core/src/main/java/water/network/SSLSocketChannelFactory.java @@ -0,0 +1,114 @@ +package water.network; + +import water.H2O; +import water.util.Log; + +import javax.net.ssl.*; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.channels.ByteChannel; +import java.nio.channels.SocketChannel; +import java.security.*; +import java.security.cert.CertificateException; + +public class SSLSocketChannelFactory { + + private SSLContext sslContext = null; + private SSLProperties properties = null; + + public SSLSocketChannelFactory() throws SSLContextException { + try { + SSLProperties props = new SSLProperties(); + props.load(new FileInputStream(H2O.ARGS.internal_security_conf)); + init(props); + } catch (IOException e) { + Log.err("Failed to initialized SSL context.", e); + throw new SSLContextException("Failed to initialized SSL context.", e); + } + } + + public SSLSocketChannelFactory(SSLProperties props) throws SSLContextException { + init(props); + } + + private void init(SSLProperties props) throws SSLContextException { + properties = props; + try { + if (requiredParamsPresent()) { + this.sslContext = SSLContext.getInstance(properties.h2o_ssl_protocol()); + this.sslContext.init(keyManager(), trustManager(), null); + } else { + this.sslContext = SSLContext.getDefault(); + } + } catch (NoSuchAlgorithmException e) { + Log.err("Failed to initialized SSL context.", e); + throw new SSLContextException("Failed to initialized SSL context.", e); + } catch (IOException e) { + Log.err("Failed to initialized SSL context.", e); + throw new SSLContextException("Failed to initialized SSL context.", e); + } catch (UnrecoverableKeyException e) { + Log.err("Failed to initialized SSL context.", e); + throw new SSLContextException("Failed to initialized SSL context.", e); + } catch (KeyStoreException e) { + Log.err("Failed to initialized SSL context.", e); + throw new SSLContextException("Failed to initialized SSL context.", e); + } catch (KeyManagementException e) { + Log.err("Failed to initialized SSL context.", e); + throw new SSLContextException("Failed to initialized SSL context.", e); + } catch (CertificateException e) { + Log.err("Failed to initialized SSL context.", e); + throw new SSLContextException("Failed to initialized SSL context.", e); + } + } + + + private boolean requiredParamsPresent() { + return null != properties.h2o_ssl_jks_internal() && + null != properties.h2o_ssl_jks_password(); + } + + private TrustManager[] trustManager() throws + KeyStoreException, IOException, CertificateException, NoSuchAlgorithmException { + KeyStore ksTrust = KeyStore.getInstance("JKS"); + + ksTrust.load( + new FileInputStream(properties.h2o_ssl_jts()), + properties.h2o_ssl_jts_password().toCharArray() + ); + TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + tmf.init(ksTrust); + return tmf.getTrustManagers(); + } + + private KeyManager[] keyManager() throws KeyStoreException, IOException, CertificateException, NoSuchAlgorithmException, UnrecoverableKeyException { + KeyStore ksKeys = KeyStore.getInstance("JKS"); + + ksKeys.load(new FileInputStream(properties.h2o_ssl_jks_internal()), + properties.h2o_ssl_jks_password().toCharArray() + ); + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + kmf.init(ksKeys, properties.h2o_ssl_jks_password().toCharArray()); + return kmf.getKeyManagers(); + } + + public ByteChannel wrapClientChannel( + SocketChannel channel, + String host, + int port) throws IOException { + SSLEngine sslEngine = sslContext.createSSLEngine(host, port); + sslEngine.setUseClientMode(false); + if (null != properties.h2o_ssl_enabled_algorithms()) { + sslEngine.setEnabledCipherSuites(properties.h2o_ssl_enabled_algorithms()); + } + return new SSLSocketChannel(channel, sslEngine); + } + + public ByteChannel wrapServerChannel(SocketChannel channel) throws IOException { + SSLEngine sslEngine = sslContext.createSSLEngine(); + sslEngine.setUseClientMode(true); + if (null != properties.h2o_ssl_enabled_algorithms()) { + sslEngine.setEnabledCipherSuites(properties.h2o_ssl_enabled_algorithms()); + } + return new SSLSocketChannel(channel, sslEngine); + } +} diff --git a/h2o-core/src/main/java/water/network/SecurityUtils.java b/h2o-core/src/main/java/water/network/SecurityUtils.java new file mode 100644 index 00000000000..767fd7407e0 --- /dev/null +++ b/h2o-core/src/main/java/water/network/SecurityUtils.java @@ -0,0 +1,124 @@ +package water.network; + +import water.util.Log; + +import java.io.*; +import java.security.*; +import java.util.Properties; + +public class SecurityUtils { + + private static SecureRandom RANDOM = new SecureRandom(); + private static final String AB = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; + + private final static String[] keyToolCandidates = new String[]{ + "sun.security.tools.KeyTool", // Java6 + "sun.security.tools.keytool.Main", // Java7+ + "com.ibm.crypto.tools.KeyTool" // IBM Java + }; + + private static StoreCredentials generateKeystore(String password) throws Exception { + return generateKeystore(password, "h2o-internal.jks", ""); + } + + private static StoreCredentials generateKeystore(String password, String name, String location) throws Exception { + String path = null != location && !location.isEmpty() ? location + File.pathSeparator + name : name; + if(new File(path).exists()) { + throw new IllegalStateException("A file under the location " + path + " already exists. Please delete it first."); + } + + String[] genKeyArgs = new String[]{ + "-genkeypair", + "-alias", "h2o-internal", + "-keyalg", "RSA", + "-sigalg", "SHA256withRSA", + "-dname", "CN=Java", + "-storetype", "JKS", + "-keypass", password, + "-keystore", path, + "-storepass", password, + "-validity", "3650" + }; + + Class keytool = getKeyToolClass(); + + keytool.getMethod("main", String[].class).invoke(null, (Object) genKeyArgs); + + return new StoreCredentials(name, location, password); + } + + private static Class getKeyToolClass() { + for (String keyToolCandidate : keyToolCandidates) { + try { + return Class.forName(keyToolCandidate); + } catch (Exception e) { + // Ignore, try other candidates + } + } + + // Unsuported JRE/JDK + String errorMsg = "This version of Java is not supported. Please use Oracle/OpenJDK/IBM JDK version 6/7/8."; + Log.err(errorMsg); + throw new IllegalStateException(errorMsg); + } + + public static SSLCredentials generateSSLPair(String passwd, String name, String location) throws Exception { + StoreCredentials jks = generateKeystore(passwd, name, location); + return new SSLCredentials(jks, jks); + } + + public static SSLCredentials generateSSLPair() throws Exception { + StoreCredentials jks = generateKeystore(passwordGenerator(16)); + return new SSLCredentials(jks, jks); + } + + private static String passwordGenerator(int len) { + StringBuilder sb = new StringBuilder(len); + for (int i = 0; i < len; i++) { + sb.append(AB.charAt(RANDOM.nextInt(AB.length()))); + } + return sb.toString(); + } + + public static String generateSSLConfig(SSLCredentials credentials) throws IOException { + return generateSSLConfig(credentials, "ssl.properties"); + } + + public static String generateSSLConfig(SSLCredentials credentials, String file) throws IOException { + Properties sslConfig = new Properties(); + sslConfig.put("h2o_ssl_protocol", "TLSv1.2"); + sslConfig.put("h2o_ssl_jks_internal", credentials.jks.getLocation()); + sslConfig.put("h2o_ssl_jks_password", credentials.jks.pass); + sslConfig.put("h2o_ssl_jts", credentials.jts.getLocation()); + sslConfig.put("h2o_ssl_jts_password", credentials.jts.pass); + FileOutputStream output = new FileOutputStream(file); + sslConfig.store(output, ""); + return file; + } + + public static class StoreCredentials { + public String name = null; + public String path = null; + public String pass = null; + + StoreCredentials(String name, String path, String pass) { + this.name = name; + this.path = path; + this.pass = pass; + } + + public String getLocation() { + return null != path && !path.isEmpty() ? path + File.pathSeparator + name : name; + } + } + + public static class SSLCredentials { + public StoreCredentials jks; + public StoreCredentials jts; + + SSLCredentials(StoreCredentials jks, StoreCredentials jts) { + this.jks = jks; + this.jts = jts; + } + } +} diff --git a/h2o-core/src/main/java/water/network/SocketChannelFactory.java b/h2o-core/src/main/java/water/network/SocketChannelFactory.java new file mode 100644 index 00000000000..58bbf02f2a7 --- /dev/null +++ b/h2o-core/src/main/java/water/network/SocketChannelFactory.java @@ -0,0 +1,38 @@ +package water.network; + +import water.H2OSecurityManager; + +import java.io.IOException; +import java.nio.channels.ByteChannel; +import java.nio.channels.SocketChannel; + +/** + * Creates either a raw or an SSL/TLS wrapped socket depending on + * the node's configuration. All sockets used in the application should be + * created using this class. + */ +public class SocketChannelFactory { + + private H2OSecurityManager sm; + + public SocketChannelFactory(H2OSecurityManager sm) { + this.sm = sm; + } + + public ByteChannel serverChannel(ByteChannel channel) throws IOException { + if(sm.securityEnabled && !(channel instanceof SSLSocketChannel)) { + return sm.wrapServerChannel((SocketChannel)channel); + } else { + return channel; + } + } + + public ByteChannel clientChannel(ByteChannel channel, String host, int port) throws IOException { + if(sm.securityEnabled && !(channel instanceof SSLSocketChannel)) { + return sm.wrapClientChannel((SocketChannel)channel, host, port); + } else { + return channel; + } + } + +} diff --git a/h2o-core/src/main/java/water/network/SocketChannelUtils.java b/h2o-core/src/main/java/water/network/SocketChannelUtils.java new file mode 100644 index 00000000000..efbaff64908 --- /dev/null +++ b/h2o-core/src/main/java/water/network/SocketChannelUtils.java @@ -0,0 +1,23 @@ +package water.network; + +import java.nio.channels.ByteChannel; +import java.nio.channels.SocketChannel; + +public class SocketChannelUtils { + + public static boolean isSocketChannel(ByteChannel channel) { + return channel instanceof SocketChannel || channel instanceof SSLSocketChannel; + } + + public static SocketChannel underlyingSocketChannel(ByteChannel channel) { + if(channel instanceof SSLSocketChannel) { + return ((SSLSocketChannel) channel).channel(); + } else if(channel instanceof SocketChannel) { + return (SocketChannel) channel; + } + throw new UnsupportedOperationException( + "Channel is not a socket channel. Cannot retrieve the underlying channel." + ); + } + +} diff --git a/h2o-core/src/test/java/water/TestUtil.java b/h2o-core/src/test/java/water/TestUtil.java index 100726a5e8a..752c5c0d662 100644 --- a/h2o-core/src/test/java/water/TestUtil.java +++ b/h2o-core/src/test/java/water/TestUtil.java @@ -161,7 +161,7 @@ public void evaluate() throws Throwable {} /** Hunt for test files in likely places. Null if cannot find. * @param fname Test filename * @return Found file or null */ - protected static File find_test_file_static(String fname) { + public static File find_test_file_static(String fname) { // When run from eclipse, the working directory is different. // Try pointing at another likely place File file = new File(fname); diff --git a/h2o-core/src/test/java/water/network/SSLSocketChannelFactoryTest.java b/h2o-core/src/test/java/water/network/SSLSocketChannelFactoryTest.java new file mode 100644 index 00000000000..5b16b0018cf --- /dev/null +++ b/h2o-core/src/test/java/water/network/SSLSocketChannelFactoryTest.java @@ -0,0 +1,220 @@ +package water.network; + +import org.junit.Test; +import water.TestUtil; +import water.util.Log; + +import javax.net.ssl.SSLException; +import java.io.IOException; +import java.net.BindException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; + +import static org.junit.Assert.*; + +public class SSLSocketChannelFactoryTest { + + private int port = 9999; + + @Test + public void shouldHandshake() throws IOException, SSLContextException, BrokenBarrierException, InterruptedException { + SSLProperties props = new SSLProperties(); + props.put("h2o_ssl_protocol", "TLSv1.2"); + props.put("h2o_ssl_jks_internal", TestUtil.find_test_file_static("src/test/resources/keystore.jks").getPath()); + props.put("h2o_ssl_jks_password", "password"); + props.put("h2o_ssl_jts", TestUtil.find_test_file_static("src/test/resources/cacerts.jks").getPath()); + props.put("h2o_ssl_jts_password", "password"); + + final SSLSocketChannelFactory factory = new SSLSocketChannelFactory(props); + + final CyclicBarrier barrier = new CyclicBarrier(2); + final CyclicBarrier testOne = new CyclicBarrier(2); + final CyclicBarrier testTwo = new CyclicBarrier(2); + final CyclicBarrier testThree = new CyclicBarrier(2); + + final boolean[] hs = new boolean[]{true}; + + Thread client = new ClientThread(factory, testOne, testTwo, testThree, barrier); + client.setDaemon(false); + client.start(); + + try { + ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); + serverSocketChannel.socket().setReceiveBufferSize(64 * 1024); + while(true) { + try { + serverSocketChannel.socket().bind(new InetSocketAddress(port)); + break; + } catch (BindException e) { + port++; + } + } + barrier.await(); + SocketChannel sock = serverSocketChannel.accept(); + barrier.reset(); + + SSLSocketChannel wrappedChannel = (SSLSocketChannel) factory.wrapServerChannel(sock); + + assertTrue(wrappedChannel.isHandshakeComplete()); + + // FIRST TEST: SSL -> SSL SMALL COMMUNICATION + ByteBuffer readBuffer = ByteBuffer.allocate(12); + + while (readBuffer.hasRemaining()) { + wrappedChannel.read(readBuffer); + } + + readBuffer.flip(); + + byte[] dst = new byte[12]; + readBuffer.get(dst, 0, 12); + readBuffer.clear(); + + assertEquals("hello, world", new String(dst, StandardCharsets.UTF_8)); + testOne.await(); + + // SECOND TEST: SSL -> SSL BIG COMMUNICATION + int read = 0; + byte[] dstBig = new byte[16]; + ByteBuffer readBufferBig = ByteBuffer.allocate(1024); + while (read < 5 * 64 * 1024) { + while (readBufferBig.position() < 16) { + wrappedChannel.read(readBufferBig); + } + + readBufferBig.flip(); + readBufferBig.get(dstBig, 0, 16); + if (!readBufferBig.hasRemaining()) { + readBufferBig.clear(); + } else { + readBufferBig.compact(); + } + assertEquals("hello, world" + (read % 9) + "!!!", new String(dstBig, StandardCharsets.UTF_8)); + read += 16; + } + + testTwo.await(); + + // THIRD TEST: NON-SSL -> SSL COMMUNICATION + try { + while (readBuffer.hasRemaining()) { + wrappedChannel.read(readBuffer); + } + fail(); + } catch (SSLException e) { + // PASSED + } + + assertTrue(wrappedChannel.getEngine().isInboundDone()); + + testThree.await(); + + // FOURTH TEST: SSL -> NON-SSL COMMUNICATION + readBuffer.clear(); + while (readBuffer.hasRemaining()) { + sock.read(readBuffer); + } + + readBuffer.flip(); + readBuffer.get(dst, 0, 12); + readBuffer.clear(); + + assertNotEquals("hello, world", new String(dst, StandardCharsets.UTF_8)); + } catch (IOException | InterruptedException | BrokenBarrierException e) { + e.printStackTrace(); + } + + barrier.await(); + assertTrue("One of the handshakes failed!", hs[0]); + } + + + private class ClientThread extends Thread { + private final SSLSocketChannelFactory factory; + private final CyclicBarrier testOne; + private final CyclicBarrier testTwo; + private final CyclicBarrier testThree; + private final CyclicBarrier barrier; + + public ClientThread(SSLSocketChannelFactory factory, + CyclicBarrier testOne, + CyclicBarrier testTwo, + CyclicBarrier testThree, + CyclicBarrier barrier) { + this.factory = factory; + this.testOne = testOne; + this.testTwo = testTwo; + this.testThree = testThree; + this.barrier = barrier; + } + + @Override + public void run() { + try { + barrier.await(); + SocketChannel sock = SocketChannel.open(); + sock.socket().setReuseAddress(true); + sock.socket().setSendBufferSize(64 * 1024); + InetSocketAddress isa = new InetSocketAddress("127.0.0.1", port); + sock.connect(isa); + sock.configureBlocking(true); + sock.socket().setTcpNoDelay(true); + + SSLSocketChannel wrappedChannel = (SSLSocketChannel) factory.wrapClientChannel(sock, "127.0.0.1", port); + + // FIRST TEST: SSL -> SSL SMALL COMMUNICATION + ByteBuffer write = ByteBuffer.allocate(1024); + write.put("hello, world".getBytes(StandardCharsets.UTF_8)); + write.flip(); + wrappedChannel.write(write); + + testOne.await(); + + // SECOND TEST: SSL -> SSL BIG COMMUNICATION + ByteBuffer toWriteBig = ByteBuffer.allocate(64 * 1024); + for (int i = 0; i < 5; i++) { + toWriteBig.clear(); + while (toWriteBig.hasRemaining()) { + toWriteBig.put( + ("hello, world" + ((i * 64 * 1024 + toWriteBig.position()) % 9) + "!!!") + .getBytes(StandardCharsets.UTF_8) + ); + } + toWriteBig.flip(); + wrappedChannel.write(toWriteBig); + } + + testTwo.await(); + + // THIRD TEST: NON-SSL -> SSL COMMUNICATION + write.clear(); + write.put("hello, world".getBytes(StandardCharsets.UTF_8)); + write.flip(); + sock.write(write); + + testThree.await(); + + // FOURTH TEST: SSL -> NON-SSL COMMUNICATION + write.clear(); + write.put("hello, world".getBytes(StandardCharsets.UTF_8)); + write.flip(); + wrappedChannel.write(write); + + } catch (IOException | InterruptedException | BrokenBarrierException e) { + e.printStackTrace(); + } finally { + try { + barrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + e.printStackTrace(); + } + } + } + } + +} \ No newline at end of file diff --git a/h2o-core/src/test/java/water/network/SecurityUtilsTest.java b/h2o-core/src/test/java/water/network/SecurityUtilsTest.java new file mode 100644 index 00000000000..60112251cdd --- /dev/null +++ b/h2o-core/src/test/java/water/network/SecurityUtilsTest.java @@ -0,0 +1,41 @@ +package water.network; + +import org.junit.Test; + +import java.io.File; +import java.io.FileInputStream; +import java.util.Properties; + +import static org.junit.Assert.*; + +public class SecurityUtilsTest { + + @Test + public void shouldGenerateKeystoreAndConfig() throws Exception { + try { + SecurityUtils.SSLCredentials testKeystore = SecurityUtils.generateSSLPair("test123", "h2o-keystore-test.jks", ""); + String configPath = SecurityUtils.generateSSLConfig(testKeystore, "test-ssl.properties"); + + assertTrue(new File(testKeystore.jks.getLocation()).exists()); + + Properties sslConfig = new Properties(); + sslConfig.load(new FileInputStream(configPath)); + assertEquals("TLSv1.2", sslConfig.getProperty("h2o_ssl_protocol")); + assertEquals("h2o-keystore-test.jks", sslConfig.getProperty("h2o_ssl_jks_internal")); + assertEquals("test123", sslConfig.getProperty("h2o_ssl_jks_password")); + assertEquals("h2o-keystore-test.jks", sslConfig.getProperty("h2o_ssl_jts")); + assertEquals("test123", sslConfig.getProperty("h2o_ssl_jts_password")); + } finally { + File keystore = new File("h2o-keystore-test.jks"); + if(keystore.exists()) { + keystore.deleteOnExit(); + } + + File props = new File("test-ssl.properties"); + if(props.exists()) { + props.deleteOnExit(); + } + } + } + +} \ No newline at end of file diff --git a/h2o-core/src/test/resources/cacerts.jks b/h2o-core/src/test/resources/cacerts.jks new file mode 100644 index 00000000000..cfbbf485f32 Binary files /dev/null and b/h2o-core/src/test/resources/cacerts.jks differ diff --git a/h2o-core/src/test/resources/keystore.jks b/h2o-core/src/test/resources/keystore.jks new file mode 100644 index 00000000000..a0b75188d13 Binary files /dev/null and b/h2o-core/src/test/resources/keystore.jks differ diff --git a/h2o-core/testMultiNode.sh b/h2o-core/testMultiNode.sh index 7b4f1516de0..f6ec77662fb 100755 --- a/h2o-core/testMultiNode.sh +++ b/h2o-core/testMultiNode.sh @@ -1,4 +1,5 @@ #!/bin/bash +source ../multiNodeUtils.sh # Argument parsing if [ "$1" = "jacoco" ] @@ -115,10 +116,7 @@ set +f # Launch 4 helper JVMs. All output redir'd at the OS level to sandbox files. CLUSTER_NAME=junit_cluster_$$ CLUSTER_BASEPORT=43000 -$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT --ga_opt_out 1> $OUTDIR/out.1 2>&1 & PID_1=$! -$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT --ga_opt_out 1> $OUTDIR/out.2 2>&1 & PID_2=$! -$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT --ga_opt_out 1> $OUTDIR/out.3 2>&1 & PID_3=$! -$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT --ga_opt_out 1> $OUTDIR/out.4 2>&1 & PID_4=$! +runCluster # If coverage is being run, then pass a system variable flag so that timeout limits are increased. if [ $JACOCO_ENABLED = true ] @@ -130,7 +128,7 @@ fi # Launch last driver JVM. All output redir'd at the OS level to sandbox files. echo Running h2o-core junit tests... -($JVM -Dbuild.id=$BUILD_ID -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME -Dai.h2o.baseport=$CLUSTER_BASEPORT -Dai.h2o.ga_opt_out=yes $JACOCO_FLAG $JUNIT_RUNNER $JUNIT_TESTS_BOOT `cat $OUTDIR/tests.txt` 2>&1 ; echo $? > $OUTDIR/status.0) 1> $OUTDIR/out.0 2>&1 +($JVM $TEST_SSL -Dbuild.id=$BUILD_ID -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME -Dai.h2o.baseport=$CLUSTER_BASEPORT -Dai.h2o.ga_opt_out=yes $JACOCO_FLAG $JUNIT_RUNNER $JUNIT_TESTS_BOOT `cat $OUTDIR/tests.txt` 2>&1 ; echo $? > $OUTDIR/status.0) 1> $OUTDIR/out.0 2>&1 grep EXECUTION $OUTDIR/out.0 | sed -e "s/.*TEST \(.*\) EXECUTION TIME: \(.*\) (Wall.*/\2 \1/" | sort -gr | head -n 10 >> $OUTDIR/out.0 diff --git a/h2o-hadoop/h2o-mapreduce-generic/src/main/java/water/hadoop/h2odriver.java b/h2o-hadoop/h2o-mapreduce-generic/src/main/java/water/hadoop/h2odriver.java index fa8006972b8..2247b542e1c 100644 --- a/h2o-hadoop/h2o-mapreduce-generic/src/main/java/water/hadoop/h2odriver.java +++ b/h2o-hadoop/h2o-mapreduce-generic/src/main/java/water/hadoop/h2odriver.java @@ -10,6 +10,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import water.network.SecurityUtils; import java.io.*; import java.net.*; @@ -83,6 +84,8 @@ static ArrayList extraJvmArguments = new ArrayList(); static String jksFileName = null; static String jksPass = null; + static String securityConf = null; + static boolean internal_secure_connections = false; static boolean hashLogin = false; static boolean ldapLogin = false; static boolean kerberosLogin = false; @@ -783,6 +786,13 @@ else if (s.equals("-jks_pass")) { i++; if (i >= args.length) { usage(); } jksPass = args[i]; } + else if (s.equals("-internal_secure_connections")) { + internal_secure_connections = true; + } + else if (s.equals("-internal_security")) { + i++; if (i >= args.length) { usage(); } + securityConf = args[i]; + } else if (s.equals("-hash_login")) { hashLogin = true; } @@ -1072,7 +1082,7 @@ private int run2(String[] args) throws Exception { // Parse arguments. // ---------------- - parseArgs (args); + parseArgs(args); validateArgs(); // Set up callback address and port. @@ -1117,7 +1127,7 @@ private int run2(String[] args) throws Exception { // YARN container must be sized greater than Xmx. // YARN will kill the application if the RSS of the process is larger than // mapreduce.map.memory.mb. - long jvmInternalMemoryMegabytes = (long) ((double)megabytes * ((double)extraMemPercent)/100.0); + long jvmInternalMemoryMegabytes = (long) ((double) megabytes * ((double) extraMemPercent) / 100.0); processTotalPhysicalMemoryMegabytes = megabytes + jvmInternalMemoryMegabytes; conf.set("mapreduce.job.ubertask.enable", "false"); String mapreduceMapMemoryMb = Long.toString(processTotalPhysicalMemoryMegabytes); @@ -1136,8 +1146,7 @@ private int run2(String[] args) throws Exception { .append((enablePrintCompilation ? " -XX:+PrintCompilation" : "")) .append((enableExcludeMethods ? " -XX:CompileCommand=exclude,water/fvec/NewChunk.append2slowd" : "")) .append((enableLog4jDefaultInitOverride ? " -Dlog4j.defaultInitOverride=true" : "")) - .append((enableDebug ? " -agentlib:jdwp=transport=dt_socket,server=y,suspend=" + (enableSuspend ? "y" : "n") + ",address=" + debugPort : "")) - ; + .append((enableDebug ? " -agentlib:jdwp=transport=dt_socket,server=y,suspend=" + (enableSuspend ? "y" : "n") + ",address=" + debugPort : "")); for (String s : extraJvmArguments) { sb.append(" ").append(s); } @@ -1145,7 +1154,7 @@ private int run2(String[] args) throws Exception { String mapChildJavaOpts = sb.toString(); conf.set("mapreduce.map.java.opts", mapChildJavaOpts); - if (! usingYarn()) { + if (!usingYarn()) { conf.set("mapred.child.java.opts", mapChildJavaOpts); conf.set("mapred.map.child.java.opts", mapChildJavaOpts); // MapR 2.x requires this. } @@ -1157,22 +1166,22 @@ private int run2(String[] args) throws Exception { } conf.set("mapreduce.client.genericoptionsparser.used", "true"); - if (! usingYarn()) { + if (!usingYarn()) { conf.set("mapred.used.genericoptionsparser", "true"); } conf.set("mapreduce.map.speculative", "false"); - if (! usingYarn()) { + if (!usingYarn()) { conf.set("mapred.map.tasks.speculative.execution", "false"); } conf.set("mapreduce.map.maxattempts", "1"); - if (! usingYarn()) { + if (!usingYarn()) { conf.set("mapred.map.max.attempts", "1"); } conf.set("mapreduce.job.jvm.numtasks", "1"); - if (! usingYarn()) { + if (!usingYarn()) { conf.set("mapred.job.reuse.jvm.num.tasks", "1"); } @@ -1199,7 +1208,7 @@ private int run2(String[] args) throws Exception { if (flowDir != null) { addMapperArg(conf, "-flow_dir", flowDir); } - if((new File(".h2o_no_collect")).exists() || (new File(System.getProperty("user.home")+"/.h2o_no_collect")).exists()) { + if ((new File(".h2o_no_collect")).exists() || (new File(System.getProperty("user.home") + "/.h2o_no_collect")).exists()) { addMapperArg(conf, "-ga_opt_out"); } String hadoopVersion = calcHadoopVersion(); @@ -1240,6 +1249,16 @@ private int run2(String[] args) throws Exception { addMapperConf(conf, "-login_conf", "login.conf", krbConfData); } + // SSL + if (null != securityConf && !securityConf.isEmpty()) { + addMapperConf(conf, "-internal_security_conf", "security.config", securityConf); + } else if(internal_secure_connections) { + SecurityUtils.SSLCredentials credentials = SecurityUtils.generateSSLPair(); + String sslConfigFile = SecurityUtils.generateSSLConfig(credentials); + addMapperConf(conf, "", credentials.jks.name, credentials.jks.getLocation()); + addMapperConf(conf, "-internal_security_conf", "default-security.config", sslConfigFile); + } + conf.set(h2omapper.H2O_MAPPER_CONF_LENGTH, Integer.toString(mapperConfLength)); // Set up job stuff. diff --git a/h2o-hadoop/h2o-mapreduce-generic/src/main/java/water/hadoop/h2omapper.java b/h2o-hadoop/h2o-mapreduce-generic/src/main/java/water/hadoop/h2omapper.java index c49568c165c..770bdb5e990 100644 --- a/h2o-hadoop/h2o-mapreduce-generic/src/main/java/water/hadoop/h2omapper.java +++ b/h2o-hadoop/h2o-mapreduce-generic/src/main/java/water/hadoop/h2omapper.java @@ -4,6 +4,7 @@ import java.net.*; import java.util.List; import java.util.ArrayList; +import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; @@ -273,7 +274,10 @@ private int run2(Context context) throws IOException, InterruptedException { int confLength = Integer.parseInt(conf.get(H2O_MAPPER_CONF_LENGTH)); for (int i = 0; i < confLength; i++) { String arg = conf.get(H2O_MAPPER_CONF_ARG_BASE + Integer.toString(i)); - argsList.add(arg); + // For files which are not passed as args (i.e. SSL certs) + if(null != arg && !arg.isEmpty()) { + argsList.add(arg); + } String basename = conf.get(H2O_MAPPER_CONF_BASENAME_BASE + Integer.toString(i)); File f = new File(ice_root); @@ -287,11 +291,17 @@ private int run2(Context context) throws IOException, InterruptedException { Log.POST(104, "after mkdirs()"); } String fileName = ice_root + File.separator + basename; - FileOutputStream out = new FileOutputStream(fileName); String payload = conf.get(H2O_MAPPER_CONF_PAYLOAD_BASE + Integer.toString(i)); byte[] byteArr = h2odriver.convertStringToByteArr(payload); h2odriver.writeBinaryFile(fileName, byteArr); - argsList.add(fileName); + if(null != arg && !arg.isEmpty()) { + argsList.add(fileName); + } + + // Need to modify this config here as we don't know the destination dir for keys when generating it + if("default-security.config".equals(basename)) { + modifyKeyPath(fileName, ice_root); + } } String[] args = argsList.toArray(new String[argsList.size()]); @@ -334,6 +344,47 @@ private int run2(Context context) throws IOException, InterruptedException { return exitStatus; } + //============================================================================== + // SSL RELATED METHODS + //============================================================================== + private void modifyKeyPath(String fileName, String ice_root) throws IOException { + FileInputStream in = null; + Properties sslProps; + try { + in = new FileInputStream(fileName); + sslProps = new Properties(); + sslProps.load(in); + } finally { + if (in != null) { + in.close(); + } + } + + subPath("h2o_ssl_jks_internal", sslProps, ice_root); + subPath("h2o_ssl_jts", sslProps, ice_root); + + FileOutputStream out = null; + try { + out = new FileOutputStream(fileName); + sslProps.store(out, null); + } finally { + if (out != null) { + out.close(); + } + } + } + + //============================================================================== + //============================================================================== + + private void subPath(String prop, Properties sslProps, String ice_root) { + String path = sslProps.getProperty(prop); + // Change only auto generated path. Don't allow the user to use "h2o-internal.jks" as path + if(null != path && "h2o-internal.jks".equals(path)) { + sslProps.setProperty(prop, ice_root + File.separator + path); + } + } + @Override public void run(Context context) throws IOException, InterruptedException { try { diff --git a/h2o-parsers/h2o-avro-parser/testMultiNode.sh b/h2o-parsers/h2o-avro-parser/testMultiNode.sh index a04687ebd0a..8a8e4935d1d 100755 --- a/h2o-parsers/h2o-avro-parser/testMultiNode.sh +++ b/h2o-parsers/h2o-avro-parser/testMultiNode.sh @@ -1,4 +1,5 @@ #!/bin/bash +source ../../multiNodeUtils.sh # Clean out any old sandbox, make a new one OUTDIR=sandbox @@ -80,14 +81,11 @@ echo $DOONLY > $OUTDIR/tests.doonly.txt # Launch 4 helper JVMs. All output redir'd at the OS level to sandbox files. CLUSTER_NAME=junit_cluster_$$ CLUSTER_BASEPORT=44000 -$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT -ga_opt_out 1> $OUTDIR/out.1 2>&1 & PID_1=$! -$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT -ga_opt_out 1> $OUTDIR/out.2 2>&1 & PID_2=$! -$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT -ga_opt_out 1> $OUTDIR/out.3 2>&1 & PID_3=$! -$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT -ga_opt_out 1> $OUTDIR/out.4 2>&1 & PID_4=$! +runCluster # Launch last driver JVM. All output redir'd at the OS level to sandbox files. echo Running h2o-avro-parser junit tests... -($JVM -Ddoonly.tests=$DOONLY -Dbuild.id=$BUILD_ID -Dignore.tests=$IGNORE -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME -Dai.h2o.baseport=$CLUSTER_BASEPORT -Dai.h2o.ga_opt_out=yes $JUNIT_RUNNER `cat $OUTDIR/tests.txt` 2>&1 ; echo $? > $OUTDIR/status.0) 1> $OUTDIR/out.0 2>&1 +($JVM $TEST_SSL -Ddoonly.tests=$DOONLY -Dbuild.id=$BUILD_ID -Dignore.tests=$IGNORE -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME -Dai.h2o.baseport=$CLUSTER_BASEPORT -Dai.h2o.ga_opt_out=yes $JUNIT_RUNNER `cat $OUTDIR/tests.txt` 2>&1 ; echo $? > $OUTDIR/status.0) 1> $OUTDIR/out.0 2>&1 grep EXECUTION $OUTDIR/out.0 | sed -e "s/.*TEST \(.*\) EXECUTION TIME: \(.*\) (Wall.*/\2 \1/" | sort -gr | head -n 10 >> $OUTDIR/out.0 diff --git a/h2o-scala/testMultiNode.sh b/h2o-scala/testMultiNode.sh index 4c6cadb0331..51098371921 100755 --- a/h2o-scala/testMultiNode.sh +++ b/h2o-scala/testMultiNode.sh @@ -1,4 +1,5 @@ #!/bin/bash +source ../multiNodeUtils.sh # Argument parsing if [ "$1" = "jacoco" ] @@ -89,10 +90,7 @@ echo $DOONLY > $OUTDIR/tests.doonly.txt # Launch 4 helper JVMs. All output redir'd at the OS level to sandbox files. CLUSTER_NAME=junit_cluster_$$ CLUSTER_BASEPORT=45000 -$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT --ga_opt_out 1> $OUTDIR/out.1 2>&1 & PID_1=$! -$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT --ga_opt_out 1> $OUTDIR/out.2 2>&1 & PID_2=$! -$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT --ga_opt_out 1> $OUTDIR/out.3 2>&1 & PID_3=$! -$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT --ga_opt_out 1> $OUTDIR/out.4 2>&1 & PID_4=$! +runCluster # If coverage is being run, then pass a system variable flag so that timeout limits are increased. if [ $JACOCO_ENABLED = true ] @@ -104,7 +102,6 @@ fi # Launch last driver JVM. All output redir'd at the OS level to sandbox files. echo Running h2o-scala junit tests... - -($JVM -Ddoonly.tests=$DOONLY -Dignore.tests=$IGNORE -Dbuild.id=$BUILD_ID -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME -Dai.h2o.baseport=$CLUSTER_BASEPORT -Dai.h2o.ga_opt_out=yes $JACOCO_FLAG $JUNIT_RUNNER `cat $OUTDIR/tests.txt` 2>&1 ; echo $? > $OUTDIR/status.0) 1> $OUTDIR/out.0 2>&1 +($JVM $TEST_SSL -Ddoonly.tests=$DOONLY -Dignore.tests=$IGNORE -Dbuild.id=$BUILD_ID -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME -Dai.h2o.baseport=$CLUSTER_BASEPORT -Dai.h2o.ga_opt_out=yes $JACOCO_FLAG $JUNIT_RUNNER `cat $OUTDIR/tests.txt` 2>&1 ; echo $? > $OUTDIR/status.0) 1> $OUTDIR/out.0 2>&1 cleanup diff --git a/multiNodeUtils.sh b/multiNodeUtils.sh new file mode 100644 index 00000000000..244a8fe33a8 --- /dev/null +++ b/multiNodeUtils.sh @@ -0,0 +1,20 @@ +#!/bin/bash +SSL="" +TEST_SSL="" +if [[ "$@" == "ssl" ]]; then + if [ ! -f "../h2o-algos/src/test/resources/ssl.properties" ]; then + SSL="-internal_security_conf ../../h2o-algos/src/test/resources/ssl2.properties" + TEST_SSL="-Dai.h2o.internal_security_conf=../../h2o-algos/src/test/resources/ssl2.properties" + else + SSL="-internal_security_conf ../h2o-algos/src/test/resources/ssl.properties" + TEST_SSL="-Dai.h2o.internal_security_conf=../h2o-algos/src/test/resources/ssl.properties" + fi + +fi + +function runCluster () { + $JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT -ga_opt_out $SSL 1> $OUTDIR/out.1 2>&1 & PID_1=$! + $JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT -ga_opt_out $SSL 1> $OUTDIR/out.2 2>&1 & PID_2=$! + $JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT -ga_opt_out $SSL 1> $OUTDIR/out.3 2>&1 & PID_3=$! + $JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT -ga_opt_out $SSL 1> $OUTDIR/out.4 2>&1 & PID_4=$! +} \ No newline at end of file diff --git a/scripts/run.py b/scripts/run.py index 3610ea8b1e0..c4145cf672d 100755 --- a/scripts/run.py +++ b/scripts/run.py @@ -262,7 +262,7 @@ class H2OCloudNode(object): """ def __init__(self, is_client, cloud_num, nodes_per_cloud, node_num, cloud_name, h2o_jar, ip, base_port, - xmx, cp, output_dir): + xmx, cp, output_dir, test_ssl): """ Create a node in a cloud. @@ -296,6 +296,8 @@ def __init__(self, is_client, cloud_num, nodes_per_cloud, node_num, cloud_name, self.child = None self.terminated = False + self.test_ssl = test_ssl + # Choose my base port number here. All math is done here. Every node has the same # base_port and calculates it's own my_base_port. ports_per_node = 2 @@ -339,7 +341,6 @@ def start(self): "-baseport", str(self.my_base_port), "-ga_opt_out"] - # If the jacoco flag was included, then modify cmd to generate coverage # data using the jacoco agent if g_jacoco_include: @@ -359,6 +360,12 @@ def start(self): cmd = cmd[:1] + [jacoco] + cmd[1:] + if self.test_ssl: + cmd.append("-internal_security_conf") + if g_convenient: + cmd.append("../h2o-algos/src/test/resources/ssl.properties") + else: + cmd.append("../../../h2o-algos/src/test/resources/ssl3.properties") # Add S3N credentials to cmd if they exist. # ec2_hdfs_config_file_name = os.path.expanduser("~/.ec2/core-site.xml") @@ -515,7 +522,7 @@ class H2OCloud(object): A class representing one of the H2O clouds. """ - def __init__(self, cloud_num, use_client, nodes_per_cloud, h2o_jar, base_port, xmx, cp, output_dir): + def __init__(self, cloud_num, use_client, nodes_per_cloud, h2o_jar, base_port, xmx, cp, output_dir, test_ssl): """ Create a cloud. See node definition above for argument descriptions. @@ -530,6 +537,7 @@ def __init__(self, cloud_num, use_client, nodes_per_cloud, h2o_jar, base_port, x self.xmx = xmx self.cp = cp self.output_dir = output_dir + self.test_ssl = test_ssl # Randomly choose a seven digit cloud number. n = random.randint(1000000, 9999999) @@ -556,7 +564,8 @@ def __init__(self, cloud_num, use_client, nodes_per_cloud, h2o_jar, base_port, x self.cloud_name, self.h2o_jar, "127.0.0.1", self.base_port, - self.xmx, self.cp, self.output_dir) + self.xmx, self.cp, self.output_dir, + self.test_ssl) if is_client: self.client_nodes.append(node) else: @@ -977,7 +986,7 @@ def __init__(self, use_cloud, use_cloud2, use_client, cloud_config, use_ip, use_port, num_clouds, nodes_per_cloud, h2o_jar, base_port, xmx, cp, output_dir, failed_output_dir, path_to_tar, path_to_whl, produce_unit_reports, - testreport_dir, r_pkg_ver_chk, hadoop_namenode, on_hadoop, perf): + testreport_dir, r_pkg_ver_chk, hadoop_namenode, on_hadoop, perf, test_ssl): """ Create a runner. @@ -1015,6 +1024,8 @@ def __init__(self, self.use_ip = use_ip self.use_port = use_port + self.test_ssl = test_ssl + # Valid if use_cloud is False self.num_clouds = num_clouds self.nodes_per_cloud = nodes_per_cloud @@ -1065,7 +1076,7 @@ def __init__(self, else: for i in range(self.num_clouds): cloud = H2OCloud(i, self.use_client, self.nodes_per_cloud, h2o_jar, self.base_port, xmx, cp, - self.output_dir) + self.output_dir, self.test_ssl) self.clouds.append(cloud) @staticmethod @@ -1973,6 +1984,7 @@ def _suspect_cloud(self, ip, port): g_job_name = None g_py3 = False g_pycoverage = False +g_test_ssl = False # globals added to support better reporting in xml files g_use_xml2 = False # by default, use the original xml file output @@ -2105,6 +2117,8 @@ def usage(): print("") print(" --geterrs Generate xml file that contains the actual unit test errors and the actual Java error.") print("") + print(" --test.ssl Runs all the nodes with SSL enabled.") + print("") print(" If neither --test nor --testlist is specified, then the list of tests is") print(" discovered automatically as files matching '*runit*.R'.") print("") @@ -2222,6 +2236,7 @@ def parse_args(argv): global g_py3 global g_pycoverage global g_use_xml2 + global g_test_ssl i = 1 while i < len(argv): @@ -2388,6 +2403,8 @@ def parse_args(argv): g_job_name = argv[i] elif s == "--geterrs": g_use_xml2 = True + elif s == "--test_ssl": + g_test_ssl = True else: unknown_arg(s) @@ -2512,6 +2529,7 @@ def main(argv): global g_ncpu global g_os global g_job_name + global g_test_ssl g_script_name = os.path.basename(argv[0]) @@ -2560,7 +2578,7 @@ def main(argv): g_use_cloud, g_use_cloud2, g_use_client, g_config, g_use_ip, g_use_port, g_num_clouds, g_nodes_per_cloud, h2o_jar, g_base_port, g_jvm_xmx, g_jvm_cp, g_output_dir, g_failed_output_dir, g_path_to_tar, g_path_to_whl, g_produce_unit_reports, - testreport_dir, g_r_pkg_ver_chk, g_hadoop_namenode, g_on_hadoop, g_perf) + testreport_dir, g_r_pkg_ver_chk, g_hadoop_namenode, g_on_hadoop, g_perf, g_test_ssl) # Build test list. if g_exclude_list_file is not None: diff --git a/scripts/run_hadoop_job.py b/scripts/run_hadoop_job.py index 8b4f7039a29..03879361613 100755 --- a/scripts/run_hadoop_job.py +++ b/scripts/run_hadoop_job.py @@ -27,7 +27,7 @@ def is_flow(file_name): class H2OCloud: - def __init__(self, abspath_tempdir, driver, nodes_per_cloud, xmx, output_dir): + def __init__(self, abspath_tempdir, driver, nodes_per_cloud, xmx, output_dir, test_ssl): self.ip = None self.port = None self.job_id = None @@ -36,6 +36,7 @@ def __init__(self, abspath_tempdir, driver, nodes_per_cloud, xmx, output_dir): self.xmx = xmx self.output_dir = output_dir self.notify_file = os.path.join(abspath_tempdir, "notify.txt") + self.test_ssl = test_ssl def start(self): print_barrier() @@ -45,6 +46,10 @@ def start(self): '-output', self.output_dir, '-notify', self.notify_file, '-disown'] + + if self.test_ssl: + cmd.append("-internal_security_conf") + print("+ CMD: " + str(cmd)) returncode = subprocess.call(cmd) @@ -112,6 +117,7 @@ def main(): parser.add_argument("-mapperXmx", "--mapperXmx", help="Size of each H2O node", required=True) parser.add_argument("-script", "--script", help="Name of script to run", required=True) parser.add_argument("-output", "--output", help="HDFS temp output dir", required=True) + parser.add_argument("-test_ssl", "--test_ssl", help="Testing with SSL enabled", required=True) args = parser.parse_args() print_barrier() print ("Path to h2odriver: " + args.driver) @@ -119,9 +125,10 @@ def main(): print ("Size of each node: " + args.mapperXmx) print ("Script to run: " + args.script) print ("HDFS output dir: " + args.output) + print ("SSL on: " + args.test_ssl) abspath_tempdir = tempfile.mkdtemp() - g_runner = H2OCloud(abspath_tempdir, args.driver, args.nodes, args.mapperXmx, args.output) + g_runner = H2OCloud(abspath_tempdir, args.driver, args.nodes, args.mapperXmx, args.output, args.test_ssl) # Handle killing the runner. signal.signal(signal.SIGINT, signal_handler)