diff --git a/Cargo.lock b/Cargo.lock index be89db9b5d..9cfa45d25e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3084,7 +3084,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=0a7b790ed41364b5599dff806d1080bd59c5c9f6#0a7b790ed41364b5599dff806d1080bd59c5c9f6" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=dcf253314a05c9821bd90e7e09a168256c84f250#dcf253314a05c9821bd90e7e09a168256c84f250" dependencies = [ "prost", "tonic", diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 4e280c72e5..ce95ae0abf 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -10,7 +10,7 @@ common-base = { path = "../common/base" } common-error = { path = "../common/error" } common-time = { path = "../common/time" } datatypes = { path = "../datatypes" } -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0a7b790ed41364b5599dff806d1080bd59c5c9f6" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "dcf253314a05c9821bd90e7e09a168256c84f250" } prost.workspace = true snafu = { version = "0.7", features = ["backtraces"] } tonic.workspace = true diff --git a/src/servers/src/grpc/database.rs b/src/servers/src/grpc/database.rs index 350827c023..edf214bab0 100644 --- a/src/servers/src/grpc/database.rs +++ b/src/servers/src/grpc/database.rs @@ -15,10 +15,12 @@ use std::sync::Arc; use api::v1::greptime_database_server::GreptimeDatabase; -use api::v1::{greptime_response, AffectedRows, GreptimeRequest, GreptimeResponse}; +use api::v1::greptime_response::Response as RawResponse; +use api::v1::{AffectedRows, GreptimeRequest, GreptimeResponse}; use async_trait::async_trait; use common_query::Output; -use tonic::{Request, Response, Status}; +use futures::StreamExt; +use tonic::{Request, Response, Status, Streaming}; use crate::grpc::handler::GreptimeRequestHandler; use crate::grpc::TonicResult; @@ -44,14 +46,41 @@ impl GreptimeDatabase for DatabaseService { let response = match output { Output::AffectedRows(rows) => GreptimeResponse { header: None, - response: Some(greptime_response::Response::AffectedRows(AffectedRows { - value: rows as _, - })), + response: Some(RawResponse::AffectedRows(AffectedRows { value: rows as _ })), }, Output::Stream(_) | Output::RecordBatches(_) => { - return Err(Status::unimplemented("GreptimeDatabase::handle for query")); + return Err(Status::unimplemented("GreptimeDatabase::Handle for query")); } }; Ok(Response::new(response)) } + + async fn handle_requests( + &self, + request: Request>, + ) -> Result, Status> { + let mut affected_rows = 0; + + let mut stream = request.into_inner(); + while let Some(request) = stream.next().await { + let request = request?; + let output = self.handler.handle_request(request).await?; + match output { + Output::AffectedRows(rows) => affected_rows += rows, + Output::Stream(_) | Output::RecordBatches(_) => { + return Err(Status::unimplemented( + "GreptimeDatabase::HandleRequests for query", + )); + } + } + } + + let response = GreptimeResponse { + header: None, + response: Some(RawResponse::AffectedRows(AffectedRows { + value: affected_rows as u32, + })), + }; + Ok(Response::new(response)) + } }