feat: impl insert for DistTable (#406)

* feat: impl insert for dist_table in frontend

* Add the logic for decoding the region id in the datanode.
This commit is contained in:
fys
2022-11-08 17:19:17 +08:00
committed by GitHub
parent a41aec0a86
commit 857054f70d
12 changed files with 353 additions and 15 deletions

2
Cargo.lock generated
View File

@@ -118,6 +118,8 @@ checksum = "98161a4e3e2184da77bb14f02184cdd111e83bbbcc9979dfee3c44b9a85f5602"
name = "api"
version = "0.1.0"
dependencies = [
"common-base",
"common-time",
"datatypes",
"prost 0.11.0",
"snafu",

View File

@@ -5,6 +5,8 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
common-base = { path = "../common/base" }
common-time = { path = "../common/time" }
datatypes = { path = "../datatypes" }
prost = "0.11"
snafu = { version = "0.7", features = ["backtraces"] }

View File

@@ -8,3 +8,7 @@ message InsertBatch {
repeated Column columns = 1;
uint32 row_count = 2;
}
message RegionId {
// Numeric identifier of the target storage region. Carried in the
// InsertExpr options map (key "region_id") as prost-encoded bytes.
uint64 id = 1;
}

View File

@@ -1,8 +1,13 @@
use common_base::BitVec;
use common_time::timestamp::TimeUnit;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use snafu::prelude::*;
use crate::error::{self, Result};
use crate::v1::column::Values;
use crate::v1::Column;
use crate::v1::ColumnDataType;
#[derive(Debug, PartialEq, Eq)]
@@ -143,8 +148,47 @@ impl Values {
}
}
impl Column {
    /// Appends every value of `vector` to this column's value arrays and
    /// extends the null mask accordingly.
    ///
    /// All values in `vector` must have the same type, matching this
    /// column's `datatype` — mixed types would scatter data across the
    /// wrong `Values` fields.
    ///
    /// `origin_count` is the number of rows already stored in the column;
    /// it is the bit offset at which the new rows' null bits begin.
    pub fn push_vals(&mut self, origin_count: usize, vector: VectorRef) {
        let values = self.values.get_or_insert_with(Values::default);
        let mut null_mask = BitVec::from_slice(&self.null_mask);
        let len = vector.len();
        null_mask.reserve_exact(origin_count + len);
        // Pre-mark all new rows as non-null; null bits are flipped on below.
        null_mask.extend(BitVec::repeat(false, len));
        // A null only sets its mask bit — no placeholder is pushed into the
        // value array (sparse encoding), which is why a column may hold fewer
        // values than rows.
        // NOTE: `.into_iter()` on a range is redundant (ranges are already
        // iterators), so the plain `(0..len).for_each` form is used.
        (0..len).for_each(|idx| match vector.get(idx) {
            Value::Null => null_mask.set(idx + origin_count, true),
            Value::Boolean(val) => values.bool_values.push(val),
            Value::UInt8(val) => values.u8_values.push(val.into()),
            Value::UInt16(val) => values.u16_values.push(val.into()),
            Value::UInt32(val) => values.u32_values.push(val),
            Value::UInt64(val) => values.u64_values.push(val),
            Value::Int8(val) => values.i8_values.push(val.into()),
            Value::Int16(val) => values.i16_values.push(val.into()),
            Value::Int32(val) => values.i32_values.push(val),
            Value::Int64(val) => values.i64_values.push(val),
            Value::Float32(val) => values.f32_values.push(*val),
            Value::Float64(val) => values.f64_values.push(*val),
            Value::String(val) => values.string_values.push(val.as_utf8().to_string()),
            Value::Binary(val) => values.binary_values.push(val.to_vec()),
            Value::Date(val) => values.date_values.push(val.val()),
            Value::DateTime(val) => values.datetime_values.push(val.val()),
            // Timestamps are normalized to milliseconds on the wire.
            Value::Timestamp(val) => values
                .ts_millis_values
                .push(val.convert_to(TimeUnit::Millisecond)),
            // List values cannot appear in an insert column.
            Value::List(_) => unreachable!(),
        });
        self.null_mask = null_mask.into_vec();
    }
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use datatypes::vectors::BooleanVector;
use super::*;
#[test]
@@ -358,4 +402,29 @@ mod tests {
"Failed to create column datatype from List(ListType { inner: Boolean(BooleanType) })"
);
}
#[test]
// Verifies push_vals against a column that already holds 4 rows:
// values are appended in order and the null mask is extended at the
// correct bit offsets (offset by the pre-existing row count).
fn test_column_put_vector() {
use crate::v1::column::SemanticType;
// Some(false), None, Some(true), Some(true)
let mut column = Column {
column_name: "test".to_string(),
semantic_type: SemanticType::Field as i32,
values: Some(Values {
bool_values: vec![false, true, true],
..Default::default()
}),
// 2 == 0b010: row 1 (zero-based) is null, so 3 values back 4 rows.
null_mask: vec![2],
datatype: ColumnDataType::Boolean as i32,
};
let row_count = 4;
let vector = Arc::new(BooleanVector::from(vec![Some(true), None, Some(false)]));
column.push_vals(row_count, vector);
// Some(false), None, Some(true), Some(true), Some(true), None, Some(false)
let bool_values = column.values.unwrap().bool_values;
assert_eq!(vec![false, true, true, true, false], bool_values);
let null_mask = column.null_mask;
// 34 == 0b100010: bits 1 and 5 — the original null plus the pushed None
// at offset row_count + 1.
assert_eq!(34, null_mask[0]);
}
}

