fix: Fix test_insert_and_select hangs occasionally (#496)

* fix: Also handles admin request in another runtime

* chore: Describe why executes admin request in another runtime

* test: Enable test_insert_and_select
This commit is contained in:
Yingwen
2022-11-14 21:11:25 +08:00
committed by GitHub
parent 68b299e04a
commit 508f4cdfd0
2 changed files with 36 additions and 23 deletions

View File

@@ -159,7 +159,6 @@ fn expect_data() -> (Column, Column, Column, Column) {
}
#[tokio::test(flavor = "multi_thread")]
#[ignore]
async fn test_insert_and_select() {
common_telemetry::init_default_ut_logging();
let (addr, _guard, fe_grpc_server, dn_grpc_server) =

View File

@@ -28,37 +28,51 @@ impl BatchHandler {
}
pub async fn batch(&self, batch_req: BatchRequest) -> Result<BatchResponse> {
let mut batch_resp = BatchResponse::default();
let mut admin_resp = AdminResponse::default();
let mut db_resp = DatabaseResponse::default();
for admin_req in batch_req.admins {
for admin_expr in admin_req.exprs {
let admin_result = self.admin_handler.exec_admin_request(admin_expr).await?;
admin_resp.results.push(admin_result);
}
}
batch_resp.admins.push(admin_resp);
let (tx, rx) = oneshot::channel();
let query_handler = self.query_handler.clone();
let _ = self.runtime.spawn(async move {
// execute request in another runtime to prevent the execution from being cancelled unexpected by tonic runtime.
let mut result = vec![];
for db_req in batch_req.databases {
for obj_expr in db_req.exprs {
let object_resp = query_handler.do_query(obj_expr).await;
let admin_handler = self.admin_handler.clone();
result.push(object_resp);
let future = async move {
let mut batch_resp = BatchResponse::default();
let mut admin_resp = AdminResponse::default();
let mut db_resp = DatabaseResponse::default();
for admin_req in batch_req.admins {
admin_resp.results.reserve(admin_req.exprs.len());
for admin_expr in admin_req.exprs {
let admin_result = admin_handler.exec_admin_request(admin_expr).await?;
admin_resp.results.push(admin_result);
}
}
batch_resp.admins.push(admin_resp);
for db_req in batch_req.databases {
db_resp.results.reserve(db_req.exprs.len());
for obj_expr in db_req.exprs {
let object_resp = query_handler.do_query(obj_expr).await?;
db_resp.results.push(object_resp);
}
}
batch_resp.databases.push(db_resp);
Ok(batch_resp)
};
// Executes requests in another runtime to
// 1. prevent the execution from being cancelled unexpected by tonic runtime.
// 2. avoid the handler blocks the gRPC runtime because `exec_admin_request` may block
// the caller thread.
let _ = self.runtime.spawn(async move {
let result = future.await;
// Ignore send result. Usually an error indicates the rx is dropped (request timeouted).
let _ = tx.send(result);
});
// Safety: An early-dropped tx usually indicates a serious problem (like panic). This unwrap
// is used to poison the upper layer.
db_resp.results = rx.await.unwrap().into_iter().collect::<Result<_>>()?;
batch_resp.databases.push(db_resp);
Ok(batch_resp)
rx.await.unwrap()
}
}