feat: execute python script in distributed mode (#1264)

* feat: execute python script in distributed mode

* fix: rebase develop
Author: LFC
Date: 2023-04-02 20:36:48 +08:00
Committed by: GitHub
Parent: d2542552d3
Commit: 48c2841e4d
27 changed files with 343 additions and 163 deletions

Cargo.lock (generated), 2 lines changed
View File

@@ -2413,7 +2413,6 @@ dependencies = [
"prost",
"query",
"regex",
"script",
"serde",
"serde_json",
"servers",
@@ -2920,6 +2919,7 @@ dependencies = [
"prost",
"query",
"rustls",
"script",
"serde",
"serde_json",
"servers",

View File

@@ -219,6 +219,12 @@ pub enum Error {
#[snafu(backtrace)]
source: table::error::Error,
},
#[snafu(display("Invalid system table definition: {err_msg}"))]
InvalidSystemTableDef {
err_msg: String,
backtrace: Backtrace,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -231,7 +237,8 @@ impl ErrorExt for Error {
| Error::TableNotFound { .. }
| Error::IllegalManagerState { .. }
| Error::CatalogNotFound { .. }
| Error::InvalidEntryType { .. } => StatusCode::Unexpected,
| Error::InvalidEntryType { .. }
| Error::InvalidSystemTableDef { .. } => StatusCode::Unexpected,
Error::SystemCatalog { .. }
| Error::EmptyValue { .. }
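Side note on the new variant: a minimal, self-contained sketch (assuming the snafu 0.7 derive API this repo uses) of how it gets raised. The generated `InvalidSystemTableDefSnafu` context selector turns a missing value into the error, exactly the pattern `register_system_table` uses further down in this commit; the column list here is a toy stand-in for the real `ColumnSchema`s.

```rust
use snafu::{Backtrace, OptionExt, Snafu};

#[derive(Debug, Snafu)]
pub enum Error {
    #[snafu(display("Invalid system table definition: {err_msg}"))]
    InvalidSystemTableDef { err_msg: String, backtrace: Backtrace },
}

// Find the time-index column name, or fail with InvalidSystemTableDef.
fn time_index(columns: &[(&str, bool)]) -> Result<String, Error> {
    columns
        .iter()
        .find_map(|(name, is_time_index)| is_time_index.then(|| name.to_string()))
        .context(InvalidSystemTableDefSnafu {
            err_msg: "Time index is not defined.",
        })
}
```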

View File

@@ -236,8 +236,9 @@ async fn build_frontend(
plugins: Arc<Plugins>,
datanode_instance: InstanceRef,
) -> Result<FeInstance> {
let mut frontend_instance = FeInstance::new_standalone(datanode_instance.clone());
frontend_instance.set_script_handler(datanode_instance);
let mut frontend_instance = FeInstance::try_new_standalone(datanode_instance.clone())
.await
.context(StartFrontendSnafu)?;
frontend_instance.set_plugins(plugins.clone());
Ok(frontend_instance)
}
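The constructor changes shape here because building the `ScriptExecutor` inside the frontend is both async and fallible. A stripped-down sketch of the `new` to `try_new` pattern, using mock types rather than the real `FeInstance`/`ScriptExecutor`:

```rust
// Mock stand-ins for the real types in this commit.
struct ScriptExecutor;

impl ScriptExecutor {
    // In the real code this registers the scripts system table, which can fail.
    async fn new() -> Result<Self, String> {
        Ok(ScriptExecutor)
    }
}

struct FeInstance {
    script_executor: ScriptExecutor,
}

impl FeInstance {
    // Was `fn new_standalone(..) -> Self`; awaiting fallible setup forces
    // an async constructor that returns Result.
    async fn try_new_standalone() -> Result<Self, String> {
        let script_executor = ScriptExecutor::new().await?;
        Ok(FeInstance { script_executor })
    }
}
```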

View File

@@ -4,10 +4,6 @@ version.workspace = true
edition.workspace = true
license.workspace = true
[features]
default = ["python"]
python = ["dep:script"]
[dependencies]
async-compat = "0.2"
async-stream.workspace = true
@@ -49,7 +45,6 @@ pin-project = "1.0"
prost.workspace = true
query = { path = "../query" }
regex = "1.6"
script = { path = "../script", features = ["python"], optional = true }
serde = "1.0"
serde_json = "1.0"
servers = { path = "../servers" }

View File

@@ -314,12 +314,6 @@ pub enum Error {
source: sql::error::Error,
},
#[snafu(display("Failed to start script manager, source: {}", source))]
StartScriptManager {
#[snafu(backtrace)]
source: script::error::Error,
},
#[snafu(display(
"Failed to parse string to timestamp, string: {}, source: {}",
raw,
@@ -601,7 +595,6 @@ impl ErrorExt for Error {
| WriteObject { .. }
| ListObjects { .. } => StatusCode::StorageUnavailable,
OpenLogStore { source } => source.status_code(),
StartScriptManager { source } => source.status_code(),
OpenStorageEngine { source } => source.status_code(),
RuntimeResource { .. } => StatusCode::RuntimeResourcesExhausted,
MetaClientInit { source, .. } => source.status_code(),

View File

@@ -58,11 +58,9 @@ use crate::error::{
NewCatalogSnafu, OpenLogStoreSnafu, RecoverProcedureSnafu, Result, ShutdownInstanceSnafu,
};
use crate::heartbeat::HeartbeatTask;
use crate::script::ScriptExecutor;
use crate::sql::{SqlHandler, SqlRequest};
mod grpc;
mod script;
pub mod sql;
pub(crate) type DefaultEngine = MitoEngine<EngineImpl<RaftEngineLogStore>>;
@@ -72,7 +70,6 @@ pub struct Instance {
pub(crate) query_engine: QueryEngineRef,
pub(crate) sql_handler: SqlHandler,
pub(crate) catalog_manager: CatalogManagerRef,
pub(crate) script_executor: ScriptExecutor,
pub(crate) table_id_provider: Option<TableIdProviderRef>,
pub(crate) heartbeat_task: Option<HeartbeatTask>,
procedure_manager: Option<ProcedureManagerRef>,
@@ -170,8 +167,6 @@ impl Instance {
let factory = QueryEngineFactory::new(catalog_manager.clone());
let query_engine = factory.query_engine();
let script_executor =
ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?;
let heartbeat_task = match opts.mode {
Mode::Standalone => None,
@@ -205,7 +200,6 @@ impl Instance {
procedure_manager.clone(),
),
catalog_manager,
script_executor,
heartbeat_task,
table_id_provider,
procedure_manager,

View File

@@ -21,7 +21,6 @@ mod heartbeat;
pub mod instance;
pub mod metrics;
mod mock;
mod script;
pub mod server;
pub mod sql;
#[cfg(test)]

View File

@@ -15,6 +15,4 @@
//! datanode metrics
pub const METRIC_HANDLE_SQL_ELAPSED: &str = "datanode.handle_sql_elapsed";
pub const METRIC_HANDLE_SCRIPTS_ELAPSED: &str = "datanode.handle_scripts_elapsed";
pub const METRIC_RUN_SCRIPT_ELAPSED: &str = "datanode.run_script_elapsed";
pub const METRIC_HANDLE_PROMQL_ELAPSED: &str = "datanode.handle_promql_elapsed";

View File

@@ -4,6 +4,10 @@ version.workspace = true
edition.workspace = true
license.workspace = true
[features]
default = ["python"]
python = ["dep:script"]
[dependencies]
api = { path = "../api" }
async-stream.workspace = true
@@ -37,6 +41,7 @@ partition = { path = "../partition" }
prost.workspace = true
query = { path = "../query" }
rustls = "0.20"
script = { path = "../script", features = ["python"], optional = true }
serde = "1.0"
serde_json = "1.0"
servers = { path = "../servers" }
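With `python = ["dep:script"]`, the `script` crate only exists in the build when the feature is enabled, so Rust call sites must be `cfg`-gated. An illustrative sketch (the actual gating in the crate may differ):

```rust
// Compiled only when built with `--features python`, which pulls in `dep:script`.
#[cfg(feature = "python")]
pub fn scripting_enabled() -> bool {
    true
}

// Fallback when the `python` feature (and thus the `script` crate) is absent.
#[cfg(not(feature = "python"))]
pub fn scripting_enabled() -> bool {
    false
}
```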

View File

@@ -16,8 +16,12 @@ use std::any::Any;
use std::collections::HashSet;
use std::sync::Arc;
use api::v1::CreateTableExpr;
use async_trait::async_trait;
use catalog::error::{self as catalog_err, InvalidCatalogValueSnafu, Result as CatalogResult};
use catalog::error::{
self as catalog_err, InternalSnafu, InvalidCatalogValueSnafu, InvalidSystemTableDefSnafu,
Result as CatalogResult, UnimplementedSnafu,
};
use catalog::helper::{
build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, SchemaKey,
TableGlobalKey, TableGlobalValue,
@@ -28,6 +32,7 @@ use catalog::{
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest,
SchemaProvider, SchemaProviderRef,
};
use common_error::prelude::BoxedError;
use common_telemetry::error;
use futures::StreamExt;
use meta_client::rpc::TableName;
@@ -36,6 +41,8 @@ use snafu::prelude::*;
use table::TableRef;
use crate::datanode::DatanodeClients;
use crate::expr_factory;
use crate::instance::distributed::DistInstance;
use crate::table::DistTable;
#[derive(Clone)]
@@ -43,6 +50,12 @@ pub struct FrontendCatalogManager {
backend: KvBackendRef,
partition_manager: PartitionRuleManagerRef,
datanode_clients: Arc<DatanodeClients>,
// TODO(LFC): Remove this field.
// DistInstance in FrontendCatalogManager is only used for creating distributed script table now.
// Once we have some standalone distributed table creator (like create distributed table procedure),
// we should use that.
dist_instance: Option<Arc<DistInstance>>,
}
impl FrontendCatalogManager {
@@ -55,9 +68,14 @@ impl FrontendCatalogManager {
backend,
partition_manager,
datanode_clients,
dist_instance: None,
}
}
pub(crate) fn set_dist_instance(&mut self, dist_instance: Arc<DistInstance>) {
self.dist_instance = Some(dist_instance)
}
pub(crate) fn backend(&self) -> KvBackendRef {
self.backend.clone()
}
@@ -106,9 +124,93 @@ impl CatalogManager for FrontendCatalogManager {
async fn register_system_table(
&self,
_request: RegisterSystemTableRequest,
request: RegisterSystemTableRequest,
) -> catalog::error::Result<()> {
unimplemented!()
if let Some(dist_instance) = &self.dist_instance {
let open_hook = request.open_hook;
let request = request.create_table_request;
if let Some(table) = self
.table(
&request.catalog_name,
&request.schema_name,
&request.table_name,
)
.await?
{
if let Some(hook) = open_hook {
(hook)(table)?;
}
return Ok(());
}
let time_index = request
.schema
.column_schemas
.iter()
.find_map(|x| {
if x.is_time_index() {
Some(x.name.clone())
} else {
None
}
})
.context(InvalidSystemTableDefSnafu {
err_msg: "Time index is not defined.",
})?;
let primary_keys = request
.schema
.column_schemas
.iter()
.enumerate()
.filter_map(|(i, x)| {
if request.primary_key_indices.contains(&i) {
Some(x.name.clone())
} else {
None
}
})
.collect();
let column_defs = expr_factory::column_schemas_to_defs(request.schema.column_schemas)
.map_err(|e| {
InvalidSystemTableDefSnafu {
err_msg: e.to_string(),
}
.build()
})?;
let mut create_table = CreateTableExpr {
catalog_name: request.catalog_name,
schema_name: request.schema_name,
table_name: request.table_name,
desc: request.desc.unwrap_or("".to_string()),
column_defs,
time_index,
primary_keys,
create_if_not_exists: request.create_if_not_exists,
table_options: (&request.table_options).into(),
table_id: None, // Should and will be assigned by Meta.
region_ids: vec![0],
};
let table = dist_instance
.create_table(&mut create_table, None)
.await
.map_err(BoxedError::new)
.context(InternalSnafu)?;
if let Some(hook) = open_hook {
(hook)(table)?;
}
Ok(())
} else {
UnimplementedSnafu {
operation: "register system table",
}
.fail()
}
}
fn schema(
@@ -330,3 +432,70 @@ impl SchemaProvider for FrontendSchemaProvider {
Ok(self.table_names()?.contains(&name.to_string()))
}
}
#[cfg(test)]
mod tests {
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use script::table::{build_scripts_schema, SCRIPTS_TABLE_NAME};
use table::requests::{CreateTableRequest, TableOptions};
use super::*;
#[tokio::test(flavor = "multi_thread")]
async fn test_register_system_table() {
let instance =
crate::tests::create_distributed_instance("test_register_system_table").await;
let catalog_name = DEFAULT_CATALOG_NAME;
let schema_name = DEFAULT_SCHEMA_NAME;
let table_name = SCRIPTS_TABLE_NAME;
let request = CreateTableRequest {
id: 1,
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
desc: Some("Scripts table".to_string()),
schema: build_scripts_schema(),
region_numbers: vec![0],
primary_key_indices: vec![0, 1],
create_if_not_exists: true,
table_options: TableOptions::default(),
};
let result = instance
.catalog_manager
.register_system_table(RegisterSystemTableRequest {
create_table_request: request,
open_hook: None,
})
.await;
assert!(result.is_ok());
assert!(
instance
.catalog_manager
.table(catalog_name, schema_name, table_name)
.await
.unwrap()
.is_some(),
"the registered system table cannot be found in catalog"
);
let mut actually_created_table_in_datanode = 0;
for datanode in instance.datanodes.values() {
if datanode
.catalog_manager()
.table(catalog_name, schema_name, table_name)
.await
.unwrap()
.is_some()
{
actually_created_table_in_datanode += 1;
}
}
assert_eq!(
actually_created_table_in_datanode, 1,
"system table should be actually created at one and only one datanode"
)
}
}
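The `enumerate` + `filter_map` extraction of primary-key names in `register_system_table` above, reduced to a runnable toy (hypothetical helper, illustrative column names):

```rust
// Collect the names of columns whose position is listed in primary_key_indices.
fn primary_key_names(columns: &[&str], primary_key_indices: &[usize]) -> Vec<String> {
    columns
        .iter()
        .enumerate()
        .filter_map(|(i, name)| {
            primary_key_indices.contains(&i).then(|| name.to_string())
        })
        .collect()
}

fn main() {
    // Mirrors the test above: columns 0 and 1 ("schema", "name") are the keys.
    let columns = ["schema", "name", "script", "engine", "timestamp"];
    assert_eq!(primary_key_names(&columns, &[0, 1]), vec!["schema", "name"]);
}
```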

View File

@@ -378,6 +378,12 @@ pub enum Error {
#[snafu(backtrace)]
source: table::error::Error,
},
#[snafu(display("Failed to start script manager, source: {}", source))]
StartScriptManager {
#[snafu(backtrace)]
source: script::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -462,6 +468,8 @@ impl ErrorExt for Error {
source.status_code()
}
Error::UnrecognizedTableOption { .. } => StatusCode::InvalidArguments,
Error::StartScriptManager { source } => source.status_code(),
}
}

View File

@@ -187,7 +187,12 @@ fn columns_to_expr(
.iter()
.map(|c| column_def_to_schema(c, c.name.to_string() == time_index).context(ParseSqlSnafu))
.collect::<Result<Vec<ColumnSchema>>>()?;
column_schemas_to_defs(column_schemas)
}
pub(crate) fn column_schemas_to_defs(
column_schemas: Vec<ColumnSchema>,
) -> Result<Vec<api::v1::ColumnDef>> {
let column_datatypes = column_schemas
.iter()
.map(|c| {

View File

@@ -17,6 +17,7 @@ mod grpc;
mod influxdb;
mod opentsdb;
mod prometheus;
mod script;
mod standalone;
use std::collections::HashMap;
@@ -40,7 +41,6 @@ use common_telemetry::timer;
use datafusion::sql::sqlparser::ast::ObjectName;
use datanode::instance::sql::table_idents_to_full_name;
use datanode::instance::InstanceRef as DnInstanceRef;
use datanode::metrics;
use datatypes::schema::Schema;
use distributed::DistInstance;
use meta_client::client::{MetaClient, MetaClientBuilder};
@@ -58,7 +58,6 @@ use servers::query_handler::grpc::{GrpcQueryHandler, GrpcQueryHandlerRef};
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::{
InfluxdbLineProtocolHandler, OpentsdbProtocolHandler, PrometheusProtocolHandler, ScriptHandler,
ScriptHandlerRef,
};
use session::context::QueryContextRef;
use snafu::prelude::*;
@@ -80,6 +79,8 @@ use crate::error::{
use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory};
use crate::frontend::FrontendOptions;
use crate::instance::standalone::StandaloneGrpcQueryHandler;
use crate::metric;
use crate::script::ScriptExecutor;
use crate::server::{start_server, ServerHandlers, Services};
#[async_trait]
@@ -103,9 +104,7 @@ pub type FrontendInstanceRef = Arc<dyn FrontendInstance>;
#[derive(Clone)]
pub struct Instance {
catalog_manager: CatalogManagerRef,
/// Script handler is None in distributed mode, only works on standalone mode.
script_handler: Option<ScriptHandlerRef>,
script_executor: Arc<ScriptExecutor>,
statement_handler: StatementHandlerRef,
query_engine: QueryEngineRef,
grpc_query_handler: GrpcQueryHandlerRef<Error>,
@@ -134,23 +133,29 @@ impl Instance {
let partition_manager = Arc::new(PartitionRuleManager::new(table_routes));
let datanode_clients = Arc::new(DatanodeClients::default());
let catalog_manager = Arc::new(FrontendCatalogManager::new(
meta_backend,
partition_manager,
datanode_clients.clone(),
));
let mut catalog_manager =
FrontendCatalogManager::new(meta_backend, partition_manager, datanode_clients.clone());
let dist_instance =
DistInstance::new(meta_client, catalog_manager.clone(), datanode_clients);
let dist_instance = DistInstance::new(
meta_client,
Arc::new(catalog_manager.clone()),
datanode_clients,
);
let dist_instance = Arc::new(dist_instance);
catalog_manager.set_dist_instance(dist_instance.clone());
let catalog_manager = Arc::new(catalog_manager);
let query_engine =
QueryEngineFactory::new_with_plugins(catalog_manager.clone(), plugins.clone())
.query_engine();
let script_executor =
Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?);
Ok(Instance {
catalog_manager,
script_handler: None,
script_executor,
create_expr_factory: Arc::new(DefaultCreateExprFactory),
statement_handler: dist_instance.clone(),
query_engine,
@@ -191,18 +196,22 @@ impl Instance {
Ok(Arc::new(meta_client))
}
pub fn new_standalone(dn_instance: DnInstanceRef) -> Self {
Instance {
catalog_manager: dn_instance.catalog_manager().clone(),
script_handler: None,
pub async fn try_new_standalone(dn_instance: DnInstanceRef) -> Result<Self> {
let catalog_manager = dn_instance.catalog_manager();
let query_engine = dn_instance.query_engine();
let script_executor =
Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?);
Ok(Instance {
catalog_manager: catalog_manager.clone(),
script_executor,
create_expr_factory: Arc::new(DefaultCreateExprFactory),
statement_handler: dn_instance.clone(),
query_engine: dn_instance.query_engine(),
query_engine,
grpc_query_handler: StandaloneGrpcQueryHandler::arc(dn_instance.clone()),
promql_handler: Some(dn_instance.clone()),
plugins: Default::default(),
servers: Arc::new(HashMap::new()),
}
})
}
pub async fn build_servers(
@@ -217,12 +226,19 @@ impl Instance {
}
#[cfg(test)]
pub(crate) fn new_distributed(dist_instance: Arc<DistInstance>) -> Self {
let catalog_manager = dist_instance.catalog_manager();
pub(crate) async fn new_distributed(
catalog_manager: CatalogManagerRef,
dist_instance: Arc<DistInstance>,
) -> Self {
let query_engine = QueryEngineFactory::new(catalog_manager.clone()).query_engine();
let script_executor = Arc::new(
ScriptExecutor::new(catalog_manager.clone(), query_engine.clone())
.await
.unwrap(),
);
Instance {
catalog_manager,
script_handler: None,
script_executor,
statement_handler: dist_instance.clone(),
query_engine,
create_expr_factory: Arc::new(DefaultCreateExprFactory),
@@ -237,14 +253,6 @@ impl Instance {
&self.catalog_manager
}
pub fn set_script_handler(&mut self, handler: ScriptHandlerRef) {
debug_assert!(
self.script_handler.is_none(),
"Script handler can be set only once!"
);
self.script_handler = Some(handler);
}
/// Handle batch inserts
pub async fn handle_inserts(
&self,
@@ -532,7 +540,7 @@ impl SqlQueryHandler for Instance {
type Error = Error;
async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
let _timer = timer!(metrics::METRIC_HANDLE_SQL_ELAPSED);
let _timer = timer!(metric::METRIC_HANDLE_SQL_ELAPSED);
let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {
@@ -620,41 +628,6 @@ impl SqlQueryHandler for Instance {
}
}
#[async_trait]
impl ScriptHandler for Instance {
async fn insert_script(
&self,
schema: &str,
name: &str,
script: &str,
) -> server_error::Result<()> {
if let Some(handler) = &self.script_handler {
handler.insert_script(schema, name, script).await
} else {
server_error::NotSupportedSnafu {
feat: "Script execution in Frontend",
}
.fail()
}
}
async fn execute_script(
&self,
schema: &str,
script: &str,
params: HashMap<String, String>,
) -> server_error::Result<Output> {
if let Some(handler) = &self.script_handler {
handler.execute_script(schema, script, params).await
} else {
server_error::NotSupportedSnafu {
feat: "Script execution in Frontend",
}
.fail()
}
}
}
#[async_trait]
impl PromHandler for Instance {
async fn do_query(&self, query: &PromQuery) -> server_error::Result<Output> {

View File

@@ -56,6 +56,7 @@ use sql::statements::statement::Statement;
use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
use table::requests::TableOptions;
use table::table::AlterContext;
use table::TableRef;
use crate::catalog::FrontendCatalogManager;
use crate::datanode::DatanodeClients;
@@ -93,14 +94,14 @@ impl DistInstance {
&self,
create_table: &mut CreateTableExpr,
partitions: Option<Partitions>,
) -> Result<Output> {
) -> Result<TableRef> {
let table_name = TableName::new(
&create_table.catalog_name,
&create_table.schema_name,
&create_table.table_name,
);
if self
if let Some(table) = self
.catalog_manager
.table(
&table_name.catalog_name,
@@ -109,10 +110,9 @@ impl DistInstance {
)
.await
.context(CatalogSnafu)?
.is_some()
{
return if create_table.create_if_not_exists {
Ok(Output::AffectedRows(0))
Ok(table)
} else {
TableAlreadyExistSnafu {
table: table_name.to_string(),
@@ -153,20 +153,20 @@ impl DistInstance {
create_table.table_id = Some(TableId { id: table_id });
let table = DistTable::new(
let table = Arc::new(DistTable::new(
table_name.clone(),
table_info,
self.catalog_manager.partition_manager(),
self.catalog_manager.datanode_clients(),
self.catalog_manager.backend(),
);
));
let request = RegisterTableRequest {
catalog: table_name.catalog_name.clone(),
schema: table_name.schema_name.clone(),
table_name: table_name.table_name.clone(),
table_id,
table: Arc::new(table),
table: table.clone(),
};
ensure!(
self.catalog_manager
@@ -196,9 +196,7 @@ impl DistInstance {
.await
.context(RequestDatanodeSnafu)?;
}
// Checked in real MySQL, it truly returns "0 rows affected".
Ok(Output::AffectedRows(0))
Ok(table)
}
async fn drop_table(&self, table_name: TableName) -> Result<Output> {
@@ -329,7 +327,8 @@ impl DistInstance {
}
Statement::CreateTable(stmt) => {
let create_expr = &mut expr_factory::create_to_expr(&stmt, query_ctx)?;
Ok(self.create_table(create_expr, stmt.partitions).await?)
let _ = self.create_table(create_expr, stmt.partitions).await?;
Ok(Output::AffectedRows(0))
}
Statement::Alter(alter_table) => {
let expr = grpc::to_alter_expr(alter_table, query_ctx)?;
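In short, `create_table` now yields the `TableRef` itself and leaves the `Output::AffectedRows(0)` conversion to the SQL layer. A self-contained toy (hypothetical `Table` type) of the resulting contract:

```rust
#[derive(Clone, Debug, PartialEq)]
struct Table; // stand-in for TableRef

fn create_or_get(
    existing: Option<Table>,
    create_if_not_exists: bool,
) -> Result<Table, String> {
    match existing {
        // Idempotent path: hand back the existing table instead of an
        // affected-rows count, so callers (e.g. register_system_table)
        // can run their open hooks on it.
        Some(table) if create_if_not_exists => Ok(table),
        Some(_) => Err("table already exists".to_string()),
        // The real code allocates a table id from Meta and creates regions here.
        None => Ok(Table),
    }
}
```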

View File

@@ -49,7 +49,8 @@ impl GrpcQueryHandler for DistInstance {
DdlExpr::CreateTable(mut expr) => {
// TODO(LFC): Support creating distributed table through GRPC interface.
// Currently only SQL supports it; how to design the fields in CreateTableExpr?
self.create_table(&mut expr, None).await
let _ = self.create_table(&mut expr, None).await?;
Ok(Output::AffectedRows(0))
}
DdlExpr::Alter(expr) => self.handle_alter_table(expr).await,
DdlExpr::DropTable(expr) => {

View File

@@ -383,8 +383,6 @@ CREATE TABLE {table_name} (
// Wait for previous task finished
flush_table(frontend, "greptime", "public", table_name, None).await;
let table_id = 1024;
let table = instance
.frontend
.catalog_manager()
@@ -394,7 +392,7 @@ CREATE TABLE {table_name} (
.unwrap();
let table = table.as_any().downcast_ref::<DistTable>().unwrap();
let TableGlobalValue { regions_id_map, .. } = table
let tgv = table
.table_global_value(&TableGlobalKey {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
@@ -403,7 +401,10 @@ CREATE TABLE {table_name} (
.await
.unwrap()
.unwrap();
let region_to_dn_map = regions_id_map
let table_id = tgv.table_id();
let region_to_dn_map = tgv
.regions_id_map
.iter()
.map(|(k, v)| (v[0], *k))
.collect::<HashMap<u32, u64>>();

View File

@@ -20,7 +20,7 @@ use common_telemetry::timer;
use servers::query_handler::ScriptHandler;
use crate::instance::Instance;
use crate::metrics;
use crate::metric;
#[async_trait]
impl ScriptHandler for Instance {
@@ -30,7 +30,7 @@ impl ScriptHandler for Instance {
name: &str,
script: &str,
) -> servers::error::Result<()> {
let _timer = timer!(metrics::METRIC_HANDLE_SCRIPTS_ELAPSED);
let _timer = timer!(metric::METRIC_HANDLE_SCRIPTS_ELAPSED);
self.script_executor
.insert_script(schema, name, script)
.await
@@ -42,7 +42,7 @@ impl ScriptHandler for Instance {
name: &str,
params: HashMap<String, String>,
) -> servers::error::Result<Output> {
let _timer = timer!(metrics::METRIC_RUN_SCRIPT_ELAPSED);
let _timer = timer!(metric::METRIC_RUN_SCRIPT_ELAPSED);
self.script_executor
.execute_script(schema, name, params)
.await

View File

@@ -13,6 +13,7 @@
// limitations under the License.
#![feature(assert_matches)]
#![feature(trait_upcasting)]
pub mod catalog;
pub mod datanode;
@@ -22,11 +23,13 @@ pub mod frontend;
pub mod grpc;
pub mod influxdb;
pub mod instance;
pub(crate) mod metric;
pub mod mysql;
pub mod opentsdb;
pub mod postgres;
pub mod prom;
pub mod prometheus;
mod script;
mod server;
mod table;
#[cfg(test)]

View File

@@ -0,0 +1,17 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) const METRIC_HANDLE_SQL_ELAPSED: &str = "frontend.handle_sql_elapsed";
pub(crate) const METRIC_HANDLE_SCRIPTS_ELAPSED: &str = "frontend.handle_scripts_elapsed";
pub(crate) const METRIC_RUN_SCRIPT_ELAPSED: &str = "frontend.run_script_elapsed";
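These constants feed the `timer!` macro from `common_telemetry`, as the hunks above show. A sketch of the usage, assuming the macro's guard reports elapsed time under the metric name when it drops:

```rust
use common_telemetry::timer;

async fn handle_sql() {
    // The guard records elapsed wall time under "frontend.handle_sql_elapsed"
    // when it goes out of scope at the end of this function.
    let _timer = timer!(crate::metric::METRIC_HANDLE_SQL_ELAPSED);
    // ... execute the statement ...
}
```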

View File

@@ -59,6 +59,7 @@ pub(crate) struct MockDistributedInstance {
pub(crate) frontend: Arc<Instance>,
pub(crate) dist_instance: Arc<DistInstance>,
pub(crate) datanodes: HashMap<u64, Arc<DatanodeInstance>>,
pub(crate) catalog_manager: Arc<FrontendCatalogManager>,
_guards: Vec<TestGuard>,
}
@@ -81,11 +82,11 @@ impl MockStandaloneInstance {
pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandaloneInstance {
let (opts, guard) = create_tmp_dir_and_datanode_opts(test_name);
let datanode_instance = DatanodeInstance::new(&opts).await.unwrap();
datanode_instance.start().await.unwrap();
let frontend_instance = Instance::new_standalone(Arc::new(datanode_instance));
let dn_instance = Arc::new(DatanodeInstance::new(&opts).await.unwrap());
let frontend_instance = Instance::try_new_standalone(dn_instance.clone())
.await
.unwrap();
dn_instance.start().await.unwrap();
MockStandaloneInstance {
instance: Arc::new(frontend_instance),
_guard: guard,
@@ -270,26 +271,28 @@ pub(crate) async fn create_distributed_instance(test_name: &str) -> MockDistribu
let partition_manager = Arc::new(PartitionRuleManager::new(Arc::new(TableRoutes::new(
meta_client.clone(),
))));
let catalog_manager = Arc::new(FrontendCatalogManager::new(
meta_backend,
partition_manager,
datanode_clients.clone(),
));
let mut catalog_manager =
FrontendCatalogManager::new(meta_backend, partition_manager, datanode_clients.clone());
wait_datanodes_alive(kv_store).await;
let dist_instance = DistInstance::new(
meta_client.clone(),
catalog_manager,
Arc::new(catalog_manager.clone()),
datanode_clients.clone(),
);
let dist_instance = Arc::new(dist_instance);
let frontend = Instance::new_distributed(dist_instance.clone());
catalog_manager.set_dist_instance(dist_instance.clone());
let catalog_manager = Arc::new(catalog_manager);
let frontend = Instance::new_distributed(catalog_manager.clone(), dist_instance.clone()).await;
MockDistributedInstance {
frontend: Arc::new(frontend),
dist_instance,
datanodes: datanode_instances,
catalog_manager,
_guards: test_guards,
}
}

View File

@@ -18,4 +18,4 @@ pub mod error;
pub mod manager;
#[cfg(feature = "python")]
pub mod python;
mod table;
pub mod table;

View File

@@ -205,7 +205,7 @@ impl ScriptsTable {
}
/// Build scripts table
fn build_scripts_schema() -> RawSchema {
pub fn build_scripts_schema() -> RawSchema {
let cols = vec![
ColumnSchema::new(
"schema".to_string(),

View File

@@ -27,7 +27,7 @@ use datanode::datanode::{
DatanodeOptions, FileConfig, ObjectStoreConfig, OssConfig, S3Config, StorageConfig, WalConfig,
};
use datanode::error::{CreateTableSnafu, Result};
use datanode::instance::{Instance, InstanceRef};
use datanode::instance::Instance;
use datanode::sql::SqlHandler;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
@@ -255,16 +255,9 @@ pub async fn create_test_table(
Ok(())
}
fn build_frontend_instance(datanode_instance: InstanceRef) -> FeInstance {
let mut frontend_instance = FeInstance::new_standalone(datanode_instance.clone());
frontend_instance.set_script_handler(datanode_instance);
frontend_instance
}
pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router, TestGuard) {
let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name);
let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap());
instance.start().await.unwrap();
create_test_table(
instance.catalog_manager(),
instance.sql_handler(),
@@ -272,9 +265,13 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router
)
.await
.unwrap();
let frontend_instance = FeInstance::try_new_standalone(instance.clone())
.await
.unwrap();
instance.start().await.unwrap();
let http_server = HttpServerBuilder::new(HttpOptions::default())
.with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(Arc::new(
build_frontend_instance(instance.clone()),
frontend_instance,
)))
.with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(instance.clone()))
.with_metrics_handler(MetricsHandler)
@@ -288,7 +285,9 @@ pub async fn setup_test_http_app_with_frontend(
) -> (Router, TestGuard) {
let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name);
let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap());
let frontend = build_frontend_instance(instance.clone());
let frontend = FeInstance::try_new_standalone(instance.clone())
.await
.unwrap();
instance.start().await.unwrap();
create_test_table(
frontend.catalog_manager(),
@@ -300,8 +299,8 @@ pub async fn setup_test_http_app_with_frontend(
let frontend_ref = Arc::new(frontend);
let http_server = HttpServerBuilder::new(HttpOptions::default())
.with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(frontend_ref.clone()))
.with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(frontend_ref))
.with_script_handler(instance.clone())
.with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(frontend_ref.clone()))
.with_script_handler(frontend_ref)
.build();
let app = http_server.make_app();
(app, guard)
@@ -313,7 +312,9 @@ pub async fn setup_test_prom_app_with_frontend(
) -> (Router, TestGuard) {
let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name);
let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap());
let frontend = build_frontend_instance(instance.clone());
let frontend = FeInstance::try_new_standalone(instance.clone())
.await
.unwrap();
instance.start().await.unwrap();
create_test_table(
frontend.catalog_manager(),
@@ -335,7 +336,6 @@ pub async fn setup_grpc_server(
let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name);
let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap());
instance.start().await.unwrap();
let runtime = Arc::new(
RuntimeBuilder::default()
@@ -347,7 +347,10 @@ pub async fn setup_grpc_server(
let fe_grpc_addr = format!("127.0.0.1:{}", get_port());
let fe_instance = frontend::instance::Instance::new_standalone(instance.clone());
let fe_instance = FeInstance::try_new_standalone(instance.clone())
.await
.unwrap();
instance.start().await.unwrap();
let fe_instance_ref = Arc::new(fe_instance);
let fe_grpc_server = Arc::new(GrpcServer::new(
ServerGrpcQueryHandlerAdaptor::arc(fe_instance_ref),

View File

@@ -327,7 +327,7 @@ pub async fn test_metrics_api(store_type: StorageType) {
let res = client.get("/metrics").send().await;
assert_eq!(res.status(), StatusCode::OK);
let body = res.text().await;
assert!(body.contains("datanode_handle_sql_elapsed"));
assert!(body.contains("frontend_handle_sql_elapsed"));
guard.remove_all().await;
}

View File

@@ -46,10 +46,11 @@ SHOW TABLES FROM test_public_schema;
SHOW TABLES FROM public;
+--------+
| Tables |
+--------+
+--------+
+---------+
| Tables |
+---------+
| scripts |
+---------+
INSERT INTO hello VALUES (2), (3), (4);
@@ -90,10 +91,11 @@ SHOW TABLES FROM test_public_schema;
SHOW TABLES FROM public;
+--------+
| Tables |
+--------+
+--------+
+---------+
| Tables |
+---------+
| scripts |
+---------+
DROP SCHEMA test_public_schema;

View File

@@ -105,19 +105,23 @@ impl Env {
// start a distributed GreptimeDB
let mut meta_server = Env::start_server("metasrv", &db_ctx, true);
// wait for election
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let mut frontend = Env::start_server("frontend", &db_ctx, true);
let mut datanode = Env::start_server("datanode", &db_ctx, true);
if !util::check_port(METASRV_ADDR.parse().unwrap(), Duration::from_secs(10)).await {
Env::stop_server(&mut meta_server).await;
panic!("Metasrv doesn't up in 10 seconds, quit.")
}
for addr in [DATANODE_ADDR, METASRV_ADDR, SERVER_ADDR].iter() {
let is_up = util::check_port(addr.parse().unwrap(), Duration::from_secs(10)).await;
if !is_up {
Env::stop_server(&mut meta_server).await;
Env::stop_server(&mut frontend).await;
Env::stop_server(&mut datanode).await;
panic!("Server {addr} doesn't up in 10 seconds, quit.")
}
let mut datanode = Env::start_server("datanode", &db_ctx, true);
// Wait for default catalog and schema being created.
// Can be removed once #1265 resolved, and merged with Frontend start checking below.
if !util::check_port(DATANODE_ADDR.parse().unwrap(), Duration::from_secs(10)).await {
Env::stop_server(&mut datanode).await;
panic!("Datanode doesn't up in 10 seconds, quit.")
}
let mut frontend = Env::start_server("frontend", &db_ctx, true);
if !util::check_port(SERVER_ADDR.parse().unwrap(), Duration::from_secs(10)).await {
Env::stop_server(&mut frontend).await;
panic!("Frontend doesn't up in 10 seconds, quit.")
}
let client = Client::with_urls(vec![SERVER_ADDR]);