From 5f0f929e42c9a0a8cf3a7bf418a252aa7e4a1168 Mon Sep 17 00:00:00 2001 From: Jacob Date: Sat, 22 Jun 2013 12:48:11 +0100 Subject: [PATCH 1/9] Added filtering functionality --- .../main/java/parquet/avro/AvroParquetReader.java | 5 + parquet-column/pom.xml | 5 + .../src/main/java/parquet/column/ColumnReader.java | 10 ++ .../java/parquet/column/impl/ColumnReaderImpl.java | 18 +++ .../main/java/parquet/filter/AndRecordFilter.java | 51 +++++++++ .../java/parquet/filter/ColumnRecordFilter.java | 92 ++++++++++++++++ .../main/java/parquet/filter/NullRecordFilter.java | 30 +++++ .../java/parquet/filter/PagedRecordFilter.java | 55 ++++++++++ .../src/main/java/parquet/filter/RecordFilter.java | 26 +++++ .../java/parquet/filter/UnboundRecordFilter.java | 19 ++++ .../src/main/java/parquet/io/MessageColumnIO.java | 17 ++- .../parquet/io/RecordReaderImplementation.java | 91 +++++++++------ .../src/test/java/parquet/io/TestFiltered.java | 122 +++++++++++++++++++++ .../main/java/parquet/hadoop/ParquetReader.java | 8 +- .../java/parquet/hadoop/ParquetRecordReader.java | 19 +++- 15 files changed, 529 insertions(+), 39 deletions(-) create mode 100644 parquet-column/src/main/java/parquet/filter/AndRecordFilter.java create mode 100644 parquet-column/src/main/java/parquet/filter/ColumnRecordFilter.java create mode 100644 parquet-column/src/main/java/parquet/filter/NullRecordFilter.java create mode 100644 parquet-column/src/main/java/parquet/filter/PagedRecordFilter.java create mode 100644 parquet-column/src/main/java/parquet/filter/RecordFilter.java create mode 100644 parquet-column/src/main/java/parquet/filter/UnboundRecordFilter.java create mode 100644 parquet-column/src/test/java/parquet/io/TestFiltered.java diff --git a/parquet-avro/src/main/java/parquet/avro/AvroParquetReader.java b/parquet-avro/src/main/java/parquet/avro/AvroParquetReader.java index 849423e0b..1442b2e74 100644 --- a/parquet-avro/src/main/java/parquet/avro/AvroParquetReader.java +++ 
b/parquet-avro/src/main/java/parquet/avro/AvroParquetReader.java @@ -17,6 +17,7 @@ import java.io.IOException; import org.apache.hadoop.fs.Path; +import parquet.filter.UnboundRecordFilter; import parquet.hadoop.ParquetReader; import parquet.hadoop.api.ReadSupport; @@ -28,4 +29,8 @@ public AvroParquetReader(Path file) throws IOException { super(file, (ReadSupport) new AvroReadSupport()); } + + public AvroParquetReader(Path file, UnboundRecordFilter recordFilter ) throws IOException { + super(file, (ReadSupport) new AvroReadSupport(), recordFilter); + } } diff --git a/parquet-column/pom.xml b/parquet-column/pom.xml index 6fcc75d47..8d05c123f 100644 --- a/parquet-column/pom.xml +++ b/parquet-column/pom.xml @@ -25,6 +25,11 @@ 1.7 compile + + com.google.guava + guava + 11.0 + diff --git a/parquet-column/src/main/java/parquet/column/ColumnReader.java b/parquet-column/src/main/java/parquet/column/ColumnReader.java index 064e62856..149250f8f 100644 --- a/parquet-column/src/main/java/parquet/column/ColumnReader.java +++ b/parquet-column/src/main/java/parquet/column/ColumnReader.java @@ -98,4 +98,14 @@ * @return the current value */ double getDouble(); + + /** + * @return Descriptor of the column. 
+ */ + ColumnDescriptor getDescriptor(); + + /** + * Skip the current value + */ + void skip(); } diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java b/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java index ae9169f71..9e3e434c3 100644 --- a/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java +++ b/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java @@ -358,6 +358,24 @@ public int getCurrentRepetitionLevel() { } /** + * {@inheritDoc} + * @see parquet.column.ColumnReader#skip() + */ + @Override + public void skip() { + checkValueRead(); // must be a more efficient version of this + } + + /** + * {@inheritDoc} + * @see parquet.column.ColumnReader#getDescriptor() + */ + @Override + public ColumnDescriptor getDescriptor() { + return path; + } + + /** * reads the current value */ public void readCurrentValue() { diff --git a/parquet-column/src/main/java/parquet/filter/AndRecordFilter.java b/parquet-column/src/main/java/parquet/filter/AndRecordFilter.java new file mode 100644 index 000000000..675e02e74 --- /dev/null +++ b/parquet-column/src/main/java/parquet/filter/AndRecordFilter.java @@ -0,0 +1,51 @@ +package parquet.filter; + +import parquet.Preconditions; +import parquet.column.ColumnReader; + +/** + * Provides ability to chain two filters together. Bear in mind that the first one will + * short circuit the second. Useful if getting a page of already filtered result. + * i.e and( column("manufacturer", equalTo("Volkswagen")), page(100,50)) + * + * @author Jacob Metcalf + */ +public final class AndRecordFilter implements RecordFilter { + + private final RecordFilter boundFilter1; + private final RecordFilter boundFilter2; + + /** + * Returns builder for creating an and filter. + * @param filter1 The first filter to check. + * @param filter2 The second filter to check. 
+ */ + public static final UnboundRecordFilter and( final UnboundRecordFilter filter1, final UnboundRecordFilter filter2 ) { + Preconditions.checkNotNull( filter1, "filter1" ); + Preconditions.checkNotNull( filter2, "filter2" ); + return new UnboundRecordFilter() { + @Override + public RecordFilter bind(Iterable readers) { + return new AndRecordFilter( filter1.bind(readers), filter2.bind( readers) ); + } + }; + } + + /** + * Private constructor, use AndRecordFilter.and() instead. + */ + private AndRecordFilter( RecordFilter boundFilter1, RecordFilter boundFilter2 ) { + this.boundFilter1 = boundFilter1; + this.boundFilter2 = boundFilter2; + } + + @Override + public boolean isFullyConsumed() { + return boundFilter1.isFullyConsumed() && boundFilter2.isFullyConsumed(); + } + + @Override + public boolean isMatch() { + return boundFilter1.isMatch() && boundFilter2.isMatch(); + } +} diff --git a/parquet-column/src/main/java/parquet/filter/ColumnRecordFilter.java b/parquet-column/src/main/java/parquet/filter/ColumnRecordFilter.java new file mode 100644 index 000000000..2a1beedcb --- /dev/null +++ b/parquet-column/src/main/java/parquet/filter/ColumnRecordFilter.java @@ -0,0 +1,92 @@ +package parquet.filter; + +import com.google.common.base.Objects; +import com.google.common.base.Predicate; +import com.google.common.base.Splitter; +import parquet.column.ColumnReader; +import parquet.io.api.Binary; + +import java.util.Arrays; + +import static com.google.common.collect.Iterables.toArray; +import static parquet.Preconditions.checkNotNull; + +/** + * Record filter which applies the supplied predicate to the specified column. + */ +public final class ColumnRecordFilter implements RecordFilter { + + private final ColumnReader filterOnColumn; + private final Predicate filterPredicate; + + /** + * Factory method for record filter which applies the supplied predicate to the specified column. + * @param columnPath Dot separated path specifier, e.g. 
"engine.capacity" + * @param predicate Should call getBinary etc. and check the value + */ + public static final UnboundRecordFilter column(final String columnPath, final Predicate predicate) { + checkNotNull(columnPath, "columnPath"); + checkNotNull(predicate, "predicate"); + return new UnboundRecordFilter() { + final String[] filterPath = toArray(Splitter.on('.').split(columnPath), String.class); + @Override + public RecordFilter bind(Iterable readers) { + for ( ColumnReader reader : readers ) { + if ( Arrays.equals( reader.getDescriptor().getPath(), filterPath)) { + return new ColumnRecordFilter(reader, predicate); + } + } + throw new IllegalArgumentException( "Column " + columnPath + " does not exist."); + } + }; + } + + /** + * Private constructor. Use column() instead. + */ + private ColumnRecordFilter(ColumnReader filterOnColumn, Predicate filterPredicate) { + this.filterOnColumn = filterOnColumn; + this.filterPredicate = filterPredicate; + } + + /** + * @return true if the current value for the column reader matches the predicate. + */ + @Override + public boolean isMatch() { + return ( filterOnColumn.isFullyConsumed()) ? false : filterPredicate.apply( filterOnColumn ); + } + + /** + * @return true if the column we are filtering on has no more values. 
+ */ + @Override + public boolean isFullyConsumed() { + return filterOnColumn.isFullyConsumed(); + } + + /** + * Predicate for string equality + */ + public static final Predicate equalTo( final String value ) { + final Binary valueAsBinary = Binary.fromString( value ); + return new Predicate () { + @Override + public boolean apply(ColumnReader input) { + return Objects.equal( input.getBinary(), valueAsBinary ); + } + }; + } + + /** + * Predicate for INT64 / long equality + */ + public static final Predicate equalTo( final long value ) { + return new Predicate () { + @Override + public boolean apply(ColumnReader input) { + return input.getLong() == value; + } + }; + } +} diff --git a/parquet-column/src/main/java/parquet/filter/NullRecordFilter.java b/parquet-column/src/main/java/parquet/filter/NullRecordFilter.java new file mode 100644 index 000000000..7a754a0f2 --- /dev/null +++ b/parquet-column/src/main/java/parquet/filter/NullRecordFilter.java @@ -0,0 +1,30 @@ +package parquet.filter; + +import parquet.column.ColumnReader; + +/** + * Null filter which will always let all records through. + */ +final class NullRecordFilter implements UnboundRecordFilter, RecordFilter { + + /** + * Package level visibility so we can make an instance available in interface. 
+ */ + NullRecordFilter() {} + + @Override + public RecordFilter bind(Iterable readers) { + return this; + } + + @Override + public boolean isMatch() { + return true; + } + + @Override + public boolean isFullyConsumed() { + // Always false, we will leave to the record reader to decide when it has consumed everything + return false; + } +} diff --git a/parquet-column/src/main/java/parquet/filter/PagedRecordFilter.java b/parquet-column/src/main/java/parquet/filter/PagedRecordFilter.java new file mode 100644 index 000000000..02bcc81a2 --- /dev/null +++ b/parquet-column/src/main/java/parquet/filter/PagedRecordFilter.java @@ -0,0 +1,55 @@ +package parquet.filter; + +import parquet.column.ColumnReader; + +/** + * Filter which will only materialize a page worth of results. + */ +public final class PagedRecordFilter implements RecordFilter { + + private final long startPos; + private final long endPos; + private long currentPos = 0; + + /** + * Returns builder for creating a paged query. + * @param startPos The record to start from, numbering starts at 1. + * @param pageSize The size of the page. + */ + public static final UnboundRecordFilter page( final long startPos, final long pageSize ) { + return new UnboundRecordFilter() { + @Override + public RecordFilter bind(Iterable readers) { + return new PagedRecordFilter( startPos, pageSize ); + } + }; + } + + /** + * Private constructor, use column() instead. + */ + private PagedRecordFilter(long startPos, long pageSize) { + this.startPos = startPos; + this.endPos = startPos + pageSize; + } + + /** + * Terminate early when we have got our page. Later we will want a row count and this + * will be no good. + */ + @Override + public boolean isFullyConsumed() { + return ( currentPos >= endPos ); + } + + /** + * Keeps track of how many times it is called. Only returns matches when the + * record number is in the range. 
+ */ + @Override + public boolean isMatch() { + currentPos++; + return (( currentPos >= startPos ) && ( currentPos < endPos )); + } + +} diff --git a/parquet-column/src/main/java/parquet/filter/RecordFilter.java b/parquet-column/src/main/java/parquet/filter/RecordFilter.java new file mode 100644 index 000000000..fe8031587 --- /dev/null +++ b/parquet-column/src/main/java/parquet/filter/RecordFilter.java @@ -0,0 +1,26 @@ +package parquet.filter; + +import parquet.column.ColumnReader; + +/** + * Filter to be applied to a record to work out whether to skip it. + * + * @author Jacob Metcalf + */ +public interface RecordFilter { + + /** + * Works out whether the current record can pass through the filter. + */ + boolean isMatch(); + + /** + * Whether the filter values are fully consumed. + */ + boolean isFullyConsumed(); + + /** + * Null filter to be used if no filtering needed. + */ + public static final UnboundRecordFilter NULL_FILTER = new NullRecordFilter(); +} diff --git a/parquet-column/src/main/java/parquet/filter/UnboundRecordFilter.java b/parquet-column/src/main/java/parquet/filter/UnboundRecordFilter.java new file mode 100644 index 000000000..d890115e6 --- /dev/null +++ b/parquet-column/src/main/java/parquet/filter/UnboundRecordFilter.java @@ -0,0 +1,19 @@ +package parquet.filter; + +import com.google.common.base.Predicate; +import parquet.column.ColumnReader; + +/** + * Builder for a record filter. Idea is that each filter provides a create function + * which returns an unbound filter. This only becomes a filter when it is bound to the actual + * columns. + * + * @author Jacob Metcalf + */ +public interface UnboundRecordFilter { + + /** + * Call to bind to actual columns and create filter. 
+ */ + RecordFilter bind( Iterable readers); +} diff --git a/parquet-column/src/main/java/parquet/io/MessageColumnIO.java b/parquet-column/src/main/java/parquet/io/MessageColumnIO.java index a6e31f66a..32caaf22d 100644 --- a/parquet-column/src/main/java/parquet/io/MessageColumnIO.java +++ b/parquet-column/src/main/java/parquet/io/MessageColumnIO.java @@ -27,6 +27,8 @@ import parquet.io.api.RecordConsumer; import parquet.io.api.RecordMaterializer; import parquet.schema.MessageType; +import parquet.filter.UnboundRecordFilter; +import parquet.filter.RecordFilter; /** * Message level of the IO structure @@ -58,8 +60,19 @@ this, recordMaterializer, validating, - new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType()) - ); + new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType()), + RecordFilter.NULL_FILTER + ); + } + public RecordReader getRecordReader(PageReadStore columns, RecordMaterializer recordMaterializer, + UnboundRecordFilter recordFilter) { + return new RecordReaderImplementation( + this, + recordMaterializer, + validating, + new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType()), + recordFilter + ); } private class MessageColumnIORecordConsumer extends RecordConsumer { diff --git a/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java b/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java index 5a2d83eeb..7fe10c4ba 100644 --- a/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java +++ b/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; +import com.google.common.collect.ImmutableList; import parquet.Log; import parquet.column.ColumnReader; import parquet.column.impl.ColumnReadStoreImpl; @@ -33,6 +34,8 @@ import parquet.io.api.RecordMaterializer; import parquet.schema.MessageType; import parquet.schema.PrimitiveType.PrimitiveTypeName; +import 
parquet.filter.UnboundRecordFilter; +import parquet.filter.RecordFilter; /** @@ -226,17 +229,19 @@ public Case getCase(int currentLevel, int d, int nextR) { private final GroupConverter recordConsumer; private final RecordMaterializer recordMaterializer; + private final RecordFilter recordFilter; private State[] states; /** * * @param root the root of the schema - * @param leaves the leaves of the schema * @param validating - * @param columns2 + * @param columnsStore + * @param unboundFilter Filter records, pass in NULL_FILTER to leave unfiltered. */ - public RecordReaderImplementation(MessageColumnIO root, RecordMaterializer recordMaterializer, boolean validating, ColumnReadStoreImpl columnStore) { + public RecordReaderImplementation(MessageColumnIO root, RecordMaterializer recordMaterializer, boolean validating, ColumnReadStoreImpl columnStore, + UnboundRecordFilter unboundFilter) { this.recordMaterializer = recordMaterializer; this.recordConsumer = recordMaterializer.getRootConverter(); // TODO: validator(wrap(recordMaterializer), validating, root.getType()); PrimitiveColumnIO[] leaves = root.getLeaves().toArray(new PrimitiveColumnIO[root.getLeaves().size()]); @@ -356,6 +361,10 @@ public int compare(Case o1, Case o2) { Collections.sort(state.definedCases, caseComparator); Collections.sort(state.undefinedCases, caseComparator); } + + // We need to make defensive copy to stop interference but as an optimisation don't bother if null + recordFilter = unboundFilter.bind(( unboundFilter == RecordFilter.NULL_FILTER ) + ? 
null : ImmutableList.copyOf(columns)); } //TODO: have those wrappers for a converter @@ -375,36 +384,54 @@ private RecordConsumer wrap(RecordConsumer recordConsumer) { */ @Override public T read() { - int currentLevel = 0; - State currentState = states[0]; - recordConsumer.start(); - do { - ColumnReader columnReader = currentState.column; - int d = columnReader.getCurrentDefinitionLevel(); - // creating needed nested groups until the current field (opening tags) - int depth = currentState.definitionLevelToDepth[d]; - for (; currentLevel <= depth; ++currentLevel) { - currentState.groupConverterPath[currentLevel].start(); - } - // currentLevel = depth + 1 at this point - // set the current value - if (d >= currentState.maxDefinitionLevel) { - // not null - columnReader.writeCurrentValueToConverter(); - } - columnReader.consume(); - - int nextR = currentState.maxRepetitionLevel == 0 ? 0 : columnReader.getCurrentRepetitionLevel(); - // level to go to close current groups - int next = currentState.nextLevel[nextR]; - for (; currentLevel > next; currentLevel--) { - currentState.groupConverterPath[currentLevel - 1].end(); + // Loop will end when either the record is materialized or we run out of values to consume + while ( !recordFilter.isFullyConsumed()) { + + int currentLevel = 0; + State currentState = states[0]; + boolean materializeRecord = recordFilter.isMatch(); + if ( materializeRecord ) { + // Where we are creating objects this is likely to be expensive. 
+ recordConsumer.start(); } - - currentState = currentState.nextState[nextR]; - } while (currentState != null); - recordConsumer.end(); - return recordMaterializer.getCurrentRecord(); + do { + ColumnReader columnReader = currentState.column; + int d = columnReader.getCurrentDefinitionLevel(); + if (materializeRecord) { + // creating needed nested groups until the current field (opening tags) + int depth = currentState.definitionLevelToDepth[d]; + for (; currentLevel <= depth; ++currentLevel) { + currentState.groupConverterPath[currentLevel].start(); + } + } + // currentLevel = depth + 1 at this point + // set the current value + if (d >= currentState.maxDefinitionLevel) { + // not null + if (materializeRecord) { + columnReader.writeCurrentValueToConverter(); + } else { + columnReader.skip(); + } + } + columnReader.consume(); + + int nextR = currentState.maxRepetitionLevel == 0 ? 0 : columnReader.getCurrentRepetitionLevel(); + // level to go to close current groups + int next = currentState.nextLevel[nextR]; + if (materializeRecord) { + for (; currentLevel > next; currentLevel--) { + currentState.groupConverterPath[currentLevel - 1].end(); + } + } + currentState = currentState.nextState[nextR]; + } while (currentState != null); + + if (materializeRecord) { + recordConsumer.end(); + return recordMaterializer.getCurrentRecord(); + } } + return null; } private static void log(String string) { diff --git a/parquet-column/src/test/java/parquet/io/TestFiltered.java b/parquet-column/src/test/java/parquet/io/TestFiltered.java new file mode 100644 index 000000000..b46acbe00 --- /dev/null +++ b/parquet-column/src/test/java/parquet/io/TestFiltered.java @@ -0,0 +1,122 @@ +package parquet.io; + +import org.junit.Ignore; +import org.junit.Test; +import parquet.Log; +import parquet.column.impl.ColumnWriteStoreImpl; +import parquet.column.page.mem.MemPageStore; +import parquet.example.data.Group; +import parquet.example.data.GroupWriter; +import 
parquet.example.data.simple.convert.GroupRecordConverter; +import parquet.io.api.RecordMaterializer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static parquet.example.Paper.r1; +import static parquet.example.Paper.r2; +import static parquet.example.Paper.schema; +import static parquet.filter.AndRecordFilter.and; +import static parquet.filter.PagedRecordFilter.page; +import static parquet.filter.ColumnRecordFilter.equalTo; +import static parquet.filter.ColumnRecordFilter.column; + +public class TestFiltered { + + private static final Log LOG = Log.getLog(TestColumnIO.class); + + @Test + public void testFilterOnInteger() { + MemPageStore memPageStore = new MemPageStore(); + MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema); + writeTestRecords(memPageStore, columnIO, 1); + + // Get first record + RecordMaterializer recordConverter = new GroupRecordConverter(schema); + RecordReaderImplementation recordReader = (RecordReaderImplementation) + columnIO.getRecordReader(memPageStore, recordConverter, + column("DocId", equalTo(10l))); + + Group actual1= recordReader.read(); + assertNull( "There should be no more records as r2 filtered out", recordReader.read()); + assertEquals("filtering did not return the correct record", r1.toString(), actual1.toString()); + + // Get second record + recordReader = (RecordReaderImplementation) + columnIO.getRecordReader(memPageStore, recordConverter, + column("DocId", equalTo(20l))); + + Group actual2= recordReader.read(); + assertNull( "There should be no more records as r1 filtered out", recordReader.read()); + assertEquals("filtering did not return the correct record", r2.toString(), actual2.toString()); + + } + + /** + * Not yet working. 
+ */ + @Ignore + public void testFilterOnString() { + MemPageStore memPageStore = new MemPageStore(); + MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema); + writeTestRecords(memPageStore, columnIO, 1); + + RecordMaterializer recordConverter = new GroupRecordConverter(schema); + RecordReaderImplementation recordReader = (RecordReaderImplementation) + columnIO.getRecordReader(memPageStore, recordConverter, + column("Name.Url", equalTo("http://C"))); + + Group actual2 = recordReader.read(); + assertNull( "There should be no more records as r1 filtered out", recordReader.read()); + assertEquals("filtering did not return the correct record", r2.toString(), actual2.toString()); + } + + @Test + public void testPaged() { + MemPageStore memPageStore = new MemPageStore(); + MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema); + writeTestRecords(memPageStore, columnIO, 6); + + RecordMaterializer recordConverter = new GroupRecordConverter(schema); + RecordReaderImplementation recordReader = (RecordReaderImplementation) + columnIO.getRecordReader(memPageStore, recordConverter, + page(4, 4)); + + int count = 0; + while ( count < 2 ) { // starts at position 4 which should be r2 + assertEquals("expecting record2", r2.toString(), recordReader.read().toString()); + assertEquals("expecting record1", r1.toString(), recordReader.read().toString()); + count++; + } + assertNull("There should be no more records", recordReader.read()); + } + + @Test + public void testFilteredAndPaged() { + MemPageStore memPageStore = new MemPageStore(); + MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema); + writeTestRecords(memPageStore, columnIO, 8); + + RecordMaterializer recordConverter = new GroupRecordConverter(schema); + RecordReaderImplementation recordReader = (RecordReaderImplementation) + columnIO.getRecordReader(memPageStore, recordConverter, + and(column("DocId", equalTo(10l)), page(2, 4))); + + int count = 0; + while ( count < 
4 ) { // starts at position 4 which should be r2 + assertEquals("expecting 4 x record1", r1.toString(), recordReader.read().toString()); + count++; + } + assertNull( "There should be no more records", recordReader.read()); + } + + private void writeTestRecords(MemPageStore memPageStore, MessageColumnIO columnIO, int number) { + ColumnWriteStoreImpl columns = new ColumnWriteStoreImpl(memPageStore, 800, 800, false); + + GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema); + for ( int i = 0; i < number; i++ ) { + groupWriter.write(r1); + groupWriter.write(r2); + } + columns.flush(); + } +} diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java index 557724d1b..c9e3661ea 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java @@ -26,6 +26,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import parquet.filter.RecordFilter; +import parquet.filter.UnboundRecordFilter; import parquet.hadoop.api.ReadSupport; import parquet.hadoop.api.ReadSupport.ReadContext; import parquet.hadoop.metadata.BlockMetaData; @@ -40,6 +42,10 @@ private ParquetRecordReader reader; public ParquetReader(Path file, ReadSupport readSupport) throws IOException { + this(file, readSupport, RecordFilter.NULL_FILTER); + } + + public ParquetReader(Path file, ReadSupport readSupport, UnboundRecordFilter filter) throws IOException { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); @@ -53,7 +59,7 @@ public ParquetReader(Path file, ReadSupport readSupport) throws IOException { MessageType schema = fileMetaData.getSchema(); Map extraMetadata = fileMetaData.getKeyValueMetaData(); final ReadContext readContext = readSupport.init(conf, extraMetadata, schema); - reader = new ParquetRecordReader(readSupport); + reader = new 
ParquetRecordReader(readSupport,filter); ParquetInputSplit inputSplit = new ParquetInputSplit( file, 0, 0, null, blocks, diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java index 5d800e0b0..ee08c5abe 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java @@ -30,6 +30,8 @@ import parquet.Log; import parquet.column.ColumnDescriptor; import parquet.column.page.PageReadStore; +import parquet.filter.RecordFilter; +import parquet.filter.UnboundRecordFilter; import parquet.hadoop.api.ReadSupport; import parquet.hadoop.metadata.BlockMetaData; import parquet.hadoop.util.ContextUtil; @@ -68,6 +70,7 @@ private int currentBlock = -1; private ParquetFileReader reader; private parquet.io.RecordReader recordReader; + private UnboundRecordFilter recordFilter; private long totalTimeSpentReadingBytes; private long totalTimeSpentProcessingRecords; @@ -76,11 +79,19 @@ private long totalCountLoadedSoFar = 0; /** - * @param requestedSchema the requested schema (a subset of the original schema) for record projection - * @param readSupportClass + * @param readSupport Provides functionality for reading. */ - public ParquetRecordReader(ReadSupport readSupport) { + public ParquetRecordReader(ReadSupport readSupport ) { + this(readSupport, RecordFilter.NULL_FILTER ); + } + + /** + * @param readSupport Provides functionality for reading. + * @param recordFilter Filter to be applied to read records. Use NULL_FILTER if none required. + */ + public ParquetRecordReader(ReadSupport readSupport, UnboundRecordFilter recordFilter) { this.readSupport = readSupport; + this.recordFilter = recordFilter; } private void checkRead() throws IOException { @@ -106,7 +117,7 @@ private void checkRead() throws IOException { LOG.info("block read in memory in " + timeSpentReading + " ms. 
row count = " + pages.getRowCount()); if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema); MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema); - recordReader = columnIO.getRecordReader(pages, recordConverter); + recordReader = columnIO.getRecordReader(pages, recordConverter, recordFilter); startedAssemblingCurrentBlockAt = System.currentTimeMillis(); totalCountLoadedSoFar += pages.getRowCount(); ++ currentBlock; From ef5c143d0ce9fc825a0ef418c584f1c3a491435d Mon Sep 17 00:00:00 2001 From: Jacob Date: Sat, 22 Jun 2013 16:05:50 +0100 Subject: [PATCH 2/9] Added avro specific functionality --- ...onverter.java => AvroIndexedRecordConverter.java} | 20 ++++++++++++-------- .../java/parquet/avro/AvroParquetInputFormat.java | 3 ++- .../java/parquet/avro/AvroParquetOutputFormat.java | 3 ++- .../main/java/parquet/avro/AvroParquetReader.java | 4 +++- .../main/java/parquet/avro/AvroParquetWriter.java | 20 +++++++++++++++++++- .../src/main/java/parquet/avro/AvroReadSupport.java | 5 +++-- .../java/parquet/avro/AvroRecordMaterializer.java | 9 +++++---- .../src/main/java/parquet/avro/AvroWriteSupport.java | 9 +++++---- 8 files changed, 51 insertions(+), 22 deletions(-) rename parquet-avro/src/main/java/parquet/avro/{AvroGenericRecordConverter.java => AvroIndexedRecordConverter.java} (93%) diff --git a/parquet-avro/src/main/java/parquet/avro/AvroGenericRecordConverter.java b/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java similarity index 93% rename from parquet-avro/src/main/java/parquet/avro/AvroGenericRecordConverter.java rename to parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java index e88af8887..459659aec 100644 --- a/parquet-avro/src/main/java/parquet/avro/AvroGenericRecordConverter.java +++ b/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java @@ -22,6 +22,8 @@ import org.apache.avro.generic.GenericArray; import 
org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.avro.specific.SpecificData; import parquet.io.api.Binary; import parquet.io.api.Converter; import parquet.io.api.GroupConverter; @@ -30,20 +32,20 @@ import parquet.schema.MessageType; import parquet.schema.Type; -class AvroGenericRecordConverter extends GroupConverter { +class AvroIndexedRecordConverter extends GroupConverter { private final ParentValueContainer parent; - protected GenericData.Record currentRecord; + protected IndexedRecord currentRecord; private final Converter[] converters; private final GroupType parquetSchema; private final Schema avroSchema; - public AvroGenericRecordConverter(MessageType parquetSchema, Schema avroSchema) { + public AvroIndexedRecordConverter(MessageType parquetSchema, Schema avroSchema) { this(null, parquetSchema, avroSchema); } - public AvroGenericRecordConverter(ParentValueContainer parent, GroupType + public AvroIndexedRecordConverter(ParentValueContainer parent, GroupType parquetSchema, Schema avroSchema) { this.parent = parent; this.parquetSchema = parquetSchema; @@ -62,7 +64,7 @@ public AvroGenericRecordConverter(ParentValueContainer parent, GroupType converters[index] = newConverter(fieldSchema, type, new ParentValueContainer() { @Override void add(Object value) { - AvroGenericRecordConverter.this.set(finalAvroIndex, value); + AvroIndexedRecordConverter.this.set(finalAvroIndex, value); } }); index++; @@ -86,7 +88,7 @@ private static Converter newConverter(Schema schema, Type type, } else if (schema.getType().equals(Schema.Type.STRING)) { return new FieldStringConverter(parent); } else if (schema.getType().equals(Schema.Type.RECORD)) { - return new AvroGenericRecordConverter(parent, type.asGroupType(), schema); + return new AvroIndexedRecordConverter(parent, type.asGroupType(), schema); } else if (schema.getType().equals(Schema.Type.ENUM)) { return new 
FieldStringConverter(parent); } else if (schema.getType().equals(Schema.Type.ARRAY)) { @@ -111,7 +113,9 @@ public Converter getConverter(int fieldIndex) { @Override public void start() { - this.currentRecord = new GenericData.Record(avroSchema); + // Should do the right thing whether it is generic or specific + this.currentRecord = + (IndexedRecord) SpecificData.get().newRecord((IndexedRecord)null, avroSchema); } @Override @@ -121,7 +125,7 @@ public void end() { } } - GenericRecord getCurrentRecord() { + IndexedRecord getCurrentRecord() { return currentRecord; } diff --git a/parquet-avro/src/main/java/parquet/avro/AvroParquetInputFormat.java b/parquet-avro/src/main/java/parquet/avro/AvroParquetInputFormat.java index 01a3ae67c..9b15997bc 100644 --- a/parquet-avro/src/main/java/parquet/avro/AvroParquetInputFormat.java +++ b/parquet-avro/src/main/java/parquet/avro/AvroParquetInputFormat.java @@ -16,13 +16,14 @@ package parquet.avro; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; import parquet.avro.AvroReadSupport; import parquet.hadoop.ParquetInputFormat; /** * A Hadoop {@link org.apache.hadoop.mapreduce.InputFormat} for Parquet files. 
*/ -public class AvroParquetInputFormat extends ParquetInputFormat { +public class AvroParquetInputFormat extends ParquetInputFormat { public AvroParquetInputFormat() { super(AvroReadSupport.class); } diff --git a/parquet-avro/src/main/java/parquet/avro/AvroParquetOutputFormat.java b/parquet-avro/src/main/java/parquet/avro/AvroParquetOutputFormat.java index 095d56330..5007a684b 100644 --- a/parquet-avro/src/main/java/parquet/avro/AvroParquetOutputFormat.java +++ b/parquet-avro/src/main/java/parquet/avro/AvroParquetOutputFormat.java @@ -17,6 +17,7 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.mapreduce.Job; import parquet.avro.AvroWriteSupport; import parquet.hadoop.ParquetOutputFormat; @@ -25,7 +26,7 @@ /** * A Hadoop {@link org.apache.hadoop.mapreduce.OutputFormat} for Parquet files. */ -public class AvroParquetOutputFormat extends ParquetOutputFormat { +public class AvroParquetOutputFormat extends ParquetOutputFormat { public static void setSchema(Job job, Schema schema) { AvroWriteSupport.setSchema(ContextUtil.getConfiguration(job), schema); diff --git a/parquet-avro/src/main/java/parquet/avro/AvroParquetReader.java b/parquet-avro/src/main/java/parquet/avro/AvroParquetReader.java index 1442b2e74..211895cd2 100644 --- a/parquet-avro/src/main/java/parquet/avro/AvroParquetReader.java +++ b/parquet-avro/src/main/java/parquet/avro/AvroParquetReader.java @@ -16,6 +16,8 @@ package parquet.avro; import java.io.IOException; + +import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; import parquet.filter.UnboundRecordFilter; import parquet.hadoop.ParquetReader; @@ -24,7 +26,7 @@ /** * Read Avro records from a Parquet file. 
*/ -public class AvroParquetReader extends ParquetReader { +public class AvroParquetReader extends ParquetReader { public AvroParquetReader(Path file) throws IOException { super(file, (ReadSupport) new AvroReadSupport()); diff --git a/parquet-avro/src/main/java/parquet/avro/AvroParquetWriter.java b/parquet-avro/src/main/java/parquet/avro/AvroParquetWriter.java index 23c7a2498..ed631e545 100644 --- a/parquet-avro/src/main/java/parquet/avro/AvroParquetWriter.java +++ b/parquet-avro/src/main/java/parquet/avro/AvroParquetWriter.java @@ -17,6 +17,7 @@ import java.io.IOException; import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; import parquet.hadoop.ParquetWriter; import parquet.hadoop.api.WriteSupport; @@ -25,7 +26,7 @@ /** * Write Avro records to a Parquet file. */ -public class AvroParquetWriter extends ParquetWriter { +public class AvroParquetWriter extends ParquetWriter { /** Create a new {@link AvroParquetWriter}. * @@ -44,6 +45,23 @@ public AvroParquetWriter(Path file, Schema avroSchema, compressionCodecName, blockSize, pageSize); } + /** Create a new {@link AvroParquetWriter}. + * + * @param file + * @param avroSchema + * @param compressionCodecName + * @param blockSize + * @param pageSize + * @throws IOException + */ + public AvroParquetWriter(Path file, Schema avroSchema, + CompressionCodecName compressionCodecName, int blockSize, + int pageSize, boolean enableDictionary) throws IOException { + super(file, (WriteSupport) + new AvroWriteSupport(new AvroSchemaConverter().convert(avroSchema), avroSchema), + compressionCodecName, blockSize, pageSize,enableDictionary,false); + } + /** Create a new {@link AvroParquetWriter}. The default block size is 50 MB.The default * page size is 1 MB. Default compression is no compression. 
(Inherited from {@link ParquetWriter}) * diff --git a/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java b/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java index dc1f088df..1d0d0b746 100644 --- a/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java +++ b/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java @@ -18,6 +18,7 @@ import java.util.Map; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; import parquet.hadoop.api.ReadSupport; import parquet.io.api.RecordMaterializer; @@ -28,7 +29,7 @@ * use {@link AvroParquetReader} or {@link AvroParquetInputFormat} rather than using * this class directly. */ -public class AvroReadSupport extends ReadSupport { +public class AvroReadSupport extends ReadSupport { @Override public ReadContext init(Configuration configuration, Map keyValueMetaData, MessageType fileSchema) { @@ -36,7 +37,7 @@ public ReadContext init(Configuration configuration, Map keyValu } @Override - public RecordMaterializer prepareForRead(Configuration configuration, Map keyValueMetaData, MessageType fileSchema, ReadContext readContext) { + public RecordMaterializer prepareForRead(Configuration configuration, Map keyValueMetaData, MessageType fileSchema, ReadContext readContext) { Schema avroSchema = new Schema.Parser().parse(keyValueMetaData.get("avro.schema")); return new AvroRecordMaterializer(readContext.getRequestedSchema(), avroSchema); } diff --git a/parquet-avro/src/main/java/parquet/avro/AvroRecordMaterializer.java b/parquet-avro/src/main/java/parquet/avro/AvroRecordMaterializer.java index 3a3d1f624..a7cf09a6b 100644 --- a/parquet-avro/src/main/java/parquet/avro/AvroRecordMaterializer.java +++ b/parquet-avro/src/main/java/parquet/avro/AvroRecordMaterializer.java @@ -17,20 +17,21 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import 
org.apache.avro.generic.IndexedRecord; import parquet.io.api.GroupConverter; import parquet.io.api.RecordMaterializer; import parquet.schema.MessageType; -class AvroRecordMaterializer extends RecordMaterializer { +class AvroRecordMaterializer extends RecordMaterializer { - private AvroGenericRecordConverter root; + private AvroIndexedRecordConverter root; public AvroRecordMaterializer(MessageType requestedSchema, Schema avroSchema) { - this.root = new AvroGenericRecordConverter(requestedSchema, avroSchema); + this.root = new AvroIndexedRecordConverter(requestedSchema, avroSchema); } @Override - public GenericRecord getCurrentRecord() { + public IndexedRecord getCurrentRecord() { return root.getCurrentRecord(); } diff --git a/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java index 661ba7a9f..1d2af57ce 100644 --- a/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java +++ b/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java @@ -23,6 +23,7 @@ import org.apache.avro.generic.GenericArray; import org.apache.avro.generic.GenericFixed; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; import org.apache.avro.util.Utf8; import org.apache.hadoop.conf.Configuration; import parquet.hadoop.api.WriteSupport; @@ -37,7 +38,7 @@ * use {@link AvroParquetWriter} or {@link AvroParquetOutputFormat} rather than using * this class directly. 
*/ -public class AvroWriteSupport extends WriteSupport { +public class AvroWriteSupport extends WriteSupport { private RecordConsumer recordConsumer; private MessageType rootSchema; @@ -72,21 +73,21 @@ public void prepareForWrite(RecordConsumer recordConsumer) { } @Override - public void write(GenericRecord record) { + public void write(IndexedRecord record) { recordConsumer.startMessage(); writeRecordFields(rootSchema, rootAvroSchema, record); recordConsumer.endMessage(); } private void writeRecord(GroupType schema, Schema avroSchema, - GenericRecord record) { + IndexedRecord record) { recordConsumer.startGroup(); writeRecordFields(schema, avroSchema, record); recordConsumer.endGroup(); } private void writeRecordFields(GroupType schema, Schema avroSchema, - GenericRecord record) { + IndexedRecord record) { List fields = schema.getFields(); List avroFields = avroSchema.getFields(); int index = 0; // parquet ignores Avro nulls, so index may differ From 61239a0aa5d41e731b6b2a53df4da2b953abe1dd Mon Sep 17 00:00:00 2001 From: Jacob Date: Sat, 22 Jun 2013 16:47:27 +0100 Subject: [PATCH 3/9] Added avro specific functionality --- parquet-avro/pom.xml | 21 ++++- .../java/parquet/avro/TestSpecificReadWrite.java | 104 +++++++++++++++++++++ parquet-avro/src/test/resources/car.avdl | 10 ++ 3 files changed, 134 insertions(+), 1 deletion(-) create mode 100644 parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java create mode 100644 parquet-avro/src/test/resources/car.avdl diff --git a/parquet-avro/pom.xml b/parquet-avro/pom.xml index 59675fcae..e446196d2 100644 --- a/parquet-avro/pom.xml +++ b/parquet-avro/pom.xml @@ -16,6 +16,7 @@ https://github.com/Parquet/parquet-mr + 1.7.4 @@ -37,7 +38,7 @@ org.apache.avro avro - 1.7.3 + ${avro.version} org.apache.hadoop @@ -107,6 +108,24 @@ + + org.apache.avro + avro-maven-plugin + ${avro.version} + + + generate-sources + + idl-protocol + + + ${project.basedir}/src/test/resources/ + ${project.build.directory}/generated-sources + 
String + + + + diff --git a/parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java b/parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java new file mode 100644 index 000000000..099bf5254 --- /dev/null +++ b/parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java @@ -0,0 +1,104 @@ +package parquet.avro; + +import org.apache.hadoop.fs.Path; +import org.junit.Assert; +import org.junit.Test; +import parquet.hadoop.ParquetReader; +import parquet.hadoop.ParquetWriter; +import parquet.hadoop.metadata.CompressionCodecName; + +import java.io.File; +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static parquet.filter.ColumnRecordFilter.column; +import static parquet.filter.ColumnRecordFilter.equalTo; +import static parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE; +import static parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE; + +public class TestSpecificReadWrite { + + @Test + public void testReadWriteSpecific() throws IOException { + Path path = writeCarsToParquetFile( 10, false, CompressionCodecName.UNCOMPRESSED, false); + ParquetReader reader = new AvroParquetReader(path); + for ( int i =0; i < 10; i++ ) { + assertEquals(getVwPolo().toString(), reader.read().toString()); + assertEquals(getVwPassat().toString(), reader.read().toString()); + assertEquals(getBmwMini().toString(), reader.read().toString()); + } + assertNull(reader.read()); + } + + @Test + public void testFilterMatchesMultiple() throws IOException { + + Path path = writeCarsToParquetFile(10, false, CompressionCodecName.UNCOMPRESSED, false); + + ParquetReader reader = new AvroParquetReader(path, column("make", equalTo("Volkswagen"))); + for ( int i =0; i < 10; i++ ) { + assertEquals(reader.read().toString(), getVwPolo().toString()); + assertEquals(reader.read().toString(), getVwPassat().toString()); + } + assertNull( reader.read()); + } + + + private Path writeCarsToParquetFile( int num, boolean 
varyYear, + CompressionCodecName compression, boolean enableDictionary) throws IOException { + File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp"); + tmp.deleteOnExit(); + tmp.delete(); + Path path = new Path(tmp.getPath()); + + Car vwPolo = getVwPolo(); + Car vwPassat = getVwPassat(); + Car bmwMini = getBmwMini(); + + ParquetWriter writer = new AvroParquetWriter(path, Car.SCHEMA$, compression, + DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE,enableDictionary); + for ( int i =0; i < num; i++ ) { + if (varyYear ) { + vwPolo.setYear( ( i / 100l )); + vwPassat.setYear( ( i / 100l )); + bmwMini.setYear( ( i / 100l )); + } + writer.write(vwPolo); + writer.write(vwPassat); + writer.write(bmwMini); + } + writer.close(); + return path; + } + + public static Car getVwPolo() { + return Car.newBuilder() + .setYear(2010) + .setMake("Volkswagen") + .setModel("Polo") + .setDoors(4) + .setEngineCapacity(1.4f) + .build(); + } + + public static Car getVwPassat() { + return Car.newBuilder() + .setYear(2010) + .setMake("Volkswagen") + .setModel("Passat") + .setDoors(5) + .setEngineCapacity(2.0f) + .build(); + } + + public static Car getBmwMini() { + return Car.newBuilder() + .setYear(2010) + .setMake("BMW") + .setModel("Mini") + .setDoors(4) + .setEngineCapacity(1.6f) + .build(); + } +} diff --git a/parquet-avro/src/test/resources/car.avdl b/parquet-avro/src/test/resources/car.avdl new file mode 100644 index 000000000..d741707a5 --- /dev/null +++ b/parquet-avro/src/test/resources/car.avdl @@ -0,0 +1,10 @@ +@namespace("parquet.avro") +protocol Cars { + record Car { + long year; + string make; + string model; + long doors; + float engineCapacity; + } +} \ No newline at end of file From 48bb48e6dfb9328e6071af68dd19b76aa8ab74a9 Mon Sep 17 00:00:00 2001 From: Jacob Date: Sat, 22 Jun 2013 18:24:41 +0100 Subject: [PATCH 4/9] Added avro specific functionality --- parquet-avro/pom.xml | 56 +++++++++++++++++++++++++++++++++++----------------- 1 file changed, 38 insertions(+), 18 
deletions(-) diff --git a/parquet-avro/pom.xml b/parquet-avro/pom.xml index e446196d2..3a032139f 100644 --- a/parquet-avro/pom.xml +++ b/parquet-avro/pom.xml @@ -108,24 +108,44 @@ - - org.apache.avro - avro-maven-plugin - ${avro.version} - - - generate-sources - - idl-protocol - - - ${project.basedir}/src/test/resources/ - ${project.build.directory}/generated-sources - String - - - - + + org.apache.avro + avro-maven-plugin + ${avro.version} + + + generate-test-sources + + idl-protocol + + + ${project.basedir}/src/test/resources + ${project.build.directory}/generated-test-sources + String + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 1.8 + + + add-test-sources + generate-test-sources + + add-test-source + + + + ${project.build.directory}/generated-test-sources + + + + + From 8285b62ceafe3fe096ebe1836142445acf0a9586 Mon Sep 17 00:00:00 2001 From: Jacob Date: Sun, 23 Jun 2013 14:13:12 +0100 Subject: [PATCH 5/9] Fixed bug querying on Name,Url --- .../java/parquet/column/impl/ColumnReaderImpl.java | 2 +- .../java/parquet/filter/ColumnRecordFilter.java | 4 + .../parquet/io/RecordReaderImplementation.java | 87 +++++++++++++--------- .../src/test/java/parquet/io/TestFiltered.java | 23 +++++- 4 files changed, 75 insertions(+), 41 deletions(-) diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java b/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java index 9e3e434c3..5c31ac1b1 100644 --- a/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java +++ b/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java @@ -268,7 +268,7 @@ public ColumnReaderImpl(ColumnDescriptor path, PageReader pageReader, PrimitiveC */ @Override public boolean isFullyConsumed() { - return readValues >= totalValueCount; + return readValues > totalValueCount || ( readValues == totalValueCount && consumed); } /** diff --git a/parquet-column/src/main/java/parquet/filter/ColumnRecordFilter.java 
b/parquet-column/src/main/java/parquet/filter/ColumnRecordFilter.java index 2a1beedcb..3cd24f8a8 100644 --- a/parquet-column/src/main/java/parquet/filter/ColumnRecordFilter.java +++ b/parquet-column/src/main/java/parquet/filter/ColumnRecordFilter.java @@ -21,6 +21,9 @@ /** * Factory method for record filter which applies the supplied predicate to the specified column. + * Note that if searching for a repeated sub-attribute it will only ever match against the + * first instance of it in the object. + * * @param columnPath Dot separated path specifier, e.g. "engine.capacity" * @param predicate Should call getBinary etc. and check the value */ @@ -54,6 +57,7 @@ private ColumnRecordFilter(ColumnReader filterOnColumn, Predicate */ @Override public boolean isMatch() { + return ( filterOnColumn.isFullyConsumed()) ? false : filterPredicate.apply( filterOnColumn ); } diff --git a/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java b/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java index 7fe10c4ba..488dd6e0a 100644 --- a/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java +++ b/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java @@ -237,7 +237,7 @@ public Case getCase(int currentLevel, int d, int nextR) { * * @param root the root of the schema * @param validating - * @param columnsStore + * @param columnStore * @param unboundFilter Filter records, pass in NULL_FILTER to leave unfiltered. 
*/ public RecordReaderImplementation(MessageColumnIO root, RecordMaterializer recordMaterializer, boolean validating, ColumnReadStoreImpl columnStore, @@ -384,54 +384,69 @@ private RecordConsumer wrap(RecordConsumer recordConsumer) { */ @Override public T read() { - // Loop will end when either the record is materialized or we run out of values to consume - while ( !recordFilter.isFullyConsumed()) { + // Skip forwards until the filter matches a record + if ( !skipToMatch()) { + return null; + } - int currentLevel = 0; - State currentState = states[0]; - boolean materializeRecord = recordFilter.isMatch(); - if ( materializeRecord ) { - // Where we are creating objects this is likely to be expensive. - recordConsumer.start(); + // Materialize the record + int currentLevel = 0; + State currentState = states[0]; + recordConsumer.start(); + do { + ColumnReader columnReader = currentState.column; + int d = columnReader.getCurrentDefinitionLevel(); + // creating needed nested groups until the current field (opening tags) + int depth = currentState.definitionLevelToDepth[d]; + for (; currentLevel <= depth; ++currentLevel) { + currentState.groupConverterPath[currentLevel].start(); + } + // currentLevel = depth + 1 at this point + // set the current value + if (d >= currentState.maxDefinitionLevel) { + // not null + columnReader.writeCurrentValueToConverter(); + } + columnReader.consume(); + + int nextR = currentState.maxRepetitionLevel == 0 ? 0 : columnReader.getCurrentRepetitionLevel(); + // level to go to close current groups + int next = currentState.nextLevel[nextR]; + for (; currentLevel > next; currentLevel--) { + currentState.groupConverterPath[currentLevel - 1].end(); + } + currentState = currentState.nextState[nextR]; + } while (currentState != null); + recordConsumer.end(); + return recordMaterializer.getCurrentRecord(); + } + + /** + * Skips forwards until the filter finds the first match. Returns false + * if none found. 
+ */ + private boolean skipToMatch() { + while ( !recordFilter.isMatch()) { + if ( recordFilter.isFullyConsumed()) { + return false; } + State currentState = states[0]; do { ColumnReader columnReader = currentState.column; - int d = columnReader.getCurrentDefinitionLevel(); - if (materializeRecord) { - // creating needed nested groups until the current field (opening tags) - int depth = currentState.definitionLevelToDepth[d]; - for (; currentLevel <= depth; ++currentLevel) { - currentState.groupConverterPath[currentLevel].start(); - } - } + // currentLevel = depth + 1 at this point // set the current value - if (d >= currentState.maxDefinitionLevel) { - // not null - if (materializeRecord) { - columnReader.writeCurrentValueToConverter(); - } else { + if (columnReader.getCurrentDefinitionLevel() >= currentState.maxDefinitionLevel) { columnReader.skip(); - } } columnReader.consume(); + // Based on repetition level work out next state to go to int nextR = currentState.maxRepetitionLevel == 0 ? 0 : columnReader.getCurrentRepetitionLevel(); - // level to go to close current groups - int next = currentState.nextLevel[nextR]; - if (materializeRecord) { - for (; currentLevel > next; currentLevel--) { - currentState.groupConverterPath[currentLevel - 1].end(); - } - } currentState = currentState.nextState[nextR]; } while (currentState != null); - - if (materializeRecord) { - recordConsumer.end(); - return recordMaterializer.getCurrentRecord(); - } } - return null; + } + return true; } private static void log(String string) { diff --git a/parquet-column/src/test/java/parquet/io/TestFiltered.java b/parquet-column/src/test/java/parquet/io/TestFiltered.java index b46acbe00..d7dd8d5ce 100644 --- a/parquet-column/src/test/java/parquet/io/TestFiltered.java +++ b/parquet-column/src/test/java/parquet/io/TestFiltered.java @@ -51,18 +51,33 @@ public void testFilterOnInteger() { } - /** - * Not yet working. 
- */ - @Ignore + @Test public void testFilterOnString() { MemPageStore memPageStore = new MemPageStore(); MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema); writeTestRecords(memPageStore, columnIO, 1); + // First try matching against the A url in record 1 RecordMaterializer recordConverter = new GroupRecordConverter(schema); RecordReaderImplementation recordReader = (RecordReaderImplementation) columnIO.getRecordReader(memPageStore, recordConverter, + column("Name.Url", equalTo("http://A"))); + + Group actual1 = recordReader.read(); + assertNull( "There should be no more records as r2 filtered out", recordReader.read()); + assertEquals("filtering did not return the correct record", r1.toString(), actual1.toString()); + + // Second try matching against the B url in record 1 - it should fail as we only match + // against the first instance of a + recordReader = (RecordReaderImplementation) + columnIO.getRecordReader(memPageStore, recordConverter, + column("Name.Url", equalTo("http://B"))); + + assertNull( "There should be no matching records", recordReader.read()); + + // Finally try matching against the C url in record 2 + recordReader = (RecordReaderImplementation) + columnIO.getRecordReader(memPageStore, recordConverter, column("Name.Url", equalTo("http://C"))); Group actual2 = recordReader.read(); From ac5cbd1a48f12a0a431af07c9867b0b0ae04eceb Mon Sep 17 00:00:00 2001 From: Jacob Date: Sun, 23 Jun 2013 16:13:48 +0100 Subject: [PATCH 6/9] Implmented more efficient skip algorithm --- .../java/parquet/column/impl/ColumnReaderImpl.java | 77 ++++++++++++++++------ .../java/parquet/column/values/ValuesReader.java | 41 ++++++++++++ .../values/dictionary/DictionaryValuesReader.java | 8 +++ .../values/plain/BinaryPlainValuesReader.java | 12 ++++ .../values/plain/BooleanPlainValuesReader.java | 8 +++ .../column/values/plain/PlainValuesReader.java | 36 ++++++++++ 6 files changed, 162 insertions(+), 20 deletions(-) diff --git 
a/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java b/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java index 5c31ac1b1..af695ac7e 100644 --- a/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java +++ b/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java @@ -47,6 +47,7 @@ private static abstract class Binding { abstract void read(); + abstract void skip(); abstract void writeValue(); public int getDictionaryId() { throw new UnsupportedOperationException(); @@ -99,6 +100,10 @@ private void bindToDictionary(final Dictionary dictionary) { void read() { dictionaryId = dataColumn.readValueDictionaryId(); } + public void skip() { + // Type is not important here as its just a key + dataColumn.skipBytes(); + } public int getDictionaryId() { return dictionaryId; } @@ -135,6 +140,10 @@ public Binding convertFLOAT(PrimitiveTypeName primitiveTypeName) { void read() { current = dataColumn.readFloat(); } + public void skip() { + current = 0; + dataColumn.skipFloat(); + } public float getFloat() { return current; } @@ -150,6 +159,10 @@ public Binding convertDOUBLE(PrimitiveTypeName primitiveTypeName) { void read() { current = dataColumn.readDouble(); } + public void skip() { + current = 0; + dataColumn.skipDouble(); + } public double getDouble() { return current; } @@ -165,6 +178,10 @@ public Binding convertINT32(PrimitiveTypeName primitiveTypeName) { void read() { current = dataColumn.readInteger(); } + public void skip() { + current = 0; + dataColumn.skipInteger(); + } @Override public int getInteger() { return current; @@ -181,6 +198,10 @@ public Binding convertINT64(PrimitiveTypeName primitiveTypeName) { void read() { current = dataColumn.readLong(); } + public void skip() { + current = 0; + dataColumn.skipLong(); + } @Override public long getLong() { return current; @@ -206,6 +227,10 @@ public Binding convertBOOLEAN(PrimitiveTypeName primitiveTypeName) { void read() { current = 
dataColumn.readBoolean(); } + public void skip() { + current = false; + dataColumn.skipBoolean(); + } @Override public boolean getBoolean() { return current; @@ -222,6 +247,10 @@ public Binding convertBINARY(PrimitiveTypeName primitiveTypeName) { void read() { current = dataColumn.readBytes(); } + public void skip() { + current = null; + dataColumn.skipBytes(); + } @Override public Binary getBinary() { return current; @@ -268,7 +297,7 @@ public ColumnReaderImpl(ColumnDescriptor path, PageReader pageReader, PrimitiveC */ @Override public boolean isFullyConsumed() { - return readValues > totalValueCount || ( readValues == totalValueCount && consumed); + return ((readValues == totalValueCount) && consumed) || (readValues > totalValueCount); } /** @@ -277,13 +306,13 @@ public boolean isFullyConsumed() { */ @Override public void writeCurrentValueToConverter() { - checkValueRead(); + checkValueRead(false); this.binding.writeValue(); } @Override public int getCurrentValueDictionaryID() { - checkValueRead(); + checkValueRead(false); return binding.getDictionaryId(); } @@ -293,7 +322,7 @@ public int getCurrentValueDictionaryID() { */ @Override public int getInteger() { - checkValueRead(); + checkValueRead(false); return this.binding.getInteger(); } @@ -303,7 +332,7 @@ public int getInteger() { */ @Override public boolean getBoolean() { - checkValueRead(); + checkValueRead(false); return this.binding.getBoolean(); } @@ -313,7 +342,7 @@ public boolean getBoolean() { */ @Override public long getLong() { - checkValueRead(); + checkValueRead(false); return this.binding.getLong(); } @@ -323,7 +352,7 @@ public long getLong() { */ @Override public Binary getBinary() { - checkValueRead(); + checkValueRead(false); return this.binding.getBinary(); } @@ -333,7 +362,7 @@ public Binary getBinary() { */ @Override public float getFloat() { - checkValueRead(); + checkValueRead(false); return this.binding.getFloat(); } @@ -343,7 +372,7 @@ public float getFloat() { */ @Override public double 
getDouble() { - checkValueRead(); + checkValueRead(false); return this.binding.getDouble(); } @@ -359,15 +388,6 @@ public int getCurrentRepetitionLevel() { /** * {@inheritDoc} - * @see parquet.column.ColumnReader#skip() - */ - @Override - public void skip() { - checkValueRead(); // must be a more efficient version of this - } - - /** - * {@inheritDoc} * @see parquet.column.ColumnReader#getDescriptor() */ @Override @@ -382,11 +402,19 @@ public void readCurrentValue() { binding.read(); } - protected void checkValueRead() { + /** + * Reads the value into the binding or skips forwards. + * @param skip If true don;t deserialize just skip forwards + */ + protected void checkValueRead(boolean skip) { try { checkRead(); if (!consumed && !valueRead) { - readCurrentValue(); + if ( skip ) { + binding.skip(); + } else { + readCurrentValue(); + } valueRead = true; } } catch (RuntimeException e) { @@ -400,6 +428,15 @@ protected void checkValueRead() { /** * {@inheritDoc} + * @see parquet.column.ColumnReader#skip() + */ + @Override + public void skip() { + checkValueRead(true); + } + + /** + * {@inheritDoc} * @see parquet.column.ColumnReader#getCurrentDefinitionLevel() */ @Override diff --git a/parquet-column/src/main/java/parquet/column/values/ValuesReader.java b/parquet-column/src/main/java/parquet/column/values/ValuesReader.java index d5f96c131..5086e60ac 100644 --- a/parquet-column/src/main/java/parquet/column/values/ValuesReader.java +++ b/parquet-column/src/main/java/parquet/column/values/ValuesReader.java @@ -72,6 +72,13 @@ public boolean readBoolean() { } /** + * Skips the next boolean in the page + */ + public void skipBoolean() { + throw new UnsupportedOperationException(); + } + + /** * @return the next Binary from the page */ public Binary readBytes() { @@ -79,6 +86,13 @@ public Binary readBytes() { } /** + * Skips the next Binary in the page + */ + public void skipBytes() { + throw new UnsupportedOperationException(); + } + + /** * @return the next float from the 
page */ public float readFloat() { @@ -86,6 +100,13 @@ public float readFloat() { } /** + * Skips the next float in the page + */ + public void skipFloat() { + throw new UnsupportedOperationException(); + } + + /** * @return the next double from the page */ public double readDouble() { @@ -93,6 +114,13 @@ public double readDouble() { } /** + * Skips the next float in the page + */ + public void skipDouble() { + throw new UnsupportedOperationException(); + } + + /** * @return the next integer from the page */ public int readInteger() { @@ -100,10 +128,23 @@ public int readInteger() { } /** + * Skips the next integer in the page + */ + public void skipInteger() { + throw new UnsupportedOperationException(); + } + + /** * @return the next long from the page */ public long readLong() { throw new UnsupportedOperationException(); } + /** + * Skips the next long in the page + */ + public void skipLong() { + throw new UnsupportedOperationException(); + } } \ No newline at end of file diff --git a/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesReader.java b/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesReader.java index 8cafa093c..b2920b077 100644 --- a/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesReader.java +++ b/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesReader.java @@ -77,4 +77,12 @@ public Binary readBytes() { } } + @Override + public void skipBytes() { + try { + decoder.readInt(); // Type does not matter as we are just skipping dictionary keys + } catch (IOException e) { + throw new ParquetDecodingException(e); + } + } } diff --git a/parquet-column/src/main/java/parquet/column/values/plain/BinaryPlainValuesReader.java b/parquet-column/src/main/java/parquet/column/values/plain/BinaryPlainValuesReader.java index 238e749b0..6a009dbd8 100644 --- a/parquet-column/src/main/java/parquet/column/values/plain/BinaryPlainValuesReader.java +++ 
b/parquet-column/src/main/java/parquet/column/values/plain/BinaryPlainValuesReader.java @@ -45,6 +45,18 @@ public Binary readBytes() { } @Override + public void skipBytes() { + try { + int length = BytesUtils.readIntLittleEndian(in, offset); + offset += 4 + length; + } catch (IOException e) { + throw new ParquetDecodingException("could not skip bytes at offset " + offset, e); + } catch (RuntimeException e) { + throw new ParquetDecodingException("could not skip bytes at offset " + offset, e); + } + } + + @Override public int initFromPage(long valueCount, byte[] in, int offset) throws IOException { if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset)); diff --git a/parquet-column/src/main/java/parquet/column/values/plain/BooleanPlainValuesReader.java b/parquet-column/src/main/java/parquet/column/values/plain/BooleanPlainValuesReader.java index c259f245f..dfe3cd48a 100644 --- a/parquet-column/src/main/java/parquet/column/values/plain/BooleanPlainValuesReader.java +++ b/parquet-column/src/main/java/parquet/column/values/plain/BooleanPlainValuesReader.java @@ -45,6 +45,14 @@ public boolean readBoolean() { return in.readInteger() == 0 ? 
false : true; } + /** + * {@inheritDoc} + * @see parquet.column.values.ValuesReader#skipBoolean() + */ + @Override + public void skipBoolean() { + in.readInteger(); + } /** * {@inheritDoc} diff --git a/parquet-column/src/main/java/parquet/column/values/plain/PlainValuesReader.java b/parquet-column/src/main/java/parquet/column/values/plain/PlainValuesReader.java index 2bf574b40..fdf7e9e78 100644 --- a/parquet-column/src/main/java/parquet/column/values/plain/PlainValuesReader.java +++ b/parquet-column/src/main/java/parquet/column/values/plain/PlainValuesReader.java @@ -46,6 +46,15 @@ public float readFloat() { } @Override + public void skipFloat() { + try { + in.skipBytes(4); + } catch (IOException e) { + throw new ParquetDecodingException("could not skip float", e); + } + } + + @Override public double readDouble() { try { return in.readDouble(); @@ -55,6 +64,15 @@ public double readDouble() { } @Override + public void skipDouble() { + try { + in.skipBytes(8); + } catch (IOException e) { + throw new ParquetDecodingException("could not skip double", e); + } + } + + @Override public int readInteger() { try { return in.readInt(); @@ -64,6 +82,15 @@ public int readInteger() { } @Override + public void skipInteger() { + try { + in.skipBytes(4); + } catch (IOException e) { + throw new ParquetDecodingException("could not skip int", e); + } + } + + @Override public long readLong() { try { return in.readLong(); @@ -72,6 +99,15 @@ public long readLong() { } } + @Override + public void skipLong() { + try { + in.skipBytes(8); + } catch (IOException e) { + throw new ParquetDecodingException("could not skip long", e); + } + } + /** * {@inheritDoc} * @see parquet.column.values.ValuesReader#initFromPage(byte[], int) From 1d7a5c33c935710ac8e5fa722d77e23fe55d5c5e Mon Sep 17 00:00:00 2001 From: Jacob Date: Mon, 1 Jul 2013 01:06:05 +0100 Subject: [PATCH 7/9] Fixing after code reviews --- .../parquet/avro/AvroIndexedRecordConverter.java | 83 +++++++++++++-- 
.../java/parquet/avro/AvroParquetInputFormat.java | 1 - .../java/parquet/avro/AvroParquetOutputFormat.java | 1 - .../main/java/parquet/avro/AvroParquetWriter.java | 26 ++--- .../main/java/parquet/avro/AvroReadSupport.java | 5 +- .../java/parquet/avro/AvroRecordMaterializer.java | 1 - .../java/parquet/avro/AvroSchemaConverter.java | 42 ++++++-- .../main/java/parquet/avro/AvroWriteSupport.java | 41 ++++++-- .../java/parquet/avro/TestAvroSchemaConverter.java | 11 -- .../java/parquet/avro/TestSpecificReadWrite.java | 115 ++++++++++++++++----- parquet-avro/src/test/resources/car.avdl | 32 +++++- parquet-column/pom.xml | 5 - .../java/parquet/column/impl/ColumnReaderImpl.java | 2 +- .../main/java/parquet/filter/ColumnPredicates.java | 82 +++++++++++++++ .../java/parquet/filter/ColumnRecordFilter.java | 43 ++------ .../main/java/parquet/filter/NullRecordFilter.java | 30 ------ .../java/parquet/filter/PagedRecordFilter.java | 3 +- .../src/main/java/parquet/filter/RecordFilter.java | 5 - .../java/parquet/filter/UnboundRecordFilter.java | 1 - .../main/java/parquet/io/FilteredRecordReader.java | 77 ++++++++++++++ .../src/main/java/parquet/io/MessageColumnIO.java | 12 ++- .../parquet/io/RecordReaderImplementation.java | 58 +++-------- .../src/test/java/parquet/io/TestFiltered.java | 10 +- .../main/java/parquet/hadoop/ParquetReader.java | 4 +- .../java/parquet/hadoop/ParquetRecordReader.java | 2 +- 25 files changed, 468 insertions(+), 224 deletions(-) create mode 100644 parquet-column/src/main/java/parquet/filter/ColumnPredicates.java delete mode 100644 parquet-column/src/main/java/parquet/filter/NullRecordFilter.java create mode 100644 parquet-column/src/main/java/parquet/io/FilteredRecordReader.java diff --git a/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java b/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java index 459659aec..b2a280530 100644 --- a/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java +++ 
b/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java @@ -24,6 +24,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.avro.specific.SpecificData; +import parquet.Preconditions; import parquet.io.api.Binary; import parquet.io.api.Converter; import parquet.io.api.GroupConverter; @@ -40,6 +41,7 @@ private final GroupType parquetSchema; private final Schema avroSchema; + private final Class specificClass; public AvroIndexedRecordConverter(MessageType parquetSchema, Schema avroSchema) { this(null, parquetSchema, avroSchema); @@ -52,6 +54,7 @@ public AvroIndexedRecordConverter(ParentValueContainer parent, GroupType this.avroSchema = avroSchema; int schemaSize = parquetSchema.getFieldCount(); this.converters = new Converter[schemaSize]; + this.specificClass = SpecificData.get().getClass(avroSchema); int index = 0; // parquet ignores Avro nulls, so index may differ for (int avroIndex = 0; avroIndex < avroSchema.getFields().size(); avroIndex++) { Schema.Field field = avroSchema.getFields().get(avroIndex); @@ -90,11 +93,13 @@ private static Converter newConverter(Schema schema, Type type, } else if (schema.getType().equals(Schema.Type.RECORD)) { return new AvroIndexedRecordConverter(parent, type.asGroupType(), schema); } else if (schema.getType().equals(Schema.Type.ENUM)) { - return new FieldStringConverter(parent); + return new FieldEnumConverter(parent,schema); } else if (schema.getType().equals(Schema.Type.ARRAY)) { - return new GenericArrayConverter(parent, type, schema); + return new AvroArrayConverter(parent, type, schema); } else if (schema.getType().equals(Schema.Type.MAP)) { return new MapConverter(parent, type, schema); + } else if (schema.getType().equals(Schema.Type.UNION)) { + return new AvroUnionConverter(parent, type, schema); } else if (schema.getType().equals(Schema.Type.FIXED)) { return new FieldFixedConverter(parent, schema); } @@ -114,8 +119,9 @@ public Converter 
getConverter(int fieldIndex) { @Override public void start() { // Should do the right thing whether it is generic or specific - this.currentRecord = - (IndexedRecord) SpecificData.get().newRecord((IndexedRecord)null, avroSchema); + this.currentRecord = (this.specificClass == null) ? + new GenericData.Record(avroSchema) : + (IndexedRecord)SpecificData.newInstance(specificClass, avroSchema); } @Override @@ -243,6 +249,26 @@ final public void addBinary(Binary value) { } + static final class FieldEnumConverter extends PrimitiveConverter { + + private final ParentValueContainer parent; + private final Class enumClass; + + public FieldEnumConverter(ParentValueContainer parent, Schema enumSchema) { + this.parent = parent; + this.enumClass = SpecificData.get().getClass(enumSchema); + } + + @Override + final public void addBinary(Binary value) { + Object enumValue = value.toStringUsingUTF8(); + if (enumClass != null) { + enumValue = (Enum.valueOf(enumClass,(String)enumValue)); + } + parent.add(enumValue); + } + } + static final class FieldFixedConverter extends PrimitiveConverter { private final ParentValueContainer parent; @@ -260,14 +286,14 @@ final public void addBinary(Binary value) { } - static final class GenericArrayConverter extends GroupConverter { + static final class AvroArrayConverter extends GroupConverter { private final ParentValueContainer parent; private final Schema avroSchema; private final Converter converter; private GenericArray array; - public GenericArrayConverter(ParentValueContainer parent, Type parquetSchema, + public AvroArrayConverter(ParentValueContainer parent, Type parquetSchema, Schema avroSchema) { this.parent = parent; this.avroSchema = avroSchema; @@ -298,6 +324,51 @@ public void end() { } } + static final class AvroUnionConverter extends GroupConverter { + + private final ParentValueContainer parent; + private final Converter[] memberConverters; + private Object memberValue = null; + + public AvroUnionConverter(ParentValueContainer 
parent, Type parquetSchema, + Schema avroSchema) { + this.parent = parent; + GroupType parquetGroup = parquetSchema.asGroupType(); + this.memberConverters = new Converter[ parquetGroup.getFieldCount()]; + + int parquetIndex = 0; + for (int index = 0; index < avroSchema.getTypes().size(); index++) { + Schema memberSchema = avroSchema.getTypes().get(index); + if (!memberSchema.getType().equals(Schema.Type.NULL)) { + Type memberType = parquetGroup.getType(parquetIndex); + memberConverters[parquetIndex] = newConverter(memberSchema, memberType, new ParentValueContainer() { + @Override + void add(Object value) { + Preconditions.checkArgument(memberValue==null, "Union is resolving to more than one type"); + memberValue = value; + } + }); + parquetIndex++; // Note for nulls the parquetIndex is not increased + } + } + } + + @Override + public Converter getConverter(int fieldIndex) { + return memberConverters[fieldIndex]; + } + + @Override + public void start() { + memberValue = null; + } + + @Override + public void end() { + parent.add(memberValue); + } + } + static final class MapConverter extends GroupConverter { private final ParentValueContainer parent; diff --git a/parquet-avro/src/main/java/parquet/avro/AvroParquetInputFormat.java b/parquet-avro/src/main/java/parquet/avro/AvroParquetInputFormat.java index 9b15997bc..8c95121d4 100644 --- a/parquet-avro/src/main/java/parquet/avro/AvroParquetInputFormat.java +++ b/parquet-avro/src/main/java/parquet/avro/AvroParquetInputFormat.java @@ -15,7 +15,6 @@ */ package parquet.avro; -import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import parquet.avro.AvroReadSupport; import parquet.hadoop.ParquetInputFormat; diff --git a/parquet-avro/src/main/java/parquet/avro/AvroParquetOutputFormat.java b/parquet-avro/src/main/java/parquet/avro/AvroParquetOutputFormat.java index 5007a684b..67f7ca10a 100644 --- a/parquet-avro/src/main/java/parquet/avro/AvroParquetOutputFormat.java +++ 
b/parquet-avro/src/main/java/parquet/avro/AvroParquetOutputFormat.java @@ -16,7 +16,6 @@ package parquet.avro; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.mapreduce.Job; import parquet.avro.AvroWriteSupport; diff --git a/parquet-avro/src/main/java/parquet/avro/AvroParquetWriter.java b/parquet-avro/src/main/java/parquet/avro/AvroParquetWriter.java index ed631e545..27d359cb6 100644 --- a/parquet-avro/src/main/java/parquet/avro/AvroParquetWriter.java +++ b/parquet-avro/src/main/java/parquet/avro/AvroParquetWriter.java @@ -40,38 +40,38 @@ public AvroParquetWriter(Path file, Schema avroSchema, CompressionCodecName compressionCodecName, int blockSize, int pageSize) throws IOException { - super(file, (WriteSupport) - new AvroWriteSupport(new AvroSchemaConverter().convert(avroSchema), avroSchema), - compressionCodecName, blockSize, pageSize); + super(file, (WriteSupport)new AvroWriteSupport(new AvroSchemaConverter().convert(avroSchema), avroSchema), + compressionCodecName, blockSize, pageSize); } /** Create a new {@link AvroParquetWriter}. * - * @param file - * @param avroSchema - * @param compressionCodecName - * @param blockSize - * @param pageSize + * @param file The file name to write to. + * @param avroSchema The schema to write with. + * @param compressionCodecName Compression code to use, or CompressionCodecName.UNCOMPRESSED + * @param blockSize HDFS block size + * @param pageSize See parquet write up. Blocks are subdivided into pages for alignment and other purposes. + * @param enableDictionary Whether to use a dictionary to compress columns. 
* @throws IOException */ public AvroParquetWriter(Path file, Schema avroSchema, CompressionCodecName compressionCodecName, int blockSize, int pageSize, boolean enableDictionary) throws IOException { super(file, (WriteSupport) - new AvroWriteSupport(new AvroSchemaConverter().convert(avroSchema), avroSchema), - compressionCodecName, blockSize, pageSize,enableDictionary,false); + new AvroWriteSupport(new AvroSchemaConverter().convert(avroSchema),avroSchema), + compressionCodecName, blockSize, pageSize, enableDictionary, false); } /** Create a new {@link AvroParquetWriter}. The default block size is 50 MB.The default * page size is 1 MB. Default compression is no compression. (Inherited from {@link ParquetWriter}) * - * @param file - * @param avroSchema + * @param file The file name to write to. + * @param avroSchema The schema to write with. * @throws IOException */ public AvroParquetWriter(Path file, Schema avroSchema) throws IOException { this(file, avroSchema, CompressionCodecName.UNCOMPRESSED, - DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE); + DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE); } } diff --git a/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java b/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java index 1d0d0b746..1ea892996 100644 --- a/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java +++ b/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java @@ -17,7 +17,6 @@ import java.util.Map; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; import parquet.hadoop.api.ReadSupport; @@ -25,8 +24,8 @@ import parquet.schema.MessageType; /** - * Avro implementation of {@link ReadSupport} for {@link GenericRecord}s. 
Users should - * use {@link AvroParquetReader} or {@link AvroParquetInputFormat} rather than using + * Avro implementation of {@link ReadSupport} for Avro {@link IndexedRecord}s which cover both Avro Specific and + * Generic. Users should use {@link AvroParquetReader} or {@link AvroParquetInputFormat} rather than using * this class directly. */ public class AvroReadSupport extends ReadSupport { diff --git a/parquet-avro/src/main/java/parquet/avro/AvroRecordMaterializer.java b/parquet-avro/src/main/java/parquet/avro/AvroRecordMaterializer.java index a7cf09a6b..e9df29f25 100644 --- a/parquet-avro/src/main/java/parquet/avro/AvroRecordMaterializer.java +++ b/parquet-avro/src/main/java/parquet/avro/AvroRecordMaterializer.java @@ -16,7 +16,6 @@ package parquet.avro; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import parquet.io.api.GroupConverter; import parquet.io.api.RecordMaterializer; diff --git a/parquet-avro/src/main/java/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/parquet/avro/AvroSchemaConverter.java index a7c28d521..9a5524a0d 100644 --- a/parquet-avro/src/main/java/parquet/avro/AvroSchemaConverter.java +++ b/parquet-avro/src/main/java/parquet/avro/AvroSchemaConverter.java @@ -15,8 +15,8 @@ */ package parquet.avro; -import java.util.ArrayList; -import java.util.List; +import java.util.*; + import org.apache.avro.Schema; import parquet.schema.GroupType; import parquet.schema.MessageType; @@ -114,19 +114,39 @@ private Type convertField(String fieldName, Schema schema, Type.Repetition repet } else if (type.equals(Schema.Type.FIXED)) { return primitive(fieldName, FIXED_LEN_BYTE_ARRAY, repetition); } else if (type.equals(Schema.Type.UNION)) { - List schemas = schema.getTypes(); - if (schemas.size() == 2) { - if (schemas.get(0).getType().equals(Schema.Type.NULL)) { - return convertField(fieldName, schemas.get(1), Type.Repetition.OPTIONAL); - } else if 
(schemas.get(1).getType().equals(Schema.Type.NULL)) { - return convertField(fieldName, schemas.get(0), Type.Repetition.OPTIONAL); - } - } - throw new UnsupportedOperationException("Cannot convert Avro type " + type); + return convertUnion(fieldName, schema, repetition); } throw new UnsupportedOperationException("Cannot convert Avro type " + type); } + private Type convertUnion(String fieldName, Schema schema, Type.Repetition repetition) { + List nonNullSchemas = new ArrayList(schema.getTypes().size()); + for (Schema childSchema : schema.getTypes()) { + if (childSchema.getType().equals(Schema.Type.NULL)) { + repetition = Type.Repetition.OPTIONAL; + } else { + nonNullSchemas.add(childSchema); + } + } + // If we only get a null and one other type then its a simple optional field + // otherwise construct a union container + switch (nonNullSchemas.size()) { + case 0: + throw new UnsupportedOperationException("Cannot convert Avro union of only nulls"); + + case 1: + return convertField(fieldName, nonNullSchemas.get(0), Type.Repetition.OPTIONAL); // Simple optional field + + default: // complex union type + List unionTypes = new ArrayList(nonNullSchemas.size()); + int index = 0; + for (Schema childSchema : nonNullSchemas) { + unionTypes.add( convertField("member" + index++, childSchema, Type.Repetition.OPTIONAL)); + } + return new GroupType(repetition, fieldName, unionTypes); + } + } + private Type convertField(Schema.Field field) { return convertField(field.name(), field.schema()); } diff --git a/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java index 1d2af57ce..fc5ec70b7 100644 --- a/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java +++ b/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java @@ -21,9 +21,11 @@ import java.util.Map; import org.apache.avro.Schema; import org.apache.avro.generic.GenericArray; +import org.apache.avro.generic.GenericData; import 
org.apache.avro.generic.GenericFixed; -import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificRecord; import org.apache.avro.util.Utf8; import org.apache.hadoop.conf.Configuration; import parquet.hadoop.api.WriteSupport; @@ -34,8 +36,8 @@ import parquet.schema.Type; /** - * Avro implementation of {@link WriteSupport} for {@link GenericRecord}s. Users should - * use {@link AvroParquetWriter} or {@link AvroParquetOutputFormat} rather than using + * Avro implementation of {@link WriteSupport} for {@link IndexedRecord}s - both Avro Generic and Specific. + * Users should use {@link AvroParquetWriter} or {@link AvroParquetOutputFormat} rather than using * this class directly. */ public class AvroWriteSupport extends WriteSupport { @@ -110,7 +112,7 @@ private void writeRecordFields(GroupType schema, Schema avroSchema, } private void writeArray(GroupType schema, Schema avroSchema, - GenericArray array) { + Iterable array) { recordConsumer.startGroup(); // group wrapper (original type LIST) recordConsumer.startField("array", 0); for (T elt : array) { @@ -144,6 +146,31 @@ private void writeRecordFields(GroupType schema, Schema avroSchema, recordConsumer.endGroup(); } + private void writeUnion(GroupType parquetSchema, Schema avroSchema, Object value) { + + recordConsumer.startGroup(); + + // ResolveUnion will tell us which of the union member types to deserialise + int avroIndex = GenericData.get().resolveUnion(avroSchema, value); + + // For parquet's schema we skip nulls + GroupType parquetGroup = parquetSchema.asGroupType(); + int parquetIndex = avroIndex; + for ( int i=0; i) value); + writeArray((GroupType) type, nonNullAvroSchema, (Iterable) value); } else if (avroType.equals(Schema.Type.MAP)) { writeMap((GroupType) type, nonNullAvroSchema, (Map) value); + } else if (avroType.equals(Schema.Type.UNION)) { + writeUnion((GroupType) type, 
nonNullAvroSchema, value); } else if (avroType.equals(Schema.Type.FIXED)) { recordConsumer.addBinary(Binary.fromByteArray(((GenericFixed) value).bytes())); } diff --git a/parquet-avro/src/test/java/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/parquet/avro/TestAvroSchemaConverter.java index 5c87ab7e8..551143a3b 100644 --- a/parquet-avro/src/test/java/parquet/avro/TestAvroSchemaConverter.java +++ b/parquet-avro/src/test/java/parquet/avro/TestAvroSchemaConverter.java @@ -88,15 +88,4 @@ public void testOptionalFields() throws Exception { " optional int32 myint;\n" + "}\n"); } - - @Test(expected = UnsupportedOperationException.class) - public void testNonNullUnion() { - Schema union = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.INT), - Schema.create(Schema.Type.LONG))); - Schema schema = Schema.createRecord("record1", null, null, false); - schema.setFields(Arrays.asList( - new Schema.Field("myunion", union, null, null))); - - new AvroSchemaConverter().convert(schema); - } } diff --git a/parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java b/parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java index 099bf5254..60765870d 100644 --- a/parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java +++ b/parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java @@ -1,5 +1,6 @@ package parquet.avro; +import com.google.common.collect.ImmutableList; import org.apache.hadoop.fs.Path; import org.junit.Assert; import org.junit.Test; @@ -13,20 +14,36 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static parquet.filter.ColumnRecordFilter.column; -import static parquet.filter.ColumnRecordFilter.equalTo; +import static parquet.filter.ColumnPredicates.equalTo; import static parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE; import static parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE; +/** + * Other tests exercise the use of Avro Generic, a dynamic data 
representation. This class focuses + * on Avro Specific whose schemas are pre-compiled to POJOs with built in SerDe for faster serialization. + */ public class TestSpecificReadWrite { @Test public void testReadWriteSpecific() throws IOException { - Path path = writeCarsToParquetFile( 10, false, CompressionCodecName.UNCOMPRESSED, false); + Path path = writeCarsToParquetFile(10, CompressionCodecName.UNCOMPRESSED, false); ParquetReader reader = new AvroParquetReader(path); - for ( int i =0; i < 10; i++ ) { - assertEquals(getVwPolo().toString(), reader.read().toString()); - assertEquals(getVwPassat().toString(), reader.read().toString()); - assertEquals(getBmwMini().toString(), reader.read().toString()); + for (int i = 0; i < 10; i++) { + assertEquals(getVwPolo().toString(),reader.read().toString()); + assertEquals(getVwPassat().toString(),reader.read().toString()); + assertEquals(getBmwMini().toString(),reader.read().toString()); + } + assertNull(reader.read()); + } + + @Test + public void testReadWriteSpecificWithDictionary() throws IOException { + Path path = writeCarsToParquetFile(10, CompressionCodecName.UNCOMPRESSED, true); + ParquetReader reader = new AvroParquetReader(path); + for (int i = 0; i < 10; i++) { + assertEquals(getVwPolo().toString(),reader.read().toString()); + assertEquals(getVwPassat().toString(),reader.read().toString()); + assertEquals(getBmwMini().toString(),reader.read().toString()); } assertNull(reader.read()); } @@ -34,36 +51,59 @@ public void testReadWriteSpecific() throws IOException { @Test public void testFilterMatchesMultiple() throws IOException { - Path path = writeCarsToParquetFile(10, false, CompressionCodecName.UNCOMPRESSED, false); + Path path = writeCarsToParquetFile(10, CompressionCodecName.UNCOMPRESSED, false); ParquetReader reader = new AvroParquetReader(path, column("make", equalTo("Volkswagen"))); - for ( int i =0; i < 10; i++ ) { - assertEquals(reader.read().toString(), getVwPolo().toString()); - 
assertEquals(reader.read().toString(), getVwPassat().toString()); + for (int i = 0; i < 10; i++) { + assertEquals(reader.read().toString(),getVwPolo().toString()); + assertEquals(reader.read().toString(),getVwPassat().toString()); } assertNull( reader.read()); } + @Test + public void testFilterWithDictionary() throws IOException { + + Path path = writeCarsToParquetFile(1,CompressionCodecName.UNCOMPRESSED,true); + + ParquetReader reader = new AvroParquetReader(path, column("make", equalTo("Volkswagen"))); + assertEquals(reader.read().toString(),getVwPolo().toString()); + assertEquals(reader.read().toString(),getVwPassat().toString()); + assertNull( reader.read()); + } + + @Test + public void testFilterOnSubAttribute() throws IOException { + + Path path = writeCarsToParquetFile(1, CompressionCodecName.UNCOMPRESSED, false); + + ParquetReader reader = new AvroParquetReader(path, column("engine.type", equalTo(EngineType.DIESEL))); + assertEquals(reader.read().toString(),getVwPassat().toString()); + assertNull( reader.read()); + + reader = new AvroParquetReader(path, column("engine.capacity", equalTo(1.4f))); + assertEquals(reader.read().toString(),getVwPolo().toString()); + assertNull( reader.read()); + + + reader = new AvroParquetReader(path, column("engine.hasTurboCharger", equalTo(true))); + assertEquals(reader.read().toString(),getBmwMini().toString()); + assertNull( reader.read()); + } - private Path writeCarsToParquetFile( int num, boolean varyYear, - CompressionCodecName compression, boolean enableDictionary) throws IOException { + private Path writeCarsToParquetFile( int num, CompressionCodecName compression, boolean enableDictionary) throws IOException { File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp"); tmp.deleteOnExit(); tmp.delete(); Path path = new Path(tmp.getPath()); - Car vwPolo = getVwPolo(); - Car vwPassat = getVwPassat(); - Car bmwMini = getBmwMini(); - - ParquetWriter writer = new AvroParquetWriter(path, Car.SCHEMA$, compression, - 
DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE,enableDictionary); - for ( int i =0; i < num; i++ ) { - if (varyYear ) { - vwPolo.setYear( ( i / 100l )); - vwPassat.setYear( ( i / 100l )); - bmwMini.setYear( ( i / 100l )); - } + Car vwPolo = getVwPolo(); + Car vwPassat = getVwPassat(); + Car bmwMini = getBmwMini(); + + ParquetWriter writer = new AvroParquetWriter(path,Car.SCHEMA$, compression, + DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE, enableDictionary); + for (int i = 0; i < num; i++) { writer.write(vwPolo); writer.write(vwPassat); writer.write(bmwMini); @@ -75,30 +115,51 @@ private Path writeCarsToParquetFile( int num, boolean varyYear, public static Car getVwPolo() { return Car.newBuilder() .setYear(2010) + .setRegistration("A123 GTR") .setMake("Volkswagen") .setModel("Polo") .setDoors(4) - .setEngineCapacity(1.4f) + .setEngine(Engine.newBuilder().setType(EngineType.PETROL) + .setCapacity(1.4f).setHasTurboCharger(false).build()) + .setOptionalExtra( + Stereo.newBuilder().setMake("Blaupunkt").setSpeakers(4).build()) + .setServiceHistory(ImmutableList.of( + Service.newBuilder().setDate(1325376000l).setMechanic("Jim").build(), + Service.newBuilder().setDate(1356998400l).setMechanic("Mike").build() + )) .build(); } public static Car getVwPassat() { return Car.newBuilder() .setYear(2010) + .setRegistration("A123 GXR") .setMake("Volkswagen") .setModel("Passat") .setDoors(5) - .setEngineCapacity(2.0f) + .setEngine(Engine.newBuilder().setType(EngineType.DIESEL) + .setCapacity(2.0f).setHasTurboCharger(false).build()) + .setOptionalExtra( + LeatherTrim.newBuilder().setColour("Black").build()) + .setServiceHistory(ImmutableList.of( + Service.newBuilder().setDate(1325376000l).setMechanic("Jim").build() + )) .build(); } public static Car getBmwMini() { return Car.newBuilder() .setYear(2010) + .setRegistration("A124 GSR") .setMake("BMW") .setModel("Mini") .setDoors(4) - .setEngineCapacity(1.6f) + .setEngine(Engine.newBuilder().setType(EngineType.PETROL) + 
.setCapacity(1.6f).setHasTurboCharger(true).build()) + .setOptionalExtra(null) + .setServiceHistory(ImmutableList.of( + Service.newBuilder().setDate(1356998400l).setMechanic("Mike").build() + )) .build(); } } diff --git a/parquet-avro/src/test/resources/car.avdl b/parquet-avro/src/test/resources/car.avdl index d741707a5..e330aaec4 100644 --- a/parquet-avro/src/test/resources/car.avdl +++ b/parquet-avro/src/test/resources/car.avdl @@ -1,10 +1,38 @@ @namespace("parquet.avro") protocol Cars { + + record Service { + long date; + string mechanic; + } + + record Stereo { + string make; + int speakers; + } + + record LeatherTrim { + string colour; + } + + enum EngineType { + DIESEL, PETROL + } + + record Engine { + EngineType type; + float capacity; + boolean hasTurboCharger; + } + record Car { long year; + string registration; string make; string model; - long doors; - float engineCapacity; + int doors; + Engine engine; + union { null, Stereo, LeatherTrim } optionalExtra = null; + array serviceHistory; } } \ No newline at end of file diff --git a/parquet-column/pom.xml b/parquet-column/pom.xml index 8d05c123f..6fcc75d47 100644 --- a/parquet-column/pom.xml +++ b/parquet-column/pom.xml @@ -25,11 +25,6 @@ 1.7 compile - - com.google.guava - guava - 11.0 - diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java b/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java index af695ac7e..927282641 100644 --- a/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java +++ b/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java @@ -404,7 +404,7 @@ public void readCurrentValue() { /** * Reads the value into the binding or skips forwards. 
- * @param skip If true don;t deserialize just skip forwards + * @param skip If true do not deserialize just skip forwards */ protected void checkValueRead(boolean skip) { try { diff --git a/parquet-column/src/main/java/parquet/filter/ColumnPredicates.java b/parquet-column/src/main/java/parquet/filter/ColumnPredicates.java new file mode 100644 index 000000000..00e7ebc2b --- /dev/null +++ b/parquet-column/src/main/java/parquet/filter/ColumnPredicates.java @@ -0,0 +1,82 @@ +package parquet.filter; + +import parquet.Preconditions; +import parquet.column.ColumnReader; + +/** + * ColumnPredicates class provides checks for column values. Factory methods + * are provided for standard predicates which wrap the job of getting the + * correct value from the column. + */ +public class ColumnPredicates { + + public static interface Predicate { + boolean apply(ColumnReader input); + } + + public static Predicate equalTo(final String target) { + Preconditions.checkNotNull(target,"target"); + return new Predicate() { + @Override + public boolean apply(ColumnReader input) { + return target.equals(input.getBinary().toStringUsingUTF8()); + } + }; + } + + public static Predicate equalTo(final int target) { + return new Predicate() { + @Override + public boolean apply(ColumnReader input) { + return input.getInteger() == target; + } + }; + } + + public static Predicate equalTo(final long target) { + return new Predicate() { + @Override + public boolean apply(ColumnReader input) { + return input.getLong() == target; + } + }; + } + + public static Predicate equalTo(final float target) { + return new Predicate() { + @Override + public boolean apply(ColumnReader input) { + return input.getFloat() == target; + } + }; + } + + public static Predicate equalTo(final double target) { + return new Predicate() { + @Override + public boolean apply(ColumnReader input) { + return input.getDouble() == target; + } + }; + } + + public static Predicate equalTo(final boolean target) { + return new 
Predicate() { + @Override + public boolean apply(ColumnReader input) { + return input.getBoolean() == target; + } + }; + } + + public static Predicate equalTo(final E target) { + Preconditions.checkNotNull(target,"target"); + final String targetAsString = target.name(); + return new Predicate() { + @Override + public boolean apply(ColumnReader input) { + return targetAsString.equals(input.getBinary().toStringUsingUTF8()); + } + }; + } +} diff --git a/parquet-column/src/main/java/parquet/filter/ColumnRecordFilter.java b/parquet-column/src/main/java/parquet/filter/ColumnRecordFilter.java index 3cd24f8a8..7da007391 100644 --- a/parquet-column/src/main/java/parquet/filter/ColumnRecordFilter.java +++ b/parquet-column/src/main/java/parquet/filter/ColumnRecordFilter.java @@ -1,14 +1,7 @@ package parquet.filter; -import com.google.common.base.Objects; -import com.google.common.base.Predicate; -import com.google.common.base.Splitter; import parquet.column.ColumnReader; -import parquet.io.api.Binary; - import java.util.Arrays; - -import static com.google.common.collect.Iterables.toArray; import static parquet.Preconditions.checkNotNull; /** @@ -17,7 +10,7 @@ public final class ColumnRecordFilter implements RecordFilter { private final ColumnReader filterOnColumn; - private final Predicate filterPredicate; + private final ColumnPredicates.Predicate filterPredicate; /** * Factory method for record filter which applies the supplied predicate to the specified column. @@ -27,14 +20,15 @@ * @param columnPath Dot separated path specifier, e.g. "engine.capacity" * @param predicate Should call getBinary etc. 
and check the value */ - public static final UnboundRecordFilter column(final String columnPath, final Predicate predicate) { + public static final UnboundRecordFilter column(final String columnPath, + final ColumnPredicates.Predicate predicate) { checkNotNull(columnPath, "columnPath"); checkNotNull(predicate, "predicate"); return new UnboundRecordFilter() { - final String[] filterPath = toArray(Splitter.on('.').split(columnPath), String.class); + final String[] filterPath = columnPath.split("\\."); @Override public RecordFilter bind(Iterable readers) { - for ( ColumnReader reader : readers ) { + for (ColumnReader reader : readers) { if ( Arrays.equals( reader.getDescriptor().getPath(), filterPath)) { return new ColumnRecordFilter(reader, predicate); } @@ -47,7 +41,7 @@ public RecordFilter bind(Iterable readers) { /** * Private constructor. Use column() instead. */ - private ColumnRecordFilter(ColumnReader filterOnColumn, Predicate filterPredicate) { + private ColumnRecordFilter(ColumnReader filterOnColumn, ColumnPredicates.Predicate filterPredicate) { this.filterOnColumn = filterOnColumn; this.filterPredicate = filterPredicate; } @@ -68,29 +62,4 @@ public boolean isMatch() { public boolean isFullyConsumed() { return filterOnColumn.isFullyConsumed(); } - - /** - * Predicate for string equality - */ - public static final Predicate equalTo( final String value ) { - final Binary valueAsBinary = Binary.fromString( value ); - return new Predicate () { - @Override - public boolean apply(ColumnReader input) { - return Objects.equal( input.getBinary(), valueAsBinary ); - } - }; - } - - /** - * Predicate for INT64 / long equality - */ - public static final Predicate equalTo( final long value ) { - return new Predicate () { - @Override - public boolean apply(ColumnReader input) { - return input.getLong() == value; - } - }; - } } diff --git a/parquet-column/src/main/java/parquet/filter/NullRecordFilter.java b/parquet-column/src/main/java/parquet/filter/NullRecordFilter.java 
deleted file mode 100644 index 7a754a0f2..000000000 --- a/parquet-column/src/main/java/parquet/filter/NullRecordFilter.java +++ /dev/null @@ -1,30 +0,0 @@ -package parquet.filter; - -import parquet.column.ColumnReader; - -/** - * Null filter which will always let all records through. - */ -final class NullRecordFilter implements UnboundRecordFilter, RecordFilter { - - /** - * Package level visibility so we can make an instance available in interface. - */ - NullRecordFilter() {} - - @Override - public RecordFilter bind(Iterable readers) { - return this; - } - - @Override - public boolean isMatch() { - return true; - } - - @Override - public boolean isFullyConsumed() { - // Always false, we will leave to the record reader to decide when it has consumed everything - return false; - } -} diff --git a/parquet-column/src/main/java/parquet/filter/PagedRecordFilter.java b/parquet-column/src/main/java/parquet/filter/PagedRecordFilter.java index 02bcc81a2..25c4e17a4 100644 --- a/parquet-column/src/main/java/parquet/filter/PagedRecordFilter.java +++ b/parquet-column/src/main/java/parquet/filter/PagedRecordFilter.java @@ -34,8 +34,7 @@ private PagedRecordFilter(long startPos, long pageSize) { } /** - * Terminate early when we have got our page. Later we will want a row count and this - * will be no good. + * Terminate early when we have got our page. */ @Override public boolean isFullyConsumed() { diff --git a/parquet-column/src/main/java/parquet/filter/RecordFilter.java b/parquet-column/src/main/java/parquet/filter/RecordFilter.java index fe8031587..0a728b3ea 100644 --- a/parquet-column/src/main/java/parquet/filter/RecordFilter.java +++ b/parquet-column/src/main/java/parquet/filter/RecordFilter.java @@ -18,9 +18,4 @@ * Whether the filter values are fully consumed. */ boolean isFullyConsumed(); - - /** - * Null filter to be used if no filtering needed. 
- */ - public static final UnboundRecordFilter NULL_FILTER = new NullRecordFilter(); } diff --git a/parquet-column/src/main/java/parquet/filter/UnboundRecordFilter.java b/parquet-column/src/main/java/parquet/filter/UnboundRecordFilter.java index d890115e6..b7e0c3f50 100644 --- a/parquet-column/src/main/java/parquet/filter/UnboundRecordFilter.java +++ b/parquet-column/src/main/java/parquet/filter/UnboundRecordFilter.java @@ -1,6 +1,5 @@ package parquet.filter; -import com.google.common.base.Predicate; import parquet.column.ColumnReader; /** diff --git a/parquet-column/src/main/java/parquet/io/FilteredRecordReader.java b/parquet-column/src/main/java/parquet/io/FilteredRecordReader.java new file mode 100644 index 000000000..ba3477497 --- /dev/null +++ b/parquet-column/src/main/java/parquet/io/FilteredRecordReader.java @@ -0,0 +1,77 @@ +package parquet.io; + +import parquet.column.ColumnReader; +import parquet.column.impl.ColumnReadStoreImpl; +import parquet.filter.RecordFilter; +import parquet.filter.UnboundRecordFilter; +import parquet.io.api.RecordMaterializer; + +import java.util.Arrays; + +/** + * Extends the + * @author Jacob Metcalf + * + */ +class FilteredRecordReader extends RecordReaderImplementation { + + private final RecordFilter recordFilter; + + /** + * @param root the root of the schema + * @param validating + * @param columnStore + * @param unboundFilter Filter records, pass in NULL_FILTER to leave unfiltered. + */ + public FilteredRecordReader(MessageColumnIO root, RecordMaterializer recordMaterializer, boolean validating, + ColumnReadStoreImpl columnStore, UnboundRecordFilter unboundFilter) { + super(root, recordMaterializer, validating, columnStore); + + if ( unboundFilter != null ) { + recordFilter = unboundFilter.bind(getColumnReaders()); + } else { + recordFilter = null; + } + } + + /** + * Override read() method to provide skip. 
+ */ + @Override + public T read() { + if ( skipToMatch()) { + return super.read(); + } else { + return null; + } + } + + + /** + * Skips forwards until the filter finds the first match. Returns false + * if none found. + */ + private boolean skipToMatch() { + while ( !recordFilter.isMatch()) { + if ( recordFilter.isFullyConsumed()) { + return false; + } + State currentState = getState(0); + do { + ColumnReader columnReader = currentState.column; + + // currentLevel = depth + 1 at this point + // set the current value + if (columnReader.getCurrentDefinitionLevel() >= currentState.maxDefinitionLevel) { + columnReader.skip(); + } + columnReader.consume(); + + // Based on repetition level work out next state to go to + int nextR = currentState.maxRepetitionLevel == 0 ? 0 : columnReader.getCurrentRepetitionLevel(); + currentState = currentState.getNextState(nextR); + } while (currentState != null); + } + return true; + } +} diff --git a/parquet-column/src/main/java/parquet/io/MessageColumnIO.java b/parquet-column/src/main/java/parquet/io/MessageColumnIO.java index 32caaf22d..64c65ad68 100644 --- a/parquet-column/src/main/java/parquet/io/MessageColumnIO.java +++ b/parquet-column/src/main/java/parquet/io/MessageColumnIO.java @@ -60,18 +60,20 @@ this, recordMaterializer, validating, - new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType()), - RecordFilter.NULL_FILTER + new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType()) ); } public RecordReader getRecordReader(PageReadStore columns, RecordMaterializer recordMaterializer, - UnboundRecordFilter recordFilter) { - return new RecordReaderImplementation( + UnboundRecordFilter unboundFilter) { + + return (unboundFilter == null) + ? 
getRecordReader(columns, recordMaterializer) + : new FilteredRecordReader( this, recordMaterializer, validating, new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType()), - recordFilter + unboundFilter ); } diff --git a/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java b/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java index 488dd6e0a..4d66278de 100644 --- a/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java +++ b/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.Map; -import com.google.common.collect.ImmutableList; import parquet.Log; import parquet.column.ColumnReader; import parquet.column.impl.ColumnReadStoreImpl; @@ -34,8 +33,6 @@ import parquet.io.api.RecordMaterializer; import parquet.schema.MessageType; import parquet.schema.PrimitiveType.PrimitiveTypeName; -import parquet.filter.UnboundRecordFilter; -import parquet.filter.RecordFilter; /** @@ -225,27 +222,29 @@ public int getDepth(int definitionLevel) { public Case getCase(int currentLevel, int d, int nextR) { return caseLookup[currentLevel][d][nextR]; } + + public State getNextState(int nextR) { + return nextState[nextR]; + } } private final GroupConverter recordConsumer; private final RecordMaterializer recordMaterializer; - private final RecordFilter recordFilter; private State[] states; + private ColumnReader[] columns; /** * * @param root the root of the schema * @param validating * @param columnStore - * @param unboundFilter Filter records, pass in NULL_FILTER to leave unfiltered. 
*/ - public RecordReaderImplementation(MessageColumnIO root, RecordMaterializer recordMaterializer, boolean validating, ColumnReadStoreImpl columnStore, - UnboundRecordFilter unboundFilter) { + public RecordReaderImplementation(MessageColumnIO root, RecordMaterializer recordMaterializer, boolean validating, ColumnReadStoreImpl columnStore) { this.recordMaterializer = recordMaterializer; this.recordConsumer = recordMaterializer.getRootConverter(); // TODO: validator(wrap(recordMaterializer), validating, root.getType()); PrimitiveColumnIO[] leaves = root.getLeaves().toArray(new PrimitiveColumnIO[root.getLeaves().size()]); - ColumnReader[] columns = new ColumnReader[leaves.length]; + columns = new ColumnReader[leaves.length]; int[][] nextReader = new int[leaves.length][]; int[][] nextLevel = new int[leaves.length][]; GroupConverter[][] groupConverterPaths = new GroupConverter[leaves.length][]; @@ -361,10 +360,6 @@ public int compare(Case o1, Case o2) { Collections.sort(state.definedCases, caseComparator); Collections.sort(state.undefinedCases, caseComparator); } - - // We need to make defensive copy to stop interference but as an optimisation don't bother if null - recordFilter = unboundFilter.bind(( unboundFilter == RecordFilter.NULL_FILTER ) - ? 
null : ImmutableList.copyOf(columns)); } //TODO: have those wrappers for a converter @@ -384,12 +379,6 @@ private RecordConsumer wrap(RecordConsumer recordConsumer) { */ @Override public T read() { - // Skip forwards until the filter matches a record - if ( !skipToMatch()) { - return null; - } - - // Materialize the record int currentLevel = 0; State currentState = states[0]; recordConsumer.start(); @@ -415,40 +404,13 @@ public T read() { for (; currentLevel > next; currentLevel--) { currentState.groupConverterPath[currentLevel - 1].end(); } + currentState = currentState.nextState[nextR]; } while (currentState != null); recordConsumer.end(); return recordMaterializer.getCurrentRecord(); } - /** - * Skips forwards until the filter finds the first match. Returns false - * if none found. - */ - private boolean skipToMatch() { - while ( !recordFilter.isMatch()) { - if ( recordFilter.isFullyConsumed()) { - return false; - } - State currentState = states[0]; - do { - ColumnReader columnReader = currentState.column; - - // currentLevel = depth + 1 at this point - // set the current value - if (columnReader.getCurrentDefinitionLevel() >= currentState.maxDefinitionLevel) { - columnReader.skip(); - } - columnReader.consume(); - - // Based on repetition level work out next state to go to - int nextR = currentState.maxRepetitionLevel == 0 ? 
0 : columnReader.getCurrentRepetitionLevel(); - currentState = currentState.nextState[nextR]; - } while (currentState != null); - } - return true; - } - private static void log(String string) { LOG.debug(string); } @@ -486,4 +448,8 @@ protected Converter getRecordConsumer() { return recordConsumer; } + protected Iterable getColumnReaders() { + // Converting the array to an iterable ensures that the array cannot be altered + return Arrays.asList(columns); + } } diff --git a/parquet-column/src/test/java/parquet/io/TestFiltered.java b/parquet-column/src/test/java/parquet/io/TestFiltered.java index d7dd8d5ce..2eb206a6b 100644 --- a/parquet-column/src/test/java/parquet/io/TestFiltered.java +++ b/parquet-column/src/test/java/parquet/io/TestFiltered.java @@ -1,8 +1,6 @@ package parquet.io; -import org.junit.Ignore; import org.junit.Test; -import parquet.Log; import parquet.column.impl.ColumnWriteStoreImpl; import parquet.column.page.mem.MemPageStore; import parquet.example.data.Group; @@ -17,13 +15,11 @@ import static parquet.example.Paper.schema; import static parquet.filter.AndRecordFilter.and; import static parquet.filter.PagedRecordFilter.page; -import static parquet.filter.ColumnRecordFilter.equalTo; +import static parquet.filter.ColumnPredicates.equalTo; import static parquet.filter.ColumnRecordFilter.column; public class TestFiltered { - private static final Log LOG = Log.getLog(TestColumnIO.class); - @Test public void testFilterOnInteger() { MemPageStore memPageStore = new MemPageStore(); @@ -45,7 +41,7 @@ public void testFilterOnInteger() { columnIO.getRecordReader(memPageStore, recordConverter, column("DocId", equalTo(20l))); - Group actual2= recordReader.read(); + Group actual2 = recordReader.read(); assertNull( "There should be no more records as r1 filtered out", recordReader.read()); assertEquals("filtering did not return the correct record", r2.toString(), actual2.toString()); @@ -63,7 +59,7 @@ public void testFilterOnString() { 
columnIO.getRecordReader(memPageStore, recordConverter, column("Name.Url", equalTo("http://A"))); - Group actual1 = recordReader.read(); + Group actual1 = recordReader.read(); assertNull( "There should be no more records as r2 filtered out", recordReader.read()); assertEquals("filtering did not return the correct record", r1.toString(), actual1.toString()); diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java index c9e3661ea..948063265 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java @@ -42,7 +42,7 @@ private ParquetRecordReader reader; public ParquetReader(Path file, ReadSupport readSupport) throws IOException { - this(file, readSupport, RecordFilter.NULL_FILTER); + this(file, readSupport, null); } public ParquetReader(Path file, ReadSupport readSupport, UnboundRecordFilter filter) throws IOException { @@ -59,7 +59,7 @@ public ParquetReader(Path file, ReadSupport readSupport, UnboundRecordFilter MessageType schema = fileMetaData.getSchema(); Map extraMetadata = fileMetaData.getKeyValueMetaData(); final ReadContext readContext = readSupport.init(conf, extraMetadata, schema); - reader = new ParquetRecordReader(readSupport,filter); + reader = new ParquetRecordReader(readSupport, filter); ParquetInputSplit inputSplit = new ParquetInputSplit( file, 0, 0, null, blocks, diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java index ee08c5abe..2cf3e52e6 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java @@ -82,7 +82,7 @@ * @param readSupport Provides functionality for reading. 
*/ public ParquetRecordReader(ReadSupport readSupport ) { - this(readSupport, RecordFilter.NULL_FILTER ); + this(readSupport, null); } /** From c4b14fbc4d20034596ad56825349976e4805f456 Mon Sep 17 00:00:00 2001 From: Jacob Date: Sat, 6 Jul 2013 15:05:15 +0100 Subject: [PATCH 8/9] Adding APL headers and test for union schema creation. --- .../java/parquet/avro/TestAvroSchemaConverter.java | 22 ++++++++++++++++++++++ .../java/parquet/avro/TestSpecificReadWrite.java | 15 +++++++++++++++ parquet-column/src/main/java/parquet/Ints.java | 15 +++++++++++++++ .../rle/RunLengthBitPackingHybridEncoder.java | 15 +++++++++++++++ .../rle/RunLengthBitPackingHybridValuesReader.java | 15 +++++++++++++++ .../rle/RunLengthBitPackingHybridValuesWriter.java | 15 +++++++++++++++ .../main/java/parquet/filter/AndRecordFilter.java | 15 +++++++++++++++ .../main/java/parquet/filter/ColumnPredicates.java | 15 +++++++++++++++ .../java/parquet/filter/ColumnRecordFilter.java | 15 +++++++++++++++ .../java/parquet/filter/PagedRecordFilter.java | 15 +++++++++++++++ .../src/main/java/parquet/filter/RecordFilter.java | 15 +++++++++++++++ .../java/parquet/filter/UnboundRecordFilter.java | 15 +++++++++++++++ .../main/java/parquet/io/FilteredRecordReader.java | 15 +++++++++++++++ .../bitpacking/TestByteBasedBitPackingEncoder.java | 15 +++++++++++++++ .../RunLengthBitPackingHybridIntegrationTest.java | 15 +++++++++++++++ .../rle/TestRunLengthBitPackingHybridEncoder.java | 15 +++++++++++++++ .../src/test/java/parquet/io/TestFiltered.java | 15 +++++++++++++++ .../java/parquet/hadoop/codec/SnappyCodec.java | 15 +++++++++++++++ .../parquet/hadoop/codec/SnappyCompressor.java | 15 +++++++++++++++ .../parquet/hadoop/codec/SnappyDecompressor.java | 15 +++++++++++++++ .../main/java/parquet/hadoop/codec/SnappyUtil.java | 15 +++++++++++++++ .../test/java/parquet/hadoop/TestSnappyCodec.java | 15 +++++++++++++++ 22 files changed, 337 insertions(+) diff --git 
a/parquet-avro/src/test/java/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/parquet/avro/TestAvroSchemaConverter.java index 551143a3b..2b5ce9abb 100644 --- a/parquet-avro/src/test/java/parquet/avro/TestAvroSchemaConverter.java +++ b/parquet-avro/src/test/java/parquet/avro/TestAvroSchemaConverter.java @@ -88,4 +88,26 @@ public void testOptionalFields() throws Exception { " optional int32 myint;\n" + "}\n"); } + + @Test + public void testUnionOfTwoTypes() throws Exception { + Schema schema = Schema.createRecord("record2", null, null, false); + Schema multipleTypes = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type + .NULL), + Schema.create(Schema.Type.INT), + Schema.create(Schema.Type.FLOAT))); + schema.setFields(Arrays.asList( + new Schema.Field("myunion", multipleTypes, null, NullNode.getInstance()) + )); + + // Avro union is modelled using optional data members of thw different types; + testConversion( + schema, + "message record2 {\n" + + " optional group myunion {\n" + + " optional int32 member0;\n" + + " optional float member1;\n" + + " }\n" + + "}\n"); + } } diff --git a/parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java b/parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java index 60765870d..5b37d2643 100644 --- a/parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java +++ b/parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ package parquet.avro; import com.google.common.collect.ImmutableList; diff --git a/parquet-column/src/main/java/parquet/Ints.java b/parquet-column/src/main/java/parquet/Ints.java index 667540b1f..1e7f51aed 100644 --- a/parquet-column/src/main/java/parquet/Ints.java +++ b/parquet-column/src/main/java/parquet/Ints.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package parquet; /** diff --git a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java index 76f5f81c4..115898932 100644 --- a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java +++ b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package parquet.column.values.rle; import java.io.IOException; diff --git a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java index f00f4134f..fb2e37cd9 100644 --- a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java +++ b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package parquet.column.values.rle; import java.io.ByteArrayInputStream; diff --git a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java index c1a3b4ce0..817030c0c 100644 --- a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java +++ b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package parquet.column.values.rle; import java.io.ByteArrayOutputStream; diff --git a/parquet-column/src/main/java/parquet/filter/AndRecordFilter.java b/parquet-column/src/main/java/parquet/filter/AndRecordFilter.java index 675e02e74..04221debf 100644 --- a/parquet-column/src/main/java/parquet/filter/AndRecordFilter.java +++ b/parquet-column/src/main/java/parquet/filter/AndRecordFilter.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package parquet.filter; import parquet.Preconditions; diff --git a/parquet-column/src/main/java/parquet/filter/ColumnPredicates.java b/parquet-column/src/main/java/parquet/filter/ColumnPredicates.java index 00e7ebc2b..b8fc10b3f 100644 --- a/parquet-column/src/main/java/parquet/filter/ColumnPredicates.java +++ b/parquet-column/src/main/java/parquet/filter/ColumnPredicates.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package parquet.filter; import parquet.Preconditions; diff --git a/parquet-column/src/main/java/parquet/filter/ColumnRecordFilter.java b/parquet-column/src/main/java/parquet/filter/ColumnRecordFilter.java index 7da007391..104416f02 100644 --- a/parquet-column/src/main/java/parquet/filter/ColumnRecordFilter.java +++ b/parquet-column/src/main/java/parquet/filter/ColumnRecordFilter.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package parquet.filter; import parquet.column.ColumnReader; diff --git a/parquet-column/src/main/java/parquet/filter/PagedRecordFilter.java b/parquet-column/src/main/java/parquet/filter/PagedRecordFilter.java index 25c4e17a4..41851129c 100644 --- a/parquet-column/src/main/java/parquet/filter/PagedRecordFilter.java +++ b/parquet-column/src/main/java/parquet/filter/PagedRecordFilter.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package parquet.filter; import parquet.column.ColumnReader; diff --git a/parquet-column/src/main/java/parquet/filter/RecordFilter.java b/parquet-column/src/main/java/parquet/filter/RecordFilter.java index 0a728b3ea..569835e3f 100644 --- a/parquet-column/src/main/java/parquet/filter/RecordFilter.java +++ b/parquet-column/src/main/java/parquet/filter/RecordFilter.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package parquet.filter; import parquet.column.ColumnReader; diff --git a/parquet-column/src/main/java/parquet/filter/UnboundRecordFilter.java b/parquet-column/src/main/java/parquet/filter/UnboundRecordFilter.java index b7e0c3f50..b317727bb 100644 --- a/parquet-column/src/main/java/parquet/filter/UnboundRecordFilter.java +++ b/parquet-column/src/main/java/parquet/filter/UnboundRecordFilter.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ package parquet.filter; import parquet.column.ColumnReader; diff --git a/parquet-column/src/main/java/parquet/io/FilteredRecordReader.java b/parquet-column/src/main/java/parquet/io/FilteredRecordReader.java index ba3477497..3c1ae92a4 100644 --- a/parquet-column/src/main/java/parquet/io/FilteredRecordReader.java +++ b/parquet-column/src/main/java/parquet/io/FilteredRecordReader.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package parquet.io; import parquet.column.ColumnReader; diff --git a/parquet-column/src/test/java/parquet/column/values/bitpacking/TestByteBasedBitPackingEncoder.java b/parquet-column/src/test/java/parquet/column/values/bitpacking/TestByteBasedBitPackingEncoder.java index 9ca37a80e..9fd0e723c 100644 --- a/parquet-column/src/test/java/parquet/column/values/bitpacking/TestByteBasedBitPackingEncoder.java +++ b/parquet-column/src/test/java/parquet/column/values/bitpacking/TestByteBasedBitPackingEncoder.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package parquet.column.values.bitpacking; import org.junit.Test; diff --git a/parquet-column/src/test/java/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java b/parquet-column/src/test/java/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java index 675fda634..2058dba41 100644 --- a/parquet-column/src/test/java/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java +++ b/parquet-column/src/test/java/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package parquet.column.values.rle; import java.io.ByteArrayInputStream; diff --git a/parquet-column/src/test/java/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java b/parquet-column/src/test/java/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java index 4e63dafd9..2f5e4b865 100644 --- a/parquet-column/src/test/java/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java +++ b/parquet-column/src/test/java/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package parquet.column.values.rle; import java.io.ByteArrayInputStream; diff --git a/parquet-column/src/test/java/parquet/io/TestFiltered.java b/parquet-column/src/test/java/parquet/io/TestFiltered.java index 2eb206a6b..f4d9adeb8 100644 --- a/parquet-column/src/test/java/parquet/io/TestFiltered.java +++ b/parquet-column/src/test/java/parquet/io/TestFiltered.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package parquet.io; import org.junit.Test; diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCodec.java b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCodec.java index abe1d67ef..70c4f115f 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCodec.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCodec.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package parquet.hadoop.codec; import java.io.IOException; diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCompressor.java b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCompressor.java index ac24303fd..61520d54f 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCompressor.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCompressor.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package parquet.hadoop.codec; import java.io.IOException; diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyDecompressor.java b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyDecompressor.java index d1f5e5360..f1c9e2ab2 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyDecompressor.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyDecompressor.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package parquet.hadoop.codec; import java.io.IOException; diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyUtil.java b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyUtil.java index 2b2bf1eb4..389dcce19 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyUtil.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyUtil.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package parquet.hadoop.codec; import parquet.Preconditions; diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestSnappyCodec.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestSnappyCodec.java index ac9532743..09c9d6831 100644 --- a/parquet-hadoop/src/test/java/parquet/hadoop/TestSnappyCodec.java +++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestSnappyCodec.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ package parquet.hadoop; import java.io.IOException; From f52a26e1a32aadf4ec6713ae12ad0d5cedeca9ed Mon Sep 17 00:00:00 2001 From: Jacob Date: Sat, 6 Jul 2013 15:14:14 +0100 Subject: [PATCH 9/9] Renamed checkValueRead. --- .../java/parquet/column/impl/ColumnReaderImpl.java | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java b/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java index 927282641..2153b5987 100644 --- a/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java +++ b/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java @@ -306,13 +306,13 @@ public boolean isFullyConsumed() { */ @Override public void writeCurrentValueToConverter() { - checkValueRead(false); + readIfPossible(false); this.binding.writeValue(); } @Override public int getCurrentValueDictionaryID() { - checkValueRead(false); + readIfPossible(false); return binding.getDictionaryId(); } @@ -322,7 +322,7 @@ public int getCurrentValueDictionaryID() { */ @Override public int getInteger() { - checkValueRead(false); + readIfPossible(false); return this.binding.getInteger(); } @@ -332,7 +332,7 @@ public int getInteger() { */ @Override public boolean getBoolean() { - checkValueRead(false); + readIfPossible(false); return this.binding.getBoolean(); } @@ -342,7 +342,7 @@ public boolean getBoolean() { */ @Override public long getLong() { - checkValueRead(false); + readIfPossible(false); return this.binding.getLong(); } @@ -352,7 +352,7 @@ public long getLong() { */ @Override public Binary getBinary() { - checkValueRead(false); + readIfPossible(false); return this.binding.getBinary(); } @@ -362,7 +362,7 @@ public Binary getBinary() { */ @Override public float getFloat() { - checkValueRead(false); + readIfPossible(false); return 
this.binding.getFloat(); } @@ -372,7 +372,7 @@ public float getFloat() { */ @Override public double getDouble() { - checkValueRead(false); + readIfPossible(false); return this.binding.getDouble(); } @@ -406,7 +406,7 @@ public void readCurrentValue() { * Reads the value into the binding or skips forwards. * @param skip If true do not deserialize just skip forwards */ - protected void checkValueRead(boolean skip) { + protected void readIfPossible(boolean skip) { try { checkRead(); if (!consumed && !valueRead) { @@ -432,7 +432,7 @@ protected void checkValueRead(boolean skip) { */ @Override public void skip() { - checkValueRead(true); + readIfPossible(true); } /**