feat: implement table flush (#1121)

* feat: add flush method for trait

* feat: implement flush via grpc

* chore: move table_dir/region_name/region_id to table crate

* chore: Update src/mito/src/table.rs

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
Weny Xu
2023-03-13 20:10:37 +08:00
committed by GitHub
parent 604c20a83d
commit 296c6dfcbf
23 changed files with 539 additions and 43 deletions


@@ -19,8 +19,8 @@ use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{
AlterExpr, AuthHeader, CreateTableExpr, DdlRequest, DropTableExpr, GreptimeRequest,
InsertRequest, PromRangeQuery, QueryRequest, RequestHeader,
AlterExpr, AuthHeader, CreateTableExpr, DdlRequest, DropTableExpr, FlushTableExpr,
GreptimeRequest, InsertRequest, PromRangeQuery, QueryRequest, RequestHeader,
};
use arrow_flight::{FlightData, Ticket};
use common_error::prelude::*;
@@ -135,6 +135,13 @@ impl Database {
.await
}
pub async fn flush_table(&self, expr: FlushTableExpr) -> Result<Output> {
self.do_get(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::FlushTable(expr)),
}))
.await
}
async fn do_get(&self, request: Request) -> Result<Output> {
let request = GreptimeRequest {
header: Some(RequestHeader {
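For orientation, a minimal sketch of how a caller might use the new client method (not part of the diff; the connected `database: Database` handle is assumed):

let expr = FlushTableExpr {
    catalog_name: "greptime".to_string(),
    schema_name: "public".to_string(),
    table_name: "my_table".to_string(),
    region_id: None, // None asks the datanode to flush every region of the table
};
let output = database.flush_table(expr).await?;
assert!(matches!(output, Output::AffectedRows(0)));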


@@ -169,6 +169,13 @@ pub enum Error {
source: TableError,
},
#[snafu(display("Failed to flush table: {}, source: {}", table_name, source))]
FlushTable {
table_name: String,
#[snafu(backtrace)]
source: TableError,
},
#[snafu(display("Failed to start server, source: {}", source))]
StartServer {
#[snafu(backtrace)]
@@ -539,6 +546,7 @@ impl ErrorExt for Error {
source.status_code()
}
DropTable { source, .. } => source.status_code(),
FlushTable { source, .. } => source.status_code(),
Insert { source, .. } => source.status_code(),
Delete { source, .. } => source.status_code(),


@@ -127,7 +127,7 @@ impl Instance {
DdlExpr::Alter(expr) => self.handle_alter(expr).await,
DdlExpr::CreateDatabase(expr) => self.handle_create_database(expr, query_ctx).await,
DdlExpr::DropTable(expr) => self.handle_drop_table(expr).await,
DdlExpr::FlushTable(_) => todo!(),
DdlExpr::FlushTable(expr) => self.handle_flush_table(expr).await,
}
}
}


@@ -12,13 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::{AlterExpr, CreateTableExpr, DropTableExpr};
use api::v1::{AlterExpr, CreateTableExpr, DropTableExpr, FlushTableExpr};
use common_grpc_expr::{alter_expr_to_request, create_expr_to_request};
use common_query::Output;
use common_telemetry::info;
use session::context::QueryContext;
use snafu::prelude::*;
use table::requests::DropTableRequest;
use table::requests::{DropTableRequest, FlushTableRequest};
use crate::error::{
AlterExprToRequestSnafu, BumpTableIdSnafu, CreateExprToRequestSnafu,
@@ -82,6 +82,18 @@ impl Instance {
.execute(SqlRequest::DropTable(req), QueryContext::arc())
.await
}
pub(crate) async fn handle_flush_table(&self, expr: FlushTableExpr) -> Result<Output> {
let req = FlushTableRequest {
catalog_name: expr.catalog_name,
schema_name: expr.schema_name,
table_name: expr.table_name,
region_number: expr.region_id,
};
self.sql_handler()
.execute(SqlRequest::FlushTable(req), QueryContext::arc())
.await
}
}
#[cfg(test)]


@@ -39,6 +39,7 @@ mod copy_table_from;
mod create;
mod delete;
mod drop_table;
mod flush_table;
pub(crate) mod insert;
#[derive(Debug)]
@@ -48,6 +49,7 @@ pub enum SqlRequest {
CreateDatabase(CreateDatabaseRequest),
Alter(AlterTableRequest),
DropTable(DropTableRequest),
FlushTable(FlushTableRequest),
ShowDatabases(ShowDatabases),
ShowTables(ShowTables),
DescribeTable(DescribeTable),
@@ -116,6 +118,7 @@ impl SqlHandler {
})?;
describe_table(table).context(ExecuteSqlSnafu)
}
SqlRequest::FlushTable(req) => self.flush_table(req).await,
};
if let Err(e) = &result {
error!(e; "{query_ctx}");


@@ -0,0 +1,40 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_query::Output;
use snafu::ResultExt;
use table::engine::TableReference;
use table::requests::FlushTableRequest;
use crate::error::{self, Result};
use crate::sql::SqlHandler;
impl SqlHandler {
pub(crate) async fn flush_table(&self, req: FlushTableRequest) -> Result<Output> {
let table_ref = TableReference {
catalog: &req.catalog_name,
schema: &req.schema_name,
table: &req.table_name,
};
let full_table_name = table_ref.to_string();
let table = self.get_table(&table_ref)?;
table.flush(req).await.context(error::FlushTableSnafu {
table_name: full_table_name,
})?;
Ok(Output::AffectedRows(0))
}
}
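A sketch of the datanode-side dispatch added above (the sql_handler handle is assumed to be an existing SqlHandler; this mirrors the SqlRequest::FlushTable arm and the handler in flush_table.rs):

let req = FlushTableRequest {
    catalog_name: "greptime".to_string(),
    schema_name: "public".to_string(),
    table_name: "my_table".to_string(),
    region_number: None, // flush every region of the table
};
let output = sql_handler
    .execute(SqlRequest::FlushTable(req), QueryContext::arc())
    .await?;
assert!(matches!(output, Output::AffectedRows(0)));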


@@ -19,8 +19,8 @@ use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::{
column_def, AlterExpr, CreateDatabaseExpr, CreateTableExpr, DropTableExpr, InsertRequest,
TableId,
column_def, AlterExpr, CreateDatabaseExpr, CreateTableExpr, DropTableExpr, FlushTableExpr,
InsertRequest, TableId,
};
use async_trait::async_trait;
use catalog::helper::{SchemaKey, SchemaValue};
@@ -39,7 +39,7 @@ use meta_client::client::MetaClient;
use meta_client::rpc::router::DeleteRequest as MetaDeleteRequest;
use meta_client::rpc::{
CompareAndPutRequest, CreateRequest as MetaCreateRequest, Partition as MetaPartition,
RouteResponse, TableName,
RouteRequest, RouteResponse, TableName,
};
use partition::partition::{PartitionBound, PartitionDef};
use query::error::QueryExecutionSnafu;
@@ -259,6 +259,61 @@ impl DistInstance {
Ok(Output::AffectedRows(1))
}
async fn flush_table(&self, table_name: TableName, region_id: Option<u32>) -> Result<Output> {
let _ = self
.catalog_manager
.table(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table_name.to_string(),
})?;
let route_response = self
.meta_client
.route(RouteRequest {
table_names: vec![table_name.clone()],
})
.await
.context(RequestMetaSnafu)?;
let expr = FlushTableExpr {
catalog_name: table_name.catalog_name.clone(),
schema_name: table_name.schema_name.clone(),
table_name: table_name.table_name.clone(),
region_id,
};
for table_route in &route_response.table_routes {
let should_send_rpc = table_route.region_routes.iter().any(|route| {
if let Some(region_id) = region_id {
region_id == route.region.id as u32
} else {
true
}
});
if !should_send_rpc {
continue;
}
for datanode in table_route.find_leaders() {
debug!("Flushing table {table_name} on Datanode {datanode:?}");
let client = self.datanode_clients.get_client(&datanode).await;
let client = Database::new(&expr.catalog_name, &expr.schema_name, client);
client
.flush_table(expr.clone())
.await
.context(RequestDatanodeSnafu)?;
}
}
Ok(Output::AffectedRows(0))
}
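The comparison region_id == route.region.id as u32 above leans on the packed region-id layout defined by table::engine::region_id (relocated in this commit): table id in the high 32 bits, region number in the low 32 bits, so truncating to u32 recovers the region number. A minimal sketch of that assumption:

// region_id(table_id, n) packs as (table_id << 32) | n, so the low 32 bits are
// the region number that the caller-supplied Option<u32> is matched against.
let packed: u64 = (1024u64 << 32) | 7;
assert_eq!(packed as u32, 7);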
async fn handle_statement(
&self,
stmt: Statement,


@@ -57,7 +57,11 @@ impl GrpcQueryHandler for DistInstance {
TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
self.drop_table(table_name).await
}
DdlExpr::FlushTable(_) => todo!(),
DdlExpr::FlushTable(expr) => {
let table_name =
TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
self.flush_table(table_name, expr.region_id).await
}
}
}
}


@@ -91,14 +91,15 @@ mod test {
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::{
alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef,
CreateDatabaseExpr, CreateTableExpr, DdlRequest, DropTableExpr, InsertRequest,
QueryRequest,
CreateDatabaseExpr, CreateTableExpr, DdlRequest, DropTableExpr, FlushTableExpr,
InsertRequest, QueryRequest,
};
use catalog::helper::{TableGlobalKey, TableGlobalValue};
use common_query::Output;
use common_recordbatch::RecordBatches;
use query::parser::QueryLanguageParser;
use session::context::QueryContext;
use tests::{has_parquet_file, test_region_dir};
use super::*;
use crate::table::DistTable;
@@ -352,6 +353,108 @@ CREATE TABLE {table_name} (
test_insert_and_query_on_auto_created_table(instance).await
}
#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_flush_table() {
common_telemetry::init_default_ut_logging();
let instance = tests::create_distributed_instance("test_distributed_flush_table").await;
let data_tmp_dirs = instance.data_tmp_dirs();
let frontend = instance.frontend.as_ref();
let table_name = "my_dist_table";
let sql = format!(
r"
CREATE TABLE {table_name} (
a INT,
ts TIMESTAMP,
TIME INDEX (ts)
) PARTITION BY RANGE COLUMNS(a) (
PARTITION r0 VALUES LESS THAN (10),
PARTITION r1 VALUES LESS THAN (20),
PARTITION r2 VALUES LESS THAN (50),
PARTITION r3 VALUES LESS THAN (MAXVALUE),
)"
);
create_table(frontend, sql).await;
test_insert_and_query_on_existing_table(frontend, table_name).await;
flush_table(frontend, "greptime", "public", table_name, None).await;
// Flush again to wait for the previous flush task to finish
flush_table(frontend, "greptime", "public", table_name, None).await;
let table_id = 1024;
let table = instance
.frontend
.catalog_manager()
.table("greptime", "public", table_name)
.await
.unwrap()
.unwrap();
let table = table.as_any().downcast_ref::<DistTable>().unwrap();
let TableGlobalValue { regions_id_map, .. } = table
.table_global_value(&TableGlobalKey {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: table_name.to_string(),
})
.await
.unwrap()
.unwrap();
let region_to_dn_map = regions_id_map
.iter()
.map(|(k, v)| (v[0], *k))
.collect::<HashMap<u32, u64>>();
for (region, dn) in region_to_dn_map.iter() {
// data_tmp_dirs are indexed by datanode id minus one (datanodes 1..=4)
let data_tmp_dir = data_tmp_dirs.get((*dn - 1) as usize).unwrap();
let region_dir = test_region_dir(
data_tmp_dir.path().to_str().unwrap(),
"greptime",
"public",
table_id,
*region,
);
assert!(has_parquet_file(&region_dir));
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_standalone_flush_table() {
common_telemetry::init_default_ut_logging();
let standalone = tests::create_standalone_instance("test_standalone_flush_table").await;
let instance = &standalone.instance;
let data_tmp_dir = standalone.data_tmp_dir();
let table_name = "my_table";
let sql = format!("CREATE TABLE {table_name} (a INT, ts TIMESTAMP, TIME INDEX (ts))");
create_table(instance, sql).await;
test_insert_and_query_on_existing_table(instance, table_name).await;
let table_id = 1024;
let region_id = 0;
let region_dir = test_region_dir(
data_tmp_dir.path().to_str().unwrap(),
"greptime",
"public",
table_id,
region_id,
);
assert!(!has_parquet_file(&region_dir));
flush_table(instance, "greptime", "public", "my_table", None).await;
// Flush again to wait for the previous flush task to finish
flush_table(instance, "greptime", "public", "my_table", None).await;
assert!(has_parquet_file(&region_dir));
}
async fn create_table(frontend: &Instance, sql: String) {
let request = Request::Query(QueryRequest {
query: Some(Query::Sql(sql)),
@@ -360,6 +463,26 @@ CREATE TABLE {table_name} (
assert!(matches!(output, Output::AffectedRows(0)));
}
async fn flush_table(
frontend: &Instance,
catalog_name: &str,
schema_name: &str,
table_name: &str,
region_id: Option<u32>,
) {
let request = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::FlushTable(FlushTableExpr {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
region_id,
})),
});
let output = query(frontend, request).await;
assert!(matches!(output, Output::AffectedRows(0)));
}
async fn test_insert_and_query_on_existing_table(instance: &Instance, table_name: &str) {
let insert = InsertRequest {
table_name: table_name.to_string(),


@@ -34,6 +34,7 @@ use partition::route::TableRoutes;
use servers::grpc::GrpcServer;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor;
use servers::Mode;
use table::engine::{region_name, table_dir};
use tonic::transport::Server;
use tower::service_fn;
@@ -56,11 +57,23 @@ pub(crate) struct MockDistributedInstance {
_guards: Vec<TestGuard>,
}
impl MockDistributedInstance {
pub fn data_tmp_dirs(&self) -> Vec<&TempDir> {
self._guards.iter().map(|g| &g._data_tmp_dir).collect()
}
}
pub(crate) struct MockStandaloneInstance {
pub(crate) instance: Arc<Instance>,
_guard: TestGuard,
}
impl MockStandaloneInstance {
pub fn data_tmp_dir(&self) -> &TempDir {
&self._guard._data_tmp_dir
}
}
pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandaloneInstance {
let (opts, guard) = create_tmp_dir_and_datanode_opts(test_name);
let datanode_instance = DatanodeInstance::new(&opts).await.unwrap();
@@ -269,3 +282,29 @@ pub(crate) async fn create_distributed_instance(test_name: &str) -> MockDistribu
_guards: test_guards,
}
}
pub fn test_region_dir(
dir: &str,
catalog_name: &str,
schema_name: &str,
table_id: u32,
region_id: u32,
) -> String {
let table_dir = table_dir(catalog_name, schema_name, table_id);
let region_name = region_name(table_id, region_id);
format!("{}/{}/{}", dir, table_dir, region_name)
}
pub fn has_parquet_file(sst_dir: &str) -> bool {
for entry in std::fs::read_dir(sst_dir).unwrap() {
let entry = entry.unwrap();
let path = entry.path();
if !path.is_dir() {
assert_eq!("parquet", path.extension().unwrap());
return true;
}
}
false
}
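For reference, how these helpers compose a region directory (the literal values follow from the format strings in table::engine; the surrounding paths are illustrative):

let table_dir = table_dir("greptime", "public", 1024);  // "greptime/public/1024/"
let region_name = region_name(1024, 0);                 // "1024_0000000000"
let region_dir = test_region_dir("/tmp/data", "greptime", "public", 1024, 0);
assert!(region_dir.ends_with(&region_name));
// has_parquet_file then just checks whether any flushed SST (a .parquet file)
// sits directly under that region directory.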


@@ -31,13 +31,14 @@ use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{
ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnId,
CreateOptions, EngineContext as StorageEngineContext, OpenOptions, Region,
RegionDescriptorBuilder, RegionId, RowKeyDescriptor, RowKeyDescriptorBuilder, StorageEngine,
RegionDescriptorBuilder, RowKeyDescriptor, RowKeyDescriptorBuilder, StorageEngine,
};
use table::engine::{
region_id, region_name, table_dir, EngineContext, TableEngine, TableEngineProcedure,
TableReference,
};
use table::engine::{EngineContext, TableEngine, TableEngineProcedure, TableReference};
use table::error::TableOperationSnafu;
use table::metadata::{
TableId, TableInfo, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion,
};
use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion};
use table::requests::{
AlterKind, AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest,
};
@@ -59,22 +60,6 @@ pub const MITO_ENGINE: &str = "mito";
pub const INIT_COLUMN_ID: ColumnId = 0;
const INIT_TABLE_VERSION: TableVersion = 0;
/// Generate region name in the form of "{TABLE_ID}_{REGION_NUMBER}"
#[inline]
fn region_name(table_id: TableId, n: u32) -> String {
format!("{table_id}_{n:010}")
}
#[inline]
fn region_id(table_id: TableId, n: u32) -> RegionId {
(u64::from(table_id) << 32) | u64::from(n)
}
#[inline]
fn table_dir(catalog_name: &str, schema_name: &str, table_id: TableId) -> String {
format!("{catalog_name}/{schema_name}/{table_id}/")
}
/// [TableEngine] implementation.
///
/// About mito <https://en.wikipedia.org/wiki/Alfa_Romeo_MiTo>.


@@ -25,6 +25,7 @@ use store_api::storage::{
ColumnId, CreateOptions, EngineContext, OpenOptions, RegionDescriptorBuilder, RegionNumber,
StorageEngine,
};
use table::engine::{region_id, table_dir};
use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType};
use table::requests::CreateTableRequest;
@@ -146,7 +147,7 @@ impl<S: StorageEngine> CreateMitoTable<S> {
/// Creates regions for the table.
async fn on_create_regions(&mut self) -> Result<Status> {
let engine_ctx = EngineContext::default();
let table_dir = engine::table_dir(
let table_dir = table_dir(
&self.data.request.catalog_name,
&self.data.request.schema_name,
self.data.request.id,
@@ -203,7 +204,7 @@ impl<S: StorageEngine> CreateMitoTable<S> {
}
// We need to create that region.
let region_id = engine::region_id(self.data.request.id, *number);
let region_id = region_id(self.data.request.id, *number);
let region_desc = RegionDescriptorBuilder::default()
.id(region_id)
.name(region_name.clone())
@@ -234,7 +235,7 @@ impl<S: StorageEngine> CreateMitoTable<S> {
/// Writes metadata to the table manifest.
async fn on_write_table_manifest(&mut self) -> Result<Status> {
let table_dir = engine::table_dir(
let table_dir = table_dir(
&self.data.request.catalog_name,
&self.data.request.schema_name,
self.data.request.id,


@@ -31,14 +31,28 @@ use storage::region::RegionImpl;
use storage::EngineImpl;
use store_api::manifest::Manifest;
use store_api::storage::ReadContext;
use table::requests::{AddColumnRequest, AlterKind, DeleteRequest, TableOptions};
use table::requests::{
AddColumnRequest, AlterKind, DeleteRequest, FlushTableRequest, TableOptions,
};
use super::*;
use crate::table::test_util;
use crate::table::test_util::{
new_insert_request, schema_for_test, TestEngineComponents, TABLE_NAME,
self, new_insert_request, schema_for_test, setup_table, TestEngineComponents, TABLE_NAME,
};
pub fn has_parquet_file(sst_dir: &str) -> bool {
for entry in std::fs::read_dir(sst_dir).unwrap() {
let entry = entry.unwrap();
let path = entry.path();
if !path.is_dir() {
assert_eq!("parquet", path.extension().unwrap());
return true;
}
}
false
}
async fn setup_table_with_column_default_constraint() -> (TempDir, String, TableRef) {
let table_name = "test_default_constraint";
let column_schemas = vec![
@@ -752,3 +766,76 @@ async fn test_table_delete_rows() {
+-------+-----+--------+-------------------------+"
);
}
#[tokio::test]
async fn test_flush_table_all_regions() {
let TestEngineComponents {
table_ref: table,
dir,
..
} = test_util::setup_test_engine_and_table().await;
setup_table(table.clone()).await;
let table_id = 1u32;
let region_name = region_name(table_id, 0);
let table_info = table.table_info();
let table_dir = table_dir(&table_info.catalog_name, &table_info.schema_name, table_id);
let region_dir = format!(
"{}/{}/{}",
dir.path().to_str().unwrap(),
table_dir,
region_name
);
assert!(!has_parquet_file(&region_dir));
// Trigger a flush of all regions
table.flush(FlushTableRequest::default()).await.unwrap();
// Trigger again; this waits for the previous flush task to finish
table.flush(FlushTableRequest::default()).await.unwrap();
assert!(has_parquet_file(&region_dir));
}
#[tokio::test]
async fn test_flush_table_with_region_id() {
let TestEngineComponents {
table_ref: table,
dir,
..
} = test_util::setup_test_engine_and_table().await;
setup_table(table.clone()).await;
let table_id = 1u32;
let region_name = region_name(table_id, 0);
let table_info = table.table_info();
let table_dir = table_dir(&table_info.catalog_name, &table_info.schema_name, table_id);
let region_dir = format!(
"{}/{}/{}",
dir.path().to_str().unwrap(),
table_dir,
region_name
);
assert!(!has_parquet_file(&region_dir));
let req = FlushTableRequest {
region_number: Some(0),
..Default::default()
};
// Trigger a flush of region 0 only
table.flush(req.clone()).await.unwrap();
// Trigger again; this waits for the previous flush task to finish
table.flush(req).await.unwrap();
assert!(has_parquet_file(&region_dir));
}


@@ -44,7 +44,7 @@ use table::metadata::{
FilterPushDownType, RawTableInfo, TableInfo, TableInfoRef, TableMeta, TableType,
};
use table::requests::{
AddColumnRequest, AlterKind, AlterTableRequest, DeleteRequest, InsertRequest,
AddColumnRequest, AlterKind, AlterTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
};
use table::table::scan::SimpleTableScan;
use table::table::{AlterContext, Table};
@@ -323,6 +323,25 @@ impl<R: Region> Table for MitoTable<R> {
Ok(rows_deleted)
}
async fn flush(&self, request: FlushTableRequest) -> TableResult<()> {
if let Some(region_id) = request.region_number {
if let Some(region) = self.regions.get(&region_id) {
region
.flush()
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
}
} else {
futures::future::try_join_all(self.regions.values().map(|region| region.flush()))
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
}
Ok(())
}
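A usage sketch of the new Table::flush implementation (the table handle is assumed; the mito tests earlier exercise the same two paths):

// Flush only region 0, if the table owns such a region.
let one_region = FlushTableRequest {
    region_number: Some(0),
    ..Default::default()
};
table.flush(one_region).await?;

// With region_number left as None, every region of the table is flushed concurrently.
table.flush(FlushTableRequest::default()).await?;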
async fn close(&self) -> TableResult<()> {
futures::future::try_join_all(self.regions.values().map(|region| region.close()))
.await


@@ -20,7 +20,7 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema, Schema, SchemaBuilder, SchemaRef};
use datatypes::vectors::VectorRef;
use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector, VectorRef};
use log_store::NoopLogStore;
use object_store::services::Fs as Builder;
use object_store::{ObjectStore, ObjectStoreBuilder};
@@ -30,7 +30,7 @@ use storage::EngineImpl;
use table::engine::{EngineContext, TableEngine};
use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder, TableType};
use table::requests::{CreateTableRequest, InsertRequest, TableOptions};
use table::TableRef;
use table::{Table, TableRef};
use crate::config::EngineConfig;
use crate::engine::{MitoEngine, MITO_ENGINE};
@@ -178,3 +178,19 @@ pub async fn setup_mock_engine_and_table(
(mock_engine, table_engine, table, object_store, dir)
}
pub async fn setup_table(table: Arc<dyn Table>) {
let mut columns_values: HashMap<String, VectorRef> = HashMap::with_capacity(4);
let hosts: VectorRef = Arc::new(StringVector::from(vec!["host1", "host2", "host3", "host4"]));
let cpus: VectorRef = Arc::new(Float64Vector::from_vec(vec![1.0, 2.0, 3.0, 4.0]));
let memories: VectorRef = Arc::new(Float64Vector::from_vec(vec![1.0, 2.0, 3.0, 4.0]));
let tss: VectorRef = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 2, 1]));
columns_values.insert("host".to_string(), hosts.clone());
columns_values.insert("cpu".to_string(), cpus.clone());
columns_values.insert("memory".to_string(), memories.clone());
columns_values.insert("ts".to_string(), tss.clone());
let insert_req = new_insert_request("demo".to_string(), columns_values);
assert_eq!(4, table.insert(insert_req).await.unwrap());
}


@@ -200,6 +200,10 @@ impl Region for MockRegion {
fn disk_usage_bytes(&self) -> u64 {
0
}
async fn flush(&self) -> Result<()> {
unimplemented!()
}
}
impl MockRegionInner {


@@ -135,6 +135,10 @@ impl<S: LogStore> Region for RegionImpl<S> {
.map(|level_ssts| level_ssts.files().map(|sst| sst.file_size()).sum::<u64>())
.sum()
}
async fn flush(&self) -> Result<()> {
self.inner.flush().await
}
}
/// Storage related config for region.
@@ -560,4 +564,18 @@ impl<S: LogStore> RegionInner<S> {
async fn close(&self) -> Result<()> {
self.writer.close().await
}
async fn flush(&self) -> Result<()> {
let writer_ctx = WriterContext {
shared: &self.shared,
flush_strategy: &self.flush_strategy,
flush_scheduler: &self.flush_scheduler,
compaction_scheduler: &self.compaction_scheduler,
sst_layer: &self.sst_layer,
wal: &self.wal,
writer: &self.writer,
manifest: &self.manifest,
};
self.writer.flush(writer_ctx).await
}
}
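Caller's view of the manual flush chain wired above (the region handle is assumed to be a RegionImpl):

// RegionImpl::flush
//   -> RegionInner::flush   (assembles the WriterContext)
//   -> RegionWriter::flush  (rejects already-closed regions; see the region_writer.rs hunk below)
//   -> WriterInner::manual_flush -> trigger_flush
region.flush().await?;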


@@ -18,7 +18,7 @@ use std::sync::Arc;
use common_test_util::temp_dir::create_temp_dir;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use store_api::storage::{OpenOptions, WriteResponse};
use store_api::storage::{OpenOptions, Region, WriteResponse};
use crate::engine;
use crate::flush::FlushStrategyRef;
@@ -94,6 +94,10 @@ impl FlushTester {
async fn wait_flush_done(&self) {
self.base().region.wait_flush_done().await.unwrap();
}
async fn flush(&self) {
self.base().region.flush().await.unwrap();
}
}
#[tokio::test]
@@ -124,6 +128,30 @@ async fn test_flush_and_stall() {
assert!(has_parquet_file(&sst_dir));
}
#[tokio::test]
async fn test_manual_flush() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("manual_flush");
let store_dir = dir.path().to_str().unwrap();
let flush_switch = Arc::new(FlushSwitch::default());
let tester = FlushTester::new(store_dir, flush_switch.clone()).await;
let data = [(1000, Some(100))];
// Put one element so we have content to flush.
tester.put(&data).await;
// No parquet file should have been flushed yet.
let sst_dir = format!("{}/{}", store_dir, engine::region_sst_dir("", REGION_NAME));
assert!(!has_parquet_file(&sst_dir));
tester.flush().await;
tester.wait_flush_done().await;
assert!(has_parquet_file(&sst_dir));
}
#[tokio::test]
async fn test_flush_empty() {
let dir = create_temp_dir("flush-empty");


@@ -260,6 +260,15 @@ impl RegionWriter {
Ok(())
}
/// Trigger a flush task manually
pub async fn flush<S: LogStore>(&self, writer_ctx: WriterContext<'_, S>) -> Result<()> {
let mut inner = self.inner.lock().await;
ensure!(!inner.is_closed(), error::ClosedRegionSnafu);
inner.manual_flush(writer_ctx).await
}
/// Cancel flush task if any
async fn cancel_flush(&self) -> Result<()> {
let mut inner = self.inner.lock().await;
@@ -680,6 +689,11 @@ impl WriterInner {
Some(schedule_compaction_cb)
}
async fn manual_flush<S: LogStore>(&mut self, writer_ctx: WriterContext<'_, S>) -> Result<()> {
self.trigger_flush(&writer_ctx).await?;
Ok(())
}
#[inline]
fn is_closed(&self) -> bool {
self.closed


@@ -76,6 +76,8 @@ pub trait Region: Send + Sync + Clone + std::fmt::Debug + 'static {
async fn close(&self) -> Result<(), Self::Error>;
fn disk_usage_bytes(&self) -> u64;
async fn flush(&self) -> Result<(), Self::Error>;
}
/// Context for write operations.


@@ -16,8 +16,10 @@ use std::fmt::{self, Display};
use std::sync::Arc;
use common_procedure::BoxedProcedure;
use store_api::storage::RegionId;
use crate::error::Result;
use crate::metadata::TableId;
use crate::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest};
use crate::TableRef;
@@ -123,6 +125,22 @@ pub trait TableEngineProcedure: Send + Sync {
pub type TableEngineProcedureRef = Arc<dyn TableEngineProcedure>;
/// Generate region name in the form of "{TABLE_ID}_{REGION_NUMBER}"
#[inline]
pub fn region_name(table_id: TableId, n: u32) -> String {
format!("{table_id}_{n:010}")
}
#[inline]
pub fn region_id(table_id: TableId, n: u32) -> RegionId {
(u64::from(table_id) << 32) | u64::from(n)
}
#[inline]
pub fn table_dir(catalog_name: &str, schema_name: &str, table_id: TableId) -> String {
format!("{catalog_name}/{schema_name}/{table_id}/")
}
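A quick sanity sketch of the relocated helpers (the expected values follow from the format strings above; RegionId is assumed to be a plain u64 alias here):

assert_eq!(region_name(1024, 1), "1024_0000000001");
assert_eq!(region_id(1024, 1), (1024u64 << 32) | 1);
assert_eq!(table_dir("greptime", "public", 1024), "greptime/public/1024/");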
#[cfg(test)]
mod tests {
use super::*;


@@ -209,6 +209,14 @@ pub struct CopyTableFromRequest {
pub from: String,
}
#[derive(Debug, Clone, Default)]
pub struct FlushTableRequest {
pub catalog_name: String,
pub schema_name: String,
pub table_name: String,
pub region_number: Option<RegionNumber>,
}
#[cfg(test)]
mod tests {
use super::*;


@@ -26,7 +26,7 @@ use datatypes::schema::SchemaRef;
use crate::error::{Result, UnsupportedSnafu};
use crate::metadata::{FilterPushDownType, TableId, TableInfoRef, TableType};
use crate::requests::{AlterTableRequest, DeleteRequest, InsertRequest};
use crate::requests::{AlterTableRequest, DeleteRequest, FlushTableRequest, InsertRequest};
pub type AlterContext = anymap::Map<dyn Any + Send + Sync>;
@@ -94,6 +94,11 @@ pub trait Table: Send + Sync {
.fail()?
}
/// Flush table.
async fn flush(&self, _request: FlushTableRequest) -> Result<()> {
UnsupportedSnafu { operation: "FLUSH" }.fail()?
}
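Behavior sketch for engines that keep this default (the table handle here is hypothetical): flushing such a table surfaces an Unsupported error instead of silently doing nothing.

// err carries operation: "FLUSH" from the UnsupportedSnafu above.
let err = table.flush(FlushTableRequest::default()).await.unwrap_err();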
/// Close the table.
async fn close(&self) -> Result<()> {
Ok(())