flush
(?) query-driven and data-driven concretize
(?) select
(?) compaction

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
luofucong
2026-03-03 17:34:05 +08:00
parent e26ce9ca32
commit ecc7c01e74
65 changed files with 2230 additions and 395 deletions

1
Cargo.lock generated
View File

@@ -7986,6 +7986,7 @@ version = "1.0.0"
dependencies = [
"api",
"aquamarine",
"arrow-schema 57.3.0",
"async-channel 1.9.0",
"async-stream",
"async-trait",

View File

@@ -248,6 +248,7 @@ impl ObjbenchCommand {
op_type: OperationType::Flush,
metadata: region_meta,
source: FlatSource::Stream(reader_stream),
schema: None,
cache_manager,
storage: None,
max_sequence: None,

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod json2_get;
mod json2_get_rewriter;
pub mod json_get;
mod json_get_rewriter;
mod json_is;
@@ -26,6 +28,8 @@ use json_is::{
JsonIsArray, JsonIsBool, JsonIsFloat, JsonIsInt, JsonIsNull, JsonIsObject, JsonIsString,
};
use json_to_string::JsonToStringFunction;
use json2_get::Json2GetFunction;
use json2_get_rewriter::Json2GetRewriter;
use parse_json::ParseJsonFunction;
use crate::function_registry::FunctionRegistry;
@@ -44,6 +48,7 @@ impl JsonFunction {
registry.register_scalar(JsonGetBool::default());
registry.register_scalar(JsonGetObject::default());
registry.register_scalar(JsonGetWithType::default());
registry.register_scalar(Json2GetFunction::default());
registry.register_scalar(JsonIsNull::default());
registry.register_scalar(JsonIsInt::default());
@@ -57,5 +62,6 @@ impl JsonFunction {
registry.register_scalar(json_path_match::JsonPathMatchFunction::default());
registry.register_function_rewrite(JsonGetRewriter);
registry.register_function_rewrite(Json2GetRewriter);
}
}

View File

@@ -0,0 +1,173 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use arrow::array::StringViewBuilder;
use arrow_cast::display::ArrayFormatter;
use datafusion_common::arrow::array::{Array, ArrayRef, StructArray, new_null_array};
use datafusion_common::arrow::datatypes::{DataType, Field};
use datafusion_common::{DataFusionError, Result, ScalarValue, exec_err, internal_err};
use datafusion_expr::{
ColumnarValue, Expr, ReturnFieldArgs, ScalarFunctionArgs, Signature, Volatility,
};
use datatypes::vectors::json::array::JsonArray;
use derive_more::Display;
use crate::function::Function;
/// Scalar function `json2_get(json, path, type)`: extracts the value at a
/// dot-separated `path` from a struct-encoded JSON column and casts it to the
/// type carried by the third argument (a typed null literal; see
/// `return_field_from_args`).
#[derive(Display, Debug)]
#[display("{}", Self::NAME.to_ascii_uppercase())]
pub struct Json2GetFunction {
    // Accepts any three arguments; real validation happens at invocation time.
    signature: Signature,
}
impl Json2GetFunction {
    /// The name this function is registered under.
    pub const NAME: &'static str = "json2_get";
}
impl Function for Json2GetFunction {
    fn name(&self) -> &str {
        Self::NAME
    }

    /// Unused: the return type depends on the third argument's value, so it is
    /// computed in `return_field_from_args` instead.
    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
        internal_err!("this method isn't meant to be called")
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// Evaluates `json2_get(json, path, type)`.
    ///
    /// Resolves the dot-separated `path` inside the struct-encoded JSON input,
    /// then casts the leaf column to the requested return type. When the path
    /// cannot be resolved, or the leaf cannot be cast (and the return type is
    /// not a string type), an all-null array of the return type is produced
    /// instead of an error.
    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        if args.args.len() != 3 {
            return exec_err!("json2_get expects 3 arguments, got {}", args.args.len());
        }

        let input = args.args[0].to_array(args.number_rows)?;
        // Second argument must be a literal string path (validated in `path_from_arg`).
        let path = path_from_arg(&args.args[1])?;
        // Already derived from the third argument by `return_field_from_args`.
        let return_type = args.return_field.data_type();
        common_telemetry::debug!(
            "input datatype: {}, path: {}, return_type: {}",
            input.data_type(),
            path,
            return_type
        );

        // NOTE(review): assumes path segments never themselves contain '.' — confirm
        // against the path syntax accepted by callers.
        let segments = path.split('.').collect::<Vec<_>>();
        let Some(mut leaf) = resolve_leaf_path(&input, &segments) else {
            // Path not present in this JSON type: produce nulls rather than failing.
            return Ok(ColumnarValue::Array(new_null_array(
                return_type,
                input.len(),
            )));
        };
        common_telemetry::debug!("leaf datatype before: {}", leaf.data_type());

        // Struct leaves (nested JSON objects) and Binary leaves are re-serialized to
        // JSON text first, since they have no direct Arrow cast to scalar types.
        if matches!(leaf.data_type(), DataType::Struct(_) | DataType::Binary) {
            let mut decoded = StringViewBuilder::with_capacity(leaf.len());
            for i in 0..leaf.len() {
                let value = JsonArray::serialize_arrow_value(&leaf, i)
                    .map_err(|e| DataFusionError::Internal(e.to_string()))?;
                common_telemetry::debug!("leaf {}: {:?}", i, value);
                if value.is_null() {
                    decoded.append_null();
                } else {
                    decoded.append_value(value.to_string());
                }
            }
            leaf = Arc::new(decoded.finish());
        }

        common_telemetry::debug!(
            "leaf datatype after: {}, can_cast_types: {}",
            leaf.data_type(),
            arrow_cast::can_cast_types(leaf.data_type(), return_type)
        );
        // Prefer a native Arrow cast; fall back to display formatting for string
        // targets; otherwise give up and return nulls.
        let casted = if arrow_cast::can_cast_types(leaf.data_type(), return_type) {
            arrow_cast::cast(leaf.as_ref(), return_type)?
        } else if return_type.is_string() {
            cast_array_to_string(&leaf)?
        } else {
            return Ok(ColumnarValue::Array(new_null_array(
                return_type,
                input.len(),
            )));
        };
        Ok(ColumnarValue::Array(casted))
    }

    /// Derives the output field from the third argument: a scalar (typed null)
    /// literal whose data type is the requested output type.
    fn return_field_from_args(&self, args: ReturnFieldArgs<'_>) -> Result<Arc<Field>> {
        let Some(Some(value)) = args.scalar_arguments.get(2) else {
            return internal_err!(
                "third argument of function {} must be present and is scalar",
                self.name()
            );
        };
        Ok(Arc::new(Field::new(
            "json2_get expected type",
            value.data_type(),
            true,
        )))
    }
}
impl Default for Json2GetFunction {
    /// Builds the function with a signature accepting any three arguments;
    /// argument validation is deferred to invocation time.
    fn default() -> Self {
        let signature = Signature::any(3, Volatility::Immutable);
        Self { signature }
    }
}
/// Extracts the JSON path from an argument, which must be a literal (non-null)
/// string scalar of any Arrow string flavor.
fn path_from_arg(arg: &ColumnarValue) -> Result<&String> {
    let ColumnarValue::Scalar(scalar) = arg else {
        return exec_err!("json2_get expects a literal path");
    };
    match scalar {
        ScalarValue::Utf8(Some(path))
        | ScalarValue::LargeUtf8(Some(path))
        | ScalarValue::Utf8View(Some(path)) => Ok(path),
        _ => exec_err!("json2_get expects a string path"),
    }
}
fn resolve_leaf_path(array: &ArrayRef, segments: &[&str]) -> Option<ArrayRef> {
if segments.is_empty() {
return None;
}
let mut current = array.clone();
for segment in segments {
let struct_array = current.as_any().downcast_ref::<StructArray>()?;
let DataType::Struct(fields) = current.data_type() else {
unreachable!()
};
let index = fields.iter().position(|field| field.name() == *segment)?;
current = struct_array.column(index).clone();
}
Some(current)
}
/// Renders every element of `array` as text via Arrow's display formatter,
/// producing a string-view array; nulls are preserved as nulls.
fn cast_array_to_string(array: &ArrayRef) -> Result<ArrayRef> {
    let formatter = ArrayFormatter::try_new(array, &Default::default())?;
    let mut builder = StringViewBuilder::with_capacity(array.len());
    for index in 0..array.len() {
        if array.is_valid(index) {
            builder.append_value(formatter.value(index).to_string());
        } else {
            builder.append_null();
        }
    }
    Ok(Arc::new(builder.finish()))
}
/// Builds a typed null literal expression for `data_type`, used to carry a
/// target data type through an expression tree (e.g. as the third argument of
/// `json2_get`).
pub fn datatype_expr(data_type: &DataType) -> Result<Expr> {
    let null_value = ScalarValue::try_new_null(data_type)?;
    Ok(Expr::Literal(null_value, None))
}

View File

@@ -0,0 +1,82 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use arrow_schema::DataType;
use datafusion::common::config::ConfigOptions;
use datafusion::common::tree_node::Transformed;
use datafusion::common::{DFSchema, Result};
use datafusion::logical_expr::expr_rewriter::FunctionRewrite;
use datafusion::scalar::ScalarValue;
use datafusion_common::{exec_err, internal_err};
use datafusion_expr::Expr;
use crate::scalars::json::json2_get::{Json2GetFunction, datatype_expr};
/// Expression rewriter that folds an `arrow_cast` wrapping a `json2_get` call
/// into the `json2_get` call itself, so the function produces the requested
/// type directly.
#[derive(Debug)]
pub(crate) struct Json2GetRewriter;
impl FunctionRewrite for Json2GetRewriter {
    fn name(&self) -> &'static str {
        "Json2GetRewriter"
    }

    /// Tries to fold an `arrow_cast` over `json2_get` into the `json2_get`
    /// call, reporting whether the expression was actually transformed.
    fn rewrite(
        &self,
        expr: Expr,
        _schema: &DFSchema,
        _config: &ConfigOptions,
    ) -> Result<Transformed<Expr>> {
        let (expr, rewritten) = reduce_arrow_cast(expr)?;
        Ok(if rewritten {
            Transformed::yes(expr)
        } else {
            Transformed::no(expr)
        })
    }
}
/// Rewrites `arrow_cast(json2_get(_, _, _), "<type>")` into
/// `json2_get(_, _, "<type>")`, pushing the cast's target type down into the
/// `json2_get` call as its third (typed null literal) argument.
///
/// Returns the (possibly rewritten) expression plus a flag telling whether a
/// rewrite happened. Expressions that are not an `arrow_cast` over `json2_get`
/// pass through unchanged.
fn reduce_arrow_cast(expr: Expr) -> Result<(Expr, bool)> {
    let mut f = match expr {
        Expr::ScalarFunction(f) => f,
        expr => return Ok((expr, false)),
    };
    if f.name() != "arrow_cast" {
        return Ok((Expr::ScalarFunction(f), false));
    }
    // Use `first()` rather than `f.args[0]`: the original indexed into `args`
    // before checking the arity, so a malformed zero-argument `arrow_cast`
    // would panic instead of passing through / erroring gracefully.
    let wraps_json2_get = matches!(
        f.args.first(),
        Some(Expr::ScalarFunction(inner)) if inner.name() == Json2GetFunction::NAME
    );
    if !wraps_json2_get {
        return Ok((Expr::ScalarFunction(f), false));
    }
    if f.args.len() != 2 {
        return internal_err!("arrow_cast must have 2 arguments");
    }
    // Parse the cast's target type from its literal-string second argument and
    // turn it into a typed null literal — the form `json2_get` expects.
    let target_type = match &f.args[1] {
        Expr::Literal(ScalarValue::Utf8(Some(target_type)), _) => target_type
            .parse::<DataType>()
            .map_err(Into::into)
            .and_then(|x| datatype_expr(&x))?,
        x => return exec_err!("arrow_cast expects 2nd argument a string, got: {:?}", x),
    };
    let Expr::ScalarFunction(mut json2_get) = f.args.remove(0) else {
        // checked in the `matches!` above
        unreachable!()
    };
    if json2_get.args.len() != 3 {
        return internal_err!("function {} must have 3 arguments", Json2GetFunction::NAME);
    }
    // Replace the placeholder third argument with the cast's target type.
    json2_get.args[2] = target_type;
    Ok((Expr::ScalarFunction(json2_get), true))
}

View File

