feat: persist manifest, SST and index files to staging dir (#6726)

* make flush and access layer aware of staging mode

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* staging flag in flush notify

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* staging manifest

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* index methods

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* only stage manifest

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* improve comment

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-08-13 23:02:31 -07:00
committed by GitHub
parent 1b6d924169
commit 4d97754cb4
7 changed files with 156 additions and 32 deletions

View File

@@ -239,10 +239,10 @@ impl AccessLayer {
.write_and_upload_sst(
request,
SstUploadRequest {
dest_path_provider: RegionFilePathFactory {
table_dir: self.table_dir.clone(),
path_type: self.path_type,
},
dest_path_provider: RegionFilePathFactory::new(
self.table_dir.clone(),
self.path_type,
),
remote_store: self.object_store.clone(),
},
write_opts,

View File

@@ -14,13 +14,19 @@
//! Integration tests for staging state functionality.
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionAlterRequest, RegionRequest, RegionTruncateRequest};
use std::fs;
use api::v1::Rows;
use store_api::region_engine::{RegionEngine, SettableRegionRoleState};
use store_api::region_request::{
RegionAlterRequest, RegionFlushRequest, RegionRequest, RegionTruncateRequest,
};
use store_api::storage::RegionId;
use crate::config::MitoConfig;
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::request::WorkerRequest;
use crate::test_util::{CreateRequestBuilder, TestEnv};
use crate::test_util::{build_rows, put_rows, rows_schema, CreateRequestBuilder, TestEnv};
#[tokio::test]
async fn test_staging_state_integration() {
@@ -127,8 +133,6 @@ async fn test_staging_blocks_truncate_operations() {
#[tokio::test]
async fn test_staging_state_validation_patterns() {
use crate::region::{RegionLeaderState, RegionRoleState};
// Test the state validation patterns used throughout the codebase
let staging_state = RegionRoleState::Leader(RegionLeaderState::Staging);
let writable_state = RegionRoleState::Leader(RegionLeaderState::Writable);
@@ -178,3 +182,84 @@ async fn test_staging_state_validation_patterns() {
"Writable regions should be flushable"
);
}
#[tokio::test]
async fn test_staging_manifest_directory() {
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1024, 0);
let request = CreateRequestBuilder::new().build();
// Get column schemas before consuming the request
let column_schemas = rows_schema(&request);
// Check manifest files after region creation (before staging mode)
// Create region
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Check that manifest files exist after region creation
let data_home = env.data_home();
let region_dir = format!("{}/data/test/1024_0000000000", data_home.display());
let normal_manifest_dir = format!("{}/manifest", region_dir);
assert!(
fs::metadata(&normal_manifest_dir).is_ok(),
"Normal manifest directory should exist"
);
// Now test staging mode manifest creation
// Set region to staging mode using the engine API
engine
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader)
.await
.unwrap();
// Put some data and flush in staging mode
let rows_data = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows_data).await;
// Force flush to generate manifest files in staging mode
engine
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
)
.await
.unwrap();
// Check that manifest files are in staging directory
let staging_manifest_dir = format!("{}/staging/manifest", region_dir);
assert!(
fs::metadata(&staging_manifest_dir).is_ok(),
"Staging manifest directory should exist"
);
// Check what exists in normal manifest directory
let files: Vec<_> = fs::read_dir(&normal_manifest_dir)
.unwrap()
.collect::<Result<Vec<_>, _>>()
.unwrap();
assert!(
!files.is_empty(),
"Normal manifest directory should contain files"
);
// Check what exists in staging manifest directory
let staging_files = fs::read_dir(&staging_manifest_dir)
.unwrap()
.collect::<Result<Vec<_>, _>>()
.unwrap();
assert!(
!staging_files.is_empty(),
"Staging manifest directory should contain files"
);
}

View File

@@ -43,7 +43,7 @@ use crate::read::scan_region::PredicateGroup;
use crate::read::Source;
use crate::region::options::{IndexOptions, MergeMode};
use crate::region::version::{VersionControlData, VersionControlRef};
use crate::region::{ManifestContextRef, RegionLeaderState};
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
use crate::request::{
BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
SenderDdlRequest, SenderWriteRequest, WorkerRequest, WorkerRequestWithTime,
@@ -458,7 +458,13 @@ impl RegionFlushTask {
let expected_state = if matches!(self.reason, FlushReason::Downgrading) {
RegionLeaderState::Downgrading
} else {
RegionLeaderState::Writable
// Check if region is in staging mode
let current_state = self.manifest_ctx.current_state();
if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
RegionLeaderState::Staging
} else {
RegionLeaderState::Writable
}
};
// We will leak files if the manifest update fails, but we ignore them for simplicity. We can
// add a cleanup job to remove them later.

View File

@@ -35,9 +35,7 @@ use crate::manifest::storage::{
file_version, is_checkpoint_file, is_delta_file, ManifestObjectStore,
};
use crate::metrics::MANIFEST_OP_ELAPSED;
#[cfg(test)]
use crate::region::RegionLeaderState;
use crate::region::RegionRoleState;
use crate::region::{RegionLeaderState, RegionRoleState};
/// Options for [RegionManifestManager].
#[derive(Debug, Clone)]
@@ -167,7 +165,9 @@ impl RegionManifestManager {
// Persist region change.
let action_list =
RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange { metadata }));
store.save(version, &action_list.encode()?).await?;
// New region is not in staging mode.
// TODO(ruihang): add staging mode support if needed.
store.save(version, &action_list.encode()?, false).await?;
let checkpointer = Checkpointer::new(region_id, options, store.clone(), MIN_VERSION);
manifest_version.store(version, Ordering::Relaxed);
@@ -454,7 +454,10 @@ impl RegionManifestManager {
);
let version = self.increase_version();
self.store.save(version, &action_list.encode()?).await?;
let is_staging = region_state == RegionRoleState::Leader(RegionLeaderState::Staging);
self.store
.save(version, &action_list.encode()?, is_staging)
.await?;
let mut manifest_builder =
RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));

