feat: implement query API for RegionServer (#2197)

* some initial change

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* impl dummy structs

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* decode and send logical plan

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* implement table scan

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add some comments

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-08-17 19:02:31 +08:00
committed by GitHub
parent 6e6ff5a606
commit 4aaf6aa51b
19 changed files with 274 additions and 66 deletions

3
Cargo.lock generated
View File

@@ -2629,6 +2629,7 @@ dependencies = [
"axum",
"axum-macros",
"axum-test-helper",
"bytes",
"catalog",
"client",
"common-base",
@@ -4137,7 +4138,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=fa79839bbb304e0e7097e1cc4bcac9a63b3e496a#fa79839bbb304e0e7097e1cc4bcac9a63b3e496a"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=c30a2607be4044502094b25c408171a666a8ff6d#c30a2607be4044502094b25c408171a666a8ff6d"
dependencies = [
"prost",
"serde",

View File

@@ -77,7 +77,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git
derive_builder = "0.12"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "fa79839bbb304e0e7097e1cc4bcac9a63b3e496a" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "c30a2607be4044502094b25c408171a666a8ff6d" }
itertools = "0.10"
lazy_static = "1.4"
once_cell = "1.18"

View File

@@ -29,8 +29,8 @@ use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream};
use datatypes::schema::SchemaRef;
use futures_util::StreamExt;
use snafu::ResultExt;
use store_api::data_source::{DataSource, TableFactory};
use store_api::storage::{ScanRequest, TableId};
use table::data_source::DataSource;
use table::error::{SchemaConversionSnafu, TablesRecordBatchSnafu};
use table::metadata::{TableIdent, TableInfoBuilder, TableMetaBuilder, TableType};
use table::{Result as TableResult, Table, TableRef};
@@ -38,7 +38,6 @@ use table::{Result as TableResult, Table, TableRef};
use self::columns::InformationSchemaColumns;
use crate::error::Result;
use crate::information_schema::tables::InformationSchemaTables;
use crate::table_factory::TableFactory;
use crate::CatalogManager;
pub const TABLES: &str = "tables";
@@ -219,18 +218,22 @@ impl Table for InformationTable {
}
async fn scan_to_stream(&self, request: ScanRequest) -> TableResult<SendableRecordBatchStream> {
self.get_stream(request)
self.get_stream(request).context(TablesRecordBatchSnafu)
}
}
impl DataSource for InformationTable {
fn get_stream(&self, request: ScanRequest) -> TableResult<SendableRecordBatchStream> {
fn get_stream(
&self,
request: ScanRequest,
) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
let projection = request.projection;
let projected_schema = if let Some(projection) = &projection {
Arc::new(
self.schema()
.try_project(projection)
.context(SchemaConversionSnafu)?,
.context(SchemaConversionSnafu)
.map_err(BoxedError::new)?,
)
} else {
self.schema()
@@ -239,7 +242,8 @@ impl DataSource for InformationTable {
.stream_builder
.to_stream()
.map_err(BoxedError::new)
.context(TablesRecordBatchSnafu)?
.context(TablesRecordBatchSnafu)
.map_err(BoxedError::new)?
.map(move |batch| {
batch.and_then(|batch| {
if let Some(projection) = &projection {

View File

@@ -37,7 +37,6 @@ pub mod local;
mod metrics;
pub mod remote;
pub mod system;
pub mod table_factory;
pub mod table_source;
pub mod tables;

View File

@@ -1,19 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use table::data_source::DataSourceRef;
pub type TableFactory = Arc<dyn Fn() -> DataSourceRef>;

View File

@@ -157,7 +157,7 @@ impl TableInfoManager {
.when(vec![Compare::with_value(
raw_key.clone(),
CompareOp::Equal,
raw_value.clone(),
raw_value,
)])
.and_then(vec![TxnOp::Put(
raw_key.clone(),
@@ -180,7 +180,7 @@ impl TableInfoManager {
let removed_key = to_removed_key(&String::from_utf8_lossy(&raw_key));
let txn = Txn::new().and_then(vec![
TxnOp::Delete(raw_key.clone()),
TxnOp::Delete(raw_key),
TxnOp::Put(removed_key.into_bytes(), raw_value),
]);

View File

@@ -142,7 +142,7 @@ impl TableRouteManager {
.when(vec![Compare::with_value(
raw_key.clone(),
CompareOp::Equal,
raw_value.clone(),
raw_value,
)])
.and_then(vec![TxnOp::Put(raw_key.clone(), new_raw_value)])
.or_else(vec![TxnOp::Get(raw_key.clone())]);

View File

@@ -75,7 +75,8 @@ impl<T: Unpin + AsyncFileReader + Send + 'static> Stream for ParquetRecordBatchS
}
}
/// Greptime SendableRecordBatchStream -> DataFusion RecordBatchStream
/// Greptime SendableRecordBatchStream -> DataFusion RecordBatchStream.
/// The reverse one is [RecordBatchStreamAdapter].
pub struct DfRecordBatchStreamAdapter {
stream: SendableRecordBatchStream,
}
@@ -112,7 +113,8 @@ impl Stream for DfRecordBatchStreamAdapter {
}
}
/// DataFusion [SendableRecordBatchStream](DfSendableRecordBatchStream) -> Greptime [RecordBatchStream]
/// DataFusion [SendableRecordBatchStream](DfSendableRecordBatchStream) -> Greptime [RecordBatchStream].
/// The reverse one is [DfRecordBatchStreamAdapter]
pub struct RecordBatchStreamAdapter {
schema: SchemaRef,
stream: DfSendableRecordBatchStream,

View File

@@ -14,6 +14,7 @@ async-stream.workspace = true
async-trait.workspace = true
axum = "0.6"
axum-macros = "0.3"
bytes = "1.1"
catalog = { workspace = true }
common-base = { workspace = true }
common-catalog = { workspace = true }

View File

@@ -506,6 +506,30 @@ pub enum Error {
#[snafu(display("Unsupported gRPC request, kind: {}, location: {}", kind, location))]
UnsupportedGrpcRequest { kind: String, location: Location },
#[snafu(display(
"Unsupported output type, expected: {}, location: {}",
expected,
location
))]
UnsupportedOutput {
expected: String,
location: Location,
},
#[snafu(display(
"Failed to get metadata from engine {} for region_id {}, location: {}, source: {}",
engine,
region_id,
location,
source
))]
GetRegionMetadata {
engine: String,
region_id: RegionId,
location: Location,
source: BoxedError,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -585,7 +609,9 @@ impl ErrorExt for Error {
| CloseTableEngine { .. }
| JoinTask { .. }
| RegionNotFound { .. }
| RegionEngineNotFound { .. } => StatusCode::Internal,
| RegionEngineNotFound { .. }
| UnsupportedOutput { .. }
| GetRegionMetadata { .. } => StatusCode::Internal,
StartServer { source, .. }
| ShutdownServer { source, .. }

View File

@@ -12,31 +12,56 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use common_base::bytes::Bytes;
use common_query::Output;
use api::v1::region::QueryRequest;
use async_trait::async_trait;
use bytes::Bytes;
use common_query::logical_plan::Expr;
use common_query::physical_plan::DfPhysicalPlanAdapter;
use common_query::{DfPhysicalPlan, Output};
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::info;
use dashmap::DashMap;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::{CatalogList, CatalogProvider};
use datafusion::datasource::TableProvider;
use datafusion::error::Result as DfResult;
use datafusion::execution::context::SessionState;
use datafusion_common::DataFusionError;
use datafusion_expr::{Expr as DfExpr, TableType};
use datatypes::arrow::datatypes::SchemaRef;
use query::QueryEngineRef;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::RegionEngineRef;
use store_api::region_request::RegionRequest;
use store_api::storage::RegionId;
use store_api::storage::{RegionId, ScanRequest};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::table::scan::StreamScanAdapter;
use crate::error::{
DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu, GetRegionMetadataSnafu,
HandleRegionRequestSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, Result,
UnsupportedOutputSnafu,
};
#[derive(Default)]
pub struct RegionServer {
engines: HashMap<String, RegionEngineRef>,
region_map: DashMap<RegionId, RegionEngineRef>,
query_engine: QueryEngineRef,
}
impl RegionServer {
pub fn new() -> Self {
Self::default()
pub fn new(query_engine: QueryEngineRef) -> Self {
Self {
engines: HashMap::new(),
region_map: DashMap::new(),
query_engine,
}
}
pub fn register_engine(&mut self, engine: RegionEngineRef) {
@@ -96,13 +121,37 @@ impl RegionServer {
Ok(result)
}
#[allow(unused_variables)]
pub fn handle_read(
&self,
region_id: RegionId,
plan: Bytes,
) -> Result<SendableRecordBatchStream> {
todo!()
pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
// TODO(ruihang): add metrics and set trace id
let QueryRequest { region_id, plan } = request;
let region_id = RegionId::from_u64(region_id);
// build dummy catalog list
let engine = self
.region_map
.get(&region_id)
.with_context(|| RegionNotFoundSnafu { region_id })?
.clone();
let catalog_list = Arc::new(DummyCatalogList::new(region_id, engine).await?);
// decode substrait plan to logical plan and execute it
let logical_plan = DFLogicalSubstraitConvertor
.decode(Bytes::from(plan), catalog_list, "", "")
.await
.context(DecodeLogicalPlanSnafu)?;
let result = self
.query_engine
.execute(logical_plan.into(), QueryContext::arc())
.await
.context(ExecuteLogicalPlanSnafu)?;
match result {
Output::AffectedRows(_) | Output::RecordBatches(_) => {
UnsupportedOutputSnafu { expected: "stream" }.fail()
}
Output::Stream(stream) => Ok(stream),
}
}
}
@@ -112,13 +161,151 @@ enum RegionChange {
Deregisters,
}
#[allow(dead_code)]
struct DummyCatalogList {}
/// Resolve to the given region (specified by [RegionId]) unconditionally.
#[derive(Clone)]
struct DummyCatalogList {
catalog: DummyCatalogProvider,
}
#[allow(dead_code)]
#[allow(unused_variables)]
impl DummyCatalogList {
pub fn new(region_id: RegionId) -> Self {
todo!()
pub async fn new(region_id: RegionId, engine: RegionEngineRef) -> Result<Self> {
let metadata =
engine
.get_metadata(region_id)
.await
.with_context(|_| GetRegionMetadataSnafu {
engine: engine.name(),
region_id,
})?;
let table_provider = DummyTableProvider {
region_id,
engine,
metadata,
scan_request: Default::default(),
};
let schema_provider = DummySchemaProvider {
table: table_provider,
};
let catalog_provider = DummyCatalogProvider {
schema: schema_provider,
};
let catalog_list = Self {
catalog: catalog_provider,
};
Ok(catalog_list)
}
}
impl CatalogList for DummyCatalogList {
fn as_any(&self) -> &dyn Any {
self
}
fn register_catalog(
&self,
_name: String,
_catalog: Arc<dyn CatalogProvider>,
) -> Option<Arc<dyn CatalogProvider>> {
None
}
fn catalog_names(&self) -> Vec<String> {
vec![]
}
fn catalog(&self, _name: &str) -> Option<Arc<dyn CatalogProvider>> {
Some(Arc::new(self.catalog.clone()))
}
}
/// For [DummyCatalogList].
#[derive(Clone)]
struct DummyCatalogProvider {
schema: DummySchemaProvider,
}
impl CatalogProvider for DummyCatalogProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn schema_names(&self) -> Vec<String> {
vec![]
}
fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> {
Some(Arc::new(self.schema.clone()))
}
}
/// For [DummyCatalogList].
#[derive(Clone)]
struct DummySchemaProvider {
table: DummyTableProvider,
}
#[async_trait]
impl SchemaProvider for DummySchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn table_names(&self) -> Vec<String> {
vec![]
}
async fn table(&self, _name: &str) -> Option<Arc<dyn TableProvider>> {
Some(Arc::new(self.table.clone()))
}
fn table_exist(&self, _name: &str) -> bool {
true
}
}
/// For [TableProvider](datafusion::datasource::TableProvider) and [DummyCatalogList]
#[derive(Clone)]
struct DummyTableProvider {
region_id: RegionId,
engine: RegionEngineRef,
metadata: RegionMetadataRef,
/// Keeping a mutable request makes it possible to change in the optimize phase.
scan_request: Arc<Mutex<ScanRequest>>,
}
#[async_trait]
impl TableProvider for DummyTableProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.metadata.schema.arrow_schema().clone()
}
fn table_type(&self) -> TableType {
TableType::Base
}
async fn scan(
&self,
_state: &SessionState,
projection: Option<&Vec<usize>>,
filters: &[DfExpr],
limit: Option<usize>,
) -> DfResult<Arc<dyn DfPhysicalPlan>> {
let mut request = self.scan_request.lock().unwrap().clone();
request.projection = projection.cloned();
request.filters = filters.iter().map(|e| Expr::from(e.clone())).collect();
request.limit = limit;
let stream = self
.engine
.handle_query(self.region_id, request)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
Ok(Arc::new(DfPhysicalPlanAdapter(Arc::new(
StreamScanAdapter::new(stream),
))))
}
}

View File

@@ -183,7 +183,7 @@ impl Services {
let http_server = http_server_builder
.with_metrics_handler(MetricsHandler)
.with_script_handler(instance.clone())
.with_script_handler(instance)
.with_configurator(plugins.get::<ConfiguratorRef>())
.with_greptime_config_options(opts.to_toml_string())
.build();

View File

@@ -87,3 +87,9 @@ impl LogicalPlan {
.map(LogicalPlan::DfPlan)
}
}
impl From<DfLogicalPlan> for LogicalPlan {
fn from(plan: DfLogicalPlan) -> Self {
Self::DfPlan(plan)
}
}

View File

@@ -14,16 +14,18 @@
use std::sync::Arc;
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use store_api::storage::ScanRequest;
use crate::error::Result;
use crate::storage::ScanRequest;
/// This trait represents a common data source abstraction which provides an interface
/// for retrieving data in the form of a stream of record batches.
pub trait DataSource {
/// Retrieves a stream of record batches based on the provided scan request.
fn get_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream>;
fn get_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream, BoxedError>;
}
pub type DataSourceRef = Arc<dyn DataSource>;
pub type TableFactory = Arc<dyn Fn() -> DataSourceRef>;

View File

@@ -15,6 +15,7 @@
//! Storage related APIs
pub mod data_source;
pub mod logstore;
pub mod manifest;
pub mod metadata;

View File

@@ -16,18 +16,17 @@
use std::sync::Arc;
use api::v1::QueryRequest;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_query::Output;
use common_recordbatch::SendableRecordBatchStream;
use crate::metadata::RegionMetadata;
use crate::metadata::RegionMetadataRef;
use crate::region_request::RegionRequest;
use crate::storage::RegionId;
use crate::storage::{RegionId, ScanRequest};
#[async_trait]
pub trait RegionEngine {
pub trait RegionEngine: Send + Sync {
/// Name of this engine
fn name(&self) -> &str;
@@ -44,11 +43,11 @@ pub trait RegionEngine {
async fn handle_query(
&self,
region_id: RegionId,
request: QueryRequest,
request: ScanRequest,
) -> Result<SendableRecordBatchStream, BoxedError>;
/// Retrieve region's metadata.
async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadata, BoxedError>;
async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError>;
}
pub type RegionEngineRef = Arc<dyn RegionEngine>;

View File

@@ -13,7 +13,6 @@
// limitations under the License.
#![feature(assert_matches)]
pub mod data_source;
pub mod engine;
pub mod error;
pub mod metadata;

View File

@@ -30,7 +30,7 @@ use datatypes::schema::SchemaRef;
use futures::{Stream, StreamExt};
use snafu::OptionExt;
/// Adapt greptime's [SendableRecordBatchStream] to DataFusion's [PhysicalPlan].
/// Adapt greptime's [SendableRecordBatchStream] to GreptimeDB's [PhysicalPlan].
pub struct StreamScanAdapter {
stream: Mutex<Option<SendableRecordBatchStream>>,
schema: SchemaRef,

View File

@@ -543,7 +543,7 @@ pub async fn setup_test_prom_app_with_frontend(
.with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(frontend_ref.clone()))
.with_script_handler(frontend_ref.clone())
.with_prom_handler(frontend_ref.clone())
.with_prometheus_handler(frontend_ref.clone())
.with_prometheus_handler(frontend_ref)
.with_greptime_config_options(opts.to_toml_string())
.build();
let app = http_server.build(http_server.make_app());