(?) data-driven concretize
(?) select
(?) compaction

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
luofucong
2026-05-21 19:54:00 +08:00
parent 4668dd43bd
commit 2d430091f4
17 changed files with 390 additions and 114 deletions

View File

@@ -355,7 +355,7 @@ pub fn merge_record_batches(schema: SchemaRef, batches: &[RecordBatch]) -> Resul
Ok(RecordBatch::from_df_record_batch(schema, record_batch))
}
fn maybe_align_json_array_with_schema(
pub fn maybe_align_json_array_with_schema(
schema: &ArrowSchemaRef,
arrays: Vec<ArrayRef>,
) -> Result<Vec<ArrayRef>> {

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![recursion_limit = "256"]
pub mod alive_keeper;
pub mod config;
pub mod datanode;

View File

@@ -26,12 +26,12 @@ use std::sync::Arc;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value as Json};
use snafu::{OptionExt, ResultExt, ensure};
use snafu::{OptionExt, ResultExt};
use crate::error::{self, InvalidJsonSnafu, Result, SerializeSnafu};
use crate::json::value::{JsonValue, JsonVariant};
use crate::types::json_type::{JsonNativeType, JsonNumberType, JsonObjectType};
use crate::types::{StructField, StructType};
use crate::types::{JsonType, StructField, StructType};
use crate::value::{ListValue, StructValue, Value};
/// The configuration of JSON encoding
@@ -305,32 +305,45 @@ fn encode_json_array_with_context<'a>(
) -> Result<JsonValue> {
let json_array_len = json_array.len();
let mut items = Vec::with_capacity(json_array_len);
let mut element_type = item_type.cloned();
for (index, value) in json_array.into_iter().enumerate() {
let array_context = context.with_key(&index.to_string());
let item_value =
encode_json_value_with_context(value, element_type.as_ref(), &array_context)?;
let item_type = item_value.json_type().native_type().clone();
items.push(item_value.into_variant());
let item_value = encode_json_value_with_context(value, None, &array_context)?;
items.push(item_value);
}
// Determine the common type for the list
if let Some(current_type) = &element_type {
// It's valid for json array to have different types of items, for example,
// ["a string", 1]. However, the `JsonValue` will be converted to Arrow list array,
// which requires all items have exactly same type. So we forbid the different types
// case here. Besides, it's not common for items in a json array to differ. So I think
// we are good here.
ensure!(
item_type == *current_type,
error::InvalidJsonSnafu {
value: "all items in json array must have the same type"
}
);
} else {
element_type = Some(item_type);
// In specification, it's valid for a JSON array to have different types of items, for example,
// ["a string", 1]. However, in implementation, the `JsonValue` will be converted to Arrow list
// array, which requires all items have exactly the same type. So we merge out the maybe
// different item types to a unified type, and align all the item values to it.
let provided_item_type = item_type.map(|x| JsonType::new_json2(x.clone()));
let merged_item_type = if let Some((first, rests)) = items.split_first() {
let mut merged = first.json_type().clone();
for rest in rests.iter().map(|x| x.json_type()) {
merged.merge(rest)?;
}
Some(merged)
} else {
None
};
let unified_item_type = match (provided_item_type, merged_item_type) {
(Some(mut x), Some(y)) => {
x.merge(&y)?;
Some(x)
}
(Some(x), None) | (None, Some(x)) => Some(x),
(None, None) => None,
};
if let Some(unified_item_type) = unified_item_type {
for item in &mut items {
item.try_align(&unified_item_type)?;
}
}
let items = items
.into_iter()
.map(|x| x.into_variant())
.collect::<Vec<_>>();
Ok(JsonValue::new(JsonVariant::Array(items)))
}
@@ -729,7 +742,7 @@ where
#[cfg(test)]
mod tests {
use common_base::bytes::Bytes;
use serde_json::json;
use super::*;
@@ -1050,11 +1063,21 @@ mod tests {
fn test_encode_json_array_mixed_types() {
let json = json!([1, "hello", true, 3.15]);
let settings = JsonStructureSettings::Structured(None);
let result = settings.encode_with_type(json, None);
assert_eq!(
result.unwrap_err().to_string(),
"Invalid JSON: all items in json array must have the same type"
);
let result = settings
.encode_with_type(json, None)
.unwrap()
.into_json_inner()
.unwrap();
if let Value::List(list_value) = result {
assert_eq!(list_value.items().len(), 4);
assert_eq!(
list_value.datatype(),
Arc::new(ConcreteDataType::binary_datatype())
);
} else {
panic!("Expected List value");
}
}
#[test]
@@ -1276,12 +1299,12 @@ mod tests {
#[test]
fn test_encode_json_array_with_item_type() {
let json = json!([1, 2, 3]);
let item_type = Arc::new(ConcreteDataType::uint64_datatype());
let item_type = Arc::new(ConcreteDataType::int64_datatype());
let settings = JsonStructureSettings::Structured(None);
let result = settings
.encode_with_type(
json,
Some(&JsonNativeType::Array(Box::new(JsonNativeType::u64()))),
Some(&JsonNativeType::Array(Box::new(JsonNativeType::i64()))),
)
.unwrap()
.into_json_inner()
@@ -1289,9 +1312,9 @@ mod tests {
if let Value::List(list_value) = result {
assert_eq!(list_value.items().len(), 3);
assert_eq!(list_value.items()[0], Value::UInt64(1));
assert_eq!(list_value.items()[1], Value::UInt64(2));
assert_eq!(list_value.items()[2], Value::UInt64(3));
assert_eq!(list_value.items()[0], Value::Int64(1));
assert_eq!(list_value.items()[1], Value::Int64(2));
assert_eq!(list_value.items()[2], Value::Int64(3));
assert_eq!(list_value.datatype(), item_type);
} else {
panic!("Expected List value");
@@ -2249,11 +2272,32 @@ mod tests {
)])),
);
let decoded_struct = settings.decode_struct(array_struct);
assert_eq!(
decoded_struct.unwrap_err().to_string(),
"Invalid JSON: all items in json array must have the same type"
);
let decoded_struct = settings.decode_struct(array_struct).unwrap();
let fields = decoded_struct.struct_type().fields();
let decoded_fields: Vec<&str> = fields.iter().map(|f| f.name()).collect();
assert!(decoded_fields.contains(&"value"));
if let Value::List(list_value) = &decoded_struct.items()[0] {
assert_eq!(list_value.items().len(), 4);
assert_eq!(
list_value.items()[0],
Value::Binary(Bytes::from("1".as_bytes()))
);
assert_eq!(
list_value.items()[1],
Value::Binary(Bytes::from(r#""hello""#.as_bytes()))
);
assert_eq!(
list_value.items()[2],
Value::Binary(Bytes::from("true".as_bytes()))
);
assert_eq!(
list_value.items()[3],
Value::Binary(Bytes::from("3.15".as_bytes()))
);
} else {
panic!("Expected array to be decoded as ListValue");
}
}
#[test]

View File

@@ -161,12 +161,18 @@ impl JsonVariant {
};
JsonNativeType::Array(Box::new(item_type))
}
JsonVariant::Object(object) => JsonNativeType::Object(
object
.iter()
.map(|(k, v)| (k.clone(), v.native_type()))
.collect(),
),
JsonVariant::Object(object) => {
if object.is_empty() {
JsonNativeType::Null
} else {
JsonNativeType::Object(
object
.iter()
.map(|(k, v)| (k.clone(), v.native_type()))
.collect(),
)
}
}
JsonVariant::Variant(_) => JsonNativeType::Variant,
}
}
@@ -642,12 +648,18 @@ impl JsonVariantRef<'_> {
};
JsonNativeType::Array(Box::new(item_type))
}
JsonVariantRef::Object(object) => JsonNativeType::Object(
object
.iter()
.map(|(k, v)| (k.to_string(), native_type(v)))
.collect(),
),
JsonVariantRef::Object(object) => {
if object.is_empty() {
JsonNativeType::Null
} else {
JsonNativeType::Object(
object
.iter()
.map(|(k, v)| (k.to_string(), native_type(v)))
.collect(),
)
}
}
JsonVariantRef::Variant(_) => JsonNativeType::Variant,
}
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::borrow::Cow;
use std::collections::BTreeMap;
use std::fmt::{Debug, Display, Formatter};
use std::str::FromStr;
@@ -115,6 +116,14 @@ impl JsonNativeType {
(JsonNativeType::Null, that) => that.clone(),
(this, JsonNativeType::Null) => this,
(this, that) if this == *that => this,
// (JsonNativeType::Number(x), JsonNativeType::Number(y)) => {
// JsonNativeType::Number(match (x, y) {
// (x, y) if x == y => *x,
// (JsonNumberType::F64, _) | (_, JsonNumberType::F64) => JsonNumberType::F64,
// _ => JsonNumberType::I64,
// })
// }
_ => JsonNativeType::Variant,
};
}
@@ -380,6 +389,23 @@ fn plain_json_struct_type(data_type: &ArrowDataType) -> StructType {
StructType::new(Arc::new(vec![field]))
}
pub fn merge_as_json_type<'a>(
left: &'a ArrowDataType,
right: &ArrowDataType,
) -> Cow<'a, ArrowDataType> {
if left == right {
return Cow::Borrowed(left);
}
let mut left = JsonType::from(left);
let right = JsonType::from(right);
Cow::Owned(if left.merge(&right).is_ok() {
left.as_arrow_type()
} else {
ArrowDataType::Utf8
})
}
impl From<&ArrowDataType> for JsonType {
fn from(t: &ArrowDataType) -> Self {
JsonType::new_json2(JsonNativeType::from(&ConcreteDataType::from_arrow_type(t)))
@@ -811,18 +837,15 @@ mod tests {
Ok(r#""<Bool>""#),
)?;
// Identical number categories should stay as Number.
test(
"1",
&mut JsonType::new_json2(JsonNativeType::i64()),
Ok(r#""<Number>""#),
)?;
// Conflicting number categories should be lifted to Variant.
test(
"1.5",
&mut JsonType::new_json2(JsonNativeType::i64()),
Ok(r#""<Variant>""#),
Ok(r#""<Number>""#),
)?;
// Object merge should preserve existing fields and append missing fields.

View File

@@ -24,9 +24,10 @@ use arrow_schema::{DataType, FieldRef};
use serde_json::Value;
use snafu::{OptionExt, ResultExt};
use crate::arrow_array::{StringArray, binary_array_value, string_array_value};
use crate::arrow_array::{MutableBinaryArray, StringArray, binary_array_value, string_array_value};
use crate::error::{
AlignJsonArraySnafu, ArrowComputeSnafu, DeserializeSnafu, InvalidJsonSnafu, Result,
SerializeSnafu,
};
pub struct JsonArray<'a> {
@@ -177,7 +178,39 @@ impl JsonArray<'_> {
Ok(Arc::new(json_array))
}
fn try_decode_variant(&self) -> Result<ArrayRef> {
let json_values = (0..self.inner.len())
.map(|i| self.try_get_value(i))
.collect::<Result<Vec<_>>>()?;
let serialized_values = json_values
.iter()
.map(|value| {
(!value.is_null())
.then(|| serde_json::to_vec(value))
.transpose()
})
.collect::<std::result::Result<Vec<_>, _>>()
.context(SerializeSnafu)?;
let total_bytes = serialized_values.iter().flatten().map(Vec::len).sum();
let mut builder = MutableBinaryArray::with_capacity(self.inner.len(), total_bytes);
for serialized_value in serialized_values {
if let Some(bytes) = serialized_value {
builder.append_value(bytes);
} else {
builder.append_null();
}
}
Ok(Arc::new(builder.finish()))
}
fn try_cast(&self, to_type: &DataType) -> Result<ArrayRef> {
if matches!(to_type, DataType::Binary) {
return self.try_decode_variant();
}
if compute::can_cast_types(self.inner.data_type(), to_type) {
return compute::cast(&self.inner, to_type).context(ArrowComputeSnafu);
}

View File

@@ -567,14 +567,16 @@ impl RegionFlushTask {
write_opts: &WriteOptions,
mem_ranges: MemtableRanges,
) -> Result<FlushFlatMemResult> {
let batch_schema = to_flat_sst_arrow_schema(
&version.metadata,
&FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding),
);
// let memtable_schema = mem_ranges
// .schema()
// .unwrap_or_else(|| version.metadata.schema.arrow_schema().clone());
let options = FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding);
let batch_schema = to_flat_sst_arrow_schema(&version.metadata, &options);
let field_column_start =
flat_format::field_column_start(&version.metadata, batch_schema.fields().len());
let flat_sources = memtable_flat_sources(
batch_schema,
batch_schema.clone(),
mem_ranges,
&version.options,
field_column_start,

View File

@@ -14,6 +14,7 @@
//! Memtables are write buffers for regions.
use std::borrow::Cow;
use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
@@ -60,6 +61,10 @@ pub use bulk::part::{
BulkPart, BulkPartEncoder, BulkPartMeta, UnorderedPart, record_batch_estimated_size,
sort_primary_key_record_batch,
};
use datatypes::arrow::datatypes::{Schema, SchemaRef};
use datatypes::extension::json;
use datatypes::schema::ext::ArrowSchemaExt;
use datatypes::types::json_type;
#[cfg(any(test, feature = "test"))]
pub use time_partition::filter_record_batch;
@@ -217,6 +222,56 @@ impl MemtableRanges {
.max()
.unwrap_or(0)
}
#[expect(unused)]
pub(crate) fn schema(&self) -> Option<SchemaRef> {
let mut schemas = self
.ranges
.values()
.filter_map(|x| x.record_batch_schema())
.collect::<Vec<_>>();
if schemas.iter().all(|x| !x.has_json_extension_field()) {
// If there are no JSON extension fields in any schemas, the invariant must be hold,
// that all schemas are same (they are all derived from same region metadata).
// So it's ok to return the first one as the schema of the whole memtable ranges.
return (!schemas.is_empty()).then(|| schemas.swap_remove(0));
}
// If there are JSON extension fields, by convention, only their concrete data types
// (Arrow's Struct) may differ. Other things like the metadata or the fields count are same.
// So to produce the final schema, we can solely merge the data types.
schemas
.split_first()
.map(|(first, rest)| merge_json_extension_fields(first, rest))
}
}
pub(crate) fn merge_json_extension_fields(base: &SchemaRef, others: &[SchemaRef]) -> SchemaRef {
let mut fields = base.fields().iter().cloned().collect::<Vec<_>>();
for (i, field) in fields.iter_mut().enumerate() {
if !json::is_json_extension_type(field) {
continue;
}
let merged = others
.iter()
.map(|x| Cow::Borrowed(x.field(i).data_type()))
.reduce(|acc, e| {
Cow::Owned(json_type::merge_as_json_type(acc.as_ref(), e.as_ref()).into_owned())
});
if let Some(merged) = merged
&& field.data_type() != merged.as_ref()
{
let merged =
json_type::merge_as_json_type(field.data_type(), merged.as_ref()).into_owned();
let mut new = field.as_ref().clone();
new.set_data_type(merged);
*field = Arc::new(new);
}
}
Arc::new(Schema::new_with_metadata(fields, base.metadata().clone()))
}
impl IterBuilder for MemtableRanges {
@@ -557,6 +612,11 @@ pub trait IterBuilder: Send + Sync {
.fail()
}
/// Returns the schema of record batches produced by this iterator.
fn record_batch_schema(&self) -> Option<SchemaRef> {
None
}
/// Returns the [EncodedRange] if the range is already encoded into SST.
fn encoded_range(&self) -> Option<EncodedRange> {
None
@@ -734,6 +794,11 @@ impl MemtableRange {
self.context.builder.is_record_batch()
}
/// Returns the schema of record batches if this range supports record batch iteration.
pub fn record_batch_schema(&self) -> Option<SchemaRef> {
self.context.builder.record_batch_schema()
}
pub fn num_rows(&self) -> usize {
self.stats.num_rows
}

View File

@@ -835,6 +835,10 @@ impl IterBuilder for BulkRangeIterBuilder {
fn encoded_range(&self) -> Option<EncodedRange> {
None
}
fn record_batch_schema(&self) -> Option<SchemaRef> {
Some(self.part.batch.schema())
}
}
impl IterBuilder for MultiBulkRangeIterBuilder {
@@ -867,6 +871,10 @@ impl IterBuilder for MultiBulkRangeIterBuilder {
fn encoded_range(&self) -> Option<EncodedRange> {
None
}
fn record_batch_schema(&self) -> Option<SchemaRef> {
self.part.record_batch_schema()
}
}
/// Iterator builder for encoded bulk range

View File

@@ -433,7 +433,7 @@ impl UnorderedPart {
return Ok(Some(self.parts[0].batch.clone()));
}
// Get the schema from the first part
// Get the schema from the first part and normalize JSON2 columns across all parts.
let schema = self.parts[0].batch.schema();
let concatenated = if schema.has_json_extension_field() {
let (schema, batches) = align_parts(&self.parts)?;
@@ -1608,6 +1608,11 @@ impl MultiBulkPart {
self.series_count
}
/// Returns the schema of batches in this part.
pub(crate) fn record_batch_schema(&self) -> Option<SchemaRef> {
self.batches.first().map(|batch| batch.schema())
}
/// Returns the number of record batches in this part.
pub fn num_batches(&self) -> usize {
self.batches.len()

View File

@@ -26,12 +26,14 @@ use datatypes::arrow::datatypes::{ArrowNativeType, BinaryType, DataType, SchemaR
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::arrow_array::BinaryArray;
use datatypes::extension::json::is_json_extension_type;
use datatypes::timestamp::timestamp_array_to_primitive;
use datatypes::vectors::json::array::JsonArray;
use futures::{Stream, TryStreamExt};
use snafu::ResultExt;
use store_api::storage::SequenceNumber;
use crate::error::{ComputeArrowSnafu, Result};
use crate::error::{ComputeArrowSnafu, DataTypeMismatchSnafu, Result};
use crate::memtable::BoxedRecordBatchIterator;
use crate::metrics::READ_STAGE_ELAPSED;
use crate::read::BoxedRecordBatchStream;
@@ -258,14 +260,29 @@ impl BatchBuilder {
check_interleave_overflow(&self.batches, &self.schema, &self.indices)?;
let columns = (0..self.schema.fields.len())
.map(|column_idx| {
let arrays: Vec<_> = self
let columns = self
.schema
.fields()
.iter()
.enumerate()
.map(|(column_idx, field)| {
let arrays = self
.batches
.iter()
.map(|(_, batch)| batch.column(column_idx).as_ref())
.collect();
interleave(&arrays, &self.indices).context(ComputeArrowSnafu)
.map(|(_, batch)| {
let column = batch.column(column_idx);
let column = if is_json_extension_type(field) {
JsonArray::from(column)
.try_align(field.data_type())
.context(DataTypeMismatchSnafu)?
} else {
column.clone()
};
Ok(column)
})
.collect::<Result<Vec<_>>>()?;
let aligned = arrays.iter().map(|x| x.as_ref()).collect::<Vec<_>>();
interleave(&aligned, &self.indices).context(ComputeArrowSnafu)
})
.collect::<Result<Vec<_>>>()?;

View File

@@ -51,6 +51,7 @@ use crate::sst::{
///
/// This mapper support duplicate and unsorted projection indices.
/// The output schema is determined by the projection indices.
#[derive(Clone)]
pub struct FlatProjectionMapper {
/// Metadata of the region.
metadata: RegionMetadataRef,

View File

@@ -15,6 +15,7 @@
//! Structs for partition ranges.
use common_time::Timestamp;
use datatypes::arrow::datatypes::SchemaRef;
use smallvec::{SmallVec, smallvec};
use store_api::region_engine::PartitionRange;
use store_api::storage::TimeSeriesDistribution;
@@ -478,6 +479,12 @@ impl MemRangeBuilder {
pub(crate) fn stats(&self) -> &MemtableStats {
&self.stats
}
/// Returns the record batch schema for this memtable range if available.
#[expect(unused)]
pub(crate) fn record_batch_schema(&self) -> Option<SchemaRef> {
self.range.record_batch_schema()
}
}
#[cfg(test)]

View File

@@ -106,6 +106,12 @@ impl FlatWriteFormat {
let sequence_array = Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
columns[sequence_column_index(batch.num_columns())] = sequence_array;
// let columns = common_recordbatch::recordbatch::maybe_align_json_array_with_schema(
// &self.arrow_schema,
// columns,
// )
// .context(RecordBatchSnafu)?;
// RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
}
}

View File

@@ -153,16 +153,10 @@ async fn query_data(frontend: &Arc<Instance>) -> io::Result<()> {
+----------+"#;
execute_sql_and_expect(frontend, sql, expected).await;
let sql = "SELECT * FROM bluesky ORDER BY time_us";
let expected = fs::read_to_string(find_workspace_path(
"tests-integration/resources/jsonbench-select-all.txt",
))?;
execute_sql_and_expect(frontend, sql, &expected).await;
// query 1:
let sql = "
SELECT
json_get_string(data, '$.commit.collection') AS event, count() AS count
data.commit.collection AS event, count() AS count
FROM bluesky
GROUP BY event
ORDER BY count DESC, event ASC";
@@ -180,13 +174,12 @@ ORDER BY count DESC, event ASC";
// query 2:
let sql = "
SELECT
json_get_string(data, '$.commit.collection') AS event,
data.commit.collection AS event,
count() AS count,
count(DISTINCT json_get_string(data, '$.did')) AS users
count(DISTINCT data.did) AS users
FROM bluesky
WHERE
(json_get_string(data, '$.kind') = 'commit') AND
(json_get_string(data, '$.commit.operation') = 'create')
data.kind = 'commit' AND data.commit.operation = 'create'
GROUP BY event
ORDER BY count DESC, event ASC";
let expected = r#"
@@ -203,15 +196,14 @@ ORDER BY count DESC, event ASC";
// query 3:
let sql = "
SELECT
json_get_string(data, '$.commit.collection') AS event,
date_part('hour', to_timestamp_micros(json_get_int(data, '$.time_us'))) as hour_of_day,
data.commit.collection AS event,
date_part('hour', to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) as hour_of_day,
count() AS count
FROM bluesky
WHERE
(json_get_string(data, '$.kind') = 'commit') AND
(json_get_string(data, '$.commit.operation') = 'create') AND
json_get_string(data, '$.commit.collection') IN
('app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.feed.like')
data.kind = 'commit' AND
data.commit.operation = 'create' AND
data.commit.collection in ('app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.feed.like')
GROUP BY event, hour_of_day
ORDER BY hour_of_day, event";
let expected = r#"
@@ -227,13 +219,13 @@ ORDER BY hour_of_day, event";
// query 4:
let sql = "
SELECT
json_get_string(data, '$.did') as user_id,
min(to_timestamp_micros(json_get_int(data, '$.time_us'))) AS first_post_ts
data.did::String as user_id,
min(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) AS first_post_ts
FROM bluesky
WHERE
(json_get_string(data, '$.kind') = 'commit') AND
(json_get_string(data, '$.commit.operation') = 'create') AND
(json_get_string(data, '$.commit.collection') = 'app.bsky.feed.post')
data.kind = 'commit' AND
data.commit.operation = 'create' AND
data.commit.collection = 'app.bsky.feed.post'
GROUP BY user_id
ORDER BY first_post_ts ASC, user_id DESC
LIMIT 3";
@@ -250,17 +242,17 @@ LIMIT 3";
// query 5:
let sql = "
SELECT
json_get_string(data, '$.did') as user_id,
data.did::String as user_id,
date_part(
'epoch',
max(to_timestamp_micros(json_get_int(data, '$.time_us'))) -
min(to_timestamp_micros(json_get_int(data, '$.time_us')))
max(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) -
min(to_timestamp_micros(arrow_cast(data.time_us, 'Int64')))
) AS activity_span
FROM bluesky
WHERE
(json_get_string(data, '$.kind') = 'commit') AND
(json_get_string(data, '$.commit.operation') = 'create') AND
(json_get_string(data, '$.commit.collection') = 'app.bsky.feed.post')
data.kind = 'commit' AND
data.commit.operation = 'create' AND
data.commit.collection = 'app.bsky.feed.post'
GROUP BY user_id
ORDER BY activity_span DESC, user_id DESC
LIMIT 3";
@@ -304,30 +296,21 @@ async fn insert_data_by_sql(frontend: &Arc<Instance>) -> io::Result<()> {
async fn desc_table(frontend: &Arc<Instance>) {
let sql = "DESC TABLE bluesky";
let expected = r#"
+---------+------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+---------+------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| data | Json<{"_raw":"<String>","commit.collection":"<String>","commit.operation":"<String>","did":"<String>","kind":"<String>","time_us":"<Number>"}> | | YES | | FIELD |
| time_us | TimestampMicrosecond | PRI | NO | | TIMESTAMP |
+---------+------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+"#;
+---------+----------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+---------+----------------------+-----+------+---------+---------------+
| data | JSON2 | | YES | | FIELD |
| time_us | TimestampMicrosecond | PRI | NO | | TIMESTAMP |
+---------+----------------------+-----+------+---------+---------------+"#;
execute_sql_and_expect(frontend, sql, expected).await;
}
async fn create_table(frontend: &Arc<Instance>) {
let sql = r#"
CREATE TABLE bluesky (
"data" JSON (
format = "partial",
fields = Struct<
kind String,
"commit.operation" String,
"commit.collection" String,
did String,
time_us Bigint
>,
),
"data" JSON2,
time_us TimestampMicrosecond TIME INDEX,
)
) WITH ('append_mode' = 'true', 'sst_format' = 'flat')
"#;
execute_sql_and_expect(frontend, sql, "Affected Rows: 0").await;
}

View File

@@ -117,6 +117,23 @@ select j.a.b from json2_table order by ts;
| 10 |
+-------------------------------------+
select j.a, j.a.x from json2_table order by ts;
+--------------------------------------------------+----------------------------------------------------+
| json_get(json2_table.j,Utf8("a"),Utf8View(NULL)) | json_get(json2_table.j,Utf8("a.x"),Utf8View(NULL)) |
+--------------------------------------------------+----------------------------------------------------+
| {"b":1,"x":null} | |
| {"b":-2,"x":null} | |
| {"b":3,"x":null} | |
| {"b":-4,"x":null} | |
| {"b":null,"x":null} | |
| | |
| {"b":"s7","x":null} | |
| {"b":8,"x":null} | |
| {"b":null,"x":true} | true |
| {"b":10,"x":null} | |
+--------------------------------------------------+----------------------------------------------------+
select j.c, j.y from json2_table order by ts;
+-----------------------------------+-----------------------------------+
@@ -134,6 +151,44 @@ select j.c, j.y from json2_table order by ts;
| | false |
+-----------------------------------+-----------------------------------+
select j from json2_table order by ts;
Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected Struct() but found Struct("a": Struct("b": Binary, "x": Boolean), "c": Binary, "d": List(Struct("e": Struct("f": Float64, "g": Float64))), "y": Boolean) at column index 0
select * from json2_table order by ts;
Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected Struct() but found Struct("a": Struct("b": Binary, "x": Boolean), "c": Binary, "d": List(Struct("e": Struct("f": Float64, "g": Float64))), "y": Boolean) at column index 1
select j.a.b + 1 from json2_table order by ts;
+------------------------------------------------------------+
| json_get(json2_table.j,Utf8("a.b"),Int64(NULL)) + Int64(1) |
+------------------------------------------------------------+
| 2 |
| -1 |
| 4 |
| -3 |
| |
| |
| |
| 9 |
| |
| 11 |
+------------------------------------------------------------+
select abs(j.a.b) from json2_table order by ts;
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Function 'abs' expects NativeType::Numeric but received NativeType::String No function matches the given name and argument types 'abs(Utf8View)'. You might need to add explicit type casts.
Candidate functions:
abs(Numeric(1))
-- "j.c" is of type "String", "abs" is expected to be all "null"s.
select abs(j.c) from json2_table order by ts;
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Function 'abs' expects NativeType::Numeric but received NativeType::String No function matches the given name and argument types 'abs(Utf8View)'. You might need to add explicit type casts.
Candidate functions:
abs(Numeric(1))
select j.d from json2_table order by ts;
+-----------------------------------+

View File

@@ -42,8 +42,21 @@ explain select j.a.x::bool from json2_table;
select j.a.b from json2_table order by ts;
select j.a, j.a.x from json2_table order by ts;
select j.c, j.y from json2_table order by ts;
select j from json2_table order by ts;
select * from json2_table order by ts;
select j.a.b + 1 from json2_table order by ts;
select abs(j.a.b) from json2_table order by ts;
-- "j.c" is of type "String", "abs" is expected to be all "null"s.
select abs(j.c) from json2_table order by ts;
select j.d from json2_table order by ts;
drop table json2_table;