diff --git a/Cargo.lock b/Cargo.lock
index e714d0e154..c7ee96b45f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4152,7 +4152,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=c30a2607be4044502094b25c408171a666a8ff6d#c30a2607be4044502094b25c408171a666a8ff6d"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=3489b4742150abe0a769faf1bb60fbb95b061fc8#3489b4742150abe0a769faf1bb60fbb95b061fc8"
 dependencies = [
  "prost",
  "serde",
diff --git a/Cargo.toml b/Cargo.toml
index 6e17a0fcc0..92b89faa67 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -77,7 +77,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git
 derive_builder = "0.12"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "c30a2607be4044502094b25c408171a666a8ff6d" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "3489b4742150abe0a769faf1bb60fbb95b061fc8" }
 itertools = "0.10"
 lazy_static = "1.4"
 once_cell = "1.18"
diff --git a/src/client/src/database.rs b/src/client/src/database.rs
index 90228ba559..b2c2037ae1 100644
--- a/src/client/src/database.rs
+++ b/src/client/src/database.rs
@@ -19,7 +19,7 @@ use api::v1::query_request::Query;
 use api::v1::{
     AlterExpr, AuthHeader, CompactTableExpr, CreateTableExpr, DdlRequest, DeleteRequests,
     DropTableExpr, FlushTableExpr, GreptimeRequest, InsertRequests, PromRangeQuery, QueryRequest,
-    RequestHeader, TruncateTableExpr,
+    RequestHeader, RowInsertRequests, TruncateTableExpr,
 };
 use arrow_flight::Ticket;
 use async_stream::stream;
@@ -115,6 +115,11 @@ impl Database {
         self.handle(Request::Inserts(requests)).await
     }
 
+    pub async fn row_insert(&self, requests: RowInsertRequests) -> Result<u32> {
+        let _timer = timer!(metrics::METRIC_GRPC_INSERT);
+        self.handle(Request::RowInserts(requests)).await
+    }
+
     pub fn streaming_inserter(&self) -> Result<StreamInserter> {
         self.streaming_inserter_with_channel_size(65536)
     }
diff --git a/src/client/src/stream_insert.rs b/src/client/src/stream_insert.rs
index 0701490101..e2bfb28760 100644
--- a/src/client/src/stream_insert.rs
+++ b/src/client/src/stream_insert.rs
@@ -16,6 +16,7 @@ use api::v1::greptime_database_client::GreptimeDatabaseClient;
 use api::v1::greptime_request::Request;
 use api::v1::{
     AuthHeader, GreptimeRequest, GreptimeResponse, InsertRequest, InsertRequests, RequestHeader,
+    RowInsertRequest, RowInsertRequests,
 };
 use tokio::sync::mpsc;
 use tokio::task::JoinHandle;
@@ -84,6 +85,18 @@ impl StreamInserter {
         })
     }
 
