fix: executes pending ddls if region memtable is empty while scheduling next flush (#4119)

* ci: enable debug log

* chore: test to reproduce panic

* chore: Revert "ci: enable debug log"

This reverts commit 17eff2a045.

* test: add test for alter during flush

* fix: clear status if region has nothing to flush

It will also executes pending ddls and requests

* docs: fix typo
This commit is contained in:
Yingwen
2024-06-11 08:10:17 +08:00
committed by GitHub
parent d8b51cfaba
commit 9cae15bd1b
8 changed files with 334 additions and 40 deletions

View File

@@ -518,13 +518,10 @@ fn get_expired_ssts(
#[cfg(test)]
mod tests {
use std::sync::Mutex;
use tokio::sync::oneshot;
use super::*;
use crate::schedule::scheduler::{Job, Scheduler};
use crate::test_util::scheduler_util::SchedulerEnv;
use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
use crate::test_util::version_util::{apply_edit, VersionControlBuilder};
#[tokio::test]
@@ -574,29 +571,6 @@ mod tests {
assert!(scheduler.region_status.is_empty());
}
#[derive(Default)]
struct VecScheduler {
jobs: Mutex<Vec<Job>>,
}
impl VecScheduler {
fn num_jobs(&self) -> usize {
self.jobs.lock().unwrap().len()
}
}
#[async_trait::async_trait]
impl Scheduler for VecScheduler {
fn schedule(&self, job: Job) -> Result<()> {
self.jobs.lock().unwrap().push(job);
Ok(())
}
async fn stop(&self, _await_termination: bool) -> Result<()> {
Ok(())
}
}
#[tokio::test]
async fn test_schedule_on_finished() {
let job_scheduler = Arc::new(VecScheduler::default());

View File

@@ -13,6 +13,8 @@
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, Row, Rows, SemanticType};
@@ -29,9 +31,11 @@ use store_api::region_request::{
use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::engine::listener::AlterFlushListener;
use crate::engine::MitoEngine;
use crate::test_util::{
build_rows, build_rows_for_key, put_rows, rows_schema, CreateRequestBuilder, TestEnv,
build_rows, build_rows_for_key, flush_region, put_rows, rows_schema, CreateRequestBuilder,
TestEnv,
};
async fn scan_check_after_alter(engine: &MitoEngine, region_id: RegionId, expected: &str) {
@@ -300,3 +304,96 @@ async fn test_alter_region_retry() {
assert_eq!(1, version_data.version.flushed_entry_id);
assert_eq!(2, version_data.version.flushed_sequence);
}
#[tokio::test]
async fn test_alter_on_flushing() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let listener = Arc::new(AlterFlushListener::default());
let engine = env
.create_engine_with(MitoConfig::default(), None, Some(listener.clone()))
.await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Prepares rows for flush.
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("a", 0, 2, 0),
};
put_rows(&engine, region_id, rows).await;
// Spawns a task to flush the engine.
let engine_cloned = engine.clone();
let flush_job = tokio::spawn(async move {
flush_region(&engine_cloned, region_id, None).await;
});
// Waits for flush begin.
listener.wait_flush_begin().await;
// Consumes the notify permit in the listener.
listener.wait_request_begin().await;
// Submits an alter request to the region. The region should add the request
// to the pending ddl request list.
let request = add_tag1();
let engine_cloned = engine.clone();
let alter_job = tokio::spawn(async move {
engine_cloned
.handle_request(region_id, RegionRequest::Alter(request))
.await
.unwrap();
});
// Waits until the worker handles the alter request.
listener.wait_request_begin().await;
// Spawns two task to flush the engine. The flush scheduler should put them to the
// pending task list.
let engine_cloned = engine.clone();
let pending_flush_job = tokio::spawn(async move {
flush_region(&engine_cloned, region_id, None).await;
});
// Waits until the worker handles the flush request.
listener.wait_request_begin().await;
// Wake up flush.
listener.wake_flush();
// Wait for the flush job.
tokio::time::timeout(Duration::from_secs(5), flush_job)
.await
.unwrap()
.unwrap();
// Wait for pending flush job.
tokio::time::timeout(Duration::from_secs(5), pending_flush_job)
.await
.unwrap()
.unwrap();
// Wait for the write job.
tokio::time::timeout(Duration::from_secs(5), alter_job)
.await
.unwrap()
.unwrap();
let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request).unwrap();
assert_eq!(0, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+-------+---------+---------------------+
| tag_1 | tag_0 | field_0 | ts |
+-------+-------+---------+---------------------+
| | a | 0.0 | 1970-01-01T00:00:00 |
| | a | 1.0 | 1970-01-01T00:00:01 |
+-------+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}

View File

@@ -56,6 +56,11 @@ pub trait EventListener: Send + Sync {
async fn on_merge_ssts_finished(&self, region_id: RegionId) {
let _ = region_id;
}
/// Notifies the listener that the worker receives requests from the request channel.
fn on_recv_requests(&self, request_num: usize) {
let _ = request_num;
}
}
pub type EventListenerRef = Arc<dyn EventListener>;
@@ -210,3 +215,46 @@ impl EventListener for CompactionListener {
self.blocker.notified().await;
}
}
/// Listener to block on flush and alter.
#[derive(Default)]
pub struct AlterFlushListener {
flush_begin_notify: Notify,
block_flush_notify: Notify,
request_begin_notify: Notify,
}
impl AlterFlushListener {
/// Waits on flush begin.
pub async fn wait_flush_begin(&self) {
self.flush_begin_notify.notified().await;
}
/// Waits on request begin.
pub async fn wait_request_begin(&self) {
self.request_begin_notify.notified().await;
}
/// Continue the flush job.
pub fn wake_flush(&self) {
self.block_flush_notify.notify_one();
}
}
#[async_trait]
impl EventListener for AlterFlushListener {
async fn on_flush_begin(&self, region_id: RegionId) {
info!("Wait on notify to start flush for region {}", region_id);
self.flush_begin_notify.notify_one();
self.block_flush_notify.notified().await;
info!("region {} begin flush", region_id);
}
fn on_recv_requests(&self, request_num: usize) {
info!("receive {} request", request_num);
self.request_begin_notify.notify_one();
}
}

View File

@@ -523,11 +523,26 @@ impl FlushScheduler {
let pending_requests = if flush_status.pending_task.is_none() {
// The region doesn't have any pending flush task.
// Safety: The flush status exists.
// Safety: The flush status must exist.
let flush_status = self.region_status.remove(&region_id).unwrap();
Some((flush_status.pending_ddls, flush_status.pending_writes))
} else {
None
let version_data = flush_status.version_control.current();
if version_data.version.memtables.is_empty() {
// The region has nothing to flush, we also need to remove it from the status.
// Safety: The pending task is not None.
let task = flush_status.pending_task.take().unwrap();
// The region has nothing to flush. We can notify pending task.
task.on_success();
// `schedule_next_flush()` may pick up the same region to flush, so we must remove
// it from the status to avoid leaking pending requests.
// Safety: The flush status must exist.
let flush_status = self.region_status.remove(&region_id).unwrap();
Some((flush_status.pending_ddls, flush_status.pending_writes))
} else {
// We can flush the region again, keep it in the region status.
None
}
};
// Schedule next flush job.
@@ -718,8 +733,9 @@ mod tests {
use super::*;
use crate::cache::CacheManager;
use crate::test_util::scheduler_util::SchedulerEnv;
use crate::test_util::version_util::VersionControlBuilder;
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
use crate::test_util::version_util::{write_rows_to_version, VersionControlBuilder};
#[test]
fn test_get_mutable_limit() {
@@ -807,4 +823,82 @@ mod tests {
assert_eq!(output, 0);
assert!(scheduler.region_status.is_empty());
}
#[tokio::test]
async fn test_schedule_pending_request() {
let job_scheduler = Arc::new(VecScheduler::default());
let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_flush_scheduler();
let mut builder = VersionControlBuilder::new();
// Overwrites the empty memtable builder.
builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
let version_control = Arc::new(builder.build());
// Writes data to the memtable so it is not empty.
let version_data = version_control.current();
write_rows_to_version(&version_data.version, "host0", 0, 10);
let manifest_ctx = env
.mock_manifest_context(version_data.version.metadata.clone())
.await;
// Creates 3 tasks.
let mut tasks: Vec<_> = (0..3)
.map(|_| RegionFlushTask {
region_id: builder.region_id(),
reason: FlushReason::Others,
senders: Vec::new(),
request_sender: tx.clone(),
access_layer: env.access_layer.clone(),
listener: WorkerListener::default(),
engine_config: Arc::new(MitoConfig::default()),
row_group_size: None,
cache_manager: Arc::new(CacheManager::default()),
manifest_ctx: manifest_ctx.clone(),
index_options: IndexOptions::default(),
})
.collect();
// Schedule first task.
let task = tasks.pop().unwrap();
scheduler
.schedule_flush(builder.region_id(), &version_control, task)
.unwrap();
// Should schedule 1 flush.
assert_eq!(1, scheduler.region_status.len());
assert_eq!(1, job_scheduler.num_jobs());
// Check the new version.
let version_data = version_control.current();
assert_eq!(0, version_data.version.memtables.immutables()[0].id());
// Schedule remaining tasks.
let output_rxs: Vec<_> = tasks
.into_iter()
.map(|mut task| {
let (output_tx, output_rx) = oneshot::channel();
task.push_sender(OptionOutputTx::from(output_tx));
scheduler
.schedule_flush(builder.region_id(), &version_control, task)
.unwrap();
output_rx
})
.collect();
// Assumes the flush job is finished.
version_control.apply_edit(
RegionEdit {
files_to_add: Vec::new(),
files_to_remove: Vec::new(),
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
},
&[0],
builder.file_purger(),
);
scheduler.on_flush_success(builder.region_id());
// No new flush task.
assert_eq!(1, job_scheduler.num_jobs());
// The flush status is cleared.
assert!(scheduler.region_status.is_empty());
for output_rx in output_rxs {
let output = output_rx.await.unwrap().unwrap();
assert_eq!(output, 0);
}
}
}

View File

@@ -737,9 +737,7 @@ impl CreateRequestBuilder {
}
}
// TODO(yingwen): Support conversion in greptime-proto.
/// Creates value for i64.
#[cfg(test)]
pub(crate) fn i64_value(data: i64) -> v1::Value {
v1::Value {
value_data: Some(ValueData::I64Value(data)),
@@ -747,7 +745,6 @@ pub(crate) fn i64_value(data: i64) -> v1::Value {
}
/// Creates value for timestamp millis.
#[cfg(test)]
pub(crate) fn ts_ms_value(data: i64) -> v1::Value {
v1::Value {
value_data: Some(ValueData::TimestampMillisecondValue(data)),

View File

@@ -14,7 +14,7 @@
//! Utilities to mock flush and compaction schedulers.
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use common_datasource::compression::CompressionType;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
@@ -28,11 +28,12 @@ use crate::access_layer::{AccessLayer, AccessLayerRef};
use crate::cache::CacheManager;
use crate::compaction::CompactionScheduler;
use crate::config::MitoConfig;
use crate::error::Result;
use crate::flush::FlushScheduler;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::region::{ManifestContext, ManifestContextRef, RegionState};
use crate::request::WorkerRequest;
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
use crate::schedule::scheduler::{Job, LocalScheduler, Scheduler, SchedulerRef};
use crate::sst::index::intermediate::IntermediateManager;
use crate::worker::WorkerListener;
@@ -123,3 +124,26 @@ impl SchedulerEnv {
.unwrap_or_else(|| Arc::new(LocalScheduler::new(1)))
}
}
#[derive(Default)]
pub struct VecScheduler {
jobs: Mutex<Vec<Job>>,
}
impl VecScheduler {
pub fn num_jobs(&self) -> usize {
self.jobs.lock().unwrap().len()
}
}
#[async_trait::async_trait]
impl Scheduler for VecScheduler {
fn schedule(&self, job: Job) -> Result<()> {
self.jobs.lock().unwrap().push(job);
Ok(())
}
async fn stop(&self, _await_termination: bool) -> Result<()> {
Ok(())
}
}

View File

@@ -17,7 +17,8 @@
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::SemanticType;
use api::v1::value::ValueData;
use api::v1::{self, ColumnDataType, Mutation, OpType, Row, Rows, SemanticType};
use common_time::Timestamp;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
@@ -26,11 +27,12 @@ use store_api::storage::RegionId;
use crate::manifest::action::RegionEdit;
use crate::memtable::time_partition::TimePartitions;
use crate::memtable::{KeyValues, MemtableBuilderRef};
use crate::region::version::{Version, VersionBuilder, VersionControl};
use crate::sst::file::{FileId, FileMeta};
use crate::sst::file_purger::FilePurgerRef;
use crate::test_util::memtable_util::EmptyMemtableBuilder;
use crate::test_util::new_noop_file_purger;
use crate::test_util::{new_noop_file_purger, ts_ms_value};
fn new_region_metadata(region_id: RegionId) -> RegionMetadata {
let mut builder = RegionMetadataBuilder::new(region_id);
@@ -57,7 +59,7 @@ fn new_region_metadata(region_id: RegionId) -> RegionMetadata {
pub(crate) struct VersionControlBuilder {
metadata: RegionMetadata,
file_purger: FilePurgerRef,
memtable_builder: Arc<EmptyMemtableBuilder>,
memtable_builder: MemtableBuilderRef,
files: HashMap<FileId, FileMeta>,
}
@@ -79,6 +81,11 @@ impl VersionControlBuilder {
self.file_purger.clone()
}
pub(crate) fn set_memtable_builder(&mut self, builder: MemtableBuilderRef) -> &mut Self {
self.memtable_builder = builder;
self
}
pub(crate) fn push_l0_file(&mut self, start_ms: i64, end_ms: i64) -> &mut Self {
let file_id = FileId::random();
self.files.insert(
@@ -118,6 +125,48 @@ impl VersionControlBuilder {
}
}
/// Put rows to the mutable memtable in the version.
pub(crate) fn write_rows_to_version(
version: &Version,
tag: &str,
start_ts: usize,
num_rows: usize,
) {
let mut rows = Vec::with_capacity(num_rows);
for idx in 0..num_rows {
let ts = (start_ts + idx) as i64;
let values = vec![
ts_ms_value(ts),
v1::Value {
value_data: Some(ValueData::StringValue(tag.to_string())),
},
];
rows.push(Row { values });
}
let schema = vec![
v1::ColumnSchema {
column_name: "ts".to_string(),
datatype: ColumnDataType::TimestampMillisecond as i32,
semantic_type: SemanticType::Timestamp as i32,
..Default::default()
},
v1::ColumnSchema {
column_name: "tag_0".to_string(),
datatype: ColumnDataType::String as i32,
semantic_type: SemanticType::Tag as i32,
..Default::default()
},
];
let rows = Rows { rows, schema };
let mutation = Mutation {
op_type: OpType::Put as i32,
sequence: start_ts as u64, // The sequence may be incorrect, but it's fine in test.
rows: Some(rows),
};
let key_values = KeyValues::new(&version.metadata, mutation).unwrap();
version.memtables.mutable.write(&key_values).unwrap();
}
/// Add mocked l0 files to the version control.
/// `files_to_add` are slice of `(start_ms, end_ms)`.
pub(crate) fn apply_edit(

View File

@@ -657,6 +657,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
}
self.listener.on_recv_requests(buffer.len());
self.handle_requests(&mut buffer).await;
self.handle_periodical_tasks();
@@ -901,6 +903,15 @@ impl WorkerListener {
// Avoid compiler warning.
let _ = region_id;
}
pub(crate) fn on_recv_requests(&self, request_num: usize) {
#[cfg(any(test, feature = "test"))]
if let Some(listener) = &self.listener {
listener.on_recv_requests(request_num);
}
// Avoid compiler warning.
let _ = request_num;
}
}
#[cfg(test)]