test: add jsonbench tests (#8165)

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
LFC
2026-05-27 16:34:06 +08:00
committed by GitHub
parent 9487e2c3ca
commit bf7e3551fe
10 changed files with 481 additions and 61 deletions

View File

@@ -59,7 +59,10 @@ impl FunctionRewrite for JsonGetRewriter {
// json_get(column, path, <data_type>)
// )
fn inject_type_from_cast_expr(cast: Cast) -> Result<Transformed<Expr>> {
let Cast { expr, data_type } = cast;
let Cast {
expr,
mut data_type,
} = cast;
let mut json_get = match *expr {
Expr::ScalarFunction(f)
@@ -75,6 +78,9 @@ fn inject_type_from_cast_expr(cast: Cast) -> Result<Transformed<Expr>> {
}
};
if data_type.is_string() {
data_type = DataType::Utf8View;
}
let with_type = ScalarValue::try_new_null(&data_type).map(|x| Expr::Literal(x, None))?;
json_get.args.push(with_type);
Ok(Transformed::yes(Expr::ScalarFunction(json_get)))

View File

@@ -128,7 +128,7 @@ impl JsonNativeType {
JsonNumberType::I64 => ArrowDataType::Int64,
JsonNumberType::F64 => ArrowDataType::Float64,
},
JsonNativeType::String => ArrowDataType::Utf8,
JsonNativeType::String => ArrowDataType::Utf8View,
JsonNativeType::Array(array) => {
ArrowDataType::List(Arc::new(Field::new("item", array.as_arrow_type(), true)))
}

View File

@@ -17,16 +17,24 @@ use std::sync::Arc;
use arrow::compute;
use arrow::util::display::{ArrayFormatter, FormatOptions};
use arrow_array::builder::{
ArrayBuilder, BooleanBuilder, Float64Builder, Int64Builder, NullBuilder, StringViewBuilder,
make_builder,
};
use arrow_array::cast::AsArray;
use arrow_array::types::{Float64Type, Int64Type, UInt64Type};
use arrow_array::{Array, ArrayRef, GenericListArray, ListArray, StructArray, new_null_array};
use arrow_schema::{DataType, FieldRef};
use common_telemetry::debug;
use serde_json::Value;
use snafu::{OptionExt, ResultExt};
use crate::arrow_array::{StringArray, binary_array_value, string_array_value};
use crate::arrow_array::{
MutableBinaryArray, StringViewArray, binary_array_value, string_array_value,
};
use crate::error::{
AlignJsonArraySnafu, ArrowComputeSnafu, DeserializeSnafu, InvalidJsonSnafu, Result,
AlignJsonArraySnafu, ArrowComputeSnafu, CastTypeSnafu, DeserializeSnafu, InvalidJsonSnafu,
Result, SerializeSnafu,
};
pub struct JsonArray<'a> {
@@ -101,6 +109,12 @@ impl JsonArray<'_> {
return Ok(self.inner.clone());
}
debug!(
"Try aligning JSON array {} to data type {}",
self.inner.data_type(),
expect
);
let struct_array = self.inner.as_struct_opt().context(AlignJsonArraySnafu {
reason: "expect struct array",
})?;
@@ -178,11 +192,23 @@ impl JsonArray<'_> {
}
fn try_cast(&self, to_type: &DataType) -> Result<ArrayRef> {
if compute::can_cast_types(self.inner.data_type(), to_type) {
let from_type = self.inner.data_type();
if from_type == to_type {
return Ok(self.inner.clone());
}
if from_type.is_binary() && !to_type.is_binary() {
return self.decode_variant(to_type);
}
if !from_type.is_binary() && to_type.is_binary() {
return self.encode_variant();
}
if compute::can_cast_types(from_type, to_type) {
return compute::cast(&self.inner, to_type).context(ArrowComputeSnafu);
}
// TODO(LFC): Cast according to `to_type` instead of formatting to String here.
let formatter = ArrayFormatter::try_new(&self.inner, &FormatOptions::default())
.context(ArrowComputeSnafu)?;
let values = (0..self.inner.len())
@@ -192,7 +218,91 @@ impl JsonArray<'_> {
.then(|| formatter.value(i).to_string())
})
.collect::<Vec<_>>();
Ok(Arc::new(StringArray::from(values)))
Ok(Arc::new(StringViewArray::from(values)))
}
/// Encodes every element of the inner array as serialized JSON bytes.
///
/// Each row is fetched through `try_get_value`; JSON `null` rows become null
/// slots, every other row is serialized with `serde_json::to_vec`. The rows are
/// materialized first so the binary builder can be allocated once with the
/// exact item count and total byte size.
///
/// # Errors
///
/// Returns the error from `try_get_value` if a row cannot be read, or a
/// `SerializeSnafu` error if a value fails to serialize.
fn encode_variant(&self) -> Result<ArrayRef> {
    let row_count = self.inner.len();

    // Serialize all rows up front, tracking the payload size for the builder.
    let mut payload_size = 0;
    let rows = (0..row_count)
        .map(|idx| -> Result<Option<Vec<u8>>> {
            let json = self.try_get_value(idx)?;
            if json.is_null() {
                return Ok(None);
            }
            let buf = serde_json::to_vec(&json).context(SerializeSnafu)?;
            payload_size += buf.len();
            Ok(Some(buf))
        })
        .collect::<Result<Vec<_>>>()?;

    let mut binary = MutableBinaryArray::with_capacity(row_count, payload_size);
    for slot in rows {
        binary.append_option(slot);
    }
    Ok(Arc::new(binary.finish()))
}
/// Decodes the inner array into an array of `to_type`, reading each row as a
/// JSON value via `try_get_value`.
///
/// Supported targets are the null type, `Boolean`, `Int64`, `Float64` and
/// `Utf8View`. For the three scalar targets a row whose JSON value is not of the
/// requested kind (`as_bool` / `as_i64` / `as_f64` returns `None`) is appended
/// as null. For `Utf8View`, JSON strings are appended as-is, JSON `null` becomes
/// a null slot, and any other value is appended in its `to_string()` form.
///
/// # Errors
///
/// Returns a `CastTypeSnafu` error when `to_type` is not one of the supported
/// targets or when the builder created for `to_type` is not of the expected
/// concrete type; errors from `try_get_value` are propagated.
fn decode_variant(&self, to_type: &DataType) -> Result<ArrayRef> {
    /// Downcasts the dynamically typed builder to the concrete builder `T`.
    fn typed_builder<'a, T: ArrayBuilder>(
        dyn_builder: &'a mut dyn ArrayBuilder,
        to_type: &DataType,
    ) -> Result<&'a mut T> {
        dyn_builder
            .as_any_mut()
            .downcast_mut::<T>()
            .with_context(|| CastTypeSnafu {
                msg: format!("Expect ArrayBuilder is of type {to_type}"),
            })
    }

    let row_count = self.inner.len();
    let mut builder = make_builder(to_type, row_count);

    match to_type {
        // The null type carries no data: every row is simply null.
        null_type if null_type.is_null() => {
            typed_builder::<NullBuilder>(builder.as_mut(), to_type)?.append_nulls(row_count);
        }
        DataType::Boolean => {
            let bools = typed_builder::<BooleanBuilder>(builder.as_mut(), to_type)?;
            for idx in 0..row_count {
                let json = self.try_get_value(idx)?;
                bools.append_option(json.as_bool());
            }
        }
        DataType::Int64 => {
            let ints = typed_builder::<Int64Builder>(builder.as_mut(), to_type)?;
            for idx in 0..row_count {
                let json = self.try_get_value(idx)?;
                ints.append_option(json.as_i64());
            }
        }
        DataType::Float64 => {
            let floats = typed_builder::<Float64Builder>(builder.as_mut(), to_type)?;
            for idx in 0..row_count {
                let json = self.try_get_value(idx)?;
                floats.append_option(json.as_f64());
            }
        }
        DataType::Utf8View => {
            let strings = typed_builder::<StringViewBuilder>(builder.as_mut(), to_type)?;
            for idx in 0..row_count {
                let json = self.try_get_value(idx)?;
                if let Some(text) = json.as_str() {
                    // Plain JSON strings are stored without surrounding quotes.
                    strings.append_value(text);
                } else if json.is_null() {
                    strings.append_null();
                } else {
                    // Anything else keeps its textual form as produced by `to_string()`.
                    strings.append_value(json.to_string());
                }
            }
        }
        _ => {
            return CastTypeSnafu {
                msg: format!("Cannot cast JSON value to {to_type}"),
            }
            .fail();
        }
    }

    Ok(builder.finish())
}
}
@@ -231,7 +341,9 @@ impl<'a> From<&'a ArrayRef> for JsonArray<'a> {
#[cfg(test)]
mod test {
use arrow_array::types::Int64Type;
use arrow_array::{BinaryArray, BooleanArray, Float64Array, Int32Array, Int64Array, ListArray};
use arrow_array::{
BinaryArray, BooleanArray, Float64Array, Int32Array, Int64Array, ListArray, StringArray,
};
use arrow_schema::{Field, Fields};
use serde_json::json;

View File

@@ -33,6 +33,7 @@ use datafusion_expr::Expr;
use datafusion_expr::utils::expr_to_columns;
use datatypes::schema::ext::ArrowSchemaExt;
use futures::StreamExt;
use itertools::Itertools;
use partition::expr::PartitionExpr;
use smallvec::SmallVec;
use snafu::ResultExt;
@@ -436,7 +437,16 @@ impl ScanRegion {
.schema
.arrow_schema()
.has_json_extension_field()
.then_some(&self.request.json_type_hint);
.then_some(&self.request.json_type_hint)
.inspect(|json_type_hint| {
debug!(
"Concretized JSON type: {{{}}}",
json_type_hint
.iter()
.map(|(k, v)| format!("{}: {}", k, v))
.join(", ")
);
});
let mapper = FlatProjectionMapper::new_with_read_columns(
&self.version.metadata,
projection,

View File

@@ -115,11 +115,14 @@ fn extract_untyped_json_get(expr: &mut Expr) -> Option<&mut ScalarFunction> {
}
}
fn push_json_get_type_arg(mut expr: Expr, data_type: DataType) -> Result<Either<Expr, Expr>> {
fn push_json_get_type_arg(mut expr: Expr, mut data_type: DataType) -> Result<Either<Expr, Expr>> {
let Some(json_get) = extract_untyped_json_get(&mut expr) else {
return Ok(Either::Left(expr));
};
if data_type.is_string() {
data_type = DataType::Utf8View;
}
let with_type = ScalarValue::try_new_null(&data_type).map(|x| Expr::Literal(x, None))?;
json_get.args.push(with_type);

View File

@@ -25,8 +25,6 @@ use servers::server::ServerHandlers;
use tests_integration::standalone::GreptimeDbStandaloneBuilder;
use tests_integration::test_util::execute_sql_and_expect;
// TODO(LFC): Unignore the test when JSON2 is ready.
#[ignore]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_load_jsonbench_data_by_pipeline() -> io::Result<()> {
common_telemetry::init_default_ut_logging();
@@ -123,8 +121,6 @@ transform:
assert!(response.starts_with(pattern));
}
// TODO(LFC): Unignore the test when JSON2 is ready.
#[ignore]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_load_jsonbench_data_by_sql() -> io::Result<()> {
common_telemetry::init_default_ut_logging();
@@ -153,16 +149,10 @@ async fn query_data(frontend: &Arc<Instance>) -> io::Result<()> {
+----------+"#;
execute_sql_and_expect(frontend, sql, expected).await;
let sql = "SELECT * FROM bluesky ORDER BY time_us";
let expected = fs::read_to_string(find_workspace_path(
"tests-integration/resources/jsonbench-select-all.txt",
))?;
execute_sql_and_expect(frontend, sql, &expected).await;
// query 1:
let sql = "
SELECT
json_get_string(data, '$.commit.collection') AS event, count() AS count
data.commit.collection AS event, count() AS count
FROM bluesky
GROUP BY event
ORDER BY count DESC, event ASC";
@@ -180,13 +170,12 @@ ORDER BY count DESC, event ASC";
// query 2:
let sql = "
SELECT
json_get_string(data, '$.commit.collection') AS event,
data.commit.collection AS event,
count() AS count,
count(DISTINCT json_get_string(data, '$.did')) AS users
count(DISTINCT data.did) AS users
FROM bluesky
WHERE
(json_get_string(data, '$.kind') = 'commit') AND
(json_get_string(data, '$.commit.operation') = 'create')
data.kind = 'commit' AND data.commit.operation = 'create'
GROUP BY event
ORDER BY count DESC, event ASC";
let expected = r#"
@@ -203,15 +192,14 @@ ORDER BY count DESC, event ASC";
// query 3:
let sql = "
SELECT
json_get_string(data, '$.commit.collection') AS event,
date_part('hour', to_timestamp_micros(json_get_int(data, '$.time_us'))) as hour_of_day,
data.commit.collection AS event,
date_part('hour', to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) as hour_of_day,
count() AS count
FROM bluesky
WHERE
(json_get_string(data, '$.kind') = 'commit') AND
(json_get_string(data, '$.commit.operation') = 'create') AND
json_get_string(data, '$.commit.collection') IN
('app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.feed.like')
data.kind = 'commit' AND
data.commit.operation = 'create' AND
data.commit.collection in ('app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.feed.like')
GROUP BY event, hour_of_day
ORDER BY hour_of_day, event";
let expected = r#"
@@ -227,13 +215,13 @@ ORDER BY hour_of_day, event";
// query 4:
let sql = "
SELECT
json_get_string(data, '$.did') as user_id,
min(to_timestamp_micros(json_get_int(data, '$.time_us'))) AS first_post_ts
data.did::String as user_id,
min(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) AS first_post_ts
FROM bluesky
WHERE
(json_get_string(data, '$.kind') = 'commit') AND
(json_get_string(data, '$.commit.operation') = 'create') AND
(json_get_string(data, '$.commit.collection') = 'app.bsky.feed.post')
data.kind = 'commit' AND
data.commit.operation = 'create' AND
data.commit.collection = 'app.bsky.feed.post'
GROUP BY user_id
ORDER BY first_post_ts ASC, user_id DESC
LIMIT 3";
@@ -250,17 +238,17 @@ LIMIT 3";
// query 5:
let sql = "
SELECT
json_get_string(data, '$.did') as user_id,
data.did::String as user_id,
date_part(
'epoch',
max(to_timestamp_micros(json_get_int(data, '$.time_us'))) -
min(to_timestamp_micros(json_get_int(data, '$.time_us')))
max(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) -
min(to_timestamp_micros(arrow_cast(data.time_us, 'Int64')))
) AS activity_span
FROM bluesky
WHERE
(json_get_string(data, '$.kind') = 'commit') AND
(json_get_string(data, '$.commit.operation') = 'create') AND
(json_get_string(data, '$.commit.collection') = 'app.bsky.feed.post')
data.kind = 'commit' AND
data.commit.operation = 'create' AND
data.commit.collection = 'app.bsky.feed.post'
GROUP BY user_id
ORDER BY activity_span DESC, user_id DESC
LIMIT 3";
@@ -304,30 +292,21 @@ async fn insert_data_by_sql(frontend: &Arc<Instance>) -> io::Result<()> {
async fn desc_table(frontend: &Arc<Instance>) {
let sql = "DESC TABLE bluesky";
let expected = r#"
+---------+------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+---------+------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| data | Json<{"_raw":"<String>","commit.collection":"<String>","commit.operation":"<String>","did":"<String>","kind":"<String>","time_us":"<Number>"}> | | YES | | FIELD |
| time_us | TimestampMicrosecond | PRI | NO | | TIMESTAMP |
+---------+------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+"#;
+---------+----------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+---------+----------------------+-----+------+---------+---------------+
| data | Json2{} | | YES | | FIELD |
| time_us | TimestampMicrosecond | PRI | NO | | TIMESTAMP |
+---------+----------------------+-----+------+---------+---------------+"#;
execute_sql_and_expect(frontend, sql, expected).await;
}
async fn create_table(frontend: &Arc<Instance>) {
let sql = r#"
CREATE TABLE bluesky (
"data" JSON (
format = "partial",
fields = Struct<
kind String,
"commit.operation" String,
"commit.collection" String,
did String,
time_us Bigint
>,
),
"data" JSON2,
time_us TimestampMicrosecond TIME INDEX,
)
) WITH ('append_mode' = 'true', 'sst_format' = 'flat')
"#;
execute_sql_and_expect(frontend, sql, "Affected Rows: 0").await;
}

View File

@@ -111,12 +111,29 @@ select j.a.b from json2_table order by ts;
| -4 |
| |
| |
| "s7" |
| s7 |
| 8 |
| |
| 10 |
+-------------------------------------+
select j.a, j.a.x from json2_table order by ts;
+-----------------------------------+-------------------------------------+
| json_get(json2_table.j,Utf8("a")) | json_get(json2_table.j,Utf8("a.x")) |
+-----------------------------------+-------------------------------------+
| {"b":1} | |
| {"b":-2} | |
| {"b":3} | |
| {"b":-4} | |
| {"b":null} | |
| | |
| {"b":"s7"} | |
| {"b":8} | |
| {"b":null,"x":true} | true |
| {"b":10,"x":null} | null |
+-----------------------------------+-------------------------------------+
select j.c, j.y from json2_table order by ts;
+-----------------------------------+-----------------------------------+
@@ -129,11 +146,28 @@ select j.c, j.y from json2_table order by ts;
| s5 | |
| s6 | |
| [1] | |
| "s8" | |
| s8 | |
| s9 | |
| | false |
+-----------------------------------+-----------------------------------+
select j.a.b + 1 from json2_table order by ts;
+------------------------------------------------------------+
| json_get(json2_table.j,Utf8("a.b"),Int64(NULL)) + Int64(1) |
+------------------------------------------------------------+
| 2 |
| -1 |
| 4 |
| -3 |
| |
| |
| |
| 9 |
| |
| 11 |
+------------------------------------------------------------+
select j.d from json2_table order by ts;
+-----------------------------------+

View File

@@ -42,8 +42,12 @@ explain select j.a.x::bool from json2_table;
select j.a.b from json2_table order by ts;
select j.a, j.a.x from json2_table order by ts;
select j.c, j.y from json2_table order by ts;
select j.a.b + 1 from json2_table order by ts;
select j.d from json2_table order by ts;
drop table json2_table;

View File

@@ -0,0 +1,180 @@
CREATE TABLE bluesky (
`data` JSON2,
time_us TimestampMicrosecond TIME INDEX
) WITH ('append_mode' = 'true', 'sst_format' = 'flat');
Affected Rows: 0
INSERT INTO bluesky (time_us, data)
VALUES (1732206349000167,
'{"did":"did:plc:yj3sjq3blzpynh27cumnp5ks","time_us":1732206349000167,"kind":"commit","commit":{"rev":"3lbhtytnn2k2f","operation":"create","collection":"app.bsky.feed.post","rkey":"3lbhtyteurk2y","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.095Z","langs":["en"],"reply":{"parent":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"},"root":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"}},"text":"aaaaah.  LIght shines in a corner of WTF...."},"cid":"bafyreidblutgvj75o4q4akzyyejedjj6l3it6hgqwee6jpwv2wqph5fsgm"}}');
Affected Rows: 1
INSERT INTO bluesky (time_us, data)
VALUES (1732206349000644,
'{"did":"did:plc:3i4xf2v4wcnyktgv6satke64","time_us":1732206349000644,"kind":"commit","commit":{"rev":"3lbhuvzds6d2a","operation":"create","collection":"app.bsky.feed.like","rkey":"3lbhuvzdked2a","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.221Z","subject":{"cid":"bafyreidjvrcmckkm765mct5fph36x7kupkfo35rjklbf2k76xkzwyiauge","uri":"at://did:plc:azrv4rcbws6kmcga4fsbphg2/app.bsky.feed.post/3lbgjdpbiec2l"}},"cid":"bafyreia5l5vrkh5oj4cjyhcqby2dprhyvcyofo2q5562tijlae2pzih23m"}}');
Affected Rows: 1
ADMIN flush_table('bluesky');
+------------------------------+
| ADMIN flush_table('bluesky') |
+------------------------------+
| 0 |
+------------------------------+
INSERT INTO bluesky (time_us, data)
VALUES (1732206349001108,
'{"did":"did:plc:gccfnqqizz4urhchsaie6jft","time_us":1732206349001108,"kind":"commit","commit":{"rev":"3lbhuvze3gi2u","operation":"create","collection":"app.bsky.graph.follow","rkey":"3lbhuvzdtmi2u","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:27:40.923Z","subject":"did:plc:r7cdh4sgzqbfdc6wcdxxti7c"},"cid":"bafyreiew2p6cgirfaj45qoenm4fgumib7xoloclrap3jgkz5es7g7kby3i"}}');
Affected Rows: 1
INSERT INTO bluesky (time_us, data)
VALUES (1732206349001372,
'{"did":"did:plc:msxqf3twq7abtdw7dbfskphk","time_us":1732206349001372,"kind":"commit","commit":{"rev":"3lbhueija5p22","operation":"create","collection":"app.bsky.feed.like","rkey":"3lbhueiizcx22","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:15:58.232Z","subject":{"cid":"bafyreiavpshyqzrlo5m7fqodjhs6jevweqnif4phasiwimv4a7mnsqi2fe","uri":"at://did:plc:fusulxqc52zbrc75fi6xrcof/app.bsky.feed.post/3lbhskq5zn22f"}},"cid":"bafyreidjix4dauj2afjlbzmhj3a7gwftcevvmmy6edww6vrjdbst26rkby"}}');
Affected Rows: 1
ADMIN flush_table('bluesky');
+------------------------------+
| ADMIN flush_table('bluesky') |
+------------------------------+
| 0 |
+------------------------------+
INSERT INTO bluesky (time_us, data)
VALUES (1732206349001905,
'{"did":"did:plc:l5o3qjrmfztir54cpwlv2eme","time_us":1732206349001905,"kind":"commit","commit":{"rev":"3lbhtytohxc2o","operation":"create","collection":"app.bsky.feed.post","rkey":"3lbhtytjqzk2q","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.254Z","langs":["en"],"reply":{"parent":{"cid":"bafyreih35fe2jj3gchmgk4amold4l6sfxd2sby5wrg3jrws5fkdypxrbg4","uri":"at://did:plc:6wx2gg5yqgvmlu35r6y3bk6d/app.bsky.feed.post/3lbhtj2eb4s2o"},"root":{"cid":"bafyreifipyt3vctd4ptuoicvio7rbr5xvjv4afwuggnd2prnmn55mu6luu","uri":"at://did:plc:474ldquxwzrlcvjhhbbk2wte/app.bsky.feed.post/3lbhdzrynik27"}},"text":"okay i take mine back because I hadnt heard this one yet^^"},"cid":"bafyreigzdsdne3z2xxcakgisieyj7y47hj6eg7lj6v4q25ah5q2qotu5ku"}}');
Affected Rows: 1
ADMIN compact_table('bluesky', 'swcs', '86400');
+-------------------------------------------------+
| ADMIN compact_table('bluesky', 'swcs', '86400') |
+-------------------------------------------------+
| 0 |
+-------------------------------------------------+
SELECT count(*) FROM bluesky;
+----------+
| count(*) |
+----------+
| 5 |
+----------+
-- Query 1:
SELECT data.commit.collection AS event,
count() AS count
FROM bluesky
GROUP BY event
ORDER BY count DESC, event ASC;
+-----------------------+-------+
| event | count |
+-----------------------+-------+
| app.bsky.feed.like | 2 |
| app.bsky.feed.post | 2 |
| app.bsky.graph.follow | 1 |
+-----------------------+-------+
-- Query 2:
SELECT data.commit.collection AS event,
count() AS count,
count(DISTINCT data.did) AS users
FROM bluesky
WHERE data.kind = 'commit' AND data.commit.operation = 'create'
GROUP BY event
ORDER BY count DESC, event ASC;
+-----------------------+-------+-------+
| event | count | users |
+-----------------------+-------+-------+
| app.bsky.feed.like | 2 | 2 |
| app.bsky.feed.post | 2 | 2 |
| app.bsky.graph.follow | 1 | 1 |
+-----------------------+-------+-------+
-- Query 3:
SELECT data.commit.collection AS event,
date_part('hour', to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) as hour_of_day,
count() AS count
FROM bluesky
WHERE data.kind = 'commit'
AND data.commit.operation = 'create'
AND data.commit.collection in ('app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.feed.like')
GROUP BY event, hour_of_day
ORDER BY hour_of_day, event;
+--------------------+-------------+-------+
| event | hour_of_day | count |
+--------------------+-------------+-------+
| app.bsky.feed.like | 16 | 2 |
| app.bsky.feed.post | 16 | 2 |
+--------------------+-------------+-------+
-- Query 4:
SELECT data.did::String as user_id,
min(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) AS first_post_ts
FROM bluesky
WHERE data.kind = 'commit'
AND data.commit.operation = 'create'
AND data.commit.collection = 'app.bsky.feed.post'
GROUP BY user_id
ORDER BY first_post_ts ASC, user_id DESC
LIMIT 3;
+----------------------------------+----------------------------+
| user_id | first_post_ts |
+----------------------------------+----------------------------+
| did:plc:yj3sjq3blzpynh27cumnp5ks | 2024-11-21T16:25:49.000167 |
| did:plc:l5o3qjrmfztir54cpwlv2eme | 2024-11-21T16:25:49.001905 |
+----------------------------------+----------------------------+
-- Query 5:
SELECT data.did::String as user_id,
date_part(
'epoch',
max(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) -
min(to_timestamp_micros(arrow_cast(data.time_us, 'Int64')))
) AS activity_span
FROM bluesky
WHERE data.kind = 'commit'
AND data.commit.operation = 'create'
AND data.commit.collection = 'app.bsky.feed.post'
GROUP BY user_id
ORDER BY activity_span DESC, user_id DESC
LIMIT 3;
+----------------------------------+---------------+
| user_id | activity_span |
+----------------------------------+---------------+
| did:plc:yj3sjq3blzpynh27cumnp5ks | 0.0 |
| did:plc:l5o3qjrmfztir54cpwlv2eme | 0.0 |
+----------------------------------+---------------+
-- SQLNESS REPLACE (peers.*) REDACTED
EXPLAIN
SELECT date_part('hour', to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) as hour_of_day
FROM bluesky;
+---------------+-------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: date_part(Utf8("hour"), to_timestamp_micros(json_get(bluesky.data, Utf8("time_us"), Int64(NULL)))) AS hour_of_day |
| | TableScan: bluesky |
| | ]] |
| physical_plan | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------+
DROP TABLE bluesky;
Affected Rows: 0

View File

@@ -0,0 +1,92 @@
CREATE TABLE bluesky (
`data` JSON2,
time_us TimestampMicrosecond TIME INDEX
) WITH ('append_mode' = 'true', 'sst_format' = 'flat');
INSERT INTO bluesky (time_us, data)
VALUES (1732206349000167,
'{"did":"did:plc:yj3sjq3blzpynh27cumnp5ks","time_us":1732206349000167,"kind":"commit","commit":{"rev":"3lbhtytnn2k2f","operation":"create","collection":"app.bsky.feed.post","rkey":"3lbhtyteurk2y","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.095Z","langs":["en"],"reply":{"parent":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"},"root":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"}},"text":"aaaaah.  LIght shines in a corner of WTF...."},"cid":"bafyreidblutgvj75o4q4akzyyejedjj6l3it6hgqwee6jpwv2wqph5fsgm"}}');
INSERT INTO bluesky (time_us, data)
VALUES (1732206349000644,
'{"did":"did:plc:3i4xf2v4wcnyktgv6satke64","time_us":1732206349000644,"kind":"commit","commit":{"rev":"3lbhuvzds6d2a","operation":"create","collection":"app.bsky.feed.like","rkey":"3lbhuvzdked2a","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.221Z","subject":{"cid":"bafyreidjvrcmckkm765mct5fph36x7kupkfo35rjklbf2k76xkzwyiauge","uri":"at://did:plc:azrv4rcbws6kmcga4fsbphg2/app.bsky.feed.post/3lbgjdpbiec2l"}},"cid":"bafyreia5l5vrkh5oj4cjyhcqby2dprhyvcyofo2q5562tijlae2pzih23m"}}');
ADMIN flush_table('bluesky');
INSERT INTO bluesky (time_us, data)
VALUES (1732206349001108,
'{"did":"did:plc:gccfnqqizz4urhchsaie6jft","time_us":1732206349001108,"kind":"commit","commit":{"rev":"3lbhuvze3gi2u","operation":"create","collection":"app.bsky.graph.follow","rkey":"3lbhuvzdtmi2u","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:27:40.923Z","subject":"did:plc:r7cdh4sgzqbfdc6wcdxxti7c"},"cid":"bafyreiew2p6cgirfaj45qoenm4fgumib7xoloclrap3jgkz5es7g7kby3i"}}');
INSERT INTO bluesky (time_us, data)
VALUES (1732206349001372,
'{"did":"did:plc:msxqf3twq7abtdw7dbfskphk","time_us":1732206349001372,"kind":"commit","commit":{"rev":"3lbhueija5p22","operation":"create","collection":"app.bsky.feed.like","rkey":"3lbhueiizcx22","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:15:58.232Z","subject":{"cid":"bafyreiavpshyqzrlo5m7fqodjhs6jevweqnif4phasiwimv4a7mnsqi2fe","uri":"at://did:plc:fusulxqc52zbrc75fi6xrcof/app.bsky.feed.post/3lbhskq5zn22f"}},"cid":"bafyreidjix4dauj2afjlbzmhj3a7gwftcevvmmy6edww6vrjdbst26rkby"}}');
ADMIN flush_table('bluesky');
INSERT INTO bluesky (time_us, data)
VALUES (1732206349001905,
'{"did":"did:plc:l5o3qjrmfztir54cpwlv2eme","time_us":1732206349001905,"kind":"commit","commit":{"rev":"3lbhtytohxc2o","operation":"create","collection":"app.bsky.feed.post","rkey":"3lbhtytjqzk2q","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.254Z","langs":["en"],"reply":{"parent":{"cid":"bafyreih35fe2jj3gchmgk4amold4l6sfxd2sby5wrg3jrws5fkdypxrbg4","uri":"at://did:plc:6wx2gg5yqgvmlu35r6y3bk6d/app.bsky.feed.post/3lbhtj2eb4s2o"},"root":{"cid":"bafyreifipyt3vctd4ptuoicvio7rbr5xvjv4afwuggnd2prnmn55mu6luu","uri":"at://did:plc:474ldquxwzrlcvjhhbbk2wte/app.bsky.feed.post/3lbhdzrynik27"}},"text":"okay i take mine back because I hadnt heard this one yet^^"},"cid":"bafyreigzdsdne3z2xxcakgisieyj7y47hj6eg7lj6v4q25ah5q2qotu5ku"}}');
ADMIN compact_table('bluesky', 'swcs', '86400');
SELECT count(*) FROM bluesky;
-- Query 1:
SELECT data.commit.collection AS event,
count() AS count
FROM bluesky
GROUP BY event
ORDER BY count DESC, event ASC;
-- Query 2:
SELECT data.commit.collection AS event,
count() AS count,
count(DISTINCT data.did) AS users
FROM bluesky
WHERE data.kind = 'commit' AND data.commit.operation = 'create'
GROUP BY event
ORDER BY count DESC, event ASC;
-- Query 3:
SELECT data.commit.collection AS event,
date_part('hour', to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) as hour_of_day,
count() AS count
FROM bluesky
WHERE data.kind = 'commit'
AND data.commit.operation = 'create'
AND data.commit.collection in ('app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.feed.like')
GROUP BY event, hour_of_day
ORDER BY hour_of_day, event;
-- Query 4:
SELECT data.did::String as user_id,
min(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) AS first_post_ts
FROM bluesky
WHERE data.kind = 'commit'
AND data.commit.operation = 'create'
AND data.commit.collection = 'app.bsky.feed.post'
GROUP BY user_id
ORDER BY first_post_ts ASC, user_id DESC
LIMIT 3;
-- Query 5:
SELECT data.did::String as user_id,
date_part(
'epoch',
max(to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) -
min(to_timestamp_micros(arrow_cast(data.time_us, 'Int64')))
) AS activity_span
FROM bluesky
WHERE data.kind = 'commit'
AND data.commit.operation = 'create'
AND data.commit.collection = 'app.bsky.feed.post'
GROUP BY user_id
ORDER BY activity_span DESC, user_id DESC
LIMIT 3;
-- SQLNESS REPLACE (peers.*) REDACTED
EXPLAIN
SELECT date_part('hour', to_timestamp_micros(arrow_cast(data.time_us, 'Int64'))) as hour_of_day
FROM bluesky;
DROP TABLE bluesky;