diff --git a/src/client/src/database.rs b/src/client/src/database.rs index b85539955d..11736c1996 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -75,12 +75,24 @@ pub struct Database { } pub struct DatabaseClient { + pub addr: String, pub inner: GreptimeDatabaseClient, } +impl DatabaseClient { + /// Returns a closure that logs the error when the request fails. + pub fn inspect_err<'a>(&'a self, context: &'a str) -> impl Fn(&tonic::Status) + 'a { + let addr = &self.addr; + move |status| { + error!("Failed to {context} request, peer: {addr}, status: {status:?}"); + } + } +} + fn make_database_client(client: &Client) -> Result { - let (_, channel) = client.find_channel()?; + let (addr, channel) = client.find_channel()?; Ok(DatabaseClient { + addr, inner: GreptimeDatabaseClient::new(channel) .max_decoding_message_size(client.max_grpc_recv_message_size()) .max_encoding_message_size(client.max_grpc_send_message_size()), @@ -167,14 +179,19 @@ impl Database { requests: InsertRequests, hints: &[(&str, &str)], ) -> Result { - let mut client = make_database_client(&self.client)?.inner; + let mut client = make_database_client(&self.client)?; let request = self.to_rpc_request(Request::Inserts(requests)); let mut request = tonic::Request::new(request); let metadata = request.metadata_mut(); Self::put_hints(metadata, hints)?; - let response = client.handle(request).await?.into_inner(); + let response = client + .inner + .handle(request) + .await + .inspect_err(client.inspect_err("insert_with_hints"))? + .into_inner(); from_grpc_response(response) } @@ -189,14 +206,19 @@ impl Database { requests: RowInsertRequests, hints: &[(&str, &str)], ) -> Result { - let mut client = make_database_client(&self.client)?.inner; + let mut client = make_database_client(&self.client)?; let request = self.to_rpc_request(Request::RowInserts(requests)); let mut request = tonic::Request::new(request); let metadata = request.metadata_mut(); Self::put_hints(metadata, hints)?; - let response = client.handle(request).await?.into_inner(); + let response = client + .inner + .handle(request) + .await + .inspect_err(client.inspect_err("row_inserts_with_hints"))? + .into_inner(); from_grpc_response(response) } @@ -217,9 +239,14 @@ impl Database { /// Make a request to the database. pub async fn handle(&self, request: Request) -> Result { - let mut client = make_database_client(&self.client)?.inner; + let mut client = make_database_client(&self.client)?; let request = self.to_rpc_request(request); - let response = client.handle(request).await?.into_inner(); + let response = client + .inner + .handle(request) + .await + .inspect_err(client.inspect_err("handle"))? + .into_inner(); from_grpc_response(response) } @@ -231,7 +258,7 @@ impl Database { max_retries: u32, hints: &[(&str, &str)], ) -> Result { - let mut client = make_database_client(&self.client)?.inner; + let mut client = make_database_client(&self.client)?; let mut retries = 0; let request = self.to_rpc_request(request); @@ -240,7 +267,11 @@ impl Database { let mut tonic_request = tonic::Request::new(request.clone()); let metadata = tonic_request.metadata_mut(); Self::put_hints(metadata, hints)?; - let raw_response = client.handle(tonic_request).await; + let raw_response = client + .inner + .handle(tonic_request) + .await + .inspect_err(client.inspect_err("handle")); match (raw_response, retries < max_retries) { (Ok(resp), _) => return from_grpc_response(resp.into_inner()), (Err(err), true) => {