From 296c6dfcbff1a4a43de05cdbc29d54a16c4045d4 Mon Sep 17 00:00:00 2001
From: Weny Xu
Date: Mon, 13 Mar 2023 20:10:37 +0800
Subject: [PATCH] feat: implement table flush (#1121)

* feat: add flush method for trait
* feat: implement flush via grpc
* chore: move table_dir/region_name/region_id to table crate
* chore: Update src/mito/src/table.rs

---------

Co-authored-by: Yingwen
---
 src/client/src/database.rs                    |  11 +-
 src/datanode/src/error.rs                     |   8 ++
 src/datanode/src/instance/grpc.rs             |   2 +-
 src/datanode/src/server/grpc.rs               |  16 ++-
 src/datanode/src/sql.rs                       |   3 +
 src/datanode/src/sql/flush_table.rs           |  40 ++++++
 src/frontend/src/instance/distributed.rs      |  61 ++++++++-
 src/frontend/src/instance/distributed/grpc.rs |   6 +-
 src/frontend/src/instance/grpc.rs             | 127 +++++++++++++++++-
 src/frontend/src/tests.rs                     |  39 ++++++
 src/mito/src/engine.rs                        |  27 +---
 src/mito/src/engine/procedure/create.rs       |   7 +-
 src/mito/src/engine/tests.rs                  |  93 ++++++++++++-
 src/mito/src/table.rs                         |  21 ++-
 src/mito/src/table/test_util.rs               |  20 ++-
 src/mito/src/table/test_util/mock_engine.rs   |   4 +
 src/storage/src/region.rs                     |  18 +++
 src/storage/src/region/tests/flush.rs         |  30 ++++-
 src/storage/src/region/writer.rs              |  14 ++
 src/store-api/src/storage/region.rs           |   2 +
 src/table/src/engine.rs                       |  18 +++
 src/table/src/requests.rs                     |   8 ++
 src/table/src/table.rs                        |   7 +-
 23 files changed, 539 insertions(+), 43 deletions(-)
 create mode 100644 src/datanode/src/sql/flush_table.rs

diff --git a/src/client/src/database.rs b/src/client/src/database.rs
index ab86c12de8..de6295f643 100644
--- a/src/client/src/database.rs
+++ b/src/client/src/database.rs
@@ -19,8 +19,8 @@ use api::v1::ddl_request::Expr as DdlExpr;
 use api::v1::greptime_request::Request;
 use api::v1::query_request::Query;
 use api::v1::{
-    AlterExpr, AuthHeader, CreateTableExpr, DdlRequest, DropTableExpr, GreptimeRequest,
-    InsertRequest, PromRangeQuery, QueryRequest, RequestHeader,
+    AlterExpr, AuthHeader, CreateTableExpr, DdlRequest, DropTableExpr, FlushTableExpr,
+    GreptimeRequest, InsertRequest, PromRangeQuery, QueryRequest, RequestHeader,
 };
 use arrow_flight::{FlightData, Ticket};
 use common_error::prelude::*;
@@ -135,6 +135,13 @@ impl Database {
             .await
     }
 
+    pub async fn flush_table(&self, expr: FlushTableExpr) -> Result<Output> {
+        self.do_get(Request::Ddl(DdlRequest {
+            expr: Some(DdlExpr::FlushTable(expr)),
+        }))
+        .await
+    }
+
     async fn do_get(&self, request: Request) -> Result<Output> {
         let request = GreptimeRequest {
             header: Some(RequestHeader {
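Viewed from the caller's side, the new method slots into the existing DDL flow. A minimal sketch of invoking it from a client program; the endpoint and the `Client::with_urls` constructor are assumptions here, not something this patch adds:

```rust
use api::v1::FlushTableExpr;
use client::{Client, Database};

async fn flush_my_table() {
    // Assumed endpoint; Database::new(catalog, schema, client) mirrors the
    // constructor the frontend uses later in this patch.
    let client = Client::with_urls(vec!["127.0.0.1:4001"]);
    let db = Database::new("greptime", "public", client);
    db.flush_table(FlushTableExpr {
        catalog_name: "greptime".to_string(),
        schema_name: "public".to_string(),
        table_name: "my_table".to_string(),
        // None flushes every region of the table.
        region_id: None,
    })
    .await
    .unwrap();
}
```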
diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs
index 138deb1679..3adf6cf47f 100644
--- a/src/datanode/src/error.rs
+++ b/src/datanode/src/error.rs
@@ -169,6 +169,13 @@ pub enum Error {
         source: TableError,
     },
 
+    #[snafu(display("Failed to flush table: {}, source: {}", table_name, source))]
+    FlushTable {
+        table_name: String,
+        #[snafu(backtrace)]
+        source: TableError,
+    },
+
     #[snafu(display("Failed to start server, source: {}", source))]
     StartServer {
         #[snafu(backtrace)]
@@ -539,6 +546,7 @@ impl ErrorExt for Error {
                 source.status_code()
             }
             DropTable { source, .. } => source.status_code(),
+            FlushTable { source, .. } => source.status_code(),
             Insert { source, .. } => source.status_code(),
             Delete { source, .. } => source.status_code(),
diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs
index 1664a53e9a..05592d6104 100644
--- a/src/datanode/src/instance/grpc.rs
+++ b/src/datanode/src/instance/grpc.rs
@@ -127,7 +127,7 @@ impl Instance {
             DdlExpr::Alter(expr) => self.handle_alter(expr).await,
             DdlExpr::CreateDatabase(expr) => self.handle_create_database(expr, query_ctx).await,
             DdlExpr::DropTable(expr) => self.handle_drop_table(expr).await,
-            DdlExpr::FlushTable(_) => todo!(),
+            DdlExpr::FlushTable(expr) => self.handle_flush_table(expr).await,
         }
     }
 }
diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs
index c541912de8..33734ceae1 100644
--- a/src/datanode/src/server/grpc.rs
+++ b/src/datanode/src/server/grpc.rs
@@ -12,13 +12,13 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use api::v1::{AlterExpr, CreateTableExpr, DropTableExpr};
+use api::v1::{AlterExpr, CreateTableExpr, DropTableExpr, FlushTableExpr};
 use common_grpc_expr::{alter_expr_to_request, create_expr_to_request};
 use common_query::Output;
 use common_telemetry::info;
 use session::context::QueryContext;
 use snafu::prelude::*;
-use table::requests::DropTableRequest;
+use table::requests::{DropTableRequest, FlushTableRequest};
 
 use crate::error::{
     AlterExprToRequestSnafu, BumpTableIdSnafu, CreateExprToRequestSnafu,
@@ -82,6 +82,18 @@ impl Instance {
             .execute(SqlRequest::DropTable(req), QueryContext::arc())
             .await
     }
+
+    pub(crate) async fn handle_flush_table(&self, expr: FlushTableExpr) -> Result<Output> {
+        let req = FlushTableRequest {
+            catalog_name: expr.catalog_name,
+            schema_name: expr.schema_name,
+            table_name: expr.table_name,
+            region_number: expr.region_id,
+        };
+        self.sql_handler()
+            .execute(SqlRequest::FlushTable(req), QueryContext::arc())
+            .await
+    }
 }
 
 #[cfg(test)]
diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs
index cfa804df47..2a05e8c8f6 100644
--- a/src/datanode/src/sql.rs
+++ b/src/datanode/src/sql.rs
@@ -39,6 +39,7 @@ mod copy_table_from;
 mod create;
 mod delete;
 mod drop_table;
+mod flush_table;
 pub(crate) mod insert;
 
 #[derive(Debug)]
@@ -48,6 +49,7 @@ pub enum SqlRequest {
     CreateDatabase(CreateDatabaseRequest),
     Alter(AlterTableRequest),
     DropTable(DropTableRequest),
+    FlushTable(FlushTableRequest),
     ShowDatabases(ShowDatabases),
     ShowTables(ShowTables),
     DescribeTable(DescribeTable),
@@ -116,6 +118,7 @@ impl SqlHandler {
                 })?;
                 describe_table(table).context(ExecuteSqlSnafu)
             }
+            SqlRequest::FlushTable(req) => self.flush_table(req).await,
        };
        if let Err(e) = &result {
            error!(e; "{query_ctx}");
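The mapping from protobuf expression to table-layer request is mechanical; the one naming wrinkle is that the proto field `region_id` becomes `region_number` on `FlushTableRequest`. An illustrative standalone version of the conversion that `handle_flush_table` above performs inline:

```rust
use api::v1::FlushTableExpr;
use table::requests::FlushTableRequest;

// Illustrative helper only; the patch does this conversion inside the handler.
fn to_flush_table_request(expr: FlushTableExpr) -> FlushTableRequest {
    FlushTableRequest {
        catalog_name: expr.catalog_name,
        schema_name: expr.schema_name,
        table_name: expr.table_name,
        // Protobuf `region_id` carries the region number within the table.
        region_number: expr.region_id,
    }
}
```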
diff --git a/src/datanode/src/sql/flush_table.rs b/src/datanode/src/sql/flush_table.rs
new file mode 100644
index 0000000000..35ea7361cc
--- /dev/null
+++ b/src/datanode/src/sql/flush_table.rs
@@ -0,0 +1,40 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_query::Output;
+use snafu::ResultExt;
+use table::engine::TableReference;
+use table::requests::FlushTableRequest;
+
+use crate::error::{self, Result};
+use crate::sql::SqlHandler;
+
+impl SqlHandler {
+    pub(crate) async fn flush_table(&self, req: FlushTableRequest) -> Result<Output> {
+        let table_ref = TableReference {
+            catalog: &req.catalog_name,
+            schema: &req.schema_name,
+            table: &req.table_name,
+        };
+
+        let full_table_name = table_ref.to_string();
+
+        let table = self.get_table(&table_ref)?;
+
+        table.flush(req).await.context(error::FlushTableSnafu {
+            table_name: full_table_name,
+        })?;
+        Ok(Output::AffectedRows(0))
+    }
+}
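`TableReference`'s `Display` implementation produces the dot-separated fully qualified name used in the `FlushTable` error message above; a quick illustration:

```rust
use table::engine::TableReference;

#[test]
fn table_reference_display() {
    let table_ref = TableReference {
        catalog: "greptime",
        schema: "public",
        table: "my_table",
    };
    // Display joins catalog, schema and table with dots.
    assert_eq!("greptime.public.my_table", table_ref.to_string());
}
```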
diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs
index 66f98d28cd..472b05245b 100644
--- a/src/frontend/src/instance/distributed.rs
+++ b/src/frontend/src/instance/distributed.rs
@@ -19,8 +19,8 @@ use std::sync::Arc;
 
 use api::helper::ColumnDataTypeWrapper;
 use api::v1::{
-    column_def, AlterExpr, CreateDatabaseExpr, CreateTableExpr, DropTableExpr, InsertRequest,
-    TableId,
+    column_def, AlterExpr, CreateDatabaseExpr, CreateTableExpr, DropTableExpr, FlushTableExpr,
+    InsertRequest, TableId,
 };
 use async_trait::async_trait;
 use catalog::helper::{SchemaKey, SchemaValue};
@@ -39,7 +39,7 @@ use meta_client::client::MetaClient;
 use meta_client::rpc::router::DeleteRequest as MetaDeleteRequest;
 use meta_client::rpc::{
     CompareAndPutRequest, CreateRequest as MetaCreateRequest, Partition as MetaPartition,
-    RouteResponse, TableName,
+    RouteRequest, RouteResponse, TableName,
 };
 use partition::partition::{PartitionBound, PartitionDef};
 use query::error::QueryExecutionSnafu;
@@ -259,6 +259,61 @@ impl DistInstance {
         Ok(Output::AffectedRows(1))
     }
 
+    async fn flush_table(&self, table_name: TableName, region_id: Option<u32>) -> Result<Output> {
+        let _ = self
+            .catalog_manager
+            .table(
+                &table_name.catalog_name,
+                &table_name.schema_name,
+                &table_name.table_name,
+            )
+            .await
+            .context(CatalogSnafu)?
+            .with_context(|| TableNotFoundSnafu {
+                table_name: table_name.to_string(),
+            })?;
+
+        let route_response = self
+            .meta_client
+            .route(RouteRequest {
+                table_names: vec![table_name.clone()],
+            })
+            .await
+            .context(RequestMetaSnafu)?;
+
+        let expr = FlushTableExpr {
+            catalog_name: table_name.catalog_name.clone(),
+            schema_name: table_name.schema_name.clone(),
+            table_name: table_name.table_name.clone(),
+            region_id,
+        };
+
+        for table_route in &route_response.table_routes {
+            let should_send_rpc = table_route.region_routes.iter().any(|route| {
+                if let Some(region_id) = region_id {
+                    region_id == route.region.id as u32
+                } else {
+                    true
+                }
+            });
+
+            if !should_send_rpc {
+                continue;
+            }
+            for datanode in table_route.find_leaders() {
+                debug!("Flushing table {table_name} on Datanode {datanode:?}");
+
+                let client = self.datanode_clients.get_client(&datanode).await;
+                let client = Database::new(&expr.catalog_name, &expr.schema_name, client);
+                client
+                    .flush_table(expr.clone())
+                    .await
+                    .context(RequestDatanodeSnafu)?;
+            }
+        }
+        Ok(Output::AffectedRows(0))
+    }
+
     async fn handle_statement(
         &self,
         stmt: Statement,
diff --git a/src/frontend/src/instance/distributed/grpc.rs b/src/frontend/src/instance/distributed/grpc.rs
index 513e4cac84..0e41cc3f12 100644
--- a/src/frontend/src/instance/distributed/grpc.rs
+++ b/src/frontend/src/instance/distributed/grpc.rs
@@ -57,7 +57,11 @@ impl GrpcQueryHandler for DistInstance {
                     TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
                 self.drop_table(table_name).await
             }
-            DdlExpr::FlushTable(_) => todo!(),
+            DdlExpr::FlushTable(expr) => {
+                let table_name =
+                    TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
+                self.flush_table(table_name, expr.region_id).await
+            }
         }
     }
 }
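The `should_send_rpc` check above keeps flush RPCs targeted: a requested region is looked up in each table route, while `None` broadcasts to every leader. An equivalent standalone formulation over plain region numbers (illustrative only; `route_region_ids` stands in for `table_route.region_routes`):

```rust
/// Illustrative stand-in for the `should_send_rpc` closure above.
fn should_send_rpc(route_region_ids: &[u32], region_id: Option<u32>) -> bool {
    route_region_ids
        .iter()
        .any(|id| region_id.map_or(true, |wanted| wanted == *id))
}

#[test]
fn flush_rpc_region_filter() {
    // No region specified: every (non-empty) route gets the RPC.
    assert!(should_send_rpc(&[0, 1], None));
    // Region specified: only routes hosting it match.
    assert!(should_send_rpc(&[0, 1], Some(1)));
    assert!(!should_send_rpc(&[0, 1], Some(7)));
}
```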
diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs
index 27d5236e6a..ef12c71da3 100644
--- a/src/frontend/src/instance/grpc.rs
+++ b/src/frontend/src/instance/grpc.rs
@@ -91,14 +91,15 @@ mod test {
     use api::v1::ddl_request::Expr as DdlExpr;
     use api::v1::{
         alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef,
-        CreateDatabaseExpr, CreateTableExpr, DdlRequest, DropTableExpr, InsertRequest,
-        QueryRequest,
+        CreateDatabaseExpr, CreateTableExpr, DdlRequest, DropTableExpr, FlushTableExpr,
+        InsertRequest, QueryRequest,
     };
     use catalog::helper::{TableGlobalKey, TableGlobalValue};
     use common_query::Output;
     use common_recordbatch::RecordBatches;
     use query::parser::QueryLanguageParser;
     use session::context::QueryContext;
+    use tests::{has_parquet_file, test_region_dir};
 
     use super::*;
     use crate::table::DistTable;
@@ -352,6 +353,108 @@ CREATE TABLE {table_name} (
         test_insert_and_query_on_auto_created_table(instance).await
     }
 
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_distributed_flush_table() {
+        common_telemetry::init_default_ut_logging();
+
+        let instance = tests::create_distributed_instance("test_distributed_flush_table").await;
+        let data_tmp_dirs = instance.data_tmp_dirs();
+        let frontend = instance.frontend.as_ref();
+
+        let table_name = "my_dist_table";
+        let sql = format!(
+            r"
+CREATE TABLE {table_name} (
+    a INT,
+    ts TIMESTAMP,
+    TIME INDEX (ts)
+) PARTITION BY RANGE COLUMNS(a) (
+    PARTITION r0 VALUES LESS THAN (10),
+    PARTITION r1 VALUES LESS THAN (20),
+    PARTITION r2 VALUES LESS THAN (50),
+    PARTITION r3 VALUES LESS THAN (MAXVALUE),
+)"
+        );
+        create_table(frontend, sql).await;
+
+        test_insert_and_query_on_existing_table(frontend, table_name).await;
+
+        flush_table(frontend, "greptime", "public", table_name, None).await;
+        // Flush again to wait until the previous flush task has finished.
+        flush_table(frontend, "greptime", "public", table_name, None).await;
+
+        let table_id = 1024;
+
+        let table = instance
+            .frontend
+            .catalog_manager()
+            .table("greptime", "public", table_name)
+            .await
+            .unwrap()
+            .unwrap();
+        let table = table.as_any().downcast_ref::<DistTable>().unwrap();
+
+        let TableGlobalValue { regions_id_map, .. } = table
+            .table_global_value(&TableGlobalKey {
+                catalog_name: "greptime".to_string(),
+                schema_name: "public".to_string(),
+                table_name: table_name.to_string(),
+            })
+            .await
+            .unwrap()
+            .unwrap();
+        let region_to_dn_map = regions_id_map
+            .iter()
+            .map(|(k, v)| (v[0], *k))
+            .collect::<HashMap<_, _>>();
+
+        for (region, dn) in region_to_dn_map.iter() {
+            // Datanode ids are 1..=4, while data_tmp_dirs is indexed from 0.
+            let data_tmp_dir = data_tmp_dirs.get((*dn - 1) as usize).unwrap();
+            let region_dir = test_region_dir(
+                data_tmp_dir.path().to_str().unwrap(),
+                "greptime",
+                "public",
+                table_id,
+                *region,
+            );
+            assert!(has_parquet_file(&region_dir));
+        }
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_standalone_flush_table() {
+        common_telemetry::init_default_ut_logging();
+
+        let standalone = tests::create_standalone_instance("test_standalone_flush_table").await;
+        let instance = &standalone.instance;
+        let data_tmp_dir = standalone.data_tmp_dir();
+
+        let table_name = "my_table";
+        let sql = format!("CREATE TABLE {table_name} (a INT, ts TIMESTAMP, TIME INDEX (ts))");
+
+        create_table(instance, sql).await;
+
+        test_insert_and_query_on_existing_table(instance, table_name).await;
+
+        let table_id = 1024;
+        let region_id = 0;
+        let region_dir = test_region_dir(
+            data_tmp_dir.path().to_str().unwrap(),
+            "greptime",
+            "public",
+            table_id,
+            region_id,
+        );
+        assert!(!has_parquet_file(&region_dir));
+
+        flush_table(instance, "greptime", "public", "my_table", None).await;
+        // Flush again to wait until the previous flush task has finished.
+        flush_table(instance, "greptime", "public", "my_table", None).await;
+
+        assert!(has_parquet_file(&region_dir));
+    }
+
     async fn create_table(frontend: &Instance, sql: String) {
         let request = Request::Query(QueryRequest {
             query: Some(Query::Sql(sql)),
@@ -360,6 +463,26 @@ CREATE TABLE {table_name} (
         assert!(matches!(output, Output::AffectedRows(0)));
     }
 
+    async fn flush_table(
+        frontend: &Instance,
+        catalog_name: &str,
+        schema_name: &str,
+        table_name: &str,
+        region_id: Option<u32>,
+    ) {
+        let request = Request::Ddl(DdlRequest {
+            expr: Some(DdlExpr::FlushTable(FlushTableExpr {
+                catalog_name: catalog_name.to_string(),
+                schema_name: schema_name.to_string(),
+                table_name: table_name.to_string(),
+                region_id,
+            })),
+        });
+
+        let output = query(frontend, request).await;
+        assert!(matches!(output, Output::AffectedRows(0)));
+    }
+
     async fn test_insert_and_query_on_existing_table(instance: &Instance, table_name: &str) {
         let insert = InsertRequest {
             table_name: table_name.to_string(),
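The distributed test relies on inverting `regions_id_map`, which maps a datanode id to the region numbers it hosts, into a region-to-datanode lookup. A self-contained sketch of that inversion with made-up values:

```rust
use std::collections::HashMap;

#[test]
fn invert_regions_id_map() {
    // Datanode id -> hosted region numbers (illustrative values).
    let regions_id_map: HashMap<u64, Vec<u32>> =
        HashMap::from([(1, vec![0]), (2, vec![1]), (3, vec![2])]);

    // Region number -> datanode id, mirroring the `(v[0], *k)` inversion above.
    let region_to_dn_map: HashMap<u32, u64> = regions_id_map
        .iter()
        .map(|(dn, regions)| (regions[0], *dn))
        .collect();

    assert_eq!(Some(&1), region_to_dn_map.get(&0));
}
```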
diff --git a/src/frontend/src/tests.rs b/src/frontend/src/tests.rs
index 631d04ce06..ad79cd261a 100644
--- a/src/frontend/src/tests.rs
+++ b/src/frontend/src/tests.rs
@@ -34,6 +34,7 @@ use partition::route::TableRoutes;
 use servers::grpc::GrpcServer;
 use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor;
 use servers::Mode;
+use table::engine::{region_name, table_dir};
 use tonic::transport::Server;
 use tower::service_fn;
 
@@ -56,11 +57,23 @@ pub(crate) struct MockDistributedInstance {
     _guards: Vec<TestGuard>,
 }
 
+impl MockDistributedInstance {
+    pub fn data_tmp_dirs(&self) -> Vec<&TempDir> {
+        self._guards.iter().map(|g| &g._data_tmp_dir).collect()
+    }
+}
+
 pub(crate) struct MockStandaloneInstance {
     pub(crate) instance: Arc<Instance>,
     _guard: TestGuard,
 }
 
+impl MockStandaloneInstance {
+    pub fn data_tmp_dir(&self) -> &TempDir {
+        &self._guard._data_tmp_dir
+    }
+}
+
 pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandaloneInstance {
     let (opts, guard) = create_tmp_dir_and_datanode_opts(test_name);
     let datanode_instance = DatanodeInstance::new(&opts).await.unwrap();
@@ -269,3 +282,29 @@ pub(crate) async fn create_distributed_instance(test_name: &str) -> MockDistributedInstance {
         _guards: test_guards,
     }
 }
+
+pub fn test_region_dir(
+    dir: &str,
+    catalog_name: &str,
+    schema_name: &str,
+    table_id: u32,
+    region_id: u32,
+) -> String {
+    let table_dir = table_dir(catalog_name, schema_name, table_id);
+    let region_name = region_name(table_id, region_id);
+
+    format!("{}/{}/{}", dir, table_dir, region_name)
+}
+
+pub fn has_parquet_file(sst_dir: &str) -> bool {
+    for entry in std::fs::read_dir(sst_dir).unwrap() {
+        let entry = entry.unwrap();
+        let path = entry.path();
+        if !path.is_dir() {
+            assert_eq!("parquet", path.extension().unwrap());
+            return true;
+        }
+    }
+
+    false
+}
diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs
index ed77a72763..b83d43f091 100644
--- a/src/mito/src/engine.rs
+++ b/src/mito/src/engine.rs
@@ -31,13 +31,14 @@ use snafu::{ensure, OptionExt, ResultExt};
 use store_api::storage::{
     ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnId,
     CreateOptions, EngineContext as StorageEngineContext, OpenOptions, Region,
-    RegionDescriptorBuilder, RegionId, RowKeyDescriptor, RowKeyDescriptorBuilder, StorageEngine,
+    RegionDescriptorBuilder, RowKeyDescriptor, RowKeyDescriptorBuilder, StorageEngine,
+};
+use table::engine::{
+    region_id, region_name, table_dir, EngineContext, TableEngine, TableEngineProcedure,
+    TableReference,
 };
-use table::engine::{EngineContext, TableEngine, TableEngineProcedure, TableReference};
 use table::error::TableOperationSnafu;
-use table::metadata::{
-    TableId, TableInfo, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion,
-};
+use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion};
 use table::requests::{
     AlterKind, AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest,
 };
@@ -59,22 +60,6 @@ pub const MITO_ENGINE: &str = "mito";
 pub const INIT_COLUMN_ID: ColumnId = 0;
 const INIT_TABLE_VERSION: TableVersion = 0;
 
-/// Generate region name in the form of "{TABLE_ID}_{REGION_NUMBER}"
-#[inline]
-fn region_name(table_id: TableId, n: u32) -> String {
-    format!("{table_id}_{n:010}")
-}
-
-#[inline]
-fn region_id(table_id: TableId, n: u32) -> RegionId {
-    (u64::from(table_id) << 32) | u64::from(n)
-}
-
-#[inline]
-fn table_dir(catalog_name: &str, schema_name: &str, table_id: TableId) -> String {
-    format!("{catalog_name}/{schema_name}/{table_id}/")
-}
-
 /// [TableEngine] implementation.
 ///
 /// About mito <https://en.wikipedia.org/wiki/Alfa_Romeo_MiTo>.
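With the helpers moved out of `src/mito/src/engine.rs` and made public in the `table` crate (see `src/table/src/engine.rs` below), the naming scheme can be checked directly. A worked example of the three encodings:

```rust
use table::engine::{region_id, region_name, table_dir};

#[test]
fn region_naming_scheme() {
    let table_id = 1024u32;
    // Region numbers are zero-padded to ten digits.
    assert_eq!("1024_0000000000", region_name(table_id, 0));
    // The table id occupies the high 32 bits of the region id.
    assert_eq!((1024u64 << 32) | 1, region_id(table_id, 1));
    // Table directories nest catalog/schema/table-id, with a trailing slash.
    assert_eq!("greptime/public/1024/", table_dir("greptime", "public", table_id));
}
```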
diff --git a/src/mito/src/engine/procedure/create.rs b/src/mito/src/engine/procedure/create.rs
index 0f3d504bee..ee0827ae3e 100644
--- a/src/mito/src/engine/procedure/create.rs
+++ b/src/mito/src/engine/procedure/create.rs
@@ -25,6 +25,7 @@ use store_api::storage::{
     ColumnId, CreateOptions, EngineContext, OpenOptions, RegionDescriptorBuilder, RegionNumber,
     StorageEngine,
 };
+use table::engine::{region_id, table_dir};
 use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType};
 use table::requests::CreateTableRequest;
 
@@ -146,7 +147,7 @@ impl CreateMitoTable {
     /// Creates regions for the table.
     async fn on_create_regions(&mut self) -> Result<Status> {
         let engine_ctx = EngineContext::default();
-        let table_dir = engine::table_dir(
+        let table_dir = table_dir(
             &self.data.request.catalog_name,
             &self.data.request.schema_name,
             self.data.request.id,
@@ -203,7 +204,7 @@
         }
 
         // We need to create that region.
-        let region_id = engine::region_id(self.data.request.id, *number);
+        let region_id = region_id(self.data.request.id, *number);
         let region_desc = RegionDescriptorBuilder::default()
             .id(region_id)
             .name(region_name.clone())
@@ -234,7 +235,7 @@
 
     /// Writes metadata to the table manifest.
     async fn on_write_table_manifest(&mut self) -> Result<Status> {
-        let table_dir = engine::table_dir(
+        let table_dir = table_dir(
             &self.data.request.catalog_name,
             &self.data.request.schema_name,
             self.data.request.id,
diff --git a/src/mito/src/engine/tests.rs b/src/mito/src/engine/tests.rs
index d533a01621..9771cb668e 100644
--- a/src/mito/src/engine/tests.rs
+++ b/src/mito/src/engine/tests.rs
@@ -31,14 +31,28 @@ use storage::region::RegionImpl;
 use storage::EngineImpl;
 use store_api::manifest::Manifest;
 use store_api::storage::ReadContext;
-use table::requests::{AddColumnRequest, AlterKind, DeleteRequest, TableOptions};
+use table::requests::{
+    AddColumnRequest, AlterKind, DeleteRequest, FlushTableRequest, TableOptions,
+};
 
 use super::*;
-use crate::table::test_util;
 use crate::table::test_util::{
-    new_insert_request, schema_for_test, TestEngineComponents, TABLE_NAME,
+    self, new_insert_request, schema_for_test, setup_table, TestEngineComponents, TABLE_NAME,
 };
 
+pub fn has_parquet_file(sst_dir: &str) -> bool {
+    for entry in std::fs::read_dir(sst_dir).unwrap() {
+        let entry = entry.unwrap();
+        let path = entry.path();
+        if !path.is_dir() {
+            assert_eq!("parquet", path.extension().unwrap());
+            return true;
+        }
+    }
+
+    false
+}
+
 async fn setup_table_with_column_default_constraint() -> (TempDir, String, TableRef) {
     let table_name = "test_default_constraint";
     let column_schemas = vec![
@@ -752,3 +766,76 @@ async fn test_table_delete_rows() {
         +-------+-----+--------+-------------------------+"
     );
 }
+
+#[tokio::test]
+async fn test_flush_table_all_regions() {
+    let TestEngineComponents {
+        table_ref: table,
+        dir,
+        ..
+    } = test_util::setup_test_engine_and_table().await;
+
+    setup_table(table.clone()).await;
+
+    let table_id = 1u32;
+    let region_name = region_name(table_id, 0);
+
+    let table_info = table.table_info();
+    let table_dir = table_dir(&table_info.catalog_name, &table_info.schema_name, table_id);
+
+    let region_dir = format!(
+        "{}/{}/{}",
+        dir.path().to_str().unwrap(),
+        table_dir,
+        region_name
+    );
+
+    assert!(!has_parquet_file(&region_dir));
+
+    // Trigger a flush of all regions.
+    table.flush(FlushTableRequest::default()).await.unwrap();
+
+    // Flush again to wait until the previous flush task has finished.
+    table.flush(FlushTableRequest::default()).await.unwrap();
+
+    assert!(has_parquet_file(&region_dir));
+}
+
+#[tokio::test]
+async fn test_flush_table_with_region_id() {
+    let TestEngineComponents {
+        table_ref: table,
+        dir,
+        ..
+    } = test_util::setup_test_engine_and_table().await;
+
+    setup_table(table.clone()).await;
+
+    let table_id = 1u32;
+    let region_name = region_name(table_id, 0);
+
+    let table_info = table.table_info();
+    let table_dir = table_dir(&table_info.catalog_name, &table_info.schema_name, table_id);
+
+    let region_dir = format!(
+        "{}/{}/{}",
+        dir.path().to_str().unwrap(),
+        table_dir,
+        region_name
+    );
+
+    assert!(!has_parquet_file(&region_dir));
+
+    let req = FlushTableRequest {
+        region_number: Some(0),
+        ..Default::default()
+    };
+
+    // Trigger a flush of the specified region.
+    table.flush(req.clone()).await.unwrap();
+
+    // Flush again to wait until the previous flush task has finished.
+    table.flush(req).await.unwrap();
+
+    assert!(has_parquet_file(&region_dir));
+}
diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs
index 92251b8b2c..a940e49f2e 100644
--- a/src/mito/src/table.rs
+++ b/src/mito/src/table.rs
@@ -44,7 +44,7 @@ use table::metadata::{
     FilterPushDownType, RawTableInfo, TableInfo, TableInfoRef, TableMeta, TableType,
 };
 use table::requests::{
-    AddColumnRequest, AlterKind, AlterTableRequest, DeleteRequest, InsertRequest,
+    AddColumnRequest, AlterKind, AlterTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
 };
 use table::table::scan::SimpleTableScan;
 use table::table::{AlterContext, Table};
@@ -323,6 +323,25 @@ impl Table for MitoTable {
         Ok(rows_deleted)
     }
 
+    async fn flush(&self, request: FlushTableRequest) -> TableResult<()> {
+        if let Some(region_id) = request.region_number {
+            if let Some(region) = self.regions.get(&region_id) {
+                region
+                    .flush()
+                    .await
+                    .map_err(BoxedError::new)
+                    .context(table_error::TableOperationSnafu)?;
+            }
+        } else {
+            futures::future::try_join_all(self.regions.values().map(|region| region.flush()))
+                .await
+                .map_err(BoxedError::new)
+                .context(table_error::TableOperationSnafu)?;
+        }
+
+        Ok(())
+    }
+
     async fn close(&self) -> TableResult<()> {
         futures::future::try_join_all(self.regions.values().map(|region| region.close()))
             .await
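A minimal sketch of driving the new `Table::flush` through a `TableRef` obtained elsewhere (e.g. from a catalog lookup); the table name values are placeholders:

```rust
use table::requests::FlushTableRequest;
use table::TableRef;

async fn flush_one_region(table: TableRef) -> table::error::Result<()> {
    table
        .flush(FlushTableRequest {
            catalog_name: "greptime".to_string(),
            schema_name: "public".to_string(),
            table_name: "my_table".to_string(),
            // Some(n) flushes only region n; None flushes every region,
            // which the impl above runs concurrently via try_join_all.
            region_number: Some(0),
        })
        .await
}
```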
diff --git a/src/mito/src/table/test_util.rs b/src/mito/src/table/test_util.rs
index be637cc498..dd4ac97459 100644
--- a/src/mito/src/table/test_util.rs
+++ b/src/mito/src/table/test_util.rs
@@ -20,7 +20,7 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_test_util::temp_dir::{create_temp_dir, TempDir};
 use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::{ColumnSchema, RawSchema, Schema, SchemaBuilder, SchemaRef};
-use datatypes::vectors::VectorRef;
+use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector, VectorRef};
 use log_store::NoopLogStore;
 use object_store::services::Fs as Builder;
 use object_store::{ObjectStore, ObjectStoreBuilder};
@@ -30,7 +30,7 @@ use storage::EngineImpl;
 use table::engine::{EngineContext, TableEngine};
 use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder, TableType};
 use table::requests::{CreateTableRequest, InsertRequest, TableOptions};
-use table::TableRef;
+use table::{Table, TableRef};
 
 use crate::config::EngineConfig;
 use crate::engine::{MitoEngine, MITO_ENGINE};
@@ -178,3 +178,19 @@ pub async fn setup_mock_engine_and_table(
 
     (mock_engine, table_engine, table, object_store, dir)
 }
+
+pub async fn setup_table(table: Arc<dyn Table>) {
+    let mut columns_values: HashMap<String, VectorRef> = HashMap::with_capacity(4);
+    let hosts: VectorRef = Arc::new(StringVector::from(vec!["host1", "host2", "host3", "host4"]));
+    let cpus: VectorRef = Arc::new(Float64Vector::from_vec(vec![1.0, 2.0, 3.0, 4.0]));
+    let memories: VectorRef = Arc::new(Float64Vector::from_vec(vec![1.0, 2.0, 3.0, 4.0]));
+    let tss: VectorRef = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 2, 1]));
+
+    columns_values.insert("host".to_string(), hosts.clone());
+    columns_values.insert("cpu".to_string(), cpus.clone());
+    columns_values.insert("memory".to_string(), memories.clone());
+    columns_values.insert("ts".to_string(), tss.clone());
+
+    let insert_req = new_insert_request("demo".to_string(), columns_values);
+    assert_eq!(4, table.insert(insert_req).await.unwrap());
+}
diff --git a/src/mito/src/table/test_util/mock_engine.rs b/src/mito/src/table/test_util/mock_engine.rs
index d097f2a9be..c3be8318b8 100644
--- a/src/mito/src/table/test_util/mock_engine.rs
+++ b/src/mito/src/table/test_util/mock_engine.rs
@@ -200,6 +200,10 @@ impl Region for MockRegion {
     fn disk_usage_bytes(&self) -> u64 {
         0
     }
+
+    async fn flush(&self) -> Result<()> {
+        unimplemented!()
+    }
 }
 
 impl MockRegionInner {
diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs
index edff614f6b..a7933ab9bf 100644
--- a/src/storage/src/region.rs
+++ b/src/storage/src/region.rs
@@ -135,6 +135,10 @@ impl Region for RegionImpl {
             .map(|level_ssts| level_ssts.files().map(|sst| sst.file_size()).sum::<u64>())
             .sum()
     }
+
+    async fn flush(&self) -> Result<()> {
+        self.inner.flush().await
+    }
 }
 
 /// Storage related config for region.
@@ -560,4 +564,18 @@ impl RegionInner {
     async fn close(&self) -> Result<()> {
         self.writer.close().await
     }
+
+    async fn flush(&self) -> Result<()> {
+        let writer_ctx = WriterContext {
+            shared: &self.shared,
+            flush_strategy: &self.flush_strategy,
+            flush_scheduler: &self.flush_scheduler,
+            compaction_scheduler: &self.compaction_scheduler,
+            sst_layer: &self.sst_layer,
+            wal: &self.wal,
+            writer: &self.writer,
+            manifest: &self.manifest,
+        };
+        self.writer.flush(writer_ctx).await
+    }
 }
diff --git a/src/storage/src/region/tests/flush.rs b/src/storage/src/region/tests/flush.rs
index 0d717272bb..e17b8ff21e 100644
--- a/src/storage/src/region/tests/flush.rs
+++ b/src/storage/src/region/tests/flush.rs
@@ -18,7 +18,7 @@ use std::sync::Arc;
 
 use common_test_util::temp_dir::create_temp_dir;
 use log_store::raft_engine::log_store::RaftEngineLogStore;
-use store_api::storage::{OpenOptions, WriteResponse};
+use store_api::storage::{OpenOptions, Region, WriteResponse};
 
 use crate::engine;
 use crate::flush::FlushStrategyRef;
@@ -94,6 +94,10 @@ impl FlushTester {
     async fn wait_flush_done(&self) {
         self.base().region.wait_flush_done().await.unwrap();
     }
+
+    async fn flush(&self) {
+        self.base().region.flush().await.unwrap();
+    }
 }
 
 #[tokio::test]
@@ -124,6 +128,30 @@ async fn test_flush_and_stall() {
     assert!(has_parquet_file(&sst_dir));
 }
 
+#[tokio::test]
+async fn test_manual_flush() {
+    common_telemetry::init_default_ut_logging();
+    let dir = create_temp_dir("manual_flush");
+
+    let store_dir = dir.path().to_str().unwrap();
+
+    let flush_switch = Arc::new(FlushSwitch::default());
+    let tester = FlushTester::new(store_dir, flush_switch.clone()).await;
+
+    let data = [(1000, Some(100))];
+    // Put one element so we have content to flush.
+    tester.put(&data).await;
+
+    // No parquet file should have been flushed yet.
+    let sst_dir = format!("{}/{}", store_dir, engine::region_sst_dir("", REGION_NAME));
+    assert!(!has_parquet_file(&sst_dir));
+
+    tester.flush().await;
+    tester.wait_flush_done().await;
+
+    assert!(has_parquet_file(&sst_dir));
+}
+
 #[tokio::test]
 async fn test_flush_empty() {
     let dir = create_temp_dir("flush-empty");
diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs
index 56af329c04..86a4236fdd 100644
--- a/src/storage/src/region/writer.rs
+++ b/src/storage/src/region/writer.rs
@@ -260,6 +260,15 @@ impl RegionWriter {
         Ok(())
     }
 
+    /// Flush the region manually.
+    pub async fn flush(&self, writer_ctx: WriterContext<'_, S>) -> Result<()> {
+        let mut inner = self.inner.lock().await;
+
+        ensure!(!inner.is_closed(), error::ClosedRegionSnafu);
+
+        inner.manual_flush(writer_ctx).await
+    }
+
     /// Cancel flush task if any
     async fn cancel_flush(&self) -> Result<()> {
         let mut inner = self.inner.lock().await;
@@ -680,6 +689,11 @@ impl WriterInner {
         Some(schedule_compaction_cb)
     }
 
+    async fn manual_flush(&mut self, writer_ctx: WriterContext<'_, S>) -> Result<()> {
+        self.trigger_flush(&writer_ctx).await?;
+        Ok(())
+    }
+
     #[inline]
     fn is_closed(&self) -> bool {
         self.closed
diff --git a/src/store-api/src/storage/region.rs b/src/store-api/src/storage/region.rs
index 3d267579bb..df8047f488 100644
--- a/src/store-api/src/storage/region.rs
+++ b/src/store-api/src/storage/region.rs
@@ -76,6 +76,8 @@ pub trait Region: Send + Sync + Clone + std::fmt::Debug + 'static {
     async fn close(&self) -> Result<(), Self::Error>;
 
     fn disk_usage_bytes(&self) -> u64;
+
+    async fn flush(&self) -> Result<(), Self::Error>;
 }
 
 /// Context for write operations.
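Two things are worth noting about the writer path above: `RegionWriter::flush` takes the writer mutex and refuses to run on a closed region, and `manual_flush` only schedules the job via `trigger_flush` rather than waiting for the SST to be written. That is why the tests either call the test-side `wait_flush_done` helper or issue a second flush before asserting on files. A hedged generic sketch of triggering a flush through the new trait method:

```rust
use store_api::storage::Region;

/// Illustrative helper: trigger a flush on any `Region` implementation and
/// surface its error type. The call returns once the flush job is scheduled,
/// not necessarily once the SST has landed on disk.
async fn trigger_region_flush<R: Region>(region: &R) -> Result<(), R::Error> {
    region.flush().await
}
```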
diff --git a/src/table/src/engine.rs b/src/table/src/engine.rs
index 2f46e16eec..bdebd2600c 100644
--- a/src/table/src/engine.rs
+++ b/src/table/src/engine.rs
@@ -16,8 +16,10 @@ use std::fmt::{self, Display};
 use std::sync::Arc;
 
 use common_procedure::BoxedProcedure;
+use store_api::storage::RegionId;
 
 use crate::error::Result;
+use crate::metadata::TableId;
 use crate::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest};
 use crate::TableRef;
 
@@ -123,6 +125,22 @@ pub trait TableEngineProcedure: Send + Sync {
 
 pub type TableEngineProcedureRef = Arc<dyn TableEngineProcedure>;
 
+/// Generate region name in the form of "{TABLE_ID}_{REGION_NUMBER}"
+#[inline]
+pub fn region_name(table_id: TableId, n: u32) -> String {
+    format!("{table_id}_{n:010}")
+}
+
+#[inline]
+pub fn region_id(table_id: TableId, n: u32) -> RegionId {
+    (u64::from(table_id) << 32) | u64::from(n)
+}
+
+#[inline]
+pub fn table_dir(catalog_name: &str, schema_name: &str, table_id: TableId) -> String {
+    format!("{catalog_name}/{schema_name}/{table_id}/")
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs
index 8cd3472cab..c2e6d31d9a 100644
--- a/src/table/src/requests.rs
+++ b/src/table/src/requests.rs
@@ -209,6 +209,14 @@ pub struct CopyTableFromRequest {
     pub from: String,
 }
 
+#[derive(Debug, Clone, Default)]
+pub struct FlushTableRequest {
+    pub catalog_name: String,
+    pub schema_name: String,
+    pub table_name: String,
+    pub region_number: Option<RegionNumber>,
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/src/table/src/table.rs b/src/table/src/table.rs
index b59841b8ed..8684da9aff 100644
--- a/src/table/src/table.rs
+++ b/src/table/src/table.rs
@@ -26,7 +26,7 @@ use datatypes::schema::SchemaRef;
 
 use crate::error::{Result, UnsupportedSnafu};
 use crate::metadata::{FilterPushDownType, TableId, TableInfoRef, TableType};
-use crate::requests::{AlterTableRequest, DeleteRequest, InsertRequest};
+use crate::requests::{AlterTableRequest, DeleteRequest, FlushTableRequest, InsertRequest};
 
 pub type AlterContext = anymap::Map<dyn Any + Send + Sync>;
 
@@ -94,6 +94,11 @@ pub trait Table: Send + Sync {
         .fail()?
     }
 
+    /// Flush table.
+    async fn flush(&self, _request: FlushTableRequest) -> Result<()> {
+        UnsupportedSnafu { operation: "FLUSH" }.fail()?
+    }
+
     /// Close the table.
     async fn close(&self) -> Result<()> {
         Ok(())