diff --git a/parquet-avro/pom.xml b/parquet-avro/pom.xml
index 59675fcae..3a032139f 100644
--- a/parquet-avro/pom.xml
+++ b/parquet-avro/pom.xml
@@ -16,6 +16,7 @@
https://github.com/Parquet/parquet-mr
+ 1.7.4
@@ -37,7 +38,7 @@
org.apache.avroavro
- 1.7.3
+ ${avro.version}org.apache.hadoop
@@ -107,6 +108,44 @@
+
+ org.apache.avro
+ avro-maven-plugin
+ ${avro.version}
+
+
+ generate-test-sources
+
+ idl-protocol
+
+
+ ${project.basedir}/src/test/resources
+ ${project.build.directory}/generated-test-sources
+ String
+
+
+
+
+
+
+ org.codehaus.mojo
+ build-helper-maven-plugin
+ 1.8
+
+
+ add-test-sources
+ generate-test-sources
+
+ add-test-source
+
+
+
+ ${project.build.directory}/generated-test-sources
+
+
+
+
+
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroGenericRecordConverter.java b/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java
similarity index 74%
rename from parquet-avro/src/main/java/parquet/avro/AvroGenericRecordConverter.java
rename to parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java
index e88af8887..b2a280530 100644
--- a/parquet-avro/src/main/java/parquet/avro/AvroGenericRecordConverter.java
+++ b/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java
@@ -22,6 +22,9 @@
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificData;
+import parquet.Preconditions;
import parquet.io.api.Binary;
import parquet.io.api.Converter;
import parquet.io.api.GroupConverter;
@@ -30,26 +33,28 @@
import parquet.schema.MessageType;
import parquet.schema.Type;
-class AvroGenericRecordConverter extends GroupConverter {
+class AvroIndexedRecordConverter extends GroupConverter {
private final ParentValueContainer parent;
- protected GenericData.Record currentRecord;
+ protected IndexedRecord currentRecord;
private final Converter[] converters;
private final GroupType parquetSchema;
private final Schema avroSchema;
+  private final Class<? extends IndexedRecord> specificClass;
- public AvroGenericRecordConverter(MessageType parquetSchema, Schema avroSchema) {
+ public AvroIndexedRecordConverter(MessageType parquetSchema, Schema avroSchema) {
this(null, parquetSchema, avroSchema);
}
- public AvroGenericRecordConverter(ParentValueContainer parent, GroupType
+ public AvroIndexedRecordConverter(ParentValueContainer parent, GroupType
parquetSchema, Schema avroSchema) {
this.parent = parent;
this.parquetSchema = parquetSchema;
this.avroSchema = avroSchema;
int schemaSize = parquetSchema.getFieldCount();
this.converters = new Converter[schemaSize];
+ this.specificClass = SpecificData.get().getClass(avroSchema);
int index = 0; // parquet ignores Avro nulls, so index may differ
for (int avroIndex = 0; avroIndex < avroSchema.getFields().size(); avroIndex++) {
Schema.Field field = avroSchema.getFields().get(avroIndex);
@@ -62,7 +67,7 @@ public AvroGenericRecordConverter(ParentValueContainer parent, GroupType
converters[index] = newConverter(fieldSchema, type, new ParentValueContainer() {
@Override
void add(Object value) {
- AvroGenericRecordConverter.this.set(finalAvroIndex, value);
+ AvroIndexedRecordConverter.this.set(finalAvroIndex, value);
}
});
index++;
@@ -86,13 +91,15 @@ private static Converter newConverter(Schema schema, Type type,
} else if (schema.getType().equals(Schema.Type.STRING)) {
return new FieldStringConverter(parent);
} else if (schema.getType().equals(Schema.Type.RECORD)) {
- return new AvroGenericRecordConverter(parent, type.asGroupType(), schema);
+ return new AvroIndexedRecordConverter(parent, type.asGroupType(), schema);
} else if (schema.getType().equals(Schema.Type.ENUM)) {
- return new FieldStringConverter(parent);
+ return new FieldEnumConverter(parent,schema);
} else if (schema.getType().equals(Schema.Type.ARRAY)) {
- return new GenericArrayConverter(parent, type, schema);
+ return new AvroArrayConverter(parent, type, schema);
} else if (schema.getType().equals(Schema.Type.MAP)) {
return new MapConverter(parent, type, schema);
+ } else if (schema.getType().equals(Schema.Type.UNION)) {
+ return new AvroUnionConverter(parent, type, schema);
} else if (schema.getType().equals(Schema.Type.FIXED)) {
return new FieldFixedConverter(parent, schema);
}
@@ -111,7 +118,10 @@ public Converter getConverter(int fieldIndex) {
@Override
public void start() {
- this.currentRecord = new GenericData.Record(avroSchema);
+ // Should do the right thing whether it is generic or specific
+ this.currentRecord = (this.specificClass == null) ?
+ new GenericData.Record(avroSchema) :
+ (IndexedRecord)SpecificData.newInstance(specificClass, avroSchema);
}
@Override
@@ -121,7 +131,7 @@ public void end() {
}
}
- GenericRecord getCurrentRecord() {
+ IndexedRecord getCurrentRecord() {
return currentRecord;
}
@@ -239,6 +249,26 @@ final public void addBinary(Binary value) {
}
+ static final class FieldEnumConverter extends PrimitiveConverter {
+
+ private final ParentValueContainer parent;
+    private final Class<? extends Enum> enumClass;
+
+ public FieldEnumConverter(ParentValueContainer parent, Schema enumSchema) {
+ this.parent = parent;
+ this.enumClass = SpecificData.get().getClass(enumSchema);
+ }
+
+ @Override
+ final public void addBinary(Binary value) {
+ Object enumValue = value.toStringUsingUTF8();
+ if (enumClass != null) {
+ enumValue = (Enum.valueOf(enumClass,(String)enumValue));
+ }
+ parent.add(enumValue);
+ }
+ }
+
static final class FieldFixedConverter extends PrimitiveConverter {
private final ParentValueContainer parent;
@@ -256,14 +286,14 @@ final public void addBinary(Binary value) {
}
- static final class GenericArrayConverter extends GroupConverter {
+ static final class AvroArrayConverter extends GroupConverter {
private final ParentValueContainer parent;
private final Schema avroSchema;
private final Converter converter;
private GenericArray array;
- public GenericArrayConverter(ParentValueContainer parent, Type parquetSchema,
+ public AvroArrayConverter(ParentValueContainer parent, Type parquetSchema,
Schema avroSchema) {
this.parent = parent;
this.avroSchema = avroSchema;
@@ -294,6 +324,51 @@ public void end() {
}
}
+ static final class AvroUnionConverter extends GroupConverter {
+
+ private final ParentValueContainer parent;
+ private final Converter[] memberConverters;
+ private Object memberValue = null;
+
+ public AvroUnionConverter(ParentValueContainer parent, Type parquetSchema,
+ Schema avroSchema) {
+ this.parent = parent;
+ GroupType parquetGroup = parquetSchema.asGroupType();
+ this.memberConverters = new Converter[ parquetGroup.getFieldCount()];
+
+ int parquetIndex = 0;
+ for (int index = 0; index < avroSchema.getTypes().size(); index++) {
+ Schema memberSchema = avroSchema.getTypes().get(index);
+ if (!memberSchema.getType().equals(Schema.Type.NULL)) {
+ Type memberType = parquetGroup.getType(parquetIndex);
+ memberConverters[parquetIndex] = newConverter(memberSchema, memberType, new ParentValueContainer() {
+ @Override
+ void add(Object value) {
+ Preconditions.checkArgument(memberValue==null, "Union is resolving to more than one type");
+ memberValue = value;
+ }
+ });
+          parquetIndex++; // Note: for nulls the parquetIndex is not increased
+ }
+ }
+ }
+
+ @Override
+ public Converter getConverter(int fieldIndex) {
+ return memberConverters[fieldIndex];
+ }
+
+ @Override
+ public void start() {
+ memberValue = null;
+ }
+
+ @Override
+ public void end() {
+ parent.add(memberValue);
+ }
+ }
+
static final class MapConverter extends GroupConverter {
private final ParentValueContainer parent;
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroParquetInputFormat.java b/parquet-avro/src/main/java/parquet/avro/AvroParquetInputFormat.java
index 01a3ae67c..8c95121d4 100644
--- a/parquet-avro/src/main/java/parquet/avro/AvroParquetInputFormat.java
+++ b/parquet-avro/src/main/java/parquet/avro/AvroParquetInputFormat.java
@@ -15,14 +15,14 @@
*/
package parquet.avro;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
import parquet.avro.AvroReadSupport;
import parquet.hadoop.ParquetInputFormat;
/**
* A Hadoop {@link org.apache.hadoop.mapreduce.InputFormat} for Parquet files.
*/
-public class AvroParquetInputFormat extends ParquetInputFormat<GenericRecord> {
+public class AvroParquetInputFormat extends ParquetInputFormat<IndexedRecord> {
public AvroParquetInputFormat() {
super(AvroReadSupport.class);
}
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroParquetOutputFormat.java b/parquet-avro/src/main/java/parquet/avro/AvroParquetOutputFormat.java
index 095d56330..67f7ca10a 100644
--- a/parquet-avro/src/main/java/parquet/avro/AvroParquetOutputFormat.java
+++ b/parquet-avro/src/main/java/parquet/avro/AvroParquetOutputFormat.java
@@ -16,7 +16,7 @@
package parquet.avro;
import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.mapreduce.Job;
import parquet.avro.AvroWriteSupport;
import parquet.hadoop.ParquetOutputFormat;
@@ -25,7 +25,7 @@
/**
* A Hadoop {@link org.apache.hadoop.mapreduce.OutputFormat} for Parquet files.
*/
-public class AvroParquetOutputFormat extends ParquetOutputFormat<GenericRecord> {
+public class AvroParquetOutputFormat extends ParquetOutputFormat<IndexedRecord> {
public static void setSchema(Job job, Schema schema) {
AvroWriteSupport.setSchema(ContextUtil.getConfiguration(job), schema);
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroParquetReader.java b/parquet-avro/src/main/java/parquet/avro/AvroParquetReader.java
index 849423e0b..211895cd2 100644
--- a/parquet-avro/src/main/java/parquet/avro/AvroParquetReader.java
+++ b/parquet-avro/src/main/java/parquet/avro/AvroParquetReader.java
@@ -16,16 +16,23 @@
package parquet.avro;
import java.io.IOException;
+
+import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
+import parquet.filter.UnboundRecordFilter;
import parquet.hadoop.ParquetReader;
import parquet.hadoop.api.ReadSupport;
/**
* Read Avro records from a Parquet file.
*/
-public class AvroParquetReader extends ParquetReader<GenericRecord> {
+public class AvroParquetReader extends ParquetReader<IndexedRecord> {
public AvroParquetReader(Path file) throws IOException {
super(file, (ReadSupport) new AvroReadSupport());
}
+
+ public AvroParquetReader(Path file, UnboundRecordFilter recordFilter ) throws IOException {
+ super(file, (ReadSupport) new AvroReadSupport(), recordFilter);
+ }
}
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroParquetWriter.java b/parquet-avro/src/main/java/parquet/avro/AvroParquetWriter.java
index 23c7a2498..27d359cb6 100644
--- a/parquet-avro/src/main/java/parquet/avro/AvroParquetWriter.java
+++ b/parquet-avro/src/main/java/parquet/avro/AvroParquetWriter.java
@@ -17,6 +17,7 @@
import java.io.IOException;
import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import parquet.hadoop.ParquetWriter;
import parquet.hadoop.api.WriteSupport;
@@ -25,7 +26,7 @@
/**
* Write Avro records to a Parquet file.
*/
-public class AvroParquetWriter extends ParquetWriter<GenericRecord> {
+public class AvroParquetWriter extends ParquetWriter<IndexedRecord> {
/** Create a new {@link AvroParquetWriter}.
*
@@ -39,21 +40,38 @@
public AvroParquetWriter(Path file, Schema avroSchema,
CompressionCodecName compressionCodecName, int blockSize,
int pageSize) throws IOException {
+ super(file, (WriteSupport)new AvroWriteSupport(new AvroSchemaConverter().convert(avroSchema), avroSchema),
+ compressionCodecName, blockSize, pageSize);
+ }
+
+ /** Create a new {@link AvroParquetWriter}.
+ *
+ * @param file The file name to write to.
+ * @param avroSchema The schema to write with.
+ * @param compressionCodecName Compression code to use, or CompressionCodecName.UNCOMPRESSED
+ * @param blockSize HDFS block size
+ * @param pageSize See parquet write up. Blocks are subdivided into pages for alignment and other purposes.
+ * @param enableDictionary Whether to use a dictionary to compress columns.
+ * @throws IOException
+ */
+ public AvroParquetWriter(Path file, Schema avroSchema,
+ CompressionCodecName compressionCodecName, int blockSize,
+ int pageSize, boolean enableDictionary) throws IOException {
super(file, (WriteSupport)
- new AvroWriteSupport(new AvroSchemaConverter().convert(avroSchema), avroSchema),
- compressionCodecName, blockSize, pageSize);
+ new AvroWriteSupport(new AvroSchemaConverter().convert(avroSchema),avroSchema),
+ compressionCodecName, blockSize, pageSize, enableDictionary, false);
}
/** Create a new {@link AvroParquetWriter}. The default block size is 50 MB.The default
* page size is 1 MB. Default compression is no compression. (Inherited from {@link ParquetWriter})
*
- * @param file
- * @param avroSchema
+ * @param file The file name to write to.
+ * @param avroSchema The schema to write with.
* @throws IOException
*/
public AvroParquetWriter(Path file, Schema avroSchema) throws IOException {
this(file, avroSchema, CompressionCodecName.UNCOMPRESSED,
- DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
+ DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
}
}
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java b/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java
index dc1f088df..1ea892996 100644
--- a/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java
+++ b/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java
@@ -17,18 +17,18 @@
import java.util.Map;
import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import parquet.hadoop.api.ReadSupport;
import parquet.io.api.RecordMaterializer;
import parquet.schema.MessageType;
/**
- * Avro implementation of {@link ReadSupport} for {@link GenericRecord}s. Users should
- * use {@link AvroParquetReader} or {@link AvroParquetInputFormat} rather than using
+ * Avro implementation of {@link ReadSupport} for Avro {@link IndexedRecord}s which cover both Avro Specific and
+ * Generic. Users should use {@link AvroParquetReader} or {@link AvroParquetInputFormat} rather than using
* this class directly.
*/
-public class AvroReadSupport extends ReadSupport<GenericRecord> {
+public class AvroReadSupport extends ReadSupport<IndexedRecord> {
@Override
public ReadContext init(Configuration configuration, Map keyValueMetaData, MessageType fileSchema) {
@@ -36,7 +36,7 @@ public ReadContext init(Configuration configuration, Map keyValu
}
@Override
-  public RecordMaterializer<GenericRecord> prepareForRead(Configuration configuration, Map<String, String> keyValueMetaData, MessageType fileSchema, ReadContext readContext) {
+  public RecordMaterializer<IndexedRecord> prepareForRead(Configuration configuration, Map<String, String> keyValueMetaData, MessageType fileSchema, ReadContext readContext) {
Schema avroSchema = new Schema.Parser().parse(keyValueMetaData.get("avro.schema"));
return new AvroRecordMaterializer(readContext.getRequestedSchema(), avroSchema);
}
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroRecordMaterializer.java b/parquet-avro/src/main/java/parquet/avro/AvroRecordMaterializer.java
index 3a3d1f624..e9df29f25 100644
--- a/parquet-avro/src/main/java/parquet/avro/AvroRecordMaterializer.java
+++ b/parquet-avro/src/main/java/parquet/avro/AvroRecordMaterializer.java
@@ -16,21 +16,21 @@
package parquet.avro;
import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
import parquet.io.api.GroupConverter;
import parquet.io.api.RecordMaterializer;
import parquet.schema.MessageType;
-class AvroRecordMaterializer extends RecordMaterializer<GenericRecord> {
+class AvroRecordMaterializer extends RecordMaterializer<IndexedRecord> {
- private AvroGenericRecordConverter root;
+ private AvroIndexedRecordConverter root;
public AvroRecordMaterializer(MessageType requestedSchema, Schema avroSchema) {
- this.root = new AvroGenericRecordConverter(requestedSchema, avroSchema);
+ this.root = new AvroIndexedRecordConverter(requestedSchema, avroSchema);
}
@Override
- public GenericRecord getCurrentRecord() {
+ public IndexedRecord getCurrentRecord() {
return root.getCurrentRecord();
}
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/parquet/avro/AvroSchemaConverter.java
index a7c28d521..9a5524a0d 100644
--- a/parquet-avro/src/main/java/parquet/avro/AvroSchemaConverter.java
+++ b/parquet-avro/src/main/java/parquet/avro/AvroSchemaConverter.java
@@ -15,8 +15,8 @@
*/
package parquet.avro;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
+
import org.apache.avro.Schema;
import parquet.schema.GroupType;
import parquet.schema.MessageType;
@@ -114,19 +114,39 @@ private Type convertField(String fieldName, Schema schema, Type.Repetition repet
} else if (type.equals(Schema.Type.FIXED)) {
return primitive(fieldName, FIXED_LEN_BYTE_ARRAY, repetition);
} else if (type.equals(Schema.Type.UNION)) {
- List schemas = schema.getTypes();
- if (schemas.size() == 2) {
- if (schemas.get(0).getType().equals(Schema.Type.NULL)) {
- return convertField(fieldName, schemas.get(1), Type.Repetition.OPTIONAL);
- } else if (schemas.get(1).getType().equals(Schema.Type.NULL)) {
- return convertField(fieldName, schemas.get(0), Type.Repetition.OPTIONAL);
- }
- }
- throw new UnsupportedOperationException("Cannot convert Avro type " + type);
+ return convertUnion(fieldName, schema, repetition);
}
throw new UnsupportedOperationException("Cannot convert Avro type " + type);
}
+ private Type convertUnion(String fieldName, Schema schema, Type.Repetition repetition) {
+    List<Schema> nonNullSchemas = new ArrayList<Schema>(schema.getTypes().size());
+ for (Schema childSchema : schema.getTypes()) {
+ if (childSchema.getType().equals(Schema.Type.NULL)) {
+ repetition = Type.Repetition.OPTIONAL;
+ } else {
+ nonNullSchemas.add(childSchema);
+ }
+ }
+    // If we only get a null and one other type then it's a simple optional field;
+ // otherwise construct a union container
+ switch (nonNullSchemas.size()) {
+ case 0:
+ throw new UnsupportedOperationException("Cannot convert Avro union of only nulls");
+
+ case 1:
+ return convertField(fieldName, nonNullSchemas.get(0), Type.Repetition.OPTIONAL); // Simple optional field
+
+ default: // complex union type
+        List<Type> unionTypes = new ArrayList<Type>(nonNullSchemas.size());
+ int index = 0;
+ for (Schema childSchema : nonNullSchemas) {
+ unionTypes.add( convertField("member" + index++, childSchema, Type.Repetition.OPTIONAL));
+ }
+ return new GroupType(repetition, fieldName, unionTypes);
+ }
+ }
+
private Type convertField(Schema.Field field) {
return convertField(field.name(), field.schema());
}
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java
index 661ba7a9f..fc5ec70b7 100644
--- a/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java
+++ b/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java
@@ -21,8 +21,11 @@
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import parquet.hadoop.api.WriteSupport;
@@ -33,11 +36,11 @@
import parquet.schema.Type;
/**
- * Avro implementation of {@link WriteSupport} for {@link GenericRecord}s. Users should
- * use {@link AvroParquetWriter} or {@link AvroParquetOutputFormat} rather than using
+ * Avro implementation of {@link WriteSupport} for {@link IndexedRecord}s - both Avro Generic and Specific.
+ * Users should use {@link AvroParquetWriter} or {@link AvroParquetOutputFormat} rather than using
* this class directly.
*/
-public class AvroWriteSupport extends WriteSupport<GenericRecord> {
+public class AvroWriteSupport extends WriteSupport<IndexedRecord> {
private RecordConsumer recordConsumer;
private MessageType rootSchema;
@@ -72,21 +75,21 @@ public void prepareForWrite(RecordConsumer recordConsumer) {
}
@Override
- public void write(GenericRecord record) {
+ public void write(IndexedRecord record) {
recordConsumer.startMessage();
writeRecordFields(rootSchema, rootAvroSchema, record);
recordConsumer.endMessage();
}
private void writeRecord(GroupType schema, Schema avroSchema,
- GenericRecord record) {
+ IndexedRecord record) {
recordConsumer.startGroup();
writeRecordFields(schema, avroSchema, record);
recordConsumer.endGroup();
}
private void writeRecordFields(GroupType schema, Schema avroSchema,
- GenericRecord record) {
+ IndexedRecord record) {
List fields = schema.getFields();
List avroFields = avroSchema.getFields();
int index = 0; // parquet ignores Avro nulls, so index may differ
@@ -109,7 +112,7 @@ private void writeRecordFields(GroupType schema, Schema avroSchema,
}
private void writeArray(GroupType schema, Schema avroSchema,
- GenericArray array) {
+      Iterable<T> array) {
recordConsumer.startGroup(); // group wrapper (original type LIST)
recordConsumer.startField("array", 0);
for (T elt : array) {
@@ -143,6 +146,31 @@ private void writeRecordFields(GroupType schema, Schema avroSchema,
recordConsumer.endGroup();
}
+ private void writeUnion(GroupType parquetSchema, Schema avroSchema, Object value) {
+
+ recordConsumer.startGroup();
+
+ // ResolveUnion will tell us which of the union member types to deserialise
+ int avroIndex = GenericData.get().resolveUnion(avroSchema, value);
+
+ // For parquet's schema we skip nulls
+ GroupType parquetGroup = parquetSchema.asGroupType();
+ int parquetIndex = avroIndex;
+ for ( int i=0; i) value);
+      writeArray((GroupType) type, nonNullAvroSchema, (Iterable<?>) value);
} else if (avroType.equals(Schema.Type.MAP)) {
writeMap((GroupType) type, nonNullAvroSchema, (Map) value);
+ } else if (avroType.equals(Schema.Type.UNION)) {
+ writeUnion((GroupType) type, nonNullAvroSchema, value);
} else if (avroType.equals(Schema.Type.FIXED)) {
recordConsumer.addBinary(Binary.fromByteArray(((GenericFixed) value).bytes()));
}
diff --git a/parquet-avro/src/test/java/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/parquet/avro/TestAvroSchemaConverter.java
index 5c87ab7e8..2b5ce9abb 100644
--- a/parquet-avro/src/test/java/parquet/avro/TestAvroSchemaConverter.java
+++ b/parquet-avro/src/test/java/parquet/avro/TestAvroSchemaConverter.java
@@ -89,14 +89,25 @@ public void testOptionalFields() throws Exception {
"}\n");
}
- @Test(expected = UnsupportedOperationException.class)
- public void testNonNullUnion() {
- Schema union = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.INT),
- Schema.create(Schema.Type.LONG)));
- Schema schema = Schema.createRecord("record1", null, null, false);
+ @Test
+ public void testUnionOfTwoTypes() throws Exception {
+ Schema schema = Schema.createRecord("record2", null, null, false);
+ Schema multipleTypes = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type
+ .NULL),
+ Schema.create(Schema.Type.INT),
+ Schema.create(Schema.Type.FLOAT)));
schema.setFields(Arrays.asList(
- new Schema.Field("myunion", union, null, null)));
+ new Schema.Field("myunion", multipleTypes, null, NullNode.getInstance())
+ ));
- new AvroSchemaConverter().convert(schema);
+    // Avro union is modelled using optional data members of the different types;
+ testConversion(
+ schema,
+ "message record2 {\n" +
+ " optional group myunion {\n" +
+ " optional int32 member0;\n" +
+ " optional float member1;\n" +
+ " }\n" +
+ "}\n");
}
}
diff --git a/parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java b/parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java
new file mode 100644
index 000000000..aaa5d9b17
--- /dev/null
+++ b/parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java
@@ -0,0 +1,180 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.avro;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Test;
+import parquet.hadoop.ParquetReader;
+import parquet.hadoop.ParquetWriter;
+import parquet.hadoop.metadata.CompressionCodecName;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static parquet.filter.ColumnRecordFilter.column;
+import static parquet.filter.ColumnPredicates.equalTo;
+import static parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
+
+/**
+ * Other tests exercise the use of Avro Generic, a dynamic data representation. This class focuses
+ * on Avro Specific, whose schemas are pre-compiled to POJOs with built-in SerDe for faster serialization.
+ */
+public class TestSpecificReadWrite {
+
+ @Test
+ public void testReadWriteSpecific() throws IOException {
+ Path path = writeCarsToParquetFile(10, CompressionCodecName.UNCOMPRESSED, false);
+ ParquetReader reader = new AvroParquetReader(path);
+ for (int i = 0; i < 10; i++) {
+ assertEquals(getVwPolo().toString(),reader.read().toString());
+ assertEquals(getVwPassat().toString(),reader.read().toString());
+ assertEquals(getBmwMini().toString(),reader.read().toString());
+ }
+ assertNull(reader.read());
+ }
+
+ @Test
+ public void testReadWriteSpecificWithDictionary() throws IOException {
+ Path path = writeCarsToParquetFile(10, CompressionCodecName.UNCOMPRESSED, true);
+ ParquetReader reader = new AvroParquetReader(path);
+ for (int i = 0; i < 10; i++) {
+ assertEquals(getVwPolo().toString(),reader.read().toString());
+ assertEquals(getVwPassat().toString(),reader.read().toString());
+ assertEquals(getBmwMini().toString(),reader.read().toString());
+ }
+ assertNull(reader.read());
+ }
+
+ @Test
+ public void testFilterMatchesMultiple() throws IOException {
+
+ Path path = writeCarsToParquetFile(10, CompressionCodecName.UNCOMPRESSED, false);
+
+ ParquetReader reader = new AvroParquetReader(path, column("make", equalTo("Volkswagen")));
+ for (int i = 0; i < 10; i++) {
+ assertEquals(getVwPolo().toString(),reader.read().toString());
+ assertEquals(getVwPassat().toString(),reader.read().toString());
+ }
+ assertNull( reader.read());
+ }
+
+ @Test
+ public void testFilterWithDictionary() throws IOException {
+
+ Path path = writeCarsToParquetFile(1,CompressionCodecName.UNCOMPRESSED,true);
+
+ ParquetReader reader = new AvroParquetReader(path, column("make", equalTo("Volkswagen")));
+ assertEquals(getVwPolo().toString(),reader.read().toString());
+ assertEquals(getVwPassat().toString(),reader.read().toString());
+ assertNull( reader.read());
+ }
+
+ @Test
+ public void testFilterOnSubAttribute() throws IOException {
+
+ Path path = writeCarsToParquetFile(1, CompressionCodecName.UNCOMPRESSED, false);
+
+ ParquetReader reader = new AvroParquetReader(path, column("engine.type", equalTo(EngineType.DIESEL)));
+ assertEquals(reader.read().toString(),getVwPassat().toString());
+ assertNull( reader.read());
+
+ reader = new AvroParquetReader(path, column("engine.capacity", equalTo(1.4f)));
+ assertEquals(getVwPolo().toString(),reader.read().toString());
+ assertNull( reader.read());
+
+
+ reader = new AvroParquetReader(path, column("engine.hasTurboCharger", equalTo(true)));
+ assertEquals(getBmwMini().toString(),reader.read().toString());
+ assertNull( reader.read());
+ }
+
+ private Path writeCarsToParquetFile( int num, CompressionCodecName compression, boolean enableDictionary) throws IOException {
+ File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+ tmp.deleteOnExit();
+ tmp.delete();
+ Path path = new Path(tmp.getPath());
+
+ Car vwPolo = getVwPolo();
+ Car vwPassat = getVwPassat();
+ Car bmwMini = getBmwMini();
+
+ ParquetWriter writer = new AvroParquetWriter(path,Car.SCHEMA$, compression,
+ DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE, enableDictionary);
+ for (int i = 0; i < num; i++) {
+ writer.write(vwPolo);
+ writer.write(vwPassat);
+ writer.write(bmwMini);
+ }
+ writer.close();
+ return path;
+ }
+
+ public static Car getVwPolo() {
+ return Car.newBuilder()
+ .setYear(2010)
+ .setRegistration("A123 GTR")
+ .setMake("Volkswagen")
+ .setModel("Polo")
+ .setDoors(4)
+ .setEngine(Engine.newBuilder().setType(EngineType.PETROL)
+ .setCapacity(1.4f).setHasTurboCharger(false).build())
+ .setOptionalExtra(
+ Stereo.newBuilder().setMake("Blaupunkt").setSpeakers(4).build())
+ .setServiceHistory(ImmutableList.of(
+ Service.newBuilder().setDate(1325376000l).setMechanic("Jim").build(),
+ Service.newBuilder().setDate(1356998400l).setMechanic("Mike").build()
+ ))
+ .build();
+ }
+
+ public static Car getVwPassat() {
+ return Car.newBuilder()
+ .setYear(2010)
+ .setRegistration("A123 GXR")
+ .setMake("Volkswagen")
+ .setModel("Passat")
+ .setDoors(5)
+ .setEngine(Engine.newBuilder().setType(EngineType.DIESEL)
+ .setCapacity(2.0f).setHasTurboCharger(false).build())
+ .setOptionalExtra(
+ LeatherTrim.newBuilder().setColour("Black").build())
+ .setServiceHistory(ImmutableList.of(
+ Service.newBuilder().setDate(1325376000l).setMechanic("Jim").build()
+ ))
+ .build();
+ }
+
+ public static Car getBmwMini() {
+ return Car.newBuilder()
+ .setYear(2010)
+ .setRegistration("A124 GSR")
+ .setMake("BMW")
+ .setModel("Mini")
+ .setDoors(4)
+ .setEngine(Engine.newBuilder().setType(EngineType.PETROL)
+ .setCapacity(1.6f).setHasTurboCharger(true).build())
+ .setOptionalExtra(null)
+ .setServiceHistory(ImmutableList.of(
+ Service.newBuilder().setDate(1356998400l).setMechanic("Mike").build()
+ ))
+ .build();
+ }
+}
diff --git a/parquet-avro/src/test/resources/car.avdl b/parquet-avro/src/test/resources/car.avdl
new file mode 100644
index 000000000..e330aaec4
--- /dev/null
+++ b/parquet-avro/src/test/resources/car.avdl
@@ -0,0 +1,38 @@
+@namespace("parquet.avro")
+protocol Cars {
+
+ record Service {
+ long date;
+ string mechanic;
+ }
+
+ record Stereo {
+ string make;
+ int speakers;
+ }
+
+ record LeatherTrim {
+ string colour;
+ }
+
+ enum EngineType {
+ DIESEL, PETROL
+ }
+
+ record Engine {
+ EngineType type;
+ float capacity;
+ boolean hasTurboCharger;
+ }
+
+ record Car {
+ long year;
+ string registration;
+ string make;
+ string model;
+ int doors;
+ Engine engine;
+ union { null, Stereo, LeatherTrim } optionalExtra = null;
+ array serviceHistory;
+ }
+}
\ No newline at end of file
diff --git a/parquet-column/src/main/java/parquet/Ints.java b/parquet-column/src/main/java/parquet/Ints.java
index 667540b1f..1e7f51aed 100644
--- a/parquet-column/src/main/java/parquet/Ints.java
+++ b/parquet-column/src/main/java/parquet/Ints.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package parquet;
/**
diff --git a/parquet-column/src/main/java/parquet/column/ColumnReader.java b/parquet-column/src/main/java/parquet/column/ColumnReader.java
index 064e62856..149250f8f 100644
--- a/parquet-column/src/main/java/parquet/column/ColumnReader.java
+++ b/parquet-column/src/main/java/parquet/column/ColumnReader.java
@@ -98,4 +98,14 @@
* @return the current value
*/
double getDouble();
+
+ /**
+ * @return Descriptor of the column.
+ */
+ ColumnDescriptor getDescriptor();
+
+ /**
+ * Skip the current value
+ */
+ void skip();
}
diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java b/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java
index 49e44e3b9..f7371d56b 100644
--- a/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java
+++ b/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java
@@ -47,6 +47,7 @@
private static abstract class Binding {
abstract void read();
+ abstract void skip();
abstract void writeValue();
public int getDictionaryId() {
throw new UnsupportedOperationException();
@@ -99,6 +100,10 @@ private void bindToDictionary(final Dictionary dictionary) {
void read() {
dictionaryId = dataColumn.readValueDictionaryId();
}
+ public void skip() {
+      // Type is not important here as it's just a key
+ dataColumn.skipBytes();
+ }
public int getDictionaryId() {
return dictionaryId;
}
@@ -135,6 +140,10 @@ public Binding convertFLOAT(PrimitiveTypeName primitiveTypeName) {
void read() {
current = dataColumn.readFloat();
}
+ public void skip() {
+ current = 0;
+ dataColumn.skipFloat();
+ }
public float getFloat() {
return current;
}
@@ -150,6 +159,10 @@ public Binding convertDOUBLE(PrimitiveTypeName primitiveTypeName) {
void read() {
current = dataColumn.readDouble();
}
+ public void skip() {
+ current = 0;
+ dataColumn.skipDouble();
+ }
public double getDouble() {
return current;
}
@@ -165,6 +178,10 @@ public Binding convertINT32(PrimitiveTypeName primitiveTypeName) {
void read() {
current = dataColumn.readInteger();
}
+ public void skip() {
+ current = 0;
+ dataColumn.skipInteger();
+ }
@Override
public int getInteger() {
return current;
@@ -181,6 +198,10 @@ public Binding convertINT64(PrimitiveTypeName primitiveTypeName) {
void read() {
current = dataColumn.readLong();
}
+ public void skip() {
+ current = 0;
+ dataColumn.skipLong();
+ }
@Override
public long getLong() {
return current;
@@ -206,6 +227,10 @@ public Binding convertBOOLEAN(PrimitiveTypeName primitiveTypeName) {
void read() {
current = dataColumn.readBoolean();
}
+ public void skip() {
+ current = false;
+ dataColumn.skipBoolean();
+ }
@Override
public boolean getBoolean() {
return current;
@@ -222,6 +247,10 @@ public Binding convertBINARY(PrimitiveTypeName primitiveTypeName) {
void read() {
current = dataColumn.readBytes();
}
+ public void skip() {
+ current = null;
+ dataColumn.skipBytes();
+ }
@Override
public Binary getBinary() {
return current;
@@ -268,7 +297,7 @@ public ColumnReaderImpl(ColumnDescriptor path, PageReader pageReader, PrimitiveC
*/
@Override
public boolean isFullyConsumed() {
- return readValues >= totalValueCount;
+ return ((readValues == totalValueCount) && consumed) || (readValues > totalValueCount);
}
/**
@@ -277,13 +306,13 @@ public boolean isFullyConsumed() {
*/
@Override
public void writeCurrentValueToConverter() {
- checkValueRead();
+ readIfPossible(false);
this.binding.writeValue();
}
@Override
public int getCurrentValueDictionaryID() {
- checkValueRead();
+ readIfPossible(false);
return binding.getDictionaryId();
}
@@ -293,7 +322,7 @@ public int getCurrentValueDictionaryID() {
*/
@Override
public int getInteger() {
- checkValueRead();
+ readIfPossible(false);
return this.binding.getInteger();
}
@@ -303,7 +332,7 @@ public int getInteger() {
*/
@Override
public boolean getBoolean() {
- checkValueRead();
+ readIfPossible(false);
return this.binding.getBoolean();
}
@@ -313,7 +342,7 @@ public boolean getBoolean() {
*/
@Override
public long getLong() {
- checkValueRead();
+ readIfPossible(false);
return this.binding.getLong();
}
@@ -323,7 +352,7 @@ public long getLong() {
*/
@Override
public Binary getBinary() {
- checkValueRead();
+ readIfPossible(false);
return this.binding.getBinary();
}
@@ -333,7 +362,7 @@ public Binary getBinary() {
*/
@Override
public float getFloat() {
- checkValueRead();
+ readIfPossible(false);
return this.binding.getFloat();
}
@@ -343,7 +372,7 @@ public float getFloat() {
*/
@Override
public double getDouble() {
- checkValueRead();
+ readIfPossible(false);
return this.binding.getDouble();
}
@@ -358,17 +387,34 @@ public int getCurrentRepetitionLevel() {
}
/**
+ * {@inheritDoc}
+ * @see parquet.column.ColumnReader#getDescriptor()
+ */
+ @Override
+ public ColumnDescriptor getDescriptor() {
+ return path;
+ }
+
+ /**
* reads the current value
*/
public void readCurrentValue() {
binding.read();
}
- protected void checkValueRead() {
+ /**
+ * Reads the value into the binding or skips forwards.
+ * @param skip If true do not deserialize just skip forwards
+ */
+ protected void readIfPossible(boolean skip) {
try {
checkRead();
if (!consumed && !valueRead) {
- readCurrentValue();
+ if ( skip ) {
+ binding.skip();
+ } else {
+ readCurrentValue();
+ }
valueRead = true;
}
} catch (RuntimeException e) {
@@ -382,6 +428,15 @@ protected void checkValueRead() {
/**
* {@inheritDoc}
+ * @see parquet.column.ColumnReader#skip()
+ */
+ @Override
+ public void skip() {
+ readIfPossible(true);
+ }
+
+ /**
+ * {@inheritDoc}
* @see parquet.column.ColumnReader#getCurrentDefinitionLevel()
*/
@Override
diff --git a/parquet-column/src/main/java/parquet/column/values/ValuesReader.java b/parquet-column/src/main/java/parquet/column/values/ValuesReader.java
index d5f96c131..5086e60ac 100644
--- a/parquet-column/src/main/java/parquet/column/values/ValuesReader.java
+++ b/parquet-column/src/main/java/parquet/column/values/ValuesReader.java
@@ -72,6 +72,13 @@ public boolean readBoolean() {
}
/**
+ * Skips the next boolean in the page
+ */
+ public void skipBoolean() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
* @return the next Binary from the page
*/
public Binary readBytes() {
@@ -79,6 +86,13 @@ public Binary readBytes() {
}
/**
+ * Skips the next Binary in the page
+ */
+ public void skipBytes() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
* @return the next float from the page
*/
public float readFloat() {
@@ -86,6 +100,13 @@ public float readFloat() {
}
/**
+ * Skips the next float in the page
+ */
+ public void skipFloat() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
* @return the next double from the page
*/
public double readDouble() {
@@ -93,6 +114,13 @@ public double readDouble() {
}
/**
+   * Skips the next double in the page
+ */
+ public void skipDouble() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
* @return the next integer from the page
*/
public int readInteger() {
@@ -100,10 +128,23 @@ public int readInteger() {
}
/**
+ * Skips the next integer in the page
+ */
+ public void skipInteger() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
* @return the next long from the page
*/
public long readLong() {
throw new UnsupportedOperationException();
}
+ /**
+ * Skips the next long in the page
+ */
+ public void skipLong() {
+ throw new UnsupportedOperationException();
+ }
}
\ No newline at end of file
diff --git a/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesReader.java b/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesReader.java
index 8cafa093c..b2920b077 100644
--- a/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesReader.java
+++ b/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesReader.java
@@ -77,4 +77,12 @@ public Binary readBytes() {
}
}
+ @Override
+ public void skipBytes() {
+ try {
+ decoder.readInt(); // Type does not matter as we are just skipping dictionary keys
+ } catch (IOException e) {
+ throw new ParquetDecodingException(e);
+ }
+ }
}
diff --git a/parquet-column/src/main/java/parquet/column/values/plain/BinaryPlainValuesReader.java b/parquet-column/src/main/java/parquet/column/values/plain/BinaryPlainValuesReader.java
index 238e749b0..6a009dbd8 100644
--- a/parquet-column/src/main/java/parquet/column/values/plain/BinaryPlainValuesReader.java
+++ b/parquet-column/src/main/java/parquet/column/values/plain/BinaryPlainValuesReader.java
@@ -45,6 +45,18 @@ public Binary readBytes() {
}
@Override
+ public void skipBytes() {
+ try {
+ int length = BytesUtils.readIntLittleEndian(in, offset);
+ offset += 4 + length;
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not skip bytes at offset " + offset, e);
+ } catch (RuntimeException e) {
+ throw new ParquetDecodingException("could not skip bytes at offset " + offset, e);
+ }
+ }
+
+ @Override
public int initFromPage(long valueCount, byte[] in, int offset)
throws IOException {
if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset));
diff --git a/parquet-column/src/main/java/parquet/column/values/plain/BooleanPlainValuesReader.java b/parquet-column/src/main/java/parquet/column/values/plain/BooleanPlainValuesReader.java
index c259f245f..dfe3cd48a 100644
--- a/parquet-column/src/main/java/parquet/column/values/plain/BooleanPlainValuesReader.java
+++ b/parquet-column/src/main/java/parquet/column/values/plain/BooleanPlainValuesReader.java
@@ -45,6 +45,14 @@ public boolean readBoolean() {
return in.readInteger() == 0 ? false : true;
}
+ /**
+ * {@inheritDoc}
+ * @see parquet.column.values.ValuesReader#skipBoolean()
+ */
+ @Override
+ public void skipBoolean() {
+ in.readInteger();
+ }
/**
* {@inheritDoc}
diff --git a/parquet-column/src/main/java/parquet/column/values/plain/PlainValuesReader.java b/parquet-column/src/main/java/parquet/column/values/plain/PlainValuesReader.java
index 2bf574b40..fdf7e9e78 100644
--- a/parquet-column/src/main/java/parquet/column/values/plain/PlainValuesReader.java
+++ b/parquet-column/src/main/java/parquet/column/values/plain/PlainValuesReader.java
@@ -46,6 +46,15 @@ public float readFloat() {
}
@Override
+ public void skipFloat() {
+ try {
+ in.skipBytes(4);
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not skip float", e);
+ }
+ }
+
+ @Override
public double readDouble() {
try {
return in.readDouble();
@@ -55,6 +64,15 @@ public double readDouble() {
}
@Override
+ public void skipDouble() {
+ try {
+ in.skipBytes(8);
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not skip double", e);
+ }
+ }
+
+ @Override
public int readInteger() {
try {
return in.readInt();
@@ -64,6 +82,15 @@ public int readInteger() {
}
@Override
+ public void skipInteger() {
+ try {
+ in.skipBytes(4);
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not skip int", e);
+ }
+ }
+
+ @Override
public long readLong() {
try {
return in.readLong();
@@ -72,6 +99,15 @@ public long readLong() {
}
}
+ @Override
+ public void skipLong() {
+ try {
+ in.skipBytes(8);
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not skip long", e);
+ }
+ }
+
/**
* {@inheritDoc}
* @see parquet.column.values.ValuesReader#initFromPage(byte[], int)
diff --git a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java
index 76f5f81c4..115898932 100644
--- a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java
+++ b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package parquet.column.values.rle;
import java.io.IOException;
diff --git a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java
index f00f4134f..fb2e37cd9 100644
--- a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java
+++ b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package parquet.column.values.rle;
import java.io.ByteArrayInputStream;
diff --git a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java
index c1a3b4ce0..817030c0c 100644
--- a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java
+++ b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package parquet.column.values.rle;
import java.io.ByteArrayOutputStream;
diff --git a/parquet-column/src/main/java/parquet/filter/AndRecordFilter.java b/parquet-column/src/main/java/parquet/filter/AndRecordFilter.java
new file mode 100644
index 000000000..04221debf
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/filter/AndRecordFilter.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.filter;
+
+import parquet.Preconditions;
+import parquet.column.ColumnReader;
+
+/**
+ * Provides ability to chain two filters together. Bear in mind that the first one will
+ * short circuit the second. Useful for getting a page of already-filtered results,
+ * e.g. and( column("manufacturer", equalTo("Volkswagen")), page(100,50))
+ *
+ * @author Jacob Metcalf
+ */
+public final class AndRecordFilter implements RecordFilter {
+
+ private final RecordFilter boundFilter1;
+ private final RecordFilter boundFilter2;
+
+ /**
+ * Returns builder for creating an and filter.
+ * @param filter1 The first filter to check.
+ * @param filter2 The second filter to check.
+ */
+ public static final UnboundRecordFilter and( final UnboundRecordFilter filter1, final UnboundRecordFilter filter2 ) {
+ Preconditions.checkNotNull( filter1, "filter1" );
+ Preconditions.checkNotNull( filter2, "filter2" );
+ return new UnboundRecordFilter() {
+ @Override
+ public RecordFilter bind(Iterable readers) {
+ return new AndRecordFilter( filter1.bind(readers), filter2.bind( readers) );
+ }
+ };
+ }
+
+ /**
+ * Private constructor, use AndRecordFilter.and() instead.
+ */
+ private AndRecordFilter( RecordFilter boundFilter1, RecordFilter boundFilter2 ) {
+ this.boundFilter1 = boundFilter1;
+ this.boundFilter2 = boundFilter2;
+ }
+
+ @Override
+ public boolean isFullyConsumed() {
+ return boundFilter1.isFullyConsumed() && boundFilter2.isFullyConsumed();
+ }
+
+ @Override
+ public boolean isMatch() {
+ return boundFilter1.isMatch() && boundFilter2.isMatch();
+ }
+}
diff --git a/parquet-column/src/main/java/parquet/filter/ColumnPredicates.java b/parquet-column/src/main/java/parquet/filter/ColumnPredicates.java
new file mode 100644
index 000000000..b8fc10b3f
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/filter/ColumnPredicates.java
@@ -0,0 +1,97 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.filter;
+
+import parquet.Preconditions;
+import parquet.column.ColumnReader;
+
+/**
+ * ColumnPredicates class provides checks for column values. Factory methods
+ * are provided for standard predicates which wrap the job of getting the
+ * correct value from the column.
+ */
+public class ColumnPredicates {
+
+ public static interface Predicate {
+ boolean apply(ColumnReader input);
+ }
+
+ public static Predicate equalTo(final String target) {
+ Preconditions.checkNotNull(target,"target");
+ return new Predicate() {
+ @Override
+ public boolean apply(ColumnReader input) {
+ return target.equals(input.getBinary().toStringUsingUTF8());
+ }
+ };
+ }
+
+ public static Predicate equalTo(final int target) {
+ return new Predicate() {
+ @Override
+ public boolean apply(ColumnReader input) {
+ return input.getInteger() == target;
+ }
+ };
+ }
+
+ public static Predicate equalTo(final long target) {
+ return new Predicate() {
+ @Override
+ public boolean apply(ColumnReader input) {
+ return input.getLong() == target;
+ }
+ };
+ }
+
+ public static Predicate equalTo(final float target) {
+ return new Predicate() {
+ @Override
+ public boolean apply(ColumnReader input) {
+ return input.getFloat() == target;
+ }
+ };
+ }
+
+ public static Predicate equalTo(final double target) {
+ return new Predicate() {
+ @Override
+ public boolean apply(ColumnReader input) {
+ return input.getDouble() == target;
+ }
+ };
+ }
+
+ public static Predicate equalTo(final boolean target) {
+ return new Predicate() {
+ @Override
+ public boolean apply(ColumnReader input) {
+ return input.getBoolean() == target;
+ }
+ };
+ }
+
+ public static Predicate equalTo(final E target) {
+ Preconditions.checkNotNull(target,"target");
+ final String targetAsString = target.name();
+ return new Predicate() {
+ @Override
+ public boolean apply(ColumnReader input) {
+ return targetAsString.equals(input.getBinary().toStringUsingUTF8());
+ }
+ };
+ }
+}
diff --git a/parquet-column/src/main/java/parquet/filter/ColumnRecordFilter.java b/parquet-column/src/main/java/parquet/filter/ColumnRecordFilter.java
new file mode 100644
index 000000000..104416f02
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/filter/ColumnRecordFilter.java
@@ -0,0 +1,80 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.filter;
+
+import parquet.column.ColumnReader;
+import java.util.Arrays;
+import static parquet.Preconditions.checkNotNull;
+
+/**
+ * Record filter which applies the supplied predicate to the specified column.
+ */
+public final class ColumnRecordFilter implements RecordFilter {
+
+ private final ColumnReader filterOnColumn;
+ private final ColumnPredicates.Predicate filterPredicate;
+
+ /**
+ * Factory method for record filter which applies the supplied predicate to the specified column.
+ * Note that if searching for a repeated sub-attribute it will only ever match against the
+ * first instance of it in the object.
+ *
+ * @param columnPath Dot separated path specifier, e.g. "engine.capacity"
+ * @param predicate Should call getBinary etc. and check the value
+ */
+ public static final UnboundRecordFilter column(final String columnPath,
+ final ColumnPredicates.Predicate predicate) {
+ checkNotNull(columnPath, "columnPath");
+ checkNotNull(predicate, "predicate");
+ return new UnboundRecordFilter() {
+ final String[] filterPath = columnPath.split("\\.");
+ @Override
+ public RecordFilter bind(Iterable readers) {
+ for (ColumnReader reader : readers) {
+ if ( Arrays.equals( reader.getDescriptor().getPath(), filterPath)) {
+ return new ColumnRecordFilter(reader, predicate);
+ }
+ }
+ throw new IllegalArgumentException( "Column " + columnPath + " does not exist.");
+ }
+ };
+ }
+
+ /**
+ * Private constructor. Use column() instead.
+ */
+ private ColumnRecordFilter(ColumnReader filterOnColumn, ColumnPredicates.Predicate filterPredicate) {
+ this.filterOnColumn = filterOnColumn;
+ this.filterPredicate = filterPredicate;
+ }
+
+ /**
+ * @return true if the current value for the column reader matches the predicate.
+ */
+ @Override
+ public boolean isMatch() {
+
+ return ( filterOnColumn.isFullyConsumed()) ? false : filterPredicate.apply( filterOnColumn );
+ }
+
+ /**
+ * @return true if the column we are filtering on has no more values.
+ */
+ @Override
+ public boolean isFullyConsumed() {
+ return filterOnColumn.isFullyConsumed();
+ }
+}
diff --git a/parquet-column/src/main/java/parquet/filter/PagedRecordFilter.java b/parquet-column/src/main/java/parquet/filter/PagedRecordFilter.java
new file mode 100644
index 000000000..41851129c
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/filter/PagedRecordFilter.java
@@ -0,0 +1,69 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.filter;
+
+import parquet.column.ColumnReader;
+
+/**
+ * Filter which will only materialize a page worth of results.
+ */
+public final class PagedRecordFilter implements RecordFilter {
+
+ private final long startPos;
+ private final long endPos;
+ private long currentPos = 0;
+
+ /**
+ * Returns builder for creating a paged query.
+ * @param startPos The record to start from, numbering starts at 1.
+ * @param pageSize The size of the page.
+ */
+ public static final UnboundRecordFilter page( final long startPos, final long pageSize ) {
+ return new UnboundRecordFilter() {
+ @Override
+ public RecordFilter bind(Iterable readers) {
+ return new PagedRecordFilter( startPos, pageSize );
+ }
+ };
+ }
+
+ /**
+   * Private constructor, use page() instead.
+ */
+ private PagedRecordFilter(long startPos, long pageSize) {
+ this.startPos = startPos;
+ this.endPos = startPos + pageSize;
+ }
+
+ /**
+ * Terminate early when we have got our page.
+ */
+ @Override
+ public boolean isFullyConsumed() {
+ return ( currentPos >= endPos );
+ }
+
+ /**
+ * Keeps track of how many times it is called. Only returns matches when the
+ * record number is in the range.
+ */
+ @Override
+ public boolean isMatch() {
+ currentPos++;
+ return (( currentPos >= startPos ) && ( currentPos < endPos ));
+ }
+
+}
diff --git a/parquet-column/src/main/java/parquet/filter/RecordFilter.java b/parquet-column/src/main/java/parquet/filter/RecordFilter.java
new file mode 100644
index 000000000..569835e3f
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/filter/RecordFilter.java
@@ -0,0 +1,36 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.filter;
+
+import parquet.column.ColumnReader;
+
+/**
+ * Filter to be applied to a record to work out whether to skip it.
+ *
+ * @author Jacob Metcalf
+ */
+public interface RecordFilter {
+
+ /**
+ * Works out whether the current record can pass through the filter.
+ */
+ boolean isMatch();
+
+ /**
+ * Whether the filter values are fully consumed.
+ */
+ boolean isFullyConsumed();
+}
diff --git a/parquet-column/src/main/java/parquet/filter/UnboundRecordFilter.java b/parquet-column/src/main/java/parquet/filter/UnboundRecordFilter.java
new file mode 100644
index 000000000..b317727bb
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/filter/UnboundRecordFilter.java
@@ -0,0 +1,33 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.filter;
+
+import parquet.column.ColumnReader;
+
+/**
+ * Builder for a record filter. Idea is that each filter provides a create function
+ * which returns an unbound filter. This only becomes a filter when it is bound to the actual
+ * columns.
+ *
+ * @author Jacob Metcalf
+ */
+public interface UnboundRecordFilter {
+
+ /**
+ * Call to bind to actual columns and create filter.
+ */
+ RecordFilter bind( Iterable readers);
+}
diff --git a/parquet-column/src/main/java/parquet/io/FilteredRecordReader.java b/parquet-column/src/main/java/parquet/io/FilteredRecordReader.java
new file mode 100644
index 000000000..3c1ae92a4
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/io/FilteredRecordReader.java
@@ -0,0 +1,92 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.io;
+
+import parquet.column.ColumnReader;
+import parquet.column.impl.ColumnReadStoreImpl;
+import parquet.filter.RecordFilter;
+import parquet.filter.UnboundRecordFilter;
+import parquet.io.api.RecordMaterializer;
+
+import java.util.Arrays;
+
+/**
+ * Extends RecordReaderImplementation to skip records that do not match the filter.
+ * @author Jacob Metcalf
+ *
+ */
+class FilteredRecordReader extends RecordReaderImplementation {
+
+ private final RecordFilter recordFilter;
+
+ /**
+ * @param root the root of the schema
+ * @param validating
+ * @param columnStore
+ * @param unboundFilter Filter records, pass in NULL_FILTER to leave unfiltered.
+ */
+ public FilteredRecordReader(MessageColumnIO root, RecordMaterializer recordMaterializer, boolean validating,
+ ColumnReadStoreImpl columnStore, UnboundRecordFilter unboundFilter) {
+ super(root, recordMaterializer, validating, columnStore);
+
+ if ( unboundFilter != null ) {
+ recordFilter = unboundFilter.bind(getColumnReaders());
+ } else {
+ recordFilter = null;
+ }
+ }
+
+ /**
+ * Override read() method to provide skip.
+ */
+ @Override
+ public T read() {
+ if ( skipToMatch()) {
+ return super.read();
+ } else {
+ return null;
+ }
+ }
+
+
+ /**
+ * Skips forwards until the filter finds the first match. Returns false
+ * if none found.
+ */
+ private boolean skipToMatch() {
+ while ( !recordFilter.isMatch()) {
+ if ( recordFilter.isFullyConsumed()) {
+ return false;
+ }
+ State currentState = getState(0);
+ do {
+ ColumnReader columnReader = currentState.column;
+
+ // currentLevel = depth + 1 at this point
+ // set the current value
+ if (columnReader.getCurrentDefinitionLevel() >= currentState.maxDefinitionLevel) {
+ columnReader.skip();
+ }
+ columnReader.consume();
+
+ // Based on repetition level work out next state to go to
+ int nextR = currentState.maxRepetitionLevel == 0 ? 0 : columnReader.getCurrentRepetitionLevel();
+ currentState = currentState.getNextState(nextR);
+ } while (currentState != null);
+ }
+ return true;
+ }
+}
diff --git a/parquet-column/src/main/java/parquet/io/MessageColumnIO.java b/parquet-column/src/main/java/parquet/io/MessageColumnIO.java
index a6e31f66a..64c65ad68 100644
--- a/parquet-column/src/main/java/parquet/io/MessageColumnIO.java
+++ b/parquet-column/src/main/java/parquet/io/MessageColumnIO.java
@@ -27,6 +27,8 @@
import parquet.io.api.RecordConsumer;
import parquet.io.api.RecordMaterializer;
import parquet.schema.MessageType;
+import parquet.filter.UnboundRecordFilter;
+import parquet.filter.RecordFilter;
/**
* Message level of the IO structure
@@ -59,7 +61,20 @@
recordMaterializer,
validating,
new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType())
- );
+ );
+ }
+ public RecordReader getRecordReader(PageReadStore columns, RecordMaterializer recordMaterializer,
+ UnboundRecordFilter unboundFilter) {
+
+ return (unboundFilter == null)
+ ? getRecordReader(columns, recordMaterializer)
+ : new FilteredRecordReader(
+ this,
+ recordMaterializer,
+ validating,
+ new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType()),
+ unboundFilter
+ );
}
private class MessageColumnIORecordConsumer extends RecordConsumer {
diff --git a/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java b/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java
index 5a2d83eeb..4d66278de 100644
--- a/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java
+++ b/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java
@@ -222,25 +222,29 @@ public int getDepth(int definitionLevel) {
public Case getCase(int currentLevel, int d, int nextR) {
return caseLookup[currentLevel][d][nextR];
}
+
+ public State getNextState(int nextR) {
+ return nextState[nextR];
+ }
}
private final GroupConverter recordConsumer;
private final RecordMaterializer recordMaterializer;
private State[] states;
+ private ColumnReader[] columns;
/**
*
* @param root the root of the schema
- * @param leaves the leaves of the schema
* @param validating
- * @param columns2
+ * @param columnStore
*/
public RecordReaderImplementation(MessageColumnIO root, RecordMaterializer recordMaterializer, boolean validating, ColumnReadStoreImpl columnStore) {
this.recordMaterializer = recordMaterializer;
this.recordConsumer = recordMaterializer.getRootConverter(); // TODO: validator(wrap(recordMaterializer), validating, root.getType());
PrimitiveColumnIO[] leaves = root.getLeaves().toArray(new PrimitiveColumnIO[root.getLeaves().size()]);
- ColumnReader[] columns = new ColumnReader[leaves.length];
+ columns = new ColumnReader[leaves.length];
int[][] nextReader = new int[leaves.length][];
int[][] nextLevel = new int[leaves.length][];
GroupConverter[][] groupConverterPaths = new GroupConverter[leaves.length][];
@@ -444,4 +448,8 @@ protected Converter getRecordConsumer() {
return recordConsumer;
}
+ protected Iterable getColumnReaders() {
+ // Wrapping the array in a fixed-size list view prevents callers from adding or removing readers (note: List.set could still replace elements)
+ return Arrays.asList(columns);
+ }
}
diff --git a/parquet-column/src/test/java/parquet/column/values/bitpacking/TestByteBasedBitPackingEncoder.java b/parquet-column/src/test/java/parquet/column/values/bitpacking/TestByteBasedBitPackingEncoder.java
index 9ca37a80e..9fd0e723c 100644
--- a/parquet-column/src/test/java/parquet/column/values/bitpacking/TestByteBasedBitPackingEncoder.java
+++ b/parquet-column/src/test/java/parquet/column/values/bitpacking/TestByteBasedBitPackingEncoder.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package parquet.column.values.bitpacking;
import org.junit.Test;
diff --git a/parquet-column/src/test/java/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java b/parquet-column/src/test/java/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java
index 675fda634..2058dba41 100644
--- a/parquet-column/src/test/java/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java
+++ b/parquet-column/src/test/java/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package parquet.column.values.rle;
import java.io.ByteArrayInputStream;
diff --git a/parquet-column/src/test/java/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java b/parquet-column/src/test/java/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java
index 4e63dafd9..2f5e4b865 100644
--- a/parquet-column/src/test/java/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java
+++ b/parquet-column/src/test/java/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package parquet.column.values.rle;
import java.io.ByteArrayInputStream;
diff --git a/parquet-column/src/test/java/parquet/io/TestFiltered.java b/parquet-column/src/test/java/parquet/io/TestFiltered.java
new file mode 100644
index 000000000..f4d9adeb8
--- /dev/null
+++ b/parquet-column/src/test/java/parquet/io/TestFiltered.java
@@ -0,0 +1,148 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.io;
+
+import org.junit.Test;
+import parquet.column.impl.ColumnWriteStoreImpl;
+import parquet.column.page.mem.MemPageStore;
+import parquet.example.data.Group;
+import parquet.example.data.GroupWriter;
+import parquet.example.data.simple.convert.GroupRecordConverter;
+import parquet.io.api.RecordMaterializer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static parquet.example.Paper.r1;
+import static parquet.example.Paper.r2;
+import static parquet.example.Paper.schema;
+import static parquet.filter.AndRecordFilter.and;
+import static parquet.filter.PagedRecordFilter.page;
+import static parquet.filter.ColumnPredicates.equalTo;
+import static parquet.filter.ColumnRecordFilter.column;
+
+public class TestFiltered {
+
+ @Test
+ public void testFilterOnInteger() {
+ MemPageStore memPageStore = new MemPageStore();
+ MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema);
+ writeTestRecords(memPageStore, columnIO, 1);
+
+ // Get first record
+ RecordMaterializer recordConverter = new GroupRecordConverter(schema);
+ RecordReaderImplementation recordReader = (RecordReaderImplementation)
+ columnIO.getRecordReader(memPageStore, recordConverter,
+ column("DocId", equalTo(10l)));
+
+ Group actual1= recordReader.read();
+ assertNull( "There should be no more records as r2 filtered out", recordReader.read());
+ assertEquals("filtering did not return the correct record", r1.toString(), actual1.toString());
+
+ // Get second record
+ recordReader = (RecordReaderImplementation)
+ columnIO.getRecordReader(memPageStore, recordConverter,
+ column("DocId", equalTo(20l)));
+
+ Group actual2 = recordReader.read();
+ assertNull( "There should be no more records as r1 filtered out", recordReader.read());
+ assertEquals("filtering did not return the correct record", r2.toString(), actual2.toString());
+
+ }
+
+ @Test
+ public void testFilterOnString() {
+ MemPageStore memPageStore = new MemPageStore();
+ MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema);
+ writeTestRecords(memPageStore, columnIO, 1);
+
+ // First try matching against the A url in record 1
+ RecordMaterializer recordConverter = new GroupRecordConverter(schema);
+ RecordReaderImplementation recordReader = (RecordReaderImplementation)
+ columnIO.getRecordReader(memPageStore, recordConverter,
+ column("Name.Url", equalTo("http://A")));
+
+ Group actual1 = recordReader.read();
+ assertNull( "There should be no more records as r2 filtered out", recordReader.read());
+ assertEquals("filtering did not return the correct record", r1.toString(), actual1.toString());
+
+ // Second try matching against the B url in record 1 - it should fail as we only match
+ // against the first instance of a repeated field
+ recordReader = (RecordReaderImplementation)
+ columnIO.getRecordReader(memPageStore, recordConverter,
+ column("Name.Url", equalTo("http://B")));
+
+ assertNull( "There should be no matching records", recordReader.read());
+
+ // Finally try matching against the C url in record 2
+ recordReader = (RecordReaderImplementation)
+ columnIO.getRecordReader(memPageStore, recordConverter,
+ column("Name.Url", equalTo("http://C")));
+
+ Group actual2 = recordReader.read();
+ assertNull( "There should be no more records as r1 filtered out", recordReader.read());
+ assertEquals("filtering did not return the correct record", r2.toString(), actual2.toString());
+ }
+
+ @Test
+ public void testPaged() {
+ MemPageStore memPageStore = new MemPageStore();
+ MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema);
+ writeTestRecords(memPageStore, columnIO, 6);
+
+ RecordMaterializer recordConverter = new GroupRecordConverter(schema);
+ RecordReaderImplementation recordReader = (RecordReaderImplementation)
+ columnIO.getRecordReader(memPageStore, recordConverter,
+ page(4, 4));
+
+ int count = 0;
+ while ( count < 2 ) { // starts at position 4 which should be r2
+ assertEquals("expecting record2", r2.toString(), recordReader.read().toString());
+ assertEquals("expecting record1", r1.toString(), recordReader.read().toString());
+ count++;
+ }
+ assertNull("There should be no more records", recordReader.read());
+ }
+
+ @Test
+ public void testFilteredAndPaged() {
+ MemPageStore memPageStore = new MemPageStore();
+ MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema);
+ writeTestRecords(memPageStore, columnIO, 8);
+
+ RecordMaterializer recordConverter = new GroupRecordConverter(schema);
+ RecordReaderImplementation recordReader = (RecordReaderImplementation)
+ columnIO.getRecordReader(memPageStore, recordConverter,
+ and(column("DocId", equalTo(10l)), page(2, 4)));
+
+ int count = 0;
+ while ( count < 4 ) { // page starts at position 2; all four matching records are r1
+ assertEquals("expecting 4 x record1", r1.toString(), recordReader.read().toString());
+ count++;
+ }
+ assertNull( "There should be no more records", recordReader.read());
+ }
+
+ private void writeTestRecords(MemPageStore memPageStore, MessageColumnIO columnIO, int number) {
+ ColumnWriteStoreImpl columns = new ColumnWriteStoreImpl(memPageStore, 800, 800, false);
+
+ GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema);
+ for ( int i = 0; i < number; i++ ) {
+ groupWriter.write(r1);
+ groupWriter.write(r2);
+ }
+ columns.flush();
+ }
+}
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
index e69dca2f1..d3e196a4e 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
@@ -26,6 +26,8 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import parquet.filter.RecordFilter;
+import parquet.filter.UnboundRecordFilter;
import parquet.hadoop.api.ReadSupport;
import parquet.hadoop.api.ReadSupport.ReadContext;
import parquet.hadoop.metadata.BlockMetaData;
@@ -40,6 +42,10 @@
private ParquetRecordReader reader;
public ParquetReader(Path file, ReadSupport readSupport) throws IOException {
+ this(file, readSupport, null);
+ }
+
+ public ParquetReader(Path file, ReadSupport readSupport, UnboundRecordFilter filter) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
@@ -53,7 +59,7 @@ public ParquetReader(Path file, ReadSupport readSupport) throws IOException {
MessageType schema = fileMetaData.getSchema();
Map extraMetadata = fileMetaData.getKeyValueMetaData();
final ReadContext readContext = readSupport.init(conf, extraMetadata, schema);
- reader = new ParquetRecordReader(readSupport);
+ reader = new ParquetRecordReader(readSupport, filter);
ParquetInputSplit inputSplit =
new ParquetInputSplit(
file, 0, 0, null, blocks,
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
index 9ddbea466..a8ccb0282 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
@@ -30,6 +30,8 @@
import parquet.Log;
import parquet.column.ColumnDescriptor;
import parquet.column.page.PageReadStore;
+import parquet.filter.RecordFilter;
+import parquet.filter.UnboundRecordFilter;
import parquet.hadoop.api.ReadSupport;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.util.ContextUtil;
@@ -69,6 +71,7 @@
private int currentBlock = -1;
private ParquetFileReader reader;
private parquet.io.RecordReader recordReader;
+ private UnboundRecordFilter recordFilter;
private long totalTimeSpentReadingBytes;
private long totalTimeSpentProcessingRecords;
@@ -77,11 +80,19 @@
private long totalCountLoadedSoFar = 0;
/**
- * @param requestedSchema the requested schema (a subset of the original schema) for record projection
- * @param readSupportClass
+ * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro.
*/
public ParquetRecordReader(ReadSupport readSupport) {
+ this(readSupport, null);
+ }
+
+ /**
+ * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro.
+ * @param filter Optional filter for only returning matching records.
+ */
+ public ParquetRecordReader(ReadSupport readSupport, UnboundRecordFilter filter) {
this.readSupport = readSupport;
+ this.recordFilter = filter;
}
private void checkRead() throws IOException {
@@ -107,7 +118,7 @@ private void checkRead() throws IOException {
LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema);
- recordReader = columnIO.getRecordReader(pages, recordConverter);
+ recordReader = columnIO.getRecordReader(pages, recordConverter, recordFilter);
startedAssemblingCurrentBlockAt = System.currentTimeMillis();
totalCountLoadedSoFar += pages.getRowCount();
++ currentBlock;
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCodec.java b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCodec.java
index abe1d67ef..70c4f115f 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCodec.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCodec.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package parquet.hadoop.codec;
import java.io.IOException;
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCompressor.java b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCompressor.java
index ac24303fd..61520d54f 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCompressor.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCompressor.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package parquet.hadoop.codec;
import java.io.IOException;
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyDecompressor.java b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyDecompressor.java
index d1f5e5360..f1c9e2ab2 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyDecompressor.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyDecompressor.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package parquet.hadoop.codec;
import java.io.IOException;
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyUtil.java b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyUtil.java
index 2b2bf1eb4..389dcce19 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyUtil.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyUtil.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package parquet.hadoop.codec;
import parquet.Preconditions;
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestSnappyCodec.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestSnappyCodec.java
index ac9532743..09c9d6831 100644
--- a/parquet-hadoop/src/test/java/parquet/hadoop/TestSnappyCodec.java
+++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestSnappyCodec.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package parquet.hadoop;
import java.io.IOException;