@@ -188,13 +188,6 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to align JSON array, reason: {reason}"))]
AlignJsonArray {
reason: String,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -210,8 +203,7 @@ impl ErrorExt for Error {
| Error::ToArrowScalar { .. }
| Error::ProjectArrowRecordBatch { .. }
| Error::PhysicalExpr { .. }
| Error::RecordBatchSliceIndexOverflow { .. }
| Error::AlignJsonArray { .. } => StatusCode::Internal,
| Error::RecordBatchSliceIndexOverflow { .. } => StatusCode::Internal,
Error::PollStream { .. } => StatusCode::EngineExecuteQuery,

View File

@@ -20,10 +20,11 @@ use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion_common::arrow::array::ArrayRef;
use datafusion_common::arrow::compute;
use datafusion_common::arrow::datatypes::{DataType as ArrowDataType, SchemaRef as ArrowSchemaRef};
use datatypes::arrow::array::{Array, AsArray, RecordBatchOptions, StructArray, new_null_array};
use datatypes::arrow::array::{Array, AsArray, RecordBatchOptions};
use datatypes::extension::json::is_json_extension_type;
use datatypes::prelude::DataType;
use datatypes::schema::SchemaRef;
use datatypes::vectors::json::array::JsonArray;
use datatypes::vectors::{Helper, VectorRef};
use serde::ser::{Error, SerializeStruct};
use serde::{Serialize, Serializer};
@@ -31,8 +32,8 @@ use snafu::{OptionExt, ResultExt, ensure};
use crate::DfRecordBatch;
use crate::error::{
self, AlignJsonArraySnafu, ArrowComputeSnafu, ColumnNotExistsSnafu, DataTypesSnafu,
NewDfRecordBatchSnafu, ProjectArrowRecordBatchSnafu, Result,
self, ArrowComputeSnafu, ColumnNotExistsSnafu, DataTypesSnafu, ProjectArrowRecordBatchSnafu,
Result,
};
/// A two-dimensional batch of column-oriented data with a defined schema.
@@ -354,81 +355,7 @@ pub fn merge_record_batches(schema: SchemaRef, batches: &[RecordBatch]) -> Resul
Ok(RecordBatch::from_df_record_batch(schema, record_batch))
}
/// Align a json array `json_array` to the json type `schema_type`. The `schema_type` is often the
/// "largest" json type after some insertions in the table schema, while the json array previously
/// written in the SST could be lagged behind it. So it's important to "amend" the json array's
/// missing fields with null arrays, to align the array's data type with the provided one.
///
/// # Panics
///
/// - The json array is not an Arrow [StructArray], or the provided data type `schema_type` is not
/// of Struct type. Both of which shouldn't happen unless we switch our implementation of how
/// json array is physically stored.
pub fn align_json_array(json_array: &ArrayRef, schema_type: &ArrowDataType) -> Result<ArrayRef> {
let json_type = json_array.data_type();
if json_type == schema_type {
return Ok(json_array.clone());
}
let json_array = json_array.as_struct();
let array_fields = json_array.fields();
let array_columns = json_array.columns();
let ArrowDataType::Struct(schema_fields) = schema_type else {
unreachable!()
};
let mut aligned = Vec::with_capacity(schema_fields.len());
// Compare the fields in the json array and the to-be-aligned schema, amending with null arrays
// on the way. It's very important to note that fields in the json array and in the json type
// are both SORTED.
let mut i = 0; // point to the schema fields
let mut j = 0; // point to the array fields
while i < schema_fields.len() && j < array_fields.len() {
let schema_field = &schema_fields[i];
let array_field = &array_fields[j];
if schema_field.name() == array_field.name() {
if matches!(schema_field.data_type(), ArrowDataType::Struct(_)) {
// A `StructArray`s in a json array must be another json array. (Like a nested json
// object in a json value.)
aligned.push(align_json_array(
&array_columns[j],
schema_field.data_type(),
)?);
} else {
aligned.push(array_columns[j].clone());
}
j += 1;
} else {
aligned.push(new_null_array(schema_field.data_type(), json_array.len()));
}
i += 1;
}
if i < schema_fields.len() {
for field in &schema_fields[i..] {
aligned.push(new_null_array(field.data_type(), json_array.len()));
}
}
ensure!(
j == array_fields.len(),
AlignJsonArraySnafu {
reason: format!(
"this json array has more fields {:?}",
array_fields[j..]
.iter()
.map(|x| x.name())
.collect::<Vec<_>>(),
)
}
);
let json_array =
StructArray::try_new(schema_fields.clone(), aligned, json_array.nulls().cloned())
.context(NewDfRecordBatchSnafu)?;
Ok(Arc::new(json_array))
}
fn maybe_align_json_array_with_schema(
pub fn maybe_align_json_array_with_schema(
schema: &ArrowSchemaRef,
arrays: Vec<ArrayRef>,
) -> Result<Vec<ArrayRef>> {
@@ -443,7 +370,9 @@ fn maybe_align_json_array_with_schema(
continue;
}
let json_array = align_json_array(&array, field.data_type())?;
let json_array = JsonArray::from(&array)
.try_align(field.data_type())
.context(DataTypesSnafu)?;
aligned.push(json_array);
}
Ok(aligned)
@@ -453,12 +382,8 @@ fn maybe_align_json_array_with_schema(
mod tests {
use std::sync::Arc;
use datatypes::arrow::array::{
AsArray, BooleanArray, Float64Array, Int64Array, ListArray, UInt32Array,
};
use datatypes::arrow::datatypes::{
DataType, Field, Fields, Int64Type, Schema as ArrowSchema, UInt32Type,
};
use datatypes::arrow::array::{AsArray, UInt32Array};
use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, UInt32Type};
use datatypes::arrow_array::StringArray;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
@@ -466,165 +391,6 @@ mod tests {
use super::*;
#[test]
fn test_align_json_array() -> Result<()> {
struct TestCase {
json_array: ArrayRef,
schema_type: DataType,
expected: std::result::Result<ArrayRef, String>,
}
impl TestCase {
fn new(
json_array: StructArray,
schema_type: Fields,
expected: std::result::Result<Vec<ArrayRef>, String>,
) -> Self {
Self {
json_array: Arc::new(json_array),
schema_type: DataType::Struct(schema_type.clone()),
expected: expected
.map(|x| Arc::new(StructArray::new(schema_type, x, None)) as ArrayRef),
}
}
fn test(self) -> Result<()> {
let result = align_json_array(&self.json_array, &self.schema_type);
match (result, self.expected) {
(Ok(json_array), Ok(expected)) => assert_eq!(&json_array, &expected),
(Ok(json_array), Err(e)) => {
panic!("expecting error {e} but actually get: {json_array:?}")
}
(Err(e), Err(expected)) => assert_eq!(e.to_string(), expected),
(Err(e), Ok(_)) => return Err(e),
}
Ok(())
}
}
// Test empty json array can be aligned with a complex json type.
TestCase::new(
StructArray::new_empty_fields(2, None),
Fields::from(vec![
Field::new("int", DataType::Int64, true),
Field::new_struct(
"nested",
vec![Field::new("bool", DataType::Boolean, true)],
true,
),
Field::new("string", DataType::Utf8, true),
]),
Ok(vec![
Arc::new(Int64Array::new_null(2)) as ArrayRef,
Arc::new(StructArray::new_null(
Fields::from(vec![Arc::new(Field::new("bool", DataType::Boolean, true))]),
2,
)),
Arc::new(StringArray::new_null(2)),
]),
)
.test()?;
// Test simple json array alignment.
TestCase::new(
StructArray::from(vec![(
Arc::new(Field::new("float", DataType::Float64, true)),
Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
)]),
Fields::from(vec![
Field::new("float", DataType::Float64, true),
Field::new("string", DataType::Utf8, true),
]),
Ok(vec![
Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
Arc::new(StringArray::new_null(3)),
]),
)
.test()?;
// Test complex json array alignment.
TestCase::new(
StructArray::from(vec![
(
Arc::new(Field::new_list(
"list",
Field::new_list_field(DataType::Int64, true),
true,
)),
Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
Some(vec![Some(1)]),
None,
Some(vec![Some(2), Some(3)]),
])) as ArrayRef,
),
(
Arc::new(Field::new_struct(
"nested",
vec![Field::new("int", DataType::Int64, true)],
true,
)),
Arc::new(StructArray::from(vec![(
Arc::new(Field::new("int", DataType::Int64, true)),
Arc::new(Int64Array::from(vec![-1, -2, -3])) as ArrayRef,
)])),
),
(
Arc::new(Field::new("string", DataType::Utf8, true)),
Arc::new(StringArray::from(vec!["a", "b", "c"])),
),
]),
Fields::from(vec![
Field::new("bool", DataType::Boolean, true),
Field::new_list("list", Field::new_list_field(DataType::Int64, true), true),
Field::new_struct(
"nested",
vec![
Field::new("float", DataType::Float64, true),
Field::new("int", DataType::Int64, true),
],
true,
),
Field::new("string", DataType::Utf8, true),
]),
Ok(vec![
Arc::new(BooleanArray::new_null(3)) as ArrayRef,
Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
Some(vec![Some(1)]),
None,
Some(vec![Some(2), Some(3)]),
])),
Arc::new(StructArray::from(vec![
(
Arc::new(Field::new("float", DataType::Float64, true)),
Arc::new(Float64Array::new_null(3)) as ArrayRef,
),
(
Arc::new(Field::new("int", DataType::Int64, true)),
Arc::new(Int64Array::from(vec![-1, -2, -3])),
),
])),
Arc::new(StringArray::from(vec!["a", "b", "c"])),
]),
)
.test()?;
// Test align failed.
TestCase::new(
StructArray::try_from(vec![
("i", Arc::new(Int64Array::from(vec![1])) as ArrayRef),
("j", Arc::new(Int64Array::from(vec![2])) as ArrayRef),
])
.unwrap(),
Fields::from(vec![Field::new("i", DataType::Int64, true)]),
Err(
r#"Failed to align JSON array, reason: this json array has more fields ["j"]"#
.to_string(),
),
)
.test()?;
Ok(())
}
#[test]
fn test_record_batch() {
let arrow_schema = Arc::new(ArrowSchema::new(vec![

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![recursion_limit = "256"]
pub mod alive_keeper;
pub mod config;
pub mod datanode;

View File

@@ -281,6 +281,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to align JSON array, reason: {reason}"))]
AlignJsonArray {
reason: String,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -324,7 +331,8 @@ impl ErrorExt for Error {
| ParseExtendedType { .. }
| InconsistentStructFieldsAndItems { .. }
| ArrowMetadata { .. }
| AlignJsonValue { .. } => StatusCode::Internal,
| AlignJsonValue { .. }
| AlignJsonArray { .. } => StatusCode::Internal,
}
}

View File

@@ -19,6 +19,7 @@
//! The struct will carry all the fields of the Json object. We will not flatten any json object in this implementation.
//!
pub mod requirement;
pub mod value;
use std::collections::{BTreeMap, HashSet};
@@ -26,12 +27,12 @@ use std::sync::Arc;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value as Json};
use snafu::{OptionExt, ResultExt, ensure};
use snafu::{OptionExt, ResultExt};
use crate::error::{self, InvalidJsonSnafu, Result, SerializeSnafu};
use crate::json::value::{JsonValue, JsonVariant};
use crate::types::json_type::{JsonNativeType, JsonNumberType, JsonObjectType};
use crate::types::{StructField, StructType};
use crate::types::{JsonType, StructField, StructType};
use crate::value::{ListValue, StructValue, Value};
/// The configuration of JSON encoding
@@ -305,32 +306,45 @@ fn encode_json_array_with_context<'a>(
) -> Result<JsonValue> {
let json_array_len = json_array.len();
let mut items = Vec::with_capacity(json_array_len);
let mut element_type = item_type.cloned();
for (index, value) in json_array.into_iter().enumerate() {
let array_context = context.with_key(&index.to_string());
let item_value =
encode_json_value_with_context(value, element_type.as_ref(), &array_context)?;
let item_type = item_value.json_type().native_type().clone();
items.push(item_value.into_variant());
let item_value = encode_json_value_with_context(value, None, &array_context)?;
items.push(item_value);
}
// Determine the common type for the list
if let Some(current_type) = &element_type {
// It's valid for json array to have different types of items, for example,
// ["a string", 1]. However, the `JsonValue` will be converted to Arrow list array,
// which requires all items have exactly same type. So we forbid the different types
// case here. Besides, it's not common for items in a json array to differ. So I think
// we are good here.
ensure!(
item_type == *current_type,
error::InvalidJsonSnafu {
value: "all items in json array must have the same type"
}
);
} else {
element_type = Some(item_type);
// In specification, it's valid for a JSON array to have different types of items, for example,
// ["a string", 1]. However, in implementation, the `JsonValue` will be converted to Arrow list
// array, which requires all items have exactly the same type. So we merge out the maybe
// different item types to a unified type, and align all the item values to it.
let provided_item_type = item_type.map(|x| JsonType::new_json2(x.clone()));
let merged_item_type = if let Some((first, rests)) = items.split_first() {
let mut merged = first.json_type().clone();
for rest in rests.iter().map(|x| x.json_type()) {
merged.merge(rest)?;
}
Some(merged)
} else {
None
};
let unified_item_type = match (provided_item_type, merged_item_type) {
(Some(mut x), Some(y)) => {
x.merge(&y)?;
Some(x)
}
(Some(x), None) | (None, Some(x)) => Some(x),
(None, None) => None,
};
if let Some(unified_item_type) = unified_item_type {
for item in &mut items {
item.try_align(&unified_item_type)?;
}
}
let items = items
.into_iter()
.map(|x| x.into_variant())
.collect::<Vec<_>>();
Ok(JsonValue::new(JsonVariant::Array(items)))
}

View File

@@ -0,0 +1,77 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use std::sync::Arc;
use crate::data_type::ConcreteDataType;
use crate::types::{StructField, StructType};
/// A set of required JSON paths (each optionally with an expected leaf type),
/// organized as a prefix tree over the paths' dot-separated segments.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct JsonPathTarget {
    root: JsonPathTargetNode,
}
/// One node of the path tree.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
struct JsonPathTargetNode {
    // Child nodes keyed by path segment; BTreeMap keeps fields sorted by name.
    children: BTreeMap<String, JsonPathTargetNode>,
    // The requested type when some required path terminates at this node.
    // Note: ignored by `build_data_type` if this node also has children.
    leaf_type: Option<ConcreteDataType>,
}
impl JsonPathTarget {
    /// Records that the value at the dot-separated `path` is required with the
    /// given `data_type`. Requiring the same path twice keeps the latest type.
    pub fn require_typed_path(&mut self, path: &str, data_type: ConcreteDataType) {
        let leaf = path.split('.').fold(&mut self.root, |node, segment| {
            node.children.entry(segment.to_string()).or_default()
        });
        leaf.require_leaf_type(data_type);
    }

    /// Whether no path has been required yet.
    pub fn is_empty(&self) -> bool {
        self.root.children.is_empty()
    }

    /// Builds the struct type covering all required paths, or `None` when no
    /// path has been required.
    pub fn build_type(&self) -> Option<ConcreteDataType> {
        (!self.is_empty()).then(|| ConcreteDataType::Struct(self.root.build_struct_type()))
    }
}
impl JsonPathTargetNode {
    /// Marks this node as a required leaf of the given type (last call wins).
    fn require_leaf_type(&mut self, data_type: ConcreteDataType) {
        self.leaf_type = Some(data_type);
    }

    /// The data type this node contributes: a struct built from its children
    /// when it has any, otherwise its requested leaf type (string by default).
    fn build_data_type(&self) -> ConcreteDataType {
        if !self.children.is_empty() {
            return ConcreteDataType::Struct(self.build_struct_type());
        }
        match &self.leaf_type {
            Some(leaf_type) => leaf_type.clone(),
            None => ConcreteDataType::string_datatype(),
        }
    }

    /// Builds a struct type with one nullable field per child, in the
    /// children's (sorted) iteration order.
    fn build_struct_type(&self) -> StructType {
        let mut fields = Vec::with_capacity(self.children.len());
        for (name, child) in &self.children {
            fields.push(StructField::new(name.clone(), child.build_data_type(), true));
        }
        StructType::new(Arc::new(fields))
    }
}

View File

@@ -161,12 +161,18 @@ impl JsonVariant {
};
JsonNativeType::Array(Box::new(item_type))
}
JsonVariant::Object(object) => JsonNativeType::Object(
object
.iter()
.map(|(k, v)| (k.clone(), v.native_type()))
.collect(),
),
JsonVariant::Object(object) => {
if object.is_empty() {
JsonNativeType::Null
} else {
JsonNativeType::Object(
object
.iter()
.map(|(k, v)| (k.clone(), v.native_type()))
.collect(),
)
}
}
JsonVariant::Variant(_) => JsonNativeType::Variant,
}
}
@@ -639,12 +645,18 @@ impl JsonVariantRef<'_> {
};
JsonNativeType::Array(Box::new(item_type))
}
JsonVariantRef::Object(object) => JsonNativeType::Object(
object
.iter()
.map(|(k, v)| (k.to_string(), native_type(v)))
.collect(),
),
JsonVariantRef::Object(object) => {
if object.is_empty() {
JsonNativeType::Null
} else {
JsonNativeType::Object(
object
.iter()
.map(|(k, v)| (k.to_string(), native_type(v)))
.collect(),
)
}
}
JsonVariantRef::Variant(_) => JsonNativeType::Variant,
}
}

View File

@@ -14,6 +14,7 @@
mod column_schema;
pub mod constraint;
pub mod ext;
use std::collections::HashMap;
use std::sync::Arc;

View File

@@ -0,0 +1,25 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::extension::json;
/// Extension methods for Arrow schemas related to JSON extension types.
pub trait ArrowSchemaExt {
    /// Returns true when any field in the schema carries the JSON extension type.
    fn has_json_extension_field(&self) -> bool;
}
impl ArrowSchemaExt for arrow_schema::Schema {
    /// Scans the schema's fields for any that is a JSON extension type.
    fn has_json_extension_field(&self) -> bool {
        self.fields()
            .iter()
            .any(|field| json::is_json_extension_type(field))
    }
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::borrow::Cow;
use std::collections::BTreeMap;
use std::fmt::{Debug, Display, Formatter};
use std::str::FromStr;
@@ -317,6 +318,29 @@ fn merge(this: &JsonNativeType, that: &JsonNativeType) -> JsonNativeType {
}
}
pub fn merge_as_json_type<'a>(
left: &'a ArrowDataType,
right: &ArrowDataType,
) -> Cow<'a, ArrowDataType> {
if left == right {
return Cow::Borrowed(left);
}
let mut left = JsonType::from(left);
let right = JsonType::from(right);
Cow::Owned(if left.merge(&right).is_ok() {
left.as_arrow_type()
} else {
ArrowDataType::Utf8
})
}
impl From<&ArrowDataType> for JsonType {
fn from(t: &ArrowDataType) -> Self {
JsonType::new_json2(JsonNativeType::from(&ConcreteDataType::from_arrow_type(t)))
}
}
impl DataType for JsonType {
fn name(&self) -> String {
match &self.format {

View File

@@ -3206,7 +3206,7 @@ pub(crate) mod tests {
]
.into(),
)),
48,
56,
);
}

View File

@@ -35,7 +35,7 @@ mod duration;
mod eq;
mod helper;
mod interval;
pub(crate) mod json;
pub mod json;
mod list;
mod null;
pub(crate) mod operations;

View File

@@ -12,4 +12,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod array;
pub(crate) mod builder;

View File

