refactor: directly invoke Datanode methods in standalone mode (part 1) (#694)

* refactor: directly invoke Datanode methods in standalone mode

* test: add more unit tests

* fix: get rid of `println` in testing codes

* fix: resolve PR comments

* fix: resolve PR comments

Co-authored-by: luofucong <luofucong@greptime.com>
This commit is contained in:
LFC
2022-12-07 11:37:59 +08:00
committed by GitHub
parent 90c832b33d
commit 833216d317
24 changed files with 738 additions and 676 deletions

1
Cargo.lock generated
View File

@@ -1984,7 +1984,6 @@ dependencies = [
"datafusion",
"datafusion-common 7.0.0",
"datatypes",
"frontend",
"futures",
"hyper",
"log-store",

View File

@@ -25,12 +25,6 @@ pub enum Error {
source: datanode::error::Error,
},
#[snafu(display("Failed to build frontend, source: {}", source))]
BuildFrontend {
#[snafu(backtrace)]
source: frontend::error::Error,
},
#[snafu(display("Failed to start frontend, source: {}", source))]
StartFrontend {
#[snafu(backtrace)]
@@ -75,7 +69,6 @@ impl ErrorExt for Error {
StatusCode::InvalidArguments
}
Error::IllegalConfig { .. } => StatusCode::InvalidArguments,
Error::BuildFrontend { source, .. } => source.status_code(),
}
}

View File

@@ -78,7 +78,7 @@ impl StartCommand {
let opts: FrontendOptions = self.try_into()?;
let mut frontend = Frontend::new(
opts.clone(),
Instance::try_new(&opts)
Instance::try_new_distributed(&opts)
.await
.context(error::StartFrontendSnafu)?,
);
@@ -213,7 +213,6 @@ mod tests {
let fe_opts = FrontendOptions::try_from(command).unwrap();
assert_eq!(Mode::Distributed, fe_opts.mode);
assert_eq!("127.0.0.1:3001".to_string(), fe_opts.datanode_rpc_addr);
assert_eq!(
"127.0.0.1:4000".to_string(),
fe_opts.http_options.as_ref().unwrap().addr

View File

@@ -28,11 +28,8 @@ use serde::{Deserialize, Serialize};
use servers::http::HttpOptions;
use servers::Mode;
use snafu::ResultExt;
use tokio::try_join;
use crate::error::{
BuildFrontendSnafu, Error, IllegalConfigSnafu, Result, StartDatanodeSnafu, StartFrontendSnafu,
};
use crate::error::{Error, IllegalConfigSnafu, Result, StartDatanodeSnafu, StartFrontendSnafu};
use crate::toml_loader;
#[derive(Parser)]
@@ -104,7 +101,6 @@ impl StandaloneOptions {
influxdb_options: self.influxdb_options,
prometheus_options: self.prometheus_options,
mode: self.mode,
datanode_rpc_addr: "127.0.0.1:3001".to_string(),
meta_client_opts: None,
}
}
@@ -162,7 +158,7 @@ impl StartCommand {
let mut datanode = Datanode::new(dn_opts.clone())
.await
.context(StartDatanodeSnafu)?;
let mut frontend = build_frontend(fe_opts, &dn_opts, datanode.get_instance()).await?;
let mut frontend = build_frontend(fe_opts, datanode.get_instance()).await?;
// Start datanode instance before starting services, to avoid requests come in before internal components are started.
datanode
@@ -171,11 +167,7 @@ impl StartCommand {
.context(StartDatanodeSnafu)?;
info!("Datanode instance started");
try_join!(
async { datanode.start_services().await.context(StartDatanodeSnafu) },
async { frontend.start().await.context(StartFrontendSnafu) }
)?;
frontend.start().await.context(StartFrontendSnafu)?;
Ok(())
}
}
@@ -183,17 +175,9 @@ impl StartCommand {
/// Build frontend instance in standalone mode
async fn build_frontend(
fe_opts: FrontendOptions,
dn_opts: &DatanodeOptions,
datanode_instance: InstanceRef,
) -> Result<Frontend<FeInstance>> {
let grpc_server_addr = &dn_opts.rpc_addr;
info!(
"Build frontend with datanode gRPC addr: {}",
grpc_server_addr
);
let mut frontend_instance = FeInstance::try_new(&fe_opts)
.await
.context(BuildFrontendSnafu)?;
let mut frontend_instance = FeInstance::new_standalone(datanode_instance.clone());
frontend_instance.set_catalog_manager(datanode_instance.catalog_manager().clone());
frontend_instance.set_script_handler(datanode_instance);
Ok(Frontend::new(fe_opts, frontend_instance))
@@ -289,7 +273,6 @@ mod tests {
let fe_opts = FrontendOptions::try_from(cmd).unwrap();
assert_eq!(Mode::Standalone, fe_opts.mode);
assert_eq!("127.0.0.1:3001".to_string(), fe_opts.datanode_rpc_addr);
assert_eq!(
"127.0.0.1:4000".to_string(),
fe_opts.http_options.as_ref().unwrap().addr

View File

@@ -23,7 +23,7 @@ use common_base::BitVec;
use common_error::prelude::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::Output;
use common_recordbatch::{util, RecordBatches, SendableRecordBatchStream};
use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
use datatypes::arrow::array::{Array, BooleanArray, PrimitiveArray};
use datatypes::arrow_array::{BinaryArray, StringArray};
use datatypes::schema::SchemaRef;
@@ -47,13 +47,9 @@ pub async fn to_object_result(output: std::result::Result<Output, impl ErrorExt>
}
}
async fn collect(stream: SendableRecordBatchStream) -> Result<ObjectResult> {
let schema = stream.schema();
let recordbatches = util::collect(stream)
let recordbatches = RecordBatches::try_collect(stream)
.await
.and_then(|batches| RecordBatches::try_new(schema, batches))
.context(error::CollectRecordBatchesSnafu)?;
let object_result = build_result(recordbatches)?;
Ok(object_result)
}

View File

@@ -27,7 +27,7 @@ use datatypes::prelude::VectorRef;
use datatypes::schema::{Schema, SchemaRef};
use error::Result;
use futures::task::{Context, Poll};
use futures::Stream;
use futures::{Stream, TryStreamExt};
pub use recordbatch::RecordBatch;
use snafu::ensure;
@@ -80,6 +80,12 @@ impl RecordBatches {
Ok(Self { schema, batches })
}
pub async fn try_collect(stream: SendableRecordBatchStream) -> Result<Self> {
let schema = stream.schema();
let batches = stream.try_collect::<Vec<_>>().await?;
Ok(Self { schema, batches })
}
#[inline]
pub fn empty() -> Self {
Self {

View File

@@ -29,7 +29,6 @@ datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch =
"simd",
] }
datatypes = { path = "../datatypes" }
frontend = { path = "../frontend" }
futures = "0.3"
hyper = { version = "0.14", features = ["full"] }
log-store = { path = "../log-store" }

View File

@@ -26,7 +26,7 @@ datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch =
] }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
datanode = { path = "../datanode" }
datatypes = { path = "../datatypes" }
futures = "0.3"
futures-util = "0.3"

View File

@@ -59,6 +59,16 @@ impl FrontendCatalogManager {
pub(crate) fn backend(&self) -> KvBackendRef {
self.backend.clone()
}
#[cfg(test)]
pub(crate) fn table_routes(&self) -> Arc<TableRoutes> {
self.table_routes.clone()
}
#[cfg(test)]
pub(crate) fn datanode_clients(&self) -> Arc<DatanodeClients> {
self.datanode_clients.clone()
}
}
// FIXME(hl): Frontend only needs a CatalogList, should replace with trait upcasting

View File

@@ -244,18 +244,6 @@ pub enum Error {
source: client::Error,
},
#[snafu(display("Failed to alter table, source: {}", source))]
AlterTable {
#[snafu(backtrace)]
source: client::Error,
},
#[snafu(display("Failed to drop table, source: {}", source))]
DropTable {
#[snafu(backtrace)]
source: client::Error,
},
#[snafu(display("Failed to insert values to table, source: {}", source))]
Insert {
#[snafu(backtrace)]
@@ -398,9 +386,6 @@ pub enum Error {
source: query::error::Error,
},
#[snafu(display("Unsupported expr type: {}", name))]
UnsupportedExpr { name: String, backtrace: Backtrace },
#[snafu(display("Failed to do vector computation, source: {}", source))]
VectorComputation {
#[snafu(backtrace)]
@@ -451,6 +436,12 @@ pub enum Error {
#[snafu(backtrace)]
source: substrait::error::Error,
},
#[snafu(display("Failed to invoke GRPC server, source: {}", source))]
InvokeGrpcServer {
#[snafu(backtrace)]
source: servers::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -470,7 +461,9 @@ impl ErrorExt for Error {
Error::RuntimeResource { source, .. } => source.status_code(),
Error::StartServer { source, .. } => source.status_code(),
Error::StartServer { source, .. } | Error::InvokeGrpcServer { source } => {
source.status_code()
}
Error::ParseSql { source } => source.status_code(),
@@ -500,7 +493,6 @@ impl ErrorExt for Error {
| Error::FindLeaderPeer { .. }
| Error::FindRegionPartition { .. }
| Error::IllegalTableRoutesData { .. }
| Error::UnsupportedExpr { .. }
| Error::BuildDfLogicalPlan { .. } => StatusCode::Internal,
Error::IllegalFrontendState { .. } | Error::IncompleteGrpcResult { .. } => {
@@ -522,8 +514,6 @@ impl ErrorExt for Error {
Error::SchemaNotFound { .. } => StatusCode::InvalidArguments,
Error::CatalogNotFound { .. } => StatusCode::InvalidArguments,
Error::CreateTable { source, .. }
| Error::AlterTable { source, .. }
| Error::DropTable { source }
| Error::Select { source, .. }
| Error::CreateDatabase { source, .. }
| Error::CreateTableOnInsertion { source, .. }

View File

@@ -40,7 +40,6 @@ pub struct FrontendOptions {
pub influxdb_options: Option<InfluxdbOptions>,
pub prometheus_options: Option<PrometheusOptions>,
pub mode: Mode,
pub datanode_rpc_addr: String,
pub meta_client_opts: Option<MetaClientOpts>,
}
@@ -55,18 +54,11 @@ impl Default for FrontendOptions {
influxdb_options: Some(InfluxdbOptions::default()),
prometheus_options: Some(PrometheusOptions::default()),
mode: Mode::Standalone,
datanode_rpc_addr: "127.0.0.1:3001".to_string(),
meta_client_opts: None,
}
}
}
impl FrontendOptions {
pub(crate) fn datanode_grpc_addr(&self) -> String {
self.datanode_rpc_addr.clone()
}
}
pub struct Frontend<T>
where
T: FrontendInstance,

View File

@@ -20,50 +20,49 @@ mod prometheus;
use std::sync::Arc;
use std::time::Duration;
use api::result::ObjectResultBuilder;
use api::result::{ObjectResultBuilder, PROTOCOL_VERSION};
use api::v1::alter_expr::Kind;
use api::v1::object_expr::Expr;
use api::v1::{
admin_expr, select_expr, AddColumns, AdminExpr, AdminResult, AlterExpr, Column,
CreateDatabaseExpr, CreateExpr, DropTableExpr, InsertExpr, ObjectExpr,
admin_expr, AddColumns, AdminExpr, AdminResult, AlterExpr, Column, CreateDatabaseExpr,
CreateExpr, DropTableExpr, ExprHeader, InsertExpr, ObjectExpr,
ObjectResult as GrpcObjectResult,
};
use async_trait::async_trait;
use catalog::remote::MetaKvBackend;
use catalog::{CatalogManagerRef, CatalogProviderRef, SchemaProviderRef};
use client::admin::{admin_result_to_output, Admin};
use client::{Client, Database, Select};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::prelude::{BoxedError, StatusCode};
use client::admin::admin_result_to_output;
use client::ObjectResult;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_error::prelude::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_grpc::select::to_object_result;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::{debug, error, info};
use common_telemetry::{debug, info};
use datanode::instance::InstanceRef as DnInstanceRef;
use distributed::DistInstance;
use meta_client::client::MetaClientBuilder;
use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_client::MetaClientOpts;
use servers::query_handler::{
GrpcAdminHandler, GrpcQueryHandler, InfluxdbLineProtocolHandler, OpentsdbProtocolHandler,
PrometheusProtocolHandler, ScriptHandler, ScriptHandlerRef, SqlQueryHandler,
GrpcAdminHandler, GrpcAdminHandlerRef, GrpcQueryHandler, GrpcQueryHandlerRef,
InfluxdbLineProtocolHandler, OpentsdbProtocolHandler, PrometheusProtocolHandler, ScriptHandler,
ScriptHandlerRef, SqlQueryHandler, SqlQueryHandlerRef,
};
use servers::{error as server_error, Mode};
use session::context::{QueryContext, QueryContextRef};
use session::context::QueryContextRef;
use snafu::prelude::*;
use sql::dialect::GenericDialect;
use sql::parser::ParserContext;
use sql::statements::create::Partitions;
use sql::statements::explain::Explain;
use sql::statements::insert::Insert;
use sql::statements::statement::Statement;
use crate::catalog::FrontendCatalogManager;
use crate::datanode::DatanodeClients;
use crate::error::{
self, AlterTableOnInsertionSnafu, AlterTableSnafu, CatalogNotFoundSnafu, CatalogSnafu,
CreateDatabaseSnafu, CreateTableSnafu, DropTableSnafu, FindNewColumnsOnInsertionSnafu,
InsertSnafu, MissingMetasrvOptsSnafu, Result, SchemaNotFoundSnafu, SelectSnafu,
UnsupportedExprSnafu,
self, AlterTableOnInsertionSnafu, CatalogNotFoundSnafu, CatalogSnafu, CreateDatabaseSnafu,
CreateTableSnafu, FindNewColumnsOnInsertionSnafu, InsertSnafu, MissingMetasrvOptsSnafu, Result,
SchemaNotFoundSnafu,
};
use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory};
use crate::frontend::FrontendOptions;
@@ -91,9 +90,6 @@ pub type FrontendInstanceRef = Arc<dyn FrontendInstance>;
#[derive(Clone)]
pub struct Instance {
// TODO(hl): In standalone mode, there is only one client.
// But in distribute mode, frontend should fetch datanodes' addresses from metasrv.
client: Client,
/// catalog manager is None in standalone mode, datanode will keep their own
catalog_manager: Option<CatalogManagerRef>,
/// Script handler is None in distributed mode, only works on standalone mode.
@@ -103,94 +99,87 @@ pub struct Instance {
// Standalone and Distributed, then the code behind it doesn't need to use so
// many match statements.
mode: Mode,
// TODO(LFC): Refactor consideration: Can we split Frontend to DistInstance and EmbedInstance?
dist_instance: Option<DistInstance>,
}
impl Default for Instance {
fn default() -> Self {
Self {
client: Client::default(),
catalog_manager: None,
script_handler: None,
create_expr_factory: Arc::new(DefaultCreateExprFactory {}),
mode: Mode::Standalone,
dist_instance: None,
}
}
// TODO(LFC): Remove `dist_instance` together with Arrow Flight adoption refactor.
dist_instance: Option<DistInstance>,
sql_handler: SqlQueryHandlerRef,
grpc_query_handler: GrpcQueryHandlerRef,
grpc_admin_handler: GrpcAdminHandlerRef,
}
impl Instance {
pub async fn try_new(opts: &FrontendOptions) -> Result<Self> {
let mut instance = Instance {
mode: opts.mode.clone(),
..Default::default()
};
pub async fn try_new_distributed(opts: &FrontendOptions) -> Result<Self> {
let meta_client = Self::create_meta_client(opts).await?;
let addr = opts.datanode_grpc_addr();
instance.client.start(vec![addr]);
let meta_backend = Arc::new(MetaKvBackend {
client: meta_client.clone(),
});
let table_routes = Arc::new(TableRoutes::new(meta_client.clone()));
let datanode_clients = Arc::new(DatanodeClients::new());
let catalog_manager = Arc::new(FrontendCatalogManager::new(
meta_backend,
table_routes,
datanode_clients.clone(),
));
instance.dist_instance = match &opts.mode {
Mode::Standalone => None,
Mode::Distributed => {
let metasrv_addr = &opts
.meta_client_opts
.as_ref()
.context(MissingMetasrvOptsSnafu)?
.metasrv_addrs;
info!(
"Creating Frontend instance in distributed mode with Meta server addr {:?}",
metasrv_addr
);
let dist_instance =
DistInstance::new(meta_client, catalog_manager.clone(), datanode_clients);
let dist_instance_ref = Arc::new(dist_instance.clone());
let meta_config = MetaClientOpts::default();
let channel_config = ChannelConfig::new()
.timeout(Duration::from_millis(meta_config.timeout_millis))
.connect_timeout(Duration::from_millis(meta_config.connect_timeout_millis))
.tcp_nodelay(meta_config.tcp_nodelay);
let channel_manager = ChannelManager::with_config(channel_config);
let mut meta_client = MetaClientBuilder::new(0, 0)
.enable_router()
.enable_store()
.channel_manager(channel_manager)
.build();
meta_client
.start(metasrv_addr)
.await
.context(error::StartMetaClientSnafu)?;
let meta_client = Arc::new(meta_client);
let meta_backend = Arc::new(MetaKvBackend {
client: meta_client.clone(),
});
let table_routes = Arc::new(TableRoutes::new(meta_client.clone()));
let datanode_clients = Arc::new(DatanodeClients::new());
let catalog_manager = Arc::new(FrontendCatalogManager::new(
meta_backend,
table_routes,
datanode_clients.clone(),
));
instance.catalog_manager = Some(catalog_manager.clone());
Some(DistInstance::new(
meta_client,
catalog_manager,
datanode_clients,
))
}
};
Ok(instance)
Ok(Instance {
catalog_manager: Some(catalog_manager),
script_handler: None,
create_expr_factory: Arc::new(DefaultCreateExprFactory),
mode: Mode::Distributed,
dist_instance: Some(dist_instance),
sql_handler: dist_instance_ref.clone(),
grpc_query_handler: dist_instance_ref.clone(),
grpc_admin_handler: dist_instance_ref,
})
}
pub fn database(&self, database: &str) -> Database {
Database::new(database, self.client.clone())
async fn create_meta_client(opts: &FrontendOptions) -> Result<Arc<MetaClient>> {
let metasrv_addr = &opts
.meta_client_opts
.as_ref()
.context(MissingMetasrvOptsSnafu)?
.metasrv_addrs;
info!(
"Creating Frontend instance in distributed mode with Meta server addr {:?}",
metasrv_addr
);
let meta_config = MetaClientOpts::default();
let channel_config = ChannelConfig::new()
.timeout(Duration::from_millis(meta_config.timeout_millis))
.connect_timeout(Duration::from_millis(meta_config.connect_timeout_millis))
.tcp_nodelay(meta_config.tcp_nodelay);
let channel_manager = ChannelManager::with_config(channel_config);
let mut meta_client = MetaClientBuilder::new(0, 0)
.enable_router()
.enable_store()
.channel_manager(channel_manager)
.build();
meta_client
.start(metasrv_addr)
.await
.context(error::StartMetaClientSnafu)?;
Ok(Arc::new(meta_client))
}
pub fn admin(&self, database: &str) -> Admin {
Admin::new(database, self.client.clone())
pub fn new_standalone(dn_instance: DnInstanceRef) -> Self {
Instance {
catalog_manager: None,
script_handler: None,
create_expr_factory: Arc::new(DefaultCreateExprFactory),
mode: Mode::Standalone,
dist_instance: None,
sql_handler: dn_instance.clone(),
grpc_query_handler: dn_instance.clone(),
grpc_admin_handler: dn_instance,
}
}
pub fn catalog_manager(&self) -> &Option<CatalogManagerRef> {
@@ -213,27 +202,6 @@ impl Instance {
self.script_handler = Some(handler);
}
async fn handle_select(
&self,
expr: Select,
stmt: Statement,
query_ctx: QueryContextRef,
) -> Result<Output> {
if let Some(dist_instance) = &self.dist_instance {
let Select::Sql(sql) = expr;
dist_instance.handle_sql(&sql, stmt, query_ctx).await
} else {
// TODO(LFC): Refactor consideration: Datanode should directly execute statement in standalone mode to avoid parse SQL again.
// Find a better way to execute query between Frontend and Datanode in standalone mode.
// Otherwise we have to parse SQL first to get schema name. Maybe not GRPC.
self.database(DEFAULT_SCHEMA_NAME)
.select(expr)
.await
.and_then(Output::try_from)
.context(SelectSnafu)
}
}
/// Handle create expr.
pub async fn handle_create_table(
&self,
@@ -243,81 +211,38 @@ impl Instance {
if let Some(v) = &self.dist_instance {
v.create_table(&mut expr, partitions).await
} else {
// Currently standalone mode does not support multi partitions/regions.
let expr = AdminExpr {
header: Some(ExprHeader {
version: PROTOCOL_VERSION,
}),
expr: Some(admin_expr::Expr::Create(expr)),
};
let result = self
.admin(expr.schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME))
.create(expr.clone())
.await;
if let Err(e) = &result {
error!(e; "Failed to create table by expr: {:?}", expr);
}
result
.and_then(admin_result_to_output)
.context(CreateTableSnafu)
.grpc_admin_handler
.exec_admin_request(expr)
.await
.context(error::InvokeGrpcServerSnafu)?;
admin_result_to_output(result).context(CreateTableSnafu)
}
}
/// Handle create database expr.
pub async fn handle_create_database(&self, expr: CreateDatabaseExpr) -> Result<Output> {
let database_name = expr.database_name.clone();
if let Some(dist_instance) = &self.dist_instance {
dist_instance.handle_create_database(expr).await
} else {
// FIXME(hl): In order to get admin client to create schema, we need to use the default schema admin
self.admin(DEFAULT_SCHEMA_NAME)
.create_database(expr)
.await
.and_then(admin_result_to_output)
.context(CreateDatabaseSnafu {
name: database_name,
})
}
}
/// Handle alter expr
pub async fn handle_alter(&self, expr: AlterExpr) -> Result<Output> {
match &self.dist_instance {
Some(dist_instance) => dist_instance.handle_alter_table(expr).await,
None => self
.admin(expr.schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME))
.alter(expr)
.await
.and_then(admin_result_to_output)
.context(AlterTableSnafu),
}
}
/// Handle drop table expr
pub async fn handle_drop_table(&self, expr: DropTableExpr) -> Result<Output> {
match self.mode {
Mode::Standalone => self
.admin(&expr.schema_name)
.drop_table(expr)
.await
.and_then(admin_result_to_output)
.context(DropTableSnafu),
// TODO(ruihang): support drop table in distributed mode
Mode::Distributed => UnsupportedExprSnafu {
name: "Distributed DROP TABLE",
}
.fail(),
}
}
/// Handle explain expr
pub async fn handle_explain(
&self,
sql: &str,
explain_stmt: Explain,
query_ctx: QueryContextRef,
) -> Result<Output> {
if let Some(dist_instance) = &self.dist_instance {
dist_instance
.handle_sql(sql, Statement::Explain(explain_stmt), query_ctx)
.await
} else {
Ok(Output::AffectedRows(0))
}
let expr = AdminExpr {
header: Some(ExprHeader {
version: PROTOCOL_VERSION,
}),
expr: Some(admin_expr::Expr::CreateDatabase(expr)),
};
let result = self
.grpc_admin_handler
.exec_admin_request(expr)
.await
.context(error::InvokeGrpcServerSnafu)?;
admin_result_to_output(result).context(CreateDatabaseSnafu {
name: database_name,
})
}
/// Handle batch inserts
@@ -333,7 +258,7 @@ impl Instance {
}
/// Handle insert. for 'values' insertion, create/alter the destination table on demand.
pub async fn handle_insert(&self, mut insert_expr: InsertExpr) -> Result<Output> {
async fn handle_insert(&self, mut insert_expr: InsertExpr) -> Result<Output> {
let table_name = &insert_expr.table_name;
let catalog_name = DEFAULT_CATALOG_NAME;
let schema_name = &insert_expr.schema_name;
@@ -345,11 +270,17 @@ impl Instance {
insert_expr.region_number = 0;
self.database(schema_name)
.insert(insert_expr)
let query = ObjectExpr {
header: Some(ExprHeader {
version: PROTOCOL_VERSION,
}),
expr: Some(Expr::Insert(insert_expr)),
};
let result = GrpcQueryHandler::do_query(&*self.grpc_query_handler, query)
.await
.and_then(Output::try_from)
.context(InsertSnafu)
.context(error::InvokeGrpcServerSnafu)?;
let result: ObjectResult = result.try_into().context(InsertSnafu)?;
result.try_into().context(InsertSnafu)
}
// check if table already exist:
@@ -455,11 +386,19 @@ impl Instance {
catalog_name: Some(catalog_name.to_string()),
kind: Some(Kind::AddColumns(add_columns)),
};
self.admin(schema_name)
.alter(expr)
let expr = AdminExpr {
header: Some(ExprHeader {
version: PROTOCOL_VERSION,
}),
expr: Some(admin_expr::Expr::Alter(expr)),
};
let result = self
.grpc_admin_handler
.exec_admin_request(expr)
.await
.and_then(admin_result_to_output)
.context(AlterTableOnInsertionSnafu)
.context(error::InvokeGrpcServerSnafu)?;
admin_result_to_output(result).context(AlterTableOnInsertionSnafu)
}
fn get_catalog(&self, catalog_name: &str) -> Result<CatalogProviderRef> {
@@ -547,20 +486,6 @@ impl FrontendInstance for Instance {
}
}
#[cfg(test)]
impl Instance {
pub fn with_client_and_catalog_manager(client: Client, catalog: CatalogManagerRef) -> Self {
Self {
client,
catalog_manager: Some(catalog),
script_handler: None,
create_expr_factory: Arc::new(DefaultCreateExprFactory),
mode: Mode::Standalone,
dist_instance: None,
}
}
}
fn parse_stmt(sql: &str) -> Result<Statement> {
let mut stmt = ParserContext::create_with_dialect(sql, &GenericDialect {})
.context(error::ParseSqlSnafu)?;
@@ -587,12 +512,14 @@ impl SqlQueryHandler for Instance {
.context(server_error::ExecuteQuerySnafu { query })?;
match stmt {
Statement::ShowDatabases(_)
Statement::CreateDatabase(_)
| Statement::ShowDatabases(_)
| Statement::CreateTable(_)
| Statement::ShowTables(_)
| Statement::DescribeTable(_)
| Statement::Explain(_)
| Statement::Query(_) => {
self.handle_select(Select::Sql(query.to_string()), stmt, query_ctx)
.await
return self.sql_handler.do_query(query, query_ctx).await;
}
Statement::Insert(insert) => match self.mode {
Mode::Standalone => {
@@ -629,30 +556,18 @@ impl SqlQueryHandler for Instance {
Ok(Output::AffectedRows(affected))
}
},
Statement::CreateTable(create) => {
let create_expr = self
.create_expr_factory
.create_expr_by_stmt(&create)
.await
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })?;
self.handle_create_table(create_expr, create.partitions)
.await
}
Statement::CreateDatabase(c) => {
let expr = CreateDatabaseExpr {
database_name: c.name.to_string(),
};
self.handle_create_database(expr).await
}
Statement::Alter(alter_stmt) => {
self.handle_alter(
AlterExpr::try_from(alter_stmt)
.map_err(BoxedError::new)
.context(server_error::ExecuteAlterSnafu { query })?,
)
.await
let expr = AlterExpr::try_from(alter_stmt)
.map_err(BoxedError::new)
.context(server_error::ExecuteAlterSnafu { query })?;
let expr = AdminExpr {
header: Some(ExprHeader {
version: PROTOCOL_VERSION,
}),
expr: Some(admin_expr::Expr::Alter(expr)),
};
let result = self.grpc_admin_handler.exec_admin_request(expr).await?;
admin_result_to_output(result).context(error::InvalidAdminResultSnafu)
}
Statement::DropTable(drop_stmt) => {
let expr = DropTableExpr {
@@ -660,10 +575,14 @@ impl SqlQueryHandler for Instance {
schema_name: drop_stmt.schema_name,
table_name: drop_stmt.table_name,
};
self.handle_drop_table(expr).await
}
Statement::Explain(explain_stmt) => {
self.handle_explain(query, explain_stmt, query_ctx).await
let expr = AdminExpr {
header: Some(ExprHeader {
version: PROTOCOL_VERSION,
}),
expr: Some(admin_expr::Expr::DropTable(expr)),
};
let result = self.grpc_admin_handler.exec_admin_request(expr).await?;
admin_result_to_output(result).context(error::InvalidAdminResultSnafu)
}
Statement::ShowCreateTable(_) => {
return server_error::NotSupportedSnafu { feat: query }.fail();
@@ -703,81 +622,34 @@ impl ScriptHandler for Instance {
#[async_trait]
impl GrpcQueryHandler for Instance {
async fn do_query(&self, query: ObjectExpr) -> server_error::Result<GrpcObjectResult> {
if let Some(expr) = &query.expr {
match expr {
Expr::Insert(insert) => {
// TODO(fys): refactor, avoid clone
let result = self.handle_insert(insert.clone()).await;
result
.map(|o| match o {
Output::AffectedRows(rows) => ObjectResultBuilder::new()
.status_code(StatusCode::Success as u32)
.mutate_result(rows as u32, 0u32)
.build(),
_ => {
unreachable!()
}
})
.map_err(BoxedError::new)
.with_context(|_| server_error::ExecuteQuerySnafu {
query: format!("{:?}", query),
})
}
Expr::Select(select) => {
let select = select
.expr
.as_ref()
.context(server_error::InvalidQuerySnafu {
reason: "empty query",
})?;
match select {
select_expr::Expr::Sql(sql) => {
let query_ctx = Arc::new(QueryContext::new());
let output = SqlQueryHandler::do_query(self, sql, query_ctx).await;
Ok(to_object_result(output).await)
}
_ => {
if self.dist_instance.is_some() {
return server_error::NotSupportedSnafu {
feat: "Executing plan directly in Frontend.",
}
.fail();
}
// FIXME(hl): refactor
self.database(DEFAULT_SCHEMA_NAME)
.object(query.clone())
.await
.map_err(BoxedError::new)
.with_context(|_| server_error::ExecuteQuerySnafu {
query: format!("{:?}", query),
})
}
}
}
_ => server_error::NotSupportedSnafu {
feat: "Currently only insert and select is supported in GRPC service.",
}
.fail(),
let expr = query
.clone()
.expr
.context(server_error::InvalidQuerySnafu {
reason: "empty expr",
})?;
match expr {
Expr::Insert(insert_expr) => {
let output = self
.handle_insert(insert_expr.clone())
.await
.map_err(BoxedError::new)
.with_context(|_| server_error::ExecuteQuerySnafu {
query: format!("{:?}", insert_expr),
})?;
let object_result = match output {
Output::AffectedRows(rows) => ObjectResultBuilder::default()
.mutate_result(rows as _, 0)
.build(),
_ => unreachable!(),
};
Ok(object_result)
}
} else {
server_error::InvalidQuerySnafu {
reason: "empty query",
}
.fail()
_ => GrpcQueryHandler::do_query(&*self.grpc_query_handler, query).await,
}
}
}
fn get_schema_name(expr: &AdminExpr) -> &str {
let schema_name = match &expr.expr {
Some(admin_expr::Expr::Create(expr)) => expr.schema_name.as_deref(),
Some(admin_expr::Expr::Alter(expr)) => expr.schema_name.as_deref(),
Some(admin_expr::Expr::CreateDatabase(_)) | None => Some(DEFAULT_SCHEMA_NAME),
Some(admin_expr::Expr::DropTable(expr)) => Some(expr.schema_name.as_ref()),
};
schema_name.unwrap_or(DEFAULT_SCHEMA_NAME)
}
#[async_trait]
impl GrpcAdminHandler for Instance {
async fn exec_admin_request(&self, mut expr: AdminExpr) -> server_error::Result<AdminResult> {
@@ -786,13 +658,7 @@ impl GrpcAdminHandler for Instance {
if let Some(api::v1::admin_expr::Expr::Create(create)) = &mut expr.expr {
create.table_id = None;
}
self.admin(get_schema_name(&expr))
.do_request(expr.clone())
.await
.map_err(BoxedError::new)
.with_context(|_| server_error::ExecuteQuerySnafu {
query: format!("{:?}", expr),
})
self.grpc_admin_handler.exec_admin_request(expr).await
}
}
@@ -808,6 +674,7 @@ mod tests {
};
use datatypes::schema::ColumnDefaultConstraint;
use datatypes::value::Value;
use session::context::QueryContext;
use super::*;
use crate::tests;
@@ -853,7 +720,8 @@ mod tests {
.await
.unwrap();
match output {
Output::RecordBatches(recordbatches) => {
Output::Stream(stream) => {
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let pretty_print = recordbatches.pretty_print();
let pretty_print = pretty_print.lines().collect::<Vec<&str>>();
let expected = vec![
@@ -875,7 +743,8 @@ mod tests {
.await
.unwrap();
match output {
Output::RecordBatches(recordbatches) => {
Output::Stream(stream) => {
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let pretty_print = recordbatches.pretty_print();
let pretty_print = pretty_print.lines().collect::<Vec<&str>>();
let expected = vec![

View File

@@ -16,12 +16,18 @@ use std::collections::HashMap;
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::{AlterExpr, CreateDatabaseExpr, CreateExpr};
use api::result::AdminResultBuilder;
use api::v1::{
admin_expr, AdminExpr, AdminResult, AlterExpr, CreateDatabaseExpr, CreateExpr, ObjectExpr,
ObjectResult,
};
use async_trait::async_trait;
use catalog::helper::{SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue};
use catalog::CatalogList;
use chrono::DateTime;
use client::admin::{admin_result_to_output, Admin};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::prelude::BoxedError;
use common_query::Output;
use common_telemetry::{debug, error, info};
use datatypes::prelude::ConcreteDataType;
@@ -33,6 +39,8 @@ use meta_client::rpc::{
};
use query::sql::{describe_table, explain, show_databases, show_tables};
use query::{QueryEngineFactory, QueryEngineRef};
use servers::error as server_error;
use servers::query_handler::{GrpcAdminHandler, GrpcQueryHandler, SqlQueryHandler};
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::create::Partitions;
@@ -48,6 +56,8 @@ use crate::error::{
PrimaryKeyNotFoundSnafu, RequestMetaSnafu, Result, SchemaNotFoundSnafu, StartMetaClientSnafu,
TableNotFoundSnafu,
};
use crate::expr_factory::{CreateExprFactory, DefaultCreateExprFactory};
use crate::instance::parse_stmt;
use crate::partitioning::{PartitionBound, PartitionDef};
use crate::table::DistTable;
@@ -126,15 +136,12 @@ impl DistInstance {
.context(error::InvalidAdminResultSnafu)?;
}
Ok(Output::AffectedRows(region_routes.len()))
// Checked in real MySQL, it truly returns "0 rows affected".
Ok(Output::AffectedRows(0))
}
pub(crate) async fn handle_sql(
&self,
sql: &str,
stmt: Statement,
query_ctx: QueryContextRef,
) -> Result<Output> {
async fn handle_sql(&self, sql: &str, query_ctx: QueryContextRef) -> Result<Output> {
let stmt = parse_stmt(sql)?;
match stmt {
Statement::Query(_) => {
let plan = self
@@ -143,6 +150,17 @@ impl DistInstance {
.context(error::ExecuteSqlSnafu { sql })?;
self.query_engine.execute(&plan).await
}
Statement::CreateDatabase(stmt) => {
let expr = CreateDatabaseExpr {
database_name: stmt.name.to_string(),
};
self.handle_create_database(expr).await?;
Ok(Output::AffectedRows(1))
}
Statement::CreateTable(stmt) => {
let create_expr = &mut DefaultCreateExprFactory.create_expr_by_stmt(&stmt).await?;
Ok(self.create_table(create_expr, stmt.partitions).await?)
}
Statement::ShowDatabases(stmt) => show_databases(stmt, self.catalog_manager.clone()),
Statement::ShowTables(stmt) => {
show_tables(stmt, self.catalog_manager.clone(), query_ctx)
@@ -157,7 +175,7 @@ impl DistInstance {
}
/// Handles distributed database creation
pub(crate) async fn handle_create_database(&self, expr: CreateDatabaseExpr) -> Result<Output> {
async fn handle_create_database(&self, expr: CreateDatabaseExpr) -> Result<()> {
let key = SchemaKey {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: expr.database_name,
@@ -172,10 +190,10 @@ impl DistInstance {
.with_key(key.to_string())
.with_value(value.as_bytes().context(CatalogEntrySerdeSnafu)?);
client.put(request.into()).await.context(RequestMetaSnafu)?;
Ok(Output::AffectedRows(1))
Ok(())
}
pub async fn handle_alter_table(&self, expr: AlterExpr) -> Result<Output> {
async fn handle_alter_table(&self, expr: AlterExpr) -> Result<AdminResult> {
let catalog_name = expr.catalog_name.as_deref().unwrap_or(DEFAULT_CATALOG_NAME);
let schema_name = expr.schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME);
let table_name = expr.table_name.as_str();
@@ -200,7 +218,7 @@ impl DistInstance {
.downcast_ref::<DistTable>()
.expect("Table impl must be DistTable in distributed mode");
dist_table.alter_by_expr(expr).await?;
Ok(Output::AffectedRows(0))
Ok(AdminResultBuilder::default().mutate_result(0, 0).build())
}
async fn create_table_in_meta(
@@ -269,6 +287,56 @@ impl DistInstance {
}
Ok(())
}
#[cfg(test)]
pub(crate) fn catalog_manager(&self) -> Arc<FrontendCatalogManager> {
self.catalog_manager.clone()
}
}
#[async_trait]
impl SqlQueryHandler for DistInstance {
async fn do_query(
&self,
query: &str,
query_ctx: QueryContextRef,
) -> server_error::Result<Output> {
self.handle_sql(query, query_ctx)
.await
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })
}
}
#[async_trait]
impl GrpcQueryHandler for DistInstance {
async fn do_query(&self, _: ObjectExpr) -> server_error::Result<ObjectResult> {
unimplemented!()
}
}
#[async_trait]
impl GrpcAdminHandler for DistInstance {
async fn exec_admin_request(&self, query: AdminExpr) -> server_error::Result<AdminResult> {
let expr = query
.clone()
.expr
.context(server_error::InvalidQuerySnafu {
reason: "empty expr",
})?;
match expr {
admin_expr::Expr::CreateDatabase(create_database) => self
.handle_create_database(create_database)
.await
.map(|_| AdminResultBuilder::default().mutate_result(1, 0).build()),
admin_expr::Expr::Alter(alter) => self.handle_alter_table(alter).await,
_ => unimplemented!(),
}
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu {
query: format!("{:?}", query),
})
}
}
fn create_table_global_value(
@@ -454,12 +522,15 @@ fn find_partition_columns(
#[cfg(test)]
mod test {
use servers::query_handler::SqlQueryHandlerRef;
use session::context::QueryContext;
use sql::parser::ParserContext;
use sql::statements::statement::Statement;
use sqlparser::dialect::GenericDialect;
use super::*;
use crate::expr_factory::{CreateExprFactory, DefaultCreateExprFactory};
use crate::tests::create_dist_instance;
#[tokio::test]
async fn test_parse_partitions() {
@@ -492,9 +563,10 @@ ENGINE=mito",
let result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap();
match &result[0] {
Statement::CreateTable(c) => {
common_telemetry::info!("{}", sql);
let factory = DefaultCreateExprFactory {};
let expr = factory.create_expr_by_stmt(c).await.unwrap();
let expr = DefaultCreateExprFactory
.create_expr_by_stmt(c)
.await
.unwrap();
let partitions = parse_partitions(&expr, c.partitions.clone()).unwrap();
let json = serde_json::to_string(&partitions).unwrap();
assert_eq!(json, expected);
@@ -503,4 +575,103 @@ ENGINE=mito",
}
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_show_databases() {
let (dist_instance, _) = create_dist_instance().await;
let sql = "create database test_show_databases";
let output = dist_instance
.handle_sql(sql, QueryContext::arc())
.await
.unwrap();
match output {
Output::AffectedRows(rows) => assert_eq!(rows, 1),
_ => unreachable!(),
}
let sql = "show databases";
let output = dist_instance
.handle_sql(sql, QueryContext::arc())
.await
.unwrap();
match output {
Output::RecordBatches(r) => {
let expected1 = vec![
"+---------------------+",
"| Schemas |",
"+---------------------+",
"| public |",
"| test_show_databases |",
"+---------------------+",
];
let expected2 = vec![
"+---------------------+",
"| Schemas |",
"+---------------------+",
"| test_show_databases |",
"| public |",
"+---------------------+",
];
let pretty = r.pretty_print();
let lines = pretty.lines().collect::<Vec<_>>();
assert!(lines == expected1 || lines == expected2)
}
_ => unreachable!(),
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_show_tables() {
let (dist_instance, datanode_instances) = create_dist_instance().await;
let sql = "create database test_show_tables";
dist_instance
.handle_sql(sql, QueryContext::arc())
.await
.unwrap();
let sql = "
CREATE TABLE greptime.test_show_tables.dist_numbers (
ts BIGINT,
n INT,
TIME INDEX (ts),
)
PARTITION BY RANGE COLUMNS (n) (
PARTITION r0 VALUES LESS THAN (10),
PARTITION r1 VALUES LESS THAN (20),
PARTITION r2 VALUES LESS THAN (50),
PARTITION r3 VALUES LESS THAN (MAXVALUE),
)
ENGINE=mito";
dist_instance
.handle_sql(sql, QueryContext::arc())
.await
.unwrap();
async fn assert_show_tables(instance: SqlQueryHandlerRef) {
let sql = "show tables in test_show_tables";
let output = instance.do_query(sql, QueryContext::arc()).await.unwrap();
match output {
Output::RecordBatches(r) => {
let expected = vec![
"+--------------+",
"| Tables |",
"+--------------+",
"| dist_numbers |",
"+--------------+",
];
assert_eq!(r.pretty_print().lines().collect::<Vec<_>>(), expected);
}
_ => unreachable!(),
}
}
assert_show_tables(Arc::new(dist_instance)).await;
// Asserts that new table is created in Datanode as well.
for x in datanode_instances.values() {
assert_show_tables(x.clone()).await
}
}
}

View File

@@ -19,7 +19,6 @@ use servers::query_handler::OpentsdbProtocolHandler;
use servers::{error as server_error, Mode};
use snafu::prelude::*;
use crate::error::Result;
use crate::instance::Instance;
#[async_trait]
@@ -29,12 +28,7 @@ impl OpentsdbProtocolHandler for Instance {
// metric table and tags can be created upon insertion.
match self.mode {
Mode::Standalone => {
self.insert_opentsdb_metric(data_point)
.await
.map_err(BoxedError::new)
.with_context(|_| server_error::PutOpentsdbDataPointSnafu {
data_point: format!("{:?}", data_point),
})?;
self.insert_opentsdb_metric(data_point).await?;
}
Mode::Distributed => {
self.dist_insert(vec![data_point.as_grpc_insert()])
@@ -51,9 +45,14 @@ impl OpentsdbProtocolHandler for Instance {
}
impl Instance {
async fn insert_opentsdb_metric(&self, data_point: &DataPoint) -> Result<()> {
let expr = data_point.as_grpc_insert();
self.handle_insert(expr).await?;
async fn insert_opentsdb_metric(&self, data_point: &DataPoint) -> server_error::Result<()> {
let insert_expr = data_point.as_grpc_insert();
self.handle_insert(insert_expr)
.await
.map_err(BoxedError::new)
.with_context(|_| server_error::ExecuteQuerySnafu {
query: format!("{:?}", data_point),
})?;
Ok(())
}
}
@@ -63,6 +62,7 @@ mod tests {
use std::sync::Arc;
use common_query::Output;
use common_recordbatch::RecordBatches;
use datafusion::arrow_print;
use servers::query_handler::SqlQueryHandler;
use session::context::QueryContext;
@@ -128,7 +128,8 @@ mod tests {
.await
.unwrap();
match output {
Output::RecordBatches(recordbatches) => {
Output::Stream(stream) => {
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let recordbatches = recordbatches
.take()
.into_iter()

View File

@@ -17,11 +17,10 @@ use std::sync::Arc;
use api::prometheus::remote::read_request::ResponseType;
use api::prometheus::remote::{Query, QueryResult, ReadRequest, ReadResponse, WriteRequest};
use async_trait::async_trait;
use client::{ObjectResult, Select};
use client::ObjectResult;
use common_error::prelude::BoxedError;
use common_grpc::select::to_object_result;
use common_telemetry::logging;
use futures_util::TryFutureExt;
use prost::Message;
use servers::error::{self, Result as ServerResult};
use servers::prometheus::{self, Metrics};
@@ -30,7 +29,7 @@ use servers::Mode;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use crate::instance::{parse_stmt, Instance};
use crate::instance::Instance;
const SAMPLES_RESPONSE_TYPE: i32 = ResponseType::Samples as i32;
@@ -94,19 +93,14 @@ impl Instance {
sql
);
let object_result = if let Some(dist_instance) = &self.dist_instance {
let output = futures::future::ready(parse_stmt(&sql))
.and_then(|stmt| {
let query_ctx = Arc::new(QueryContext::with_current_schema(db.to_string()));
dist_instance.handle_sql(&sql, stmt, query_ctx)
})
.await;
to_object_result(output).await.try_into()
} else {
self.database(db).select(Select::Sql(sql.clone())).await
}
.map_err(BoxedError::new)
.context(error::ExecuteQuerySnafu { query: sql })?;
let query_ctx = Arc::new(QueryContext::with_current_schema(db.to_string()));
let output = self.sql_handler.do_query(&sql, query_ctx).await;
let object_result = to_object_result(output)
.await
.try_into()
.map_err(BoxedError::new)
.context(error::ExecuteQuerySnafu { query: sql })?;
results.push((table_name, object_result));
}
@@ -117,34 +111,25 @@ impl Instance {
#[async_trait]
impl PrometheusProtocolHandler for Instance {
async fn write(&self, database: &str, request: WriteRequest) -> ServerResult<()> {
let exprs = prometheus::write_request_to_insert_exprs(database, request.clone())?;
match self.mode {
Mode::Standalone => {
let exprs = prometheus::write_request_to_insert_exprs(database, request)?;
let futures = exprs
.into_iter()
.map(|e| self.handle_insert(e))
.collect::<Vec<_>>();
let res = futures_util::future::join_all(futures)
self.handle_inserts(exprs)
.await
.into_iter()
.collect::<Result<Vec<_>, crate::error::Error>>();
res.map_err(BoxedError::new)
.context(error::ExecuteInsertSnafu {
msg: "failed to write prometheus remote request",
.map_err(BoxedError::new)
.with_context(|_| error::ExecuteInsertSnafu {
msg: format!("{:?}", request),
})?;
}
Mode::Distributed => {
let inserts = prometheus::write_request_to_insert_exprs(database, request)?;
self.dist_insert(inserts)
self.dist_insert(exprs)
.await
.map_err(BoxedError::new)
.context(error::ExecuteInsertSnafu {
msg: "execute insert failed",
.with_context(|_| error::ExecuteInsertSnafu {
msg: format!("{:?}", request),
})?;
}
}
Ok(())
}

View File

@@ -505,42 +505,33 @@ impl PartitionExec {
}
}
// FIXME(LFC): no allow, for clippy temporarily
#[allow(clippy::print_stdout)]
#[cfg(test)]
mod test {
use std::time::Duration;
use api::v1::column::SemanticType;
use api::v1::{column, Column, ColumnDataType};
use catalog::remote::MetaKvBackend;
use common_recordbatch::util;
use datafusion::arrow_print;
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use common_query::physical_plan::DfPhysicalPlanAdapter;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::expressions::{col as physical_col, PhysicalSortExpr};
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_expr::expr_fn::{and, binary_expr, col, or};
use datafusion_expr::lit;
use datanode::datanode::{DatanodeOptions, ObjectStoreConfig};
use datanode::instance::Instance;
use datatypes::arrow::compute::sort::SortOptions;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_client::client::MetaClient;
use meta_client::rpc::router::RegionRoute;
use meta_client::rpc::{Region, Table, TableRoute};
use meta_srv::metasrv::MetaSrvOptions;
use meta_srv::mocks::MockInfo;
use meta_srv::service::store::kv::KvStoreRef;
use meta_srv::service::store::memory::MemStore;
use sql::parser::ParserContext;
use sql::statements::statement::Statement;
use sqlparser::dialect::GenericDialect;
use table::metadata::{TableInfoBuilder, TableMetaBuilder};
use table::TableRef;
use tempdir::TempDir;
use super::*;
use crate::catalog::FrontendCatalogManager;
use crate::expr_factory::{CreateExprFactory, DefaultCreateExprFactory};
use crate::instance::distributed::DistInstance;
use crate::partitioning::range::RangePartitionRule;
#[tokio::test(flavor = "multi_thread")]
@@ -741,29 +732,78 @@ mod test {
#[tokio::test(flavor = "multi_thread")]
async fn test_dist_table_scan() {
common_telemetry::init_default_ut_logging();
let table = Arc::new(new_dist_table().await);
// should scan all regions
// select * from numbers
let projection = None;
// select a, row_id from numbers
let projection = Some(vec![1, 2]);
let filters = vec![];
exec_table_scan(table.clone(), projection, filters, None).await;
println!();
let expected_output = vec![
"+-----+--------+",
"| a | row_id |",
"+-----+--------+",
"| 0 | 1 |",
"| 1 | 2 |",
"| 2 | 3 |",
"| 3 | 4 |",
"| 4 | 5 |",
"| 10 | 1 |",
"| 11 | 2 |",
"| 12 | 3 |",
"| 13 | 4 |",
"| 14 | 5 |",
"| 30 | 1 |",
"| 31 | 2 |",
"| 32 | 3 |",
"| 33 | 4 |",
"| 34 | 5 |",
"| 100 | 1 |",
"| 101 | 2 |",
"| 102 | 3 |",
"| 103 | 4 |",
"| 104 | 5 |",
"+-----+--------+",
];
exec_table_scan(table.clone(), projection, filters, 4, expected_output).await;
// should scan only region 1
// select a, row_id from numbers where a < 10
let projection = Some(vec![1, 2]);
let filters = vec![binary_expr(col("a"), Operator::Lt, lit(10)).into()];
exec_table_scan(table.clone(), projection, filters, None).await;
println!();
let expected_output = vec![
"+---+--------+",
"| a | row_id |",
"+---+--------+",
"| 0 | 1 |",
"| 1 | 2 |",
"| 2 | 3 |",
"| 3 | 4 |",
"| 4 | 5 |",
"+---+--------+",
];
exec_table_scan(table.clone(), projection, filters, 1, expected_output).await;
// should scan region 1 and 2
// select a, row_id from numbers where a < 15
let projection = Some(vec![1, 2]);
let filters = vec![binary_expr(col("a"), Operator::Lt, lit(15)).into()];
exec_table_scan(table.clone(), projection, filters, None).await;
println!();
let expected_output = vec![
"+----+--------+",
"| a | row_id |",
"+----+--------+",
"| 0 | 1 |",
"| 1 | 2 |",
"| 2 | 3 |",
"| 3 | 4 |",
"| 4 | 5 |",
"| 10 | 1 |",
"| 11 | 2 |",
"| 12 | 3 |",
"| 13 | 4 |",
"| 14 | 5 |",
"+----+--------+",
];
exec_table_scan(table.clone(), projection, filters, 2, expected_output).await;
// should scan region 2 and 3
// select a, row_id from numbers where a < 40 and a >= 10
@@ -773,8 +813,23 @@ mod test {
binary_expr(col("a"), Operator::GtEq, lit(10)),
)
.into()];
exec_table_scan(table.clone(), projection, filters, None).await;
println!();
let expected_output = vec![
"+----+--------+",
"| a | row_id |",
"+----+--------+",
"| 10 | 1 |",
"| 11 | 2 |",
"| 12 | 3 |",
"| 13 | 4 |",
"| 14 | 5 |",
"| 30 | 1 |",
"| 31 | 2 |",
"| 32 | 3 |",
"| 33 | 4 |",
"| 34 | 5 |",
"+----+--------+",
];
exec_table_scan(table.clone(), projection, filters, 2, expected_output).await;
// should scan all regions
// select a, row_id from numbers where a < 1000 and row_id == 1
@@ -784,36 +839,59 @@ mod test {
binary_expr(col("row_id"), Operator::Eq, lit(1)),
)
.into()];
exec_table_scan(table.clone(), projection, filters, None).await;
let expected_output = vec![
"+-----+--------+",
"| a | row_id |",
"+-----+--------+",
"| 0 | 1 |",
"| 10 | 1 |",
"| 30 | 1 |",
"| 100 | 1 |",
"+-----+--------+",
];
exec_table_scan(table.clone(), projection, filters, 4, expected_output).await;
}
async fn exec_table_scan(
table: TableRef,
projection: Option<Vec<usize>>,
filters: Vec<Expr>,
limit: Option<usize>,
expected_partitions: usize,
expected_output: Vec<&str>,
) {
let table_scan = table
.scan(&projection, filters.as_slice(), limit)
.scan(&projection, filters.as_slice(), None)
.await
.unwrap();
assert_eq!(
table_scan.output_partitioning().partition_count(),
expected_partitions
);
for partition in 0..table_scan.output_partitioning().partition_count() {
let result = table_scan
.execute(partition, Arc::new(RuntimeEnv::default()))
.unwrap();
let recordbatches = util::collect(result).await.unwrap();
let merge =
CoalescePartitionsExec::new(Arc::new(DfPhysicalPlanAdapter(table_scan.clone())));
let df_recordbatch = recordbatches
.into_iter()
.map(|r| r.df_recordbatch)
.collect::<Vec<DfRecordBatch>>();
let sort = SortExec::try_new(
vec![PhysicalSortExpr {
expr: physical_col("a", table_scan.schema().arrow_schema()).unwrap(),
options: SortOptions::default(),
}],
Arc::new(merge),
)
.unwrap();
assert_eq!(sort.output_partitioning().partition_count(), 1);
println!("DataFusion partition {}:", partition);
let pretty_print = arrow_print::write(&df_recordbatch);
let pretty_print = pretty_print.lines().collect::<Vec<&str>>();
pretty_print.iter().for_each(|x| println!("{}", x));
}
let stream = sort
.execute(0, Arc::new(RuntimeEnv::default()))
.await
.unwrap();
let stream = Box::pin(RecordBatchStreamAdapter::try_new(stream).unwrap());
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(
recordbatches.pretty_print().lines().collect::<Vec<_>>(),
expected_output
);
}
async fn new_dist_table() -> DistTable {
@@ -824,52 +902,13 @@ mod test {
];
let schema = Arc::new(Schema::new(column_schemas.clone()));
let kv_store: KvStoreRef = Arc::new(MemStore::default()) as _;
let meta_srv =
meta_srv::mocks::mock(MetaSrvOptions::default(), kv_store.clone(), None).await;
let datanode_clients = Arc::new(DatanodeClients::new());
let mut datanode_instances = HashMap::new();
for datanode_id in 1..=4 {
let dn_instance = create_datanode_instance(datanode_id, meta_srv.clone()).await;
datanode_instances.insert(datanode_id, dn_instance.clone());
let (addr, client) = crate::tests::create_datanode_client(dn_instance).await;
datanode_clients
.insert_client(Peer::new(datanode_id, addr), client)
.await;
}
let MockInfo {
server_addr,
channel_manager,
} = meta_srv.clone();
let mut meta_client = MetaClientBuilder::new(1000, 0)
.enable_router()
.enable_store()
.channel_manager(channel_manager)
.build();
meta_client.start(&[&server_addr]).await.unwrap();
let meta_client = Arc::new(meta_client);
let (dist_instance, datanode_instances) = crate::tests::create_dist_instance().await;
let catalog_manager = dist_instance.catalog_manager();
let table_routes = catalog_manager.table_routes();
let datanode_clients = catalog_manager.datanode_clients();
let table_name = TableName::new("greptime", "public", "dist_numbers");
let meta_backend = Arc::new(MetaKvBackend {
client: meta_client.clone(),
});
let table_routes = Arc::new(TableRoutes::new(meta_client.clone()));
let catalog_manager = Arc::new(FrontendCatalogManager::new(
meta_backend,
table_routes.clone(),
datanode_clients.clone(),
));
let dist_instance = DistInstance::new(
meta_client.clone(),
catalog_manager,
datanode_clients.clone(),
);
let sql = "
CREATE TABLE greptime.public.dist_numbers (
ts BIGINT,
@@ -893,17 +932,16 @@ mod test {
_ => unreachable!(),
};
wait_datanodes_alive(kv_store).await;
let factory = DefaultCreateExprFactory {};
let mut expr = factory.create_expr_by_stmt(&create_table).await.unwrap();
let mut expr = DefaultCreateExprFactory
.create_expr_by_stmt(&create_table)
.await
.unwrap();
let _result = dist_instance
.create_table(&mut expr, create_table.partitions)
.await
.unwrap();
let table_route = table_routes.get_route(&table_name).await.unwrap();
println!("{}", serde_json::to_string_pretty(&table_route).unwrap());
let mut region_to_datanode_mapping = HashMap::new();
for region_route in table_route.region_routes.iter() {
@@ -948,20 +986,6 @@ mod test {
}
}
async fn wait_datanodes_alive(kv_store: KvStoreRef) {
let wait = 10;
for _ in 0..wait {
let datanodes = meta_srv::lease::alive_datanodes(1000, &kv_store, |_, _| true)
.await
.unwrap();
if datanodes.len() >= 4 {
return;
}
tokio::time::sleep(Duration::from_secs(1)).await
}
panic!()
}
async fn insert_testing_data(
table_name: &TableName,
dn_instance: Arc<Instance>,
@@ -1013,30 +1037,6 @@ mod test {
.unwrap();
}
async fn create_datanode_instance(datanode_id: u64, meta_srv: MockInfo) -> Arc<Instance> {
let current = common_time::util::current_time_millis();
let wal_tmp_dir =
TempDir::new_in("/tmp", &format!("dist_table_test-wal-{}", current)).unwrap();
let data_tmp_dir =
TempDir::new_in("/tmp", &format!("dist_table_test-data-{}", current)).unwrap();
let opts = DatanodeOptions {
node_id: Some(datanode_id),
wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
storage: ObjectStoreConfig::File {
data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
},
..Default::default()
};
let instance = Arc::new(
Instance::with_mock_meta_server(&opts, meta_srv)
.await
.unwrap(),
);
instance.start().await.unwrap();
instance
}
#[tokio::test(flavor = "multi_thread")]
async fn test_find_regions() {
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(

View File

@@ -12,17 +12,32 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use catalog::remote::MetaKvBackend;
use client::Client;
use common_grpc::channel_manager::ChannelManager;
use common_runtime::Builder as RuntimeBuilder;
use datanode::datanode::{DatanodeOptions, ObjectStoreConfig};
use datanode::instance::Instance as DatanodeInstance;
use meta_client::client::MetaClientBuilder;
use meta_client::rpc::Peer;
use meta_srv::metasrv::MetaSrvOptions;
use meta_srv::mocks::MockInfo;
use meta_srv::service::store::kv::KvStoreRef;
use meta_srv::service::store::memory::MemStore;
use servers::grpc::GrpcServer;
use tempdir::TempDir;
use tonic::transport::Server;
use tower::service_fn;
use crate::catalog::FrontendCatalogManager;
use crate::datanode::DatanodeClients;
use crate::instance::distributed::DistInstance;
use crate::instance::Instance;
use crate::table::route::TableRoutes;
async fn create_datanode_instance() -> Arc<DatanodeInstance> {
// TODO(LFC) Use real Mito engine when we can alter its region schema,
@@ -35,11 +50,10 @@ async fn create_datanode_instance() -> Arc<DatanodeInstance> {
pub(crate) async fn create_frontend_instance() -> Arc<Instance> {
let datanode_instance: Arc<DatanodeInstance> = create_datanode_instance().await;
let dn_catalog_manager = datanode_instance.catalog_manager().clone();
let (_, client) = create_datanode_client(datanode_instance).await;
Arc::new(Instance::with_client_and_catalog_manager(
client,
dn_catalog_manager,
))
let mut frontend_instance = Instance::new_standalone(datanode_instance);
frontend_instance.set_catalog_manager(dn_catalog_manager);
Arc::new(frontend_instance)
}
pub(crate) async fn create_datanode_client(
@@ -96,3 +110,91 @@ pub(crate) async fn create_datanode_client(
Client::with_manager_and_urls(channel_manager, vec![addr]),
)
}
async fn create_dist_datanode_instance(
datanode_id: u64,
meta_srv: MockInfo,
) -> Arc<DatanodeInstance> {
let current = common_time::util::current_time_millis();
let wal_tmp_dir = TempDir::new_in("/tmp", &format!("dist_datanode-wal-{}", current)).unwrap();
let data_tmp_dir = TempDir::new_in("/tmp", &format!("dist_datanode-data-{}", current)).unwrap();
let opts = DatanodeOptions {
node_id: Some(datanode_id),
wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
storage: ObjectStoreConfig::File {
data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
},
..Default::default()
};
let instance = Arc::new(
DatanodeInstance::with_mock_meta_server(&opts, meta_srv)
.await
.unwrap(),
);
instance.start().await.unwrap();
instance
}
async fn wait_datanodes_alive(kv_store: KvStoreRef) {
let wait = 10;
for _ in 0..wait {
let datanodes = meta_srv::lease::alive_datanodes(1000, &kv_store, |_, _| true)
.await
.unwrap();
if datanodes.len() >= 4 {
return;
}
tokio::time::sleep(Duration::from_secs(1)).await
}
panic!()
}
pub(crate) async fn create_dist_instance() -> (DistInstance, HashMap<u64, Arc<DatanodeInstance>>) {
let kv_store: KvStoreRef = Arc::new(MemStore::default()) as _;
let meta_srv = meta_srv::mocks::mock(MetaSrvOptions::default(), kv_store.clone(), None).await;
let datanode_clients = Arc::new(DatanodeClients::new());
let mut datanode_instances = HashMap::new();
for datanode_id in 1..=4 {
let dn_instance = create_dist_datanode_instance(datanode_id, meta_srv.clone()).await;
datanode_instances.insert(datanode_id, dn_instance.clone());
let (addr, client) = create_datanode_client(dn_instance).await;
datanode_clients
.insert_client(Peer::new(datanode_id, addr), client)
.await;
}
let MockInfo {
server_addr,
channel_manager,
} = meta_srv.clone();
let mut meta_client = MetaClientBuilder::new(1000, 0)
.enable_router()
.enable_store()
.channel_manager(channel_manager)
.build();
meta_client.start(&[&server_addr]).await.unwrap();
let meta_client = Arc::new(meta_client);
let meta_backend = Arc::new(MetaKvBackend {
client: meta_client.clone(),
});
let table_routes = Arc::new(TableRoutes::new(meta_client.clone()));
let catalog_manager = Arc::new(FrontendCatalogManager::new(
meta_backend,
table_routes.clone(),
datanode_clients.clone(),
));
wait_datanodes_alive(kv_store).await;
let dist_instance = DistInstance::new(
meta_client.clone(),
catalog_manager,
datanode_clients.clone(),
);
(dist_instance, datanode_instances)
}

View File

@@ -15,7 +15,6 @@
//! Builtin module contains GreptimeDB builtin udf/udaf
#[cfg(test)]
#[allow(clippy::print_stdout)]
mod test;
use datafusion_common::{DataFusionError, ScalarValue};

View File

@@ -18,6 +18,7 @@ use std::io::Read;
use std::path::Path;
use std::sync::Arc;
use common_telemetry::{error, info};
use datatypes::arrow::array::{Float64Array, Int64Array, PrimitiveArray};
use datatypes::arrow::compute::cast::CastOptions;
use datatypes::arrow::datatypes::DataType;
@@ -331,6 +332,8 @@ impl PyValue {
#[test]
fn run_builtin_fn_testcases() {
common_telemetry::init_default_ut_logging();
let loc = Path::new("src/python/builtins/testcases.ron");
let loc = loc.to_str().expect("Fail to parse path");
let mut file = File::open(loc).expect("Fail to open file");
@@ -343,7 +346,7 @@ fn run_builtin_fn_testcases() {
PyVector::make_class(&vm.ctx);
});
for (idx, case) in testcases.into_iter().enumerate() {
print!("Testcase {idx} ...");
info!("Testcase {idx} ...");
cached_vm
.enter(|vm| {
let scope = vm.new_scope_with_builtins();
@@ -368,7 +371,7 @@ fn run_builtin_fn_testcases() {
let err_res = format_py_error(e, vm).to_string();
match case.expect{
Ok(v) => {
println!("\nError:\n{err_res}");
error!("\nError:\n{err_res}");
panic!("Expect Ok: {v:?}, found Error");
},
Err(err) => {
@@ -397,7 +400,6 @@ fn run_builtin_fn_testcases() {
}
};
});
println!(" passed!");
}
}
@@ -443,6 +445,8 @@ fn set_lst_of_vecs_in_scope(
#[allow(unused_must_use)]
#[test]
fn test_vm() {
common_telemetry::init_default_ut_logging();
rustpython_vm::Interpreter::with_init(Default::default(), |vm| {
vm.add_native_module("udf_builtins", Box::new(greptime_builtin::make_module));
// this can be in `.enter()` closure, but for clearity, put it in the `with_init()`
@@ -471,11 +475,10 @@ sin(values)"#,
.map_err(|err| vm.new_syntax_error(&err))
.unwrap();
let res = vm.run_code_obj(code_obj, scope);
println!("{:#?}", res);
match res {
Err(e) => {
let err_res = format_py_error(e, vm).to_string();
println!("Error:\n{err_res}");
error!("Error:\n{err_res}");
}
Ok(obj) => {
let _ser = PyValue::from_py_obj(&obj, vm);

View File

@@ -20,6 +20,7 @@ use std::io::prelude::*;
use std::path::Path;
use std::sync::Arc;
use common_telemetry::{error, info};
use console::style;
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use datatypes::arrow::array::PrimitiveArray;
@@ -82,6 +83,8 @@ fn create_sample_recordbatch() -> DfRecordBatch {
/// and exec/parse (depending on the type of predicate) then decide if result is as expected
#[test]
fn run_ron_testcases() {
common_telemetry::init_default_ut_logging();
let loc = Path::new("src/python/testcases.ron");
let loc = loc.to_str().expect("Fail to parse path");
let mut file = File::open(loc).expect("Fail to open file");
@@ -89,9 +92,9 @@ fn run_ron_testcases() {
file.read_to_string(&mut buf)
.expect("Fail to read to string");
let testcases: Vec<TestCase> = from_ron_string(&buf).expect("Fail to convert to testcases");
println!("Read {} testcases from {}", testcases.len(), loc);
info!("Read {} testcases from {}", testcases.len(), loc);
for testcase in testcases {
print!(".ron test {}", testcase.name);
info!(".ron test {}", testcase.name);
match testcase.predicate {
Predicate::ParseIsOk { result } => {
let copr = parse_and_compile_copr(&testcase.code);
@@ -101,21 +104,19 @@ fn run_ron_testcases() {
}
Predicate::ParseIsErr { reason } => {
let copr = parse_and_compile_copr(&testcase.code);
if copr.is_ok() {
eprintln!("Expect to be err, found{copr:#?}");
panic!()
}
assert!(copr.is_err(), "Expect to be err, actual {copr:#?}");
let res = &copr.unwrap_err();
println!(
error!(
"{}",
pretty_print_error_in_src(&testcase.code, res, 0, "<embedded>")
);
let (res, _) = get_error_reason_loc(res);
if !res.contains(&reason) {
eprintln!("{}", testcase.code);
eprintln!("Parse Error, expect \"{reason}\" in \"{res}\", but not found.");
panic!()
}
assert!(
res.contains(&reason),
"{} Parse Error, expect \"{reason}\" in \"{res}\", actual not found.",
testcase.code,
);
}
Predicate::ExecIsOk { fields, columns } => {
let rb = create_sample_recordbatch();
@@ -129,28 +130,25 @@ fn run_ron_testcases() {
.iter()
.zip(&res.schema.arrow_schema().fields)
.map(|(anno, real)| {
if !(anno.datatype.clone().unwrap() == real.data_type
&& anno.is_nullable == real.is_nullable)
{
eprintln!("fields expect to be {anno:#?}, found to be {real:#?}.");
panic!()
}
assert!(
anno.datatype.clone().unwrap() == real.data_type
&& anno.is_nullable == real.is_nullable,
"Fields expected to be {anno:#?}, actual {real:#?}"
);
})
.count();
columns
.iter()
.zip(res.df_recordbatch.columns())
.map(|(anno, real)| {
if !(&anno.ty == real.data_type() && anno.len == real.len()) {
eprintln!(
"Unmatch type or length!Expect [{:#?}; {}], found [{:#?}; {}]",
anno.ty,
anno.len,
real.data_type(),
real.len()
);
panic!()
}
assert!(
&anno.ty == real.data_type() && anno.len == real.len(),
"Type or length not match! Expect [{:#?}; {}], actual [{:#?}; {}]",
anno.ty,
anno.len,
real.data_type(),
real.len()
);
})
.count();
}
@@ -159,28 +157,24 @@ fn run_ron_testcases() {
} => {
let rb = create_sample_recordbatch();
let res = coprocessor::exec_coprocessor(&testcase.code, &rb);
assert!(res.is_err(), "{:#?}\nExpect Err(...), actual Ok(...)", res);
if let Err(res) = res {
println!(
error!(
"{}",
pretty_print_error_in_src(&testcase.code, &res, 1120, "<embedded>")
);
let (reason, _) = get_error_reason_loc(&res);
if !reason.contains(&part_reason) {
eprintln!(
"{}\nExecute error, expect \"{reason}\" in \"{res}\", but not found.",
testcase.code,
reason = style(reason).green(),
res = style(res).red()
);
panic!()
}
} else {
eprintln!("{:#?}\nExpect Err(...), found Ok(...)", res);
panic!();
assert!(
reason.contains(&part_reason),
"{}\nExecute error, expect \"{reason}\" in \"{res}\", actual not found.",
testcase.code,
reason = style(reason).green(),
res = style(res).red()
)
}
}
}
println!(" ... {}", style("ok✅").green());
info!(" ... {}", style("ok✅").green());
}
}
@@ -275,7 +269,7 @@ def calc_rvs(open_time, close):
0,
"copr.py",
);
println!("{res}");
info!("{res}");
} else if let Ok(res) = ret {
dbg!(&res);
} else {
@@ -319,7 +313,7 @@ def a(cpu, mem):
0,
"copr.py",
);
println!("{res}");
info!("{res}");
} else if let Ok(res) = ret {
dbg!(&res);
} else {

View File

@@ -1039,6 +1039,7 @@ pub mod tests {
use std::sync::Arc;
use common_telemetry::info;
use datatypes::vectors::{Float32Vector, Int32Vector, NullVector};
use rustpython_vm::builtins::PyList;
use rustpython_vm::class::PyClassImpl;
@@ -1170,9 +1171,10 @@ pub mod tests {
}
#[test]
#[allow(clippy::print_stdout)]
// for debug purpose, also this is already a test function so allow print_stdout shouldn't be a problem?
fn test_execute_script() {
common_telemetry::init_default_ut_logging();
fn is_eq<T: std::cmp::PartialEq + rustpython_vm::TryFromObject>(
v: PyResult,
i: T,
@@ -1221,7 +1223,7 @@ pub mod tests {
for (code, pred) in snippet {
let result = execute_script(&interpreter, code, None, pred);
println!(
info!(
"\u{001B}[35m{code}\u{001B}[0m: {:?}{}",
result.clone().map(|v| v.0),
result

View File

@@ -30,6 +30,10 @@ impl Default for QueryContext {
}
impl QueryContext {
pub fn arc() -> QueryContextRef {
Arc::new(QueryContext::new())
}
pub fn new() -> Self {
Self {
current_schema: ArcSwapOption::new(None),

View File

@@ -29,8 +29,6 @@ use datanode::instance::{Instance, InstanceRef};
use datanode::sql::SqlHandler;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use frontend::frontend::FrontendOptions;
use frontend::grpc::GrpcOptions;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
use object_store::backend::s3;
use object_store::test_util::TempFolder;
@@ -215,8 +213,7 @@ pub async fn create_test_table(
}
async fn build_frontend_instance(datanode_instance: InstanceRef) -> FeInstance {
let fe_opts = FrontendOptions::default();
let mut frontend_instance = FeInstance::try_new(&fe_opts).await.unwrap();
let mut frontend_instance = FeInstance::new_standalone(datanode_instance.clone());
frontend_instance.set_catalog_manager(datanode_instance.catalog_manager().clone());
frontend_instance.set_script_handler(datanode_instance);
frontend_instance
@@ -262,19 +259,13 @@ pub async fn setup_test_app_with_frontend(
pub async fn setup_grpc_server(
store_type: StorageType,
name: &str,
) -> (String, TestGuard, Arc<GrpcServer>, Arc<GrpcServer>) {
) -> (String, TestGuard, Arc<GrpcServer>) {
common_telemetry::init_default_ut_logging();
let datanode_port = get_port();
let frontend_port = get_port();
let (mut opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name);
let datanode_grpc_addr = format!("127.0.0.1:{}", datanode_port);
opts.rpc_addr = datanode_grpc_addr.clone();
let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name);
let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap());
instance.start().await.unwrap();
let datanode_grpc_addr = datanode_grpc_addr.clone();
let runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(2)
@@ -283,26 +274,9 @@ pub async fn setup_grpc_server(
.unwrap(),
);
let fe_grpc_addr = format!("127.0.0.1:{}", frontend_port);
let fe_opts = FrontendOptions {
mode: Mode::Standalone,
datanode_rpc_addr: datanode_grpc_addr.clone(),
grpc_options: Some(GrpcOptions {
addr: fe_grpc_addr.clone(),
runtime_size: 8,
}),
..Default::default()
};
let fe_grpc_addr = format!("127.0.0.1:{}", get_port());
let datanode_grpc_server = Arc::new(GrpcServer::new(
instance.clone(),
instance.clone(),
runtime.clone(),
));
let mut fe_instance = frontend::instance::Instance::try_new(&fe_opts)
.await
.unwrap();
let mut fe_instance = frontend::instance::Instance::new_standalone(instance.clone());
fe_instance.set_catalog_manager(instance.catalog_manager().clone());
let fe_instance_ref = Arc::new(fe_instance);
@@ -319,15 +293,8 @@ pub async fn setup_grpc_server(
grpc_server_clone.start(addr).await.unwrap()
});
let dn_grpc_addr_clone = datanode_grpc_addr.clone();
let dn_grpc_server_clone = datanode_grpc_server.clone();
tokio::spawn(async move {
let addr = dn_grpc_addr_clone.parse::<SocketAddr>().unwrap();
dn_grpc_server_clone.start(addr).await.unwrap()
});
// wait for GRPC server to start
tokio::time::sleep(Duration::from_secs(1)).await;
(fe_grpc_addr, guard, fe_grpc_server, datanode_grpc_server)
(fe_grpc_addr, guard, fe_grpc_server)
}

View File

@@ -61,14 +61,13 @@ macro_rules! grpc_tests {
}
pub async fn test_auto_create_table(store_type: StorageType) {
let (addr, mut guard, fe_grpc_server, dn_grpc_server) =
let (addr, mut guard, fe_grpc_server) =
setup_grpc_server(store_type, "auto_create_table").await;
let grpc_client = Client::with_urls(vec![addr]);
let db = Database::new("greptime", grpc_client);
insert_and_assert(&db).await;
let _ = fe_grpc_server.shutdown().await;
let _ = dn_grpc_server.shutdown().await;
guard.remove_all().await;
}
@@ -128,7 +127,7 @@ fn expect_data() -> (Column, Column, Column, Column) {
pub async fn test_insert_and_select(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (addr, mut guard, fe_grpc_server, dn_grpc_server) =
let (addr, mut guard, fe_grpc_server) =
setup_grpc_server(store_type, "insert_and_select").await;
let grpc_client = Client::with_urls(vec![addr]);
@@ -173,7 +172,6 @@ pub async fn test_insert_and_select(store_type: StorageType) {
insert_and_assert(&db).await;
let _ = fe_grpc_server.shutdown().await;
let _ = dn_grpc_server.shutdown().await;
guard.remove_all().await;
}