From 72f289df503d9c4496d383362f35de152775e489 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Mon, 23 Mar 2026 15:12:39 +0800 Subject: [PATCH] chore: remove GrpcQueryHandler::put_record_batch (#7844) chore: remove GrpcQueryHandler::put_record_batch, we should use GrpcQueryHandler::handle_put_record_batch_stream instead Signed-off-by: Lei, HUANG --- src/frontend/src/instance/grpc.rs | 57 --------------------------- src/servers/src/query_handler/grpc.rs | 11 +----- src/servers/tests/mod.rs | 10 ----- 3 files changed, 1 insertion(+), 77 deletions(-) diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index c4191145f8..70ff50fadc 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -27,7 +27,6 @@ use api::v1::{ use async_stream::try_stream; use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; -use common_base::AffectedRows; use common_error::ext::BoxedError; use common_grpc::flight::do_put::DoPutResponse; use common_query::Output; @@ -260,62 +259,6 @@ impl GrpcQueryHandler for Instance { .context(server_error::ExecuteGrpcQuerySnafu) } - async fn put_record_batch( - &self, - request: servers::grpc::flight::PutRecordBatchRequest, - table_ref: &mut Option, - ctx: QueryContextRef, - ) -> server_error::Result { - let result: Result = async { - let table = if let Some(table) = table_ref { - table.clone() - } else { - let table = self - .catalog_manager() - .table( - &request.table_name.catalog_name, - &request.table_name.schema_name, - &request.table_name.table_name, - None, - ) - .await - .context(CatalogSnafu)? - .with_context(|| TableNotFoundSnafu { - table_name: request.table_name.to_string(), - })?; - *table_ref = Some(table.clone()); - table - }; - - let interceptor_ref = self.plugins.get::>(); - let interceptor = interceptor_ref.as_ref(); - interceptor.pre_bulk_insert(table.clone(), ctx.clone())?; - - self.plugins - .get::() - .as_ref() - .check_permission(ctx.current_user(), PermissionReq::BulkInsert) - .context(PermissionSnafu)?; - - // do we check limit for bulk insert? - - self.inserter - .handle_bulk_insert( - table, - request.flight_data, - request.record_batch, - request.schema_bytes, - ) - .await - .context(TableOperationSnafu) - } - .await; - - result - .map_err(BoxedError::new) - .context(server_error::ExecuteGrpcRequestSnafu) - } - fn handle_put_record_batch_stream( &self, stream: servers::grpc::flight::PutRecordBatchRequestStream, diff --git a/src/servers/src/query_handler/grpc.rs b/src/servers/src/query_handler/grpc.rs index 67d8b3890e..d66a76464e 100644 --- a/src/servers/src/query_handler/grpc.rs +++ b/src/servers/src/query_handler/grpc.rs @@ -17,15 +17,13 @@ use std::sync::Arc; use api::v1::greptime_request::Request; use async_trait::async_trait; -use common_base::AffectedRows; use common_grpc::flight::do_put::DoPutResponse; use common_query::Output; use futures::Stream; use session::context::QueryContextRef; -use table::TableRef; use crate::error::Result; -use crate::grpc::flight::{PutRecordBatchRequest, PutRecordBatchRequestStream}; +use crate::grpc::flight::PutRecordBatchRequestStream; pub type ServerGrpcQueryHandlerRef = Arc; @@ -35,13 +33,6 @@ pub type RawRecordBatch = bytes::Bytes; pub trait GrpcQueryHandler { async fn do_query(&self, query: Request, ctx: QueryContextRef) -> Result; - async fn put_record_batch( - &self, - request: PutRecordBatchRequest, - table_ref: &mut Option, - ctx: QueryContextRef, - ) -> Result; - fn handle_put_record_batch_stream( &self, stream: PutRecordBatchRequestStream, diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index e3f8f8fc79..c4f83c5e6c 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -18,7 +18,6 @@ use api::v1::greptime_request::Request; use api::v1::query_request::Query; use async_trait::async_trait; use catalog::memory::MemoryCatalogManager; -use common_base::AffectedRows; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_grpc::flight::do_put::DoPutResponse; use common_query::Output; @@ -149,15 +148,6 @@ impl GrpcQueryHandler for DummyInstance { Ok(output) } - async fn put_record_batch( - &self, - _request: servers::grpc::flight::PutRecordBatchRequest, - _table_ref: &mut Option, - _ctx: QueryContextRef, - ) -> Result { - unimplemented!() - } - fn handle_put_record_batch_stream( &self, _stream: servers::grpc::flight::PutRecordBatchRequestStream,