feat(metric-engine): introduce RowModifier for MetricEngine (#5380)

* feat(metric-engine): store physical table ColumnIds in `MetricEngineState`

* feat(metric-engine): introduce `RowModifier` for MetricEngine

* chore: upgrade greptime-proto

* feat: introduce `WriteHint` to `RegionPutRequest`

* chore: apply suggestions from CR

* chore: update greptime-proto

* chore: apply suggestions from CR

* chore: add comments

* chore: update proto
This commit is contained in:
Weny Xu
2025-01-22 13:16:44 +08:00
committed by GitHub
parent 4259975be9
commit 965a48656f
25 changed files with 640 additions and 90 deletions

3
Cargo.lock generated
View File

@@ -4451,7 +4451,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ec801a91aa22f9666063d02805f1f60f7c93458a#ec801a91aa22f9666063d02805f1f60f7c93458a"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=6cee3db98a552f1dd848dec3eefcce8f26343748#6cee3db98a552f1dd848dec3eefcce8f26343748"
dependencies = [
"prost 0.12.6",
"serde",
@@ -6495,6 +6495,7 @@ dependencies = [
"prometheus",
"serde",
"serde_json",
"smallvec",
"snafu 0.8.5",
"store-api",
"tokio",

View File

@@ -124,7 +124,7 @@ etcd-client = "0.13"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ec801a91aa22f9666063d02805f1f60f7c93458a" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "6cee3db98a552f1dd848dec3eefcce8f26343748" }
hex = "0.4"
http = "0.2"
humantime = "2.1"

View File

@@ -29,6 +29,7 @@ object-store.workspace = true
prometheus.workspace = true
serde.workspace = true
serde_json.workspace = true
smallvec.workspace = true
snafu.workspace = true
store-api.workspace = true
tokio.workspace = true

View File

@@ -50,6 +50,7 @@ use crate::config::EngineConfig;
use crate::data_region::DataRegion;
use crate::error::{self, Result, UnsupportedRegionRequestSnafu};
use crate::metadata_region::MetadataRegion;
use crate::row_modifier::RowModifier;
use crate::utils;
#[cfg_attr(doc, aquamarine::aquamarine)]
@@ -267,6 +268,7 @@ impl MetricEngine {
data_region,
state: RwLock::default(),
config,
row_modifier: RowModifier::new(),
}),
}
}
@@ -310,6 +312,7 @@ struct MetricEngineInner {
/// TODO(weny): remove it after the config is used.
#[allow(unused)]
config: EngineConfig,
row_modifier: RowModifier,
}
#[cfg(test)]

View File

