mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-07-04 13:00:38 +00:00
feat: json2 field access pushdown to parquet
Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
476
Cargo.lock
generated
476
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -13,8 +13,11 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
use std::fmt::{Debug, Formatter};
|
||||
use std::sync::Arc;
|
||||
|
||||
use snafu::{FromString, Snafu};
|
||||
|
||||
use crate::status_code::StatusCode;
|
||||
|
||||
/// Extension to [`Error`](std::error::Error) in std.
|
||||
@@ -116,6 +119,39 @@ impl<T: StackError> StackError for Box<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// A simple [Result] of which the error is convertible from [ErrorExt] (which every GreptimeDB
|
||||
/// error implements). Use this if you are tired of writing `unwrap`s in test codes, that you can
|
||||
/// use the `?` on all GreptimeDB errors.
|
||||
pub type WhateverResult<T> = Result<T, Whatever>;
|
||||
|
||||
#[derive(Snafu)]
|
||||
#[snafu(display("{inner}"))]
|
||||
pub struct Whatever {
|
||||
inner: snafu::Whatever,
|
||||
}
|
||||
|
||||
impl Debug for Whatever {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.inner)
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: ErrorExt> From<E> for Whatever {
|
||||
fn from(e: E) -> Self {
|
||||
Self {
|
||||
inner: FromString::without_source(format!("{e:?}")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<String> for Whatever {
|
||||
fn from(s: String) -> Self {
|
||||
Self {
|
||||
inner: FromString::without_source(s),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// An opaque boxed error based on errors that implement [ErrorExt] trait.
|
||||
pub struct BoxedError {
|
||||
inner: Box<dyn crate::ext::ErrorExt + Send + Sync>,
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
use std::any::Any;
|
||||
|
||||
use common_error::ext::{ErrorExt, PlainError, StackError};
|
||||
use common_error::ext::{ErrorExt, PlainError, StackError, WhateverResult};
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_macro::stack_trace_debug;
|
||||
use snafu::{Location, ResultExt, Snafu};
|
||||
@@ -60,6 +60,34 @@ fn transparent_error() -> Result<(), MyError> {
|
||||
Err(plain_error)?
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_into_whatever_error() {
|
||||
fn f(g: fn() -> Result<(), MyError>) -> WhateverResult<()> {
|
||||
g()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
let whatever = f(normal_error).unwrap_err();
|
||||
assert_eq!(
|
||||
normalize_path(&whatever.to_string()),
|
||||
format!(
|
||||
r#"0: A normal error with "display" attribute, message "blabla", at {}:55:22
|
||||
1: PlainError {{ msg: "<root cause>", status_code: Unexpected }}"#,
|
||||
normalize_path(file!())
|
||||
)
|
||||
);
|
||||
|
||||
let whatever = f(transparent_error).unwrap_err();
|
||||
assert_eq!(
|
||||
normalize_path(&whatever.to_string()),
|
||||
format!(
|
||||
r#"0: <transparent>, at {}:60:5
|
||||
1: PlainError {{ msg: "<root cause>", status_code: Unexpected }}"#,
|
||||
normalize_path(file!())
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_output_msg() {
|
||||
let result = normal_error();
|
||||
|
||||
32
src/common/recordbatch/src/ext.rs
Normal file
32
src/common/recordbatch/src/ext.rs
Normal file
@@ -0,0 +1,32 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use datatypes::arrow::util::pretty;
|
||||
|
||||
pub trait RecordBatchExt {
|
||||
fn pretty_print(&self) -> String;
|
||||
}
|
||||
|
||||
impl RecordBatchExt for RecordBatch {
|
||||
fn pretty_print(&self) -> String {
|
||||
match pretty::pretty_format_batches(std::slice::from_ref(self)) {
|
||||
Ok(s) => s.to_string(),
|
||||
Err(e) => format!(
|
||||
r#"Unable to pretty print, error: "{}". Debug print is: {:?}"#,
|
||||
e, self,
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -17,6 +17,7 @@
|
||||
pub mod adapter;
|
||||
pub mod cursor;
|
||||
pub mod error;
|
||||
pub mod ext;
|
||||
pub mod filter;
|
||||
pub mod recordbatch;
|
||||
pub mod util;
|
||||
|
||||
@@ -12,11 +12,14 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
|
||||
use api::helper::encode_json_value;
|
||||
use api::v1::Rows;
|
||||
use api::v1::helper::row;
|
||||
use api::v1::value::ValueData;
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::ext::{ErrorExt, WhateverResult};
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_recordbatch::RecordBatches;
|
||||
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
|
||||
@@ -24,17 +27,135 @@ use datafusion_common::ScalarValue;
|
||||
use datafusion_expr::{col, lit};
|
||||
use datatypes::arrow::array::AsArray;
|
||||
use datatypes::arrow::datatypes::{Float64Type, TimestampMillisecondType};
|
||||
use datatypes::json::value::JsonValue;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::types::json_type::{JsonNativeType, JsonObjectType};
|
||||
use futures::TryStreamExt;
|
||||
use serde_json::json;
|
||||
use store_api::region_engine::{PrepareRequest, RegionEngine, RegionScanner};
|
||||
use store_api::region_request::RegionRequest;
|
||||
use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution};
|
||||
use store_api::storage::{ProjectionInput, RegionId, ScanRequest, TimeSeriesDistribution};
|
||||
|
||||
use crate::config::MitoConfig;
|
||||
use crate::error::Error;
|
||||
use crate::read::read_columns::{ReadColumn, ReadColumns};
|
||||
use crate::read::scan_region::Scanner;
|
||||
use crate::test_util;
|
||||
use crate::test_util::{CreateRequestBuilder, TestEnv};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_json_type_hint_pushdown_scanner_returns_batches() -> WhateverResult<()> {
|
||||
// Create a region with a JSON2 field whose physical Parquet representation is a nested struct.
|
||||
// The scan below will only ask for field_0.a.x.
|
||||
|
||||
let request = CreateRequestBuilder::new()
|
||||
.field_datatype(ConcreteDataType::json2(JsonNativeType::Object(
|
||||
JsonObjectType::from([
|
||||
(
|
||||
"a".to_string(),
|
||||
JsonNativeType::Object(JsonObjectType::from([
|
||||
("x".to_string(), JsonNativeType::i64()),
|
||||
("y".to_string(), JsonNativeType::String),
|
||||
])),
|
||||
),
|
||||
("b".to_string(), JsonNativeType::String),
|
||||
]),
|
||||
)))
|
||||
.build();
|
||||
let schema = test_util::rows_schema(&request);
|
||||
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let region_id = RegionId::new(1024, 0);
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Create(request))
|
||||
.await?;
|
||||
|
||||
// Write full JSON objects, then flush them so the scanner has an Parquet file where nested
|
||||
// projection can be pushed down.
|
||||
let rows = Rows {
|
||||
schema,
|
||||
rows: vec![
|
||||
row(vec![
|
||||
ValueData::StringValue("tag-1".to_string()),
|
||||
ValueData::JsonValue(encode_json_value(JsonValue::from(json!({
|
||||
"a": { "x": 10, "y": "ignored-a" },
|
||||
"b": "ignored-b"
|
||||
})))),
|
||||
ValueData::TimestampMillisecondValue(1000),
|
||||
]),
|
||||
row(vec![
|
||||
ValueData::StringValue("tag-2".to_string()),
|
||||
ValueData::JsonValue(encode_json_value(JsonValue::from(json!({
|
||||
"a": { "x": 20, "y": "ignored-c" },
|
||||
"b": "ignored-d"
|
||||
})))),
|
||||
ValueData::TimestampMillisecondValue(2000),
|
||||
]),
|
||||
],
|
||||
};
|
||||
test_util::put_rows(&engine, region_id, rows).await;
|
||||
test_util::flush_region(&engine, region_id, None).await;
|
||||
|
||||
// Simulate a query expression like json_get(field_0, 'a.x'): the logical projection still
|
||||
// returns the JSON2 root column, while json_type_hint tells scan input construction which
|
||||
// nested physical path is needed.
|
||||
|
||||
let request = ScanRequest {
|
||||
projection_input: Some(ProjectionInput::new(vec![1, 0])),
|
||||
json_type_hint: HashMap::from([(
|
||||
"field_0".to_string(),
|
||||
JsonNativeType::Object(JsonObjectType::from([(
|
||||
"a".to_string(),
|
||||
JsonNativeType::Object(JsonObjectType::from([(
|
||||
"x".to_string(),
|
||||
JsonNativeType::i64(),
|
||||
)])),
|
||||
)])),
|
||||
)]),
|
||||
..Default::default()
|
||||
};
|
||||
let scanner = engine.scanner(region_id, request).await?;
|
||||
let Scanner::Seq(seq_scan) = &scanner else {
|
||||
unreachable!();
|
||||
};
|
||||
// Verify the scan input only asks storage for field_0.a.x instead of the
|
||||
// whole JSON2 struct. tag_0 is still read as a normal root column.
|
||||
assert_eq!(
|
||||
seq_scan.input().read_cols,
|
||||
ReadColumns {
|
||||
cols: vec![
|
||||
ReadColumn::new(0, vec![]),
|
||||
ReadColumn::new(
|
||||
1,
|
||||
vec![vec![
|
||||
"field_0".to_string(),
|
||||
"a".to_string(),
|
||||
"x".to_string()
|
||||
]]
|
||||
),
|
||||
]
|
||||
}
|
||||
);
|
||||
|
||||
// The scanner should still return a valid RecordBatch in the requested logical projection.
|
||||
// Fields outside the pushed-down path are present as empty or null values because they were
|
||||
// not read from Parquet.
|
||||
|
||||
let stream = scanner.scan().await?;
|
||||
let batches = RecordBatches::try_collect(stream).await?;
|
||||
let expected = r#"
|
||||
+------------------------+-------+
|
||||
| field_0 | tag_0 |
|
||||
+------------------------+-------+
|
||||
| {a: {x: 10, y: }, b: } | tag-1 |
|
||||
| {a: {x: 20, y: }, b: } | tag-2 |
|
||||
+------------------------+-------+
|
||||
"#;
|
||||
assert_eq!(batches.pretty_print()?, expected.trim());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_incremental_query_stale_error() {
|
||||
let mut env = TestEnv::with_prefix("test_incremental_query_stale_error").await;
|
||||
|
||||
@@ -170,7 +170,7 @@ fn normalize_nested_paths(nested_paths: Vec<NestedPath>) -> Vec<NestedPath> {
|
||||
normalized
|
||||
}
|
||||
|
||||
fn merge_nested_paths(merged: &mut Vec<NestedPath>, incoming: Vec<NestedPath>) {
|
||||
pub(crate) fn merge_nested_paths(merged: &mut Vec<NestedPath>, incoming: Vec<NestedPath>) {
|
||||
for path in incoming {
|
||||
if merged
|
||||
.iter()
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
//! Scans a region according to the scan request.
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fmt;
|
||||
use std::num::NonZeroU64;
|
||||
use std::sync::Arc;
|
||||
@@ -32,6 +32,7 @@ use datafusion_common::Column;
|
||||
use datafusion_expr::Expr;
|
||||
use datafusion_expr::utils::expr_to_columns;
|
||||
use datatypes::schema::ext::ArrowSchemaExt;
|
||||
use datatypes::types::json_type::JsonNativeType;
|
||||
use futures::StreamExt;
|
||||
use itertools::Itertools;
|
||||
use partition::expr::PartitionExpr;
|
||||
@@ -40,8 +41,8 @@ use snafu::ResultExt;
|
||||
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
|
||||
use store_api::region_engine::{PartitionRange, RegionScannerRef};
|
||||
use store_api::storage::{
|
||||
RegionId, ScanRequest, SequenceNumber, SequenceRange, TimeSeriesDistribution,
|
||||
TimeSeriesRowSelector,
|
||||
NestedPath, ProjectionInput, RegionId, ScanRequest, SequenceNumber, SequenceRange,
|
||||
TimeSeriesDistribution, TimeSeriesRowSelector,
|
||||
};
|
||||
use table::predicate::{Predicate, build_time_range_predicate, extract_time_range_from_expr};
|
||||
use tokio::sync::{Semaphore, mpsc};
|
||||
@@ -60,7 +61,8 @@ use crate::read::flat_projection::FlatProjectionMapper;
|
||||
use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
|
||||
use crate::read::range_cache::{ScanRequestFingerprint, implied_time_range_from_exprs};
|
||||
use crate::read::read_columns::{
|
||||
ReadColumns, merge, read_columns_from_predicate, read_columns_from_projection,
|
||||
ReadColumns, merge, merge_nested_paths, read_columns_from_predicate,
|
||||
read_columns_from_projection,
|
||||
};
|
||||
use crate::read::seq_scan::SeqScan;
|
||||
use crate::read::series_scan::SeriesScan;
|
||||
@@ -399,10 +401,17 @@ impl ScanRegion {
|
||||
|
||||
/// Creates a scan input.
|
||||
#[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
|
||||
async fn scan_input(self) -> Result<ScanInput> {
|
||||
async fn scan_input(mut self) -> Result<ScanInput> {
|
||||
let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
|
||||
let time_range = self.build_time_range_predicate();
|
||||
let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
|
||||
if let Some(input) = self.request.projection_input.as_mut() {
|
||||
fill_json_nested_paths_from_hint(
|
||||
input,
|
||||
&self.request.json_type_hint,
|
||||
&self.version.metadata,
|
||||
);
|
||||
}
|
||||
|
||||
let read_cols = match &self.request.projection_input {
|
||||
Some(p) => {
|
||||
@@ -1313,6 +1322,58 @@ fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
|
||||
}
|
||||
}
|
||||
|
||||
fn fill_json_nested_paths_from_hint(
|
||||
input: &mut ProjectionInput,
|
||||
json_type_hint: &HashMap<String, JsonNativeType>,
|
||||
metadata: &RegionMetadata,
|
||||
) {
|
||||
if json_type_hint.is_empty() {
|
||||
return;
|
||||
}
|
||||
let paths = nested_paths_from_json_type_hint(json_type_hint, metadata, &input.projection);
|
||||
merge_nested_paths(&mut input.nested_paths, paths);
|
||||
}
|
||||
|
||||
fn nested_paths_from_json_type_hint(
|
||||
json_type_hint: &HashMap<String, JsonNativeType>,
|
||||
metadata: &RegionMetadata,
|
||||
projection: &[usize],
|
||||
) -> Vec<NestedPath> {
|
||||
let mut paths = Vec::new();
|
||||
|
||||
for i in projection {
|
||||
let Some(column) = metadata.column_metadatas.get(*i) else {
|
||||
continue;
|
||||
};
|
||||
let column_name = &column.column_schema.name;
|
||||
let Some(json_type) = json_type_hint.get(column_name) else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let mut current = vec![column_name.clone()];
|
||||
collect_json_nested_paths(json_type, &mut current, &mut paths);
|
||||
}
|
||||
|
||||
paths
|
||||
}
|
||||
|
||||
fn collect_json_nested_paths(
|
||||
json_type: &JsonNativeType,
|
||||
current: &mut NestedPath,
|
||||
paths: &mut Vec<NestedPath>,
|
||||
) {
|
||||
match json_type {
|
||||
JsonNativeType::Object(fields) if !fields.is_empty() => {
|
||||
for (field, child) in fields {
|
||||
current.push(field.clone());
|
||||
collect_json_nested_paths(child, current, paths);
|
||||
current.pop();
|
||||
}
|
||||
}
|
||||
_ => paths.push(current.clone()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Output of [build_scan_fingerprint]: the cache fingerprint plus the derived
|
||||
/// implied time range used to decide whether the cache key can drop the time
|
||||
/// predicates for a given partition (see `build_range_cache_key`).
|
||||
@@ -1807,14 +1868,21 @@ mod tests {
|
||||
use datafusion::physical_plan::expressions::lit as physical_lit;
|
||||
use datafusion_common::ScalarValue;
|
||||
use datafusion_expr::{col, lit};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use datatypes::types::json_type::JsonObjectType;
|
||||
use datatypes::value::Value;
|
||||
use partition::expr::col as partition_col;
|
||||
use store_api::metadata::RegionMetadataBuilder;
|
||||
use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};
|
||||
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
|
||||
use store_api::storage::{
|
||||
ProjectionInput, RegionId, TimeSeriesDistribution, TimeSeriesRowSelector,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
use crate::cache::CacheManager;
|
||||
use crate::error::InvalidMetadataSnafu;
|
||||
use crate::read::range_cache::ScanRequestFingerprintBuilder;
|
||||
use crate::read::read_columns::ReadColumn;
|
||||
use crate::test_util::memtable_util::metadata_with_primary_key;
|
||||
use crate::test_util::scheduler_util::SchedulerEnv;
|
||||
|
||||
@@ -1842,6 +1910,89 @@ mod tests {
|
||||
lit(ScalarValue::TimestampMillisecond(Some(val), None))
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_fill_json_nested_paths_from_hint() -> Result<()> {
|
||||
fn json_projection_test_metadata() -> Result<RegionMetadataRef> {
|
||||
let mut builder = RegionMetadataBuilder::new(RegionId::new(1024, 0));
|
||||
builder
|
||||
.push_column_metadata(ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
"tag".to_string(),
|
||||
ConcreteDataType::string_datatype(),
|
||||
true,
|
||||
),
|
||||
semantic_type: SemanticType::Tag,
|
||||
column_id: 0,
|
||||
})
|
||||
.push_column_metadata(ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
"j".to_string(),
|
||||
ConcreteDataType::json2(JsonNativeType::Object(JsonObjectType::new())),
|
||||
true,
|
||||
),
|
||||
semantic_type: SemanticType::Field,
|
||||
column_id: 1,
|
||||
})
|
||||
.push_column_metadata(ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
"ts".to_string(),
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
false,
|
||||
),
|
||||
semantic_type: SemanticType::Timestamp,
|
||||
column_id: 2,
|
||||
});
|
||||
builder.primary_key(vec![0]);
|
||||
builder.build().context(InvalidMetadataSnafu).map(Arc::new)
|
||||
}
|
||||
|
||||
let metadata = json_projection_test_metadata()?;
|
||||
let hint = HashMap::from([(
|
||||
"j".to_string(),
|
||||
JsonNativeType::Object(JsonObjectType::from([
|
||||
("a".to_string(), JsonNativeType::i64()),
|
||||
(
|
||||
"b".to_string(),
|
||||
JsonNativeType::Object(JsonObjectType::from([(
|
||||
"c".to_string(),
|
||||
JsonNativeType::String,
|
||||
)])),
|
||||
),
|
||||
])),
|
||||
)]);
|
||||
|
||||
fn nested_path(parts: &[&str]) -> NestedPath {
|
||||
parts.iter().map(|part| part.to_string()).collect()
|
||||
}
|
||||
|
||||
let mut input = ProjectionInput::new(vec![1, 0]);
|
||||
fill_json_nested_paths_from_hint(&mut input, &hint, metadata.as_ref());
|
||||
let read_columns = read_columns_from_projection(input, &metadata)?;
|
||||
assert_eq!(
|
||||
read_columns,
|
||||
ReadColumns {
|
||||
cols: vec![
|
||||
ReadColumn::new(
|
||||
1,
|
||||
vec![nested_path(&["j", "a"]), nested_path(&["j", "b", "c"])]
|
||||
),
|
||||
ReadColumn::new(0, vec![])
|
||||
]
|
||||
}
|
||||
);
|
||||
|
||||
let mut input = ProjectionInput::new(vec![0]);
|
||||
fill_json_nested_paths_from_hint(&mut input, &hint, metadata.as_ref());
|
||||
let read_columns = read_columns_from_projection(input, &metadata)?;
|
||||
assert_eq!(
|
||||
read_columns,
|
||||
ReadColumns {
|
||||
cols: vec![ReadColumn::new(0, vec![])]
|
||||
}
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_build_scan_fingerprint_for_eligible_scan() {
|
||||
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
|
||||
|
||||
@@ -827,6 +827,7 @@ mod tests {
|
||||
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use datatypes::types::json_type::{JsonNativeType, JsonObjectType};
|
||||
use datatypes::value::ValueRef;
|
||||
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt8Vector, UInt64Vector};
|
||||
use mito_codec::row_converter::{
|
||||
@@ -838,8 +839,10 @@ mod tests {
|
||||
use store_api::storage::consts::ReservedColumnId;
|
||||
|
||||
use super::*;
|
||||
use crate::error::InvalidMetadataSnafu;
|
||||
use crate::read::read_columns::ReadColumn;
|
||||
use crate::sst::parquet::flat_format::{
|
||||
FlatReadFormat, FlatWriteFormat, sequence_column_index,
|
||||
FlatReadFormat, FlatWriteFormat, sequence_column_index, sst_column_id_indices,
|
||||
};
|
||||
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema, with_field_id};
|
||||
|
||||
@@ -999,6 +1002,59 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_format_projection_preserves_nested_paths() -> Result<()> {
|
||||
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
|
||||
builder
|
||||
.push_column_metadata(ColumnMetadata {
|
||||
column_schema: ColumnSchema::new("tag0", ConcreteDataType::string_datatype(), true),
|
||||
semantic_type: SemanticType::Tag,
|
||||
column_id: 1,
|
||||
})
|
||||
.push_column_metadata(ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
"j",
|
||||
ConcreteDataType::json2(JsonNativeType::Object(JsonObjectType::from([
|
||||
("a".to_string(), JsonNativeType::i64()),
|
||||
("b".to_string(), JsonNativeType::String),
|
||||
]))),
|
||||
true,
|
||||
),
|
||||
semantic_type: SemanticType::Field,
|
||||
column_id: 4,
|
||||
})
|
||||
.push_column_metadata(ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
"ts",
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
false,
|
||||
),
|
||||
semantic_type: SemanticType::Timestamp,
|
||||
column_id: 5,
|
||||
})
|
||||
.primary_key(vec![1]);
|
||||
let metadata = Arc::new(builder.build().context(InvalidMetadataSnafu)?);
|
||||
let column_id_to_parquet_index = sst_column_id_indices(&metadata);
|
||||
let projection = FormatProjection::compute_format_projection(
|
||||
&column_id_to_parquet_index,
|
||||
metadata.column_metadatas.len() + FIXED_POS_COLUMN_NUM,
|
||||
ReadColumns {
|
||||
cols: vec![ReadColumn::new(
|
||||
4,
|
||||
vec![vec!["j".to_string(), "a".to_string()]],
|
||||
)],
|
||||
},
|
||||
);
|
||||
|
||||
let columns = projection.parquet_read_cols.columns();
|
||||
assert_eq!(1, columns[0].root_index());
|
||||
assert_eq!(
|
||||
&[vec!["j".to_string(), "a".to_string()]],
|
||||
columns[0].nested_paths()
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_empty_primary_key_offsets() {
|
||||
let array = build_test_pk_array(&[]);
|
||||
|
||||
@@ -2304,6 +2304,8 @@ mod tests {
|
||||
use std::fmt::{Debug, Formatter};
|
||||
use std::sync::{Arc, LazyLock};
|
||||
|
||||
use common_error::ext::WhateverResult;
|
||||
use common_recordbatch::ext::RecordBatchExt;
|
||||
use datafusion::arrow::datatypes::DataType;
|
||||
use datafusion_common::ScalarValue;
|
||||
use datafusion_expr::expr::ScalarFunction;
|
||||
@@ -2311,8 +2313,10 @@ mod tests {
|
||||
ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
|
||||
col, lit,
|
||||
};
|
||||
use datatypes::arrow::array::{ArrayRef, Int64Array};
|
||||
use datatypes::arrow::array::{ArrayRef, Int64Array, StringArray, StructArray};
|
||||
use datatypes::arrow::datatypes::{Fields, Schema};
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use datatypes::extension::json::{JsonExtensionType, JsonMetadata};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use object_store::services::Memory;
|
||||
@@ -2325,8 +2329,123 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
use crate::sst::parquet::metadata::MetadataLoader;
|
||||
use crate::sst::parquet::read_columns::{ParquetReadColumn, ParquetReadColumns};
|
||||
use crate::test_util::sst_util::{sst_file_handle, sst_region_metadata};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_nested_projection_reads_partial_json2_physical_fields() -> WhateverResult<()> {
|
||||
// Write a full JSON2-like Arrow struct:
|
||||
// j: { a: { x: int, y: string }, b: string }.
|
||||
// The test later requests only j.a.x and verifies that the physical Parquet projection
|
||||
// does not materialize j.a.y or j.b.
|
||||
|
||||
let xy_fields = Fields::from(vec![
|
||||
Arc::new(Field::new("x", DataType::Int64, true)),
|
||||
Arc::new(Field::new("y", DataType::Utf8, true)),
|
||||
]);
|
||||
let a_field = Arc::new(Field::new("a", DataType::Struct(xy_fields.clone()), true));
|
||||
let b_field = Arc::new(Field::new("b", DataType::Utf8, true));
|
||||
let json_fields = Fields::from(vec![a_field, b_field]);
|
||||
let json_field = Field::new("j", DataType::Struct(json_fields.clone()), true)
|
||||
.with_extension_type(JsonExtensionType::new(Arc::new(JsonMetadata::default())));
|
||||
let schema = Arc::new(Schema::new(vec![json_field]));
|
||||
|
||||
let a_array = Arc::new(StructArray::new(
|
||||
xy_fields,
|
||||
vec![
|
||||
Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef,
|
||||
Arc::new(StringArray::from_iter_values(["x1", "x2", "x3"])) as ArrayRef,
|
||||
],
|
||||
None,
|
||||
)) as ArrayRef;
|
||||
let b_array = Arc::new(StringArray::from_iter_values(["b1", "b2", "b3"])) as ArrayRef;
|
||||
let j_array =
|
||||
Arc::new(StructArray::new(json_fields, vec![a_array, b_array], None)) as ArrayRef;
|
||||
let columns = vec![j_array];
|
||||
|
||||
let batch = RecordBatch::try_new(schema, columns).map_err(|e| e.to_string())?;
|
||||
|
||||
// Persist the complete nested schema to an in-memory Parquet file so the projection is
|
||||
// exercised through parquet-rs rather than a mock.
|
||||
|
||||
let object_store = ObjectStore::new(Memory::default())
|
||||
.map_err(|e| e.to_string())?
|
||||
.finish();
|
||||
let file_handle = sst_file_handle(0, 1);
|
||||
let file_path = file_handle.file_path("test_table", PathType::Bare);
|
||||
|
||||
let mut parquet_bytes = Vec::new();
|
||||
ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None)
|
||||
.and_then(|mut w| {
|
||||
w.write(&batch)?;
|
||||
Ok(w)
|
||||
})
|
||||
.and_then(|w| w.close())
|
||||
.map_err(|e| e.to_string())?;
|
||||
let file_size = parquet_bytes.len() as u64;
|
||||
object_store
|
||||
.write(&file_path, parquet_bytes)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
let mut cache_metrics = MetadataCacheMetrics::default();
|
||||
let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
|
||||
let parquet_meta = loader.load(&mut cache_metrics).await?;
|
||||
let parquet_schema = parquet_meta.file_metadata().schema_descr();
|
||||
assert_eq!(3, parquet_schema.num_columns());
|
||||
|
||||
// Ask Parquet to read only the deepest requested JSON2 path. This should select the single
|
||||
// leaf j.a.x and avoid both sibling leaves j.a.y and j.b.
|
||||
|
||||
let projection =
|
||||
ParquetReadColumns::from_deduped(vec![ParquetReadColumn::new(0).with_nested_paths(
|
||||
vec![vec!["j".to_string(), "a".to_string(), "x".to_string()]],
|
||||
)]);
|
||||
let projection_plan = build_projection_plan(&projection, parquet_schema);
|
||||
assert_eq!(vec![true], projection_plan.projected_root_presence);
|
||||
assert_eq!(
|
||||
projection_plan.mask,
|
||||
ProjectionMask::leaves(parquet_schema, vec![0])
|
||||
);
|
||||
|
||||
// Read through the low-level stream directly.
|
||||
|
||||
let arrow_metadata =
|
||||
ArrowReaderMetadata::try_new(Arc::new(parquet_meta), ArrowReaderOptions::new())
|
||||
.map_err(|e| e.to_string())?;
|
||||
let fetcher = SstParquetRangeFetcher::new(
|
||||
file_handle.file_id(),
|
||||
file_path.clone(),
|
||||
object_store,
|
||||
CacheStrategy::Disabled,
|
||||
0,
|
||||
None,
|
||||
);
|
||||
let mut stream = build_sst_parquet_record_batch_stream(
|
||||
arrow_metadata,
|
||||
0,
|
||||
None,
|
||||
projection_plan.mask,
|
||||
fetcher,
|
||||
file_path,
|
||||
)?;
|
||||
|
||||
let Some(batch) = stream.next().await.transpose()? else {
|
||||
unreachable!()
|
||||
};
|
||||
let expected = r#"
|
||||
+-------------+
|
||||
| j |
|
||||
+-------------+
|
||||
| {a: {x: 1}} |
|
||||
| {a: {x: 2}} |
|
||||
| {a: {x: 3}} |
|
||||
+-------------+
|
||||
"#;
|
||||
assert_eq!(batch.pretty_print(), expected.trim());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn test_minmax_predicate_key_not_built_when_index_result_cache_disabled() {
|
||||
#[derive(Eq, PartialEq, Hash)]
|
||||
|
||||
@@ -724,6 +724,7 @@ pub struct CreateRequestBuilder {
|
||||
table_dir: String,
|
||||
tag_num: usize,
|
||||
field_num: usize,
|
||||
field_datatype: ConcreteDataType,
|
||||
options: HashMap<String, String>,
|
||||
primary_key: Option<Vec<ColumnId>>,
|
||||
all_not_null: bool,
|
||||
@@ -740,6 +741,7 @@ impl Default for CreateRequestBuilder {
|
||||
table_dir: "test".to_string(),
|
||||
tag_num: 1,
|
||||
field_num: 1,
|
||||
field_datatype: ConcreteDataType::float64_datatype(),
|
||||
options: HashMap::new(),
|
||||
primary_key: None,
|
||||
all_not_null: false,
|
||||
@@ -775,6 +777,11 @@ impl CreateRequestBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
pub(crate) fn field_datatype(mut self, value: ConcreteDataType) -> Self {
|
||||
self.field_datatype = value;
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn primary_key(mut self, primary_key: Vec<ColumnId>) -> Self {
|
||||
self.primary_key = Some(primary_key);
|
||||
@@ -833,7 +840,7 @@ impl CreateRequestBuilder {
|
||||
column_metadatas.push(ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
format!("field_{i}"),
|
||||
ConcreteDataType::float64_datatype(),
|
||||
self.field_datatype.clone(),
|
||||
nullable,
|
||||
),
|
||||
semantic_type: SemanticType::Field,
|
||||
@@ -895,7 +902,7 @@ impl CreateRequestBuilder {
|
||||
column_metadatas.push(ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
format!("field_{i}"),
|
||||
ConcreteDataType::float64_datatype(),
|
||||
self.field_datatype.clone(),
|
||||
nullable,
|
||||
),
|
||||
semantic_type: SemanticType::Field,
|
||||
|
||||
Reference in New Issue
Block a user