Tableflow Setup

This page describes how to set up WarpStream Tableflow.

Introduction

Tableflow automates the tedious process of transforming a topic in an Apache Kafka-compatible data streaming system into an Apache Iceberg table. Instead of writing custom code and manually configuring a data pipeline for each table you want to build, Tableflow allows you to declaratively specify which topics to build tables from and what schema and data format to expect. When schemas inevitably need to change, you can update the schema in Tableflow's editor and WarpStream will handle the schema migration automatically.

Compaction and table maintenance are included out-of-the-box with no tuning required. Tableflow continuously compacts the table in the background with intelligent heuristics to ensure readers get the best performance.

Tableflow is available as Bring-Your-Own-Cloud (BYOC) where the compute and storage live inside your cloud account inside your VPC. The raw data for your table is only ever stored inside your object storage bucket and never leaves your VPC during the table ingestion and maintenance process. Tableflow maintains a metadata store inside WarpStream Cloud as the Iceberg metadata layer that is periodically synced into your object storage bucket.

Getting Started

To get started with Tableflow, you first need to create a Tableflow cluster from the WarpStream Console, or using one of the infrastructure-as-code deployment options. The WarpStream Agents that join this cluster will only perform Tableflow operations and do not expose the Apache Kafka protocol. Please refer to our other documentation for how to install and configure the WarpStream Agents in your environment as the process does not differ for Tableflow.

As part of deploying the Agents, you'll also need to set up and configure an object storage bucket and/or provide the Agents with access to one of your existing buckets. See our object storage configuration documentation for more details on that.

Once the Agents are running, you can open the Configuration tab and start defining your source clusters, topics, tables, and schemas. Currently, the only method for defining schemas is inline mode, which doesn't require using an external Schema Registry. Schema Registry support will be available in a future release.

Managed Tables

Tableflow tables are fully managed by WarpStream, or what we call "managed tables". You cannot use another system for performing writes, compactions, or other table maintenance operations. This is in contrast to a connector-based approach where you would be forced to combine multiple distinct systems or operations together to implement all of these functions.

Configuration

Tableflow is configured with, and is fully controllable from, a single YAML file that can be edited through the WarpStream Console or the Pipelines API.

Overview

The YAML specifies:

  • The source clusters Tableflow should connect to.

  • For each cluster, the topics that Tableflow should create Iceberg tables from.

  • For each topic, the schema to deserialize the Kafka records with.

  • The destination bucket to store the table.

Configure Source Clusters

Source clusters are the Apache Kafka-compatible systems like WarpStream that store the topics you'd like to convert to tables. You define clusters by giving them a name, a list of brokers, and credentials if they are needed. You define source clusters at the root of the configuration YAML.

You can define multiple source clusters so a single Tableflow cluster can centralize data from multiple clusters into one unified place.
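A minimal sketch of what source cluster definitions might look like (the exact key names, such as bootstrap_brokers, are illustrative assumptions; refer to the configuration reference for the authoritative schema):

```yaml
source_clusters:
  # Each cluster gets a name and a list of brokers.
  - name: "prod_kafka"
    bootstrap_brokers:
      - hostname: "kafka-1.example.com"
        port: 9092
  # Multiple source clusters can feed a single Tableflow cluster.
  - name: "analytics_kafka"
    bootstrap_brokers:
      - hostname: "kafka-analytics.example.com"
        port: 9092
```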

Configure Connection and Credentials

If credentials are needed to connect to the Kafka cluster, the connection information can be provided under the credentials block for each cluster.

Note that if the source Kafka cluster is a WarpStream cluster, credentials still need to be provided if authentication is required. This is different from the Managed Data Pipelines setup where credentials are injected automatically.

TLS

  • use_tls specifies whether the Agents should use TLS when connecting to your source clusters.

  • tls_insecure_skip_verify specifies whether a client verifies the server's certificate chain and host name.

SASL

  • Both the sasl_username_env and the sasl_password_env fields refer to environment variable names. The Agents prepend a TABLEFLOW_ prefix to the values of these fields before reading the environment variables, so if the fields are set to SASL_USERNAME_ENV_VAR and SASL_PASSWORD_ENV_VAR, the Agent's environment variables should be named TABLEFLOW_SASL_USERNAME_ENV_VAR and TABLEFLOW_SASL_PASSWORD_ENV_VAR respectively.

  • The default value of sasl_mechanism is plain. Supported mechanisms include: plain, scram-256, and scram-512.
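Putting the TLS and SASL options together, a hedged sketch of a credentials block (exact key placement may differ; the environment variable names are illustrative):

```yaml
source_clusters:
  - name: "prod_kafka"
    credentials:
      use_tls: true
      tls_insecure_skip_verify: false
      sasl_mechanism: "scram-512"           # default is plain
      sasl_username_env: "SASL_USERNAME_ENV_VAR"  # Agent reads TABLEFLOW_SASL_USERNAME_ENV_VAR
      sasl_password_env: "SASL_PASSWORD_ENV_VAR"  # Agent reads TABLEFLOW_SASL_PASSWORD_ENV_VAR
```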

mTLS PEM encoded certs

  • The mtls_client_cert_env, mtls_client_key_env, and mtls_server_ca_cert_env fields refer to environment variable names. The Agents prepend a TABLEFLOW_ prefix to the values of these fields before reading the environment variables, so if the fields are set to MTLS_CERT_PATH_ENV_VAR, MTLS_KEY_PATH_ENV_VAR, and MTLS_SERVER_CA_CERT_PATH_ENV_VAR, the Agent's environment variables should be named TABLEFLOW_MTLS_CERT_PATH_ENV_VAR, TABLEFLOW_MTLS_KEY_PATH_ENV_VAR, and TABLEFLOW_MTLS_SERVER_CA_CERT_PATH_ENV_VAR respectively.

  • mtls_client_cert_env specifies the environment variable that contains the path to the X.509 certificate file in PEM format.

  • mtls_client_key_env specifies the environment variable that contains the path to the X.509 private key file in PEM format.

  • mtls_server_ca_cert_env is optional and specifies the environment variable that contains the path to the X.509 certificate file in PEM format for the client certificate authority. If not specified, the host's root certificate pool will be used for client certificate verification.
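A hedged sketch of an mTLS credentials block (the environment variable names are illustrative examples, not required values):

```yaml
source_clusters:
  - name: "prod_kafka"
    credentials:
      use_tls: true
      mtls_client_cert_env: "MTLS_CERT_PATH_ENV_VAR"   # Agent reads TABLEFLOW_MTLS_CERT_PATH_ENV_VAR
      mtls_client_key_env: "MTLS_KEY_PATH_ENV_VAR"     # Agent reads TABLEFLOW_MTLS_KEY_PATH_ENV_VAR
      mtls_server_ca_cert_env: "MTLS_SERVER_CA_CERT_PATH_ENV_VAR"  # optional
```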

Configure the Destination Bucket URL

To specify where your table data should be stored, use the destination_bucket_url field at the root of the configuration YAML. This configures the default destination bucket URL for all tables.

Alternatively, you can specify per-table bucket URL overrides within each table's configuration:
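A sketch of a default bucket with a per-table override (the per-table key name is an assumption based on the root-level field):

```yaml
# Default destination for all tables.
destination_bucket_url: "s3://my-default-bucket"

tables:
  - source_topic: "events"
    # Per-table override: this table's data lands in a different bucket.
    destination_bucket_url: "s3://my-other-bucket"
```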

Check our object storage configuration documentation for more details on how to configure this URL for various different cloud providers, as well as for a complete list of permissions that the Agents will require.

Note that tables will be created under the <bucket-name>/warpstream/_tableflow path. Optionally, a prefix can be specified in the bucket URL, which will result in tables being created under the <bucket-name>/<prefix>/warpstream/_tableflow path.

Configure Topics and Tables

The next step is defining your tables and topics. Each table you define has exactly one source topic, and the table will be named <cluster-name>+<topic-name>.
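A minimal sketch of a topic/table definition nested under its source cluster (key names are illustrative assumptions):

```yaml
source_clusters:
  - name: "prod_kafka"
    topics:
      - name: "events"          # source topic; table becomes prod_kafka+events
        schema_mode: inline
        source_format: json
```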

Tableflow currently supports append-only tables. If you ingest data from a compacted topic in the source cluster, rows will not be deduplicated and any tombstones may not comply with the schema. Support for compacted topics is coming soon.

Schema Definitions

When using inline schemas, provide the schema in the same format as your input records.

schema_mode controls where schema definitions come from:

  • inline: schema is declared directly in the Tableflow config

The guidance in this section is about how to declare schemas when using schema_mode: inline.

  • If your records are JSON, provide a JSON Schema.

  • If your records are Avro, provide an Avro schema.

  • If your records are Protobuf, provide a .proto schema.

Use input_schema as the default way to declare schemas. If you are not using transforms, define only input_schema.

This is now the preferred pattern because it keeps schema declaration aligned with producer payload format and avoids manually maintaining Iceberg field IDs for simple pipelines.

Example: JSON Input Schema

Supported types are:

boolean, int, long, float, double, decimal, date, time, timestamp, timestamptz, string, uuid, fixed, and binary.

Note that map keys can only be string.
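A hedged sketch of a JSON input_schema (field names are illustrative; note that Tableflow's JSON schema dialect uses the type names listed above rather than standard JSON Schema number types):

```yaml
input_schema: |
  {
    "type": "object",
    "properties": {
      "id":         { "type": "string" },
      "count":      { "type": "long" },
      "created_at": { "type": "timestamptz" }
    },
    "required": ["id"]
  }
```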

Example: Avro Input Schema

Supported types are:

boolean, int, long, float, double, decimal, date, time, timestamp, timestamptz, string, uuid, fixed, and binary.

Note that map keys can only be string.
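A hedged sketch of an Avro input_schema using a standard Avro record definition (record and field names are illustrative):

```yaml
input_schema: |
  {
    "type": "record",
    "name": "Event",
    "fields": [
      { "name": "id",    "type": "string" },
      { "name": "count", "type": "long" },
      { "name": "tags",  "type": { "type": "map", "values": "string" } }
    ]
  }
```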

Example: Protobuf Input Schema

When source_format is protobuf, set wire_format based on the payload encoding:

  • raw: protobuf binary payload with no prefix

  • confluent: Confluent wire format (magic byte + schema ID prefix)

The supported types are:

boolean, int32, sint32, uint32, fixed32, sfixed32, int64, sint64, uint64, fixed64, sfixed64, float, double, string, and bytes.
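A hedged sketch of a Protobuf input_schema with source_format: protobuf (message and field names are illustrative):

```yaml
input_schema: |
  syntax = "proto3";

  import "google/protobuf/timestamp.proto";

  message Event {
    string id = 1;
    int64 count = 2;
    google.protobuf.Timestamp created_at = 3;
  }
```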

The Protobuf types are converted to the Iceberg types as follows:

  • boolean → boolean
  • int32 → integer
  • sint32 → integer
  • sfixed32 → integer
  • uint32 → decimal(10,0) (stored as decimal(10,0) to prevent overflow; max uint32 > max int32)
  • fixed32 → decimal(10,0) (stored as decimal(10,0) to prevent overflow; max uint32 > max int32)
  • int64 → long
  • sint64 → long
  • sfixed64 → long
  • uint64 → decimal(20,0) (stored as decimal(20,0) to prevent overflow; max uint64 > max int64)
  • fixed64 → decimal(20,0) (stored as decimal(20,0) to prevent overflow; max uint64 > max int64)
  • float → float
  • double → double
  • enum → string (stored as the enum value name)
  • string → string
  • bytes → binary
  • message → struct
  • map → map
  • repeated → list
  • oneof → struct (converted to a struct where each option is an optional field)
  • google.protobuf.Timestamp → timestamptz (nanosecond precision is truncated to microseconds, an Iceberg limitation)

Note: Iceberg does not have unsigned integer types. To prevent overflow when storing large unsigned values:

  • uint32 and fixed32 are stored as decimal(10,0) (the max uint32 number 4,294,967,295 has 10 digits)

  • uint64 and fixed64 are stored as decimal(20,0) (the max uint64 number 18,446,744,073,709,551,615 has 20 digits)

JSON Schema Features Currently Not Supported

When using JSON input_schema, Tableflow currently rejects JSON Schema documents that use the following keywords:

  • prefixItems

  • contains

  • patternProperties

  • dependentRequired

  • dependentSchemas

  • if

  • then

  • else

  • $defs

  • $ref

  • allOf

  • oneOf

  • anyOf

If any of these are present, schema conversion fails with an unsupported-feature validation error.

When to Declare schema (Output Schema)

Define schema only when you use transforms and the post-transform output shape differs from the input shape.

  • input_schema describes how to decode source records (pre-transform).

  • schema describes the Iceberg table shape (post-transform).

  • When declared as raw schemas (string blocks), both input_schema and schema must use the same format as source_format.

Deprecated Schema Definitions

This section documents the old way of declaring schema definitions in Tableflow. It was cumbersome because it required manually specifying field IDs and used a schema definition that didn't necessarily map one-to-one with the type used to store your data. It is still supported for backward compatibility, but you shouldn't use it.

As shown in the example above, schemas specified with the inline mode contain a list of fields. Each field is named and has a unique integer id that will be mapped to the field ID for your Iceberg table, as well as a type that will be used as the Iceberg data type for the corresponding column.

Primitive Types

The syntax for defining a primitive field looks like the following:
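A hedged sketch of the deprecated primitive field syntax (key names reconstructed from the description above; treat them as illustrative):

```yaml
fields:
  - name: "my_field"     # field name
    id: 1                # unique integer ID, mapped to the Iceberg field ID
    type: field-type     # see the supported types per input format
    optional: true
```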

where field-type is one of the supported types for your input record format (refer to the Protobuf / Avro / JSON sections above).

Nested Types

For Avro and JSON only

A struct is specified as a tuple of typed values. Each field in the tuple is named and has an integer id that is unique in the table schema. Fields can be of any type.

For Avro, JSON and Protobuf

A map is specified as a key/value pair. Both the key field and value field have an integer id that is unique in the table schema. While map values can be either optional or required, map keys are required. For both Avro and JSON schemas, map keys can only be of type string, but values can be of any type. For Protobuf, map keys can be of any map key type allowed by the Protobuf specs, namely: string, int32, int64, uint32, uint64, sint32, sint64, sfixed32, sfixed64, fixed32, fixed64, boolean.

Note that for Protobuf, the proto_field_number must be added to the map field but should not be set for the key nor value. So a full example for a Protobuf map is:
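A hedged sketch of a Protobuf map in the deprecated syntax (key/value nesting is reconstructed from the description; field names are illustrative):

```yaml
fields:
  - name: "labels"
    id: 4
    type: map
    proto_field_number: 7   # set on the map field itself
    key:
      id: 5
      type: string          # no proto_field_number on the key
    value:
      id: 6
      type: string          # no proto_field_number on the value
      optional: true
```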

A list (in JSON and Avro) or repeated (in Protobuf) is specified with a single element field. The element field is named element and has an integer id that is unique in the table schema. Elements can be either optional* or required and can be of any type.

Note that for Protobuf, the proto_field_number must be added to the repeated field but should not be set for the element. So a full example for a Protobuf repeated is:
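A hedged sketch of a Protobuf repeated field in the deprecated syntax (structure reconstructed from the description; field names are illustrative):

```yaml
fields:
  - name: "tags"
    id: 7
    type: list
    proto_field_number: 9   # set on the repeated field itself
    element:
      id: 8
      type: string          # no proto_field_number on the element
```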

*Support for optional list elements will be coming soon.

For Avro only

Avro's binary encoding does not include field names or type information; instead, values are concatenated strictly based on the schema. Consequently, the decoder interprets the stream of bytes strictly according to the sequence defined in the schema.

You must list fields in your table schema in the same order as in the producer's Avro schema. Reordering fields (e.g. by id or by logical group) can cause decode failures such as avro: ReadBool: invalid bool or unexpected EOF. If you see these errors, compare your schema field order to the producer's (e.g. the .avsc file or the schema in your schema registry) and align the order.

For Protobuf only

A message is defined using type: message. A proto_field_number must be provided for every nested field.

An enum is stored as a string in the Iceberg table using the enum value name. You must define all enum values with their corresponding numbers.

A oneof field is defined with type: oneof. Each option must be explicitly set to optional and with a proto_field_number, but the oneof field itself must be defined as required and without any proto_field_number.

Partitioning

Tableflow supports unpartitioned, timestamp partitioned tables using the record timestamp, and custom partitioning. Tables are unpartitioned by default and migrating between partitioning schemes is currently not supported.

The partitioning scheme is specified on a per-table basis. For convenience the partitioning_scheme option can be used to define unpartitioned tables and timestamp partitioned tables using the record timestamp. Supported values for partitioning_scheme include unpartitioned, hour, day, month, and year.

If a custom partitioning scheme is needed, then the custom_partitioning option can be used as follows:
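A hedged sketch of a custom_partitioning block (field names follow the table below; the source field ID is illustrative):

```yaml
custom_partitioning:
  - source_field_id: 3          # ID of the schema field to partition on
    name: "event_day"           # partition alias
    transform: { name: "day" }  # daily partitions
```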

  • source_field_id: The Input Column. The ID of the field in your schema (or a default field) that you want to partition on.

  • name: The Partition Alias. This is the name given to the partition itself. It does not have to match the source field name. It must start with a letter and contain only alphanumerics or underscores.

  • transform: An object defining the transform to be applied to the source column to produce a partition value.

The transform object requires a name and, depending on the type, additional parameters:

  • Time-based: year, month, day, hour.

  • bucket: Requires an n field (e.g., { name: "bucket", n: 16 }) to specify the number of buckets.

  • truncate: Requires a w field (e.g., { name: "truncate", w: 10 }) to specify the width of the truncation.

  • identity: Uses the source value as-is.

Note that to use custom partitioning the Agent version needs to be at least v748.

For example, to create hourly partitions on the Kafka timestamp and bucket the Kafka partitions into four bins, the configuration would look like this:
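A sketch of that configuration, using the default field IDs documented in the Table Schema section below (warpstream.timestamp is 10000005 and warpstream.partition is 10000001):

```yaml
custom_partitioning:
  - source_field_id: 10000005   # warpstream.timestamp (default Kafka record timestamp field)
    name: "ts_hour"
    transform: { name: "hour" }
  - source_field_id: 10000001   # warpstream.partition (default Kafka partition field)
    name: "partition_bucket"
    transform: { name: "bucket", n: 4 }
```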

Transforms (Agent v730+)

Tableflow supports applying stateless transformations to ingested records. This can be helpful to massage the data into the desired shape before inserting it into the table without having to reprocess the data into a completely different topic first.

For example, imagine the topic contains a CDC change stream from Debezium that looks like the following:
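A sketch of a typical Debezium change event envelope (field values are illustrative):

```yaml
# Example record value on the topic (JSON):
# {
#   "payload": {
#     "before": null,
#     "after":  { "field1": "value1", "field2": 42 },
#     "op":     "c",
#     "ts_ms":  1719000000000
#   }
# }
```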

Without a transform, the table schema would have to be defined as follows:

This is unfortunate because users querying the data would always have to write their queries in the form: SELECT payload.after.field1 instead of simply SELECT field1.

Transforms solve this problem by rewriting the structure of the record before applying the table schema. For example, we can rewrite the Tableflow configuration above as follows:
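A hedged sketch of such a configuration (the transforms/bloblang key names are assumptions; the mapping itself is standard Bloblang):

```yaml
tables:
  - source_topic: "cdc_users"
    source_format: json
    transforms:
      - bloblang: |
          # Hoist the Debezium post-image to the top level.
          root = this.payload.after
    schema: |
      {
        "type": "object",
        "properties": {
          "field1": { "type": "string" },
          "field2": { "type": "long" }
        }
      }
```

With this transform applied, queries can reference field1 directly instead of payload.after.field1.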

Note that while the example above works for a JSON workload, it would not work for an Avro or Protobuf workload. The reason for this is that JSON is self-describing, while Avro and Protobuf records can only be deserialized if the schema is known ahead of time.

However, when transformations are used, records may only match the schema defined in the schema field after the transformation is applied. As a result, Avro and Protobuf workloads may require a separate input_schema which defines how to deserialize the input records pre-transformation.

For example, if the previous use-case were Avro encoded our Tableflow configuration would look like this:
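A hedged sketch of that Avro configuration, where input_schema describes the pre-transform envelope and schema describes the flattened row (key names and the envelope shape are assumptions):

```yaml
tables:
  - source_topic: "cdc_users"
    source_format: avro
    # Pre-transform shape: the full change-event envelope.
    input_schema: |
      { "type": "record", "name": "Envelope", "fields": [
          { "name": "payload", "type": { "type": "record", "name": "Payload", "fields": [
              { "name": "after", "type": { "type": "record", "name": "After", "fields": [
                  { "name": "field1", "type": "string" } ] } } ] } } ] }
    transforms:
      - bloblang: |
          root = this.payload.after
    # Post-transform shape: just the flattened row.
    schema: |
      { "type": "record", "name": "Row", "fields": [
          { "name": "field1", "type": "string" } ] }
```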

Note that field IDs are not required for input_schema. Also, the optional property is not enforced in input_schema: all fields are treated as optional, so whether a field is optional or required is still ultimately determined by the value of schema, even when input_schema and transforms are being used.

In summary, pre-transformation records must match input_schema and post-transformation records must match schema. If input_schema is not defined, it defaults to the same value as schema. input_schema is never required when processing JSON payloads, but may be required when processing Avro and Protobuf payloads.

Separately, keep in mind that transforms can be chained:
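A sketch of chained transforms, applied in order (key names are assumptions):

```yaml
transforms:
  - bloblang: |
      # First: unwrap the envelope.
      root = this.payload.after
  - bloblang: |
      # Then: stamp each record with an ingestion time.
      root = this
      root.ingested_at = now()
```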

Tableflow transformations are executed by running arbitrary Bento Bloblang programs, but are limited to "pure" Bloblang functions that have no external side-effects.

Bloblang is a rich, Turing-complete programming language with many features, functions, methods, conditionals, and even error-handling. You can read more about Bloblang and its capabilities in the Bento Bloblang documentation, but the basics are quite straightforward and can be grasped with a few examples.

The key thing to understand about Bloblang transformations is that they're mapping functions that mutate the input record into the desired shape. Within the context of a Tableflow Bloblang mapping, the this keyword refers to the input record and the root keyword refers to the output record. See the examples below to learn how to perform the most common transformations.

Rename a field

Delete a field

Add a field

Drop / filter out an entire record

Type Conversions
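Illustrative Bloblang mappings for each of the operations above (all field names are hypothetical):

```yaml
# Rename a field
# root = this
# root.user_id = this.uid
# root.uid = deleted()

# Delete a field
# root = this
# root.secret = deleted()

# Add a field
# root = this
# root.environment = "prod"

# Drop / filter out an entire record
# root = if this.level == "debug" { deleted() }

# Type conversions
# root = this
# root.count = this.count.number()
# root.id = this.id.string()
```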

Data Retention and TTL

By default, data is retained in the table indefinitely. Optionally, a retention period can be specified using the retention_ttl field. Retention must be expressed in units of hours (h).
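For example, a sketch of a 30-day retention setting (table placement of the field is an assumption):

```yaml
tables:
  - source_topic: "events"
    retention_ttl: 720h   # 30 days, expressed in hours
```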

Dead Letter Queue (DLQ) Mode

Requires Agent v737 or higher.

By default, Tableflow stops ingestion when it sees records that are incompatible with the provided schema. If this head-of-line blocking is undesirable, the behavior can be overridden using the dlq_mode field. Supported values include:

  • stop, which blocks ingestion upon encountering an invalid record (this is the default)

  • skip, which skips invalid records during ingestion
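A sketch of overriding the default (field placement is an assumption):

```yaml
tables:
  - source_topic: "events"
    dlq_mode: skip   # skip invalid records instead of blocking ingestion
```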

Compression codecs

Requires Agent v748 or higher.

Tableflow supports a few compression codecs for the stored data files. The default is snappy.

This codec can be overridden using the compression field. Supported values include:

  • snappy

  • gzip

  • lz4

  • zstd

  • brotli

  • none, which disables compression
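A sketch of overriding the codec (field placement is an assumption):

```yaml
tables:
  - source_topic: "events"
    compression: zstd   # override the default (snappy)
```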

Skipping raw record values

Requires Agent v749 or higher.

If you don't need access to the raw record values (the ones coming from the Kafka topics), you can set skip_raw_record_values to true in your config. This will result in smaller data files.
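For example (field placement is an assumption):

```yaml
tables:
  - source_topic: "events"
    skip_raw_record_values: true   # omit warpstream.value to shrink data files
```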

Handling topic re-creation

To define the table ingestion behavior when the source topic is recreated, use the topic_recreation_policy setting. Currently, the only supported policy is recreate_table. This ensures data integrity by creating a new table (with a different identifier) whenever the system detects that the source topic has been re-created.
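A sketch of the setting (field placement is an assumption):

```yaml
tables:
  - source_topic: "events"
    topic_recreation_policy: recreate_table   # new table if the topic is re-created
```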

Pausing a Table

To temporarily stop ingestion for a specific table without removing it from your configuration, set paused to true:
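For example (field placement is an assumption):

```yaml
tables:
  - source_topic: "events"
    paused: true   # stop ingestion; set to false (or remove) to resume
```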

To resume, set paused: false (or remove the field) and deploy the updated configuration. Ingestion will pick up from where it left off.

Table Schema

Tableflow creates an Iceberg table with a struct schema, containing all the fields from the configured schema as well as the following default fields:

  • warpstream (field ID 10000000): struct
  • warpstream.partition (field ID 10000001): int
  • warpstream.offset (field ID 10000002): long
  • warpstream.key (field ID 10000003): binary
  • warpstream.value (field ID 10000004): binary
  • warpstream.timestamp (field ID 10000005): timestamp

The warpstream.value field can be omitted with the skip_raw_record_values option.

Schema Migrations

Tableflow follows the Apache Iceberg schema evolution rules. Schema migrations are supported for adding columns, changing fields from required to optional, and widening integer and floating point types. To perform any of these operations, update the schema in the Configuration editor and deploy it. In the next few syncs of the table metadata into your bucket, the schema change will be reflected.

Breaking Changes

Any schema change that is not one of the compatible operations listed above is considered a breaking change and requires Table Recreation. Specifically, the following changes are breaking:

  • Make an optional field required — Existing data may contain nulls for that field, so the constraint cannot be applied retroactively.

  • Change a type in a non-widening way — For example, long → int, double → float, or string → int. This includes any data type change that is not a supported numeric widening.

  • Drop a column — Removing a column from the schema is not supported as an in-place migration.

  • Reorder columns — Changing the order of columns is not supported as an in-place migration.

  • Rename a column — Renaming a column is not currently supported as an in-place migration.

To apply breaking changes, use the recreation_key mechanism described in Table Recreation below.

Table Recreation

Certain operations require an existing table to be completely deleted and recreated. While this is most commonly necessary to apply breaking schema changes (such as converting an optional field to required, performing non-widening type conversions, or dropping columns), a full rebuild may also be required for other operational reasons. The recreation_key parameter is designed to automate this workflow.

To utilize this feature, assign an initial string value to the recreation_key in your table configuration. Any change to this value acts as a direct trigger for table recreation. When you need to rebuild a table, simply update the recreation_key to a new string and deploy the new configuration. Tableflow will detect the change, automatically drop the existing table, and provision a new one with the updated config.

The following example demonstrates how to use the recreation_key to apply backward-incompatible schema changes:

To make the service field required and add a required severity field, bump the value (e.g. "v1" to "v2") alongside your schema change:
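A sketch of the bumped configuration (key placement and schema dialect are assumptions consistent with the inline JSON schema examples above):

```yaml
tables:
  - source_topic: "logs"
    recreation_key: "v2"   # bumped from "v1"; triggers drop-and-recreate
    schema: |
      {
        "type": "object",
        "properties": {
          "service":  { "type": "string" },
          "severity": { "type": "string" }
        },
        "required": ["service", "severity"]
      }
```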

After recreation, Tableflow will re-ingest data from the earliest available offset in the source topic (subject to the topic's retention policy) into the new table.

Table Deletion

Tableflow does not delete any tables from the object storage bucket when they are removed from the Configuration in order to prevent accidental data deletion. To delete a table, first delete it from the Configuration and then use your cloud provider's UI or CLI to delete the directory containing your table from within the warpstream/_tableflow directory. To programmatically delete and recreate a table (e.g. for incompatible schema changes), see Table Recreation.

Object Storage Path Layout

Tableflow writes all table data and metadata into your object storage bucket under a predictable directory structure. Understanding this layout is useful for debugging, removing an entire table directory, or integrating with query engines that need direct file paths.

Given a bucket URL of s3://my-bucket (or s3://my-bucket?prefix=my-prefix), the structure is:

  • warpstream/_tableflow/: Root directory for all Tableflow tables.
  • <table_name>-<table_uuid>/: One directory per table.
  • data/: Parquet data files.
  • metadata/: Iceberg metadata: JSON metadata files (v1.metadata.json, v2.metadata.json, ...), manifest files (mani-*.avro), manifest lists (snap-*.avro), and a version-hint.text that points to the latest metadata version.

When a table is deleted from the configuration or via the API, Tableflow only removes the control plane metadata. The data and metadata files in object storage are not deleted. You must clean them up manually using your cloud provider's UI or CLI.

Terraform / Infrastructure as Code

Tableflow Agents are deployed using the standard WarpStream Agent chart, and there is full support for Tableflow clusters in the WarpStream Terraform provider.

Click here for a complete example of creating a Tableflow cluster and configuring it to ingest a single topic into an Iceberg table.

Observability

Ingestion lag for tables is emitted as a metric, warpstream_tableflow_partition_time_lag_seconds, from the WarpStream Tableflow Agents, and is also available visually within the WarpStream Console. Offset lag will be available at a later date.

This metric is tagged by table.id, table.name, topic, and partition. Partition-level tagging can be disabled with the WARPSTREAM_DISABLE_CONSUMER_GROUP_METRICS_TAGS=partition environment variable.

Agent Version Requirements

Different Tableflow features require different minimum Agent versions. The table below summarizes the requirements:

  • Tableflow (core ingestion, metadata sync, compaction): v710
  • AWS Glue integration: v710
  • Transforms (Bento/Bloblang): v730
  • Dead Letter Queue (dlq_mode: stop / skip): v737
  • BigQuery integration: v737
  • Custom partitioning: v748
  • Compression codecs: v748
  • skip_raw_record_values: v749
  • google.protobuf.Timestamp as partitioning field: v764
  • BigLake Metastore integration: v769
  • Hive Metadata Store integration: v769
  • input_schema without manual Iceberg field IDs: v771

Always check the Change Log for the latest feature additions and bug fixes. When in doubt, use the latest stable Agent version.

Tableflow UI

The Tableflow UI available in the WarpStream console allows editing the Configuration.
