metric_engine/
batch_modifier.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::hash::Hasher;
16use std::sync::Arc;
17
18use datatypes::arrow::array::{Array, BinaryBuilder, StringArray, UInt64Array};
19use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
20use datatypes::arrow::record_batch::RecordBatch;
21use datatypes::value::ValueRef;
22use fxhash::FxHasher;
23use mito_codec::row_converter::SparsePrimaryKeyCodec;
24use snafu::ResultExt;
25use store_api::storage::ColumnId;
26use store_api::storage::consts::{PRIMARY_KEY_COLUMN_NAME, ReservedColumnId};
27
28use crate::error::{EncodePrimaryKeySnafu, Result, UnexpectedRequestSnafu};
29
30/// Info about a tag column for TSID computation and sparse primary key encoding.
31#[allow(dead_code)]
32pub(crate) struct TagColumnInfo {
33    /// Column name (used for label-name hash).
34    pub name: String,
35    /// Column index in the RecordBatch.
36    pub index: usize,
37    /// Column ID in the physical region.
38    pub column_id: ColumnId,
39}
40
41/// Computes `__tsid` values for each row.
42#[allow(dead_code)]
43pub(crate) fn compute_tsid_array(
44    batch: &RecordBatch,
45    sorted_tag_columns: &[TagColumnInfo],
46    tag_arrays: &[&StringArray],
47) -> UInt64Array {
48    let num_rows = batch.num_rows();
49
50    let label_name_hash = {
51        let mut hasher = FxHasher::default();
52        for tag_col in sorted_tag_columns {
53            hasher.write(tag_col.name.as_bytes());
54            hasher.write_u8(0xff);
55        }
56        hasher.finish()
57    };
58
59    let mut tsid_values = Vec::with_capacity(num_rows);
60    for row in 0..num_rows {
61        let has_null = tag_arrays.iter().any(|arr| arr.is_null(row));
62
63        let tsid = if !has_null {
64            let mut hasher = FxHasher::default();
65            hasher.write_u64(label_name_hash);
66            for arr in tag_arrays {
67                hasher.write(arr.value(row).as_bytes());
68                hasher.write_u8(0xff);
69            }
70            hasher.finish()
71        } else {
72            let mut name_hasher = FxHasher::default();
73            for (tc, arr) in sorted_tag_columns.iter().zip(tag_arrays.iter()) {
74                if !arr.is_null(row) {
75                    name_hasher.write(tc.name.as_bytes());
76                    name_hasher.write_u8(0xff);
77                }
78            }
79            let row_label_hash = name_hasher.finish();
80
81            let mut val_hasher = FxHasher::default();
82            val_hasher.write_u64(row_label_hash);
83            for arr in tag_arrays {
84                if !arr.is_null(row) {
85                    val_hasher.write(arr.value(row).as_bytes());
86                    val_hasher.write_u8(0xff);
87                }
88            }
89            val_hasher.finish()
90        };
91
92        tsid_values.push(tsid);
93    }
94
95    UInt64Array::from(tsid_values)
96}
97
98fn build_tag_arrays<'a>(
99    batch: &'a RecordBatch,
100    sorted_tag_columns: &[TagColumnInfo],
101) -> Vec<&'a StringArray> {
102    sorted_tag_columns
103        .iter()
104        .map(|tc| {
105            batch
106                .column(tc.index)
107                .as_any()
108                .downcast_ref::<StringArray>()
109                .expect("tag column must be utf8")
110        })
111        .collect()
112}
113
114/// Modifies a RecordBatch for sparse primary key encoding.
115#[allow(dead_code)]
116pub(crate) fn modify_batch_sparse(
117    batch: RecordBatch,
118    table_id: u32,
119    sorted_tag_columns: &[TagColumnInfo],
120    non_tag_column_indices: &[usize],
121) -> Result<RecordBatch> {
122    let num_rows = batch.num_rows();
123    let codec = SparsePrimaryKeyCodec::schemaless();
124    let tag_arrays: Vec<&StringArray> = build_tag_arrays(&batch, sorted_tag_columns);
125    let tsid_array = compute_tsid_array(&batch, sorted_tag_columns, &tag_arrays);
126
127    let mut pk_builder = BinaryBuilder::with_capacity(num_rows, 0);
128    let mut buffer = Vec::new();
129    for row in 0..num_rows {
130        buffer.clear();
131        let internal = [
132            (ReservedColumnId::table_id(), ValueRef::UInt32(table_id)),
133            (
134                ReservedColumnId::tsid(),
135                ValueRef::UInt64(tsid_array.value(row)),
136            ),
137        ];
138        codec
139            .encode_to_vec(internal.into_iter(), &mut buffer)
140            .context(EncodePrimaryKeySnafu)?;
141
142        let tags = sorted_tag_columns
143            .iter()
144            .zip(tag_arrays.iter())
145            .filter(|(_, arr)| !arr.is_null(row))
146            .map(|(tc, arr)| (tc.column_id, ValueRef::String(arr.value(row))));
147        codec
148            .encode_to_vec(tags, &mut buffer)
149            .context(EncodePrimaryKeySnafu)?;
150
151        pk_builder.append_value(&buffer);
152    }
153
154    let pk_array = pk_builder.finish();
155
156    let mut fields = vec![Arc::new(Field::new(
157        PRIMARY_KEY_COLUMN_NAME,
158        DataType::Binary,
159        false,
160    ))];
161    let mut columns: Vec<Arc<dyn Array>> = vec![Arc::new(pk_array)];
162
163    for &idx in non_tag_column_indices {
164        fields.push(batch.schema().fields()[idx].clone());
165        columns.push(batch.column(idx).clone());
166    }
167
168    let new_schema = Arc::new(ArrowSchema::new(fields));
169    RecordBatch::try_new(new_schema, columns).map_err(|e| {
170        UnexpectedRequestSnafu {
171            reason: format!("Failed to build modified sparse RecordBatch: {e}"),
172        }
173        .build()
174    })
175}
176
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use api::v1::value::ValueData;
    use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
    use datatypes::arrow::array::{BinaryArray, Int64Array, StringArray};
    use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
    use datatypes::arrow::record_batch::RecordBatch;
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;

    use super::*;
    use crate::row_modifier::{RowModifier, RowsIter, TableIdInput};

    /// Table id used by the sparse encoding tests.
    const TABLE_ID: u32 = 1025;

    /// Shorthand constructor for a [`TagColumnInfo`].
    fn tag(name: &str, index: usize, column_id: ColumnId) -> TagColumnInfo {
        TagColumnInfo {
            name: name.to_string(),
            index,
            column_id,
        }
    }

    /// Builds a batch made only of nullable utf8 columns, in the given order.
    fn utf8_batch(columns: Vec<(&str, StringArray)>) -> RecordBatch {
        let mut fields = Vec::with_capacity(columns.len());
        let mut arrays: Vec<Arc<dyn Array>> = Vec::with_capacity(columns.len());
        for (name, values) in columns {
            fields.push(Field::new(name, DataType::Utf8, true));
            arrays.push(Arc::new(values));
        }
        RecordBatch::try_new(Arc::new(ArrowSchema::new(fields)), arrays).unwrap()
    }

    /// Returns the TSID computed for the first row of `batch`.
    fn first_tsid(batch: &RecordBatch, tag_columns: &[TagColumnInfo]) -> u64 {
        let tag_arrays = build_tag_arrays(batch, tag_columns);
        compute_tsid_array(batch, tag_columns, &tag_arrays).value(0)
    }

    /// One-row batch laid out as: timestamp, value, `namespace` tag, `host` tag.
    fn build_sparse_test_batch() -> RecordBatch {
        let fields = vec![
            Field::new("greptime_timestamp", DataType::Int64, false),
            Field::new("greptime_value", DataType::Float64, true),
            Field::new("namespace", DataType::Utf8, true),
            Field::new("host", DataType::Utf8, true),
        ];
        let columns: Vec<Arc<dyn Array>> = vec![
            Arc::new(Int64Array::from(vec![1000])),
            Arc::new(datatypes::arrow::array::Float64Array::from(vec![42.0])),
            Arc::new(StringArray::from(vec!["greptimedb"])),
            Arc::new(StringArray::from(vec!["127.0.0.1"])),
        ];
        RecordBatch::try_new(Arc::new(ArrowSchema::new(fields)), columns).unwrap()
    }

    /// Tag columns of [`build_sparse_test_batch`], ordered by name.
    fn sparse_tag_columns() -> Vec<TagColumnInfo> {
        vec![tag("host", 3, 3), tag("namespace", 2, 2)]
    }

    /// Shorthand constructor for a gRPC column schema.
    fn column_schema(
        name: &str,
        datatype: ColumnDataType,
        semantic_type: SemanticType,
    ) -> ColumnSchema {
        ColumnSchema {
            column_name: name.to_string(),
            datatype: datatype as i32,
            semantic_type: semantic_type as i32,
            ..Default::default()
        }
    }

    /// Wraps `data` into a non-null gRPC value.
    fn value(data: ValueData) -> Value {
        Value {
            value_data: Some(data),
        }
    }

    #[test]
    fn test_compute_tsid_basic() {
        let batch = utf8_batch(vec![
            ("namespace", StringArray::from(vec!["greptimedb"])),
            ("host", StringArray::from(vec!["127.0.0.1"])),
        ]);
        let tag_columns = [tag("host", 1, 2), tag("namespace", 0, 1)];

        assert_eq!(first_tsid(&batch, &tag_columns), 2721566936019240841);
    }

    #[test]
    fn test_compute_tsid_with_nulls() {
        // Two tags, both present.
        let batch_no_null = utf8_batch(vec![
            ("a", StringArray::from(vec!["A"])),
            ("b", StringArray::from(vec!["B"])),
        ]);
        let tag_cols_2 = [tag("a", 0, 1), tag("b", 1, 2)];
        let tsid_no_null = first_tsid(&batch_no_null, &tag_cols_2);

        // Same two tags plus a third one whose value is null.
        let batch_with_null = utf8_batch(vec![
            ("a", StringArray::from(vec!["A"])),
            ("b", StringArray::from(vec!["B"])),
            ("c", StringArray::from(vec![None::<&str>])),
        ]);
        let tag_cols_3 = [tag("a", 0, 1), tag("b", 1, 2), tag("c", 2, 3)];
        let tsid_with_null = first_tsid(&batch_with_null, &tag_cols_3);

        // A null tag must not contribute to the TSID.
        assert_eq!(tsid_no_null, tsid_with_null);
    }

    #[test]
    fn test_modify_batch_sparse() {
        let modified = modify_batch_sparse(
            build_sparse_test_batch(),
            TABLE_ID,
            &sparse_tag_columns(),
            &[0, 1],
        )
        .unwrap();

        assert_eq!(modified.num_columns(), 3);

        let schema = modified.schema();
        let expected_names = [
            PRIMARY_KEY_COLUMN_NAME,
            "greptime_timestamp",
            "greptime_value",
        ];
        for (position, expected) in expected_names.into_iter().enumerate() {
            assert_eq!(schema.field(position).name(), expected);
        }
    }

    #[test]
    fn test_modify_batch_sparse_matches_row_modifier() {
        // Primary key produced by the batch-based path.
        let modified = modify_batch_sparse(
            build_sparse_test_batch(),
            TABLE_ID,
            &sparse_tag_columns(),
            &[0, 1],
        )
        .unwrap();

        // The same row, pushed through the row-based path.
        let name_to_column_id: HashMap<String, ColumnId> = HashMap::from([
            ("greptime_timestamp".to_string(), 0),
            ("greptime_value".to_string(), 1),
            ("namespace".to_string(), 2),
            ("host".to_string(), 3),
        ]);
        let request = Rows {
            schema: vec![
                column_schema(
                    "greptime_timestamp",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Timestamp,
                ),
                column_schema(
                    "greptime_value",
                    ColumnDataType::Float64,
                    SemanticType::Field,
                ),
                column_schema("namespace", ColumnDataType::String, SemanticType::Tag),
                column_schema("host", ColumnDataType::String, SemanticType::Tag),
            ],
            rows: vec![Row {
                values: vec![
                    value(ValueData::TimestampMillisecondValue(1000)),
                    value(ValueData::F64Value(42.0)),
                    value(ValueData::StringValue("greptimedb".to_string())),
                    value(ValueData::StringValue("127.0.0.1".to_string())),
                ],
            }],
        };

        let encoded_rows = RowModifier::default()
            .modify_rows(
                RowsIter::new(request, &name_to_column_id),
                TableIdInput::Single(TABLE_ID),
                PrimaryKeyEncoding::Sparse,
            )
            .unwrap();
        let expected_pk = match encoded_rows.rows[0].values[0].value_data.clone().unwrap() {
            ValueData::BinaryValue(bytes) => bytes,
            _ => panic!("expected binary primary key"),
        };

        let actual_array = modified
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        assert_eq!(actual_array.value(0), expected_pk.as_slice());
    }
}