From 03169c4a04b57841d66315cb51dbafeeaaa7ed94 Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Tue, 13 Sep 2022 15:09:00 +0800 Subject: [PATCH] feat: impl scripts table and /run-script restful api (#230) * feat: impl scripts table and /execute restful api * fix: test failures * fix: test failures * feat: impl /run_script API * refactor: rename run_script api to run-script and test script manager * fix: remove println * refactor: error mod * refactor: by CR comments * feat: rebase develop and change timestamp/gmt_crated/gmt_modified type to timestamp * refactor: use assert_eq instread of assert * doc: fix comment in Script#execute function --- Cargo.lock | 6 + src/catalog/src/consts.rs | 9 +- src/catalog/src/error.rs | 46 ++-- src/catalog/src/lib.rs | 29 ++- src/catalog/src/manager.rs | 115 +++++++++- src/datanode/src/error.rs | 7 + src/datanode/src/instance.rs | 29 +-- src/datanode/src/script.rs | 60 ++++-- src/datanode/src/server/grpc/ddl.rs | 4 +- src/datanode/src/sql.rs | 7 +- src/datanode/src/sql/create.rs | 2 +- src/datanode/src/tests/http_test.rs | 12 +- src/datanode/src/tests/test_util.rs | 4 +- src/script/Cargo.toml | 10 +- src/script/src/engine.rs | 4 +- src/script/src/error.rs | 128 ++++++++++++ src/script/src/lib.rs | 4 + src/script/src/manager.rs | 159 ++++++++++++++ src/script/src/python/engine.rs | 8 +- src/script/src/python/error.rs | 43 +++- src/script/src/table.rs | 220 ++++++++++++++++++++ src/servers/src/error.rs | 36 ++-- src/servers/src/http.rs | 3 +- src/servers/src/http/handler.rs | 38 +++- src/servers/src/query_handler.rs | 3 +- src/servers/tests/http/http_handler_test.rs | 8 +- src/servers/tests/mod.rs | 24 ++- src/table/src/requests.rs | 2 +- 28 files changed, 900 insertions(+), 120 deletions(-) create mode 100644 src/script/src/error.rs create mode 100644 src/script/src/manager.rs create mode 100644 src/script/src/table.rs diff --git a/Cargo.lock b/Cargo.lock index 076d598f97..ec30af55ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4379,6 +4379,8 @@ dependencies = [ "common-function", "common-query", "common-recordbatch", + "common-telemetry", + "common-time", "console", "datafusion", "datafusion-common", @@ -4387,6 +4389,7 @@ dependencies = [ "datatypes", "futures", "futures-util", + "log-store", "query", "ron", "rustpython-ast", @@ -4398,7 +4401,10 @@ dependencies = [ "serde", "snafu", "sql", + "storage", "table", + "table-engine", + "tempdir", "tokio", "tokio-test", ] diff --git a/src/catalog/src/consts.rs b/src/catalog/src/consts.rs index 5e74470ae8..68a82a4170 100644 --- a/src/catalog/src/consts.rs +++ b/src/catalog/src/consts.rs @@ -1,6 +1,13 @@ pub const SYSTEM_CATALOG_NAME: &str = "system"; pub const INFORMATION_SCHEMA_NAME: &str = "information_schema"; -pub const SYSTEM_CATALOG_TABLE_ID: u32 = 0; pub const SYSTEM_CATALOG_TABLE_NAME: &str = "system_catalog"; pub const DEFAULT_CATALOG_NAME: &str = "greptime"; pub const DEFAULT_SCHEMA_NAME: &str = "public"; + +/// Reserves [0,MIN_USER_TABLE_ID) for internal usage. +/// User defined table id starts from this value. +pub const MIN_USER_TABLE_ID: u32 = 1024; +/// system_catalog table id +pub const SYSTEM_CATALOG_TABLE_ID: u32 = 0; +/// scripts table id +pub const SCRIPTS_TABLE_ID: u32 = 1; diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index 9f5f880fdd..a908f6ce65 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -21,6 +21,17 @@ pub enum Error { source: table::error::Error, }, + #[snafu(display( + "Failed to create table, table info: {}, source: {}", + table_info, + source + ))] + CreateTable { + table_info: String, + #[snafu(backtrace)] + source: table::error::Error, + }, + #[snafu(display("System catalog is not valid: {}", msg))] SystemCatalog { msg: String, backtrace: Backtrace }, @@ -89,6 +100,9 @@ pub enum Error { #[snafu(backtrace)] source: table::error::Error, }, + + #[snafu(display("Illegal catalog manager state: {}", msg))] + IllegalManagerState { backtrace: Backtrace, msg: String }, } pub type Result = std::result::Result; @@ -97,21 +111,27 @@ impl ErrorExt for Error { fn status_code(&self) -> StatusCode { match self { Error::InvalidKey { .. } - | Error::OpenSystemCatalog { .. } - | Error::CreateSystemCatalog { .. } | Error::SchemaNotFound { .. } | Error::TableNotFound { .. } - | Error::InvalidEntryType { .. } => StatusCode::Unexpected, - Error::SystemCatalog { .. } - | Error::SystemCatalogTypeMismatch { .. } - | Error::EmptyValue - | Error::ValueDeserialize { .. } + | Error::IllegalManagerState { .. } | Error::CatalogNotFound { .. } - | Error::OpenTable { .. } - | Error::ReadSystemCatalog { .. } - | Error::InsertTableRecord { .. } => StatusCode::StorageUnavailable, + | Error::InvalidEntryType { .. } => StatusCode::Unexpected, + + Error::SystemCatalog { .. } | Error::EmptyValue | Error::ValueDeserialize { .. } => { + StatusCode::StorageUnavailable + } + + Error::ReadSystemCatalog { source, .. } => source.status_code(), + Error::SystemCatalogTypeMismatch { source, .. } => source.status_code(), + Error::RegisterTable { .. } => StatusCode::Internal, Error::TableExists { .. } => StatusCode::TableAlreadyExists, + + Error::OpenSystemCatalog { source, .. } + | Error::CreateSystemCatalog { source, .. } + | Error::InsertTableRecord { source, .. } + | Error::OpenTable { source, .. } + | Error::CreateTable { source, .. } => source.status_code(), } } @@ -155,7 +175,7 @@ mod tests { ); assert_eq!( - StatusCode::Unexpected, + StatusCode::StorageUnavailable, Error::OpenSystemCatalog { source: table::error::Error::new(MockError::new(StatusCode::StorageUnavailable)) } @@ -163,7 +183,7 @@ mod tests { ); assert_eq!( - StatusCode::Unexpected, + StatusCode::StorageUnavailable, Error::CreateSystemCatalog { source: table::error::Error::new(MockError::new(StatusCode::StorageUnavailable)) } @@ -180,7 +200,7 @@ mod tests { ); assert_eq!( - StatusCode::StorageUnavailable, + StatusCode::Internal, Error::SystemCatalogTypeMismatch { data_type: DataType::Boolean, source: datatypes::error::Error::UnsupportedArrowType { diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index cf74028790..4b667110aa 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -4,13 +4,14 @@ use std::any::Any; use std::sync::Arc; use table::metadata::TableId; +use table::requests::CreateTableRequest; use table::TableRef; -pub use crate::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +pub use crate::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; pub use crate::manager::LocalCatalogManager; pub use crate::schema::{SchemaProvider, SchemaProviderRef}; -mod consts; +pub mod consts; pub mod error; mod manager; pub mod memory; @@ -70,10 +71,34 @@ pub trait CatalogManager: CatalogList { /// Registers a table given given catalog/schema to catalog manager, /// returns table registered. async fn register_table(&self, request: RegisterTableRequest) -> error::Result; + + /// Register a system table, should be called before starting the manager. + async fn register_system_table(&self, request: RegisterSystemTableRequest) + -> error::Result<()>; + + /// Returns the table by catalog, schema and table name. + fn table( + &self, + catalog: Option<&str>, + schema: Option<&str>, + table_name: &str, + ) -> error::Result>; } pub type CatalogManagerRef = Arc; +/// Hook called after system table opening. +pub type OpenSystemTableHook = Arc error::Result<()> + Send + Sync>; + +/// Register system table request: +/// - When system table is already created and registered, the hook will be called +/// with table ref after opening the system table +/// - When system table is not exists, create and register the table by create_table_request and calls open_hook with the created table. +pub struct RegisterSystemTableRequest { + pub create_table_request: CreateTableRequest, + pub open_hook: Option, +} + pub struct RegisterTableRequest { pub catalog: Option, pub schema: Option, diff --git a/src/catalog/src/manager.rs b/src/catalog/src/manager.rs index affec86815..8ec2aadbd9 100644 --- a/src/catalog/src/manager.rs +++ b/src/catalog/src/manager.rs @@ -13,12 +13,16 @@ use table::engine::{EngineContext, TableEngineRef}; use table::metadata::TableId; use table::requests::OpenTableRequest; use table::table::numbers::NumbersTable; +use table::TableRef; use super::error::Result; -use crate::consts::{INFORMATION_SCHEMA_NAME, SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_NAME}; +use crate::consts::{ + INFORMATION_SCHEMA_NAME, MIN_USER_TABLE_ID, SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_NAME, +}; use crate::error::{ - CatalogNotFoundSnafu, OpenTableSnafu, ReadSystemCatalogSnafu, SchemaNotFoundSnafu, - SystemCatalogSnafu, SystemCatalogTypeMismatchSnafu, TableExistsSnafu, TableNotFoundSnafu, + CatalogNotFoundSnafu, CreateTableSnafu, IllegalManagerStateSnafu, OpenTableSnafu, + ReadSystemCatalogSnafu, SchemaNotFoundSnafu, SystemCatalogSnafu, + SystemCatalogTypeMismatchSnafu, TableExistsSnafu, TableNotFoundSnafu, }; use crate::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider}; use crate::system::{ @@ -28,7 +32,8 @@ use crate::system::{ use crate::tables::SystemCatalog; use crate::{ format_full_table_name, CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, - RegisterTableRequest, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, + RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider, DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, }; /// A `CatalogManager` consists of a system catalog and a bunch of user catalogs. @@ -37,7 +42,8 @@ pub struct LocalCatalogManager { catalogs: Arc, engine: TableEngineRef, next_table_id: AtomicU32, - lock: Mutex<()>, + init_lock: Mutex, + system_table_requests: Mutex>, } impl LocalCatalogManager { @@ -54,8 +60,9 @@ impl LocalCatalogManager { system: system_catalog, catalogs: memory_catalog_list, engine, - next_table_id: AtomicU32::new(0), - lock: Mutex::new(()), + next_table_id: AtomicU32::new(MIN_USER_TABLE_ID), + init_lock: Mutex::new(false), + system_table_requests: Mutex::new(Vec::default()), }) } @@ -78,7 +85,54 @@ impl LocalCatalogManager { max_table_id ); self.next_table_id - .store(max_table_id + 1, Ordering::Relaxed); + .store((max_table_id + 1).max(MIN_USER_TABLE_ID), Ordering::Relaxed); + *self.init_lock.lock().await = true; + + // Processing system table hooks + let mut sys_table_requests = self.system_table_requests.lock().await; + for req in sys_table_requests.drain(..) { + let catalog_name = &req.create_table_request.catalog_name; + let schema_name = &req.create_table_request.schema_name; + let table_name = &req.create_table_request.table_name; + let table_id = req.create_table_request.id; + + let table = if let Some(table) = + self.table(catalog_name.as_deref(), schema_name.as_deref(), table_name)? + { + table + } else { + let table = self + .engine + .create_table(&EngineContext::default(), req.create_table_request.clone()) + .await + .with_context(|_| CreateTableSnafu { + table_info: format!( + "{}.{}.{}, id: {}", + catalog_name.as_deref().unwrap_or(DEFAULT_CATALOG_NAME), + schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME), + table_name, + table_id, + ), + })?; + self.register_table(RegisterTableRequest { + catalog: catalog_name.clone(), + schema: schema_name.clone(), + table_name: table_name.clone(), + table_id, + table: table.clone(), + }) + .await?; + + info!("Created and registered system table: {}", table_name); + + table + }; + + if let Some(hook) = req.open_hook { + (hook)(table)?; + } + } + Ok(()) } @@ -265,7 +319,15 @@ impl CatalogManager for LocalCatalogManager { } async fn register_table(&self, request: RegisterTableRequest) -> Result { - let _lock = self.lock.lock().await; + let started = self.init_lock.lock().await; + + ensure!( + *started, + IllegalManagerStateSnafu { + msg: "Catalog manager not started", + } + ); + let catalog_name = request .catalog .unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string()); @@ -304,4 +366,39 @@ impl CatalogManager for LocalCatalogManager { schema.register_table(request.table_name, request.table)?; Ok(1) } + + async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> { + ensure!( + !*self.init_lock.lock().await, + IllegalManagerStateSnafu { + msg: "Catalog manager already started", + } + ); + + let mut sys_table_requests = self.system_table_requests.lock().await; + sys_table_requests.push(request); + + Ok(()) + } + + fn table( + &self, + catalog: Option<&str>, + schema: Option<&str>, + table_name: &str, + ) -> Result> { + let catalog_name = catalog.unwrap_or(DEFAULT_CATALOG_NAME); + let schema_name = schema.unwrap_or(DEFAULT_SCHEMA_NAME); + + let catalog = self + .catalogs + .catalog(catalog_name) + .context(CatalogNotFoundSnafu { catalog_name })?; + let schema = catalog + .schema(schema_name) + .with_context(|| SchemaNotFoundSnafu { + schema_info: format!("{}.{}", catalog_name, schema_name), + })?; + Ok(schema.table(table_name)) + } } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index ef922fe66a..9589db0dfd 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -191,6 +191,12 @@ pub enum Error { #[snafu(display("Invalid ColumnDef in protobuf msg: {}", msg))] InvalidColumnDef { msg: String, backtrace: Backtrace }, + + #[snafu(display("Failed to start script manager, source: {}", source))] + StartScriptManager { + #[snafu(backtrace)] + source: script::error::Error, + }, } pub type Result = std::result::Result; @@ -232,6 +238,7 @@ impl ErrorExt for Error { | Error::UnsupportedExpr { .. } => StatusCode::Internal, Error::InitBackend { .. } => StatusCode::StorageUnavailable, Error::OpenLogStore { source } => source.status_code(), + Error::StartScriptManager { source } => source.status_code(), Error::OpenStorageEngine { source } => source.status_code(), Error::RuntimeResource { .. } => StatusCode::RuntimeResourcesExhausted, } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 1900a4cc03..48db6b74af 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -53,7 +53,7 @@ impl Instance { let object_store = new_object_store(&opts.storage).await?; let log_store = create_local_file_log_store(opts).await?; - let table_engine = DefaultEngine::new( + let table_engine = Arc::new(DefaultEngine::new( TableEngineConfig::default(), EngineImpl::new( StorageEngineConfig::default(), @@ -61,9 +61,7 @@ impl Instance { object_store.clone(), ), object_store, - ); - let table_engine = Arc::new(table_engine); - + )); let catalog_manager = Arc::new( catalog::LocalCatalogManager::try_new(table_engine.clone()) .await @@ -71,7 +69,8 @@ impl Instance { ); let factory = QueryEngineFactory::new(catalog_manager.clone()); let query_engine = factory.query_engine().clone(); - let script_executor = ScriptExecutor::new(query_engine.clone()); + let script_executor = + ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?; Ok(Self { query_engine: query_engine.clone(), @@ -220,12 +219,11 @@ impl Instance { use table_engine::table::test_util::MockMitoEngine; let (_dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await; - let mock_engine = MockMitoEngine::new( + let mock_engine = Arc::new(MockMitoEngine::new( TableEngineConfig::default(), MockEngine::default(), object_store, - ); - let mock_engine = Arc::new(mock_engine); + )); let catalog_manager = Arc::new( catalog::LocalCatalogManager::try_new(mock_engine.clone()) @@ -236,9 +234,12 @@ impl Instance { let factory = QueryEngineFactory::new(catalog_manager.clone()); let query_engine = factory.query_engine().clone(); - let sql_handler = SqlHandler::new(mock_engine, catalog_manager.clone()); + let sql_handler = SqlHandler::new(mock_engine.clone(), catalog_manager.clone()); let physical_planner = PhysicalPlanner::new(query_engine.clone()); - let script_executor = ScriptExecutor::new(query_engine.clone()); + let script_executor = ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()) + .await + .unwrap(); + Ok(Self { query_engine, sql_handler, @@ -301,8 +302,12 @@ impl SqlQueryHandler for Instance { .context(servers::error::ExecuteQuerySnafu { query }) } - async fn execute_script(&self, script: &str) -> servers::error::Result { - self.script_executor.execute_script(script).await + async fn insert_script(&self, name: &str, script: &str) -> servers::error::Result<()> { + self.script_executor.insert_script(name, script).await + } + + async fn execute_script(&self, name: &str) -> servers::error::Result { + self.script_executor.execute_script(name).await } } diff --git a/src/datanode/src/script.rs b/src/datanode/src/script.rs index f18aa73574..979bbe00d0 100644 --- a/src/datanode/src/script.rs +++ b/src/datanode/src/script.rs @@ -1,6 +1,9 @@ +use catalog::CatalogManagerRef; use query::Output; use query::QueryEngineRef; +use crate::error::Result; + #[cfg(not(feature = "python"))] mod dummy { use super::*; @@ -8,8 +11,19 @@ mod dummy { pub struct ScriptExecutor; impl ScriptExecutor { - pub fn new(_query_engine: QueryEngineRef) -> Self { - Self {} + pub async fn new( + _catalog_manager: CatalogManagerRef, + _query_engine: QueryEngineRef, + ) -> Result { + Ok(Self {}) + } + + pub async fn insert_script( + &self, + _name: &str, + _script: &str, + ) -> servers::error::Result<()> { + servers::error::NotSupportedSnafu { feat: "script" }.fail() } pub async fn execute_script(&self, _script: &str) -> servers::error::Result { @@ -22,44 +36,50 @@ mod dummy { mod python { use common_error::prelude::BoxedError; use common_telemetry::logging::error; - use script::{ - engine::{CompileContext, EvalContext, Script, ScriptEngine}, - python::PyEngine, - }; + use script::manager::ScriptManager; use snafu::ResultExt; use super::*; pub struct ScriptExecutor { - py_engine: PyEngine, + script_manager: ScriptManager, } impl ScriptExecutor { - pub fn new(query_engine: QueryEngineRef) -> Self { - Self { - py_engine: PyEngine::new(query_engine), - } + pub async fn new( + catalog_manager: CatalogManagerRef, + query_engine: QueryEngineRef, + ) -> Result { + Ok(Self { + script_manager: ScriptManager::new(catalog_manager, query_engine) + .await + .context(crate::error::StartScriptManagerSnafu)?, + }) } - pub async fn execute_script(&self, script: &str) -> servers::error::Result { - let py_script = self - .py_engine - .compile(script, CompileContext::default()) + pub async fn insert_script(&self, name: &str, script: &str) -> servers::error::Result<()> { + let _s = self + .script_manager + .insert_and_compile(name, script) .await .map_err(|e| { - error!(e; "Instance failed to execute script"); + error!(e; "Instance failed to insert script"); BoxedError::new(e) }) - .context(servers::error::ExecuteScriptSnafu { script })?; + .context(servers::error::InsertScriptSnafu { name })?; - py_script - .evaluate(EvalContext::default()) + Ok(()) + } + + pub async fn execute_script(&self, name: &str) -> servers::error::Result { + self.script_manager + .execute(name) .await .map_err(|e| { error!(e; "Instance failed to execute script"); BoxedError::new(e) }) - .context(servers::error::ExecuteScriptSnafu { script }) + .context(servers::error::ExecuteScriptSnafu { name }) } } } diff --git a/src/datanode/src/server/grpc/ddl.rs b/src/datanode/src/server/grpc/ddl.rs index dc9397dbf1..7b5a76dc45 100644 --- a/src/datanode/src/server/grpc/ddl.rs +++ b/src/datanode/src/server/grpc/ddl.rs @@ -172,6 +172,8 @@ fn create_column_schema(column_def: &ColumnDef) -> Result { mod tests { use std::collections::HashMap; + use catalog::MIN_USER_TABLE_ID; + use super::*; use crate::tests::test_util; @@ -183,7 +185,7 @@ mod tests { let expr = testing_create_expr(); let request = instance.create_expr_to_request(expr).unwrap(); - assert_eq!(request.id, 1); + assert_eq!(request.id, MIN_USER_TABLE_ID); assert_eq!(request.catalog_name, None); assert_eq!(request.schema_name, None); assert_eq!(request.table_name, "my-metrics"); diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index 810cb60c56..4266b87f58 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -59,7 +59,7 @@ impl SqlHandler { } /// Converts maybe fully-qualified table name (`..` or `
` when -/// catalog and schema are default) to tuple. +/// catalog and schema are default) to tuple. fn table_idents_to_full_name( obj_name: &ObjectName, ) -> Result<(Option, Option, String)> { @@ -230,7 +230,7 @@ mod tests { ('host2', 88.8, 333.3, 1655276558000) "#; - let table_engine = MitoEngine::>::new( + let table_engine = Arc::new(MitoEngine::>::new( TableEngineConfig::default(), EngineImpl::new( StorageEngineConfig::default(), @@ -238,8 +238,7 @@ mod tests { object_store.clone(), ), object_store, - ); - let table_engine = Arc::new(table_engine); + )); let catalog_list = Arc::new( catalog::LocalCatalogManager::try_new(table_engine.clone()) diff --git a/src/datanode/src/sql/create.rs b/src/datanode/src/sql/create.rs index de4e4d10b1..8010abf913 100644 --- a/src/datanode/src/sql/create.rs +++ b/src/datanode/src/sql/create.rs @@ -205,7 +205,7 @@ mod tests { assert_matches!(error, Error::CreateSchema { .. }); } - /// If primary key is not specified, time index should be used as primary key. + /// If primary key is not specified, time index should be used as primary key. #[tokio::test] pub async fn test_primary_key_not_specified() { let handler = create_mock_sql_handler().await; diff --git a/src/datanode/src/tests/http_test.rs b/src/datanode/src/tests/http_test.rs index 355b6fa214..fba752185b 100644 --- a/src/datanode/src/tests/http_test.rs +++ b/src/datanode/src/tests/http_test.rs @@ -110,10 +110,11 @@ async fn test_scripts_api() { let res = client .post("/v1/scripts") .json(&ScriptExecution { + name: "test".to_string(), script: r#" @copr(sql='select number from numbers limit 10', args=['number'], returns=['n']) def test(n): - return n; + return n + 1; "# .to_string(), }) @@ -121,10 +122,17 @@ def test(n): .await; assert_eq!(res.status(), StatusCode::OK); + let body = res.text().await; + assert_eq!(body, r#"{"success":true}"#,); + + // call script + let res = client.post("/v1/run-script?name=test").send().await; + assert_eq!(res.status(), StatusCode::OK); + let body = res.text().await; assert_eq!( body, - r#"{"success":true,"output":{"Rows":[{"schema":{"fields":[{"name":"n","data_type":"UInt32","is_nullable":false,"metadata":{}}],"metadata":{}},"columns":[[0,1,2,3,4,5,6,7,8,9]]}]}}"# + r#"{"success":true,"output":{"Rows":[{"schema":{"fields":[{"name":"n","data_type":"Float64","is_nullable":false,"metadata":{}}],"metadata":{}},"columns":[[1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0]]}]}}"#, ); } diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index 8eb713a2d4..d949ee2bea 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::sync::Arc; -use catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, SchemaBuilder}; use snafu::ResultExt; @@ -57,7 +57,7 @@ pub async fn create_test_table(instance: &Instance) -> Result<()> { .create_table( &EngineContext::default(), CreateTableRequest { - id: 1, + id: MIN_USER_TABLE_ID, catalog_name: None, schema_name: None, table_name: table_name.to_string(), diff --git a/src/script/Cargo.toml b/src/script/Cargo.toml index a88007dd2f..4f5d75905a 100644 --- a/src/script/Cargo.toml +++ b/src/script/Cargo.toml @@ -19,10 +19,13 @@ python = [ [dependencies] async-trait = "0.1" +catalog = { path = "../catalog" } common-error = {path = "../common/error"} common-function = { path = "../common/function" } common-query = {path = "../common/query"} common-recordbatch = {path = "../common/recordbatch" } +common-telemetry = { path = "../common/telemetry" } +common-time = { path = "../common/time" } console = "0.15" datafusion = {git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", optional = true} datafusion-common = {git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2"} @@ -40,11 +43,14 @@ rustpython-parser = {git = "https://github.com/RustPython/RustPython", optional rustpython-vm = {git = "https://github.com/RustPython/RustPython", optional = true, rev = "02a1d1d"} snafu = {version = "0.7", features = ["backtraces"]} sql = { path = "../sql" } +table = { path = "../table" } [dev-dependencies] -catalog = { path = "../catalog" } +log-store = { path = "../log-store" } ron = "0.7" serde = {version = "1.0", features = ["derive"]} -table = { path = "../table" } +storage = { path = "../storage" } +table-engine = { path = "../table-engine", features = ["test"] } +tempdir = "0.3" tokio = { version = "1.18", features = ["full"] } tokio-test = "0.4" diff --git a/src/script/src/engine.rs b/src/script/src/engine.rs index 3c6850bbf6..9522c21bc4 100644 --- a/src/script/src/engine.rs +++ b/src/script/src/engine.rs @@ -15,8 +15,8 @@ pub trait Script { fn as_any(&self) -> &dyn Any; - /// Evaluate the script and returns the output. - async fn evaluate(&self, ctx: EvalContext) -> std::result::Result; + /// Execute the script and returns the output. + async fn execute(&self, ctx: EvalContext) -> std::result::Result; } #[async_trait] diff --git a/src/script/src/error.rs b/src/script/src/error.rs new file mode 100644 index 0000000000..79e640358d --- /dev/null +++ b/src/script/src/error.rs @@ -0,0 +1,128 @@ +use std::any::Any; + +use common_error::ext::ErrorExt; +use common_error::prelude::{Snafu, StatusCode}; +use snafu::{Backtrace, ErrorCompat}; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display("Failed to find scripts table, source: {}", source))] + FindScriptsTable { + #[snafu(backtrace)] + source: catalog::error::Error, + }, + + #[snafu(display("Failed to register scripts table, source: {}", source))] + RegisterScriptsTable { + #[snafu(backtrace)] + source: catalog::error::Error, + }, + + #[snafu(display("Scripts table not found"))] + ScriptsTableNotFound { backtrace: Backtrace }, + + #[snafu(display( + "Failed to insert script to scripts table, name: {}, source: {}", + name, + source + ))] + InsertScript { + name: String, + #[snafu(backtrace)] + source: table::error::Error, + }, + + #[snafu(display("Failed to compile python script, name: {}, source: {}", name, source))] + CompilePython { + name: String, + #[snafu(backtrace)] + source: crate::python::error::Error, + }, + + #[snafu(display("Failed to execute python script {}, source: {}", name, source))] + ExecutePython { + name: String, + #[snafu(backtrace)] + source: crate::python::error::Error, + }, + + #[snafu(display("Script not found, name: {}", name))] + ScriptNotFound { backtrace: Backtrace, name: String }, + + #[snafu(display("Failed to find script by name: {}", name))] + FindScript { + name: String, + #[snafu(backtrace)] + source: query::error::Error, + }, + + #[snafu(display("Failed to collect record batch, source: {}", source))] + CollectRecords { + #[snafu(backtrace)] + source: common_recordbatch::error::Error, + }, + + #[snafu(display("Failed to cast type, msg: {}", msg))] + CastType { msg: String, backtrace: Backtrace }, +} + +pub type Result = std::result::Result; + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + use Error::*; + match self { + CastType { .. } => StatusCode::Unexpected, + ScriptsTableNotFound { .. } => StatusCode::TableNotFound, + RegisterScriptsTable { source } | FindScriptsTable { source } => source.status_code(), + InsertScript { source, .. } => source.status_code(), + CompilePython { source, .. } | ExecutePython { source, .. } => source.status_code(), + FindScript { source, .. } => source.status_code(), + CollectRecords { source } => source.status_code(), + ScriptNotFound { .. } => StatusCode::InvalidArguments, + } + } + + fn backtrace_opt(&self) -> Option<&Backtrace> { + ErrorCompat::backtrace(self) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +#[cfg(test)] +mod tests { + use snafu::ResultExt; + + use super::*; + + fn throw_catalog_error() -> catalog::error::Result<()> { + catalog::error::IllegalManagerStateSnafu { msg: "test" }.fail() + } + + fn throw_python_error() -> crate::python::error::Result<()> { + crate::python::error::CoprParseSnafu { + reason: "test", + loc: None, + } + .fail() + } + + #[test] + fn test_error() { + let err = throw_catalog_error() + .context(FindScriptsTableSnafu) + .unwrap_err(); + assert_eq!(StatusCode::Unexpected, err.status_code()); + assert!(err.backtrace_opt().is_some()); + + let err = throw_python_error() + .context(ExecutePythonSnafu { name: "test" }) + .unwrap_err(); + assert_eq!(StatusCode::InvalidArguments, err.status_code()); + assert!(err.backtrace_opt().is_some()); + } +} diff --git a/src/script/src/lib.rs b/src/script/src/lib.rs index b11a118d8c..a5cb3d92b7 100644 --- a/src/script/src/lib.rs +++ b/src/script/src/lib.rs @@ -1,3 +1,7 @@ pub mod engine; +pub mod error; +#[cfg(feature = "python")] +pub mod manager; #[cfg(feature = "python")] pub mod python; +mod table; diff --git a/src/script/src/manager.rs b/src/script/src/manager.rs new file mode 100644 index 0000000000..c76daff23f --- /dev/null +++ b/src/script/src/manager.rs @@ -0,0 +1,159 @@ +//! Scripts manager +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; + +use catalog::CatalogManagerRef; +use common_telemetry::logging; +use query::{Output, QueryEngineRef}; +use snafu::{OptionExt, ResultExt}; + +use crate::engine::{CompileContext, EvalContext, Script, ScriptEngine}; +use crate::error::{CompilePythonSnafu, ExecutePythonSnafu, Result, ScriptNotFoundSnafu}; +use crate::python::{PyEngine, PyScript}; +use crate::table::ScriptsTable; + +pub struct ScriptManager { + compiled: RwLock>>, + py_engine: PyEngine, + table: ScriptsTable, +} + +impl ScriptManager { + pub async fn new( + catalog_manager: CatalogManagerRef, + query_engine: QueryEngineRef, + ) -> Result { + Ok(Self { + compiled: RwLock::new(HashMap::default()), + py_engine: PyEngine::new(query_engine.clone()), + table: ScriptsTable::new(catalog_manager, query_engine).await?, + }) + } + + async fn compile(&self, name: &str, script: &str) -> Result> { + let script = Arc::new( + self.py_engine + .compile(script, CompileContext::default()) + .await + .context(CompilePythonSnafu { name })?, + ); + + let mut compiled = self.compiled.write().unwrap(); + compiled.insert(name.to_string(), script.clone()); + + logging::info!("Compiled and cached script: {}", name); + + Ok(script) + } + + pub async fn insert_and_compile(&self, name: &str, script: &str) -> Result> { + let compiled_script = self.compile(name, script).await?; + self.table.insert(name, script).await?; + Ok(compiled_script) + } + + pub async fn execute(&self, name: &str) -> Result { + let script = { + let s = self.compiled.read().unwrap().get(name).cloned(); + + if s.is_some() { + s + } else { + self.try_find_script_and_compile(name).await? + } + }; + + let script = script.context(ScriptNotFoundSnafu { name })?; + + script + .execute(EvalContext::default()) + .await + .context(ExecutePythonSnafu { name }) + } + + async fn try_find_script_and_compile(&self, name: &str) -> Result>> { + let script = self.table.find_script_by_name(name).await?; + + Ok(Some(self.compile(name, &script).await?)) + } +} + +#[cfg(test)] +mod tests { + use catalog::CatalogManager; + use query::QueryEngineFactory; + use table_engine::config::EngineConfig as TableEngineConfig; + use table_engine::table::test_util::new_test_object_store; + + use super::*; + type DefaultEngine = MitoEngine>; + use log_store::fs::{config::LogConfig, log::LocalFileLogStore}; + use storage::{config::EngineConfig as StorageEngineConfig, EngineImpl}; + use table_engine::engine::MitoEngine; + use tempdir::TempDir; + + #[tokio::test] + async fn test_insert_find_compile_script() { + let wal_dir = TempDir::new("test_insert_find_compile_script_wal").unwrap(); + let wal_dir_str = wal_dir.path().to_string_lossy(); + + common_telemetry::init_default_ut_logging(); + let (_dir, object_store) = new_test_object_store("test_insert_find_compile_script").await; + let log_config = LogConfig { + log_file_dir: wal_dir_str.to_string(), + ..Default::default() + }; + + let log_store = LocalFileLogStore::open(&log_config).await.unwrap(); + + let mock_engine = Arc::new(DefaultEngine::new( + TableEngineConfig::default(), + EngineImpl::new( + StorageEngineConfig::default(), + Arc::new(log_store), + object_store.clone(), + ), + object_store, + )); + + let catalog_manager = Arc::new( + catalog::LocalCatalogManager::try_new(mock_engine.clone()) + .await + .unwrap(), + ); + + let factory = QueryEngineFactory::new(catalog_manager.clone()); + let query_engine = factory.query_engine().clone(); + let mgr = ScriptManager::new(catalog_manager.clone(), query_engine) + .await + .unwrap(); + catalog_manager.start().await.unwrap(); + + let name = "test"; + mgr.table + .insert( + name, + r#" +@copr(sql='select number from numbers limit 10', args=['number'], returns=['n']) +def test(n): + return n + 1; +"#, + ) + .await + .unwrap(); + + { + let cached = mgr.compiled.read().unwrap(); + assert!(cached.get(name).is_none()); + } + + // try to find and compile + let script = mgr.try_find_script_and_compile(name).await.unwrap(); + assert!(script.is_some()); + + { + let cached = mgr.compiled.read().unwrap(); + assert!(cached.get(name).is_some()); + } + } +} diff --git a/src/script/src/python/engine.rs b/src/script/src/python/engine.rs index 6bf74b02bf..2ef9d083ef 100644 --- a/src/script/src/python/engine.rs +++ b/src/script/src/python/engine.rs @@ -77,7 +77,7 @@ impl Script for PyScript { self } - async fn evaluate(&self, _ctx: EvalContext) -> Result { + async fn execute(&self, _ctx: EvalContext) -> Result { if let Some(sql) = &self.copr.deco_args.sql { let stmt = self.query_engine.sql_to_statement(sql)?; ensure!( @@ -150,7 +150,7 @@ mod tests { use super::*; #[tokio::test] - async fn test_compile_evaluate() { + async fn test_compile_execute() { let catalog_list = catalog::memory::new_memory_catalog_list().unwrap(); let default_schema = Arc::new(MemorySchemaProvider::new()); @@ -176,7 +176,7 @@ def test(a, b, c): .compile(script, CompileContext::default()) .await .unwrap(); - let output = script.evaluate(EvalContext::default()).await.unwrap(); + let output = script.execute(EvalContext::default()).await.unwrap(); match output { Output::RecordBatch(stream) => { let numbers = util::collect(stream).await.unwrap(); @@ -207,7 +207,7 @@ def test(a): .compile(script, CompileContext::default()) .await .unwrap(); - let output = script.evaluate(EvalContext::default()).await.unwrap(); + let output = script.execute(EvalContext::default()).await.unwrap(); match output { Output::RecordBatch(stream) => { let numbers = util::collect(stream).await.unwrap(); diff --git a/src/script/src/python/error.rs b/src/script/src/python/error.rs index c06d20a9b4..5eae26ae99 100644 --- a/src/script/src/python/error.rs +++ b/src/script/src/python/error.rs @@ -48,11 +48,11 @@ pub enum Error { /// errors in coprocessors' parse check for types and etc. #[snafu(display("Coprocessor error: {} {}.", reason, - if let Some(loc) = loc{ - format!("at {loc}") - }else{ - "".into() - }))] + if let Some(loc) = loc{ + format!("at {loc}") + }else{ + "".into() + }))] CoprParse { backtrace: Backtrace, reason: String, @@ -89,12 +89,13 @@ impl From for Error { impl ErrorExt for Error { fn status_code(&self) -> StatusCode { match self { - Error::Arrow { .. } - | Error::TypeCast { .. } - | Error::DatabaseQuery { .. } - | Error::PyRuntime { .. } - | Error::RecordBatch { .. } - | Error::Other { .. } => StatusCode::Internal, + Error::Arrow { .. } | Error::PyRuntime { .. } | Error::Other { .. } => { + StatusCode::Internal + } + + Error::RecordBatch { source } => source.status_code(), + Error::DatabaseQuery { source } => source.status_code(), + Error::TypeCast { source } => source.status_code(), Error::PyParse { .. } | Error::PyCompile { .. } @@ -187,3 +188,23 @@ pub fn get_error_reason_loc(err: &Error) -> (String, Option) { _ => (format!("Unknown error: {:?}", err), None), } } + +#[cfg(test)] +mod tests { + use common_error::mock::MockError; + use snafu::ResultExt; + + use super::*; + + fn throw_query_error() -> query::error::Result<()> { + let mock_err = MockError::with_backtrace(StatusCode::TableColumnNotFound); + Err(query::error::Error::new(mock_err)) + } + + #[test] + fn test_error() { + let err = throw_query_error().context(DatabaseQuerySnafu).unwrap_err(); + assert_eq!(StatusCode::TableColumnNotFound, err.status_code()); + assert!(err.backtrace_opt().is_some()); + } +} diff --git a/src/script/src/table.rs b/src/script/src/table.rs new file mode 100644 index 0000000000..34ef239fbc --- /dev/null +++ b/src/script/src/table.rs @@ -0,0 +1,220 @@ +//! Scripts table +use std::collections::HashMap; +use std::sync::Arc; + +use catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, SCRIPTS_TABLE_ID}; +use catalog::{CatalogManagerRef, RegisterSystemTableRequest}; +use common_recordbatch::util as record_util; +use common_telemetry::logging; +use common_time::timestamp::Timestamp; +use common_time::util; +use datatypes::arrow::array::Utf8Array; +use datatypes::prelude::ConcreteDataType; +use datatypes::prelude::ScalarVector; +use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder}; +use datatypes::vectors::{StringVector, TimestampVector, VectorRef}; +use query::{Output, QueryEngineRef}; +use snafu::{ensure, OptionExt, ResultExt}; +use table::requests::{CreateTableRequest, InsertRequest}; + +use crate::error::{ + CastTypeSnafu, CollectRecordsSnafu, FindScriptSnafu, FindScriptsTableSnafu, InsertScriptSnafu, + RegisterScriptsTableSnafu, Result, ScriptNotFoundSnafu, ScriptsTableNotFoundSnafu, +}; + +pub const SCRIPTS_TABLE_NAME: &str = "scripts"; + +pub struct ScriptsTable { + catalog_manager: CatalogManagerRef, + query_engine: QueryEngineRef, + name: String, +} + +impl ScriptsTable { + pub async fn new( + catalog_manager: CatalogManagerRef, + query_engine: QueryEngineRef, + ) -> Result { + let schema = Arc::new(build_scripts_schema()); + // TODO(dennis): we put scripts table into default catalog and schema. + // maybe put into system catalog? + let request = CreateTableRequest { + id: SCRIPTS_TABLE_ID, + catalog_name: Some(DEFAULT_CATALOG_NAME.to_string()), + schema_name: Some(DEFAULT_SCHEMA_NAME.to_string()), + table_name: SCRIPTS_TABLE_NAME.to_string(), + desc: Some("Scripts table".to_string()), + schema, + // name and timestamp as primary key + primary_key_indices: vec![0, 3], + create_if_not_exists: true, + table_options: HashMap::default(), + }; + + catalog_manager + .register_system_table(RegisterSystemTableRequest { + create_table_request: request, + open_hook: None, + }) + .await + .context(RegisterScriptsTableSnafu)?; + + Ok(Self { + catalog_manager, + query_engine, + name: catalog::format_full_table_name( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + SCRIPTS_TABLE_NAME, + ), + }) + } + + pub async fn insert(&self, name: &str, script: &str) -> Result<()> { + let mut columns_values: HashMap = HashMap::with_capacity(7); + columns_values.insert( + "name".to_string(), + Arc::new(StringVector::from(vec![name])) as _, + ); + columns_values.insert( + "script".to_string(), + Arc::new(StringVector::from(vec![script])) as _, + ); + // TODO(dennis): we only supports python right now. + columns_values.insert( + "engine".to_string(), + Arc::new(StringVector::from(vec!["python"])) as _, + ); + // Timestamp in key part is intentionally left to 0 + columns_values.insert( + "timestamp".to_string(), + Arc::new(TimestampVector::from_slice(&[Timestamp::from_millis(0)])) as _, + ); + columns_values.insert( + "gmt_created".to_string(), + Arc::new(TimestampVector::from_slice(&[Timestamp::from_millis( + util::current_time_millis(), + )])) as _, + ); + columns_values.insert( + "gmt_modified".to_string(), + Arc::new(TimestampVector::from_slice(&[Timestamp::from_millis( + util::current_time_millis(), + )])) as _, + ); + + let table = self + .catalog_manager + .table( + Some(DEFAULT_CATALOG_NAME), + Some(DEFAULT_SCHEMA_NAME), + SCRIPTS_TABLE_NAME, + ) + .context(FindScriptsTableSnafu)? + .context(ScriptsTableNotFoundSnafu)?; + + let _ = table + .insert(InsertRequest { + table_name: SCRIPTS_TABLE_NAME.to_string(), + columns_values, + }) + .await + .context(InsertScriptSnafu { name })?; + + logging::info!("Inserted script: name={} into scripts table.", name); + + Ok(()) + } + + pub async fn find_script_by_name(&self, name: &str) -> Result { + // FIXME(dennis): SQL injection + // TODO(dennis): we use sql to find the script, the better way is use a function + // such as `find_record_by_primary_key` in table_engine. + let sql = format!("select script from {} where name='{}'", self.name(), name); + + let plan = self + .query_engine + .sql_to_plan(&sql) + .context(FindScriptSnafu { name })?; + + let stream = match self + .query_engine + .execute(&plan) + .await + .context(FindScriptSnafu { name })? + { + Output::RecordBatch(stream) => stream, + _ => unreachable!(), + }; + let records = record_util::collect(stream) + .await + .context(CollectRecordsSnafu)?; + + ensure!(!records.is_empty(), ScriptNotFoundSnafu { name }); + + assert_eq!(records.len(), 1); + assert_eq!(records[0].df_recordbatch.num_columns(), 1); + + let record = &records[0].df_recordbatch; + + let script_column = record + .column(0) + .as_any() + .downcast_ref::>() + .context(CastTypeSnafu { + msg: format!( + "can't downcast {:?} array into utf8 array", + record.column(0).data_type() + ), + })?; + + assert_eq!(script_column.len(), 1); + Ok(script_column.value(0).to_string()) + } + + #[inline] + pub fn name(&self) -> &str { + &self.name + } +} + +/// Build scripts table +fn build_scripts_schema() -> Schema { + let cols = vec![ + ColumnSchema::new( + "name".to_string(), + ConcreteDataType::string_datatype(), + false, + ), + ColumnSchema::new( + "script".to_string(), + ConcreteDataType::string_datatype(), + false, + ), + ColumnSchema::new( + "engine".to_string(), + ConcreteDataType::string_datatype(), + false, + ), + ColumnSchema::new( + "timestamp".to_string(), + ConcreteDataType::timestamp_millis_datatype(), + false, + ), + ColumnSchema::new( + "gmt_created".to_string(), + ConcreteDataType::timestamp_millis_datatype(), + false, + ), + ColumnSchema::new( + "gmt_modified".to_string(), + ConcreteDataType::timestamp_millis_datatype(), + false, + ), + ]; + + SchemaBuilder::from(cols) + .timestamp_index(3) + .build() + .unwrap() +} diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 97d31ce035..5e79c1aff7 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -49,9 +49,16 @@ pub enum Error { source: BoxedError, }, - #[snafu(display("Failed to execute script: {}, source: {}", script, source))] + #[snafu(display("Failed to insert script with name: {}, source: {}", name, source))] + InsertScript { + name: String, + #[snafu(backtrace)] + source: BoxedError, + }, + + #[snafu(display("Failed to execute script by name: {}, source: {}", name, source))] ExecuteScript { - script: String, + name: String, #[snafu(backtrace)] source: BoxedError, }, @@ -64,21 +71,22 @@ pub type Result = std::result::Result; impl ErrorExt for Error { fn status_code(&self) -> StatusCode { + use Error::*; match self { - Error::Internal { .. } - | Error::InternalIo { .. } - | Error::TokioIo { .. } - | Error::VectorConversion { .. } - | Error::CollectRecordbatch { .. } - | Error::StartHttp { .. } - | Error::StartGrpc { .. } - | Error::TcpBind { .. } => StatusCode::Internal, + Internal { .. } + | InternalIo { .. } + | TokioIo { .. } + | VectorConversion { .. } + | CollectRecordbatch { .. } + | StartHttp { .. } + | StartGrpc { .. } + | TcpBind { .. } => StatusCode::Internal, - Error::ExecuteScript { source, .. } | Error::ExecuteQuery { source, .. } => { - source.status_code() - } + InsertScript { source, .. } + | ExecuteScript { source, .. } + | ExecuteQuery { source, .. } => source.status_code(), - Error::NotSupported { .. } => StatusCode::InvalidArguments, + NotSupported { .. } => StatusCode::InvalidArguments, } } diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 81e5483657..3b7964e43a 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -122,7 +122,8 @@ impl HttpServer { Router::new() // handlers .route("/sql", routing::get(handler::sql)) - .route("/scripts", routing::post(handler::scripts)), + .route("/scripts", routing::post(handler::scripts)) + .route("/run-script", routing::post(handler::run_script)), ) .route("/metrics", routing::get(handler::metrics)) // middlewares diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index 15134690c9..81fb403835 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -37,20 +37,46 @@ pub async fn metrics( #[derive(Deserialize, Serialize)] pub struct ScriptExecution { + pub name: String, pub script: String, } -/// Handler to execute scripts +/// Handler to insert and compile script #[axum_macros::debug_handler] pub async fn scripts( Extension(query_handler): Extension, Json(payload): Json, ) -> HttpResponse { - if payload.script.is_empty() { - return HttpResponse::Json(JsonResponse::with_error(Some("Invalid script".to_string()))); + if payload.name.is_empty() || payload.script.is_empty() { + return HttpResponse::Json(JsonResponse::with_error(Some( + "Invalid name or script".to_string(), + ))); } - HttpResponse::Json( - JsonResponse::from_output(query_handler.execute_script(&payload.script).await).await, - ) + let body = match query_handler + .insert_script(&payload.name, &payload.script) + .await + { + Ok(()) => JsonResponse::with_output(None), + Err(e) => JsonResponse::with_error(Some(format!("Insert script error: {}", e))), + }; + + HttpResponse::Json(body) +} + +/// Handler to execute script +#[axum_macros::debug_handler] +pub async fn run_script( + Extension(query_handler): Extension, + Query(params): Query>, +) -> HttpResponse { + let name = params.get("name"); + + if name.is_none() || name.unwrap().is_empty() { + return HttpResponse::Json(JsonResponse::with_error(Some("Invalid name".to_string()))); + } + + let output = query_handler.execute_script(name.unwrap()).await; + + HttpResponse::Json(JsonResponse::from_output(output).await) } diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index e7cbb7f8c6..eed8b0e9c8 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -23,7 +23,8 @@ pub type GrpcAdminHandlerRef = Arc; #[async_trait] pub trait SqlQueryHandler { async fn do_query(&self, query: &str) -> Result; - async fn execute_script(&self, script: &str) -> Result; + async fn insert_script(&self, name: &str, script: &str) -> Result<()>; + async fn execute_script(&self, name: &str) -> Result; } #[async_trait] diff --git a/src/servers/tests/http/http_handler_test.rs b/src/servers/tests/http/http_handler_test.rs index 5ba52ad2e5..92d37d8ac5 100644 --- a/src/servers/tests/http/http_handler_test.rs +++ b/src/servers/tests/http/http_handler_test.rs @@ -84,12 +84,7 @@ async fn test_scripts() { HttpResponse::Json(json) => { assert!(json.success(), "{:?}", json); assert!(json.error().is_none()); - match json.output().expect("assertion failed") { - JsonOutput::Rows(rows) => { - assert_eq!(1, rows.len()); - } - _ => unreachable!(), - } + assert!(json.output().is_none()); } _ => unreachable!(), } @@ -97,6 +92,7 @@ async fn test_scripts() { fn create_script_payload() -> Json { Json(ScriptExecution { + name: "test".to_string(), script: r#" @copr(sql='select uint32s as number from numbers', args=['number'], returns=['n']) def test(n): diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index c4e42a53ea..8cbc1bec34 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -1,4 +1,5 @@ -use std::sync::Arc; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; use async_trait::async_trait; use catalog::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider}; @@ -14,18 +15,20 @@ mod http; mod mysql; use script::{ engine::{CompileContext, EvalContext, Script, ScriptEngine}, - python::PyEngine, + python::{PyEngine, PyScript}, }; struct DummyInstance { query_engine: QueryEngineRef, py_engine: Arc, + scripts: RwLock>>, } impl DummyInstance { fn new(query_engine: QueryEngineRef) -> Self { Self { py_engine: Arc::new(PyEngine::new(query_engine.clone())), + scripts: RwLock::new(HashMap::new()), query_engine, } } @@ -37,14 +40,25 @@ impl SqlQueryHandler for DummyInstance { let plan = self.query_engine.sql_to_plan(query).unwrap(); Ok(self.query_engine.execute(&plan).await.unwrap()) } - async fn execute_script(&self, script: &str) -> Result { - let py_script = self + + async fn insert_script(&self, name: &str, script: &str) -> Result<()> { + let script = self .py_engine .compile(script, CompileContext::default()) .await .unwrap(); + self.scripts + .write() + .unwrap() + .insert(name.to_string(), Arc::new(script)); - Ok(py_script.evaluate(EvalContext::default()).await.unwrap()) + Ok(()) + } + + async fn execute_script(&self, name: &str) -> Result { + let py_script = self.scripts.read().unwrap().get(name).unwrap().clone(); + + Ok(py_script.execute(EvalContext::default()).await.unwrap()) } } diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 1fca70fdfc..ade7283a30 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -14,7 +14,7 @@ pub struct InsertRequest { } /// Create table request -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct CreateTableRequest { pub id: TableId, pub catalog_name: Option,