feat(bulk): write to multiple time partitions (#6086)

* add benchmark for splitting according to time partition

* feat/write-to-multiple-time-partitions:
 **Enhancements to Bulk Processing and Time Partitioning**

 - **`part.rs`**: Added `Snafu` to imports and introduced `timestamp_index` in `BulkPart` struct. Implemented `timestamps` method for accessing timestamp columns.
 - **`simple_bulk_memtable.rs`**: Updated tests to include `timestamp_index` initialization.
 - **`time_partition.rs`**: Enhanced `TimePartition` to support partial writes with `write_record_batch_partial`. Implemented `split_record_batch` for filtering records by timestamp range. Added comprehensive tests for `split_record_batch`.
 - **`handle_bulk_insert.rs`**: Modified to retrieve timestamp index and column together, updating `BulkPart` initialization with `timestamp_index`.

* feat/write-to-multiple-time-partitions:
 ### Enhance Time Partitioning Logic

 - **`time_partition.rs`**:
   - Introduced `HashSet` for efficient partition management.
   - Refactored `write_bulk` to handle multiple partitions and added `find_partitions_by_time_range` for identifying existing and missing partitions.
   - Updated `get_or_create_time_partition` to manage partition creation.
   - Added comprehensive tests for partition finding logic, covering various scenarios including overlapping and non-overlapping time ranges.

 - **Tests**:
   - Added `test_find_partitions_by_time_range` to validate new partitioning logic.
   - Updated `test_split_record_batch` to ensure correct record batch splitting behavior.

* feat/write-to-multiple-time-partitions:
 ### Enhance Time Partitioning and Testing in `time_partition.rs`

 - **Time Partitioning Enhancements**:
   - Updated `split_record_batch` to handle multiple timestamp units (`Second`, `Millisecond`, `Microsecond`, `Nanosecond`) by matching on `DataType`.
   - Improved filtering logic for timestamp arrays to support various time units.

 - **Testing Enhancements**:
   - Added `test_write_bulk` to verify writing across multiple partitions and scenarios in `time_partition.rs`.
   - Updated `test_split_record_batch` to use `TimestampMillisecondArray` for testing timestamp partitioning.

 - **Imports and Dependencies**:
   - Added necessary imports for new timestamp array types and testing utilities.

* feat/write-to-multiple-time-partitions:
 ### Refactor and Enhance Time Partition Filtering

 - **Refactor Filtering Logic**: Consolidated the filtering logic for timestamp arrays using macros in `time_partition.rs` and `bench_filter_time_partition.rs`. This reduces code duplication and improves maintainability.
 - **Enhance `BulkPart` Struct**: Made fields in `BulkPart` public to facilitate easier access and manipulation in `memtable.rs` and `part.rs`.
 - **Rename Function**: Renamed `split_record_batch` to `filter_record_batch` for clarity in `time_partition.rs` and `bench_filter_time_partition.rs`.
 - **Add Feature Flag**: Introduced `int_roundings` feature in `lib.rs` to support new functionality.

* refactor tests

* feat/write-to-multiple-time-partitions:
 Improve timestamp handling in `time_partition.rs`

 - Enhanced safety comments for timestamp conversion to ensure clarity.
 - Modified logic to prevent overflow by using `div_euclid` for `bulk_start_sec` and `bulk_end_sec` calculations.
 - Adjusted the `filter_map` logic to correctly compute timestamps using `start_sec` and `part_duration_sec`.

* feat/write-to-multiple-time-partitions:
 **Refactor timestamp handling and add utility function**

 - **Refactor `time_partition.rs`:** Simplified timestamp handling by replacing direct type access with a utility function to retrieve the timestamp unit. Improved error handling for timestamp conversion.
 - **Enhance `metadata.rs`:** Added `time_index_type` function to `RegionMetadata` to retrieve the timestamp type of the time index column, ensuring safer and more readable code.

* feat/write-to-multiple-time-partitions:
 Refactor time partition variable names in `time_partition.rs`

 - Renamed variables for clarity: `bulk_start_sec` to `start_bucket` and `bulk_end_sec` to `end_bucket`.
 - Updated related logic to use new variable names for improved readability and maintainability.

* feat/write-to-multiple-time-partitions:
 **Refactor variable names in `time_partition.rs`**

 - Updated variable names from `matching` and `missing` to `matchings` and `missings` for clarity and consistency.
 - Modified function calls and loop iterations to align with the new variable names.
 - Affected file: `src/mito2/src/memtable/time_partition.rs`

* feat/write-to-multiple-time-partitions:
 ### Refactor variable names in `time_partition.rs`

 - Updated variable names for clarity in `time_partition.rs`:
   - Renamed `matchings` to `matching_parts`
   - Renamed `missings` to `missing_parts`
 - Adjusted logic to use new variable names in methods `find_partitions_by_time_range` and `write_record_batch`.

* feat/write-to-multiple-time-partitions:
 ### Enhance Time Partition Handling

 - **`time_partition.rs`**:
   - Added `ArrayRef` to handle timestamp arrays, improving the partitioning logic by allowing more efficient timestamp range checks.
   - Enhanced `find_partitions_by_time_range` to support sparse data and handle different timestamp units (`Second`, `Millisecond`, `Microsecond`, `Nanosecond`).
   - Updated test cases to cover new scenarios, including sparse data and edge cases, ensuring robustness of partition handling.

---------

Co-authored-by: Lei <lei@Leis-MacBook-Pro.local>
This commit is contained in:
Lei, HUANG
2025-05-14 13:09:59 +08:00
committed by GitHub
parent 209f8371f2
commit 5a9023d6b3
9 changed files with 801 additions and 57 deletions

View File

@@ -92,3 +92,8 @@ toml.workspace = true
name = "memtable_bench"
harness = false
required-features = ["test"]
[[bench]]
name = "bench_filter_time_partition"
harness = false
required-features = ["test"]

View File

@@ -0,0 +1,124 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::str::FromStr;
use std::sync::Arc;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use datatypes::arrow;
use datatypes::arrow::array::{ArrayRef, RecordBatch, TimestampMillisecondArray};
use datatypes::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datatypes::arrow_array::StringArray;
use mito2::memtable::{filter_record_batch, BulkPart};
fn random_array(num: usize) -> BulkPart {
let mut min = i64::MAX;
let mut max = i64::MIN;
let ts_data: Vec<_> = (0..num)
.map(|_| {
let val = rand::random::<i64>();
min = min.min(val);
max = max.max(val);
val
})
.collect();
let val = StringArray::from_iter_values(ts_data.iter().map(|v| v.to_string()));
let ts = TimestampMillisecondArray::from(ts_data);
let schema = Arc::new(Schema::new(vec![
Field::new(
"ts",
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
Field::new("val", DataType::Utf8, false),
]));
let batch = RecordBatch::try_new(
schema,
vec![Arc::new(ts) as ArrayRef, Arc::new(val) as ArrayRef],
)
.unwrap();
BulkPart {
batch,
num_rows: num,
max_ts: max,
min_ts: min,
sequence: 0,
timestamp_index: 0,
}
}
fn filter_arrow_impl(part: &BulkPart, min: i64, max: i64) -> Option<BulkPart> {
let ts_array = part.timestamps();
let gt_eq =
arrow::compute::kernels::cmp::gt_eq(ts_array, &TimestampMillisecondArray::new_scalar(min))
.unwrap();
let lt =
arrow::compute::kernels::cmp::lt(ts_array, &TimestampMillisecondArray::new_scalar(max))
.unwrap();
let predicate = arrow::compute::and(&gt_eq, &lt).unwrap();
let ts_filtered = arrow::compute::filter(ts_array, &predicate).unwrap();
if ts_filtered.is_empty() {
return None;
}
let num_rows_filtered = ts_filtered.len();
let i64array = ts_filtered
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
let max = arrow::compute::max(i64array).unwrap();
let min = arrow::compute::min(i64array).unwrap();
let batch = arrow::compute::filter_record_batch(&part.batch, &predicate).unwrap();
Some(BulkPart {
batch,
num_rows: num_rows_filtered,
max_ts: max,
min_ts: min,
sequence: 0,
timestamp_index: part.timestamp_index,
})
}
fn filter_batch_by_time_range(c: &mut Criterion) {
let num = std::env::var("BENCH_FILTER_BATCH_TIME_RANGE_NUM")
.ok()
.and_then(|v| usize::from_str(&v).ok())
.unwrap_or(10000);
let part = random_array(num);
let min = 0;
let max = 10000;
let mut group = c.benchmark_group("filter_by_time_range");
group.bench_function("arrow_impl", |b| {
b.iter(|| {
let _ = black_box(filter_arrow_impl(&part, min, max));
});
});
group.bench_function("manual_impl", |b| {
b.iter(|| {
let _ = black_box(filter_record_batch(&part, min, max));
});
});
group.finish();
}
criterion_group!(benches, filter_batch_by_time_range);
criterion_main!(benches);

View File

@@ -19,6 +19,7 @@
#![feature(let_chains)]
#![feature(assert_matches)]
#![feature(result_flattening)]
#![feature(int_roundings)]
#[cfg(any(test, feature = "test"))]
#[cfg_attr(feature = "test", allow(unused))]

View File

@@ -29,7 +29,6 @@ use table::predicate::Predicate;
use crate::config::MitoConfig;
use crate::error::Result;
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::key_values::KeyValue;
pub use crate::memtable::key_values::KeyValues;
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
@@ -51,6 +50,11 @@ pub mod time_partition;
pub mod time_series;
pub(crate) mod version;
#[cfg(any(test, feature = "test"))]
pub use bulk::part::BulkPart;
#[cfg(any(test, feature = "test"))]
pub use time_partition::filter_record_batch;
/// Id for memtables.
///
/// Should be unique under the same region.
@@ -142,7 +146,7 @@ pub trait Memtable: Send + Sync + fmt::Debug {
fn write_one(&self, key_value: KeyValue) -> Result<()>;
/// Writes an encoded batch of into memtable.
fn write_bulk(&self, part: BulkPart) -> Result<()>;
fn write_bulk(&self, part: crate::memtable::bulk::part::BulkPart) -> Result<()>;
/// Scans the memtable.
/// `projection` selects columns to read, `None` means reading all columns.

View File

@@ -40,7 +40,7 @@ use parquet::arrow::ArrowWriter;
use parquet::data_type::AsBytes;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use snafu::{OptionExt, ResultExt};
use snafu::{OptionExt, ResultExt, Snafu};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::SequenceNumber;
use table::predicate::Predicate;
@@ -56,12 +56,14 @@ use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::to_sst_arrow_schema;
#[derive(Clone)]
pub struct BulkPart {
pub(crate) batch: RecordBatch,
pub(crate) num_rows: usize,
pub(crate) max_ts: i64,
pub(crate) min_ts: i64,
pub(crate) sequence: u64,
pub batch: RecordBatch,
pub num_rows: usize,
pub max_ts: i64,
pub min_ts: i64,
pub sequence: u64,
pub timestamp_index: usize,
}
impl BulkPart {
@@ -124,6 +126,10 @@ impl BulkPart {
write_hint: None,
})
}
pub fn timestamps(&self) -> &ArrayRef {
self.batch.column(self.timestamp_index)
}
}
#[derive(Debug)]

View File

@@ -564,6 +564,7 @@ mod tests {
min_ts: 1,
max_ts: 2,
num_rows: 2,
timestamp_index: 0,
};
memtable.write_bulk(part).unwrap();

View File

@@ -14,7 +14,7 @@
//! Partitions memtables by time.
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::Duration;
@@ -22,8 +22,15 @@ use common_telemetry::debug;
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use common_time::Timestamp;
use datatypes::arrow;
use datatypes::arrow::array::{
ArrayRef, BooleanArray, RecordBatch, RecordBatchOptions, TimestampMicrosecondArray,
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
};
use datatypes::arrow::buffer::{BooleanBuffer, MutableBuffer};
use datatypes::arrow::datatypes::{DataType, Int64Type};
use smallvec::{smallvec, SmallVec};
use snafu::OptionExt;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use crate::error;
@@ -38,7 +45,8 @@ use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef};
pub struct TimePartition {
/// Memtable of the partition.
memtable: MemtableRef,
/// Time range of the partition. `None` means there is no time range. The time
/// Time range of the partition. `min` is inclusive and `max` is exclusive.
/// `None` means there is no time range. The time
/// range is `None` if and only if the [TimePartitions::part_duration] is `None`.
time_range: Option<PartTimeRange>,
}
@@ -59,9 +67,138 @@ impl TimePartition {
}
/// Writes a record batch to memtable.
fn write_record_batch(&self, rb: BulkPart) -> error::Result<()> {
fn write_record_batch(&self, rb: BulkPart) -> Result<()> {
self.memtable.write_bulk(rb)
}
/// Write a partial [BulkPart] according to [TimePartition::time_range].
fn write_record_batch_partial(&self, part: &BulkPart) -> error::Result<()> {
let Some(range) = self.time_range else {
unreachable!("TimePartition must have explicit time range when a bulk request involves multiple time partition")
};
let Some(filtered) = filter_record_batch(
part,
range.min_timestamp.value(),
range.max_timestamp.value(),
)?
else {
return Ok(());
};
self.write_record_batch(filtered)
}
}
macro_rules! create_filter_buffer {
($ts_array:expr, $min:expr, $max:expr) => {{
let len = $ts_array.len();
let mut buffer = MutableBuffer::new(len.div_ceil(64) * 8);
let f = |idx: usize| -> bool {
// SAFETY: we only iterate the array within index bound.
unsafe {
let val = $ts_array.value_unchecked(idx);
val >= $min && val < $max
}
};
let chunks = len / 64;
let remainder = len % 64;
for chunk in 0..chunks {
let mut packed = 0;
for bit_idx in 0..64 {
let i = bit_idx + chunk * 64;
packed |= (f(i) as u64) << bit_idx;
}
// SAFETY: Already allocated sufficient capacity
unsafe { buffer.push_unchecked(packed) }
}
if remainder != 0 {
let mut packed = 0;
for bit_idx in 0..remainder {
let i = bit_idx + chunks * 64;
packed |= (f(i) as u64) << bit_idx;
}
// SAFETY: Already allocated sufficient capacity
unsafe { buffer.push_unchecked(packed) }
}
BooleanArray::new(BooleanBuffer::new(buffer.into(), 0, len), None)
}};
}
macro_rules! handle_timestamp_array {
($ts_array:expr, $array_type:ty, $min:expr, $max:expr) => {{
let ts_array = $ts_array.as_any().downcast_ref::<$array_type>().unwrap();
let filter = create_filter_buffer!(ts_array, $min, $max);
let res = arrow::compute::filter(ts_array, &filter).context(error::ComputeArrowSnafu)?;
if res.is_empty() {
return Ok(None);
}
let i64array = res.as_any().downcast_ref::<$array_type>().unwrap();
// safety: we've checked res is not empty
let max_ts = arrow::compute::max(i64array).unwrap();
let min_ts = arrow::compute::min(i64array).unwrap();
(res, filter, min_ts, max_ts)
}};
}
/// Filters the given part according to min (inclusive) and max (exclusive) timestamp range.
/// Returns [None] if no matching rows.
pub fn filter_record_batch(part: &BulkPart, min: i64, max: i64) -> Result<Option<BulkPart>> {
let ts_array = part.timestamps();
let (ts_array, filter, min_ts, max_ts) = match ts_array.data_type() {
DataType::Timestamp(unit, _) => match unit {
arrow::datatypes::TimeUnit::Second => {
handle_timestamp_array!(ts_array, TimestampSecondArray, min, max)
}
arrow::datatypes::TimeUnit::Millisecond => {
handle_timestamp_array!(ts_array, TimestampMillisecondArray, min, max)
}
arrow::datatypes::TimeUnit::Microsecond => {
handle_timestamp_array!(ts_array, TimestampMicrosecondArray, min, max)
}
arrow::datatypes::TimeUnit::Nanosecond => {
handle_timestamp_array!(ts_array, TimestampNanosecondArray, min, max)
}
},
_ => {
unreachable!("Got data type: {:?}", ts_array.data_type());
}
};
let num_rows = ts_array.len();
let arrays = part
.batch
.columns()
.iter()
.enumerate()
.map(|(index, array)| {
if index == part.timestamp_index {
Ok(ts_array.clone())
} else {
arrow::compute::filter(&array, &filter).context(error::ComputeArrowSnafu)
}
})
.collect::<Result<Vec<_>>>()?;
let batch = RecordBatch::try_new_with_options(
part.batch.schema(),
arrays,
&RecordBatchOptions::default().with_row_count(Some(num_rows)),
)
.context(error::NewRecordBatchSnafu)?;
Ok(Some(BulkPart {
batch,
num_rows,
max_ts,
min_ts,
sequence: part.sequence,
timestamp_index: part.timestamp_index,
}))
}
type PartitionVec = SmallVec<[TimePartition; 2]>;
@@ -148,60 +285,46 @@ impl TimePartitions {
self.write_multi_parts(kvs, &parts)
}
pub fn write_bulk(&self, rb: BulkPart) -> Result<()> {
pub fn write_bulk(&self, part: BulkPart) -> Result<()> {
let time_type = self
.metadata
.time_index_column()
.column_schema
.data_type
.as_timestamp()
.unwrap();
// Get all parts.
let parts = self.list_partitions();
let mut matched = vec![];
for part in &parts {
let Some(part_time_range) = part.time_range.as_ref() else {
matched.push(part);
continue;
};
if !(rb.max_ts < part_time_range.min_timestamp.value()
|| rb.min_ts >= part_time_range.max_timestamp.value())
{
// find all intersecting time partitions.
matched.push(part);
}
let (matching_parts, missing_parts) = self.find_partitions_by_time_range(
part.timestamps(),
&parts,
time_type.create_timestamp(part.min_ts),
time_type.create_timestamp(part.max_ts),
)?;
if matching_parts.len() == 1 && missing_parts.is_empty() {
// fast path: all timestamps fall in one time partition.
return matching_parts[0].write_record_batch(part);
}
if !matched.is_empty() {
// fixme(hl): we now only write to the first time partition, we should strictly
// split the record batch according to time window
matched[0].write_record_batch(rb)
} else {
// safety: part_duration field must be set when reach here because otherwise
// matched won't be empty.
let part_duration = self.part_duration.unwrap();
let bulk_start_ts = self
.metadata
.time_index_column()
.column_schema
.data_type
.as_timestamp()
.unwrap()
.create_timestamp(rb.min_ts);
let part_start =
partition_start_timestamp(bulk_start_ts, part_duration).with_context(|| {
InvalidRequestSnafu {
region_id: self.metadata.region_id,
reason: format!(
"timestamp {bulk_start_ts:?} and bucket {part_duration:?} are out of range"
),
}
})?;
for matching in matching_parts {
matching.write_record_batch_partial(&part)?
}
for missing in missing_parts {
let new_part = {
let mut inner = self.inner.lock().unwrap();
self.create_time_partition(part_start, &mut inner)?
self.get_or_create_time_partition(missing, &mut inner)?
};
new_part.memtable.write_bulk(rb)
new_part.write_record_batch_partial(&part)?;
}
Ok(())
}
// Creates new parts and return the partition created.
// Creates or gets parts with given start timestamp.
// Acquires the lock to avoid others create the same partition.
fn create_time_partition(
fn get_or_create_time_partition(
&self,
part_start: Timestamp,
inner: &mut MutexGuard<PartitionsInner>,
@@ -377,6 +500,127 @@ impl TimePartitions {
inner.parts.clone()
}
/// Find existing partitions that match the bulk data's time range and identify
/// any new partitions that need to be created
fn find_partitions_by_time_range<'a>(
&self,
ts_array: &ArrayRef,
existing_parts: &'a [TimePartition],
min: Timestamp,
max: Timestamp,
) -> Result<(Vec<&'a TimePartition>, Vec<Timestamp>)> {
let mut matching = Vec::new();
let mut present = HashSet::new();
// First find any existing partitions that overlap
for part in existing_parts {
let Some(part_time_range) = part.time_range.as_ref() else {
matching.push(part);
return Ok((matching, Vec::new()));
};
if !(max < part_time_range.min_timestamp || min >= part_time_range.max_timestamp) {
matching.push(part);
present.insert(part_time_range.min_timestamp.value());
}
}
// safety: self.part_duration can only be present when reach here.
let part_duration = self.part_duration.unwrap();
let timestamp_unit = self.metadata.time_index_type().unit();
let part_duration_sec = part_duration.as_secs() as i64;
// SAFETY: Timestamps won't overflow when converting to Second.
let start_bucket = min
.convert_to(TimeUnit::Second)
.unwrap()
.value()
.div_euclid(part_duration_sec);
let end_bucket = max
.convert_to(TimeUnit::Second)
.unwrap()
.value()
.div_euclid(part_duration_sec);
let bucket_num = (end_bucket - start_bucket + 1) as usize;
let num_timestamps = ts_array.len();
let missing = if bucket_num <= num_timestamps {
(start_bucket..=end_bucket)
.filter_map(|start_sec| {
let Some(timestamp) = Timestamp::new_second(start_sec * part_duration_sec)
.convert_to(timestamp_unit)
else {
return Some(
InvalidRequestSnafu {
region_id: self.metadata.region_id,
reason: format!("Timestamp out of range: {}", start_sec),
}
.fail(),
);
};
if present.insert(timestamp.value()) {
Some(Ok(timestamp))
} else {
None
}
})
.collect::<Result<Vec<_>>>()?
} else {
let ts_primitive = match ts_array.data_type() {
DataType::Timestamp(unit, _) => match unit {
arrow::datatypes::TimeUnit::Second => ts_array
.as_any()
.downcast_ref::<TimestampSecondArray>()
.unwrap()
.reinterpret_cast::<Int64Type>(),
arrow::datatypes::TimeUnit::Millisecond => ts_array
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.reinterpret_cast::<Int64Type>(),
arrow::datatypes::TimeUnit::Microsecond => ts_array
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap()
.reinterpret_cast::<Int64Type>(),
arrow::datatypes::TimeUnit::Nanosecond => ts_array
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap()
.reinterpret_cast::<Int64Type>(),
},
_ => unreachable!(),
};
ts_primitive
.values()
.iter()
.filter_map(|ts| {
let ts = self.metadata.time_index_type().create_timestamp(*ts);
let Some(bucket_start) = ts
.convert_to(TimeUnit::Second)
.and_then(|ts| ts.align_by_bucket(part_duration_sec))
.and_then(|ts| ts.convert_to(timestamp_unit))
else {
return Some(
InvalidRequestSnafu {
region_id: self.metadata.region_id,
reason: format!("Timestamp out of range: {:?}", ts),
}
.fail(),
);
};
if present.insert(bucket_start.value()) {
Some(Ok(bucket_start))
} else {
None
}
})
.collect::<Result<Vec<_>>>()?
};
Ok((matching, missing))
}
/// Write to multiple partitions.
fn write_multi_parts(&self, kvs: &KeyValues, parts: &PartitionVec) -> Result<()> {
// If part duration is `None` then there is always one partition and all rows
@@ -436,7 +680,7 @@ impl TimePartitions {
// the same partition.
let mut inner = self.inner.lock().unwrap();
for (part_start, key_values) in missing_parts {
let partition = self.create_time_partition(part_start, &mut inner)?;
let partition = self.get_or_create_time_partition(part_start, &mut inner)?;
for kv in key_values {
partition.memtable.write_one(kv)?;
}
@@ -522,8 +766,20 @@ struct PartitionToWrite<'a> {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::SemanticType;
use datatypes::arrow::array::{ArrayRef, StringArray, TimestampMillisecondArray};
use datatypes::arrow::datatypes::{DataType, Field, Schema};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::SequenceNumber;
use super::*;
use crate::memtable::partition_tree::PartitionTreeMemtableBuilder;
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::test_util::memtable_util::{self, collect_iter_timestamps};
#[test]
@@ -745,4 +1001,334 @@ mod tests {
assert_eq!(3, new_parts.list_partitions()[0].memtable.id());
assert_eq!(4, new_parts.next_memtable_id());
}
#[test]
fn test_find_partitions_by_time_range() {
let metadata = memtable_util::metadata_for_test();
let builder = Arc::new(PartitionTreeMemtableBuilder::default());
// Case 1: No time range partitioning
let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
let parts = partitions.list_partitions();
let (matching, missing) = partitions
.find_partitions_by_time_range(
&(Arc::new(TimestampMillisecondArray::from_iter_values(1000..=2000)) as ArrayRef),
&parts,
Timestamp::new_millisecond(1000),
Timestamp::new_millisecond(2000),
)
.unwrap();
assert_eq!(matching.len(), 1);
assert!(missing.is_empty());
assert!(matching[0].time_range.is_none());
// Case 2: With time range partitioning
let partitions = TimePartitions::new(
metadata.clone(),
builder.clone(),
0,
Some(Duration::from_secs(5)),
);
// Create two existing partitions: [0, 5000) and [5000, 10000)
let kvs =
memtable_util::build_key_values(&metadata, "hello".to_string(), 0, &[2000, 4000], 0);
partitions.write(&kvs).unwrap();
let kvs =
memtable_util::build_key_values(&metadata, "hello".to_string(), 0, &[7000, 8000], 2);
partitions.write(&kvs).unwrap();
let parts = partitions.list_partitions();
assert_eq!(2, parts.len());
// Test case 2a: Query fully within existing partition
let (matching, missing) = partitions
.find_partitions_by_time_range(
&(Arc::new(TimestampMillisecondArray::from_iter_values(2000..=4000)) as ArrayRef),
&parts,
Timestamp::new_millisecond(2000),
Timestamp::new_millisecond(4000),
)
.unwrap();
assert_eq!(matching.len(), 1);
assert!(missing.is_empty());
assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 0);
// Test case 2b: Query spanning multiple existing partitions
let (matching, missing) = partitions
.find_partitions_by_time_range(
&(Arc::new(TimestampMillisecondArray::from_iter_values(3000..=8000)) as ArrayRef),
&parts,
Timestamp::new_millisecond(3000),
Timestamp::new_millisecond(8000),
)
.unwrap();
assert_eq!(matching.len(), 2);
assert!(missing.is_empty());
assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 0);
assert_eq!(matching[1].time_range.unwrap().min_timestamp.value(), 5000);
// Test case 2c: Query requiring new partition
let (matching, missing) = partitions
.find_partitions_by_time_range(
&(Arc::new(TimestampMillisecondArray::from_iter_values(12000..=13000)) as ArrayRef),
&parts,
Timestamp::new_millisecond(12000),
Timestamp::new_millisecond(13000),
)
.unwrap();
assert!(matching.is_empty());
assert_eq!(missing.len(), 1);
assert_eq!(missing[0].value(), 10000);
// Test case 2d: Query partially overlapping existing partition
let (matching, missing) = partitions
.find_partitions_by_time_range(
&(Arc::new(TimestampMillisecondArray::from_iter_values(4000..=6000)) as ArrayRef),
&parts,
Timestamp::new_millisecond(4000),
Timestamp::new_millisecond(6000),
)
.unwrap();
assert_eq!(matching.len(), 2);
assert!(missing.is_empty());
assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 0);
assert_eq!(matching[1].time_range.unwrap().min_timestamp.value(), 5000);
// Test case 2e: Corner case
let (matching, missing) = partitions
.find_partitions_by_time_range(
&(Arc::new(TimestampMillisecondArray::from_iter_values(4999..=5000)) as ArrayRef),
&parts,
Timestamp::new_millisecond(4999),
Timestamp::new_millisecond(5000),
)
.unwrap();
assert_eq!(matching.len(), 2);
assert!(missing.is_empty());
assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 0);
assert_eq!(matching[1].time_range.unwrap().min_timestamp.value(), 5000);
// Test case 2f: Corner case with
let (matching, missing) = partitions
.find_partitions_by_time_range(
&(Arc::new(TimestampMillisecondArray::from_iter_values(9999..=10000)) as ArrayRef),
&parts,
Timestamp::new_millisecond(9999),
Timestamp::new_millisecond(10000),
)
.unwrap();
assert_eq!(matching.len(), 1);
assert_eq!(1, missing.len());
assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 5000);
assert_eq!(missing[0].value(), 10000);
// Test case 2g: Cross 0
let (matching, missing) = partitions
.find_partitions_by_time_range(
&(Arc::new(TimestampMillisecondArray::from_iter_values(-1000..=1000)) as ArrayRef),
&parts,
Timestamp::new_millisecond(-1000),
Timestamp::new_millisecond(1000),
)
.unwrap();
assert_eq!(matching.len(), 1);
assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 0);
assert_eq!(1, missing.len());
assert_eq!(missing[0].value(), -5000);
// Test case 3: sparse data
let (matching, missing) = partitions
.find_partitions_by_time_range(
&(Arc::new(TimestampMillisecondArray::from(vec![
-100000000000,
0,
100000000000,
])) as ArrayRef),
&parts,
Timestamp::new_millisecond(-100000000000),
Timestamp::new_millisecond(100000000000),
)
.unwrap();
assert_eq!(2, matching.len());
assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 0);
assert_eq!(matching[1].time_range.unwrap().min_timestamp.value(), 5000);
assert_eq!(2, missing.len());
assert_eq!(missing[0].value(), -100000000000);
assert_eq!(missing[1].value(), 100000000000);
}
fn build_part(ts: &[i64], sequence: SequenceNumber) -> BulkPart {
let schema = Arc::new(Schema::new(vec![
Field::new(
"ts",
arrow::datatypes::DataType::Timestamp(
arrow::datatypes::TimeUnit::Millisecond,
None,
),
false,
),
Field::new("val", DataType::Utf8, true),
]));
let ts_data = ts.to_vec();
let ts_array = Arc::new(TimestampMillisecondArray::from(ts_data));
let val_array = Arc::new(StringArray::from_iter_values(
ts.iter().map(|v| v.to_string()),
));
let batch = RecordBatch::try_new(
schema,
vec![ts_array.clone() as ArrayRef, val_array.clone() as ArrayRef],
)
.unwrap();
let max_ts = ts.iter().max().copied().unwrap();
let min_ts = ts.iter().min().copied().unwrap();
BulkPart {
batch,
num_rows: ts.len(),
max_ts,
min_ts,
sequence,
timestamp_index: 0,
}
}
#[test]
fn test_write_bulk() {
let mut metadata_builder = RegionMetadataBuilder::new(0.into());
metadata_builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 0,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("val", ConcreteDataType::string_datatype(), false),
semantic_type: SemanticType::Field,
column_id: 1,
})
.primary_key(vec![]);
let metadata = Arc::new(metadata_builder.build().unwrap());
let builder = Arc::new(TimeSeriesMemtableBuilder::default());
let partitions = TimePartitions::new(
metadata.clone(),
builder.clone(),
0,
Some(Duration::from_secs(5)),
);
// Test case 1: Write to single partition
partitions
.write_bulk(build_part(&[1000, 2000, 3000], 0))
.unwrap();
let parts = partitions.list_partitions();
assert_eq!(1, parts.len());
let iter = parts[0].memtable.iter(None, None, None).unwrap();
let timestamps = collect_iter_timestamps(iter);
assert_eq!(&[1000, 2000, 3000], &timestamps[..]);
// Test case 2: Write across multiple existing partitions
partitions
.write_bulk(build_part(&[4000, 5000, 6000], 1))
.unwrap();
let parts = partitions.list_partitions();
assert_eq!(2, parts.len());
// Check first partition [0, 5000)
let iter = parts[0].memtable.iter(None, None, None).unwrap();
let timestamps = collect_iter_timestamps(iter);
assert_eq!(&[1000, 2000, 3000, 4000], &timestamps[..]);
// Check second partition [5000, 10000)
let iter = parts[1].memtable.iter(None, None, None).unwrap();
let timestamps = collect_iter_timestamps(iter);
assert_eq!(&[5000, 6000], &timestamps[..]);
// Test case 3: Write requiring new partition
partitions
.write_bulk(build_part(&[11000, 12000], 3))
.unwrap();
let parts = partitions.list_partitions();
assert_eq!(3, parts.len());
// Check new partition [10000, 15000)
let iter = parts[2].memtable.iter(None, None, None).unwrap();
let timestamps = collect_iter_timestamps(iter);
assert_eq!(&[11000, 12000], &timestamps[..]);
// Test case 4: Write with no time range partitioning
let partitions = TimePartitions::new(metadata.clone(), builder, 3, None);
partitions
.write_bulk(build_part(&[1000, 5000, 9000], 4))
.unwrap();
let parts = partitions.list_partitions();
assert_eq!(1, parts.len());
let iter = parts[0].memtable.iter(None, None, None).unwrap();
let timestamps = collect_iter_timestamps(iter);
assert_eq!(&[1000, 5000, 9000], &timestamps[..]);
}
#[test]
fn test_split_record_batch() {
let schema = Arc::new(Schema::new(vec![
Field::new(
"ts",
DataType::Timestamp(TimeUnit::Millisecond.as_arrow_time_unit(), None),
false,
),
Field::new("val", DataType::Utf8, true),
]));
let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
1000, 2000, 5000, 7000, 8000,
]));
let val_array = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![ts_array as ArrayRef, val_array as ArrayRef],
)
.unwrap();
let part = BulkPart {
batch,
num_rows: 5,
max_ts: 8000,
min_ts: 1000,
sequence: 0,
timestamp_index: 0,
};
let result = filter_record_batch(&part, 1000, 2000).unwrap();
assert!(result.is_some());
let filtered = result.unwrap();
assert_eq!(filtered.num_rows, 1);
assert_eq!(filtered.min_ts, 1000);
assert_eq!(filtered.max_ts, 1000);
// Test splitting with range [3000, 6000)
let result = filter_record_batch(&part, 3000, 6000).unwrap();
assert!(result.is_some());
let filtered = result.unwrap();
assert_eq!(filtered.num_rows, 1);
assert_eq!(filtered.min_ts, 5000);
assert_eq!(filtered.max_ts, 5000);
// Test splitting with range that includes no points
let result = filter_record_batch(&part, 3000, 4000).unwrap();
assert!(result.is_none());
// Test splitting with range that includes all points
let result = filter_record_batch(&part, 0, 9000).unwrap();
assert!(result.is_some());
let filtered = result.unwrap();
assert_eq!(filtered.num_rows, 5);
assert_eq!(filtered.min_ts, 1000);
assert_eq!(filtered.max_ts, 8000);
}
}

