feat: distributed execute gRPC and Prometheus query in Frontend (#520)

* feat: distributed execute gRPC and Prometheus query in Frontend

* feat: distributed execute gRPC and Prometheus query in Frontend

* Apply suggestions from code review

Co-authored-by: Lei, Huang <6406592+v0y4g3r@users.noreply.github.com>

* feat: distributed execute gRPC and Prometheus query in Frontend

* fix: do not convert timestamp to string when converting logical plan to SQL

* fix: tests

* refactor: no mock

* refactor: 0.0.0.0 -> 127.0.0.1

* refactor: 0.0.0.0 -> 127.0.0.1

* refactor: 0.0.0.0 -> 127.0.0.1

Co-authored-by: luofucong <luofucong@greptime.com>
Co-authored-by: Lei, Huang <6406592+v0y4g3r@users.noreply.github.com>
Author: LFC
Date: 2022-11-16 14:59:48 +08:00
Committed by: GitHub
Parent: ce11a64fe2
Commit: 872ac8058f
37 changed files with 272 additions and 146 deletions
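
For reference, the heart of the change is that the frontend's gRPC handler now executes select expressions itself instead of blindly proxying them to a datanode. A minimal sketch of that dispatch, using simplified stand-in types (SelectExpr, Output, and Frontend below are hypothetical reductions of the real api::v1 and frontend::instance types):

    // Hypothetical, reduced mirror of the new GrpcQueryHandler dispatch.
    enum SelectExpr {
        Sql(String),
        PhysicalPlan(Vec<u8>),
    }

    enum Output {
        AffectedRows(usize),
    }

    struct Frontend {
        distributed: bool, // stands in for `self.dist_instance.is_some()`
    }

    impl Frontend {
        fn do_query(&self, _sql: &str) -> Result<Output, String> {
            Ok(Output::AffectedRows(0)) // placeholder for real SQL execution
        }

        fn handle_select(&self, select: SelectExpr) -> Result<Output, String> {
            match select {
                // SQL text is parsed and executed by the frontend itself.
                SelectExpr::Sql(sql) => self.do_query(&sql),
                // Raw physical plans are still refused in distributed mode.
                SelectExpr::PhysicalPlan(_) if self.distributed => {
                    Err("Executing plan directly in Frontend.".to_string())
                }
                // Standalone mode keeps forwarding the plan to the datanode.
                SelectExpr::PhysicalPlan(_) => Ok(Output::AffectedRows(0)),
            }
        }
    }

    fn main() {
        let fe = Frontend { distributed: true };
        assert!(fe.handle_select(SelectExpr::Sql("select 1".into())).is_ok());
        assert!(fe.handle_select(SelectExpr::PhysicalPlan(vec![])).is_err());
    }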

Cargo.lock (generated)

@@ -1179,10 +1179,13 @@ dependencies = [
"async-trait",
"common-base",
"common-error",
"common-query",
"common-recordbatch",
"common-runtime",
"criterion 0.4.0",
"dashmap",
"datafusion",
"datatypes",
"rand 0.8.5",
"snafu",
"tokio",


@@ -1,9 +1,9 @@
node_id = 42
mode = 'distributed'
-rpc_addr = '0.0.0.0:3001'
+rpc_addr = '127.0.0.1:3001'
wal_dir = '/tmp/greptimedb/wal'
rpc_runtime_size = 8
-mysql_addr = '0.0.0.0:3306'
+mysql_addr = '127.0.0.1:3306'
mysql_runtime_size = 4
[storage]


@@ -1,6 +1,6 @@
mode = 'distributed'
datanode_rpc_addr = '127.0.0.1:3001'
-http_addr = '0.0.0.0:4000'
+http_addr = '127.0.0.1:4000'
[meta_client_opts]
metasrv_addr = '1.1.1.1:3002'


@@ -1,4 +1,4 @@
bind_addr = '127.0.0.1:3002'
-server_addr = '0.0.0.0:3002'
-store_addr = '127.0.0.1:2380'
-datanode_lease_secs = 30
+server_addr = '127.0.0.1:3002'
+store_addr = '127.0.0.1:2379'
+datanode_lease_secs = 15


@@ -1,7 +1,7 @@
node_id = 0
mode = 'standalone'
-http_addr = '0.0.0.0:4000'
-datanode_mysql_addr = '0.0.0.0:3306'
+http_addr = '127.0.0.1:4000'
+datanode_mysql_addr = '127.0.0.1:3306'
datanode_mysql_runtime_size = 4
wal_dir = '/tmp/greptimedb/wal/'
@@ -10,18 +10,18 @@ type = 'File'
data_dir = '/tmp/greptimedb/data/'
[grpc_options]
-addr = '0.0.0.0:4001'
+addr = '127.0.0.1:4001'
runtime_size = 8
[mysql_options]
-addr = '0.0.0.0:4002'
+addr = '127.0.0.1:4002'
runtime_size = 2
[influxdb_options]
enable = true
[opentsdb_options]
-addr = '0.0.0.0:4242'
+addr = '127.0.0.1:4242'
enable = true
runtime_size = 2
@@ -29,6 +29,6 @@ runtime_size = 2
enable = true
[postgres_options]
-addr = '0.0.0.0:4003'
+addr = '127.0.0.1:4003'
runtime_size = 2
check_pwd = false


@@ -15,6 +15,7 @@
use std::sync::Arc;
use api::v1::codec::SelectResult as GrpcSelectResult;
+use api::v1::column::SemanticType;
use api::v1::{
object_expr, object_result, select_expr, DatabaseRequest, ExprHeader, InsertExpr,
MutateResult as GrpcMutateResult, ObjectExpr, ObjectResult as GrpcObjectResult, PhysicalPlan,
@@ -219,7 +220,12 @@ impl TryFrom<ObjectResult> for Output {
.map(|(column, vector)| {
let datatype = vector.data_type();
// nullable or not, does not affect the output
-ColumnSchema::new(&column.column_name, datatype, true)
+let mut column_schema =
+ColumnSchema::new(&column.column_name, datatype, true);
+if column.semantic_type == SemanticType::Timestamp as i32 {
+column_schema = column_schema.with_time_index(true);
+}
+column_schema
})
.collect::<Vec<ColumnSchema>>();
@@ -251,7 +257,7 @@ impl TryFrom<ObjectResult> for Output {
mod tests {
use api::helper::ColumnDataTypeWrapper;
use api::v1::Column;
-use datanode::server::grpc::select::{null_mask, values};
+use common_grpc::select::{null_mask, values};
use datatypes::vectors::{
BinaryVector, BooleanVector, DateTimeVector, DateVector, Float32Vector, Float64Vector,
Int16Vector, Int32Vector, Int64Vector, Int8Vector, StringVector, UInt16Vector,
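
The SemanticType check above restores the time index when a schema is rebuilt from gRPC columns; without it, the timestamp column came back as a plain nullable column. A toy reconstruction of that step (GrpcColumn and ColumnSchema are simplified stand-ins for the real api::v1 and datatypes types, and the discriminants are illustrative):

    enum SemanticType { Tag = 0, Field = 1, Timestamp = 2 }

    struct GrpcColumn {
        name: String,
        semantic_type: i32,
    }

    struct ColumnSchema {
        name: String,
        is_time_index: bool,
    }

    fn restore_schema(columns: &[GrpcColumn]) -> Vec<ColumnSchema> {
        columns
            .iter()
            .map(|c| ColumnSchema {
                name: c.name.clone(),
                // Only the column tagged Timestamp becomes the time index.
                is_time_index: c.semantic_type == SemanticType::Timestamp as i32,
            })
            .collect()
    }

    fn main() {
        let cols = [
            GrpcColumn { name: "host".into(), semantic_type: SemanticType::Tag as i32 },
            GrpcColumn { name: "ts".into(), semantic_type: SemanticType::Timestamp as i32 },
        ];
        let schema = restore_schema(&cols);
        assert!(!schema[0].is_time_index);
        assert!(schema[1].is_time_index);
    }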


@@ -145,9 +145,9 @@ mod tests {
)),
};
let options: DatanodeOptions = cmd.try_into().unwrap();
assert_eq!("0.0.0.0:3001".to_string(), options.rpc_addr);
assert_eq!("127.0.0.1:3001".to_string(), options.rpc_addr);
assert_eq!("/tmp/greptimedb/wal".to_string(), options.wal_dir);
assert_eq!("0.0.0.0:3306".to_string(), options.mysql_addr);
assert_eq!("127.0.0.1:3306".to_string(), options.mysql_addr);
assert_eq!(4, options.mysql_runtime_size);
assert_eq!(
"1.1.1.1:3002".to_string(),


@@ -104,13 +104,13 @@ mod tests {
fn test_read_from_cmd() {
let cmd = StartCommand {
bind_addr: Some("127.0.0.1:3002".to_string()),
server_addr: Some("0.0.0.0:3002".to_string()),
server_addr: Some("127.0.0.1:3002".to_string()),
store_addr: Some("127.0.0.1:2380".to_string()),
config_file: None,
};
let options: MetaSrvOptions = cmd.try_into().unwrap();
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
assert_eq!("0.0.0.0:3002".to_string(), options.server_addr);
assert_eq!("127.0.0.1:3002".to_string(), options.server_addr);
assert_eq!("127.0.0.1:2380".to_string(), options.store_addr);
}
@@ -127,8 +127,8 @@ mod tests {
};
let options: MetaSrvOptions = cmd.try_into().unwrap();
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
assert_eq!("0.0.0.0:3002".to_string(), options.server_addr);
assert_eq!("127.0.0.1:2380".to_string(), options.store_addr);
assert_eq!(30, options.datanode_lease_secs);
assert_eq!("127.0.0.1:3002".to_string(), options.server_addr);
assert_eq!("127.0.0.1:2379".to_string(), options.store_addr);
assert_eq!(15, options.datanode_lease_secs);
}
}


@@ -77,7 +77,7 @@ pub struct StandaloneOptions {
impl Default for StandaloneOptions {
fn default() -> Self {
Self {
http_addr: Some("0.0.0.0:4000".to_string()),
http_addr: Some("127.0.0.1:4000".to_string()),
grpc_options: Some(GrpcOptions::default()),
mysql_options: Some(MysqlOptions::default()),
postgres_options: Some(PostgresOptions::default()),
@@ -87,7 +87,7 @@ impl Default for StandaloneOptions {
mode: Mode::Standalone,
wal_dir: "/tmp/greptimedb/wal".to_string(),
storage: ObjectStoreConfig::default(),
datanode_mysql_addr: "0.0.0.0:3306".to_string(),
datanode_mysql_addr: "127.0.0.1:3306".to_string(),
datanode_mysql_runtime_size: 4,
}
}
@@ -274,12 +274,15 @@ mod tests {
let fe_opts = FrontendOptions::try_from(cmd).unwrap();
assert_eq!(Mode::Standalone, fe_opts.mode);
assert_eq!("127.0.0.1:3001".to_string(), fe_opts.datanode_rpc_addr);
assert_eq!(Some("0.0.0.0:4000".to_string()), fe_opts.http_addr);
assert_eq!(Some("127.0.0.1:4000".to_string()), fe_opts.http_addr);
assert_eq!(
"0.0.0.0:4001".to_string(),
"127.0.0.1:4001".to_string(),
fe_opts.grpc_options.unwrap().addr
);
assert_eq!("0.0.0.0:4002", fe_opts.mysql_options.as_ref().unwrap().addr);
assert_eq!(
"127.0.0.1:4002",
fe_opts.mysql_options.as_ref().unwrap().addr
);
assert_eq!(2, fe_opts.mysql_options.as_ref().unwrap().runtime_size);
assert!(fe_opts.influxdb_options.as_ref().unwrap().enable);
}


@@ -9,7 +9,10 @@ api = { path = "../../api" }
async-trait = "0.1"
common-base = { path = "../base" }
common-error = { path = "../error" }
+common-query = { path = "../query" }
+common-recordbatch = { path = "../recordbatch" }
+common-runtime = { path = "../runtime" }
datatypes = { path = "../../datatypes" }
dashmap = "5.4"
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [
"simd",


@@ -69,6 +69,21 @@ pub enum Error {
source: tonic::transport::Error,
backtrace: Backtrace,
},
#[snafu(display("Failed to collect RecordBatches, source: {}", source))]
CollectRecordBatches {
#[snafu(backtrace)]
source: common_recordbatch::error::Error,
},
#[snafu(display("Failed to convert Arrow type: {}", from))]
Conversion { from: String, backtrace: Backtrace },
#[snafu(display("Column datatype error, source: {}", source))]
ColumnDataType {
#[snafu(backtrace)]
source: api::error::Error,
},
}
impl ErrorExt for Error {
@@ -83,7 +98,10 @@ impl ErrorExt for Error {
}
Error::NewProjection { .. }
| Error::DecodePhysicalPlanNode { .. }
-| Error::CreateChannel { .. } => StatusCode::Internal,
+| Error::CreateChannel { .. }
+| Error::Conversion { .. } => StatusCode::Internal,
+Error::CollectRecordBatches { source } => source.status_code(),
+Error::ColumnDataType { source } => source.status_code(),
}
}
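
The new variants follow the usual snafu pattern: a fallible call is wrapped into the owning crate's error with `.context(...)`, and `status_code()` keeps delegating to the source. A minimal sketch, assuming snafu 0.7-style context selectors, with std::io::Error standing in for common_recordbatch::error::Error:

    use snafu::{ResultExt, Snafu};

    #[derive(Debug, Snafu)]
    enum Error {
        #[snafu(display("Failed to collect RecordBatches, source: {}", source))]
        CollectRecordBatches { source: std::io::Error },
    }

    fn collect() -> Result<Vec<u8>, std::io::Error> {
        Err(std::io::Error::new(std::io::ErrorKind::Other, "stream broke"))
    }

    fn gather() -> Result<Vec<u8>, Error> {
        // `.context(...)` wraps the source error into the new variant.
        collect().context(CollectRecordBatchesSnafu)
    }

    fn main() {
        // Prints: Failed to collect RecordBatches, source: stream broke
        println!("{}", gather().unwrap_err());
    }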


@@ -15,6 +15,7 @@
pub mod channel_manager;
pub mod error;
pub mod physical;
+pub mod select;
pub mod writer;
pub use error::Error;


@@ -21,6 +21,7 @@ use api::v1::column::{SemanticType, Values};
use api::v1::{Column, ObjectResult};
use arrow::array::{Array, BooleanArray, PrimitiveArray};
use common_base::BitVec;
+use common_error::prelude::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::Output;
use common_recordbatch::{util, RecordBatches, SendableRecordBatchStream};
@@ -30,7 +31,7 @@ use snafu::{OptionExt, ResultExt};
use crate::error::{self, ConversionSnafu, Result};
-pub async fn to_object_result(output: Result<Output>) -> ObjectResult {
+pub async fn to_object_result(output: std::result::Result<Output, impl ErrorExt>) -> ObjectResult {
let result = match output {
Ok(Output::AffectedRows(rows)) => Ok(ObjectResultBuilder::new()
.status_code(StatusCode::Success as u32)
@@ -208,7 +209,7 @@ mod tests {
use datatypes::schema::Schema;
use datatypes::vectors::{UInt32Vector, VectorRef};
-use crate::server::grpc::select::{null_mask, try_convert, values};
+use crate::select::{null_mask, try_convert, values};
#[test]
fn test_convert_record_batches_to_select_result() {
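
The widened `to_object_result` signature is what allows the helper to move out of datanode: `impl ErrorExt` in argument position accepts any service's error type. A reduced sketch of the same trick (the trait and both error types below are hypothetical stand-ins):

    // ErrorExt reduced to the one method the helper needs.
    trait ErrorExt {
        fn status_code(&self) -> u32;
    }

    struct DatanodeError;
    impl ErrorExt for DatanodeError {
        fn status_code(&self) -> u32 { 1000 }
    }

    struct FrontendError;
    impl ErrorExt for FrontendError {
        fn status_code(&self) -> u32 { 2000 }
    }

    // `impl ErrorExt` lets one helper serve both services.
    fn to_object_result(output: Result<u64, impl ErrorExt>) -> (u32, Option<u64>) {
        match output {
            Ok(rows) => (0, Some(rows)),
            Err(e) => (e.status_code(), None),
        }
    }

    fn main() {
        assert_eq!(to_object_result(Ok::<_, DatanodeError>(1)), (0, Some(1)));
        assert_eq!(to_object_result(Err::<u64, _>(FrontendError)), (2000, None));
    }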


@@ -114,6 +114,12 @@ pub enum InnerError {
#[snafu(backtrace)]
source: DataTypeError,
},
#[snafu(display("Failed to execute physical plan, source: {}", source))]
ExecutePhysicalPlan {
#[snafu(backtrace)]
source: BoxedError,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -141,6 +147,7 @@ impl ErrorExt for InnerError {
InnerError::UnsupportedInputDataType { .. } => StatusCode::InvalidArguments,
InnerError::ConvertDfRecordBatchStream { source, .. } => source.status_code(),
+InnerError::ExecutePhysicalPlan { source } => source.status_code(),
}
}
@@ -165,6 +172,12 @@ impl From<Error> for DataFusionError {
}
}
+impl From<BoxedError> for Error {
+fn from(source: BoxedError) -> Self {
+InnerError::ExecutePhysicalPlan { source }.into()
+}
+}
#[cfg(test)]
mod tests {
use arrow::error::ArrowError;
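
The `From<BoxedError>` impl is the glue that lets code returning common-query's `Result` use `?` on a type-erased error, e.g. the new `exec.maybe_init().await.map_err(BoxedError::new)?` in the frontend's table scan. A reduced sketch (QueryError stands in for the real error enum):

    // BoxedError stands in for common_error's type-erased error.
    type BoxedError = Box<dyn std::error::Error + Send + Sync>;

    #[derive(Debug)]
    enum QueryError {
        ExecutePhysicalPlan { source: BoxedError },
    }

    // The From impl is what makes `?` work on a BoxedError inside
    // functions returning Result<_, QueryError>.
    impl From<BoxedError> for QueryError {
        fn from(source: BoxedError) -> Self {
            QueryError::ExecutePhysicalPlan { source }
        }
    }

    fn table_init() -> Result<(), BoxedError> {
        Err("datanode unreachable".into())
    }

    fn execute() -> Result<(), QueryError> {
        table_init()?; // auto-converted via From<BoxedError>
        Ok(())
    }

    fn main() {
        // Debug-print shows the wrapped source error.
        println!("{:?}", execute().unwrap_err());
    }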


@@ -55,9 +55,9 @@ impl Default for DatanodeOptions {
fn default() -> Self {
Self {
node_id: 0,
rpc_addr: "0.0.0.0:3001".to_string(),
rpc_addr: "127.0.0.1:3001".to_string(),
rpc_runtime_size: 8,
mysql_addr: "0.0.0.0:3306".to_string(),
mysql_addr: "127.0.0.1:3306".to_string(),
mysql_runtime_size: 2,
meta_client_opts: MetaClientOpts::default(),
wal_dir: "/tmp/greptimedb/wal".to_string(),


@@ -145,9 +145,6 @@ pub enum Error {
backtrace: Backtrace,
},
#[snafu(display("Failed to convert datafusion type: {}", from))]
Conversion { from: String },
#[snafu(display("Unsupported expr type: {}", name))]
UnsupportedExpr { name: String },
@@ -229,12 +226,6 @@ pub enum Error {
source: script::error::Error,
},
#[snafu(display("Failed to collect RecordBatches, source: {}", source))]
CollectRecordBatches {
#[snafu(backtrace)]
source: common_recordbatch::error::Error,
},
#[snafu(display(
"Failed to parse string to timestamp, string: {}, source: {}",
raw,
@@ -338,7 +329,6 @@ impl ErrorExt for Error {
| Error::CreateDir { .. }
| Error::InsertSystemCatalog { .. }
| Error::RegisterSchema { .. }
-| Error::Conversion { .. }
| Error::IntoPhysicalPlan { .. }
| Error::UnsupportedExpr { .. }
| Error::ColumnDataType { .. }
@@ -349,8 +339,6 @@ impl ErrorExt for Error {
Error::StartScriptManager { source } => source.status_code(),
Error::OpenStorageEngine { source } => source.status_code(),
Error::RuntimeResource { .. } => StatusCode::RuntimeResourcesExhausted,
-Error::CollectRecordBatches { source } => source.status_code(),
Error::MetaClientInit { source, .. } => source.status_code(),
Error::InsertData { source, .. } => source.status_code(),
Error::EmptyInsertBatch => StatusCode::InvalidArguments,


@@ -21,6 +21,7 @@ use async_trait::async_trait;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
+use common_grpc::select::to_object_result;
use common_insert::insertion_expr_to_request;
use common_query::Output;
use query::plan::LogicalPlan;
@@ -36,7 +37,6 @@ use crate::error::{
};
use crate::instance::Instance;
use crate::server::grpc::plan::PhysicalPlanner;
-use crate::server::grpc::select::to_object_result;
impl Instance {
pub async fn execute_grpc_insert(


@@ -14,4 +14,3 @@
mod ddl;
pub(crate) mod plan;
-pub mod select;


@@ -70,7 +70,7 @@ impl ConcreteDataType {
matches!(self, ConcreteDataType::Boolean(_))
}
-pub fn is_string(&self) -> bool {
+pub fn stringifiable(&self) -> bool {
matches!(
self,
ConcreteDataType::String(_)
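
The rename matters because the old name was misleading: `is_string()` matched every type that can round-trip through a string (dates, timestamps, and so on), which is exactly how timestamp literals ended up quoted as strings in generated SQL. A sketch of the distinction, with an abridged, hypothetical variant list:

    #[derive(PartialEq)]
    enum ConcreteDataType {
        Boolean,
        Int64,
        String,
        Date,
        DateTime,
        Timestamp, // abridged: the real enum carries type parameters
    }

    impl ConcreteDataType {
        // "Can be parsed from / rendered as a string" -- the honest name.
        fn stringifiable(&self) -> bool {
            matches!(
                self,
                ConcreteDataType::String | ConcreteDataType::Date
                    | ConcreteDataType::DateTime | ConcreteDataType::Timestamp
            )
        }
    }

    fn quote_literal(dt: &ConcreteDataType, raw: &str) -> String {
        // SQL generation now quotes only true strings; a timestamp literal
        // stays numeric instead of becoming '...'.
        if *dt == ConcreteDataType::String {
            format!("'{}'", raw)
        } else {
            raw.to_string()
        }
    }

    fn main() {
        assert!(ConcreteDataType::Timestamp.stringifiable());
        assert_eq!(quote_literal(&ConcreteDataType::Timestamp, "1667446800000"), "1667446800000");
        assert_eq!(quote_literal(&ConcreteDataType::String, "host1"), "'host1'");
    }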


@@ -389,6 +389,30 @@ pub enum Error {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display("Failed to build DataFusion logical plan, source: {}", source))]
BuildDfLogicalPlan {
source: datafusion_common::DataFusionError,
backtrace: Backtrace,
},
#[snafu(display("Failed to convert Arrow schema, source: {}", source))]
ConvertArrowSchema {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display("Failed to collect Recordbatch stream, source: {}", source))]
CollectRecordbatchStream {
#[snafu(backtrace)]
source: common_recordbatch::error::Error,
},
#[snafu(display("Failed to create Recordbatches, source: {}", source))]
CreateRecordbatches {
#[snafu(backtrace)]
source: common_recordbatch::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -418,7 +442,8 @@ impl ErrorExt for Error {
Error::ConvertColumnDefaultConstraint { source, .. }
| Error::ConvertScalarValue { source, .. }
-| Error::VectorComputation { source } => source.status_code(),
+| Error::VectorComputation { source }
+| Error::ConvertArrowSchema { source } => source.status_code(),
Error::ConnectDatanode { source, .. }
| Error::RequestDatanode { source }
@@ -434,7 +459,8 @@ impl ErrorExt for Error {
| Error::FindLeaderPeer { .. }
| Error::FindRegionPartition { .. }
| Error::IllegalTableRoutesData { .. }
-| Error::UnsupportedExpr { .. } => StatusCode::Internal,
+| Error::UnsupportedExpr { .. }
+| Error::BuildDfLogicalPlan { .. } => StatusCode::Internal,
Error::IllegalFrontendState { .. } | Error::IncompleteGrpcResult { .. } => {
StatusCode::Unexpected
@@ -467,6 +493,9 @@ impl ErrorExt for Error {
Error::ExecuteSql { source, .. } => source.status_code(),
Error::InsertBatchToRequest { source, .. } => source.status_code(),
Error::CreateDatabase { source, .. } => source.status_code(),
+Error::CollectRecordbatchStream { source } | Error::CreateRecordbatches { source } => {
+source.status_code()
+}
}
}


@@ -44,7 +44,7 @@ pub struct FrontendOptions {
impl Default for FrontendOptions {
fn default() -> Self {
Self {
http_addr: Some("0.0.0.0:4000".to_string()),
http_addr: Some("127.0.0.1:4000".to_string()),
grpc_options: Some(GrpcOptions::default()),
mysql_options: Some(MysqlOptions::default()),
postgres_options: Some(PostgresOptions::default()),


@@ -23,7 +23,7 @@ pub struct GrpcOptions {
impl Default for GrpcOptions {
fn default() -> Self {
Self {
addr: "0.0.0.0:4001".to_string(),
addr: "127.0.0.1:4001".to_string(),
runtime_size: 8,
}
}


@@ -26,8 +26,8 @@ use api::v1::alter_expr::Kind;
use api::v1::codec::InsertBatch;
use api::v1::object_expr::Expr;
use api::v1::{
-admin_expr, insert_expr, AddColumns, AdminExpr, AdminResult, AlterExpr, CreateDatabaseExpr,
-CreateExpr, InsertExpr, ObjectExpr, ObjectResult as GrpcObjectResult,
+admin_expr, insert_expr, select_expr, AddColumns, AdminExpr, AdminResult, AlterExpr,
+CreateDatabaseExpr, CreateExpr, InsertExpr, ObjectExpr, ObjectResult as GrpcObjectResult,
};
use async_trait::async_trait;
use catalog::remote::MetaKvBackend;
@@ -37,6 +37,7 @@ use client::{Client, Database, Select};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::prelude::{BoxedError, StatusCode};
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
+use common_grpc::select::to_object_result;
use common_query::Output;
use common_telemetry::{debug, error, info};
use distributed::DistInstance;
@@ -531,21 +532,26 @@ impl Instance {
}
}
+fn parse_stmt(sql: &str) -> Result<Statement> {
+let mut stmt = ParserContext::create_with_dialect(sql, &GenericDialect {})
+.context(error::ParseSqlSnafu)?;
+// TODO(LFC): Support executing multiple SQL queries,
+// which seems to be a major change to our whole server framework?
+ensure!(
+stmt.len() == 1,
+error::InvalidSqlSnafu {
+err_msg: "Currently executing multiple SQL queries are not supported."
+}
+);
+Ok(stmt.remove(0))
+}
#[async_trait]
impl SqlQueryHandler for Instance {
async fn do_query(&self, query: &str) -> server_error::Result<Output> {
-let mut stmt = ParserContext::create_with_dialect(query, &GenericDialect {})
+let stmt = parse_stmt(query)
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })?;
-if stmt.len() != 1 {
-// TODO(LFC): Support executing multiple SQLs,
-// which seems to be a major change to our whole server framework?
-return server_error::NotSupportedSnafu {
-feat: "Only one SQL is allowed to be executed at one time.",
-}
-.fail();
-}
-let stmt = stmt.remove(0);
match stmt {
Statement::Query(_) => self
@@ -680,16 +686,40 @@ impl GrpcQueryHandler for Instance {
query: format!("{:?}", query),
})
}
-// FIXME(hl): refactor
-_ => self
-.database(DEFAULT_SCHEMA_NAME)
-.object(query.clone())
-.await
-.map_err(BoxedError::new)
-.with_context(|_| server_error::ExecuteQuerySnafu {
-query: format!("{:?}", query),
-}),
+Expr::Select(select) => {
+let select = select
+.expr
+.as_ref()
+.context(server_error::InvalidQuerySnafu {
+reason: "empty query",
+})?;
+match select {
+select_expr::Expr::Sql(sql) => {
+let output = SqlQueryHandler::do_query(self, sql).await;
+Ok(to_object_result(output).await)
+}
+_ => {
+if self.dist_instance.is_some() {
+return server_error::NotSupportedSnafu {
+feat: "Executing plan directly in Frontend.",
+}
+.fail();
+}
+// FIXME(hl): refactor
+self.database(DEFAULT_SCHEMA_NAME)
+.object(query.clone())
+.await
+.map_err(BoxedError::new)
+.with_context(|_| server_error::ExecuteQuerySnafu {
+query: format!("{:?}", query),
+})
+}
+}
+}
+_ => server_error::NotSupportedSnafu {
+feat: "Currently only insert and select is supported in GRPC service.",
+}
+.fail(),
}
} else {
server_error::InvalidQuerySnafu {
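
The extracted `parse_stmt` centralizes the single-statement rule so the SQL handler and the new Prometheus path share it. A toy model of its contract, with naive string splitting standing in for ParserContext:

    // Toy stand-in for the extracted parse_stmt: split, then enforce the
    // "exactly one statement" rule in one place for every caller.
    fn parse_stmt(sql: &str) -> Result<String, String> {
        let stmts: Vec<&str> = sql
            .split(';')
            .map(str::trim)
            .filter(|s| !s.is_empty())
            .collect();
        if stmts.len() != 1 {
            return Err(
                "Currently executing multiple SQL queries are not supported.".to_string(),
            );
        }
        Ok(stmts[0].to_string())
    }

    fn main() {
        assert!(parse_stmt("select 1").is_ok());
        assert!(parse_stmt("select 1; select 2").is_err());
    }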


@@ -15,9 +15,11 @@
use api::prometheus::remote::read_request::ResponseType;
use api::prometheus::remote::{Query, QueryResult, ReadRequest, ReadResponse, WriteRequest};
use async_trait::async_trait;
-use client::{Database, ObjectResult, Select};
+use client::{ObjectResult, Select};
use common_error::prelude::BoxedError;
+use common_grpc::select::to_object_result;
use common_telemetry::logging;
+use futures_util::TryFutureExt;
use prost::Message;
use servers::error::{self, Result as ServerResult};
use servers::prometheus::{self, Metrics};
@@ -25,7 +27,7 @@ use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse};
use snafu::{OptionExt, ResultExt};
use crate::frontend::Mode;
-use crate::instance::Instance;
+use crate::instance::{parse_stmt, Instance};
const SAMPLES_RESPONSE_TYPE: i32 = ResponseType::Samples as i32;
@@ -73,31 +75,37 @@ fn object_result_to_query_result(
})
}
-async fn handle_remote_queries(
-db: &Database,
-queries: &[Query],
-) -> ServerResult<Vec<(String, ObjectResult)>> {
-let mut results = Vec::with_capacity(queries.len());
+impl Instance {
+async fn handle_remote_queries(
+&self,
+db: &str,
+queries: &[Query],
+) -> ServerResult<Vec<(String, ObjectResult)>> {
+let mut results = Vec::with_capacity(queries.len());
-for q in queries {
-let (table_name, sql) = prometheus::query_to_sql(db.name(), q)?;
+for query in queries {
+let (table_name, sql) = prometheus::query_to_sql(db, query)?;
-logging::debug!(
-"prometheus remote read, table: {}, sql: {}",
-table_name,
-sql
-);
+logging::debug!(
+"prometheus remote read, table: {}, sql: {}",
+table_name,
+sql
+);
-let object_result = db
-.select(Select::Sql(sql.clone()))
-.await
+let object_result = if let Some(dist_instance) = &self.dist_instance {
+let output = futures::future::ready(parse_stmt(&sql))
+.and_then(|stmt| dist_instance.handle_sql(&sql, stmt))
+.await;
+to_object_result(output).await.try_into()
+} else {
+self.database(db).select(Select::Sql(sql.clone())).await
+}
.map_err(BoxedError::new)
.context(error::ExecuteQuerySnafu { query: sql })?;
-results.push((table_name, object_result));
+results.push((table_name, object_result));
-}
-Ok(results)
+}
+Ok(results)
+}
#[async_trait]
@@ -138,7 +146,9 @@ impl PrometheusProtocolHandler for Instance {
let response_type = negotiate_response_type(&request.accepted_response_types)?;
// TODO(dennis): use read_hints to speedup query if possible
-let results = handle_remote_queries(&self.database(database), &request.queries).await?;
+let results = self
+.handle_remote_queries(database, &request.queries)
+.await?;
match response_type {
ResponseType::Samples => {
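
One detail worth noting above: `parse_stmt` is synchronous, so its result is lifted into a future with `futures::future::ready` before the async execution step is chained via `TryFutureExt::and_then`. A self-contained sketch of that pattern, assuming the futures crate (the parse/execute functions are illustrative):

    use futures::future::TryFutureExt;

    fn parse(sql: &str) -> Result<String, String> {
        if sql.trim().is_empty() {
            Err("empty query".to_string())
        } else {
            Ok(sql.to_string())
        }
    }

    async fn execute(stmt: String) -> Result<String, String> {
        Ok(format!("executed: {}", stmt))
    }

    async fn run(sql: &str) -> Result<String, String> {
        // ready(...) turns the sync Result into a TryFuture, so the
        // parse error short-circuits the async chain exactly like `?`.
        futures::future::ready(parse(sql))
            .and_then(|stmt| execute(stmt))
            .await
    }

    fn main() {
        let out = futures::executor::block_on(run("select 1"));
        assert_eq!(out.unwrap(), "executed: select 1");
    }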


@@ -22,7 +22,6 @@ pub mod frontend;
pub mod grpc;
pub mod influxdb;
pub mod instance;
-pub(crate) mod mock;
pub mod mysql;
pub mod opentsdb;
pub mod partitioning;


@@ -23,7 +23,7 @@ pub struct MysqlOptions {
impl Default for MysqlOptions {
fn default() -> Self {
Self {
addr: "0.0.0.0:4002".to_string(),
addr: "127.0.0.1:4002".to_string(),
runtime_size: 2,
}
}


@@ -23,7 +23,7 @@ pub struct OpentsdbOptions {
impl Default for OpentsdbOptions {
fn default() -> Self {
Self {
addr: "0.0.0.0:4242".to_string(),
addr: "127.0.0.1:4242".to_string(),
runtime_size: 2,
}
}


@@ -24,7 +24,7 @@ pub struct PostgresOptions {
impl Default for PostgresOptions {
fn default() -> Self {
Self {
addr: "0.0.0.0:4003".to_string(),
addr: "127.0.0.1:4003".to_string(),
runtime_size: 2,
check_pwd: false,
}


@@ -20,6 +20,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use client::Database;
+use common_error::prelude::BoxedError;
use common_query::error::Result as QueryResult;
use common_query::logical_plan::Expr;
use common_query::physical_plan::{PhysicalPlan, PhysicalPlanRef};
@@ -40,7 +41,6 @@ use tokio::sync::RwLock;
use crate::datanode::DatanodeClients;
use crate::error::{self, Error, Result};
-use crate::mock::{DatanodeInstance, TableScanPlan};
use crate::partitioning::columns::RangeColumnsPartitionRule;
use crate::partitioning::range::RangePartitionRule;
use crate::partitioning::{
@@ -48,7 +48,10 @@ use crate::partitioning::{
};
use crate::spliter::WriteSpliter;
use crate::table::route::TableRoutes;
+use crate::table::scan::{DatanodeInstance, TableScanPlan};
pub mod insert;
+pub(crate) mod scan;
#[derive(Clone)]
pub struct DistTable {
@@ -399,7 +402,7 @@ impl PhysicalPlan for DistTableScan {
_runtime: Arc<RuntimeEnv>,
) -> QueryResult<SendableRecordBatchStream> {
let exec = &self.partition_execs[partition];
-exec.maybe_init().await;
+exec.maybe_init().await.map_err(BoxedError::new)?;
Ok(exec.as_stream().await)
}
}
@@ -415,14 +418,14 @@ struct PartitionExec {
}
impl PartitionExec {
-async fn maybe_init(&self) {
+async fn maybe_init(&self) -> Result<()> {
if self.batches.read().await.is_some() {
-return;
+return Ok(());
}
let mut batches = self.batches.write().await;
if batches.is_some() {
-return;
+return Ok(());
}
let plan = TableScanPlan {
@@ -431,8 +434,9 @@ impl PartitionExec {
filters: self.filters.clone(),
limit: self.limit,
};
-let result = self.datanode_instance.grpc_table_scan(plan).await;
+let result = self.datanode_instance.grpc_table_scan(plan).await?;
let _ = batches.insert(result);
+Ok(())
}
async fn as_stream(&self) -> SendableRecordBatchStream {
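
`maybe_init` is a double-checked lazy initialization over a tokio RwLock; the change threads the datanode fetch error out instead of panicking. A simplified sketch of the pattern, assuming the tokio crate with rt and macros features (types reduced, fetch is hypothetical):

    use std::sync::Arc;
    use tokio::sync::RwLock;

    struct PartitionExec {
        batches: Arc<RwLock<Option<Vec<u64>>>>,
    }

    impl PartitionExec {
        async fn maybe_init(&self) -> Result<(), String> {
            // Fast path: a cheap read lock when already initialized.
            if self.batches.read().await.is_some() {
                return Ok(());
            }
            // Slow path: re-check under the write lock, since another task
            // may have initialized between the two lock acquisitions.
            let mut batches = self.batches.write().await;
            if batches.is_some() {
                return Ok(());
            }
            let result = fetch_from_datanode().await?; // error now propagates
            let _ = batches.insert(result);
            Ok(())
        }
    }

    async fn fetch_from_datanode() -> Result<Vec<u64>, String> {
        Ok(vec![1, 2, 3])
    }

    #[tokio::main]
    async fn main() -> Result<(), String> {
        let exec = PartitionExec { batches: Arc::new(RwLock::new(None)) };
        exec.maybe_init().await?;
        assert!(exec.batches.read().await.is_some());
        Ok(())
    }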


@@ -29,7 +29,7 @@ use table::requests::InsertRequest;
use super::DistTable;
use crate::error;
use crate::error::Result;
-use crate::mock::DatanodeInstance;
+use crate::table::scan::DatanodeInstance;
impl DistTable {
pub async fn dist_insert(


@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// FIXME(LFC): no mock
use std::fmt::Formatter;
use std::sync::Arc;
@@ -24,13 +22,16 @@ use common_query::Output;
use common_recordbatch::{util, RecordBatches};
use datafusion::logical_plan::{LogicalPlan as DfLogicPlan, LogicalPlanBuilder};
use datafusion_expr::Expr as DfExpr;
-use datatypes::prelude::Value;
+use datatypes::prelude::*;
use datatypes::schema::SchemaRef;
use meta_client::rpc::TableName;
use query::plan::LogicalPlan;
+use snafu::ResultExt;
use table::table::adapter::DfTableProviderAdapter;
use table::TableRef;
+use crate::error::{self, Result};
#[derive(Clone)]
pub struct DatanodeInstance {
table: TableRef,
@@ -52,25 +53,33 @@ impl DatanodeInstance {
self.db.insert(request).await
}
-pub(crate) async fn grpc_table_scan(&self, plan: TableScanPlan) -> RecordBatches {
-let logical_plan = self.build_logical_plan(&plan);
-common_telemetry::info!("logical_plan: {:?}", logical_plan);
+pub(crate) async fn grpc_table_scan(&self, plan: TableScanPlan) -> Result<RecordBatches> {
+let logical_plan = self.build_logical_plan(&plan)?;
+// TODO(LFC): Directly pass in logical plan to GRPC interface when our substrait codec supports filter.
-let sql = to_sql(logical_plan);
-let result = self.db.select(Select::Sql(sql)).await.unwrap();
+let sql = to_sql(logical_plan)?;
-let output: Output = result.try_into().unwrap();
-let recordbatches = match output {
-Output::Stream(stream) => util::collect(stream).await.unwrap(),
-Output::RecordBatches(x) => x.take(),
+let output = self
+.db
+.select(Select::Sql(sql))
+.await
+.and_then(Output::try_from)
+.context(error::SelectSnafu)?;
+Ok(match output {
+Output::Stream(stream) => {
+let schema = stream.schema();
+let batches = util::collect(stream)
+.await
+.context(error::CollectRecordbatchStreamSnafu)?;
+RecordBatches::try_new(schema, batches).context(error::CreateRecordbatchesSnafu)?
+}
+Output::RecordBatches(x) => x,
_ => unreachable!(),
-};
-let schema = recordbatches.first().unwrap().schema.clone();
-RecordBatches::try_new(schema, recordbatches).unwrap()
+})
}
-fn build_logical_plan(&self, table_scan: &TableScanPlan) -> LogicalPlan {
+fn build_logical_plan(&self, table_scan: &TableScanPlan) -> Result<LogicalPlan> {
let table_provider = Arc::new(DfTableProviderAdapter::new(self.table.clone()));
let mut builder = LogicalPlanBuilder::scan_with_filters(
@@ -83,13 +92,16 @@ impl DatanodeInstance {
.map(|x| x.df_expr().clone())
.collect::<Vec<_>>(),
)
-.unwrap();
+.context(error::BuildDfLogicalPlanSnafu)?;
if let Some(limit) = table_scan.limit {
-builder = builder.limit(limit).unwrap();
+builder = builder
+.limit(limit)
+.context(error::BuildDfLogicalPlanSnafu)?;
}
-let plan = builder.build().unwrap();
-LogicalPlan::DfPlan(plan)
+let plan = builder.build().context(error::BuildDfLogicalPlanSnafu)?;
+Ok(LogicalPlan::DfPlan(plan))
}
}
@@ -101,14 +113,20 @@ pub(crate) struct TableScanPlan {
pub limit: Option<usize>,
}
-fn to_sql(plan: LogicalPlan) -> String {
+fn to_sql(plan: LogicalPlan) -> Result<String> {
let LogicalPlan::DfPlan(plan) = plan;
let table_scan = match plan {
DfLogicPlan::TableScan(table_scan) => table_scan,
_ => unreachable!("unknown plan: {:?}", plan),
};
-let schema: SchemaRef = Arc::new(table_scan.source.schema().try_into().unwrap());
+let schema: SchemaRef = Arc::new(
+table_scan
+.source
+.schema()
+.try_into()
+.context(error::ConvertArrowSchemaSnafu)?,
+);
let projection = table_scan
.projection
.map(|x| {
@@ -131,7 +149,7 @@ fn to_sql(plan: LogicalPlan) -> String {
.filters
.iter()
.map(expr_to_sql)
-.collect::<Vec<String>>()
+.collect::<Result<Vec<String>>>()?
.join(" AND ");
if !filters.is_empty() {
sql.push_str(" where ");
@@ -142,30 +160,31 @@ fn to_sql(plan: LogicalPlan) -> String {
sql.push_str(" limit ");
sql.push_str(&limit.to_string());
}
-sql
+Ok(sql)
}
-fn expr_to_sql(expr: &DfExpr) -> String {
-match expr {
+fn expr_to_sql(expr: &DfExpr) -> Result<String> {
+Ok(match expr {
DfExpr::BinaryExpr {
ref left,
ref right,
ref op,
} => format!(
"{} {} {}",
-expr_to_sql(left.as_ref()),
+expr_to_sql(left.as_ref())?,
op,
-expr_to_sql(right.as_ref())
+expr_to_sql(right.as_ref())?
),
DfExpr::Column(c) => c.name.clone(),
DfExpr::Literal(sv) => {
-let v: Value = Value::try_from(sv.clone()).unwrap();
-if v.data_type().is_string() {
+let v: Value = Value::try_from(sv.clone())
+.with_context(|_| error::ConvertScalarValueSnafu { value: sv.clone() })?;
+if matches!(v.data_type(), ConcreteDataType::String(_)) {
format!("'{}'", sv)
} else {
format!("{}", sv)
}
}
_ => unimplemented!("not implemented for {:?}", expr),
-}
+})
}
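
Together with the `stringifiable` rename, the literal handling above is what the commit message's "do not convert timestamp to string" refers to: only genuinely string-typed values are quoted when the logical plan is rendered back to SQL. A reduced model of that rendering (the expression enum is hypothetical):

    enum Expr {
        Column(String),
        StringLit(String),
        NumberLit(i64),
        Binary { left: Box<Expr>, op: String, right: Box<Expr> },
    }

    fn expr_to_sql(expr: &Expr) -> String {
        match expr {
            Expr::Column(c) => c.clone(),
            // Quotes only for true strings; a timestamp literal stays numeric.
            Expr::StringLit(s) => format!("'{}'", s),
            Expr::NumberLit(n) => n.to_string(),
            Expr::Binary { left, op, right } => {
                format!("{} {} {}", expr_to_sql(left), op, expr_to_sql(right))
            }
        }
    }

    fn main() {
        let filter = Expr::Binary {
            left: Box::new(Expr::Column("ts".into())),
            op: ">".into(),
            right: Box::new(Expr::NumberLit(1_667_446_800_000)),
        };
        // where-clause fragment sent back to the datanode as SQL
        assert_eq!(expr_to_sql(&filter), "ts > 1667446800000");
    }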


@@ -69,8 +69,8 @@ pub(crate) async fn create_datanode_client(
// Move client to an option so we can _move_ the inner value
// on the first attempt to connect. All other attempts will fail.
let mut client = Some(client);
// "0.0.0.0:3001" is just a placeholder, does not actually connect to it.
let addr = "0.0.0.0:3001";
// "127.0.0.1:3001" is just a placeholder, does not actually connect to it.
let addr = "127.0.0.1:3001";
let channel_manager = ChannelManager::new();
channel_manager
.reset_with_connector(


@@ -78,7 +78,7 @@ mod tests {
let kv_store = Arc::new(MemStore::new());
let ctx = Context {
datanode_lease_secs: 30,
server_addr: "0.0.0.0:0000".to_string(),
server_addr: "127.0.0.1:0000".to_string(),
kv_store,
election: None,
skip_all: Arc::new(AtomicBool::new(false)),


@@ -55,7 +55,7 @@ mod tests {
let kv_store = Arc::new(MemStore::new());
let ctx = Context {
datanode_lease_secs: 30,
server_addr: "0.0.0.0:0000".to_string(),
server_addr: "127.0.0.1:0000".to_string(),
kv_store,
election: None,
skip_all: Arc::new(AtomicBool::new(false)),


@@ -182,7 +182,7 @@ mod tests {
fn test_datanode_lease_value() {
let value = LeaseValue {
timestamp_millis: 111,
node_addr: "0.0.0.0:3002".to_string(),
node_addr: "127.0.0.1:3002".to_string(),
};
let value_bytes: Vec<u8> = value.clone().try_into().unwrap();


@@ -42,9 +42,9 @@ pub struct MetaSrvOptions {
impl Default for MetaSrvOptions {
fn default() -> Self {
Self {
bind_addr: "0.0.0.0:3002".to_string(),
server_addr: "0.0.0.0:3002".to_string(),
store_addr: "0.0.0.0:2379".to_string(),
bind_addr: "127.0.0.1:3002".to_string(),
server_addr: "127.0.0.1:3002".to_string(),
store_addr: "127.0.0.1:2379".to_string(),
datanode_lease_secs: 15,
}
}


@@ -74,7 +74,7 @@ fn parse_string_to_value(
data_type: &ConcreteDataType,
) -> Result<Value> {
ensure!(
-data_type.is_string(),
+data_type.stringifiable(),
ColumnTypeMismatchSnafu {
column_name,
expect: data_type.clone(),