feat(mito): Stall write requests and add more flush tests (#2322)

* feat: impl reject write

* feat: sanitize reject size

* feat: add should_stall to WriteBufferManager

* feat: stall requests

* test: mock WriteBufferManager

* feat: add new_with_manager for test and remove object_store from inner

* feat: add an event listener for tests

* feat: Use listener to test flush

* refactor: add flush_test.rs

* style: fix clippy

* feat: test write stall

* test: test flush empty
This commit is contained in:
Yingwen
2023-09-06 16:07:55 +08:00
committed by Ruihang Xia
parent 56691ff03b
commit 1e44e86d81
12 changed files with 778 additions and 200 deletions

View File

@@ -54,7 +54,7 @@ pub struct MitoConfig {
/// Interval to auto flush a region if it has not flushed yet (default 30 min).
#[serde(with = "humantime_serde")]
pub auto_flush_interval: Duration,
/// Global write buffer size threshold to trigger flush (default 512M).
/// Global write buffer size threshold to trigger flush (default 1G).
pub global_write_buffer_size: ReadableSize,
/// Global write buffer size threshold to reject write requests (default 2G).
pub global_write_buffer_reject_size: ReadableSize,
@@ -70,7 +70,7 @@ impl Default for MitoConfig {
manifest_compress_type: CompressionType::Uncompressed,
max_background_jobs: DEFAULT_MAX_BG_JOB,
auto_flush_interval: Duration::from_secs(30 * 60),
global_write_buffer_size: ReadableSize::mb(512),
global_write_buffer_size: ReadableSize::gb(1),
global_write_buffer_reject_size: ReadableSize::gb(2),
}
}
@@ -102,5 +102,13 @@ impl MitoConfig {
warn!("Sanitize max background jobs 0 to {}", DEFAULT_MAX_BG_JOB);
self.max_background_jobs = DEFAULT_MAX_BG_JOB;
}
if self.global_write_buffer_reject_size <= self.global_write_buffer_size {
self.global_write_buffer_reject_size = self.global_write_buffer_size * 2;
warn!(
"Sanitize global write buffer reject size to {}",
self.global_write_buffer_reject_size
);
}
}
}

View File

@@ -21,6 +21,10 @@ mod create_test;
#[cfg(test)]
mod drop_test;
#[cfg(test)]
mod flush_test;
#[cfg(test)]
pub(crate) mod listener;
#[cfg(test)]
mod open_test;
#[cfg(test)]
mod tests;
@@ -102,7 +106,7 @@ impl EngineInner {
object_store: ObjectStore,
) -> EngineInner {
EngineInner {
workers: WorkerGroup::start(config, log_store, object_store.clone()),
workers: WorkerGroup::start(config, log_store, object_store),
}
}
@@ -183,3 +187,30 @@ impl RegionEngine for MitoEngine {
self.inner.get_metadata(region_id).map_err(BoxedError::new)
}
}
// Tests methods.
#[cfg(test)]
impl MitoEngine {
/// Returns a new [MitoEngine] for tests.
pub fn new_for_test<S: LogStore>(
mut config: MitoConfig,
log_store: Arc<S>,
object_store: ObjectStore,
write_buffer_manager: crate::flush::WriteBufferManagerRef,
listener: Option<crate::engine::listener::EventListenerRef>,
) -> MitoEngine {
config.sanitize();
MitoEngine {
inner: Arc::new(EngineInner {
workers: WorkerGroup::start_for_test(
config,
log_store,
object_store,
write_buffer_manager,
listener,
),
}),
}
}
}

View File

