feat: batch grpc insert for influxdb write (#295)

This commit is contained in:
fys
2022-10-09 10:49:27 +08:00
committed by GitHub
parent 2e1ab050a7
commit 752be8dc41
4 changed files with 60 additions and 55 deletions

View File

@@ -65,6 +65,24 @@ impl Database {
self.object(expr).await?.try_into()
}
pub async fn batch_insert(&self, insert_exprs: Vec<InsertExpr>) -> Result<Vec<ObjectResult>> {
let header = ExprHeader {
version: PROTOCOL_VERSION,
};
let obj_exprs = insert_exprs
.into_iter()
.map(|expr| ObjectExpr {
header: Some(header.clone()),
expr: Some(object_expr::Expr::Insert(expr)),
})
.collect();
self.objects(obj_exprs)
.await?
.into_iter()
.map(|result| result.try_into())
.collect()
}
pub async fn select(&self, expr: Select) -> Result<ObjectResult> {
let select_expr = match expr {
Select::Sql(sql) => SelectExpr {

View File

@@ -1,21 +1,18 @@
use api::v1::{insert_expr::Expr, InsertExpr};
use api::v1::InsertExpr;
use async_trait::async_trait;
use common_error::prelude::BoxedError;
use servers::influxdb::InfluxdbRequest;
use servers::{
error::ExecuteQuerySnafu, influxdb::InsertBatches, query_handler::InfluxdbLineProtocolHandler,
};
use servers::{error::ExecuteQuerySnafu, query_handler::InfluxdbLineProtocolHandler};
use snafu::ResultExt;
use crate::error::RequestDatanodeSnafu;
use crate::error::Result;
use crate::instance::Instance;
#[async_trait]
impl InfluxdbLineProtocolHandler for Instance {
async fn exec(&self, request: &InfluxdbRequest) -> servers::error::Result<()> {
// TODO(fys): use batch insert
self.do_insert(request.try_into()?)
let exprs: Vec<InsertExpr> = request.try_into()?;
self.db
.batch_insert(exprs)
.await
.map_err(BoxedError::new)
.context(ExecuteQuerySnafu {
@@ -24,22 +21,3 @@ impl InfluxdbLineProtocolHandler for Instance {
Ok(())
}
}
impl Instance {
async fn do_insert(&self, insert_batches: InsertBatches) -> Result<()> {
for (table_name, batch) in insert_batches.data {
let expr = Expr::Values(api::v1::insert_expr::Values {
values: vec![batch.into()],
});
let _object_result = self
.db
.insert(InsertExpr {
table_name,
expr: Some(expr),
})
.await
.context(RequestDatanodeSnafu)?;
}
Ok(())
}
}

View File

@@ -1,5 +1,9 @@
use std::collections::HashMap;
use api::v1::{
insert_expr::{self, Expr},
InsertExpr,
};
use common_grpc::writer::{LinesWriter, Precision};
use influxdb_line_protocol::{parse_lines, FieldValue};
use snafu::ResultExt;
@@ -16,14 +20,10 @@ pub struct InfluxdbRequest {
type TableName = String;
pub struct InsertBatches {
pub data: Vec<(TableName, api::v1::codec::InsertBatch)>,
}
impl TryFrom<&InfluxdbRequest> for InsertBatches {
impl TryFrom<&InfluxdbRequest> for Vec<InsertExpr> {
type Error = Error;
fn try_from(value: &InfluxdbRequest) -> std::result::Result<Self, Self::Error> {
fn try_from(value: &InfluxdbRequest) -> Result<Self, Self::Error> {
let mut writers: HashMap<TableName, LinesWriter> = HashMap::new();
let lines = parse_lines(&value.lines)
.collect::<influxdb_line_protocol::Result<Vec<_>>>()
@@ -91,25 +91,30 @@ impl TryFrom<&InfluxdbRequest> for InsertBatches {
writer.commit();
}
Ok(InsertBatches {
data: writers
.into_iter()
.map(|(table_name, writer)| (table_name, writer.finish()))
.collect(),
})
Ok(writers
.into_iter()
.map(|(table_name, writer)| InsertExpr {
table_name,
expr: Some(Expr::Values(insert_expr::Values {
values: vec![writer.finish().into()],
})),
})
.collect())
}
}
#[cfg(test)]
mod tests {
use std::ops::Deref;
use api::v1::{
codec::InsertBatch,
column::{SemanticType, Values},
Column, ColumnDataType,
insert_expr::Expr,
Column, ColumnDataType, InsertExpr,
};
use common_base::BitVec;
use super::InsertBatches;
use crate::influxdb::InfluxdbRequest;
#[test]
@@ -125,18 +130,21 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
lines: lines.to_string(),
};
let insert_batches: InsertBatches = influxdb_req.try_into().unwrap();
let insert_batches = insert_batches.data;
let insert_exprs: Vec<InsertExpr> = influxdb_req.try_into().unwrap();
assert_eq!(2, insert_batches.len());
assert_eq!(2, insert_exprs.len());
for (table_name, insert_batch) in &insert_batches {
if table_name == "monitor1" {
assert_monitor_1(insert_batch);
} else if table_name == "monitor2" {
assert_monitor_2(insert_batch);
} else {
panic!()
for expr in insert_exprs {
let values = match expr.expr.unwrap() {
Expr::Values(vals) => vals,
Expr::Sql(_) => panic!(),
};
let raw_batch = values.values.get(0).unwrap();
let batch: InsertBatch = raw_batch.deref().try_into().unwrap();
match &expr.table_name[..] {
"monitor1" => assert_monitor_1(&batch),
"monitor2" => assert_monitor_2(&batch),
_ => panic!(),
}
}
}

View File

@@ -1,12 +1,13 @@
use std::sync::Arc;
use api::v1::InsertExpr;
use async_trait::async_trait;
use axum::Router;
use axum_test_helper::TestClient;
use common_query::Output;
use servers::error::Result;
use servers::http::HttpServer;
use servers::influxdb::{InfluxdbRequest, InsertBatches};
use servers::influxdb::InfluxdbRequest;
use servers::query_handler::{InfluxdbLineProtocolHandler, SqlQueryHandler};
use tokio::sync::mpsc;
@@ -17,10 +18,10 @@ struct DummyInstance {
#[async_trait]
impl InfluxdbLineProtocolHandler for DummyInstance {
async fn exec(&self, request: &InfluxdbRequest) -> Result<()> {
let batches: InsertBatches = request.try_into()?;
let exprs: Vec<InsertExpr> = request.try_into()?;
for (table_name, _) in batches.data {
let _ = self.tx.send(table_name).await;
for expr in exprs {
let _ = self.tx.send(expr.table_name).await;
}
Ok(())