feat: supports CsvWithNames and CsvWithNamesAndTypes formats (#6384)

* feat: supports CsvWithNames and CsvWithNamesAndTypes formats and object/array types

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* test: added and fixed tests

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: fix test

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: remove comments

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* test: add json type csv tests

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: remove comment

Co-authored-by: Yingwen <realevenyag@gmail.com>

---------

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
dennis zhuang
2025-06-25 15:28:11 +08:00
committed by GitHub
parent 7953b090c0
commit 944b4b3e49
5 changed files with 249 additions and 17 deletions

View File

@@ -306,7 +306,8 @@ pub enum GreptimeQueryOutput {
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResponseFormat {
Arrow,
Csv,
// (with_names, with_types)
Csv(bool, bool),
Table,
#[default]
GreptimedbV1,
@@ -318,7 +319,9 @@ impl ResponseFormat {
pub fn parse(s: &str) -> Option<Self> {
match s {
"arrow" => Some(ResponseFormat::Arrow),
"csv" => Some(ResponseFormat::Csv),
"csv" => Some(ResponseFormat::Csv(false, false)),
"csvwithnames" => Some(ResponseFormat::Csv(true, false)),
"csvwithnamesandtypes" => Some(ResponseFormat::Csv(true, true)),
"table" => Some(ResponseFormat::Table),
"greptimedb_v1" => Some(ResponseFormat::GreptimedbV1),
"influxdb_v1" => Some(ResponseFormat::InfluxdbV1),
@@ -330,7 +333,7 @@ impl ResponseFormat {
pub fn as_str(&self) -> &'static str {
match self {
ResponseFormat::Arrow => "arrow",
ResponseFormat::Csv => "csv",
ResponseFormat::Csv(_, _) => "csv",
ResponseFormat::Table => "table",
ResponseFormat::GreptimedbV1 => "greptimedb_v1",
ResponseFormat::InfluxdbV1 => "influxdb_v1",
@@ -1480,7 +1483,7 @@ mod test {
for format in [
ResponseFormat::GreptimedbV1,
ResponseFormat::InfluxdbV1,
ResponseFormat::Csv,
ResponseFormat::Csv(true, true),
ResponseFormat::Table,
ResponseFormat::Arrow,
ResponseFormat::Json,
@@ -1490,7 +1493,9 @@ mod test {
let outputs = vec![Ok(Output::new_with_record_batches(recordbatches))];
let json_resp = match format {
ResponseFormat::Arrow => ArrowResponse::from_output(outputs, None).await,
ResponseFormat::Csv => CsvResponse::from_output(outputs).await,
ResponseFormat::Csv(with_names, with_types) => {
CsvResponse::from_output(outputs, with_names, with_types).await
}
ResponseFormat::Table => TableResponse::from_output(outputs).await,
ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, None).await,
@@ -1583,4 +1588,46 @@ mod test {
}
}
}
/// Checks the default format, the name -> variant mapping of `parse`, and the
/// variant -> name mapping of `as_str`.
#[test]
fn test_response_format_misc() {
    assert_eq!(ResponseFormat::default(), ResponseFormat::GreptimedbV1);

    // Every recognized name together with the variant it must parse into.
    let recognized = [
        ("arrow", ResponseFormat::Arrow),
        ("csv", ResponseFormat::Csv(false, false)),
        ("csvwithnames", ResponseFormat::Csv(true, false)),
        ("csvwithnamesandtypes", ResponseFormat::Csv(true, true)),
        ("table", ResponseFormat::Table),
        ("greptimedb_v1", ResponseFormat::GreptimedbV1),
        ("influxdb_v1", ResponseFormat::InfluxdbV1),
        ("json", ResponseFormat::Json),
    ];
    for (name, expected) in recognized {
        assert_eq!(ResponseFormat::parse(name), Some(expected), "parse({name:?})");
    }

    // Invalid formats; `parse` is case sensitive, so "CSV" is rejected too.
    for name in ["invalid", "", "CSV"] {
        assert_eq!(ResponseFormat::parse(name), None, "parse({name:?})");
    }

    // `as_str` reports every CSV flavour as plain "csv".
    let rendered = [
        (ResponseFormat::Arrow, "arrow"),
        (ResponseFormat::Csv(false, false), "csv"),
        (ResponseFormat::Csv(true, true), "csv"),
        (ResponseFormat::Table, "table"),
        (ResponseFormat::GreptimedbV1, "greptimedb_v1"),
        (ResponseFormat::InfluxdbV1, "influxdb_v1"),
        (ResponseFormat::Json, "json"),
    ];
    for (format, expected) in rendered {
        assert_eq!(format.as_str(), expected, "{format:?}.as_str()");
    }
}
}

View File

@@ -138,7 +138,9 @@ pub async fn sql(
ResponseFormat::Arrow => {
ArrowResponse::from_output(outputs, query_params.compression).await
}
ResponseFormat::Csv => CsvResponse::from_output(outputs).await,
ResponseFormat::Csv(with_names, with_types) => {
CsvResponse::from_output(outputs, with_names, with_types).await
}
ResponseFormat::Table => TableResponse::from_output(outputs).await,
ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
@@ -327,7 +329,9 @@ pub async fn promql(
match format {
ResponseFormat::Arrow => ArrowResponse::from_output(outputs, compression).await,
ResponseFormat::Csv => CsvResponse::from_output(outputs).await,
ResponseFormat::Csv(with_names, with_types) => {
CsvResponse::from_output(outputs, with_names, with_types).await
}
ResponseFormat::Table => TableResponse::from_output(outputs).await,
ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,

View File

@@ -18,9 +18,9 @@ use common_error::status_code::StatusCode;
use common_query::Output;
use mime_guess::mime;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
// use super::process_with_limit;
use crate::http::result::error_result::ErrorResponse;
use crate::http::{handler, process_with_limit, GreptimeQueryOutput, HttpResponse, ResponseFormat};
@@ -28,10 +28,16 @@ use crate::http::{handler, process_with_limit, GreptimeQueryOutput, HttpResponse
pub struct CsvResponse {
/// Query outputs that this response renders as CSV.
output: Vec<GreptimeQueryOutput>,
/// Execution time in milliseconds; `from_output` initializes it to 0.
execution_time_ms: u64,
/// When true, a row holding the column names is written before the data rows.
with_names: bool,
/// When true, a row holding the column data types is written before the data
/// rows (after the names row). The `with_types` builder also turns
/// `with_names` on whenever it enables this flag.
with_types: bool,
}
impl CsvResponse {
pub async fn from_output(outputs: Vec<crate::error::Result<Output>>) -> HttpResponse {
pub async fn from_output(
outputs: Vec<crate::error::Result<Output>>,
with_names: bool,
with_types: bool,
) -> HttpResponse {
match handler::from_output(outputs).await {
Err(err) => HttpResponse::Error(err),
Ok((output, _)) => {
@@ -41,10 +47,14 @@ impl CsvResponse {
"cannot output multi-statements result in csv format".to_string(),
))
} else {
HttpResponse::Csv(CsvResponse {
let csv_resp = CsvResponse {
output,
execution_time_ms: 0,
})
with_names: false,
with_types: false,
};
HttpResponse::Csv(csv_resp.with_names(with_names).with_types(with_types))
}
}
}
@@ -67,6 +77,21 @@ impl CsvResponse {
self.output = process_with_limit(self.output, limit);
self
}
/// Builder-style setter: chooses whether the CSV body starts with a row of
/// column names, and returns the updated response.
pub fn with_names(self, with_names: bool) -> Self {
    let mut resp = self;
    resp.with_names = with_names;
    resp
}
/// Builder-style setter: chooses whether the CSV body contains a row of
/// column data types, and returns the updated response.
///
/// Enabling types always enables names as well; disabling types leaves the
/// current `with_names` setting untouched.
pub fn with_types(self, with_types: bool) -> Self {
    let mut resp = self;
    resp.with_types = with_types;
    // If `with_types` is true, then `with_names` is always forced to true.
    resp.with_names |= with_types;
    resp
}
}
macro_rules! http_try {
@@ -100,11 +125,50 @@ impl IntoResponse for CsvResponse {
format!("{n}\n")
}
Some(GreptimeQueryOutput::Records(records)) => {
let mut wtr = csv::Writer::from_writer(Vec::new());
let mut wtr = csv::WriterBuilder::new()
.terminator(csv::Terminator::CRLF) // RFC 4180
.from_writer(Vec::new());
if self.with_names {
let names = records
.schema
.column_schemas
.iter()
.map(|c| &c.name)
.collect::<Vec<_>>();
http_try!(wtr.serialize(names));
}
if self.with_types {
let types = records
.schema
.column_schemas
.iter()
.map(|c| &c.data_type)
.collect::<Vec<_>>();
http_try!(wtr.serialize(types));
}
for row in records.rows {
let row = row
.into_iter()
.map(|value| {
match value {
// Cast array and object to string
JsonValue::Array(a) => {
JsonValue::String(serde_json::to_string(&a).unwrap_or_default())
}
JsonValue::Object(o) => {
JsonValue::String(serde_json::to_string(&o).unwrap_or_default())
}
v => v,
}
})
.collect::<Vec<_>>();
http_try!(wtr.serialize(row));
}
http_try!(wtr.flush());
let bytes = http_try!(wtr.into_inner());
@@ -122,7 +186,9 @@ impl IntoResponse for CsvResponse {
.into_response();
resp.headers_mut().insert(
&GREPTIME_DB_HEADER_FORMAT,
HeaderValue::from_static(ResponseFormat::Csv.as_str()),
HeaderValue::from_static(
ResponseFormat::Csv(self.with_names, self.with_types).as_str(),
),
);
resp.headers_mut().insert(
&GREPTIME_DB_HEADER_EXECUTION_TIME,
@@ -131,3 +197,97 @@ impl IntoResponse for CsvResponse {
resp
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_query::Output;
use common_recordbatch::{RecordBatch, RecordBatches};
use datatypes::prelude::{ConcreteDataType, ScalarVector};
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::{BinaryVector, Float32Vector, StringVector, UInt32Vector, VectorRef};
use super::*;
/// Renders the same record batch with each combination of the
/// `with_names` / `with_types` flags and checks the header rows and data rows
/// of the resulting CSV body.
#[tokio::test]
async fn test_csv_response_with_names_and_types() {
    let (schema, columns) = create_test_data();
    // Expected data rows. The response terminates records with CRLF, so the
    // newline inside the literal is normalized to match.
    let data = r#"1,,-1000.1400146484375,"{""a"":{""b"":2},""b"":2,""c"":3}"
2,hello,1.9900000095367432,"{""a"":4,""b"":{""c"":6},""c"":6}""#
        .replace("\n", "\r\n");

    // Test with_names=true, with_types=true
    {
        let body = get_csv_body(&schema, &columns, true, true).await;
        assert!(body.starts_with("col1,col2,col3,col4\r\nUInt32,String,Float32,Json\r\n"));
        assert!(body.contains(&data));
    }

    // Test with_names=false, with_types=true: the `with_types` builder forces
    // `with_names` on, so the body must match the names-and-types layout.
    {
        let body = get_csv_body(&schema, &columns, false, true).await;
        assert!(body.starts_with("col1,col2,col3,col4\r\nUInt32,String,Float32,Json\r\n"));
        assert!(body.contains(&data));
    }

    // Test with_names=true, with_types=false
    {
        let body = get_csv_body(&schema, &columns, true, false).await;
        assert!(body.starts_with("col1,col2,col3,col4\r\n"));
        assert!(!body.contains("UInt32,String,Float32,Json"));
        assert!(body.contains(&data));
    }

    // Test with_names=false, with_types=false
    {
        let body = get_csv_body(&schema, &columns, false, false).await;
        assert!(!body.starts_with("col1,col2,col3,col4"));
        assert!(!body.contains("UInt32,String,Float32,Json"));
        assert!(body.contains(&data));
    }
}
/// Builds the fixture shared by the CSV tests: a four-column schema
/// (uint32, string, float32, json) and two rows of matching column vectors.
fn create_test_data() -> (Arc<Schema>, Vec<VectorRef>) {
    let schema = Arc::new(Schema::new(vec![
        ColumnSchema::new("col1", ConcreteDataType::uint32_datatype(), false),
        ColumnSchema::new("col2", ConcreteDataType::string_datatype(), true),
        ColumnSchema::new("col3", ConcreteDataType::float32_datatype(), true),
        ColumnSchema::new("col4", ConcreteDataType::json_datatype(), true),
    ]));

    // The JSON column is fed as a binary vector of jsonb-encoded values, so
    // each JSON text is parsed and encoded first.
    let json_texts = [
        r#"{"a": {"b": 2}, "b": 2, "c": 3}"#,
        r#"{"a": 4, "b": {"c": 6}, "c": 6}"#,
    ];
    let mut encoded = Vec::with_capacity(json_texts.len());
    for text in json_texts.iter() {
        let parsed = jsonb::parse_value(text.as_bytes()).unwrap();
        encoded.push(parsed.to_vec());
    }

    let col1: VectorRef = Arc::new(UInt32Vector::from_slice(vec![1, 2]));
    let col2: VectorRef = Arc::new(StringVector::from(vec![None, Some("hello")]));
    let col3: VectorRef = Arc::new(Float32Vector::from_slice(vec![-1000.14, 1.99]));
    let col4: VectorRef = Arc::new(BinaryVector::from_vec(encoded));

    (schema, vec![col1, col2, col3, col4])
}
/// Wraps `columns` in a single record batch, renders it through
/// `CsvResponse::from_output` with the given flags, and returns the response
/// body as a UTF-8 string.
async fn get_csv_body(
    schema: &Arc<Schema>,
    columns: &[VectorRef],
    with_names: bool,
    with_types: bool,
) -> String {
    let batch = RecordBatch::new(Arc::clone(schema), columns.to_vec()).unwrap();
    let batches = RecordBatches::try_new(Arc::clone(schema), vec![batch]).unwrap();

    let response = CsvResponse::from_output(
        vec![Ok(Output::new_with_record_batches(batches))],
        with_names,
        with_types,
    )
    .await
    .into_response();

    // The body is read with a 1024-byte limit.
    let raw = axum::body::to_bytes(response.into_body(), 1024)
        .await
        .unwrap();
    String::from_utf8(raw.to_vec()).unwrap()
}
}

View File

@@ -142,7 +142,7 @@ async fn test_sql_output_rows() {
axum::body::to_bytes(resp.into_body(), usize::MAX)
.await
.unwrap(),
Bytes::from_static(b"4950\n"),
Bytes::from_static(b"4950\r\n"),
);
}
HttpResponse::Table(resp) => {
@@ -289,7 +289,7 @@ async fn test_sql_form() {
axum::body::to_bytes(resp.into_body(), usize::MAX)
.await
.unwrap(),
Bytes::from_static(b"4950\n"),
Bytes::from_static(b"4950\r\n"),
);
}
HttpResponse::Table(resp) => {

View File

@@ -453,7 +453,28 @@ pub async fn test_sql_api(store_type: StorageType) {
assert_eq!(res.status(), StatusCode::OK);
let body = &res.text().await;
// Must be escaped correctly: 66.6,0,"host, ""name"
assert_eq!(body, "66.6,0,\"host, \"\"name\"\n");
assert_eq!(body, "66.6,0,\"host, \"\"name\"\r\n");
// csv with names
let res = client
.get("/v1/sql?format=csvWithNames&sql=select cpu,ts,host from demo limit 1")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = &res.text().await;
assert_eq!(body, "cpu,ts,host\r\n66.6,0,\"host, \"\"name\"\r\n");
// csv with names and types
let res = client
.get("/v1/sql?format=csvWithNamesAndTypes&sql=select cpu,ts,host from demo limit 1")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = &res.text().await;
assert_eq!(
body,
"cpu,ts,host\r\nFloat64,TimestampMillisecond,String\r\n66.6,0,\"host, \"\"name\"\r\n"
);
// test parse method
let res = client.get("/v1/sql/parse?sql=desc table t").send().await;
@@ -523,7 +544,7 @@ pub async fn test_prometheus_promql_api(store_type: StorageType) {
assert_eq!(res.status(), StatusCode::OK);
let csv_body = &res.text().await;
assert_eq!("0,1.0\n5000,1.0\n10000,1.0\n15000,1.0\n20000,1.0\n25000,1.0\n30000,1.0\n35000,1.0\n40000,1.0\n45000,1.0\n50000,1.0\n55000,1.0\n60000,1.0\n65000,1.0\n70000,1.0\n75000,1.0\n80000,1.0\n85000,1.0\n90000,1.0\n95000,1.0\n100000,1.0\n", csv_body);
assert_eq!("0,1.0\r\n5000,1.0\r\n10000,1.0\r\n15000,1.0\r\n20000,1.0\r\n25000,1.0\r\n30000,1.0\r\n35000,1.0\r\n40000,1.0\r\n45000,1.0\r\n50000,1.0\r\n55000,1.0\r\n60000,1.0\r\n65000,1.0\r\n70000,1.0\r\n75000,1.0\r\n80000,1.0\r\n85000,1.0\r\n90000,1.0\r\n95000,1.0\r\n100000,1.0\r\n", csv_body);
guard.remove_all().await;
}