feat: impl scripts table and /run-script restful api (#230)

* feat: impl scripts table and /execute restful api

* fix: test failures

* fix: test failures

* feat: impl /run_script API

* refactor: rename run_script api to run-script and test script manager

* fix: remove println

* refactor: error mod

* refactor: by CR comments

* feat: rebase develop and change timestamp/gmt_created/gmt_modified type to timestamp

* refactor: use assert_eq instead of assert

* doc: fix comment in Script#execute function
This commit is contained in:
dennis zhuang
2022-09-13 15:09:00 +08:00
committed by GitHub
parent cad35fe82e
commit 03169c4a04
28 changed files with 900 additions and 120 deletions

View File

@@ -19,10 +19,13 @@ python = [
[dependencies]
async-trait = "0.1"
catalog = { path = "../catalog" }
common-error = {path = "../common/error"}
common-function = { path = "../common/function" }
common-query = {path = "../common/query"}
common-recordbatch = {path = "../common/recordbatch" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
console = "0.15"
datafusion = {git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", optional = true}
datafusion-common = {git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2"}
@@ -40,11 +43,14 @@ rustpython-parser = {git = "https://github.com/RustPython/RustPython", optional
rustpython-vm = {git = "https://github.com/RustPython/RustPython", optional = true, rev = "02a1d1d"}
snafu = {version = "0.7", features = ["backtraces"]}
sql = { path = "../sql" }
table = { path = "../table" }
[dev-dependencies]
catalog = { path = "../catalog" }
log-store = { path = "../log-store" }
ron = "0.7"
serde = {version = "1.0", features = ["derive"]}
table = { path = "../table" }
storage = { path = "../storage" }
table-engine = { path = "../table-engine", features = ["test"] }
tempdir = "0.3"
tokio = { version = "1.18", features = ["full"] }
tokio-test = "0.4"

View File

@@ -15,8 +15,8 @@ pub trait Script {
fn as_any(&self) -> &dyn Any;
/// Evaluate the script and returns the output.
async fn evaluate(&self, ctx: EvalContext) -> std::result::Result<Output, Self::Error>;
/// Execute the script and returns the output.
async fn execute(&self, ctx: EvalContext) -> std::result::Result<Output, Self::Error>;
}
#[async_trait]

128
src/script/src/error.rs Normal file
View File

@@ -0,0 +1,128 @@
use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::prelude::{Snafu, StatusCode};
use snafu::{Backtrace, ErrorCompat};
/// Errors produced by the scripts table and the script manager.
///
/// Variants that wrap a `source` error delegate their backtrace to it
/// (`#[snafu(backtrace)]`); the remaining variants capture their own.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
    /// The catalog manager failed while looking up the scripts table.
    #[snafu(display("Failed to find scripts table, source: {}", source))]
    FindScriptsTable {
        #[snafu(backtrace)]
        source: catalog::error::Error,
    },

    /// The catalog manager failed to register the scripts table.
    #[snafu(display("Failed to register scripts table, source: {}", source))]
    RegisterScriptsTable {
        #[snafu(backtrace)]
        source: catalog::error::Error,
    },

    /// The catalog lookup succeeded but returned no scripts table.
    #[snafu(display("Scripts table not found"))]
    ScriptsTableNotFound { backtrace: Backtrace },

    /// Writing the script `name` into the scripts table failed.
    #[snafu(display(
        "Failed to insert script to scripts table, name: {}, source: {}",
        name,
        source
    ))]
    InsertScript {
        name: String,
        #[snafu(backtrace)]
        source: table::error::Error,
    },

    /// The python engine rejected the script `name` at compile time.
    #[snafu(display("Failed to compile python script, name: {}, source: {}", name, source))]
    CompilePython {
        name: String,
        #[snafu(backtrace)]
        source: crate::python::error::Error,
    },

    /// The python engine failed while running the script `name`.
    #[snafu(display("Failed to execute python script {}, source: {}", name, source))]
    ExecutePython {
        name: String,
        #[snafu(backtrace)]
        source: crate::python::error::Error,
    },

    /// No script is stored under `name`.
    #[snafu(display("Script not found, name: {}", name))]
    ScriptNotFound { backtrace: Backtrace, name: String },

    /// The query used to look up the script `name` failed.
    // The message includes the source like every other wrapping variant, so
    // the underlying query failure is not lost when only `Display` is shown.
    #[snafu(display("Failed to find script by name: {}, source: {}", name, source))]
    FindScript {
        name: String,
        #[snafu(backtrace)]
        source: query::error::Error,
    },

    /// Draining the record batch stream of a query failed.
    #[snafu(display("Failed to collect record batch, source: {}", source))]
    CollectRecords {
        #[snafu(backtrace)]
        source: common_recordbatch::error::Error,
    },

    /// A value did not have the expected concrete type; `msg` says which.
    #[snafu(display("Failed to cast type, msg: {}", msg))]
    CastType { msg: String, backtrace: Backtrace },
}

/// Result type whose error is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
CastType { .. } => StatusCode::Unexpected,
ScriptsTableNotFound { .. } => StatusCode::TableNotFound,
RegisterScriptsTable { source } | FindScriptsTable { source } => source.status_code(),
InsertScript { source, .. } => source.status_code(),
CompilePython { source, .. } | ExecutePython { source, .. } => source.status_code(),
FindScript { source, .. } => source.status_code(),
CollectRecords { source } => source.status_code(),
ScriptNotFound { .. } => StatusCode::InvalidArguments,
}
}
fn backtrace_opt(&self) -> Option<&Backtrace> {
ErrorCompat::backtrace(self)
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[cfg(test)]
mod tests {
    use snafu::ResultExt;

    use super::*;

    /// Always fails with a catalog `IllegalManagerState` error.
    fn throw_catalog_error() -> catalog::error::Result<()> {
        let selector = catalog::error::IllegalManagerStateSnafu { msg: "test" };
        selector.fail()
    }

    /// Always fails with a python `CoprParse` error that has no location.
    fn throw_python_error() -> crate::python::error::Result<()> {
        let selector = crate::python::error::CoprParseSnafu {
            reason: "test",
            loc: None,
        };
        selector.fail()
    }

    /// Wrapped errors must report the status code and backtrace of their source.
    #[test]
    fn test_error() {
        let catalog_err = throw_catalog_error()
            .context(FindScriptsTableSnafu)
            .unwrap_err();
        assert_eq!(StatusCode::Unexpected, catalog_err.status_code());
        assert!(catalog_err.backtrace_opt().is_some());

        let python_err = throw_python_error()
            .context(ExecutePythonSnafu { name: "test" })
            .unwrap_err();
        assert_eq!(StatusCode::InvalidArguments, python_err.status_code());
        assert!(python_err.backtrace_opt().is_some());
    }
}

View File

@@ -1,3 +1,7 @@
pub mod engine;
pub mod error;
#[cfg(feature = "python")]
pub mod manager;
#[cfg(feature = "python")]
pub mod python;
mod table;

159
src/script/src/manager.rs Normal file
View File

@@ -0,0 +1,159 @@
//! Scripts manager
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use catalog::CatalogManagerRef;
use common_telemetry::logging;
use query::{Output, QueryEngineRef};
use snafu::{OptionExt, ResultExt};
use crate::engine::{CompileContext, EvalContext, Script, ScriptEngine};
use crate::error::{CompilePythonSnafu, ExecutePythonSnafu, Result, ScriptNotFoundSnafu};
use crate::python::{PyEngine, PyScript};
use crate::table::ScriptsTable;
/// Compiles, caches, persists and executes scripts.
pub struct ScriptManager {
    /// Compiled scripts keyed by script name.
    compiled: RwLock<HashMap<String, Arc<PyScript>>>,
    /// Engine used to compile script source into a `PyScript`.
    py_engine: PyEngine,
    /// Table in which script sources are stored and looked up by name.
    table: ScriptsTable,
}
impl ScriptManager {
pub async fn new(
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
) -> Result<Self> {
Ok(Self {
compiled: RwLock::new(HashMap::default()),
py_engine: PyEngine::new(query_engine.clone()),
table: ScriptsTable::new(catalog_manager, query_engine).await?,
})
}
async fn compile(&self, name: &str, script: &str) -> Result<Arc<PyScript>> {
let script = Arc::new(
self.py_engine
.compile(script, CompileContext::default())
.await
.context(CompilePythonSnafu { name })?,
);
let mut compiled = self.compiled.write().unwrap();
compiled.insert(name.to_string(), script.clone());
logging::info!("Compiled and cached script: {}", name);
Ok(script)
}
pub async fn insert_and_compile(&self, name: &str, script: &str) -> Result<Arc<PyScript>> {
let compiled_script = self.compile(name, script).await?;
self.table.insert(name, script).await?;
Ok(compiled_script)
}
pub async fn execute(&self, name: &str) -> Result<Output> {
let script = {
let s = self.compiled.read().unwrap().get(name).cloned();
if s.is_some() {
s
} else {
self.try_find_script_and_compile(name).await?
}
};
let script = script.context(ScriptNotFoundSnafu { name })?;
script
.execute(EvalContext::default())
.await
.context(ExecutePythonSnafu { name })
}
async fn try_find_script_and_compile(&self, name: &str) -> Result<Option<Arc<PyScript>>> {
let script = self.table.find_script_by_name(name).await?;
Ok(Some(self.compile(name, &script).await?))
}
}
// Test for `ScriptManager` against a table engine that writes its log to a
// temporary directory.
#[cfg(test)]
mod tests {
use catalog::CatalogManager;
use query::QueryEngineFactory;
use table_engine::config::EngineConfig as TableEngineConfig;
use table_engine::table::test_util::new_test_object_store;
use super::*;
// Table engine used by the test: a `MitoEngine` over the storage engine with a
// local file log store.
type DefaultEngine = MitoEngine<EngineImpl<LocalFileLogStore>>;
use log_store::fs::{config::LogConfig, log::LocalFileLogStore};
use storage::{config::EngineConfig as StorageEngineConfig, EngineImpl};
use table_engine::engine::MitoEngine;
use tempdir::TempDir;
// Inserts a script directly into the scripts table (bypassing the cache) and
// checks that `try_find_script_and_compile` loads, compiles and caches it.
#[tokio::test]
async fn test_insert_find_compile_script() {
// Temporary directory for the write-ahead log files.
let wal_dir = TempDir::new("test_insert_find_compile_script_wal").unwrap();
let wal_dir_str = wal_dir.path().to_string_lossy();
common_telemetry::init_default_ut_logging();
// `_dir` is kept bound for the rest of the test.
let (_dir, object_store) = new_test_object_store("test_insert_find_compile_script").await;
let log_config = LogConfig {
log_file_dir: wal_dir_str.to_string(),
..Default::default()
};
let log_store = LocalFileLogStore::open(&log_config).await.unwrap();
let mock_engine = Arc::new(DefaultEngine::new(
TableEngineConfig::default(),
EngineImpl::new(
StorageEngineConfig::default(),
Arc::new(log_store),
object_store.clone(),
),
object_store,
));
let catalog_manager = Arc::new(
catalog::LocalCatalogManager::try_new(mock_engine.clone())
.await
.unwrap(),
);
let factory = QueryEngineFactory::new(catalog_manager.clone());
let query_engine = factory.query_engine().clone();
// The manager (and with it the scripts table registration) is created
// before the catalog manager is started.
let mgr = ScriptManager::new(catalog_manager.clone(), query_engine)
.await
.unwrap();
catalog_manager.start().await.unwrap();
let name = "test";
// Write through the table directly so the compiled-script cache stays empty.
mgr.table
.insert(
name,
r#"
@copr(sql='select number from numbers limit 10', args=['number'], returns=['n'])
def test(n):
return n + 1;
"#,
)
.await
.unwrap();
// Nothing has been compiled yet, so the cache must not know the script.
{
let cached = mgr.compiled.read().unwrap();
assert!(cached.get(name).is_none());
}
// Load the script from the table and compile it; this must populate the cache.
let script = mgr.try_find_script_and_compile(name).await.unwrap();
assert!(script.is_some());
{
let cached = mgr.compiled.read().unwrap();
assert!(cached.get(name).is_some());
}
}
}

View File

@@ -77,7 +77,7 @@ impl Script for PyScript {
self
}
async fn evaluate(&self, _ctx: EvalContext) -> Result<Output> {
async fn execute(&self, _ctx: EvalContext) -> Result<Output> {
if let Some(sql) = &self.copr.deco_args.sql {
let stmt = self.query_engine.sql_to_statement(sql)?;
ensure!(
@@ -150,7 +150,7 @@ mod tests {
use super::*;
#[tokio::test]
async fn test_compile_evaluate() {
async fn test_compile_execute() {
let catalog_list = catalog::memory::new_memory_catalog_list().unwrap();
let default_schema = Arc::new(MemorySchemaProvider::new());
@@ -176,7 +176,7 @@ def test(a, b, c):
.compile(script, CompileContext::default())
.await
.unwrap();
let output = script.evaluate(EvalContext::default()).await.unwrap();
let output = script.execute(EvalContext::default()).await.unwrap();
match output {
Output::RecordBatch(stream) => {
let numbers = util::collect(stream).await.unwrap();
@@ -207,7 +207,7 @@ def test(a):
.compile(script, CompileContext::default())
.await
.unwrap();
let output = script.evaluate(EvalContext::default()).await.unwrap();
let output = script.execute(EvalContext::default()).await.unwrap();
match output {
Output::RecordBatch(stream) => {
let numbers = util::collect(stream).await.unwrap();

View File

@@ -48,11 +48,11 @@ pub enum Error {
/// errors in coprocessors' parse check for types and etc.
#[snafu(display("Coprocessor error: {} {}.", reason,
if let Some(loc) = loc{
format!("at {loc}")
}else{
"".into()
}))]
if let Some(loc) = loc{
format!("at {loc}")
}else{
"".into()
}))]
CoprParse {
backtrace: Backtrace,
reason: String,
@@ -89,12 +89,13 @@ impl From<QueryError> for Error {
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::Arrow { .. }
| Error::TypeCast { .. }
| Error::DatabaseQuery { .. }
| Error::PyRuntime { .. }
| Error::RecordBatch { .. }
| Error::Other { .. } => StatusCode::Internal,
Error::Arrow { .. } | Error::PyRuntime { .. } | Error::Other { .. } => {
StatusCode::Internal
}
Error::RecordBatch { source } => source.status_code(),
Error::DatabaseQuery { source } => source.status_code(),
Error::TypeCast { source } => source.status_code(),
Error::PyParse { .. }
| Error::PyCompile { .. }
@@ -187,3 +188,23 @@ pub fn get_error_reason_loc(err: &Error) -> (String, Option<Location>) {
_ => (format!("Unknown error: {:?}", err), None),
}
}
#[cfg(test)]
mod tests {
    use common_error::mock::MockError;
    use snafu::ResultExt;

    use super::*;

    /// Always fails with a query error wrapping a mock `TableColumnNotFound` error.
    fn throw_query_error() -> query::error::Result<()> {
        let cause = MockError::with_backtrace(StatusCode::TableColumnNotFound);
        Err(query::error::Error::new(cause))
    }

    /// A wrapped query error must expose the status code and backtrace of its source.
    #[test]
    fn test_error() {
        let wrapped = throw_query_error()
            .context(DatabaseQuerySnafu)
            .unwrap_err();

        assert_eq!(StatusCode::TableColumnNotFound, wrapped.status_code());
        assert!(wrapped.backtrace_opt().is_some());
    }
}

220
src/script/src/table.rs Normal file
View File

@@ -0,0 +1,220 @@
//! Scripts table
use std::collections::HashMap;
use std::sync::Arc;
use catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, SCRIPTS_TABLE_ID};
use catalog::{CatalogManagerRef, RegisterSystemTableRequest};
use common_recordbatch::util as record_util;
use common_telemetry::logging;
use common_time::timestamp::Timestamp;
use common_time::util;
use datatypes::arrow::array::Utf8Array;
use datatypes::prelude::ConcreteDataType;
use datatypes::prelude::ScalarVector;
use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder};
use datatypes::vectors::{StringVector, TimestampVector, VectorRef};
use query::{Output, QueryEngineRef};
use snafu::{ensure, OptionExt, ResultExt};
use table::requests::{CreateTableRequest, InsertRequest};
use crate::error::{
CastTypeSnafu, CollectRecordsSnafu, FindScriptSnafu, FindScriptsTableSnafu, InsertScriptSnafu,
RegisterScriptsTableSnafu, Result, ScriptNotFoundSnafu, ScriptsTableNotFoundSnafu,
};
/// Name of the table that stores scripts.
pub const SCRIPTS_TABLE_NAME: &str = "scripts";

/// Handle to the scripts table: inserts scripts and looks them up by name.
pub struct ScriptsTable {
    /// Used to register the table and to resolve it on insert.
    catalog_manager: CatalogManagerRef,
    /// Used to run the lookup query in `find_script_by_name`.
    query_engine: QueryEngineRef,
    /// Full table name, built with `catalog::format_full_table_name`.
    name: String,
}
impl ScriptsTable {
pub async fn new(
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
) -> Result<Self> {
let schema = Arc::new(build_scripts_schema());
// TODO(dennis): we put scripts table into default catalog and schema.
// maybe put into system catalog?
let request = CreateTableRequest {
id: SCRIPTS_TABLE_ID,
catalog_name: Some(DEFAULT_CATALOG_NAME.to_string()),
schema_name: Some(DEFAULT_SCHEMA_NAME.to_string()),
table_name: SCRIPTS_TABLE_NAME.to_string(),
desc: Some("Scripts table".to_string()),
schema,
// name and timestamp as primary key
primary_key_indices: vec![0, 3],
create_if_not_exists: true,
table_options: HashMap::default(),
};
catalog_manager
.register_system_table(RegisterSystemTableRequest {
create_table_request: request,
open_hook: None,
})
.await
.context(RegisterScriptsTableSnafu)?;
Ok(Self {
catalog_manager,
query_engine,
name: catalog::format_full_table_name(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
SCRIPTS_TABLE_NAME,
),
})
}
pub async fn insert(&self, name: &str, script: &str) -> Result<()> {
let mut columns_values: HashMap<String, VectorRef> = HashMap::with_capacity(7);
columns_values.insert(
"name".to_string(),
Arc::new(StringVector::from(vec![name])) as _,
);
columns_values.insert(
"script".to_string(),
Arc::new(StringVector::from(vec![script])) as _,
);
// TODO(dennis): we only supports python right now.
columns_values.insert(
"engine".to_string(),
Arc::new(StringVector::from(vec!["python"])) as _,
);
// Timestamp in key part is intentionally left to 0
columns_values.insert(
"timestamp".to_string(),
Arc::new(TimestampVector::from_slice(&[Timestamp::from_millis(0)])) as _,
);
columns_values.insert(
"gmt_created".to_string(),
Arc::new(TimestampVector::from_slice(&[Timestamp::from_millis(
util::current_time_millis(),
)])) as _,
);
columns_values.insert(
"gmt_modified".to_string(),
Arc::new(TimestampVector::from_slice(&[Timestamp::from_millis(
util::current_time_millis(),
)])) as _,
);
let table = self
.catalog_manager
.table(
Some(DEFAULT_CATALOG_NAME),
Some(DEFAULT_SCHEMA_NAME),
SCRIPTS_TABLE_NAME,
)
.context(FindScriptsTableSnafu)?
.context(ScriptsTableNotFoundSnafu)?;
let _ = table
.insert(InsertRequest {
table_name: SCRIPTS_TABLE_NAME.to_string(),
columns_values,
})
.await
.context(InsertScriptSnafu { name })?;
logging::info!("Inserted script: name={} into scripts table.", name);
Ok(())
}
pub async fn find_script_by_name(&self, name: &str) -> Result<String> {
// FIXME(dennis): SQL injection
// TODO(dennis): we use sql to find the script, the better way is use a function
// such as `find_record_by_primary_key` in table_engine.
let sql = format!("select script from {} where name='{}'", self.name(), name);
let plan = self
.query_engine
.sql_to_plan(&sql)
.context(FindScriptSnafu { name })?;
let stream = match self
.query_engine
.execute(&plan)
.await
.context(FindScriptSnafu { name })?
{
Output::RecordBatch(stream) => stream,
_ => unreachable!(),
};
let records = record_util::collect(stream)
.await
.context(CollectRecordsSnafu)?;
ensure!(!records.is_empty(), ScriptNotFoundSnafu { name });
assert_eq!(records.len(), 1);
assert_eq!(records[0].df_recordbatch.num_columns(), 1);
let record = &records[0].df_recordbatch;
let script_column = record
.column(0)
.as_any()
.downcast_ref::<Utf8Array<i32>>()
.context(CastTypeSnafu {
msg: format!(
"can't downcast {:?} array into utf8 array",
record.column(0).data_type()
),
})?;
assert_eq!(script_column.len(), 1);
Ok(script_column.value(0).to_string())
}
#[inline]
pub fn name(&self) -> &str {
&self.name
}
}
/// Builds the schema of the scripts table.
///
/// Columns, in order: `name`, `script`, `engine` (strings) followed by
/// `timestamp`, `gmt_created`, `gmt_modified` (millisecond timestamps). None
/// of them is nullable, and `timestamp` (index 3) is the timestamp index.
fn build_scripts_schema() -> Schema {
    let string_column = |column_name: &str| {
        ColumnSchema::new(
            column_name.to_string(),
            ConcreteDataType::string_datatype(),
            false,
        )
    };
    let timestamp_column = |column_name: &str| {
        ColumnSchema::new(
            column_name.to_string(),
            ConcreteDataType::timestamp_millis_datatype(),
            false,
        )
    };

    let columns = vec![
        string_column("name"),
        string_column("script"),
        string_column("engine"),
        timestamp_column("timestamp"),
        timestamp_column("gmt_created"),
        timestamp_column("gmt_modified"),
    ];

    let builder = SchemaBuilder::from(columns).timestamp_index(3);
    builder.build().unwrap()
}