feat: wait flush until the flush is done (#1188)

* feat: Add wait argument to flush

* test(storage): Fix flush tests
This commit is contained in:
Yingwen
2023-03-20 11:25:19 +08:00
committed by GitHub
parent 64441616db
commit ad5c47185d
12 changed files with 96 additions and 77 deletions

View File

@@ -95,6 +95,7 @@ impl Instance {
schema_name: expr.schema_name,
table_name,
region_number: expr.region_id,
wait: None,
};
self.sql_handler()
.execute(SqlRequest::FlushTable(req), QueryContext::arc())

View File

@@ -28,6 +28,7 @@ impl SqlHandler {
&req.schema_name,
table,
req.region_number,
req.wait,
)
.await?;
} else {
@@ -47,6 +48,7 @@ impl SqlHandler {
&req.schema_name,
table,
req.region_number,
req.wait,
)
}))
.await
@@ -62,6 +64,7 @@ impl SqlHandler {
schema: &str,
table: &str,
region: Option<u32>,
wait: Option<bool>,
) -> Result<()> {
let table_ref = TableReference {
catalog,
@@ -71,8 +74,11 @@ impl SqlHandler {
let full_table_name = table_ref.to_string();
let table = self.get_table(&table_ref)?;
table.flush(region).await.context(error::FlushTableSnafu {
table_name: full_table_name,
})
table
.flush(region, wait)
.await
.context(error::FlushTableSnafu {
table_name: full_table_name,
})
}
}

View File

@@ -793,10 +793,10 @@ async fn test_flush_table_all_regions() {
assert!(!has_parquet_file(&region_dir));
// Trigger a flush of all regions
table.flush(None).await.unwrap();
table.flush(None, None).await.unwrap();
// Trigger again and wait for the previous task to finish
table.flush(None).await.unwrap();
table.flush(None, None).await.unwrap();
assert!(has_parquet_file(&region_dir));
}
@@ -832,10 +832,10 @@ async fn test_flush_table_with_region_id() {
};
// Trigger a flush for `req.region_number`
table.flush(req.region_number).await.unwrap();
table.flush(req.region_number, Some(false)).await.unwrap();
// Trigger again and wait for the previous task to finish
table.flush(req.region_number).await.unwrap();
table.flush(req.region_number, Some(true)).await.unwrap();
assert!(has_parquet_file(&region_dir));
}

View File

