feat: impl RangeArray based on DictionaryArray (#796)

* feat: impl RangeArray based on DictionaryArray

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippys

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* apply review suggs

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update license header

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Apply suggestions from code review

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>

* update doc to change i32 to u32

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
This commit is contained in:
Ruihang Xia
2023-01-03 18:04:26 +08:00
committed by GitHub
parent a6eb213adf
commit f907a93b97
5 changed files with 391 additions and 0 deletions

7
Cargo.lock generated
View File

@@ -954,6 +954,12 @@ version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c"
[[package]]
name = "bytemuck"
version = "1.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aaa3a8d9a1ca92e282c96a32d6511b695d7d994d1d102ba85d279f9b2756947f"
[[package]]
name = "byteorder"
version = "1.4.3"
@@ -5046,6 +5052,7 @@ dependencies = [
name = "promql"
version = "0.1.0"
dependencies = [
"bytemuck",
"common-error",
"datafusion",
"datatypes",

View File

@@ -5,6 +5,7 @@ edition.workspace = true
license.workspace = true
[dependencies]
bytemuck = "1.12"
common-error = { path = "../common/error" }
datafusion.workspace = true
datatypes = { path = "../datatypes" }

View File

@@ -21,6 +21,22 @@ use common_error::prelude::*;
pub enum Error {
#[snafu(display("Unsupported expr type: {}", name))]
UnsupportedExpr { name: String, backtrace: Backtrace },
#[snafu(display(
"Illegal range: offset {}, length {}, array len {}",
offset,
length,
len
))]
IllegalRange {
offset: u32,
length: u32,
len: usize,
backtrace: Backtrace,
},
#[snafu(display("Empty range is not expected"))]
EmptyRange { backtrace: Backtrace },
}
impl ErrorExt for Error {
@@ -28,6 +44,7 @@ impl ErrorExt for Error {
use Error::*;
match self {
UnsupportedExpr { .. } => StatusCode::InvalidArguments,
IllegalRange { .. } | EmptyRange { .. } => StatusCode::Internal,
}
}
fn backtrace_opt(&self) -> Option<&Backtrace> {

View File

@@ -15,3 +15,4 @@
pub mod engine;
pub mod error;
pub mod extension_plan;
pub mod range_array;

View File

@@ -0,0 +1,365 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//An extended "array" based on [DictionaryArray].
use datatypes::arrow::array::{Array, ArrayData, ArrayRef, DictionaryArray, Int64Array};
use datatypes::arrow::datatypes::{DataType, Int64Type};
use snafu::{ensure, OptionExt};
use crate::error::{EmptyRangeSnafu, IllegalRangeSnafu, Result};
/// An compound logical "array" type. Represent serval ranges (slices) of one array.
/// It's useful to use case like compute sliding window, or range selector from promql.
///
/// It's build on top of Arrow's [DictionaryArray]. [DictionaryArray] contains two
/// sub-arrays, one for dictionary key and another for dictionary value. Both of them
/// can be arbitrary types, but here the key array is fixed to u32 type.
///
/// ```text
/// │ ┌─────┬─────┬─────┬─────┐
/// Two │ │ i64 │ i64 │ i64 │ i64 │ Keys
/// Arrays │ └─────┴─────┴─────┴─────┘ (Fixed to i64)
/// In │
/// Arrow's │ ┌────────────────────────────┐
/// Dictionary │ │ │ Values
/// Array │ └────────────────────────────┘(Any Type)
/// ```
///
/// Because the i64 key array is reinterpreted into two u32 for offset and length
/// in [RangeArray] to represent a "range":
///
/// ```text
/// 63 32│31 0
/// ┌───────────────┼───────────────┐
/// │ offset (u32) │ length (u32) │
/// └───────────────┼───────────────┘
/// ```
///
/// Then the [DictionaryArray] can be expanded to serveral ranges like this:
///
/// ```text
/// Keys
/// ┌───────┬───────┬───────┐ ┌───────┐
/// │ (0,2) │ (1,2) │ (2,2) │ ─┐ │[A,B,C]│ values.slice(0,2)
/// └───────┴───────┴───────┘ │ ├───────┤
/// Values ├─► │[B,C,D]│ values.slice(1,2)
/// ┌───┬───┬───┬───┬───┐ │ ├───────┤
/// │ A │ B │ C │ D │ E │ ─┘ │[C,D,E]│ values.slice(2,2)
/// └───┴───┴───┴───┴───┘ └───────┘
/// ```
pub struct RangeArray {
array: DictionaryArray<Int64Type>,
}
impl RangeArray {
pub const fn key_type() -> DataType {
DataType::Int64
}
pub fn try_new(dict: DictionaryArray<Int64Type>) -> Result<Self> {
let ranges_iter = dict
.keys()
.iter()
.map(|compound_key| compound_key.map(unpack))
.collect::<Option<Vec<_>>>()
.context(EmptyRangeSnafu)?;
Self::check_ranges(dict.values().len(), ranges_iter)?;
Ok(Self { array: dict })
}
pub fn from_ranges<R>(values: ArrayRef, ranges: R) -> Result<Self>
where
R: IntoIterator<Item = (u32, u32)> + Clone,
{
Self::check_ranges(values.len(), ranges.clone())?;
unsafe { Ok(Self::from_ranges_unchecked(values, ranges)) }
}
/// Construct [RangeArray] from given range without checking its validaty.
///
/// # Safety
///
/// Caller should ensure the given range are valid. Otherwise use [`from_ranges`]
/// instead.
///
/// [`from_ranges`]: crate::range_array::RangeArray#method.from_ranges
pub unsafe fn from_ranges_unchecked<R>(values: ArrayRef, ranges: R) -> Self
where
R: IntoIterator<Item = (u32, u32)>,
{
let key_array = Int64Array::from_iter(
ranges
.into_iter()
.map(|(offset, length)| pack(offset, length)),
);
// Build from ArrayData to bypass the "offset" checker. Because
// we are not using "keys" as-is.
// This paragraph is copied from arrow-rs dictionary_array.rs `try_new()`.
let mut data = ArrayData::builder(DataType::Dictionary(
Box::new(Self::key_type()),
Box::new(values.data_type().clone()),
))
.len(key_array.len())
.add_buffer(key_array.data().buffers()[0].clone())
.add_child_data(values.data().clone());
match key_array.data().null_buffer() {
Some(buffer) if key_array.data().null_count() > 0 => {
data = data
.null_bit_buffer(Some(buffer.clone()))
.null_count(key_array.data().null_count());
}
_ => data = data.null_count(0),
}
let array_data = unsafe { data.build_unchecked() };
Self {
array: array_data.into(),
}
}
pub fn len(&self) -> usize {
self.array.keys().len()
}
pub fn is_empty(&self) -> bool {
self.array.keys().is_empty()
}
pub fn get(&self, index: usize) -> Option<ArrayRef> {
if index >= self.len() {
return None;
}
let compound_key = self.array.keys().value(index);
let (offset, length) = unpack(compound_key);
let array = self.array.values().slice(offset as usize, length as usize);
Some(array)
}
/// Return the underlying Arrow's [DictionaryArray]. Notes the dictionary array might be
/// invalid from Arrow's definition. Be care if try to access this dictionary through
/// Arrow's API.
pub fn into_dict(self) -> DictionaryArray<Int64Type> {
self.array
}
fn check_ranges<R>(value_len: usize, ranges: R) -> Result<()>
where
R: IntoIterator<Item = (u32, u32)>,
{
for (offset, length) in ranges.into_iter() {
ensure!(
offset as usize + length as usize <= value_len,
IllegalRangeSnafu {
offset,
length,
len: value_len
}
);
}
Ok(())
}
}
impl Array for RangeArray {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn data(&self) -> &ArrayData {
self.array.data()
}
fn into_data(self) -> ArrayData {
self.array.into_data()
}
}
impl std::fmt::Debug for RangeArray {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let ranges = self
.array
.keys()
.iter()
.map(|compound_key| {
compound_key.map(|key| {
let (offset, length) = unpack(key);
offset..(offset + length)
})
})
.collect::<Vec<_>>();
f.debug_struct("RangeArray")
.field("base array", self.array.values())
.field("ranges", &ranges)
.finish()
}
}
// util functions
fn pack(offset: u32, length: u32) -> i64 {
bytemuck::cast::<[u32; 2], i64>([offset, length])
}
fn unpack(compound: i64) -> (u32, u32) {
let [offset, length] = bytemuck::cast::<i64, [u32; 2]>(compound);
(offset, length)
}
#[cfg(test)]
mod test {
use std::fmt::Write;
use std::sync::Arc;
use datatypes::arrow::array::UInt64Array;
use super::*;
fn expand_format(range_array: &RangeArray) -> String {
let mut result = String::new();
for i in 0..range_array.len() {
writeln!(result, "{:?}", range_array.get(i)).unwrap();
}
result
}
#[test]
fn construct_from_ranges() {
let values_array = Arc::new(UInt64Array::from_iter([1, 2, 3, 4, 5, 6, 7, 8, 9]));
let ranges = [(0, 2), (0, 5), (1, 1), (3, 3), (8, 1), (9, 0)];
let range_array = RangeArray::from_ranges(values_array, ranges).unwrap();
assert_eq!(range_array.len(), 6);
let expected = String::from(
"Some(PrimitiveArray<UInt64>\
\n[\
\n 1,\
\n 2,\
\n])\
\nSome(PrimitiveArray<UInt64>\
\n[\
\n 1,\
\n 2,\
\n 3,\
\n 4,\
\n 5,\
\n])\
\nSome(PrimitiveArray<UInt64>\
\n[\
\n 2,\
\n])\
\
\nSome(PrimitiveArray<UInt64>\
\n[\
\n 4,\
\n 5,\
\n 6,\
\n])\
\nSome(PrimitiveArray<UInt64>\
\n[\
\n 9,\
\n])\
\nSome(PrimitiveArray<UInt64>\
\n[\
\n])\
\n",
);
let formatted = expand_format(&range_array);
assert_eq!(formatted, expected);
}
#[test]
fn illegal_range() {
let values_array = Arc::new(UInt64Array::from_iter([1, 2, 3, 4, 5, 6, 7, 8, 9]));
let ranges = [(9, 1)];
assert!(RangeArray::from_ranges(values_array, ranges).is_err());
}
#[test]
fn dict_array_round_trip() {
let values_array = Arc::new(UInt64Array::from_iter([1, 2, 3, 4, 5, 6, 7, 8, 9]));
let ranges = [(0, 4), (1, 4), (2, 4), (3, 4), (4, 4), (5, 4)];
let expected = String::from(
"Some(PrimitiveArray<UInt64>\
\n[\
\n 1,\
\n 2,\
\n 3,\
\n 4,\
\n])\
\nSome(PrimitiveArray<UInt64>\
\n[\
\n 2,\
\n 3,\
\n 4,\
\n 5,\
\n])\
\nSome(PrimitiveArray<UInt64>\
\n[\
\n 3,\
\n 4,\
\n 5,\
\n 6,\
\n])\
\nSome(PrimitiveArray<UInt64>\
\n[\
\n 4,\
\n 5,\
\n 6,\
\n 7,\
\n])\
\nSome(PrimitiveArray<UInt64>\
\n[\
\n 5,\
\n 6,\
\n 7,\
\n 8,\
\n])\
\nSome(PrimitiveArray<UInt64>\
\n[\
\n 6,\
\n 7,\
\n 8,\
\n 9,\
\n])\
\n",
);
let range_array = RangeArray::from_ranges(values_array, ranges).unwrap();
assert_eq!(range_array.len(), 6);
let formatted = expand_format(&range_array);
assert_eq!(formatted, expected);
// this dict array is invalid from Arrow's definition
let dict_array = range_array.into_dict();
let rounded_range_array = RangeArray::try_new(dict_array).unwrap();
let formatted = expand_format(&rounded_range_array);
assert_eq!(formatted, expected);
}
#[test]
fn empty_range_array() {
let values_array = Arc::new(UInt64Array::from_iter([1, 2, 3, 4, 5, 6, 7, 8, 9]));
let ranges = [];
let range_array = RangeArray::from_ranges(values_array, ranges).unwrap();
assert!(range_array.is_empty());
}
}