1818 */
1919package org .apache .parquet .proto ;
2020
21- import com .google .protobuf .ByteString ;
22- import com .google .protobuf .DescriptorProtos ;
23- import com .google .protobuf .Descriptors ;
24- import com .google .protobuf .MapEntry ;
25- import com .google .protobuf .Message ;
26- import com .google .protobuf .MessageOrBuilder ;
27- import com .google .protobuf .TextFormat ;
21+ import com .google .protobuf .*;
22+ import com .google .protobuf .Descriptors .Descriptor ;
23+ import com .google .protobuf .Descriptors .FieldDescriptor ;
2824import com .twitter .elephantbird .util .Protobufs ;
2925import org .apache .hadoop .conf .Configuration ;
3026import org .apache .parquet .hadoop .BadConfigurationException ;
3127import org .apache .parquet .hadoop .api .WriteSupport ;
3228import org .apache .parquet .io .InvalidRecordException ;
3329import org .apache .parquet .io .api .Binary ;
3430import org .apache .parquet .io .api .RecordConsumer ;
35- import org .apache .parquet .schema .GroupType ;
36- import org .apache .parquet .schema .IncompatibleSchemaModificationException ;
37- import org .apache .parquet .schema .MessageType ;
38- import org .apache .parquet .schema .OriginalType ;
31+ import org .apache .parquet .schema .*;
3932import org .apache .parquet .schema .Type ;
4033import org .slf4j .Logger ;
4134import org .slf4j .LoggerFactory ;
@@ -113,7 +106,7 @@ public WriteContext init(Configuration configuration) {
113106 }
114107
115108 MessageType rootSchema = new ProtoSchemaConverter ().convert (protoMessage );
116- Descriptors . Descriptor messageDescriptor = Protobufs .getMessageDescriptor (protoMessage );
109+ Descriptor messageDescriptor = Protobufs .getMessageDescriptor (protoMessage );
117110 validatedMapping (messageDescriptor , rootSchema );
118111
119112 this .messageWriter = new MessageWriter (messageDescriptor , rootSchema );
@@ -156,11 +149,11 @@ class MessageWriter extends FieldWriter {
156149 final FieldWriter [] fieldWriters ;
157150
158151 @ SuppressWarnings ("unchecked" )
159- MessageWriter (Descriptors . Descriptor descriptor , GroupType schema ) {
160- List <Descriptors . FieldDescriptor > fields = descriptor .getFields ();
152+ MessageWriter (Descriptor descriptor , GroupType schema ) {
153+ List <FieldDescriptor > fields = descriptor .getFields ();
161154 fieldWriters = (FieldWriter []) Array .newInstance (FieldWriter .class , fields .size ());
162155
163- for (Descriptors . FieldDescriptor fieldDescriptor : fields ) {
156+ for (FieldDescriptor fieldDescriptor : fields ) {
164157 String name = fieldDescriptor .getName ();
165158 Type type = schema .getType (name );
166159 FieldWriter writer = createWriter (fieldDescriptor , type );
@@ -176,7 +169,7 @@ class MessageWriter extends FieldWriter {
176169 }
177170 }
178171
179- private FieldWriter createWriter (Descriptors . FieldDescriptor fieldDescriptor , Type type ) {
172+ private FieldWriter createWriter (FieldDescriptor fieldDescriptor , Type type ) {
180173
181174 switch (fieldDescriptor .getJavaType ()) {
182175 case STRING : return new StringWriter () ;
@@ -193,7 +186,7 @@ private FieldWriter createWriter(Descriptors.FieldDescriptor fieldDescriptor, Ty
193186 return unknownType (fieldDescriptor );//should not be executed, always throws exception.
194187 }
195188
196- private FieldWriter createMessageWriter (Descriptors . FieldDescriptor fieldDescriptor , Type type ) {
189+ private FieldWriter createMessageWriter (FieldDescriptor fieldDescriptor , Type type ) {
197190 if (fieldDescriptor .isMapField ()) {
198191 return createMapWriter (fieldDescriptor , type );
199192 }
@@ -203,7 +196,7 @@ private FieldWriter createMessageWriter(Descriptors.FieldDescriptor fieldDescrip
203196
204197 private GroupType getGroupType (Type type ) {
205198 if (type .getOriginalType () == OriginalType .LIST ) {
206- return type .asGroupType ().getType ("list" ).asGroupType ();
199+ return type .asGroupType ().getType ("list" ).asGroupType (). getType ( "element" ). asGroupType () ;
207200 }
208201
209202 if (type .getOriginalType () == OriginalType .MAP ) {
@@ -213,20 +206,20 @@ private GroupType getGroupType(Type type) {
213206 return type .asGroupType ();
214207 }
215208
216- private MapWriter createMapWriter (Descriptors . FieldDescriptor fieldDescriptor , Type type ) {
217- List <Descriptors . FieldDescriptor > fields = fieldDescriptor .getMessageType ().getFields ();
209+ private MapWriter createMapWriter (FieldDescriptor fieldDescriptor , Type type ) {
210+ List <FieldDescriptor > fields = fieldDescriptor .getMessageType ().getFields ();
218211 if (fields .size () != 2 ) {
219212 throw new UnsupportedOperationException ("Expected two fields for the map (key/value), but got: " + fields );
220213 }
221214
222215 // KeyFieldWriter
223- Descriptors . FieldDescriptor keyProtoField = fields .get (0 );
216+ FieldDescriptor keyProtoField = fields .get (0 );
224217 FieldWriter keyWriter = createWriter (keyProtoField , type );
225218 keyWriter .setFieldName (keyProtoField .getName ());
226219 keyWriter .setIndex (0 );
227220
228221 // ValueFieldWriter
229- Descriptors . FieldDescriptor valueProtoField = fields .get (1 );
222+ FieldDescriptor valueProtoField = fields .get (1 );
230223 FieldWriter valueWriter = createWriter (valueProtoField , type );
231224 valueWriter .setFieldName (valueProtoField .getName ());
232225 valueWriter .setIndex (1 );
@@ -257,10 +250,10 @@ final void writeField(Object value) {
257250
258251 private void writeAllFields (MessageOrBuilder pb ) {
259252 //returns changed fields with values. Map is ordered by id.
260- Map <Descriptors . FieldDescriptor , Object > changedPbFields = pb .getAllFields ();
253+ Map <FieldDescriptor , Object > changedPbFields = pb .getAllFields ();
261254
262- for (Map .Entry <Descriptors . FieldDescriptor , Object > entry : changedPbFields .entrySet ()) {
263- Descriptors . FieldDescriptor fieldDescriptor = entry .getKey ();
255+ for (Map .Entry <FieldDescriptor , Object > entry : changedPbFields .entrySet ()) {
256+ FieldDescriptor fieldDescriptor = entry .getKey ();
264257
265258 if (fieldDescriptor .isExtension ()) {
266259 // Field index of an extension field might overlap with a base field.
@@ -295,13 +288,21 @@ final void writeField(Object value) {
295288 recordConsumer .startField ("list" , 0 ); // This is the wrapper group for the array field
296289 for (Object listEntry : list ) {
297290 recordConsumer .startGroup ();
298- if (isPrimitive (listEntry )) {
299- recordConsumer .startField ("element" , 0 );
291+
292+ recordConsumer .startField ("element" , 0 ); // This is the mandatory inner field
293+
294+ if (!isPrimitive (listEntry )) {
295+ recordConsumer .startGroup ();
300296 }
297+
301298 fieldWriter .writeRawValue (listEntry );
302- if (isPrimitive (listEntry )) {
303- recordConsumer .endField ("element" , 0 );
299+
300+ if (!isPrimitive (listEntry )) {
301+ recordConsumer .endGroup ();
304302 }
303+
304+ recordConsumer .endField ("element" , 0 );
305+
305306 recordConsumer .endGroup ();
306307 }
307308 recordConsumer .endField ("list" , 0 );
@@ -316,10 +317,10 @@ private boolean isPrimitive(Object listEntry) {
316317 }
317318
318319 /** Validates the mapping between protobuf fields and Parquet fields. */
319- private void validatedMapping (Descriptors . Descriptor descriptor , GroupType parquetSchema ) {
320- List <Descriptors . FieldDescriptor > allFields = descriptor .getFields ();
320+ private void validatedMapping (Descriptor descriptor , GroupType parquetSchema ) {
321+ List <FieldDescriptor > allFields = descriptor .getFields ();
321322
322- for (Descriptors . FieldDescriptor fieldDescriptor : allFields ) {
323+ for (FieldDescriptor fieldDescriptor : allFields ) {
323324 String fieldName = fieldDescriptor .getName ();
324325 int fieldIndex = fieldDescriptor .getIndex ();
325326 int parquetIndex = parquetSchema .getFieldIndex (fieldName );
@@ -370,10 +371,16 @@ final void writeRawValue(Object value) {
370371 recordConsumer .startGroup ();
371372
372373 recordConsumer .startField ("key_value" , 0 ); // This is the wrapper group for the map field
373- for ( MapEntry <?, ?> entry : (Collection <MapEntry <?, ?> >) value ) {
374+ for ( Message msg : (Collection <Message >) value ) {
374375 recordConsumer .startGroup ();
375- keyWriter .writeField (entry .getKey ());
376- valueWriter .writeField (entry .getValue ());
376+
377+ final Descriptor descriptorForType = msg .getDescriptorForType ();
378+ final FieldDescriptor keyDesc = descriptorForType .findFieldByName ("key" );
379+ final FieldDescriptor valueDesc = descriptorForType .findFieldByName ("value" );
380+
381+ keyWriter .writeField (msg .getField (keyDesc ));
382+ valueWriter .writeField (msg .getField (valueDesc ));
383+
377384 recordConsumer .endGroup ();
378385 }
379386
@@ -421,15 +428,15 @@ final void writeRawValue(Object value) {
421428 }
422429 }
423430
/**
 * Reports a protobuf field for which no writer can be created.
 * Builds a message from the field descriptor and its Java type and throws
 * InvalidRecordException unconditionally; it never returns normally. The
 * FieldWriter return type only lets callers write "return unknownType(...)".
 */
424- private FieldWriter unknownType (Descriptors . FieldDescriptor fieldDescriptor ) {
431+ private FieldWriter unknownType (FieldDescriptor fieldDescriptor ) {
425432 String exceptionMsg = "Unknown type with descriptor \" " + fieldDescriptor
426433 + "\" and type \" " + fieldDescriptor .getJavaType () + "\" ." ;
427434 throw new InvalidRecordException (exceptionMsg );
428435 }
429436
430437 /** Returns the message descriptor of the given class, converted to its DescriptorProto and printed as a protobuf text-format String (via TextFormat), not JSON. */
431438 private String serializeDescriptor (Class <? extends Message > protoClass ) {
432- Descriptors . Descriptor descriptor = Protobufs .getMessageDescriptor (protoClass );
439+ Descriptor descriptor = Protobufs .getMessageDescriptor (protoClass );
433440 DescriptorProtos .DescriptorProto asProto = descriptor .toProto ();
434441 return TextFormat .printToString (asProto );
435442 }
0 commit comments