@@ -12,15 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::hash::Hash;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType};
use api::v1::{Rows, WriteHint};
use common_telemetry::{error, info};
use snafu::{ensure, OptionExt};
use store_api::metric_engine_consts::{
DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
};
use store_api::codec::PrimaryKeyEncoding;
use store_api::region_request::{AffectedRows, RegionPutRequest};
use store_api::storage::{RegionId, TableId};
@@ -30,11 +25,9 @@ use crate::error::{
PhysicalRegionNotFoundSnafu, Result,
};
use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_OPERATION_ELAPSED};
use crate::row_modifier::RowsIter;
use crate::utils::to_data_region_id;
// A random number
const TSID_HASH_SEED: u32 = 846793005;
impl MetricEngineInner {
/// Dispatch region put request
pub async fn put_region(
@@ -82,8 +75,21 @@ impl MetricEngineInner {
// write to data region
// TODO(weny): retrieve the encoding from the metadata region.
let encoding = PrimaryKeyEncoding::Dense;
// TODO: retrieve table name
self.modify_rows(logical_region_id.table_id(), &mut request.rows)?;
self.modify_rows(
physical_region_id,
logical_region_id.table_id(),
&mut request.rows,
encoding,
)?;
if encoding == PrimaryKeyEncoding::Sparse {
request.hint = Some(WriteHint {
primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
});
}
self.data_region.write_data(data_region_id, request).await
}
@@ -133,67 +139,28 @@ impl MetricEngineInner {
/// Perform metric engine specific logic to incoming rows.
/// - Add table_id column
/// - Generate tsid
fn modify_rows(&self, table_id: TableId, rows: &mut Rows) -> Result<()> {
// gather tag column indices
let tag_col_indices = rows
.schema
.iter()
.enumerate()
.filter_map(|(idx, col)| {
if col.semantic_type == SemanticType::Tag as i32 {
Some((idx, col.column_name.clone()))
} else {
None
}
})
.collect::<Vec<_>>();
// add table_name column
rows.schema.push(ColumnSchema {
column_name: DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Uint32 as i32,
semantic_type: SemanticType::Tag as _,
datatype_extension: None,
options: None,
});
// add tsid column
rows.schema.push(ColumnSchema {
column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Uint64 as i32,
semantic_type: SemanticType::Tag as _,
datatype_extension: None,
options: None,
});
// fill internal columns
for row in &mut rows.rows {
Self::fill_internal_columns(table_id, &tag_col_indices, row);
}
Ok(())
}
/// Fills internal columns of a row with table name and a hash of tag values.
fn fill_internal_columns(
fn modify_rows(
&self,
physical_region_id: RegionId,
table_id: TableId,
tag_col_indices: &[(usize, String)],
row: &mut Row,
) {
let mut hasher = mur3::Hasher128::with_seed(TSID_HASH_SEED);
for (idx, name) in tag_col_indices {
let tag = row.values[*idx].clone();
name.hash(&mut hasher);
// The type is checked before. So only null is ignored.
if let Some(ValueData::StringValue(string)) = tag.value_data {
string.hash(&mut hasher);
}
}
// TSID is 64 bits, simply truncate the 128 bits hash
let (hash, _) = hasher.finish128();
// fill table id and tsid
row.values.push(ValueData::U32Value(table_id).into());
row.values.push(ValueData::U64Value(hash).into());
rows: &mut Rows,
encoding: PrimaryKeyEncoding,
) -> Result<()> {
let input = std::mem::take(rows);
let iter = {
let state = self.state.read().unwrap();
let name_to_id = state
.physical_region_states()
.get(&physical_region_id)
.with_context(|| PhysicalRegionNotFoundSnafu {
region_id: physical_region_id,
})?
.physical_columns();
RowsIter::new(input, name_to_id)
};
let output = self.row_modifier.modify_rows(iter, table_id, encoding)?;
*rows = output;
Ok(())
}
}
@@ -217,6 +184,7 @@ mod tests {
let rows = test_util::build_rows(1, 5);
let request = RegionRequest::Put(RegionPutRequest {
rows: Rows { schema, rows },
hint: None,
});
// write data
@@ -290,6 +258,7 @@ mod tests {
let rows = test_util::build_rows(3, 100);
let request = RegionRequest::Put(RegionPutRequest {
rows: Rows { schema, rows },
hint: None,
});
// write data
@@ -311,6 +280,7 @@ mod tests {
let rows = test_util::build_rows(1, 100);
let request = RegionRequest::Put(RegionPutRequest {
rows: Rows { schema, rows },
hint: None,
});
engine
@@ -330,6 +300,7 @@ mod tests {
let rows = test_util::build_rows(1, 100);
let request = RegionRequest::Put(RegionPutRequest {
rows: Rows { schema, rows },
hint: None,
});
engine

View File

@@ -105,6 +105,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to encode primary key"))]
EncodePrimaryKey {
source: mito2::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Mito write operation fails"))]
MitoWriteOperation {
source: BoxedError,
@@ -283,6 +290,8 @@ impl ErrorExt for Error {
| MitoCatchupOperation { source, .. }
| MitoFlushOperation { source, .. } => source.status_code(),
EncodePrimaryKey { source, .. } => source.status_code(),
CollectRecordBatchStream { source, .. } => source.status_code(),
RegionAlreadyExists { .. } => StatusCode::RegionAlreadyExists,

View File

@@ -58,6 +58,7 @@ pub mod engine;
pub mod error;
mod metadata_region;
mod metrics;
mod row_modifier;
#[cfg(test)]
mod test_util;
mod utils;

View File

@@ -512,7 +512,7 @@ impl MetadataRegion {
}],
};
RegionPutRequest { rows }
RegionPutRequest { rows, hint: None }
}
fn build_delete_request(keys: &[String]) -> RegionDeleteRequest {

View File

@@ -0,0 +1,495 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeMap, HashMap};
use std::hash::Hash;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
use datatypes::value::ValueRef;
use mito2::row_converter::SparsePrimaryKeyCodec;
use smallvec::SmallVec;
use snafu::ResultExt;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metric_engine_consts::{
DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
};
use store_api::storage::consts::{ReservedColumnId, PRIMARY_KEY_COLUMN_NAME};
use store_api::storage::{ColumnId, TableId};
use crate::error::{EncodePrimaryKeySnafu, Result};
// A random number
const TSID_HASH_SEED: u32 = 846793005;
/// A row modifier modifies [`Rows`].
///
/// - For [`PrimaryKeyEncoding::Sparse`] encoding,
/// it replaces the primary key columns with the encoded primary key column(`__primary_key`).
///
/// - For [`PrimaryKeyEncoding::Dense`] encoding,
/// it adds two columns(`__table_id`, `__tsid`) to the row.
pub struct RowModifier {
    /// Schemaless sparse primary key codec used to encode the
    /// `__primary_key` column when sparse encoding is requested.
    codec: SparsePrimaryKeyCodec,
}
impl RowModifier {
    /// Creates a new [`RowModifier`] backed by a schemaless sparse codec.
    pub fn new() -> Self {
        Self {
            codec: SparsePrimaryKeyCodec::schemaless(),
        }
    }
    /// Modify rows with the given primary key encoding.
    pub fn modify_rows(
        &self,
        iter: RowsIter,
        table_id: TableId,
        encoding: PrimaryKeyEncoding,
    ) -> Result<Rows> {
        match encoding {
            PrimaryKeyEncoding::Sparse => self.modify_rows_sparse(iter, table_id),
            PrimaryKeyEncoding::Dense => self.modify_rows_dense(iter, table_id),
        }
    }
    /// Modifies rows with sparse primary key encoding.
    /// It replaces the primary key columns with the encoded primary key column(`__primary_key`).
    fn modify_rows_sparse(&self, mut iter: RowsIter, table_id: TableId) -> Result<Rows> {
        let num_column = iter.rows.schema.len();
        let num_primary_key_column = iter.index.num_primary_key_column;
        // num_output_column = remaining columns(fields columns + timestamp column) + 1 (encoded primary key column)
        let num_output_column = num_column - num_primary_key_column + 1;
        // Encoding buffer, reused across rows to avoid reallocating per row.
        let mut buffer = vec![];
        for mut iter in iter.iter_mut() {
            let (table_id, tsid) = self.fill_internal_columns(table_id, &iter);
            let mut values = Vec::with_capacity(num_output_column);
            buffer.clear();
            // Reserved columns (`__table_id`, `__tsid`) are encoded first,
            // followed by the user-defined primary key columns.
            let internal_columns = [
                (
                    ReservedColumnId::table_id(),
                    api::helper::pb_value_to_value_ref(&table_id, &None),
                ),
                (
                    ReservedColumnId::tsid(),
                    api::helper::pb_value_to_value_ref(&tsid, &None),
                ),
            ];
            self.codec
                .encode_to_vec(internal_columns.into_iter(), &mut buffer)
                .context(EncodePrimaryKeySnafu)?;
            self.codec
                .encode_to_vec(iter.primary_keys(), &mut buffer)
                .context(EncodePrimaryKeySnafu)?;
            values.push(ValueData::BinaryValue(buffer.clone()).into());
            values.extend(iter.remaining());
            // Replace the row with the encoded row
            *iter.row = Row { values };
        }
        // Update the schema: a single binary `__primary_key` column followed
        // by the remaining (timestamp + field) columns.
        let mut schema = Vec::with_capacity(num_output_column);
        schema.push(ColumnSchema {
            column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
            datatype: ColumnDataType::Binary as i32,
            semantic_type: SemanticType::Tag as _,
            datatype_extension: None,
            options: None,
        });
        schema.extend(iter.remaining_columns());
        iter.rows.schema = schema;
        Ok(iter.rows)
    }
    /// Modifies rows with dense primary key encoding.
    /// It adds two columns(`__table_id`, `__tsid`) to the row.
    fn modify_rows_dense(&self, mut iter: RowsIter, table_id: TableId) -> Result<Rows> {
        // add table_id column
        iter.rows.schema.push(ColumnSchema {
            column_name: DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
            datatype: ColumnDataType::Uint32 as i32,
            semantic_type: SemanticType::Tag as _,
            datatype_extension: None,
            options: None,
        });
        // add tsid column
        iter.rows.schema.push(ColumnSchema {
            column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
            datatype: ColumnDataType::Uint64 as i32,
            semantic_type: SemanticType::Tag as _,
            datatype_extension: None,
            options: None,
        });
        for iter in iter.iter_mut() {
            let (table_id, tsid) = self.fill_internal_columns(table_id, &iter);
            iter.row.values.push(table_id);
            iter.row.values.push(tsid);
        }
        Ok(iter.rows)
    }
    /// Fills internal columns of a row with the table id and a hash of tag values (tsid).
    ///
    /// The tsid hash visits tag name/value pairs in lexicographical column
    /// name order (enforced by [`IterIndex`]), so rows whose schemas list the
    /// same tags in a different order hash to the same tsid.
    fn fill_internal_columns(&self, table_id: TableId, iter: &RowIter<'_>) -> (Value, Value) {
        let mut hasher = mur3::Hasher128::with_seed(TSID_HASH_SEED);
        for (name, value) in iter.primary_keys_with_name() {
            // The type is checked before. So only null is ignored.
            if let Some(ValueData::StringValue(string)) = &value.value_data {
                name.hash(&mut hasher);
                string.hash(&mut hasher);
            }
        }
        // TSID is 64 bits, simply truncate the 128 bits hash
        let (hash, _) = hasher.finish128();
        (
            ValueData::U32Value(table_id).into(),
            ValueData::U64Value(hash).into(),
        )
    }
}
/// Index of a value.
#[derive(Debug, Clone, Copy)]
struct ValueIndex {
    // Column id of the column this value belongs to.
    column_id: ColumnId,
    // Position of the value within the row (and of the column within the schema).
    index: usize,
}
/// Index of a row.
struct IterIndex {
    // Value indices ordered as: reserved columns, user-defined primary key
    // columns (lexicographical by name), timestamp column, then field columns.
    indices: Vec<ValueIndex>,
    // Number of leading entries in `indices` that are primary key columns
    // (reserved + user-defined tags).
    num_primary_key_column: usize,
}
impl IterIndex {
    /// Builds the index from the row schema and the physical
    /// column-name-to-id map.
    ///
    /// NOTE(review): every non-reserved tag/field/timestamp column name is
    /// expected to exist in `name_to_column_id`; an unknown name panics
    /// (`unwrap`) — confirm callers validate the schema beforehand.
    fn new(row_schema: &[ColumnSchema], name_to_column_id: &HashMap<String, ColumnId>) -> Self {
        let mut reserved_indices = SmallVec::<[ValueIndex; 2]>::new();
        // Uses BTreeMap to keep the primary key column name order (lexicographical)
        let mut primary_key_indices = BTreeMap::new();
        let mut field_indices = SmallVec::<[ValueIndex; 1]>::new();
        let mut ts_index = None;
        for (idx, col) in row_schema.iter().enumerate() {
            match col.semantic_type() {
                SemanticType::Tag => match col.column_name.as_str() {
                    DATA_SCHEMA_TABLE_ID_COLUMN_NAME => {
                        reserved_indices.push(ValueIndex {
                            column_id: ReservedColumnId::table_id(),
                            index: idx,
                        });
                    }
                    DATA_SCHEMA_TSID_COLUMN_NAME => {
                        reserved_indices.push(ValueIndex {
                            column_id: ReservedColumnId::tsid(),
                            index: idx,
                        });
                    }
                    _ => {
                        // Inserts primary key column name following the column name order (lexicographical)
                        primary_key_indices.insert(
                            col.column_name.as_str(),
                            ValueIndex {
                                column_id: *name_to_column_id.get(&col.column_name).unwrap(),
                                index: idx,
                            },
                        );
                    }
                },
                SemanticType::Field => {
                    field_indices.push(ValueIndex {
                        column_id: *name_to_column_id.get(&col.column_name).unwrap(),
                        index: idx,
                    });
                }
                SemanticType::Timestamp => {
                    ts_index = Some(ValueIndex {
                        column_id: *name_to_column_id.get(&col.column_name).unwrap(),
                        index: idx,
                    });
                }
            }
        }
        let num_primary_key_column = primary_key_indices.len() + reserved_indices.len();
        // Final order: reserved columns, tags (sorted by name), timestamp, fields.
        let indices = reserved_indices
            .into_iter()
            .chain(primary_key_indices.values().cloned())
            .chain(ts_index)
            .chain(field_indices)
            .collect();
        IterIndex {
            indices,
            num_primary_key_column,
        }
    }
}
/// Iterator of rows.
pub(crate) struct RowsIter {
    // The rows being iterated over and modified.
    rows: Rows,
    // Precomputed column ordering for the schema of `rows`.
    index: IterIndex,
}
impl RowsIter {
    /// Creates a [`RowsIter`], building the [`IterIndex`] from the rows' schema.
    pub fn new(rows: Rows, name_to_column_id: &HashMap<String, ColumnId>) -> Self {
        let index: IterIndex = IterIndex::new(&rows.schema, name_to_column_id);
        Self { rows, index }
    }
    /// Returns the iterator of rows.
    fn iter_mut(&mut self) -> impl Iterator<Item = RowIter> {
        self.rows.rows.iter_mut().map(|row| RowIter {
            row,
            index: &self.index,
            schema: &self.rows.schema,
        })
    }
    /// Returns the remaining (non-primary-key) columns, i.e. timestamp and fields.
    ///
    /// NOTE: this moves the column schemas out of `self.rows.schema` via
    /// `mem::take` (leaving defaults behind), so it is only meant to be
    /// consumed once while rebuilding the schema.
    fn remaining_columns(&mut self) -> impl Iterator<Item = ColumnSchema> + '_ {
        self.index.indices[self.index.num_primary_key_column..]
            .iter()
            .map(|idx| std::mem::take(&mut self.rows.schema[idx.index]))
    }
}
/// Iterator of a row.
struct RowIter<'a> {
    // The row being read and modified.
    row: &'a mut Row,
    // Shared column ordering computed from the schema.
    index: &'a IterIndex,
    // Schema of the row, used to resolve column names and type extensions.
    schema: &'a Vec<ColumnSchema>,
}
impl RowIter<'_> {
    /// Returns the primary keys with their names.
    fn primary_keys_with_name(&self) -> impl Iterator<Item = (&String, &Value)> {
        self.index.indices[..self.index.num_primary_key_column]
            .iter()
            .map(|idx| {
                (
                    &self.schema[idx.index].column_name,
                    &self.row.values[idx.index],
                )
            })
    }
    /// Returns the primary keys as `(column id, value ref)` pairs for encoding.
    fn primary_keys(&self) -> impl Iterator<Item = (ColumnId, ValueRef)> {
        self.index.indices[..self.index.num_primary_key_column]
            .iter()
            .map(|idx| {
                (
                    idx.column_id,
                    api::helper::pb_value_to_value_ref(
                        &self.row.values[idx.index],
                        &self.schema[idx.index].datatype_extension,
                    ),
                )
            })
    }
    /// Returns the remaining (non-primary-key) values, i.e. timestamp and fields.
    ///
    /// NOTE: this moves the values out of the row via `mem::take` (leaving
    /// defaults behind), so it is only meant to be consumed once while
    /// rebuilding the row.
    fn remaining(&mut self) -> impl Iterator<Item = Value> + '_ {
        self.index.indices[self.index.num_primary_key_column..]
            .iter()
            .map(|idx| std::mem::take(&mut self.row.values[idx.index]))
    }
}
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use api::v1::{Row, Rows};
    use super::*;
    // Two string tag columns: `namespace` and `host`.
    fn test_schema() -> Vec<ColumnSchema> {
        vec![
            ColumnSchema {
                column_name: "namespace".to_string(),
                datatype: ColumnDataType::String as i32,
                semantic_type: SemanticType::Tag as _,
                datatype_extension: None,
                options: None,
            },
            ColumnSchema {
                column_name: "host".to_string(),
                datatype: ColumnDataType::String as i32,
                semantic_type: SemanticType::Tag as _,
                datatype_extension: None,
                options: None,
            },
        ]
    }
    // A single row holding two string tag values.
    fn test_row(v1: &str, v2: &str) -> Row {
        Row {
            values: vec![
                ValueData::StringValue(v1.to_string()).into(),
                ValueData::StringValue(v2.to_string()).into(),
            ],
        }
    }
    // Physical column name -> column id mapping matching `test_schema`.
    fn test_name_to_column_id() -> HashMap<String, ColumnId> {
        HashMap::from([("namespace".to_string(), 1), ("host".to_string(), 2)])
    }
    #[test]
    fn test_encode_sparse() {
        let name_to_column_id = test_name_to_column_id();
        let encoder = RowModifier::new();
        let table_id = 1025;
        let schema = test_schema();
        let row = test_row("greptimedb", "127.0.0.1");
        let rows = Rows {
            schema,
            rows: vec![row],
        };
        let rows_iter = RowsIter::new(rows, &name_to_column_id);
        let result = encoder.modify_rows_sparse(rows_iter, table_id).unwrap();
        // All tag columns collapse into the single `__primary_key` value.
        assert_eq!(result.rows[0].values.len(), 1);
        // Golden bytes: the sparse codec's encoding of
        // (__table_id, __tsid, host, namespace) for this row.
        let encoded_primary_key = vec![
            128, 0, 0, 4, 1, 0, 0, 4, 1, 128, 0, 0, 3, 1, 131, 9, 166, 190, 173, 37, 39, 240, 0, 0,
            0, 2, 1, 1, 49, 50, 55, 46, 48, 46, 48, 46, 9, 49, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1,
            1, 1, 103, 114, 101, 112, 116, 105, 109, 101, 9, 100, 98, 0, 0, 0, 0, 0, 0, 2,
        ];
        assert_eq!(
            result.rows[0].values[0],
            ValueData::BinaryValue(encoded_primary_key).into()
        );
        assert_eq!(result.schema, expected_sparse_schema());
    }
    // Sparse output schema: a single binary `__primary_key` tag column.
    fn expected_sparse_schema() -> Vec<ColumnSchema> {
        vec![ColumnSchema {
            column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
            datatype: ColumnDataType::Binary as i32,
            semantic_type: SemanticType::Tag as _,
            datatype_extension: None,
            options: None,
        }]
    }
    // Dense output schema: the original tags plus appended `__table_id`/`__tsid`.
    fn expected_dense_schema() -> Vec<ColumnSchema> {
        vec![
            ColumnSchema {
                column_name: "namespace".to_string(),
                datatype: ColumnDataType::String as i32,
                semantic_type: SemanticType::Tag as _,
                datatype_extension: None,
                options: None,
            },
            ColumnSchema {
                column_name: "host".to_string(),
                datatype: ColumnDataType::String as i32,
                semantic_type: SemanticType::Tag as _,
                datatype_extension: None,
                options: None,
            },
            ColumnSchema {
                column_name: DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::Uint32 as i32,
                semantic_type: SemanticType::Tag as _,
                datatype_extension: None,
                options: None,
            },
            ColumnSchema {
                column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::Uint64 as i32,
                semantic_type: SemanticType::Tag as _,
                datatype_extension: None,
                options: None,
            },
        ]
    }
    #[test]
    fn test_encode_dense() {
        let name_to_column_id = test_name_to_column_id();
        let encoder = RowModifier::new();
        let table_id = 1025;
        let schema = test_schema();
        let row = test_row("greptimedb", "127.0.0.1");
        let rows = Rows {
            schema,
            rows: vec![row],
        };
        let rows_iter = RowsIter::new(rows, &name_to_column_id);
        let result = encoder.modify_rows_dense(rows_iter, table_id).unwrap();
        // Original tag values are untouched; table id and tsid are appended.
        assert_eq!(
            result.rows[0].values[0],
            ValueData::StringValue("greptimedb".to_string()).into()
        );
        assert_eq!(
            result.rows[0].values[1],
            ValueData::StringValue("127.0.0.1".to_string()).into()
        );
        assert_eq!(result.rows[0].values[2], ValueData::U32Value(1025).into());
        assert_eq!(
            result.rows[0].values[3],
            ValueData::U64Value(9442261431637846000).into()
        );
        assert_eq!(result.schema, expected_dense_schema());
    }
    #[test]
    fn test_fill_internal_columns() {
        let name_to_column_id = test_name_to_column_id();
        let encoder = RowModifier::new();
        let table_id = 1025;
        let schema = test_schema();
        let row = test_row("greptimedb", "127.0.0.1");
        let rows = Rows {
            schema,
            rows: vec![row],
        };
        let mut rows_iter = RowsIter::new(rows, &name_to_column_id);
        let row_iter = rows_iter.iter_mut().next().unwrap();
        let (encoded_table_id, tsid) = encoder.fill_internal_columns(table_id, &row_iter);
        assert_eq!(encoded_table_id, ValueData::U32Value(1025).into());
        assert_eq!(tsid, ValueData::U64Value(9442261431637846000).into());
        // Change the column order
        let schema = vec![
            ColumnSchema {
                column_name: "host".to_string(),
                datatype: ColumnDataType::String as i32,
                semantic_type: SemanticType::Tag as _,
                datatype_extension: None,
                options: None,
            },
            ColumnSchema {
                column_name: "namespace".to_string(),
                datatype: ColumnDataType::String as i32,
                semantic_type: SemanticType::Tag as _,
                datatype_extension: None,
                options: None,
            },
        ];
        let row = test_row("127.0.0.1", "greptimedb");
        let rows = Rows {
            schema,
            rows: vec![row],
        };
        let mut rows_iter = RowsIter::new(rows, &name_to_column_id);
        let row_iter = rows_iter.iter_mut().next().unwrap();
        let (encoded_table_id, tsid) = encoder.fill_internal_columns(table_id, &row_iter);
        // Tag order in the schema must not affect the tsid (lexicographical hashing).
        assert_eq!(encoded_table_id, ValueData::U32Value(1025).into());
        assert_eq!(tsid, ValueData::U64Value(9442261431637846000).into());
    }
}

View File

@@ -275,6 +275,7 @@ impl CpuDataGenerator {
schema: self.column_schemas.clone(),
rows,
}),
write_hint: None,
};
KeyValues::new(&self.metadata, mutation).unwrap()