View File

@@ -134,6 +134,7 @@ pub struct ManifestObjectStore {
object_store: ObjectStore,
compress_type: CompressionType,
path: String,
staging_path: String,
/// Stores the size of each manifest file.
manifest_size_map: Arc<RwLock<HashMap<FileKey, u64>>>,
total_manifest_size: Arc<AtomicU64>,
@@ -146,18 +147,30 @@ impl ManifestObjectStore {
compress_type: CompressionType,
total_manifest_size: Arc<AtomicU64>,
) -> Self {
let path = util::normalize_dir(path);
let staging_path = {
// Convert "region_dir/manifest/" to "region_dir/staging/manifest/"
let parent_dir = path.trim_end_matches("manifest/").trim_end_matches('/');
util::normalize_dir(&format!("{}/staging/manifest", parent_dir))
};
Self {
object_store,
compress_type,
path: util::normalize_dir(path),
path,
staging_path,
manifest_size_map: Arc::new(RwLock::new(HashMap::new())),
total_manifest_size,
}
}
/// Returns the delta file path under the **current** compression algorithm
fn delta_file_path(&self, version: ManifestVersion) -> String {
gen_path(&self.path, &delta_file(version), self.compress_type)
fn delta_file_path(&self, version: ManifestVersion, is_staging: bool) -> String {
let base_path = if is_staging {
&self.staging_path
} else {
&self.path
};
gen_path(base_path, &delta_file(version), self.compress_type)
}
/// Returns the checkpoint file path under the **current** compression algorithm
@@ -390,8 +403,13 @@ impl ManifestObjectStore {
}
/// Save the delta manifest file.
pub async fn save(&mut self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
let path = self.delta_file_path(version);
pub async fn save(
&mut self,
version: ManifestVersion,
bytes: &[u8],
is_staging: bool,
) -> Result<()> {
let path = self.delta_file_path(version, is_staging);
debug!("Save log to manifest storage, version: {}", version);
let data = self
.compress_type
@@ -724,7 +742,7 @@ mod tests {
async fn test_manifest_log_store_case(mut log_store: ManifestObjectStore) {
for v in 0..5 {
log_store
.save(v, format!("hello, {v}").as_bytes())
.save(v, format!("hello, {v}").as_bytes(), false)
.await
.unwrap();
}
@@ -797,7 +815,7 @@ mod tests {
log_store.compress_type = CompressionType::Uncompressed;
for v in 0..5 {
log_store
.save(v, format!("hello, {v}").as_bytes())
.save(v, format!("hello, {v}").as_bytes(), false)
.await
.unwrap();
}
@@ -817,7 +835,7 @@ mod tests {
// write compressed data to stimulate compress algorithm take effect
for v in 5..10 {
log_store
.save(v, format!("hello, {v}").as_bytes())
.save(v, format!("hello, {v}").as_bytes(), false)
.await
.unwrap();
}
@@ -873,7 +891,7 @@ mod tests {
log_store.compress_type = CompressionType::Uncompressed;
for v in 0..5 {
log_store
.save(v, format!("hello, {v}").as_bytes())
.save(v, format!("hello, {v}").as_bytes(), false)
.await
.unwrap();
}
@@ -912,7 +930,7 @@ mod tests {
// write 5 manifest files
for v in 0..5 {
log_store
.save(v, format!("hello, {v}").as_bytes())
.save(v, format!("hello, {v}").as_bytes(), false)
.await
.unwrap();
}

View File

@@ -209,11 +209,22 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
};
region.version_control.apply_edit(
request.edit.clone(),
&request.memtables_to_remove,
region.file_purger.clone(),
);
// Check if region is currently in staging mode
let is_staging = region.manifest_ctx.current_state()
== crate::region::RegionRoleState::Leader(crate::region::RegionLeaderState::Staging);
if is_staging {
info!(
"Skipping region metadata update for region {} in staging mode",
region_id
);
} else {
region.version_control.apply_edit(
request.edit.clone(),
&request.memtables_to_remove,
region.file_purger.clone(),
);
}
region.update_flush_millis();

View File

@@ -242,7 +242,8 @@ impl<S> RegionWorkerLoop<S> {
continue;
};
match region.state() {
RegionRoleState::Leader(RegionLeaderState::Writable) => {
RegionRoleState::Leader(RegionLeaderState::Writable)
| RegionRoleState::Leader(RegionLeaderState::Staging) => {
let region_ctx = RegionWriteCtx::new(
region.region_id,
&region.version_control,