Skip to main content

metric_engine/engine/
bulk_insert.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use api::v1::{ArrowIpc, SemanticType};
18use bytes::Bytes;
19use common_grpc::flight::{FlightEncoder, FlightMessage};
20use datatypes::arrow::record_batch::RecordBatch;
21use snafu::{OptionExt, ensure};
22use store_api::codec::PrimaryKeyEncoding;
23use store_api::metadata::RegionMetadataRef;
24use store_api::region_request::{AffectedRows, RegionBulkInsertsRequest, RegionRequest};
25use store_api::storage::RegionId;
26
27use crate::batch_modifier::{TagColumnInfo, modify_batch_sparse};
28use crate::engine::MetricEngineInner;
29use crate::error;
30use crate::error::Result;
31use crate::metrics::MITO_OPERATION_ELAPSED;
32
33impl MetricEngineInner {
34    /// Bulk-inserts rows into a metric region.
35    ///
36    /// **Logical region path:** The request payload is a logical `RecordBatch`
37    /// (timestamp, value and tag columns). It is transformed to physical format
38    /// via `modify_batch_sparse`, encoded to Arrow IPC, and forwarded as a
39    /// `BulkInserts` request to the data region.
40    ///
41    /// **Physical region path:** The request payload is already in physical format
42    /// (produced by the batcher's `flush_batch_physical`). It is forwarded directly
43    /// to the data region with no transformation.
44    ///
45    /// Returns the number of affected rows, or `0` if the input batch is empty.
46    pub async fn bulk_insert_region(
47        &self,
48        region_id: RegionId,
49        request: RegionBulkInsertsRequest,
50    ) -> Result<AffectedRows> {
51        if request.payload.num_rows() == 0 {
52            return Ok(0);
53        }
54        if self.is_physical_region(region_id) {
55            let _timer = MITO_OPERATION_ELAPSED
56                .with_label_values(&["bulk_insert_physical"])
57                .start_timer();
58            return self.bulk_insert_physical_region(region_id, request).await;
59        }
60
61        let _timer = MITO_OPERATION_ELAPSED
62            .with_label_values(&["bulk_insert_logical"])
63            .start_timer();
64        self.bulk_insert_logical_region(region_id, request).await
65    }
66
67    /// Passthrough for bulk inserts targeting a physical data region.
68    ///
69    /// The batch is already in physical format (with `__primary_key`, timestamp,
70    /// value columns), so no logical-to-physical transformation is needed.
71    async fn bulk_insert_physical_region(
72        &self,
73        region_id: RegionId,
74        request: RegionBulkInsertsRequest,
75    ) -> Result<AffectedRows> {
76        self.data_region
77            .write_data(region_id, RegionRequest::BulkInserts(request))
78            .await
79    }
80
81    /// Bulk-inserts logical rows, transforming them to physical format first.
82    async fn bulk_insert_logical_region(
83        &self,
84        region_id: RegionId,
85        request: RegionBulkInsertsRequest,
86    ) -> Result<AffectedRows> {
87        let (physical_region_id, data_region_id, primary_key_encoding) =
88            self.find_data_region_meta(region_id)?;
89
90        if primary_key_encoding != PrimaryKeyEncoding::Sparse {
91            return error::UnsupportedRegionRequestSnafu {
92                request: RegionRequest::BulkInserts(request),
93            }
94            .fail();
95        }
96
97        let batch = request.payload;
98        if batch.num_rows() == 0 {
99            return Ok(0);
100        }
101
102        let logical_metadata = self
103            .logical_region_metadata(physical_region_id, region_id)
104            .await?;
105        let (tag_columns, non_tag_indices) = self.resolve_tag_columns_from_metadata(
106            region_id,
107            data_region_id,
108            &batch,
109            &logical_metadata,
110        )?;
111        let modified_batch = modify_batch_sparse(
112            batch.clone(),
113            region_id.table_id(),
114            &tag_columns,
115            &non_tag_indices,
116        )?;
117        let (schema, data_header, payload) = record_batch_to_ipc(&modified_batch)?;
118
119        let partition_expr_version = request.partition_expr_version;
120        let request = RegionBulkInsertsRequest {
121            region_id: data_region_id,
122            payload: modified_batch,
123            raw_data: ArrowIpc {
124                schema,
125                data_header,
126                payload,
127            },
128            partition_expr_version,
129        };
130        self.data_region
131            .write_data(data_region_id, RegionRequest::BulkInserts(request))
132            .await
133    }
134
135    fn resolve_tag_columns_from_metadata(
136        &self,
137        logical_region_id: RegionId,
138        data_region_id: RegionId,
139        batch: &RecordBatch,
140        logical_metadata: &RegionMetadataRef,
141    ) -> Result<(Vec<TagColumnInfo>, Vec<usize>)> {
142        let tag_names: HashSet<&str> = logical_metadata
143            .column_metadatas
144            .iter()
145            .filter_map(|column| {
146                if column.semantic_type == SemanticType::Tag {
147                    Some(column.column_schema.name.as_str())
148                } else {
149                    None
150                }
151            })
152            .collect();
153
154        let mut tag_columns = Vec::new();
155        let mut non_tag_indices = Vec::new();
156        {
157            let state = self.state.read().unwrap();
158            let physical_columns = state
159                .physical_region_states()
160                .get(&data_region_id)
161                .context(error::PhysicalRegionNotFoundSnafu {
162                    region_id: data_region_id,
163                })?
164                .physical_columns();
165
166            for (index, field) in batch.schema().fields().iter().enumerate() {
167                let name = field.name();
168                let column_id = physical_columns
169                    .get(name)
170                    .map(|info| info.column_id)
171                    .with_context(|| error::ColumnNotFoundSnafu {
172                        name: name.clone(),
173                        region_id: logical_region_id,
174                    })?;
175                if tag_names.contains(name.as_str()) {
176                    tag_columns.push(TagColumnInfo {
177                        name: name.clone(),
178                        index,
179                        column_id,
180                    });
181                } else {
182                    non_tag_indices.push(index);
183                }
184            }
185        }
186
187        tag_columns.sort_by(|a, b| a.name.cmp(&b.name));
188        Ok((tag_columns, non_tag_indices))
189    }
190}
191
192fn record_batch_to_ipc(record_batch: &RecordBatch) -> Result<(Bytes, Bytes, Bytes)> {
193    let mut encoder = FlightEncoder::default();
194    let schema = encoder.encode_schema(record_batch.schema().as_ref());
195    let mut iter = encoder
196        .encode(FlightMessage::RecordBatch(record_batch.clone()))
197        .into_iter();
198
199    let Some(flight_data) = iter.next() else {
200        return error::UnexpectedRequestSnafu {
201            reason: "Failed to encode empty flight data",
202        }
203        .fail();
204    };
205    ensure!(
206        iter.next().is_none(),
207        error::UnexpectedRequestSnafu {
208            reason: "Bulk insert RecordBatch with dictionary arrays is unsupported".to_string(),
209        }
210    );
211
212    Ok((
213        schema.data_header,
214        flight_data.data_header,
215        flight_data.data_body,
216    ))
217}
218
#[cfg(test)]
mod tests {
    //! Tests for bulk inserts into logical and physical metric regions.

