From 7341f23019db3af619e5f37f1257f7d2d42a8b25 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 22 Feb 2024 22:30:43 +0800 Subject: [PATCH] feat: skip filling NULL for put and delete requests (#3364) * feat: optimize for sparse data Signed-off-by: Ruihang Xia * remove old structures Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/mito2/src/memtable/key_values.rs | 71 +++++++++++++--------------- src/mito2/src/request.rs | 35 +++++--------- 2 files changed, 45 insertions(+), 61 deletions(-) diff --git a/src/mito2/src/memtable/key_values.rs b/src/mito2/src/memtable/key_values.rs index 10854d23a8..29e7b0beca 100644 --- a/src/mito2/src/memtable/key_values.rs +++ b/src/mito2/src/memtable/key_values.rs @@ -28,7 +28,7 @@ pub struct KeyValues { /// must not be `None`. mutation: Mutation, /// Key value read helper. - helper: ReadRowHelper, + helper: SparseReadRowHelper, } impl KeyValues { @@ -37,7 +37,7 @@ impl KeyValues { /// Returns `None` if `rows` of the `mutation` is `None`. 
pub fn new(metadata: &RegionMetadata, mutation: Mutation) -> Option<KeyValues> { let rows = mutation.rows.as_ref()?; - let helper = ReadRowHelper::new(metadata, rows); + let helper = SparseReadRowHelper::new(metadata, rows); Some(KeyValues { mutation, helper }) } @@ -75,7 +75,7 @@ impl KeyValues { pub struct KeyValue<'a> { row: &'a Row, schema: &'a Vec<ColumnSchema>, - helper: &'a ReadRowHelper, + helper: &'a SparseReadRowHelper, sequence: SequenceNumber, op_type: OpType, } @@ -85,11 +85,12 @@ impl<'a> KeyValue<'a> { pub fn primary_keys(&self) -> impl Iterator<Item = ValueRef> { self.helper.indices[..self.helper.num_primary_key_column] .iter() - .map(|idx| { - api::helper::pb_value_to_value_ref( - &self.row.values[*idx], - &self.schema[*idx].datatype_extension, - ) + .map(|idx| match idx { + Some(i) => api::helper::pb_value_to_value_ref( + &self.row.values[*i], + &self.schema[*i].datatype_extension, + ), + None => ValueRef::Null, }) } @@ -97,18 +98,19 @@ impl<'a> KeyValue<'a> { pub fn fields(&self) -> impl Iterator<Item = ValueRef> { self.helper.indices[self.helper.num_primary_key_column + 1..] .iter() - .map(|idx| { - api::helper::pb_value_to_value_ref( - &self.row.values[*idx], - &self.schema[*idx].datatype_extension, - ) + .map(|idx| match idx { + Some(i) => api::helper::pb_value_to_value_ref( + &self.row.values[*i], + &self.schema[*i].datatype_extension, + ), + None => ValueRef::Null, }) } /// Get timestamp. pub fn timestamp(&self) -> ValueRef { // Timestamp is primitive, we clone it. - let index = self.helper.indices[self.helper.num_primary_key_column]; + let index = self.helper.indices[self.helper.num_primary_key_column].unwrap(); api::helper::pb_value_to_value_ref( &self.row.values[index], &self.schema[index].datatype_extension, @@ -136,33 +138,24 @@ impl<'a> KeyValue<'a> { } } -/// Helper to read rows in key, value order. +/// Helper to read rows in key, value order for sparse data. #[derive(Debug)] -struct ReadRowHelper { +struct SparseReadRowHelper { /// Key and value column indices. 
/// /// `indices[..num_primary_key_column]` are primary key columns, `indices[num_primary_key_column]` /// is the timestamp column and remainings are field columns. - indices: Vec<usize>, + indices: Vec<Option<usize>>, /// Number of primary key columns. num_primary_key_column: usize, } -impl ReadRowHelper { - /// Creates a [ReadRowHelper] for specific `rows`. +impl SparseReadRowHelper { + /// Creates a [SparseReadRowHelper] for specific `rows`. /// /// # Panics - /// The `rows` must fill their missing columns first and have same columns with `metadata`. - /// Otherwise this method will panic. - fn new(metadata: &RegionMetadata, rows: &Rows) -> ReadRowHelper { - assert_eq!( - metadata.column_metadatas.len(), - rows.schema.len(), - "Length mismatch, column_metas: {:?}, rows_schema: {:?}", - metadata.column_metadatas, - rows.schema - ); - + /// Time index column must exist. + fn new(metadata: &RegionMetadata, rows: &Rows) -> SparseReadRowHelper { // Build a name to index mapping for rows. let name_to_index: HashMap<_, _> = rows .schema @@ -173,25 +166,27 @@ impl ReadRowHelper { let mut indices = Vec::with_capacity(metadata.column_metadatas.len()); // Get primary key indices. - for pk_id in &metadata.primary_key { + for pk_column_id in &metadata.primary_key { // Safety: Id comes from primary key. - let column = metadata.column_by_id(*pk_id).unwrap(); - let index = name_to_index.get(&column.column_schema.name).unwrap(); - indices.push(*index); + let column = metadata.column_by_id(*pk_column_id).unwrap(); + let index = name_to_index.get(&column.column_schema.name); + indices.push(index.copied()); } // Get timestamp index. + // Safety: time index must exist let ts_index = name_to_index .get(&metadata.time_index_column().column_schema.name) .unwrap(); - indices.push(*ts_index); + indices.push(Some(*ts_index)); + // Iterate columns and find field columns. for column in metadata.field_columns() { // Get index in request for each field column. 
- let index = name_to_index.get(&column.column_schema.name).unwrap(); - indices.push(*index); + let index = name_to_index.get(&column.column_schema.name); + indices.push(index.copied()); } - ReadRowHelper { + SparseReadRowHelper { indices, num_primary_key_column: metadata.primary_key.len(), } diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 2d440f6451..f475db5ad8 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -247,6 +247,10 @@ impl WriteRequest { // Need to add a default value for this column. let proto_value = self.column_default_value(column)?; + if proto_value.value_data.is_none() { + return Ok(()); + } + // Insert default value to each row. for row in &mut self.rows.rows { row.values.push(proto_value.clone()); @@ -989,16 +993,13 @@ mod tests { request.fill_missing_columns(&metadata).unwrap(); let expect_rows = Rows { - schema: vec![ - new_column_schema( - "ts", - ColumnDataType::TimestampMillisecond, - SemanticType::Timestamp, - ), - new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag), - ], + schema: vec![new_column_schema( + "ts", + ColumnDataType::TimestampMillisecond, + SemanticType::Timestamp, + )], rows: vec![Row { - values: vec![ts_ms_value(1), Value { value_data: None }], + values: vec![ts_ms_value(1)], }], }; assert_eq!(expect_rows, request.rows); @@ -1104,17 +1105,11 @@ mod tests { ColumnDataType::TimestampMillisecond, SemanticType::Timestamp, ), - new_column_schema("f0", ColumnDataType::Int64, SemanticType::Field), new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field), ], // Column f1 is not nullable and we use 0 for padding. 
rows: vec![Row { - values: vec![ - i64_value(100), - ts_ms_value(1), - Value { value_data: None }, - i64_value(0), - ], + values: vec![i64_value(100), ts_ms_value(1), i64_value(0)], }], }; assert_eq!(expect_rows, request.rows); @@ -1173,17 +1168,11 @@ mod tests { ColumnDataType::TimestampMillisecond, SemanticType::Timestamp, ), - new_column_schema("f0", ColumnDataType::Int64, SemanticType::Field), new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field), ], // Column f1 is not nullable and we use 0 for padding. rows: vec![Row { - values: vec![ - i64_value(100), - ts_ms_value(1), - Value { value_data: None }, - i64_value(0), - ], + values: vec![i64_value(100), ts_ms_value(1), i64_value(0)], }], }; assert_eq!(expect_rows, request.rows);