mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2025-12-22 22:20:02 +00:00

Compare commits: feat/inges ... v1.0.0-bet (1 commit, 8346acb900)

Cargo.lock (generated)
@@ -8362,6 +8362,7 @@ dependencies = [
"common-macro",
"common-telemetry",
"common-test-util",
"derive_builder 0.20.2",
"futures",
"humantime-serde",
"lazy_static",
@@ -1200,7 +1200,8 @@ impl RegionServerInner {
| RegionRequest::Flush(_)
| RegionRequest::Compact(_)
| RegionRequest::Truncate(_)
| RegionRequest::BuildIndex(_) => RegionChange::None,
| RegionRequest::BuildIndex(_)
| RegionRequest::EnterStaging(_) => RegionChange::None,
RegionRequest::Catchup(_) => RegionChange::Catchup,
};
@@ -23,6 +23,7 @@ mod options;
mod put;
mod read;
mod region_metadata;
mod staging;
mod state;
mod sync;
@@ -211,6 +212,13 @@ impl RegionEngine for MetricEngine {
let mut extension_return_value = HashMap::new();

let result = match request {
RegionRequest::EnterStaging(_) => {
if self.inner.is_physical_region(region_id) {
self.handle_enter_staging_request(region_id, request).await
} else {
UnsupportedRegionRequestSnafu { request }.fail()
}
}
RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
RegionRequest::Create(create) => {
self.inner
src/metric-engine/src/engine/staging.rs (new file, 54 lines)
@@ -0,0 +1,54 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_base::AffectedRows;
use snafu::ResultExt;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{EnterStagingRequest, RegionRequest};
use store_api::storage::RegionId;

use crate::engine::MetricEngine;
use crate::error::{MitoEnterStagingOperationSnafu, Result};
use crate::utils;

impl MetricEngine {
/// Handles the enter staging request for the given region.
pub(crate) async fn handle_enter_staging_request(
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<AffectedRows> {
let metadata_region_id = utils::to_metadata_region_id(region_id);
let data_region_id = utils::to_data_region_id(region_id);

// The metadata region doesn't care about the partition expr, so we can just pass an empty string.
self.inner
.mito
.handle_request(
metadata_region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: String::new(),
}),
)
.await
.context(MitoEnterStagingOperationSnafu)?;

self.inner
.mito
.handle_request(data_region_id, request)
.await
.context(MitoEnterStagingOperationSnafu)
.map(|response| response.affected_rows)
}
}
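For reference, this is how a caller exercises the new request through the common RegionEngine interface — a minimal sketch mirroring the mito2 tests later in this diff (PARTITION_EXPR is a test constant defined there; on the metric engine the request is only accepted for physical regions):

    // Sketch: issue an enter-staging request; the metric engine fans it out to the
    // metadata region (with an empty partition expr) and the data region as shown
    // in staging.rs above.
    engine
        .handle_request(
            region_id,
            RegionRequest::EnterStaging(EnterStagingRequest {
                partition_expr: PARTITION_EXPR.to_string(),
            }),
        )
        .await
        .unwrap();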
@@ -156,6 +156,13 @@ pub enum Error {
location: Location,
},

#[snafu(display("Mito enter staging operation fails"))]
MitoEnterStagingOperation {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to collect record batch stream"))]
CollectRecordBatchStream {
source: common_recordbatch::error::Error,

@@ -360,6 +367,7 @@ impl ErrorExt for Error {
| MitoWriteOperation { source, .. }
| MitoFlushOperation { source, .. }
| MitoSyncOperation { source, .. }
| MitoEnterStagingOperation { source, .. }
| BatchOpenMitoRegion { source, .. }
| BatchCatchupMitoRegion { source, .. } => source.status_code(),
@@ -55,7 +55,7 @@ lazy_static = "1.4"
log-store = { workspace = true }
mito-codec.workspace = true
moka = { workspace = true, features = ["sync", "future"] }
object-store.workspace = true
object-store = { workspace = true, features = ["testing"] }
parquet = { workspace = true, features = ["async"] }
paste.workspace = true
pin-project.workspace = true
@@ -501,7 +501,7 @@ impl Compactor for DefaultCompactor {
// TODO: We might leak files if we fail to update manifest. We can add a cleanup task to remove them later.
compaction_region
.manifest_ctx
.update_manifest(RegionLeaderState::Writable, action_list)
.update_manifest(RegionLeaderState::Writable, action_list, false)
.await?;

Ok(edit)

@@ -117,7 +117,7 @@ impl CompactionTaskImpl {
};
if let Err(e) = compaction_region
.manifest_ctx
.update_manifest(current_region_state, action_list)
.update_manifest(current_region_state, action_list, false)
.await
{
warn!(
@@ -901,7 +901,7 @@ async fn test_alter_region_ttl_options_with_format(flat_format: bool) {
check_ttl(&engine, &Duration::from_secs(500));
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_write_stall_on_altering() {
common_telemetry::init_default_ut_logging();

@@ -952,6 +952,8 @@ async fn test_write_stall_on_altering_with_format(flat_format: bool) {
.await
.unwrap();
});
// Make sure the loop is handling the alter request.
tokio::time::sleep(Duration::from_millis(100)).await;

let column_schemas_cloned = column_schemas.clone();
let engine_cloned = engine.clone();

@@ -962,6 +964,8 @@ async fn test_write_stall_on_altering_with_format(flat_format: bool) {
};
put_rows(&engine_cloned, region_id, rows).await;
});
// Make sure the loop is handling the put request.
tokio::time::sleep(Duration::from_millis(100)).await;

listener.wake_notify();
alter_job.await.unwrap();
@@ -74,6 +74,9 @@ pub trait EventListener: Send + Sync {
/// Notifies the listener that region starts to send a region change result to worker.
async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {}

/// Notifies the listener that region starts to send an enter staging result to worker.
async fn on_enter_staging_result_begin(&self, _region_id: RegionId) {}

/// Notifies the listener that the index build task is executed successfully.
async fn on_index_build_finish(&self, _region_file_id: RegionFileId) {}

@@ -307,6 +310,37 @@ impl EventListener for NotifyRegionChangeResultListener {
region_id
);
self.notify.notified().await;
info!(
"Continue to sending region change result for region {}",
region_id
);
}
}

#[derive(Default)]
pub struct NotifyEnterStagingResultListener {
notify: Notify,
}

impl NotifyEnterStagingResultListener {
/// Continues to send the enter staging result.
pub fn wake_notify(&self) {
self.notify.notify_one();
}
}

#[async_trait]
impl EventListener for NotifyEnterStagingResultListener {
async fn on_enter_staging_result_begin(&self, region_id: RegionId) {
info!(
"Wait on notify to start notify enter staging result for region {}",
region_id
);
self.notify.notified().await;
info!(
"Continue to sending enter staging result for region {}",
region_id
);
}
}
@@ -14,17 +14,30 @@

//! Integration tests for staging state functionality.

use std::assert_matches::assert_matches;
use std::fs;
use std::sync::Arc;
use std::time::Duration;

use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatches;
use object_store::Buffer;
use object_store::layers::mock::{
Entry, Error as MockError, ErrorKind, List, Lister, Metadata, MockLayerBuilder,
Result as MockResult, Write, Writer,
};
use store_api::region_engine::{RegionEngine, SettableRegionRoleState};
use store_api::region_request::{
RegionAlterRequest, RegionFlushRequest, RegionRequest, RegionTruncateRequest,
EnterStagingRequest, RegionAlterRequest, RegionFlushRequest, RegionRequest,
RegionTruncateRequest,
};
use store_api::storage::{RegionId, ScanRequest};

use crate::config::MitoConfig;
use crate::engine::listener::NotifyEnterStagingResultListener;
use crate::error::Error;
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::request::WorkerRequest;
use crate::test_util::{CreateRequestBuilder, TestEnv, build_rows, put_rows, rows_schema};

@@ -214,6 +227,8 @@ async fn test_staging_state_validation_patterns() {
);
}

const PARTITION_EXPR: &str = "partition_expr";

#[tokio::test]
async fn test_staging_manifest_directory() {
test_staging_manifest_directory_with_format(false).await;

@@ -221,6 +236,7 @@ async fn test_staging_manifest_directory() {
}

async fn test_staging_manifest_directory_with_format(flat_format: bool) {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {

@@ -255,9 +271,57 @@ async fn test_staging_manifest_directory_with_format(flat_format: bool) {
// Now test staging mode manifest creation
// Set region to staging mode using the engine API
engine
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader)
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: PARTITION_EXPR.to_string(),
}),
)
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();
let staging_partition_expr = region.staging_partition_expr.lock().unwrap().clone();
assert_eq!(staging_partition_expr.unwrap(), PARTITION_EXPR);
{
let manager = region.manifest_ctx.manifest_manager.read().await;
assert_eq!(
manager
.staging_manifest()
.unwrap()
.metadata
.partition_expr
.as_deref()
.unwrap(),
PARTITION_EXPR
);
assert!(manager.manifest().metadata.partition_expr.is_none());
}

// Should be ok to enter staging mode again with the same partition expr
engine
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: PARTITION_EXPR.to_string(),
}),
)
.await
.unwrap();

// Should throw an error if we try to enter staging mode again with a different partition expr
let err = engine
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: "".to_string(),
}),
)
.await
.unwrap_err();
assert_matches!(
err.into_inner().as_any().downcast_ref::<Error>().unwrap(),
Error::StagingPartitionExprMismatch { .. }
);

// Put some data and flush in staging mode
let rows_data = Rows {
@@ -312,6 +376,7 @@ async fn test_staging_exit_success_with_manifests() {
}

async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool) {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {

@@ -330,16 +395,28 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
.await
.unwrap();

// Add some data and flush in staging mode to generate staging manifests
let rows_data = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows_data).await;

// Enter staging mode
engine
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader)
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: PARTITION_EXPR.to_string(),
}),
)
.await
.unwrap();

// Add some data and flush in staging mode to generate staging manifests
let rows_data = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 5),
rows: build_rows(3, 8),
};
put_rows(&engine, region_id, rows_data).await;

@@ -357,7 +434,7 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
// Add more data and flush again to generate multiple staging manifests
let rows_data2 = Rows {
schema: column_schemas.clone(),
rows: build_rows(5, 10),
rows: build_rows(8, 10),
};
put_rows(&engine, region_id, rows_data2).await;

@@ -382,8 +459,11 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
.unwrap();
assert_eq!(
staging_files_before.len(),
2,
"Staging manifest directory should contain two files before exit"
// Two files for flush operation
// One file for entering staging mode
3,
"Staging manifest directory should contain 3 files before exit, got: {:?}",
staging_files_before
);

// Count normal manifest files before exit

@@ -394,8 +474,11 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
.unwrap();
let normal_count_before = normal_files_before.len();
assert_eq!(
normal_count_before, 1,
"Normal manifest directory should initially contain one file"
// One file for table creation
// One file for flush operation
normal_count_before,
2,
"Normal manifest directory should initially contain 2 files"
);

// Try read data before exiting staging, SST files should be invisible

@@ -403,8 +486,8 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
let scanner = engine.scanner(region_id, request).await.unwrap();
assert_eq!(
scanner.num_files(),
0,
"No SST files should be scanned before exit"
1,
"1 SST files should be scanned before exit"
);
assert_eq!(
scanner.num_memtables(),

@@ -415,14 +498,20 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
let batches = RecordBatches::try_collect(stream).await.unwrap();
let total_rows: usize = batches.iter().map(|rb| rb.num_rows()).sum();
assert_eq!(
total_rows, 0,
"No data should be readable before exit staging mode"
total_rows, 3,
"3 rows should be readable before exit staging mode"
);

// Inspect SSTs from manifest
let sst_entries = engine.all_ssts_from_manifest().await;
assert_eq!(sst_entries.len(), 2);
assert!(sst_entries.iter().all(|e| !e.visible));
assert_eq!(
sst_entries.len(),
3,
"sst entries should be 3, got: {:?}",
sst_entries
);
assert_eq!(sst_entries.iter().filter(|e| e.visible).count(), 1);
assert_eq!(sst_entries.iter().filter(|e| !e.visible).count(), 2);

// Exit staging mode successfully
engine

@@ -470,7 +559,7 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
let scanner = engine.scanner(region_id, request).await.unwrap();
assert_eq!(
scanner.num_files(),
2,
3,
"SST files should be scanned after exit"
);
@@ -482,6 +571,209 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)

// Inspect SSTs from manifest
let sst_entries = engine.all_ssts_from_manifest().await;
assert_eq!(sst_entries.len(), 2);
assert_eq!(sst_entries.len(), 3);
assert!(sst_entries.iter().all(|e| e.visible));
}

#[tokio::test(flavor = "multi_thread")]
async fn test_write_stall_on_enter_staging() {
test_write_stall_on_enter_staging_with_format(false).await;
test_write_stall_on_enter_staging_with_format(true).await;
}

async fn test_write_stall_on_enter_staging_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let listener = Arc::new(NotifyEnterStagingResultListener::default());
let engine = env
.create_engine_with(
MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
},
None,
Some(listener.clone()),
None,
)
.await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();

env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;

let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

let engine_cloned = engine.clone();
let alter_job = tokio::spawn(async move {
engine_cloned
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: PARTITION_EXPR.to_string(),
}),
)
.await
.unwrap();
});
// Make sure the loop is handling the alter request.
tokio::time::sleep(Duration::from_millis(100)).await;

let column_schemas_cloned = column_schemas.clone();
let engine_cloned = engine.clone();
let put_job = tokio::spawn(async move {
let rows = Rows {
schema: column_schemas_cloned,
rows: build_rows(0, 3),
};
put_rows(&engine_cloned, region_id, rows).await;
});
// Make sure the loop is handling the put request.
tokio::time::sleep(Duration::from_millis(100)).await;

listener.wake_notify();
alter_job.await.unwrap();
put_job.await.unwrap();

let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";
let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request).await.unwrap();
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, batches.pretty_print().unwrap());
}

#[tokio::test]
async fn test_enter_staging_clean_staging_manifest_error() {
common_telemetry::init_default_ut_logging();
test_enter_staging_clean_staging_manifest_error_with_format(false).await;
test_enter_staging_clean_staging_manifest_error_with_format(true).await;
}

struct MockLister {
path: String,
inner: Lister,
}

impl List for MockLister {
async fn next(&mut self) -> MockResult<Option<Entry>> {
if self.path.contains("staging") {
return Err(MockError::new(ErrorKind::Unexpected, "mock error"));
}
self.inner.next().await
}
}

struct MockWriter {
path: String,
inner: Writer,
}

impl Write for MockWriter {
async fn write(&mut self, bs: Buffer) -> MockResult<()> {
self.inner.write(bs).await
}

async fn close(&mut self) -> MockResult<Metadata> {
if self.path.contains("staging") {
return Err(MockError::new(ErrorKind::Unexpected, "mock error"));
}
self.inner.close().await
}

async fn abort(&mut self) -> MockResult<()> {
self.inner.abort().await
}
}

async fn test_enter_staging_error(env: &mut TestEnv, flat_format: bool) {
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
})
.await;
let region_id = RegionId::new(1024, 0);
let request = CreateRequestBuilder::new().build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

let err = engine
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: PARTITION_EXPR.to_string(),
}),
)
.await
.unwrap_err();
assert_eq!(err.status_code(), StatusCode::StorageUnavailable);
let region = engine.get_region(region_id).unwrap();
assert!(
region
.manifest_ctx
.manifest_manager
.read()
.await
.staging_manifest()
.is_none()
);
let state = region.state();
assert_eq!(state, RegionRoleState::Leader(RegionLeaderState::Writable));
}

async fn test_enter_staging_clean_staging_manifest_error_with_format(flat_format: bool) {
let mock_layer = MockLayerBuilder::default()
.lister_factory(Arc::new(|path, _args, lister| {
Box::new(MockLister {
path: path.to_string(),
inner: lister,
})
}))
.build()
.unwrap();
let mut env = TestEnv::new().await.with_mock_layer(mock_layer);
test_enter_staging_error(&mut env, flat_format).await;
}

#[tokio::test]
async fn test_enter_staging_save_staging_manifest_error() {
common_telemetry::init_default_ut_logging();
test_enter_staging_save_staging_manifest_error_with_format(false).await;
test_enter_staging_save_staging_manifest_error_with_format(true).await;
}

async fn test_enter_staging_save_staging_manifest_error_with_format(flat_format: bool) {
let mock_layer = MockLayerBuilder::default()
.writer_factory(Arc::new(|path, _args, lister| {
Box::new(MockWriter {
path: path.to_string(),
inner: lister,
})
}))
.build()
.unwrap();
let mut env = TestEnv::new().await.with_mock_layer(mock_layer);
test_enter_staging_error(&mut env, flat_format).await;
}
@@ -1150,6 +1150,18 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display(
"Staging partition expr mismatch, manifest: {:?}, request: {}",
manifest_expr,
request_expr
))]
StagingPartitionExprMismatch {
manifest_expr: Option<String>,
request_expr: String,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;

@@ -1196,7 +1208,8 @@ impl ErrorExt for Error {
| InstallManifestTo { .. }
| Unexpected { .. }
| SerializeColumnMetadata { .. }
| SerializeManifest { .. } => StatusCode::Unexpected,
| SerializeManifest { .. }
| StagingPartitionExprMismatch { .. } => StatusCode::Unexpected,

RegionNotFound { .. } => StatusCode::RegionNotFound,
ObjectStoreNotFound { .. }
@@ -208,7 +208,7 @@ impl WriteBufferManager for WriteBufferManagerImpl {
}

/// Reason of a flush task.
#[derive(Debug, IntoStaticStr)]
#[derive(Debug, IntoStaticStr, Clone, Copy, PartialEq, Eq)]
pub enum FlushReason {
/// Other reasons.
Others,

@@ -222,6 +222,8 @@ pub enum FlushReason {
Periodically,
/// Flush memtable during downgrading state.
Downgrading,
/// Enter staging mode.
EnterStaging,
}

impl FlushReason {

@@ -253,6 +255,8 @@ pub(crate) struct RegionFlushTask {
pub(crate) index_options: IndexOptions,
/// Semaphore to control flush concurrency.
pub(crate) flush_semaphore: Arc<Semaphore>,
/// Whether the region is in staging mode.
pub(crate) is_staging: bool,
}

impl RegionFlushTask {

@@ -316,6 +320,7 @@ impl RegionFlushTask {
_timer: timer,
edit,
memtables_to_remove,
is_staging: self.is_staging,
};
WorkerRequest::Background {
region_id: self.region_id,

@@ -398,7 +403,10 @@ impl RegionFlushTask {
flushed_sequence: Some(version_data.committed_sequence),
committed_sequence: None,
};
info!("Applying {edit:?} to region {}", self.region_id);
info!(
"Applying {edit:?} to region {}, is_staging: {}",
self.region_id, self.is_staging
);

let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));

@@ -417,11 +425,12 @@ impl RegionFlushTask {
// add a cleanup job to remove them later.
let version = self
.manifest_ctx
.update_manifest(expected_state, action_list)
.update_manifest(expected_state, action_list, self.is_staging)
.await?;
info!(
"Successfully update manifest version to {version}, region: {}, reason: {}",
"Successfully update manifest version to {version}, region: {}, is_staging: {}, reason: {}",
self.region_id,
self.is_staging,
self.reason.as_str()
);

@@ -1292,6 +1301,7 @@ mod tests {
.await,
index_options: IndexOptions::default(),
flush_semaphore: Arc::new(Semaphore::new(2)),
is_staging: false,
};
task.push_sender(OptionOutputTx::from(output_tx));
scheduler

@@ -1334,6 +1344,7 @@ mod tests {
manifest_ctx: manifest_ctx.clone(),
index_options: IndexOptions::default(),
flush_semaphore: Arc::new(Semaphore::new(2)),
is_staging: false,
})
.collect();
// Schedule first task.
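In short, a flush now remembers whether the region was in staging mode when it was scheduled and reuses that flag when the resulting edit is applied. A minimal sketch of that plumbing, assuming `region`, `manifest_ctx`, `expected_state`, and `action_list` are in scope as in the hunks above:

    // The flag is captured when the flush task is built...
    let is_staging = region.is_staging();
    // ...carried through FlushFinished together with the edit, and finally
    // decides whether the edit lands in the staging manifest or the normal one.
    let version = manifest_ctx
        .update_manifest(expected_state, action_list, is_staging)
        .await?;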
@@ -25,7 +25,6 @@ use crate::manifest::action::{RegionCheckpoint, RegionManifest};
use crate::manifest::manager::RegionManifestOptions;
use crate::manifest::storage::ManifestObjectStore;
use crate::metrics::MANIFEST_OP_ELAPSED;
use crate::region::{RegionLeaderState, RegionRoleState};

/// [`Checkpointer`] is responsible for doing checkpoint for a region, in an asynchronous way.
#[derive(Debug)]

@@ -137,20 +136,7 @@ impl Checkpointer {
/// Check if it's needed to do checkpoint for the region by the checkpoint distance.
/// If needed, and there's no currently running checkpoint task, it will start a new checkpoint
/// task running in the background.
pub(crate) fn maybe_do_checkpoint(
&self,
manifest: &RegionManifest,
region_state: RegionRoleState,
) {
// Skip checkpoint if region is in staging state
if region_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
info!(
"Skipping checkpoint for region {} in staging mode, manifest version: {}",
manifest.metadata.region_id, manifest.manifest_version
);
return;
}

pub(crate) fn maybe_do_checkpoint(&self, manifest: &RegionManifest) {
if self.manifest_options.checkpoint_distance == 0 {
return;
}
@@ -151,6 +151,10 @@ pub struct RegionManifestManager {
last_version: Arc<AtomicU64>,
checkpointer: Checkpointer,
manifest: Arc<RegionManifest>,
// Staging manifest is used to store the manifest of the staging region before it becomes available.
// It is initially inherited from the previous manifest (i.e., `self.manifest`).
// When the staging manifest becomes available, it will be used to construct the new manifest.
staging_manifest: Option<Arc<RegionManifest>>,
stats: ManifestStats,
stopped: bool,
}

@@ -229,6 +233,7 @@ impl RegionManifestManager {
last_version: manifest_version,
checkpointer,
manifest: Arc::new(manifest),
staging_manifest: None,
stats: stats.clone(),
stopped: false,
})

@@ -334,6 +339,8 @@ impl RegionManifestManager {
last_version: manifest_version,
checkpointer,
manifest: Arc::new(manifest),
// TODO(weny): open the staging manifest if it exists.
staging_manifest: None,
stats: stats.clone(),
stopped: false,
}))

@@ -504,7 +511,7 @@ impl RegionManifestManager {
pub async fn update(
&mut self,
action_list: RegionMetaActionList,
region_state: RegionRoleState,
is_staging: bool,
) -> Result<ManifestVersion> {
let _t = MANIFEST_OP_ELAPSED
.with_label_values(&["update"])

@@ -518,13 +525,19 @@ impl RegionManifestManager {
);

let version = self.increase_version();
let is_staging = region_state == RegionRoleState::Leader(RegionLeaderState::Staging);
self.store
.save(version, &action_list.encode()?, is_staging)
.await?;

// For a staging region, the manifest is initially inherited from the previous manifest (i.e., `self.manifest`).
// When the staging manifest becomes available, it will be used to construct the new manifest.
let mut manifest_builder =
RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
if is_staging && let Some(staging_manifest) = self.staging_manifest.as_ref() {
RegionManifestBuilder::with_checkpoint(Some(staging_manifest.as_ref().clone()))
} else {
RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()))
};

for action in action_list.actions {
match action {
RegionMetaAction::Change(action) => {

@@ -544,17 +557,27 @@ impl RegionManifestManager {
}
}
}
let new_manifest = manifest_builder.try_build()?;
new_manifest
.removed_files
.update_file_removed_cnt_to_stats(&self.stats);
let updated_manifest = self
.checkpointer
.update_manifest_removed_files(new_manifest)?;
self.manifest = Arc::new(updated_manifest);

self.checkpointer
.maybe_do_checkpoint(self.manifest.as_ref(), region_state);
if is_staging {
let new_manifest = manifest_builder.try_build()?;
self.staging_manifest = Some(Arc::new(new_manifest));

info!(
"Skipping checkpoint for region {} in staging mode, manifest version: {}",
self.manifest.metadata.region_id, self.manifest.manifest_version
);
} else {
let new_manifest = manifest_builder.try_build()?;
new_manifest
.removed_files
.update_file_removed_cnt_to_stats(&self.stats);
let updated_manifest = self
.checkpointer
.update_manifest_removed_files(new_manifest)?;
self.manifest = Arc::new(updated_manifest);
self.checkpointer
.maybe_do_checkpoint(self.manifest.as_ref());
}

Ok(version)
}
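Put differently, the branch above keeps staging edits away from the committed manifest: staging updates accumulate in `staging_manifest` and never trigger a checkpoint, while normal updates replace `manifest` and may checkpoint. A condensed sketch of that decision, using only names from this hunk (removed-files bookkeeping elided):

    if is_staging {
        // Staging edits build on the previous staging manifest and are kept aside;
        // the committed `manifest` and its checkpoints stay untouched until the
        // region exits staging.
        self.staging_manifest = Some(Arc::new(manifest_builder.try_build()?));
    } else {
        // Normal path: rebuild the committed manifest and let the checkpointer
        // decide whether to write a checkpoint.
        self.manifest = Arc::new(manifest_builder.try_build()?);
        self.checkpointer.maybe_do_checkpoint(self.manifest.as_ref());
    }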
@@ -575,6 +598,11 @@ impl RegionManifestManager {
self.manifest.clone()
}

/// Retrieves the current staging [RegionManifest], if any.
pub fn staging_manifest(&self) -> Option<Arc<RegionManifest>> {
self.staging_manifest.clone()
}

/// Returns total manifest size.
pub fn manifest_usage(&self) -> u64 {
self.store.total_manifest_size()

@@ -711,6 +739,22 @@ impl RegionManifestManager {

Ok(Some(RegionMetaActionList::new(merged_actions)))
}

/// Unsets the staging manifest.
pub(crate) fn unset_staging_manifest(&mut self) {
self.staging_manifest = None;
}

/// Clears all staging manifests and the staging directory.
pub(crate) async fn clear_staging_manifest_and_dir(&mut self) -> Result<()> {
self.staging_manifest = None;
self.store.clear_staging_manifests().await?;
info!(
"Cleared all staging manifests for region {}",
self.manifest.metadata.region_id
);
Ok(())
}
}

#[cfg(test)]

@@ -837,13 +881,7 @@ mod test {
sst_format: FormatType::PrimaryKey,
}));

let current_version = manager
.update(
action_list,
RegionRoleState::Leader(RegionLeaderState::Writable),
)
.await
.unwrap();
let current_version = manager.update(action_list, false).await.unwrap();
assert_eq!(current_version, 1);
manager.validate_manifest(&new_metadata, 1);

@@ -906,13 +944,7 @@ mod test {
sst_format: FormatType::PrimaryKey,
}));

let current_version = manager
.update(
action_list,
RegionRoleState::Leader(RegionLeaderState::Writable),
)
.await
.unwrap();
let current_version = manager.update(action_list, false).await.unwrap();
assert_eq!(current_version, 1);
manager.validate_manifest(&new_metadata, 1);

@@ -933,7 +965,7 @@ mod test {
flushed_sequence: None,
committed_sequence: None,
})]),
RegionRoleState::Leader(RegionLeaderState::Writable),
false,
)
.await
.unwrap();
@@ -27,7 +27,6 @@ use crate::manifest::action::{
use crate::manifest::manager::RegionManifestManager;
use crate::manifest::storage::CheckpointMetadata;
use crate::manifest::tests::utils::basic_region_metadata;
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::sst::file::FileMeta;
use crate::test_util::TestEnv;

@@ -87,13 +86,7 @@ async fn manager_without_checkpoint() {

// apply 10 actions
for _ in 0..10 {
manager
.update(
nop_action(),
RegionRoleState::Leader(RegionLeaderState::Writable),
)
.await
.unwrap();
manager.update(nop_action(), false).await.unwrap();
}

// no checkpoint

@@ -138,13 +131,7 @@ async fn manager_with_checkpoint_distance_1() {

// apply 10 actions
for _ in 0..10 {
manager
.update(
nop_action(),
RegionRoleState::Leader(RegionLeaderState::Writable),
)
.await
.unwrap();
manager.update(nop_action(), false).await.unwrap();

while manager.checkpointer().is_doing_checkpoint() {
tokio::time::sleep(Duration::from_millis(10)).await;

@@ -205,13 +192,7 @@ async fn test_corrupted_data_causing_checksum_error() {

// Apply actions
for _ in 0..10 {
manager
.update(
nop_action(),
RegionRoleState::Leader(RegionLeaderState::Writable),
)
.await
.unwrap();
manager.update(nop_action(), false).await.unwrap();
}

// Wait for the checkpoint to finish.

@@ -302,10 +283,7 @@ async fn generate_checkpoint_with_compression_types(
let (_env, mut manager) = build_manager(1, compress_type).await;

for action in actions {
manager
.update(action, RegionRoleState::Leader(RegionLeaderState::Writable))
.await
.unwrap();
manager.update(action, false).await.unwrap();

while manager.checkpointer().is_doing_checkpoint() {
tokio::time::sleep(Duration::from_millis(10)).await;

@@ -361,10 +339,7 @@ async fn manifest_install_manifest_to() {
let (env, mut manager) = build_manager(0, CompressionType::Uncompressed).await;
let (files, actions) = generate_action_lists(10);
for action in actions {
manager
.update(action, RegionRoleState::Leader(RegionLeaderState::Writable))
.await
.unwrap();
manager.update(action, false).await.unwrap();
}

// Nothing to install

@@ -402,10 +377,7 @@ async fn manifest_install_manifest_to_with_checkpoint() {
let (env, mut manager) = build_manager(3, CompressionType::Uncompressed).await;
let (files, actions) = generate_action_lists(10);
for action in actions {
manager
.update(action, RegionRoleState::Leader(RegionLeaderState::Writable))
.await
.unwrap();
manager.update(action, false).await.unwrap();

while manager.checkpointer().is_doing_checkpoint() {
tokio::time::sleep(Duration::from_millis(10)).await;

@@ -477,13 +449,7 @@ async fn test_checkpoint_bypass_in_staging_mode() {

// Apply actions in staging mode - checkpoint should be bypassed
for _ in 0..15 {
manager
.update(
nop_action(),
RegionRoleState::Leader(RegionLeaderState::Staging),
)
.await
.unwrap();
manager.update(nop_action(), true).await.unwrap();
}
assert!(!manager.checkpointer().is_doing_checkpoint());

@@ -498,13 +464,7 @@ async fn test_checkpoint_bypass_in_staging_mode() {
);

// Now switch to normal mode and apply one more action
manager
.update(
nop_action(),
RegionRoleState::Leader(RegionLeaderState::Writable),
)
.await
.unwrap();
manager.update(nop_action(), false).await.unwrap();

// Wait for potential checkpoint
while manager.checkpointer().is_doing_checkpoint() {
@@ -22,7 +22,7 @@ pub(crate) mod version;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::sync::{Arc, Mutex, RwLock};

use common_telemetry::{error, info, warn};
use crossbeam_utils::atomic::AtomicCell;

@@ -77,6 +77,8 @@ pub enum RegionLeaderState {
Writable,
/// The region is in staging mode - writable but no checkpoint/compaction.
Staging,
/// The region is entering staging mode; write requests will be stalled.
EnteringStaging,
/// The region is altering.
Altering,
/// The region is dropping.

@@ -138,6 +140,14 @@ pub struct MitoRegion {
pub(crate) topic_latest_entry_id: AtomicU64,
/// The total bytes written to the region.
pub(crate) written_bytes: Arc<AtomicU64>,
/// The partition expression of the region in staging mode.
///
/// During staging mode, the region metadata in [`VersionControlRef`] is not updated,
/// so we need to store the partition expression separately.
/// TODO(weny):
/// 1. Reload the staging partition expr during region open.
/// 2. Reject requests with a mismatching partition expr.
pub(crate) staging_partition_expr: Mutex<Option<String>>,
/// manifest stats
stats: ManifestStats,
}

@@ -326,11 +336,19 @@ impl MitoRegion {
)
}

/// Sets the entering staging state.
pub(crate) fn set_entering_staging(&self) -> Result<()> {
self.compare_exchange_state(
RegionLeaderState::Writable,
RegionRoleState::Leader(RegionLeaderState::EnteringStaging),
)
}

/// Exits the staging state back to writable.
///
/// You should call this method in the worker loop.
/// Transitions from Staging to Writable state.
fn exit_staging(&self) -> Result<()> {
pub fn exit_staging(&self) -> Result<()> {
self.compare_exchange_state(
RegionLeaderState::Staging,
RegionRoleState::Leader(RegionLeaderState::Writable),

@@ -457,10 +475,7 @@ impl MitoRegion {
sst_format: current_version.options.sst_format.unwrap_or_default(),
});
let result = manager
.update(
RegionMetaActionList::with_action(action),
RegionRoleState::Leader(RegionLeaderState::Writable),
)
.update(RegionMetaActionList::with_action(action), false)
.await;

match result {

@@ -492,6 +507,16 @@ impl MitoRegion {
}
}

/// Switches the region state to `RegionRoleState::Leader(RegionLeaderState::Staging)` if the current state is `expect`.
/// Otherwise, logs an error.
pub(crate) fn switch_state_to_staging(&self, expect: RegionLeaderState) {
if let Err(e) =
self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Staging))
{
error!(e; "failed to switch region state to staging, expect state is {:?}", expect);
}
}

/// Returns the region statistic.
pub(crate) fn region_statistic(&self) -> RegionStatistic {
let version = self.version();
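The transitions above (set_entering_staging, switch_state_to_staging, exit_staging) give the region a small staging lifecycle. A hypothetical helper that summarizes it, using only states introduced or touched in this diff (illustrative only, not part of the change):

    // Writable -> EnteringStaging: writes stall while the staging manifest is
    // prepared (set_entering_staging).
    // EnteringStaging -> Staging: the region is writable again, but edits go to the
    // staging manifest and checkpoints/compaction are skipped (switch_state_to_staging).
    // Staging -> Writable: staging ends and normal operation resumes (exit_staging).
    fn next_staging_state(state: RegionLeaderState) -> Option<RegionLeaderState> {
        match state {
            RegionLeaderState::Writable => Some(RegionLeaderState::EnteringStaging),
            RegionLeaderState::EnteringStaging => Some(RegionLeaderState::Staging),
            RegionLeaderState::Staging => Some(RegionLeaderState::Writable),
            _ => None,
        }
    }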
@@ -575,10 +600,19 @@ impl MitoRegion {
.flat_map(|level| level.files().map(|file| file.file_id().file_id()))
.collect::<HashSet<_>>();

self.manifest_ctx
.manifest()
let manifest_files = self.manifest_ctx.manifest().await.files.clone();
let staging_files = self
.manifest_ctx
.staging_manifest()
.await
.files
.map(|m| m.files.clone())
.unwrap_or_default();
let files = manifest_files
.into_iter()
.chain(staging_files.into_iter())
.collect::<HashMap<_, _>>();

files
.values()
.map(|meta| {
let region_id = self.region_id;

@@ -654,9 +688,8 @@ impl MitoRegion {
};

// Submit merged actions using the manifest manager's update method
// Pass the target state (Writable) so it saves to normal directory, not staging
let target_state = RegionRoleState::Leader(RegionLeaderState::Writable);
let new_version = manager.update(merged_actions.clone(), target_state).await?;
// Pass `false` so it saves to the normal directory, not staging
let new_version = manager.update(merged_actions.clone(), false).await?;

info!(
"Successfully submitted merged staged manifests for region {}, new version: {}",

@@ -731,6 +764,7 @@ impl ManifestContext {
&self,
expect_state: RegionLeaderState,
action_list: RegionMetaActionList,
is_staging: bool,
) -> Result<ManifestVersion> {
// Acquires the write lock of the manifest manager.
let mut manager = self.manifest_manager.write().await;

@@ -806,7 +840,7 @@ impl ManifestContext {
}

// Now we can update the manifest.
let version = manager.update(action_list, current_state).await.inspect_err(
let version = manager.update(action_list, is_staging).await.inspect_err(
|e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
)?;

@@ -913,9 +947,17 @@ impl ManifestContext {
}
}

/// Returns the normal manifest of the region.
pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
self.manifest_manager.read().await.manifest()
}

/// Returns the staging manifest of the region.
pub(crate) async fn staging_manifest(
&self,
) -> Option<Arc<crate::manifest::action::RegionManifest>> {
self.manifest_manager.read().await.staging_manifest()
}
}

pub(crate) type ManifestContextRef = Arc<ManifestContext>;

@@ -1213,8 +1255,8 @@ impl ManifestStats {

#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex};

use common_datasource::compression::CompressionType;
use common_test_util::temp_dir::create_temp_dir;

@@ -1404,6 +1446,7 @@ mod tests {
topic_latest_entry_id: Default::default(),
written_bytes: Arc::new(AtomicU64::new(0)),
stats: ManifestStats::default(),
staging_partition_expr: Mutex::new(None),
};

// Test initial state
@@ -16,8 +16,8 @@

use std::any::TypeId;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicI64, AtomicU64};
use std::sync::{Arc, Mutex};
use std::time::Instant;

use common_telemetry::{debug, error, info, warn};

@@ -334,6 +334,7 @@ impl RegionOpener {
topic_latest_entry_id: AtomicU64::new(0),
written_bytes: Arc::new(AtomicU64::new(0)),
stats: self.stats,
staging_partition_expr: Mutex::new(None),
}))
}

@@ -563,6 +564,8 @@ impl RegionOpener {
topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id),
written_bytes: Arc::new(AtomicU64::new(0)),
stats: self.stats.clone(),
// TODO(weny): reload the staging partition expr from the manifest.
staging_partition_expr: Mutex::new(None),
};

let region = Arc::new(region);

@@ -973,6 +976,7 @@ fn can_load_cache(state: RegionRoleState) -> bool {
RegionRoleState::Leader(RegionLeaderState::Writable)
| RegionRoleState::Leader(RegionLeaderState::Staging)
| RegionRoleState::Leader(RegionLeaderState::Altering)
| RegionRoleState::Leader(RegionLeaderState::EnteringStaging)
| RegionRoleState::Leader(RegionLeaderState::Editing)
| RegionRoleState::Follower => true,
// The region will be closed soon if it is downgrading.
@@ -35,9 +35,10 @@ use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint}
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
use store_api::region_request::{
AffectedRows, RegionAlterRequest, RegionBuildIndexRequest, RegionBulkInsertsRequest,
RegionCatchupRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest,
RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest,
AffectedRows, EnterStagingRequest, RegionAlterRequest, RegionBuildIndexRequest,
RegionBulkInsertsRequest, RegionCatchupRequest, RegionCloseRequest, RegionCompactRequest,
RegionCreateRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest,
RegionTruncateRequest,
};
use store_api::storage::{FileId, RegionId};
use tokio::sync::oneshot::{self, Receiver, Sender};

@@ -725,6 +726,11 @@ impl WorkerRequest {
sender: sender.into(),
request: DdlRequest::Catchup((v, None)),
}),
RegionRequest::EnterStaging(v) => WorkerRequest::Ddl(SenderDdlRequest {
region_id,
sender: sender.into(),
request: DdlRequest::EnterStaging(v),
}),
RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
metadata: region_metadata,
sender: sender.into(),

@@ -822,6 +828,7 @@ pub(crate) enum DdlRequest {
BuildIndex(RegionBuildIndexRequest),
Truncate(RegionTruncateRequest),
Catchup((RegionCatchupRequest, Option<WalEntryReceiver>)),
EnterStaging(EnterStagingRequest),
}

/// Sender and Ddl request.

@@ -858,6 +865,8 @@ pub(crate) enum BackgroundNotify {
RegionChange(RegionChangeResult),
/// Region edit result.
RegionEdit(RegionEditResult),
/// Enter staging result.
EnterStaging(EnterStagingResult),
}

/// Notifies a flush job is finished.

@@ -875,6 +884,8 @@ pub(crate) struct FlushFinished {
pub(crate) edit: RegionEdit,
/// Memtables to remove.
pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
/// Whether the region is in staging mode.
pub(crate) is_staging: bool,
}

impl FlushFinished {

@@ -999,6 +1010,19 @@ pub(crate) struct RegionChangeResult {
pub(crate) new_options: Option<RegionOptions>,
}

/// Notifies the region of the result of entering staging.
#[derive(Debug)]
pub(crate) struct EnterStagingResult {
/// Region id.
pub(crate) region_id: RegionId,
/// The new partition expression to apply.
pub(crate) partition_expr: String,
/// Result sender.
pub(crate) sender: OptionOutputTx,
/// Result from the manifest manager.
pub(crate) result: Result<()>,
}

/// Request to edit a region directly.
#[derive(Debug)]
pub(crate) struct RegionEditRequest {
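Taken together with the worker-loop hunks below, the new plumbing routes an enter-staging request end to end; a condensed flow sketch using only names from this diff:

    // RegionRequest::EnterStaging(v)
    //   -> WorkerRequest::Ddl(SenderDdlRequest { request: DdlRequest::EnterStaging(v), .. })
    //   -> RegionWorkerLoop::handle_enter_staging_request(region_id, v.partition_expr, sender)
    //      (flushes memtables first if needed, then stalls writes via EnteringStaging)
    //   -> the background job replies with BackgroundNotify::EnterStaging(EnterStagingResult { .. })
    //   -> RegionWorkerLoop::handle_enter_staging_result(result) completes or rolls back the transition.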
@@ -776,6 +776,7 @@ impl IndexBuildTask {
.update_manifest(
RegionLeaderState::Writable,
RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())),
false,
)
.await?;
info!(
@@ -39,7 +39,7 @@ use common_meta::cache::{new_schema_cache, new_table_schema_cache};
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_telemetry::warn;
use common_telemetry::{debug, warn};
use common_test_util::temp_dir::{TempDir, create_temp_dir};
use common_wal::options::{KafkaWalOptions, WAL_OPTIONS_KEY, WalOptions};
use datatypes::arrow::array::{TimestampMillisecondArray, UInt8Array, UInt64Array};

@@ -50,6 +50,7 @@ use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::test_util::log_store_util;
use moka::future::CacheBuilder;
use object_store::ObjectStore;
use object_store::layers::mock::MockLayer;
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::services::Fs;
use rskafka::client::partition::{Compression, UnknownTopicHandling};

@@ -228,6 +229,7 @@ pub struct TestEnv {
file_ref_manager: FileReferenceManagerRef,
kv_backend: KvBackendRef,
partition_expr_fetcher: PartitionExprFetcherRef,
object_store_mock_layer: Option<MockLayer>,
}

impl TestEnv {

@@ -264,6 +266,7 @@ impl TestEnv {
file_ref_manager: Arc::new(FileReferenceManager::new(None)),
kv_backend,
partition_expr_fetcher: noop_partition_expr_fetcher(),
object_store_mock_layer: None,
}
}

@@ -273,6 +276,12 @@ impl TestEnv {
self
}

/// Sets the `object_store_mock_layer`.
pub fn with_mock_layer(mut self, mock_layer: MockLayer) -> TestEnv {
self.object_store_mock_layer = Some(mock_layer);
self
}

pub fn get_object_store(&self) -> Option<ObjectStore> {
self.object_store_manager
.as_ref()

@@ -569,7 +578,16 @@ impl TestEnv {
let data_home = self.data_home.path();
let data_path = data_home.join("data").as_path().display().to_string();
let builder = Fs::default().root(&data_path);
let object_store = ObjectStore::new(builder).unwrap().finish();

let object_store = if let Some(mock_layer) = self.object_store_mock_layer.as_ref() {
debug!("create object store with mock layer");
ObjectStore::new(builder)
.unwrap()
.layer(mock_layer.clone())
.finish()
} else {
ObjectStore::new(builder).unwrap().finish()
};
ObjectStoreManager::new("default", object_store)
}
@@ -21,6 +21,7 @@ mod handle_close;
mod handle_compaction;
mod handle_create;
mod handle_drop;
mod handle_enter_staging;
mod handle_flush;
mod handle_manifest;
mod handle_open;

@@ -1039,8 +1040,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
continue;
}
DdlRequest::Flush(req) => {
self.handle_flush_request(ddl.region_id, req, ddl.sender)
.await;
self.handle_flush_request(ddl.region_id, req, ddl.sender);
continue;
}
DdlRequest::Compact(req) => {

@@ -1063,6 +1063,15 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.await;
continue;
}
DdlRequest::EnterStaging(req) => {
self.handle_enter_staging_request(
ddl.region_id,
req.partition_expr,
ddl.sender,
)
.await;
continue;
}
};

ddl.sender.send(res);

@@ -1111,6 +1120,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
BackgroundNotify::RegionChange(req) => {
self.handle_manifest_region_change_result(req).await
}
BackgroundNotify::EnterStaging(req) => self.handle_enter_staging_result(req).await,
BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
}
}

@@ -1272,6 +1282,13 @@ impl WorkerListener {
}
}

pub(crate) async fn on_enter_staging_result_begin(&self, _region_id: RegionId) {
#[cfg(any(test, feature = "test"))]
if let Some(listener) = &self.listener {
listener.on_enter_staging_result_begin(_region_id).await;
}
}

pub(crate) async fn on_index_build_finish(&self, _region_file_id: RegionFileId) {
#[cfg(any(test, feature = "test"))]
if let Some(listener) = &self.listener {

@@ -113,7 +113,13 @@ impl<S> RegionWorkerLoop<S> {
info!("Flush region: {} before alteration", region_id);

// Try to submit a flush task.
let task = self.new_flush_task(&region, FlushReason::Alter, None, self.config.clone());
let task = self.new_flush_task(
&region,
FlushReason::Alter,
None,
self.config.clone(),
region.is_staging(),
);
if let Err(e) =
self.flush_scheduler
.schedule_flush(region.region_id, &region.version_control, task)
249
src/mito2/src/worker/handle_enter_staging.rs
Normal file
249
src/mito2/src/worker/handle_enter_staging.rs
Normal file
@@ -0,0 +1,249 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::time::Instant;

use common_telemetry::{error, info, warn};
use store_api::logstore::LogStore;
use store_api::region_request::EnterStagingRequest;
use store_api::storage::RegionId;

use crate::error::{RegionNotFoundSnafu, Result, StagingPartitionExprMismatchSnafu};
use crate::flush::FlushReason;
use crate::manifest::action::{RegionChange, RegionMetaAction, RegionMetaActionList};
use crate::region::{MitoRegionRef, RegionLeaderState};
use crate::request::{
BackgroundNotify, DdlRequest, EnterStagingResult, OptionOutputTx, SenderDdlRequest,
WorkerRequest, WorkerRequestWithTime,
};
use crate::worker::RegionWorkerLoop;

impl<S: LogStore> RegionWorkerLoop<S> {
pub(crate) async fn handle_enter_staging_request(
&mut self,
region_id: RegionId,
partition_expr: String,
mut sender: OptionOutputTx,
) {
let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
return;
};

// If the region is already in staging mode, verify the partition expr matches.
if region.is_staging() {
let staging_partition_expr = region.staging_partition_expr.lock().unwrap().clone();
// If the partition expr mismatch, return error.
if staging_partition_expr.as_ref() != Some(&partition_expr) {
sender.send(Err(StagingPartitionExprMismatchSnafu {
manifest_expr: staging_partition_expr,
request_expr: partition_expr,
}
.build()));
return;
}

// If the partition expr matches, return success.
sender.send(Ok(0));
return;
}

let version = region.version();
if !version.memtables.is_empty() {
// If memtable is not empty, we can't enter staging directly and need to flush
// all memtables first.
info!("Flush region: {} before entering staging", region_id);
debug_assert!(!region.is_staging());
let task = self.new_flush_task(
&region,
FlushReason::EnterStaging,
None,
self.config.clone(),
region.is_staging(),
);
if let Err(e) =
self.flush_scheduler
.schedule_flush(region.region_id, &region.version_control, task)
{
// Unable to flush the region, send error to waiter.
sender.send(Err(e));
return;
}

// Safety: We have requested flush.
self.flush_scheduler
.add_ddl_request_to_pending(SenderDdlRequest {
region_id,
sender,
request: DdlRequest::EnterStaging(EnterStagingRequest { partition_expr }),
});

return;
}

self.handle_enter_staging(region, partition_expr, sender);
}

async fn enter_staging(region: &MitoRegionRef, partition_expr: String) -> Result<()> {
let now = Instant::now();
// First step: clear all staging manifest files.
{
let mut manager = region.manifest_ctx.manifest_manager.write().await;
manager
.clear_staging_manifest_and_dir()
.await
.inspect_err(|e| {
error!(
e;
"Failed to clear staging manifest files for region {}",
region.region_id
);
})?;

info!(
"Cleared all staging manifest files for region {}, elapsed: {:?}",
region.region_id,
now.elapsed(),
);
}

// Second step: write new staging manifest.
let mut new_meta = (*region.metadata()).clone();
new_meta.partition_expr = Some(partition_expr.clone());
let sst_format = region.version().options.sst_format.unwrap_or_default();
let change = RegionChange {
metadata: Arc::new(new_meta),
sst_format,
};
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(change));
region
.manifest_ctx
.update_manifest(RegionLeaderState::EnteringStaging, action_list, true)
.await?;

Ok(())
}

fn handle_enter_staging(
&self,
region: MitoRegionRef,
partition_expr: String,
sender: OptionOutputTx,
) {
if let Err(e) = region.set_entering_staging() {
sender.send(Err(e));
return;
}

let listener = self.listener.clone();
let request_sender = self.sender.clone();
common_runtime::spawn_global(async move {
let now = Instant::now();
let result = Self::enter_staging(&region, partition_expr.clone()).await;
match result {
Ok(_) => {
info!(
"Created staging manifest for region {}, elapsed: {:?}",
region.region_id,
now.elapsed(),
);
}
Err(ref e) => {
// Unset the staging manifest
region
.manifest_ctx
.manifest_manager
.write()
.await
.unset_staging_manifest();
error!(
"Failed to create staging manifest for region {}: {:?}, elapsed: {:?}",
region.region_id,
e,
now.elapsed(),
);
}
}

let notify = WorkerRequest::Background {
region_id: region.region_id,
notify: BackgroundNotify::EnterStaging(EnterStagingResult {
region_id: region.region_id,
sender,
result,
partition_expr,
}),
};
listener
.on_enter_staging_result_begin(region.region_id)
.await;

if let Err(res) = request_sender
.send(WorkerRequestWithTime::new(notify))
.await
{
warn!(
"Failed to send enter staging result back to the worker, region_id: {}, res: {:?}",
region.region_id, res
);
}
});
}

/// Handles enter staging result.
pub(crate) async fn handle_enter_staging_result(
&mut self,
enter_staging_result: EnterStagingResult,
) {
let region = match self.regions.get_region(enter_staging_result.region_id) {
Some(region) => region,
None => {
self.reject_region_stalled_requests(&enter_staging_result.region_id);
enter_staging_result.sender.send(
RegionNotFoundSnafu {
region_id: enter_staging_result.region_id,
}
.fail(),
);
return;
}
};

if enter_staging_result.result.is_ok() {
info!(
"Updating region {} staging partition expr to {}",
region.region_id, enter_staging_result.partition_expr
);
Self::update_region_staging_partition_expr(
&region,
enter_staging_result.partition_expr,
);
region.switch_state_to_staging(RegionLeaderState::EnteringStaging);
} else {
region.switch_state_to_writable(RegionLeaderState::EnteringStaging);
}
enter_staging_result
.sender
.send(enter_staging_result.result.map(|_| 0));
// Handles the stalled requests.
self.handle_region_stalled_requests(&enter_staging_result.region_id)
.await;
}

fn update_region_staging_partition_expr(region: &MitoRegionRef, partition_expr: String) {
let mut staging_partition_expr = region.staging_partition_expr.lock().unwrap();
debug_assert!(staging_partition_expr.is_none());
*staging_partition_expr = Some(partition_expr);
}
}
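Taken together, the two handlers above drive a small state machine over RegionLeaderState: Writable moves to EnteringStaging while the staging manifest is created in the background, then to Staging on success or back to Writable on failure. The sketch below only restates those transitions; the helper function itself is hypothetical and not part of this commit.

    // assuming: use crate::region::RegionLeaderState; (as in the file above)
    // Hypothetical summary of the transitions driven by handle_enter_staging_request
    // and handle_enter_staging_result; only the variant names come from the code above.
    fn next_leader_state(
        current: RegionLeaderState,
        manifest_written: Option<bool>,
    ) -> RegionLeaderState {
        match (current, manifest_written) {
            // set_entering_staging(): a writable region starts the transition.
            (RegionLeaderState::Writable, None) => RegionLeaderState::EnteringStaging,
            // switch_state_to_staging(): the staging manifest was written successfully.
            (RegionLeaderState::EnteringStaging, Some(true)) => RegionLeaderState::Staging,
            // switch_state_to_writable(): writing the staging manifest failed.
            (RegionLeaderState::EnteringStaging, Some(false)) => RegionLeaderState::Writable,
            // Other states are left untouched by this request.
            (state, _) => state,
        }
    }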
@@ -76,8 +76,13 @@ impl<S> RegionWorkerLoop<S> {

if region.last_flush_millis() < min_last_flush_time {
// If flush time of this region is earlier than `min_last_flush_time`, we can flush this region.
let task =
self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
let task = self.new_flush_task(
region,
FlushReason::EngineFull,
None,
self.config.clone(),
region.is_staging(),
);
self.flush_scheduler.schedule_flush(
region.region_id,
&region.version_control,
@@ -91,8 +96,13 @@ impl<S> RegionWorkerLoop<S> {
if let Some(region) = max_mem_region
&& !self.flush_scheduler.is_flush_requested(region.region_id)
{
let task =
self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
let task = self.new_flush_task(
region,
FlushReason::EngineFull,
None,
self.config.clone(),
region.is_staging(),
);
self.flush_scheduler
.schedule_flush(region.region_id, &region.version_control, task)?;
}
@@ -107,6 +117,7 @@ impl<S> RegionWorkerLoop<S> {
reason: FlushReason,
row_group_size: Option<usize>,
engine_config: Arc<MitoConfig>,
is_staging: bool,
) -> RegionFlushTask {
RegionFlushTask {
region_id: region.region_id,
@@ -121,13 +132,14 @@ impl<S> RegionWorkerLoop<S> {
manifest_ctx: region.manifest_ctx.clone(),
index_options: region.version().options.index_options.clone(),
flush_semaphore: self.flush_semaphore.clone(),
is_staging,
}
}
}

impl<S: LogStore> RegionWorkerLoop<S> {
/// Handles manual flush request.
pub(crate) async fn handle_flush_request(
pub(crate) fn handle_flush_request(
&mut self,
region_id: RegionId,
request: RegionFlushRequest,
@@ -147,8 +159,13 @@ impl<S: LogStore> RegionWorkerLoop<S> {
FlushReason::Manual
};

let mut task =
self.new_flush_task(&region, reason, request.row_group_size, self.config.clone());
let mut task = self.new_flush_task(
&region,
reason,
request.row_group_size,
self.config.clone(),
region.is_staging(),
);
task.push_sender(sender);
if let Err(e) =
self.flush_scheduler
@@ -178,6 +195,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
FlushReason::Periodically,
None,
self.config.clone(),
region.is_staging(),
);
self.flush_scheduler.schedule_flush(
region.region_id,
@@ -208,11 +226,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
};

// Check if region is currently in staging mode
let is_staging = region.manifest_ctx.current_state()
== crate::region::RegionRoleState::Leader(crate::region::RegionLeaderState::Staging);

if is_staging {
if request.is_staging {
// Skip the region metadata update.
info!(
"Skipping region metadata update for region {} in staging mode",
region_id

@@ -346,6 +346,7 @@ impl<S> RegionWorkerLoop<S> {

let request_sender = self.sender.clone();
let manifest_ctx = region.manifest_ctx.clone();
let is_staging = region.is_staging();

// Updates manifest in background.
common_runtime::spawn_global(async move {
@@ -354,7 +355,7 @@ impl<S> RegionWorkerLoop<S> {
RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone()));

let result = manifest_ctx
.update_manifest(RegionLeaderState::Truncating, action_list)
.update_manifest(RegionLeaderState::Truncating, action_list, is_staging)
.await
.map(|_| ());

@@ -391,6 +392,7 @@ impl<S> RegionWorkerLoop<S> {
}
let listener = self.listener.clone();
let request_sender = self.sender.clone();
let is_staging = region.is_staging();
// Now the region is in altering state.
common_runtime::spawn_global(async move {
let new_meta = change.metadata.clone();
@@ -398,7 +400,7 @@ impl<S> RegionWorkerLoop<S> {

let result = region
.manifest_ctx
.update_manifest(RegionLeaderState::Altering, action_list)
.update_manifest(RegionLeaderState::Altering, action_list, is_staging)
.await
.map(|_| ());
let notify = WorkerRequest::Background {
@@ -463,6 +465,7 @@ async fn edit_region(
listener: WorkerListener,
) -> Result<()> {
let region_id = region.region_id;
let is_staging = region.is_staging();
if let Some(write_cache) = cache_manager.write_cache() {
for file_meta in &edit.files_to_add {
let write_cache = write_cache.clone();
@@ -532,7 +535,7 @@ async fn edit_region(
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));
region
.manifest_ctx
.update_manifest(RegionLeaderState::Editing, action_list)
.update_manifest(RegionLeaderState::Editing, action_list, is_staging)
.await
.map(|_| ())
}

@@ -241,6 +241,12 @@ impl<S> RegionWorkerLoop<S> {
// No such region.
continue;
};
#[cfg(test)]
debug!(
"Handling write request for region {}, state: {:?}",
region_id,
region.state()
);
match region.state() {
RegionRoleState::Leader(RegionLeaderState::Writable)
| RegionRoleState::Leader(RegionLeaderState::Staging) => {
@@ -263,6 +269,16 @@ impl<S> RegionWorkerLoop<S> {
self.stalled_requests.push(sender_req);
continue;
}
RegionRoleState::Leader(RegionLeaderState::EnteringStaging) => {
debug!(
"Region {} is entering staging, add request to pending writes",
region.region_id
);
self.stalling_count.add(1);
WRITE_STALL_TOTAL.inc();
self.stalled_requests.push(sender_req);
continue;
}
state => {
// The region is not writable.
sender_req.sender.send(

@@ -9,6 +9,7 @@ workspace = true

[features]
services-memory = ["opendal/services-memory"]
testing = ["derive_builder"]

[dependencies]
bytes.workspace = true
@@ -16,6 +17,7 @@ common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-telemetry.workspace = true
derive_builder = { workspace = true, optional = true }
futures.workspace = true
humantime-serde.workspace = true
lazy_static.workspace = true

@@ -13,6 +13,8 @@
// limitations under the License.

mod lru_cache;
#[cfg(feature = "testing")]
pub mod mock;

pub use lru_cache::*;
pub use opendal::layers::*;

217
src/object-store/src/layers/mock.rs
Normal file
@@ -0,0 +1,217 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Debug;
use std::sync::Arc;

use derive_builder::Builder;
pub use oio::*;
pub use opendal::raw::{
Access, Layer, LayeredAccess, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead,
RpWrite, oio,
};
pub use opendal::{Buffer, Error, ErrorKind, Metadata, Result};

pub type MockWriterFactory = Arc<dyn Fn(&str, OpWrite, oio::Writer) -> oio::Writer + Send + Sync>;
pub type MockReaderFactory = Arc<dyn Fn(&str, OpRead, oio::Reader) -> oio::Reader + Send + Sync>;
pub type MockListerFactory = Arc<dyn Fn(&str, OpList, oio::Lister) -> oio::Lister + Send + Sync>;
pub type MockDeleterFactory = Arc<dyn Fn(oio::Deleter) -> oio::Deleter + Send + Sync>;

#[derive(Builder)]
pub struct MockLayer {
#[builder(setter(strip_option), default)]
writer_factory: Option<MockWriterFactory>,
#[builder(setter(strip_option), default)]
reader_factory: Option<MockReaderFactory>,
#[builder(setter(strip_option), default)]
lister_factory: Option<MockListerFactory>,
#[builder(setter(strip_option), default)]
deleter_factory: Option<MockDeleterFactory>,
}

impl Clone for MockLayer {
fn clone(&self) -> Self {
Self {
writer_factory: self.writer_factory.clone(),
reader_factory: self.reader_factory.clone(),
lister_factory: self.lister_factory.clone(),
deleter_factory: self.deleter_factory.clone(),
}
}
}

impl<A: Access> Layer<A> for MockLayer {
type LayeredAccess = MockAccessor<A>;

fn layer(&self, inner: A) -> Self::LayeredAccess {
MockAccessor {
inner,
writer_factory: self.writer_factory.clone(),
reader_factory: self.reader_factory.clone(),
lister_factory: self.lister_factory.clone(),
deleter_factory: self.deleter_factory.clone(),
}
}
}

pub struct MockAccessor<A> {
inner: A,
writer_factory: Option<MockWriterFactory>,
reader_factory: Option<MockReaderFactory>,
lister_factory: Option<MockListerFactory>,
deleter_factory: Option<MockDeleterFactory>,
}

impl<A: Debug> Debug for MockAccessor<A> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MockAccessor")
.field("inner", &self.inner)
.finish()
}
}

pub struct MockReader {
inner: oio::Reader,
}

impl oio::Read for MockReader {
async fn read(&mut self) -> Result<Buffer> {
self.inner.read().await
}
}

pub struct MockWriter {
inner: oio::Writer,
}

impl oio::Write for MockWriter {
async fn write(&mut self, bs: Buffer) -> Result<()> {
self.inner.write(bs).await
}

async fn close(&mut self) -> Result<Metadata> {
self.inner.close().await
}

async fn abort(&mut self) -> Result<()> {
self.inner.abort().await
}
}

pub struct MockLister {
inner: oio::Lister,
}

impl oio::List for MockLister {
async fn next(&mut self) -> Result<Option<oio::Entry>> {
self.inner.next().await
}
}

pub struct MockDeleter {
inner: oio::Deleter,
}

impl oio::Delete for MockDeleter {
fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
self.inner.delete(path, args)
}

async fn flush(&mut self) -> Result<usize> {
self.inner.flush().await
}
}

impl<A: Access> LayeredAccess for MockAccessor<A> {
type Inner = A;
type Reader = MockReader;
type Writer = MockWriter;
type Lister = MockLister;
type Deleter = MockDeleter;

fn inner(&self) -> &Self::Inner {
&self.inner
}

async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
if let Some(reader_factory) = self.reader_factory.as_ref() {
let (rp_read, reader) = self.inner.read(path, args.clone()).await?;
let reader = reader_factory(path, args, Box::new(reader));
Ok((rp_read, MockReader { inner: reader }))
} else {
self.inner.read(path, args).await.map(|(rp_read, reader)| {
(
rp_read,
MockReader {
inner: Box::new(reader),
},
)
})
}
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
if let Some(writer_factory) = self.writer_factory.as_ref() {
let (rp_write, writer) = self.inner.write(path, args.clone()).await?;
let writer = writer_factory(path, args, Box::new(writer));
Ok((rp_write, MockWriter { inner: writer }))
} else {
self.inner
.write(path, args)
.await
.map(|(rp_write, writer)| {
(
rp_write,
MockWriter {
inner: Box::new(writer),
},
)
})
}
}

async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
if let Some(deleter_factory) = self.deleter_factory.as_ref() {
let (rp_delete, deleter) = self.inner.delete().await?;
let deleter = deleter_factory(Box::new(deleter));
Ok((rp_delete, MockDeleter { inner: deleter }))
} else {
self.inner.delete().await.map(|(rp_delete, deleter)| {
(
rp_delete,
MockDeleter {
inner: Box::new(deleter),
},
)
})
}
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
if let Some(lister_factory) = self.lister_factory.as_ref() {
let (rp_list, lister) = self.inner.list(path, args.clone()).await?;
let lister = lister_factory(path, args, Box::new(lister));
Ok((rp_list, MockLister { inner: lister }))
} else {
self.inner.list(path, args).await.map(|(rp_list, lister)| {
(
rp_list,
MockLister {
inner: Box::new(lister),
},
)
})
}
}
}
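For context, a brief usage sketch of the layer added above. This is illustrative only: it assumes the crate's testing and services-memory features, the object_store::layers::mock module path introduced in this commit, the MockLayerBuilder type that derive_builder generates, and opendal's in-memory backend.

    use std::sync::Arc;

    use object_store::layers::mock::{MockLayerBuilder, MockWriterFactory};
    use opendal::services::Memory;
    use opendal::Operator;

    fn mock_operator() -> opendal::Result<Operator> {
        // Pass every writer through unchanged; a test could instead wrap it to
        // inject failures or record the written paths.
        let writer_factory: MockWriterFactory = Arc::new(|_path, _args, inner| inner);
        let layer = MockLayerBuilder::default()
            .writer_factory(writer_factory)
            .build()
            .expect("all factories are optional, so build cannot fail");
        Ok(Operator::new(Memory::default())?.layer(layer).finish())
    }

The same pattern applies to reader_factory, lister_factory, and deleter_factory.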
@@ -151,6 +151,7 @@ pub enum RegionRequest {
Truncate(RegionTruncateRequest),
Catchup(RegionCatchupRequest),
BulkInserts(RegionBulkInsertsRequest),
EnterStaging(EnterStagingRequest),
}

impl RegionRequest {
@@ -1416,6 +1417,17 @@ impl RegionBulkInsertsRequest {
}
}

/// Request to stage a region with a new region rule (partition expression).
///
/// This request transitions a region into the staging mode.
/// It first flushes the memtable for the old region rule if it is not empty,
/// then enters the staging mode with the new region rule.
#[derive(Debug, Clone)]
pub struct EnterStagingRequest {
/// The partition expression of the staging region.
pub partition_expr: String,
}

impl fmt::Display for RegionRequest {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
@@ -1432,6 +1444,7 @@ impl fmt::Display for RegionRequest {
RegionRequest::Truncate(_) => write!(f, "Truncate"),
RegionRequest::Catchup(_) => write!(f, "Catchup"),
RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
RegionRequest::EnterStaging(_) => write!(f, "EnterStaging"),
}
}
}

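For orientation, this is roughly how a caller would drive the new request end to end. The sketch below is illustrative only: it assumes the RegionEngineRef alias and the RegionEngine::handle_request entry point from store_api, and the partition expression string is a placeholder, not a real expression.

    use store_api::region_engine::RegionEngineRef;
    use store_api::region_request::{EnterStagingRequest, RegionRequest};
    use store_api::storage::RegionId;

    // Hypothetical helper: route an EnterStaging request like any other region DDL.
    async fn request_enter_staging(engine: RegionEngineRef, region_id: RegionId) {
        let request = RegionRequest::EnterStaging(EnterStagingRequest {
            partition_expr: "<new partition expr>".to_string(), // placeholder value
        });
        // The result is ignored here; a real caller would inspect the error.
        let _ = engine.handle_request(region_id, request).await;
    }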