View File

@@ -1,7 +1,7 @@
pub use prost::DecodeError;
use prost::Message;
use crate::v1::codec::{InsertBatch, PhysicalPlanNode, SelectResult};
use crate::v1::codec::{InsertBatch, PhysicalPlanNode, RegionId, SelectResult};
macro_rules! impl_convert_with_bytes {
($data_type: ty) => {
@@ -24,6 +24,7 @@ macro_rules! impl_convert_with_bytes {
impl_convert_with_bytes!(InsertBatch);
impl_convert_with_bytes!(SelectResult);
impl_convert_with_bytes!(PhysicalPlanNode);
impl_convert_with_bytes!(RegionId);
#[cfg(test)]
mod tests {
@@ -127,6 +128,16 @@ mod tests {
);
}
#[test]
// Round-trips a RegionId through the byte conversions generated by
// impl_convert_with_bytes! (prost encode -> Vec<u8> -> decode) and
// checks the id value survives intact.
fn test_convert_region_id() {
let region_id = RegionId { id: 12 };
let bytes: Vec<u8> = region_id.into();
let region_id: RegionId = bytes.deref().try_into().unwrap();
assert_eq!(12, region_id.id);
}
fn mock_insert_batch() -> InsertBatch {
let values = column::Values {
i32_values: vec![2, 3, 4, 5, 6, 7, 8],

View File

@@ -1,3 +1,6 @@
use std::ops::Deref;
use api::v1::codec::RegionId;
use api::v1::{
admin_expr, codec::InsertBatch, insert_expr, object_expr, select_expr, AdminExpr, AdminResult,
ObjectExpr, ObjectResult, SelectExpr,
@@ -200,6 +203,18 @@ impl GrpcQueryHandler for Instance {
.context(servers::error::InvalidQuerySnafu {
reason: "missing `expr` in `InsertExpr`",
})?;
// TODO(fys): _region_id is for later use.
let _region_id: Option<RegionId> = insert_expr
.options
.get("region_id")
.map(|id| {
id.deref()
.try_into()
.context(servers::error::DecodeRegionIdSnafu)
})
.transpose()?;
match expr {
insert_expr::Expr::Values(values) => {
self.handle_insert(table_name, values).await

View File

@@ -141,6 +141,12 @@ pub enum Error {
actual: usize,
backtrace: Backtrace,
},
#[snafu(display("Failed to join task, source: {}", source))]
JoinTask {
source: common_runtime::JoinError,
backtrace: Backtrace,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -176,6 +182,7 @@ impl ErrorExt for Error {
StatusCode::Unexpected
}
Error::ExecOpentsdbPut { .. } => StatusCode::Internal,
Error::JoinTask { .. } => StatusCode::Unexpected,
}
}

View File

@@ -3,7 +3,9 @@
use std::fmt::Formatter;
use std::sync::Arc;
use api::v1::InsertExpr;
use catalog::CatalogManagerRef;
use client::ObjectResult;
use client::{Database, Select};
use common_query::prelude::Expr;
use common_query::Output;
@@ -45,6 +47,10 @@ impl DatanodeInstance {
}
}
/// Forwards an `InsertExpr` to this datanode via its gRPC `Database` client.
pub(crate) async fn grpc_insert(&self, request: InsertExpr) -> client::Result<ObjectResult> {
self.db.insert(request).await
}
#[allow(clippy::print_stdout)]
pub(crate) async fn grpc_table_scan(&self, plan: TableScanPlan) -> RecordBatches {
let logical_plan = self.build_logical_plan(&plan);

View File

@@ -8,23 +8,21 @@ use snafu::OptionExt;
use store_api::storage::RegionId;
use table::requests::InsertRequest;
use crate::error::Error;
use crate::error::FindPartitionColumnSnafu;
use crate::error::FindRegionSnafu;
use crate::error::InvalidInsertRequestSnafu;
use crate::error::Result;
use crate::partitioning::PartitionRule;
use crate::partitioning::PartitionRuleRef;
pub type DistInsertRequest = HashMap<RegionId, InsertRequest>;
pub struct WriteSpliter<'a, P> {
partition_rule: &'a P,
pub struct WriteSpliter {
partition_rule: PartitionRuleRef<Error>,
}
impl<'a, P> WriteSpliter<'a, P>
where
P: PartitionRule,
{
pub fn with_patition_rule(rule: &'a P) -> Self {
impl WriteSpliter {
pub fn with_patition_rule(rule: PartitionRuleRef<Error>) -> Self {
Self {
partition_rule: rule,
}
@@ -156,7 +154,7 @@ fn partition_insert_request(
#[cfg(test)]
mod tests {
use std::{collections::HashMap, result::Result};
use std::{collections::HashMap, result::Result, sync::Arc};
use datatypes::{
data_type::ConcreteDataType,
@@ -167,10 +165,13 @@ mod tests {
use table::requests::InsertRequest;
use super::{
check_req, find_partitioning_values, partition_insert_request, partition_values,
PartitionRule, RegionId, WriteSpliter,
check_req, find_partitioning_values, partition_insert_request, partition_values, RegionId,
WriteSpliter,
};
use crate::{
error::Error,
partitioning::{PartitionExpr, PartitionRule, PartitionRuleRef},
};
use crate::partitioning::PartitionExpr;
#[test]
fn test_insert_req_check() {
@@ -186,7 +187,8 @@ mod tests {
#[test]
fn test_writer_spliter() {
let insert = mock_insert_request();
let spliter = WriteSpliter::with_patition_rule(&MockPartitionRule);
let rule = Arc::new(MockPartitionRule) as PartitionRuleRef<Error>;
let spliter = WriteSpliter::with_patition_rule(rule);
let ret = spliter.split(insert).unwrap();
assert_eq!(2, ret.len());
@@ -406,7 +408,7 @@ mod tests {
// PARTITION r1 VALUES IN(2, 3),
// );
impl PartitionRule for MockPartitionRule {
type Error = String;
type Error = Error;
fn partition_columns(&self) -> Vec<String> {
vec!["id".to_string()]

View File

@@ -1,3 +1,5 @@
mod insert;
use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
@@ -15,12 +17,14 @@ use snafu::prelude::*;
use store_api::storage::RegionId;
use table::error::Error as TableError;
use table::metadata::{FilterPushDownType, TableInfoRef};
use table::requests::InsertRequest;
use table::Table;
use tokio::sync::RwLock;
use crate::error::{self, Error, Result};
use crate::mock::{DatanodeId, DatanodeInstance, TableScanPlan};
use crate::partitioning::{Operator, PartitionExpr, PartitionRuleRef};
use crate::spliter::WriteSpliter;
struct DistTable {
table_name: String,
@@ -44,6 +48,16 @@ impl Table for DistTable {
unimplemented!()
}
/// Inserts rows into the distributed table: splits the request into
/// per-region inserts with this table's partition rule, fans them out to
/// the owning datanodes, and returns the aggregate success count.
async fn insert(&self, request: InsertRequest) -> table::Result<usize> {
let spliter = WriteSpliter::with_patition_rule(self.partition_rule.clone());
let inserts = spliter.split(request).map_err(TableError::new)?;
// Inserts only ever yield a mutate result; a select result here would be
// a datanode bug.
let result = match self.dist_insert(inserts).await.map_err(TableError::new)? {
client::ObjectResult::Select(_) => unreachable!(),
client::ObjectResult::Mutate(result) => result,
};
// NOTE(review): the failure count from the mutate result is dropped here —
// only successes are surfaced to the caller. Confirm this is intended.
Ok(result.success as usize)
}
async fn scan(
&self,
projection: &Option<Vec<usize>>,

View File

@@ -0,0 +1,202 @@
use std::collections::HashMap;
use api::helper::ColumnDataTypeWrapper;
use api::v1::codec;
use api::v1::insert_expr;
use api::v1::insert_expr::Expr;
use api::v1::Column;
use api::v1::InsertExpr;
use api::v1::MutateResult;
use client::ObjectResult;
use snafu::ensure;
use snafu::OptionExt;
use snafu::ResultExt;
use store_api::storage::RegionId;
use table::requests::InsertRequest;
use super::DistTable;
use crate::error;
use crate::error::Result;
impl DistTable {
/// Fans the per-region inserts out to their owning datanodes as concurrent
/// tasks and aggregates all mutate results into one `ObjectResult::Mutate`.
///
/// Returns an error when a region has no entry in `region_dist_map`, when a
/// datanode instance is missing, when a spawned task fails to join, or when
/// a datanode reports a request error.
pub async fn dist_insert(
&self,
inserts: HashMap<RegionId, InsertRequest>,
) -> Result<ObjectResult> {
let mut joins = Vec::with_capacity(inserts.len());
for (region_id, insert) in inserts {
// Resolve region -> datanode id, then datanode id -> client instance.
let db = self
.region_dist_map
.get(&region_id)
.context(error::FindDatanodeSnafu { region: region_id })?;
let instance = self
.datanode_instances
.get(db)
.context(error::DatanodeInstanceSnafu { datanode: *db })?;
// Clone so the instance can be moved into the spawned task.
let instance = instance.clone();
// TODO(fys): a separate runtime should be used here.
// One task per region; tasks run concurrently and are awaited below.
let join = tokio::spawn(async move {
instance
.grpc_insert(to_insert_expr(region_id, insert)?)
.await
.context(error::RequestDatanodeSnafu)
});
joins.push(join);
}
let mut success = 0;
let mut failure = 0;
for join in joins {
// Double `?`: the outer surfaces join errors (task panic/cancel), the
// inner surfaces conversion/request errors from inside the task.
let object_result = join.await.context(error::JoinTaskSnafu)??;
let result = match object_result {
// Inserts only ever produce mutate results.
client::ObjectResult::Select(_) => unreachable!(),
client::ObjectResult::Mutate(result) => result,
};
success += result.success;
failure += result.failure;
}
Ok(ObjectResult::Mutate(MutateResult { success, failure }))
}
}
/// Converts an `InsertRequest` into a gRPC `InsertExpr` targeting `region_id`.
///
/// Every column vector becomes a wire `Column` (filled via `push_vals`); all
/// vectors must have the same length, otherwise `InvalidInsertRequest` is
/// returned. The region id is prost-encoded into the expr's options map under
/// the key "region_id".
///
/// NOTE(review): `columns_values` is a HashMap, so the resulting column order
/// is unspecified — callers must not rely on it.
fn to_insert_expr(region_id: RegionId, insert: InsertRequest) -> Result<InsertExpr> {
// Row count of the first column seen; every later column must match it.
let mut row_count = None;
let columns = insert
.columns_values
.into_iter()
.map(|(column_name, vector)| {
match row_count {
Some(rows) => ensure!(
rows == vector.len(),
error::InvalidInsertRequestSnafu {
reason: "The row count of columns is not the same."
}
),
None => row_count = Some(vector.len()),
}
let datatype: ColumnDataTypeWrapper = vector
.data_type()
.try_into()
.context(error::ColumnDataTypeSnafu)?;
let mut column = Column {
column_name,
datatype: datatype.datatype() as i32,
..Default::default()
};
// origin_count is 0: the wire column starts empty.
column.push_vals(0, vector);
Ok(column)
})
.collect::<Result<Vec<_>>>()?;
let insert_batch = codec::InsertBatch {
columns,
// No columns means zero rows.
row_count: row_count.map(|rows| rows as u32).unwrap_or(0),
};
let mut options = HashMap::with_capacity(1);
options.insert(
// TODO(fys): Temporarily hard code here
"region_id".to_string(),
codec::RegionId { id: region_id }.into(),
);
Ok(InsertExpr {
table_name: insert.table_name,
options,
expr: Some(Expr::Values(insert_expr::Values {
values: vec![insert_batch.into()],
})),
})
}
#[cfg(test)]
mod tests {
use std::{collections::HashMap, ops::Deref};
use api::v1::{
codec::{self, InsertBatch},
insert_expr::Expr,
ColumnDataType, InsertExpr,
};
use datatypes::{prelude::ConcreteDataType, types::StringType, vectors::VectorBuilder};
use table::requests::InsertRequest;
use super::to_insert_expr;
#[test]
// End-to-end: an InsertRequest converted by to_insert_expr carries the
// expected table name, column payloads, null masks, and region id option.
fn test_to_insert_expr() {
let insert_request = mock_insert_request();
let insert_expr = to_insert_expr(12, insert_request).unwrap();
verify_insert_expr(insert_expr);
}
// Builds a two-column, three-row request: "host" (string, middle value
// null) and "id" (int16, no nulls).
fn mock_insert_request() -> InsertRequest {
let mut columns_values = HashMap::with_capacity(4);
let mut builder = VectorBuilder::new(ConcreteDataType::String(StringType));
builder.push(&"host1".into());
builder.push_null();
builder.push(&"host3".into());
columns_values.insert("host".to_string(), builder.finish());
let mut builder = VectorBuilder::new(ConcreteDataType::int16_datatype());
builder.push(&1_i16.into());
builder.push(&2_i16.into());
builder.push(&3_i16.into());
columns_values.insert("id".to_string(), builder.finish());
InsertRequest {
table_name: "demo".to_string(),
columns_values,
}
}
// Decodes the serialized InsertBatch back out of the expr and asserts on
// each column plus the prost-encoded "region_id" option.
fn verify_insert_expr(insert_expr: InsertExpr) {
let table_name = insert_expr.table_name;
assert_eq!("demo", table_name);
let expr = insert_expr.expr.as_ref().unwrap();
let vals = match expr {
Expr::Values(vals) => vals,
Expr::Sql(_) => unreachable!(),
};
let batch: &[u8] = vals.values[0].as_ref();
let vals: InsertBatch = batch.try_into().unwrap();
for column in vals.columns {
let name = column.column_name;
if name == "id" {
assert_eq!(0, column.null_mask[0]);
assert_eq!(ColumnDataType::Int16 as i32, column.datatype);
assert_eq!(vec![1, 2, 3], column.values.as_ref().unwrap().i16_values);
}
if name == "host" {
// 2 == 0b010: the middle row is null, so only two string values.
assert_eq!(2, column.null_mask[0]);
assert_eq!(ColumnDataType::String as i32, column.datatype);
assert_eq!(
vec!["host1", "host3"],
column.values.as_ref().unwrap().string_values
);
}
}
let bytes = insert_expr.options.get("region_id").unwrap();
let region_id: codec::RegionId = bytes.deref().try_into().unwrap();
assert_eq!(12, region_id.id);
}
}

View File

@@ -152,6 +152,9 @@ pub enum Error {
#[snafu(display("Invalid prometheus remote read query result, msg: {}", msg))]
InvalidPromRemoteReadQueryResult { msg: String, backtrace: Backtrace },
#[snafu(display("Failed to decode region id, source: {}", source))]
DecodeRegionId { source: api::DecodeError },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -186,6 +189,7 @@ impl ErrorExt for Error {
| DecodePromRemoteRequest { .. }
| DecompressPromRemoteRequest { .. }
| InvalidPromRemoteRequest { .. }
| DecodeRegionId { .. }
| TimePrecision { .. } => StatusCode::InvalidArguments,
InfluxdbLinesWrite { source, .. } => source.status_code(),