mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-30 03:40:37 +00:00
fix(mito): incorrect field index in ProjectionMapper (#2388)
* chore: update todo comments
* test: add test for projection
* fix: panics when projecting fields
* chore: remove todos
This commit is contained in:
@@ -31,6 +31,8 @@ pub(crate) mod listener;
|
||||
#[cfg(test)]
|
||||
mod open_test;
|
||||
#[cfg(test)]
|
||||
mod projection_test;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
94
src/mito2/src/engine/projection_test.rs
Normal file
94
src/mito2/src/engine/projection_test.rs
Normal file
@@ -0,0 +1,94 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use api::v1::value::ValueData;
|
||||
use api::v1::{Row, Rows};
|
||||
use common_recordbatch::RecordBatches;
|
||||
use store_api::region_engine::RegionEngine;
|
||||
use store_api::region_request::RegionRequest;
|
||||
use store_api::storage::{RegionId, ScanRequest};
|
||||
|
||||
use crate::config::MitoConfig;
|
||||
use crate::test_util::{put_rows, rows_schema, CreateRequestBuilder, TestEnv};
|
||||
|
||||
/// Build rows for multiple tags and fields.
|
||||
fn build_rows_multi_tags_fields(
|
||||
tags: &[&str],
|
||||
field_starts: &[usize],
|
||||
ts_range: (usize, usize),
|
||||
) -> Vec<Row> {
|
||||
(ts_range.0..ts_range.1)
|
||||
.enumerate()
|
||||
.map(|(idx, ts)| {
|
||||
let mut values = Vec::with_capacity(tags.len() + field_starts.len() + 1);
|
||||
for tag in tags {
|
||||
values.push(api::v1::Value {
|
||||
value_data: Some(ValueData::StringValue(tag.to_string())),
|
||||
});
|
||||
}
|
||||
for field_start in field_starts {
|
||||
values.push(api::v1::Value {
|
||||
value_data: Some(ValueData::F64Value((field_start + idx) as f64)),
|
||||
});
|
||||
}
|
||||
values.push(api::v1::Value {
|
||||
value_data: Some(ValueData::TsMillisecondValue(ts as i64 * 1000)),
|
||||
});
|
||||
|
||||
api::v1::Row { values }
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_scan_projection() {
|
||||
let mut env = TestEnv::new();
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
// [tag_0, tag_1, field_0, field_1, ts]
|
||||
let request = CreateRequestBuilder::new().tag_num(2).field_num(2).build();
|
||||
|
||||
let column_schemas = rows_schema(&request);
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Create(request))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let rows = Rows {
|
||||
schema: column_schemas,
|
||||
rows: build_rows_multi_tags_fields(&["a", "b"], &[0, 10], (0, 3)),
|
||||
};
|
||||
put_rows(&engine, region_id, rows).await;
|
||||
|
||||
// Scans tag_1, field_1, ts
|
||||
let request = ScanRequest {
|
||||
sequence: None,
|
||||
projection: Some(vec![1, 3, 4]),
|
||||
filters: Vec::new(),
|
||||
output_ordering: None,
|
||||
limit: None,
|
||||
};
|
||||
let stream = engine.handle_query(region_id, request).await.unwrap();
|
||||
let batches = RecordBatches::try_collect(stream).await.unwrap();
|
||||
let expected = "\
|
||||
+-------+---------+---------------------+
|
||||
| tag_1 | field_1 | ts |
|
||||
+-------+---------+---------------------+
|
||||
| b | 10.0 | 1970-01-01T00:00:00 |
|
||||
| b | 11.0 | 1970-01-01T00:00:01 |
|
||||
| b | 12.0 | 1970-01-01T00:00:02 |
|
||||
+-------+---------+---------------------+";
|
||||
assert_eq!(expected, batches.pretty_print().unwrap());
|
||||
}
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
//! Utilities for projection.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::SemanticType;
|
||||
@@ -55,40 +56,23 @@ impl ProjectionMapper {
|
||||
metadata: &RegionMetadataRef,
|
||||
projection: impl Iterator<Item = usize>,
|
||||
) -> Result<ProjectionMapper> {
|
||||
let projection_len = projection.size_hint().0;
|
||||
let mut batch_indices = Vec::with_capacity(projection_len);
|
||||
let mut column_schemas = Vec::with_capacity(projection_len);
|
||||
let mut column_ids = Vec::with_capacity(projection_len);
|
||||
for idx in projection {
|
||||
let projection: Vec<_> = projection.collect();
|
||||
let mut column_schemas = Vec::with_capacity(projection.len());
|
||||
let mut column_ids = Vec::with_capacity(projection.len());
|
||||
for idx in &projection {
|
||||
// For each projection index, we get the column id for projection.
|
||||
let column = metadata
|
||||
.column_metadatas
|
||||
.get(idx)
|
||||
.get(*idx)
|
||||
.context(InvalidRequestSnafu {
|
||||
region_id: metadata.region_id,
|
||||
reason: format!("projection index {} is out of bound", idx),
|
||||
})?;
|
||||
|
||||
// Get column index in a batch by its semantic type and column id.
|
||||
let batch_index = match column.semantic_type {
|
||||
SemanticType::Tag => {
|
||||
// Safety: It is a primary key column.
|
||||
let index = metadata.primary_key_index(column.column_id).unwrap();
|
||||
BatchIndex::Tag(index)
|
||||
}
|
||||
SemanticType::Timestamp => BatchIndex::Timestamp,
|
||||
SemanticType::Field => {
|
||||
// Safety: It is a field column.
|
||||
let index = metadata.field_index(column.column_id).unwrap();
|
||||
BatchIndex::Field(index)
|
||||
}
|
||||
};
|
||||
batch_indices.push(batch_index);
|
||||
column_ids.push(column.column_id);
|
||||
// Safety: idx is valid.
|
||||
column_schemas.push(metadata.schema.column_schemas()[idx].clone());
|
||||
column_schemas.push(metadata.schema.column_schemas()[*idx].clone());
|
||||
}
|
||||
|
||||
let codec = McmpRowCodec::new(
|
||||
metadata
|
||||
.primary_key_columns()
|
||||
@@ -97,8 +81,39 @@ impl ProjectionMapper {
|
||||
);
|
||||
// Safety: Columns come from existing schema.
|
||||
let output_schema = Arc::new(Schema::new(column_schemas));
|
||||
// Get fields in each batch.
|
||||
let batch_fields = Batch::projected_fields(metadata, &column_ids);
|
||||
|
||||
// Field column id to its index in batch.
|
||||
let field_id_to_index: HashMap<_, _> = batch_fields
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(index, column_id)| (*column_id, index))
|
||||
.collect();
|
||||
// For each projected column, compute its index in batches.
|
||||
let mut batch_indices = Vec::with_capacity(projection.len());
|
||||
for idx in &projection {
|
||||
// Safety: idx is valid.
|
||||
let column = &metadata.column_metadatas[*idx];
|
||||
// Get column index in a batch by its semantic type and column id.
|
||||
let batch_index = match column.semantic_type {
|
||||
SemanticType::Tag => {
|
||||
// Safety: It is a primary key column.
|
||||
let index = metadata.primary_key_index(column.column_id).unwrap();
|
||||
// We always read all primary key so the column always exists and the tag
|
||||
// index is always valid.
|
||||
BatchIndex::Tag(index)
|
||||
}
|
||||
SemanticType::Timestamp => BatchIndex::Timestamp,
|
||||
SemanticType::Field => {
|
||||
// Safety: It is a field column so it should be in `field_id_to_index`.
|
||||
let index = field_id_to_index[&column.column_id];
|
||||
BatchIndex::Field(index)
|
||||
}
|
||||
};
|
||||
batch_indices.push(batch_index);
|
||||
}
|
||||
|
||||
Ok(ProjectionMapper {
|
||||
metadata: metadata.clone(),
|
||||
batch_indices,
|
||||
|
||||
@@ -198,6 +198,8 @@ impl TestEnv {
|
||||
}
|
||||
|
||||
/// Builder to mock a [RegionCreateRequest].
|
||||
///
|
||||
/// It builds schema like `[tag_0, tag_1, ..., field_0, field_1, ..., ts]`.
|
||||
pub struct CreateRequestBuilder {
|
||||
region_dir: String,
|
||||
tag_num: usize,
|
||||
@@ -232,7 +234,7 @@ impl CreateRequestBuilder {
|
||||
}
|
||||
|
||||
/// Sets the number of field columns and returns the builder for chaining.
///
/// Only `field_num` is updated; a tag count configured earlier through
/// `tag_num` is left untouched.
pub fn field_num(mut self, value: usize) -> Self {
    self.field_num = value;
    self
}
|
||||
|
||||
|
||||
@@ -233,14 +233,6 @@ impl RegionMetadata {
|
||||
self.primary_key.iter().position(|id| *id == column_id)
|
||||
}
|
||||
|
||||
/// Returns a column's index in fields if it is a field column.
|
||||
///
|
||||
/// This does a linear search.
|
||||
pub fn field_index(&self, column_id: ColumnId) -> Option<usize> {
|
||||
self.field_columns()
|
||||
.position(|column| column.column_id == column_id)
|
||||
}
|
||||
|
||||
/// Checks whether the metadata is valid.
|
||||
fn validate(&self) -> Result<()> {
|
||||
// Id to name.
|
||||
|
||||
Reference in New Issue
Block a user