refactor(mito): Allow creating multiple files in ParquetWriter (#5291)

* - **Refactored SST File Handling**:
   - Introduced `FilePathProvider` trait and its implementations (`WriteCachePathProvider`, `RegionFilePathFactory`) to manage SST and index file paths.
   - Updated `AccessLayer`, `WriteCache`, and `ParquetWriter` to use `FilePathProvider` for path management.
   - Modified `SstWriteRequest` and `SstUploadRequest` to use path providers instead of direct paths.
   - Files affected: `access_layer.rs`, `write_cache.rs`, `parquet.rs`, `writer.rs`.

 - **Enhanced Indexer Management**:
   - Replaced `IndexerBuilder` with `IndexerBuilderImpl` and made it async to support dynamic indexer creation.
   - Updated `ParquetWriter` to handle multiple indexers and file IDs.
   - Files affected: `index.rs`, `parquet.rs`, `writer.rs`.

 - **Removed Redundant File ID Handling**:
   - Removed `file_id` from `SstWriteRequest` and `CompactionOutput`.
   - Updated related logic to dynamically generate file IDs where necessary.
   - Files affected: `compaction.rs`, `flush.rs`, `picker.rs`, `twcs.rs`, `window.rs`.

 - **Test Adjustments**:
   - Updated tests to align with new path and indexer management.
   - Introduced `FixedPathProvider` and `NoopIndexBuilder` for testing purposes.
   - Files affected: `sst_util.rs`, `version_util.rs`, `parquet.rs`.

* chore: merge main

* refactor/generate-file-id-in-parquet-writer:
 **Enhance Logging in Compactor**

 - Updated `compactor.rs` to improve logging of compaction process.
   - Added `itertools::Itertools` for efficient string joining.
   - Moved logging of compaction inputs and outputs to the async block for better context.
   - Enhanced log message to include both input and output file names for better traceability.
This commit is contained in:
Lei, HUANG
2025-02-05 17:00:54 +08:00
committed by GitHub
parent 59b31372aa
commit dba6da4d00
13 changed files with 398 additions and 267 deletions

View File

@@ -17,10 +17,12 @@ use std::sync::Arc;
use object_store::services::Fs;
use object_store::util::{join_dir, with_instrument_layers};
use object_store::ObjectStore;
use smallvec::SmallVec;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::SequenceNumber;
use store_api::storage::{RegionId, SequenceNumber};
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::cache::write_cache::SstUploadRequest;
use crate::cache::CacheManagerRef;
use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
@@ -30,13 +32,15 @@ use crate::region::options::IndexOptions;
use crate::sst::file::{FileHandle, FileId, FileMeta};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::index::IndexerBuilder;
use crate::sst::index::IndexerBuilderImpl;
use crate::sst::location;
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::{SstInfo, WriteOptions};
pub type AccessLayerRef = Arc<AccessLayer>;
/// SST write results.
///
/// Holds one [SstInfo] per SST file produced by a single write request; it is
/// empty when no data was written. Backed by a `SmallVec` with inline room for
/// two entries.
pub type SstInfoArray = SmallVec<[SstInfo; 2]>;
/// A layer to access SST files under the same directory.
pub struct AccessLayer {
@@ -121,11 +125,8 @@ impl AccessLayer {
&self,
request: SstWriteRequest,
write_opts: &WriteOptions,
) -> Result<Option<SstInfo>> {
let file_path = location::sst_file_path(&self.region_dir, request.file_id);
let index_file_path = location::index_file_path(&self.region_dir, request.file_id);
) -> Result<SstInfoArray> {
let region_id = request.metadata.region_id;
let file_id = request.file_id;
let cache_manager = request.cache_manager.clone();
let sst_info = if let Some(write_cache) = cache_manager.write_cache() {
@@ -134,8 +135,9 @@ impl AccessLayer {
.write_and_upload_sst(
request,
SstUploadRequest {
upload_path: file_path,
index_upload_path: index_file_path,
dest_path_provider: RegionFilePathFactory {
region_dir: self.region_dir.clone(),
},
remote_store: self.object_store.clone(),
},
write_opts,
@@ -144,11 +146,9 @@ impl AccessLayer {
} else {
// Write cache is disabled.
let store = self.object_store.clone();
let indexer = IndexerBuilder {
let indexer_builder = IndexerBuilderImpl {
op_type: request.op_type,
file_id,
file_path: index_file_path,
metadata: &request.metadata,
metadata: request.metadata.clone(),
row_group_size: write_opts.row_group_size,
puffin_manager: self.puffin_manager_factory.build(store),
intermediate_manager: self.intermediate_manager.clone(),
@@ -156,24 +156,31 @@ impl AccessLayer {
inverted_index_config: request.inverted_index_config,
fulltext_index_config: request.fulltext_index_config,
bloom_filter_index_config: request.bloom_filter_index_config,
}
.build()
.await;
};
let mut writer = ParquetWriter::new_with_object_store(
self.object_store.clone(),
file_path,
request.metadata,
indexer,
);
indexer_builder,
RegionFilePathFactory {
region_dir: self.region_dir.clone(),
},
)
.await;
writer
.write_all(request.source, request.max_sequence, write_opts)
.await?
};
// Put parquet metadata to cache manager.
if let Some(sst_info) = &sst_info {
if let Some(parquet_metadata) = &sst_info.file_metadata {
cache_manager.put_parquet_meta_data(region_id, file_id, parquet_metadata.clone())
if !sst_info.is_empty() {
for sst in &sst_info {
if let Some(parquet_metadata) = &sst.file_metadata {
cache_manager.put_parquet_meta_data(
region_id,
sst.file_id,
parquet_metadata.clone(),
)
}
}
}
@@ -191,7 +198,6 @@ pub(crate) enum OperationType {
/// Contents to build a SST.
pub(crate) struct SstWriteRequest {
pub(crate) op_type: OperationType,
pub(crate) file_id: FileId,
pub(crate) metadata: RegionMetadataRef,
pub(crate) source: Source,
pub(crate) cache_manager: CacheManagerRef,
@@ -229,3 +235,47 @@ async fn clean_dir(dir: &str) -> Result<()> {
Ok(())
}
/// Path provider for SST file and index file.
///
/// Implementations map a [FileId] to the location where the SST file and its
/// index file should be stored, so that callers only need to know the file id
/// and not the storage layout. Providers must be `Send + Sync`.
pub trait FilePathProvider: Send + Sync {
/// Creates index file path of given file id.
///
/// Returns the path as an owned `String`.
fn build_index_file_path(&self, file_id: FileId) -> String;
/// Creates SST file path of given file id.
///
/// Returns the path as an owned `String`.
fn build_sst_file_path(&self, file_id: FileId) -> String;
}
/// Path provider that builds paths in local write cache.
///
/// Paths are obtained from the file cache by looking up the cache key made of
/// the region id, the file id and the file type.
#[derive(Clone)]
pub(crate) struct WriteCachePathProvider {
// Region id used as part of the cache key of every path built by this provider.
pub(crate) region_id: RegionId,
// File cache that resolves a cache key to the path of the cached file.
pub(crate) file_cache: FileCacheRef,
}
impl FilePathProvider for WriteCachePathProvider {
    /// Returns the file cache path of the Puffin (index) file that belongs to
    /// `file_id` in this provider's region.
    fn build_index_file_path(&self, file_id: FileId) -> String {
        self.file_cache
            .cache_file_path(IndexKey::new(self.region_id, file_id, FileType::Puffin))
    }

    /// Returns the file cache path of the Parquet (SST) file that belongs to
    /// `file_id` in this provider's region.
    fn build_sst_file_path(&self, file_id: FileId) -> String {
        self.file_cache
            .cache_file_path(IndexKey::new(self.region_id, file_id, FileType::Parquet))
    }
}
/// Path provider that builds paths in region storage path.
///
/// Paths are derived from `region_dir` and the file id through the helpers in
/// the `location` module.
#[derive(Clone, Debug)]
pub(crate) struct RegionFilePathFactory {
// Directory of the region; every path built by this factory is derived from it.
pub(crate) region_dir: String,
}
impl FilePathProvider for RegionFilePathFactory {
    /// Returns the index file path of `file_id` under this factory's region
    /// directory, as computed by `location::index_file_path`.
    fn build_index_file_path(&self, file_id: FileId) -> String {
        let Self { region_dir } = self;
        location::index_file_path(region_dir, file_id)
    }

    /// Returns the SST file path of `file_id` under this factory's region
    /// directory, as computed by `location::sst_file_path`.
    fn build_sst_file_path(&self, file_id: FileId) -> String {
        let Self { region_dir } = self;
        location::sst_file_path(region_dir, file_id)
    }
}

View File

@@ -23,7 +23,10 @@ use futures::AsyncWriteExt;
use object_store::ObjectStore;
use snafu::ResultExt;
use crate::access_layer::{new_fs_cache_store, SstWriteRequest};
use crate::access_layer::{
new_fs_cache_store, FilePathProvider, RegionFilePathFactory, SstInfoArray, SstWriteRequest,
WriteCachePathProvider,
};
use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
use crate::error::{self, Result};
use crate::metrics::{
@@ -32,9 +35,9 @@ use crate::metrics::{
};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::index::IndexerBuilder;
use crate::sst::index::IndexerBuilderImpl;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::{SstInfo, WriteOptions};
use crate::sst::parquet::WriteOptions;
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
/// A cache for uploading files to remote object stores.
@@ -103,22 +106,21 @@ impl WriteCache {
write_request: SstWriteRequest,
upload_request: SstUploadRequest,
write_opts: &WriteOptions,
) -> Result<Option<SstInfo>> {
) -> Result<SstInfoArray> {
let timer = FLUSH_ELAPSED
.with_label_values(&["write_sst"])
.start_timer();
let region_id = write_request.metadata.region_id;
let file_id = write_request.file_id;
let parquet_key = IndexKey::new(region_id, file_id, FileType::Parquet);
let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin);
let store = self.file_cache.local_store();
let indexer = IndexerBuilder {
let path_provider = WriteCachePathProvider {
file_cache: self.file_cache.clone(),
region_id,
};
let indexer = IndexerBuilderImpl {
op_type: write_request.op_type,
file_id,
file_path: self.file_cache.cache_file_path(puffin_key),
metadata: &write_request.metadata,
metadata: write_request.metadata.clone(),
row_group_size: write_opts.row_group_size,
puffin_manager: self.puffin_manager_factory.build(store),
intermediate_manager: self.intermediate_manager.clone(),
@@ -126,17 +128,16 @@ impl WriteCache {
inverted_index_config: write_request.inverted_index_config,
fulltext_index_config: write_request.fulltext_index_config,
bloom_filter_index_config: write_request.bloom_filter_index_config,
}
.build()
.await;
};
// Write to FileCache.
let mut writer = ParquetWriter::new_with_object_store(
self.file_cache.local_store(),
self.file_cache.cache_file_path(parquet_key),
write_request.metadata,
indexer,
);
path_provider,
)
.await;
let sst_info = writer
.write_all(write_request.source, write_request.max_sequence, write_opts)
@@ -145,22 +146,29 @@ impl WriteCache {
timer.stop_and_record();
// Upload sst file to remote object store.
let Some(sst_info) = sst_info else {
// No data need to upload.
return Ok(None);
};
let parquet_path = &upload_request.upload_path;
let remote_store = &upload_request.remote_store;
self.upload(parquet_key, parquet_path, remote_store).await?;
if sst_info.index_metadata.file_size > 0 {
let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin);
let puffin_path = &upload_request.index_upload_path;
self.upload(puffin_key, puffin_path, remote_store).await?;
if sst_info.is_empty() {
return Ok(sst_info);
}
Ok(Some(sst_info))
let remote_store = &upload_request.remote_store;
for sst in &sst_info {
let parquet_key = IndexKey::new(region_id, sst.file_id, FileType::Parquet);
let parquet_path = upload_request
.dest_path_provider
.build_sst_file_path(sst.file_id);
self.upload(parquet_key, &parquet_path, remote_store)
.await?;
if sst.index_metadata.file_size > 0 {
let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin);
let puffin_path = &upload_request
.dest_path_provider
.build_index_file_path(sst.file_id);
self.upload(puffin_key, puffin_path, remote_store).await?;
}
}
Ok(sst_info)
}
/// Removes a file from the cache by `index_key`.
@@ -319,10 +327,8 @@ impl WriteCache {
/// Request to write and upload a SST.
pub struct SstUploadRequest {
/// Path to upload the file.
pub upload_path: String,
/// Path to upload the index file.
pub index_upload_path: String,
/// Destination path provider that determines where SST files in the write cache are uploaded to.
pub dest_path_provider: RegionFilePathFactory,
/// Remote object store to upload.
pub remote_store: ObjectStore,
}
@@ -336,11 +342,9 @@ mod tests {
use crate::cache::test_util::new_fs_store;
use crate::cache::{CacheManager, CacheStrategy};
use crate::region::options::IndexOptions;
use crate::sst::file::FileId;
use crate::sst::location::{index_file_path, sst_file_path};
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::test_util::sst_util::{
assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle,
assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle_with_file_id,
sst_region_metadata,
};
use crate::test_util::TestEnv;
@@ -351,9 +355,9 @@ mod tests {
// and now just use local file system to mock.
let mut env = TestEnv::new();
let mock_store = env.init_object_store_manager();
let file_id = FileId::random();
let upload_path = sst_file_path("test", file_id);
let index_upload_path = index_file_path("test", file_id);
let path_provider = RegionFilePathFactory {
region_dir: "test".to_string(),
};
let local_dir = create_temp_dir("");
let local_store = new_fs_store(local_dir.path().to_str().unwrap());
@@ -373,7 +377,6 @@ mod tests {
let write_request = SstWriteRequest {
op_type: OperationType::Flush,
file_id,
metadata,
source,
storage: None,
@@ -386,8 +389,7 @@ mod tests {
};
let upload_request = SstUploadRequest {
upload_path: upload_path.clone(),
index_upload_path: index_upload_path.clone(),
dest_path_provider: path_provider.clone(),
remote_store: mock_store.clone(),
};
@@ -397,18 +399,22 @@ mod tests {
};
// Write to cache and upload sst to mock remote store
write_cache
let sst_info = write_cache
.write_and_upload_sst(write_request, upload_request, &write_opts)
.await
.unwrap()
.unwrap();
.remove(0); //todo(hl): we assume it only creates one file.
let file_id = sst_info.file_id;
let sst_upload_path = path_provider.build_sst_file_path(file_id);
let index_upload_path = path_provider.build_index_file_path(file_id);
// Check write cache contains the key
let key = IndexKey::new(region_id, file_id, FileType::Parquet);
assert!(write_cache.file_cache.contains_key(&key));
// Check file data
let remote_data = mock_store.read(&upload_path).await.unwrap();
let remote_data = mock_store.read(&sst_upload_path).await.unwrap();
let cache_data = local_store
.read(&write_cache.file_cache.cache_file_path(key))
.await
@@ -436,6 +442,7 @@ mod tests {
#[tokio::test]
async fn test_read_metadata_from_write_cache() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let data_home = env.data_home().display().to_string();
let mock_store = env.init_object_store_manager();
@@ -456,8 +463,7 @@ mod tests {
// Create source
let metadata = Arc::new(sst_region_metadata());
let handle = sst_file_handle(0, 1000);
let file_id = handle.file_id();
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
new_batch_by_range(&["b", "f"], 0, 40),
@@ -467,7 +473,6 @@ mod tests {
// Write to local cache and upload sst to mock remote store
let write_request = SstWriteRequest {
op_type: OperationType::Flush,
file_id,
metadata,
source,
storage: None,
@@ -482,11 +487,10 @@ mod tests {
row_group_size: 512,
..Default::default()
};
let upload_path = sst_file_path(&data_home, file_id);
let index_upload_path = index_file_path(&data_home, file_id);
let upload_request = SstUploadRequest {
upload_path: upload_path.clone(),
index_upload_path: index_upload_path.clone(),
dest_path_provider: RegionFilePathFactory {
region_dir: data_home.clone(),
},
remote_store: mock_store.clone(),
};
@@ -494,10 +498,11 @@ mod tests {
.write_and_upload_sst(write_request, upload_request, &write_opts)
.await
.unwrap()
.unwrap();
.remove(0);
let write_parquet_metadata = sst_info.file_metadata.unwrap();
// Read metadata from write cache
let handle = sst_file_handle_with_file_id(sst_info.file_id, 0, 1000);
let builder = ParquetReaderBuilder::new(data_home, handle.clone(), mock_store.clone())
.cache(CacheStrategy::EnableAll(cache_manager.clone()));
let reader = builder.build().await.unwrap();

View File

@@ -68,7 +68,7 @@ use crate::schedule::remote_job_scheduler::{
CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
};
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::{FileHandle, FileId, FileMeta, Level};
use crate::sst::file::{FileHandle, FileMeta, Level};
use crate::sst::version::LevelMeta;
use crate::worker::WorkerListener;
@@ -596,7 +596,6 @@ impl CompactionStatus {
#[derive(Debug, Clone)]
pub struct CompactionOutput {
pub output_file_id: FileId,
/// Compaction output file level.
pub output_level: Level,
/// Compaction input files.
@@ -610,7 +609,6 @@ pub struct CompactionOutput {
/// SerializedCompactionOutput is a serialized version of [CompactionOutput] by replacing [FileHandle] with [FileMeta].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedCompactionOutput {
output_file_id: FileId,
output_level: Level,
inputs: Vec<FileMeta>,
filter_deleted: bool,

View File

@@ -20,6 +20,7 @@ use api::v1::region::compact_request;
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{info, warn};
use common_time::TimeToLive;
use itertools::Itertools;
use object_store::manager::ObjectStoreManagerRef;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
@@ -278,19 +279,6 @@ impl Compactor for DefaultCompactor {
for output in picker_output.outputs.drain(..) {
compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone()));
info!(
"Compaction region {} output [{}]-> {}",
compaction_region.region_id,
output
.inputs
.iter()
.map(|f| f.file_id().to_string())
.collect::<Vec<_>>()
.join(","),
output.output_file_id
);
let write_opts = WriteOptions {
write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
..Default::default()
@@ -299,7 +287,6 @@ impl Compactor for DefaultCompactor {
let region_metadata = compaction_region.region_metadata.clone();
let sst_layer = compaction_region.access_layer.clone();
let region_id = compaction_region.region_id;
let file_id = output.output_file_id;
let cache_manager = compaction_region.cache_manager.clone();
let storage = compaction_region.region_options.storage.clone();
let index_options = compaction_region
@@ -320,6 +307,11 @@ impl Compactor for DefaultCompactor {
.max()
.flatten();
futs.push(async move {
let input_file_names = output
.inputs
.iter()
.map(|f| f.file_id().to_string())
.join(",");
let reader = CompactionSstReaderBuilder {
metadata: region_metadata.clone(),
sst_layer: sst_layer.clone(),
@@ -332,11 +324,10 @@ impl Compactor for DefaultCompactor {
}
.build_sst_reader()
.await?;
let file_meta_opt = sst_layer
let output_files = sst_layer
.write_sst(
SstWriteRequest {
op_type: OperationType::Compact,
file_id,
metadata: region_metadata,
source: Source::Reader(reader),
cache_manager,
@@ -350,9 +341,10 @@ impl Compactor for DefaultCompactor {
&write_opts,
)
.await?
.into_iter()
.map(|sst_info| FileMeta {
region_id,
file_id,
file_id: sst_info.file_id,
time_range: sst_info.time_range,
level: output.output_level,
file_size: sst_info.file_size,
@@ -361,8 +353,15 @@ impl Compactor for DefaultCompactor {
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
sequence: max_sequence,
});
Ok(file_meta_opt)
})
.collect::<Vec<_>>();
let output_file_names =
output_files.iter().map(|f| f.file_id.to_string()).join(",");
info!(
"Region {} compaction inputs: [{}], outputs: [{}]",
region_id, input_file_names, output_file_names
);
Ok(output_files)
});
}
let mut output_files = Vec::with_capacity(futs.len());
@@ -377,7 +376,7 @@ impl Compactor for DefaultCompactor {
.await
.context(JoinSnafu)?
.into_iter()
.collect::<Result<Vec<_>>>()?;
.collect::<Result<Vec<Vec<_>>>>()?;
output_files.extend(metas.into_iter().flatten());
}

View File

@@ -61,7 +61,6 @@ impl From<&PickerOutput> for SerializedPickerOutput {
.outputs
.iter()
.map(|output| SerializedCompactionOutput {
output_file_id: output.output_file_id,
output_level: output.output_level,
inputs: output.inputs.iter().map(|s| s.meta_ref().clone()).collect(),
filter_deleted: output.filter_deleted,
@@ -91,7 +90,6 @@ impl PickerOutput {
.outputs
.into_iter()
.map(|output| CompactionOutput {
output_file_id: output.output_file_id,
output_level: output.output_level,
inputs: output
.inputs
@@ -167,14 +165,12 @@ mod tests {
let picker_output = PickerOutput {
outputs: vec![
CompactionOutput {
output_file_id: FileId::random(),
output_level: 0,
inputs: inputs_file_handle.clone(),
filter_deleted: false,
output_time_range: None,
},
CompactionOutput {
output_file_id: FileId::random(),
output_level: 0,
inputs: inputs_file_handle.clone(),
filter_deleted: false,
@@ -205,7 +201,6 @@ mod tests {
.iter()
.zip(picker_output_from_serialized.outputs.iter())
.for_each(|(expected, actual)| {
assert_eq!(expected.output_file_id, actual.output_file_id);
assert_eq!(expected.output_level, actual.output_level);
expected
.inputs

View File

@@ -26,7 +26,7 @@ use crate::compaction::compactor::CompactionRegion;
use crate::compaction::picker::{Picker, PickerOutput};
use crate::compaction::run::{find_sorted_runs, reduce_runs, Item};
use crate::compaction::{get_expired_ssts, CompactionOutput};
use crate::sst::file::{overlaps, FileHandle, FileId, Level};
use crate::sst::file::{overlaps, FileHandle, Level};
use crate::sst::version::LevelMeta;
const LEVEL_COMPACTED: Level = 1;
@@ -134,7 +134,6 @@ impl TwcsPicker {
for input in split_inputs {
debug_assert!(input.len() > 1);
output.push(CompactionOutput {
output_file_id: FileId::random(),
output_level: LEVEL_COMPACTED, // always compact to l1
inputs: input,
filter_deleted,
@@ -373,7 +372,7 @@ mod tests {
use super::*;
use crate::compaction::test_util::{new_file_handle, new_file_handles};
use crate::sst::file::{FileMeta, Level};
use crate::sst::file::{FileId, FileMeta, Level};
use crate::test_util::NoopFilePurger;
#[test]

View File

@@ -26,7 +26,7 @@ use crate::compaction::buckets::infer_time_bucket;
use crate::compaction::compactor::{CompactionRegion, CompactionVersion};
use crate::compaction::picker::{Picker, PickerOutput};
use crate::compaction::{get_expired_ssts, CompactionOutput};
use crate::sst::file::{FileHandle, FileId};
use crate::sst::file::FileHandle;
/// Compaction picker that splits the time range of all involved files to windows, and merges
/// the data segments intersects with those windows of files together so that the output files
@@ -132,7 +132,6 @@ fn build_output(windows: BTreeMap<i64, (i64, Vec<FileHandle>)>) -> Vec<Compactio
);
let output = CompactionOutput {
output_file_id: FileId::random(),
output_level: 1,
inputs: files,
filter_deleted: false,

View File

@@ -45,7 +45,7 @@ use crate::request::{
SenderWriteRequest, WorkerRequest,
};
use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::{FileId, FileMeta};
use crate::sst::file::FileMeta;
use crate::sst::parquet::WriteOptions;
use crate::worker::WorkerListener;
@@ -347,14 +347,12 @@ impl RegionFlushTask {
}
let max_sequence = mem.stats().max_sequence();
let file_id = FileId::random();
let iter = mem.iter(None, None, None)?;
let source = Source::Iter(iter);
// Flush to level 0.
let write_request = SstWriteRequest {
op_type: OperationType::Flush,
file_id,
metadata: version.metadata.clone(),
source,
cache_manager: self.cache_manager.clone(),
@@ -365,29 +363,31 @@ impl RegionFlushTask {
fulltext_index_config: self.engine_config.fulltext_index.clone(),
bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
};
let Some(sst_info) = self
let ssts_written = self
.access_layer
.write_sst(write_request, &write_opts)
.await?
else {
.await?;
if ssts_written.is_empty() {
// No data written.
continue;
};
}
flushed_bytes += sst_info.file_size;
let file_meta = FileMeta {
region_id: self.region_id,
file_id,
time_range: sst_info.time_range,
level: 0,
file_size: sst_info.file_size,
available_indexes: sst_info.index_metadata.build_available_indexes(),
index_file_size: sst_info.index_metadata.file_size,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
sequence: NonZeroU64::new(max_sequence),
};
file_metas.push(file_meta);
file_metas.extend(ssts_written.into_iter().map(|sst_info| {
flushed_bytes += sst_info.file_size;
FileMeta {
region_id: self.region_id,
file_id: sst_info.file_id,
time_range: sst_info.time_range,
level: 0,
file_size: sst_info.file_size,
available_indexes: sst_info.index_metadata.build_available_indexes(),
index_file_size: sst_info.index_metadata.file_size,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
sequence: NonZeroU64::new(max_sequence),
}
}));
}
if !file_metas.is_empty() {

View File

@@ -105,7 +105,6 @@ pub struct Indexer {
file_id: FileId,
file_path: String,
region_id: RegionId,
puffin_manager: Option<SstPuffinManager>,
inverted_indexer: Option<InvertedIndexer>,
last_mem_inverted_index: usize,
@@ -168,11 +167,15 @@ impl Indexer {
}
}
pub(crate) struct IndexerBuilder<'a> {
// Builder abstraction that creates an [Indexer] for a given output file.
// `build` takes `&self`, so a single builder can be asked for several indexers.
#[async_trait::async_trait]
pub trait IndexerBuilder {
/// Builds indexer of given file id to [index_file_path].
///
/// `file_id` identifies the SST file the indexer is built for and
/// `index_file_path` is the path of the index file the indexer is associated with.
async fn build(&self, file_id: FileId, index_file_path: String) -> Indexer;
}
pub(crate) struct IndexerBuilderImpl {
pub(crate) op_type: OperationType,
pub(crate) file_id: FileId,
pub(crate) file_path: String,
pub(crate) metadata: &'a RegionMetadataRef,
pub(crate) metadata: RegionMetadataRef,
pub(crate) row_group_size: usize,
pub(crate) puffin_manager: SstPuffinManager,
pub(crate) intermediate_manager: IntermediateManager,
@@ -182,20 +185,20 @@ pub(crate) struct IndexerBuilder<'a> {
pub(crate) bloom_filter_index_config: BloomFilterConfig,
}
impl IndexerBuilder<'_> {
#[async_trait::async_trait]
impl IndexerBuilder for IndexerBuilderImpl {
/// Sanity check for arguments and create a new [Indexer] if arguments are valid.
pub(crate) async fn build(self) -> Indexer {
async fn build(&self, file_id: FileId, index_file_path: String) -> Indexer {
let mut indexer = Indexer {
file_id: self.file_id,
file_path: self.file_path.clone(),
file_id,
file_path: index_file_path,
region_id: self.metadata.region_id,
..Default::default()
};
indexer.inverted_indexer = self.build_inverted_indexer();
indexer.fulltext_indexer = self.build_fulltext_indexer().await;
indexer.bloom_filter_indexer = self.build_bloom_filter_indexer();
indexer.inverted_indexer = self.build_inverted_indexer(file_id);
indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await;
indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id);
if indexer.inverted_indexer.is_none()
&& indexer.fulltext_indexer.is_none()
&& indexer.bloom_filter_indexer.is_none()
@@ -204,11 +207,13 @@ impl IndexerBuilder<'_> {
return Indexer::default();
}
indexer.puffin_manager = Some(self.puffin_manager);
indexer.puffin_manager = Some(self.puffin_manager.clone());
indexer
}
}
fn build_inverted_indexer(&self) -> Option<InvertedIndexer> {
impl IndexerBuilderImpl {
fn build_inverted_indexer(&self, file_id: FileId) -> Option<InvertedIndexer> {
let create = match self.op_type {
OperationType::Flush => self.inverted_index_config.create_on_flush.auto(),
OperationType::Compact => self.inverted_index_config.create_on_compaction.auto(),
@@ -217,7 +222,7 @@ impl IndexerBuilder<'_> {
if !create {
debug!(
"Skip creating inverted index due to config, region_id: {}, file_id: {}",
self.metadata.region_id, self.file_id,
self.metadata.region_id, file_id,
);
return None;
}
@@ -228,7 +233,7 @@ impl IndexerBuilder<'_> {
if indexed_column_ids.is_empty() {
debug!(
"No columns to be indexed, skip creating inverted index, region_id: {}, file_id: {}",
self.metadata.region_id, self.file_id,
self.metadata.region_id, file_id,
);
return None;
}
@@ -238,7 +243,7 @@ impl IndexerBuilder<'_> {
else {
warn!(
"Segment row count is 0, skip creating index, region_id: {}, file_id: {}",
self.metadata.region_id, self.file_id,
self.metadata.region_id, file_id,
);
return None;
};
@@ -246,7 +251,7 @@ impl IndexerBuilder<'_> {
let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else {
warn!(
"Row group size is 0, skip creating index, region_id: {}, file_id: {}",
self.metadata.region_id, self.file_id,
self.metadata.region_id, file_id,
);
return None;
};
@@ -257,8 +262,8 @@ impl IndexerBuilder<'_> {
}
let indexer = InvertedIndexer::new(
self.file_id,
self.metadata,
file_id,
&self.metadata,
self.intermediate_manager.clone(),
self.inverted_index_config.mem_threshold_on_create(),
segment_row_count,
@@ -268,7 +273,7 @@ impl IndexerBuilder<'_> {
Some(indexer)
}
async fn build_fulltext_indexer(&self) -> Option<FulltextIndexer> {
async fn build_fulltext_indexer(&self, file_id: FileId) -> Option<FulltextIndexer> {
let create = match self.op_type {
OperationType::Flush => self.fulltext_index_config.create_on_flush.auto(),
OperationType::Compact => self.fulltext_index_config.create_on_compaction.auto(),
@@ -277,7 +282,7 @@ impl IndexerBuilder<'_> {
if !create {
debug!(
"Skip creating full-text index due to config, region_id: {}, file_id: {}",
self.metadata.region_id, self.file_id,
self.metadata.region_id, file_id,
);
return None;
}
@@ -285,9 +290,9 @@ impl IndexerBuilder<'_> {
let mem_limit = self.fulltext_index_config.mem_threshold_on_create();
let creator = FulltextIndexer::new(
&self.metadata.region_id,
&self.file_id,
&file_id,
&self.intermediate_manager,
self.metadata,
&self.metadata,
self.fulltext_index_config.compress,
mem_limit,
)
@@ -298,7 +303,7 @@ impl IndexerBuilder<'_> {
if creator.is_none() {
debug!(
"Skip creating full-text index due to no columns require indexing, region_id: {}, file_id: {}",
self.metadata.region_id, self.file_id,
self.metadata.region_id, file_id,
);
}
return creator;
@@ -309,19 +314,19 @@ impl IndexerBuilder<'_> {
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to create full-text indexer, region_id: {}, file_id: {}, err: {:?}",
self.metadata.region_id, self.file_id, err
self.metadata.region_id, file_id, err
);
} else {
warn!(
err; "Failed to create full-text indexer, region_id: {}, file_id: {}",
self.metadata.region_id, self.file_id,
self.metadata.region_id, file_id,
);
}
None
}
fn build_bloom_filter_indexer(&self) -> Option<BloomFilterIndexer> {
fn build_bloom_filter_indexer(&self, file_id: FileId) -> Option<BloomFilterIndexer> {
let create = match self.op_type {
OperationType::Flush => self.bloom_filter_index_config.create_on_flush.auto(),
OperationType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(),
@@ -330,15 +335,15 @@ impl IndexerBuilder<'_> {
if !create {
debug!(
"Skip creating bloom filter due to config, region_id: {}, file_id: {}",
self.metadata.region_id, self.file_id,
self.metadata.region_id, file_id,
);
return None;
}
let mem_limit = self.bloom_filter_index_config.mem_threshold_on_create();
let indexer = BloomFilterIndexer::new(
self.file_id,
self.metadata,
file_id,
&self.metadata,
self.intermediate_manager.clone(),
mem_limit,
);
@@ -348,7 +353,7 @@ impl IndexerBuilder<'_> {
if indexer.is_none() {
debug!(
"Skip creating bloom filter due to no columns require indexing, region_id: {}, file_id: {}",
self.metadata.region_id, self.file_id,
self.metadata.region_id, file_id,
);
}
return indexer;
@@ -359,12 +364,12 @@ impl IndexerBuilder<'_> {
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to create bloom filter, region_id: {}, file_id: {}, err: {:?}",
self.metadata.region_id, self.file_id, err
self.metadata.region_id, file_id, err
);
} else {
warn!(
err; "Failed to create bloom filter, region_id: {}, file_id: {}",
self.metadata.region_id, self.file_id,
self.metadata.region_id, file_id,
);
}
@@ -490,11 +495,9 @@ mod tests {
with_fulltext: true,
with_skipping_bloom: true,
});
let indexer = IndexerBuilder {
let indexer = IndexerBuilderImpl {
op_type: OperationType::Flush,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
metadata,
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store()),
intermediate_manager: intm_manager,
@@ -503,7 +506,7 @@ mod tests {
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build()
.build(FileId::random(), "test".to_string())
.await;
assert!(indexer.inverted_indexer.is_some());
@@ -522,11 +525,9 @@ mod tests {
with_fulltext: true,
with_skipping_bloom: true,
});
let indexer = IndexerBuilder {
let indexer = IndexerBuilderImpl {
op_type: OperationType::Flush,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store()),
intermediate_manager: intm_manager.clone(),
@@ -538,18 +539,16 @@ mod tests {
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build()
.build(FileId::random(), "test".to_string())
.await;
assert!(indexer.inverted_indexer.is_none());
assert!(indexer.fulltext_indexer.is_some());
assert!(indexer.bloom_filter_indexer.is_some());
let indexer = IndexerBuilder {
let indexer = IndexerBuilderImpl {
op_type: OperationType::Compact,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store()),
intermediate_manager: intm_manager.clone(),
@@ -561,18 +560,16 @@ mod tests {
},
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build()
.build(FileId::random(), "test".to_string())
.await;
assert!(indexer.inverted_indexer.is_some());
assert!(indexer.fulltext_indexer.is_none());
assert!(indexer.bloom_filter_indexer.is_some());
let indexer = IndexerBuilder {
let indexer = IndexerBuilderImpl {
op_type: OperationType::Compact,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
metadata,
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store()),
intermediate_manager: intm_manager,
@@ -584,7 +581,7 @@ mod tests {
..Default::default()
},
}
.build()
.build(FileId::random(), "test".to_string())
.await;
assert!(indexer.inverted_indexer.is_some());
@@ -603,11 +600,9 @@ mod tests {
with_fulltext: true,
with_skipping_bloom: true,
});
let indexer = IndexerBuilder {
let indexer = IndexerBuilderImpl {
op_type: OperationType::Flush,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store()),
intermediate_manager: intm_manager.clone(),
@@ -616,7 +611,7 @@ mod tests {
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build()
.build(FileId::random(), "test".to_string())
.await;
assert!(indexer.inverted_indexer.is_none());
@@ -628,11 +623,9 @@ mod tests {
with_fulltext: false,
with_skipping_bloom: true,
});
let indexer = IndexerBuilder {
let indexer = IndexerBuilderImpl {
op_type: OperationType::Flush,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store()),
intermediate_manager: intm_manager.clone(),
@@ -641,7 +634,7 @@ mod tests {
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build()
.build(FileId::random(), "test".to_string())
.await;
assert!(indexer.inverted_indexer.is_some());
@@ -653,11 +646,9 @@ mod tests {
with_fulltext: true,
with_skipping_bloom: false,
});
let indexer = IndexerBuilder {
let indexer = IndexerBuilderImpl {
op_type: OperationType::Flush,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store()),
intermediate_manager: intm_manager,
@@ -666,7 +657,7 @@ mod tests {
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build()
.build(FileId::random(), "test".to_string())
.await;
assert!(indexer.inverted_indexer.is_some());
@@ -685,11 +676,9 @@ mod tests {
with_fulltext: true,
with_skipping_bloom: true,
});
let indexer = IndexerBuilder {
let indexer = IndexerBuilderImpl {
op_type: OperationType::Flush,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
metadata,
row_group_size: 0,
puffin_manager: factory.build(mock_object_store()),
intermediate_manager: intm_manager,
@@ -698,7 +687,7 @@ mod tests {
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build()
.build(FileId::random(), "test".to_string())
.await;
assert!(indexer.inverted_indexer.is_none());

View File

@@ -19,7 +19,7 @@ use std::sync::Arc;
use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;
use crate::sst::file::FileTimeRange;
use crate::sst::file::{FileId, FileTimeRange};
use crate::sst::index::IndexOutput;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
@@ -62,6 +62,8 @@ impl Default for WriteOptions {
/// Parquet SST info returned by the writer.
pub struct SstInfo {
/// SST file id.
pub file_id: FileId,
/// Time range of the SST. The timestamps have the same time unit as the
/// data in the SST.
pub time_range: FileTimeRange,
@@ -95,12 +97,13 @@ mod tests {
use tokio_util::compat::FuturesAsyncWriteCompatExt;
use super::*;
use crate::access_layer::FilePathProvider;
use crate::cache::{CacheManager, CacheStrategy, PageKey};
use crate::sst::index::Indexer;
use crate::sst::index::{Indexer, IndexerBuilder};
use crate::sst::parquet::format::WriteFormat;
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::DEFAULT_WRITE_CONCURRENCY;
use crate::sst::{location, DEFAULT_WRITE_CONCURRENCY};
use crate::test_util::sst_util::{
assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
new_batch_with_binary, new_source, sst_file_handle, sst_region_metadata,
@@ -109,12 +112,38 @@ mod tests {
const FILE_DIR: &str = "/";
/// Test-only path provider that is pinned to a single, pre-chosen file id.
#[derive(Clone)]
struct FixedPathProvider {
/// File id used for every SST and index path this provider builds.
file_id: FileId,
}
impl FilePathProvider for FixedPathProvider {
    /// Builds the SST file path under `FILE_DIR` for the pinned file id.
    ///
    /// The requested `_file_id` is deliberately ignored so the written file always
    /// lands at the location derived from `self.file_id`.
    fn build_sst_file_path(&self, _file_id: FileId) -> String {
        let pinned = self.file_id;
        location::sst_file_path(FILE_DIR, pinned)
    }

    /// Builds the index file path under `FILE_DIR` for the pinned file id.
    ///
    /// The requested `_file_id` is deliberately ignored, mirroring
    /// `build_sst_file_path`.
    fn build_index_file_path(&self, _file_id: FileId) -> String {
        let pinned = self.file_id;
        location::index_file_path(FILE_DIR, pinned)
    }
}
/// Test-only indexer builder that always hands back `Indexer::default()`.
struct NoopIndexBuilder;
#[async_trait::async_trait]
impl IndexerBuilder for NoopIndexBuilder {
/// Ignores both the file id and the index path and returns `Indexer::default()`.
async fn build(&self, _file_id: FileId, _path: String) -> Indexer {
Indexer::default()
}
}
#[tokio::test]
async fn test_write_read() {
let mut env = TestEnv::new();
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let file_path = handle.file_path(FILE_DIR);
let file_path = FixedPathProvider {
file_id: handle.file_id(),
};
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
@@ -126,18 +155,20 @@ mod tests {
row_group_size: 50,
..Default::default()
};
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
NoopIndexBuilder,
file_path,
metadata,
Indexer::default(),
);
)
.await;
let info = writer
.write_all(source, None, &write_opts)
.await
.unwrap()
.unwrap();
.remove(0);
assert_eq!(200, info.num_rows);
assert!(info.file_size > 0);
assert_eq!(
@@ -168,7 +199,6 @@ mod tests {
let mut env = TestEnv::new();
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let file_path = handle.file_path(FILE_DIR);
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
@@ -183,16 +213,19 @@ mod tests {
// Prepare data.
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
file_path,
metadata.clone(),
Indexer::default(),
);
NoopIndexBuilder,
FixedPathProvider {
file_id: handle.file_id(),
},
)
.await;
writer
.write_all(source, None, &write_opts)
.await
.unwrap()
.unwrap();
.remove(0);
// Enable page cache.
let cache = CacheStrategy::EnableAll(Arc::new(
@@ -236,7 +269,6 @@ mod tests {
let mut env = crate::test_util::TestEnv::new();
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let file_path = handle.file_path(FILE_DIR);
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
@@ -252,16 +284,19 @@ mod tests {
// sst info contains the parquet metadata, which is converted from FileMetaData
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
file_path,
metadata.clone(),
Indexer::default(),
);
NoopIndexBuilder,
FixedPathProvider {
file_id: handle.file_id(),
},
)
.await;
let sst_info = writer
.write_all(source, None, &write_opts)
.await
.unwrap()
.expect("write_all should return sst info");
.remove(0);
let writer_metadata = sst_info.file_metadata.unwrap();
// read the sst file metadata
@@ -277,7 +312,6 @@ mod tests {
let mut env = TestEnv::new();
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let file_path = handle.file_path(FILE_DIR);
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
@@ -292,15 +326,18 @@ mod tests {
// Prepare data.
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
file_path,
metadata.clone(),
Indexer::default(),
);
NoopIndexBuilder,
FixedPathProvider {
file_id: handle.file_id(),
},
)
.await;
writer
.write_all(source, None, &write_opts)
.await
.unwrap()
.unwrap();
.remove(0);
// Predicate
let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
@@ -330,7 +367,6 @@ mod tests {
let mut env = TestEnv::new();
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let file_path = handle.file_path(FILE_DIR);
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "z"], 0, 0),
@@ -345,15 +381,18 @@ mod tests {
// Prepare data.
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
file_path,
metadata.clone(),
Indexer::default(),
);
NoopIndexBuilder,
FixedPathProvider {
file_id: handle.file_id(),
},
)
.await;
writer
.write_all(source, None, &write_opts)
.await
.unwrap()
.unwrap();
.remove(0);
let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
let mut reader = builder.build().await.unwrap();
@@ -365,7 +404,6 @@ mod tests {
let mut env = TestEnv::new();
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let file_path = handle.file_path(FILE_DIR);
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
@@ -380,16 +418,19 @@ mod tests {
// Prepare data.
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
file_path,
metadata.clone(),
Indexer::default(),
);
NoopIndexBuilder,
FixedPathProvider {
file_id: handle.file_id(),
},
)
.await;
writer
.write_all(source, None, &write_opts)
.await
.unwrap()
.unwrap();
.remove(0);
// Predicate
let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {

View File

@@ -28,6 +28,7 @@ use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;
use smallvec::smallvec;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::SEQUENCE_COLUMN_NAME;
@@ -35,40 +36,48 @@ use store_api::storage::SequenceNumber;
use tokio::io::AsyncWrite;
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
use crate::access_layer::{FilePathProvider, SstInfoArray};
use crate::error::{InvalidMetadataSnafu, OpenDalSnafu, Result, WriteParquetSnafu};
use crate::read::{Batch, Source};
use crate::sst::index::Indexer;
use crate::sst::file::FileId;
use crate::sst::index::{Indexer, IndexerBuilder};
use crate::sst::parquet::format::WriteFormat;
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
/// Parquet SST writer.
pub struct ParquetWriter<F: WriterFactory> {
pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
/// Path provider that creates SST and index file paths according to file id.
path_provider: P,
writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
/// Current active file id.
current_file: FileId,
writer_factory: F,
/// Region metadata of the source and the target SST.
metadata: RegionMetadataRef,
indexer: Indexer,
/// Indexer builder that can create indexers for multiple files.
indexer_builder: I,
/// Current active indexer.
current_indexer: Option<Indexer>,
bytes_written: Arc<AtomicUsize>,
}
pub trait WriterFactory {
type Writer: AsyncWrite + Send + Unpin;
fn create(&mut self) -> impl Future<Output = Result<Self::Writer>>;
fn create(&mut self, file_path: &str) -> impl Future<Output = Result<Self::Writer>>;
}
pub struct ObjectStoreWriterFactory {
path: String,
object_store: ObjectStore,
}
impl WriterFactory for ObjectStoreWriterFactory {
type Writer = Compat<FuturesAsyncWriter>;
async fn create(&mut self) -> Result<Self::Writer> {
async fn create(&mut self, file_path: &str) -> Result<Self::Writer> {
self.object_store
.writer_with(&self.path)
.writer_with(file_path)
.chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
.concurrent(DEFAULT_WRITE_CONCURRENCY)
.await
@@ -77,36 +86,73 @@ impl WriterFactory for ObjectStoreWriterFactory {
}
}
impl ParquetWriter<ObjectStoreWriterFactory> {
pub fn new_with_object_store(
impl<I, P> ParquetWriter<ObjectStoreWriterFactory, I, P>
where
P: FilePathProvider,
I: IndexerBuilder,
{
pub async fn new_with_object_store(
object_store: ObjectStore,
path: String,
metadata: RegionMetadataRef,
indexer: Indexer,
) -> ParquetWriter<ObjectStoreWriterFactory> {
indexer_builder: I,
path_provider: P,
) -> ParquetWriter<ObjectStoreWriterFactory, I, P> {
ParquetWriter::new(
ObjectStoreWriterFactory { path, object_store },
ObjectStoreWriterFactory { object_store },
metadata,
indexer,
indexer_builder,
path_provider,
)
.await
}
}
impl<F> ParquetWriter<F>
impl<F, I, P> ParquetWriter<F, I, P>
where
F: WriterFactory,
I: IndexerBuilder,
P: FilePathProvider,
{
/// Creates a new parquet SST writer.
pub fn new(factory: F, metadata: RegionMetadataRef, indexer: Indexer) -> ParquetWriter<F> {
pub async fn new(
factory: F,
metadata: RegionMetadataRef,
indexer_builder: I,
path_provider: P,
) -> ParquetWriter<F, I, P> {
let init_file = FileId::random();
let index_file_path = path_provider.build_index_file_path(init_file);
let indexer = indexer_builder.build(init_file, index_file_path).await;
ParquetWriter {
path_provider,
writer: None,
current_file: init_file,
writer_factory: factory,
metadata,
indexer,
indexer_builder,
current_indexer: Some(indexer),
bytes_written: Arc::new(AtomicUsize::new(0)),
}
}
/// Returns the indexer for the file currently being written.
///
/// When no indexer is active, a fresh random file id becomes the current file and an
/// indexer targeting that file's index path is built and stored before returning.
async fn get_or_create_indexer(&mut self) -> &mut Indexer {
    if self.current_indexer.is_none() {
        let file_id = FileId::random();
        self.current_file = file_id;
        let index_path = self.path_provider.build_index_file_path(file_id);
        let fresh_indexer = self.indexer_builder.build(file_id, index_path).await;
        self.current_indexer = Some(fresh_indexer);
    }
    // safety: the guard above ensures `current_indexer` is populated at this point.
    self.current_indexer.as_mut().unwrap()
}
/// Iterates source and writes all rows to Parquet file.
///
/// Returns the [SstInfo] if the SST is written.
@@ -115,7 +161,7 @@ where
mut source: Source,
override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
opts: &WriteOptions,
) -> Result<Option<SstInfo>> {
) -> Result<SstInfoArray> {
let write_format =
WriteFormat::new(self.metadata.clone()).with_override_sequence(override_sequence);
let mut stats = SourceStats::default();
@@ -128,24 +174,24 @@ where
match res {
Ok(mut batch) => {
stats.update(&batch);
self.indexer.update(&mut batch).await;
self.get_or_create_indexer().await.update(&mut batch).await;
}
Err(e) => {
self.indexer.abort().await;
self.get_or_create_indexer().await.abort().await;
return Err(e);
}
}
}
let index_output = self.indexer.finish().await;
let index_output = self.get_or_create_indexer().await.finish().await;
if stats.num_rows == 0 {
return Ok(None);
return Ok(smallvec![]);
}
let Some(mut arrow_writer) = self.writer.take() else {
// No batch actually written.
return Ok(None);
return Ok(smallvec![]);
};
arrow_writer.flush().await.context(WriteParquetSnafu)?;
@@ -159,15 +205,18 @@ where
// convert FileMetaData to ParquetMetaData
let parquet_metadata = parse_parquet_metadata(file_meta)?;
let file_id = self.current_file;
// object_store.write will make sure all bytes are written or an error is raised.
Ok(Some(SstInfo {
Ok(smallvec![SstInfo {
file_id,
time_range,
file_size,
num_rows: stats.num_rows,
num_row_groups: parquet_metadata.num_row_groups() as u64,
file_metadata: Some(Arc::new(parquet_metadata)),
index_metadata: index_output,
}))
}])
}
/// Customizes per-column config according to schema and maybe column cardinality.
@@ -229,8 +278,9 @@ where
let props_builder = Self::customize_column_config(props_builder, &self.metadata);
let writer_props = props_builder.build();
let sst_file_path = self.path_provider.build_sst_file_path(self.current_file);
let writer = SizeAwareWriter::new(
self.writer_factory.create().await?,
self.writer_factory.create(&sst_file_path).await?,
self.bytes_written.clone(),
);
let arrow_writer =

View File

@@ -104,13 +104,13 @@ pub fn new_source(batches: &[Batch]) -> Source {
Source::Reader(Box::new(reader))
}
/// Creates a new [FileHandle] for a SST.
pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle {
/// Creates a SST file handle with provided file id
pub fn sst_file_handle_with_file_id(file_id: FileId, start_ms: i64, end_ms: i64) -> FileHandle {
let file_purger = new_noop_file_purger();
FileHandle::new(
FileMeta {
region_id: REGION_ID,
file_id: FileId::random(),
file_id,
time_range: (
Timestamp::new_millisecond(start_ms),
Timestamp::new_millisecond(end_ms),
@@ -127,6 +127,11 @@ pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle {
)
}
/// Creates a new [FileHandle] for a SST, identified by a randomly generated file id.
///
/// Delegates to `sst_file_handle_with_file_id` with the given time bounds.
pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle {
    let file_id = FileId::random();
    sst_file_handle_with_file_id(file_id, start_ms, end_ms)
}
pub fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
assert!(end >= start);
let pk = new_primary_key(tags);

View File

@@ -27,6 +27,7 @@ use crate::puffin_manager::stager::Stager;
use crate::puffin_manager::PuffinManager;
/// `FsPuffinManager` is a `PuffinManager` that provides readers and writers for puffin data in filesystem.
#[derive(Clone)]
pub struct FsPuffinManager<S, F> {
/// The stager.
stager: S,