View File

@@ -530,7 +530,10 @@ async fn test_absent_and_invalid_columns() {
rows,
};
let err = engine
.handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows }))
.handle_request(
region_id,
RegionRequest::Put(RegionPutRequest { rows, hint: None }),
)
.await
.unwrap_err();
assert_eq!(StatusCode::InvalidArguments, err.status_code());

View File

@@ -129,7 +129,10 @@ async fn test_engine_open_readonly() {
let err = engine
.handle_request(
region_id,
RegionRequest::Put(RegionPutRequest { rows: rows.clone() }),
RegionRequest::Put(RegionPutRequest {
rows: rows.clone(),
hint: None,
}),
)
.await
.unwrap_err();

View File

@@ -74,7 +74,10 @@ async fn test_set_role_state_gracefully() {
let error = engine
.handle_request(
region_id,
RegionRequest::Put(RegionPutRequest { rows: rows.clone() }),
RegionRequest::Put(RegionPutRequest {
rows: rows.clone(),
hint: None,
}),
)
.await
.unwrap_err();
@@ -152,7 +155,13 @@ async fn test_write_downgrading_region() {
rows: build_rows(0, 42),
};
let err = engine
.handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows }))
.handle_request(
region_id,
RegionRequest::Put(RegionPutRequest {
rows: rows.clone(),
hint: None,
}),
)
.await
.unwrap_err();
assert_eq!(err.status_code(), StatusCode::RegionNotReady)

