Compare commits

2 Commits

Author  SHA1        Message                                             Date
LFC     6593996255  Update project name from DataFusion to GreptimeDB  2025-12-24 16:14:39 +08:00
LFC     4a1156ba0b  Fix typo in AI-assisted contributions policy       2025-12-24 16:11:03 +08:00
15 changed files with 58 additions and 745 deletions

View File

@@ -104,7 +104,7 @@ All commit messages SHOULD adhere to the [Conventional Commits specification](ht
## AI-Assisted contributions
-We has the following policy for AI-assisted PRs:
+We have the following policy for AI-assisted PRs:
- The PR author should **understand the core ideas** behind the implementation **end-to-end**, and be able to justify the design and code during review.
- **Calls out unknowns and assumptions**. It's okay to not fully understand some bits of AI generated code. You should comment on these cases and point them out to reviewers so that they can use their knowledge of the codebase to clear up any concerns. For example, you might comment "calling this function here seems to work but I'm not familiar with how it works internally, I wonder if there's a race condition if it is called concurrently".

Cargo.lock (generated)
View File

@@ -5464,7 +5464,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=520fa524f9d590752ea327683e82ffd65721b27c#520fa524f9d590752ea327683e82ffd65721b27c"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=173efe5ec62722089db7c531c0b0d470a072b915#173efe5ec62722089db7c531c0b0d470a072b915"
dependencies = [
"prost 0.13.5",
"prost-types 0.13.5",

View File

@@ -150,7 +150,7 @@ etcd-client = { version = "0.16.1", features = [
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "520fa524f9d590752ea327683e82ffd65721b27c" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "173efe5ec62722089db7c531c0b0d470a072b915" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -1216,8 +1216,7 @@ impl RegionServerInner {
| RegionRequest::Compact(_)
| RegionRequest::Truncate(_)
| RegionRequest::BuildIndex(_)
-| RegionRequest::EnterStaging(_)
-| RegionRequest::ApplyStagingManifest(_) => RegionChange::None,
+| RegionRequest::EnterStaging(_) => RegionChange::None,
RegionRequest::Catchup(_) => RegionChange::Catchup,
};

View File

@@ -220,13 +220,6 @@ impl RegionEngine for MetricEngine {
UnsupportedRegionRequestSnafu { request }.fail()
}
}
-RegionRequest::ApplyStagingManifest(_) => {
-if self.inner.is_physical_region(region_id) {
-return self.inner.mito.handle_request(region_id, request).await;
-} else {
-UnsupportedRegionRequestSnafu { request }.fail()
-}
-}
RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
RegionRequest::Create(create) => {
self.inner

View File

@@ -162,7 +162,6 @@ impl CompactionTaskImpl {
edit,
result: Ok(()),
update_region_state: false,
-is_staging: false,
}),
})
.await;

View File

@@ -76,8 +76,6 @@ mod copy_region_from_test;
#[cfg(test)]
mod remap_manifests_test;
-#[cfg(test)]
-mod apply_staging_manifest_test;
mod puffin_index;
use std::any::Any;

View File

@@ -1,400 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use std::fs;
use api::v1::Rows;
use datatypes::value::Value;
use partition::expr::{PartitionExpr, col};
use store_api::region_engine::{
RegionEngine, RegionRole, RemapManifestsRequest, SettableRegionRoleState,
};
use store_api::region_request::{
ApplyStagingManifestRequest, EnterStagingRequest, RegionFlushRequest, RegionRequest,
};
use store_api::storage::{FileId, RegionId};
use crate::config::MitoConfig;
use crate::error::Error;
use crate::manifest::action::RegionManifest;
use crate::sst::file::FileMeta;
use crate::test_util::{CreateRequestBuilder, TestEnv, build_rows, put_rows, rows_schema};
fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr {
col(col_name)
.gt_eq(Value::Int64(start))
.and(col(col_name).lt(Value::Int64(end)))
}
#[tokio::test]
async fn test_apply_staging_manifest_invalid_region_state() {
common_telemetry::init_default_ut_logging();
test_apply_staging_manifest_invalid_region_state_with_format(false).await;
test_apply_staging_manifest_invalid_region_state_with_format(true).await;
}
async fn test_apply_staging_manifest_invalid_region_state_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("invalid-region-state").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
})
.await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new()
.partition_expr_json(Some(range_expr("x", 0, 50).as_json_str().unwrap()))
.build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Region is in leader state, apply staging manifest request should fail.
let err = engine
.handle_request(
region_id,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
files_to_add: vec![],
}),
)
.await
.unwrap_err();
assert_matches!(
err.into_inner().as_any().downcast_ref::<Error>().unwrap(),
Error::RegionState { .. }
);
// Region is in leader state, apply staging manifest request should fail.
engine
.set_region_role(region_id, RegionRole::Follower)
.unwrap();
let err = engine
.handle_request(
region_id,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
files_to_add: vec![],
}),
)
.await
.unwrap_err();
assert_matches!(
err.into_inner().as_any().downcast_ref::<Error>().unwrap(),
Error::RegionState { .. }
);
}
#[tokio::test]
async fn test_apply_staging_manifest_mismatched_partition_expr() {
common_telemetry::init_default_ut_logging();
test_apply_staging_manifest_mismatched_partition_expr_with_format(false).await;
test_apply_staging_manifest_mismatched_partition_expr_with_format(true).await;
}
async fn test_apply_staging_manifest_mismatched_partition_expr_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("mismatched-partition-expr").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
})
.await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
engine
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: range_expr("x", 0, 50).as_json_str().unwrap(),
}),
)
.await
.unwrap();
let err = engine
.handle_request(
region_id,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
files_to_add: vec![],
}),
)
.await
.unwrap_err();
assert_matches!(
err.into_inner().as_any().downcast_ref::<Error>().unwrap(),
Error::StagingPartitionExprMismatch { .. }
)
}
#[tokio::test]
async fn test_apply_staging_manifest_success() {
common_telemetry::init_default_ut_logging();
test_apply_staging_manifest_success_with_format(false).await;
test_apply_staging_manifest_success_with_format(true).await;
}
async fn test_apply_staging_manifest_success_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("success").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
})
.await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new()
.partition_expr_json(Some(range_expr("tag_0", 0, 100).as_json_str().unwrap()))
.build();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let new_region_id_1 = RegionId::new(1, 2);
let new_region_id_2 = RegionId::new(1, 3);
// Generate some data
for i in 0..3 {
let rows_data = Rows {
schema: column_schemas.clone(),
rows: build_rows(i * 10, (i + 1) * 10),
};
put_rows(&engine, region_id, rows_data).await;
engine
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
)
.await
.unwrap();
}
engine
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader)
.await
.unwrap();
let result = engine
.remap_manifests(RemapManifestsRequest {
region_id,
input_regions: vec![region_id],
region_mapping: [(region_id, vec![new_region_id_1, new_region_id_2])]
.into_iter()
.collect(),
new_partition_exprs: [
(
new_region_id_1,
range_expr("tag_0", 0, 50).as_json_str().unwrap(),
),
(
new_region_id_2,
range_expr("tag_0", 50, 100).as_json_str().unwrap(),
),
]
.into_iter()
.collect(),
})
.await
.unwrap();
assert_eq!(result.new_manifests.len(), 2);
let new_manifest_1 =
serde_json::from_str::<RegionManifest>(&result.new_manifests[&new_region_id_1]).unwrap();
let new_manifest_2 =
serde_json::from_str::<RegionManifest>(&result.new_manifests[&new_region_id_2]).unwrap();
assert_eq!(new_manifest_1.files.len(), 3);
assert_eq!(new_manifest_2.files.len(), 3);
let request = CreateRequestBuilder::new().build();
engine
.handle_request(new_region_id_1, RegionRequest::Create(request))
.await
.unwrap();
engine
.handle_request(
new_region_id_1,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(),
}),
)
.await
.unwrap();
let mut files_to_add = new_manifest_1.files.values().cloned().collect::<Vec<_>>();
// Before apply staging manifest, the files should be empty
let region = engine.get_region(new_region_id_1).unwrap();
let manifest = region.manifest_ctx.manifest().await;
assert_eq!(manifest.files.len(), 0);
let staging_manifest = region.manifest_ctx.staging_manifest().await.unwrap();
assert_eq!(staging_manifest.files.len(), 0);
engine
.handle_request(
new_region_id_1,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(),
files_to_add: serde_json::to_vec(&files_to_add).unwrap(),
}),
)
.await
.unwrap();
// After apply staging manifest, the files should be the same as the new manifest
let region = engine.get_region(new_region_id_1).unwrap();
let manifest = region.manifest_ctx.manifest().await;
assert_eq!(manifest.files.len(), 3);
assert!(region.is_writable());
assert!(!region.is_staging());
// The manifest partition expr should be the same as the request.
assert_eq!(
manifest.metadata.partition_expr.as_ref().unwrap(),
&range_expr("tag_0", 0, 50).as_json_str().unwrap()
);
// The staging manifest should be cleared.
let staging_manifest = region.manifest_ctx.staging_manifest().await;
assert!(staging_manifest.is_none());
// The staging partition expr should be cleared.
assert!(region.staging_partition_expr.lock().unwrap().is_none());
// The staging manifest directory should be empty.
let data_home = env.data_home();
let region_dir = format!("{}/data/test/1_0000000001", data_home.display());
let staging_manifest_dir = format!("{}/staging/manifest", region_dir);
let staging_files = fs::read_dir(&staging_manifest_dir)
.map(|entries| entries.collect::<Result<Vec<_>, _>>().unwrap_or_default())
.unwrap_or_default();
assert_eq!(staging_files.len(), 0);
// Try to modify the file sequence.
files_to_add.push(FileMeta {
region_id,
file_id: FileId::random(),
..Default::default()
});
// This request will be ignored.
engine
.handle_request(
new_region_id_1,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(),
files_to_add: serde_json::to_vec(&files_to_add).unwrap(),
}),
)
.await
.unwrap();
// The files number should not change.
let region = engine.get_region(new_region_id_1).unwrap();
let manifest = region.manifest_ctx.manifest().await;
assert_eq!(manifest.files.len(), 3);
}
#[tokio::test]
async fn test_apply_staging_manifest_invalid_files_to_add() {
common_telemetry::init_default_ut_logging();
test_apply_staging_manifest_invalid_files_to_add_with_format(false).await;
test_apply_staging_manifest_invalid_files_to_add_with_format(true).await;
}
async fn test_apply_staging_manifest_invalid_files_to_add_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("invalid-files-to-add").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
})
.await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
engine
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(),
}),
)
.await
.unwrap();
let err = engine
.handle_request(
region_id,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(),
files_to_add: b"invalid".to_vec(),
}),
)
.await
.unwrap_err();
assert_matches!(
err.into_inner().as_any().downcast_ref::<Error>().unwrap(),
Error::SerdeJson { .. }
);
}
#[tokio::test]
async fn test_apply_staging_manifest_empty_files() {
common_telemetry::init_default_ut_logging();
test_apply_staging_manifest_empty_files_with_format(false).await;
test_apply_staging_manifest_empty_files_with_format(true).await;
}
async fn test_apply_staging_manifest_empty_files_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("empty-files").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
})
.await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
engine
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(),
}),
)
.await
.unwrap();
engine
.handle_request(
region_id,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(),
files_to_add: serde_json::to_vec::<Vec<FileMeta>>(&vec![]).unwrap(),
}),
)
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();
let manifest = region.manifest_ctx.manifest().await;
assert_eq!(manifest.files.len(), 0);
let staging_manifest = region.manifest_ctx.staging_manifest().await;
assert!(staging_manifest.is_none());
let staging_partition_expr = region.staging_partition_expr.lock().unwrap();
assert!(staging_partition_expr.is_none());
}

View File

@@ -45,18 +45,6 @@ pub enum RegionMetaAction {
Truncate(RegionTruncate),
}
-impl RegionMetaAction {
-/// Returns true if the action is a change action.
-pub fn is_change(&self) -> bool {
-matches!(self, RegionMetaAction::Change(_))
-}
-/// Returns true if the action is an edit action.
-pub fn is_edit(&self) -> bool {
-matches!(self, RegionMetaAction::Edit(_))
-}
-}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionChange {
/// The metadata after changed.
@@ -450,8 +438,7 @@ impl RegionMetaActionList {
Self { actions }
}
-/// Split the actions into a region change and an edit.
-pub fn split_region_change_and_edit(self) -> (Option<RegionChange>, RegionEdit) {
+pub fn into_region_edit(self) -> RegionEdit {
let mut edit = RegionEdit {
files_to_add: Vec::new(),
files_to_remove: Vec::new(),
@@ -461,39 +448,31 @@ impl RegionMetaActionList {
flushed_sequence: None,
committed_sequence: None,
};
-        let mut region_change = None;
        for action in self.actions {
-            match action {
-                RegionMetaAction::Change(change) => {
-                    region_change = Some(change);
-                }
-                RegionMetaAction::Edit(region_edit) => {
-                    // Merge file adds/removes
-                    edit.files_to_add.extend(region_edit.files_to_add);
-                    edit.files_to_remove.extend(region_edit.files_to_remove);
-                    // Max of flushed entry id / sequence
-                    if let Some(eid) = region_edit.flushed_entry_id {
-                        edit.flushed_entry_id =
-                            Some(edit.flushed_entry_id.map_or(eid, |v| v.max(eid)));
-                    }
-                    if let Some(seq) = region_edit.flushed_sequence {
-                        edit.flushed_sequence =
-                            Some(edit.flushed_sequence.map_or(seq, |v| v.max(seq)));
-                    }
-                    if let Some(seq) = region_edit.committed_sequence {
-                        edit.committed_sequence =
-                            Some(edit.committed_sequence.map_or(seq, |v| v.max(seq)));
-                    }
-                    // Prefer the latest non-none time window
-                    if region_edit.compaction_time_window.is_some() {
-                        edit.compaction_time_window = region_edit.compaction_time_window;
-                    }
-                }
-                _ => {}
-            }
+            if let RegionMetaAction::Edit(region_edit) = action {
+                // Merge file adds/removes
+                edit.files_to_add.extend(region_edit.files_to_add);
+                edit.files_to_remove.extend(region_edit.files_to_remove);
+                // Max of flushed entry id / sequence
+                if let Some(eid) = region_edit.flushed_entry_id {
+                    edit.flushed_entry_id = Some(edit.flushed_entry_id.map_or(eid, |v| v.max(eid)));
+                }
+                if let Some(seq) = region_edit.flushed_sequence {
+                    edit.flushed_sequence = Some(edit.flushed_sequence.map_or(seq, |v| v.max(seq)));
+                }
+                if let Some(seq) = region_edit.committed_sequence {
+                    edit.committed_sequence =
+                        Some(edit.committed_sequence.map_or(seq, |v| v.max(seq)));
+                }
+                // Prefer the latest non-none time window
+                if region_edit.compaction_time_window.is_some() {
+                    edit.compaction_time_window = region_edit.compaction_time_window;
+                }
+            }
        }
-        (region_change, edit)
+        edit
}
}
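The refactor in this hunk replaces split_region_change_and_edit with into_region_edit, but the merge rule itself is unchanged: concatenate files_to_add/files_to_remove, keep the maximum of each optional entry id or sequence, and keep the latest non-None compaction time window. A rough standalone illustration of that rule, using simplified stand-in types rather than the crate's RegionEdit (the names EditSketch and merge are mine, for illustration only):

// Simplified stand-in for the fields merged above.
#[derive(Default, Debug)]
struct EditSketch {
    files_to_add: Vec<String>,
    files_to_remove: Vec<String>,
    flushed_entry_id: Option<u64>,
    flushed_sequence: Option<u64>,
    compaction_time_window: Option<u64>,
}

fn merge(mut acc: EditSketch, next: EditSketch) -> EditSketch {
    // Concatenate file lists.
    acc.files_to_add.extend(next.files_to_add);
    acc.files_to_remove.extend(next.files_to_remove);
    // Keep the maximum of each optional counter.
    if let Some(v) = next.flushed_entry_id {
        acc.flushed_entry_id = Some(acc.flushed_entry_id.map_or(v, |cur| cur.max(v)));
    }
    if let Some(v) = next.flushed_sequence {
        acc.flushed_sequence = Some(acc.flushed_sequence.map_or(v, |cur| cur.max(v)));
    }
    // Prefer the latest non-None time window.
    if next.compaction_time_window.is_some() {
        acc.compaction_time_window = next.compaction_time_window;
    }
    acc
}

fn main() {
    let merged = [
        EditSketch { flushed_entry_id: Some(3), ..Default::default() },
        EditSketch { flushed_entry_id: Some(7), compaction_time_window: Some(60), ..Default::default() },
    ]
    .into_iter()
    .fold(EditSketch::default(), merge);
    assert_eq!(merged.flushed_entry_id, Some(7));
    assert_eq!(merged.compaction_time_window, Some(60));
    println!("{merged:?}");
}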

View File

@@ -45,7 +45,7 @@ pub use utils::*;
use crate::access_layer::AccessLayerRef;
use crate::error::{
FlushableRegionStateSnafu, InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu,
-RegionTruncatedSnafu, Result, UnexpectedSnafu, UpdateManifestSnafu,
+RegionTruncatedSnafu, Result, UpdateManifestSnafu,
};
use crate::manifest::action::{
RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList,
@@ -102,16 +102,6 @@ pub enum RegionRoleState {
Follower,
}
-impl RegionRoleState {
-/// Converts the region role state to leader state if it is a leader state.
-pub fn into_leader_state(self) -> Option<RegionLeaderState> {
-match self {
-RegionRoleState::Leader(leader_state) => Some(leader_state),
-RegionRoleState::Follower => None,
-}
-}
-}
/// Metadata and runtime status of a region.
///
/// Writing and reading a region follow a single-writer-multi-reader rule:
@@ -332,8 +322,11 @@ impl MitoRegion {
/// Sets the editing state.
/// You should call this method in the worker loop.
-pub(crate) fn set_editing(&self, expect: RegionLeaderState) -> Result<()> {
-self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Editing))
+pub(crate) fn set_editing(&self) -> Result<()> {
+self.compare_exchange_state(
+RegionLeaderState::Writable,
+RegionRoleState::Leader(RegionLeaderState::Editing),
+)
}
/// Sets the staging state.
@@ -366,7 +359,6 @@ impl MitoRegion {
/// You should call this method in the worker loop.
/// Transitions from Staging to Writable state.
pub fn exit_staging(&self) -> Result<()> {
-*self.staging_partition_expr.lock().unwrap() = None;
self.compare_exchange_state(
RegionLeaderState::Staging,
RegionRoleState::Leader(RegionLeaderState::Writable),
@@ -378,8 +370,7 @@ impl MitoRegion {
&self,
state: SettableRegionRoleState,
) -> Result<()> {
-let mut manager: RwLockWriteGuard<'_, RegionManifestManager> =
-self.manifest_ctx.manifest_manager.write().await;
+let mut manager = self.manifest_ctx.manifest_manager.write().await;
let current_state = self.state();
match state {
@@ -714,20 +705,6 @@ impl MitoRegion {
return Ok(());
}
};
-let expect_change = merged_actions.actions.iter().any(|a| a.is_change());
-let expect_edit = merged_actions.actions.iter().any(|a| a.is_edit());
-ensure!(
-expect_change,
-UnexpectedSnafu {
-reason: "expect a change action in merged actions"
-}
-);
-ensure!(
-expect_edit,
-UnexpectedSnafu {
-reason: "expect an edit action in merged actions"
-}
-);
// Submit merged actions using the manifest manager's update method
// Pass the `false` so it saves to normal directory, not staging
@@ -739,17 +716,12 @@ impl MitoRegion {
);
// Apply the merged changes to in-memory version control
-let (merged_change, merged_edit) = merged_actions.split_region_change_and_edit();
-// Safety: we have already ensured that there is a change action in the merged actions.
-let new_metadata = merged_change.as_ref().unwrap().metadata.clone();
-self.version_control.alter_schema(new_metadata);
+let merged_edit = merged_actions.into_region_edit();
self.version_control
.apply_edit(Some(merged_edit), &[], self.file_purger.clone());
// Clear all staging manifests and transit state
-if let Err(e) = manager.clear_staging_manifest_and_dir().await {
-error!(e; "Failed to clear staging manifest dir for region {}", self.region_id);
-}
+manager.store().clear_staging_manifests().await?;
self.exit_staging()?;
Ok(())

View File

@@ -37,10 +37,10 @@ use store_api::region_engine::{
MitoCopyRegionFromResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
};
use store_api::region_request::{
-AffectedRows, ApplyStagingManifestRequest, EnterStagingRequest, RegionAlterRequest,
-RegionBuildIndexRequest, RegionBulkInsertsRequest, RegionCatchupRequest, RegionCloseRequest,
-RegionCompactRequest, RegionCreateRequest, RegionFlushRequest, RegionOpenRequest,
-RegionRequest, RegionTruncateRequest,
+AffectedRows, EnterStagingRequest, RegionAlterRequest, RegionBuildIndexRequest,
+RegionBulkInsertsRequest, RegionCatchupRequest, RegionCloseRequest, RegionCompactRequest,
+RegionCreateRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest,
+RegionTruncateRequest,
};
use store_api::storage::{FileId, RegionId};
use tokio::sync::oneshot::{self, Receiver, Sender};
@@ -741,11 +741,6 @@ impl WorkerRequest {
sender: sender.into(),
request: region_bulk_inserts_request,
},
-RegionRequest::ApplyStagingManifest(v) => WorkerRequest::Ddl(SenderDdlRequest {
-region_id,
-sender: sender.into(),
-request: DdlRequest::ApplyStagingManifest(v),
-}),
};
Ok((worker_request, receiver))
@@ -857,7 +852,6 @@ pub(crate) enum DdlRequest {
Truncate(RegionTruncateRequest),
Catchup((RegionCatchupRequest, Option<WalEntryReceiver>)),
EnterStaging(EnterStagingRequest),
-ApplyStagingManifest(ApplyStagingManifestRequest),
}
/// Sender and Ddl request.
@@ -1086,8 +1080,6 @@ pub(crate) struct RegionEditResult {
pub(crate) result: Result<()>,
/// Whether region state need to be set to Writable after handling this request.
pub(crate) update_region_state: bool,
-/// The region is in staging mode before handling this request.
-pub(crate) is_staging: bool,
}
#[derive(Debug)]

View File

@@ -15,7 +15,6 @@
//! Structs and utilities for writing regions.
mod handle_alter;
-mod handle_apply_staging;
mod handle_bulk_insert;
mod handle_catchup;
mod handle_close;
@@ -1006,7 +1005,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.await;
}
WorkerRequest::EditRegion(request) => {
-self.handle_region_edit(request);
+self.handle_region_edit(request).await;
}
WorkerRequest::Stop => {
debug_assert!(!self.running.load(Ordering::Relaxed));
@@ -1108,11 +1107,6 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.await;
continue;
}
-DdlRequest::ApplyStagingManifest(req) => {
-self.handle_apply_staging_manifest_request(ddl.region_id, req, ddl.sender)
-.await;
-continue;
-}
};
ddl.sender.send(res);

View File

@@ -1,140 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use chrono::Utc;
use common_telemetry::{debug, info};
use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::region_request::ApplyStagingManifestRequest;
use store_api::storage::RegionId;
use tokio::sync::oneshot;
use crate::error::{
RegionStateSnafu, SerdeJsonSnafu, StagingPartitionExprMismatchSnafu, UnexpectedSnafu,
};
use crate::manifest::action::RegionEdit;
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::request::{OptionOutputTx, RegionEditRequest};
use crate::sst::file::FileMeta;
use crate::worker::RegionWorkerLoop;
impl<S: LogStore> RegionWorkerLoop<S> {
pub(crate) async fn handle_apply_staging_manifest_request(
&mut self,
region_id: RegionId,
request: ApplyStagingManifestRequest,
sender: OptionOutputTx,
) {
let region = match self.regions.writable_region(region_id) {
Ok(region) => region,
Err(e) => {
sender.send(Err(e));
return;
}
};
if !region.is_staging() {
let manifest_partition_expr = region.metadata().partition_expr.as_ref().cloned();
let is_match = manifest_partition_expr.as_ref() == Some(&request.partition_expr);
debug!(
"region {} manifest partition expr: {:?}, request partition expr: {:?}",
region_id, manifest_partition_expr, request.partition_expr
);
if is_match {
// If current partition expr is already the same as the request,
// treats the region already applied the staging manifest.
info!(
"Region {} already applied the staging manifest, partition expr: {}, ignore the apply staging manifest request",
region_id, request.partition_expr
);
sender.send(Ok(0));
return;
}
sender.send(
RegionStateSnafu {
region_id,
state: region.state(),
expect: RegionRoleState::Leader(RegionLeaderState::Staging),
}
.fail(),
);
return;
}
let staging_partition_expr = region.staging_partition_expr.lock().unwrap().clone();
// If the partition expr mismatch, return error.
if staging_partition_expr.as_ref() != Some(&request.partition_expr) {
sender.send(
StagingPartitionExprMismatchSnafu {
manifest_expr: staging_partition_expr,
request_expr: request.partition_expr,
}
.fail(),
);
return;
}
let (tx, rx) = oneshot::channel();
let files_to_add = match serde_json::from_slice::<Vec<FileMeta>>(&request.files_to_add)
.context(SerdeJsonSnafu)
{
Ok(files_to_add) => files_to_add,
Err(e) => {
sender.send(Err(e));
return;
}
};
info!("Applying staging manifest request to region {}", region_id);
self.handle_region_edit(RegionEditRequest {
region_id,
edit: RegionEdit {
files_to_add,
files_to_remove: vec![],
timestamp_ms: Some(Utc::now().timestamp_millis()),
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
committed_sequence: None,
},
tx,
});
common_runtime::spawn_global(async move {
// Await the result from the region edit and forward the outcome to the original sender.
// If the operation completes successfully, respond with Ok(0); otherwise, respond with an appropriate error.
if let Ok(result) = rx.await {
let Ok(()) = result else {
sender.send(result.map(|_| 0));
return;
};
let mut manager = region.manifest_ctx.manifest_manager.write().await;
match region.exit_staging_on_success(&mut manager).await {
Ok(()) => {
sender.send(Ok(0));
}
Err(e) => sender.send(Err(e)),
}
} else {
sender.send(
UnexpectedSnafu {
reason: "edit region receiver channel closed",
}
.fail(),
);
}
});
}
}
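The deleted handler above follows a forward-the-result pattern: it validates the staging state and partition expression, deserializes files_to_add into Vec<FileMeta>, reuses the existing region-edit path by handing it a oneshot sender, and spawns a task that awaits the edit outcome and relays it (Ok(0) or an error) to the original caller. A minimal standalone sketch of that oneshot relay, assuming tokio is available; the names here are illustrative, not the crate's:

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    // Channel standing in for the result channel handed to the region-edit path.
    let (tx, rx) = oneshot::channel::<Result<(), String>>();

    // Stand-in for the worker applying the edit and reporting back.
    tokio::spawn(async move {
        let _ = tx.send(Ok(()));
    });

    // The removed handler spawned a task like this to await the edit result and
    // forward the outcome to the original sender.
    let reply: Result<u64, String> = match rx.await {
        Ok(Ok(())) => Ok(0),  // edit applied; report zero affected rows
        Ok(Err(e)) => Err(e), // edit failed; propagate the error
        Err(_) => Err("edit region receiver channel closed".to_string()),
    };
    assert_eq!(reply, Ok(0));
}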

View File

@@ -214,7 +214,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
impl<S> RegionWorkerLoop<S> {
/// Handles region edit request.
-pub(crate) fn handle_region_edit(&mut self, request: RegionEditRequest) {
+pub(crate) async fn handle_region_edit(&mut self, request: RegionEditRequest) {
let region_id = request.region_id;
let Some(region) = self.regions.get_region(region_id) else {
let _ = request.tx.send(RegionNotFoundSnafu { region_id }.fail());
@@ -246,15 +246,8 @@ impl<S> RegionWorkerLoop<S> {
file.sequence = NonZeroU64::new(file_sequence);
}
-// Allow retrieving `is_staging` before spawn the edit region task.
-let is_staging = region.is_staging();
-let expect_state = if is_staging {
-RegionLeaderState::Staging
-} else {
-RegionLeaderState::Writable
-};
// Marks the region as editing.
-if let Err(e) = region.set_editing(expect_state) {
+if let Err(e) = region.set_editing() {
let _ = sender.send(Err(e));
return;
}
@@ -265,8 +258,7 @@ impl<S> RegionWorkerLoop<S> {
// Now the region is in editing state.
// Updates manifest in background.
common_runtime::spawn_global(async move {
-let result =
-edit_region(&region, edit.clone(), cache_manager, listener, is_staging).await;
+let result = edit_region(&region, edit.clone(), cache_manager, listener).await;
let notify = WorkerRequest::Background {
region_id,
notify: BackgroundNotify::RegionEdit(RegionEditResult {
@@ -276,7 +268,6 @@ impl<S> RegionWorkerLoop<S> {
result,
// we always need to restore region state after region edit
update_region_state: true,
-is_staging,
}),
};
@@ -308,39 +299,29 @@ impl<S> RegionWorkerLoop<S> {
}
};
-        let need_compaction = if edit_result.is_staging {
-            if edit_result.update_region_state {
-                // For staging regions, edits are not applied immediately,
-                // as they remain invisible until the region exits the staging state.
-                region.switch_state_to_staging(RegionLeaderState::Editing);
-            }
-            false
-        } else {
-            let need_compaction =
-                edit_result.result.is_ok() && !edit_result.edit.files_to_add.is_empty();
-            // Only apply the edit if the result is ok and region is not in staging state.
-            if edit_result.result.is_ok() {
-                // Applies the edit to the region.
-                region.version_control.apply_edit(
-                    Some(edit_result.edit),
-                    &[],
-                    region.file_purger.clone(),
-                );
-            }
-            if edit_result.update_region_state {
-                region.switch_state_to_writable(RegionLeaderState::Editing);
-            }
-            need_compaction
-        };
+        let need_compaction =
+            edit_result.result.is_ok() && !edit_result.edit.files_to_add.is_empty();
+        if edit_result.result.is_ok() {
+            // Applies the edit to the region.
+            region.version_control.apply_edit(
+                Some(edit_result.edit),
+                &[],
+                region.file_purger.clone(),
+            );
+        }
+        if edit_result.update_region_state {
+            // Sets the region as writable.
+            region.switch_state_to_writable(RegionLeaderState::Editing);
+        }
let _ = edit_result.sender.send(edit_result.result);
if let Some(edit_queue) = self.region_edit_queues.get_mut(&edit_result.region_id)
&& let Some(request) = edit_queue.dequeue()
{
-self.handle_region_edit(request);
+self.handle_region_edit(request).await;
}
if need_compaction {
@@ -482,9 +463,9 @@ async fn edit_region(
edit: RegionEdit,
cache_manager: CacheManagerRef,
listener: WorkerListener,
-is_staging: bool,
) -> Result<()> {
let region_id = region.region_id;
+let is_staging = region.is_staging();
if let Some(write_cache) = cache_manager.write_cache() {
for file_meta in &edit.files_to_add {
let write_cache = write_cache.clone();
@@ -549,10 +530,7 @@ async fn edit_region(
}
}
-info!(
-"Applying {edit:?} to region {}, is_staging: {}",
-region_id, is_staging
-);
+info!("Applying {edit:?} to region {}", region_id);
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));
region
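The branch removed in this file implemented deferred visibility for staging regions: a successful edit was acknowledged but not applied to the in-memory version (and never triggered compaction) until the region left the staging state, which is what the deleted comments describe. A toy model of that buffering behavior, using simplified types of my own rather than the crate's:

// Illustrative model only: while "staging", accepted edits are buffered and become
// visible when staging ends.
#[derive(Default)]
struct RegionModel {
    staging: bool,
    visible_files: Vec<u64>,
    staged_files: Vec<u64>,
}

impl RegionModel {
    fn apply_edit(&mut self, files: Vec<u64>) {
        if self.staging {
            self.staged_files.extend(files); // invisible until exit_staging
        } else {
            self.visible_files.extend(files);
        }
    }
    fn exit_staging(&mut self) {
        self.visible_files.append(&mut self.staged_files);
        self.staging = false;
    }
}

fn main() {
    let mut r = RegionModel { staging: true, ..Default::default() };
    r.apply_edit(vec![1, 2]);
    assert!(r.visible_files.is_empty()); // buffered while staging
    r.exit_staging();
    assert_eq!(r.visible_files, vec![1, 2]);
}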

View File

@@ -152,7 +152,6 @@ pub enum RegionRequest {
Catchup(RegionCatchupRequest),
BulkInserts(RegionBulkInsertsRequest),
EnterStaging(EnterStagingRequest),
-ApplyStagingManifest(ApplyStagingManifestRequest),
}
impl RegionRequest {
@@ -183,9 +182,6 @@ impl RegionRequest {
reason: "ListMetadata request should be handled separately by RegionServer",
}
.fail(),
-region_request::Body::ApplyStagingManifest(apply) => {
-make_region_apply_staging_manifest(apply)
-}
}
}
@@ -417,28 +413,6 @@ fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId,
)])
}
-fn make_region_apply_staging_manifest(
-api::v1::region::ApplyStagingManifestRequest {
-region_id,
-partition_expr,
-files_to_add,
-}: api::v1::region::ApplyStagingManifestRequest,
-) -> Result<Vec<(RegionId, RegionRequest)>> {
-let region_id = region_id.into();
-let files_to_add = files_to_add
-.context(UnexpectedSnafu {
-reason: "'files_to_add' field is missing",
-})?
-.data;
-Ok(vec![(
-region_id,
-RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
-partition_expr,
-files_to_add,
-}),
-)])
-}
/// Request to put data into a region.
#[derive(Debug)]
pub struct RegionPutRequest {
@@ -1454,30 +1428,6 @@ pub struct EnterStagingRequest {
pub partition_expr: String,
}
-/// This request is used as part of the region repartition.
-///
-/// After a region has entered staging mode with a new region rule (partition
-/// expression) and a separate process (for example, `remap_manifests`) has
-/// generated the new file assignments for the staging region, this request
-/// applies that generated manifest to the region.
-///
-/// In practice, this means:
-/// - The `partition_expr` identifies the staging region rule that the manifest
-/// was generated for.
-/// - `files_to_add` carries the serialized metadata (such as file manifests or
-/// file lists) that should be attached to the region under the new rule.
-///
-/// It should typically be called **after** the staging region has been
-/// initialized by [`EnterStagingRequest`] and the new file layout has been
-/// computed, to finalize the repartition operation.
-#[derive(Debug, Clone)]
-pub struct ApplyStagingManifestRequest {
-/// The partition expression of the staging region.
-pub partition_expr: String,
-/// The files to add to the region.
-pub files_to_add: Vec<u8>,
-}
impl fmt::Display for RegionRequest {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
@@ -1495,7 +1445,6 @@ impl fmt::Display for RegionRequest {
RegionRequest::Catchup(_) => write!(f, "Catchup"),
RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
RegionRequest::EnterStaging(_) => write!(f, "EnterStaging"),
-RegionRequest::ApplyStagingManifest(_) => write!(f, "ApplyStagingManifest"),
}
}
}
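For context on what this compare removes: the deleted doc comment and tests describe a three-step repartition flow. EnterStaging records the new partition rule for a region, remap_manifests redistributes the old region's files into new manifests, and ApplyStagingManifest attaches those files and clears the staging state, rejecting a request whose rule does not match the staged one. A rough standalone model of that flow, with hypothetical simplified types rather than the GreptimeDB API:

// Illustrative model of the removed EnterStaging/ApplyStagingManifest pair.
#[derive(Debug, Default)]
struct StagingRegion {
    staging_expr: Option<String>,
    files: Vec<String>,
}

impl StagingRegion {
    // Step 1: EnterStaging records the new partition rule for the region.
    fn enter_staging(&mut self, expr: &str) {
        self.staging_expr = Some(expr.to_string());
    }
    // Step 2 happens elsewhere (e.g. remap_manifests): the old region's files are
    // redistributed to the new regions and serialized as `files_to_add`.
    // Step 3: ApplyStagingManifest attaches those files if the rule matches,
    // then clears the staging state.
    fn apply_staging_manifest(&mut self, expr: &str, files_to_add: Vec<String>) -> Result<(), String> {
        if self.staging_expr.as_deref() != Some(expr) {
            return Err("staging partition expr mismatch".into());
        }
        self.files.extend(files_to_add);
        self.staging_expr = None; // exit staging
        Ok(())
    }
}

fn main() {
    let mut region = StagingRegion::default();
    region.enter_staging("tag_0 in [0, 50)");
    region
        .apply_staging_manifest("tag_0 in [0, 50)", vec!["sst-1".into(), "sst-2".into()])
        .unwrap();
    assert_eq!(region.files.len(), 2);
    assert!(region.staging_expr.is_none());
}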