fix: frontend compile errors (#747)

fix: fix compile errors in frontend
This commit is contained in:
Lei, HUANG
2022-12-14 18:30:16 +08:00
committed by GitHub
parent dbb3034ecb
commit ce6d1cb7d1
10 changed files with 74 additions and 47 deletions

View File

@@ -17,6 +17,7 @@ use std::any::Any;
use common_error::prelude::*;
use common_query::logical_plan::Expr;
use datafusion_common::ScalarValue;
use datatypes::prelude::Value;
use store_api::storage::RegionId;
#[derive(Debug, Snafu)]
@@ -451,6 +452,17 @@ pub enum Error {
#[snafu(backtrace)]
source: substrait::error::Error,
},
#[snafu(display(
"Failed to build a vector from values, value: {}, source: {}",
value,
source
))]
BuildVector {
value: Value,
#[snafu(backtrace)]
source: datatypes::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -543,6 +555,7 @@ impl ErrorExt for Error {
Error::LeaderNotFound { .. } => StatusCode::StorageUnavailable,
Error::TableAlreadyExist { .. } => StatusCode::TableAlreadyExists,
Error::EncodeSubstraitLogicalPlan { source } => source.status_code(),
Error::BuildVector { source, .. } => source.status_code(),
}
}

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use servers::tls::TlsOption;
@@ -22,7 +20,7 @@ pub struct MysqlOptions {
pub addr: String,
pub runtime_size: usize,
#[serde(default = "Default::default")]
pub tls: Arc<TlsOption>,
pub tls: TlsOption,
}
impl Default for MysqlOptions {
@@ -30,7 +28,7 @@ impl Default for MysqlOptions {
Self {
addr: "127.0.0.1:4002".to_string(),
runtime_size: 2,
tls: Arc::new(TlsOption::default()),
tls: TlsOption::default(),
}
}
}

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use servers::tls::TlsOption;
@@ -23,7 +21,7 @@ pub struct PostgresOptions {
pub runtime_size: usize,
pub check_pwd: bool,
#[serde(default = "Default::default")]
pub tls: Arc<TlsOption>,
pub tls: TlsOption,
}
impl Default for PostgresOptions {

View File

@@ -14,8 +14,10 @@
use std::collections::HashMap;
use datatypes::data_type::DataType;
use datatypes::prelude::MutableVector;
use datatypes::value::Value;
use datatypes::vectors::{VectorBuilder, VectorRef};
use datatypes::vectors::VectorRef;
use snafu::{ensure, OptionExt};
use store_api::storage::RegionNumber;
use table::requests::InsertRequest;
@@ -125,9 +127,16 @@ fn partition_insert_request(
insert: &InsertRequest,
region_map: HashMap<RegionNumber, Vec<usize>>,
) -> DistInsertRequest {
let mut dist_insert: HashMap<RegionNumber, HashMap<&str, VectorBuilder>> =
let mut dist_insert: HashMap<RegionNumber, HashMap<&str, Box<dyn MutableVector>>> =
HashMap::with_capacity(region_map.len());
let row_num = insert
.columns_values
.values()
.next()
.map(|v| v.len())
.unwrap_or(0);
let column_count = insert.columns_values.len();
for (column_name, vector) in &insert.columns_values {
for (region_id, val_idxs) in &region_map {
@@ -136,10 +145,13 @@ fn partition_insert_request(
.or_insert_with(|| HashMap::with_capacity(column_count));
let builder = region_insert
.entry(column_name)
.or_insert_with(|| VectorBuilder::new(vector.data_type()));
val_idxs
.iter()
.for_each(|idx| builder.push(&vector.get(*idx)));
.or_insert_with(|| vector.data_type().create_mutable_vector(row_num));
val_idxs.iter().for_each(|idx| {
// Safety: MutableVector is built according to column data type.
builder
.push_value_ref(vector.get(*idx).as_value_ref())
.unwrap();
});
}
}
@@ -151,7 +163,7 @@ fn partition_insert_request(
.map(|(region_id, vector_map)| {
let columns_values = vector_map
.into_iter()
.map(|(column_name, mut builder)| (column_name.to_string(), builder.finish()))
.map(|(column_name, mut builder)| (column_name.to_string(), builder.to_vector()))
.collect();
(
region_id,

View File

@@ -14,15 +14,15 @@
use catalog::SchemaProviderRef;
use common_error::snafu::ensure;
use datatypes::prelude::ConcreteDataType;
use datatypes::vectors::VectorBuilder;
use datatypes::data_type::DataType;
use datatypes::prelude::{ConcreteDataType, MutableVector};
use snafu::{OptionExt, ResultExt};
use sql::ast::Value as SqlValue;
use sql::statements;
use sql::statements::insert::Insert;
use table::requests::InsertRequest;
use crate::error::{self, Result};
use crate::error::{self, BuildVectorSnafu, Result};
// TODO(fys): Extract the common logic in datanode and frontend in the future.
#[allow(dead_code)]
@@ -49,7 +49,7 @@ pub(crate) fn insert_to_request(
};
let rows_num = values.len();
let mut columns_builders: Vec<(&String, &ConcreteDataType, VectorBuilder)> =
let mut columns_builders: Vec<(&String, &ConcreteDataType, Box<dyn MutableVector>)> =
Vec::with_capacity(columns_num);
if columns.is_empty() {
@@ -58,7 +58,7 @@ pub(crate) fn insert_to_request(
columns_builders.push((
&column_schema.name,
data_type,
VectorBuilder::with_capacity(data_type.clone(), rows_num),
data_type.create_mutable_vector(rows_num),
));
}
} else {
@@ -73,7 +73,7 @@ pub(crate) fn insert_to_request(
columns_builders.push((
column_name,
data_type,
VectorBuilder::with_capacity(data_type.clone(), rows_num),
data_type.create_mutable_vector(rows_num),
));
}
}
@@ -100,7 +100,7 @@ pub(crate) fn insert_to_request(
table_name,
columns_values: columns_builders
.into_iter()
.map(|(c, _, mut b)| (c.to_owned(), b.finish()))
.map(|(c, _, mut b)| (c.to_owned(), b.to_vector()))
.collect(),
})
}
@@ -109,11 +109,12 @@ fn add_row_to_vector(
column_name: &str,
data_type: &ConcreteDataType,
sql_val: &SqlValue,
builder: &mut VectorBuilder,
builder: &mut Box<dyn MutableVector>,
) -> Result<()> {
let value = statements::sql_value_to_value(column_name, data_type, sql_val)
.context(error::ParseSqlSnafu)?;
builder.push(&value);
builder
.push_value_ref(value.as_value_ref())
.context(BuildVectorSnafu { value })?;
Ok(())
}

View File

@@ -29,12 +29,13 @@ use common_query::physical_plan::{PhysicalPlan, PhysicalPlanRef};
use common_recordbatch::adapter::AsyncRecordBatchStreamAdapter;
use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
use common_telemetry::debug;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::logical_plan::Expr as DfExpr;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::{
Partitioning, SendableRecordBatchStream as DfSendableRecordBatchStream,
};
use datafusion_common::DataFusionError;
use datafusion_expr::expr::Expr as DfExpr;
use datafusion_expr::BinaryExpr;
use datatypes::prelude::Value;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use meta_client::rpc::{Peer, TableName};
@@ -198,7 +199,7 @@ impl DistTable {
) -> Result<HashSet<RegionNumber>> {
let expr = filter.df_expr();
match expr {
DfExpr::BinaryExpr { left, op, right } if is_compare_op(op) => {
DfExpr::BinaryExpr(BinaryExpr { left, op, right }) if is_compare_op(op) => {
let column_op_value = match (left.as_ref(), right.as_ref()) {
(DfExpr::Column(c), DfExpr::Literal(v)) => Some((&c.name, *op, v)),
(DfExpr::Literal(v), DfExpr::Column(c)) => {
@@ -217,7 +218,7 @@ impl DistTable {
.collect::<HashSet<RegionNumber>>());
}
}
DfExpr::BinaryExpr { left, op, right }
DfExpr::BinaryExpr(BinaryExpr { left, op, right })
if matches!(op, Operator::And | Operator::Or) =>
{
let left_regions =
@@ -449,7 +450,7 @@ impl PhysicalPlan for DistTableScan {
fn execute(
&self,
partition: usize,
_runtime: Arc<RuntimeEnv>,
_context: Arc<TaskContext>,
) -> QueryResult<SendableRecordBatchStream> {
let exec = self.partition_execs[partition].clone();
let stream = Box::pin(async move {
@@ -516,6 +517,7 @@ mod test {
use catalog::remote::MetaKvBackend;
use common_recordbatch::util;
use datafusion::arrow_print;
use datafusion::execution::context::TaskContext;
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use datafusion_expr::expr_fn::{and, binary_expr, col, or};
use datafusion_expr::lit;
@@ -743,7 +745,6 @@ mod test {
async fn test_dist_table_scan() {
common_telemetry::init_default_ut_logging();
let table = Arc::new(new_dist_table().await);
// should scan all regions
// select * from numbers
let projection = None;
@@ -798,10 +799,16 @@ mod test {
.await
.unwrap();
let task_ctx = Arc::new(TaskContext::new(
"0".to_string(),
"0".to_string(),
HashMap::new(),
HashMap::new(),
HashMap::new(),
Arc::new(RuntimeEnv::default()),
));
for partition in 0..table_scan.output_partitioning().partition_count() {
let result = table_scan
.execute(partition, Arc::new(RuntimeEnv::default()))
.unwrap();
let result = table_scan.execute(partition, task_ctx.clone()).unwrap();
let recordbatches = util::collect(result).await.unwrap();
let df_recordbatch = recordbatches

View File

@@ -150,7 +150,6 @@ mod tests {
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datatypes::prelude::ConcreteDataType;
use datatypes::types::StringType;
use datatypes::vectors::VectorBuilder;
use table::requests::InsertRequest;
use super::to_insert_expr;
@@ -173,11 +172,11 @@ mod tests {
builder.push(&"host3".into());
columns_values.insert("host".to_string(), builder.finish());
let mut builder = VectorBuilder::new(ConcreteDataType::int16_datatype());
let mut builder = ConcreteDataType::int16_datatype().create_mutable_vector(3);
builder.push(&1_i16.into());
builder.push(&2_i16.into());
builder.push(&3_i16.into());
columns_values.insert("id".to_string(), builder.finish());
columns_values.insert("id".to_string(), builder.to_vector());
InsertRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),

View File

@@ -20,7 +20,8 @@ use client::{Database, ObjectResult};
use common_query::prelude::Expr;
use common_query::Output;
use common_recordbatch::{util, RecordBatches};
use datafusion::logical_plan::{LogicalPlan, LogicalPlanBuilder};
use datafusion::datasource::DefaultTableSource;
use datafusion_expr::{LogicalPlan, LogicalPlanBuilder};
use meta_client::rpc::TableName;
use snafu::ResultExt;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
@@ -82,7 +83,7 @@ impl DatanodeInstance {
let mut builder = LogicalPlanBuilder::scan_with_filters(
&table_scan.table_name.to_string(),
table_provider,
Arc::new(DefaultTableSource::new(table_provider)),
table_scan.projection.clone(),
table_scan
.filters
@@ -104,11 +105,9 @@ impl DatanodeInstance {
.context(error::BuildDfLogicalPlanSnafu)?;
}
if let Some(limit) = table_scan.limit {
builder = builder
.limit(limit)
.context(error::BuildDfLogicalPlanSnafu)?;
}
builder
.limit(0, table_scan.limit)
.context(error::BuildDfLogicalPlanSnafu)?;
builder.build().context(error::BuildDfLogicalPlanSnafu)
}

View File

@@ -40,14 +40,14 @@ const DEFAULT_RESULT_SET_WRITE_BUFFER_SIZE: usize = 100 * 1024;
pub struct MysqlServer {
base_server: BaseTcpServer,
query_handler: SqlQueryHandlerRef,
tls: Arc<TlsOption>,
tls: TlsOption,
}
impl MysqlServer {
pub fn create_server(
query_handler: SqlQueryHandlerRef,
io_runtime: Arc<Runtime>,
tls: Arc<TlsOption>,
tls: TlsOption,
) -> Box<dyn Server> {
Box::new(MysqlServer {
base_server: BaseTcpServer::create_server("MySQL", io_runtime),

View File

@@ -35,7 +35,7 @@ pub struct PostgresServer {
base_server: BaseTcpServer,
auth_handler: Arc<PgAuthStartupHandler>,
query_handler: Arc<PostgresServerHandler>,
tls: Arc<TlsOption>,
tls: TlsOption,
}
impl PostgresServer {
@@ -43,7 +43,7 @@ impl PostgresServer {
pub fn new(
query_handler: SqlQueryHandlerRef,
check_pwd: bool,
tls: Arc<TlsOption>,
tls: TlsOption,
io_runtime: Arc<Runtime>,
) -> PostgresServer {
let postgres_handler = Arc::new(PostgresServerHandler::new(query_handler));