@@ -0,0 +1,423 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cmp::Ordering;
use std::sync::Arc;
use arrow::compute;
use arrow::util::display::{ArrayFormatter, FormatOptions};
use arrow_array::cast::AsArray;
use arrow_array::types::{Float64Type, Int64Type, UInt64Type};
use arrow_array::{Array, ArrayRef, GenericListArray, StructArray, new_null_array};
use arrow_schema::DataType;
use snafu::ResultExt;
use crate::arrow_array::{MutableBinaryArray, StringArray, binary_array_value, string_array_value};
use crate::error::{
AlignJsonArraySnafu, ArrowComputeSnafu, DeserializeSnafu, Result, SerializeSnafu,
UnsupportedArrowTypeSnafu,
};
/// A borrowed view over an Arrow array holding struct-encoded JSON data,
/// providing JSON-aware serialization and alignment operations.
pub struct JsonArray<'a> {
    inner: &'a ArrayRef,
}
impl JsonArray<'_> {
    /// Recursively converts the element of `array` at `index` into a
    /// [serde_json::Value].
    ///
    /// Struct arrays become JSON objects (one entry per field), list arrays
    /// become JSON arrays, and binary arrays are assumed to already contain
    /// serialized JSON and are deserialized in place. A null element (at any
    /// nesting level) maps to `Value::Null`. Arrow types outside the JSON
    /// storage model produce an `UnsupportedArrowType` error.
    pub fn serialize_arrow_value(array: &ArrayRef, index: usize) -> Result<serde_json::Value> {
        if array.is_null(index) {
            return Ok(serde_json::Value::Null);
        }
        let value = match array.data_type() {
            DataType::Null => serde_json::Value::Null,
            DataType::Boolean => serde_json::Value::Bool(array.as_boolean().value(index)),
            DataType::Int64 => {
                serde_json::Value::from(array.as_primitive::<Int64Type>().value(index))
            }
            DataType::UInt64 => {
                serde_json::Value::from(array.as_primitive::<UInt64Type>().value(index))
            }
            DataType::Float64 => {
                serde_json::Value::from(array.as_primitive::<Float64Type>().value(index))
            }
            DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
                serde_json::Value::String(string_array_value(array, index).to_string())
            }
            DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
                // Binary payloads are expected to be serialized JSON; surface the
                // raw bytes in the error context when they are not.
                let v = binary_array_value(array, index);
                serde_json::from_slice(v).with_context(|_| DeserializeSnafu {
                    json: String::from_utf8(v.to_vec()).unwrap_or_else(|_| format!("{v:?}")),
                })?
            }
            DataType::Struct(_) => {
                // A nested JSON object: serialize each child column at this row.
                let struct_array = array.as_struct();
                let object = struct_array
                    .fields()
                    .iter()
                    .zip(struct_array.columns())
                    .map(|(field, column)| {
                        Ok((
                            field.name().clone(),
                            Self::serialize_arrow_value(column, index)?,
                        ))
                    })
                    .collect::<Result<serde_json::Map<String, serde_json::Value>>>()?;
                serde_json::Value::Object(object)
            }
            DataType::List(_) => {
                // JSON arrays are stored as i32-offset lists.
                let list_array = array.as_list::<i32>();
                let values = list_array.value(index);
                serde_json::Value::Array(Self::serialize_arrow_values(&values)?)
            }
            other => {
                return UnsupportedArrowTypeSnafu {
                    arrow_type: other.clone(),
                }
                .fail();
            }
        };
        // NOTE(review): this debug! fires once per serialized value, including
        // every recursive call — consider removing if it shows up in profiles.
        common_telemetry::debug!("after serialize: {:?}", value);
        Ok(value)
    }
fn serialize_arrow_values(array: &ArrayRef) -> Result<Vec<serde_json::Value>> {
(0..array.len())
.map(|index| Self::serialize_arrow_value(array, index))
.collect()
}
/// Align a JSON array to the `expect` data type. The `expect` data type is often the
/// "largest" JSON type after some insertions in the table schema, while the JSON array previously
/// written in the SST could be lagged behind it. So it's important to "align" the JSON array by
/// setting the missing fields with null arrays, or casting the data.
///
/// # Errors
///
/// Returns an error if this JSON array contains fields that are absent from `expect`
/// (the data cannot be aligned without dropping columns), or if rebuilding the struct
/// array fails.
///
/// # Panics
///
/// - The JSON array is not an Arrow [StructArray], or the provided `expect` data type is not
///   of Struct type. Both of which shouldn't happen unless we switch our implementation of how
///   JSON array is physically stored.
pub fn try_align(&self, expect: &DataType) -> Result<ArrayRef> {
    common_telemetry::debug!("try align from {} to {}", self.inner.data_type(), expect);

    let json_type = self.inner.data_type();
    if json_type == expect {
        // Fast path: the array already has the expected type.
        return Ok(self.inner.clone());
    }

    let struct_array = self.inner.as_struct();
    let array_fields = struct_array.fields();
    let array_columns = struct_array.columns();
    let DataType::Struct(expect_fields) = expect else {
        unreachable!()
    };
    let mut aligned = Vec::with_capacity(expect_fields.len());
    // Fields found in the array but missing from `expect`; a non-empty collection at the
    // end means the array cannot be aligned and is reported as an error.
    let mut extra_fields = Vec::new();

    // Compare the fields in the JSON array and the to-be-aligned schema, amending with null arrays
    // on the way. It's very important to note that fields in the JSON array and those in the JSON type
    // are both **SORTED**, which makes a single merge-style walk sufficient.
    debug_assert!(expect_fields.iter().map(|f| f.name()).is_sorted());
    debug_assert!(array_fields.iter().map(|f| f.name()).is_sorted());
    let mut i = 0; // point to the expect fields
    let mut j = 0; // point to the array fields
    while i < expect_fields.len() && j < array_fields.len() {
        let expect_field = &expect_fields[i];
        let array_field = &array_fields[j];
        match expect_field.name().cmp(array_field.name()) {
            Ordering::Equal => {
                if expect_field.data_type() == array_field.data_type() {
                    aligned.push(array_columns[j].clone());
                } else {
                    let expect_type = expect_field.data_type();
                    let array_type = array_field.data_type();
                    let array = match (expect_type, array_type) {
                        // Nested JSON object: align recursively.
                        (DataType::Struct(_), DataType::Struct(_)) => {
                            JsonArray::from(&array_columns[j]).try_align(expect_type)?
                        }
                        // JSON list: align (or cast) the item values, then rebuild the list
                        // reusing the original offsets and null buffer.
                        (DataType::List(expect_item), DataType::List(array_item)) => {
                            let list_array = array_columns[j].as_list::<i32>();
                            let item_aligned =
                                match (expect_item.data_type(), array_item.data_type()) {
                                    (DataType::Struct(_), DataType::Struct(_)) => {
                                        JsonArray::from(list_array.values())
                                            .try_align(expect_item.data_type())?
                                    }
                                    _ => JsonArray::from(list_array.values())
                                        .try_cast(expect_item.data_type())?,
                                };
                            Arc::new(
                                GenericListArray::<i32>::try_new(
                                    expect_item.clone(),
                                    list_array.offsets().clone(),
                                    item_aligned,
                                    list_array.nulls().cloned(),
                                )
                                .context(ArrowComputeSnafu)?,
                            )
                        }
                        // Scalar type widened (e.g. int -> string): cast the column.
                        _ => JsonArray::from(&array_columns[j]).try_cast(expect_type)?,
                    };
                    aligned.push(array);
                }
                i += 1;
                j += 1;
            }
            Ordering::Less => {
                // Field exists in the expected type but not in the array: fill with nulls.
                aligned.push(new_null_array(expect_field.data_type(), struct_array.len()));
                i += 1;
            }
            Ordering::Greater => {
                // Field exists in the array but not in the expected type: remember it so
                // all such fields can be reported together below.
                extra_fields.push(array_field.name());
                j += 1;
            }
        }
    }
    // Remaining expected fields have no data in the array: fill with nulls.
    if i < expect_fields.len() {
        for field in &expect_fields[i..] {
            aligned.push(new_null_array(field.data_type(), struct_array.len()));
        }
    }
    // Remaining array fields also have no counterpart in the expected type. Aligning would
    // silently drop their data, so fail instead of producing a lossy result.
    extra_fields.extend(array_fields[j..].iter().map(|f| f.name()));
    if !extra_fields.is_empty() {
        return AlignJsonArraySnafu {
            reason: format!("this json array has more fields {extra_fields:?}"),
        }
        .fail();
    }

    let json_array = StructArray::try_new(
        expect_fields.clone(),
        aligned,
        struct_array.nulls().cloned(),
    )
    .map_err(|e| {
        AlignJsonArraySnafu {
            reason: e.to_string(),
        }
        .build()
    })?;
    Ok(Arc::new(json_array))
}
/// Decodes this JSON array into a plain binary array holding the serialized JSON bytes.
///
/// Null slots stay null; every non-null slot becomes its `serde_json` byte encoding.
fn try_decode_variant(&self) -> Result<ArrayRef> {
    let json_values = Self::serialize_arrow_values(self.inner)?;

    // Serialize each non-null value up front so the builder capacity can be sized exactly.
    let mut encoded: Vec<Option<Vec<u8>>> = Vec::with_capacity(json_values.len());
    for value in &json_values {
        if value.is_null() {
            encoded.push(None);
        } else {
            encoded.push(Some(serde_json::to_vec(value).context(SerializeSnafu)?));
        }
    }

    let total_bytes: usize = encoded.iter().flatten().map(Vec::len).sum();
    let mut builder = MutableBinaryArray::with_capacity(self.inner.len(), total_bytes);
    for bytes in encoded {
        match bytes {
            Some(bytes) => builder.append_value(bytes),
            None => builder.append_null(),
        }
    }
    Ok(Arc::new(builder.finish()))
}
/// Casts this JSON array to `to_type`.
///
/// Strategy, in order: a `Binary` target gets the serialized-JSON encoding; types
/// Arrow can cast natively are delegated to `compute::cast`; everything else is
/// rendered element by element into a string array, preserving nulls.
fn try_cast(&self, to_type: &DataType) -> Result<ArrayRef> {
    if let DataType::Binary = to_type {
        return self.try_decode_variant();
    }
    if compute::can_cast_types(self.inner.data_type(), to_type) {
        compute::cast(&self.inner, to_type).context(ArrowComputeSnafu)
    } else {
        // No native Arrow cast exists: fall back to textual rendering.
        let formatter = ArrayFormatter::try_new(&self.inner, &FormatOptions::default())
            .context(ArrowComputeSnafu)?;
        let mut values = Vec::with_capacity(self.inner.len());
        for i in 0..self.inner.len() {
            if self.inner.is_valid(i) {
                values.push(Some(formatter.value(i).to_string()));
            } else {
                values.push(None);
            }
        }
        Ok(Arc::new(StringArray::from(values)))
    }
}
}
impl<'a> From<&'a ArrayRef> for JsonArray<'a> {
    /// Borrows an Arrow array as a [JsonArray] without copying any data.
    fn from(inner: &'a ArrayRef) -> Self {
        Self { inner }
    }
}
#[cfg(test)]
mod test {
    use arrow_array::types::Int64Type;
    use arrow_array::{BooleanArray, Float64Array, Int64Array, ListArray};
    use arrow_schema::{Field, Fields};

    use super::*;

    /// Table-driven test for [JsonArray::try_align]: each case aligns an input
    /// struct array to a target JSON (Struct) type and compares the result with
    /// either the expected aligned columns or an expected error message.
    #[test]
    fn test_align_json_array() -> Result<()> {
        struct TestCase {
            // Input JSON array (physically a StructArray).
            json_array: ArrayRef,
            // The `expect` data type passed to `try_align`.
            schema_type: DataType,
            // Either the expected aligned array, or the expected error's display string.
            expected: std::result::Result<ArrayRef, String>,
        }

        impl TestCase {
            fn new(
                json_array: StructArray,
                schema_type: Fields,
                expected: std::result::Result<Vec<ArrayRef>, String>,
            ) -> Self {
                Self {
                    json_array: Arc::new(json_array),
                    schema_type: DataType::Struct(schema_type.clone()),
                    // Wrap the expected columns in a StructArray with the target fields,
                    // mirroring what `try_align` produces.
                    expected: expected
                        .map(|x| Arc::new(StructArray::new(schema_type, x, None)) as ArrayRef),
                }
            }

            fn test(self) -> Result<()> {
                let result = JsonArray::from(&self.json_array).try_align(&self.schema_type);
                // Compare outcome kinds pairwise; an unexpected Ok fails loudly, and an
                // unexpected Err is propagated so the test harness reports it.
                match (result, self.expected) {
                    (Ok(json_array), Ok(expected)) => assert_eq!(&json_array, &expected),
                    (Ok(json_array), Err(e)) => {
                        panic!("expecting error {e} but actually get: {json_array:?}")
                    }
                    (Err(e), Err(expected)) => assert_eq!(e.to_string(), expected),
                    (Err(e), Ok(_)) => return Err(e),
                }
                Ok(())
            }
        }

        // Test empty json array can be aligned with a complex json type.
        TestCase::new(
            StructArray::new_empty_fields(2, None),
            Fields::from(vec![
                Field::new("int", DataType::Int64, true),
                Field::new_struct(
                    "nested",
                    vec![Field::new("bool", DataType::Boolean, true)],
                    true,
                ),
                Field::new("string", DataType::Utf8, true),
            ]),
            // Every field is absent from the input, so each is filled with nulls.
            Ok(vec![
                Arc::new(Int64Array::new_null(2)) as ArrayRef,
                Arc::new(StructArray::new_null(
                    Fields::from(vec![Arc::new(Field::new("bool", DataType::Boolean, true))]),
                    2,
                )),
                Arc::new(StringArray::new_null(2)),
            ]),
        )
        .test()?;

        // Test simple json array alignment.
        TestCase::new(
            StructArray::from(vec![(
                Arc::new(Field::new("float", DataType::Float64, true)),
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
            )]),
            Fields::from(vec![
                Field::new("float", DataType::Float64, true),
                Field::new("string", DataType::Utf8, true),
            ]),
            // Existing column is kept as-is; the missing "string" column becomes nulls.
            Ok(vec![
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
                Arc::new(StringArray::new_null(3)),
            ]),
        )
        .test()?;

        // Test complex json array alignment.
        TestCase::new(
            StructArray::from(vec![
                (
                    Arc::new(Field::new_list(
                        "list",
                        Field::new_list_field(DataType::Int64, true),
                        true,
                    )),
                    Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                        Some(vec![Some(1)]),
                        None,
                        Some(vec![Some(2), Some(3)]),
                    ])) as ArrayRef,
                ),
                (
                    Arc::new(Field::new_struct(
                        "nested",
                        vec![Field::new("int", DataType::Int64, true)],
                        true,
                    )),
                    Arc::new(StructArray::from(vec![(
                        Arc::new(Field::new("int", DataType::Int64, true)),
                        Arc::new(Int64Array::from(vec![-1, -2, -3])) as ArrayRef,
                    )])),
                ),
                (
                    Arc::new(Field::new("string", DataType::Utf8, true)),
                    Arc::new(StringArray::from(vec!["a", "b", "c"])),
                ),
            ]),
            Fields::from(vec![
                Field::new("bool", DataType::Boolean, true),
                Field::new_list("list", Field::new_list_field(DataType::Int64, true), true),
                Field::new_struct(
                    "nested",
                    vec![
                        Field::new("float", DataType::Float64, true),
                        Field::new("int", DataType::Int64, true),
                    ],
                    true,
                ),
                Field::new("string", DataType::Utf8, true),
            ]),
            // The missing top-level "bool" and nested "float" become nulls;
            // everything else is carried over unchanged.
            Ok(vec![
                Arc::new(BooleanArray::new_null(3)) as ArrayRef,
                Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                    Some(vec![Some(1)]),
                    None,
                    Some(vec![Some(2), Some(3)]),
                ])),
                Arc::new(StructArray::from(vec![
                    (
                        Arc::new(Field::new("float", DataType::Float64, true)),
                        Arc::new(Float64Array::new_null(3)) as ArrayRef,
                    ),
                    (
                        Arc::new(Field::new("int", DataType::Int64, true)),
                        Arc::new(Int64Array::from(vec![-1, -2, -3])),
                    ),
                ])),
                Arc::new(StringArray::from(vec!["a", "b", "c"])),
            ]),
        )
        .test()?;

        // Test align failed.
        // The input has a field "j" that the target type lacks; aligning would drop data.
        TestCase::new(
            StructArray::try_from(vec![
                ("i", Arc::new(Int64Array::from(vec![1])) as ArrayRef),
                ("j", Arc::new(Int64Array::from(vec![2])) as ArrayRef),
            ])
            .unwrap(),
            Fields::from(vec![Field::new("i", DataType::Int64, true)]),
            Err(
                r#"Failed to align JSON array, reason: this json array has more fields ["j"]"#
                    .to_string(),
            ),
        )
        .test()?;

        Ok(())
    }
}

