feat: implement sync_region for mito engine (#5765)

* chore: upgrade proto to `2d52b`

* feat: add `SyncRegion` to `WorkerRequest`

* feat: impl `sync_region` for `Engine` trait

* test: add tests

* chore: fmt code

* chore: upgrade proto

* chore: unify `RegionLeaderState` and `RegionFollowerState`

* chore: check immutable memtable

* chore: fix clippy

* chore: apply suggestions from CR
Author: Weny Xu
Date: 2025-03-31 11:53:47 +08:00 (committed by GitHub)
Parent: c5b55fd8cf
Commit: 41aee1f1b7
20 changed files with 513 additions and 53 deletions
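The feature added here is a new `sync_region` method on the `RegionEngine` trait: it asks a follower region to install the leader's manifest changes up to (at least) a given manifest version. A minimal caller-side sketch, assuming only the trait signature introduced below; the helper function and the way the target version is obtained (e.g. from a heartbeat's `GrantedRegion.manifest_version`) are illustrative:

use common_error::ext::BoxedError;
use store_api::manifest::ManifestVersion;
use store_api::region_engine::RegionEngine;
use store_api::storage::RegionId;

// Hypothetical helper: ask a follower engine to catch up with the leader's manifest.
// `sync_region` installs manifest changes up to at least `target_version`; the mito
// engine may end up on a newer version if more changes are already published.
async fn catch_up_follower(
    engine: &dyn RegionEngine,
    region_id: RegionId,
    target_version: ManifestVersion,
) -> Result<(), BoxedError> {
    engine.sync_region(region_id, target_version).await
}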

Cargo.lock

@@ -4671,7 +4671,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=97e298d119fdb9499bc6ba9e03f375cfa7cdf130#97e298d119fdb9499bc6ba9e03f375cfa7cdf130"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=5bf34f1ba22763bfd4ab2ed1dd82fc790746048a#5bf34f1ba22763bfd4ab2ed1dd82fc790746048a"
dependencies = [
"prost 0.13.3",
"serde",


@@ -129,7 +129,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "97e298d119fdb9499bc6ba9e03f375cfa7cdf130" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "5bf34f1ba22763bfd4ab2ed1dd82fc790746048a" }
hex = "0.4"
http = "1"
humantime = "2.1"


@@ -380,7 +380,9 @@ impl CountdownTask {
}
},
Some(CountdownCommand::Reset((role, deadline))) => {
let _ = self.region_server.set_region_role(self.region_id, role);
if let Err(err) = self.region_server.set_region_role(self.region_id, role) {
error!(err; "Failed to set region role to {role} for region {region_id}");
}
trace!(
"Reset deadline of region {region_id} to approximately {} seconds later.",
(deadline - Instant::now()).as_secs_f32(),
@@ -402,7 +404,9 @@ impl CountdownTask {
}
() = &mut countdown => {
warn!("The region {region_id} lease is expired, convert region to follower.");
let _ = self.region_server.set_region_role(self.region_id, RegionRole::Follower);
if let Err(err) = self.region_server.set_region_role(self.region_id, RegionRole::Follower) {
error!(err; "Failed to set region role to follower for region {region_id}");
}
// resets the countdown.
let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);
countdown.as_mut().reset(far_future);
@@ -468,6 +472,8 @@ mod test {
&[GrantedRegion {
region_id: region_id.as_u64(),
role: api::v1::meta::RegionRole::Leader.into(),
// TODO(weny): use real manifest version
manifest_version: 0,
}],
Instant::now() + Duration::from_millis(200),
)


@@ -246,6 +246,10 @@ impl RegionEngine for MockRegionEngine {
Some(RegionRole::Leader)
}
async fn sync_region(&self, _region_id: RegionId, _version: u64) -> Result<(), BoxedError> {
unimplemented!()
}
fn as_any(&self) -> &dyn Any {
self
}


@@ -24,6 +24,7 @@ use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{error, info};
use object_store::ObjectStore;
use snafu::{ensure, OptionExt};
use store_api::manifest::ManifestVersion;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, SetRegionRoleStateResponse,
@@ -138,6 +139,15 @@ impl RegionEngine for FileRegionEngine {
}
}
async fn sync_region(
&self,
_region_id: RegionId,
_manifest_version: ManifestVersion,
) -> Result<(), BoxedError> {
// File engine doesn't need to sync region manifest.
Ok(())
}
fn role(&self, region_id: RegionId) -> Option<RegionRole> {
self.inner.state(region_id)
}


@@ -80,6 +80,8 @@ impl HeartbeatHandler for RegionLeaseHandler {
GrantedRegion {
region_id,
region_role,
// TODO(weny): use real manifest version
manifest_version: 0,
}
.into()
})


@@ -36,6 +36,7 @@ use common_error::status_code::StatusCode;
use mito2::engine::MitoEngine;
pub(crate) use options::IndexOptions;
use snafu::ResultExt;
use store_api::manifest::ManifestVersion;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::region_engine::{
@@ -48,7 +49,7 @@ use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
use self::state::MetricEngineState;
use crate::config::EngineConfig;
use crate::data_region::DataRegion;
use crate::error::{self, Result, UnsupportedRegionRequestSnafu};
use crate::error::{self, Result, UnsupportedRegionRequestSnafu, UnsupportedSyncRegionSnafu};
use crate::metadata_region::MetadataRegion;
use crate::row_modifier::RowModifier;
use crate::utils;
@@ -285,6 +286,15 @@ impl RegionEngine for MetricEngine {
Ok(())
}
async fn sync_region(
&self,
_region_id: RegionId,
_manifest_version: ManifestVersion,
) -> Result<(), BoxedError> {
// TODO(weny): implement it later.
Err(BoxedError::new(UnsupportedSyncRegionSnafu {}.build()))
}
async fn set_region_role_state_gracefully(
&self,
region_id: RegionId,


@@ -259,6 +259,12 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unsupported sync region request"))]
UnsupportedSyncRegion {
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -280,9 +286,9 @@ impl ErrorExt for Error {
| UnexpectedRequest { .. }
| UnsupportedAlterKind { .. } => StatusCode::InvalidArguments,
ForbiddenPhysicalAlter { .. } | UnsupportedRegionRequest { .. } => {
StatusCode::Unsupported
}
ForbiddenPhysicalAlter { .. }
| UnsupportedRegionRequest { .. }
| UnsupportedSyncRegion { .. } => StatusCode::Unsupported,
DeserializeColumnMetadata { .. }
| SerializeColumnMetadata { .. }


@@ -55,6 +55,8 @@ mod row_selector_test;
#[cfg(test)]
mod set_role_state_test;
#[cfg(test)]
mod sync_test;
#[cfg(test)]
mod truncate_test;
use std::any::Any;
@@ -76,6 +78,7 @@ use snafu::{ensure, OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::manifest::ManifestVersion;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
BatchResponses, RegionEngine, RegionRole, RegionScannerRef, RegionStatistic,
@@ -488,6 +491,18 @@ impl EngineInner {
receiver.await.context(RecvSnafu)
}
async fn sync_region(
&self,
region_id: RegionId,
manifest_version: ManifestVersion,
) -> Result<ManifestVersion> {
let (request, receiver) =
WorkerRequest::new_sync_region_request(region_id, manifest_version);
self.workers.submit_to_worker(region_id, request).await?;
receiver.await.context(RecvSnafu)?
}
fn role(&self, region_id: RegionId) -> Option<RegionRole> {
self.workers.get_region(region_id).map(|region| {
if region.is_follower() {
@@ -609,6 +624,18 @@ impl RegionEngine for MitoEngine {
.map_err(BoxedError::new)
}
async fn sync_region(
&self,
region_id: RegionId,
manifest_version: ManifestVersion,
) -> Result<(), BoxedError> {
self.inner
.sync_region(region_id, manifest_version)
.await
.map_err(BoxedError::new)
.map(|_| ())
}
fn role(&self, region_id: RegionId) -> Option<RegionRole> {
self.inner.role(region_id)
}


@@ -0,0 +1,235 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use api::v1::{Rows, SemanticType};
use common_error::ext::ErrorExt;
use common_recordbatch::RecordBatches;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
AddColumn, AddColumnLocation, AlterKind, RegionAlterRequest, RegionOpenRequest, RegionRequest,
};
use store_api::storage::{RegionId, ScanRequest};
use super::MitoEngine;
use crate::config::MitoConfig;
use crate::error::Error;
use crate::test_util::{
build_rows, flush_region, put_rows, rows_schema, CreateRequestBuilder, TestEnv,
};
fn add_tag1() -> RegionAlterRequest {
RegionAlterRequest {
schema_version: 0,
kind: AlterKind::AddColumns {
columns: vec![AddColumn {
column_metadata: ColumnMetadata {
column_schema: ColumnSchema::new(
"tag_1",
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id: 3,
},
location: Some(AddColumnLocation::First),
}],
},
}
}
async fn scan_check(
engine: &MitoEngine,
region_id: RegionId,
expected: &str,
num_memtable: usize,
num_files: usize,
) {
let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request).unwrap();
assert_eq!(num_memtable, scanner.num_memtables());
assert_eq!(num_files, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, batches.pretty_print().unwrap());
}
#[tokio::test]
async fn test_sync_after_flush_region() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
let request = CreateRequestBuilder::new().build();
let region_dir = request.region_dir.clone();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let rows = Rows {
schema: column_schemas,
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows).await;
// Open the region on the follower engine
let follower_engine = env.create_follower_engine(MitoConfig::default()).await;
follower_engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options: Default::default(),
// Ensure the region is not replayed from the WAL.
skip_wal_replay: true,
}),
)
.await
.unwrap();
flush_region(&engine, region_id, None).await;
// Scan the region on the leader engine
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";
scan_check(&engine, region_id, expected, 0, 1).await;
common_telemetry::info!("Scan the region on the follower engine");
// Scan the region on the follower engine
let expected = "++\n++";
scan_check(&follower_engine, region_id, expected, 0, 0).await;
// Returns an error since the max available manifest version is 1
let err = follower_engine.sync_region(region_id, 2).await.unwrap_err();
let err = err.as_any().downcast_ref::<Error>().unwrap();
assert_matches!(err, Error::InstallManifestTo { .. });
follower_engine.sync_region(region_id, 1).await.unwrap();
common_telemetry::info!("Scan the region on the follower engine after sync");
// Scan the region on the follower engine
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";
scan_check(&follower_engine, region_id, expected, 0, 1).await;
}
#[tokio::test]
async fn test_sync_after_alter_region() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
let column_schemas = rows_schema(&request);
let region_dir = request.region_dir.clone();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let rows = Rows {
schema: column_schemas,
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows).await;
// Open the region on the follower engine
let follower_engine = env.create_follower_engine(MitoConfig::default()).await;
follower_engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options: Default::default(),
// Ensure the region is not replayed from the WAL.
skip_wal_replay: true,
}),
)
.await
.unwrap();
let request = add_tag1();
engine
.handle_request(region_id, RegionRequest::Alter(request))
.await
.unwrap();
let expected = "\
+-------+-------+---------+---------------------+
| tag_1 | tag_0 | field_0 | ts |
+-------+-------+---------+---------------------+
| | 0 | 0.0 | 1970-01-01T00:00:00 |
| | 1 | 1.0 | 1970-01-01T00:00:01 |
| | 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+-------+---------+---------------------+";
scan_check(&engine, region_id, expected, 0, 1).await;
let expected = "++\n++";
scan_check(&follower_engine, region_id, expected, 0, 0).await;
// Sync the region from the leader engine to the follower engine
follower_engine.sync_region(region_id, 2).await.unwrap();
let expected = "\
+-------+-------+---------+---------------------+
| tag_1 | tag_0 | field_0 | ts |
+-------+-------+---------+---------------------+
| | 0 | 0.0 | 1970-01-01T00:00:00 |
| | 1 | 1.0 | 1970-01-01T00:00:01 |
| | 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+-------+---------+---------------------+";
scan_check(&follower_engine, region_id, expected, 0, 1).await;
}
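Assuming the standard GreptimeDB workspace layout, where this new test module lives in the `mito2` crate as `engine::sync_test`, the two tests above should be runnable in isolation with something like `cargo test -p mito2 engine::sync_test`.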


@@ -33,7 +33,7 @@ use store_api::storage::RegionId;
use tokio::time::error::Elapsed;
use crate::cache::file_cache::FileType;
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::region::RegionRoleState;
use crate::schedule::remote_job_scheduler::JobId;
use crate::sst::file::FileId;
use crate::worker::WorkerId;
@@ -511,10 +511,10 @@ pub enum Error {
},
#[snafu(display("Region {} is in {:?} state, expect: {:?}", region_id, state, expect))]
RegionLeaderState {
RegionState {
region_id: RegionId,
state: RegionRoleState,
expect: RegionLeaderState,
expect: RegionRoleState,
#[snafu(implicit)]
location: Location,
},
@@ -812,8 +812,8 @@ pub enum Error {
#[snafu(display(
"Failed to install manifest to {}, region: {}, available manifest version: {}, last version: {}",
target_version,
available_version,
region_id,
available_version,
last_version
))]
InstallManifestTo {
@@ -1125,8 +1125,8 @@ impl ErrorExt for Error {
CompactRegion { source, .. } => source.status_code(),
CompatReader { .. } => StatusCode::Unexpected,
InvalidRegionRequest { source, .. } => source.status_code(),
RegionLeaderState { .. } | UpdateManifest { .. } => StatusCode::RegionNotReady,
&FlushableRegionState { .. } => StatusCode::RegionNotReady,
RegionState { .. } | UpdateManifest { .. } => StatusCode::RegionNotReady,
FlushableRegionState { .. } => StatusCode::RegionNotReady,
JsonOptions { .. } => StatusCode::InvalidArguments,
EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound,
ArrowReader { .. } => StatusCode::StorageUnavailable,


@@ -35,10 +35,10 @@ use store_api::storage::RegionId;
use crate::access_layer::AccessLayerRef;
use crate::error::{
FlushableRegionStateSnafu, RegionLeaderStateSnafu, RegionNotFoundSnafu, RegionTruncatedSnafu,
Result, UpdateManifestSnafu,
FlushableRegionStateSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result,
UpdateManifestSnafu,
};
use crate::manifest::action::{RegionMetaAction, RegionMetaActionList};
use crate::manifest::action::{RegionManifest, RegionMetaAction, RegionMetaActionList};
use crate::manifest::manager::RegionManifestManager;
use crate::memtable::MemtableBuilderRef;
use crate::region::version::{VersionControlRef, VersionRef};
@@ -317,10 +317,10 @@ impl MitoRegion {
.state
.compare_exchange(RegionRoleState::Leader(expect), state)
.map_err(|actual| {
RegionLeaderStateSnafu {
RegionStateSnafu {
region_id: self.region_id,
state: actual,
expect,
expect: RegionRoleState::Leader(expect),
}
.build()
})?;
@@ -358,6 +358,21 @@ impl ManifestContext {
self.manifest_manager.read().await.has_update().await
}
/// Installs the manifest changes from the current version to the target version (inclusive).
///
/// Returns the installed [RegionManifest].
/// **Note**: This function is not guaranteed to install the target version strictly.
/// The installed version may be greater than the target version.
pub(crate) async fn install_manifest_to(
&self,
version: ManifestVersion,
) -> Result<Arc<RegionManifest>> {
let mut manager = self.manifest_manager.write().await;
manager.install_manifest_to(version).await?;
Ok(manager.manifest())
}
/// Updates the manifest if current state is `expect_state`.
pub(crate) async fn update_manifest(
&self,
@@ -394,10 +409,10 @@ impl ManifestContext {
} else {
ensure!(
current_state == RegionRoleState::Leader(expect_state),
RegionLeaderStateSnafu {
RegionStateSnafu {
region_id: manifest.metadata.region_id,
state: current_state,
expect: expect_state,
expect: RegionRoleState::Leader(expect_state),
}
);
}
@@ -589,15 +604,34 @@ impl RegionMap {
.context(RegionNotFoundSnafu { region_id })?;
ensure!(
region.is_writable(),
RegionLeaderStateSnafu {
RegionStateSnafu {
region_id,
state: region.state(),
expect: RegionLeaderState::Writable,
expect: RegionRoleState::Leader(RegionLeaderState::Writable),
}
);
Ok(region)
}
/// Gets a follower (read-only) region by region id.
///
/// Returns an error if the region does not exist or is not a follower.
pub(crate) fn follower_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
let region = self
.get_region(region_id)
.context(RegionNotFoundSnafu { region_id })?;
ensure!(
region.is_follower(),
RegionStateSnafu {
region_id,
state: region.state(),
expect: RegionRoleState::Follower,
}
);
Ok(region)
}
/// Gets region by region id.
///
/// Calls the callback if the region does not exist.
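Since the sync handler introduced later in this diff resolves regions through `follower_region`, a sync request against a region that is still a leader is rejected with the `RegionState` error above. A test-style sketch of that behavior, reusing the imports and setup of the `sync_test.rs` module added in this commit (the version number and the leader `engine` are assumptions):

// Hypothetical: `engine` still holds the region as a leader, so the
// `is_follower()` check in `follower_region` fails.
let err = engine.sync_region(region_id, 1).await.unwrap_err();
let err = err.as_any().downcast_ref::<Error>().unwrap();
assert_matches!(err, Error::RegionState { .. });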


@@ -216,6 +216,12 @@ impl VersionControl {
version_data.version.ssts.mark_all_deleted();
version_data.version = new_version;
}
/// Overwrites the current version with a new version.
pub(crate) fn overwrite_current(&self, version: VersionRef) {
let mut version_data = self.data.write().unwrap();
version_data.version = version;
}
}
pub(crate) type VersionControlRef = Arc<VersionControl>;


@@ -31,6 +31,7 @@ use prost::Message;
use smallvec::SmallVec;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::codec::{infer_primary_key_encoding_from_hint, PrimaryKeyEncoding};
use store_api::manifest::ManifestVersion;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
use store_api::region_request::{
@@ -565,6 +566,9 @@ pub(crate) enum WorkerRequest {
/// Use [RegionEdit] to edit a region directly.
EditRegion(RegionEditRequest),
/// Keep the manifest of a region up to date.
SyncRegion(RegionSyncRequest),
}
impl WorkerRequest {
@@ -684,6 +688,21 @@ impl WorkerRequest {
receiver,
)
}
pub(crate) fn new_sync_region_request(
region_id: RegionId,
manifest_version: ManifestVersion,
) -> (WorkerRequest, Receiver<Result<ManifestVersion>>) {
let (sender, receiver) = oneshot::channel();
(
WorkerRequest::SyncRegion(RegionSyncRequest {
region_id,
manifest_version,
sender,
}),
receiver,
)
}
}
/// DDL request to a region.
@@ -869,6 +888,13 @@ pub(crate) struct RegionEditResult {
pub(crate) result: Result<()>,
}
#[derive(Debug)]
pub(crate) struct RegionSyncRequest {
pub(crate) region_id: RegionId,
pub(crate) manifest_version: ManifestVersion,
pub(crate) sender: Sender<Result<ManifestVersion>>,
}
#[cfg(test)]
mod tests {
use api::v1::value::ValueData;


@@ -824,6 +824,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
WorkerRequest::Stop => {
debug_assert!(!self.running.load(Ordering::Relaxed));
}
WorkerRequest::SyncRegion(req) => {
self.handle_region_sync(req).await;
}
}
}


@@ -27,6 +27,7 @@ use tokio::time::Instant;
use crate::error::{self, Result};
use crate::region::opener::{replay_memtable, RegionOpener};
use crate::region::MitoRegion;
use crate::worker::RegionWorkerLoop;
impl<S: LogStore> RegionWorkerLoop<S> {
@@ -45,34 +46,12 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
// Note: currently, we prevent split brain by ensuring the mutable memtable is empty.
// It's expensive to execute catch-up requests without `set_writable=true` multiple times.
let is_mutable_empty = region.version().memtables.mutable.is_empty();
let version = region.version();
let is_empty_memtable = version.memtables.is_empty();
// Utilizes the short circuit evaluation.
let region = if !is_mutable_empty || region.manifest_ctx.has_update().await? {
let manifest_version = region.manifest_ctx.manifest_version().await;
let flushed_entry_id = region.version_control.current().last_entry_id;
info!("Reopening the region: {region_id}, empty mutable: {is_mutable_empty}, manifest version: {manifest_version}, flushed entry id: {flushed_entry_id}");
let reopened_region = Arc::new(
RegionOpener::new(
region_id,
region.region_dir(),
self.memtable_builder_provider.clone(),
self.object_store_manager.clone(),
self.purge_scheduler.clone(),
self.puffin_manager_factory.clone(),
self.intermediate_manager.clone(),
self.time_provider.clone(),
)
.cache(Some(self.cache_manager.clone()))
.options(region.version().options.clone())?
.skip_wal_replay(true)
.open(&self.config, &self.wal)
.await?,
);
debug_assert!(!reopened_region.is_writable());
self.regions.insert_region(reopened_region.clone());
reopened_region
let region = if !is_empty_memtable || region.manifest_ctx.has_update().await? {
self.reopen_region(&region).await?
} else {
region
};
@@ -124,4 +103,36 @@ impl<S: LogStore> RegionWorkerLoop<S> {
Ok(0)
}
/// Reopens a region.
pub(crate) async fn reopen_region(
&mut self,
region: &Arc<MitoRegion>,
) -> Result<Arc<MitoRegion>> {
let region_id = region.region_id;
let manifest_version = region.manifest_ctx.manifest_version().await;
let flushed_entry_id = region.version_control.current().last_entry_id;
info!("Reopening the region: {region_id}, manifest version: {manifest_version}, flushed entry id: {flushed_entry_id}");
let reopened_region = Arc::new(
RegionOpener::new(
region_id,
region.region_dir(),
self.memtable_builder_provider.clone(),
self.object_store_manager.clone(),
self.purge_scheduler.clone(),
self.puffin_manager_factory.clone(),
self.intermediate_manager.clone(),
self.time_provider.clone(),
)
.cache(Some(self.cache_manager.clone()))
.options(region.version().options.clone())?
.skip_wal_replay(true)
.open(&self.config, &self.wal)
.await?,
);
debug_assert!(!reopened_region.is_writable());
self.regions.insert_region(reopened_region.clone());
Ok(reopened_region)
}
}


@@ -17,6 +17,7 @@
//! It updates the manifest and applies the changes to the region in background.
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use common_telemetry::{info, warn};
use store_api::logstore::LogStore;
@@ -29,10 +30,11 @@ use crate::manifest::action::{
RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate,
};
use crate::metrics::WRITE_CACHE_INFLIGHT_DOWNLOAD;
use crate::region::version::VersionBuilder;
use crate::region::{MitoRegionRef, RegionLeaderState, RegionRoleState};
use crate::request::{
BackgroundNotify, OptionOutputTx, RegionChangeResult, RegionEditRequest, RegionEditResult,
TruncateResult, WorkerRequest,
RegionSyncRequest, TruncateResult, WorkerRequest,
};
use crate::sst::location;
use crate::worker::{RegionWorkerLoop, WorkerListener};
@@ -118,6 +120,61 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.handle_region_stalled_requests(&change_result.region_id)
.await;
}
/// Handles region sync request.
///
/// Updates the manifest to at least the given version.
/// **Note**: The installed version may be greater than the given version.
pub(crate) async fn handle_region_sync(&mut self, request: RegionSyncRequest) {
let region_id = request.region_id;
let sender = request.sender;
let region = match self.regions.follower_region(region_id) {
Ok(region) => region,
Err(e) => {
let _ = sender.send(Err(e));
return;
}
};
let manifest = match region
.manifest_ctx
.install_manifest_to(request.manifest_version)
.await
{
Ok(manifest) => manifest,
Err(e) => {
let _ = sender.send(Err(e));
return;
}
};
let version = region.version();
if !version.memtables.is_empty() {
warn!(
"Region {} memtables is not empty, which should not happen, manifest version: {}",
region.region_id, manifest.manifest_version
);
}
let region_options = version.options.clone();
let new_mutable = Arc::new(
region
.version()
.memtables
.mutable
.new_with_part_duration(version.compaction_time_window),
);
let metadata = manifest.metadata.clone();
let version = VersionBuilder::new(metadata, new_mutable)
.add_files(region.file_purger.clone(), manifest.files.values().cloned())
.flushed_entry_id(manifest.flushed_entry_id)
.flushed_sequence(manifest.flushed_sequence)
.truncated_entry_id(manifest.truncated_entry_id)
.compaction_time_window(manifest.compaction_time_window)
.options(region_options)
.build();
region.version_control.overwrite_current(Arc::new(version));
let _ = sender.send(Ok(manifest.manifest_version));
}
}
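The reply sent back by `handle_region_sync` carries the manifest version that was actually installed, which may be newer than the one requested, because `install_manifest_to` only guarantees to reach at least the target. A sketch of that semantic with assumed version numbers (say the leader has already published manifest versions up to 5 and the follower asks for 3):

// Assumed versions, for illustration only; `region` is a follower MitoRegion.
let manifest = region.manifest_ctx.install_manifest_to(3).await?;
// The installed version is only guaranteed to be >= the requested target,
// so the value reported back to the caller can be greater than 3.
assert!(manifest.manifest_version >= 3);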
impl<S> RegionWorkerLoop<S> {


@@ -25,7 +25,7 @@ use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::LogStore;
use store_api::storage::RegionId;
use crate::error::{InvalidRequestSnafu, RegionLeaderStateSnafu, RejectWriteSnafu, Result};
use crate::error::{InvalidRequestSnafu, RegionStateSnafu, RejectWriteSnafu, Result};
use crate::metrics::{WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED};
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::region_write_ctx::RegionWriteCtx;
@@ -240,10 +240,10 @@ impl<S> RegionWorkerLoop<S> {
state => {
// The region is not writable.
sender_req.sender.send(
RegionLeaderStateSnafu {
RegionStateSnafu {
region_id,
state,
expect: RegionLeaderState::Writable,
expect: RegionRoleState::Leader(RegionLeaderState::Writable),
}
.fail(),
);


@@ -24,6 +24,7 @@ use async_trait::async_trait;
use common_error::ext::{BoxedError, PlainError};
use common_error::status_code::StatusCode;
use datatypes::schema::ColumnSchema;
use store_api::manifest::ManifestVersion;
use store_api::metadata::{
ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
};
@@ -109,6 +110,14 @@ impl RegionEngine for MetaRegionEngine {
unimplemented!()
}
async fn sync_region(
&self,
_region_id: RegionId,
_manifest_version: ManifestVersion,
) -> Result<(), BoxedError> {
unimplemented!()
}
fn role(&self, _region_id: RegionId) -> Option<RegionRole> {
None
}


@@ -34,6 +34,7 @@ use serde::{Deserialize, Serialize};
use tokio::sync::Semaphore;
use crate::logstore::entry;
use crate::manifest::ManifestVersion;
use crate::metadata::RegionMetadataRef;
use crate::region_request::{
BatchRegionDdlRequest, RegionOpenRequest, RegionRequest, RegionSequencesRequest,
@@ -84,6 +85,7 @@ impl SetRegionRoleStateResponse {
pub struct GrantedRegion {
pub region_id: RegionId,
pub region_role: RegionRole,
pub manifest_version: u64,
}
impl GrantedRegion {
@@ -91,6 +93,8 @@ impl GrantedRegion {
Self {
region_id,
region_role,
// TODO(weny): use real manifest version
manifest_version: 0,
}
}
}
@@ -100,6 +104,7 @@ impl From<GrantedRegion> for PbGrantedRegion {
PbGrantedRegion {
region_id: value.region_id.as_u64(),
role: PbRegionRole::from(value.region_role).into(),
manifest_version: value.manifest_version,
}
}
}
@@ -109,6 +114,7 @@ impl From<PbGrantedRegion> for GrantedRegion {
GrantedRegion {
region_id: RegionId::from_u64(value.region_id),
region_role: value.role().into(),
manifest_version: value.manifest_version,
}
}
}
@@ -503,6 +509,13 @@ pub trait RegionEngine: Send + Sync {
/// take effect.
fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError>;
/// Syncs the region manifest to the given manifest version.
async fn sync_region(
&self,
region_id: RegionId,
manifest_version: ManifestVersion,
) -> Result<(), BoxedError>;
/// Sets region role state gracefully.
///
/// After the call returns, the engine ensures no more write operations will succeed in the region.
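The region_engine.rs changes above both add the `sync_region` trait method and thread `manifest_version` through `GrantedRegion` and its protobuf conversions. The round trip can be exercised directly; a small sketch (illustrative values; the protobuf type path `api::v1::meta::GrantedRegion` is assumed from the heartbeat test earlier in this diff):

use api::v1::meta::GrantedRegion as PbGrantedRegion;
use store_api::region_engine::{GrantedRegion, RegionRole};
use store_api::storage::RegionId;

#[test]
fn manifest_version_round_trips_through_protobuf() {
    // In this commit the producers still fill 0 (see the TODOs above);
    // 42 is just an illustrative value.
    let granted = GrantedRegion {
        region_id: RegionId::new(1024, 1),
        region_role: RegionRole::Leader,
        manifest_version: 42,
    };
    let pb: PbGrantedRegion = granted.into();
    assert_eq!(pb.manifest_version, 42);
    let back: GrantedRegion = pb.into();
    assert_eq!(back.manifest_version, 42);
}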