chore: bump version to 0.14.1 (#6006)

* feat: remove own greatest fn (#5994)

* fix: prune primary key with multiple columns may use default value as statistics (#5996)

* test: incorrect test result when filtering pk with multiple columns

* fix: prune non first tag correctly

Distinguish no column and no stats and only use default value when no
column

* test: update test result

* refactor: rename test file

* test: add test for null filter

* fix: use StatValues for null counts

* test: drop table

* test: fix unstable flow test

* fix: check if memtable is empty by stats (#5989)

fix/checking-memtable-empty-and-stats:
 - **Refactor timestamp updates**: Simplified timestamp range updates in `PartitionTreeMemtable` and `TimeSeriesMemtable` by replacing `update_timestamp_range` with `fetch_max` and `fetch_min` methods for `max_timestamp` and `min_timestamp`.
   - Affected files: `partition_tree.rs`, `time_series.rs`

 - **Remove unused code**: Deleted the `update_timestamp_range` method from `WriteMetrics` and removed unnecessary imports.
   - Affected file: `stats.rs`

 - **Optimize memtable filtering**: Streamlined the check for empty memtables in `ScanRegion` by directly using `time_range`.
   - Affected file: `scan_region.rs`

* chore: bump version to 0.14.1

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Co-authored-by: dennis zhuang <killme2008@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
This commit is contained in:
Zhenchi
2025-04-28 15:39:49 +08:00
committed by GitHub
parent 66e2242e46
commit e2df38d0d1
15 changed files with 408 additions and 490 deletions

View File

@@ -13,10 +13,8 @@
// limitations under the License.
use std::sync::Arc;
mod greatest;
mod to_unixtime;
use greatest::GreatestFunction;
use to_unixtime::ToUnixtimeFunction;
use crate::function_registry::FunctionRegistry;
@@ -26,6 +24,5 @@ pub(crate) struct TimestampFunction;
impl TimestampFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register(Arc::new(ToUnixtimeFunction));
registry.register(Arc::new(GreatestFunction));
}
}

View File

@@ -1,328 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{self};
use common_query::error::{
self, ArrowComputeSnafu, InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::{Signature, Volatility};
use datafusion::arrow::compute::kernels::cmp::gt;
use datatypes::arrow::array::AsArray;
use datatypes::arrow::compute::cast;
use datatypes::arrow::compute::kernels::zip;
use datatypes::arrow::datatypes::{
DataType as ArrowDataType, Date32Type, TimeUnit, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
};
use datatypes::prelude::ConcreteDataType;
use datatypes::types::TimestampType;
use datatypes::vectors::{Helper, VectorRef};
use snafu::{ensure, ResultExt};
use crate::function::{Function, FunctionContext};
#[derive(Clone, Debug, Default)]
pub struct GreatestFunction;
const NAME: &str = "greatest";
macro_rules! gt_time_types {
($ty: ident, $columns:expr) => {{
let column1 = $columns[0].to_arrow_array();
let column2 = $columns[1].to_arrow_array();
let column1 = column1.as_primitive::<$ty>();
let column2 = column2.as_primitive::<$ty>();
let boolean_array = gt(&column1, &column2).context(ArrowComputeSnafu)?;
let result = zip::zip(&boolean_array, &column1, &column2).context(ArrowComputeSnafu)?;
Helper::try_into_vector(&result).context(error::FromArrowArraySnafu)
}};
}
impl Function for GreatestFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
ensure!(
input_types.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly two, have: {}",
input_types.len()
)
}
);
match &input_types[0] {
ConcreteDataType::String(_) => Ok(ConcreteDataType::timestamp_millisecond_datatype()),
ConcreteDataType::Date(_) => Ok(ConcreteDataType::date_datatype()),
ConcreteDataType::Timestamp(ts_type) => Ok(ConcreteDataType::Timestamp(*ts_type)),
_ => UnsupportedInputDataTypeSnafu {
function: NAME,
datatypes: input_types,
}
.fail(),
}
}
fn signature(&self) -> Signature {
Signature::uniform(
2,
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::date_datatype(),
ConcreteDataType::timestamp_nanosecond_datatype(),
ConcreteDataType::timestamp_microsecond_datatype(),
ConcreteDataType::timestamp_millisecond_datatype(),
ConcreteDataType::timestamp_second_datatype(),
],
Volatility::Immutable,
)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly two, have: {}",
columns.len()
),
}
);
match columns[0].data_type() {
ConcreteDataType::String(_) => {
let column1 = cast(
&columns[0].to_arrow_array(),
&ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
)
.context(ArrowComputeSnafu)?;
let column1 = column1.as_primitive::<TimestampMillisecondType>();
let column2 = cast(
&columns[1].to_arrow_array(),
&ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
)
.context(ArrowComputeSnafu)?;
let column2 = column2.as_primitive::<TimestampMillisecondType>();
let boolean_array = gt(&column1, &column2).context(ArrowComputeSnafu)?;
let result =
zip::zip(&boolean_array, &column1, &column2).context(ArrowComputeSnafu)?;
Ok(Helper::try_into_vector(&result).context(error::FromArrowArraySnafu)?)
}
ConcreteDataType::Date(_) => gt_time_types!(Date32Type, columns),
ConcreteDataType::Timestamp(ts_type) => match ts_type {
TimestampType::Second(_) => gt_time_types!(TimestampSecondType, columns),
TimestampType::Millisecond(_) => {
gt_time_types!(TimestampMillisecondType, columns)
}
TimestampType::Microsecond(_) => {
gt_time_types!(TimestampMicrosecondType, columns)
}
TimestampType::Nanosecond(_) => {
gt_time_types!(TimestampNanosecondType, columns)
}
},
_ => UnsupportedInputDataTypeSnafu {
function: NAME,
datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
}
.fail(),
}
}
}
impl fmt::Display for GreatestFunction {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "GREATEST")
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_time::timestamp::TimeUnit;
use common_time::{Date, Timestamp};
use datatypes::types::{
DateType, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType,
};
use datatypes::value::Value;
use datatypes::vectors::{
DateVector, StringVector, TimestampMicrosecondVector, TimestampMillisecondVector,
TimestampNanosecondVector, TimestampSecondVector, Vector,
};
use paste::paste;
use super::*;
#[test]
fn test_greatest_takes_string_vector() {
let function = GreatestFunction;
assert_eq!(
function
.return_type(&[
ConcreteDataType::string_datatype(),
ConcreteDataType::string_datatype()
])
.unwrap(),
ConcreteDataType::timestamp_millisecond_datatype()
);
let columns = vec![
Arc::new(StringVector::from(vec![
"1970-01-01".to_string(),
"2012-12-23".to_string(),
])) as _,
Arc::new(StringVector::from(vec![
"2001-02-01".to_string(),
"1999-01-01".to_string(),
])) as _,
];
let result = function
.eval(&FunctionContext::default(), &columns)
.unwrap();
let result = result
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap();
assert_eq!(result.len(), 2);
assert_eq!(
result.get(0),
Value::Timestamp(Timestamp::from_str("2001-02-01 00:00:00", None).unwrap())
);
assert_eq!(
result.get(1),
Value::Timestamp(Timestamp::from_str("2012-12-23 00:00:00", None).unwrap())
);
}
#[test]
fn test_greatest_takes_date_vector() {
let function = GreatestFunction;
assert_eq!(
function
.return_type(&[
ConcreteDataType::date_datatype(),
ConcreteDataType::date_datatype()
])
.unwrap(),
ConcreteDataType::Date(DateType)
);
let columns = vec![
Arc::new(DateVector::from_slice(vec![-1, 2])) as _,
Arc::new(DateVector::from_slice(vec![0, 1])) as _,
];
let result = function
.eval(&FunctionContext::default(), &columns)
.unwrap();
let result = result.as_any().downcast_ref::<DateVector>().unwrap();
assert_eq!(result.len(), 2);
assert_eq!(
result.get(0),
Value::Date(Date::from_str_utc("1970-01-01").unwrap())
);
assert_eq!(
result.get(1),
Value::Date(Date::from_str_utc("1970-01-03").unwrap())
);
}
#[test]
fn test_greatest_takes_datetime_vector() {
let function = GreatestFunction;
assert_eq!(
function
.return_type(&[
ConcreteDataType::timestamp_millisecond_datatype(),
ConcreteDataType::timestamp_millisecond_datatype()
])
.unwrap(),
ConcreteDataType::timestamp_millisecond_datatype()
);
let columns = vec![
Arc::new(TimestampMillisecondVector::from_slice(vec![-1, 2])) as _,
Arc::new(TimestampMillisecondVector::from_slice(vec![0, 1])) as _,
];
let result = function
.eval(&FunctionContext::default(), &columns)
.unwrap();
let result = result
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap();
assert_eq!(result.len(), 2);
assert_eq!(
result.get(0),
Value::Timestamp(Timestamp::from_str("1970-01-01 00:00:00", None).unwrap())
);
assert_eq!(
result.get(1),
Value::Timestamp(Timestamp::from_str("1970-01-01 00:00:00.002", None).unwrap())
);
}
macro_rules! test_timestamp {
($type: expr,$unit: ident) => {
paste! {
#[test]
fn [<test_greatest_takes_ $unit:lower _vector>]() {
let function = GreatestFunction;
assert_eq!(
function.return_type(&[$type, $type]).unwrap(),
ConcreteDataType::Timestamp(TimestampType::$unit([<Timestamp $unit Type>]))
);
let columns = vec![
Arc::new([<Timestamp $unit Vector>]::from_slice(vec![-1, 2])) as _,
Arc::new([<Timestamp $unit Vector>]::from_slice(vec![0, 1])) as _,
];
let result = function.eval(&FunctionContext::default(), &columns).unwrap();
let result = result.as_any().downcast_ref::<[<Timestamp $unit Vector>]>().unwrap();
assert_eq!(result.len(), 2);
assert_eq!(
result.get(0),
Value::Timestamp(Timestamp::new(0, TimeUnit::$unit))
);
assert_eq!(
result.get(1),
Value::Timestamp(Timestamp::new(2, TimeUnit::$unit))
);
}
}
}
}
test_timestamp!(
ConcreteDataType::timestamp_nanosecond_datatype(),
Nanosecond
);
test_timestamp!(
ConcreteDataType::timestamp_microsecond_datatype(),
Microsecond
);
test_timestamp!(
ConcreteDataType::timestamp_millisecond_datatype(),
Millisecond
);
test_timestamp!(ConcreteDataType::timestamp_second_datatype(), Second);
}

