feat: impl bulk memtable and bridge bulk inserts (#6054)

* feat/bridge-bulk-insert:
 ## Implement Bulk Insert and Update Dependencies

 - **Bulk Insert Implementation**: Added `handle_bulk_inserts` method in `src/operator/src/bulk_insert.rs` to manage bulk insert requests using `FlightDecoder` and `FlightData` (see the decoding sketch after this list).
 - **Dependency Updates**: Updated `Cargo.lock` and `Cargo.toml` to use the latest revision of `greptime-proto` and added new dependencies like `arrow`, `arrow-ipc`, `bytes`, and `prost`.
 - **gRPC Enhancements**: Modified `put_record_batch` method in `src/frontend/src/instance/grpc.rs` and `src/servers/src/grpc/flight.rs` to handle `FlightData` instead of `RawRecordBatch`.
 - **Error Handling**: Added new error types in `src/operator/src/error.rs` for handling Arrow operations and decoding flight data.
 - **Miscellaneous**: Updated `src/operator/src/insert.rs` to expose `partition_manager` and `node_manager` as public fields.
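
GreptimeDB's `FlightDecoder` is its own wrapper, but the underlying step (turning a `FlightData` frame back into an arrow `RecordBatch`) can be sketched with plain `arrow-flight` utilities. A minimal sketch, not the decoder used in `bulk_insert.rs`:

```rust
use std::collections::HashMap;

use arrow::array::ArrayRef;
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::FlightData;

/// Decodes a single Flight data frame against an already-decoded schema.
/// `dictionaries_by_id` would be populated by preceding dictionary batches;
/// it stays empty for plain record batches.
fn decode_frame(schema: SchemaRef, frame: &FlightData) -> Result<RecordBatch, ArrowError> {
    let dictionaries_by_id: HashMap<i64, ArrayRef> = HashMap::new();
    flight_data_to_arrow_batch(frame, schema, &dictionaries_by_id)
}
```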

* feat/bridge-bulk-insert:
 - **Update `greptime-proto` Dependency**: Updated the `greptime-proto` dependency to a new revision in `Cargo.lock` and `Cargo.toml`.
 - **Refactor gRPC Query Handling**: Removed `RawRecordBatch` usage from `grpc.rs`, `flight.rs`, `greptime_handler.rs`, and test files, simplifying the gRPC query handling.
 - **Enhance Bulk Insert Logic**: Improved bulk insert logic in `bulk_insert.rs` and `region_request.rs` by using `FlightDecoder` and `BooleanArray` for better performance and clarity (the filtering is sketched below).
 - **Add `common-grpc` Dependency**: Added `common-grpc` as a workspace dependency in `store-api/Cargo.toml` to support gRPC functionalities.
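
The `BooleanArray`-based filtering above boils down to arrow's `filter_record_batch` kernel. A standalone sketch of splitting a batch with a per-region mask:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, BooleanArray, Int64Array};
use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("ts", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef],
    )?;
    // One boolean mask per target region; `true` rows end up in that
    // region's sub-batch.
    let mask = BooleanArray::from(vec![true, false, true, false]);
    let sub_batch = filter_record_batch(&batch, &mask)?;
    assert_eq!(2, sub_batch.num_rows());
    Ok(())
}
```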

* fix: clippy

* fix schema serialization

* feat/bridge-bulk-insert:
 Add error handling for encoding/decoding in `metadata.rs` and `region_request.rs`

 - Introduced new error variants `FlightCodec` and `Prost` in `MetadataError` to handle encoding/decoding failures in `metadata.rs`.
 - Updated `make_region_bulk_inserts` function in `region_request.rs` to use `context` for error handling with `ProstSnafu` and `FlightCodecSnafu`.
 - Enhanced error handling for `FlightData` decoding and `filter_record_batch` operations.
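
For reference, the `snafu` pattern these changes rely on, sketched with a variant shaped like the `Prost` one described above (the real enum also carries a `Location` field):

```rust
use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum MetadataError {
    /// Wraps prost decode failures; a sketch of the `Prost` variant.
    #[snafu(display("Failed to decode protobuf payload"))]
    Prost { source: prost::DecodeError },
}

fn decode_len(buf: &[u8]) -> Result<usize, MetadataError> {
    // `.context(...)` wraps the underlying prost error into the variant.
    prost::decode_length_delimiter(buf).context(ProstSnafu)
}
```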

* fix: test

* refactor: rename

* allow empty app_metadata in FlightData

* feat/bridge-bulk-insert:
 - **Remove Logging**: Removed unnecessary logging of affected rows in `region_server.rs`.
 - **Error Handling Enhancement**: Improved error handling in `bulk_insert.rs` by adding context to `split_record_batch` and handling the single-datanode fast path.
 - **Error Enum Cleanup**: Removed unused `Arrow` error variant from `error.rs`.

* fix: standalone test

* feat/bridge-bulk-insert:
 ### Enhance Bulk Insert Handling and Metadata Management

 - **`lib.rs`**: Enabled the `result_flattening` feature for improved error handling.
 - **`request.rs`**: Made `name_to_index` and `has_null` fields public in `WriteRequest` for better accessibility.
 - **`handle_bulk_insert.rs`**:
   - Added `handle_record_batch` function to streamline processing of bulk insert payloads.
   - Improved error handling and task management for bulk insert operations.
   - Updated `region_metadata_to_column_schema` to return both column schemas and a name-to-index map for efficient data access.

* feat/bridge-bulk-insert:
 - **Refactor `handle_bulk_insert.rs`:**
   - Replaced `handle_record_batch` with `handle_payload` for handling payloads.
   - Modified the fast path to use `common_runtime::spawn_global` for asynchronous task execution.

 - **Optimize `multi_dim.rs`:**
   - Added a fast path for single-region scenarios in `MultiDimPartitionRule::partition_record_batch`.
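
Conceptually, the fast path skips rule evaluation when only one region exists. A sketch with hypothetical signatures, not the actual `MultiDimPartitionRule` code:

```rust
/// Hypothetical sketch: map every row to region 0 when only one region
/// exists, skipping per-row partition-expression evaluation.
fn partition_rows(num_regions: usize, num_rows: usize) -> Vec<usize> {
    if num_regions == 1 {
        // Fast path: all rows belong to the single region.
        return vec![0; num_rows];
    }
    // Slow path: evaluate the multi-dimensional partition rule per row (elided).
    unimplemented!()
}
```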

* feat/bridge-bulk-insert:
 - **Update `greptime-proto` Dependency**: Updated the `greptime-proto` dependency to a new revision in both `Cargo.lock` and `Cargo.toml`.
 - **Optimize Memory Allocation**: Increased initial and builder capacities in `time_series.rs` to improve performance.
 - **Enhance Data Handling**: Modified `bulk_insert.rs` to use `Bytes` for efficient data handling (illustrated below).
 - **Improve Bulk Insert Logic**: Refined the bulk insert logic in `region_request.rs` to handle schema and payload data more effectively and optimize record batch filtering.
 - **String Handling Improvement**: Updated string conversion in `helper.rs` for better performance.
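
Why `Bytes` helps: slicing and cloning share the underlying buffer instead of copying it. A small illustration:

```rust
use bytes::Bytes;

fn main() {
    let payload = Bytes::from(vec![0u8; 1024]);
    // `slice` and `clone` are O(1): both views share the same underlying
    // buffer via reference counting instead of copying the bytes.
    let header = payload.slice(0..16);
    let rest = payload.slice(16..);
    assert_eq!(16, header.len());
    assert_eq!(1008, rest.len());
}
```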

* fix: clippy warnings

* feat/bridge-bulk-insert:
 **Add Metrics and Improve Error Handling**

 - **Metrics Enhancements**: Introduced new metrics for bulk insert operations in `metrics.rs`, `bulk_insert.rs`, `greptime_handler.rs`, and `region_request.rs`. Added `HANDLE_BULK_INSERT_ELAPSED`, `BULK_REQUEST_MESSAGE_SIZE`, and `GRPC_BULK_INSERT_ELAPSED` histograms to monitor performance (declaration pattern sketched below).
 - **Error Handling Improvements**: Removed unnecessary error handling in `handle_bulk_insert.rs` by eliminating redundant `let _ =` patterns.
 - **Dependency Updates**: Added `lazy_static` and `prometheus` to `Cargo.lock` and `Cargo.toml` for metrics support.
 - **Code Refactoring**: Simplified function calls in `region_server.rs` and `handle_bulk_insert.rs` for better readability.
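
A minimal sketch of how such histograms are typically declared with `lazy_static` and `prometheus`; the metric name and bucket values here are illustrative, not the actual ones:

```rust
use lazy_static::lazy_static;
use prometheus::{register_histogram, Histogram};

lazy_static! {
    /// Elapsed time of handling bulk inserts; name and buckets are illustrative.
    pub static ref HANDLE_BULK_INSERT_ELAPSED: Histogram = register_histogram!(
        "greptime_handle_bulk_insert_elapsed",
        "elapsed time of handling bulk inserts",
        prometheus::exponential_buckets(0.001, 2.0, 12).unwrap()
    )
    .unwrap();
}

fn handle_bulk_insert() {
    // The timer observes the elapsed seconds into the histogram when dropped.
    let _timer = HANDLE_BULK_INSERT_ELAPSED.start_timer();
    // ... decode, partition, and write the request ...
}
```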

* chore: rebase main

* implement simple bulk memtable

* impl write_bulk

* implement simple bulk memtable

* feat/simple-bulk-memtable:
 ### Enhance Time-Series Memtable and Bulk Insert Handling

 - **Visibility Modifications**: Made `mutable_array` in `PrimitiveVectorBuilder` and `StringVectorBuilder` public in `primitive.rs` and `string.rs`.
 - **New Module**: Added `builder.rs` to `memtable` for time-series builders, including `FieldBuilder` and `StringBuilder` implementations.
 - **Bulk Insert Enhancements**:
   - Added `sequence` field to `BulkPart` in `part.rs` and updated its handling in `simple_bulk_memtable.rs` and `region_write_ctx.rs`.
   - Introduced metrics for bulk insert operations in `metrics.rs` and `bulk_insert.rs`.
 - **Performance Metrics**: Added timing metrics for write operations in `metrics.rs`, `region_write_ctx.rs`, and `handle_write.rs`.
 - **Region Request Handling**: Updated `make_region_bulk_inserts` in `region_request.rs` to include performance metrics.

* feat/simple-bulk-memtable:
 **Improve Memtable Stats Calculation and Add Metrics Timer**

 - **`simple_bulk_memtable.rs`**: Refactored the `stats` method to use `num_rows` to check whether any rows have been written, improving the accuracy of memtable statistics.
 - **`handle_bulk_insert.rs`**: Introduced a metrics timer to measure the elapsed time for processing bulk requests, enhancing performance monitoring.

* feat/simple-bulk-memtable:
 **Enhancements and Bug Fixes**

 - **Dependency Update**: Updated `greptime-proto` dependency to a new revision in `Cargo.lock` and `Cargo.toml`.
 - **Feature Addition**: Implemented `to_mutation` method in `BulkPart` to convert `BulkPart` to `Mutation` for the fallback `write_bulk` implementation in `src/mito2/src/memtable/bulk/part.rs` (sketched below).
 - **Functionality Improvement**: Modified `write_bulk` method in `TimeSeriesMemtable` to support default implementation fallback to row iteration in `src/mito2/src/memtable/time_series.rs`.
 - **Performance Optimization**: Enhanced `bulk_insert` handling by optimizing region request processing and data partitioning in `src/operator/src/bulk_insert.rs`.
 - **Error Handling**: Added `ComputeArrow` error variant for better error management in `src/operator/src/error.rs`.
 - **Code Refactoring**: Simplified region bulk insert request processing in `src/store-api/src/region_request.rs`.
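
The fallback can be pictured as below, a sketch assuming `to_mutation` composes with the existing row-based `write` path this way (the real code is in the diff further down):

```rust
use store_api::metadata::RegionMetadataRef;

use crate::error::Result;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::{KeyValues, Memtable};

/// Hypothetical shape of the fallback: convert the arrow payload into a
/// row-oriented `Mutation`, then reuse the ordinary row-based write path.
fn write_bulk_fallback(
    memtable: &dyn Memtable,
    part: BulkPart,
    metadata: &RegionMetadataRef,
) -> Result<()> {
    let mutation = part.to_mutation(metadata)?; // RecordBatch -> Mutation
    // `KeyValues::new` returns `None` for an empty mutation (assumption
    // based on its use in the tests below); nothing to write in that case.
    if let Some(kvs) = KeyValues::new(metadata, mutation) {
        memtable.write(&kvs)?;
    }
    Ok(())
}
```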

* fix: some clippy warnings

* feat/simple-bulk-memtable:

 - **Refactor Return Types to `Result`:**
   Updated the return type of the `ranges` method in `memtable.rs`, `bulk.rs`, `partition_tree.rs`, `simple_bulk_memtable.rs`, `time_series.rs`, and `memtable_util.rs` to return `Result<MemtableRanges>` for better error handling.

 - **Enhance Metrics Tracking:**
   Improved metrics tracking by adding `num_rows` and `max_sequence` to `WriteMetrics` in `stats.rs`. Updated related methods in `partition_tree.rs`, `simple_bulk_memtable.rs`, `time_series.rs`, and `scan_region.rs` to utilize these metrics.

 - **Remove Unused Imports:**
   Cleaned up unused imports in `time_series.rs` to streamline the codebase.

* merge main

* remove unused error variant

* use newer version of proto

* feat/simple-bulk-memtable:
 Enhance `FieldBuilder` and `StringBuilder` functionality, add tests, and improve error handling.

 - **`builder.rs`**:
   - Added documentation for `FieldBuilder` methods.
   - Renamed `append_string_vector` to `append_vector` in `StringBuilder`.
 - **`simple_bulk_memtable.rs`**: Added new test cases for `write_one`, `write_bulk`, `is_empty`, `stats`, `fork`, and `sequence_filter`.
 - **`time_series.rs`**: Improved error handling in `ValueBuilder` for type mismatches.
 - **`memtable_util.rs`**: Removed unused imports and streamlined code.

 These changes enhance the robustness and test coverage of the memtable components.

* feat/simple-bulk-memtable:
 Improve Time Partition Matching Logic in `time_partition.rs`

 - Enhanced the `write_bulk` method in `time_partition.rs` to improve the logic for matching partitions based on time ranges.
 - Introduced a new mechanism to filter and select partitions that overlap with the record batch's timestamp range before writing.
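
The overlap test is the standard half-open interval check, matching the condition visible in the `time_partition.rs` diff below:

```rust
/// A batch spanning `[batch_min, batch_max]` intersects a partition covering
/// the half-open range `[part_min, part_max)` unless it lies entirely before
/// or entirely after it; this is the same condition used in `write_bulk`.
fn overlaps(batch_min: i64, batch_max: i64, part_min: i64, part_max: i64) -> bool {
    !(batch_max < part_min || batch_min >= part_max)
}

fn main() {
    assert!(overlaps(5, 9, 0, 10)); // fully inside
    assert!(overlaps(9, 15, 0, 10)); // straddles the upper bound
    assert!(!overlaps(10, 15, 0, 10)); // `part_max` is exclusive
}
```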

* feat/simple-bulk-memtable:
 Improve Metrics Handling in `bulk_insert.rs`

 - Removed the `group_request_timer` and its associated metric observation to streamline the timing logic.
 - Moved the `BULK_REQUEST_ROWS` metric observation to occur after filtering, ensuring accurate row count metrics.

* feat/simple-bulk-memtable:
 **Enhance Stalled Requests Calculation and Update Metrics**

 - **`worker.rs`**: Updated the `stalled_count` method to include both `reqs` and `bulk_reqs` in the calculation of stalled requests.
 - **`bulk_insert.rs`**: Removed duplicate observation of `BULK_REQUEST_MESSAGE_SIZE` metric.
 - **`metrics.rs`**: Changed the bucket strategy for `BULK_REQUEST_ROWS` from linear to exponential, improving the granularity of metrics collection.

* feat/simple-bulk-memtable:
 **Refactor `StringVector` Usage and Update Method Signatures**

 - **`src/datatypes/src/vectors/string.rs`**: Changed `StringVector`'s `array` field from public to private.
 - **`src/mito2/src/memtable/builder.rs`**: Refactored `append_vector` method to `append_array`, updating its usage to work directly with `StringArray` instead of `StringVector`.
 - **`src/mito2/src/memtable/time_series.rs`**: Updated `ValueBuilder` to handle `StringArray` directly, replacing `StringVector` usage with `StringArray` in the `FieldBuilder::String` case.

* feat/simple-bulk-memtable:
 - **Refactor `PrimitiveVectorBuilder`**: Made `mutable_array` private in `src/datatypes/src/vectors/primitive.rs`.
 - **Optimize `ValueBuilder`**: Replaced `UInt64VectorBuilder` and `UInt8VectorBuilder` with `Vec<u64>` and `Vec<u8>` for `sequence` and `op_type` in `src/mito2/src/memtable/time_series.rs`.
 - **Improve Metrics Initialization**: Updated histogram bucket initialization to use `exponential_buckets` in `src/mito2/src/metrics.rs`.

* feat/simple-bulk-memtable:
 Improve error handling in `simple_bulk_memtable.rs` and `time_series.rs`

 - Enhanced error handling by using `OptionExt` for more concise error context management in `simple_bulk_memtable.rs` and `time_series.rs`.
 - Replaced `ok_or` with `with_context` to streamline error context creation in both files.
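
`OptionExt::with_context` turns an `Option` miss directly into an error variant, replacing `ok_or` plus manual error construction. A sketch with a hypothetical variant:

```rust
use snafu::{OptionExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    /// Hypothetical variant standing in for the real error types.
    #[snafu(display("Column {name} not found"))]
    ColumnNotFound { name: String },
}

fn column_index(columns: &[&str], name: &str) -> Result<usize, Error> {
    columns
        .iter()
        .position(|c| *c == name)
        // Builds the error context lazily, only on the `None` path.
        .with_context(|| ColumnNotFoundSnafu { name })
}
```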

* feat/simple-bulk-memtable:
 **Enhance Time Partition Handling in `time_partition.rs`**

 - Introduced `create_time_partition` function to streamline the creation of new time partitions, ensuring thread safety by acquiring a lock.
 - Modified logic to handle cases where no matching time partitions exist, creating new partitions as needed.
 - Updated `write_record_batch` and `write_one` methods to utilize the new partition creation logic, improving partition management and data writing efficiency.

* replace proto

* feat/simple-bulk-memtable:
 Update `metrics.rs` to adjust the range of exponential buckets for bulk insert message rows from `10 ~ 1_000_000` to `10 ~ 100_000`.
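
For a quick sanity check of that range, assuming a growth factor of 10 (an assumption; only the range is stated above):

```rust
fn main() {
    // Five buckets growing by 10x: 10, 100, 1_000, 10_000, 100_000.
    let buckets = prometheus::exponential_buckets(10.0, 10.0, 5).unwrap();
    assert_eq!(vec![10.0, 100.0, 1_000.0, 10_000.0, 100_000.0], buckets);
}
```
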
Author: Lei, HUANG
Date: 2025-05-09 10:56:09 +08:00 (committed by GitHub)
Commit: 8685ceb232, parent: b442414422
27 changed files with 1868 additions and 732 deletions

File: Cargo.lock (generated)

@@ -4820,7 +4820,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=159e92d30b4c0116a7ef376b535d880c6d580fb9#159e92d30b4c0116a7ef376b535d880c6d580fb9"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=17a3550751c8b1e02ec16be40101d5f24dc255c3#17a3550751c8b1e02ec16be40101d5f24dc255c3"
dependencies = [
"prost 0.13.5",
"serde",

File: Cargo.toml

@@ -130,7 +130,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "159e92d30b4c0116a7ef376b535d880c6d580fb9" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "17a3550751c8b1e02ec16be40101d5f24dc255c3" }
hex = "0.4"
http = "1"
humantime = "2.1"

File: src/datatypes/src/vectors/string.rs

@@ -166,7 +166,7 @@ impl ScalarVector for StringVector {
}
pub struct StringVectorBuilder {
-    mutable_array: MutableStringArray,
+    pub mutable_array: MutableStringArray,
}
impl MutableVector for StringVectorBuilder {

File: src/mito2/src/error.rs

@@ -1021,17 +1021,6 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Failed to convert ConcreteDataType to ColumnDataType: {:?}",
data_type
))]
ConvertDataType {
data_type: ConcreteDataType,
source: api::error::Error,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1183,7 +1172,6 @@ impl ErrorExt for Error {
ManualCompactionOverride {} => StatusCode::Cancelled,
IncompatibleWalProviderChange { .. } => StatusCode::InvalidArguments,
ConvertDataType { .. } => StatusCode::Internal,
}
}

File: src/mito2/src/flush.rs

@@ -41,8 +41,8 @@ use crate::region::options::IndexOptions;
use crate::region::version::{VersionControlData, VersionControlRef};
use crate::region::{ManifestContextRef, RegionLeaderState};
use crate::request::{
BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderDdlRequest,
SenderWriteRequest, WorkerRequest,
BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
SenderDdlRequest, SenderWriteRequest, WorkerRequest,
};
use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::FileMeta;
@@ -547,7 +547,11 @@ impl FlushScheduler {
pub(crate) fn on_flush_success(
&mut self,
region_id: RegionId,
) -> Option<(Vec<SenderDdlRequest>, Vec<SenderWriteRequest>)> {
) -> Option<(
Vec<SenderDdlRequest>,
Vec<SenderWriteRequest>,
Vec<SenderBulkRequest>,
)> {
let flush_status = self.region_status.get_mut(&region_id)?;
// This region doesn't have running flush job.
@@ -557,7 +561,11 @@ impl FlushScheduler {
// The region doesn't have any pending flush task.
// Safety: The flush status must exist.
let flush_status = self.region_status.remove(&region_id).unwrap();
Some((flush_status.pending_ddls, flush_status.pending_writes))
Some((
flush_status.pending_ddls,
flush_status.pending_writes,
flush_status.pending_bulk_writes,
))
} else {
let version_data = flush_status.version_control.current();
if version_data.version.memtables.is_empty() {
@@ -570,7 +578,11 @@ impl FlushScheduler {
// it from the status to avoid leaking pending requests.
// Safety: The flush status must exist.
let flush_status = self.region_status.remove(&region_id).unwrap();
Some((flush_status.pending_ddls, flush_status.pending_writes))
Some((
flush_status.pending_ddls,
flush_status.pending_writes,
flush_status.pending_bulk_writes,
))
} else {
// We can flush the region again, keep it in the region status.
None
@@ -657,6 +669,15 @@ impl FlushScheduler {
status.pending_writes.push(request);
}
/// Add bulk write request to pending queue.
///
/// # Panics
/// Panics if region didn't request flush.
pub(crate) fn add_bulk_request_to_pending(&mut self, request: SenderBulkRequest) {
let status = self.region_status.get_mut(&request.region_id).unwrap();
status.pending_bulk_writes.push(request);
}
/// Returns true if the region has pending DDLs.
pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
self.region_status
@@ -717,6 +738,8 @@ struct FlushStatus {
pending_ddls: Vec<SenderDdlRequest>,
/// Requests waiting to write after altering the region.
pending_writes: Vec<SenderWriteRequest>,
/// Bulk requests waiting to write after altering the region.
pending_bulk_writes: Vec<SenderBulkRequest>,
}
impl FlushStatus {
@@ -728,6 +751,7 @@ impl FlushStatus {
pending_task: None,
pending_ddls: Vec::new(),
pending_writes: Vec::new(),
pending_bulk_writes: Vec::new(),
}
}

File: src/mito2/src/memtable.rs

@@ -19,7 +19,7 @@ use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
-pub use bulk::part::BulkPart;
+pub use bulk::part::EncodedBulkPart;
use common_time::Timestamp;
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
@@ -29,6 +29,7 @@ use table::predicate::Predicate;
use crate::config::MitoConfig;
use crate::error::Result;
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::key_values::KeyValue;
pub use crate::memtable::key_values::KeyValues;
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
@@ -40,9 +41,11 @@ use crate::read::Batch;
use crate::region::options::{MemtableOptions, MergeMode};
use crate::sst::file::FileTimeRange;
mod builder;
pub mod bulk;
pub mod key_values;
pub mod partition_tree;
mod simple_bulk_memtable;
mod stats;
pub mod time_partition;
pub mod time_series;
@@ -158,7 +161,7 @@ pub trait Memtable: Send + Sync + fmt::Debug {
projection: Option<&[ColumnId]>,
predicate: PredicateGroup,
sequence: Option<SequenceNumber>,
) -> MemtableRanges;
) -> Result<MemtableRanges>;
/// Returns true if the memtable is empty.
fn is_empty(&self) -> bool;

File: src/mito2/src/memtable/builder.rs (new)

@@ -0,0 +1,300 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Builders for time-series memtable
use std::sync::Arc;
use datatypes::arrow;
use datatypes::arrow::array::{
Array, ArrayDataBuilder, BufferBuilder, GenericByteArray, NullBufferBuilder, UInt8BufferBuilder,
};
use datatypes::arrow_array::StringArray;
use datatypes::data_type::DataType;
use datatypes::prelude::{ConcreteDataType, MutableVector, VectorRef};
use datatypes::value::ValueRef;
use datatypes::vectors::StringVector;
/// Field builder with special implementation for strings.
pub(crate) enum FieldBuilder {
String(StringBuilder),
Other(Box<dyn MutableVector>),
}
impl FieldBuilder {
/// Creates a [FieldBuilder] instance with given type and capacity.
pub fn create(data_type: &ConcreteDataType, init_cap: usize) -> Self {
if let ConcreteDataType::String(_) = data_type {
Self::String(StringBuilder::with_capacity(init_cap / 16, init_cap))
} else {
Self::Other(data_type.create_mutable_vector(init_cap))
}
}
/// Pushes a value into builder.
pub(crate) fn push(&mut self, value: ValueRef) -> datatypes::error::Result<()> {
match self {
FieldBuilder::String(b) => {
if let Some(s) = value.as_string()? {
b.append(s);
} else {
b.append_null();
}
Ok(())
}
FieldBuilder::Other(b) => b.try_push_value_ref(value),
}
}
/// Push n null values into builder.
pub(crate) fn push_nulls(&mut self, n: usize) {
match self {
FieldBuilder::String(s) => {
s.append_n_nulls(n);
}
FieldBuilder::Other(v) => {
v.push_nulls(n);
}
}
}
/// Finishes the builder and builds a [VectorRef].
pub(crate) fn finish(&mut self) -> VectorRef {
match self {
FieldBuilder::String(s) => Arc::new(StringVector::from(s.build())) as _,
FieldBuilder::Other(v) => v.to_vector(),
}
}
}
/// [StringBuilder] serves as a workaround for lacking [`GenericStringBuilder::append_array`](https://docs.rs/arrow-array/latest/arrow_array/builder/type.GenericStringBuilder.html#method.append_array)
/// which is only available since arrow-rs 55.0.0.
pub(crate) struct StringBuilder {
value_builder: UInt8BufferBuilder,
offsets_builder: BufferBuilder<i32>,
null_buffer_builder: NullBufferBuilder,
}
impl Default for StringBuilder {
fn default() -> Self {
Self::with_capacity(16, 256)
}
}
impl StringBuilder {
/// Creates a new [`StringBuilder`].
///
/// - `item_capacity` is the number of items to pre-allocate.
/// The size of the preallocated buffer of offsets is the number of items plus one.
/// - `data_capacity` is the total number of bytes of data to pre-allocate
/// (for all items, not per item).
pub fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self {
let mut offsets_builder = BufferBuilder::<i32>::new(item_capacity + 1);
offsets_builder.append(0);
Self {
value_builder: UInt8BufferBuilder::new(data_capacity),
offsets_builder,
null_buffer_builder: NullBufferBuilder::new(item_capacity),
}
}
pub fn append(&mut self, data: &str) {
self.value_builder.append_slice(data.as_bytes());
self.null_buffer_builder.append(true);
self.offsets_builder.append(self.next_offset());
}
#[inline]
fn next_offset(&self) -> i32 {
i32::try_from(self.value_builder.len()).expect("byte array offset overflow")
}
pub fn len(&self) -> usize {
self.null_buffer_builder.len()
}
/// Based on arrow-rs' GenericByteBuilder:
/// https://github.com/apache/arrow-rs/blob/7905545537c50590fdb4dc645e3e0130fce80b57/arrow-array/src/builder/generic_bytes_builder.rs#L135
pub fn append_array(&mut self, array: &StringArray) {
if array.len() == 0 {
return;
}
let offsets = array.offsets();
// If the offsets are contiguous, we can append them directly avoiding the need to align
// for example, when the first appended array is not sliced (starts at offset 0)
if self.next_offset() == offsets[0] {
self.offsets_builder.append_slice(&offsets[1..]);
} else {
// Shifting all the offsets
let shift: i32 = self.next_offset() - offsets[0];
// Creating intermediate offsets instead of pushing each offset is faster
// (even if we make MutableBuffer to avoid updating length on each push
// and reserve the necessary capacity, it's still slower)
let mut intermediate = Vec::with_capacity(offsets.len() - 1);
for &offset in &offsets[1..] {
intermediate.push(offset + shift)
}
self.offsets_builder.append_slice(&intermediate);
}
// Append underlying values, starting from the first offset and ending at the last offset
self.value_builder.append_slice(
&array.values().as_slice()[offsets[0] as usize..offsets[array.len()] as usize],
);
if let Some(null_buffer) = array.nulls() {
let data: Vec<_> = null_buffer.inner().iter().collect();
self.null_buffer_builder.append_slice(&data);
} else {
self.null_buffer_builder.append_n_non_nulls(array.len());
}
}
pub fn append_null(&mut self) {
self.null_buffer_builder.append(false);
self.offsets_builder.append(self.next_offset());
}
pub fn append_n_nulls(&mut self, n: usize) {
self.null_buffer_builder.append_n_nulls(n);
self.offsets_builder.append_n(n, self.next_offset());
}
pub fn build(&mut self) -> StringArray {
let array_builder = ArrayDataBuilder::new(arrow::datatypes::DataType::Utf8)
.len(self.len())
.add_buffer(self.offsets_builder.finish())
.add_buffer(self.value_builder.finish())
.nulls(self.null_buffer_builder.finish());
self.offsets_builder.append(self.next_offset());
let array_data = unsafe { array_builder.build_unchecked() };
GenericByteArray::from(array_data)
}
}
#[cfg(test)]
mod tests {
use datatypes::arrow::array::StringArray;
use super::*;
#[test]
fn test_append() {
let mut builder = StringBuilder::default();
builder.append_n_nulls(10);
let array = builder.build();
assert_eq!(vec![None; 10], array.iter().collect::<Vec<_>>());
let mut builder = StringBuilder::default();
builder.append_n_nulls(3);
builder.append("hello");
builder.append_null();
builder.append("world");
assert_eq!(
vec![None, None, None, Some("hello"), None, Some("world")],
builder.build().iter().collect::<Vec<_>>()
)
}
#[test]
fn test_append_empty_string() {
let mut builder = StringBuilder::default();
builder.append("");
builder.append_null();
builder.append("");
let array = builder.build();
assert_eq!(
vec![Some(""), None, Some("")],
array.iter().collect::<Vec<_>>()
);
}
#[test]
fn test_append_large_string() {
let large_str = "a".repeat(1024);
let mut builder = StringBuilder::default();
builder.append(&large_str);
let array = builder.build();
assert_eq!(large_str.as_str(), array.value(0));
}
#[test]
fn test_append_array() {
let mut builder_1 = StringBuilder::default();
builder_1.append("hello");
builder_1.append_null();
builder_1.append("world");
let mut builder_2 = StringBuilder::default();
builder_2.append_null();
builder_2.append("!");
builder_2.append_array(&builder_1.build());
assert_eq!(
vec![None, Some("!"), Some("hello"), None, Some("world")],
builder_2.build().iter().collect::<Vec<_>>()
)
}
#[test]
fn test_append_empty_array() {
let mut builder = StringBuilder::default();
builder.append_array(&StringArray::from(vec![] as Vec<&str>));
let array = builder.build();
assert_eq!(0, array.len());
}
#[test]
fn test_append_partial_array() {
let source = StringArray::from(vec![Some("a"), None, Some("b"), Some("c")]);
let sliced = source.slice(1, 2); // [None, Some("b")]
let mut builder = StringBuilder::default();
builder.append_array(&sliced);
let array = builder.build();
assert_eq!(vec![None, Some("b")], array.iter().collect::<Vec<_>>());
}
#[test]
fn test_builder_capacity() {
let mut builder = StringBuilder::with_capacity(10, 100);
assert_eq!(0, builder.len());
for i in 0..10 {
builder.append(&format!("string-{}", i));
}
let array = builder.build();
assert_eq!(10, array.len());
assert_eq!("string-0", array.value(0));
assert_eq!("string-9", array.value(9));
}
#[test]
fn test_builder_reset_after_build() {
let mut builder = StringBuilder::default();
builder.append("first");
let array1 = builder.build();
assert_eq!(1, array1.len());
builder.append("second");
let array2 = builder.build();
assert_eq!(1, array2.len()); // Not 2: build() resets the builder state
}
}

File: src/mito2/src/memtable/bulk.rs

@@ -21,7 +21,7 @@ use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;
use crate::error::Result;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::bulk::part::{BulkPart, EncodedBulkPart};
use crate::memtable::key_values::KeyValue;
use crate::memtable::{
BoxedBatchIterator, KeyValues, Memtable, MemtableId, MemtableRanges, MemtableRef,
@@ -38,7 +38,7 @@ mod row_group_reader;
#[derive(Debug)]
pub struct BulkMemtable {
id: MemtableId,
parts: RwLock<Vec<BulkPart>>,
parts: RwLock<Vec<EncodedBulkPart>>,
}
impl Memtable for BulkMemtable {
@@ -54,9 +54,7 @@ impl Memtable for BulkMemtable {
unimplemented!()
}
fn write_bulk(&self, fragment: BulkPart) -> Result<()> {
let mut parts = self.parts.write().unwrap();
parts.push(fragment);
fn write_bulk(&self, _fragment: BulkPart) -> Result<()> {
Ok(())
}
@@ -74,7 +72,7 @@ impl Memtable for BulkMemtable {
_projection: Option<&[ColumnId]>,
_predicate: PredicateGroup,
_sequence: Option<SequenceNumber>,
) -> MemtableRanges {
) -> Result<MemtableRanges> {
todo!()
}

File: src/mito2/src/memtable/bulk/part.rs

@@ -17,13 +17,15 @@
use std::collections::VecDeque;
use std::sync::Arc;
use api::v1::Mutation;
use api::helper::{value_to_grpc_value, ColumnDataTypeWrapper};
use api::v1::{Mutation, OpType};
use bytes::Bytes;
use common_recordbatch::DfRecordBatch as RecordBatch;
use common_time::timestamp::TimeUnit;
use datafusion::arrow::array::{TimestampNanosecondArray, UInt64Builder};
use datatypes::arrow;
use datatypes::arrow::array::{
Array, ArrayRef, BinaryBuilder, DictionaryArray, RecordBatch, TimestampMicrosecondArray,
Array, ArrayRef, BinaryBuilder, DictionaryArray, TimestampMicrosecondArray,
TimestampMillisecondArray, TimestampSecondArray, UInt32Array, UInt64Array, UInt8Array,
UInt8Builder,
};
@@ -32,34 +34,105 @@ use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow_array::BinaryArray;
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use datatypes::value::Value;
use datatypes::vectors::Helper;
use parquet::arrow::ArrowWriter;
use parquet::data_type::AsBytes;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use store_api::storage::SequenceNumber;
use table::predicate::Predicate;
use crate::error;
use crate::error::{ComputeArrowSnafu, EncodeMemtableSnafu, NewRecordBatchSnafu, Result};
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::BulkPartIter;
use crate::memtable::key_values::KeyValuesRef;
use crate::memtable::key_values::{KeyValue, KeyValuesRef};
use crate::memtable::BoxedBatchIterator;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt};
use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::to_sst_arrow_schema;
#[derive(Debug)]
pub struct BulkPart {
pub(crate) batch: RecordBatch,
pub(crate) num_rows: usize,
pub(crate) max_ts: i64,
pub(crate) min_ts: i64,
pub(crate) sequence: u64,
}
impl BulkPart {
pub(crate) fn estimated_size(&self) -> usize {
self.batch.get_array_memory_size()
}
/// Converts [BulkPart] to [Mutation] for fallback `write_bulk` implementation.
pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
let vectors = region_metadata
.schema
.column_schemas()
.iter()
.map(|col| match self.batch.column_by_name(&col.name) {
None => Ok(None),
Some(col) => Helper::try_into_vector(col).map(Some),
})
.collect::<datatypes::error::Result<Vec<_>>>()
.context(error::ComputeVectorSnafu)?;
let rows = (0..self.num_rows)
.map(|row_idx| {
let values = (0..self.batch.num_columns())
.map(|col_idx| {
if let Some(v) = &vectors[col_idx] {
value_to_grpc_value(v.get(row_idx))
} else {
api::v1::Value { value_data: None }
}
})
.collect::<Vec<_>>();
api::v1::Row { values }
})
.collect::<Vec<_>>();
let schema = region_metadata
.column_metadatas
.iter()
.map(|c| {
let data_type_wrapper =
ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
Ok(api::v1::ColumnSchema {
column_name: c.column_schema.name.clone(),
datatype: data_type_wrapper.datatype() as i32,
semantic_type: c.semantic_type as i32,
..Default::default()
})
})
.collect::<api::error::Result<Vec<_>>>()
.context(error::ConvertColumnDataTypeSnafu {
reason: "failed to convert region metadata to column schema",
})?;
let rows = api::v1::Rows { schema, rows };
Ok(Mutation {
op_type: OpType::Put as i32,
sequence: self.sequence,
rows: Some(rows),
write_hint: None,
})
}
}
#[derive(Debug)]
pub struct EncodedBulkPart {
data: Bytes,
metadata: BulkPartMeta,
}
impl BulkPart {
impl EncodedBulkPart {
pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
Self { data, metadata }
}
@@ -138,8 +211,8 @@ impl BulkPartEncoder {
}
impl BulkPartEncoder {
/// Encodes mutations to a [BulkPart], returns true if encoded data has been written to `dest`.
fn encode_mutations(&self, mutations: &[Mutation]) -> Result<Option<BulkPart>> {
/// Encodes mutations to a [EncodedBulkPart], returns true if encoded data has been written to `dest`.
fn encode_mutations(&self, mutations: &[Mutation]) -> Result<Option<EncodedBulkPart>> {
let Some((arrow_record_batch, min_ts, max_ts)) =
mutations_to_record_batch(mutations, &self.metadata, &self.pk_encoder, self.dedup)?
else {
@@ -162,7 +235,7 @@ impl BulkPartEncoder {
let buf = Bytes::from(buf);
let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);
Ok(Some(BulkPart {
Ok(Some(EncodedBulkPart {
data: buf,
metadata: BulkPartMeta {
num_rows: arrow_record_batch.num_rows(),
@@ -742,7 +815,7 @@ mod tests {
);
}
fn encode(input: &[MutationInput]) -> BulkPart {
fn encode(input: &[MutationInput]) -> EncodedBulkPart {
let metadata = metadata_for_test();
let mutations = input
.iter()
@@ -823,7 +896,7 @@ mod tests {
assert_eq!(vec![0.1, 0.2, 0.0], field);
}
fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> BulkPart {
fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
let metadata = metadata_for_test();
let mutations = key_values
.into_iter()
@@ -838,7 +911,11 @@ mod tests {
encoder.encode_mutations(&mutations).unwrap().unwrap()
}
fn check_prune_row_group(part: &BulkPart, predicate: Option<Predicate>, expected_rows: usize) {
fn check_prune_row_group(
part: &EncodedBulkPart,
predicate: Option<Predicate>,
expected_rows: usize,
) {
let context = Arc::new(BulkIterContext::new(
part.metadata.region_metadata.clone(),
&None,

File: src/mito2/src/memtable/partition_tree.rs

@@ -37,11 +37,12 @@ use table::predicate::Predicate;
use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::tree::PartitionTree;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
AllocTracker, BoxedBatchIterator, BulkPart, IterBuilder, KeyValues, Memtable, MemtableBuilder,
AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder,
MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
PredicateGroup,
};
@@ -147,15 +148,11 @@ impl Memtable for PartitionTreeMemtable {
// Ensures the memtable always updates stats.
let res = self.tree.write(kvs, &mut pk_buffer, &mut metrics);
self.update_stats(&metrics);
// update max_sequence
if res.is_ok() {
let sequence = kvs.max_sequence();
self.max_sequence.fetch_max(sequence, Ordering::Relaxed);
metrics.max_sequence = kvs.max_sequence();
metrics.num_rows = kvs.num_rows();
self.update_stats(&metrics);
}
self.num_rows.fetch_add(kvs.num_rows(), Ordering::Relaxed);
res
}
@@ -165,15 +162,12 @@ impl Memtable for PartitionTreeMemtable {
// Ensures the memtable always updates stats.
let res = self.tree.write_one(key_value, &mut pk_buffer, &mut metrics);
self.update_stats(&metrics);
// update max_sequence
if res.is_ok() {
self.max_sequence
.fetch_max(key_value.sequence(), Ordering::Relaxed);
metrics.max_sequence = metrics.max_sequence.max(key_value.sequence());
metrics.num_rows = 1;
self.update_stats(&metrics);
}
self.num_rows.fetch_add(1, Ordering::Relaxed);
res
}
@@ -198,7 +192,7 @@ impl Memtable for PartitionTreeMemtable {
projection: Option<&[ColumnId]>,
predicate: PredicateGroup,
sequence: Option<SequenceNumber>,
) -> MemtableRanges {
) -> Result<MemtableRanges> {
let projection = projection.map(|ids| ids.to_vec());
let builder = Box::new(PartitionTreeIterBuilder {
tree: self.tree.clone(),
@@ -208,10 +202,10 @@ impl Memtable for PartitionTreeMemtable {
});
let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));
MemtableRanges {
Ok(MemtableRanges {
ranges: [(0, MemtableRange::new(context))].into(),
stats: self.stats(),
}
})
}
fn is_empty(&self) -> bool {
@@ -306,6 +300,9 @@ impl PartitionTreeMemtable {
.fetch_max(metrics.max_ts, Ordering::SeqCst);
self.min_timestamp
.fetch_min(metrics.min_ts, Ordering::SeqCst);
self.num_rows.fetch_add(metrics.num_rows, Ordering::SeqCst);
self.max_sequence
.fetch_max(metrics.max_sequence, Ordering::SeqCst);
}
}

File: src/mito2/src/memtable/simple_bulk_memtable.rs (new)

@@ -0,0 +1,673 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use api::v1::OpType;
use datatypes::vectors::Helper;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::key_values::KeyValue;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::time_series::{Series, Values};
use crate::memtable::{
AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableId, MemtableRange,
MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
};
use crate::read::dedup::LastNonNullIter;
use crate::read::scan_region::PredicateGroup;
use crate::read::Batch;
use crate::region::options::MergeMode;
use crate::{error, metrics};
pub struct SimpleBulkMemtable {
id: MemtableId,
region_metadata: RegionMetadataRef,
alloc_tracker: AllocTracker,
max_timestamp: AtomicI64,
min_timestamp: AtomicI64,
max_sequence: AtomicU64,
dedup: bool,
merge_mode: MergeMode,
num_rows: AtomicUsize,
series: RwLock<Series>,
}
impl SimpleBulkMemtable {
pub(crate) fn new(
id: MemtableId,
region_metadata: RegionMetadataRef,
write_buffer_manager: Option<WriteBufferManagerRef>,
dedup: bool,
merge_mode: MergeMode,
) -> Self {
let dedup = if merge_mode == MergeMode::LastNonNull {
false
} else {
dedup
};
let series = RwLock::new(Series::new(&region_metadata));
Self {
id,
region_metadata,
alloc_tracker: AllocTracker::new(write_buffer_manager),
max_timestamp: AtomicI64::new(i64::MIN),
min_timestamp: AtomicI64::new(i64::MAX),
max_sequence: AtomicU64::new(0),
dedup,
merge_mode,
num_rows: AtomicUsize::new(0),
series,
}
}
fn build_projection(&self, projection: Option<&[ColumnId]>) -> HashSet<ColumnId> {
if let Some(projection) = projection {
projection.iter().copied().collect()
} else {
self.region_metadata
.field_columns()
.map(|c| c.column_id)
.collect()
}
}
fn create_iter(
&self,
projection: Option<&[ColumnId]>,
sequence: Option<SequenceNumber>,
) -> error::Result<BatchIterBuilder> {
let mut series = self.series.write().unwrap();
let values = if series.is_empty() {
None
} else {
Some(series.compact(&self.region_metadata)?.clone())
};
let projection = self.build_projection(projection);
Ok(BatchIterBuilder {
region_metadata: self.region_metadata.clone(),
values,
projection,
dedup: self.dedup,
sequence,
merge_mode: self.merge_mode,
})
}
fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) {
let ts = kv.timestamp();
let sequence = kv.sequence();
let op_type = kv.op_type();
let mut series = self.series.write().unwrap();
let size = series.push(ts, sequence, op_type, kv.fields());
stats.value_bytes += size;
// safety: timestamp of kv must be both present and a valid timestamp value.
let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
stats.min_ts = stats.min_ts.min(ts);
stats.max_ts = stats.max_ts.max(ts);
}
/// Updates memtable stats.
fn update_stats(&self, stats: WriteMetrics) {
self.alloc_tracker
.on_allocation(stats.key_bytes + stats.value_bytes);
self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
self.max_sequence
.fetch_max(stats.max_sequence, Ordering::SeqCst);
}
#[cfg(test)]
fn schema(&self) -> &RegionMetadataRef {
&self.region_metadata
}
}
impl Debug for SimpleBulkMemtable {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SimpleBulkMemtable").finish()
}
}
impl Memtable for SimpleBulkMemtable {
fn id(&self) -> MemtableId {
self.id
}
fn write(&self, kvs: &KeyValues) -> error::Result<()> {
let mut stats = WriteMetrics::default();
let max_sequence = kvs.max_sequence();
for kv in kvs.iter() {
self.write_key_value(kv, &mut stats);
}
stats.max_sequence = max_sequence;
stats.num_rows = kvs.num_rows();
self.update_stats(stats);
Ok(())
}
fn write_one(&self, kv: KeyValue) -> error::Result<()> {
debug_assert_eq!(0, kv.num_primary_keys());
let mut stats = WriteMetrics::default();
self.write_key_value(kv, &mut stats);
stats.num_rows = 1;
stats.max_sequence = kv.sequence();
self.update_stats(stats);
Ok(())
}
fn write_bulk(&self, part: BulkPart) -> error::Result<()> {
let rb = &part.batch;
let ts = Helper::try_into_vector(
rb.column_by_name(&self.region_metadata.time_index_column().column_schema.name)
.with_context(|| error::InvalidRequestSnafu {
region_id: self.region_metadata.region_id,
reason: "Timestamp not found",
})?,
)
.context(error::ConvertVectorSnafu)?;
let sequence = part.sequence;
let fields: Vec<_> = self
.region_metadata
.field_columns()
.map(|f| {
let array = rb.column_by_name(&f.column_schema.name).ok_or_else(|| {
error::InvalidRequestSnafu {
region_id: self.region_metadata.region_id,
reason: format!("Column {} not found", f.column_schema.name),
}
.build()
})?;
Helper::try_into_vector(array).context(error::ConvertVectorSnafu)
})
.collect::<error::Result<Vec<_>>>()?;
let mut series = self.series.write().unwrap();
let extend_timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
.with_label_values(&["bulk_extend"])
.start_timer();
series.extend(ts, OpType::Put as u8, sequence, fields.into_iter())?;
extend_timer.observe_duration();
self.update_stats(WriteMetrics {
key_bytes: 0,
value_bytes: part.estimated_size(),
min_ts: part.min_ts,
max_ts: part.max_ts,
num_rows: part.num_rows,
max_sequence: sequence,
});
Ok(())
}
fn iter(
&self,
projection: Option<&[ColumnId]>,
_predicate: Option<Predicate>,
sequence: Option<SequenceNumber>,
) -> error::Result<BoxedBatchIterator> {
let iter = self.create_iter(projection, sequence)?.build()?;
if self.merge_mode == MergeMode::LastNonNull {
let iter = LastNonNullIter::new(iter);
Ok(Box::new(iter))
} else {
Ok(Box::new(iter))
}
}
fn ranges(
&self,
projection: Option<&[ColumnId]>,
predicate: PredicateGroup,
sequence: Option<SequenceNumber>,
) -> error::Result<MemtableRanges> {
let builder = Box::new(self.create_iter(projection, sequence).unwrap());
let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));
Ok(MemtableRanges {
ranges: [(0, MemtableRange::new(context))].into(),
stats: self.stats(),
})
}
fn is_empty(&self) -> bool {
self.series.read().unwrap().is_empty()
}
fn freeze(&self) -> error::Result<()> {
self.series.write().unwrap().freeze(&self.region_metadata);
Ok(())
}
fn stats(&self) -> MemtableStats {
let estimated_bytes = self.alloc_tracker.bytes_allocated();
let num_rows = self.num_rows.load(Ordering::Relaxed);
if num_rows == 0 {
// no rows ever written
return MemtableStats {
estimated_bytes,
time_range: None,
num_rows: 0,
num_ranges: 0,
max_sequence: 0,
};
}
let ts_type = self
.region_metadata
.time_index_column()
.column_schema
.data_type
.clone()
.as_timestamp()
.expect("Timestamp column must have timestamp type");
let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
MemtableStats {
estimated_bytes,
time_range: Some((min_timestamp, max_timestamp)),
num_rows,
num_ranges: 1,
max_sequence: self.max_sequence.load(Ordering::Relaxed),
}
}
fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
Arc::new(Self::new(
id,
metadata.clone(),
self.alloc_tracker.write_buffer_manager(),
self.dedup,
self.merge_mode,
))
}
}
#[derive(Clone)]
struct BatchIterBuilder {
region_metadata: RegionMetadataRef,
values: Option<Values>,
projection: HashSet<ColumnId>,
sequence: Option<SequenceNumber>,
dedup: bool,
merge_mode: MergeMode,
}
impl IterBuilder for BatchIterBuilder {
fn build(&self) -> error::Result<BoxedBatchIterator> {
let Some(values) = self.values.clone() else {
return Ok(Box::new(Iter { batch: None }));
};
let maybe_batch = values
.to_batch(&[], &self.region_metadata, &self.projection, self.dedup)
.and_then(|mut b| {
b.filter_by_sequence(self.sequence)?;
Ok(b)
})
.map(Some)
.transpose();
let iter = Iter { batch: maybe_batch };
if self.merge_mode == MergeMode::LastNonNull {
Ok(Box::new(LastNonNullIter::new(iter)))
} else {
Ok(Box::new(iter))
}
}
}
struct Iter {
batch: Option<error::Result<Batch>>,
}
impl Iterator for Iter {
type Item = error::Result<Batch>;
fn next(&mut self) -> Option<Self::Item> {
self.batch.take()
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::value::ValueData;
use api::v1::{Mutation, OpType, Row, Rows, SemanticType};
use common_recordbatch::DfRecordBatch;
use common_time::Timestamp;
use datatypes::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray};
use datatypes::arrow_array::StringArray;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::{ScalarVector, Vector};
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use datatypes::vectors::TimestampMillisecondVector;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::SequenceNumber;
use super::*;
use crate::region::options::MergeMode;
use crate::test_util::column_metadata_to_column_schema;
fn new_test_metadata() -> RegionMetadataRef {
let mut builder = RegionMetadataBuilder::new(1.into());
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("f1", ConcreteDataType::float64_datatype(), true),
semantic_type: SemanticType::Field,
column_id: 2,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("f2", ConcreteDataType::string_datatype(), true),
semantic_type: SemanticType::Field,
column_id: 3,
});
Arc::new(builder.build().unwrap())
}
fn new_test_memtable(dedup: bool, merge_mode: MergeMode) -> SimpleBulkMemtable {
SimpleBulkMemtable::new(1, new_test_metadata(), None, dedup, merge_mode)
}
fn build_key_values(
metadata: &RegionMetadataRef,
sequence: SequenceNumber,
row_values: &[(i64, f64, String)],
) -> KeyValues {
let column_schemas: Vec<_> = metadata
.column_metadatas
.iter()
.map(column_metadata_to_column_schema)
.collect();
let rows: Vec<_> = row_values
.iter()
.map(|(ts, f1, f2)| Row {
values: vec![
api::v1::Value {
value_data: Some(ValueData::TimestampMillisecondValue(*ts)),
},
api::v1::Value {
value_data: Some(ValueData::F64Value(*f1)),
},
api::v1::Value {
value_data: Some(ValueData::StringValue(f2.clone())),
},
],
})
.collect();
let mutation = Mutation {
op_type: OpType::Put as i32,
sequence,
rows: Some(Rows {
schema: column_schemas,
rows,
}),
write_hint: None,
};
KeyValues::new(metadata, mutation).unwrap()
}
#[test]
fn test_write_and_iter() {
let memtable = new_test_memtable(false, MergeMode::LastRow);
memtable
.write(&build_key_values(
&memtable.region_metadata,
0,
&[(1, 1.0, "a".to_string())],
))
.unwrap();
memtable
.write(&build_key_values(
&memtable.region_metadata,
1,
&[(2, 2.0, "b".to_string())],
))
.unwrap();
let mut iter = memtable.iter(None, None, None).unwrap();
let batch = iter.next().unwrap().unwrap();
assert_eq!(2, batch.num_rows());
assert_eq!(2, batch.fields().len());
let ts_v = batch
.timestamps()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap();
assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));
assert_eq!(Value::Timestamp(Timestamp::new_millisecond(2)), ts_v.get(1));
}
#[test]
fn test_projection() {
let memtable = new_test_memtable(false, MergeMode::LastRow);
memtable
.write(&build_key_values(
&memtable.region_metadata,
0,
&[(1, 1.0, "a".to_string())],
))
.unwrap();
let mut iter = memtable.iter(None, None, None).unwrap();
let batch = iter.next().unwrap().unwrap();
assert_eq!(1, batch.num_rows());
assert_eq!(2, batch.fields().len());
let ts_v = batch
.timestamps()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap();
assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));
// Only project column 2 (f1)
let projection = vec![2];
let mut iter = memtable.iter(Some(&projection), None, None).unwrap();
let batch = iter.next().unwrap().unwrap();
assert_eq!(1, batch.num_rows());
assert_eq!(1, batch.fields().len()); // only f1
assert_eq!(2, batch.fields()[0].column_id);
}
#[test]
fn test_dedup() {
let memtable = new_test_memtable(true, MergeMode::LastRow);
memtable
.write(&build_key_values(
&memtable.region_metadata,
0,
&[(1, 1.0, "a".to_string())],
))
.unwrap();
memtable
.write(&build_key_values(
&memtable.region_metadata,
1,
&[(1, 2.0, "b".to_string())],
))
.unwrap();
let mut iter = memtable.iter(None, None, None).unwrap();
let batch = iter.next().unwrap().unwrap();
assert_eq!(1, batch.num_rows()); // deduped to 1 row
assert_eq!(2.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap()); // last write wins
}
#[test]
fn test_write_one() {
let memtable = new_test_memtable(false, MergeMode::LastRow);
let kvs = build_key_values(&memtable.region_metadata, 0, &[(1, 1.0, "a".to_string())]);
let kv = kvs.iter().next().unwrap();
memtable.write_one(kv).unwrap();
let mut iter = memtable.iter(None, None, None).unwrap();
let batch = iter.next().unwrap().unwrap();
assert_eq!(1, batch.num_rows());
}
#[test]
fn test_write_bulk() {
let memtable = new_test_memtable(false, MergeMode::LastRow);
let arrow_schema = memtable.schema().schema.arrow_schema().clone();
let arrays = vec![
Arc::new(TimestampMillisecondArray::from(vec![1, 2])) as ArrayRef,
Arc::new(Float64Array::from(vec![1.0, 2.0])) as ArrayRef,
Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
];
let rb = DfRecordBatch::try_new(arrow_schema, arrays).unwrap();
let part = BulkPart {
batch: rb,
sequence: 1,
min_ts: 1,
max_ts: 2,
num_rows: 2,
};
memtable.write_bulk(part).unwrap();
let mut iter = memtable.iter(None, None, None).unwrap();
let batch = iter.next().unwrap().unwrap();
assert_eq!(2, batch.num_rows());
let stats = memtable.stats();
assert_eq!(1, stats.max_sequence);
assert_eq!(2, stats.num_rows);
assert_eq!(
Some((Timestamp::new_millisecond(1), Timestamp::new_millisecond(2))),
stats.time_range
);
let kvs = build_key_values(&memtable.region_metadata, 2, &[(3, 3.0, "c".to_string())]);
memtable.write(&kvs).unwrap();
let mut iter = memtable.iter(None, None, None).unwrap();
let batch = iter.next().unwrap().unwrap();
assert_eq!(3, batch.num_rows());
assert_eq!(
vec![1, 2, 3],
batch
.timestamps()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.iter_data()
.map(|t| { t.unwrap().0.value() })
.collect::<Vec<_>>()
);
}
#[test]
fn test_is_empty() {
let memtable = new_test_memtable(false, MergeMode::LastRow);
assert!(memtable.is_empty());
memtable
.write(&build_key_values(
&memtable.region_metadata,
0,
&[(1, 1.0, "a".to_string())],
))
.unwrap();
assert!(!memtable.is_empty());
}
#[test]
fn test_stats() {
let memtable = new_test_memtable(false, MergeMode::LastRow);
let stats = memtable.stats();
assert_eq!(0, stats.num_rows);
assert!(stats.time_range.is_none());
memtable
.write(&build_key_values(
&memtable.region_metadata,
0,
&[(1, 1.0, "a".to_string())],
))
.unwrap();
let stats = memtable.stats();
assert_eq!(1, stats.num_rows);
assert!(stats.time_range.is_some());
}
#[test]
fn test_fork() {
let memtable = new_test_memtable(false, MergeMode::LastRow);
memtable
.write(&build_key_values(
&memtable.region_metadata,
0,
&[(1, 1.0, "a".to_string())],
))
.unwrap();
let forked = memtable.fork(2, &memtable.region_metadata);
assert!(forked.is_empty());
}
#[test]
fn test_sequence_filter() {
let memtable = new_test_memtable(false, MergeMode::LastRow);
memtable
.write(&build_key_values(
&memtable.region_metadata,
0,
&[(1, 1.0, "a".to_string())],
))
.unwrap();
memtable
.write(&build_key_values(
&memtable.region_metadata,
1,
&[(2, 2.0, "b".to_string())],
))
.unwrap();
// Filter with sequence 0 should only return first write
let mut iter = memtable.iter(None, None, Some(0)).unwrap();
let batch = iter.next().unwrap().unwrap();
assert_eq!(1, batch.num_rows());
assert_eq!(1.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap());
}
}

File: src/mito2/src/memtable/stats.rs

@@ -14,6 +14,8 @@
//! Internal metrics of the memtable.
use store_api::storage::SequenceNumber;
/// Metrics of writing memtables.
pub(crate) struct WriteMetrics {
/// Size allocated by keys.
@@ -24,6 +26,10 @@ pub(crate) struct WriteMetrics {
pub(crate) min_ts: i64,
/// Maximum timestamp
pub(crate) max_ts: i64,
/// Rows written.
pub(crate) num_rows: usize,
/// Max sequence number written.
pub(crate) max_sequence: SequenceNumber,
}
impl Default for WriteMetrics {
@@ -33,6 +39,8 @@ impl Default for WriteMetrics {
value_bytes: 0,
min_ts: i64::MAX,
max_ts: i64::MIN,
num_rows: 0,
max_sequence: SequenceNumber::MIN,
}
}
}

File: src/mito2/src/memtable/time_partition.rs

@@ -15,7 +15,7 @@
//! Partitions memtables by time.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::Duration;
use common_telemetry::debug;
@@ -26,7 +26,9 @@ use smallvec::{smallvec, SmallVec};
use snafu::OptionExt;
use store_api::metadata::RegionMetadataRef;
use crate::error;
use crate::error::{InvalidRequestSnafu, Result};
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::key_values::KeyValue;
use crate::memtable::version::SmallMemtableVec;
use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef};
@@ -55,6 +57,11 @@ impl TimePartition {
fn write(&self, kvs: &KeyValues) -> Result<()> {
self.memtable.write(kvs)
}
/// Writes a record batch to memtable.
fn write_record_batch(&self, rb: BulkPart) -> error::Result<()> {
self.memtable.write_bulk(rb)
}
}
type PartitionVec = SmallVec<[TimePartition; 2]>;
@@ -141,6 +148,101 @@ impl TimePartitions {
self.write_multi_parts(kvs, &parts)
}
pub fn write_bulk(&self, rb: BulkPart) -> Result<()> {
// Get all parts.
let parts = self.list_partitions();
let mut matched = vec![];
for part in &parts {
let Some(part_time_range) = part.time_range.as_ref() else {
matched.push(part);
continue;
};
if !(rb.max_ts < part_time_range.min_timestamp.value()
|| rb.min_ts >= part_time_range.max_timestamp.value())
{
// find all intersecting time partitions.
matched.push(part);
}
}
if !matched.is_empty() {
// FIXME(hl): we currently only write to the first matching time partition; we should
// strictly split the record batch according to time windows
matched[0].write_record_batch(rb)
} else {
// safety: the part_duration field must be set when we reach here; otherwise
// `matched` would not be empty.
let part_duration = self.part_duration.unwrap();
let bulk_start_ts = self
.metadata
.time_index_column()
.column_schema
.data_type
.as_timestamp()
.unwrap()
.create_timestamp(rb.min_ts);
let part_start =
partition_start_timestamp(bulk_start_ts, part_duration).with_context(|| {
InvalidRequestSnafu {
region_id: self.metadata.region_id,
reason: format!(
"timestamp {bulk_start_ts:?} and bucket {part_duration:?} are out of range"
),
}
})?;
let new_part = {
let mut inner = self.inner.lock().unwrap();
self.create_time_partition(part_start, &mut inner)?
};
new_part.memtable.write_bulk(rb)
}
}
// Creates new parts and returns the partition created.
// Acquires the lock to prevent others from creating the same partition.
fn create_time_partition(
&self,
part_start: Timestamp,
inner: &mut MutexGuard<PartitionsInner>,
) -> Result<TimePartition> {
let part_duration = self.part_duration.unwrap();
let part_pos = match inner
.parts
.iter()
.position(|part| part.time_range.unwrap().min_timestamp == part_start)
{
Some(pos) => pos,
None => {
let range = PartTimeRange::from_start_duration(part_start, part_duration)
.with_context(|| InvalidRequestSnafu {
region_id: self.metadata.region_id,
reason: format!(
"Partition time range for {part_start:?} is out of bound, bucket size: {part_duration:?}",
),
})?;
let memtable = self
.builder
.build(inner.alloc_memtable_id(), &self.metadata);
debug!(
"Create time partition {:?} for region {}, duration: {:?}, memtable_id: {}, parts_total: {}",
range,
self.metadata.region_id,
part_duration,
memtable.id(),
inner.parts.len() + 1
);
let pos = inner.parts.len();
inner.parts.push(TimePartition {
memtable,
time_range: Some(range),
});
pos
}
};
Ok(inner.parts[part_pos].clone())
}
/// Append memtables in partitions to `memtables`.
pub fn list_memtables(&self, memtables: &mut Vec<MemtableRef>) {
let inner = self.inner.lock().unwrap();
@@ -330,48 +432,13 @@ impl TimePartitions {
}
}
let part_duration = self.part_duration.unwrap();
// Creates new parts and writes to them. Acquires the lock to avoid others creating
// the same partition.
let mut inner = self.inner.lock().unwrap();
for (part_start, key_values) in missing_parts {
let part_pos = match inner
.parts
.iter()
.position(|part| part.time_range.unwrap().min_timestamp == part_start)
{
Some(pos) => pos,
None => {
let range = PartTimeRange::from_start_duration(part_start, part_duration)
.with_context(|| InvalidRequestSnafu {
region_id: self.metadata.region_id,
reason: format!(
"Partition time range for {part_start:?} is out of bound, bucket size: {part_duration:?}",
),
})?;
let memtable = self
.builder
.build(inner.alloc_memtable_id(), &self.metadata);
debug!(
"Create time partition {:?} for region {}, duration: {:?}, memtable_id: {}, parts_total: {}",
range,
self.metadata.region_id,
part_duration,
memtable.id(),
inner.parts.len() + 1
);
let pos = inner.parts.len();
inner.parts.push(TimePartition {
memtable,
time_range: Some(range),
});
pos
}
};
let memtable = &inner.parts[part_pos].memtable;
let partition = self.create_time_partition(part_start, &mut inner)?;
for kv in key_values {
memtable.write_one(kv)?;
partition.memtable.write_one(kv)?;
}
}

View File

@@ -15,6 +15,7 @@
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, Bound, HashSet};
use std::fmt::{Debug, Formatter};
use std::iter;
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
@@ -25,28 +26,30 @@ use common_telemetry::{debug, error};
use common_time::Timestamp;
use datatypes::arrow;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow_array::StringArray;
use datatypes::data_type::{ConcreteDataType, DataType};
use datatypes::prelude::{MutableVector, Vector, VectorRef};
use datatypes::prelude::{ScalarVector, Vector, VectorRef};
use datatypes::types::TimestampType;
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::{
Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
TimestampSecondVector, UInt64Vector, UInt8Vector,
};
use snafu::{ensure, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;
use crate::error::{
ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result,
UnsupportedOperationSnafu,
};
use crate::error;
use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::builder::{FieldBuilder, StringBuilder};
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::key_values::KeyValue;
use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
AllocTracker, BoxedBatchIterator, BulkPart, IterBuilder, KeyValues, Memtable, MemtableBuilder,
AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder,
MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
PredicateGroup,
};
@@ -57,7 +60,7 @@ use crate::region::options::MergeMode;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
/// Initial vector builder capacity.
const INITIAL_BUILDER_CAPACITY: usize = 16;
const INITIAL_BUILDER_CAPACITY: usize = 1024 * 8;
/// Vector builder capacity.
const BUILDER_CAPACITY: usize = 512;
@@ -87,13 +90,23 @@ impl TimeSeriesMemtableBuilder {
impl MemtableBuilder for TimeSeriesMemtableBuilder {
fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
Arc::new(TimeSeriesMemtable::new(
metadata.clone(),
id,
self.write_buffer_manager.clone(),
self.dedup,
self.merge_mode,
))
if metadata.primary_key.is_empty() {
Arc::new(SimpleBulkMemtable::new(
id,
metadata.clone(),
self.write_buffer_manager.clone(),
self.dedup,
self.merge_mode,
))
} else {
Arc::new(TimeSeriesMemtable::new(
metadata.clone(),
id,
self.write_buffer_manager.clone(),
self.dedup,
self.merge_mode,
))
}
}
}
@@ -149,6 +162,9 @@ impl TimeSeriesMemtable {
.on_allocation(stats.key_bytes + stats.value_bytes);
self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
self.max_sequence
.fetch_max(stats.max_sequence, Ordering::SeqCst);
self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
}
fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {
@@ -198,17 +214,12 @@ impl Memtable for TimeSeriesMemtable {
}
local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<Timestamp>();
local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<OpType>();
local_stats.max_sequence = kvs.max_sequence();
local_stats.num_rows = kvs.num_rows();
// TODO(hl): this may be inaccurate since the for-loop may return early.
// We may lift the primary key length check out of Memtable::write
// so that we can ensure writing to memtable will succeed.
self.update_stats(local_stats);
// update max_sequence
let sequence = kvs.max_sequence();
self.max_sequence.fetch_max(sequence, Ordering::Relaxed);
self.num_rows.fetch_add(kvs.num_rows(), Ordering::Relaxed);
Ok(())
}
@@ -216,24 +227,31 @@ impl Memtable for TimeSeriesMemtable {
let mut metrics = WriteMetrics::default();
let res = self.write_key_value(key_value, &mut metrics);
metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();
metrics.max_sequence = key_value.sequence();
metrics.num_rows = 1;
self.update_stats(metrics);
// update max_sequence
if res.is_ok() {
self.max_sequence
.fetch_max(key_value.sequence(), Ordering::Relaxed);
self.update_stats(metrics);
}
self.num_rows.fetch_add(1, Ordering::Relaxed);
res
}
fn write_bulk(&self, _part: BulkPart) -> Result<()> {
UnsupportedOperationSnafu {
err_msg: "TimeSeriesMemtable does not support write_bulk",
fn write_bulk(&self, part: BulkPart) -> Result<()> {
// Default implementation: fall back to row-by-row iteration.
let mutation = part.to_mutation(&self.region_metadata)?;
let mut metrics = WriteMetrics::default();
if let Some(key_values) = KeyValues::new(&self.region_metadata, mutation) {
for kv in key_values.iter() {
self.write_key_value(kv, &mut metrics)?
}
}
.fail()
metrics.max_sequence = part.sequence;
metrics.max_ts = part.max_ts;
metrics.min_ts = part.min_ts;
metrics.num_rows = part.num_rows;
self.update_stats(metrics);
Ok(())
}
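For reference, a `BulkPart` consumed by this fallback carries the whole batch plus precomputed bounds; a sketch of the shape (field values are assumptions drawn from the construction in `handle_bulk_insert.rs` below):

```rust
let part = BulkPart {
    batch,       // DfRecordBatch with all columns of the region
    num_rows,    // row count of `batch`
    min_ts,      // minimum value of the time index column
    max_ts,      // maximum value of the time index column
    sequence: 0, // assigned later by RegionWriteCtx::push_bulk
};
memtable.write_bulk(part)?;
```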
fn iter(
@@ -268,7 +286,7 @@ impl Memtable for TimeSeriesMemtable {
projection: Option<&[ColumnId]>,
predicate: PredicateGroup,
sequence: Option<SequenceNumber>,
) -> MemtableRanges {
) -> Result<MemtableRanges> {
let projection = if let Some(projection) = projection {
projection.iter().copied().collect()
} else {
@@ -287,10 +305,10 @@ impl Memtable for TimeSeriesMemtable {
});
let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));
MemtableRanges {
Ok(MemtableRanges {
ranges: [(0, MemtableRange::new(context))].into(),
stats: self.stats(),
}
})
}
fn is_empty(&self) -> bool {
@@ -349,10 +367,10 @@ impl Memtable for TimeSeriesMemtable {
type SeriesRwLockMap = RwLock<BTreeMap<Vec<u8>, Arc<RwLock<Series>>>>;
#[derive(Clone)]
struct SeriesSet {
region_metadata: RegionMetadataRef,
series: Arc<SeriesRwLockMap>,
codec: Arc<DensePrimaryKeyCodec>,
pub(crate) struct SeriesSet {
pub(crate) region_metadata: RegionMetadataRef,
pub(crate) series: Arc<SeriesRwLockMap>,
pub(crate) codec: Arc<DensePrimaryKeyCodec>,
}
impl SeriesSet {
@@ -637,7 +655,7 @@ fn prune_primary_key(
}
/// A `Series` holds a list of field values of some given primary key.
struct Series {
pub(crate) struct Series {
pk_cache: Option<Vec<Value>>,
active: ValueBuilder,
frozen: Vec<Values>,
@@ -645,7 +663,7 @@ struct Series {
}
impl Series {
fn new(region_metadata: &RegionMetadataRef) -> Self {
pub(crate) fn new(region_metadata: &RegionMetadataRef) -> Self {
Self {
pk_cache: None,
active: ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY),
@@ -654,8 +672,12 @@ impl Series {
}
}
pub fn is_empty(&self) -> bool {
self.active.len() == 0 && self.frozen.is_empty()
}
/// Pushes a row of values into Series. Returns the size of the values.
fn push<'a>(
pub(crate) fn push<'a>(
&mut self,
ts: ValueRef<'a>,
sequence: u64,
@@ -675,7 +697,7 @@ impl Series {
}
/// Freezes the active part and push it to `frozen`.
fn freeze(&mut self, region_metadata: &RegionMetadataRef) {
pub(crate) fn freeze(&mut self, region_metadata: &RegionMetadataRef) {
if self.active.len() != 0 {
let mut builder = ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY);
std::mem::swap(&mut self.active, &mut builder);
@@ -683,9 +705,19 @@ impl Series {
}
}
pub(crate) fn extend(
&mut self,
ts_v: VectorRef,
op_type_v: u8,
sequence_v: u64,
fields: impl Iterator<Item = VectorRef>,
) -> Result<()> {
self.active.extend(ts_v, op_type_v, sequence_v, fields)
}
/// Freezes the active part into the frozen part and compacts it to reduce memory fragmentation.
/// Returns the frozen and compacted values.
fn compact(&mut self, region_metadata: &RegionMetadataRef) -> Result<&Values> {
pub(crate) fn compact(&mut self, region_metadata: &RegionMetadataRef) -> Result<&Values> {
self.freeze(region_metadata);
let frozen = &self.frozen;
@@ -729,12 +761,12 @@ struct ValueBuilder {
timestamp_type: ConcreteDataType,
sequence: Vec<u64>,
op_type: Vec<u8>,
fields: Vec<Option<Box<dyn MutableVector>>>,
fields: Vec<Option<FieldBuilder>>,
field_types: Vec<ConcreteDataType>,
}
impl ValueBuilder {
fn new(region_metadata: &RegionMetadataRef, capacity: usize) -> Self {
pub(crate) fn new(region_metadata: &RegionMetadataRef, capacity: usize) -> Self {
let timestamp_type = region_metadata
.time_index_column()
.column_schema
@@ -788,12 +820,19 @@ impl ValueBuilder {
size += field_value.data_size();
if !field_value.is_null() || self.fields[idx].is_some() {
if let Some(field) = self.fields[idx].as_mut() {
let _ = field.try_push_value_ref(field_value);
let _ = field.push(field_value);
} else {
let mut mutable_vector = self.field_types[idx]
.create_mutable_vector(num_rows.max(INITIAL_BUILDER_CAPACITY));
let mut mutable_vector =
if let ConcreteDataType::String(_) = &self.field_types[idx] {
FieldBuilder::String(StringBuilder::with_capacity(256, 4096))
} else {
FieldBuilder::Other(
self.field_types[idx]
.create_mutable_vector(num_rows.max(INITIAL_BUILDER_CAPACITY)),
)
};
mutable_vector.push_nulls(num_rows - 1);
let _ = mutable_vector.try_push_value_ref(field_value);
let _ = mutable_vector.push(field_value);
self.fields[idx] = Some(mutable_vector);
}
}
@@ -802,6 +841,96 @@ impl ValueBuilder {
size
}
pub(crate) fn extend(
&mut self,
ts_v: VectorRef,
op_type: u8,
sequence: u64,
fields: impl Iterator<Item = VectorRef>,
) -> error::Result<()> {
let num_rows_before = self.timestamp.len();
let num_rows_to_write = ts_v.len();
self.timestamp.reserve(num_rows_to_write);
match self.timestamp_type {
ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
self.timestamp.extend(
ts_v.as_any()
.downcast_ref::<TimestampSecondVector>()
.unwrap()
.iter_data()
.map(|v| v.unwrap().0.value()),
);
}
ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
self.timestamp.extend(
ts_v.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.iter_data()
.map(|v| v.unwrap().0.value()),
);
}
ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
self.timestamp.extend(
ts_v.as_any()
.downcast_ref::<TimestampMicrosecondVector>()
.unwrap()
.iter_data()
.map(|v| v.unwrap().0.value()),
);
}
ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
self.timestamp.extend(
ts_v.as_any()
.downcast_ref::<TimestampNanosecondVector>()
.unwrap()
.iter_data()
.map(|v| v.unwrap().0.value()),
);
}
_ => unreachable!(),
};
self.op_type.reserve(num_rows_to_write);
self.op_type
.extend(iter::repeat_n(op_type, num_rows_to_write));
self.sequence.reserve(num_rows_to_write);
self.sequence
.extend(iter::repeat_n(sequence, num_rows_to_write));
for (field_idx, (field_src, field_dest)) in fields.zip(self.fields.iter_mut()).enumerate() {
let builder = field_dest.get_or_insert_with(|| {
let mut field_builder =
FieldBuilder::create(&self.field_types[field_idx], INITIAL_BUILDER_CAPACITY);
field_builder.push_nulls(num_rows_before);
field_builder
});
match builder {
FieldBuilder::String(builder) => {
let array = field_src.to_arrow_array();
let string_array =
array
.as_any()
.downcast_ref::<StringArray>()
.with_context(|| error::InvalidBatchSnafu {
reason: format!(
"Field type mismatch, expecting String, given: {}",
field_src.data_type()
),
})?;
builder.append_array(string_array);
}
FieldBuilder::Other(builder) => {
let len = field_src.len();
builder
.extend_slice_of(&*field_src, 0, len)
.context(error::ComputeVectorSnafu)?;
}
}
}
Ok(())
}
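The four per-unit downcasts above could plausibly be collapsed by going through the Arrow representation once; a hypothetical alternative, which like the original assumes the time index contains no nulls:

```rust
use datatypes::arrow::array::PrimitiveArray;
use datatypes::arrow::datatypes::ArrowTimestampType;

/// Appends the raw i64 timestamp values of `array` to `dst`.
fn extend_ts<T: ArrowTimestampType>(dst: &mut Vec<i64>, array: &PrimitiveArray<T>) {
    // Every Arrow timestamp type stores its values as i64.
    dst.extend_from_slice(array.values());
}
```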
/// Returns the length of [ValueBuilder]
fn len(&self) -> usize {
let sequence_len = self.sequence.len();
@@ -813,7 +942,7 @@ impl ValueBuilder {
/// [Values] holds immutable vectors of field columns, including `sequence` and `op_type`.
#[derive(Clone)]
struct Values {
pub(crate) struct Values {
timestamp: VectorRef,
sequence: Arc<UInt64Vector>,
op_type: Arc<UInt8Vector>,
@@ -891,7 +1020,7 @@ impl From<ValueBuilder> for Values {
.enumerate()
.map(|(i, v)| {
if let Some(v) = v {
v.to_vector()
v.finish()
} else {
let mut single_null = value.field_types[i].create_mutable_vector(num_rows);
single_null.push_nulls(num_rows);
@@ -899,6 +1028,7 @@ impl From<ValueBuilder> for Values {
}
})
.collect::<Vec<_>>();
let sequence = Arc::new(UInt64Vector::from_vec(value.sequence));
let op_type = Arc::new(UInt8Vector::from_vec(value.op_type));
let timestamp: VectorRef = match value.timestamp_type {

View File

@@ -392,6 +392,15 @@ lazy_static! {
// 0.01 ~ 1000
exponential_buckets(0.01, 10.0, 6).unwrap(),
).unwrap();
pub static ref REGION_WORKER_HANDLE_WRITE_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_region_worker_handle_write",
"elapsed time for handling writes in region worker loop",
&["stage"],
exponential_buckets(0.001, 10.0, 5).unwrap()
).unwrap();
}
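For reference, `exponential_buckets(0.001, 10.0, 5)` yields bucket bounds at 1 ms, 10 ms, 100 ms, 1 s, and 10 s.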
/// Stager notifier to collect metrics.

View File

@@ -371,14 +371,14 @@ impl ScanRegion {
let memtables = memtables
.into_iter()
.map(|mem| {
let ranges = mem.ranges(
mem.ranges(
Some(mapper.column_ids()),
predicate.clone(),
self.request.sequence,
);
MemRangeBuilder::new(ranges)
)
.map(MemRangeBuilder::new)
})
.collect();
.collect::<Result<Vec<_>>>()?;
let input = ScanInput::new(self.access_layer, mapper)
.with_time_range(Some(time_range))

View File

@@ -23,7 +23,9 @@ use store_api::logstore::LogStore;
use store_api::storage::{RegionId, SequenceNumber};
use crate::error::{Error, Result, WriteGroupSnafu};
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::KeyValues;
use crate::metrics;
use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
use crate::request::OptionOutputTx;
use crate::wal::{EntryId, WalWriter};
@@ -92,6 +94,10 @@ pub(crate) struct RegionWriteCtx {
///
/// The i-th notify is for the i-th mutation.
notifiers: Vec<WriteNotify>,
/// Notifiers for bulk requests.
bulk_notifiers: Vec<WriteNotify>,
/// Pending bulk write requests.
pub(crate) bulk_parts: Vec<BulkPart>,
/// The write operation failed and we should not write to the mutable memtable.
failed: bool,
@@ -125,9 +131,11 @@ impl RegionWriteCtx {
wal_entry: WalEntry::default(),
provider,
notifiers: Vec::new(),
bulk_notifiers: vec![],
failed: false,
put_num: 0,
delete_num: 0,
bulk_parts: vec![],
}
}
@@ -243,4 +251,53 @@ impl RegionWriteCtx {
self.version_control
.set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1);
}
pub(crate) fn push_bulk(&mut self, sender: OptionOutputTx, mut bulk: BulkPart) {
self.bulk_notifiers
.push(WriteNotify::new(sender, bulk.num_rows));
bulk.sequence = self.next_sequence;
self.next_sequence += bulk.num_rows as u64;
self.bulk_parts.push(bulk);
}
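`push_bulk` reserves a contiguous sequence range per part: with `next_sequence == 42` and a part of 100 rows, the part gets `sequence = 42` and `next_sequence` advances to 142.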
pub(crate) async fn write_bulk(&mut self) {
if self.failed || self.bulk_parts.is_empty() {
return;
}
let _timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
.with_label_values(&["write_bulk"])
.start_timer();
if self.bulk_parts.len() == 1 {
let part = self.bulk_parts.swap_remove(0);
let num_rows = part.num_rows;
if let Err(e) = self.version.memtables.mutable.write_bulk(part) {
self.bulk_notifiers[0].err = Some(Arc::new(e));
} else {
self.put_num += num_rows;
}
return;
}
let mut tasks = FuturesUnordered::new();
for (i, part) in self.bulk_parts.drain(..).enumerate() {
let mutable = self.version.memtables.mutable.clone();
tasks.push(common_runtime::spawn_blocking_global(move || {
let num_rows = part.num_rows;
(i, mutable.write_bulk(part), num_rows)
}));
}
while let Some(result) = tasks.next().await {
// First unwrap the join result from `spawn_blocking_global` above.
let (i, result, num_rows) = result.unwrap();
if let Err(err) = result {
self.bulk_notifiers[i].err = Some(Arc::new(err));
} else {
self.put_num += num_rows;
}
}
self.version_control
.set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1);
}
}

View File

@@ -47,6 +47,7 @@ use crate::error::{
FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedImpureDefaultSnafu,
};
use crate::manifest::action::RegionEdit;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::MemtableId;
use crate::metrics::COMPACTION_ELAPSED_TOTAL;
use crate::wal::entry_distributor::WalEntryReceiver;
@@ -534,6 +535,13 @@ pub(crate) struct SenderWriteRequest {
pub(crate) request: WriteRequest,
}
pub(crate) struct SenderBulkRequest {
pub(crate) sender: OptionOutputTx,
pub(crate) region_id: RegionId,
pub(crate) request: BulkPart,
pub(crate) region_metadata: RegionMetadataRef,
}
/// Request sent to a worker
#[derive(Debug)]
pub(crate) enum WorkerRequest {

View File

@@ -30,10 +30,11 @@ use store_api::storage::{ColumnId, RegionId, SequenceNumber};
use table::predicate::Predicate;
use crate::error::Result;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::data::{timestamp_array_to_i64_slice, DataBatch, DataBuffer};
use crate::memtable::{
BoxedBatchIterator, BulkPart, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRanges,
BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRanges,
MemtableRef, MemtableStats,
};
use crate::read::scan_region::PredicateGroup;
@@ -95,8 +96,8 @@ impl Memtable for EmptyMemtable {
_projection: Option<&[ColumnId]>,
_predicate: PredicateGroup,
_sequence: Option<SequenceNumber>,
) -> MemtableRanges {
MemtableRanges::default()
) -> Result<MemtableRanges> {
Ok(MemtableRanges::default())
}
fn is_empty(&self) -> bool {

View File

@@ -61,7 +61,8 @@ use crate::memtable::MemtableBuilderProvider;
use crate::metrics::{REGION_COUNT, WRITE_STALL_TOTAL};
use crate::region::{MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap, RegionMapRef};
use crate::request::{
BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest,
BackgroundNotify, DdlRequest, SenderBulkRequest, SenderDdlRequest, SenderWriteRequest,
WorkerRequest,
};
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
use crate::sst::file::FileId;
@@ -593,22 +594,39 @@ pub(crate) struct StalledRequests {
///
/// Key: RegionId
/// Value: (estimated size, stalled write requests, stalled bulk requests)
pub(crate) requests: HashMap<RegionId, (usize, Vec<SenderWriteRequest>)>,
pub(crate) requests:
HashMap<RegionId, (usize, Vec<SenderWriteRequest>, Vec<SenderBulkRequest>)>,
/// Estimated size of all stalled requests.
pub(crate) estimated_size: usize,
}
impl StalledRequests {
/// Appends stalled requests.
pub(crate) fn append(&mut self, requests: &mut Vec<SenderWriteRequest>) {
pub(crate) fn append(
&mut self,
requests: &mut Vec<SenderWriteRequest>,
bulk_requests: &mut Vec<SenderBulkRequest>,
) {
for req in requests.drain(..) {
self.push(req);
}
for req in bulk_requests.drain(..) {
self.push_bulk(req);
}
}
/// Pushes a stalled request to the buffer.
pub(crate) fn push(&mut self, req: SenderWriteRequest) {
let (size, requests) = self.requests.entry(req.request.region_id).or_default();
let (size, requests, _) = self.requests.entry(req.request.region_id).or_default();
let req_size = req.request.estimated_size();
*size += req_size;
self.estimated_size += req_size;
requests.push(req);
}
pub(crate) fn push_bulk(&mut self, req: SenderBulkRequest) {
let region_id = req.region_id;
let (size, _, requests) = self.requests.entry(region_id).or_default();
let req_size = req.request.estimated_size();
*size += req_size;
self.estimated_size += req_size;
@@ -616,18 +634,24 @@ impl StalledRequests {
}
/// Removes stalled requests of a specific region.
pub(crate) fn remove(&mut self, region_id: &RegionId) -> Vec<SenderWriteRequest> {
if let Some((size, requests)) = self.requests.remove(region_id) {
pub(crate) fn remove(
&mut self,
region_id: &RegionId,
) -> (Vec<SenderWriteRequest>, Vec<SenderBulkRequest>) {
if let Some((size, write_reqs, bulk_reqs)) = self.requests.remove(region_id) {
self.estimated_size -= size;
requests
(write_reqs, bulk_reqs)
} else {
vec![]
(vec![], vec![])
}
}
/// Returns the total number of all stalled requests.
pub(crate) fn stalled_count(&self) -> usize {
self.requests.values().map(|reqs| reqs.1.len()).sum()
self.requests
.values()
.map(|(_, reqs, bulk_reqs)| reqs.len() + bulk_reqs.len())
.sum()
}
}
@@ -704,6 +728,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Buffer to retrieve requests from receiver.
let mut write_req_buffer: Vec<SenderWriteRequest> =
Vec::with_capacity(self.config.worker_request_batch_size);
let mut bulk_req_buffer: Vec<SenderBulkRequest> =
Vec::with_capacity(self.config.worker_request_batch_size);
let mut ddl_req_buffer: Vec<SenderDdlRequest> =
Vec::with_capacity(self.config.worker_request_batch_size);
let mut general_req_buffer: Vec<WorkerRequest> =
@@ -782,6 +808,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
&mut write_req_buffer,
&mut ddl_req_buffer,
&mut general_req_buffer,
&mut bulk_req_buffer,
)
.await;
@@ -801,6 +828,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
write_requests: &mut Vec<SenderWriteRequest>,
ddl_requests: &mut Vec<SenderDdlRequest>,
general_requests: &mut Vec<WorkerRequest>,
bulk_requests: &mut Vec<SenderBulkRequest>,
) {
for worker_req in general_requests.drain(..) {
match worker_req {
@@ -835,8 +863,13 @@ impl<S: LogStore> RegionWorkerLoop<S> {
sender,
} => {
if let Some(region_metadata) = metadata {
self.handle_bulk_insert(request, region_metadata, write_requests, sender)
.await;
self.handle_bulk_insert_batch(
region_metadata,
request,
bulk_requests,
sender,
)
.await;
} else {
error!("Cannot find region metadata for {}", request.region_id);
sender.send(
@@ -852,7 +885,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Handles all write requests first so that we can alter regions without
// considering existing write requests.
self.handle_write_requests(write_requests, true).await;
self.handle_write_requests(write_requests, bulk_requests, true)
.await;
self.handle_ddl_requests(ddl_requests).await;
}

View File

@@ -14,339 +14,113 @@
//! Handles bulk insert requests.
use std::collections::HashMap;
use api::helper::{value_to_grpc_value, ColumnDataTypeWrapper};
use api::v1::{ColumnSchema, OpType, Row, Rows};
use common_base::AffectedRows;
use common_recordbatch::DfRecordBatch;
use datatypes::prelude::VectorRef;
use datatypes::vectors::Helper;
use snafu::ResultExt;
use datatypes::arrow;
use datatypes::arrow::array::{
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray,
};
use datatypes::arrow::datatypes::{DataType, TimeUnit};
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
use store_api::region_request::{BulkInsertPayload, RegionBulkInsertsRequest};
use tokio::sync::oneshot::Receiver;
use store_api::region_request::RegionBulkInsertsRequest;
use crate::error;
use crate::request::{OptionOutputTx, SenderWriteRequest, WriteRequest};
use crate::memtable::bulk::part::BulkPart;
use crate::request::{OptionOutputTx, SenderBulkRequest};
use crate::worker::RegionWorkerLoop;
use crate::{error, metrics};
impl<S: LogStore> RegionWorkerLoop<S> {
pub(crate) async fn handle_bulk_insert(
pub(crate) async fn handle_bulk_insert_batch(
&mut self,
mut request: RegionBulkInsertsRequest,
region_metadata: RegionMetadataRef,
pending_write_requests: &mut Vec<SenderWriteRequest>,
request: RegionBulkInsertsRequest,
pending_bulk_request: &mut Vec<SenderBulkRequest>,
sender: OptionOutputTx,
) {
let (column_schemas, name_to_index) =
match region_metadata_to_column_schema(&region_metadata) {
Ok(schema) => schema,
Err(e) => {
sender.send(Err(e));
return;
}
};
let _timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
.with_label_values(&["process_bulk_req"])
.start_timer();
let batch = request.payload;
let num_rows = batch.num_rows();
// fast path: only one payload.
if request.payloads.len() == 1 {
match Self::handle_payload(
&region_metadata,
request.payloads.swap_remove(0),
pending_write_requests,
column_schemas,
name_to_index,
) {
Ok(task_future) => common_runtime::spawn_global(async move {
sender.send(task_future.await.context(error::RecvSnafu).flatten());
}),
Err(e) => {
sender.send(Err(e));
return;
}
};
return;
}
let mut pending_tasks = Vec::with_capacity(request.payloads.len());
for req in request.payloads {
match Self::handle_payload(
&region_metadata,
req,
pending_write_requests,
column_schemas.clone(),
name_to_index.clone(),
) {
Ok(task_future) => {
pending_tasks.push(task_future);
}
Err(e) => {
sender.send(Err(e));
return;
}
}
}
common_runtime::spawn_global(async move {
let results = match futures::future::try_join_all(pending_tasks).await {
Ok(results) => results,
Err(e) => {
sender.send(Err(e).context(error::RecvSnafu));
return;
}
};
let Some(ts) =
batch.column_by_name(&region_metadata.time_index_column().column_schema.name)
else {
sender.send(
match results.into_iter().collect::<error::Result<Vec<_>>>() {
Ok(results) => Ok(results.into_iter().sum()),
Err(e) => Err(e),
},
error::InvalidRequestSnafu {
region_id: region_metadata.region_id,
reason: format!(
"timestamp column `{}` not found",
region_metadata.time_index_column().column_schema.name
),
}
.fail(),
);
return;
};
let DataType::Timestamp(unit, _) = ts.data_type() else {
// safety: ts data type must be a timestamp type.
unreachable!()
};
let (min_ts, max_ts) = match unit {
TimeUnit::Second => {
let ts = ts.as_any().downcast_ref::<TimestampSecondArray>().unwrap();
(
// safety: ts array must contain at least one row so this won't return None.
arrow::compute::min(ts).unwrap(),
arrow::compute::max(ts).unwrap(),
)
}
TimeUnit::Millisecond => {
let ts = ts
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
(
// safety: ts array must contain at least one row so this won't return None.
arrow::compute::min(ts).unwrap(),
arrow::compute::max(ts).unwrap(),
)
}
TimeUnit::Microsecond => {
let ts = ts
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap();
(
// safety: ts array must contain at least one row so this won't return None.
arrow::compute::min(ts).unwrap(),
arrow::compute::max(ts).unwrap(),
)
}
TimeUnit::Nanosecond => {
let ts = ts
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap();
(
// safety: ts array must contain at least one row so this won't return None.
arrow::compute::min(ts).unwrap(),
arrow::compute::max(ts).unwrap(),
)
}
};
let part = BulkPart {
batch,
num_rows,
max_ts,
min_ts,
sequence: 0,
};
pending_bulk_request.push(SenderBulkRequest {
sender,
request: part,
region_id: request.region_id,
region_metadata,
});
}
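The four `TimeUnit` arms above follow the same pattern and could be condensed with a small generic helper — a hypothetical sketch, not part of this change:

```rust
use datatypes::arrow;
use datatypes::arrow::array::PrimitiveArray;
use datatypes::arrow::datatypes::ArrowTimestampType;

/// Returns (min, max) of a non-empty timestamp array as raw i64 values.
fn ts_min_max<T: ArrowTimestampType>(a: &PrimitiveArray<T>) -> (i64, i64) {
    // safety: the array contains at least one row, as noted above.
    (
        arrow::compute::min(a).unwrap(),
        arrow::compute::max(a).unwrap(),
    )
}
```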
fn handle_payload(
region_metadata: &RegionMetadataRef,
payload: BulkInsertPayload,
pending_write_requests: &mut Vec<SenderWriteRequest>,
column_schemas: Vec<ColumnSchema>,
name_to_index: HashMap<String, usize>,
) -> error::Result<Receiver<error::Result<AffectedRows>>> {
let rx = match payload {
BulkInsertPayload::ArrowIpc(rb) => Self::handle_arrow_ipc(
region_metadata,
rb,
pending_write_requests,
column_schemas,
name_to_index,
),
BulkInsertPayload::Rows { data, has_null } => Self::handle_rows(
region_metadata,
data,
column_schemas,
has_null,
pending_write_requests,
name_to_index,
),
}?;
Ok(rx)
}
fn handle_arrow_ipc(
region_metadata: &RegionMetadataRef,
df_record_batch: DfRecordBatch,
pending_write_requests: &mut Vec<SenderWriteRequest>,
column_schemas: Vec<ColumnSchema>,
name_to_index: HashMap<String, usize>,
) -> error::Result<Receiver<error::Result<AffectedRows>>> {
let has_null: Vec<_> = df_record_batch
.columns()
.iter()
.map(|c| c.null_count() > 0)
.collect();
let rows = record_batch_to_rows(region_metadata, &df_record_batch)?;
let write_request = WriteRequest {
region_id: region_metadata.region_id,
op_type: OpType::Put,
rows: Rows {
schema: column_schemas,
rows,
},
name_to_index,
has_null,
hint: None,
region_metadata: Some(region_metadata.clone()),
};
let (tx, rx) = tokio::sync::oneshot::channel();
let sender = OptionOutputTx::from(tx);
let req = SenderWriteRequest {
sender,
request: write_request,
};
pending_write_requests.push(req);
Ok(rx)
}
fn handle_rows(
region_metadata: &RegionMetadataRef,
rows: Vec<Row>,
column_schemas: Vec<ColumnSchema>,
has_null: Vec<bool>,
pending_write_requests: &mut Vec<SenderWriteRequest>,
name_to_index: HashMap<String, usize>,
) -> error::Result<Receiver<error::Result<AffectedRows>>> {
let write_request = WriteRequest {
region_id: region_metadata.region_id,
op_type: OpType::Put,
rows: Rows {
schema: column_schemas,
rows,
},
name_to_index,
has_null,
hint: None,
region_metadata: Some(region_metadata.clone()),
};
let (tx, rx) = tokio::sync::oneshot::channel();
let sender = OptionOutputTx::from(tx);
let req = SenderWriteRequest {
sender,
request: write_request,
};
pending_write_requests.push(req);
Ok(rx)
}
}
fn region_metadata_to_column_schema(
region_meta: &RegionMetadataRef,
) -> error::Result<(Vec<ColumnSchema>, HashMap<String, usize>)> {
let mut column_schemas = Vec::with_capacity(region_meta.column_metadatas.len());
let mut name_to_index = HashMap::with_capacity(region_meta.column_metadatas.len());
for (idx, c) in region_meta.column_metadatas.iter().enumerate() {
let wrapper = ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
.with_context(|_| error::ConvertDataTypeSnafu {
data_type: c.column_schema.data_type.clone(),
})?;
column_schemas.push(ColumnSchema {
column_name: c.column_schema.name.clone(),
datatype: wrapper.datatype() as i32,
semantic_type: c.semantic_type as i32,
..Default::default()
});
name_to_index.insert(c.column_schema.name.clone(), idx);
}
Ok((column_schemas, name_to_index))
}
/// Convert [DfRecordBatch] to gRPC rows.
fn record_batch_to_rows(
region_metadata: &RegionMetadataRef,
rb: &DfRecordBatch,
) -> error::Result<Vec<Row>> {
let num_rows = rb.num_rows();
let mut rows = Vec::with_capacity(num_rows);
if num_rows == 0 {
return Ok(rows);
}
let vectors: Vec<Option<VectorRef>> = region_metadata
.column_metadatas
.iter()
.map(|c| {
rb.column_by_name(&c.column_schema.name)
.map(|column| Helper::try_into_vector(column).context(error::ConvertVectorSnafu))
.transpose()
})
.collect::<error::Result<_>>()?;
for row_idx in 0..num_rows {
let row = Row {
values: row_at(&vectors, row_idx),
};
rows.push(row);
}
Ok(rows)
}
fn row_at(vectors: &[Option<VectorRef>], row_idx: usize) -> Vec<api::v1::Value> {
let mut row = Vec::with_capacity(vectors.len());
for a in vectors {
let value = if let Some(a) = a {
value_to_grpc_value(a.get(row_idx))
} else {
api::v1::Value { value_data: None }
};
row.push(value)
}
row
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::SemanticType;
use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray};
use super::*;
use crate::test_util::meta_util::TestRegionMetadataBuilder;
fn build_record_batch(num_rows: usize) -> DfRecordBatch {
let region_metadata = Arc::new(TestRegionMetadataBuilder::default().build());
let schema = region_metadata.schema.arrow_schema().clone();
let values = (0..num_rows).map(|v| v as i64).collect::<Vec<_>>();
let ts_array = Arc::new(TimestampMillisecondArray::from_iter_values(values.clone()));
let k0_array = Arc::new(Int64Array::from_iter_values(values.clone()));
let v0_array = Arc::new(Int64Array::from_iter_values(values));
DfRecordBatch::try_new(schema, vec![ts_array, k0_array, v0_array]).unwrap()
}
#[test]
fn test_region_metadata_to_column_schema() {
let region_metadata = Arc::new(TestRegionMetadataBuilder::default().build());
let (result, _) = region_metadata_to_column_schema(&region_metadata).unwrap();
assert_eq!(result.len(), 3);
assert_eq!(result[0].column_name, "ts");
assert_eq!(result[0].semantic_type, SemanticType::Timestamp as i32);
assert_eq!(result[1].column_name, "k0");
assert_eq!(result[1].semantic_type, SemanticType::Tag as i32);
assert_eq!(result[2].column_name, "v0");
assert_eq!(result[2].semantic_type, SemanticType::Field as i32);
}
#[test]
fn test_record_batch_to_rows() {
// Create record batch
let region_metadata = Arc::new(TestRegionMetadataBuilder::default().build());
let record_batch = build_record_batch(10);
let rows = record_batch_to_rows(&region_metadata, &record_batch).unwrap();
assert_eq!(rows.len(), 10);
assert_eq!(rows[0].values.len(), 3);
for (row_idx, row) in rows.iter().enumerate().take(10) {
assert_eq!(
row.values[0].value_data.as_ref().unwrap(),
&api::v1::value::ValueData::TimestampMillisecondValue(row_idx as i64)
);
}
}
#[test]
fn test_record_batch_to_rows_schema_mismatch() {
let region_metadata = Arc::new(TestRegionMetadataBuilder::default().num_fields(2).build());
let record_batch = build_record_batch(1);
let rows = record_batch_to_rows(&region_metadata, &record_batch).unwrap();
assert_eq!(rows.len(), 1);
// Check first row
let row1 = &rows[0];
assert_eq!(row1.values.len(), 4);
assert_eq!(
row1.values[0].value_data.as_ref().unwrap(),
&api::v1::value::ValueData::TimestampMillisecondValue(0)
);
assert_eq!(
row1.values[1].value_data.as_ref().unwrap(),
&api::v1::value::ValueData::I64Value(0)
);
assert_eq!(
row1.values[2].value_data.as_ref().unwrap(),
&api::v1::value::ValueData::I64Value(0)
);
assert!(row1.values[3].value_data.is_none());
}
}

View File

@@ -236,13 +236,14 @@ impl<S: LogStore> RegionWorkerLoop<S> {
request.on_success();
// Handle pending requests for the region.
if let Some((mut ddl_requests, mut write_requests)) =
if let Some((mut ddl_requests, mut write_requests, mut bulk_writes)) =
self.flush_scheduler.on_flush_success(region_id)
{
// Perform DDLs first because they require empty memtables.
self.handle_ddl_requests(&mut ddl_requests).await;
// Handle pending write requests; we don't stall these requests.
self.handle_write_requests(&mut write_requests, false).await;
self.handle_write_requests(&mut write_requests, &mut bulk_writes, false)
.await;
}
// Handle stalled requests.

View File

@@ -26,10 +26,11 @@ use store_api::logstore::LogStore;
use store_api::storage::RegionId;
use crate::error::{InvalidRequestSnafu, RegionStateSnafu, RejectWriteSnafu, Result};
use crate::metrics;
use crate::metrics::{WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED};
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::{SenderWriteRequest, WriteRequest};
use crate::request::{SenderBulkRequest, SenderWriteRequest, WriteRequest};
use crate::worker::RegionWorkerLoop;
impl<S: LogStore> RegionWorkerLoop<S> {
@@ -37,9 +38,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
pub(crate) async fn handle_write_requests(
&mut self,
write_requests: &mut Vec<SenderWriteRequest>,
bulk_requests: &mut Vec<SenderBulkRequest>,
allow_stall: bool,
) {
if write_requests.is_empty() {
if write_requests.is_empty() && bulk_requests.is_empty() {
return;
}
@@ -48,7 +50,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
if self.should_reject_write() {
// The memory pressure is still too high, reject write requests.
reject_write_requests(write_requests);
reject_write_requests(write_requests, bulk_requests);
// Also reject all stalled requests.
self.reject_stalled_requests();
return;
@@ -56,7 +58,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
if self.write_buffer_manager.should_stall() && allow_stall {
self.stalled_count.add(write_requests.len() as i64);
self.stalled_requests.append(write_requests);
self.stalled_requests.append(write_requests, bulk_requests);
self.listener.on_write_stall();
return;
}
@@ -66,7 +68,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let _timer = WRITE_STAGE_ELAPSED
.with_label_values(&["prepare_ctx"])
.start_timer();
self.prepare_region_write_ctx(write_requests)
self.prepare_region_write_ctx(write_requests, bulk_requests)
};
// Write to WAL.
@@ -117,6 +119,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// fast path for single region.
let mut region_ctx = region_ctxs.into_values().next().unwrap();
region_ctx.write_memtable().await;
region_ctx.write_bulk().await;
put_rows += region_ctx.put_num;
delete_rows += region_ctx.delete_num;
} else {
@@ -126,6 +129,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// use tokio runtime to schedule tasks.
common_runtime::spawn_global(async move {
region_ctx.write_memtable().await;
region_ctx.write_bulk().await;
(region_ctx.put_num, region_ctx.delete_num)
})
})
@@ -158,8 +162,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let stalled = std::mem::take(&mut self.stalled_requests);
self.stalled_count.sub(stalled.stalled_count() as i64);
// We already stalled these requests; don't stall them again.
for (_, (_, mut requests)) in stalled.requests {
self.handle_write_requests(&mut requests, false).await;
for (_, (_, mut requests, mut bulk)) in stalled.requests {
self.handle_write_requests(&mut requests, &mut bulk, false)
.await;
}
}
@@ -167,25 +172,26 @@ impl<S: LogStore> RegionWorkerLoop<S> {
pub(crate) fn reject_stalled_requests(&mut self) {
let stalled = std::mem::take(&mut self.stalled_requests);
self.stalled_count.sub(stalled.stalled_count() as i64);
for (_, (_, mut requests)) in stalled.requests {
reject_write_requests(&mut requests);
for (_, (_, mut requests, mut bulk)) in stalled.requests {
reject_write_requests(&mut requests, &mut bulk);
}
}
/// Rejects a specific region's stalled requests.
pub(crate) fn reject_region_stalled_requests(&mut self, region_id: &RegionId) {
debug!("Rejects stalled requests for region {}", region_id);
let mut requests = self.stalled_requests.remove(region_id);
let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
self.stalled_count.sub(requests.len() as i64);
reject_write_requests(&mut requests);
reject_write_requests(&mut requests, &mut bulk);
}
/// Handles a specific region's stalled requests.
pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) {
debug!("Handles stalled requests for region {}", region_id);
let mut requests = self.stalled_requests.remove(region_id);
let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
self.stalled_count.sub(requests.len() as i64);
self.handle_write_requests(&mut requests, true).await;
self.handle_write_requests(&mut requests, &mut bulk, true)
.await;
}
}
@@ -194,9 +200,20 @@ impl<S> RegionWorkerLoop<S> {
fn prepare_region_write_ctx(
&mut self,
write_requests: &mut Vec<SenderWriteRequest>,
bulk_requests: &mut Vec<SenderBulkRequest>,
) -> HashMap<RegionId, RegionWriteCtx> {
// Initialize region write context map.
let mut region_ctxs = HashMap::new();
self.process_write_requests(&mut region_ctxs, write_requests);
self.process_bulk_requests(&mut region_ctxs, bulk_requests);
region_ctxs
}
fn process_write_requests(
&mut self,
region_ctxs: &mut HashMap<RegionId, RegionWriteCtx>,
write_requests: &mut Vec<SenderWriteRequest>,
) {
for mut sender_req in write_requests.drain(..) {
let region_id = sender_req.request.region_id;
@@ -294,8 +311,89 @@ impl<S> RegionWorkerLoop<S> {
sender_req.sender,
);
}
}
region_ctxs
/// Processes bulk insert requests.
fn process_bulk_requests(
&mut self,
region_ctxs: &mut HashMap<RegionId, RegionWriteCtx>,
requests: &mut Vec<SenderBulkRequest>,
) {
let _timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
.with_label_values(&["prepare_bulk_request"])
.start_timer();
for mut bulk_req in requests.drain(..) {
let region_id = bulk_req.region_id;
// If region is waiting for alteration, add requests to pending writes.
if self.flush_scheduler.has_pending_ddls(region_id) {
// Safety: The region has pending ddls.
self.flush_scheduler.add_bulk_request_to_pending(bulk_req);
continue;
}
// Checks whether the region exists and whether it is stalling.
if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
let Some(region) = self.regions.get_region_or(region_id, &mut bulk_req.sender)
else {
continue;
};
match region.state() {
RegionRoleState::Leader(RegionLeaderState::Writable) => {
let region_ctx = RegionWriteCtx::new(
region.region_id,
&region.version_control,
region.provider.clone(),
);
e.insert(region_ctx);
}
RegionRoleState::Leader(RegionLeaderState::Altering) => {
debug!(
"Region {} is altering, add request to pending writes",
region.region_id
);
self.stalled_count.add(1);
self.stalled_requests.push_bulk(bulk_req);
continue;
}
state => {
// The region is not writable.
bulk_req.sender.send(
RegionStateSnafu {
region_id,
state,
expect: RegionRoleState::Leader(RegionLeaderState::Writable),
}
.fail(),
);
continue;
}
}
}
// Safety: the region is guaranteed to exist at this point.
let region_ctx = region_ctxs.get_mut(&region_id).unwrap();
// Double-check the request schema
let need_fill_missing_columns = region_ctx.version().metadata.schema_version
!= bulk_req.region_metadata.schema_version;
// Only fill missing columns if primary key is dense encoded.
if need_fill_missing_columns {
// todo(hl): support filling default columns
bulk_req.sender.send(
InvalidRequestSnafu {
region_id,
reason: "Schema mismatch",
}
.fail(),
);
continue;
}
// Collect requests by region.
region_ctx.push_bulk(bulk_req.sender, bulk_req.request);
}
}
/// Returns true if the engine needs to reject some write requests.
@@ -307,7 +405,10 @@ impl<S> RegionWorkerLoop<S> {
}
/// Sends a rejected error to all `write_requests` and `bulk_requests`.
fn reject_write_requests(write_requests: &mut Vec<SenderWriteRequest>) {
fn reject_write_requests(
write_requests: &mut Vec<SenderWriteRequest>,
bulk_requests: &mut Vec<SenderBulkRequest>,
) {
WRITE_REJECT_TOTAL.inc_by(write_requests.len() as u64);
for req in write_requests.drain(..) {
@@ -318,6 +419,10 @@ fn reject_write_requests(write_requests: &mut Vec<SenderWriteRequest>) {
.fail(),
);
}
for req in bulk_requests.drain(..) {
let region_id = req.region_id;
req.sender.send(RejectWriteSnafu { region_id }.fail());
}
}
/// Rejects delete request under append mode.

View File

@@ -12,16 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use ahash::{HashMap, HashMapExt};
use api::v1::region::{
bulk_insert_request, region_request, ArrowIpc, BulkInsertRequest, RegionRequest,
RegionRequestHeader, RegionSelection,
RegionRequestHeader,
};
use bytes::Bytes;
use common_base::AffectedRows;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_grpc::FlightData;
use common_recordbatch::RecordBatch;
use common_telemetry::tracing_context::TracingContext;
use datatypes::schema::Schema;
use prost::Message;
use snafu::ResultExt;
use store_api::storage::RegionId;
@@ -41,7 +45,6 @@ impl Inserter {
let decode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
.with_label_values(&["decode_request"])
.start_timer();
let raw_flight_data = Bytes::from(data.encode_to_vec());
let body_size = data.data_body.len();
// Build region server requests
let message = decoder
@@ -50,16 +53,19 @@ impl Inserter {
let FlightMessage::Recordbatch(rb) = message else {
return Ok(0);
};
metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64);
let record_batch = rb.df_record_batch();
decode_timer.observe_duration();
metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64);
metrics::BULK_REQUEST_ROWS
.with_label_values(&["raw"])
.observe(record_batch.num_rows() as f64);
// todo(hl): find a way to embed raw FlightData messages in greptimedb proto files so we don't have to encode here.
// safety: when we reach here the schema must be present.
let schema_message = FlightEncoder::default()
.encode(FlightMessage::Schema(decoder.schema().unwrap().clone()));
let schema_data = Bytes::from(schema_message.encode_to_vec());
let record_batch = rb.df_record_batch();
let schema_bytes = Bytes::from(schema_message.encode_to_vec());
let partition_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
.with_label_values(&["partition"])
@@ -76,10 +82,6 @@ impl Inserter {
.context(error::SplitInsertSnafu)?;
partition_timer.observe_duration();
let group_request_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
.with_label_values(&["group_request"])
.start_timer();
let mut mask_per_datanode = HashMap::with_capacity(region_masks.len());
for (region_number, mask) in region_masks {
let region_id = RegionId::new(table_id, region_number);
@@ -88,79 +90,82 @@ impl Inserter {
.find_region_leader(region_id)
.await
.context(error::FindRegionLeaderSnafu)?;
let selection = RegionSelection {
region_id: region_id.as_u64(),
selection: mask.values().inner().as_slice().to_vec(),
};
mask_per_datanode
.entry(datanode)
.or_insert_with(Vec::new)
.push(selection);
.push((region_id, mask));
}
group_request_timer.observe_duration();
let datanode_handle_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
.with_label_values(&["datanode_handle"])
let wait_all_datanode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
.with_label_values(&["wait_all_datanode"])
.start_timer();
// fast path: only one datanode
if mask_per_datanode.len() == 1 {
let (peer, requests) = mask_per_datanode.into_iter().next().unwrap();
let datanode = self.node_manager.datanode(&peer).await;
let request = RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(region_request::Body::BulkInsert(BulkInsertRequest {
body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
schema: schema_data,
payload: raw_flight_data,
region_selection: requests,
})),
})),
};
let response = datanode
.handle(request)
.await
.context(error::RequestRegionSnafu)?;
return Ok(response.affected_rows);
}
let mut handles = Vec::with_capacity(mask_per_datanode.len());
let record_batch_schema =
Arc::new(Schema::try_from(record_batch.schema()).context(error::ConvertSchemaSnafu)?);
for (peer, masks) in mask_per_datanode {
let node_manager = self.node_manager.clone();
let schema = schema_data.clone();
let payload = raw_flight_data.clone();
for (region_id, mask) in masks {
let rb = record_batch.clone();
let schema_bytes = schema_bytes.clone();
let record_batch_schema = record_batch_schema.clone();
let node_manager = self.node_manager.clone();
let peer = peer.clone();
let handle: common_runtime::JoinHandle<error::Result<api::region::RegionResponse>> =
common_runtime::spawn_global(async move {
let filter_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
.with_label_values(&["filter"])
.start_timer();
let rb = arrow::compute::filter_record_batch(&rb, &mask)
.context(error::ComputeArrowSnafu)?;
filter_timer.observe_duration();
metrics::BULK_REQUEST_ROWS
.with_label_values(&["rows_per_region"])
.observe(rb.num_rows() as f64);
let handle: common_runtime::JoinHandle<error::Result<api::region::RegionResponse>> =
common_runtime::spawn_global(async move {
let request = RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(region_request::Body::BulkInsert(BulkInsertRequest {
body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
schema,
payload,
region_selection: masks,
let encode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
.with_label_values(&["encode"])
.start_timer();
let batch = RecordBatch::try_from_df_record_batch(record_batch_schema, rb)
.context(error::BuildRecordBatchSnafu)?;
let payload = Bytes::from(
FlightEncoder::default()
.encode(FlightMessage::Recordbatch(batch))
.encode_to_vec(),
);
encode_timer.observe_duration();
let _datanode_handle_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
.with_label_values(&["datanode_handle"])
.start_timer();
let request = RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(region_request::Body::BulkInsert(BulkInsertRequest {
body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
region_id: region_id.as_u64(),
schema: schema_bytes,
payload,
})),
})),
})),
};
};
let datanode = node_manager.datanode(&peer).await;
datanode
.handle(request)
.await
.context(error::RequestRegionSnafu)
});
handles.push(handle);
let datanode = node_manager.datanode(&peer).await;
datanode
.handle(request)
.await
.context(error::RequestRegionSnafu)
});
handles.push(handle);
}
}
let region_responses = futures::future::try_join_all(handles)
.await
.context(error::JoinTaskSnafu)?;
datanode_handle_timer.observe_duration();
wait_all_datanode_timer.observe_duration();
let mut rows_inserted: usize = 0;
for res in region_responses {
rows_inserted += res?.affected_rows;

View File

@@ -814,6 +814,14 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to perform arrow compute"))]
ComputeArrow {
#[snafu(source)]
error: ArrowError,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -936,6 +944,7 @@ impl ErrorExt for Error {
Error::StatementTimeout { .. } => StatusCode::Cancelled,
Error::ColumnOptions { source, .. } => source.status_code(),
Error::DecodeFlightData { source, .. } => source.status_code(),
Error::ComputeArrow { .. } => StatusCode::Internal,
}
}

View File

@@ -93,4 +93,12 @@ lazy_static! {
]
)
.unwrap();
pub static ref BULK_REQUEST_ROWS: HistogramVec = register_histogram_vec!(
"greptime_table_operator_bulk_insert_message_rows",
"table operator bulk inserts message rows",
&["type"],
// 10 ~ 100_000
exponential_buckets(10.0, 10.0, 5).unwrap()
)
.unwrap();
}

View File

@@ -14,9 +14,8 @@
use std::collections::HashMap;
use std::fmt::{self, Display};
use std::time::{Duration, Instant};
use api::helper::{value_to_grpc_value, ColumnDataTypeWrapper};
use api::helper::ColumnDataTypeWrapper;
use api::v1::add_column_location::LocationType;
use api::v1::column_def::{
as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
@@ -28,7 +27,7 @@ use api::v1::region::{
DropRequests, FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
};
use api::v1::{
self, set_index, Analyzer, FulltextBackend as PbFulltextBackend, Option as PbOption, Row, Rows,
self, set_index, Analyzer, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
};
pub use common_base::AffectedRows;
@@ -36,12 +35,8 @@ use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_grpc::FlightData;
use common_recordbatch::DfRecordBatch;
use common_time::TimeToLive;
use datatypes::arrow;
use datatypes::arrow::array::{Array, BooleanArray};
use datatypes::arrow::buffer::{BooleanBuffer, Buffer};
use datatypes::prelude::{ConcreteDataType, VectorRef};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
use datatypes::vectors::Helper;
use prost::Message;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
@@ -49,8 +44,8 @@ use strum::{AsRefStr, IntoStaticStr};
use crate::logstore::entry;
use crate::metadata::{
ColumnMetadata, DecodeArrowIpcSnafu, DecodeProtoSnafu, FlightCodecSnafu,
InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu, InvalidSetRegionOptionRequestSnafu,
ColumnMetadata, DecodeProtoSnafu, FlightCodecSnafu, InvalidRawRegionRequestSnafu,
InvalidRegionRequestSnafu, InvalidSetRegionOptionRequestSnafu,
InvalidUnsetRegionOptionRequestSnafu, MetadataError, ProstSnafu, RegionMetadata, Result,
UnexpectedSnafu,
};
@@ -159,7 +154,7 @@ impl RegionRequest {
region_request::Body::Creates(creates) => make_region_creates(creates),
region_request::Body::Drops(drops) => make_region_drops(drops),
region_request::Body::Alters(alters) => make_region_alters(alters),
region_request::Body::BulkInsert(bulk) => make_region_rows_bulk_inserts(bulk),
region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
region_request::Body::Sync(_) => UnexpectedSnafu {
reason: "Sync request should be handled separately by RegionServer",
}
@@ -334,165 +329,30 @@ fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, Regi
}
/// Convert [BulkInsertRequest] to [RegionRequest] and group by [RegionId].
#[allow(unused)]
fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
let Some(Body::ArrowIpc(request)) = request.body else {
return Ok(vec![]);
};
let mut region_requests: HashMap<u64, BulkInsertPayload> =
HashMap::with_capacity(request.region_selection.len());
let schema_data = FlightData::decode(request.schema.clone()).context(ProstSnafu)?;
let payload_data = FlightData::decode(request.payload.clone()).context(ProstSnafu)?;
let mut decoder = FlightDecoder::default();
let _schema_message = decoder.try_decode(schema_data).context(FlightCodecSnafu)?;
let FlightMessage::Recordbatch(rb) =
decoder.try_decode(payload_data).context(FlightCodecSnafu)?
else {
unreachable!("Always expect record batch message after schema");
};
for region_selection in request.region_selection {
let region_id = region_selection.region_id;
let region_mask = BooleanArray::new(
BooleanBuffer::new(Buffer::from(region_selection.selection), 0, rb.num_rows()),
None,
);
let region_batch = if region_mask.true_count() == rb.num_rows() {
rb.df_record_batch().clone()
} else {
arrow::compute::filter_record_batch(rb.df_record_batch(), &region_mask)
.context(DecodeArrowIpcSnafu)?
};
region_requests.insert(region_id, BulkInsertPayload::ArrowIpc(region_batch));
}
let result = region_requests
.into_iter()
.map(|(region_id, payload)| {
(
region_id.into(),
RegionRequest::BulkInserts(RegionBulkInsertsRequest {
region_id: region_id.into(),
payloads: vec![payload],
}),
)
})
.collect::<Vec<_>>();
Ok(result)
}
/// Convert [BulkInsertRequest] to [RegionRequest] and group by [RegionId].
fn make_region_rows_bulk_inserts(
request: BulkInsertRequest,
) -> Result<Vec<(RegionId, RegionRequest)>> {
let Some(Body::ArrowIpc(request)) = request.body else {
return Ok(vec![]);
};
let mut region_requests: HashMap<u64, BulkInsertPayload> =
HashMap::with_capacity(request.region_selection.len());
let decode_timer = metrics::CONVERT_REGION_BULK_REQUEST
let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
.with_label_values(&["decode"])
.start_timer();
let schema_data = FlightData::decode(request.schema.clone()).context(ProstSnafu)?;
let payload_data = FlightData::decode(request.payload.clone()).context(ProstSnafu)?;
let mut decoder = FlightDecoder::default();
let _schema_message = decoder.try_decode(schema_data).context(FlightCodecSnafu)?;
let _ = decoder.try_decode(schema_data).context(FlightCodecSnafu)?;
let FlightMessage::Recordbatch(rb) =
decoder.try_decode(payload_data).context(FlightCodecSnafu)?
else {
unreachable!("Always expect record batch message after schema");
};
decode_timer.observe_duration();
let filter_timer = metrics::CONVERT_REGION_BULK_REQUEST.with_label_values(&["filter_batch"]);
let convert_to_rows_timer =
metrics::CONVERT_REGION_BULK_REQUEST.with_label_values(&["convert_to_rows"]);
let mut filter_time = Duration::default();
let mut convert_to_rows_time = Duration::default();
for region_selection in request.region_selection {
let region_id = region_selection.region_id;
let start = Instant::now();
let region_mask = BooleanArray::new(
BooleanBuffer::new(Buffer::from(region_selection.selection), 0, rb.num_rows()),
None,
);
let region_batch = if region_mask.true_count() == rb.num_rows() {
rb.df_record_batch().clone()
} else {
arrow::compute::filter_record_batch(rb.df_record_batch(), &region_mask)
.context(DecodeArrowIpcSnafu)?
};
filter_time += start.elapsed();
let start = Instant::now();
let (rows, has_null) = record_batch_to_rows(&region_batch);
convert_to_rows_time += start.elapsed();
region_requests.insert(
region_id,
BulkInsertPayload::Rows {
data: rows,
has_null,
},
);
}
filter_timer.observe(filter_time.as_secs_f64());
convert_to_rows_timer.observe(convert_to_rows_time.as_secs_f64());
let result = region_requests
.into_iter()
.map(|(region_id, payload)| {
(
region_id.into(),
RegionRequest::BulkInserts(RegionBulkInsertsRequest {
region_id: region_id.into(),
payloads: vec![payload],
}),
)
})
.collect::<Vec<_>>();
Ok(result)
}
/// Convert [DfRecordBatch] to gRPC rows.
fn record_batch_to_rows(rb: &DfRecordBatch) -> (Vec<Row>, Vec<bool>) {
let num_rows = rb.num_rows();
let mut rows = Vec::with_capacity(num_rows);
if num_rows == 0 {
return (rows, vec![false; rb.num_columns()]);
}
let mut vectors = Vec::with_capacity(rb.num_columns());
let mut has_null = Vec::with_capacity(rb.num_columns());
for c in rb.columns() {
vectors.push(Helper::try_into_vector(c).unwrap());
has_null.push(c.null_count() > 0);
}
for row_idx in 0..num_rows {
let row = Row {
values: row_at(&vectors, row_idx),
};
rows.push(row);
}
(rows, has_null)
}
fn row_at(vectors: &[VectorRef], row_idx: usize) -> Vec<api::v1::Value> {
let mut row = Vec::with_capacity(vectors.len());
for a in vectors {
row.push(value_to_grpc_value(a.get(row_idx)))
}
row
decoder_timer.observe_duration();
let payload = rb.into_df_record_batch();
let region_id: RegionId = request.region_id.into();
Ok(vec![(
region_id,
RegionRequest::BulkInserts(RegionBulkInsertsRequest { region_id, payload }),
)])
}
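Taken together with the frontend encoder in `bulk_insert.rs`, the wire contract is schema-then-batch. A condensed round-trip sketch (error handling elided; `schema` and `batch` are assumed inputs):

```rust
use bytes::Bytes;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_grpc::FlightData;
use prost::Message;

// Frontend: encode the schema and the record batch as prost-encoded FlightData.
let schema_bytes = Bytes::from(
    FlightEncoder::default()
        .encode(FlightMessage::Schema(schema))
        .encode_to_vec(),
);
let payload_bytes = Bytes::from(
    FlightEncoder::default()
        .encode(FlightMessage::Recordbatch(batch))
        .encode_to_vec(),
);

// Datanode: the schema message primes the decoder; the next message
// must be a record batch.
let mut decoder = FlightDecoder::default();
let _ = decoder.try_decode(FlightData::decode(schema_bytes)?)?;
let FlightMessage::Recordbatch(rb) = decoder.try_decode(FlightData::decode(payload_bytes)?)? else {
    unreachable!("Always expect record batch message after schema");
};
```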
/// Request to put data into a region.
@@ -1302,13 +1162,13 @@ pub struct RegionSequencesRequest {
#[derive(Debug, Clone)]
pub struct RegionBulkInsertsRequest {
pub region_id: RegionId,
pub payloads: Vec<BulkInsertPayload>,
pub payload: DfRecordBatch,
}
#[derive(Debug, Clone)]
pub enum BulkInsertPayload {
ArrowIpc(DfRecordBatch),
Rows { data: Vec<Row>, has_null: Vec<bool> },
impl RegionBulkInsertsRequest {
pub fn estimated_size(&self) -> usize {
self.payload.get_array_memory_size()
}
}
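Note: `get_array_memory_size` measures the Arrow buffers backing the batch, so the estimate tracks actual heap usage rather than row count.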
impl fmt::Display for RegionRequest {