fix: recompile&register scripts as UDF on reboot (#1421)

* fixme: recompile somewhere else

* feat: re-compile&re-register all scripts in table

* fix: allow empty scripts table

* chore: add non-blocking somewhere

* chore: PR advices

* chore: more PR advices

* style: remove useless join

* style: remove redunent code

* refactor: use `bg` runtime instead

* style: cargo fmt
This commit is contained in:
discord9
2023-04-26 16:30:58 +08:00
committed by GitHub
parent 1a245f35b9
commit ef4e473e6d
10 changed files with 173 additions and 34 deletions

1
Cargo.lock generated
View File

@@ -7622,6 +7622,7 @@ dependencies = [
"common-function",
"common-query",
"common-recordbatch",
"common-runtime",
"common-telemetry",
"common-test-util",
"common-time",

View File

@@ -27,6 +27,14 @@ use crate::DeregisterTableRequest;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display(
"Failed to re-compile script due to internal error, source: {}",
source
))]
CompileScriptInternal {
#[snafu(backtrace)]
source: BoxedError,
},
#[snafu(display("Failed to open system catalog table, source: {}", source))]
OpenSystemCatalog {
#[snafu(backtrace)]
@@ -300,9 +308,10 @@ impl ErrorExt for Error {
Error::SystemCatalogTableScan { source } => source.status_code(),
Error::SystemCatalogTableScanExec { source } => source.status_code(),
Error::InvalidTableInfoInCatalog { source } => source.status_code(),
Error::SchemaProviderOperation { source } | Error::Internal { source } => {
source.status_code()
}
Error::CompileScriptInternal { source }
| Error::SchemaProviderOperation { source }
| Error::Internal { source } => source.status_code(),
Error::Unimplemented { .. } | Error::NotSupported { .. } => StatusCode::Unsupported,
Error::QueryAccessDenied { .. } => StatusCode::AccessDenied,

View File

@@ -34,6 +34,7 @@ common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
common-runtime = { path = "../common/runtime" }
console = "0.15"
crossbeam-utils = "0.8.14"
datafusion = { workspace = true, optional = true }

View File

@@ -27,6 +27,9 @@ pub enum Error {
source: catalog::error::Error,
},
#[snafu(display("Failed to find column in scripts table, name: {}", name))]
FindColumnInScriptsTable { name: String, location: Location },
#[snafu(display("Failed to register scripts table, source: {}", source))]
RegisterScriptsTable {
#[snafu(backtrace)]
@@ -87,7 +90,7 @@ impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
CastType { .. } => StatusCode::Unexpected,
FindColumnInScriptsTable { .. } | CastType { .. } => StatusCode::Unexpected,
ScriptsTableNotFound { .. } => StatusCode::TableNotFound,
RegisterScriptsTable { source } | FindScriptsTable { source } => source.status_code(),
InsertScript { source, .. } => source.status_code(),

View File

@@ -45,26 +45,35 @@ impl ScriptManager {
})
}
/// compile script, and register them to the query engine and UDF registry
async fn compile(&self, name: &str, script: &str) -> Result<Arc<PyScript>> {
let script = Arc::new(
self.py_engine
.compile(script, CompileContext::default())
.await
.context(CompilePythonSnafu { name })?,
);
let mut compiled = self.compiled.write().unwrap();
compiled.insert(name.to_string(), script.clone());
let script = Arc::new(Self::compile_without_cache(&self.py_engine, name, script).await?);
{
let mut compiled = self.compiled.write().unwrap();
compiled.insert(name.to_string(), script.clone());
}
logging::info!("Compiled and cached script: {}", name);
script.as_ref().register_udf();
script.as_ref().register_udf().await;
logging::info!("Script register as UDF: {}", name);
Ok(script)
}
/// compile script to PyScript, but not register them to the query engine and UDF registry nor caching in `compiled`
async fn compile_without_cache(
py_engine: &PyEngine,
name: &str,
script: &str,
) -> Result<PyScript> {
py_engine
.compile(script, CompileContext::default())
.await
.context(CompilePythonSnafu { name })
}
pub async fn insert_and_compile(
&self,
schema: &str,

View File

@@ -40,8 +40,9 @@ use snafu::{ensure, ResultExt};
use sql::statements::statement::Statement;
use crate::engine::{CompileContext, EvalContext, Script, ScriptEngine};
use crate::python::error::{self, PyRuntimeSnafu, Result};
use crate::python::error::{self, PyRuntimeSnafu, Result, TokioJoinSnafu};
use crate::python::ffi_types::copr::{exec_parsed, parse, AnnotationInfo, CoprocessorRef};
use crate::python::utils::spawn_blocking_script;
const PY_ENGINE: &str = "python";
#[derive(Debug)]
@@ -179,9 +180,17 @@ pub struct PyScript {
}
impl PyScript {
pub fn from_script(script: &str, query_engine: QueryEngineRef) -> Result<Self> {
let copr = Arc::new(parse::parse_and_compile_copr(
script,
Some(query_engine.clone()),
)?);
Ok(PyScript { copr, query_engine })
}
/// Register Current Script as UDF, register name is same as script name
/// FIXME(discord9): possible inject attack?
pub fn register_udf(&self) {
pub async fn register_udf(&self) {
let udf = PyUDF::from_copr(self.copr.clone());
PyUDF::register_as_udf(udf.clone());
PyUDF::register_to_query_engine(udf, self.query_engine.clone());
@@ -291,7 +300,11 @@ impl Script for PyScript {
_ => unreachable!(),
}
} else {
let batch = exec_parsed(&self.copr, &None, &params)?;
let copr = self.copr.clone();
let params = params.clone();
let batch = spawn_blocking_script(move || exec_parsed(&copr, &None, &params))
.await
.context(TokioJoinSnafu)??;
let batches = RecordBatches::try_new(batch.schema.clone(), vec![batch]).unwrap();
Ok(Output::RecordBatches(batches))
}
@@ -335,6 +348,7 @@ impl ScriptEngine for PyEngine {
}
#[cfg(test)]
pub(crate) use tests::sample_script_engine;
#[cfg(test)]
mod tests {
use catalog::local::{MemoryCatalogProvider, MemorySchemaProvider};

View File

@@ -117,6 +117,8 @@ pub enum Error {
#[snafu(backtrace)]
source: common_recordbatch::error::Error,
},
#[snafu(display("Failed to create tokio task, source: {}", source))]
TokioJoin { source: tokio::task::JoinError },
}
impl From<QueryError> for Error {
@@ -131,6 +133,7 @@ impl ErrorExt for Error {
Error::DataFusion { .. }
| Error::Arrow { .. }
| Error::PyRuntime { .. }
| Error::TokioJoin { .. }
| Error::Other { .. } => StatusCode::Internal,
Error::RecordBatch { source } | Error::NewRecordBatch { source } => {

View File

@@ -12,11 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_runtime::JoinHandle;
use futures::Future;
use once_cell::sync::OnceCell;
use rustpython_vm::builtins::PyBaseExceptionRef;
use rustpython_vm::VirtualMachine;
use tokio::runtime::Runtime;
use crate::python::error;
@@ -30,22 +29,29 @@ pub fn format_py_error(excep: PyBaseExceptionRef, vm: &VirtualMachine) -> error:
}
error::PyRuntimeSnafu { msg }.build()
}
static LOCAL_RUNTIME: OnceCell<tokio::runtime::Runtime> = OnceCell::new();
fn get_local_runtime() -> std::thread::Result<&'static Runtime> {
let rt = LOCAL_RUNTIME
.get_or_try_init(|| tokio::runtime::Runtime::new().map_err(|e| Box::new(e) as _))?;
Ok(rt)
/// just like [`tokio::task::spawn_blocking`] but using a dedicated runtime(runtime `bg`) using by `scripts` crate
pub fn spawn_blocking_script<F, R>(f: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
common_runtime::spawn_blocking_bg(f)
}
/// Please only use this method because you are calling from (optionally first as async) to sync then to a async
/// a terrible hack to call async from sync by:
///
/// TODO(discord9): find a better way
/// 1. spawn a new thread
/// 2. create a new runtime in new thread and call `block_on` on it
/// 1. using a cached runtime
/// 2. block on that runtime
pub fn block_on_async<T, F>(f: F) -> std::thread::Result<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
let rt = get_local_runtime()?;
let rt = common_runtime::bg_runtime();
// spawn a thread to block on the runtime, also should prevent `start a runtime inside of runtime` error
// it's ok to block here, assume calling from async to sync is using a `spawn_blocking_*` call
std::thread::spawn(move || rt.block_on(f)).join()
}

View File

@@ -16,15 +16,18 @@
use std::collections::HashMap;
use std::sync::Arc;
use catalog::{CatalogManagerRef, RegisterSystemTableRequest};
use catalog::error::CompileScriptInternalSnafu;
use catalog::{CatalogManagerRef, OpenSystemTableHook, RegisterSystemTableRequest};
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE, SCRIPTS_TABLE_ID,
};
use common_catalog::format_full_table_name;
use common_error::prelude::BoxedError;
use common_query::Output;
use common_recordbatch::util as record_util;
use common_recordbatch::{util as record_util, RecordBatch};
use common_telemetry::logging;
use common_time::util;
use datafusion::prelude::SessionContext;
use datatypes::prelude::{ConcreteDataType, ScalarVector};
use datatypes::schema::{ColumnSchema, RawSchema};
use datatypes::vectors::{StringVector, TimestampMillisecondVector, Vector, VectorRef};
@@ -33,11 +36,15 @@ use query::QueryEngineRef;
use session::context::QueryContext;
use snafu::{ensure, OptionExt, ResultExt};
use table::requests::{CreateTableRequest, InsertRequest, TableOptions};
use table::TableRef;
use crate::error::{
CastTypeSnafu, CollectRecordsSnafu, FindScriptSnafu, FindScriptsTableSnafu, InsertScriptSnafu,
RegisterScriptsTableSnafu, Result, ScriptNotFoundSnafu, ScriptsTableNotFoundSnafu,
CastTypeSnafu, CollectRecordsSnafu, FindColumnInScriptsTableSnafu, FindScriptSnafu,
FindScriptsTableSnafu, InsertScriptSnafu, RegisterScriptsTableSnafu, Result,
ScriptNotFoundSnafu, ScriptsTableNotFoundSnafu,
};
use crate::python::utils::block_on_async;
use crate::python::PyScript;
pub const SCRIPTS_TABLE_NAME: &str = "scripts";
@@ -48,6 +55,84 @@ pub struct ScriptsTable {
}
impl ScriptsTable {
fn get_str_col_by_name<'a>(record: &'a RecordBatch, name: &str) -> Result<&'a StringVector> {
let column = record
.column_by_name(name)
.with_context(|| FindColumnInScriptsTableSnafu { name })?;
let column = column
.as_any()
.downcast_ref::<StringVector>()
.with_context(|| CastTypeSnafu {
msg: format!(
"can't downcast {:?} array into string vector",
column.data_type()
),
})?;
Ok(column)
}
/// this is used as a callback function when scripts table is created. `table` should be `scripts` table.
/// the function will try it best to register all scripts, and ignore the error in parsing and register scripts
/// if any, just emit a warning
/// TODO(discord9): rethink error handling here
pub async fn recompile_register_udf(
table: TableRef,
query_engine: QueryEngineRef,
) -> catalog::error::Result<()> {
let scan_stream = table
.scan(None, &[], None)
.await
.map_err(BoxedError::new)
.context(CompileScriptInternalSnafu)?;
let ctx = SessionContext::new();
let rbs = scan_stream
.execute(0, ctx.task_ctx())
.map_err(BoxedError::new)
.context(CompileScriptInternalSnafu)?;
let records = record_util::collect(rbs)
.await
.map_err(BoxedError::new)
.context(CompileScriptInternalSnafu)?;
let mut script_list: Vec<(String, String)> = Vec::new();
for record in records {
let names = Self::get_str_col_by_name(&record, "name")
.map_err(BoxedError::new)
.context(CompileScriptInternalSnafu)?;
let scripts = Self::get_str_col_by_name(&record, "script")
.map_err(BoxedError::new)
.context(CompileScriptInternalSnafu)?;
let part_of_scripts_list =
names
.iter_data()
.zip(scripts.iter_data())
.filter_map(|i| match i {
(Some(a), Some(b)) => Some((a.to_string(), b.to_string())),
_ => None,
});
script_list.extend(part_of_scripts_list);
}
for (name, script) in script_list {
match PyScript::from_script(&script, query_engine.clone()) {
Ok(script) => {
script.register_udf().await;
logging::debug!(
"Script in `scripts` system table re-register as UDF: {}",
name
);
}
Err(err) => {
logging::warn!(
r#"Failed to compile script "{}"" in `scripts` table: {}"#,
name,
err
);
}
}
}
Ok(())
}
pub async fn new(
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
@@ -69,11 +154,19 @@ impl ScriptsTable {
table_options: TableOptions::default(),
engine: MITO_ENGINE.to_string(),
};
let callback_query_engine = query_engine.clone();
let script_recompile_callback: OpenSystemTableHook = Arc::new(move |table: TableRef| {
let callback_query_engine = callback_query_engine.clone();
block_on_async(async move {
Self::recompile_register_udf(table, callback_query_engine.clone()).await
})
.unwrap()
});
catalog_manager
.register_system_table(RegisterSystemTableRequest {
create_table_request: request,
open_hook: None,
open_hook: Some(script_recompile_callback),
})
.await
.context(RegisterScriptsTableSnafu)?;

View File

@@ -119,7 +119,7 @@ impl ScriptHandler for DummyInstance {
.compile(script, CompileContext::default())
.await
.unwrap();
script.register_udf();
script.register_udf().await;
self.scripts
.write()
.unwrap()