Skip to content

Commit aa3c05e

Browse files
authored
Merge pull request #35152 from rschu1ze/protobuf-batch-write
ProtobufList
2 parents 2c6ce4b + 6e1d7a3 commit aa3c05e

23 files changed

Lines changed: 1610 additions & 69 deletions

docs/en/interfaces/formats.md

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ The supported formats are:
5151
| [PrettySpace](#prettyspace) |||
5252
| [Protobuf](#protobuf) |||
5353
| [ProtobufSingle](#protobufsingle) |||
54+
| [ProtobufList](#protobuflist) |||
5455
| [Avro](#data-format-avro) |||
5556
| [AvroConfluent](#data-format-avro-confluent) |||
5657
| [Parquet](#data-format-parquet) |||
@@ -1230,7 +1231,38 @@ See also [how to read/write length-delimited protobuf messages in popular langua
12301231

12311232
## ProtobufSingle {#protobufsingle}
12321233

1233-
Same as [Protobuf](#protobuf) but for storing/parsing single Protobuf message without length delimiters.
1234+
Same as [Protobuf](#protobuf) but for storing/parsing a single Protobuf message without length delimiter.
1235+
As a result, only a single table row can be written/read.
1236+
1237+
## ProtobufList {#protobuflist}
1238+
1239+
Similar to Protobuf but rows are represented as a sequence of sub-messages contained in a message with fixed name "Envelope".
1240+
1241+
Usage example:
1242+
1243+
``` sql
1244+
SELECT * FROM test.table FORMAT ProtobufList SETTINGS format_schema = 'schemafile:MessageType'
1245+
```
1246+
1247+
``` bash
1248+
cat protobuflist_messages.bin | clickhouse-client --query "INSERT INTO test.table FORMAT ProtobufList SETTINGS format_schema='schemafile:MessageType'"
1249+
```
1250+
1251+
where the file `schemafile.proto` looks like this:
1252+
1253+
``` capnp
1254+
syntax = "proto3";
1255+
1256+
message Envelope {
1257+
message MessageType {
1258+
string name = 1;
1259+
string surname = 2;
1260+
uint32 birthDate = 3;
1261+
repeated string phoneNumbers = 4;
1262+
};
1263+
MessageType row = 1;
1264+
};
1265+
```
12341266

12351267
## Avro {#data-format-avro}
12361268

src/Formats/ProtobufSchemas.cpp

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ ProtobufSchemas & ProtobufSchemas::instance()
2424
class ProtobufSchemas::ImporterWithSourceTree : public google::protobuf::compiler::MultiFileErrorCollector
2525
{
2626
public:
27-
explicit ImporterWithSourceTree(const String & schema_directory) : importer(&disk_source_tree, this)
27+
explicit ImporterWithSourceTree(const String & schema_directory, WithEnvelope with_envelope_)
28+
: importer(&disk_source_tree, this)
29+
, with_envelope(with_envelope_)
2830
{
2931
disk_source_tree.MapPath("", schema_directory);
3032
}
@@ -39,16 +41,33 @@ class ProtobufSchemas::ImporterWithSourceTree : public google::protobuf::compile
3941
return descriptor;
4042

4143
const auto * file_descriptor = importer.Import(schema_path);
42-
// If there are parsing errors AddError() throws an exception and in this case the following line
44+
// If there are parsing errors, AddError() throws an exception and in this case the following line
4345
// isn't executed.
4446
assert(file_descriptor);
4547

46-
descriptor = file_descriptor->FindMessageTypeByName(message_name);
47-
if (!descriptor)
48-
throw Exception(
49-
"Not found a message named '" + message_name + "' in the schema file '" + schema_path + "'", ErrorCodes::BAD_ARGUMENTS);
50-
51-
return descriptor;
48+
if (with_envelope == WithEnvelope::No)
49+
{
50+
const auto * message_descriptor = file_descriptor->FindMessageTypeByName(message_name);
51+
if (!message_descriptor)
52+
throw Exception(
53+
"Could not find a message named '" + message_name + "' in the schema file '" + schema_path + "'", ErrorCodes::BAD_ARGUMENTS);
54+
55+
return message_descriptor;
56+
}
57+
else
58+
{
59+
const auto * envelope_descriptor = file_descriptor->FindMessageTypeByName("Envelope");
60+
if (!envelope_descriptor)
61+
throw Exception(
62+
"Could not find a message named 'Envelope' in the schema file '" + schema_path + "'", ErrorCodes::BAD_ARGUMENTS);
63+
64+
const auto * message_descriptor = envelope_descriptor->FindNestedTypeByName(message_name); // silly protobuf API disallows a restricting the field type to messages
65+
if (!message_descriptor)
66+
throw Exception(
67+
"Could not find a message named '" + message_name + "' in the schema file '" + schema_path + "'", ErrorCodes::BAD_ARGUMENTS);
68+
69+
return message_descriptor;
70+
}
5271
}
5372

5473
private:
@@ -63,18 +82,16 @@ class ProtobufSchemas::ImporterWithSourceTree : public google::protobuf::compile
6382

6483
google::protobuf::compiler::DiskSourceTree disk_source_tree;
6584
google::protobuf::compiler::Importer importer;
85+
const WithEnvelope with_envelope;
6686
};
6787

6888

69-
ProtobufSchemas::ProtobufSchemas() = default;
70-
ProtobufSchemas::~ProtobufSchemas() = default;
71-
72-
const google::protobuf::Descriptor * ProtobufSchemas::getMessageTypeForFormatSchema(const FormatSchemaInfo & info)
89+
const google::protobuf::Descriptor * ProtobufSchemas::getMessageTypeForFormatSchema(const FormatSchemaInfo & info, WithEnvelope with_envelope)
7390
{
7491
std::lock_guard lock(mutex);
7592
auto it = importers.find(info.schemaDirectory());
7693
if (it == importers.end())
77-
it = importers.emplace(info.schemaDirectory(), std::make_unique<ImporterWithSourceTree>(info.schemaDirectory())).first;
94+
it = importers.emplace(info.schemaDirectory(), std::make_unique<ImporterWithSourceTree>(info.schemaDirectory(), with_envelope)).first;
7895
auto * importer = it->second.get();
7996
return importer->import(info.schemaPath(), info.messageName());
8097
}

src/Formats/ProtobufSchemas.h

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,36 @@ class FormatSchemaInfo;
2828
class ProtobufSchemas : private boost::noncopyable
2929
{
3030
public:
31-
static ProtobufSchemas & instance();
31+
enum class WithEnvelope
32+
{
33+
// Return descriptor for a top-level message with a user-provided name.
34+
// Example: In protobuf schema
35+
// message MessageType {
36+
// string colA = 1;
37+
// int32 colB = 2;
38+
// }
39+
// message_name = "MessageType" returns a descriptor. Used by IO
40+
// formats Protobuf and ProtobufSingle.
41+
No,
42+
// Return descriptor for a message with a user-provided name one level
43+
// below a top-level message with the hardcoded name "Envelope".
44+
// Example: In protobuf schema
45+
// message Envelope {
46+
// message MessageType {
47+
// string colA = 1;
48+
// int32 colB = 2;
49+
// }
50+
// }
51+
// message_name = "MessageType" returns a descriptor. Used by IO format
52+
// ProtobufList.
53+
Yes
54+
};
3255

33-
ProtobufSchemas();
34-
~ProtobufSchemas();
56+
static ProtobufSchemas & instance();
3557

3658
/// Parses the format schema, then parses the corresponding proto file, and returns the descriptor of the message type.
3759
/// The function never returns nullptr, it throws an exception if it cannot load or parse the file.
38-
const google::protobuf::Descriptor * getMessageTypeForFormatSchema(const FormatSchemaInfo & info);
60+
const google::protobuf::Descriptor * getMessageTypeForFormatSchema(const FormatSchemaInfo & info, WithEnvelope with_envelope);
3961

4062
private:
4163
class ImporterWithSourceTree;

src/Formats/ProtobufSerializer.cpp

Lines changed: 114 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2171,6 +2171,11 @@ namespace
21712171
field_index_by_field_tag.emplace(field_infos[i].field_tag, i);
21722172
}
21732173

2174+
void setHasEnvelopeAsParent()
2175+
{
2176+
has_envelope_as_parent = true;
2177+
}
2178+
21742179
void setColumns(const ColumnPtr * columns_, size_t num_columns_) override
21752180
{
21762181
if (!num_columns_)
@@ -2217,7 +2222,7 @@ namespace
22172222

22182223
void writeRow(size_t row_num) override
22192224
{
2220-
if (parent_field_descriptor)
2225+
if (parent_field_descriptor || has_envelope_as_parent)
22212226
writer->startNestedMessage();
22222227
else
22232228
writer->startMessage();
@@ -2236,13 +2241,17 @@ namespace
22362241
bool is_group = (parent_field_descriptor->type() == FieldTypeId::TYPE_GROUP);
22372242
writer->endNestedMessage(parent_field_descriptor->number(), is_group, should_skip_if_empty);
22382243
}
2244+
else if (has_envelope_as_parent)
2245+
{
2246+
writer->endNestedMessage(1, false, should_skip_if_empty);
2247+
}
22392248
else
22402249
writer->endMessage(with_length_delimiter);
22412250
}
22422251

22432252
void readRow(size_t row_num) override
22442253
{
2245-
if (parent_field_descriptor)
2254+
if (parent_field_descriptor || has_envelope_as_parent)
22462255
reader->startNestedMessage();
22472256
else
22482257
reader->startMessage(with_length_delimiter);
@@ -2285,7 +2294,7 @@ namespace
22852294
}
22862295
}
22872296

2288-
if (parent_field_descriptor)
2297+
if (parent_field_descriptor || has_envelope_as_parent)
22892298
reader->endNestedMessage();
22902299
else
22912300
reader->endMessage(false);
@@ -2375,6 +2384,7 @@ namespace
23752384
};
23762385

23772386
const FieldDescriptor * const parent_field_descriptor;
2387+
bool has_envelope_as_parent = false;
23782388
const bool with_length_delimiter;
23792389
const std::unique_ptr<RowInputMissingColumnsFiller> missing_columns_filler;
23802390
const bool should_skip_if_empty;
@@ -2388,6 +2398,86 @@ namespace
23882398
size_t last_field_index = static_cast<size_t>(-1);
23892399
};
23902400

2401+
/// Serializes a top-level envelope message in the protobuf schema.
2402+
/// "Envelope" means that the contained subtree of serializers is enclosed in a message just once,
2403+
/// i.e. only when the first and the last row read/write trigger a read/write of the msg header.
2404+
class ProtobufSerializerEnvelope : public ProtobufSerializer
2405+
{
2406+
public:
2407+
ProtobufSerializerEnvelope(
2408+
std::unique_ptr<ProtobufSerializerMessage>&& serializer_,
2409+
const ProtobufReaderOrWriter & reader_or_writer_)
2410+
: serializer(std::move(serializer_))
2411+
, reader(reader_or_writer_.reader)
2412+
, writer(reader_or_writer_.writer)
2413+
{
2414+
// The inner serializer has a backreference of type protobuf::FieldDescriptor * to it's parent
2415+
// serializer. If it is unset, it considers itself the top-level message, otherwise a nested
2416+
// message and accordingly it makes start/endMessage() vs. startEndNestedMessage() calls into
2417+
// Protobuf(Writer|Reader). There is no field descriptor because Envelopes merely forward calls
2418+
// but don't contain data to be serialized. We must still force the inner serializer to act
2419+
// as nested message.
2420+
serializer->setHasEnvelopeAsParent();
2421+
}
2422+
2423+
void setColumns(const ColumnPtr * columns_, size_t num_columns_) override
2424+
{
2425+
serializer->setColumns(columns_, num_columns_);
2426+
}
2427+
2428+
void setColumns(const MutableColumnPtr * columns_, size_t num_columns_) override
2429+
{
2430+
serializer->setColumns(columns_, num_columns_);
2431+
}
2432+
2433+
void writeRow(size_t row_num) override
2434+
{
2435+
if (first_call_of_write_row)
2436+
{
2437+
writer->startMessage();
2438+
first_call_of_write_row = false;
2439+
}
2440+
2441+
serializer->writeRow(row_num);
2442+
}
2443+
2444+
void finalizeWrite() override
2445+
{
2446+
writer->endMessage(/*with_length_delimiter = */ true);
2447+
}
2448+
2449+
void readRow(size_t row_num) override
2450+
{
2451+
if (first_call_of_read_row)
2452+
{
2453+
reader->startMessage(/*with_length_delimiter = */ true);
2454+
first_call_of_read_row = false;
2455+
}
2456+
2457+
int field_tag;
2458+
[[maybe_unused]] bool ret = reader->readFieldNumber(field_tag);
2459+
assert(ret);
2460+
2461+
serializer->readRow(row_num);
2462+
}
2463+
2464+
void insertDefaults(size_t row_num) override
2465+
{
2466+
serializer->insertDefaults(row_num);
2467+
}
2468+
2469+
void describeTree(WriteBuffer & out, size_t indent) const override
2470+
{
2471+
writeIndent(out, indent) << "ProtobufSerializerEnvelope ->\n";
2472+
serializer->describeTree(out, indent + 1);
2473+
}
2474+
2475+
std::unique_ptr<ProtobufSerializerMessage> serializer;
2476+
ProtobufReader * const reader;
2477+
ProtobufWriter * const writer;
2478+
bool first_call_of_write_row = true;
2479+
bool first_call_of_read_row = true;
2480+
};
23912481

23922482
/// Serializes a tuple with explicit names as a nested message.
23932483
class ProtobufSerializerTupleAsNestedMessage : public ProtobufSerializer
@@ -2610,7 +2700,8 @@ namespace
26102700
const DataTypes & data_types,
26112701
std::vector<size_t> & missing_column_indices,
26122702
const MessageDescriptor & message_descriptor,
2613-
bool with_length_delimiter)
2703+
bool with_length_delimiter,
2704+
bool with_envelope)
26142705
{
26152706
root_serializer_ptr = std::make_shared<ProtobufSerializer *>();
26162707
get_root_desc_function = [root_serializer_ptr = root_serializer_ptr](size_t indent) -> String
@@ -2648,13 +2739,23 @@ namespace
26482739
boost::range::set_difference(collections::range(column_names.size()), used_column_indices_sorted,
26492740
std::back_inserter(missing_column_indices));
26502741

2651-
*root_serializer_ptr = message_serializer.get();
2652-
2742+
if (!with_envelope)
2743+
{
2744+
*root_serializer_ptr = message_serializer.get();
26532745
#if 0
2654-
LOG_INFO(&Poco::Logger::get("ProtobufSerializer"), "Serialization tree:\n{}", get_root_desc_function(0));
2746+
LOG_INFO(&Poco::Logger::get("ProtobufSerializer"), "Serialization tree:\n{}", get_root_desc_function(0));
26552747
#endif
2656-
2657-
return message_serializer;
2748+
return message_serializer;
2749+
}
2750+
else
2751+
{
2752+
auto envelope_serializer = std::make_unique<ProtobufSerializerEnvelope>(std::move(message_serializer), reader_or_writer);
2753+
*root_serializer_ptr = envelope_serializer.get();
2754+
#if 0
2755+
LOG_INFO(&Poco::Logger::get("ProtobufSerializer"), "Serialization tree:\n{}", get_root_desc_function(0));
2756+
#endif
2757+
return envelope_serializer;
2758+
}
26582759
}
26592760

26602761
private:
@@ -3337,20 +3438,22 @@ std::unique_ptr<ProtobufSerializer> ProtobufSerializer::create(
33373438
std::vector<size_t> & missing_column_indices,
33383439
const google::protobuf::Descriptor & message_descriptor,
33393440
bool with_length_delimiter,
3441+
bool with_envelope,
33403442
ProtobufReader & reader)
33413443
{
3342-
return ProtobufSerializerBuilder(reader).buildMessageSerializer(column_names, data_types, missing_column_indices, message_descriptor, with_length_delimiter);
3444+
return ProtobufSerializerBuilder(reader).buildMessageSerializer(column_names, data_types, missing_column_indices, message_descriptor, with_length_delimiter, with_envelope);
33433445
}
33443446

33453447
std::unique_ptr<ProtobufSerializer> ProtobufSerializer::create(
33463448
const Strings & column_names,
33473449
const DataTypes & data_types,
33483450
const google::protobuf::Descriptor & message_descriptor,
33493451
bool with_length_delimiter,
3452+
bool with_envelope,
33503453
ProtobufWriter & writer)
33513454
{
33523455
std::vector<size_t> missing_column_indices;
3353-
return ProtobufSerializerBuilder(writer).buildMessageSerializer(column_names, data_types, missing_column_indices, message_descriptor, with_length_delimiter);
3456+
return ProtobufSerializerBuilder(writer).buildMessageSerializer(column_names, data_types, missing_column_indices, message_descriptor, with_length_delimiter, with_envelope);
33543457
}
33553458

33563459
NamesAndTypesList protobufSchemaToCHSchema(const google::protobuf::Descriptor * message_descriptor)

src/Formats/ProtobufSerializer.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ class ProtobufSerializer
2626

2727
virtual void setColumns(const ColumnPtr * columns, size_t num_columns) = 0;
2828
virtual void writeRow(size_t row_num) = 0;
29+
virtual void finalizeWrite() {}
2930

3031
virtual void setColumns(const MutableColumnPtr * columns, size_t num_columns) = 0;
3132
virtual void readRow(size_t row_num) = 0;
@@ -39,13 +40,15 @@ class ProtobufSerializer
3940
std::vector<size_t> & missing_column_indices,
4041
const google::protobuf::Descriptor & message_descriptor,
4142
bool with_length_delimiter,
43+
bool with_envelope,
4244
ProtobufReader & reader);
4345

4446
static std::unique_ptr<ProtobufSerializer> create(
4547
const Strings & column_names,
4648
const DataTypes & data_types,
4749
const google::protobuf::Descriptor & message_descriptor,
4850
bool with_length_delimiter,
51+
bool with_envelope,
4952
ProtobufWriter & writer);
5053
};
5154

0 commit comments

Comments
 (0)