
Add Stream Source #969

@penghuo

Stream source interface

  • Optional<Offset> getLatestOffset()
    • Returns the latest Offset, which maps to a FileMetadata entry. Note that the StreamSource does NOT guarantee that this Offset maps to unread files.
    • Returns empty if the stream source contains no data.
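```java
// List every object currently in the file data source.
Set<Path> allFiles = fileDataSource.listAllObjects();

// Unread files are the ones not seen in a previous call.
Set<Path> unreadFiles = Sets.difference(allFiles, seenFiles).immutableCopy();

// Remember everything listed so far.
seenFiles = allFiles;

// Latest batch id in the metadata log, or -1 if the log is empty
// (assuming Pair exposes getLeft(), as commons-lang3 Pair does).
long latestBatchId = fileMetadataLog.getLatest().map(Pair::getLeft).orElse(-1L);

if (!unreadFiles.isEmpty()) {
  // New files arrived: advance the batch id so it stays monotonically increasing.
  latestBatchId += 1;
  // Record the new batch (batch id -> unread files) in the metadata log.
  fileMetadataLog.add(latestBatchId, new FileMetadata(unreadFiles, latestBatchId));
  return Optional.of(new Offset(latestBatchId));
} else {
  // No new files: report the last known offset, or empty if no batch exists yet.
  return latestBatchId == -1 ? Optional.empty() : Optional.of(new Offset(latestBatchId));
}
```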
  • Batch getBatch(Optional<Offset> start, Offset end)
    • Returns the Batch of data from the stream source in the range (start, end], i.e. excluding start and including end.
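The getLatestOffset() steps and the (start, end] contract of getBatch() can be exercised with a small in-memory sketch. It is illustrative only and assumes Java 16+ records: `Offset`, `Batch`, `InMemoryStreamSource`, its `addFile` helper, plain string file names, and a `TreeMap` standing in for the metadata log are all hypothetical, not classes from the project.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical offset type: wraps a monotonically increasing batch id.
record Offset(long batchId) {}

// Hypothetical batch type: the files that make up a micro-batch.
record Batch(List<String> files) {}

// The two methods of the stream source interface described above.
interface StreamSource {
  Optional<Offset> getLatestOffset();

  Batch getBatch(Optional<Offset> start, Offset end);
}

// Illustrative source that "lists" files from an in-memory bucket.
class InMemoryStreamSource implements StreamSource {
  private final Set<String> bucket = new HashSet<>(); // stands in for the file data source
  private Set<String> seenFiles = new HashSet<>();    // every file listed so far
  // Stands in for FileMetadataLog: batch id -> files first seen in that batch.
  private final TreeMap<Long, Set<String>> metadataLog = new TreeMap<>();

  // Hypothetical helper: simulates a new object landing in the bucket.
  void addFile(String name) {
    bucket.add(name);
  }

  @Override
  public Optional<Offset> getLatestOffset() {
    Set<String> allFiles = new HashSet<>(bucket);
    Set<String> unreadFiles = new TreeSet<>(allFiles);
    unreadFiles.removeAll(seenFiles);
    seenFiles = allFiles;

    long latestBatchId = metadataLog.isEmpty() ? -1L : metadataLog.lastKey();
    if (!unreadFiles.isEmpty()) {
      latestBatchId += 1; // keep batch ids monotonically increasing
      metadataLog.put(latestBatchId, unreadFiles);
      return Optional.of(new Offset(latestBatchId));
    }
    return latestBatchId == -1 ? Optional.empty() : Optional.of(new Offset(latestBatchId));
  }

  @Override
  public Batch getBatch(Optional<Offset> start, Offset end) {
    // (start, end]: exclusive start, inclusive end.
    // Assumption: an empty start means "from the first batch".
    long fromExclusive = start.map(Offset::batchId).orElse(-1L);
    List<String> files = new ArrayList<>();
    for (Map.Entry<Long, Set<String>> entry :
        metadataLog.subMap(fromExclusive, false, end.batchId(), true).entrySet()) {
      files.addAll(entry.getValue()); // TreeSet keeps each batch's files sorted
    }
    return new Batch(files);
  }
}
```

In this sketch, calling getLatestOffset() twice with no new files returns the same Offset both times, which is why the interface notes that an Offset does not guarantee unread files. Treating an empty start as "from the first batch" is an assumption of the sketch, not something the description specifies.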

Stream source state maintenance

  • FileMetadataLog maintains the mapping between Offset and FileMetadata. Callers of FileMetadataLog MUST keep Offsets monotonically increasing.
    • Optional<Pair<Long, FileMetadata>> getLatest(): returns the latest Offset and its FileMetadata.
    • List<FileMetadata> get(Optional<Long> start, Optional<Long> end): returns the FileMetadata entries whose Offset lies in [start, end].
    • boolean add(Long offset, T metadata): adds an Offset and its metadata.
  • SeenFiles maintains the set of files seen from the stream source so far.
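A minimal in-memory FileMetadataLog can be backed by a map sorted by offset. This is a sketch under assumptions: `InMemoryMetadataLog` and the `Pair` record are hypothetical stand-ins, an empty bound in get() is treated as open on that side, and add() returns false for a non-increasing offset even though the description leaves enforcing monotonicity to the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical stand-in for the Pair type in getLatest()'s signature.
record Pair<L, R>(L left, R right) {}

// In-memory sketch of the metadata log: offset -> metadata, ordered by offset.
class InMemoryMetadataLog<T> {
  private final TreeMap<Long, T> entries = new TreeMap<>();

  // Latest offset together with its metadata, or empty if nothing was added yet.
  Optional<Pair<Long, T>> getLatest() {
    Map.Entry<Long, T> last = entries.lastEntry();
    return last == null
        ? Optional.empty()
        : Optional.of(new Pair<>(last.getKey(), last.getValue()));
  }

  // Metadata whose offset lies in [start, end]; an empty bound is open on that side.
  List<T> get(Optional<Long> start, Optional<Long> end) {
    long from = start.orElse(Long.MIN_VALUE);
    long to = end.orElse(Long.MAX_VALUE);
    if (from > to) {
      return List.of();
    }
    return new ArrayList<>(entries.subMap(from, true, to, true).values());
  }

  // Adds an entry; returns false (and stores nothing) if the offset is not
  // strictly greater than the latest one, so offsets stay monotonically increasing.
  boolean add(Long offset, T metadata) {
    if (!entries.isEmpty() && offset <= entries.lastKey()) {
      return false;
    }
    entries.put(offset, metadata);
    return true;
  }
}
```

Backing the log with a `TreeMap` makes getLatest() a `lastEntry()` lookup and the closed range query a single `subMap` call; a durable implementation would persist these entries instead of keeping them in memory.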
