1use std::collections::HashSet;
16
17use api::v1::{ArrowIpc, SemanticType};
18use bytes::Bytes;
19use common_grpc::flight::{FlightEncoder, FlightMessage};
20use datatypes::arrow::record_batch::RecordBatch;
21use snafu::{OptionExt, ensure};
22use store_api::codec::PrimaryKeyEncoding;
23use store_api::metadata::RegionMetadataRef;
24use store_api::region_request::{AffectedRows, RegionBulkInsertsRequest, RegionRequest};
25use store_api::storage::RegionId;
26
27use crate::batch_modifier::{TagColumnInfo, modify_batch_sparse};
28use crate::engine::MetricEngineInner;
29use crate::error;
30use crate::error::Result;
31use crate::metrics::MITO_OPERATION_ELAPSED;
32
33impl MetricEngineInner {
34 pub async fn bulk_insert_region(
47 &self,
48 region_id: RegionId,
49 request: RegionBulkInsertsRequest,
50 ) -> Result<AffectedRows> {
51 if request.payload.num_rows() == 0 {
52 return Ok(0);
53 }
54 if self.is_physical_region(region_id) {
55 let _timer = MITO_OPERATION_ELAPSED
56 .with_label_values(&["bulk_insert_physical"])
57 .start_timer();
58 return self.bulk_insert_physical_region(region_id, request).await;
59 }
60
61 let _timer = MITO_OPERATION_ELAPSED
62 .with_label_values(&["bulk_insert_logical"])
63 .start_timer();
64 self.bulk_insert_logical_region(region_id, request).await
65 }
66
67 async fn bulk_insert_physical_region(
72 &self,
73 region_id: RegionId,
74 request: RegionBulkInsertsRequest,
75 ) -> Result<AffectedRows> {
76 self.data_region
77 .write_data(region_id, RegionRequest::BulkInserts(request))
78 .await
79 }
80
81 async fn bulk_insert_logical_region(
83 &self,
84 region_id: RegionId,
85 request: RegionBulkInsertsRequest,
86 ) -> Result<AffectedRows> {
87 let (physical_region_id, data_region_id, primary_key_encoding) =
88 self.find_data_region_meta(region_id)?;
89
90 if primary_key_encoding != PrimaryKeyEncoding::Sparse {
91 return error::UnsupportedRegionRequestSnafu {
92 request: RegionRequest::BulkInserts(request),
93 }
94 .fail();
95 }
96
97 let batch = request.payload;
98 if batch.num_rows() == 0 {
99 return Ok(0);
100 }
101
102 let logical_metadata = self
103 .logical_region_metadata(physical_region_id, region_id)
104 .await?;
105 let (tag_columns, non_tag_indices) = self.resolve_tag_columns_from_metadata(
106 region_id,
107 data_region_id,
108 &batch,
109 &logical_metadata,
110 )?;
111 let modified_batch = modify_batch_sparse(
112 batch.clone(),
113 region_id.table_id(),
114 &tag_columns,
115 &non_tag_indices,
116 )?;
117 let (schema, data_header, payload) = record_batch_to_ipc(&modified_batch)?;
118
119 let partition_expr_version = request.partition_expr_version;
120 let request = RegionBulkInsertsRequest {
121 region_id: data_region_id,
122 payload: modified_batch,
123 raw_data: ArrowIpc {
124 schema,
125 data_header,
126 payload,
127 },
128 partition_expr_version,
129 };
130 self.data_region
131 .write_data(data_region_id, RegionRequest::BulkInserts(request))
132 .await
133 }
134
135 fn resolve_tag_columns_from_metadata(
136 &self,
137 logical_region_id: RegionId,
138 data_region_id: RegionId,
139 batch: &RecordBatch,
140 logical_metadata: &RegionMetadataRef,
141 ) -> Result<(Vec<TagColumnInfo>, Vec<usize>)> {
142 let tag_names: HashSet<&str> = logical_metadata
143 .column_metadatas
144 .iter()
145 .filter_map(|column| {
146 if column.semantic_type == SemanticType::Tag {
147 Some(column.column_schema.name.as_str())
148 } else {
149 None
150 }
151 })
152 .collect();
153
154 let mut tag_columns = Vec::new();
155 let mut non_tag_indices = Vec::new();
156 {
157 let state = self.state.read().unwrap();
158 let physical_columns = state
159 .physical_region_states()
160 .get(&data_region_id)
161 .context(error::PhysicalRegionNotFoundSnafu {
162 region_id: data_region_id,
163 })?
164 .physical_columns();
165
166 for (index, field) in batch.schema().fields().iter().enumerate() {
167 let name = field.name();
168 let column_id = physical_columns
169 .get(name)
170 .map(|info| info.column_id)
171 .with_context(|| error::ColumnNotFoundSnafu {
172 name: name.clone(),
173 region_id: logical_region_id,
174 })?;
175 if tag_names.contains(name.as_str()) {
176 tag_columns.push(TagColumnInfo {
177 name: name.clone(),
178 index,
179 column_id,
180 });
181 } else {
182 non_tag_indices.push(index);
183 }
184 }
185 }
186
187 tag_columns.sort_by(|a, b| a.name.cmp(&b.name));
188 Ok((tag_columns, non_tag_indices))
189 }
190}
191
192fn record_batch_to_ipc(record_batch: &RecordBatch) -> Result<(Bytes, Bytes, Bytes)> {
193 let mut encoder = FlightEncoder::default();
194 let schema = encoder.encode_schema(record_batch.schema().as_ref());
195 let mut iter = encoder
196 .encode(FlightMessage::RecordBatch(record_batch.clone()))
197 .into_iter();
198
199 let Some(flight_data) = iter.next() else {
200 return error::UnexpectedRequestSnafu {
201 reason: "Failed to encode empty flight data",
202 }
203 .fail();
204 };
205 ensure!(
206 iter.next().is_none(),
207 error::UnexpectedRequestSnafu {
208 reason: "Bulk insert RecordBatch with dictionary arrays is unsupported".to_string(),
209 }
210 );
211
212 Ok((
213 schema.data_header,
214 flight_data.data_header,
215 flight_data.data_body,
216 ))
217}
218
#[cfg(test)]
mod tests {
    use std::assert_matches;
    use std::sync::Arc;