View File

@@ -323,6 +323,7 @@ mod tests {
op_type: OpType::Put as i32,
sequence: START_SEQ,
rows: Some(rows),
write_hint: None,
}
}
@@ -360,6 +361,7 @@ mod tests {
op_type: OpType::Put as i32,
sequence: 100,
rows: None,
write_hint: None,
};
let kvs = KeyValues::new(&meta, mutation);
assert!(kvs.is_none());

View File

@@ -741,6 +741,7 @@ mod tests {
schema: column_schema,
rows,
}),
write_hint: None,
};
KeyValues::new(metadata.as_ref(), mutation).unwrap()
}

View File

@@ -1165,6 +1165,7 @@ mod tests {
schema: column_schema,
rows,
}),
write_hint: None,
};
KeyValues::new(schema.as_ref(), mutation).unwrap()
}

View File

@@ -137,6 +137,7 @@ impl RegionWriteCtx {
op_type,
sequence: self.next_sequence,
rows,
write_hint: None,
});
let notify = WriteNotify::new(tx, num_rows);

View File

@@ -42,7 +42,9 @@ struct SparsePrimaryKeyCodecInner {
// User defined label field
label_field: SortField,
// Columns in primary key
columns: HashSet<ColumnId>,
//
// None means all unknown columns is primary key(`Self::label_field`).
columns: Option<HashSet<ColumnId>>,
}
/// Sparse values representation.
@@ -85,18 +87,37 @@ impl SparsePrimaryKeyCodec {
table_id_field: SortField::new(ConcreteDataType::uint32_datatype()),
tsid_field: SortField::new(ConcreteDataType::uint64_datatype()),
label_field: SortField::new(ConcreteDataType::string_datatype()),
columns: region_metadata
.primary_key_columns()
.map(|c| c.column_id)
.collect(),
columns: Some(
region_metadata
.primary_key_columns()
.map(|c| c.column_id)
.collect(),
),
}),
}
}
/// Returns a new [`SparsePrimaryKeyCodec`] instance.
///
/// It treats all unknown columns as primary key(label field).
pub fn schemaless() -> Self {
Self {
inner: Arc::new(SparsePrimaryKeyCodecInner {
table_id_field: SortField::new(ConcreteDataType::uint32_datatype()),
tsid_field: SortField::new(ConcreteDataType::uint64_datatype()),
label_field: SortField::new(ConcreteDataType::string_datatype()),
columns: None,
}),
}
}
/// Returns the field of the given column id.
fn get_field(&self, column_id: ColumnId) -> Option<&SortField> {
if !self.inner.columns.contains(&column_id) {
return None;
// if the `columns` is not specified, all unknown columns is primary key(label field).
if let Some(columns) = &self.inner.columns {
if !columns.contains(&column_id) {
return None;
}
}
match column_id {
@@ -107,7 +128,7 @@ impl SparsePrimaryKeyCodec {
}
/// Encodes the given bytes into a [`SparseValues`].
pub(crate) fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
pub fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
where
I: Iterator<Item = (ColumnId, ValueRef<'a>)>,
{

View File

@@ -1049,7 +1049,10 @@ pub fn delete_rows_schema(request: &RegionCreateRequest) -> Vec<api::v1::ColumnS
pub async fn put_rows(engine: &MitoEngine, region_id: RegionId, rows: Rows) {
let num_rows = rows.rows.len();
let result = engine
.handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows }))
.handle_request(
region_id,
RegionRequest::Put(RegionPutRequest { rows, hint: None }),
)
.await
.unwrap();
assert_eq!(num_rows, result.affected_rows);

View File

@@ -289,6 +289,7 @@ pub(crate) fn build_key_values_with_ts_seq_values(
schema: column_schema,
rows,
}),
write_hint: None,
};
KeyValues::new(metadata.as_ref(), mutation).unwrap()
}

