chore: remove GrpcQueryHandler::put_record_batch (#7844)

chore: remove GrpcQueryHandler::put_record_batch, we should use GrpcQueryHandler::handle_put_record_batch_stream instead

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
Lei, HUANG
2026-03-23 15:12:39 +08:00
committed by GitHub
parent 805536aed1
commit 72f289df50
3 changed files with 1 additions and 77 deletions

View File

@@ -27,7 +27,6 @@ use api::v1::{
use async_stream::try_stream;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_base::AffectedRows;
use common_error::ext::BoxedError;
use common_grpc::flight::do_put::DoPutResponse;
use common_query::Output;
@@ -260,62 +259,6 @@ impl GrpcQueryHandler for Instance {
.context(server_error::ExecuteGrpcQuerySnafu)
}
async fn put_record_batch(
&self,
request: servers::grpc::flight::PutRecordBatchRequest,
table_ref: &mut Option<TableRef>,
ctx: QueryContextRef,
) -> server_error::Result<AffectedRows> {
let result: Result<AffectedRows> = async {
let table = if let Some(table) = table_ref {
table.clone()
} else {
let table = self
.catalog_manager()
.table(
&request.table_name.catalog_name,
&request.table_name.schema_name,
&request.table_name.table_name,
None,
)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: request.table_name.to_string(),
})?;
*table_ref = Some(table.clone());
table
};
let interceptor_ref = self.plugins.get::<GrpcQueryInterceptorRef<Error>>();
let interceptor = interceptor_ref.as_ref();
interceptor.pre_bulk_insert(table.clone(), ctx.clone())?;
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::BulkInsert)
.context(PermissionSnafu)?;
// do we check limit for bulk insert?
self.inserter
.handle_bulk_insert(
table,
request.flight_data,
request.record_batch,
request.schema_bytes,
)
.await
.context(TableOperationSnafu)
}
.await;
result
.map_err(BoxedError::new)
.context(server_error::ExecuteGrpcRequestSnafu)
}
fn handle_put_record_batch_stream(
&self,
stream: servers::grpc::flight::PutRecordBatchRequestStream,

View File

@@ -17,15 +17,13 @@ use std::sync::Arc;
use api::v1::greptime_request::Request;
use async_trait::async_trait;
use common_base::AffectedRows;
use common_grpc::flight::do_put::DoPutResponse;
use common_query::Output;
use futures::Stream;
use session::context::QueryContextRef;
use table::TableRef;
use crate::error::Result;
use crate::grpc::flight::{PutRecordBatchRequest, PutRecordBatchRequestStream};
use crate::grpc::flight::PutRecordBatchRequestStream;
pub type ServerGrpcQueryHandlerRef = Arc<dyn GrpcQueryHandler + Send + Sync>;
@@ -35,13 +33,6 @@ pub type RawRecordBatch = bytes::Bytes;
pub trait GrpcQueryHandler {
async fn do_query(&self, query: Request, ctx: QueryContextRef) -> Result<Output>;
async fn put_record_batch(
&self,
request: PutRecordBatchRequest,
table_ref: &mut Option<TableRef>,
ctx: QueryContextRef,
) -> Result<AffectedRows>;
fn handle_put_record_batch_stream(
&self,
stream: PutRecordBatchRequestStream,

View File

@@ -18,7 +18,6 @@ use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use async_trait::async_trait;
use catalog::memory::MemoryCatalogManager;
use common_base::AffectedRows;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_grpc::flight::do_put::DoPutResponse;
use common_query::Output;
@@ -149,15 +148,6 @@ impl GrpcQueryHandler for DummyInstance {
Ok(output)
}
async fn put_record_batch(
&self,
_request: servers::grpc::flight::PutRecordBatchRequest,
_table_ref: &mut Option<TableRef>,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
unimplemented!()
}
fn handle_put_record_batch_stream(
&self,
_stream: servers::grpc::flight::PutRecordBatchRequestStream,