Background
#2017 introduced DB operations to submit ingested S3 object metadata into a compression job, as a part of the implementation for #1978. With this PR, the ingested_s3_object_metadata table is created and populated by the log-ingestor during ingestion.
For reference, this table contains:
| Column | Type |
| --- | --- |
| id | BIGINT UNSIGNED |
| bucket | VARCHAR(1024) |
| key | VARCHAR(1024) |
| size | BIGINT UNSIGNED |
| status | ENUM(...) |
| ingestion_job_id | BIGINT UNSIGNED |
| compression_job_id | INT |
| creation_ts | DATETIME(3) |
| last_update_ts | DATETIME(3) |
Current Compression Job Submission & Processing Flow
Log Ingestor
- Once it has a full buffer of object metadata, the log ingestor calls submit_for_compression.
- This method takes a list of (S3ObjectMetadataId, key) pairs. It uses only the S3 keys to build a clp_config, which it then inserts into compression_jobs. The IDs are used to update columns in ingested_s3_object_metadata.
- For those rows, it sets ingested_s3_object_metadata.compression_job_id and updates ingested_s3_object_metadata.status to submitted.
Compression Scheduler
- Polls compression_jobs for pending jobs.
- For each pending job, it reads the clp_config. Since this config only contains keys, the scheduler has no way to associate a key with the relevant metadata entry. It thus calls out to S3 to fetch object sizes before it can partition the work into tasks.
Problem
Object size is already stored in ingested_s3_object_metadata, so fetching it again from S3 in the compression scheduler is redundant. This happens because submit_for_compression stores only keys in clp_config, which means the compression_jobs table has no reference to the rows in ingested_s3_object_metadata.
A symptom of this issue was present in #2017. In that PR, we had to change ObjectMetadata to include its DB row ID for compression job submission. Setting the ID this way is a workaround: it mutates ObjectMetadata, and the ID duplicates information that is already persisted in the CLP DB.
Proposal
We will eliminate the redundant trip to S3. When a compression job is submitted by the log-ingestor, the job config will contain ingested_s3_object_metadata row IDs rather than S3 keys. This addresses the problem because:
- The compression scheduler can read the required metadata from the DB without contacting S3, eliminating the round trip.
- The log-ingestor no longer needs to hold ObjectMetadata in its compression submission buffer, since the IDs can be associated with the metadata.
- This means we can remove the ID field that was added to ObjectMetadata.
Compression job config schema
Scheduler
We will add a new input type IngestorInputConfig as a third variant of ClpIoConfig.input. It will resemble the following:
```python
from typing import Literal

from pydantic import BaseModel, field_validator


# InputType is the existing input-type enum, extended with an INGESTOR member.
class IngestorInputConfig(BaseModel):
    type: Literal[InputType.INGESTOR.value] = InputType.INGESTOR.value
    dataset: str | None = None
    timestamp_key: str | None = None
    unstructured: bool = False
    metadata_ids: list[int]

    @field_validator("metadata_ids")
    @classmethod
    def validate_metadata_ids_non_empty(cls, value: list[int]) -> list[int]:
        if len(value) == 0:
            raise ValueError("metadata_ids cannot be an empty list")
        return value
```
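To illustrate how the non-empty check behaves when a job config is loaded, here is a minimal stdlib-only sketch that mirrors the model above with a dataclass instead of pydantic. The names IngestorInputSketch and parse_ingestor_input, and the "ingestor" discriminator string, are assumptions made for illustration; they are not part of the proposal or of the existing code.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

# Assumed string value of InputType.INGESTOR.value (hypothetical).
INGESTOR_TYPE = "ingestor"


@dataclass(frozen=True)
class IngestorInputSketch:
    """Stdlib stand-in for the proposed IngestorInputConfig."""

    metadata_ids: List[int]
    dataset: Optional[str] = None
    timestamp_key: Optional[str] = None
    unstructured: bool = False
    type: str = INGESTOR_TYPE

    def __post_init__(self) -> None:
        # Same rule as validate_metadata_ids_non_empty in the model above.
        if len(self.metadata_ids) == 0:
            raise ValueError("metadata_ids cannot be an empty list")


def parse_ingestor_input(raw: Dict[str, Any]) -> IngestorInputSketch:
    """Builds the config from a decoded dict, checking the discriminator first."""
    if raw.get("type") != INGESTOR_TYPE:
        raise ValueError(f"unexpected input type: {raw.get('type')!r}")
    fields = {name: value for name, value in raw.items() if name != "type"}
    return IngestorInputSketch(**fields)


config = parse_ingestor_input({"type": "ingestor", "metadata_ids": [11, 12, 13]})
```

A config with an empty metadata_ids list is rejected with a ValueError, so the scheduler never receives a job that references no rows.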
Ingestor
We will remove the keys param in clp_io_config.rs's S3InputConfig and add an optional metadata_ids field:
```rust
/// Represents S3 input config.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct S3InputConfig {
    #[serde(flatten)]
    pub s3_config: S3Config,
    pub metadata_ids: Option<Vec<S3ObjectMetadataId>>,
    pub dataset: Option<NonEmptyString>,
    pub timestamp_key: Option<NonEmptyString>,
    pub unstructured: bool,
}
```
Updated Flow
Log-ingestor: Submitting a Compression Job
We will introduce a named type CompressionBufferEntry (suggestions for a better name are welcome; perhaps BufferedObjectMetadataEntry?) in buffer.rs for the data fed into the buffer. While the buffer itself will hold a vector of S3ObjectMetadataId, it flushes when the total byte size of the accumulated objects reaches a threshold. Each entry added to the buffer must therefore carry both the metadata row ID and that row's size, so that we can maintain a running total and decide when to flush. We opt for a named struct rather than a tuple for readability, and so that it can derive FromRow for loading from the DB when recovering buffered metadata.
It will resemble:
```rust
/// A reference to a persisted S3 object metadata row
#[derive(Debug, Clone, PartialEq, Eq, FromRow)]
pub struct CompressionBufferEntry {
    pub id: S3ObjectMetadataId,
    pub size: u64,
}
```
With this type in place, submission proceeds as follows:
- Once it has a full buffer of object metadata, the log ingestor calls submit_for_compression with a list of S3ObjectMetadataId values. Each item in this list will already have been persisted in ingested_s3_object_metadata.
- It builds an S3InputConfig with those IDs.
- It serializes and inserts that config into compression_jobs.
- It updates ingested_s3_object_metadata (unchanged).
The log-ingestor no longer needs to carry S3 keys through the buffer or the compression job submitter.
The buffer accepts a named type CompressionBufferEntry (id and size) for each persisted object. Size is used for the flush threshold; the submitter receives only the list of IDs when the buffer flushes.
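The flush behaviour described above can be sketched as follows. The real buffer lives in the Rust log-ingestor (buffer.rs); this Python version only illustrates the logic, and SizeThresholdBuffer, flush_threshold_bytes, and the submit callback are hypothetical names chosen for the example.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass(frozen=True)
class CompressionBufferEntry:
    """Mirrors the proposed Rust struct: a row ID plus that row's size."""

    id: int
    size: int


@dataclass
class SizeThresholdBuffer:
    """Keeps only IDs and a running byte total; flushes at a size threshold."""

    flush_threshold_bytes: int
    submit: Callable[[List[int]], None]  # receives only the buffered IDs
    _ids: List[int] = field(default_factory=list)
    _total_size: int = 0

    def add(self, entry: CompressionBufferEntry) -> None:
        self._ids.append(entry.id)
        self._total_size += entry.size
        if self._total_size >= self.flush_threshold_bytes:
            self.flush()

    def flush(self) -> None:
        if not self._ids:
            return
        # Reset the buffer state before handing the IDs to the submitter.
        ids, self._ids = self._ids, []
        self._total_size = 0
        self.submit(ids)


submitted: List[List[int]] = []
buffer = SizeThresholdBuffer(flush_threshold_bytes=100, submit=submitted.append)
buffer.add(CompressionBufferEntry(id=1, size=60))
buffer.add(CompressionBufferEntry(id=2, size=50))  # total is 110, so this flushes
buffer.add(CompressionBufferEntry(id=3, size=10))  # stays buffered
```

After these calls, submitted holds a single batch containing IDs 1 and 2, while ID 3 remains in the buffer. Note that neither the S3 key nor the rest of ObjectMetadata passes through the buffer.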
Compression scheduler: Processing The Job
When the compression scheduler reads a pending job whose clp_config is an IngestorInputConfig:
- Extract metadata_ids from the config.
- Query ingested_s3_object_metadata for those IDs (key is a reserved word in MySQL and MariaDB, so it must be quoted):
  ```sql
  SELECT id, `key`, size FROM ingested_s3_object_metadata WHERE id IN (...)
  ```
- Build FileMetadata(path=key, size=size) for each row and add it to PathsToCompressBuffer.
- Partitioning and dispatching will be unchanged.
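The lookup steps above can be sketched as follows. This is a self-contained illustration under stated assumptions: an in-memory sqlite3 database stands in for the CLP DB (so the key column is quoted with double quotes rather than backticks), FileMetadata is a simplified stand-in for the scheduler's class, and load_file_metadata is a hypothetical helper name.

```python
import sqlite3
from dataclasses import dataclass
from typing import List, Sequence


@dataclass(frozen=True)
class FileMetadata:
    """Simplified stand-in for the scheduler's FileMetadata."""

    path: str
    size: int


def load_file_metadata(
    conn: sqlite3.Connection, metadata_ids: Sequence[int]
) -> List[FileMetadata]:
    """Reads key and size for the given row IDs from the DB instead of S3."""
    if not metadata_ids:
        raise ValueError("metadata_ids cannot be an empty list")
    # One placeholder per ID keeps the query parameterized.
    placeholders = ", ".join("?" for _ in metadata_ids)
    query = (
        'SELECT id, "key", size FROM ingested_s3_object_metadata '
        f"WHERE id IN ({placeholders}) ORDER BY id"
    )
    rows = conn.execute(query, list(metadata_ids)).fetchall()
    found_ids = {row[0] for row in rows}
    missing = [i for i in metadata_ids if i not in found_ids]
    if missing:
        raise LookupError(f"no metadata rows for IDs: {missing}")
    return [FileMetadata(path=row[1], size=row[2]) for row in rows]


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE ingested_s3_object_metadata "
    '(id INTEGER PRIMARY KEY, "key" TEXT, size INTEGER)'
)
conn.executemany(
    'INSERT INTO ingested_s3_object_metadata (id, "key", size) VALUES (?, ?, ?)',
    [(1, "logs/a.log", 10), (2, "logs/b.log", 20), (3, "logs/c.log", 30)],
)
files = load_file_metadata(conn, [1, 3])
```

Raising on missing IDs is a design choice made for this sketch; how the scheduler should handle a job that references rows that no longer exist is not specified in this proposal.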
Implementation plan
Each item will be a PR:
- Compression Scheduler
  - Add IngestorInputConfig to job_config.py and extend ClpIoConfig.input to include it.
  - In the scheduler, when clp_config.input is IngestorInputConfig, follow the flow outlined above.
- Log-ingestor: submit by IDs and simplify buffer
  - Update clp_io_config.rs's S3InputConfig as described above.
  - Introduce CompressionBufferEntry in buffer.rs and update the listener interface to use it.
  - Change the buffer and submitter interface so they work with the list of IDs when building the compression job, rather than passing a list of ObjectMetadata with keys.
  - Remove the id field from ObjectMetadata in clp-rust-utils and stop populating it.
  - Update log-ingestor's unit tests and component-wise integration tests so that the ingested files are buffered and checked directly through the ingestion job state implementation, without involving any buffering logic.