View File

@@ -43,8 +43,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let batch = request.payload;
let num_rows = batch.num_rows();
let Some(ts) =
batch.column_by_name(&region_metadata.time_index_column().column_schema.name)
let Some((ts_index, ts)) = batch
.schema()
.column_with_name(&region_metadata.time_index_column().column_schema.name)
.map(|(index, _)| (index, batch.column(index)))
else {
sender.send(
error::InvalidRequestSnafu {
@@ -115,6 +117,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
max_ts,
min_ts,
sequence: 0,
timestamp_index: ts_index,
};
pending_bulk_request.push(SenderBulkRequest {
sender,

View File

@@ -30,6 +30,7 @@ use common_macro::stack_trace_debug;
use datatypes::arrow;
use datatypes::arrow::datatypes::FieldRef;
use datatypes::schema::{ColumnSchema, FulltextOptions, Schema, SchemaRef, SkippingIndexOptions};
use datatypes::types::TimestampType;
use serde::de::Error;
use serde::{Deserialize, Deserializer, Serialize};
use snafu::{ensure, Location, OptionExt, ResultExt, Snafu};
@@ -241,6 +242,19 @@ impl RegionMetadata {
&self.column_metadatas[index]
}
/// Returns timestamp type of time index column
///
/// # Panics
/// Panics if the time index column id is invalid.
pub fn time_index_type(&self) -> TimestampType {
let index = self.id_to_index[&self.time_index];
self.column_metadatas[index]
.column_schema
.data_type
.as_timestamp()
.unwrap()
}
/// Returns the position of the time index.
pub fn time_index_column_pos(&self) -> usize {
self.id_to_index[&self.time_index]