From bd377ef32908e9a957ea7e74bc2cad71ac5cb7e0 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Mon, 27 Feb 2023 11:49:23 +0800 Subject: [PATCH] feat: Procedure to create table and register table to catalog (#1040) * feat: Add table-procedures crate * feat: Implement procedure to create table * feat: Integrate procedure manager to datanode * test: Test CreateTableProcedure * refactor: Rename table-procedures to table-procedure * feat: Implement create_table_by_procedure * chore: Remove comment * chore: Add todo * feat: Add procedure config to standalone mode * feat: Register table-procedure loaders * feat: Address review comments CreateTableProcedure just return error if the subprocedure is failed * chore: Address CR comments --- Cargo.lock | 24 ++ Cargo.toml | 1 + config/datanode.example.toml | 4 + config/standalone.example.toml | 5 +- src/cmd/src/datanode.rs | 11 +- src/cmd/src/standalone.rs | 5 +- src/common/procedure/src/local.rs | 2 - src/datanode/Cargo.toml | 2 + src/datanode/src/datanode.rs | 23 ++ src/datanode/src/error.rs | 30 +++ src/datanode/src/instance.rs | 46 +++- src/datanode/src/mock.rs | 4 +- src/datanode/src/sql.rs | 17 +- src/datanode/src/sql/create.rs | 50 +++- src/datanode/src/tests/test_util.rs | 8 +- src/table-procedure/Cargo.toml | 25 ++ src/table-procedure/src/create.rs | 388 +++++++++++++++++++++++++++ src/table-procedure/src/error.rs | 89 ++++++ src/table-procedure/src/lib.rs | 43 +++ src/table-procedure/src/test_util.rs | 73 +++++ 20 files changed, 834 insertions(+), 16 deletions(-) create mode 100644 src/table-procedure/Cargo.toml create mode 100644 src/table-procedure/src/create.rs create mode 100644 src/table-procedure/src/error.rs create mode 100644 src/table-procedure/src/lib.rs create mode 100644 src/table-procedure/src/test_util.rs diff --git a/Cargo.lock b/Cargo.lock index aa2a385182..a54d3b0ef7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2186,6 +2186,7 @@ dependencies = [ "common-error", "common-grpc", "common-grpc-expr", + 
"common-procedure", "common-query", "common-recordbatch", "common-runtime", @@ -2218,6 +2219,7 @@ dependencies = [ "store-api", "substrait 0.1.0", "table", + "table-procedure", "tempdir", "tokio", "tokio-stream", @@ -7473,6 +7475,28 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "table-procedure" +version = "0.1.0" +dependencies = [ + "async-trait", + "catalog", + "common-error", + "common-procedure", + "common-telemetry", + "datatypes", + "log-store", + "mito", + "object-store", + "serde", + "serde_json", + "snafu", + "storage", + "table", + "tempdir", + "tokio", +] + [[package]] name = "tagptr" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index d1f730d0a2..bab6f28f2d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ members = [ "src/storage", "src/store-api", "src/table", + "src/table-procedure", "tests-integration", "tests/runner", ] diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 83bcfc4d5d..6a4ac171eb 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -29,3 +29,7 @@ tcp_nodelay = false max_inflight_tasks = 4 max_files_in_level0 = 16 max_purge_tasks = 32 + +[procedure.store] +type = 'File' +data_dir = '/tmp/greptimedb/procedure/' diff --git a/config/standalone.example.toml b/config/standalone.example.toml index af6ca0bcfa..f22de3c589 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -14,7 +14,6 @@ purge_threshold = '50GB' read_batch_size = 128 sync_write = false - [storage] type = 'File' data_dir = '/tmp/greptimedb/data/' @@ -42,3 +41,7 @@ enable = true addr = '127.0.0.1:4003' runtime_size = 2 check_pwd = false + +[procedure.store] +type = 'File' +data_dir = '/tmp/greptimedb/procedure/' diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 1dc71d3313..05e70d8b41 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -14,7 +14,9 @@ use clap::Parser; use common_telemetry::logging; -use 
datanode::datanode::{Datanode, DatanodeOptions, FileConfig, ObjectStoreConfig}; +use datanode::datanode::{ + Datanode, DatanodeOptions, FileConfig, ObjectStoreConfig, ProcedureConfig, +}; use meta_client::MetaClientOptions; use servers::Mode; use snafu::ResultExt; @@ -65,6 +67,8 @@ struct StartCommand { data_dir: Option, #[clap(long)] wal_dir: Option, + #[clap(long)] + procedure_dir: Option, } impl StartCommand { @@ -134,6 +138,11 @@ impl TryFrom for DatanodeOptions { if let Some(wal_dir) = cmd.wal_dir { opts.wal.dir = wal_dir; } + + if let Some(procedure_dir) = cmd.procedure_dir { + opts.procedure = Some(ProcedureConfig::from_file_path(procedure_dir)); + } + Ok(opts) } } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 1be6ad8979..77fef7481b 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -18,7 +18,7 @@ use clap::Parser; use common_base::Plugins; use common_telemetry::info; use datanode::datanode::{ - CompactionConfig, Datanode, DatanodeOptions, ObjectStoreConfig, WalConfig, + CompactionConfig, Datanode, DatanodeOptions, ObjectStoreConfig, ProcedureConfig, WalConfig, }; use datanode::instance::InstanceRef; use frontend::frontend::{Frontend, FrontendOptions}; @@ -81,6 +81,7 @@ pub struct StandaloneOptions { pub wal: WalConfig, pub storage: ObjectStoreConfig, pub compaction: CompactionConfig, + pub procedure: Option, } impl Default for StandaloneOptions { @@ -99,6 +100,7 @@ impl Default for StandaloneOptions { wal: WalConfig::default(), storage: ObjectStoreConfig::default(), compaction: CompactionConfig::default(), + procedure: None, } } } @@ -125,6 +127,7 @@ impl StandaloneOptions { wal: self.wal, storage: self.storage, compaction: self.compaction, + procedure: self.procedure, ..Default::default() } } diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index 7022231351..94686a01aa 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -123,8 +123,6 @@ 
pub(crate) struct ManagerContext { loaders: Mutex>, lock_map: LockMap, procedures: RwLock>, - // TODO(yingwen): Now we never clean the messages. But when the root procedure is done, we - // should be able to remove the its message and all its child messages. /// Messages loaded from the procedure store. messages: Mutex>, } diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 355da9c046..66f42a7963 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -21,6 +21,7 @@ common-catalog = { path = "../common/catalog" } common-error = { path = "../common/error" } common-grpc = { path = "../common/grpc" } common-grpc-expr = { path = "../common/grpc-expr" } +common-procedure = { path = "../common/procedure" } common-query = { path = "../common/query" } common-recordbatch = { path = "../common/recordbatch" } common-runtime = { path = "../common/runtime" } @@ -52,6 +53,7 @@ storage = { path = "../storage" } store-api = { path = "../store-api" } substrait = { path = "../common/substrait" } table = { path = "../table" } +table-procedure = { path = "../table-procedure" } tokio.workspace = true tokio-stream = { version = "0.1", features = ["net"] } tonic.workspace = true diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index daa8822d5c..19d5d8a735 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -144,6 +144,27 @@ impl From<&DatanodeOptions> for StorageEngineConfig { } } +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct ProcedureConfig { + /// Storage config for procedure manager. 
+ pub store: ObjectStoreConfig, +} + +impl Default for ProcedureConfig { + fn default() -> ProcedureConfig { + ProcedureConfig::from_file_path("/tmp/greptimedb/procedure/".to_string()) + } +} + +impl ProcedureConfig { + pub fn from_file_path(path: String) -> ProcedureConfig { + ProcedureConfig { + store: ObjectStoreConfig::File(FileConfig { data_dir: path }), + } + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct DatanodeOptions { @@ -159,6 +180,7 @@ pub struct DatanodeOptions { pub wal: WalConfig, pub storage: ObjectStoreConfig, pub compaction: CompactionConfig, + pub procedure: Option, } impl Default for DatanodeOptions { @@ -176,6 +198,7 @@ impl Default for DatanodeOptions { wal: WalConfig::default(), storage: ObjectStoreConfig::default(), compaction: CompactionConfig::default(), + procedure: None, } } } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 4e571159ae..7d481bdb53 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -15,6 +15,7 @@ use std::any::Any; use common_error::prelude::*; +use common_procedure::ProcedureId; use common_recordbatch::error::Error as RecordBatchError; use datafusion::parquet; use datatypes::prelude::ConcreteDataType; @@ -394,6 +395,31 @@ pub enum Error { #[snafu(backtrace)] source: table::error::Error, }, + + #[snafu(display("Failed to recover procedure, source: {}", source))] + RecoverProcedure { + #[snafu(backtrace)] + source: common_procedure::error::Error, + }, + + #[snafu(display("Failed to submit procedure, source: {}", source))] + SubmitProcedure { + #[snafu(backtrace)] + source: common_procedure::error::Error, + }, + + #[snafu(display("Failed to wait procedure done, source: {}", source))] + WaitProcedure { + source: tokio::sync::watch::error::RecvError, + backtrace: Backtrace, + }, + + // TODO(yingwen): Use procedure's error. 
+ #[snafu(display("Failed to execute procedure, procedure_id: {}", procedure_id))] + ProcedureExec { + procedure_id: ProcedureId, + backtrace: Backtrace, + }, } pub type Result = std::result::Result; @@ -470,6 +496,10 @@ impl ErrorExt for Error { CopyTable { source, .. } => source.status_code(), TableScanExec { source, .. } => source.status_code(), UnrecognizedTableOption { .. } => StatusCode::InvalidArguments, + RecoverProcedure { source, .. } | SubmitProcedure { source, .. } => { + source.status_code() + } + WaitProcedure { .. } | ProcedureExec { .. } => StatusCode::Internal, } } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 7cfc10b3b3..5c12c44aac 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -21,6 +21,8 @@ use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest}; use common_base::readable_size::ReadableSize; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; +use common_procedure::local::{LocalManager, ManagerConfig}; +use common_procedure::ProcedureManagerRef; use common_telemetry::logging::info; use log_store::raft_engine::log_store::RaftEngineLogStore; use log_store::LogConfig; @@ -45,11 +47,11 @@ use table::table::TableIdProviderRef; use table::Table; use crate::datanode::{ - DatanodeOptions, ObjectStoreConfig, WalConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE, + DatanodeOptions, ObjectStoreConfig, ProcedureConfig, WalConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE, }; use crate::error::{ self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu, - NewCatalogSnafu, OpenLogStoreSnafu, Result, + NewCatalogSnafu, OpenLogStoreSnafu, RecoverProcedureSnafu, Result, }; use crate::heartbeat::HeartbeatTask; use crate::script::ScriptExecutor; @@ -173,12 +175,32 @@ impl Instance { catalog_manager.clone(), )), }; + + let procedure_manager = 
create_procedure_manager(&opts.procedure).await?; + // Recover procedures. + if let Some(procedure_manager) = &procedure_manager { + table_engine.register_procedure_loaders(&**procedure_manager); + table_procedure::register_procedure_loaders( + catalog_manager.clone(), + table_engine.clone(), + table_engine.clone(), + &**procedure_manager, + ); + + procedure_manager + .recover() + .await + .context(RecoverProcedureSnafu)?; + } + Ok(Self { query_engine: query_engine.clone(), sql_handler: SqlHandler::new( - table_engine, + table_engine.clone(), catalog_manager.clone(), query_engine.clone(), + table_engine, + procedure_manager, ), catalog_manager, script_executor, @@ -400,3 +422,21 @@ pub(crate) async fn create_log_store(wal_config: &WalConfig) -> Result, +) -> Result> { + let Some(procedure_config) = procedure_config else { + return Ok(None); + }; + + info!( + "Creating procedure manager with config: {:?}", + procedure_config + ); + + let object_store = new_object_store(&procedure_config.store).await?; + let manager_config = ManagerConfig { object_store }; + + Ok(Some(Arc::new(LocalManager::new(manager_config)))) +} diff --git a/src/datanode/src/mock.rs b/src/datanode/src/mock.rs index c54491f800..c083514bca 100644 --- a/src/datanode/src/mock.rs +++ b/src/datanode/src/mock.rs @@ -96,9 +96,11 @@ impl Instance { Ok(Self { query_engine: query_engine.clone(), sql_handler: SqlHandler::new( - table_engine, + table_engine.clone(), catalog_manager.clone(), query_engine.clone(), + table_engine, + None, ), catalog_manager, script_executor, diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index 72b4342dee..6651560cae 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -13,6 +13,7 @@ // limitations under the License. 
use catalog::CatalogManagerRef; +use common_procedure::ProcedureManagerRef; use common_query::Output; use common_telemetry::error; use query::query_engine::QueryEngineRef; @@ -23,7 +24,7 @@ use sql::statements::delete::Delete; use sql::statements::describe::DescribeTable; use sql::statements::explain::Explain; use sql::statements::show::{ShowDatabases, ShowTables}; -use table::engine::{EngineContext, TableEngineRef, TableReference}; +use table::engine::{EngineContext, TableEngineProcedureRef, TableEngineRef, TableReference}; use table::requests::*; use table::TableRef; @@ -57,6 +58,8 @@ pub struct SqlHandler { table_engine: TableEngineRef, catalog_manager: CatalogManagerRef, query_engine: QueryEngineRef, + engine_procedure: TableEngineProcedureRef, + procedure_manager: Option, } impl SqlHandler { @@ -64,11 +67,15 @@ impl SqlHandler { table_engine: TableEngineRef, catalog_manager: CatalogManagerRef, query_engine: QueryEngineRef, + engine_procedure: TableEngineProcedureRef, + procedure_manager: Option, ) -> Self { Self { table_engine, catalog_manager, query_engine, + engine_procedure, + procedure_manager, } } @@ -250,7 +257,13 @@ mod tests { let factory = QueryEngineFactory::new(catalog_list.clone()); let query_engine = factory.query_engine(); - let sql_handler = SqlHandler::new(table_engine, catalog_list.clone(), query_engine.clone()); + let sql_handler = SqlHandler::new( + table_engine.clone(), + catalog_list.clone(), + query_engine.clone(), + table_engine, + None, + ); let stmt = match QueryLanguageParser::parse_sql(sql).unwrap() { QueryStatement::Sql(Statement::Insert(i)) => i, diff --git a/src/datanode/src/sql/create.rs b/src/datanode/src/sql/create.rs index bdbf1995ac..2657f45bdb 100644 --- a/src/datanode/src/sql/create.rs +++ b/src/datanode/src/sql/create.rs @@ -15,9 +15,9 @@ use std::collections::HashMap; use catalog::{RegisterSchemaRequest, RegisterTableRequest}; +use common_procedure::{ProcedureManagerRef, ProcedureState, ProcedureWithId}; use 
common_query::Output; -use common_telemetry::tracing::info; -use common_telemetry::tracing::log::error; +use common_telemetry::tracing::{error, info}; use datatypes::schema::RawSchema; use session::context::QueryContextRef; use snafu::{ensure, OptionExt, ResultExt}; @@ -28,12 +28,13 @@ use store_api::storage::consts::TIME_INDEX_NAME; use table::engine::{EngineContext, TableReference}; use table::metadata::TableId; use table::requests::*; +use table_procedure::CreateTableProcedure; use crate::error::{ self, CatalogNotFoundSnafu, CatalogSnafu, ConstraintNotSupportedSnafu, CreateTableSnafu, IllegalPrimaryKeysDefSnafu, InsertSystemCatalogSnafu, KeyColumnNotFoundSnafu, - RegisterSchemaSnafu, Result, SchemaExistsSnafu, SchemaNotFoundSnafu, - UnrecognizedTableOptionSnafu, + ProcedureExecSnafu, RegisterSchemaSnafu, Result, SchemaExistsSnafu, SchemaNotFoundSnafu, + SubmitProcedureSnafu, UnrecognizedTableOptionSnafu, WaitProcedureSnafu, }; use crate::sql::SqlHandler; @@ -72,6 +73,10 @@ impl SqlHandler { } pub(crate) async fn create_table(&self, req: CreateTableRequest) -> Result { + if let Some(procedure_manager) = &self.procedure_manager { + return self.create_table_by_procedure(procedure_manager, req).await; + } + let ctx = EngineContext {}; // first check if catalog and schema exist let catalog = self @@ -127,6 +132,43 @@ impl SqlHandler { Ok(Output::AffectedRows(0)) } + pub(crate) async fn create_table_by_procedure( + &self, + procedure_manager: &ProcedureManagerRef, + req: CreateTableRequest, + ) -> Result { + let table_name = req.table_name.clone(); + let procedure = CreateTableProcedure::new( + req, + self.catalog_manager.clone(), + self.table_engine.clone(), + self.engine_procedure.clone(), + ); + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + let procedure_id = procedure_with_id.id; + + info!("Create table {} by procedure {}", table_name, procedure_id); + + let mut watcher = procedure_manager + .submit(procedure_with_id) + .await + 
.context(SubmitProcedureSnafu)?; + + // TODO(yingwen): Wrap this into a function and add error to ProcedureState::Failed. + loop { + watcher.changed().await.context(WaitProcedureSnafu)?; + match *watcher.borrow() { + ProcedureState::Running => (), + ProcedureState::Done => { + return Ok(Output::AffectedRows(0)); + } + ProcedureState::Failed => { + return ProcedureExecSnafu { procedure_id }.fail(); + } + } + } + } + /// Converts [CreateTable] to [SqlRequest::CreateTable]. pub(crate) fn create_to_request( &self, diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index a1d21b53da..71d085841b 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -140,7 +140,13 @@ pub async fn create_mock_sql_handler() -> SqlHandler { let catalog_list = catalog::local::new_memory_catalog_list().unwrap(); let factory = QueryEngineFactory::new(catalog_list); - SqlHandler::new(mock_engine, catalog_manager, factory.query_engine()) + SqlHandler::new( + mock_engine.clone(), + catalog_manager, + factory.query_engine(), + mock_engine, + None, + ) } pub(crate) async fn setup_test_instance(test_name: &str) -> MockInstance { diff --git a/src/table-procedure/Cargo.toml b/src/table-procedure/Cargo.toml new file mode 100644 index 0000000000..ac813376d8 --- /dev/null +++ b/src/table-procedure/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "table-procedure" +version.workspace = true +edition.workspace = true +license.workspace = true + +[dependencies] +async-trait.workspace = true +catalog = { path = "../catalog" } +common-error = { path = "../common/error" } +common-procedure = { path = "../common/procedure" } +common-telemetry = { path = "../common/telemetry" } +datatypes = { path = "../datatypes" } +serde.workspace = true +serde_json.workspace = true +snafu.workspace = true +table = { path = "../table" } + +[dev-dependencies] +log-store = { path = "../log-store" } +mito = { path = "../mito" } +object-store = { path = 
"../object-store" } +storage = { path = "../storage" } +tokio.workspace = true +tempdir = "0.3" diff --git a/src/table-procedure/src/create.rs b/src/table-procedure/src/create.rs new file mode 100644 index 0000000000..dba29cb8f8 --- /dev/null +++ b/src/table-procedure/src/create.rs @@ -0,0 +1,388 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Procedure to create a table. + +use async_trait::async_trait; +use catalog::{CatalogManagerRef, RegisterTableRequest}; +use common_procedure::{ + Context, Error, LockKey, Procedure, ProcedureId, ProcedureManager, ProcedureState, + ProcedureWithId, Result, Status, +}; +use common_telemetry::logging; +use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt}; +use table::engine::{EngineContext, TableEngineProcedureRef, TableEngineRef, TableReference}; +use table::requests::CreateTableRequest; + +use crate::error::{ + AccessCatalogSnafu, CatalogNotFoundSnafu, DeserializeProcedureSnafu, SchemaNotFoundSnafu, + SerializeProcedureSnafu, SubprocedureFailedSnafu, +}; + +/// Procedure to create a table. 
+pub struct CreateTableProcedure { + data: CreateTableData, + catalog_manager: CatalogManagerRef, + table_engine: TableEngineRef, + engine_procedure: TableEngineProcedureRef, +} + +#[async_trait] +impl Procedure for CreateTableProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + async fn execute(&mut self, ctx: &Context) -> Result { + match self.data.state { + CreateTableState::Prepare => self.on_prepare(), + CreateTableState::EngineCreateTable => self.on_engine_create_table(ctx).await, + CreateTableState::RegisterCatalog => self.on_register_catalog().await, + } + } + + fn dump(&self) -> Result { + let json = serde_json::to_string(&self.data).context(SerializeProcedureSnafu)?; + Ok(json) + } + + fn lock_key(&self) -> LockKey { + // We lock the whole table. + let table_name = self.data.table_ref().to_string(); + LockKey::single(table_name) + } +} + +impl CreateTableProcedure { + const TYPE_NAME: &str = "table-procedures::CreateTableProcedure"; + + /// Returns a new [CreateTableProcedure]. + pub fn new( + request: CreateTableRequest, + catalog_manager: CatalogManagerRef, + table_engine: TableEngineRef, + engine_procedure: TableEngineProcedureRef, + ) -> CreateTableProcedure { + CreateTableProcedure { + data: CreateTableData { + state: CreateTableState::Prepare, + request, + subprocedure_id: None, + }, + catalog_manager, + table_engine, + engine_procedure, + } + } + + /// Register the loader of this procedure to the `procedure_manager`. + /// + /// # Panics + /// Panics on error. + pub fn register_loader( + catalog_manager: CatalogManagerRef, + engine_procedure: TableEngineProcedureRef, + table_engine: TableEngineRef, + procedure_manager: &dyn ProcedureManager, + ) { + procedure_manager + .register_loader( + Self::TYPE_NAME, + Box::new(move |data| { + Self::from_json( + data, + catalog_manager.clone(), + table_engine.clone(), + engine_procedure.clone(), + ) + .map(|p| Box::new(p) as _) + }), + ) + .unwrap() + } + + /// Recover the procedure from json. 
+ fn from_json( + json: &str, + catalog_manager: CatalogManagerRef, + table_engine: TableEngineRef, + engine_procedure: TableEngineProcedureRef, + ) -> Result { + let data: CreateTableData = + serde_json::from_str(json).context(DeserializeProcedureSnafu)?; + + Ok(CreateTableProcedure { + data, + catalog_manager, + table_engine, + engine_procedure, + }) + } + + fn on_prepare(&mut self) -> Result { + // Check whether catalog and schema exist. + let catalog = self + .catalog_manager + .catalog(&self.data.request.catalog_name) + .context(AccessCatalogSnafu)? + .with_context(|| { + logging::error!( + "Failed to create table {}, catalog not found", + self.data.table_ref() + ); + CatalogNotFoundSnafu { + name: &self.data.request.catalog_name, + } + })?; + catalog + .schema(&self.data.request.schema_name) + .context(AccessCatalogSnafu)? + .with_context(|| { + logging::error!( + "Failed to create table {}, schema not found", + self.data.table_ref(), + ); + SchemaNotFoundSnafu { + name: &self.data.request.schema_name, + } + })?; + + self.data.state = CreateTableState::EngineCreateTable; + // Assign procedure id to the subprocedure. + self.data.subprocedure_id = Some(ProcedureId::random()); + + Ok(Status::executing(true)) + } + + async fn on_engine_create_table(&mut self, ctx: &Context) -> Result { + // Safety: subprocedure id is always set in this state. + let sub_id = self.data.subprocedure_id.unwrap(); + + // Query subprocedure state. + let Some(sub_state) = ctx.provider.procedure_state(sub_id).await? else { + // We need to submit the subprocedure if it doesn't exist. We always need to + // do this check as we might not submitted the subprocedure yet when the manager + // recover this procedure from procedure store. + logging::info!( + "On engine create table {}, not found, sub_id: {}", + self.data.request.table_name, + sub_id + ); + + // If the sub procedure is not found, we create a new sub procedure with the same id. 
+ let engine_ctx = EngineContext::default(); + let procedure = self + .engine_procedure + .create_table_procedure(&engine_ctx, self.data.request.clone()) + .map_err(Error::external)?; + return Ok(Status::Suspended { + subprocedures: vec![ProcedureWithId { + id: sub_id, + procedure, + }], + persist: true, + }); + }; + + match sub_state { + ProcedureState::Running => Ok(Status::Suspended { + subprocedures: Vec::new(), + persist: false, + }), + ProcedureState::Done => { + logging::info!( + "On engine create table {}, done, sub_id: {}", + self.data.request.table_name, + sub_id + ); + // The sub procedure is done, we can execute next step. + self.data.state = CreateTableState::RegisterCatalog; + Ok(Status::executing(true)) + } + ProcedureState::Failed => { + // Return error if the subprocedure is failed. + SubprocedureFailedSnafu { + subprocedure_id: sub_id, + } + .fail()? + } + } + } + + async fn on_register_catalog(&mut self) -> Result { + let catalog = self + .catalog_manager + .catalog(&self.data.request.catalog_name) + .context(AccessCatalogSnafu)? + .context(CatalogNotFoundSnafu { + name: &self.data.request.catalog_name, + })?; + let schema = catalog + .schema(&self.data.request.schema_name) + .context(AccessCatalogSnafu)? + .context(SchemaNotFoundSnafu { + name: &self.data.request.schema_name, + })?; + let table_exists = schema + .table(&self.data.request.table_name) + .map_err(Error::external)? + .is_some(); + if table_exists { + // Table already exists. + return Ok(Status::Done); + } + + let engine_ctx = EngineContext::default(); + let table_ref = self.data.table_ref(); + // Safety: The procedure owns the lock so the table should exist. + let table = self + .table_engine + .get_table(&engine_ctx, &table_ref) + .map_err(Error::external)? 
+ .unwrap(); + + let register_req = RegisterTableRequest { + catalog: self.data.request.catalog_name.clone(), + schema: self.data.request.schema_name.clone(), + table_name: self.data.request.table_name.clone(), + table_id: self.data.request.id, + table, + }; + self.catalog_manager + .register_table(register_req) + .await + .map_err(Error::external)?; + + Ok(Status::Done) + } +} + +/// Represents each step while creating a table in the datanode. +#[derive(Debug, Serialize, Deserialize)] +enum CreateTableState { + /// Validate request and prepare to create table. + Prepare, + /// Create table in the table engine. + EngineCreateTable, + /// Register the table to the catalog. + RegisterCatalog, +} + +/// Serializable data of [CreateTableProcedure]. +#[derive(Debug, Serialize, Deserialize)] +struct CreateTableData { + /// Current state. + state: CreateTableState, + /// Request to create this table. + request: CreateTableRequest, + /// Id of the subprocedure to create this table in the engine. + /// + /// This id is `Some` while the procedure is in [CreateTableState::EngineCreateTable] + /// state. 
+ subprocedure_id: Option, +} + +impl CreateTableData { + fn table_ref(&self) -> TableReference { + TableReference { + catalog: &self.request.catalog_name, + schema: &self.request.schema_name, + table: &self.request.table_name, + } + } +} + +#[cfg(test)] +mod tests { + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::{ColumnSchema, RawSchema}; + use table::engine::{EngineContext, TableEngine}; + + use super::*; + use crate::test_util::TestEnv; + + fn schema_for_test() -> RawSchema { + let column_schemas = vec![ + // Key + ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), + // Nullable value column: cpu + ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true), + // Non-null value column: memory + ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), false), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + true, + ) + .with_time_index(true), + ]; + + RawSchema::new(column_schemas) + } + + fn new_create_request(table_name: &str) -> CreateTableRequest { + CreateTableRequest { + id: 1, + catalog_name: "greptime".to_string(), + schema_name: "public".to_string(), + table_name: table_name.to_string(), + desc: Some("a test table".to_string()), + schema: schema_for_test(), + region_numbers: vec![0, 1], + create_if_not_exists: true, + primary_key_indices: vec![0], + table_options: Default::default(), + } + } + + #[tokio::test] + async fn test_create_table_procedure() { + let TestEnv { + dir: _dir, + table_engine, + procedure_manager, + catalog_manager, + } = TestEnv::new("create"); + + let table_name = "test_create"; + let request = new_create_request(table_name); + let procedure = CreateTableProcedure::new( + request.clone(), + catalog_manager, + table_engine.clone(), + table_engine.clone(), + ); + + let table_ref = TableReference { + catalog: &request.catalog_name, + schema: &request.schema_name, + table: &request.table_name, + }; + let engine_ctx = EngineContext::default(); 
+ assert!(table_engine + .get_table(&engine_ctx, &table_ref) + .unwrap() + .is_none()); + + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + let mut watcher = procedure_manager.submit(procedure_with_id).await.unwrap(); + watcher.changed().await.unwrap(); + + assert!(table_engine + .get_table(&engine_ctx, &table_ref) + .unwrap() + .is_some()); + } +} diff --git a/src/table-procedure/src/error.rs b/src/table-procedure/src/error.rs new file mode 100644 index 0000000000..582467b011 --- /dev/null +++ b/src/table-procedure/src/error.rs @@ -0,0 +1,89 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+use std::any::Any;
+
+use common_error::prelude::*;
+use common_procedure::ProcedureId;
+
+#[derive(Debug, Snafu)]
+#[snafu(visibility(pub(crate)))]
+pub enum Error {
+    #[snafu(display("Failed to serialize procedure to json, source: {}", source))]
+    SerializeProcedure {
+        source: serde_json::Error,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Failed to deserialize procedure from json, source: {}", source))]
+    DeserializeProcedure {
+        source: serde_json::Error,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Invalid raw schema, source: {}", source))]
+    InvalidRawSchema {
+        #[snafu(backtrace)]
+        source: datatypes::error::Error,
+    },
+
+    #[snafu(display("Failed to access catalog, source: {}", source))]
+    AccessCatalog {
+        #[snafu(backtrace)]
+        source: catalog::error::Error,
+    },
+
+    #[snafu(display("Catalog {} not found", name))]
+    CatalogNotFound { name: String },
+
+    #[snafu(display("Schema {} not found", name))]
+    SchemaNotFound { name: String },
+
+    #[snafu(display("Subprocedure {} failed", subprocedure_id))]
+    SubprocedureFailed {
+        subprocedure_id: ProcedureId,
+        backtrace: Backtrace,
+    },
+}
+
+pub type Result<T> = std::result::Result<T, Error>;
+
+impl ErrorExt for Error {
+    fn status_code(&self) -> StatusCode {
+        use Error::*;
+
+        match self {
+            SerializeProcedure { .. } | DeserializeProcedure { .. } | SubprocedureFailed { .. } => {
+                StatusCode::Internal
+            }
+            InvalidRawSchema { source, .. } => source.status_code(),
+            AccessCatalog { source } => source.status_code(),
+            CatalogNotFound { .. } | SchemaNotFound { .. } => StatusCode::InvalidArguments,
+        }
+    }
+
+    fn backtrace_opt(&self) -> Option<&Backtrace> {
+        ErrorCompat::backtrace(self)
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+}
+
+impl From<Error> for common_procedure::Error {
+    fn from(e: Error) -> common_procedure::Error {
+        common_procedure::Error::external(e)
+    }
+}
diff --git a/src/table-procedure/src/lib.rs b/src/table-procedure/src/lib.rs
new file mode 100644
index 0000000000..9915376808
--- /dev/null
+++ b/src/table-procedure/src/lib.rs
@@ -0,0 +1,43 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Procedures for table operations.
+
+mod create;
+pub mod error;
+#[cfg(test)]
+mod test_util;
+
+use catalog::CatalogManagerRef;
+use common_procedure::ProcedureManager;
+pub use create::CreateTableProcedure;
+use table::engine::{TableEngineProcedureRef, TableEngineRef};
+
+/// Register all procedure loaders to the procedure manager.
+///
+/// # Panics
+/// Panics on error.
+pub fn register_procedure_loaders(
+    catalog_manager: CatalogManagerRef,
+    engine_procedure: TableEngineProcedureRef,
+    table_engine: TableEngineRef,
+    procedure_manager: &dyn ProcedureManager,
+) {
+    CreateTableProcedure::register_loader(
+        catalog_manager,
+        engine_procedure,
+        table_engine,
+        procedure_manager,
+    );
+}
diff --git a/src/table-procedure/src/test_util.rs b/src/table-procedure/src/test_util.rs
new file mode 100644
index 0000000000..ac047a93f5
--- /dev/null
+++ b/src/table-procedure/src/test_util.rs
@@ -0,0 +1,73 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use catalog::local::MemoryCatalogManager;
+use catalog::CatalogManagerRef;
+use common_procedure::local::{LocalManager, ManagerConfig};
+use common_procedure::ProcedureManagerRef;
+use log_store::NoopLogStore;
+use mito::config::EngineConfig;
+use mito::engine::MitoEngine;
+use object_store::services::Fs;
+use object_store::{ObjectStore, ObjectStoreBuilder};
+use storage::compaction::noop::NoopCompactionScheduler;
+use storage::config::EngineConfig as StorageEngineConfig;
+use storage::EngineImpl;
+use tempdir::TempDir;
+
+pub struct TestEnv {
+    pub dir: TempDir,
+    pub table_engine: Arc<MitoEngine<EngineImpl<NoopLogStore>>>,
+    pub procedure_manager: ProcedureManagerRef,
+    pub catalog_manager: CatalogManagerRef,
+}
+
+impl TestEnv {
+    pub fn new(prefix: &str) -> TestEnv {
+        let dir = TempDir::new(prefix).unwrap();
+        let store_dir = format!("{}/db", dir.path().to_string_lossy());
+        let accessor = Fs::default().root(&store_dir).build().unwrap();
+        let object_store = ObjectStore::new(accessor).finish();
+
+        let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
+        let storage_engine = EngineImpl::new(
+            StorageEngineConfig::default(),
+            Arc::new(NoopLogStore::default()),
+            object_store.clone(),
+            compaction_scheduler,
+        );
+        let table_engine = Arc::new(MitoEngine::new(
+            EngineConfig::default(),
+            storage_engine,
+            object_store,
+        ));
+
+        let procedure_dir = format!("{}/procedure", dir.path().to_string_lossy());
+        let accessor = Fs::default().root(&procedure_dir).build().unwrap();
+        let object_store = ObjectStore::new(accessor).finish();
+
+        let procedure_manager = Arc::new(LocalManager::new(ManagerConfig { object_store }));
+
+        let catalog_manager = Arc::new(MemoryCatalogManager::default());
+
+        TestEnv {
+            dir,
+            table_engine,
+            procedure_manager,
+            catalog_manager,
+        }
+    }
+}