@@ -0,0 +1,219 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Flush tests for mito engine.
use std::sync::Arc;
use api::v1::Rows;
use common_recordbatch::RecordBatches;
use store_api::region_engine::RegionEngine;
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::engine::listener::{FlushListener, StallListener};
use crate::test_util::{
build_rows, build_rows_for_key, flush_region, put_rows, rows_schema, CreateRequestBuilder,
MockWriteBufferManager, TestEnv,
};
#[tokio::test]
async fn test_manual_flush() {
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let rows = Rows {
schema: column_schemas,
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows).await;
flush_region(&engine, region_id).await;
let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}
#[tokio::test]
async fn test_flush_engine() {
let mut env = TestEnv::new();
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let listener = Arc::new(FlushListener::new());
let engine = env
.create_engine_with(
MitoConfig::default(),
write_buffer_manager.clone(),
Some(listener.clone()),
)
.await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Prepares rows for flush.
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("a", 0, 2, 0),
};
put_rows(&engine, region_id, rows).await;
write_buffer_manager.set_should_flush(true);
// Writes and triggers flush.
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("b", 0, 2, 0),
};
put_rows(&engine, region_id, rows).await;
// Wait until flush is finished.
listener.wait().await;
let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| a | 0.0 | 1970-01-01T00:00:00 |
| a | 1.0 | 1970-01-01T00:00:01 |
| b | 0.0 | 1970-01-01T00:00:00 |
| b | 1.0 | 1970-01-01T00:00:01 |
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}
#[tokio::test]
async fn test_write_stall() {
let mut env = TestEnv::new();
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let listener = Arc::new(StallListener::new());
let engine = env
.create_engine_with(
MitoConfig::default(),
write_buffer_manager.clone(),
Some(listener.clone()),
)
.await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Prepares rows for flush.
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("a", 0, 2, 0),
};
put_rows(&engine, region_id, rows).await;
// Stalls the engine.
write_buffer_manager.set_should_stall(true);
let engine_cloned = engine.clone();
// Spawns a task to flush the engine on stall.
tokio::spawn(async move {
listener.wait().await;
flush_region(&engine_cloned, region_id).await;
});
// Triggers write stall.
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("b", 0, 2, 0),
};
put_rows(&engine, region_id, rows).await;
let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| a | 0.0 | 1970-01-01T00:00:00 |
| a | 1.0 | 1970-01-01T00:00:01 |
| b | 0.0 | 1970-01-01T00:00:00 |
| b | 1.0 | 1970-01-01T00:00:01 |
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}
#[tokio::test]
async fn test_flush_empty() {
let mut env = TestEnv::new();
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let engine = env
.create_engine_with(MitoConfig::default(), write_buffer_manager.clone(), None)
.await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
flush_region(&engine, region_id).await;
let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
assert_eq!(0, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
++
++";
assert_eq!(expected, batches.pretty_print().unwrap());
}

View File

@@ -0,0 +1,90 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Engine event listener for tests.
use std::sync::Arc;
use common_telemetry::info;
use store_api::storage::RegionId;
use tokio::sync::Notify;
/// Mito engine background event listener.
pub trait EventListener: Send + Sync {
/// Notifies the listener that a region is flushed successfully.
fn on_flush_success(&self, region_id: RegionId);
/// Notifies the listener that the engine is stalled.
fn on_write_stall(&self);
}
pub type EventListenerRef = Arc<dyn EventListener>;
/// Listener to watch flush events.
pub struct FlushListener {
notify: Notify,
}
impl FlushListener {
/// Creates a new listener.
pub fn new() -> FlushListener {
FlushListener {
notify: Notify::new(),
}
}
/// Wait until one flush job is done.
pub async fn wait(&self) {
self.notify.notified().await;
}
}
impl EventListener for FlushListener {
fn on_flush_success(&self, region_id: RegionId) {
info!("Region {} flush successfully", region_id);
self.notify.notify_one()
}
fn on_write_stall(&self) {}
}
/// Listener to watch stall events.
pub struct StallListener {
notify: Notify,
}
impl StallListener {
/// Creates a new listener.
pub fn new() -> StallListener {
StallListener {
notify: Notify::new(),
}
}
/// Wait for a stall event.
pub async fn wait(&self) {
self.notify.notified().await;
}
}
impl EventListener for StallListener {
fn on_flush_success(&self, _region_id: RegionId) {}
fn on_write_stall(&self) {
info!("Engine is stalled");
self.notify.notify_one();
}
}

View File

@@ -12,26 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Tests for mito engine.
//! Basic tests for mito engine.
use std::collections::HashMap;
use api::helper::ColumnDataTypeWrapper;
use api::v1::value::ValueData;
use api::v1::{ColumnSchema, Row, Rows, SemanticType};
use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatches;
use store_api::metadata::ColumnMetadata;
use store_api::region_request::{
RegionCreateRequest, RegionDeleteRequest, RegionFlushRequest, RegionOpenRequest,
RegionPutRequest,
};
use store_api::region_request::RegionOpenRequest;
use store_api::storage::RegionId;
use super::*;
use crate::region::version::VersionControlData;
use crate::test_util::{CreateRequestBuilder, TestEnv};
use crate::test_util::{
build_delete_rows_for_key, build_rows, build_rows_for_key, delete_rows, delete_rows_schema,
put_rows, rows_schema, CreateRequestBuilder, TestEnv,
};
#[tokio::test]
async fn test_engine_new_stop() {
@@ -60,54 +57,6 @@ async fn test_engine_new_stop() {
);
}
fn column_metadata_to_column_schema(metadata: &ColumnMetadata) -> api::v1::ColumnSchema {
api::v1::ColumnSchema {
column_name: metadata.column_schema.name.clone(),
datatype: ColumnDataTypeWrapper::try_from(metadata.column_schema.data_type.clone())
.unwrap()
.datatype() as i32,
semantic_type: metadata.semantic_type as i32,
}
}
fn build_rows(start: usize, end: usize) -> Vec<Row> {
(start..end)
.map(|i| api::v1::Row {
values: vec![
api::v1::Value {
value_data: Some(ValueData::StringValue(i.to_string())),
},
api::v1::Value {
value_data: Some(ValueData::F64Value(i as f64)),
},
api::v1::Value {
value_data: Some(ValueData::TsMillisecondValue(i as i64 * 1000)),
},
],
})
.collect()
}
fn rows_schema(request: &RegionCreateRequest) -> Vec<ColumnSchema> {
request
.column_metadatas
.iter()
.map(column_metadata_to_column_schema)
.collect::<Vec<_>>()
}
async fn put_rows(engine: &MitoEngine, region_id: RegionId, rows: Rows) {
let num_rows = rows.rows.len();
let output = engine
.handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows }))
.await
.unwrap();
let Output::AffectedRows(rows_inserted) = output else {
unreachable!()
};
assert_eq!(num_rows, rows_inserted);
}
#[tokio::test]
async fn test_write_to_region() {
let mut env = TestEnv::with_prefix("write-to-region");
@@ -157,13 +106,7 @@ async fn test_region_replay() {
};
put_rows(&engine, region_id, rows).await;
engine.stop().await.unwrap();
let engine = MitoEngine::new(
MitoConfig::default(),
env.get_logstore().unwrap(),
env.get_object_store().unwrap(),
);
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
let open_region = engine
.handle_request(
@@ -200,8 +143,6 @@ async fn test_region_replay() {
engine.stop().await.unwrap();
}
// TODO(yingwen): build_rows() only generate one point for each series. We need to add tests
// for series with multiple points and other cases.
#[tokio::test]
async fn test_write_query_region() {
let mut env = TestEnv::new();
@@ -236,66 +177,6 @@ async fn test_write_query_region() {
assert_eq!(expected, batches.pretty_print().unwrap());
}
/// Build rows to put for specific `key`.
fn build_rows_for_key(key: &str, start: usize, end: usize, value_start: usize) -> Vec<Row> {
(start..end)
.enumerate()
.map(|(idx, ts)| api::v1::Row {
values: vec![
api::v1::Value {
value_data: Some(ValueData::StringValue(key.to_string())),
},
api::v1::Value {
value_data: Some(ValueData::F64Value((value_start + idx) as f64)),
},
api::v1::Value {
value_data: Some(ValueData::TsMillisecondValue(ts as i64 * 1000)),
},
],
})
.collect()
}
/// Build rows to delete for specific `key`.
fn build_delete_rows_for_key(key: &str, start: usize, end: usize) -> Vec<Row> {
(start..end)
.map(|ts| api::v1::Row {
values: vec![
api::v1::Value {
value_data: Some(ValueData::StringValue(key.to_string())),
},
api::v1::Value {
value_data: Some(ValueData::TsMillisecondValue(ts as i64 * 1000)),
},
],
})
.collect()
}
fn delete_rows_schema(request: &RegionCreateRequest) -> Vec<ColumnSchema> {
request
.column_metadatas
.iter()
.filter(|col| col.semantic_type != SemanticType::Field)
.map(column_metadata_to_column_schema)
.collect::<Vec<_>>()
}
async fn delete_rows(engine: &MitoEngine, region_id: RegionId, rows: Rows) {
let num_rows = rows.rows.len();
let output = engine
.handle_request(
region_id,
RegionRequest::Delete(RegionDeleteRequest { rows }),
)
.await
.unwrap();
let Output::AffectedRows(rows_inserted) = output else {
unreachable!()
};
assert_eq!(num_rows, rows_inserted);
}
#[tokio::test]
async fn test_put_delete() {
let mut env = TestEnv::new();
@@ -409,52 +290,3 @@ async fn test_put_overwrite() {
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}
#[tokio::test]
async fn test_manual_flush() {
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let column_schemas = request
.column_metadatas
.iter()
.map(column_metadata_to_column_schema)
.collect::<Vec<_>>();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let rows = Rows {
schema: column_schemas,
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows).await;
let Output::AffectedRows(rows) = engine
.handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {}))
.await
.unwrap()
else {
unreachable!()
};
assert_eq!(0, rows);
let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}

View File

@@ -424,6 +424,16 @@ pub enum Error {
region_id: RegionId,
location: Location,
},
#[snafu(display(
"Engine write buffer is full, rejecting write requests of region {}, location: {}",
region_id,
location
))]
RejectWrite {
region_id: RegionId,
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -489,6 +499,7 @@ impl ErrorExt for Error {
FlushRegion { source, .. } => source.status_code(),
RegionDropped { .. } => StatusCode::Cancelled,
RegionClosed { .. } => StatusCode::Cancelled,
RejectWrite { .. } => StatusCode::StorageUnavailable,
}
}

