feat: dist row inserter (#2231)

* feat: frontend row inserter

* feat: row splitter

chore: add unit tests for the row splitter

* feat: RowDistInserter

* feat: make influxdb line protocol use the row-based protocol

* Update src/partition/src/row_splitter.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* Update src/frontend/src/instance/distributed/row_inserter.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* chore: address review comments

* Update src/frontend/src/instance/distributed/row_inserter.rs

Co-authored-by: LFC <bayinamine@gmail.com>

* chore: address review comments

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
Co-authored-by: LFC <bayinamine@gmail.com>
JeremyHi
2023-08-24 14:58:05 +08:00
committed by GitHub
parent a7e0e2330e
commit 7746e5b172
11 changed files with 506 additions and 18 deletions

Cargo.lock (generated)

@@ -4152,7 +4152,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=c30a2607be4044502094b25c408171a666a8ff6d#c30a2607be4044502094b25c408171a666a8ff6d"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=3489b4742150abe0a769faf1bb60fbb95b061fc8#3489b4742150abe0a769faf1bb60fbb95b061fc8"
dependencies = [
"prost",
"serde",


@@ -77,7 +77,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git
derive_builder = "0.12"
futures = "0.3"
futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "c30a2607be4044502094b25c408171a666a8ff6d" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "3489b4742150abe0a769faf1bb60fbb95b061fc8" }
itertools = "0.10"
lazy_static = "1.4"
once_cell = "1.18"


@@ -19,7 +19,7 @@ use api::v1::query_request::Query;
use api::v1::{
AlterExpr, AuthHeader, CompactTableExpr, CreateTableExpr, DdlRequest, DeleteRequests,
DropTableExpr, FlushTableExpr, GreptimeRequest, InsertRequests, PromRangeQuery, QueryRequest,
-RequestHeader, TruncateTableExpr,
+RequestHeader, RowInsertRequests, TruncateTableExpr,
};
use arrow_flight::Ticket;
use async_stream::stream;
@@ -115,6 +115,11 @@ impl Database {
self.handle(Request::Inserts(requests)).await
}
pub async fn row_insert(&self, requests: RowInsertRequests) -> Result<u32> {
let _timer = timer!(metrics::METRIC_GRPC_INSERT);
self.handle(Request::RowInserts(requests)).await
}
pub fn streaming_inserter(&self) -> Result<StreamInserter> {
self.streaming_inserter_with_channel_size(65536)
}
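
A minimal usage sketch of the new client call, assuming `database` is an already-connected client `Database`; the table and column names here are hypothetical, while the request shapes mirror those in this diff:

use api::v1::value::ValueData;
use api::v1::{
    ColumnDataType, ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, SemanticType,
};

let requests = RowInsertRequests {
    inserts: vec![RowInsertRequest {
        table_name: "metrics".to_string(), // hypothetical table
        rows: Some(Rows {
            schema: vec![
                ColumnSchema {
                    column_name: "host".to_string(),
                    datatype: ColumnDataType::String as i32,
                    semantic_type: SemanticType::Tag as i32,
                },
                ColumnSchema {
                    column_name: "ts".to_string(),
                    datatype: ColumnDataType::TimestampMillisecond as i32,
                    semantic_type: SemanticType::Timestamp as i32,
                },
            ],
            rows: vec![Row {
                values: vec![
                    ValueData::StringValue("web-1".to_string()).into(),
                    ValueData::TsMillisecondValue(1_692_864_000_000).into(),
                ],
            }],
        }),
        region_number: 0,
    }],
};
let affected_rows = database.row_insert(requests).await?;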


@@ -16,6 +16,7 @@ use api::v1::greptime_database_client::GreptimeDatabaseClient;
use api::v1::greptime_request::Request;
use api::v1::{
AuthHeader, GreptimeRequest, GreptimeResponse, InsertRequest, InsertRequests, RequestHeader,
RowInsertRequest, RowInsertRequests,
};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
@@ -84,6 +85,18 @@ impl StreamInserter {
})
}
pub async fn row_insert(&self, requests: Vec<RowInsertRequest>) -> Result<()> {
let inserts = RowInsertRequests { inserts: requests };
let request = self.to_rpc_request(Request::RowInserts(inserts));
self.sender.send(request).await.map_err(|e| {
error::ClientStreamingSnafu {
err_msg: e.to_string(),
}
.build()
})
}
pub async fn finish(self) -> Result<u32> {
drop(self.sender);
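
A short hedged sketch of the streaming variant: batches are queued onto the channel by row_insert, and finish drops the sender and returns the summed affected-row count. Here `database` and `batch` (a Vec<RowInsertRequest>) are assumed to exist in the caller's scope:

let inserter = database.streaming_inserter()?;
inserter.row_insert(batch).await?;
// ... further row_insert calls as batches arrive ...
let affected_rows = inserter.finish().await?;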


@@ -14,6 +14,7 @@
pub mod deleter;
pub(crate) mod inserter;
pub(crate) mod row_inserter;
use std::collections::HashMap;
use std::sync::Arc;
@@ -23,7 +24,7 @@ use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
use api::v1::{
column_def, AlterExpr, CompactTableExpr, CreateDatabaseExpr, CreateTableExpr, DeleteRequests,
-FlushTableExpr, InsertRequests, TruncateTableExpr,
+FlushTableExpr, InsertRequests, RowInsertRequests, TruncateTableExpr,
};
use async_trait::async_trait;
use catalog::{CatalogManager, DeregisterTableRequest, RegisterTableRequest};
@@ -72,6 +73,7 @@ use crate::error::{
use crate::expr_factory;
use crate::instance::distributed::deleter::DistDeleter;
use crate::instance::distributed::inserter::DistInserter;
use crate::instance::distributed::row_inserter::RowDistInserter;
use crate::table::DistTable;
const MAX_VALUE: &str = "MAXVALUE";
@@ -624,6 +626,20 @@ impl DistInstance {
Ok(Output::AffectedRows(affected_rows as usize))
}
async fn handle_row_dist_insert(
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
) -> Result<Output> {
let inserter = RowDistInserter::new(
ctx.current_catalog().to_owned(),
ctx.current_schema().to_owned(),
self.catalog_manager.clone(),
);
let affected_rows = inserter.insert(requests).await?;
Ok(Output::AffectedRows(affected_rows as usize))
}
async fn handle_dist_delete(
&self,
request: DeleteRequests,
@@ -664,8 +680,9 @@ impl GrpcQueryHandler for DistInstance {
async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result<Output> {
match request {
Request::Inserts(requests) => self.handle_dist_insert(requests, ctx).await,
-Request::RowInserts(_) | Request::RowDeletes(_) => NotSupportedSnafu {
-feat: "row inserts/deletes",
+Request::RowInserts(requests) => self.handle_row_dist_insert(requests, ctx).await,
+Request::RowDeletes(_) => NotSupportedSnafu {
+feat: "row deletes",
}
.fail(),
Request::Deletes(requests) => self.handle_dist_delete(requests, ctx).await,


@@ -0,0 +1,125 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::RowInsertRequests;
use catalog::CatalogManager;
use client::Database;
use common_meta::peer::Peer;
use futures_util::future;
use metrics::counter;
use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;
use crate::catalog::FrontendCatalogManager;
use crate::error::{
CatalogSnafu, FindDatanodeSnafu, FindTableRouteSnafu, JoinTaskSnafu, RequestDatanodeSnafu,
Result, SplitInsertSnafu, TableNotFoundSnafu,
};
pub struct RowDistInserter {
catalog_name: String,
schema_name: String,
catalog_manager: Arc<FrontendCatalogManager>,
}
impl RowDistInserter {
pub fn new(
catalog_name: String,
schema_name: String,
catalog_manager: Arc<FrontendCatalogManager>,
) -> Self {
Self {
catalog_name,
schema_name,
catalog_manager,
}
}
pub(crate) async fn insert(&self, requests: RowInsertRequests) -> Result<u32> {
let requests = self.split(requests).await?;
let results = future::try_join_all(requests.into_iter().map(|(peer, inserts)| {
let datanode_clients = self.catalog_manager.datanode_clients();
let catalog = self.catalog_name.clone();
let schema = self.schema_name.clone();
common_runtime::spawn_write(async move {
let client = datanode_clients.get_client(&peer).await;
let database = Database::new(catalog, schema, client);
database
.row_insert(inserts)
.await
.context(RequestDatanodeSnafu)
})
}))
.await
.context(JoinTaskSnafu)?;
let affected_rows = results.into_iter().sum::<Result<u32>>()?;
counter!(crate::metrics::DIST_INGEST_ROW_COUNT, affected_rows as u64);
Ok(affected_rows)
}
async fn split(&self, requests: RowInsertRequests) -> Result<HashMap<Peer, RowInsertRequests>> {
let partition_manager = self.catalog_manager.partition_manager();
let mut inserts: HashMap<Peer, RowInsertRequests> = HashMap::new();
for req in requests.inserts {
let table_name = req.table_name.clone();
let table_id = self.get_table_id(table_name.as_str()).await?;
let req_splits = partition_manager
.split_row_insert_request(table_id, req)
.await
.context(SplitInsertSnafu)?;
let table_route = partition_manager
.find_table_route(table_id)
.await
.context(FindTableRouteSnafu { table_name })?;
for (region_number, insert) in req_splits {
let peer =
table_route
.find_region_leader(region_number)
.context(FindDatanodeSnafu {
region: region_number,
})?;
inserts
.entry(peer.clone())
.or_default()
.inserts
.push(insert);
}
}
Ok(inserts)
}
async fn get_table_id(&self, table_name: &str) -> Result<TableId> {
self.catalog_manager
.table(&self.catalog_name, &self.schema_name, table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: common_catalog::format_full_table_name(
&self.catalog_name,
&self.schema_name,
table_name,
),
})
.map(|table| table.table_info().table_id())
}
}
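
The insert path above reduces to a split-then-fan-out pattern: one spawned write per datanode peer, joined with try_join_all, then a summed row count. A self-contained sketch of that pattern with simplified stand-in types (the real code sends RowInsertRequests over gRPC and sums Result<u32> values):

use futures_util::future;

async fn fan_out(per_peer_batches: Vec<u32>) -> Result<u32, String> {
    let tasks = per_peer_batches.into_iter().map(|rows_in_batch| async move {
        // Stand-in for Database::row_insert against one peer; pretend
        // every row in the batch is written successfully.
        Ok::<u32, String>(rows_in_batch)
    });
    let results = future::try_join_all(tasks).await?;
    Ok(results.into_iter().sum())
}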


@@ -38,7 +38,7 @@ impl InfluxdbLineProtocolHandler for Instance {
let requests = request.try_into()?;
let _ = self
-.handle_inserts(requests, ctx)
+.handle_row_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(servers::error::ExecuteGrpcQuerySnafu)?;


@@ -21,6 +21,7 @@ pub mod metrics;
pub mod partition;
pub mod range;
pub mod route;
pub mod row_splitter;
pub mod splitter;
pub use crate::partition::{PartitionRule, PartitionRuleRef};


@@ -15,6 +15,7 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use api::v1::RowInsertRequest;
use common_meta::peer::Peer;
use common_meta::rpc::router::TableRoute;
use common_query::prelude::Expr;
@@ -31,6 +32,7 @@ use crate::error::{FindLeaderSnafu, Result};
use crate::partition::{PartitionBound, PartitionDef, PartitionExpr};
use crate::range::RangePartitionRule;
use crate::route::TableRoutes;
use crate::row_splitter::{RowInsertRequestSplits, RowSplitter};
use crate::splitter::{DeleteRequestSplit, InsertRequestSplit, WriteSplitter};
use crate::{error, PartitionRuleRef};
@@ -247,6 +249,17 @@ impl PartitionRuleManager {
splitter.split_insert(req, schema)
}
/// Split [RowInsertRequest] into [RowInsertRequestSplits] according to the partition rule
/// of the given table.
pub async fn split_row_insert_request(
&self,
table: TableId,
req: RowInsertRequest,
) -> Result<RowInsertRequestSplits> {
let partition_rule = self.find_table_partition_rule(table).await?;
RowSplitter::new(partition_rule).split(req)
}
pub async fn split_delete_request(
&self,
table: TableId,
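
A hedged sketch tying the two halves together, mirroring RowDistInserter::split in the frontend hunk above: split one row-based insert by the table's partition rule, then group each regional split under its region leader:

let splits = partition_manager
    .split_row_insert_request(table_id, req)
    .await?;
for (region_number, insert) in splits {
    // Resolve region_number to its leader Peer via the table route,
    // then push `insert` into that peer's RowInsertRequests batch.
}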


@@ -0,0 +1,322 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::helper;
use api::v1::{ColumnSchema, Row, RowInsertRequest, Rows};
use datatypes::value::Value;
use store_api::storage::RegionNumber;
use crate::error::Result;
use crate::PartitionRuleRef;
pub type RowInsertRequestSplits = HashMap<RegionNumber, RowInsertRequest>;
pub struct RowSplitter {
partition_rule: PartitionRuleRef,
}
impl RowSplitter {
pub fn new(partition_rule: PartitionRuleRef) -> Self {
Self { partition_rule }
}
pub fn split(&self, req: RowInsertRequest) -> Result<RowInsertRequestSplits> {
// No partition
let partition_columns = self.partition_rule.partition_columns();
if partition_columns.is_empty() {
return Ok(HashMap::from([(0, req)]));
}
// No data
let Some(rows) = req.rows else {
return Ok(HashMap::new());
};
SplitReadRowHelper::new(req.table_name, rows, &self.partition_rule).split_to_requests()
}
}
struct SplitReadRowHelper<'a> {
table_name: String,
schema: Vec<ColumnSchema>,
rows: Vec<Row>,
partition_rule: &'a PartitionRuleRef,
// Index of each partition column in the schema/row; `None` if that column is absent.
partition_cols_indexes: Vec<Option<usize>>,
}
impl<'a> SplitReadRowHelper<'a> {
fn new(table_name: String, rows: Rows, partition_rule: &'a PartitionRuleRef) -> Self {
let col_name_to_idx = rows
.schema
.iter()
.enumerate()
.map(|(idx, col)| (&col.column_name, idx))
.collect::<HashMap<_, _>>();
let partition_cols = partition_rule.partition_columns();
let partition_cols_indexes = partition_cols
.into_iter()
.map(|col_name| col_name_to_idx.get(&col_name).cloned())
.collect::<Vec<_>>();
Self {
table_name,
schema: rows.schema,
rows: rows.rows,
partition_rule,
partition_cols_indexes,
}
}
fn split_to_requests(mut self) -> Result<RowInsertRequestSplits> {
let request_splits = self
.split_to_regions()?
.into_iter()
.map(|(region_number, row_indexes)| {
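// Each row index appears in exactly one region bucket, so mem::take
// moves every row out at most once; the vacated slot holds a default
// Row that is never read again.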
let rows = row_indexes
.into_iter()
.map(|row_idx| std::mem::take(&mut self.rows[row_idx]))
.collect();
let req = RowInsertRequest {
table_name: self.table_name.clone(),
rows: Some(Rows {
schema: self.schema.clone(),
rows,
}),
region_number,
};
(region_number, req)
})
.collect::<HashMap<_, _>>();
Ok(request_splits)
}
fn split_to_regions(&self) -> Result<HashMap<RegionNumber, Vec<usize>>> {
let mut regions_row_indexes: HashMap<RegionNumber, Vec<usize>> = HashMap::new();
for (row_idx, values) in self.iter_partition_values().enumerate() {
let region_number = self.partition_rule.find_region(&values)?;
regions_row_indexes
.entry(region_number)
.or_default()
.push(row_idx);
}
Ok(regions_row_indexes)
}
fn iter_partition_values(&'a self) -> impl Iterator<Item = Vec<Value>> + 'a {
self.rows.iter().map(|row| {
self.partition_cols_indexes
.iter()
.map(|idx| {
idx.as_ref().map_or(Value::Null, |idx| {
helper::pb_value_to_value_ref(&row.values[*idx]).into()
})
})
.collect()
})
}
}
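// Worked illustration of the NULL-filling behavior above (not part of the
// committed code): if the partition column is absent from the schema, every
// row yields `vec![Value::Null]`, so a rule like MockMissedColPartitionRule
// in the tests below can still route all rows to a single region instead of
// failing on the missing column.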
#[cfg(test)]
mod tests {
use std::any::Any;
use std::sync::Arc;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, SemanticType};
use serde::{Deserialize, Serialize};
use super::*;
use crate::partition::PartitionExpr;
use crate::PartitionRule;
fn mock_insert_request() -> RowInsertRequest {
let schema = vec![
ColumnSchema {
column_name: "id".to_string(),
datatype: ColumnDataType::String as i32,
semantic_type: SemanticType::Tag as i32,
},
ColumnSchema {
column_name: "name".to_string(),
datatype: ColumnDataType::String as i32,
semantic_type: SemanticType::Tag as i32,
},
ColumnSchema {
column_name: "age".to_string(),
datatype: ColumnDataType::Uint32 as i32,
semantic_type: SemanticType::Field as i32,
},
];
let rows = vec![
Row {
values: vec![
ValueData::StringValue("1".to_string()).into(),
ValueData::StringValue("Smith".to_string()).into(),
ValueData::U32Value(20).into(),
],
},
Row {
values: vec![
ValueData::StringValue("2".to_string()).into(),
ValueData::StringValue("Johnson".to_string()).into(),
ValueData::U32Value(21).into(),
],
},
Row {
values: vec![
ValueData::StringValue("3".to_string()).into(),
ValueData::StringValue("Williams".to_string()).into(),
ValueData::U32Value(22).into(),
],
},
];
RowInsertRequest {
table_name: "t".to_string(),
rows: Some(Rows { schema, rows }),
region_number: 0,
}
}
#[derive(Debug, Serialize, Deserialize)]
struct MockPartitionRule;
impl PartitionRule for MockPartitionRule {
fn as_any(&self) -> &dyn Any {
self
}
fn partition_columns(&self) -> Vec<String> {
vec!["id".to_string()]
}
fn find_region(&self, values: &[Value]) -> Result<RegionNumber> {
let val = values.get(0).unwrap().clone();
let val = match val {
Value::String(v) => v.as_utf8().to_string(),
_ => unreachable!(),
};
Ok(val.parse::<u32>().unwrap() % 2)
}
fn find_regions_by_exprs(&self, _: &[PartitionExpr]) -> Result<Vec<RegionNumber>> {
unimplemented!()
}
}
#[derive(Debug, Serialize, Deserialize)]
struct MockMissedColPartitionRule;
impl PartitionRule for MockMissedColPartitionRule {
fn as_any(&self) -> &dyn Any {
self
}
fn partition_columns(&self) -> Vec<String> {
vec!["missed_col".to_string()]
}
fn find_region(&self, values: &[Value]) -> Result<RegionNumber> {
let val = values.get(0).unwrap().clone();
let val = match val {
Value::Null => 1,
_ => 0,
};
Ok(val)
}
fn find_regions_by_exprs(&self, _: &[PartitionExpr]) -> Result<Vec<RegionNumber>> {
unimplemented!()
}
}
#[derive(Debug, Serialize, Deserialize)]
struct EmptyPartitionRule;
impl PartitionRule for EmptyPartitionRule {
fn as_any(&self) -> &dyn Any {
self
}
fn partition_columns(&self) -> Vec<String> {
vec![]
}
fn find_region(&self, _values: &[Value]) -> Result<RegionNumber> {
Ok(0)
}
fn find_regions_by_exprs(&self, _: &[PartitionExpr]) -> Result<Vec<RegionNumber>> {
unimplemented!()
}
}
#[test]
fn test_writer_splitter() {
let insert_request = mock_insert_request();
let rule = Arc::new(MockPartitionRule) as PartitionRuleRef;
let splitter = RowSplitter::new(rule);
let splits = splitter.split(insert_request).unwrap();
assert_eq!(splits.len(), 2);
let req0 = &splits[&0];
let req1 = &splits[&1];
assert_eq!(req0.region_number, 0);
assert_eq!(req1.region_number, 1);
let rows0 = req0.rows.as_ref().unwrap();
let rows1 = req1.rows.as_ref().unwrap();
assert_eq!(rows0.rows.len(), 1);
assert_eq!(rows1.rows.len(), 2);
}
#[test]
fn test_missed_col_writer_splitter() {
let insert_request = mock_insert_request();
let rule = Arc::new(MockMissedColPartitionRule) as PartitionRuleRef;
let splitter = RowSplitter::new(rule);
let splits = splitter.split(insert_request).unwrap();
assert_eq!(splits.len(), 1);
let req = &splits[&1];
assert_eq!(req.region_number, 1);
let rows = req.rows.as_ref().unwrap();
assert_eq!(rows.rows.len(), 3);
}
#[test]
fn test_empty_partition_rule_writer_splitter() {
let insert_request = mock_insert_request();
let rule = Arc::new(EmptyPartitionRule) as PartitionRuleRef;
let splitter = RowSplitter::new(rule);
let splits = splitter.split(insert_request).unwrap();
assert_eq!(splits.len(), 1);
let req = &splits[&0];
assert_eq!(req.region_number, 0);
let rows = req.rows.as_ref().unwrap();
assert_eq!(rows.rows.len(), 3);
}
}


@@ -234,7 +234,7 @@ fn parse_tags<'a>(
datatype: ColumnDataType::String as i32,
semantic_type: SemanticType::Tag as i32,
});
-one_row.push(to_value(ValueData::StringValue(v.to_string())));
+one_row.push(ValueData::StringValue(v.to_string()).into());
} else {
check_schema(ColumnDataType::String, SemanticType::Tag, &schema[*index])?;
one_row[*index].value_data = Some(ValueData::StringValue(v.to_string()));
@@ -269,7 +269,7 @@ fn parse_fields<'a>(
datatype: datatype as i32,
semantic_type: SemanticType::Field as i32,
});
-one_row.push(to_value(value));
+one_row.push(value.into());
} else {
check_schema(datatype, SemanticType::Field, &schema[*index])?;
one_row[*index].value_data = Some(value);
@@ -309,7 +309,7 @@ fn parse_ts(
datatype: ColumnDataType::TimestampMillisecond as i32,
semantic_type: SemanticType::Timestamp as i32,
});
-one_row.push(to_value(ValueData::TsMillisecondValue(ts)))
+one_row.push(ValueData::TsMillisecondValue(ts).into())
} else {
check_schema(
ColumnDataType::TimestampMillisecond,
@@ -351,14 +351,6 @@ fn check_schema(
Ok(())
}
-// TODO(jeremy): impl From<ValueData> for Value
-#[inline]
-fn to_value(value: ValueData) -> Value {
-Value {
-value_data: Some(value),
-}
-}
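// The deleted helper above became redundant: the greptime-proto rev bumped in
// this commit evidently ships the conversion the TODO asked for, presumably
// something equivalent to:
//
//     impl From<ValueData> for Value {
//         fn from(value_data: ValueData) -> Self {
//             Value { value_data: Some(value_data) }
//         }
//     }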
#[inline]
fn unwrap_or_default_precision(precision: Option<Precision>) -> Precision {
if let Some(val) = precision {