diff --git a/h2o-core/src/main/java/water/parser/FVecParseWriter.java b/h2o-core/src/main/java/water/parser/FVecParseWriter.java index 468a3280ccf..c0031005553 100644 --- a/h2o-core/src/main/java/water/parser/FVecParseWriter.java +++ b/h2o-core/src/main/java/water/parser/FVecParseWriter.java @@ -96,6 +96,12 @@ public FVecParseWriter(Vec.VectorGroup vg, int cidx, Categorical[] categoricals, @Override public final void addInvalidCol(int colIdx) { if(colIdx < _nCols) _nvs[_col = colIdx].addNA(); } + + @Override + public void addNAs(int colIdx, int nrows) { + (_nvs[colIdx] = _vecs[colIdx].chunkForChunkIdx(_cidx)).addNAs(nrows); + } + @Override public boolean isString(int colIdx) { return (colIdx < _nCols) && (_ctypes[colIdx] == Vec.T_CAT || _ctypes[colIdx] == Vec.T_STR);} @Override public void addStrCol(int colIdx, BufferedString str) { diff --git a/h2o-core/src/main/java/water/parser/ParseDataset.java b/h2o-core/src/main/java/water/parser/ParseDataset.java index 12a258433ed..711efddb3ae 100644 --- a/h2o-core/src/main/java/water/parser/ParseDataset.java +++ b/h2o-core/src/main/java/water/parser/ParseDataset.java @@ -570,13 +570,6 @@ protected void compute() { // files are parsed in parallel across the cluster), but we want to throttle // the parallelism on each node. private static class MultiFileParseTask extends MRTask { - // TOO_MANY_KEYS_COUNT specifies when to disable parallel parse. We want to cover a scenario when - // we are working with too many keys made of small files - in this case the distributed parse - // doesn't work well because of the way chunks are distributed to nodes. We should switch to a local - // parse to make sure the work is uniformly distributed across the whole cluster. - private static final int TOO_MANY_KEYS_COUNT = 128; - // A file is considered to be small if it can fit into number of chunks. 
- private static final int SMALL_FILE_NCHUNKS = 10; private final ParseSetup _parseSetup; // The expected column layout private final VectorGroup _vg; // vector group of the target dataset @@ -729,21 +722,19 @@ private FVecParseWriter makeDout(ParseSetup localSetup, int chunkOff, int nchunk try { switch( cpr ) { case NONE: - boolean disableParallelParse = localSetup.disableParallelParse || (_keys.length > TOO_MANY_KEYS_COUNT) && - (vec.nChunks() <= SMALL_FILE_NCHUNKS) && _parseSetup._parse_type.isStreamParseSupported(); - if( _parseSetup._parse_type.isParallelParseSupported() && (! disableParallelParse)) { + ParserInfo.ParseMethod pm = localSetup.disableParallelParse ? ParserInfo.ParseMethod.StreamParse : _parseSetup._parse_type.parseMethod(_keys.length,vec.nChunks()); + if(pm == ParserInfo.ParseMethod.DistributedParse) { new DistributedParse(_vg, localSetup, _vecIdStart, chunkStartIdx, this, key, vec.nChunks()).dfork(vec).getResult(false); for( int i = 0; i < vec.nChunks(); ++i ) _chunk2ParseNodeMap[chunkStartIdx + i] = vec.chunkKey(i).home_node().index(); - } else { + } else if(pm == ParserInfo.ParseMethod.StreamParse){ localSetup = ParserService.INSTANCE.getByInfo(localSetup._parse_type).setupLocal(vec,localSetup); InputStream bvs = vec.openStream(_jobKey); Parser p = localSetup.parser(_jobKey); _dout[_lo] = ((FVecParseWriter) p.streamParse(bvs,makeDout(localSetup,chunkStartIdx,vec.nChunks()))).close(_fs); _errors = _dout[_lo].removeErrors(); chunksAreLocal(vec,chunkStartIdx,key); - - } + } else throw H2O.unimpl(); break; case ZIP: { localSetup = ParserService.INSTANCE.getByInfo(localSetup._parse_type).setupLocal(vec,localSetup); diff --git a/h2o-core/src/main/java/water/parser/ParseWriter.java b/h2o-core/src/main/java/water/parser/ParseWriter.java index c00f76e382f..ae15e0ed72e 100644 --- a/h2o-core/src/main/java/water/parser/ParseWriter.java +++ b/h2o-core/src/main/java/water/parser/ParseWriter.java @@ -39,6 +39,7 @@ public String toString(){ void addNumCol(int colIdx, double d); // An an invalid / missing entry void
addInvalidCol(int colIdx); + void addNAs(int colIdx, int nrow); // Add a String column void addStrCol( int colIdx, BufferedString str ); // Final rolling back of partial line diff --git a/h2o-core/src/main/java/water/parser/ParserInfo.java b/h2o-core/src/main/java/water/parser/ParserInfo.java index d672acf1fec..e9e5ef476eb 100644 --- a/h2o-core/src/main/java/water/parser/ParserInfo.java +++ b/h2o-core/src/main/java/water/parser/ParserInfo.java @@ -1,5 +1,6 @@ package water.parser; +import water.H2O; import water.Iced; /** @@ -43,6 +44,28 @@ public int priority() { return prior; } + // TOO_MANY_KEYS_COUNT specifies when to disable parallel parse. We want to cover a scenario when + // we are working with too many keys made of small files - in this case the distributed parse + // doesn't work well because of the way chunks are distributed to nodes. We should switch to a local + // parse to make sure the work is uniformly distributed across the whole cluster. + public static final int TOO_MANY_KEYS_COUNT = 128; + // A file is considered to be small if it can fit into number of chunks. + public static final int SMALL_FILE_NCHUNKS = 10; + + public enum ParseMethod {StreamParse, DistributedParse} + /* + localSetup.disableParallelParse || + */ + public ParseMethod parseMethod(int nfiles, int nchunks){ + if(isStreamParseSupported()) { + if (!isParallelParseSupported() || (nfiles > TOO_MANY_KEYS_COUNT && (nchunks <= SMALL_FILE_NCHUNKS))) + return ParseMethod.StreamParse; + } + if(isParallelParseSupported()) + return ParseMethod.DistributedParse; + throw H2O.unimpl(); + } + /** Does the parser support parallel parse? 
*/ public boolean isParallelParseSupported() { return isParallelParseSupported; diff --git a/h2o-core/src/main/java/water/parser/PreviewParseWriter.java b/h2o-core/src/main/java/water/parser/PreviewParseWriter.java index 0d5f604ede1..e0db08e3845 100644 --- a/h2o-core/src/main/java/water/parser/PreviewParseWriter.java +++ b/h2o-core/src/main/java/water/parser/PreviewParseWriter.java @@ -98,6 +98,12 @@ private void setColumnCount(int n) { _data[_nlines][colIdx] = "NA"; } } + + @Override + public void addNAs(int colIdx, int nrow) { + throw H2O.unimpl(); + } + @Override public void addStrCol(int colIdx, BufferedString str) { if(colIdx < _ncols) { // Check for time diff --git a/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParser.java b/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParser.java index dd2dce487f3..06b5af2806f 100644 --- a/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParser.java +++ b/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParser.java @@ -132,12 +132,19 @@ protected final ParseWriter parseChunk(int chunkId, ParseReader din, ParseWriter int colIndex = 0; for (int col = 0; col < batch.numCols; ++col) { // read one column at a time; if (toInclude[col + 1]) { // only write a column if we actually want it - write1column(dataVectors[col], orcTypes[colIndex], colIndex, nrows, dout); + if(_setup.getColumnTypes()[colIndex] != Vec.T_BAD) + write1column(dataVectors[col], orcTypes[colIndex], colIndex, nrows, dout); + else dout.addNAs(colIndex,nrows); colIndex++; } } rows += currentBatchRow; // record number of rows of data actually read } + byte [] col_types = _setup.getColumnTypes(); + for(int i = 0; i < col_types.length; ++i){ + if(col_types[i] == Vec.T_BAD) + dout.addNAs(i,(int)rowCount); + } perStripe.close(); } catch(IOException ioe) { throw new RuntimeException(ioe); diff --git a/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParserProvider.java
b/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParserProvider.java index 6c4b0b34292..3352f6032a7 100644 --- a/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParserProvider.java +++ b/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParserProvider.java @@ -12,7 +12,7 @@ import water.Key; import water.fvec.*; import water.parser.*; -import water.persist.PersistHdfs; +import water.persist.VecFileSystem; import java.io.IOException; import java.util.List; @@ -25,8 +25,23 @@ */ public class OrcParserProvider extends ParserProvider { + public static class OrcParserInfo extends ParserInfo { + + public OrcParserInfo() { + super("ORC", DefaultParserProviders.MAX_CORE_PRIO + 20, true, true, false); + } + + public ParseMethod parseMethod(int nfiles, int nchunks){ + int ncores_tot = H2O.NUMCPUS*H2O.CLOUD.size(); + // prefer StreamParse if we have enough files to keep cluster busy + // ORC stream parse is more efficient + return + nfiles >= (ncores_tot >> 1) // got enough files to keep cluster busy + ?ParseMethod.StreamParse:ParseMethod.DistributedParse; + } + } /* Setup for this parser */ - static ParserInfo ORC_INFO = new ParserInfo("ORC", DefaultParserProviders.MAX_CORE_PRIO + 20, true); + static ParserInfo ORC_INFO = new OrcParserInfo(); @Override public ParserInfo info() { @@ -70,10 +85,8 @@ public ParseSetup createParserSetup(Key[] inputs, ParseSetup requiredSetup) { private Reader getReader(FileVec f) throws IOException { String strPath = getPathForKey(f._key); Path path = new Path(strPath); - if(f instanceof HDFSFileVec) - return OrcFile.createReader(PersistHdfs.getFS(strPath), path); - else - return OrcFile.createReader(path, OrcFile.readerOptions(new Configuration())); + Configuration conf = VecFileSystem.makeConfiguration(f); + return OrcFile.createReader(VecFileSystem.get(conf), path); } /** @@ -103,7 +116,7 @@ public ParseSetup readSetup(FileVec f, String[] columnNames, byte[] columnTypes) byte[]
old_columnTypes = stp.getColumnTypes(); String[] old_columnTypeNames = stp.getColumnTypesString(); for (int index = 0; index < columnTypes.length; index++) { - if (columnTypes[index] == Vec.T_CAT) // only copy the enum types + if (columnTypes[index] == Vec.T_CAT || columnTypes[index] == Vec.T_BAD) // only copy the enum and bad types old_columnTypes[index] = columnTypes[index]; } stp.setColumnTypes(old_columnTypes); @@ -129,7 +142,6 @@ public ParseSetup setupLocal(Vec v, ParseSetup setup){ if(!(v instanceof FileVec)) throw H2O.unimpl("ORC only implemented for HDFS / NFS files"); try { ((OrcParser.OrcParseSetup)setup).setOrcFileReader(getReader((FileVec)v)); - return setup; } catch (IOException e) {throw new RuntimeException(e);} diff --git a/h2o-parsers/h2o-orc-parser/src/test/java/water/parser/ParseTestOrc.java b/h2o-parsers/h2o-orc-parser/src/test/java/water/parser/ParseTestOrc.java index 53c0811970e..844be61c706 100644 --- a/h2o-parsers/h2o-orc-parser/src/test/java/water/parser/ParseTestOrc.java +++ b/h2o-parsers/h2o-orc-parser/src/test/java/water/parser/ParseTestOrc.java @@ -1,5 +1,6 @@ package water.parser; +import org.junit.Assert; import org.junit.Assume; import org.junit.BeforeClass; import org.junit.Test; @@ -9,8 +10,13 @@ import java.util.Set; import java.util.TreeSet; +import water.Key; import water.TestUtil; +import water.api.schemas3.ParseSetupV3; import water.fvec.Frame; +import water.fvec.NFSFileVec; +import water.fvec.Vec; +import water.parser.orc.OrcParserProvider; import water.util.FileUtils; import water.util.Log; @@ -77,6 +83,21 @@ static public void _preconditionJavaVersion() { // NOTE: the `_` force execution Assume.assumeTrue("Java6 is not supported", !System.getProperty("java.version", "NA").startsWith("1.6")); } + @Test public void testBadColumn(){ + NFSFileVec nfs = makeNfsFileVec("smalldata/parser/orc/orc_split_elim.orc"); + Key outputKey = Key.make("orc_Test"); + ParseSetup pstp = new ParseSetup(new ParseSetupV3()); + pstp._parse_type = new
OrcParserProvider.OrcParserInfo(); + ParseSetup ps = ParseSetup.guessSetup(new Key[]{nfs._key}, pstp); + Assert.assertEquals(ps._parse_type.name(), "ORC"); + System.out.println("ParseSetup"); + System.out.println(ps); + ps._column_types[0] = Vec.T_BAD; + Frame fr = ParseDataset.parse(outputKey, new Key[]{nfs._key},true,ps); + Assert.assertTrue(fr.vec(0).isBad()); + fr.delete(); + } + @Test public void testParseAllOrcs() { Set failedFiles = new TreeSet<>(); diff --git a/h2o-parsers/h2o-orc-parser/testMultiNode.sh b/h2o-parsers/h2o-orc-parser/testMultiNode.sh index d77f28af07b..2e0514b7140 100755 --- a/h2o-parsers/h2o-orc-parser/testMultiNode.sh +++ b/h2o-parsers/h2o-orc-parser/testMultiNode.sh @@ -65,7 +65,7 @@ fi # build/classes/main - Main h2o core classes # build/classes/test - Test h2o core classes # build/resources/main - Main resources (e.g. page.html) -JVM="nice $JAVA_CMD -DcloudSize=5 -ea $COVERAGE -Xmx${MAX_MEM} -Xms${MAX_MEM} -cp build/libs/h2o-orc-parser-test.jar${SEP}build/libs/h2o-orc-parser.jar${SEP}../../h2o-core/build/libs/h2o-core-test.jar${SEP}../../h2o-core/build/libs/h2o-core.jar${SEP}../../h2o-genmodel/build/libs/h2o-genmodel.jar${SEP}../../lib/*" +JVM="nice $JAVA_CMD -DcloudSize=5 -ea $COVERAGE -Xmx${MAX_MEM} -Xms${MAX_MEM} -cp build/libs/h2o-orc-parser-test.jar${SEP}build/libs/h2o-orc-parser.jar${SEP}../../h2o-core/build/libs/h2o-core-test.jar${SEP}../../h2o-core/build/libs/h2o-core.jar${SEP}../../h2o-genmodel/build/libs/h2o-genmodel.jar${SEP}../../h2o-persist-hdfs/build/libs/h2o-persist-hdfs.jar${SEP}../../lib/*" echo "$JVM" > $OUTDIR/jvm_cmd.txt # Ahhh... but the makefile runs the tests skipping the jar'ing step when possible. 
diff --git a/h2o-parsers/h2o-parquet-parser/build.gradle b/h2o-parsers/h2o-parquet-parser/build.gradle index 5ea91f75f8a..9b64bf8cb78 100644 --- a/h2o-parsers/h2o-parquet-parser/build.gradle +++ b/h2o-parsers/h2o-parquet-parser/build.gradle @@ -8,6 +8,9 @@ def parquetHadoopVersion = binding.variables.get("hadoopVersion") ? dependencies { compile project(":h2o-core") + compile(project(":h2o-persist-hdfs")) { + transitive = false + } // Parquet support // Use the same version as Spark 1.5-2.0 compile("org.apache.parquet:parquet-hadoop:1.7.0") diff --git a/h2o-parsers/h2o-parquet-parser/src/main/java/org/apache/parquet/hadoop/VecParquetReader.java b/h2o-parsers/h2o-parquet-parser/src/main/java/org/apache/parquet/hadoop/VecParquetReader.java index 0c3b439c6cb..976cb9f72ee 100644 --- a/h2o-parsers/h2o-parquet-parser/src/main/java/org/apache/parquet/hadoop/VecParquetReader.java +++ b/h2o-parsers/h2o-parquet-parser/src/main/java/org/apache/parquet/hadoop/VecParquetReader.java @@ -36,13 +36,14 @@ import water.fvec.Vec; import water.parser.ParseWriter; import water.parser.parquet.ChunkReadSupport; -import water.parser.parquet.VecDataInputStream; -import water.parser.parquet.VecFileSystem; +import water.persist.VecDataInputStream; import water.util.Log; import static org.apache.parquet.bytes.BytesUtils.readIntLittleEndian; import static org.apache.parquet.hadoop.ParquetFileWriter.MAGIC; +import water.persist.VecFileSystem; + /** * Implementation of Parquet Reader working on H2O's Vecs. 
* diff --git a/h2o-parsers/h2o-parquet-parser/src/test/java/water/parser/parquet/ParseTestParquet.java b/h2o-parsers/h2o-parquet-parser/src/test/java/water/parser/parquet/ParseTestParquet.java index f35c8ca0741..90941c4e790 100644 --- a/h2o-parsers/h2o-parquet-parser/src/test/java/water/parser/parquet/ParseTestParquet.java +++ b/h2o-parsers/h2o-parquet-parser/src/test/java/water/parser/parquet/ParseTestParquet.java @@ -42,7 +42,7 @@ private static double EPSILON = 1e-9; @BeforeClass - static public void setup() { TestUtil.stall_till_cloudsize(5); } + static public void setup() { TestUtil.stall_till_cloudsize(1); } @Test public void testParseSimple() { diff --git a/h2o-parsers/h2o-parquet-parser/src/test/java/water/parser/parquet/VecDataInputStreamTest.java b/h2o-parsers/h2o-parquet-parser/src/test/java/water/parser/parquet/VecDataInputStreamTest.java index 98ca24b719c..1751bc58300 100644 --- a/h2o-parsers/h2o-parquet-parser/src/test/java/water/parser/parquet/VecDataInputStreamTest.java +++ b/h2o-parsers/h2o-parquet-parser/src/test/java/water/parser/parquet/VecDataInputStreamTest.java @@ -7,6 +7,7 @@ import water.MRTask; import water.TestUtil; import water.fvec.*; +import water.persist.VecDataInputStream; import java.io.ByteArrayInputStream; import java.io.IOException; diff --git a/h2o-parsers/h2o-parquet-parser/testMultiNode.sh b/h2o-parsers/h2o-parquet-parser/testMultiNode.sh index 9d7c001c289..b7048d7380a 100755 --- a/h2o-parsers/h2o-parquet-parser/testMultiNode.sh +++ b/h2o-parsers/h2o-parquet-parser/testMultiNode.sh @@ -44,7 +44,7 @@ fi # build/classes/main - Main h2o core classes # build/classes/test - Test h2o core classes # build/resources/main - Main resources (e.g. 
page.html) -JVM="nice $JAVA_CMD -DcloudSize=5 -ea -Xmx3g -Xms3g -cp build/libs/h2o-parquet-parser-test.jar${SEP}build/libs/h2o-parquet-parser.jar${SEP}../../h2o-core/build/libs/h2o-core-test.jar${SEP}../../h2o-core/build/libs/h2o-core.jar${SEP}../../h2o-genmodel/build/libs/h2o-genmodel.jar${SEP}../../lib/*" +JVM="nice $JAVA_CMD -DcloudSize=5 -ea -Xmx3g -Xms3g -cp build/libs/h2o-parquet-parser-test.jar${SEP}build/libs/h2o-parquet-parser.jar${SEP}../../h2o-core/build/libs/h2o-core-test.jar${SEP}../../h2o-core/build/libs/h2o-core.jar${SEP}../../h2o-genmodel/build/libs/h2o-genmodel.jar${SEP}../../h2o-persist-hdfs/build/libs/h2o-persist-hdfs.jar${SEP}../../lib/*" echo "$JVM" > $OUTDIR/jvm_cmd.txt # Ahhh... but the makefile runs the tests skipping the jar'ing step when possible. # Also, sometimes see test files in the main-class directory, so put the test diff --git a/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/VecDataInputStream.java b/h2o-persist-hdfs/src/main/java/water/persist/VecDataInputStream.java similarity index 99% rename from h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/VecDataInputStream.java rename to h2o-persist-hdfs/src/main/java/water/persist/VecDataInputStream.java index 9fe2975e339..cc489b5aa72 100644 --- a/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/VecDataInputStream.java +++ b/h2o-persist-hdfs/src/main/java/water/persist/VecDataInputStream.java @@ -1,4 +1,4 @@ -package water.parser.parquet; +package water.persist; import org.apache.hadoop.fs.PositionedReadable; import org.apache.hadoop.fs.Seekable; diff --git a/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/VecFileSystem.java b/h2o-persist-hdfs/src/main/java/water/persist/VecFileSystem.java similarity index 99% rename from h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/VecFileSystem.java rename to h2o-persist-hdfs/src/main/java/water/persist/VecFileSystem.java index e6b3cf05ccb..9fa519c32ee 100644 
--- a/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/VecFileSystem.java +++ b/h2o-persist-hdfs/src/main/java/water/persist/VecFileSystem.java @@ -1,4 +1,4 @@ -package water.parser.parquet; +package water.persist; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*;