From fbf50c594e8a9499cc0402381256bcdef69e676f Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Wed, 7 May 2025 22:52:20 -0700 Subject: [PATCH] fix: csv format escaping (#6061) * fix: csv format escaping * chore: change status code * fix: crate version --- Cargo.lock | 5 ++-- src/pipeline/Cargo.toml | 2 +- src/servers/Cargo.toml | 1 + src/servers/src/http/result/csv_result.rs | 30 +++++++++++++++++------ tests-integration/tests/http.rs | 14 +++++++++-- 5 files changed, 40 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 371d4dc1f0..f0419018d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2930,9 +2930,9 @@ dependencies = [ [[package]] name = "csv" -version = "1.3.0" +version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac574ff4d437a7b5ad237ef331c17ccca63c46479e5b5453eb8e10bb99a759fe" +checksum = "acdc4883a9c96732e4733212c01447ebd805833b7275a73ca3ee080fd77afdaf" dependencies = [ "csv-core", "itoa", @@ -10502,6 +10502,7 @@ dependencies = [ "common-time", "common-version", "criterion 0.5.1", + "csv", "dashmap", "datafusion", "datafusion-common", diff --git a/src/pipeline/Cargo.toml b/src/pipeline/Cargo.toml index 7a96ff82ce..f022e81f6f 100644 --- a/src/pipeline/Cargo.toml +++ b/src/pipeline/Cargo.toml @@ -28,7 +28,7 @@ common-runtime.workspace = true common-telemetry.workspace = true common-time.workspace = true crossbeam-utils.workspace = true -csv = "1.3.0" +csv = "1.3" dashmap.workspace = true datafusion.workspace = true datafusion-common.workspace = true diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 5ec1563604..754f4f1e22 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -50,6 +50,7 @@ common-session.workspace = true common-telemetry.workspace = true common-time.workspace = true common-version = { workspace = true, features = ["codec"] } +csv = "1.3" dashmap.workspace = true datafusion.workspace = true datafusion-common.workspace = true diff --git a/src/servers/src/http/result/csv_result.rs b/src/servers/src/http/result/csv_result.rs index b6b997ef2e..ab662d8176 100644 --- a/src/servers/src/http/result/csv_result.rs +++ b/src/servers/src/http/result/csv_result.rs @@ -12,13 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt::Write; - use axum::http::{header, HeaderValue}; use axum::response::{IntoResponse, Response}; use common_error::status_code::StatusCode; use common_query::Output; -use itertools::Itertools; use mime_guess::mime; use serde::{Deserialize, Serialize}; @@ -72,6 +69,22 @@ impl CsvResponse { } } +macro_rules! http_try { + ($handle: expr) => { + match $handle { + Ok(res) => res, + Err(err) => { + let msg = err.to_string(); + return HttpResponse::Error(ErrorResponse::from_error_message( + StatusCode::Unexpected, + msg, + )) + .into_response(); + } + } + }; +} + impl IntoResponse for CsvResponse { fn into_response(mut self) -> Response { debug_assert!( @@ -87,12 +100,15 @@ impl IntoResponse for CsvResponse { format!("{n}\n") } Some(GreptimeQueryOutput::Records(records)) => { - let mut result = String::new(); + let mut wtr = csv::Writer::from_writer(Vec::new()); + for row in records.rows { - let row = row.iter().map(|v| v.to_string()).join(","); - writeln!(result, "{row}").unwrap(); + http_try!(wtr.serialize(row)); } - result + http_try!(wtr.flush()); + + let bytes = http_try!(wtr.into_inner()); + http_try!(String::from_utf8(bytes)) } }; diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index a3470eef76..f02d25dbd6 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -270,7 +270,7 @@ pub async fn test_sql_api(store_type: StorageType) { // test insert and select let res = client - .get("/v1/sql?sql=insert into demo values('host', 66.6, 1024, 0)") + .get("/v1/sql?sql=insert into demo values('host, \"name', 66.6, 1024, 0)") .send() .await; assert_eq!(res.status(), StatusCode::OK); @@ -289,7 +289,7 @@ pub async fn test_sql_api(store_type: StorageType) { assert_eq!( output[0], serde_json::from_value::(json!({ - "records":{"schema":{"column_schemas":[{"name":"host","data_type":"String"},{"name":"cpu","data_type":"Float64"},{"name":"memory","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[["host",66.6,1024.0,0]],"total_rows":1} + "records":{"schema":{"column_schemas":[{"name":"host","data_type":"String"},{"name":"cpu","data_type":"Float64"},{"name":"memory","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[["host, \"name",66.6,1024.0,0]],"total_rows":1} })).unwrap() ); @@ -436,6 +436,16 @@ pub async fn test_sql_api(store_type: StorageType) { // this is something only json format can show assert!(format!("{:?}", output[0]).contains("\\\"param\\\"")); + // test csv format + let res = client + .get("/v1/sql?format=csv&sql=select cpu,ts,host from demo limit 1") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + let body = &res.text().await; + // Must be escaped correctly: 66.6,0,"host, ""name" + assert_eq!(body, "66.6,0,\"host, \"\"name\"\n"); + // test parse method let res = client.get("/v1/sql/parse?sql=desc table t").send().await; assert_eq!(res.status(), StatusCode::OK);