Tableflow Setup
This page describes how to setup WarpStream Tableflow.
Introduction
Tableflow automates the tedious process of transforming a topic in an Apache Kafka-compatible data streaming system into an Apache Iceberg table. Instead of writing custom code and manually configuring a data pipeline for each table you want to build, Tableflow allows you to declaratively specify which topics to build tables from and what schema and data format to expect. When schemas inevitably need to change, you can update the schema in Tableflow's editor and WarpStream will handle the schema migration automatically.
Compaction and table maintenance is included out-of-the-box with no tuning required. Tableflow continuously compacts the table in the background with intelligent heuristics to ensure readers get the best performance.
Tableflow is available as Bring-Your-Own-Cloud (BYOC) where the compute and storage live inside your cloud account inside your VPC. The raw data for your table is only ever stored inside your object storage bucket and never leaves your VPC during the table ingestion and maintenance process. Tableflow maintains a metadata store inside WarpStream Cloud as the Iceberg metadata layer that is periodically synced into your object storage bucket.
Getting Started
To get started with Tableflow, you first need to create a Tableflow cluster from the WarpStream Console, or using one of infrastructure-as-code deployment options. The WarpStream Agents that join this cluster will only perform Tableflow operations and do not expose the Apache Kafka protocol. Please refer to our other documentation for how to install and configure the WarpStream Agents in your environment as the process does not differ for Tableflow.
As part of deploying the Agents, you'll also need to setup and configure an object storage bucket and/or provide the Agents with access to one of your existing buckets. See our object storage configuration documentation for more details on that.
Once the Agents are running, you can open the Configuration table and start defining your source clusters, topics, tables, and schemas. Currently the only method for defining schemas is inline mode, which doesn't require using an external Schema Registry. Schema Registry support will be available in a future release.
Managed Tables
Tableflow tables are fully managed by WarpStream, or what we call "managed tables". You cannot use another system for performing writes, compactions, or other table maintenance operations. This is in contrast to a connector-based approach where you would be forced to combine multiple distinct systems or operations together to implement all of these functions.
Configuration
Tableflow is configured with and is fully controllable from a single YAML file which can be edited through the WarpStream console or the Pipelines API.
Overview
The YAML specifies
The source clusters Tableflow should connect to.
For each cluster, the topic that Tableflow should create Iceberg tables from.
For each topic, the schema to deserialize the Kafka records with.
The destination bucket to store the table.
Configure Source Clusters
Source clusters are the Apache Kafka-compatible systems like WarpStream that store the topics you'd like to convert to tables. You define clusters by giving them a name, a list of brokers, and credentials if they are needed. You define source clusters at the root of the configuration YAML.
You can define multiple source clusters so a single Tableflow cluster can centralize data from multiple clusters into one unified place.
Configure Connection and Credentials
If credentials are needed to connect to the Kafka cluster, the connection information can be provided under the credentials block for each cluster.
Note that if the source Kafka cluster is a WarpStream cluster, credentials still need to be provided if authentication is required. This is different from the Managed Data Pipelines setup where credentials are injected automatically.
TLS
use_tlsspecifies whether the Agents should use TLS when connecting to your source clusters.tls_insecure_skip_verifyspecifies whether a client verifies the server's certificate chain and host name.
SASL
Both the
sasl_username_envand thesasl_password_envfields refer to environment variable names. The Agents will append aTABLEFLOW_prefix to the values of the fields before using the environment variables, so the environment variables in the Agent should be configured asTABLEFLOW_SASL_USERNAME_ENV_VARandTABLEFLOW_SASL_PASSWORD_ENV_VARrespectively.The default value of
sasl_mechanismisplain. Supported mechanisms include:plain,scram-256, andscram-512.
mTLS PEM encoded certs
The
mtls_client_cert_env,mtls_client_key_env, andmtls_server_ca_cert_envfields refer to environment variable names. The Agents will append aTABLEFLOW_prefix to the values of the fields before using the environment variables, so the environment variables in the Agent should be configured asMTLS_CERT_PATH_ENV_VAR,MTLS_KEY_PATH_ENV_VAR, andMTLS_SERVER_CA_CERT_PATH_ENV_VARrespectively.mtls_client_cert_envspecifies the environment variable that contains the path to the X.509 certificate file in PEM format.mtls_client_key_envspecifies the environment variable that contains the path to the X.509 private key file in PEM format.mtls_server_ca_cert_envis optional and specifies the environment variable that contains the path to the X.509 certificate file in PEM format for the client certificate authority. "If not specified, the host's root certificate pool will be used for client certificate verification.
Configure the Destination Bucket URL
To specify where your table data should be stored, use the destination_bucket_url field at the root of the configuration YAML. This configures the default destination bucket URL for all tables.
Alternatively, you can specify per-table bucket URL overrides within each tables configuration:
The destination bucket URL for a table can only be changed if a table is completely empty. This means that once your table has started ingesting data successfully, the destination bucket cannot be changed.
Check our object storage configuration documentation for more details on how to configure this URL for various different cloud providers, as well as for a complete list of permissions that the Agents will require.
Note that Tables will be created under the <bucket-name>/warpstream/_tableflow path. Optionally, a prefix can be specified in the bucket URL, which will result in Tables being created under the <bucket-name>/prefix/warpstream/_tableflow path.
Configure Topics and Tables
The next step is defining your tables and topics. Each table you define has exactly one source topic, and the table will be named the same as <cluster-name>+<topic+name>.
Tableflow currently supports append-only tables. If you ingest data from a compacted topic in the source cluster, rows will not be deduplicated and any tombstones may not comply with the schema. Support for compacted topics is coming soon.
Schema Definitions
When using inline schemas, provide the schema in the same format as your input records.
schema_mode controls where schema definitions come from:
inline: schema is declared directly in the Tableflow config
The guidance in this section is about how to declare schemas when using schema_mode: inline.
If your records are JSON, provide a JSON Schema.
If your records are Avro, provide an Avro schema.
If your records are Protobuf, provide a
.protoschema.
Recommended Pattern: input_schema-first (agents v771+)
input_schema-first (agents v771+)Use input_schema as the default way to declare schemas.
If you are not using transforms, define only input_schema.
This is now the preferred pattern because it keeps schema declaration aligned with producer payload format and avoids manually maintaining Iceberg field IDs for simple pipelines.
Example: Json Input Schema
Supported types are:
boolean, int, long, float, double, decimal, date, time, timestamp, timestamptz, string, uuid, fixed, and binary.
Note that map keys can only be string
Example: Avro Input Schema
Supported types are:
boolean, int, long, float, double, decimal, date, time, timestamp, timestamptz, string, uuid, fixed, and binary.
Note that map keys can only be string.
Example: Protobuf Input Schema
When source_format is protobuf, set wire_format based on the payload encoding:
raw: protobuf binary payload with no prefixconfluent: Confluent wire format (magic byte + schema ID prefix)
The list of supported types are:
boolean, int32, sint32, uint32, fixed32, sfixed32, int64, sint64, uint64, fixed64, sfixed64, float, double, string, and bytes .
Enum values are stored by name in Iceberg but identified by number on the wire. This has implications for schema evolution:
adding new enum values is safe
renaming enum values is forbidden in WarpStream's TableFlow because renaming would cause inconsistent data (old records would have old names, new records would have new names)
removing old enum values is safe. But note that if we decode a record whose number is not in the current schema, it will be stored in Iceberg as the number in string form (e.g.
"99")
Note: WarpStream's TableFlow also validates that sibling enum fields have identical sets if they share any value name. This prevents accidental inconsistencies between different fields using the same enum type.
The Protobuf types are converted as follows to the Iceberg types:
boolean
boolean
int32
integer
sint32
integer
sfixed32
integer
uint32
decimal(10,0)
Stored as decimal(10,0) to prevent overflow (max uint32 > max int32)
fixed32
decimal(10,0)
Stored as decimal(10,0) to prevent overflow (max uint32 > max int32)
int64
long
sint64
long
sfixed64
long
uint64
decimal(20,0)
Stored as decimal(20,0) to prevent overflow (max uint64 > max int64)
fixed64
decimal(20,0)
Stored as decimal(20,0) to prevent overflow (max uint64 > max int64)
float
float
double
double
enum
string
Stored as the enum value name
string
string
bytes
binary
message
struct
map
map
repeated
list
oneof
struct
Converted to a struct where each option is an optional field
google.protobuf.Timestamp
timestamptz
Nanosecond precision is truncated to microseconds (Iceberg limitation)
Note: Iceberg does not have unsigned integer types. To prevent overflow when storing large unsigned values:
uint32andfixed32are stored asdecimal(10,0)(the maxuint32number4,294,967,295has 10 digits)uint64andfixed64are stored asdecimal(20,0)(the maxuint64number18,446,744,073,709,551,615has 20 digits)
JSON Schema Features Currently Not Supported
When using JSON input_schema, Tableflow currently rejects JSON Schema documents that use the following keywords:
prefixItemscontainspatternPropertiesdependentRequireddependentSchemasifthenelse$defs$refallOfoneOfanyOf
If any of these are present, schema conversion fails with an unsupported-feature validation error.
When to Declare schema (Output Schema)
schema (Output Schema)Define schema only when you use transforms and the post-transform output shape differs from the input shape.
input_schemadescribes how to decode source records (pre-transform).schemadescribes the Iceberg table shape (post-transform).When declared as raw schemas (string blocks), both
input_schemaandschemamust use the same format assource_format.
Deprecated schema definitions
This section contains documentation for the old way of declaring schemas definitions in tableflow. This was cumbersome because it required you to manually specify field-ids and used a schema definition that didn't necessarily map 1 to 1 with the type used to store your data. We still support it for backward compatibility but you shouldn't use it.
As shown in the above example, schemas specified with the inline mode contain a list of fields. Each field is named and has a unique integer id that will be mapped to the field ID for your Iceberg table as well as a type that will be used as the Iceberg date type for the corresponding column.
Primitive Types
The syntax for defining a primitive field looks like the following:
where field-type is one of the supported fields for your input record type (refer to the Protobuf / Avro / Json sections above).
Nested Types
For Avro and JSON only
A struct is specified as a tuple of typed values. Each field in the tuple is named and has an integer id that is unique in the table schema. Fields can be of any type.
For Avro, JSON and Protobuf
A map is specified as a key/value pair. Both the key field and value field have an integer id that is unique in the table schema. While map values can be either optional or required, map keys are required. For both Avro and JSON schemas, map keys can only be of type string, but values can be of any type. For Protobuf, map keys can be of any map key type allowed by the Protobuf specs, namely: string, int32, int64, uint32, uint64, sint32, sint64, sfixed32, sfixed64, fixed32, fixed64, boolean.
Note that for Protobuf, the proto_field_number must be added to the map field but should not be set for the key nor value. So a full example for a Protobuf map is:
A list (in JSON and Avro) or repeated (in Protobuf) is specified with a single element field. The element field is named element and has an integer id that is unique in the table schema. Elements can be either optional* or required and can be of any type.
Note that for Protobuf, the proto_field_number must be added to the repeated field but should not be set for the element. So a full example for a Protobuf repeated is:
*Support for optional list elements will be coming soon.
For Avro only
AVRO's binary encoding does not include field names nor type information and instead values are concatenated strictly based on the schema. Consequently, the decoder interprets the stream of bytes strictly according to the sequence defined in the schema.
You must list fields in your table schema in the same order as in the producer’s AVRO schema. Reordering fields (e.g. by id or by logical group) can cause decode failures such as avro: ReadBool: invalid bool or unexpected EOF. If you see these errors, compare your schema field order to the producer’s (e.g. the .avsc file or the schema in your schema registry) and align the order.
For Protobuf only
A message is defined using type: message. A proto_field_number must be provided for every nested field.
An enum is stored as a string in the Iceberg table using the enum value name. You must define all enum values with their corresponding numbers.
Enum values are stored by name in Iceberg but identified by number on the wire. This has implications for schema evolution:
adding new enum values is safe
renaming enum values is forbidden in WarpStream's TableFlow because renaming would cause inconsistent data (old records would have old names, new records would have new names)
removing old enum values is safe. But note that if we decode a record whose number is not in the current schema, it will be stored in Iceberg as the number in string form (e.g.
"99")
Note: WarpStream's TableFlow also validates that sibling enum fields have identical sets if they share any value name. This prevents accidental inconsistencies between different fields using the same enum type.
A oneof field is defined with type: oneof. Each option must be explicitly set to optional and with a proto_field_number, but the oneof field itself must be defined as required and without any proto_field_number.
Partitioning
Tableflow supports unpartitioned, timestamp partitioned tables using the record timestamp, and custom partitioning. Tables are unpartitioned by default and migrating between partitioning schemes is currently not supported.
The partitioning scheme is specified on a per-table basis. For convenience the partitioning_scheme option can be used to define unpartitioned tables and timestamp partitioned tables using the record timestamp. Supported values for partitioning_scheme include unpartitioned, hour, day, month, and year.
If a custom partitioning scheme is needed, then the custom_partitioning option can be used as follows:
source_field_id
The Input Column. The ID of the field in your schema (or a default field) that you want to partition on.
name
The Partition Alias. This is the name given to the partition itself. It does not have to match the source field name. It must start with a letter and contain only alphanumerics or underscores.
transform
An object defining the transform to be applied to the source column to produce a partition value.
The transform object requires a name and, depending on the type, additional parameters:
Time-based:
year,month,day,hour.bucket: Requires annfield (e.g.,{ name: "bucket", n: 16 }) to specify the number of buckets.truncate: Requires awfield (e.g.,{ name: "truncate", w: 10 }) to specify the width of the truncation.identity: Uses the source value as-is.
Note that to use custom partitioning the Agent version needs to be at least v748.
For example, to create hourly partitions on the Kafka timestamp and bucket the Kafka partitions into fours bins the configuration would look like this:
Transforms (Agent v730+)
Tableflow supports applying stateless transformations to ingested records. This can be helpful to massage the data into the desired shape before inserting it into the table without having to reprocess the data into a completely different topic first.
For example, imagine the topic contains a CDC change stream from debezium that looks like the following:
Without a transform, the table schema would have to be defined as follows:
This is unfortunate because users querying the data would always have to write their queries in the form: SELECT payload.after.field1 instead of simply SELECT field1.
Transforms solve this problem by rewriting the structure of the record before applying the table schema. For example, we can rewrite the Tableflow configuration above as follows:
Note that while the example above works for a JSON workload, it would not work for an Avro or Protobuf workload. The reason for this is that JSON is self-describing, while Avro and Protobuf records can only be deserialized if the schema is known ahead of time.
However, when transformations are used, records may only match the schema defined in the schema field after the transformation is applied. As a result, Avro and Protobuf workloads may require a separate input_schema which defines how to deserialize the input records pre-transformation.
For example, if the previous use-case were Avro encoded our Tableflow configuration would look like this:
Note that field IDs are not required for input_schema. Also, the optional property is not enforced by input_schema, all fields are treated as optional, so whether or not a field is optional or required is still ultimately determined by the value of schema even when input_schema and transforms are being used.
In summary, pre-transformation processed records must match input_schema and post-transformation must match schema . If input_schema is not defined, then it defaults to the same value as schema. input_schema is never required when processing JSON payloads, but may be required when processing Avro and Protobuf payloads.
Separately, keep in mind that transforms can be chained:
Tableflow transformations are executed by running arbitrary Bento Bloblang programs, but are limited to "pure" Bloblang functions that have no external side-effects.
Bloblang is a rich turing-complete programming language with many features, functions, methods, conditionals, and even error-handling. You can read more about Bloblang and its capabilities in the Bento Bloblang documentation, but but the basics are quite straightforward and can be grasped with a few examples.
The key thing to understand about Bloblang transformations is that they're mapping functions that mutate the input record into the desired shape. Within the context of a Tableflow Bloblang mapping, the this keyword refers to the input record and the root keyword refers to the output record. See the examples below to learn how to perform the most common transformations.
The Bento website has a powerful and interactive Bloblang playground that can be used to experiment with Bloblang mapping programs.
Rename a field
Delete a field
Add a field
Drop / filter out an entire record
Type Conversions
Data Retention and TTL
By default, data is retained in the table indefinitely. Optionally, a retention period can specified using the retention_ttl field. Retention must be expressed in units of hour (h).
Dead Letter Queue (DLQ) Mode
Requires Agent v737 or higher.
By default, Tableflow stops ingestion when it sees records that are incompatible with the provided schema to avoid head of line blocking. This behavior can be overridden using the dlq_mode field. Supported values include:
stop, which blocks ingestion upon encountering an invalid record (this is the default)skip, which skips invalid records during ingestion
Compression codecs
Requires Agent v748 or higher.
Tableflow supports a few compression codecs for the stored data files. The default one is snappy .
This codec can be overridden using the compression field. Supported values include:
snappygziplz4zstdbrotlinoneTo disable compression
Skipping raw record values
Requires Agent v749 or higher.
If you don't need to access the raw record values (the ones coming from the kafka topics) you can set the skip_raw_record_values to true in your config. This will result in smaller data files.
Handling topic re-creation
To define the table ingestion behavior when the source topic is recreated, use the topic_recreation_policy setting.
Currently, the only supported policy is recreate_table. This ensures data integrity by creating a new table (with a different identifier) whenever the system detects that the source topic has been re-created.
Pausing a Table
To temporarily stop ingestion for a specific table without removing it from your configuration, set paused to true:
To resume, set paused: false (or remove the field) and deploy the updated configuration. Ingestion will pick up from where it left off.
Table Schema
Tableflow creates an Iceberg table with a struct schema, containing all the fields from the configured schema as well as the following default fields:
warpstream
10000000
struct
warpstream.partition
10000001
int
warpstream.offset
10000002
long
warpstream.key
10000003
binary
warpstream.value
10000004
binary
warpstream.timestamp
10000005
timestamp
The warpstream.value field can be ommited with the skip_raw_record_values option.
Schema Migrations
Tableflow follows the Apache Iceberg schema evolution rules. Schema migrations are supported for adding columns, changing fields from required to optional, and widening integer and floating point types. To perform any of these operations, update the schema in the Configuration editor and deploy it. In the next few syncs of the table metadata into your bucket, the schema change will be reflected.
Ensure that you execute a schema change before attempting to send data with the new schema. If you fail to do this, you will potentially lose the data written with the newer schema.
Breaking Changes
Any schema change that is not one of the compatible operations listed above is considered a breaking change and requires Table Recreation. Specifically, the following changes are breaking:
Make an optional field required — Existing data may contain nulls for that field, so the constraint cannot be applied retroactively.
Change a type in a non-widening way — For example,
long→int,double→float, orstring→int. This includes any data type change that is not a supported numeric widening.Drop a column — Removing a column from the schema is not supported as an in-place migration.
Reorder columns — Changing the order of columns is not supported as an in-place migration.
Rename a column — Renaming a column is not currently supported as an in-place migration.
To apply breaking changes, use the recreation_key mechanism described in Table Recreation below.
Table Recreation
Certain operations require an existing table to be completely deleted and recreated. While this is most commonly necessary to apply breaking schema changes (such as converting an optional field to required, performing non-widening type conversions, or dropping columns), a full rebuild may also be required for other operational reasons. The recreation_key parameter is designed to automate this workflow.
To utilize this feature, assign an initial string value to the recreation_key in your table configuration. Any change to this value acts as a direct trigger for table recreation. When you need to rebuild a table, simply update the recreation_key to a new string and deploy the new configuration. Tableflow will detect the change, automatically drop the existing table, and provision a new one with the updated config.
Note that adding a recreation key to a table that does not already specify this option will trigger a recreation.
The following example demonstrates how to use the recreation_key to apply backward-incompatible schema changes:
To make the service field required and add a required severity field, bump the value (e.g. "v1" to "v2") alongside your schema change:
After recreation, Tableflow will re-ingest data from the earliest available offset in the source topic (subject to the topic's retention policy) into the new table.
Changing recreation_key will hard-delete the existing table's metadata. Data already written to the object store is not removed. The new table will only contain data that is still available in the source topic based on its retention settings.
Table Deletion
Tableflow does not delete any tables from the object storage bucket when they are removed from the Configuration in order to prevent accidental data deletion. To delete a table, first delete it from the Configuration and then use your cloud provider's UI or CLI to delete the directory containing your table from within the warpstream/_tableflow directory. To programmatically delete and recreate a table (e.g. for incompatible schema changes), see Table Recreation.
Object Storage Path Layout
Tableflow writes all table data and metadata into your object storage bucket under a predictable directory structure. Understanding this layout is useful for debugging, removing an entire table directory, or integrating with query engines that need direct file paths.
Given a bucket URL of s3://my-bucket (or s3://my-bucket?prefix=my-prefix), the structure is:
warpstream/_tableflow/
Root directory for all Tableflow tables.
<table_name>-<table_uuid>/
One directory per table.
data/
Parquet data files.
metadata/
Iceberg metadata: JSON metadata files (v1.metadata.json, v2.metadata.json, ...), manifest files (mani-*.avro), manifest lists (snap-*.avro), and a version-hint.text that points to the latest metadata version.
When a table is deleted from the configuration or via the API, Tableflow only removes the control plane metadata. The data and metadata files in object storage are not deleted. You must clean them up manually using your cloud provider's UI or CLI.
Terraform / Infrastructure as Code
Tableflow Agents are deployed using the standard WarpStream Agent chart, and there is full support for Tableflow clusters in the WarpStream Terraform provider.
Click here for a complete example of creating a Tableflow cluster and configuring it to ingest a single topic into an Iceberg table.
Observability
Ingestion lag for tables is emitted as a metric from the WarpStream Tableflow Agents. This metric is also available visually within the WarpStream Console. This metric is called warpstream_tableflow_partition_time_lag_seconds. Offset lag will be available at a later date.
This metric is tagged by table.id, table.name, topic, and partition. partition-level tagging can be disabled with the WARPSTREAM_DISABLE_CONSUMER_GROUP_METRICS_TAGS=partition environment variable.

Agent Version Requirements
Different Tableflow features require different minimum Agent versions. The table below summarizes the requirements:
Tableflow (core ingestion, metadata sync, compaction)
v710
AWS Glue integration
v710
Transforms (Bento/Bloblang)
v730
Dead Letter Queue (dlq_mode: stop / skip)
v737
BigQuery integration
v737
Custom partitioning
v748
Compression codecs
v748
skip_raw_record_values
v749
google.protobuf.Timestamp as partitioning field
v764
BigLake Metastore integration
v769
Hive Metadata Store integration
v769
input_schema without manual Iceberg field IDs
v771
Always check the Change Log for the latest feature additions and bug fixes. When in doubt, use the latest stable Agent version.
Tableflow UI
The Tableflow UI available in the WarpStream console allows editing the Configuration.

Last updated
Was this helpful?