refactor: script table creation (#2340)

* refactor:
1. remove method `register_system_table` from CatalogManager
2. the creation of ScriptTable (as a system table) is removed from CatalogManager. Instead, the ScriptTable is created when the Frontend instance starts, by calling the Frontend instance's gRPC handler.

* rebase
This commit is contained in:
LFC
2023-09-06 19:31:00 +08:00
committed by Ruihang Xia
parent f287a5db9f
commit 1d83c942a9
10 changed files with 184 additions and 205 deletions

View File

@@ -238,9 +238,6 @@ pub enum Error {
#[snafu(display("Illegal access to catalog: {} and schema: {}", catalog, schema))]
QueryAccessDenied { catalog: String, schema: String },
#[snafu(display("Invalid system table definition: {err_msg}"))]
InvalidSystemTableDef { err_msg: String, location: Location },
#[snafu(display("{}: {}", msg, source))]
Datafusion {
msg: String,
@@ -275,7 +272,6 @@ impl ErrorExt for Error {
| Error::IllegalManagerState { .. }
| Error::CatalogNotFound { .. }
| Error::InvalidEntryType { .. }
| Error::InvalidSystemTableDef { .. }
| Error::ParallelOpenTable { .. } => StatusCode::Unexpected,
Error::SystemCatalog { .. }

View File

@@ -22,6 +22,7 @@ use std::sync::Arc;
use api::v1::meta::RegionStat;
use common_telemetry::{info, warn};
use futures::future::BoxFuture;
use snafu::ResultExt;
use table::engine::{EngineContext, TableEngineRef};
use table::metadata::{TableId, TableType};
@@ -74,10 +75,6 @@ pub trait CatalogManager: Send + Sync {
/// Rename a table to [RenameTableRequest::new_table_name], returns whether the table is renamed.
async fn rename_table(&self, request: RenameTableRequest) -> Result<bool>;
/// Register a system table, should be called before starting the manager.
async fn register_system_table(&self, request: RegisterSystemTableRequest)
-> error::Result<()>;
async fn catalog_names(&self) -> Result<Vec<String>>;
async fn schema_names(&self, catalog: &str) -> Result<Vec<String>>;
@@ -102,7 +99,8 @@ pub trait CatalogManager: Send + Sync {
pub type CatalogManagerRef = Arc<dyn CatalogManager>;
/// Hook called after system table opening.
pub type OpenSystemTableHook = Arc<dyn Fn(TableRef) -> Result<()> + Send + Sync>;
pub type OpenSystemTableHook =
Box<dyn Fn(TableRef) -> BoxFuture<'static, Result<()>> + Send + Sync>;
/// Register system table request:
/// - When system table is already created and registered, the hook will be called
@@ -200,7 +198,7 @@ pub(crate) async fn handle_system_table_request<'a, M: CatalogManager>(
table
};
if let Some(hook) = req.open_hook {
(hook)(table)?;
(hook)(table).await?;
}
}
Ok(())

View File

@@ -517,20 +517,6 @@ impl CatalogManager for LocalCatalogManager {
.fail()
}
/// Appends `request` to the pending system table requests and increments the
/// table-count gauge labelled with the request's catalog and schema.
async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> {
    // Copy the names out first: `request` is moved into the pending list below.
    let (catalog_name, schema_name) = {
        let create = &request.create_table_request;
        (create.catalog_name.clone(), create.schema_name.clone())
    };

    let mut pending = self.system_table_requests.lock().await;
    pending.push(request);

    increment_gauge!(
        crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT,
        1.0,
        &[crate::metrics::db_label(&catalog_name, &schema_name)],
    );
    Ok(())
}
/// Returns whether `schema` exists in `catalog`, delegating to `self.catalogs`.
async fn schema_exist(&self, catalog: &str, schema: &str) -> Result<bool> {
self.catalogs.schema_exist(catalog, schema).await
}

View File

@@ -33,7 +33,7 @@ use crate::error::{
use crate::information_schema::InformationSchemaProvider;
use crate::{
CatalogManager, DeregisterSchemaRequest, DeregisterTableRequest, RegisterSchemaRequest,
RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest,
RegisterTableRequest, RenameTableRequest,
};
type SchemaEntries = HashMap<String, HashMap<String, TableRef>>;
@@ -132,11 +132,6 @@ impl CatalogManager for MemoryCatalogManager {
Ok(true)
}
/// No-op: the request is ignored and `Ok(())` is returned unconditionally.
async fn register_system_table(&self, _request: RegisterSystemTableRequest) -> Result<()> {
// TODO(ruihang): support register system table request
Ok(())
}
/// Returns whether `schema` exists in `catalog`; async wrapper around
/// `schema_exist_sync`.
async fn schema_exist(&self, catalog: &str, schema: &str) -> Result<bool> {
self.schema_exist_sync(catalog, schema)
}

View File

@@ -16,16 +16,15 @@ use std::any::Any;
use std::collections::BTreeSet;
use std::sync::Arc;
use api::v1::CreateTableExpr;
use catalog::error::{
self as catalog_err, InternalSnafu, InvalidSystemTableDefSnafu, ListCatalogsSnafu,
ListSchemasSnafu, Result as CatalogResult, TableMetadataManagerSnafu, UnimplementedSnafu,
self as catalog_err, ListCatalogsSnafu, ListSchemasSnafu, Result as CatalogResult,
TableMetadataManagerSnafu,
};
use catalog::information_schema::{InformationSchemaProvider, COLUMNS, TABLES};
use catalog::remote::KvCacheInvalidatorRef;
use catalog::{
CatalogManager, DeregisterSchemaRequest, DeregisterTableRequest, RegisterSchemaRequest,
RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest,
RegisterTableRequest, RenameTableRequest,
};
use client::client_manager::DatanodeClients;
use common_catalog::consts::{
@@ -46,8 +45,6 @@ use table::metadata::TableId;
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use table::TableRef;
use crate::expr_factory;
use crate::instance::distributed::DistInstance;
use crate::table::DistTable;
#[derive(Clone)]
@@ -57,12 +54,6 @@ pub struct FrontendCatalogManager {
partition_manager: PartitionRuleManagerRef,
datanode_clients: Arc<DatanodeClients>,
table_metadata_manager: TableMetadataManagerRef,
// TODO(LFC): Remove this field.
// DistInstance in FrontendCatalogManager is only used for creating distributed script table now.
// Once we have some standalone distributed table creator (like create distributed table procedure),
// we should use that.
dist_instance: Option<Arc<DistInstance>>,
}
impl FrontendCatalogManager {
@@ -79,14 +70,9 @@ impl FrontendCatalogManager {
partition_manager,
datanode_clients,
table_metadata_manager,
dist_instance: None,
}
}
/// Stores `dist_instance` in this manager, replacing any previously set value.
pub fn set_dist_instance(&mut self, dist_instance: Arc<DistInstance>) {
self.dist_instance = Some(dist_instance)
}
/// Returns a clone of `self.backend`.
pub fn backend(&self) -> KvBackendRef {
self.backend.clone()
}
@@ -180,99 +166,6 @@ impl CatalogManager for FrontendCatalogManager {
unimplemented!()
}
/// Registers a system table through the distributed instance.
///
/// If the table is already returned by `self.table(..)`, only the open hook (if any)
/// is invoked. Otherwise a `CreateTableExpr` is built from the request, the table is
/// created with `dist_instance.create_table`, and the hook is invoked on the new table.
///
/// # Errors
/// - Fails with `UnimplementedSnafu` when no `dist_instance` has been set.
/// - Fails with `InvalidSystemTableDefSnafu` when the schema has no time index column
///   or the column schemas cannot be converted to column definitions.
/// - Table creation failures are boxed and wrapped with `InternalSnafu`.
/// - Errors from the table lookup and from the open hook are propagated.
async fn register_system_table(
&self,
request: RegisterSystemTableRequest,
) -> catalog::error::Result<()> {
if let Some(dist_instance) = &self.dist_instance {
let open_hook = request.open_hook;
let request = request.create_table_request;
// Table already exists: run the hook on it and skip creation.
if let Some(table) = self
.table(
&request.catalog_name,
&request.schema_name,
&request.table_name,
)
.await?
{
if let Some(hook) = open_hook {
(hook)(table)?;
}
return Ok(());
}
// The first column flagged as time index provides the time index name.
let time_index = request
.schema
.column_schemas
.iter()
.find_map(|x| {
if x.is_time_index() {
Some(x.name.clone())
} else {
None
}
})
.context(InvalidSystemTableDefSnafu {
err_msg: "Time index is not defined.",
})?;
// Primary key names are collected in schema column order, keeping the columns
// whose position appears in `primary_key_indices`.
let primary_keys = request
.schema
.column_schemas
.iter()
.enumerate()
.filter_map(|(i, x)| {
if request.primary_key_indices.contains(&i) {
Some(x.name.clone())
} else {
None
}
})
.collect::<Vec<_>>();
let column_defs =
expr_factory::column_schemas_to_defs(request.schema.column_schemas, &primary_keys)
.map_err(|e| {
InvalidSystemTableDefSnafu {
err_msg: e.to_string(),
}
.build()
})?;
let mut create_table = CreateTableExpr {
catalog_name: request.catalog_name,
schema_name: request.schema_name,
table_name: request.table_name,
desc: request.desc.unwrap_or("".to_string()),
column_defs,
time_index,
primary_keys,
create_if_not_exists: request.create_if_not_exists,
table_options: (&request.table_options).into(),
table_id: None, // Should and will be assigned by Meta.
region_numbers: vec![0],
engine: request.engine,
};
let table = dist_instance
.create_table(&mut create_table, None)
.await
.map_err(BoxedError::new)
.context(InternalSnafu)?;
// Run the hook on the freshly created table.
if let Some(hook) = open_hook {
(hook)(table)?;
}
Ok(())
} else {
UnimplementedSnafu {
operation: "register system table",
}
.fail()
}
}
async fn catalog_names(&self) -> CatalogResult<Vec<String>> {
let stream = self
.table_metadata_manager
@@ -374,16 +267,7 @@ impl CatalogManager for FrontendCatalogManager {
}
if schema == INFORMATION_SCHEMA_NAME {
// hack: use existing cyclic reference to get Arc<Self>.
// This can be removed by refactoring the struct into something like Arc<Inner>
common_telemetry::info!("going to use dist instance");
let manager = if let Some(instance) = self.dist_instance.as_ref() {
common_telemetry::info!("dist instance exist");
instance.catalog_manager() as _
} else {
common_telemetry::info!("dist instance doesn't exist");
return Ok(None);
};
let manager = Arc::new(self.clone()) as _;
let provider =
InformationSchemaProvider::new(catalog.to_string(), Arc::downgrade(&manager));

View File

@@ -155,7 +155,10 @@ pub enum Error {
#[snafu(display("Invalid DeleteRequest, reason: {}", reason))]
InvalidDeleteRequest { reason: String, location: Location },
#[snafu(display("Table not found: {}", table_name))]
#[snafu(display("Invalid system table definition: {err_msg}, at {location}"))]
InvalidSystemTableDef { err_msg: String, location: Location },
#[snafu(display("Table not found: '{}', at {location}", table_name))]
TableNotFound {
table_name: String,
location: Location,
@@ -739,6 +742,7 @@ impl ErrorExt for Error {
Error::IncompleteGrpcResult { .. }
| Error::ContextValueNotFound { .. }
| Error::InvalidSystemTableDef { .. }
| Error::EncodeJson { .. } => StatusCode::Unexpected,
Error::TableNotFound { .. } => StatusCode::TableNotFound,

View File

@@ -152,20 +152,19 @@ impl Instance {
let partition_manager = Arc::new(PartitionRuleManager::new(table_routes));
let table_metadata_manager = Arc::new(TableMetadataManager::new(meta_backend.clone()));
let mut catalog_manager = FrontendCatalogManager::new(
let catalog_manager = Arc::new(FrontendCatalogManager::new(
meta_backend.clone(),
meta_backend.clone(),
partition_manager.clone(),
datanode_clients.clone(),
table_metadata_manager.clone(),
);
));
let dist_instance =
DistInstance::new(meta_client.clone(), Arc::new(catalog_manager.clone()));
let dist_instance = Arc::new(dist_instance);
let dist_instance = Arc::new(DistInstance::new(
meta_client.clone(),
catalog_manager.clone(),
));
catalog_manager.set_dist_instance(dist_instance.clone());
let catalog_manager = Arc::new(catalog_manager);
let dist_request_handler = DistRegionRequestHandler::arc(catalog_manager.clone());
let query_engine = QueryEngineFactory::new_with_plugins(
@@ -383,12 +382,12 @@ impl Instance {
#[async_trait]
impl FrontendInstance for Instance {
async fn start(&self) -> Result<()> {
// TODO(hl): Frontend init should move to here
if let Some(heartbeat_task) = &self.heartbeat_task {
heartbeat_task.start().await?;
}
self.script_executor.start(self).await?;
futures::future::try_join_all(self.servers.values().map(start_server))
.await
.context(error::StartServerSnafu)

View File

@@ -34,6 +34,10 @@ mod dummy {
Ok(Self {})
}
/// No-op in the dummy executor; always returns `Ok(())`.
pub async fn start(&self) -> Result<()> {
Ok(())
}
pub async fn insert_script(
&self,
_schema: &str,
@@ -56,12 +60,23 @@ mod dummy {
#[cfg(feature = "python")]
mod python {
use api::v1::ddl_request::Expr;
use api::v1::greptime_request::Request;
use api::v1::{CreateTableExpr, DdlRequest};
use catalog::RegisterSystemTableRequest;
use common_error::ext::BoxedError;
use common_meta::table_name::TableName;
use common_telemetry::logging::error;
use script::manager::ScriptManager;
use snafu::ResultExt;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use table::requests::CreateTableRequest;
use super::*;
use crate::error::{CatalogSnafu, InvalidSystemTableDefSnafu, TableNotFoundSnafu};
use crate::expr_factory;
use crate::instance::Instance;
pub struct ScriptExecutor {
script_manager: ScriptManager,
@@ -79,6 +94,107 @@ mod python {
})
}
/// Makes sure the scripts table exists and runs the open hook on it.
///
/// The table described by `ScriptManager::create_table_request` is looked up in the
/// instance's catalog manager. If it is missing, a `CreateTable` DDL request is sent
/// through `instance.do_query` and the table is looked up again. In both cases the
/// open hook (if any) is then awaited with the resolved table.
///
/// # Errors
/// Catalog lookup and hook failures are wrapped with `CatalogSnafu`; a table that is
/// still missing after creation yields `TableNotFoundSnafu`; errors from building the
/// create expression and from `do_query` are propagated.
pub async fn start(&self, instance: &Instance) -> Result<()> {
    let RegisterSystemTableRequest {
        create_table_request: create_request,
        open_hook: hook,
    } = self.script_manager.create_table_request();

    let existing = instance
        .catalog_manager()
        .table(
            &create_request.catalog_name,
            &create_request.schema_name,
            &create_request.table_name,
        )
        .await
        .context(CatalogSnafu)?;

    let table = match existing {
        Some(table) => table,
        None => {
            // Remember the name before `create_request` is consumed below.
            let table_name = TableName::new(
                &create_request.catalog_name,
                &create_request.schema_name,
                &create_request.table_name,
            );

            let expr = Self::create_table_expr(create_request)?;
            let ddl = Request::Ddl(DdlRequest {
                expr: Some(Expr::CreateTable(expr)),
            });
            let _ = instance.do_query(ddl, QueryContext::arc()).await?;

            instance
                .catalog_manager()
                .table(
                    &table_name.catalog_name,
                    &table_name.schema_name,
                    &table_name.table_name,
                )
                .await
                .context(CatalogSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: table_name.to_string(),
                })?
        }
    };

    if let Some(hook) = hook {
        hook(table).await.context(CatalogSnafu)?;
    }
    Ok(())
}
/// Converts a `CreateTableRequest` into a gRPC `CreateTableExpr`.
///
/// The time index is the name of the first column flagged as time index. Primary keys
/// are the names of the columns at `request.primary_key_indices`, in the order the
/// indices are given. `table_id` is left as `None` and `region_numbers` is `[0]`.
///
/// # Errors
/// Returns `InvalidSystemTableDef` when no column is a time index or when a primary
/// key index is out of range for the schema's columns, and propagates failures from
/// `expr_factory::column_schemas_to_defs`.
fn create_table_expr(request: CreateTableRequest) -> Result<CreateTableExpr> {
    let column_schemas = request.schema.column_schemas;

    let time_index = column_schemas
        .iter()
        .find(|column| column.is_time_index())
        .map(|column| column.name.clone())
        .context(InvalidSystemTableDefSnafu {
            err_msg: "Time index is not defined.",
        })?;

    // Checked lookup: a bad index becomes an error instead of a panic, even though the
    // scripts table request is pre-defined today.
    let primary_keys = request
        .primary_key_indices
        .iter()
        .map(|&index| {
            column_schemas
                .get(index)
                .map(|column| column.name.clone())
                .with_context(|| InvalidSystemTableDefSnafu {
                    err_msg: format!(
                        "Primary key index {index} is out of range for {} columns.",
                        column_schemas.len()
                    ),
                })
        })
        .collect::<Result<Vec<_>>>()?;

    let column_defs = expr_factory::column_schemas_to_defs(column_schemas, &primary_keys)?;

    Ok(CreateTableExpr {
        catalog_name: request.catalog_name,
        schema_name: request.schema_name,
        table_name: request.table_name,
        desc: request.desc.unwrap_or_default(),
        column_defs,
        time_index,
        primary_keys,
        create_if_not_exists: request.create_if_not_exists,
        table_options: (&request.table_options).into(),
        table_id: None, // Should and will be assigned by Meta.
        region_numbers: vec![0],
        engine: request.engine,
    })
}
pub async fn insert_script(
&self,
schema: &str,

View File

@@ -16,20 +16,27 @@
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use catalog::CatalogManagerRef;
use catalog::{CatalogManagerRef, OpenSystemTableHook, RegisterSystemTableRequest};
use common_catalog::consts::{
default_engine, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, SCRIPTS_TABLE_ID,
};
use common_query::Output;
use common_telemetry::logging;
use futures::future::FutureExt;
use query::QueryEngineRef;
use snafu::{OptionExt, ResultExt};
use table::requests::{CreateTableRequest, TableOptions};
use table::TableRef;
use crate::engine::{CompileContext, EvalContext, Script, ScriptEngine};
use crate::error::{CompilePythonSnafu, ExecutePythonSnafu, Result, ScriptNotFoundSnafu};
use crate::python::{PyEngine, PyScript};
use crate::table::ScriptsTable;
use crate::table::{build_scripts_schema, ScriptsTable, SCRIPTS_TABLE_NAME};
pub struct ScriptManager {
compiled: RwLock<HashMap<String, Arc<PyScript>>>,
py_engine: PyEngine,
query_engine: QueryEngineRef,
table: ScriptsTable,
}
@@ -41,10 +48,41 @@ impl ScriptManager {
Ok(Self {
compiled: RwLock::new(HashMap::default()),
py_engine: PyEngine::new(query_engine.clone()),
table: ScriptsTable::new_empty(catalog_manager, query_engine)?,
query_engine: query_engine.clone(),
table: ScriptsTable::new(catalog_manager, query_engine).await?,
})
}
/// Builds the request used to create and open the scripts system table.
///
/// The table is named `SCRIPTS_TABLE_NAME` and placed in `DEFAULT_CATALOG_NAME` /
/// `DEFAULT_SCHEMA_NAME`, with columns 0 and 1 as primary keys and the default engine.
/// The attached open hook calls `ScriptsTable::recompile_register_udf` with the opened
/// table and a clone of this manager's query engine.
pub fn create_table_request(&self) -> RegisterSystemTableRequest {
    let request = CreateTableRequest {
        id: SCRIPTS_TABLE_ID,
        catalog_name: DEFAULT_CATALOG_NAME.to_string(),
        schema_name: DEFAULT_SCHEMA_NAME.to_string(),
        table_name: SCRIPTS_TABLE_NAME.to_string(),
        desc: Some("GreptimeDB scripts table for Python".to_string()),
        schema: build_scripts_schema(),
        region_numbers: vec![0],
        // 'schema' and 'name' are primary keys
        primary_key_indices: vec![0, 1],
        create_if_not_exists: true,
        table_options: TableOptions::default(),
        engine: default_engine().to_string(),
    };

    let query_engine = self.query_engine.clone();
    let hook: OpenSystemTableHook = Box::new(move |table: TableRef| {
        // The hook is an `Fn`, so each call clones the engine once and moves that
        // clone into the returned future; no second clone is needed.
        let query_engine = query_engine.clone();
        async move { ScriptsTable::recompile_register_udf(table, query_engine).await }.boxed()
    });

    RegisterSystemTableRequest {
        create_table_request: request,
        open_hook: Some(hook),
    }
}
/// compile script, and register them to the query engine and UDF registry
async fn compile(&self, name: &str, script: &str) -> Result<Arc<PyScript>> {
let script = Arc::new(Self::compile_without_cache(&self.py_engine, name, script).await?);

View File

@@ -17,10 +17,8 @@ use std::collections::HashMap;
use std::sync::Arc;
use catalog::error::CompileScriptInternalSnafu;
use catalog::{CatalogManagerRef, OpenSystemTableHook, RegisterSystemTableRequest};
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE, SCRIPTS_TABLE_ID,
};
use catalog::CatalogManagerRef;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_query::Output;
@@ -38,16 +36,15 @@ use query::plan::LogicalPlan;
use query::QueryEngineRef;
use session::context::QueryContextBuilder;
use snafu::{ensure, OptionExt, ResultExt};
use table::requests::{CreateTableRequest, InsertRequest, TableOptions};
use table::requests::InsertRequest;
use table::table::adapter::DfTableProviderAdapter;
use table::TableRef;
use crate::error::{
BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, ExecuteInternalStatementSnafu,
FindColumnInScriptsTableSnafu, FindScriptSnafu, FindScriptsTableSnafu, InsertScriptSnafu,
RegisterScriptsTableSnafu, Result, ScriptNotFoundSnafu, ScriptsTableNotFoundSnafu,
Result, ScriptNotFoundSnafu, ScriptsTableNotFoundSnafu,
};
use crate::python::utils::block_on_async;
use crate::python::PyScript;
pub const SCRIPTS_TABLE_NAME: &str = "scripts";
@@ -151,40 +148,6 @@ impl ScriptsTable {
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
) -> Result<Self> {
let schema = build_scripts_schema();
// TODO(dennis): we put scripts table into default catalog and schema.
// maybe put into system catalog?
let request = CreateTableRequest {
id: SCRIPTS_TABLE_ID,
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: SCRIPTS_TABLE_NAME.to_string(),
desc: Some("Scripts table".to_string()),
schema,
region_numbers: vec![0],
//schema and name as primary key
primary_key_indices: vec![0, 1],
create_if_not_exists: true,
table_options: TableOptions::default(),
engine: MITO_ENGINE.to_string(),
};
let callback_query_engine = query_engine.clone();
let script_recompile_callback: OpenSystemTableHook = Arc::new(move |table: TableRef| {
let callback_query_engine = callback_query_engine.clone();
block_on_async(async move {
Self::recompile_register_udf(table, callback_query_engine.clone()).await
})
.unwrap()
});
catalog_manager
.register_system_table(RegisterSystemTableRequest {
create_table_request: request,
open_hook: Some(script_recompile_callback),
})
.await
.context(RegisterScriptsTableSnafu)?;
Ok(Self {
catalog_manager,
query_engine,