From 313121f2ae0c5d92a1204e8da69355d7b890cb80 Mon Sep 17 00:00:00 2001
From: fys <40801205+Fengys123@users.noreply.github.com>
Date: Tue, 27 Jun 2023 16:57:03 +0800
Subject: [PATCH] fix: block when stream insert (#1835)

* fix: stream insert blocking

* fix: example link

* chore: Increase the default channel size "1024" -> "65536"
---
 Cargo.lock                           |   1 +
 src/client/Cargo.toml                |   1 +
 src/client/examples/stream_ingest.rs | 183 +++++++++++++++++++++++++++
 src/client/src/database.rs           |  44 +++----
 src/client/src/lib.rs                |   2 +
 src/client/src/stream_insert.rs      | 115 +++++++++++++++++
 6 files changed, 319 insertions(+), 27 deletions(-)
 create mode 100644 src/client/examples/stream_ingest.rs
 create mode 100644 src/client/src/stream_insert.rs

diff --git a/Cargo.lock b/Cargo.lock
index fe703d366e..a7ae3e45c0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1527,6 +1527,7 @@ dependencies = [
  "datafusion",
  "datanode",
  "datatypes",
+ "derive-new",
  "enum_dispatch",
  "futures-util",
  "moka 0.9.7",
diff --git a/src/client/Cargo.toml b/src/client/Cargo.toml
index 1a937a947d..5db0a910ef 100644
--- a/src/client/Cargo.toml
+++ b/src/client/Cargo.toml
@@ -36,6 +36,7 @@ tonic.workspace = true
 
 [dev-dependencies]
 datanode = { path = "../datanode" }
+derive-new = "0.5"
 substrait = { path = "../common/substrait" }
 tracing = "0.1"
 tracing-subscriber = { version = "0.3", features = ["env-filter"] }
diff --git a/src/client/examples/stream_ingest.rs b/src/client/examples/stream_ingest.rs
new file mode 100644
index 0000000000..0a1fb84b56
--- /dev/null
+++ b/src/client/examples/stream_ingest.rs
@@ -0,0 +1,183 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use api::v1::column::*;
+use api::v1::*;
+use client::{Client, Database, DEFAULT_SCHEMA_NAME};
+use derive_new::new;
+use tracing::{error, info};
+
+fn main() {
+    tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::builder().finish())
+        .unwrap();
+
+    run();
+}
+
+#[tokio::main]
+async fn run() {
+    let greptimedb_endpoint =
+        std::env::var("GREPTIMEDB_ENDPOINT").unwrap_or_else(|_| "localhost:4001".to_owned());
+
+    let greptimedb_dbname =
+        std::env::var("GREPTIMEDB_DBNAME").unwrap_or_else(|_| DEFAULT_SCHEMA_NAME.to_owned());
+
+    let grpc_client = Client::with_urls(vec![&greptimedb_endpoint]);
+
+    let client = Database::new_with_dbname(greptimedb_dbname, grpc_client);
+
+    let stream_inserter = client.streaming_inserter().unwrap();
+
+    if let Err(e) = stream_inserter
+        .insert(vec![to_insert_request(weather_records_1())])
+        .await
+    {
+        error!("Error: {e}");
+    }
+
+    if let Err(e) = stream_inserter
+        .insert(vec![to_insert_request(weather_records_2())])
+        .await
+    {
+        error!("Error: {e}");
+    }
+
+    let result = stream_inserter.finish().await;
+
+    match result {
+        Ok(rows) => {
+            info!("Rows written: {rows}");
+        }
+        Err(e) => {
+            error!("Error: {e}");
+        }
+    };
+}
+
+#[derive(new)]
+struct WeatherRecord {
+    timestamp_millis: i64,
+    collector: String,
+    temperature: f32,
+    humidity: i32,
+}
+
+fn weather_records_1() -> Vec<WeatherRecord> {
+    vec![
+        WeatherRecord::new(1686109527000, "c1".to_owned(), 26.4, 15),
+        WeatherRecord::new(1686023127000, "c1".to_owned(), 29.3, 20),
+        WeatherRecord::new(1685936727000, "c1".to_owned(), 31.8, 13),
+        WeatherRecord::new(1686109527000, "c2".to_owned(), 20.4, 67),
+        WeatherRecord::new(1686023127000, "c2".to_owned(), 18.0, 74),
+        WeatherRecord::new(1685936727000, "c2".to_owned(), 19.2, 81),
+    ]
+}
+
+fn weather_records_2() -> Vec<WeatherRecord> {
+    vec![
+        WeatherRecord::new(1686109527001, "c3".to_owned(), 26.4, 15),
+        WeatherRecord::new(1686023127002, "c3".to_owned(), 29.3, 20),
+        WeatherRecord::new(1685936727003, "c3".to_owned(), 31.8, 13),
+        WeatherRecord::new(1686109527004, "c4".to_owned(), 20.4, 67),
+        WeatherRecord::new(1686023127005, "c4".to_owned(), 18.0, 74),
+        WeatherRecord::new(1685936727006, "c4".to_owned(), 19.2, 81),
+    ]
+}
+
+/// This function generates some sample data and bundles it into an
+/// `InsertRequest`.
+///
+/// Data structure:
+///
+/// - `ts`: a timestamp column
+/// - `collector`: a tag column
+/// - `temperature`: a value field of f32
+/// - `humidity`: a value field of i32
+///
+fn to_insert_request(records: Vec<WeatherRecord>) -> InsertRequest {
+    // convert records into columns
+    let rows = records.len();
+
+    // transpose records into columns
+    let (timestamp_millis, collectors, temp, humidity) = records.into_iter().fold(
+        (
+            Vec::with_capacity(rows),
+            Vec::with_capacity(rows),
+            Vec::with_capacity(rows),
+            Vec::with_capacity(rows),
+        ),
+        |mut acc, rec| {
+            acc.0.push(rec.timestamp_millis);
+            acc.1.push(rec.collector);
+            acc.2.push(rec.temperature);
+            acc.3.push(rec.humidity);
+
+            acc
+        },
+    );
+
+    let columns = vec![
+        // timestamp column: `ts`
+        Column {
+            column_name: "ts".to_owned(),
+            values: Some(column::Values {
+                ts_millisecond_values: timestamp_millis,
+                ..Default::default()
+            }),
+            semantic_type: SemanticType::Timestamp as i32,
+            datatype: ColumnDataType::TimestampMillisecond as i32,
+            ..Default::default()
+        },
+        // tag column: collectors
+        Column {
+            column_name: "collector".to_owned(),
+            values: Some(column::Values {
+                string_values: collectors.into_iter().collect(),
+                ..Default::default()
+            }),
+            semantic_type: SemanticType::Tag as i32,
+            datatype: ColumnDataType::String as i32,
+            ..Default::default()
+        },
+        // field column: temperature
+        Column {
+            column_name: "temperature".to_owned(),
+            values: Some(column::Values {
+                f32_values: temp,
+                ..Default::default()
+            }),
+            semantic_type: SemanticType::Field as i32,
+            datatype: ColumnDataType::Float32 as i32,
+            ..Default::default()
+        },
+        // field column: humidity
+        Column {
+            column_name: "humidity".to_owned(),
+            values: Some(column::Values {
+                i32_values: humidity,
+                ..Default::default()
+            }),
+            semantic_type: SemanticType::Field as i32,
+            datatype: ColumnDataType::Int32 as i32,
+            ..Default::default()
+        },
+    ];
+
+    InsertRequest {
+        table_name: "weather_demo".to_owned(),
+        columns,
+        row_count: rows as u32,
+        ..Default::default()
+    }
+}
diff --git a/src/client/src/database.rs b/src/client/src/database.rs
index b3688fb9d1..b48e06becb 100644
--- a/src/client/src/database.rs
+++ b/src/client/src/database.rs
@@ -29,14 +29,11 @@ use common_telemetry::{logging, timer};
 use futures_util::{TryFutureExt, TryStreamExt};
 use prost::Message;
 use snafu::{ensure, ResultExt};
-use tokio::sync::mpsc::Sender;
-use tokio::sync::{mpsc, OnceCell};
-use tokio_stream::wrappers::ReceiverStream;
 
 use crate::error::{
     ConvertFlightDataSnafu, IllegalDatabaseResponseSnafu, IllegalFlightMessagesSnafu,
 };
-use crate::{error, metrics, Client, Result};
+use crate::{error, metrics, Client, Result, StreamInserter};
 
 #[derive(Clone, Debug, Default)]
 pub struct Database {
@@ -50,7 +47,6 @@ pub struct Database {
     dbname: String,
 
     client: Client,
-    streaming_client: OnceCell<Sender<GreptimeRequest>>,
     ctx: FlightContext,
 }
 
@@ -62,7 +58,6 @@ impl Database {
             schema: schema.into(),
             dbname: "".to_string(),
             client,
-            streaming_client: OnceCell::new(),
             ctx: FlightContext::default(),
         }
     }
@@ -80,7 +75,6 @@ impl Database {
             schema: "".to_string(),
             dbname: dbname.into(),
             client,
-            streaming_client: OnceCell::new(),
             ctx: FlightContext::default(),
         }
     }
@@ -120,20 +114,24 @@ impl Database {
         self.handle(Request::Inserts(requests)).await
     }
 
-    pub async fn insert_to_stream(&self, requests: InsertRequests) -> Result<()> {
-        let streaming_client = self
-            .streaming_client
-            .get_or_try_init(|| self.client_stream())
-            .await?;
+    pub fn streaming_inserter(&self) -> Result<StreamInserter> {
+        self.streaming_inserter_with_channel_size(65536)
+    }
 
-        let request = self.to_rpc_request(Request::Inserts(requests));
+    pub fn streaming_inserter_with_channel_size(
+        &self,
+        channel_size: usize,
+    ) -> Result<StreamInserter> {
+        let client = self.client.make_database_client()?.inner;
 
-        streaming_client.send(request).await.map_err(|e| {
-            error::ClientStreamingSnafu {
-                err_msg: e.to_string(),
-            }
-            .build()
-        })
+        let stream_inserter = StreamInserter::new(
+            client,
+            self.dbname().to_string(),
+            self.ctx.auth_header.clone(),
+            channel_size,
+        );
+
+        Ok(stream_inserter)
     }
 
     pub async fn delete(&self, request: DeleteRequest) -> Result<u32> {
@@ -169,14 +167,6 @@
         }
     }
 
-    async fn client_stream(&self) -> Result<Sender<GreptimeRequest>> {
-        let mut client = self.client.make_database_client()?.inner;
-        let (sender, receiver) = mpsc::channel::<GreptimeRequest>(65536);
-        let receiver = ReceiverStream::new(receiver);
-        let _ = client.handle_requests(receiver).await?;
-        Ok(sender)
-    }
-
     pub async fn sql(&self, sql: &str) -> Result<Output> {
         let _timer = timer!(metrics::METRIC_GRPC_SQL);
         self.do_get(Request::Query(QueryRequest {
diff --git a/src/client/src/lib.rs b/src/client/src/lib.rs
index c12d952156..3c1ae0a514 100644
--- a/src/client/src/lib.rs
+++ b/src/client/src/lib.rs
@@ -18,6 +18,7 @@ mod database;
 mod error;
 pub mod load_balance;
 mod metrics;
+mod stream_insert;
 
 pub use api;
 pub use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
@@ -25,3 +26,4 @@ pub use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 pub use self::client::Client;
 pub use self::database::Database;
 pub use self::error::{Error, Result};
+pub use self::stream_insert::StreamInserter;
diff --git a/src/client/src/stream_insert.rs b/src/client/src/stream_insert.rs
new file mode 100644
index 0000000000..943f9a0ad7
--- /dev/null
+++ b/src/client/src/stream_insert.rs
@@ -0,0 +1,115 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use api::v1::greptime_database_client::GreptimeDatabaseClient;
+use api::v1::greptime_request::Request;
+use api::v1::{
+    greptime_response, AffectedRows, AuthHeader, GreptimeRequest, GreptimeResponse, InsertRequest,
+    InsertRequests, RequestHeader,
+};
+use snafu::OptionExt;
+use tokio::sync::mpsc;
+use tokio::task::JoinHandle;
+use tokio_stream::wrappers::ReceiverStream;
+use tonic::transport::Channel;
+use tonic::{Response, Status};
+
+use crate::error::{self, IllegalDatabaseResponseSnafu, Result};
+
+/// A structure that provides some methods for streaming data insertion.
+///
+/// [`StreamInserter`] cannot be constructed directly via `StreamInserter::new`
+/// (it is crate-private). You can obtain a [`StreamInserter`] in the following way:
+///
+/// ```ignore
+/// let grpc_client = Client::with_urls(vec!["127.0.0.1:4002"]);
+/// let client = Database::new_with_dbname("db_name", grpc_client);
+/// let stream_inserter = client.streaming_inserter().unwrap();
+/// ```
+///
+/// For a complete usage example, see
+/// [stream_ingest.rs](https://github.com/GreptimeTeam/greptimedb/blob/develop/src/client/examples/stream_ingest.rs).
+pub struct StreamInserter {
+    sender: mpsc::Sender<GreptimeRequest>,
+
+    auth_header: Option<AuthHeader>,
+
+    dbname: String,
+
+    join: JoinHandle<std::result::Result<Response<GreptimeResponse>, Status>>,
+}
+
+impl StreamInserter {
+    pub(crate) fn new(
+        mut client: GreptimeDatabaseClient<Channel>,
+        dbname: String,
+        auth_header: Option<AuthHeader>,
+        channel_size: usize,
+    ) -> StreamInserter {
+        let (send, recv) = tokio::sync::mpsc::channel(channel_size);
+
+        let join: JoinHandle<std::result::Result<Response<GreptimeResponse>, Status>> =
+            tokio::spawn(async move {
+                let recv_stream = ReceiverStream::new(recv);
+                client.handle_requests(recv_stream).await
+            });
+
+        StreamInserter {
+            sender: send,
+            auth_header,
+            dbname,
+            join,
+        }
+    }
+
+    pub async fn insert(&self, requests: Vec<InsertRequest>) -> Result<()> {
+        let inserts = InsertRequests { inserts: requests };
+        let request = self.to_rpc_request(Request::Inserts(inserts));
+
+        self.sender.send(request).await.map_err(|e| {
+            error::ClientStreamingSnafu {
+                err_msg: e.to_string(),
+            }
+            .build()
+        })
+    }
+
+    pub async fn finish(self) -> Result<u32> {
+        drop(self.sender);
+
+        let response = self.join.await.unwrap()?;
+
+        let response = response
+            .into_inner()
+            .response
+            .context(IllegalDatabaseResponseSnafu {
+                err_msg: "GreptimeResponse is empty",
+            })?;
+
+        let greptime_response::Response::AffectedRows(AffectedRows { value }) = response;
+
+        Ok(value)
+    }
+
+    fn to_rpc_request(&self, request: Request) -> GreptimeRequest {
+        GreptimeRequest {
+            header: Some(RequestHeader {
+                authorization: self.auth_header.clone(),
+                dbname: self.dbname.clone(),
+                ..Default::default()
+            }),
+            request: Some(request),
+        }
+    }
+}
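For reference, the shape of the fix: the removed `client_stream()` awaited `handle_requests` inline, and that future only resolves after the request stream is closed, so the call blocked. `StreamInserter` instead spawns that await on a background task, pushes each `GreptimeRequest` through a bounded mpsc channel wrapped in a `ReceiverStream`, and `finish()` drops the sender so the stream terminates and the task's `JoinHandle` yields the final response. Below is a minimal, self-contained sketch of that channel-plus-background-task pattern; it is not GreptimeDB code, assumes only the `tokio` and `tokio-stream` crates, and uses `String` items plus a counting task as stand-ins for `GreptimeRequest` and the gRPC call.

```rust
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    // Bounded channel: when it fills up, `send().await` yields to the runtime
    // and applies backpressure instead of blocking it.
    let (sender, receiver) = mpsc::channel::<String>(4);

    // Stand-in for `client.handle_requests(recv_stream)`: consume the stream
    // until the sending side is dropped, then report what was processed.
    let join: JoinHandle<usize> = tokio::spawn(async move {
        let mut stream = ReceiverStream::new(receiver);
        let mut processed = 0;
        while let Some(_request) = stream.next().await {
            processed += 1;
        }
        processed
    });

    // Analogous to `StreamInserter::insert`: enqueue requests onto the stream.
    for i in 0..10 {
        sender.send(format!("request-{i}")).await.expect("receiver alive");
    }

    // Analogous to `StreamInserter::finish`: dropping the sender closes the
    // channel, the forwarded stream terminates, and the background task ends.
    drop(sender);
    let processed = join.await.expect("task panicked");
    assert_eq!(processed, 10);
}
```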