View File

@@ -16,6 +16,7 @@ workspace = true
[dependencies]
api.workspace = true
aquamarine.workspace = true
arrow-schema.workspace = true
async-channel = "1.9"
common-stat.workspace = true
async-stream.workspace = true

View File

@@ -229,6 +229,7 @@ fn bulk_part_converter(c: &mut Criterion) {
&FlatSchemaOptions {
raw_pk_columns: false,
string_pk_use_dict: false,
..Default::default()
},
);
let mut converter = BulkPartConverter::new(&metadata, schema, rows, codec, false);
@@ -255,6 +256,7 @@ fn bulk_part_converter(c: &mut Criterion) {
&FlatSchemaOptions {
raw_pk_columns: true,
string_pk_use_dict: true,
..Default::default()
},
);
let mut converter =

View File

@@ -15,6 +15,7 @@
use std::sync::Arc;
use std::time::{Duration, Instant};
use arrow_schema::SchemaRef;
use async_stream::try_stream;
use common_time::Timestamp;
use futures::{Stream, TryStreamExt};
@@ -403,7 +404,12 @@ impl AccessLayer {
}
FormatType::Flat => {
writer
.write_all_flat(request.source, request.max_sequence, write_opts)
.write_all_flat(
request.source,
request.schema,
request.max_sequence,
write_opts,
)
.await?
}
}
@@ -526,6 +532,8 @@ pub struct SstWriteRequest {
pub op_type: OperationType,
pub metadata: RegionMetadataRef,
pub source: FlatSource,
// FIXME(LFC): this schema is actually the "merged json2 datatype in `source` parquets", rename it
pub schema: Option<SchemaRef>,
pub cache_manager: CacheManagerRef,
#[allow(dead_code)]
pub storage: Option<String>,

View File

@@ -256,7 +256,12 @@ impl WriteCache {
}
crate::sst::FormatType::Flat => {
writer
.write_all_flat(write_request.source, write_request.max_sequence, write_opts)
.write_all_flat(
write_request.source,
write_request.schema,
write_request.max_sequence,
write_opts,
)
.await?
}
};
@@ -561,6 +566,7 @@ mod tests {
bloom_filter_index_config: Default::default(),
#[cfg(feature = "vector_index")]
vector_index_config: Default::default(),
schema: None,
};
let upload_request = SstUploadRequest {
@@ -664,6 +670,7 @@ mod tests {
bloom_filter_index_config: Default::default(),
#[cfg(feature = "vector_index")]
vector_index_config: Default::default(),
schema: None,
};
let write_opts = WriteOptions {
row_group_size: 512,
@@ -755,6 +762,7 @@ mod tests {
bloom_filter_index_config: Default::default(),
#[cfg(feature = "vector_index")]
vector_index_config: Default::default(),
schema: None,
};
let write_opts = WriteOptions {
row_group_size: 512,

View File

@@ -29,6 +29,7 @@ use std::time::Instant;
use api::v1::region::compact_request;
use api::v1::region::compact_request::Options;
use arrow_schema::SchemaRef;
use common_base::Plugins;
use common_base::cancellation::CancellationHandle;
use common_memory_manager::OnExhaustedPolicy;
@@ -39,6 +40,7 @@ use common_time::timestamp::TimeUnit;
use common_time::{TimeToLive, Timestamp};
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
use datatypes::schema::ext::ArrowSchemaExt;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
@@ -58,6 +60,7 @@ use crate::error::{
ManualCompactionOverrideSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu,
RemoteCompactionSnafu, Result, TimeRangePredicateOverflowSnafu, TimeoutSnafu,
};
use crate::memtable::merge_json_extension_fields;
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
use crate::read::BoxedRecordBatchStream;
use crate::read::flat_projection::FlatProjectionMapper;
@@ -993,12 +996,29 @@ struct CompactionSstReaderBuilder<'a> {
impl CompactionSstReaderBuilder<'_> {
/// Builds [BoxedRecordBatchStream] that reads all SST files and yields batches in flat format for compaction.
async fn build_flat_sst_reader(self) -> Result<BoxedRecordBatchStream> {
async fn build_flat_sst_reader(self) -> Result<(Option<SchemaRef>, BoxedRecordBatchStream)> {
let scan_input = self.build_scan_input()?.with_compaction(true);
SeqScan::new(scan_input)
.build_flat_reader_for_compaction()
.await
let json_concretized_schema = if scan_input
.mapper
.output_schema()
.arrow_schema()
.has_json_extension_field()
{
let parquet_schemas = scan_input.collect_parquet_record_batch_schemas().await?;
if let Some((base, others)) = parquet_schemas.split_first() {
Some(merge_json_extension_fields(base, others))
} else {
None
}
} else {
None
};
let reader = SeqScan::new(scan_input)
.build_flat_reader_for_compaction(json_concretized_schema.clone())
.await?;
Ok((json_concretized_schema, reader))
}
fn build_scan_input(self) -> Result<ScanInput> {

View File

@@ -362,7 +362,7 @@ impl SstMerger for DefaultSstMerger {
time_range: output.output_time_range,
merge_mode,
};
let reader = builder.build_flat_sst_reader().await?;
let (schema, reader) = builder.build_flat_sst_reader().await?;
let source = FlatSource::Stream(reader);
let mut metrics = Metrics::new(WriteType::Compaction);
let region_metadata = compaction_region.region_metadata.clone();
@@ -373,6 +373,7 @@ impl SstMerger for DefaultSstMerger {
op_type: OperationType::Compact,
metadata: region_metadata.clone(),
source,
schema,
cache_manager: compaction_region.cache_manager.clone(),
storage,
max_sequence: max_sequence.map(NonZero::get),

View File

@@ -550,14 +550,14 @@ impl RegionFlushTask {
write_opts: &WriteOptions,
mem_ranges: MemtableRanges,
) -> Result<FlushFlatMemResult> {
let batch_schema = to_flat_sst_arrow_schema(
&version.metadata,
&FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding),
);
let mut options = FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding);
options.override_schema = mem_ranges.schema();
let batch_schema = to_flat_sst_arrow_schema(&version.metadata, &options);
let field_column_start =
flat_format::field_column_start(&version.metadata, batch_schema.fields().len());
let flat_sources = memtable_flat_sources(
batch_schema,
batch_schema.clone(),
mem_ranges,
&version.options,
field_column_start,
@@ -565,7 +565,8 @@ impl RegionFlushTask {
let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len());
let num_encoded = flat_sources.encoded.len();
for (source, max_sequence) in flat_sources.sources {
let write_request = self.new_write_request(version, max_sequence, source);
let write_request =
self.new_write_request(version, max_sequence, source, batch_schema.clone());
let access_layer = self.access_layer.clone();
let write_opts = write_opts.clone();
let semaphore = self.flush_semaphore.clone();
@@ -643,6 +644,7 @@ impl RegionFlushTask {
version: &VersionRef,
max_sequence: u64,
source: FlatSource,
schema: SchemaRef,
) -> SstWriteRequest {
let flat_format = version
.options
@@ -653,6 +655,7 @@ impl RegionFlushTask {
op_type: OperationType::Flush,
metadata: version.metadata.clone(),
source,
schema: Some(schema),
cache_manager: self.cache_manager.clone(),
storage: version.options.storage.clone(),
max_sequence: Some(max_sequence),

View File

@@ -14,6 +14,7 @@
//! Memtables are write buffers for regions.
use std::borrow::Cow;
use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
@@ -62,6 +63,10 @@ pub use bulk::part::{
BulkPart, BulkPartEncoder, BulkPartMeta, UnorderedPart, record_batch_estimated_size,
sort_primary_key_record_batch,
};
use datatypes::arrow::datatypes::{Schema, SchemaRef};
use datatypes::extension::json;
use datatypes::schema::ext::ArrowSchemaExt;
use datatypes::types::json_type;
#[cfg(any(test, feature = "test"))]
pub use time_partition::filter_record_batch;
@@ -228,6 +233,55 @@ impl MemtableRanges {
.max()
.unwrap_or(0)
}
/// Returns the record batch schema for the whole set of memtable ranges, merging the
/// concrete types of JSON extension fields across ranges when necessary. Returns `None`
/// if no range can provide a schema.
pub(crate) fn schema(&self) -> Option<SchemaRef> {
    let mut schemas = self
        .ranges
        .values()
        .filter_map(|x| x.record_batch_schema())
        .collect::<Vec<_>>();
    if schemas.iter().all(|x| !x.has_json_extension_field()) {
        // If there are no JSON extension fields in any schemas, the invariant must hold
        // that all schemas are same (they are all derived from same region metadata).
        // So it's ok to return the first one as the schema of the whole memtable ranges.
        return (!schemas.is_empty()).then(|| schemas.swap_remove(0));
    }

    // If there are JSON extension fields, by convention, only their concrete data types
    // (Arrow's Struct) may differ. Other things like the metadata or the fields count are same.
    // So to produce the final schema, we can solely merge the data types.
    schemas
        .split_first()
        .map(|(first, rest)| merge_json_extension_fields(first, rest))
}
}
/// Merges the concrete data types of JSON extension fields from `others` into `base`,
/// producing a schema whose JSON fields are wide enough to hold the data of every input
/// schema. Non-JSON fields and the schema metadata are taken from `base` unchanged.
///
/// Assumes all input schemas have the same field count and field order as `base`
/// (they are derived from the same region metadata) — TODO confirm at call sites.
pub(crate) fn merge_json_extension_fields(base: &SchemaRef, others: &[SchemaRef]) -> SchemaRef {
    let mut fields = base.fields().iter().cloned().collect::<Vec<_>>();
    for (i, field) in fields.iter_mut().enumerate() {
        if !json::is_json_extension_type(field) {
            continue;
        }

        // Fold the i-th field's data type across all other schemas into one merged JSON type.
        // `Cow` avoids cloning when `others` has a single element.
        let merged = others
            .iter()
            .map(|x| Cow::Borrowed(x.field(i).data_type()))
            .reduce(|acc, e| {
                Cow::Owned(json_type::merge_as_json_type(acc.as_ref(), e.as_ref()).into_owned())
            });
        // Only rebuild the field when the merged type actually differs from the base type.
        if let Some(merged) = merged
            && field.data_type() != merged.as_ref()
        {
            let merged =
                json_type::merge_as_json_type(field.data_type(), merged.as_ref()).into_owned();
            let mut new = field.as_ref().clone();
            new.set_data_type(merged);
            *field = Arc::new(new);
        }
    }
    Arc::new(Schema::new_with_metadata(fields, base.metadata().clone()))
}
impl IterBuilder for MemtableRanges {
@@ -558,6 +612,11 @@ pub trait IterBuilder: Send + Sync {
.fail()
}
/// Returns the schema of record batches produced by this iterator.
fn record_batch_schema(&self) -> Option<SchemaRef> {
None
}
/// Returns the [EncodedRange] if the range is already encoded into SST.
fn encoded_range(&self) -> Option<EncodedRange> {
None
@@ -735,6 +794,11 @@ impl MemtableRange {
self.context.builder.is_record_batch()
}
/// Returns the schema of record batches if this range supports record batch iteration.
pub fn record_batch_schema(&self) -> Option<SchemaRef> {
self.context.builder.record_batch_schema()
}
pub fn num_rows(&self) -> usize {
self.stats.num_rows
}

View File

@@ -810,6 +810,10 @@ impl IterBuilder for BulkRangeIterBuilder {
fn encoded_range(&self) -> Option<EncodedRange> {
None
}
fn record_batch_schema(&self) -> Option<SchemaRef> {
Some(self.part.batch.schema())
}
}
impl IterBuilder for MultiBulkRangeIterBuilder {
@@ -842,6 +846,10 @@ impl IterBuilder for MultiBulkRangeIterBuilder {
fn encoded_range(&self) -> Option<EncodedRange> {
None
}
fn record_batch_schema(&self) -> Option<SchemaRef> {
self.part.record_batch_schema()
}
}
/// Iterator builder for encoded bulk range

View File

@@ -14,6 +14,7 @@
//! Bulk part encoder/decoder.
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -34,8 +35,11 @@ use datatypes::arrow::datatypes::{
use datatypes::data_type::DataType;
use datatypes::extension::json::is_json_extension_type;
use datatypes::prelude::{MutableVector, Vector};
use datatypes::schema::ext::ArrowSchemaExt;
use datatypes::types::json_type;
use datatypes::value::ValueRef;
use datatypes::vectors::Helper;
use datatypes::vectors::json::array::JsonArray;
use mito_codec::key_values::{KeyValue, KeyValues};
use mito_codec::row_converter::PrimaryKeyCodec;
use parquet::arrow::ArrowWriter;
@@ -50,9 +54,9 @@ use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
use store_api::storage::{FileId, SequenceNumber, SequenceRange};
use crate::error::{
self, ColumnNotFoundSnafu, ComputeArrowSnafu, CreateDefaultSnafu, DataTypeMismatchSnafu,
EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu, InvalidRequestSnafu,
NewRecordBatchSnafu, Result,
self, ColumnNotFoundSnafu, ComputeArrowSnafu, ConvertValueSnafu, CreateDefaultSnafu,
DataTypeMismatchSnafu, EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu,
InvalidRequestSnafu, NewRecordBatchSnafu, Result,
};
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
@@ -425,13 +429,15 @@ impl UnorderedPart {
return Ok(Some(self.parts[0].batch.clone()));
}
// Get the schema from the first part
// Get the schema from the first part and normalize JSON2 columns across all parts.
let schema = self.parts[0].batch.schema();
// Concatenate all record batches
let batches: Vec<RecordBatch> = self.parts.iter().map(|p| p.batch.clone()).collect();
let concatenated =
arrow::compute::concat_batches(&schema, &batches).context(ComputeArrowSnafu)?;
let concatenated = if schema.has_json_extension_field() {
let (schema, batches) = normalize_json_columns_for_concat(schema, &self.parts)?;
arrow::compute::concat_batches(&schema, &batches).context(ComputeArrowSnafu)?
} else {
arrow::compute::concat_batches(&schema, self.parts.iter().map(|x| &x.batch))
.context(ComputeArrowSnafu)?
};
// Sort the concatenated batch
let sorted_batch = sort_primary_key_record_batch(&concatenated)?;
@@ -468,6 +474,80 @@ impl UnorderedPart {
}
}
/// Normalizes the JSON ("json2") columns of `parts` so that each JSON column shares one
/// merged concrete Struct type across all parts, returning the normalized schema and the
/// per-part record batches ready for `concat_batches`.
///
/// Non-JSON columns are passed through untouched. If no JSON column needs merging, the
/// original `base_schema` and the parts' batches are returned as-is.
fn normalize_json_columns_for_concat(
    base_schema: SchemaRef,
    parts: &[BulkPart],
) -> Result<(SchemaRef, Vec<RecordBatch>)> {
    common_telemetry::debug!("base schema: {:?}", base_schema);
    // All parts are expected to carry the same number of columns as the base schema.
    debug_assert!(
        parts
            .iter()
            .all(|x| x.batch.schema().fields().len() == base_schema.fields().len())
    );

    // For every JSON column, fold the column's data type across all parts into one merged type.
    let mut merged_json_types = HashMap::new();
    for (index, field) in base_schema.fields().iter().enumerate() {
        if !is_json_extension_type(field) {
            continue;
        }
        let merged = parts
            .iter()
            .map(|x| Cow::Borrowed(x.batch.schema_ref().field(index).data_type()))
            .reduce(|acc, e| {
                Cow::Owned(json_type::merge_as_json_type(acc.as_ref(), e.as_ref()).into_owned())
            });
        if let Some(merged) = merged {
            merged_json_types.insert(index, merged.into_owned());
        }
    }
    common_telemetry::debug!("merged json types: {:?}", merged_json_types);

    if merged_json_types.is_empty() {
        // No JSON column present (or no parts): nothing to normalize.
        let batches = parts.iter().map(|p| p.batch.clone()).collect();
        return Ok((base_schema, batches));
    }

    // Rebuild the schema, substituting each JSON field's type with the merged one while
    // preserving name, nullability and field metadata.
    let fields = base_schema
        .fields()
        .iter()
        .enumerate()
        .map(|(index, field)| {
            if let Some(data_type) = merged_json_types.get(&index) {
                Arc::new(
                    Field::new(field.name().clone(), data_type.clone(), field.is_nullable())
                        .with_metadata(field.metadata().clone()),
                )
            } else {
                field.clone()
            }
        })
        .collect::<Vec<_>>();
    let normalized_schema = Arc::new(Schema::new(fields));

    // Align each part's JSON columns to the merged types and rebuild its record batch.
    let mut normalized_batches = Vec::with_capacity(parts.len());
    for part in parts {
        let mut columns = Vec::with_capacity(part.batch.num_columns());
        for (index, column) in part.batch.columns().iter().enumerate() {
            if let Some(target_type) = merged_json_types.get(&index) {
                common_telemetry::debug!("merged json type is {}", target_type);
                columns.push(
                    JsonArray::from(column)
                        .try_align(target_type)
                        .context(ConvertValueSnafu)?,
                );
            } else {
                columns.push(column.clone());
            }
        }
        let batch = RecordBatch::try_new(normalized_schema.clone(), columns)
            .context(NewRecordBatchSnafu)?;
        normalized_batches.push(batch);
    }
    Ok((normalized_schema, normalized_batches))
}
/// More accurate estimation of the size of a record batch.
pub fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
batch
@@ -1334,6 +1414,11 @@ impl MultiBulkPart {
self.series_count
}
/// Returns the schema of batches in this part.
pub(crate) fn record_batch_schema(&self) -> Option<SchemaRef> {
self.batches.first().map(|batch| batch.schema())
}
/// Returns the number of record batches in this part.
pub fn num_batches(&self) -> usize {
self.batches.len()
@@ -1804,6 +1889,7 @@ mod tests {
&FlatSchemaOptions {
raw_pk_columns: false,
string_pk_use_dict: true,
..Default::default()
},
);
@@ -2241,6 +2327,7 @@ mod tests {
&FlatSchemaOptions {
raw_pk_columns: false,
string_pk_use_dict: true,
..Default::default()
},
);

View File

@@ -682,7 +682,7 @@ mod tests {
let rb = adapter.into_iter().next().unwrap().unwrap();
let mapper = FlatProjectionMapper::new(&metadata, [0, 3].into_iter()).unwrap();
assert_eq!(rb.schema(), mapper.input_arrow_schema(false));
assert_eq!(rb.schema(), mapper.input_arrow_schema(false, None));
// tag_0 + field_1 + ts + 3 internal columns.
assert_eq!(6, rb.num_columns());
assert_eq!(3, rb.num_rows());

View File

@@ -18,7 +18,6 @@ use std::collections::HashMap;
use std::sync::Arc;
use api::v1::SemanticType;
use common_recordbatch::recordbatch::align_json_array;
use datatypes::arrow::array::{
Array, ArrayRef, BinaryArray, BinaryBuilder, DictionaryArray, UInt32Array,
};
@@ -29,6 +28,7 @@ use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use datatypes::vectors::json::array::JsonArray;
use mito_codec::row_converter::{
CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec,
build_primary_key_codec_with_fields,
@@ -39,8 +39,8 @@ use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use crate::error::{
CompatReaderSnafu, ComputeArrowSnafu, CreateDefaultSnafu, DecodeSnafu, EncodeSnafu,
NewRecordBatchSnafu, RecordBatchSnafu, Result, UnsupportedOperationSnafu,
CompatReaderSnafu, ComputeArrowSnafu, ConvertValueSnafu, CreateDefaultSnafu, DecodeSnafu,
EncodeSnafu, NewRecordBatchSnafu, Result, UnsupportedOperationSnafu,
};
use crate::read::flat_projection::{FlatProjectionMapper, flat_projected_columns};
use crate::sst::parquet::flat_format::primary_key_column_index;
@@ -241,8 +241,9 @@ impl FlatCompatBatch {
if let Some(ty) = cast_type {
let casted = if let Some(json_type) = ty.as_json() {
align_json_array(old_column, &json_type.as_arrow_type())
.context(RecordBatchSnafu)?
JsonArray::from(old_column)
.try_align(&json_type.as_arrow_type())
.context(ConvertValueSnafu)?
} else {
datatypes::arrow::compute::cast(old_column, &ty.as_arrow_type())
.context(ComputeArrowSnafu)?

View File

@@ -26,12 +26,14 @@ use datatypes::arrow::datatypes::{ArrowNativeType, BinaryType, DataType, SchemaR
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::arrow_array::BinaryArray;
use datatypes::extension::json::is_json_extension_type;
use datatypes::timestamp::timestamp_array_to_primitive;
use datatypes::vectors::json::array::JsonArray;
use futures::{Stream, TryStreamExt};
use snafu::ResultExt;
use store_api::storage::SequenceNumber;
use crate::error::{ComputeArrowSnafu, Result};
use crate::error::{ComputeArrowSnafu, DataTypeMismatchSnafu, Result};
use crate::memtable::BoxedRecordBatchIterator;
use crate::metrics::READ_STAGE_ELAPSED;
use crate::read::BoxedRecordBatchStream;
@@ -258,14 +260,29 @@ impl BatchBuilder {
check_interleave_overflow(&self.batches, &self.schema, &self.indices)?;
let columns = (0..self.schema.fields.len())
.map(|column_idx| {
let arrays: Vec<_> = self
let columns = self
.schema
.fields()
.iter()
.enumerate()
.map(|(column_idx, field)| {
let arrays = self
.batches
.iter()
.map(|(_, batch)| batch.column(column_idx).as_ref())
.collect();
interleave(&arrays, &self.indices).context(ComputeArrowSnafu)
.map(|(_, batch)| {
let column = batch.column(column_idx);
let column = if is_json_extension_type(field) {
JsonArray::from(column)
.try_align(field.data_type())
.context(DataTypeMismatchSnafu)?
} else {
column.clone()
};
Ok(column)
})
.collect::<Result<Vec<_>>>()?;
let aligned = arrays.iter().map(|x| x.as_ref()).collect::<Vec<_>>();
interleave(&aligned, &self.indices).context(ComputeArrowSnafu)
})
.collect::<Result<Vec<_>>>()?;

View File

@@ -17,15 +17,21 @@
use std::sync::Arc;
use api::v1::SemanticType;
use arrow_schema::SchemaRef as ArrowSchemaRef;
use arrow_schema::extension::ExtensionType;
use common_error::ext::BoxedError;
use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu, NewDfRecordBatchSnafu};
use common_recordbatch::error::{
ArrowComputeSnafu, DataTypesSnafu, ExternalSnafu, NewDfRecordBatchSnafu,
};
use common_recordbatch::{DfRecordBatch, RecordBatch};
use datatypes::arrow::array::Array;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field};
use datatypes::extension::json::JsonExtensionType;
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::schema::{Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::Helper;
use datatypes::vectors::json::array::JsonArray;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
@@ -43,6 +49,7 @@ use crate::sst::{
///
/// This mapper support duplicate and unsorted projection indices.
/// The output schema is determined by the projection indices.
#[derive(Clone)]
pub struct FlatProjectionMapper {
/// Metadata of the region.
metadata: RegionMetadataRef,
@@ -221,15 +228,15 @@ impl FlatProjectionMapper {
pub(crate) fn input_arrow_schema(
&self,
compaction: bool,
json_concretized_schema: Option<ArrowSchemaRef>,
) -> datatypes::arrow::datatypes::SchemaRef {
if !compaction {
self.input_arrow_schema.clone()
} else {
// For compaction, we need to build a different schema from encoding.
to_flat_sst_arrow_schema(
&self.metadata,
&FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
)
let mut options = FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding);
options.override_schema = json_concretized_schema;
to_flat_sst_arrow_schema(&self.metadata, &options)
}
}
@@ -240,6 +247,10 @@ impl FlatProjectionMapper {
self.output_schema.clone()
}
pub(crate) fn with_output_schema(&mut self, output_schema: SchemaRef) {
self.output_schema = output_schema;
}
/// Converts a flat format [RecordBatch] to a normal [RecordBatch].
///
/// The batch must match the `projection` using to build the mapper.
@@ -285,6 +296,13 @@ impl FlatProjectionMapper {
array = casted;
}
}
let field = self.output_schema.arrow_schema().field(output_idx);
if field.extension_type_name() == Some(JsonExtensionType::NAME) {
array = JsonArray::from(&array)
.try_align(field.data_type())
.context(DataTypesSnafu)?;
}
arrays.push(array);
}

View File

@@ -15,6 +15,7 @@
//! Structs for partition ranges.
use common_time::Timestamp;
use datatypes::arrow::datatypes::SchemaRef;
use smallvec::{SmallVec, smallvec};
use store_api::region_engine::PartitionRange;
use store_api::storage::TimeSeriesDistribution;
@@ -478,6 +479,11 @@ impl MemRangeBuilder {
pub(crate) fn stats(&self) -> &MemtableStats {
&self.stats
}
/// Returns the record batch schema for this memtable range if available.
pub(crate) fn record_batch_schema(&self) -> Option<SchemaRef> {
self.range.record_batch_schema()
}
}
#[cfg(test)]

View File

@@ -14,7 +14,7 @@
//! Scans a region according to the scan request.
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::num::NonZeroU64;
use std::sync::Arc;
@@ -27,11 +27,19 @@ use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::tracing::Instrument;
use common_telemetry::{debug, error, tracing, warn};
use common_time::range::TimestampRange;
use datafusion::parquet::arrow::parquet_to_arrow_schema;
use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
use datafusion_common::Column;
use datafusion_expr::Expr;
use datafusion_expr::utils::expr_to_columns;
use datatypes::arrow::datatypes::DataType as ArrowDataType;
use datatypes::data_type::{ConcreteDataType, DataType};
use datatypes::extension::json::is_json_extension_type;
use datatypes::schema::Schema;
use datatypes::schema::ext::ArrowSchemaExt;
use datatypes::types::json_type;
use futures::StreamExt;
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
use partition::expr::PartitionExpr;
use smallvec::SmallVec;
use snafu::{OptionExt as _, ResultExt};
@@ -46,9 +54,9 @@ use tokio::sync::{Semaphore, mpsc};
use tokio_stream::wrappers::ReceiverStream;
use crate::access_layer::AccessLayerRef;
use crate::cache::CacheStrategy;
use crate::cache::{CacheStrategy, CachedSstMeta};
use crate::config::DEFAULT_MAX_CONCURRENT_SCAN_FILES;
use crate::error::{InvalidPartitionExprSnafu, InvalidRequestSnafu, Result};
use crate::error::{InvalidMetaSnafu, InvalidPartitionExprSnafu, InvalidRequestSnafu, Result};
#[cfg(feature = "enterprise")]
use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
use crate::memtable::{MemtableRange, RangesOptions};
@@ -75,7 +83,8 @@ use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBui
#[cfg(feature = "vector_index")]
use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef};
use crate::sst::parquet::file_range::PreFilterMode;
use crate::sst::parquet::reader::ReaderMetrics;
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderMetrics};
#[cfg(feature = "vector_index")]
const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2;
@@ -530,6 +539,7 @@ impl ScanRegion {
.with_merge_mode(self.version.options.merge_mode())
.with_series_row_selector(self.request.series_row_selector)
.with_distribution(self.request.distribution)
.with_json2_column_types(self.request.json2_column_types.clone())
.with_explain_flat_format(
self.version.options.sst_format == Some(crate::sst::FormatType::Flat),
)
@@ -554,6 +564,8 @@ impl ScanRegion {
} else {
input
};
let input = concretize_json2_types(input).await?;
Ok(input)
}
@@ -780,6 +792,107 @@ impl ScanRegion {
}
}
pub(crate) async fn concretize_json2_types(input: ScanInput) -> Result<ScanInput> {
let output_schema = input.mapper.output_schema();
let output_arrow_schema = output_schema.arrow_schema();
if !output_arrow_schema.has_json_extension_field() {
return Ok(input);
}
let memtable_schemas = input
.memtables
.iter()
.filter_map(|mem| mem.record_batch_schema())
.collect::<Vec<_>>();
let parquet_schemas = input.collect_parquet_record_batch_schemas().await?;
if memtable_schemas.is_empty()
&& parquet_schemas.is_empty()
// TODO(LFC): If we can concrete json2 type solely by query-driven hint, we can skip data-driven concretize.
&& input.json2_column_types.is_empty()
{
return Ok(input);
}
let mut column_schemas = output_schema.column_schemas().to_vec();
let mut changed = false;
for (idx, column_schema) in column_schemas.iter_mut().enumerate() {
let output_field = &output_arrow_schema.fields()[idx];
if !is_json_extension_type(output_field) {
continue;
}
let mut merged = input
.json2_column_types
.get(&column_schema.name)
.map(ConcreteDataType::as_arrow_type);
for schema in &memtable_schemas {
if let Some((_, field)) = schema.column_with_name(&column_schema.name) {
merge_json_type_candidate(&mut merged, field.data_type());
}
}
for schema in parquet_schemas.iter() {
if let Some((_, field)) = schema.as_ref().column_with_name(&column_schema.name) {
merge_json_type_candidate(&mut merged, field.data_type());
}
}
if let Some(merged) = merged
&& merged != *output_field.data_type()
{
column_schema.data_type = ConcreteDataType::from_arrow_type(&merged);
common_telemetry::info!("merged type: {}", column_schema.data_type);
changed = true;
}
}
if changed {
let mut mapper = Arc::unwrap_or_clone(input.mapper);
mapper.with_output_schema(Arc::new(Schema::new(column_schemas)));
Ok(ScanInput {
mapper: Arc::new(mapper),
..input
})
} else {
Ok(input)
}
}
fn merge_json_type_candidate(merged: &mut Option<ArrowDataType>, candidate: &ArrowDataType) {
match merged {
Some(current) => {
*current = json_type::merge_as_json_type(current, candidate).into_owned();
}
None => {
*merged = Some(candidate.clone());
}
}
}
async fn read_or_load_parquet_metadata(
file: &FileHandle,
access_layer: &AccessLayerRef,
cache_strategy: &CacheStrategy,
) -> Result<Arc<ParquetMetaData>> {
let mut metrics = MetadataCacheMetrics::default();
if let Some(metadata) = cache_strategy
.get_sst_meta_data(file.file_id(), &mut metrics, PageIndexPolicy::default())
.await
{
return Ok(metadata.parquet_metadata());
}
let file_path = file.file_path(access_layer.table_dir(), access_layer.path_type());
let file_size = file.meta_ref().file_size;
let metadata = MetadataLoader::new(access_layer.object_store().clone(), &file_path, file_size)
.load(&mut metrics)
.await
.and_then(|x| CachedSstMeta::try_new(&file_path, x))
.map(Arc::new)?;
cache_strategy.put_sst_meta_data(file.file_id(), metadata.clone());
Ok(metadata.parquet_metadata())
}
/// Returns true if the time range of a SST `file` matches the `predicate`.
fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
if predicate == &TimestampRange::min_to_max() {
@@ -839,6 +952,8 @@ pub struct ScanInput {
pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
/// Hint for the required distribution of the scanner.
pub(crate) distribution: Option<TimeSeriesDistribution>,
/// Query-driven target types for JSON2 columns.
json2_column_types: HashMap<String, ConcreteDataType>,
/// Whether the region's configured SST format is flat.
explain_flat_format: bool,
/// Snapshot upper bound bound at scan open and propagated back to the caller.
@@ -878,6 +993,7 @@ impl ScanInput {
merge_mode: MergeMode::default(),
series_row_selector: None,
distribution: None,
json2_column_types: HashMap::new(),
explain_flat_format: false,
snapshot_sequence: None,
compaction: false,
@@ -915,6 +1031,15 @@ impl ScanInput {
self
}
#[must_use]
fn with_json2_column_types(
mut self,
json2_column_types: HashMap<String, ConcreteDataType>,
) -> Self {
self.json2_column_types = json2_column_types;
self
}
/// Sets cache for this query.
#[must_use]
pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
@@ -1288,6 +1413,35 @@ impl ScanInput {
pre_filter_mode(self.append_mode, self.merge_mode)
}
pub(crate) async fn collect_parquet_record_batch_schemas(
&self,
) -> Result<Vec<datatypes::arrow::datatypes::SchemaRef>> {
let mut schemas = Vec::with_capacity(self.files.len());
for file in &self.files {
let parquet_metadata =
read_or_load_parquet_metadata(file, &self.access_layer, &self.cache_strategy)
.await?;
let file_metadata = parquet_metadata.file_metadata();
let arrow_schema = parquet_to_arrow_schema(
file_metadata.schema_descr(),
file_metadata.key_value_metadata(),
)
.map_err(|e| {
InvalidMetaSnafu {
reason: format!(
"Failed to convert parquet metadata to arrow schema, file: {}, error: {e}",
file.file_id()
),
}
.build()
})?;
if arrow_schema.has_json_extension_field() {
schemas.push(Arc::new(arrow_schema));
}
}
Ok(schemas)
}
}
#[cfg(feature = "enterprise")]

View File

@@ -18,6 +18,7 @@ use std::fmt;
use std::sync::Arc;
use std::time::Instant;
use arrow_schema::SchemaRef as ArrowSchemaRef;
use async_stream::try_stream;
use common_error::ext::BoxedError;
use common_recordbatch::util::ChainedRecordBatchStream;
@@ -130,7 +131,10 @@ impl SeqScan {
///
/// # Panics
/// Panics if the compaction flag is not set.
pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
pub async fn build_flat_reader_for_compaction(
&self,
json_concretized_schema: Option<ArrowSchemaRef>,
) -> Result<BoxedRecordBatchStream> {
assert!(self.stream_ctx.input.compaction);
let metrics_set = ExecutionPlanMetricsSet::new();
@@ -143,6 +147,7 @@ impl SeqScan {
partition_ranges,
&part_metrics,
self.pruner.clone(),
json_concretized_schema,
)
.await?;
Ok(reader)
@@ -155,6 +160,7 @@ impl SeqScan {
partition_ranges: &[PartitionRange],
part_metrics: &PartitionMetrics,
pruner: Arc<Pruner>,
json_concretized_schema: Option<ArrowSchemaRef>,
) -> Result<BoxedRecordBatchStream> {
pruner.add_partition_ranges(partition_ranges);
let partition_pruner = Arc::new(PartitionPruner::new(pruner, partition_ranges));
@@ -186,6 +192,7 @@ impl SeqScan {
None,
false,
compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE),
json_concretized_schema,
)
.await
}
@@ -201,6 +208,7 @@ impl SeqScan {
part_metrics: Option<&PartitionMetrics>,
skip_dedup: bool,
channel_size: usize,
json_concretized_schema: Option<ArrowSchemaRef>,
) -> Result<BoxedRecordBatchStream> {
if let Some(semaphore) = semaphore.as_ref() {
// Read sources in parallel.
@@ -219,7 +227,8 @@ impl SeqScan {
// that source may have duplicate rows.
sources.pop().unwrap()
} else {
let schema = mapper.input_arrow_schema(stream_ctx.input.compaction);
let schema =
mapper.input_arrow_schema(stream_ctx.input.compaction, json_concretized_schema);
let metrics_reporter = part_metrics.map(|m| m.merge_metrics_reporter());
let reader =
FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE, metrics_reporter)
@@ -305,6 +314,7 @@ impl SeqScan {
Some(part_metrics),
false,
channel_size,
None,
)
.await?;

View File

@@ -545,6 +545,7 @@ impl SeriesDistributor {
Some(&part_metrics),
true,
channel_size,
None,
)
.await?;
let mut metrics = SeriesDistributorMetrics::default();

View File

@@ -22,6 +22,7 @@ use datatypes::arrow::datatypes::{
DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef,
};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::extension::json::is_json_extension_type;
use datatypes::prelude::ConcreteDataType;
use datatypes::timestamp::timestamp_array_to_primitive;
use serde::{Deserialize, Serialize};
@@ -91,6 +92,7 @@ pub struct FlatSchemaOptions {
/// when storing primary key columns.
/// Only takes effect when `raw_pk_columns` is true.
pub string_pk_use_dict: bool,
pub override_schema: Option<SchemaRef>,
}
impl Default for FlatSchemaOptions {
@@ -98,6 +100,7 @@ impl Default for FlatSchemaOptions {
Self {
raw_pk_columns: true,
string_pk_use_dict: true,
override_schema: None,
}
}
}
@@ -111,6 +114,7 @@ impl FlatSchemaOptions {
Self {
raw_pk_columns: false,
string_pk_use_dict: false,
override_schema: None,
}
}
}
@@ -131,7 +135,22 @@ pub fn to_flat_sst_arrow_schema(
) -> SchemaRef {
let num_fields = flat_sst_arrow_schema_column_num(metadata, options);
let mut fields = Vec::with_capacity(num_fields);
let schema = metadata.schema.arrow_schema();
let mut schema = metadata.schema.arrow_schema().clone();
if let Some(override_schema) = &options.override_schema {
let mut fields = Vec::with_capacity(schema.fields().len());
for field in schema.fields() {
if is_json_extension_type(field)
&& let Some((_, override_field)) = override_schema.fields().find(field.name())
{
fields.push(override_field.clone());
} else {
fields.push(field.clone());
}
}
schema = Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone()));
};
if options.raw_pk_columns {
for pk_id in &metadata.primary_key {
let pk_index = metadata.column_index_by_id(*pk_id).unwrap();

View File

@@ -1382,6 +1382,7 @@ mod tests {
bloom_filter_index_config: Default::default(),
#[cfg(feature = "vector_index")]
vector_index_config: Default::default(),
schema: None,
};
let mut metrics = Metrics::new(WriteType::Flush);
env.access_layer

View File

@@ -1254,7 +1254,7 @@ mod tests {
.await;
writer
.write_all_flat(flat_source, None, write_opts)
.write_all_flat(flat_source, None, None, write_opts)
.await
.unwrap()
.remove(0)
@@ -1366,7 +1366,7 @@ mod tests {
.await;
let info = writer
.write_all_flat(flat_source, None, &write_opts)
.write_all_flat(flat_source, None, None, &write_opts)
.await
.unwrap()
.remove(0);

View File

@@ -49,7 +49,7 @@ use store_api::storage::{ColumnId, SequenceNumber};
use crate::error::{
ComputeArrowSnafu, DecodeSnafu, InvalidParquetSnafu, InvalidRecordBatchSnafu,
NewRecordBatchSnafu, Result,
NewRecordBatchSnafu, RecordBatchSnafu, Result,
};
use crate::sst::parquet::format::{
FIXED_POS_COLUMN_NUM, FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray,
@@ -103,6 +103,11 @@ impl FlatWriteFormat {
let sequence_array = Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
columns[sequence_column_index(batch.num_columns())] = sequence_array;
let columns = common_recordbatch::recordbatch::maybe_align_json_array_with_schema(
&self.arrow_schema,
columns,
)
.context(RecordBatchSnafu)?;
RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
}
}

View File

@@ -438,8 +438,7 @@ impl ParquetReaderBuilder {
.unwrap_or_else(|| region_meta.schema.clone());
// Create ArrowReaderMetadata for async stream building.
let arrow_reader_options =
ArrowReaderOptions::new().with_schema(read_format.arrow_schema().clone());
let arrow_reader_options = ArrowReaderOptions::new();
let arrow_metadata =
ArrowReaderMetadata::try_new(parquet_meta.clone(), arrow_reader_options)
.context(ReadDataPartSnafu)?;

View File

@@ -72,6 +72,7 @@ enum FlatBatchConverter {
}
impl FlatBatchConverter {
#[expect(unused)]
fn arrow_schema(&self) -> &SchemaRef {
match self {
FlatBatchConverter::Flat(f) => f.arrow_schema(),
@@ -275,15 +276,16 @@ where
pub async fn write_all_flat(
&mut self,
source: FlatSource,
override_schema: Option<SchemaRef>,
override_sequence: Option<SequenceNumber>,
opts: &WriteOptions,
) -> Result<SstInfoArray> {
let mut options = FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding);
options.override_schema = override_schema;
let converter = FlatBatchConverter::Flat(
FlatWriteFormat::new(
self.metadata.clone(),
&FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
)
.with_override_sequence(override_sequence),
FlatWriteFormat::new(self.metadata.clone(), &options)
.with_override_sequence(override_sequence),
);
let res = self.write_all_flat_inner(source, &converter, opts).await;
if res.is_err() {
@@ -406,7 +408,7 @@ where
let arrow_batch = converter.convert_batch(&record_batch)?;
let start = Instant::now();
self.maybe_init_writer(converter.arrow_schema(), opts)
self.maybe_init_writer(arrow_batch.schema_ref(), opts)
.await?
.write(&arrow_batch)
.await

View File

@@ -15,6 +15,7 @@
//! Planner, QueryEngine implementations based on DataFusion.
mod error;
mod json2_expr_planner;
mod planner;
use std::any::Any;

View File

@@ -0,0 +1,127 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use arrow_schema::Field;
use arrow_schema::extension::ExtensionType;
use common_function::scalars::json::json2_get::{Json2GetFunction, datatype_expr};
use common_function::scalars::udf::create_udf;
use datafusion_common::arrow::datatypes::DataType;
use datafusion_common::{Column, DataFusionError, Result, ScalarValue, TableReference};
use datafusion_expr::expr::{BinaryExpr, ScalarFunction};
use datafusion_expr::planner::{ExprPlanner, PlannerResult, RawBinaryExpr};
use datafusion_expr::{Expr, ExprSchemable, Operator};
use datatypes::extension::json::JsonExtensionType;
use sqlparser::ast::BinaryOperator;
#[derive(Debug)]
pub(crate) struct Json2ExprPlanner;
fn json2_get(base: Expr, path: String) -> Result<Expr> {
let args = vec![
base,
Expr::Literal(ScalarValue::Utf8(Some(path)), None),
datatype_expr(&DataType::Utf8View)?,
];
let function = create_udf(Arc::new(Json2GetFunction::default()));
Ok(Expr::ScalarFunction(ScalarFunction::new_udf(
Arc::new(function),
args,
)))
}
impl ExprPlanner for Json2ExprPlanner {
fn plan_binary_op(
&self,
expr: RawBinaryExpr,
schema: &datafusion_common::DFSchema,
) -> Result<PlannerResult<RawBinaryExpr>> {
let Some(operator) = parse_sql_binary_op(&expr.op) else {
return Ok(PlannerResult::Original(expr));
};
let left_type = expr.left.get_type(schema)?;
let right_type = expr.right.get_type(schema)?;
let left_rewritten = rewrite_expr_json2_get(&expr.left, right_type)?;
let right_rewritten = rewrite_expr_json2_get(&expr.right, left_type)?;
if left_rewritten.is_none() && right_rewritten.is_none() {
return Ok(PlannerResult::Original(expr));
}
let rewritten = Expr::BinaryExpr(BinaryExpr::new(
Box::new(left_rewritten.unwrap_or(expr.left)),
operator,
Box::new(right_rewritten.unwrap_or(expr.right)),
));
common_telemetry::debug!("json2 plan_binary_op: rewritten={rewritten:?}");
Ok(PlannerResult::Planned(rewritten))
}
fn plan_compound_identifier(
&self,
field: &Field,
qualifier: Option<&TableReference>,
nested_names: &[String],
) -> Result<PlannerResult<Vec<Expr>>> {
if field.extension_type_name() != Some(JsonExtensionType::NAME) {
return Ok(PlannerResult::Original(Vec::new()));
}
let path = nested_names.join(".");
let column = Column::from((qualifier, field));
json2_get(Expr::Column(column), path).map(PlannerResult::Planned)
}
}
fn rewrite_expr_json2_get(expr: &Expr, data_type: DataType) -> Result<Option<Expr>> {
let Expr::ScalarFunction(func) = expr else {
return Ok(None);
};
if func.func.name() != Json2GetFunction::NAME {
return Ok(None);
}
if func.args.len() != 3 {
return Err(DataFusionError::Internal(format!(
"Function {} is expected to have 3 arguments!",
func.name()
)));
}
let expected_expr = datatype_expr(&data_type)?;
let rewritten = Expr::ScalarFunction(ScalarFunction {
func: func.func.clone(),
args: vec![func.args[0].clone(), func.args[1].clone(), expected_expr],
});
Ok(Some(rewritten))
}
fn parse_sql_binary_op(op: &BinaryOperator) -> Option<Operator> {
match *op {
BinaryOperator::Gt => Some(Operator::Gt),
BinaryOperator::GtEq => Some(Operator::GtEq),
BinaryOperator::Lt => Some(Operator::Lt),
BinaryOperator::LtEq => Some(Operator::LtEq),
BinaryOperator::Eq => Some(Operator::Eq),
BinaryOperator::NotEq => Some(Operator::NotEq),
BinaryOperator::Plus => Some(Operator::Plus),
BinaryOperator::Minus => Some(Operator::Minus),
BinaryOperator::Multiply => Some(Operator::Multiply),
BinaryOperator::Divide => Some(Operator::Divide),
BinaryOperator::Modulo => Some(Operator::Modulo),
BinaryOperator::And => Some(Operator::And),
BinaryOperator::Or => Some(Operator::Or),
_ => None,
}
}

View File

@@ -38,6 +38,7 @@ use datafusion_sql::parser::Statement as DfStatement;
use session::context::QueryContextRef;
use snafu::{Location, ResultExt};
use crate::datafusion::json2_expr_planner::Json2ExprPlanner;
use crate::error::{CatalogSnafu, Result};
use crate::query_engine::{DefaultPlanDecoder, QueryEngineState};
@@ -87,6 +88,9 @@ impl DfContextProviderAdapter {
.map(|format| (format.get_ext().to_lowercase(), format))
.collect();
let mut expr_planners = SessionStateDefaults::default_expr_planners();
expr_planners.insert(0, Arc::new(Json2ExprPlanner));
Ok(Self {
engine_state,
session_state,
@@ -94,7 +98,7 @@ impl DfContextProviderAdapter {
table_provider,
query_ctx,
file_formats,
expr_planners: SessionStateDefaults::default_expr_planners(),
expr_planners,
})
}
}

View File

@@ -15,6 +15,7 @@
//! Dummy catalog for region server.
use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::sync::{Arc, Mutex};
@@ -30,6 +31,7 @@ use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::DataFusionError;
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::data_type::ConcreteDataType;
use futures::stream::BoxStream;
use session::context::{QueryContext, QueryContextRef};
use snafu::ResultExt;
@@ -282,6 +284,10 @@ impl DummyTableProvider {
self.scan_request.lock().unwrap().vector_search.clone()
}
pub fn with_json2_type_hint(&self, json2_column_types: &HashMap<String, ConcreteDataType>) {
self.scan_request.lock().unwrap().json2_column_types = json2_column_types.clone();
}
pub fn with_sequence(&self, sequence: u64) {
self.scan_request.lock().unwrap().memtable_max_sequence = Some(sequence);
}

View File

@@ -15,6 +15,7 @@
pub mod constant_term;
pub mod count_nest_aggr;
pub mod count_wildcard;
pub mod json2_scan_hint;
pub mod parallelize_scan;
pub mod pass_distribution;
pub mod remove_duplicate;

View File

@@ -0,0 +1,225 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use common_function::scalars::json::json2_get::Json2GetFunction;
use datafusion::datasource::DefaultTableSource;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
use datafusion_common::{Result, ScalarValue, TableReference, internal_err};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::{Expr, LogicalPlan};
use datafusion_optimizer::{OptimizerConfig, OptimizerRule};
use datatypes::data_type::ConcreteDataType;
use datatypes::json::requirement::JsonPathTarget;
use datatypes::types::JsonFormat;
use crate::dummy_catalog::DummyTableProvider;
#[derive(Debug)]
pub struct Json2ScanHintRule;
impl OptimizerRule for Json2ScanHintRule {
fn name(&self) -> &str {
"Json2ScanHintRule"
}
fn rewrite(
&self,
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
let requirements = Json2TypeRequirements::collect(&plan)?;
if requirements.is_empty() {
return Ok(Transformed::no(plan));
}
plan.transform_down(&mut |plan| match &plan {
LogicalPlan::TableScan(table_scan) => {
let Some(source) = table_scan
.source
.as_any()
.downcast_ref::<DefaultTableSource>()
else {
return Ok(Transformed::no(plan));
};
let Some(adapter) = source
.table_provider
.as_any()
.downcast_ref::<DummyTableProvider>()
else {
return Ok(Transformed::no(plan));
};
let hints =
requirements.merge(&table_scan.table_name, &adapter.region_metadata().schema);
adapter.with_json2_type_hint(&hints);
Ok(Transformed::yes(plan))
}
_ => Ok(Transformed::no(plan)),
})
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct Json2ColumnKey {
relation: Option<TableReference>,
name: String,
}
#[derive(Debug, Default)]
struct Json2TypeRequirements {
path_targets: HashMap<Json2ColumnKey, JsonPathTarget>,
}
impl Json2TypeRequirements {
fn collect(plan: &LogicalPlan) -> Result<Self> {
let mut collector = Self::default();
plan.apply(|node| {
for expr in node.expressions() {
let _ = expr.apply(|expr| {
if let Some((column, path, data_type)) = extract_json2_get(expr)? {
collector
.path_targets
.entry(column)
.or_default()
.require_typed_path(&path, data_type);
}
Ok(TreeNodeRecursion::Continue)
})?;
}
Ok(TreeNodeRecursion::Continue)
})?;
Ok(collector)
}
fn is_empty(&self) -> bool {
self.path_targets.is_empty()
}
fn merge(
&self,
table_name: &TableReference,
schema: &datatypes::schema::SchemaRef,
) -> HashMap<String, ConcreteDataType> {
let mut types = HashMap::new();
for column_schema in schema.column_schemas() {
let ConcreteDataType::Json(json_type) = &column_schema.data_type else {
continue;
};
if !matches!(json_type.format, JsonFormat::Json2(_)) {
continue;
}
let matching_keys = self
.path_targets
.iter()
.filter(|(key, _)| {
key.name == column_schema.name
&& key.relation.as_ref().is_none_or(|x| x == table_name)
})
.map(|(_, target)| target.clone())
.collect::<Vec<_>>();
if matching_keys.is_empty() {
continue;
}
let mut merged = JsonPathTarget::default();
for target in matching_keys {
if let Some(data_type) = target.build_type() {
merge_path_target_from_type(&mut merged, &data_type, "");
}
}
if let Some(data_type) = merged.build_type() {
let _ = types.insert(column_schema.name.clone(), data_type);
}
}
types
}
}
fn extract_json2_get(expr: &Expr) -> Result<Option<(Json2ColumnKey, String, ConcreteDataType)>> {
let Expr::ScalarFunction(ScalarFunction { func, args }) = expr else {
return Ok(None);
};
if func.name() != Json2GetFunction::NAME {
return Ok(None);
}
if args.len() != 3 {
return internal_err!("function {} must have 3 arguments", Json2GetFunction::NAME);
}
let Expr::Column(column) = &args[0] else {
return Ok(None);
};
let path = match &args[1] {
Expr::Literal(ScalarValue::Utf8(Some(path)), _)
| Expr::Literal(ScalarValue::LargeUtf8(Some(path)), _)
| Expr::Literal(ScalarValue::Utf8View(Some(path)), _) => path.clone(),
_ => return Ok(None),
};
let data_type = args
.get(2)
.and_then(extract_expected_type)
.unwrap_or_else(ConcreteDataType::string_datatype);
Ok(Some((
Json2ColumnKey {
relation: column.relation.clone(),
name: column.name.clone(),
},
path,
data_type,
)))
}
fn extract_expected_type(expr: &Expr) -> Option<ConcreteDataType> {
match expr {
Expr::Literal(value, _) => {
let data_type = value.data_type();
Some(ConcreteDataType::from_arrow_type(&data_type))
}
_ => None,
}
}
fn merge_path_target_from_type(
target: &mut JsonPathTarget,
data_type: &ConcreteDataType,
prefix: &str,
) {
match data_type {
ConcreteDataType::Struct(struct_type) => {
let fields = struct_type.fields();
for field in fields.iter() {
let path = if prefix.is_empty() {
field.name().to_string()
} else {
format!("{prefix}.{}", field.name())
};
merge_path_target_from_type(target, field.data_type(), &path);
}
}
_ => {
if !prefix.is_empty() {
target.require_typed_path(prefix, data_type.clone());
}
}
}
}

View File

@@ -62,6 +62,7 @@ use crate::optimizer::ExtensionAnalyzerRule;
use crate::optimizer::constant_term::MatchesConstantTermOptimizer;
use crate::optimizer::count_nest_aggr::CountNestAggrRule;
use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
use crate::optimizer::json2_scan_hint::Json2ScanHintRule;
use crate::optimizer::parallelize_scan::ParallelizeScan;
use crate::optimizer::pass_distribution::PassDistribution;
use crate::optimizer::remove_duplicate::RemoveDuplicate;
@@ -173,6 +174,7 @@ impl QueryEngineState {
analyzer.rules.push(Arc::new(FixStateUdafOrderingAnalyzer));
let mut optimizer = Optimizer::new();
optimizer.rules.push(Arc::new(Json2ScanHintRule));
optimizer.rules.push(Arc::new(ScanHintRule));
// add physical optimizer

View File

@@ -12,12 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use common_error::ext::BoxedError;
use common_recordbatch::OrderOption;
use datafusion_expr::expr::Expr;
// Re-export vector types from datatypes to avoid duplication
use datatypes::data_type::ConcreteDataType;
pub use datatypes::schema::{VectorDistanceMetric, VectorIndexEngineType};
use strum::Display;
@@ -128,6 +130,8 @@ pub struct ScanRequest {
/// Optional hint for KNN vector search. When set, the scan should use
/// vector index to find the k nearest neighbors.
pub vector_search: Option<VectorSearchRequest>,
/// Optional target types for query-driven JSON2 concretization.
pub json2_column_types: HashMap<String, ConcreteDataType>,
}
impl ScanRequest {
@@ -227,6 +231,14 @@ impl Display for ScanRequest {
vector_search.metric
)?;
}
if !self.json2_column_types.is_empty() {
write!(
f,
"{}json2_column_types: {:?}",
delimiter.as_str(),
self.json2_column_types
)?;
}
write!(f, " }}")
}
}

View File

@@ -7,7 +7,7 @@ autotests = false
[[test]]
name = "main"
path = "tests/main.rs"
path = "tests/it/main.rs"
[features]
dashboard = ["servers/dashboard"]

View File

@@ -153,16 +153,10 @@ async fn query_data(frontend: &Arc<Instance>) -> io::Result<()> {
+----------+"#;
execute_sql_and_expect(frontend, sql, expected).await;
let sql = "SELECT * FROM bluesky ORDER BY time_us";
let expected = fs::read_to_string(find_workspace_path(
"tests-integration/resources/jsonbench-select-all.txt",
))?;
execute_sql_and_expect(frontend, sql, &expected).await;
// query 1:
let sql = "
SELECT
json_get_string(data, '$.commit.collection') AS event, count() AS count
data.commit.collection AS event, count() AS count
FROM bluesky
GROUP BY event
ORDER BY count DESC, event ASC";
@@ -180,13 +174,12 @@ ORDER BY count DESC, event ASC";
// query 2:
let sql = "
SELECT
json_get_string(data, '$.commit.collection') AS event,
data.commit.collection AS event,
count() AS count,
count(DISTINCT json_get_string(data, '$.did')) AS users
count(DISTINCT data.did) AS users
FROM bluesky
WHERE
(json_get_string(data, '$.kind') = 'commit') AND
(json_get_string(data, '$.commit.operation') = 'create')
data.kind = 'commit' AND data.commit.operation = 'create'
GROUP BY event
ORDER BY count DESC, event ASC";
let expected = r#"
@@ -203,15 +196,14 @@ ORDER BY count DESC, event ASC";
// query 3:
let sql = "
SELECT
json_get_string(data, '$.commit.collection') AS event,
date_part('hour', to_timestamp_micros(json_get_int(data, '$.time_us'))) as hour_of_day,
data.commit.collection AS event,
date_part('hour', to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) as hour_of_day,
count() AS count
FROM bluesky
WHERE
(json_get_string(data, '$.kind') = 'commit') AND
(json_get_string(data, '$.commit.operation') = 'create') AND
json_get_string(data, '$.commit.collection') IN
('app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.feed.like')
data.kind = 'commit' AND
data.commit.operation = 'create' AND
data.commit.collection in ('app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.feed.like')
GROUP BY event, hour_of_day
ORDER BY hour_of_day, event";
let expected = r#"
@@ -227,13 +219,13 @@ ORDER BY hour_of_day, event";
// query 4:
let sql = "
SELECT
json_get_string(data, '$.did') as user_id,
min(to_timestamp_micros(json_get_int(data, '$.time_us'))) AS first_post_ts
data.did::String as user_id,
min(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) AS first_post_ts
FROM bluesky
WHERE
(json_get_string(data, '$.kind') = 'commit') AND
(json_get_string(data, '$.commit.operation') = 'create') AND
(json_get_string(data, '$.commit.collection') = 'app.bsky.feed.post')
data.kind = 'commit' AND
data.commit.operation = 'create' AND
data.commit.collection = 'app.bsky.feed.post'
GROUP BY user_id
ORDER BY first_post_ts ASC, user_id DESC
LIMIT 3";
@@ -250,17 +242,17 @@ LIMIT 3";
// query 5:
let sql = "
SELECT
json_get_string(data, '$.did') as user_id,
data.did::String as user_id,
date_part(
'epoch',
max(to_timestamp_micros(json_get_int(data, '$.time_us'))) -
min(to_timestamp_micros(json_get_int(data, '$.time_us')))
max(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) -
min(to_timestamp_micros(arrow_cast(data.time_us, 'Int64')))
) AS activity_span
FROM bluesky
WHERE
(json_get_string(data, '$.kind') = 'commit') AND
(json_get_string(data, '$.commit.operation') = 'create') AND
(json_get_string(data, '$.commit.collection') = 'app.bsky.feed.post')
data.kind = 'commit' AND
data.commit.operation = 'create' AND
data.commit.collection = 'app.bsky.feed.post'
GROUP BY user_id
ORDER BY activity_span DESC, user_id DESC
LIMIT 3";
@@ -304,30 +296,21 @@ async fn insert_data_by_sql(frontend: &Arc<Instance>) -> io::Result<()> {
async fn desc_table(frontend: &Arc<Instance>) {
let sql = "DESC TABLE bluesky";
let expected = r#"
+---------+------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+---------+------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| data | Json<{"_raw":"<String>","commit.collection":"<String>","commit.operation":"<String>","did":"<String>","kind":"<String>","time_us":"<Number>"}> | | YES | | FIELD |
| time_us | TimestampMicrosecond | PRI | NO | | TIMESTAMP |
+---------+------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+"#;
+---------+----------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+---------+----------------------+-----+------+---------+---------------+
| data | JSON2 | | YES | | FIELD |
| time_us | TimestampMicrosecond | PRI | NO | | TIMESTAMP |
+---------+----------------------+-----+------+---------+---------------+"#;
execute_sql_and_expect(frontend, sql, expected).await;
}
async fn create_table(frontend: &Arc<Instance>) {
let sql = r#"
CREATE TABLE bluesky (
"data" JSON (
format = "partial",
fields = Struct<
kind String,
"commit.operation" String,
"commit.collection" String,
did String,
time_us Bigint
>,
),
"data" JSON2,
time_us TimestampMicrosecond TIME INDEX,
)
) WITH ('append_mode' = 'true', 'sst_format' = 'flat')
"#;
execute_sql_and_expect(frontend, sql, "Affected Rows: 0").await;
}

View File

@@ -14,6 +14,14 @@ values (1, '{"a": {"b": 1}, "c": "s1", "d": [{"e": {"f": 0.1}}]}'),
Affected Rows: 2
admin flush_table('json2_table');
+----------------------------------+
| ADMIN flush_table('json2_table') |
+----------------------------------+
| 0 |
+----------------------------------+
insert into json2_table (ts, j)
values (3, '{"a": {"b": 3}, "c": "s3"}');
@@ -26,18 +34,148 @@ values (4, '{"a": {"b": -4}, "d": [{"e": {"g": -0.4}}]}'),
Affected Rows: 3
admin flush_table('json2_table');
+----------------------------------+
| ADMIN flush_table('json2_table') |
+----------------------------------+
| 0 |
+----------------------------------+
admin compact_table('json2_table', 'swcs', '86400');
+-----------------------------------------------------+
| ADMIN compact_table('json2_table', 'swcs', '86400') |
+-----------------------------------------------------+
| 0 |
+-----------------------------------------------------+
insert into json2_table
values (7, '{"a": {"b": "s7"}, "c": [1], "d": [{"e": {"g": -0.7}}]}'),
(8, '{"a": {"b": 8}, "c": "s8"}');
Affected Rows: 2
admin flush_table('json2_table');
+----------------------------------+
| ADMIN flush_table('json2_table') |
+----------------------------------+
| 0 |
+----------------------------------+
insert into json2_table
values (9, '{"a": {"x": true}, "c": "s9", "d": [{"e": {"g": -0.9}}]}'),
(10, '{"a": {"b": 10}, "y": false}');
Affected Rows: 2
select j.a.b from json2_table order by ts;
+-----------------------------------------------------+
| json2_get(json2_table.j,Utf8("a.b"),Utf8View(NULL)) |
+-----------------------------------------------------+
| 1 |
| -2 |
| 3 |
| -4 |
| |
| |
| "s7" |
| 8 |
| |
| 10 |
+-----------------------------------------------------+
select j.a, j.a.x from json2_table order by ts;
+---------------------------------------------------+-----------------------------------------------------+
| json2_get(json2_table.j,Utf8("a"),Utf8View(NULL)) | json2_get(json2_table.j,Utf8("a.x"),Utf8View(NULL)) |
+---------------------------------------------------+-----------------------------------------------------+
| {"b":1,"x":null} | |
| {"b":-2,"x":null} | |
| {"b":3,"x":null} | |
| {"b":-4,"x":null} | |
| {"b":null,"x":null} | |
| | |
| {"b":"s7","x":null} | |
| {"b":8,"x":null} | |
| {"b":null,"x":true} | true |
| {"b":10,"x":null} | |
+---------------------------------------------------+-----------------------------------------------------+
select j.c, j.y from json2_table order by ts;
+---------------------------------------------------+---------------------------------------------------+
| json2_get(json2_table.j,Utf8("c"),Utf8View(NULL)) | json2_get(json2_table.j,Utf8("y"),Utf8View(NULL)) |
+---------------------------------------------------+---------------------------------------------------+
| "s1" | |
| "s2" | |
| "s3" | |
| | |
| "s5" | |
| "s6" | |
| [1] | |
| "s8" | |
| "s9" | |
| | false |
+---------------------------------------------------+---------------------------------------------------+
select j from json2_table order by ts;
Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected Struct() but found Struct("a": Struct("b": Binary, "x": Boolean), "c": Binary, "d": List(Struct("e": Struct("f": Float64, "g": Float64))), "y": Boolean) at column index 0
select * from json2_table order by ts;
Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected Struct() but found Struct("a": Struct("b": Binary, "x": Boolean), "c": Binary, "d": List(Struct("e": Struct("f": Float64, "g": Float64))), "y": Boolean) at column index 1
select j.a.b + 1 from json2_table order by ts;
+-------------------------------------------------------------+
| json2_get(json2_table.j,Utf8("a.b"),Int64(NULL)) + Int64(1) |
+-------------------------------------------------------------+
| 2 |
| -1 |
| 4 |
| -3 |
| |
| |
| |
| 9 |
| |
| 11 |
+-------------------------------------------------------------+
select abs(j.a.b) from json2_table order by ts;
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Function 'abs' expects NativeType::Numeric but received NativeType::String No function matches the given name and argument types 'abs(Utf8View)'. You might need to add explicit type casts.
Candidate functions:
abs(Numeric(1))
-- "j.c" is of type "String", "abs" is expected to be all "null"s.
select abs(j.c) from json2_table order by ts;
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Function 'abs' expects NativeType::Numeric but received NativeType::String No function matches the given name and argument types 'abs(Utf8View)'. You might need to add explicit type casts.
Candidate functions:
abs(Numeric(1))
select j.d from json2_table order by ts;
+---------------------------------------------------+
| json2_get(json2_table.j,Utf8("d"),Utf8View(NULL)) |
+---------------------------------------------------+
| [{"e":{"f":0.1,"g":null}}] |
| [{"e":{"f":0.2,"g":null}}] |
| |
| [{"e":{"f":null,"g":-0.4}}] |
| |
| |
| [{"e":{"g":-0.7}}] |
| |
| [{"e":{"g":-0.9}}] |
| |
+---------------------------------------------------+
drop table json2_table;
Affected Rows: 0

View File

@@ -10,6 +10,8 @@ insert into json2_table (ts, j)
values (1, '{"a": {"b": 1}, "c": "s1", "d": [{"e": {"f": 0.1}}]}'),
(2, '{"a": {"b": -2}, "c": "s2", "d": [{"e": {"f": 0.2}}]}');
admin flush_table('json2_table');
insert into json2_table (ts, j)
values (3, '{"a": {"b": 3}, "c": "s3"}');
@@ -18,12 +20,37 @@ values (4, '{"a": {"b": -4}, "d": [{"e": {"g": -0.4}}]}'),
(5, '{"a": {}, "c": "s5"}'),
(6, '{"c": "s6"}');
admin flush_table('json2_table');
admin compact_table('json2_table', 'swcs', '86400');
insert into json2_table
values (7, '{"a": {"b": "s7"}, "c": [1], "d": [{"e": {"g": -0.7}}]}'),
(8, '{"a": {"b": 8}, "c": "s8"}');
admin flush_table('json2_table');
insert into json2_table
values (9, '{"a": {"x": true}, "c": "s9", "d": [{"e": {"g": -0.9}}]}'),
(10, '{"a": {"b": 10}, "y": false}');
select j.a.b from json2_table order by ts;
select j.a, j.a.x from json2_table order by ts;
select j.c, j.y from json2_table order by ts;
select j from json2_table order by ts;
select * from json2_table order by ts;
select j.a.b + 1 from json2_table order by ts;
select abs(j.a.b) from json2_table order by ts;
-- "j.c" is of type "String", "abs" is expected to be all "null"s.
select abs(j.c) from json2_table order by ts;
select j.d from json2_table order by ts;
drop table json2_table;

View File

@@ -0,0 +1,176 @@
CREATE TABLE bluesky (
`data` JSON2,
time_us TimestampMicrosecond TIME INDEX
) WITH ('append_mode' = 'true', 'sst_format' = 'flat');
Affected Rows: 0
INSERT INTO bluesky (time_us, data)
VALUES (1732206349000167,
'{"did":"did:plc:yj3sjq3blzpynh27cumnp5ks","time_us":1732206349000167,"kind":"commit","commit":{"rev":"3lbhtytnn2k2f","operation":"create","collection":"app.bsky.feed.post","rkey":"3lbhtyteurk2y","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.095Z","langs":["en"],"reply":{"parent":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"},"root":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"}},"text":"aaaaah.  LIght shines in a corner of WTF...."},"cid":"bafyreidblutgvj75o4q4akzyyejedjj6l3it6hgqwee6jpwv2wqph5fsgm"}}');
Affected Rows: 1
INSERT INTO bluesky (time_us, data)
VALUES (1732206349000644,
'{"did":"did:plc:3i4xf2v4wcnyktgv6satke64","time_us":1732206349000644,"kind":"commit","commit":{"rev":"3lbhuvzds6d2a","operation":"create","collection":"app.bsky.feed.like","rkey":"3lbhuvzdked2a","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.221Z","subject":{"cid":"bafyreidjvrcmckkm765mct5fph36x7kupkfo35rjklbf2k76xkzwyiauge","uri":"at://did:plc:azrv4rcbws6kmcga4fsbphg2/app.bsky.feed.post/3lbgjdpbiec2l"}},"cid":"bafyreia5l5vrkh5oj4cjyhcqby2dprhyvcyofo2q5562tijlae2pzih23m"}}');
Affected Rows: 1
ADMIN flush_table('bluesky');
+------------------------------+
| ADMIN flush_table('bluesky') |
+------------------------------+
| 0 |
+------------------------------+
INSERT INTO bluesky (time_us, data)
VALUES (1732206349001108,
'{"did":"did:plc:gccfnqqizz4urhchsaie6jft","time_us":1732206349001108,"kind":"commit","commit":{"rev":"3lbhuvze3gi2u","operation":"create","collection":"app.bsky.graph.follow","rkey":"3lbhuvzdtmi2u","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:27:40.923Z","subject":"did:plc:r7cdh4sgzqbfdc6wcdxxti7c"},"cid":"bafyreiew2p6cgirfaj45qoenm4fgumib7xoloclrap3jgkz5es7g7kby3i"}}');
Affected Rows: 1
INSERT INTO bluesky (time_us, data)
VALUES (1732206349001372,
'{"did":"did:plc:msxqf3twq7abtdw7dbfskphk","time_us":1732206349001372,"kind":"commit","commit":{"rev":"3lbhueija5p22","operation":"create","collection":"app.bsky.feed.like","rkey":"3lbhueiizcx22","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:15:58.232Z","subject":{"cid":"bafyreiavpshyqzrlo5m7fqodjhs6jevweqnif4phasiwimv4a7mnsqi2fe","uri":"at://did:plc:fusulxqc52zbrc75fi6xrcof/app.bsky.feed.post/3lbhskq5zn22f"}},"cid":"bafyreidjix4dauj2afjlbzmhj3a7gwftcevvmmy6edww6vrjdbst26rkby"}}');
Affected Rows: 1
ADMIN flush_table('bluesky');
+------------------------------+
| ADMIN flush_table('bluesky') |
+------------------------------+
| 0 |
+------------------------------+
INSERT INTO bluesky (time_us, data)
VALUES (1732206349001905,
'{"did":"did:plc:l5o3qjrmfztir54cpwlv2eme","time_us":1732206349001905,"kind":"commit","commit":{"rev":"3lbhtytohxc2o","operation":"create","collection":"app.bsky.feed.post","rkey":"3lbhtytjqzk2q","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.254Z","langs":["en"],"reply":{"parent":{"cid":"bafyreih35fe2jj3gchmgk4amold4l6sfxd2sby5wrg3jrws5fkdypxrbg4","uri":"at://did:plc:6wx2gg5yqgvmlu35r6y3bk6d/app.bsky.feed.post/3lbhtj2eb4s2o"},"root":{"cid":"bafyreifipyt3vctd4ptuoicvio7rbr5xvjv4afwuggnd2prnmn55mu6luu","uri":"at://did:plc:474ldquxwzrlcvjhhbbk2wte/app.bsky.feed.post/3lbhdzrynik27"}},"text":"okay i take mine back because I hadnt heard this one yet^^"},"cid":"bafyreigzdsdne3z2xxcakgisieyj7y47hj6eg7lj6v4q25ah5q2qotu5ku"}}');
Affected Rows: 1
ADMIN compact_table('bluesky', 'swcs', '86400');
+-------------------------------------------------+
| ADMIN compact_table('bluesky', 'swcs', '86400') |
+-------------------------------------------------+
| 0 |
+-------------------------------------------------+
SELECT count(*) FROM bluesky;
+----------+
| count(*) |
+----------+
| 5 |
+----------+
-- Query 1:
SELECT data.commit.collection AS event,
count() AS count
FROM bluesky
GROUP BY event
ORDER BY count DESC, event ASC;
+-----------------------+-------+
| event | count |
+-----------------------+-------+
| app.bsky.feed.like | 2 |
| app.bsky.feed.post | 2 |
| app.bsky.graph.follow | 1 |
+-----------------------+-------+
-- Query 2:
SELECT data.commit.collection AS event,
count() AS count,
count(DISTINCT data.did) AS users
FROM bluesky
WHERE data.kind = 'commit' AND data.commit.operation = 'create'
GROUP BY event
ORDER BY count DESC, event ASC;
+-----------------------+-------+-------+
| event | count | users |
+-----------------------+-------+-------+
| app.bsky.feed.like | 2 | 2 |
| app.bsky.feed.post | 2 | 2 |
| app.bsky.graph.follow | 1 | 1 |
+-----------------------+-------+-------+
-- Query 3:
SELECT data.commit.collection AS event,
date_part('hour', to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) as hour_of_day,
count() AS count
FROM bluesky
WHERE data.kind = 'commit'
AND data.commit.operation = 'create'
AND data.commit.collection in ('app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.feed.like')
GROUP BY event, hour_of_day
ORDER BY hour_of_day, event;
+--------------------+-------------+-------+
| event | hour_of_day | count |
+--------------------+-------------+-------+
| app.bsky.feed.like | 16 | 2 |
| app.bsky.feed.post | 16 | 2 |
+--------------------+-------------+-------+
-- Query 4:
SELECT data.did::String as user_id,
min(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) AS first_post_ts
FROM bluesky
WHERE data.kind = 'commit'
AND data.commit.operation = 'create'
AND data.commit.collection = 'app.bsky.feed.post'
GROUP BY user_id
ORDER BY first_post_ts ASC, user_id DESC
LIMIT 3;
+----------------------------------+----------------------------+
| user_id | first_post_ts |
+----------------------------------+----------------------------+
| did:plc:yj3sjq3blzpynh27cumnp5ks | 2024-11-21T16:25:49.000167 |
| did:plc:l5o3qjrmfztir54cpwlv2eme | 2024-11-21T16:25:49.001905 |
+----------------------------------+----------------------------+
-- Query 5:
SELECT data.did::String as user_id,
date_part(
'epoch',
max(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) -
min(to_timestamp_micros(arrow_cast(data.time_us, 'Int64')))
) AS activity_span
FROM bluesky
WHERE data.kind = 'commit'
AND data.commit.operation = 'create'
AND data.commit.collection = 'app.bsky.feed.post'
GROUP BY user_id
ORDER BY activity_span DESC, user_id DESC
LIMIT 3;
+----------------------------------+---------------+
| user_id | activity_span |
+----------------------------------+---------------+
| did:plc:yj3sjq3blzpynh27cumnp5ks | 0.0 |
| did:plc:l5o3qjrmfztir54cpwlv2eme | 0.0 |
+----------------------------------+---------------+
-- SQLNESS REPLACE (peers.*) REDACTED
EXPLAIN
SELECT date_part('hour', to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) as hour_of_day
FROM bluesky;
+---------------+--------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: date_part(Utf8("hour"), to_timestamp_micros(json2_get(bluesky.data, Utf8("time_us"), Int64(NULL)))) AS hour_of_day |
| | TableScan: bluesky |
| | ]] |
| physical_plan | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+--------------------------------------------------------------------------------------------------------------------------------+

View File

@@ -0,0 +1,90 @@
CREATE TABLE bluesky (
`data` JSON2,
time_us TimestampMicrosecond TIME INDEX
) WITH ('append_mode' = 'true', 'sst_format' = 'flat');
INSERT INTO bluesky (time_us, data)
VALUES (1732206349000167,
'{"did":"did:plc:yj3sjq3blzpynh27cumnp5ks","time_us":1732206349000167,"kind":"commit","commit":{"rev":"3lbhtytnn2k2f","operation":"create","collection":"app.bsky.feed.post","rkey":"3lbhtyteurk2y","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.095Z","langs":["en"],"reply":{"parent":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"},"root":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"}},"text":"aaaaah.  LIght shines in a corner of WTF...."},"cid":"bafyreidblutgvj75o4q4akzyyejedjj6l3it6hgqwee6jpwv2wqph5fsgm"}}');
INSERT INTO bluesky (time_us, data)
VALUES (1732206349000644,
'{"did":"did:plc:3i4xf2v4wcnyktgv6satke64","time_us":1732206349000644,"kind":"commit","commit":{"rev":"3lbhuvzds6d2a","operation":"create","collection":"app.bsky.feed.like","rkey":"3lbhuvzdked2a","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.221Z","subject":{"cid":"bafyreidjvrcmckkm765mct5fph36x7kupkfo35rjklbf2k76xkzwyiauge","uri":"at://did:plc:azrv4rcbws6kmcga4fsbphg2/app.bsky.feed.post/3lbgjdpbiec2l"}},"cid":"bafyreia5l5vrkh5oj4cjyhcqby2dprhyvcyofo2q5562tijlae2pzih23m"}}');
ADMIN flush_table('bluesky');
INSERT INTO bluesky (time_us, data)
VALUES (1732206349001108,
'{"did":"did:plc:gccfnqqizz4urhchsaie6jft","time_us":1732206349001108,"kind":"commit","commit":{"rev":"3lbhuvze3gi2u","operation":"create","collection":"app.bsky.graph.follow","rkey":"3lbhuvzdtmi2u","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:27:40.923Z","subject":"did:plc:r7cdh4sgzqbfdc6wcdxxti7c"},"cid":"bafyreiew2p6cgirfaj45qoenm4fgumib7xoloclrap3jgkz5es7g7kby3i"}}');
INSERT INTO bluesky (time_us, data)
VALUES (1732206349001372,
'{"did":"did:plc:msxqf3twq7abtdw7dbfskphk","time_us":1732206349001372,"kind":"commit","commit":{"rev":"3lbhueija5p22","operation":"create","collection":"app.bsky.feed.like","rkey":"3lbhueiizcx22","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:15:58.232Z","subject":{"cid":"bafyreiavpshyqzrlo5m7fqodjhs6jevweqnif4phasiwimv4a7mnsqi2fe","uri":"at://did:plc:fusulxqc52zbrc75fi6xrcof/app.bsky.feed.post/3lbhskq5zn22f"}},"cid":"bafyreidjix4dauj2afjlbzmhj3a7gwftcevvmmy6edww6vrjdbst26rkby"}}');
ADMIN flush_table('bluesky');
INSERT INTO bluesky (time_us, data)
VALUES (1732206349001905,
'{"did":"did:plc:l5o3qjrmfztir54cpwlv2eme","time_us":1732206349001905,"kind":"commit","commit":{"rev":"3lbhtytohxc2o","operation":"create","collection":"app.bsky.feed.post","rkey":"3lbhtytjqzk2q","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.254Z","langs":["en"],"reply":{"parent":{"cid":"bafyreih35fe2jj3gchmgk4amold4l6sfxd2sby5wrg3jrws5fkdypxrbg4","uri":"at://did:plc:6wx2gg5yqgvmlu35r6y3bk6d/app.bsky.feed.post/3lbhtj2eb4s2o"},"root":{"cid":"bafyreifipyt3vctd4ptuoicvio7rbr5xvjv4afwuggnd2prnmn55mu6luu","uri":"at://did:plc:474ldquxwzrlcvjhhbbk2wte/app.bsky.feed.post/3lbhdzrynik27"}},"text":"okay i take mine back because I hadnt heard this one yet^^"},"cid":"bafyreigzdsdne3z2xxcakgisieyj7y47hj6eg7lj6v4q25ah5q2qotu5ku"}}');
ADMIN compact_table('bluesky', 'swcs', '86400');
SELECT count(*) FROM bluesky;
-- Query 1:
SELECT data.commit.collection AS event,
count() AS count
FROM bluesky
GROUP BY event
ORDER BY count DESC, event ASC;
-- Query 2:
SELECT data.commit.collection AS event,
count() AS count,
count(DISTINCT data.did) AS users
FROM bluesky
WHERE data.kind = 'commit' AND data.commit.operation = 'create'
GROUP BY event
ORDER BY count DESC, event ASC;
-- Query 3:
SELECT data.commit.collection AS event,
date_part('hour', to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) as hour_of_day,
count() AS count
FROM bluesky
WHERE data.kind = 'commit'
AND data.commit.operation = 'create'
AND data.commit.collection in ('app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.feed.like')
GROUP BY event, hour_of_day
ORDER BY hour_of_day, event;
-- Query 4:
SELECT data.did::String as user_id,
min(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) AS first_post_ts
FROM bluesky
WHERE data.kind = 'commit'
AND data.commit.operation = 'create'
AND data.commit.collection = 'app.bsky.feed.post'
GROUP BY user_id
ORDER BY first_post_ts ASC, user_id DESC
LIMIT 3;
-- Query 5:
SELECT data.did::String as user_id,
date_part(
'epoch',
max(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) -
min(to_timestamp_micros(arrow_cast(data.time_us, 'Int64')))
) AS activity_span
FROM bluesky
WHERE data.kind = 'commit'
AND data.commit.operation = 'create'
AND data.commit.collection = 'app.bsky.feed.post'
GROUP BY user_id
ORDER BY activity_span DESC, user_id DESC
LIMIT 3;
-- SQLNESS REPLACE (peers.*) REDACTED
EXPLAIN
SELECT date_part('hour', to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) as hour_of_day
FROM bluesky;