feat: add CatchupRequest to engine (#2939)

* chore: remove redundant code

* feat(mito): add CatchupRequest

feat: reopen region before replay if needed

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* Apply suggestions from code review

Co-authored-by: Yingwen <realevenyag@gmail.com>

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
Weny Xu
2023-12-21 17:27:53 +09:00
committed by GitHub
parent b5c5458798
commit ff8c10eae7
19 changed files with 676 additions and 50 deletions


@@ -417,7 +417,8 @@ impl RegionServerInner {
| RegionRequest::Alter(_)
| RegionRequest::Flush(_)
| RegionRequest::Compact(_)
| RegionRequest::Truncate(_) => RegionChange::None,
| RegionRequest::Truncate(_)
| RegionRequest::Catchup(_) => RegionChange::None,
};
let engine = match self.get_engine(region_id, &region_change)? {


@@ -131,6 +131,8 @@ impl RegionEngine for MetricEngine {
RegionRequest::Flush(_) => todo!(),
RegionRequest::Compact(_) => todo!(),
RegionRequest::Truncate(_) => todo!(),
// Always returns Ok(0): all data is already the latest.
RegionRequest::Catchup(_) => Ok(0),
};
result.map_err(BoxedError::new)


@@ -19,6 +19,8 @@ mod alter_test;
#[cfg(test)]
mod basic_test;
#[cfg(test)]
mod catchup_test;
#[cfg(test)]
mod close_test;
#[cfg(test)]
mod compaction_test;


@@ -0,0 +1,419 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatches;
use store_api::region_engine::{RegionEngine, SetReadonlyResponse};
use store_api::region_request::{RegionCatchupRequest, RegionOpenRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::error::{self, Error};
use crate::test_util::{
build_rows, flush_region, put_rows, rows_schema, CreateRequestBuilder, TestEnv,
};
use crate::wal::EntryId;
fn get_last_entry_id(resp: SetReadonlyResponse) -> Option<EntryId> {
if let SetReadonlyResponse::Success { last_entry_id } = resp {
last_entry_id
} else {
unreachable!();
}
}
#[tokio::test]
async fn test_catchup_with_last_entry_id() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::with_prefix("last_entry_id");
let leader_engine = env.create_engine(MitoConfig::default()).await;
let follower_engine = env.create_follower_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let region_dir = request.region_dir.clone();
let column_schemas = rows_schema(&request);
leader_engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
follower_engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options: HashMap::default(),
skip_wal_replay: false,
}),
)
.await
.unwrap();
// Ensures the mutable memtable is empty.
let region = follower_engine.get_region(region_id).unwrap();
assert!(region.version().memtables.mutable.is_empty());
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 3),
};
put_rows(&leader_engine, region_id, rows).await;
let resp = leader_engine
.set_readonly_gracefully(region_id)
.await
.unwrap();
let last_entry_id = get_last_entry_id(resp);
assert!(last_entry_id.is_some());
// Replays the memtable.
let resp = follower_engine
.handle_request(
region_id,
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: false,
entry_id: last_entry_id,
}),
)
.await;
let region = follower_engine.get_region(region_id).unwrap();
assert!(!region.is_writable());
assert!(resp.is_ok());
// Scans
let request = ScanRequest::default();
let stream = follower_engine
.handle_query(region_id, request)
.await
.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
// Replays the memtable again, should be ok.
let resp = follower_engine
.handle_request(
region_id,
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: last_entry_id,
}),
)
.await;
let region = follower_engine.get_region(region_id).unwrap();
assert!(region.is_writable());
assert!(resp.is_ok());
}
#[tokio::test]
async fn test_catchup_with_incorrect_last_entry_id() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::with_prefix("incorrect_last_entry_id");
let leader_engine = env.create_engine(MitoConfig::default()).await;
let follower_engine = env.create_follower_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let region_dir = request.region_dir.clone();
let column_schemas = rows_schema(&request);
leader_engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
follower_engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options: HashMap::default(),
skip_wal_replay: false,
}),
)
.await
.unwrap();
// Ensures the mutable memtable is empty.
let region = follower_engine.get_region(region_id).unwrap();
assert!(region.version().memtables.mutable.is_empty());
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 3),
};
put_rows(&leader_engine, region_id, rows).await;
let resp = leader_engine
.set_readonly_gracefully(region_id)
.await
.unwrap();
let last_entry_id = get_last_entry_id(resp);
assert!(last_entry_id.is_some());
let incorrect_last_entry_id = last_entry_id.map(|e| e + 1);
// Replays the memtable.
let err = follower_engine
.handle_request(
region_id,
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: false,
entry_id: incorrect_last_entry_id,
}),
)
.await
.unwrap_err();
let err = err.as_any().downcast_ref::<Error>().unwrap();
assert_matches!(err, error::Error::UnexpectedReplay { .. });
// It should ignore requests to writable regions.
region.set_writable(true);
let resp = follower_engine
.handle_request(
region_id,
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: false,
entry_id: incorrect_last_entry_id,
}),
)
.await;
assert!(resp.is_ok());
}
#[tokio::test]
async fn test_catchup_without_last_entry_id() {
let mut env = TestEnv::with_prefix("without_last_entry_id");
let leader_engine = env.create_engine(MitoConfig::default()).await;
let follower_engine = env.create_follower_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let region_dir = request.region_dir.clone();
let column_schemas = rows_schema(&request);
leader_engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
follower_engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options: HashMap::default(),
skip_wal_replay: false,
}),
)
.await
.unwrap();
let rows = Rows {
schema: column_schemas,
rows: build_rows(0, 3),
};
put_rows(&leader_engine, region_id, rows).await;
// Replays the memtable.
let resp = follower_engine
.handle_request(
region_id,
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: false,
entry_id: None,
}),
)
.await;
assert!(resp.is_ok());
let region = follower_engine.get_region(region_id).unwrap();
assert!(!region.is_writable());
let request = ScanRequest::default();
let stream = follower_engine
.handle_query(region_id, request)
.await
.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
// Replays the memtable again, should be ok.
let resp = follower_engine
.handle_request(
region_id,
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: None,
}),
)
.await;
assert!(resp.is_ok());
let region = follower_engine.get_region(region_id).unwrap();
assert!(region.is_writable());
}
#[tokio::test]
async fn test_catchup_with_manifest_update() {
let mut env = TestEnv::with_prefix("without_manifest_update");
let leader_engine = env.create_engine(MitoConfig::default()).await;
let follower_engine = env.create_follower_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let region_dir = request.region_dir.clone();
let column_schemas = rows_schema(&request);
leader_engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
follower_engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options: HashMap::default(),
skip_wal_replay: false,
}),
)
.await
.unwrap();
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 3),
};
put_rows(&leader_engine, region_id, rows).await;
// Triggers a flush to create a new manifest file.
flush_region(&leader_engine, region_id, None).await;
// Puts to WAL.
let rows = Rows {
schema: column_schemas,
rows: build_rows(3, 5),
};
put_rows(&leader_engine, region_id, rows).await;
// Triggers a flush to create a new manifest file.
flush_region(&leader_engine, region_id, None).await;
let region = follower_engine.get_region(region_id).unwrap();
// Ensures the mutable memtable is empty.
assert!(region.version().memtables.mutable.is_empty());
let manifest = region.manifest_manager.manifest().await;
assert_eq!(manifest.manifest_version, 0);
let resp = follower_engine
.handle_request(
region_id,
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: false,
entry_id: None,
}),
)
.await;
assert!(resp.is_ok());
// The inner region was replaced. We must get it again.
let region = follower_engine.get_region(region_id).unwrap();
let manifest = region.manifest_manager.manifest().await;
assert_eq!(manifest.manifest_version, 2);
assert!(!region.is_writable());
let request = ScanRequest::default();
let stream = follower_engine
.handle_query(region_id, request)
.await
.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
| 3 | 3.0 | 1970-01-01T00:00:03 |
| 4 | 4.0 | 1970-01-01T00:00:04 |
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
// Replays the memtable again, should be ok.
let resp = follower_engine
.handle_request(
region_id,
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: None,
}),
)
.await;
let region = follower_engine.get_region(region_id).unwrap();
assert!(resp.is_ok());
assert!(region.is_writable());
}
#[tokio::test]
async fn test_catchup_not_exist() {
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;
let non_exist_region_id = RegionId::new(1, 1);
let err = engine
.handle_request(
non_exist_region_id,
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: None,
}),
)
.await
.unwrap_err();
assert_matches!(err.status_code(), StatusCode::RegionNotFound);
}