View File

@@ -165,6 +165,7 @@ pub(crate) fn write_rows_to_version(
op_type: OpType::Put as i32,
sequence: start_ts as u64, // The sequence may be incorrect, but it's fine in test.
rows: Some(rows),
write_hint: None,
};
let key_values = KeyValues::new(&version.metadata, mutation).unwrap();
version.memtables.mutable.write(&key_values).unwrap();

View File

@@ -287,6 +287,7 @@ mod tests {
op_type: op_type as i32,
sequence,
rows: Some(Rows { schema, rows }),
write_hint: None,
}
}

View File

@@ -279,6 +279,7 @@ mod tests {
op_type: OpType::Put as i32,
sequence: 1u64,
rows: None,
write_hint: None,
}],
}
.encode_to_vec(),
@@ -292,6 +293,7 @@ mod tests {
op_type: OpType::Put as i32,
sequence: 2u64,
rows: None,
write_hint: None,
}],
}
.encode_to_vec(),
@@ -305,6 +307,7 @@ mod tests {
op_type: OpType::Put as i32,
sequence: 3u64,
rows: None,
write_hint: None,
}],
}
.encode_to_vec(),
@@ -348,6 +351,7 @@ mod tests {
op_type: OpType::Put as i32,
sequence: 1u64,
rows: None,
write_hint: None,
}],
}
)]
@@ -367,6 +371,7 @@ mod tests {
op_type: OpType::Put as i32,
sequence: 2u64,
rows: None,
write_hint: None,
}],
}
)]
@@ -382,6 +387,7 @@ mod tests {
op_type: OpType::Put as i32,
sequence: 1u64,
rows: None,
write_hint: None,
}],
};
let region2 = RegionId::new(1, 2);
@@ -390,6 +396,7 @@ mod tests {
op_type: OpType::Put as i32,
sequence: 3u64,
rows: None,
write_hint: None,
}],
};
let region3 = RegionId::new(1, 3);
@@ -398,6 +405,7 @@ mod tests {
op_type: OpType::Put as i32,
sequence: 3u64,
rows: None,
write_hint: None,
}],
};
let provider = Provider::kafka_provider("my_topic".to_string());
@@ -475,6 +483,7 @@ mod tests {
op_type: OpType::Put as i32,
sequence: 1u64,
rows: None,
write_hint: None,
}],
};
let region2 = RegionId::new(1, 2);
@@ -551,6 +560,7 @@ mod tests {
op_type: OpType::Put as i32,
sequence: 1u64,
rows: None,
write_hint: None,
}],
}
.encode_to_vec(),
@@ -564,6 +574,7 @@ mod tests {
op_type: OpType::Put as i32,
sequence: 2u64,
rows: None,
write_hint: None,
}],
}
.encode_to_vec(),
@@ -577,6 +588,7 @@ mod tests {
op_type: OpType::Put as i32,
sequence: 3u64,
rows: None,
write_hint: None,
}],
}
.encode_to_vec(),
@@ -590,6 +602,7 @@ mod tests {
op_type: OpType::Put as i32,
sequence: 4u64,
rows: None,
write_hint: None,
}],
}
.encode_to_vec(),
@@ -624,6 +637,7 @@ mod tests {
op_type: OpType::Put as i32,
sequence: 4u64,
rows: None,
write_hint: None,
}],
}
)]

