diff --git a/parquet-avro/pom.xml b/parquet-avro/pom.xml index 59675fcae..3a032139f 100644 --- a/parquet-avro/pom.xml +++ b/parquet-avro/pom.xml @@ -16,6 +16,7 @@ https://github.com/Parquet/parquet-mr + 1.7.4 @@ -37,7 +38,7 @@ org.apache.avro avro - 1.7.3 + ${avro.version} org.apache.hadoop @@ -107,6 +108,44 @@ + + org.apache.avro + avro-maven-plugin + ${avro.version} + + + generate-test-sources + + idl-protocol + + + ${project.basedir}/src/test/resources + ${project.build.directory}/generated-test-sources + String + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 1.8 + + + add-test-sources + generate-test-sources + + add-test-source + + + + ${project.build.directory}/generated-test-sources + + + + + diff --git a/parquet-avro/src/main/java/parquet/avro/AvroGenericRecordConverter.java b/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java similarity index 74% rename from parquet-avro/src/main/java/parquet/avro/AvroGenericRecordConverter.java rename to parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java index e88af8887..b2a280530 100644 --- a/parquet-avro/src/main/java/parquet/avro/AvroGenericRecordConverter.java +++ b/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java @@ -22,6 +22,9 @@ import org.apache.avro.generic.GenericArray; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.avro.specific.SpecificData; +import parquet.Preconditions; import parquet.io.api.Binary; import parquet.io.api.Converter; import parquet.io.api.GroupConverter; @@ -30,26 +33,28 @@ import parquet.schema.MessageType; import parquet.schema.Type; -class AvroGenericRecordConverter extends GroupConverter { +class AvroIndexedRecordConverter extends GroupConverter { private final ParentValueContainer parent; - protected GenericData.Record currentRecord; + protected IndexedRecord currentRecord; private final Converter[] 
converters; private final GroupType parquetSchema; private final Schema avroSchema; + private final Class specificClass; - public AvroGenericRecordConverter(MessageType parquetSchema, Schema avroSchema) { + public AvroIndexedRecordConverter(MessageType parquetSchema, Schema avroSchema) { this(null, parquetSchema, avroSchema); } - public AvroGenericRecordConverter(ParentValueContainer parent, GroupType + public AvroIndexedRecordConverter(ParentValueContainer parent, GroupType parquetSchema, Schema avroSchema) { this.parent = parent; this.parquetSchema = parquetSchema; this.avroSchema = avroSchema; int schemaSize = parquetSchema.getFieldCount(); this.converters = new Converter[schemaSize]; + this.specificClass = SpecificData.get().getClass(avroSchema); int index = 0; // parquet ignores Avro nulls, so index may differ for (int avroIndex = 0; avroIndex < avroSchema.getFields().size(); avroIndex++) { Schema.Field field = avroSchema.getFields().get(avroIndex); @@ -62,7 +67,7 @@ public AvroGenericRecordConverter(ParentValueContainer parent, GroupType converters[index] = newConverter(fieldSchema, type, new ParentValueContainer() { @Override void add(Object value) { - AvroGenericRecordConverter.this.set(finalAvroIndex, value); + AvroIndexedRecordConverter.this.set(finalAvroIndex, value); } }); index++; @@ -86,13 +91,15 @@ private static Converter newConverter(Schema schema, Type type, } else if (schema.getType().equals(Schema.Type.STRING)) { return new FieldStringConverter(parent); } else if (schema.getType().equals(Schema.Type.RECORD)) { - return new AvroGenericRecordConverter(parent, type.asGroupType(), schema); + return new AvroIndexedRecordConverter(parent, type.asGroupType(), schema); } else if (schema.getType().equals(Schema.Type.ENUM)) { - return new FieldStringConverter(parent); + return new FieldEnumConverter(parent,schema); } else if (schema.getType().equals(Schema.Type.ARRAY)) { - return new GenericArrayConverter(parent, type, schema); + return new 
AvroArrayConverter(parent, type, schema); } else if (schema.getType().equals(Schema.Type.MAP)) { return new MapConverter(parent, type, schema); + } else if (schema.getType().equals(Schema.Type.UNION)) { + return new AvroUnionConverter(parent, type, schema); } else if (schema.getType().equals(Schema.Type.FIXED)) { return new FieldFixedConverter(parent, schema); } @@ -111,7 +118,10 @@ public Converter getConverter(int fieldIndex) { @Override public void start() { - this.currentRecord = new GenericData.Record(avroSchema); + // Should do the right thing whether it is generic or specific + this.currentRecord = (this.specificClass == null) ? + new GenericData.Record(avroSchema) : + (IndexedRecord)SpecificData.newInstance(specificClass, avroSchema); } @Override @@ -121,7 +131,7 @@ public void end() { } } - GenericRecord getCurrentRecord() { + IndexedRecord getCurrentRecord() { return currentRecord; } @@ -239,6 +249,26 @@ final public void addBinary(Binary value) { } + static final class FieldEnumConverter extends PrimitiveConverter { + + private final ParentValueContainer parent; + private final Class enumClass; + + public FieldEnumConverter(ParentValueContainer parent, Schema enumSchema) { + this.parent = parent; + this.enumClass = SpecificData.get().getClass(enumSchema); + } + + @Override + final public void addBinary(Binary value) { + Object enumValue = value.toStringUsingUTF8(); + if (enumClass != null) { + enumValue = (Enum.valueOf(enumClass,(String)enumValue)); + } + parent.add(enumValue); + } + } + static final class FieldFixedConverter extends PrimitiveConverter { private final ParentValueContainer parent; @@ -256,14 +286,14 @@ final public void addBinary(Binary value) { } - static final class GenericArrayConverter extends GroupConverter { + static final class AvroArrayConverter extends GroupConverter { private final ParentValueContainer parent; private final Schema avroSchema; private final Converter converter; private GenericArray array; - public 
GenericArrayConverter(ParentValueContainer parent, Type parquetSchema, + public AvroArrayConverter(ParentValueContainer parent, Type parquetSchema, Schema avroSchema) { this.parent = parent; this.avroSchema = avroSchema; @@ -294,6 +324,51 @@ public void end() { } } + static final class AvroUnionConverter extends GroupConverter { + + private final ParentValueContainer parent; + private final Converter[] memberConverters; + private Object memberValue = null; + + public AvroUnionConverter(ParentValueContainer parent, Type parquetSchema, + Schema avroSchema) { + this.parent = parent; + GroupType parquetGroup = parquetSchema.asGroupType(); + this.memberConverters = new Converter[ parquetGroup.getFieldCount()]; + + int parquetIndex = 0; + for (int index = 0; index < avroSchema.getTypes().size(); index++) { + Schema memberSchema = avroSchema.getTypes().get(index); + if (!memberSchema.getType().equals(Schema.Type.NULL)) { + Type memberType = parquetGroup.getType(parquetIndex); + memberConverters[parquetIndex] = newConverter(memberSchema, memberType, new ParentValueContainer() { + @Override + void add(Object value) { + Preconditions.checkArgument(memberValue==null, "Union is resolving to more than one type"); + memberValue = value; + } + }); + parquetIndex++; // Note for nulls the parquetIndex id not increased + } + } + } + + @Override + public Converter getConverter(int fieldIndex) { + return memberConverters[fieldIndex]; + } + + @Override + public void start() { + memberValue = null; + } + + @Override + public void end() { + parent.add(memberValue); + } + } + static final class MapConverter extends GroupConverter { private final ParentValueContainer parent; diff --git a/parquet-avro/src/main/java/parquet/avro/AvroParquetInputFormat.java b/parquet-avro/src/main/java/parquet/avro/AvroParquetInputFormat.java index 01a3ae67c..8c95121d4 100644 --- a/parquet-avro/src/main/java/parquet/avro/AvroParquetInputFormat.java +++ 
b/parquet-avro/src/main/java/parquet/avro/AvroParquetInputFormat.java @@ -15,14 +15,14 @@ */ package parquet.avro; -import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; import parquet.avro.AvroReadSupport; import parquet.hadoop.ParquetInputFormat; /** * A Hadoop {@link org.apache.hadoop.mapreduce.InputFormat} for Parquet files. */ -public class AvroParquetInputFormat extends ParquetInputFormat { +public class AvroParquetInputFormat extends ParquetInputFormat { public AvroParquetInputFormat() { super(AvroReadSupport.class); } diff --git a/parquet-avro/src/main/java/parquet/avro/AvroParquetOutputFormat.java b/parquet-avro/src/main/java/parquet/avro/AvroParquetOutputFormat.java index 095d56330..67f7ca10a 100644 --- a/parquet-avro/src/main/java/parquet/avro/AvroParquetOutputFormat.java +++ b/parquet-avro/src/main/java/parquet/avro/AvroParquetOutputFormat.java @@ -16,7 +16,7 @@ package parquet.avro; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.mapreduce.Job; import parquet.avro.AvroWriteSupport; import parquet.hadoop.ParquetOutputFormat; @@ -25,7 +25,7 @@ /** * A Hadoop {@link org.apache.hadoop.mapreduce.OutputFormat} for Parquet files. 
*/ -public class AvroParquetOutputFormat extends ParquetOutputFormat { +public class AvroParquetOutputFormat extends ParquetOutputFormat { public static void setSchema(Job job, Schema schema) { AvroWriteSupport.setSchema(ContextUtil.getConfiguration(job), schema); diff --git a/parquet-avro/src/main/java/parquet/avro/AvroParquetReader.java b/parquet-avro/src/main/java/parquet/avro/AvroParquetReader.java index 849423e0b..211895cd2 100644 --- a/parquet-avro/src/main/java/parquet/avro/AvroParquetReader.java +++ b/parquet-avro/src/main/java/parquet/avro/AvroParquetReader.java @@ -16,16 +16,23 @@ package parquet.avro; import java.io.IOException; + +import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; +import parquet.filter.UnboundRecordFilter; import parquet.hadoop.ParquetReader; import parquet.hadoop.api.ReadSupport; /** * Read Avro records from a Parquet file. */ -public class AvroParquetReader extends ParquetReader { +public class AvroParquetReader extends ParquetReader { public AvroParquetReader(Path file) throws IOException { super(file, (ReadSupport) new AvroReadSupport()); } + + public AvroParquetReader(Path file, UnboundRecordFilter recordFilter ) throws IOException { + super(file, (ReadSupport) new AvroReadSupport(), recordFilter); + } } diff --git a/parquet-avro/src/main/java/parquet/avro/AvroParquetWriter.java b/parquet-avro/src/main/java/parquet/avro/AvroParquetWriter.java index 23c7a2498..27d359cb6 100644 --- a/parquet-avro/src/main/java/parquet/avro/AvroParquetWriter.java +++ b/parquet-avro/src/main/java/parquet/avro/AvroParquetWriter.java @@ -17,6 +17,7 @@ import java.io.IOException; import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; import parquet.hadoop.ParquetWriter; import parquet.hadoop.api.WriteSupport; @@ -25,7 +26,7 @@ /** * Write Avro records to a Parquet file. 
*/ -public class AvroParquetWriter extends ParquetWriter { +public class AvroParquetWriter extends ParquetWriter { /** Create a new {@link AvroParquetWriter}. * @@ -39,21 +40,38 @@ public AvroParquetWriter(Path file, Schema avroSchema, CompressionCodecName compressionCodecName, int blockSize, int pageSize) throws IOException { + super(file, (WriteSupport)new AvroWriteSupport(new AvroSchemaConverter().convert(avroSchema), avroSchema), + compressionCodecName, blockSize, pageSize); + } + + /** Create a new {@link AvroParquetWriter}. + * + * @param file The file name to write to. + * @param avroSchema The schema to write with. + * @param compressionCodecName Compression code to use, or CompressionCodecName.UNCOMPRESSED + * @param blockSize HDFS block size + * @param pageSize See parquet write up. Blocks are subdivided into pages for alignment and other purposes. + * @param enableDictionary Whether to use a dictionary to compress columns. + * @throws IOException + */ + public AvroParquetWriter(Path file, Schema avroSchema, + CompressionCodecName compressionCodecName, int blockSize, + int pageSize, boolean enableDictionary) throws IOException { super(file, (WriteSupport) - new AvroWriteSupport(new AvroSchemaConverter().convert(avroSchema), avroSchema), - compressionCodecName, blockSize, pageSize); + new AvroWriteSupport(new AvroSchemaConverter().convert(avroSchema),avroSchema), + compressionCodecName, blockSize, pageSize, enableDictionary, false); } /** Create a new {@link AvroParquetWriter}. The default block size is 50 MB.The default * page size is 1 MB. Default compression is no compression. (Inherited from {@link ParquetWriter}) * - * @param file - * @param avroSchema + * @param file The file name to write to. + * @param avroSchema The schema to write with. 
* @throws IOException */ public AvroParquetWriter(Path file, Schema avroSchema) throws IOException { this(file, avroSchema, CompressionCodecName.UNCOMPRESSED, - DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE); + DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE); } } diff --git a/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java b/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java index dc1f088df..1ea892996 100644 --- a/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java +++ b/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java @@ -17,18 +17,18 @@ import java.util.Map; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; import parquet.hadoop.api.ReadSupport; import parquet.io.api.RecordMaterializer; import parquet.schema.MessageType; /** - * Avro implementation of {@link ReadSupport} for {@link GenericRecord}s. Users should - * use {@link AvroParquetReader} or {@link AvroParquetInputFormat} rather than using + * Avro implementation of {@link ReadSupport} for Avro {@link IndexedRecord}s which cover both Avro Specific and + * Generic. Users should use {@link AvroParquetReader} or {@link AvroParquetInputFormat} rather than using * this class directly. 
*/ -public class AvroReadSupport extends ReadSupport { +public class AvroReadSupport extends ReadSupport { @Override public ReadContext init(Configuration configuration, Map keyValueMetaData, MessageType fileSchema) { @@ -36,7 +36,7 @@ public ReadContext init(Configuration configuration, Map keyValu } @Override - public RecordMaterializer prepareForRead(Configuration configuration, Map keyValueMetaData, MessageType fileSchema, ReadContext readContext) { + public RecordMaterializer prepareForRead(Configuration configuration, Map keyValueMetaData, MessageType fileSchema, ReadContext readContext) { Schema avroSchema = new Schema.Parser().parse(keyValueMetaData.get("avro.schema")); return new AvroRecordMaterializer(readContext.getRequestedSchema(), avroSchema); } diff --git a/parquet-avro/src/main/java/parquet/avro/AvroRecordMaterializer.java b/parquet-avro/src/main/java/parquet/avro/AvroRecordMaterializer.java index 3a3d1f624..e9df29f25 100644 --- a/parquet-avro/src/main/java/parquet/avro/AvroRecordMaterializer.java +++ b/parquet-avro/src/main/java/parquet/avro/AvroRecordMaterializer.java @@ -16,21 +16,21 @@ package parquet.avro; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; import parquet.io.api.GroupConverter; import parquet.io.api.RecordMaterializer; import parquet.schema.MessageType; -class AvroRecordMaterializer extends RecordMaterializer { +class AvroRecordMaterializer extends RecordMaterializer { - private AvroGenericRecordConverter root; + private AvroIndexedRecordConverter root; public AvroRecordMaterializer(MessageType requestedSchema, Schema avroSchema) { - this.root = new AvroGenericRecordConverter(requestedSchema, avroSchema); + this.root = new AvroIndexedRecordConverter(requestedSchema, avroSchema); } @Override - public GenericRecord getCurrentRecord() { + public IndexedRecord getCurrentRecord() { return root.getCurrentRecord(); } diff --git 
a/parquet-avro/src/main/java/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/parquet/avro/AvroSchemaConverter.java index a7c28d521..9a5524a0d 100644 --- a/parquet-avro/src/main/java/parquet/avro/AvroSchemaConverter.java +++ b/parquet-avro/src/main/java/parquet/avro/AvroSchemaConverter.java @@ -15,8 +15,8 @@ */ package parquet.avro; -import java.util.ArrayList; -import java.util.List; +import java.util.*; + import org.apache.avro.Schema; import parquet.schema.GroupType; import parquet.schema.MessageType; @@ -114,19 +114,39 @@ private Type convertField(String fieldName, Schema schema, Type.Repetition repet } else if (type.equals(Schema.Type.FIXED)) { return primitive(fieldName, FIXED_LEN_BYTE_ARRAY, repetition); } else if (type.equals(Schema.Type.UNION)) { - List schemas = schema.getTypes(); - if (schemas.size() == 2) { - if (schemas.get(0).getType().equals(Schema.Type.NULL)) { - return convertField(fieldName, schemas.get(1), Type.Repetition.OPTIONAL); - } else if (schemas.get(1).getType().equals(Schema.Type.NULL)) { - return convertField(fieldName, schemas.get(0), Type.Repetition.OPTIONAL); - } - } - throw new UnsupportedOperationException("Cannot convert Avro type " + type); + return convertUnion(fieldName, schema, repetition); } throw new UnsupportedOperationException("Cannot convert Avro type " + type); } + private Type convertUnion(String fieldName, Schema schema, Type.Repetition repetition) { + List nonNullSchemas = new ArrayList(schema.getTypes().size()); + for (Schema childSchema : schema.getTypes()) { + if (childSchema.getType().equals(Schema.Type.NULL)) { + repetition = Type.Repetition.OPTIONAL; + } else { + nonNullSchemas.add(childSchema); + } + } + // If we only get a null and one other type then its a simple optional field + // otherwise construct a union container + switch (nonNullSchemas.size()) { + case 0: + throw new UnsupportedOperationException("Cannot convert Avro union of only nulls"); + + case 1: + return 
convertField(fieldName, nonNullSchemas.get(0), Type.Repetition.OPTIONAL); // Simple optional field + + default: // complex union type + List unionTypes = new ArrayList(nonNullSchemas.size()); + int index = 0; + for (Schema childSchema : nonNullSchemas) { + unionTypes.add( convertField("member" + index++, childSchema, Type.Repetition.OPTIONAL)); + } + return new GroupType(repetition, fieldName, unionTypes); + } + } + private Type convertField(Schema.Field field) { return convertField(field.name(), field.schema()); } diff --git a/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java index 661ba7a9f..fc5ec70b7 100644 --- a/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java +++ b/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java @@ -21,8 +21,11 @@ import java.util.Map; import org.apache.avro.Schema; import org.apache.avro.generic.GenericArray; +import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericFixed; -import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificRecord; import org.apache.avro.util.Utf8; import org.apache.hadoop.conf.Configuration; import parquet.hadoop.api.WriteSupport; @@ -33,11 +36,11 @@ import parquet.schema.Type; /** - * Avro implementation of {@link WriteSupport} for {@link GenericRecord}s. Users should - * use {@link AvroParquetWriter} or {@link AvroParquetOutputFormat} rather than using + * Avro implementation of {@link WriteSupport} for {@link IndexedRecord}s - both Avro Generic and Specific. + * Users should use {@link AvroParquetWriter} or {@link AvroParquetOutputFormat} rather than using * this class directly. 
*/ -public class AvroWriteSupport extends WriteSupport { +public class AvroWriteSupport extends WriteSupport { private RecordConsumer recordConsumer; private MessageType rootSchema; @@ -72,21 +75,21 @@ public void prepareForWrite(RecordConsumer recordConsumer) { } @Override - public void write(GenericRecord record) { + public void write(IndexedRecord record) { recordConsumer.startMessage(); writeRecordFields(rootSchema, rootAvroSchema, record); recordConsumer.endMessage(); } private void writeRecord(GroupType schema, Schema avroSchema, - GenericRecord record) { + IndexedRecord record) { recordConsumer.startGroup(); writeRecordFields(schema, avroSchema, record); recordConsumer.endGroup(); } private void writeRecordFields(GroupType schema, Schema avroSchema, - GenericRecord record) { + IndexedRecord record) { List fields = schema.getFields(); List avroFields = avroSchema.getFields(); int index = 0; // parquet ignores Avro nulls, so index may differ @@ -109,7 +112,7 @@ private void writeRecordFields(GroupType schema, Schema avroSchema, } private void writeArray(GroupType schema, Schema avroSchema, - GenericArray array) { + Iterable array) { recordConsumer.startGroup(); // group wrapper (original type LIST) recordConsumer.startField("array", 0); for (T elt : array) { @@ -143,6 +146,31 @@ private void writeRecordFields(GroupType schema, Schema avroSchema, recordConsumer.endGroup(); } + private void writeUnion(GroupType parquetSchema, Schema avroSchema, Object value) { + + recordConsumer.startGroup(); + + // ResolveUnion will tell us which of the union member types to deserialise + int avroIndex = GenericData.get().resolveUnion(avroSchema, value); + + // For parquet's schema we skip nulls + GroupType parquetGroup = parquetSchema.asGroupType(); + int parquetIndex = avroIndex; + for ( int i=0; i) value); + writeArray((GroupType) type, nonNullAvroSchema, (Iterable) value); } else if (avroType.equals(Schema.Type.MAP)) { writeMap((GroupType) type, nonNullAvroSchema, (Map) 
value); + } else if (avroType.equals(Schema.Type.UNION)) { + writeUnion((GroupType) type, nonNullAvroSchema, value); } else if (avroType.equals(Schema.Type.FIXED)) { recordConsumer.addBinary(Binary.fromByteArray(((GenericFixed) value).bytes())); } diff --git a/parquet-avro/src/test/java/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/parquet/avro/TestAvroSchemaConverter.java index 5c87ab7e8..2b5ce9abb 100644 --- a/parquet-avro/src/test/java/parquet/avro/TestAvroSchemaConverter.java +++ b/parquet-avro/src/test/java/parquet/avro/TestAvroSchemaConverter.java @@ -89,14 +89,25 @@ public void testOptionalFields() throws Exception { "}\n"); } - @Test(expected = UnsupportedOperationException.class) - public void testNonNullUnion() { - Schema union = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.INT), - Schema.create(Schema.Type.LONG))); - Schema schema = Schema.createRecord("record1", null, null, false); + @Test + public void testUnionOfTwoTypes() throws Exception { + Schema schema = Schema.createRecord("record2", null, null, false); + Schema multipleTypes = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type + .NULL), + Schema.create(Schema.Type.INT), + Schema.create(Schema.Type.FLOAT))); schema.setFields(Arrays.asList( - new Schema.Field("myunion", union, null, null))); + new Schema.Field("myunion", multipleTypes, null, NullNode.getInstance()) + )); - new AvroSchemaConverter().convert(schema); + // Avro union is modelled using optional data members of thw different types; + testConversion( + schema, + "message record2 {\n" + + " optional group myunion {\n" + + " optional int32 member0;\n" + + " optional float member1;\n" + + " }\n" + + "}\n"); } } diff --git a/parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java b/parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java new file mode 100644 index 000000000..aaa5d9b17 --- /dev/null +++ b/parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java @@ 
-0,0 +1,180 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package parquet.avro; + +import com.google.common.collect.ImmutableList; +import org.apache.hadoop.fs.Path; +import org.junit.Assert; +import org.junit.Test; +import parquet.hadoop.ParquetReader; +import parquet.hadoop.ParquetWriter; +import parquet.hadoop.metadata.CompressionCodecName; + +import java.io.File; +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static parquet.filter.ColumnRecordFilter.column; +import static parquet.filter.ColumnPredicates.equalTo; +import static parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE; +import static parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE; + +/** + * Other tests exercise the use of Avro Generic, a dynamic data representation. This class focuses + * on Avro Speific whose schemas are pre-compiled to POJOs with built in SerDe for faster serialization. 
+ */ +public class TestSpecificReadWrite { + + @Test + public void testReadWriteSpecific() throws IOException { + Path path = writeCarsToParquetFile(10, CompressionCodecName.UNCOMPRESSED, false); + ParquetReader reader = new AvroParquetReader(path); + for (int i = 0; i < 10; i++) { + assertEquals(getVwPolo().toString(),reader.read().toString()); + assertEquals(getVwPassat().toString(),reader.read().toString()); + assertEquals(getBmwMini().toString(),reader.read().toString()); + } + assertNull(reader.read()); + } + + @Test + public void testReadWriteSpecificWithDictionary() throws IOException { + Path path = writeCarsToParquetFile(10, CompressionCodecName.UNCOMPRESSED, true); + ParquetReader reader = new AvroParquetReader(path); + for (int i = 0; i < 10; i++) { + assertEquals(getVwPolo().toString(),reader.read().toString()); + assertEquals(getVwPassat().toString(),reader.read().toString()); + assertEquals(getBmwMini().toString(),reader.read().toString()); + } + assertNull(reader.read()); + } + + @Test + public void testFilterMatchesMultiple() throws IOException { + + Path path = writeCarsToParquetFile(10, CompressionCodecName.UNCOMPRESSED, false); + + ParquetReader reader = new AvroParquetReader(path, column("make", equalTo("Volkswagen"))); + for (int i = 0; i < 10; i++) { + assertEquals(getVwPolo().toString(),reader.read().toString()); + assertEquals(getVwPassat().toString(),reader.read().toString()); + } + assertNull( reader.read()); + } + + @Test + public void testFilterWithDictionary() throws IOException { + + Path path = writeCarsToParquetFile(1,CompressionCodecName.UNCOMPRESSED,true); + + ParquetReader reader = new AvroParquetReader(path, column("make", equalTo("Volkswagen"))); + assertEquals(getVwPolo().toString(),reader.read().toString()); + assertEquals(getVwPassat().toString(),reader.read().toString()); + assertNull( reader.read()); + } + + @Test + public void testFilterOnSubAttribute() throws IOException { + + Path path = writeCarsToParquetFile(1, 
CompressionCodecName.UNCOMPRESSED, false); + + ParquetReader reader = new AvroParquetReader(path, column("engine.type", equalTo(EngineType.DIESEL))); + assertEquals(reader.read().toString(),getVwPassat().toString()); + assertNull( reader.read()); + + reader = new AvroParquetReader(path, column("engine.capacity", equalTo(1.4f))); + assertEquals(getVwPolo().toString(),reader.read().toString()); + assertNull( reader.read()); + + + reader = new AvroParquetReader(path, column("engine.hasTurboCharger", equalTo(true))); + assertEquals(getBmwMini().toString(),reader.read().toString()); + assertNull( reader.read()); + } + + private Path writeCarsToParquetFile( int num, CompressionCodecName compression, boolean enableDictionary) throws IOException { + File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp"); + tmp.deleteOnExit(); + tmp.delete(); + Path path = new Path(tmp.getPath()); + + Car vwPolo = getVwPolo(); + Car vwPassat = getVwPassat(); + Car bmwMini = getBmwMini(); + + ParquetWriter writer = new AvroParquetWriter(path,Car.SCHEMA$, compression, + DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE, enableDictionary); + for (int i = 0; i < num; i++) { + writer.write(vwPolo); + writer.write(vwPassat); + writer.write(bmwMini); + } + writer.close(); + return path; + } + + public static Car getVwPolo() { + return Car.newBuilder() + .setYear(2010) + .setRegistration("A123 GTR") + .setMake("Volkswagen") + .setModel("Polo") + .setDoors(4) + .setEngine(Engine.newBuilder().setType(EngineType.PETROL) + .setCapacity(1.4f).setHasTurboCharger(false).build()) + .setOptionalExtra( + Stereo.newBuilder().setMake("Blaupunkt").setSpeakers(4).build()) + .setServiceHistory(ImmutableList.of( + Service.newBuilder().setDate(1325376000l).setMechanic("Jim").build(), + Service.newBuilder().setDate(1356998400l).setMechanic("Mike").build() + )) + .build(); + } + + public static Car getVwPassat() { + return Car.newBuilder() + .setYear(2010) + .setRegistration("A123 GXR") + .setMake("Volkswagen") + 
.setModel("Passat") + .setDoors(5) + .setEngine(Engine.newBuilder().setType(EngineType.DIESEL) + .setCapacity(2.0f).setHasTurboCharger(false).build()) + .setOptionalExtra( + LeatherTrim.newBuilder().setColour("Black").build()) + .setServiceHistory(ImmutableList.of( + Service.newBuilder().setDate(1325376000l).setMechanic("Jim").build() + )) + .build(); + } + + public static Car getBmwMini() { + return Car.newBuilder() + .setYear(2010) + .setRegistration("A124 GSR") + .setMake("BMW") + .setModel("Mini") + .setDoors(4) + .setEngine(Engine.newBuilder().setType(EngineType.PETROL) + .setCapacity(1.6f).setHasTurboCharger(true).build()) + .setOptionalExtra(null) + .setServiceHistory(ImmutableList.of( + Service.newBuilder().setDate(1356998400l).setMechanic("Mike").build() + )) + .build(); + } +} diff --git a/parquet-avro/src/test/resources/car.avdl b/parquet-avro/src/test/resources/car.avdl new file mode 100644 index 000000000..e330aaec4 --- /dev/null +++ b/parquet-avro/src/test/resources/car.avdl @@ -0,0 +1,38 @@ +@namespace("parquet.avro") +protocol Cars { + + record Service { + long date; + string mechanic; + } + + record Stereo { + string make; + int speakers; + } + + record LeatherTrim { + string colour; + } + + enum EngineType { + DIESEL, PETROL + } + + record Engine { + EngineType type; + float capacity; + boolean hasTurboCharger; + } + + record Car { + long year; + string registration; + string make; + string model; + int doors; + Engine engine; + union { null, Stereo, LeatherTrim } optionalExtra = null; + array serviceHistory; + } +} \ No newline at end of file diff --git a/parquet-column/src/main/java/parquet/Ints.java b/parquet-column/src/main/java/parquet/Ints.java index 667540b1f..1e7f51aed 100644 --- a/parquet-column/src/main/java/parquet/Ints.java +++ b/parquet-column/src/main/java/parquet/Ints.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package parquet; /** diff --git a/parquet-column/src/main/java/parquet/column/ColumnReader.java b/parquet-column/src/main/java/parquet/column/ColumnReader.java index 064e62856..149250f8f 100644 --- a/parquet-column/src/main/java/parquet/column/ColumnReader.java +++ b/parquet-column/src/main/java/parquet/column/ColumnReader.java @@ -98,4 +98,14 @@ * @return the current value */ double getDouble(); + + /** + * @return Descriptor of the column. 
+ */ + ColumnDescriptor getDescriptor(); + + /** + * Skip the current value + */ + void skip(); } diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java b/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java index 49e44e3b9..f7371d56b 100644 --- a/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java +++ b/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java @@ -47,6 +47,7 @@ private static abstract class Binding { abstract void read(); + abstract void skip(); abstract void writeValue(); public int getDictionaryId() { throw new UnsupportedOperationException(); @@ -99,6 +100,10 @@ private void bindToDictionary(final Dictionary dictionary) { void read() { dictionaryId = dataColumn.readValueDictionaryId(); } + public void skip() { + // Type is not important here as its just a key + dataColumn.skipBytes(); + } public int getDictionaryId() { return dictionaryId; } @@ -135,6 +140,10 @@ public Binding convertFLOAT(PrimitiveTypeName primitiveTypeName) { void read() { current = dataColumn.readFloat(); } + public void skip() { + current = 0; + dataColumn.skipFloat(); + } public float getFloat() { return current; } @@ -150,6 +159,10 @@ public Binding convertDOUBLE(PrimitiveTypeName primitiveTypeName) { void read() { current = dataColumn.readDouble(); } + public void skip() { + current = 0; + dataColumn.skipDouble(); + } public double getDouble() { return current; } @@ -165,6 +178,10 @@ public Binding convertINT32(PrimitiveTypeName primitiveTypeName) { void read() { current = dataColumn.readInteger(); } + public void skip() { + current = 0; + dataColumn.skipInteger(); + } @Override public int getInteger() { return current; @@ -181,6 +198,10 @@ public Binding convertINT64(PrimitiveTypeName primitiveTypeName) { void read() { current = dataColumn.readLong(); } + public void skip() { + current = 0; + dataColumn.skipLong(); + } @Override public long getLong() { return current; @@ -206,6 +227,10 @@ 
public Binding convertBOOLEAN(PrimitiveTypeName primitiveTypeName) { void read() { current = dataColumn.readBoolean(); } + public void skip() { + current = false; + dataColumn.skipBoolean(); + } @Override public boolean getBoolean() { return current; @@ -222,6 +247,10 @@ public Binding convertBINARY(PrimitiveTypeName primitiveTypeName) { void read() { current = dataColumn.readBytes(); } + public void skip() { + current = null; + dataColumn.skipBytes(); + } @Override public Binary getBinary() { return current; @@ -268,7 +297,7 @@ public ColumnReaderImpl(ColumnDescriptor path, PageReader pageReader, PrimitiveC */ @Override public boolean isFullyConsumed() { - return readValues >= totalValueCount; + return ((readValues == totalValueCount) && consumed) || (readValues > totalValueCount); } /** @@ -277,13 +306,13 @@ public boolean isFullyConsumed() { */ @Override public void writeCurrentValueToConverter() { - checkValueRead(); + readIfPossible(false); this.binding.writeValue(); } @Override public int getCurrentValueDictionaryID() { - checkValueRead(); + readIfPossible(false); return binding.getDictionaryId(); } @@ -293,7 +322,7 @@ public int getCurrentValueDictionaryID() { */ @Override public int getInteger() { - checkValueRead(); + readIfPossible(false); return this.binding.getInteger(); } @@ -303,7 +332,7 @@ public int getInteger() { */ @Override public boolean getBoolean() { - checkValueRead(); + readIfPossible(false); return this.binding.getBoolean(); } @@ -313,7 +342,7 @@ public boolean getBoolean() { */ @Override public long getLong() { - checkValueRead(); + readIfPossible(false); return this.binding.getLong(); } @@ -323,7 +352,7 @@ public long getLong() { */ @Override public Binary getBinary() { - checkValueRead(); + readIfPossible(false); return this.binding.getBinary(); } @@ -333,7 +362,7 @@ public Binary getBinary() { */ @Override public float getFloat() { - checkValueRead(); + readIfPossible(false); return this.binding.getFloat(); } @@ -343,7 +372,7 @@ public 
float getFloat() { */ @Override public double getDouble() { - checkValueRead(); + readIfPossible(false); return this.binding.getDouble(); } @@ -358,17 +387,34 @@ public int getCurrentRepetitionLevel() { } /** + * {@inheritDoc} + * @see parquet.column.ColumnReader#getDescriptor() + */ + @Override + public ColumnDescriptor getDescriptor() { + return path; + } + + /** * reads the current value */ public void readCurrentValue() { binding.read(); } - protected void checkValueRead() { + /** + * Reads the value into the binding or skips forwards. + * @param skip If true do not deserialize just skip forwards + */ + protected void readIfPossible(boolean skip) { try { checkRead(); if (!consumed && !valueRead) { - readCurrentValue(); + if ( skip ) { + binding.skip(); + } else { + readCurrentValue(); + } valueRead = true; } } catch (RuntimeException e) { @@ -382,6 +428,15 @@ protected void checkValueRead() { /** * {@inheritDoc} + * @see parquet.column.ColumnReader#skip() + */ + @Override + public void skip() { + readIfPossible(true); + } + + /** + * {@inheritDoc} * @see parquet.column.ColumnReader#getCurrentDefinitionLevel() */ @Override diff --git a/parquet-column/src/main/java/parquet/column/values/ValuesReader.java b/parquet-column/src/main/java/parquet/column/values/ValuesReader.java index d5f96c131..5086e60ac 100644 --- a/parquet-column/src/main/java/parquet/column/values/ValuesReader.java +++ b/parquet-column/src/main/java/parquet/column/values/ValuesReader.java @@ -72,6 +72,13 @@ public boolean readBoolean() { } /** + * Skips the next boolean in the page + */ + public void skipBoolean() { + throw new UnsupportedOperationException(); + } + + /** * @return the next Binary from the page */ public Binary readBytes() { @@ -79,6 +86,13 @@ public Binary readBytes() { } /** + * Skips the next Binary in the page + */ + public void skipBytes() { + throw new UnsupportedOperationException(); + } + + /** * @return the next float from the page */ public float readFloat() { @@ -86,6 
+100,13 @@ public float readFloat() { } /** + * Skips the next float in the page + */ + public void skipFloat() { + throw new UnsupportedOperationException(); + } + + /** * @return the next double from the page */ public double readDouble() { @@ -93,6 +114,13 @@ public double readDouble() { } /** + * Skips the next double in the page + */ + public void skipDouble() { + throw new UnsupportedOperationException(); + } + + /** * @return the next integer from the page */ public int readInteger() { @@ -100,10 +128,23 @@ public int readInteger() { } /** + * Skips the next integer in the page + */ + public void skipInteger() { + throw new UnsupportedOperationException(); + } + + /** * @return the next long from the page */ public long readLong() { throw new UnsupportedOperationException(); } + /** + * Skips the next long in the page + */ + public void skipLong() { + throw new UnsupportedOperationException(); + } } \ No newline at end of file diff --git a/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesReader.java b/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesReader.java index 8cafa093c..b2920b077 100644 --- a/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesReader.java +++ b/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesReader.java @@ -77,4 +77,12 @@ public Binary readBytes() { } } + @Override + public void skipBytes() { + try { + decoder.readInt(); // Type does not matter as we are just skipping dictionary keys + } catch (IOException e) { + throw new ParquetDecodingException(e); + } + } } diff --git a/parquet-column/src/main/java/parquet/column/values/plain/BinaryPlainValuesReader.java b/parquet-column/src/main/java/parquet/column/values/plain/BinaryPlainValuesReader.java index 238e749b0..6a009dbd8 100644 --- a/parquet-column/src/main/java/parquet/column/values/plain/BinaryPlainValuesReader.java +++
b/parquet-column/src/main/java/parquet/column/values/plain/BinaryPlainValuesReader.java @@ -45,6 +45,18 @@ public Binary readBytes() { } @Override + public void skipBytes() { + try { + int length = BytesUtils.readIntLittleEndian(in, offset); + offset += 4 + length; + } catch (IOException e) { + throw new ParquetDecodingException("could not skip bytes at offset " + offset, e); + } catch (RuntimeException e) { + throw new ParquetDecodingException("could not skip bytes at offset " + offset, e); + } + } + + @Override public int initFromPage(long valueCount, byte[] in, int offset) throws IOException { if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset)); diff --git a/parquet-column/src/main/java/parquet/column/values/plain/BooleanPlainValuesReader.java b/parquet-column/src/main/java/parquet/column/values/plain/BooleanPlainValuesReader.java index c259f245f..dfe3cd48a 100644 --- a/parquet-column/src/main/java/parquet/column/values/plain/BooleanPlainValuesReader.java +++ b/parquet-column/src/main/java/parquet/column/values/plain/BooleanPlainValuesReader.java @@ -45,6 +45,14 @@ public boolean readBoolean() { return in.readInteger() == 0 ? 
false : true; } + /** + * {@inheritDoc} + * @see parquet.column.values.ValuesReader#skipBoolean() + */ + @Override + public void skipBoolean() { + in.readInteger(); + } /** * {@inheritDoc} diff --git a/parquet-column/src/main/java/parquet/column/values/plain/PlainValuesReader.java b/parquet-column/src/main/java/parquet/column/values/plain/PlainValuesReader.java index 2bf574b40..fdf7e9e78 100644 --- a/parquet-column/src/main/java/parquet/column/values/plain/PlainValuesReader.java +++ b/parquet-column/src/main/java/parquet/column/values/plain/PlainValuesReader.java @@ -46,6 +46,15 @@ public float readFloat() { } @Override + public void skipFloat() { + try { + in.skipBytes(4); + } catch (IOException e) { + throw new ParquetDecodingException("could not skip float", e); + } + } + + @Override public double readDouble() { try { return in.readDouble(); @@ -55,6 +64,15 @@ public double readDouble() { } @Override + public void skipDouble() { + try { + in.skipBytes(8); + } catch (IOException e) { + throw new ParquetDecodingException("could not skip double", e); + } + } + + @Override public int readInteger() { try { return in.readInt(); @@ -64,6 +82,15 @@ public int readInteger() { } @Override + public void skipInteger() { + try { + in.skipBytes(4); + } catch (IOException e) { + throw new ParquetDecodingException("could not skip int", e); + } + } + + @Override public long readLong() { try { return in.readLong(); @@ -72,6 +99,15 @@ public long readLong() { } } + @Override + public void skipLong() { + try { + in.skipBytes(8); + } catch (IOException e) { + throw new ParquetDecodingException("could not skip long", e); + } + } + /** * {@inheritDoc} * @see parquet.column.values.ValuesReader#initFromPage(byte[], int) diff --git a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java index 76f5f81c4..115898932 100644 --- 
a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java +++ b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package parquet.column.values.rle; import java.io.IOException; diff --git a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java index f00f4134f..fb2e37cd9 100644 --- a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java +++ b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package parquet.column.values.rle; import java.io.ByteArrayInputStream; diff --git a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java index c1a3b4ce0..817030c0c 100644 --- a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java +++ b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package parquet.column.values.rle; import java.io.ByteArrayOutputStream; diff --git a/parquet-column/src/main/java/parquet/filter/AndRecordFilter.java b/parquet-column/src/main/java/parquet/filter/AndRecordFilter.java new file mode 100644 index 000000000..04221debf --- /dev/null +++ b/parquet-column/src/main/java/parquet/filter/AndRecordFilter.java @@ -0,0 +1,66 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package parquet.filter; + +import parquet.Preconditions; +import parquet.column.ColumnReader; + +/** + * Provides ability to chain two filters together. Bear in mind that the first one will + * short circuit the second. Useful if getting a page of already filtered result. + * i.e and( column("manufacturer", equalTo("Volkswagen")), page(100,50)) + * + * @author Jacob Metcalf + */ +public final class AndRecordFilter implements RecordFilter { + + private final RecordFilter boundFilter1; + private final RecordFilter boundFilter2; + + /** + * Returns builder for creating an and filter. + * @param filter1 The first filter to check. + * @param filter2 The second filter to check. + */ + public static final UnboundRecordFilter and( final UnboundRecordFilter filter1, final UnboundRecordFilter filter2 ) { + Preconditions.checkNotNull( filter1, "filter1" ); + Preconditions.checkNotNull( filter2, "filter2" ); + return new UnboundRecordFilter() { + @Override + public RecordFilter bind(Iterable readers) { + return new AndRecordFilter( filter1.bind(readers), filter2.bind( readers) ); + } + }; + } + + /** + * Private constructor, use AndRecordFilter.and() instead. 
+ */ + private AndRecordFilter( RecordFilter boundFilter1, RecordFilter boundFilter2 ) { + this.boundFilter1 = boundFilter1; + this.boundFilter2 = boundFilter2; + } + + @Override + public boolean isFullyConsumed() { + return boundFilter1.isFullyConsumed() && boundFilter2.isFullyConsumed(); + } + + @Override + public boolean isMatch() { + return boundFilter1.isMatch() && boundFilter2.isMatch(); + } +} diff --git a/parquet-column/src/main/java/parquet/filter/ColumnPredicates.java b/parquet-column/src/main/java/parquet/filter/ColumnPredicates.java new file mode 100644 index 000000000..b8fc10b3f --- /dev/null +++ b/parquet-column/src/main/java/parquet/filter/ColumnPredicates.java @@ -0,0 +1,97 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package parquet.filter; + +import parquet.Preconditions; +import parquet.column.ColumnReader; + +/** + * ColumnPredicates class provides checks for column values. Factory methods + * are provided for standard predicates which wrap the job of getting the + * correct value from the column. 
+ */ +public class ColumnPredicates { + + public static interface Predicate { + boolean apply(ColumnReader input); + } + + public static Predicate equalTo(final String target) { + Preconditions.checkNotNull(target,"target"); + return new Predicate() { + @Override + public boolean apply(ColumnReader input) { + return target.equals(input.getBinary().toStringUsingUTF8()); + } + }; + } + + public static Predicate equalTo(final int target) { + return new Predicate() { + @Override + public boolean apply(ColumnReader input) { + return input.getInteger() == target; + } + }; + } + + public static Predicate equalTo(final long target) { + return new Predicate() { + @Override + public boolean apply(ColumnReader input) { + return input.getLong() == target; + } + }; + } + + public static Predicate equalTo(final float target) { + return new Predicate() { + @Override + public boolean apply(ColumnReader input) { + return input.getFloat() == target; + } + }; + } + + public static Predicate equalTo(final double target) { + return new Predicate() { + @Override + public boolean apply(ColumnReader input) { + return input.getDouble() == target; + } + }; + } + + public static Predicate equalTo(final boolean target) { + return new Predicate() { + @Override + public boolean apply(ColumnReader input) { + return input.getBoolean() == target; + } + }; + } + + public static Predicate equalTo(final E target) { + Preconditions.checkNotNull(target,"target"); + final String targetAsString = target.name(); + return new Predicate() { + @Override + public boolean apply(ColumnReader input) { + return targetAsString.equals(input.getBinary().toStringUsingUTF8()); + } + }; + } +} diff --git a/parquet-column/src/main/java/parquet/filter/ColumnRecordFilter.java b/parquet-column/src/main/java/parquet/filter/ColumnRecordFilter.java new file mode 100644 index 000000000..104416f02 --- /dev/null +++ b/parquet-column/src/main/java/parquet/filter/ColumnRecordFilter.java @@ -0,0 +1,80 @@ +/** + * Copyright 2012 
Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package parquet.filter; + +import parquet.column.ColumnReader; +import java.util.Arrays; +import static parquet.Preconditions.checkNotNull; + +/** + * Record filter which applies the supplied predicate to the specified column. + */ +public final class ColumnRecordFilter implements RecordFilter { + + private final ColumnReader filterOnColumn; + private final ColumnPredicates.Predicate filterPredicate; + + /** + * Factory method for record filter which applies the supplied predicate to the specified column. + * Note that if searching for a repeated sub-attribute it will only ever match against the + * first instance of it in the object. + * + * @param columnPath Dot separated path specifier, e.g. "engine.capacity" + * @param predicate Should call getBinary etc. 
and check the value + */ + public static final UnboundRecordFilter column(final String columnPath, + final ColumnPredicates.Predicate predicate) { + checkNotNull(columnPath, "columnPath"); + checkNotNull(predicate, "predicate"); + return new UnboundRecordFilter() { + final String[] filterPath = columnPath.split("\\."); + @Override + public RecordFilter bind(Iterable readers) { + for (ColumnReader reader : readers) { + if ( Arrays.equals( reader.getDescriptor().getPath(), filterPath)) { + return new ColumnRecordFilter(reader, predicate); + } + } + throw new IllegalArgumentException( "Column " + columnPath + " does not exist."); + } + }; + } + + /** + * Private constructor. Use column() instead. + */ + private ColumnRecordFilter(ColumnReader filterOnColumn, ColumnPredicates.Predicate filterPredicate) { + this.filterOnColumn = filterOnColumn; + this.filterPredicate = filterPredicate; + } + + /** + * @return true if the current value for the column reader matches the predicate. + */ + @Override + public boolean isMatch() { + + return ( filterOnColumn.isFullyConsumed()) ? false : filterPredicate.apply( filterOnColumn ); + } + + /** + * @return true if the column we are filtering on has no more values. + */ + @Override + public boolean isFullyConsumed() { + return filterOnColumn.isFullyConsumed(); + } +} diff --git a/parquet-column/src/main/java/parquet/filter/PagedRecordFilter.java b/parquet-column/src/main/java/parquet/filter/PagedRecordFilter.java new file mode 100644 index 000000000..41851129c --- /dev/null +++ b/parquet-column/src/main/java/parquet/filter/PagedRecordFilter.java @@ -0,0 +1,69 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package parquet.filter; + +import parquet.column.ColumnReader; + +/** + * Filter which will only materialize a page worth of results. + */ +public final class PagedRecordFilter implements RecordFilter { + + private final long startPos; + private final long endPos; + private long currentPos = 0; + + /** + * Returns builder for creating a paged query. + * @param startPos The record to start from, numbering starts at 1. + * @param pageSize The size of the page. + */ + public static final UnboundRecordFilter page( final long startPos, final long pageSize ) { + return new UnboundRecordFilter() { + @Override + public RecordFilter bind(Iterable readers) { + return new PagedRecordFilter( startPos, pageSize ); + } + }; + } + + /** + * Private constructor, use page() instead. + */ + private PagedRecordFilter(long startPos, long pageSize) { + this.startPos = startPos; + this.endPos = startPos + pageSize; + } + + /** + * Terminate early when we have got our page. + */ + @Override + public boolean isFullyConsumed() { + return ( currentPos >= endPos ); + } + + /** + * Keeps track of how many times it is called. Only returns matches when the + * record number is in the range.
+ */ + @Override + public boolean isMatch() { + currentPos++; + return (( currentPos >= startPos ) && ( currentPos < endPos )); + } + +} diff --git a/parquet-column/src/main/java/parquet/filter/RecordFilter.java b/parquet-column/src/main/java/parquet/filter/RecordFilter.java new file mode 100644 index 000000000..569835e3f --- /dev/null +++ b/parquet-column/src/main/java/parquet/filter/RecordFilter.java @@ -0,0 +1,36 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package parquet.filter; + +import parquet.column.ColumnReader; + +/** + * Filter to be applied to a record to work out whether to skip it. + * + * @author Jacob Metcalf + */ +public interface RecordFilter { + + /** + * Works out whether the current record can pass through the filter. + */ + boolean isMatch(); + + /** + * Whether the filter values are fully consumed. + */ + boolean isFullyConsumed(); +} diff --git a/parquet-column/src/main/java/parquet/filter/UnboundRecordFilter.java b/parquet-column/src/main/java/parquet/filter/UnboundRecordFilter.java new file mode 100644 index 000000000..b317727bb --- /dev/null +++ b/parquet-column/src/main/java/parquet/filter/UnboundRecordFilter.java @@ -0,0 +1,33 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package parquet.filter; + +import parquet.column.ColumnReader; + +/** + * Builder for a record filter. Idea is that each filter provides a create function + * which returns an unbound filter. This only becomes a filter when it is bound to the actual + * columns. + * + * @author Jacob Metcalf + */ +public interface UnboundRecordFilter { + + /** + * Call to bind to actual columns and create filter. + */ + RecordFilter bind( Iterable readers); +} diff --git a/parquet-column/src/main/java/parquet/io/FilteredRecordReader.java b/parquet-column/src/main/java/parquet/io/FilteredRecordReader.java new file mode 100644 index 000000000..3c1ae92a4 --- /dev/null +++ b/parquet-column/src/main/java/parquet/io/FilteredRecordReader.java @@ -0,0 +1,92 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package parquet.io; + +import parquet.column.ColumnReader; +import parquet.column.impl.ColumnReadStoreImpl; +import parquet.filter.RecordFilter; +import parquet.filter.UnboundRecordFilter; +import parquet.io.api.RecordMaterializer; + +import java.util.Arrays; + +/** + * Extends the record reader to skip records that do not match the bound filter. + * @author Jacob Metcalf + * + */ +class FilteredRecordReader extends RecordReaderImplementation { + + private final RecordFilter recordFilter; + + /** + * @param root the root of the schema + * @param validating + * @param columnStore + * @param unboundFilter Filter records, pass in NULL_FILTER to leave unfiltered. + */ + public FilteredRecordReader(MessageColumnIO root, RecordMaterializer recordMaterializer, boolean validating, + ColumnReadStoreImpl columnStore, UnboundRecordFilter unboundFilter) { + super(root, recordMaterializer, validating, columnStore); + + if ( unboundFilter != null ) { + recordFilter = unboundFilter.bind(getColumnReaders()); + } else { + recordFilter = null; + } + } + + /** + * Override read() method to provide skip. + */ + @Override + public T read() { + if ( skipToMatch()) { + return super.read(); + } else { + return null; + } + } + + + /** + * Skips forwards until the filter finds the first match. Returns false + * if none found. + */ + private boolean skipToMatch() { + while ( !recordFilter.isMatch()) { + if ( recordFilter.isFullyConsumed()) { + return false; + } + State currentState = getState(0); + do { + ColumnReader columnReader = currentState.column; + + // currentLevel = depth + 1 at this point + // set the current value + if (columnReader.getCurrentDefinitionLevel() >= currentState.maxDefinitionLevel) { + columnReader.skip(); + } + columnReader.consume(); + + // Based on repetition level work out next state to go to + int nextR = currentState.maxRepetitionLevel == 0 ?
0 : columnReader.getCurrentRepetitionLevel(); + currentState = currentState.getNextState(nextR); + } while (currentState != null); + } + return true; + } +} diff --git a/parquet-column/src/main/java/parquet/io/MessageColumnIO.java b/parquet-column/src/main/java/parquet/io/MessageColumnIO.java index a6e31f66a..64c65ad68 100644 --- a/parquet-column/src/main/java/parquet/io/MessageColumnIO.java +++ b/parquet-column/src/main/java/parquet/io/MessageColumnIO.java @@ -27,6 +27,8 @@ import parquet.io.api.RecordConsumer; import parquet.io.api.RecordMaterializer; import parquet.schema.MessageType; +import parquet.filter.UnboundRecordFilter; +import parquet.filter.RecordFilter; /** * Message level of the IO structure @@ -59,7 +61,20 @@ recordMaterializer, validating, new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType()) - ); + ); + } + public RecordReader getRecordReader(PageReadStore columns, RecordMaterializer recordMaterializer, + UnboundRecordFilter unboundFilter) { + + return (unboundFilter == null) + ? 
getRecordReader(columns, recordMaterializer) + : new FilteredRecordReader( + this, + recordMaterializer, + validating, + new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType()), + unboundFilter + ); } private class MessageColumnIORecordConsumer extends RecordConsumer { diff --git a/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java b/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java index 5a2d83eeb..4d66278de 100644 --- a/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java +++ b/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java @@ -222,25 +222,29 @@ public int getDepth(int definitionLevel) { public Case getCase(int currentLevel, int d, int nextR) { return caseLookup[currentLevel][d][nextR]; } + + public State getNextState(int nextR) { + return nextState[nextR]; + } } private final GroupConverter recordConsumer; private final RecordMaterializer recordMaterializer; private State[] states; + private ColumnReader[] columns; /** * * @param root the root of the schema - * @param leaves the leaves of the schema * @param validating - * @param columns2 + * @param columnStore */ public RecordReaderImplementation(MessageColumnIO root, RecordMaterializer recordMaterializer, boolean validating, ColumnReadStoreImpl columnStore) { this.recordMaterializer = recordMaterializer; this.recordConsumer = recordMaterializer.getRootConverter(); // TODO: validator(wrap(recordMaterializer), validating, root.getType()); PrimitiveColumnIO[] leaves = root.getLeaves().toArray(new PrimitiveColumnIO[root.getLeaves().size()]); - ColumnReader[] columns = new ColumnReader[leaves.length]; + columns = new ColumnReader[leaves.length]; int[][] nextReader = new int[leaves.length][]; int[][] nextLevel = new int[leaves.length][]; GroupConverter[][] groupConverterPaths = new GroupConverter[leaves.length][]; @@ -444,4 +448,8 @@ protected Converter getRecordConsumer() { return recordConsumer; } + 
protected Iterable getColumnReaders() { + // Converting the array to an iterable ensures that the array cannot be altered + return Arrays.asList(columns); + } } diff --git a/parquet-column/src/test/java/parquet/column/values/bitpacking/TestByteBasedBitPackingEncoder.java b/parquet-column/src/test/java/parquet/column/values/bitpacking/TestByteBasedBitPackingEncoder.java index 9ca37a80e..9fd0e723c 100644 --- a/parquet-column/src/test/java/parquet/column/values/bitpacking/TestByteBasedBitPackingEncoder.java +++ b/parquet-column/src/test/java/parquet/column/values/bitpacking/TestByteBasedBitPackingEncoder.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package parquet.column.values.bitpacking; import org.junit.Test; diff --git a/parquet-column/src/test/java/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java b/parquet-column/src/test/java/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java index 675fda634..2058dba41 100644 --- a/parquet-column/src/test/java/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java +++ b/parquet-column/src/test/java/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package parquet.column.values.rle; import java.io.ByteArrayInputStream; diff --git a/parquet-column/src/test/java/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java b/parquet-column/src/test/java/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java index 4e63dafd9..2f5e4b865 100644 --- a/parquet-column/src/test/java/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java +++ b/parquet-column/src/test/java/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package parquet.column.values.rle; import java.io.ByteArrayInputStream; diff --git a/parquet-column/src/test/java/parquet/io/TestFiltered.java b/parquet-column/src/test/java/parquet/io/TestFiltered.java new file mode 100644 index 000000000..f4d9adeb8 --- /dev/null +++ b/parquet-column/src/test/java/parquet/io/TestFiltered.java @@ -0,0 +1,148 @@ +/** + * Copyright 2012 Twitter, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package parquet.io; + +import org.junit.Test; +import parquet.column.impl.ColumnWriteStoreImpl; +import parquet.column.page.mem.MemPageStore; +import parquet.example.data.Group; +import parquet.example.data.GroupWriter; +import parquet.example.data.simple.convert.GroupRecordConverter; +import parquet.io.api.RecordMaterializer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static parquet.example.Paper.r1; +import static parquet.example.Paper.r2; +import static parquet.example.Paper.schema; +import static parquet.filter.AndRecordFilter.and; +import static parquet.filter.PagedRecordFilter.page; +import static parquet.filter.ColumnPredicates.equalTo; +import static parquet.filter.ColumnRecordFilter.column; + +public class TestFiltered { + + @Test + public void testFilterOnInteger() { + MemPageStore memPageStore = new MemPageStore(); + MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema); + writeTestRecords(memPageStore, columnIO, 1); + + // Get first record + RecordMaterializer recordConverter = new GroupRecordConverter(schema); + RecordReaderImplementation recordReader = (RecordReaderImplementation) + columnIO.getRecordReader(memPageStore, recordConverter, + column("DocId", equalTo(10l))); + + Group actual1= recordReader.read(); + assertNull( "There should be no more records as r2 filtered out", recordReader.read()); + 
assertEquals("filtering did not return the correct record", r1.toString(), actual1.toString()); + + // Get second record + recordReader = (RecordReaderImplementation) + columnIO.getRecordReader(memPageStore, recordConverter, + column("DocId", equalTo(20l))); + + Group actual2 = recordReader.read(); + assertNull( "There should be no more records as r1 filtered out", recordReader.read()); + assertEquals("filtering did not return the correct record", r2.toString(), actual2.toString()); + + } + + @Test + public void testFilterOnString() { + MemPageStore memPageStore = new MemPageStore(); + MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema); + writeTestRecords(memPageStore, columnIO, 1); + + // First try matching against the A url in record 1 + RecordMaterializer recordConverter = new GroupRecordConverter(schema); + RecordReaderImplementation recordReader = (RecordReaderImplementation) + columnIO.getRecordReader(memPageStore, recordConverter, + column("Name.Url", equalTo("http://A"))); + + Group actual1 = recordReader.read(); + assertNull( "There should be no more records as r2 filtered out", recordReader.read()); + assertEquals("filtering did not return the correct record", r1.toString(), actual1.toString()); + + // Second try matching against the B url in record 1 - it should fail as we only match + // against the first instance of a + recordReader = (RecordReaderImplementation) + columnIO.getRecordReader(memPageStore, recordConverter, + column("Name.Url", equalTo("http://B"))); + + assertNull( "There should be no matching records", recordReader.read()); + + // Finally try matching against the C url in record 2 + recordReader = (RecordReaderImplementation) + columnIO.getRecordReader(memPageStore, recordConverter, + column("Name.Url", equalTo("http://C"))); + + Group actual2 = recordReader.read(); + assertNull( "There should be no more records as r1 filtered out", recordReader.read()); + assertEquals("filtering did not return the correct record", 
r2.toString(), actual2.toString()); + } + + @Test + public void testPaged() { + MemPageStore memPageStore = new MemPageStore(); + MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema); + writeTestRecords(memPageStore, columnIO, 6); + + RecordMaterializer recordConverter = new GroupRecordConverter(schema); + RecordReaderImplementation recordReader = (RecordReaderImplementation) + columnIO.getRecordReader(memPageStore, recordConverter, + page(4, 4)); + + int count = 0; + while ( count < 2 ) { // starts at position 4 which should be r2 + assertEquals("expecting record2", r2.toString(), recordReader.read().toString()); + assertEquals("expecting record1", r1.toString(), recordReader.read().toString()); + count++; + } + assertNull("There should be no more records", recordReader.read()); + } + + @Test + public void testFilteredAndPaged() { + MemPageStore memPageStore = new MemPageStore(); + MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema); + writeTestRecords(memPageStore, columnIO, 8); + + RecordMaterializer recordConverter = new GroupRecordConverter(schema); + RecordReaderImplementation recordReader = (RecordReaderImplementation) + columnIO.getRecordReader(memPageStore, recordConverter, + and(column("DocId", equalTo(10l)), page(2, 4))); + + int count = 0; + while ( count < 4 ) { // filter matches only r1; page(2, 4) skips the first 2 matches and returns the next 4 + assertEquals("expecting 4 x record1", r1.toString(), recordReader.read().toString()); + count++; + } + assertNull( "There should be no more records", recordReader.read()); + } + + private void writeTestRecords(MemPageStore memPageStore, MessageColumnIO columnIO, int number) { + ColumnWriteStoreImpl columns = new ColumnWriteStoreImpl(memPageStore, 800, 800, false); + + GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema); + for ( int i = 0; i < number; i++ ) { + groupWriter.write(r1); + groupWriter.write(r2); + } + columns.flush(); + } +} diff --git 
a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java index e69dca2f1..d3e196a4e 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java @@ -26,6 +26,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import parquet.filter.RecordFilter; +import parquet.filter.UnboundRecordFilter; import parquet.hadoop.api.ReadSupport; import parquet.hadoop.api.ReadSupport.ReadContext; import parquet.hadoop.metadata.BlockMetaData; @@ -40,6 +42,10 @@ private ParquetRecordReader reader; public ParquetReader(Path file, ReadSupport readSupport) throws IOException { + this(file, readSupport, null); + } + + public ParquetReader(Path file, ReadSupport readSupport, UnboundRecordFilter filter) throws IOException { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); @@ -53,7 +59,7 @@ public ParquetReader(Path file, ReadSupport readSupport) throws IOException { MessageType schema = fileMetaData.getSchema(); Map extraMetadata = fileMetaData.getKeyValueMetaData(); final ReadContext readContext = readSupport.init(conf, extraMetadata, schema); - reader = new ParquetRecordReader(readSupport); + reader = new ParquetRecordReader(readSupport, filter); ParquetInputSplit inputSplit = new ParquetInputSplit( file, 0, 0, null, blocks, diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java index 9ddbea466..a8ccb0282 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java @@ -30,6 +30,8 @@ import parquet.Log; import parquet.column.ColumnDescriptor; import parquet.column.page.PageReadStore; +import parquet.filter.RecordFilter; +import parquet.filter.UnboundRecordFilter; import 
parquet.hadoop.api.ReadSupport; import parquet.hadoop.metadata.BlockMetaData; import parquet.hadoop.util.ContextUtil; @@ -69,6 +71,7 @@ private int currentBlock = -1; private ParquetFileReader reader; private parquet.io.RecordReader recordReader; + private UnboundRecordFilter recordFilter; private long totalTimeSpentReadingBytes; private long totalTimeSpentProcessingRecords; @@ -77,11 +80,19 @@ private long totalCountLoadedSoFar = 0; /** - * @param requestedSchema the requested schema (a subset of the original schema) for record projection - * @param readSupportClass + * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro. */ public ParquetRecordReader(ReadSupport readSupport) { + this(readSupport, null); + } + + /** + * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro. + * @param filter Optional filter for only returning matching records. + */ + public ParquetRecordReader(ReadSupport readSupport, UnboundRecordFilter filter) { this.readSupport = readSupport; + this.recordFilter = filter; } private void checkRead() throws IOException { @@ -107,7 +118,7 @@ private void checkRead() throws IOException { LOG.info("block read in memory in " + timeSpentReading + " ms. 
row count = " + pages.getRowCount()); if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema); MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema); - recordReader = columnIO.getRecordReader(pages, recordConverter); + recordReader = columnIO.getRecordReader(pages, recordConverter, recordFilter); startedAssemblingCurrentBlockAt = System.currentTimeMillis(); totalCountLoadedSoFar += pages.getRowCount(); ++ currentBlock; diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCodec.java b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCodec.java index abe1d67ef..70c4f115f 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCodec.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCodec.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package parquet.hadoop.codec; import java.io.IOException; diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCompressor.java b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCompressor.java index ac24303fd..61520d54f 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCompressor.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCompressor.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package parquet.hadoop.codec; import java.io.IOException; diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyDecompressor.java b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyDecompressor.java index d1f5e5360..f1c9e2ab2 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyDecompressor.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyDecompressor.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package parquet.hadoop.codec; import java.io.IOException; diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyUtil.java b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyUtil.java index 2b2bf1eb4..389dcce19 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyUtil.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyUtil.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package parquet.hadoop.codec; import parquet.Preconditions; diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestSnappyCodec.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestSnappyCodec.java index ac9532743..09c9d6831 100644 --- a/parquet-hadoop/src/test/java/parquet/hadoop/TestSnappyCodec.java +++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestSnappyCodec.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ package parquet.hadoop; import java.io.IOException;