diff --git a/Cargo.lock b/Cargo.lock index f0ffb2ec95..82d66cc1a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7622,6 +7622,7 @@ dependencies = [ "common-function", "common-query", "common-recordbatch", + "common-runtime", "common-telemetry", "common-test-util", "common-time", diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index 62e8e203a2..2433119249 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -27,6 +27,14 @@ use crate::DeregisterTableRequest; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { + #[snafu(display( + "Failed to re-compile script due to internal error, source: {}", + source + ))] + CompileScriptInternal { + #[snafu(backtrace)] + source: BoxedError, + }, #[snafu(display("Failed to open system catalog table, source: {}", source))] OpenSystemCatalog { #[snafu(backtrace)] @@ -300,9 +308,10 @@ impl ErrorExt for Error { Error::SystemCatalogTableScan { source } => source.status_code(), Error::SystemCatalogTableScanExec { source } => source.status_code(), Error::InvalidTableInfoInCatalog { source } => source.status_code(), - Error::SchemaProviderOperation { source } | Error::Internal { source } => { - source.status_code() - } + + Error::CompileScriptInternal { source } + | Error::SchemaProviderOperation { source } + | Error::Internal { source } => source.status_code(), Error::Unimplemented { .. } | Error::NotSupported { .. } => StatusCode::Unsupported, Error::QueryAccessDenied { .. } => StatusCode::AccessDenied, diff --git a/src/script/Cargo.toml b/src/script/Cargo.toml index 9d282befef..87f4307f15 100644 --- a/src/script/Cargo.toml +++ b/src/script/Cargo.toml @@ -34,6 +34,7 @@ common-query = { path = "../common/query" } common-recordbatch = { path = "../common/recordbatch" } common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } +common-runtime = { path = "../common/runtime" } console = "0.15" crossbeam-utils = "0.8.14" datafusion = { workspace = true, optional = true } diff --git a/src/script/src/error.rs b/src/script/src/error.rs index b9105bde93..fcd445a85f 100644 --- a/src/script/src/error.rs +++ b/src/script/src/error.rs @@ -27,6 +27,9 @@ pub enum Error { source: catalog::error::Error, }, + #[snafu(display("Failed to find column in scripts table, name: {}", name))] + FindColumnInScriptsTable { name: String, location: Location }, + #[snafu(display("Failed to register scripts table, source: {}", source))] RegisterScriptsTable { #[snafu(backtrace)] @@ -87,7 +90,7 @@ impl ErrorExt for Error { fn status_code(&self) -> StatusCode { use Error::*; match self { - CastType { .. } => StatusCode::Unexpected, + FindColumnInScriptsTable { .. } | CastType { .. } => StatusCode::Unexpected, ScriptsTableNotFound { .. } => StatusCode::TableNotFound, RegisterScriptsTable { source } | FindScriptsTable { source } => source.status_code(), InsertScript { source, .. } => source.status_code(), diff --git a/src/script/src/manager.rs b/src/script/src/manager.rs index 45cfa120e5..fdb2f2f177 100644 --- a/src/script/src/manager.rs +++ b/src/script/src/manager.rs @@ -45,26 +45,35 @@ impl ScriptManager { }) } + /// compile script, and register them to the query engine and UDF registry async fn compile(&self, name: &str, script: &str) -> Result> { - let script = Arc::new( - self.py_engine - .compile(script, CompileContext::default()) - .await - .context(CompilePythonSnafu { name })?, - ); - - let mut compiled = self.compiled.write().unwrap(); - compiled.insert(name.to_string(), script.clone()); + let script = Arc::new(Self::compile_without_cache(&self.py_engine, name, script).await?); + { + let mut compiled = self.compiled.write().unwrap(); + compiled.insert(name.to_string(), script.clone()); + } logging::info!("Compiled and cached script: {}", name); - script.as_ref().register_udf(); + script.as_ref().register_udf().await; logging::info!("Script register as UDF: {}", name); Ok(script) } + /// compile script to PyScript, but not register them to the query engine and UDF registry nor caching in `compiled` + async fn compile_without_cache( + py_engine: &PyEngine, + name: &str, + script: &str, + ) -> Result { + py_engine + .compile(script, CompileContext::default()) + .await + .context(CompilePythonSnafu { name }) + } + pub async fn insert_and_compile( &self, schema: &str, diff --git a/src/script/src/python/engine.rs b/src/script/src/python/engine.rs index 1e51c6141b..25ec54c235 100644 --- a/src/script/src/python/engine.rs +++ b/src/script/src/python/engine.rs @@ -40,8 +40,9 @@ use snafu::{ensure, ResultExt}; use sql::statements::statement::Statement; use crate::engine::{CompileContext, EvalContext, Script, ScriptEngine}; -use crate::python::error::{self, PyRuntimeSnafu, Result}; +use crate::python::error::{self, PyRuntimeSnafu, Result, TokioJoinSnafu}; use crate::python::ffi_types::copr::{exec_parsed, parse, AnnotationInfo, CoprocessorRef}; +use crate::python::utils::spawn_blocking_script; const PY_ENGINE: &str = "python"; #[derive(Debug)] @@ -179,9 +180,17 @@ pub struct PyScript { } impl PyScript { + pub fn from_script(script: &str, query_engine: QueryEngineRef) -> Result { + let copr = Arc::new(parse::parse_and_compile_copr( + script, + Some(query_engine.clone()), + )?); + + Ok(PyScript { copr, query_engine }) + } /// Register Current Script as UDF, register name is same as script name /// FIXME(discord9): possible inject attack? - pub fn register_udf(&self) { + pub async fn register_udf(&self) { let udf = PyUDF::from_copr(self.copr.clone()); PyUDF::register_as_udf(udf.clone()); PyUDF::register_to_query_engine(udf, self.query_engine.clone()); @@ -291,7 +300,11 @@ impl Script for PyScript { _ => unreachable!(), } } else { - let batch = exec_parsed(&self.copr, &None, ¶ms)?; + let copr = self.copr.clone(); + let params = params.clone(); + let batch = spawn_blocking_script(move || exec_parsed(&copr, &None, ¶ms)) + .await + .context(TokioJoinSnafu)??; let batches = RecordBatches::try_new(batch.schema.clone(), vec![batch]).unwrap(); Ok(Output::RecordBatches(batches)) } @@ -335,6 +348,7 @@ impl ScriptEngine for PyEngine { } #[cfg(test)] pub(crate) use tests::sample_script_engine; + #[cfg(test)] mod tests { use catalog::local::{MemoryCatalogProvider, MemorySchemaProvider}; diff --git a/src/script/src/python/error.rs b/src/script/src/python/error.rs index a1e09931e8..e20d9f2ffa 100644 --- a/src/script/src/python/error.rs +++ b/src/script/src/python/error.rs @@ -117,6 +117,8 @@ pub enum Error { #[snafu(backtrace)] source: common_recordbatch::error::Error, }, + #[snafu(display("Failed to create tokio task, source: {}", source))] + TokioJoin { source: tokio::task::JoinError }, } impl From for Error { @@ -131,6 +133,7 @@ impl ErrorExt for Error { Error::DataFusion { .. } | Error::Arrow { .. } | Error::PyRuntime { .. } + | Error::TokioJoin { .. } | Error::Other { .. } => StatusCode::Internal, Error::RecordBatch { source } | Error::NewRecordBatch { source } => { diff --git a/src/script/src/python/utils.rs b/src/script/src/python/utils.rs index 709e87e74a..a82b10418d 100644 --- a/src/script/src/python/utils.rs +++ b/src/script/src/python/utils.rs @@ -12,11 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_runtime::JoinHandle; use futures::Future; -use once_cell::sync::OnceCell; use rustpython_vm::builtins::PyBaseExceptionRef; use rustpython_vm::VirtualMachine; -use tokio::runtime::Runtime; use crate::python::error; @@ -30,22 +29,29 @@ pub fn format_py_error(excep: PyBaseExceptionRef, vm: &VirtualMachine) -> error: } error::PyRuntimeSnafu { msg }.build() } -static LOCAL_RUNTIME: OnceCell = OnceCell::new(); -fn get_local_runtime() -> std::thread::Result<&'static Runtime> { - let rt = LOCAL_RUNTIME - .get_or_try_init(|| tokio::runtime::Runtime::new().map_err(|e| Box::new(e) as _))?; - Ok(rt) + +/// just like [`tokio::task::spawn_blocking`] but using a dedicated runtime(runtime `bg`) using by `scripts` crate +pub fn spawn_blocking_script(f: F) -> JoinHandle +where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, +{ + common_runtime::spawn_blocking_bg(f) } + +/// Please only use this method because you are calling from (optionally first as async) to sync then to a async /// a terrible hack to call async from sync by: +/// /// TODO(discord9): find a better way -/// 1. spawn a new thread -/// 2. create a new runtime in new thread and call `block_on` on it +/// 1. using a cached runtime +/// 2. block on that runtime pub fn block_on_async(f: F) -> std::thread::Result where F: Future + Send + 'static, T: Send + 'static, { - let rt = get_local_runtime()?; - + let rt = common_runtime::bg_runtime(); + // spawn a thread to block on the runtime, also should prevent `start a runtime inside of runtime` error + // it's ok to block here, assume calling from async to sync is using a `spawn_blocking_*` call std::thread::spawn(move || rt.block_on(f)).join() } diff --git a/src/script/src/table.rs b/src/script/src/table.rs index c5371e0731..aa9cd83d13 100644 --- a/src/script/src/table.rs +++ b/src/script/src/table.rs @@ -16,15 +16,18 @@ use std::collections::HashMap; use std::sync::Arc; -use catalog::{CatalogManagerRef, RegisterSystemTableRequest}; +use catalog::error::CompileScriptInternalSnafu; +use catalog::{CatalogManagerRef, OpenSystemTableHook, RegisterSystemTableRequest}; use common_catalog::consts::{ DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE, SCRIPTS_TABLE_ID, }; use common_catalog::format_full_table_name; +use common_error::prelude::BoxedError; use common_query::Output; -use common_recordbatch::util as record_util; +use common_recordbatch::{util as record_util, RecordBatch}; use common_telemetry::logging; use common_time::util; +use datafusion::prelude::SessionContext; use datatypes::prelude::{ConcreteDataType, ScalarVector}; use datatypes::schema::{ColumnSchema, RawSchema}; use datatypes::vectors::{StringVector, TimestampMillisecondVector, Vector, VectorRef}; @@ -33,11 +36,15 @@ use query::QueryEngineRef; use session::context::QueryContext; use snafu::{ensure, OptionExt, ResultExt}; use table::requests::{CreateTableRequest, InsertRequest, TableOptions}; +use table::TableRef; use crate::error::{ - CastTypeSnafu, CollectRecordsSnafu, FindScriptSnafu, FindScriptsTableSnafu, InsertScriptSnafu, - RegisterScriptsTableSnafu, Result, ScriptNotFoundSnafu, ScriptsTableNotFoundSnafu, + CastTypeSnafu, CollectRecordsSnafu, FindColumnInScriptsTableSnafu, FindScriptSnafu, + FindScriptsTableSnafu, InsertScriptSnafu, RegisterScriptsTableSnafu, Result, + ScriptNotFoundSnafu, ScriptsTableNotFoundSnafu, }; +use crate::python::utils::block_on_async; +use crate::python::PyScript; pub const SCRIPTS_TABLE_NAME: &str = "scripts"; @@ -48,6 +55,84 @@ pub struct ScriptsTable { } impl ScriptsTable { + fn get_str_col_by_name<'a>(record: &'a RecordBatch, name: &str) -> Result<&'a StringVector> { + let column = record + .column_by_name(name) + .with_context(|| FindColumnInScriptsTableSnafu { name })?; + let column = column + .as_any() + .downcast_ref::() + .with_context(|| CastTypeSnafu { + msg: format!( + "can't downcast {:?} array into string vector", + column.data_type() + ), + })?; + Ok(column) + } + /// this is used as a callback function when scripts table is created. `table` should be `scripts` table. + /// the function will try it best to register all scripts, and ignore the error in parsing and register scripts + /// if any, just emit a warning + /// TODO(discord9): rethink error handling here + pub async fn recompile_register_udf( + table: TableRef, + query_engine: QueryEngineRef, + ) -> catalog::error::Result<()> { + let scan_stream = table + .scan(None, &[], None) + .await + .map_err(BoxedError::new) + .context(CompileScriptInternalSnafu)?; + let ctx = SessionContext::new(); + let rbs = scan_stream + .execute(0, ctx.task_ctx()) + .map_err(BoxedError::new) + .context(CompileScriptInternalSnafu)?; + let records = record_util::collect(rbs) + .await + .map_err(BoxedError::new) + .context(CompileScriptInternalSnafu)?; + + let mut script_list: Vec<(String, String)> = Vec::new(); + for record in records { + let names = Self::get_str_col_by_name(&record, "name") + .map_err(BoxedError::new) + .context(CompileScriptInternalSnafu)?; + let scripts = Self::get_str_col_by_name(&record, "script") + .map_err(BoxedError::new) + .context(CompileScriptInternalSnafu)?; + + let part_of_scripts_list = + names + .iter_data() + .zip(scripts.iter_data()) + .filter_map(|i| match i { + (Some(a), Some(b)) => Some((a.to_string(), b.to_string())), + _ => None, + }); + script_list.extend(part_of_scripts_list); + } + + for (name, script) in script_list { + match PyScript::from_script(&script, query_engine.clone()) { + Ok(script) => { + script.register_udf().await; + logging::debug!( + "Script in `scripts` system table re-register as UDF: {}", + name + ); + } + Err(err) => { + logging::warn!( + r#"Failed to compile script "{}"" in `scripts` table: {}"#, + name, + err + ); + } + } + } + Ok(()) + } pub async fn new( catalog_manager: CatalogManagerRef, query_engine: QueryEngineRef, @@ -69,11 +154,19 @@ impl ScriptsTable { table_options: TableOptions::default(), engine: MITO_ENGINE.to_string(), }; + let callback_query_engine = query_engine.clone(); + let script_recompile_callback: OpenSystemTableHook = Arc::new(move |table: TableRef| { + let callback_query_engine = callback_query_engine.clone(); + block_on_async(async move { + Self::recompile_register_udf(table, callback_query_engine.clone()).await + }) + .unwrap() + }); catalog_manager .register_system_table(RegisterSystemTableRequest { create_table_request: request, - open_hook: None, + open_hook: Some(script_recompile_callback), }) .await .context(RegisterScriptsTableSnafu)?; diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index 811904ce25..f73ffdd865 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -119,7 +119,7 @@ impl ScriptHandler for DummyInstance { .compile(script, CompileContext::default()) .await .unwrap(); - script.register_udf(); + script.register_udf().await; self.scripts .write() .unwrap()