@@ -20,7 +20,6 @@ use store_api::region_request::{RegionPutRequest, RegionRequest};
use store_api::storage::RegionId;
use crate::config::MitoConfig;
use crate::request::WorkerRequest;
use crate::test_util::{build_rows, put_rows, rows_schema, CreateRequestBuilder, TestEnv};
#[tokio::test]
@@ -90,18 +89,6 @@ async fn test_set_readonly_gracefully_not_exist() {
let non_exist_region_id = RegionId::new(1, 1);
// For inner `handle_inter_command`.
let (set_readonly_request, recv) =
WorkerRequest::new_set_readonly_gracefully(non_exist_region_id);
engine
.inner
.workers
.submit_to_worker(non_exist_region_id, set_readonly_request)
.await
.unwrap();
let result = recv.await.unwrap();
assert_eq!(SetReadonlyResponse::NotFound, result);
// For fast-path.
let result = engine
.set_readonly_gracefully(non_exist_region_id)


@@ -35,6 +35,17 @@ use crate::worker::WorkerId;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display(
"Failed to set region {} to writable, it was expected to replayed to {}, but actually replayed to {}",
region_id, expected_last_entry_id, replayed_last_entry_id
))]
UnexpectedReplay {
location: Location,
region_id: RegionId,
expected_last_entry_id: u64,
replayed_last_entry_id: u64,
},
#[snafu(display("OpenDAL operator failed"))]
OpenDal {
location: Location,
@@ -392,6 +403,12 @@ pub enum Error {
location: Location,
},
#[snafu(display("Empty manifest directory, manifest_dir: {}", manifest_dir,))]
EmptyManifestDir {
manifest_dir: String,
location: Location,
},
#[snafu(display("Failed to read arrow record batch from parquet file {}", path))]
ArrowReader {
path: String,
@@ -435,7 +452,8 @@ impl ErrorExt for Error {
| NewRecordBatch { .. }
| RegionCorrupted { .. }
| CreateDefault { .. }
| InvalidParquet { .. } => StatusCode::Unexpected,
| InvalidParquet { .. }
| UnexpectedReplay { .. } => StatusCode::Unexpected,
RegionNotFound { .. } => StatusCode::RegionNotFound,
ObjectStoreNotFound { .. }
| InvalidScanIndex { .. }
@@ -476,7 +494,7 @@ impl ErrorExt for Error {
InvalidRegionRequest { source, .. } => source.status_code(),
RegionReadonly { .. } => StatusCode::RegionReadonly,
JsonOptions { .. } => StatusCode::InvalidArguments,
EmptyRegionDir { .. } => StatusCode::RegionNotFound,
EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound,
ArrowReader { .. } => StatusCode::StorageUnavailable,
}
}


@@ -17,6 +17,7 @@
//! Mito is a region engine to store timeseries data.
#![feature(let_chains)]
#![feature(assert_matches)]
#[cfg(any(test, feature = "test"))]
#[cfg_attr(feature = "test", allow(unused))]


@@ -16,17 +16,19 @@ use std::sync::Arc;
use common_datasource::compression::CompressionType;
use common_telemetry::{debug, info};
use futures::TryStreamExt;
use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt};
use store_api::manifest::{ManifestVersion, MAX_VERSION, MIN_VERSION};
use store_api::metadata::RegionMetadataRef;
use tokio::sync::RwLock;
use crate::error::Result;
use crate::error::{self, Result};
use crate::manifest::action::{
RegionChange, RegionCheckpoint, RegionManifest, RegionManifestBuilder, RegionMetaAction,
RegionMetaActionList,
};
use crate::manifest::storage::ManifestObjectStore;
use crate::manifest::storage::{file_version, is_delta_file, ManifestObjectStore};
/// Options for [RegionManifestManager].
#[derive(Debug, Clone)]
@@ -160,6 +162,12 @@ impl RegionManifestManager {
let inner = self.inner.read().await;
inner.total_manifest_size()
}
/// Returns true if a newer version manifest file is found.
pub async fn has_update(&self) -> Result<bool> {
let inner = self.inner.read().await;
inner.has_update().await
}
}
#[cfg(test)]
@@ -233,7 +241,7 @@ impl RegionManifestManagerInner {
})
}
/// Open an existing manifest.
/// Opens an existing manifest.
///
/// Returns `Ok(None)` if no such manifest.
async fn open(options: RegionManifestOptions) -> Result<Option<Self>> {
@@ -323,7 +331,7 @@ impl RegionManifestManagerInner {
Ok(())
}
/// Update the manifest. Return the current manifest version number.
/// Updates the manifest. Returns the current manifest version number.
async fn update(&mut self, action_list: RegionMetaActionList) -> Result<ManifestVersion> {
let version = self.increase_version();
self.store.save(version, &action_list.encode()?).await?;
@@ -385,7 +393,7 @@ impl RegionManifestManagerInner {
Ok(())
}
/// Make a new checkpoint. Return the fresh one if there are some actions to compact.
/// Makes a new checkpoint. Returns the fresh one if there are actions to compact.
async fn do_checkpoint(&mut self) -> Result<Option<RegionCheckpoint>> {
let last_checkpoint = Self::last_checkpoint(&mut self.store).await?;
let current_version = self.last_version;
@@ -459,7 +467,7 @@ impl RegionManifestManagerInner {
Ok(Some(checkpoint))
}
/// Fetch the last [RegionCheckpoint] from storage.
/// Fetches the last [RegionCheckpoint] from storage.
pub(crate) async fn last_checkpoint(
store: &mut ManifestObjectStore,
) -> Result<Option<RegionCheckpoint>> {
@@ -472,6 +480,37 @@ impl RegionManifestManagerInner {
Ok(None)
}
}
/// Returns true if a newer version manifest file is found.
///
/// It is typically used by read-only regions to catch up with the latest manifest.
pub(crate) async fn has_update(&self) -> Result<bool> {
let last_version = self.last_version;
let streamer =
self.store
.manifest_lister()
.await?
.context(error::EmptyManifestDirSnafu {
manifest_dir: self.store.manifest_dir(),
})?;
let need_update = streamer
.try_any(|entry| async move {
let file_name = entry.name();
if is_delta_file(file_name) {
let version = file_version(file_name);
if version > last_version {
return true;
}
}
false
})
.await
.context(error::OpenDalSnafu)?;
Ok(need_update)
}
}
#[cfg(test)]


@@ -21,7 +21,7 @@ use common_telemetry::debug;
use futures::future::try_join_all;
use futures::TryStreamExt;
use lazy_static::lazy_static;
use object_store::{util, Entry, ErrorKind, ObjectStore};
use object_store::{util, Entry, ErrorKind, Lister, ObjectStore};
use regex::Regex;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
@@ -148,6 +148,23 @@ impl ManifestObjectStore {
format!("{}{}", self.path, LAST_CHECKPOINT_FILE)
}
/// Returns the manifest directory.
pub(crate) fn manifest_dir(&self) -> &str {
&self.path
}
/// Returns an iterator over the manifest files.
pub(crate) async fn manifest_lister(&self) -> Result<Option<Lister>> {
match self.object_store.lister_with(&self.path).await {
Ok(streamer) => Ok(Some(streamer)),
Err(e) if e.kind() == ErrorKind::NotFound => {
debug!("Manifest directory does not exists: {}", self.path);
Ok(None)
}
Err(e) => Err(e).context(OpenDalSnafu)?,
}
}
/// Return all `R`s in the root directory that meet the `filter` conditions (that is, the `filter` closure returns `Some(R)`),
/// and discard `R` that does not meet the conditions (that is, the `filter` closure returns `None`)
/// Return an empty vector when directory is not found.
@@ -155,13 +172,8 @@ impl ManifestObjectStore {
where
F: Fn(Entry) -> Option<R>,
{
let streamer = match self.object_store.lister_with(&self.path).await {
Ok(streamer) => streamer,
Err(e) if e.kind() == ErrorKind::NotFound => {
debug!("Manifest directory does not exists: {}", self.path);
return Ok(vec![]);
}
Err(e) => Err(e).context(OpenDalSnafu)?,
let Some(streamer) = self.manifest_lister().await? else {
return Ok(vec![]);
};
streamer
@@ -171,7 +183,12 @@ impl ManifestObjectStore {
.context(OpenDalSnafu)
}
/// Scan the manifest files in the range of [start, end) and return all manifest entries.
/// Sorts the manifest files.
fn sort_manifests(entries: &mut [(ManifestVersion, Entry)]) {
entries.sort_unstable_by(|(v1, _), (v2, _)| v1.cmp(v2));
}
/// Scans the manifest files in the range of [start, end) and return all manifest entries.
pub async fn scan(
&self,
start: ManifestVersion,
@@ -192,7 +209,7 @@ impl ManifestObjectStore {
})
.await?;
entries.sort_unstable_by(|(v1, _), (v2, _)| v1.cmp(v2));
Self::sort_manifests(&mut entries);
Ok(entries)
}


@@ -126,6 +126,11 @@ impl MitoRegion {
self.writable.load(Ordering::Relaxed)
}
/// Returns the region dir.
pub(crate) fn region_dir(&self) -> &str {
self.access_layer.region_dir()
}
/// Sets the writable flag.
pub(crate) fn set_writable(&self, writable: bool) {
self.writable.store(writable, Ordering::Relaxed);


@@ -53,7 +53,7 @@ pub(crate) struct RegionOpener {
object_store_manager: ObjectStoreManagerRef,
region_dir: String,
scheduler: SchedulerRef,
options: HashMap<String, String>,
options: Option<RegionOptions>,
cache_manager: Option<CacheManagerRef>,
skip_wal_replay: bool,
}
@@ -74,7 +74,7 @@ impl RegionOpener {
object_store_manager,
region_dir: normalize_dir(region_dir),
scheduler,
options: HashMap::new(),
options: None,
cache_manager: None,
skip_wal_replay: false,
}
@@ -86,9 +86,15 @@ impl RegionOpener {
self
}
/// Parses and sets options for the region.
pub(crate) fn parse_options(mut self, options: HashMap<String, String>) -> Result<Self> {
self.options = Some(RegionOptions::try_from(&options)?);
Ok(self)
}
/// Sets options for the region.
pub(crate) fn options(mut self, value: HashMap<String, String>) -> Self {
self.options = value;
pub(crate) fn options(mut self, options: RegionOptions) -> Self {
self.options = Some(options);
self
}
@@ -108,9 +114,10 @@ impl RegionOpener {
/// Opens the region if it already exists.
///
/// # Panics
/// Panics if metadata is not set.
/// - Panics if metadata is not set.
/// - Panics if options is not set.
pub(crate) async fn create_or_open<S: LogStore>(
self,
mut self,
config: &MitoConfig,
wal: &Wal<S>,
) -> Result<MitoRegion> {
@@ -145,9 +152,8 @@ impl RegionOpener {
);
}
}
let options = RegionOptions::try_from(&self.options)?;
let options = self.options.take().unwrap();
let wal_options = options.wal_options.clone();
let object_store = self.object_store(&options.storage)?.clone();
// Create a manifest manager for this region and writes regions to the manifest file.
@@ -218,7 +224,7 @@ impl RegionOpener {
config: &MitoConfig,
wal: &Wal<S>,
) -> Result<Option<MitoRegion>> {
let region_options = RegionOptions::try_from(&self.options)?;
let region_options = self.options.as_ref().unwrap().clone();
let wal_options = region_options.wal_options.clone();
let region_manifest_options = self.manifest_options(config, &region_options)?;
@@ -358,13 +364,13 @@ pub(crate) fn check_recovered_region(
}
/// Replays the mutations from WAL and inserts mutations to memtable of given region.
async fn replay_memtable<S: LogStore>(
pub(crate) async fn replay_memtable<S: LogStore>(
wal: &Wal<S>,
wal_options: &WalOptions,
region_id: RegionId,
flushed_entry_id: EntryId,
version_control: &VersionControlRef,
) -> Result<()> {
) -> Result<EntryId> {
let mut rows_replayed = 0;
// Last entry id should start from flushed entry id since there might be no
// data in the WAL.
@@ -392,7 +398,7 @@ async fn replay_memtable<S: LogStore>(
"Replay WAL for region: {}, rows recovered: {}, last entry id: {}",
region_id, rows_replayed, last_entry_id
);
Ok(())
Ok(last_entry_id)
}
/// Returns the directory to the manifest files.


@@ -32,9 +32,9 @@ use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata};
use store_api::region_engine::SetReadonlyResponse;
use store_api::region_request::{
AffectedRows, RegionAlterRequest, RegionCloseRequest, RegionCompactRequest,
RegionCreateRequest, RegionDropRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest,
RegionTruncateRequest,
AffectedRows, RegionAlterRequest, RegionCatchupRequest, RegionCloseRequest,
RegionCompactRequest, RegionCreateRequest, RegionDropRequest, RegionFlushRequest,
RegionOpenRequest, RegionRequest, RegionTruncateRequest,
};
use store_api::storage::{RegionId, SequenceNumber};
use tokio::sync::oneshot::{self, Receiver, Sender};
@@ -550,6 +550,11 @@ impl WorkerRequest {
sender: sender.into(),
request: DdlRequest::Truncate(v),
}),
RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
region_id,
sender: sender.into(),
request: DdlRequest::Catchup(v),
}),
};
Ok((worker_request, receiver))
@@ -578,6 +583,7 @@ pub(crate) enum DdlRequest {
Flush(RegionFlushRequest),
Compact(RegionCompactRequest),
Truncate(RegionTruncateRequest),
Catchup(RegionCatchupRequest),
}
/// Sender and Ddl request.


@@ -139,6 +139,14 @@ impl TestEnv {
MitoEngine::new(config, logstore, object_store_manager)
}
/// Creates a follower engine with the specific config, reusing the existing log store and object store manager.
pub async fn create_follower_engine(&mut self, config: MitoConfig) -> MitoEngine {
let logstore = self.logstore.as_ref().unwrap().clone();
let object_store_manager = self.object_store_manager.as_ref().unwrap().clone();
MitoEngine::new(config, logstore, object_store_manager)
}
/// Creates a new engine with specific config and manager/listener under this env.
pub async fn create_engine_with(
&mut self,
@@ -221,6 +229,7 @@ impl TestEnv {
)
}
/// Returns the log store and object store manager.
async fn create_log_and_object_store_manager(
&self,
) -> (RaftEngineLogStore, ObjectStoreManager) {


@@ -15,6 +15,7 @@
//! Structs and utilities for writing regions.
mod handle_alter;
mod handle_catchup;
mod handle_close;
mod handle_compaction;
mod handle_create;
@@ -546,6 +547,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
continue;
}
DdlRequest::Truncate(_) => self.handle_truncate_request(ddl.region_id).await,
DdlRequest::Catchup(req) => self.handle_catchup_request(ddl.region_id, req).await,
};
ddl.sender.send(res);


@@ -0,0 +1,95 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Handling catch-up requests.
use std::sync::Arc;
use snafu::ensure;
use store_api::logstore::LogStore;
use store_api::region_request::{AffectedRows, RegionCatchupRequest};
use store_api::storage::RegionId;
use crate::error::{self, Result};
use crate::region::opener::{replay_memtable, RegionOpener};
use crate::worker::RegionWorkerLoop;
impl<S: LogStore> RegionWorkerLoop<S> {
pub(crate) async fn handle_catchup_request(
&mut self,
region_id: RegionId,
request: RegionCatchupRequest,
) -> Result<AffectedRows> {
let Some(region) = self.regions.get_region(region_id) else {
return error::RegionNotFoundSnafu { region_id }.fail();
};
if region.is_writable() {
return Ok(0);
}
// Note: currently we guard against split brain by ensuring the mutable memtable is empty.
// Executing catch-up requests repeatedly without `set_writable=true` is expensive.
let is_mutable_empty = region.version().memtables.mutable.is_empty();
// Relies on short-circuit evaluation.
let region = if !is_mutable_empty || region.manifest_manager.has_update().await? {
let reopened_region = Arc::new(
RegionOpener::new(
region_id,
region.region_dir(),
self.memtable_builder.clone(),
self.object_store_manager.clone(),
self.scheduler.clone(),
)
.cache(Some(self.cache_manager.clone()))
.options(region.version().options.clone())
.skip_wal_replay(true)
.open(&self.config, &self.wal)
.await?,
);
debug_assert!(!reopened_region.is_writable());
self.regions.insert_region(reopened_region.clone());
reopened_region
} else {
region
};
let flushed_entry_id = region.version_control.current().last_entry_id;
let last_entry_id = replay_memtable(
&self.wal,
&region.wal_options,
region_id,
flushed_entry_id,
&region.version_control,
)
.await?;
if let Some(expected_last_entry_id) = request.entry_id {
ensure!(
expected_last_entry_id == last_entry_id,
error::UnexpectedReplaySnafu {
region_id,
expected_last_entry_id,
replayed_last_entry_id: last_entry_id,
}
)
}
if request.set_writable {
region.set_writable(true);
}
Ok(0)
}
}


@@ -63,7 +63,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.scheduler.clone(),
)
.metadata(metadata)
.options(request.options)
.parse_options(request.options)?
.cache(Some(self.cache_manager.clone()))
.create_or_open(&self.config, &self.wal)
.await?;


@@ -69,8 +69,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.object_store_manager.clone(),
self.scheduler.clone(),
)
.options(request.options)
.skip_wal_replay(request.skip_wal_replay)
.parse_options(request.options)?
.cache(Some(self.cache_manager.clone()))
.open(&self.config, &self.wal)
.await?;


