feat(mito): bulk insert request handling on datanode (#5831)

* wip: implement basic request handling

* feat/bulk-insert:
 ### Add Error Handling and Enhance Bulk Insert Functionality

 - **Error Handling**: Introduced a new error variant `ConvertDataType` in `error.rs` to handle conversion failures from `ConcreteDataType` to `ColumnDataType`.
 - **Bulk Insert Enhancements**:
   - Updated `WorkerRequest::BulkInserts` in `request.rs` to include metadata and sender.
   - Implemented `handle_bulk_inserts` in `worker.rs` to process bulk insert requests with region metadata.
   - Added functions `region_metadata_to_column_schema` and `record_batch_to_rows` in `handle_bulk_insert.rs` for schema conversion and row processing.
 - **API Changes**: Modified `RegionBulkInsertsRequest` in `region_request.rs` to include `region_id`.

 Files affected: `error.rs`, `request.rs`, `worker.rs`, `handle_bulk_insert.rs`, `region_request.rs`. (A usage sketch of the new request path follows below.)
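Taken together, these pieces let a caller hand a decoded Arrow record batch to a region as a bulk insert. A minimal sketch of that call path, assuming the engine is reached through `RegionEngine::handle_request(region_id, request)` and that the response exposes `affected_rows`; the helper name and the error handling are illustrative only:

```rust
use common_recordbatch::DfRecordBatch;
use store_api::region_engine::RegionEngineRef;
use store_api::region_request::{BulkInsertPayload, RegionBulkInsertsRequest, RegionRequest};
use store_api::storage::RegionId;

/// Hypothetical helper: wrap an already-decoded record batch into the new
/// bulk-insert request and submit it to a region engine (e.g. mito).
async fn submit_bulk_insert(engine: &RegionEngineRef, region_id: RegionId, batch: DfRecordBatch) {
    let request = RegionRequest::BulkInserts(RegionBulkInsertsRequest {
        region_id,
        payloads: vec![BulkInsertPayload::ArrowIpc(batch)],
    });
    // The mito worker turns the batch into rows and enqueues ordinary write
    // requests; the metric engine currently rejects this request kind.
    match engine.handle_request(region_id, request).await {
        Ok(response) => println!("bulk insert affected {} rows", response.affected_rows),
        Err(e) => eprintln!("bulk insert failed: {e}"),
    }
}
```

Inside the mito worker this is converted into ordinary row writes, as the new `handle_bulk_insert.rs` in the diff below shows.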

* feat/bulk-insert:
 **Enhance Error Handling and Add Unit Tests**

 - Improved error handling in the `record_batch_to_rows` function within `handle_bulk_insert.rs`: it now returns a `Result` and attaches error context via snafu's `context` (see the sketch below).
 - Added unit tests for `region_metadata_to_column_schema` and `record_batch_to_rows` functions in `handle_bulk_insert.rs` to ensure correct functionality and error handling.
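The `Result`-plus-`context` pattern referred to here is ordinary `snafu` usage; a self-contained sketch with made-up error and function names (not the ones in this PR):

```rust
use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum SketchError {
    #[snafu(display("Failed to parse value '{input}'"))]
    ParseValue {
        input: String,
        source: std::num::ParseIntError,
    },
}

// Instead of unwrapping, the fallible step returns a Result and attaches
// context describing what was being attempted when it failed.
fn parse_row(input: &str) -> Result<i64, SketchError> {
    input
        .trim()
        .parse::<i64>()
        .context(ParseValueSnafu { input })
}
```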

* chore: update proto version

* feat/bulk-insert:
 - **Refactor Error Handling**: Reworked the definition and handling of the `ConvertDataType` variant in `error.rs`.
 - **Improve Logging and Error Reporting**: Enhanced logging and error reporting in `worker.rs` by adding error messages for missing region metadata.
 - **Add New Error Type**: Introduced `DecodeArrowIpc` error in `metadata.rs` to handle Arrow IPC decoding failures.
 - **Handle Arrow IPC Decoding**: Updated `region_request.rs` to handle Arrow IPC decoding errors using the new `DecodeArrowIpc` error type (a round-trip sketch follows).
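For reference, the Arrow IPC file format covered by `DecodeArrowIpc` can be produced and consumed roughly as follows. This is a standalone sketch against the `arrow` crate; the actual code goes through the `datatypes::arrow` re-export and wraps failures in `DecodeArrowIpcSnafu`:

```rust
use std::io::Cursor;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::ipc::reader::FileReader;
use arrow::ipc::writer::FileWriter;
use arrow::record_batch::RecordBatch;

// Encode one record batch into Arrow IPC file bytes, then decode it back with
// the same FileReader the region request decoding uses. Any failure surfaces
// as an ArrowError, which DecodeArrowIpc wraps on the server side.
fn ipc_round_trip() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("v0", DataType::Int64, false)]));
    let column: ArrayRef = Arc::new(Int64Array::from_iter_values(0i64..3));
    let batch = RecordBatch::try_new(schema.clone(), vec![column])?;

    // Writer side: serialize into an in-memory buffer.
    let mut payload = Vec::new();
    {
        let mut writer = FileWriter::try_new(&mut payload, &schema)?;
        writer.write(&batch)?;
        writer.finish()?;
    }

    // Reader side: mirrors `FileReader::try_new(Cursor::new(req.payload), None)`.
    let reader = FileReader::try_new(Cursor::new(payload), None)?;
    for maybe_batch in reader {
        assert_eq!(maybe_batch?.num_rows(), 3);
    }
    Ok(())
}
```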

* chore: update proto version

* feat/bulk-insert:
 Refactor `handle_bulk_insert.rs` to simplify row construction

 - Removed the mutable `current_row` vector and refactored the `row_at` function to return a new vector directly (sketched below).
 - Updated `record_batch_to_rows` to utilize the refactored `row_at` function for constructing rows.
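The shape of that refactor in isolation, using plain `i64` columns as stand-ins (the real `row_at` walks `Option<VectorRef>` columns and emits gRPC values, as the new file in the diff below shows):

```rust
// Before the refactor, callers kept a reusable mutable `current_row` buffer
// that had to be cleared and refilled on every iteration. After it, each call
// simply builds and returns a fresh row, so the outer loop becomes a map.
fn row_at(columns: &[Vec<i64>], row_idx: usize) -> Vec<i64> {
    columns.iter().map(|col| col[row_idx]).collect()
}

fn to_rows(columns: &[Vec<i64>], num_rows: usize) -> Vec<Vec<i64>> {
    (0..num_rows).map(|row_idx| row_at(columns, row_idx)).collect()
}
```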

* feat/bulk-insert:
 ### Commit Summary

 **Enhancements in Region Server Request Handling**

 - Updated `region_server.rs` to include `RegionRequest::BulkInserts(_)` in the `RegionChange::Ingest` category, improving the handling of bulk insert operations.
 - Refined the categorization of region requests so that every request variant maps to the intended `RegionChange` action (see the illustration below).
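Because `RegionRequest` gained a `BulkInserts` variant, every exhaustive `match` over it has to name the new arm or stop compiling, and the region server groups it with the other write paths. A toy illustration with stand-in types (not the actual `RegionChange` code, which appears in the hunk further down):

```rust
// Stand-in enums; the real types are `RegionRequest` and the region server's
// internal `RegionChange`.
enum Request {
    Put,
    Delete,
    BulkInserts,
    Flush,
}

enum Change {
    Ingest,
    Other,
}

fn categorize(request: &Request) -> Change {
    match request {
        // The new write-path variant joins the existing ingest arms, so bulk
        // inserts go through the same bookkeeping as Put and Delete.
        Request::Put | Request::Delete | Request::BulkInserts => Change::Ingest,
        Request::Flush => Change::Other,
    }
}
```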
Author: Lei, HUANG
Date: 2025-04-15 22:11:50 +08:00
Committed by: GitHub
Parent: dcf1a486f6
Commit: 799c7cbfa9
12 changed files with 387 additions and 18 deletions

Cargo.lock (generated)

@@ -4722,7 +4722,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=dd4a1996982534636734674db66e44464b0c0d83#dd4a1996982534636734674db66e44464b0c0d83"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=583daa3fbbbe39c90b7b92d13646bc3291d9c941#583daa3fbbbe39c90b7b92d13646bc3291d9c941"
dependencies = [
"prost 0.13.5",
"serde",

Cargo.toml

@@ -129,7 +129,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "dd4a1996982534636734674db66e44464b0c0d83" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "583daa3fbbbe39c90b7b92d13646bc3291d9c941" }
hex = "0.4"
http = "1"
humantime = "2.1"

region_server.rs

@@ -902,7 +902,9 @@ impl RegionServerInner {
RegionChange::Register(attribute)
}
RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters,
RegionRequest::Put(_) | RegionRequest::Delete(_) => RegionChange::Ingest,
RegionRequest::Put(_) | RegionRequest::Delete(_) | RegionRequest::BulkInserts(_) => {
RegionChange::Ingest
}
RegionRequest::Alter(_)
| RegionRequest::Flush(_)
| RegionRequest::Compact(_)

metric engine

@@ -221,6 +221,10 @@ impl RegionEngine for MetricEngine {
}
}
RegionRequest::Catchup(req) => self.inner.catchup_region(region_id, req).await,
RegionRequest::BulkInserts(_) => {
// todo(hl): find a way to support bulk inserts in metric engine.
UnsupportedRegionRequestSnafu { request }.fail()
}
};
result.map_err(BoxedError::new).map(|rows| RegionResponse {

error.rs

@@ -1021,6 +1021,17 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Failed to convert ConcreteDataType to ColumnDataType: {:?}",
data_type
))]
ConvertDataType {
data_type: ConcreteDataType,
source: api::error::Error,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1172,6 +1183,7 @@ impl ErrorExt for Error {
ManualCompactionOverride {} => StatusCode::Cancelled,
IncompatibleWalProviderChange { .. } => StatusCode::InvalidArguments,
ConvertDataType { .. } => StatusCode::Internal,
}
}

request.rs

@@ -35,9 +35,9 @@ use store_api::manifest::ManifestVersion;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
use store_api::region_request::{
AffectedRows, RegionAlterRequest, RegionCatchupRequest, RegionCloseRequest,
RegionCompactRequest, RegionCreateRequest, RegionFlushRequest, RegionOpenRequest,
RegionRequest, RegionTruncateRequest,
AffectedRows, RegionAlterRequest, RegionBulkInsertsRequest, RegionCatchupRequest,
RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, RegionFlushRequest,
RegionOpenRequest, RegionRequest, RegionTruncateRequest,
};
use store_api::storage::{RegionId, SequenceNumber};
use tokio::sync::oneshot::{self, Receiver, Sender};
@@ -569,6 +569,13 @@ pub(crate) enum WorkerRequest {
/// Keep the manifest of a region up to date.
SyncRegion(RegionSyncRequest),
/// Bulk inserts request and region metadata.
BulkInserts {
metadata: Option<RegionMetadataRef>,
request: RegionBulkInsertsRequest,
sender: OptionOutputTx,
},
}
impl WorkerRequest {
@@ -668,6 +675,11 @@ impl WorkerRequest {
sender: sender.into(),
request: DdlRequest::Catchup(v),
}),
RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
metadata: region_metadata,
sender: sender.into(),
request: region_bulk_inserts_request,
},
};
Ok((worker_request, receiver))

worker.rs

@@ -15,6 +15,7 @@
//! Structs and utilities for writing regions.
mod handle_alter;
mod handle_bulk_insert;
mod handle_catchup;
mod handle_close;
mod handle_compaction;
@@ -25,6 +26,7 @@ mod handle_manifest;
mod handle_open;
mod handle_truncate;
mod handle_write;
use std::collections::HashMap;
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
@@ -52,6 +54,7 @@ use crate::cache::write_cache::{WriteCache, WriteCacheRef};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::CompactionScheduler;
use crate::config::MitoConfig;
use crate::error;
use crate::error::{CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
use crate::memtable::MemtableBuilderProvider;
@@ -820,16 +823,30 @@ impl<S: LogStore> RegionWorkerLoop<S> {
WorkerRequest::EditRegion(request) => {
self.handle_region_edit(request).await;
}
// We receive a stop signal, but we still want to process remaining
// requests. The worker thread will then check the running flag and
// then exit.
WorkerRequest::Stop => {
debug_assert!(!self.running.load(Ordering::Relaxed));
}
WorkerRequest::SyncRegion(req) => {
self.handle_region_sync(req).await;
}
WorkerRequest::BulkInserts {
metadata,
request,
sender,
} => {
if let Some(region_metadata) = metadata {
self.handle_bulk_inserts(request, region_metadata, write_requests, sender)
.await;
} else {
error!("Cannot find region metadata for {}", request.region_id);
sender.send(
error::RegionNotFoundSnafu {
region_id: request.region_id,
}
.fail(),
);
}
}
}
}

handle_bulk_insert.rs (new file)

@@ -0,0 +1,247 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Handles bulk insert requests.
use api::helper::{value_to_grpc_value, ColumnDataTypeWrapper};
use api::v1::{ColumnSchema, OpType, Row, Rows};
use common_recordbatch::DfRecordBatch;
use datatypes::prelude::VectorRef;
use datatypes::vectors::Helper;
use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
use store_api::region_request::{BulkInsertPayload, RegionBulkInsertsRequest};
use crate::error;
use crate::request::{OptionOutputTx, SenderWriteRequest, WriteRequest};
use crate::worker::RegionWorkerLoop;
impl<S: LogStore> RegionWorkerLoop<S> {
pub(crate) async fn handle_bulk_inserts(
&mut self,
request: RegionBulkInsertsRequest,
region_metadata: RegionMetadataRef,
pending_write_requests: &mut Vec<SenderWriteRequest>,
sender: OptionOutputTx,
) {
let schema = match region_metadata_to_column_schema(&region_metadata) {
Ok(schema) => schema,
Err(e) => {
sender.send(Err(e));
return;
}
};
let mut pending_tasks = Vec::with_capacity(request.payloads.len());
for req in request.payloads {
match req {
BulkInsertPayload::ArrowIpc(df_record_batch) => {
let rows = match record_batch_to_rows(&region_metadata, &df_record_batch) {
Ok(rows) => rows,
Err(e) => {
sender.send(Err(e));
return;
}
};
let write_request = match WriteRequest::new(
region_metadata.region_id,
OpType::Put,
Rows {
schema: schema.clone(),
rows,
},
Some(region_metadata.clone()),
) {
Ok(write_request) => write_request,
Err(e) => {
sender.send(Err(e));
return;
}
};
let (tx, rx) = tokio::sync::oneshot::channel();
let sender = OptionOutputTx::from(tx);
let req = SenderWriteRequest {
sender,
request: write_request,
};
pending_tasks.push(rx);
pending_write_requests.push(req);
}
}
}
common_runtime::spawn_global(async move {
let results = match futures::future::try_join_all(pending_tasks).await {
Ok(results) => results,
Err(e) => {
sender.send(Err(e).context(error::RecvSnafu));
return;
}
};
let result1 = match results.into_iter().collect::<error::Result<Vec<_>>>() {
Ok(results) => Ok(results.into_iter().sum()),
Err(e) => Err(e),
};
sender.send(result1);
});
}
}
fn region_metadata_to_column_schema(
region_meta: &RegionMetadataRef,
) -> error::Result<Vec<ColumnSchema>> {
region_meta
.column_metadatas
.iter()
.map(|c| {
let wrapper = ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
.with_context(|_| error::ConvertDataTypeSnafu {
data_type: c.column_schema.data_type.clone(),
})?;
Ok(ColumnSchema {
column_name: c.column_schema.name.clone(),
datatype: wrapper.datatype() as i32,
semantic_type: c.semantic_type as i32,
..Default::default()
})
})
.collect::<error::Result<_>>()
}
/// Convert [DfRecordBatch] to gRPC rows.
fn record_batch_to_rows(
region_metadata: &RegionMetadataRef,
rb: &DfRecordBatch,
) -> error::Result<Vec<Row>> {
let num_rows = rb.num_rows();
let mut rows = Vec::with_capacity(num_rows);
if num_rows == 0 {
return Ok(rows);
}
let vectors: Vec<Option<VectorRef>> = region_metadata
.column_metadatas
.iter()
.map(|c| {
rb.column_by_name(&c.column_schema.name)
.map(|column| Helper::try_into_vector(column).context(error::ConvertVectorSnafu))
.transpose()
})
.collect::<error::Result<_>>()?;
for row_idx in 0..num_rows {
let row = Row {
values: row_at(&vectors, row_idx),
};
rows.push(row);
}
Ok(rows)
}
fn row_at(vectors: &[Option<VectorRef>], row_idx: usize) -> Vec<api::v1::Value> {
let mut row = Vec::with_capacity(vectors.len());
for a in vectors {
let value = if let Some(a) = a {
value_to_grpc_value(a.get(row_idx))
} else {
api::v1::Value { value_data: None }
};
row.push(value)
}
row
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::SemanticType;
use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray};
use super::*;
use crate::test_util::meta_util::TestRegionMetadataBuilder;
fn build_record_batch(num_rows: usize) -> DfRecordBatch {
let region_metadata = Arc::new(TestRegionMetadataBuilder::default().build());
let schema = region_metadata.schema.arrow_schema().clone();
let values = (0..num_rows).map(|v| v as i64).collect::<Vec<_>>();
let ts_array = Arc::new(TimestampMillisecondArray::from_iter_values(values.clone()));
let k0_array = Arc::new(Int64Array::from_iter_values(values.clone()));
let v0_array = Arc::new(Int64Array::from_iter_values(values));
DfRecordBatch::try_new(schema, vec![ts_array, k0_array, v0_array]).unwrap()
}
#[test]
fn test_region_metadata_to_column_schema() {
let region_metadata = Arc::new(TestRegionMetadataBuilder::default().build());
let result = region_metadata_to_column_schema(&region_metadata).unwrap();
assert_eq!(result.len(), 3);
assert_eq!(result[0].column_name, "ts");
assert_eq!(result[0].semantic_type, SemanticType::Timestamp as i32);
assert_eq!(result[1].column_name, "k0");
assert_eq!(result[1].semantic_type, SemanticType::Tag as i32);
assert_eq!(result[2].column_name, "v0");
assert_eq!(result[2].semantic_type, SemanticType::Field as i32);
}
#[test]
fn test_record_batch_to_rows() {
// Create record batch
let region_metadata = Arc::new(TestRegionMetadataBuilder::default().build());
let record_batch = build_record_batch(10);
let rows = record_batch_to_rows(&region_metadata, &record_batch).unwrap();
assert_eq!(rows.len(), 10);
assert_eq!(rows[0].values.len(), 3);
for (row_idx, row) in rows.iter().enumerate().take(10) {
assert_eq!(
row.values[0].value_data.as_ref().unwrap(),
&api::v1::value::ValueData::TimestampMillisecondValue(row_idx as i64)
);
}
}
#[test]
fn test_record_batch_to_rows_schema_mismatch() {
let region_metadata = Arc::new(TestRegionMetadataBuilder::default().num_fields(2).build());
let record_batch = build_record_batch(1);
let rows = record_batch_to_rows(&region_metadata, &record_batch).unwrap();
assert_eq!(rows.len(), 1);
// Check first row
let row1 = &rows[0];
assert_eq!(row1.values.len(), 4);
assert_eq!(
row1.values[0].value_data.as_ref().unwrap(),
&api::v1::value::ValueData::TimestampMillisecondValue(0)
);
assert_eq!(
row1.values[1].value_data.as_ref().unwrap(),
&api::v1::value::ValueData::I64Value(0)
);
assert_eq!(
row1.values[2].value_data.as_ref().unwrap(),
&api::v1::value::ValueData::I64Value(0)
);
assert!(row1.values[3].value_data.is_none());
}
}


@@ -299,7 +299,7 @@ impl<S> RegionWorkerLoop<S> {
}
/// Returns true if the engine needs to reject some write requests.
fn should_reject_write(&self) -> bool {
pub(crate) fn should_reject_write(&self) -> bool {
// If memory usage reaches high threshold (we should also consider stalled requests) returns true.
self.write_buffer_manager.memory_usage() + self.stalled_requests.estimated_size
>= self.config.global_write_buffer_reject_size.as_bytes() as usize


@@ -1,4 +1,3 @@
#![feature(let_chains)]
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,6 +14,9 @@
//! Storage related APIs
#![feature(let_chains)]
#![feature(iterator_try_collect)]
pub mod codec;
pub mod data_source;
pub mod logstore;

metadata.rs

@@ -27,6 +27,7 @@ use api::v1::SemanticType;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use datatypes::arrow;
use datatypes::arrow::datatypes::FieldRef;
use datatypes::schema::{ColumnSchema, FulltextOptions, Schema, SchemaRef, SkippingIndexOptions};
use serde::de::Error;
@@ -957,6 +958,14 @@ pub enum MetadataError {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to decode arrow ipc record batches"))]
DecodeArrowIpc {
#[snafu(source)]
error: arrow::error::ArrowError,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for MetadataError {

region_request.rs

@@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt::{self, Display};
use std::io::Cursor;
use api::helper::ColumnDataTypeWrapper;
use api::v1::add_column_location::LocationType;
@@ -21,16 +23,19 @@ use api::v1::column_def::{
as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
};
use api::v1::region::{
alter_request, compact_request, region_request, AlterRequest, AlterRequests, CloseRequest,
CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest, DropRequests,
FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
alter_request, compact_request, region_request, AlterRequest, AlterRequests,
BulkInsertRequests, CloseRequest, CompactRequest, CreateRequest, CreateRequests,
DeleteRequests, DropRequest, DropRequests, FlushRequest, InsertRequests, OpenRequest,
TruncateRequest,
};
use api::v1::{
self, set_index, Analyzer, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
};
pub use common_base::AffectedRows;
use common_recordbatch::DfRecordBatch;
use common_time::TimeToLive;
use datatypes::arrow::ipc::reader::FileReader;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
use serde::{Deserialize, Serialize};
@@ -39,9 +44,9 @@ use strum::{AsRefStr, IntoStaticStr};
use crate::logstore::entry;
use crate::metadata::{
ColumnMetadata, DecodeProtoSnafu, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu,
InvalidSetRegionOptionRequestSnafu, InvalidUnsetRegionOptionRequestSnafu, MetadataError,
RegionMetadata, Result,
ColumnMetadata, DecodeArrowIpcSnafu, DecodeProtoSnafu, InvalidRawRegionRequestSnafu,
InvalidRegionRequestSnafu, InvalidSetRegionOptionRequestSnafu,
InvalidUnsetRegionOptionRequestSnafu, MetadataError, RegionMetadata, Result,
};
use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
use crate::mito_engine_options::{
@@ -126,6 +131,7 @@ pub enum RegionRequest {
Compact(RegionCompactRequest),
Truncate(RegionTruncateRequest),
Catchup(RegionCatchupRequest),
BulkInserts(RegionBulkInsertsRequest),
}
impl RegionRequest {
@@ -146,6 +152,7 @@ impl RegionRequest {
region_request::Body::Creates(creates) => make_region_creates(creates),
region_request::Body::Drops(drops) => make_region_drops(drops),
region_request::Body::Alters(alters) => make_region_alters(alters),
region_request::Body::BulkInserts(bulk) => make_region_bulk_inserts(bulk),
}
}
@@ -315,6 +322,51 @@ fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, Regi
)])
}
/// Convert [BulkInsertRequests] to [RegionRequest] and group by [RegionId].
fn make_region_bulk_inserts(
requests: BulkInsertRequests,
) -> Result<Vec<(RegionId, RegionRequest)>> {
let mut region_requests: HashMap<u64, Vec<BulkInsertPayload>> =
HashMap::with_capacity(requests.requests.len());
for req in requests.requests {
let region_id = req.region_id;
match req.payload_type() {
api::v1::region::BulkInsertType::ArrowIpc => {
// todo(hl): use StreamReader instead
let reader = FileReader::try_new(Cursor::new(req.payload), None)
.context(DecodeArrowIpcSnafu)?;
let record_batches = reader
.map(|b| b.map(BulkInsertPayload::ArrowIpc))
.try_collect::<Vec<_>>()
.context(DecodeArrowIpcSnafu)?;
match region_requests.entry(region_id) {
Entry::Occupied(mut e) => {
e.get_mut().extend(record_batches);
}
Entry::Vacant(e) => {
e.insert(record_batches);
}
}
}
}
}
let result = region_requests
.into_iter()
.map(|(region_id, payloads)| {
(
region_id.into(),
RegionRequest::BulkInserts(RegionBulkInsertsRequest {
region_id: region_id.into(),
payloads,
}),
)
})
.collect::<Vec<_>>();
Ok(result)
}
/// Request to put data into a region.
#[derive(Debug)]
pub struct RegionPutRequest {
@@ -1119,6 +1171,17 @@ pub struct RegionSequencesRequest {
pub region_ids: Vec<RegionId>,
}
#[derive(Debug, Clone)]
pub struct RegionBulkInsertsRequest {
pub region_id: RegionId,
pub payloads: Vec<BulkInsertPayload>,
}
#[derive(Debug, Clone)]
pub enum BulkInsertPayload {
ArrowIpc(DfRecordBatch),
}
impl fmt::Display for RegionRequest {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
@@ -1133,6 +1196,7 @@ impl fmt::Display for RegionRequest {
RegionRequest::Compact(_) => write!(f, "Compact"),
RegionRequest::Truncate(_) => write!(f, "Truncate"),
RegionRequest::Catchup(_) => write!(f, "Catchup"),
RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
}
}
}