@@ -2171,6 +2171,11 @@ namespace
21712171 field_index_by_field_tag.emplace (field_infos[i].field_tag , i);
21722172 }
21732173
2174+ void setHasEnvelopeAsParent () /// Sets has_envelope_as_parent, which writeRow()/readRow() test (together with parent_field_descriptor) to choose nested-message framing over top-level framing.
2175+ {
2176+ has_envelope_as_parent = true ; // One-way switch: nothing in this method resets it to false.
2177+ }
2178+
21742179 void setColumns (const ColumnPtr * columns_, size_t num_columns_) override
21752180 {
21762181 if (!num_columns_)
@@ -2217,7 +2222,7 @@ namespace
22172222
22182223 void writeRow (size_t row_num) override
22192224 {
2220- if (parent_field_descriptor)
2225+ if (parent_field_descriptor || has_envelope_as_parent )
22212226 writer->startNestedMessage ();
22222227 else
22232228 writer->startMessage ();
@@ -2236,13 +2241,17 @@ namespace
22362241 bool is_group = (parent_field_descriptor->type () == FieldTypeId::TYPE_GROUP);
22372242 writer->endNestedMessage (parent_field_descriptor->number (), is_group, should_skip_if_empty);
22382243 }
2244+ else if (has_envelope_as_parent)
2245+ {
2246+ writer->endNestedMessage (1 , false , should_skip_if_empty);
2247+ }
22392248 else
22402249 writer->endMessage (with_length_delimiter);
22412250 }
22422251
22432252 void readRow (size_t row_num) override
22442253 {
2245- if (parent_field_descriptor)
2254+ if (parent_field_descriptor || has_envelope_as_parent )
22462255 reader->startNestedMessage ();
22472256 else
22482257 reader->startMessage (with_length_delimiter);
@@ -2285,7 +2294,7 @@ namespace
22852294 }
22862295 }
22872296
2288- if (parent_field_descriptor)
2297+ if (parent_field_descriptor || has_envelope_as_parent )
22892298 reader->endNestedMessage ();
22902299 else
22912300 reader->endMessage (false );
@@ -2375,6 +2384,7 @@ namespace
23752384 };
23762385
23772386 const FieldDescriptor * const parent_field_descriptor;
2387+ bool has_envelope_as_parent = false ;
23782388 const bool with_length_delimiter;
23792389 const std::unique_ptr<RowInputMissingColumnsFiller> missing_columns_filler;
23802390 const bool should_skip_if_empty;
@@ -2388,6 +2398,86 @@ namespace
23882398 size_t last_field_index = static_cast <size_t >(-1 );
23892399 };
23902400
2401+ /// Serializes a top-level envelope message in the protobuf schema.
2402+ /// "Envelope" means that the contained subtree of serializers is enclosed in a message just once:
2403+ /// the envelope is opened on the first writeRow()/readRow() call and, when writing, closed by finalizeWrite().
2404+ class ProtobufSerializerEnvelope : public ProtobufSerializer
2405+ {
2406+ public:
2407+ ProtobufSerializerEnvelope (
2408+ std::unique_ptr<ProtobufSerializerMessage>&& serializer_,
2409+ const ProtobufReaderOrWriter & reader_or_writer_)
2410+ : serializer(std::move(serializer_))
2411+ , reader(reader_or_writer_.reader)
2412+ , writer(reader_or_writer_.writer)
2413+ {
2414+ // The inner serializer has a backreference of type protobuf::FieldDescriptor * to its parent
2415+ // serializer. If it is unset, it considers itself the top-level message, otherwise a nested
2416+ // message, and accordingly it makes start/endMessage() vs. start/endNestedMessage() calls into
2417+ // Protobuf(Writer|Reader). There is no field descriptor because envelopes merely forward calls
2418+ // but don't contain data to be serialized. We must still force the inner serializer to act
2419+ // as a nested message.
2420+ serializer->setHasEnvelopeAsParent ();
2421+ }
2422+ /// Forwards the columns (ColumnPtr overload) to the wrapped message serializer.
2423+ void setColumns (const ColumnPtr * columns_, size_t num_columns_) override
2424+ {
2425+ serializer->setColumns (columns_, num_columns_);
2426+ }
2427+ /// Forwards the columns (MutableColumnPtr overload) to the wrapped message serializer.
2428+ void setColumns (const MutableColumnPtr * columns_, size_t num_columns_) override
2429+ {
2430+ serializer->setColumns (columns_, num_columns_);
2431+ }
2432+ /// Opens the envelope message on the first call only, then lets the wrapped serializer write row `row_num`.
2433+ void writeRow (size_t row_num) override
2434+ {
2435+ if (first_call_of_write_row)
2436+ {
2437+ writer->startMessage ();
2438+ first_call_of_write_row = false ;
2439+ }
2440+
2441+ serializer->writeRow (row_num);
2442+ }
2443+ /// Closes the envelope message with a length delimiter. NOTE(review): this runs unconditionally, i.e. also when writeRow() was never called and startMessage() never ran - confirm the writer tolerates that.
2444+ void finalizeWrite () override
2445+ {
2446+ writer->endMessage (/* with_length_delimiter = */ true );
2447+ }
2448+ /// Opens the length-delimited envelope message on the first call only, consumes one field number, then lets the wrapped serializer read row `row_num`. NOTE(review): no matching reader->endMessage() call is visible in this class - confirm where the envelope is closed on read.
2449+ void readRow (size_t row_num) override
2450+ {
2451+ if (first_call_of_read_row)
2452+ {
2453+ reader->startMessage (/* with_length_delimiter = */ true );
2454+ first_call_of_read_row = false ;
2455+ }
2456+
2457+ int field_tag; // Passed to readFieldNumber(); its value is not used afterwards in this function.
2458+ [[maybe_unused]] bool ret = reader->readFieldNumber (field_tag);
2459+ assert (ret); // Checked only while assert is enabled; with NDEBUG a false result from readFieldNumber() goes unnoticed.
2460+
2461+ serializer->readRow (row_num);
2462+ }
2463+ /// Forwards to the wrapped serializer's insertDefaults().
2464+ void insertDefaults (size_t row_num) override
2465+ {
2466+ serializer->insertDefaults (row_num);
2467+ }
2468+ /// Writes this node's label to `out` at the given indent, then the wrapped serializer's tree one level deeper.
2469+ void describeTree (WriteBuffer & out, size_t indent) const override
2470+ {
2471+ writeIndent (out, indent) << " ProtobufSerializerEnvelope ->\n " ;
2472+ serializer->describeTree (out, indent + 1 );
2473+ }
2474+ /// NOTE(review): the data members below sit under `public:`, since no other access specifier appears in this class.
2475+ std::unique_ptr<ProtobufSerializerMessage> serializer; // The wrapped message serializer that every call is forwarded to.
2476+ ProtobufReader * const reader; // Copied from reader_or_writer_.reader; used by readRow().
2477+ ProtobufWriter * const writer; // Copied from reader_or_writer_.writer; used by writeRow() and finalizeWrite().
2478+ bool first_call_of_write_row = true ; // True until the first writeRow() call has opened the envelope.
2479+ bool first_call_of_read_row = true ; // True until the first readRow() call has opened the envelope.
2480+ };
23912481
23922482 // / Serializes a tuple with explicit names as a nested message.
23932483 class ProtobufSerializerTupleAsNestedMessage : public ProtobufSerializer
@@ -2610,7 +2700,8 @@ namespace
26102700 const DataTypes & data_types,
26112701 std::vector<size_t > & missing_column_indices,
26122702 const MessageDescriptor & message_descriptor,
2613- bool with_length_delimiter)
2703+ bool with_length_delimiter,
2704+ bool with_envelope)
26142705 {
26152706 root_serializer_ptr = std::make_shared<ProtobufSerializer *>();
26162707 get_root_desc_function = [root_serializer_ptr = root_serializer_ptr](size_t indent) -> String
@@ -2648,13 +2739,23 @@ namespace
26482739 boost::range::set_difference (collections::range (column_names.size ()), used_column_indices_sorted,
26492740 std::back_inserter (missing_column_indices));
26502741
2651- *root_serializer_ptr = message_serializer.get ();
2652-
2742+ if (!with_envelope)
2743+ {
2744+ *root_serializer_ptr = message_serializer.get ();
26532745#if 0
2654- LOG_INFO(&Poco::Logger::get("ProtobufSerializer"), "Serialization tree:\n{}", get_root_desc_function(0));
2746+ LOG_INFO(&Poco::Logger::get("ProtobufSerializer"), "Serialization tree:\n{}", get_root_desc_function(0));
26552747#endif
2656-
2657- return message_serializer;
2748+ return message_serializer;
2749+ }
2750+ else
2751+ {
2752+ auto envelope_serializer = std::make_unique<ProtobufSerializerEnvelope>(std::move (message_serializer), reader_or_writer);
2753+ *root_serializer_ptr = envelope_serializer.get ();
2754+ #if 0
2755+ LOG_INFO(&Poco::Logger::get("ProtobufSerializer"), "Serialization tree:\n{}", get_root_desc_function(0));
2756+ #endif
2757+ return envelope_serializer;
2758+ }
26582759 }
26592760
26602761 private:
@@ -3337,20 +3438,22 @@ std::unique_ptr<ProtobufSerializer> ProtobufSerializer::create(
33373438 std::vector<size_t > & missing_column_indices,
33383439 const google::protobuf::Descriptor & message_descriptor,
33393440 bool with_length_delimiter,
3441+ bool with_envelope,
33403442 ProtobufReader & reader)
33413443{
3342- return ProtobufSerializerBuilder (reader).buildMessageSerializer (column_names, data_types, missing_column_indices, message_descriptor, with_length_delimiter);
3444+ return ProtobufSerializerBuilder (reader).buildMessageSerializer (column_names, data_types, missing_column_indices, message_descriptor, with_length_delimiter, with_envelope );
33433445}
33443446
33453447std::unique_ptr<ProtobufSerializer> ProtobufSerializer::create ( // Writer overload: builds the root serializer through a ProtobufSerializerBuilder constructed from `writer`.
33463448 const Strings & column_names,
33473449 const DataTypes & data_types,
33483450 const google::protobuf::Descriptor & message_descriptor,
33493451 bool with_length_delimiter,
3452+ bool with_envelope, // Forwarded to buildMessageSerializer(); when true, the built message serializer is wrapped in a ProtobufSerializerEnvelope.
33503453 ProtobufWriter & writer)
33513454{
33523455 std::vector<size_t > missing_column_indices; // Needed as an argument for buildMessageSerializer(); this overload does not return or otherwise use it.
3353- return ProtobufSerializerBuilder (writer).buildMessageSerializer (column_names, data_types, missing_column_indices, message_descriptor, with_length_delimiter);
3456+ return ProtobufSerializerBuilder (writer).buildMessageSerializer (column_names, data_types, missing_column_indices, message_descriptor, with_length_delimiter, with_envelope );
33543457}
33553458
33563459NamesAndTypesList protobufSchemaToCHSchema (const google::protobuf::Descriptor * message_descriptor)
0 commit comments