feat: GRPC client stream interface for insertion (#1206)

* feat: GRPC client stream interface for insertion

* feat: GRPC client stream interface for insertion
This commit is contained in:
LFC
2023-03-20 18:45:37 +08:00
committed by GitHub
parent f6669a8201
commit ad886f5b3e
3 changed files with 37 additions and 8 deletions

2
Cargo.lock generated
View File

@@ -3084,7 +3084,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=0a7b790ed41364b5599dff806d1080bd59c5c9f6#0a7b790ed41364b5599dff806d1080bd59c5c9f6"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=dcf253314a05c9821bd90e7e09a168256c84f250#dcf253314a05c9821bd90e7e09a168256c84f250"
dependencies = [
"prost",
"tonic",

View File

@@ -10,7 +10,7 @@ common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-time = { path = "../common/time" }
datatypes = { path = "../datatypes" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0a7b790ed41364b5599dff806d1080bd59c5c9f6" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "dcf253314a05c9821bd90e7e09a168256c84f250" }
prost.workspace = true
snafu = { version = "0.7", features = ["backtraces"] }
tonic.workspace = true

View File

@@ -15,10 +15,12 @@
use std::sync::Arc;
use api::v1::greptime_database_server::GreptimeDatabase;
use api::v1::{greptime_response, AffectedRows, GreptimeRequest, GreptimeResponse};
use api::v1::greptime_response::Response as RawResponse;
use api::v1::{AffectedRows, GreptimeRequest, GreptimeResponse};
use async_trait::async_trait;
use common_query::Output;
use tonic::{Request, Response, Status};
use futures::StreamExt;
use tonic::{Request, Response, Status, Streaming};
use crate::grpc::handler::GreptimeRequestHandler;
use crate::grpc::TonicResult;
@@ -44,14 +46,41 @@ impl GreptimeDatabase for DatabaseService {
let response = match output {
Output::AffectedRows(rows) => GreptimeResponse {
header: None,
response: Some(greptime_response::Response::AffectedRows(AffectedRows {
value: rows as _,
})),
response: Some(RawResponse::AffectedRows(AffectedRows { value: rows as _ })),
},
Output::Stream(_) | Output::RecordBatches(_) => {
return Err(Status::unimplemented("GreptimeDatabase::handle for query"));
return Err(Status::unimplemented("GreptimeDatabase::Handle for query"));
}
};
Ok(Response::new(response))
}
async fn handle_requests(
&self,
request: Request<Streaming<GreptimeRequest>>,
) -> Result<Response<GreptimeResponse>, Status> {
let mut affected_rows = 0;
let mut stream = request.into_inner();
while let Some(request) = stream.next().await {
let request = request?;
let output = self.handler.handle_request(request).await?;
match output {
Output::AffectedRows(rows) => affected_rows += rows,
Output::Stream(_) | Output::RecordBatches(_) => {
return Err(Status::unimplemented(
"GreptimeDatabase::HandleRequests for query",
));
}
}
}
let response = GreptimeResponse {
header: None,
response: Some(RawResponse::AffectedRows(AffectedRows {
value: affected_rows as u32,
})),
};
Ok(Response::new(response))
}
}