    use api::v1::ArrowIpc;
    use common_error::ext::ErrorExt;
    use common_query::prelude::{greptime_timestamp, greptime_value};
    use common_recordbatch::RecordBatches;
    use datatypes::arrow::array::{Float64Array, StringArray, TimestampMillisecondArray};
    use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit};
    use datatypes::arrow::record_batch::RecordBatch;
    use mito2::config::MitoConfig;
    use store_api::metric_engine_consts::PRIMARY_KEY_ENCODING;
    use store_api::path_utils::table_dir;
    use store_api::region_engine::RegionEngine;
    use store_api::region_request::{RegionBulkInsertsRequest, RegionPutRequest, RegionRequest};
    use store_api::storage::{RegionId, ScanRequest};

    use super::record_batch_to_ipc;
    use crate::batch_modifier::{TagColumnInfo, modify_batch_sparse};
    use crate::error::Error;
    use crate::test_util::{self, TestEnv};

    /// Builds a batch in the logical schema `(greptime_timestamp, greptime_value, job)`.
    ///
    /// For each `i` in `start..start + rows`, the row has timestamp `i` ms, value `i` as
    /// `f64`, and tag `job = "tag_0"`.
    fn build_logical_batch(start: usize, rows: usize) -> RecordBatch {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                greptime_timestamp(),
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(greptime_value(), DataType::Float64, true),
            Field::new("job", DataType::Utf8, true),
        ]));

        let range = start..start + rows;
        let ts: Vec<i64> = range.clone().map(|i| i as i64).collect();
        let values: Vec<f64> = range.map(|i| i as f64).collect();
        let tags = vec!["tag_0"; rows];

        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(TimestampMillisecondArray::from(ts)),
                Arc::new(Float64Array::from(values)),
                Arc::new(StringArray::from(tags)),
            ],
        )
        .unwrap()
    }

    /// Wraps `batch` in a bulk-insert request for `logical_region_id`.
    ///
    /// `raw_data` is filled with the IPC encoding of `batch`.
    fn build_bulk_request(logical_region_id: RegionId, batch: RecordBatch) -> RegionRequest {
        let (schema, data_header, payload) = record_batch_to_ipc(&batch).unwrap();
        RegionRequest::BulkInserts(RegionBulkInsertsRequest {
            region_id: logical_region_id,
            payload: batch,
            raw_data: ArrowIpc {
                schema,
                data_header,
                payload,
            },
            partition_expr_version: None,
        })
    }

    /// Creates the default physical region with the `dense` primary key encoding.
    ///
    /// It also creates a logical region with a single `job` tag on top of it, and returns
    /// the logical region id.
    async fn init_dense_metric_region(env: &TestEnv) -> RegionId {
        let physical_region_id = env.default_physical_region_id();
        env.create_physical_region(
            physical_region_id,
            &TestEnv::default_table_dir(),
            vec![(PRIMARY_KEY_ENCODING.to_string(), "dense".to_string())],
        )
        .await;

        let logical_region_id = env.default_logical_region_id();
        let request = test_util::create_logical_region_request(
            &["job"],
            physical_region_id,
            &table_dir("test", logical_region_id.table_id()),
        );
        env.metric()
            .handle_request(logical_region_id, RegionRequest::Create(request))
            .await
            .unwrap();
        logical_region_id
    }

    /// Scans `region_id` with a default scan request and returns the total row count.
    async fn scan_row_count(env: &TestEnv, region_id: RegionId) -> usize {
        let stream = env
            .metric()
            .scan_to_stream(region_id, ScanRequest::default())
            .await
            .unwrap();
        let batches = RecordBatches::try_collect(stream).await.unwrap();
        batches.iter().map(|b| b.num_rows()).sum()
    }

    #[tokio::test]
    async fn test_bulk_insert_empty_batch_returns_zero() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let logical_region_id = env.default_logical_region_id();

        let batch = build_logical_batch(0, 0);
        let request = RegionRequest::BulkInserts(RegionBulkInsertsRequest {
            region_id: logical_region_id,
            payload: batch,
            raw_data: ArrowIpc::default(),
            partition_expr_version: None,
        });
        let response = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 0);
    }

    #[tokio::test]
    async fn test_bulk_insert_physical_region_passthrough() {
        let mito_config = MitoConfig {
            default_flat_format: true,
            ..Default::default()
        };
        let env = TestEnv::with_mito_config("", mito_config, Default::default()).await;
        env.init_metric_region().await;
        let physical_region_id = env.default_physical_region_id();
        let logical_region_id = env.default_logical_region_id();

        // First batch goes through the logical region path.
        let logical_batch = build_logical_batch(0, 3);
        let logical_request = build_bulk_request(logical_region_id, logical_batch.clone());
        let response = env
            .metric()
            .handle_request(logical_region_id, logical_request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 3);

        // Second batch is converted to the physical layout by hand and written straight to
        // the physical region.
        // NOTE(review): assumes tag `job` has physical column id 2 in the environment built
        // by `init_metric_region` — confirm if that setup changes.
        let tag_columns = vec![TagColumnInfo {
            name: "job".to_string(),
            index: 2,
            column_id: 2,
        }];
        let non_tag_indices = vec![0, 1];
        let second_batch = build_logical_batch(3, 3);
        let physical_batch = modify_batch_sparse(
            second_batch,
            logical_region_id.table_id(),
            &tag_columns,
            &non_tag_indices,
        )
        .unwrap();
        let request = build_bulk_request(physical_region_id, physical_batch);
        let response = env
            .metric()
            .handle_request(physical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 3);

        // Both writes must be visible through the logical region.
        assert_eq!(scan_row_count(&env, logical_region_id).await, 6);
    }

    #[tokio::test]
    async fn test_bulk_insert_physical_region_empty_batch() {
        let mito_config = MitoConfig {
            default_flat_format: true,
            ..Default::default()
        };
        let env = TestEnv::with_mito_config("", mito_config, Default::default()).await;
        env.init_metric_region().await;
        let physical_region_id = env.default_physical_region_id();

        let batch = build_logical_batch(0, 0);
        let request = build_bulk_request(physical_region_id, batch);
        let response = env
            .metric()
            .handle_request(physical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 0);
    }

    #[tokio::test]
    async fn test_bulk_insert_unknown_column_errors() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let logical_region_id = env.default_logical_region_id();

        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                greptime_timestamp(),
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(greptime_value(), DataType::Float64, true),
            Field::new("nonexistent_column", DataType::Utf8, true),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![0i64])),
                Arc::new(Float64Array::from(vec![1.0])),
                Arc::new(StringArray::from(vec!["val"])),
            ],
        )
        .unwrap();

        let request = build_bulk_request(logical_region_id, batch);
        let err = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap_err();
        let Some(err) = err.as_any().downcast_ref::<Error>() else {
            panic!("unexpected error type");
        };
        assert_matches!(err, Error::ColumnNotFound { .. });
    }

    #[tokio::test]
    async fn test_bulk_insert_multiple_tag_columns() {
        let env = TestEnv::new().await;
        let physical_region_id = env.default_physical_region_id();
        env.create_physical_region(physical_region_id, &TestEnv::default_table_dir(), vec![])
            .await;
        let logical_region_id = env.default_logical_region_id();
        let request = test_util::create_logical_region_request(
            &["host", "region"],
            physical_region_id,
            &table_dir("test", logical_region_id.table_id()),
        );
        env.metric()
            .handle_request(logical_region_id, RegionRequest::Create(request))
            .await
            .unwrap();

        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                greptime_timestamp(),
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(greptime_value(), DataType::Float64, true),
            Field::new("host", DataType::Utf8, true),
            Field::new("region", DataType::Utf8, true),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![0i64, 1, 2])),
                Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])),
                Arc::new(StringArray::from(vec!["h1", "h2", "h1"])),
                Arc::new(StringArray::from(vec!["us-east", "us-west", "eu-west"])),
            ],
        )
        .unwrap();

        let request = build_bulk_request(logical_region_id, batch);
        let response = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 3);

        assert_eq!(scan_row_count(&env, logical_region_id).await, 3);
    }

    #[tokio::test]
    async fn test_bulk_insert_accumulates_rows() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let logical_region_id = env.default_logical_region_id();

        let request = build_bulk_request(logical_region_id, build_logical_batch(0, 3));
        let response = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 3);

        let request = build_bulk_request(logical_region_id, build_logical_batch(3, 5));
        let response = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 5);

        assert_eq!(scan_row_count(&env, logical_region_id).await, 8);
    }

    #[tokio::test]
    async fn test_bulk_insert_sparse_encoding() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let logical_region_id = env.default_logical_region_id();

        let request = build_bulk_request(logical_region_id, build_logical_batch(0, 4));
        let response = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 4);

        assert_eq!(scan_row_count(&env, logical_region_id).await, 4);
    }

    #[tokio::test]
    async fn test_bulk_insert_dense_encoding_rejected() {
        let env = TestEnv::new().await;
        let logical_region_id = init_dense_metric_region(&env).await;

        let request = build_bulk_request(logical_region_id, build_logical_batch(0, 2));
        let err = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap_err();
        let Some(err) = err.as_any().downcast_ref::<Error>() else {
            panic!("unexpected error type");
        };
        assert_matches!(err, Error::UnsupportedRegionRequest { .. });
    }

    /// A bulk insert and an equivalent row-based put must produce identical scan output.
    #[tokio::test]
    async fn test_bulk_insert_matches_put() {
        let env_put = TestEnv::new().await;
        env_put.init_metric_region().await;
        let logical_region_id = env_put.default_logical_region_id();
        let schema = test_util::row_schema_with_tags(&["job"]);
        let rows = test_util::build_rows(1, 5);
        env_put
            .metric()
            .handle_request(
                logical_region_id,
                RegionRequest::Put(RegionPutRequest {
                    rows: api::v1::Rows { schema, rows },
                    hint: None,
                    partition_expr_version: None,
                }),
            )
            .await
            .unwrap();
        let put_stream = env_put
            .metric()
            .scan_to_stream(logical_region_id, ScanRequest::default())
            .await
            .unwrap();
        let put_batches = RecordBatches::try_collect(put_stream).await.unwrap();
        let put_output = put_batches.pretty_print().unwrap();

        let env_bulk = TestEnv::new().await;
        env_bulk.init_metric_region().await;
        let request = build_bulk_request(logical_region_id, build_logical_batch(0, 5));
        env_bulk
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        let bulk_stream = env_bulk
            .metric()
            .scan_to_stream(logical_region_id, ScanRequest::default())
            .await
            .unwrap();
        let bulk_batches = RecordBatches::try_collect(bulk_stream).await.unwrap();
        let bulk_output = bulk_batches.pretty_print().unwrap();

        assert_eq!(put_output, bulk_output);
    }
}