@@ -15,7 +15,7 @@
pub use opendal::raw::oio::Pager;
pub use opendal::raw::{normalize_path as raw_normalize_path, HttpClient};
pub use opendal::{
services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, Metakey,
services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, Lister, Metakey,
Operator as ObjectStore, Reader, Result, Writer,
};


@@ -21,6 +21,7 @@ use api::v1::{self, Rows, SemanticType};
use snafu::{ensure, OptionExt};
use strum::IntoStaticStr;
use crate::logstore::entry;
use crate::metadata::{
ColumnMetadata, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu, MetadataError,
RegionMetadata, Result,
@@ -42,6 +43,7 @@ pub enum RegionRequest {
Flush(RegionFlushRequest),
Compact(RegionCompactRequest),
Truncate(RegionTruncateRequest),
Catchup(RegionCatchupRequest),
}
impl RegionRequest {
@@ -59,6 +61,7 @@ impl RegionRequest {
RegionRequest::Flush(_) => "flush",
RegionRequest::Compact(_) => "compact",
RegionRequest::Truncate(_) => "truncate",
RegionRequest::Catchup(_) => "catchup",
}
}
@@ -453,6 +456,19 @@ pub struct RegionCompactRequest {}
#[derive(Debug)]
pub struct RegionTruncateRequest {}
/// Catchup region request.
///
/// Makes a read-only region catch up to the leader region's changes.
/// It has no effect when operating on a leader region.
#[derive(Debug)]
pub struct RegionCatchupRequest {
/// Sets the region to writable if it is available after it has caught up with all changes.
pub set_writable: bool,
/// The `entry_id` the region is expected to replay to.
/// `None` means replaying to the latest entry.
pub entry_id: Option<entry::Id>,
}
impl fmt::Display for RegionRequest {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
@@ -466,6 +482,7 @@ impl fmt::Display for RegionRequest {
RegionRequest::Flush(_) => write!(f, "Flush"),
RegionRequest::Compact(_) => write!(f, "Compact"),
RegionRequest::Truncate(_) => write!(f, "Truncate"),
RegionRequest::Catchup(_) => write!(f, "Catchup"),
}
}
}