diff --git a/Cargo.lock b/Cargo.lock
index beec8ed491..bb77552c70 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -118,6 +118,8 @@ checksum = "98161a4e3e2184da77bb14f02184cdd111e83bbbcc9979dfee3c44b9a85f5602"
 name = "api"
 version = "0.1.0"
 dependencies = [
+ "common-base",
+ "common-time",
 "datatypes",
 "prost 0.11.0",
 "snafu",
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index 7f59a80315..01f7c3de8d 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -5,6 +5,8 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

 [dependencies]
+common-base = { path = "../common/base" }
+common-time = { path = "../common/time" }
 datatypes = { path = "../datatypes" }
 prost = "0.11"
 snafu = { version = "0.7", features = ["backtraces"] }
diff --git a/src/api/greptime/v1/insert.proto b/src/api/greptime/v1/insert.proto
index 33d204b5c0..0b2b6d6343 100644
--- a/src/api/greptime/v1/insert.proto
+++ b/src/api/greptime/v1/insert.proto
@@ -8,3 +8,7 @@ message InsertBatch {
   repeated Column columns = 1;
   uint32 row_count = 2;
 }
+
+message RegionId {
+  uint64 id = 1;
+}
diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs
index 13c8afc346..7c1a4a9b6c 100644
--- a/src/api/src/helper.rs
+++ b/src/api/src/helper.rs
@@ -1,8 +1,13 @@
+use common_base::BitVec;
+use common_time::timestamp::TimeUnit;
 use datatypes::prelude::ConcreteDataType;
+use datatypes::value::Value;
+use datatypes::vectors::VectorRef;
 use snafu::prelude::*;

 use crate::error::{self, Result};
 use crate::v1::column::Values;
+use crate::v1::Column;
 use crate::v1::ColumnDataType;

 #[derive(Debug, PartialEq, Eq)]
@@ -143,8 +148,47 @@ impl Values {
     }
 }

+impl Column {
+    // The types of the values must all be the same.
+    pub fn push_vals(&mut self, origin_count: usize, vector: VectorRef) {
+        let values = self.values.get_or_insert_with(Values::default);
+        let mut null_mask = BitVec::from_slice(&self.null_mask);
+        let len = vector.len();
+        null_mask.reserve_exact(origin_count + len);
+        null_mask.extend(BitVec::repeat(false, len));
+
+        (0..len).into_iter().for_each(|idx| match vector.get(idx) {
+            Value::Null => null_mask.set(idx + origin_count, true),
+            Value::Boolean(val) => values.bool_values.push(val),
+            Value::UInt8(val) => values.u8_values.push(val.into()),
+            Value::UInt16(val) => values.u16_values.push(val.into()),
+            Value::UInt32(val) => values.u32_values.push(val),
+            Value::UInt64(val) => values.u64_values.push(val),
+            Value::Int8(val) => values.i8_values.push(val.into()),
+            Value::Int16(val) => values.i16_values.push(val.into()),
+            Value::Int32(val) => values.i32_values.push(val),
+            Value::Int64(val) => values.i64_values.push(val),
+            Value::Float32(val) => values.f32_values.push(*val),
+            Value::Float64(val) => values.f64_values.push(*val),
+            Value::String(val) => values.string_values.push(val.as_utf8().to_string()),
+            Value::Binary(val) => values.binary_values.push(val.to_vec()),
+            Value::Date(val) => values.date_values.push(val.val()),
+            Value::DateTime(val) => values.datetime_values.push(val.val()),
+            Value::Timestamp(val) => values
+                .ts_millis_values
+                .push(val.convert_to(TimeUnit::Millisecond)),
+            Value::List(_) => unreachable!(),
+        });
+        self.null_mask = null_mask.into_vec();
+    }
+}
+
 #[cfg(test)]
 mod tests {
+    use std::sync::Arc;
+
+    use datatypes::vectors::BooleanVector;
+
     use super::*;

     #[test]
@@ -358,4 +402,29 @@ mod tests {
             "Failed to create column datatype from List(ListType { inner: Boolean(BooleanType) })"
         );
     }
+
+    #[test]
+    fn test_column_put_vector() {
+        use crate::v1::column::SemanticType;
+        // Some(false), None, Some(true), Some(true)
+        let mut column = Column {
+            column_name: "test".to_string(),
+            semantic_type: SemanticType::Field as i32,
+            values: Some(Values {
+                bool_values: vec![false, true, true],
+                ..Default::default()
+            }),
+            null_mask: vec![2],
+            datatype: ColumnDataType::Boolean as i32,
+        };
+        let row_count = 4;
+
+        let vector = Arc::new(BooleanVector::from(vec![Some(true), None, Some(false)]));
+        column.push_vals(row_count, vector);
+        // Some(false), None, Some(true), Some(true), Some(true), None, Some(false)
+        let bool_values = column.values.unwrap().bool_values;
+        assert_eq!(vec![false, true, true, true, false], bool_values);
+        let null_mask = column.null_mask;
+        assert_eq!(34, null_mask[0]);
+    }
 }
diff --git a/src/api/src/serde.rs b/src/api/src/serde.rs
index 9e884f4515..f887dab918 100644
--- a/src/api/src/serde.rs
+++ b/src/api/src/serde.rs
@@ -1,7 +1,7 @@
 pub use prost::DecodeError;
 use prost::Message;

-use crate::v1::codec::{InsertBatch, PhysicalPlanNode, SelectResult};
+use crate::v1::codec::{InsertBatch, PhysicalPlanNode, RegionId, SelectResult};

 macro_rules! impl_convert_with_bytes {
     ($data_type: ty) => {
@@ -24,6 +24,7 @@ macro_rules! impl_convert_with_bytes {
 impl_convert_with_bytes!(InsertBatch);
 impl_convert_with_bytes!(SelectResult);
 impl_convert_with_bytes!(PhysicalPlanNode);
+impl_convert_with_bytes!(RegionId);

 #[cfg(test)]
 mod tests {
@@ -127,6 +128,16 @@ mod tests {
         );
     }

+    #[test]
+    fn test_convert_region_id() {
+        let region_id = RegionId { id: 12 };
+
+        let bytes: Vec<u8> = region_id.into();
+        let region_id: RegionId = bytes.deref().try_into().unwrap();
+
+        assert_eq!(12, region_id.id);
+    }
+
     fn mock_insert_batch() -> InsertBatch {
         let values = column::Values {
             i32_values: vec![2, 3, 4, 5, 6, 7, 8],
diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs
index f0e93dd4d3..a24ff02274 100644
--- a/src/datanode/src/instance/grpc.rs
+++ b/src/datanode/src/instance/grpc.rs
@@ -1,3 +1,6 @@
+use std::ops::Deref;
+
+use api::v1::codec::RegionId;
 use api::v1::{
     admin_expr, codec::InsertBatch, insert_expr, object_expr, select_expr, AdminExpr, AdminResult,
     ObjectExpr, ObjectResult, SelectExpr,
@@ -200,6 +203,18 @@ impl GrpcQueryHandler for Instance {
             .context(servers::error::InvalidQuerySnafu {
                 reason: "missing `expr` in `InsertExpr`",
             })?;
+
+        // TODO(fys): _region_id is for later use.
+        let _region_id: Option<RegionId> = insert_expr
+            .options
+            .get("region_id")
+            .map(|id| {
+                id.deref()
+                    .try_into()
+                    .context(servers::error::DecodeRegionIdSnafu)
+            })
+            .transpose()?;
+
         match expr {
             insert_expr::Expr::Values(values) => {
                 self.handle_insert(table_name, values).await
diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs
index dd2656e899..f0a8495d4d 100644
--- a/src/frontend/src/error.rs
+++ b/src/frontend/src/error.rs
@@ -141,6 +141,12 @@ pub enum Error {
         actual: usize,
         backtrace: Backtrace,
     },
+
+    #[snafu(display("Failed to join task, source: {}", source))]
+    JoinTask {
+        source: common_runtime::JoinError,
+        backtrace: Backtrace,
+    },
 }

 pub type Result<T> = std::result::Result<T, Error>;
@@ -176,6 +182,7 @@ impl ErrorExt for Error {
                 StatusCode::Unexpected
             }
             Error::ExecOpentsdbPut { .. } => StatusCode::Internal,
+            Error::JoinTask { .. } => StatusCode::Unexpected,
         }
     }
diff --git a/src/frontend/src/mock.rs b/src/frontend/src/mock.rs
index bd4299b548..0b55824958 100644
--- a/src/frontend/src/mock.rs
+++ b/src/frontend/src/mock.rs
@@ -3,7 +3,9 @@
 use std::fmt::Formatter;
 use std::sync::Arc;

+use api::v1::InsertExpr;
 use catalog::CatalogManagerRef;
+use client::ObjectResult;
 use client::{Database, Select};
 use common_query::prelude::Expr;
 use common_query::Output;
@@ -45,6 +47,10 @@ impl DatanodeInstance {
         }
     }

+    pub(crate) async fn grpc_insert(&self, request: InsertExpr) -> client::Result<ObjectResult> {
+        self.db.insert(request).await
+    }
+
     #[allow(clippy::print_stdout)]
     pub(crate) async fn grpc_table_scan(&self, plan: TableScanPlan) -> RecordBatches {
         let logical_plan = self.build_logical_plan(&plan);
diff --git a/src/frontend/src/spliter.rs b/src/frontend/src/spliter.rs
index bb55b85eff..b788f82422 100644
--- a/src/frontend/src/spliter.rs
+++ b/src/frontend/src/spliter.rs
@@ -8,23 +8,21 @@ use snafu::OptionExt;
 use store_api::storage::RegionId;
 use table::requests::InsertRequest;

+use crate::error::Error;
 use crate::error::FindPartitionColumnSnafu;
 use crate::error::FindRegionSnafu;
 use crate::error::InvalidInsertRequestSnafu;
 use crate::error::Result;
-use crate::partitioning::PartitionRule;
+use crate::partitioning::PartitionRuleRef;

 pub type DistInsertRequest = HashMap<RegionId, InsertRequest>;

-pub struct WriteSpliter<'a, P> {
-    partition_rule: &'a P,
+pub struct WriteSpliter {
+    partition_rule: PartitionRuleRef,
 }

-impl<'a, P> WriteSpliter<'a, P>
-where
-    P: PartitionRule,
-{
-    pub fn with_patition_rule(rule: &'a P) -> Self {
+impl WriteSpliter {
+    pub fn with_patition_rule(rule: PartitionRuleRef) -> Self {
         Self {
             partition_rule: rule,
         }
     }
@@ -156,7 +154,7 @@ fn partition_insert_request(

 #[cfg(test)]
 mod tests {
-    use std::{collections::HashMap, result::Result};
+    use std::{collections::HashMap, result::Result, sync::Arc};

     use datatypes::{
         data_type::ConcreteDataType,
@@ -167,10 +165,13 @@ mod tests {
     use table::requests::InsertRequest;

     use super::{
-        check_req, find_partitioning_values, partition_insert_request, partition_values,
-        PartitionRule, RegionId, WriteSpliter,
+        check_req, find_partitioning_values, partition_insert_request, partition_values, RegionId,
+        WriteSpliter,
+    };
+    use crate::{
+        error::Error,
+        partitioning::{PartitionExpr, PartitionRule, PartitionRuleRef},
     };
-    use crate::partitioning::PartitionExpr;

     #[test]
     fn test_insert_req_check() {
@@ -186,7 +187,8 @@
     #[test]
     fn test_writer_spliter() {
         let insert = mock_insert_request();
-        let spliter = WriteSpliter::with_patition_rule(&MockPartitionRule);
+        let rule = Arc::new(MockPartitionRule) as PartitionRuleRef;
+        let spliter = WriteSpliter::with_patition_rule(rule);
         let ret = spliter.split(insert).unwrap();

         assert_eq!(2, ret.len());
@@ -406,7 +408,7 @@
     //   PARTITION r1 VALUES IN(2, 3),
     // );
     impl PartitionRule for MockPartitionRule {
-        type Error = String;
+        type Error = Error;

         fn partition_columns(&self) -> Vec<String> {
             vec!["id".to_string()]
diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs
index 39e8a379ac..ba35893b7f 100644
--- a/src/frontend/src/table.rs
+++ b/src/frontend/src/table.rs
@@ -1,3 +1,5 @@
+mod insert;
+
 use std::any::Any;
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
@@ -15,12 +17,14 @@ use snafu::prelude::*;
 use store_api::storage::RegionId;
 use table::error::Error as TableError;
 use table::metadata::{FilterPushDownType, TableInfoRef};
+use table::requests::InsertRequest;
 use table::Table;
 use tokio::sync::RwLock;

 use crate::error::{self, Error, Result};
 use crate::mock::{DatanodeId, DatanodeInstance, TableScanPlan};
 use crate::partitioning::{Operator, PartitionExpr, PartitionRuleRef};
+use crate::spliter::WriteSpliter;

 struct DistTable {
     table_name: String,
@@ -44,6 +48,16 @@ impl Table for DistTable {
         unimplemented!()
     }

+    async fn insert(&self, request: InsertRequest) -> table::Result<usize> {
+        let spliter = WriteSpliter::with_patition_rule(self.partition_rule.clone());
+        let inserts = spliter.split(request).map_err(TableError::new)?;
+        let result = match self.dist_insert(inserts).await.map_err(TableError::new)? {
+            client::ObjectResult::Select(_) => unreachable!(),
+            client::ObjectResult::Mutate(result) => result,
+        };
+        Ok(result.success as usize)
+    }
+
     async fn scan(
         &self,
         projection: &Option<Vec<usize>>,
diff --git a/src/frontend/src/table/insert.rs b/src/frontend/src/table/insert.rs
new file mode 100644
index 0000000000..fea776dbe7
--- /dev/null
+++ b/src/frontend/src/table/insert.rs
@@ -0,0 +1,202 @@
+use std::collections::HashMap;
+
+use api::helper::ColumnDataTypeWrapper;
+use api::v1::codec;
+use api::v1::insert_expr;
+use api::v1::insert_expr::Expr;
+use api::v1::Column;
+use api::v1::InsertExpr;
+use api::v1::MutateResult;
+use client::ObjectResult;
+use snafu::ensure;
+use snafu::OptionExt;
+use snafu::ResultExt;
+use store_api::storage::RegionId;
+use table::requests::InsertRequest;
+
+use super::DistTable;
+use crate::error;
+use crate::error::Result;
+
+impl DistTable {
+    pub async fn dist_insert(
+        &self,
+        inserts: HashMap<RegionId, InsertRequest>,
+    ) -> Result<ObjectResult> {
+        let mut joins = Vec::with_capacity(inserts.len());
+
+        for (region_id, insert) in inserts {
+            let db = self
+                .region_dist_map
+                .get(&region_id)
+                .context(error::FindDatanodeSnafu { region: region_id })?;
+
+            let instance = self
+                .datanode_instances
+                .get(db)
+                .context(error::DatanodeInstanceSnafu { datanode: *db })?;
+
+            let instance = instance.clone();
+
+            // TODO(fys): a separate runtime should be used here.
+            let join = tokio::spawn(async move {
+                instance
+                    .grpc_insert(to_insert_expr(region_id, insert)?)
+                    .await
+                    .context(error::RequestDatanodeSnafu)
+            });
+
+            joins.push(join);
+        }
+
+        let mut success = 0;
+        let mut failure = 0;
+
+        for join in joins {
+            let object_result = join.await.context(error::JoinTaskSnafu)??;
+            let result = match object_result {
+                client::ObjectResult::Select(_) => unreachable!(),
+                client::ObjectResult::Mutate(result) => result,
+            };
+            success += result.success;
+            failure += result.failure;
+        }
+
+        Ok(ObjectResult::Mutate(MutateResult { success, failure }))
+    }
+}
+
+fn to_insert_expr(region_id: RegionId, insert: InsertRequest) -> Result<InsertExpr> {
+    let mut row_count = None;
+
+    let columns = insert
+        .columns_values
+        .into_iter()
+        .map(|(column_name, vector)| {
+            match row_count {
+                Some(rows) => ensure!(
+                    rows == vector.len(),
+                    error::InvalidInsertRequestSnafu {
+                        reason: "The row count of columns is not the same."
+                    }
+                ),
+
+                None => row_count = Some(vector.len()),
+            }
+
+            let datatype: ColumnDataTypeWrapper = vector
+                .data_type()
+                .try_into()
+                .context(error::ColumnDataTypeSnafu)?;
+
+            let mut column = Column {
+                column_name,
+                datatype: datatype.datatype() as i32,
+                ..Default::default()
+            };
+
+            column.push_vals(0, vector);
+            Ok(column)
+        })
+        .collect::<Result<Vec<_>>>()?;
+
+    let insert_batch = codec::InsertBatch {
+        columns,
+        row_count: row_count.map(|rows| rows as u32).unwrap_or(0),
+    };
+
+    let mut options = HashMap::with_capacity(1);
+    options.insert(
+        // TODO(fys): Temporarily hard code here
+        "region_id".to_string(),
+        codec::RegionId { id: region_id }.into(),
+    );
+
+    Ok(InsertExpr {
+        table_name: insert.table_name,
+        options,
+        expr: Some(Expr::Values(insert_expr::Values {
+            values: vec![insert_batch.into()],
+        })),
+    })
+}
+
+#[cfg(test)]
+mod tests {
+    use std::{collections::HashMap, ops::Deref};
+
+    use api::v1::{
+        codec::{self, InsertBatch},
+        insert_expr::Expr,
+        ColumnDataType, InsertExpr,
+    };
+    use datatypes::{prelude::ConcreteDataType, types::StringType, vectors::VectorBuilder};
+    use table::requests::InsertRequest;
+
+    use super::to_insert_expr;
+
+    #[test]
+    fn test_to_insert_expr() {
+        let insert_request = mock_insert_request();
+
+        let insert_expr = to_insert_expr(12, insert_request).unwrap();
+
+        verify_insert_expr(insert_expr);
+    }
+
+    fn mock_insert_request() -> InsertRequest {
+        let mut columns_values = HashMap::with_capacity(4);
+
+        let mut builder = VectorBuilder::new(ConcreteDataType::String(StringType));
+        builder.push(&"host1".into());
+        builder.push_null();
+        builder.push(&"host3".into());
+        columns_values.insert("host".to_string(), builder.finish());
+
+        let mut builder = VectorBuilder::new(ConcreteDataType::int16_datatype());
+        builder.push(&1_i16.into());
+        builder.push(&2_i16.into());
+        builder.push(&3_i16.into());
+        columns_values.insert("id".to_string(), builder.finish());
+
+        InsertRequest {
+            table_name: "demo".to_string(),
+            columns_values,
+        }
+    }
+
+    fn verify_insert_expr(insert_expr: InsertExpr) {
+        let table_name = insert_expr.table_name;
+        assert_eq!("demo", table_name);
+
+        let expr = insert_expr.expr.as_ref().unwrap();
+        let vals = match expr {
+            Expr::Values(vals) => vals,
+            Expr::Sql(_) => unreachable!(),
+        };
+
+        let batch: &[u8] = vals.values[0].as_ref();
+        let vals: InsertBatch = batch.try_into().unwrap();
+
+        for column in vals.columns {
+            let name = column.column_name;
+            if name == "id" {
+                assert_eq!(0, column.null_mask[0]);
+                assert_eq!(ColumnDataType::Int16 as i32, column.datatype);
+                assert_eq!(vec![1, 2, 3], column.values.as_ref().unwrap().i16_values);
+            }
+            if name == "host" {
+                assert_eq!(2, column.null_mask[0]);
+                assert_eq!(ColumnDataType::String as i32, column.datatype);
+                assert_eq!(
+                    vec!["host1", "host3"],
+                    column.values.as_ref().unwrap().string_values
+                );
+            }
+        }
+
+        let bytes = insert_expr.options.get("region_id").unwrap();
+        let region_id: codec::RegionId = bytes.deref().try_into().unwrap();
+        assert_eq!(12, region_id.id);
+    }
+}
diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs
index 9060280093..c2d0791657 100644
--- a/src/servers/src/error.rs
+++ b/src/servers/src/error.rs
@@ -152,6 +152,9 @@ pub enum Error {

     #[snafu(display("Invalid prometheus remote read query result, msg: {}", msg))]
     InvalidPromRemoteReadQueryResult { msg: String, backtrace: Backtrace },
+
+    #[snafu(display("Failed to decode region id, source: {}", source))]
+    DecodeRegionId { source: api::DecodeError },
 }

 pub type Result<T> = std::result::Result<T, Error>;
@@ -186,6 +189,7 @@ impl ErrorExt for Error {
             | DecodePromRemoteRequest { .. }
             | DecompressPromRemoteRequest { .. }
            | InvalidPromRemoteRequest { .. }
+            | DecodeRegionId { .. }
             | TimePrecision { .. } => StatusCode::InvalidArguments,

             InfluxdbLinesWrite { source, .. } => source.status_code(),
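
A note on the null_mask bytes asserted in test_column_put_vector and test_to_insert_expr above: the mask is packed one bit per row, least-significant bit first, and a set bit marks a null row. The stand-alone sketch below is illustrative only; pack_null_mask is a hypothetical helper, not an API from this patch (the patch itself builds the mask with common_base::BitVec). It reproduces the asserted values: four rows with a null at index 1 give byte 2, and the seven rows present after push_vals, with nulls at indices 1 and 5, give byte 2 + 32 = 34.

// Illustrative sketch only: packs an LSB-first null bitmap the way the
// `null_mask` bytes in the tests above are interpreted.
fn pack_null_mask(rows: &[Option<bool>]) -> Vec<u8> {
    let mut mask = vec![0u8; (rows.len() + 7) / 8];
    for (idx, row) in rows.iter().enumerate() {
        if row.is_none() {
            // bit `idx % 8` of byte `idx / 8` marks row `idx` as null
            mask[idx / 8] |= 1 << (idx % 8);
        }
    }
    mask
}

fn main() {
    // The column before push_vals: Some(false), None, Some(true), Some(true)
    assert_eq!(vec![2], pack_null_mask(&[Some(false), None, Some(true), Some(true)]));

    // After push_vals: Some(false), None, Some(true), Some(true), Some(true), None, Some(false)
    let rows = [
        Some(false), None, Some(true), Some(true),
        Some(true), None, Some(false),
    ];
    assert_eq!(vec![34], pack_null_mask(&rows));
}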