@@ -35,8 +35,8 @@ use object_store::ObjectStore;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator};
use store_api::storage::{
AddColumn, AlterOperation, AlterRequest, ChunkReader, ReadContext, Region, RegionMeta,
RegionNumber, ScanRequest, SchemaRef, Snapshot, WriteContext, WriteRequest,
AddColumn, AlterOperation, AlterRequest, ChunkReader, FlushContext, ReadContext, Region,
RegionMeta, RegionNumber, ScanRequest, SchemaRef, Snapshot, WriteContext, WriteRequest,
};
use table::error as table_error;
use table::error::{RegionSchemaMismatchSnafu, Result as TableResult, TableOperationSnafu};
@@ -323,20 +323,27 @@ impl<R: Region> Table for MitoTable<R> {
Ok(rows_deleted)
}
async fn flush(&self, region_number: Option<RegionNumber>) -> TableResult<()> {
async fn flush(
&self,
region_number: Option<RegionNumber>,
wait: Option<bool>,
) -> TableResult<()> {
let flush_ctx = wait.map(|wait| FlushContext { wait }).unwrap_or_default();
if let Some(region_number) = region_number {
if let Some(region) = self.regions.get(&region_number) {
region
.flush()
.flush(&flush_ctx)
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
}
} else {
futures::future::try_join_all(self.regions.values().map(|region| region.flush()))
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
futures::future::try_join_all(
self.regions.values().map(|region| region.flush(&flush_ctx)),
)
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
}
Ok(())

View File

@@ -26,9 +26,9 @@ use datatypes::schema::{ColumnSchema, Schema};
use storage::metadata::{RegionMetaImpl, RegionMetadata};
use storage::write_batch::WriteBatch;
use store_api::storage::{
AlterRequest, Chunk, ChunkReader, CreateOptions, EngineContext, GetRequest, GetResponse,
OpenOptions, ReadContext, Region, RegionDescriptor, RegionId, ScanRequest, ScanResponse,
SchemaRef, Snapshot, StorageEngine, WriteContext, WriteResponse,
AlterRequest, Chunk, ChunkReader, CreateOptions, EngineContext, FlushContext, GetRequest,
GetResponse, OpenOptions, ReadContext, Region, RegionDescriptor, RegionId, ScanRequest,
ScanResponse, SchemaRef, Snapshot, StorageEngine, WriteContext, WriteResponse,
};
pub type Result<T> = std::result::Result<T, MockError>;
@@ -201,7 +201,7 @@ impl Region for MockRegion {
0
}
async fn flush(&self) -> Result<()> {
async fn flush(&self, _ctx: &FlushContext) -> Result<()> {
unimplemented!()
}
}

View File

@@ -27,8 +27,8 @@ use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator};
use store_api::storage::{
AlterRequest, OpenOptions, ReadContext, Region, RegionId, SequenceNumber, WriteContext,
WriteResponse,
AlterRequest, FlushContext, OpenOptions, ReadContext, Region, RegionId, SequenceNumber,
WriteContext, WriteResponse,
};
use crate::compaction::CompactionSchedulerRef;
@@ -136,8 +136,8 @@ impl<S: LogStore> Region for RegionImpl<S> {
.sum()
}
async fn flush(&self) -> Result<()> {
self.inner.flush().await
async fn flush(&self, ctx: &FlushContext) -> Result<()> {
self.inner.flush(ctx).await
}
}
@@ -436,10 +436,6 @@ impl<S: LogStore> RegionImpl<S> {
self.inner.version_control().current_manifest_version()
}
async fn wait_flush_done(&self) -> Result<()> {
self.inner.writer.wait_flush_done().await
}
/// Write to inner, also the `RegionWriter` directly.
async fn write_inner(&self, ctx: &WriteContext, request: WriteBatch) -> Result<WriteResponse> {
self.inner.write(ctx, request).await
@@ -565,7 +561,7 @@ impl<S: LogStore> RegionInner<S> {
self.writer.close().await
}
async fn flush(&self) -> Result<()> {
async fn flush(&self, ctx: &FlushContext) -> Result<()> {
let writer_ctx = WriterContext {
shared: &self.shared,
flush_strategy: &self.flush_strategy,
@@ -576,6 +572,6 @@ impl<S: LogStore> RegionInner<S> {
writer: &self.writer,
manifest: &self.manifest,
};
self.writer.flush(writer_ctx).await
self.writer.flush(writer_ctx, ctx).await
}
}

View File

@@ -18,7 +18,7 @@ use std::sync::Arc;
use common_test_util::temp_dir::create_temp_dir;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use store_api::storage::{OpenOptions, Region, WriteResponse};
use store_api::storage::{FlushContext, OpenOptions, Region, WriteResponse};
use crate::engine;
use crate::flush::FlushStrategyRef;
@@ -91,12 +91,9 @@ impl FlushTester {
self.base().full_scan().await
}
async fn wait_flush_done(&self) {
self.base().region.wait_flush_done().await.unwrap();
}
async fn flush(&self) {
self.base().region.flush().await.unwrap();
async fn flush(&self, wait: Option<bool>) {
let ctx = wait.map(|wait| FlushContext { wait }).unwrap_or_default();
self.base().region.flush(&ctx).await.unwrap();
}
}
@@ -146,8 +143,7 @@ async fn test_manual_flush() {
let sst_dir = format!("{}/{}", store_dir, engine::region_sst_dir("", REGION_NAME));
assert!(!has_parquet_file(&sst_dir));
tester.flush().await;
tester.wait_flush_done().await;
tester.flush(None).await;
assert!(has_parquet_file(&sst_dir));
}
@@ -160,15 +156,12 @@ async fn test_flush_empty() {
let flush_switch = Arc::new(FlushSwitch::default());
let tester = FlushTester::new(store_dir, flush_switch.clone()).await;
// Now set should flush to true to trigger flush.
flush_switch.set_should_flush(true);
// Flush empty table.
tester.flush(None).await;
let data = [(1000, Some(100))];
// Put element to trigger flush.
tester.put(&data).await;
tester.wait_flush_done().await;
// Disable flush.
flush_switch.set_should_flush(false);
// Put again.
let data = [(2000, Some(200))];
tester.put(&data).await;
@@ -197,12 +190,11 @@ async fn test_read_after_flush() {
tester.put(&[(1000, Some(100))]).await;
tester.put(&[(2000, Some(200))]).await;
// Now set should flush to true to trigger flush.
flush_switch.set_should_flush(true);
// Flush.
tester.flush(None).await;
// Put element to trigger flush.
// Put element again.
tester.put(&[(3000, Some(300))]).await;
tester.wait_flush_done().await;
let expect = vec![(1000, Some(100)), (2000, Some(200)), (3000, Some(300))];
@@ -230,24 +222,21 @@ async fn test_merge_read_after_flush() {
tester.put(&[(3000, Some(300))]).await;
tester.put(&[(2000, Some(200))]).await;
// Now set should flush to true to trigger flush.
flush_switch.set_should_flush(true);
// Flush content to SST1.
tester.flush(None).await;
// Put element to trigger flush (In SST2).
// Put element (In SST2).
tester.put(&[(2000, Some(201))]).await;
tester.wait_flush_done().await;
// Disable flush.
flush_switch.set_should_flush(false);
// In SST2.
tester.put(&[(2000, Some(202))]).await;
tester.put(&[(1000, Some(100))]).await;
// Enable flush.
flush_switch.set_should_flush(true);
// Trigger flush and overwrite row (In memtable).
// Trigger flush.
tester.flush(None).await;
// Overwrite row (In memtable).
tester.put(&[(2000, Some(203))]).await;
tester.wait_flush_done().await;
let expect = vec![(1000, Some(100)), (2000, Some(203)), (3000, Some(300))];

View File

@@ -22,7 +22,7 @@ use futures::TryStreamExt;
use snafu::{ensure, ResultExt};
use store_api::logstore::LogStore;
use store_api::manifest::{Manifest, ManifestVersion, MetaAction};
use store_api::storage::{AlterRequest, SequenceNumber, WriteContext, WriteResponse};
use store_api::storage::{AlterRequest, FlushContext, SequenceNumber, WriteContext, WriteResponse};
use tokio::sync::Mutex;
use crate::background::JobHandle;
@@ -261,12 +261,24 @@ impl RegionWriter {
}
/// Flush task manually
pub async fn flush<S: LogStore>(&self, writer_ctx: WriterContext<'_, S>) -> Result<()> {
pub async fn flush<S: LogStore>(
&self,
writer_ctx: WriterContext<'_, S>,
ctx: &FlushContext,
) -> Result<()> {
let mut inner = self.inner.lock().await;
ensure!(!inner.is_closed(), error::ClosedRegionSnafu);
inner.manual_flush(writer_ctx).await
inner.manual_flush(writer_ctx).await?;
if ctx.wait {
if let Some(handle) = inner.flush_handle.take() {
handle.join().await?;
}
}
Ok(())
}
/// Cancel flush task if any
@@ -284,19 +296,6 @@ impl RegionWriter {
}
}
// Private methods for tests.
#[cfg(test)]
impl RegionWriter {
pub async fn wait_flush_done(&self) -> Result<()> {
let mut inner = self.inner.lock().await;
if let Some(handle) = inner.flush_handle.take() {
handle.join().await?;
}
Ok(())
}
}
pub struct WriterContext<'a, S: LogStore> {
pub shared: &'a SharedDataRef,
pub flush_strategy: &'a FlushStrategyRef,

View File

@@ -34,7 +34,7 @@ pub use self::chunk::{Chunk, ChunkReader};
pub use self::descriptors::*;
pub use self::engine::{CreateOptions, EngineContext, OpenOptions, StorageEngine};
pub use self::metadata::RegionMeta;
pub use self::region::{Region, WriteContext};
pub use self::region::{FlushContext, Region, WriteContext};
pub use self::requests::{
AddColumn, AlterOperation, AlterRequest, GetRequest, ScanRequest, WriteRequest,
};

View File

@@ -77,7 +77,8 @@ pub trait Region: Send + Sync + Clone + std::fmt::Debug + 'static {
fn disk_usage_bytes(&self) -> u64;
async fn flush(&self) -> Result<(), Self::Error>;
/// Flush memtable of the region to disk.
async fn flush(&self, ctx: &FlushContext) -> Result<(), Self::Error>;
}
/// Context for write operations.
@@ -89,3 +90,17 @@ impl From<&OpenOptions> for WriteContext {
WriteContext::default()
}
}
/// Context for flush operations.
///
/// Use [`FlushContext::default`] to get a context that waits for the flush.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FlushContext {
    /// If true, the flush call waits until the flush is done before returning.
    ///
    /// Default: `true`.
    pub wait: bool,
}

impl Default for FlushContext {
    /// Returns a context with `wait` set to `true`.
    fn default() -> Self {
        Self { wait: true }
    }
}

View File

@@ -216,6 +216,8 @@ pub struct FlushTableRequest {
pub schema_name: String,
pub table_name: Option<String>,
pub region_number: Option<RegionNumber>,
/// Wait until the flush is done.
pub wait: Option<bool>,
}
#[cfg(test)]

View File

@@ -96,8 +96,12 @@ pub trait Table: Send + Sync {
}
/// Flush table.
async fn flush(&self, region_number: Option<RegionNumber>) -> Result<()> {
let _ = region_number;
///
/// Options:
/// - region_number: specify region to flush.
/// - wait: Whether to wait until flush is done.
async fn flush(&self, region_number: Option<RegionNumber>, wait: Option<bool>) -> Result<()> {
let _ = (region_number, wait);
UnsupportedSnafu { operation: "FLUSH" }.fail()?
}