View File

@@ -45,6 +45,9 @@ pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
/// Returns whether to trigger the engine.
fn should_flush_engine(&self) -> bool;
/// Returns whether to stall write requests.
fn should_stall(&self) -> bool;
/// Reserves `mem` bytes.
fn reserve_mem(&self, mem: usize);
@@ -132,6 +135,10 @@ impl WriteBufferManager for WriteBufferManagerImpl {
false
}
fn should_stall(&self) -> bool {
self.memory_usage() >= self.global_write_buffer_size
}
fn reserve_mem(&self, mem: usize) {
self.memory_used.fetch_add(mem, Ordering::Relaxed);
self.memory_active.fetch_add(mem, Ordering::Relaxed);
@@ -553,6 +560,7 @@ mod tests {
let manager = WriteBufferManagerImpl::new(1000);
manager.reserve_mem(500);
assert!(!manager.should_flush_engine());
assert!(!manager.should_stall());
// More than mutable limit.
manager.reserve_mem(400);
@@ -575,6 +583,7 @@ mod tests {
// Mutable limit is 800.
let manager = WriteBufferManagerImpl::new(1000);
manager.reserve_mem(1100);
assert!(manager.should_stall());
// Global usage is still 1100.
manager.schedule_free_mem(200);
assert!(manager.should_flush_engine());

View File

@@ -26,6 +26,7 @@ use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value};
use common_base::readable_size::ReadableSize;
use common_query::Output;
use datatypes::prelude::DataType;
use prost::Message;
use smallvec::SmallVec;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata};
@@ -133,6 +134,17 @@ impl WriteRequest {
})
}
/// Returns estimated size of the request.
pub(crate) fn estimated_size(&self) -> usize {
let row_size = self
.rows
.rows
.get(0)
.map(|row| row.encoded_len())
.unwrap_or(0);
row_size * self.rows.rows.len()
}
/// Gets column index by name.
pub(crate) fn column_index_by_name(&self, name: &str) -> Option<usize> {
self.name_to_index.get(name).copied()

View File

@@ -15,12 +15,15 @@
//! Utilities for testing.
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use api::greptime_proto::v1;
use api::helper::ColumnDataTypeWrapper;
use api::v1::value::ValueData;
use api::v1::{OpType, SemanticType};
use api::v1::{OpType, Row, Rows, SemanticType};
use common_datasource::compression::CompressionType;
use common_query::Output;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use datatypes::arrow::array::{TimestampMillisecondArray, UInt64Array, UInt8Array};
use datatypes::prelude::ConcreteDataType;
@@ -30,11 +33,17 @@ use log_store::test_util::log_store_util;
use object_store::services::Fs;
use object_store::ObjectStore;
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::region_request::RegionCreateRequest;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
RegionCreateRequest, RegionDeleteRequest, RegionFlushRequest, RegionPutRequest, RegionRequest,
};
use store_api::storage::RegionId;
use crate::config::MitoConfig;
use crate::engine::listener::EventListenerRef;
use crate::engine::MitoEngine;
use crate::error::Result;
use crate::flush::{WriteBufferManager, WriteBufferManagerRef};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::read::{Batch, BatchBuilder, BatchReader};
use crate::worker::WorkerGroup;
@@ -90,6 +99,51 @@ impl TestEnv {
MitoEngine::new(config, logstore, object_store)
}
/// Creates a new engine with specific config and manager/listener under this env.
pub async fn create_engine_with(
&mut self,
config: MitoConfig,
manager: WriteBufferManagerRef,
listener: Option<EventListenerRef>,
) -> MitoEngine {
let (log_store, object_store) = self.create_log_and_object_store().await;
let logstore = Arc::new(log_store);
self.logstore = Some(logstore.clone());
self.object_store = Some(object_store.clone());
MitoEngine::new_for_test(config, logstore, object_store, manager, listener)
}
/// Reopen the engine.
pub async fn reopen_engine(&mut self, engine: MitoEngine, config: MitoConfig) -> MitoEngine {
engine.stop().await.unwrap();
MitoEngine::new(
config,
self.logstore.clone().unwrap(),
self.object_store.clone().unwrap(),
)
}
/// Reopen the engine.
pub async fn reopen_engine_with(
&self,
engine: MitoEngine,
config: MitoConfig,
manager: WriteBufferManagerRef,
listener: Option<EventListenerRef>,
) -> MitoEngine {
engine.stop().await.unwrap();
MitoEngine::new_for_test(
config,
self.logstore.clone().unwrap(),
self.object_store.clone().unwrap(),
manager,
listener,
)
}
/// Creates a new [WorkerGroup] with specific config under this env.
pub(crate) async fn create_worker_group(&self, config: MitoConfig) -> WorkerGroup {
let (log_store, object_store) = self.create_log_and_object_store().await;
@@ -329,3 +383,181 @@ pub fn new_batch(
.build()
.unwrap()
}
/// A mock [WriteBufferManager] that supports controlling whether to flush/stall.
#[derive(Debug, Default)]
pub struct MockWriteBufferManager {
should_flush: AtomicBool,
should_stall: AtomicBool,
memory_used: AtomicUsize,
memory_active: AtomicUsize,
}
impl MockWriteBufferManager {
/// Set whether to flush the engine.
pub fn set_should_flush(&self, value: bool) {
self.should_flush.store(value, Ordering::Relaxed);
}
/// Set whether to stall the engine.
pub fn set_should_stall(&self, value: bool) {
self.should_stall.store(value, Ordering::Relaxed);
}
/// Returns memory usage of mutable memtables.
pub fn mutable_usage(&self) -> usize {
self.memory_active.load(Ordering::Relaxed)
}
}
impl WriteBufferManager for MockWriteBufferManager {
fn should_flush_engine(&self) -> bool {
self.should_flush.load(Ordering::Relaxed)
}
fn should_stall(&self) -> bool {
self.should_stall.load(Ordering::Relaxed)
}
fn reserve_mem(&self, mem: usize) {
self.memory_used.fetch_add(mem, Ordering::Relaxed);
self.memory_active.fetch_add(mem, Ordering::Relaxed);
}
fn schedule_free_mem(&self, mem: usize) {
self.memory_active.fetch_sub(mem, Ordering::Relaxed);
}
fn free_mem(&self, mem: usize) {
self.memory_used.fetch_sub(mem, Ordering::Relaxed);
}
fn memory_usage(&self) -> usize {
self.memory_used.load(Ordering::Relaxed)
}
}
fn column_metadata_to_column_schema(metadata: &ColumnMetadata) -> api::v1::ColumnSchema {
api::v1::ColumnSchema {
column_name: metadata.column_schema.name.clone(),
datatype: ColumnDataTypeWrapper::try_from(metadata.column_schema.data_type.clone())
.unwrap()
.datatype() as i32,
semantic_type: metadata.semantic_type as i32,
}
}
/// Build rows with schema (string, f64, ts_millis).
pub fn build_rows(start: usize, end: usize) -> Vec<Row> {
(start..end)
.map(|i| api::v1::Row {
values: vec![
api::v1::Value {
value_data: Some(ValueData::StringValue(i.to_string())),
},
api::v1::Value {
value_data: Some(ValueData::F64Value(i as f64)),
},
api::v1::Value {
value_data: Some(ValueData::TsMillisecondValue(i as i64 * 1000)),
},
],
})
.collect()
}
/// Get column schemas for rows.
pub fn rows_schema(request: &RegionCreateRequest) -> Vec<api::v1::ColumnSchema> {
request
.column_metadatas
.iter()
.map(column_metadata_to_column_schema)
.collect::<Vec<_>>()
}
/// Get column schemas for delete requests.
pub fn delete_rows_schema(request: &RegionCreateRequest) -> Vec<api::v1::ColumnSchema> {
request
.column_metadatas
.iter()
.filter(|col| col.semantic_type != SemanticType::Field)
.map(column_metadata_to_column_schema)
.collect::<Vec<_>>()
}
/// Put rows into the engine.
pub async fn put_rows(engine: &MitoEngine, region_id: RegionId, rows: Rows) {
let num_rows = rows.rows.len();
let output = engine
.handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows }))
.await
.unwrap();
let Output::AffectedRows(rows_inserted) = output else {
unreachable!()
};
assert_eq!(num_rows, rows_inserted);
}
/// Build rows to put for specific `key`.
pub fn build_rows_for_key(key: &str, start: usize, end: usize, value_start: usize) -> Vec<Row> {
(start..end)
.enumerate()
.map(|(idx, ts)| api::v1::Row {
values: vec![
api::v1::Value {
value_data: Some(ValueData::StringValue(key.to_string())),
},
api::v1::Value {
value_data: Some(ValueData::F64Value((value_start + idx) as f64)),
},
api::v1::Value {
value_data: Some(ValueData::TsMillisecondValue(ts as i64 * 1000)),
},
],
})
.collect()
}
/// Build rows to delete for specific `key`.
pub fn build_delete_rows_for_key(key: &str, start: usize, end: usize) -> Vec<Row> {
(start..end)
.map(|ts| api::v1::Row {
values: vec![
api::v1::Value {
value_data: Some(ValueData::StringValue(key.to_string())),
},
api::v1::Value {
value_data: Some(ValueData::TsMillisecondValue(ts as i64 * 1000)),
},
],
})
.collect()
}
/// Delete rows from the engine.
pub async fn delete_rows(engine: &MitoEngine, region_id: RegionId, rows: Rows) {
let num_rows = rows.rows.len();
let output = engine
.handle_request(
region_id,
RegionRequest::Delete(RegionDeleteRequest { rows }),
)
.await
.unwrap();
let Output::AffectedRows(rows_inserted) = output else {
unreachable!()
};
assert_eq!(num_rows, rows_inserted);
}
/// Flush a region manually.
pub async fn flush_region(engine: &MitoEngine, region_id: RegionId) {
let Output::AffectedRows(rows) = engine
.handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {}))
.await
.unwrap()
else {
unreachable!()
};
assert_eq!(0, rows);
}

