Compare commits

...

3 Commits

Author SHA1 Message Date
luofucong
1bb541112f ingest jsonbench data
Signed-off-by: luofucong <luofc@foxmail.com>
2025-12-23 19:28:21 +08:00
discord9
fdedbb8261 fix: part sort share same topk dyn filter&early stop use dyn filter (#7460)
* fix: part sort share same topk dyn filter

Signed-off-by: discord9 <discord9@163.com>

* test: one

Signed-off-by: discord9 <discord9@163.com>

* feat: use dyn filter properly instead

Signed-off-by: discord9 <discord9@163.com>

* c

Signed-off-by: discord9 <discord9@163.com>

* docs: explain why dyn filter work

Signed-off-by: discord9 <discord9@163.com>

* chore: after rebase fix

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
2025-12-23 09:24:55 +00:00
Lanqing Yang
8d9afc83e3 feat: allow auto schema creation for pg (#7459)
Signed-off-by: lyang24 <lanqingy93@gmail.com>
2025-12-23 08:55:24 +00:00
38 changed files with 1256 additions and 548 deletions

Cargo.lock (generated)
View File

@@ -2580,10 +2580,12 @@ dependencies = [
name = "common-sql"
version = "1.0.0-beta.3"
dependencies = [
"arrow-schema",
"common-base",
"common-decimal",
"common-error",
"common-macro",
"common-telemetry",
"common-time",
"datafusion-sql",
"datatypes",
@@ -12229,7 +12231,7 @@ dependencies = [
[[package]]
name = "sqlparser"
version = "0.58.0"
source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=4b519a5caa95472cc3988f5556813a583dd35af1#4b519a5caa95472cc3988f5556813a583dd35af1"
source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=a0ce2bc6eb3e804532932f39833c32432f5c9a39#a0ce2bc6eb3e804532932f39833c32432f5c9a39"
dependencies = [
"lazy_static",
"log",
@@ -12253,7 +12255,7 @@ dependencies = [
[[package]]
name = "sqlparser_derive"
version = "0.3.0"
source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=4b519a5caa95472cc3988f5556813a583dd35af1#4b519a5caa95472cc3988f5556813a583dd35af1"
source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=a0ce2bc6eb3e804532932f39833c32432f5c9a39#a0ce2bc6eb3e804532932f39833c32432f5c9a39"
dependencies = [
"proc-macro2",
"quote",

View File

@@ -332,7 +332,7 @@ datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.g
datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "4b519a5caa95472cc3988f5556813a583dd35af1" } # branch = "v0.58.x"
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "a0ce2bc6eb3e804532932f39833c32432f5c9a39" } # branch = "v0.58.x"
[profile.release]
debug = 1

View File

@@ -344,7 +344,8 @@
| `store_key_prefix` | String | `""` | If it's not empty, the metasrv will store all data with this key prefix. |
| `backend` | String | `etcd_store` | The datastore for meta server.<br/>Available values:<br/>- `etcd_store` (default value)<br/>- `memory_store`<br/>- `postgres_store`<br/>- `mysql_store` |
| `meta_table_name` | String | `greptime_metakv` | Table name in RDS to store metadata. Effect when using a RDS kvbackend.<br/>**Only used when backend is `postgres_store`.** |
| `meta_schema_name` | String | `greptime_schema` | Optional PostgreSQL schema for metadata table and election table name qualification.<br/>When PostgreSQL public schema is not writable (e.g., PostgreSQL 15+ with restricted public),<br/>set this to a writable schema. GreptimeDB will use `meta_schema_name`.`meta_table_name`.<br/>GreptimeDB will NOT create the schema automatically; please ensure it exists or the user has permission.<br/>**Only used when backend is `postgres_store`.** |
| `meta_schema_name` | String | `greptime_schema` | Optional PostgreSQL schema for metadata table and election table name qualification.<br/>When PostgreSQL public schema is not writable (e.g., PostgreSQL 15+ with restricted public),<br/>set this to a writable schema. GreptimeDB will use `meta_schema_name`.`meta_table_name`.<br/>**Only used when backend is `postgres_store`.** |
| `auto_create_schema` | Bool | `true` | Automatically create PostgreSQL schema if it doesn't exist.<br/>When enabled, the system will execute `CREATE SCHEMA IF NOT EXISTS <schema_name>`<br/>before creating metadata tables. This is useful in production environments where<br/>manual schema creation may be restricted.<br/>Default is true.<br/>Note: The PostgreSQL user must have CREATE SCHEMA permission for this to work.<br/>**Only used when backend is `postgres_store`.** |
| `meta_election_lock_id` | Integer | `1` | Advisory lock id in PostgreSQL for election. Effect when using PostgreSQL as kvbackend<br/>Only used when backend is `postgres_store`. |
| `selector` | String | `round_robin` | Datanode selector type.<br/>- `round_robin` (default value)<br/>- `lease_based`<br/>- `load_based`<br/>For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". |
| `enable_region_failover` | Bool | `false` | Whether to enable region failover.<br/>This feature is only available on GreptimeDB running on cluster mode and<br/>- Using Remote WAL<br/>- Using shared storage (e.g., s3). |

View File

@@ -34,11 +34,18 @@ meta_table_name = "greptime_metakv"
## Optional PostgreSQL schema for metadata table and election table name qualification.
## When PostgreSQL public schema is not writable (e.g., PostgreSQL 15+ with restricted public),
## set this to a writable schema. GreptimeDB will use `meta_schema_name`.`meta_table_name`.
## GreptimeDB will NOT create the schema automatically; please ensure it exists or the user has permission.
## **Only used when backend is `postgres_store`.**
meta_schema_name = "greptime_schema"
## Automatically create PostgreSQL schema if it doesn't exist.
## When enabled, the system will execute `CREATE SCHEMA IF NOT EXISTS <schema_name>`
## before creating metadata tables. This is useful in production environments where
## manual schema creation may be restricted.
## Default is true.
## Note: The PostgreSQL user must have CREATE SCHEMA permission for this to work.
## **Only used when backend is `postgres_store`.**
auto_create_schema = true
## Advisory lock id in PostgreSQL for election. Effect when using PostgreSQL as kvbackend
## Only used when backend is `postgres_store`.
meta_election_lock_id = 1

View File

@@ -61,6 +61,12 @@ pub struct StoreConfig {
#[cfg(feature = "pg_kvbackend")]
#[clap(long)]
pub meta_schema_name: Option<String>,
/// Automatically create PostgreSQL schema if it doesn't exist (default: true).
#[cfg(feature = "pg_kvbackend")]
#[clap(long, default_value_t = true)]
pub auto_create_schema: bool,
/// TLS mode for backend store connections (etcd, PostgreSQL, MySQL)
#[clap(long = "backend-tls-mode", value_enum, default_value = "disable")]
pub backend_tls_mode: TlsMode,
@@ -138,6 +144,7 @@ impl StoreConfig {
schema_name,
table_name,
max_txn_ops,
self.auto_create_schema,
)
.await
.map_err(BoxedError::new)?)
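For orientation, here is a minimal sketch (not part of this diff) of how the new `auto_create_schema` flag reaches the PostgreSQL metadata backend. The pool setup mirrors the tests later in this compare; the connection URL and option values are placeholders, and the `PgStore` import path is omitted because it depends on the crate layout.

```rust
// Illustrative sketch only; not part of the PR.
use deadpool_postgres::{Config, Runtime};
use tokio_postgres::NoTls;

async fn build_pg_metadata_backend() {
    let mut cfg = Config::new();
    cfg.url = Some("postgres://user:pass@localhost/greptime".to_string()); // placeholder URL
    let pool = cfg
        .create_pool(Some(Runtime::Tokio1), NoTls)
        .expect("failed to create PostgreSQL pool");

    // With auto_create_schema = true and a schema name configured, the backend
    // first runs `CREATE SCHEMA IF NOT EXISTS "greptime_schema"` and then
    // creates the metadata table inside that schema.
    let _kv_backend = PgStore::with_pg_pool(
        pool,
        Some("greptime_schema"), // meta_schema_name
        "greptime_metakv",       // meta_table_name
        128,                     // max_txn_ops
        true,                    // auto_create_schema (new parameter in this PR)
    )
    .await
    .expect("failed to initialize the PostgreSQL kv backend");
}
```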

View File

@@ -848,7 +848,7 @@ impl PgStore {
.context(CreatePostgresPoolSnafu)?,
};
Self::with_pg_pool(pool, None, table_name, max_txn_ops).await
Self::with_pg_pool(pool, None, table_name, max_txn_ops, false).await
}
/// Create [PgStore] impl of [KvBackendRef] from url (backward compatibility).
@@ -862,6 +862,7 @@ impl PgStore {
schema_name: Option<&str>,
table_name: &str,
max_txn_ops: usize,
auto_create_schema: bool,
) -> Result<KvBackendRef> {
// Ensure the postgres metadata backend is ready to use.
let client = match pool.get().await {
@@ -873,9 +874,23 @@ impl PgStore {
.fail();
}
};
// Automatically create schema if enabled and schema_name is provided.
if auto_create_schema
&& let Some(schema) = schema_name
&& !schema.is_empty()
{
let create_schema_sql = format!("CREATE SCHEMA IF NOT EXISTS \"{}\"", schema);
client
.execute(&create_schema_sql, &[])
.await
.with_context(|_| PostgresExecutionSnafu {
sql: create_schema_sql.clone(),
})?;
}
let template_factory = PgSqlTemplateFactory::new(schema_name, table_name);
let sql_template_set = template_factory.build();
// Do not attempt to create schema implicitly.
client
.execute(&sql_template_set.create_table_statement, &[])
.await
@@ -959,7 +974,7 @@ mod tests {
let Some(pool) = build_pg15_pool().await else {
return;
};
let res = PgStore::with_pg_pool(pool, None, "pg15_public_should_fail", 128).await;
let res = PgStore::with_pg_pool(pool, None, "pg15_public_should_fail", 128, false).await;
assert!(
res.is_err(),
"creating table in public should fail for test_user"
@@ -1214,4 +1229,249 @@ mod tests {
let t = PgSqlTemplateFactory::format_table_ident(Some(""), "test_table");
assert_eq!(t, "\"test_table\"");
}
#[tokio::test]
async fn test_auto_create_schema_enabled() {
common_telemetry::init_default_ut_logging();
maybe_skip_postgres_integration_test!();
let endpoints = std::env::var("GT_POSTGRES_ENDPOINTS").unwrap();
let mut cfg = Config::new();
cfg.url = Some(endpoints);
let pool = cfg
.create_pool(Some(Runtime::Tokio1), NoTls)
.context(CreatePostgresPoolSnafu)
.unwrap();
let schema_name = "test_auto_create_enabled";
let table_name = "test_table";
// Drop the schema if it exists to start clean
let client = pool.get().await.unwrap();
let _ = client
.execute(
&format!("DROP SCHEMA IF EXISTS \"{}\" CASCADE", schema_name),
&[],
)
.await;
// Create store with auto_create_schema enabled
let _ = PgStore::with_pg_pool(pool.clone(), Some(schema_name), table_name, 128, true)
.await
.unwrap();
// Verify schema was created
let row = client
.query_one(
"SELECT schema_name FROM information_schema.schemata WHERE schema_name = $1",
&[&schema_name],
)
.await
.unwrap();
let created_schema: String = row.get(0);
assert_eq!(created_schema, schema_name);
// Verify table was created in the schema
let row = client
.query_one(
"SELECT table_schema, table_name FROM information_schema.tables WHERE table_schema = $1 AND table_name = $2",
&[&schema_name, &table_name],
)
.await
.unwrap();
let created_table_schema: String = row.get(0);
let created_table_name: String = row.get(1);
assert_eq!(created_table_schema, schema_name);
assert_eq!(created_table_name, table_name);
// Cleanup
let _ = client
.execute(
&format!("DROP SCHEMA IF EXISTS \"{}\" CASCADE", schema_name),
&[],
)
.await;
}
#[tokio::test]
async fn test_auto_create_schema_disabled() {
common_telemetry::init_default_ut_logging();
maybe_skip_postgres_integration_test!();
let endpoints = std::env::var("GT_POSTGRES_ENDPOINTS").unwrap();
let mut cfg = Config::new();
cfg.url = Some(endpoints);
let pool = cfg
.create_pool(Some(Runtime::Tokio1), NoTls)
.context(CreatePostgresPoolSnafu)
.unwrap();
let schema_name = "test_auto_create_disabled";
let table_name = "test_table";
// Drop the schema if it exists to start clean
let client = pool.get().await.unwrap();
let _ = client
.execute(
&format!("DROP SCHEMA IF EXISTS \"{}\" CASCADE", schema_name),
&[],
)
.await;
// Try to create store with auto_create_schema disabled (should fail)
let result =
PgStore::with_pg_pool(pool.clone(), Some(schema_name), table_name, 128, false).await;
// Verify it failed because schema doesn't exist
assert!(
result.is_err(),
"Expected error when schema doesn't exist and auto_create_schema is disabled"
);
}
#[tokio::test]
async fn test_auto_create_schema_already_exists() {
common_telemetry::init_default_ut_logging();
maybe_skip_postgres_integration_test!();
let endpoints = std::env::var("GT_POSTGRES_ENDPOINTS").unwrap();
let mut cfg = Config::new();
cfg.url = Some(endpoints);
let pool = cfg
.create_pool(Some(Runtime::Tokio1), NoTls)
.context(CreatePostgresPoolSnafu)
.unwrap();
let schema_name = "test_auto_create_existing";
let table_name = "test_table";
// Manually create the schema first
let client = pool.get().await.unwrap();
let _ = client
.execute(
&format!("DROP SCHEMA IF EXISTS \"{}\" CASCADE", schema_name),
&[],
)
.await;
client
.execute(&format!("CREATE SCHEMA \"{}\"", schema_name), &[])
.await
.unwrap();
// Create store with auto_create_schema enabled (should succeed idempotently)
let _ = PgStore::with_pg_pool(pool.clone(), Some(schema_name), table_name, 128, true)
.await
.unwrap();
// Verify schema still exists
let row = client
.query_one(
"SELECT schema_name FROM information_schema.schemata WHERE schema_name = $1",
&[&schema_name],
)
.await
.unwrap();
let created_schema: String = row.get(0);
assert_eq!(created_schema, schema_name);
// Verify table was created in the schema
let row = client
.query_one(
"SELECT table_schema, table_name FROM information_schema.tables WHERE table_schema = $1 AND table_name = $2",
&[&schema_name, &table_name],
)
.await
.unwrap();
let created_table_schema: String = row.get(0);
let created_table_name: String = row.get(1);
assert_eq!(created_table_schema, schema_name);
assert_eq!(created_table_name, table_name);
// Cleanup
let _ = client
.execute(
&format!("DROP SCHEMA IF EXISTS \"{}\" CASCADE", schema_name),
&[],
)
.await;
}
#[tokio::test]
async fn test_auto_create_schema_no_schema_name() {
common_telemetry::init_default_ut_logging();
maybe_skip_postgres_integration_test!();
let endpoints = std::env::var("GT_POSTGRES_ENDPOINTS").unwrap();
let mut cfg = Config::new();
cfg.url = Some(endpoints);
let pool = cfg
.create_pool(Some(Runtime::Tokio1), NoTls)
.context(CreatePostgresPoolSnafu)
.unwrap();
let table_name = "test_table_no_schema";
// Create store with auto_create_schema enabled but no schema name (should succeed)
// This should create the table in the default schema (public)
let _ = PgStore::with_pg_pool(pool.clone(), None, table_name, 128, true)
.await
.unwrap();
// Verify table was created in public schema
let client = pool.get().await.unwrap();
let row = client
.query_one(
"SELECT table_schema, table_name FROM information_schema.tables WHERE table_name = $1",
&[&table_name],
)
.await
.unwrap();
let created_table_schema: String = row.get(0);
let created_table_name: String = row.get(1);
assert_eq!(created_table_name, table_name);
// Verify it's in public schema (or whichever is the default)
assert!(created_table_schema == "public" || !created_table_schema.is_empty());
// Cleanup
let _ = client
.execute(&format!("DROP TABLE IF EXISTS \"{}\"", table_name), &[])
.await;
}
#[tokio::test]
async fn test_auto_create_schema_with_empty_schema_name() {
common_telemetry::init_default_ut_logging();
maybe_skip_postgres_integration_test!();
let endpoints = std::env::var("GT_POSTGRES_ENDPOINTS").unwrap();
let mut cfg = Config::new();
cfg.url = Some(endpoints);
let pool = cfg
.create_pool(Some(Runtime::Tokio1), NoTls)
.context(CreatePostgresPoolSnafu)
.unwrap();
let table_name = "test_table_empty_schema";
// Create store with auto_create_schema enabled but empty schema name (should succeed)
// This should create the table in the default schema (public)
let _ = PgStore::with_pg_pool(pool.clone(), Some(""), table_name, 128, true)
.await
.unwrap();
// Verify table was created in public schema
let client = pool.get().await.unwrap();
let row = client
.query_one(
"SELECT table_schema, table_name FROM information_schema.tables WHERE table_name = $1",
&[&table_name],
)
.await
.unwrap();
let created_table_schema: String = row.get(0);
let created_table_name: String = row.get(1);
assert_eq!(created_table_name, table_name);
// Verify it's in public schema (or whichever is the default)
assert!(created_table_schema == "public" || !created_table_schema.is_empty());
// Cleanup
let _ = client
.execute(&format!("DROP TABLE IF EXISTS \"{}\"", table_name), &[])
.await;
}
}

View File

@@ -5,10 +5,12 @@ edition.workspace = true
license.workspace = true
[dependencies]
arrow-schema.workspace = true
common-base.workspace = true
common-decimal.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datafusion-sql.workspace = true
datatypes.workspace = true

View File

@@ -14,11 +14,12 @@
use std::str::FromStr;
use arrow_schema::extension::ExtensionType;
use common_time::Timestamp;
use common_time::timezone::Timezone;
use datatypes::json::JsonStructureSettings;
use datatypes::extension::json::JsonExtensionType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnDefaultConstraint;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};
use datatypes::types::{JsonFormat, parse_string_to_jsonb, parse_string_to_vector_type_value};
use datatypes::value::{OrderedF32, OrderedF64, Value};
use snafu::{OptionExt, ResultExt, ensure};
@@ -124,13 +125,14 @@ pub(crate) fn sql_number_to_value(data_type: &ConcreteDataType, n: &str) -> Resu
/// If `auto_string_to_numeric` is true, tries to cast the string value to numeric values,
/// and returns error if the cast fails.
pub fn sql_value_to_value(
column_name: &str,
data_type: &ConcreteDataType,
column_schema: &ColumnSchema,
sql_val: &SqlValue,
timezone: Option<&Timezone>,
unary_op: Option<UnaryOperator>,
auto_string_to_numeric: bool,
) -> Result<Value> {
let column_name = &column_schema.name;
let data_type = &column_schema.data_type;
let mut value = match sql_val {
SqlValue::Number(n, _) => sql_number_to_value(data_type, n)?,
SqlValue::Null => Value::Null,
@@ -146,13 +148,9 @@ pub fn sql_value_to_value(
(*b).into()
}
SqlValue::DoubleQuotedString(s) | SqlValue::SingleQuotedString(s) => parse_string_to_value(
column_name,
s.clone(),
data_type,
timezone,
auto_string_to_numeric,
)?,
SqlValue::DoubleQuotedString(s) | SqlValue::SingleQuotedString(s) => {
parse_string_to_value(column_schema, s.clone(), timezone, auto_string_to_numeric)?
}
SqlValue::HexStringLiteral(s) => {
// Should not directly write binary into json column
ensure!(
@@ -244,12 +242,12 @@ pub fn sql_value_to_value(
}
pub(crate) fn parse_string_to_value(
column_name: &str,
column_schema: &ColumnSchema,
s: String,
data_type: &ConcreteDataType,
timezone: Option<&Timezone>,
auto_string_to_numeric: bool,
) -> Result<Value> {
let data_type = &column_schema.data_type;
if auto_string_to_numeric && let Some(value) = auto_cast_to_numeric(&s, data_type)? {
return Ok(value);
}
@@ -257,7 +255,7 @@ pub(crate) fn parse_string_to_value(
ensure!(
data_type.is_stringifiable(),
ColumnTypeMismatchSnafu {
column_name,
column_name: column_schema.name.clone(),
expect: data_type.clone(),
actual: ConcreteDataType::string_datatype(),
}
@@ -303,23 +301,21 @@ pub(crate) fn parse_string_to_value(
}
}
ConcreteDataType::Binary(_) => Ok(Value::Binary(s.as_bytes().into())),
ConcreteDataType::Json(j) => {
match &j.format {
JsonFormat::Jsonb => {
let v = parse_string_to_jsonb(&s).context(DatatypeSnafu)?;
Ok(Value::Binary(v.into()))
}
JsonFormat::Native(_inner) => {
// Always use the structured version at this level.
let serde_json_value =
serde_json::from_str(&s).context(DeserializeSnafu { json: s })?;
let json_structure_settings = JsonStructureSettings::Structured(None);
json_structure_settings
.encode(serde_json_value)
.context(DatatypeSnafu)
}
ConcreteDataType::Json(j) => match &j.format {
JsonFormat::Jsonb => {
let v = parse_string_to_jsonb(&s).context(DatatypeSnafu)?;
Ok(Value::Binary(v.into()))
}
}
JsonFormat::Native(_) => {
let extension_type: Option<JsonExtensionType> =
column_schema.extension_type().context(DatatypeSnafu)?;
let json_structure_settings = extension_type
.and_then(|x| x.metadata().json_structure_settings.clone())
.unwrap_or_default();
let v = serde_json::from_str(&s).context(DeserializeSnafu { json: s })?;
json_structure_settings.encode(v).context(DatatypeSnafu)
}
},
ConcreteDataType::Vector(d) => {
let v = parse_string_to_vector_type_value(&s, Some(d.dim)).context(DatatypeSnafu)?;
Ok(Value::Binary(v.into()))
@@ -417,305 +413,265 @@ mod test {
use super::*;
macro_rules! call_parse_string_to_value {
($column_name: expr, $input: expr, $data_type: expr) => {
call_parse_string_to_value!($column_name, $input, $data_type, None)
};
($column_name: expr, $input: expr, $data_type: expr, timezone = $timezone: expr) => {
call_parse_string_to_value!($column_name, $input, $data_type, Some($timezone))
};
($column_name: expr, $input: expr, $data_type: expr, $timezone: expr) => {{
let column_schema = ColumnSchema::new($column_name, $data_type, true);
parse_string_to_value(&column_schema, $input, $timezone, true)
}};
}
#[test]
fn test_string_to_value_auto_numeric() {
fn test_string_to_value_auto_numeric() -> Result<()> {
// Test string to boolean with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"true".to_string(),
&ConcreteDataType::boolean_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::boolean_datatype()
)?;
assert_eq!(Value::Boolean(true), result);
// Test invalid string to boolean with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_a_boolean".to_string(),
&ConcreteDataType::boolean_datatype(),
None,
true,
ConcreteDataType::boolean_datatype()
);
assert!(result.is_err());
// Test string to int8
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"42".to_string(),
&ConcreteDataType::int8_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::int8_datatype()
)?;
assert_eq!(Value::Int8(42), result);
// Test invalid string to int8 with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_an_int8".to_string(),
&ConcreteDataType::int8_datatype(),
None,
true,
ConcreteDataType::int8_datatype()
);
assert!(result.is_err());
// Test string to int16
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"1000".to_string(),
&ConcreteDataType::int16_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::int16_datatype()
)?;
assert_eq!(Value::Int16(1000), result);
// Test invalid string to int16 with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_an_int16".to_string(),
&ConcreteDataType::int16_datatype(),
None,
true,
ConcreteDataType::int16_datatype()
);
assert!(result.is_err());
// Test string to int32
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"100000".to_string(),
&ConcreteDataType::int32_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::int32_datatype()
)?;
assert_eq!(Value::Int32(100000), result);
// Test invalid string to int32 with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_an_int32".to_string(),
&ConcreteDataType::int32_datatype(),
None,
true,
ConcreteDataType::int32_datatype()
);
assert!(result.is_err());
// Test string to int64
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"1000000".to_string(),
&ConcreteDataType::int64_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::int64_datatype()
)?;
assert_eq!(Value::Int64(1000000), result);
// Test invalid string to int64 with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_an_int64".to_string(),
&ConcreteDataType::int64_datatype(),
None,
true,
ConcreteDataType::int64_datatype()
);
assert!(result.is_err());
// Test string to uint8
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"200".to_string(),
&ConcreteDataType::uint8_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::uint8_datatype()
)?;
assert_eq!(Value::UInt8(200), result);
// Test invalid string to uint8 with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_a_uint8".to_string(),
&ConcreteDataType::uint8_datatype(),
None,
true,
ConcreteDataType::uint8_datatype()
);
assert!(result.is_err());
// Test string to uint16
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"60000".to_string(),
&ConcreteDataType::uint16_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::uint16_datatype()
)?;
assert_eq!(Value::UInt16(60000), result);
// Test invalid string to uint16 with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_a_uint16".to_string(),
&ConcreteDataType::uint16_datatype(),
None,
true,
ConcreteDataType::uint16_datatype()
);
assert!(result.is_err());
// Test string to uint32
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"4000000000".to_string(),
&ConcreteDataType::uint32_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::uint32_datatype()
)?;
assert_eq!(Value::UInt32(4000000000), result);
// Test invalid string to uint32 with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_a_uint32".to_string(),
&ConcreteDataType::uint32_datatype(),
None,
true,
ConcreteDataType::uint32_datatype()
);
assert!(result.is_err());
// Test string to uint64
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"18446744073709551615".to_string(),
&ConcreteDataType::uint64_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::uint64_datatype()
)?;
assert_eq!(Value::UInt64(18446744073709551615), result);
// Test invalid string to uint64 with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_a_uint64".to_string(),
&ConcreteDataType::uint64_datatype(),
None,
true,
ConcreteDataType::uint64_datatype()
);
assert!(result.is_err());
// Test string to float32
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"3.5".to_string(),
&ConcreteDataType::float32_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::float32_datatype()
)?;
assert_eq!(Value::Float32(OrderedF32::from(3.5)), result);
// Test invalid string to float32 with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_a_float32".to_string(),
&ConcreteDataType::float32_datatype(),
None,
true,
ConcreteDataType::float32_datatype()
);
assert!(result.is_err());
// Test string to float64
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"3.5".to_string(),
&ConcreteDataType::float64_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::float64_datatype()
)?;
assert_eq!(Value::Float64(OrderedF64::from(3.5)), result);
// Test invalid string to float64 with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_a_float64".to_string(),
&ConcreteDataType::float64_datatype(),
None,
true,
ConcreteDataType::float64_datatype()
);
assert!(result.is_err());
Ok(())
}
#[test]
fn test_sql_value_to_value() {
let sql_val = SqlValue::Null;
assert_eq!(
Value::Null,
sql_value_to_value(
"a",
&ConcreteDataType::float64_datatype(),
&sql_val,
None,
macro_rules! call_sql_value_to_value {
($column_name: expr, $data_type: expr, $sql_value: expr) => {
call_sql_value_to_value!($column_name, $data_type, $sql_value, None, None, false)
};
($column_name: expr, $data_type: expr, $sql_value: expr, timezone = $timezone: expr) => {
call_sql_value_to_value!(
$column_name,
$data_type,
$sql_value,
Some($timezone),
None,
false
)
.unwrap()
};
($column_name: expr, $data_type: expr, $sql_value: expr, unary_op = $unary_op: expr) => {
call_sql_value_to_value!(
$column_name,
$data_type,
$sql_value,
None,
Some($unary_op),
false
)
};
($column_name: expr, $data_type: expr, $sql_value: expr, auto_string_to_numeric) => {
call_sql_value_to_value!($column_name, $data_type, $sql_value, None, None, true)
};
($column_name: expr, $data_type: expr, $sql_value: expr, $timezone: expr, $unary_op: expr, $auto_string_to_numeric: expr) => {{
let column_schema = ColumnSchema::new($column_name, $data_type, true);
sql_value_to_value(
&column_schema,
$sql_value,
$timezone,
$unary_op,
$auto_string_to_numeric,
)
}};
}
#[test]
fn test_sql_value_to_value() -> Result<()> {
let sql_val = SqlValue::Null;
assert_eq!(
Value::Null,
call_sql_value_to_value!("a", ConcreteDataType::float64_datatype(), &sql_val)?
);
let sql_val = SqlValue::Boolean(true);
assert_eq!(
Value::Boolean(true),
sql_value_to_value(
"a",
&ConcreteDataType::boolean_datatype(),
&sql_val,
None,
None,
false
)
.unwrap()
call_sql_value_to_value!("a", ConcreteDataType::boolean_datatype(), &sql_val)?
);
let sql_val = SqlValue::Number("3.0".to_string(), false);
assert_eq!(
Value::Float64(OrderedFloat(3.0)),
sql_value_to_value(
"a",
&ConcreteDataType::float64_datatype(),
&sql_val,
None,
None,
false
)
.unwrap()
call_sql_value_to_value!("a", ConcreteDataType::float64_datatype(), &sql_val)?
);
let sql_val = SqlValue::Number("3.0".to_string(), false);
let v = sql_value_to_value(
"a",
&ConcreteDataType::boolean_datatype(),
&sql_val,
None,
None,
false,
);
let v = call_sql_value_to_value!("a", ConcreteDataType::boolean_datatype(), &sql_val);
assert!(v.is_err());
assert!(format!("{v:?}").contains("Failed to parse number '3.0' to boolean column type"));
let sql_val = SqlValue::Boolean(true);
let v = sql_value_to_value(
"a",
&ConcreteDataType::float64_datatype(),
&sql_val,
None,
None,
false,
);
let v = call_sql_value_to_value!("a", ConcreteDataType::float64_datatype(), &sql_val);
assert!(v.is_err());
assert!(
format!("{v:?}").contains(
@@ -725,41 +681,18 @@ mod test {
);
let sql_val = SqlValue::HexStringLiteral("48656c6c6f20776f726c6421".to_string());
let v = sql_value_to_value(
"a",
&ConcreteDataType::binary_datatype(),
&sql_val,
None,
None,
false,
)
.unwrap();
let v = call_sql_value_to_value!("a", ConcreteDataType::binary_datatype(), &sql_val)?;
assert_eq!(Value::Binary(Bytes::from(b"Hello world!".as_slice())), v);
let sql_val = SqlValue::DoubleQuotedString("MorningMyFriends".to_string());
let v = sql_value_to_value(
"a",
&ConcreteDataType::binary_datatype(),
&sql_val,
None,
None,
false,
)
.unwrap();
let v = call_sql_value_to_value!("a", ConcreteDataType::binary_datatype(), &sql_val)?;
assert_eq!(
Value::Binary(Bytes::from(b"MorningMyFriends".as_slice())),
v
);
let sql_val = SqlValue::HexStringLiteral("9AF".to_string());
let v = sql_value_to_value(
"a",
&ConcreteDataType::binary_datatype(),
&sql_val,
None,
None,
false,
);
let v = call_sql_value_to_value!("a", ConcreteDataType::binary_datatype(), &sql_val);
assert!(v.is_err());
assert!(
format!("{v:?}").contains("odd number of digits"),
@@ -767,38 +700,16 @@ mod test {
);
let sql_val = SqlValue::HexStringLiteral("AG".to_string());
let v = sql_value_to_value(
"a",
&ConcreteDataType::binary_datatype(),
&sql_val,
None,
None,
false,
);
let v = call_sql_value_to_value!("a", ConcreteDataType::binary_datatype(), &sql_val);
assert!(v.is_err());
assert!(format!("{v:?}").contains("invalid character"), "v is {v:?}",);
let sql_val = SqlValue::DoubleQuotedString("MorningMyFriends".to_string());
let v = sql_value_to_value(
"a",
&ConcreteDataType::json_datatype(),
&sql_val,
None,
None,
false,
);
let v = call_sql_value_to_value!("a", ConcreteDataType::json_datatype(), &sql_val);
assert!(v.is_err());
let sql_val = SqlValue::DoubleQuotedString(r#"{"a":"b"}"#.to_string());
let v = sql_value_to_value(
"a",
&ConcreteDataType::json_datatype(),
&sql_val,
None,
None,
false,
)
.unwrap();
let v = call_sql_value_to_value!("a", ConcreteDataType::json_datatype(), &sql_val)?;
assert_eq!(
Value::Binary(Bytes::from(
jsonb::parse_value(r#"{"a":"b"}"#.as_bytes())
@@ -808,16 +719,15 @@ mod test {
)),
v
);
Ok(())
}
#[test]
fn test_parse_json_to_jsonb() {
match parse_string_to_value(
match call_parse_string_to_value!(
"json_col",
r#"{"a": "b"}"#.to_string(),
&ConcreteDataType::json_datatype(),
None,
false,
ConcreteDataType::json_datatype()
) {
Ok(Value::Binary(b)) => {
assert_eq!(
@@ -833,12 +743,10 @@ mod test {
}
assert!(
parse_string_to_value(
call_parse_string_to_value!(
"json_col",
r#"Nicola Kovac is the best rifler in the world"#.to_string(),
&ConcreteDataType::json_datatype(),
None,
false,
ConcreteDataType::json_datatype()
)
.is_err()
)
@@ -878,13 +786,10 @@ mod test {
#[test]
fn test_parse_date_literal() {
let value = sql_value_to_value(
let value = call_sql_value_to_value!(
"date",
&ConcreteDataType::date_datatype(),
&SqlValue::DoubleQuotedString("2022-02-22".to_string()),
None,
None,
false,
ConcreteDataType::date_datatype(),
&SqlValue::DoubleQuotedString("2022-02-22".to_string())
)
.unwrap();
assert_eq!(ConcreteDataType::date_datatype(), value.data_type());
@@ -895,13 +800,11 @@ mod test {
}
// with timezone
let value = sql_value_to_value(
let value = call_sql_value_to_value!(
"date",
&ConcreteDataType::date_datatype(),
ConcreteDataType::date_datatype(),
&SqlValue::DoubleQuotedString("2022-02-22".to_string()),
Some(&Timezone::from_tz_string("+07:00").unwrap()),
None,
false,
timezone = &Timezone::from_tz_string("+07:00").unwrap()
)
.unwrap();
assert_eq!(ConcreteDataType::date_datatype(), value.data_type());
@@ -913,16 +816,12 @@ mod test {
}
#[test]
fn test_parse_timestamp_literal() {
match parse_string_to_value(
fn test_parse_timestamp_literal() -> Result<()> {
match call_parse_string_to_value!(
"timestamp_col",
"2022-02-22T00:01:01+08:00".to_string(),
&ConcreteDataType::timestamp_millisecond_datatype(),
None,
false,
)
.unwrap()
{
ConcreteDataType::timestamp_millisecond_datatype()
)? {
Value::Timestamp(ts) => {
assert_eq!(1645459261000, ts.value());
assert_eq!(TimeUnit::Millisecond, ts.unit());
@@ -932,15 +831,11 @@ mod test {
}
}
match parse_string_to_value(
match call_parse_string_to_value!(
"timestamp_col",
"2022-02-22T00:01:01+08:00".to_string(),
&ConcreteDataType::timestamp_datatype(TimeUnit::Second),
None,
false,
)
.unwrap()
{
ConcreteDataType::timestamp_datatype(TimeUnit::Second)
)? {
Value::Timestamp(ts) => {
assert_eq!(1645459261, ts.value());
assert_eq!(TimeUnit::Second, ts.unit());
@@ -950,15 +845,11 @@ mod test {
}
}
match parse_string_to_value(
match call_parse_string_to_value!(
"timestamp_col",
"2022-02-22T00:01:01+08:00".to_string(),
&ConcreteDataType::timestamp_datatype(TimeUnit::Microsecond),
None,
false,
)
.unwrap()
{
ConcreteDataType::timestamp_datatype(TimeUnit::Microsecond)
)? {
Value::Timestamp(ts) => {
assert_eq!(1645459261000000, ts.value());
assert_eq!(TimeUnit::Microsecond, ts.unit());
@@ -968,15 +859,11 @@ mod test {
}
}
match parse_string_to_value(
match call_parse_string_to_value!(
"timestamp_col",
"2022-02-22T00:01:01+08:00".to_string(),
&ConcreteDataType::timestamp_datatype(TimeUnit::Nanosecond),
None,
false,
)
.unwrap()
{
ConcreteDataType::timestamp_datatype(TimeUnit::Nanosecond)
)? {
Value::Timestamp(ts) => {
assert_eq!(1645459261000000000, ts.value());
assert_eq!(TimeUnit::Nanosecond, ts.unit());
@@ -987,26 +874,21 @@ mod test {
}
assert!(
parse_string_to_value(
call_parse_string_to_value!(
"timestamp_col",
"2022-02-22T00:01:01+08".to_string(),
&ConcreteDataType::timestamp_datatype(TimeUnit::Nanosecond),
None,
false,
ConcreteDataType::timestamp_datatype(TimeUnit::Nanosecond)
)
.is_err()
);
// with timezone
match parse_string_to_value(
match call_parse_string_to_value!(
"timestamp_col",
"2022-02-22T00:01:01".to_string(),
&ConcreteDataType::timestamp_datatype(TimeUnit::Nanosecond),
Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap()),
false,
)
.unwrap()
{
ConcreteDataType::timestamp_datatype(TimeUnit::Nanosecond),
timezone = &Timezone::from_tz_string("Asia/Shanghai").unwrap()
)? {
Value::Timestamp(ts) => {
assert_eq!(1645459261000000000, ts.value());
assert_eq!("2022-02-21 16:01:01+0000", ts.to_iso8601_string());
@@ -1016,51 +898,42 @@ mod test {
unreachable!()
}
}
Ok(())
}
#[test]
fn test_parse_placeholder_value() {
assert!(
sql_value_to_value(
call_sql_value_to_value!(
"test",
&ConcreteDataType::string_datatype(),
ConcreteDataType::string_datatype(),
&SqlValue::Placeholder("default".into())
)
.is_err()
);
assert!(
call_sql_value_to_value!(
"test",
ConcreteDataType::string_datatype(),
&SqlValue::Placeholder("default".into()),
None,
None,
false
unary_op = UnaryOperator::Minus
)
.is_err()
);
assert!(
sql_value_to_value(
call_sql_value_to_value!(
"test",
&ConcreteDataType::string_datatype(),
&SqlValue::Placeholder("default".into()),
None,
Some(UnaryOperator::Minus),
false
)
.is_err()
);
assert!(
sql_value_to_value(
"test",
&ConcreteDataType::uint16_datatype(),
ConcreteDataType::uint16_datatype(),
&SqlValue::Number("3".into(), false),
None,
Some(UnaryOperator::Minus),
false
unary_op = UnaryOperator::Minus
)
.is_err()
);
assert!(
sql_value_to_value(
call_sql_value_to_value!(
"test",
&ConcreteDataType::uint16_datatype(),
&SqlValue::Number("3".into(), false),
None,
None,
false
ConcreteDataType::uint16_datatype(),
&SqlValue::Number("3".into(), false)
)
.is_ok()
);
@@ -1070,77 +943,60 @@ mod test {
fn test_auto_string_to_numeric() {
// Test with auto_string_to_numeric=true
let sql_val = SqlValue::SingleQuotedString("123".to_string());
let v = sql_value_to_value(
let v = call_sql_value_to_value!(
"a",
&ConcreteDataType::int32_datatype(),
ConcreteDataType::int32_datatype(),
&sql_val,
None,
None,
true,
auto_string_to_numeric
)
.unwrap();
assert_eq!(Value::Int32(123), v);
// Test with a float string
let sql_val = SqlValue::SingleQuotedString("3.5".to_string());
let v = sql_value_to_value(
let v = call_sql_value_to_value!(
"a",
&ConcreteDataType::float64_datatype(),
ConcreteDataType::float64_datatype(),
&sql_val,
None,
None,
true,
auto_string_to_numeric
)
.unwrap();
assert_eq!(Value::Float64(OrderedFloat(3.5)), v);
// Test with auto_string_to_numeric=false
let sql_val = SqlValue::SingleQuotedString("123".to_string());
let v = sql_value_to_value(
"a",
&ConcreteDataType::int32_datatype(),
&sql_val,
None,
None,
false,
);
let v = call_sql_value_to_value!("a", ConcreteDataType::int32_datatype(), &sql_val);
assert!(v.is_err());
// Test with an invalid numeric string but auto_string_to_numeric=true
// Should return an error now with the new auto_cast_to_numeric behavior
let sql_val = SqlValue::SingleQuotedString("not_a_number".to_string());
let v = sql_value_to_value(
let v = call_sql_value_to_value!(
"a",
&ConcreteDataType::int32_datatype(),
ConcreteDataType::int32_datatype(),
&sql_val,
None,
None,
true,
auto_string_to_numeric
);
assert!(v.is_err());
// Test with boolean type
let sql_val = SqlValue::SingleQuotedString("true".to_string());
let v = sql_value_to_value(
let v = call_sql_value_to_value!(
"a",
&ConcreteDataType::boolean_datatype(),
ConcreteDataType::boolean_datatype(),
&sql_val,
None,
None,
true,
auto_string_to_numeric
)
.unwrap();
assert_eq!(Value::Boolean(true), v);
// Non-numeric types should still be handled normally
let sql_val = SqlValue::SingleQuotedString("hello".to_string());
let v = sql_value_to_value(
let v = call_sql_value_to_value!(
"a",
&ConcreteDataType::string_datatype(),
ConcreteDataType::string_datatype(),
&sql_val,
None,
None,
true,
auto_string_to_numeric
);
assert!(v.is_ok());
}
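For reference, a minimal sketch (not part of this diff) of the new `sql_value_to_value` call shape: the column name and data type now travel together in a `ColumnSchema`, matching the macros in the tests above. Import paths are indicative, and the column and value are made up.

```rust
// Illustrative sketch only; not part of the PR.
use common_sql::convert::sql_value_to_value;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use sqlparser::ast::Value as SqlValue;

fn convert_example() {
    // Name and type are bundled in one ColumnSchema instead of two arguments.
    let column_schema = ColumnSchema::new("host", ConcreteDataType::string_datatype(), true);
    let sql_val = SqlValue::SingleQuotedString("web-01".to_string());

    // timezone = None, unary_op = None, auto_string_to_numeric = false
    let value = sql_value_to_value(&column_schema, &sql_val, None, None, false).unwrap();
    assert_eq!(ConcreteDataType::string_datatype(), value.data_type());
}
```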

View File

@@ -14,8 +14,8 @@
use common_time::timezone::Timezone;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnDefaultConstraint;
use datatypes::schema::constraint::{CURRENT_TIMESTAMP, CURRENT_TIMESTAMP_FN};
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};
use snafu::ensure;
use sqlparser::ast::ValueWithSpan;
pub use sqlparser::ast::{
@@ -47,9 +47,12 @@ pub fn parse_column_default_constraint(
);
let default_constraint = match &opt.option {
ColumnOption::Default(Expr::Value(v)) => ColumnDefaultConstraint::Value(
sql_value_to_value(column_name, data_type, &v.value, timezone, None, false)?,
),
ColumnOption::Default(Expr::Value(v)) => {
let schema = ColumnSchema::new(column_name, data_type.clone(), true);
ColumnDefaultConstraint::Value(sql_value_to_value(
&schema, &v.value, timezone, None, false,
)?)
}
ColumnOption::Default(Expr::Function(func)) => {
let mut func = format!("{func}").to_lowercase();
// normalize CURRENT_TIMESTAMP to CURRENT_TIMESTAMP()
@@ -80,8 +83,7 @@ pub fn parse_column_default_constraint(
if let Expr::Value(v) = &**expr {
let value = sql_value_to_value(
column_name,
data_type,
&ColumnSchema::new(column_name, data_type.clone(), true),
&v.value,
timezone,
Some(*op),

View File

@@ -26,9 +26,9 @@ use std::sync::Arc;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value as Json};
use snafu::{ResultExt, ensure};
use snafu::{OptionExt, ResultExt, ensure};
use crate::error::{self, Error};
use crate::error::{self, InvalidJsonSnafu, Result, SerializeSnafu};
use crate::json::value::{JsonValue, JsonVariant};
use crate::types::json_type::{JsonNativeType, JsonNumberType, JsonObjectType};
use crate::types::{StructField, StructType};
@@ -71,7 +71,7 @@ impl JsonStructureSettings {
pub const RAW_FIELD: &'static str = "_raw";
/// Decode an encoded StructValue back into a serde_json::Value.
pub fn decode(&self, value: Value) -> Result<Json, Error> {
pub fn decode(&self, value: Value) -> Result<Json> {
let context = JsonContext {
key_path: String::new(),
settings: self,
@@ -82,7 +82,7 @@ impl JsonStructureSettings {
/// Decode a StructValue that was encoded with current settings back into a fully structured StructValue.
/// This is useful for reconstructing the original structure from encoded data, especially when
/// unstructured encoding was used for some fields.
pub fn decode_struct(&self, struct_value: StructValue) -> Result<StructValue, Error> {
pub fn decode_struct(&self, struct_value: StructValue) -> Result<StructValue> {
let context = JsonContext {
key_path: String::new(),
settings: self,
@@ -91,7 +91,11 @@ impl JsonStructureSettings {
}
/// Encode a serde_json::Value into a Value::Json using current settings.
pub fn encode(&self, json: Json) -> Result<Value, Error> {
pub fn encode(&self, json: Json) -> Result<Value> {
if let Some(json_struct) = self.json_struct() {
return encode_by_struct(json_struct, json);
}
let context = JsonContext {
key_path: String::new(),
settings: self,
@@ -104,13 +108,21 @@ impl JsonStructureSettings {
&self,
json: Json,
data_type: Option<&JsonNativeType>,
) -> Result<Value, Error> {
) -> Result<Value> {
let context = JsonContext {
key_path: String::new(),
settings: self,
};
encode_json_with_context(json, data_type, &context).map(|v| Value::Json(Box::new(v)))
}
fn json_struct(&self) -> Option<&StructType> {
match &self {
JsonStructureSettings::Structured(fields) => fields.as_ref(),
JsonStructureSettings::PartialUnstructuredByKey { fields, .. } => fields.as_ref(),
_ => None,
}
}
}
impl Default for JsonStructureSettings {
@@ -144,12 +156,54 @@ impl<'a> JsonContext<'a> {
}
}
fn encode_by_struct(json_struct: &StructType, mut json: Json) -> Result<Value> {
let Some(json_object) = json.as_object_mut() else {
return InvalidJsonSnafu {
value: "expect JSON object when struct is provided",
}
.fail();
};
let mut encoded = BTreeMap::new();
fn extract_field(json_object: &mut Map<String, Json>, field: &str) -> Result<Option<Json>> {
let (first, rest) = field.split_once('.').unwrap_or((field, ""));
if rest.is_empty() {
Ok(json_object.remove(first))
} else {
let Some(value) = json_object.get_mut(first) else {
return Ok(None);
};
let json_object = value.as_object_mut().with_context(|| InvalidJsonSnafu {
value: format!(r#"expect "{}" an object"#, first),
})?;
extract_field(json_object, rest)
}
}
let fields = json_struct.fields();
for field in fields.iter() {
let Some(field_value) = extract_field(json_object, field.name())? else {
continue;
};
let field_type: JsonNativeType = field.data_type().into();
let field_value = try_convert_to_expected_type(field_value, &field_type)?;
encoded.insert(field.name().to_string(), field_value);
}
let rest = serde_json::to_string(json_object).context(SerializeSnafu)?;
encoded.insert(JsonStructureSettings::RAW_FIELD.to_string(), rest.into());
let value: JsonValue = encoded.into();
Ok(Value::Json(Box::new(value)))
}
/// Main encoding function with key path tracking
pub fn encode_json_with_context<'a>(
json: Json,
data_type: Option<&JsonNativeType>,
context: &JsonContext<'a>,
) -> Result<JsonValue, Error> {
) -> Result<JsonValue> {
// Check if the entire encoding should be unstructured
if matches!(context.settings, JsonStructureSettings::UnstructuredRaw) {
let json_string = json.to_string();
@@ -215,7 +269,7 @@ fn encode_json_object_with_context<'a>(
mut json_object: Map<String, Json>,
fields: Option<&JsonObjectType>,
context: &JsonContext<'a>,
) -> Result<JsonValue, Error> {
) -> Result<JsonValue> {
let mut object = BTreeMap::new();
// First, process fields from the provided schema in their original order
if let Some(fields) = fields {
@@ -248,7 +302,7 @@ fn encode_json_array_with_context<'a>(
json_array: Vec<Json>,
item_type: Option<&JsonNativeType>,
context: &JsonContext<'a>,
) -> Result<JsonValue, Error> {
) -> Result<JsonValue> {
let json_array_len = json_array.len();
let mut items = Vec::with_capacity(json_array_len);
let mut element_type = item_type.cloned();
@@ -286,7 +340,7 @@ fn encode_json_value_with_context<'a>(
json: Json,
expected_type: Option<&JsonNativeType>,
context: &JsonContext<'a>,
) -> Result<JsonValue, Error> {
) -> Result<JsonValue> {
// Check if current key should be treated as unstructured
if context.is_unstructured_key() {
return Ok(json.to_string().into());
@@ -301,7 +355,7 @@ fn encode_json_value_with_context<'a>(
if let Some(expected) = expected_type
&& let Ok(value) = try_convert_to_expected_type(i, expected)
{
return Ok(value);
return Ok(value.into());
}
Ok(i.into())
} else if let Some(u) = n.as_u64() {
@@ -309,7 +363,7 @@ fn encode_json_value_with_context<'a>(
if let Some(expected) = expected_type
&& let Ok(value) = try_convert_to_expected_type(u, expected)
{
return Ok(value);
return Ok(value.into());
}
if u <= i64::MAX as u64 {
Ok((u as i64).into())
@@ -321,7 +375,7 @@ fn encode_json_value_with_context<'a>(
if let Some(expected) = expected_type
&& let Ok(value) = try_convert_to_expected_type(f, expected)
{
return Ok(value);
return Ok(value.into());
}
// Default to f64 for floating point numbers
@@ -335,7 +389,7 @@ fn encode_json_value_with_context<'a>(
if let Some(expected) = expected_type
&& let Ok(value) = try_convert_to_expected_type(s.as_str(), expected)
{
return Ok(value);
return Ok(value.into());
}
Ok(s.into())
}
@@ -345,10 +399,7 @@ fn encode_json_value_with_context<'a>(
}
/// Main decoding function with key path tracking
pub fn decode_value_with_context<'a>(
value: Value,
context: &JsonContext<'a>,
) -> Result<Json, Error> {
pub fn decode_value_with_context(value: Value, context: &JsonContext) -> Result<Json> {
// Check if the entire decoding should be unstructured
if matches!(context.settings, JsonStructureSettings::UnstructuredRaw) {
return decode_unstructured_value(value);
@@ -370,7 +421,7 @@ pub fn decode_value_with_context<'a>(
fn decode_struct_with_context<'a>(
struct_value: StructValue,
context: &JsonContext<'a>,
) -> Result<Json, Error> {
) -> Result<Json> {
let mut json_object = Map::with_capacity(struct_value.len());
let (items, fields) = struct_value.into_parts();
@@ -385,10 +436,7 @@ fn decode_struct_with_context<'a>(
}
/// Decode a list value to JSON array
fn decode_list_with_context<'a>(
list_value: ListValue,
context: &JsonContext<'a>,
) -> Result<Json, Error> {
fn decode_list_with_context(list_value: ListValue, context: &JsonContext) -> Result<Json> {
let mut json_array = Vec::with_capacity(list_value.len());
let data_items = list_value.take_items();
@@ -403,7 +451,7 @@ fn decode_list_with_context<'a>(
}
/// Decode unstructured value (stored as string)
fn decode_unstructured_value(value: Value) -> Result<Json, Error> {
fn decode_unstructured_value(value: Value) -> Result<Json> {
match value {
// Handle expected format: StructValue with single _raw field
Value::Struct(struct_value) => {
@@ -443,7 +491,7 @@ fn decode_unstructured_value(value: Value) -> Result<Json, Error> {
}
/// Decode primitive value to JSON
fn decode_primitive_value(value: Value) -> Result<Json, Error> {
fn decode_primitive_value(value: Value) -> Result<Json> {
match value {
Value::Null => Ok(Json::Null),
Value::Boolean(b) => Ok(Json::Bool(b)),
@@ -487,7 +535,7 @@ fn decode_primitive_value(value: Value) -> Result<Json, Error> {
fn decode_struct_with_settings<'a>(
struct_value: StructValue,
context: &JsonContext<'a>,
) -> Result<StructValue, Error> {
) -> Result<StructValue> {
// Check if we can return the struct directly (Structured case)
if matches!(context.settings, JsonStructureSettings::Structured(_)) {
return Ok(struct_value);
@@ -567,7 +615,7 @@ fn decode_struct_with_settings<'a>(
fn decode_list_with_settings<'a>(
list_value: ListValue,
context: &JsonContext<'a>,
) -> Result<ListValue, Error> {
) -> Result<ListValue> {
let mut items = Vec::with_capacity(list_value.len());
let (data_items, datatype) = list_value.into_parts();
@@ -592,7 +640,7 @@ fn decode_list_with_settings<'a>(
}
/// Helper function to decode a struct that was encoded with UnstructuredRaw settings
fn decode_unstructured_raw_struct(struct_value: StructValue) -> Result<StructValue, Error> {
fn decode_unstructured_raw_struct(struct_value: StructValue) -> Result<StructValue> {
// For UnstructuredRaw, the struct must have exactly one field named "_raw"
if struct_value.struct_type().fields().len() == 1 {
let field = &struct_value.struct_type().fields()[0];
@@ -636,12 +684,9 @@ fn decode_unstructured_raw_struct(struct_value: StructValue) -> Result<StructVal
}
/// Helper function to try converting a value to an expected type
fn try_convert_to_expected_type<T>(
value: T,
expected_type: &JsonNativeType,
) -> Result<JsonValue, Error>
fn try_convert_to_expected_type<T>(value: T, expected_type: &JsonNativeType) -> Result<JsonVariant>
where
T: Into<JsonValue>,
T: Into<JsonVariant>,
{
let value = value.into();
let cast_error = || {
@@ -650,7 +695,7 @@ where
}
.fail()
};
let actual_type = value.json_type().native_type();
let actual_type = &value.native_type();
match (actual_type, expected_type) {
(x, y) if x == y => Ok(value),
(JsonNativeType::Number(x), JsonNativeType::Number(y)) => match (x, y) {
@@ -691,6 +736,107 @@ mod tests {
use crate::data_type::ConcreteDataType;
use crate::types::ListType;
#[test]
fn test_encode_by_struct() {
let json_struct: StructType = [
StructField::new("s", ConcreteDataType::string_datatype(), true),
StructField::new("foo.i", ConcreteDataType::int64_datatype(), true),
StructField::new("x.y.z", ConcreteDataType::boolean_datatype(), true),
]
.into();
let json = json!({
"s": "hello",
"t": "world",
"foo": {
"i": 1,
"j": 2
},
"x": {
"y": {
"z": true
}
}
});
let value = encode_by_struct(&json_struct, json).unwrap();
assert_eq!(
value.to_string(),
r#"Json({ _raw: {"foo":{"j":2},"t":"world","x":{"y":{}}}, foo.i: 1, s: hello, x.y.z: true })"#
);
let json = json!({
"t": "world",
"foo": {
"i": 1,
"j": 2
},
"x": {
"y": {
"z": true
}
}
});
let value = encode_by_struct(&json_struct, json).unwrap();
assert_eq!(
value.to_string(),
r#"Json({ _raw: {"foo":{"j":2},"t":"world","x":{"y":{}}}, foo.i: 1, x.y.z: true })"#
);
let json = json!({
"s": 1234,
"foo": {
"i": 1,
"j": 2
},
"x": {
"y": {
"z": true
}
}
});
let value = encode_by_struct(&json_struct, json).unwrap();
assert_eq!(
value.to_string(),
r#"Json({ _raw: {"foo":{"j":2},"x":{"y":{}}}, foo.i: 1, s: 1234, x.y.z: true })"#
);
let json = json!({
"s": "hello",
"t": "world",
"foo": {
"i": "bar",
"j": 2
},
"x": {
"y": {
"z": true
}
}
});
let result = encode_by_struct(&json_struct, json);
assert_eq!(
result.unwrap_err().to_string(),
"Cannot cast value bar to Number(I64)"
);
let json = json!({
"s": "hello",
"t": "world",
"foo": {
"i": 1,
"j": 2
},
"x": {
"y": "z"
}
});
let result = encode_by_struct(&json_struct, json);
assert_eq!(
result.unwrap_err().to_string(),
r#"Invalid JSON: expect "y" an object"#
);
}
#[test]
fn test_encode_json_null() {
let json = Json::Null;
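To summarize the struct-driven encoding exercised by `test_encode_by_struct` above, here is a minimal sketch (not part of this diff): declared fields are extracted by dotted path and type-converted, and whatever remains of the object is serialized into the reserved `_raw` field. Imports are indicative; it relies on the generic `StructField::new` introduced later in this compare.

```rust
// Illustrative sketch only; not part of the PR.
use datatypes::json::JsonStructureSettings;
use datatypes::prelude::ConcreteDataType;
use datatypes::types::{StructField, StructType};
use serde_json::json;

fn encode_example() {
    // Declare which JSON paths become typed struct fields.
    let json_struct: StructType = [
        StructField::new("s", ConcreteDataType::string_datatype(), true),
        StructField::new("foo.i", ConcreteDataType::int64_datatype(), true),
    ]
    .into();

    // Structured(Some(..)) routes encoding through the struct-driven path.
    let settings = JsonStructureSettings::Structured(Some(json_struct));
    let value = settings
        .encode(json!({ "s": "hello", "foo": { "i": 1, "j": 2 } }))
        .unwrap();

    // "s" and "foo.i" become struct fields; the leftover {"foo":{"j":2}}
    // is kept verbatim under the "_raw" field.
    println!("{value}");
}
```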

View File

@@ -82,6 +82,18 @@ impl From<f64> for JsonNumber {
}
}
impl From<Number> for JsonNumber {
fn from(n: Number) -> Self {
if let Some(i) = n.as_i64() {
i.into()
} else if let Some(i) = n.as_u64() {
i.into()
} else {
n.as_f64().unwrap_or(f64::NAN).into()
}
}
}
impl Display for JsonNumber {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
@@ -109,7 +121,28 @@ pub enum JsonVariant {
}
impl JsonVariant {
fn native_type(&self) -> JsonNativeType {
pub(crate) fn as_i64(&self) -> Option<i64> {
match self {
JsonVariant::Number(n) => n.as_i64(),
_ => None,
}
}
pub(crate) fn as_u64(&self) -> Option<u64> {
match self {
JsonVariant::Number(n) => n.as_u64(),
_ => None,
}
}
pub(crate) fn as_f64(&self) -> Option<f64> {
match self {
JsonVariant::Number(n) => Some(n.as_f64()),
_ => None,
}
}
pub(crate) fn native_type(&self) -> JsonNativeType {
match self {
JsonVariant::Null => JsonNativeType::Null,
JsonVariant::Bool(_) => JsonNativeType::Bool,
@@ -205,6 +238,32 @@ impl<K: Into<String>, V: Into<JsonVariant>, const N: usize> From<[(K, V); N]> fo
}
}
impl From<serde_json::Value> for JsonVariant {
fn from(v: serde_json::Value) -> Self {
fn helper(v: serde_json::Value) -> JsonVariant {
match v {
serde_json::Value::Null => JsonVariant::Null,
serde_json::Value::Bool(b) => b.into(),
serde_json::Value::Number(n) => n.into(),
serde_json::Value::String(s) => s.into(),
serde_json::Value::Array(array) => {
JsonVariant::Array(array.into_iter().map(helper).collect())
}
serde_json::Value::Object(object) => {
JsonVariant::Object(object.into_iter().map(|(k, v)| (k, helper(v))).collect())
}
}
}
helper(v)
}
}
impl From<BTreeMap<String, JsonVariant>> for JsonVariant {
fn from(v: BTreeMap<String, JsonVariant>) -> Self {
Self::Object(v)
}
}
impl Display for JsonVariant {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
@@ -277,24 +336,11 @@ impl JsonValue {
}
pub(crate) fn as_i64(&self) -> Option<i64> {
match self.json_variant {
JsonVariant::Number(n) => n.as_i64(),
_ => None,
}
self.json_variant.as_i64()
}
pub(crate) fn as_u64(&self) -> Option<u64> {
match self.json_variant {
JsonVariant::Number(n) => n.as_u64(),
_ => None,
}
}
pub(crate) fn as_f64(&self) -> Option<f64> {
match self.json_variant {
JsonVariant::Number(n) => Some(n.as_f64()),
_ => None,
}
self.json_variant.as_u64()
}
pub(crate) fn as_f64_lossy(&self) -> Option<f64> {

View File

@@ -122,9 +122,9 @@ pub struct StructField {
}
impl StructField {
pub fn new(name: String, data_type: ConcreteDataType, nullable: bool) -> Self {
pub fn new<T: Into<String>>(name: T, data_type: ConcreteDataType, nullable: bool) -> Self {
StructField {
name,
name: name.into(),
data_type,
nullable,
metadata: BTreeMap::new(),

View File

@@ -339,6 +339,7 @@ pub async fn metasrv_builder(
opts.meta_schema_name.as_deref(),
&opts.meta_table_name,
opts.max_txn_ops,
opts.auto_create_schema,
)
.await
.context(error::KvBackendSnafu)?;

View File

@@ -231,6 +231,9 @@ pub struct MetasrvOptions {
#[cfg(feature = "pg_kvbackend")]
/// Optional PostgreSQL schema for metadata table (defaults to current search_path if empty).
pub meta_schema_name: Option<String>,
#[cfg(feature = "pg_kvbackend")]
/// Automatically create PostgreSQL schema if it doesn't exist (default: true).
pub auto_create_schema: bool,
#[serde(with = "humantime_serde")]
pub node_max_idle_time: Duration,
/// The event recorder options.
@@ -333,6 +336,8 @@ impl Default for MetasrvOptions {
meta_election_lock_id: common_meta::kv_backend::DEFAULT_META_ELECTION_LOCK_ID,
#[cfg(feature = "pg_kvbackend")]
meta_schema_name: None,
#[cfg(feature = "pg_kvbackend")]
auto_create_schema: true,
node_max_idle_time: Duration::from_secs(24 * 60 * 60),
event_recorder: EventRecorderOptions::default(),
stats_persistence: StatsPersistenceOptions::default(),

View File

@@ -410,8 +410,7 @@ fn sql_value_to_value(
})?
} else {
common_sql::convert::sql_value_to_value(
column,
&column_schema.data_type,
column_schema,
sql_val,
timezone,
None,

View File

@@ -52,6 +52,7 @@ use common_time::Timestamp;
use common_time::range::TimestampRange;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use humantime::format_duration;
use itertools::Itertools;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
@@ -644,11 +645,20 @@ impl StatementExecutor {
})?
.unit();
let start_column = ColumnSchema::new(
"range_start",
ConcreteDataType::timestamp_datatype(time_unit),
false,
);
let end_column = ColumnSchema::new(
"range_end",
ConcreteDataType::timestamp_datatype(time_unit),
false,
);
let mut time_ranges = Vec::with_capacity(sql_values_time_range.len());
for (start, end) in sql_values_time_range {
let start = common_sql::convert::sql_value_to_value(
"range_start",
&ConcreteDataType::timestamp_datatype(time_unit),
&start_column,
start,
Some(&query_ctx.timezone()),
None,
@@ -667,8 +677,7 @@ impl StatementExecutor {
})?;
let end = common_sql::convert::sql_value_to_value(
"range_end",
&ConcreteDataType::timestamp_datatype(time_unit),
&end_column,
end,
Some(&query_ctx.timezone()),
None,

View File

@@ -242,8 +242,12 @@ fn values_to_vectors_by_exact_types(
args.iter()
.zip(exact_types.iter())
.map(|(value, data_type)| {
let data_type = &ConcreteDataType::from_arrow_type(data_type);
let value = sql_value_to_value(DUMMY_COLUMN, data_type, value, tz, None, false)
let schema = ColumnSchema::new(
DUMMY_COLUMN,
ConcreteDataType::from_arrow_type(data_type),
true,
);
let value = sql_value_to_value(&schema, value, tz, None, false)
.context(error::SqlCommonSnafu)?;
Ok(value_to_vector(value))
@@ -260,10 +264,12 @@ fn values_to_vectors_by_valid_types(
args.iter()
.map(|value| {
for data_type in valid_types {
let data_type = &ConcreteDataType::from_arrow_type(data_type);
if let Ok(value) =
sql_value_to_value(DUMMY_COLUMN, data_type, value, tz, None, false)
{
let schema = ColumnSchema::new(
DUMMY_COLUMN,
ConcreteDataType::from_arrow_type(data_type),
true,
);
if let Ok(value) = sql_value_to_value(&schema, value, tz, None, false) {
return Ok(value_to_vector(value));
}
}
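
Both helpers now build a throwaway `ColumnSchema` because `sql_value_to_value` takes the schema in place of the former name-plus-type pair. A minimal sketch of a call under the new signature; the hypothetical function name, the column, and the meaning of the trailing flag are inferred from the call sites above, not confirmed:

// Sketch only: the column's name, type, and nullability now travel together in one ColumnSchema.
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;

fn convert_range_start(sql_val: &sqlparser::ast::Value) -> Option<Value> {
    let column = ColumnSchema::new(
        "range_start",
        ConcreteDataType::timestamp_millisecond_datatype(),
        false,
    );
    // Argument order (schema, value, timezone, unary_op, coerce-strings flag) as inferred
    // from the call sites above; the last flag's exact meaning is an assumption.
    common_sql::convert::sql_value_to_value(&column, sql_val, None, None, false).ok()
}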

View File

@@ -50,7 +50,7 @@ use common_time::{Timestamp, Timezone};
use datafusion_common::tree_node::TreeNodeVisitor;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{RawSchema, Schema};
use datatypes::schema::{ColumnSchema, RawSchema, Schema};
use datatypes::value::Value;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use partition::multi_dim::MultiDimPartitionRule;
@@ -2001,8 +2001,7 @@ fn convert_value(
unary_op: Option<UnaryOperator>,
) -> Result<Value> {
sql_value_to_value(
"<NONAME>",
&data_type,
&ColumnSchema::new("<NONAME>", data_type, true),
value,
Some(timezone),
unary_op,

View File

@@ -23,11 +23,16 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use arrow::array::ArrayRef;
use arrow::array::{
ArrayRef, AsArray, TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampNanosecondArray, TimestampSecondArray,
};
use arrow::compute::{concat, concat_batches, take_record_batch};
use arrow_schema::SchemaRef;
use arrow_schema::{Schema, SchemaRef};
use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream};
use common_telemetry::warn;
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use datafusion::common::arrow::compute::sort_to_indices;
use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion::execution::{RecordBatchStream, TaskContext};
@@ -40,8 +45,9 @@ use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, TopK,
TopKDynamicFilters,
};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{DataFusionError, internal_err};
use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit};
use datafusion_physical_expr::expressions::{Column, DynamicFilterPhysicalExpr, lit};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use futures::{Stream, StreamExt};
use itertools::Itertools;
@@ -347,6 +353,9 @@ struct PartSortStream {
range_groups: Vec<(Timestamp, usize, usize)>,
/// Current group being processed (index into range_groups).
cur_group_idx: usize,
/// Dynamic filter shared by all TopK instances; note that `PartSortExec`/`PartSortStream`/`TopK` must share the same filter
/// so that updates from each `TopK` can be seen by the others (and by the table scan operator).
filter: Option<Arc<RwLock<TopKDynamicFilters>>>,
}
impl PartSortStream {
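
The comment on the new `filter` field is the crux of the fix: one `Arc<RwLock<TopKDynamicFilters>>` is cloned into the exec node, each stream, and each `TopK`, so a threshold published by one consumer is immediately visible to the rest. A stripped-down sketch of that sharing pattern, with a stand-in struct and std's `RwLock` rather than the real `TopKDynamicFilters`:

use std::sync::{Arc, RwLock};

// Stand-in for TopKDynamicFilters: holds the latest pruning threshold.
#[derive(Default)]
struct SharedThreshold(Option<i64>);

fn main() {
    let filter = Arc::new(RwLock::new(SharedThreshold::default()));
    // Each "TopK" gets a clone of the same Arc, not a fresh filter.
    let for_topk = filter.clone();
    let for_scan = filter.clone();

    // One consumer publishes an updated threshold...
    for_topk.write().unwrap().0 = Some(15);
    // ...and the others observe it through their clones.
    assert_eq!(for_scan.read().unwrap().0, Some(15));
    assert_eq!(filter.read().unwrap().0, Some(15));
}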
@@ -360,7 +369,7 @@ impl PartSortStream {
filter: Option<Arc<RwLock<TopKDynamicFilters>>>,
) -> datafusion_common::Result<Self> {
let buffer = if let Some(limit) = limit {
let Some(filter) = filter else {
let Some(filter) = filter.clone() else {
return internal_err!(
"TopKDynamicFilters must be provided when limit is set at {}",
snafu::location!()
@@ -377,7 +386,7 @@ impl PartSortStream {
context.session_config().batch_size(),
context.runtime_env(),
&sort.metrics,
filter,
filter.clone(),
)?,
0,
)
@@ -407,23 +416,11 @@ impl PartSortStream {
root_metrics: sort.metrics.clone(),
range_groups,
cur_group_idx: 0,
filter,
})
}
}
macro_rules! ts_to_timestamp {
($t:ty, $unit:expr, $arr:expr) => {{
let arr = $arr
.as_any()
.downcast_ref::<arrow::array::PrimitiveArray<$t>>()
.unwrap();
arr.iter()
.map(|v| v.map(|v| Timestamp::new(v, common_time::timestamp::TimeUnit::from(&$unit))))
.collect_vec()
}};
}
macro_rules! array_check_helper {
($t:ty, $unit:expr, $arr:expr, $cur_range:expr, $min_max_idx:expr) => {{
if $cur_range.start.unit().as_arrow_time_unit() != $unit
@@ -546,9 +543,10 @@ impl PartSortStream {
Ok(())
}
/// A temporary solution for stop read earlier when current group do not overlap with any of those next group
/// Stop reading early when the current group does not overlap with any of the following groups.
/// If there is no overlap, we can stop reading further input since the current top k is final.
fn can_stop_early(&mut self) -> datafusion_common::Result<bool> {
/// Use the dynamic filter to evaluate the next group's primary end.
fn can_stop_early(&mut self, schema: &Arc<Schema>) -> datafusion_common::Result<bool> {
let topk_cnt = match &self.buffer {
PartSortBuffer::Top(_, cnt) => *cnt,
_ => return Ok(false),
@@ -557,46 +555,74 @@ impl PartSortStream {
if Some(topk_cnt) < self.limit {
return Ok(false);
}
// else check if last value in topk is not in next group range
let topk_buffer = self.sort_top_buffer()?;
// Guard against empty buffer - this can happen if TopK's internal filtering
// removed all rows, or if the buffer was cleared. In this case, we cannot
// determine if we can stop early, so continue processing.
// Fixes: https://github.com/orgs/GreptimeTeam/discussions/7457
if topk_buffer.num_rows() == 0 {
return Ok(false);
}
let min_batch = topk_buffer.slice(topk_buffer.num_rows() - 1, 1);
let min_sort_column = self.expression.evaluate_to_sort_column(&min_batch)?.values;
let last_val = downcast_ts_array!(
min_sort_column.data_type() => (ts_to_timestamp, min_sort_column),
_ => internal_err!(
"Unsupported data type for sort column: {:?}",
min_sort_column.data_type()
)?,
)[0];
let Some(last_val) = last_val else {
return Ok(false);
};
let next_group_primary_end = if self.cur_group_idx + 1 < self.range_groups.len() {
self.range_groups[self.cur_group_idx + 1].0
} else {
// no next group
return Ok(false);
};
let descending = self.expression.options.descending;
let not_in_next_group_range = if descending {
last_val >= next_group_primary_end
} else {
last_val < next_group_primary_end
// The dyn filter is updated based on the last value of the topk heap (the "threshold");
// it's a max-heap for an ASC TopK operator,
// so the dyn filter can be used to prune the data range.
let filter = self
.filter
.as_ref()
.expect("TopKDynamicFilters must be provided when limit is set");
let filter = filter.read().expr().current()?;
let mut ts_index = None;
// invariant: the filter must reference only a single column expr, the time index column
let filter = filter
.transform_down(|c| {
// rewrite every column's index to 0
if let Some(column) = c.as_any().downcast_ref::<Column>() {
ts_index = Some(column.index());
Ok(Transformed::yes(
Arc::new(Column::new(column.name(), 0)) as Arc<dyn PhysicalExpr>
))
} else {
Ok(Transformed::no(c))
}
})?
.data;
let Some(ts_index) = ts_index else {
return Ok(false); // dyn filter is still true, cannot decide, continue reading
};
// refill topk buffer count
self.push_buffer(topk_buffer)?;
Ok(not_in_next_group_range)
let field = if schema.fields().len() <= ts_index {
warn!(
"Schema mismatch when evaluating dynamic filter for PartSortExec at {}, schema: {:?}, ts_index: {}",
self.partition, schema, ts_index
);
return Ok(false); // schema mismatch, cannot decide, continue read
} else {
schema.field(ts_index)
};
let schema = Arc::new(Schema::new(vec![field.clone()]));
// convert next_group_primary_end to an array and evaluate the filter against it; if it evaluates to false, there is no overlap and we can stop early
let primary_end_array = match next_group_primary_end.unit() {
TimeUnit::Second => Arc::new(TimestampSecondArray::from(vec![
next_group_primary_end.value(),
])) as ArrayRef,
TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from(vec![
next_group_primary_end.value(),
])) as ArrayRef,
TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from(vec![
next_group_primary_end.value(),
])) as ArrayRef,
TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from(vec![
next_group_primary_end.value(),
])) as ArrayRef,
};
let primary_end_batch = DfRecordBatch::try_new(schema, vec![primary_end_array])?;
let res = filter.evaluate(&primary_end_batch)?;
let array = res.into_array(primary_end_batch.num_rows())?;
let filter = array.as_boolean().clone();
let overlap = filter.iter().next().flatten();
if let Some(false) = overlap {
Ok(true)
} else {
Ok(false)
}
}
/// Check if the given partition index is within the current group.
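
Restating the early-stop check in `can_stop_early` above in isolation: wrap the next group's primary end in a one-row batch and evaluate the shared filter over it; if the predicate rejects that single row, no later group can improve the current top k. A self-contained sketch under those assumptions, with a hand-written `ts < threshold` predicate standing in for the real dynamic filter:

use arrow::array::{BooleanArray, TimestampMillisecondArray};
use arrow::compute::kernels::cmp::lt;

// Hand-built stand-in for the dynamic filter: "ts < threshold", where `threshold`
// is the worst value currently kept by the ascending TopK heap.
fn next_group_can_be_skipped(threshold_ms: i64, next_group_primary_end_ms: i64) -> bool {
    // One-row "batch" holding the next group's primary end, mirroring the code above.
    let probe = TimestampMillisecondArray::from(vec![next_group_primary_end_ms]);
    let bound = TimestampMillisecondArray::from(vec![threshold_ms]);
    let mask: BooleanArray = lt(&probe, &bound).expect("comparable arrays");
    // `false` means the next group's best possible value fails the filter: stop early.
    !mask.value(0)
}

fn main() {
    // TopK already holds [0, 5, 15] (threshold 15); the next group starts at 21.
    assert!(next_group_can_be_skipped(15, 21));
    // A next group starting at 10 could still contribute rows, so keep reading.
    assert!(!next_group_can_be_skipped(15, 10));
}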
@@ -749,9 +775,13 @@ impl PartSortStream {
/// Internal method for sorting `Top` buffer (with limit).
fn sort_top_buffer(&mut self) -> datafusion_common::Result<DfRecordBatch> {
let filter = Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
DynamicFilterPhysicalExpr::new(vec![], lit(true)),
))));
let Some(filter) = self.filter.clone() else {
return internal_err!(
"TopKDynamicFilters must be provided when sorting with limit at {}",
snafu::location!()
);
};
let new_top_buffer = TopK::try_new(
self.partition,
self.schema().clone(),
@@ -888,7 +918,7 @@ impl PartSortStream {
// When TopK is fulfilled and we are switching to a new group, stop consuming further ranges if possible.
// consult the dynamic filter and determine whether we can stop earlier.
if !in_same_group && self.can_stop_early()? {
if !in_same_group && self.can_stop_early(&batch.schema())? {
self.input_complete = true;
self.evaluating_batch = None;
return Ok(());
@@ -1127,7 +1157,7 @@ mod test {
// The TopK result buffer is empty, so we cannot determine early-stop.
// Ensure this path returns `Ok(false)` (and, importantly, does not panic).
assert!(!stream.can_stop_early().unwrap());
assert!(!stream.can_stop_early(&schema).unwrap());
}
#[ignore = "hard to gen expected data correctly here, TODO(discord9): fix it later"]
@@ -2096,12 +2126,11 @@ mod test {
// Group 1 (end=100) has 6 rows, TopK will keep top 4
// Group 2 (end=98) has 3 rows - threshold (96) < 98, so next group
// could theoretically have better values. But limit exhaustion stops us.
// Note: Data values must not overlap between ranges to avoid ambiguity.
// could theoretically have better values. Continue reading.
let input_ranged_data = vec![
(
PartitionRange {
start: Timestamp::new(70, unit.into()),
start: Timestamp::new(90, unit.into()),
end: Timestamp::new(100, unit.into()),
num_rows: 6,
identifier: 0,
@@ -2888,4 +2917,88 @@ mod test {
)
.await;
}
/// First group: [0,20), data: [0, 5, 15]
/// Second group: [10, 30), data: [21, 25, 29]
/// After the first group, call early stop manually and check whether the filter is updated.
#[tokio::test]
async fn test_early_stop_check_update_dyn_filter() {
let unit = TimeUnit::Millisecond;
let schema = Arc::new(Schema::new(vec![Field::new(
"ts",
DataType::Timestamp(unit, None),
false,
)]));
let mock_input = Arc::new(MockInputExec::new(vec![vec![]], schema.clone()));
let exec = PartSortExec::try_new(
PhysicalSortExpr {
expr: Arc::new(Column::new("ts", 0)),
options: SortOptions {
descending: false,
..Default::default()
},
},
Some(3),
vec![vec![
PartitionRange {
start: Timestamp::new(0, unit.into()),
end: Timestamp::new(20, unit.into()),
num_rows: 3,
identifier: 1,
},
PartitionRange {
start: Timestamp::new(10, unit.into()),
end: Timestamp::new(30, unit.into()),
num_rows: 3,
identifier: 1,
},
]],
mock_input.clone(),
)
.unwrap();
let filter = exec.filter.clone().unwrap();
let input_stream = mock_input
.execute(0, Arc::new(TaskContext::default()))
.unwrap();
let mut stream = PartSortStream::new(
Arc::new(TaskContext::default()),
&exec,
Some(3),
input_stream,
vec![],
0,
Some(filter.clone()),
)
.unwrap();
// initially, snapshot_generation is 1
assert_eq!(filter.read().expr().snapshot_generation(), 1);
let batch =
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![0, 5, 15])])
.unwrap();
stream.push_buffer(batch).unwrap();
// after pushing first batch, snapshot_generation is updated to 2
assert_eq!(filter.read().expr().snapshot_generation(), 2);
assert!(!stream.can_stop_early(&schema).unwrap());
// still 2, as the filter was not updated
assert_eq!(filter.read().expr().snapshot_generation(), 2);
let _ = stream.sort_top_buffer().unwrap();
let batch =
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![21, 25, 29])])
.unwrap();
stream.push_buffer(batch).unwrap();
// still 2, as the filter was not updated
assert_eq!(filter.read().expr().snapshot_generation(), 2);
let new = stream.sort_top_buffer().unwrap();
// still 2, as the filter was not updated
assert_eq!(filter.read().expr().snapshot_generation(), 2);
// the dyn filter kicks in and filters out all rows >= 15 (the filter is rows < 15)
assert_eq!(new.num_rows(), 0)
}
}

View File

@@ -22,6 +22,7 @@ use common_time::{Date, Timestamp};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::types::TimestampType;
use datatypes::value::{self, Value};
use itertools::Itertools;
@@ -254,9 +255,10 @@ pub fn convert_value(param: &ParamValue, t: &ConcreteDataType) -> Result<ScalarV
/// Convert a MySQL expression to a scalar value.
/// It automatically handles the conversion of strings to numeric values.
pub fn convert_expr_to_scalar_value(param: &Expr, t: &ConcreteDataType) -> Result<ScalarValue> {
let column_schema = ColumnSchema::new("", t.clone(), true);
match param {
Expr::Value(v) => {
let v = sql_value_to_value("", t, &v.value, None, None, true);
let v = sql_value_to_value(&column_schema, &v.value, None, None, true);
match v {
Ok(v) => v
.try_to_scalar_value(t)
@@ -268,7 +270,7 @@ pub fn convert_expr_to_scalar_value(param: &Expr, t: &ConcreteDataType) -> Resul
}
}
Expr::UnaryOp { op, expr } if let Expr::Value(v) = &**expr => {
let v = sql_value_to_value("", t, &v.value, None, Some(*op), true);
let v = sql_value_to_value(&column_schema, &v.value, None, Some(*op), true);
match v {
Ok(v) => v
.try_to_scalar_value(t)

View File

@@ -40,4 +40,8 @@ impl Dialect for GreptimeDbDialect {
fn supports_filter_during_aggregation(&self) -> bool {
true
}
fn supports_struct_literal(&self) -> bool {
true
}
}

View File

@@ -215,6 +215,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Invalid JSON structure setting, reason: {reason}"))]
InvalidJsonStructureSetting {
reason: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to serialize column default constraint"))]
SerializeColumnDefaultConstraint {
#[snafu(implicit)]
@@ -374,6 +381,7 @@ impl ErrorExt for Error {
InvalidColumnOption { .. }
| InvalidExprAsOptionValue { .. }
| InvalidJsonStructureSetting { .. }
| InvalidDatabaseName { .. }
| InvalidDatabaseOption { .. }
| ColumnTypeMismatch { .. }

View File

@@ -40,16 +40,17 @@ pub(super) fn parse_json_datatype_options(parser: &mut Parser<'_>) -> Result<Opt
#[cfg(test)]
mod tests {
use sqlparser::ast::DataType;
use sqlparser::ast::{DataType, Expr, Ident, StructField};
use crate::dialect::GreptimeDbDialect;
use crate::parser::{ParseOptions, ParserContext};
use crate::statements::OptionMap;
use crate::statements::create::{
Column, JSON_FORMAT_FULL_STRUCTURED, JSON_FORMAT_PARTIAL, JSON_FORMAT_RAW, JSON_OPT_FORMAT,
JSON_OPT_UNSTRUCTURED_KEYS,
Column, JSON_FORMAT_FULL_STRUCTURED, JSON_FORMAT_PARTIAL, JSON_FORMAT_RAW, JSON_OPT_FIELDS,
JSON_OPT_FORMAT, JSON_OPT_UNSTRUCTURED_KEYS,
};
use crate::statements::statement::Statement;
use crate::util::OptionValue;
#[test]
fn test_parse_json_datatype_options() {
@@ -77,6 +78,42 @@ mod tests {
let sql = r#"
CREATE TABLE json_data (
my_json JSON(format = "partial", fields = Struct<i Int, "o.a" String, "o.b" String, `x.y.z` Float64>),
ts TIMESTAMP TIME INDEX,
)"#;
let options = parse(sql).unwrap();
assert_eq!(options.len(), 2);
let option = options.value(JSON_OPT_FIELDS);
let expected = OptionValue::try_new(Expr::Struct {
values: vec![],
fields: vec![
StructField {
field_name: Some(Ident::new("i")),
field_type: DataType::Int(None),
options: None,
},
StructField {
field_name: Some(Ident::with_quote('"', "o.a")),
field_type: DataType::String(None),
options: None,
},
StructField {
field_name: Some(Ident::with_quote('"', "o.b")),
field_type: DataType::String(None),
options: None,
},
StructField {
field_name: Some(Ident::with_quote('`', "x.y.z")),
field_type: DataType::Float64,
options: None,
},
],
})
.ok();
assert_eq!(option, expected.as_ref());
let sql = r#"
CREATE TABLE json_data (
my_json JSON(format = "partial", unstructured_keys = ["k", "foo.bar", "a.b.c"]),
ts TIMESTAMP TIME INDEX,
)"#;

View File

@@ -40,6 +40,7 @@ use api::v1::SemanticType;
use common_sql::default_constraint::parse_column_default_constraint;
use common_time::timezone::Timezone;
use datatypes::extension::json::{JsonExtensionType, JsonMetadata};
use datatypes::json::JsonStructureSettings;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{COMMENT_KEY, ColumnDefaultConstraint, ColumnSchema};
use datatypes::types::json_type::JsonNativeType;
@@ -281,8 +282,17 @@ pub fn sql_data_type_to_concrete_data_type(
}
},
SqlDataType::JSON => {
let format = if column_extensions.json_datatype_options.is_some() {
JsonFormat::Native(Box::new(JsonNativeType::Null))
let format = if let Some(x) = column_extensions.build_json_structure_settings()? {
if let Some(fields) = match x {
JsonStructureSettings::Structured(fields) => fields,
JsonStructureSettings::UnstructuredRaw => None,
JsonStructureSettings::PartialUnstructuredByKey { fields, .. } => fields,
} {
let datatype = &ConcreteDataType::Struct(fields);
JsonFormat::Native(Box::new(datatype.into()))
} else {
JsonFormat::Native(Box::new(JsonNativeType::Null))
}
} else {
JsonFormat::Jsonb
};

View File

@@ -14,27 +14,30 @@
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};
use std::sync::Arc;
use common_catalog::consts::FILE_ENGINE;
use datatypes::data_type::ConcreteDataType;
use datatypes::json::JsonStructureSettings;
use datatypes::schema::{
FulltextOptions, SkippingIndexOptions, VectorDistanceMetric, VectorIndexEngineType,
VectorIndexOptions,
};
use datatypes::types::StructType;
use itertools::Itertools;
use serde::Serialize;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use sqlparser::ast::{ColumnOptionDef, DataType, Expr, Query};
use sqlparser_derive::{Visit, VisitMut};
use crate::ast::{ColumnDef, Ident, ObjectName, Value as SqlValue};
use crate::error::{
InvalidFlowQuerySnafu, InvalidSqlSnafu, Result, SetFulltextOptionSnafu,
SetSkippingIndexOptionSnafu,
InvalidFlowQuerySnafu, InvalidJsonStructureSettingSnafu, InvalidSqlSnafu, Result,
SetFulltextOptionSnafu, SetSkippingIndexOptionSnafu,
};
use crate::statements::OptionMap;
use crate::statements::statement::Statement;
use crate::statements::tql::Tql;
use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type};
use crate::util::OptionValue;
const LINE_SEP: &str = ",\n";
@@ -44,6 +47,7 @@ pub const VECTOR_OPT_DIM: &str = "dim";
pub const JSON_OPT_UNSTRUCTURED_KEYS: &str = "unstructured_keys";
pub const JSON_OPT_FORMAT: &str = "format";
pub(crate) const JSON_OPT_FIELDS: &str = "fields";
pub const JSON_FORMAT_FULL_STRUCTURED: &str = "structured";
pub const JSON_FORMAT_RAW: &str = "raw";
pub const JSON_FORMAT_PARTIAL: &str = "partial";
@@ -346,14 +350,51 @@ impl ColumnExtensions {
})
.unwrap_or_default();
let fields = if let Some(value) = options.value(JSON_OPT_FIELDS) {
let fields = value
.as_struct_fields()
.context(InvalidJsonStructureSettingSnafu {
reason: format!(r#"expect "{JSON_OPT_FIELDS}" a struct, actual: "{value}""#,),
})?;
let fields = fields
.iter()
.map(|field| {
let name = field.field_name.as_ref().map(|x| x.value.clone()).context(
InvalidJsonStructureSettingSnafu {
reason: format!(r#"missing field name in "{field}""#),
},
)?;
let datatype = sql_data_type_to_concrete_data_type(
&field.field_type,
&Default::default(),
)?;
Ok(datatypes::types::StructField::new(name, datatype, true))
})
.collect::<Result<_>>()?;
Some(StructType::new(Arc::new(fields)))
} else {
None
};
options
.get(JSON_OPT_FORMAT)
.map(|format| match format {
JSON_FORMAT_FULL_STRUCTURED => Ok(JsonStructureSettings::Structured(None)),
JSON_FORMAT_PARTIAL => Ok(JsonStructureSettings::PartialUnstructuredByKey {
fields: None,
unstructured_keys,
}),
JSON_FORMAT_FULL_STRUCTURED => Ok(JsonStructureSettings::Structured(fields)),
JSON_FORMAT_PARTIAL => {
let fields = fields.map(|fields| {
let mut fields = Arc::unwrap_or_clone(fields.fields());
fields.push(datatypes::types::StructField::new(
JsonStructureSettings::RAW_FIELD.to_string(),
ConcreteDataType::string_datatype(),
true,
));
StructType::new(Arc::new(fields))
});
Ok(JsonStructureSettings::PartialUnstructuredByKey {
fields,
unstructured_keys,
})
}
JSON_FORMAT_RAW => Ok(JsonStructureSettings::UnstructuredRaw),
_ => InvalidSqlSnafu {
msg: format!("unknown JSON datatype 'format': {format}"),

View File

@@ -19,7 +19,8 @@ use itertools::Itertools;
use serde::Serialize;
use snafu::ensure;
use sqlparser::ast::{
Array, Expr, Ident, ObjectName, SetExpr, SqlOption, TableFactor, Value, ValueWithSpan,
Array, Expr, Ident, ObjectName, SetExpr, SqlOption, StructField, TableFactor, Value,
ValueWithSpan,
};
use sqlparser_derive::{Visit, VisitMut};
@@ -52,9 +53,12 @@ pub fn format_raw_object_name(name: &ObjectName) -> String {
pub struct OptionValue(Expr);
impl OptionValue {
fn try_new(expr: Expr) -> Result<Self> {
pub(crate) fn try_new(expr: Expr) -> Result<Self> {
ensure!(
matches!(expr, Expr::Value(_) | Expr::Identifier(_) | Expr::Array(_)),
matches!(
expr,
Expr::Value(_) | Expr::Identifier(_) | Expr::Array(_) | Expr::Struct { .. }
),
InvalidExprAsOptionValueSnafu {
error: format!("{expr} not accepted")
}
@@ -106,6 +110,13 @@ impl OptionValue {
_ => None,
}
}
pub(crate) fn as_struct_fields(&self) -> Option<&[StructField]> {
match &self.0 {
Expr::Struct { fields, .. } => Some(fields),
_ => None,
}
}
}
impl From<String> for OptionValue {

View File

@@ -0,0 +1,10 @@
{"did":"did:plc:yj3sjq3blzpynh27cumnp5ks","time_us":1732206349000167,"kind":"commit","commit":{"rev":"3lbhtytnn2k2f","operation":"create","collection":"app.bsky.feed.post","rkey":"3lbhtyteurk2y","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.095Z","langs":["en"],"reply":{"parent":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"},"root":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"}},"text":"aaaaah. LIght shines in a corner of WTF...."},"cid":"bafyreidblutgvj75o4q4akzyyejedjj6l3it6hgqwee6jpwv2wqph5fsgm"}}
{"did":"did:plc:3i4xf2v4wcnyktgv6satke64","time_us":1732206349000644,"kind":"commit","commit":{"rev":"3lbhuvzds6d2a","operation":"create","collection":"app.bsky.feed.like","rkey":"3lbhuvzdked2a","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.221Z","subject":{"cid":"bafyreidjvrcmckkm765mct5fph36x7kupkfo35rjklbf2k76xkzwyiauge","uri":"at://did:plc:azrv4rcbws6kmcga4fsbphg2/app.bsky.feed.post/3lbgjdpbiec2l"}},"cid":"bafyreia5l5vrkh5oj4cjyhcqby2dprhyvcyofo2q5562tijlae2pzih23m"}}
{"did":"did:plc:gccfnqqizz4urhchsaie6jft","time_us":1732206349001108,"kind":"commit","commit":{"rev":"3lbhuvze3gi2u","operation":"create","collection":"app.bsky.graph.follow","rkey":"3lbhuvzdtmi2u","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:27:40.923Z","subject":"did:plc:r7cdh4sgzqbfdc6wcdxxti7c"},"cid":"bafyreiew2p6cgirfaj45qoenm4fgumib7xoloclrap3jgkz5es7g7kby3i"}}
{"did":"did:plc:msxqf3twq7abtdw7dbfskphk","time_us":1732206349001372,"kind":"commit","commit":{"rev":"3lbhueija5p22","operation":"create","collection":"app.bsky.feed.like","rkey":"3lbhueiizcx22","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:15:58.232Z","subject":{"cid":"bafyreiavpshyqzrlo5m7fqodjhs6jevweqnif4phasiwimv4a7mnsqi2fe","uri":"at://did:plc:fusulxqc52zbrc75fi6xrcof/app.bsky.feed.post/3lbhskq5zn22f"}},"cid":"bafyreidjix4dauj2afjlbzmhj3a7gwftcevvmmy6edww6vrjdbst26rkby"}}
{"did":"did:plc:l5o3qjrmfztir54cpwlv2eme","time_us":1732206349001905,"kind":"commit","commit":{"rev":"3lbhtytohxc2o","operation":"create","collection":"app.bsky.feed.post","rkey":"3lbhtytjqzk2q","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.254Z","langs":["en"],"reply":{"parent":{"cid":"bafyreih35fe2jj3gchmgk4amold4l6sfxd2sby5wrg3jrws5fkdypxrbg4","uri":"at://did:plc:6wx2gg5yqgvmlu35r6y3bk6d/app.bsky.feed.post/3lbhtj2eb4s2o"},"root":{"cid":"bafyreifipyt3vctd4ptuoicvio7rbr5xvjv4afwuggnd2prnmn55mu6luu","uri":"at://did:plc:474ldquxwzrlcvjhhbbk2wte/app.bsky.feed.post/3lbhdzrynik27"}},"text":"okay i take mine back because I hadnt heard this one yet^^"},"cid":"bafyreigzdsdne3z2xxcakgisieyj7y47hj6eg7lj6v4q25ah5q2qotu5ku"}}
{"did":"did:plc:jkaaf5j2yb2pvpx3ualm3vbh","time_us":1732206349002758,"kind":"commit","commit":{"rev":"3lbhudfo3yi2w","operation":"create","collection":"app.bsky.graph.follow","rkey":"3lbhudfnw4y2w","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:15:21.495Z","subject":"did:plc:amsdn2tbjxo3xrwqneqhh4cm"},"cid":"bafyreiaa2vsdr4ckwjg4jq47zfd7mewidywfz3qh3dmglcd6ozi4xwdega"}}
{"did":"did:plc:tdwz2h4id5dxezvohftsmffu","time_us":1732206349003106,"kind":"commit","commit":{"rev":"3lbhujcp4ix2n","operation":"create","collection":"app.bsky.graph.follow","rkey":"3lbhujcoxmp2n","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:18:39.913Z","subject":"did:plc:gf3vum7insztt5rxrpxdz2id"},"cid":"bafyreihaatlpar3abtx6ck3kde2ksic6zzflk4ppduhf6dxurytqrv33ni"}}
{"did":"did:plc:cdsd346mwow7aj3tgfkwsct3","time_us":1732206349003461,"kind":"commit","commit":{"rev":"3lbhus5vior2t","operation":"create","collection":"app.bsky.feed.repost","rkey":"3lbhus5vbtz2t","record":{"$type":"app.bsky.feed.repost","createdAt":"2024-11-21T16:23:36.714Z","subject":{"cid":"bafyreieaacfiobnuqvjhhsndyi5s3fd6krbzdduxsyrzfv43kczpcmkl6y","uri":"at://did:plc:o5q6dynpme4ndolc3heztasm/app.bsky.feed.post/3lbfli3qsoc2o"}},"cid":"bafyreid5ycocp5zq2g7fcx2xxzxrbafuh7b5qhtwuwiomzo6vqila2cbpu"}}
{"did":"did:plc:s4bwqchfzm6gjqfeb6mexgbu","time_us":1732206349003907,"kind":"commit","commit":{"rev":"3lbhuvzeccx2w","operation":"create","collection":"app.bsky.feed.post","rkey":"3lbhuvxf4qs2m","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:25:44.376Z","langs":["en"],"reply":{"parent":{"cid":"bafyreiaev27cfcxxvn2pdhrwwquzwgclujnulzbcfnn4p4fwgb6migjhw4","uri":"at://did:plc:zec6cslvgc3hhdatrhk6pq5p/app.bsky.feed.post/3lbhujvds4c2b"},"root":{"cid":"bafyreif7qjxhvecwnhlynijj6pf47jwvtkahsz3zh2kaipwu2bw2dxwaqq","uri":"at://did:plc:s4bwqchfzm6gjqfeb6mexgbu/app.bsky.feed.post/3lbhug53kkk2m"}},"text":"\n⌜ Blinking. She hadn't realized she spoke out loud. ⌟\n\n It was nothing like that — . I was only thinking . . . \n\n⌜ Trailing off, her mind occupied. ⌟\n"},"cid":"bafyreibugobcike72y4zxvdyz2oopyt6ywwqfielcwojkb27p7s6rlomgm"}}
{"did":"did:plc:hbc74dlsxhq53kp5oxges6d7","time_us":1732206349004769,"kind":"commit","commit":{"rev":"3lbhuvzedg52j","operation":"create","collection":"app.bsky.feed.like","rkey":"3lbhuvzdyof2j","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.167Z","subject":{"cid":"bafyreiaumopip75nzx2xjbugtwemdppsyx54bd2odf6q45f3o7xkocgari","uri":"at://did:plc:ig2jv6gqup4t7gdq2pmanknw/app.bsky.feed.post/3lbhuvtlaec2c"}},"cid":"bafyreidjk2svg2fdjiiwohmfmvp3hdxhpb33ycnixzbkyib5m6cocindxq"}}

View File

@@ -0,0 +1,14 @@
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+
| data | ts |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+
| {_raw: {"commit":{"cid":"bafyreidblutgvj75o4q4akzyyejedjj6l3it6hgqwee6jpwv2wqph5fsgm","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.095Z","langs":["en"],"reply":{"parent":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"},"root":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"}},"text":"aaaaah. LIght shines in a corner of WTF...."},"rev":"3lbhtytnn2k2f","rkey":"3lbhtyteurk2y"}}, commit.collection: app.bsky.feed.post, commit.operation: create, did: did:plc:yj3sjq3blzpynh27cumnp5ks, kind: commit, time_us: 1732206349000167} | 1970-01-01T00:00:00.001 |
| {_raw: {"commit":{"cid":"bafyreia5l5vrkh5oj4cjyhcqby2dprhyvcyofo2q5562tijlae2pzih23m","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.221Z","subject":{"cid":"bafyreidjvrcmckkm765mct5fph36x7kupkfo35rjklbf2k76xkzwyiauge","uri":"at://did:plc:azrv4rcbws6kmcga4fsbphg2/app.bsky.feed.post/3lbgjdpbiec2l"}},"rev":"3lbhuvzds6d2a","rkey":"3lbhuvzdked2a"}}, commit.collection: app.bsky.feed.like, commit.operation: create, did: did:plc:3i4xf2v4wcnyktgv6satke64, kind: commit, time_us: 1732206349000644} | 1970-01-01T00:00:00.002 |
| {_raw: {"commit":{"cid":"bafyreiew2p6cgirfaj45qoenm4fgumib7xoloclrap3jgkz5es7g7kby3i","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:27:40.923Z","subject":"did:plc:r7cdh4sgzqbfdc6wcdxxti7c"},"rev":"3lbhuvze3gi2u","rkey":"3lbhuvzdtmi2u"}}, commit.collection: app.bsky.graph.follow, commit.operation: create, did: did:plc:gccfnqqizz4urhchsaie6jft, kind: commit, time_us: 1732206349001108} | 1970-01-01T00:00:00.003 |
| {_raw: {"commit":{"cid":"bafyreidjix4dauj2afjlbzmhj3a7gwftcevvmmy6edww6vrjdbst26rkby","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:15:58.232Z","subject":{"cid":"bafyreiavpshyqzrlo5m7fqodjhs6jevweqnif4phasiwimv4a7mnsqi2fe","uri":"at://did:plc:fusulxqc52zbrc75fi6xrcof/app.bsky.feed.post/3lbhskq5zn22f"}},"rev":"3lbhueija5p22","rkey":"3lbhueiizcx22"}}, commit.collection: app.bsky.feed.like, commit.operation: create, did: did:plc:msxqf3twq7abtdw7dbfskphk, kind: commit, time_us: 1732206349001372} | 1970-01-01T00:00:00.004 |
| {_raw: {"commit":{"cid":"bafyreigzdsdne3z2xxcakgisieyj7y47hj6eg7lj6v4q25ah5q2qotu5ku","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.254Z","langs":["en"],"reply":{"parent":{"cid":"bafyreih35fe2jj3gchmgk4amold4l6sfxd2sby5wrg3jrws5fkdypxrbg4","uri":"at://did:plc:6wx2gg5yqgvmlu35r6y3bk6d/app.bsky.feed.post/3lbhtj2eb4s2o"},"root":{"cid":"bafyreifipyt3vctd4ptuoicvio7rbr5xvjv4afwuggnd2prnmn55mu6luu","uri":"at://did:plc:474ldquxwzrlcvjhhbbk2wte/app.bsky.feed.post/3lbhdzrynik27"}},"text":"okay i take mine back because I hadnt heard this one yet^^"},"rev":"3lbhtytohxc2o","rkey":"3lbhtytjqzk2q"}}, commit.collection: app.bsky.feed.post, commit.operation: create, did: did:plc:l5o3qjrmfztir54cpwlv2eme, kind: commit, time_us: 1732206349001905} | 1970-01-01T00:00:00.005 |
| {_raw: {"commit":{"cid":"bafyreiaa2vsdr4ckwjg4jq47zfd7mewidywfz3qh3dmglcd6ozi4xwdega","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:15:21.495Z","subject":"did:plc:amsdn2tbjxo3xrwqneqhh4cm"},"rev":"3lbhudfo3yi2w","rkey":"3lbhudfnw4y2w"}}, commit.collection: app.bsky.graph.follow, commit.operation: create, did: did:plc:jkaaf5j2yb2pvpx3ualm3vbh, kind: commit, time_us: 1732206349002758} | 1970-01-01T00:00:00.006 |
| {_raw: {"commit":{"cid":"bafyreihaatlpar3abtx6ck3kde2ksic6zzflk4ppduhf6dxurytqrv33ni","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:18:39.913Z","subject":"did:plc:gf3vum7insztt5rxrpxdz2id"},"rev":"3lbhujcp4ix2n","rkey":"3lbhujcoxmp2n"}}, commit.collection: app.bsky.graph.follow, commit.operation: create, did: did:plc:tdwz2h4id5dxezvohftsmffu, kind: commit, time_us: 1732206349003106} | 1970-01-01T00:00:00.007 |
| {_raw: {"commit":{"cid":"bafyreid5ycocp5zq2g7fcx2xxzxrbafuh7b5qhtwuwiomzo6vqila2cbpu","record":{"$type":"app.bsky.feed.repost","createdAt":"2024-11-21T16:23:36.714Z","subject":{"cid":"bafyreieaacfiobnuqvjhhsndyi5s3fd6krbzdduxsyrzfv43kczpcmkl6y","uri":"at://did:plc:o5q6dynpme4ndolc3heztasm/app.bsky.feed.post/3lbfli3qsoc2o"}},"rev":"3lbhus5vior2t","rkey":"3lbhus5vbtz2t"}}, commit.collection: app.bsky.feed.repost, commit.operation: create, did: did:plc:cdsd346mwow7aj3tgfkwsct3, kind: commit, time_us: 1732206349003461} | 1970-01-01T00:00:00.008 |
| {_raw: {"commit":{"cid":"bafyreibugobcike72y4zxvdyz2oopyt6ywwqfielcwojkb27p7s6rlomgm","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:25:44.376Z","langs":["en"],"reply":{"parent":{"cid":"bafyreiaev27cfcxxvn2pdhrwwquzwgclujnulzbcfnn4p4fwgb6migjhw4","uri":"at://did:plc:zec6cslvgc3hhdatrhk6pq5p/app.bsky.feed.post/3lbhujvds4c2b"},"root":{"cid":"bafyreif7qjxhvecwnhlynijj6pf47jwvtkahsz3zh2kaipwu2bw2dxwaqq","uri":"at://did:plc:s4bwqchfzm6gjqfeb6mexgbu/app.bsky.feed.post/3lbhug53kkk2m"}},"text":"\n⌜ Blinking. She hadn't realized she spoke out loud. ⌟\n\n It was nothing like that — . I was only thinking . . . \n\n⌜ Trailing off, her mind occupied. ⌟\n"},"rev":"3lbhuvzeccx2w","rkey":"3lbhuvxf4qs2m"}}, commit.collection: app.bsky.feed.post, commit.operation: create, did: did:plc:s4bwqchfzm6gjqfeb6mexgbu, kind: commit, time_us: 1732206349003907} | 1970-01-01T00:00:00.009 |
| {_raw: {"commit":{"cid":"bafyreidjk2svg2fdjiiwohmfmvp3hdxhpb33ycnixzbkyib5m6cocindxq","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.167Z","subject":{"cid":"bafyreiaumopip75nzx2xjbugtwemdppsyx54bd2odf6q45f3o7xkocgari","uri":"at://did:plc:ig2jv6gqup4t7gdq2pmanknw/app.bsky.feed.post/3lbhuvtlaec2c"}},"rev":"3lbhuvzedg52j","rkey":"3lbhuvzdyof2j"}}, commit.collection: app.bsky.feed.like, commit.operation: create, did: did:plc:hbc74dlsxhq53kp5oxges6d7, kind: commit, time_us: 1732206349004769} | 1970-01-01T00:00:00.010 |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+

View File

@@ -62,7 +62,6 @@ mod test {
use common_meta::rpc::router::region_distribution;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_test_util::recordbatch::check_output_stream;
use frontend::instance::Instance;
use query::parser::QueryLanguageParser;
use query::query_engine::DefaultSerializer;
@@ -76,11 +75,10 @@ mod test {
use super::*;
use crate::standalone::GreptimeDbStandaloneBuilder;
use crate::test_util::execute_sql_and_expect;
use crate::tests;
use crate::tests::MockDistributedInstance;
use crate::tests::test_util::{
MockInstance, both_instances_cases, distributed, execute_sql, standalone,
};
use crate::tests::test_util::{MockInstance, both_instances_cases, distributed, standalone};
#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_handle_ddl_request() {
@@ -1242,8 +1240,7 @@ CREATE TABLE {table_name} (
.unwrap();
assert!(matches!(output.data, OutputData::AffectedRows(3)));
let output = execute_sql(&frontend, "show create table auto_created_table").await;
let sql = "show create table auto_created_table";
let expected = r#"+--------------------+---------------------------------------------------+
| Table | Create Table |
+--------------------+---------------------------------------------------+
@@ -1261,6 +1258,6 @@ CREATE TABLE {table_name} (
| | 'compaction.type' = 'twcs' |
| | ) |
+--------------------+---------------------------------------------------+"#;
check_output_stream(output.data, expected).await;
execute_sql_and_expect(&frontend, sql, expected).await;
}
}

View File

@@ -24,6 +24,7 @@ use common_base::Plugins;
use common_config::Configurable;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
use common_query::Output;
use common_runtime::runtime::BuilderBuild;
use common_runtime::{Builder as RuntimeBuilder, Runtime};
use common_test_util::ports;
@@ -774,3 +775,16 @@ pub(crate) async fn prepare_another_catalog_and_schema(instance: &Instance) {
.await
.unwrap();
}
pub async fn execute_sql(instance: &Arc<Instance>, sql: &str) -> Output {
SqlQueryHandler::do_query(instance.as_ref(), sql, QueryContext::arc())
.await
.remove(0)
.unwrap()
}
pub async fn execute_sql_and_expect(instance: &Arc<Instance>, sql: &str, expected: &str) {
let output = execute_sql(instance, sql).await;
let output = output.data.pretty_print().await;
assert_eq!(output, expected.trim());
}

View File

@@ -18,7 +18,7 @@ mod instance_noop_wal_test;
mod instance_test;
mod promql_test;
mod reconcile_table;
pub(crate) mod test_util;
pub mod test_util;
use std::collections::HashMap;
use std::sync::Arc;

View File

@@ -27,8 +27,8 @@ use store_api::storage::RegionId;
use table::metadata::TableId;
use crate::cluster::GreptimeDbClusterBuilder;
use crate::test_util::{StorageType, TempDirGuard, get_test_store_config};
use crate::tests::test_util::{MockInstanceBuilder, TestContext, execute_sql, wait_procedure};
use crate::test_util::{StorageType, TempDirGuard, execute_sql, get_test_store_config};
use crate::tests::test_util::{MockInstanceBuilder, TestContext, wait_procedure};
/// Helper function to get table route information for GC procedure
async fn get_table_route(

View File

@@ -18,9 +18,8 @@ use common_test_util::recordbatch::check_output_stream;
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use crate::cluster::GreptimeDbClusterBuilder;
use crate::tests::test_util::{
MockInstanceBuilder, RebuildableMockInstance, TestContext, execute_sql,
};
use crate::test_util::execute_sql;
use crate::tests::test_util::{MockInstanceBuilder, RebuildableMockInstance, TestContext};
pub(crate) async fn distributed_with_noop_wal() -> TestContext {
common_telemetry::init_default_ut_logging();

View File

@@ -23,9 +23,10 @@ use common_test_util::recordbatch::check_output_stream;
use table::table_reference::TableReference;
use crate::cluster::GreptimeDbClusterBuilder;
use crate::test_util::execute_sql;
use crate::tests::test_util::{
MockInstanceBuilder, RebuildableMockInstance, TestContext, dump_kvbackend, execute_sql,
restore_kvbackend, try_execute_sql, wait_procedure,
MockInstanceBuilder, RebuildableMockInstance, TestContext, dump_kvbackend, restore_kvbackend,
try_execute_sql, wait_procedure,
};
const CREATE_MONITOR_TABLE_SQL: &str = r#"

View File

@@ -439,10 +439,6 @@ pub fn find_testing_resource(path: &str) -> String {
prepare_path(&p)
}
pub async fn execute_sql(instance: &Arc<Instance>, sql: &str) -> Output {
execute_sql_with(instance, sql, QueryContext::arc()).await
}
pub async fn try_execute_sql(instance: &Arc<Instance>, sql: &str) -> Result<Output> {
try_execute_sql_with(instance, sql, QueryContext::arc()).await
}
@@ -455,16 +451,6 @@ pub async fn try_execute_sql_with(
instance.do_query(sql, query_ctx).await.remove(0)
}
pub async fn execute_sql_with(
instance: &Arc<Instance>,
sql: &str,
query_ctx: QueryContextRef,
) -> Output {
try_execute_sql_with(instance, sql, query_ctx)
.await
.unwrap_or_else(|e| panic!("Failed to execute sql: {sql}, error: {e:?}"))
}
/// Dump the kv backend to a vector of key-value pairs.
pub async fn dump_kvbackend(kv_backend: &KvBackendRef) -> Vec<(Vec<u8>, Vec<u8>)> {
let req = RangeRequest::new().with_range(vec![0], vec![0]);

View File

@@ -0,0 +1,111 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::io::BufRead;
use std::sync::Arc;
use std::{fs, io};
use common_test_util::find_workspace_path;
use frontend::instance::Instance;
use tests_integration::standalone::GreptimeDbStandaloneBuilder;
use tests_integration::test_util::execute_sql_and_expect;
#[tokio::test]
async fn test_load_jsonbench_data() {
common_telemetry::init_default_ut_logging();
let instance = GreptimeDbStandaloneBuilder::new("test_load_jsonbench_data")
.build()
.await;
let frontend = instance.fe_instance();
create_table(frontend).await;
desc_table(frontend).await;
insert_data(frontend).await.unwrap();
query_data(frontend).await.unwrap();
}
async fn query_data(frontend: &Arc<Instance>) -> io::Result<()> {
let sql = "SELECT count(*) FROM bluesky";
let expected = r#"
+----------+
| count(*) |
+----------+
| 10 |
+----------+
"#;
execute_sql_and_expect(frontend, sql, expected).await;
let sql = "SELECT * FROM bluesky ORDER BY ts";
let expected = fs::read_to_string(find_workspace_path(
"tests-integration/resources/jsonbench-select-all.txt",
))?;
execute_sql_and_expect(frontend, sql, &expected).await;
Ok(())
}
async fn insert_data(frontend: &Arc<Instance>) -> io::Result<()> {
let file = fs::File::open(find_workspace_path(
"tests-integration/resources/jsonbench-head-10.ndjson",
))?;
let reader = io::BufReader::new(file);
for (i, line) in reader.lines().enumerate() {
let line = line?;
if line.is_empty() {
continue;
}
let sql = format!(
"INSERT INTO bluesky (ts, data) VALUES ({}, '{}')",
i + 1,
line.replace("'", "''"), // standard method to escape the single quote
);
execute_sql_and_expect(frontend, &sql, "Affected Rows: 1").await;
}
Ok(())
}
async fn desc_table(frontend: &Arc<Instance>) {
let sql = "DESC TABLE bluesky";
let expected = r#"
+--------+----------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+--------+----------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| data | Json<Object{"_raw": String, "commit.collection": String, "commit.operation": String, "did": String, "kind": String, "time_us": Number(I64)}> | | YES | | FIELD |
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
+--------+----------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+"#;
execute_sql_and_expect(frontend, sql, expected).await;
}
async fn create_table(frontend: &Arc<Instance>) {
let sql = r#"
CREATE TABLE bluesky (
"data" JSON (
format = "partial",
fields = Struct<
kind String,
"commit.operation" String,
"commit.collection" String,
did String,
time_us Bigint
>,
),
ts Timestamp TIME INDEX,
)
"#;
execute_sql_and_expect(frontend, sql, "Affected Rows: 0").await;
}

View File

@@ -16,6 +16,7 @@
mod grpc;
#[macro_use]
mod http;
mod jsonbench;
#[macro_use]
mod sql;
#[macro_use]

View File

@@ -15,5 +15,6 @@ extend-exclude = [
"*.sql",
"*.result",
"src/pipeline/benches/data.log",
"cyborg/pnpm-lock.yaml"
"cyborg/pnpm-lock.yaml",
"tests-integration/resources/*"
]