From 1e44e86d8171d695e839686ddb5da08cfa323962 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Wed, 6 Sep 2023 16:07:55 +0800 Subject: [PATCH] feat(mito): Stall write requests and add more flush tests (#2322) * feat: impl reject write * feat: sanitize reject size * feat: add should_stall to WriteBufferManager * feat: stall requests * test: mock WriteBufferManager * feat: add new_with_manager for test and remove object_store from inner * feat: add an event listener for tests * feat: Use listener to test flush * refactor: add flush_test.rs * style: fix clippy * feat: test write stall * test: test flush empty --- src/mito2/src/config.rs | 12 +- src/mito2/src/engine.rs | 33 +++- src/mito2/src/engine/flush_test.rs | 219 +++++++++++++++++++++++++ src/mito2/src/engine/listener.rs | 90 ++++++++++ src/mito2/src/engine/tests.rs | 184 +-------------------- src/mito2/src/error.rs | 11 ++ src/mito2/src/flush.rs | 9 + src/mito2/src/request.rs | 12 ++ src/mito2/src/test_util.rs | 236 ++++++++++++++++++++++++++- src/mito2/src/worker.rs | 122 +++++++++++++- src/mito2/src/worker/handle_flush.rs | 13 +- src/mito2/src/worker/handle_write.rs | 37 ++++- 12 files changed, 778 insertions(+), 200 deletions(-) create mode 100644 src/mito2/src/engine/flush_test.rs create mode 100644 src/mito2/src/engine/listener.rs diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 868b6d6647..8b343305e2 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -54,7 +54,7 @@ pub struct MitoConfig { /// Interval to auto flush a region if it has not flushed yet (default 30 min). #[serde(with = "humantime_serde")] pub auto_flush_interval: Duration, - /// Global write buffer size threshold to trigger flush (default 512M). + /// Global write buffer size threshold to trigger flush (default 1G). pub global_write_buffer_size: ReadableSize, /// Global write buffer size threshold to reject write requests (default 2G). pub global_write_buffer_reject_size: ReadableSize, @@ -70,7 +70,7 @@ impl Default for MitoConfig { manifest_compress_type: CompressionType::Uncompressed, max_background_jobs: DEFAULT_MAX_BG_JOB, auto_flush_interval: Duration::from_secs(30 * 60), - global_write_buffer_size: ReadableSize::mb(512), + global_write_buffer_size: ReadableSize::gb(1), global_write_buffer_reject_size: ReadableSize::gb(2), } } @@ -102,5 +102,13 @@ impl MitoConfig { warn!("Sanitize max background jobs 0 to {}", DEFAULT_MAX_BG_JOB); self.max_background_jobs = DEFAULT_MAX_BG_JOB; } + + if self.global_write_buffer_reject_size <= self.global_write_buffer_size { + self.global_write_buffer_reject_size = self.global_write_buffer_size * 2; + warn!( + "Sanitize global write buffer reject size to {}", + self.global_write_buffer_reject_size + ); + } } } diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index a9fb400f93..c30b0d8c4f 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -21,6 +21,10 @@ mod create_test; #[cfg(test)] mod drop_test; #[cfg(test)] +mod flush_test; +#[cfg(test)] +pub(crate) mod listener; +#[cfg(test)] mod open_test; #[cfg(test)] mod tests; @@ -102,7 +106,7 @@ impl EngineInner { object_store: ObjectStore, ) -> EngineInner { EngineInner { - workers: WorkerGroup::start(config, log_store, object_store.clone()), + workers: WorkerGroup::start(config, log_store, object_store), } } @@ -183,3 +187,30 @@ impl RegionEngine for MitoEngine { self.inner.get_metadata(region_id).map_err(BoxedError::new) } } + +// Tests methods. +#[cfg(test)] +impl MitoEngine { + /// Returns a new [MitoEngine] for tests. + pub fn new_for_test( + mut config: MitoConfig, + log_store: Arc, + object_store: ObjectStore, + write_buffer_manager: crate::flush::WriteBufferManagerRef, + listener: Option, + ) -> MitoEngine { + config.sanitize(); + + MitoEngine { + inner: Arc::new(EngineInner { + workers: WorkerGroup::start_for_test( + config, + log_store, + object_store, + write_buffer_manager, + listener, + ), + }), + } + } +} diff --git a/src/mito2/src/engine/flush_test.rs b/src/mito2/src/engine/flush_test.rs new file mode 100644 index 0000000000..c2a04a971f --- /dev/null +++ b/src/mito2/src/engine/flush_test.rs @@ -0,0 +1,219 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Flush tests for mito engine. + +use std::sync::Arc; + +use api::v1::Rows; +use common_recordbatch::RecordBatches; +use store_api::region_engine::RegionEngine; +use store_api::region_request::RegionRequest; +use store_api::storage::{RegionId, ScanRequest}; + +use crate::config::MitoConfig; +use crate::engine::listener::{FlushListener, StallListener}; +use crate::test_util::{ + build_rows, build_rows_for_key, flush_region, put_rows, rows_schema, CreateRequestBuilder, + MockWriteBufferManager, TestEnv, +}; + +#[tokio::test] +async fn test_manual_flush() { + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let rows = Rows { + schema: column_schemas, + rows: build_rows(0, 3), + }; + put_rows(&engine, region_id, rows).await; + + flush_region(&engine, region_id).await; + + let request = ScanRequest::default(); + let scanner = engine.scan(region_id, request).unwrap(); + assert_eq!(1, scanner.num_files()); + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | ++-------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); +} + +#[tokio::test] +async fn test_flush_engine() { + let mut env = TestEnv::new(); + let write_buffer_manager = Arc::new(MockWriteBufferManager::default()); + let listener = Arc::new(FlushListener::new()); + let engine = env + .create_engine_with( + MitoConfig::default(), + write_buffer_manager.clone(), + Some(listener.clone()), + ) + .await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Prepares rows for flush. + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("a", 0, 2, 0), + }; + put_rows(&engine, region_id, rows).await; + + write_buffer_manager.set_should_flush(true); + + // Writes and triggers flush. + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("b", 0, 2, 0), + }; + put_rows(&engine, region_id, rows).await; + + // Wait until flush is finished. + listener.wait().await; + + let request = ScanRequest::default(); + let scanner = engine.scan(region_id, request).unwrap(); + assert_eq!(1, scanner.num_files()); + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| a | 0.0 | 1970-01-01T00:00:00 | +| a | 1.0 | 1970-01-01T00:00:01 | +| b | 0.0 | 1970-01-01T00:00:00 | +| b | 1.0 | 1970-01-01T00:00:01 | ++-------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); +} + +#[tokio::test] +async fn test_write_stall() { + let mut env = TestEnv::new(); + let write_buffer_manager = Arc::new(MockWriteBufferManager::default()); + let listener = Arc::new(StallListener::new()); + let engine = env + .create_engine_with( + MitoConfig::default(), + write_buffer_manager.clone(), + Some(listener.clone()), + ) + .await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Prepares rows for flush. + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("a", 0, 2, 0), + }; + put_rows(&engine, region_id, rows).await; + + // Stalls the engine. + write_buffer_manager.set_should_stall(true); + + let engine_cloned = engine.clone(); + // Spawns a task to flush the engine on stall. + tokio::spawn(async move { + listener.wait().await; + + flush_region(&engine_cloned, region_id).await; + }); + + // Triggers write stall. + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("b", 0, 2, 0), + }; + put_rows(&engine, region_id, rows).await; + + let request = ScanRequest::default(); + let scanner = engine.scan(region_id, request).unwrap(); + assert_eq!(1, scanner.num_files()); + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| a | 0.0 | 1970-01-01T00:00:00 | +| a | 1.0 | 1970-01-01T00:00:01 | +| b | 0.0 | 1970-01-01T00:00:00 | +| b | 1.0 | 1970-01-01T00:00:01 | ++-------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); +} + +#[tokio::test] +async fn test_flush_empty() { + let mut env = TestEnv::new(); + let write_buffer_manager = Arc::new(MockWriteBufferManager::default()); + let engine = env + .create_engine_with(MitoConfig::default(), write_buffer_manager.clone(), None) + .await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + flush_region(&engine, region_id).await; + + let request = ScanRequest::default(); + let scanner = engine.scan(region_id, request).unwrap(); + assert_eq!(0, scanner.num_files()); + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ +++ +++"; + assert_eq!(expected, batches.pretty_print().unwrap()); +} diff --git a/src/mito2/src/engine/listener.rs b/src/mito2/src/engine/listener.rs new file mode 100644 index 0000000000..e50d3b5a22 --- /dev/null +++ b/src/mito2/src/engine/listener.rs @@ -0,0 +1,90 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Engine event listener for tests. + +use std::sync::Arc; + +use common_telemetry::info; +use store_api::storage::RegionId; +use tokio::sync::Notify; + +/// Mito engine background event listener. +pub trait EventListener: Send + Sync { + /// Notifies the listener that a region is flushed successfully. + fn on_flush_success(&self, region_id: RegionId); + + /// Notifies the listener that the engine is stalled. + fn on_write_stall(&self); +} + +pub type EventListenerRef = Arc; + +/// Listener to watch flush events. +pub struct FlushListener { + notify: Notify, +} + +impl FlushListener { + /// Creates a new listener. + pub fn new() -> FlushListener { + FlushListener { + notify: Notify::new(), + } + } + + /// Wait until one flush job is done. + pub async fn wait(&self) { + self.notify.notified().await; + } +} + +impl EventListener for FlushListener { + fn on_flush_success(&self, region_id: RegionId) { + info!("Region {} flush successfully", region_id); + + self.notify.notify_one() + } + + fn on_write_stall(&self) {} +} + +/// Listener to watch stall events. +pub struct StallListener { + notify: Notify, +} + +impl StallListener { + /// Creates a new listener. + pub fn new() -> StallListener { + StallListener { + notify: Notify::new(), + } + } + + /// Wait for a stall event. + pub async fn wait(&self) { + self.notify.notified().await; + } +} + +impl EventListener for StallListener { + fn on_flush_success(&self, _region_id: RegionId) {} + + fn on_write_stall(&self) { + info!("Engine is stalled"); + + self.notify.notify_one(); + } +} diff --git a/src/mito2/src/engine/tests.rs b/src/mito2/src/engine/tests.rs index 25bfe00f68..968034f49a 100644 --- a/src/mito2/src/engine/tests.rs +++ b/src/mito2/src/engine/tests.rs @@ -12,26 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Tests for mito engine. +//! Basic tests for mito engine. use std::collections::HashMap; -use api::helper::ColumnDataTypeWrapper; -use api::v1::value::ValueData; -use api::v1::{ColumnSchema, Row, Rows, SemanticType}; +use api::v1::Rows; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_recordbatch::RecordBatches; -use store_api::metadata::ColumnMetadata; -use store_api::region_request::{ - RegionCreateRequest, RegionDeleteRequest, RegionFlushRequest, RegionOpenRequest, - RegionPutRequest, -}; +use store_api::region_request::RegionOpenRequest; use store_api::storage::RegionId; use super::*; use crate::region::version::VersionControlData; -use crate::test_util::{CreateRequestBuilder, TestEnv}; +use crate::test_util::{ + build_delete_rows_for_key, build_rows, build_rows_for_key, delete_rows, delete_rows_schema, + put_rows, rows_schema, CreateRequestBuilder, TestEnv, +}; #[tokio::test] async fn test_engine_new_stop() { @@ -60,54 +57,6 @@ async fn test_engine_new_stop() { ); } -fn column_metadata_to_column_schema(metadata: &ColumnMetadata) -> api::v1::ColumnSchema { - api::v1::ColumnSchema { - column_name: metadata.column_schema.name.clone(), - datatype: ColumnDataTypeWrapper::try_from(metadata.column_schema.data_type.clone()) - .unwrap() - .datatype() as i32, - semantic_type: metadata.semantic_type as i32, - } -} - -fn build_rows(start: usize, end: usize) -> Vec { - (start..end) - .map(|i| api::v1::Row { - values: vec![ - api::v1::Value { - value_data: Some(ValueData::StringValue(i.to_string())), - }, - api::v1::Value { - value_data: Some(ValueData::F64Value(i as f64)), - }, - api::v1::Value { - value_data: Some(ValueData::TsMillisecondValue(i as i64 * 1000)), - }, - ], - }) - .collect() -} - -fn rows_schema(request: &RegionCreateRequest) -> Vec { - request - .column_metadatas - .iter() - .map(column_metadata_to_column_schema) - .collect::>() -} - -async fn put_rows(engine: &MitoEngine, region_id: RegionId, rows: Rows) { - let num_rows = rows.rows.len(); - let output = engine - .handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows })) - .await - .unwrap(); - let Output::AffectedRows(rows_inserted) = output else { - unreachable!() - }; - assert_eq!(num_rows, rows_inserted); -} - #[tokio::test] async fn test_write_to_region() { let mut env = TestEnv::with_prefix("write-to-region"); @@ -157,13 +106,7 @@ async fn test_region_replay() { }; put_rows(&engine, region_id, rows).await; - engine.stop().await.unwrap(); - - let engine = MitoEngine::new( - MitoConfig::default(), - env.get_logstore().unwrap(), - env.get_object_store().unwrap(), - ); + let engine = env.reopen_engine(engine, MitoConfig::default()).await; let open_region = engine .handle_request( @@ -200,8 +143,6 @@ async fn test_region_replay() { engine.stop().await.unwrap(); } -// TODO(yingwen): build_rows() only generate one point for each series. We need to add tests -// for series with multiple points and other cases. #[tokio::test] async fn test_write_query_region() { let mut env = TestEnv::new(); @@ -236,66 +177,6 @@ async fn test_write_query_region() { assert_eq!(expected, batches.pretty_print().unwrap()); } -/// Build rows to put for specific `key`. -fn build_rows_for_key(key: &str, start: usize, end: usize, value_start: usize) -> Vec { - (start..end) - .enumerate() - .map(|(idx, ts)| api::v1::Row { - values: vec![ - api::v1::Value { - value_data: Some(ValueData::StringValue(key.to_string())), - }, - api::v1::Value { - value_data: Some(ValueData::F64Value((value_start + idx) as f64)), - }, - api::v1::Value { - value_data: Some(ValueData::TsMillisecondValue(ts as i64 * 1000)), - }, - ], - }) - .collect() -} - -/// Build rows to delete for specific `key`. -fn build_delete_rows_for_key(key: &str, start: usize, end: usize) -> Vec { - (start..end) - .map(|ts| api::v1::Row { - values: vec![ - api::v1::Value { - value_data: Some(ValueData::StringValue(key.to_string())), - }, - api::v1::Value { - value_data: Some(ValueData::TsMillisecondValue(ts as i64 * 1000)), - }, - ], - }) - .collect() -} - -fn delete_rows_schema(request: &RegionCreateRequest) -> Vec { - request - .column_metadatas - .iter() - .filter(|col| col.semantic_type != SemanticType::Field) - .map(column_metadata_to_column_schema) - .collect::>() -} - -async fn delete_rows(engine: &MitoEngine, region_id: RegionId, rows: Rows) { - let num_rows = rows.rows.len(); - let output = engine - .handle_request( - region_id, - RegionRequest::Delete(RegionDeleteRequest { rows }), - ) - .await - .unwrap(); - let Output::AffectedRows(rows_inserted) = output else { - unreachable!() - }; - assert_eq!(num_rows, rows_inserted); -} - #[tokio::test] async fn test_put_delete() { let mut env = TestEnv::new(); @@ -409,52 +290,3 @@ async fn test_put_overwrite() { +-------+---------+---------------------+"; assert_eq!(expected, batches.pretty_print().unwrap()); } - -#[tokio::test] -async fn test_manual_flush() { - let mut env = TestEnv::new(); - let engine = env.create_engine(MitoConfig::default()).await; - - let region_id = RegionId::new(1, 1); - let request = CreateRequestBuilder::new().build(); - - let column_schemas = request - .column_metadatas - .iter() - .map(column_metadata_to_column_schema) - .collect::>(); - engine - .handle_request(region_id, RegionRequest::Create(request)) - .await - .unwrap(); - - let rows = Rows { - schema: column_schemas, - rows: build_rows(0, 3), - }; - put_rows(&engine, region_id, rows).await; - - let Output::AffectedRows(rows) = engine - .handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {})) - .await - .unwrap() - else { - unreachable!() - }; - assert_eq!(0, rows); - - let request = ScanRequest::default(); - let scanner = engine.scan(region_id, request).unwrap(); - assert_eq!(1, scanner.num_files()); - let stream = scanner.scan().await.unwrap(); - let batches = RecordBatches::try_collect(stream).await.unwrap(); - let expected = "\ -+-------+---------+---------------------+ -| tag_0 | field_0 | ts | -+-------+---------+---------------------+ -| 0 | 0.0 | 1970-01-01T00:00:00 | -| 1 | 1.0 | 1970-01-01T00:00:01 | -| 2 | 2.0 | 1970-01-01T00:00:02 | -+-------+---------+---------------------+"; - assert_eq!(expected, batches.pretty_print().unwrap()); -} diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index c821cf0514..70ece0f3bb 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -424,6 +424,16 @@ pub enum Error { region_id: RegionId, location: Location, }, + + #[snafu(display( + "Engine write buffer is full, rejecting write requests of region {}, location: {}", + region_id, + location + ))] + RejectWrite { + region_id: RegionId, + location: Location, + }, } pub type Result = std::result::Result; @@ -489,6 +499,7 @@ impl ErrorExt for Error { FlushRegion { source, .. } => source.status_code(), RegionDropped { .. } => StatusCode::Cancelled, RegionClosed { .. } => StatusCode::Cancelled, + RejectWrite { .. } => StatusCode::StorageUnavailable, } } diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index eb795f23b2..11f08b5d5f 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -45,6 +45,9 @@ pub trait WriteBufferManager: Send + Sync + std::fmt::Debug { /// Returns whether to trigger the engine. fn should_flush_engine(&self) -> bool; + /// Returns whether to stall write requests. + fn should_stall(&self) -> bool; + /// Reserves `mem` bytes. fn reserve_mem(&self, mem: usize); @@ -132,6 +135,10 @@ impl WriteBufferManager for WriteBufferManagerImpl { false } + fn should_stall(&self) -> bool { + self.memory_usage() >= self.global_write_buffer_size + } + fn reserve_mem(&self, mem: usize) { self.memory_used.fetch_add(mem, Ordering::Relaxed); self.memory_active.fetch_add(mem, Ordering::Relaxed); @@ -553,6 +560,7 @@ mod tests { let manager = WriteBufferManagerImpl::new(1000); manager.reserve_mem(500); assert!(!manager.should_flush_engine()); + assert!(!manager.should_stall()); // More than mutable limit. manager.reserve_mem(400); @@ -575,6 +583,7 @@ mod tests { // Mutable limit is 800. let manager = WriteBufferManagerImpl::new(1000); manager.reserve_mem(1100); + assert!(manager.should_stall()); // Global usage is still 1100. manager.schedule_free_mem(200); assert!(manager.should_flush_engine()); diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index cfbe45819d..c4ec010f99 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -26,6 +26,7 @@ use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value}; use common_base::readable_size::ReadableSize; use common_query::Output; use datatypes::prelude::DataType; +use prost::Message; use smallvec::SmallVec; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::{ColumnMetadata, RegionMetadata}; @@ -133,6 +134,17 @@ impl WriteRequest { }) } + /// Returns estimated size of the request. + pub(crate) fn estimated_size(&self) -> usize { + let row_size = self + .rows + .rows + .get(0) + .map(|row| row.encoded_len()) + .unwrap_or(0); + row_size * self.rows.rows.len() + } + /// Gets column index by name. pub(crate) fn column_index_by_name(&self, name: &str) -> Option { self.name_to_index.get(name).copied() diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 37d3f8410d..01372593a5 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -15,12 +15,15 @@ //! Utilities for testing. use std::collections::HashMap; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use api::greptime_proto::v1; +use api::helper::ColumnDataTypeWrapper; use api::v1::value::ValueData; -use api::v1::{OpType, SemanticType}; +use api::v1::{OpType, Row, Rows, SemanticType}; use common_datasource::compression::CompressionType; +use common_query::Output; use common_test_util::temp_dir::{create_temp_dir, TempDir}; use datatypes::arrow::array::{TimestampMillisecondArray, UInt64Array, UInt8Array}; use datatypes::prelude::ConcreteDataType; @@ -30,11 +33,17 @@ use log_store::test_util::log_store_util; use object_store::services::Fs; use object_store::ObjectStore; use store_api::metadata::{ColumnMetadata, RegionMetadataRef}; -use store_api::region_request::RegionCreateRequest; +use store_api::region_engine::RegionEngine; +use store_api::region_request::{ + RegionCreateRequest, RegionDeleteRequest, RegionFlushRequest, RegionPutRequest, RegionRequest, +}; +use store_api::storage::RegionId; use crate::config::MitoConfig; +use crate::engine::listener::EventListenerRef; use crate::engine::MitoEngine; use crate::error::Result; +use crate::flush::{WriteBufferManager, WriteBufferManagerRef}; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::read::{Batch, BatchBuilder, BatchReader}; use crate::worker::WorkerGroup; @@ -90,6 +99,51 @@ impl TestEnv { MitoEngine::new(config, logstore, object_store) } + /// Creates a new engine with specific config and manager/listener under this env. + pub async fn create_engine_with( + &mut self, + config: MitoConfig, + manager: WriteBufferManagerRef, + listener: Option, + ) -> MitoEngine { + let (log_store, object_store) = self.create_log_and_object_store().await; + + let logstore = Arc::new(log_store); + self.logstore = Some(logstore.clone()); + self.object_store = Some(object_store.clone()); + MitoEngine::new_for_test(config, logstore, object_store, manager, listener) + } + + /// Reopen the engine. + pub async fn reopen_engine(&mut self, engine: MitoEngine, config: MitoConfig) -> MitoEngine { + engine.stop().await.unwrap(); + + MitoEngine::new( + config, + self.logstore.clone().unwrap(), + self.object_store.clone().unwrap(), + ) + } + + /// Reopen the engine. + pub async fn reopen_engine_with( + &self, + engine: MitoEngine, + config: MitoConfig, + manager: WriteBufferManagerRef, + listener: Option, + ) -> MitoEngine { + engine.stop().await.unwrap(); + + MitoEngine::new_for_test( + config, + self.logstore.clone().unwrap(), + self.object_store.clone().unwrap(), + manager, + listener, + ) + } + /// Creates a new [WorkerGroup] with specific config under this env. pub(crate) async fn create_worker_group(&self, config: MitoConfig) -> WorkerGroup { let (log_store, object_store) = self.create_log_and_object_store().await; @@ -329,3 +383,181 @@ pub fn new_batch( .build() .unwrap() } + +/// A mock [WriteBufferManager] that supports controlling whether to flush/stall. +#[derive(Debug, Default)] +pub struct MockWriteBufferManager { + should_flush: AtomicBool, + should_stall: AtomicBool, + memory_used: AtomicUsize, + memory_active: AtomicUsize, +} + +impl MockWriteBufferManager { + /// Set whether to flush the engine. + pub fn set_should_flush(&self, value: bool) { + self.should_flush.store(value, Ordering::Relaxed); + } + + /// Set whether to stall the engine. + pub fn set_should_stall(&self, value: bool) { + self.should_stall.store(value, Ordering::Relaxed); + } + + /// Returns memory usage of mutable memtables. + pub fn mutable_usage(&self) -> usize { + self.memory_active.load(Ordering::Relaxed) + } +} + +impl WriteBufferManager for MockWriteBufferManager { + fn should_flush_engine(&self) -> bool { + self.should_flush.load(Ordering::Relaxed) + } + + fn should_stall(&self) -> bool { + self.should_stall.load(Ordering::Relaxed) + } + + fn reserve_mem(&self, mem: usize) { + self.memory_used.fetch_add(mem, Ordering::Relaxed); + self.memory_active.fetch_add(mem, Ordering::Relaxed); + } + + fn schedule_free_mem(&self, mem: usize) { + self.memory_active.fetch_sub(mem, Ordering::Relaxed); + } + + fn free_mem(&self, mem: usize) { + self.memory_used.fetch_sub(mem, Ordering::Relaxed); + } + + fn memory_usage(&self) -> usize { + self.memory_used.load(Ordering::Relaxed) + } +} + +fn column_metadata_to_column_schema(metadata: &ColumnMetadata) -> api::v1::ColumnSchema { + api::v1::ColumnSchema { + column_name: metadata.column_schema.name.clone(), + datatype: ColumnDataTypeWrapper::try_from(metadata.column_schema.data_type.clone()) + .unwrap() + .datatype() as i32, + semantic_type: metadata.semantic_type as i32, + } +} + +/// Build rows with schema (string, f64, ts_millis). +pub fn build_rows(start: usize, end: usize) -> Vec { + (start..end) + .map(|i| api::v1::Row { + values: vec![ + api::v1::Value { + value_data: Some(ValueData::StringValue(i.to_string())), + }, + api::v1::Value { + value_data: Some(ValueData::F64Value(i as f64)), + }, + api::v1::Value { + value_data: Some(ValueData::TsMillisecondValue(i as i64 * 1000)), + }, + ], + }) + .collect() +} + +/// Get column schemas for rows. +pub fn rows_schema(request: &RegionCreateRequest) -> Vec { + request + .column_metadatas + .iter() + .map(column_metadata_to_column_schema) + .collect::>() +} + +/// Get column schemas for delete requests. +pub fn delete_rows_schema(request: &RegionCreateRequest) -> Vec { + request + .column_metadatas + .iter() + .filter(|col| col.semantic_type != SemanticType::Field) + .map(column_metadata_to_column_schema) + .collect::>() +} + +/// Put rows into the engine. +pub async fn put_rows(engine: &MitoEngine, region_id: RegionId, rows: Rows) { + let num_rows = rows.rows.len(); + let output = engine + .handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows })) + .await + .unwrap(); + let Output::AffectedRows(rows_inserted) = output else { + unreachable!() + }; + assert_eq!(num_rows, rows_inserted); +} + +/// Build rows to put for specific `key`. +pub fn build_rows_for_key(key: &str, start: usize, end: usize, value_start: usize) -> Vec { + (start..end) + .enumerate() + .map(|(idx, ts)| api::v1::Row { + values: vec![ + api::v1::Value { + value_data: Some(ValueData::StringValue(key.to_string())), + }, + api::v1::Value { + value_data: Some(ValueData::F64Value((value_start + idx) as f64)), + }, + api::v1::Value { + value_data: Some(ValueData::TsMillisecondValue(ts as i64 * 1000)), + }, + ], + }) + .collect() +} + +/// Build rows to delete for specific `key`. +pub fn build_delete_rows_for_key(key: &str, start: usize, end: usize) -> Vec { + (start..end) + .map(|ts| api::v1::Row { + values: vec![ + api::v1::Value { + value_data: Some(ValueData::StringValue(key.to_string())), + }, + api::v1::Value { + value_data: Some(ValueData::TsMillisecondValue(ts as i64 * 1000)), + }, + ], + }) + .collect() +} + +/// Delete rows from the engine. +pub async fn delete_rows(engine: &MitoEngine, region_id: RegionId, rows: Rows) { + let num_rows = rows.rows.len(); + let output = engine + .handle_request( + region_id, + RegionRequest::Delete(RegionDeleteRequest { rows }), + ) + .await + .unwrap(); + let Output::AffectedRows(rows_inserted) = output else { + unreachable!() + }; + assert_eq!(num_rows, rows_inserted); +} + +/// Flush a region manually. +pub async fn flush_region(engine: &MitoEngine, region_id: RegionId) { + let Output::AffectedRows(rows) = engine + .handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {})) + .await + .unwrap() + else { + unreachable!() + }; + assert_eq!(0, rows); +} diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 8b9691235c..9545d6fc72 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -42,7 +42,9 @@ use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef use crate::memtable::time_series::TimeSeriesMemtableBuilder; use crate::memtable::MemtableBuilderRef; use crate::region::{MitoRegionRef, RegionMap, RegionMapRef}; -use crate::request::{BackgroundNotify, DdlRequest, SenderDdlRequest, WorkerRequest}; +use crate::request::{ + BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest, +}; use crate::schedule::scheduler::{LocalScheduler, SchedulerRef}; use crate::wal::Wal; @@ -94,7 +96,7 @@ pub(crate) struct WorkerGroup { } impl WorkerGroup { - /// Start a worker group. + /// Starts a worker group. /// /// The number of workers should be power of two. pub(crate) fn start( @@ -118,6 +120,7 @@ impl WorkerGroup { object_store: object_store.clone(), write_buffer_manager: write_buffer_manager.clone(), scheduler: scheduler.clone(), + listener: WorkerListener::default(), } .start() }) @@ -126,18 +129,19 @@ impl WorkerGroup { WorkerGroup { workers, scheduler } } - /// Stop the worker group. + /// Stops the worker group. pub(crate) async fn stop(&self) -> Result<()> { info!("Stop region worker group"); - try_join_all(self.workers.iter().map(|worker| worker.stop())).await?; - + // Stops the scheduler gracefully. self.scheduler.stop(true).await?; + try_join_all(self.workers.iter().map(|worker| worker.stop())).await?; + Ok(()) } - /// Submit a request to a worker in the group. + /// Submits a request to a worker in the group. pub(crate) async fn submit_to_worker( &self, region_id: RegionId, @@ -169,6 +173,42 @@ impl WorkerGroup { } } +// Tests methods. +#[cfg(test)] +impl WorkerGroup { + /// Starts a worker group with `write_buffer_manager` and `listener` for tests. + /// + /// The number of workers should be power of two. + pub(crate) fn start_for_test( + config: MitoConfig, + log_store: Arc, + object_store: ObjectStore, + write_buffer_manager: WriteBufferManagerRef, + listener: Option, + ) -> WorkerGroup { + assert!(config.num_workers.is_power_of_two()); + let config = Arc::new(config); + let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs)); + + let workers = (0..config.num_workers) + .map(|id| { + WorkerStarter { + id: id as WorkerId, + config: config.clone(), + log_store: log_store.clone(), + object_store: object_store.clone(), + write_buffer_manager: write_buffer_manager.clone(), + scheduler: scheduler.clone(), + listener: WorkerListener::new(listener.clone()), + } + .start() + }) + .collect(); + + WorkerGroup { workers, scheduler } + } +} + fn value_to_index(value: usize, num_workers: usize) -> usize { value & (num_workers - 1) } @@ -181,10 +221,11 @@ struct WorkerStarter { object_store: ObjectStore, write_buffer_manager: WriteBufferManagerRef, scheduler: SchedulerRef, + listener: WorkerListener, } impl WorkerStarter { - /// Start a region worker and its background thread. + /// Starts a region worker and its background thread. fn start(self) -> RegionWorker { let regions = Arc::new(RegionMap::default()); let (sender, receiver) = mpsc::channel(self.config.worker_channel_size); @@ -206,6 +247,8 @@ impl WorkerStarter { scheduler: self.scheduler.clone(), write_buffer_manager: self.write_buffer_manager, flush_scheduler: FlushScheduler::new(self.scheduler), + stalled_requests: StalledRequests::default(), + listener: self.listener, }; let handle = common_runtime::spawn_write(async move { worker_thread.run().await; @@ -236,7 +279,7 @@ pub(crate) struct RegionWorker { } impl RegionWorker { - /// Submit request to background worker thread. + /// Submits request to background worker thread. async fn submit_request(&self, request: WorkerRequest) -> Result<()> { ensure!(self.is_running(), WorkerStoppedSnafu { id: self.id }); if self.sender.send(request).await.is_err() { @@ -303,6 +346,29 @@ impl Drop for RegionWorker { type RequestBuffer = Vec; +/// Buffer for stalled write requests. +/// +/// Maintains stalled write requests and their estimated size. +#[derive(Default)] +pub(crate) struct StalledRequests { + /// Stalled requests. + pub(crate) requests: Vec, + /// Estimated size of all stalled requests. + pub(crate) estimated_size: usize, +} + +impl StalledRequests { + /// Appends stalled requests. + pub(crate) fn append(&mut self, requests: &mut Vec) { + let size: usize = requests + .iter() + .map(|req| req.request.estimated_size()) + .sum(); + self.requests.append(requests); + self.estimated_size += size; + } +} + /// Background worker loop to handle requests. struct RegionWorkerLoop { // Id of the worker. @@ -331,6 +397,10 @@ struct RegionWorkerLoop { write_buffer_manager: WriteBufferManagerRef, /// Schedules background flush requests. flush_scheduler: FlushScheduler, + /// Stalled write requests. + stalled_requests: StalledRequests, + /// Event listener for tests. + listener: WorkerListener, } impl RegionWorkerLoop { @@ -397,7 +467,7 @@ impl RegionWorkerLoop { // Handles all write requests first. So we can alter regions without // considering existing write requests. - self.handle_write_requests(write_requests).await; + self.handle_write_requests(write_requests, true).await; self.handle_ddl_requests(ddl_requests).await; } @@ -455,6 +525,40 @@ impl RegionWorkerLoop { } } +/// Wrapper that only calls event listener in tests. +#[derive(Default)] +pub(crate) struct WorkerListener { + #[cfg(test)] + listener: Option, +} + +impl WorkerListener { + #[cfg(test)] + pub(crate) fn new( + listener: Option, + ) -> WorkerListener { + WorkerListener { listener } + } + + /// Flush is finished successfully. + pub(crate) fn on_flush_success(&self, region_id: RegionId) { + #[cfg(test)] + if let Some(listener) = &self.listener { + listener.on_flush_success(region_id); + } + // Avoid compiler warning. + let _ = region_id; + } + + /// Engine is stalled. + pub(crate) fn on_write_stall(&self) { + #[cfg(test)] + if let Some(listener) = &self.listener { + listener.on_write_stall(); + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index face8da74f..71685c70b7 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -72,13 +72,20 @@ impl RegionWorkerLoop { return; } - // Handle pending requests of the region. + // Notifies waiters. + request.on_success(); + + // Handle pending DDL requests for the region. if let Some(ddl_requests) = self.flush_scheduler.on_flush_success(region_id) { self.handle_ddl_requests(ddl_requests).await; } - // Notifies waiters. - request.on_success(); + // Handle stalled requests. + let stalled = std::mem::take(&mut self.stalled_requests); + // We already stalled these requests, don't stall them again. + self.handle_write_requests(stalled.requests, false).await; + + self.listener.on_flush_success(region_id); } } diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index 38098aa10d..a882d12500 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -23,14 +23,18 @@ use store_api::metadata::RegionMetadata; use store_api::storage::RegionId; use tokio::sync::oneshot::Sender; -use crate::error::{RegionNotFoundSnafu, Result}; +use crate::error::{RegionNotFoundSnafu, RejectWriteSnafu, Result}; use crate::region_write_ctx::RegionWriteCtx; use crate::request::{SenderWriteRequest, WriteRequest}; use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { /// Takes and handles all write requests. - pub(crate) async fn handle_write_requests(&mut self, write_requests: Vec) { + pub(crate) async fn handle_write_requests( + &mut self, + mut write_requests: Vec, + allow_stall: bool, + ) { if write_requests.is_empty() { return; } @@ -41,6 +45,16 @@ impl RegionWorkerLoop { if self.should_reject_write() { // The memory pressure is still too high, reject write requests. reject_write_requests(write_requests); + // Also reject all stalled requests. + let stalled = std::mem::take(&mut self.stalled_requests); + reject_write_requests(stalled.requests); + return; + } + + if self.write_buffer_manager.should_stall() && allow_stall { + // TODO(yingwen): stalled metrics. + self.stalled_requests.append(&mut write_requests); + self.listener.on_write_stall(); return; } @@ -118,15 +132,24 @@ impl RegionWorkerLoop { /// Returns true if the engine needs to reject some write requests. fn should_reject_write(&self) -> bool { - // If memory usage reaches high threshold (we should also consider pending flush requests) returns true. - // TODO(yingwen): Implement this. - false + // If memory usage reaches high threshold (we should also consider stalled requests) returns true. + self.write_buffer_manager.memory_usage() + self.stalled_requests.estimated_size + >= self.config.global_write_buffer_reject_size.as_bytes() as usize } } /// Send rejected error to all `write_requests`. -fn reject_write_requests(_write_requests: Vec) { - unimplemented!() +fn reject_write_requests(write_requests: Vec) { + for req in write_requests { + if let Some(sender) = req.sender { + let _ = sender.send( + RejectWriteSnafu { + region_id: req.request.region_id, + } + .fail(), + ); + } + } } /// Checks the schema and fill missing columns.