Compare commits

..

5 Commits

Author   SHA1        Message                           Date
evenyag  3d78113c22  feat: reduce rows returned        2025-03-14 00:35:19 +08:00
evenyag  9ee50dae6d  feat: push down tag distinct      2025-03-14 00:35:00 +08:00
evenyag  fa57df9dc2  feat: use tag only reader         2025-03-13 23:07:37 +08:00
evenyag  f935921831  feat: tags only reader            2025-03-13 23:01:39 +08:00
evenyag  7f7d431cd8  feat: tag only distinct hint wip  2025-03-13 16:09:54 +08:00
30 changed files with 206 additions and 654 deletions

Cargo.lock (generated), 24 changed lines
View File

@@ -4119,12 +4119,11 @@ dependencies = [
[[package]]
name = "flate2"
version = "1.1.0"
version = "1.0.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11faaf5a5236997af9848be0bef4db95824b1d534ebc64d0f0c6cf3e67bd38dc"
checksum = "a1b589b4dc103969ad3cf85c950899926ec64300a1a46d76c03a6072957036f0"
dependencies = [
"crc32fast",
"libz-rs-sys",
"libz-sys",
"miniz_oxide",
]
@@ -6279,15 +6278,6 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "libz-rs-sys"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "902bc563b5d65ad9bba616b490842ef0651066a1a1dc3ce1087113ffcb873c8d"
dependencies = [
"zlib-rs",
]
[[package]]
name = "libz-sys"
version = "1.1.20"
@@ -6832,9 +6822,9 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "miniz_oxide"
version = "0.8.5"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e3e04debbb59698c15bacbb6d93584a8c0ca9cc3213cb423d31f760d8843ce5"
checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1"
dependencies = [
"adler2",
]
@@ -13964,12 +13954,6 @@ dependencies = [
"syn 2.0.96",
]
[[package]]
name = "zlib-rs"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b20717f0917c908dc63de2e44e97f1e6b126ca58d0e391cee86d504eb8fbd05"
[[package]]
name = "zstd"
version = "0.11.2+zstd.1.5.2"

View File

@@ -126,7 +126,6 @@ deadpool-postgres = "0.12"
derive_builder = "0.12"
dotenv = "0.15"
etcd-client = "0.14"
flate2 = { version = "1.1.0", default-features = false, features = ["zlib-rs"] }
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"

View File

@@ -132,15 +132,6 @@ pub fn options_from_skipping(skipping: &SkippingIndexOptions) -> Result<Option<C
Ok((!options.options.is_empty()).then_some(options))
}
/// Tries to construct a `ColumnOptions` for inverted index.
pub fn options_from_inverted() -> ColumnOptions {
let mut options = ColumnOptions::default();
options
.options
.insert(INVERTED_INDEX_GRPC_KEY.to_string(), "true".to_string());
options
}
/// Tries to construct a `FulltextAnalyzer` from the given analyzer.
pub fn as_fulltext_option(analyzer: Analyzer) -> FulltextAnalyzer {
match analyzer {

View File

@@ -26,7 +26,6 @@ use datafusion_common::cast::{as_boolean_array, as_null_array};
use datafusion_common::{internal_err, DataFusionError, ScalarValue};
use datatypes::arrow::array::{Array, BooleanArray, RecordBatch};
use datatypes::arrow::compute::filter_record_batch;
use datatypes::compute::or_kleene;
use datatypes::vectors::VectorRef;
use snafu::ResultExt;
@@ -48,8 +47,6 @@ pub struct SimpleFilterEvaluator {
literal: Scalar<ArrayRef>,
/// The operator.
op: Operator,
/// Only used when the operator is `Or`-chain.
literal_list: Vec<Scalar<ArrayRef>>,
}
impl SimpleFilterEvaluator {
@@ -72,7 +69,6 @@ impl SimpleFilterEvaluator {
column_name,
literal: val.to_scalar().ok()?,
op,
literal_list: vec![],
})
}
@@ -87,35 +83,6 @@ impl SimpleFilterEvaluator {
| Operator::LtEq
| Operator::Gt
| Operator::GtEq => {}
Operator::Or => {
let lhs = Self::try_new(&binary.left)?;
let rhs = Self::try_new(&binary.right)?;
if lhs.column_name != rhs.column_name
|| !matches!(lhs.op, Operator::Eq | Operator::Or)
|| !matches!(rhs.op, Operator::Eq | Operator::Or)
{
return None;
}
let mut list = vec![];
let placeholder_literal = lhs.literal.clone();
// above check guarantees the op is either `Eq` or `Or`
if matches!(lhs.op, Operator::Or) {
list.extend(lhs.literal_list);
} else {
list.push(lhs.literal);
}
if matches!(rhs.op, Operator::Or) {
list.extend(rhs.literal_list);
} else {
list.push(rhs.literal);
}
return Some(Self {
column_name: lhs.column_name,
literal: placeholder_literal,
op: Operator::Or,
literal_list: list,
});
}
_ => return None,
}
@@ -136,7 +103,6 @@ impl SimpleFilterEvaluator {
column_name: lhs.name.clone(),
literal,
op,
literal_list: vec![],
})
}
_ => None,
@@ -152,19 +118,19 @@ impl SimpleFilterEvaluator {
let input = input
.to_scalar()
.with_context(|_| ToArrowScalarSnafu { v: input.clone() })?;
let result = self.evaluate_datum(&input, 1)?;
let result = self.evaluate_datum(&input)?;
Ok(result.value(0))
}
pub fn evaluate_array(&self, input: &ArrayRef) -> Result<BooleanBuffer> {
self.evaluate_datum(input, input.len())
self.evaluate_datum(input)
}
pub fn evaluate_vector(&self, input: &VectorRef) -> Result<BooleanBuffer> {
self.evaluate_datum(&input.to_arrow_array(), input.len())
self.evaluate_datum(&input.to_arrow_array())
}
fn evaluate_datum(&self, input: &impl Datum, input_len: usize) -> Result<BooleanBuffer> {
fn evaluate_datum(&self, input: &impl Datum) -> Result<BooleanBuffer> {
let result = match self.op {
Operator::Eq => cmp::eq(input, &self.literal),
Operator::NotEq => cmp::neq(input, &self.literal),
@@ -172,15 +138,6 @@ impl SimpleFilterEvaluator {
Operator::LtEq => cmp::lt_eq(input, &self.literal),
Operator::Gt => cmp::gt(input, &self.literal),
Operator::GtEq => cmp::gt_eq(input, &self.literal),
Operator::Or => {
// OR operator stands for OR-chained EQs (or INLIST in other words)
let mut result: BooleanArray = vec![false; input_len].into();
for literal in &self.literal_list {
let rhs = cmp::eq(input, literal).context(ArrowComputeSnafu)?;
result = or_kleene(&result, &rhs).context(ArrowComputeSnafu)?;
}
Ok(result)
}
_ => {
return UnsupportedOperationSnafu {
reason: format!("{:?}", self.op),
@@ -392,49 +349,4 @@ mod test {
let expected = datatypes::arrow::array::Int32Array::from(vec![5, 6]);
assert_eq!(first_column_values, &expected);
}
#[test]
fn test_complex_filter_expression() {
// Create an expression tree for: col = 'B' OR col = 'C' OR col = 'D'
let col_eq_b = col("col").eq(lit("B"));
let col_eq_c = col("col").eq(lit("C"));
let col_eq_d = col("col").eq(lit("D"));
// Build the OR chain
let col_or_expr = col_eq_b.or(col_eq_c).or(col_eq_d);
// Check that SimpleFilterEvaluator can handle OR chain
let or_evaluator = SimpleFilterEvaluator::try_new(&col_or_expr).unwrap();
assert_eq!(or_evaluator.column_name, "col");
assert_eq!(or_evaluator.op, Operator::Or);
assert_eq!(or_evaluator.literal_list.len(), 3);
assert_eq!(format!("{:?}", or_evaluator.literal_list), "[Scalar(StringArray\n[\n \"B\",\n]), Scalar(StringArray\n[\n \"C\",\n]), Scalar(StringArray\n[\n \"D\",\n])]");
// Create a schema and batch for testing
let schema = Schema::new(vec![Field::new("col", DataType::Utf8, false)]);
let df_schema = DFSchema::try_from(schema.clone()).unwrap();
let props = ExecutionProps::new();
let physical_expr = create_physical_expr(&col_or_expr, &df_schema, &props).unwrap();
// Create test data
let col_data = Arc::new(datatypes::arrow::array::StringArray::from(vec![
"B", "C", "E", "B", "C", "D", "F",
]));
let batch = RecordBatch::try_new(Arc::new(schema), vec![col_data]).unwrap();
let expected = datatypes::arrow::array::StringArray::from(vec!["B", "C", "B", "C", "D"]);
// Filter the batch
let filtered_batch = batch_filter(&batch, &physical_expr).unwrap();
// Expected: rows with col in ("B", "C", "D")
// That would be rows 0, 1, 3, 4, 5
assert_eq!(filtered_batch.num_rows(), 5);
let col_filtered = filtered_batch
.column(0)
.as_any()
.downcast_ref::<datatypes::arrow::array::StringArray>()
.unwrap();
assert_eq!(col_filtered, &expected);
}
}

View File

@@ -339,6 +339,7 @@ impl MetadataRegion {
series_row_selector: None,
sequence: None,
distribution: None,
tag_only_distinct: false,
}
}
@@ -529,6 +530,7 @@ impl MetadataRegion {
series_row_selector: None,
sequence: None,
distribution: None,
tag_only_distinct: false,
};
let record_batch_stream = self
.mito

View File

@@ -81,6 +81,7 @@ async fn test_scan_projection() {
series_row_selector: None,
sequence: None,
distribution: None,
tag_only_distinct: false,
};
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();

View File

@@ -25,7 +25,7 @@ use datatypes::vectors::MutableVector;
use crate::error::Result;
use crate::metrics::MERGE_FILTER_ROWS_TOTAL;
use crate::read::{Batch, BatchColumn, BatchReader};
use crate::read::{Batch, BatchColumn, BatchReader, BoxedBatchReader};
/// A reader that dedup sorted batches from a source based on the
/// dedup strategy.
@@ -581,6 +581,52 @@ impl<I: Iterator<Item = Result<Batch>>> Iterator for LastNonNullIter<I> {
}
}
/// A reader that only returns tags for select distinct.
pub(crate) struct TagOnlyReader {
source: BoxedBatchReader,
/// Batch to return.
to_return: Option<Batch>,
}
impl TagOnlyReader {
/// Creates a new tags only reader.
pub(crate) fn new(source: BoxedBatchReader) -> Self {
Self {
source,
to_return: None,
}
}
}
#[async_trait]
impl BatchReader for TagOnlyReader {
async fn next_batch(&mut self) -> Result<Option<Batch>> {
while let Some(batch) = self.source.next_batch().await? {
if batch.is_empty() {
// Ensure that the batch is not empty before proceeding.
continue;
}
if let Some(to_return) = self.to_return.take() {
if to_return.primary_key() != batch.primary_key() {
self.to_return = Some(batch);
// A new key: store the batch and return the previous one.
// Safety: The batch is not empty, so it has at least one row.
return Ok(Some(to_return.slice(0, 1)));
} else {
// The same key, override the batch.
self.to_return = Some(batch);
}
} else {
// No batch to return, store the current batch.
self.to_return = Some(batch);
}
}
Ok(self.to_return.take())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
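
For illustration, the key-change logic in `TagOnlyReader::next_batch` above can be sketched as a small standalone function. Everything below is made up for the example: a plain `Vec` stands in for the async `BoxedBatchReader`, and a key plus a row count stands in for `Batch`; only the control flow mirrors the reader.

```rust
/// Stand-in for `Batch`: a primary key plus a row count.
#[derive(Debug)]
struct FakeBatch {
    primary_key: String,
    rows: usize,
}

/// Mirrors `TagOnlyReader::next_batch` over a whole input: each run of
/// batches sharing a primary key is emitted as a single row, and the batch
/// held for the final key is returned as stored, as in the diff above.
fn collapse_by_key(source: Vec<FakeBatch>) -> Vec<FakeBatch> {
    let mut output = Vec::new();
    let mut held: Option<FakeBatch> = None;
    for batch in source {
        if batch.rows == 0 {
            // Empty batches are skipped.
            continue;
        }
        match held.take() {
            Some(prev) if prev.primary_key != batch.primary_key => {
                // New key: emit one row (the `slice(0, 1)` analog) for the previous key.
                output.push(FakeBatch {
                    primary_key: prev.primary_key,
                    rows: 1,
                });
                held = Some(batch);
            }
            // Same key (or first non-empty batch): keep the latest batch for this key.
            _ => held = Some(batch),
        }
    }
    if let Some(last) = held {
        output.push(last);
    }
    output
}

fn main() {
    let input = vec![
        FakeBatch { primary_key: "k1".into(), rows: 3 },
        FakeBatch { primary_key: "k1".into(), rows: 2 },
        FakeBatch { primary_key: "k2".into(), rows: 4 },
    ];
    // "k1" collapses to one row; "k2", the last key, is returned as held.
    println!("{:?}", collapse_by_key(input));
}
```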

View File

@@ -357,6 +357,7 @@ impl ScanRegion {
Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?,
None => ProjectionMapper::all(&self.version.metadata)?,
};
// Get memtable ranges to scan.
let memtables = memtables
.into_iter()
@@ -385,7 +386,8 @@ impl ScanRegion {
.with_filter_deleted(filter_deleted)
.with_merge_mode(self.version.options.merge_mode())
.with_series_row_selector(self.request.series_row_selector)
.with_distribution(self.request.distribution);
.with_distribution(self.request.distribution)
.with_tag_only_distinct(self.request.tag_only_distinct);
Ok(input)
}
@@ -567,6 +569,8 @@ pub(crate) struct ScanInput {
pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
/// Hint for the required distribution of the scanner.
pub(crate) distribution: Option<TimeSeriesDistribution>,
/// Hint for tag-only distinct scan.
pub(crate) tag_only_distinct: bool,
}
impl ScanInput {
@@ -592,6 +596,7 @@ impl ScanInput {
merge_mode: MergeMode::default(),
series_row_selector: None,
distribution: None,
tag_only_distinct: false,
}
}
@@ -724,6 +729,13 @@ impl ScanInput {
self
}
/// Sets the tag-only distinct scan hint.
#[must_use]
pub(crate) fn with_tag_only_distinct(mut self, tag_only_distinct: bool) -> Self {
self.tag_only_distinct = tag_only_distinct;
self
}
/// Scans sources in parallel.
///
/// # Panics if the input doesn't allow parallel scan.

View File

@@ -33,7 +33,7 @@ use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};
use tokio::sync::Semaphore;
use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow, TagOnlyReader};
use crate::read::last_row::LastRowReader;
use crate::read::merge::MergeReaderBuilder;
use crate::read::range::RangeBuilderList;
@@ -216,6 +216,7 @@ impl SeqScan {
let compaction = self.compaction;
let distinguish_range = self.properties.distinguish_partition_range;
let part_metrics = self.new_partition_metrics(partition);
let tag_only = self.stream_ctx.input.tag_only_distinct;
let stream = try_stream! {
part_metrics.on_first_poll();
@@ -241,6 +242,9 @@ impl SeqScan {
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
if tag_only {
reader = Box::new(TagOnlyReader::new(reader));
}
let cache = &stream_ctx.input.cache_strategy;
let mut metrics = ScannerMetrics::default();
let mut fetch_start = Instant::now();

View File

@@ -24,7 +24,6 @@ pub mod join;
pub mod json_path;
pub mod letter;
pub mod regex;
pub mod simple_extract;
pub mod timestamp;
pub mod urlencoding;
@@ -52,7 +51,6 @@ use super::error::{
use super::field::{Field, Fields};
use super::PipelineMap;
use crate::etl::error::{Error, Result};
use crate::etl::processor::simple_extract::SimpleExtractProcessor;
use crate::etl_error::UnsupportedProcessorSnafu;
const FIELD_NAME: &str = "field";
@@ -65,7 +63,6 @@ const SEPARATOR_NAME: &str = "separator";
const TARGET_FIELDS_NAME: &str = "target_fields";
const JSON_PATH_NAME: &str = "json_path";
const JSON_PATH_RESULT_INDEX_NAME: &str = "result_index";
const SIMPLE_EXTRACT_KEY_NAME: &str = "key";
/// Processor trait defines the interface for all processors.
///
@@ -100,7 +97,6 @@ pub enum ProcessorKind {
Epoch(EpochProcessor),
Date(DateProcessor),
JsonPath(JsonPathProcessor),
SimpleJsonPath(SimpleExtractProcessor),
Decolorize(DecolorizeProcessor),
Digest(DigestProcessor),
}
@@ -178,9 +174,6 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorKind> {
ProcessorKind::Decolorize(DecolorizeProcessor::try_from(value)?)
}
digest::PROCESSOR_DIGEST => ProcessorKind::Digest(DigestProcessor::try_from(value)?),
simple_extract::PROCESSOR_SIMPLE_EXTRACT => {
ProcessorKind::SimpleJsonPath(SimpleExtractProcessor::try_from(value)?)
}
_ => return UnsupportedProcessorSnafu { processor: str_key }.fail(),
};

View File

@@ -1,148 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use snafu::OptionExt as _;
use crate::etl::error::{Error, Result};
use crate::etl::field::Fields;
use crate::etl::processor::{
yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME,
IGNORE_MISSING_NAME, SIMPLE_EXTRACT_KEY_NAME,
};
use crate::etl_error::{KeyMustBeStringSnafu, ProcessorMissingFieldSnafu};
use crate::{PipelineMap, Processor, Value};
pub(crate) const PROCESSOR_SIMPLE_EXTRACT: &str = "simple_extract";
#[derive(Debug, Default)]
pub struct SimpleExtractProcessor {
fields: Fields,
/// simple keys to extract nested JSON field
/// key `a.b` is saved as ['a', 'b'], each key represents a level of the JSON tree
key: Vec<String>,
ignore_missing: bool,
}
impl TryFrom<&yaml_rust::yaml::Hash> for SimpleExtractProcessor {
type Error = Error;
fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
let mut fields = Fields::default();
let mut ignore_missing = false;
let mut keys = vec![];
for (k, v) in value.iter() {
let key = k
.as_str()
.with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
match key {
FIELD_NAME => {
fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
}
FIELDS_NAME => {
fields = yaml_new_fields(v, FIELDS_NAME)?;
}
IGNORE_MISSING_NAME => {
ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
}
SIMPLE_EXTRACT_KEY_NAME => {
let key_str = yaml_string(v, SIMPLE_EXTRACT_KEY_NAME)?;
keys.extend(key_str.split(".").map(|s| s.to_string()));
}
_ => {}
}
}
let processor = SimpleExtractProcessor {
fields,
key: keys,
ignore_missing,
};
Ok(processor)
}
}
impl SimpleExtractProcessor {
fn process_field(&self, val: &Value) -> Result<Value> {
let mut current = val;
for key in self.key.iter() {
let Value::Map(map) = current else {
return Ok(Value::Null);
};
let Some(v) = map.get(key) else {
return Ok(Value::Null);
};
current = v;
}
Ok(current.clone())
}
}
impl Processor for SimpleExtractProcessor {
fn kind(&self) -> &str {
PROCESSOR_SIMPLE_EXTRACT
}
fn ignore_missing(&self) -> bool {
self.ignore_missing
}
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
for field in self.fields.iter() {
let index = field.input_field();
match val.get(index) {
Some(v) => {
let processed = self.process_field(v)?;
let output_index = field.target_or_input_field();
val.insert(output_index.to_string(), processed);
}
None => {
if !self.ignore_missing {
return ProcessorMissingFieldSnafu {
processor: self.kind(),
field: field.input_field(),
}
.fail();
}
}
}
}
Ok(())
}
}
#[cfg(test)]
mod test {
#[test]
fn test_json_path() {
use super::*;
use crate::{Map, Value};
let processor = SimpleExtractProcessor {
key: vec!["hello".to_string()],
..Default::default()
};
let result = processor
.process_field(&Value::Map(Map::one(
"hello",
Value::String("world".to_string()),
)))
.unwrap();
assert_eq!(result, Value::String("world".to_string()));
}
}

View File

@@ -18,7 +18,6 @@ pub mod transformer;
use snafu::OptionExt;
use crate::etl::error::{Error, Result};
use crate::etl::processor::yaml_bool;
use crate::etl::transform::index::Index;
use crate::etl::value::Value;
@@ -26,7 +25,6 @@ const TRANSFORM_FIELD: &str = "field";
const TRANSFORM_FIELDS: &str = "fields";
const TRANSFORM_TYPE: &str = "type";
const TRANSFORM_INDEX: &str = "index";
const TRANSFORM_TAG: &str = "tag";
const TRANSFORM_DEFAULT: &str = "default";
const TRANSFORM_ON_FAILURE: &str = "on_failure";
@@ -146,8 +144,6 @@ pub struct Transform {
pub index: Option<Index>,
pub tag: bool,
pub on_failure: Option<OnFailure>,
}
@@ -158,7 +154,6 @@ impl Default for Transform {
type_: Value::Null,
default: None,
index: None,
tag: false,
on_failure: None,
}
}
@@ -190,7 +185,6 @@ impl TryFrom<&yaml_rust::yaml::Hash> for Transform {
let mut type_ = Value::Null;
let mut default = None;
let mut index = None;
let mut tag = false;
let mut on_failure = None;
for (k, v) in hash {
@@ -216,10 +210,6 @@ impl TryFrom<&yaml_rust::yaml::Hash> for Transform {
index = Some(index_str.try_into()?);
}
TRANSFORM_TAG => {
tag = yaml_bool(v, TRANSFORM_TAG)?;
}
TRANSFORM_DEFAULT => {
default = Some(Value::try_from(v)?);
}
@@ -257,7 +247,6 @@ impl TryFrom<&yaml_rust::yaml::Hash> for Transform {
default: final_default,
index,
on_failure,
tag,
};
Ok(builder)

View File

@@ -19,17 +19,14 @@ const INDEX_TIMEINDEX: &str = "time";
const INDEX_TAG: &str = "tag";
const INDEX_FULLTEXT: &str = "fulltext";
const INDEX_SKIPPING: &str = "skipping";
const INDEX_INVERTED: &str = "inverted";
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[allow(clippy::enum_variant_names)]
pub enum Index {
Time,
// deprecated, use Inverted instead
Tag,
Fulltext,
Skipping,
Inverted,
}
impl std::fmt::Display for Index {
@@ -39,7 +36,6 @@ impl std::fmt::Display for Index {
Index::Tag => INDEX_TAG,
Index::Fulltext => INDEX_FULLTEXT,
Index::Skipping => INDEX_SKIPPING,
Index::Inverted => INDEX_INVERTED,
};
write!(f, "{}", index)
@@ -63,7 +59,6 @@ impl TryFrom<&str> for Index {
INDEX_TAG => Ok(Index::Tag),
INDEX_FULLTEXT => Ok(Index::Fulltext),
INDEX_SKIPPING => Ok(Index::Skipping),
INDEX_INVERTED => Ok(Index::Inverted),
_ => UnsupportedIndexTypeSnafu { value }.fail(),
}
}

View File

@@ -96,7 +96,6 @@ impl GreptimeTransformer {
default,
index: Some(Index::Time),
on_failure: Some(crate::etl::transform::OnFailure::Default),
tag: false,
};
transforms.push(transform);
}

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use api::v1::column_data_type_extension::TypeExt;
use api::v1::column_def::{options_from_fulltext, options_from_inverted, options_from_skipping};
use api::v1::column_def::{options_from_fulltext, options_from_skipping};
use api::v1::{ColumnDataTypeExtension, ColumnOptions, JsonTypeExtension};
use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
use greptime_proto::v1::value::ValueData;
@@ -95,16 +95,10 @@ pub(crate) fn coerce_columns(transform: &Transform) -> Result<Vec<ColumnSchema>>
}
fn coerce_semantic_type(transform: &Transform) -> SemanticType {
if transform.tag {
return SemanticType::Tag;
}
match transform.index {
Some(Index::Tag) => SemanticType::Tag,
Some(Index::Time) => SemanticType::Timestamp,
Some(Index::Fulltext) | Some(Index::Skipping) | Some(Index::Inverted) | None => {
SemanticType::Field
}
Some(Index::Fulltext) | Some(Index::Skipping) | None => SemanticType::Field,
}
}
@@ -118,7 +112,6 @@ fn coerce_options(transform: &Transform) -> Result<Option<ColumnOptions>> {
Some(Index::Skipping) => {
options_from_skipping(&SkippingIndexOptions::default()).context(ColumnOptionsSnafu)
}
Some(Index::Inverted) => Ok(Some(options_from_inverted())),
_ => Ok(None),
}
}
@@ -485,7 +478,6 @@ mod tests {
default: None,
index: None,
on_failure: None,
tag: false,
};
// valid string
@@ -511,7 +503,6 @@ mod tests {
default: None,
index: None,
on_failure: Some(OnFailure::Ignore),
tag: false,
};
let val = Value::String("hello".to_string());
@@ -527,7 +518,6 @@ mod tests {
default: None,
index: None,
on_failure: Some(OnFailure::Default),
tag: false,
};
// with no explicit default value

View File

@@ -1,69 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod common;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, SemanticType};
use lazy_static::lazy_static;
lazy_static! {
static ref EXPECTED_SCHEMA: Vec<ColumnSchema> = vec![
common::make_column_schema(
"commit_author".to_string(),
ColumnDataType::String,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
}
#[test]
fn test_gsub() {
let input_value_str = r#"
[
{
"commit": {
"commitTime": "1573840000.000",
"commitAuthor": "test"
}
}
]
"#;
let pipeline_yaml = r#"
---
processors:
- simple_extract:
field: commit, commit_author
key: "commitAuthor"
transform:
- field: commit_author
type: string
"#;
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(
output.rows[0].values[0].value_data,
Some(ValueData::StringValue("test".to_string()))
);
}

View File

@@ -96,7 +96,13 @@ impl Categorizer {
LogicalPlan::Extension(extension) => {
Self::check_extension_plan(extension.node.as_ref() as _)
}
LogicalPlan::Distinct(_) => Commutativity::Unimplemented,
LogicalPlan::Distinct(_) => {
if partition_cols.is_empty() {
return Commutativity::Commutative;
}
Commutativity::Unimplemented
}
LogicalPlan::Unnest(_) => Commutativity::Commutative,
LogicalPlan::Statement(_) => Commutativity::Unsupported,
LogicalPlan::Values(_) => Commutativity::Unsupported,

View File

@@ -247,6 +247,10 @@ impl DummyTableProvider {
self.scan_request.lock().unwrap().sequence = Some(sequence);
}
pub fn with_tag_only_distinct(&self, tag_only_distinct: bool) {
self.scan_request.lock().unwrap().tag_only_distinct = tag_only_distinct;
}
/// Gets the scan request of the provider.
#[cfg(test)]
pub fn scan_request(&self) -> ScanRequest {

View File

@@ -1,191 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_error::ext::BoxedError;
use common_query::logical_plan::SubstraitPlanDecoder;
use datafusion::catalog::CatalogProviderList;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
use datafusion_common::{DFSchema, DFSchemaRef, DataFusionError};
use datafusion_expr::{LogicalPlan, UserDefinedLogicalNodeCore};
use snafu::ResultExt;
use crate::error::{DataFusionSnafu, Error, QueryPlanSnafu};
use crate::query_engine::DefaultPlanDecoder;
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct UnexpandedNode {
pub inner: Vec<u8>,
pub schema: DFSchemaRef,
}
impl UnexpandedNode {
pub fn new_no_schema(inner: Vec<u8>) -> Self {
Self {
inner,
schema: Arc::new(DFSchema::empty()),
}
}
}
impl PartialOrd for UnexpandedNode {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
self.inner.partial_cmp(&other.inner)
}
}
impl UnexpandedNode {
const NAME: &'static str = "Unexpanded";
}
impl UserDefinedLogicalNodeCore for UnexpandedNode {
fn name(&self) -> &'static str {
Self::NAME
}
fn inputs(&self) -> Vec<&LogicalPlan> {
vec![]
}
fn schema(&self) -> &DFSchemaRef {
&self.schema
}
fn with_exprs_and_inputs(
&self,
_: Vec<datafusion_expr::Expr>,
_: Vec<LogicalPlan>,
) -> datafusion_common::Result<Self> {
Ok(self.clone())
}
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", Self::NAME)
}
fn expressions(&self) -> Vec<datafusion_expr::Expr> {
vec![]
}
}
/// Rewrite decoded `LogicalPlan` so all `UnexpandedNode` are expanded
///
/// This is a hack to support decoded substrait plan using async functions
///
/// Corresponding encode method should put custom logical node's input plan into `UnexpandedNode` after encoding into bytes
pub struct UnexpandDecoder {
pub default_decoder: DefaultPlanDecoder,
}
impl UnexpandDecoder {
pub fn new(default_decoder: DefaultPlanDecoder) -> Self {
Self { default_decoder }
}
}
impl UnexpandDecoder {
/// Decode substrait plan into `LogicalPlan` and recursively expand all unexpanded nodes
///
/// supporting async functions so our custom logical plan's input can be decoded as well
pub async fn decode(
&self,
message: bytes::Bytes,
catalog_list: Arc<dyn CatalogProviderList>,
optimize: bool,
) -> Result<LogicalPlan, Error> {
let plan = self
.default_decoder
.decode(message, catalog_list.clone(), optimize)
.await
.map_err(BoxedError::new)
.context(QueryPlanSnafu)?;
self.expand(plan, catalog_list, optimize)
.await
.map_err(BoxedError::new)
.context(QueryPlanSnafu)
}
/// Recursively expand all unexpanded nodes in the plan
pub async fn expand(
&self,
plan: LogicalPlan,
catalog_list: Arc<dyn CatalogProviderList>,
optimize: bool,
) -> Result<LogicalPlan, Error> {
let mut cur_unexpanded_node = None;
let mut root_expanded_plan = plan.clone();
loop {
root_expanded_plan
.apply(|p| {
if let LogicalPlan::Extension(node) = p {
if node.node.name() == UnexpandedNode::NAME {
let node = node.node.as_any().downcast_ref::<UnexpandedNode>().ok_or(
DataFusionError::Plan(
"Failed to downcast to UnexpandedNode".to_string(),
),
)?;
cur_unexpanded_node = Some(node.clone());
return Ok(TreeNodeRecursion::Stop);
}
}
Ok(TreeNodeRecursion::Continue)
})
.context(DataFusionSnafu)?;
if let Some(unexpanded) = cur_unexpanded_node.take() {
let decoded = self
.default_decoder
.decode(
unexpanded.inner.clone().into(),
catalog_list.clone(),
optimize,
)
.await
.map_err(BoxedError::new)
.context(QueryPlanSnafu)?;
let mut decoded = Some(decoded);
// replace it with decoded plan
// since if unexpanded the first node we encountered is the same node
root_expanded_plan = root_expanded_plan
.transform(|p| {
let Some(decoded) = decoded.take() else {
return Ok(Transformed::no(p));
};
if let LogicalPlan::Extension(node) = &p
&& node.node.name() == UnexpandedNode::NAME
{
let _ = node.node.as_any().downcast_ref::<UnexpandedNode>().ok_or(
DataFusionError::Plan(
"Failed to downcast to UnexpandedNode".to_string(),
),
)?;
Ok(Transformed::yes(decoded))
} else {
Ok(Transformed::no(p))
}
})
.context(DataFusionSnafu)?
.data;
} else {
// all node are expanded
break;
}
}
Ok(root_expanded_plan)
}
}

View File

@@ -26,7 +26,6 @@ pub mod dist_plan;
pub mod dummy_catalog;
pub mod error;
pub mod executor;
pub mod expand;
pub mod log_query;
pub mod metrics;
mod optimizer;

View File

@@ -21,7 +21,7 @@ use datafusion::datasource::DefaultTableSource;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::{Column, Result};
use datafusion_expr::expr::Sort;
use datafusion_expr::{utils, Expr, LogicalPlan};
use datafusion_expr::{utils, Aggregate, Expr, LogicalPlan};
use datafusion_optimizer::{OptimizerConfig, OptimizerRule};
use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};
@@ -92,6 +92,15 @@ impl ScanHintRule {
);
}
// set distinct columns hint
if !visitor.distinct_columns.is_empty() {
Self::set_distinct_columns_hint(
adapter,
&visitor.distinct_columns,
&visitor.distinct_filter_columns,
);
}
transformed = true;
}
}
@@ -185,6 +194,43 @@ impl ScanHintRule {
adapter.with_time_series_selector_hint(TimeSeriesRowSelector::LastRow);
}
}
fn set_distinct_columns_hint(
adapter: &DummyTableProvider,
distinct_columns: &HashSet<Column>,
distinct_filter_columns: &HashSet<Column>,
) {
let region_metadata = adapter.region_metadata();
let mut should_set_distinct_hint = true;
// check if all group_by columns are primary key (tag) columns
for col in distinct_columns {
let Some(column_metadata) = region_metadata.column_by_name(&col.name) else {
should_set_distinct_hint = false;
break;
};
if column_metadata.semantic_type != SemanticType::Tag {
should_set_distinct_hint = false;
break;
}
}
// check if all filter columns are primary key columns or time index.
for col in distinct_filter_columns {
let Some(column_metadata) = region_metadata.column_by_name(&col.name) else {
should_set_distinct_hint = false;
break;
};
if column_metadata.semantic_type != SemanticType::Tag
&& column_metadata.semantic_type != SemanticType::Timestamp
{
should_set_distinct_hint = false;
break;
}
}
if should_set_distinct_hint {
adapter.with_tag_only_distinct(true);
}
}
}
/// Traverse and fetch hints.
@@ -196,6 +242,10 @@ struct ScanHintVisitor {
/// This field stores saved `group_by` columns when all aggregate functions are `last_value`
/// and the `order_by` column which should be time index.
ts_row_selector: Option<(HashSet<Column>, Column)>,
/// Distinct columns for select distinct operation.
distinct_columns: HashSet<Column>,
/// Columns referenced by filters in the distinct query.
distinct_filter_columns: HashSet<Column>,
}
impl TreeNodeVisitor<'_> for ScanHintVisitor {
@@ -263,23 +313,31 @@ impl TreeNodeVisitor<'_> for ScanHintVisitor {
self.ts_row_selector = Some((group_by_cols, order_by_col));
}
}
// Check distinct.
if !is_all_last_value {
self.collect_distinct_columns(aggregate);
}
}
if self.ts_row_selector.is_some()
&& (matches!(node, LogicalPlan::Subquery(_)) || node.inputs().len() > 1)
{
if matches!(node, LogicalPlan::Subquery(_)) || node.inputs().len() > 1 {
// clean previous time series selector hint when encounter subqueries or join
self.ts_row_selector = None;
self.distinct_columns.clear();
}
if let LogicalPlan::Filter(filter) = node
&& let Some(group_by_exprs) = &self.ts_row_selector
{
let mut filter_referenced_cols = HashSet::default();
utils::expr_to_columns(&filter.predicate, &mut filter_referenced_cols)?;
// ensure only group_by columns are used in filter
if !filter_referenced_cols.is_subset(&group_by_exprs.0) {
self.ts_row_selector = None;
if let LogicalPlan::Filter(filter) = node {
if let Some(group_by_exprs) = &self.ts_row_selector {
let mut filter_referenced_cols = HashSet::default();
utils::expr_to_columns(&filter.predicate, &mut filter_referenced_cols)?;
// ensure only group_by columns are used in filter
if !filter_referenced_cols.is_subset(&group_by_exprs.0) {
self.ts_row_selector = None;
}
}
if !self.distinct_columns.is_empty() {
utils::expr_to_columns(&filter.predicate, &mut self.distinct_filter_columns)?;
}
}
@@ -289,7 +347,31 @@ impl TreeNodeVisitor<'_> for ScanHintVisitor {
impl ScanHintVisitor {
fn need_rewrite(&self) -> bool {
self.order_expr.is_some() || self.ts_row_selector.is_some()
self.order_expr.is_some()
|| self.ts_row_selector.is_some()
|| !self.distinct_columns.is_empty()
}
/// Collects the select distinct columns from the aggregate.
fn collect_distinct_columns(&mut self, aggregate: &Aggregate) {
if !aggregate.aggr_expr.is_empty() {
return;
}
let mut is_all_distinct = true;
// make sure every group-by expr is a plain `Expr::Column` and collect them
let mut group_by_cols = HashSet::with_capacity(aggregate.group_expr.len());
for expr in &aggregate.group_expr {
if let Expr::Column(col) = expr {
group_by_cols.insert(col.clone());
} else {
is_all_distinct = false;
break;
}
}
if is_all_distinct {
self.distinct_columns = group_by_cols;
}
}
}
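
The eligibility check in `set_distinct_columns_hint` above boils down to: every distinct column must be a tag, and every column referenced by a filter must be a tag or the time index. A minimal, self-contained sketch of that predicate follows; the enum, the column map, and the column names are placeholders, not the crate's `RegionMetadata` API.

```rust
use std::collections::{HashMap, HashSet};

/// Simplified stand-in for the region's column semantic types.
#[derive(PartialEq)]
enum Semantic {
    Tag,
    Field,
    Timestamp,
}

/// Returns true when the tag-only distinct hint may be set: all distinct
/// columns are tags, and all filter columns are tags or the time index.
fn can_set_tag_only_distinct(
    columns: &HashMap<&str, Semantic>,
    distinct_columns: &HashSet<&str>,
    distinct_filter_columns: &HashSet<&str>,
) -> bool {
    let all_tags = distinct_columns
        .iter()
        .all(|c| columns.get(c) == Some(&Semantic::Tag));
    let filters_ok = distinct_filter_columns.iter().all(|c| {
        matches!(
            columns.get(c),
            Some(Semantic::Tag) | Some(Semantic::Timestamp)
        )
    });
    all_tags && filters_ok
}

fn main() {
    let columns = HashMap::from([
        ("host", Semantic::Tag),
        ("ts", Semantic::Timestamp),
        ("value", Semantic::Field),
    ]);
    // e.g. SELECT DISTINCT host FROM t WHERE ts > ...: hint may be set.
    assert!(can_set_tag_only_distinct(
        &columns,
        &HashSet::from(["host"]),
        &HashSet::from(["ts"]),
    ));
    // e.g. SELECT DISTINCT host FROM t WHERE value > ...: filter touches a field, no hint.
    assert!(!can_set_tag_only_distinct(
        &columns,
        &HashSet::from(["host"]),
        &HashSet::from(["value"]),
    ));
}
```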

View File

@@ -32,22 +32,22 @@ pub struct TraceSpan {
pub service_name: Option<String>,
pub trace_id: String,
pub span_id: String,
pub parent_span_id: Option<String>,
pub parent_span_id: String,
// the following are fields
pub resource_attributes: Attributes,
pub resource_attributes: Attributes, // TODO(yuanbohan): Map in the future
pub scope_name: String,
pub scope_version: String,
pub scope_attributes: Attributes,
pub scope_attributes: Attributes, // TODO(yuanbohan): Map in the future
pub trace_state: String,
pub span_name: String,
pub span_kind: String,
pub span_status_code: String,
pub span_status_message: String,
pub span_attributes: Attributes,
pub span_events: SpanEvents, // TODO(yuanbohan): List in the future
pub span_links: SpanLinks, // TODO(yuanbohan): List in the future
pub start_in_nanosecond: u64, // this is also the Timestamp Index
pub span_attributes: Attributes, // TODO(yuanbohan): Map in the future
pub span_events: SpanEvents, // TODO(yuanbohan): List in the future
pub span_links: SpanLinks, // TODO(yuanbohan): List in the future
pub start_in_nanosecond: u64, // this is also the Timestamp Index
pub end_in_nanosecond: u64,
}
@@ -203,11 +203,7 @@ pub fn parse_span(
service_name,
trace_id: bytes_to_hex_string(&span.trace_id),
span_id: bytes_to_hex_string(&span.span_id),
parent_span_id: if span.parent_span_id.is_empty() {
None
} else {
Some(bytes_to_hex_string(&span.parent_span_id))
},
parent_span_id: bytes_to_hex_string(&span.parent_span_id),
resource_attributes: Attributes::from(resource_attrs),
trace_state: span.trace_state,

View File

@@ -91,10 +91,7 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()>
let iter = vec![
(TRACE_ID_COLUMN, span.trace_id),
(SPAN_ID_COLUMN, span.span_id),
(
PARENT_SPAN_ID_COLUMN,
span.parent_span_id.unwrap_or_default(),
),
(PARENT_SPAN_ID_COLUMN, span.parent_span_id),
]
.into_iter()
.map(|(col, val)| (col.to_string(), val));

View File

@@ -103,18 +103,8 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()>
row_writer::write_tags(writer, tags.into_iter(), &mut row)?;
// write fields
if let Some(parent_span_id) = span.parent_span_id {
row_writer::write_fields(
writer,
std::iter::once(make_string_column_data(
PARENT_SPAN_ID_COLUMN,
parent_span_id,
)),
&mut row,
)?;
}
let fields = vec![
make_string_column_data(PARENT_SPAN_ID_COLUMN, span.parent_span_id),
make_string_column_data(SPAN_KIND_COLUMN, span.span_kind),
make_string_column_data(SPAN_NAME_COLUMN, span.span_name),
make_string_column_data("span_status_code", span.span_status_code),

View File

@@ -58,4 +58,6 @@ pub struct ScanRequest {
pub sequence: Option<SequenceNumber>,
/// Optional hint for the distribution of time-series data.
pub distribution: Option<TimeSeriesDistribution>,
/// Optional hint for the tag-only distinct operation.
pub tag_only_distinct: bool,
}
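
A hedged usage sketch of the new field: this assumes the struct stays at `store_api::storage::ScanRequest` and keeps a `Default` implementation; the function name below is made up.

```rust
use store_api::storage::ScanRequest;

/// Sketch only: enable the tag-only distinct hint and leave every other
/// scan option at its default.
fn tag_only_scan_request() -> ScanRequest {
    ScanRequest {
        tag_only_distinct: true,
        ..Default::default()
    }
}
```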

View File

@@ -37,16 +37,16 @@ common-telemetry.workspace = true
common-test-util.workspace = true
common-time.workspace = true
common-wal.workspace = true
datanode.workspace = true
datanode = { workspace = true }
datatypes.workspace = true
dotenv.workspace = true
flate2.workspace = true
flate2 = "1.0"
flow.workspace = true
frontend = { workspace = true, features = ["testing"] }
futures.workspace = true
futures-util.workspace = true
hyper-util = { workspace = true, features = ["tokio"] }
log-query.workspace = true
log-query = { workspace = true }
loki-proto.workspace = true
meta-client.workspace = true
meta-srv = { workspace = true, features = ["mock"] }
@@ -96,5 +96,5 @@ prost.workspace = true
rand.workspace = true
session = { workspace = true, features = ["testing"] }
store-api.workspace = true
tokio-postgres.workspace = true
tokio-postgres = { workspace = true }
url = "2.3"

View File

@@ -1265,18 +1265,15 @@ transform:
- id1
- id2
type: int32
index: inverted
- fields:
- logger
type: string
- field: type
type: string
index: skipping
tag: true
- field: log
type: string
index: fulltext
tag: true
- field: time
type: time
index: timestamp
@@ -1351,7 +1348,8 @@ transform:
assert_eq!(res.status(), StatusCode::OK);
// 3. check schema
let expected_schema = "[[\"logs1\",\"CREATE TABLE IF NOT EXISTS \\\"logs1\\\" (\\n \\\"id1\\\" INT NULL INVERTED INDEX,\\n \\\"id2\\\" INT NULL INVERTED INDEX,\\n \\\"logger\\\" STRING NULL,\\n \\\"type\\\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\\n \\\"log\\\" STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', case_sensitive = 'false'),\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\"),\\n PRIMARY KEY (\\\"type\\\", \\\"log\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
let expected_schema = "[[\"logs1\",\"CREATE TABLE IF NOT EXISTS \\\"logs1\\\" (\\n \\\"id1\\\" INT NULL,\\n \\\"id2\\\" INT NULL,\\n \\\"logger\\\" STRING NULL,\\n \\\"type\\\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\\n \\\"log\\\" STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', case_sensitive = 'false'),\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
validate_data(
"pipeline_schema",
&client,
@@ -2197,7 +2195,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
assert_eq!(StatusCode::OK, res.status());
// select traces data
let expected = r#"[[1736480942444376000,1736480942444499000,123000,"c05d7a4ec8e1f231f02ed6e8da8655b4","9630f2916e2f7909","d24f921c75f68e23","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444376000,1736480942444499000,123000,"c05d7a4ec8e1f231f02ed6e8da8655b4","d24f921c75f68e23",null,"SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444589000,1736480942444712000,123000,"cc9e0991a2e63d274984bd44ee669203","8f847259b0f6e1ab","eba7be77e3558179","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444589000,1736480942444712000,123000,"cc9e0991a2e63d274984bd44ee669203","eba7be77e3558179",null,"SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]]]"#;
let expected = r#"[[1736480942444376000,1736480942444499000,123000,"c05d7a4ec8e1f231f02ed6e8da8655b4","9630f2916e2f7909","d24f921c75f68e23","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444376000,1736480942444499000,123000,"c05d7a4ec8e1f231f02ed6e8da8655b4","d24f921c75f68e23","","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444589000,1736480942444712000,123000,"cc9e0991a2e63d274984bd44ee669203","8f847259b0f6e1ab","eba7be77e3558179","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444589000,1736480942444712000,123000,"cc9e0991a2e63d274984bd44ee669203","eba7be77e3558179","","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]]]"#;
validate_data("otlp_traces", &client, "select * from mytable;", expected).await;
let expected_ddl = r#"[["mytable","CREATE TABLE IF NOT EXISTS \"mytable\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL,\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"trace_id\", \"span_id\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '1',\n trace_id >= '1' AND trace_id < '2',\n trace_id >= '2' AND trace_id < '3',\n trace_id >= '3' AND trace_id < '4',\n trace_id >= '4' AND trace_id < '5',\n trace_id >= '5' AND trace_id < '6',\n trace_id >= '6' AND trace_id < '7',\n trace_id >= '7' AND trace_id < '8',\n trace_id >= '8' AND trace_id < '9',\n trace_id >= '9' AND trace_id < 'A',\n trace_id >= 'A' AND trace_id < 'B' OR trace_id >= 'a' AND trace_id < 'b',\n trace_id >= 'B' AND trace_id < 'C' OR trace_id >= 'b' AND trace_id < 'c',\n trace_id >= 'C' AND trace_id < 'D' OR trace_id >= 'c' AND trace_id < 'd',\n trace_id >= 'D' AND trace_id < 'E' OR trace_id >= 'd' AND trace_id < 'e',\n trace_id >= 'E' AND trace_id < 'F' OR trace_id >= 'e' AND trace_id < 'f',\n trace_id >= 'F' AND trace_id < 'a' OR trace_id >= 'f'\n)\nENGINE=mito\nWITH(\n append_mode = 'true'\n)"]]"#;

View File

@@ -204,26 +204,3 @@ DROP TABLE integers;
Affected Rows: 0
CREATE TABLE characters(c STRING, t TIMESTAMP TIME INDEX);
Affected Rows: 0
INSERT INTO characters VALUES ('a', 1), ('b', 2), ('c', 3), (NULL, 4), ('a', 5), ('b', 6), ('c', 7), (NULL, 8);
Affected Rows: 8
SELECT * FROM characters WHERE c IN ('a', 'c') ORDER BY t;
+---+-------------------------+
| c | t |
+---+-------------------------+
| a | 1970-01-01T00:00:00.001 |
| c | 1970-01-01T00:00:00.003 |
| a | 1970-01-01T00:00:00.005 |
| c | 1970-01-01T00:00:00.007 |
+---+-------------------------+
DROP TABLE characters;
Affected Rows: 0

View File

@@ -57,11 +57,3 @@ SELECT * FROM (SELECT i1.i AS a, i2.i AS b, row_number() OVER (ORDER BY i1.i, i2
SELECT * FROM (SELECT 0=1 AS cond FROM integers i1, integers i2 GROUP BY 1) a1 WHERE cond ORDER BY 1;
DROP TABLE integers;
CREATE TABLE characters(c STRING, t TIMESTAMP TIME INDEX);
INSERT INTO characters VALUES ('a', 1), ('b', 2), ('c', 3), (NULL, 4), ('a', 5), ('b', 6), ('c', 7), (NULL, 8);
SELECT * FROM characters WHERE c IN ('a', 'c') ORDER BY t;
DROP TABLE characters;

View File

@@ -15,8 +15,8 @@ common-error.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
common-time.workspace = true
datatypes.workspace = true
flate2.workspace = true
datatypes = { workspace = true }
flate2 = "1.0"
hex = "0.4"
local-ip-address = "0.6"
mysql = { version = "25.0.1", default-features = false, features = ["minimal", "rustls-tls"] }
@@ -31,5 +31,5 @@ tar = "0.4"
tempfile.workspace = true
tinytemplate = "1.2"
tokio.workspace = true
tokio-postgres.workspace = true
tokio-postgres = { workspace = true }
tokio-stream.workspace = true