Compare commits

..

2 Commits

Author SHA1 Message Date
Weny Xu
e1b18614ee feat(mito2): implement ApplyStagingManifest request handling (#7456)
* feat(mito2): implement `ApplyStagingManifest` request handling

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: fmt

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fix logic

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: update proto

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-12-24 09:05:09 +00:00
Frost Ming
4bae75ccdb docs: refer to the correct project name in AI guidelines (#7471)
doc: refer to the correct project name in AI guidelines
2025-12-24 07:58:36 +00:00
15 changed files with 746 additions and 205 deletions

View File

@@ -111,7 +111,7 @@ We has the following policy for AI-assisted PRs:
### Why fully AI-generated PRs without understanding are not helpful
Today, AI tools cannot reliably make complex changes to DataFusion on their own, which is why we rely on pull requests and code review.
Today, AI tools cannot reliably make complex changes to GreptimeDB on their own, which is why we rely on pull requests and code review.
The purposes of code review are:

2
Cargo.lock generated
View File

@@ -5464,7 +5464,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=173efe5ec62722089db7c531c0b0d470a072b915#173efe5ec62722089db7c531c0b0d470a072b915"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=520fa524f9d590752ea327683e82ffd65721b27c#520fa524f9d590752ea327683e82ffd65721b27c"
dependencies = [
"prost 0.13.5",
"prost-types 0.13.5",

View File

@@ -150,7 +150,7 @@ etcd-client = { version = "0.16.1", features = [
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "173efe5ec62722089db7c531c0b0d470a072b915" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "520fa524f9d590752ea327683e82ffd65721b27c" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -1216,7 +1216,8 @@ impl RegionServerInner {
| RegionRequest::Compact(_)
| RegionRequest::Truncate(_)
| RegionRequest::BuildIndex(_)
| RegionRequest::EnterStaging(_) => RegionChange::None,
| RegionRequest::EnterStaging(_)
| RegionRequest::ApplyStagingManifest(_) => RegionChange::None,
RegionRequest::Catchup(_) => RegionChange::Catchup,
};

View File

@@ -220,6 +220,13 @@ impl RegionEngine for MetricEngine {
UnsupportedRegionRequestSnafu { request }.fail()
}
}
RegionRequest::ApplyStagingManifest(_) => {
if self.inner.is_physical_region(region_id) {
return self.inner.mito.handle_request(region_id, request).await;
} else {
UnsupportedRegionRequestSnafu { request }.fail()
}
}
RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
RegionRequest::Create(create) => {
self.inner

View File

@@ -162,6 +162,7 @@ impl CompactionTaskImpl {
edit,
result: Ok(()),
update_region_state: false,
is_staging: false,
}),
})
.await;

View File

@@ -76,6 +76,8 @@ mod copy_region_from_test;
#[cfg(test)]
mod remap_manifests_test;
#[cfg(test)]
mod apply_staging_manifest_test;
mod puffin_index;
use std::any::Any;

View File

@@ -0,0 +1,400 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use std::fs;
use api::v1::Rows;
use datatypes::value::Value;
use partition::expr::{PartitionExpr, col};
use store_api::region_engine::{
RegionEngine, RegionRole, RemapManifestsRequest, SettableRegionRoleState,
};
use store_api::region_request::{
ApplyStagingManifestRequest, EnterStagingRequest, RegionFlushRequest, RegionRequest,
};
use store_api::storage::{FileId, RegionId};
use crate::config::MitoConfig;
use crate::error::Error;
use crate::manifest::action::RegionManifest;
use crate::sst::file::FileMeta;
use crate::test_util::{CreateRequestBuilder, TestEnv, build_rows, put_rows, rows_schema};
fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr {
col(col_name)
.gt_eq(Value::Int64(start))
.and(col(col_name).lt(Value::Int64(end)))
}
#[tokio::test]
async fn test_apply_staging_manifest_invalid_region_state() {
common_telemetry::init_default_ut_logging();
test_apply_staging_manifest_invalid_region_state_with_format(false).await;
test_apply_staging_manifest_invalid_region_state_with_format(true).await;
}
async fn test_apply_staging_manifest_invalid_region_state_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("invalid-region-state").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
})
.await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new()
.partition_expr_json(Some(range_expr("x", 0, 50).as_json_str().unwrap()))
.build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Region is in leader state, apply staging manifest request should fail.
let err = engine
.handle_request(
region_id,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
files_to_add: vec![],
}),
)
.await
.unwrap_err();
assert_matches!(
err.into_inner().as_any().downcast_ref::<Error>().unwrap(),
Error::RegionState { .. }
);
// Region is in leader state, apply staging manifest request should fail.
engine
.set_region_role(region_id, RegionRole::Follower)
.unwrap();
let err = engine
.handle_request(
region_id,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
files_to_add: vec![],
}),
)
.await
.unwrap_err();
assert_matches!(
err.into_inner().as_any().downcast_ref::<Error>().unwrap(),
Error::RegionState { .. }
);
}
#[tokio::test]
async fn test_apply_staging_manifest_mismatched_partition_expr() {
common_telemetry::init_default_ut_logging();
test_apply_staging_manifest_mismatched_partition_expr_with_format(false).await;
test_apply_staging_manifest_mismatched_partition_expr_with_format(true).await;
}
async fn test_apply_staging_manifest_mismatched_partition_expr_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("mismatched-partition-expr").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
})
.await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
engine
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: range_expr("x", 0, 50).as_json_str().unwrap(),
}),
)
.await
.unwrap();
let err = engine
.handle_request(
region_id,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
files_to_add: vec![],
}),
)
.await
.unwrap_err();
assert_matches!(
err.into_inner().as_any().downcast_ref::<Error>().unwrap(),
Error::StagingPartitionExprMismatch { .. }
)
}
#[tokio::test]
async fn test_apply_staging_manifest_success() {
common_telemetry::init_default_ut_logging();
test_apply_staging_manifest_success_with_format(false).await;
test_apply_staging_manifest_success_with_format(true).await;
}
async fn test_apply_staging_manifest_success_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("success").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
})
.await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new()
.partition_expr_json(Some(range_expr("tag_0", 0, 100).as_json_str().unwrap()))
.build();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let new_region_id_1 = RegionId::new(1, 2);
let new_region_id_2 = RegionId::new(1, 3);
// Generate some data
for i in 0..3 {
let rows_data = Rows {
schema: column_schemas.clone(),
rows: build_rows(i * 10, (i + 1) * 10),
};
put_rows(&engine, region_id, rows_data).await;
engine
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
)
.await
.unwrap();
}
engine
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader)
.await
.unwrap();
let result = engine
.remap_manifests(RemapManifestsRequest {
region_id,
input_regions: vec![region_id],
region_mapping: [(region_id, vec![new_region_id_1, new_region_id_2])]
.into_iter()
.collect(),
new_partition_exprs: [
(
new_region_id_1,
range_expr("tag_0", 0, 50).as_json_str().unwrap(),
),
(
new_region_id_2,
range_expr("tag_0", 50, 100).as_json_str().unwrap(),
),
]
.into_iter()
.collect(),
})
.await
.unwrap();
assert_eq!(result.new_manifests.len(), 2);
let new_manifest_1 =
serde_json::from_str::<RegionManifest>(&result.new_manifests[&new_region_id_1]).unwrap();
let new_manifest_2 =
serde_json::from_str::<RegionManifest>(&result.new_manifests[&new_region_id_2]).unwrap();
assert_eq!(new_manifest_1.files.len(), 3);
assert_eq!(new_manifest_2.files.len(), 3);
let request = CreateRequestBuilder::new().build();
engine
.handle_request(new_region_id_1, RegionRequest::Create(request))
.await
.unwrap();
engine
.handle_request(
new_region_id_1,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(),
}),
)
.await
.unwrap();
let mut files_to_add = new_manifest_1.files.values().cloned().collect::<Vec<_>>();
// Before apply staging manifest, the files should be empty
let region = engine.get_region(new_region_id_1).unwrap();
let manifest = region.manifest_ctx.manifest().await;
assert_eq!(manifest.files.len(), 0);
let staging_manifest = region.manifest_ctx.staging_manifest().await.unwrap();
assert_eq!(staging_manifest.files.len(), 0);
engine
.handle_request(
new_region_id_1,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(),
files_to_add: serde_json::to_vec(&files_to_add).unwrap(),
}),
)
.await
.unwrap();
// After apply staging manifest, the files should be the same as the new manifest
let region = engine.get_region(new_region_id_1).unwrap();
let manifest = region.manifest_ctx.manifest().await;
assert_eq!(manifest.files.len(), 3);
assert!(region.is_writable());
assert!(!region.is_staging());
// The manifest partition expr should be the same as the request.
assert_eq!(
manifest.metadata.partition_expr.as_ref().unwrap(),
&range_expr("tag_0", 0, 50).as_json_str().unwrap()
);
// The staging manifest should be cleared.
let staging_manifest = region.manifest_ctx.staging_manifest().await;
assert!(staging_manifest.is_none());
// The staging partition expr should be cleared.
assert!(region.staging_partition_expr.lock().unwrap().is_none());
// The staging manifest directory should be empty.
let data_home = env.data_home();
let region_dir = format!("{}/data/test/1_0000000001", data_home.display());
let staging_manifest_dir = format!("{}/staging/manifest", region_dir);
let staging_files = fs::read_dir(&staging_manifest_dir)
.map(|entries| entries.collect::<Result<Vec<_>, _>>().unwrap_or_default())
.unwrap_or_default();
assert_eq!(staging_files.len(), 0);
// Try to modify the file sequence.
files_to_add.push(FileMeta {
region_id,
file_id: FileId::random(),
..Default::default()
});
// This request will be ignored.
engine
.handle_request(
new_region_id_1,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(),
files_to_add: serde_json::to_vec(&files_to_add).unwrap(),
}),
)
.await
.unwrap();
// The files number should not change.
let region = engine.get_region(new_region_id_1).unwrap();
let manifest = region.manifest_ctx.manifest().await;
assert_eq!(manifest.files.len(), 3);
}
#[tokio::test]
async fn test_apply_staging_manifest_invalid_files_to_add() {
common_telemetry::init_default_ut_logging();
test_apply_staging_manifest_invalid_files_to_add_with_format(false).await;
test_apply_staging_manifest_invalid_files_to_add_with_format(true).await;
}
async fn test_apply_staging_manifest_invalid_files_to_add_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("invalid-files-to-add").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
})
.await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
engine
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(),
}),
)
.await
.unwrap();
let err = engine
.handle_request(
region_id,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(),
files_to_add: b"invalid".to_vec(),
}),
)
.await
.unwrap_err();
assert_matches!(
err.into_inner().as_any().downcast_ref::<Error>().unwrap(),
Error::SerdeJson { .. }
);
}
#[tokio::test]
async fn test_apply_staging_manifest_empty_files() {
common_telemetry::init_default_ut_logging();
test_apply_staging_manifest_empty_files_with_format(false).await;
test_apply_staging_manifest_empty_files_with_format(true).await;
}
async fn test_apply_staging_manifest_empty_files_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("empty-files").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
})
.await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
engine
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(),
}),
)
.await
.unwrap();
engine
.handle_request(
region_id,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(),
files_to_add: serde_json::to_vec::<Vec<FileMeta>>(&vec![]).unwrap(),
}),
)
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();
let manifest = region.manifest_ctx.manifest().await;
assert_eq!(manifest.files.len(), 0);
let staging_manifest = region.manifest_ctx.staging_manifest().await;
assert!(staging_manifest.is_none());
let staging_partition_expr = region.staging_partition_expr.lock().unwrap();
assert!(staging_partition_expr.is_none());
}