View File

@@ -42,7 +42,9 @@ use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::memtable::MemtableBuilderRef;
use crate::region::{MitoRegionRef, RegionMap, RegionMapRef};
use crate::request::{BackgroundNotify, DdlRequest, SenderDdlRequest, WorkerRequest};
use crate::request::{
BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest,
};
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
use crate::wal::Wal;
@@ -94,7 +96,7 @@ pub(crate) struct WorkerGroup {
}
impl WorkerGroup {
/// Start a worker group.
/// Starts a worker group.
///
/// The number of workers should be power of two.
pub(crate) fn start<S: LogStore>(
@@ -118,6 +120,7 @@ impl WorkerGroup {
object_store: object_store.clone(),
write_buffer_manager: write_buffer_manager.clone(),
scheduler: scheduler.clone(),
listener: WorkerListener::default(),
}
.start()
})
@@ -126,18 +129,19 @@ impl WorkerGroup {
WorkerGroup { workers, scheduler }
}
/// Stop the worker group.
/// Stops the worker group.
pub(crate) async fn stop(&self) -> Result<()> {
info!("Stop region worker group");
try_join_all(self.workers.iter().map(|worker| worker.stop())).await?;
// Stops the scheduler gracefully.
self.scheduler.stop(true).await?;
try_join_all(self.workers.iter().map(|worker| worker.stop())).await?;
Ok(())
}
/// Submit a request to a worker in the group.
/// Submits a request to a worker in the group.
pub(crate) async fn submit_to_worker(
&self,
region_id: RegionId,
@@ -169,6 +173,42 @@ impl WorkerGroup {
}
}
// Tests methods.
#[cfg(test)]
impl WorkerGroup {
/// Starts a worker group with `write_buffer_manager` and `listener` for tests.
///
/// The number of workers should be power of two.
pub(crate) fn start_for_test<S: LogStore>(
config: MitoConfig,
log_store: Arc<S>,
object_store: ObjectStore,
write_buffer_manager: WriteBufferManagerRef,
listener: Option<crate::engine::listener::EventListenerRef>,
) -> WorkerGroup {
assert!(config.num_workers.is_power_of_two());
let config = Arc::new(config);
let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
let workers = (0..config.num_workers)
.map(|id| {
WorkerStarter {
id: id as WorkerId,
config: config.clone(),
log_store: log_store.clone(),
object_store: object_store.clone(),
write_buffer_manager: write_buffer_manager.clone(),
scheduler: scheduler.clone(),
listener: WorkerListener::new(listener.clone()),
}
.start()
})
.collect();
WorkerGroup { workers, scheduler }
}
}
fn value_to_index(value: usize, num_workers: usize) -> usize {
value & (num_workers - 1)
}
@@ -181,10 +221,11 @@ struct WorkerStarter<S> {
object_store: ObjectStore,
write_buffer_manager: WriteBufferManagerRef,
scheduler: SchedulerRef,
listener: WorkerListener,
}
impl<S: LogStore> WorkerStarter<S> {
/// Start a region worker and its background thread.
/// Starts a region worker and its background thread.
fn start(self) -> RegionWorker {
let regions = Arc::new(RegionMap::default());
let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);
@@ -206,6 +247,8 @@ impl<S: LogStore> WorkerStarter<S> {
scheduler: self.scheduler.clone(),
write_buffer_manager: self.write_buffer_manager,
flush_scheduler: FlushScheduler::new(self.scheduler),
stalled_requests: StalledRequests::default(),
listener: self.listener,
};
let handle = common_runtime::spawn_write(async move {
worker_thread.run().await;
@@ -236,7 +279,7 @@ pub(crate) struct RegionWorker {
}
impl RegionWorker {
/// Submit request to background worker thread.
/// Submits request to background worker thread.
async fn submit_request(&self, request: WorkerRequest) -> Result<()> {
ensure!(self.is_running(), WorkerStoppedSnafu { id: self.id });
if self.sender.send(request).await.is_err() {
@@ -303,6 +346,29 @@ impl Drop for RegionWorker {
type RequestBuffer = Vec<WorkerRequest>;
/// Buffer for stalled write requests.
///
/// Maintains stalled write requests and their estimated size.
#[derive(Default)]
pub(crate) struct StalledRequests {
/// Stalled requests.
pub(crate) requests: Vec<SenderWriteRequest>,
/// Estimated size of all stalled requests.
pub(crate) estimated_size: usize,
}
impl StalledRequests {
/// Appends stalled requests.
pub(crate) fn append(&mut self, requests: &mut Vec<SenderWriteRequest>) {
let size: usize = requests
.iter()
.map(|req| req.request.estimated_size())
.sum();
self.requests.append(requests);
self.estimated_size += size;
}
}
/// Background worker loop to handle requests.
struct RegionWorkerLoop<S> {
// Id of the worker.
@@ -331,6 +397,10 @@ struct RegionWorkerLoop<S> {
write_buffer_manager: WriteBufferManagerRef,
/// Schedules background flush requests.
flush_scheduler: FlushScheduler,
/// Stalled write requests.
stalled_requests: StalledRequests,
/// Event listener for tests.
listener: WorkerListener,
}
impl<S: LogStore> RegionWorkerLoop<S> {
@@ -397,7 +467,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Handles all write requests first. So we can alter regions without
// considering existing write requests.
self.handle_write_requests(write_requests).await;
self.handle_write_requests(write_requests, true).await;
self.handle_ddl_requests(ddl_requests).await;
}
@@ -455,6 +525,40 @@ impl<S> RegionWorkerLoop<S> {
}
}
/// Wrapper that only calls event listener in tests.
#[derive(Default)]
pub(crate) struct WorkerListener {
#[cfg(test)]
listener: Option<crate::engine::listener::EventListenerRef>,
}
impl WorkerListener {
#[cfg(test)]
pub(crate) fn new(
listener: Option<crate::engine::listener::EventListenerRef>,
) -> WorkerListener {
WorkerListener { listener }
}
/// Flush is finished successfully.
pub(crate) fn on_flush_success(&self, region_id: RegionId) {
#[cfg(test)]
if let Some(listener) = &self.listener {
listener.on_flush_success(region_id);
}
// Avoid compiler warning.
let _ = region_id;
}
/// Engine is stalled.
pub(crate) fn on_write_stall(&self) {
#[cfg(test)]
if let Some(listener) = &self.listener {
listener.on_write_stall();
}
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -72,13 +72,20 @@ impl<S: LogStore> RegionWorkerLoop<S> {
return;
}
// Handle pending requests of the region.
// Notifies waiters.
request.on_success();
// Handle pending DDL requests for the region.
if let Some(ddl_requests) = self.flush_scheduler.on_flush_success(region_id) {
self.handle_ddl_requests(ddl_requests).await;
}
// Notifies waiters.
request.on_success();
// Handle stalled requests.
let stalled = std::mem::take(&mut self.stalled_requests);
// We already stalled these requests, don't stall them again.
self.handle_write_requests(stalled.requests, false).await;
self.listener.on_flush_success(region_id);
}
}

View File

@@ -23,14 +23,18 @@ use store_api::metadata::RegionMetadata;
use store_api::storage::RegionId;
use tokio::sync::oneshot::Sender;
use crate::error::{RegionNotFoundSnafu, Result};
use crate::error::{RegionNotFoundSnafu, RejectWriteSnafu, Result};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::{SenderWriteRequest, WriteRequest};
use crate::worker::RegionWorkerLoop;
impl<S: LogStore> RegionWorkerLoop<S> {
/// Takes and handles all write requests.
pub(crate) async fn handle_write_requests(&mut self, write_requests: Vec<SenderWriteRequest>) {
pub(crate) async fn handle_write_requests(
&mut self,
mut write_requests: Vec<SenderWriteRequest>,
allow_stall: bool,
) {
if write_requests.is_empty() {
return;
}
@@ -41,6 +45,16 @@ impl<S: LogStore> RegionWorkerLoop<S> {
if self.should_reject_write() {
// The memory pressure is still too high, reject write requests.
reject_write_requests(write_requests);
// Also reject all stalled requests.
let stalled = std::mem::take(&mut self.stalled_requests);
reject_write_requests(stalled.requests);
return;
}
if self.write_buffer_manager.should_stall() && allow_stall {
// TODO(yingwen): stalled metrics.
self.stalled_requests.append(&mut write_requests);
self.listener.on_write_stall();
return;
}
@@ -118,15 +132,24 @@ impl<S> RegionWorkerLoop<S> {
/// Returns true if the engine needs to reject some write requests.
fn should_reject_write(&self) -> bool {
// If memory usage reaches high threshold (we should also consider pending flush requests) returns true.
// TODO(yingwen): Implement this.
false
// If memory usage reaches high threshold (we should also consider stalled requests) returns true.
self.write_buffer_manager.memory_usage() + self.stalled_requests.estimated_size
>= self.config.global_write_buffer_reject_size.as_bytes() as usize
}
}
/// Send rejected error to all `write_requests`.
fn reject_write_requests(_write_requests: Vec<SenderWriteRequest>) {
unimplemented!()
fn reject_write_requests(write_requests: Vec<SenderWriteRequest>) {
for req in write_requests {
if let Some(sender) = req.sender {
let _ = sender.send(
RejectWriteSnafu {
region_id: req.request.region_id,
}
.fail(),
);
}
}
}
/// Checks the schema and fill missing columns.