mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-24 23:19:57 +00:00
Compare commits
2 Commits
MichaelSco
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e1b18614ee | ||
|
|
4bae75ccdb |
@@ -111,7 +111,7 @@ We has the following policy for AI-assisted PRs:
|
||||
|
||||
### Why fully AI-generated PRs without understanding are not helpful
|
||||
|
||||
Today, AI tools cannot reliably make complex changes to DataFusion on their own, which is why we rely on pull requests and code review.
|
||||
Today, AI tools cannot reliably make complex changes to GreptimeDB on their own, which is why we rely on pull requests and code review.
|
||||
|
||||
The purposes of code review are:
|
||||
|
||||
|
||||
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -5464,7 +5464,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=173efe5ec62722089db7c531c0b0d470a072b915#173efe5ec62722089db7c531c0b0d470a072b915"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=520fa524f9d590752ea327683e82ffd65721b27c#520fa524f9d590752ea327683e82ffd65721b27c"
|
||||
dependencies = [
|
||||
"prost 0.13.5",
|
||||
"prost-types 0.13.5",
|
||||
|
||||
@@ -150,7 +150,7 @@ etcd-client = { version = "0.16.1", features = [
|
||||
fst = "0.4.7"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "173efe5ec62722089db7c531c0b0d470a072b915" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "520fa524f9d590752ea327683e82ffd65721b27c" }
|
||||
hex = "0.4"
|
||||
http = "1"
|
||||
humantime = "2.1"
|
||||
|
||||
@@ -1216,7 +1216,8 @@ impl RegionServerInner {
|
||||
| RegionRequest::Compact(_)
|
||||
| RegionRequest::Truncate(_)
|
||||
| RegionRequest::BuildIndex(_)
|
||||
| RegionRequest::EnterStaging(_) => RegionChange::None,
|
||||
| RegionRequest::EnterStaging(_)
|
||||
| RegionRequest::ApplyStagingManifest(_) => RegionChange::None,
|
||||
RegionRequest::Catchup(_) => RegionChange::Catchup,
|
||||
};
|
||||
|
||||
|
||||
@@ -220,6 +220,13 @@ impl RegionEngine for MetricEngine {
|
||||
UnsupportedRegionRequestSnafu { request }.fail()
|
||||
}
|
||||
}
|
||||
RegionRequest::ApplyStagingManifest(_) => {
|
||||
if self.inner.is_physical_region(region_id) {
|
||||
return self.inner.mito.handle_request(region_id, request).await;
|
||||
} else {
|
||||
UnsupportedRegionRequestSnafu { request }.fail()
|
||||
}
|
||||
}
|
||||
RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
|
||||
RegionRequest::Create(create) => {
|
||||
self.inner
|
||||
|
||||
@@ -162,6 +162,7 @@ impl CompactionTaskImpl {
|
||||
edit,
|
||||
result: Ok(()),
|
||||
update_region_state: false,
|
||||
is_staging: false,
|
||||
}),
|
||||
})
|
||||
.await;
|
||||
|
||||
@@ -76,6 +76,8 @@ mod copy_region_from_test;
|
||||
#[cfg(test)]
|
||||
mod remap_manifests_test;
|
||||
|
||||
#[cfg(test)]
|
||||
mod apply_staging_manifest_test;
|
||||
mod puffin_index;
|
||||
|
||||
use std::any::Any;
|
||||
|
||||
400
src/mito2/src/engine/apply_staging_manifest_test.rs
Normal file
400
src/mito2/src/engine/apply_staging_manifest_test.rs
Normal file
@@ -0,0 +1,400 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::assert_matches::assert_matches;
|
||||
use std::fs;
|
||||
|
||||
use api::v1::Rows;
|
||||
use datatypes::value::Value;
|
||||
use partition::expr::{PartitionExpr, col};
|
||||
use store_api::region_engine::{
|
||||
RegionEngine, RegionRole, RemapManifestsRequest, SettableRegionRoleState,
|
||||
};
|
||||
use store_api::region_request::{
|
||||
ApplyStagingManifestRequest, EnterStagingRequest, RegionFlushRequest, RegionRequest,
|
||||
};
|
||||
use store_api::storage::{FileId, RegionId};
|
||||
|
||||
use crate::config::MitoConfig;
|
||||
use crate::error::Error;
|
||||
use crate::manifest::action::RegionManifest;
|
||||
use crate::sst::file::FileMeta;
|
||||
use crate::test_util::{CreateRequestBuilder, TestEnv, build_rows, put_rows, rows_schema};
|
||||
|
||||
fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr {
|
||||
col(col_name)
|
||||
.gt_eq(Value::Int64(start))
|
||||
.and(col(col_name).lt(Value::Int64(end)))
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_apply_staging_manifest_invalid_region_state() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
test_apply_staging_manifest_invalid_region_state_with_format(false).await;
|
||||
test_apply_staging_manifest_invalid_region_state_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_apply_staging_manifest_invalid_region_state_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::with_prefix("invalid-region-state").await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new()
|
||||
.partition_expr_json(Some(range_expr("x", 0, 50).as_json_str().unwrap()))
|
||||
.build();
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Create(request))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Region is in leader state, apply staging manifest request should fail.
|
||||
let err = engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
|
||||
partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
|
||||
files_to_add: vec![],
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert_matches!(
|
||||
err.into_inner().as_any().downcast_ref::<Error>().unwrap(),
|
||||
Error::RegionState { .. }
|
||||
);
|
||||
|
||||
// Region is in leader state, apply staging manifest request should fail.
|
||||
engine
|
||||
.set_region_role(region_id, RegionRole::Follower)
|
||||
.unwrap();
|
||||
let err = engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
|
||||
partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
|
||||
files_to_add: vec![],
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert_matches!(
|
||||
err.into_inner().as_any().downcast_ref::<Error>().unwrap(),
|
||||
Error::RegionState { .. }
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_apply_staging_manifest_mismatched_partition_expr() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
test_apply_staging_manifest_mismatched_partition_expr_with_format(false).await;
|
||||
test_apply_staging_manifest_mismatched_partition_expr_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_apply_staging_manifest_mismatched_partition_expr_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::with_prefix("mismatched-partition-expr").await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Create(request))
|
||||
.await
|
||||
.unwrap();
|
||||
engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::EnterStaging(EnterStagingRequest {
|
||||
partition_expr: range_expr("x", 0, 50).as_json_str().unwrap(),
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let err = engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
|
||||
partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
|
||||
files_to_add: vec![],
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert_matches!(
|
||||
err.into_inner().as_any().downcast_ref::<Error>().unwrap(),
|
||||
Error::StagingPartitionExprMismatch { .. }
|
||||
)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_apply_staging_manifest_success() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
test_apply_staging_manifest_success_with_format(false).await;
|
||||
test_apply_staging_manifest_success_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_apply_staging_manifest_success_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::with_prefix("success").await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new()
|
||||
.partition_expr_json(Some(range_expr("tag_0", 0, 100).as_json_str().unwrap()))
|
||||
.build();
|
||||
let column_schemas = rows_schema(&request);
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Create(request))
|
||||
.await
|
||||
.unwrap();
|
||||
let new_region_id_1 = RegionId::new(1, 2);
|
||||
let new_region_id_2 = RegionId::new(1, 3);
|
||||
// Generate some data
|
||||
for i in 0..3 {
|
||||
let rows_data = Rows {
|
||||
schema: column_schemas.clone(),
|
||||
rows: build_rows(i * 10, (i + 1) * 10),
|
||||
};
|
||||
put_rows(&engine, region_id, rows_data).await;
|
||||
engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::Flush(RegionFlushRequest {
|
||||
row_group_size: None,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
engine
|
||||
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader)
|
||||
.await
|
||||
.unwrap();
|
||||
let result = engine
|
||||
.remap_manifests(RemapManifestsRequest {
|
||||
region_id,
|
||||
input_regions: vec![region_id],
|
||||
region_mapping: [(region_id, vec![new_region_id_1, new_region_id_2])]
|
||||
.into_iter()
|
||||
.collect(),
|
||||
new_partition_exprs: [
|
||||
(
|
||||
new_region_id_1,
|
||||
range_expr("tag_0", 0, 50).as_json_str().unwrap(),
|
||||
),
|
||||
(
|
||||
new_region_id_2,
|
||||
range_expr("tag_0", 50, 100).as_json_str().unwrap(),
|
||||
),
|
||||
]
|
||||
.into_iter()
|
||||
.collect(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result.new_manifests.len(), 2);
|
||||
let new_manifest_1 =
|
||||
serde_json::from_str::<RegionManifest>(&result.new_manifests[&new_region_id_1]).unwrap();
|
||||
let new_manifest_2 =
|
||||
serde_json::from_str::<RegionManifest>(&result.new_manifests[&new_region_id_2]).unwrap();
|
||||
assert_eq!(new_manifest_1.files.len(), 3);
|
||||
assert_eq!(new_manifest_2.files.len(), 3);
|
||||
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
engine
|
||||
.handle_request(new_region_id_1, RegionRequest::Create(request))
|
||||
.await
|
||||
.unwrap();
|
||||
engine
|
||||
.handle_request(
|
||||
new_region_id_1,
|
||||
RegionRequest::EnterStaging(EnterStagingRequest {
|
||||
partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(),
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut files_to_add = new_manifest_1.files.values().cloned().collect::<Vec<_>>();
|
||||
// Before apply staging manifest, the files should be empty
|
||||
let region = engine.get_region(new_region_id_1).unwrap();
|
||||
let manifest = region.manifest_ctx.manifest().await;
|
||||
assert_eq!(manifest.files.len(), 0);
|
||||
let staging_manifest = region.manifest_ctx.staging_manifest().await.unwrap();
|
||||
assert_eq!(staging_manifest.files.len(), 0);
|
||||
|
||||
engine
|
||||
.handle_request(
|
||||
new_region_id_1,
|
||||
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
|
||||
partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(),
|
||||
files_to_add: serde_json::to_vec(&files_to_add).unwrap(),
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
// After apply staging manifest, the files should be the same as the new manifest
|
||||
let region = engine.get_region(new_region_id_1).unwrap();
|
||||
let manifest = region.manifest_ctx.manifest().await;
|
||||
assert_eq!(manifest.files.len(), 3);
|
||||
assert!(region.is_writable());
|
||||
assert!(!region.is_staging());
|
||||
// The manifest partition expr should be the same as the request.
|
||||
assert_eq!(
|
||||
manifest.metadata.partition_expr.as_ref().unwrap(),
|
||||
&range_expr("tag_0", 0, 50).as_json_str().unwrap()
|
||||
);
|
||||
// The staging manifest should be cleared.
|
||||
let staging_manifest = region.manifest_ctx.staging_manifest().await;
|
||||
assert!(staging_manifest.is_none());
|
||||
// The staging partition expr should be cleared.
|
||||
assert!(region.staging_partition_expr.lock().unwrap().is_none());
|
||||
// The staging manifest directory should be empty.
|
||||
let data_home = env.data_home();
|
||||
let region_dir = format!("{}/data/test/1_0000000001", data_home.display());
|
||||
let staging_manifest_dir = format!("{}/staging/manifest", region_dir);
|
||||
let staging_files = fs::read_dir(&staging_manifest_dir)
|
||||
.map(|entries| entries.collect::<Result<Vec<_>, _>>().unwrap_or_default())
|
||||
.unwrap_or_default();
|
||||
assert_eq!(staging_files.len(), 0);
|
||||
|
||||
// Try to modify the file sequence.
|
||||
files_to_add.push(FileMeta {
|
||||
region_id,
|
||||
file_id: FileId::random(),
|
||||
..Default::default()
|
||||
});
|
||||
// This request will be ignored.
|
||||
engine
|
||||
.handle_request(
|
||||
new_region_id_1,
|
||||
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
|
||||
partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(),
|
||||
files_to_add: serde_json::to_vec(&files_to_add).unwrap(),
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
// The files number should not change.
|
||||
let region = engine.get_region(new_region_id_1).unwrap();
|
||||
let manifest = region.manifest_ctx.manifest().await;
|
||||
assert_eq!(manifest.files.len(), 3);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_apply_staging_manifest_invalid_files_to_add() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
test_apply_staging_manifest_invalid_files_to_add_with_format(false).await;
|
||||
test_apply_staging_manifest_invalid_files_to_add_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_apply_staging_manifest_invalid_files_to_add_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::with_prefix("invalid-files-to-add").await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Create(request))
|
||||
.await
|
||||
.unwrap();
|
||||
engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::EnterStaging(EnterStagingRequest {
|
||||
partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(),
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let err = engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
|
||||
partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(),
|
||||
files_to_add: b"invalid".to_vec(),
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert_matches!(
|
||||
err.into_inner().as_any().downcast_ref::<Error>().unwrap(),
|
||||
Error::SerdeJson { .. }
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_apply_staging_manifest_empty_files() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
test_apply_staging_manifest_empty_files_with_format(false).await;
|
||||
test_apply_staging_manifest_empty_files_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_apply_staging_manifest_empty_files_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::with_prefix("empty-files").await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Create(request))
|
||||
.await
|
||||
.unwrap();
|
||||
engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::EnterStaging(EnterStagingRequest {
|
||||
partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(),
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
|
||||
partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(),
|
||||
files_to_add: serde_json::to_vec::<Vec<FileMeta>>(&vec![]).unwrap(),
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let region = engine.get_region(region_id).unwrap();
|
||||
let manifest = region.manifest_ctx.manifest().await;
|
||||
assert_eq!(manifest.files.len(), 0);
|
||||
let staging_manifest = region.manifest_ctx.staging_manifest().await;
|
||||
assert!(staging_manifest.is_none());
|
||||
let staging_partition_expr = region.staging_partition_expr.lock().unwrap();
|
||||
assert!(staging_partition_expr.is_none());
|
||||
}
|
||||
@@ -45,6 +45,18 @@ pub enum RegionMetaAction {
|
||||
Truncate(RegionTruncate),
|
||||
}
|
||||
|
||||
impl RegionMetaAction {
|
||||
/// Returns true if the action is a change action.
|
||||
pub fn is_change(&self) -> bool {
|
||||
matches!(self, RegionMetaAction::Change(_))
|
||||
}
|
||||
|
||||
/// Returns true if the action is an edit action.
|
||||
pub fn is_edit(&self) -> bool {
|
||||
matches!(self, RegionMetaAction::Edit(_))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
|
||||
pub struct RegionChange {
|
||||
/// The metadata after changed.
|
||||
@@ -438,7 +450,8 @@ impl RegionMetaActionList {
|
||||
Self { actions }
|
||||
}
|
||||
|
||||
pub fn into_region_edit(self) -> RegionEdit {
|
||||
/// Split the actions into a region change and an edit.
|
||||
pub fn split_region_change_and_edit(self) -> (Option<RegionChange>, RegionEdit) {
|
||||
let mut edit = RegionEdit {
|
||||
files_to_add: Vec::new(),
|
||||
files_to_remove: Vec::new(),
|
||||
@@ -448,31 +461,39 @@ impl RegionMetaActionList {
|
||||
flushed_sequence: None,
|
||||
committed_sequence: None,
|
||||
};
|
||||
|
||||
let mut region_change = None;
|
||||
for action in self.actions {
|
||||
if let RegionMetaAction::Edit(region_edit) = action {
|
||||
// Merge file adds/removes
|
||||
edit.files_to_add.extend(region_edit.files_to_add);
|
||||
edit.files_to_remove.extend(region_edit.files_to_remove);
|
||||
// Max of flushed entry id / sequence
|
||||
if let Some(eid) = region_edit.flushed_entry_id {
|
||||
edit.flushed_entry_id = Some(edit.flushed_entry_id.map_or(eid, |v| v.max(eid)));
|
||||
match action {
|
||||
RegionMetaAction::Change(change) => {
|
||||
region_change = Some(change);
|
||||
}
|
||||
if let Some(seq) = region_edit.flushed_sequence {
|
||||
edit.flushed_sequence = Some(edit.flushed_sequence.map_or(seq, |v| v.max(seq)));
|
||||
}
|
||||
if let Some(seq) = region_edit.committed_sequence {
|
||||
edit.committed_sequence =
|
||||
Some(edit.committed_sequence.map_or(seq, |v| v.max(seq)));
|
||||
}
|
||||
// Prefer the latest non-none time window
|
||||
if region_edit.compaction_time_window.is_some() {
|
||||
edit.compaction_time_window = region_edit.compaction_time_window;
|
||||
RegionMetaAction::Edit(region_edit) => {
|
||||
// Merge file adds/removes
|
||||
edit.files_to_add.extend(region_edit.files_to_add);
|
||||
edit.files_to_remove.extend(region_edit.files_to_remove);
|
||||
// Max of flushed entry id / sequence
|
||||
if let Some(eid) = region_edit.flushed_entry_id {
|
||||
edit.flushed_entry_id =
|
||||
Some(edit.flushed_entry_id.map_or(eid, |v| v.max(eid)));
|
||||
}
|
||||
if let Some(seq) = region_edit.flushed_sequence {
|
||||
edit.flushed_sequence =
|
||||
Some(edit.flushed_sequence.map_or(seq, |v| v.max(seq)));
|
||||
}
|
||||
if let Some(seq) = region_edit.committed_sequence {
|
||||
edit.committed_sequence =
|
||||
Some(edit.committed_sequence.map_or(seq, |v| v.max(seq)));
|
||||
}
|
||||
// Prefer the latest non-none time window
|
||||
if region_edit.compaction_time_window.is_some() {
|
||||
edit.compaction_time_window = region_edit.compaction_time_window;
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
edit
|
||||
(region_change, edit)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -45,7 +45,7 @@ pub use utils::*;
|
||||
use crate::access_layer::AccessLayerRef;
|
||||
use crate::error::{
|
||||
FlushableRegionStateSnafu, InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu,
|
||||
RegionTruncatedSnafu, Result, UpdateManifestSnafu,
|
||||
RegionTruncatedSnafu, Result, UnexpectedSnafu, UpdateManifestSnafu,
|
||||
};
|
||||
use crate::manifest::action::{
|
||||
RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList,
|
||||
@@ -102,6 +102,16 @@ pub enum RegionRoleState {
|
||||
Follower,
|
||||
}
|
||||
|
||||
impl RegionRoleState {
|
||||
/// Converts the region role state to leader state if it is a leader state.
|
||||
pub fn into_leader_state(self) -> Option<RegionLeaderState> {
|
||||
match self {
|
||||
RegionRoleState::Leader(leader_state) => Some(leader_state),
|
||||
RegionRoleState::Follower => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Metadata and runtime status of a region.
|
||||
///
|
||||
/// Writing and reading a region follow a single-writer-multi-reader rule:
|
||||
@@ -322,11 +332,8 @@ impl MitoRegion {
|
||||
|
||||
/// Sets the editing state.
|
||||
/// You should call this method in the worker loop.
|
||||
pub(crate) fn set_editing(&self) -> Result<()> {
|
||||
self.compare_exchange_state(
|
||||
RegionLeaderState::Writable,
|
||||
RegionRoleState::Leader(RegionLeaderState::Editing),
|
||||
)
|
||||
pub(crate) fn set_editing(&self, expect: RegionLeaderState) -> Result<()> {
|
||||
self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Editing))
|
||||
}
|
||||
|
||||
/// Sets the staging state.
|
||||
@@ -359,6 +366,7 @@ impl MitoRegion {
|
||||
/// You should call this method in the worker loop.
|
||||
/// Transitions from Staging to Writable state.
|
||||
pub fn exit_staging(&self) -> Result<()> {
|
||||
*self.staging_partition_expr.lock().unwrap() = None;
|
||||
self.compare_exchange_state(
|
||||
RegionLeaderState::Staging,
|
||||
RegionRoleState::Leader(RegionLeaderState::Writable),
|
||||
@@ -370,7 +378,8 @@ impl MitoRegion {
|
||||
&self,
|
||||
state: SettableRegionRoleState,
|
||||
) -> Result<()> {
|
||||
let mut manager = self.manifest_ctx.manifest_manager.write().await;
|
||||
let mut manager: RwLockWriteGuard<'_, RegionManifestManager> =
|
||||
self.manifest_ctx.manifest_manager.write().await;
|
||||
let current_state = self.state();
|
||||
|
||||
match state {
|
||||
@@ -705,6 +714,20 @@ impl MitoRegion {
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
let expect_change = merged_actions.actions.iter().any(|a| a.is_change());
|
||||
let expect_edit = merged_actions.actions.iter().any(|a| a.is_edit());
|
||||
ensure!(
|
||||
expect_change,
|
||||
UnexpectedSnafu {
|
||||
reason: "expect a change action in merged actions"
|
||||
}
|
||||
);
|
||||
ensure!(
|
||||
expect_edit,
|
||||
UnexpectedSnafu {
|
||||
reason: "expect an edit action in merged actions"
|
||||
}
|
||||
);
|
||||
|
||||
// Submit merged actions using the manifest manager's update method
|
||||
// Pass the `false` so it saves to normal directory, not staging
|
||||
@@ -716,12 +739,17 @@ impl MitoRegion {
|
||||
);
|
||||
|
||||
// Apply the merged changes to in-memory version control
|
||||
let merged_edit = merged_actions.into_region_edit();
|
||||
let (merged_change, merged_edit) = merged_actions.split_region_change_and_edit();
|
||||
// Safety: we have already ensured that there is a change action in the merged actions.
|
||||
let new_metadata = merged_change.as_ref().unwrap().metadata.clone();
|
||||
self.version_control.alter_schema(new_metadata);
|
||||
self.version_control
|
||||
.apply_edit(Some(merged_edit), &[], self.file_purger.clone());
|
||||
|
||||
// Clear all staging manifests and transit state
|
||||
manager.store().clear_staging_manifests().await?;
|
||||
if let Err(e) = manager.clear_staging_manifest_and_dir().await {
|
||||
error!(e; "Failed to clear staging manifest dir for region {}", self.region_id);
|
||||
}
|
||||
self.exit_staging()?;
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -37,10 +37,10 @@ use store_api::region_engine::{
|
||||
MitoCopyRegionFromResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
|
||||
};
|
||||
use store_api::region_request::{
|
||||
AffectedRows, EnterStagingRequest, RegionAlterRequest, RegionBuildIndexRequest,
|
||||
RegionBulkInsertsRequest, RegionCatchupRequest, RegionCloseRequest, RegionCompactRequest,
|
||||
RegionCreateRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest,
|
||||
RegionTruncateRequest,
|
||||
AffectedRows, ApplyStagingManifestRequest, EnterStagingRequest, RegionAlterRequest,
|
||||
RegionBuildIndexRequest, RegionBulkInsertsRequest, RegionCatchupRequest, RegionCloseRequest,
|
||||
RegionCompactRequest, RegionCreateRequest, RegionFlushRequest, RegionOpenRequest,
|
||||
RegionRequest, RegionTruncateRequest,
|
||||
};
|
||||
use store_api::storage::{FileId, RegionId};
|
||||
use tokio::sync::oneshot::{self, Receiver, Sender};
|
||||
@@ -741,6 +741,11 @@ impl WorkerRequest {
|
||||
sender: sender.into(),
|
||||
request: region_bulk_inserts_request,
|
||||
},
|
||||
RegionRequest::ApplyStagingManifest(v) => WorkerRequest::Ddl(SenderDdlRequest {
|
||||
region_id,
|
||||
sender: sender.into(),
|
||||
request: DdlRequest::ApplyStagingManifest(v),
|
||||
}),
|
||||
};
|
||||
|
||||
Ok((worker_request, receiver))
|
||||
@@ -852,6 +857,7 @@ pub(crate) enum DdlRequest {
|
||||
Truncate(RegionTruncateRequest),
|
||||
Catchup((RegionCatchupRequest, Option<WalEntryReceiver>)),
|
||||
EnterStaging(EnterStagingRequest),
|
||||
ApplyStagingManifest(ApplyStagingManifestRequest),
|
||||
}
|
||||
|
||||
/// Sender and Ddl request.
|
||||
@@ -1080,6 +1086,8 @@ pub(crate) struct RegionEditResult {
|
||||
pub(crate) result: Result<()>,
|
||||
/// Whether region state need to be set to Writable after handling this request.
|
||||
pub(crate) update_region_state: bool,
|
||||
/// The region is in staging mode before handling this request.
|
||||
pub(crate) is_staging: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
//! Structs and utilities for writing regions.
|
||||
|
||||
mod handle_alter;
|
||||
mod handle_apply_staging;
|
||||
mod handle_bulk_insert;
|
||||
mod handle_catchup;
|
||||
mod handle_close;
|
||||
@@ -1005,7 +1006,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
.await;
|
||||
}
|
||||
WorkerRequest::EditRegion(request) => {
|
||||
self.handle_region_edit(request).await;
|
||||
self.handle_region_edit(request);
|
||||
}
|
||||
WorkerRequest::Stop => {
|
||||
debug_assert!(!self.running.load(Ordering::Relaxed));
|
||||
@@ -1107,6 +1108,11 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
.await;
|
||||
continue;
|
||||
}
|
||||
DdlRequest::ApplyStagingManifest(req) => {
|
||||
self.handle_apply_staging_manifest_request(ddl.region_id, req, ddl.sender)
|
||||
.await;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
ddl.sender.send(res);
|
||||
|
||||
140
src/mito2/src/worker/handle_apply_staging.rs
Normal file
140
src/mito2/src/worker/handle_apply_staging.rs
Normal file
@@ -0,0 +1,140 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use chrono::Utc;
|
||||
use common_telemetry::{debug, info};
|
||||
use snafu::ResultExt;
|
||||
use store_api::logstore::LogStore;
|
||||
use store_api::region_request::ApplyStagingManifestRequest;
|
||||
use store_api::storage::RegionId;
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
use crate::error::{
|
||||
RegionStateSnafu, SerdeJsonSnafu, StagingPartitionExprMismatchSnafu, UnexpectedSnafu,
|
||||
};
|
||||
use crate::manifest::action::RegionEdit;
|
||||
use crate::region::{RegionLeaderState, RegionRoleState};
|
||||
use crate::request::{OptionOutputTx, RegionEditRequest};
|
||||
use crate::sst::file::FileMeta;
|
||||
use crate::worker::RegionWorkerLoop;
|
||||
|
||||
impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
pub(crate) async fn handle_apply_staging_manifest_request(
|
||||
&mut self,
|
||||
region_id: RegionId,
|
||||
request: ApplyStagingManifestRequest,
|
||||
sender: OptionOutputTx,
|
||||
) {
|
||||
let region = match self.regions.writable_region(region_id) {
|
||||
Ok(region) => region,
|
||||
Err(e) => {
|
||||
sender.send(Err(e));
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if !region.is_staging() {
|
||||
let manifest_partition_expr = region.metadata().partition_expr.as_ref().cloned();
|
||||
let is_match = manifest_partition_expr.as_ref() == Some(&request.partition_expr);
|
||||
debug!(
|
||||
"region {} manifest partition expr: {:?}, request partition expr: {:?}",
|
||||
region_id, manifest_partition_expr, request.partition_expr
|
||||
);
|
||||
if is_match {
|
||||
// If current partition expr is already the same as the request,
|
||||
// treats the region already applied the staging manifest.
|
||||
info!(
|
||||
"Region {} already applied the staging manifest, partition expr: {}, ignore the apply staging manifest request",
|
||||
region_id, request.partition_expr
|
||||
);
|
||||
sender.send(Ok(0));
|
||||
return;
|
||||
}
|
||||
|
||||
sender.send(
|
||||
RegionStateSnafu {
|
||||
region_id,
|
||||
state: region.state(),
|
||||
expect: RegionRoleState::Leader(RegionLeaderState::Staging),
|
||||
}
|
||||
.fail(),
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
let staging_partition_expr = region.staging_partition_expr.lock().unwrap().clone();
|
||||
// If the partition expr mismatch, return error.
|
||||
if staging_partition_expr.as_ref() != Some(&request.partition_expr) {
|
||||
sender.send(
|
||||
StagingPartitionExprMismatchSnafu {
|
||||
manifest_expr: staging_partition_expr,
|
||||
request_expr: request.partition_expr,
|
||||
}
|
||||
.fail(),
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let files_to_add = match serde_json::from_slice::<Vec<FileMeta>>(&request.files_to_add)
|
||||
.context(SerdeJsonSnafu)
|
||||
{
|
||||
Ok(files_to_add) => files_to_add,
|
||||
Err(e) => {
|
||||
sender.send(Err(e));
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
info!("Applying staging manifest request to region {}", region_id);
|
||||
self.handle_region_edit(RegionEditRequest {
|
||||
region_id,
|
||||
edit: RegionEdit {
|
||||
files_to_add,
|
||||
files_to_remove: vec![],
|
||||
timestamp_ms: Some(Utc::now().timestamp_millis()),
|
||||
compaction_time_window: None,
|
||||
flushed_entry_id: None,
|
||||
flushed_sequence: None,
|
||||
committed_sequence: None,
|
||||
},
|
||||
tx,
|
||||
});
|
||||
|
||||
common_runtime::spawn_global(async move {
|
||||
// Await the result from the region edit and forward the outcome to the original sender.
|
||||
// If the operation completes successfully, respond with Ok(0); otherwise, respond with an appropriate error.
|
||||
if let Ok(result) = rx.await {
|
||||
let Ok(()) = result else {
|
||||
sender.send(result.map(|_| 0));
|
||||
return;
|
||||
};
|
||||
let mut manager = region.manifest_ctx.manifest_manager.write().await;
|
||||
match region.exit_staging_on_success(&mut manager).await {
|
||||
Ok(()) => {
|
||||
sender.send(Ok(0));
|
||||
}
|
||||
Err(e) => sender.send(Err(e)),
|
||||
}
|
||||
} else {
|
||||
sender.send(
|
||||
UnexpectedSnafu {
|
||||
reason: "edit region receiver channel closed",
|
||||
}
|
||||
.fail(),
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -214,7 +214,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
|
||||
impl<S> RegionWorkerLoop<S> {
|
||||
/// Handles region edit request.
|
||||
pub(crate) async fn handle_region_edit(&mut self, request: RegionEditRequest) {
|
||||
pub(crate) fn handle_region_edit(&mut self, request: RegionEditRequest) {
|
||||
let region_id = request.region_id;
|
||||
let Some(region) = self.regions.get_region(region_id) else {
|
||||
let _ = request.tx.send(RegionNotFoundSnafu { region_id }.fail());
|
||||
@@ -246,8 +246,15 @@ impl<S> RegionWorkerLoop<S> {
|
||||
file.sequence = NonZeroU64::new(file_sequence);
|
||||
}
|
||||
|
||||
// Allow retrieving `is_staging` before spawn the edit region task.
|
||||
let is_staging = region.is_staging();
|
||||
let expect_state = if is_staging {
|
||||
RegionLeaderState::Staging
|
||||
} else {
|
||||
RegionLeaderState::Writable
|
||||
};
|
||||
// Marks the region as editing.
|
||||
if let Err(e) = region.set_editing() {
|
||||
if let Err(e) = region.set_editing(expect_state) {
|
||||
let _ = sender.send(Err(e));
|
||||
return;
|
||||
}
|
||||
@@ -258,7 +265,8 @@ impl<S> RegionWorkerLoop<S> {
|
||||
// Now the region is in editing state.
|
||||
// Updates manifest in background.
|
||||
common_runtime::spawn_global(async move {
|
||||
let result = edit_region(®ion, edit.clone(), cache_manager, listener).await;
|
||||
let result =
|
||||
edit_region(®ion, edit.clone(), cache_manager, listener, is_staging).await;
|
||||
let notify = WorkerRequest::Background {
|
||||
region_id,
|
||||
notify: BackgroundNotify::RegionEdit(RegionEditResult {
|
||||
@@ -268,6 +276,7 @@ impl<S> RegionWorkerLoop<S> {
|
||||
result,
|
||||
// we always need to restore region state after region edit
|
||||
update_region_state: true,
|
||||
is_staging,
|
||||
}),
|
||||
};
|
||||
|
||||
@@ -299,29 +308,39 @@ impl<S> RegionWorkerLoop<S> {
|
||||
}
|
||||
};
|
||||
|
||||
let need_compaction =
|
||||
edit_result.result.is_ok() && !edit_result.edit.files_to_add.is_empty();
|
||||
let need_compaction = if edit_result.is_staging {
|
||||
if edit_result.update_region_state {
|
||||
// For staging regions, edits are not applied immediately,
|
||||
// as they remain invisible until the region exits the staging state.
|
||||
region.switch_state_to_staging(RegionLeaderState::Editing);
|
||||
}
|
||||
|
||||
if edit_result.result.is_ok() {
|
||||
// Applies the edit to the region.
|
||||
region.version_control.apply_edit(
|
||||
Some(edit_result.edit),
|
||||
&[],
|
||||
region.file_purger.clone(),
|
||||
);
|
||||
}
|
||||
false
|
||||
} else {
|
||||
let need_compaction =
|
||||
edit_result.result.is_ok() && !edit_result.edit.files_to_add.is_empty();
|
||||
// Only apply the edit if the result is ok and region is not in staging state.
|
||||
if edit_result.result.is_ok() {
|
||||
// Applies the edit to the region.
|
||||
region.version_control.apply_edit(
|
||||
Some(edit_result.edit),
|
||||
&[],
|
||||
region.file_purger.clone(),
|
||||
);
|
||||
}
|
||||
if edit_result.update_region_state {
|
||||
region.switch_state_to_writable(RegionLeaderState::Editing);
|
||||
}
|
||||
|
||||
if edit_result.update_region_state {
|
||||
// Sets the region as writable.
|
||||
region.switch_state_to_writable(RegionLeaderState::Editing);
|
||||
}
|
||||
need_compaction
|
||||
};
|
||||
|
||||
let _ = edit_result.sender.send(edit_result.result);
|
||||
|
||||
if let Some(edit_queue) = self.region_edit_queues.get_mut(&edit_result.region_id)
|
||||
&& let Some(request) = edit_queue.dequeue()
|
||||
{
|
||||
self.handle_region_edit(request).await;
|
||||
self.handle_region_edit(request);
|
||||
}
|
||||
|
||||
if need_compaction {
|
||||
@@ -463,9 +482,9 @@ async fn edit_region(
|
||||
edit: RegionEdit,
|
||||
cache_manager: CacheManagerRef,
|
||||
listener: WorkerListener,
|
||||
is_staging: bool,
|
||||
) -> Result<()> {
|
||||
let region_id = region.region_id;
|
||||
let is_staging = region.is_staging();
|
||||
if let Some(write_cache) = cache_manager.write_cache() {
|
||||
for file_meta in &edit.files_to_add {
|
||||
let write_cache = write_cache.clone();
|
||||
@@ -530,7 +549,10 @@ async fn edit_region(
|
||||
}
|
||||
}
|
||||
|
||||
info!("Applying {edit:?} to region {}", region_id);
|
||||
info!(
|
||||
"Applying {edit:?} to region {}, is_staging: {}",
|
||||
region_id, is_staging
|
||||
);
|
||||
|
||||
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));
|
||||
region
|
||||
|
||||
@@ -152,6 +152,7 @@ pub enum RegionRequest {
|
||||
Catchup(RegionCatchupRequest),
|
||||
BulkInserts(RegionBulkInsertsRequest),
|
||||
EnterStaging(EnterStagingRequest),
|
||||
ApplyStagingManifest(ApplyStagingManifestRequest),
|
||||
}
|
||||
|
||||
impl RegionRequest {
|
||||
@@ -182,6 +183,9 @@ impl RegionRequest {
|
||||
reason: "ListMetadata request should be handled separately by RegionServer",
|
||||
}
|
||||
.fail(),
|
||||
region_request::Body::ApplyStagingManifest(apply) => {
|
||||
make_region_apply_staging_manifest(apply)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -413,6 +417,28 @@ fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId,
|
||||
)])
|
||||
}
|
||||
|
||||
fn make_region_apply_staging_manifest(
|
||||
api::v1::region::ApplyStagingManifestRequest {
|
||||
region_id,
|
||||
partition_expr,
|
||||
files_to_add,
|
||||
}: api::v1::region::ApplyStagingManifestRequest,
|
||||
) -> Result<Vec<(RegionId, RegionRequest)>> {
|
||||
let region_id = region_id.into();
|
||||
let files_to_add = files_to_add
|
||||
.context(UnexpectedSnafu {
|
||||
reason: "'files_to_add' field is missing",
|
||||
})?
|
||||
.data;
|
||||
Ok(vec![(
|
||||
region_id,
|
||||
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
|
||||
partition_expr,
|
||||
files_to_add,
|
||||
}),
|
||||
)])
|
||||
}
|
||||
|
||||
/// Request to put data into a region.
|
||||
#[derive(Debug)]
|
||||
pub struct RegionPutRequest {
|
||||
@@ -1428,6 +1454,30 @@ pub struct EnterStagingRequest {
|
||||
pub partition_expr: String,
|
||||
}
|
||||
|
||||
/// This request is used as part of the region repartition.
|
||||
///
|
||||
/// After a region has entered staging mode with a new region rule (partition
|
||||
/// expression) and a separate process (for example, `remap_manifests`) has
|
||||
/// generated the new file assignments for the staging region, this request
|
||||
/// applies that generated manifest to the region.
|
||||
///
|
||||
/// In practice, this means:
|
||||
/// - The `partition_expr` identifies the staging region rule that the manifest
|
||||
/// was generated for.
|
||||
/// - `files_to_add` carries the serialized metadata (such as file manifests or
|
||||
/// file lists) that should be attached to the region under the new rule.
|
||||
///
|
||||
/// It should typically be called **after** the staging region has been
|
||||
/// initialized by [`EnterStagingRequest`] and the new file layout has been
|
||||
/// computed, to finalize the repartition operation.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ApplyStagingManifestRequest {
|
||||
/// The partition expression of the staging region.
|
||||
pub partition_expr: String,
|
||||
/// The files to add to the region.
|
||||
pub files_to_add: Vec<u8>,
|
||||
}
|
||||
|
||||
impl fmt::Display for RegionRequest {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
@@ -1445,6 +1495,7 @@ impl fmt::Display for RegionRequest {
|
||||
RegionRequest::Catchup(_) => write!(f, "Catchup"),
|
||||
RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
|
||||
RegionRequest::EnterStaging(_) => write!(f, "EnterStaging"),
|
||||
RegionRequest::ApplyStagingManifest(_) => write!(f, "ApplyStagingManifest"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user