From e7b4a00ef05ca1065817fa0c9a4d03bfe6112497 Mon Sep 17 00:00:00 2001
From: LFC
Date: Mon, 14 Nov 2022 15:49:25 +0800
Subject: [PATCH] feat: create distributed table in Frontend (#475)

* feat: create distributed table in Frontend

* fix: some table creation issues (#482)

Co-authored-by: luofucong
Co-authored-by: Lei, Huang <6406592+v0y4g3r@users.noreply.github.com>
---
 src/catalog/src/error.rs                  |   8 -
 src/client/src/client.rs                  |   7 +-
 src/client/src/error.rs                   |   3 +-
 src/cmd/src/datanode.rs                   |  28 +-
 src/cmd/src/frontend.rs                   |  14 +-
 src/cmd/src/lib.rs                        |   2 +
 src/common/catalog/src/helper.rs          |   5 +-
 src/datanode/src/instance.rs              |   6 +-
 src/frontend/Cargo.toml                   |   2 +-
 src/frontend/src/catalog.rs               |  18 +-
 src/frontend/src/error.rs                 |  90 +++-
 src/frontend/src/frontend.rs              |   3 +-
 src/frontend/src/instance.rs              | 113 +++--
 src/frontend/src/instance/distributed.rs  | 329 +++++++++++++
 src/frontend/src/instance/influxdb.rs     |   2 +-
 src/frontend/src/instance/opentsdb.rs     |   2 +-
 src/frontend/src/instance/prometheus.rs   |   2 +-
 src/frontend/src/partitioning.rs          | 134 ++++-
 src/frontend/src/partitioning/columns.rs  |  27 +-
 src/frontend/src/partitioning/range.rs    |  32 +-
 src/frontend/src/spliter.rs               |   5 +
 src/frontend/src/table.rs                 | 593 +++++++++++++++++------
 src/frontend/src/table/route.rs           |   9 +
 src/meta-client/src/rpc.rs                |   2 +-
 src/meta-client/src/rpc/router.rs         |  42 +-
 25 files changed, 1211 insertions(+), 267 deletions(-)
 create mode 100644 src/frontend/src/instance/distributed.rs

diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs
index ed2a8018a2..fb9a14284d 100644
--- a/src/catalog/src/error.rs
+++ b/src/catalog/src/error.rs
@@ -171,13 +171,6 @@ pub enum Error {
         source: meta_client::error::Error,
     },

-    #[snafu(display("Failed to deserialize partition rule from string: {:?}", data))]
-    DeserializePartitionRule {
-        data: String,
-        source: serde_json::error::Error,
-        backtrace: Backtrace,
-    },
-
     #[snafu(display("Invalid table schema in catalog, source: {:?}", source))]
     InvalidSchemaInCatalog {
         #[snafu(backtrace)]
@@ -226,7 +219,6 @@ impl ErrorExt for Error {
             Error::SystemCatalogTableScan { source } => source.status_code(),
             Error::SystemCatalogTableScanExec { source } => source.status_code(),
             Error::InvalidTableSchema { source, .. } => source.status_code(),
-            Error::DeserializePartitionRule { .. } => StatusCode::Unexpected,
             Error::InvalidSchemaInCatalog { .. } => StatusCode::Unexpected,
             Error::Internal { source, .. } => source.status_code(),
         }
diff --git a/src/client/src/client.rs b/src/client/src/client.rs
index 05bfb4c0c3..8913f6139c 100644
--- a/src/client/src/client.rs
+++ b/src/client/src/client.rs
@@ -128,8 +128,11 @@ impl Client {
             .context(error::IllegalGrpcClientStateSnafu {
                 err_msg: "No available peer found",
             })?;
-        let mut client = self.make_client(peer)?;
-        let result = client.batch(req).await.context(error::TonicStatusSnafu)?;
+        let mut client = self.make_client(&peer)?;
+        let result = client
+            .batch(req)
+            .await
+            .context(error::TonicStatusSnafu { addr: peer })?;
         Ok(result.into_inner())
     }

diff --git a/src/client/src/error.rs b/src/client/src/error.rs
index c8c4517667..1ca36f1c24 100644
--- a/src/client/src/error.rs
+++ b/src/client/src/error.rs
@@ -25,8 +25,9 @@ pub enum Error {
     #[snafu(display("Missing result header"))]
     MissingHeader,

-    #[snafu(display("Tonic internal error, source: {}", source))]
+    #[snafu(display("Tonic internal error, addr: {}, source: {}", addr, source))]
     TonicStatus {
+        addr: String,
         source: tonic::Status,
         backtrace: Backtrace,
     },
diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs
index 851282896e..9ab84d1ff7 100644
--- a/src/cmd/src/datanode.rs
+++ b/src/cmd/src/datanode.rs
@@ -84,9 +84,9 @@ impl TryFrom<StartCommand> for DatanodeOptions {
                 // Running mode is only set to Distributed when
                 // both metasrv addr and node id are set in
                 // commandline options
-                opts.meta_client_opts.metasrv_addr = meta_addr;
+                opts.meta_client_opts.metasrv_addr = meta_addr.clone();
                 opts.node_id = node_id;
-                opts.mode = Mode::Distributed;
+                opts.mode = Mode::Distributed(vec![meta_addr]);
             }
             (None, None) => {
                 opts.mode = Mode::Standalone;
@@ -110,6 +110,8 @@ impl TryFrom<StartCommand> for DatanodeOptions {

 #[cfg(test)]
 mod tests {
+    use std::assert_matches::assert_matches;
+
     use datanode::datanode::ObjectStoreConfig;
     use frontend::frontend::Mode;

@@ -162,18 +164,16 @@ mod tests {
             .mode
         );

-        assert_eq!(
-            Mode::Distributed,
-            DatanodeOptions::try_from(StartCommand {
-                node_id: Some(42),
-                rpc_addr: None,
-                mysql_addr: None,
-                metasrv_addr: Some("127.0.0.1:3002".to_string()),
-                config_file: None
-            })
-            .unwrap()
-            .mode
-        );
+        let mode = DatanodeOptions::try_from(StartCommand {
+            node_id: Some(42),
+            rpc_addr: None,
+            mysql_addr: None,
+            metasrv_addr: Some("127.0.0.1:3002".to_string()),
+            config_file: None,
+        })
+        .unwrap()
+        .mode;
+        assert_matches!(mode, Mode::Distributed(_));

         assert!(DatanodeOptions::try_from(StartCommand {
             node_id: None,
diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs
index 76e37d4295..52d4dbfcd2 100644
--- a/src/cmd/src/frontend.rs
+++ b/src/cmd/src/frontend.rs
@@ -1,5 +1,5 @@
 use clap::Parser;
-use frontend::frontend::{Frontend, FrontendOptions};
+use frontend::frontend::{Frontend, FrontendOptions, Mode};
 use frontend::grpc::GrpcOptions;
 use frontend::influxdb::InfluxdbOptions;
 use frontend::instance::Instance;
@@ -52,6 +52,8 @@ pub struct StartCommand {
     config_file: Option<String>,
     #[clap(short, long)]
     influxdb_enable: Option<bool>,
+    #[clap(long)]
+    metasrv_addr: Option<String>,
 }

 impl StartCommand {
@@ -107,6 +109,15 @@ impl TryFrom<StartCommand> for FrontendOptions {
         if let Some(enable) = cmd.influxdb_enable {
             opts.influxdb_options = Some(InfluxdbOptions { enable });
         }
+        if let Some(metasrv_addr) = cmd.metasrv_addr {
+            opts.mode = Mode::Distributed(
+                metasrv_addr
+                    .split(',')
+                    .into_iter()
+                    .map(|x| x.trim().to_string())
+                    .collect::<Vec<String>>(),
+            );
+        }
         Ok(opts)
     }
 }
@@ -125,6 +136,7 @@ mod tests {
             opentsdb_addr: Some("127.0.0.1:4321".to_string()),
             influxdb_enable: Some(false),
             config_file: None,
metasrv_addr: None, }; let opts: FrontendOptions = command.try_into().unwrap(); diff --git a/src/cmd/src/lib.rs b/src/cmd/src/lib.rs index 56e81f9cdd..ee7ee01d6e 100644 --- a/src/cmd/src/lib.rs +++ b/src/cmd/src/lib.rs @@ -1,3 +1,5 @@ +#![feature(assert_matches)] + pub mod datanode; pub mod error; pub mod frontend; diff --git a/src/common/catalog/src/helper.rs b/src/common/catalog/src/helper.rs index 722bda9b02..9009c7d762 100644 --- a/src/common/catalog/src/helper.rs +++ b/src/common/catalog/src/helper.rs @@ -121,10 +121,8 @@ pub struct TableGlobalValue { // TODO(LFC): Maybe remove it? /// Allocation of region ids across all datanodes. pub regions_id_map: HashMap>, - /// Node id -> region ids + // TODO(LFC): Too much for assembling the table schema that DistTable needs, find another way. pub meta: RawTableMeta, - /// Partition rules for table - pub partition_rules: String, } /// Table regional info that varies between datanode, so it contains a `node_id` field. @@ -332,7 +330,6 @@ mod tests { node_id: 0, regions_id_map: HashMap::from([(0, vec![1, 2, 3])]), meta, - partition_rules: "{}".to_string(), }; let serialized = serde_json::to_string(&value).unwrap(); let deserialized = TableGlobalValue::parse(&serialized).unwrap(); diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index bc5fda55f6..4bb5be9bf2 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -51,7 +51,7 @@ impl Instance { let meta_client = match opts.mode { Mode::Standalone => None, - Mode::Distributed => { + Mode::Distributed(_) => { let meta_client = new_metasrv_client(opts.node_id, &opts.meta_client_opts).await?; Some(Arc::new(meta_client)) } @@ -83,7 +83,7 @@ impl Instance { ) } - Mode::Distributed => { + Mode::Distributed(_) => { let catalog = Arc::new(catalog::remote::RemoteCatalogManager::new( table_engine.clone(), opts.node_id, @@ -102,7 +102,7 @@ impl Instance { let heartbeat_task = match opts.mode { Mode::Standalone => None, - Mode::Distributed => Some(HeartbeatTask::new( + Mode::Distributed(_) => Some(HeartbeatTask::new( opts.node_id, /*node id not set*/ opts.rpc_addr.clone(), meta_client.as_ref().unwrap().clone(), diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 998e9adea6..d9b7de4219 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -8,6 +8,7 @@ api = { path = "../api" } async-stream = "0.3" async-trait = "0.1" catalog = { path = "../catalog" } +chrono = "0.4" client = { path = "../client" } common-base = { path = "../common/base" } common-error = { path = "../common/error" } @@ -49,7 +50,6 @@ features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc [dev-dependencies] datanode = { path = "../datanode" } -chrono = "0.4" futures = "0.3" meta-srv = { path = "../meta-srv", features = ["mock"] } tempdir = "0.3" diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index 0a22365b95..0aabbbb7f4 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -2,9 +2,7 @@ use std::any::Any; use std::collections::HashSet; use std::sync::Arc; -use catalog::error::{ - DeserializePartitionRuleSnafu, InvalidCatalogValueSnafu, InvalidSchemaInCatalogSnafu, -}; +use catalog::error::{InvalidCatalogValueSnafu, InvalidSchemaInCatalogSnafu}; use catalog::remote::{Kv, KvBackendRef}; use catalog::{ CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, RegisterSchemaRequest, @@ -17,7 +15,6 @@ use snafu::prelude::*; use table::TableRef; use crate::datanode::DatanodeClients; -use 
crate::partitioning::range::RangePartitionRule; use crate::table::route::TableRoutes; use crate::table::DistTable; @@ -40,6 +37,10 @@ impl FrontendCatalogManager { datanode_clients, } } + + pub(crate) fn backend(&self) -> KvBackendRef { + self.backend.clone() + } } // FIXME(hl): Frontend only needs a CatalogList, should replace with trait upcasting @@ -249,14 +250,6 @@ impl SchemaProvider for FrontendSchemaProvider { let val = TableGlobalValue::parse(String::from_utf8_lossy(&res.1)) .context(InvalidCatalogValueSnafu)?; - // TODO(hl): We need to deserialize string to PartitionRule trait object - let partition_rule: Arc = - Arc::new(serde_json::from_str(&val.partition_rules).context( - DeserializePartitionRuleSnafu { - data: &val.partition_rules, - }, - )?); - let table = Arc::new(DistTable { table_name, schema: Arc::new( @@ -265,7 +258,6 @@ impl SchemaProvider for FrontendSchemaProvider { .try_into() .context(InvalidSchemaInCatalogSnafu)?, ), - partition_rule, table_routes, datanode_clients, }); diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index e0faeb4806..dfe9be93ff 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -164,18 +164,24 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Failed access catalog: {}", source))] + #[snafu(display("General catalog error: {}", source))] Catalog { #[snafu(backtrace)] source: catalog::error::Error, }, - #[snafu(display("Failed to parse catalog entry: {}", source))] - ParseCatalogEntry { + #[snafu(display("Failed to serialize or deserialize catalog entry: {}", source))] + CatalogEntrySerde { #[snafu(backtrace)] source: common_catalog::error::Error, }, + #[snafu(display("Failed to start Meta client, source: {}", source))] + StartMetaClient { + #[snafu(backtrace)] + source: meta_client::error::Error, + }, + #[snafu(display("Failed to request Meta, source: {}", source))] RequestMeta { #[snafu(backtrace)] @@ -280,6 +286,63 @@ pub enum Error { #[snafu(backtrace)] source: sql::error::Error, }, + + #[snafu(display("Failed to find region routes for table {}", table_name))] + FindRegionRoutes { + table_name: String, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to serialize value to json, source: {}", source))] + SerializeJson { + source: serde_json::Error, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to deserialize value from json, source: {}", source))] + DeserializeJson { + source: serde_json::Error, + backtrace: Backtrace, + }, + + #[snafu(display( + "Failed to find leader peer for region {} in table {}", + region, + table_name + ))] + FindLeaderPeer { + region: u64, + table_name: String, + backtrace: Backtrace, + }, + + #[snafu(display( + "Failed to find partition info for region {} in table {}", + region, + table_name + ))] + FindRegionPartition { + region: u64, + table_name: String, + backtrace: Backtrace, + }, + + #[snafu(display( + "Illegal table routes data for table {}, error message: {}", + table_name, + err_msg + ))] + IllegalTableRoutesData { + table_name: String, + err_msg: String, + backtrace: Backtrace, + }, + + #[snafu(display("Invalid admin result, source: {}", source))] + InvalidAdminResult { + #[snafu(backtrace)] + source: client::Error, + }, } pub type Result = std::result::Result; @@ -287,8 +350,7 @@ pub type Result = std::result::Result; impl ErrorExt for Error { fn status_code(&self) -> StatusCode { match self { - Error::ConnectDatanode { .. } - | Error::ParseAddr { .. } + Error::ParseAddr { .. } | Error::InvalidSql { .. } | Error::FindRegion { .. 
} | Error::FindRegions { .. } @@ -311,12 +373,20 @@ impl ErrorExt for Error { Error::ConvertColumnDefaultConstraint { source, .. } | Error::ConvertScalarValue { source, .. } => source.status_code(), - Error::RequestDatanode { source } => source.status_code(), + Error::ConnectDatanode { source, .. } + | Error::RequestDatanode { source } + | Error::InvalidAdminResult { source } => source.status_code(), Error::ColumnDataType { .. } | Error::FindDatanode { .. } | Error::GetCache { .. } - | Error::FindTableRoutes { .. } => StatusCode::Internal, + | Error::FindTableRoutes { .. } + | Error::SerializeJson { .. } + | Error::DeserializeJson { .. } + | Error::FindRegionRoutes { .. } + | Error::FindLeaderPeer { .. } + | Error::FindRegionPartition { .. } + | Error::IllegalTableRoutesData { .. } => StatusCode::Internal, Error::IllegalFrontendState { .. } | Error::IncompleteGrpcResult { .. } => { StatusCode::Unexpected @@ -328,9 +398,11 @@ impl ErrorExt for Error { Error::JoinTask { .. } => StatusCode::Unexpected, Error::Catalog { source, .. } => source.status_code(), - Error::ParseCatalogEntry { source, .. } => source.status_code(), + Error::CatalogEntrySerde { source, .. } => source.status_code(), - Error::RequestMeta { source } => source.status_code(), + Error::StartMetaClient { source } | Error::RequestMeta { source } => { + source.status_code() + } Error::BumpTableId { source, .. } => source.status_code(), Error::SchemaNotFound { .. } => StatusCode::InvalidArguments, Error::CatalogNotFound { .. } => StatusCode::InvalidArguments, diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index 4ee927ef88..31519c0313 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -85,5 +85,6 @@ where #[serde(rename_all = "lowercase")] pub enum Mode { Standalone, - Distributed, + // with meta server's addr + Distributed(Vec), } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 8b66af9072..7fe470e07f 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -1,3 +1,4 @@ +pub(crate) mod distributed; mod influxdb; mod opentsdb; mod prometheus; @@ -26,6 +27,7 @@ use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_query::Output; use common_telemetry::{debug, error, info}; use datatypes::schema::ColumnSchema; +use distributed::DistInstance; use meta_client::client::MetaClientBuilder; use meta_client::MetaClientOpts; use servers::error as server_error; @@ -85,6 +87,8 @@ pub struct Instance { // Standalone and Distributed, then the code behind it doesn't need to use so // many match statements. mode: Mode, + // TODO(LFC): Refactor consideration: Can we split Frontend to DistInstance and EmbedInstance? 
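A note on the new `Mode` variant above: `Mode::Distributed` now carries the Meta
server addresses, which the Frontend CLI fills from a comma-separated
`--metasrv-addr` value (see the change in src/cmd/src/frontend.rs). A minimal
sketch of that mapping, using a hypothetical `parse_mode` helper that is not
part of this patch:

    fn parse_mode(metasrv_addr: Option<&str>) -> Mode {
        match metasrv_addr {
            // Split on ',' and trim whitespace around each address.
            Some(addrs) => Mode::Distributed(
                addrs.split(',').map(|a| a.trim().to_string()).collect(),
            ),
            // Without --metasrv-addr the node stays standalone.
            None => Mode::Standalone,
        }
    }

So "127.0.0.1:3002, 127.0.0.1:3003" becomes
Mode::Distributed(vec!["127.0.0.1:3002".to_string(), "127.0.0.1:3003".to_string()]).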
+ dist_instance: Option, } impl Default for Instance { @@ -94,19 +98,29 @@ impl Default for Instance { catalog_manager: None, table_id_provider: None, mode: Mode::Standalone, + dist_instance: None, } } } impl Instance { pub async fn try_new(opts: &FrontendOptions) -> Result { - let mut instance = Instance::default(); + let mut instance = Instance { + mode: opts.mode.clone(), + ..Default::default() + }; + let addr = opts.datanode_grpc_addr(); instance.client.start(vec![addr]); - let meta_client = match opts.mode { + instance.dist_instance = match &opts.mode { Mode::Standalone => None, - Mode::Distributed => { + Mode::Distributed(metasrv_addr) => { + info!( + "Creating Frontend instance in distributed mode with Meta server addr {:?}", + metasrv_addr + ); + let meta_config = MetaClientOpts::default(); let channel_config = ChannelConfig::new() .timeout(Duration::from_millis(meta_config.timeout_millis)) @@ -115,26 +129,36 @@ impl Instance { let channel_manager = ChannelManager::with_config(channel_config); - let meta_client = MetaClientBuilder::new(0, 0) + let mut meta_client = MetaClientBuilder::new(0, 0) .enable_router() .enable_store() .channel_manager(channel_manager) .build(); - Some(Arc::new(meta_client)) - } - }; + meta_client + .start(metasrv_addr) + .await + .context(error::StartMetaClientSnafu)?; + let meta_client = Arc::new(meta_client); - instance.catalog_manager = if let Some(meta_client) = meta_client { - let meta_backend = Arc::new(MetaKvBackend { - client: meta_client.clone(), - }); - let table_routes = Arc::new(TableRoutes::new(meta_client)); - let datanode_clients = Arc::new(DatanodeClients::new()); - let catalog_manager = - FrontendCatalogManager::new(meta_backend, table_routes, datanode_clients); - Some(Arc::new(catalog_manager)) - } else { - None + let meta_backend = Arc::new(MetaKvBackend { + client: meta_client.clone(), + }); + let table_routes = Arc::new(TableRoutes::new(meta_client.clone())); + let datanode_clients = Arc::new(DatanodeClients::new()); + let catalog_manager = FrontendCatalogManager::new( + meta_backend, + table_routes, + datanode_clients.clone(), + ); + + instance.catalog_manager = Some(Arc::new(catalog_manager.clone())); + + Some(DistInstance::new( + meta_client, + catalog_manager, + datanode_clients, + )) + } }; Ok(instance) } @@ -162,17 +186,14 @@ impl Instance { } /// Convert `CreateTable` statement to `CreateExpr` gRPC request. - async fn create_to_expr(&self, create: CreateTable) -> Result { + fn create_to_expr( + table_id: Option, + region_ids: Vec, + create: &CreateTable, + ) -> Result { let (catalog_name, schema_name, table_name) = table_idents_to_full_name(&create.name).context(error::ParseSqlSnafu)?; - let table_id = match &self.table_id_provider { - Some(provider) => Some(provider.next_table_id().await.context(BumpTableIdSnafu)?), - None => None, - }; - // FIXME(hl): Region id should be generated from metasrv - let region_ids = vec![0]; - let time_index = find_time_index(&create.constraints)?; let expr = CreateExpr { catalog_name: Some(catalog_name), @@ -184,7 +205,7 @@ impl Instance { primary_keys: find_primary_keys(&create.constraints)?, create_if_not_exists: create.if_not_exists, // TODO(LFC): Fill in other table options. 
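Since `create_to_expr` is now an associated function taking `table_id` and
`region_ids` from the caller, the two execution modes drive it differently; a
condensed sketch of the two call shapes (both appear later in this patch):

    // Standalone: the table id is bumped from the local TableIdProvider,
    // and everything lands in the single region 0.
    let expr = Instance::create_to_expr(table_id, vec![0], &create)?;

    // Distributed: the Meta server assigns the table id, and each Datanode
    // only receives the regions it leads.
    let expr = Instance::create_to_expr(
        Some(table_route.table.id as u32),
        table_route.find_leader_regions(&datanode),
        &create_table,
    )?;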
- table_options: HashMap::from([("engine".to_string(), create.engine)]), + table_options: HashMap::from([("engine".to_string(), create.engine.clone())]), table_id, region_ids, }; @@ -478,6 +499,7 @@ impl Instance { catalog_manager: Some(catalog), table_id_provider: None, mode: Mode::Standalone, + dist_instance: None, } } } @@ -526,7 +548,7 @@ impl SqlQueryHandler for Instance { .map_err(BoxedError::new) .context(server_error::ExecuteQuerySnafu { query }) } - Mode::Distributed => { + Mode::Distributed(_) => { let affected = self .sql_dist_insert(insert) .await @@ -538,15 +560,32 @@ impl SqlQueryHandler for Instance { } }, Statement::CreateTable(create) => { - let expr = self - .create_to_expr(create) - .await - .map_err(BoxedError::new) - .context(server_error::ExecuteQuerySnafu { query })?; - self.handle_create_table(expr) - .await - .map_err(BoxedError::new) - .context(server_error::ExecuteQuerySnafu { query }) + if let Some(dist_instance) = &self.dist_instance { + dist_instance + .create_table(&create) + .await + .map_err(BoxedError::new) + .context(server_error::ExecuteQuerySnafu { query }) + } else { + let table_id = match &self.table_id_provider { + Some(provider) => Some( + provider + .next_table_id() + .await + .context(BumpTableIdSnafu) + .map_err(BoxedError::new) + .context(server_error::ExecuteQuerySnafu { query })?, + ), + None => None, + }; + let expr = Self::create_to_expr(table_id, vec![0], &create) + .map_err(BoxedError::new) + .context(server_error::ExecuteQuerySnafu { query })?; + self.handle_create_table(expr) + .await + .map_err(BoxedError::new) + .context(server_error::ExecuteQuerySnafu { query }) + } } Statement::ShowDatabases(_) | Statement::ShowTables(_) => self diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs new file mode 100644 index 0000000000..ebd1117564 --- /dev/null +++ b/src/frontend/src/instance/distributed.rs @@ -0,0 +1,329 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use chrono::DateTime; +use client::admin::{admin_result_to_output, Admin}; +use common_catalog::{TableGlobalKey, TableGlobalValue}; +use common_query::Output; +use common_telemetry::debug; +use datatypes::schema::RawSchema; +use meta_client::client::MetaClient; +use meta_client::rpc::{ + CreateRequest as MetaCreateRequest, Partition as MetaPartition, RouteResponse, TableName, + TableRoute, +}; +use snafu::{ensure, OptionExt, ResultExt}; +use sql::statements::create::CreateTable; +use sql::statements::{ + column_def_to_schema, sql_data_type_to_concrete_data_type, sql_value_to_value, + table_idents_to_full_name, +}; +use sqlparser::ast::ColumnDef; +use sqlparser::ast::Value as SqlValue; +use table::metadata::RawTableMeta; + +use crate::catalog::FrontendCatalogManager; +use crate::datanode::DatanodeClients; +use crate::error::{self, Result}; +use crate::instance::{find_primary_keys, find_time_index, Instance}; +use crate::partitioning::{PartitionBound, PartitionDef}; + +#[derive(Clone)] +pub(crate) struct DistInstance { + meta_client: Arc, + catalog_manager: FrontendCatalogManager, + datanode_clients: Arc, +} + +impl DistInstance { + pub(crate) fn new( + meta_client: Arc, + catalog_manager: FrontendCatalogManager, + datanode_clients: Arc, + ) -> Self { + Self { + meta_client, + catalog_manager, + datanode_clients, + } + } + + pub(crate) async fn create_table(&self, create_table: &CreateTable) -> Result { + let response = self.create_table_in_meta(create_table).await?; + + let table_routes = response.table_routes; + ensure!( + 
table_routes.len() == 1, + error::FindTableRoutesSnafu { + table_name: create_table.name.to_string() + } + ); + let table_route = table_routes.first().unwrap(); + + let region_routes = &table_route.region_routes; + ensure!( + !region_routes.is_empty(), + error::FindRegionRoutesSnafu { + table_name: create_table.name.to_string() + } + ); + + self.put_table_global_meta(create_table, table_route) + .await?; + + for datanode in table_route.find_leaders() { + let client = self.datanode_clients.get_client(&datanode).await; + let client = Admin::new("greptime", client); + + let regions = table_route.find_leader_regions(&datanode); + let create_expr = Instance::create_to_expr( + Some(table_route.table.id as u32), + regions.clone(), + create_table, + )?; + debug!( + "Creating table {:?} on Datanode {:?} with regions {:?}", + create_table, datanode, regions, + ); + + client + .create(create_expr) + .await + .and_then(admin_result_to_output) + .context(error::InvalidAdminResultSnafu)?; + } + + Ok(Output::AffectedRows(region_routes.len())) + } + + async fn create_table_in_meta(&self, create_table: &CreateTable) -> Result { + let (catalog, schema, table) = + table_idents_to_full_name(&create_table.name).context(error::ParseSqlSnafu)?; + let table_name = TableName::new(catalog, schema, table); + + let partitions = parse_partitions(create_table)?; + + let request = MetaCreateRequest { + table_name, + partitions, + }; + self.meta_client + .create_route(request) + .await + .context(error::RequestMetaSnafu) + } + + // TODO(LFC): Maybe move this to FrontendCatalogManager's "register_table" method? + async fn put_table_global_meta( + &self, + create_table: &CreateTable, + table_route: &TableRoute, + ) -> Result<()> { + let table_name = &table_route.table.table_name; + let key = TableGlobalKey { + catalog_name: table_name.catalog_name.clone(), + schema_name: table_name.schema_name.clone(), + table_name: table_name.table_name.clone(), + }; + + let value = create_table_global_value(create_table, table_route)? + .as_bytes() + .context(error::CatalogEntrySerdeSnafu)?; + + self.catalog_manager + .backend() + .set(key.to_string().as_bytes(), &value) + .await + .context(error::CatalogSnafu) + } +} + +fn create_table_global_value( + create_table: &CreateTable, + table_route: &TableRoute, +) -> Result { + let table_name = &table_route.table.table_name; + + let region_routes = &table_route.region_routes; + let node_id = region_routes[0] + .leader_peer + .as_ref() + .context(error::FindLeaderPeerSnafu { + region: region_routes[0].region.id, + table_name: table_name.to_string(), + })? + .id; + + let mut column_schemas = Vec::with_capacity(create_table.columns.len()); + let time_index = find_time_index(&create_table.constraints)?; + for column in create_table.columns.iter() { + column_schemas.push( + column_def_to_schema(column, column.name.value == time_index) + .context(error::ParseSqlSnafu)?, + ); + } + let timestamp_index = column_schemas.iter().enumerate().find_map(|(i, c)| { + if c.name == time_index { + Some(i) + } else { + None + } + }); + let raw_schema = RawSchema { + column_schemas: column_schemas.clone(), + timestamp_index, + version: 0, + }; + + let primary_key_indices = find_primary_keys(&create_table.constraints)? 
+        .iter()
+        .map(|k| {
+            column_schemas
+                .iter()
+                .enumerate()
+                .find_map(|(i, c)| if &c.name == k { Some(i) } else { None })
+                .unwrap() // unwrap is safe because the primary key's column name must have been defined
+        })
+        .collect::<Vec<usize>>();
+
+    let meta = RawTableMeta {
+        schema: raw_schema,
+        primary_key_indices,
+        value_indices: vec![],
+        engine: create_table.engine.clone(),
+        next_column_id: column_schemas.len() as u32,
+        region_numbers: vec![],
+        engine_options: HashMap::new(),
+        options: HashMap::new(),
+        created_on: DateTime::default(),
+    };
+
+    Ok(TableGlobalValue {
+        id: table_route.table.id as u32,
+        node_id,
+        regions_id_map: HashMap::new(),
+        meta,
+    })
+}
+
+fn parse_partitions(create_table: &CreateTable) -> Result<Vec<MetaPartition>> {
+    // If partitions are not defined by the user, use the timestamp column (which must exist) as
+    // the partition column, and create only one partition.
+    let partition_columns = find_partition_columns(create_table)?;
+    let partition_entries = find_partition_entries(create_table, &partition_columns)?;
+
+    partition_entries
+        .into_iter()
+        .map(|x| PartitionDef::new(partition_columns.clone(), x).try_into())
+        .collect::<Result<Vec<MetaPartition>>>()
+}
+
+fn find_partition_entries(
+    create_table: &CreateTable,
+    partition_columns: &[String],
+) -> Result<Vec<Vec<PartitionBound>>> {
+    let entries = if let Some(partitions) = &create_table.partitions {
+        let column_defs = partition_columns
+            .iter()
+            .map(|pc| {
+                create_table
+                    .columns
+                    .iter()
+                    .find(|c| &c.name.value == pc)
+                    // unwrap is safe here because we have checked that partition columns are defined
+                    .unwrap()
+            })
+            .collect::<Vec<&ColumnDef>>();
+        let mut column_name_and_type = Vec::with_capacity(column_defs.len());
+        for column in column_defs {
+            let column_name = &column.name.value;
+            let data_type = sql_data_type_to_concrete_data_type(&column.data_type)
+                .context(error::ParseSqlSnafu)?;
+            column_name_and_type.push((column_name, data_type));
+        }
+
+        let mut entries = Vec::with_capacity(partitions.entries.len());
+        for e in partitions.entries.iter() {
+            let mut values = Vec::with_capacity(e.value_list.len());
+            for (i, v) in e.value_list.iter().enumerate() {
+                // indexing is safe here because we have checked that "value_list" and "column_list" match in size
+                let (column_name, data_type) = &column_name_and_type[i];
+                let v = match v {
+                    SqlValue::Number(n, _) if n == "MAXVALUE" => PartitionBound::MaxValue,
+                    _ => PartitionBound::Value(
+                        sql_value_to_value(column_name, data_type, v)
+                            .context(error::ParseSqlSnafu)?,
+                    ),
+                };
+                values.push(v);
+            }
+            entries.push(values);
+        }
+        entries
+    } else {
+        vec![vec![PartitionBound::MaxValue]]
+    };
+    Ok(entries)
+}
+
+fn find_partition_columns(create_table: &CreateTable) -> Result<Vec<String>> {
+    let columns = if let Some(partitions) = &create_table.partitions {
+        partitions
+            .column_list
+            .iter()
+            .map(|x| x.value.clone())
+            .collect::<Vec<String>>()
+    } else {
+        vec![find_time_index(&create_table.constraints)?]
+ }; + Ok(columns) +} + +#[cfg(test)] +mod test { + + use sql::parser::ParserContext; + use sql::statements::statement::Statement; + use sqlparser::dialect::GenericDialect; + + use super::*; + + #[test] + fn test_parse_partitions() { + let cases = [ + ( + r" +CREATE TABLE rcx ( a INT, b STRING, c INT ) +PARTITION BY RANGE COLUMNS (b) ( + PARTITION r0 VALUES LESS THAN ('hz'), + PARTITION r1 VALUES LESS THAN ('sh'), + PARTITION r2 VALUES LESS THAN (MAXVALUE), +) +ENGINE=mito", + r#"[{"column_list":"b","value_list":"{\"Value\":{\"String\":\"hz\"}}"},{"column_list":"b","value_list":"{\"Value\":{\"String\":\"sh\"}}"},{"column_list":"b","value_list":"\"MaxValue\""}]"#, + ), + ( + r" +CREATE TABLE rcx ( a INT, b STRING, c INT ) +PARTITION BY RANGE COLUMNS (b, a) ( + PARTITION r0 VALUES LESS THAN ('hz', 10), + PARTITION r1 VALUES LESS THAN ('sh', 20), + PARTITION r2 VALUES LESS THAN (MAXVALUE, MAXVALUE), +) +ENGINE=mito", + r#"[{"column_list":"b,a","value_list":"{\"Value\":{\"String\":\"hz\"}},{\"Value\":{\"Int32\":10}}"},{"column_list":"b,a","value_list":"{\"Value\":{\"String\":\"sh\"}},{\"Value\":{\"Int32\":20}}"},{"column_list":"b,a","value_list":"\"MaxValue\",\"MaxValue\""}]"#, + ), + ]; + for (sql, expected) in cases { + let result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap(); + match &result[0] { + Statement::CreateTable(c) => { + let partitions = parse_partitions(c).unwrap(); + let json = serde_json::to_string(&partitions).unwrap(); + assert_eq!(json, expected); + } + _ => unreachable!(), + } + } + } +} diff --git a/src/frontend/src/instance/influxdb.rs b/src/frontend/src/instance/influxdb.rs index 98bf71d699..4178ab0667 100644 --- a/src/frontend/src/instance/influxdb.rs +++ b/src/frontend/src/instance/influxdb.rs @@ -24,7 +24,7 @@ impl InfluxdbLineProtocolHandler for Instance { query: &request.lines, })?; } - Mode::Distributed => { + Mode::Distributed(_) => { self.dist_insert(request.try_into()?) 
.await .map_err(BoxedError::new) diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs index f4e5543e4c..24e89ae256 100644 --- a/src/frontend/src/instance/opentsdb.rs +++ b/src/frontend/src/instance/opentsdb.rs @@ -23,7 +23,7 @@ impl OpentsdbProtocolHandler for Instance { data_point: format!("{:?}", data_point), })?; } - Mode::Distributed => { + Mode::Distributed(_) => { self.dist_insert(vec![data_point.as_insert_request()]) .await .map_err(BoxedError::new) diff --git a/src/frontend/src/instance/prometheus.rs b/src/frontend/src/instance/prometheus.rs index 2f02387b06..e3dc41aa0d 100644 --- a/src/frontend/src/instance/prometheus.rs +++ b/src/frontend/src/instance/prometheus.rs @@ -107,7 +107,7 @@ impl PrometheusProtocolHandler for Instance { msg: "failed to write prometheus remote request", })?; } - Mode::Distributed => { + Mode::Distributed(_) => { let inserts = prometheus::write_request_to_insert_reqs(request)?; self.dist_insert(inserts) diff --git a/src/frontend/src/partitioning.rs b/src/frontend/src/partitioning.rs index 7809640256..a71cdb9888 100644 --- a/src/frontend/src/partitioning.rs +++ b/src/frontend/src/partitioning.rs @@ -1,18 +1,26 @@ -mod columns; +pub(crate) mod columns; pub(crate) mod range; +use std::any::Any; use std::fmt::Debug; use std::sync::Arc; pub use datafusion_expr::Operator; use datatypes::prelude::Value; +use meta_client::rpc::Partition as MetaPartition; +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; use store_api::storage::RegionNumber; +use crate::error::{self, Error}; + pub(crate) type PartitionRuleRef = Arc>; pub trait PartitionRule: Sync + Send { type Error: Debug; + fn as_any(&self) -> &dyn Any; + fn partition_columns(&self) -> Vec; // TODO(LFC): Unify `find_region` and `find_regions` methods when distributed read and write features are both merged into develop. @@ -23,14 +31,92 @@ pub trait PartitionRule: Sync + Send { } /// The right bound(exclusive) of partition range. 
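The new `as_any` method on `PartitionRule` exists so callers (currently the
tests in src/frontend/src/table.rs) can recover the concrete rule type from a
trait object. A minimal sketch, assuming a `rule: PartitionRuleRef<Error>`
built elsewhere:

    // Downcast the trait object back to the concrete RangePartitionRule.
    let range_rule = rule
        .as_any()
        .downcast_ref::<RangePartitionRule>()
        .expect("not a RangePartitionRule");
    assert_eq!(range_rule.column_name(), "a");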
-#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] -enum PartitionBound { +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] +pub(crate) enum PartitionBound { Value(Value), - // FIXME(LFC): no allow, for clippy temporarily - #[allow(dead_code)] MaxValue, } +#[derive(Debug)] +pub(crate) struct PartitionDef { + partition_columns: Vec, + partition_bounds: Vec, +} + +impl PartitionDef { + pub(crate) fn new( + partition_columns: Vec, + partition_bounds: Vec, + ) -> Self { + Self { + partition_columns, + partition_bounds, + } + } + + pub(crate) fn partition_columns(&self) -> &Vec { + &self.partition_columns + } + + pub(crate) fn partition_bounds(&self) -> &Vec { + &self.partition_bounds + } +} + +impl TryFrom for PartitionDef { + type Error = Error; + + fn try_from(partition: MetaPartition) -> Result { + let MetaPartition { + column_list, + value_list, + } = partition; + + let partition_columns = column_list + .into_iter() + .map(|x| String::from_utf8_lossy(&x).to_string()) + .collect::>(); + + let partition_bounds = value_list + .into_iter() + .map(|x| serde_json::from_str(&String::from_utf8_lossy(&x))) + .collect::, serde_json::Error>>() + .context(error::DeserializeJsonSnafu)?; + + Ok(PartitionDef { + partition_columns, + partition_bounds, + }) + } +} + +impl TryFrom for MetaPartition { + type Error = Error; + + fn try_from(partition: PartitionDef) -> Result { + let PartitionDef { + partition_columns: columns, + partition_bounds: bounds, + } = partition; + + let column_list = columns + .into_iter() + .map(|x| x.into_bytes()) + .collect::>>(); + + let value_list = bounds + .into_iter() + .map(|x| serde_json::to_string(&x).map(|s| s.into_bytes())) + .collect::>, serde_json::Error>>() + .context(error::SerializeJsonSnafu)?; + + Ok(MetaPartition { + column_list, + value_list, + }) + } +} + #[derive(Debug, PartialEq, Eq)] pub struct PartitionExpr { column: String, @@ -56,6 +142,44 @@ impl PartitionExpr { mod tests { use super::*; + #[test] + fn test_partition_def() { + // PartitionDef -> MetaPartition + let def = PartitionDef { + partition_columns: vec!["a".to_string(), "b".to_string()], + partition_bounds: vec![ + PartitionBound::MaxValue, + PartitionBound::Value(1_i32.into()), + ], + }; + let partition: MetaPartition = def.try_into().unwrap(); + assert_eq!( + r#"{"column_list":"a,b","value_list":"\"MaxValue\",{\"Value\":{\"Int32\":1}}"}"#, + serde_json::to_string(&partition).unwrap(), + ); + + // MetaPartition -> PartitionDef + let partition = MetaPartition { + column_list: vec![b"a".to_vec(), b"b".to_vec()], + value_list: vec![ + b"\"MaxValue\"".to_vec(), + b"{\"Value\":{\"Int32\":1}}".to_vec(), + ], + }; + let def: PartitionDef = partition.try_into().unwrap(); + assert_eq!( + def.partition_columns, + vec!["a".to_string(), "b".to_string()] + ); + assert_eq!( + def.partition_bounds, + vec![ + PartitionBound::MaxValue, + PartitionBound::Value(1_i32.into()) + ] + ); + } + #[test] fn test_partition_bound() { let b1 = PartitionBound::Value(1_i32.into()); diff --git a/src/frontend/src/partitioning/columns.rs b/src/frontend/src/partitioning/columns.rs index 5305483a69..ad5194f0dc 100644 --- a/src/frontend/src/partitioning/columns.rs +++ b/src/frontend/src/partitioning/columns.rs @@ -1,3 +1,5 @@ +use std::any::Any; + use datafusion_expr::Operator; use datatypes::value::Value; use snafu::ensure; @@ -30,7 +32,7 @@ use crate::partitioning::{PartitionBound, PartitionExpr, PartitionRule}; /// /// Please refer to MySQL's ["RANGE COLUMNS 
Partitioning"](https://dev.mysql.com/doc/refman/8.0/en/partitioning-columns-range.html) /// document for more details. -struct RangeColumnsPartitionRule { +pub struct RangeColumnsPartitionRule { column_list: Vec, value_lists: Vec>, regions: Vec, @@ -61,9 +63,7 @@ struct RangeColumnsPartitionRule { impl RangeColumnsPartitionRule { // It's assured that input arguments are valid because they are checked in SQL parsing stage. // So we can skip validating them. - // FIXME(LFC): no allow, for clippy temporarily - #[allow(dead_code)] - fn new( + pub(crate) fn new( column_list: Vec, value_lists: Vec>, regions: Vec, @@ -108,11 +108,30 @@ impl RangeColumnsPartitionRule { first_column_regions, } } + + #[cfg(test)] + pub(crate) fn column_list(&self) -> &Vec { + &self.column_list + } + + #[cfg(test)] + pub(crate) fn value_lists(&self) -> &Vec> { + &self.value_lists + } + + #[cfg(test)] + pub(crate) fn regions(&self) -> &Vec { + &self.regions + } } impl PartitionRule for RangeColumnsPartitionRule { type Error = Error; + fn as_any(&self) -> &dyn Any { + self + } + fn partition_columns(&self) -> Vec { self.column_list.clone() } diff --git a/src/frontend/src/partitioning/range.rs b/src/frontend/src/partitioning/range.rs index 69119bc29c..518c2f9d3c 100644 --- a/src/frontend/src/partitioning/range.rs +++ b/src/frontend/src/partitioning/range.rs @@ -1,3 +1,5 @@ +use std::any::Any; + use datatypes::prelude::*; use serde::{Deserialize, Serialize}; use snafu::OptionExt; @@ -54,8 +56,6 @@ pub struct RangePartitionRule { } impl RangePartitionRule { - // FIXME(LFC): no allow, for clippy temporarily - #[allow(dead_code)] pub(crate) fn new( column_name: impl Into, bounds: Vec, @@ -68,24 +68,44 @@ impl RangePartitionRule { } } - fn column_name(&self) -> &String { + pub(crate) fn column_name(&self) -> &String { &self.column_name } - fn all_regions(&self) -> &Vec { + pub(crate) fn all_regions(&self) -> &Vec { &self.regions } + + #[cfg(test)] + pub(crate) fn bounds(&self) -> &Vec { + &self.bounds + } } impl PartitionRule for RangePartitionRule { type Error = Error; + fn as_any(&self) -> &dyn Any { + self + } + fn partition_columns(&self) -> Vec { vec![self.column_name().to_string()] } - fn find_region(&self, _values: &[Value]) -> Result { - unimplemented!() + fn find_region(&self, values: &[Value]) -> Result { + debug_assert_eq!( + values.len(), + 1, + "RangePartitionRule can only handle one partition value, actual {}", + values.len() + ); + let value = &values[0]; + + Ok(match self.bounds.binary_search(value) { + Ok(i) => self.regions[i + 1], + Err(i) => self.regions[i], + }) } fn find_regions(&self, exprs: &[PartitionExpr]) -> Result, Self::Error> { diff --git a/src/frontend/src/spliter.rs b/src/frontend/src/spliter.rs index b4e8db0d4f..f0db03209e 100644 --- a/src/frontend/src/spliter.rs +++ b/src/frontend/src/spliter.rs @@ -158,6 +158,7 @@ fn partition_insert_request( #[cfg(test)] mod tests { + use std::any::Any; use std::{collections::HashMap, result::Result, sync::Arc}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; @@ -422,6 +423,10 @@ mod tests { impl PartitionRule for MockPartitionRule { type Error = Error; + fn as_any(&self) -> &dyn Any { + self + } + fn partition_columns(&self) -> Vec { vec!["id".to_string()] } diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 6c8a8d253d..1ce458aa28 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -13,6 +13,7 @@ use common_recordbatch::{RecordBatches, SendableRecordBatchStream}; use 
datafusion::execution::runtime_env::RuntimeEnv; use datafusion::logical_plan::Expr as DfExpr; use datafusion::physical_plan::Partitioning; +use datatypes::prelude::Value; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use meta_client::rpc::{Peer, TableName}; use snafu::prelude::*; @@ -26,7 +27,11 @@ use tokio::sync::RwLock; use crate::datanode::DatanodeClients; use crate::error::{self, Error, Result}; use crate::mock::{DatanodeInstance, TableScanPlan}; -use crate::partitioning::{Operator, PartitionExpr, PartitionRuleRef}; +use crate::partitioning::columns::RangeColumnsPartitionRule; +use crate::partitioning::range::RangePartitionRule; +use crate::partitioning::{ + Operator, PartitionBound, PartitionDef, PartitionExpr, PartitionRuleRef, +}; use crate::spliter::WriteSpliter; use crate::table::route::TableRoutes; pub mod insert; @@ -35,7 +40,6 @@ pub mod insert; pub struct DistTable { pub(crate) table_name: TableName, pub(crate) schema: SchemaRef, - pub(crate) partition_rule: PartitionRuleRef, pub(crate) table_routes: Arc, pub(crate) datanode_clients: Arc, } @@ -55,7 +59,9 @@ impl Table for DistTable { } async fn insert(&self, request: InsertRequest) -> table::Result { - let spliter = WriteSpliter::with_patition_rule(self.partition_rule.clone()); + let partition_rule = self.find_partition_rule().await.map_err(TableError::new)?; + + let spliter = WriteSpliter::with_patition_rule(partition_rule); let inserts = spliter.split(request).map_err(TableError::new)?; let result = match self.dist_insert(inserts).await.map_err(TableError::new)? { client::ObjectResult::Select(_) => unreachable!(), @@ -70,7 +76,11 @@ impl Table for DistTable { filters: &[Expr], limit: Option, ) -> table::Result { - let regions = self.find_regions(filters).map_err(TableError::new)?; + let partition_rule = self.find_partition_rule().await.map_err(TableError::new)?; + + let regions = self + .find_regions(partition_rule, filters) + .map_err(TableError::new)?; let datanodes = self .find_datanodes(regions) .await @@ -107,11 +117,15 @@ impl Table for DistTable { impl DistTable { // TODO(LFC): Finding regions now seems less efficient, should be further looked into. - fn find_regions(&self, filters: &[Expr]) -> Result> { + fn find_regions( + &self, + partition_rule: PartitionRuleRef, + filters: &[Expr], + ) -> Result> { let regions = if let Some((first, rest)) = filters.split_first() { - let mut target = self.find_regions0(first)?; + let mut target = self.find_regions0(partition_rule.clone(), first)?; for filter in rest { - let regions = self.find_regions0(filter)?; + let regions = self.find_regions0(partition_rule.clone(), filter)?; // When all filters are provided as a collection, it often implicitly states that // "all filters must be satisfied". So we join all the results here. @@ -124,7 +138,7 @@ impl DistTable { } target.into_iter().collect::>() } else { - self.partition_rule.find_regions(&[])? + partition_rule.find_regions(&[])? }; ensure!( !regions.is_empty(), @@ -139,7 +153,11 @@ impl DistTable { // - BETWEEN and IN (maybe more) // - expr with arithmetic like "a + 1 < 10" (should have been optimized in logic plan?) 
// - not comparison or neither "AND" nor "OR" operations, for example, "a LIKE x" - fn find_regions0(&self, filter: &Expr) -> Result> { + fn find_regions0( + &self, + partition_rule: PartitionRuleRef, + filter: &Expr, + ) -> Result> { let expr = filter.df_expr(); match expr { DfExpr::BinaryExpr { left, op, right } if is_compare_op(op) => { @@ -155,8 +173,7 @@ impl DistTable { .clone() .try_into() .with_context(|_| error::ConvertScalarValueSnafu { value: sv.clone() })?; - return Ok(self - .partition_rule + return Ok(partition_rule .find_regions(&[PartitionExpr::new(column, op, value)])? .into_iter() .collect::>()); @@ -165,8 +182,10 @@ impl DistTable { DfExpr::BinaryExpr { left, op, right } if matches!(op, Operator::And | Operator::Or) => { - let left_regions = self.find_regions0(&(*left.clone()).into())?; - let right_regions = self.find_regions0(&(*right.clone()).into())?; + let left_regions = + self.find_regions0(partition_rule.clone(), &(*left.clone()).into())?; + let right_regions = + self.find_regions0(partition_rule.clone(), &(*right.clone()).into())?; let regions = match op { Operator::And => left_regions .intersection(&right_regions) @@ -184,8 +203,7 @@ impl DistTable { } // Returns all regions for not supported partition expr as a safety hatch. - Ok(self - .partition_rule + Ok(partition_rule .find_regions(&[])? .into_iter() .collect::>()) @@ -217,6 +235,85 @@ impl DistTable { } Ok(datanodes) } + + async fn find_partition_rule(&self) -> Result> { + let route = self.table_routes.get_route(&self.table_name).await?; + ensure!( + !route.region_routes.is_empty(), + error::FindRegionRoutesSnafu { + table_name: self.table_name.to_string() + } + ); + + let mut partitions = Vec::with_capacity(route.region_routes.len()); + for r in route.region_routes.iter() { + let partition = + r.region + .partition + .clone() + .context(error::FindRegionPartitionSnafu { + region: r.region.id, + table_name: self.table_name.to_string(), + })?; + let partition_def: PartitionDef = partition.try_into()?; + partitions.push((r.region.id, partition_def)); + } + partitions.sort_by(|a, b| a.1.partition_bounds().cmp(b.1.partition_bounds())); + + ensure!( + partitions + .windows(2) + .all(|w| w[0].1.partition_columns() == w[1].1.partition_columns()), + error::IllegalTableRoutesDataSnafu { + table_name: self.table_name.to_string(), + err_msg: "partition columns of all regions are not the same" + } + ); + let partition_columns = partitions[0].1.partition_columns(); + ensure!( + !partition_columns.is_empty(), + error::IllegalTableRoutesDataSnafu { + table_name: self.table_name.to_string(), + err_msg: "no partition columns found" + } + ); + + let regions = partitions + .iter() + .map(|x| x.0 as u32) + .collect::>(); + + // TODO(LFC): Serializing and deserializing partition rule is ugly, must find a much more elegant way. + let partition_rule: PartitionRuleRef = match partition_columns.len() { + 1 => { + // Omit the last "MAXVALUE". 
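To make the single-column branch concrete: the bounds construction just below
keeps only the finite bounds (dropping the trailing MAXVALUE), so a table
created with VALUES LESS THAN (10), (50), (MAXVALUE) on column "a", whose
sorted regions are 3, 2, 1, reconstructs as bounds [10, 50] over regions
[3, 2, 1]. The new `find_region` in src/frontend/src/partitioning/range.rs then
resolves values by binary search on those exclusive upper bounds; a worked
sketch:

    let rule = RangePartitionRule::new(
        "a",
        vec![Value::from(10_i32), Value::from(50_i32)],
        vec![3_u32, 2, 1],
    );
    // a = 9  -> Err(0) -> regions[0] = 3  (a < 10)
    // a = 10 -> Ok(0)  -> regions[1] = 2  (bounds are exclusive on the right)
    // a = 99 -> Err(2) -> regions[2] = 1  (a >= 50)
    assert_eq!(rule.find_region(&[Value::from(10_i32)]).unwrap(), 2);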
+ let bounds = partitions + .iter() + .filter_map(|(_, p)| match &p.partition_bounds()[0] { + PartitionBound::Value(v) => Some(v.clone()), + PartitionBound::MaxValue => None, + }) + .collect::>(); + Arc::new(RangePartitionRule::new( + partition_columns[0].clone(), + bounds, + regions, + )) as _ + } + _ => { + let bounds = partitions + .iter() + .map(|x| x.1.partition_bounds().clone()) + .collect::>>(); + Arc::new(RangeColumnsPartitionRule::new( + partition_columns.clone(), + bounds, + regions, + )) as _ + } + }; + Ok(partition_rule) + } } fn project_schema(table_schema: SchemaRef, projection: &Option>) -> SchemaRef { @@ -337,11 +434,11 @@ impl PartitionExec { #[allow(clippy::print_stdout)] #[cfg(test)] mod test { - use api::v1::meta::{PutRequest, RequestHeader}; - use catalog::RegisterTableRequest; - use chrono::DateTime; - use common_catalog::{TableGlobalKey, TableGlobalValue}; - use common_recordbatch::{util, RecordBatch}; + use api::v1::codec::InsertBatch; + use api::v1::column::SemanticType; + use api::v1::{column, insert_expr, Column, ColumnDataType}; + use catalog::remote::MetaKvBackend; + use common_recordbatch::util; use datafusion::arrow_print; use datafusion_common::record_batch::RecordBatch as DfRecordBatch; use datafusion_expr::expr_fn::col; @@ -349,24 +446,214 @@ mod test { use datafusion_expr::lit; use datanode::datanode::{DatanodeOptions, ObjectStoreConfig}; use datanode::instance::Instance; - use datatypes::prelude::{ConcreteDataType, VectorRef}; - use datatypes::schema::{ColumnSchema, RawSchema, Schema}; - use datatypes::vectors::{Int32Vector, UInt32Vector, UInt64Vector}; - use meta_client::client::MetaClientBuilder; - use meta_client::rpc::{CreateRequest, Partition}; + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::{ColumnSchema, Schema}; + use meta_client::client::{MetaClient, MetaClientBuilder}; + use meta_client::rpc::router::RegionRoute; + use meta_client::rpc::{Region, Table, TableRoute}; use meta_srv::metasrv::MetaSrvOptions; use meta_srv::mocks::MockInfo; use meta_srv::service::store::kv::KvStoreRef; use meta_srv::service::store::memory::MemStore; - use table::metadata::RawTableMeta; - use table::test_util::MemTable; + use sql::parser::ParserContext; + use sql::statements::statement::Statement; + use sqlparser::dialect::GenericDialect; use table::TableRef; use tempdir::TempDir; use super::*; + use crate::catalog::FrontendCatalogManager; + use crate::instance::distributed::DistInstance; use crate::partitioning::range::RangePartitionRule; #[tokio::test(flavor = "multi_thread")] + async fn test_find_partition_rule() { + let table_name = TableName::new("greptime", "public", "foo"); + + let column_schemas = vec![ + ColumnSchema::new("ts", ConcreteDataType::uint64_datatype(), false), + ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true), + ColumnSchema::new("b", ConcreteDataType::string_datatype(), true), + ]; + let schema = Arc::new(Schema::new(column_schemas.clone())); + + let table_routes = Arc::new(TableRoutes::new(Arc::new(MetaClient::default()))); + let table = DistTable { + table_name: table_name.clone(), + schema, + table_routes: table_routes.clone(), + datanode_clients: Arc::new(DatanodeClients::new()), + }; + + let table_route = TableRoute { + table: Table { + id: 1, + table_name: table_name.clone(), + table_schema: vec![], + }, + region_routes: vec![ + RegionRoute { + region: Region { + id: 3, + name: "r1".to_string(), + partition: Some( + PartitionDef::new( + vec!["a".to_string()], + vec![PartitionBound::Value(10_i32.into())], 
+ ) + .try_into() + .unwrap(), + ), + attrs: HashMap::new(), + }, + leader_peer: None, + follower_peers: vec![], + }, + RegionRoute { + region: Region { + id: 2, + name: "r2".to_string(), + partition: Some( + PartitionDef::new( + vec!["a".to_string()], + vec![PartitionBound::Value(50_i32.into())], + ) + .try_into() + .unwrap(), + ), + attrs: HashMap::new(), + }, + leader_peer: None, + follower_peers: vec![], + }, + RegionRoute { + region: Region { + id: 1, + name: "r3".to_string(), + partition: Some( + PartitionDef::new( + vec!["a".to_string()], + vec![PartitionBound::MaxValue], + ) + .try_into() + .unwrap(), + ), + attrs: HashMap::new(), + }, + leader_peer: None, + follower_peers: vec![], + }, + ], + }; + table_routes + .insert_table_route(table_name.clone(), Arc::new(table_route)) + .await; + + let partition_rule = table.find_partition_rule().await.unwrap(); + let range_rule = partition_rule + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(range_rule.column_name(), "a"); + assert_eq!(range_rule.all_regions(), &vec![3, 2, 1]); + assert_eq!(range_rule.bounds(), &vec![10_i32.into(), 50_i32.into()]); + + let table_route = TableRoute { + table: Table { + id: 1, + table_name: table_name.clone(), + table_schema: vec![], + }, + region_routes: vec![ + RegionRoute { + region: Region { + id: 1, + name: "r1".to_string(), + partition: Some( + PartitionDef::new( + vec!["a".to_string(), "b".to_string()], + vec![ + PartitionBound::Value(10_i32.into()), + PartitionBound::Value("hz".into()), + ], + ) + .try_into() + .unwrap(), + ), + attrs: HashMap::new(), + }, + leader_peer: None, + follower_peers: vec![], + }, + RegionRoute { + region: Region { + id: 2, + name: "r2".to_string(), + partition: Some( + PartitionDef::new( + vec!["a".to_string(), "b".to_string()], + vec![ + PartitionBound::Value(50_i32.into()), + PartitionBound::Value("sh".into()), + ], + ) + .try_into() + .unwrap(), + ), + attrs: HashMap::new(), + }, + leader_peer: None, + follower_peers: vec![], + }, + RegionRoute { + region: Region { + id: 3, + name: "r3".to_string(), + partition: Some( + PartitionDef::new( + vec!["a".to_string(), "b".to_string()], + vec![PartitionBound::MaxValue, PartitionBound::MaxValue], + ) + .try_into() + .unwrap(), + ), + attrs: HashMap::new(), + }, + leader_peer: None, + follower_peers: vec![], + }, + ], + }; + table_routes + .insert_table_route(table_name.clone(), Arc::new(table_route)) + .await; + + let partition_rule = table.find_partition_rule().await.unwrap(); + let range_columns_rule = partition_rule + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(range_columns_rule.column_list(), &vec!["a", "b"]); + assert_eq!( + range_columns_rule.value_lists(), + &vec![ + vec![ + PartitionBound::Value(10_i32.into()), + PartitionBound::Value("hz".into()), + ], + vec![ + PartitionBound::Value(50_i32.into()), + PartitionBound::Value("sh".into()), + ], + vec![PartitionBound::MaxValue, PartitionBound::MaxValue] + ] + ); + assert_eq!(range_columns_rule.regions(), &vec![1, 2, 3]); + } + + #[tokio::test(flavor = "multi_thread")] + // FIXME(LFC): Remove ignore when auto create table upon insertion is ready. 
+ #[ignore] async fn test_dist_table_scan() { common_telemetry::init_default_ut_logging(); let table = Arc::new(new_dist_table().await); @@ -452,28 +739,21 @@ mod test { ]; let schema = Arc::new(Schema::new(column_schemas.clone())); - // PARTITION BY RANGE (a) ( - // PARTITION r1 VALUES LESS THAN (10), - // PARTITION r2 VALUES LESS THAN (20), - // PARTITION r3 VALUES LESS THAN (50), - // PARTITION r4 VALUES LESS THAN (MAXVALUE), - // ) - let partition_rule = RangePartitionRule::new( - "a", - vec![10_i32.into(), 20_i32.into(), 50_i32.into()], - vec![0_u32, 1, 2, 3], - ); - let kv_store: KvStoreRef = Arc::new(MemStore::default()) as _; let meta_srv = meta_srv::mocks::mock(MetaSrvOptions::default(), kv_store.clone(), None).await; + let datanode_clients = Arc::new(DatanodeClients::new()); + let mut datanode_instances = HashMap::new(); for datanode_id in 1..=4 { - datanode_instances.insert( - datanode_id, - create_datanode_instance(datanode_id, meta_srv.clone()).await, - ); + let dn_instance = create_datanode_instance(datanode_id, meta_srv.clone()).await; + datanode_instances.insert(datanode_id, dn_instance.clone()); + + let (addr, client) = crate::tests::create_datanode_client(dn_instance).await; + datanode_clients + .insert_client(Peer::new(datanode_id, addr), client) + .await; } let MockInfo { @@ -489,29 +769,47 @@ mod test { let meta_client = Arc::new(meta_client); let table_name = TableName::new("greptime", "public", "dist_numbers"); - let create_request = CreateRequest { - table_name: table_name.clone(), - partitions: vec![ - Partition { - column_list: vec![b"a".to_vec()], - value_list: vec![b"10".to_vec()], - }, - Partition { - column_list: vec![b"a".to_vec()], - value_list: vec![b"20".to_vec()], - }, - Partition { - column_list: vec![b"a".to_vec()], - value_list: vec![b"50".to_vec()], - }, - Partition { - column_list: vec![b"a".to_vec()], - value_list: vec![b"MAXVALUE".to_vec()], - }, - ], + + let meta_backend = Arc::new(MetaKvBackend { + client: meta_client.clone(), + }); + let table_routes = Arc::new(TableRoutes::new(meta_client.clone())); + let catalog_manager = FrontendCatalogManager::new( + meta_backend, + table_routes.clone(), + datanode_clients.clone(), + ); + let dist_instance = DistInstance::new( + meta_client.clone(), + catalog_manager, + datanode_clients.clone(), + ); + + let sql = " + CREATE TABLE greptime.public.dist_numbers ( + ts BIGINT, + a INT, + row_id INT, + TIME INDEX (ts), + ) + PARTITION BY RANGE COLUMNS (a) ( + PARTITION r0 VALUES LESS THAN (10), + PARTITION r1 VALUES LESS THAN (20), + PARTITION r2 VALUES LESS THAN (50), + PARTITION r3 VALUES LESS THAN (MAXVALUE), + ) + ENGINE=mito"; + let create_table = match ParserContext::create_with_dialect(sql, &GenericDialect {}) + .unwrap() + .pop() + .unwrap() + { + Statement::CreateTable(c) => c, + _ => unreachable!(), }; - let mut route_response = meta_client.create_route(create_request).await.unwrap(); - let table_route = route_response.table_routes.remove(0); + let _result = dist_instance.create_table(&create_table).await.unwrap(); + + let table_route = table_routes.get_route(&table_name).await.unwrap(); println!("{}", serde_json::to_string_pretty(&table_route).unwrap()); let mut region_to_datanode_mapping = HashMap::new(); @@ -521,50 +819,6 @@ mod test { region_to_datanode_mapping.insert(region_id, datanode_id); } - let table_global_key = TableGlobalKey { - catalog_name: table_name.catalog_name.clone(), - schema_name: table_name.schema_name.clone(), - table_name: table_name.table_name.clone(), - }; - let 
table_global_value = TableGlobalValue { - id: table_route.table.id as u32, - node_id: table_route - .region_routes - .first() - .unwrap() - .leader_peer - .as_ref() - .unwrap() - .id, - regions_id_map: HashMap::new(), - meta: RawTableMeta { - schema: RawSchema { - column_schemas: column_schemas.clone(), - timestamp_index: Some(0), - version: 0, - }, - primary_key_indices: vec![], - value_indices: vec![], - engine: "".to_string(), - next_column_id: column_schemas.len() as u32, - region_numbers: vec![], - engine_options: HashMap::new(), - options: HashMap::new(), - created_on: DateTime::default(), - }, - partition_rules: serde_json::to_string(&partition_rule).unwrap(), - }; - let _put_response = kv_store - .put(PutRequest { - header: Some(RequestHeader::new((1000, 0))), - key: table_global_key.to_string().as_bytes().to_vec(), - value: table_global_value.as_bytes().unwrap(), - prev_kv: true, - }) - .await - .unwrap(); - - let datanode_clients = Arc::new(DatanodeClients::new()); let mut global_start_ts = 1; let regional_numbers = vec![ (0, (0..5).collect::>()), @@ -577,45 +831,70 @@ mod test { let instance = datanode_instances.get(&datanode_id).unwrap().clone(); let start_ts = global_start_ts; - global_start_ts += numbers.len() as u64; + global_start_ts += numbers.len() as i64; - let table = new_memtable(schema.clone(), numbers, vec![region_id], start_ts); - register_datanode_table(instance.clone(), table).await; - - let (addr, client) = crate::tests::create_datanode_client(instance).await; - datanode_clients - .insert_client(Peer::new(datanode_id, addr), client) - .await; + insert_testing_data(&table_name, instance.clone(), numbers, start_ts).await; } DistTable { table_name, schema, - partition_rule: Arc::new(partition_rule), - table_routes: Arc::new(TableRoutes::new(meta_client)), + table_routes, datanode_clients, } } - fn new_memtable( - schema: SchemaRef, + async fn insert_testing_data( + table_name: &TableName, + dn_instance: Arc, data: Vec, - regions: Vec, - start_ts: u64, - ) -> MemTable { + start_ts: i64, + ) { let rows = data.len() as u32; - let columns: Vec = vec![ - // column "ts" - Arc::new(UInt64Vector::from_slice( - (start_ts..start_ts + rows as u64).collect::>(), - )), - // column "a" - Arc::new(Int32Vector::from_slice(data)), - // column "row_id" - Arc::new(UInt32Vector::from_slice((1..=rows).collect::>())), - ]; - let recordbatch = RecordBatch::new(schema, columns).unwrap(); - MemTable::new_with_region("dist_numbers", recordbatch, regions) + let values = vec![InsertBatch { + columns: vec![ + Column { + column_name: "ts".to_string(), + values: Some(column::Values { + i64_values: (start_ts..start_ts + rows as i64).collect::>(), + ..Default::default() + }), + datatype: ColumnDataType::Int64 as i32, + semantic_type: SemanticType::Timestamp as i32, + ..Default::default() + }, + Column { + column_name: "a".to_string(), + values: Some(column::Values { + i32_values: data, + ..Default::default() + }), + datatype: ColumnDataType::Int32 as i32, + ..Default::default() + }, + Column { + column_name: "row_id".to_string(), + values: Some(column::Values { + i32_values: (1..=rows as i32).collect::>(), + ..Default::default() + }), + datatype: ColumnDataType::Int32 as i32, + ..Default::default() + }, + ], + row_count: rows, + } + .into()]; + let values = insert_expr::Values { values }; + dn_instance + .execute_grpc_insert( + &table_name.catalog_name, + &table_name.schema_name, + &table_name.table_name, + values, + ) + .await + .unwrap(); } async fn create_datanode_instance(datanode_id: u64, 
meta_srv: MockInfo) -> Arc { @@ -642,25 +921,36 @@ mod test { instance } - async fn register_datanode_table(instance: Arc, table: MemTable) { - let catalog_manager = instance.catalog_manager().clone(); - let _ = catalog_manager - .register_table(RegisterTableRequest { - catalog: "greptime".to_string(), - schema: "public".to_string(), - table_name: table.table_name().to_string(), - table_id: 1234, - table: Arc::new(table), - }) - .await; - } - #[tokio::test(flavor = "multi_thread")] async fn test_find_regions() { - let table = new_dist_table().await; + let schema = Arc::new(Schema::new(vec![ColumnSchema::new( + "a", + ConcreteDataType::int32_datatype(), + true, + )])); + let table = DistTable { + table_name: TableName::new("greptime", "public", "foo"), + schema, + table_routes: Arc::new(TableRoutes::new(Arc::new(MetaClient::default()))), + datanode_clients: Arc::new(DatanodeClients::new()), + }; + + // PARTITION BY RANGE (a) ( + // PARTITION r1 VALUES LESS THAN (10), + // PARTITION r2 VALUES LESS THAN (20), + // PARTITION r3 VALUES LESS THAN (50), + // PARTITION r4 VALUES LESS THAN (MAXVALUE), + // ) + let partition_rule: PartitionRuleRef = Arc::new(RangePartitionRule::new( + "a", + vec![10_i32.into(), 20_i32.into(), 50_i32.into()], + vec![0_u32, 1, 2, 3], + )) as _; let test = |filters: Vec, expect_regions: Vec| { - let mut regions = table.find_regions(filters.as_slice()).unwrap(); + let mut regions = table + .find_regions(partition_rule.clone(), filters.as_slice()) + .unwrap(); regions.sort(); assert_eq!(regions, expect_regions); @@ -750,6 +1040,7 @@ mod test { // test failed to find regions by contradictory filters let regions = table.find_regions( + partition_rule, vec![and( binary_expr(col("a"), Operator::Lt, lit(20)), binary_expr(col("a"), Operator::GtEq, lit(20)), diff --git a/src/frontend/src/table/route.rs b/src/frontend/src/table/route.rs index f058b3cb9a..051e73d4cf 100644 --- a/src/frontend/src/table/route.rs +++ b/src/frontend/src/table/route.rs @@ -53,4 +53,13 @@ impl TableRoutes { let route = resp.table_routes.swap_remove(0); Ok(Arc::new(route)) } + + #[cfg(test)] + pub(crate) async fn insert_table_route( + &self, + table_name: TableName, + table_route: Arc, + ) { + self.cache.insert(table_name, table_route).await + } } diff --git a/src/meta-client/src/rpc.rs b/src/meta-client/src/rpc.rs index 16b988b79f..5a6d79d4bf 100644 --- a/src/meta-client/src/rpc.rs +++ b/src/meta-client/src/rpc.rs @@ -1,4 +1,4 @@ -mod router; +pub mod router; mod store; pub mod util; diff --git a/src/meta-client/src/rpc/router.rs b/src/meta-client/src/rpc/router.rs index 381199a453..614c898e4b 100644 --- a/src/meta-client/src/rpc/router.rs +++ b/src/meta-client/src/rpc/router.rs @@ -137,10 +137,35 @@ pub struct TableRoute { pub region_routes: Vec, } +impl TableRoute { + pub fn find_leaders(&self) -> Vec { + self.region_routes + .iter() + .flat_map(|x| &x.leader_peer) + .cloned() + .collect::>() + } + + pub fn find_leader_regions(&self, datanode: &Peer) -> Vec { + self.region_routes + .iter() + .filter_map(|x| { + if let Some(peer) = &x.leader_peer { + if peer == datanode { + return Some(x.region.id as u32); + } + } + None + }) + .collect::>() + } +} + #[derive(Debug, Clone, Deserialize, Serialize)] pub struct Table { pub id: u64, pub table_name: TableName, + #[serde(serialize_with = "as_utf8")] pub table_schema: Vec, } @@ -190,13 +215,24 @@ impl From for Region { #[derive(Debug, Clone, Deserialize, Serialize)] pub struct Partition { - #[serde(serialize_with = "as_utf8")] + #[serde(serialize_with = 
"as_utf8_vec")] pub column_list: Vec>, - #[serde(serialize_with = "as_utf8")] + #[serde(serialize_with = "as_utf8_vec")] pub value_list: Vec>, } -fn as_utf8(val: &[Vec], serializer: S) -> std::result::Result { +fn as_utf8(val: &[u8], serializer: S) -> std::result::Result { + serializer.serialize_str( + String::from_utf8(val.to_vec()) + .unwrap_or_else(|_| "".to_string()) + .as_str(), + ) +} + +fn as_utf8_vec( + val: &[Vec], + serializer: S, +) -> std::result::Result { serializer.serialize_str( val.iter() .map(|v| {