json2: data-driven concretize (select, compaction)

Signed-off-by: luofucong <luofc@foxmail.com>
@@ -355,7 +355,7 @@ pub fn merge_record_batches(schema: SchemaRef, batches: &[RecordBatch]) -> Resul
     Ok(RecordBatch::from_df_record_batch(schema, record_batch))
 }
 
-fn maybe_align_json_array_with_schema(
+pub fn maybe_align_json_array_with_schema(
     schema: &ArrowSchemaRef,
     arrays: Vec<ArrayRef>,
 ) -> Result<Vec<ArrayRef>> {

@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#![recursion_limit = "256"]
+
 pub mod alive_keeper;
 pub mod config;
 pub mod datanode;

@@ -26,12 +26,12 @@ use std::sync::Arc;
 
 use serde::{Deserialize, Serialize};
 use serde_json::{Map, Value as Json};
-use snafu::{OptionExt, ResultExt, ensure};
+use snafu::{OptionExt, ResultExt};
 
 use crate::error::{self, InvalidJsonSnafu, Result, SerializeSnafu};
 use crate::json::value::{JsonValue, JsonVariant};
 use crate::types::json_type::{JsonNativeType, JsonNumberType, JsonObjectType};
-use crate::types::{StructField, StructType};
+use crate::types::{JsonType, StructField, StructType};
 use crate::value::{ListValue, StructValue, Value};
 
 /// The configuration of JSON encoding
@@ -305,32 +305,45 @@ fn encode_json_array_with_context<'a>(
 ) -> Result<JsonValue> {
     let json_array_len = json_array.len();
     let mut items = Vec::with_capacity(json_array_len);
-    let mut element_type = item_type.cloned();
 
     for (index, value) in json_array.into_iter().enumerate() {
         let array_context = context.with_key(&index.to_string());
-        let item_value =
-            encode_json_value_with_context(value, element_type.as_ref(), &array_context)?;
-        let item_type = item_value.json_type().native_type().clone();
-        items.push(item_value.into_variant());
+        let item_value = encode_json_value_with_context(value, None, &array_context)?;
+        items.push(item_value);
     }
 
-    // Determine the common type for the list
-    if let Some(current_type) = &element_type {
-        // It's valid for json array to have different types of items, for example,
-        // ["a string", 1]. However, the `JsonValue` will be converted to Arrow list array,
-        // which requires all items have exactly same type. So we forbid the different types
-        // case here. Besides, it's not common for items in a json array to differ. So I think
-        // we are good here.
-        ensure!(
-            item_type == *current_type,
-            error::InvalidJsonSnafu {
-                value: "all items in json array must have the same type"
-            }
-        );
-    } else {
-        element_type = Some(item_type);
-    }
+    // In specification, it's valid for a JSON array to have different types of items, for example,
+    // ["a string", 1]. However, in implementation, the `JsonValue` will be converted to Arrow list
+    // array, which requires all items have exactly the same type. So we merge out the maybe
+    // different item types to a unified type, and align all the item values to it.
+
+    let provided_item_type = item_type.map(|x| JsonType::new_json2(x.clone()));
+    let merged_item_type = if let Some((first, rests)) = items.split_first() {
+        let mut merged = first.json_type().clone();
+        for rest in rests.iter().map(|x| x.json_type()) {
+            merged.merge(rest)?;
+        }
+        Some(merged)
+    } else {
+        None
+    };
+    let unified_item_type = match (provided_item_type, merged_item_type) {
+        (Some(mut x), Some(y)) => {
+            x.merge(&y)?;
+            Some(x)
+        }
+        (Some(x), None) | (None, Some(x)) => Some(x),
+        (None, None) => None,
+    };
+    if let Some(unified_item_type) = unified_item_type {
+        for item in &mut items {
+            item.try_align(&unified_item_type)?;
+        }
+    }
+    let items = items
+        .into_iter()
+        .map(|x| x.into_variant())
+        .collect::<Vec<_>>();
 
     Ok(JsonValue::new(JsonVariant::Array(items)))
 }
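
Note: the merge-then-align pass added above is the heart of this hunk. The following is an illustrative sketch in plain Rust of the same two-phase idea, using simplified stand-in types (Ty, merge, unify are inventions for this note, not the crate's JsonNativeType/JsonType API):

    // Phase 1: fold all element types into one unified type.
    // Unrelated types fall back to Variant (raw JSON bytes), mirroring merge() in the diff.
    #[derive(Clone, Debug, PartialEq)]
    enum Ty { Null, I64, F64, Bool, Str, Variant }

    fn merge(a: &Ty, b: &Ty) -> Ty {
        match (a, b) {
            (Ty::Null, t) | (t, Ty::Null) => t.clone(), // Null merges with anything
            (x, y) if x == y => x.clone(),              // identical types stay as-is
            _ => Ty::Variant,                           // conflict: degrade to Variant
        }
    }

    fn unify(items: &[Ty]) -> Option<Ty> {
        let (first, rest) = items.split_first()?;
        Some(rest.iter().fold(first.clone(), |acc, t| merge(&acc, t)))
    }

    fn main() {
        // [1, "hello", true, 3.15] no longer errors: it unifies to Variant.
        assert_eq!(unify(&[Ty::I64, Ty::Str, Ty::Bool, Ty::F64]), Some(Ty::Variant));
        // Homogeneous arrays keep their concrete type.
        assert_eq!(unify(&[Ty::I64, Ty::I64]), Some(Ty::I64));
        // Phase 2 (not shown): every item value is aligned to the unified type.
    }
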
@@ -729,7 +742,7 @@ where
 
 #[cfg(test)]
 mod tests {
 
+    use common_base::bytes::Bytes;
     use serde_json::json;
 
     use super::*;
@@ -1050,11 +1063,21 @@ mod tests {
     fn test_encode_json_array_mixed_types() {
         let json = json!([1, "hello", true, 3.15]);
         let settings = JsonStructureSettings::Structured(None);
-        let result = settings.encode_with_type(json, None);
-        assert_eq!(
-            result.unwrap_err().to_string(),
-            "Invalid JSON: all items in json array must have the same type"
-        );
+        let result = settings
+            .encode_with_type(json, None)
+            .unwrap()
+            .into_json_inner()
+            .unwrap();
+
+        if let Value::List(list_value) = result {
+            assert_eq!(list_value.items().len(), 4);
+            assert_eq!(
+                list_value.datatype(),
+                Arc::new(ConcreteDataType::binary_datatype())
+            );
+        } else {
+            panic!("Expected List value");
+        }
     }
 
     #[test]
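
Note: the test above pins down the new mixed-type behavior: when element types cannot unify, items are kept as their raw JSON bytes (a Binary list) instead of returning an error. A minimal sketch of that fallback, using only serde_json (the byte expectations mirror the decode test later in this diff):

    use serde_json::json;

    fn main() {
        let items = [json!(1), json!("hello"), json!(true), json!(3.15)];
        // Each heterogeneous item is serialized to its raw JSON bytes.
        let as_binary: Vec<Vec<u8>> = items
            .iter()
            .map(|v| serde_json::to_vec(v).expect("serializable"))
            .collect();
        assert_eq!(as_binary[0], b"1");
        assert_eq!(as_binary[1], br#""hello""#);
        assert_eq!(as_binary[2], b"true");
        assert_eq!(as_binary[3], b"3.15");
    }
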
@@ -1276,12 +1299,12 @@ mod tests {
     #[test]
     fn test_encode_json_array_with_item_type() {
         let json = json!([1, 2, 3]);
-        let item_type = Arc::new(ConcreteDataType::uint64_datatype());
+        let item_type = Arc::new(ConcreteDataType::int64_datatype());
         let settings = JsonStructureSettings::Structured(None);
         let result = settings
             .encode_with_type(
                 json,
-                Some(&JsonNativeType::Array(Box::new(JsonNativeType::u64()))),
+                Some(&JsonNativeType::Array(Box::new(JsonNativeType::i64()))),
             )
             .unwrap()
             .into_json_inner()
@@ -1289,9 +1312,9 @@ mod tests {
 
         if let Value::List(list_value) = result {
             assert_eq!(list_value.items().len(), 3);
-            assert_eq!(list_value.items()[0], Value::UInt64(1));
-            assert_eq!(list_value.items()[1], Value::UInt64(2));
-            assert_eq!(list_value.items()[2], Value::UInt64(3));
+            assert_eq!(list_value.items()[0], Value::Int64(1));
+            assert_eq!(list_value.items()[1], Value::Int64(2));
+            assert_eq!(list_value.items()[2], Value::Int64(3));
             assert_eq!(list_value.datatype(), item_type);
         } else {
             panic!("Expected List value");
@@ -2249,11 +2272,32 @@ mod tests {
             )])),
         );
 
-        let decoded_struct = settings.decode_struct(array_struct);
-        assert_eq!(
-            decoded_struct.unwrap_err().to_string(),
-            "Invalid JSON: all items in json array must have the same type"
-        );
+        let decoded_struct = settings.decode_struct(array_struct).unwrap();
+        let fields = decoded_struct.struct_type().fields();
+        let decoded_fields: Vec<&str> = fields.iter().map(|f| f.name()).collect();
+        assert!(decoded_fields.contains(&"value"));
+
+        if let Value::List(list_value) = &decoded_struct.items()[0] {
+            assert_eq!(list_value.items().len(), 4);
+            assert_eq!(
+                list_value.items()[0],
+                Value::Binary(Bytes::from("1".as_bytes()))
+            );
+            assert_eq!(
+                list_value.items()[1],
+                Value::Binary(Bytes::from(r#""hello""#.as_bytes()))
+            );
+            assert_eq!(
+                list_value.items()[2],
+                Value::Binary(Bytes::from("true".as_bytes()))
+            );
+            assert_eq!(
+                list_value.items()[3],
+                Value::Binary(Bytes::from("3.15".as_bytes()))
+            );
+        } else {
+            panic!("Expected array to be decoded as ListValue");
+        }
     }
 
     #[test]

@@ -161,12 +161,18 @@ impl JsonVariant {
                 };
                 JsonNativeType::Array(Box::new(item_type))
             }
-            JsonVariant::Object(object) => JsonNativeType::Object(
-                object
-                    .iter()
-                    .map(|(k, v)| (k.clone(), v.native_type()))
-                    .collect(),
-            ),
+            JsonVariant::Object(object) => {
+                if object.is_empty() {
+                    JsonNativeType::Null
+                } else {
+                    JsonNativeType::Object(
+                        object
+                            .iter()
+                            .map(|(k, v)| (k.clone(), v.native_type()))
+                            .collect(),
+                    )
+                }
+            }
             JsonVariant::Variant(_) => JsonNativeType::Variant,
         }
     }
@@ -639,12 +645,18 @@ impl JsonVariantRef<'_> {
                 };
                 JsonNativeType::Array(Box::new(item_type))
             }
-            JsonVariantRef::Object(object) => JsonNativeType::Object(
-                object
-                    .iter()
-                    .map(|(k, v)| (k.to_string(), native_type(v)))
-                    .collect(),
-            ),
+            JsonVariantRef::Object(object) => {
+                if object.is_empty() {
+                    JsonNativeType::Null
+                } else {
+                    JsonNativeType::Object(
+                        object
+                            .iter()
+                            .map(|(k, v)| (k.to_string(), native_type(v)))
+                            .collect(),
+                    )
+                }
+            }
            JsonVariantRef::Variant(_) => JsonNativeType::Variant,
         }
     }

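
Note: both hunks above map an empty JSON object to the Null native type. A plausible reading (sketch with stand-in types, not the crate's API) is that Null merges with anything, so "{}" no longer forces an empty Object type that would conflict with sibling objects during the array-type unification added earlier:

    use std::collections::BTreeMap;

    #[derive(Clone, Debug, PartialEq)]
    enum Ty { Null, I64, Object(BTreeMap<String, Ty>) }

    // Empty objects carry no field information, so report Null instead of Object({}).
    fn native_type(object: &BTreeMap<String, Ty>) -> Ty {
        if object.is_empty() { Ty::Null } else { Ty::Object(object.clone()) }
    }

    fn main() {
        assert_eq!(native_type(&BTreeMap::new()), Ty::Null);
        let mut obj = BTreeMap::new();
        obj.insert("a".to_string(), Ty::I64);
        assert!(matches!(native_type(&obj), Ty::Object(_)));
    }
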
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::borrow::Cow;
 use std::collections::BTreeMap;
 use std::fmt::{Debug, Display, Formatter};
 use std::str::FromStr;
@@ -114,6 +115,14 @@ impl JsonNativeType {
             (JsonNativeType::Null, that) => that.clone(),
             (this, JsonNativeType::Null) => this,
             (this, that) if this == *that => this,
+
+            // (JsonNativeType::Number(x), JsonNativeType::Number(y)) => {
+            //     JsonNativeType::Number(match (x, y) {
+            //         (x, y) if x == y => *x,
+            //         (JsonNumberType::F64, _) | (_, JsonNumberType::F64) => JsonNumberType::F64,
+            //         _ => JsonNumberType::I64,
+            //     })
+            // }
             _ => JsonNativeType::Variant,
         };
     }
@@ -376,6 +385,23 @@ pub(crate) fn plain_json_struct_type(item_type: ConcreteDataType) -> StructType
     StructType::new(Arc::new(vec![field]))
 }
 
+pub fn merge_as_json_type<'a>(
+    left: &'a ArrowDataType,
+    right: &ArrowDataType,
+) -> Cow<'a, ArrowDataType> {
+    if left == right {
+        return Cow::Borrowed(left);
+    }
+
+    let mut left = JsonType::from(left);
+    let right = JsonType::from(right);
+    Cow::Owned(if left.merge(&right).is_ok() {
+        left.as_arrow_type()
+    } else {
+        ArrowDataType::Utf8
+    })
+}
+
 impl From<&ArrowDataType> for JsonType {
     fn from(t: &ArrowDataType) -> Self {
         JsonType::new_json2(JsonNativeType::from(&ConcreteDataType::from_arrow_type(t)))
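
Note: merge_as_json_type above uses Cow to avoid allocating on the common fast path where both sides already agree. A minimal sketch of that pattern (stand-in &str type; the real function works on Arrow DataTypes):

    use std::borrow::Cow;

    fn merge_as_json_type<'a>(left: &'a str, right: &str) -> Cow<'a, str> {
        if left == right {
            return Cow::Borrowed(left); // fast path: no allocation
        }
        // Fallback mirrors the diff: unmergeable types degrade to a textual form.
        Cow::Owned(format!("variant({left}|{right})"))
    }

    fn main() {
        assert!(matches!(merge_as_json_type("i64", "i64"), Cow::Borrowed(_)));
        assert!(matches!(merge_as_json_type("i64", "f64"), Cow::Owned(_)));
    }
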
@@ -807,18 +833,15 @@ mod tests {
             Ok(r#""<Bool>""#),
         )?;
 
         // Identical number categories should stay as Number.
         test(
             "1",
             &mut JsonType::new_json2(JsonNativeType::i64()),
             Ok(r#""<Number>""#),
         )?;
 
-        // Conflicting number categories should be lifted to Variant.
         test(
             "1.5",
             &mut JsonType::new_json2(JsonNativeType::i64()),
-            Ok(r#""<Variant>""#),
+            Ok(r#""<Number>""#),
         )?;
 
         // Object merge should preserve existing fields and append missing fields.

@@ -24,9 +24,10 @@ use arrow_schema::{DataType, FieldRef};
 use serde_json::Value;
 use snafu::{OptionExt, ResultExt};
 
-use crate::arrow_array::{StringArray, binary_array_value, string_array_value};
+use crate::arrow_array::{MutableBinaryArray, StringArray, binary_array_value, string_array_value};
 use crate::error::{
     AlignJsonArraySnafu, ArrowComputeSnafu, DeserializeSnafu, InvalidJsonSnafu, Result,
     SerializeSnafu,
 };
 
 pub struct JsonArray<'a> {
@@ -177,7 +178,39 @@ impl JsonArray<'_> {
         Ok(Arc::new(json_array))
     }
 
+    fn try_decode_variant(&self) -> Result<ArrayRef> {
+        let json_values = (0..self.inner.len())
+            .map(|i| self.try_get_value(i))
+            .collect::<Result<Vec<_>>>()?;
+
+        let serialized_values = json_values
+            .iter()
+            .map(|value| {
+                (!value.is_null())
+                    .then(|| serde_json::to_vec(value))
+                    .transpose()
+            })
+            .collect::<std::result::Result<Vec<_>, _>>()
+            .context(SerializeSnafu)?;
+        let total_bytes = serialized_values.iter().flatten().map(Vec::len).sum();
+
+        let mut builder = MutableBinaryArray::with_capacity(self.inner.len(), total_bytes);
+        for serialized_value in serialized_values {
+            if let Some(bytes) = serialized_value {
+                builder.append_value(bytes);
+            } else {
+                builder.append_null();
+            }
+        }
+
+        Ok(Arc::new(builder.finish()))
+    }
+
     fn try_cast(&self, to_type: &DataType) -> Result<ArrayRef> {
+        if matches!(to_type, DataType::Binary) {
+            return self.try_decode_variant();
+        }
+
         if compute::can_cast_types(self.inner.data_type(), to_type) {
             return compute::cast(&self.inner, to_type).context(ArrowComputeSnafu);
         }

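
Note: try_decode_variant above serializes in two passes so the binary builder can be sized exactly once. A sketch of that shape with serde_json only (encode_variants is an invented helper; the real code appends into Arrow's MutableBinaryArray rather than returning a Vec):

    use serde_json::{Value, json};

    fn encode_variants(values: &[Value]) -> serde_json::Result<Vec<Option<Vec<u8>>>> {
        // Pass 1: serialize every value, keeping None for JSON nulls.
        let serialized = values
            .iter()
            .map(|v| (!v.is_null()).then(|| serde_json::to_vec(v)).transpose())
            .collect::<serde_json::Result<Vec<_>>>()?;
        // Pass 2: total_bytes would size the builder up front, avoiding regrowth.
        let total_bytes: usize = serialized.iter().flatten().map(Vec::len).sum();
        assert!(total_bytes > 0 || values.iter().all(Value::is_null));
        Ok(serialized)
    }

    fn main() {
        let vals = [json!({"a": 1}), Value::Null, json!([1, 2])];
        let out = encode_variants(&vals).unwrap();
        assert!(out[1].is_none()); // nulls stay null rather than becoming "null" bytes
    }
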
@@ -229,6 +229,7 @@ fn bulk_part_converter(c: &mut Criterion) {
         &FlatSchemaOptions {
             raw_pk_columns: false,
             string_pk_use_dict: false,
+            ..Default::default()
         },
     );
     let mut converter = BulkPartConverter::new(&metadata, schema, rows, codec, false);
@@ -255,6 +256,7 @@ fn bulk_part_converter(c: &mut Criterion) {
         &FlatSchemaOptions {
             raw_pk_columns: true,
             string_pk_use_dict: true,
+            ..Default::default()
         },
     );
     let mut converter =

@@ -1008,6 +1008,10 @@ impl CompactionSstReaderBuilder<'_> {
 
     fn build_scan_input(self) -> Result<ScanInput> {
         let mapper = FlatProjectionMapper::all(&self.metadata)?;
+
+        // ScanInput::collect_parquet_record_batch_schemas()
+        // ScanInput::concretize_json_types()
+
         let mut scan_input = ScanInput::new(self.sst_layer, mapper)
             .with_files(self.inputs.to_vec())
             .with_append_mode(self.append_mode)

@@ -567,14 +567,16 @@ impl RegionFlushTask {
         write_opts: &WriteOptions,
         mem_ranges: MemtableRanges,
     ) -> Result<FlushFlatMemResult> {
-        let batch_schema = to_flat_sst_arrow_schema(
-            &version.metadata,
-            &FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding),
-        );
+        // let memtable_schema = mem_ranges
+        //     .schema()
+        //     .unwrap_or_else(|| version.metadata.schema.arrow_schema().clone());
+
+        let options = FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding);
+        let batch_schema = to_flat_sst_arrow_schema(&version.metadata, &options);
         let field_column_start =
             flat_format::field_column_start(&version.metadata, batch_schema.fields().len());
         let flat_sources = memtable_flat_sources(
-            batch_schema,
+            batch_schema.clone(),
             mem_ranges,
             &version.options,
             field_column_start,

@@ -14,6 +14,7 @@
 
 //! Memtables are write buffers for regions.
 
+use std::borrow::Cow;
 use std::collections::BTreeMap;
 use std::fmt;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
@@ -62,6 +63,10 @@ pub use bulk::part::{
     BulkPart, BulkPartEncoder, BulkPartMeta, UnorderedPart, record_batch_estimated_size,
     sort_primary_key_record_batch,
 };
+use datatypes::arrow::datatypes::{Schema, SchemaRef};
+use datatypes::extension::json;
+use datatypes::schema::ext::ArrowSchemaExt;
+use datatypes::types::json_type;
 #[cfg(any(test, feature = "test"))]
 pub use time_partition::filter_record_batch;

@@ -228,6 +233,56 @@ impl MemtableRanges {
             .max()
             .unwrap_or(0)
     }
+
+    #[expect(unused)]
+    pub(crate) fn schema(&self) -> Option<SchemaRef> {
+        let mut schemas = self
+            .ranges
+            .values()
+            .filter_map(|x| x.record_batch_schema())
+            .collect::<Vec<_>>();
+
+        if schemas.iter().all(|x| !x.has_json_extension_field()) {
+            // If there are no JSON extension fields in any schemas, the invariant must hold
+            // that all schemas are the same (they are all derived from the same region metadata).
+            // So it's ok to return the first one as the schema of the whole memtable ranges.
+            return (!schemas.is_empty()).then(|| schemas.swap_remove(0));
+        }
+
+        // If there are JSON extension fields, by convention, only their concrete data types
+        // (Arrow's Struct) may differ. Other things like the metadata or the fields count are
+        // the same. So to produce the final schema, we can solely merge the data types.
+        schemas
+            .split_first()
+            .map(|(first, rest)| merge_json_extension_fields(first, rest))
+    }
 }
+
+pub(crate) fn merge_json_extension_fields(base: &SchemaRef, others: &[SchemaRef]) -> SchemaRef {
+    let mut fields = base.fields().iter().cloned().collect::<Vec<_>>();
+    for (i, field) in fields.iter_mut().enumerate() {
+        if !json::is_json_extension_type(field) {
+            continue;
+        }
+
+        let merged = others
+            .iter()
+            .map(|x| Cow::Borrowed(x.field(i).data_type()))
+            .reduce(|acc, e| {
+                Cow::Owned(json_type::merge_as_json_type(acc.as_ref(), e.as_ref()).into_owned())
+            });
+        if let Some(merged) = merged
+            && field.data_type() != merged.as_ref()
+        {
+            let merged =
+                json_type::merge_as_json_type(field.data_type(), merged.as_ref()).into_owned();
+
+            let mut new = field.as_ref().clone();
+            new.set_data_type(merged);
+            *field = Arc::new(new);
+        }
+    }
+    Arc::new(Schema::new_with_metadata(fields, base.metadata().clone()))
+}
 
 impl IterBuilder for MemtableRanges {
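
Note: merge_json_extension_fields relies on the invariant stated in the comments: schemas differ only in the data types of JSON extension fields, so fields can be merged positionally. A sketch of that idea with stand-in types (Field, merge_json_fields, and the "merged(...)" string are inventions for this note, not Arrow or the crate's API):

    #[derive(Clone, Debug, PartialEq)]
    struct Field { name: String, ty: String, is_json: bool }

    fn merge_json_fields(base: &[Field], others: &[Vec<Field>]) -> Vec<Field> {
        let mut fields = base.to_vec();
        for (i, field) in fields.iter_mut().enumerate() {
            if !field.is_json {
                continue; // invariant: non-JSON fields are identical everywhere
            }
            for other in others {
                if other[i].ty != field.ty {
                    // Stand-in for json_type::merge_as_json_type.
                    field.ty = format!("merged({},{})", field.ty, other[i].ty);
                }
            }
        }
        fields
    }

    fn main() {
        let a = vec![Field { name: "j".into(), ty: "struct<a:i64>".into(), is_json: true }];
        let b = vec![Field { name: "j".into(), ty: "struct<a:str>".into(), is_json: true }];
        assert!(merge_json_fields(&a, &[b])[0].ty.starts_with("merged("));
    }
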
@@ -558,6 +613,11 @@ pub trait IterBuilder: Send + Sync {
             .fail()
     }
 
+    /// Returns the schema of record batches produced by this iterator.
+    fn record_batch_schema(&self) -> Option<SchemaRef> {
+        None
+    }
+
     /// Returns the [EncodedRange] if the range is already encoded into SST.
     fn encoded_range(&self) -> Option<EncodedRange> {
         None
@@ -735,6 +795,11 @@ impl MemtableRange {
         self.context.builder.is_record_batch()
     }
 
+    /// Returns the schema of record batches if this range supports record batch iteration.
+    pub fn record_batch_schema(&self) -> Option<SchemaRef> {
+        self.context.builder.record_batch_schema()
+    }
+
     pub fn num_rows(&self) -> usize {
         self.stats.num_rows
     }

@@ -816,6 +816,10 @@ impl IterBuilder for BulkRangeIterBuilder {
     fn encoded_range(&self) -> Option<EncodedRange> {
         None
     }
+
+    fn record_batch_schema(&self) -> Option<SchemaRef> {
+        Some(self.part.batch.schema())
+    }
 }
 
 impl IterBuilder for MultiBulkRangeIterBuilder {
@@ -848,6 +852,10 @@ impl IterBuilder for MultiBulkRangeIterBuilder {
     fn encoded_range(&self) -> Option<EncodedRange> {
         None
     }
+
+    fn record_batch_schema(&self) -> Option<SchemaRef> {
+        self.part.record_batch_schema()
+    }
 }
 
 /// Iterator builder for encoded bulk range

@@ -433,7 +433,7 @@ impl UnorderedPart {
             return Ok(Some(self.parts[0].batch.clone()));
         }
 
-        // Get the schema from the first part
+        // Get the schema from the first part and normalize JSON2 columns across all parts.
         let schema = self.parts[0].batch.schema();
         let concatenated = if schema.has_json_extension_field() {
             let (schema, batches) = align_parts(&self.parts)?;
@@ -1608,6 +1608,11 @@ impl MultiBulkPart {
         self.series_count
     }
 
+    /// Returns the schema of batches in this part.
+    pub(crate) fn record_batch_schema(&self) -> Option<SchemaRef> {
+        self.batches.first().map(|batch| batch.schema())
+    }
+
     /// Returns the number of record batches in this part.
     pub fn num_batches(&self) -> usize {
         self.batches.len()
@@ -2115,6 +2120,7 @@ mod tests {
         &FlatSchemaOptions {
             raw_pk_columns: false,
             string_pk_use_dict: true,
+            ..Default::default()
         },
     );
 
@@ -2552,6 +2558,7 @@ mod tests {
         &FlatSchemaOptions {
             raw_pk_columns: false,
             string_pk_use_dict: true,
+            ..Default::default()
         },
     );
 

@@ -1128,8 +1128,7 @@ impl FlatSource {
         }
     }
 
-    #[expect(unused)]
-    fn schema(&self) -> &SchemaRef {
+    pub(crate) fn schema(&self) -> &SchemaRef {
         &self.schema
     }
 

@@ -26,12 +26,14 @@ use datatypes::arrow::datatypes::{ArrowNativeType, BinaryType, DataType, SchemaR
 use datatypes::arrow::error::ArrowError;
 use datatypes::arrow::record_batch::RecordBatch;
 use datatypes::arrow_array::BinaryArray;
+use datatypes::extension::json::is_json_extension_type;
 use datatypes::timestamp::timestamp_array_to_primitive;
+use datatypes::vectors::json::array::JsonArray;
 use futures::{Stream, TryStreamExt};
 use snafu::ResultExt;
 use store_api::storage::SequenceNumber;
 
-use crate::error::{ComputeArrowSnafu, Result};
+use crate::error::{ComputeArrowSnafu, DataTypeMismatchSnafu, Result};
 use crate::memtable::BoxedRecordBatchIterator;
 use crate::metrics::READ_STAGE_ELAPSED;
 use crate::read::BoxedRecordBatchStream;
@@ -258,14 +260,29 @@ impl BatchBuilder {
 
         check_interleave_overflow(&self.batches, &self.schema, &self.indices)?;
 
-        let columns = (0..self.schema.fields.len())
-            .map(|column_idx| {
-                let arrays: Vec<_> = self
+        let columns = self
+            .schema
+            .fields()
+            .iter()
+            .enumerate()
+            .map(|(column_idx, field)| {
+                let arrays = self
                     .batches
                     .iter()
-                    .map(|(_, batch)| batch.column(column_idx).as_ref())
-                    .collect();
-                interleave(&arrays, &self.indices).context(ComputeArrowSnafu)
+                    .map(|(_, batch)| {
+                        let column = batch.column(column_idx);
+                        let column = if is_json_extension_type(field) {
+                            JsonArray::from(column)
+                                .try_align(field.data_type())
+                                .context(DataTypeMismatchSnafu)?
+                        } else {
+                            column.clone()
+                        };
+                        Ok(column)
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+                let aligned = arrays.iter().map(|x| x.as_ref()).collect::<Vec<_>>();
+                interleave(&aligned, &self.indices).context(ComputeArrowSnafu)
             })
             .collect::<Result<Vec<_>>>()?;
 

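
Note: the hunk above aligns each JSON column to the output field's type before handing it to Arrow's interleave. A sketch of what the interleave step itself does, over one column (plain Rust, generic Vec stand-ins for Arrow arrays):

    // Each output row is picked from a (batch_index, row_index) pair, column by
    // column; this is how rows from several batches are merged into one batch.
    fn interleave<T: Clone>(columns: &[Vec<T>], indices: &[(usize, usize)]) -> Vec<T> {
        indices
            .iter()
            .map(|&(batch, row)| columns[batch][row].clone())
            .collect()
    }

    fn main() {
        let batch0 = vec![1, 2];
        let batch1 = vec![10, 20];
        // A merge-sorted order across two batches for one column:
        let picked = interleave(&[batch0, batch1], &[(0, 0), (1, 0), (0, 1), (1, 1)]);
        assert_eq!(picked, vec![1, 10, 2, 20]);
    }
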
@@ -49,6 +49,7 @@ use crate::sst::{
 ///
 /// This mapper supports duplicate and unsorted projection indices.
 /// The output schema is determined by the projection indices.
+#[derive(Clone)]
 pub struct FlatProjectionMapper {
     /// Metadata of the region.
     metadata: RegionMetadataRef,
@@ -228,10 +229,8 @@ impl FlatProjectionMapper {
             self.input_arrow_schema.clone()
         } else {
             // For compaction, we need to build a different schema from encoding.
-            to_flat_sst_arrow_schema(
-                &self.metadata,
-                &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
-            )
+            let options = FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding);
+            to_flat_sst_arrow_schema(&self.metadata, &options)
         }
     }
 

@@ -15,6 +15,7 @@
 //! Structs for partition ranges.
 
 use common_time::Timestamp;
+use datatypes::arrow::datatypes::SchemaRef;
 use smallvec::{SmallVec, smallvec};
 use store_api::region_engine::PartitionRange;
 use store_api::storage::TimeSeriesDistribution;
@@ -478,6 +479,12 @@ impl MemRangeBuilder {
     pub(crate) fn stats(&self) -> &MemtableStats {
         &self.stats
     }
+
+    /// Returns the record batch schema for this memtable range if available.
+    #[expect(unused)]
+    pub(crate) fn record_batch_schema(&self) -> Option<SchemaRef> {
+        self.range.record_batch_schema()
+    }
 }
 
 #[cfg(test)]

@@ -27,6 +27,7 @@ use common_recordbatch::filter::SimpleFilterEvaluator;
 use common_telemetry::tracing::Instrument;
 use common_telemetry::{debug, error, tracing, warn};
 use common_time::range::TimestampRange;
+use datafusion::parquet::arrow::parquet_to_arrow_schema;
 use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
 use datafusion_common::Column;
 use datafusion_expr::Expr;
@@ -37,6 +38,7 @@ use datatypes::schema::Schema;
 use datatypes::schema::ext::ArrowSchemaExt;
 use datatypes::types::json_type::JsonNativeType;
 use futures::StreamExt;
+use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
 use partition::expr::PartitionExpr;
 use smallvec::SmallVec;
 use snafu::ResultExt;
@@ -51,9 +53,9 @@ use tokio::sync::{Semaphore, mpsc};
 use tokio_stream::wrappers::ReceiverStream;
 
 use crate::access_layer::AccessLayerRef;
-use crate::cache::CacheStrategy;
+use crate::cache::{CacheStrategy, CachedSstMeta};
 use crate::config::DEFAULT_MAX_CONCURRENT_SCAN_FILES;
-use crate::error::{InvalidPartitionExprSnafu, Result};
+use crate::error::{InvalidMetaSnafu, InvalidPartitionExprSnafu, Result};
 #[cfg(feature = "enterprise")]
 use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
 use crate::memtable::{MemtableRange, RangesOptions};
@@ -83,7 +85,8 @@ use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBui
 #[cfg(feature = "vector_index")]
 use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef};
 use crate::sst::parquet::file_range::PreFilterMode;
-use crate::sst::parquet::reader::ReaderMetrics;
+use crate::sst::parquet::metadata::MetadataLoader;
+use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderMetrics};
 
 #[cfg(feature = "vector_index")]
 const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2;
@@ -743,6 +746,20 @@ fn concretize_json_types(
         return;
     }
 
+    // let memtable_schemas = input
+    //     .memtables
+    //     .iter()
+    //     .filter_map(|mem| mem.record_batch_schema())
+    //     .collect::<Vec<_>>();
+    // let parquet_schemas = input.collect_parquet_record_batch_schemas().await?;
+    // if memtable_schemas.is_empty()
+    //     && parquet_schemas.is_empty()
+    //     // TODO(LFC): If we can concrete json2 type solely by query-driven hint, we can skip data-driven concretize.
+    //     && input.json2_column_types.is_empty()
+    // {
+    //     return Ok(input);
+    // }
+
     let mut column_schemas = output_schema.column_schemas().to_vec();
     for (idx, column_schema) in column_schemas.iter_mut().enumerate() {
         if !is_json_extension_type(&output_arrow_schema.fields()[idx]) {
@@ -751,6 +768,18 @@ fn concretize_json_types(
         let Some(json_type) = json_type_hint.get(&column_schema.name) else {
             continue;
         };
+
+        // for schema in &memtable_schemas {
+        //     if let Some((_, field)) = schema.column_with_name(&column_schema.name) {
+        //         merge_json_type_candidate(&mut merged, field.data_type());
+        //     }
+        // }
+        // for schema in parquet_schemas.iter() {
+        //     if let Some((_, field)) = schema.as_ref().column_with_name(&column_schema.name) {
+        //         merge_json_type_candidate(&mut merged, field.data_type());
+        //     }
+        // }
 
         column_schema.data_type = ConcreteDataType::from(json_type);
     }

@@ -761,6 +790,42 @@ fn concretize_json_types(
     mapper.with_output_schema(output_schema);
 }
 
+// fn merge_json_type_candidate(merged: &mut Option<ArrowDataType>, candidate: &ArrowDataType) {
+//     match merged {
+//         Some(current) => {
+//             *current = json_type::merge_as_json_type(current, candidate).into_owned();
+//         }
+//         None => {
+//             *merged = Some(candidate.clone());
+//         }
+//     }
+// }
+
+async fn read_or_load_parquet_metadata(
+    file: &FileHandle,
+    access_layer: &AccessLayerRef,
+    cache_strategy: &CacheStrategy,
+) -> Result<Arc<ParquetMetaData>> {
+    let mut metrics = MetadataCacheMetrics::default();
+    if let Some(metadata) = cache_strategy
+        .get_sst_meta_data(file.file_id(), &mut metrics, PageIndexPolicy::default())
+        .await
+    {
+        return Ok(metadata.parquet_metadata());
+    }
+
+    let file_path = file.file_path(access_layer.table_dir(), access_layer.path_type());
+    let file_size = file.meta_ref().file_size;
+    let metadata = MetadataLoader::new(access_layer.object_store().clone(), &file_path, file_size)
+        .load(&mut metrics)
+        .await
+        .and_then(|x| CachedSstMeta::try_new(&file_path, x))
+        .map(Arc::new)?;
+    cache_strategy.put_sst_meta_data(file.file_id(), metadata.clone());
+
+    Ok(metadata.parquet_metadata())
+}
+
 /// Returns true if the time range of a SST `file` matches the `predicate`.
 fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
     if predicate == &TimestampRange::min_to_max() {
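
Note: read_or_load_parquet_metadata above is a classic cache-aside lookup: consult the cache, otherwise load once, insert, and return. A sketch of that shape in plain Rust (Cache and get_or_load are stand-ins for the real CacheStrategy / MetadataLoader):

    use std::collections::HashMap;
    use std::sync::Arc;

    struct Cache { inner: HashMap<u64, Arc<String>> }

    impl Cache {
        fn get_or_load(&mut self, file_id: u64, load: impl FnOnce() -> String) -> Arc<String> {
            if let Some(meta) = self.inner.get(&file_id) {
                return meta.clone(); // cache hit: no I/O
            }
            let meta = Arc::new(load()); // cache miss: load once
            self.inner.insert(file_id, meta.clone());
            meta
        }
    }

    fn main() {
        let mut cache = Cache { inner: HashMap::new() };
        let m1 = cache.get_or_load(7, || "parquet-metadata".to_string());
        let m2 = cache.get_or_load(7, || unreachable!("second call must hit cache"));
        assert!(Arc::ptr_eq(&m1, &m2));
    }
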
@@ -1269,6 +1334,36 @@ impl ScanInput {
 
         pre_filter_mode(self.append_mode, self.merge_mode)
     }
+
+    #[expect(unused)]
+    pub(crate) async fn collect_parquet_record_batch_schemas(
+        &self,
+    ) -> Result<Vec<datatypes::arrow::datatypes::SchemaRef>> {
+        let mut schemas = Vec::with_capacity(self.files.len());
+        for file in &self.files {
+            let parquet_metadata =
+                read_or_load_parquet_metadata(file, &self.access_layer, &self.cache_strategy)
+                    .await?;
+            let file_metadata = parquet_metadata.file_metadata();
+            let arrow_schema = parquet_to_arrow_schema(
+                file_metadata.schema_descr(),
+                file_metadata.key_value_metadata(),
+            )
+            .map_err(|e| {
+                InvalidMetaSnafu {
+                    reason: format!(
+                        "Failed to convert parquet metadata to arrow schema, file: {}, error: {e}",
+                        file.file_id()
+                    ),
+                }
+                .build()
+            })?;
+            if arrow_schema.has_json_extension_field() {
+                schemas.push(Arc::new(arrow_schema));
+            }
+        }
+        Ok(schemas)
+    }
 }
 
 #[cfg(feature = "enterprise")]

@@ -130,7 +130,7 @@ impl SeqScan {
     ///
     /// # Panics
     /// Panics if the compaction flag is not set.
-    pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
+    pub(crate) async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
         assert!(self.stream_ctx.input.compaction);
 
         let metrics_set = ExecutionPlanMetricsSet::new();

@@ -108,6 +108,7 @@ pub struct FlatSchemaOptions {
     /// when storing primary key columns.
     /// Only takes effect when `raw_pk_columns` is true.
     pub string_pk_use_dict: bool,
+    pub override_schema: Option<SchemaRef>,
 }
 
 impl Default for FlatSchemaOptions {
@@ -115,6 +116,7 @@ impl Default for FlatSchemaOptions {
         Self {
             raw_pk_columns: true,
             string_pk_use_dict: true,
+            override_schema: None,
         }
     }
 }
@@ -128,6 +130,7 @@ impl FlatSchemaOptions {
         Self {
             raw_pk_columns: false,
             string_pk_use_dict: false,
+            override_schema: None,
         }
     }
 }

@@ -594,7 +594,7 @@ mod tests {
 
         let writer_props = props_builder.build();
 
-        let write_format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
+        let write_format = FlatWriteFormat::new(metadata.schema.arrow_schema().clone());
         let fields: Vec<_> = write_format
             .arrow_schema()
             .fields()

@@ -49,7 +49,7 @@ use store_api::storage::{ColumnId, SequenceNumber};
 
 use crate::error::{
     ComputeArrowSnafu, DecodeSnafu, InvalidParquetSnafu, InvalidRecordBatchSnafu,
-    NewRecordBatchSnafu, Result,
+    NewRecordBatchSnafu, RecordBatchSnafu, Result,
 };
 use crate::read::read_columns::ReadColumns;
 use crate::sst::parquet::format::{
@@ -71,8 +71,7 @@ pub(crate) struct FlatWriteFormat {
 
 impl FlatWriteFormat {
     /// Creates a new helper.
-    pub(crate) fn new(metadata: RegionMetadataRef, options: &FlatSchemaOptions) -> FlatWriteFormat {
-        let arrow_schema = to_flat_sst_arrow_schema(&metadata, options);
+    pub(crate) fn new(arrow_schema: SchemaRef) -> FlatWriteFormat {
         FlatWriteFormat {
             arrow_schema,
             override_sequence: None,
@@ -106,6 +105,12 @@ impl FlatWriteFormat {
         let sequence_array = Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
         columns[sequence_column_index(batch.num_columns())] = sequence_array;
 
+        // let columns = common_recordbatch::recordbatch::maybe_align_json_array_with_schema(
+        //     &self.arrow_schema,
+        //     columns,
+        // )
+        // .context(RecordBatchSnafu)?;
+        // RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
         RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
     }
 }

@@ -1186,7 +1186,7 @@ mod tests {
     #[test]
     fn test_flat_to_sst_arrow_schema() {
         let metadata = build_test_region_metadata();
-        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
+        let format = FlatWriteFormat::new(metadata.schema.arrow_schema().clone());
         assert_eq!(
             &build_test_flat_sst_schema_with_field_ids(),
             format.arrow_schema()
@@ -1209,7 +1209,7 @@ mod tests {
     #[test]
     fn test_flat_convert_batch() {
         let metadata = build_test_region_metadata();
-        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
+        let format = FlatWriteFormat::new(metadata.schema.arrow_schema().clone());
 
         let num_rows = 4;
         let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
@@ -1226,7 +1226,7 @@ mod tests {
     #[test]
     fn test_flat_convert_with_override_sequence() {
         let metadata = build_test_region_metadata();
-        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default())
+        let format = FlatWriteFormat::new(metadata.schema.arrow_schema().clone())
             .with_override_sequence(Some(415411));
 
         let num_rows = 4;

@@ -22,6 +22,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
 use std::task::{Context, Poll};
 use std::time::Instant;
 
+use arrow_schema::Schema;
 use common_telemetry::debug;
 use common_time::Timestamp;
 use datatypes::arrow::array::{
@@ -31,6 +32,8 @@ use datatypes::arrow::array::{
 use datatypes::arrow::compute::{max, min};
 use datatypes::arrow::datatypes::{DataType, SchemaRef, TimeUnit};
 use datatypes::arrow::record_batch::RecordBatch;
+use datatypes::extension::json::is_json_extension_type;
+use datatypes::schema::ext::ArrowSchemaExt;
 use object_store::{FuturesAsyncWriter, ObjectStore};
 use parquet::arrow::AsyncArrowWriter;
 use parquet::basic::{Compression, Encoding, ZstdLevel};
@@ -58,6 +61,7 @@ use crate::sst::parquet::format::PrimaryKeyWriteFormat;
 use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo, WriteOptions};
 use crate::sst::{
     DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, SeriesEstimator,
+    to_flat_sst_arrow_schema,
 };
 
 /// Converts a flat RecordBatch for writing to parquet.
@@ -271,12 +275,25 @@ where
         override_sequence: Option<SequenceNumber>,
         opts: &WriteOptions,
     ) -> Result<SstInfoArray> {
+        let options = FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding);
+
+        let mut schema = to_flat_sst_arrow_schema(&self.metadata, &options);
+        if schema.has_json_extension_field() {
+            let mut fields = Vec::with_capacity(schema.fields().len());
+            for field in schema.fields() {
+                if is_json_extension_type(field)
+                    && let Some((_, override_field)) = source.schema().fields().find(field.name())
+                {
+                    fields.push(override_field.clone());
+                } else {
+                    fields.push(field.clone());
+                }
+            }
+            schema = Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone()));
+        }
+
         let converter = FlatBatchConverter::Flat(
-            FlatWriteFormat::new(
-                self.metadata.clone(),
-                &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
-            )
-            .with_override_sequence(override_sequence),
+            FlatWriteFormat::new(schema).with_override_sequence(override_sequence),
         );
         let res = self.write_all_flat_inner(source, &converter, opts).await;
         if res.is_err() {

@@ -153,16 +153,10 @@ async fn query_data(frontend: &Arc<Instance>) -> io::Result<()> {
 +----------+"#;
     execute_sql_and_expect(frontend, sql, expected).await;
 
-    let sql = "SELECT * FROM bluesky ORDER BY time_us";
-    let expected = fs::read_to_string(find_workspace_path(
-        "tests-integration/resources/jsonbench-select-all.txt",
-    ))?;
-    execute_sql_and_expect(frontend, sql, &expected).await;
-
     // query 1:
     let sql = "
 SELECT
-    json_get_string(data, '$.commit.collection') AS event, count() AS count
+    data.commit.collection AS event, count() AS count
 FROM bluesky
 GROUP BY event
 ORDER BY count DESC, event ASC";
@@ -180,13 +174,12 @@ ORDER BY count DESC, event ASC";
     // query 2:
     let sql = "
 SELECT
-    json_get_string(data, '$.commit.collection') AS event,
+    data.commit.collection AS event,
     count() AS count,
-    count(DISTINCT json_get_string(data, '$.did')) AS users
+    count(DISTINCT data.did) AS users
 FROM bluesky
 WHERE
-    (json_get_string(data, '$.kind') = 'commit') AND
-    (json_get_string(data, '$.commit.operation') = 'create')
+    data.kind = 'commit' AND data.commit.operation = 'create'
 GROUP BY event
 ORDER BY count DESC, event ASC";
     let expected = r#"
@@ -203,15 +196,14 @@ ORDER BY count DESC, event ASC";
     // query 3:
     let sql = "
 SELECT
-    json_get_string(data, '$.commit.collection') AS event,
-    date_part('hour', to_timestamp_micros(json_get_int(data, '$.time_us'))) as hour_of_day,
+    data.commit.collection AS event,
+    date_part('hour', to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) as hour_of_day,
     count() AS count
 FROM bluesky
 WHERE
-    (json_get_string(data, '$.kind') = 'commit') AND
-    (json_get_string(data, '$.commit.operation') = 'create') AND
-    json_get_string(data, '$.commit.collection') IN
-        ('app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.feed.like')
+    data.kind = 'commit' AND
+    data.commit.operation = 'create' AND
+    data.commit.collection in ('app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.feed.like')
 GROUP BY event, hour_of_day
 ORDER BY hour_of_day, event";
     let expected = r#"
@@ -227,13 +219,13 @@ ORDER BY hour_of_day, event";
     // query 4:
     let sql = "
 SELECT
-    json_get_string(data, '$.did') as user_id,
-    min(to_timestamp_micros(json_get_int(data, '$.time_us'))) AS first_post_ts
+    data.did::String as user_id,
+    min(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) AS first_post_ts
 FROM bluesky
 WHERE
-    (json_get_string(data, '$.kind') = 'commit') AND
-    (json_get_string(data, '$.commit.operation') = 'create') AND
-    (json_get_string(data, '$.commit.collection') = 'app.bsky.feed.post')
+    data.kind = 'commit' AND
+    data.commit.operation = 'create' AND
+    data.commit.collection = 'app.bsky.feed.post'
 GROUP BY user_id
 ORDER BY first_post_ts ASC, user_id DESC
 LIMIT 3";
@@ -250,17 +242,17 @@ LIMIT 3";
     // query 5:
     let sql = "
 SELECT
-    json_get_string(data, '$.did') as user_id,
+    data.did::String as user_id,
     date_part(
         'epoch',
-        max(to_timestamp_micros(json_get_int(data, '$.time_us'))) -
-        min(to_timestamp_micros(json_get_int(data, '$.time_us')))
+        max(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) -
+        min(to_timestamp_micros(arrow_cast(data.time_us, 'Int64')))
     ) AS activity_span
 FROM bluesky
 WHERE
-    (json_get_string(data, '$.kind') = 'commit') AND
-    (json_get_string(data, '$.commit.operation') = 'create') AND
-    (json_get_string(data, '$.commit.collection') = 'app.bsky.feed.post')
+    data.kind = 'commit' AND
+    data.commit.operation = 'create' AND
+    data.commit.collection = 'app.bsky.feed.post'
 GROUP BY user_id
 ORDER BY activity_span DESC, user_id DESC
 LIMIT 3";
@@ -304,30 +296,21 @@ async fn insert_data_by_sql(frontend: &Arc<Instance>) -> io::Result<()> {
 async fn desc_table(frontend: &Arc<Instance>) {
     let sql = "DESC TABLE bluesky";
     let expected = r#"
-+---------+------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
-| Column  | Type                                                                                                                                           | Key | Null | Default | Semantic Type |
-+---------+------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
-| data    | Json<{"_raw":"<String>","commit.collection":"<String>","commit.operation":"<String>","did":"<String>","kind":"<String>","time_us":"<Number>"}> |     | YES  |         | FIELD         |
-| time_us | TimestampMicrosecond                                                                                                                           | PRI | NO   |         | TIMESTAMP     |
-+---------+------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+"#;
++---------+----------------------+-----+------+---------+---------------+
+| Column  | Type                 | Key | Null | Default | Semantic Type |
++---------+----------------------+-----+------+---------+---------------+
+| data    | JSON2                |     | YES  |         | FIELD         |
+| time_us | TimestampMicrosecond | PRI | NO   |         | TIMESTAMP     |
++---------+----------------------+-----+------+---------+---------------+"#;
     execute_sql_and_expect(frontend, sql, expected).await;
 }
 
 async fn create_table(frontend: &Arc<Instance>) {
     let sql = r#"
 CREATE TABLE bluesky (
-    "data" JSON (
-        format = "partial",
-        fields = Struct<
-            kind String,
-            "commit.operation" String,
-            "commit.collection" String,
-            did String,
-            time_us Bigint
-        >,
-    ),
+    "data" JSON2,
     time_us TimestampMicrosecond TIME INDEX,
-)
+) WITH ('append_mode' = 'true', 'sst_format' = 'flat')
 "#;
     execute_sql_and_expect(frontend, sql, "Affected Rows: 0").await;
 }

@@ -42,6 +42,14 @@ admin flush_table('json2_table');
 | 0                                |
 +----------------------------------+
 
+admin compact_table('json2_table', 'swcs', '86400');
+
++-----------------------------------------------------+
+| ADMIN compact_table('json2_table', 'swcs', '86400') |
++-----------------------------------------------------+
+| 0                                                   |
++-----------------------------------------------------+
+
 insert into json2_table
 values (7, '{"a": {"b": "s7"}, "c": [1], "d": [{"e": {"g": -0.7}}]}'),
        (8, '{"a": {"b": 8}, "c": "s8"}');
@@ -109,6 +117,23 @@ select j.a.b from json2_table order by ts;
 | 10                                  |
 +-------------------------------------+
 
+select j.a, j.a.x from json2_table order by ts;
+
++--------------------------------------------------+----------------------------------------------------+
+| json_get(json2_table.j,Utf8("a"),Utf8View(NULL)) | json_get(json2_table.j,Utf8("a.x"),Utf8View(NULL)) |
++--------------------------------------------------+----------------------------------------------------+
+| {"b":1,"x":null}                                 |                                                    |
+| {"b":-2,"x":null}                                |                                                    |
+| {"b":3,"x":null}                                 |                                                    |
+| {"b":-4,"x":null}                                |                                                    |
+| {"b":null,"x":null}                              |                                                    |
+|                                                  |                                                    |
+| {"b":"s7","x":null}                              |                                                    |
+| {"b":8,"x":null}                                 |                                                    |
+| {"b":null,"x":true}                              | true                                               |
+| {"b":10,"x":null}                                |                                                    |
++--------------------------------------------------+----------------------------------------------------+
+
 select j.c, j.y from json2_table order by ts;
 
 +-----------------------------------+-----------------------------------+
@@ -126,6 +151,44 @@ select j.c, j.y from json2_table order by ts;
 |                                   | false                             |
 +-----------------------------------+-----------------------------------+
 
+select j from json2_table order by ts;
+
+Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected Struct() but found Struct("a": Struct("b": Binary, "x": Boolean), "c": Binary, "d": List(Struct("e": Struct("f": Float64, "g": Float64))), "y": Boolean) at column index 0
+
+select * from json2_table order by ts;
+
+Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected Struct() but found Struct("a": Struct("b": Binary, "x": Boolean), "c": Binary, "d": List(Struct("e": Struct("f": Float64, "g": Float64))), "y": Boolean) at column index 1
+
+select j.a.b + 1 from json2_table order by ts;
+
++------------------------------------------------------------+
+| json_get(json2_table.j,Utf8("a.b"),Int64(NULL)) + Int64(1) |
++------------------------------------------------------------+
+| 2                                                          |
+| -1                                                         |
+| 4                                                          |
+| -3                                                         |
+|                                                            |
+|                                                            |
+|                                                            |
+| 9                                                          |
+|                                                            |
+| 11                                                         |
++------------------------------------------------------------+
+
+select abs(j.a.b) from json2_table order by ts;
+
+Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Function 'abs' expects NativeType::Numeric but received NativeType::String No function matches the given name and argument types 'abs(Utf8View)'. You might need to add explicit type casts.
+Candidate functions:
+abs(Numeric(1))
+
+-- "j.c" is of type "String", "abs" is expected to be all "null"s.
+select abs(j.c) from json2_table order by ts;
+
+Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Function 'abs' expects NativeType::Numeric but received NativeType::String No function matches the given name and argument types 'abs(Utf8View)'. You might need to add explicit type casts.
+Candidate functions:
+abs(Numeric(1))
+
 select j.d from json2_table order by ts;
 
 +-----------------------------------+

@@ -22,6 +22,8 @@ values (4, '{"a": {"b": -4}, "d": [{"e": {"g": -0.4}}]}'),
 
 admin flush_table('json2_table');
 
+admin compact_table('json2_table', 'swcs', '86400');
+
 insert into json2_table
 values (7, '{"a": {"b": "s7"}, "c": [1], "d": [{"e": {"g": -0.7}}]}'),
        (8, '{"a": {"b": 8}, "c": "s8"}');
@@ -40,8 +42,21 @@ explain select j.a.x::bool from json2_table;
 
 select j.a.b from json2_table order by ts;
 
+select j.a, j.a.x from json2_table order by ts;
+
 select j.c, j.y from json2_table order by ts;
 
+select j from json2_table order by ts;
+
+select * from json2_table order by ts;
+
+select j.a.b + 1 from json2_table order by ts;
+
+select abs(j.a.b) from json2_table order by ts;
+
+-- "j.c" is of type "String", "abs" is expected to be all "null"s.
+select abs(j.c) from json2_table order by ts;
+
 select j.d from json2_table order by ts;
 
 drop table json2_table;

tests/cases/standalone/common/types/json/jsonbench.result (new file, 180 lines)
@@ -0,0 +1,180 @@
CREATE TABLE bluesky (
|
||||
`data` JSON2,
|
||||
time_us TimestampMicrosecond TIME INDEX
|
||||
) WITH ('append_mode' = 'true', 'sst_format' = 'flat');
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
INSERT INTO bluesky (time_us, data)
|
||||
VALUES (1732206349000167,
|
||||
'{"did":"did:plc:yj3sjq3blzpynh27cumnp5ks","time_us":1732206349000167,"kind":"commit","commit":{"rev":"3lbhtytnn2k2f","operation":"create","collection":"app.bsky.feed.post","rkey":"3lbhtyteurk2y","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.095Z","langs":["en"],"reply":{"parent":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"},"root":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"}},"text":"aaaaah. LIght shines in a corner of WTF...."},"cid":"bafyreidblutgvj75o4q4akzyyejedjj6l3it6hgqwee6jpwv2wqph5fsgm"}}');
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
INSERT INTO bluesky (time_us, data)
|
||||
VALUES (1732206349000644,
|
||||
'{"did":"did:plc:3i4xf2v4wcnyktgv6satke64","time_us":1732206349000644,"kind":"commit","commit":{"rev":"3lbhuvzds6d2a","operation":"create","collection":"app.bsky.feed.like","rkey":"3lbhuvzdked2a","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.221Z","subject":{"cid":"bafyreidjvrcmckkm765mct5fph36x7kupkfo35rjklbf2k76xkzwyiauge","uri":"at://did:plc:azrv4rcbws6kmcga4fsbphg2/app.bsky.feed.post/3lbgjdpbiec2l"}},"cid":"bafyreia5l5vrkh5oj4cjyhcqby2dprhyvcyofo2q5562tijlae2pzih23m"}}');
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
ADMIN flush_table('bluesky');
|
||||
|
||||
+------------------------------+
|
||||
| ADMIN flush_table('bluesky') |
|
||||
+------------------------------+
|
||||
| 0 |
|
||||
+------------------------------+
|
||||
|
||||
INSERT INTO bluesky (time_us, data)
|
||||
VALUES (1732206349001108,
|
||||
'{"did":"did:plc:gccfnqqizz4urhchsaie6jft","time_us":1732206349001108,"kind":"commit","commit":{"rev":"3lbhuvze3gi2u","operation":"create","collection":"app.bsky.graph.follow","rkey":"3lbhuvzdtmi2u","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:27:40.923Z","subject":"did:plc:r7cdh4sgzqbfdc6wcdxxti7c"},"cid":"bafyreiew2p6cgirfaj45qoenm4fgumib7xoloclrap3jgkz5es7g7kby3i"}}');
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
INSERT INTO bluesky (time_us, data)
|
||||
VALUES (1732206349001372,
|
||||
'{"did":"did:plc:msxqf3twq7abtdw7dbfskphk","time_us":1732206349001372,"kind":"commit","commit":{"rev":"3lbhueija5p22","operation":"create","collection":"app.bsky.feed.like","rkey":"3lbhueiizcx22","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:15:58.232Z","subject":{"cid":"bafyreiavpshyqzrlo5m7fqodjhs6jevweqnif4phasiwimv4a7mnsqi2fe","uri":"at://did:plc:fusulxqc52zbrc75fi6xrcof/app.bsky.feed.post/3lbhskq5zn22f"}},"cid":"bafyreidjix4dauj2afjlbzmhj3a7gwftcevvmmy6edww6vrjdbst26rkby"}}');
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
ADMIN flush_table('bluesky');
|
||||
|
||||
+------------------------------+
|
||||
| ADMIN flush_table('bluesky') |
|
||||
+------------------------------+
|
||||
| 0 |
|
||||
+------------------------------+
|
||||
|
||||
INSERT INTO bluesky (time_us, data)
|
||||
VALUES (1732206349001905,
|
||||
'{"did":"did:plc:l5o3qjrmfztir54cpwlv2eme","time_us":1732206349001905,"kind":"commit","commit":{"rev":"3lbhtytohxc2o","operation":"create","collection":"app.bsky.feed.post","rkey":"3lbhtytjqzk2q","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.254Z","langs":["en"],"reply":{"parent":{"cid":"bafyreih35fe2jj3gchmgk4amold4l6sfxd2sby5wrg3jrws5fkdypxrbg4","uri":"at://did:plc:6wx2gg5yqgvmlu35r6y3bk6d/app.bsky.feed.post/3lbhtj2eb4s2o"},"root":{"cid":"bafyreifipyt3vctd4ptuoicvio7rbr5xvjv4afwuggnd2prnmn55mu6luu","uri":"at://did:plc:474ldquxwzrlcvjhhbbk2wte/app.bsky.feed.post/3lbhdzrynik27"}},"text":"okay i take mine back because I hadn’t heard this one yet^^"},"cid":"bafyreigzdsdne3z2xxcakgisieyj7y47hj6eg7lj6v4q25ah5q2qotu5ku"}}');
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
ADMIN compact_table('bluesky', 'swcs', '86400');
|
||||
|
||||
+-------------------------------------------------+
|
||||
| ADMIN compact_table('bluesky', 'swcs', '86400') |
|
||||
+-------------------------------------------------+
|
||||
| 0 |
|
||||
+-------------------------------------------------+
|
||||
|
||||
SELECT count(*) FROM bluesky;
|
||||
|
||||
+----------+
|
||||
| count(*) |
|
||||
+----------+
|
||||
| 5 |
|
||||
+----------+
|
||||
|
||||
-- Query 1:
|
||||
SELECT data.commit.collection AS event,
|
||||
count() AS count
|
||||
FROM bluesky
|
||||
GROUP BY event
|
||||
ORDER BY count DESC, event ASC;
|
||||
|
||||
+-----------------------+-------+
|
||||
| event | count |
|
||||
+-----------------------+-------+
|
||||
| app.bsky.feed.like | 2 |
|
||||
| app.bsky.feed.post | 2 |
|
||||
| app.bsky.graph.follow | 1 |
|
||||
+-----------------------+-------+
|
||||
|
||||
-- Query 2:
|
||||
SELECT data.commit.collection AS event,
|
||||
count() AS count,
|
||||
count(DISTINCT data.did) AS users
|
||||
FROM bluesky
|
||||
WHERE data.kind = 'commit' AND data.commit.operation = 'create'
|
||||
GROUP BY event
|
||||
ORDER BY count DESC, event ASC;
|
||||
|
||||
+-----------------------+-------+-------+
|
||||
| event | count | users |
|
||||
+-----------------------+-------+-------+
|
||||
| app.bsky.feed.like | 2 | 2 |
|
||||
| app.bsky.feed.post | 2 | 2 |
|
||||
| app.bsky.graph.follow | 1 | 1 |
|
||||
+-----------------------+-------+-------+
|
||||
|
||||
-- Query 3:
|
||||
SELECT data.commit.collection AS event,
|
||||
date_part('hour', to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) as hour_of_day,
|
||||
count() AS count
|
||||
FROM bluesky
|
||||
WHERE data.kind = 'commit'
|
||||
AND data.commit.operation = 'create'
|
||||
AND data.commit.collection in ('app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.feed.like')
|
||||
GROUP BY event, hour_of_day
|
||||
ORDER BY hour_of_day, event;
|
||||
|
||||
+--------------------+-------------+-------+
|
||||
| event | hour_of_day | count |
|
||||
+--------------------+-------------+-------+
|
||||
| app.bsky.feed.like | 16 | 2 |
|
||||
| app.bsky.feed.post | 16 | 2 |
|
||||
+--------------------+-------------+-------+
|
||||
|
||||
-- Query 4:
|
||||
SELECT data.did::String as user_id,
|
||||
min(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) AS first_post_ts
|
||||
FROM bluesky
|
||||
WHERE data.kind = 'commit'
|
||||
AND data.commit.operation = 'create'
|
||||
AND data.commit.collection = 'app.bsky.feed.post'
|
||||
GROUP BY user_id
|
||||
ORDER BY first_post_ts ASC, user_id DESC
|
||||
LIMIT 3;
|
||||
|
||||
+----------------------------------+----------------------------+
|
||||
| user_id | first_post_ts |
|
||||
+----------------------------------+----------------------------+
|
||||
| did:plc:yj3sjq3blzpynh27cumnp5ks | 2024-11-21T16:25:49.000167 |
|
||||
| did:plc:l5o3qjrmfztir54cpwlv2eme | 2024-11-21T16:25:49.001905 |
|
||||
+----------------------------------+----------------------------+
|
||||
|
||||
-- Query 5:
SELECT data.did::String as user_id,
date_part(
'epoch',
max(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) -
min(to_timestamp_micros(arrow_cast(data.time_us, 'Int64')))
) AS activity_span
FROM bluesky
WHERE data.kind = 'commit'
AND data.commit.operation = 'create'
AND data.commit.collection = 'app.bsky.feed.post'
GROUP BY user_id
ORDER BY activity_span DESC, user_id DESC
LIMIT 3;

+----------------------------------+---------------+
| user_id                          | activity_span |
+----------------------------------+---------------+
| did:plc:yj3sjq3blzpynh27cumnp5ks | 0.0           |
| did:plc:l5o3qjrmfztir54cpwlv2eme | 0.0           |
+----------------------------------+---------------+

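-- The next directive keeps the test deterministic: SQLNESS REPLACE rewrites any text
-- matching the regex in the recorded output, redacting datanode peer addresses.
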
-- SQLNESS REPLACE (peers.*) REDACTED
EXPLAIN
SELECT date_part('hour', to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) as hour_of_day
FROM bluesky;

+---------------+--------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                           |
+---------------+--------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | MergeScan [is_placeholder=false, remote_input=[                                                                                |
|               | Projection: date_part(Utf8("hour"), to_timestamp_micros(json2_get(bluesky.data, Utf8("time_us"), Int64(NULL)))) AS hour_of_day |
|               |   TableScan: bluesky                                                                                                           |
|               | ]]                                                                                                                             |
| physical_plan | CooperativeExec                                                                                                                |
|               |   MergeScanExec: REDACTED
|               |                                                                                                                                |
+---------------+--------------------------------------------------------------------------------------------------------------------------------+

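-- NOTE: the logical plan above shows dotted JSON2 access (data.time_us) being lowered
-- to the json2_get scalar function before the plan is shipped to the datanodes. A
-- minimal sketch of the equivalent explicit call, assuming json2_get is invocable
-- from SQL with the same (column, path, type-hint) arguments:
-- SELECT json2_get(data, 'time_us', arrow_cast(NULL, 'Int64')) FROM bluesky;
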
DROP TABLE bluesky;

Affected Rows: 0

92 tests/cases/standalone/common/types/json/jsonbench.sql Normal file
@@ -0,0 +1,92 @@
CREATE TABLE bluesky (
`data` JSON2,
time_us TimestampMicrosecond TIME INDEX
) WITH ('append_mode' = 'true', 'sst_format' = 'flat');

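-- `data` is a JSON2 column; the queries below read nested fields with dotted paths
-- (e.g. data.commit.collection) and cast leaves with :: or arrow_cast. A minimal
-- sketch of such an access (hypothetical query, not part of the recorded test):
-- SELECT data.commit.rkey FROM bluesky LIMIT 1;
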
INSERT INTO bluesky (time_us, data)
VALUES (1732206349000167,
'{"did":"did:plc:yj3sjq3blzpynh27cumnp5ks","time_us":1732206349000167,"kind":"commit","commit":{"rev":"3lbhtytnn2k2f","operation":"create","collection":"app.bsky.feed.post","rkey":"3lbhtyteurk2y","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.095Z","langs":["en"],"reply":{"parent":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"},"root":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"}},"text":"aaaaah. LIght shines in a corner of WTF...."},"cid":"bafyreidblutgvj75o4q4akzyyejedjj6l3it6hgqwee6jpwv2wqph5fsgm"}}');

INSERT INTO bluesky (time_us, data)
VALUES (1732206349000644,
'{"did":"did:plc:3i4xf2v4wcnyktgv6satke64","time_us":1732206349000644,"kind":"commit","commit":{"rev":"3lbhuvzds6d2a","operation":"create","collection":"app.bsky.feed.like","rkey":"3lbhuvzdked2a","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.221Z","subject":{"cid":"bafyreidjvrcmckkm765mct5fph36x7kupkfo35rjklbf2k76xkzwyiauge","uri":"at://did:plc:azrv4rcbws6kmcga4fsbphg2/app.bsky.feed.post/3lbgjdpbiec2l"}},"cid":"bafyreia5l5vrkh5oj4cjyhcqby2dprhyvcyofo2q5562tijlae2pzih23m"}}');

ADMIN flush_table('bluesky');

INSERT INTO bluesky (time_us, data)
VALUES (1732206349001108,
'{"did":"did:plc:gccfnqqizz4urhchsaie6jft","time_us":1732206349001108,"kind":"commit","commit":{"rev":"3lbhuvze3gi2u","operation":"create","collection":"app.bsky.graph.follow","rkey":"3lbhuvzdtmi2u","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:27:40.923Z","subject":"did:plc:r7cdh4sgzqbfdc6wcdxxti7c"},"cid":"bafyreiew2p6cgirfaj45qoenm4fgumib7xoloclrap3jgkz5es7g7kby3i"}}');

INSERT INTO bluesky (time_us, data)
VALUES (1732206349001372,
'{"did":"did:plc:msxqf3twq7abtdw7dbfskphk","time_us":1732206349001372,"kind":"commit","commit":{"rev":"3lbhueija5p22","operation":"create","collection":"app.bsky.feed.like","rkey":"3lbhueiizcx22","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:15:58.232Z","subject":{"cid":"bafyreiavpshyqzrlo5m7fqodjhs6jevweqnif4phasiwimv4a7mnsqi2fe","uri":"at://did:plc:fusulxqc52zbrc75fi6xrcof/app.bsky.feed.post/3lbhskq5zn22f"}},"cid":"bafyreidjix4dauj2afjlbzmhj3a7gwftcevvmmy6edww6vrjdbst26rkby"}}');

ADMIN flush_table('bluesky');

INSERT INTO bluesky (time_us, data)
VALUES (1732206349001905,
'{"did":"did:plc:l5o3qjrmfztir54cpwlv2eme","time_us":1732206349001905,"kind":"commit","commit":{"rev":"3lbhtytohxc2o","operation":"create","collection":"app.bsky.feed.post","rkey":"3lbhtytjqzk2q","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.254Z","langs":["en"],"reply":{"parent":{"cid":"bafyreih35fe2jj3gchmgk4amold4l6sfxd2sby5wrg3jrws5fkdypxrbg4","uri":"at://did:plc:6wx2gg5yqgvmlu35r6y3bk6d/app.bsky.feed.post/3lbhtj2eb4s2o"},"root":{"cid":"bafyreifipyt3vctd4ptuoicvio7rbr5xvjv4afwuggnd2prnmn55mu6luu","uri":"at://did:plc:474ldquxwzrlcvjhhbbk2wte/app.bsky.feed.post/3lbhdzrynik27"}},"text":"okay i take mine back because I hadn’t heard this one yet^^"},"cid":"bafyreigzdsdne3z2xxcakgisieyj7y47hj6eg7lj6v4q25ah5q2qotu5ku"}}');

ADMIN compact_table('bluesky', 'swcs', '86400');

SELECT count(*) FROM bluesky;

-- Query 1:
SELECT data.commit.collection AS event,
count() AS count
FROM bluesky
GROUP BY event
ORDER BY count DESC, event ASC;

-- Query 2:
SELECT data.commit.collection AS event,
count() AS count,
count(DISTINCT data.did) AS users
FROM bluesky
WHERE data.kind = 'commit' AND data.commit.operation = 'create'
GROUP BY event
ORDER BY count DESC, event ASC;

-- Query 3:
SELECT data.commit.collection AS event,
date_part('hour', to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) as hour_of_day,
count() AS count
FROM bluesky
WHERE data.kind = 'commit'
AND data.commit.operation = 'create'
AND data.commit.collection in ('app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.feed.like')
GROUP BY event, hour_of_day
ORDER BY hour_of_day, event;

-- Query 4:
SELECT data.did::String as user_id,
min(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) AS first_post_ts
FROM bluesky
WHERE data.kind = 'commit'
AND data.commit.operation = 'create'
AND data.commit.collection = 'app.bsky.feed.post'
GROUP BY user_id
ORDER BY first_post_ts ASC, user_id DESC
LIMIT 3;

-- Query 5:
SELECT data.did::String as user_id,
date_part(
'epoch',
max(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) -
min(to_timestamp_micros(arrow_cast(data.time_us, 'Int64')))
) AS activity_span
FROM bluesky
WHERE data.kind = 'commit'
AND data.commit.operation = 'create'
AND data.commit.collection = 'app.bsky.feed.post'
GROUP BY user_id
ORDER BY activity_span DESC, user_id DESC
LIMIT 3;

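-- date_part('epoch', ...) on the max-min timestamp difference yields the span in
-- seconds (a float; hence the 0.0 expected for users with a single post).
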
-- SQLNESS REPLACE (peers.*) REDACTED
EXPLAIN
SELECT date_part('hour', to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) as hour_of_day
FROM bluesky;

DROP TABLE bluesky;