From f907a93b97a9c97de93371ec88f159f31cc2a10b Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 3 Jan 2023 18:04:26 +0800 Subject: [PATCH] feat: impl RangeArray based on DictionaryArray (#796) * feat: impl RangeArray based on DictionaryArray Signed-off-by: Ruihang Xia * fix clippys Signed-off-by: Ruihang Xia * apply review suggs * fix typo Signed-off-by: Ruihang Xia * update license header Signed-off-by: Ruihang Xia * Apply suggestions from code review Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com> * update doc to change i32 to u32 Signed-off-by: Ruihang Xia Signed-off-by: Ruihang Xia Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com> --- Cargo.lock | 7 + src/promql/Cargo.toml | 1 + src/promql/src/error.rs | 17 ++ src/promql/src/lib.rs | 1 + src/promql/src/range_array.rs | 365 ++++++++++++++++++++++++++++++++++ 5 files changed, 391 insertions(+) create mode 100644 src/promql/src/range_array.rs diff --git a/Cargo.lock b/Cargo.lock index 8b9c74195d..0c941d2ca5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -954,6 +954,12 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c" +[[package]] +name = "bytemuck" +version = "1.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aaa3a8d9a1ca92e282c96a32d6511b695d7d994d1d102ba85d279f9b2756947f" + [[package]] name = "byteorder" version = "1.4.3" @@ -5046,6 +5052,7 @@ dependencies = [ name = "promql" version = "0.1.0" dependencies = [ + "bytemuck", "common-error", "datafusion", "datatypes", diff --git a/src/promql/Cargo.toml b/src/promql/Cargo.toml index 48b988f923..8fe2be2ec5 100644 --- a/src/promql/Cargo.toml +++ b/src/promql/Cargo.toml @@ -5,6 +5,7 @@ edition.workspace = true license.workspace = true [dependencies] +bytemuck = "1.12" common-error = { path = "../common/error" } datafusion.workspace = true datatypes = { path = "../datatypes" } diff --git a/src/promql/src/error.rs b/src/promql/src/error.rs index 0ec503fa4c..dc0f813323 100644 --- a/src/promql/src/error.rs +++ b/src/promql/src/error.rs @@ -21,6 +21,22 @@ use common_error::prelude::*; pub enum Error { #[snafu(display("Unsupported expr type: {}", name))] UnsupportedExpr { name: String, backtrace: Backtrace }, + + #[snafu(display( + "Illegal range: offset {}, length {}, array len {}", + offset, + length, + len + ))] + IllegalRange { + offset: u32, + length: u32, + len: usize, + backtrace: Backtrace, + }, + + #[snafu(display("Empty range is not expected"))] + EmptyRange { backtrace: Backtrace }, } impl ErrorExt for Error { @@ -28,6 +44,7 @@ impl ErrorExt for Error { use Error::*; match self { UnsupportedExpr { .. } => StatusCode::InvalidArguments, + IllegalRange { .. } | EmptyRange { .. } => StatusCode::Internal, } } fn backtrace_opt(&self) -> Option<&Backtrace> { diff --git a/src/promql/src/lib.rs b/src/promql/src/lib.rs index 83fb3293f6..40874fe267 100644 --- a/src/promql/src/lib.rs +++ b/src/promql/src/lib.rs @@ -15,3 +15,4 @@ pub mod engine; pub mod error; pub mod extension_plan; +pub mod range_array; diff --git a/src/promql/src/range_array.rs b/src/promql/src/range_array.rs new file mode 100644 index 0000000000..7784b61360 --- /dev/null +++ b/src/promql/src/range_array.rs @@ -0,0 +1,365 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//!An extended "array" based on [DictionaryArray]. + +use datatypes::arrow::array::{Array, ArrayData, ArrayRef, DictionaryArray, Int64Array}; +use datatypes::arrow::datatypes::{DataType, Int64Type}; +use snafu::{ensure, OptionExt}; + +use crate::error::{EmptyRangeSnafu, IllegalRangeSnafu, Result}; + +/// An compound logical "array" type. Represent serval ranges (slices) of one array. +/// It's useful to use case like compute sliding window, or range selector from promql. +/// +/// It's build on top of Arrow's [DictionaryArray]. [DictionaryArray] contains two +/// sub-arrays, one for dictionary key and another for dictionary value. Both of them +/// can be arbitrary types, but here the key array is fixed to u32 type. +/// +/// ```text +/// │ ┌─────┬─────┬─────┬─────┐ +/// Two │ │ i64 │ i64 │ i64 │ i64 │ Keys +/// Arrays │ └─────┴─────┴─────┴─────┘ (Fixed to i64) +/// In │ +/// Arrow's │ ┌────────────────────────────┐ +/// Dictionary │ │ │ Values +/// Array │ └────────────────────────────┘(Any Type) +/// ``` +/// +/// Because the i64 key array is reinterpreted into two u32 for offset and length +/// in [RangeArray] to represent a "range": +/// +/// ```text +/// 63 32│31 0 +/// ┌───────────────┼───────────────┐ +/// │ offset (u32) │ length (u32) │ +/// └───────────────┼───────────────┘ +/// ``` +/// +/// Then the [DictionaryArray] can be expanded to serveral ranges like this: +/// +/// ```text +/// Keys +/// ┌───────┬───────┬───────┐ ┌───────┐ +/// │ (0,2) │ (1,2) │ (2,2) │ ─┐ │[A,B,C]│ values.slice(0,2) +/// └───────┴───────┴───────┘ │ ├───────┤ +/// Values ├─► │[B,C,D]│ values.slice(1,2) +/// ┌───┬───┬───┬───┬───┐ │ ├───────┤ +/// │ A │ B │ C │ D │ E │ ─┘ │[C,D,E]│ values.slice(2,2) +/// └───┴───┴───┴───┴───┘ └───────┘ +/// ``` +pub struct RangeArray { + array: DictionaryArray, +} + +impl RangeArray { + pub const fn key_type() -> DataType { + DataType::Int64 + } + + pub fn try_new(dict: DictionaryArray) -> Result { + let ranges_iter = dict + .keys() + .iter() + .map(|compound_key| compound_key.map(unpack)) + .collect::>>() + .context(EmptyRangeSnafu)?; + Self::check_ranges(dict.values().len(), ranges_iter)?; + + Ok(Self { array: dict }) + } + + pub fn from_ranges(values: ArrayRef, ranges: R) -> Result + where + R: IntoIterator + Clone, + { + Self::check_ranges(values.len(), ranges.clone())?; + + unsafe { Ok(Self::from_ranges_unchecked(values, ranges)) } + } + + /// Construct [RangeArray] from given range without checking its validaty. + /// + /// # Safety + /// + /// Caller should ensure the given range are valid. Otherwise use [`from_ranges`] + /// instead. + /// + /// [`from_ranges`]: crate::range_array::RangeArray#method.from_ranges + pub unsafe fn from_ranges_unchecked(values: ArrayRef, ranges: R) -> Self + where + R: IntoIterator, + { + let key_array = Int64Array::from_iter( + ranges + .into_iter() + .map(|(offset, length)| pack(offset, length)), + ); + + // Build from ArrayData to bypass the "offset" checker. Because + // we are not using "keys" as-is. + // This paragraph is copied from arrow-rs dictionary_array.rs `try_new()`. + let mut data = ArrayData::builder(DataType::Dictionary( + Box::new(Self::key_type()), + Box::new(values.data_type().clone()), + )) + .len(key_array.len()) + .add_buffer(key_array.data().buffers()[0].clone()) + .add_child_data(values.data().clone()); + match key_array.data().null_buffer() { + Some(buffer) if key_array.data().null_count() > 0 => { + data = data + .null_bit_buffer(Some(buffer.clone())) + .null_count(key_array.data().null_count()); + } + _ => data = data.null_count(0), + } + let array_data = unsafe { data.build_unchecked() }; + + Self { + array: array_data.into(), + } + } + + pub fn len(&self) -> usize { + self.array.keys().len() + } + + pub fn is_empty(&self) -> bool { + self.array.keys().is_empty() + } + + pub fn get(&self, index: usize) -> Option { + if index >= self.len() { + return None; + } + + let compound_key = self.array.keys().value(index); + let (offset, length) = unpack(compound_key); + let array = self.array.values().slice(offset as usize, length as usize); + + Some(array) + } + + /// Return the underlying Arrow's [DictionaryArray]. Notes the dictionary array might be + /// invalid from Arrow's definition. Be care if try to access this dictionary through + /// Arrow's API. + pub fn into_dict(self) -> DictionaryArray { + self.array + } + + fn check_ranges(value_len: usize, ranges: R) -> Result<()> + where + R: IntoIterator, + { + for (offset, length) in ranges.into_iter() { + ensure!( + offset as usize + length as usize <= value_len, + IllegalRangeSnafu { + offset, + length, + len: value_len + } + ); + } + Ok(()) + } +} + +impl Array for RangeArray { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn data(&self) -> &ArrayData { + self.array.data() + } + + fn into_data(self) -> ArrayData { + self.array.into_data() + } +} + +impl std::fmt::Debug for RangeArray { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let ranges = self + .array + .keys() + .iter() + .map(|compound_key| { + compound_key.map(|key| { + let (offset, length) = unpack(key); + offset..(offset + length) + }) + }) + .collect::>(); + f.debug_struct("RangeArray") + .field("base array", self.array.values()) + .field("ranges", &ranges) + .finish() + } +} + +// util functions + +fn pack(offset: u32, length: u32) -> i64 { + bytemuck::cast::<[u32; 2], i64>([offset, length]) +} + +fn unpack(compound: i64) -> (u32, u32) { + let [offset, length] = bytemuck::cast::(compound); + (offset, length) +} + +#[cfg(test)] +mod test { + use std::fmt::Write; + use std::sync::Arc; + + use datatypes::arrow::array::UInt64Array; + + use super::*; + + fn expand_format(range_array: &RangeArray) -> String { + let mut result = String::new(); + for i in 0..range_array.len() { + writeln!(result, "{:?}", range_array.get(i)).unwrap(); + } + result + } + + #[test] + fn construct_from_ranges() { + let values_array = Arc::new(UInt64Array::from_iter([1, 2, 3, 4, 5, 6, 7, 8, 9])); + let ranges = [(0, 2), (0, 5), (1, 1), (3, 3), (8, 1), (9, 0)]; + + let range_array = RangeArray::from_ranges(values_array, ranges).unwrap(); + assert_eq!(range_array.len(), 6); + + let expected = String::from( + "Some(PrimitiveArray\ + \n[\ + \n 1,\ + \n 2,\ + \n])\ + \nSome(PrimitiveArray\ + \n[\ + \n 1,\ + \n 2,\ + \n 3,\ + \n 4,\ + \n 5,\ + \n])\ + \nSome(PrimitiveArray\ + \n[\ + \n 2,\ + \n])\ + \ + \nSome(PrimitiveArray\ + \n[\ + \n 4,\ + \n 5,\ + \n 6,\ + \n])\ + \nSome(PrimitiveArray\ + \n[\ + \n 9,\ + \n])\ + \nSome(PrimitiveArray\ + \n[\ + \n])\ + \n", + ); + + let formatted = expand_format(&range_array); + assert_eq!(formatted, expected); + } + + #[test] + fn illegal_range() { + let values_array = Arc::new(UInt64Array::from_iter([1, 2, 3, 4, 5, 6, 7, 8, 9])); + let ranges = [(9, 1)]; + assert!(RangeArray::from_ranges(values_array, ranges).is_err()); + } + + #[test] + fn dict_array_round_trip() { + let values_array = Arc::new(UInt64Array::from_iter([1, 2, 3, 4, 5, 6, 7, 8, 9])); + let ranges = [(0, 4), (1, 4), (2, 4), (3, 4), (4, 4), (5, 4)]; + let expected = String::from( + "Some(PrimitiveArray\ + \n[\ + \n 1,\ + \n 2,\ + \n 3,\ + \n 4,\ + \n])\ + \nSome(PrimitiveArray\ + \n[\ + \n 2,\ + \n 3,\ + \n 4,\ + \n 5,\ + \n])\ + \nSome(PrimitiveArray\ + \n[\ + \n 3,\ + \n 4,\ + \n 5,\ + \n 6,\ + \n])\ + \nSome(PrimitiveArray\ + \n[\ + \n 4,\ + \n 5,\ + \n 6,\ + \n 7,\ + \n])\ + \nSome(PrimitiveArray\ + \n[\ + \n 5,\ + \n 6,\ + \n 7,\ + \n 8,\ + \n])\ + \nSome(PrimitiveArray\ + \n[\ + \n 6,\ + \n 7,\ + \n 8,\ + \n 9,\ + \n])\ + \n", + ); + + let range_array = RangeArray::from_ranges(values_array, ranges).unwrap(); + assert_eq!(range_array.len(), 6); + let formatted = expand_format(&range_array); + assert_eq!(formatted, expected); + + // this dict array is invalid from Arrow's definition + let dict_array = range_array.into_dict(); + let rounded_range_array = RangeArray::try_new(dict_array).unwrap(); + let formatted = expand_format(&rounded_range_array); + assert_eq!(formatted, expected); + } + + #[test] + fn empty_range_array() { + let values_array = Arc::new(UInt64Array::from_iter([1, 2, 3, 4, 5, 6, 7, 8, 9])); + let ranges = []; + let range_array = RangeArray::from_ranges(values_array, ranges).unwrap(); + assert!(range_array.is_empty()); + } +}