From f386329e29f30d49e854e36e24d23f128e2bb403 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Fri, 8 Sep 2023 12:16:59 +0800 Subject: [PATCH] refactor: introduce `DdlTaskExecutor` and refactor statement executor (#2341) * feat: add kv store option * refactor: refactor statement executor * refactor: refactor standalone table creator * chore: apply suggestions from CR * chore: apply suggestions from CR * refactor: move ShowCreateTable and CreateDatabase to StatementExecutor * fix: fix RegionDistribution * feat: build standalone * chore: apply suggestions from CR * chore: apply suggestions from CR * chore: apply suggestions from CR --- Cargo.lock | 12 +- Cargo.toml | 1 + config/standalone.example.toml | 16 +- src/catalog/src/remote.rs | 7 + src/client/src/region.rs | 15 +- src/cmd/Cargo.toml | 1 + src/cmd/src/bin/greptime.rs | 2 +- src/cmd/src/cli/repl.rs | 8 - src/cmd/src/options.rs | 10 +- src/cmd/src/standalone.rs | 95 +- src/common/config/src/lib.rs | 24 + src/common/meta/Cargo.toml | 2 + src/common/meta/src/cache_invalidator.rs | 10 + src/common/meta/src/datanode_manager.rs | 5 +- src/common/meta/src/ddl.rs | 12 +- src/common/meta/src/ddl_manager.rs | 15 +- src/common/meta/src/peer.rs | 2 +- src/common/meta/src/rpc/router.rs | 44 +- src/datanode/src/datanode.rs | 10 +- src/frontend/Cargo.toml | 4 + src/frontend/src/catalog.rs | 119 +-- src/frontend/src/error.rs | 23 + .../handler/invalidate_table_cache.rs | 13 +- src/frontend/src/heartbeat/handler/tests.rs | 9 +- src/frontend/src/insert.rs | 35 +- src/frontend/src/instance.rs | 148 ++-- src/frontend/src/instance/distributed.rs | 816 +----------------- .../src/instance/distributed/deleter.rs | 69 +- .../src/instance/distributed/inserter.rs | 78 +- src/frontend/src/instance/grpc.rs | 44 +- src/frontend/src/instance/standalone.rs | 116 ++- src/frontend/src/lib.rs | 2 + src/frontend/src/statement.rs | 89 +- src/frontend/src/statement/ddl.rs | 626 ++++++++++++++ src/frontend/src/statement/show.rs | 70 +- src/frontend/src/table.rs | 322 +++---- src/log-store/Cargo.toml | 2 +- src/log-store/src/raft_engine.rs | 3 + src/log-store/src/raft_engine/backend.rs | 13 +- src/meta-client/src/client.rs | 4 +- src/meta-srv/src/metasrv.rs | 6 +- src/meta-srv/src/metasrv/builder.rs | 4 +- src/meta-srv/src/table_creator.rs | 10 +- src/partition/src/error.rs | 7 + src/partition/src/manager.rs | 68 +- 45 files changed, 1597 insertions(+), 1394 deletions(-) create mode 100644 src/frontend/src/statement/ddl.rs diff --git a/Cargo.lock b/Cargo.lock index 75d33fc098..f8e46be4ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1591,6 +1591,7 @@ dependencies = [ "common-config", "common-error", "common-meta", + "common-procedure", "common-query", "common-recordbatch", "common-telemetry", @@ -1840,6 +1841,7 @@ name = "common-meta" version = "0.4.0-nightly" dependencies = [ "api", + "arrow-flight", "async-stream", "async-trait", "chrono", @@ -1847,6 +1849,7 @@ dependencies = [ "common-error", "common-grpc-expr", "common-procedure", + "common-recordbatch", "common-runtime", "common-telemetry", "common-time", @@ -3242,12 +3245,14 @@ dependencies = [ "client", "common-base", "common-catalog", + "common-config", "common-datasource", "common-error", "common-function", "common-grpc", "common-grpc-expr", "common-meta", + "common-procedure", "common-query", "common-recordbatch", "common-runtime", @@ -3264,6 +3269,7 @@ dependencies = [ "futures-util", "humantime-serde", "itertools 0.10.5", + "log-store", "meta-client", "meta-srv", "meter-core", @@ -3277,6 +3283,7 @@ dependencies = [ "partition", "prost", "query", + "raft-engine", "regex", "script", "serde", @@ -7275,8 +7282,9 @@ dependencies = [ [[package]] name = "raft-engine" -version = "0.3.0" -source = "git+https://github.com/tikv/raft-engine.git?rev=2dcaf5beeea3d5de9ec9c7133a2451d00f508f52#2dcaf5beeea3d5de9ec9c7133a2451d00f508f52" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e02bdc8cba47cb7062b433f56700a8edbc9fcd6d706389120d20aa1827e5ba7b" dependencies = [ "byteorder", "crc32fast", diff --git a/Cargo.toml b/Cargo.toml index bd8e99879c..cbc256cf06 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -153,6 +153,7 @@ object-store = { path = "src/object-store" } partition = { path = "src/partition" } promql = { path = "src/promql" } query = { path = "src/query" } +raft-engine = { version = "0.4" } script = { path = "src/script" } servers = { path = "src/servers" } session = { path = "src/session" } diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 5e6757777e..7bd5b69cb7 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -82,8 +82,6 @@ enable = true # WAL options. [wal] -# WAL data directory -# dir = "/tmp/greptimedb/wal" # WAL file size in bytes. file_size = "256MB" # WAL purge threshold. @@ -95,6 +93,20 @@ read_batch_size = 128 # Whether to sync log file after every write. sync_write = false +# Kv options. +[kv_store] +# Kv file size in bytes. +file_size = "256MB" +# Kv purge threshold. +purge_threshold = "4GB" + +# Procedure storage options. +[procedure] +# Procedure max retry time. +max_retry_times = 3 +# Initial retry delay of procedures, increases exponentially +retry_delay = "500ms" + # Storage options. [storage] # The working home directory. diff --git a/src/catalog/src/remote.rs b/src/catalog/src/remote.rs index 9cdd84e490..802cb06a20 100644 --- a/src/catalog/src/remote.rs +++ b/src/catalog/src/remote.rs @@ -27,3 +27,10 @@ pub trait KvCacheInvalidator: Send + Sync { } pub type KvCacheInvalidatorRef = Arc; + +pub struct DummyKvCacheInvalidator; + +#[async_trait::async_trait] +impl KvCacheInvalidator for DummyKvCacheInvalidator { + async fn invalidate_key(&self, _key: &[u8]) {} +} diff --git a/src/client/src/region.rs b/src/client/src/region.rs index 5869f5971a..a8bc305740 100644 --- a/src/client/src/region.rs +++ b/src/client/src/region.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::region::{RegionRequest, RegionResponse}; +use api::v1::region::{QueryRequest, RegionRequest, RegionResponse}; use api::v1::ResponseHeader; use arrow_flight::Ticket; use async_stream::stream; @@ -25,6 +25,7 @@ use common_meta::error::{self as meta_error, Result as MetaResult}; use common_recordbatch::error::ExternalSnafu; use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream}; use common_telemetry::{error, timer}; +use prost::Message; use snafu::{location, Location, OptionExt, ResultExt}; use tokio_stream::StreamExt; @@ -56,6 +57,16 @@ impl Datanode for RegionRequester { } }) } + + async fn handle_query(&self, request: QueryRequest) -> MetaResult { + let ticket = Ticket { + ticket: request.encode_to_vec().into(), + }; + self.do_get_inner(ticket) + .await + .map_err(BoxedError::new) + .context(meta_error::ExternalSnafu) + } } impl RegionRequester { @@ -63,7 +74,7 @@ impl RegionRequester { Self { client } } - pub async fn do_get(&self, ticket: Ticket) -> Result { + pub async fn do_get_inner(&self, ticket: Ticket) -> Result { let mut flight_client = self.client.make_flight_client()?; let response = flight_client .mut_inner() diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index 1c49432352..2997eb0f30 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -26,6 +26,7 @@ common-base = { workspace = true } common-config = { workspace = true } common-error = { workspace = true } common-meta = { workspace = true } +common-procedure = { workspace = true } common-query = { workspace = true } common-recordbatch = { workspace = true } common-telemetry = { workspace = true, features = [ diff --git a/src/cmd/src/bin/greptime.rs b/src/cmd/src/bin/greptime.rs index 1be0982eec..819e79b2c8 100644 --- a/src/cmd/src/bin/greptime.rs +++ b/src/cmd/src/bin/greptime.rs @@ -116,7 +116,7 @@ impl SubCommand { Ok(Application::Metasrv(app)) } (SubCommand::Standalone(cmd), Options::Standalone(opts)) => { - let app = cmd.build(opts.fe_opts, opts.dn_opts).await?; + let app = cmd.build(*opts).await?; Ok(Application::Standalone(app)) } (SubCommand::Cli(cmd), Options::Cli(_)) => { diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs index f8d6832415..1e2513af6a 100644 --- a/src/cmd/src/cli/repl.rs +++ b/src/cmd/src/cli/repl.rs @@ -21,15 +21,12 @@ use client::client_manager::DatanodeClients; use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_base::Plugins; use common_error::ext::ErrorExt; -use common_meta::key::TableMetadataManager; use common_query::Output; use common_recordbatch::RecordBatches; use common_telemetry::logging; use either::Either; use frontend::catalog::FrontendCatalogManager; use meta_client::client::MetaClientBuilder; -use partition::manager::PartitionRuleManager; -use partition::route::TableRoutes; use query::datafusion::DatafusionQueryEngine; use query::logical_optimizer::LogicalOptimizer; use query::parser::QueryLanguageParser; @@ -254,17 +251,12 @@ async fn create_query_engine(meta_addr: &str) -> Result { let cached_meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone())); - let table_routes = Arc::new(TableRoutes::new(meta_client)); - let partition_manager = Arc::new(PartitionRuleManager::new(table_routes)); - let datanode_clients = Arc::new(DatanodeClients::default()); let catalog_list = Arc::new(FrontendCatalogManager::new( cached_meta_backend.clone(), cached_meta_backend.clone(), - partition_manager, datanode_clients, - Arc::new(TableMetadataManager::new(cached_meta_backend)), )); let plugins: Arc = Default::default(); let state = Arc::new(QueryEngineState::new( diff --git a/src/cmd/src/options.rs b/src/cmd/src/options.rs index 7058b63c3b..0f87c135ad 100644 --- a/src/cmd/src/options.rs +++ b/src/cmd/src/options.rs @@ -12,9 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_config::KvStoreConfig; use common_telemetry::logging::LoggingOptions; use config::{Config, Environment, File, FileFormat}; -use datanode::datanode::DatanodeOptions; +use datanode::datanode::{DatanodeOptions, ProcedureConfig}; use frontend::frontend::FrontendOptions; use meta_srv::metasrv::MetaSrvOptions; use serde::{Deserialize, Serialize}; @@ -26,9 +27,12 @@ pub const ENV_VAR_SEP: &str = "__"; pub const ENV_LIST_SEP: &str = ","; pub struct MixOptions { + pub data_home: String, + pub procedure_cfg: ProcedureConfig, + pub kv_store_cfg: KvStoreConfig, pub fe_opts: FrontendOptions, pub dn_opts: DatanodeOptions, - pub logging: LoggingOptions, + pub logging_opts: LoggingOptions, } pub enum Options { @@ -51,7 +55,7 @@ impl Options { Options::Datanode(opts) => &opts.logging, Options::Frontend(opts) => &opts.logging, Options::Metasrv(opts) => &opts.logging, - Options::Standalone(opts) => &opts.logging, + Options::Standalone(opts) => &opts.logging_opts, Options::Cli(opts) => opts, } } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 42d69d8f0c..a0ca946741 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -14,19 +14,24 @@ use std::sync::Arc; +use catalog::remote::DummyKvCacheInvalidator; +use catalog::CatalogManagerRef; use clap::Parser; use common_base::Plugins; -use common_config::WalConfig; +use common_config::{kv_store_dir, KvStoreConfig, WalConfig}; +use common_meta::kv_backend::KvBackendRef; +use common_procedure::ProcedureManagerRef; use common_telemetry::info; use common_telemetry::logging::LoggingOptions; use datanode::datanode::{Datanode, DatanodeOptions, ProcedureConfig, StorageConfig}; use datanode::region_server::RegionServer; -use datanode::Instance as InstanceRef; +use frontend::catalog::FrontendCatalogManager; use frontend::frontend::FrontendOptions; -use frontend::instance::{FrontendInstance, Instance as FeInstance}; +use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager}; use frontend::service_config::{ GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions, }; +use query::QueryEngineRef; use serde::{Deserialize, Serialize}; use servers::http::HttpOptions; use servers::tls::{TlsMode, TlsOption}; @@ -47,12 +52,8 @@ pub struct Command { } impl Command { - pub async fn build( - self, - fe_opts: FrontendOptions, - dn_opts: DatanodeOptions, - ) -> Result { - self.subcmd.build(fe_opts, dn_opts).await + pub async fn build(self, opts: MixOptions) -> Result { + self.subcmd.build(opts).await } pub fn load_options(&self, top_level_options: TopLevelOptions) -> Result { @@ -66,9 +67,9 @@ enum SubCommand { } impl SubCommand { - async fn build(self, fe_opts: FrontendOptions, dn_opts: DatanodeOptions) -> Result { + async fn build(self, opts: MixOptions) -> Result { match self { - SubCommand::Start(cmd) => cmd.build(fe_opts, dn_opts).await, + SubCommand::Start(cmd) => cmd.build(opts).await, } } @@ -93,6 +94,7 @@ pub struct StandaloneOptions { pub prom_store_options: PromStoreOptions, pub wal: WalConfig, pub storage: StorageConfig, + pub kv_store: KvStoreConfig, pub procedure: ProcedureConfig, pub logging: LoggingOptions, } @@ -111,6 +113,7 @@ impl Default for StandaloneOptions { prom_store_options: PromStoreOptions::default(), wal: WalConfig::default(), storage: StorageConfig::default(), + kv_store: KvStoreConfig::default(), procedure: ProcedureConfig::default(), logging: LoggingOptions::default(), } @@ -265,23 +268,29 @@ impl StartCommand { if self.influxdb_enable { opts.influxdb_options.enable = self.influxdb_enable; } - + let kv_store_cfg = opts.kv_store.clone(); + let procedure_cfg = opts.procedure.clone(); let fe_opts = opts.clone().frontend_options(); - let logging = opts.logging.clone(); + let logging_opts = opts.logging.clone(); let dn_opts = opts.datanode_options(); Ok(Options::Standalone(Box::new(MixOptions { + procedure_cfg, + kv_store_cfg, + data_home: dn_opts.storage.data_home.to_string(), fe_opts, dn_opts, - logging, + logging_opts, }))) } #[allow(unreachable_code)] #[allow(unused_variables)] #[allow(clippy::diverging_sub_expression)] - async fn build(self, fe_opts: FrontendOptions, dn_opts: DatanodeOptions) -> Result { + async fn build(self, opts: MixOptions) -> Result { let plugins = Arc::new(load_frontend_plugins(&self.user_provider)?); + let fe_opts = opts.fe_opts; + let dn_opts = opts.dn_opts; info!("Standalone start command: {:#?}", self); info!( @@ -289,13 +298,36 @@ impl StartCommand { fe_opts, dn_opts ); - let datanode = Datanode::new(dn_opts.clone(), Default::default()) + let kv_dir = kv_store_dir(&opts.data_home); + let (kv_store, procedure_manager) = FeInstance::try_build_standalone_components( + kv_dir, + opts.kv_store_cfg, + opts.procedure_cfg, + ) + .await + .context(StartFrontendSnafu)?; + + let datanode = Datanode::new(dn_opts.clone(), plugins.clone()) .await .context(StartDatanodeSnafu)?; + let region_server = datanode.region_server(); + + let catalog_manager = Arc::new(FrontendCatalogManager::new( + kv_store.clone(), + Arc::new(DummyKvCacheInvalidator), + Arc::new(StandaloneDatanodeManager(region_server.clone())), + )); // TODO: build frontend instance like in distributed mode - let mut frontend = - build_frontend(plugins.clone(), todo!(), datanode.region_server()).await?; + let mut frontend = build_frontend( + plugins, + kv_store, + procedure_manager, + catalog_manager, + datanode.query_engine(), + region_server, + ) + .await?; frontend .build_servers(&fe_opts) @@ -309,12 +341,21 @@ impl StartCommand { /// Build frontend instance in standalone mode async fn build_frontend( plugins: Arc, - datanode_instance: InstanceRef, + kv_store: KvBackendRef, + procedure_manager: ProcedureManagerRef, + catalog_manager: CatalogManagerRef, + query_engine: QueryEngineRef, region_server: RegionServer, ) -> Result { - let mut frontend_instance = FeInstance::try_new_standalone(datanode_instance, region_server) - .await - .context(StartFrontendSnafu)?; + let mut frontend_instance = FeInstance::try_new_standalone( + kv_store, + procedure_manager, + catalog_manager, + query_engine, + region_server, + ) + .await + .context(StartFrontendSnafu)?; frontend_instance.set_plugins(plugins.clone()); Ok(frontend_instance) } @@ -411,7 +452,7 @@ mod tests { }; let fe_opts = options.fe_opts; let dn_opts = options.dn_opts; - let logging_opts = options.logging; + let logging_opts = options.logging_opts; assert_eq!(Mode::Standalone, fe_opts.mode); assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http_options.addr); assert_eq!(Duration::from_secs(30), fe_opts.http_options.timeout); @@ -456,8 +497,8 @@ mod tests { unreachable!() }; - assert_eq!("/tmp/greptimedb/test/logs", opts.logging.dir); - assert_eq!("debug", opts.logging.level.unwrap()); + assert_eq!("/tmp/greptimedb/test/logs", opts.logging_opts.dir); + assert_eq!("debug", opts.logging_opts.level.unwrap()); } #[test] @@ -526,10 +567,10 @@ mod tests { }; // Should be read from env, env > default values. - assert_eq!(opts.logging.dir, "/other/log/dir"); + assert_eq!(opts.logging_opts.dir, "/other/log/dir"); // Should be read from config file, config file > env > default values. - assert_eq!(opts.logging.level.as_ref().unwrap(), "debug"); + assert_eq!(opts.logging_opts.level.as_ref().unwrap(), "debug"); // Should be read from cli, cli > config file > env > default values. assert_eq!(opts.fe_opts.http_options.addr, "127.0.0.1:14000"); diff --git a/src/common/config/src/lib.rs b/src/common/config/src/lib.rs index 785e66ca40..36bedcb32f 100644 --- a/src/common/config/src/lib.rs +++ b/src/common/config/src/lib.rs @@ -44,3 +44,27 @@ impl Default for WalConfig { } } } + +pub fn kv_store_dir(store_dir: &str) -> String { + format!("{store_dir}/kv") +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct KvStoreConfig { + // Kv file size in bytes + pub file_size: ReadableSize, + // Kv purge threshold in bytes + pub purge_threshold: ReadableSize, +} + +impl Default for KvStoreConfig { + fn default() -> Self { + Self { + // log file size 256MB + file_size: ReadableSize::mb(256), + // purge threshold 4GB + purge_threshold: ReadableSize::gb(4), + } + } +} diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 9733c2970a..2bd4a56ab4 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -6,12 +6,14 @@ license.workspace = true [dependencies] api = { workspace = true } +arrow-flight.workspace = true async-stream.workspace = true async-trait.workspace = true common-catalog = { workspace = true } common-error = { workspace = true } common-grpc-expr.workspace = true common-procedure = { workspace = true } +common-recordbatch = { workspace = true } common-runtime = { workspace = true } common-telemetry = { workspace = true } common-time = { workspace = true } diff --git a/src/common/meta/src/cache_invalidator.rs b/src/common/meta/src/cache_invalidator.rs index 993ac3b098..f1f23bbc56 100644 --- a/src/common/meta/src/cache_invalidator.rs +++ b/src/common/meta/src/cache_invalidator.rs @@ -18,6 +18,7 @@ use crate::error::Result; use crate::ident::TableIdent; /// Places context of invalidating cache. e.g., span id, trace id etc. +#[derive(Debug, Default)] pub struct Context { pub subject: Option, } @@ -29,3 +30,12 @@ pub trait CacheInvalidator: Send + Sync { } pub type CacheInvalidatorRef = Arc; + +pub struct DummyCacheInvalidator; + +#[async_trait::async_trait] +impl CacheInvalidator for DummyCacheInvalidator { + async fn invalidate_table(&self, _ctx: &Context, _table_ident: TableIdent) -> Result<()> { + Ok(()) + } +} diff --git a/src/common/meta/src/datanode_manager.rs b/src/common/meta/src/datanode_manager.rs index 93733664ad..9f679d262c 100644 --- a/src/common/meta/src/datanode_manager.rs +++ b/src/common/meta/src/datanode_manager.rs @@ -14,7 +14,8 @@ use std::sync::Arc; -use api::v1::region::RegionRequest; +use api::v1::region::{QueryRequest, RegionRequest}; +use common_recordbatch::SendableRecordBatchStream; use crate::error::Result; use crate::peer::Peer; @@ -25,6 +26,8 @@ pub type AffectedRows = u64; pub trait Datanode: Send + Sync { /// Handles DML, and DDL requests. async fn handle(&self, request: RegionRequest) -> Result; + + async fn handle_query(&self, request: QueryRequest) -> Result; } pub type DatanodeRef = Arc; diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index 220274100f..f848252524 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -36,7 +36,7 @@ pub struct ExecutorContext { } #[async_trait::async_trait] -pub trait DdlExecutor: Send + Sync { +pub trait DdlTaskExecutor: Send + Sync { async fn submit_ddl_task( &self, ctx: &ExecutorContext, @@ -44,23 +44,23 @@ pub trait DdlExecutor: Send + Sync { ) -> Result; } -pub type DdlExecutorRef = Arc; +pub type DdlTaskExecutorRef = Arc; -pub struct TableCreatorContext { +pub struct TableMetadataAllocatorContext { pub cluster_id: u64, } #[async_trait::async_trait] -pub trait TableCreator: Send + Sync { +pub trait TableMetadataAllocator: Send + Sync { async fn create( &self, - ctx: &TableCreatorContext, + ctx: &TableMetadataAllocatorContext, table_info: &mut RawTableInfo, partitions: &[Partition], ) -> Result<(TableId, Vec)>; } -pub type TableCreatorRef = Arc; +pub type TableMetadataAllocatorRef = Arc; #[derive(Clone)] pub struct DdlContext { diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 569c213a20..1814b38645 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -23,7 +23,10 @@ use crate::datanode_manager::DatanodeManagerRef; use crate::ddl::alter_table::AlterTableProcedure; use crate::ddl::create_table::CreateTableProcedure; use crate::ddl::drop_table::DropTableProcedure; -use crate::ddl::{DdlContext, DdlExecutor, ExecutorContext, TableCreatorContext, TableCreatorRef}; +use crate::ddl::{ + DdlContext, DdlTaskExecutor, ExecutorContext, TableMetadataAllocatorContext, + TableMetadataAllocatorRef, +}; use crate::error::{ self, RegisterProcedureLoaderSnafu, Result, SubmitProcedureSnafu, TableNotFoundSnafu, UnsupportedSnafu, WaitProcedureSnafu, @@ -46,7 +49,7 @@ pub struct DdlManager { datanode_manager: DatanodeManagerRef, cache_invalidator: CacheInvalidatorRef, table_metadata_manager: TableMetadataManagerRef, - table_creator: TableCreatorRef, + table_creator: TableMetadataAllocatorRef, } impl DdlManager { @@ -55,7 +58,7 @@ impl DdlManager { datanode_clients: DatanodeManagerRef, cache_invalidator: CacheInvalidatorRef, table_metadata_manager: TableMetadataManagerRef, - table_creator: TableCreatorRef, + table_creator: TableMetadataAllocatorRef, ) -> Self { Self { procedure_manager, @@ -332,7 +335,7 @@ async fn handle_create_table_task( let (table_id, region_routes) = ddl_manager .table_creator .create( - &TableCreatorContext { cluster_id }, + &TableMetadataAllocatorContext { cluster_id }, &mut create_table_task.table_info, &create_table_task.partitions, ) @@ -342,7 +345,7 @@ async fn handle_create_table_task( .submit_create_table_task(cluster_id, create_table_task, region_routes) .await?; - info!("Table: {table_id} is created via procedure_id {id:?}"); + info!("Table: {table_id:?} is created via procedure_id {id:?}"); Ok(SubmitDdlTaskResponse { key: id.to_string().into(), @@ -351,7 +354,7 @@ async fn handle_create_table_task( } #[async_trait::async_trait] -impl DdlExecutor for DdlManager { +impl DdlTaskExecutor for DdlManager { async fn submit_ddl_task( &self, ctx: &ExecutorContext, diff --git a/src/common/meta/src/peer.rs b/src/common/meta/src/peer.rs index 535a47bf60..483250cbf1 100644 --- a/src/common/meta/src/peer.rs +++ b/src/common/meta/src/peer.rs @@ -17,7 +17,7 @@ use std::fmt::{Display, Formatter}; use api::v1::meta::Peer as PbPeer; use serde::{Deserialize, Serialize}; -#[derive(Debug, Clone, Hash, Eq, PartialEq, Deserialize, Serialize)] +#[derive(Debug, Default, Clone, Hash, Eq, PartialEq, Deserialize, Serialize)] pub struct Peer { /// Node identifier. Unique in a cluster. pub id: u64, diff --git a/src/common/meta/src/rpc/router.rs b/src/common/meta/src/rpc/router.rs index 9772b6e8d1..63d4880c23 100644 --- a/src/common/meta/src/rpc/router.rs +++ b/src/common/meta/src/rpc/router.rs @@ -77,16 +77,10 @@ impl TryFrom for RouteResponse { pub fn region_distribution(region_routes: &[RegionRoute]) -> Result { let mut regions_id_map = RegionDistribution::new(); for route in region_routes.iter() { - let node_id = route - .leader_peer - .as_ref() - .context(error::UnexpectedSnafu { - err_msg: "leader not found", - })? - .id; - - let region_id = route.region.id.region_number(); - regions_id_map.entry(node_id).or_default().push(region_id); + if let Some(peer) = route.leader_peer.as_ref() { + let region_id = route.region.id.region_number(); + regions_id_map.entry(peer.id).or_default().push(region_id); + } } for (_, regions) in regions_id_map.iter_mut() { // id asc @@ -110,6 +104,24 @@ pub fn find_leaders(region_routes: &[RegionRoute]) -> HashSet { .collect() } +pub fn convert_to_region_map(region_routes: &[RegionRoute]) -> HashMap { + region_routes + .iter() + .filter_map(|x| { + x.leader_peer + .as_ref() + .map(|leader| (x.region.id.region_number(), leader)) + }) + .collect::>() +} + +pub fn find_region_leader(region_routes: &[RegionRoute], region_number: u32) -> Option<&Peer> { + region_routes + .iter() + .find(|x| x.region.id.region_number() == region_number) + .and_then(|r| r.leader_peer.as_ref()) +} + pub fn find_leader_regions(region_routes: &[RegionRoute], datanode: &Peer) -> Vec { region_routes .iter() @@ -333,6 +345,18 @@ pub struct RegionRoute { pub follower_peers: Vec, } +pub struct RegionRoutes(pub Vec); + +impl RegionRoutes { + pub fn region_map(&self) -> HashMap { + convert_to_region_map(&self.0) + } + + pub fn find_region_leader(&self, region_number: u32) -> Option<&Peer> { + self.region_map().get(®ion_number).copied() + } +} + #[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq)] pub struct Region { pub id: RegionId, diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 520ac9bca3..fe9791ee60 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -33,7 +33,7 @@ use meta_client::MetaClientOptions; use mito2::config::MitoConfig; use mito2::engine::MitoEngine; use object_store::util::normalize_dir; -use query::QueryEngineFactory; +use query::{QueryEngineFactory, QueryEngineRef}; use secrecy::SecretString; use serde::{Deserialize, Serialize}; use servers::heartbeat_options::HeartbeatOptions; @@ -401,6 +401,7 @@ pub struct Datanode { services: Option, heartbeat_task: Option, region_server: RegionServer, + query_engine: QueryEngineRef, greptimedb_telemetry_task: Arc, } @@ -423,7 +424,7 @@ impl Datanode { .context(RuntimeResourceSnafu)?, ); - let mut region_server = RegionServer::new(query_engine, runtime.clone()); + let mut region_server = RegionServer::new(query_engine.clone(), runtime.clone()); let log_store = Self::build_log_store(&opts).await?; let object_store = store::new_object_store(&opts).await?; let engines = Self::build_store_engines(&opts, log_store, object_store).await?; @@ -454,6 +455,7 @@ impl Datanode { services, heartbeat_task, region_server, + query_engine, greptimedb_telemetry_task, }) } @@ -502,6 +504,10 @@ impl Datanode { self.region_server.clone() } + pub fn query_engine(&self) -> QueryEngineRef { + self.query_engine.clone() + } + // internal utils /// Build [RaftEngineLogStore] diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 8b2edc804c..6bab5ad824 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -21,12 +21,14 @@ chrono.workspace = true client = { workspace = true } common-base = { workspace = true } common-catalog = { workspace = true } +common-config = { workspace = true } common-datasource = { workspace = true } common-error = { workspace = true } common-function = { workspace = true } common-grpc = { workspace = true } common-grpc-expr = { workspace = true } common-meta = { workspace = true } +common-procedure = { workspace = true } common-query = { workspace = true } common-recordbatch = { workspace = true } common-runtime = { workspace = true } @@ -42,7 +44,9 @@ futures = "0.3" futures-util.workspace = true humantime-serde.workspace = true itertools.workspace = true +log-store = { workspace = true } meta-client = { workspace = true } +raft-engine = { workspace = true } # Although it is not used, please do not delete it. meter-core.workspace = true meter-macros.workspace = true diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index 3d30a00f41..13d8a4d7d1 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -26,22 +26,25 @@ use catalog::{ CatalogManager, DeregisterSchemaRequest, DeregisterTableRequest, RegisterSchemaRequest, RegisterTableRequest, RenameTableRequest, }; -use client::client_manager::DatanodeClients; use common_catalog::consts::{ DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID, }; use common_error::ext::BoxedError; +use common_meta::cache_invalidator::{CacheInvalidator, Context}; +use common_meta::datanode_manager::DatanodeManagerRef; +use common_meta::error::Result as MetaResult; +use common_meta::ident::TableIdent; use common_meta::key::catalog_name::CatalogNameKey; use common_meta::key::schema_name::SchemaNameKey; use common_meta::key::table_info::TableInfoKey; use common_meta::key::table_name::TableNameKey; -use common_meta::key::{TableMetaKey, TableMetadataManagerRef}; +use common_meta::key::table_route::NextTableRouteKey; +use common_meta::key::{TableMetaKey, TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; use common_telemetry::debug; use futures_util::TryStreamExt; -use partition::manager::PartitionRuleManagerRef; +use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef}; use snafu::prelude::*; -use table::metadata::TableId; use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME}; use table::TableRef; @@ -52,57 +55,19 @@ pub struct FrontendCatalogManager { backend: KvBackendRef, backend_cache_invalidator: KvCacheInvalidatorRef, partition_manager: PartitionRuleManagerRef, - datanode_clients: Arc, table_metadata_manager: TableMetadataManagerRef, + datanode_manager: DatanodeManagerRef, } -impl FrontendCatalogManager { - pub fn new( - backend: KvBackendRef, - backend_cache_invalidator: KvCacheInvalidatorRef, - partition_manager: PartitionRuleManagerRef, - datanode_clients: Arc, - table_metadata_manager: TableMetadataManagerRef, - ) -> Self { - Self { - backend, - backend_cache_invalidator, - partition_manager, - datanode_clients, - table_metadata_manager, - } - } - - pub fn backend(&self) -> KvBackendRef { - self.backend.clone() - } - - pub fn partition_manager(&self) -> PartitionRuleManagerRef { - self.partition_manager.clone() - } - - pub fn table_metadata_manager_ref(&self) -> &TableMetadataManagerRef { - &self.table_metadata_manager - } - - pub fn datanode_clients(&self) -> Arc { - self.datanode_clients.clone() - } - - pub async fn invalidate_schema(&self, catalog: &str, schema: &str) { - let key = SchemaNameKey::new(catalog, schema).as_raw_key(); - - self.backend_cache_invalidator.invalidate_key(&key).await; - } - - pub async fn invalidate_table( - &self, - catalog: &str, - schema: &str, - table: &str, - table_id: TableId, - ) { - let key = TableNameKey::new(catalog, schema, table); +#[async_trait::async_trait] +impl CacheInvalidator for FrontendCatalogManager { + async fn invalidate_table(&self, _ctx: &Context, table_ident: TableIdent) -> MetaResult<()> { + let table_id = table_ident.table_id; + let key = TableNameKey::new( + &table_ident.catalog, + &table_ident.schema, + &table_ident.table, + ); self.backend_cache_invalidator .invalidate_key(&key.as_raw_key()) .await; @@ -120,10 +85,54 @@ impl FrontendCatalogManager { String::from_utf8_lossy(&key.as_raw_key()) ); - self.partition_manager - .table_routes() - .invalidate_table_route(table_id) + let key = &NextTableRouteKey { table_id }; + self.backend_cache_invalidator + .invalidate_key(&key.as_raw_key()) .await; + debug!( + "invalidated cache key: {}", + String::from_utf8_lossy(&key.as_raw_key()) + ); + + Ok(()) + } +} + +impl FrontendCatalogManager { + pub fn new( + backend: KvBackendRef, + backend_cache_invalidator: KvCacheInvalidatorRef, + datanode_manager: DatanodeManagerRef, + ) -> Self { + Self { + backend: backend.clone(), + backend_cache_invalidator, + partition_manager: Arc::new(PartitionRuleManager::new(backend.clone())), + table_metadata_manager: Arc::new(TableMetadataManager::new(backend)), + datanode_manager, + } + } + + pub fn backend(&self) -> KvBackendRef { + self.backend.clone() + } + + pub fn partition_manager(&self) -> PartitionRuleManagerRef { + self.partition_manager.clone() + } + + pub fn table_metadata_manager_ref(&self) -> &TableMetadataManagerRef { + &self.table_metadata_manager + } + + pub fn datanode_manager(&self) -> DatanodeManagerRef { + self.datanode_manager.clone() + } + + pub async fn invalidate_schema(&self, catalog: &str, schema: &str) { + let key = SchemaNameKey::new(catalog, schema).as_raw_key(); + + self.backend_cache_invalidator.invalidate_key(&key).await; } } diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 15709a3e00..3fd9055777 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -27,6 +27,18 @@ use store_api::storage::RegionNumber; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { + #[snafu(display("Failed to invalidate table cache, source: {}", source))] + InvalidateTableCache { + location: Location, + source: common_meta::error::Error, + }, + + #[snafu(display("Failed to open raft engine backend, source: {}", source))] + OpenRaftEngineBackend { + location: Location, + source: BoxedError, + }, + #[snafu(display("Failed to execute ddl, source: {}", source))] ExecuteDdl { location: Location, @@ -57,6 +69,12 @@ pub enum Error { source: client::Error, }, + #[snafu(display("Failed to query, source: {}", source))] + RequestQuery { + #[snafu(backtrace)] + source: common_meta::error::Error, + }, + #[snafu(display("Failed to insert data, source: {}", source))] RequestInserts { #[snafu(backtrace)] @@ -710,6 +728,8 @@ impl ErrorExt for Error { source.status_code() } + Error::InvalidateTableCache { source, .. } => source.status_code(), + Error::ParseFileFormat { source, .. } | Error::InferSchema { source, .. } => { source.status_code() } @@ -722,7 +742,10 @@ impl ErrorExt for Error { | Error::CreateTableInfo { source } | Error::IntoVectors { source } => source.status_code(), + Error::OpenRaftEngineBackend { .. } => StatusCode::StorageUnavailable, + Error::RequestDatanode { source } => source.status_code(), + Error::RequestQuery { source } => source.status_code(), Error::RequestInserts { source } => source.status_code(), Error::RequestDeletes { source } => source.status_code(), diff --git a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs index 310791571a..631fb861fd 100644 --- a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs +++ b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs @@ -22,14 +22,13 @@ use common_meta::ident::TableIdent; use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; use common_meta::key::table_info::TableInfoKey; use common_meta::key::table_name::TableNameKey; +use common_meta::key::table_route::NextTableRouteKey; use common_meta::key::TableMetaKey; use common_telemetry::error; -use partition::manager::TableRouteCacheInvalidatorRef; #[derive(Clone)] pub struct InvalidateTableCacheHandler { backend_cache_invalidator: KvCacheInvalidatorRef, - table_route_cache_invalidator: TableRouteCacheInvalidatorRef, } #[async_trait] @@ -73,13 +72,9 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler { } impl InvalidateTableCacheHandler { - pub fn new( - backend_cache_invalidator: KvCacheInvalidatorRef, - table_route_cache_invalidator: TableRouteCacheInvalidatorRef, - ) -> Self { + pub fn new(backend_cache_invalidator: KvCacheInvalidatorRef) -> Self { Self { backend_cache_invalidator, - table_route_cache_invalidator, } } @@ -100,8 +95,8 @@ impl InvalidateTableCacheHandler { ) .await; - self.table_route_cache_invalidator - .invalidate_table_route(table_id) + self.backend_cache_invalidator + .invalidate_key(&NextTableRouteKey { table_id }.as_raw_key()) .await; } } diff --git a/src/frontend/src/heartbeat/handler/tests.rs b/src/frontend/src/heartbeat/handler/tests.rs index 8fec54cba9..02fcd6d39d 100644 --- a/src/frontend/src/heartbeat/handler/tests.rs +++ b/src/frontend/src/heartbeat/handler/tests.rs @@ -64,13 +64,8 @@ async fn test_invalidate_table_cache_handler() { inner: Mutex::new(inner), }); - let inner = HashMap::from([(table_id, 1)]); - let table_route = Arc::new(MockTableRouteCacheInvalidator { - inner: Mutex::new(inner), - }); - let executor = Arc::new(HandlerGroupExecutor::new(vec![Arc::new( - InvalidateTableCacheHandler::new(backend.clone(), table_route.clone()), + InvalidateTableCacheHandler::new(backend.clone()), )])); let (tx, mut rx) = mpsc::channel(8); @@ -101,8 +96,6 @@ async fn test_invalidate_table_cache_handler() { .unwrap() .contains_key(&table_info_key.as_raw_key())); - assert!(!table_route.inner.lock().unwrap().contains_key(&table_id)); - // removes a invalid key handle_instruction( executor, diff --git a/src/frontend/src/insert.rs b/src/frontend/src/insert.rs index 4283be3e4b..b5c82a5da0 100644 --- a/src/frontend/src/insert.rs +++ b/src/frontend/src/insert.rs @@ -13,12 +13,8 @@ // limitations under the License. use api::v1::alter_expr::Kind; -use api::v1::ddl_request::Expr as DdlExpr; -use api::v1::greptime_request::Request; use api::v1::region::region_request; -use api::v1::{ - AlterExpr, ColumnSchema, DdlRequest, InsertRequests, RowInsertRequest, RowInsertRequests, -}; +use api::v1::{AlterExpr, ColumnSchema, InsertRequests, RowInsertRequest, RowInsertRequests}; use catalog::CatalogManager; use client::region_handler::RegionRequestHandler; use common_catalog::consts::default_engine; @@ -26,23 +22,23 @@ use common_grpc_expr::util::{extract_new_columns, ColumnExpr}; use common_query::Output; use common_telemetry::info; use datatypes::schema::Schema; -use servers::query_handler::grpc::GrpcQueryHandlerRef; use session::context::QueryContextRef; use snafu::prelude::*; use table::engine::TableReference; use table::TableRef; use crate::error::{ - CatalogSnafu, Error, FindNewColumnsOnInsertionSnafu, InvalidInsertRequestSnafu, - RequestDatanodeSnafu, Result, + CatalogSnafu, FindNewColumnsOnInsertionSnafu, InvalidInsertRequestSnafu, RequestDatanodeSnafu, + Result, }; use crate::expr_factory::CreateExprFactory; use crate::req_convert::insert::{ColumnToRow, RowToRegion}; +use crate::statement::StatementExecutor; pub(crate) struct Inserter<'a> { catalog_manager: &'a dyn CatalogManager, create_expr_factory: &'a CreateExprFactory, - grpc_query_handler: &'a GrpcQueryHandlerRef, + statement_executor: &'a StatementExecutor, region_request_handler: &'a dyn RegionRequestHandler, } @@ -50,13 +46,13 @@ impl<'a> Inserter<'a> { pub fn new( catalog_manager: &'a dyn CatalogManager, create_expr_factory: &'a CreateExprFactory, - grpc_query_handler: &'a GrpcQueryHandlerRef, + statement_executor: &'a StatementExecutor, region_request_handler: &'a dyn RegionRequestHandler, ) -> Self { Self { catalog_manager, create_expr_factory, - grpc_query_handler, + statement_executor, region_request_handler, } } @@ -164,10 +160,9 @@ impl<'a> Inserter<'a> { kind: Some(Kind::AddColumns(add_columns)), }; - let req = Request::Ddl(DdlRequest { - expr: Some(DdlExpr::Alter(alter_table_expr)), - }); - self.grpc_query_handler.do_query(req, ctx.clone()).await?; + self.statement_executor + .alter_table_inner(alter_table_expr) + .await?; info!( "Successfully added new columns to table: {}.{}.{}", @@ -187,14 +182,14 @@ impl<'a> Inserter<'a> { table_ref.catalog, table_ref.schema, table_ref.table, ); - let create_table_expr = self + let mut create_table_expr = self .create_expr_factory .create_table_expr_by_column_schemas(&table_ref, request_schema, default_engine())?; - let req = Request::Ddl(DdlRequest { - expr: Some(DdlExpr::CreateTable(create_table_expr)), - }); - self.grpc_query_handler.do_query(req, ctx.clone()).await?; + // TODO(weny): multiple regions table. + self.statement_executor + .create_table_inner(&mut create_table_expr, None) + .await?; info!( "Successfully created table on insertion: {}.{}.{}", diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 065c97756a..a3d9872d02 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -20,7 +20,6 @@ mod otlp; mod prom_store; mod script; mod standalone; - use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; @@ -34,32 +33,38 @@ use catalog::CatalogManagerRef; use client::client_manager::DatanodeClients; use client::region_handler::RegionRequestHandlerRef; use common_base::Plugins; +use common_config::KvStoreConfig; use common_error::ext::BoxedError; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; +use common_meta::cache_invalidator::DummyCacheInvalidator; +use common_meta::ddl_manager::DdlManager; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::HandlerGroupExecutor; use common_meta::key::TableMetadataManager; +use common_meta::kv_backend::KvBackendRef; +use common_meta::state_store::KvStateStore; +use common_procedure::local::{LocalManager, ManagerConfig}; +use common_procedure::options::ProcedureConfig; +use common_procedure::ProcedureManagerRef; use common_query::Output; use common_telemetry::logging::info; use common_telemetry::{error, timer}; use datanode::region_server::RegionServer; -use datanode::Instance as DnInstanceRef; -use distributed::DistInstance; +use log_store::raft_engine::RaftEngineBackend; use meta_client::client::{MetaClient, MetaClientBuilder}; -use partition::manager::PartitionRuleManager; -use partition::route::TableRoutes; use query::parser::{PromQuery, QueryLanguageParser, QueryStatement}; use query::plan::LogicalPlan; use query::query_engine::options::{validate_catalog_and_schema, QueryOptions}; use query::query_engine::DescribeResult; use query::{QueryEngineFactory, QueryEngineRef}; +use raft_engine::{Config, ReadableSize, RecoveryMode}; use servers::error as server_error; use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu}; use servers::interceptor::{ PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef, }; use servers::prometheus_handler::PrometheusHandler; -use servers::query_handler::grpc::{GrpcQueryHandler, GrpcQueryHandlerRef}; +use servers::query_handler::grpc::GrpcQueryHandler; use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::{ InfluxdbLineProtocolHandler, OpenTelemetryProtocolHandler, OpentsdbProtocolHandler, @@ -72,8 +77,10 @@ use sql::parser::ParserContext; use sql::statements::copy::CopyTable; use sql::statements::statement::Statement; use sqlparser::ast::ObjectName; +pub use standalone::StandaloneDatanodeManager; use self::distributed::DistRegionRequestHandler; +use self::standalone::{StandaloneRegionRequestHandler, StandaloneTableMetadataCreator}; use crate::catalog::FrontendCatalogManager; use crate::delete::Deleter; use crate::error::{ @@ -117,7 +124,6 @@ pub struct Instance { script_executor: Arc, statement_executor: Arc, query_engine: QueryEngineRef, - grpc_query_handler: GrpcQueryHandlerRef, region_request_handler: RegionRequestHandlerRef, create_expr_factory: CreateExprFactory, /// plugins: this map holds extensions to customize query or auth @@ -146,21 +152,11 @@ impl Instance { opts: &FrontendOptions, ) -> Result { let meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone())); - let table_routes = Arc::new(TableRoutes::new(meta_client.clone())); - let partition_manager = Arc::new(PartitionRuleManager::new(table_routes)); - let table_metadata_manager = Arc::new(TableMetadataManager::new(meta_backend.clone())); let catalog_manager = Arc::new(FrontendCatalogManager::new( meta_backend.clone(), meta_backend.clone(), - partition_manager.clone(), datanode_clients.clone(), - table_metadata_manager.clone(), - )); - - let dist_instance = Arc::new(DistInstance::new( - meta_client.clone(), - catalog_manager.clone(), )); let dist_request_handler = DistRegionRequestHandler::arc(catalog_manager.clone()); @@ -173,26 +169,25 @@ impl Instance { ) .query_engine(); - let script_executor = - Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?); - let region_request_handler = DistRegionRequestHandler::arc(catalog_manager.clone()); let statement_executor = Arc::new(StatementExecutor::new( catalog_manager.clone(), query_engine.clone(), - dist_instance.clone(), region_request_handler.clone(), + meta_client.clone(), + meta_backend.clone(), + catalog_manager.clone(), )); plugins.insert::(statement_executor.clone()); + let script_executor = + Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?); + let handlers_executor = HandlerGroupExecutor::new(vec![ Arc::new(ParseMailboxMessageHandler), - Arc::new(InvalidateTableCacheHandler::new( - meta_backend, - partition_manager, - )), + Arc::new(InvalidateTableCacheHandler::new(meta_backend)), ]); let heartbeat_task = Some(HeartbeatTask::new( @@ -212,7 +207,6 @@ impl Instance { statement_executor, query_engine, region_request_handler, - grpc_query_handler: dist_instance, plugins: plugins.clone(), servers: Arc::new(HashMap::new()), heartbeat_task, @@ -256,39 +250,81 @@ impl Instance { Ok(Arc::new(meta_client)) } + pub async fn try_build_standalone_components( + dir: String, + kv_store_config: KvStoreConfig, + procedure_config: ProcedureConfig, + ) -> Result<(KvBackendRef, ProcedureManagerRef)> { + let kv_store = Arc::new( + RaftEngineBackend::try_open_with_cfg(Config { + dir, + purge_threshold: ReadableSize(kv_store_config.purge_threshold.0), + recovery_mode: RecoveryMode::TolerateTailCorruption, + batch_compression_threshold: ReadableSize::kb(8), + target_file_size: ReadableSize(kv_store_config.file_size.0), + ..Default::default() + }) + .map_err(BoxedError::new) + .context(error::OpenRaftEngineBackendSnafu)?, + ); + + let state_store = Arc::new(KvStateStore::new(kv_store.clone())); + + let manager_config = ManagerConfig { + max_retry_times: procedure_config.max_retry_times, + retry_delay: procedure_config.retry_delay, + ..Default::default() + }; + let procedure_manager = Arc::new(LocalManager::new(manager_config, state_store)); + + Ok((kv_store, procedure_manager)) + } + pub async fn try_new_standalone( - _dn_instance: DnInstanceRef, - _region_server: RegionServer, + kv_backend: KvBackendRef, + procedure_manager: ProcedureManagerRef, + catalog_manager: CatalogManagerRef, + query_engine: QueryEngineRef, + region_server: RegionServer, ) -> Result { - todo!() - // let catalog_manager = dn_instance.catalog_manager(); - // let query_engine = dn_instance.query_engine(); - // let script_executor = - // Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?); + let script_executor = + Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?); - // let region_request_handler = StandaloneRegionRequestHandler::arc(region_server); - // let statement_executor = Arc::new(StatementExecutor::new( - // catalog_manager.clone(), - // query_engine.clone(), - // dn_instance.clone(), - // region_request_handler.clone(), - // )); + let region_request_handler = StandaloneRegionRequestHandler::arc(region_server.clone()); - // let create_expr_factory = CreateExprFactory; - // let grpc_query_handler = StandaloneGrpcQueryHandler::arc(dn_instance.clone()); + let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); - // Ok(Instance { - // catalog_manager: catalog_manager.clone(), - // script_executor, - // create_expr_factory, - // statement_executor, - // query_engine, - // grpc_query_handler, - // region_request_handler, - // plugins: Default::default(), - // servers: Arc::new(HashMap::new()), - // heartbeat_task: None, - // }) + let cache_invalidator = Arc::new(DummyCacheInvalidator); + let ddl_executor = Arc::new(DdlManager::new( + procedure_manager, + Arc::new(StandaloneDatanodeManager(region_server)), + cache_invalidator.clone(), + table_metadata_manager.clone(), + Arc::new(StandaloneTableMetadataCreator::new(kv_backend.clone())), + )); + + let statement_executor = Arc::new(StatementExecutor::new( + catalog_manager.clone(), + query_engine.clone(), + region_request_handler.clone(), + ddl_executor, + kv_backend.clone(), + cache_invalidator, + )); + + let create_expr_factory = CreateExprFactory; + + Ok(Instance { + catalog_manager: catalog_manager.clone(), + script_executor, + create_expr_factory, + statement_executor, + query_engine, + region_request_handler, + plugins: Default::default(), + servers: Arc::new(HashMap::new()), + heartbeat_task: None, + }) } pub async fn build_servers(&mut self, opts: &FrontendOptions) -> Result<()> { @@ -311,7 +347,7 @@ impl Instance { let inserter = Inserter::new( self.catalog_manager.as_ref(), &self.create_expr_factory, - &self.grpc_query_handler, + &self.statement_executor, self.region_request_handler.as_ref(), ); inserter.handle_row_inserts(requests, ctx).await @@ -326,7 +362,7 @@ impl Instance { let inserter = Inserter::new( self.catalog_manager.as_ref(), &self.create_expr_factory, - &self.grpc_query_handler, + &self.statement_executor, self.region_request_handler.as_ref(), ); inserter.handle_column_inserts(requests, ctx).await diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 3e49dae2b1..bff5d87520 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -15,551 +15,25 @@ pub mod deleter; pub(crate) mod inserter; -use std::collections::HashMap; use std::sync::Arc; -use api::helper::ColumnDataTypeWrapper; -use api::v1::ddl_request::Expr as DdlExpr; -use api::v1::greptime_request::Request; use api::v1::region::{region_request, QueryRequest}; -use api::v1::{column_def, AlterExpr, CreateDatabaseExpr, CreateTableExpr, TruncateTableExpr}; -use arrow_flight::Ticket; use async_trait::async_trait; -use catalog::{CatalogManager, DeregisterTableRequest, RegisterTableRequest}; -use chrono::DateTime; use client::error::{HandleRequestSnafu, Result as ClientResult}; -use client::region::RegionRequester; use client::region_handler::RegionRequestHandler; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; -use common_catalog::format_full_table_name; use common_error::ext::BoxedError; use common_meta::datanode_manager::AffectedRows; -use common_meta::ddl::{DdlExecutorRef, ExecutorContext}; -use common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue}; -use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse}; -use common_meta::rpc::router::{Partition, Partition as MetaPartition}; -use common_meta::table_name::TableName; -use common_query::Output; use common_recordbatch::SendableRecordBatchStream; -use common_telemetry::info; -use datatypes::prelude::ConcreteDataType; -use datatypes::schema::RawSchema; -use partition::manager::PartitionInfo; -use partition::partition::{PartitionBound, PartitionDef}; -use prost::Message; -use query::error::QueryExecutionSnafu; -use query::query_engine::SqlStatementExecutor; -use servers::query_handler::grpc::GrpcQueryHandler; use session::context::QueryContextRef; -use snafu::{ensure, OptionExt, ResultExt}; -use sql::ast::{Ident, Value as SqlValue}; -use sql::statements::create::{PartitionEntry, Partitions}; -use sql::statements::statement::Statement; -use sql::statements::{self, sql_value_to_value}; +use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; -use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableInfo, TableType}; -use table::requests::{AlterTableRequest, TableOptions}; -use table::TableRef; use crate::catalog::FrontendCatalogManager; use crate::error::{ - self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu, - DeserializePartitionSnafu, FindDatanodeSnafu, FindTableRouteSnafu, NotSupportedSnafu, - ParseSqlSnafu, RequestDatanodeSnafu, Result, SchemaExistsSnafu, TableAlreadyExistSnafu, - TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu, + FindDatanodeSnafu, FindTableRouteSnafu, NotSupportedSnafu, RequestQuerySnafu, Result, }; -use crate::expr_factory; use crate::instance::distributed::deleter::DistDeleter; use crate::instance::distributed::inserter::DistInserter; -use crate::table::{table_idents_to_full_name, DistTable}; - -const MAX_VALUE: &str = "MAXVALUE"; - -#[derive(Clone)] -pub struct DistInstance { - ddl_executor: DdlExecutorRef, - pub(crate) catalog_manager: Arc, -} - -impl DistInstance { - pub fn new(ddl_executor: DdlExecutorRef, catalog_manager: Arc) -> Self { - Self { - ddl_executor, - catalog_manager, - } - } - - pub async fn create_table( - &self, - create_table: &mut CreateTableExpr, - partitions: Option, - ) -> Result { - let _timer = common_telemetry::timer!(crate::metrics::DIST_CREATE_TABLE); - // 1. get schema info - let schema_value = self - .catalog_manager - .table_metadata_manager_ref() - .schema_manager() - .get(SchemaNameKey::new( - &create_table.catalog_name, - &create_table.schema_name, - )) - .await - .context(TableMetadataManagerSnafu)?; - - let table_name = TableName::new( - &create_table.catalog_name, - &create_table.schema_name, - &create_table.table_name, - ); - - let (partitions, partition_cols) = parse_partitions(create_table, partitions)?; - - let mut table_info = create_table_info(create_table, partition_cols, schema_value)?; - - let resp = self - .create_table_procedure(create_table, partitions, table_info.clone()) - .await?; - - let table_id = resp.table_id.context(error::UnexpectedSnafu { - violated: "expected table_id", - })?; - info!("Successfully created distributed table '{table_name}' with table id {table_id}"); - - table_info.ident.table_id = table_id; - - let table_info = Arc::new(table_info.try_into().context(error::CreateTableInfoSnafu)?); - - create_table.table_id = Some(api::v1::TableId { id: table_id }); - - let table = DistTable::table(table_info); - - let request = RegisterTableRequest { - catalog: table_name.catalog_name.clone(), - schema: table_name.schema_name.clone(), - table_name: table_name.table_name.clone(), - table_id, - table: table.clone(), - }; - ensure!( - self.catalog_manager - .register_table(request) - .await - .context(CatalogSnafu)?, - TableAlreadyExistSnafu { - table: table_name.to_string() - } - ); - - // Invalidates local cache ASAP. - self.catalog_manager - .invalidate_table( - &table_name.catalog_name, - &table_name.schema_name, - &table_name.table_name, - table_id, - ) - .await; - - Ok(table) - } - - async fn drop_table(&self, table_name: TableName) -> Result { - let table = self - .catalog_manager - .table( - &table_name.catalog_name, - &table_name.schema_name, - &table_name.table_name, - ) - .await - .context(CatalogSnafu)? - .with_context(|| TableNotFoundSnafu { - table_name: table_name.to_string(), - })?; - let table_id = table.table_info().table_id(); - - self.drop_table_procedure(&table_name, table_id).await?; - - let request = DeregisterTableRequest { - catalog: table_name.catalog_name.clone(), - schema: table_name.schema_name.clone(), - table_name: table_name.table_name.clone(), - }; - self.catalog_manager - .deregister_table(request) - .await - .context(CatalogSnafu)?; - - // Invalidates local cache ASAP. - self.catalog_manager() - .invalidate_table( - &table_name.catalog_name, - &table_name.schema_name, - &table_name.table_name, - table_id, - ) - .await; - - Ok(Output::AffectedRows(1)) - } - - async fn truncate_table(&self, table_name: TableName) -> Result { - let table = self - .catalog_manager - .table( - &table_name.catalog_name, - &table_name.schema_name, - &table_name.table_name, - ) - .await - .context(CatalogSnafu)? - .with_context(|| TableNotFoundSnafu { - table_name: table_name.to_string(), - })?; - let table_id = table.table_info().ident.table_id; - - let expr = TruncateTableExpr { - catalog_name: table_name.catalog_name.clone(), - schema_name: table_name.schema_name.clone(), - table_name: table_name.table_name.clone(), - table_id: Some(api::v1::TableId { id: table_id }), - }; - self.truncate_table_procedure(&expr).await?; - - Ok(Output::AffectedRows(0)) - } - - async fn handle_statement( - &self, - stmt: Statement, - query_ctx: QueryContextRef, - ) -> Result { - match stmt { - Statement::CreateDatabase(stmt) => { - let expr = CreateDatabaseExpr { - database_name: stmt.name.to_string(), - create_if_not_exists: stmt.if_not_exists, - options: Default::default(), - }; - self.handle_create_database(expr, query_ctx).await - } - Statement::CreateTable(stmt) => { - let create_expr = &mut expr_factory::create_to_expr(&stmt, query_ctx)?; - let _ = self.create_table(create_expr, stmt.partitions).await?; - Ok(Output::AffectedRows(0)) - } - Statement::CreateExternalTable(stmt) => { - let create_expr = &mut expr_factory::create_external_expr(stmt, query_ctx).await?; - let _ = self.create_table(create_expr, None).await?; - Ok(Output::AffectedRows(0)) - } - Statement::Alter(alter_table) => { - let expr = expr_factory::to_alter_expr(alter_table, query_ctx)?; - self.handle_alter_table(expr).await - } - Statement::DropTable(stmt) => { - let (catalog, schema, table) = - table_idents_to_full_name(stmt.table_name(), query_ctx) - .map_err(BoxedError::new) - .context(error::ExternalSnafu)?; - let table_name = TableName::new(catalog, schema, table); - self.drop_table(table_name).await - } - Statement::ShowCreateTable(show) => { - let (catalog, schema, table) = - table_idents_to_full_name(&show.table_name, query_ctx.clone()) - .map_err(BoxedError::new) - .context(error::ExternalSnafu)?; - - let table_ref = self - .catalog_manager - .table(&catalog, &schema, &table) - .await - .context(CatalogSnafu)? - .context(TableNotFoundSnafu { table_name: &table })?; - let table_name = TableName::new(catalog, schema, table); - - self.show_create_table(table_name, table_ref, query_ctx.clone()) - .await - } - Statement::TruncateTable(stmt) => { - let (catalog, schema, table) = - table_idents_to_full_name(stmt.table_name(), query_ctx) - .map_err(BoxedError::new) - .context(error::ExternalSnafu)?; - let table_name = TableName::new(catalog, schema, table); - self.truncate_table(table_name).await - } - _ => NotSupportedSnafu { - feat: format!("{stmt:?}"), - } - .fail(), - } - } - - async fn show_create_table( - &self, - table_name: TableName, - table: TableRef, - query_ctx: QueryContextRef, - ) -> Result { - let partitions = self - .catalog_manager - .partition_manager() - .find_table_partitions(table.table_info().table_id()) - .await - .context(error::FindTablePartitionRuleSnafu { - table_name: &table_name.table_name, - })?; - - let partitions = create_partitions_stmt(partitions)?; - - query::sql::show_create_table(table, partitions, query_ctx) - .context(error::ExecuteStatementSnafu) - } - - /// Handles distributed database creation - async fn handle_create_database( - &self, - expr: CreateDatabaseExpr, - query_ctx: QueryContextRef, - ) -> Result { - let catalog = query_ctx.current_catalog(); - if self - .catalog_manager - .schema_exist(catalog, &expr.database_name) - .await - .context(CatalogSnafu)? - { - return if expr.create_if_not_exists { - Ok(Output::AffectedRows(1)) - } else { - SchemaExistsSnafu { - name: &expr.database_name, - } - .fail() - }; - } - - let schema = SchemaNameKey::new(catalog, &expr.database_name); - let exist = self - .catalog_manager - .table_metadata_manager_ref() - .schema_manager() - .exist(schema) - .await - .context(error::TableMetadataManagerSnafu)?; - - ensure!( - !exist, - SchemaExistsSnafu { - name: schema.to_string(), - } - ); - - let schema_value = - SchemaNameValue::try_from(&expr.options).context(error::TableMetadataManagerSnafu)?; - self.catalog_manager - .table_metadata_manager_ref() - .schema_manager() - .create(schema, Some(schema_value)) - .await - .context(error::TableMetadataManagerSnafu)?; - - // Since the database created on meta does not go through KvBackend, so we manually - // invalidate the cache here. - // - // TODO(fys): when the meta invalidation cache mechanism is established, remove it. - self.catalog_manager() - .invalidate_schema(catalog, &expr.database_name) - .await; - - Ok(Output::AffectedRows(1)) - } - - fn verify_alter( - &self, - table_id: TableId, - table_info: Arc, - expr: AlterExpr, - ) -> Result<()> { - let request: table::requests::AlterTableRequest = - common_grpc_expr::alter_expr_to_request(table_id, expr) - .context(AlterExprToRequestSnafu)?; - - let AlterTableRequest { table_name, .. } = &request; - - let _ = table_info - .meta - .builder_with_alter_kind(table_name, &request.alter_kind) - .context(error::TableSnafu)? - .build() - .context(error::BuildTableMetaSnafu { table_name })?; - - Ok(()) - } - - async fn handle_alter_table(&self, expr: AlterExpr) -> Result { - let catalog_name = if expr.catalog_name.is_empty() { - DEFAULT_CATALOG_NAME - } else { - expr.catalog_name.as_str() - }; - - let schema_name = if expr.schema_name.is_empty() { - DEFAULT_SCHEMA_NAME - } else { - expr.schema_name.as_str() - }; - - let table_name = expr.table_name.as_str(); - - let table = self - .catalog_manager - .table(catalog_name, schema_name, table_name) - .await - .context(CatalogSnafu)? - .context(TableNotFoundSnafu { - table_name: format_full_table_name(catalog_name, schema_name, table_name), - })?; - - let table_id = table.table_info().ident.table_id; - self.verify_alter(table_id, table.table_info(), expr.clone())?; - - let req = SubmitDdlTaskRequest { - task: DdlTask::new_alter_table(expr.clone()), - }; - - self.ddl_executor - .submit_ddl_task(&ExecutorContext::default(), req) - .await - .context(error::ExecuteDdlSnafu)?; - - // Invalidates local cache ASAP. - self.catalog_manager() - .invalidate_table(catalog_name, schema_name, table_name, table_id) - .await; - - Ok(Output::AffectedRows(0)) - } - - async fn create_table_procedure( - &self, - create_table: &CreateTableExpr, - partitions: Vec, - table_info: RawTableInfo, - ) -> Result { - let partitions = partitions.into_iter().map(Into::into).collect(); - - let request = SubmitDdlTaskRequest { - task: DdlTask::new_create_table(create_table.clone(), partitions, table_info), - }; - - self.ddl_executor - .submit_ddl_task(&ExecutorContext::default(), request) - .await - .context(error::ExecuteDdlSnafu) - } - - async fn drop_table_procedure( - &self, - table_name: &TableName, - table_id: TableId, - ) -> Result { - let request = SubmitDdlTaskRequest { - task: DdlTask::new_drop_table( - table_name.catalog_name.to_string(), - table_name.schema_name.to_string(), - table_name.table_name.to_string(), - table_id, - ), - }; - - self.ddl_executor - .submit_ddl_task(&ExecutorContext::default(), request) - .await - .context(error::ExecuteDdlSnafu) - } - - async fn truncate_table_procedure( - &self, - truncate_table: &TruncateTableExpr, - ) -> Result { - let request = SubmitDdlTaskRequest { - task: DdlTask::new_truncate_table(truncate_table.clone()), - }; - - self.ddl_executor - .submit_ddl_task(&ExecutorContext::default(), request) - .await - .context(error::ExecuteDdlSnafu) - } - - pub fn catalog_manager(&self) -> Arc { - self.catalog_manager.clone() - } -} - -#[async_trait] -impl SqlStatementExecutor for DistInstance { - async fn execute_sql( - &self, - stmt: Statement, - query_ctx: QueryContextRef, - ) -> query::error::Result { - self.handle_statement(stmt, query_ctx) - .await - .map_err(BoxedError::new) - .context(QueryExecutionSnafu) - } -} - -#[async_trait] -impl GrpcQueryHandler for DistInstance { - type Error = error::Error; - - async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result { - match request { - Request::Inserts(_) => NotSupportedSnafu { feat: "inserts" }.fail(), - Request::Deletes(_) => NotSupportedSnafu { feat: "deletes" }.fail(), - Request::RowInserts(_) => NotSupportedSnafu { - feat: "row inserts", - } - .fail(), - Request::RowDeletes(_) => NotSupportedSnafu { - feat: "row deletes", - } - .fail(), - Request::Query(_) => { - unreachable!("Query should have been handled directly in Frontend Instance!") - } - Request::Ddl(request) => { - let expr = request.expr.context(error::IncompleteGrpcResultSnafu { - err_msg: "Missing 'expr' in DDL request", - })?; - match expr { - DdlExpr::CreateDatabase(expr) => self.handle_create_database(expr, ctx).await, - DdlExpr::CreateTable(mut expr) => { - let _ = self.create_table(&mut expr, None).await?; - Ok(Output::AffectedRows(0)) - } - DdlExpr::Alter(expr) => self.handle_alter_table(expr).await, - DdlExpr::DropTable(expr) => { - let table_name = - TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name); - self.drop_table(table_name).await - } - DdlExpr::TruncateTable(expr) => { - let table_name = - TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name); - self.truncate_table(table_name).await - } - } - } - } - } -} pub(crate) struct DistRegionRequestHandler { catalog_manager: Arc, @@ -655,288 +129,12 @@ impl DistRegionRequestHandler { .context(FindDatanodeSnafu { region: region_id.region_number(), })?; - let client = self - .catalog_manager - .datanode_clients() - .get_client(peer) - .await; - let ticket = Ticket { - ticket: request.encode_to_vec().into(), - }; - let region_requester = RegionRequester::new(client); - region_requester - .do_get(ticket) + let client = self.catalog_manager.datanode_manager().datanode(peer).await; + + client + .handle_query(request) .await - .context(RequestDatanodeSnafu) - } -} - -fn create_partitions_stmt(partitions: Vec) -> Result> { - if partitions.is_empty() { - return Ok(None); - } - - let column_list: Vec = partitions[0] - .partition - .partition_columns() - .iter() - .map(|name| name[..].into()) - .collect(); - - let entries = partitions - .into_iter() - .map(|info| { - // Generated the partition name from id - let name = &format!("r{}", info.id.region_number()); - let bounds = info.partition.partition_bounds(); - let value_list = bounds - .iter() - .map(|b| match b { - PartitionBound::Value(v) => statements::value_to_sql_value(v) - .with_context(|_| error::ConvertSqlValueSnafu { value: v.clone() }), - PartitionBound::MaxValue => Ok(SqlValue::Number(MAX_VALUE.to_string(), false)), - }) - .collect::>>()?; - - Ok(PartitionEntry { - name: name[..].into(), - value_list, - }) - }) - .collect::>>()?; - - Ok(Some(Partitions { - column_list, - entries, - })) -} - -fn create_table_info( - create_table: &CreateTableExpr, - partition_columns: Vec, - schema_opts: Option, -) -> Result { - let mut column_schemas = Vec::with_capacity(create_table.column_defs.len()); - let mut column_name_to_index_map = HashMap::new(); - - for (idx, column) in create_table.column_defs.iter().enumerate() { - let schema = - column_def::try_as_column_schema(column).context(error::InvalidColumnDefSnafu { - column: &column.name, - })?; - let schema = schema.with_time_index(column.name == create_table.time_index); - - column_schemas.push(schema); - let _ = column_name_to_index_map.insert(column.name.clone(), idx); - } - - let timestamp_index = column_name_to_index_map - .get(&create_table.time_index) - .cloned(); - - let raw_schema = RawSchema { - column_schemas: column_schemas.clone(), - timestamp_index, - version: 0, - }; - - let primary_key_indices = create_table - .primary_keys - .iter() - .map(|name| { - column_name_to_index_map - .get(name) - .cloned() - .context(ColumnNotFoundSnafu { msg: name }) - }) - .collect::>>()?; - - let partition_key_indices = partition_columns - .into_iter() - .map(|col_name| { - column_name_to_index_map - .get(&col_name) - .cloned() - .context(ColumnNotFoundSnafu { msg: col_name }) - }) - .collect::>>()?; - - let table_options = TableOptions::try_from(&create_table.table_options) - .context(UnrecognizedTableOptionSnafu)?; - let table_options = merge_options(table_options, schema_opts); - - let meta = RawTableMeta { - schema: raw_schema, - primary_key_indices, - value_indices: vec![], - engine: create_table.engine.clone(), - next_column_id: column_schemas.len() as u32, - region_numbers: vec![], - engine_options: HashMap::new(), - options: table_options, - created_on: DateTime::default(), - partition_key_indices, - }; - - let desc = if create_table.desc.is_empty() { - None - } else { - Some(create_table.desc.clone()) - }; - - let table_info = RawTableInfo { - ident: TableIdent { - // The table id of distributed table is assigned by Meta, set "0" here as a placeholder. - table_id: 0, - version: 0, - }, - name: create_table.table_name.clone(), - desc, - catalog_name: create_table.catalog_name.clone(), - schema_name: create_table.schema_name.clone(), - meta, - table_type: TableType::Base, - }; - Ok(table_info) -} - -fn merge_options( - mut table_opts: TableOptions, - schema_opts: Option, -) -> TableOptions { - table_opts.ttl = table_opts.ttl.or(schema_opts.and_then(|s| s.ttl)); - table_opts -} - -fn parse_partitions( - create_table: &CreateTableExpr, - partitions: Option, -) -> Result<(Vec, Vec)> { - // If partitions are not defined by user, use the timestamp column (which has to be existed) as - // the partition column, and create only one partition. - let partition_columns = find_partition_columns(&partitions)?; - let partition_entries = find_partition_entries(create_table, &partitions, &partition_columns)?; - - Ok(( - partition_entries - .into_iter() - .map(|x| MetaPartition::try_from(PartitionDef::new(partition_columns.clone(), x))) - .collect::>() - .context(DeserializePartitionSnafu)?, - partition_columns, - )) -} - -fn find_partition_entries( - create_table: &CreateTableExpr, - partitions: &Option, - partition_columns: &[String], -) -> Result>> { - let entries = if let Some(partitions) = partitions { - let column_defs = partition_columns - .iter() - .map(|pc| { - create_table - .column_defs - .iter() - .find(|c| &c.name == pc) - // unwrap is safe here because we have checked that partition columns are defined - .unwrap() - }) - .collect::>(); - let mut column_name_and_type = Vec::with_capacity(column_defs.len()); - for column in column_defs { - let column_name = &column.name; - let data_type = ConcreteDataType::from( - ColumnDataTypeWrapper::try_new(column.data_type).context(ColumnDataTypeSnafu)?, - ); - column_name_and_type.push((column_name, data_type)); - } - - let mut entries = Vec::with_capacity(partitions.entries.len()); - for e in partitions.entries.iter() { - let mut values = Vec::with_capacity(e.value_list.len()); - for (i, v) in e.value_list.iter().enumerate() { - // indexing is safe here because we have checked that "value_list" and "column_list" are matched in size - let (column_name, data_type) = &column_name_and_type[i]; - let v = match v { - SqlValue::Number(n, _) if n == MAX_VALUE => PartitionBound::MaxValue, - _ => PartitionBound::Value( - sql_value_to_value(column_name, data_type, v).context(ParseSqlSnafu)?, - ), - }; - values.push(v); - } - entries.push(values); - } - entries - } else { - vec![vec![PartitionBound::MaxValue]] - }; - Ok(entries) -} - -fn find_partition_columns(partitions: &Option) -> Result> { - let columns = if let Some(partitions) = partitions { - partitions - .column_list - .iter() - .map(|x| x.value.clone()) - .collect::>() - } else { - vec![] - }; - Ok(columns) -} - -#[cfg(test)] -mod test { - use session::context::QueryContext; - use sql::dialect::GreptimeDbDialect; - use sql::parser::ParserContext; - use sql::statements::statement::Statement; - - use super::*; - - #[tokio::test] - async fn test_parse_partitions() { - common_telemetry::init_default_ut_logging(); - let cases = [ - ( - r" -CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) ) -PARTITION BY RANGE COLUMNS (b) ( - PARTITION r0 VALUES LESS THAN ('hz'), - PARTITION r1 VALUES LESS THAN ('sh'), - PARTITION r2 VALUES LESS THAN (MAXVALUE), -) -ENGINE=mito", - r#"[{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"hz\"}}"]},{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"sh\"}}"]},{"column_list":["b"],"value_list":["\"MaxValue\""]}]"#, - ), - ( - r" -CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) ) -PARTITION BY RANGE COLUMNS (b, a) ( - PARTITION r0 VALUES LESS THAN ('hz', 10), - PARTITION r1 VALUES LESS THAN ('sh', 20), - PARTITION r2 VALUES LESS THAN (MAXVALUE, MAXVALUE), -) -ENGINE=mito", - r#"[{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"hz\"}}","{\"Value\":{\"Int32\":10}}"]},{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"sh\"}}","{\"Value\":{\"Int32\":20}}"]},{"column_list":["b","a"],"value_list":["\"MaxValue\"","\"MaxValue\""]}]"#, - ), - ]; - for (sql, expected) in cases { - let result = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}).unwrap(); - match &result[0] { - Statement::CreateTable(c) => { - let expr = expr_factory::create_to_expr(c, QueryContext::arc()).unwrap(); - let (partitions, _) = parse_partitions(&expr, c.partitions.clone()).unwrap(); - let json = serde_json::to_string(&partitions).unwrap(); - assert_eq!(json, expected); - } - _ => unreachable!(), - } - } + .context(RequestQuerySnafu) } } diff --git a/src/frontend/src/instance/distributed/deleter.rs b/src/frontend/src/instance/distributed/deleter.rs index 7a5c0d489b..befcda4574 100644 --- a/src/frontend/src/instance/distributed/deleter.rs +++ b/src/frontend/src/instance/distributed/deleter.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use api::v1::region::{region_request, DeleteRequests, RegionRequest, RegionRequestHeader}; -use common_meta::datanode_manager::{AffectedRows, DatanodeManager}; +use common_meta::datanode_manager::AffectedRows; use common_meta::peer::Peer; use futures::future; use metrics::counter; @@ -62,7 +62,7 @@ impl<'a> DistDeleter<'a> { let trace_id = self.trace_id.unwrap_or_default(); let span_id = self.span_id.unwrap_or_default(); let results = future::try_join_all(requests.into_iter().map(|(peer, deletes)| { - let datanode_clients = self.catalog_manager.datanode_clients(); + let datanode_clients = self.catalog_manager.datanode_manager(); common_runtime::spawn_write(async move { let request = RegionRequest { header: Some(RegionRequestHeader { trace_id, span_id }), @@ -135,15 +135,11 @@ mod tests { use common_meta::helper::{CatalogValue, SchemaValue}; use common_meta::key::catalog_name::CatalogNameKey; use common_meta::key::schema_name::SchemaNameKey; - use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::{KvBackend, KvBackendRef}; - use common_meta::rpc::router::{Region, RegionRoute}; use common_meta::rpc::store::PutRequest; - use datatypes::prelude::{ConcreteDataType, VectorRef}; - use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema as DtColumnSchema, Schema}; + use datatypes::prelude::VectorRef; use datatypes::vectors::Int32Vector; - use table::metadata::{RawTableInfo, TableInfoBuilder, TableMetaBuilder}; use super::*; use crate::heartbeat::handler::tests::MockKvCacheInvalidator; @@ -174,72 +170,15 @@ mod tests { backend } - async fn create_testing_table( - table_name: &str, - table_metadata_manager: &TableMetadataManagerRef, - ) { - let schema = Arc::new(Schema::new(vec![ - DtColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false) - .with_time_index(true) - .with_default_constraint(Some(ColumnDefaultConstraint::Function( - "current_timestamp()".to_string(), - ))) - .unwrap(), - DtColumnSchema::new("a", ConcreteDataType::int32_datatype(), true), - DtColumnSchema::new("value", ConcreteDataType::int32_datatype(), false), - ])); - - let table_meta = TableMetaBuilder::default() - .schema(schema) - .primary_key_indices(vec![1]) - .next_column_id(1) - .build() - .unwrap(); - - let table_id = 1; - let table_info: RawTableInfo = TableInfoBuilder::new(table_name, table_meta) - .table_id(table_id) - .build() - .unwrap() - .into(); - - let region_route_factory = |region_id: u64, peer: u64| RegionRoute { - region: Region { - id: region_id.into(), - ..Default::default() - }, - leader_peer: Some(Peer { - id: peer, - addr: String::new(), - }), - follower_peers: vec![], - }; - - let region_routes = vec![ - region_route_factory(1, 1), - region_route_factory(2, 2), - region_route_factory(3, 3), - ]; - table_metadata_manager - .create_table_metadata(table_info, region_routes) - .await - .unwrap(); - } - #[tokio::test] async fn test_split_deletes() { let backend = prepare_mocked_backend().await; - - let table_metadata_manager = Arc::new(TableMetadataManager::new(backend.clone())); - let table_name = "one_column_partitioning_table"; - create_testing_table(table_name, &table_metadata_manager).await; + create_partition_rule_manager(backend.clone()).await; let catalog_manager = Arc::new(FrontendCatalogManager::new( backend, Arc::new(MockKvCacheInvalidator::default()), - create_partition_rule_manager().await, Arc::new(DatanodeClients::default()), - table_metadata_manager, )); let new_delete_request = |vector: VectorRef| -> DeleteRequest { diff --git a/src/frontend/src/instance/distributed/inserter.rs b/src/frontend/src/instance/distributed/inserter.rs index 83d8bf6097..2be5b3fa94 100644 --- a/src/frontend/src/instance/distributed/inserter.rs +++ b/src/frontend/src/instance/distributed/inserter.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use api::v1::region::{region_request, InsertRequests, RegionRequest, RegionRequestHeader}; -use common_meta::datanode_manager::{AffectedRows, DatanodeManager}; +use common_meta::datanode_manager::AffectedRows; use common_meta::peer::Peer; use futures_util::future; use metrics::counter; @@ -62,7 +62,7 @@ impl<'a> DistInserter<'a> { let trace_id = self.trace_id.unwrap_or_default(); let span_id = self.span_id.unwrap_or_default(); let results = future::try_join_all(requests.into_iter().map(|(peer, inserts)| { - let datanode_clients = self.catalog_manager.datanode_clients(); + let datanode_clients = self.catalog_manager.datanode_manager(); common_runtime::spawn_write(async move { let request = RegionRequest { header: Some(RegionRequestHeader { trace_id, span_id }), @@ -102,14 +102,12 @@ impl<'a> DistInserter<'a> { .find_table_route(table_id) .await .context(FindTableRouteSnafu { table_id })?; + let region_map = table_route.region_map(); for (region_number, insert) in req_splits { - let peer = - table_route - .find_region_leader(region_number) - .context(FindDatanodeSnafu { - region: region_number, - })?; + let peer = *region_map.get(®ion_number).context(FindDatanodeSnafu { + region: region_number, + })?; inserts .entry(peer.clone()) .or_default() @@ -133,14 +131,10 @@ mod tests { use client::client_manager::DatanodeClients; use common_meta::key::catalog_name::{CatalogManager, CatalogNameKey}; use common_meta::key::schema_name::{SchemaManager, SchemaNameKey}; - use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::KvBackendRef; - use common_meta::rpc::router::{Region, RegionRoute}; - use datatypes::prelude::{ConcreteDataType, VectorRef}; - use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema as DtColumnSchema, Schema}; + use datatypes::prelude::VectorRef; use datatypes::vectors::Int32Vector; - use table::metadata::{RawTableInfo, TableInfoBuilder, TableMetaBuilder}; use super::*; use crate::heartbeat::handler::tests::MockKvCacheInvalidator; @@ -164,71 +158,15 @@ mod tests { backend } - async fn create_testing_table( - table_name: &str, - table_metadata_manager: &TableMetadataManagerRef, - ) { - let schema = Arc::new(Schema::new(vec![ - DtColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false) - .with_time_index(true) - .with_default_constraint(Some(ColumnDefaultConstraint::Function( - "current_timestamp()".to_string(), - ))) - .unwrap(), - DtColumnSchema::new("a", ConcreteDataType::int32_datatype(), true), - ])); - - let table_meta = TableMetaBuilder::default() - .schema(schema) - .primary_key_indices(vec![]) - .next_column_id(1) - .build() - .unwrap(); - - let table_id = 1; - let table_info: RawTableInfo = TableInfoBuilder::new(table_name, table_meta) - .table_id(table_id) - .build() - .unwrap() - .into(); - - let region_route_factory = |region_id: u64, peer: u64| RegionRoute { - region: Region { - id: region_id.into(), - ..Default::default() - }, - leader_peer: Some(Peer { - id: peer, - addr: String::new(), - }), - follower_peers: vec![], - }; - - let region_routes = vec![ - region_route_factory(1, 1), - region_route_factory(2, 2), - region_route_factory(3, 3), - ]; - table_metadata_manager - .create_table_metadata(table_info, region_routes) - .await - .unwrap(); - } - #[tokio::test] async fn test_split_inserts() { let backend = prepare_mocked_backend().await; - - let table_metadata_manager = Arc::new(TableMetadataManager::new(backend.clone())); - let table_name = "one_column_partitioning_table"; - create_testing_table(table_name, &table_metadata_manager).await; + create_partition_rule_manager(backend.clone()).await; let catalog_manager = Arc::new(FrontendCatalogManager::new( backend, Arc::new(MockKvCacheInvalidator::default()), - create_partition_rule_manager().await, Arc::new(DatanodeClients::default()), - table_metadata_manager, )); let inserter = DistInserter::new(&catalog_manager); diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 97c387a743..5dd76f7eab 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -12,10 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +use api::v1::ddl_request::Expr as DdlExpr; use api::v1::greptime_request::Request; use api::v1::query_request::Query; use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; +use common_meta::table_name::TableName; use common_query::Output; use query::parser::PromQuery; use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef}; @@ -24,7 +26,9 @@ use servers::query_handler::sql::SqlQueryHandler; use session::context::QueryContextRef; use snafu::{ensure, OptionExt, ResultExt}; -use crate::error::{Error, IncompleteGrpcResultSnafu, NotSupportedSnafu, PermissionSnafu, Result}; +use crate::error::{ + self, Error, IncompleteGrpcResultSnafu, NotSupportedSnafu, PermissionSnafu, Result, +}; use crate::instance::Instance; #[async_trait] @@ -87,9 +91,41 @@ impl GrpcQueryHandler for Instance { } } } - Request::Ddl(_) => { - GrpcQueryHandler::do_query(self.grpc_query_handler.as_ref(), request, ctx.clone()) - .await? + Request::Ddl(request) => { + let expr = request.expr.context(error::UnexpectedSnafu { + violated: "expected expr", + })?; + + match expr { + DdlExpr::CreateTable(mut expr) => { + // TODO(weny): supports to create multiple region table. + let _ = self + .statement_executor + .create_table_inner(&mut expr, None) + .await?; + Output::AffectedRows(0) + } + DdlExpr::Alter(expr) => self.statement_executor.alter_table_inner(expr).await?, + DdlExpr::CreateDatabase(expr) => { + self.statement_executor + .create_database( + ctx.current_catalog(), + &expr.database_name, + expr.create_if_not_exists, + ) + .await? + } + DdlExpr::DropTable(expr) => { + let table_name = + TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name); + self.statement_executor.drop_table(table_name).await? + } + DdlExpr::TruncateTable(expr) => { + let table_name = + TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name); + self.statement_executor.truncate_table(table_name).await? + } + } } }; diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs index 4710d1c651..fb6b4a2053 100644 --- a/src/frontend/src/instance/standalone.rs +++ b/src/frontend/src/instance/standalone.rs @@ -14,38 +14,31 @@ use std::sync::Arc; -use api::v1::greptime_request::Request; -use api::v1::region::{region_request, QueryRequest}; +use api::v1::meta::Partition; +use api::v1::region::{region_request, QueryRequest, RegionRequest}; use async_trait::async_trait; use client::error::{HandleRequestSnafu, Result as ClientResult}; use client::region::check_response_header; use client::region_handler::RegionRequestHandler; use common_error::ext::BoxedError; -use common_meta::datanode_manager::AffectedRows; -use common_query::Output; +use common_meta::datanode_manager::{AffectedRows, Datanode, DatanodeManager, DatanodeRef}; +use common_meta::ddl::{TableMetadataAllocator, TableMetadataAllocatorContext}; +use common_meta::error::{self as meta_error, Result as MetaResult}; +use common_meta::kv_backend::KvBackendRef; +use common_meta::peer::Peer; +use common_meta::rpc::router::{Region, RegionRoute}; +use common_meta::sequence::{Sequence, SequenceRef}; use common_recordbatch::SendableRecordBatchStream; -use datanode::error::Error as DatanodeError; use datanode::region_server::RegionServer; use servers::grpc::region_server::RegionServerHandler; -use servers::query_handler::grpc::{GrpcQueryHandler, GrpcQueryHandlerRef}; use session::context::QueryContextRef; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; +use store_api::storage::{RegionId, TableId}; +use table::metadata::RawTableInfo; -use crate::error::{Error, InvokeDatanodeSnafu, InvokeRegionServerSnafu, Result}; +use crate::error::InvokeRegionServerSnafu; -pub(crate) struct StandaloneGrpcQueryHandler(GrpcQueryHandlerRef); - -#[async_trait] -impl GrpcQueryHandler for StandaloneGrpcQueryHandler { - type Error = Error; - - async fn do_query(&self, query: Request, ctx: QueryContextRef) -> Result { - self.0 - .do_query(query, ctx) - .await - .context(InvokeDatanodeSnafu) - } -} +const TABLE_ID_SEQ: &str = "table_id"; pub(crate) struct StandaloneRegionRequestHandler { region_server: RegionServer, @@ -85,3 +78,84 @@ impl RegionRequestHandler for StandaloneRegionRequestHandler { .context(HandleRequestSnafu) } } + +pub(crate) struct StandaloneDatanode(pub(crate) RegionServer); + +#[async_trait] +impl Datanode for StandaloneDatanode { + async fn handle(&self, request: RegionRequest) -> MetaResult { + let body = request.body.context(meta_error::UnexpectedSnafu { + err_msg: "body not found", + })?; + let resp = self + .0 + .handle(body) + .await + .map_err(BoxedError::new) + .context(meta_error::ExternalSnafu)?; + + Ok(resp.affected_rows) + } + + async fn handle_query(&self, request: QueryRequest) -> MetaResult { + self.0 + .handle_read(request) + .await + .map_err(BoxedError::new) + .context(meta_error::ExternalSnafu) + } +} + +pub struct StandaloneDatanodeManager(pub RegionServer); + +#[async_trait] +impl DatanodeManager for StandaloneDatanodeManager { + async fn datanode(&self, _datanode: &Peer) -> DatanodeRef { + Arc::new(StandaloneDatanode(self.0.clone())) + } +} + +pub(crate) struct StandaloneTableMetadataCreator { + table_id_sequence: SequenceRef, +} + +impl StandaloneTableMetadataCreator { + pub fn new(kv_backend: KvBackendRef) -> Self { + Self { + table_id_sequence: Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_backend)), + } + } +} + +#[async_trait] +impl TableMetadataAllocator for StandaloneTableMetadataCreator { + async fn create( + &self, + _ctx: &TableMetadataAllocatorContext, + raw_table_info: &mut RawTableInfo, + partitions: &[Partition], + ) -> MetaResult<(TableId, Vec)> { + let table_id = self.table_id_sequence.next().await? as u32; + raw_table_info.ident.table_id = table_id; + let region_routes = partitions + .iter() + .enumerate() + .map(|(i, partition)| { + let region = Region { + id: RegionId::new(table_id, i as u32), + partition: Some(partition.clone().into()), + ..Default::default() + }; + // It's only a placeholder. + let peer = Peer::default(); + RegionRoute { + region, + leader_peer: Some(peer), + follower_peers: vec![], + } + }) + .collect::>(); + + Ok((table_id, region_routes)) + } +} diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index d9279012a2..52d355549a 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -30,3 +30,5 @@ mod server; pub mod service_config; pub mod statement; pub mod table; + +pub const MAX_VALUE: &str = "MAXVALUE"; diff --git a/src/frontend/src/statement.rs b/src/frontend/src/statement.rs index f45f79bdba..7ff2228c7a 100644 --- a/src/frontend/src/statement.rs +++ b/src/frontend/src/statement.rs @@ -15,6 +15,7 @@ mod backup; mod copy_table_from; mod copy_table_to; +mod ddl; mod describe; mod dml; mod show; @@ -22,17 +23,23 @@ mod tql; use std::collections::HashMap; use std::str::FromStr; +use std::sync::Arc; use api::v1::region::region_request; use catalog::CatalogManagerRef; use client::region_handler::RegionRequestHandlerRef; use common_error::ext::BoxedError; +use common_meta::cache_invalidator::CacheInvalidatorRef; +use common_meta::ddl::DdlTaskExecutorRef; +use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; +use common_meta::kv_backend::KvBackendRef; +use common_meta::table_name::TableName; use common_query::Output; use common_time::range::TimestampRange; use common_time::Timestamp; +use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef}; use query::parser::QueryStatement; use query::plan::LogicalPlan; -use query::query_engine::SqlStatementExecutorRef; use query::QueryEngineRef; use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt}; @@ -46,8 +53,8 @@ use table::requests::{ use table::TableRef; use crate::error::{ - self, CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu, - InvalidSqlSnafu, PlanStatementSnafu, RequestDatanodeSnafu, Result, TableNotFoundSnafu, + self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, PlanStatementSnafu, + RequestDatanodeSnafu, Result, TableNotFoundSnafu, }; use crate::req_convert::{delete, insert}; use crate::statement::backup::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY}; @@ -57,22 +64,30 @@ use crate::table::table_idents_to_full_name; pub struct StatementExecutor { catalog_manager: CatalogManagerRef, query_engine: QueryEngineRef, - sql_stmt_executor: SqlStatementExecutorRef, region_request_handler: RegionRequestHandlerRef, + ddl_executor: DdlTaskExecutorRef, + table_metadata_manager: TableMetadataManagerRef, + partition_manager: PartitionRuleManagerRef, + cache_invalidator: CacheInvalidatorRef, } impl StatementExecutor { pub(crate) fn new( catalog_manager: CatalogManagerRef, query_engine: QueryEngineRef, - sql_stmt_executor: SqlStatementExecutorRef, region_request_handler: RegionRequestHandlerRef, + ddl_task_executor: DdlTaskExecutorRef, + kv_backend: KvBackendRef, + cache_invalidator: CacheInvalidatorRef, ) -> Self { Self { catalog_manager, query_engine, - sql_stmt_executor, region_request_handler, + ddl_executor: ddl_task_executor, + table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())), + partition_manager: Arc::new(PartitionRuleManager::new(kv_backend)), + cache_invalidator, } } @@ -123,17 +138,59 @@ impl StatementExecutor { self.copy_database(to_copy_database_request(arg, &query_ctx)?) .await } - Statement::CreateTable(_) - | Statement::CreateExternalTable(_) - | Statement::CreateDatabase(_) - | Statement::Alter(_) - | Statement::DropTable(_) - | Statement::TruncateTable(_) - | Statement::ShowCreateTable(_) => self - .sql_stmt_executor - .execute_sql(stmt, query_ctx) + + Statement::CreateTable(stmt) => { + let _ = self.create_table(stmt, query_ctx).await?; + Ok(Output::AffectedRows(0)) + } + Statement::CreateExternalTable(stmt) => { + let _ = self.create_external_table(stmt, query_ctx).await?; + Ok(Output::AffectedRows(0)) + } + Statement::Alter(alter_table) => self.alter_table(alter_table, query_ctx).await, + Statement::DropTable(stmt) => { + let (catalog, schema, table) = + table_idents_to_full_name(stmt.table_name(), query_ctx) + .map_err(BoxedError::new) + .context(error::ExternalSnafu)?; + let table_name = TableName::new(catalog, schema, table); + self.drop_table(table_name).await + } + Statement::TruncateTable(stmt) => { + let (catalog, schema, table) = + table_idents_to_full_name(stmt.table_name(), query_ctx) + .map_err(BoxedError::new) + .context(error::ExternalSnafu)?; + let table_name = TableName::new(catalog, schema, table); + self.truncate_table(table_name).await + } + + Statement::CreateDatabase(stmt) => { + self.create_database( + query_ctx.current_catalog(), + &stmt.name.to_string(), + stmt.if_not_exists, + ) .await - .context(ExecuteStatementSnafu), + } + + Statement::ShowCreateTable(show) => { + let (catalog, schema, table) = + table_idents_to_full_name(&show.table_name, query_ctx.clone()) + .map_err(BoxedError::new) + .context(error::ExternalSnafu)?; + + let table_ref = self + .catalog_manager + .table(&catalog, &schema, &table) + .await + .context(error::CatalogSnafu)? + .context(error::TableNotFoundSnafu { table_name: &table })?; + let table_name = TableName::new(catalog, schema, table); + + self.show_create_table(table_name, table_ref, query_ctx) + .await + } } } diff --git a/src/frontend/src/statement/ddl.rs b/src/frontend/src/statement/ddl.rs new file mode 100644 index 0000000000..a250e5febb --- /dev/null +++ b/src/frontend/src/statement/ddl.rs @@ -0,0 +1,626 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use api::helper::ColumnDataTypeWrapper; +use api::v1::{column_def, AlterExpr, CreateTableExpr, TruncateTableExpr}; +use catalog::{CatalogManagerRef, DeregisterTableRequest, RegisterTableRequest}; +use chrono::DateTime; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_catalog::format_full_table_name; +use common_meta::cache_invalidator::Context; +use common_meta::ddl::ExecutorContext; +use common_meta::ident::TableIdent; +use common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue}; +use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse}; +use common_meta::rpc::router::{Partition, Partition as MetaPartition}; +use common_meta::table_name::TableName; +use common_query::Output; +use common_telemetry::info; +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::RawSchema; +use partition::partition::{PartitionBound, PartitionDef}; +use session::context::QueryContextRef; +use snafu::{ensure, OptionExt, ResultExt}; +use sql::ast::Value as SqlValue; +use sql::statements::alter::AlterTable; +use sql::statements::create::{CreateExternalTable, CreateTable, Partitions}; +use sql::statements::sql_value_to_value; +use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType}; +use table::requests::{AlterTableRequest, TableOptions}; +use table::TableRef; + +use super::StatementExecutor; +use crate::error::{ + self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu, + DeserializePartitionSnafu, ParseSqlSnafu, Result, SchemaNotFoundSnafu, TableAlreadyExistSnafu, + TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu, +}; +use crate::table::DistTable; +use crate::{expr_factory, MAX_VALUE}; + +impl StatementExecutor { + pub fn catalog_manager(&self) -> CatalogManagerRef { + self.catalog_manager.clone() + } + + pub async fn create_table(&self, stmt: CreateTable, ctx: QueryContextRef) -> Result { + let create_expr = &mut expr_factory::create_to_expr(&stmt, ctx)?; + self.create_table_inner(create_expr, stmt.partitions).await + } + + pub async fn create_external_table( + &self, + create_expr: CreateExternalTable, + ctx: QueryContextRef, + ) -> Result { + let create_expr = &mut expr_factory::create_external_expr(create_expr, ctx).await?; + self.create_table_inner(create_expr, None).await + } + + pub async fn create_table_inner( + &self, + create_table: &mut CreateTableExpr, + partitions: Option, + ) -> Result { + let _timer = common_telemetry::timer!(crate::metrics::DIST_CREATE_TABLE); + let schema = self + .table_metadata_manager + .schema_manager() + .get(SchemaNameKey::new( + &create_table.catalog_name, + &create_table.schema_name, + )) + .await + .context(TableMetadataManagerSnafu)?; + + let Some(schema_opts) = schema else { + return SchemaNotFoundSnafu { + schema_info: &create_table.schema_name, + } + .fail(); + }; + + let table_name = TableName::new( + &create_table.catalog_name, + &create_table.schema_name, + &create_table.table_name, + ); + + let (partitions, partition_cols) = parse_partitions(create_table, partitions)?; + + let mut table_info = create_table_info(create_table, partition_cols, schema_opts)?; + + let resp = self + .create_table_procedure(create_table, partitions, table_info.clone()) + .await?; + + let table_id = resp.table_id.context(error::UnexpectedSnafu { + violated: "expected table_id", + })?; + info!("Successfully created distributed table '{table_name}' with table id {table_id}"); + + table_info.ident.table_id = table_id; + let engine = table_info.meta.engine.to_string(); + + let table_info = Arc::new(table_info.try_into().context(error::CreateTableInfoSnafu)?); + create_table.table_id = Some(api::v1::TableId { id: table_id }); + + let table = DistTable::table(table_info); + + let request = RegisterTableRequest { + catalog: table_name.catalog_name.clone(), + schema: table_name.schema_name.clone(), + table_name: table_name.table_name.clone(), + table_id, + table: table.clone(), + }; + ensure!( + self.catalog_manager + .register_table(request) + .await + .context(CatalogSnafu)?, + TableAlreadyExistSnafu { + table: table_name.to_string() + } + ); + + // Invalidates local cache ASAP. + self.cache_invalidator + .invalidate_table( + &Context::default(), + TableIdent { + catalog: table_name.catalog_name.to_string(), + schema: table_name.schema_name.to_string(), + table: table_name.table_name.to_string(), + table_id, + engine, + }, + ) + .await + .context(error::InvalidateTableCacheSnafu)?; + + Ok(table) + } + + pub async fn drop_table(&self, table_name: TableName) -> Result { + let table = self + .catalog_manager + .table( + &table_name.catalog_name, + &table_name.schema_name, + &table_name.table_name, + ) + .await + .context(CatalogSnafu)? + .with_context(|| TableNotFoundSnafu { + table_name: table_name.to_string(), + })?; + let table_id = table.table_info().table_id(); + let engine = table.table_info().meta.engine.to_string(); + self.drop_table_procedure(&table_name, table_id).await?; + + let request = DeregisterTableRequest { + catalog: table_name.catalog_name.clone(), + schema: table_name.schema_name.clone(), + table_name: table_name.table_name.clone(), + }; + self.catalog_manager + .deregister_table(request) + .await + .context(CatalogSnafu)?; + + // Invalidates local cache ASAP. + self.cache_invalidator + .invalidate_table( + &Context::default(), + TableIdent { + catalog: table_name.catalog_name.to_string(), + schema: table_name.schema_name.to_string(), + table: table_name.table_name.to_string(), + table_id, + engine, + }, + ) + .await + .context(error::InvalidateTableCacheSnafu)?; + + Ok(Output::AffectedRows(1)) + } + + pub async fn truncate_table(&self, table_name: TableName) -> Result { + let table = self + .catalog_manager + .table( + &table_name.catalog_name, + &table_name.schema_name, + &table_name.table_name, + ) + .await + .context(CatalogSnafu)? + .with_context(|| TableNotFoundSnafu { + table_name: table_name.to_string(), + })?; + let table_id = table.table_info().ident.table_id; + + let expr = TruncateTableExpr { + catalog_name: table_name.catalog_name.clone(), + schema_name: table_name.schema_name.clone(), + table_name: table_name.table_name.clone(), + table_id: Some(api::v1::TableId { id: table_id }), + }; + self.truncate_table_procedure(&expr).await?; + + Ok(Output::AffectedRows(0)) + } + + fn verify_alter( + &self, + table_id: TableId, + table_info: Arc, + expr: AlterExpr, + ) -> Result<()> { + let request: table::requests::AlterTableRequest = + common_grpc_expr::alter_expr_to_request(table_id, expr) + .context(AlterExprToRequestSnafu)?; + + let AlterTableRequest { table_name, .. } = &request; + + let _ = table_info + .meta + .builder_with_alter_kind(table_name, &request.alter_kind) + .context(error::TableSnafu)? + .build() + .context(error::BuildTableMetaSnafu { table_name })?; + + Ok(()) + } + + pub async fn alter_table( + &self, + alter_table: AlterTable, + query_ctx: QueryContextRef, + ) -> Result { + let expr = expr_factory::to_alter_expr(alter_table, query_ctx)?; + self.alter_table_inner(expr).await + } + + pub async fn alter_table_inner(&self, expr: AlterExpr) -> Result { + let catalog_name = if expr.catalog_name.is_empty() { + DEFAULT_CATALOG_NAME + } else { + expr.catalog_name.as_str() + }; + + let schema_name = if expr.schema_name.is_empty() { + DEFAULT_SCHEMA_NAME + } else { + expr.schema_name.as_str() + }; + + let table_name = expr.table_name.as_str(); + + let table = self + .catalog_manager + .table(catalog_name, schema_name, table_name) + .await + .context(CatalogSnafu)? + .with_context(|| TableNotFoundSnafu { + table_name: format_full_table_name(catalog_name, schema_name, table_name), + })?; + + let table_id = table.table_info().ident.table_id; + let engine = table.table_info().meta.engine.to_string(); + self.verify_alter(table_id, table.table_info(), expr.clone())?; + + let req = SubmitDdlTaskRequest { + task: DdlTask::new_alter_table(expr.clone()), + }; + + self.ddl_executor + .submit_ddl_task(&ExecutorContext::default(), req) + .await + .context(error::ExecuteDdlSnafu)?; + + // Invalidates local cache ASAP. + self.cache_invalidator + .invalidate_table( + &Context::default(), + TableIdent { + catalog: catalog_name.to_string(), + schema: schema_name.to_string(), + table: table_name.to_string(), + table_id, + engine, + }, + ) + .await + .context(error::InvalidateTableCacheSnafu)?; + + Ok(Output::AffectedRows(0)) + } + + async fn create_table_procedure( + &self, + create_table: &CreateTableExpr, + partitions: Vec, + table_info: RawTableInfo, + ) -> Result { + let partitions = partitions.into_iter().map(Into::into).collect(); + + let request = SubmitDdlTaskRequest { + task: DdlTask::new_create_table(create_table.clone(), partitions, table_info), + }; + + self.ddl_executor + .submit_ddl_task(&ExecutorContext::default(), request) + .await + .context(error::ExecuteDdlSnafu) + } + + async fn drop_table_procedure( + &self, + table_name: &TableName, + table_id: TableId, + ) -> Result { + let request = SubmitDdlTaskRequest { + task: DdlTask::new_drop_table( + table_name.catalog_name.to_string(), + table_name.schema_name.to_string(), + table_name.table_name.to_string(), + table_id, + ), + }; + + self.ddl_executor + .submit_ddl_task(&ExecutorContext::default(), request) + .await + .context(error::ExecuteDdlSnafu) + } + + async fn truncate_table_procedure( + &self, + truncate_table: &TruncateTableExpr, + ) -> Result { + let request = SubmitDdlTaskRequest { + task: DdlTask::new_truncate_table(truncate_table.clone()), + }; + + self.ddl_executor + .submit_ddl_task(&ExecutorContext::default(), request) + .await + .context(error::ExecuteDdlSnafu) + } + + pub async fn create_database( + &self, + catalog: &str, + database: &str, + create_if_not_exists: bool, + ) -> Result { + // TODO(weny): considers executing it in the procedures. + let schema_key = SchemaNameKey::new(catalog, database); + let exists = self + .table_metadata_manager + .schema_manager() + .exist(schema_key) + .await + .context(error::TableMetadataManagerSnafu)?; + + if exists { + return if create_if_not_exists { + Ok(Output::AffectedRows(1)) + } else { + error::SchemaExistsSnafu { name: database }.fail() + }; + } + + self.table_metadata_manager + .schema_manager() + .create(schema_key, None) + .await + .context(error::TableMetadataManagerSnafu)?; + + Ok(Output::AffectedRows(1)) + } +} + +fn parse_partitions( + create_table: &CreateTableExpr, + partitions: Option, +) -> Result<(Vec, Vec)> { + // If partitions are not defined by user, use the timestamp column (which has to be existed) as + // the partition column, and create only one partition. + let partition_columns = find_partition_columns(&partitions)?; + let partition_entries = find_partition_entries(create_table, &partitions, &partition_columns)?; + + Ok(( + partition_entries + .into_iter() + .map(|x| MetaPartition::try_from(PartitionDef::new(partition_columns.clone(), x))) + .collect::>() + .context(DeserializePartitionSnafu)?, + partition_columns, + )) +} + +fn create_table_info( + create_table: &CreateTableExpr, + partition_columns: Vec, + schema_opts: SchemaNameValue, +) -> Result { + let mut column_schemas = Vec::with_capacity(create_table.column_defs.len()); + let mut column_name_to_index_map = HashMap::new(); + + for (idx, column) in create_table.column_defs.iter().enumerate() { + let schema = + column_def::try_as_column_schema(column).context(error::InvalidColumnDefSnafu { + column: &column.name, + })?; + let schema = schema.with_time_index(column.name == create_table.time_index); + + column_schemas.push(schema); + let _ = column_name_to_index_map.insert(column.name.clone(), idx); + } + + let timestamp_index = column_name_to_index_map + .get(&create_table.time_index) + .cloned(); + + let raw_schema = RawSchema { + column_schemas: column_schemas.clone(), + timestamp_index, + version: 0, + }; + + let primary_key_indices = create_table + .primary_keys + .iter() + .map(|name| { + column_name_to_index_map + .get(name) + .cloned() + .context(ColumnNotFoundSnafu { msg: name }) + }) + .collect::>>()?; + + let partition_key_indices = partition_columns + .into_iter() + .map(|col_name| { + column_name_to_index_map + .get(&col_name) + .cloned() + .context(ColumnNotFoundSnafu { msg: col_name }) + }) + .collect::>>()?; + + let table_options = TableOptions::try_from(&create_table.table_options) + .context(UnrecognizedTableOptionSnafu)?; + let table_options = merge_options(table_options, schema_opts); + + let meta = RawTableMeta { + schema: raw_schema, + primary_key_indices, + value_indices: vec![], + engine: create_table.engine.clone(), + next_column_id: column_schemas.len() as u32, + region_numbers: vec![], + engine_options: HashMap::new(), + options: table_options, + created_on: DateTime::default(), + partition_key_indices, + }; + + let desc = if create_table.desc.is_empty() { + None + } else { + Some(create_table.desc.clone()) + }; + + let table_info = RawTableInfo { + ident: metadata::TableIdent { + // The table id of distributed table is assigned by Meta, set "0" here as a placeholder. + table_id: 0, + version: 0, + }, + name: create_table.table_name.clone(), + desc, + catalog_name: create_table.catalog_name.clone(), + schema_name: create_table.schema_name.clone(), + meta, + table_type: TableType::Base, + }; + Ok(table_info) +} + +fn find_partition_columns(partitions: &Option) -> Result> { + let columns = if let Some(partitions) = partitions { + partitions + .column_list + .iter() + .map(|x| x.value.clone()) + .collect::>() + } else { + vec![] + }; + Ok(columns) +} + +fn find_partition_entries( + create_table: &CreateTableExpr, + partitions: &Option, + partition_columns: &[String], +) -> Result>> { + let entries = if let Some(partitions) = partitions { + let column_defs = partition_columns + .iter() + .map(|pc| { + create_table + .column_defs + .iter() + .find(|c| &c.name == pc) + // unwrap is safe here because we have checked that partition columns are defined + .unwrap() + }) + .collect::>(); + let mut column_name_and_type = Vec::with_capacity(column_defs.len()); + for column in column_defs { + let column_name = &column.name; + let data_type = ConcreteDataType::from( + ColumnDataTypeWrapper::try_new(column.data_type).context(ColumnDataTypeSnafu)?, + ); + column_name_and_type.push((column_name, data_type)); + } + + let mut entries = Vec::with_capacity(partitions.entries.len()); + for e in partitions.entries.iter() { + let mut values = Vec::with_capacity(e.value_list.len()); + for (i, v) in e.value_list.iter().enumerate() { + // indexing is safe here because we have checked that "value_list" and "column_list" are matched in size + let (column_name, data_type) = &column_name_and_type[i]; + let v = match v { + SqlValue::Number(n, _) if n == MAX_VALUE => PartitionBound::MaxValue, + _ => PartitionBound::Value( + sql_value_to_value(column_name, data_type, v).context(ParseSqlSnafu)?, + ), + }; + values.push(v); + } + entries.push(values); + } + entries + } else { + vec![vec![PartitionBound::MaxValue]] + }; + Ok(entries) +} + +fn merge_options(mut table_opts: TableOptions, schema_opts: SchemaNameValue) -> TableOptions { + table_opts.ttl = table_opts.ttl.or(schema_opts.ttl); + table_opts +} + +#[cfg(test)] +mod test { + use session::context::QueryContext; + use sql::dialect::GreptimeDbDialect; + use sql::parser::ParserContext; + use sql::statements::statement::Statement; + + use super::*; + use crate::expr_factory; + + #[tokio::test] + async fn test_parse_partitions() { + common_telemetry::init_default_ut_logging(); + let cases = [ + ( + r" +CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) ) +PARTITION BY RANGE COLUMNS (b) ( + PARTITION r0 VALUES LESS THAN ('hz'), + PARTITION r1 VALUES LESS THAN ('sh'), + PARTITION r2 VALUES LESS THAN (MAXVALUE), +) +ENGINE=mito", + r#"[{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"hz\"}}"]},{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"sh\"}}"]},{"column_list":["b"],"value_list":["\"MaxValue\""]}]"#, + ), + ( + r" +CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) ) +PARTITION BY RANGE COLUMNS (b, a) ( + PARTITION r0 VALUES LESS THAN ('hz', 10), + PARTITION r1 VALUES LESS THAN ('sh', 20), + PARTITION r2 VALUES LESS THAN (MAXVALUE, MAXVALUE), +) +ENGINE=mito", + r#"[{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"hz\"}}","{\"Value\":{\"Int32\":10}}"]},{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"sh\"}}","{\"Value\":{\"Int32\":20}}"]},{"column_list":["b","a"],"value_list":["\"MaxValue\"","\"MaxValue\""]}]"#, + ), + ]; + for (sql, expected) in cases { + let result = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}).unwrap(); + match &result[0] { + Statement::CreateTable(c) => { + let expr = expr_factory::create_to_expr(c, QueryContext::arc()).unwrap(); + let (partitions, _) = parse_partitions(&expr, c.partitions.clone()).unwrap(); + let json = serde_json::to_string(&partitions).unwrap(); + assert_eq!(json, expected); + } + _ => unreachable!(), + } + } + } +} diff --git a/src/frontend/src/statement/show.rs b/src/frontend/src/statement/show.rs index 37df294a2a..ff033a5fc1 100644 --- a/src/frontend/src/statement/show.rs +++ b/src/frontend/src/statement/show.rs @@ -12,13 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_meta::table_name::TableName; use common_query::Output; +use partition::manager::PartitionInfo; +use partition::partition::PartitionBound; use session::context::QueryContextRef; use snafu::ResultExt; +use sql::ast::{Ident, Value as SqlValue}; +use sql::statements; +use sql::statements::create::{PartitionEntry, Partitions}; use sql::statements::show::{ShowDatabases, ShowTables}; +use table::TableRef; -use crate::error::{ExecuteStatementSnafu, Result}; +use crate::error::{self, ExecuteStatementSnafu, Result}; use crate::statement::StatementExecutor; +use crate::MAX_VALUE; impl StatementExecutor { pub(super) async fn show_databases( @@ -40,4 +48,64 @@ impl StatementExecutor { .await .context(ExecuteStatementSnafu) } + + pub async fn show_create_table( + &self, + table_name: TableName, + table: TableRef, + query_ctx: QueryContextRef, + ) -> Result { + let partitions = self + .partition_manager + .find_table_partitions(table.table_info().table_id()) + .await + .context(error::FindTablePartitionRuleSnafu { + table_name: &table_name.table_name, + })?; + + let partitions = create_partitions_stmt(partitions)?; + + query::sql::show_create_table(table, partitions, query_ctx) + .context(error::ExecuteStatementSnafu) + } +} + +fn create_partitions_stmt(partitions: Vec) -> Result> { + if partitions.is_empty() { + return Ok(None); + } + + let column_list: Vec = partitions[0] + .partition + .partition_columns() + .iter() + .map(|name| name[..].into()) + .collect(); + + let entries = partitions + .into_iter() + .map(|info| { + // Generated the partition name from id + let name = &format!("r{}", info.id.region_number()); + let bounds = info.partition.partition_bounds(); + let value_list = bounds + .iter() + .map(|b| match b { + PartitionBound::Value(v) => statements::value_to_sql_value(v) + .with_context(|_| error::ConvertSqlValueSnafu { value: v.clone() }), + PartitionBound::MaxValue => Ok(SqlValue::Number(MAX_VALUE.to_string(), false)), + }) + .collect::>>()?; + + Ok(PartitionEntry { + name: name[..].into(), + value_list, + }) + }) + .collect::>>()?; + + Ok(Some(Partitions { + column_list, + entries, + })) } diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index c1c18d7c81..8a19e3627d 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -88,13 +88,17 @@ pub(crate) mod test { use std::collections::BTreeMap; use std::sync::atomic::{AtomicU32, Ordering}; - use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + use catalog::remote::MetaKvBackend; + use common_meta::key::TableMetadataManager; + use common_meta::kv_backend::memory::MemoryKvBackend; + use common_meta::kv_backend::KvBackendRef; use common_meta::peer::Peer; - use common_meta::rpc::router::{Region, RegionRoute, Table, TableRoute}; - use common_meta::table_name::TableName; + use common_meta::rpc::router::{Region, RegionRoute}; use common_query::prelude::Expr; use datafusion_expr::expr_fn::{and, binary_expr, col, or}; use datafusion_expr::{lit, Operator}; + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::{ColumnSchema, SchemaBuilder}; use meta_client::client::MetaClient; use meter_core::collect::Collect; use meter_core::data::{ReadRecord, WriteRecord}; @@ -104,14 +108,52 @@ pub(crate) mod test { use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef}; use partition::partition::{PartitionBound, PartitionDef}; use partition::range::RangePartitionRule; - use partition::route::TableRoutes; use partition::PartitionRuleRef; use store_api::storage::RegionNumber; + use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder}; use table::meter_insert_request; use table::requests::InsertRequest; use super::*; + fn new_test_table_info( + table_id: u32, + table_name: &str, + region_numbers: impl Iterator, + ) -> TableInfo { + let column_schemas = vec![ + ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true), + ]; + let schema = SchemaBuilder::try_from(column_schemas) + .unwrap() + .version(123) + .build() + .unwrap(); + + let meta = TableMetaBuilder::default() + .schema(Arc::new(schema)) + .primary_key_indices(vec![0]) + .engine("engine") + .next_column_id(3) + .region_numbers(region_numbers.collect::>()) + .build() + .unwrap(); + TableInfoBuilder::default() + .table_id(table_id) + .table_version(5) + .name(table_name) + .meta(meta) + .build() + .unwrap() + } + /// Create a partition rule manager with two tables, one is partitioned by single column, and /// the other one is two. The tables are under default catalog and schema. /// @@ -128,160 +170,145 @@ pub(crate) mod test { /// PARTITION r2 VALUES LESS THAN (50, 'sh'), /// PARTITION r3 VALUES LESS THAN (MAXVALUE, MAXVALUE), /// ) - pub(crate) async fn create_partition_rule_manager() -> PartitionRuleManagerRef { - let table_routes = Arc::new(TableRoutes::new(Arc::new(MetaClient::default()))); - let partition_manager = Arc::new(PartitionRuleManager::new(table_routes.clone())); + pub(crate) async fn create_partition_rule_manager( + kv_backend: KvBackendRef, + ) -> PartitionRuleManagerRef { + let table_metadata_manager = TableMetadataManager::new(kv_backend.clone()); + let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend)); - let table_name = TableName::new( - DEFAULT_CATALOG_NAME, - DEFAULT_SCHEMA_NAME, - "one_column_partitioning_table", - ); - let table_route = TableRoute::new( - Table { - id: 1, - table_name: table_name.clone(), - table_schema: vec![], - }, - vec![ - RegionRoute { - region: Region { - id: 3.into(), - name: "r1".to_string(), - partition: Some( - PartitionDef::new( - vec!["a".to_string()], - vec![PartitionBound::Value(10_i32.into())], - ) - .try_into() - .unwrap(), - ), - attrs: BTreeMap::new(), + table_metadata_manager + .create_table_metadata( + new_test_table_info(1, "table_1", vec![0u32, 1, 2].into_iter()).into(), + vec![ + RegionRoute { + region: Region { + id: 3.into(), + name: "r1".to_string(), + partition: Some( + PartitionDef::new( + vec!["a".to_string()], + vec![PartitionBound::Value(10_i32.into())], + ) + .try_into() + .unwrap(), + ), + attrs: BTreeMap::new(), + }, + leader_peer: Some(Peer::new(3, "")), + follower_peers: vec![], }, - leader_peer: Some(Peer::new(3, "")), - follower_peers: vec![], - }, - RegionRoute { - region: Region { - id: 2.into(), - name: "r2".to_string(), - partition: Some( - PartitionDef::new( - vec!["a".to_string()], - vec![PartitionBound::Value(50_i32.into())], - ) - .try_into() - .unwrap(), - ), - attrs: BTreeMap::new(), + RegionRoute { + region: Region { + id: 2.into(), + name: "r2".to_string(), + partition: Some( + PartitionDef::new( + vec!["a".to_string()], + vec![PartitionBound::Value(50_i32.into())], + ) + .try_into() + .unwrap(), + ), + attrs: BTreeMap::new(), + }, + leader_peer: Some(Peer::new(2, "")), + follower_peers: vec![], }, - leader_peer: Some(Peer::new(2, "")), - follower_peers: vec![], - }, - RegionRoute { - region: Region { - id: 1.into(), - name: "r3".to_string(), - partition: Some( - PartitionDef::new( - vec!["a".to_string()], - vec![PartitionBound::MaxValue], - ) - .try_into() - .unwrap(), - ), - attrs: BTreeMap::new(), + RegionRoute { + region: Region { + id: 1.into(), + name: "r3".to_string(), + partition: Some( + PartitionDef::new( + vec!["a".to_string()], + vec![PartitionBound::MaxValue], + ) + .try_into() + .unwrap(), + ), + attrs: BTreeMap::new(), + }, + leader_peer: Some(Peer::new(1, "")), + follower_peers: vec![], }, - leader_peer: Some(Peer::new(1, "")), - follower_peers: vec![], - }, - ], - ); - table_routes - .insert_table_route(1, Arc::new(table_route)) - .await; + ], + ) + .await + .unwrap(); - let table_name = TableName::new( - DEFAULT_CATALOG_NAME, - DEFAULT_SCHEMA_NAME, - "two_column_partitioning_table", - ); - let table_route = TableRoute::new( - Table { - id: 2, - table_name: table_name.clone(), - table_schema: vec![], - }, - vec![ - RegionRoute { - region: Region { - id: 1.into(), - name: "r1".to_string(), - partition: Some( - PartitionDef::new( - vec!["a".to_string(), "b".to_string()], - vec![ - PartitionBound::Value(10_i32.into()), - PartitionBound::Value("hz".into()), - ], - ) - .try_into() - .unwrap(), - ), - attrs: BTreeMap::new(), + table_metadata_manager + .create_table_metadata( + new_test_table_info(2, "table_2", vec![0u32, 1, 2].into_iter()).into(), + vec![ + RegionRoute { + region: Region { + id: 1.into(), + name: "r1".to_string(), + partition: Some( + PartitionDef::new( + vec!["a".to_string(), "b".to_string()], + vec![ + PartitionBound::Value(10_i32.into()), + PartitionBound::Value("hz".into()), + ], + ) + .try_into() + .unwrap(), + ), + attrs: BTreeMap::new(), + }, + leader_peer: None, + follower_peers: vec![], }, - leader_peer: None, - follower_peers: vec![], - }, - RegionRoute { - region: Region { - id: 2.into(), - name: "r2".to_string(), - partition: Some( - PartitionDef::new( - vec!["a".to_string(), "b".to_string()], - vec![ - PartitionBound::Value(50_i32.into()), - PartitionBound::Value("sh".into()), - ], - ) - .try_into() - .unwrap(), - ), - attrs: BTreeMap::new(), + RegionRoute { + region: Region { + id: 2.into(), + name: "r2".to_string(), + partition: Some( + PartitionDef::new( + vec!["a".to_string(), "b".to_string()], + vec![ + PartitionBound::Value(50_i32.into()), + PartitionBound::Value("sh".into()), + ], + ) + .try_into() + .unwrap(), + ), + attrs: BTreeMap::new(), + }, + leader_peer: None, + follower_peers: vec![], }, - leader_peer: None, - follower_peers: vec![], - }, - RegionRoute { - region: Region { - id: 3.into(), - name: "r3".to_string(), - partition: Some( - PartitionDef::new( - vec!["a".to_string(), "b".to_string()], - vec![PartitionBound::MaxValue, PartitionBound::MaxValue], - ) - .try_into() - .unwrap(), - ), - attrs: BTreeMap::new(), + RegionRoute { + region: Region { + id: 3.into(), + name: "r3".to_string(), + partition: Some( + PartitionDef::new( + vec!["a".to_string(), "b".to_string()], + vec![PartitionBound::MaxValue, PartitionBound::MaxValue], + ) + .try_into() + .unwrap(), + ), + attrs: BTreeMap::new(), + }, + leader_peer: None, + follower_peers: vec![], }, - leader_peer: None, - follower_peers: vec![], - }, - ], - ); - table_routes - .insert_table_route(2, Arc::new(table_route)) - .await; + ], + ) + .await + .unwrap(); partition_manager } #[tokio::test(flavor = "multi_thread")] async fn test_find_partition_rule() { - let partition_manager = create_partition_rule_manager().await; + let partition_manager = + create_partition_rule_manager(Arc::new(MemoryKvBackend::default())).await; // "one_column_partitioning_table" has id 1 let partition_rule = partition_manager @@ -325,9 +352,10 @@ pub(crate) mod test { #[tokio::test(flavor = "multi_thread")] async fn test_find_regions() { - let partition_manager = Arc::new(PartitionRuleManager::new(Arc::new(TableRoutes::new( - Arc::new(MetaClient::default()), - )))); + let kv_backend = MetaKvBackend { + client: Arc::new(MetaClient::default()), + }; + let partition_manager = Arc::new(PartitionRuleManager::new(Arc::new(kv_backend))); // PARTITION BY RANGE (a) ( // PARTITION r1 VALUES LESS THAN (10), diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml index 0ad9a63d35..b02b7c6a5a 100644 --- a/src/log-store/Cargo.toml +++ b/src/log-store/Cargo.toml @@ -24,7 +24,7 @@ common-telemetry = { workspace = true } futures-util.workspace = true futures.workspace = true protobuf = { version = "2", features = ["bytes"] } -raft-engine = { git = "https://github.com/tikv/raft-engine.git", rev = "2dcaf5beeea3d5de9ec9c7133a2451d00f508f52" } +raft-engine = { workspace = true } snafu = { version = "0.7", features = ["backtraces"] } store-api = { workspace = true } tokio-util.workspace = true diff --git a/src/log-store/src/raft_engine.rs b/src/log-store/src/raft_engine.rs index 24bb26cfc6..8e453c0d26 100644 --- a/src/log-store/src/raft_engine.rs +++ b/src/log-store/src/raft_engine.rs @@ -23,6 +23,9 @@ use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl}; mod backend; pub mod log_store; +pub use backend::RaftEngineBackend; +pub use raft_engine::Config; + pub mod protos { include!(concat!(env!("OUT_DIR"), concat!("/", "protos/", "mod.rs"))); } diff --git a/src/log-store/src/raft_engine/backend.rs b/src/log-store/src/raft_engine/backend.rs index d7479799b5..31a002a952 100644 --- a/src/log-store/src/raft_engine/backend.rs +++ b/src/log-store/src/raft_engine/backend.rs @@ -28,10 +28,10 @@ use common_meta::rpc::store::{ RangeRequest, RangeResponse, }; use common_meta::rpc::KeyValue; -use raft_engine::{Engine, LogBatch}; +use raft_engine::{Config, Engine, LogBatch}; use snafu::ResultExt; -use crate::error::RaftEngineSnafu; +use crate::error::{self, RaftEngineSnafu}; pub(crate) const SYSTEM_NAMESPACE: u64 = 0; @@ -40,6 +40,15 @@ pub struct RaftEngineBackend { engine: RwLock, } +impl RaftEngineBackend { + pub fn try_open_with_cfg(config: Config) -> error::Result { + let engine = Engine::open(config).context(RaftEngineSnafu)?; + Ok(Self { + engine: RwLock::new(engine), + }) + } +} + #[async_trait::async_trait] impl TxnService for RaftEngineBackend { type Error = meta_error::Error; diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 015e87c9bb..ab6700af04 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -23,7 +23,7 @@ mod store; use api::v1::meta::Role; use common_error::ext::BoxedError; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; -use common_meta::ddl::{DdlExecutor, ExecutorContext}; +use common_meta::ddl::{DdlTaskExecutor, ExecutorContext}; use common_meta::error::{self as meta_error, Result as MetaResult}; use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use common_meta::rpc::lock::{LockRequest, LockResponse, UnlockRequest}; @@ -178,7 +178,7 @@ pub struct MetaClient { } #[async_trait::async_trait] -impl DdlExecutor for MetaClient { +impl DdlTaskExecutor for MetaClient { async fn submit_ddl_task( &self, _ctx: &ExecutorContext, diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 9b53f8a263..fb320b8afb 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -22,7 +22,7 @@ use api::v1::meta::Peer; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_grpc::channel_manager; -use common_meta::ddl::DdlExecutorRef; +use common_meta::ddl::DdlTaskExecutorRef; use common_meta::key::TableMetadataManagerRef; use common_meta::sequence::SequenceRef; use common_procedure::options::ProcedureConfig; @@ -198,7 +198,7 @@ pub struct MetaSrv { procedure_manager: ProcedureManagerRef, metadata_service: MetadataServiceRef, mailbox: MailboxRef, - ddl_executor: DdlExecutorRef, + ddl_executor: DdlTaskExecutorRef, table_metadata_manager: TableMetadataManagerRef, greptimedb_telemetry_task: Arc, pubsub: Option<(PublishRef, SubscribeManagerRef)>, @@ -350,7 +350,7 @@ impl MetaSrv { &self.mailbox } - pub fn ddl_executor(&self) -> &DdlExecutorRef { + pub fn ddl_executor(&self) -> &DdlTaskExecutorRef { &self.ddl_executor } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 196c1ce14b..ae308a8c76 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -54,7 +54,7 @@ use crate::service::mailbox::MailboxRef; use crate::service::store::cached_kv::{CheckLeader, LeaderCachedKvStore}; use crate::service::store::kv::{KvBackendAdapter, KvStoreRef, ResettableKvStoreRef}; use crate::service::store::memory::MemStore; -use crate::table_creator::MetaSrvTableCreator; +use crate::table_creator::MetaSrvTableMetadataAllocator; // TODO(fys): try use derive_builder macro pub struct MetaSrvBuilder { @@ -366,7 +366,7 @@ fn build_ddl_manager( }, )); - let table_creator = Arc::new(MetaSrvTableCreator::new( + let table_creator = Arc::new(MetaSrvTableMetadataAllocator::new( selector_ctx.clone(), selector.clone(), table_id_sequence.clone(), diff --git a/src/meta-srv/src/table_creator.rs b/src/meta-srv/src/table_creator.rs index 1c56dd07aa..abbc729b8d 100644 --- a/src/meta-srv/src/table_creator.rs +++ b/src/meta-srv/src/table_creator.rs @@ -15,7 +15,7 @@ use api::v1::meta::Partition; use common_catalog::format_full_table_name; use common_error::ext::BoxedError; -use common_meta::ddl::{TableCreator, TableCreatorContext}; +use common_meta::ddl::{TableMetadataAllocator, TableMetadataAllocatorContext}; use common_meta::error::{self as meta_error, Result as MetaResult}; use common_meta::rpc::router::{Region, RegionRoute}; use common_meta::sequence::SequenceRef; @@ -27,13 +27,13 @@ use table::metadata::RawTableInfo; use crate::error::{self, Result, TooManyPartitionsSnafu}; use crate::metasrv::{SelectorContext, SelectorRef}; -pub struct MetaSrvTableCreator { +pub struct MetaSrvTableMetadataAllocator { ctx: SelectorContext, selector: SelectorRef, table_id_sequence: SequenceRef, } -impl MetaSrvTableCreator { +impl MetaSrvTableMetadataAllocator { pub fn new( ctx: SelectorContext, selector: SelectorRef, @@ -48,10 +48,10 @@ impl MetaSrvTableCreator { } #[async_trait::async_trait] -impl TableCreator for MetaSrvTableCreator { +impl TableMetadataAllocator for MetaSrvTableMetadataAllocator { async fn create( &self, - ctx: &TableCreatorContext, + ctx: &TableMetadataAllocatorContext, raw_table_info: &mut RawTableInfo, partitions: &[Partition], ) -> MetaResult<(TableId, Vec)> { diff --git a/src/partition/src/error.rs b/src/partition/src/error.rs index fd3710f178..37a6f250be 100644 --- a/src/partition/src/error.rs +++ b/src/partition/src/error.rs @@ -25,6 +25,12 @@ use table::metadata::TableId; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { + #[snafu(display("Table route manager error: {}", source))] + TableRouteManager { + source: common_meta::error::Error, + location: Location, + }, + #[snafu(display("Failed to get meta info from cache, error: {}", err_msg))] GetCache { err_msg: String, location: Location }, @@ -156,6 +162,7 @@ impl ErrorExt for Error { Error::ConvertScalarValue { .. } => StatusCode::Internal, Error::FindDatanode { .. } => StatusCode::InvalidArguments, Error::CreateDefaultToRead { source, .. } => source.status_code(), + Error::TableRouteManager { source, .. } => source.status_code(), Error::MissingDefaultValue { .. } => StatusCode::Internal, } } diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs index f2b8856a36..297447f794 100644 --- a/src/partition/src/manager.rs +++ b/src/partition/src/manager.rs @@ -16,8 +16,10 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; use api::v1::region::{DeleteRequest, InsertRequest}; +use common_meta::key::table_route::TableRouteManager; +use common_meta::kv_backend::KvBackendRef; use common_meta::peer::Peer; -use common_meta::rpc::router::TableRoute; +use common_meta::rpc::router::{convert_to_region_map, RegionRoutes}; use common_query::prelude::Expr; use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator}; use datatypes::prelude::Value; @@ -29,7 +31,6 @@ use crate::columns::RangeColumnsPartitionRule; use crate::error::{FindLeaderSnafu, Result}; use crate::partition::{PartitionBound, PartitionDef, PartitionExpr}; use crate::range::RangePartitionRule; -use crate::route::TableRoutes; use crate::splitter::{DeleteRequestSplits, InsertRequestSplits, RowSplitter}; use crate::{error, PartitionRuleRef}; @@ -47,7 +48,7 @@ pub type PartitionRuleManagerRef = Arc; /// - values (in case of insertion) /// - filters (in case of select, deletion and update) pub struct PartitionRuleManager { - table_routes: Arc, + table_route_manager: TableRouteManager, } #[derive(Debug)] @@ -56,25 +57,23 @@ pub struct PartitionInfo { pub partition: PartitionDef, } -#[async_trait::async_trait] -impl TableRouteCacheInvalidator for PartitionRuleManager { - async fn invalidate_table_route(&self, table: TableId) { - self.table_routes.invalidate_table_route(table).await - } -} - impl PartitionRuleManager { - pub fn new(table_routes: Arc) -> Self { - Self { table_routes } - } - - pub fn table_routes(&self) -> &TableRoutes { - self.table_routes.as_ref() + pub fn new(kv_backend: KvBackendRef) -> Self { + Self { + table_route_manager: TableRouteManager::new(kv_backend), + } } /// Find table route of given table name. - pub async fn find_table_route(&self, table: TableId) -> Result> { - self.table_routes.get_route(table).await + pub async fn find_table_route(&self, table_id: TableId) -> Result { + let route = self + .table_route_manager + .get(table_id) + .await + .context(error::TableRouteManagerSnafu)? + .context(error::FindTableRoutesSnafu { table_id })?; + + Ok(RegionRoutes(route.region_routes)) } /// Find datanodes of corresponding regions of given table. @@ -83,15 +82,19 @@ impl PartitionRuleManager { table_id: TableId, regions: Vec, ) -> Result>> { - let route = self.table_routes.get_route(table_id).await?; + let route = self + .table_route_manager + .get(table_id) + .await + .context(error::TableRouteManagerSnafu)? + .context(error::FindTableRoutesSnafu { table_id })?; let mut datanodes = HashMap::with_capacity(regions.len()); + let region_map = convert_to_region_map(&route.region_routes); for region in regions.iter() { - let datanode = route - .find_region_leader(*region) - .context(error::FindDatanodeSnafu { - table_id, - region: *region, - })?; + let datanode = *region_map.get(region).context(error::FindDatanodeSnafu { + table_id, + region: *region, + })?; datanodes .entry(datanode.clone()) .or_insert_with(Vec::new) @@ -102,8 +105,14 @@ impl PartitionRuleManager { /// Find all leader peers of given table. pub async fn find_table_region_leaders(&self, table_id: TableId) -> Result> { - let route = self.table_routes.get_route(table_id).await?; + let route = self + .table_route_manager + .get(table_id) + .await + .context(error::TableRouteManagerSnafu)? + .context(error::FindTableRoutesSnafu { table_id })?; let mut peers = Vec::with_capacity(route.region_routes.len()); + for peer in &route.region_routes { peers.push(peer.leader_peer.clone().with_context(|| FindLeaderSnafu { region_id: peer.region.id, @@ -115,7 +124,12 @@ impl PartitionRuleManager { } pub async fn find_table_partitions(&self, table_id: TableId) -> Result> { - let route = self.table_routes.get_route(table_id).await?; + let route = self + .table_route_manager + .get(table_id) + .await + .context(error::TableRouteManagerSnafu)? + .context(error::FindTableRoutesSnafu { table_id })?; ensure!( !route.region_routes.is_empty(), error::FindTableRoutesSnafu { table_id }