    use std::assert_matches;
    use std::sync::Arc;

    use api::v1::ArrowIpc;
    use common_error::ext::ErrorExt;
    use common_query::prelude::{greptime_timestamp, greptime_value};
    use common_recordbatch::RecordBatches;
    use datatypes::arrow::array::{Float64Array, StringArray, TimestampMillisecondArray};
    use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit};
    use datatypes::arrow::record_batch::RecordBatch;
    use mito2::config::MitoConfig;
    use store_api::metric_engine_consts::PRIMARY_KEY_ENCODING;
    use store_api::path_utils::table_dir;
    use store_api::region_engine::RegionEngine;
    use store_api::region_request::{
        AffectedRows, RegionBulkInsertsRequest, RegionPutRequest, RegionRequest,
    };
    use store_api::storage::{RegionId, ScanRequest};

    use super::record_batch_to_ipc;
    use crate::batch_modifier::{TagColumnInfo, modify_batch_sparse};
    use crate::error::Error;
    use crate::test_util::{self, TestEnv};

    /// Builds the Arrow schema of a logical batch: a non-null millisecond
    /// timestamp, a nullable float value, then one nullable string column per
    /// entry of `tag_names`, in the given order.
    fn logical_schema(tag_names: &[&str]) -> Arc<ArrowSchema> {
        let mut fields = vec![
            Field::new(
                greptime_timestamp(),
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(greptime_value(), DataType::Float64, true),
        ];
        fields.extend(
            tag_names
                .iter()
                .map(|name| Field::new(*name, DataType::Utf8, true)),
        );
        Arc::new(ArrowSchema::new(fields))
    }

    /// Builds a logical batch of `rows` rows with a single `job` tag.
    ///
    /// Row `i` (for `i` in `start..start + rows`) has timestamp `i`, value `i`
    /// and tag `"tag_0"`.
    fn build_logical_batch(start: usize, rows: usize) -> RecordBatch {
        let range = start..start + rows;
        let ts: Vec<i64> = range.clone().map(|i| i as i64).collect();
        let values: Vec<f64> = range.map(|i| i as f64).collect();
        let tags = vec!["tag_0".to_string(); rows];

        RecordBatch::try_new(
            logical_schema(&["job"]),
            vec![
                Arc::new(TimestampMillisecondArray::from(ts)),
                Arc::new(Float64Array::from(values)),
                Arc::new(StringArray::from(tags)),
            ],
        )
        .unwrap()
    }

    /// Wraps `batch` into a `BulkInserts` request for `logical_region_id`,
    /// including its Arrow IPC encoding as raw data.
    fn build_bulk_request(logical_region_id: RegionId, batch: RecordBatch) -> RegionRequest {
        let (schema, data_header, payload) = record_batch_to_ipc(&batch).unwrap();
        RegionRequest::BulkInserts(RegionBulkInsertsRequest {
            region_id: logical_region_id,
            payload: batch,
            raw_data: ArrowIpc {
                schema,
                data_header,
                payload,
            },
            partition_expr_version: None,
        })
    }

    /// Sends `batch` as a bulk insert to `region_id` and returns the number of
    /// affected rows. Panics if the request fails.
    async fn bulk_insert(env: &TestEnv, region_id: RegionId, batch: RecordBatch) -> AffectedRows {
        let request = build_bulk_request(region_id, batch);
        env.metric()
            .handle_request(region_id, request)
            .await
            .unwrap()
            .affected_rows
    }

    /// Scans `region_id` with a default `ScanRequest` and collects every batch.
    async fn scan_all(env: &TestEnv, region_id: RegionId) -> RecordBatches {
        let stream = env
            .metric()
            .scan_to_stream(region_id, ScanRequest::default())
            .await
            .unwrap();
        RecordBatches::try_collect(stream).await.unwrap()
    }

    /// Returns the total number of rows a full scan of `region_id` yields.
    async fn scanned_row_count(env: &TestEnv, region_id: RegionId) -> usize {
        let batches = scan_all(env, region_id).await;
        batches.iter().map(|b| b.num_rows()).sum::<usize>()
    }

    /// Creates a test environment whose mito engine uses the flat format, so
    /// that BulkMemtable is used (supports write_bulk).
    async fn flat_format_env() -> TestEnv {
        let mito_config = MitoConfig {
            default_flat_format: true,
            ..Default::default()
        };
        TestEnv::with_mito_config("", mito_config, Default::default()).await
    }

    /// Creates a physical region with dense primary key encoding plus a logical
    /// region with a `job` tag on top of it, and returns the logical region id.
    async fn init_dense_metric_region(env: &TestEnv) -> RegionId {
        let physical_region_id = env.default_physical_region_id();
        env.create_physical_region(
            physical_region_id,
            &TestEnv::default_table_dir(),
            vec![(PRIMARY_KEY_ENCODING.to_string(), "dense".to_string())],
        )
        .await;

        let logical_region_id = env.default_logical_region_id();
        let request = test_util::create_logical_region_request(
            &["job"],
            physical_region_id,
            &table_dir("test", logical_region_id.table_id()),
        );
        env.metric()
            .handle_request(logical_region_id, RegionRequest::Create(request))
            .await
            .unwrap();
        logical_region_id
    }

    #[tokio::test]
    async fn test_bulk_insert_empty_batch_returns_zero() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let logical_region_id = env.default_logical_region_id();

        // The raw IPC data is left empty on purpose: an empty payload must be
        // answered before the raw data is ever looked at.
        let request = RegionRequest::BulkInserts(RegionBulkInsertsRequest {
            region_id: logical_region_id,
            payload: build_logical_batch(0, 0),
            raw_data: ArrowIpc::default(),
            partition_expr_version: None,
        });
        let response = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 0);
    }

    #[tokio::test]
    async fn test_bulk_insert_physical_region_passthrough() {
        let env = flat_format_env().await;
        env.init_metric_region().await;
        let physical_region_id = env.default_physical_region_id();
        let logical_region_id = env.default_logical_region_id();

        // First, do a normal logical bulk insert so we can compare results.
        let affected = bulk_insert(&env, logical_region_id, build_logical_batch(0, 3)).await;
        assert_eq!(affected, 3);

        // Now build a physical-format batch using modify_batch_sparse (simulating
        // what the batcher's flush_batch_physical does) and send it directly to
        // the physical region.
        let tag_columns = vec![TagColumnInfo {
            name: "job".to_string(),
            index: 2,
            column_id: 2, // column_id for "job" in the physical table
        }];
        let non_tag_indices = vec![0, 1]; // timestamp, value
        let physical_batch = modify_batch_sparse(
            build_logical_batch(3, 3),
            logical_region_id.table_id(),
            &tag_columns,
            &non_tag_indices,
        )
        .unwrap();
        let affected = bulk_insert(&env, physical_region_id, physical_batch).await;
        assert_eq!(affected, 3);

        // Verify all 6 rows are readable from the logical region.
        assert_eq!(scanned_row_count(&env, logical_region_id).await, 6);
    }

    #[tokio::test]
    async fn test_bulk_insert_physical_region_empty_batch() {
        let env = flat_format_env().await;
        env.init_metric_region().await;
        let physical_region_id = env.default_physical_region_id();

        let affected = bulk_insert(&env, physical_region_id, build_logical_batch(0, 0)).await;
        assert_eq!(affected, 0);
    }

    #[tokio::test]
    async fn test_bulk_insert_unknown_column_errors() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let logical_region_id = env.default_logical_region_id();

        let batch = RecordBatch::try_new(
            logical_schema(&["nonexistent_column"]),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![0i64])),
                Arc::new(Float64Array::from(vec![1.0])),
                Arc::new(StringArray::from(vec!["val"])),
            ],
        )
        .unwrap();

        let request = build_bulk_request(logical_region_id, batch);
        let err = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap_err();
        let Some(err) = err.as_any().downcast_ref::<Error>() else {
            panic!("unexpected error type");
        };
        assert_matches!(err, Error::ColumnNotFound { .. });
    }

    #[tokio::test]
    async fn test_bulk_insert_multiple_tag_columns() {
        let env = TestEnv::new().await;
        let physical_region_id = env.default_physical_region_id();
        env.create_physical_region(physical_region_id, &TestEnv::default_table_dir(), vec![])
            .await;
        let logical_region_id = env.default_logical_region_id();
        let request = test_util::create_logical_region_request(
            &["host", "region"],
            physical_region_id,
            &table_dir("test", logical_region_id.table_id()),
        );
        env.metric()
            .handle_request(logical_region_id, RegionRequest::Create(request))
            .await
            .unwrap();

        let batch = RecordBatch::try_new(
            logical_schema(&["host", "region"]),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![0i64, 1, 2])),
                Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])),
                Arc::new(StringArray::from(vec!["h1", "h2", "h1"])),
                Arc::new(StringArray::from(vec!["us-east", "us-west", "eu-west"])),
            ],
        )
        .unwrap();

        let affected = bulk_insert(&env, logical_region_id, batch).await;
        assert_eq!(affected, 3);
        assert_eq!(scanned_row_count(&env, logical_region_id).await, 3);
    }

    #[tokio::test]
    async fn test_bulk_insert_accumulates_rows() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let logical_region_id = env.default_logical_region_id();

        let affected = bulk_insert(&env, logical_region_id, build_logical_batch(0, 3)).await;
        assert_eq!(affected, 3);

        let affected = bulk_insert(&env, logical_region_id, build_logical_batch(3, 5)).await;
        assert_eq!(affected, 5);

        assert_eq!(scanned_row_count(&env, logical_region_id).await, 8);
    }

    #[tokio::test]
    async fn test_bulk_insert_sparse_encoding() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let logical_region_id = env.default_logical_region_id();

        let affected = bulk_insert(&env, logical_region_id, build_logical_batch(0, 4)).await;
        assert_eq!(affected, 4);
        assert_eq!(scanned_row_count(&env, logical_region_id).await, 4);
    }

    #[tokio::test]
    async fn test_bulk_insert_dense_encoding_rejected() {
        let env = TestEnv::new().await;
        let logical_region_id = init_dense_metric_region(&env).await;

        let request = build_bulk_request(logical_region_id, build_logical_batch(0, 2));
        let err = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap_err();
        let Some(err) = err.as_any().downcast_ref::<Error>() else {
            panic!("unexpected error type");
        };
        assert_matches!(err, Error::UnsupportedRegionRequest { .. });
    }

    #[tokio::test]
    async fn test_bulk_insert_matches_put() {
        // Write five rows through the regular put path...
        let env_put = TestEnv::new().await;
        env_put.init_metric_region().await;
        let logical_region_id = env_put.default_logical_region_id();
        let schema = test_util::row_schema_with_tags(&["job"]);
        let rows = test_util::build_rows(1, 5);
        env_put
            .metric()
            .handle_request(
                logical_region_id,
                RegionRequest::Put(RegionPutRequest {
                    rows: api::v1::Rows { schema, rows },
                    hint: None,
                    partition_expr_version: None,
                }),
            )
            .await
            .unwrap();
        let put_output = scan_all(&env_put, logical_region_id)
            .await
            .pretty_print()
            .unwrap();

        // ...and the same five rows through bulk insert in a fresh environment.
        let env_bulk = TestEnv::new().await;
        env_bulk.init_metric_region().await;
        let request = build_bulk_request(logical_region_id, build_logical_batch(0, 5));
        env_bulk
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        let bulk_output = scan_all(&env_bulk, logical_region_id)
            .await
            .pretty_print()
            .unwrap();

        assert_eq!(put_output, bulk_output);
    }
}
613        let bulk_output = bulk_batches.pretty_print().unwrap();
614
615        assert_eq!(put_output, bulk_output);
616    }
617}