diff --git a/src/client/src/database.rs b/src/client/src/database.rs index 0da28fd5d7..a6e1b5e5be 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -65,6 +65,24 @@ impl Database { self.object(expr).await?.try_into() } + pub async fn batch_insert(&self, insert_exprs: Vec) -> Result> { + let header = ExprHeader { + version: PROTOCOL_VERSION, + }; + let obj_exprs = insert_exprs + .into_iter() + .map(|expr| ObjectExpr { + header: Some(header.clone()), + expr: Some(object_expr::Expr::Insert(expr)), + }) + .collect(); + self.objects(obj_exprs) + .await? + .into_iter() + .map(|result| result.try_into()) + .collect() + } + pub async fn select(&self, expr: Select) -> Result { let select_expr = match expr { Select::Sql(sql) => SelectExpr { diff --git a/src/frontend/src/instance/influxdb.rs b/src/frontend/src/instance/influxdb.rs index a0832c3727..74ad15f98c 100644 --- a/src/frontend/src/instance/influxdb.rs +++ b/src/frontend/src/instance/influxdb.rs @@ -1,21 +1,18 @@ -use api::v1::{insert_expr::Expr, InsertExpr}; +use api::v1::InsertExpr; use async_trait::async_trait; use common_error::prelude::BoxedError; use servers::influxdb::InfluxdbRequest; -use servers::{ - error::ExecuteQuerySnafu, influxdb::InsertBatches, query_handler::InfluxdbLineProtocolHandler, -}; +use servers::{error::ExecuteQuerySnafu, query_handler::InfluxdbLineProtocolHandler}; use snafu::ResultExt; -use crate::error::RequestDatanodeSnafu; -use crate::error::Result; use crate::instance::Instance; #[async_trait] impl InfluxdbLineProtocolHandler for Instance { async fn exec(&self, request: &InfluxdbRequest) -> servers::error::Result<()> { - // TODO(fys): use batch insert - self.do_insert(request.try_into()?) + let exprs: Vec = request.try_into()?; + self.db + .batch_insert(exprs) .await .map_err(BoxedError::new) .context(ExecuteQuerySnafu { @@ -24,22 +21,3 @@ impl InfluxdbLineProtocolHandler for Instance { Ok(()) } } - -impl Instance { - async fn do_insert(&self, insert_batches: InsertBatches) -> Result<()> { - for (table_name, batch) in insert_batches.data { - let expr = Expr::Values(api::v1::insert_expr::Values { - values: vec![batch.into()], - }); - let _object_result = self - .db - .insert(InsertExpr { - table_name, - expr: Some(expr), - }) - .await - .context(RequestDatanodeSnafu)?; - } - Ok(()) - } -} diff --git a/src/servers/src/influxdb.rs b/src/servers/src/influxdb.rs index e25e0b214e..ffac778f9a 100644 --- a/src/servers/src/influxdb.rs +++ b/src/servers/src/influxdb.rs @@ -1,5 +1,9 @@ use std::collections::HashMap; +use api::v1::{ + insert_expr::{self, Expr}, + InsertExpr, +}; use common_grpc::writer::{LinesWriter, Precision}; use influxdb_line_protocol::{parse_lines, FieldValue}; use snafu::ResultExt; @@ -16,14 +20,10 @@ pub struct InfluxdbRequest { type TableName = String; -pub struct InsertBatches { - pub data: Vec<(TableName, api::v1::codec::InsertBatch)>, -} - -impl TryFrom<&InfluxdbRequest> for InsertBatches { +impl TryFrom<&InfluxdbRequest> for Vec { type Error = Error; - fn try_from(value: &InfluxdbRequest) -> std::result::Result { + fn try_from(value: &InfluxdbRequest) -> Result { let mut writers: HashMap = HashMap::new(); let lines = parse_lines(&value.lines) .collect::>>() @@ -91,25 +91,30 @@ impl TryFrom<&InfluxdbRequest> for InsertBatches { writer.commit(); } - Ok(InsertBatches { - data: writers - .into_iter() - .map(|(table_name, writer)| (table_name, writer.finish())) - .collect(), - }) + Ok(writers + .into_iter() + .map(|(table_name, writer)| InsertExpr { + table_name, + expr: Some(Expr::Values(insert_expr::Values { + values: vec![writer.finish().into()], + })), + }) + .collect()) } } #[cfg(test)] mod tests { + use std::ops::Deref; + use api::v1::{ codec::InsertBatch, column::{SemanticType, Values}, - Column, ColumnDataType, + insert_expr::Expr, + Column, ColumnDataType, InsertExpr, }; use common_base::BitVec; - use super::InsertBatches; use crate::influxdb::InfluxdbRequest; #[test] @@ -125,18 +130,21 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; lines: lines.to_string(), }; - let insert_batches: InsertBatches = influxdb_req.try_into().unwrap(); - let insert_batches = insert_batches.data; + let insert_exprs: Vec = influxdb_req.try_into().unwrap(); - assert_eq!(2, insert_batches.len()); + assert_eq!(2, insert_exprs.len()); - for (table_name, insert_batch) in &insert_batches { - if table_name == "monitor1" { - assert_monitor_1(insert_batch); - } else if table_name == "monitor2" { - assert_monitor_2(insert_batch); - } else { - panic!() + for expr in insert_exprs { + let values = match expr.expr.unwrap() { + Expr::Values(vals) => vals, + Expr::Sql(_) => panic!(), + }; + let raw_batch = values.values.get(0).unwrap(); + let batch: InsertBatch = raw_batch.deref().try_into().unwrap(); + match &expr.table_name[..] { + "monitor1" => assert_monitor_1(&batch), + "monitor2" => assert_monitor_2(&batch), + _ => panic!(), } } } diff --git a/src/servers/tests/http/influxdb_test.rs b/src/servers/tests/http/influxdb_test.rs index 662dbafa90..fe75c5bae5 100644 --- a/src/servers/tests/http/influxdb_test.rs +++ b/src/servers/tests/http/influxdb_test.rs @@ -1,12 +1,13 @@ use std::sync::Arc; +use api::v1::InsertExpr; use async_trait::async_trait; use axum::Router; use axum_test_helper::TestClient; use common_query::Output; use servers::error::Result; use servers::http::HttpServer; -use servers::influxdb::{InfluxdbRequest, InsertBatches}; +use servers::influxdb::InfluxdbRequest; use servers::query_handler::{InfluxdbLineProtocolHandler, SqlQueryHandler}; use tokio::sync::mpsc; @@ -17,10 +18,10 @@ struct DummyInstance { #[async_trait] impl InfluxdbLineProtocolHandler for DummyInstance { async fn exec(&self, request: &InfluxdbRequest) -> Result<()> { - let batches: InsertBatches = request.try_into()?; + let exprs: Vec = request.try_into()?; - for (table_name, _) in batches.data { - let _ = self.tx.send(table_name).await; + for expr in exprs { + let _ = self.tx.send(expr.table_name).await; } Ok(())