View File

@@ -45,6 +45,18 @@ pub enum RegionMetaAction {
Truncate(RegionTruncate),
}
impl RegionMetaAction {
/// Returns true if the action is a change action.
pub fn is_change(&self) -> bool {
matches!(self, RegionMetaAction::Change(_))
}
/// Returns true if the action is an edit action.
pub fn is_edit(&self) -> bool {
matches!(self, RegionMetaAction::Edit(_))
}
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionChange {
/// The metadata after changed.
@@ -340,51 +352,16 @@ pub struct RemovedFiles {
/// the files are removed from manifest. The timestamp is in milliseconds since unix epoch.
pub removed_at: i64,
/// The set of file ids that are removed.
#[serde(default)]
pub files: HashSet<RemovedFile>,
}
/// A removed file, which can be a data file(optional paired with a index file) or an outdated index file.
#[derive(Serialize, Hash, Clone, Debug, PartialEq, Eq)]
#[derive(Serialize, Deserialize, Hash, Clone, Debug, PartialEq, Eq)]
pub enum RemovedFile {
File(FileId, Option<IndexVersion>),
Index(FileId, IndexVersion),
}
/// Support deserialize from old format(just FileId as string) for backward compatibility
/// into current format(RemovedFile enum).
/// This is needed just in case there are old manifests with removed files recorded.
impl<'de> Deserialize<'de> for RemovedFile {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(untagged)]
enum CompatRemovedFile {
Enum(RemovedFileEnum),
FileId(FileId),
}
#[derive(Deserialize)]
enum RemovedFileEnum {
File(FileId, Option<IndexVersion>),
Index(FileId, IndexVersion),
}
let compat = CompatRemovedFile::deserialize(deserializer)?;
match compat {
CompatRemovedFile::FileId(file_id) => Ok(RemovedFile::File(file_id, None)),
CompatRemovedFile::Enum(e) => match e {
RemovedFileEnum::File(file_id, version) => Ok(RemovedFile::File(file_id, version)),
RemovedFileEnum::Index(file_id, version) => {
Ok(RemovedFile::Index(file_id, version))
}
},
}
}
}
impl RemovedFile {
pub fn file_id(&self) -> FileId {
match self {
@@ -473,7 +450,8 @@ impl RegionMetaActionList {
Self { actions }
}
pub fn into_region_edit(self) -> RegionEdit {
/// Split the actions into a region change and an edit.
pub fn split_region_change_and_edit(self) -> (Option<RegionChange>, RegionEdit) {
let mut edit = RegionEdit {
files_to_add: Vec::new(),
files_to_remove: Vec::new(),
@@ -483,31 +461,39 @@ impl RegionMetaActionList {
flushed_sequence: None,
committed_sequence: None,
};
let mut region_change = None;
for action in self.actions {
if let RegionMetaAction::Edit(region_edit) = action {
// Merge file adds/removes
edit.files_to_add.extend(region_edit.files_to_add);
edit.files_to_remove.extend(region_edit.files_to_remove);
// Max of flushed entry id / sequence
if let Some(eid) = region_edit.flushed_entry_id {
edit.flushed_entry_id = Some(edit.flushed_entry_id.map_or(eid, |v| v.max(eid)));
match action {
RegionMetaAction::Change(change) => {
region_change = Some(change);
}
if let Some(seq) = region_edit.flushed_sequence {
edit.flushed_sequence = Some(edit.flushed_sequence.map_or(seq, |v| v.max(seq)));
}
if let Some(seq) = region_edit.committed_sequence {
edit.committed_sequence =
Some(edit.committed_sequence.map_or(seq, |v| v.max(seq)));
}
// Prefer the latest non-none time window
if region_edit.compaction_time_window.is_some() {
edit.compaction_time_window = region_edit.compaction_time_window;
RegionMetaAction::Edit(region_edit) => {
// Merge file adds/removes
edit.files_to_add.extend(region_edit.files_to_add);
edit.files_to_remove.extend(region_edit.files_to_remove);
// Max of flushed entry id / sequence
if let Some(eid) = region_edit.flushed_entry_id {
edit.flushed_entry_id =
Some(edit.flushed_entry_id.map_or(eid, |v| v.max(eid)));
}
if let Some(seq) = region_edit.flushed_sequence {
edit.flushed_sequence =
Some(edit.flushed_sequence.map_or(seq, |v| v.max(seq)));
}
if let Some(seq) = region_edit.committed_sequence {
edit.committed_sequence =
Some(edit.committed_sequence.map_or(seq, |v| v.max(seq)));
}
// Prefer the latest non-none time window
if region_edit.compaction_time_window.is_some() {
edit.compaction_time_window = region_edit.compaction_time_window;
}
}
_ => {}
}
}
edit
(region_change, edit)
}
}
@@ -1043,115 +1029,4 @@ mod tests {
let deserialized: RegionChange = serde_json::from_str(&serialized).unwrap();
assert_eq!(deserialized.sst_format, FormatType::Flat);
}
#[test]
fn test_removed_file_compatibility() {
let file_id = FileId::random();
// Case 1: Deserialize from FileId string (Legacy format)
let json_str = format!("\"{}\"", file_id);
let removed_file: RemovedFile = serde_json::from_str(&json_str).unwrap();
assert_eq!(removed_file, RemovedFile::File(file_id, None));
// Case 2: Deserialize from new format (File)
let removed_file_v2 = RemovedFile::File(file_id, Some(10));
let json_v2 = serde_json::to_string(&removed_file_v2).unwrap();
let deserialized_v2: RemovedFile = serde_json::from_str(&json_v2).unwrap();
assert_eq!(removed_file_v2, deserialized_v2);
// Case 3: Deserialize from new format (Index)
let removed_index = RemovedFile::Index(file_id, 20);
let json_index = serde_json::to_string(&removed_index).unwrap();
let deserialized_index: RemovedFile = serde_json::from_str(&json_index).unwrap();
assert_eq!(removed_index, deserialized_index);
// Case 4: Round-trip serialization/deserialization of new enum format with None as index version
let removed_file = RemovedFile::File(file_id, None);
let json = serde_json::to_string(&removed_file).unwrap();
let deserialized: RemovedFile = serde_json::from_str(&json).unwrap();
assert_eq!(removed_file, deserialized);
// Case 5: Deserialize mixed set in RemovedFilesRecord
// This simulates a Set<RemovedFile> which might contain old strings or new objects if manually constructed or from old versions.
// Actually, if it was HashSet<FileId>, the JSON is ["id1", "id2"].
// If it is HashSet<RemovedFile>, the JSON is [{"File":...}, "id2"] if mixed (which shouldn't happen usually but good to test).
let json_set = format!("[\"{}\"]", file_id);
let removed_files_set: HashSet<RemovedFile> = serde_json::from_str(&json_set).unwrap();
assert!(removed_files_set.contains(&RemovedFile::File(file_id, None)));
}
/// It is intentionally acceptable to ignore the legacy `file_ids` field when
/// deserializing [`RemovedFiles`].
///
/// In older manifests, `file_ids` recorded the set of SSTable files that were
/// candidates for garbage collection at a given `removed_at` timestamp. The
/// newer format stores this information in the `files` field instead. When we
/// deserialize an old manifest entry into the new struct, we *drop* the
/// `file_ids` field instead of trying to recover or merge it.
///
/// Dropping `file_ids` does **not** risk deleting live data: a file is only
/// physically removed when it is both (a) no longer referenced by any region
/// metadata and (b) selected by the GC worker as safe to delete. Losing the
/// historical list of candidate `file_ids` merely means some obsolete files
/// may stay on disk longer than strictly necessary.
///
/// The GC worker periodically scans storage (e.g. by walking the data
/// directories and/or consulting the latest manifest) to discover files that
/// are no longer referenced anywhere. Any files that were only referenced via
/// the dropped `file_ids` field will be rediscovered during these scans and
/// eventually deleted. Thus the system converges to a correct, fully-collected
/// state without relying on `file_ids`, and the only potential impact of
/// ignoring it is temporary disk space overhead, not data loss.
#[test]
fn test_removed_files_backward_compatibility() {
// Define the old version struct with file_ids field
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
struct OldRemovedFiles {
pub removed_at: i64,
pub file_ids: HashSet<FileId>,
}
// Create an old version instance
let mut file_ids = HashSet::new();
file_ids.insert(FileId::random());
file_ids.insert(FileId::random());
let old_removed_files = OldRemovedFiles {
removed_at: 1234567890,
file_ids,
};
// Serialize the old version
let old_json = serde_json::to_string(&old_removed_files).unwrap();
// Try to deserialize into new version - file_ids should be ignored
let result: Result<RemovedFiles, _> = serde_json::from_str(&old_json);
// This should succeed and create a default RemovedFiles (empty files set)
assert!(result.is_ok(), "{:?}", result);
let removed_files = result.unwrap();
assert_eq!(removed_files.removed_at, 1234567890);
assert!(removed_files.files.is_empty());
// Test that new format still works
let file_id = FileId::random();
let new_json = format!(
r#"{{
"removed_at": 1234567890,
"files": ["{}"]
}}"#,
file_id
);
let result: Result<RemovedFiles, _> = serde_json::from_str(&new_json);
assert!(result.is_ok());
let removed_files = result.unwrap();
assert_eq!(removed_files.removed_at, 1234567890);
assert_eq!(removed_files.files.len(), 1);
assert!(
removed_files
.files
.contains(&RemovedFile::File(file_id, None))
);
}
}

View File

@@ -45,7 +45,7 @@ pub use utils::*;
use crate::access_layer::AccessLayerRef;
use crate::error::{
FlushableRegionStateSnafu, InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu,
RegionTruncatedSnafu, Result, UpdateManifestSnafu,
RegionTruncatedSnafu, Result, UnexpectedSnafu, UpdateManifestSnafu,
};
use crate::manifest::action::{
RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList,
@@ -102,6 +102,16 @@ pub enum RegionRoleState {
Follower,
}
impl RegionRoleState {
/// Converts the region role state to leader state if it is a leader state.
pub fn into_leader_state(self) -> Option<RegionLeaderState> {
match self {
RegionRoleState::Leader(leader_state) => Some(leader_state),
RegionRoleState::Follower => None,
}
}
}
/// Metadata and runtime status of a region.
///
/// Writing and reading a region follow a single-writer-multi-reader rule:
@@ -322,11 +332,8 @@ impl MitoRegion {
/// Sets the editing state.
/// You should call this method in the worker loop.
pub(crate) fn set_editing(&self) -> Result<()> {
self.compare_exchange_state(
RegionLeaderState::Writable,
RegionRoleState::Leader(RegionLeaderState::Editing),
)
pub(crate) fn set_editing(&self, expect: RegionLeaderState) -> Result<()> {
self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Editing))
}
/// Sets the staging state.
@@ -359,6 +366,7 @@ impl MitoRegion {
/// You should call this method in the worker loop.
/// Transitions from Staging to Writable state.
pub fn exit_staging(&self) -> Result<()> {
*self.staging_partition_expr.lock().unwrap() = None;
self.compare_exchange_state(
RegionLeaderState::Staging,
RegionRoleState::Leader(RegionLeaderState::Writable),
@@ -370,7 +378,8 @@ impl MitoRegion {
&self,
state: SettableRegionRoleState,
) -> Result<()> {
let mut manager = self.manifest_ctx.manifest_manager.write().await;
let mut manager: RwLockWriteGuard<'_, RegionManifestManager> =
self.manifest_ctx.manifest_manager.write().await;
let current_state = self.state();
match state {
@@ -705,6 +714,20 @@ impl MitoRegion {
return Ok(());
}
};
let expect_change = merged_actions.actions.iter().any(|a| a.is_change());
let expect_edit = merged_actions.actions.iter().any(|a| a.is_edit());
ensure!(
expect_change,
UnexpectedSnafu {
reason: "expect a change action in merged actions"
}
);
ensure!(
expect_edit,
UnexpectedSnafu {
reason: "expect an edit action in merged actions"
}
);
// Submit merged actions using the manifest manager's update method
// Pass the `false` so it saves to normal directory, not staging
@@ -716,12 +739,17 @@ impl MitoRegion {
);
// Apply the merged changes to in-memory version control
let merged_edit = merged_actions.into_region_edit();
let (merged_change, merged_edit) = merged_actions.split_region_change_and_edit();
// Safety: we have already ensured that there is a change action in the merged actions.
let new_metadata = merged_change.as_ref().unwrap().metadata.clone();
self.version_control.alter_schema(new_metadata);
self.version_control
.apply_edit(Some(merged_edit), &[], self.file_purger.clone());
// Clear all staging manifests and transit state
manager.store().clear_staging_manifests().await?;
if let Err(e) = manager.clear_staging_manifest_and_dir().await {
error!(e; "Failed to clear staging manifest dir for region {}", self.region_id);
}
self.exit_staging()?;
Ok(())

View File

@@ -37,10 +37,10 @@ use store_api::region_engine::{
MitoCopyRegionFromResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
};
use store_api::region_request::{
AffectedRows, EnterStagingRequest, RegionAlterRequest, RegionBuildIndexRequest,
RegionBulkInsertsRequest, RegionCatchupRequest, RegionCloseRequest, RegionCompactRequest,
RegionCreateRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest,
RegionTruncateRequest,
AffectedRows, ApplyStagingManifestRequest, EnterStagingRequest, RegionAlterRequest,
RegionBuildIndexRequest, RegionBulkInsertsRequest, RegionCatchupRequest, RegionCloseRequest,
RegionCompactRequest, RegionCreateRequest, RegionFlushRequest, RegionOpenRequest,
RegionRequest, RegionTruncateRequest,
};
use store_api::storage::{FileId, RegionId};
use tokio::sync::oneshot::{self, Receiver, Sender};
@@ -741,6 +741,11 @@ impl WorkerRequest {
sender: sender.into(),
request: region_bulk_inserts_request,
},
RegionRequest::ApplyStagingManifest(v) => WorkerRequest::Ddl(SenderDdlRequest {
region_id,
sender: sender.into(),
request: DdlRequest::ApplyStagingManifest(v),
}),
};
Ok((worker_request, receiver))
@@ -852,6 +857,7 @@ pub(crate) enum DdlRequest {
Truncate(RegionTruncateRequest),
Catchup((RegionCatchupRequest, Option<WalEntryReceiver>)),
EnterStaging(EnterStagingRequest),
ApplyStagingManifest(ApplyStagingManifestRequest),
}
/// Sender and Ddl request.
@@ -1080,6 +1086,8 @@ pub(crate) struct RegionEditResult {
pub(crate) result: Result<()>,
/// Whether region state need to be set to Writable after handling this request.
pub(crate) update_region_state: bool,
/// The region is in staging mode before handling this request.
pub(crate) is_staging: bool,
}
#[derive(Debug)]

View File

@@ -15,6 +15,7 @@
//! Structs and utilities for writing regions.
mod handle_alter;
mod handle_apply_staging;
mod handle_bulk_insert;
mod handle_catchup;
mod handle_close;
@@ -1005,7 +1006,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.await;
}
WorkerRequest::EditRegion(request) => {
self.handle_region_edit(request).await;
self.handle_region_edit(request);
}
WorkerRequest::Stop => {
debug_assert!(!self.running.load(Ordering::Relaxed));
@@ -1107,6 +1108,11 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.await;
continue;
}
DdlRequest::ApplyStagingManifest(req) => {
self.handle_apply_staging_manifest_request(ddl.region_id, req, ddl.sender)
.await;
continue;
}
};
ddl.sender.send(res);

View File

@@ -0,0 +1,140 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use chrono::Utc;
use common_telemetry::{debug, info};
use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::region_request::ApplyStagingManifestRequest;
use store_api::storage::RegionId;
use tokio::sync::oneshot;
use crate::error::{
RegionStateSnafu, SerdeJsonSnafu, StagingPartitionExprMismatchSnafu, UnexpectedSnafu,
};
use crate::manifest::action::RegionEdit;
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::request::{OptionOutputTx, RegionEditRequest};
use crate::sst::file::FileMeta;
use crate::worker::RegionWorkerLoop;
impl<S: LogStore> RegionWorkerLoop<S> {
pub(crate) async fn handle_apply_staging_manifest_request(
&mut self,
region_id: RegionId,
request: ApplyStagingManifestRequest,
sender: OptionOutputTx,
) {
let region = match self.regions.writable_region(region_id) {
Ok(region) => region,
Err(e) => {
sender.send(Err(e));
return;
}
};
if !region.is_staging() {
let manifest_partition_expr = region.metadata().partition_expr.as_ref().cloned();
let is_match = manifest_partition_expr.as_ref() == Some(&request.partition_expr);
debug!(
"region {} manifest partition expr: {:?}, request partition expr: {:?}",
region_id, manifest_partition_expr, request.partition_expr
);
if is_match {
// If current partition expr is already the same as the request,
// treats the region already applied the staging manifest.
info!(
"Region {} already applied the staging manifest, partition expr: {}, ignore the apply staging manifest request",
region_id, request.partition_expr
);
sender.send(Ok(0));
return;
}
sender.send(
RegionStateSnafu {
region_id,
state: region.state(),
expect: RegionRoleState::Leader(RegionLeaderState::Staging),
}
.fail(),
);
return;
}
let staging_partition_expr = region.staging_partition_expr.lock().unwrap().clone();
// If the partition expr mismatch, return error.
if staging_partition_expr.as_ref() != Some(&request.partition_expr) {
sender.send(
StagingPartitionExprMismatchSnafu {
manifest_expr: staging_partition_expr,
request_expr: request.partition_expr,
}
.fail(),
);
return;
}
let (tx, rx) = oneshot::channel();
let files_to_add = match serde_json::from_slice::<Vec<FileMeta>>(&request.files_to_add)
.context(SerdeJsonSnafu)
{
Ok(files_to_add) => files_to_add,
Err(e) => {
sender.send(Err(e));
return;
}
};
info!("Applying staging manifest request to region {}", region_id);
self.handle_region_edit(RegionEditRequest {
region_id,
edit: RegionEdit {
files_to_add,
files_to_remove: vec![],
timestamp_ms: Some(Utc::now().timestamp_millis()),
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
committed_sequence: None,
},
tx,
});
common_runtime::spawn_global(async move {
// Await the result from the region edit and forward the outcome to the original sender.
// If the operation completes successfully, respond with Ok(0); otherwise, respond with an appropriate error.
if let Ok(result) = rx.await {
let Ok(()) = result else {
sender.send(result.map(|_| 0));
return;
};
let mut manager = region.manifest_ctx.manifest_manager.write().await;
match region.exit_staging_on_success(&mut manager).await {
Ok(()) => {
sender.send(Ok(0));
}
Err(e) => sender.send(Err(e)),
}
} else {
sender.send(
UnexpectedSnafu {
reason: "edit region receiver channel closed",
}
.fail(),
);
}
});
}
}

View File

@@ -214,7 +214,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
impl<S> RegionWorkerLoop<S> {
/// Handles region edit request.
pub(crate) async fn handle_region_edit(&mut self, request: RegionEditRequest) {
pub(crate) fn handle_region_edit(&mut self, request: RegionEditRequest) {
let region_id = request.region_id;
let Some(region) = self.regions.get_region(region_id) else {
let _ = request.tx.send(RegionNotFoundSnafu { region_id }.fail());
@@ -246,8 +246,15 @@ impl<S> RegionWorkerLoop<S> {
file.sequence = NonZeroU64::new(file_sequence);
}
// Allow retrieving `is_staging` before spawn the edit region task.
let is_staging = region.is_staging();
let expect_state = if is_staging {
RegionLeaderState::Staging
} else {
RegionLeaderState::Writable
};
// Marks the region as editing.
if let Err(e) = region.set_editing() {
if let Err(e) = region.set_editing(expect_state) {
let _ = sender.send(Err(e));
return;
}
@@ -258,7 +265,8 @@ impl<S> RegionWorkerLoop<S> {
// Now the region is in editing state.
// Updates manifest in background.
common_runtime::spawn_global(async move {
let result = edit_region(&region, edit.clone(), cache_manager, listener).await;
let result =
edit_region(&region, edit.clone(), cache_manager, listener, is_staging).await;
let notify = WorkerRequest::Background {
region_id,
notify: BackgroundNotify::RegionEdit(RegionEditResult {
@@ -268,6 +276,7 @@ impl<S> RegionWorkerLoop<S> {
result,
// we always need to restore region state after region edit
update_region_state: true,
is_staging,
}),
};
@@ -299,29 +308,39 @@ impl<S> RegionWorkerLoop<S> {
}
};
let need_compaction =
edit_result.result.is_ok() && !edit_result.edit.files_to_add.is_empty();
let need_compaction = if edit_result.is_staging {
if edit_result.update_region_state {
// For staging regions, edits are not applied immediately,
// as they remain invisible until the region exits the staging state.
region.switch_state_to_staging(RegionLeaderState::Editing);
}
if edit_result.result.is_ok() {
// Applies the edit to the region.
region.version_control.apply_edit(
Some(edit_result.edit),
&[],
region.file_purger.clone(),
);
}
false
} else {
let need_compaction =
edit_result.result.is_ok() && !edit_result.edit.files_to_add.is_empty();
// Only apply the edit if the result is ok and region is not in staging state.
if edit_result.result.is_ok() {
// Applies the edit to the region.
region.version_control.apply_edit(
Some(edit_result.edit),
&[],
region.file_purger.clone(),
);
}
if edit_result.update_region_state {
region.switch_state_to_writable(RegionLeaderState::Editing);
}
if edit_result.update_region_state {
// Sets the region as writable.
region.switch_state_to_writable(RegionLeaderState::Editing);
}
need_compaction
};
let _ = edit_result.sender.send(edit_result.result);
if let Some(edit_queue) = self.region_edit_queues.get_mut(&edit_result.region_id)
&& let Some(request) = edit_queue.dequeue()
{
self.handle_region_edit(request).await;
self.handle_region_edit(request);
}
if need_compaction {
@@ -463,9 +482,9 @@ async fn edit_region(
edit: RegionEdit,
cache_manager: CacheManagerRef,
listener: WorkerListener,
is_staging: bool,
) -> Result<()> {
let region_id = region.region_id;
let is_staging = region.is_staging();
if let Some(write_cache) = cache_manager.write_cache() {
for file_meta in &edit.files_to_add {
let write_cache = write_cache.clone();
@@ -530,7 +549,10 @@ async fn edit_region(
}
}
info!("Applying {edit:?} to region {}", region_id);
info!(
"Applying {edit:?} to region {}, is_staging: {}",
region_id, is_staging
);
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));
region

View File

@@ -152,6 +152,7 @@ pub enum RegionRequest {
Catchup(RegionCatchupRequest),
BulkInserts(RegionBulkInsertsRequest),
EnterStaging(EnterStagingRequest),
ApplyStagingManifest(ApplyStagingManifestRequest),
}
impl RegionRequest {
@@ -182,6 +183,9 @@ impl RegionRequest {
reason: "ListMetadata request should be handled separately by RegionServer",
}
.fail(),
region_request::Body::ApplyStagingManifest(apply) => {
make_region_apply_staging_manifest(apply)
}
}
}
@@ -413,6 +417,28 @@ fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId,
)])
}
fn make_region_apply_staging_manifest(
api::v1::region::ApplyStagingManifestRequest {
region_id,
partition_expr,
files_to_add,
}: api::v1::region::ApplyStagingManifestRequest,
) -> Result<Vec<(RegionId, RegionRequest)>> {
let region_id = region_id.into();
let files_to_add = files_to_add
.context(UnexpectedSnafu {
reason: "'files_to_add' field is missing",
})?
.data;
Ok(vec![(
region_id,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr,
files_to_add,
}),
)])
}
/// Request to put data into a region.
#[derive(Debug)]
pub struct RegionPutRequest {
@@ -1428,6 +1454,30 @@ pub struct EnterStagingRequest {
pub partition_expr: String,
}
/// This request is used as part of the region repartition.
///
/// After a region has entered staging mode with a new region rule (partition
/// expression) and a separate process (for example, `remap_manifests`) has
/// generated the new file assignments for the staging region, this request
/// applies that generated manifest to the region.
///
/// In practice, this means:
/// - The `partition_expr` identifies the staging region rule that the manifest
/// was generated for.
/// - `files_to_add` carries the serialized metadata (such as file manifests or
/// file lists) that should be attached to the region under the new rule.
///
/// It should typically be called **after** the staging region has been
/// initialized by [`EnterStagingRequest`] and the new file layout has been
/// computed, to finalize the repartition operation.
#[derive(Debug, Clone)]
pub struct ApplyStagingManifestRequest {
/// The partition expression of the staging region.
pub partition_expr: String,
/// The files to add to the region.
pub files_to_add: Vec<u8>,
}
impl fmt::Display for RegionRequest {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
@@ -1445,6 +1495,7 @@ impl fmt::Display for RegionRequest {
RegionRequest::Catchup(_) => write!(f, "Catchup"),
RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
RegionRequest::EnterStaging(_) => write!(f, "EnterStaging"),
RegionRequest::ApplyStagingManifest(_) => write!(f, "ApplyStagingManifest"),
}
}
}