From 4aaf6aa51b5bed3cc7845e7855186f85617fa7f8 Mon Sep 17 00:00:00 2001
From: Ruihang Xia
Date: Thu, 17 Aug 2023 19:02:31 +0800
Subject: [PATCH] feat: implement query API for RegionServer (#2197)

* some initial change

Signed-off-by: Ruihang Xia

* impl dummy structs

Signed-off-by: Ruihang Xia

* decode and send logical plan

Signed-off-by: Ruihang Xia

* implement table scan

Signed-off-by: Ruihang Xia

* add some comments

Signed-off-by: Ruihang Xia

---------

Signed-off-by: Ruihang Xia
---
 Cargo.lock                                  |   3 +-
 Cargo.toml                                  |   2 +-
 src/catalog/src/information_schema.rs       |  16 +-
 src/catalog/src/lib.rs                      |   1 -
 src/catalog/src/table_factory.rs            |  19 --
 src/common/meta/src/key/table_info.rs       |   4 +-
 src/common/meta/src/key/table_route.rs      |   2 +-
 src/common/recordbatch/src/adapter.rs       |   6 +-
 src/datanode/Cargo.toml                     |   1 +
 src/datanode/src/error.rs                   |  28 ++-
 src/datanode/src/region_server.rs           | 225 ++++++++++++++++++--
 src/frontend/src/server.rs                  |   2 +-
 src/query/src/plan.rs                       |   6 +
 src/{table => store-api}/src/data_source.rs |   8 +-
 src/store-api/src/lib.rs                    |   1 +
 src/store-api/src/region_engine.rs          |  11 +-
 src/table/src/lib.rs                        |   1 -
 src/table/src/table/scan.rs                 |   2 +-
 tests-integration/src/test_util.rs          |   2 +-
 19 files changed, 274 insertions(+), 66 deletions(-)
 delete mode 100644 src/catalog/src/table_factory.rs
 rename src/{table => store-api}/src/data_source.rs (86%)
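In outline, the read path this patch wires up: the caller ships a substrait-encoded logical plan in a `QueryRequest`, and the datanode's `RegionServer` decodes and executes it against a single region. A minimal caller-side sketch, assuming `QueryRequest.plan` is the `Vec<u8>` suggested by the `Bytes::from(plan)` call in the diff and that `RegionId::as_u64` is available; `query_region` is a hypothetical helper inside the datanode crate, not part of the patch:

```rust
use api::v1::region::QueryRequest;
use common_recordbatch::SendableRecordBatchStream;
use store_api::storage::RegionId;

use crate::error::Result;
use crate::region_server::RegionServer;

// Hypothetical helper: `plan_bytes` must already hold a substrait-encoded
// logical plan (see the encode-side sketch further down).
async fn query_region(
    server: &RegionServer,
    region_id: RegionId,
    plan_bytes: Vec<u8>,
) -> Result<SendableRecordBatchStream> {
    let request = QueryRequest {
        region_id: region_id.as_u64(),
        plan: plan_bytes,
    };
    // Resolves the region's engine, decodes the plan against a dummy
    // catalog, executes it, and hands back the record batch stream.
    server.handle_read(request).await
}
```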
diff --git a/Cargo.lock b/Cargo.lock
index c8a522ee35..bba4a44998 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2629,6 +2629,7 @@ dependencies = [
  "axum",
  "axum-macros",
  "axum-test-helper",
+ "bytes",
  "catalog",
  "client",
  "common-base",
@@ -4137,7 +4138,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=fa79839bbb304e0e7097e1cc4bcac9a63b3e496a#fa79839bbb304e0e7097e1cc4bcac9a63b3e496a"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=c30a2607be4044502094b25c408171a666a8ff6d#c30a2607be4044502094b25c408171a666a8ff6d"
 dependencies = [
  "prost",
  "serde",
diff --git a/Cargo.toml b/Cargo.toml
index 134caa88e8..16054c93b1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -77,7 +77,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git
 derive_builder = "0.12"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "fa79839bbb304e0e7097e1cc4bcac9a63b3e496a" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "c30a2607be4044502094b25c408171a666a8ff6d" }
 itertools = "0.10"
 lazy_static = "1.4"
 once_cell = "1.18"
diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs
index 43ae228854..699914c1a2 100644
--- a/src/catalog/src/information_schema.rs
+++ b/src/catalog/src/information_schema.rs
@@ -29,8 +29,8 @@ use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream};
 use datatypes::schema::SchemaRef;
 use futures_util::StreamExt;
 use snafu::ResultExt;
+use store_api::data_source::{DataSource, TableFactory};
 use store_api::storage::{ScanRequest, TableId};
-use table::data_source::DataSource;
 use table::error::{SchemaConversionSnafu, TablesRecordBatchSnafu};
 use table::metadata::{TableIdent, TableInfoBuilder, TableMetaBuilder, TableType};
 use table::{Result as TableResult, Table, TableRef};
@@ -38,7 +38,6 @@ use table::{Result as TableResult, Table, TableRef};

 use self::columns::InformationSchemaColumns;
 use crate::error::Result;
 use crate::information_schema::tables::InformationSchemaTables;
-use crate::table_factory::TableFactory;
 use crate::CatalogManager;

 pub const TABLES: &str = "tables";
@@ -219,18 +218,22 @@ impl Table for InformationTable {
     }

     async fn scan_to_stream(&self, request: ScanRequest) -> TableResult<SendableRecordBatchStream> {
-        self.get_stream(request)
+        self.get_stream(request).context(TablesRecordBatchSnafu)
     }
 }

 impl DataSource for InformationTable {
-    fn get_stream(&self, request: ScanRequest) -> TableResult<SendableRecordBatchStream> {
+    fn get_stream(
+        &self,
+        request: ScanRequest,
+    ) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
         let projection = request.projection;
         let projected_schema = if let Some(projection) = &projection {
             Arc::new(
                 self.schema()
                     .try_project(projection)
-                    .context(SchemaConversionSnafu)?,
+                    .context(SchemaConversionSnafu)
+                    .map_err(BoxedError::new)?,
             )
         } else {
             self.schema()
@@ -239,7 +242,8 @@ impl DataSource for InformationTable {
             .stream_builder
             .to_stream()
             .map_err(BoxedError::new)
-            .context(TablesRecordBatchSnafu)?
+            .context(TablesRecordBatchSnafu)
+            .map_err(BoxedError::new)?
             .map(move |batch| {
                 batch.and_then(|batch| {
                     if let Some(projection) = &projection {
diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs
index 1805aef59a..e0f4a4e860 100644
--- a/src/catalog/src/lib.rs
+++ b/src/catalog/src/lib.rs
@@ -37,7 +37,6 @@ pub mod local;
 mod metrics;
 pub mod remote;
 pub mod system;
-pub mod table_factory;
 pub mod table_source;
 pub mod tables;

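With `DataSource` moved into `store-api` and returning `Result<SendableRecordBatchStream, BoxedError>`, implementors box their crate-local errors at the trait boundary, which is what the doubled `map_err(BoxedError::new)` above is doing. A toy implementor as a sketch, not part of the patch, assuming `RecordBatches::try_new` and `as_stream` behave as they do elsewhere in this codebase:

```rust
use std::sync::Arc;

use common_error::ext::BoxedError;
use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
use datatypes::schema::Schema;
use store_api::data_source::DataSource;
use store_api::storage::ScanRequest;

// Hypothetical source that yields no rows. Whatever the concrete error
// type is, it gets boxed at the trait boundary, so store-api stays
// error-agnostic and no implementor error type leaks upward.
struct EmptySource;

impl DataSource for EmptySource {
    fn get_stream(
        &self,
        _request: ScanRequest,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        let schema = Arc::new(Schema::new(vec![]));
        let batches = RecordBatches::try_new(schema, vec![]).map_err(BoxedError::new)?;
        Ok(batches.as_stream())
    }
}
```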
diff --git a/src/catalog/src/table_factory.rs b/src/catalog/src/table_factory.rs
deleted file mode 100644
index dc34e7e4c0..0000000000
--- a/src/catalog/src/table_factory.rs
+++ /dev/null
@@ -1,19 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::sync::Arc;
-
-use table::data_source::DataSourceRef;
-
-pub type TableFactory = Arc<dyn Fn() -> DataSourceRef>;
diff --git a/src/common/meta/src/key/table_info.rs b/src/common/meta/src/key/table_info.rs
index c32cf70ef8..03ba8c5be7 100644
--- a/src/common/meta/src/key/table_info.rs
+++ b/src/common/meta/src/key/table_info.rs
@@ -157,7 +157,7 @@ impl TableInfoManager {
             .when(vec![Compare::with_value(
                 raw_key.clone(),
                 CompareOp::Equal,
-                raw_value.clone(),
+                raw_value,
             )])
             .and_then(vec![TxnOp::Put(
                 raw_key.clone(),
@@ -180,7 +180,7 @@ impl TableInfoManager {
         let removed_key = to_removed_key(&String::from_utf8_lossy(&raw_key));

         let txn = Txn::new().and_then(vec![
-            TxnOp::Delete(raw_key.clone()),
+            TxnOp::Delete(raw_key),
             TxnOp::Put(removed_key.into_bytes(), raw_value),
         ]);
diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs
index 7a75c8fc28..41613fd893 100644
--- a/src/common/meta/src/key/table_route.rs
+++ b/src/common/meta/src/key/table_route.rs
@@ -142,7 +142,7 @@ impl TableRouteManager {
             .when(vec![Compare::with_value(
                 raw_key.clone(),
                 CompareOp::Equal,
-                raw_value.clone(),
+                raw_value,
             )])
             .and_then(vec![TxnOp::Put(raw_key.clone(), new_raw_value)])
             .or_else(vec![TxnOp::Get(raw_key.clone())]);
diff --git a/src/common/recordbatch/src/adapter.rs b/src/common/recordbatch/src/adapter.rs
index 6790bd39cf..a4ed408c67 100644
--- a/src/common/recordbatch/src/adapter.rs
+++ b/src/common/recordbatch/src/adapter.rs
@@ -75,7 +75,8 @@ impl Stream for ParquetRecordBatchStreamAdapter
     }
 }

-/// Greptime SendableRecordBatchStream -> DataFusion RecordBatchStream
+/// Greptime SendableRecordBatchStream -> DataFusion RecordBatchStream.
+/// The reverse one is [RecordBatchStreamAdapter].
 pub struct DfRecordBatchStreamAdapter {
     stream: SendableRecordBatchStream,
 }
@@ -112,7 +113,8 @@ impl Stream for DfRecordBatchStreamAdapter {
     }
 }

-/// DataFusion [SendableRecordBatchStream](DfSendableRecordBatchStream) -> Greptime [RecordBatchStream]
+/// DataFusion [SendableRecordBatchStream](DfSendableRecordBatchStream) -> Greptime [RecordBatchStream].
+/// The reverse one is [DfRecordBatchStreamAdapter]
 pub struct RecordBatchStreamAdapter {
     schema: SchemaRef,
     stream: DfSendableRecordBatchStream,
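The two adapters cross-referenced in the new doc comments convert in opposite directions. A round-trip sketch, under the assumption that the adapters expose the `new`/`try_new` constructors used elsewhere in the codebase:

```rust
use common_recordbatch::adapter::{DfRecordBatchStreamAdapter, RecordBatchStreamAdapter};
use common_recordbatch::SendableRecordBatchStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;

// Greptime stream -> DataFusion stream (infallible wrap).
fn to_datafusion(stream: SendableRecordBatchStream) -> DfSendableRecordBatchStream {
    Box::pin(DfRecordBatchStreamAdapter::new(stream))
}

// DataFusion stream -> Greptime stream; deriving the Greptime schema from
// the Arrow schema can fail, hence try_new.
fn to_greptime(
    stream: DfSendableRecordBatchStream,
) -> common_recordbatch::error::Result<SendableRecordBatchStream> {
    Ok(Box::pin(RecordBatchStreamAdapter::try_new(stream)?))
}
```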
diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml
index 5f6ac8ad7a..1f7589a901 100644
--- a/src/datanode/Cargo.toml
+++ b/src/datanode/Cargo.toml
@@ -14,6 +14,7 @@ async-stream.workspace = true
 async-trait.workspace = true
 axum = "0.6"
 axum-macros = "0.3"
+bytes = "1.1"
 catalog = { workspace = true }
 common-base = { workspace = true }
 common-catalog = { workspace = true }
diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs
index 99467127da..651d20d303 100644
--- a/src/datanode/src/error.rs
+++ b/src/datanode/src/error.rs
@@ -506,6 +506,30 @@ pub enum Error {

     #[snafu(display("Unsupported gRPC request, kind: {}, location: {}", kind, location))]
     UnsupportedGrpcRequest { kind: String, location: Location },
+
+    #[snafu(display(
+        "Unsupported output type, expected: {}, location: {}",
+        expected,
+        location
+    ))]
+    UnsupportedOutput {
+        expected: String,
+        location: Location,
+    },
+
+    #[snafu(display(
+        "Failed to get metadata from engine {} for region_id {}, location: {}, source: {}",
+        engine,
+        region_id,
+        location,
+        source
+    ))]
+    GetRegionMetadata {
+        engine: String,
+        region_id: RegionId,
+        location: Location,
+        source: BoxedError,
+    },
 }

 pub type Result<T> = std::result::Result<T, Error>;
@@ -585,7 +609,9 @@ impl ErrorExt for Error {
             | CloseTableEngine { .. }
             | JoinTask { .. }
             | RegionNotFound { .. }
-            | RegionEngineNotFound { .. } => StatusCode::Internal,
+            | RegionEngineNotFound { .. }
+            | UnsupportedOutput { .. }
+            | GetRegionMetadata { .. } => StatusCode::Internal,

             StartServer { source, .. }
             | ShutdownServer { source, .. }
diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs
index a1ea63989d..b55252ec46 100644
--- a/src/datanode/src/region_server.rs
+++ b/src/datanode/src/region_server.rs
@@ -12,31 +12,56 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+use std::any::Any;
 use std::collections::HashMap;
+use std::sync::{Arc, Mutex};

-use common_base::bytes::Bytes;
-use common_query::Output;
+use api::v1::region::QueryRequest;
+use async_trait::async_trait;
+use bytes::Bytes;
+use common_query::logical_plan::Expr;
+use common_query::physical_plan::DfPhysicalPlanAdapter;
+use common_query::{DfPhysicalPlan, Output};
 use common_recordbatch::SendableRecordBatchStream;
 use common_telemetry::info;
 use dashmap::DashMap;
+use datafusion::catalog::schema::SchemaProvider;
+use datafusion::catalog::{CatalogList, CatalogProvider};
+use datafusion::datasource::TableProvider;
+use datafusion::error::Result as DfResult;
+use datafusion::execution::context::SessionState;
+use datafusion_common::DataFusionError;
+use datafusion_expr::{Expr as DfExpr, TableType};
+use datatypes::arrow::datatypes::SchemaRef;
+use query::QueryEngineRef;
+use session::context::QueryContext;
 use snafu::{OptionExt, ResultExt};
+use store_api::metadata::RegionMetadataRef;
 use store_api::region_engine::RegionEngineRef;
 use store_api::region_request::RegionRequest;
-use store_api::storage::RegionId;
+use store_api::storage::{RegionId, ScanRequest};
+use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
+use table::table::scan::StreamScanAdapter;

 use crate::error::{
+    DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu, GetRegionMetadataSnafu,
     HandleRegionRequestSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, Result,
+    UnsupportedOutputSnafu,
 };

-#[derive(Default)]
 pub struct RegionServer {
     engines: HashMap<String, RegionEngineRef>,
     region_map: DashMap<RegionId, RegionEngineRef>,
+    query_engine: QueryEngineRef,
 }

 impl RegionServer {
-    pub fn new() -> Self {
-        Self::default()
+    pub fn new(query_engine: QueryEngineRef) -> Self {
+        Self {
+            engines: HashMap::new(),
+            region_map: DashMap::new(),
+            query_engine,
+        }
     }

     pub fn register_engine(&mut self, engine: RegionEngineRef) {
@@ -96,13 +121,37 @@ impl RegionServer {
         Ok(result)
     }

-    #[allow(unused_variables)]
-    pub fn handle_read(
-        &self,
-        region_id: RegionId,
-        plan: Bytes,
-    ) -> Result<SendableRecordBatchStream> {
-        todo!()
+    pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
+        // TODO(ruihang): add metrics and set trace id
+
+        let QueryRequest { region_id, plan } = request;
+        let region_id = RegionId::from_u64(region_id);
+
+        // build dummy catalog list
+        let engine = self
+            .region_map
+            .get(&region_id)
+            .with_context(|| RegionNotFoundSnafu { region_id })?
+            .clone();
+        let catalog_list = Arc::new(DummyCatalogList::new(region_id, engine).await?);
+
+        // decode substrait plan to logical plan and execute it
+        let logical_plan = DFLogicalSubstraitConvertor
+            .decode(Bytes::from(plan), catalog_list, "", "")
+            .await
+            .context(DecodeLogicalPlanSnafu)?;
+        let result = self
+            .query_engine
+            .execute(logical_plan.into(), QueryContext::arc())
+            .await
+            .context(ExecuteLogicalPlanSnafu)?;
+
+        match result {
+            Output::AffectedRows(_) | Output::RecordBatches(_) => {
+                UnsupportedOutputSnafu { expected: "stream" }.fail()
+            }
+            Output::Stream(stream) => Ok(stream),
+        }
     }
 }
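`handle_read` only covers the decoding side; the sender would produce the substrait bytes with the same convertor. A hypothetical sender-side counterpart, assuming `SubstraitPlan::encode` is the inverse of the `decode` call above and returns `Bytes`:

```rust
use api::v1::region::QueryRequest;
use datafusion_expr::LogicalPlan as DfLogicalPlan;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};

// Hypothetical sender-side helper: serialize a DataFusion logical plan to
// substrait bytes and wrap it in a QueryRequest for the target region.
// The `encode` signature and error type are assumptions, not verbatim API.
fn build_query_request(
    region_id: u64,
    plan: DfLogicalPlan,
) -> Result<QueryRequest, substrait::error::Error> {
    let bytes = DFLogicalSubstraitConvertor.encode(plan)?;
    Ok(QueryRequest {
        region_id,
        plan: bytes.to_vec(),
    })
}
```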
@@ -112,13 +161,151 @@ enum RegionChange {
     Deregisters,
 }

-#[allow(dead_code)]
-struct DummyCatalogList {}
+/// Resolve to the given region (specified by [RegionId]) unconditionally.
+#[derive(Clone)]
+struct DummyCatalogList {
+    catalog: DummyCatalogProvider,
+}

-#[allow(dead_code)]
-#[allow(unused_variables)]
 impl DummyCatalogList {
-    pub fn new(region_id: RegionId) -> Self {
-        todo!()
+    pub async fn new(region_id: RegionId, engine: RegionEngineRef) -> Result<Self> {
+        let metadata =
+            engine
+                .get_metadata(region_id)
+                .await
+                .with_context(|_| GetRegionMetadataSnafu {
+                    engine: engine.name(),
+                    region_id,
+                })?;
+        let table_provider = DummyTableProvider {
+            region_id,
+            engine,
+            metadata,
+            scan_request: Default::default(),
+        };
+        let schema_provider = DummySchemaProvider {
+            table: table_provider,
+        };
+        let catalog_provider = DummyCatalogProvider {
+            schema: schema_provider,
+        };
+        let catalog_list = Self {
+            catalog: catalog_provider,
+        };
+        Ok(catalog_list)
     }
 }
+
+impl CatalogList for DummyCatalogList {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn register_catalog(
+        &self,
+        _name: String,
+        _catalog: Arc<dyn CatalogProvider>,
+    ) -> Option<Arc<dyn CatalogProvider>> {
+        None
+    }
+
+    fn catalog_names(&self) -> Vec<String> {
+        vec![]
+    }
+
+    fn catalog(&self, _name: &str) -> Option<Arc<dyn CatalogProvider>> {
+        Some(Arc::new(self.catalog.clone()))
+    }
+}
+
+/// For [DummyCatalogList].
+#[derive(Clone)]
+struct DummyCatalogProvider {
+    schema: DummySchemaProvider,
+}
+
+impl CatalogProvider for DummyCatalogProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema_names(&self) -> Vec<String> {
+        vec![]
+    }
+
+    fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> {
+        Some(Arc::new(self.schema.clone()))
+    }
+}
+
+/// For [DummyCatalogList].
+#[derive(Clone)]
+struct DummySchemaProvider {
+    table: DummyTableProvider,
+}
+
+#[async_trait]
+impl SchemaProvider for DummySchemaProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn table_names(&self) -> Vec<String> {
+        vec![]
+    }
+
+    async fn table(&self, _name: &str) -> Option<Arc<dyn TableProvider>> {
+        Some(Arc::new(self.table.clone()))
+    }
+
+    fn table_exist(&self, _name: &str) -> bool {
+        true
+    }
+}
+
+/// For [TableProvider](datafusion::datasource::TableProvider) and [DummyCatalogList]
+#[derive(Clone)]
+struct DummyTableProvider {
+    region_id: RegionId,
+    engine: RegionEngineRef,
+    metadata: RegionMetadataRef,
+    /// Keeping a mutable request makes it possible to change in the optimize phase.
+    scan_request: Arc<Mutex<ScanRequest>>,
+}
+
+#[async_trait]
+impl TableProvider for DummyTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.metadata.schema.arrow_schema().clone()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        _state: &SessionState,
+        projection: Option<&Vec<usize>>,
+        filters: &[DfExpr],
+        limit: Option<usize>,
+    ) -> DfResult<Arc<dyn DfPhysicalPlan>> {
+        let mut request = self.scan_request.lock().unwrap().clone();
+        request.projection = projection.cloned();
+        request.filters = filters.iter().map(|e| Expr::from(e.clone())).collect();
+        request.limit = limit;
+
+        let stream = self
+            .engine
+            .handle_query(self.region_id, request)
+            .await
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+        Ok(Arc::new(DfPhysicalPlanAdapter(Arc::new(
+            StreamScanAdapter::new(stream),
+        ))))
+    }
+}
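Taken together, the three dummy providers make name resolution a no-op: whatever catalog, schema, and table names the decoded plan mentions, the same region-backed table comes back, and its `scan` forwards the projection, filters, and limit pushdowns into the `ScanRequest` handed to the region engine. An illustrative check, using only the trait methods shown above; it would live in the same module, and the names are arbitrary placeholders:

```rust
// Illustration only (not in the patch): every lookup resolves to the one
// region-backed table built in `DummyCatalogList::new`.
async fn resolves_any_name(list: &DummyCatalogList) {
    let catalog = list.catalog("any_catalog").unwrap();
    let schema = catalog.schema("any_schema").unwrap();
    assert!(schema.table_exist("any_table"));
    assert!(schema.table("any_table").await.is_some());
}
```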
diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs
index 7751f8590e..2810436468 100644
--- a/src/frontend/src/server.rs
+++ b/src/frontend/src/server.rs
@@ -183,7 +183,7 @@ impl Services {

         let http_server = http_server_builder
             .with_metrics_handler(MetricsHandler)
-            .with_script_handler(instance.clone())
+            .with_script_handler(instance)
             .with_configurator(plugins.get::<ConfiguratorRef>())
             .with_greptime_config_options(opts.to_toml_string())
             .build();
diff --git a/src/query/src/plan.rs b/src/query/src/plan.rs
index 14ff331122..4462302d98 100644
--- a/src/query/src/plan.rs
+++ b/src/query/src/plan.rs
@@ -87,3 +87,9 @@ impl LogicalPlan {
             .map(LogicalPlan::DfPlan)
     }
 }
+
+impl From<DfLogicalPlan> for LogicalPlan {
+    fn from(plan: DfLogicalPlan) -> Self {
+        Self::DfPlan(plan)
+    }
+}
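This `From` impl is what lets `handle_read` hand the decoded DataFusion plan straight to the query engine as `logical_plan.into()`. For example (import paths assumed):

```rust
use datafusion_expr::LogicalPlan as DfLogicalPlan;
use query::plan::LogicalPlan;

// The conversion itself is the new impl above; `.into()` wraps the
// DataFusion plan in the engine-facing LogicalPlan enum.
fn into_greptime_plan(df_plan: DfLogicalPlan) -> LogicalPlan {
    df_plan.into()
}
```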
diff --git a/src/table/src/data_source.rs b/src/store-api/src/data_source.rs
similarity index 86%
rename from src/table/src/data_source.rs
rename to src/store-api/src/data_source.rs
index 5c0acd4e49..b178fb41af 100644
--- a/src/table/src/data_source.rs
+++ b/src/store-api/src/data_source.rs
@@ -14,16 +14,18 @@

 use std::sync::Arc;

+use common_error::ext::BoxedError;
 use common_recordbatch::SendableRecordBatchStream;
-use store_api::storage::ScanRequest;

-use crate::error::Result;
+use crate::storage::ScanRequest;

 /// This trait represents a common data source abstraction which provides an interface
 /// for retrieving data in the form of a stream of record batches.
 pub trait DataSource {
     /// Retrieves a stream of record batches based on the provided scan request.
-    fn get_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream>;
+    fn get_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream, BoxedError>;
 }

 pub type DataSourceRef = Arc<dyn DataSource + Send + Sync>;
+
+pub type TableFactory = Arc<dyn Fn() -> DataSourceRef>;
diff --git a/src/store-api/src/lib.rs b/src/store-api/src/lib.rs
index 21cf732db5..dedc5f1cc9 100644
--- a/src/store-api/src/lib.rs
+++ b/src/store-api/src/lib.rs
@@ -15,6 +15,7 @@

 //! Storage related APIs

+pub mod data_source;
 pub mod logstore;
 pub mod manifest;
 pub mod metadata;
diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs
index 7b60652832..4f2b54c039 100644
--- a/src/store-api/src/region_engine.rs
+++ b/src/store-api/src/region_engine.rs
@@ -16,18 +16,17 @@

 use std::sync::Arc;

-use api::v1::QueryRequest;
 use async_trait::async_trait;
 use common_error::ext::BoxedError;
 use common_query::Output;
 use common_recordbatch::SendableRecordBatchStream;

-use crate::metadata::RegionMetadata;
+use crate::metadata::RegionMetadataRef;
 use crate::region_request::RegionRequest;
-use crate::storage::RegionId;
+use crate::storage::{RegionId, ScanRequest};

 #[async_trait]
-pub trait RegionEngine {
+pub trait RegionEngine: Send + Sync {
     /// Name of this engine
     fn name(&self) -> &str;

@@ -44,11 +43,11 @@ pub trait RegionEngine {
     async fn handle_query(
         &self,
         region_id: RegionId,
-        request: QueryRequest,
+        request: ScanRequest,
     ) -> Result<SendableRecordBatchStream, BoxedError>;

     /// Retrieve region's metadata.
-    async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadata, BoxedError>;
+    async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError>;
 }

 pub type RegionEngineRef = Arc<dyn RegionEngine>;
diff --git a/src/table/src/lib.rs b/src/table/src/lib.rs
index edfdc2af1b..fa2fb5d5b1 100644
--- a/src/table/src/lib.rs
+++ b/src/table/src/lib.rs
@@ -13,7 +13,6 @@
 // limitations under the License.

 #![feature(assert_matches)]
-pub mod data_source;
 pub mod engine;
 pub mod error;
 pub mod metadata;
diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs
index 9bb5696746..01b57879cf 100644
--- a/src/table/src/table/scan.rs
+++ b/src/table/src/table/scan.rs
@@ -30,7 +30,7 @@ use datatypes::schema::SchemaRef;
 use futures::{Stream, StreamExt};
 use snafu::OptionExt;

-/// Adapt greptime's [SendableRecordBatchStream] to DataFusion's [PhysicalPlan].
+/// Adapt greptime's [SendableRecordBatchStream] to GreptimeDB's [PhysicalPlan].
 pub struct StreamScanAdapter {
     stream: Mutex<Option<SendableRecordBatchStream>>,
     schema: SchemaRef,
diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs
index fc802873d7..c963d06a42 100644
--- a/tests-integration/src/test_util.rs
+++ b/tests-integration/src/test_util.rs
@@ -543,7 +543,7 @@ pub async fn setup_test_prom_app_with_frontend(
         .with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(frontend_ref.clone()))
         .with_script_handler(frontend_ref.clone())
         .with_prom_handler(frontend_ref.clone())
-        .with_prometheus_handler(frontend_ref.clone())
+        .with_prometheus_handler(frontend_ref)
         .with_greptime_config_options(opts.to_toml_string())
         .build();
     let app = http_server.build(http_server.make_app());
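With `RegionEngine` now `Send + Sync`, taking a plain `ScanRequest` for queries and returning `RegionMetadataRef`, an engine skeleton looks roughly like the sketch below. The `handle_request` signature is assumed from the imports shown in the diff (it is not part of the changed hunks), and the bodies are placeholders, not a real engine:

```rust
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_query::Output;
use common_recordbatch::SendableRecordBatchStream;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::RegionEngine;
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};

// Hypothetical no-op engine against the revised trait.
struct NullEngine;

#[async_trait]
impl RegionEngine for NullEngine {
    fn name(&self) -> &str {
        "null"
    }

    // Signature assumed from context; the diff does not show this method.
    async fn handle_request(
        &self,
        _region_id: RegionId,
        _request: RegionRequest,
    ) -> Result<Output, BoxedError> {
        Ok(Output::AffectedRows(0))
    }

    async fn handle_query(
        &self,
        _region_id: RegionId,
        _request: ScanRequest,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        unimplemented!("produce a stream honoring projection/filters/limit")
    }

    async fn get_metadata(&self, _region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
        unimplemented!("look up the region's metadata")
    }
}
```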