+    pub async fn row_insert(&self, requests: Vec<RowInsertRequest>) -> Result<()> {
+        let inserts = RowInsertRequests { inserts: requests };
+        let request = self.to_rpc_request(Request::RowInserts(inserts));
+
+        self.sender.send(request).await.map_err(|e| {
+            error::ClientStreamingSnafu {
+                err_msg: e.to_string(),
+            }
+            .build()
+        })
+    }
+
     pub async fn finish(self) -> Result {
         drop(self.sender);
 
diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs
index 35ddef81f4..6fed3bc65e 100644
--- a/src/frontend/src/instance/distributed.rs
+++ b/src/frontend/src/instance/distributed.rs
@@ -14,6 +14,7 @@
 
 pub mod deleter;
 pub(crate) mod inserter;
+pub(crate) mod row_inserter;
 
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -23,7 +24,7 @@ use api::v1::ddl_request::Expr as DdlExpr;
 use api::v1::greptime_request::Request;
 use api::v1::{
     column_def, AlterExpr, CompactTableExpr, CreateDatabaseExpr, CreateTableExpr, DeleteRequests,
-    FlushTableExpr, InsertRequests, TruncateTableExpr,
+    FlushTableExpr, InsertRequests, RowInsertRequests, TruncateTableExpr,
 };
 use async_trait::async_trait;
 use catalog::{CatalogManager, DeregisterTableRequest, RegisterTableRequest};
@@ -72,6 +73,7 @@ use crate::error::{
 use crate::expr_factory;
 use crate::instance::distributed::deleter::DistDeleter;
 use crate::instance::distributed::inserter::DistInserter;
+use crate::instance::distributed::row_inserter::RowDistInserter;
 use crate::table::DistTable;
 
 const MAX_VALUE: &str = "MAXVALUE";
@@ -624,6 +626,20 @@ impl DistInstance {
         Ok(Output::AffectedRows(affected_rows as usize))
     }
 
+    async fn handle_row_dist_insert(
+        &self,
+        requests: RowInsertRequests,
+        ctx: QueryContextRef,
+    ) -> Result<Output> {
+        let inserter = RowDistInserter::new(
+            ctx.current_catalog().to_owned(),
+            ctx.current_schema().to_owned(),
+            self.catalog_manager.clone(),
+        );
+        let affected_rows = inserter.insert(requests).await?;
+        Ok(Output::AffectedRows(affected_rows as usize))
+    }
+
     async fn handle_dist_delete(
         &self,
         request: DeleteRequests,
@@ -664,8 +680,9 @@ impl GrpcQueryHandler for DistInstance {
     async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result<Output> {
         match request {
             Request::Inserts(requests) => self.handle_dist_insert(requests, ctx).await,
-            Request::RowInserts(_) | Request::RowDeletes(_) => NotSupportedSnafu {
-                feat: "row inserts/deletes",
+            Request::RowInserts(requests) => self.handle_row_dist_insert(requests, ctx).await,
+            Request::RowDeletes(_) => NotSupportedSnafu {
+                feat: "row deletes",
             }
             .fail(),
             Request::Deletes(requests) => self.handle_dist_delete(requests, ctx).await,
diff --git a/src/frontend/src/instance/distributed/row_inserter.rs b/src/frontend/src/instance/distributed/row_inserter.rs
new file mode 100644
index 0000000000..4eabb21de9
--- /dev/null
+++ b/src/frontend/src/instance/distributed/row_inserter.rs
@@ -0,0 +1,125 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use api::v1::RowInsertRequests;
+use catalog::CatalogManager;
+use client::Database;
+use common_meta::peer::Peer;
+use futures_util::future;
+use metrics::counter;
+use snafu::{OptionExt, ResultExt};
+use table::metadata::TableId;
+
+use crate::catalog::FrontendCatalogManager;
+use crate::error::{
+    CatalogSnafu, FindDatanodeSnafu, FindTableRouteSnafu, JoinTaskSnafu, RequestDatanodeSnafu,
+    Result, SplitInsertSnafu, TableNotFoundSnafu,
+};
+
+pub struct RowDistInserter {
+    catalog_name: String,
+    schema_name: String,
+    catalog_manager: Arc<FrontendCatalogManager>,
+}
+
+impl RowDistInserter {
+    pub fn new(
+        catalog_name: String,
+        schema_name: String,
+        catalog_manager: Arc<FrontendCatalogManager>,
+    ) -> Self {
+        Self {
+            catalog_name,
+            schema_name,
+            catalog_manager,
+        }
+    }
+
+    pub(crate) async fn insert(&self, requests: RowInsertRequests) -> Result<u32> {
+        let requests = self.split(requests).await?;
+        let results = future::try_join_all(requests.into_iter().map(|(peer, inserts)| {
+            let datanode_clients = self.catalog_manager.datanode_clients();
+            let catalog = self.catalog_name.clone();
+            let schema = self.schema_name.clone();
+
+            common_runtime::spawn_write(async move {
+                let client = datanode_clients.get_client(&peer).await;
+                let database = Database::new(catalog, schema, client);
+                database
+                    .row_insert(inserts)
+                    .await
+                    .context(RequestDatanodeSnafu)
+            })
+        }))
+        .await
+        .context(JoinTaskSnafu)?;
+
+        let affected_rows = results.into_iter().sum::<Result<u32>>()?;
+        counter!(crate::metrics::DIST_INGEST_ROW_COUNT, affected_rows as u64);
+        Ok(affected_rows)
+    }
+
+    async fn split(&self, requests: RowInsertRequests) -> Result<HashMap<Peer, RowInsertRequests>> {
+        let partition_manager = self.catalog_manager.partition_manager();
+        let mut inserts: HashMap<Peer, RowInsertRequests> = HashMap::new();
+
+        for req in requests.inserts {
+            let table_name = req.table_name.clone();
+            let table_id = self.get_table_id(table_name.as_str()).await?;
+
+            let req_splits = partition_manager
+                .split_row_insert_request(table_id, req)
+                .await
+                .context(SplitInsertSnafu)?;
+            let table_route = partition_manager
+                .find_table_route(table_id)
+                .await
+                .context(FindTableRouteSnafu { table_name })?;
+
+            for (region_number, insert) in req_splits {
+                let peer =
+                    table_route
+                        .find_region_leader(region_number)
+                        .context(FindDatanodeSnafu {
+                            region: region_number,
+                        })?;
+                inserts
+                    .entry(peer.clone())
+                    .or_default()
+                    .inserts
+                    .push(insert);
+            }
+        }
+
+        Ok(inserts)
+    }
+
+    async fn get_table_id(&self, table_name: &str) -> Result<TableId> {
+        self.catalog_manager
+            .table(&self.catalog_name, &self.schema_name, table_name)
+            .await
+            .context(CatalogSnafu)?
+            .with_context(|| TableNotFoundSnafu {
+                table_name: common_catalog::format_full_table_name(
+                    &self.catalog_name,
+                    &self.schema_name,
+                    table_name,
+                ),
+            })
+            .map(|table| table.table_info().table_id())
+    }
+}
diff --git a/src/frontend/src/instance/influxdb.rs b/src/frontend/src/instance/influxdb.rs
index ed2088ae96..36276b71e2 100644
--- a/src/frontend/src/instance/influxdb.rs
+++ b/src/frontend/src/instance/influxdb.rs
@@ -38,7 +38,7 @@ impl InfluxdbLineProtocolHandler for Instance {
         let requests = request.try_into()?;
         let _ = self
-            .handle_inserts(requests, ctx)
+            .handle_row_inserts(requests, ctx)
             .await
             .map_err(BoxedError::new)
             .context(servers::error::ExecuteGrpcQuerySnafu)?;
diff --git a/src/partition/src/lib.rs b/src/partition/src/lib.rs
index 9dba350441..dd05b7f6d8 100644
--- a/src/partition/src/lib.rs
+++ b/src/partition/src/lib.rs
@@ -21,6 +21,7 @@ pub mod metrics;
 pub mod partition;
 pub mod range;
 pub mod route;
+pub mod row_splitter;
 pub mod splitter;
 
 pub use crate::partition::{PartitionRule, PartitionRuleRef};
diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs
index e6f3021d5f..36353b13f3 100644
--- a/src/partition/src/manager.rs
+++ b/src/partition/src/manager.rs
@@ -15,6 +15,7 @@
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
+use api::v1::RowInsertRequest;
 use common_meta::peer::Peer;
 use common_meta::rpc::router::TableRoute;
 use common_query::prelude::Expr;
@@ -31,6 +32,7 @@ use crate::error::{FindLeaderSnafu, Result};
 use crate::partition::{PartitionBound, PartitionDef, PartitionExpr};
 use crate::range::RangePartitionRule;
 use crate::route::TableRoutes;
+use crate::row_splitter::{RowInsertRequestSplits, RowSplitter};
 use crate::splitter::{DeleteRequestSplit, InsertRequestSplit, WriteSplitter};
 use crate::{error, PartitionRuleRef};
@@ -247,6 +249,17 @@ impl PartitionRuleManager {
         splitter.split_insert(req, schema)
     }
 
+    /// Split a [RowInsertRequest] into [RowInsertRequestSplits] according to the partition rule
+    /// of the given table.
+    pub async fn split_row_insert_request(
+        &self,
+        table: TableId,
+        req: RowInsertRequest,
+    ) -> Result<RowInsertRequestSplits> {
+        let partition_rule = self.find_table_partition_rule(table).await?;
+        RowSplitter::new(partition_rule).split(req)
+    }
+
     pub async fn split_delete_request(
         &self,
         table: TableId,
diff --git a/src/partition/src/row_splitter.rs b/src/partition/src/row_splitter.rs
new file mode 100644
index 0000000000..e3f0dda83b
--- /dev/null
+++ b/src/partition/src/row_splitter.rs
@@ -0,0 +1,322 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+
+use api::helper;
+use api::v1::{ColumnSchema, Row, RowInsertRequest, Rows};
+use datatypes::value::Value;
+use store_api::storage::RegionNumber;
+
+use crate::error::Result;
+use crate::PartitionRuleRef;
+
+pub type RowInsertRequestSplits = HashMap<RegionNumber, RowInsertRequest>;
+
+pub struct RowSplitter {
+    partition_rule: PartitionRuleRef,
+}
+
+impl RowSplitter {
+    pub fn new(partition_rule: PartitionRuleRef) -> Self {
+        Self { partition_rule }
+    }
+
+    pub fn split(&self, req: RowInsertRequest) -> Result<RowInsertRequestSplits> {
+        // No partition
+        let partition_columns = self.partition_rule.partition_columns();
+        if partition_columns.is_empty() {
+            return Ok(HashMap::from([(0, req)]));
+        }
+
+        // No data
+        let Some(rows) = req.rows else {
+            return Ok(HashMap::new());
+        };
+
+        SplitReadRowHelper::new(req.table_name, rows, &self.partition_rule).split_to_requests()
+    }
+}
+
+struct SplitReadRowHelper<'a> {
+    table_name: String,
+    schema: Vec<ColumnSchema>,
+    rows: Vec<Row>,
+    partition_rule: &'a PartitionRuleRef,
+    // Map from partition column name to index in the schema/row.
+    partition_cols_indexes: Vec<Option<usize>>,
+}
+
+impl<'a> SplitReadRowHelper<'a> {
+    fn new(table_name: String, rows: Rows, partition_rule: &'a PartitionRuleRef) -> Self {
+        let col_name_to_idx = rows
+            .schema
+            .iter()
+            .enumerate()
+            .map(|(idx, col)| (&col.column_name, idx))
+            .collect::<HashMap<_, _>>();
+        let partition_cols = partition_rule.partition_columns();
+        let partition_cols_indexes = partition_cols
+            .into_iter()
+            .map(|col_name| col_name_to_idx.get(&col_name).cloned())
+            .collect::<Vec<_>>();
+
+        Self {
+            table_name,
+            schema: rows.schema,
+            rows: rows.rows,
+            partition_rule,
+            partition_cols_indexes,
+        }
+    }
+
+    fn split_to_requests(mut self) -> Result<RowInsertRequestSplits> {
+        let request_splits = self
+            .split_to_regions()?
+            .into_iter()
+            .map(|(region_number, row_indexes)| {
+                let rows = row_indexes
+                    .into_iter()
+                    .map(|row_idx| std::mem::take(&mut self.rows[row_idx]))
+                    .collect();
+                let req = RowInsertRequest {
+                    table_name: self.table_name.clone(),
+                    rows: Some(Rows {
+                        schema: self.schema.clone(),
+                        rows,
+                    }),
+                    region_number,
+                };
+                (region_number, req)
+            })
+            .collect::<HashMap<_, _>>();
+
+        Ok(request_splits)
+    }
+
+    fn split_to_regions(&self) -> Result<HashMap<RegionNumber, Vec<usize>>> {
+        let mut regions_row_indexes: HashMap<RegionNumber, Vec<usize>> = HashMap::new();
+        for (row_idx, values) in self.iter_partition_values().enumerate() {
+            let region_number = self.partition_rule.find_region(&values)?;
+            regions_row_indexes
+                .entry(region_number)
+                .or_default()
+                .push(row_idx);
+        }
+
+        Ok(regions_row_indexes)
+    }
+
+    fn iter_partition_values(&'a self) -> impl Iterator<Item = Vec<Value>> + 'a {
+        self.rows.iter().map(|row| {
+            self.partition_cols_indexes
+                .iter()
+                .map(|idx| {
+                    idx.as_ref().map_or(Value::Null, |idx| {
+                        helper::pb_value_to_value_ref(&row.values[*idx]).into()
+                    })
+                })
+                .collect()
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::any::Any;
+    use std::sync::Arc;
+
+    use api::v1::value::ValueData;
+    use api::v1::{ColumnDataType, SemanticType};
+    use serde::{Deserialize, Serialize};
+
+    use super::*;
+    use crate::partition::PartitionExpr;
+    use crate::PartitionRule;
+
+    fn mock_insert_request() -> RowInsertRequest {
+        let schema = vec![
+            ColumnSchema {
+                column_name: "id".to_string(),
+                datatype: ColumnDataType::String as i32,
+                semantic_type: SemanticType::Tag as i32,
+            },
+            ColumnSchema {
+                column_name: "name".to_string(),
+                datatype: ColumnDataType::String as i32,
+                semantic_type: SemanticType::Tag as i32,
+            },
+            ColumnSchema {
+                column_name: "age".to_string(),
+                datatype: ColumnDataType::Uint32 as i32,
+                semantic_type: SemanticType::Field as i32,
+            },
+        ];
+        let rows = vec![
+            Row {
+                values: vec![
+                    ValueData::StringValue("1".to_string()).into(),
+                    ValueData::StringValue("Smith".to_string()).into(),
+                    ValueData::U32Value(20).into(),
+                ],
+            },
+            Row {
+                values: vec![
+                    ValueData::StringValue("2".to_string()).into(),
+                    ValueData::StringValue("Johnson".to_string()).into(),
+                    ValueData::U32Value(21).into(),
+                ],
+            },
+            Row {
+                values: vec![
+                    ValueData::StringValue("3".to_string()).into(),
+                    ValueData::StringValue("Williams".to_string()).into(),
+                    ValueData::U32Value(22).into(),
+                ],
+            },
+        ];
+        RowInsertRequest {
+            table_name: "t".to_string(),
+            rows: Some(Rows { schema, rows }),
+            region_number: 0,
+        }
+    }
+
+    #[derive(Debug, Serialize, Deserialize)]
+    struct MockPartitionRule;
+
+    impl PartitionRule for MockPartitionRule {
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+
+        fn partition_columns(&self) -> Vec<String> {
+            vec!["id".to_string()]
+        }
+
+        fn find_region(&self, values: &[Value]) -> Result<RegionNumber> {
+            let val = values.get(0).unwrap().clone();
+            let val = match val {
+                Value::String(v) => v.as_utf8().to_string(),
+                _ => unreachable!(),
+            };
+
+            Ok(val.parse::<u32>().unwrap() % 2)
+        }
+
+        fn find_regions_by_exprs(&self, _: &[PartitionExpr]) -> Result<Vec<RegionNumber>> {
+            unimplemented!()
+        }
+    }
+
+    #[derive(Debug, Serialize, Deserialize)]
+    struct MockMissedColPartitionRule;
+
+    impl PartitionRule for MockMissedColPartitionRule {
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+
+        fn partition_columns(&self) -> Vec<String> {
+            vec!["missed_col".to_string()]
+        }
+
+        fn find_region(&self, values: &[Value]) -> Result<RegionNumber> {
+            let val = values.get(0).unwrap().clone();
+            let val = match val {
+                Value::Null => 1,
+                _ => 0,
+            };
+
+            Ok(val)
+        }
+
+        fn find_regions_by_exprs(&self, _: &[PartitionExpr]) -> Result<Vec<RegionNumber>> {
+            unimplemented!()
+        }
+    }
+
+    #[derive(Debug, Serialize, Deserialize)]
+    struct EmptyPartitionRule;
+
+    impl PartitionRule for EmptyPartitionRule {
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+
+        fn partition_columns(&self) -> Vec<String> {
+            vec![]
+        }
+
+        fn find_region(&self, _values: &[Value]) -> Result<RegionNumber> {
+            Ok(0)
+        }
+
+        fn find_regions_by_exprs(&self, _: &[PartitionExpr]) -> Result<Vec<RegionNumber>> {
+            unimplemented!()
+        }
+    }
+
+    #[test]
+    fn test_writer_splitter() {
+        let insert_request = mock_insert_request();
+        let rule = Arc::new(MockPartitionRule) as PartitionRuleRef;
+        let splitter = RowSplitter::new(rule);
+        let splits = splitter.split(insert_request).unwrap();
+
+        assert_eq!(splits.len(), 2);
+
+        let req0 = &splits[&0];
+        let req1 = &splits[&1];
+        assert_eq!(req0.region_number, 0);
+        assert_eq!(req1.region_number, 1);
+
+        let rows0 = req0.rows.as_ref().unwrap();
+        let rows1 = req1.rows.as_ref().unwrap();
+        assert_eq!(rows0.rows.len(), 1);
+        assert_eq!(rows1.rows.len(), 2);
+    }
+
+    #[test]
+    fn test_missed_col_writer_splitter() {
+        let insert_request = mock_insert_request();
+        let rule = Arc::new(MockMissedColPartitionRule) as PartitionRuleRef;
+        let splitter = RowSplitter::new(rule);
+        let splits = splitter.split(insert_request).unwrap();
+
+        assert_eq!(splits.len(), 1);
+
+        let req = &splits[&1];
+        assert_eq!(req.region_number, 1);
+
+        let rows = req.rows.as_ref().unwrap();
+        assert_eq!(rows.rows.len(), 3);
+    }
+
+    #[test]
+    fn test_empty_partition_rule_writer_splitter() {
+        let insert_request = mock_insert_request();
+        let rule = Arc::new(EmptyPartitionRule) as PartitionRuleRef;
+        let splitter = RowSplitter::new(rule);
+        let splits = splitter.split(insert_request).unwrap();
+
+        assert_eq!(splits.len(), 1);
+
+        let req = &splits[&0];
+        assert_eq!(req.region_number, 0);
+
+        let rows = req.rows.as_ref().unwrap();
+        assert_eq!(rows.rows.len(), 3);
+    }
+}
diff --git a/src/servers/src/influxdb.rs b/src/servers/src/influxdb.rs
index 82dd832393..1ad648d83f 100644
--- a/src/servers/src/influxdb.rs
+++ b/src/servers/src/influxdb.rs
@@ -234,7 +234,7 @@ fn parse_tags<'a>(
                 datatype: ColumnDataType::String as i32,
                 semantic_type: SemanticType::Tag as i32,
             });
-            one_row.push(to_value(ValueData::StringValue(v.to_string())));
+            one_row.push(ValueData::StringValue(v.to_string()).into());
         } else {
             check_schema(ColumnDataType::String, SemanticType::Tag, &schema[*index])?;
             one_row[*index].value_data = Some(ValueData::StringValue(v.to_string()));
@@ -269,7 +269,7 @@ fn parse_fields<'a>(
                 datatype: datatype as i32,
                 semantic_type: SemanticType::Field as i32,
             });
-            one_row.push(to_value(value));
+            one_row.push(value.into());
         } else {
             check_schema(datatype, SemanticType::Field, &schema[*index])?;
             one_row[*index].value_data = Some(value);
@@ -309,7 +309,7 @@ fn parse_ts(
             datatype: ColumnDataType::TimestampMillisecond as i32,
             semantic_type: SemanticType::Timestamp as i32,
         });
-        one_row.push(to_value(ValueData::TsMillisecondValue(ts)))
+        one_row.push(ValueData::TsMillisecondValue(ts).into())
     } else {
         check_schema(
            ColumnDataType::TimestampMillisecond,
@@ -351,14 +351,6 @@ fn check_schema(
     Ok(())
 }
 
-// TODO(jeremy): impl From<ValueData> for Value
-#[inline]
-fn to_value(value: ValueData) -> Value {
-    Value {
-        value_data: Some(value),
-    }
-}
-
 #[inline]
 fn unwrap_or_default_precision(precision: Option<Precision>) -> Precision {
     if let Some(val) = precision {