mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-26 08:00:01 +00:00
feat: skip filling NULL for put and delete requests (#3364)
* feat: optimize for sparse data Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * remove old structures Signed-off-by: Ruihang Xia <waynestxia@gmail.com> --------- Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
@@ -28,7 +28,7 @@ pub struct KeyValues {
|
||||
/// must not be `None`.
|
||||
mutation: Mutation,
|
||||
/// Key value read helper.
|
||||
helper: ReadRowHelper,
|
||||
helper: SparseReadRowHelper,
|
||||
}
|
||||
|
||||
impl KeyValues {
|
||||
@@ -37,7 +37,7 @@ impl KeyValues {
|
||||
/// Returns `None` if `rows` of the `mutation` is `None`.
|
||||
pub fn new(metadata: &RegionMetadata, mutation: Mutation) -> Option<KeyValues> {
|
||||
let rows = mutation.rows.as_ref()?;
|
||||
let helper = ReadRowHelper::new(metadata, rows);
|
||||
let helper = SparseReadRowHelper::new(metadata, rows);
|
||||
|
||||
Some(KeyValues { mutation, helper })
|
||||
}
|
||||
@@ -75,7 +75,7 @@ impl KeyValues {
|
||||
pub struct KeyValue<'a> {
|
||||
row: &'a Row,
|
||||
schema: &'a Vec<ColumnSchema>,
|
||||
helper: &'a ReadRowHelper,
|
||||
helper: &'a SparseReadRowHelper,
|
||||
sequence: SequenceNumber,
|
||||
op_type: OpType,
|
||||
}
|
||||
@@ -85,11 +85,12 @@ impl<'a> KeyValue<'a> {
|
||||
pub fn primary_keys(&self) -> impl Iterator<Item = ValueRef> {
|
||||
self.helper.indices[..self.helper.num_primary_key_column]
|
||||
.iter()
|
||||
.map(|idx| {
|
||||
api::helper::pb_value_to_value_ref(
|
||||
&self.row.values[*idx],
|
||||
&self.schema[*idx].datatype_extension,
|
||||
)
|
||||
.map(|idx| match idx {
|
||||
Some(i) => api::helper::pb_value_to_value_ref(
|
||||
&self.row.values[*i],
|
||||
&self.schema[*i].datatype_extension,
|
||||
),
|
||||
None => ValueRef::Null,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -97,18 +98,19 @@ impl<'a> KeyValue<'a> {
|
||||
pub fn fields(&self) -> impl Iterator<Item = ValueRef> {
|
||||
self.helper.indices[self.helper.num_primary_key_column + 1..]
|
||||
.iter()
|
||||
.map(|idx| {
|
||||
api::helper::pb_value_to_value_ref(
|
||||
&self.row.values[*idx],
|
||||
&self.schema[*idx].datatype_extension,
|
||||
)
|
||||
.map(|idx| match idx {
|
||||
Some(i) => api::helper::pb_value_to_value_ref(
|
||||
&self.row.values[*i],
|
||||
&self.schema[*i].datatype_extension,
|
||||
),
|
||||
None => ValueRef::Null,
|
||||
})
|
||||
}
|
||||
|
||||
/// Get timestamp.
|
||||
pub fn timestamp(&self) -> ValueRef {
|
||||
// Timestamp is primitive, we clone it.
|
||||
let index = self.helper.indices[self.helper.num_primary_key_column];
|
||||
let index = self.helper.indices[self.helper.num_primary_key_column].unwrap();
|
||||
api::helper::pb_value_to_value_ref(
|
||||
&self.row.values[index],
|
||||
&self.schema[index].datatype_extension,
|
||||
@@ -136,33 +138,24 @@ impl<'a> KeyValue<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper to read rows in key, value order.
|
||||
/// Helper to read rows in key, value order for sparse data.
|
||||
#[derive(Debug)]
|
||||
struct ReadRowHelper {
|
||||
struct SparseReadRowHelper {
|
||||
/// Key and value column indices.
|
||||
///
|
||||
/// `indices[..num_primary_key_column]` are primary key columns, `indices[num_primary_key_column]`
|
||||
/// is the timestamp column and remainings are field columns.
|
||||
indices: Vec<usize>,
|
||||
indices: Vec<Option<usize>>,
|
||||
/// Number of primary key columns.
|
||||
num_primary_key_column: usize,
|
||||
}
|
||||
|
||||
impl ReadRowHelper {
|
||||
/// Creates a [ReadRowHelper] for specific `rows`.
|
||||
impl SparseReadRowHelper {
|
||||
/// Creates a [SparseReadRowHelper] for specific `rows`.
|
||||
///
|
||||
/// # Panics
|
||||
/// The `rows` must fill their missing columns first and have same columns with `metadata`.
|
||||
/// Otherwise this method will panic.
|
||||
fn new(metadata: &RegionMetadata, rows: &Rows) -> ReadRowHelper {
|
||||
assert_eq!(
|
||||
metadata.column_metadatas.len(),
|
||||
rows.schema.len(),
|
||||
"Length mismatch, column_metas: {:?}, rows_schema: {:?}",
|
||||
metadata.column_metadatas,
|
||||
rows.schema
|
||||
);
|
||||
|
||||
/// Time index column must exist.
|
||||
fn new(metadata: &RegionMetadata, rows: &Rows) -> SparseReadRowHelper {
|
||||
// Build a name to index mapping for rows.
|
||||
let name_to_index: HashMap<_, _> = rows
|
||||
.schema
|
||||
@@ -173,25 +166,27 @@ impl ReadRowHelper {
|
||||
let mut indices = Vec::with_capacity(metadata.column_metadatas.len());
|
||||
|
||||
// Get primary key indices.
|
||||
for pk_id in &metadata.primary_key {
|
||||
for pk_column_id in &metadata.primary_key {
|
||||
// Safety: Id comes from primary key.
|
||||
let column = metadata.column_by_id(*pk_id).unwrap();
|
||||
let index = name_to_index.get(&column.column_schema.name).unwrap();
|
||||
indices.push(*index);
|
||||
let column = metadata.column_by_id(*pk_column_id).unwrap();
|
||||
let index = name_to_index.get(&column.column_schema.name);
|
||||
indices.push(index.copied());
|
||||
}
|
||||
// Get timestamp index.
|
||||
// Safety: time index must exist
|
||||
let ts_index = name_to_index
|
||||
.get(&metadata.time_index_column().column_schema.name)
|
||||
.unwrap();
|
||||
indices.push(*ts_index);
|
||||
indices.push(Some(*ts_index));
|
||||
|
||||
// Iterate columns and find field columns.
|
||||
for column in metadata.field_columns() {
|
||||
// Get index in request for each field column.
|
||||
let index = name_to_index.get(&column.column_schema.name).unwrap();
|
||||
indices.push(*index);
|
||||
let index = name_to_index.get(&column.column_schema.name);
|
||||
indices.push(index.copied());
|
||||
}
|
||||
|
||||
ReadRowHelper {
|
||||
SparseReadRowHelper {
|
||||
indices,
|
||||
num_primary_key_column: metadata.primary_key.len(),
|
||||
}
|
||||
|
||||
@@ -247,6 +247,10 @@ impl WriteRequest {
|
||||
// Need to add a default value for this column.
|
||||
let proto_value = self.column_default_value(column)?;
|
||||
|
||||
if proto_value.value_data.is_none() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Insert default value to each row.
|
||||
for row in &mut self.rows.rows {
|
||||
row.values.push(proto_value.clone());
|
||||
@@ -989,16 +993,13 @@ mod tests {
|
||||
request.fill_missing_columns(&metadata).unwrap();
|
||||
|
||||
let expect_rows = Rows {
|
||||
schema: vec![
|
||||
new_column_schema(
|
||||
"ts",
|
||||
ColumnDataType::TimestampMillisecond,
|
||||
SemanticType::Timestamp,
|
||||
),
|
||||
new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
|
||||
],
|
||||
schema: vec![new_column_schema(
|
||||
"ts",
|
||||
ColumnDataType::TimestampMillisecond,
|
||||
SemanticType::Timestamp,
|
||||
)],
|
||||
rows: vec![Row {
|
||||
values: vec![ts_ms_value(1), Value { value_data: None }],
|
||||
values: vec![ts_ms_value(1)],
|
||||
}],
|
||||
};
|
||||
assert_eq!(expect_rows, request.rows);
|
||||
@@ -1104,17 +1105,11 @@ mod tests {
|
||||
ColumnDataType::TimestampMillisecond,
|
||||
SemanticType::Timestamp,
|
||||
),
|
||||
new_column_schema("f0", ColumnDataType::Int64, SemanticType::Field),
|
||||
new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
|
||||
],
|
||||
// Column f1 is not nullable and we use 0 for padding.
|
||||
rows: vec![Row {
|
||||
values: vec![
|
||||
i64_value(100),
|
||||
ts_ms_value(1),
|
||||
Value { value_data: None },
|
||||
i64_value(0),
|
||||
],
|
||||
values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
|
||||
}],
|
||||
};
|
||||
assert_eq!(expect_rows, request.rows);
|
||||
@@ -1173,17 +1168,11 @@ mod tests {
|
||||
ColumnDataType::TimestampMillisecond,
|
||||
SemanticType::Timestamp,
|
||||
),
|
||||
new_column_schema("f0", ColumnDataType::Int64, SemanticType::Field),
|
||||
new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
|
||||
],
|
||||
// Column f1 is not nullable and we use 0 for padding.
|
||||
rows: vec![Row {
|
||||
values: vec![
|
||||
i64_value(100),
|
||||
ts_ms_value(1),
|
||||
Value { value_data: None },
|
||||
i64_value(0),
|
||||
],
|
||||
values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
|
||||
}],
|
||||
};
|
||||
assert_eq!(expect_rows, request.rows);
|
||||
|
||||
Reference in New Issue
Block a user