feat: always use flat scan path for both format (#7901)

* feat: remove primary_key format scan path

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: remove flat format flag

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: remove CompatReader tests

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: show whether the format is flat in explain

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: stable series scan result

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2026-04-02 15:53:33 +08:00
committed by GitHub
parent ba32c5fe9e
commit 2af59ed386
20 changed files with 159 additions and 1775 deletions

View File

@@ -211,7 +211,6 @@ impl ObjbenchCommand {
object_store.clone(),
)
.expected_metadata(Some(region_meta.clone()))
.flat_format(true)
.build()
.await
.map_err(|e| {

View File

@@ -102,10 +102,6 @@ pub struct ScanbenchCommand {
#[clap(long, value_name = "FILE")]
pprof_file: Option<PathBuf>,
/// Force reading the region in flat format.
#[clap(long, default_value_t = false)]
force_flat_format: bool,
/// Enable WAL replay when opening the region.
#[clap(long, default_value_t = false)]
enable_wal: bool,
@@ -580,12 +576,11 @@ impl ScanbenchCommand {
};
println!(
"{} Scanner: {}, Parallelism: {}, Iterations: {}, Force flat format: {}",
"{} Scanner: {}, Parallelism: {}, Iterations: {}",
"".blue(),
self.scanner,
self.parallelism,
self.iterations,
self.force_flat_format,
);
// Start profiling if pprof_file is specified (unless pprof_after_warmup is set)
@@ -626,7 +621,6 @@ impl ScanbenchCommand {
filters: filters.clone(),
series_row_selector,
distribution,
force_flat_format: self.force_flat_format,
..Default::default()
};

View File

@@ -848,7 +848,7 @@ impl CompactionSstReaderBuilder<'_> {
}
fn build_scan_input(self) -> Result<ScanInput> {
let mapper = ProjectionMapper::all(&self.metadata, true)?;
let mapper = ProjectionMapper::all(&self.metadata)?;
let mut scan_input = ScanInput::new(self.sst_layer, mapper)
.with_files(self.inputs.to_vec())
.with_append_mode(self.append_mode)
@@ -857,8 +857,7 @@ impl CompactionSstReaderBuilder<'_> {
.with_filter_deleted(self.filter_deleted)
// We ignore file not found error during compaction.
.with_ignore_file_not_found(true)
.with_merge_mode(self.merge_mode)
.with_flat_format(true);
.with_merge_mode(self.merge_mode);
// This serves as a workaround of https://github.com/GreptimeTeam/greptimedb/issues/3944
// by converting time ranges into predicate.

View File

@@ -12,11 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatches;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datatypes::arrow::array::AsArray;
use datatypes::arrow::datatypes::{Float64Type, TimestampMillisecondType};
use futures::TryStreamExt;
use store_api::region_engine::{PrepareRequest, RegionEngine, RegionScanner};
use store_api::region_request::RegionRequest;
@@ -222,11 +226,16 @@ async fn test_max_concurrent_scan_files_with_format(flat_format: bool) {
}
#[tokio::test]
async fn test_series_scan_primarykey() {
async fn test_series_scan() {
test_series_scan_with_format(false).await;
test_series_scan_with_format(true).await;
}
async fn test_series_scan_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("test_series_scan").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: false,
default_experimental_flat_format: flat_format,
..Default::default()
})
.await;
@@ -295,10 +304,27 @@ async fn test_series_scan_primarykey() {
})
.unwrap();
let actual_rows = collect_partition_rows_round_robin(&scanner, 3).await;
let mut expected_rows = Vec::new();
for value in [0_i64, 1, 2, 3, 4, 5, 3600, 3601, 3602, 7200, 7201, 7202] {
expected_rows.push((value.to_string(), value as f64, value * 1000));
}
expected_rows.sort_by(|a, b| a.0.cmp(&b.0).then(a.2.cmp(&b.2)));
assert_eq!(expected_rows, actual_rows);
}
/// Scans all partitions in round-robin fashion and returns rows sorted by (tag, ts).
/// Also asserts that each series appears in only one partition.
async fn collect_partition_rows_round_robin(
scanner: &dyn RegionScanner,
num_partitions: usize,
) -> Vec<(String, f64, i64)> {
let metrics_set = ExecutionPlanMetricsSet::default();
let mut partition_batches = vec![vec![]; 3];
let mut streams: Vec<_> = (0..3)
let mut partition_batches = vec![vec![]; num_partitions];
let mut streams: Vec<_> = (0..num_partitions)
.map(|partition| {
let stream = scanner
.scan_partition(&Default::default(), &metrics_set, partition)
@@ -309,11 +335,11 @@ async fn test_series_scan_primarykey() {
let mut num_done = 0;
let mut schema = None;
// Pull streams in round-robin fashion to get the consistent output from the sender.
while num_done < 3 {
while num_done < num_partitions {
if schema.is_none() {
schema = Some(streams[0].as_ref().unwrap().schema().clone());
}
for i in 0..3 {
for i in 0..num_partitions {
let Some(mut stream) = streams[i].take() else {
continue;
};
@@ -326,189 +352,54 @@ async fn test_series_scan_primarykey() {
}
}
let mut check_result = |expected| {
let batches =
RecordBatches::try_new(schema.clone().unwrap(), partition_batches.remove(0)).unwrap();
assert_eq!(expected, batches.pretty_print().unwrap());
};
// Output series order is 0, 1, 2, 3, 3600, 3601, 3602, 4, 5, 7200, 7201, 7202
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 3 | 3.0 | 1970-01-01T00:00:03 |
| 3602 | 3602.0 | 1970-01-01T01:00:02 |
| 7200 | 7200.0 | 1970-01-01T02:00:00 |
+-------+---------+---------------------+";
check_result(expected);
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 3600 | 3600.0 | 1970-01-01T01:00:00 |
| 4 | 4.0 | 1970-01-01T00:00:04 |
| 7201 | 7201.0 | 1970-01-01T02:00:01 |
+-------+---------+---------------------+";
check_result(expected);
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 2 | 2.0 | 1970-01-01T00:00:02 |
| 3601 | 3601.0 | 1970-01-01T01:00:01 |
| 5 | 5.0 | 1970-01-01T00:00:05 |
| 7202 | 7202.0 | 1970-01-01T02:00:02 |
+-------+---------+---------------------+";
check_result(expected);
let schema = schema.unwrap();
collect_and_assert_partition_rows(schema, partition_batches)
}
#[tokio::test]
async fn test_series_scan_flat() {
let mut env = TestEnv::with_prefix("test_series_scan").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: true,
..Default::default()
})
.await;
/// Collects rows sorted by (tag, ts) from partition batches.
/// Also asserts that each series appears in only one partition.
fn collect_and_assert_partition_rows(
schema: datatypes::schema::SchemaRef,
partition_batches: Vec<Vec<common_recordbatch::RecordBatch>>,
) -> Vec<(String, f64, i64)> {
let mut series_to_partition = BTreeMap::new();
let mut actual_rows = Vec::new();
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.time_window", "1h")
.build();
let column_schemas = test_util::rows_schema(&request);
for (partition, batches) in partition_batches.into_iter().enumerate() {
let batches = RecordBatches::try_new(schema.clone(), batches).unwrap();
let mut partition_series = Vec::new();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
for batch in batches.iter() {
let tags = batch.column_by_name("tag_0").unwrap().as_string::<i32>();
let fields = batch
.column_by_name("field_0")
.unwrap()
.as_primitive::<Float64Type>();
let ts = batch
.column_by_name("ts")
.unwrap()
.as_primitive::<TimestampMillisecondType>();
let put_flush_rows = async |start, end| {
let rows = Rows {
schema: column_schemas.clone(),
rows: test_util::build_rows(start, end),
};
test_util::put_rows(&engine, region_id, rows).await;
test_util::flush_region(&engine, region_id, None).await;
};
// generates 3 SST files
put_flush_rows(0, 3).await;
put_flush_rows(2, 6).await;
put_flush_rows(3600, 3603).await;
// Put to memtable.
let rows = Rows {
schema: column_schemas.clone(),
rows: test_util::build_rows(7200, 7203),
};
test_util::put_rows(&engine, region_id, rows).await;
let request = ScanRequest {
distribution: Some(TimeSeriesDistribution::PerSeries),
..Default::default()
};
let scanner = engine.scanner(region_id, request).await.unwrap();
let Scanner::Series(mut scanner) = scanner else {
panic!("Scanner should be series scan");
};
// 3 partition ranges for 3 time window.
assert_eq!(
3,
scanner.properties().partitions[0].len(),
"unexpected ranges: {:?}",
scanner.properties().partitions
);
let raw_ranges: Vec<_> = scanner
.properties()
.partitions
.iter()
.flatten()
.cloned()
.collect();
let mut new_ranges = Vec::with_capacity(3);
for range in raw_ranges {
new_ranges.push(vec![range]);
}
scanner
.prepare(PrepareRequest {
ranges: Some(new_ranges),
..Default::default()
})
.unwrap();
let metrics_set = ExecutionPlanMetricsSet::default();
let mut partition_batches = vec![vec![]; 3];
let mut streams: Vec<_> = (0..3)
.map(|partition| {
let stream = scanner
.scan_partition(&Default::default(), &metrics_set, partition)
.unwrap();
Some(stream)
})
.collect();
let mut num_done = 0;
let mut schema = None;
// Pull streams in round-robin fashion to get the consistent output from the sender.
while num_done < 3 {
if schema.is_none() {
schema = Some(streams[0].as_ref().unwrap().schema().clone());
for row in 0..batch.num_rows() {
let tag = tags.value(row).to_string();
let field = fields.value(row);
let ts = ts.value(row);
partition_series.push(tag.clone());
actual_rows.push((tag, field, ts));
}
}
for i in 0..3 {
let Some(mut stream) = streams[i].take() else {
continue;
};
let Some(rb) = stream.try_next().await.unwrap() else {
num_done += 1;
continue;
};
partition_batches[i].push(rb);
streams[i] = Some(stream);
partition_series.sort();
partition_series.dedup();
for tag in partition_series {
let prev = series_to_partition.insert(tag.clone(), partition);
assert_eq!(
None, prev,
"series {tag} appears in multiple partitions: {prev:?} and {partition}"
);
}
}
let mut check_result = |expected| {
let batches =
RecordBatches::try_new(schema.clone().unwrap(), partition_batches.remove(0)).unwrap();
assert_eq!(expected, batches.pretty_print().unwrap());
};
// Output series order is 0, 1, 2, 3, 3600, 3601, 3602, 4, 5, 7200, 7201, 7202
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
| 3 | 3.0 | 1970-01-01T00:00:03 |
| 7200 | 7200.0 | 1970-01-01T02:00:00 |
| 7201 | 7201.0 | 1970-01-01T02:00:01 |
| 7202 | 7202.0 | 1970-01-01T02:00:02 |
+-------+---------+---------------------+";
check_result(expected);
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 3600 | 3600.0 | 1970-01-01T01:00:00 |
| 3601 | 3601.0 | 1970-01-01T01:00:01 |
| 3602 | 3602.0 | 1970-01-01T01:00:02 |
+-------+---------+---------------------+";
check_result(expected);
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 4 | 4.0 | 1970-01-01T00:00:04 |
| 5 | 5.0 | 1970-01-01T00:00:05 |
+-------+---------+---------------------+";
check_result(expected);
actual_rows.sort_by(|a, b| a.0.cmp(&b.0).then(a.2.cmp(&b.2)));
actual_rows
}

View File

@@ -175,6 +175,7 @@ impl Batch {
}
/// Create an empty [`Batch`].
#[allow(dead_code)]
pub(crate) fn empty() -> Self {
Self {
primary_key: vec![],
@@ -677,6 +678,7 @@ impl Batch {
/// Checks the batch is monotonic by timestamps.
#[cfg(debug_assertions)]
#[allow(dead_code)]
pub(crate) fn check_monotonic(&self) -> Result<(), String> {
use std::cmp::Ordering;
if self.timestamps_native().is_none() {
@@ -719,6 +721,7 @@ impl Batch {
/// Returns Ok if the given batch is behind the current batch.
#[cfg(debug_assertions)]
#[allow(dead_code)]
pub(crate) fn check_next_batch(&self, other: &Batch) -> Result<(), String> {
// Checks the primary key
if self.primary_key() < other.primary_key() {
@@ -798,6 +801,7 @@ impl Batch {
/// A struct to check the batch is monotonic.
#[cfg(debug_assertions)]
#[derive(Default)]
#[allow(dead_code)]
pub(crate) struct BatchChecker {
last_batch: Option<Batch>,
start: Option<Timestamp>,
@@ -805,6 +809,7 @@ pub(crate) struct BatchChecker {
}
#[cfg(debug_assertions)]
#[allow(dead_code)]
impl BatchChecker {
/// Attaches the given start timestamp to the checker.
pub(crate) fn with_start(mut self, start: Option<Timestamp>) -> Self {

View File

@@ -98,6 +98,7 @@ pub(crate) enum CompatBatch {
impl CompatBatch {
/// Returns the inner primary key batch adapter if this is a PrimaryKey format.
#[allow(dead_code)]
pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyCompatBatch> {
match self {
CompatBatch::PrimaryKey(batch) => Some(batch),
@@ -980,7 +981,6 @@ mod tests {
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::value::ValueRef;
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt8Vector, UInt64Vector};
use mito_codec::row_converter::{
DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
};
@@ -992,7 +992,6 @@ mod tests {
use crate::read::flat_projection::FlatProjectionMapper;
use crate::sst::parquet::flat_format::FlatReadFormat;
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
use crate::test_util::{VecBatchReader, check_reader_result};
/// Creates a new [RegionMetadata].
fn new_metadata(
@@ -1053,44 +1052,6 @@ mod tests {
buffer
}
/// Creates a batch for specific primary `key`.
///
/// `fields`: [(column_id of the field, is null)]
fn new_batch(
primary_key: &[u8],
fields: &[(ColumnId, bool)],
start_ts: i64,
num_rows: usize,
) -> Batch {
let timestamps = Arc::new(TimestampMillisecondVector::from_values(
start_ts..start_ts + num_rows as i64,
));
let sequences = Arc::new(UInt64Vector::from_values(0..num_rows as u64));
let op_types = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; num_rows]));
let field_columns = fields
.iter()
.map(|(id, is_null)| {
let data = if *is_null {
Arc::new(Int64Vector::from(vec![None; num_rows]))
} else {
Arc::new(Int64Vector::from_vec(vec![*id as i64; num_rows]))
};
BatchColumn {
column_id: *id,
data,
}
})
.collect();
Batch::new(
primary_key.to_vec(),
timestamps,
sequences,
op_types,
field_columns,
)
.unwrap()
}
#[test]
fn test_invalid_pk_len() {
let reader_meta = new_metadata(
@@ -1213,311 +1174,6 @@ mod tests {
assert!(may_compat_fields(&mapper, &reader_meta).unwrap().is_none())
}
#[tokio::test]
async fn test_compat_reader() {
let reader_meta = Arc::new(new_metadata(
&[
(
0,
SemanticType::Timestamp,
ConcreteDataType::timestamp_millisecond_datatype(),
),
(1, SemanticType::Tag, ConcreteDataType::string_datatype()),
(2, SemanticType::Field, ConcreteDataType::int64_datatype()),
],
&[1],
));
let expect_meta = Arc::new(new_metadata(
&[
(
0,
SemanticType::Timestamp,
ConcreteDataType::timestamp_millisecond_datatype(),
),
(1, SemanticType::Tag, ConcreteDataType::string_datatype()),
(2, SemanticType::Field, ConcreteDataType::int64_datatype()),
(3, SemanticType::Tag, ConcreteDataType::string_datatype()),
(4, SemanticType::Field, ConcreteDataType::int64_datatype()),
],
&[1, 3],
));
let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
let k1 = encode_key(&[Some("a")]);
let k2 = encode_key(&[Some("b")]);
let source_reader = VecBatchReader::new(&[
new_batch(&k1, &[(2, false)], 1000, 3),
new_batch(&k2, &[(2, false)], 1000, 3),
]);
let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
let k1 = encode_key(&[Some("a"), None]);
let k2 = encode_key(&[Some("b"), None]);
check_reader_result(
&mut compat_reader,
&[
new_batch(&k1, &[(2, false), (4, true)], 1000, 3),
new_batch(&k2, &[(2, false), (4, true)], 1000, 3),
],
)
.await;
}
#[tokio::test]
async fn test_compat_reader_different_order() {
let reader_meta = Arc::new(new_metadata(
&[
(
0,
SemanticType::Timestamp,
ConcreteDataType::timestamp_millisecond_datatype(),
),
(1, SemanticType::Tag, ConcreteDataType::string_datatype()),
(2, SemanticType::Field, ConcreteDataType::int64_datatype()),
],
&[1],
));
let expect_meta = Arc::new(new_metadata(
&[
(
0,
SemanticType::Timestamp,
ConcreteDataType::timestamp_millisecond_datatype(),
),
(1, SemanticType::Tag, ConcreteDataType::string_datatype()),
(3, SemanticType::Field, ConcreteDataType::int64_datatype()),
(2, SemanticType::Field, ConcreteDataType::int64_datatype()),
(4, SemanticType::Field, ConcreteDataType::int64_datatype()),
],
&[1],
));
let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
let k1 = encode_key(&[Some("a")]);
let k2 = encode_key(&[Some("b")]);
let source_reader = VecBatchReader::new(&[
new_batch(&k1, &[(2, false)], 1000, 3),
new_batch(&k2, &[(2, false)], 1000, 3),
]);
let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
check_reader_result(
&mut compat_reader,
&[
new_batch(&k1, &[(3, true), (2, false), (4, true)], 1000, 3),
new_batch(&k2, &[(3, true), (2, false), (4, true)], 1000, 3),
],
)
.await;
}
#[tokio::test]
async fn test_compat_reader_different_types() {
let actual_meta = Arc::new(new_metadata(
&[
(
0,
SemanticType::Timestamp,
ConcreteDataType::timestamp_millisecond_datatype(),
),
(1, SemanticType::Tag, ConcreteDataType::string_datatype()),
(2, SemanticType::Field, ConcreteDataType::int64_datatype()),
],
&[1],
));
let expect_meta = Arc::new(new_metadata(
&[
(
0,
SemanticType::Timestamp,
ConcreteDataType::timestamp_millisecond_datatype(),
),
(1, SemanticType::Tag, ConcreteDataType::string_datatype()),
(2, SemanticType::Field, ConcreteDataType::string_datatype()),
],
&[1],
));
let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
let k1 = encode_key(&[Some("a")]);
let k2 = encode_key(&[Some("b")]);
let source_reader = VecBatchReader::new(&[
new_batch(&k1, &[(2, false)], 1000, 3),
new_batch(&k2, &[(2, false)], 1000, 3),
]);
let fn_batch_cast = |batch: Batch| {
let mut new_fields = batch.fields.clone();
new_fields[0].data = new_fields[0]
.data
.cast(&ConcreteDataType::string_datatype())
.unwrap();
batch.with_fields(new_fields).unwrap()
};
let mut compat_reader = CompatReader::new(&mapper, actual_meta, source_reader).unwrap();
check_reader_result(
&mut compat_reader,
&[
fn_batch_cast(new_batch(&k1, &[(2, false)], 1000, 3)),
fn_batch_cast(new_batch(&k2, &[(2, false)], 1000, 3)),
],
)
.await;
}
#[tokio::test]
async fn test_compat_reader_projection() {
let reader_meta = Arc::new(new_metadata(
&[
(
0,
SemanticType::Timestamp,
ConcreteDataType::timestamp_millisecond_datatype(),
),
(1, SemanticType::Tag, ConcreteDataType::string_datatype()),
(2, SemanticType::Field, ConcreteDataType::int64_datatype()),
],
&[1],
));
let expect_meta = Arc::new(new_metadata(
&[
(
0,
SemanticType::Timestamp,
ConcreteDataType::timestamp_millisecond_datatype(),
),
(1, SemanticType::Tag, ConcreteDataType::string_datatype()),
(3, SemanticType::Field, ConcreteDataType::int64_datatype()),
(2, SemanticType::Field, ConcreteDataType::int64_datatype()),
(4, SemanticType::Field, ConcreteDataType::int64_datatype()),
],
&[1],
));
// tag_1, field_2, field_3
let mapper = ProjectionMapper::new(&expect_meta, [1, 3, 2].into_iter(), false).unwrap();
let k1 = encode_key(&[Some("a")]);
let source_reader = VecBatchReader::new(&[new_batch(&k1, &[(2, false)], 1000, 3)]);
let mut compat_reader =
CompatReader::new(&mapper, reader_meta.clone(), source_reader).unwrap();
check_reader_result(
&mut compat_reader,
&[new_batch(&k1, &[(3, true), (2, false)], 1000, 3)],
)
.await;
// tag_1, field_4, field_3
let mapper = ProjectionMapper::new(&expect_meta, [1, 4, 2].into_iter(), false).unwrap();
let k1 = encode_key(&[Some("a")]);
let source_reader = VecBatchReader::new(&[new_batch(&k1, &[], 1000, 3)]);
let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
check_reader_result(
&mut compat_reader,
&[new_batch(&k1, &[(3, true), (4, true)], 1000, 3)],
)
.await;
}
#[tokio::test]
async fn test_compat_reader_projection_read_superset() {
let reader_meta = Arc::new(new_metadata(
&[
(
0,
SemanticType::Timestamp,
ConcreteDataType::timestamp_millisecond_datatype(),
),
(1, SemanticType::Tag, ConcreteDataType::string_datatype()),
(2, SemanticType::Field, ConcreteDataType::int64_datatype()),
],
&[1],
));
let expect_meta = Arc::new(new_metadata(
&[
(
0,
SemanticType::Timestamp,
ConcreteDataType::timestamp_millisecond_datatype(),
),
(1, SemanticType::Tag, ConcreteDataType::string_datatype()),
(3, SemanticType::Field, ConcreteDataType::int64_datatype()),
(2, SemanticType::Field, ConcreteDataType::int64_datatype()),
(4, SemanticType::Field, ConcreteDataType::int64_datatype()),
],
&[1],
));
// Output: tag_1, field_3, field_2. Read also includes field_4.
let mapper = ProjectionMapper::new_with_read_columns(
&expect_meta,
[1, 3, 2].into_iter(),
false,
vec![1, 3, 2, 4],
)
.unwrap();
let k1 = encode_key(&[Some("a")]);
let source_reader = VecBatchReader::new(&[new_batch(&k1, &[(2, false)], 1000, 3)]);
let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
check_reader_result(
&mut compat_reader,
&[new_batch(&k1, &[(3, true), (2, false), (4, true)], 1000, 3)],
)
.await;
}
#[tokio::test]
async fn test_compat_reader_different_pk_encoding() {
let mut reader_meta = new_metadata(
&[
(
0,
SemanticType::Timestamp,
ConcreteDataType::timestamp_millisecond_datatype(),
),
(1, SemanticType::Tag, ConcreteDataType::string_datatype()),
(2, SemanticType::Field, ConcreteDataType::int64_datatype()),
],
&[1],
);
reader_meta.primary_key_encoding = PrimaryKeyEncoding::Dense;
let reader_meta = Arc::new(reader_meta);
let mut expect_meta = new_metadata(
&[
(
0,
SemanticType::Timestamp,
ConcreteDataType::timestamp_millisecond_datatype(),
),
(1, SemanticType::Tag, ConcreteDataType::string_datatype()),
(2, SemanticType::Field, ConcreteDataType::int64_datatype()),
(3, SemanticType::Tag, ConcreteDataType::string_datatype()),
(4, SemanticType::Field, ConcreteDataType::int64_datatype()),
],
&[1, 3],
);
expect_meta.primary_key_encoding = PrimaryKeyEncoding::Sparse;
let expect_meta = Arc::new(expect_meta);
let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
let k1 = encode_key(&[Some("a")]);
let k2 = encode_key(&[Some("b")]);
let source_reader = VecBatchReader::new(&[
new_batch(&k1, &[(2, false)], 1000, 3),
new_batch(&k2, &[(2, false)], 1000, 3),
]);
let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
let k1 = encode_sparse_key(&[(1, Some("a")), (3, None)]);
let k2 = encode_sparse_key(&[(1, Some("b")), (3, None)]);
check_reader_result(
&mut compat_reader,
&[
new_batch(&k1, &[(2, false), (4, true)], 1000, 3),
new_batch(&k2, &[(2, false), (4, true)], 1000, 3),
],
)
.await;
}
/// Creates a primary key array for flat format testing.
fn build_flat_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();

View File

@@ -45,6 +45,7 @@ use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupRea
///
/// This reader is different from the [MergeMode](crate::region::options::MergeMode) as
/// it focus on time series (the same key).
#[allow(dead_code)]
pub(crate) struct LastRowReader {
/// Inner reader.
reader: BoxedBatchReader,
@@ -52,6 +53,7 @@ pub(crate) struct LastRowReader {
selector: LastRowSelector,
}
#[allow(dead_code)]
impl LastRowReader {
/// Creates a new `LastRowReader`.
pub(crate) fn new(reader: BoxedBatchReader) -> Self {

View File

@@ -52,51 +52,27 @@ impl ProjectionMapper {
pub fn new(
metadata: &RegionMetadataRef,
projection: impl Iterator<Item = usize> + Clone,
flat_format: bool,
) -> Result<Self> {
if flat_format {
Ok(ProjectionMapper::Flat(FlatProjectionMapper::new(
metadata, projection,
)?))
} else {
Ok(ProjectionMapper::PrimaryKey(
PrimaryKeyProjectionMapper::new(metadata, projection)?,
))
}
Ok(ProjectionMapper::Flat(FlatProjectionMapper::new(
metadata, projection,
)?))
}
/// Returns a new mapper with output projection and explicit read columns.
pub fn new_with_read_columns(
metadata: &RegionMetadataRef,
projection: impl Iterator<Item = usize>,
flat_format: bool,
read_column_ids: Vec<ColumnId>,
) -> Result<Self> {
let projection: Vec<_> = projection.collect();
if flat_format {
Ok(ProjectionMapper::Flat(
FlatProjectionMapper::new_with_read_columns(metadata, projection, read_column_ids)?,
))
} else {
Ok(ProjectionMapper::PrimaryKey(
PrimaryKeyProjectionMapper::new_with_read_columns(
metadata,
projection,
read_column_ids,
)?,
))
}
Ok(ProjectionMapper::Flat(
FlatProjectionMapper::new_with_read_columns(metadata, projection, read_column_ids)?,
))
}
/// Returns a new mapper without projection.
pub fn all(metadata: &RegionMetadataRef, flat_format: bool) -> Result<Self> {
if flat_format {
Ok(ProjectionMapper::Flat(FlatProjectionMapper::all(metadata)?))
} else {
Ok(ProjectionMapper::PrimaryKey(
PrimaryKeyProjectionMapper::all(metadata)?,
))
}
pub fn all(metadata: &RegionMetadataRef) -> Result<Self> {
Ok(ProjectionMapper::Flat(FlatProjectionMapper::all(metadata)?))
}
/// Returns the metadata that created the mapper.
@@ -159,6 +135,7 @@ impl ProjectionMapper {
}
/// Handles projection and converts a projected [Batch] to a projected [RecordBatch].
#[allow(dead_code)]
pub struct PrimaryKeyProjectionMapper {
/// Metadata of the region.
metadata: RegionMetadataRef,
@@ -178,6 +155,7 @@ pub struct PrimaryKeyProjectionMapper {
is_empty_projection: bool,
}
#[allow(dead_code)]
impl PrimaryKeyProjectionMapper {
/// Returns a new mapper with projection.
/// If `projection` is empty, it outputs [RecordBatch] without any column but only a row count.
@@ -413,6 +391,7 @@ pub(crate) fn read_column_ids_from_projection(
/// Index of a vector in a [Batch].
#[derive(Debug, Clone, Copy)]
#[allow(dead_code)]
enum BatchIndex {
/// Index in primary keys.
Tag((usize, ColumnId)),
@@ -480,53 +459,6 @@ mod tests {
};
use super::*;
use crate::cache::CacheManager;
use crate::read::BatchBuilder;
fn new_batch(
ts_start: i64,
tags: &[i64],
fields: &[(ColumnId, i64)],
num_rows: usize,
) -> Batch {
let converter = DensePrimaryKeyCodec::with_fields(
(0..tags.len())
.map(|idx| {
(
idx as u32,
SortField::new(ConcreteDataType::int64_datatype()),
)
})
.collect(),
);
let primary_key = converter
.encode(tags.iter().map(|v| ValueRef::Int64(*v)))
.unwrap();
let mut builder = BatchBuilder::new(primary_key);
builder
.timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
(0..num_rows).map(|i| ts_start + i as i64 * 1000),
)))
.unwrap()
.sequences_array(Arc::new(UInt64Array::from_iter_values(0..num_rows as u64)))
.unwrap()
.op_types_array(Arc::new(UInt8Array::from_iter_values(
(0..num_rows).map(|_| OpType::Put as u8),
)))
.unwrap();
for (column_id, field) in fields {
builder
.push_field_array(
*column_id,
Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
*field, num_rows,
))),
)
.unwrap();
}
builder.build().unwrap()
}
fn print_record_batch(record_batch: RecordBatch) -> String {
pretty::pretty_format_batches(&[record_batch.into_df_record_batch()])
@@ -534,166 +466,6 @@ mod tests {
.to_string()
}
#[test]
fn test_projection_mapper_all() {
let metadata = Arc::new(
TestRegionMetadataBuilder::default()
.num_tags(2)
.num_fields(2)
.build(),
);
// Create the enum wrapper with default format (primary key)
let mapper = ProjectionMapper::all(&metadata, false).unwrap();
assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
assert_eq!(
[
(3, ConcreteDataType::int64_datatype()),
(4, ConcreteDataType::int64_datatype())
],
mapper.as_primary_key().unwrap().batch_fields()
);
// With vector cache.
let cache = CacheManager::builder().vector_cache_size(1024).build();
let cache = CacheStrategy::EnableAll(Arc::new(cache));
let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3);
let record_batch = mapper
.as_primary_key()
.unwrap()
.convert(&batch, &cache)
.unwrap();
let expect = "\
+---------------------+----+----+----+----+
| ts | k0 | k1 | v0 | v1 |
+---------------------+----+----+----+----+
| 1970-01-01T00:00:00 | 1 | 2 | 3 | 4 |
| 1970-01-01T00:00:01 | 1 | 2 | 3 | 4 |
| 1970-01-01T00:00:02 | 1 | 2 | 3 | 4 |
+---------------------+----+----+----+----+";
assert_eq!(expect, print_record_batch(record_batch));
assert!(
cache
.get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(1))
.is_some()
);
assert!(
cache
.get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(2))
.is_some()
);
assert!(
cache
.get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(3))
.is_none()
);
let record_batch = mapper
.as_primary_key()
.unwrap()
.convert(&batch, &cache)
.unwrap();
assert_eq!(expect, print_record_batch(record_batch));
}
#[test]
fn test_projection_mapper_with_projection() {
let metadata = Arc::new(
TestRegionMetadataBuilder::default()
.num_tags(2)
.num_fields(2)
.build(),
);
// Columns v1, k0
let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter(), false).unwrap();
assert_eq!([4, 1], mapper.column_ids());
assert_eq!(
[(4, ConcreteDataType::int64_datatype())],
mapper.as_primary_key().unwrap().batch_fields()
);
let batch = new_batch(0, &[1, 2], &[(4, 4)], 3);
let cache = CacheManager::builder().vector_cache_size(1024).build();
let cache = CacheStrategy::EnableAll(Arc::new(cache));
let record_batch = mapper
.as_primary_key()
.unwrap()
.convert(&batch, &cache)
.unwrap();
let expect = "\
+----+----+
| v1 | k0 |
+----+----+
| 4 | 1 |
| 4 | 1 |
| 4 | 1 |
+----+----+";
assert_eq!(expect, print_record_batch(record_batch));
}
#[test]
fn test_projection_mapper_read_superset() {
let metadata = Arc::new(
TestRegionMetadataBuilder::default()
.num_tags(2)
.num_fields(2)
.build(),
);
// Output columns v1, k0. Read also includes v0.
let mapper = ProjectionMapper::new_with_read_columns(
&metadata,
[4, 1].into_iter(),
false,
vec![4, 1, 3],
)
.unwrap();
assert_eq!([4, 1, 3], mapper.column_ids());
let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3);
let cache = CacheManager::builder().vector_cache_size(1024).build();
let cache = CacheStrategy::EnableAll(Arc::new(cache));
let record_batch = mapper
.as_primary_key()
.unwrap()
.convert(&batch, &cache)
.unwrap();
let expect = "\
+----+----+
| v1 | k0 |
+----+----+
| 4 | 1 |
| 4 | 1 |
| 4 | 1 |
+----+----+";
assert_eq!(expect, print_record_batch(record_batch));
}
#[test]
fn test_projection_mapper_empty_projection() {
let metadata = Arc::new(
TestRegionMetadataBuilder::default()
.num_tags(2)
.num_fields(2)
.build(),
);
// Empty projection
let mapper = ProjectionMapper::new(&metadata, [].into_iter(), false).unwrap();
assert_eq!([0], mapper.column_ids()); // Should still read the time index column
assert!(mapper.output_schema().is_empty());
let pk_mapper = mapper.as_primary_key().unwrap();
assert!(pk_mapper.batch_fields().is_empty());
assert!(!pk_mapper.has_tags);
assert!(pk_mapper.batch_indices.is_empty());
assert!(pk_mapper.is_empty_projection);
let batch = new_batch(0, &[1, 2], &[], 3);
let cache = CacheManager::builder().vector_cache_size(1024).build();
let cache = CacheStrategy::EnableAll(Arc::new(cache));
let record_batch = pk_mapper.convert(&batch, &cache).unwrap();
assert_eq!(3, record_batch.num_rows());
assert_eq!(0, record_batch.num_columns());
assert!(record_batch.schema.is_empty());
}
fn new_flat_batch(
ts_start: Option<i64>,
idx_tags: &[(usize, i64)],
@@ -809,7 +581,7 @@ mod tests {
.build(),
);
let cache = CacheStrategy::Disabled;
let mapper = ProjectionMapper::all(&metadata, true).unwrap();
let mapper = ProjectionMapper::all(&metadata).unwrap();
assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
assert_eq!(
[
@@ -845,7 +617,7 @@ mod tests {
);
let cache = CacheStrategy::Disabled;
// Columns v1, k0
let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter(), true).unwrap();
let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter()).unwrap();
assert_eq!([4, 1], mapper.column_ids());
assert_eq!(
[
@@ -879,13 +651,9 @@ mod tests {
);
let cache = CacheStrategy::Disabled;
// Output columns v1, k0. Read also includes v0.
let mapper = ProjectionMapper::new_with_read_columns(
&metadata,
[4, 1].into_iter(),
true,
vec![4, 1, 3],
)
.unwrap();
let mapper =
ProjectionMapper::new_with_read_columns(&metadata, [4, 1].into_iter(), vec![4, 1, 3])
.unwrap();
assert_eq!([4, 1, 3], mapper.column_ids());
let batch = new_flat_batch(None, &[(1, 1)], &[(3, 3), (4, 4)], 3);
@@ -911,7 +679,7 @@ mod tests {
);
let cache = CacheStrategy::Disabled;
// Empty projection
let mapper = ProjectionMapper::new(&metadata, [].into_iter(), true).unwrap();
let mapper = ProjectionMapper::new(&metadata, [].into_iter()).unwrap();
assert_eq!([0], mapper.column_ids()); // Should still read the time index column
assert!(mapper.output_schema().is_empty());
let flat_mapper = mapper.as_flat().unwrap();

View File

@@ -30,11 +30,13 @@ use crate::sst::file::FileTimeRange;
use crate::sst::parquet::file_range::FileRangeContextRef;
use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupReader};
#[allow(dead_code)]
pub enum Source {
RowGroup(RowGroupReader),
LastRow(RowGroupLastRowCachedReader),
}
#[allow(dead_code)]
impl Source {
async fn next_batch(&mut self) -> Result<Option<Batch>> {
match self {
@@ -44,6 +46,7 @@ impl Source {
}
}
#[allow(dead_code)]
pub struct PruneReader {
/// Context for file ranges.
context: FileRangeContextRef,
@@ -53,6 +56,7 @@ pub struct PruneReader {
skip_fields: bool,
}
#[allow(dead_code)]
impl PruneReader {
pub(crate) fn new_with_row_group_reader(
ctx: FileRangeContextRef,

View File

@@ -515,7 +515,7 @@ mod tests {
) -> (StreamContext, PartitionRange) {
let env = SchedulerEnv::new().await;
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter(), true).unwrap();
let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap();
let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
let file_id = FileId::random();
let file = sst_file_handle_with_file_id(
@@ -527,8 +527,7 @@ mod tests {
.with_predicate(predicate)
.with_time_range(query_time_range)
.with_files(vec![file])
.with_cache(test_cache_strategy())
.with_flat_format(true);
.with_cache(test_cache_strategy());
let range_meta = RangeMeta {
time_range: partition_time_range,
indices: smallvec![SourceIndex {

View File

@@ -63,7 +63,6 @@ use crate::read::unordered_scan::UnorderedScan;
use crate::read::{Batch, BoxedRecordBatchStream, RecordBatch, Source};
use crate::region::options::MergeMode;
use crate::region::version::VersionRef;
use crate::sst::FormatType;
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::{
BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
@@ -77,8 +76,6 @@ use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexAp
use crate::sst::parquet::file_range::PreFilterMode;
use crate::sst::parquet::reader::ReaderMetrics;
/// Parallel scan channel size for flat format.
const FLAT_SCAN_CHANNEL_SIZE: usize = 2;
#[cfg(feature = "vector_index")]
const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2;
@@ -399,19 +396,12 @@ impl ScanRegion {
self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
}
/// Returns true if the region use flat format.
fn use_flat_format(&self) -> bool {
self.request.force_flat_format
|| self.version.options.sst_format.unwrap_or_default() == FormatType::Flat
}
/// Creates a scan input.
#[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
async fn scan_input(mut self) -> Result<ScanInput> {
async fn scan_input(self) -> Result<ScanInput> {
let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
let time_range = self.build_time_range_predicate();
let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
let flat_format = self.use_flat_format();
let read_column_ids = match &self.request.projection {
Some(p) => self.build_read_column_ids(p, &predicate)?,
@@ -429,10 +419,9 @@ impl ScanRegion {
Some(p) => ProjectionMapper::new_with_read_columns(
&self.version.metadata,
p.iter().copied(),
flat_format,
read_column_ids.clone(),
)?,
None => ProjectionMapper::all(&self.version.metadata, flat_format)?,
None => ProjectionMapper::all(&self.version.metadata)?,
};
let ssts = &self.version.ssts;
@@ -496,14 +485,13 @@ impl ScanRegion {
let region_id = self.region_id();
debug!(
"Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}, flat_format: {}",
"Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
region_id,
self.request,
time_range,
mem_range_builders.len(),
files.len(),
self.version.options.append_mode,
flat_format,
);
let (non_field_filters, field_filters) = self.partition_by_field_filters();
@@ -530,11 +518,6 @@ impl ScanRegion {
}
});
if flat_format {
// The batch is already large enough so we use a small channel size here.
self.parallel_scan_channel_size = FLAT_SCAN_CHANNEL_SIZE;
}
let input = ScanInput::new(self.access_layer, mapper)
.with_time_range(Some(time_range))
.with_predicate(predicate)
@@ -552,7 +535,9 @@ impl ScanRegion {
.with_merge_mode(self.version.options.merge_mode())
.with_series_row_selector(self.request.series_row_selector)
.with_distribution(self.request.distribution)
.with_flat_format(flat_format);
.with_explain_flat_format(
self.version.options.sst_format == Some(crate::sst::FormatType::Flat),
);
#[cfg(feature = "vector_index")]
let input = input
.with_vector_index_applier(vector_index_applier)
@@ -855,8 +840,8 @@ pub struct ScanInput {
pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
/// Hint for the required distribution of the scanner.
pub(crate) distribution: Option<TimeSeriesDistribution>,
/// Whether to use flat format.
pub(crate) flat_format: bool,
/// Whether the region's configured SST format is flat.
explain_flat_format: bool,
/// Whether this scan is for compaction.
pub(crate) compaction: bool,
#[cfg(feature = "enterprise")]
@@ -893,7 +878,7 @@ impl ScanInput {
merge_mode: MergeMode::default(),
series_row_selector: None,
distribution: None,
flat_format: false,
explain_flat_format: false,
compaction: false,
#[cfg(feature = "enterprise")]
extension_ranges: Vec::new(),
@@ -1049,6 +1034,13 @@ impl ScanInput {
self
}
/// Sets whether the region's configured SST format is flat for explain output.
#[must_use]
pub(crate) fn with_explain_flat_format(mut self, explain_flat_format: bool) -> Self {
self.explain_flat_format = explain_flat_format;
self
}
/// Sets the time series row selector.
#[must_use]
pub(crate) fn with_series_row_selector(
@@ -1059,13 +1051,6 @@ impl ScanInput {
self
}
/// Sets whether to use flat format.
#[must_use]
pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
self.flat_format = flat_format;
self
}
/// Sets whether this scan is for compaction.
#[must_use]
pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
@@ -1165,7 +1150,6 @@ impl ScanInput {
};
let res = reader
.expected_metadata(Some(self.mapper.metadata().clone()))
.flat_format(self.flat_format)
.compaction(self.compaction)
.pre_filter_mode(filter_mode)
.decode_primary_key_values(decode_pk_values)
@@ -1421,8 +1405,7 @@ fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
/// Builds a [ScanRequestFingerprint] from a [ScanInput] if the scan is eligible
/// for partition range caching.
pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option<ScanRequestFingerprint> {
let eligible = input.flat_format
&& !input.compaction
let eligible = !input.compaction
&& !input.files.is_empty()
&& matches!(input.cache_strategy, CacheStrategy::EnableAll(_));
@@ -1709,8 +1692,7 @@ impl StreamContext {
.entries(self.input.files.iter().map(|file| FileWrapper { file }))
.finish()?;
}
write!(f, ", \"flat_format\": {}", self.input.flat_format)?;
write!(f, ", \"flat_format\": {}", self.input.explain_flat_format)?;
#[cfg(feature = "enterprise")]
self.format_extension_ranges(f)?;
@@ -1881,9 +1863,7 @@ mod tests {
use crate::cache::CacheManager;
use crate::memtable::time_partition::TimePartitions;
use crate::read::range_cache::ScanRequestFingerprintBuilder;
use crate::region::options::RegionOptions;
use crate::region::version::VersionBuilder;
use crate::sst::FormatType;
use crate::test_util::memtable_util::{EmptyMemtableBuilder, metadata_with_primary_key};
use crate::test_util::scheduler_util::SchedulerEnv;
@@ -1897,30 +1877,9 @@ mod tests {
Arc::new(VersionBuilder::new(metadata, mutable).build())
}
fn new_version_with_sst_format(
metadata: RegionMetadataRef,
sst_format: Option<FormatType>,
) -> VersionRef {
let mutable = Arc::new(TimePartitions::new(
metadata.clone(),
Arc::new(EmptyMemtableBuilder::default()),
0,
None,
));
let options = RegionOptions {
sst_format,
..Default::default()
};
Arc::new(
VersionBuilder::new(metadata, mutable)
.options(options)
.build(),
)
}
async fn new_scan_input(metadata: RegionMetadataRef, filters: Vec<Expr>) -> ScanInput {
let env = SchedulerEnv::new().await;
let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter(), true).unwrap();
let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap();
let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
let file = FileHandle::new(
crate::sst::file::FileMeta::default(),
@@ -1934,7 +1893,6 @@ mod tests {
.range_result_cache_size(1024)
.build(),
)))
.with_flat_format(true)
.with_files(vec![file])
}
@@ -2018,45 +1976,6 @@ mod tests {
assert_eq!(vec![4, 1, 3], read_ids);
}
#[tokio::test]
async fn test_use_flat_format_honors_request_override() {
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
let env = SchedulerEnv::new().await;
let primary_key_version =
new_version_with_sst_format(metadata.clone(), Some(FormatType::PrimaryKey));
let request = ScanRequest::default();
let scan_region = ScanRegion::new(
primary_key_version.clone(),
env.access_layer.clone(),
request,
CacheStrategy::Disabled,
);
assert!(!scan_region.use_flat_format());
let request = ScanRequest {
force_flat_format: true,
..Default::default()
};
let scan_region = ScanRegion::new(
primary_key_version,
env.access_layer.clone(),
request,
CacheStrategy::Disabled,
);
assert!(scan_region.use_flat_format());
let flat_version = new_version_with_sst_format(metadata, Some(FormatType::Flat));
let request = ScanRequest::default();
let scan_region = ScanRegion::new(
flat_version,
env.access_layer.clone(),
request,
CacheStrategy::Disabled,
);
assert!(scan_region.use_flat_format());
}
/// Helper to create a timestamp millisecond literal.
fn ts_lit(val: i64) -> datafusion_expr::Expr {
lit(ScalarValue::TimestampMillisecond(Some(val), None))
@@ -2128,17 +2047,11 @@ mod tests {
let disabled = ScanInput::new(
SchedulerEnv::new().await.access_layer.clone(),
ProjectionMapper::new(&metadata, [0, 2, 3].into_iter(), true).unwrap(),
ProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(),
)
.with_predicate(PredicateGroup::new(metadata.as_ref(), &filters).unwrap())
.with_flat_format(true);
.with_predicate(PredicateGroup::new(metadata.as_ref(), &filters).unwrap());
assert!(build_scan_fingerprint(&disabled).is_none());
let non_flat = new_scan_input(metadata.clone(), filters.clone())
.await
.with_flat_format(false);
assert!(build_scan_fingerprint(&non_flat).is_none());
let compaction = new_scan_input(metadata.clone(), filters.clone())
.await
.with_compaction(true);

View File

@@ -43,7 +43,7 @@ use crate::read::merge::{MergeMetrics, MergeMetricsReport};
use crate::read::pruner::PartitionPruner;
use crate::read::range::{RangeMeta, RowGroupIndex};
use crate::read::scan_region::StreamContext;
use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
use crate::read::{BoxedRecordBatchStream, ScannerMetrics};
use crate::sst::file::{FileTimeRange, RegionFileId};
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
@@ -1186,45 +1186,6 @@ pub(crate) struct SeriesDistributorMetrics {
pub(crate) divider_cost: Duration,
}
/// Scans memtable ranges at `index`.
#[tracing::instrument(
skip_all,
fields(
region_id = %stream_ctx.input.region_metadata().region_id,
file_or_mem_index = %index.index,
row_group_index = %index.row_group_index,
source = "mem"
)
)]
pub(crate) fn scan_mem_ranges(
stream_ctx: Arc<StreamContext>,
part_metrics: PartitionMetrics,
index: RowGroupIndex,
time_range: FileTimeRange,
) -> impl Stream<Item = Result<Batch>> {
try_stream! {
let ranges = stream_ctx.input.build_mem_ranges(index);
part_metrics.inc_num_mem_ranges(ranges.len());
for range in ranges {
let build_reader_start = Instant::now();
let mem_scan_metrics = Some(MemScanMetrics::default());
let iter = range.build_prune_iter(time_range, mem_scan_metrics.clone())?;
part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
let mut source = Source::Iter(iter);
while let Some(batch) = source.next_batch().await? {
yield batch;
}
// Report the memtable scan metrics to partition metrics
if let Some(ref metrics) = mem_scan_metrics {
let data = metrics.data();
part_metrics.report_mem_scan_metrics(&data);
}
}
}
}
/// Scans memtable ranges at `index` using flat format that returns RecordBatch.
#[tracing::instrument(
skip_all,
@@ -1342,59 +1303,6 @@ fn new_filter_metrics(explain_verbose: bool) -> ReaderFilterMetrics {
}
}
/// Scans file ranges at `index`.
#[tracing::instrument(
skip_all,
fields(
region_id = %stream_ctx.input.region_metadata().region_id,
row_group_index = %index.index,
source = read_type
)
)]
pub(crate) async fn scan_file_ranges(
stream_ctx: Arc<StreamContext>,
part_metrics: PartitionMetrics,
index: RowGroupIndex,
read_type: &'static str,
partition_pruner: Arc<PartitionPruner>,
) -> Result<impl Stream<Item = Result<Batch>>> {
let mut reader_metrics = ReaderMetrics {
filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
..Default::default()
};
let ranges = partition_pruner
.build_file_ranges(index, &part_metrics, &mut reader_metrics)
.await?;
part_metrics.inc_num_file_ranges(ranges.len());
part_metrics.merge_reader_metrics(&reader_metrics, None);
// Creates initial per-file metrics with build_part_cost.
let init_per_file_metrics = if part_metrics.explain_verbose() {
let file = stream_ctx.input.file_from_index(index);
let file_id = file.file_id();
let mut map = HashMap::new();
map.insert(
file_id,
FileScanMetrics {
build_part_cost: reader_metrics.build_cost,
..Default::default()
},
);
Some(map)
} else {
None
};
Ok(build_file_range_scan_stream(
stream_ctx,
part_metrics,
read_type,
ranges,
init_per_file_metrics,
))
}
/// Scans file ranges at `index` using flat reader that returns RecordBatch.
#[tracing::instrument(
skip_all,
@@ -1448,70 +1356,6 @@ pub(crate) async fn scan_flat_file_ranges(
))
}
/// Build the stream of scanning the input [`FileRange`]s.
#[tracing::instrument(
skip_all,
fields(read_type = read_type, range_count = ranges.len())
)]
pub fn build_file_range_scan_stream(
stream_ctx: Arc<StreamContext>,
part_metrics: PartitionMetrics,
read_type: &'static str,
ranges: SmallVec<[FileRange; 2]>,
mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
) -> impl Stream<Item = Result<Batch>> {
try_stream! {
let fetch_metrics = if part_metrics.explain_verbose() {
Some(Arc::new(ParquetFetchMetrics::default()))
} else {
None
};
let reader_metrics = &mut ReaderMetrics {
fetch_metrics: fetch_metrics.clone(),
..Default::default()
};
for range in ranges {
let build_reader_start = Instant::now();
let Some(reader) = range.reader(stream_ctx.input.series_row_selector, fetch_metrics.as_deref()).await? else {
continue;
};
let build_cost = build_reader_start.elapsed();
part_metrics.inc_build_reader_cost(build_cost);
let compat_batch = range.compat_batch();
let mut source = Source::PruneReader(reader);
while let Some(mut batch) = source.next_batch().await? {
if let Some(compact_batch) = compat_batch {
batch = compact_batch.as_primary_key().unwrap().compat_batch(batch)?;
}
yield batch;
}
if let Source::PruneReader(reader) = source {
let prune_metrics = reader.metrics();
// Update per-file metrics if tracking is enabled
if let Some(file_metrics_map) = per_file_metrics.as_mut() {
let file_id = range.file_handle().file_id();
let file_metrics = file_metrics_map
.entry(file_id)
.or_insert_with(FileScanMetrics::default);
file_metrics.num_ranges += 1;
file_metrics.num_rows += prune_metrics.num_rows;
file_metrics.build_reader_cost += build_cost;
file_metrics.scan_cost += prune_metrics.scan_cost;
}
reader_metrics.merge_from(&prune_metrics);
}
}
// Reports metrics.
reader_metrics.observe_rows(read_type);
reader_metrics.filter_metrics.observe();
part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
}
}
/// Build the stream of scanning the input [`FileRange`]s using flat reader that returns RecordBatch.
#[tracing::instrument(
skip_all,
@@ -1591,47 +1435,6 @@ pub fn build_flat_file_range_scan_stream(
}
}
/// Build the stream of scanning the extension range denoted by the [`RowGroupIndex`].
#[cfg(feature = "enterprise")]
pub(crate) async fn scan_extension_range(
context: Arc<StreamContext>,
index: RowGroupIndex,
partition_metrics: PartitionMetrics,
) -> Result<BoxedBatchStream> {
use snafu::ResultExt;
let range = context.input.extension_range(index.index);
let reader = range.reader(context.as_ref());
let stream = reader
.read(context, partition_metrics, index)
.await
.context(crate::error::ScanExternalRangeSnafu)?;
Ok(stream)
}
pub(crate) async fn maybe_scan_other_ranges(
context: &Arc<StreamContext>,
index: RowGroupIndex,
metrics: &PartitionMetrics,
) -> Result<BoxedBatchStream> {
#[cfg(feature = "enterprise")]
{
scan_extension_range(context.clone(), index, metrics.clone()).await
}
#[cfg(not(feature = "enterprise"))]
{
let _ = context;
let _ = index;
let _ = metrics;
crate::error::UnexpectedSnafu {
reason: "no other ranges scannable",
}
.fail()
}
}
/// Build the stream of scanning the extension range in flat format denoted by the [`RowGroupIndex`].
#[cfg(feature = "enterprise")]
pub(crate) async fn scan_flat_extension_range(

View File

@@ -27,7 +27,7 @@ use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::schema::SchemaRef;
use futures::{StreamExt, TryStreamExt};
use snafu::{OptionExt, ensure};
use snafu::ensure;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
@@ -35,24 +35,19 @@ use store_api::region_engine::{
use store_api::storage::TimeSeriesRowSelector;
use tokio::sync::Semaphore;
use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu, UnexpectedSnafu};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu};
use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeReader;
use crate::read::last_row::{FlatLastRowReader, LastRowReader};
use crate::read::merge::MergeReaderBuilder;
use crate::read::last_row::FlatLastRowReader;
use crate::read::pruner::{PartitionPruner, Pruner};
use crate::read::range::RangeMeta;
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{
PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, scan_file_ranges,
scan_flat_file_ranges, scan_flat_mem_ranges, scan_mem_ranges,
should_split_flat_batches_for_merge,
PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, scan_flat_file_ranges,
scan_flat_mem_ranges, should_split_flat_batches_for_merge,
};
use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
use crate::read::{
Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream, ScannerMetrics, Source, scan_util,
};
use crate::read::{BoxedRecordBatchStream, ScannerMetrics, scan_util};
use crate::region::options::MergeMode;
use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
@@ -121,7 +116,7 @@ impl SeqScan {
let streams = (0..self.properties.partitions.len())
.map(|partition| {
let metrics = self.new_partition_metrics(false, &metrics_set, partition);
self.scan_batch_in_partition(partition, metrics)
self.scan_flat_batch_in_partition(partition, metrics)
})
.collect::<Result<Vec<_>>>()?;
@@ -184,57 +179,6 @@ impl SeqScan {
Self::build_flat_reader_from_sources(stream_ctx, sources, None, None).await
}
/// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel
/// if possible.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub(crate) async fn build_reader_from_sources(
stream_ctx: &StreamContext,
mut sources: Vec<Source>,
semaphore: Option<Arc<Semaphore>>,
part_metrics: Option<&PartitionMetrics>,
) -> Result<BoxedBatchReader> {
if let Some(semaphore) = semaphore.as_ref() {
// Read sources in parallel.
if sources.len() > 1 {
sources = stream_ctx
.input
.create_parallel_sources(sources, semaphore.clone())?;
}
}
let mut builder = MergeReaderBuilder::from_sources(sources);
if let Some(metrics) = part_metrics {
builder.with_metrics_reporter(Some(metrics.merge_metrics_reporter()));
}
let reader = builder.build().await?;
let dedup = !stream_ctx.input.append_mode;
let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
let reader = if dedup {
match stream_ctx.input.merge_mode {
MergeMode::LastRow => Box::new(DedupReader::new(
reader,
LastRow::new(stream_ctx.input.filter_deleted),
dedup_metrics_reporter,
)) as _,
MergeMode::LastNonNull => Box::new(DedupReader::new(
reader,
LastNonNull::new(stream_ctx.input.filter_deleted),
dedup_metrics_reporter,
)) as _,
}
} else {
Box::new(reader) as _
};
let reader = match &stream_ctx.input.series_row_selector {
Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
None => reader,
};
Ok(reader)
}
/// Builds a flat reader to read sources that returns RecordBatch. If `semaphore` is provided, reads sources in parallel
/// if possible.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
@@ -318,13 +262,7 @@ impl SeqScan {
let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);
let input = &self.stream_ctx.input;
let batch_stream = if input.flat_format {
// Use flat scan for bulk memtables
self.scan_flat_batch_in_partition(partition, metrics.clone())?
} else {
// Use regular batch scan for normal memtables
self.scan_batch_in_partition(partition, metrics.clone())?
};
let batch_stream = self.scan_flat_batch_in_partition(partition, metrics.clone())?;
let record_batch_stream = ConvertBatchStream::new(
batch_stream,
input.mapper.clone(),
@@ -338,125 +276,6 @@ impl SeqScan {
)))
}
#[tracing::instrument(
skip_all,
fields(
region_id = %self.stream_ctx.input.mapper.metadata().region_id,
partition = partition
)
)]
fn scan_batch_in_partition(
&self,
partition: usize,
part_metrics: PartitionMetrics,
) -> Result<ScanBatchStream> {
ensure!(
partition < self.properties.partitions.len(),
PartitionOutOfRangeSnafu {
given: partition,
all: self.properties.partitions.len(),
}
);
if self.properties.partitions[partition].is_empty() {
return Ok(Box::pin(futures::stream::empty()));
}
let stream_ctx = self.stream_ctx.clone();
let semaphore = self.new_semaphore();
let partition_ranges = self.properties.partitions[partition].clone();
let compaction = self.stream_ctx.input.compaction;
let distinguish_range = self.properties.distinguish_partition_range;
let file_scan_semaphore = if compaction { None } else { semaphore.clone() };
let pruner = self.pruner.clone();
// Initializes ref counts for the pruner.
// If we call scan_batch_in_partition() multiple times but don't read all batches from the stream,
// then the ref count won't be decremented.
// This is a rare case and keeping all remaining entries still uses less memory than a per partition cache.
pruner.add_partition_ranges(&partition_ranges);
let partition_pruner = Arc::new(PartitionPruner::new(pruner, &partition_ranges));
let stream = try_stream! {
part_metrics.on_first_poll();
// Start fetch time before building sources so scan cost contains
// build part cost.
let mut fetch_start = Instant::now();
let _mapper = stream_ctx.input.mapper.as_primary_key().context(UnexpectedSnafu {
reason: "Unexpected format",
})?;
// Scans each part.
for part_range in partition_ranges {
let mut sources = Vec::new();
build_sources(
&stream_ctx,
&part_range,
compaction,
&part_metrics,
partition_pruner.clone(),
&mut sources,
file_scan_semaphore.clone(),
).await?;
let mut reader =
Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics))
.await?;
#[cfg(debug_assertions)]
let mut checker = crate::read::BatchChecker::default()
.with_start(Some(part_range.start))
.with_end(Some(part_range.end));
let mut metrics = ScannerMetrics {
scan_cost: fetch_start.elapsed(),
..Default::default()
};
fetch_start = Instant::now();
while let Some(batch) = reader.next_batch().await? {
metrics.scan_cost += fetch_start.elapsed();
metrics.num_batches += 1;
metrics.num_rows += batch.num_rows();
debug_assert!(!batch.is_empty());
if batch.is_empty() {
fetch_start = Instant::now();
continue;
}
#[cfg(debug_assertions)]
checker.ensure_part_range_batch(
"SeqScan",
_mapper.metadata().region_id,
partition,
part_range,
&batch,
);
let yield_start = Instant::now();
yield ScanBatch::Normal(batch);
metrics.yield_cost += yield_start.elapsed();
fetch_start = Instant::now();
}
// Yields an empty part to indicate this range is terminated.
// The query engine can use this to optimize some queries.
if distinguish_range {
let yield_start = Instant::now();
yield ScanBatch::Normal(Batch::empty());
metrics.yield_cost += yield_start.elapsed();
}
metrics.scan_cost += fetch_start.elapsed();
fetch_start = Instant::now();
part_metrics.merge_metrics(&metrics);
}
part_metrics.on_finish();
};
Ok(Box::pin(stream))
}
#[tracing::instrument(
skip_all,
fields(
@@ -709,108 +528,6 @@ impl fmt::Debug for SeqScan {
}
}
/// Builds sources for the partition range and push them to the `sources` vector.
pub(crate) async fn build_sources(
stream_ctx: &Arc<StreamContext>,
part_range: &PartitionRange,
compaction: bool,
part_metrics: &PartitionMetrics,
partition_pruner: Arc<PartitionPruner>,
sources: &mut Vec<Source>,
semaphore: Option<Arc<Semaphore>>,
) -> Result<()> {
// Gets range meta.
let range_meta = &stream_ctx.ranges[part_range.identifier];
#[cfg(debug_assertions)]
if compaction {
// Compaction expects input sources are not been split.
debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
// It should scan all row groups.
debug_assert_eq!(
-1, row_group_idx.row_group_index,
"Expect {} range scan all row groups, given: {}",
i, row_group_idx.row_group_index,
);
}
}
let read_type = if compaction {
"compaction"
} else {
"seq_scan_files"
};
let num_indices = range_meta.row_group_indices.len();
if num_indices == 0 {
return Ok(());
}
sources.reserve(num_indices);
let mut ordered_sources = Vec::with_capacity(num_indices);
ordered_sources.resize_with(num_indices, || None);
let mut file_scan_tasks = Vec::new();
for (position, index) in range_meta.row_group_indices.iter().enumerate() {
if stream_ctx.is_mem_range_index(*index) {
let stream = scan_mem_ranges(
stream_ctx.clone(),
part_metrics.clone(),
*index,
range_meta.time_range,
);
ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _));
} else if stream_ctx.is_file_range_index(*index) {
if let Some(semaphore_ref) = semaphore.as_ref() {
// run in parallel, controlled by semaphore
let stream_ctx = stream_ctx.clone();
let part_metrics = part_metrics.clone();
let partition_pruner = partition_pruner.clone();
let semaphore = Arc::clone(semaphore_ref);
let row_group_index = *index;
file_scan_tasks.push(async move {
let _permit = semaphore.acquire().await.unwrap();
let stream = scan_file_ranges(
stream_ctx,
part_metrics,
row_group_index,
read_type,
partition_pruner,
)
.await?;
Ok((position, Source::Stream(Box::pin(stream) as _)))
});
} else {
// no semaphore, run sequentially
let stream = scan_file_ranges(
stream_ctx.clone(),
part_metrics.clone(),
*index,
read_type,
partition_pruner.clone(),
)
.await?;
ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _));
}
} else {
let stream =
scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await?;
ordered_sources[position] = Some(Source::Stream(stream));
}
}
if !file_scan_tasks.is_empty() {
let results = futures::future::try_join_all(file_scan_tasks).await?;
for (position, source) in results {
ordered_sources[position] = Some(source);
}
}
for source in ordered_sources.into_iter().flatten() {
sources.push(source);
}
Ok(())
}
/// Builds flat sources for the partition range and push them to the `sources` vector.
pub(crate) async fn build_flat_sources(
stream_ctx: &Arc<StreamContext>,

View File

@@ -30,7 +30,7 @@ use datatypes::arrow::array::BinaryArray;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::schema::SchemaRef;
use futures::{StreamExt, TryStreamExt};
use smallvec::{SmallVec, smallvec};
use smallvec::SmallVec;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
@@ -44,12 +44,12 @@ use crate::error::{
Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu,
ScanSeriesSnafu, TooManyFilesToReadSnafu,
};
use crate::read::ScannerMetrics;
use crate::read::pruner::{PartitionPruner, Pruner};
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics};
use crate::read::seq_scan::{SeqScan, build_flat_sources, build_sources};
use crate::read::seq_scan::{SeqScan, build_flat_sources};
use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
use crate::read::{Batch, ScannerMetrics};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::PrimaryKeyArray;
@@ -443,11 +443,7 @@ impl SeriesDistributor {
fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
)]
async fn execute(&mut self) {
let result = if self.stream_ctx.input.flat_format {
self.scan_partitions_flat().await
} else {
self.scan_partitions().await
};
let result = self.scan_partitions_flat().await;
if let Err(e) = result {
self.senders.send_error(e).await;
@@ -559,151 +555,11 @@ impl SeriesDistributor {
Ok(())
}
/// Scans all parts.
#[tracing::instrument(
skip_all,
fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
)]
async fn scan_partitions(&mut self) -> Result<()> {
// Initialize reference counts for all partition ranges.
for partition_ranges in &self.partitions {
self.pruner.add_partition_ranges(partition_ranges);
}
// Create PartitionPruner covering all partitions
let all_partition_ranges: Vec<_> = self.partitions.iter().flatten().cloned().collect();
let partition_pruner = Arc::new(PartitionPruner::new(
self.pruner.clone(),
&all_partition_ranges,
));
let part_metrics = new_partition_metrics(
&self.stream_ctx,
self.explain_verbose,
&self.metrics_set,
self.partitions.len(),
&self.metrics_list,
);
part_metrics.on_first_poll();
// Start fetch time before building sources so scan cost contains
// build part cost.
let mut fetch_start = Instant::now();
// Scans all parts.
let mut sources = Vec::with_capacity(self.partitions.len());
for partition in &self.partitions {
sources.reserve(partition.len());
for part_range in partition {
build_sources(
&self.stream_ctx,
part_range,
false,
&part_metrics,
partition_pruner.clone(),
&mut sources,
self.semaphore.clone(),
)
.await?;
}
}
// Builds a reader that merge sources from all parts.
let mut reader = SeqScan::build_reader_from_sources(
&self.stream_ctx,
sources,
self.semaphore.clone(),
Some(&part_metrics),
)
.await?;
let mut metrics = SeriesDistributorMetrics::default();
let mut current_series = PrimaryKeySeriesBatch::default();
while let Some(batch) = reader.next_batch().await? {
metrics.scan_cost += fetch_start.elapsed();
metrics.num_batches += 1;
metrics.num_rows += batch.num_rows();
debug_assert!(!batch.is_empty());
if batch.is_empty() {
fetch_start = Instant::now();
continue;
}
let Some(last_key) = current_series.current_key() else {
current_series.push(batch);
fetch_start = Instant::now();
continue;
};
if last_key == batch.primary_key() {
current_series.push(batch);
fetch_start = Instant::now();
continue;
}
// We find a new series, send the current one.
let to_send =
std::mem::replace(&mut current_series, PrimaryKeySeriesBatch::single(batch));
let yield_start = Instant::now();
self.senders
.send_batch(SeriesBatch::PrimaryKey(to_send))
.await?;
metrics.yield_cost += yield_start.elapsed();
fetch_start = Instant::now();
}
if !current_series.is_empty() {
let yield_start = Instant::now();
self.senders
.send_batch(SeriesBatch::PrimaryKey(current_series))
.await?;
metrics.yield_cost += yield_start.elapsed();
}
metrics.scan_cost += fetch_start.elapsed();
metrics.num_series_send_timeout = self.senders.num_timeout;
metrics.num_series_send_full = self.senders.num_full;
part_metrics.set_distributor_metrics(&metrics);
part_metrics.on_finish();
Ok(())
}
}
/// Batches of the same series in primary key format.
#[derive(Default, Debug)]
pub struct PrimaryKeySeriesBatch {
pub batches: SmallVec<[Batch; 4]>,
}
impl PrimaryKeySeriesBatch {
/// Creates a new [PrimaryKeySeriesBatch] from a single [Batch].
fn single(batch: Batch) -> Self {
Self {
batches: smallvec![batch],
}
}
fn current_key(&self) -> Option<&[u8]> {
self.batches.first().map(|batch| batch.primary_key())
}
fn push(&mut self, batch: Batch) {
self.batches.push(batch);
}
/// Returns true if there is no batch.
fn is_empty(&self) -> bool {
self.batches.is_empty()
}
}
/// Batches of the same series.
#[derive(Debug)]
pub enum SeriesBatch {
PrimaryKey(PrimaryKeySeriesBatch),
Flat(FlatSeriesBatch),
}
@@ -711,7 +567,6 @@ impl SeriesBatch {
/// Returns the number of batches.
pub fn num_batches(&self) -> usize {
match self {
SeriesBatch::PrimaryKey(primary_key_batch) => primary_key_batch.batches.len(),
SeriesBatch::Flat(flat_batch) => flat_batch.batches.len(),
}
}
@@ -719,9 +574,6 @@ impl SeriesBatch {
/// Returns the total number of rows across all batches.
pub fn num_rows(&self) -> usize {
match self {
SeriesBatch::PrimaryKey(primary_key_batch) => {
primary_key_batch.batches.iter().map(|x| x.num_rows()).sum()
}
SeriesBatch::Flat(flat_batch) => flat_batch.batches.iter().map(|x| x.num_rows()).sum(),
}
}

View File

@@ -27,14 +27,12 @@ use snafu::ResultExt;
use crate::cache::CacheStrategy;
use crate::error::Result;
use crate::read::Batch;
use crate::read::projection::ProjectionMapper;
use crate::read::scan_util::PartitionMetrics;
use crate::read::series_scan::SeriesBatch;
/// All kinds of [`Batch`]es to produce in scanner.
pub enum ScanBatch {
Normal(Batch),
Series(SeriesBatch),
RecordBatch(DfRecordBatch),
}
@@ -45,6 +43,7 @@ pub type ScanBatchStream = BoxStream<'static, Result<ScanBatch>>;
pub(crate) struct ConvertBatchStream {
inner: ScanBatchStream,
projection_mapper: Arc<ProjectionMapper>,
#[allow(dead_code)]
cache_strategy: CacheStrategy,
partition_metrics: PartitionMetrics,
pending: VecDeque<RecordBatch>,
@@ -68,41 +67,19 @@ impl ConvertBatchStream {
fn convert(&mut self, batch: ScanBatch) -> common_recordbatch::error::Result<RecordBatch> {
match batch {
ScanBatch::Normal(batch) => {
// Safety: Only primary key format returns this batch.
let mapper = self.projection_mapper.as_primary_key().unwrap();
if batch.is_empty() {
Ok(mapper.empty_record_batch())
} else {
mapper.convert(&batch, &self.cache_strategy)
}
}
ScanBatch::Series(series) => {
debug_assert!(
self.pending.is_empty(),
"ConvertBatchStream should not convert a new SeriesBatch when pending batches exist"
);
match series {
SeriesBatch::PrimaryKey(primary_key_batch) => {
// Safety: Only primary key format returns this batch.
let mapper = self.projection_mapper.as_primary_key().unwrap();
let SeriesBatch::Flat(flat_batch) = series;
// Safety: Only flat format returns this batch.
let mapper = self.projection_mapper.as_flat().unwrap();
for batch in primary_key_batch.batches {
self.pending
.push_back(mapper.convert(&batch, &self.cache_strategy)?);
}
}
SeriesBatch::Flat(flat_batch) => {
// Safety: Only flat format returns this batch.
let mapper = self.projection_mapper.as_flat().unwrap();
for batch in flat_batch.batches {
self.pending
.push_back(mapper.convert(&batch, &self.cache_strategy)?);
}
}
for batch in flat_batch.batches {
self.pending
.push_back(mapper.convert(&batch, &self.cache_strategy)?);
}
let output_schema = self.projection_mapper.output_schema();

View File

@@ -37,11 +37,10 @@ use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::read::pruner::{PartitionPruner, Pruner};
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{
PartitionMetrics, PartitionMetricsList, scan_file_ranges, scan_flat_file_ranges,
scan_flat_mem_ranges, scan_mem_ranges,
PartitionMetrics, PartitionMetricsList, scan_flat_file_ranges, scan_flat_mem_ranges,
};
use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
use crate::read::{Batch, ScannerMetrics, scan_util};
use crate::read::{ScannerMetrics, scan_util};
/// Scans a region without providing any output ordering guarantee.
///
@@ -103,59 +102,6 @@ impl UnorderedScan {
Ok(stream)
}
/// Scans a [PartitionRange] by its `identifier` and returns a stream.
#[tracing::instrument(
skip_all,
fields(
region_id = %stream_ctx.input.region_metadata().region_id,
part_range_id = part_range_id
)
)]
fn scan_partition_range(
stream_ctx: Arc<StreamContext>,
part_range_id: usize,
part_metrics: PartitionMetrics,
partition_pruner: Arc<PartitionPruner>,
) -> impl Stream<Item = Result<Batch>> {
try_stream! {
// Gets range meta.
let range_meta = &stream_ctx.ranges[part_range_id];
for index in &range_meta.row_group_indices {
if stream_ctx.is_mem_range_index(*index) {
let stream = scan_mem_ranges(
stream_ctx.clone(),
part_metrics.clone(),
*index,
range_meta.time_range,
);
for await batch in stream {
yield batch?;
}
} else if stream_ctx.is_file_range_index(*index) {
let stream = scan_file_ranges(
stream_ctx.clone(),
part_metrics.clone(),
*index,
"unordered_scan_files",
partition_pruner.clone(),
).await?;
for await batch in stream {
yield batch?;
}
} else {
let stream = scan_util::maybe_scan_other_ranges(
&stream_ctx,
*index,
&part_metrics,
).await?;
for await batch in stream {
yield batch?;
}
}
}
}
}
/// Scans a [PartitionRange] by its `identifier` and returns a flat stream of RecordBatch.
#[tracing::instrument(
skip_all,
@@ -216,7 +162,7 @@ impl UnorderedScan {
let streams = (0..self.properties.partitions.len())
.map(|partition| {
let metrics = self.partition_metrics(false, partition, &metrics_set);
self.scan_batch_in_partition(partition, metrics)
self.scan_flat_batch_in_partition(partition, metrics)
})
.collect::<Result<Vec<_>>>()?;
@@ -265,13 +211,7 @@ impl UnorderedScan {
let metrics = self.partition_metrics(ctx.explain_verbose, partition, metrics_set);
let input = &self.stream_ctx.input;
let batch_stream = if input.flat_format {
// Use flat scan for bulk memtables
self.scan_flat_batch_in_partition(partition, metrics.clone())?
} else {
// Use regular batch scan for normal memtables
self.scan_batch_in_partition(partition, metrics.clone())?
};
let batch_stream = self.scan_flat_batch_in_partition(partition, metrics.clone())?;
let record_batch_stream = ConvertBatchStream::new(
batch_stream,
@@ -286,100 +226,6 @@ impl UnorderedScan {
)))
}
#[tracing::instrument(
skip_all,
fields(
region_id = %self.stream_ctx.input.mapper.metadata().region_id,
partition = partition
)
)]
fn scan_batch_in_partition(
&self,
partition: usize,
part_metrics: PartitionMetrics,
) -> Result<ScanBatchStream> {
ensure!(
partition < self.properties.partitions.len(),
PartitionOutOfRangeSnafu {
given: partition,
all: self.properties.partitions.len(),
}
);
let stream_ctx = self.stream_ctx.clone();
let part_ranges = self.properties.partitions[partition].clone();
let distinguish_range = self.properties.distinguish_partition_range;
let pruner = self.pruner.clone();
// Initializes ref counts for the pruner.
// If we call scan_batch_in_partition() multiple times but don't read all batches from the stream,
// then the ref count won't be decremented.
// This is a rare case and keeping all remaining entries still uses less memory than a per partition cache.
pruner.add_partition_ranges(&part_ranges);
let partition_pruner = Arc::new(PartitionPruner::new(pruner, &part_ranges));
let stream = try_stream! {
part_metrics.on_first_poll();
// Scans each part.
for part_range in part_ranges {
let mut metrics = ScannerMetrics::default();
let mut fetch_start = Instant::now();
let _mapper = &stream_ctx.input.mapper;
#[cfg(debug_assertions)]
let mut checker = crate::read::BatchChecker::default()
.with_start(Some(part_range.start))
.with_end(Some(part_range.end));
let stream = Self::scan_partition_range(
stream_ctx.clone(),
part_range.identifier,
part_metrics.clone(),
partition_pruner.clone(),
);
for await batch in stream {
let batch = batch?;
metrics.scan_cost += fetch_start.elapsed();
metrics.num_batches += 1;
metrics.num_rows += batch.num_rows();
debug_assert!(!batch.is_empty());
if batch.is_empty() {
continue;
}
#[cfg(debug_assertions)]
checker.ensure_part_range_batch(
"UnorderedScan",
_mapper.metadata().region_id,
partition,
part_range,
&batch,
);
let yield_start = Instant::now();
yield ScanBatch::Normal(batch);
metrics.yield_cost += yield_start.elapsed();
fetch_start = Instant::now();
}
// Yields an empty part to indicate this range is terminated.
// The query engine can use this to optimize some queries.
if distinguish_range {
let yield_start = Instant::now();
yield ScanBatch::Normal(Batch::empty());
metrics.yield_cost += yield_start.elapsed();
}
metrics.scan_cost += fetch_start.elapsed();
part_metrics.merge_metrics(&metrics);
}
part_metrics.on_finish();
};
Ok(Box::pin(stream))
}
#[tracing::instrument(
skip_all,
fields(

View File

@@ -895,7 +895,6 @@ mod tests {
handle.clone(),
object_store.clone(),
)
.flat_format(true)
.predicate(Some(Predicate::new(preds)))
.inverted_index_appliers([inverted_index_applier.clone(), None])
.bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
@@ -960,7 +959,6 @@ mod tests {
handle.clone(),
object_store.clone(),
)
.flat_format(true)
.predicate(Some(Predicate::new(preds)))
.inverted_index_appliers([inverted_index_applier.clone(), None])
.bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
@@ -1015,7 +1013,6 @@ mod tests {
handle.clone(),
object_store.clone(),
)
.flat_format(true)
.predicate(Some(Predicate::new(preds)))
.inverted_index_appliers([inverted_index_applier.clone(), None])
.bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
@@ -1549,7 +1546,6 @@ mod tests {
handle.clone(),
object_store.clone(),
)
.flat_format(true)
.predicate(Some(Predicate::new(preds)))
.inverted_index_appliers([inverted_index_applier.clone(), None])
.cache(CacheStrategy::EnableAll(cache.clone()));
@@ -1652,7 +1648,6 @@ mod tests {
handle.clone(),
object_store.clone(),
)
.flat_format(true)
.predicate(Some(Predicate::new(preds)))
.bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
.cache(CacheStrategy::EnableAll(cache.clone()));
@@ -1712,7 +1707,6 @@ mod tests {
let builder =
ParquetReaderBuilder::new(FILE_DIR.to_string(), PathType::Bare, handle, object_store)
.flat_format(true)
.predicate(Some(Predicate::new(vec![col("tag_0").eq(lit("a"))])));
let mut metrics = ReaderMetrics::default();
@@ -1774,7 +1768,6 @@ mod tests {
let builder =
ParquetReaderBuilder::new(FILE_DIR.to_string(), PathType::Bare, handle, object_store)
.flat_format(true)
.predicate(Some(Predicate::new(vec![col("tag_0").eq(lit("a"))])));
let mut metrics = ReaderMetrics::default();
@@ -1884,7 +1877,6 @@ mod tests {
handle.clone(),
object_store.clone(),
)
.flat_format(true)
.predicate(Some(Predicate::new(preds)))
.inverted_index_appliers([inverted_index_applier.clone(), None])
.cache(CacheStrategy::EnableAll(cache.clone()));
@@ -1991,7 +1983,6 @@ mod tests {
handle.clone(),
object_store.clone(),
)
.flat_format(true)
.predicate(Some(Predicate::new(preds)))
.bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
.cache(CacheStrategy::EnableAll(cache.clone()));
@@ -2255,7 +2246,6 @@ mod tests {
handle.clone(),
object_store.clone(),
)
.flat_format(true)
.predicate(Some(Predicate::new(preds)))
.fulltext_index_appliers([None, fulltext_applier.clone()])
.cache(CacheStrategy::EnableAll(cache.clone()));
@@ -2304,7 +2294,6 @@ mod tests {
handle.clone(),
object_store.clone(),
)
.flat_format(true)
.predicate(Some(Predicate::new(preds)))
.fulltext_index_appliers([None, fulltext_applier.clone()])
.cache(CacheStrategy::EnableAll(cache.clone()));

View File

@@ -175,6 +175,7 @@ impl FileRange {
}
/// Returns a reader to read the [FileRange].
#[allow(dead_code)]
pub(crate) async fn reader(
&self,
selector: Option<TimeSeriesRowSelector>,

View File

@@ -141,8 +141,6 @@ pub struct ParquetReaderBuilder {
/// This is usually the latest metadata of the region. The reader use
/// it get the correct column id of a column by name.
expected_metadata: Option<RegionMetadataRef>,
/// Whether to use flat format for reading.
flat_format: bool,
/// Whether this reader is for compaction.
compaction: bool,
/// Mode to pre-filter columns.
@@ -176,7 +174,6 @@ impl ParquetReaderBuilder {
#[cfg(feature = "vector_index")]
vector_index_k: None,
expected_metadata: None,
flat_format: false,
compaction: false,
pre_filter_mode: PreFilterMode::All,
decode_primary_key_values: false,
@@ -257,13 +254,6 @@ impl ParquetReaderBuilder {
self
}
/// Sets the flat format flag.
#[must_use]
pub fn flat_format(mut self, flat_format: bool) -> Self {
self.flat_format = flat_format;
self
}
/// Sets the compaction flag.
#[must_use]
pub fn compaction(mut self, compaction: bool) -> Self {
@@ -304,8 +294,7 @@ impl ParquetReaderBuilder {
pub async fn build(&self) -> Result<Option<ParquetReader>> {
let mut metrics = ReaderMetrics::default();
let Some((context, selection)) = self.build_reader_input_inner(&mut metrics, true).await?
else {
let Some((context, selection)) = self.build_reader_input_inner(&mut metrics).await? else {
return Ok(None);
};
ParquetReader::new(Arc::new(context), selection)
@@ -327,14 +316,12 @@ impl ParquetReaderBuilder {
&self,
metrics: &mut ReaderMetrics,
) -> Result<Option<(FileRangeContext, RowGroupSelection)>> {
self.build_reader_input_inner(metrics, self.flat_format)
.await
self.build_reader_input_inner(metrics).await
}
async fn build_reader_input_inner(
&self,
metrics: &mut ReaderMetrics,
flat_format: bool,
) -> Result<Option<(FileRangeContext, RowGroupSelection)>> {
let start = Instant::now();
@@ -376,7 +363,6 @@ impl ParquetReaderBuilder {
// before compat handling.
let compaction_projection_mapper = if self.compaction
&& !is_same_region_partition
&& flat_format
&& region_meta.primary_key_encoding == PrimaryKeyEncoding::Sparse
{
Some(CompactionProjectionMapper::try_new(&region_meta)?)
@@ -388,7 +374,7 @@ impl ParquetReaderBuilder {
ReadFormat::new(
region_meta.clone(),
Some(column_ids),
flat_format,
true, // Always reads as flat format.
Some(parquet_meta.file_metadata().schema_descr().num_columns()),
&file_path,
skip_auto_convert,
@@ -404,7 +390,7 @@ impl ParquetReaderBuilder {
ReadFormat::new(
region_meta.clone(),
Some(&column_ids),
flat_format,
true, // Always reads as flat format.
Some(parquet_meta.file_metadata().schema_descr().num_columns()),
&file_path,
skip_auto_convert,
@@ -2060,6 +2046,7 @@ impl RowGroupReaderContext for FileRangeContextRef {
/// [RowGroupReader] that reads from [FileRange].
pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeContextRef>;
#[allow(dead_code)]
impl RowGroupReader {
/// Creates a new reader from file range.
pub(crate) fn new(
@@ -2084,6 +2071,7 @@ pub(crate) struct RowGroupReaderBase<T> {
override_sequence: Option<ArrayRef>,
}
#[allow(dead_code)]
impl<T> RowGroupReaderBase<T>
where
T: RowGroupReaderContext,

View File

@@ -128,8 +128,6 @@ pub struct ScanRequest {
/// Optional hint for KNN vector search. When set, the scan should use
/// vector index to find the k nearest neighbors.
pub vector_search: Option<VectorSearchRequest>,
/// Whether to force reading region data in flat format.
pub force_flat_format: bool,
}
impl Display for ScanRequest {
@@ -220,14 +218,6 @@ impl Display for ScanRequest {
vector_search.metric
)?;
}
if self.force_flat_format {
write!(
f,
"{}force_flat_format: {}",
delimiter.as_str(),
self.force_flat_format
)?;
}
write!(f, " }}")
}
}
@@ -282,15 +272,6 @@ mod tests {
"ScanRequest { projection: [1, 2], limit: 10 }"
);
let request = ScanRequest {
force_flat_format: true,
..Default::default()
};
assert_eq!(
request.to_string(),
"ScanRequest { force_flat_format: true }"
);
let request = ScanRequest {
snapshot_on_scan: true,
..Default::default()