Mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2025-12-23 14:40:01 +00:00)

Compare commits — 5 commits: async_deco ... test/scan-

| Author | SHA1 | Date |
|---|---|---|
| | 3d78113c22 | |
| | 9ee50dae6d | |
| | fa57df9dc2 | |
| | f935921831 | |
| | 7f7d431cd8 | |

Cargo.lock (generated) — 24 changed lines
@@ -4119,12 +4119,11 @@ dependencies = [

[[package]]
name = "flate2"
version = "1.1.0"
version = "1.0.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11faaf5a5236997af9848be0bef4db95824b1d534ebc64d0f0c6cf3e67bd38dc"
checksum = "a1b589b4dc103969ad3cf85c950899926ec64300a1a46d76c03a6072957036f0"
dependencies = [
 "crc32fast",
 "libz-rs-sys",
 "libz-sys",
 "miniz_oxide",
]
@@ -6279,15 +6278,6 @@ dependencies = [
 "vcpkg",
]

[[package]]
name = "libz-rs-sys"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "902bc563b5d65ad9bba616b490842ef0651066a1a1dc3ce1087113ffcb873c8d"
dependencies = [
 "zlib-rs",
]

[[package]]
name = "libz-sys"
version = "1.1.20"
@@ -6832,9 +6822,9 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"

[[package]]
name = "miniz_oxide"
version = "0.8.5"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e3e04debbb59698c15bacbb6d93584a8c0ca9cc3213cb423d31f760d8843ce5"
checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1"
dependencies = [
 "adler2",
]
@@ -13964,12 +13954,6 @@ dependencies = [
 "syn 2.0.96",
]

[[package]]
name = "zlib-rs"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b20717f0917c908dc63de2e44e97f1e6b126ca58d0e391cee86d504eb8fbd05"

[[package]]
name = "zstd"
version = "0.11.2+zstd.1.5.2"

@@ -126,7 +126,6 @@ deadpool-postgres = "0.12"
derive_builder = "0.12"
dotenv = "0.15"
etcd-client = "0.14"
flate2 = { version = "1.1.0", default-features = false, features = ["zlib-rs"] }
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"

@@ -132,15 +132,6 @@ pub fn options_from_skipping(skipping: &SkippingIndexOptions) -> Result<Option<C
    Ok((!options.options.is_empty()).then_some(options))
}

/// Tries to construct a `ColumnOptions` for inverted index.
pub fn options_from_inverted() -> ColumnOptions {
    let mut options = ColumnOptions::default();
    options
        .options
        .insert(INVERTED_INDEX_GRPC_KEY.to_string(), "true".to_string());
    options
}

/// Tries to construct a `FulltextAnalyzer` from the given analyzer.
pub fn as_fulltext_option(analyzer: Analyzer) -> FulltextAnalyzer {
    match analyzer {

@@ -26,7 +26,6 @@ use datafusion_common::cast::{as_boolean_array, as_null_array};
use datafusion_common::{internal_err, DataFusionError, ScalarValue};
use datatypes::arrow::array::{Array, BooleanArray, RecordBatch};
use datatypes::arrow::compute::filter_record_batch;
use datatypes::compute::or_kleene;
use datatypes::vectors::VectorRef;
use snafu::ResultExt;

@@ -48,8 +47,6 @@ pub struct SimpleFilterEvaluator {
    literal: Scalar<ArrayRef>,
    /// The operator.
    op: Operator,
    /// Only used when the operator is `Or`-chain.
    literal_list: Vec<Scalar<ArrayRef>>,
}

impl SimpleFilterEvaluator {
@@ -72,7 +69,6 @@ impl SimpleFilterEvaluator {
            column_name,
            literal: val.to_scalar().ok()?,
            op,
            literal_list: vec![],
        })
    }

@@ -87,35 +83,6 @@ impl SimpleFilterEvaluator {
            | Operator::LtEq
            | Operator::Gt
            | Operator::GtEq => {}
            Operator::Or => {
                let lhs = Self::try_new(&binary.left)?;
                let rhs = Self::try_new(&binary.right)?;
                if lhs.column_name != rhs.column_name
                    || !matches!(lhs.op, Operator::Eq | Operator::Or)
                    || !matches!(rhs.op, Operator::Eq | Operator::Or)
                {
                    return None;
                }
                let mut list = vec![];
                let placeholder_literal = lhs.literal.clone();
                // above check guarantees the op is either `Eq` or `Or`
                if matches!(lhs.op, Operator::Or) {
                    list.extend(lhs.literal_list);
                } else {
                    list.push(lhs.literal);
                }
                if matches!(rhs.op, Operator::Or) {
                    list.extend(rhs.literal_list);
                } else {
                    list.push(rhs.literal);
                }
                return Some(Self {
                    column_name: lhs.column_name,
                    literal: placeholder_literal,
                    op: Operator::Or,
                    literal_list: list,
                });
            }
            _ => return None,
        }

@@ -136,7 +103,6 @@ impl SimpleFilterEvaluator {
            column_name: lhs.name.clone(),
            literal,
            op,
            literal_list: vec![],
        })
    }
    _ => None,
@@ -152,19 +118,19 @@ impl SimpleFilterEvaluator {
        let input = input
            .to_scalar()
            .with_context(|_| ToArrowScalarSnafu { v: input.clone() })?;
        let result = self.evaluate_datum(&input, 1)?;
        let result = self.evaluate_datum(&input)?;
        Ok(result.value(0))
    }

    pub fn evaluate_array(&self, input: &ArrayRef) -> Result<BooleanBuffer> {
        self.evaluate_datum(input, input.len())
        self.evaluate_datum(input)
    }

    pub fn evaluate_vector(&self, input: &VectorRef) -> Result<BooleanBuffer> {
        self.evaluate_datum(&input.to_arrow_array(), input.len())
        self.evaluate_datum(&input.to_arrow_array())
    }

    fn evaluate_datum(&self, input: &impl Datum, input_len: usize) -> Result<BooleanBuffer> {
    fn evaluate_datum(&self, input: &impl Datum) -> Result<BooleanBuffer> {
        let result = match self.op {
            Operator::Eq => cmp::eq(input, &self.literal),
            Operator::NotEq => cmp::neq(input, &self.literal),
@@ -172,15 +138,6 @@ impl SimpleFilterEvaluator {
            Operator::LtEq => cmp::lt_eq(input, &self.literal),
            Operator::Gt => cmp::gt(input, &self.literal),
            Operator::GtEq => cmp::gt_eq(input, &self.literal),
            Operator::Or => {
                // OR operator stands for OR-chained EQs (or INLIST in other words)
                let mut result: BooleanArray = vec![false; input_len].into();
                for literal in &self.literal_list {
                    let rhs = cmp::eq(input, literal).context(ArrowComputeSnafu)?;
                    result = or_kleene(&result, &rhs).context(ArrowComputeSnafu)?;
                }
                Ok(result)
            }
            _ => {
                return UnsupportedOperationSnafu {
                    reason: format!("{:?}", self.op),
@@ -392,49 +349,4 @@ mod test {
        let expected = datatypes::arrow::array::Int32Array::from(vec![5, 6]);
        assert_eq!(first_column_values, &expected);
    }

    #[test]
    fn test_complex_filter_expression() {
        // Create an expression tree for: col = 'B' OR col = 'C' OR col = 'D'
        let col_eq_b = col("col").eq(lit("B"));
        let col_eq_c = col("col").eq(lit("C"));
        let col_eq_d = col("col").eq(lit("D"));

        // Build the OR chain
        let col_or_expr = col_eq_b.or(col_eq_c).or(col_eq_d);

        // Check that SimpleFilterEvaluator can handle OR chain
        let or_evaluator = SimpleFilterEvaluator::try_new(&col_or_expr).unwrap();
        assert_eq!(or_evaluator.column_name, "col");
        assert_eq!(or_evaluator.op, Operator::Or);
        assert_eq!(or_evaluator.literal_list.len(), 3);
        assert_eq!(format!("{:?}", or_evaluator.literal_list), "[Scalar(StringArray\n[\n \"B\",\n]), Scalar(StringArray\n[\n \"C\",\n]), Scalar(StringArray\n[\n \"D\",\n])]");

        // Create a schema and batch for testing
        let schema = Schema::new(vec![Field::new("col", DataType::Utf8, false)]);
        let df_schema = DFSchema::try_from(schema.clone()).unwrap();
        let props = ExecutionProps::new();
        let physical_expr = create_physical_expr(&col_or_expr, &df_schema, &props).unwrap();

        // Create test data
        let col_data = Arc::new(datatypes::arrow::array::StringArray::from(vec![
            "B", "C", "E", "B", "C", "D", "F",
        ]));
        let batch = RecordBatch::try_new(Arc::new(schema), vec![col_data]).unwrap();
        let expected = datatypes::arrow::array::StringArray::from(vec!["B", "C", "B", "C", "D"]);

        // Filter the batch
        let filtered_batch = batch_filter(&batch, &physical_expr).unwrap();

        // Expected: rows with col in ("B", "C", "D")
        // That would be rows 0, 1, 3, 4, 5
        assert_eq!(filtered_batch.num_rows(), 5);

        let col_filtered = filtered_batch
            .column(0)
            .as_any()
            .downcast_ref::<datatypes::arrow::array::StringArray>()
            .unwrap();
        assert_eq!(col_filtered, &expected);
    }
}
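The OR-chain handling above folds `col = 'B' OR col = 'C' OR ...` into a single literal list and evaluates it like an IN-list: each literal is compared element-wise against the input column and the boolean masks are OR-ed together. A minimal standalone sketch of that evaluation step, using the upstream arrow crate directly rather than GreptimeDB's wrappers (the `eval_in_list` helper and the sample data are illustrative, not part of the patch):

use std::sync::Arc;

use arrow::array::{ArrayRef, BooleanArray, Scalar, StringArray};
use arrow::compute::kernels::cmp::eq;
use arrow::compute::or_kleene;

// Evaluate `input IN (literals...)` by OR-ing per-literal equality masks.
fn eval_in_list(
    input: &ArrayRef,
    literals: &[Scalar<StringArray>],
) -> arrow::error::Result<BooleanArray> {
    // Start from an all-false mask with the same length as the input.
    let mut result: BooleanArray = vec![false; input.len()].into();
    for literal in literals {
        // `eq` broadcasts the scalar literal against every row of the input.
        let rhs = eq(input, literal)?;
        // Fold this literal's mask into the accumulated result.
        result = or_kleene(&result, &rhs)?;
    }
    Ok(result)
}

fn main() -> arrow::error::Result<()> {
    let input: ArrayRef = Arc::new(StringArray::from(vec!["B", "C", "E", "D"]));
    let literals = vec![
        Scalar::new(StringArray::from(vec!["B"])),
        Scalar::new(StringArray::from(vec!["D"])),
    ];
    let mask = eval_in_list(&input, &literals)?;
    assert_eq!(mask, BooleanArray::from(vec![true, false, false, true]));
    Ok(())
}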
@@ -339,6 +339,7 @@ impl MetadataRegion {
            series_row_selector: None,
            sequence: None,
            distribution: None,
            tag_only_distinct: false,
        }
    }

@@ -529,6 +530,7 @@ impl MetadataRegion {
            series_row_selector: None,
            sequence: None,
            distribution: None,
            tag_only_distinct: false,
        };
        let record_batch_stream = self
            .mito

@@ -81,6 +81,7 @@ async fn test_scan_projection() {
        series_row_selector: None,
        sequence: None,
        distribution: None,
        tag_only_distinct: false,
    };
    let stream = engine.scan_to_stream(region_id, request).await.unwrap();
    let batches = RecordBatches::try_collect(stream).await.unwrap();

@@ -25,7 +25,7 @@ use datatypes::vectors::MutableVector;

use crate::error::Result;
use crate::metrics::MERGE_FILTER_ROWS_TOTAL;
use crate::read::{Batch, BatchColumn, BatchReader};
use crate::read::{Batch, BatchColumn, BatchReader, BoxedBatchReader};

/// A reader that dedup sorted batches from a source based on the
/// dedup strategy.
@@ -581,6 +581,52 @@ impl<I: Iterator<Item = Result<Batch>>> Iterator for LastNonNullIter<I> {
    }
}

/// A reader that only returns tags for select distinct.
pub(crate) struct TagOnlyReader {
    source: BoxedBatchReader,
    /// Batch to return.
    to_return: Option<Batch>,
}

impl TagOnlyReader {
    /// Creates a new tags only reader.
    pub(crate) fn new(source: BoxedBatchReader) -> Self {
        Self {
            source,
            to_return: None,
        }
    }
}

#[async_trait]
impl BatchReader for TagOnlyReader {
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        while let Some(batch) = self.source.next_batch().await? {
            if batch.is_empty() {
                // Ensure that the batch is not empty before proceeding.
                continue;
            }

            if let Some(to_return) = self.to_return.take() {
                if to_return.primary_key() != batch.primary_key() {
                    self.to_return = Some(batch);
                    // A new key, store the batch and returns the previous one.
                    // Safety: The batch is not empty, so it has at least one row.
                    return Ok(Some(to_return.slice(0, 1)));
                } else {
                    // The same key, override the batch.
                    self.to_return = Some(batch);
                }
            } else {
                // No batch to return, store the current batch.
                self.to_return = Some(batch);
            }
        }

        Ok(self.to_return.take())
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
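`TagOnlyReader` above relies on its source yielding batches sorted by primary key: it holds the latest batch for the current key and emits a single row whenever the key changes, which is all a tags-only `SELECT DISTINCT` needs. The same idea stripped of the async reader machinery, as a plain-Rust sketch over hypothetical `(key, value)` rows (any one row per key is equivalent for DISTINCT purposes, even though the real reader keeps the latest batch and slices one row from it):

// Given key-sorted rows, keep only the first row of each distinct-key run.
fn first_of_each_key<K: PartialEq, V>(sorted: Vec<(K, V)>) -> Vec<(K, V)> {
    let mut out: Vec<(K, V)> = Vec::new();
    for (key, value) in sorted {
        // A new key starts a new run; later rows with the same key are duplicates.
        let is_new_key = out.last().map(|(k, _)| k != &key).unwrap_or(true);
        if is_new_key {
            out.push((key, value));
        }
    }
    out
}

fn main() {
    let rows = vec![("a", 1), ("a", 2), ("b", 3), ("b", 4), ("c", 5)];
    assert_eq!(first_of_each_key(rows), vec![("a", 1), ("b", 3), ("c", 5)]);
}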
@@ -357,6 +357,7 @@ impl ScanRegion {
            Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?,
            None => ProjectionMapper::all(&self.version.metadata)?,
        };

        // Get memtable ranges to scan.
        let memtables = memtables
            .into_iter()
@@ -385,7 +386,8 @@ impl ScanRegion {
            .with_filter_deleted(filter_deleted)
            .with_merge_mode(self.version.options.merge_mode())
            .with_series_row_selector(self.request.series_row_selector)
            .with_distribution(self.request.distribution);
            .with_distribution(self.request.distribution)
            .with_tag_only_distinct(self.request.tag_only_distinct);
        Ok(input)
    }

@@ -567,6 +569,8 @@ pub(crate) struct ScanInput {
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Hint for the required distribution of the scanner.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Hint for tag-only distinct scan.
    pub(crate) tag_only_distinct: bool,
}

impl ScanInput {
@@ -592,6 +596,7 @@ impl ScanInput {
            merge_mode: MergeMode::default(),
            series_row_selector: None,
            distribution: None,
            tag_only_distinct: false,
        }
    }

@@ -724,6 +729,13 @@ impl ScanInput {
        self
    }

    /// Sets the tag-only distinct scan hint.
    #[must_use]
    pub(crate) fn with_tag_only_distinct(mut self, tag_only_distinct: bool) -> Self {
        self.tag_only_distinct = tag_only_distinct;
        self
    }

    /// Scans sources in parallel.
    ///
    /// # Panics if the input doesn't allow parallel scan.

@@ -33,7 +33,7 @@ use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};
use tokio::sync::Semaphore;

use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow, TagOnlyReader};
use crate::read::last_row::LastRowReader;
use crate::read::merge::MergeReaderBuilder;
use crate::read::range::RangeBuilderList;
@@ -216,6 +216,7 @@ impl SeqScan {
        let compaction = self.compaction;
        let distinguish_range = self.properties.distinguish_partition_range;
        let part_metrics = self.new_partition_metrics(partition);
        let tag_only = self.stream_ctx.input.tag_only_distinct;

        let stream = try_stream! {
            part_metrics.on_first_poll();
@@ -241,6 +242,9 @@ impl SeqScan {
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?;
            if tag_only {
                reader = Box::new(TagOnlyReader::new(reader));
            }
            let cache = &stream_ctx.input.cache_strategy;
            let mut metrics = ScannerMetrics::default();
            let mut fetch_start = Instant::now();

@@ -24,7 +24,6 @@ pub mod join;
pub mod json_path;
pub mod letter;
pub mod regex;
pub mod simple_extract;
pub mod timestamp;
pub mod urlencoding;

@@ -52,7 +51,6 @@ use super::error::{
use super::field::{Field, Fields};
use super::PipelineMap;
use crate::etl::error::{Error, Result};
use crate::etl::processor::simple_extract::SimpleExtractProcessor;
use crate::etl_error::UnsupportedProcessorSnafu;

const FIELD_NAME: &str = "field";
@@ -65,7 +63,6 @@ const SEPARATOR_NAME: &str = "separator";
const TARGET_FIELDS_NAME: &str = "target_fields";
const JSON_PATH_NAME: &str = "json_path";
const JSON_PATH_RESULT_INDEX_NAME: &str = "result_index";
const SIMPLE_EXTRACT_KEY_NAME: &str = "key";

/// Processor trait defines the interface for all processors.
///
@@ -100,7 +97,6 @@ pub enum ProcessorKind {
    Epoch(EpochProcessor),
    Date(DateProcessor),
    JsonPath(JsonPathProcessor),
    SimpleJsonPath(SimpleExtractProcessor),
    Decolorize(DecolorizeProcessor),
    Digest(DigestProcessor),
}
@@ -178,9 +174,6 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorKind> {
            ProcessorKind::Decolorize(DecolorizeProcessor::try_from(value)?)
        }
        digest::PROCESSOR_DIGEST => ProcessorKind::Digest(DigestProcessor::try_from(value)?),
        simple_extract::PROCESSOR_SIMPLE_EXTRACT => {
            ProcessorKind::SimpleJsonPath(SimpleExtractProcessor::try_from(value)?)
        }
        _ => return UnsupportedProcessorSnafu { processor: str_key }.fail(),
    };

@@ -1,148 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use snafu::OptionExt as _;

use crate::etl::error::{Error, Result};
use crate::etl::field::Fields;
use crate::etl::processor::{
    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME,
    IGNORE_MISSING_NAME, SIMPLE_EXTRACT_KEY_NAME,
};
use crate::etl_error::{KeyMustBeStringSnafu, ProcessorMissingFieldSnafu};
use crate::{PipelineMap, Processor, Value};

pub(crate) const PROCESSOR_SIMPLE_EXTRACT: &str = "simple_extract";

#[derive(Debug, Default)]
pub struct SimpleExtractProcessor {
    fields: Fields,
    /// simple keys to extract nested JSON field
    /// key `a.b` is saved as ['a', 'b'], each key represents a level of the JSON tree
    key: Vec<String>,
    ignore_missing: bool,
}

impl TryFrom<&yaml_rust::yaml::Hash> for SimpleExtractProcessor {
    type Error = Error;

    fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
        let mut fields = Fields::default();
        let mut ignore_missing = false;
        let mut keys = vec![];

        for (k, v) in value.iter() {
            let key = k
                .as_str()
                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
            match key {
                FIELD_NAME => {
                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
                }
                FIELDS_NAME => {
                    fields = yaml_new_fields(v, FIELDS_NAME)?;
                }
                IGNORE_MISSING_NAME => {
                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
                }
                SIMPLE_EXTRACT_KEY_NAME => {
                    let key_str = yaml_string(v, SIMPLE_EXTRACT_KEY_NAME)?;
                    keys.extend(key_str.split(".").map(|s| s.to_string()));
                }
                _ => {}
            }
        }

        let processor = SimpleExtractProcessor {
            fields,
            key: keys,
            ignore_missing,
        };

        Ok(processor)
    }
}

impl SimpleExtractProcessor {
    fn process_field(&self, val: &Value) -> Result<Value> {
        let mut current = val;
        for key in self.key.iter() {
            let Value::Map(map) = current else {
                return Ok(Value::Null);
            };
            let Some(v) = map.get(key) else {
                return Ok(Value::Null);
            };
            current = v;
        }
        Ok(current.clone())
    }
}

impl Processor for SimpleExtractProcessor {
    fn kind(&self) -> &str {
        PROCESSOR_SIMPLE_EXTRACT
    }

    fn ignore_missing(&self) -> bool {
        self.ignore_missing
    }

    fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
        for field in self.fields.iter() {
            let index = field.input_field();
            match val.get(index) {
                Some(v) => {
                    let processed = self.process_field(v)?;
                    let output_index = field.target_or_input_field();
                    val.insert(output_index.to_string(), processed);
                }
                None => {
                    if !self.ignore_missing {
                        return ProcessorMissingFieldSnafu {
                            processor: self.kind(),
                            field: field.input_field(),
                        }
                        .fail();
                    }
                }
            }
        }
        Ok(())
    }
}

#[cfg(test)]
mod test {

    #[test]
    fn test_json_path() {
        use super::*;
        use crate::{Map, Value};

        let processor = SimpleExtractProcessor {
            key: vec!["hello".to_string()],
            ..Default::default()
        };

        let result = processor
            .process_field(&Value::Map(Map::one(
                "hello",
                Value::String("world".to_string()),
            )))
            .unwrap();

        assert_eq!(result, Value::String("world".to_string()));
    }
}
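The `simple_extract` processor shown above does one thing in `process_field`: it splits a dotted key such as "a.b" into segments and descends one map level per segment, returning `Value::Null` whenever a level is missing. That lookup in isolation, as a sketch on serde_json's `Value` rather than the pipeline's own `Value`/`Map` types (the `simple_extract` helper here is illustrative only):

use serde_json::{json, Value};

// Walk the JSON tree one object level per dot-separated key segment.
fn simple_extract<'a>(root: &'a Value, dotted_key: &str) -> Option<&'a Value> {
    let mut current = root;
    for segment in dotted_key.split('.') {
        // Each segment must resolve to a key of the current JSON object.
        current = current.as_object()?.get(segment)?;
    }
    Some(current)
}

fn main() {
    let doc = json!({"commit": {"commitAuthor": "test", "commitTime": "1573840000.000"}});
    assert_eq!(
        simple_extract(&doc, "commit.commitAuthor"),
        Some(&json!("test"))
    );
    assert_eq!(simple_extract(&doc, "commit.missing"), None);
}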
@@ -18,7 +18,6 @@ pub mod transformer;
use snafu::OptionExt;

use crate::etl::error::{Error, Result};
use crate::etl::processor::yaml_bool;
use crate::etl::transform::index::Index;
use crate::etl::value::Value;

@@ -26,7 +25,6 @@ const TRANSFORM_FIELD: &str = "field";
const TRANSFORM_FIELDS: &str = "fields";
const TRANSFORM_TYPE: &str = "type";
const TRANSFORM_INDEX: &str = "index";
const TRANSFORM_TAG: &str = "tag";
const TRANSFORM_DEFAULT: &str = "default";
const TRANSFORM_ON_FAILURE: &str = "on_failure";

@@ -146,8 +144,6 @@ pub struct Transform {

    pub index: Option<Index>,

    pub tag: bool,

    pub on_failure: Option<OnFailure>,
}

@@ -158,7 +154,6 @@ impl Default for Transform {
            type_: Value::Null,
            default: None,
            index: None,
            tag: false,
            on_failure: None,
        }
    }
@@ -190,7 +185,6 @@ impl TryFrom<&yaml_rust::yaml::Hash> for Transform {
        let mut type_ = Value::Null;
        let mut default = None;
        let mut index = None;
        let mut tag = false;
        let mut on_failure = None;

        for (k, v) in hash {
@@ -216,10 +210,6 @@ impl TryFrom<&yaml_rust::yaml::Hash> for Transform {
                    index = Some(index_str.try_into()?);
                }

                TRANSFORM_TAG => {
                    tag = yaml_bool(v, TRANSFORM_TAG)?;
                }

                TRANSFORM_DEFAULT => {
                    default = Some(Value::try_from(v)?);
                }
@@ -257,7 +247,6 @@ impl TryFrom<&yaml_rust::yaml::Hash> for Transform {
            default: final_default,
            index,
            on_failure,
            tag,
        };

        Ok(builder)

@@ -19,17 +19,14 @@ const INDEX_TIMEINDEX: &str = "time";
const INDEX_TAG: &str = "tag";
const INDEX_FULLTEXT: &str = "fulltext";
const INDEX_SKIPPING: &str = "skipping";
const INDEX_INVERTED: &str = "inverted";

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[allow(clippy::enum_variant_names)]
pub enum Index {
    Time,
    // deprecated, use Inverted instead
    Tag,
    Fulltext,
    Skipping,
    Inverted,
}

impl std::fmt::Display for Index {
@@ -39,7 +36,6 @@ impl std::fmt::Display for Index {
            Index::Tag => INDEX_TAG,
            Index::Fulltext => INDEX_FULLTEXT,
            Index::Skipping => INDEX_SKIPPING,
            Index::Inverted => INDEX_INVERTED,
        };

        write!(f, "{}", index)
@@ -63,7 +59,6 @@ impl TryFrom<&str> for Index {
            INDEX_TAG => Ok(Index::Tag),
            INDEX_FULLTEXT => Ok(Index::Fulltext),
            INDEX_SKIPPING => Ok(Index::Skipping),
            INDEX_INVERTED => Ok(Index::Inverted),
            _ => UnsupportedIndexTypeSnafu { value }.fail(),
        }
    }

@@ -96,7 +96,6 @@ impl GreptimeTransformer {
                default,
                index: Some(Index::Time),
                on_failure: Some(crate::etl::transform::OnFailure::Default),
                tag: false,
            };
            transforms.push(transform);
        }

@@ -13,7 +13,7 @@
// limitations under the License.

use api::v1::column_data_type_extension::TypeExt;
use api::v1::column_def::{options_from_fulltext, options_from_inverted, options_from_skipping};
use api::v1::column_def::{options_from_fulltext, options_from_skipping};
use api::v1::{ColumnDataTypeExtension, ColumnOptions, JsonTypeExtension};
use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
use greptime_proto::v1::value::ValueData;
@@ -95,16 +95,10 @@ pub(crate) fn coerce_columns(transform: &Transform) -> Result<Vec<ColumnSchema>>
}

fn coerce_semantic_type(transform: &Transform) -> SemanticType {
    if transform.tag {
        return SemanticType::Tag;
    }

    match transform.index {
        Some(Index::Tag) => SemanticType::Tag,
        Some(Index::Time) => SemanticType::Timestamp,
        Some(Index::Fulltext) | Some(Index::Skipping) | Some(Index::Inverted) | None => {
            SemanticType::Field
        }
        Some(Index::Fulltext) | Some(Index::Skipping) | None => SemanticType::Field,
    }
}

@@ -118,7 +112,6 @@ fn coerce_options(transform: &Transform) -> Result<Option<ColumnOptions>> {
        Some(Index::Skipping) => {
            options_from_skipping(&SkippingIndexOptions::default()).context(ColumnOptionsSnafu)
        }
        Some(Index::Inverted) => Ok(Some(options_from_inverted())),
        _ => Ok(None),
    }
}
@@ -485,7 +478,6 @@ mod tests {
        default: None,
        index: None,
        on_failure: None,
        tag: false,
    };

    // valid string
@@ -511,7 +503,6 @@ mod tests {
        default: None,
        index: None,
        on_failure: Some(OnFailure::Ignore),
        tag: false,
    };

    let val = Value::String("hello".to_string());
@@ -527,7 +518,6 @@ mod tests {
        default: None,
        index: None,
        on_failure: Some(OnFailure::Default),
        tag: false,
    };

    // with no explicit default value

@@ -1,69 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod common;

use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, SemanticType};
use lazy_static::lazy_static;

lazy_static! {
    static ref EXPECTED_SCHEMA: Vec<ColumnSchema> = vec![
        common::make_column_schema(
            "commit_author".to_string(),
            ColumnDataType::String,
            SemanticType::Field,
        ),
        common::make_column_schema(
            "greptime_timestamp".to_string(),
            ColumnDataType::TimestampNanosecond,
            SemanticType::Timestamp,
        ),
    ];
}

#[test]
fn test_gsub() {
    let input_value_str = r#"
[
  {
    "commit": {
      "commitTime": "1573840000.000",
      "commitAuthor": "test"
    }
  }
]
"#;

    let pipeline_yaml = r#"
---
processors:
  - simple_extract:
      field: commit, commit_author
      key: "commitAuthor"

transform:
  - field: commit_author
    type: string
"#;

    let output = common::parse_and_exec(input_value_str, pipeline_yaml);

    assert_eq!(output.schema, *EXPECTED_SCHEMA);

    assert_eq!(
        output.rows[0].values[0].value_data,
        Some(ValueData::StringValue("test".to_string()))
    );
}
@@ -96,7 +96,13 @@ impl Categorizer {
            LogicalPlan::Extension(extension) => {
                Self::check_extension_plan(extension.node.as_ref() as _)
            }
            LogicalPlan::Distinct(_) => Commutativity::Unimplemented,
            LogicalPlan::Distinct(_) => {
                if partition_cols.is_empty() {
                    return Commutativity::Commutative;
                }

                Commutativity::Unimplemented
            }
            LogicalPlan::Unnest(_) => Commutativity::Commutative,
            LogicalPlan::Statement(_) => Commutativity::Unsupported,
            LogicalPlan::Values(_) => Commutativity::Unsupported,

@@ -247,6 +247,10 @@ impl DummyTableProvider {
        self.scan_request.lock().unwrap().sequence = Some(sequence);
    }

    pub fn with_tag_only_distinct(&self, tag_only_distinct: bool) {
        self.scan_request.lock().unwrap().tag_only_distinct = tag_only_distinct;
    }

    /// Gets the scan request of the provider.
    #[cfg(test)]
    pub fn scan_request(&self) -> ScanRequest {

@@ -1,191 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_error::ext::BoxedError;
use common_query::logical_plan::SubstraitPlanDecoder;
use datafusion::catalog::CatalogProviderList;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
use datafusion_common::{DFSchema, DFSchemaRef, DataFusionError};
use datafusion_expr::{LogicalPlan, UserDefinedLogicalNodeCore};
use snafu::ResultExt;

use crate::error::{DataFusionSnafu, Error, QueryPlanSnafu};
use crate::query_engine::DefaultPlanDecoder;

#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct UnexpandedNode {
    pub inner: Vec<u8>,
    pub schema: DFSchemaRef,
}

impl UnexpandedNode {
    pub fn new_no_schema(inner: Vec<u8>) -> Self {
        Self {
            inner,
            schema: Arc::new(DFSchema::empty()),
        }
    }
}

impl PartialOrd for UnexpandedNode {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        self.inner.partial_cmp(&other.inner)
    }
}

impl UnexpandedNode {
    const NAME: &'static str = "Unexpanded";
}

impl UserDefinedLogicalNodeCore for UnexpandedNode {
    fn name(&self) -> &'static str {
        Self::NAME
    }

    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![]
    }

    fn schema(&self) -> &DFSchemaRef {
        &self.schema
    }

    fn with_exprs_and_inputs(
        &self,
        _: Vec<datafusion_expr::Expr>,
        _: Vec<LogicalPlan>,
    ) -> datafusion_common::Result<Self> {
        Ok(self.clone())
    }

    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", Self::NAME)
    }

    fn expressions(&self) -> Vec<datafusion_expr::Expr> {
        vec![]
    }
}

/// Rewrite decoded `LogicalPlan` so all `UnexpandedNode` are expanded
///
/// This is a hack to support decoded substrait plan using async functions
///
/// Corresponding encode method should put custom logical node's input plan into `UnexpandedNode` after encoding into bytes
pub struct UnexpandDecoder {
    pub default_decoder: DefaultPlanDecoder,
}

impl UnexpandDecoder {
    pub fn new(default_decoder: DefaultPlanDecoder) -> Self {
        Self { default_decoder }
    }
}

impl UnexpandDecoder {
    /// Decode substrait plan into `LogicalPlan` and recursively expand all unexpanded nodes
    ///
    /// supporting async functions so our custom logical plan's input can be decoded as well
    pub async fn decode(
        &self,
        message: bytes::Bytes,
        catalog_list: Arc<dyn CatalogProviderList>,
        optimize: bool,
    ) -> Result<LogicalPlan, Error> {
        let plan = self
            .default_decoder
            .decode(message, catalog_list.clone(), optimize)
            .await
            .map_err(BoxedError::new)
            .context(QueryPlanSnafu)?;
        self.expand(plan, catalog_list, optimize)
            .await
            .map_err(BoxedError::new)
            .context(QueryPlanSnafu)
    }

    /// Recursively expand all unexpanded nodes in the plan
    pub async fn expand(
        &self,
        plan: LogicalPlan,
        catalog_list: Arc<dyn CatalogProviderList>,
        optimize: bool,
    ) -> Result<LogicalPlan, Error> {
        let mut cur_unexpanded_node = None;
        let mut root_expanded_plan = plan.clone();
        loop {
            root_expanded_plan
                .apply(|p| {
                    if let LogicalPlan::Extension(node) = p {
                        if node.node.name() == UnexpandedNode::NAME {
                            let node = node.node.as_any().downcast_ref::<UnexpandedNode>().ok_or(
                                DataFusionError::Plan(
                                    "Failed to downcast to UnexpandedNode".to_string(),
                                ),
                            )?;
                            cur_unexpanded_node = Some(node.clone());
                            return Ok(TreeNodeRecursion::Stop);
                        }
                    }
                    Ok(TreeNodeRecursion::Continue)
                })
                .context(DataFusionSnafu)?;

            if let Some(unexpanded) = cur_unexpanded_node.take() {
                let decoded = self
                    .default_decoder
                    .decode(
                        unexpanded.inner.clone().into(),
                        catalog_list.clone(),
                        optimize,
                    )
                    .await
                    .map_err(BoxedError::new)
                    .context(QueryPlanSnafu)?;
                let mut decoded = Some(decoded);

                // replace it with decoded plan
                // since if unexpanded the first node we encountered is the same node
                root_expanded_plan = root_expanded_plan
                    .transform(|p| {
                        let Some(decoded) = decoded.take() else {
                            return Ok(Transformed::no(p));
                        };

                        if let LogicalPlan::Extension(node) = &p
                            && node.node.name() == UnexpandedNode::NAME
                        {
                            let _ = node.node.as_any().downcast_ref::<UnexpandedNode>().ok_or(
                                DataFusionError::Plan(
                                    "Failed to downcast to UnexpandedNode".to_string(),
                                ),
                            )?;
                            Ok(Transformed::yes(decoded))
                        } else {
                            Ok(Transformed::no(p))
                        }
                    })
                    .context(DataFusionSnafu)?
                    .data;
            } else {
                // all node are expanded
                break;
            }
        }

        Ok(root_expanded_plan)
    }
}
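`UnexpandDecoder::expand` above runs a fix-point loop: it searches the decoded plan for an `Unexpanded` extension node, decodes that node's byte payload with the default decoder, splices the decoded subtree in with `transform`, and repeats until no placeholders remain. The same idea expressed recursively on a toy tree (hypothetical `Node` enum and `decode` stub, not DataFusion types) is:

// Each Unexpanded node carries the encoded bytes of the subtree it stands in for.
#[derive(Debug, PartialEq)]
enum Node {
    Unexpanded(Vec<u8>),
    Expanded(String, Vec<Node>),
}

// Stand-in for the real (async, substrait) decoding step.
fn decode(bytes: &[u8]) -> Node {
    Node::Expanded(String::from_utf8_lossy(bytes).into_owned(), vec![])
}

// Replace every placeholder with its decoded subtree, recursing until none remain.
fn expand(node: Node) -> Node {
    match node {
        Node::Unexpanded(bytes) => expand(decode(&bytes)),
        Node::Expanded(name, children) => {
            Node::Expanded(name, children.into_iter().map(expand).collect())
        }
    }
}

fn main() {
    let plan = Node::Expanded(
        "Projection".to_string(),
        vec![Node::Unexpanded(b"TableScan".to_vec())],
    );
    assert_eq!(
        expand(plan),
        Node::Expanded(
            "Projection".to_string(),
            vec![Node::Expanded("TableScan".to_string(), vec![])]
        )
    );
}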
@@ -26,7 +26,6 @@ pub mod dist_plan;
pub mod dummy_catalog;
pub mod error;
pub mod executor;
pub mod expand;
pub mod log_query;
pub mod metrics;
mod optimizer;

@@ -21,7 +21,7 @@ use datafusion::datasource::DefaultTableSource;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::{Column, Result};
use datafusion_expr::expr::Sort;
use datafusion_expr::{utils, Expr, LogicalPlan};
use datafusion_expr::{utils, Aggregate, Expr, LogicalPlan};
use datafusion_optimizer::{OptimizerConfig, OptimizerRule};
use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};

@@ -92,6 +92,15 @@ impl ScanHintRule {
                );
            }

            // set distinct columns hint
            if !visitor.distinct_columns.is_empty() {
                Self::set_distinct_columns_hint(
                    adapter,
                    &visitor.distinct_columns,
                    &visitor.distinct_filter_columns,
                );
            }

            transformed = true;
        }
    }
@@ -185,6 +194,43 @@ impl ScanHintRule {
            adapter.with_time_series_selector_hint(TimeSeriesRowSelector::LastRow);
        }
    }

    fn set_distinct_columns_hint(
        adapter: &DummyTableProvider,
        distinct_columns: &HashSet<Column>,
        distinct_filter_columns: &HashSet<Column>,
    ) {
        let region_metadata = adapter.region_metadata();
        let mut should_set_distinct_hint = true;
        // check if all group_by columns are primary key
        for col in distinct_columns {
            let Some(column_metadata) = region_metadata.column_by_name(&col.name) else {
                should_set_distinct_hint = false;
                break;
            };
            if column_metadata.semantic_type != SemanticType::Tag {
                should_set_distinct_hint = false;
                break;
            }
        }
        // check if all filter columns are primary key columns or time index.
        for col in distinct_filter_columns {
            let Some(column_metadata) = region_metadata.column_by_name(&col.name) else {
                should_set_distinct_hint = false;
                break;
            };
            if column_metadata.semantic_type != SemanticType::Tag
                && column_metadata.semantic_type != SemanticType::Timestamp
            {
                should_set_distinct_hint = false;
                break;
            }
        }

        if should_set_distinct_hint {
            adapter.with_tag_only_distinct(true);
        }
    }
}

/// Traverse and fetch hints.
@@ -196,6 +242,10 @@ struct ScanHintVisitor {
    /// This field stores saved `group_by` columns when all aggregate functions are `last_value`
    /// and the `order_by` column which should be time index.
    ts_row_selector: Option<(HashSet<Column>, Column)>,
    /// Distinct columns for select distinct operation.
    distinct_columns: HashSet<Column>,
    /// Distinct filter column.
    distinct_filter_columns: HashSet<Column>,
}

impl TreeNodeVisitor<'_> for ScanHintVisitor {
@@ -263,23 +313,31 @@ impl TreeNodeVisitor<'_> for ScanHintVisitor {
                self.ts_row_selector = Some((group_by_cols, order_by_col));
            }
        }

        // Check distinct.
        if !is_all_last_value {
            self.collect_distinct_columns(aggregate);
        }
    }

    if self.ts_row_selector.is_some()
        && (matches!(node, LogicalPlan::Subquery(_)) || node.inputs().len() > 1)
    {
    if matches!(node, LogicalPlan::Subquery(_)) || node.inputs().len() > 1 {
        // clean previous time series selector hint when encounter subqueries or join
        self.ts_row_selector = None;
        self.distinct_columns.clear();
    }

    if let LogicalPlan::Filter(filter) = node
        && let Some(group_by_exprs) = &self.ts_row_selector
    {
        let mut filter_referenced_cols = HashSet::default();
        utils::expr_to_columns(&filter.predicate, &mut filter_referenced_cols)?;
        // ensure only group_by columns are used in filter
        if !filter_referenced_cols.is_subset(&group_by_exprs.0) {
            self.ts_row_selector = None;
    if let LogicalPlan::Filter(filter) = node {
        if let Some(group_by_exprs) = &self.ts_row_selector {
            let mut filter_referenced_cols = HashSet::default();
            utils::expr_to_columns(&filter.predicate, &mut filter_referenced_cols)?;
            // ensure only group_by columns are used in filter
            if !filter_referenced_cols.is_subset(&group_by_exprs.0) {
                self.ts_row_selector = None;
            }
        }

        if !self.distinct_columns.is_empty() {
            utils::expr_to_columns(&filter.predicate, &mut self.distinct_filter_columns)?;
        }
    }

@@ -289,7 +347,31 @@ impl TreeNodeVisitor<'_> for ScanHintVisitor {

impl ScanHintVisitor {
    fn need_rewrite(&self) -> bool {
        self.order_expr.is_some() || self.ts_row_selector.is_some()
        self.order_expr.is_some()
            || self.ts_row_selector.is_some()
            || !self.distinct_columns.is_empty()
    }

    /// Returns select distinct columns.
    fn collect_distinct_columns(&mut self, aggregate: &Aggregate) {
        if !aggregate.aggr_expr.is_empty() {
            return;
        }

        let mut is_all_distinct = true;
        // make sure all the exprs are DIRECT `col` and collect them
        let mut group_by_cols = HashSet::with_capacity(aggregate.group_expr.len());
        for expr in &aggregate.group_expr {
            if let Expr::Column(col) = expr {
                group_by_cols.insert(col.clone());
            } else {
                is_all_distinct = false;
                break;
            }
        }
        if is_all_distinct {
            self.distinct_columns = group_by_cols;
        }
    }
}
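`set_distinct_columns_hint` above only applies the tag-only-distinct hint when every DISTINCT column is a tag column and every column referenced by a filter is a tag or the time index; otherwise the query falls back to a normal scan. The eligibility check in isolation, as a standalone sketch with a hypothetical `SemanticType` and column lookup mirroring (but not reusing) the rule's types:

#[derive(PartialEq, Clone, Copy)]
enum SemanticType {
    Tag,
    Field,
    Timestamp,
}

// The hint is eligible only if all DISTINCT columns are tags and all
// filter columns are tags or the time index.
fn can_use_tag_only_distinct(
    distinct_cols: &[&str],
    filter_cols: &[&str],
    semantic_type_of: impl Fn(&str) -> Option<SemanticType>,
) -> bool {
    let distinct_ok = distinct_cols
        .iter()
        .copied()
        .all(|c| semantic_type_of(c) == Some(SemanticType::Tag));
    let filter_ok = filter_cols.iter().copied().all(|c| {
        matches!(
            semantic_type_of(c),
            Some(SemanticType::Tag) | Some(SemanticType::Timestamp)
        )
    });
    distinct_ok && filter_ok
}

fn main() {
    // Hypothetical region schema: host/region are tags, ts is the time index, cpu is a field.
    let lookup = |name: &str| match name {
        "host" | "region" => Some(SemanticType::Tag),
        "ts" => Some(SemanticType::Timestamp),
        "cpu" => Some(SemanticType::Field),
        _ => None,
    };
    assert!(can_use_tag_only_distinct(&["host"], &["region", "ts"], lookup));
    assert!(!can_use_tag_only_distinct(&["cpu"], &[], lookup));
}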
@@ -32,22 +32,22 @@ pub struct TraceSpan {
    pub service_name: Option<String>,
    pub trace_id: String,
    pub span_id: String,
    pub parent_span_id: Option<String>,
    pub parent_span_id: String,

    // the following are fields
    pub resource_attributes: Attributes,
    pub resource_attributes: Attributes, // TODO(yuanbohan): Map in the future
    pub scope_name: String,
    pub scope_version: String,
    pub scope_attributes: Attributes,
    pub scope_attributes: Attributes, // TODO(yuanbohan): Map in the future
    pub trace_state: String,
    pub span_name: String,
    pub span_kind: String,
    pub span_status_code: String,
    pub span_status_message: String,
    pub span_attributes: Attributes,
    pub span_events: SpanEvents, // TODO(yuanbohan): List in the future
    pub span_links: SpanLinks, // TODO(yuanbohan): List in the future
    pub start_in_nanosecond: u64, // this is also the Timestamp Index
    pub span_attributes: Attributes, // TODO(yuanbohan): Map in the future
    pub span_events: SpanEvents,     // TODO(yuanbohan): List in the future
    pub span_links: SpanLinks,       // TODO(yuanbohan): List in the future
    pub start_in_nanosecond: u64,    // this is also the Timestamp Index
    pub end_in_nanosecond: u64,
}

@@ -203,11 +203,7 @@ pub fn parse_span(
        service_name,
        trace_id: bytes_to_hex_string(&span.trace_id),
        span_id: bytes_to_hex_string(&span.span_id),
        parent_span_id: if span.parent_span_id.is_empty() {
            None
        } else {
            Some(bytes_to_hex_string(&span.parent_span_id))
        },
        parent_span_id: bytes_to_hex_string(&span.parent_span_id),

        resource_attributes: Attributes::from(resource_attrs),
        trace_state: span.trace_state,

@@ -91,10 +91,7 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()>
    let iter = vec![
        (TRACE_ID_COLUMN, span.trace_id),
        (SPAN_ID_COLUMN, span.span_id),
        (
            PARENT_SPAN_ID_COLUMN,
            span.parent_span_id.unwrap_or_default(),
        ),
        (PARENT_SPAN_ID_COLUMN, span.parent_span_id),
    ]
    .into_iter()
    .map(|(col, val)| (col.to_string(), val));

@@ -103,18 +103,8 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()>
    row_writer::write_tags(writer, tags.into_iter(), &mut row)?;

    // write fields
    if let Some(parent_span_id) = span.parent_span_id {
        row_writer::write_fields(
            writer,
            std::iter::once(make_string_column_data(
                PARENT_SPAN_ID_COLUMN,
                parent_span_id,
            )),
            &mut row,
        )?;
    }

    let fields = vec![
        make_string_column_data(PARENT_SPAN_ID_COLUMN, span.parent_span_id),
        make_string_column_data(SPAN_KIND_COLUMN, span.span_kind),
        make_string_column_data(SPAN_NAME_COLUMN, span.span_name),
        make_string_column_data("span_status_code", span.span_status_code),

@@ -58,4 +58,6 @@ pub struct ScanRequest {
    pub sequence: Option<SequenceNumber>,
    /// Optional hint for the distribution of time-series data.
    pub distribution: Option<TimeSeriesDistribution>,
    /// Optional hint for the tag-only distinct operation.
    pub tag_only_distinct: bool,
}

@@ -37,16 +37,16 @@ common-telemetry.workspace = true
common-test-util.workspace = true
common-time.workspace = true
common-wal.workspace = true
datanode.workspace = true
datanode = { workspace = true }
datatypes.workspace = true
dotenv.workspace = true
flate2.workspace = true
flate2 = "1.0"
flow.workspace = true
frontend = { workspace = true, features = ["testing"] }
futures.workspace = true
futures-util.workspace = true
hyper-util = { workspace = true, features = ["tokio"] }
log-query.workspace = true
log-query = { workspace = true }
loki-proto.workspace = true
meta-client.workspace = true
meta-srv = { workspace = true, features = ["mock"] }
@@ -96,5 +96,5 @@ prost.workspace = true
rand.workspace = true
session = { workspace = true, features = ["testing"] }
store-api.workspace = true
tokio-postgres.workspace = true
tokio-postgres = { workspace = true }
url = "2.3"

@@ -1265,18 +1265,15 @@ transform:
      - id1
      - id2
    type: int32
    index: inverted
  - fields:
      - logger
    type: string
  - field: type
    type: string
    index: skipping
    tag: true
  - field: log
    type: string
    index: fulltext
    tag: true
  - field: time
    type: time
    index: timestamp
@@ -1351,7 +1348,8 @@ transform:
    assert_eq!(res.status(), StatusCode::OK);

    // 3. check schema
    let expected_schema = "[[\"logs1\",\"CREATE TABLE IF NOT EXISTS \\\"logs1\\\" (\\n \\\"id1\\\" INT NULL INVERTED INDEX,\\n \\\"id2\\\" INT NULL INVERTED INDEX,\\n \\\"logger\\\" STRING NULL,\\n \\\"type\\\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\\n \\\"log\\\" STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', case_sensitive = 'false'),\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\"),\\n PRIMARY KEY (\\\"type\\\", \\\"log\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";

    let expected_schema = "[[\"logs1\",\"CREATE TABLE IF NOT EXISTS \\\"logs1\\\" (\\n \\\"id1\\\" INT NULL,\\n \\\"id2\\\" INT NULL,\\n \\\"logger\\\" STRING NULL,\\n \\\"type\\\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\\n \\\"log\\\" STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', case_sensitive = 'false'),\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
    validate_data(
        "pipeline_schema",
        &client,
@@ -2197,7 +2195,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
    assert_eq!(StatusCode::OK, res.status());

    // select traces data
    let expected = r#"[[1736480942444376000,1736480942444499000,123000,"c05d7a4ec8e1f231f02ed6e8da8655b4","9630f2916e2f7909","d24f921c75f68e23","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444376000,1736480942444499000,123000,"c05d7a4ec8e1f231f02ed6e8da8655b4","d24f921c75f68e23",null,"SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444589000,1736480942444712000,123000,"cc9e0991a2e63d274984bd44ee669203","8f847259b0f6e1ab","eba7be77e3558179","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444589000,1736480942444712000,123000,"cc9e0991a2e63d274984bd44ee669203","eba7be77e3558179",null,"SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]]]"#;
    let expected = r#"[[1736480942444376000,1736480942444499000,123000,"c05d7a4ec8e1f231f02ed6e8da8655b4","9630f2916e2f7909","d24f921c75f68e23","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444376000,1736480942444499000,123000,"c05d7a4ec8e1f231f02ed6e8da8655b4","d24f921c75f68e23","","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444589000,1736480942444712000,123000,"cc9e0991a2e63d274984bd44ee669203","8f847259b0f6e1ab","eba7be77e3558179","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444589000,1736480942444712000,123000,"cc9e0991a2e63d274984bd44ee669203","eba7be77e3558179","","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]]]"#;
    validate_data("otlp_traces", &client, "select * from mytable;", expected).await;

    let expected_ddl = r#"[["mytable","CREATE TABLE IF NOT EXISTS \"mytable\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL,\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"trace_id\", \"span_id\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '1',\n trace_id >= '1' AND trace_id < '2',\n trace_id >= '2' AND trace_id < '3',\n trace_id >= '3' AND trace_id < '4',\n trace_id >= '4' AND trace_id < '5',\n trace_id >= '5' AND trace_id < '6',\n trace_id >= '6' AND trace_id < '7',\n trace_id >= '7' AND trace_id < '8',\n trace_id >= '8' AND trace_id < '9',\n trace_id >= '9' AND trace_id < 'A',\n trace_id >= 'A' AND trace_id < 'B' OR trace_id >= 'a' AND trace_id < 'b',\n trace_id >= 'B' AND trace_id < 'C' OR trace_id >= 'b' AND trace_id < 'c',\n trace_id >= 'C' AND trace_id < 'D' OR trace_id >= 'c' AND trace_id < 'd',\n trace_id >= 'D' AND trace_id < 'E' OR trace_id >= 'd' AND trace_id < 'e',\n trace_id >= 'E' AND trace_id < 'F' OR trace_id >= 'e' AND trace_id < 'f',\n trace_id >= 'F' AND trace_id < 'a' OR trace_id >= 'f'\n)\nENGINE=mito\nWITH(\n append_mode = 'true'\n)"]]"#;

@@ -204,26 +204,3 @@ DROP TABLE integers;

Affected Rows: 0

CREATE TABLE characters(c STRING, t TIMESTAMP TIME INDEX);

Affected Rows: 0

INSERT INTO characters VALUES ('a', 1), ('b', 2), ('c', 3), (NULL, 4), ('a', 5), ('b', 6), ('c', 7), (NULL, 8);

Affected Rows: 8

SELECT * FROM characters WHERE c IN ('a', 'c') ORDER BY t;

+---+-------------------------+
| c | t                       |
+---+-------------------------+
| a | 1970-01-01T00:00:00.001 |
| c | 1970-01-01T00:00:00.003 |
| a | 1970-01-01T00:00:00.005 |
| c | 1970-01-01T00:00:00.007 |
+---+-------------------------+

DROP TABLE characters;

Affected Rows: 0

@@ -57,11 +57,3 @@ SELECT * FROM (SELECT i1.i AS a, i2.i AS b, row_number() OVER (ORDER BY i1.i, i2
SELECT * FROM (SELECT 0=1 AS cond FROM integers i1, integers i2 GROUP BY 1) a1 WHERE cond ORDER BY 1;

DROP TABLE integers;

CREATE TABLE characters(c STRING, t TIMESTAMP TIME INDEX);

INSERT INTO characters VALUES ('a', 1), ('b', 2), ('c', 3), (NULL, 4), ('a', 5), ('b', 6), ('c', 7), (NULL, 8);

SELECT * FROM characters WHERE c IN ('a', 'c') ORDER BY t;

DROP TABLE characters;

@@ -15,8 +15,8 @@ common-error.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
common-time.workspace = true
datatypes.workspace = true
flate2.workspace = true
datatypes = { workspace = true }
flate2 = "1.0"
hex = "0.4"
local-ip-address = "0.6"
mysql = { version = "25.0.1", default-features = false, features = ["minimal", "rustls-tls"] }
@@ -31,5 +31,5 @@ tar = "0.4"
tempfile.workspace = true
tinytemplate = "1.2"
tokio.workspace = true
tokio-postgres.workspace = true
tokio-postgres = { workspace = true }
tokio-stream.workspace = true