fix: address review comments

This commit is contained in:
Ning Sun
2026-05-21 09:46:21 +08:00
parent f2efe3bb3e
commit e03ee25214
3 changed files with 27 additions and 15 deletions

View File

@@ -292,8 +292,8 @@ impl CompactionTaskImpl {
.iter()
.zip(merge_output.files_to_add.iter())
.map(|(info, meta)| SstFileInfo {
file_meta: meta.clone(),
sst_info_ref: info,
file_meta: meta,
})
.collect();
hook.on_sst_files_written(

View File

@@ -28,7 +28,7 @@ use crate::sst::parquet::SstInfo;
/// Information about a single SST file written during flush.
pub struct SstFileInfo<'a> {
pub sst_info_ref: &'a SstInfo,
pub file_meta: FileMeta,
pub file_meta: &'a FileMeta,
}
/// Extension hook for flush operations.

View File

@@ -82,18 +82,17 @@ impl Iterator for PkCollectingIter {
fn next(&mut self) -> Option<Self::Item> {
let batch = self.inner.next();
if let Some(Ok(ref record_batch)) = batch {
let pk_col_idx = record_batch.num_columns().saturating_sub(3);
if let Some(pk_col) = record_batch.columns().get(pk_col_idx)
&& let Some(pk_dict) = pk_col
.as_any()
.downcast_ref::<DictionaryArray<UInt32Type>>()
&& let Some(pk_values) = pk_dict.values().as_any().downcast_ref::<BinaryArray>()
{
let mut keys = self.primary_keys.lock().unwrap();
for i in 0..pk_values.len() {
keys.insert(pk_values.value(i).to_vec());
}
if let Some(Ok(ref record_batch)) = batch
&& let Some(pk_col) =
record_batch.column_by_name(store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME)
&& let Some(pk_dict) = pk_col
.as_any()
.downcast_ref::<DictionaryArray<UInt32Type>>()
&& let Some(pk_values) = pk_dict.values().as_any().downcast_ref::<BinaryArray>()
{
let mut keys = self.primary_keys.lock().unwrap();
for i in 0..pk_values.len() {
keys.insert(pk_values.value(i).to_vec());
}
}
batch
@@ -401,6 +400,8 @@ impl RegionFlushTask {
/// Flushes memtables to level 0 SSTs and updates the manifest.
/// Returns the [RegionEdit] to apply.
async fn flush_memtables(&self, version_data: &VersionControlData) -> Result<RegionEdit> {
// We must use the immutable memtables list and entry ids from the `version_data`
// for consistency as others might already modify the version in the `version_control`.
let version = &version_data.version;
let timer = FLUSH_ELAPSED
.with_label_values(&["flush_memtables"])
@@ -457,7 +458,7 @@ impl RegionFlushTask {
.zip(file_metas.iter())
.map(|(sst_info, file_meta)| SstFileInfo {
sst_info_ref: sst_info,
file_meta: file_meta.clone(),
file_meta,
})
.collect();
hook.on_sst_files_written(self.region_id, &version.metadata, &files, &primary_keys)
@@ -469,6 +470,7 @@ impl RegionFlushTask {
files_to_remove: Vec::new(),
timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
compaction_time_window: None,
// The last entry has been flushed.
flushed_entry_id: Some(version_data.last_entry_id),
flushed_sequence: Some(version_data.committed_sequence),
committed_sequence: None,
@@ -483,6 +485,7 @@ impl RegionFlushTask {
let expected_state = if matches!(self.reason, FlushReason::Downgrading) {
RegionLeaderState::Downgrading
} else {
// Check if region is in staging mode
let current_state = self.manifest_ctx.current_state();
if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
RegionLeaderState::Staging
@@ -490,6 +493,8 @@ impl RegionFlushTask {
RegionLeaderState::Writable
}
};
// We will leak files if the manifest update fails, but we ignore them for simplicity. We can
// add a cleanup job to remove them later.
let manifest_version = self
.manifest_ctx
.update_manifest(expected_state, action_list, self.is_staging)
@@ -527,9 +532,11 @@ impl RegionFlushTask {
let mut all_sst_infos = Vec::new();
for mem in memtables {
if mem.is_empty() {
// Skip empty memtables.
continue;
}
// Compact the memtable first, this waits the background compaction to finish.
let compact_start = std::time::Instant::now();
if let Err(e) = mem.compact(true) {
common_telemetry::error!(e; "Failed to compact memtable before flush");
@@ -537,12 +544,16 @@ impl RegionFlushTask {
let compact_cost = compact_start.elapsed();
flush_metrics.compact_memtable += compact_cost;
// Sets `for_flush` flag to true.
let mem_ranges = mem.ranges(None, RangesOptions::for_flush())?;
let num_mem_ranges = mem_ranges.ranges.len();
// Aggregate stats from all ranges
let num_mem_rows = mem_ranges.num_rows();
let memtable_series_count = mem_ranges.series_count();
let memtable_id = mem.id();
// Increases series count for each mem range. We consider each mem range has different series so
// the counter may have more series than the actual series count.
series_count += memtable_series_count;
let flush_start = Instant::now();
@@ -557,6 +568,7 @@ impl RegionFlushTask {
for (source_idx, result) in results.into_iter().enumerate() {
let (max_sequence, ssts_written, metrics) = result?;
if ssts_written.is_empty() {
// No data written.
continue;
}