View File

@@ -115,6 +115,7 @@ mod tests {
op_type: OpType::Put as i32,
sequence: 1u64,
rows: None,
write_hint: None,
}],
};
let encoded_entry = wal_entry.encode_to_vec();

View File

@@ -23,7 +23,7 @@ use api::v1::region::{
CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest, DropRequests,
FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
};
use api::v1::{self, set_index, Analyzer, Option as PbOption, Rows, SemanticType};
use api::v1::{self, set_index, Analyzer, Option as PbOption, Rows, SemanticType, WriteHint};
pub use common_base::AffectedRows;
use common_time::TimeToLive;
use datatypes::data_type::ConcreteDataType;
@@ -95,8 +95,12 @@ fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequ
.into_iter()
.filter_map(|r| {
let region_id = r.region_id.into();
r.rows
.map(|rows| (region_id, RegionRequest::Put(RegionPutRequest { rows })))
r.rows.map(|rows| {
(
region_id,
RegionRequest::Put(RegionPutRequest { rows, hint: None }),
)
})
})
.collect();
Ok(requests)
@@ -232,6 +236,8 @@ fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, Regi
pub struct RegionPutRequest {
/// Rows to put.
pub rows: Rows,
/// Write hint.
pub hint: Option<WriteHint>,
}
#[derive(Debug)]