View File

@@ -302,7 +302,10 @@ impl PartitionTreeMemtable {
fn update_stats(&self, metrics: &WriteMetrics) {
// Only let the tracker tracks value bytes.
self.alloc_tracker.on_allocation(metrics.value_bytes);
metrics.update_timestamp_range(&self.max_timestamp, &self.min_timestamp);
self.max_timestamp
.fetch_max(metrics.max_ts, Ordering::SeqCst);
self.min_timestamp
.fetch_min(metrics.min_ts, Ordering::SeqCst);
}
}

View File

@@ -14,8 +14,6 @@
//! Internal metrics of the memtable.
use std::sync::atomic::{AtomicI64, Ordering};
/// Metrics of writing memtables.
pub(crate) struct WriteMetrics {
/// Size allocated by keys.
@@ -28,51 +26,6 @@ pub(crate) struct WriteMetrics {
pub(crate) max_ts: i64,
}
impl WriteMetrics {
/// Update the min/max timestamp range according to current write metric.
pub(crate) fn update_timestamp_range(&self, prev_max_ts: &AtomicI64, prev_min_ts: &AtomicI64) {
loop {
let current_min = prev_min_ts.load(Ordering::Relaxed);
if self.min_ts >= current_min {
break;
}
let Err(updated) = prev_min_ts.compare_exchange(
current_min,
self.min_ts,
Ordering::Relaxed,
Ordering::Relaxed,
) else {
break;
};
if updated == self.min_ts {
break;
}
}
loop {
let current_max = prev_max_ts.load(Ordering::Relaxed);
if self.max_ts <= current_max {
break;
}
let Err(updated) = prev_max_ts.compare_exchange(
current_max,
self.max_ts,
Ordering::Relaxed,
Ordering::Relaxed,
) else {
break;
};
if updated == self.max_ts {
break;
}
}
}
}
impl Default for WriteMetrics {
fn default() -> Self {
Self {

View File

@@ -147,7 +147,8 @@ impl TimeSeriesMemtable {
fn update_stats(&self, stats: WriteMetrics) {
self.alloc_tracker
.on_allocation(stats.key_bytes + stats.value_bytes);
stats.update_timestamp_range(&self.max_timestamp, &self.min_timestamp);
self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
}
fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {

View File

@@ -322,13 +322,10 @@ impl ScanRegion {
let memtables: Vec<_> = memtables
.into_iter()
.filter(|mem| {
if mem.is_empty() {
// check if memtable is empty by reading stats.
let Some((start, end)) = mem.stats().time_range() else {
return false;
}
let stats = mem.stats();
// Safety: the memtable is not empty.
let (start, end) = stats.time_range().unwrap();
};
// The time range of the memtable is inclusive.
let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
memtable_range.intersects(&time_range)

View File

@@ -134,6 +134,7 @@ impl WriteFormat {
/// Helper for reading the SST format.
pub struct ReadFormat {
/// The metadata stored in the SST.
metadata: RegionMetadataRef,
/// SST file schema.
arrow_schema: SchemaRef,
@@ -305,17 +306,23 @@ impl ReadFormat {
&self,
row_groups: &[impl Borrow<RowGroupMetaData>],
column_id: ColumnId,
) -> Option<ArrayRef> {
let column = self.metadata.column_by_id(column_id)?;
) -> StatValues {
let Some(column) = self.metadata.column_by_id(column_id) else {
// No such column in the SST.
return StatValues::NoColumn;
};
match column.semantic_type {
SemanticType::Tag => self.tag_values(row_groups, column, true),
SemanticType::Field => {
let index = self.field_id_to_index.get(&column_id)?;
Self::column_values(row_groups, column, *index, true)
// Safety: `field_id_to_index` is initialized by the semantic type.
let index = self.field_id_to_index.get(&column_id).unwrap();
let stats = Self::column_values(row_groups, column, *index, true);
StatValues::from_stats_opt(stats)
}
SemanticType::Timestamp => {
let index = self.time_index_position();
Self::column_values(row_groups, column, index, true)
let stats = Self::column_values(row_groups, column, index, true);
StatValues::from_stats_opt(stats)
}
}
}
@@ -325,17 +332,23 @@ impl ReadFormat {
&self,
row_groups: &[impl Borrow<RowGroupMetaData>],
column_id: ColumnId,
) -> Option<ArrayRef> {
let column = self.metadata.column_by_id(column_id)?;
) -> StatValues {
let Some(column) = self.metadata.column_by_id(column_id) else {
// No such column in the SST.
return StatValues::NoColumn;
};
match column.semantic_type {
SemanticType::Tag => self.tag_values(row_groups, column, false),
SemanticType::Field => {
let index = self.field_id_to_index.get(&column_id)?;
Self::column_values(row_groups, column, *index, false)
// Safety: `field_id_to_index` is initialized by the semantic type.
let index = self.field_id_to_index.get(&column_id).unwrap();
let stats = Self::column_values(row_groups, column, *index, false);
StatValues::from_stats_opt(stats)
}
SemanticType::Timestamp => {
let index = self.time_index_position();
Self::column_values(row_groups, column, index, false)
let stats = Self::column_values(row_groups, column, index, false);
StatValues::from_stats_opt(stats)
}
}
}
@@ -345,17 +358,23 @@ impl ReadFormat {
&self,
row_groups: &[impl Borrow<RowGroupMetaData>],
column_id: ColumnId,
) -> Option<ArrayRef> {
let column = self.metadata.column_by_id(column_id)?;
) -> StatValues {
let Some(column) = self.metadata.column_by_id(column_id) else {
// No such column in the SST.
return StatValues::NoColumn;
};
match column.semantic_type {
SemanticType::Tag => None,
SemanticType::Tag => StatValues::NoStats,
SemanticType::Field => {
let index = self.field_id_to_index.get(&column_id)?;
Self::column_null_counts(row_groups, *index)
// Safety: `field_id_to_index` is initialized by the semantic type.
let index = self.field_id_to_index.get(&column_id).unwrap();
let stats = Self::column_null_counts(row_groups, *index);
StatValues::from_stats_opt(stats)
}
SemanticType::Timestamp => {
let index = self.time_index_position();
Self::column_null_counts(row_groups, index)
let stats = Self::column_null_counts(row_groups, index);
StatValues::from_stats_opt(stats)
}
}
}
@@ -390,8 +409,7 @@ impl ReadFormat {
row_groups: &[impl Borrow<RowGroupMetaData>],
column: &ColumnMetadata,
is_min: bool,
) -> Option<ArrayRef> {
let primary_key_encoding = self.metadata.primary_key_encoding;
) -> StatValues {
let is_first_tag = self
.metadata
.primary_key
@@ -400,9 +418,28 @@ impl ReadFormat {
.unwrap_or(false);
if !is_first_tag {
// Only the min-max of the first tag is available in the primary key.
return None;
return StatValues::NoStats;
}
StatValues::from_stats_opt(self.first_tag_values(row_groups, column, is_min))
}
/// Returns min/max values of the first tag.
/// Returns None if the tag does not have statistics.
fn first_tag_values(
&self,
row_groups: &[impl Borrow<RowGroupMetaData>],
column: &ColumnMetadata,
is_min: bool,
) -> Option<ArrayRef> {
debug_assert!(self
.metadata
.primary_key
.first()
.map(|id| *id == column.column_id)
.unwrap_or(false));
let primary_key_encoding = self.metadata.primary_key_encoding;
let converter = build_primary_key_codec_with_fields(
primary_key_encoding,
[(
@@ -452,6 +489,7 @@ impl ReadFormat {
}
/// Returns min/max values of specific non-tag columns.
/// Returns None if the column does not have statistics.
fn column_values(
row_groups: &[impl Borrow<RowGroupMetaData>],
column: &ColumnMetadata,
@@ -544,6 +582,29 @@ impl ReadFormat {
}
}
/// Values of column statistics of the SST.
///
/// It also distinguishes the case that a column is not found and
/// the column exists but has no statistics.
pub enum StatValues {
/// Values of each row group.
Values(ArrayRef),
/// No such column.
NoColumn,
/// Column exists but has no statistics.
NoStats,
}
impl StatValues {
/// Creates a new `StatValues` instance from optional statistics.
pub fn from_stats_opt(stats: Option<ArrayRef>) -> Self {
match stats {
Some(stats) => StatValues::Values(stats),
None => StatValues::NoStats,
}
}
}
#[cfg(test)]
impl ReadFormat {
/// Creates a helper with existing `metadata` and all columns.

View File

@@ -25,7 +25,7 @@ use parquet::file::metadata::RowGroupMetaData;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::format::{ReadFormat, StatValues};
/// Statistics for pruning row groups.
pub(crate) struct RowGroupPruningStats<'a, T> {
@@ -100,16 +100,18 @@ impl<T: Borrow<RowGroupMetaData>> PruningStatistics for RowGroupPruningStats<'_,
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
let column_id = self.column_id_to_prune(&column.name)?;
match self.read_format.min_values(self.row_groups, column_id) {
Some(values) => Some(values),
None => self.compat_default_value(&column.name),
StatValues::Values(values) => Some(values),
StatValues::NoColumn => self.compat_default_value(&column.name),
StatValues::NoStats => None,
}
}
fn max_values(&self, column: &Column) -> Option<ArrayRef> {
let column_id = self.column_id_to_prune(&column.name)?;
match self.read_format.max_values(self.row_groups, column_id) {
Some(values) => Some(values),
None => self.compat_default_value(&column.name),
StatValues::Values(values) => Some(values),
StatValues::NoColumn => self.compat_default_value(&column.name),
StatValues::NoStats => None,
}
}
@@ -118,10 +120,12 @@ impl<T: Borrow<RowGroupMetaData>> PruningStatistics for RowGroupPruningStats<'_,
}
fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
let Some(column_id) = self.column_id_to_prune(&column.name) else {
return self.compat_null_count(&column.name);
};
self.read_format.null_counts(self.row_groups, column_id)
let column_id = self.column_id_to_prune(&column.name)?;
match self.read_format.null_counts(self.row_groups, column_id) {
StatValues::Values(values) => Some(values),
StatValues::NoColumn => self.compat_null_count(&column.name),
StatValues::NoStats => None,
}
}
fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {