diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index d032c046d3..dd08865581 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -518,13 +518,10 @@ fn get_expired_ssts( #[cfg(test)] mod tests { - use std::sync::Mutex; - use tokio::sync::oneshot; use super::*; - use crate::schedule::scheduler::{Job, Scheduler}; - use crate::test_util::scheduler_util::SchedulerEnv; + use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler}; use crate::test_util::version_util::{apply_edit, VersionControlBuilder}; #[tokio::test] @@ -574,29 +571,6 @@ mod tests { assert!(scheduler.region_status.is_empty()); } - #[derive(Default)] - struct VecScheduler { - jobs: Mutex>, - } - - impl VecScheduler { - fn num_jobs(&self) -> usize { - self.jobs.lock().unwrap().len() - } - } - - #[async_trait::async_trait] - impl Scheduler for VecScheduler { - fn schedule(&self, job: Job) -> Result<()> { - self.jobs.lock().unwrap().push(job); - Ok(()) - } - - async fn stop(&self, _await_termination: bool) -> Result<()> { - Ok(()) - } - } - #[tokio::test] async fn test_schedule_on_finished() { let job_scheduler = Arc::new(VecScheduler::default()); diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs index 29f1ecc188..b48dc2ccfb 100644 --- a/src/mito2/src/engine/alter_test.rs +++ b/src/mito2/src/engine/alter_test.rs @@ -13,6 +13,8 @@ // limitations under the License. use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; use api::v1::value::ValueData; use api::v1::{ColumnDataType, Row, Rows, SemanticType}; @@ -29,9 +31,11 @@ use store_api::region_request::{ use store_api::storage::{RegionId, ScanRequest}; use crate::config::MitoConfig; +use crate::engine::listener::AlterFlushListener; use crate::engine::MitoEngine; use crate::test_util::{ - build_rows, build_rows_for_key, put_rows, rows_schema, CreateRequestBuilder, TestEnv, + build_rows, build_rows_for_key, flush_region, put_rows, rows_schema, CreateRequestBuilder, + TestEnv, }; async fn scan_check_after_alter(engine: &MitoEngine, region_id: RegionId, expected: &str) { @@ -300,3 +304,96 @@ async fn test_alter_region_retry() { assert_eq!(1, version_data.version.flushed_entry_id); assert_eq!(2, version_data.version.flushed_sequence); } + +#[tokio::test] +async fn test_alter_on_flushing() { + common_telemetry::init_default_ut_logging(); + + let mut env = TestEnv::new(); + let listener = Arc::new(AlterFlushListener::default()); + let engine = env + .create_engine_with(MitoConfig::default(), None, Some(listener.clone())) + .await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Prepares rows for flush. + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("a", 0, 2, 0), + }; + put_rows(&engine, region_id, rows).await; + + // Spawns a task to flush the engine. + let engine_cloned = engine.clone(); + let flush_job = tokio::spawn(async move { + flush_region(&engine_cloned, region_id, None).await; + }); + // Waits for flush begin. + listener.wait_flush_begin().await; + + // Consumes the notify permit in the listener. + listener.wait_request_begin().await; + + // Submits an alter request to the region. The region should add the request + // to the pending ddl request list. + let request = add_tag1(); + let engine_cloned = engine.clone(); + let alter_job = tokio::spawn(async move { + engine_cloned + .handle_request(region_id, RegionRequest::Alter(request)) + .await + .unwrap(); + }); + // Waits until the worker handles the alter request. + listener.wait_request_begin().await; + + // Spawns two task to flush the engine. The flush scheduler should put them to the + // pending task list. + let engine_cloned = engine.clone(); + let pending_flush_job = tokio::spawn(async move { + flush_region(&engine_cloned, region_id, None).await; + }); + // Waits until the worker handles the flush request. + listener.wait_request_begin().await; + + // Wake up flush. + listener.wake_flush(); + // Wait for the flush job. + tokio::time::timeout(Duration::from_secs(5), flush_job) + .await + .unwrap() + .unwrap(); + // Wait for pending flush job. + tokio::time::timeout(Duration::from_secs(5), pending_flush_job) + .await + .unwrap() + .unwrap(); + // Wait for the write job. + tokio::time::timeout(Duration::from_secs(5), alter_job) + .await + .unwrap() + .unwrap(); + + let request = ScanRequest::default(); + let scanner = engine.scanner(region_id, request).unwrap(); + assert_eq!(0, scanner.num_memtables()); + assert_eq!(1, scanner.num_files()); + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+-------+---------+---------------------+ +| tag_1 | tag_0 | field_0 | ts | ++-------+-------+---------+---------------------+ +| | a | 0.0 | 1970-01-01T00:00:00 | +| | a | 1.0 | 1970-01-01T00:00:01 | ++-------+-------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); +} diff --git a/src/mito2/src/engine/listener.rs b/src/mito2/src/engine/listener.rs index 8a4c33c6a8..ee69662701 100644 --- a/src/mito2/src/engine/listener.rs +++ b/src/mito2/src/engine/listener.rs @@ -56,6 +56,11 @@ pub trait EventListener: Send + Sync { async fn on_merge_ssts_finished(&self, region_id: RegionId) { let _ = region_id; } + + /// Notifies the listener that the worker receives requests from the request channel. + fn on_recv_requests(&self, request_num: usize) { + let _ = request_num; + } } pub type EventListenerRef = Arc; @@ -210,3 +215,46 @@ impl EventListener for CompactionListener { self.blocker.notified().await; } } + +/// Listener to block on flush and alter. +#[derive(Default)] +pub struct AlterFlushListener { + flush_begin_notify: Notify, + block_flush_notify: Notify, + request_begin_notify: Notify, +} + +impl AlterFlushListener { + /// Waits on flush begin. + pub async fn wait_flush_begin(&self) { + self.flush_begin_notify.notified().await; + } + + /// Waits on request begin. + pub async fn wait_request_begin(&self) { + self.request_begin_notify.notified().await; + } + + /// Continue the flush job. + pub fn wake_flush(&self) { + self.block_flush_notify.notify_one(); + } +} + +#[async_trait] +impl EventListener for AlterFlushListener { + async fn on_flush_begin(&self, region_id: RegionId) { + info!("Wait on notify to start flush for region {}", region_id); + + self.flush_begin_notify.notify_one(); + self.block_flush_notify.notified().await; + + info!("region {} begin flush", region_id); + } + + fn on_recv_requests(&self, request_num: usize) { + info!("receive {} request", request_num); + + self.request_begin_notify.notify_one(); + } +} diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 5bfec096cb..8d720f137e 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -523,11 +523,26 @@ impl FlushScheduler { let pending_requests = if flush_status.pending_task.is_none() { // The region doesn't have any pending flush task. - // Safety: The flush status exists. + // Safety: The flush status must exist. let flush_status = self.region_status.remove(®ion_id).unwrap(); Some((flush_status.pending_ddls, flush_status.pending_writes)) } else { - None + let version_data = flush_status.version_control.current(); + if version_data.version.memtables.is_empty() { + // The region has nothing to flush, we also need to remove it from the status. + // Safety: The pending task is not None. + let task = flush_status.pending_task.take().unwrap(); + // The region has nothing to flush. We can notify pending task. + task.on_success(); + // `schedule_next_flush()` may pick up the same region to flush, so we must remove + // it from the status to avoid leaking pending requests. + // Safety: The flush status must exist. + let flush_status = self.region_status.remove(®ion_id).unwrap(); + Some((flush_status.pending_ddls, flush_status.pending_writes)) + } else { + // We can flush the region again, keep it in the region status. + None + } }; // Schedule next flush job. @@ -718,8 +733,9 @@ mod tests { use super::*; use crate::cache::CacheManager; - use crate::test_util::scheduler_util::SchedulerEnv; - use crate::test_util::version_util::VersionControlBuilder; + use crate::memtable::time_series::TimeSeriesMemtableBuilder; + use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler}; + use crate::test_util::version_util::{write_rows_to_version, VersionControlBuilder}; #[test] fn test_get_mutable_limit() { @@ -807,4 +823,82 @@ mod tests { assert_eq!(output, 0); assert!(scheduler.region_status.is_empty()); } + + #[tokio::test] + async fn test_schedule_pending_request() { + let job_scheduler = Arc::new(VecScheduler::default()); + let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone()); + let (tx, _rx) = mpsc::channel(4); + let mut scheduler = env.mock_flush_scheduler(); + let mut builder = VersionControlBuilder::new(); + // Overwrites the empty memtable builder. + builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default())); + let version_control = Arc::new(builder.build()); + // Writes data to the memtable so it is not empty. + let version_data = version_control.current(); + write_rows_to_version(&version_data.version, "host0", 0, 10); + let manifest_ctx = env + .mock_manifest_context(version_data.version.metadata.clone()) + .await; + // Creates 3 tasks. + let mut tasks: Vec<_> = (0..3) + .map(|_| RegionFlushTask { + region_id: builder.region_id(), + reason: FlushReason::Others, + senders: Vec::new(), + request_sender: tx.clone(), + access_layer: env.access_layer.clone(), + listener: WorkerListener::default(), + engine_config: Arc::new(MitoConfig::default()), + row_group_size: None, + cache_manager: Arc::new(CacheManager::default()), + manifest_ctx: manifest_ctx.clone(), + index_options: IndexOptions::default(), + }) + .collect(); + // Schedule first task. + let task = tasks.pop().unwrap(); + scheduler + .schedule_flush(builder.region_id(), &version_control, task) + .unwrap(); + // Should schedule 1 flush. + assert_eq!(1, scheduler.region_status.len()); + assert_eq!(1, job_scheduler.num_jobs()); + // Check the new version. + let version_data = version_control.current(); + assert_eq!(0, version_data.version.memtables.immutables()[0].id()); + // Schedule remaining tasks. + let output_rxs: Vec<_> = tasks + .into_iter() + .map(|mut task| { + let (output_tx, output_rx) = oneshot::channel(); + task.push_sender(OptionOutputTx::from(output_tx)); + scheduler + .schedule_flush(builder.region_id(), &version_control, task) + .unwrap(); + output_rx + }) + .collect(); + // Assumes the flush job is finished. + version_control.apply_edit( + RegionEdit { + files_to_add: Vec::new(), + files_to_remove: Vec::new(), + compaction_time_window: None, + flushed_entry_id: None, + flushed_sequence: None, + }, + &[0], + builder.file_purger(), + ); + scheduler.on_flush_success(builder.region_id()); + // No new flush task. + assert_eq!(1, job_scheduler.num_jobs()); + // The flush status is cleared. + assert!(scheduler.region_status.is_empty()); + for output_rx in output_rxs { + let output = output_rx.await.unwrap().unwrap(); + assert_eq!(output, 0); + } + } } diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 3ceff8a297..ac5f34f789 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -737,9 +737,7 @@ impl CreateRequestBuilder { } } -// TODO(yingwen): Support conversion in greptime-proto. /// Creates value for i64. -#[cfg(test)] pub(crate) fn i64_value(data: i64) -> v1::Value { v1::Value { value_data: Some(ValueData::I64Value(data)), @@ -747,7 +745,6 @@ pub(crate) fn i64_value(data: i64) -> v1::Value { } /// Creates value for timestamp millis. -#[cfg(test)] pub(crate) fn ts_ms_value(data: i64) -> v1::Value { v1::Value { value_data: Some(ValueData::TimestampMillisecondValue(data)), diff --git a/src/mito2/src/test_util/scheduler_util.rs b/src/mito2/src/test_util/scheduler_util.rs index a47d9d4e7c..5bb0bfe14a 100644 --- a/src/mito2/src/test_util/scheduler_util.rs +++ b/src/mito2/src/test_util/scheduler_util.rs @@ -14,7 +14,7 @@ //! Utilities to mock flush and compaction schedulers. -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use common_datasource::compression::CompressionType; use common_test_util::temp_dir::{create_temp_dir, TempDir}; @@ -28,11 +28,12 @@ use crate::access_layer::{AccessLayer, AccessLayerRef}; use crate::cache::CacheManager; use crate::compaction::CompactionScheduler; use crate::config::MitoConfig; +use crate::error::Result; use crate::flush::FlushScheduler; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::region::{ManifestContext, ManifestContextRef, RegionState}; use crate::request::WorkerRequest; -use crate::schedule::scheduler::{LocalScheduler, SchedulerRef}; +use crate::schedule::scheduler::{Job, LocalScheduler, Scheduler, SchedulerRef}; use crate::sst::index::intermediate::IntermediateManager; use crate::worker::WorkerListener; @@ -123,3 +124,26 @@ impl SchedulerEnv { .unwrap_or_else(|| Arc::new(LocalScheduler::new(1))) } } + +#[derive(Default)] +pub struct VecScheduler { + jobs: Mutex>, +} + +impl VecScheduler { + pub fn num_jobs(&self) -> usize { + self.jobs.lock().unwrap().len() + } +} + +#[async_trait::async_trait] +impl Scheduler for VecScheduler { + fn schedule(&self, job: Job) -> Result<()> { + self.jobs.lock().unwrap().push(job); + Ok(()) + } + + async fn stop(&self, _await_termination: bool) -> Result<()> { + Ok(()) + } +} diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs index c6e2f45e0b..fad5b0f263 100644 --- a/src/mito2/src/test_util/version_util.rs +++ b/src/mito2/src/test_util/version_util.rs @@ -17,7 +17,8 @@ use std::collections::HashMap; use std::sync::Arc; -use api::v1::SemanticType; +use api::v1::value::ValueData; +use api::v1::{self, ColumnDataType, Mutation, OpType, Row, Rows, SemanticType}; use common_time::Timestamp; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; @@ -26,11 +27,12 @@ use store_api::storage::RegionId; use crate::manifest::action::RegionEdit; use crate::memtable::time_partition::TimePartitions; +use crate::memtable::{KeyValues, MemtableBuilderRef}; use crate::region::version::{Version, VersionBuilder, VersionControl}; use crate::sst::file::{FileId, FileMeta}; use crate::sst::file_purger::FilePurgerRef; use crate::test_util::memtable_util::EmptyMemtableBuilder; -use crate::test_util::new_noop_file_purger; +use crate::test_util::{new_noop_file_purger, ts_ms_value}; fn new_region_metadata(region_id: RegionId) -> RegionMetadata { let mut builder = RegionMetadataBuilder::new(region_id); @@ -57,7 +59,7 @@ fn new_region_metadata(region_id: RegionId) -> RegionMetadata { pub(crate) struct VersionControlBuilder { metadata: RegionMetadata, file_purger: FilePurgerRef, - memtable_builder: Arc, + memtable_builder: MemtableBuilderRef, files: HashMap, } @@ -79,6 +81,11 @@ impl VersionControlBuilder { self.file_purger.clone() } + pub(crate) fn set_memtable_builder(&mut self, builder: MemtableBuilderRef) -> &mut Self { + self.memtable_builder = builder; + self + } + pub(crate) fn push_l0_file(&mut self, start_ms: i64, end_ms: i64) -> &mut Self { let file_id = FileId::random(); self.files.insert( @@ -118,6 +125,48 @@ impl VersionControlBuilder { } } +/// Put rows to the mutable memtable in the version. +pub(crate) fn write_rows_to_version( + version: &Version, + tag: &str, + start_ts: usize, + num_rows: usize, +) { + let mut rows = Vec::with_capacity(num_rows); + for idx in 0..num_rows { + let ts = (start_ts + idx) as i64; + let values = vec![ + ts_ms_value(ts), + v1::Value { + value_data: Some(ValueData::StringValue(tag.to_string())), + }, + ]; + rows.push(Row { values }); + } + let schema = vec![ + v1::ColumnSchema { + column_name: "ts".to_string(), + datatype: ColumnDataType::TimestampMillisecond as i32, + semantic_type: SemanticType::Timestamp as i32, + ..Default::default() + }, + v1::ColumnSchema { + column_name: "tag_0".to_string(), + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as i32, + ..Default::default() + }, + ]; + let rows = Rows { rows, schema }; + let mutation = Mutation { + op_type: OpType::Put as i32, + sequence: start_ts as u64, // The sequence may be incorrect, but it's fine in test. + rows: Some(rows), + }; + let key_values = KeyValues::new(&version.metadata, mutation).unwrap(); + version.memtables.mutable.write(&key_values).unwrap(); +} + /// Add mocked l0 files to the version control. /// `files_to_add` are slice of `(start_ms, end_ms)`. pub(crate) fn apply_edit( diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 0aa4dbb921..7ffb271515 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -657,6 +657,8 @@ impl RegionWorkerLoop { } } + self.listener.on_recv_requests(buffer.len()); + self.handle_requests(&mut buffer).await; self.handle_periodical_tasks(); @@ -901,6 +903,15 @@ impl WorkerListener { // Avoid compiler warning. let _ = region_id; } + + pub(crate) fn on_recv_requests(&self, request_num: usize) { + #[cfg(any(test, feature = "test"))] + if let Some(listener) = &self.listener { + listener.on_recv_requests(request_num); + } + // Avoid compiler warning. + let _ = request_num; + } } #[cfg(test)]