mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-22 16:00:38 +00:00
Merge branch 'main' into fix/parameter-datetime
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -7886,6 +7886,7 @@ dependencies = [
|
||||
"common-base",
|
||||
"common-error",
|
||||
"common-function",
|
||||
"common-grpc",
|
||||
"common-macro",
|
||||
"common-meta",
|
||||
"common-query",
|
||||
|
||||
@@ -17,6 +17,7 @@ bytes.workspace = true
|
||||
fxhash = "0.2"
|
||||
common-base.workspace = true
|
||||
common-error.workspace = true
|
||||
common-grpc.workspace = true
|
||||
common-macro.workspace = true
|
||||
common-query.workspace = true
|
||||
common-recordbatch.workspace = true
|
||||
|
||||
426
src/metric-engine/src/batch_modifier.rs
Normal file
426
src/metric-engine/src/batch_modifier.rs
Normal file
@@ -0,0 +1,426 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::hash::Hasher;
|
||||
use std::sync::Arc;
|
||||
|
||||
use datatypes::arrow::array::{Array, BinaryBuilder, StringArray, UInt64Array};
|
||||
use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use datatypes::value::ValueRef;
|
||||
use fxhash::FxHasher;
|
||||
use mito_codec::row_converter::SparsePrimaryKeyCodec;
|
||||
use snafu::ResultExt;
|
||||
use store_api::storage::ColumnId;
|
||||
use store_api::storage::consts::{PRIMARY_KEY_COLUMN_NAME, ReservedColumnId};
|
||||
|
||||
use crate::error::{EncodePrimaryKeySnafu, Result, UnexpectedRequestSnafu};
|
||||
|
||||
/// Info about a tag column for TSID computation and sparse primary key encoding.
|
||||
#[allow(dead_code)]
|
||||
pub(crate) struct TagColumnInfo {
|
||||
/// Column name (used for label-name hash).
|
||||
pub name: String,
|
||||
/// Column index in the RecordBatch.
|
||||
pub index: usize,
|
||||
/// Column ID in the physical region.
|
||||
pub column_id: ColumnId,
|
||||
}
|
||||
|
||||
/// Computes `__tsid` values for each row.
|
||||
#[allow(dead_code)]
|
||||
pub(crate) fn compute_tsid_array(
|
||||
batch: &RecordBatch,
|
||||
sorted_tag_columns: &[TagColumnInfo],
|
||||
tag_arrays: &[&StringArray],
|
||||
) -> UInt64Array {
|
||||
let num_rows = batch.num_rows();
|
||||
|
||||
let label_name_hash = {
|
||||
let mut hasher = FxHasher::default();
|
||||
for tag_col in sorted_tag_columns {
|
||||
hasher.write(tag_col.name.as_bytes());
|
||||
hasher.write_u8(0xff);
|
||||
}
|
||||
hasher.finish()
|
||||
};
|
||||
|
||||
let mut tsid_values = Vec::with_capacity(num_rows);
|
||||
for row in 0..num_rows {
|
||||
let has_null = tag_arrays.iter().any(|arr| arr.is_null(row));
|
||||
|
||||
let tsid = if !has_null {
|
||||
let mut hasher = FxHasher::default();
|
||||
hasher.write_u64(label_name_hash);
|
||||
for arr in tag_arrays {
|
||||
hasher.write(arr.value(row).as_bytes());
|
||||
hasher.write_u8(0xff);
|
||||
}
|
||||
hasher.finish()
|
||||
} else {
|
||||
let mut name_hasher = FxHasher::default();
|
||||
for (tc, arr) in sorted_tag_columns.iter().zip(tag_arrays.iter()) {
|
||||
if !arr.is_null(row) {
|
||||
name_hasher.write(tc.name.as_bytes());
|
||||
name_hasher.write_u8(0xff);
|
||||
}
|
||||
}
|
||||
let row_label_hash = name_hasher.finish();
|
||||
|
||||
let mut val_hasher = FxHasher::default();
|
||||
val_hasher.write_u64(row_label_hash);
|
||||
for arr in tag_arrays {
|
||||
if !arr.is_null(row) {
|
||||
val_hasher.write(arr.value(row).as_bytes());
|
||||
val_hasher.write_u8(0xff);
|
||||
}
|
||||
}
|
||||
val_hasher.finish()
|
||||
};
|
||||
|
||||
tsid_values.push(tsid);
|
||||
}
|
||||
|
||||
UInt64Array::from(tsid_values)
|
||||
}
|
||||
|
||||
fn build_tag_arrays<'a>(
|
||||
batch: &'a RecordBatch,
|
||||
sorted_tag_columns: &[TagColumnInfo],
|
||||
) -> Vec<&'a StringArray> {
|
||||
sorted_tag_columns
|
||||
.iter()
|
||||
.map(|tc| {
|
||||
batch
|
||||
.column(tc.index)
|
||||
.as_any()
|
||||
.downcast_ref::<StringArray>()
|
||||
.expect("tag column must be utf8")
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Modifies a RecordBatch for sparse primary key encoding.
|
||||
#[allow(dead_code)]
|
||||
pub(crate) fn modify_batch_sparse(
|
||||
batch: RecordBatch,
|
||||
table_id: u32,
|
||||
sorted_tag_columns: &[TagColumnInfo],
|
||||
non_tag_column_indices: &[usize],
|
||||
) -> Result<RecordBatch> {
|
||||
let num_rows = batch.num_rows();
|
||||
let codec = SparsePrimaryKeyCodec::schemaless();
|
||||
let tag_arrays: Vec<&StringArray> = build_tag_arrays(&batch, sorted_tag_columns);
|
||||
let tsid_array = compute_tsid_array(&batch, sorted_tag_columns, &tag_arrays);
|
||||
|
||||
let mut pk_builder = BinaryBuilder::with_capacity(num_rows, 0);
|
||||
let mut buffer = Vec::new();
|
||||
for row in 0..num_rows {
|
||||
buffer.clear();
|
||||
let internal = [
|
||||
(ReservedColumnId::table_id(), ValueRef::UInt32(table_id)),
|
||||
(
|
||||
ReservedColumnId::tsid(),
|
||||
ValueRef::UInt64(tsid_array.value(row)),
|
||||
),
|
||||
];
|
||||
codec
|
||||
.encode_to_vec(internal.into_iter(), &mut buffer)
|
||||
.context(EncodePrimaryKeySnafu)?;
|
||||
|
||||
let tags = sorted_tag_columns
|
||||
.iter()
|
||||
.zip(tag_arrays.iter())
|
||||
.filter(|(_, arr)| !arr.is_null(row))
|
||||
.map(|(tc, arr)| (tc.column_id, ValueRef::String(arr.value(row))));
|
||||
codec
|
||||
.encode_to_vec(tags, &mut buffer)
|
||||
.context(EncodePrimaryKeySnafu)?;
|
||||
|
||||
pk_builder.append_value(&buffer);
|
||||
}
|
||||
|
||||
let pk_array = pk_builder.finish();
|
||||
|
||||
let mut fields = vec![Arc::new(Field::new(
|
||||
PRIMARY_KEY_COLUMN_NAME,
|
||||
DataType::Binary,
|
||||
false,
|
||||
))];
|
||||
let mut columns: Vec<Arc<dyn Array>> = vec![Arc::new(pk_array)];
|
||||
|
||||
for &idx in non_tag_column_indices {
|
||||
fields.push(batch.schema().fields()[idx].clone());
|
||||
columns.push(batch.column(idx).clone());
|
||||
}
|
||||
|
||||
let new_schema = Arc::new(ArrowSchema::new(fields));
|
||||
RecordBatch::try_new(new_schema, columns).map_err(|e| {
|
||||
UnexpectedRequestSnafu {
|
||||
reason: format!("Failed to build modified sparse RecordBatch: {e}"),
|
||||
}
|
||||
.build()
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use api::v1::value::ValueData;
    use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
    use datatypes::arrow::array::{BinaryArray, Float64Array, Int64Array, StringArray};
    use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
    use datatypes::arrow::record_batch::RecordBatch;
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;

    use super::*;
    use crate::row_modifier::{RowModifier, RowsIter, TableIdInput};

    /// Shorthand constructor for a [`TagColumnInfo`].
    fn tag(name: &str, index: usize, column_id: ColumnId) -> TagColumnInfo {
        TagColumnInfo {
            name: name.to_string(),
            index,
            column_id,
        }
    }

    /// Builds a batch made only of nullable Utf8 columns.
    fn utf8_batch(columns: Vec<(&str, StringArray)>) -> RecordBatch {
        let mut fields = Vec::with_capacity(columns.len());
        let mut arrays: Vec<Arc<dyn Array>> = Vec::with_capacity(columns.len());
        for (name, array) in columns {
            fields.push(Field::new(name, DataType::Utf8, true));
            arrays.push(Arc::new(array));
        }
        RecordBatch::try_new(Arc::new(ArrowSchema::new(fields)), arrays).unwrap()
    }

    /// Shorthand for a gRPC column schema with default extension/options.
    fn column_schema(
        name: &str,
        datatype: ColumnDataType,
        semantic_type: SemanticType,
    ) -> ColumnSchema {
        ColumnSchema {
            column_name: name.to_string(),
            datatype: datatype as i32,
            semantic_type: semantic_type as i32,
            ..Default::default()
        }
    }

    /// Wraps a `ValueData` into a non-null gRPC `Value`.
    fn value(data: ValueData) -> Value {
        Value {
            value_data: Some(data),
        }
    }

    /// One-row batch: timestamp, value, and the tags `namespace` and `host`.
    fn build_sparse_test_batch() -> RecordBatch {
        let fields = vec![
            Field::new("greptime_timestamp", DataType::Int64, false),
            Field::new("greptime_value", DataType::Float64, true),
            Field::new("namespace", DataType::Utf8, true),
            Field::new("host", DataType::Utf8, true),
        ];
        let columns: Vec<Arc<dyn Array>> = vec![
            Arc::new(Int64Array::from(vec![1000])),
            Arc::new(Float64Array::from(vec![42.0])),
            Arc::new(StringArray::from(vec!["greptimedb"])),
            Arc::new(StringArray::from(vec!["127.0.0.1"])),
        ];
        RecordBatch::try_new(Arc::new(ArrowSchema::new(fields)), columns).unwrap()
    }

    /// Tag columns of `build_sparse_test_batch`, sorted by name.
    fn sparse_tag_columns() -> Vec<TagColumnInfo> {
        vec![tag("host", 3, 3), tag("namespace", 2, 2)]
    }

    #[test]
    fn test_compute_tsid_basic() {
        let batch = utf8_batch(vec![
            ("namespace", StringArray::from(vec!["greptimedb"])),
            ("host", StringArray::from(vec!["127.0.0.1"])),
        ]);
        let tag_columns = vec![tag("host", 1, 2), tag("namespace", 0, 1)];

        let tag_arrays = build_tag_arrays(&batch, &tag_columns);
        let tsid_array = compute_tsid_array(&batch, &tag_columns, &tag_arrays);

        assert_eq!(tsid_array.value(0), 2721566936019240841);
    }

    #[test]
    fn test_compute_tsid_with_nulls() {
        // Two tags, both present.
        let batch_no_null = utf8_batch(vec![
            ("a", StringArray::from(vec!["A"])),
            ("b", StringArray::from(vec!["B"])),
        ]);
        let tag_cols_2 = vec![tag("a", 0, 1), tag("b", 1, 2)];
        let tag_arrays_2 = build_tag_arrays(&batch_no_null, &tag_cols_2);
        let tsid_no_null = compute_tsid_array(&batch_no_null, &tag_cols_2, &tag_arrays_2);

        // Same two tags plus a third one that is null in the row.
        let batch_with_null = utf8_batch(vec![
            ("a", StringArray::from(vec!["A"])),
            ("b", StringArray::from(vec!["B"])),
            ("c", StringArray::from(vec![None as Option<&str>])),
        ]);
        let tag_cols_3 = vec![tag("a", 0, 1), tag("b", 1, 2), tag("c", 2, 3)];
        let tag_arrays_3 = build_tag_arrays(&batch_with_null, &tag_cols_3);
        let tsid_with_null = compute_tsid_array(&batch_with_null, &tag_cols_3, &tag_arrays_3);

        // A null tag must not influence the TSID.
        assert_eq!(tsid_no_null.value(0), tsid_with_null.value(0));
    }

    #[test]
    fn test_modify_batch_sparse() {
        let table_id: u32 = 1025;
        let non_tag_indices = vec![0, 1];
        let modified = modify_batch_sparse(
            build_sparse_test_batch(),
            table_id,
            &sparse_tag_columns(),
            &non_tag_indices,
        )
        .unwrap();

        assert_eq!(modified.num_columns(), 3);
        let schema = modified.schema();
        assert_eq!(schema.field(0).name(), PRIMARY_KEY_COLUMN_NAME);
        assert_eq!(schema.field(1).name(), "greptime_timestamp");
        assert_eq!(schema.field(2).name(), "greptime_value");
    }

    #[test]
    fn test_modify_batch_sparse_matches_row_modifier() {
        let table_id: u32 = 1025;
        let non_tag_indices = vec![0, 1];
        let modified = modify_batch_sparse(
            build_sparse_test_batch(),
            table_id,
            &sparse_tag_columns(),
            &non_tag_indices,
        )
        .unwrap();

        let mut name_to_column_id: HashMap<String, ColumnId> = HashMap::new();
        name_to_column_id.insert("greptime_timestamp".to_string(), 0);
        name_to_column_id.insert("greptime_value".to_string(), 1);
        name_to_column_id.insert("namespace".to_string(), 2);
        name_to_column_id.insert("host".to_string(), 3);

        // The same logical row expressed through the row-based write path.
        let rows = Rows {
            schema: vec![
                column_schema(
                    "greptime_timestamp",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Timestamp,
                ),
                column_schema(
                    "greptime_value",
                    ColumnDataType::Float64,
                    SemanticType::Field,
                ),
                column_schema("namespace", ColumnDataType::String, SemanticType::Tag),
                column_schema("host", ColumnDataType::String, SemanticType::Tag),
            ],
            rows: vec![Row {
                values: vec![
                    value(ValueData::TimestampMillisecondValue(1000)),
                    value(ValueData::F64Value(42.0)),
                    value(ValueData::StringValue("greptimedb".to_string())),
                    value(ValueData::StringValue("127.0.0.1".to_string())),
                ],
            }],
        };

        let row_iter = RowsIter::new(rows, &name_to_column_id);
        let rows = RowModifier::default()
            .modify_rows(
                row_iter,
                TableIdInput::Single(table_id),
                PrimaryKeyEncoding::Sparse,
            )
            .unwrap();
        let expected_pk = match rows.rows[0].values[0].value_data.clone().unwrap() {
            ValueData::BinaryValue(bytes) => bytes,
            _ => panic!("expected binary primary key"),
        };

        let actual_array = modified
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        assert_eq!(actual_array.value(0), expected_pk.as_slice());
    }
}
|
||||
@@ -13,6 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
mod alter;
|
||||
mod bulk_insert;
|
||||
mod catchup;
|
||||
mod close;
|
||||
mod create;
|
||||
@@ -288,9 +289,8 @@ impl RegionEngine for MetricEngine {
|
||||
debug_assert_eq!(region_id, resp_region_id);
|
||||
return response;
|
||||
}
|
||||
RegionRequest::BulkInserts(_) => {
|
||||
// todo(hl): find a way to support bulk inserts in metric engine.
|
||||
UnsupportedRegionRequestSnafu { request }.fail()
|
||||
RegionRequest::BulkInserts(bulk) => {
|
||||
self.inner.bulk_insert_region(region_id, bulk).await
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
783
src/metric-engine/src/engine/bulk_insert.rs
Normal file
783
src/metric-engine/src/engine/bulk_insert.rs
Normal file
@@ -0,0 +1,783 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashSet;
|
||||
|
||||
use api::v1::{ArrowIpc, ColumnDataType, SemanticType};
|
||||
use bytes::Bytes;
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_grpc::flight::{FlightEncoder, FlightMessage};
|
||||
use common_query::prelude::{greptime_timestamp, greptime_value};
|
||||
use datatypes::arrow::array::{Array, Float64Array, StringArray, TimestampMillisecondArray};
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use snafu::{OptionExt, ensure};
|
||||
use store_api::codec::PrimaryKeyEncoding;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::region_request::{
|
||||
AffectedRows, RegionBulkInsertsRequest, RegionPutRequest, RegionRequest,
|
||||
};
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::batch_modifier::{TagColumnInfo, modify_batch_sparse};
|
||||
use crate::engine::MetricEngineInner;
|
||||
use crate::error;
|
||||
use crate::error::Result;
|
||||
|
||||
impl MetricEngineInner {
|
||||
/// Bulk-inserts logical rows into a metric region.
|
||||
///
|
||||
/// This method accepts a `RegionBulkInsertsRequest` whose payload is a logical
|
||||
/// `RecordBatch` (timestamp, value and tag columns) for the given logical `region_id`.
|
||||
///
|
||||
/// The transformed batch is encoded to Arrow IPC and forwarded as a `BulkInserts`
|
||||
/// request to the data region, along with the original `partition_expr_version`.
|
||||
/// If the data region reports `StatusCode::Unsupported` for bulk inserts, the request
|
||||
/// is transparently retried as a `Put` by converting the original logical batch into
|
||||
/// `api::v1::Rows`, so callers observe the same semantics as `put_region`.
|
||||
///
|
||||
/// Returns the number of affected rows, or `0` if the input batch is empty.
|
||||
pub async fn bulk_insert_region(
|
||||
&self,
|
||||
region_id: RegionId,
|
||||
request: RegionBulkInsertsRequest,
|
||||
) -> Result<AffectedRows> {
|
||||
ensure!(
|
||||
!self.is_physical_region(region_id),
|
||||
error::UnsupportedRegionRequestSnafu {
|
||||
request: RegionRequest::BulkInserts(request),
|
||||
}
|
||||
);
|
||||
|
||||
let (physical_region_id, data_region_id, primary_key_encoding) =
|
||||
self.find_data_region_meta(region_id)?;
|
||||
|
||||
if primary_key_encoding != PrimaryKeyEncoding::Sparse {
|
||||
return error::UnsupportedRegionRequestSnafu {
|
||||
request: RegionRequest::BulkInserts(request),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
|
||||
let batch = request.payload;
|
||||
if batch.num_rows() == 0 {
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
let logical_metadata = self
|
||||
.logical_region_metadata(physical_region_id, region_id)
|
||||
.await?;
|
||||
let (tag_columns, non_tag_indices) = self.resolve_tag_columns_from_metadata(
|
||||
region_id,
|
||||
data_region_id,
|
||||
&batch,
|
||||
&logical_metadata,
|
||||
)?;
|
||||
let modified_batch = modify_batch_sparse(
|
||||
batch.clone(),
|
||||
region_id.table_id(),
|
||||
&tag_columns,
|
||||
&non_tag_indices,
|
||||
)?;
|
||||
let (schema, data_header, payload) = record_batch_to_ipc(&modified_batch)?;
|
||||
|
||||
let partition_expr_version = request.partition_expr_version;
|
||||
let request = RegionBulkInsertsRequest {
|
||||
region_id: data_region_id,
|
||||
payload: modified_batch,
|
||||
raw_data: ArrowIpc {
|
||||
schema,
|
||||
data_header,
|
||||
payload,
|
||||
},
|
||||
partition_expr_version,
|
||||
};
|
||||
match self
|
||||
.data_region
|
||||
.write_data(data_region_id, RegionRequest::BulkInserts(request))
|
||||
.await
|
||||
{
|
||||
Ok(affected_rows) => Ok(affected_rows),
|
||||
Err(err) if err.status_code() == StatusCode::Unsupported => {
|
||||
// todo(hl): fallback path for PartitionTreeMemtable, remove this once we remove it
|
||||
let rows = record_batch_to_rows(&batch, region_id)?;
|
||||
self.put_region(
|
||||
region_id,
|
||||
RegionPutRequest {
|
||||
rows,
|
||||
hint: None,
|
||||
partition_expr_version,
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
|
||||
fn resolve_tag_columns_from_metadata(
|
||||
&self,
|
||||
logical_region_id: RegionId,
|
||||
data_region_id: RegionId,
|
||||
batch: &RecordBatch,
|
||||
logical_metadata: &RegionMetadataRef,
|
||||
) -> Result<(Vec<TagColumnInfo>, Vec<usize>)> {
|
||||
let tag_names: HashSet<&str> = logical_metadata
|
||||
.column_metadatas
|
||||
.iter()
|
||||
.filter_map(|column| {
|
||||
if column.semantic_type == SemanticType::Tag {
|
||||
Some(column.column_schema.name.as_str())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
let mut tag_columns = Vec::new();
|
||||
let mut non_tag_indices = Vec::new();
|
||||
{
|
||||
let state = self.state.read().unwrap();
|
||||
let physical_columns = state
|
||||
.physical_region_states()
|
||||
.get(&data_region_id)
|
||||
.context(error::PhysicalRegionNotFoundSnafu {
|
||||
region_id: data_region_id,
|
||||
})?
|
||||
.physical_columns();
|
||||
|
||||
for (index, field) in batch.schema().fields().iter().enumerate() {
|
||||
let name = field.name();
|
||||
let column_id =
|
||||
*physical_columns
|
||||
.get(name)
|
||||
.with_context(|| error::ColumnNotFoundSnafu {
|
||||
name: name.clone(),
|
||||
region_id: logical_region_id,
|
||||
})?;
|
||||
if tag_names.contains(name.as_str()) {
|
||||
tag_columns.push(TagColumnInfo {
|
||||
name: name.clone(),
|
||||
index,
|
||||
column_id,
|
||||
});
|
||||
} else {
|
||||
non_tag_indices.push(index);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tag_columns.sort_by(|a, b| a.name.cmp(&b.name));
|
||||
Ok((tag_columns, non_tag_indices))
|
||||
}
|
||||
}
|
||||
|
||||
fn record_batch_to_rows(batch: &RecordBatch, logical_region_id: RegionId) -> Result<api::v1::Rows> {
|
||||
let schema_ref = batch.schema();
|
||||
let fields = schema_ref.fields();
|
||||
|
||||
let mut ts_idx = None;
|
||||
let mut val_idx = None;
|
||||
let mut tag_indices = Vec::new();
|
||||
|
||||
for (idx, field) in fields.iter().enumerate() {
|
||||
if field.name() == greptime_timestamp() {
|
||||
ts_idx = Some(idx);
|
||||
if !matches!(
|
||||
field.data_type(),
|
||||
datatypes::arrow::datatypes::DataType::Timestamp(
|
||||
datatypes::arrow::datatypes::TimeUnit::Millisecond,
|
||||
_
|
||||
)
|
||||
) {
|
||||
return error::UnexpectedRequestSnafu {
|
||||
reason: format!(
|
||||
"Timestamp column '{}' in region {:?} has incompatible type: {:?}",
|
||||
field.name(),
|
||||
logical_region_id,
|
||||
field.data_type()
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
} else if field.name() == greptime_value() {
|
||||
val_idx = Some(idx);
|
||||
if !matches!(
|
||||
field.data_type(),
|
||||
datatypes::arrow::datatypes::DataType::Float64
|
||||
) {
|
||||
return error::UnexpectedRequestSnafu {
|
||||
reason: format!(
|
||||
"Value column '{}' in region {:?} has incompatible type: {:?}",
|
||||
field.name(),
|
||||
logical_region_id,
|
||||
field.data_type()
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
} else {
|
||||
if !matches!(
|
||||
field.data_type(),
|
||||
datatypes::arrow::datatypes::DataType::Utf8
|
||||
) {
|
||||
return error::UnexpectedRequestSnafu {
|
||||
reason: format!(
|
||||
"Tag column '{}' in region {:?} must be Utf8, found: {:?}",
|
||||
field.name(),
|
||||
logical_region_id,
|
||||
field.data_type()
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
tag_indices.push(idx);
|
||||
}
|
||||
}
|
||||
|
||||
let ts_idx = ts_idx.with_context(|| error::UnexpectedRequestSnafu {
|
||||
reason: format!(
|
||||
"Timestamp column '{}' not found in RecordBatch for region {:?}",
|
||||
greptime_timestamp(),
|
||||
logical_region_id
|
||||
),
|
||||
})?;
|
||||
let val_idx = val_idx.with_context(|| error::UnexpectedRequestSnafu {
|
||||
reason: format!(
|
||||
"Value column '{}' not found in RecordBatch for region {:?}",
|
||||
greptime_value(),
|
||||
logical_region_id
|
||||
),
|
||||
})?;
|
||||
|
||||
let mut schema = Vec::with_capacity(2 + tag_indices.len());
|
||||
schema.push(api::v1::ColumnSchema {
|
||||
column_name: greptime_timestamp().to_string(),
|
||||
datatype: ColumnDataType::TimestampMillisecond as i32,
|
||||
semantic_type: SemanticType::Timestamp as i32,
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
});
|
||||
schema.push(api::v1::ColumnSchema {
|
||||
column_name: greptime_value().to_string(),
|
||||
datatype: ColumnDataType::Float64 as i32,
|
||||
semantic_type: SemanticType::Field as i32,
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
});
|
||||
for &idx in &tag_indices {
|
||||
let field = &fields[idx];
|
||||
schema.push(api::v1::ColumnSchema {
|
||||
column_name: field.name().clone(),
|
||||
datatype: ColumnDataType::String as i32,
|
||||
semantic_type: SemanticType::Tag as i32,
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
});
|
||||
}
|
||||
|
||||
let ts_array = batch
|
||||
.column(ts_idx)
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMillisecondArray>()
|
||||
.expect("validated as TimestampMillisecond");
|
||||
let val_array = batch
|
||||
.column(val_idx)
|
||||
.as_any()
|
||||
.downcast_ref::<Float64Array>()
|
||||
.expect("validated as Float64");
|
||||
let tag_arrays: Vec<&StringArray> = tag_indices
|
||||
.iter()
|
||||
.map(|&idx| {
|
||||
batch
|
||||
.column(idx)
|
||||
.as_any()
|
||||
.downcast_ref::<StringArray>()
|
||||
.expect("validated as Utf8")
|
||||
})
|
||||
.collect();
|
||||
|
||||
let num_rows = batch.num_rows();
|
||||
let mut rows = Vec::with_capacity(num_rows);
|
||||
for row_idx in 0..num_rows {
|
||||
let mut values = Vec::with_capacity(2 + tag_arrays.len());
|
||||
|
||||
if ts_array.is_null(row_idx) {
|
||||
values.push(api::v1::Value { value_data: None });
|
||||
} else {
|
||||
values.push(api::v1::Value {
|
||||
value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
|
||||
ts_array.value(row_idx),
|
||||
)),
|
||||
});
|
||||
}
|
||||
|
||||
if val_array.is_null(row_idx) {
|
||||
values.push(api::v1::Value { value_data: None });
|
||||
} else {
|
||||
values.push(api::v1::Value {
|
||||
value_data: Some(api::v1::value::ValueData::F64Value(
|
||||
val_array.value(row_idx),
|
||||
)),
|
||||
});
|
||||
}
|
||||
|
||||
for arr in &tag_arrays {
|
||||
if arr.is_null(row_idx) {
|
||||
values.push(api::v1::Value { value_data: None });
|
||||
} else {
|
||||
values.push(api::v1::Value {
|
||||
value_data: Some(api::v1::value::ValueData::StringValue(
|
||||
arr.value(row_idx).to_string(),
|
||||
)),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
rows.push(api::v1::Row { values });
|
||||
}
|
||||
|
||||
Ok(api::v1::Rows { schema, rows })
|
||||
}
|
||||
|
||||
fn record_batch_to_ipc(record_batch: &RecordBatch) -> Result<(Bytes, Bytes, Bytes)> {
|
||||
let mut encoder = FlightEncoder::default();
|
||||
let schema = encoder.encode_schema(record_batch.schema().as_ref());
|
||||
let mut iter = encoder
|
||||
.encode(FlightMessage::RecordBatch(record_batch.clone()))
|
||||
.into_iter();
|
||||
|
||||
let Some(flight_data) = iter.next() else {
|
||||
return error::UnexpectedRequestSnafu {
|
||||
reason: "Failed to encode empty flight data",
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
ensure!(
|
||||
iter.next().is_none(),
|
||||
error::UnexpectedRequestSnafu {
|
||||
reason: "Bulk insert RecordBatch with dictionary arrays is unsupported".to_string(),
|
||||
}
|
||||
);
|
||||
|
||||
Ok((
|
||||
schema.data_header,
|
||||
flight_data.data_header,
|
||||
flight_data.data_body,
|
||||
))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::assert_matches::assert_matches;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::ArrowIpc;
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_query::prelude::{greptime_timestamp, greptime_value};
|
||||
use common_recordbatch::RecordBatches;
|
||||
use datatypes::arrow::array::{Float64Array, StringArray, TimestampMillisecondArray};
|
||||
use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit};
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use store_api::metric_engine_consts::MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING;
|
||||
use store_api::path_utils::table_dir;
|
||||
use store_api::region_engine::RegionEngine;
|
||||
use store_api::region_request::{RegionBulkInsertsRequest, RegionPutRequest, RegionRequest};
|
||||
use store_api::storage::{RegionId, ScanRequest};
|
||||
|
||||
use super::record_batch_to_ipc;
|
||||
use crate::error::Error;
|
||||
use crate::test_util::{self, TestEnv};
|
||||
|
||||
fn build_logical_batch(start: usize, rows: usize) -> RecordBatch {
|
||||
let schema = Arc::new(ArrowSchema::new(vec![
|
||||
Field::new(
|
||||
greptime_timestamp(),
|
||||
DataType::Timestamp(TimeUnit::Millisecond, None),
|
||||
false,
|
||||
),
|
||||
Field::new(greptime_value(), DataType::Float64, true),
|
||||
Field::new("job", DataType::Utf8, true),
|
||||
]));
|
||||
|
||||
let mut ts = Vec::with_capacity(rows);
|
||||
let mut values = Vec::with_capacity(rows);
|
||||
let mut tags = Vec::with_capacity(rows);
|
||||
for i in start..start + rows {
|
||||
ts.push(i as i64);
|
||||
values.push(i as f64);
|
||||
tags.push("tag_0".to_string());
|
||||
}
|
||||
|
||||
RecordBatch::try_new(
|
||||
schema,
|
||||
vec![
|
||||
Arc::new(TimestampMillisecondArray::from(ts)),
|
||||
Arc::new(Float64Array::from(values)),
|
||||
Arc::new(StringArray::from(tags)),
|
||||
],
|
||||
)
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn build_bulk_request(logical_region_id: RegionId, batch: RecordBatch) -> RegionRequest {
|
||||
let (schema, data_header, payload) = record_batch_to_ipc(&batch).unwrap();
|
||||
RegionRequest::BulkInserts(RegionBulkInsertsRequest {
|
||||
region_id: logical_region_id,
|
||||
payload: batch,
|
||||
raw_data: ArrowIpc {
|
||||
schema,
|
||||
data_header,
|
||||
payload,
|
||||
},
|
||||
partition_expr_version: None,
|
||||
})
|
||||
}
|
||||
|
||||
async fn init_dense_metric_region(env: &TestEnv) -> RegionId {
|
||||
let physical_region_id = env.default_physical_region_id();
|
||||
env.create_physical_region(
|
||||
physical_region_id,
|
||||
&TestEnv::default_table_dir(),
|
||||
vec![(
|
||||
MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING.to_string(),
|
||||
"dense".to_string(),
|
||||
)],
|
||||
)
|
||||
.await;
|
||||
|
||||
let logical_region_id = env.default_logical_region_id();
|
||||
let request = test_util::create_logical_region_request(
|
||||
&["job"],
|
||||
physical_region_id,
|
||||
&table_dir("test", logical_region_id.table_id()),
|
||||
);
|
||||
env.metric()
|
||||
.handle_request(logical_region_id, RegionRequest::Create(request))
|
||||
.await
|
||||
.unwrap();
|
||||
logical_region_id
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_bulk_insert_empty_batch_returns_zero() {
|
||||
let env = TestEnv::new().await;
|
||||
env.init_metric_region().await;
|
||||
let logical_region_id = env.default_logical_region_id();
|
||||
|
||||
let batch = build_logical_batch(0, 0);
|
||||
let request = RegionRequest::BulkInserts(RegionBulkInsertsRequest {
|
||||
region_id: logical_region_id,
|
||||
payload: batch,
|
||||
raw_data: ArrowIpc::default(),
|
||||
partition_expr_version: None,
|
||||
});
|
||||
let response = env
|
||||
.metric()
|
||||
.handle_request(logical_region_id, request)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(response.affected_rows, 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_bulk_insert_physical_region_rejected() {
|
||||
let env = TestEnv::new().await;
|
||||
env.init_metric_region().await;
|
||||
|
||||
let physical_region_id = env.default_physical_region_id();
|
||||
let batch = build_logical_batch(0, 2);
|
||||
let request = build_bulk_request(physical_region_id, batch);
|
||||
|
||||
let err = env
|
||||
.metric()
|
||||
.handle_request(physical_region_id, request)
|
||||
.await
|
||||
.unwrap_err();
|
||||
let Some(err) = err.as_any().downcast_ref::<Error>() else {
|
||||
panic!("unexpected error type");
|
||||
};
|
||||
assert_matches!(err, Error::UnsupportedRegionRequest { .. });
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_bulk_insert_unknown_column_errors() {
|
||||
let env = TestEnv::new().await;
|
||||
env.init_metric_region().await;
|
||||
let logical_region_id = env.default_logical_region_id();
|
||||
|
||||
let schema = Arc::new(ArrowSchema::new(vec![
|
||||
Field::new(
|
||||
greptime_timestamp(),
|
||||
DataType::Timestamp(TimeUnit::Millisecond, None),
|
||||
false,
|
||||
),
|
||||
Field::new(greptime_value(), DataType::Float64, true),
|
||||
Field::new("nonexistent_column", DataType::Utf8, true),
|
||||
]));
|
||||
let batch = RecordBatch::try_new(
|
||||
schema,
|
||||
vec![
|
||||
Arc::new(TimestampMillisecondArray::from(vec![0i64])),
|
||||
Arc::new(Float64Array::from(vec![1.0])),
|
||||
Arc::new(StringArray::from(vec!["val"])),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let request = build_bulk_request(logical_region_id, batch);
|
||||
let err = env
|
||||
.metric()
|
||||
.handle_request(logical_region_id, request)
|
||||
.await
|
||||
.unwrap_err();
|
||||
let Some(err) = err.as_any().downcast_ref::<Error>() else {
|
||||
panic!("unexpected error type");
|
||||
};
|
||||
assert_matches!(err, Error::ColumnNotFound { .. });
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_bulk_insert_multiple_tag_columns() {
|
||||
let env = TestEnv::new().await;
|
||||
let physical_region_id = env.default_physical_region_id();
|
||||
env.create_physical_region(physical_region_id, &TestEnv::default_table_dir(), vec![])
|
||||
.await;
|
||||
let logical_region_id = env.default_logical_region_id();
|
||||
let request = test_util::create_logical_region_request(
|
||||
&["host", "region"],
|
||||
physical_region_id,
|
||||
&table_dir("test", logical_region_id.table_id()),
|
||||
);
|
||||
env.metric()
|
||||
.handle_request(logical_region_id, RegionRequest::Create(request))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let schema = Arc::new(ArrowSchema::new(vec![
|
||||
Field::new(
|
||||
greptime_timestamp(),
|
||||
DataType::Timestamp(TimeUnit::Millisecond, None),
|
||||
false,
|
||||
),
|
||||
Field::new(greptime_value(), DataType::Float64, true),
|
||||
Field::new("host", DataType::Utf8, true),
|
||||
Field::new("region", DataType::Utf8, true),
|
||||
]));
|
||||
let batch = RecordBatch::try_new(
|
||||
schema,
|
||||
vec![
|
||||
Arc::new(TimestampMillisecondArray::from(vec![0i64, 1, 2])),
|
||||
Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])),
|
||||
Arc::new(StringArray::from(vec!["h1", "h2", "h1"])),
|
||||
Arc::new(StringArray::from(vec!["us-east", "us-west", "eu-west"])),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let request = build_bulk_request(logical_region_id, batch);
|
||||
let response = env
|
||||
.metric()
|
||||
.handle_request(logical_region_id, request)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(response.affected_rows, 3);
|
||||
|
||||
let stream = env
|
||||
.metric()
|
||||
.scan_to_stream(logical_region_id, ScanRequest::default())
|
||||
.await
|
||||
.unwrap();
|
||||
let batches = RecordBatches::try_collect(stream).await.unwrap();
|
||||
assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 3);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_bulk_insert_accumulates_rows() {
|
||||
let env = TestEnv::new().await;
|
||||
env.init_metric_region().await;
|
||||
let logical_region_id = env.default_logical_region_id();
|
||||
|
||||
let request = build_bulk_request(logical_region_id, build_logical_batch(0, 3));
|
||||
let response = env
|
||||
.metric()
|
||||
.handle_request(logical_region_id, request)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(response.affected_rows, 3);
|
||||
|
||||
let request = build_bulk_request(logical_region_id, build_logical_batch(3, 5));
|
||||
let response = env
|
||||
.metric()
|
||||
.handle_request(logical_region_id, request)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(response.affected_rows, 5);
|
||||
|
||||
let stream = env
|
||||
.metric()
|
||||
.scan_to_stream(logical_region_id, ScanRequest::default())
|
||||
.await
|
||||
.unwrap();
|
||||
let batches = RecordBatches::try_collect(stream).await.unwrap();
|
||||
assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 8);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_bulk_insert_sparse_encoding() {
|
||||
let env = TestEnv::new().await;
|
||||
env.init_metric_region().await;
|
||||
let logical_region_id = env.default_logical_region_id();
|
||||
|
||||
let request = build_bulk_request(logical_region_id, build_logical_batch(0, 4));
|
||||
let response = env
|
||||
.metric()
|
||||
.handle_request(logical_region_id, request)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(response.affected_rows, 4);
|
||||
|
||||
let stream = env
|
||||
.metric()
|
||||
.scan_to_stream(logical_region_id, ScanRequest::default())
|
||||
.await
|
||||
.unwrap();
|
||||
let batches = RecordBatches::try_collect(stream).await.unwrap();
|
||||
assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 4);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_bulk_insert_dense_encoding_rejected() {
|
||||
let env = TestEnv::new().await;
|
||||
let logical_region_id = init_dense_metric_region(&env).await;
|
||||
|
||||
let request = build_bulk_request(logical_region_id, build_logical_batch(0, 2));
|
||||
let err = env
|
||||
.metric()
|
||||
.handle_request(logical_region_id, request)
|
||||
.await
|
||||
.unwrap_err();
|
||||
let Some(err) = err.as_any().downcast_ref::<Error>() else {
|
||||
panic!("unexpected error type");
|
||||
};
|
||||
assert_matches!(err, Error::UnsupportedRegionRequest { .. });
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_bulk_insert_matches_put() {
|
||||
let env_put = TestEnv::new().await;
|
||||
env_put.init_metric_region().await;
|
||||
let logical_region_id = env_put.default_logical_region_id();
|
||||
let schema = test_util::row_schema_with_tags(&["job"]);
|
||||
let rows = test_util::build_rows(1, 5);
|
||||
env_put
|
||||
.metric()
|
||||
.handle_request(
|
||||
logical_region_id,
|
||||
RegionRequest::Put(RegionPutRequest {
|
||||
rows: api::v1::Rows { schema, rows },
|
||||
hint: None,
|
||||
partition_expr_version: None,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let put_stream = env_put
|
||||
.metric()
|
||||
.scan_to_stream(logical_region_id, ScanRequest::default())
|
||||
.await
|
||||
.unwrap();
|
||||
let put_batches = RecordBatches::try_collect(put_stream).await.unwrap();
|
||||
let put_output = put_batches.pretty_print().unwrap();
|
||||
|
||||
let env_bulk = TestEnv::new().await;
|
||||
env_bulk.init_metric_region().await;
|
||||
let request = build_bulk_request(logical_region_id, build_logical_batch(0, 5));
|
||||
env_bulk
|
||||
.metric()
|
||||
.handle_request(logical_region_id, request)
|
||||
.await
|
||||
.unwrap();
|
||||
let bulk_stream = env_bulk
|
||||
.metric()
|
||||
.scan_to_stream(logical_region_id, ScanRequest::default())
|
||||
.await
|
||||
.unwrap();
|
||||
let bulk_batches = RecordBatches::try_collect(bulk_stream).await.unwrap();
|
||||
let bulk_output = bulk_batches.pretty_print().unwrap();
|
||||
|
||||
assert_eq!(put_output, bulk_output);
|
||||
}
|
||||
|
||||
/// Converts a three-row batch containing nulls in every column and checks
/// that `record_batch_to_rows` leaves `value_data` unset exactly where the
/// input array was null.
#[test]
fn test_record_batch_to_rows_with_null_values() {
    use datatypes::arrow::array::{Float64Array, StringArray, TimestampMillisecondArray};
    use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit};
    use datatypes::arrow::record_batch::RecordBatch;
    use store_api::storage::RegionId;

    use crate::engine::bulk_insert::record_batch_to_rows;

    // Every column is nullable so that nulls can appear anywhere, including the timestamp.
    let fields = vec![
        Field::new(
            greptime_timestamp(),
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        ),
        Field::new(greptime_value(), DataType::Float64, true),
        Field::new("job", DataType::Utf8, true),
        Field::new("host", DataType::Utf8, true),
    ];
    let batch = RecordBatch::try_new(
        Arc::new(ArrowSchema::new(fields)),
        vec![
            Arc::new(TimestampMillisecondArray::from(vec![
                Some(1000),
                None,
                Some(3000),
            ])),
            Arc::new(Float64Array::from(vec![Some(1.0), Some(2.0), None])),
            Arc::new(StringArray::from(vec![Some("job1"), None, Some("job3")])),
            Arc::new(StringArray::from(vec![None, Some("host2"), Some("host3")])),
        ],
    )
    .unwrap();

    let converted = record_batch_to_rows(&batch, RegionId::new(1, 1)).unwrap();
    assert_eq!(converted.rows.len(), 3);
    assert_eq!(converted.schema.len(), 4);

    // `true` means the cell must carry a value; columns are ts, value, job, host.
    let presence = [
        // Row 0: all non-null except host
        [true, true, true, false],
        // Row 1: null timestamp, null job
        [false, true, false, true],
        // Row 2: null value
        [true, false, true, true],
    ];
    for (r, expected_row) in presence.iter().enumerate() {
        for (c, expected) in expected_row.iter().enumerate() {
            assert_eq!(
                converted.rows[r].values[c].value_data.is_some(),
                *expected
            );
        }
    }
}
|
||||
}
|
||||
@@ -460,7 +460,7 @@ impl MetricEngineInner {
|
||||
.await
|
||||
}
|
||||
|
||||
fn find_data_region_meta(
|
||||
pub(crate) fn find_data_region_meta(
|
||||
&self,
|
||||
logical_region_id: RegionId,
|
||||
) -> Result<(RegionId, RegionId, PrimaryKeyEncoding)> {
|
||||
|
||||
@@ -52,6 +52,7 @@
|
||||
|
||||
#![feature(assert_matches)]
|
||||
|
||||
mod batch_modifier;
|
||||
pub mod config;
|
||||
mod data_region;
|
||||
pub mod engine;
|
||||
|
||||
@@ -28,7 +28,7 @@ use mito2::memtable::bulk::part_reader::BulkPartBatchIter;
|
||||
use mito2::memtable::bulk::{BulkMemtable, BulkMemtableConfig};
|
||||
use mito2::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtable};
|
||||
use mito2::memtable::time_series::TimeSeriesMemtable;
|
||||
use mito2::memtable::{KeyValues, Memtable, RangesOptions};
|
||||
use mito2::memtable::{IterBuilder, KeyValues, Memtable, RangesOptions};
|
||||
use mito2::read::flat_merge::FlatMergeIterator;
|
||||
use mito2::read::scan_region::PredicateGroup;
|
||||
use mito2::region::options::MergeMode;
|
||||
@@ -105,7 +105,11 @@ fn full_scan(c: &mut Criterion) {
|
||||
}
|
||||
|
||||
b.iter(|| {
|
||||
let iter = memtable.iter(None, None, None).unwrap();
|
||||
let iter = memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
for batch in iter {
|
||||
let _batch = batch.unwrap();
|
||||
}
|
||||
@@ -145,7 +149,17 @@ fn filter_1_host(c: &mut Criterion) {
|
||||
let predicate = generator.random_host_filter();
|
||||
|
||||
b.iter(|| {
|
||||
let iter = memtable.iter(None, Some(predicate.clone()), None).unwrap();
|
||||
let iter = memtable
|
||||
.ranges(
|
||||
None,
|
||||
RangesOptions {
|
||||
predicate: PredicateGroup::new(&metadata, predicate.exprs()).unwrap(),
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
for batch in iter {
|
||||
let _batch = batch.unwrap();
|
||||
}
|
||||
|
||||
@@ -21,7 +21,7 @@ use criterion::{Criterion, criterion_group, criterion_main};
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use mito2::memtable::simple_bulk_memtable::SimpleBulkMemtable;
|
||||
use mito2::memtable::{KeyValues, Memtable, MemtableRanges, RangesOptions};
|
||||
use mito2::memtable::{IterBuilder, KeyValues, Memtable, MemtableRanges, RangesOptions};
|
||||
use mito2::read;
|
||||
use mito2::read::Source;
|
||||
use mito2::read::dedup::DedupReader;
|
||||
@@ -156,7 +156,11 @@ async fn flush(mem: &SimpleBulkMemtable) {
|
||||
}
|
||||
|
||||
async fn flush_original(mem: &SimpleBulkMemtable) {
|
||||
let iter = mem.iter(None, None, None).unwrap();
|
||||
let iter = mem
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
for b in iter {
|
||||
black_box(b.unwrap());
|
||||
}
|
||||
|
||||
@@ -49,6 +49,7 @@ use crate::cache::write_cache::WriteCacheRef;
|
||||
use crate::memtable::record_batch_estimated_size;
|
||||
use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
|
||||
use crate::read::Batch;
|
||||
use crate::read::range_cache::{RangeScanCacheKey, RangeScanCacheValue};
|
||||
use crate::sst::file::{RegionFileId, RegionIndexId};
|
||||
use crate::sst::parquet::reader::MetadataCacheMetrics;
|
||||
|
||||
@@ -64,6 +65,8 @@ const FILE_TYPE: &str = "file";
|
||||
const INDEX_TYPE: &str = "index";
|
||||
/// Metrics type key for selector result cache.
|
||||
const SELECTOR_RESULT_TYPE: &str = "selector_result";
|
||||
/// Metrics type key for range scan result cache.
|
||||
const RANGE_RESULT_TYPE: &str = "range_result";
|
||||
|
||||
/// Cache strategies that may only enable a subset of caches.
|
||||
#[derive(Clone)]
|
||||
@@ -223,6 +226,32 @@ impl CacheStrategy {
|
||||
}
|
||||
}
|
||||
|
||||
/// Calls [CacheManager::get_range_result()].
|
||||
/// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
|
||||
#[cfg_attr(not(test), allow(dead_code))]
|
||||
pub(crate) fn get_range_result(
|
||||
&self,
|
||||
key: &RangeScanCacheKey,
|
||||
) -> Option<Arc<RangeScanCacheValue>> {
|
||||
match self {
|
||||
CacheStrategy::EnableAll(cache_manager) => cache_manager.get_range_result(key),
|
||||
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Calls [CacheManager::put_range_result()].
|
||||
/// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
|
||||
#[cfg_attr(not(test), allow(dead_code))]
|
||||
pub(crate) fn put_range_result(
|
||||
&self,
|
||||
key: RangeScanCacheKey,
|
||||
result: Arc<RangeScanCacheValue>,
|
||||
) {
|
||||
if let CacheStrategy::EnableAll(cache_manager) = self {
|
||||
cache_manager.put_range_result(key, result);
|
||||
}
|
||||
}
|
||||
|
||||
/// Calls [CacheManager::write_cache()].
|
||||
/// It returns None if the strategy is [CacheStrategy::Disabled].
|
||||
pub fn write_cache(&self) -> Option<&WriteCacheRef> {
|
||||
@@ -324,6 +353,9 @@ pub struct CacheManager {
|
||||
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
|
||||
/// Cache for time series selectors.
|
||||
selector_result_cache: Option<SelectorResultCache>,
|
||||
/// Cache for range scan outputs in flat format.
|
||||
#[cfg_attr(not(test), allow(dead_code))]
|
||||
range_result_cache: Option<RangeResultCache>,
|
||||
/// Cache for index result.
|
||||
index_result_cache: Option<IndexResultCache>,
|
||||
}
|
||||
@@ -512,6 +544,32 @@ impl CacheManager {
|
||||
}
|
||||
}
|
||||
|
||||
/// Gets cached result for range scan.
|
||||
#[cfg_attr(not(test), allow(dead_code))]
|
||||
pub(crate) fn get_range_result(
|
||||
&self,
|
||||
key: &RangeScanCacheKey,
|
||||
) -> Option<Arc<RangeScanCacheValue>> {
|
||||
self.range_result_cache
|
||||
.as_ref()
|
||||
.and_then(|cache| update_hit_miss(cache.get(key), RANGE_RESULT_TYPE))
|
||||
}
|
||||
|
||||
/// Puts range scan result into the cache.
|
||||
#[cfg_attr(not(test), allow(dead_code))]
|
||||
pub(crate) fn put_range_result(
|
||||
&self,
|
||||
key: RangeScanCacheKey,
|
||||
result: Arc<RangeScanCacheValue>,
|
||||
) {
|
||||
if let Some(cache) = &self.range_result_cache {
|
||||
CACHE_BYTES
|
||||
.with_label_values(&[RANGE_RESULT_TYPE])
|
||||
.add(range_result_cache_weight(&key, &result).into());
|
||||
cache.insert(key, result);
|
||||
}
|
||||
}
|
||||
|
||||
/// Gets the write cache.
|
||||
pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
|
||||
self.write_cache.as_ref()
|
||||
@@ -562,6 +620,7 @@ pub struct CacheManagerBuilder {
|
||||
puffin_metadata_size: u64,
|
||||
write_cache: Option<WriteCacheRef>,
|
||||
selector_result_cache_size: u64,
|
||||
range_result_cache_size: u64,
|
||||
}
|
||||
|
||||
impl CacheManagerBuilder {
|
||||
@@ -625,6 +684,12 @@ impl CacheManagerBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets range result cache size.
|
||||
pub fn range_result_cache_size(mut self, bytes: u64) -> Self {
|
||||
self.range_result_cache_size = bytes;
|
||||
self
|
||||
}
|
||||
|
||||
/// Builds the [CacheManager].
|
||||
pub fn build(self) -> CacheManager {
|
||||
fn to_str(cause: RemovalCause) -> &'static str {
|
||||
@@ -712,6 +777,21 @@ impl CacheManagerBuilder {
|
||||
})
|
||||
.build()
|
||||
});
|
||||
let range_result_cache = (self.range_result_cache_size != 0).then(|| {
|
||||
Cache::builder()
|
||||
.max_capacity(self.range_result_cache_size)
|
||||
.weigher(range_result_cache_weight)
|
||||
.eviction_listener(|k, v, cause| {
|
||||
let size = range_result_cache_weight(&k, &v);
|
||||
CACHE_BYTES
|
||||
.with_label_values(&[RANGE_RESULT_TYPE])
|
||||
.sub(size.into());
|
||||
CACHE_EVICTION
|
||||
.with_label_values(&[RANGE_RESULT_TYPE, to_str(cause)])
|
||||
.inc();
|
||||
})
|
||||
.build()
|
||||
});
|
||||
CacheManager {
|
||||
sst_meta_cache,
|
||||
vector_cache,
|
||||
@@ -723,6 +803,7 @@ impl CacheManagerBuilder {
|
||||
vector_index_cache,
|
||||
puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
|
||||
selector_result_cache,
|
||||
range_result_cache,
|
||||
index_result_cache,
|
||||
}
|
||||
}
|
||||
@@ -746,6 +827,10 @@ fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultVal
|
||||
(mem::size_of_val(k) + v.estimated_size()) as u32
|
||||
}
|
||||
|
||||
/// Computes the weight of a range result cache entry as the sum of the
/// estimated sizes of its key and value.
///
/// The cache weigher works in `u32`, so sums that do not fit are clamped to
/// `u32::MAX`.
fn range_result_cache_weight(k: &RangeScanCacheKey, v: &Arc<RangeScanCacheValue>) -> u32 {
    let bytes = k.estimated_size().saturating_add(v.estimated_size());
    // A plain `as u32` cast would wrap around, making an oversized entry look
    // almost free to the cache and to the byte metrics; saturate instead.
    u32::try_from(bytes).unwrap_or(u32::MAX)
}
|
||||
|
||||
/// Updates cache hit/miss metrics.
|
||||
fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
|
||||
if value.is_some() {
|
||||
@@ -902,6 +987,8 @@ type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
|
||||
type PageCache = Cache<PageKey, Arc<PageValue>>;
|
||||
/// Maps (file id, row group id, time series row selector) to [SelectorResultValue].
|
||||
type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
|
||||
/// Maps partition-range scan key to cached flat batches.
|
||||
type RangeResultCache = Cache<RangeScanCacheKey, Arc<RangeScanCacheValue>>;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
@@ -916,6 +1003,9 @@ mod tests {
|
||||
use crate::cache::index::bloom_filter_index::Tag;
|
||||
use crate::cache::index::result_cache::PredicateKey;
|
||||
use crate::cache::test_util::parquet_meta;
|
||||
use crate::read::range_cache::{
|
||||
RangeScanCacheKey, RangeScanCacheValue, ScanRequestFingerprintBuilder,
|
||||
};
|
||||
use crate::sst::parquet::row_selection::RowGroupSelection;
|
||||
|
||||
#[tokio::test]
|
||||
@@ -1028,6 +1118,50 @@ mod tests {
|
||||
assert!(cache.get_selector_result(&key).is_some());
|
||||
}
|
||||
|
||||
/// Exercises the range result cache through [CacheManager] directly and
/// through every [CacheStrategy] variant.
///
/// Each strategy writes under its own fresh key, so the assertions after a
/// `put` actually observe whether that strategy touched the cache.
#[test]
fn test_range_result_cache() {
    let cache = Arc::new(
        CacheManager::builder()
            .range_result_cache_size(1024 * 1024)
            .build(),
    );

    // Builds a key that differs from every other one by its random file id and
    // row group index.
    let new_key = |row_group| RangeScanCacheKey {
        region_id: RegionId::new(1, 1),
        row_groups: vec![(FileId::random(), row_group)],
        scan: ScanRequestFingerprintBuilder {
            read_column_ids: vec![],
            read_column_types: vec![],
            filters: vec!["tag_0 = 1".to_string()],
            time_filters: vec![],
            series_row_selector: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: crate::region::options::MergeMode::LastRow,
            partition_expr_version: 0,
        }
        .build(),
    };
    let key = new_key(0);
    let value = Arc::new(RangeScanCacheValue::new(Vec::new()));

    // Direct access through the manager.
    assert!(cache.get_range_result(&key).is_none());
    cache.put_range_result(key.clone(), value.clone());
    assert!(cache.get_range_result(&key).is_some());

    // EnableAll reads from and writes to the underlying manager.
    let enable_all = CacheStrategy::EnableAll(cache.clone());
    assert!(enable_all.get_range_result(&key).is_some());
    let enable_all_key = new_key(1);
    enable_all.put_range_result(enable_all_key.clone(), value.clone());
    assert!(cache.get_range_result(&enable_all_key).is_some());

    // Compaction neither reads nor writes range results.
    let compaction = CacheStrategy::Compaction(cache.clone());
    assert!(compaction.get_range_result(&key).is_none());
    let compaction_key = new_key(2);
    compaction.put_range_result(compaction_key.clone(), value.clone());
    assert!(cache.get_range_result(&compaction_key).is_none());
    assert!(cache.get_range_result(&key).is_some());

    // Disabled neither reads nor writes range results.
    let disabled = CacheStrategy::Disabled;
    assert!(disabled.get_range_result(&key).is_none());
    let disabled_key = new_key(3);
    disabled.put_range_result(disabled_key.clone(), value);
    assert!(cache.get_range_result(&disabled_key).is_none());
    assert!(cache.get_range_result(&key).is_some());
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_evict_puffin_cache_clears_all_entries() {
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
|
||||
@@ -24,7 +24,7 @@ use crate::test_util::{
|
||||
CreateRequestBuilder, TestEnv, build_rows_for_key, flush_region, put_rows, rows_schema,
|
||||
};
|
||||
|
||||
async fn test_last_row(append_mode: bool) {
|
||||
async fn test_last_row(append_mode: bool, flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let region_id = RegionId::new(1, 1);
|
||||
@@ -39,9 +39,12 @@ async fn test_last_row(append_mode: bool) {
|
||||
env.get_kv_backend(),
|
||||
)
|
||||
.await;
|
||||
let request = CreateRequestBuilder::new()
|
||||
.insert_option("append_mode", &append_mode.to_string())
|
||||
.build();
|
||||
let mut request_builder =
|
||||
CreateRequestBuilder::new().insert_option("append_mode", &append_mode.to_string());
|
||||
if flat_format {
|
||||
request_builder = request_builder.insert_option("sst_format", "flat");
|
||||
}
|
||||
let request = request_builder.build();
|
||||
let column_schemas = rows_schema(&request);
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Create(request))
|
||||
@@ -106,10 +109,20 @@ async fn test_last_row(append_mode: bool) {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_last_row_append_mode_disabled() {
|
||||
test_last_row(false).await;
|
||||
test_last_row(false, false).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_last_row_append_mode_enabled() {
|
||||
test_last_row(true).await;
|
||||
test_last_row(true, false).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_last_row_flat_format_append_mode_disabled() {
|
||||
test_last_row(false, true).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_last_row_flat_format_append_mode_enabled() {
|
||||
test_last_row(true, true).await;
|
||||
}
|
||||
|
||||
@@ -28,6 +28,7 @@ use mito_codec::key_values::KeyValue;
|
||||
pub use mito_codec::key_values::KeyValues;
|
||||
use mito_codec::row_converter::{PrimaryKeyCodec, build_primary_key_codec};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::ensure;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::storage::{ColumnId, SequenceNumber, SequenceRange};
|
||||
|
||||
@@ -231,10 +232,17 @@ impl MemtableRanges {
|
||||
|
||||
impl IterBuilder for MemtableRanges {
|
||||
fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
|
||||
UnsupportedOperationSnafu {
|
||||
err_msg: "MemtableRanges does not support build iterator",
|
||||
}
|
||||
.fail()
|
||||
ensure!(
|
||||
self.ranges.len() == 1,
|
||||
UnsupportedOperationSnafu {
|
||||
err_msg: format!(
|
||||
"Building an iterator from MemtableRanges expects 1 range, but got {}",
|
||||
self.ranges.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
self.ranges.values().next().unwrap().build_iter()
|
||||
}
|
||||
|
||||
fn is_record_batch(&self) -> bool {
|
||||
@@ -256,20 +264,6 @@ pub trait Memtable: Send + Sync + fmt::Debug {
|
||||
/// Writes an encoded batch of into memtable.
|
||||
fn write_bulk(&self, part: crate::memtable::bulk::part::BulkPart) -> Result<()>;
|
||||
|
||||
/// Scans the memtable and returns a boxed batch iterator.
|
||||
/// `projection` selects columns to read, `None` means reading all columns.
|
||||
/// `predicate` holds the predicates to be pushed down to the memtable.
/// `sequence` is an optional sequence range; presumably it limits which rows
/// are visible to the scan — TODO confirm against implementors.
|
||||
///
|
||||
/// # Note
|
||||
/// This method should only be used for tests.
|
||||
#[cfg(any(test, feature = "test"))]
|
||||
fn iter(
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
predicate: Option<table::predicate::Predicate>,
|
||||
sequence: Option<SequenceRange>,
|
||||
) -> Result<BoxedBatchIterator>;
|
||||
|
||||
/// Returns the ranges in the memtable.
|
||||
///
|
||||
/// The returned map contains the range id and the range after applying the predicate.
|
||||
|
||||
@@ -462,16 +462,6 @@ impl Memtable for BulkMemtable {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Test-only scan entry point (compiled only for tests or the `test` feature).
///
/// Not implemented for this memtable: all arguments are ignored and the body
/// is `todo!()`.
///
/// # Panics
/// Always panics when called.
#[cfg(any(test, feature = "test"))]
|
||||
fn iter(
|
||||
&self,
|
||||
_projection: Option<&[ColumnId]>,
|
||||
_predicate: Option<table::predicate::Predicate>,
|
||||
_sequence: Option<SequenceRange>,
|
||||
) -> Result<crate::memtable::BoxedBatchIterator> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn ranges(
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
|
||||
@@ -177,16 +177,6 @@ impl Memtable for PartitionTreeMemtable {
|
||||
.fail()
|
||||
}
|
||||
|
||||
/// Test-only scan entry point (compiled only for tests or the `test` feature).
///
/// Forwards `projection`, `predicate` and `sequence` unchanged to
/// `self.tree.read` and returns its result.
#[cfg(any(test, feature = "test"))]
|
||||
fn iter(
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
predicate: Option<Predicate>,
|
||||
sequence: Option<SequenceRange>,
|
||||
) -> Result<BoxedBatchIterator> {
|
||||
// The trailing `None` is the fourth argument of `read`; NOTE(review): its
// meaning is not visible here — confirm against `tree.read`.
self.tree.read(projection, predicate, sequence, None)
|
||||
}
|
||||
|
||||
fn ranges(
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
@@ -396,8 +386,6 @@ mod tests {
|
||||
use api::v1::{Mutation, OpType, Rows, SemanticType};
|
||||
use common_query::prelude::{greptime_timestamp, greptime_value};
|
||||
use common_time::Timestamp;
|
||||
use datafusion_common::Column;
|
||||
use datafusion_expr::{BinaryExpr, Expr, Literal, Operator};
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::prelude::Vector;
|
||||
use datatypes::scalars::ScalarVector;
|
||||
@@ -548,7 +536,10 @@ mod tests {
|
||||
let expect = (0..100).collect::<Vec<_>>();
|
||||
let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);
|
||||
memtable.write(&kvs).unwrap();
|
||||
let iter = memtable.iter(Some(&[3]), None, None).unwrap();
|
||||
let ranges = memtable
|
||||
.ranges(Some(&[3]), RangesOptions::default())
|
||||
.unwrap();
|
||||
let iter = ranges.build(None).unwrap();
|
||||
|
||||
let mut v0_all = vec![];
|
||||
for res in iter {
|
||||
@@ -625,41 +616,6 @@ mod tests {
|
||||
assert_eq!(expect, read);
|
||||
}
|
||||
|
||||
/// Writes 100 groups of ten timestamps, each group under a different value of
/// the second key argument, then checks that an equality predicate on column
/// `k1` returns exactly the timestamps of the matching group.
#[test]
fn test_memtable_filter() {
    let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![0, 1], false));
    // Try to build a memtable via the builder.
    let memtable = PartitionTreeMemtableBuilder::new(
        PartitionTreeConfig {
            index_max_keys_per_shard: 40,
            ..Default::default()
        },
        None,
    )
    .build(1, &metadata);

    for i in 0..100 {
        let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
        let kvs =
            memtable_util::build_key_values(&metadata, "hello".to_string(), i, &timestamps, 1);
        memtable.write(&kvs).unwrap();
    }

    for i in 0..100 {
        // The timestamps written for group `i` above.
        let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
        let expr = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("k1"))),
            op: Operator::Eq,
            right: Box::new((i as u32).lit()),
        });
        let iter = memtable
            .iter(None, Some(Predicate::new(vec![expr])), None)
            .unwrap();
        let read = collect_iter_timestamps(iter);
        assert_eq!(timestamps, read);
    }
}
|
||||
|
||||
#[test]
|
||||
fn test_deserialize_config() {
|
||||
let config = PartitionTreeConfig {
|
||||
@@ -811,7 +767,11 @@ mod tests {
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
let mut reader = new_memtable.iter(None, None, None).unwrap();
|
||||
let mut reader = new_memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let batch = reader.next().unwrap().unwrap();
|
||||
let pk = codec.decode(batch.primary_key()).unwrap().into_dense();
|
||||
if let Value::String(s) = &pk[2] {
|
||||
@@ -916,7 +876,14 @@ mod tests {
|
||||
.unwrap();
|
||||
memtable.freeze().unwrap();
|
||||
assert_eq!(
|
||||
collect_kvs(memtable.iter(None, None, None).unwrap(), &metadata),
|
||||
collect_kvs(
|
||||
memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap(),
|
||||
&metadata
|
||||
),
|
||||
('a'..'h').map(|c| (c.to_string(), c.to_string())).collect()
|
||||
);
|
||||
let forked = memtable.fork(2, &metadata);
|
||||
@@ -925,7 +892,14 @@ mod tests {
|
||||
forked.write(&key_values(&metadata, keys.iter())).unwrap();
|
||||
forked.freeze().unwrap();
|
||||
assert_eq!(
|
||||
collect_kvs(forked.iter(None, None, None).unwrap(), &metadata),
|
||||
collect_kvs(
|
||||
forked
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap(),
|
||||
&metadata
|
||||
),
|
||||
keys.iter()
|
||||
.map(|c| (c.to_string(), c.to_string()))
|
||||
.collect()
|
||||
@@ -936,7 +910,14 @@ mod tests {
|
||||
let keys = ["g", "e", "a", "f", "b", "c", "h"];
|
||||
forked2.write(&key_values(&metadata, keys.iter())).unwrap();
|
||||
|
||||
let kvs = collect_kvs(forked2.iter(None, None, None).unwrap(), &metadata);
|
||||
let kvs = collect_kvs(
|
||||
forked2
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap(),
|
||||
&metadata,
|
||||
);
|
||||
let expected = keys
|
||||
.iter()
|
||||
.map(|c| (c.to_string(), c.to_string()))
|
||||
|
||||
@@ -213,22 +213,6 @@ impl Memtable for SimpleBulkMemtable {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Test-only scan entry point (compiled only for tests or the `test` feature).
///
/// Builds an iterator from `create_iter(projection, sequence)`; the predicate
/// is ignored. When the memtable's merge mode is `MergeMode::LastNonNull` the
/// iterator is wrapped in `LastNonNullIter` before being boxed.
#[cfg(any(test, feature = "test"))]
fn iter(
    &self,
    projection: Option<&[ColumnId]>,
    _predicate: Option<table::predicate::Predicate>,
    sequence: Option<store_api::storage::SequenceRange>,
) -> error::Result<BoxedBatchIterator> {
    let batches = self.create_iter(projection, sequence)?.build(None)?;
    let boxed: BoxedBatchIterator = if self.merge_mode == MergeMode::LastNonNull {
        Box::new(LastNonNullIter::new(batches))
    } else {
        Box::new(batches)
    };
    Ok(boxed)
}
|
||||
|
||||
fn ranges(
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
@@ -526,7 +510,11 @@ mod tests {
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
let mut iter = memtable.iter(None, None, None).unwrap();
|
||||
let mut iter = memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let batch = iter.next().unwrap().unwrap();
|
||||
assert_eq!(2, batch.num_rows());
|
||||
assert_eq!(2, batch.fields().len());
|
||||
@@ -551,7 +539,11 @@ mod tests {
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
let mut iter = memtable.iter(None, None, None).unwrap();
|
||||
let mut iter = memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let batch = iter.next().unwrap().unwrap();
|
||||
assert_eq!(1, batch.num_rows());
|
||||
assert_eq!(2, batch.fields().len());
|
||||
@@ -565,7 +557,11 @@ mod tests {
|
||||
|
||||
// Only project column 2 (f1)
|
||||
let projection = vec![2];
|
||||
let mut iter = memtable.iter(Some(&projection), None, None).unwrap();
|
||||
let mut iter = memtable
|
||||
.ranges(Some(&projection), RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let batch = iter.next().unwrap().unwrap();
|
||||
|
||||
assert_eq!(1, batch.num_rows());
|
||||
@@ -592,7 +588,11 @@ mod tests {
|
||||
OpType::Put,
|
||||
))
|
||||
.unwrap();
|
||||
let mut iter = memtable.iter(None, None, None).unwrap();
|
||||
let mut iter = memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let batch = iter.next().unwrap().unwrap();
|
||||
|
||||
assert_eq!(1, batch.num_rows()); // deduped to 1 row
|
||||
@@ -611,7 +611,11 @@ mod tests {
|
||||
let kv = kvs.iter().next().unwrap();
|
||||
memtable.write_one(kv).unwrap();
|
||||
|
||||
let mut iter = memtable.iter(None, None, None).unwrap();
|
||||
let mut iter = memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let batch = iter.next().unwrap().unwrap();
|
||||
assert_eq!(1, batch.num_rows());
|
||||
}
|
||||
@@ -745,7 +749,11 @@ mod tests {
|
||||
};
|
||||
memtable.write_bulk(part).unwrap();
|
||||
|
||||
let mut iter = memtable.iter(None, None, None).unwrap();
|
||||
let mut iter = memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let batch = iter.next().unwrap().unwrap();
|
||||
assert_eq!(2, batch.num_rows());
|
||||
|
||||
@@ -764,7 +772,11 @@ mod tests {
|
||||
OpType::Put,
|
||||
);
|
||||
memtable.write(&kvs).unwrap();
|
||||
let mut iter = memtable.iter(None, None, None).unwrap();
|
||||
let mut iter = memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let batch = iter.next().unwrap().unwrap();
|
||||
assert_eq!(3, batch.num_rows());
|
||||
assert_eq!(
|
||||
@@ -854,7 +866,15 @@ mod tests {
|
||||
|
||||
// Filter with sequence 0 should only return first write
|
||||
let mut iter = memtable
|
||||
.iter(None, None, Some(SequenceRange::LtEq { max: 0 }))
|
||||
.ranges(
|
||||
None,
|
||||
RangesOptions {
|
||||
sequence: Some(SequenceRange::LtEq { max: 0 }),
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let batch = iter.next().unwrap().unwrap();
|
||||
assert_eq!(1, batch.num_rows());
|
||||
|
||||
@@ -12,98 +12,12 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::time::Instant;
|
||||
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::storage::{ColumnId, SequenceRange};
|
||||
|
||||
use crate::error;
|
||||
use crate::memtable::simple_bulk_memtable::{Iter, SimpleBulkMemtable};
|
||||
use crate::memtable::time_series::Values;
|
||||
use crate::memtable::{BoxedBatchIterator, IterBuilder, MemScanMetrics};
|
||||
use crate::read::dedup::LastNonNullIter;
|
||||
use crate::region::options::MergeMode;
|
||||
use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable;
|
||||
|
||||
impl SimpleBulkMemtable {
|
||||
pub fn region_metadata(&self) -> RegionMetadataRef {
|
||||
self.region_metadata.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn create_iter(
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
sequence: Option<SequenceRange>,
|
||||
) -> error::Result<BatchIterBuilderDeprecated> {
|
||||
let mut series = self.series.write().unwrap();
|
||||
|
||||
let values = if series.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(series.compact(&self.region_metadata)?.clone())
|
||||
};
|
||||
let projection = self.build_projection(projection);
|
||||
Ok(BatchIterBuilderDeprecated {
|
||||
region_metadata: self.region_metadata.clone(),
|
||||
values,
|
||||
projection,
|
||||
dedup: self.dedup,
|
||||
sequence,
|
||||
merge_mode: self.merge_mode,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct BatchIterBuilderDeprecated {
|
||||
region_metadata: RegionMetadataRef,
|
||||
values: Option<Values>,
|
||||
projection: HashSet<ColumnId>,
|
||||
sequence: Option<SequenceRange>,
|
||||
dedup: bool,
|
||||
merge_mode: MergeMode,
|
||||
}
|
||||
|
||||
impl IterBuilder for BatchIterBuilderDeprecated {
|
||||
fn build(&self, metrics: Option<MemScanMetrics>) -> error::Result<BoxedBatchIterator> {
|
||||
let start_time = Instant::now();
|
||||
let Some(values) = self.values.clone() else {
|
||||
return Ok(Box::new(Iter { batch: None }));
|
||||
};
|
||||
|
||||
let maybe_batch = values
|
||||
.to_batch(
|
||||
&[],
|
||||
&self.region_metadata,
|
||||
&self.projection,
|
||||
self.sequence,
|
||||
self.dedup,
|
||||
self.merge_mode,
|
||||
)
|
||||
.map(Some)
|
||||
.transpose();
|
||||
|
||||
// Collect metrics from the batch
|
||||
if let Some(metrics) = metrics {
|
||||
let (num_rows, num_batches) = match &maybe_batch {
|
||||
Some(Ok(batch)) => (batch.num_rows(), 1),
|
||||
_ => (0, 0),
|
||||
};
|
||||
let inner = crate::memtable::MemScanMetricsData {
|
||||
total_series: 1,
|
||||
num_rows,
|
||||
num_batches,
|
||||
scan_cost: start_time.elapsed(),
|
||||
};
|
||||
metrics.merge_inner(&inner);
|
||||
}
|
||||
|
||||
let iter = Iter { batch: maybe_batch };
|
||||
|
||||
if self.merge_mode == MergeMode::LastNonNull {
|
||||
Ok(Box::new(LastNonNullIter::new(iter)))
|
||||
} else {
|
||||
Ok(Box::new(iter))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -827,6 +827,7 @@ mod tests {
|
||||
use super::*;
|
||||
use crate::memtable::partition_tree::PartitionTreeMemtableBuilder;
|
||||
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
|
||||
use crate::memtable::{IterBuilder, RangesOptions};
|
||||
use crate::test_util::memtable_util::{self, collect_iter_timestamps};
|
||||
|
||||
#[test]
|
||||
@@ -852,7 +853,11 @@ mod tests {
|
||||
partitions.list_memtables(&mut memtables);
|
||||
assert_eq!(0, memtables[0].id());
|
||||
|
||||
let iter = memtables[0].iter(None, None, None).unwrap();
|
||||
let iter = memtables[0]
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let timestamps = collect_iter_timestamps(iter);
|
||||
assert_eq!(&[1000, 3000, 5000, 6000, 7000], &timestamps[..]);
|
||||
}
|
||||
@@ -890,7 +895,11 @@ mod tests {
|
||||
|
||||
let mut memtables = Vec::new();
|
||||
partitions.list_memtables(&mut memtables);
|
||||
let iter = memtables[0].iter(None, None, None).unwrap();
|
||||
let iter = memtables[0]
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let timestamps = collect_iter_timestamps(iter);
|
||||
assert_eq!(&[0, 2000, 3000, 4000, 5000, 7000], &timestamps[..]);
|
||||
let parts = partitions.list_partitions();
|
||||
@@ -943,7 +952,12 @@ mod tests {
|
||||
let partitions = new_multi_partitions(&metadata);
|
||||
|
||||
let parts = partitions.list_partitions();
|
||||
let iter = parts[0].memtable.iter(None, None, None).unwrap();
|
||||
let iter = parts[0]
|
||||
.memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let timestamps = collect_iter_timestamps(iter);
|
||||
assert_eq!(0, parts[0].memtable.id());
|
||||
assert_eq!(
|
||||
@@ -955,7 +969,12 @@ mod tests {
|
||||
parts[0].time_range.max_timestamp
|
||||
);
|
||||
assert_eq!(&[0, 2000, 3000, 4000], &timestamps[..]);
|
||||
let iter = parts[1].memtable.iter(None, None, None).unwrap();
|
||||
let iter = parts[1]
|
||||
.memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
assert_eq!(1, parts[1].memtable.id());
|
||||
let timestamps = collect_iter_timestamps(iter);
|
||||
assert_eq!(&[5000, 7000], &timestamps[..]);
|
||||
@@ -1273,7 +1292,12 @@ mod tests {
|
||||
|
||||
let parts = partitions.list_partitions();
|
||||
assert_eq!(1, parts.len());
|
||||
let iter = parts[0].memtable.iter(None, None, None).unwrap();
|
||||
let iter = parts[0]
|
||||
.memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let timestamps = collect_iter_timestamps(iter);
|
||||
assert_eq!(&[1000, 2000, 3000], &timestamps[..]);
|
||||
|
||||
@@ -1284,11 +1308,21 @@ mod tests {
|
||||
let parts = partitions.list_partitions();
|
||||
assert_eq!(2, parts.len());
|
||||
// Check first partition [0, 5000)
|
||||
let iter = parts[0].memtable.iter(None, None, None).unwrap();
|
||||
let iter = parts[0]
|
||||
.memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let timestamps = collect_iter_timestamps(iter);
|
||||
assert_eq!(&[1000, 2000, 3000, 4000], &timestamps[..]);
|
||||
// Check second partition [5000, 10000)
|
||||
let iter = parts[1].memtable.iter(None, None, None).unwrap();
|
||||
let iter = parts[1]
|
||||
.memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let timestamps = collect_iter_timestamps(iter);
|
||||
assert_eq!(&[5000, 6000], &timestamps[..]);
|
||||
|
||||
@@ -1301,7 +1335,12 @@ mod tests {
|
||||
assert_eq!(3, parts.len());
|
||||
|
||||
// Check new partition [10000, 15000)
|
||||
let iter = parts[2].memtable.iter(None, None, None).unwrap();
|
||||
let iter = parts[2]
|
||||
.memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let timestamps = collect_iter_timestamps(iter);
|
||||
assert_eq!(&[11000, 12000], &timestamps[..]);
|
||||
|
||||
@@ -1314,7 +1353,12 @@ mod tests {
|
||||
|
||||
let parts = partitions.list_partitions();
|
||||
assert_eq!(1, parts.len());
|
||||
let iter = parts[0].memtable.iter(None, None, None).unwrap();
|
||||
let iter = parts[0]
|
||||
.memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let timestamps = collect_iter_timestamps(iter);
|
||||
assert_eq!(&[1000, 5000, 9000], &timestamps[..]);
|
||||
}
|
||||
|
||||
@@ -267,39 +267,6 @@ impl Memtable for TimeSeriesMemtable {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Builds a batch iterator over the series set (test builds only).
///
/// When no projection is given, all field columns of the region are read.
#[cfg(any(test, feature = "test"))]
fn iter(
    &self,
    projection: Option<&[ColumnId]>,
    filters: Option<Predicate>,
    sequence: Option<SequenceRange>,
) -> Result<BoxedBatchIterator> {
    let projection = match projection {
        Some(ids) => ids.iter().copied().collect(),
        None => self
            .region_metadata
            .field_columns()
            .map(|column| column.column_id)
            .collect(),
    };

    let series_iter = self.series_set.iter_series(
        projection,
        filters,
        self.dedup,
        self.merge_mode,
        sequence,
        None,
    )?;

    // Every merge mode except `LastNonNull` returns the series iterator directly.
    if self.merge_mode != MergeMode::LastNonNull {
        return Ok(Box::new(series_iter));
    }

    Ok(Box::new(LastNonNullIter::new(series_iter)))
}
|
||||
|
||||
fn ranges(
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
@@ -1798,7 +1765,9 @@ mod tests {
|
||||
*expected_ts.entry(ts).or_default() += if dedup { 1 } else { 2 };
|
||||
}
|
||||
|
||||
let iter = memtable.iter(None, None, None).unwrap();
|
||||
let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
|
||||
let range = ranges.ranges.into_values().next().unwrap();
|
||||
let iter = range.build_iter().unwrap();
|
||||
let mut read = HashMap::new();
|
||||
|
||||
for ts in iter
|
||||
@@ -1838,7 +1807,11 @@ mod tests {
|
||||
let memtable = TimeSeriesMemtable::new(schema, 42, None, true, MergeMode::LastRow);
|
||||
memtable.write(&kvs).unwrap();
|
||||
|
||||
let iter = memtable.iter(Some(&[3]), None, None).unwrap();
|
||||
let iter = memtable
|
||||
.ranges(Some(&[3]), RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
|
||||
let mut v0_all = vec![];
|
||||
|
||||
@@ -1917,7 +1890,11 @@ mod tests {
|
||||
barrier.wait();
|
||||
|
||||
for _ in 0..10 {
|
||||
let iter = memtable.iter(None, None, None).unwrap();
|
||||
let iter = memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
for batch_result in iter {
|
||||
let _ = batch_result.unwrap();
|
||||
}
|
||||
@@ -1936,7 +1913,11 @@ mod tests {
|
||||
handle.join().unwrap();
|
||||
}
|
||||
|
||||
let iter = memtable.iter(None, None, None).unwrap();
|
||||
let iter = memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let mut series_count = 0;
|
||||
let mut row_count = 0;
|
||||
|
||||
|
||||
@@ -27,6 +27,7 @@ pub mod projection;
|
||||
pub(crate) mod prune;
|
||||
pub(crate) mod pruner;
|
||||
pub mod range;
|
||||
pub(crate) mod range_cache;
|
||||
pub mod scan_region;
|
||||
pub mod scan_util;
|
||||
pub(crate) mod seq_scan;
|
||||
|
||||
@@ -59,7 +59,7 @@ impl BatchToRecordBatchAdapter {
|
||||
/// - `metadata`: region metadata describing the schema.
|
||||
/// - `codec`: codec for decoding the encoded primary key bytes.
|
||||
/// - `read_column_ids`: projected column ids to read.
|
||||
pub(crate) fn new(
|
||||
pub fn new(
|
||||
iter: BoxedBatchIterator,
|
||||
metadata: RegionMetadataRef,
|
||||
codec: Arc<dyn PrimaryKeyCodec>,
|
||||
|
||||
@@ -21,6 +21,7 @@ use datatypes::arrow::array::{Array, BinaryArray};
|
||||
use datatypes::arrow::compute::concat_batches;
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use datatypes::vectors::UInt32Vector;
|
||||
use futures::{Stream, TryStreamExt};
|
||||
use snafu::ResultExt;
|
||||
use store_api::storage::{FileId, TimeSeriesRowSelector};
|
||||
|
||||
@@ -30,7 +31,7 @@ use crate::cache::{
|
||||
};
|
||||
use crate::error::{ComputeArrowSnafu, Result};
|
||||
use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice;
|
||||
use crate::read::{Batch, BatchReader, BoxedBatchReader};
|
||||
use crate::read::{Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream};
|
||||
use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
|
||||
use crate::sst::parquet::flat_format::{primary_key_column_index, time_index_column_index};
|
||||
use crate::sst::parquet::format::{PrimaryKeyArray, primary_key_offsets};
|
||||
@@ -610,6 +611,41 @@ impl FlatLastTimestampSelector {
|
||||
}
|
||||
}
|
||||
|
||||
/// Reader that keeps only the last row of each time series from a flat RecordBatch stream.
|
||||
/// Assumes input is sorted, deduped, and contains no delete operations.
|
||||
pub(crate) struct FlatLastRowReader {
|
||||
stream: BoxedRecordBatchStream,
|
||||
selector: FlatLastTimestampSelector,
|
||||
pending: BatchBuffer,
|
||||
}
|
||||
|
||||
impl FlatLastRowReader {
|
||||
/// Creates a new `FlatLastRowReader`.
|
||||
pub(crate) fn new(stream: BoxedRecordBatchStream) -> Self {
|
||||
Self {
|
||||
stream,
|
||||
selector: FlatLastTimestampSelector::default(),
|
||||
pending: BatchBuffer::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts the reader into a stream of RecordBatches.
|
||||
pub(crate) fn into_stream(mut self) -> impl Stream<Item = Result<RecordBatch>> {
|
||||
async_stream::try_stream! {
|
||||
while let Some(batch) = self.stream.try_next().await? {
|
||||
self.selector.on_next(batch, &mut self.pending)?;
|
||||
if self.pending.is_full() {
|
||||
yield self.pending.concat()?;
|
||||
}
|
||||
}
|
||||
self.selector.finish(&mut self.pending)?;
|
||||
if !self.pending.is_empty() {
|
||||
yield self.pending.concat()?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Gets the primary key bytes at `index` from the primary key dictionary column.
|
||||
fn primary_key_bytes_at(batch: &RecordBatch, pk_col_idx: usize, index: usize) -> &[u8] {
|
||||
let pk_dict = batch
|
||||
|
||||
252
src/mito2/src/read/range_cache.rs
Normal file
252
src/mito2/src/read/range_cache.rs
Normal file
@@ -0,0 +1,252 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Utilities for the partition range scan result cache.
|
||||
|
||||
use std::mem;
|
||||
use std::sync::Arc;
|
||||
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use store_api::storage::{ColumnId, FileId, RegionId, TimeSeriesRowSelector};
|
||||
|
||||
use crate::memtable::record_batch_estimated_size;
|
||||
use crate::region::options::MergeMode;
|
||||
|
||||
/// Fingerprint of the scan request fields that affect partition range cache reuse.
|
||||
///
|
||||
/// It records a normalized view of the projected columns and filters, plus
|
||||
/// scan options that can change the returned rows. Schema-dependent metadata
|
||||
/// and the partition expression version are included so cached results are not
|
||||
/// reused across incompatible schema or partitioning changes.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub(crate) struct ScanRequestFingerprint {
|
||||
/// Projection and filters without the time index and partition exprs.
|
||||
inner: Arc<SharedScanRequestFingerprint>,
|
||||
/// Filters with the time index column.
|
||||
time_filters: Option<Arc<Vec<String>>>,
|
||||
series_row_selector: Option<TimeSeriesRowSelector>,
|
||||
append_mode: bool,
|
||||
filter_deleted: bool,
|
||||
merge_mode: MergeMode,
|
||||
/// We keep the partition expr version to ensure we won't reuse the fingerprint after we change the partition expr.
|
||||
/// We store the version instead of the whole partition expr or partition expr filters.
|
||||
partition_expr_version: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct ScanRequestFingerprintBuilder {
|
||||
pub(crate) read_column_ids: Vec<ColumnId>,
|
||||
pub(crate) read_column_types: Vec<Option<ConcreteDataType>>,
|
||||
pub(crate) filters: Vec<String>,
|
||||
pub(crate) time_filters: Vec<String>,
|
||||
pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
|
||||
pub(crate) append_mode: bool,
|
||||
pub(crate) filter_deleted: bool,
|
||||
pub(crate) merge_mode: MergeMode,
|
||||
pub(crate) partition_expr_version: u64,
|
||||
}
|
||||
|
||||
impl ScanRequestFingerprintBuilder {
|
||||
pub(crate) fn build(self) -> ScanRequestFingerprint {
|
||||
let Self {
|
||||
read_column_ids,
|
||||
read_column_types,
|
||||
filters,
|
||||
time_filters,
|
||||
series_row_selector,
|
||||
append_mode,
|
||||
filter_deleted,
|
||||
merge_mode,
|
||||
partition_expr_version,
|
||||
} = self;
|
||||
|
||||
ScanRequestFingerprint {
|
||||
inner: Arc::new(SharedScanRequestFingerprint {
|
||||
read_column_ids,
|
||||
read_column_types,
|
||||
filters,
|
||||
}),
|
||||
time_filters: (!time_filters.is_empty()).then(|| Arc::new(time_filters)),
|
||||
series_row_selector,
|
||||
append_mode,
|
||||
filter_deleted,
|
||||
merge_mode,
|
||||
partition_expr_version,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Non-copiable struct of the fingerprint.
|
||||
#[derive(Debug, PartialEq, Eq, Hash)]
|
||||
struct SharedScanRequestFingerprint {
|
||||
/// Column ids of the projection.
|
||||
read_column_ids: Vec<ColumnId>,
|
||||
/// Column types of the projection.
|
||||
/// We keep this to ensure we won't reuse the fingerprint after a schema change.
|
||||
read_column_types: Vec<Option<ConcreteDataType>>,
|
||||
/// Filters without the time index column and region partition exprs.
|
||||
filters: Vec<String>,
|
||||
}
|
||||
|
||||
impl ScanRequestFingerprint {
|
||||
#[cfg(test)]
|
||||
pub(crate) fn read_column_ids(&self) -> &[ColumnId] {
|
||||
&self.inner.read_column_ids
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn read_column_types(&self) -> &[Option<ConcreteDataType>] {
|
||||
&self.inner.read_column_types
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn filters(&self) -> &[String] {
|
||||
&self.inner.filters
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn time_filters(&self) -> &[String] {
|
||||
self.time_filters
|
||||
.as_deref()
|
||||
.map(Vec::as_slice)
|
||||
.unwrap_or(&[])
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn without_time_filters(&self) -> Self {
|
||||
Self {
|
||||
inner: Arc::clone(&self.inner),
|
||||
time_filters: None,
|
||||
series_row_selector: self.series_row_selector,
|
||||
append_mode: self.append_mode,
|
||||
filter_deleted: self.filter_deleted,
|
||||
merge_mode: self.merge_mode,
|
||||
partition_expr_version: self.partition_expr_version,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn estimated_size(&self) -> usize {
|
||||
mem::size_of::<SharedScanRequestFingerprint>()
|
||||
+ self.inner.read_column_ids.capacity() * mem::size_of::<ColumnId>()
|
||||
+ self.inner.read_column_types.capacity() * mem::size_of::<Option<ConcreteDataType>>()
|
||||
+ self.inner.filters.capacity() * mem::size_of::<String>()
|
||||
+ self
|
||||
.inner
|
||||
.filters
|
||||
.iter()
|
||||
.map(|filter| filter.capacity())
|
||||
.sum::<usize>()
|
||||
+ self.time_filters.as_ref().map_or(0, |filters| {
|
||||
mem::size_of::<Vec<String>>()
|
||||
+ filters.capacity() * mem::size_of::<String>()
|
||||
+ filters
|
||||
.iter()
|
||||
.map(|filter| filter.capacity())
|
||||
.sum::<usize>()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Cache key for range scan outputs.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub(crate) struct RangeScanCacheKey {
|
||||
pub(crate) region_id: RegionId,
|
||||
/// Sorted (file_id, row_group_index) pairs that uniquely identify the covered data.
|
||||
pub(crate) row_groups: Vec<(FileId, i64)>,
|
||||
pub(crate) scan: ScanRequestFingerprint,
|
||||
}
|
||||
|
||||
impl RangeScanCacheKey {
|
||||
pub(crate) fn estimated_size(&self) -> usize {
|
||||
mem::size_of::<Self>()
|
||||
+ self.row_groups.capacity() * mem::size_of::<(FileId, i64)>()
|
||||
+ self.scan.estimated_size()
|
||||
}
|
||||
}
|
||||
|
||||
/// Cached result for one range scan.
|
||||
pub(crate) struct RangeScanCacheValue {
|
||||
pub(crate) batches: Vec<RecordBatch>,
|
||||
}
|
||||
|
||||
impl RangeScanCacheValue {
|
||||
#[cfg_attr(not(test), allow(dead_code))]
|
||||
pub(crate) fn new(batches: Vec<RecordBatch>) -> Self {
|
||||
Self { batches }
|
||||
}
|
||||
|
||||
pub(crate) fn estimated_size(&self) -> usize {
|
||||
mem::size_of::<Self>()
|
||||
+ self.batches.capacity() * mem::size_of::<RecordBatch>()
|
||||
+ self
|
||||
.batches
|
||||
.iter()
|
||||
.map(record_batch_estimated_size)
|
||||
.sum::<usize>()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use store_api::storage::TimeSeriesRowSelector;

    use super::*;

    /// Builds a fingerprint that differs only in the given fields.
    fn fingerprint_with(
        time_filters: Vec<String>,
        series_row_selector: Option<TimeSeriesRowSelector>,
        partition_expr_version: u64,
    ) -> ScanRequestFingerprint {
        ScanRequestFingerprintBuilder {
            read_column_ids: vec![1, 2],
            read_column_types: vec![None, None],
            filters: vec!["k0 = 'foo'".to_string()],
            time_filters,
            series_row_selector,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::LastRow,
            partition_expr_version,
        }
        .build()
    }

    #[test]
    fn normalizes_and_clears_time_filters() {
        // An empty time filter list is exposed as an empty slice.
        let normalized = fingerprint_with(vec![], None, 0);
        assert!(normalized.time_filters().is_empty());

        let fingerprint = fingerprint_with(
            vec!["ts >= 1000".to_string()],
            Some(TimeSeriesRowSelector::LastRow),
            7,
        );
        let reset = fingerprint.without_time_filters();

        // Only the time filters are cleared; everything else is carried over.
        assert!(reset.time_filters().is_empty());
        assert_eq!(reset.read_column_ids(), fingerprint.read_column_ids());
        assert_eq!(reset.read_column_types(), fingerprint.read_column_types());
        assert_eq!(reset.filters(), fingerprint.filters());
        assert_eq!(reset.series_row_selector, fingerprint.series_row_selector);
        assert_eq!(reset.append_mode, fingerprint.append_mode);
        assert_eq!(reset.filter_deleted, fingerprint.filter_deleted);
        assert_eq!(reset.merge_mode, fingerprint.merge_mode);
        assert_eq!(
            reset.partition_expr_version,
            fingerprint.partition_expr_version
        );
    }
}
|
||||
@@ -55,6 +55,7 @@ use crate::metrics::READ_SST_COUNT;
|
||||
use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch};
|
||||
use crate::read::projection::ProjectionMapper;
|
||||
use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
|
||||
use crate::read::range_cache::ScanRequestFingerprint;
|
||||
use crate::read::seq_scan::SeqScan;
|
||||
use crate::read::series_scan::SeriesScan;
|
||||
use crate::read::stream::ScanBatchStream;
|
||||
@@ -815,7 +816,7 @@ pub struct ScanInput {
|
||||
/// But this read columns might also include non-projected columns needed for filtering.
|
||||
pub(crate) read_column_ids: Vec<ColumnId>,
|
||||
/// Time range filter for time index.
|
||||
time_range: Option<TimestampRange>,
|
||||
pub(crate) time_range: Option<TimestampRange>,
|
||||
/// Predicate to push down.
|
||||
pub(crate) predicate: PredicateGroup,
|
||||
/// Region partition expr applied at read time.
|
||||
@@ -1417,6 +1418,92 @@ fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds a [ScanRequestFingerprint] from a [ScanInput] if the scan is eligible
|
||||
/// for partition range caching.
|
||||
#[cfg_attr(not(test), allow(dead_code))]
|
||||
pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option<ScanRequestFingerprint> {
|
||||
let eligible = input.flat_format
|
||||
&& !input.compaction
|
||||
&& !input.files.is_empty()
|
||||
&& matches!(input.cache_strategy, CacheStrategy::EnableAll(_));
|
||||
|
||||
if !eligible {
|
||||
return None;
|
||||
}
|
||||
|
||||
let metadata = input.region_metadata();
|
||||
let tag_names: HashSet<&str> = metadata
|
||||
.column_metadatas
|
||||
.iter()
|
||||
.filter(|col| col.semantic_type == SemanticType::Tag)
|
||||
.map(|col| col.column_schema.name.as_str())
|
||||
.collect();
|
||||
|
||||
let time_index_name = metadata.time_index_column().column_schema.name.clone();
|
||||
|
||||
let exprs = input
|
||||
.predicate_group()
|
||||
.predicate_without_region()
|
||||
.map(|predicate| predicate.exprs())
|
||||
.unwrap_or_default();
|
||||
|
||||
let mut filters = Vec::new();
|
||||
let mut time_filters = Vec::new();
|
||||
let mut has_tag_filter = false;
|
||||
let mut columns = HashSet::new();
|
||||
|
||||
for expr in exprs {
|
||||
columns.clear();
|
||||
let is_time_only = match expr_to_columns(expr, &mut columns) {
|
||||
Ok(()) if !columns.is_empty() => {
|
||||
has_tag_filter |= columns
|
||||
.iter()
|
||||
.any(|col| tag_names.contains(col.name.as_str()));
|
||||
columns.iter().all(|col| col.name == time_index_name)
|
||||
}
|
||||
_ => false,
|
||||
};
|
||||
|
||||
if is_time_only {
|
||||
time_filters.push(expr.to_string());
|
||||
} else {
|
||||
filters.push(expr.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
if !has_tag_filter {
|
||||
// We only cache requests that have tag filters to avoid caching all series.
|
||||
return None;
|
||||
}
|
||||
|
||||
// Ensure the filters are sorted for consistent fingerprinting.
|
||||
filters.sort_unstable();
|
||||
time_filters.sort_unstable();
|
||||
|
||||
Some(
|
||||
crate::read::range_cache::ScanRequestFingerprintBuilder {
|
||||
read_column_ids: input.read_column_ids.clone(),
|
||||
read_column_types: input
|
||||
.read_column_ids
|
||||
.iter()
|
||||
.map(|id| {
|
||||
metadata
|
||||
.column_by_id(*id)
|
||||
.map(|col| col.column_schema.data_type.clone())
|
||||
})
|
||||
.collect(),
|
||||
filters,
|
||||
time_filters,
|
||||
series_row_selector: input.series_row_selector,
|
||||
append_mode: input.append_mode,
|
||||
filter_deleted: input.filter_deleted,
|
||||
merge_mode: input.merge_mode,
|
||||
partition_expr_version: metadata.partition_expr_version,
|
||||
}
|
||||
.build(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Context shared by different streams from a scanner.
|
||||
/// It contains the input and ranges to scan.
|
||||
pub struct StreamContext {
|
||||
@@ -1763,10 +1850,15 @@ mod tests {
|
||||
|
||||
use datafusion::physical_plan::expressions::lit as physical_lit;
|
||||
use datafusion_expr::{col, lit};
|
||||
use store_api::storage::ScanRequest;
|
||||
use datatypes::value::Value;
|
||||
use partition::expr::col as partition_col;
|
||||
use store_api::metadata::RegionMetadataBuilder;
|
||||
use store_api::storage::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
|
||||
|
||||
use super::*;
|
||||
use crate::cache::CacheManager;
|
||||
use crate::memtable::time_partition::TimePartitions;
|
||||
use crate::read::range_cache::ScanRequestFingerprintBuilder;
|
||||
use crate::region::options::RegionOptions;
|
||||
use crate::region::version::VersionBuilder;
|
||||
use crate::sst::FormatType;
|
||||
@@ -1804,6 +1896,26 @@ mod tests {
|
||||
)
|
||||
}
|
||||
|
||||
async fn new_scan_input(metadata: RegionMetadataRef, filters: Vec<Expr>) -> ScanInput {
|
||||
let env = SchedulerEnv::new().await;
|
||||
let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter(), true).unwrap();
|
||||
let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
|
||||
let file = FileHandle::new(
|
||||
crate::sst::file::FileMeta::default(),
|
||||
Arc::new(crate::sst::file_purger::NoopFilePurger),
|
||||
);
|
||||
|
||||
ScanInput::new(env.access_layer.clone(), mapper)
|
||||
.with_predicate(predicate)
|
||||
.with_cache(CacheStrategy::EnableAll(Arc::new(
|
||||
CacheManager::builder()
|
||||
.range_result_cache_size(1024)
|
||||
.build(),
|
||||
)))
|
||||
.with_flat_format(true)
|
||||
.with_files(vec![file])
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_build_read_column_ids_includes_filters() {
|
||||
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
|
||||
@@ -1923,6 +2035,133 @@ mod tests {
|
||||
assert!(scan_region.use_flat_format());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_build_scan_fingerprint_for_eligible_scan() {
|
||||
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
|
||||
let input = new_scan_input(
|
||||
metadata.clone(),
|
||||
vec![
|
||||
col("ts").gt_eq(lit(1000)),
|
||||
col("k0").eq(lit("foo")),
|
||||
col("v0").gt(lit(1)),
|
||||
],
|
||||
)
|
||||
.await
|
||||
.with_distribution(Some(TimeSeriesDistribution::PerSeries))
|
||||
.with_series_row_selector(Some(TimeSeriesRowSelector::LastRow))
|
||||
.with_merge_mode(MergeMode::LastNonNull)
|
||||
.with_filter_deleted(false);
|
||||
|
||||
let fingerprint = build_scan_fingerprint(&input).unwrap();
|
||||
|
||||
let expected = ScanRequestFingerprintBuilder {
|
||||
read_column_ids: input.read_column_ids.clone(),
|
||||
read_column_types: vec![
|
||||
metadata
|
||||
.column_by_id(0)
|
||||
.map(|col| col.column_schema.data_type.clone()),
|
||||
metadata
|
||||
.column_by_id(2)
|
||||
.map(|col| col.column_schema.data_type.clone()),
|
||||
metadata
|
||||
.column_by_id(3)
|
||||
.map(|col| col.column_schema.data_type.clone()),
|
||||
],
|
||||
filters: vec![
|
||||
col("k0").eq(lit("foo")).to_string(),
|
||||
col("v0").gt(lit(1)).to_string(),
|
||||
],
|
||||
time_filters: vec![col("ts").gt_eq(lit(1000)).to_string()],
|
||||
series_row_selector: Some(TimeSeriesRowSelector::LastRow),
|
||||
append_mode: false,
|
||||
filter_deleted: false,
|
||||
merge_mode: MergeMode::LastNonNull,
|
||||
partition_expr_version: 0,
|
||||
}
|
||||
.build();
|
||||
assert_eq!(expected, fingerprint);
|
||||
}
|
||||
|
||||
// Verifies that `build_scan_fingerprint` returns `None` when the scan input
// only has predicates on `ts` and `v0`, i.e. no predicate on `k0`.
// NOTE(review): per the test name, `k0` is presumably the tag column -- confirm
// against `metadata_with_primary_key`.
#[tokio::test]
|
||||
async fn test_build_scan_fingerprint_requires_tag_filter() {
|
||||
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
|
||||
let input = new_scan_input(
|
||||
metadata,
|
||||
vec![col("ts").gt_eq(lit(1000)), col("v0").gt(lit(1))],
|
||||
)
|
||||
.await;
|
||||
|
||||
assert!(build_scan_fingerprint(&input).is_none());
|
||||
}
|
||||
|
||||
// Verifies that `build_scan_fingerprint` returns `None` for four scan inputs
// that share the same `k0` filter but differ in how they are configured.
#[tokio::test]
|
||||
async fn test_build_scan_fingerprint_respects_scan_eligibility() {
|
||||
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
|
||||
let filters = vec![col("k0").eq(lit("foo"))];
|
||||
|
||||
// Case 1: input built directly with `ScanInput::new` instead of the
// `new_scan_input` helper; only the predicate and flat format are set.
// NOTE(review): "disabled" presumably refers to the range result cache not
// being configured on this input -- confirm against `new_scan_input`.
let disabled = ScanInput::new(
|
||||
SchedulerEnv::new().await.access_layer.clone(),
|
||||
ProjectionMapper::new(&metadata, [0, 2, 3].into_iter(), true).unwrap(),
|
||||
)
|
||||
.with_predicate(PredicateGroup::new(metadata.as_ref(), &filters).unwrap())
|
||||
.with_flat_format(true);
|
||||
assert!(build_scan_fingerprint(&disabled).is_none());
|
||||
|
||||
// Case 2: helper-built input with the flat format switched off.
let non_flat = new_scan_input(metadata.clone(), filters.clone())
|
||||
.await
|
||||
.with_flat_format(false);
|
||||
assert!(build_scan_fingerprint(&non_flat).is_none());
|
||||
|
||||
// Case 3: helper-built input flagged as a compaction scan.
let compaction = new_scan_input(metadata.clone(), filters.clone())
|
||||
.await
|
||||
.with_compaction(true);
|
||||
assert!(build_scan_fingerprint(&compaction).is_none());
|
||||
|
||||
// No files to read.
|
||||
let no_files = new_scan_input(metadata, filters).await.with_files(vec![]);
|
||||
assert!(build_scan_fingerprint(&no_files).is_none());
|
||||
}
|
||||
|
||||
// Verifies that the fingerprint built for a region whose metadata carries a
// partition expression includes the column types and the metadata's
// `partition_expr_version`, and that this version is non-zero.
#[tokio::test]
|
||||
async fn test_build_scan_fingerprint_tracks_schema_and_partition_expr_changes() {
|
||||
let base = metadata_with_primary_key(vec![0, 1], false);
|
||||
// Rebuild the base metadata with a partition expression on `k0` attached.
let mut builder = RegionMetadataBuilder::from_existing(base);
|
||||
let partition_expr = partition_col("k0")
|
||||
.gt_eq(Value::String("foo".into()))
|
||||
.as_json_str()
|
||||
.unwrap();
|
||||
builder.partition_expr_json(Some(partition_expr));
|
||||
let metadata = Arc::new(builder.build_without_validation().unwrap());
|
||||
|
||||
let input = new_scan_input(metadata.clone(), vec![col("k0").eq(lit("foo"))]).await;
|
||||
let fingerprint = build_scan_fingerprint(&input).unwrap();
|
||||
|
||||
let expected = ScanRequestFingerprintBuilder {
|
||||
read_column_ids: input.read_column_ids.clone(),
|
||||
// Types of columns 0, 2 and 3, looked up from the rebuilt metadata.
read_column_types: vec![
|
||||
metadata
|
||||
.column_by_id(0)
|
||||
.map(|col| col.column_schema.data_type.clone()),
|
||||
metadata
|
||||
.column_by_id(2)
|
||||
.map(|col| col.column_schema.data_type.clone()),
|
||||
metadata
|
||||
.column_by_id(3)
|
||||
.map(|col| col.column_schema.data_type.clone()),
|
||||
],
|
||||
filters: vec![col("k0").eq(lit("foo")).to_string()],
|
||||
time_filters: vec![],
|
||||
// NOTE(review): the selector, filter_deleted and merge_mode values below
// are presumably the `ScanInput` defaults, since this test sets none of
// them -- confirm.
series_row_selector: None,
|
||||
append_mode: false,
|
||||
filter_deleted: true,
|
||||
merge_mode: MergeMode::LastRow,
|
||||
partition_expr_version: metadata.partition_expr_version,
|
||||
}
|
||||
.build();
|
||||
assert_eq!(expected, fingerprint);
|
||||
// Guards against the comparison above passing trivially with version 0.
assert_ne!(0, metadata.partition_expr_version);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_update_dyn_filters_with_empty_base_predicates() {
|
||||
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
|
||||
|
||||
@@ -39,7 +39,7 @@ use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu, Un
|
||||
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
|
||||
use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
|
||||
use crate::read::flat_merge::FlatMergeReader;
|
||||
use crate::read::last_row::LastRowReader;
|
||||
use crate::read::last_row::{FlatLastRowReader, LastRowReader};
|
||||
use crate::read::merge::MergeReaderBuilder;
|
||||
use crate::read::pruner::{PartitionPruner, Pruner};
|
||||
use crate::read::range::RangeMeta;
|
||||
@@ -289,6 +289,13 @@ impl SeqScan {
|
||||
Box::pin(reader.into_stream()) as _
|
||||
};
|
||||
|
||||
let reader = match &stream_ctx.input.series_row_selector {
|
||||
Some(TimeSeriesRowSelector::LastRow) => {
|
||||
Box::pin(FlatLastRowReader::new(reader).into_stream()) as _
|
||||
}
|
||||
None => reader,
|
||||
};
|
||||
|
||||
Ok(reader)
|
||||
}
|
||||
|
||||
|
||||
@@ -50,7 +50,7 @@ pub(crate) fn parse_wal_options(
|
||||
}
|
||||
|
||||
/// Mode to handle duplicate rows while merging.
|
||||
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, EnumString)]
|
||||
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, EnumString)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
#[strum(serialize_all = "snake_case")]
|
||||
pub enum MergeMode {
|
||||
|
||||
@@ -83,16 +83,6 @@ impl Memtable for EmptyMemtable {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "test"))]
|
||||
fn iter(
|
||||
&self,
|
||||
_projection: Option<&[ColumnId]>,
|
||||
_filters: Option<Predicate>,
|
||||
_sequence: Option<SequenceRange>,
|
||||
) -> Result<BoxedBatchIterator> {
|
||||
Ok(Box::new(std::iter::empty()))
|
||||
}
|
||||
|
||||
fn ranges(
|
||||
&self,
|
||||
_projection: Option<&[ColumnId]>,
|
||||
|
||||
@@ -1 +1 @@
|
||||
v0.11.13
|
||||
v0.12.0
|
||||
|
||||
@@ -112,8 +112,8 @@ pub mod utils;
|
||||
use result::HttpOutputWriter;
|
||||
pub(crate) use timeout::DynamicTimeoutLayer;
|
||||
|
||||
mod client_ip;
|
||||
use crate::prom_remote_write::validation::PromValidationMode;
|
||||
|
||||
mod hints;
|
||||
mod read_preference;
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
@@ -883,6 +883,7 @@ impl HttpServer {
|
||||
authorize::check_http_auth,
|
||||
))
|
||||
.layer(middleware::from_fn(hints::extract_hints))
|
||||
.layer(middleware::from_fn(client_ip::log_error_with_client_ip))
|
||||
.layer(middleware::from_fn(
|
||||
read_preference::extract_read_preference,
|
||||
)),
|
||||
@@ -1247,7 +1248,10 @@ impl Server for HttpServer {
|
||||
error!(e; "Failed to set TCP_NODELAY on incoming connection");
|
||||
}
|
||||
});
|
||||
let serve = axum::serve(listener, app.into_make_service());
|
||||
let serve = axum::serve(
|
||||
listener,
|
||||
app.into_make_service_with_connect_info::<SocketAddr>(),
|
||||
);
|
||||
|
||||
// FIXME(yingwen): Support keepalive.
|
||||
// See:
|
||||
|
||||
109
src/servers/src/http/client_ip.rs
Normal file
109
src/servers/src/http/client_ip.rs
Normal file
@@ -0,0 +1,109 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use axum::body::Body;
|
||||
use axum::extract::{ConnectInfo, MatchedPath};
|
||||
use axum::http::Request;
|
||||
use axum::middleware::Next;
|
||||
use axum::response::Response;
|
||||
use common_telemetry::warn;
|
||||
|
||||
/// Middleware that logs HTTP error responses (4xx/5xx) with client IP address.
|
||||
///
|
||||
/// Extracts client address from [`ConnectInfo`] if available.
|
||||
///
/// When the request carries no [`ConnectInfo`] extension nothing is logged,
/// even for error responses. The response from the inner service is always
/// returned unchanged.
pub async fn log_error_with_client_ip(req: Request<Body>, next: Next) -> Response {
|
||||
// Capture everything needed for the log line up front: `req` is moved into
// `next.run` below and cannot be inspected afterwards.
let request_info = req
|
||||
.extensions()
|
||||
.get::<ConnectInfo<SocketAddr>>()
|
||||
.map(|c| c.0)
|
||||
.map(|addr| {
|
||||
let method = req.method().clone();
|
||||
let uri = req.uri().clone();
|
||||
let matched_path = req.extensions().get::<MatchedPath>().cloned();
|
||||
(addr, method, uri, matched_path)
|
||||
});
|
||||
|
||||
let response = next.run(req).await;
|
||||
|
||||
// Log only when the status is an error AND the client address was captured.
if (response.status().is_client_error() || response.status().is_server_error())
|
||||
&& let Some((addr, method, uri, matched_path)) = request_info
|
||||
{
|
||||
warn!(
|
||||
"HTTP error response {} for {} {} (matched: {}) from client {}",
|
||||
response.status(),
|
||||
method,
|
||||
uri,
|
||||
// Falls back to "<unknown>" when no `MatchedPath` extension was present.
matched_path
|
||||
.as_ref()
|
||||
.map(|p| p.as_str())
|
||||
.unwrap_or("<unknown>"),
|
||||
addr
|
||||
);
|
||||
}
|
||||
|
||||
response
|
||||
}
|
||||
|
||||
// Tests for `log_error_with_client_ip`. Requests are sent with `oneshot` and
// carry no `ConnectInfo` extension, so the middleware's logging branch is not
// taken here; the tests only check that the status code passes through.
#[cfg(test)]
|
||||
mod tests {
|
||||
use axum::Router;
|
||||
use axum::routing::get;
|
||||
use http::StatusCode;
|
||||
use tower::ServiceExt;
|
||||
|
||||
use super::*;
|
||||
|
||||
// A 404 produced by the handler must reach the caller unchanged.
#[tokio::test]
|
||||
async fn test_middleware_passes_error_response() {
|
||||
async fn not_found_handler() -> StatusCode {
|
||||
StatusCode::NOT_FOUND
|
||||
}
|
||||
|
||||
let app = Router::new()
|
||||
.route("/not-found", get(not_found_handler))
|
||||
.layer(axum::middleware::from_fn(log_error_with_client_ip));
|
||||
|
||||
let response = app
|
||||
.oneshot(
|
||||
Request::builder()
|
||||
.uri("/not-found")
|
||||
.body(Body::empty())
|
||||
.unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(response.status(), StatusCode::NOT_FOUND);
|
||||
}
|
||||
|
||||
// A 200 produced by the handler must reach the caller unchanged.
#[tokio::test]
|
||||
async fn test_middleware_passes_success_response() {
|
||||
async fn ok_handler() -> StatusCode {
|
||||
StatusCode::OK
|
||||
}
|
||||
|
||||
let app = Router::new()
|
||||
.route("/ok", get(ok_handler))
|
||||
.layer(axum::middleware::from_fn(log_error_with_client_ip));
|
||||
|
||||
let response = app
|
||||
.oneshot(Request::builder().uri("/ok").body(Body::empty()).unwrap())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(response.status(), StatusCode::OK);
|
||||
}
|
||||
}
|
||||
@@ -32,17 +32,24 @@ pub struct ErrorResponse {
|
||||
impl ErrorResponse {
|
||||
pub fn from_error(error: impl ErrorExt) -> Self {
|
||||
let code = error.status_code();
|
||||
|
||||
if code.should_log_error() {
|
||||
error!(error; "Failed to handle HTTP request");
|
||||
} else {
|
||||
debug!("Failed to handle HTTP request, err: {:?}", error);
|
||||
}
|
||||
|
||||
Self::from_error_message(code, error.output_msg())
|
||||
ErrorResponse {
|
||||
code: code as u32,
|
||||
error: error.output_msg(),
|
||||
execution_time_ms: 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_error_message(code: StatusCode, msg: String) -> Self {
|
||||
if code.should_log_error() {
|
||||
error!("Failed to handle HTTP request: {}", msg);
|
||||
} else {
|
||||
debug!("Failed to handle HTTP request: {}", msg);
|
||||
}
|
||||
ErrorResponse {
|
||||
code: code as u32,
|
||||
error: msg,
|
||||
|
||||
Reference in New Issue
Block a user