diff --git a/Cargo.lock b/Cargo.lock index 8c6a62befa..dcdf4ecf6a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1389,6 +1389,7 @@ version = "0.1.0" dependencies = [ "anymap", "build-data", + "catalog", "clap 3.2.23", "client", "common-base", @@ -1403,11 +1404,15 @@ dependencies = [ "meta-client", "meta-srv", "nu-ansi-term", + "partition", + "query", "rexpect", "rustyline", "serde", "servers", + "session", "snafu", + "substrait 0.1.0", "tempdir", "tokio", "toml", diff --git a/src/client/src/database.rs b/src/client/src/database.rs index 737ca8492c..8b88728ca6 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -55,10 +55,18 @@ impl Database { } } + pub fn catalog(&self) -> &String { + &self.catalog + } + pub fn set_catalog(&mut self, catalog: impl Into) { self.catalog = catalog.into(); } + pub fn schema(&self) -> &String { + &self.schema + } + pub fn set_schema(&mut self, schema: impl Into) { self.schema = schema.into(); } diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index 2a0c891d45..97b2fead16 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -11,12 +11,14 @@ path = "src/bin/greptime.rs" [dependencies] anymap = "1.0.0-beta.2" +catalog = { path = "../catalog" } clap = { version = "3.1", features = ["derive"] } client = { path = "../client" } common-base = { path = "../common/base" } common-error = { path = "../common/error" } common-query = { path = "../common/query" } common-recordbatch = { path = "../common/recordbatch" } +substrait = { path = "../common/substrait" } common-telemetry = { path = "../common/telemetry", features = [ "deadlock_detection", ] } @@ -27,9 +29,12 @@ futures.workspace = true meta-client = { path = "../meta-client" } meta-srv = { path = "../meta-srv" } nu-ansi-term = "0.46" +partition = { path = "../partition" } +query = { path = "../query" } rustyline = "10.1" serde.workspace = true servers = { path = "../servers" } +session = { path = "../session" } snafu.workspace = true tokio.workspace = true toml = "0.5" diff --git a/src/cmd/src/cli.rs b/src/cmd/src/cli.rs index 6de7a91a39..dbabaf8e60 100644 --- a/src/cmd/src/cli.rs +++ b/src/cmd/src/cli.rs @@ -50,13 +50,15 @@ impl SubCommand { pub(crate) struct AttachCommand { #[clap(long)] pub(crate) grpc_addr: String, + #[clap(long)] + pub(crate) meta_addr: Option, #[clap(long, action)] pub(crate) disable_helper: bool, } impl AttachCommand { async fn run(self) -> Result<()> { - let mut repl = Repl::try_new(&self)?; + let mut repl = Repl::try_new(&self).await?; repl.run().await } } diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs index ae0e4d0c46..79f3c38864 100644 --- a/src/cmd/src/cli/repl.rs +++ b/src/cmd/src/cli/repl.rs @@ -13,24 +13,39 @@ // limitations under the License. use std::path::PathBuf; +use std::sync::Arc; use std::time::Instant; +use catalog::remote::MetaKvBackend; use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::prelude::ErrorExt; use common_query::Output; use common_recordbatch::RecordBatches; use common_telemetry::logging; use either::Either; +use frontend::catalog::FrontendCatalogManager; +use frontend::datanode::DatanodeClients; +use meta_client::client::MetaClientBuilder; +use partition::manager::PartitionRuleManager; +use partition::route::TableRoutes; +use query::datafusion::DatafusionQueryEngine; +use query::logical_optimizer::LogicalOptimizer; +use query::parser::QueryLanguageParser; +use query::plan::LogicalPlan; +use query::QueryEngine; use rustyline::error::ReadlineError; use rustyline::Editor; +use session::context::QueryContext; use snafu::{ErrorCompat, ResultExt}; +use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use crate::cli::cmd::ReplCommand; use crate::cli::helper::RustylineHelper; use crate::cli::AttachCommand; use crate::error::{ - CollectRecordBatchesSnafu, PrettyPrintRecordBatchesSnafu, ReadlineSnafu, ReplCreationSnafu, - RequestDatabaseSnafu, Result, + CollectRecordBatchesSnafu, ParseSqlSnafu, PlanStatementSnafu, PrettyPrintRecordBatchesSnafu, + ReadlineSnafu, ReplCreationSnafu, RequestDatabaseSnafu, Result, StartMetaClientSnafu, + SubstraitEncodeLogicalPlanSnafu, }; /// Captures the state of the repl, gathers commands and executes them one by one @@ -43,6 +58,8 @@ pub(crate) struct Repl { /// Client for interacting with GreptimeDB database: Database, + + query_engine: Option, } #[allow(clippy::print_stdout)] @@ -51,7 +68,7 @@ impl Repl { println!("{}", ReplCommand::help()) } - pub(crate) fn try_new(cmd: &AttachCommand) -> Result { + pub(crate) async fn try_new(cmd: &AttachCommand) -> Result { let mut rl = Editor::new().context(ReplCreationSnafu)?; if !cmd.disable_helper { @@ -69,10 +86,17 @@ impl Repl { let client = Client::with_urls([&cmd.grpc_addr]); let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); + let query_engine = if let Some(meta_addr) = &cmd.meta_addr { + create_query_engine(meta_addr).await.map(Some)? + } else { + None + }; + Ok(Self { rl, prompt: "> ".to_string(), database, + query_engine, }) } @@ -134,11 +158,29 @@ impl Repl { async fn do_execute_sql(&self, sql: String) -> Result<()> { let start = Instant::now(); - let output = self - .database - .sql(&sql) - .await - .context(RequestDatabaseSnafu { sql: &sql })?; + let output = if let Some(query_engine) = &self.query_engine { + let stmt = QueryLanguageParser::parse_sql(&sql) + .with_context(|_| ParseSqlSnafu { sql: sql.clone() })?; + + let query_ctx = Arc::new(QueryContext::with( + self.database.catalog(), + self.database.schema(), + )); + let LogicalPlan::DfPlan(plan) = query_engine + .statement_to_plan(stmt, query_ctx) + .await + .and_then(|x| query_engine.optimize(&x)) + .context(PlanStatementSnafu)?; + + let plan = DFLogicalSubstraitConvertor {} + .encode(plan) + .context(SubstraitEncodeLogicalPlanSnafu)?; + + self.database.logical_plan(plan.to_vec()).await + } else { + self.database.sql(&sql).await + } + .context(RequestDatabaseSnafu { sql: &sql })?; let either = match output { Output::Stream(s) => { @@ -197,3 +239,29 @@ fn history_file() -> PathBuf { buf.push(".greptimedb_cli_history"); buf } + +async fn create_query_engine(meta_addr: &str) -> Result { + let mut meta_client = MetaClientBuilder::default().enable_store().build(); + meta_client + .start([meta_addr]) + .await + .context(StartMetaClientSnafu)?; + let meta_client = Arc::new(meta_client); + + let backend = Arc::new(MetaKvBackend { + client: meta_client.clone(), + }); + + let table_routes = Arc::new(TableRoutes::new(meta_client)); + let partition_manager = Arc::new(PartitionRuleManager::new(table_routes)); + + let datanode_clients = Arc::new(DatanodeClients::default()); + + let catalog_list = Arc::new(FrontendCatalogManager::new( + backend, + partition_manager, + datanode_clients, + )); + + Ok(DatafusionQueryEngine::new(catalog_list, Default::default())) +} diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index 209f41b1a1..9a7c083c75 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -103,6 +103,31 @@ pub enum Error { #[snafu(backtrace)] source: common_recordbatch::error::Error, }, + + #[snafu(display("Failed to start Meta client, source: {}", source))] + StartMetaClient { + #[snafu(backtrace)] + source: meta_client::error::Error, + }, + + #[snafu(display("Failed to parse SQL: {}, source: {}", sql, source))] + ParseSql { + sql: String, + #[snafu(backtrace)] + source: query::error::Error, + }, + + #[snafu(display("Failed to plan statement, source: {}", source))] + PlanStatement { + #[snafu(backtrace)] + source: query::error::Error, + }, + + #[snafu(display("Failed to encode logical plan in substrait, source: {}", source))] + SubstraitEncodeLogicalPlan { + #[snafu(backtrace)] + source: substrait::error::Error, + }, } pub type Result = std::result::Result; @@ -126,6 +151,11 @@ impl ErrorExt for Error { Error::CollectRecordBatches { source } | Error::PrettyPrintRecordBatches { source } => { source.status_code() } + Error::StartMetaClient { source } => source.status_code(), + Error::ParseSql { source, .. } | Error::PlanStatement { source } => { + source.status_code() + } + Error::SubstraitEncodeLogicalPlan { source } => source.status_code(), } } diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index f4d50c3f05..f61b7412f2 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -46,7 +46,7 @@ pub struct FrontendCatalogManager { } impl FrontendCatalogManager { - pub(crate) fn new( + pub fn new( backend: KvBackendRef, partition_manager: PartitionRuleManagerRef, datanode_clients: Arc, diff --git a/src/frontend/src/datanode.rs b/src/frontend/src/datanode.rs index c6b8efdbf2..ea496d37d5 100644 --- a/src/frontend/src/datanode.rs +++ b/src/frontend/src/datanode.rs @@ -19,13 +19,13 @@ use common_grpc::channel_manager::ChannelManager; use meta_client::rpc::Peer; use moka::future::{Cache, CacheBuilder}; -pub(crate) struct DatanodeClients { +pub struct DatanodeClients { channel_manager: ChannelManager, clients: Cache, } -impl DatanodeClients { - pub(crate) fn new() -> Self { +impl Default for DatanodeClients { + fn default() -> Self { Self { channel_manager: ChannelManager::new(), clients: CacheBuilder::new(1024) @@ -34,7 +34,9 @@ impl DatanodeClients { .build(), } } +} +impl DatanodeClients { pub(crate) async fn get_client(&self, datanode: &Peer) -> Client { self.clients .get_with_by_ref(datanode, async move { diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 275ab27918..3448a63870 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -119,7 +119,7 @@ impl Instance { }); let table_routes = Arc::new(TableRoutes::new(meta_client.clone())); let partition_manager = Arc::new(PartitionRuleManager::new(table_routes)); - let datanode_clients = Arc::new(DatanodeClients::new()); + let datanode_clients = Arc::new(DatanodeClients::default()); let catalog_manager = Arc::new(FrontendCatalogManager::new( meta_backend, diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index ea9fd25d70..4c33577f2c 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -14,8 +14,8 @@ #![feature(assert_matches)] -mod catalog; -mod datanode; +pub mod catalog; +pub mod datanode; pub mod error; mod expr_factory; pub mod frontend; diff --git a/src/frontend/src/tests.rs b/src/frontend/src/tests.rs index b67e68cee8..91fb322528 100644 --- a/src/frontend/src/tests.rs +++ b/src/frontend/src/tests.rs @@ -210,7 +210,7 @@ pub(crate) async fn create_distributed_instance(test_name: &str) -> MockDistribu let kv_store: KvStoreRef = Arc::new(MemStore::default()) as _; let meta_srv = meta_srv::mocks::mock(MetaSrvOptions::default(), kv_store.clone(), None).await; - let datanode_clients = Arc::new(DatanodeClients::new()); + let datanode_clients = Arc::new(DatanodeClients::default()); let mut test_guards = vec![]; diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index f7dde0a2e6..53ab61c611 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -58,7 +58,7 @@ use crate::plan::LogicalPlan; use crate::query_engine::{QueryEngineContext, QueryEngineState}; use crate::{metric, QueryEngine}; -pub(crate) struct DatafusionQueryEngine { +pub struct DatafusionQueryEngine { state: QueryEngineState, } @@ -145,14 +145,14 @@ impl QueryEngine for DatafusionQueryEngine { // TODO(sunng87): consider cache optmised logical plan between describe // and execute let plan = self.statement_to_plan(stmt, query_ctx).await?; - let mut ctx = QueryEngineContext::new(self.state.session_state()); - let optimised_plan = self.optimize_logical_plan(&mut ctx, &plan)?; + let optimised_plan = self.optimize(&plan)?; optimised_plan.schema() } async fn execute(&self, plan: &LogicalPlan) -> Result { + let logical_plan = self.optimize(plan)?; + let mut ctx = QueryEngineContext::new(self.state.session_state()); - let logical_plan = self.optimize_logical_plan(&mut ctx, plan)?; let physical_plan = self.create_physical_plan(&mut ctx, &logical_plan).await?; let physical_plan = self.optimize_physical_plan(&mut ctx, physical_plan)?; @@ -185,16 +185,13 @@ impl QueryEngine for DatafusionQueryEngine { } impl LogicalOptimizer for DatafusionQueryEngine { - fn optimize_logical_plan( - &self, - ctx: &mut QueryEngineContext, - plan: &LogicalPlan, - ) -> Result { + fn optimize(&self, plan: &LogicalPlan) -> Result { let _timer = timer!(metric::METRIC_OPTIMIZE_LOGICAL_ELAPSED); match plan { LogicalPlan::DfPlan(df_plan) => { - let state = ctx.state(); - let optimized_plan = state + let optimized_plan = self + .state + .session_state() .optimize(df_plan) .context(error::DatafusionSnafu { msg: "Fail to optimize logical plan", diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs index 02fce7b982..689e36b1a8 100644 --- a/src/query/src/lib.rs +++ b/src/query/src/lib.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod datafusion; +pub mod datafusion; pub mod error; pub mod executor; mod function; diff --git a/src/query/src/logical_optimizer.rs b/src/query/src/logical_optimizer.rs index 5e3db6d8fe..97e5a70d4a 100644 --- a/src/query/src/logical_optimizer.rs +++ b/src/query/src/logical_optimizer.rs @@ -14,12 +14,7 @@ use crate::error::Result; use crate::plan::LogicalPlan; -use crate::query_engine::QueryEngineContext; pub trait LogicalOptimizer { - fn optimize_logical_plan( - &self, - ctx: &mut QueryEngineContext, - plan: &LogicalPlan, - ) -> Result; + fn optimize(&self, plan: &LogicalPlan) -> Result; }