diff --git a/.gitignore b/.gitignore index 7642f5086..8df040bc4 100644 --- a/.gitignore +++ b/.gitignore @@ -44,3 +44,5 @@ examples/bigdata # generated Dockerfile docker/sparkling-water/base/Dockerfile +# data generated by one of the tests +spark-tfidf-data/ diff --git a/build.gradle b/build.gradle index ace79c443..99eeb3012 100644 --- a/build.gradle +++ b/build.gradle @@ -89,7 +89,7 @@ configure(subprojects) { project -> repositories { // Should be enabled only in development mode if (h2oBuild == '99999') mavenLocal() - + mavenCentral() maven { diff --git a/core/build.gradle b/core/build.gradle index 9fa0d3ff6..9e5289efb 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -26,7 +26,6 @@ dependencies { } // Spark components - // - core compile "org.apache.spark:spark-core_${scalaBaseVersion}:${sparkVersion}" // - SQL component @@ -98,5 +97,4 @@ integTest { // Pass list of jars required for testing systemProperty "sparkling.assembly.jar", "${project(":sparkling-water-assembly").configurations.shadow.artifacts.file.join(',')}" systemProperty "sparkling.itest.jar", "${integTestJar.archivePath}" -} - +} \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/h2o/H2OContext.scala b/core/src/main/scala/org/apache/spark/h2o/H2OContext.scala index ded46a59e..ae6064116 100644 --- a/core/src/main/scala/org/apache/spark/h2o/H2OContext.scala +++ b/core/src/main/scala/org/apache/spark/h2o/H2OContext.scala @@ -58,9 +58,7 @@ import scala.util.control.NoStackTrace * @param conf H2O configuration */ class H2OContext private (@transient val sparkContext: SparkContext, @transient conf: H2OConf) extends org.apache.spark.Logging - with Serializable with SparkDataFrameConverter with SupportedRDDConverter with H2OContextUtils{ - self => - + with Serializable with SparkDataFrameConverter with SupportedRDDConverter with H2OContextUtils { self => /** IP of H2O client */ private var localClientIp: String = _ @@ -85,7 +83,7 @@ class H2OContext private (@transient 
val sparkContext: SparkContext, @transient * otherwise it creates new H2O cluster living in Spark */ def init(): H2OContext = { - if(!isRunningOnCorrectSpark(sparkContext)){ + if (!isRunningOnCorrectSpark(sparkContext)) { throw new WrongSparkVersion(s"You are trying to use Sparkling Water built for Spark $buildSparkMajorVersion," + s" but your $$SPARK_HOME(=${sparkContext.getSparkHome().getOrElse("SPARK_HOME is not defined!")}) property" + s" points to Spark of version ${sparkContext.version}. Please ensure correct Spark is provided and" + @@ -125,13 +123,13 @@ class H2OContext private (@transient val sparkContext: SparkContext, @transient /** Transform DataFrame to H2OFrame */ def asH2OFrame(df : DataFrame): H2OFrame = asH2OFrame(df, None) - def asH2OFrame(df : DataFrame, frameName: Option[String]) : H2OFrame = toH2OFrame(this, df, frameName) - def asH2OFrame(df : DataFrame, frameName: String) : H2OFrame = asH2OFrame(df, Option(frameName)) + def asH2OFrame(df : DataFrame, frameName: Option[String]): H2OFrame = toH2OFrame(this, df, frameName) + def asH2OFrame(df : DataFrame, frameName: String): H2OFrame = asH2OFrame(df, Option(frameName)) /** Transform DataFrame to H2OFrame key */ def toH2OFrameKey(df : DataFrame): Key[Frame] = toH2OFrameKey(df, None) - def toH2OFrameKey(df : DataFrame, frameName: Option[String]) : Key[Frame] = asH2OFrame(df, frameName)._key - def toH2OFrameKey(df : DataFrame, frameName: String) : Key[Frame] = toH2OFrameKey(df, Option(frameName)) + def toH2OFrameKey(df : DataFrame, frameName: Option[String]): Key[Frame] = asH2OFrame(df, frameName)._key + def toH2OFrameKey(df : DataFrame, frameName: String): Key[Frame] = toH2OFrameKey(df, Option(frameName)) /** Create a new H2OFrame based on existing Frame referenced by its key.*/ def asH2OFrame(s: String): H2OFrame = new H2OFrame(s) @@ -145,7 +143,7 @@ class H2OContext private (@transient val sparkContext: SparkContext, @transient * in case we are RDD[T] where T is class defined in REPL. 
This is because class T is created as inner class * and we are not able to create instance of class T without outer scope - which is impossible to get. * */ - def asRDD[A <: Product: TypeTag: ClassTag](fr : H2OFrame) : RDD[A] = toRDD[A, H2OFrame](this, fr) + def asRDD[A <: Product : TypeTag : ClassTag](fr: H2OFrame): RDD[A] = toRDD[A, H2OFrame](this, fr) /** A generic convert of Frame into Product RDD type * @@ -161,9 +159,9 @@ class H2OContext private (@transient val sparkContext: SparkContext, @transient /** Convert given H2O frame into DataFrame type */ @deprecated("1.3", "Use asDataFrame") - def asSchemaRDD[T <: Frame](fr : T, copyMetadata: Boolean = true)(implicit sqlContext: SQLContext) : DataFrame = toDataFrame(this, fr, copyMetadata) - def asDataFrame[T <: Frame](fr : T, copyMetadata: Boolean = true)(implicit sqlContext: SQLContext) : DataFrame = toDataFrame(this, fr, copyMetadata) - def asDataFrame(s : String, copyMetadata: Boolean)(implicit sqlContext: SQLContext) : DataFrame = toDataFrame(this, new H2OFrame(s), copyMetadata) + def asSchemaRDD[T <: Frame](fr: T, copyMetadata: Boolean = true)(implicit sqlContext: SQLContext): DataFrame = toDataFrame(this, fr, copyMetadata) + def asDataFrame[T <: Frame](fr: T, copyMetadata: Boolean = true)(implicit sqlContext: SQLContext): DataFrame = toDataFrame(this, fr, copyMetadata) + def asDataFrame(s: String, copyMetadata: Boolean)(implicit sqlContext: SQLContext): DataFrame = toDataFrame(this, new H2OFrame(s), copyMetadata) def h2oLocalClient = this.localClientIp + ":" + this.localClientPort @@ -210,7 +208,7 @@ class H2OContext private (@transient val sparkContext: SparkContext, @transient // scalastyle:on } -object H2OContext extends Logging{ +object H2OContext extends Logging { private[H2OContext] def setInstantiatedContext(h2oContext: H2OContext): Unit = { synchronized { diff --git a/core/src/main/scala/org/apache/spark/h2o/backends/internal/InternalH2OBackend.scala 
b/core/src/main/scala/org/apache/spark/h2o/backends/internal/InternalH2OBackend.scala index d29947663..1887472a8 100644 --- a/core/src/main/scala/org/apache/spark/h2o/backends/internal/InternalH2OBackend.scala +++ b/core/src/main/scala/org/apache/spark/h2o/backends/internal/InternalH2OBackend.scala @@ -109,7 +109,7 @@ class InternalH2OBackend(@transient val hc: H2OContext) extends SparklingBackend H2O.waitForCloudSize(executors.length, hc.getConf.cloudTimeout) // Register web API for client - RestAPIManager.registerClientWebAPI(hc) + RestAPIManager(hc).registerAll() H2O.finalizeRegistration() executors } diff --git a/core/src/main/scala/water/api/RestAPIManager.scala b/core/src/main/scala/water/api/RestAPIManager.scala index 0a78564bf..93f06b858 100644 --- a/core/src/main/scala/water/api/RestAPIManager.scala +++ b/core/src/main/scala/water/api/RestAPIManager.scala @@ -17,6 +17,8 @@ package water.api +import java.util.ServiceLoader + import org.apache.spark.SparkContext import org.apache.spark.h2o.H2OContext import water.api.DataFrames.DataFramesHandler @@ -24,9 +26,34 @@ import water.api.H2OFrames.H2OFramesHandler import water.api.RDDs.RDDsHandler import water.api.scalaInt.ScalaCodeHandler +trait RestApi { + def register(h2oContext: H2OContext): Unit +} + +private[api] class RestAPIManager(hc: H2OContext) { + private val loader: ServiceLoader[RestApi] = ServiceLoader.load(classOf[RestApi]) + + def registerAll(): Unit = { + // Register first the core + register(CoreRestApi) + // Then additional APIs + import scala.collection.JavaConversions._ + loader.reload() + loader.foreach(api => register(api)) + } + + def register(api: RestApi): Unit = { + api.register(hc) + } +} object RestAPIManager { - def registerClientWebAPI(h2oContext: H2OContext): Unit = { + def apply(hc: H2OContext) = new RestAPIManager(hc) +} + +private object CoreRestApi extends RestApi { + + def register(h2oContext: H2OContext): Unit = { if(h2oContext.getConf.isH2OReplEnabled){ 
registerScalaIntEndp(h2oContext.sparkContext, h2oContext) } @@ -44,8 +71,8 @@ object RestAPIManager { } RequestServer.registerEndpoint("getDataFrame", "POST", "/3/h2oframes/{h2oframe_id}/dataframe", - classOf[H2OFramesHandler], "toDataFrame", "Transform H2OFrame with given ID to Spark's DataFrame", - h2oFramesFactory) + classOf[H2OFramesHandler], "toDataFrame", "Transform H2OFrame with given ID to Spark's DataFrame", + h2oFramesFactory) } @@ -56,14 +83,14 @@ object RestAPIManager { def rddsFactory = new HandlerFactory { override def create(aClass: Class[_ <: Handler]): Handler = rddsHandler } - RequestServer.registerEndpoint("listRDDs", "GET", "/3/RDDs", classOf[RDDsHandler], "list", - "Return all RDDs within Spark cloud", rddsFactory) + RequestServer.registerEndpoint("listRDDs", "GET", "/3/RDDs", classOf[RDDsHandler], "list", + "Return all RDDs within Spark cloud", rddsFactory) RequestServer.registerEndpoint("getRDD", "POST", "/3/RDDs/{rdd_id}", classOf[RDDsHandler], - "getRDD", "Get RDD with the given ID from Spark cloud", rddsFactory) + "getRDD", "Get RDD with the given ID from Spark cloud", rddsFactory) RequestServer.registerEndpoint("rddToH2OFrame", "POST", "/3/RDDs/{rdd_id}/h2oframe", - classOf[RDDsHandler], "toH2OFrame", "Transform RDD with the given ID to H2OFrame", rddsFactory) + classOf[RDDsHandler], "toH2OFrame", "Transform RDD with the given ID to H2OFrame", rddsFactory) } @@ -76,14 +103,14 @@ object RestAPIManager { } RequestServer.registerEndpoint("listDataFrames", "GET", "/3/dataframes", - classOf[DataFramesHandler], "list", "Return all Spark's DataFrames", dataFramesfactory) + classOf[DataFramesHandler], "list", "Return all Spark's DataFrames", dataFramesfactory) RequestServer.registerEndpoint("getDataFrame", "POST", "/3/dataframes/{dataframe_id}", - classOf[DataFramesHandler], "getDataFrame", "Get Spark's DataFrame with the given ID", dataFramesfactory) + classOf[DataFramesHandler], "getDataFrame", "Get Spark's DataFrame with the given ID", 
dataFramesfactory) RequestServer.registerEndpoint("dataFrametoH2OFrame", "POST", - "/3/dataframes/{dataframe_id}/h2oframe", classOf[DataFramesHandler], "toH2OFrame", - "Transform Spark's DataFrame with the given ID to H2OFrame", dataFramesfactory) + "/3/dataframes/{dataframe_id}/h2oframe", classOf[DataFramesHandler], "toH2OFrame", + "Transform Spark's DataFrame with the given ID to H2OFrame", dataFramesfactory) } @@ -93,18 +120,18 @@ object RestAPIManager { override def create(aClass: Class[_ <: Handler]): Handler = scalaCodeHandler } RequestServer.registerEndpoint("interpretScalaCode", "POST" ,"/3/scalaint/{session_id}", - classOf[ScalaCodeHandler], "interpret", "Interpret the code and return the result", - scalaCodeFactory) + classOf[ScalaCodeHandler], "interpret", "Interpret the code and return the result", + scalaCodeFactory) RequestServer.registerEndpoint("initScalaSession", "POST", "/3/scalaint", - classOf[ScalaCodeHandler], "initSession", "Return session id for communication with scala interpreter", - scalaCodeFactory) + classOf[ScalaCodeHandler], "initSession", "Return session id for communication with scala interpreter", + scalaCodeFactory) RequestServer.registerEndpoint("getScalaSessions", "GET" ,"/3/scalaint", - classOf[ScalaCodeHandler], "getSessions", "Return all active session IDs", scalaCodeFactory) + classOf[ScalaCodeHandler], "getSessions", "Return all active session IDs", scalaCodeFactory) RequestServer.registerEndpoint("destroyScalaSession", "DELETE", "/3/scalaint/{session_id}", - classOf[ScalaCodeHandler], "destroySession", "Return session id for communication with scala interpreter", - scalaCodeFactory) + classOf[ScalaCodeHandler], "destroySession", "Return session id for communication with scala interpreter", + scalaCodeFactory) } } diff --git a/examples/flows/Spark_SVM_Model.flow b/examples/flows/Spark_SVM_Model.flow new file mode 100644 index 000000000..235eb17e1 --- /dev/null +++ b/examples/flows/Spark_SVM_Model.flow @@ -0,0 +1,45 @@ +{ + 
"version": "1.0.0", + "cells": [ + { + "type": "sca", + "input": "import org.apache.spark.mllib.linalg.Vectors\nimport scala.util.Random\nval df = sc.parallelize(1 to 50).map(v => {\n val values = Array.fill(5){0}.map(x => Random.nextDouble())\n // Use this for regression instead of binomial. Threshold has to be set to NaN during model building\n //val label = Math.round(Random.nextDouble())\n val label = if(Math.round(Random.nextDouble()) > 0.5) \"1\" else \"0\"\n (label, Vectors.dense(values))\n }).toDF(\"Label\", \"Vector\")\n\nval hc = H2OContext.getOrCreate(sc)\nhc.asH2OFrame(df, \"bubbles\")\nval weightsDF = sc.parallelize(Array(Tuple1(Vectors.dense(1,1,1,1,1)))).toDF(\"Vector\")\nhc.asH2OFrame(weightsDF, \"weights\")" + }, + { + "type": "cs", + "input": "changeColumnType frame: \"bubbles\", column: \"Label\", type: 'enum'" + }, + { + "type": "cs", + "input": "buildModel \"svm\"" + }, + { + "type": "cs", + "input": "buildModel 'svm', {\"model_id\":\"test1\",\"training_frame\":\"bubbles\",\"response_column\":\"Label\",\"initial_weights_frame\":\"weights\",\"nfolds\":0,\"add_intercept\":false,\"step_size\":1,\"reg_param\":0.01,\"convergence_tol\":0.001,\"mini_batch_fraction\":1,\"threshold\":0,\"updater\":\"L2\",\"gradient\":\"Hinge\",\"ignored_columns\":[],\"ignore_const_cols\":true}" + }, + { + "type": "cs", + "input": "getModel \"test1\"" + }, + { + "type": "cs", + "input": "predict model: \"test1\"" + }, + { + "type": "cs", + "input": "predict model: \"test1\", frame: \"bubbles\", predictions_frame: \"prediction-test1\"" + }, + { + "type": "cs", + "input": "bindFrames \"combined-prediction-test1\", [ \"prediction-test1\", \"bubbles\" ]" + }, + { + "type": "cs", + "input": "getFrameSummary \"combined-prediction-test1\"" + }, + { + "type": "cs", + "input": "getFrameData \"combined-prediction-test1\"" + } + ] +} \ No newline at end of file diff --git a/examples/smalldata/bcwd.csv b/examples/smalldata/bcwd.csv new file mode 100644 index 000000000..56d4032dc --- 
/dev/null +++ b/examples/smalldata/bcwd.csv @@ -0,0 +1,700 @@ +clump_thickness,uniformity_cell_size,uniformity_cell_shape,marginal_adhesion,single_epithelial_cell_size,bare_nuclei,bland_chromatin,normal_nucleoli,mitoses,label +5,1,1,1,2,1,3,1,1,B +5,4,4,5,7,10,3,2,1,B +3,1,1,1,2,2,3,1,1,B +6,8,8,1,3,4,3,7,1,B +4,1,1,3,2,1,3,1,1,B +8,10,10,8,7,10,9,7,1,M +1,1,1,1,2,10,3,1,1,B +2,1,2,1,2,1,3,1,1,B +2,1,1,1,2,1,1,1,5,B +4,2,1,1,2,1,2,1,1,B +1,1,1,1,1,1,3,1,1,B +2,1,1,1,2,1,2,1,1,B +5,3,3,3,2,3,4,4,1,M +1,1,1,1,2,3,3,1,1,B +8,7,5,10,7,9,5,5,4,M +7,4,6,4,6,1,4,3,1,M +4,1,1,1,2,1,2,1,1,B +4,1,1,1,2,1,3,1,1,B +10,7,7,6,4,10,4,1,2,M +6,1,1,1,2,1,3,1,1,B +7,3,2,10,5,10,5,4,4,M +10,5,5,3,6,7,7,10,1,M +3,1,1,1,2,1,2,1,1,B +8,4,5,1,2,5,7,3,1,M +1,1,1,1,2,1,3,1,1,B +5,2,3,4,2,7,3,6,1,M +3,2,1,1,1,1,2,1,1,B +5,1,1,1,2,1,2,1,1,B +2,1,1,1,2,1,2,1,1,B +1,1,3,1,2,1,1,1,1,B +3,1,1,1,1,1,2,1,1,B +2,1,1,1,2,1,3,1,1,B +10,7,7,3,8,5,7,4,3,M +2,1,1,2,2,1,3,1,1,B +3,1,2,1,2,1,2,1,1,B +2,1,1,1,2,1,2,1,1,B +10,10,10,8,6,1,8,9,1,M +6,2,1,1,1,1,7,1,1,B +5,4,4,9,2,10,5,6,1,M +2,5,3,3,6,7,7,5,1,M +6,6,6,9,6,5,7,8,1,B +10,4,3,1,3,3,6,5,2,M +6,10,10,2,8,10,7,3,3,M +5,6,5,6,10,1,3,1,1,M +10,10,10,4,8,1,8,10,1,M +1,1,1,1,2,1,2,1,2,B +3,7,7,4,4,9,4,8,1,M +1,1,1,1,2,1,2,1,1,B +4,1,1,3,2,1,3,1,1,B +7,8,7,2,4,8,3,8,2,M +9,5,8,1,2,3,2,1,5,M +5,3,3,4,2,4,3,4,1,M +10,3,6,2,3,5,4,10,2,M +5,5,5,8,10,8,7,3,7,M +10,5,5,6,8,8,7,1,1,M +10,6,6,3,4,5,3,6,1,M +8,10,10,1,3,6,3,9,1,M +8,2,4,1,5,1,5,4,4,M +5,2,3,1,6,10,5,1,1,M +9,5,5,2,2,2,5,1,1,M +5,3,5,5,3,3,4,10,1,M +1,1,1,1,2,2,2,1,1,B +9,10,10,1,10,8,3,3,1,M +6,3,4,1,5,2,3,9,1,M +1,1,1,1,2,1,2,1,1,B +10,4,2,1,3,2,4,3,10,M +4,1,1,1,2,1,3,1,1,B +5,3,4,1,8,10,4,9,1,M +8,3,8,3,4,9,8,9,8,M +1,1,1,1,2,1,3,2,1,B +5,1,3,1,2,1,2,1,1,B +6,10,2,8,10,2,7,8,10,M +1,3,3,2,2,1,7,2,1,B +9,4,5,10,6,10,4,8,1,M +10,6,4,1,3,4,3,2,3,M +1,1,2,1,2,2,4,2,1,B +1,1,4,1,2,1,2,1,1,B +5,3,1,2,2,1,2,1,1,B +3,1,1,1,2,3,3,1,1,B +2,1,1,1,3,1,2,1,1,B +2,2,2,1,1,1,7,1,1,B +4,1,1,2,2,1,2,1,1,B 
+5,2,1,1,2,1,3,1,1,B +3,1,1,1,2,2,7,1,1,B +3,5,7,8,8,9,7,10,7,M +5,10,6,1,10,4,4,10,10,M +3,3,6,4,5,8,4,4,1,M +3,6,6,6,5,10,6,8,3,M +4,1,1,1,2,1,3,1,1,B +2,1,1,2,3,1,2,1,1,B +1,1,1,1,2,1,3,1,1,B +3,1,1,2,2,1,1,1,1,B +4,1,1,1,2,1,3,1,1,B +1,1,1,1,2,1,2,1,1,B +2,1,1,1,2,1,3,1,1,B +1,1,1,1,2,1,3,1,1,B +2,1,1,2,2,1,1,1,1,B +5,1,1,1,2,1,3,1,1,B +9,6,9,2,10,6,2,9,10,M +7,5,6,10,5,10,7,9,4,M +10,3,5,1,10,5,3,10,2,M +2,3,4,4,2,5,2,5,1,M +4,1,2,1,2,1,3,1,1,B +8,2,3,1,6,3,7,1,1,M +10,10,10,10,10,1,8,8,8,M +7,3,4,4,3,3,3,2,7,M +10,10,10,8,2,10,4,1,1,M +1,6,8,10,8,10,5,7,1,M +1,1,1,1,2,1,2,3,1,B +6,5,4,4,3,9,7,8,3,M +1,3,1,2,2,2,5,3,2,B +8,6,4,3,5,9,3,1,1,M +10,3,3,10,2,10,7,3,3,M +10,10,10,3,10,8,8,1,1,M +3,3,2,1,2,3,3,1,1,B +1,1,1,1,2,5,1,1,1,B +8,3,3,1,2,2,3,2,1,B +4,5,5,10,4,10,7,5,8,M +1,1,1,1,4,3,1,1,1,B +3,2,1,1,2,2,3,1,1,B +1,1,2,2,2,1,3,1,1,B +4,2,1,1,2,2,3,1,1,B +10,10,10,2,10,10,5,3,3,M +5,3,5,1,8,10,5,3,1,M +5,4,6,7,9,7,8,10,1,M +1,1,1,1,2,1,2,1,1,B +7,5,3,7,4,10,7,5,5,M +3,1,1,1,2,1,3,1,1,B +8,3,5,4,5,10,1,6,2,M +1,1,1,1,10,1,1,1,1,B +5,1,3,1,2,1,2,1,1,B +2,1,1,1,2,1,3,1,1,B +5,10,8,10,8,10,3,6,3,M +3,1,1,1,2,1,2,2,1,B +3,1,1,1,3,1,2,1,1,B +5,1,1,1,2,2,3,3,1,B +4,1,1,1,2,1,2,1,1,B +3,1,1,1,2,1,1,1,1,B +4,1,2,1,2,1,2,1,1,B +1,1,1,1,1,5,2,1,1,B +3,1,1,1,2,1,1,1,1,B +2,1,1,1,2,1,1,1,1,B +9,5,5,4,4,5,4,3,3,M +1,1,1,1,2,5,1,1,1,B +2,1,1,1,2,1,2,1,1,B +1,1,3,1,2,5,2,1,1,B +3,4,5,2,6,8,4,1,1,M +1,1,1,1,3,2,2,1,1,B +3,1,1,3,8,1,5,8,1,B +8,8,7,4,10,10,7,8,7,M +1,1,1,1,1,1,3,1,1,B +7,2,4,1,6,10,5,4,3,M +10,10,8,6,4,5,8,10,1,M +4,1,1,1,2,3,1,1,1,B +1,1,1,1,2,1,1,1,1,B +5,5,5,6,3,10,3,1,1,M +1,2,2,1,2,1,2,1,1,B +2,1,1,1,2,1,3,1,1,B +1,1,2,1,3,5,1,1,1,B +9,9,10,3,6,10,7,10,6,M +10,7,7,4,5,10,5,7,2,M +4,1,1,1,2,1,3,2,1,B +3,1,1,1,2,1,3,1,1,B +1,1,1,2,1,3,1,1,7,B +5,1,1,1,2,5,3,1,1,B +4,1,1,1,2,2,3,2,1,B +5,6,7,8,8,10,3,10,3,M +10,8,10,10,6,1,3,1,10,M +3,1,1,1,2,1,3,1,1,B +1,1,1,2,1,1,1,1,1,B +3,1,1,1,2,1,1,1,1,B +1,1,1,1,2,1,3,1,1,B +1,1,1,1,2,1,2,1,1,B 
+6,10,10,10,8,10,10,10,7,M +8,6,5,4,3,10,6,1,1,M +5,8,7,7,10,10,5,7,1,M +2,1,1,1,2,1,3,1,1,B +5,10,10,3,8,1,5,10,3,M +4,1,1,1,2,1,3,1,1,B +5,3,3,3,6,10,3,1,1,M +1,1,1,1,1,1,3,1,1,B +1,1,1,1,2,1,1,1,1,B +6,1,1,1,2,1,3,1,1,B +5,8,8,8,5,10,7,8,1,M +8,7,6,4,4,10,5,1,1,M +2,1,1,1,1,1,3,1,1,B +1,5,8,6,5,8,7,10,1,M +10,5,6,10,6,10,7,7,10,M +5,8,4,10,5,8,9,10,1,M +1,2,3,1,2,1,3,1,1,B +10,10,10,8,6,8,7,10,1,M +7,5,10,10,10,10,4,10,3,M +5,1,1,1,2,1,2,1,1,B +1,1,1,1,2,1,3,1,1,B +3,1,1,1,2,1,3,1,1,B +4,1,1,1,2,1,3,1,1,B +8,4,4,5,4,7,7,8,2,B +5,1,1,4,2,1,3,1,1,B +1,1,1,1,2,1,1,1,1,B +3,1,1,1,2,1,2,1,1,B +9,7,7,5,5,10,7,8,3,M +10,8,8,4,10,10,8,1,1,M +1,1,1,1,2,1,3,1,1,B +5,1,1,1,2,1,3,1,1,B +1,1,1,1,2,1,3,1,1,B +5,10,10,9,6,10,7,10,5,M +10,10,9,3,7,5,3,5,1,M +1,1,1,1,1,1,3,1,1,B +1,1,1,1,1,1,3,1,1,B +5,1,1,1,1,1,3,1,1,B +8,10,10,10,5,10,8,10,6,M +8,10,8,8,4,8,7,7,1,M +1,1,1,1,2,1,3,1,1,B +10,10,10,10,7,10,7,10,4,M +10,10,10,10,3,10,10,6,1,M +8,7,8,7,5,5,5,10,2,M +1,1,1,1,2,1,2,1,1,B +1,1,1,1,2,1,3,1,1,B +6,10,7,7,6,4,8,10,2,M +6,1,3,1,2,1,3,1,1,B +1,1,1,2,2,1,3,1,1,B +10,6,4,3,10,10,9,10,1,M +4,1,1,3,1,5,2,1,1,M +7,5,6,3,3,8,7,4,1,M +10,5,5,6,3,10,7,9,2,M +1,1,1,1,2,1,2,1,1,B +10,5,7,4,4,10,8,9,1,M +8,9,9,5,3,5,7,7,1,M +1,1,1,1,1,1,3,1,1,B +10,10,10,3,10,10,9,10,1,M +7,4,7,4,3,7,7,6,1,M +6,8,7,5,6,8,8,9,2,M +8,4,6,3,3,1,4,3,1,B +10,4,5,5,5,10,4,1,1,M +3,3,2,1,3,1,3,6,1,B +3,1,4,1,2,5,3,1,1,B +10,8,8,2,8,10,4,8,10,M +9,8,8,5,6,2,4,10,4,M +8,10,10,8,6,9,3,10,10,M +10,4,3,2,3,10,5,3,2,M +5,1,3,3,2,2,2,3,1,B +3,1,1,3,1,1,3,1,1,B +2,1,1,1,2,1,3,1,1,B +1,1,1,1,2,5,5,1,1,B +1,1,1,1,2,1,3,1,1,B +5,1,1,2,2,2,3,1,1,B +8,10,10,8,5,10,7,8,1,M +8,4,4,1,2,9,3,3,1,M +4,1,1,1,2,1,3,6,1,B +3,1,1,1,2,5,3,1,1,B +1,2,2,1,2,1,1,1,1,B +10,4,4,10,2,10,5,3,3,M +6,3,3,5,3,10,3,5,3,B +6,10,10,2,8,10,7,3,3,M +9,10,10,1,10,8,3,3,1,M +5,6,6,2,4,10,3,6,1,M +3,1,1,1,2,1,1,1,1,B +3,1,1,1,2,1,2,1,1,B +3,1,1,1,2,1,3,1,1,B +5,7,7,1,5,8,3,4,1,B +10,5,8,10,3,10,5,1,3,M +5,10,10,6,10,10,10,6,5,M 
+8,8,9,4,5,10,7,8,1,M +10,4,4,10,6,10,5,5,1,M +7,9,4,10,10,3,5,3,3,M +5,1,4,1,2,1,3,2,1,B +10,10,6,3,3,10,4,3,2,M +3,3,5,2,3,10,7,1,1,M +10,8,8,2,3,4,8,7,8,M +1,1,1,1,2,1,3,1,1,B +8,4,7,1,3,10,3,9,2,M +5,1,1,1,2,1,3,1,1,B +3,3,5,2,3,10,7,1,1,M +7,2,4,1,3,4,3,3,1,M +3,1,1,1,2,1,3,2,1,B +3,1,3,1,2,5,2,1,1,B +3,1,1,1,2,1,2,1,1,B +1,1,1,1,2,1,2,1,1,B +1,1,1,1,2,1,3,1,1,B +10,5,7,3,3,7,3,3,8,M +3,1,1,1,2,1,3,1,1,B +2,1,1,2,2,1,3,1,1,B +1,4,3,10,4,10,5,6,1,M +10,4,6,1,2,10,5,3,1,M +7,4,5,10,2,10,3,8,2,M +8,10,10,10,8,10,10,7,3,M +10,10,10,10,10,10,4,10,10,M +3,1,1,1,3,1,2,1,1,B +6,1,3,1,4,5,5,10,1,M +5,6,6,8,6,10,4,10,4,M +1,1,1,1,2,1,1,1,1,B +1,1,1,1,2,1,3,1,1,B +8,8,8,1,2,5,6,10,1,M +10,4,4,6,2,10,2,3,1,M +1,1,1,1,2,5,2,1,1,B +5,5,7,8,6,10,7,4,1,M +5,3,4,3,4,5,4,7,1,B +5,4,3,1,2,5,2,3,1,B +8,2,1,1,5,1,1,1,1,B +9,1,2,6,4,10,7,7,2,M +8,4,10,5,4,4,7,10,1,M +1,1,1,1,2,1,3,1,1,B +10,10,10,7,9,10,7,10,10,M +1,1,1,1,2,1,3,1,1,B +8,3,4,9,3,10,3,3,1,M +10,8,4,4,4,10,3,10,4,M +1,1,1,1,2,1,3,1,1,B +1,1,1,1,2,1,3,1,1,B +7,8,7,6,4,3,8,8,4,M +3,1,1,1,2,5,5,1,1,B +2,1,1,1,3,1,2,1,1,B +1,1,1,1,2,1,1,1,1,B +8,6,4,10,10,1,3,5,1,M +1,1,1,1,2,1,1,1,1,B +1,1,1,1,1,1,2,1,1,B +4,6,5,6,7,5,4,9,1,B +5,5,5,2,5,10,4,3,1,M +6,8,7,8,6,8,8,9,1,M +1,1,1,1,5,1,3,1,1,B +4,4,4,4,6,5,7,3,1,B +7,6,3,2,5,10,7,4,6,M +3,1,1,1,2,5,3,1,1,B +3,1,1,1,2,1,3,1,1,B +5,4,6,10,2,10,4,1,1,M +1,1,1,1,2,1,3,1,1,B +3,2,2,1,2,1,2,3,1,B +10,1,1,1,2,10,5,4,1,M +1,1,1,1,2,1,2,1,1,B +8,10,3,2,6,4,3,10,1,M +10,4,6,4,5,10,7,1,1,M +10,4,7,2,2,8,6,1,1,M +5,1,1,1,2,1,3,1,2,B +5,2,2,2,2,1,2,2,1,B +5,4,6,6,4,10,4,3,1,M +8,6,7,3,3,10,3,4,2,M +1,1,1,1,2,1,1,1,1,B +6,5,5,8,4,10,3,4,1,M +1,1,1,1,2,1,3,1,1,B +1,1,1,1,1,1,2,1,1,B +8,5,5,5,2,10,4,3,1,M +10,3,3,1,2,10,7,6,1,M +1,1,1,1,2,1,3,1,1,B +2,1,1,1,2,1,1,1,1,B +1,1,1,1,2,1,1,1,1,B +7,6,4,8,10,10,9,5,3,M +1,1,1,1,2,1,1,1,1,B +5,2,2,2,3,1,1,3,1,B +1,1,1,1,1,1,1,3,1,B +3,4,4,10,5,1,3,3,1,M +4,2,3,5,3,8,7,6,1,M +5,1,1,3,2,1,1,1,1,B +2,1,1,1,2,1,3,1,1,B +3,4,5,3,7,3,4,6,1,B 
+2,7,10,10,7,10,4,9,4,M +1,1,1,1,2,1,2,1,1,B +4,1,1,1,3,1,2,2,1,B +5,3,3,1,3,3,3,3,3,M +8,10,10,7,10,10,7,3,8,M +8,10,5,3,8,4,4,10,3,M +10,3,5,4,3,7,3,5,3,M +6,10,10,10,10,10,8,10,10,M +3,10,3,10,6,10,5,1,4,M +3,2,2,1,4,3,2,1,1,B +4,4,4,2,2,3,2,1,1,B +2,1,1,1,2,1,3,1,1,B +2,1,1,1,2,1,2,1,1,B +6,10,10,10,8,10,7,10,7,M +5,8,8,10,5,10,8,10,3,M +1,1,3,1,2,1,1,1,1,B +1,1,3,1,1,1,2,1,1,B +4,3,2,1,3,1,2,1,1,B +1,1,3,1,2,1,1,1,1,B +4,1,2,1,2,1,2,1,1,B +5,1,1,2,2,1,2,1,1,B +3,1,2,1,2,1,2,1,1,B +1,1,1,1,2,1,1,1,1,B +1,1,1,1,2,1,2,1,1,B +1,1,1,1,1,1,2,1,1,B +3,1,1,4,3,1,2,2,1,B +5,3,4,1,4,1,3,1,1,B +1,1,1,1,2,1,1,1,1,B +10,6,3,6,4,10,7,8,4,M +3,2,2,2,2,1,3,2,1,B +2,1,1,1,2,1,1,1,1,B +2,1,1,1,2,1,1,1,1,B +3,3,2,2,3,1,1,2,3,B +7,6,6,3,2,10,7,1,1,M +5,3,3,2,3,1,3,1,1,B +2,1,1,1,2,1,2,2,1,B +5,1,1,1,3,2,2,2,1,B +1,1,1,2,2,1,2,1,1,B +10,8,7,4,3,10,7,9,1,M +3,1,1,1,2,1,2,1,1,B +1,1,1,1,1,1,1,1,1,B +1,2,3,1,2,1,2,1,1,B +3,1,1,1,2,1,2,1,1,B +3,1,1,1,2,1,3,1,1,B +4,1,1,1,2,1,1,1,1,B +3,2,1,1,2,1,2,2,1,B +1,2,3,1,2,1,1,1,1,B +3,10,8,7,6,9,9,3,8,M +3,1,1,1,2,1,1,1,1,B +5,3,3,1,2,1,2,1,1,B +3,1,1,1,2,4,1,1,1,B +1,2,1,3,2,1,1,2,1,B +1,1,1,1,2,1,2,1,1,B +4,2,2,1,2,1,2,1,1,B +1,1,1,1,2,1,2,1,1,B +2,3,2,2,2,2,3,1,1,B +3,1,2,1,2,1,2,1,1,B +1,1,1,1,2,1,2,1,1,B +1,1,1,1,1,5,2,1,1,B +10,10,10,6,8,4,8,5,1,M +5,1,2,1,2,1,3,1,1,B +8,5,6,2,3,10,6,6,1,M +3,3,2,6,3,3,3,5,1,B +8,7,8,5,10,10,7,2,1,M +1,1,1,1,2,1,2,1,1,B +5,2,2,2,2,2,3,2,2,B +2,3,1,1,5,1,1,1,1,B +3,2,2,3,2,3,3,1,1,B +10,10,10,7,10,10,8,2,1,M +4,3,3,1,2,1,3,3,1,B +5,1,3,1,2,1,2,1,1,B +3,1,1,1,2,1,1,1,1,B +9,10,10,10,10,10,10,10,1,M +5,3,6,1,2,1,1,1,1,B +8,7,8,2,4,2,5,10,1,M +1,1,1,1,2,1,2,1,1,B +2,1,1,1,2,1,2,1,1,B +1,3,1,1,2,1,2,2,1,B +5,1,1,3,4,1,3,2,1,B +5,1,1,1,2,1,2,2,1,B +3,2,2,3,2,1,1,1,1,B +6,9,7,5,5,8,4,2,1,B +10,8,10,1,3,10,5,1,1,M +10,10,10,1,6,1,2,8,1,M +4,1,1,1,2,1,1,1,1,B +4,1,3,3,2,1,1,1,1,B +5,1,1,1,2,1,1,1,1,B +10,4,3,10,4,10,10,1,1,M +5,2,2,4,2,4,1,1,1,B +1,1,1,3,2,3,1,1,1,B +1,1,1,1,2,2,1,1,1,B +5,1,1,6,3,1,2,1,1,B 
+2,1,1,1,2,1,1,1,1,B +1,1,1,1,2,1,1,1,1,B +5,1,1,1,2,1,1,1,1,B +1,1,1,1,1,1,1,1,1,B +5,7,9,8,6,10,8,10,1,M +4,1,1,3,1,1,2,1,1,B +5,1,1,1,2,1,1,1,1,B +3,1,1,3,2,1,1,1,1,B +4,5,5,8,6,10,10,7,1,M +2,3,1,1,3,1,1,1,1,B +10,2,2,1,2,6,1,1,2,M +10,6,5,8,5,10,8,6,1,M +8,8,9,6,6,3,10,10,1,M +5,1,2,1,2,1,1,1,1,B +5,1,3,1,2,1,1,1,1,B +5,1,1,3,2,1,1,1,1,B +3,1,1,1,2,5,1,1,1,B +6,1,1,3,2,1,1,1,1,B +4,1,1,1,2,1,1,2,1,B +4,1,1,1,2,1,1,1,1,B +10,9,8,7,6,4,7,10,3,M +10,6,6,2,4,10,9,7,1,M +6,6,6,5,4,10,7,6,2,M +4,1,1,1,2,1,1,1,1,B +1,1,2,1,2,1,2,1,1,B +3,1,1,1,1,1,2,1,1,B +6,1,1,3,2,1,1,1,1,B +6,1,1,1,1,1,1,1,1,B +4,1,1,1,2,1,1,1,1,B +5,1,1,1,2,1,1,1,1,B +3,1,1,1,2,1,1,1,1,B +4,1,2,1,2,1,1,1,1,B +4,1,1,1,2,1,1,1,1,B +5,2,1,1,2,1,1,1,1,B +4,8,7,10,4,10,7,5,1,M +5,1,1,1,1,1,1,1,1,B +5,3,2,4,2,1,1,1,1,B +9,10,10,10,10,5,10,10,10,M +8,7,8,5,5,10,9,10,1,M +5,1,2,1,2,1,1,1,1,B +1,1,1,3,1,3,1,1,1,B +3,1,1,1,1,1,2,1,1,B +10,10,10,10,6,10,8,1,5,M +3,6,4,10,3,3,3,4,1,M +6,3,2,1,3,4,4,1,1,M +1,1,1,1,2,1,1,1,1,B +5,8,9,4,3,10,7,1,1,M +4,1,1,1,1,1,2,1,1,B +5,10,10,10,6,10,6,5,2,M +5,1,2,10,4,5,2,1,1,B +3,1,1,1,1,1,2,1,1,B +1,1,1,1,1,1,1,1,1,B +4,2,1,1,2,1,1,1,1,B +4,1,1,1,2,1,2,1,1,B +4,1,1,1,2,1,2,1,1,B +6,1,1,1,2,1,3,1,1,B +4,1,1,1,2,1,2,1,1,B +4,1,1,2,2,1,2,1,1,B +4,1,1,1,2,1,3,1,1,B +1,1,1,1,2,1,1,1,1,B +3,3,1,1,2,1,1,1,1,B +8,10,10,10,7,5,4,8,7,M +1,1,1,1,2,4,1,1,1,B +5,1,1,1,2,1,1,1,1,B +2,1,1,1,2,1,1,1,1,B +1,1,1,1,2,1,1,1,1,B +5,1,1,1,2,1,2,1,1,B +5,1,1,1,2,1,1,1,1,B +3,1,1,1,1,1,2,1,1,B +6,6,7,10,3,10,8,10,2,M +4,10,4,7,3,10,9,10,1,M +1,1,1,1,1,1,1,1,1,B +1,1,1,1,1,1,2,1,1,B +3,1,2,2,2,1,1,1,1,B +4,7,8,3,4,10,9,1,1,M +1,1,1,1,3,1,1,1,1,B +4,1,1,1,3,1,1,1,1,B +10,4,5,4,3,5,7,3,1,M +7,5,6,10,4,10,5,3,1,M +3,1,1,1,2,1,2,1,1,B +3,1,1,2,2,1,1,1,1,B +4,1,1,1,2,1,1,1,1,B +4,1,1,1,2,1,3,1,1,B +6,1,3,2,2,1,1,1,1,B +4,1,1,1,1,1,2,1,1,B +7,4,4,3,4,10,6,9,1,M +4,2,2,1,2,1,2,1,1,B +1,1,1,1,1,1,3,1,1,B +3,1,1,1,2,1,2,1,1,B +2,1,1,1,2,1,2,1,1,B +1,1,3,2,2,1,3,1,1,B +5,1,1,1,2,1,3,1,1,B 
+5,1,2,1,2,1,3,1,1,B +4,1,1,1,2,1,2,1,1,B +6,1,1,1,2,1,2,1,1,B +5,1,1,1,2,2,2,1,1,B +3,1,1,1,2,1,1,1,1,B +5,3,1,1,2,1,1,1,1,B +4,1,1,1,2,1,2,1,1,B +2,1,3,2,2,1,2,1,1,B +5,1,1,1,2,1,2,1,1,B +6,10,10,10,4,10,7,10,1,M +2,1,1,1,1,1,1,1,1,B +3,1,1,1,1,1,1,1,1,B +7,8,3,7,4,5,7,8,2,M +3,1,1,1,2,1,2,1,1,B +1,1,1,1,2,1,3,1,1,B +3,2,2,2,2,1,4,2,1,B +4,4,2,1,2,5,2,1,2,B +3,1,1,1,2,1,1,1,1,B +4,3,1,1,2,1,4,8,1,B +5,2,2,2,1,1,2,1,1,B +5,1,1,3,2,1,1,1,1,B +2,1,1,1,2,1,2,1,1,B +5,1,1,1,2,1,2,1,1,B +5,1,1,1,2,1,3,1,1,B +5,1,1,1,2,1,3,1,1,B +1,1,1,1,2,1,3,1,1,B +3,1,1,1,2,1,2,1,1,B +4,1,1,1,2,1,3,2,1,B +5,7,10,10,5,10,10,10,1,M +3,1,2,1,2,1,3,1,1,B +4,1,1,1,2,3,2,1,1,B +8,4,4,1,6,10,2,5,2,M +10,10,8,10,6,5,10,3,1,M +8,10,4,4,8,10,8,2,1,M +7,6,10,5,3,10,9,10,2,M +3,1,1,1,2,1,2,1,1,B +1,1,1,1,2,1,2,1,1,B +10,9,7,3,4,2,7,7,1,M +5,1,2,1,2,1,3,1,1,B +5,1,1,1,2,1,2,1,1,B +1,1,1,1,2,1,2,1,1,B +1,1,1,1,2,1,2,1,1,B +1,1,1,1,2,1,3,1,1,B +5,1,2,1,2,1,2,1,1,B +5,7,10,6,5,10,7,5,1,M +6,10,5,5,4,10,6,10,1,M +3,1,1,1,2,1,1,1,1,B +5,1,1,6,3,1,1,1,1,B +1,1,1,1,2,1,1,1,1,B +8,10,10,10,6,10,10,10,1,M +5,1,1,1,2,1,2,2,1,B +9,8,8,9,6,3,4,1,1,M +5,1,1,1,2,1,1,1,1,B +4,10,8,5,4,1,10,1,1,M +2,5,7,6,4,10,7,6,1,M +10,3,4,5,3,10,4,1,1,M +5,1,2,1,2,1,1,1,1,B +4,8,6,3,4,10,7,1,1,M +5,1,1,1,2,1,2,1,1,B +4,1,2,1,2,1,2,1,1,B +5,1,3,1,2,1,3,1,1,B +3,1,1,1,2,1,2,1,1,B +5,2,4,1,1,1,1,1,1,B +3,1,1,1,2,1,2,1,1,B +1,1,1,1,1,1,2,1,1,B +4,1,1,1,2,1,2,1,1,B +5,4,6,8,4,1,8,10,1,M +5,3,2,8,5,10,8,1,2,M +10,5,10,3,5,8,7,8,3,M +4,1,1,2,2,1,1,1,1,B +1,1,1,1,2,1,1,1,1,B +5,10,10,10,10,10,10,1,1,M +5,1,1,1,2,1,1,1,1,B +10,4,3,10,3,10,7,1,2,M +5,10,10,10,5,2,8,5,1,M +8,10,10,10,6,10,10,10,10,M +2,3,1,1,2,1,2,1,1,B +2,1,1,1,1,1,2,1,1,B +4,1,3,1,2,1,2,1,1,B +3,1,1,1,2,1,2,1,1,B +1,1,1,1,1,5,1,1,1,B +4,1,1,1,2,1,2,1,1,B +5,1,1,1,2,1,2,1,1,B +3,1,1,1,2,1,2,1,1,B +6,3,3,3,3,2,6,1,1,B +7,1,2,3,2,1,2,1,1,B +1,1,1,1,2,1,1,1,1,B +5,1,1,2,1,1,2,1,1,B +3,1,3,1,3,4,1,1,1,B +4,6,6,5,7,6,7,7,3,M +2,1,1,1,2,5,1,1,1,B +2,1,1,1,2,1,1,1,1,B 
+4,1,1,1,2,1,1,1,1,B +6,2,3,1,2,1,1,1,1,B +5,1,1,1,2,1,2,1,1,B +1,1,1,1,2,1,1,1,1,B +8,7,4,4,5,3,5,10,1,M +3,1,1,1,2,1,1,1,1,B +3,1,4,1,2,1,1,1,1,B +10,10,7,8,7,1,10,10,3,M +4,2,4,3,2,2,2,1,1,B +4,1,1,1,2,1,1,1,1,B +5,1,1,3,2,1,1,1,1,B +4,1,1,3,2,1,1,1,1,B +3,1,1,1,2,1,2,1,1,B +3,1,1,1,2,1,2,1,1,B +1,1,1,1,2,1,1,1,1,B +2,1,1,1,2,1,1,1,1,B +3,1,1,1,2,1,2,1,1,B +1,2,2,1,2,1,1,1,1,B +1,1,1,3,2,1,1,1,1,B +5,10,10,10,10,2,10,10,10,M +3,1,1,1,2,1,2,1,1,B +3,1,1,2,3,4,1,1,1,B +1,2,1,3,2,1,2,1,1,B +5,1,1,1,2,1,2,2,1,B +4,1,1,1,2,1,2,1,1,B +3,1,1,1,2,1,3,1,1,B +3,1,1,1,2,1,2,1,1,B +5,1,1,1,2,1,2,1,1,B +5,4,5,1,8,1,3,6,1,B +7,8,8,7,3,10,7,2,3,M +1,1,1,1,2,1,1,1,1,B +1,1,1,1,2,1,2,1,1,B +4,1,1,1,2,1,3,1,1,B +1,1,3,1,2,1,2,1,1,B +1,1,3,1,2,1,2,1,1,B +3,1,1,3,2,1,2,1,1,B +1,1,1,1,2,1,1,1,1,B +5,2,2,2,2,1,1,1,2,B +3,1,1,1,2,1,3,1,1,B +5,7,4,1,6,1,7,10,3,M +5,10,10,8,5,5,7,10,1,M +3,10,7,8,5,8,7,4,1,M +3,2,1,2,2,1,3,1,1,B +2,1,1,1,2,1,3,1,1,B +5,3,2,1,3,1,1,1,1,B +1,1,1,1,2,1,2,1,1,B +4,1,4,1,2,1,1,1,1,B +1,1,2,1,2,1,2,1,1,B +5,1,1,1,2,1,1,1,1,B +1,1,1,1,2,1,1,1,1,B +2,1,1,1,2,1,1,1,1,B +10,10,10,10,5,10,10,10,7,M +5,10,10,10,4,10,5,6,3,M +5,1,1,1,2,1,3,2,1,B +1,1,1,1,2,1,1,1,1,B +1,1,1,1,2,1,1,1,1,B +1,1,1,1,2,1,1,1,1,B +1,1,1,1,2,1,1,1,1,B +3,1,1,1,2,1,2,3,1,B +4,1,1,1,2,1,1,1,1,B +1,1,1,1,2,1,1,1,8,B +1,1,1,3,2,1,1,1,1,B +5,10,10,5,4,5,4,4,1,M +3,1,1,1,2,1,1,1,1,B +3,1,1,1,2,1,2,1,2,B +3,1,1,1,3,2,1,1,1,B +2,1,1,1,2,1,1,1,1,B +5,10,10,3,7,3,8,10,2,M +4,8,6,4,3,4,10,6,1,M +4,8,8,5,4,5,10,4,1,M diff --git a/examples/src/main/scala/org/apache/spark/examples/h2o/SparkSVMDemo.scala b/examples/src/main/scala/org/apache/spark/examples/h2o/SparkSVMDemo.scala new file mode 100644 index 000000000..76f00de96 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/h2o/SparkSVMDemo.scala @@ -0,0 +1,70 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. 
See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.spark.examples.h2o + +import java.io.File + +import org.apache.spark.{SparkContext, SparkFiles} +import org.apache.spark.h2o.H2OContext +import org.apache.spark.ml.spark.models.svm.SVM +import org.apache.spark.ml.spark.models.svm.SVMParameters +import org.apache.spark.sql.SQLContext +import water.fvec.H2OFrame +import water.support.SparkContextSupport + +object SparkSVMDemo extends SparkContextSupport { + + def main(args: Array[String]) { + val conf = configure("Sparkling Water: Spark SVM demo.") + val sc = new SparkContext(conf) + + val h2oContext = H2OContext.getOrCreate(sc) + implicit val sqLContext = SQLContext.getOrCreate(sc) + + // Setup environment + addFiles(sc, absPath("examples/smalldata/bcwd.csv")) + + // Load H2O from CSV file (i.e., access directly H2O cloud) + // Use super-fast advanced H2O CSV parser !!! 
+ val breastCancerData = new H2OFrame(new File(SparkFiles.get("bcwd.csv"))) + + // Training data + breastCancerData.replace(breastCancerData.numCols()-1, breastCancerData.lastVec().toCategoricalVec) + breastCancerData.update() + + // Configure Deep Learning algorithm + val parms = new SVMParameters + parms._train = breastCancerData.key + parms._response_column = "label" + + val svm = new SVM(parms, h2oContext) + + val svmModel = svm.trainModel.get + + // Use model for scoring + val predictionH2OFrame = svmModel.score(breastCancerData) + val predictionsFromModel = h2oContext.asDataFrame(predictionH2OFrame).collect + println(predictionsFromModel.mkString("\n===> Model predictions: ", ",\n", ", ...\n")) + + // Stop Spark cluster and destroy all executors + if (System.getProperty("spark.ext.h2o.preserve.executors")==null) { + sc.stop() + } + // Shutdown H2O + h2oContext.stop() + } +} diff --git a/ml/build.gradle b/ml/build.gradle index b28d7b9d5..573e2eb0e 100644 --- a/ml/build.gradle +++ b/ml/build.gradle @@ -14,5 +14,6 @@ dependencies { // And use scalatest for Scala testing testCompile "org.scalatest:scalatest_${scalaBaseVersion}:2.2.1" testCompile "junit:junit:4.11" -} + testCompile project(':sparkling-water-core').sourceSets.test.output +} diff --git a/ml/src/main/scala/hex/Register.scala b/ml/src/main/scala/hex/Register.scala new file mode 100644 index 000000000..64102e29e --- /dev/null +++ b/ml/src/main/scala/hex/Register.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hex + +import org.apache.spark.h2o.H2OContext +import org.apache.spark.ml.spark.models.svm.SVM +import water.H2O +import water.api.{GridSearchHandler, ModelBuilderHandler, RestApi} + +class Register extends RestApi { + + override def register(hc: H2OContext) = { + + val models = Seq(new SVM(true, hc)) + + for (algo <- models) { + val base: String = algo.getClass.getSimpleName + val lbase: String = base.toLowerCase + val bh_clz = classOf[ModelBuilderHandler[_, _, _]] + val version: Int = 3 + H2O.register( + "POST /" + version + "/ModelBuilders/" + lbase, + bh_clz, + "train", + "train_" + lbase, + "Train a " + base + " model." + ) + H2O.register( + "POST /" + version + "/ModelBuilders/" + lbase + "/parameters", + bh_clz, + "validate_parameters", + "validate_" + lbase, + "Validate a set of " + base + " model builder parameters." + ) + H2O.register( + "POST /99/Grid/" + lbase, + classOf[GridSearchHandler[_,_,_,_]], + "train", + "grid_search_" + lbase, + "Run grid search for " + base + " model." 
+      )
+    }
+  }
+}
diff --git a/ml/src/main/scala/hex/SVMDriverUtil.java b/ml/src/main/scala/hex/SVMDriverUtil.java
new file mode 100644
index 000000000..b9e599ce0
--- /dev/null
+++ b/ml/src/main/scala/hex/SVMDriverUtil.java
@@ -0,0 +1,66 @@
+package hex;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.ml.spark.models.svm.SVMModel;
+import org.apache.spark.mllib.regression.LabeledPoint;
+import org.apache.spark.rdd.RDD;
+import scala.Tuple2;
+import water.fvec.Frame;
+
+public class SVMDriverUtil {
+
+    public static ModelMetrics computeMetrics(SVMModel model, RDD training,
+                                              final org.apache.spark.mllib.classification.SVMModel trainedModel,
+                                              Frame f,
+                                              String[] responseDomains) {
+
+        // Compute Spark evaluations: one (prediction, actual label) pair per training row
+        JavaRDD> predictionAndLabels = training.toJavaRDD().map(
+                new Function>() {
+                    public Tuple2 call(LabeledPoint p) {
+                        Double prediction = trainedModel.predict(p.features());
+                        return new Tuple2<>(prediction, p.label());
+                    }
+                }
+        );
+
+        double mse;
+
+        // Fold the pairs into the H2O metrics builder matching the model category
+        switch (model._output.getModelCategory()) {
+            case Binomial:
+                ModelMetricsBinomial.MetricBuilderBinomial builderB = new ModelMetricsBinomial.MetricBuilderBinomial(responseDomains);
+                for (Tuple2 predAct : predictionAndLabels.collect()) {
+                    Double pred = predAct._1;
+                    builderB.perRow(
+                            new double[] {pred, pred == 1 ? 0 : 1, pred == 1 ? 1 : 0},
+                            new float[] {predAct._2.floatValue()},
+                            model
+                    );
+                }
+                mse = builderB._sumsqe / builderB._count; // MSE = SSE / nobs (was divided by _nclasses, which is wrong)
+
+                return new ModelMetricsBinomial(
+                        model, f, builderB._count, mse, responseDomains,
+                        builderB.weightedSigma(), new AUC2(builderB._auc), builderB._logloss, null
+                );
+            default:
+                ModelMetricsRegression.MetricBuilderRegression builderR = new ModelMetricsRegression.MetricBuilderRegression();
+                for (Tuple2 predAct : predictionAndLabels.collect()) {
+                    Double pred = predAct._1;
+                    builderR.perRow(
+                            new double[] {pred, pred == 1 ? 0 : 1, pred == 1 ? 1 : 0},
+                            new float[] {predAct._2.floatValue()},
+                            model
+                    );
+                }
+                mse = builderR._sumsqe / builderR._count; // MSE = SSE / nobs (was /_nclasses; _nclasses==1 here, so "mse" was raw SSE)
+
+                return new ModelMetricsRegression(
+                        model, f, builderR._count, mse, builderR.weightedSigma(), builderR._sumdeviance / builderR._wcount, 0
+                );
+        }
+    }
+
+}
diff --git a/ml/src/main/scala/hex/schemas/SVMModelV3.scala b/ml/src/main/scala/hex/schemas/SVMModelV3.scala
new file mode 100644
index 000000000..59af258f8
--- /dev/null
+++ b/ml/src/main/scala/hex/schemas/SVMModelV3.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package hex.schemas + +import SVMV3.SVMParametersV3 +import hex.schemas.SVMModelV3.SVMModelOutputV3 +import org.apache.spark.ml.spark.models.svm.{SVMModel, SVMParameters} +import water.api.schemas3.{ModelOutputSchemaV3, ModelSchemaV3} +import water.api.API + +class SVMModelV3 extends ModelSchemaV3[SVMModel, + SVMModelV3, + SVMParameters, + SVMParametersV3, + SVMModel.SVMOutput, + SVMModelV3.SVMModelOutputV3] { + + override def createParametersSchema(): SVMParametersV3 = { new SVMParametersV3() } + override def createOutputSchema(): SVMModelOutputV3 = { new SVMModelOutputV3() } + +} + +object SVMModelV3 { + + final class SVMModelOutputV3 extends ModelOutputSchemaV3[SVMModel.SVMOutput, SVMModelOutputV3] { + // Output fields + @API(help = "Iterations executed") var iterations: Int = 0 + @API(help = "Interceptor") var interceptor: Double = 0 + @API(help = "Weights") var weights: Array[Double] = Array() + } + +} + diff --git a/ml/src/main/scala/hex/schemas/SVMV3.java b/ml/src/main/scala/hex/schemas/SVMV3.java new file mode 100644 index 000000000..dc7f57d2c --- /dev/null +++ b/ml/src/main/scala/hex/schemas/SVMV3.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package hex.schemas; + +import org.apache.spark.ml.spark.models.svm.*; +import water.DKV; +import water.Key; +import water.Value; +import water.api.API; +import water.api.schemas3.KeyV3; +import water.api.schemas3.ModelParametersSchemaV3; +import water.fvec.Frame; + +// Seems like this has to be in Java since H2O's frameworks uses reflection's getFields... +// I probably could mix Java and Scala here, leave SVMParametersV3 with fields as Java +// and then make the same Scala class SVMParametersV3 which extends it but not sure if it's worth it... +public class SVMV3 extends ModelBuilderSchema { + + public static final class SVMParametersV3 extends + ModelParametersSchemaV3 { + public static String[] fields = new String[]{ + "model_id", + "training_frame", + "response_column", + "initial_weights_frame", + "validation_frame", + "nfolds", + "add_intercept", + + "step_size", + "reg_param", + "convergence_tol", + "mini_batch_fraction", + "threshold", + "updater", + "gradient", + + "ignored_columns", + "ignore_const_cols" + }; + + @API(help="Add intercept.", direction=API.Direction.INPUT, gridable = true) + public boolean add_intercept = false; + + @API(help="Set step size", direction=API.Direction.INPUT, gridable = true) + public double step_size = 1.0; + + @API(help="Set regularization parameter", direction=API.Direction.INPUT, gridable = true) + public double reg_param = 0.01; + + @API(help="Set convergence tolerance", direction=API.Direction.INPUT, gridable = true) + public double convergence_tol = 0.001; + + @API(help="Set mini batch fraction", direction=API.Direction.INPUT, gridable = true) + public double mini_batch_fraction = 1.0; + + // TODO what exactly does INOUT do?? Should this be only INPUT? + @API(help="Set threshold that separates positive predictions from negative ones. 
NaN for raw prediction.", direction=API.Direction.INOUT, gridable = true) + public double threshold = 0.0; + + @API(help="Set the updater for SGD.", direction=API.Direction.INPUT, values = {"L2", "L1", "Simple"}, required = true, gridable = true) + public Updater updater = Updater.L2; + + @API(help="Set the gradient computation type for SGD.", direction=API.Direction.INPUT, values = {"Hinge", "LeastSquares", "Logistic"}, required = true, gridable = true) + public Gradient gradient = Gradient.Hinge; + + @API(help="Initial model weights.", direction=API.Direction.INOUT, gridable = true) + public KeyV3.FrameKeyV3 initial_weights_frame; + + @Override + public SVMParametersV3 fillFromImpl(SVMParameters impl) { + super.fillFromImpl(impl); + + if (null != impl._initial_weights) { + Value v = DKV.get(impl._initial_weights); + if (null != v) { + initial_weights_frame = new KeyV3.FrameKeyV3(((Frame) v.get())._key); + } + } + + return this; + } + + @Override + public SVMParameters fillImpl(SVMParameters impl) { + super.fillImpl(impl); + impl._initial_weights = + null == this.initial_weights_frame ? 
null : Key.make(this.initial_weights_frame.name); + return impl; + } + + } + +} diff --git a/ml/src/main/scala/org/apache/spark/ml/h2o/H2OPipeline.scala b/ml/src/main/scala/org/apache/spark/ml/h2o/H2OPipeline.scala index 0c888ca36..5420e67c6 100644 --- a/ml/src/main/scala/org/apache/spark/ml/h2o/H2OPipeline.scala +++ b/ml/src/main/scala/org/apache/spark/ml/h2o/H2OPipeline.scala @@ -19,8 +19,9 @@ package org.apache.spark.ml.h2o import org.apache.spark.annotation.Since import org.apache.spark.ml.Pipeline.SharedReadWrite -import org.apache.spark.ml.{PipelineStage, Transformer, PipelineModel, Pipeline} +import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage, Transformer} import _root_.org.apache.spark.sql.DataFrame +import org.apache.spark.h2o.H2OFrame import org.apache.spark.ml.util.{Identifiable, MLReadable, MLReader} /** diff --git a/ml/src/main/scala/org/apache/spark/ml/spark/models/svm/Gradient.java b/ml/src/main/scala/org/apache/spark/ml/spark/models/svm/Gradient.java new file mode 100644 index 000000000..c6be2e935 --- /dev/null +++ b/ml/src/main/scala/org/apache/spark/ml/spark/models/svm/Gradient.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.ml.spark.models.svm; + +import org.apache.spark.mllib.optimization.HingeGradient; +import org.apache.spark.mllib.optimization.LeastSquaresGradient; +import org.apache.spark.mllib.optimization.LogisticGradient; + +public enum Gradient { + Hinge(new HingeGradient()), + LeastSquares(new LeastSquaresGradient()), + Logistic(new LogisticGradient()); + + private org.apache.spark.mllib.optimization.Gradient sparkGradient; + + Gradient(org.apache.spark.mllib.optimization.Gradient sparkGradient) { + this.sparkGradient = sparkGradient; + } + + public org.apache.spark.mllib.optimization.Gradient get() { + return sparkGradient; + } +} diff --git a/ml/src/main/scala/org/apache/spark/ml/spark/models/svm/SVM.java b/ml/src/main/scala/org/apache/spark/ml/spark/models/svm/SVM.java new file mode 100644 index 000000000..f570f94ce --- /dev/null +++ b/ml/src/main/scala/org/apache/spark/ml/spark/models/svm/SVM.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.ml.spark.models.svm; + +import hex.*; + +import org.apache.spark.api.java.function.Function; +import org.apache.spark.SparkContext; +import org.apache.spark.h2o.H2OContext; +import org.apache.spark.ml.spark.models.svm.SVMModel.SVMOutput; +import org.apache.spark.mllib.classification.SVMWithSGD; +import org.apache.spark.mllib.linalg.Vector; +import org.apache.spark.mllib.linalg.Vectors; +import org.apache.spark.mllib.regression.LabeledPoint; +import org.apache.spark.rdd.RDD; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import water.Scope; +import water.fvec.Frame; +import water.fvec.H2OFrame; +import water.fvec.Vec; +import water.util.Log; + +import java.util.Arrays; + +public class SVM extends ModelBuilder { + + transient private final H2OContext hc; + + public SVM(boolean startup_once, H2OContext hc) { + super(new SVMParameters(), startup_once); + this.hc = hc; + } + + public SVM(SVMParameters parms, H2OContext hc) { + super(parms); + init(false); + this.hc = hc; + } + + @Override + protected Driver trainModelImpl() { + return new SVMDriver(); + } + + @Override + public ModelCategory[] can_build() { + return new ModelCategory[]{ + ModelCategory.Binomial, + ModelCategory.Regression + }; + } + + @Override + public boolean isSupervised() { + return true; + } + + @Override + public void init(boolean expensive) { + super.init(expensive); + + _parms.validate(this); + + if (_train == null) return; + + if (null != _parms._initial_weights) { + Frame user_points = _parms._initial_weights.get(); + if (user_points.numCols() != _train.numCols() - numSpecialCols()) { + error("_initial_weights", + "The user-specified initial weights must have the same number of columns " + + "(" + (_train.numCols() - numSpecialCols()) + ") as the training observations"); + } + + if (user_points.hasNAs()) { + error("_initial_weights", "Initial 
weights cannot contain missing values."); + } + } + + for (int i = 0; i < _train.numCols(); i++) { + Vec vec = _train.vec(i); + String vecName = _train.name(i); + if (vec.naCnt() > 0 && (null == _parms._ignored_columns || Arrays.binarySearch(_parms._ignored_columns, vecName) < 0)) { + error("_train", "Training frame cannot contain any missing values [" + vecName + "]."); + } + } + + for (Vec vec : _train.vecs()) { + if (!(vec.isNumeric() || vec.isTime() || vec.isCategorical())) { + error("_train", "SVM supports only frames with numeric values. But a " + vec.get_type_str() + " was found."); + } + } + + if (null != _parms._response_column && null == _train.vec(_parms._response_column)) { + error("_train", "Training frame has to contain the response column."); + } + + + if (_train != null && _parms._response_column != null) { + String[] responseDomains = responseDomains(); + if (null == responseDomains) { + if (!(Double.isNaN(_parms._threshold))) { + error("_threshold", "Threshold cannot be set for regression SVM. Set the threshold to NaN or modify the response column to an enum."); + } + + if (!_train.vec(_parms._response_column).isNumeric()) { + error("_threshold", "Threshold cannot be set for regression SVM. Set the threshold to NaN or modify the response column to an enum."); + error("_response_column", "Regression SVM requires the response column type to be numeric."); + } + } else { + if (Double.isNaN(_parms._threshold)) { + error("_threshold", "Threshold has to be set for binomial SVM. 
Set the threshold to a numeric value or change the response column type."); + } + + if (responseDomains.length != 2) { + error("_response_column", "Binomial SVM requires the response column's domain to be \"0\" and \"1\"."); + } + } + } + } + + private String[] responseDomains() { + int idx = _parms.train().find(_parms._response_column); + if (idx == -1) { + return null; + } + return _parms.train().domains()[idx]; + } + + @Override + public int numSpecialCols() { + return (hasOffsetCol() ? 1 : 0) + + (hasWeightCol() ? 1 : 0) + + (hasFoldCol() ? 1 : 0) + 1; + } + + private final class SVMDriver extends Driver { + + transient private H2OContext h2oContext = hc; + transient private SparkContext sc = hc.sparkContext(); + transient private SQLContext sqlContext = SQLContext.getOrCreate(sc); + + @Override + public void computeImpl() { + init(true); + + // The model to be built + SVMModel model = new SVMModel(dest(), _parms, new SVMModel.SVMOutput(SVM.this)); + model.delete_and_lock(_job); + + RDD training = getTrainingData( + _train, + _parms._response_column, + model._output.nfeatures() + ); + training.cache(); + + SVMWithSGD svm = new SVMWithSGD(); + svm.setIntercept(_parms._add_intercept); + + svm.optimizer().setNumIterations(_parms._max_iterations); + + svm.optimizer().setStepSize(_parms._step_size); + svm.optimizer().setRegParam(_parms._reg_param); + svm.optimizer().setMiniBatchFraction(_parms._mini_batch_fraction); + svm.optimizer().setConvergenceTol(_parms._convergence_tol); + svm.optimizer().setGradient(_parms._gradient.get()); + svm.optimizer().setUpdater(_parms._updater.get()); + + final org.apache.spark.mllib.classification.SVMModel trainedModel = + (null == _parms._initial_weights) ? 
+                            svm.run(training) :
+                            svm.run(training, vec2vec(_parms.initialWeights().vecs()));
+            // NOTE(review): unpersist moved below computeMetrics — the metrics pass re-reads `training`
+
+            model._output.weights_$eq(trainedModel.weights().toArray());
+            model._output.iterations_$eq(_parms._max_iterations);
+            model._output.interceptor_$eq(trainedModel.intercept());
+
+            model._output._training_metrics =
+                    SVMDriverUtil.computeMetrics(model, training, trainedModel, _parms.train(), responseDomains());
+            model.update(_job);
+            training.unpersist(false);
+            _job.update(model._parms._max_iterations);
+
+            if (_valid != null) {
+                model.score(_parms.valid()).delete();
+                model._output._validation_metrics = ModelMetrics.getFromDKV(model, _parms.valid());
+                model.update(_job);
+            }
+
+            // NOTE(review): removed a duplicate interceptor_$eq(trainedModel.intercept()) that was here
+
+            Log.info(model._output._model_summary);
+
+        }
+
+        // Collapse the first value of each (single-row) Vec into a dense Spark Vector
+        private Vector vec2vec(Vec[] vals) {
+            double[] dense = new double[vals.length];
+            for (int i = 0; i < vals.length; i++) {
+                dense[i] = vals[i].at(0);
+            }
+            return Vectors.dense(dense);
+        }
+
+        // H2O frame -> Spark DataFrame -> RDD of LabeledPoints for MLlib training
+        private RDD getTrainingData(Frame parms, String _response_column, int nfeatures) {
+            return h2oContext.asDataFrame(new H2OFrame(parms), true, sqlContext)
+                    .javaRDD()
+                    .map(new RowToLabeledPoint(nfeatures, _response_column, parms.domains()[parms.find(_response_column)])).rdd();
+        }
+    }
+}
+
+class RowToLabeledPoint implements Function {
+    private final int nfeatures;
+    private final String _response_column;
+    private final String[] domains;
+
+    RowToLabeledPoint(int nfeatures, String response_column, String[] domains) {
+        this.nfeatures = nfeatures;
+        this._response_column = response_column;
+        this.domains = domains;
+    }
+
+    @Override
+    public LabeledPoint call(Row row) throws Exception {
+        StructField[] fields = row.schema().fields();
+        double[] features = new double[nfeatures];
+        for (int i = 0; i < nfeatures; i++) {
+            features[i] = toDouble(row.get(i), fields[i]);
+        }
+
+        // Label = index of the response value inside the (sorted) response domain
+        return new LabeledPoint(
+                Arrays.binarySearch(domains, row.getAs(_response_column)),
+                Vectors.dense(features));
+    }
+
+
private double toDouble(Object value, StructField fieldStruct) { + if (fieldStruct.dataType().sameType(DataTypes.ByteType)) { + return ((Byte) value).doubleValue(); + } + + if (fieldStruct.dataType().sameType(DataTypes.ShortType)) { + return ((Short) value).doubleValue(); + } + + if (fieldStruct.dataType().sameType(DataTypes.IntegerType)) { + return ((Integer) value).doubleValue(); + } + + if (fieldStruct.dataType().sameType(DataTypes.DoubleType)) { + return (Double) value; + } + + if (fieldStruct.dataType().sameType(DataTypes.StringType)) { + return Double.parseDouble(value.toString()); + } + + throw new IllegalArgumentException("Target column has to be an enum or a number. " + fieldStruct.toString()); + } +} + diff --git a/ml/src/main/scala/org/apache/spark/ml/spark/models/svm/SVMModel.scala b/ml/src/main/scala/org/apache/spark/ml/spark/models/svm/SVMModel.scala new file mode 100644 index 000000000..bd50140d0 --- /dev/null +++ b/ml/src/main/scala/org/apache/spark/ml/spark/models/svm/SVMModel.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.ml.spark.models.svm + +import hex.ModelMetricsSupervised.MetricBuilderSupervised +import hex._ +import water.codegen.CodeGeneratorPipeline +import water.util.{JCodeGen, SBPrintStream} +import water.{H2O, Key, Keyed} + +object SVMModel { + + class SVMOutput(val b: SVM) extends Model.Output(b) { + var interceptor: Double = .0 + var iterations: Int = 0 + var weights: Array[Double] = null + } + +} + +class SVMModel private[svm](val selfKey: Key[_ <: Keyed[_ <: Keyed[_ <: AnyRef]]], + val parms: SVMParameters, + val output: SVMModel.SVMOutput) + extends Model[SVMModel, SVMParameters, SVMModel.SVMOutput](selfKey, parms, output) { + + override def makeMetricBuilder(domain: Array[String]): MetricBuilderSupervised[Nothing] = + _output.getModelCategory match { + case ModelCategory.Binomial => + new ModelMetricsBinomial.MetricBuilderBinomial(domain) + case ModelCategory.Regression => + new ModelMetricsRegression.MetricBuilderRegression + case _ => + throw H2O.unimpl + } + + protected def score0(data: Array[Double], preds: Array[Double]): Array[Double] = { + java.util.Arrays.fill(preds, 0) + val pred = + data.zip(_output.weights).foldRight(_output.interceptor){ case ((d, w), acc) => d * w + acc} + + if(_parms._threshold.isNaN) { // Regression + preds(0) = pred + } else { // Binomial + if(pred > _parms._threshold) { + preds(2) = 1 + preds(1) = -1 + preds(0) = 1 + } else { + preds(2) = -1 + preds(1) = 1 + preds(0) = 0 + } + } + preds + } + + override protected def toJavaInit(sb: SBPrintStream, fileCtx: CodeGeneratorPipeline): SBPrintStream = { + val sbInitialized = super.toJavaInit(sb, fileCtx) + sbInitialized.ip("public boolean isSupervised() { return " + isSupervised + "; }").nl + JCodeGen.toStaticVar(sbInitialized, "WEIGHTS", _output.weights, "Weights.") + sbInitialized + } + + override protected def toJavaPredictBody(bodySb: SBPrintStream, + classCtx: CodeGeneratorPipeline, + fileCtx: CodeGeneratorPipeline, + verboseCode: Boolean) { + 
bodySb.i.p("java.util.Arrays.fill(preds,0);").nl + bodySb.i.p(s"double prediction = ${_output.interceptor};").nl + bodySb.i.p("for(int i = 0; i < data.length; i++) {").nl + bodySb.i(1).p("prediction += (data[i] * WEIGHTS[i]);").nl + bodySb.i.p("}").nl + + if (_output.nclasses == 1) { + bodySb.i.p("preds[0] = prediction;").nl + } else { + bodySb.i.p(s"if(prediction > ${_parms._threshold}) {").nl + bodySb.i(1).p("preds[2] = 1;").nl + bodySb.i(1).p("preds[1] = -1;").nl + bodySb.i(1).p("preds[0] = 1;").nl + bodySb.i.p(s"} else {").nl + bodySb.i(1).p("preds[2] = -1;").nl + bodySb.i(1).p("preds[1] = 1;").nl + bodySb.i(1).p("preds[0] = 0;").nl + bodySb.i.p(s"}").nl + } + + } + +} diff --git a/ml/src/main/scala/org/apache/spark/ml/spark/models/svm/SVMParameters.java b/ml/src/main/scala/org/apache/spark/ml/spark/models/svm/SVMParameters.java new file mode 100644 index 000000000..66b6ce7ce --- /dev/null +++ b/ml/src/main/scala/org/apache/spark/ml/spark/models/svm/SVMParameters.java @@ -0,0 +1,63 @@ +package org.apache.spark.ml.spark.models.svm; + +import hex.Model; +import water.Key; +import water.fvec.Frame; + +public class SVMParameters extends Model.Parameters { + @Override + public String algoName() { return "SVM"; } + + @Override + public String fullName() { return "Support Vector Machine (*Spark*)"; } + + @Override + public String javaName() { return SVMModel.class.getName(); } + + @Override + public long progressUnits() { return _max_iterations; } + + public final Frame initialWeights() { + if (null == _initial_weights) { + return null; + } else { + return _initial_weights.get(); + } + } + + public int _max_iterations = 1000; + public boolean _add_intercept = false; + public double _step_size = 1.0; + public double _reg_param = 0.01; + public double _convergence_tol = 0.001; + public double _mini_batch_fraction = 1.0; + public double _threshold = 0.0; + public Updater _updater = Updater.L2; + public Gradient _gradient = Gradient.Hinge; + public Key _initial_weights = 
null; + + public void validate(SVM svm) { + if (_max_iterations < 0 || _max_iterations > 1e6) { + svm.error("_max_iterations", " max_iterations must be between 0 and 1e6"); + } + + if(_step_size <= 0) { + svm.error("_step_size", "The step size has to be positive."); + } + + if(_reg_param <= 0) { + svm.error("_reg_param", "The regularization parameter has to be positive."); + } + + if(_convergence_tol <= 0) { + svm.error("_convergence_tol", "The convergence tolerance has to be positive."); + } + + if(_mini_batch_fraction <= 0) { + svm.error("_mini_batch_fraction", "The minimum batch fraction has to be positive."); + } + + } + + +} diff --git a/ml/src/main/scala/org/apache/spark/ml/spark/models/svm/Updater.java b/ml/src/main/scala/org/apache/spark/ml/spark/models/svm/Updater.java new file mode 100644 index 000000000..be5ff38e5 --- /dev/null +++ b/ml/src/main/scala/org/apache/spark/ml/spark/models/svm/Updater.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.ml.spark.models.svm; + +import org.apache.spark.mllib.optimization.L1Updater; +import org.apache.spark.mllib.optimization.SimpleUpdater; +import org.apache.spark.mllib.optimization.SquaredL2Updater; + +public enum Updater { + L2(new SquaredL2Updater()), + L1(new L1Updater()), + Simple(new SimpleUpdater()); + + private org.apache.spark.mllib.optimization.Updater sparkUpdater; + + Updater(org.apache.spark.mllib.optimization.Updater sparkUpdater) { + this.sparkUpdater = sparkUpdater; + } + + public org.apache.spark.mllib.optimization.Updater get() { + return sparkUpdater; + } +} diff --git a/ml/src/test/scala/org/apache/spark/ml/spark/models/svm/SVMModelTest.scala b/ml/src/test/scala/org/apache/spark/ml/spark/models/svm/SVMModelTest.scala new file mode 100644 index 000000000..c412cb1fd --- /dev/null +++ b/ml/src/test/scala/org/apache/spark/ml/spark/models/svm/SVMModelTest.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.ml.spark.models.svm + +import org.apache.spark.SparkContext +import org.apache.spark.h2o.utils.SharedSparkTestContext +import org.apache.spark.mllib.classification +import org.apache.spark.mllib.linalg.Vectors +import org.junit.runner.RunWith +import org.scalatest.FunSuite +import org.scalatest.junit.JUnitRunner + +import scala.util.Random + +@RunWith(classOf[JUnitRunner]) +class SVMModelTest extends FunSuite with SharedSparkTestContext { + + override def createSparkContext: SparkContext = new SparkContext("local[*]", "test-local", conf = defaultSparkConf) + + test("Should score the same regression value.") { + val sqlContext = sqlc + import sqlContext.implicits._ + val h2oContext = hc + import h2oContext.implicits._ + + // Generate random training data + val trainRDD = sc.parallelize(1 to 50, 1).map(v => { + val values = Array.fill(5){0}.map(x => Random.nextDouble()) + //val label = Math.round(Random.nextDouble()) + val label = if (Math.round(Random.nextDouble()) > 0.5) "1" else "0" + (label, Vectors.dense(values)) + }).cache() + val trainDF = trainRDD.toDF("Label", "Vector") + + val trainFrame = hc.asH2OFrame(trainDF, "bubbles") + trainFrame.replace(0, trainFrame.vec(0).toCategoricalVec).remove() + + val initialWeights = Vectors.dense(1, 1, 1, 1, 1) + val weightsDF = sc.parallelize(Array(Tuple1(initialWeights))).toDF("Vector") + val weightsFrame = hc.asH2OFrame(weightsDF, "weights") + + // Learning parameters + val parms = new SVMParameters + parms._train = trainFrame + parms._response_column = 'Label + parms._initial_weights = weightsFrame + + val svm = new SVM(parms, h2oContext) + + // Train model + val h2oSVMModel: SVMModel = svm.trainModel.get + + val sparkSVMModel = new classification.SVMModel( + Vectors.dense(h2oSVMModel.output.weights), + h2oSVMModel.output.interceptor + ) + + // Make sure both scoring methods return the same results + val h2oPredsVec = h2oSVMModel.score(trainFrame).vec(0) + val h2oPreds = (0 until 
50).map(h2oPredsVec.at(_)).toArray + val sparkPreds = sparkSVMModel.predict(trainRDD.map(_._2)).collect() + + assert( h2oPreds.sameElements(sparkPreds) ) + + } + +}