From 0564b113a4854757e70c8dcd59cbefa65e5aa8cb Mon Sep 17 00:00:00 2001 From: Tomas Nykodym Date: Wed, 21 Jun 2017 15:08:48 -0700 Subject: [PATCH 1/9] Made ORC use VecFileSystem.VecInputStream from Parquet implementation. VecFileSystem/InputStream refactored to h2o-persist-hdfs project. --- .../src/main/java/water/parser/ParseDataset.java | 17 ++++---------- .../src/main/java/water/parser/ParserInfo.java | 23 ++++++++++++++++++ .../java/water/parser/orc/OrcParserProvider.java | 27 ++++++++++++++++++---- h2o-parsers/h2o-parquet-parser/build.gradle | 3 +++ .../apache/parquet/hadoop/VecParquetReader.java | 5 ++-- .../water/parser/parquet/ParseTestParquet.java | 2 +- .../parser/parquet/VecDataInputStreamTest.java | 1 + .../java/water/persist}/VecDataInputStream.java | 2 +- .../main/java/water/persist}/VecFileSystem.java | 2 +- 9 files changed, 59 insertions(+), 23 deletions(-) rename {h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet => h2o-persist-hdfs/src/main/java/water/persist}/VecDataInputStream.java (99%) rename {h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet => h2o-persist-hdfs/src/main/java/water/persist}/VecFileSystem.java (99%) diff --git a/h2o-core/src/main/java/water/parser/ParseDataset.java b/h2o-core/src/main/java/water/parser/ParseDataset.java index 12a258433ed..a98c469169f 100644 --- a/h2o-core/src/main/java/water/parser/ParseDataset.java +++ b/h2o-core/src/main/java/water/parser/ParseDataset.java @@ -570,13 +570,6 @@ protected void compute() { // files are parsed in parallel across the cluster), but we want to throttle // the parallelism on each node. private static class MultiFileParseTask extends MRTask { - // TOO_MANY_KEYS_COUNT specifies when to disable parallel parse. We want to cover a scenario when - // we are working with too many keys made of small files - in this case the distributed parse - // doesn't work well because of the way chunks are distributed to nodes. 
We should switch to a local - // parse to make sure the work is uniformly distributed across the whole cluster. - private static final int TOO_MANY_KEYS_COUNT = 128; - // A file is considered to be small if it can fit into number of chunks. - private static final int SMALL_FILE_NCHUNKS = 10; private final ParseSetup _parseSetup; // The expected column layout private final VectorGroup _vg; // vector group of the target dataset @@ -729,21 +722,19 @@ private FVecParseWriter makeDout(ParseSetup localSetup, int chunkOff, int nchunk try { switch( cpr ) { case NONE: - boolean disableParallelParse = localSetup.disableParallelParse || (_keys.length > TOO_MANY_KEYS_COUNT) && - (vec.nChunks() <= SMALL_FILE_NCHUNKS) && _parseSetup._parse_type.isStreamParseSupported(); - if( _parseSetup._parse_type.isParallelParseSupported() && (! disableParallelParse)) { + ParserInfo.ParseMethod pm = _parseSetup._parse_type.parseMethod(_keys.length,vec.nChunks()); + if(pm == ParserInfo.ParseMethod.DistributesParse) { new DistributedParse(_vg, localSetup, _vecIdStart, chunkStartIdx, this, key, vec.nChunks()).dfork(vec).getResult(false); for( int i = 0; i < vec.nChunks(); ++i ) _chunk2ParseNodeMap[chunkStartIdx + i] = vec.chunkKey(i).home_node().index(); - } else { + } else if(pm == ParserInfo.ParseMethod.StreamParse){ localSetup = ParserService.INSTANCE.getByInfo(localSetup._parse_type).setupLocal(vec,localSetup); InputStream bvs = vec.openStream(_jobKey); Parser p = localSetup.parser(_jobKey); _dout[_lo] = ((FVecParseWriter) p.streamParse(bvs,makeDout(localSetup,chunkStartIdx,vec.nChunks()))).close(_fs); _errors = _dout[_lo].removeErrors(); chunksAreLocal(vec,chunkStartIdx,key); - - } + } else throw H2O.unimpl(); break; case ZIP: { localSetup = ParserService.INSTANCE.getByInfo(localSetup._parse_type).setupLocal(vec,localSetup); diff --git a/h2o-core/src/main/java/water/parser/ParserInfo.java b/h2o-core/src/main/java/water/parser/ParserInfo.java index d672acf1fec..8b18bbad70c 100644 --- 
a/h2o-core/src/main/java/water/parser/ParserInfo.java +++ b/h2o-core/src/main/java/water/parser/ParserInfo.java @@ -1,5 +1,6 @@ package water.parser; +import water.H2O; import water.Iced; /** @@ -43,6 +44,28 @@ public int priority() { return prior; } + // TOO_MANY_KEYS_COUNT specifies when to disable parallel parse. We want to cover a scenario when + // we are working with too many keys made of small files - in this case the distributed parse + // doesn't work well because of the way chunks are distributed to nodes. We should switch to a local + // parse to make sure the work is uniformly distributed across the whole cluster. + public static final int TOO_MANY_KEYS_COUNT = 128; + // A file is considered to be small if it can fit into number of chunks. + public static final int SMALL_FILE_NCHUNKS = 10; + + public enum ParseMethod {StreamParse,DistributesParse} + /* + localSetup.disableParallelParse || + */ + public ParseMethod parseMethod(int nfiles, int nchunks){ + if(isStreamParseSupported()) { + if (!isParallelParseSupported() || (nfiles > TOO_MANY_KEYS_COUNT && (nchunks <= SMALL_FILE_NCHUNKS))) + return ParseMethod.StreamParse; + } + if(isParallelParseSupported()) + return ParseMethod.DistributesParse; + throw H2O.unimpl(); + } + /** Does the parser support parallel parse? 
*/ public boolean isParallelParseSupported() { return isParallelParseSupported; diff --git a/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParserProvider.java b/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParserProvider.java index 6c4b0b34292..0a4fcad55de 100644 --- a/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParserProvider.java +++ b/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParserProvider.java @@ -1,6 +1,7 @@ package water.parser.orc; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.Reader; @@ -13,6 +14,7 @@ import water.fvec.*; import water.parser.*; import water.persist.PersistHdfs; +import water.persist.VecFileSystem; import java.io.IOException; import java.util.List; @@ -25,8 +27,25 @@ */ public class OrcParserProvider extends ParserProvider { + public static class OrParserInfo extends ParserInfo { + + public OrParserInfo() { + super("ORC", DefaultParserProviders.MAX_CORE_PRIO + 20, true, true, false); + } + + public ParseMethod parseMethod(int nfiles, int nchunks){ + int ncores_tot = H2O.NUMCPUS*H2O.CLOUD.size(); + // prefer StreamParse if we have enough files to keep cluster busy + // ORC stream parse is more efficient + return + nfiles >= ncores_tot // got enough files to keep cluster busy + || nchunks <= 4 // there is not much parallelization anyways, better to save unnecessary memory loads + || (nfiles >= ncores_tot >> 1) && nchunks <= H2O.NUMCPUS + ?ParseMethod.StreamParse:ParseMethod.StreamParse;//ParseMethod.DistributesParse; + } + } /* Setup for this parser */ - static ParserInfo ORC_INFO = new ParserInfo("ORC", DefaultParserProviders.MAX_CORE_PRIO + 20, true); + static ParserInfo ORC_INFO = new OrParserInfo(); @Override public ParserInfo info() { @@ -70,10 +89,8 @@ public ParseSetup createParserSetup(Key[] inputs, 
ParseSetup requiredSetup) { private Reader getReader(FileVec f) throws IOException { String strPath = getPathForKey(f._key); Path path = new Path(strPath); - if(f instanceof HDFSFileVec) - return OrcFile.createReader(PersistHdfs.getFS(strPath), path); - else - return OrcFile.createReader(path, OrcFile.readerOptions(new Configuration())); + Configuration conf = VecFileSystem.makeConfiguration(f); + return OrcFile.createReader(VecFileSystem.get(conf), path); } /** diff --git a/h2o-parsers/h2o-parquet-parser/build.gradle b/h2o-parsers/h2o-parquet-parser/build.gradle index 5ea91f75f8a..9b64bf8cb78 100644 --- a/h2o-parsers/h2o-parquet-parser/build.gradle +++ b/h2o-parsers/h2o-parquet-parser/build.gradle @@ -8,6 +8,9 @@ def parquetHadoopVersion = binding.variables.get("hadoopVersion") ? dependencies { compile project(":h2o-core") + compile(project(":h2o-persist-hdfs")) { + transitive = false + } // Parquet support // Use the same version as Spark 1.5-2.0 compile("org.apache.parquet:parquet-hadoop:1.7.0") diff --git a/h2o-parsers/h2o-parquet-parser/src/main/java/org/apache/parquet/hadoop/VecParquetReader.java b/h2o-parsers/h2o-parquet-parser/src/main/java/org/apache/parquet/hadoop/VecParquetReader.java index 0c3b439c6cb..976cb9f72ee 100644 --- a/h2o-parsers/h2o-parquet-parser/src/main/java/org/apache/parquet/hadoop/VecParquetReader.java +++ b/h2o-parsers/h2o-parquet-parser/src/main/java/org/apache/parquet/hadoop/VecParquetReader.java @@ -36,13 +36,14 @@ import water.fvec.Vec; import water.parser.ParseWriter; import water.parser.parquet.ChunkReadSupport; -import water.parser.parquet.VecDataInputStream; -import water.parser.parquet.VecFileSystem; +import water.persist.VecDataInputStream; import water.util.Log; import static org.apache.parquet.bytes.BytesUtils.readIntLittleEndian; import static org.apache.parquet.hadoop.ParquetFileWriter.MAGIC; +import water.persist.VecFileSystem; + /** * Implementation of Parquet Reader working on H2O's Vecs. 
* diff --git a/h2o-parsers/h2o-parquet-parser/src/test/java/water/parser/parquet/ParseTestParquet.java b/h2o-parsers/h2o-parquet-parser/src/test/java/water/parser/parquet/ParseTestParquet.java index f35c8ca0741..90941c4e790 100644 --- a/h2o-parsers/h2o-parquet-parser/src/test/java/water/parser/parquet/ParseTestParquet.java +++ b/h2o-parsers/h2o-parquet-parser/src/test/java/water/parser/parquet/ParseTestParquet.java @@ -42,7 +42,7 @@ private static double EPSILON = 1e-9; @BeforeClass - static public void setup() { TestUtil.stall_till_cloudsize(5); } + static public void setup() { TestUtil.stall_till_cloudsize(1); } @Test public void testParseSimple() { diff --git a/h2o-parsers/h2o-parquet-parser/src/test/java/water/parser/parquet/VecDataInputStreamTest.java b/h2o-parsers/h2o-parquet-parser/src/test/java/water/parser/parquet/VecDataInputStreamTest.java index 98ca24b719c..1751bc58300 100644 --- a/h2o-parsers/h2o-parquet-parser/src/test/java/water/parser/parquet/VecDataInputStreamTest.java +++ b/h2o-parsers/h2o-parquet-parser/src/test/java/water/parser/parquet/VecDataInputStreamTest.java @@ -7,6 +7,7 @@ import water.MRTask; import water.TestUtil; import water.fvec.*; +import water.persist.VecDataInputStream; import java.io.ByteArrayInputStream; import java.io.IOException; diff --git a/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/VecDataInputStream.java b/h2o-persist-hdfs/src/main/java/water/persist/VecDataInputStream.java similarity index 99% rename from h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/VecDataInputStream.java rename to h2o-persist-hdfs/src/main/java/water/persist/VecDataInputStream.java index 9fe2975e339..cc489b5aa72 100644 --- a/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/VecDataInputStream.java +++ b/h2o-persist-hdfs/src/main/java/water/persist/VecDataInputStream.java @@ -1,4 +1,4 @@ -package water.parser.parquet; +package water.persist; import org.apache.hadoop.fs.PositionedReadable; import 
org.apache.hadoop.fs.Seekable; diff --git a/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/VecFileSystem.java b/h2o-persist-hdfs/src/main/java/water/persist/VecFileSystem.java similarity index 99% rename from h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/VecFileSystem.java rename to h2o-persist-hdfs/src/main/java/water/persist/VecFileSystem.java index e6b3cf05ccb..9fa519c32ee 100644 --- a/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/VecFileSystem.java +++ b/h2o-persist-hdfs/src/main/java/water/persist/VecFileSystem.java @@ -1,4 +1,4 @@ -package water.parser.parquet; +package water.persist; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; From 59207c7b7affccad6c974a7526167d66aada452f Mon Sep 17 00:00:00 2001 From: Tomas Nykodym Date: Wed, 21 Jun 2017 15:11:50 -0700 Subject: [PATCH 2/9] simplified orc parse method selection --- .../src/main/java/water/parser/orc/OrcParserProvider.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParserProvider.java b/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParserProvider.java index 0a4fcad55de..54aeff4ce8a 100644 --- a/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParserProvider.java +++ b/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParserProvider.java @@ -38,9 +38,7 @@ public ParseMethod parseMethod(int nfiles, int nchunks){ // prefer StreamParse if we have enough files to keep cluster busy // ORC stream parse is more efficient return - nfiles >= ncores_tot // got enough files to keep cluster busy - || nchunks <= 4 // there is not much parallelization anyways, better to save unnecessary memory loads - || (nfiles >= ncores_tot >> 1) && nchunks <= H2O.NUMCPUS + nfiles >= (ncores_tot >> 1) // got enough files to keep cluster busy ?ParseMethod.StreamParse:ParseMethod.StreamParse;//ParseMethod.DistributesParse; } } 
From f7ef88bcee086a8072c9a315354eeddf63f3107a Mon Sep 17 00:00:00 2001 From: Tomas Nykodym Date: Wed, 21 Jun 2017 15:35:55 -0700 Subject: [PATCH 3/9] Fixed typo in OrcParserInfo class name. --- .../src/main/java/water/parser/orc/OrcParserProvider.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParserProvider.java b/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParserProvider.java index 54aeff4ce8a..f78f2a53725 100644 --- a/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParserProvider.java +++ b/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParserProvider.java @@ -1,7 +1,6 @@ package water.parser.orc; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.Reader; @@ -13,7 +12,6 @@ import water.Key; import water.fvec.*; import water.parser.*; -import water.persist.PersistHdfs; import water.persist.VecFileSystem; import java.io.IOException; @@ -27,9 +25,9 @@ */ public class OrcParserProvider extends ParserProvider { - public static class OrParserInfo extends ParserInfo { + public static class OrcParserInfo extends ParserInfo { - public OrParserInfo() { + public OrcParserInfo() { super("ORC", DefaultParserProviders.MAX_CORE_PRIO + 20, true, true, false); } @@ -43,7 +41,7 @@ public ParseMethod parseMethod(int nfiles, int nchunks){ } } /* Setup for this parser */ - static ParserInfo ORC_INFO = new OrParserInfo(); + static ParserInfo ORC_INFO = new OrcParserInfo(); @Override public ParserInfo info() { From b7e00156acfabc798ae7c05ef8f741222e487908 Mon Sep 17 00:00:00 2001 From: Tomas Nykodym Date: Wed, 21 Jun 2017 15:36:28 -0700 Subject: [PATCH 4/9] Enable Orc to ignore (store as all NA) column based on provided user type --- .../src/main/java/water/parser/orc/OrcParserProvider.java | 2 
+- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParserProvider.java b/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParserProvider.java index f78f2a53725..a869a34fd04 100644 --- a/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParserProvider.java +++ b/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParserProvider.java @@ -116,7 +116,7 @@ public ParseSetup readSetup(FileVec f, String[] columnNames, byte[] columnTypes) byte[] old_columnTypes = stp.getColumnTypes(); String[] old_columnTypeNames = stp.getColumnTypesString(); for (int index = 0; index < columnTypes.length; index++) { - if (columnTypes[index] == Vec.T_CAT) // only copy the enum types + if (columnTypes[index] == Vec.T_CAT || columnTypes[index] == Vec.T_BAD) // only copy the enum types old_columnTypes[index] = columnTypes[index]; } stp.setColumnTypes(old_columnTypes); From 457a58092149d6102a0687b36c5ab70e776ef54e Mon Sep 17 00:00:00 2001 From: Tomas Nykodym Date: Wed, 21 Jun 2017 16:41:31 -0700 Subject: [PATCH 5/9] ORC update: Allow user to force column to all NAs using parse setup. Minor typo fix. 
--- .../src/main/java/water/parser/FVecParseWriter.java | 6 ++++++ h2o-core/src/main/java/water/parser/ParseDataset.java | 2 +- h2o-core/src/main/java/water/parser/ParseWriter.java | 1 + h2o-core/src/main/java/water/parser/ParserInfo.java | 4 ++-- .../src/main/java/water/parser/PreviewParseWriter.java | 6 ++++++ .../src/main/java/water/parser/orc/OrcParser.java | 8 +++++++- .../main/java/water/parser/orc/OrcParserProvider.java | 2 +- .../src/test/java/water/parser/ParseTestOrc.java | 17 +++++++++++++++++ 8 files changed, 41 insertions(+), 5 deletions(-) diff --git a/h2o-core/src/main/java/water/parser/FVecParseWriter.java b/h2o-core/src/main/java/water/parser/FVecParseWriter.java index 468a3280ccf..1d21bd56cf3 100644 --- a/h2o-core/src/main/java/water/parser/FVecParseWriter.java +++ b/h2o-core/src/main/java/water/parser/FVecParseWriter.java @@ -96,6 +96,12 @@ public FVecParseWriter(Vec.VectorGroup vg, int cidx, Categorical[] categoricals, @Override public final void addInvalidCol(int colIdx) { if(colIdx < _nCols) _nvs[_col = colIdx].addNA(); } + + @Override + public void setInvalidCol(int colIdx, int nrows) { + (_nvs[colIdx] = _vecs[colIdx].chunkForChunkIdx(_cidx)).addNAs(nrows); + } + @Override public boolean isString(int colIdx) { return (colIdx < _nCols) && (_ctypes[colIdx] == Vec.T_CAT || _ctypes[colIdx] == Vec.T_STR);} @Override public void addStrCol(int colIdx, BufferedString str) { diff --git a/h2o-core/src/main/java/water/parser/ParseDataset.java b/h2o-core/src/main/java/water/parser/ParseDataset.java index a98c469169f..711efddb3ae 100644 --- a/h2o-core/src/main/java/water/parser/ParseDataset.java +++ b/h2o-core/src/main/java/water/parser/ParseDataset.java @@ -723,7 +723,7 @@ private FVecParseWriter makeDout(ParseSetup localSetup, int chunkOff, int nchunk switch( cpr ) { case NONE: ParserInfo.ParseMethod pm = _parseSetup._parse_type.parseMethod(_keys.length,vec.nChunks()); - if(pm == ParserInfo.ParseMethod.DistributesParse) { + if(pm == 
ParserInfo.ParseMethod.DistributedParse) { new DistributedParse(_vg, localSetup, _vecIdStart, chunkStartIdx, this, key, vec.nChunks()).dfork(vec).getResult(false); for( int i = 0; i < vec.nChunks(); ++i ) _chunk2ParseNodeMap[chunkStartIdx + i] = vec.chunkKey(i).home_node().index(); diff --git a/h2o-core/src/main/java/water/parser/ParseWriter.java b/h2o-core/src/main/java/water/parser/ParseWriter.java index c00f76e382f..c1ba91e35eb 100644 --- a/h2o-core/src/main/java/water/parser/ParseWriter.java +++ b/h2o-core/src/main/java/water/parser/ParseWriter.java @@ -39,6 +39,7 @@ public String toString(){ void addNumCol(int colIdx, double d); // An an invalid / missing entry void addInvalidCol(int colIdx); + void setInvalidCol(int colIdx, int nrow); // Add a String column void addStrCol( int colIdx, BufferedString str ); // Final rolling back of partial line diff --git a/h2o-core/src/main/java/water/parser/ParserInfo.java b/h2o-core/src/main/java/water/parser/ParserInfo.java index 8b18bbad70c..e9e5ef476eb 100644 --- a/h2o-core/src/main/java/water/parser/ParserInfo.java +++ b/h2o-core/src/main/java/water/parser/ParserInfo.java @@ -52,7 +52,7 @@ public int priority() { // A file is considered to be small if it can fit into number of chunks. 
public static final int SMALL_FILE_NCHUNKS = 10; - public enum ParseMethod {StreamParse,DistributesParse} + public enum ParseMethod {StreamParse, DistributedParse} /* localSetup.disableParallelParse || */ @@ -62,7 +62,7 @@ public ParseMethod parseMethod(int nfiles, int nchunks){ return ParseMethod.StreamParse; } if(isParallelParseSupported()) - return ParseMethod.DistributesParse; + return ParseMethod.DistributedParse; throw H2O.unimpl(); } diff --git a/h2o-core/src/main/java/water/parser/PreviewParseWriter.java b/h2o-core/src/main/java/water/parser/PreviewParseWriter.java index 0d5f604ede1..c6d31512494 100644 --- a/h2o-core/src/main/java/water/parser/PreviewParseWriter.java +++ b/h2o-core/src/main/java/water/parser/PreviewParseWriter.java @@ -98,6 +98,12 @@ private void setColumnCount(int n) { _data[_nlines][colIdx] = "NA"; } } + + @Override + public void setInvalidCol(int colIdx, int nrow) { + throw H2O.unimpl(); + } + @Override public void addStrCol(int colIdx, BufferedString str) { if(colIdx < _ncols) { // Check for time diff --git a/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParser.java b/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParser.java index dd2dce487f3..69f2497985b 100644 --- a/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParser.java +++ b/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParser.java @@ -132,12 +132,18 @@ protected final ParseWriter parseChunk(int chunkId, ParseReader din, ParseWriter int colIndex = 0; for (int col = 0; col < batch.numCols; ++col) { // read one column at a time; if (toInclude[col + 1]) { // only write a column if we actually want it - write1column(dataVectors[col], orcTypes[colIndex], colIndex, nrows, dout); + if(_setup.getColumnTypes()[col] != Vec.T_BAD) + write1column(dataVectors[col], orcTypes[colIndex], colIndex, nrows, dout); colIndex++; } } rows += currentBatchRow; // record number of rows of data actually read } + byte [] col_types = 
_setup.getColumnTypes(); + for(int i = 0; i < col_types.length; ++i){ + if(col_types[i] == Vec.T_BAD) + dout.setInvalidCol(i,(int)rowCount); + } perStripe.close(); } catch(IOException ioe) { throw new RuntimeException(ioe); diff --git a/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParserProvider.java b/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParserProvider.java index a869a34fd04..941bdf9d76a 100644 --- a/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParserProvider.java +++ b/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParserProvider.java @@ -37,7 +37,7 @@ public ParseMethod parseMethod(int nfiles, int nchunks){ // ORC stream parse is more efficient return nfiles >= (ncores_tot >> 1) // got enough files to keep cluster busy - ?ParseMethod.StreamParse:ParseMethod.StreamParse;//ParseMethod.DistributesParse; + ?ParseMethod.StreamParse:ParseMethod.StreamParse;//ParseMethod.DistributedParse; } } /* Setup for this parser */ diff --git a/h2o-parsers/h2o-orc-parser/src/test/java/water/parser/ParseTestOrc.java b/h2o-parsers/h2o-orc-parser/src/test/java/water/parser/ParseTestOrc.java index 53c0811970e..a7f50be37c2 100644 --- a/h2o-parsers/h2o-orc-parser/src/test/java/water/parser/ParseTestOrc.java +++ b/h2o-parsers/h2o-orc-parser/src/test/java/water/parser/ParseTestOrc.java @@ -1,5 +1,6 @@ package water.parser; +import org.junit.Assert; import org.junit.Assume; import org.junit.BeforeClass; import org.junit.Test; @@ -9,8 +10,12 @@ import java.util.Set; import java.util.TreeSet; +import water.Key; import water.TestUtil; +import water.api.schemas3.ParseSetupV3; import water.fvec.Frame; +import water.fvec.NFSFileVec; +import water.fvec.Vec; import water.util.FileUtils; import water.util.Log; @@ -77,6 +82,18 @@ static public void _preconditionJavaVersion() { // NOTE: the `_` force execution Assume.assumeTrue("Java6 is not supported", !System.getProperty("java.version", "NA").startsWith("1.6")); } + @Test public 
void testBadColumn(){ + NFSFileVec nfs = makeNfsFileVec("smalldata/parser/orc/orc_split_elim.orc"); + Key outputKey = Key.make("orc_Test"); + ParseSetup ps = ParseSetup.guessSetup(new Key[]{nfs._key}, new ParseSetup(new ParseSetupV3())); + System.out.println("ParseSetup"); + System.out.println(ps); + ps._column_types[0] = Vec.T_BAD; + Frame fr = ParseDataset.parse(outputKey, new Key[]{nfs._key},true,ps); + Assert.assertTrue(fr.vec(0).isBad()); + fr.delete(); + } + @Test public void testParseAllOrcs() { Set failedFiles = new TreeSet<>(); From 84ff20eccffbdb4f387c03120437068dd0c99f03 Mon Sep 17 00:00:00 2001 From: Tomas Nykodym Date: Fri, 30 Jun 2017 18:33:26 -0700 Subject: [PATCH 6/9] Fixed ORC bad column handling. --- h2o-core/src/main/java/water/parser/FVecParseWriter.java | 2 +- h2o-core/src/main/java/water/parser/ParseWriter.java | 2 +- h2o-core/src/main/java/water/parser/PreviewParseWriter.java | 2 +- .../h2o-orc-parser/src/main/java/water/parser/orc/OrcParser.java | 3 ++- .../src/main/java/water/parser/orc/OrcParserProvider.java | 1 - .../h2o-orc-parser/src/test/java/water/parser/ParseTestOrc.java | 6 +++++- 6 files changed, 10 insertions(+), 6 deletions(-) diff --git a/h2o-core/src/main/java/water/parser/FVecParseWriter.java b/h2o-core/src/main/java/water/parser/FVecParseWriter.java index 1d21bd56cf3..c0031005553 100644 --- a/h2o-core/src/main/java/water/parser/FVecParseWriter.java +++ b/h2o-core/src/main/java/water/parser/FVecParseWriter.java @@ -98,7 +98,7 @@ public FVecParseWriter(Vec.VectorGroup vg, int cidx, Categorical[] categoricals, } @Override - public void setInvalidCol(int colIdx, int nrows) { + public void addNAs(int colIdx, int nrows) { (_nvs[colIdx] = _vecs[colIdx].chunkForChunkIdx(_cidx)).addNAs(nrows); } diff --git a/h2o-core/src/main/java/water/parser/ParseWriter.java b/h2o-core/src/main/java/water/parser/ParseWriter.java index c1ba91e35eb..ae15e0ed72e 100644 --- a/h2o-core/src/main/java/water/parser/ParseWriter.java +++ 
b/h2o-core/src/main/java/water/parser/ParseWriter.java @@ -39,7 +39,7 @@ public String toString(){ void addNumCol(int colIdx, double d); // An an invalid / missing entry void addInvalidCol(int colIdx); - void setInvalidCol(int colIdx, int nrow); + void addNAs(int colIdx, int nrow); // Add a String column void addStrCol( int colIdx, BufferedString str ); // Final rolling back of partial line diff --git a/h2o-core/src/main/java/water/parser/PreviewParseWriter.java b/h2o-core/src/main/java/water/parser/PreviewParseWriter.java index c6d31512494..e0db08e3845 100644 --- a/h2o-core/src/main/java/water/parser/PreviewParseWriter.java +++ b/h2o-core/src/main/java/water/parser/PreviewParseWriter.java @@ -100,7 +100,7 @@ private void setColumnCount(int n) { } @Override - public void setInvalidCol(int colIdx, int nrow) { + public void addNAs(int colIdx, int nrow) { throw H2O.unimpl(); } diff --git a/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParser.java b/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParser.java index 69f2497985b..ff206211f2b 100644 --- a/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParser.java +++ b/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParser.java @@ -134,6 +134,7 @@ protected final ParseWriter parseChunk(int chunkId, ParseReader din, ParseWriter if (toInclude[col + 1]) { // only write a column if we actually want it if(_setup.getColumnTypes()[col] != Vec.T_BAD) write1column(dataVectors[col], orcTypes[colIndex], colIndex, nrows, dout); + else dout.addNAs(col,nrows); colIndex++; } } @@ -142,7 +143,7 @@ protected final ParseWriter parseChunk(int chunkId, ParseReader din, ParseWriter byte [] col_types = _setup.getColumnTypes(); for(int i = 0; i < col_types.length; ++i){ if(col_types[i] == Vec.T_BAD) - dout.setInvalidCol(i,(int)rowCount); + dout.addNAs(i,(int)rowCount); } perStripe.close(); } catch(IOException ioe) { diff --git 
a/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParserProvider.java b/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParserProvider.java index 941bdf9d76a..3352f6032a7 100644 --- a/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParserProvider.java +++ b/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParserProvider.java @@ -142,7 +142,6 @@ public ParseSetup setupLocal(Vec v, ParseSetup setup){ if(!(v instanceof FileVec)) throw H2O.unimpl("ORC only implemented for HDFS / NFS files"); try { ((OrcParser.OrcParseSetup)setup).setOrcFileReader(getReader((FileVec)v)); - return setup; } catch (IOException e) {throw new RuntimeException(e);} diff --git a/h2o-parsers/h2o-orc-parser/src/test/java/water/parser/ParseTestOrc.java b/h2o-parsers/h2o-orc-parser/src/test/java/water/parser/ParseTestOrc.java index a7f50be37c2..844be61c706 100644 --- a/h2o-parsers/h2o-orc-parser/src/test/java/water/parser/ParseTestOrc.java +++ b/h2o-parsers/h2o-orc-parser/src/test/java/water/parser/ParseTestOrc.java @@ -16,6 +16,7 @@ import water.fvec.Frame; import water.fvec.NFSFileVec; import water.fvec.Vec; +import water.parser.orc.OrcParserProvider; import water.util.FileUtils; import water.util.Log; @@ -85,7 +86,10 @@ static public void _preconditionJavaVersion() { // NOTE: the `_` force execution @Test public void testBadColumn(){ NFSFileVec nfs = makeNfsFileVec("smalldata/parser/orc/orc_split_elim.orc"); Key outputKey = Key.make("orc_Test"); - ParseSetup ps = ParseSetup.guessSetup(new Key[]{nfs._key}, new ParseSetup(new ParseSetupV3())); + ParseSetup pstp = new ParseSetup(new ParseSetupV3()); + pstp._parse_type = new OrcParserProvider.OrcParserInfo(); + ParseSetup ps = ParseSetup.guessSetup(new Key[]{nfs._key}, pstp); + Assert.assertEquals(ps._parse_type.name(), "ORC"); System.out.println("ParseSetup"); System.out.println(ps); ps._column_types[0] = Vec.T_BAD; From 9209ad72eaf0d704190066053db9a6df75b3c69e Mon Sep 17 00:00:00 2001 From: 
mmalohlava Date: Wed, 5 Jul 2017 17:11:21 -0700 Subject: [PATCH 7/9] Update test script file. --- h2o-parsers/h2o-orc-parser/testMultiNode.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/h2o-parsers/h2o-orc-parser/testMultiNode.sh b/h2o-parsers/h2o-orc-parser/testMultiNode.sh index d77f28af07b..2e0514b7140 100755 --- a/h2o-parsers/h2o-orc-parser/testMultiNode.sh +++ b/h2o-parsers/h2o-orc-parser/testMultiNode.sh @@ -65,7 +65,7 @@ fi # build/classes/main - Main h2o core classes # build/classes/test - Test h2o core classes # build/resources/main - Main resources (e.g. page.html) -JVM="nice $JAVA_CMD -DcloudSize=5 -ea $COVERAGE -Xmx${MAX_MEM} -Xms${MAX_MEM} -cp build/libs/h2o-orc-parser-test.jar${SEP}build/libs/h2o-orc-parser.jar${SEP}../../h2o-core/build/libs/h2o-core-test.jar${SEP}../../h2o-core/build/libs/h2o-core.jar${SEP}../../h2o-genmodel/build/libs/h2o-genmodel.jar${SEP}../../lib/*" +JVM="nice $JAVA_CMD -DcloudSize=5 -ea $COVERAGE -Xmx${MAX_MEM} -Xms${MAX_MEM} -cp build/libs/h2o-orc-parser-test.jar${SEP}build/libs/h2o-orc-parser.jar${SEP}../../h2o-core/build/libs/h2o-core-test.jar${SEP}../../h2o-core/build/libs/h2o-core.jar${SEP}../../h2o-genmodel/build/libs/h2o-genmodel.jar${SEP}../../h2o-persist-hdfs/build/libs/h2o-persist-hdfs.jar${SEP}../../lib/*" echo "$JVM" > $OUTDIR/jvm_cmd.txt # Ahhh... but the makefile runs the tests skipping the jar'ing step when possible. From a26716a8effd0babe46e254ad0740ed906c38d33 Mon Sep 17 00:00:00 2001 From: Tomas Nykodym Date: Thu, 6 Jul 2017 13:04:47 -0700 Subject: [PATCH 8/9] Fixed orc bug - use correct index when checking the column type. 
--- .../h2o-orc-parser/src/main/java/water/parser/orc/OrcParser.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParser.java b/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParser.java index ff206211f2b..06b5af2806f 100644 --- a/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParser.java +++ b/h2o-parsers/h2o-orc-parser/src/main/java/water/parser/orc/OrcParser.java @@ -132,7 +132,7 @@ protected final ParseWriter parseChunk(int chunkId, ParseReader din, ParseWriter int colIndex = 0; for (int col = 0; col < batch.numCols; ++col) { // read one column at a time; if (toInclude[col + 1]) { // only write a column if we actually want it - if(_setup.getColumnTypes()[col] != Vec.T_BAD) + if(_setup.getColumnTypes()[colIndex] != Vec.T_BAD) write1column(dataVectors[col], orcTypes[colIndex], colIndex, nrows, dout); else dout.addNAs(col,nrows); colIndex++; From 826c8d60d20edcae7aed304d0ed12186ffeea28c Mon Sep 17 00:00:00 2001 From: Tomas Nykodym Date: Thu, 6 Jul 2017 14:24:31 -0700 Subject: [PATCH 9/9] test script fix --- h2o-parsers/h2o-parquet-parser/testMultiNode.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/h2o-parsers/h2o-parquet-parser/testMultiNode.sh b/h2o-parsers/h2o-parquet-parser/testMultiNode.sh index 9d7c001c289..b7048d7380a 100755 --- a/h2o-parsers/h2o-parquet-parser/testMultiNode.sh +++ b/h2o-parsers/h2o-parquet-parser/testMultiNode.sh @@ -44,7 +44,7 @@ fi # build/classes/main - Main h2o core classes # build/classes/test - Test h2o core classes # build/resources/main - Main resources (e.g. 
page.html) -JVM="nice $JAVA_CMD -DcloudSize=5 -ea -Xmx3g -Xms3g -cp build/libs/h2o-parquet-parser-test.jar${SEP}build/libs/h2o-parquet-parser.jar${SEP}../../h2o-core/build/libs/h2o-core-test.jar${SEP}../../h2o-core/build/libs/h2o-core.jar${SEP}../../h2o-genmodel/build/libs/h2o-genmodel.jar${SEP}../../lib/*" +JVM="nice $JAVA_CMD -DcloudSize=5 -ea -Xmx3g -Xms3g -cp build/libs/h2o-parquet-parser-test.jar${SEP}build/libs/h2o-parquet-parser.jar${SEP}../../h2o-core/build/libs/h2o-core-test.jar${SEP}../../h2o-core/build/libs/h2o-core.jar${SEP}../../h2o-genmodel/build/libs/h2o-genmodel.jar${SEP}../../h2o-persist-hdfs/build/libs/h2o-persist-hdfs.jar${SEP}../../lib/*" echo "$JVM" > $OUTDIR/jvm_cmd.txt # Ahhh... but the makefile runs the tests skipping the jar'ing step when possible. # Also, sometimes see test files in the main-class directory, so put the test