diff --git a/src/datanode/src/tests/grpc_test.rs b/src/datanode/src/tests/grpc_test.rs index 14e05050bd..ed0437fa7e 100644 --- a/src/datanode/src/tests/grpc_test.rs +++ b/src/datanode/src/tests/grpc_test.rs @@ -159,7 +159,6 @@ fn expect_data() -> (Column, Column, Column, Column) { } #[tokio::test(flavor = "multi_thread")] -#[ignore] async fn test_insert_and_select() { common_telemetry::init_default_ut_logging(); let (addr, _guard, fe_grpc_server, dn_grpc_server) = diff --git a/src/servers/src/grpc/handler.rs b/src/servers/src/grpc/handler.rs index c37adaaaa5..3b96de603f 100644 --- a/src/servers/src/grpc/handler.rs +++ b/src/servers/src/grpc/handler.rs @@ -28,37 +28,51 @@ impl BatchHandler { } pub async fn batch(&self, batch_req: BatchRequest) -> Result { - let mut batch_resp = BatchResponse::default(); - let mut admin_resp = AdminResponse::default(); - let mut db_resp = DatabaseResponse::default(); - - for admin_req in batch_req.admins { - for admin_expr in admin_req.exprs { - let admin_result = self.admin_handler.exec_admin_request(admin_expr).await?; - admin_resp.results.push(admin_result); - } - } - batch_resp.admins.push(admin_resp); - let (tx, rx) = oneshot::channel(); let query_handler = self.query_handler.clone(); - let _ = self.runtime.spawn(async move { - // execute request in another runtime to prevent the execution from being cancelled unexpected by tonic runtime. - let mut result = vec![]; - for db_req in batch_req.databases { - for obj_expr in db_req.exprs { - let object_resp = query_handler.do_query(obj_expr).await; + let admin_handler = self.admin_handler.clone(); - result.push(object_resp); + let future = async move { + let mut batch_resp = BatchResponse::default(); + let mut admin_resp = AdminResponse::default(); + let mut db_resp = DatabaseResponse::default(); + + for admin_req in batch_req.admins { + admin_resp.results.reserve(admin_req.exprs.len()); + + for admin_expr in admin_req.exprs { + let admin_result = admin_handler.exec_admin_request(admin_expr).await?; + admin_resp.results.push(admin_result); } } + batch_resp.admins.push(admin_resp); + + for db_req in batch_req.databases { + db_resp.results.reserve(db_req.exprs.len()); + + for obj_expr in db_req.exprs { + let object_resp = query_handler.do_query(obj_expr).await?; + + db_resp.results.push(object_resp); + } + } + batch_resp.databases.push(db_resp); + + Ok(batch_resp) + }; + + // Executes requests in another runtime to + // 1. prevent the execution from being cancelled unexpected by tonic runtime. + // 2. avoid the handler blocks the gRPC runtime because `exec_admin_request` may block + // the caller thread. + let _ = self.runtime.spawn(async move { + let result = future.await; + // Ignore send result. Usually an error indicates the rx is dropped (request timeouted). let _ = tx.send(result); }); // Safety: An early-dropped tx usually indicates a serious problem (like panic). This unwrap // is used to poison the upper layer. - db_resp.results = rx.await.unwrap().into_iter().collect::>()?; - batch_resp.databases.push(db_resp); - Ok(batch_resp) + rx.await.unwrap() } }