refactor: add row_inserts() and row_inserts_with_hints(). (#6503)

Signed-off-by: zyy17 <zyylsxm@gmail.com>
This commit is contained in:
zyy17
2025-07-11 16:06:36 +08:00
committed by GitHub
parent c1847e6b6a
commit 93e3a04aa8

View File

@@ -23,7 +23,7 @@ use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{
AlterTableExpr, AuthHeader, Basic, CreateTableExpr, DdlRequest, GreptimeRequest,
InsertRequests, QueryRequest, RequestHeader,
InsertRequests, QueryRequest, RequestHeader, RowInsertRequests,
};
use arrow_flight::{FlightData, Ticket};
use async_stream::stream;
@@ -118,6 +118,7 @@ impl Database {
}
}
/// Set the catalog for the database client.
pub fn set_catalog(&mut self, catalog: impl Into<String>) {
self.catalog = catalog.into();
}
@@ -130,6 +131,7 @@ impl Database {
}
}
/// Set the schema for the database client.
pub fn set_schema(&mut self, schema: impl Into<String>) {
self.schema = schema.into();
}
@@ -142,20 +144,24 @@ impl Database {
}
}
/// Set the timezone for the database client.
pub fn set_timezone(&mut self, timezone: impl Into<String>) {
self.timezone = timezone.into();
}
/// Set the auth scheme for the database client.
pub fn set_auth(&mut self, auth: AuthScheme) {
self.ctx.auth_header = Some(AuthHeader {
auth_scheme: Some(auth),
});
}
/// Make an InsertRequests request to the database.
pub async fn insert(&self, requests: InsertRequests) -> Result<u32> {
self.handle(Request::Inserts(requests)).await
}
/// Make an InsertRequests request to the database with hints.
pub async fn insert_with_hints(
&self,
requests: InsertRequests,
@@ -172,6 +178,28 @@ impl Database {
from_grpc_response(response)
}
/// Make a RowInsertRequests request to the database.
pub async fn row_inserts(&self, requests: RowInsertRequests) -> Result<u32> {
self.handle(Request::RowInserts(requests)).await
}
/// Make a RowInsertRequests request to the database with hints.
pub async fn row_inserts_with_hints(
&self,
requests: RowInsertRequests,
hints: &[(&str, &str)],
) -> Result<u32> {
let mut client = make_database_client(&self.client)?.inner;
let request = self.to_rpc_request(Request::RowInserts(requests));
let mut request = tonic::Request::new(request);
let metadata = request.metadata_mut();
Self::put_hints(metadata, hints)?;
let response = client.handle(request).await?.into_inner();
from_grpc_response(response)
}
fn put_hints(metadata: &mut MetadataMap, hints: &[(&str, &str)]) -> Result<()> {
let Some(value) = hints
.iter()
@@ -187,6 +215,7 @@ impl Database {
Ok(())
}
/// Make a request to the database.
pub async fn handle(&self, request: Request) -> Result<u32> {
let mut client = make_database_client(&self.client)?.inner;
let request = self.to_rpc_request(request);
@@ -250,6 +279,7 @@ impl Database {
}
}
/// Executes a SQL query without any hints.
pub async fn sql<S>(&self, sql: S) -> Result<Output>
where
S: AsRef<str>,
@@ -257,6 +287,7 @@ impl Database {
self.sql_with_hint(sql, &[]).await
}
/// Executes a SQL query with optional hints for query optimization.
pub async fn sql_with_hint<S>(&self, sql: S, hints: &[(&str, &str)]) -> Result<Output>
where
S: AsRef<str>,
@@ -267,6 +298,7 @@ impl Database {
self.do_get(request, hints).await
}
/// Executes a logical plan directly without SQL parsing.
pub async fn logical_plan(&self, logical_plan: Vec<u8>) -> Result<Output> {
let request = Request::Query(QueryRequest {
query: Some(Query::LogicalPlan(logical_plan)),
@@ -274,6 +306,7 @@ impl Database {
self.do_get(request, &[]).await
}
/// Creates a new table using the provided table expression.
pub async fn create(&self, expr: CreateTableExpr) -> Result<Output> {
let request = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(expr)),
@@ -281,6 +314,7 @@ impl Database {
self.do_get(request, &[]).await
}
/// Alters an existing table using the provided alter expression.
pub async fn alter(&self, expr: AlterTableExpr) -> Result<Output> {
let request = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::AlterTable(expr)),