mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-05 21:02:58 +00:00
fix: filter empty batch in bulk insert api (#6459)
* fix/filter-empty-batch-in-bulk-insert-api: **Add Early Return for Empty Record Batches in `bulk_insert.rs`** - Implemented an early return in the `Inserter` implementation to handle cases where `record_batch.num_rows()` is zero, improving efficiency by avoiding unnecessary processing. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * fix/filter-empty-batch-in-bulk-insert-api: **Improve Bulk Insert Handling** - **`handle_bulk_insert.rs`**: Added a check to handle cases where the batch has zero rows, immediately returning and sending a success response with zero rows processed. - **`bulk_insert.rs`**: Enhanced logic to skip processing for masks that select none, optimizing the bulk insert operation by avoiding unnecessary iterations. These changes improve the efficiency and robustness of the bulk insert process by handling edge cases more effectively. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * fix/filter-empty-batch-in-bulk-insert-api: ### Refactor and Error Handling Enhancements - **Refactored Timestamp Handling**: Introduced `timestamp_array_to_primitive` function in `timestamp.rs` to streamline conversion of timestamp arrays to primitive arrays, reducing redundancy in `handle_bulk_insert.rs` and `bulk_insert.rs`. - **Error Handling**: Added `InconsistentTimestampLength` error in `error.rs` to handle mismatched timestamp column lengths in bulk insert operations. - **Bulk Insert Logic**: Updated `handle_bulk_insert.rs` to utilize the new timestamp conversion function and added checks for timestamp length consistency. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * fix/filter-empty-batch-in-bulk-insert-api: **Refactor `bulk_insert.rs` to streamline imports** - Simplified import statements by removing unused timestamp-related arrays and data types from the `arrow` crate in `bulk_insert.rs`. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> --------- Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
@@ -12,6 +12,11 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use arrow_array::{
|
||||
ArrayRef, PrimitiveArray, TimestampMicrosecondArray, TimestampMillisecondArray,
|
||||
TimestampNanosecondArray, TimestampSecondArray,
|
||||
};
|
||||
use arrow_schema::DataType;
|
||||
use common_time::timestamp::TimeUnit;
|
||||
use common_time::Timestamp;
|
||||
use paste::paste;
|
||||
@@ -138,6 +143,41 @@ define_timestamp_with_unit!(Millisecond);
|
||||
define_timestamp_with_unit!(Microsecond);
|
||||
define_timestamp_with_unit!(Nanosecond);
|
||||
|
||||
pub fn timestamp_array_to_primitive(
|
||||
ts_array: &ArrayRef,
|
||||
) -> Option<(
|
||||
PrimitiveArray<arrow_array::types::Int64Type>,
|
||||
arrow::datatypes::TimeUnit,
|
||||
)> {
|
||||
let DataType::Timestamp(unit, _) = ts_array.data_type() else {
|
||||
return None;
|
||||
};
|
||||
|
||||
let ts_primitive = match unit {
|
||||
arrow_schema::TimeUnit::Second => ts_array
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampSecondArray>()
|
||||
.unwrap()
|
||||
.reinterpret_cast::<arrow_array::types::Int64Type>(),
|
||||
arrow_schema::TimeUnit::Millisecond => ts_array
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMillisecondArray>()
|
||||
.unwrap()
|
||||
.reinterpret_cast::<arrow_array::types::Int64Type>(),
|
||||
arrow_schema::TimeUnit::Microsecond => ts_array
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMicrosecondArray>()
|
||||
.unwrap()
|
||||
.reinterpret_cast::<arrow_array::types::Int64Type>(),
|
||||
arrow_schema::TimeUnit::Nanosecond => ts_array
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampNanosecondArray>()
|
||||
.unwrap()
|
||||
.reinterpret_cast::<arrow_array::types::Int64Type>(),
|
||||
};
|
||||
Some((ts_primitive, *unit))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_time::timezone::set_default_timezone;
|
||||
|
||||
@@ -1020,6 +1020,18 @@ pub enum Error {
|
||||
location: Location,
|
||||
source: mito_codec::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Inconsistent timestamp column length, expect: {}, actual: {}",
|
||||
expected,
|
||||
actual
|
||||
))]
|
||||
InconsistentTimestampLength {
|
||||
expected: usize,
|
||||
actual: usize,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
@@ -1175,6 +1187,8 @@ impl ErrorExt for Error {
|
||||
ConvertBulkWalEntry { source, .. } => source.status_code(),
|
||||
|
||||
Encode { source, .. } | Decode { source, .. } => source.status_code(),
|
||||
|
||||
InconsistentTimestampLength { .. } => StatusCode::InvalidArguments,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -15,15 +15,11 @@
|
||||
//! Handles bulk insert requests.
|
||||
|
||||
use datatypes::arrow;
|
||||
use datatypes::arrow::array::{
|
||||
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
|
||||
TimestampSecondArray,
|
||||
};
|
||||
use datatypes::arrow::datatypes::{DataType, TimeUnit};
|
||||
use store_api::logstore::LogStore;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::region_request::RegionBulkInsertsRequest;
|
||||
|
||||
use crate::error::InconsistentTimestampLengthSnafu;
|
||||
use crate::memtable::bulk::part::BulkPart;
|
||||
use crate::request::{OptionOutputTx, SenderBulkRequest};
|
||||
use crate::worker::RegionWorkerLoop;
|
||||
@@ -41,6 +37,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
.with_label_values(&["process_bulk_req"])
|
||||
.start_timer();
|
||||
let batch = request.payload;
|
||||
if batch.num_rows() == 0 {
|
||||
sender.send(Ok(0));
|
||||
return;
|
||||
}
|
||||
|
||||
let Some((ts_index, ts)) = batch
|
||||
.schema()
|
||||
@@ -60,55 +60,23 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
return;
|
||||
};
|
||||
|
||||
let DataType::Timestamp(unit, _) = ts.data_type() else {
|
||||
// safety: ts data type must be a timestamp type.
|
||||
unreachable!()
|
||||
};
|
||||
if batch.num_rows() != ts.len() {
|
||||
sender.send(
|
||||
InconsistentTimestampLengthSnafu {
|
||||
expected: batch.num_rows(),
|
||||
actual: ts.len(),
|
||||
}
|
||||
.fail(),
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
let (min_ts, max_ts) = match unit {
|
||||
TimeUnit::Second => {
|
||||
let ts = ts.as_any().downcast_ref::<TimestampSecondArray>().unwrap();
|
||||
(
|
||||
//safety: ts array must contain at least one row so this won't return None.
|
||||
arrow::compute::min(ts).unwrap(),
|
||||
arrow::compute::max(ts).unwrap(),
|
||||
)
|
||||
}
|
||||
// safety: ts data type must be a timestamp type.
|
||||
let (ts_primitive, _) = datatypes::timestamp::timestamp_array_to_primitive(ts).unwrap();
|
||||
|
||||
TimeUnit::Millisecond => {
|
||||
let ts = ts
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMillisecondArray>()
|
||||
.unwrap();
|
||||
(
|
||||
//safety: ts array must contain at least one row so this won't return None.
|
||||
arrow::compute::min(ts).unwrap(),
|
||||
arrow::compute::max(ts).unwrap(),
|
||||
)
|
||||
}
|
||||
TimeUnit::Microsecond => {
|
||||
let ts = ts
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMicrosecondArray>()
|
||||
.unwrap();
|
||||
(
|
||||
//safety: ts array must contain at least one row so this won't return None.
|
||||
arrow::compute::min(ts).unwrap(),
|
||||
arrow::compute::max(ts).unwrap(),
|
||||
)
|
||||
}
|
||||
TimeUnit::Nanosecond => {
|
||||
let ts = ts
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampNanosecondArray>()
|
||||
.unwrap();
|
||||
(
|
||||
//safety: ts array must contain at least one row so this won't return None.
|
||||
arrow::compute::min(ts).unwrap(),
|
||||
arrow::compute::max(ts).unwrap(),
|
||||
)
|
||||
}
|
||||
};
|
||||
// safety: we've checked ts.len() == batch.num_rows() and batch is not empty
|
||||
let min_ts = arrow::compute::min(&ts_primitive).unwrap();
|
||||
let max_ts = arrow::compute::max(&ts_primitive).unwrap();
|
||||
|
||||
let part = BulkPart {
|
||||
batch,
|
||||
|
||||
@@ -20,11 +20,7 @@ use api::v1::region::{
|
||||
bulk_insert_request, region_request, BulkInsertRequest, RegionRequest, RegionRequestHeader,
|
||||
};
|
||||
use api::v1::ArrowIpc;
|
||||
use arrow::array::{
|
||||
Array, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
|
||||
TimestampSecondArray,
|
||||
};
|
||||
use arrow::datatypes::{DataType, Int64Type, TimeUnit};
|
||||
use arrow::array::Array;
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use common_base::AffectedRows;
|
||||
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
|
||||
@@ -62,6 +58,10 @@ impl Inserter {
|
||||
};
|
||||
decode_timer.observe_duration();
|
||||
|
||||
if record_batch.num_rows() == 0 {
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
// notify flownode to update dirty timestamps if flow is configured.
|
||||
self.maybe_update_flow_dirty_window(table_info, record_batch.clone());
|
||||
|
||||
@@ -155,6 +155,9 @@ impl Inserter {
|
||||
let mut raw_data_bytes = None;
|
||||
for (peer, masks) in mask_per_datanode {
|
||||
for (region_id, mask) in masks {
|
||||
if mask.select_none() {
|
||||
continue;
|
||||
}
|
||||
let rb = record_batch.clone();
|
||||
let schema_bytes = schema_bytes.clone();
|
||||
let node_manager = self.node_manager.clone();
|
||||
@@ -304,32 +307,11 @@ fn extract_timestamps(rb: &RecordBatch, timestamp_index_name: &str) -> error::Re
|
||||
if rb.num_rows() == 0 {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
let primitive = match ts_col.data_type() {
|
||||
DataType::Timestamp(unit, _) => match unit {
|
||||
TimeUnit::Second => ts_col
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampSecondArray>()
|
||||
.unwrap()
|
||||
.reinterpret_cast::<Int64Type>(),
|
||||
TimeUnit::Millisecond => ts_col
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMillisecondArray>()
|
||||
.unwrap()
|
||||
.reinterpret_cast::<Int64Type>(),
|
||||
TimeUnit::Microsecond => ts_col
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMicrosecondArray>()
|
||||
.unwrap()
|
||||
.reinterpret_cast::<Int64Type>(),
|
||||
TimeUnit::Nanosecond => ts_col
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampNanosecondArray>()
|
||||
.unwrap()
|
||||
.reinterpret_cast::<Int64Type>(),
|
||||
},
|
||||
t => {
|
||||
return error::InvalidTimeIndexTypeSnafu { ty: t.clone() }.fail();
|
||||
}
|
||||
};
|
||||
let (primitive, _) =
|
||||
datatypes::timestamp::timestamp_array_to_primitive(ts_col).with_context(|| {
|
||||
error::InvalidTimeIndexTypeSnafu {
|
||||
ty: ts_col.data_type().clone(),
|
||||
}
|
||||
})?;
|
||||
Ok(primitive.iter().flatten().collect())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user