From 3d942f676356cb557d791c7559e191075460540d Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Sat, 24 May 2025 15:02:42 +0800 Subject: [PATCH] fix: bulk insert case sensitive (#6165) * fix/bulk-insert-case-sensitive: Add error inspection for gRPC bulk insert in `greptime_handler.rs` - Enhanced error handling by adding `inspect_err` to log errors during the `put_record_batch` operation in `greptime_handler.rs`. * fix: silent error while bulk ingest with uppercase columns --- src/partition/src/expr.rs | 2 +- src/servers/src/grpc/greptime_handler.rs | 3 ++- tests-integration/src/grpc/flight.rs | 12 ++++++------ 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/partition/src/expr.rs b/src/partition/src/expr.rs index b758d6dcba..56d7e740e2 100644 --- a/src/partition/src/expr.rs +++ b/src/partition/src/expr.rs @@ -61,7 +61,7 @@ impl From for Operand { impl Operand { pub fn try_as_logical_expr(&self) -> error::Result { match self { - Self::Column(c) => Ok(datafusion_expr::col(c)), + Self::Column(c) => Ok(datafusion_expr::col(format!(r#""{}""#, c))), Self::Value(v) => { let scalar_value = match v { Value::Boolean(v) => ScalarValue::Boolean(Some(*v)), diff --git a/src/servers/src/grpc/greptime_handler.rs b/src/servers/src/grpc/greptime_handler.rs index b4736ab4d4..018fd33e60 100644 --- a/src/servers/src/grpc/greptime_handler.rs +++ b/src/servers/src/grpc/greptime_handler.rs @@ -167,7 +167,8 @@ impl GreptimeRequestHandler { let timer = metrics::GRPC_BULK_INSERT_ELAPSED.start_timer(); let result = handler .put_record_batch(&table_name, &mut table_id, &mut decoder, data) - .await; + .await + .inspect_err(|e| error!(e; "Failed to handle flight record batches")); timer.observe_duration(); let result = result .map(|x| DoPutResponse::new(request_id, x)) diff --git a/tests-integration/src/grpc/flight.rs b/tests-integration/src/grpc/flight.rs index 6c50a90e11..cebe59af5b 100644 --- a/tests-integration/src/grpc/flight.rs 
+++ b/tests-integration/src/grpc/flight.rs @@ -59,10 +59,10 @@ mod test { let record_batches = create_record_batches(1); test_put_record_batches(&client, record_batches).await; - let sql = "select ts, a, b from foo order by ts"; + let sql = "select ts, a, `B` from foo order by ts"; let expected = "\ +-------------------------+----+----+ -| ts | a | b | +| ts | a | B | +-------------------------+----+----+ | 1970-01-01T00:00:00.001 | -1 | s1 | | 1970-01-01T00:00:00.002 | -2 | s2 | @@ -116,10 +116,10 @@ mod test { let record_batches = create_record_batches(1); test_put_record_batches(&client, record_batches).await; - let sql = "select ts, a, b from foo order by ts"; + let sql = "select ts, a, `B` from foo order by ts"; let expected = "\ +-------------------------+----+----+ -| ts | a | b | +| ts | a | B | +-------------------------+----+----+ | 1970-01-01T00:00:00.001 | -1 | s1 | | 1970-01-01T00:00:00.002 | -2 | s2 | @@ -192,7 +192,7 @@ mod test { ) .with_time_index(true), ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false), - ColumnSchema::new("b", ConcreteDataType::string_datatype(), true), + ColumnSchema::new("B", ConcreteDataType::string_datatype(), true), ])); let mut record_batches = Vec::with_capacity(3); @@ -250,7 +250,7 @@ mod test { ..Default::default() }, ColumnDef { - name: "b".to_string(), + name: "B".to_string(), data_type: ColumnDataType::String as i32, semantic_type: SemanticType::Field as i32, is_nullable: true,