diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs index a38093dc98..e8301e028c 100644 --- a/src/datanode/src/server/grpc.rs +++ b/src/datanode/src/server/grpc.rs @@ -95,6 +95,7 @@ impl Instance { schema_name: expr.schema_name, table_name, region_number: expr.region_id, + wait: None, }; self.sql_handler() .execute(SqlRequest::FlushTable(req), QueryContext::arc()) diff --git a/src/datanode/src/sql/flush_table.rs b/src/datanode/src/sql/flush_table.rs index ab3b459311..d255b2d29b 100644 --- a/src/datanode/src/sql/flush_table.rs +++ b/src/datanode/src/sql/flush_table.rs @@ -28,6 +28,7 @@ impl SqlHandler { &req.schema_name, table, req.region_number, + req.wait, ) .await?; } else { @@ -47,6 +48,7 @@ impl SqlHandler { &req.schema_name, table, req.region_number, + req.wait, ) })) .await @@ -62,6 +64,7 @@ impl SqlHandler { schema: &str, table: &str, region: Option, + wait: Option, ) -> Result<()> { let table_ref = TableReference { catalog, @@ -71,8 +74,11 @@ impl SqlHandler { let full_table_name = table_ref.to_string(); let table = self.get_table(&table_ref)?; - table.flush(region).await.context(error::FlushTableSnafu { - table_name: full_table_name, - }) + table + .flush(region, wait) + .await + .context(error::FlushTableSnafu { + table_name: full_table_name, + }) } } diff --git a/src/mito/src/engine/tests.rs b/src/mito/src/engine/tests.rs index c252d4f0c1..1368dc6a29 100644 --- a/src/mito/src/engine/tests.rs +++ b/src/mito/src/engine/tests.rs @@ -793,10 +793,10 @@ async fn test_flush_table_all_regions() { assert!(!has_parquet_file(®ion_dir)); // Trigger flush all region - table.flush(None).await.unwrap(); + table.flush(None, None).await.unwrap(); // Trigger again, wait for the previous task finished - table.flush(None).await.unwrap(); + table.flush(None, None).await.unwrap(); assert!(has_parquet_file(®ion_dir)); } @@ -832,10 +832,10 @@ async fn test_flush_table_with_region_id() { }; // Trigger flush all region - table.flush(req.region_number).await.unwrap(); + table.flush(req.region_number, Some(false)).await.unwrap(); // Trigger again, wait for the previous task finished - table.flush(req.region_number).await.unwrap(); + table.flush(req.region_number, Some(true)).await.unwrap(); assert!(has_parquet_file(®ion_dir)); } diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index 9404fb3be3..e244afb235 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -35,8 +35,8 @@ use object_store::ObjectStore; use snafu::{ensure, OptionExt, ResultExt}; use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator}; use store_api::storage::{ - AddColumn, AlterOperation, AlterRequest, ChunkReader, ReadContext, Region, RegionMeta, - RegionNumber, ScanRequest, SchemaRef, Snapshot, WriteContext, WriteRequest, + AddColumn, AlterOperation, AlterRequest, ChunkReader, FlushContext, ReadContext, Region, + RegionMeta, RegionNumber, ScanRequest, SchemaRef, Snapshot, WriteContext, WriteRequest, }; use table::error as table_error; use table::error::{RegionSchemaMismatchSnafu, Result as TableResult, TableOperationSnafu}; @@ -323,20 +323,27 @@ impl Table for MitoTable { Ok(rows_deleted) } - async fn flush(&self, region_number: Option) -> TableResult<()> { + async fn flush( + &self, + region_number: Option, + wait: Option, + ) -> TableResult<()> { + let flush_ctx = wait.map(|wait| FlushContext { wait }).unwrap_or_default(); if let Some(region_number) = region_number { if let Some(region) = self.regions.get(®ion_number) { region - .flush() + .flush(&flush_ctx) .await .map_err(BoxedError::new) .context(table_error::TableOperationSnafu)?; } } else { - futures::future::try_join_all(self.regions.values().map(|region| region.flush())) - .await - .map_err(BoxedError::new) - .context(table_error::TableOperationSnafu)?; + futures::future::try_join_all( + self.regions.values().map(|region| region.flush(&flush_ctx)), + ) + .await + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; } Ok(()) diff --git a/src/mito/src/table/test_util/mock_engine.rs b/src/mito/src/table/test_util/mock_engine.rs index c3be8318b8..3b0b7f8ebe 100644 --- a/src/mito/src/table/test_util/mock_engine.rs +++ b/src/mito/src/table/test_util/mock_engine.rs @@ -26,9 +26,9 @@ use datatypes::schema::{ColumnSchema, Schema}; use storage::metadata::{RegionMetaImpl, RegionMetadata}; use storage::write_batch::WriteBatch; use store_api::storage::{ - AlterRequest, Chunk, ChunkReader, CreateOptions, EngineContext, GetRequest, GetResponse, - OpenOptions, ReadContext, Region, RegionDescriptor, RegionId, ScanRequest, ScanResponse, - SchemaRef, Snapshot, StorageEngine, WriteContext, WriteResponse, + AlterRequest, Chunk, ChunkReader, CreateOptions, EngineContext, FlushContext, GetRequest, + GetResponse, OpenOptions, ReadContext, Region, RegionDescriptor, RegionId, ScanRequest, + ScanResponse, SchemaRef, Snapshot, StorageEngine, WriteContext, WriteResponse, }; pub type Result = std::result::Result; @@ -201,7 +201,7 @@ impl Region for MockRegion { 0 } - async fn flush(&self) -> Result<()> { + async fn flush(&self, _ctx: &FlushContext) -> Result<()> { unimplemented!() } } diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index a7933ab9bf..2c7ac128be 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -27,8 +27,8 @@ use snafu::ResultExt; use store_api::logstore::LogStore; use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator}; use store_api::storage::{ - AlterRequest, OpenOptions, ReadContext, Region, RegionId, SequenceNumber, WriteContext, - WriteResponse, + AlterRequest, FlushContext, OpenOptions, ReadContext, Region, RegionId, SequenceNumber, + WriteContext, WriteResponse, }; use crate::compaction::CompactionSchedulerRef; @@ -136,8 +136,8 @@ impl Region for RegionImpl { .sum() } - async fn flush(&self) -> Result<()> { - self.inner.flush().await + async fn flush(&self, ctx: &FlushContext) -> Result<()> { + self.inner.flush(ctx).await } } @@ -436,10 +436,6 @@ impl RegionImpl { self.inner.version_control().current_manifest_version() } - async fn wait_flush_done(&self) -> Result<()> { - self.inner.writer.wait_flush_done().await - } - /// Write to inner, also the `RegionWriter` directly. async fn write_inner(&self, ctx: &WriteContext, request: WriteBatch) -> Result { self.inner.write(ctx, request).await @@ -565,7 +561,7 @@ impl RegionInner { self.writer.close().await } - async fn flush(&self) -> Result<()> { + async fn flush(&self, ctx: &FlushContext) -> Result<()> { let writer_ctx = WriterContext { shared: &self.shared, flush_strategy: &self.flush_strategy, @@ -576,6 +572,6 @@ impl RegionInner { writer: &self.writer, manifest: &self.manifest, }; - self.writer.flush(writer_ctx).await + self.writer.flush(writer_ctx, ctx).await } } diff --git a/src/storage/src/region/tests/flush.rs b/src/storage/src/region/tests/flush.rs index e17b8ff21e..07e0b7bb2c 100644 --- a/src/storage/src/region/tests/flush.rs +++ b/src/storage/src/region/tests/flush.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use common_test_util::temp_dir::create_temp_dir; use log_store::raft_engine::log_store::RaftEngineLogStore; -use store_api::storage::{OpenOptions, Region, WriteResponse}; +use store_api::storage::{FlushContext, OpenOptions, Region, WriteResponse}; use crate::engine; use crate::flush::FlushStrategyRef; @@ -91,12 +91,9 @@ impl FlushTester { self.base().full_scan().await } - async fn wait_flush_done(&self) { - self.base().region.wait_flush_done().await.unwrap(); - } - - async fn flush(&self) { - self.base().region.flush().await.unwrap(); + async fn flush(&self, wait: Option) { + let ctx = wait.map(|wait| FlushContext { wait }).unwrap_or_default(); + self.base().region.flush(&ctx).await.unwrap(); } } @@ -146,8 +143,7 @@ async fn test_manual_flush() { let sst_dir = format!("{}/{}", store_dir, engine::region_sst_dir("", REGION_NAME)); assert!(!has_parquet_file(&sst_dir)); - tester.flush().await; - tester.wait_flush_done().await; + tester.flush(None).await; assert!(has_parquet_file(&sst_dir)); } @@ -160,15 +156,12 @@ async fn test_flush_empty() { let flush_switch = Arc::new(FlushSwitch::default()); let tester = FlushTester::new(store_dir, flush_switch.clone()).await; - // Now set should flush to true to trigger flush. - flush_switch.set_should_flush(true); + // Flush empty table. + tester.flush(None).await; let data = [(1000, Some(100))]; // Put element to trigger flush. tester.put(&data).await; - tester.wait_flush_done().await; - // Disable flush. - flush_switch.set_should_flush(false); // Put again. let data = [(2000, Some(200))]; tester.put(&data).await; @@ -197,12 +190,11 @@ async fn test_read_after_flush() { tester.put(&[(1000, Some(100))]).await; tester.put(&[(2000, Some(200))]).await; - // Now set should flush to true to trigger flush. - flush_switch.set_should_flush(true); + // Flush. + tester.flush(None).await; - // Put element to trigger flush. + // Put element again. tester.put(&[(3000, Some(300))]).await; - tester.wait_flush_done().await; let expect = vec![(1000, Some(100)), (2000, Some(200)), (3000, Some(300))]; @@ -230,24 +222,21 @@ async fn test_merge_read_after_flush() { tester.put(&[(3000, Some(300))]).await; tester.put(&[(2000, Some(200))]).await; - // Now set should flush to true to trigger flush. - flush_switch.set_should_flush(true); + // Flush content to SST1. + tester.flush(None).await; - // Put element to trigger flush (In SST2). + // Put element (In SST2). tester.put(&[(2000, Some(201))]).await; - tester.wait_flush_done().await; - // Disable flush. - flush_switch.set_should_flush(false); // In SST2. tester.put(&[(2000, Some(202))]).await; tester.put(&[(1000, Some(100))]).await; - // Enable flush. - flush_switch.set_should_flush(true); - // Trigger flush and overwrite row (In memtable). + // Trigger flush. + tester.flush(None).await; + + // Overwrite row (In memtable). tester.put(&[(2000, Some(203))]).await; - tester.wait_flush_done().await; let expect = vec![(1000, Some(100)), (2000, Some(203)), (3000, Some(300))]; diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs index 86a4236fdd..a544771ba2 100644 --- a/src/storage/src/region/writer.rs +++ b/src/storage/src/region/writer.rs @@ -22,7 +22,7 @@ use futures::TryStreamExt; use snafu::{ensure, ResultExt}; use store_api::logstore::LogStore; use store_api::manifest::{Manifest, ManifestVersion, MetaAction}; -use store_api::storage::{AlterRequest, SequenceNumber, WriteContext, WriteResponse}; +use store_api::storage::{AlterRequest, FlushContext, SequenceNumber, WriteContext, WriteResponse}; use tokio::sync::Mutex; use crate::background::JobHandle; @@ -261,12 +261,24 @@ impl RegionWriter { } /// Flush task manually - pub async fn flush(&self, writer_ctx: WriterContext<'_, S>) -> Result<()> { + pub async fn flush( + &self, + writer_ctx: WriterContext<'_, S>, + ctx: &FlushContext, + ) -> Result<()> { let mut inner = self.inner.lock().await; ensure!(!inner.is_closed(), error::ClosedRegionSnafu); - inner.manual_flush(writer_ctx).await + inner.manual_flush(writer_ctx).await?; + + if ctx.wait { + if let Some(handle) = inner.flush_handle.take() { + handle.join().await?; + } + } + + Ok(()) } /// Cancel flush task if any @@ -284,19 +296,6 @@ impl RegionWriter { } } -// Private methods for tests. -#[cfg(test)] -impl RegionWriter { - pub async fn wait_flush_done(&self) -> Result<()> { - let mut inner = self.inner.lock().await; - if let Some(handle) = inner.flush_handle.take() { - handle.join().await?; - } - - Ok(()) - } -} - pub struct WriterContext<'a, S: LogStore> { pub shared: &'a SharedDataRef, pub flush_strategy: &'a FlushStrategyRef, diff --git a/src/store-api/src/storage.rs b/src/store-api/src/storage.rs index 10b14d37a9..9301303229 100644 --- a/src/store-api/src/storage.rs +++ b/src/store-api/src/storage.rs @@ -34,7 +34,7 @@ pub use self::chunk::{Chunk, ChunkReader}; pub use self::descriptors::*; pub use self::engine::{CreateOptions, EngineContext, OpenOptions, StorageEngine}; pub use self::metadata::RegionMeta; -pub use self::region::{Region, WriteContext}; +pub use self::region::{FlushContext, Region, WriteContext}; pub use self::requests::{ AddColumn, AlterOperation, AlterRequest, GetRequest, ScanRequest, WriteRequest, }; diff --git a/src/store-api/src/storage/region.rs b/src/store-api/src/storage/region.rs index df8047f488..d4dd2c0e1e 100644 --- a/src/store-api/src/storage/region.rs +++ b/src/store-api/src/storage/region.rs @@ -77,7 +77,8 @@ pub trait Region: Send + Sync + Clone + std::fmt::Debug + 'static { fn disk_usage_bytes(&self) -> u64; - async fn flush(&self) -> Result<(), Self::Error>; + /// Flush memtable of the region to disk. + async fn flush(&self, ctx: &FlushContext) -> Result<(), Self::Error>; } /// Context for write operations. @@ -89,3 +90,17 @@ impl From<&OpenOptions> for WriteContext { WriteContext::default() } } + +/// Context for flush operations. +#[derive(Debug, Clone)] +pub struct FlushContext { + /// If true, the flush will wait until the flush is done. + /// Default: true + pub wait: bool, +} + +impl Default for FlushContext { + fn default() -> FlushContext { + FlushContext { wait: true } + } +} diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index b09e47446f..6740823800 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -216,6 +216,8 @@ pub struct FlushTableRequest { pub schema_name: String, pub table_name: Option, pub region_number: Option, + /// Wait until the flush is done. + pub wait: Option, } #[cfg(test)] diff --git a/src/table/src/table.rs b/src/table/src/table.rs index a733473a77..9f48dead0f 100644 --- a/src/table/src/table.rs +++ b/src/table/src/table.rs @@ -96,8 +96,12 @@ pub trait Table: Send + Sync { } /// Flush table. - async fn flush(&self, region_number: Option) -> Result<()> { - let _ = region_number; + /// + /// Options: + /// - region_number: specify region to flush. + /// - wait: Whether to wait until flush is done. + async fn flush(&self, region_number: Option, wait: Option) -> Result<()> { + let _ = (region_number, wait); UnsupportedSnafu { operation: "FLUSH" }.fail()? }