feat: distributed query in Frontend (#486)

* feat: distributed query in Frontend

Co-authored-by: luofucong <luofucong@greptime.com>
This commit is contained in:
LFC
2022-11-14 18:15:49 +08:00
committed by GitHub
parent dcd5e34dbd
commit d10e45f4aa
15 changed files with 74 additions and 31 deletions

View File

@@ -96,7 +96,7 @@ impl Instance {
}
};
let query_engine = factory.query_engine().clone();
let query_engine = factory.query_engine();
let script_executor =
ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?;

View File

@@ -44,7 +44,7 @@ impl Instance {
);
let factory = QueryEngineFactory::new(catalog_manager.clone());
let query_engine = factory.query_engine().clone();
let query_engine = factory.query_engine();
let sql_handler = SqlHandler::new(mock_engine.clone(), catalog_manager.clone());
let physical_planner = PhysicalPlanner::new(query_engine.clone());
@@ -100,7 +100,7 @@ impl Instance {
));
let factory = QueryEngineFactory::new(catalog_manager.clone());
let query_engine = factory.query_engine().clone();
let query_engine = factory.query_engine();
let script_executor =
ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?;

View File

@@ -214,7 +214,7 @@ mod tests {
.unwrap(),
);
let factory = QueryEngineFactory::new(catalog_list.clone());
let query_engine = factory.query_engine().clone();
let query_engine = factory.query_engine();
let sql_handler = SqlHandler::new(table_engine, catalog_list);
let stmt = match query_engine.sql_to_statement(sql).unwrap() {

View File

@@ -343,6 +343,13 @@ pub enum Error {
#[snafu(backtrace)]
source: client::Error,
},
#[snafu(display("Failed to execute sql: {}, source: {}", sql, source))]
ExecuteSql {
sql: String,
#[snafu(backtrace)]
source: query::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -415,6 +422,8 @@ impl ErrorExt for Error {
Error::Select { source, .. } => source.status_code(),
Error::FindNewColumnsOnInsertion { source, .. } => source.status_code(),
Error::DeserializeInsertBatch { source, .. } => source.status_code(),
Error::ExecuteSql { source, .. } => source.status_code(),
}
}

View File

@@ -145,13 +145,13 @@ impl Instance {
});
let table_routes = Arc::new(TableRoutes::new(meta_client.clone()));
let datanode_clients = Arc::new(DatanodeClients::new());
let catalog_manager = FrontendCatalogManager::new(
let catalog_manager = Arc::new(FrontendCatalogManager::new(
meta_backend,
table_routes,
datanode_clients.clone(),
);
));
instance.catalog_manager = Some(Arc::new(catalog_manager.clone()));
instance.catalog_manager = Some(catalog_manager.clone());
Some(DistInstance::new(
meta_client,
@@ -178,11 +178,15 @@ impl Instance {
}
pub async fn handle_select(&self, expr: Select) -> Result<Output> {
self.database()
.select(expr)
.await
.and_then(Output::try_from)
.context(SelectSnafu)
if let Some(dist_instance) = &self.dist_instance {
dist_instance.handle_select(expr).await
} else {
self.database()
.select(expr)
.await
.and_then(Output::try_from)
.context(SelectSnafu)
}
}
/// Convert `CreateTable` statement to `CreateExpr` gRPC request.

View File

@@ -3,6 +3,7 @@ use std::sync::Arc;
use chrono::DateTime;
use client::admin::{admin_result_to_output, Admin};
use client::Select;
use common_catalog::{TableGlobalKey, TableGlobalValue};
use common_query::Output;
use common_telemetry::debug;
@@ -12,6 +13,7 @@ use meta_client::rpc::{
CreateRequest as MetaCreateRequest, Partition as MetaPartition, RouteResponse, TableName,
TableRoute,
};
use query::{QueryEngineFactory, QueryEngineRef};
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::create::CreateTable;
use sql::statements::{
@@ -31,20 +33,23 @@ use crate::partitioning::{PartitionBound, PartitionDef};
#[derive(Clone)]
pub(crate) struct DistInstance {
meta_client: Arc<MetaClient>,
catalog_manager: FrontendCatalogManager,
catalog_manager: Arc<FrontendCatalogManager>,
datanode_clients: Arc<DatanodeClients>,
query_engine: QueryEngineRef,
}
impl DistInstance {
pub(crate) fn new(
meta_client: Arc<MetaClient>,
catalog_manager: FrontendCatalogManager,
catalog_manager: Arc<FrontendCatalogManager>,
datanode_clients: Arc<DatanodeClients>,
) -> Self {
let query_engine = QueryEngineFactory::new(catalog_manager.clone()).query_engine();
Self {
meta_client,
catalog_manager,
datanode_clients,
query_engine,
}
}
@@ -96,6 +101,18 @@ impl DistInstance {
Ok(Output::AffectedRows(region_routes.len()))
}
pub(crate) async fn handle_select(&self, select: Select) -> Result<Output> {
let Select::Sql(sql) = select;
let plan = self
.query_engine
.sql_to_plan(&sql)
.with_context(|_| error::ExecuteSqlSnafu { sql: sql.clone() })?;
self.query_engine
.execute(&plan)
.await
.context(error::ExecuteSqlSnafu { sql })
}
async fn create_table_in_meta(&self, create_table: &CreateTable) -> Result<RouteResponse> {
let (catalog, schema, table) =
table_idents_to_full_name(&create_table.name).context(error::ParseSqlSnafu)?;

View File

@@ -434,6 +434,8 @@ impl PartitionExec {
#[allow(clippy::print_stdout)]
#[cfg(test)]
mod test {
use std::time::Duration;
use api::v1::codec::InsertBatch;
use api::v1::column::SemanticType;
use api::v1::{column, insert_expr, Column, ColumnDataType};
@@ -652,8 +654,6 @@ mod test {
}
#[tokio::test(flavor = "multi_thread")]
// FIXME(LFC): Remove ignore when auto create table upon insertion is ready.
#[ignore]
async fn test_dist_table_scan() {
common_telemetry::init_default_ut_logging();
let table = Arc::new(new_dist_table().await);
@@ -774,11 +774,11 @@ mod test {
client: meta_client.clone(),
});
let table_routes = Arc::new(TableRoutes::new(meta_client.clone()));
let catalog_manager = FrontendCatalogManager::new(
let catalog_manager = Arc::new(FrontendCatalogManager::new(
meta_backend,
table_routes.clone(),
datanode_clients.clone(),
);
));
let dist_instance = DistInstance::new(
meta_client.clone(),
catalog_manager,
@@ -807,6 +807,9 @@ mod test {
Statement::CreateTable(c) => c,
_ => unreachable!(),
};
wait_datanodes_alive(kv_store).await;
let _result = dist_instance.create_table(&create_table).await.unwrap();
let table_route = table_routes.get_route(&table_name).await.unwrap();
@@ -844,6 +847,20 @@ mod test {
}
}
async fn wait_datanodes_alive(kv_store: KvStoreRef) {
let wait = 10;
for _ in 0..wait {
let datanodes = meta_srv::lease::alive_datanodes(1000, &kv_store, |_, _| true)
.await
.unwrap();
if datanodes.len() >= 4 {
return;
}
tokio::time::sleep(Duration::from_secs(1)).await
}
panic!()
}
async fn insert_testing_data(
table_name: &TableName,
dn_instance: Arc<Instance>,

View File

@@ -3,7 +3,7 @@ pub mod bootstrap;
pub mod error;
pub mod handler;
mod keys;
mod lease;
pub mod lease;
pub mod metasrv;
#[cfg(feature = "mock")]
pub mod mocks;

View File

@@ -259,8 +259,7 @@ mod tests {
.register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog)
.unwrap();
let factory = QueryEngineFactory::new(catalog_list);
factory.query_engine().clone()
QueryEngineFactory::new(catalog_list).query_engine()
}
#[test]

View File

@@ -59,8 +59,8 @@ impl QueryEngineFactory {
}
impl QueryEngineFactory {
pub fn query_engine(&self) -> &Arc<dyn QueryEngine> {
&self.query_engine
pub fn query_engine(&self) -> QueryEngineRef {
self.query_engine.clone()
}
}

View File

@@ -53,8 +53,7 @@ pub fn create_query_engine() -> Arc<dyn QueryEngine> {
.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider)
.unwrap();
let factory = QueryEngineFactory::new(catalog_list);
factory.query_engine().clone()
QueryEngineFactory::new(catalog_list).query_engine()
}
pub async fn get_numbers_from_table<'s, T>(

View File

@@ -143,6 +143,5 @@ fn create_correctness_engine() -> Arc<dyn QueryEngine> {
.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider)
.unwrap();
let factory = QueryEngineFactory::new(catalog_list);
factory.query_engine().clone()
QueryEngineFactory::new(catalog_list).query_engine()
}

View File

@@ -216,8 +216,7 @@ fn create_query_engine() -> Arc<dyn QueryEngine> {
.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider)
.unwrap();
let factory = QueryEngineFactory::new(catalog_list);
factory.query_engine().clone()
QueryEngineFactory::new(catalog_list).query_engine()
}
async fn get_numbers_from_table<'s, T>(

View File

@@ -124,7 +124,7 @@ mod tests {
);
let factory = QueryEngineFactory::new(catalog_manager.clone());
let query_engine = factory.query_engine().clone();
let query_engine = factory.query_engine();
let mgr = ScriptManager::new(catalog_manager.clone(), query_engine)
.await
.unwrap();

View File

@@ -80,6 +80,6 @@ fn create_testing_sql_query_handler(table: MemTable) -> SqlQueryHandlerRef {
.unwrap();
let factory = QueryEngineFactory::new(catalog_list);
let query_engine = factory.query_engine().clone();
let query_engine = factory.query_engine();
Arc::new(DummyInstance::new(query_engine))
}