From 1d83c942a9a72ee636425647a7eb28bffd9829e5 Mon Sep 17 00:00:00 2001 From: LFC <990479+MichaelScofield@users.noreply.github.com> Date: Wed, 6 Sep 2023 19:31:00 +0800 Subject: [PATCH] refactor: script table creation (#2340) * refactor: 1. remove method `register_system_table` from CatalogManager 2. the creation of ScriptTable (as a system table) is removed from CatalogManager. Instead, the ScriptTable is created when Frontend instance is starting; and is created by calling Frontend instance's grpc handler. * rebase --- src/catalog/src/error.rs | 4 - src/catalog/src/lib.rs | 10 +-- src/catalog/src/local/manager.rs | 14 ---- src/catalog/src/local/memory.rs | 7 +- src/frontend/src/catalog.rs | 124 +------------------------------ src/frontend/src/error.rs | 6 +- src/frontend/src/instance.rs | 17 ++--- src/frontend/src/script.rs | 118 ++++++++++++++++++++++++++++- src/script/src/manager.rs | 44 ++++++++++- src/script/src/table.rs | 45 +---------- 10 files changed, 184 insertions(+), 205 deletions(-) diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index c947a9d0f7..9fe1f5cbfb 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -238,9 +238,6 @@ pub enum Error { #[snafu(display("Illegal access to catalog: {} and schema: {}", catalog, schema))] QueryAccessDenied { catalog: String, schema: String }, - #[snafu(display("Invalid system table definition: {err_msg}"))] - InvalidSystemTableDef { err_msg: String, location: Location }, - #[snafu(display("{}: {}", msg, source))] Datafusion { msg: String, @@ -275,7 +272,6 @@ impl ErrorExt for Error { | Error::IllegalManagerState { .. } | Error::CatalogNotFound { .. } | Error::InvalidEntryType { .. } - | Error::InvalidSystemTableDef { .. } | Error::ParallelOpenTable { .. } => StatusCode::Unexpected, Error::SystemCatalog { .. } diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 3a284cef4a..81b4b742b8 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use api::v1::meta::RegionStat; use common_telemetry::{info, warn}; +use futures::future::BoxFuture; use snafu::ResultExt; use table::engine::{EngineContext, TableEngineRef}; use table::metadata::{TableId, TableType}; @@ -74,10 +75,6 @@ pub trait CatalogManager: Send + Sync { /// Rename a table to [RenameTableRequest::new_table_name], returns whether the table is renamed. async fn rename_table(&self, request: RenameTableRequest) -> Result; - /// Register a system table, should be called before starting the manager. - async fn register_system_table(&self, request: RegisterSystemTableRequest) - -> error::Result<()>; - async fn catalog_names(&self) -> Result>; async fn schema_names(&self, catalog: &str) -> Result>; @@ -102,7 +99,8 @@ pub trait CatalogManager: Send + Sync { pub type CatalogManagerRef = Arc; /// Hook called after system table opening. -pub type OpenSystemTableHook = Arc Result<()> + Send + Sync>; +pub type OpenSystemTableHook = + Box BoxFuture<'static, Result<()>> + Send + Sync>; /// Register system table request: /// - When system table is already created and registered, the hook will be called @@ -200,7 +198,7 @@ pub(crate) async fn handle_system_table_request<'a, M: CatalogManager>( table }; if let Some(hook) = req.open_hook { - (hook)(table)?; + (hook)(table).await?; } } Ok(()) diff --git a/src/catalog/src/local/manager.rs b/src/catalog/src/local/manager.rs index eab49873c0..5c5fb11623 100644 --- a/src/catalog/src/local/manager.rs +++ b/src/catalog/src/local/manager.rs @@ -517,20 +517,6 @@ impl CatalogManager for LocalCatalogManager { .fail() } - async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> { - let catalog_name = request.create_table_request.catalog_name.clone(); - let schema_name = request.create_table_request.schema_name.clone(); - - let mut sys_table_requests = self.system_table_requests.lock().await; - sys_table_requests.push(request); - increment_gauge!( - crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT, - 1.0, - &[crate::metrics::db_label(&catalog_name, &schema_name)], - ); - Ok(()) - } - async fn schema_exist(&self, catalog: &str, schema: &str) -> Result { self.catalogs.schema_exist(catalog, schema).await } diff --git a/src/catalog/src/local/memory.rs b/src/catalog/src/local/memory.rs index 565224ff68..c4449bffac 100644 --- a/src/catalog/src/local/memory.rs +++ b/src/catalog/src/local/memory.rs @@ -33,7 +33,7 @@ use crate::error::{ use crate::information_schema::InformationSchemaProvider; use crate::{ CatalogManager, DeregisterSchemaRequest, DeregisterTableRequest, RegisterSchemaRequest, - RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest, + RegisterTableRequest, RenameTableRequest, }; type SchemaEntries = HashMap>; @@ -132,11 +132,6 @@ impl CatalogManager for MemoryCatalogManager { Ok(true) } - async fn register_system_table(&self, _request: RegisterSystemTableRequest) -> Result<()> { - // TODO(ruihang): support register system table request - Ok(()) - } - async fn schema_exist(&self, catalog: &str, schema: &str) -> Result { self.schema_exist_sync(catalog, schema) } diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index 022b6291f0..3d30a00f41 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -16,16 +16,15 @@ use std::any::Any; use std::collections::BTreeSet; use std::sync::Arc; -use api::v1::CreateTableExpr; use catalog::error::{ - self as catalog_err, InternalSnafu, InvalidSystemTableDefSnafu, ListCatalogsSnafu, - ListSchemasSnafu, Result as CatalogResult, TableMetadataManagerSnafu, UnimplementedSnafu, + self as catalog_err, ListCatalogsSnafu, ListSchemasSnafu, Result as CatalogResult, + TableMetadataManagerSnafu, }; use catalog::information_schema::{InformationSchemaProvider, COLUMNS, TABLES}; use catalog::remote::KvCacheInvalidatorRef; use catalog::{ CatalogManager, DeregisterSchemaRequest, DeregisterTableRequest, RegisterSchemaRequest, - RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest, + RegisterTableRequest, RenameTableRequest, }; use client::client_manager::DatanodeClients; use common_catalog::consts::{ @@ -46,8 +45,6 @@ use table::metadata::TableId; use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME}; use table::TableRef; -use crate::expr_factory; -use crate::instance::distributed::DistInstance; use crate::table::DistTable; #[derive(Clone)] @@ -57,12 +54,6 @@ pub struct FrontendCatalogManager { partition_manager: PartitionRuleManagerRef, datanode_clients: Arc, table_metadata_manager: TableMetadataManagerRef, - - // TODO(LFC): Remove this field. - // DistInstance in FrontendCatalogManager is only used for creating distributed script table now. - // Once we have some standalone distributed table creator (like create distributed table procedure), - // we should use that. - dist_instance: Option>, } impl FrontendCatalogManager { @@ -79,14 +70,9 @@ impl FrontendCatalogManager { partition_manager, datanode_clients, table_metadata_manager, - dist_instance: None, } } - pub fn set_dist_instance(&mut self, dist_instance: Arc) { - self.dist_instance = Some(dist_instance) - } - pub fn backend(&self) -> KvBackendRef { self.backend.clone() } @@ -180,99 +166,6 @@ impl CatalogManager for FrontendCatalogManager { unimplemented!() } - async fn register_system_table( - &self, - request: RegisterSystemTableRequest, - ) -> catalog::error::Result<()> { - if let Some(dist_instance) = &self.dist_instance { - let open_hook = request.open_hook; - let request = request.create_table_request; - - if let Some(table) = self - .table( - &request.catalog_name, - &request.schema_name, - &request.table_name, - ) - .await? - { - if let Some(hook) = open_hook { - (hook)(table)?; - } - return Ok(()); - } - - let time_index = request - .schema - .column_schemas - .iter() - .find_map(|x| { - if x.is_time_index() { - Some(x.name.clone()) - } else { - None - } - }) - .context(InvalidSystemTableDefSnafu { - err_msg: "Time index is not defined.", - })?; - - let primary_keys = request - .schema - .column_schemas - .iter() - .enumerate() - .filter_map(|(i, x)| { - if request.primary_key_indices.contains(&i) { - Some(x.name.clone()) - } else { - None - } - }) - .collect::>(); - - let column_defs = - expr_factory::column_schemas_to_defs(request.schema.column_schemas, &primary_keys) - .map_err(|e| { - InvalidSystemTableDefSnafu { - err_msg: e.to_string(), - } - .build() - })?; - - let mut create_table = CreateTableExpr { - catalog_name: request.catalog_name, - schema_name: request.schema_name, - table_name: request.table_name, - desc: request.desc.unwrap_or("".to_string()), - column_defs, - time_index, - primary_keys, - create_if_not_exists: request.create_if_not_exists, - table_options: (&request.table_options).into(), - table_id: None, // Should and will be assigned by Meta. - region_numbers: vec![0], - engine: request.engine, - }; - - let table = dist_instance - .create_table(&mut create_table, None) - .await - .map_err(BoxedError::new) - .context(InternalSnafu)?; - - if let Some(hook) = open_hook { - (hook)(table)?; - } - Ok(()) - } else { - UnimplementedSnafu { - operation: "register system table", - } - .fail() - } - } - async fn catalog_names(&self) -> CatalogResult> { let stream = self .table_metadata_manager @@ -374,16 +267,7 @@ impl CatalogManager for FrontendCatalogManager { } if schema == INFORMATION_SCHEMA_NAME { - // hack: use existing cyclin reference to get Arc. - // This can be remove by refactoring the struct into something like Arc - common_telemetry::info!("going to use dist instance"); - let manager = if let Some(instance) = self.dist_instance.as_ref() { - common_telemetry::info!("dist instance exist"); - instance.catalog_manager() as _ - } else { - common_telemetry::info!("dist instance doesn't exist"); - return Ok(None); - }; + let manager = Arc::new(self.clone()) as _; let provider = InformationSchemaProvider::new(catalog.to_string(), Arc::downgrade(&manager)); diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 75632fce06..15709a3e00 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -155,7 +155,10 @@ pub enum Error { #[snafu(display("Invalid DeleteRequest, reason: {}", reason))] InvalidDeleteRequest { reason: String, location: Location }, - #[snafu(display("Table not found: {}", table_name))] + #[snafu(display("Invalid system table definition: {err_msg}, at {location}"))] + InvalidSystemTableDef { err_msg: String, location: Location }, + + #[snafu(display("Table not found: '{}', at {location}", table_name))] TableNotFound { table_name: String, location: Location, @@ -739,6 +742,7 @@ impl ErrorExt for Error { Error::IncompleteGrpcResult { .. } | Error::ContextValueNotFound { .. } + | Error::InvalidSystemTableDef { .. } | Error::EncodeJson { .. } => StatusCode::Unexpected, Error::TableNotFound { .. } => StatusCode::TableNotFound, diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 80becaea49..41f118a5ff 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -152,20 +152,19 @@ impl Instance { let partition_manager = Arc::new(PartitionRuleManager::new(table_routes)); let table_metadata_manager = Arc::new(TableMetadataManager::new(meta_backend.clone())); - let mut catalog_manager = FrontendCatalogManager::new( + let catalog_manager = Arc::new(FrontendCatalogManager::new( meta_backend.clone(), meta_backend.clone(), partition_manager.clone(), datanode_clients.clone(), table_metadata_manager.clone(), - ); + )); - let dist_instance = - DistInstance::new(meta_client.clone(), Arc::new(catalog_manager.clone())); - let dist_instance = Arc::new(dist_instance); + let dist_instance = Arc::new(DistInstance::new( + meta_client.clone(), + catalog_manager.clone(), + )); - catalog_manager.set_dist_instance(dist_instance.clone()); - let catalog_manager = Arc::new(catalog_manager); let dist_request_handler = DistRegionRequestHandler::arc(catalog_manager.clone()); let query_engine = QueryEngineFactory::new_with_plugins( @@ -383,12 +382,12 @@ impl Instance { #[async_trait] impl FrontendInstance for Instance { async fn start(&self) -> Result<()> { - // TODO(hl): Frontend init should move to here - if let Some(heartbeat_task) = &self.heartbeat_task { heartbeat_task.start().await?; } + self.script_executor.start(self).await?; + futures::future::try_join_all(self.servers.values().map(start_server)) .await .context(error::StartServerSnafu) diff --git a/src/frontend/src/script.rs b/src/frontend/src/script.rs index bad8468032..103b64dbd1 100644 --- a/src/frontend/src/script.rs +++ b/src/frontend/src/script.rs @@ -34,6 +34,10 @@ mod dummy { Ok(Self {}) } + pub async fn start(&self) -> Result<()> { + Ok(()) + } + pub async fn insert_script( &self, _schema: &str, @@ -56,12 +60,23 @@ mod dummy { #[cfg(feature = "python")] mod python { + use api::v1::ddl_request::Expr; + use api::v1::greptime_request::Request; + use api::v1::{CreateTableExpr, DdlRequest}; + use catalog::RegisterSystemTableRequest; use common_error::ext::BoxedError; + use common_meta::table_name::TableName; use common_telemetry::logging::error; use script::manager::ScriptManager; - use snafu::ResultExt; + use servers::query_handler::grpc::GrpcQueryHandler; + use session::context::QueryContext; + use snafu::{OptionExt, ResultExt}; + use table::requests::CreateTableRequest; use super::*; + use crate::error::{CatalogSnafu, InvalidSystemTableDefSnafu, TableNotFoundSnafu}; + use crate::expr_factory; + use crate::instance::Instance; pub struct ScriptExecutor { script_manager: ScriptManager, @@ -79,6 +94,107 @@ mod python { }) } + pub async fn start(&self, instance: &Instance) -> Result<()> { + let RegisterSystemTableRequest { + create_table_request: request, + open_hook, + } = self.script_manager.create_table_request(); + + if let Some(table) = instance + .catalog_manager() + .table( + &request.catalog_name, + &request.schema_name, + &request.table_name, + ) + .await + .context(CatalogSnafu)? + { + if let Some(open_hook) = open_hook { + (open_hook)(table).await.context(CatalogSnafu)?; + } + + return Ok(()); + } + + let table_name = TableName::new( + &request.catalog_name, + &request.schema_name, + &request.table_name, + ); + + let expr = Self::create_table_expr(request)?; + + let _ = instance + .do_query( + Request::Ddl(DdlRequest { + expr: Some(Expr::CreateTable(expr)), + }), + QueryContext::arc(), + ) + .await?; + + let table = instance + .catalog_manager() + .table( + &table_name.catalog_name, + &table_name.schema_name, + &table_name.table_name, + ) + .await + .context(CatalogSnafu)? + .with_context(|| TableNotFoundSnafu { + table_name: table_name.to_string(), + })?; + + if let Some(open_hook) = open_hook { + (open_hook)(table).await.context(CatalogSnafu)?; + } + + Ok(()) + } + + fn create_table_expr(request: CreateTableRequest) -> Result { + let column_schemas = request.schema.column_schemas; + + let time_index = column_schemas + .iter() + .find_map(|x| { + if x.is_time_index() { + Some(x.name.clone()) + } else { + None + } + }) + .context(InvalidSystemTableDefSnafu { + err_msg: "Time index is not defined.", + })?; + + let primary_keys = request + .primary_key_indices + .iter() + // Indexing has to be safe because the create script table request is pre-defined. + .map(|i| column_schemas[*i].name.clone()) + .collect::>(); + + let column_defs = expr_factory::column_schemas_to_defs(column_schemas, &primary_keys)?; + + Ok(CreateTableExpr { + catalog_name: request.catalog_name, + schema_name: request.schema_name, + table_name: request.table_name, + desc: request.desc.unwrap_or_default(), + column_defs, + time_index, + primary_keys, + create_if_not_exists: request.create_if_not_exists, + table_options: (&request.table_options).into(), + table_id: None, // Should and will be assigned by Meta. + region_numbers: vec![0], + engine: request.engine, + }) + } + pub async fn insert_script( &self, schema: &str, diff --git a/src/script/src/manager.rs b/src/script/src/manager.rs index ac18774251..08a9a27840 100644 --- a/src/script/src/manager.rs +++ b/src/script/src/manager.rs @@ -16,20 +16,27 @@ use std::collections::HashMap; use std::sync::{Arc, RwLock}; -use catalog::CatalogManagerRef; +use catalog::{CatalogManagerRef, OpenSystemTableHook, RegisterSystemTableRequest}; +use common_catalog::consts::{ + default_engine, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, SCRIPTS_TABLE_ID, +}; use common_query::Output; use common_telemetry::logging; +use futures::future::FutureExt; use query::QueryEngineRef; use snafu::{OptionExt, ResultExt}; +use table::requests::{CreateTableRequest, TableOptions}; +use table::TableRef; use crate::engine::{CompileContext, EvalContext, Script, ScriptEngine}; use crate::error::{CompilePythonSnafu, ExecutePythonSnafu, Result, ScriptNotFoundSnafu}; use crate::python::{PyEngine, PyScript}; -use crate::table::ScriptsTable; +use crate::table::{build_scripts_schema, ScriptsTable, SCRIPTS_TABLE_NAME}; pub struct ScriptManager { compiled: RwLock>>, py_engine: PyEngine, + query_engine: QueryEngineRef, table: ScriptsTable, } @@ -41,10 +48,41 @@ impl ScriptManager { Ok(Self { compiled: RwLock::new(HashMap::default()), py_engine: PyEngine::new(query_engine.clone()), - table: ScriptsTable::new_empty(catalog_manager, query_engine)?, + query_engine: query_engine.clone(), + table: ScriptsTable::new(catalog_manager, query_engine).await?, }) } + pub fn create_table_request(&self) -> RegisterSystemTableRequest { + let request = CreateTableRequest { + id: SCRIPTS_TABLE_ID, + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: SCRIPTS_TABLE_NAME.to_string(), + desc: Some("GreptimeDB scripts table for Python".to_string()), + schema: build_scripts_schema(), + region_numbers: vec![0], + // 'schema' and 'name' are primary keys + primary_key_indices: vec![0, 1], + create_if_not_exists: true, + table_options: TableOptions::default(), + engine: default_engine().to_string(), + }; + + let query_engine = self.query_engine.clone(); + + let hook: OpenSystemTableHook = Box::new(move |table: TableRef| { + let query_engine = query_engine.clone(); + async move { ScriptsTable::recompile_register_udf(table, query_engine.clone()).await } + .boxed() + }); + + RegisterSystemTableRequest { + create_table_request: request, + open_hook: Some(hook), + } + } + /// compile script, and register them to the query engine and UDF registry async fn compile(&self, name: &str, script: &str) -> Result> { let script = Arc::new(Self::compile_without_cache(&self.py_engine, name, script).await?); diff --git a/src/script/src/table.rs b/src/script/src/table.rs index 993a8c5395..66910eb4cf 100644 --- a/src/script/src/table.rs +++ b/src/script/src/table.rs @@ -17,10 +17,8 @@ use std::collections::HashMap; use std::sync::Arc; use catalog::error::CompileScriptInternalSnafu; -use catalog::{CatalogManagerRef, OpenSystemTableHook, RegisterSystemTableRequest}; -use common_catalog::consts::{ - DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE, SCRIPTS_TABLE_ID, -}; +use catalog::CatalogManagerRef; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_catalog::format_full_table_name; use common_error::ext::BoxedError; use common_query::Output; @@ -38,16 +36,15 @@ use query::plan::LogicalPlan; use query::QueryEngineRef; use session::context::QueryContextBuilder; use snafu::{ensure, OptionExt, ResultExt}; -use table::requests::{CreateTableRequest, InsertRequest, TableOptions}; +use table::requests::InsertRequest; use table::table::adapter::DfTableProviderAdapter; use table::TableRef; use crate::error::{ BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, ExecuteInternalStatementSnafu, FindColumnInScriptsTableSnafu, FindScriptSnafu, FindScriptsTableSnafu, InsertScriptSnafu, - RegisterScriptsTableSnafu, Result, ScriptNotFoundSnafu, ScriptsTableNotFoundSnafu, + Result, ScriptNotFoundSnafu, ScriptsTableNotFoundSnafu, }; -use crate::python::utils::block_on_async; use crate::python::PyScript; pub const SCRIPTS_TABLE_NAME: &str = "scripts"; @@ -151,40 +148,6 @@ impl ScriptsTable { catalog_manager: CatalogManagerRef, query_engine: QueryEngineRef, ) -> Result { - let schema = build_scripts_schema(); - // TODO(dennis): we put scripts table into default catalog and schema. - // maybe put into system catalog? - let request = CreateTableRequest { - id: SCRIPTS_TABLE_ID, - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - table_name: SCRIPTS_TABLE_NAME.to_string(), - desc: Some("Scripts table".to_string()), - schema, - region_numbers: vec![0], - //schema and name as primary key - primary_key_indices: vec![0, 1], - create_if_not_exists: true, - table_options: TableOptions::default(), - engine: MITO_ENGINE.to_string(), - }; - let callback_query_engine = query_engine.clone(); - let script_recompile_callback: OpenSystemTableHook = Arc::new(move |table: TableRef| { - let callback_query_engine = callback_query_engine.clone(); - block_on_async(async move { - Self::recompile_register_udf(table, callback_query_engine.clone()).await - }) - .unwrap() - }); - - catalog_manager - .register_system_table(RegisterSystemTableRequest { - create_table_request: request, - open_hook: Some(script_recompile_callback), - }) - .await - .context(RegisterScriptsTableSnafu)?; - Ok(Self { catalog_manager, query_engine,