diff --git a/Cargo.lock b/Cargo.lock
index f0419018d9..2e5c85eba7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4820,7 +4820,7 @@ dependencies = [
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=159e92d30b4c0116a7ef376b535d880c6d580fb9#159e92d30b4c0116a7ef376b535d880c6d580fb9"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=17a3550751c8b1e02ec16be40101d5f24dc255c3#17a3550751c8b1e02ec16be40101d5f24dc255c3"
 dependencies = [
  "prost 0.13.5",
  "serde",
diff --git a/Cargo.toml b/Cargo.toml
index 16548c30fd..b6a76c4309 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -130,7 +130,7 @@ etcd-client = "0.14"
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "159e92d30b4c0116a7ef376b535d880c6d580fb9" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "17a3550751c8b1e02ec16be40101d5f24dc255c3" }
 hex = "0.4"
 http = "1"
 humantime = "2.1"
diff --git a/src/datatypes/src/vectors/string.rs b/src/datatypes/src/vectors/string.rs
index 18c35eaace..5603c96e15 100644
--- a/src/datatypes/src/vectors/string.rs
+++ b/src/datatypes/src/vectors/string.rs
@@ -166,7 +166,7 @@ impl ScalarVector for StringVector {
 }
 
 pub struct StringVectorBuilder {
-    mutable_array: MutableStringArray,
+    pub mutable_array: MutableStringArray,
 }
 
 impl MutableVector for StringVectorBuilder {
diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs
index 0ac04e8b3e..9c9c78f07e 100644
--- a/src/mito2/src/error.rs
+++ b/src/mito2/src/error.rs
@@ -1021,17 +1021,6 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
-
-    #[snafu(display(
-        "Failed to convert ConcreteDataType to ColumnDataType: {:?}",
-        data_type
-    ))]
-    ConvertDataType {
-        data_type: ConcreteDataType,
-        source: api::error::Error,
-        #[snafu(implicit)]
-        location: Location,
-    },
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1183,7 +1172,6 @@ impl ErrorExt for Error {
 
             ManualCompactionOverride {} => StatusCode::Cancelled,
             IncompatibleWalProviderChange { .. } => StatusCode::InvalidArguments,
-            ConvertDataType { .. } => StatusCode::Internal,
         }
     }
 
diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs
index ffd6e896a3..d5012a3fb7 100644
--- a/src/mito2/src/flush.rs
+++ b/src/mito2/src/flush.rs
@@ -41,8 +41,8 @@ use crate::region::options::IndexOptions;
 use crate::region::version::{VersionControlData, VersionControlRef};
 use crate::region::{ManifestContextRef, RegionLeaderState};
 use crate::request::{
-    BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderDdlRequest,
-    SenderWriteRequest, WorkerRequest,
+    BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
+    SenderDdlRequest, SenderWriteRequest, WorkerRequest,
 };
 use crate::schedule::scheduler::{Job, SchedulerRef};
 use crate::sst::file::FileMeta;
@@ -547,7 +547,11 @@ impl FlushScheduler {
     pub(crate) fn on_flush_success(
         &mut self,
         region_id: RegionId,
-    ) -> Option<(Vec<SenderDdlRequest>, Vec<SenderWriteRequest>)> {
+    ) -> Option<(
+        Vec<SenderDdlRequest>,
+        Vec<SenderWriteRequest>,
+        Vec<SenderBulkRequest>,
+    )> {
         let flush_status = self.region_status.get_mut(&region_id)?;
 
         // This region doesn't have running flush job.
@@ -557,7 +561,11 @@ impl FlushScheduler {
             // The region doesn't have any pending flush task.
            // Safety: The flush status must exist.
            let flush_status = self.region_status.remove(&region_id).unwrap();
-            Some((flush_status.pending_ddls, flush_status.pending_writes))
+            Some((
+                flush_status.pending_ddls,
+                flush_status.pending_writes,
+                flush_status.pending_bulk_writes,
+            ))
         } else {
             let version_data = flush_status.version_control.current();
             if version_data.version.memtables.is_empty() {
@@ -570,7 +578,11 @@ impl FlushScheduler {
                 // it from the status to avoid leaking pending requests.
                 // Safety: The flush status must exist.
                 let flush_status = self.region_status.remove(&region_id).unwrap();
-                Some((flush_status.pending_ddls, flush_status.pending_writes))
+                Some((
+                    flush_status.pending_ddls,
+                    flush_status.pending_writes,
+                    flush_status.pending_bulk_writes,
+                ))
             } else {
                 // We can flush the region again, keep it in the region status.
                 None
             }
@@ -657,6 +669,15 @@ impl FlushScheduler {
         status.pending_writes.push(request);
     }
 
+    /// Adds a bulk write request to the pending queue.
+    ///
+    /// # Panics
+    /// Panics if the region didn't request a flush.
+    pub(crate) fn add_bulk_request_to_pending(&mut self, request: SenderBulkRequest) {
+        let status = self.region_status.get_mut(&request.region_id).unwrap();
+        status.pending_bulk_writes.push(request);
+    }
+
     /// Returns true if the region has pending DDLs.
     pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
         self.region_status
@@ -717,6 +738,8 @@ struct FlushStatus {
     pending_ddls: Vec<SenderDdlRequest>,
     /// Requests waiting to write after altering the region.
     pending_writes: Vec<SenderWriteRequest>,
+    /// Bulk requests waiting to write after altering the region.
+    pending_bulk_writes: Vec<SenderBulkRequest>,
 }
 
 impl FlushStatus {
@@ -728,6 +751,7 @@ impl FlushStatus {
             pending_task: None,
             pending_ddls: Vec::new(),
             pending_writes: Vec::new(),
+            pending_bulk_writes: Vec::new(),
         }
     }
 
diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs
index af185c9c61..a999c90f0c 100644
--- a/src/mito2/src/memtable.rs
+++ b/src/mito2/src/memtable.rs
@@ -19,7 +19,7 @@ use std::fmt;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::Arc;
 
-pub use bulk::part::BulkPart;
+pub use bulk::part::EncodedBulkPart;
 use common_time::Timestamp;
 use serde::{Deserialize, Serialize};
 use store_api::metadata::RegionMetadataRef;
@@ -29,6 +29,7 @@ use table::predicate::Predicate;
 use crate::config::MitoConfig;
 use crate::error::Result;
 use crate::flush::WriteBufferManagerRef;
+use crate::memtable::bulk::part::BulkPart;
 use crate::memtable::key_values::KeyValue;
 pub use crate::memtable::key_values::KeyValues;
 use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
@@ -40,9 +41,11 @@ use crate::read::Batch;
 use crate::region::options::{MemtableOptions, MergeMode};
 use crate::sst::file::FileTimeRange;
 
+mod builder;
 pub mod bulk;
 pub mod key_values;
 pub mod partition_tree;
+mod simple_bulk_memtable;
 mod stats;
 pub mod time_partition;
 pub mod time_series;
@@ -158,7 +161,7 @@ pub trait Memtable: Send + Sync + fmt::Debug {
         projection: Option<&[ColumnId]>,
         predicate: PredicateGroup,
         sequence: Option<SequenceNumber>,
-    ) -> MemtableRanges;
+    ) -> Result<MemtableRanges>;
 
     /// Returns true if the memtable is empty.
     fn is_empty(&self) -> bool;
diff --git a/src/mito2/src/memtable/builder.rs b/src/mito2/src/memtable/builder.rs
new file mode 100644
index 0000000000..a75937fc26
--- /dev/null
+++ b/src/mito2/src/memtable/builder.rs
@@ -0,0 +1,300 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Builders for the time-series memtable.
+
+use std::sync::Arc;
+
+use datatypes::arrow;
+use datatypes::arrow::array::{
+    Array, ArrayDataBuilder, BufferBuilder, GenericByteArray, NullBufferBuilder, UInt8BufferBuilder,
+};
+use datatypes::arrow_array::StringArray;
+use datatypes::data_type::DataType;
+use datatypes::prelude::{ConcreteDataType, MutableVector, VectorRef};
+use datatypes::value::ValueRef;
+use datatypes::vectors::StringVector;
+
+/// Field builder with a specialized implementation for strings.
+pub(crate) enum FieldBuilder {
+    String(StringBuilder),
+    Other(Box<dyn MutableVector>),
+}
+
+impl FieldBuilder {
+    /// Creates a [FieldBuilder] instance with the given type and capacity.
+    pub fn create(data_type: &ConcreteDataType, init_cap: usize) -> Self {
+        if let ConcreteDataType::String(_) = data_type {
+            Self::String(StringBuilder::with_capacity(init_cap / 16, init_cap))
+        } else {
+            Self::Other(data_type.create_mutable_vector(init_cap))
+        }
+    }
+
+    /// Pushes a value into the builder.
+    pub(crate) fn push(&mut self, value: ValueRef) -> datatypes::error::Result<()> {
+        match self {
+            FieldBuilder::String(b) => {
+                if let Some(s) = value.as_string()? {
+                    b.append(s);
+                } else {
+                    b.append_null();
+                }
+                Ok(())
+            }
+            FieldBuilder::Other(b) => b.try_push_value_ref(value),
+        }
+    }
+
+    /// Pushes `n` null values into the builder.
+    pub(crate) fn push_nulls(&mut self, n: usize) {
+        match self {
+            FieldBuilder::String(s) => {
+                s.append_n_nulls(n);
+            }
+            FieldBuilder::Other(v) => {
+                v.push_nulls(n);
+            }
+        }
+    }
+
+    /// Finishes the builder and builds a [VectorRef].
+    pub(crate) fn finish(&mut self) -> VectorRef {
+        match self {
+            FieldBuilder::String(s) => Arc::new(StringVector::from(s.build())) as _,
+            FieldBuilder::Other(v) => v.to_vector(),
+        }
+    }
+}
+
+/// [StringBuilder] serves as a workaround for the lack of [`GenericStringBuilder::append_array`](https://docs.rs/arrow-array/latest/arrow_array/builder/type.GenericStringBuilder.html#method.append_array),
+/// which is only available since arrow-rs 55.0.0.
+pub(crate) struct StringBuilder {
+    value_builder: UInt8BufferBuilder,
+    offsets_builder: BufferBuilder<i32>,
+    null_buffer_builder: NullBufferBuilder,
+}
+
+impl Default for StringBuilder {
+    fn default() -> Self {
+        Self::with_capacity(16, 256)
+    }
+}
+
+impl StringBuilder {
+    /// Creates a new [`StringBuilder`].
+    ///
+    /// - `item_capacity` is the number of items to pre-allocate.
+    ///   The size of the preallocated buffer of offsets is the number of items plus one.
+    /// - `data_capacity` is the total number of bytes of data to pre-allocate
+    ///   (for all items, not per item).
+    pub fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self {
+        let mut offsets_builder = BufferBuilder::<i32>::new(item_capacity + 1);
+        offsets_builder.append(0);
+        Self {
+            value_builder: UInt8BufferBuilder::new(data_capacity),
+            offsets_builder,
+            null_buffer_builder: NullBufferBuilder::new(item_capacity),
+        }
+    }
+
+    pub fn append(&mut self, data: &str) {
+        self.value_builder.append_slice(data.as_bytes());
+        self.null_buffer_builder.append(true);
+        self.offsets_builder.append(self.next_offset());
+    }
+
+    #[inline]
+    fn next_offset(&self) -> i32 {
+        i32::try_from(self.value_builder.len()).expect("byte array offset overflow")
+    }
+
+    pub fn len(&self) -> usize {
+        self.null_buffer_builder.len()
+    }
+
+    /// Based on arrow-rs' GenericByteBuilder:
+    /// https://github.com/apache/arrow-rs/blob/7905545537c50590fdb4dc645e3e0130fce80b57/arrow-array/src/builder/generic_bytes_builder.rs#L135
+    pub fn append_array(&mut self, array: &StringArray) {
+        if array.len() == 0 {
+            return;
+        }
+
+        let offsets = array.offsets();
+
+        // If the offsets are contiguous, we can append them directly, avoiding the need to align;
+        // for example, when the first appended array is not sliced (starts at offset 0).
+        if self.next_offset() == offsets[0] {
+            self.offsets_builder.append_slice(&offsets[1..]);
+        } else {
+            // Shift all the offsets.
+            let shift: i32 = self.next_offset() - offsets[0];
+
+            // Creating intermediate offsets instead of pushing each offset is faster
+            // (even if we make MutableBuffer to avoid updating length on each push
+            // and reserve the necessary capacity, it's still slower).
+            let mut intermediate = Vec::with_capacity(offsets.len() - 1);
+
+            for &offset in &offsets[1..] {
+                intermediate.push(offset + shift)
+            }
+
+            self.offsets_builder.append_slice(&intermediate);
+        }
+
+        // Append the underlying values, starting at the first offset and ending at the last offset.
+        self.value_builder.append_slice(
+            &array.values().as_slice()[offsets[0] as usize..offsets[array.len()] as usize],
+        );
+
+        if let Some(null_buffer) = array.nulls() {
+            let data: Vec<_> = null_buffer.inner().iter().collect();
+            self.null_buffer_builder.append_slice(&data);
+        } else {
+            self.null_buffer_builder.append_n_non_nulls(array.len());
+        }
+    }
+
+    pub fn append_null(&mut self) {
+        self.null_buffer_builder.append(false);
+        self.offsets_builder.append(self.next_offset());
+    }
+
+    pub fn append_n_nulls(&mut self, n: usize) {
+        self.null_buffer_builder.append_n_nulls(n);
+        self.offsets_builder.append_n(n, self.next_offset());
+    }
+
+    pub fn build(&mut self) -> StringArray {
+        let array_builder = ArrayDataBuilder::new(arrow::datatypes::DataType::Utf8)
+            .len(self.len())
+            .add_buffer(self.offsets_builder.finish())
+            .add_buffer(self.value_builder.finish())
+            .nulls(self.null_buffer_builder.finish());
+
+        self.offsets_builder.append(self.next_offset());
+        let array_data = unsafe { array_builder.build_unchecked() };
+        GenericByteArray::from(array_data)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use datatypes::arrow::array::StringArray;
+
+    use super::*;
+
+    #[test]
+    fn test_append() {
+        let mut builder = StringBuilder::default();
+        builder.append_n_nulls(10);
+        let array = builder.build();
+        assert_eq!(vec![None; 10], array.iter().collect::<Vec<_>>());
+
+        let mut builder = StringBuilder::default();
+        builder.append_n_nulls(3);
+        builder.append("hello");
+        builder.append_null();
+        builder.append("world");
+        assert_eq!(
+            vec![None, None, None, Some("hello"), None, Some("world")],
+            builder.build().iter().collect::<Vec<_>>()
+        )
+    }
+
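+    // Typical usage of StringBuilder (illustrative sketch, not exercised by these tests):
+    //
+    //   let mut b = StringBuilder::default();
+    //   b.append("tag-a");                 // plain value append
+    //   b.append_n_nulls(2);               // n nulls share a single end offset
+    //   let src = StringArray::from(vec![Some("x"), Some("y"), Some("z")]);
+    //   b.append_array(&src.slice(1, 2));  // sliced input takes the offset-shift branch
+    //   let array = b.build();             // ["tag-a", null, null, "y", "z"]
+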
+    #[test]
+    fn test_append_empty_string() {
+        let mut builder = StringBuilder::default();
+        builder.append("");
+        builder.append_null();
+        builder.append("");
+        let array = builder.build();
+        assert_eq!(
+            vec![Some(""), None, Some("")],
+            array.iter().collect::<Vec<_>>()
+        );
+    }
+
+    #[test]
+    fn test_append_large_string() {
+        let large_str = "a".repeat(1024);
+        let mut builder = StringBuilder::default();
+        builder.append(&large_str);
+        let array = builder.build();
+        assert_eq!(large_str.as_str(), array.value(0));
+    }
+
+    #[test]
+    fn test_append_array() {
+        let mut builder_1 = StringBuilder::default();
+        builder_1.append("hello");
+        builder_1.append_null();
+        builder_1.append("world");
+
+        let mut builder_2 = StringBuilder::default();
+        builder_2.append_null();
+        builder_2.append("!");
+        builder_2.append_array(&builder_1.build());
+        assert_eq!(
+            vec![None, Some("!"), Some("hello"), None, Some("world")],
+            builder_2.build().iter().collect::<Vec<_>>()
+        )
+    }
+
+    #[test]
+    fn test_append_empty_array() {
+        let mut builder = StringBuilder::default();
+        builder.append_array(&StringArray::from(vec![] as Vec<&str>));
+        let array = builder.build();
+        assert_eq!(0, array.len());
+    }
+
+    #[test]
+    fn test_append_partial_array() {
+        let source = StringArray::from(vec![Some("a"), None, Some("b"), Some("c")]);
+        let sliced = source.slice(1, 2); // [None, Some("b")]
+
+        let mut builder = StringBuilder::default();
+        builder.append_array(&sliced);
+        let array = builder.build();
+        assert_eq!(vec![None, Some("b")], array.iter().collect::<Vec<_>>());
+    }
+
+    #[test]
+    fn test_builder_capacity() {
+        let mut builder = StringBuilder::with_capacity(10, 100);
+        assert_eq!(0, builder.len());
+
+        for i in 0..10 {
+            builder.append(&format!("string-{}", i));
+        }
+
+        let array = builder.build();
+        assert_eq!(10, array.len());
+        assert_eq!("string-0", array.value(0));
+        assert_eq!("string-9", array.value(9));
+    }
+
+    #[test]
+    fn test_builder_reset_after_build() {
+        let mut builder = StringBuilder::default();
+        builder.append("first");
+        let array1 = builder.build();
+        assert_eq!(1, array1.len());
+
+        builder.append("second");
+        let array2 = builder.build();
+        assert_eq!(1, array2.len()); // Not 2: build() resets the builder, so only "second" remains.
+    }
+}
diff --git a/src/mito2/src/memtable/bulk.rs b/src/mito2/src/memtable/bulk.rs
index cf7e7403ed..940aee29a5 100644
--- a/src/mito2/src/memtable/bulk.rs
+++ b/src/mito2/src/memtable/bulk.rs
@@ -21,7 +21,7 @@ use store_api::storage::{ColumnId, SequenceNumber};
 use table::predicate::Predicate;
 
 use crate::error::Result;
-use crate::memtable::bulk::part::BulkPart;
+use crate::memtable::bulk::part::{BulkPart, EncodedBulkPart};
 use crate::memtable::key_values::KeyValue;
 use crate::memtable::{
     BoxedBatchIterator, KeyValues, Memtable, MemtableId, MemtableRanges, MemtableRef,
@@ -38,7 +38,7 @@ mod row_group_reader;
 #[derive(Debug)]
 pub struct BulkMemtable {
     id: MemtableId,
-    parts: RwLock<Vec<BulkPart>>,
+    parts: RwLock<Vec<EncodedBulkPart>>,
 }
 
 impl Memtable for BulkMemtable {
@@ -54,9 +54,7 @@ impl Memtable for BulkMemtable {
         unimplemented!()
     }
 
-    fn write_bulk(&self, fragment: BulkPart) -> Result<()> {
-        let mut parts = self.parts.write().unwrap();
-        parts.push(fragment);
+    fn write_bulk(&self, _fragment: BulkPart) -> Result<()> {
         Ok(())
     }
 
@@ -74,7 +72,7 @@ impl Memtable for BulkMemtable {
         _projection: Option<&[ColumnId]>,
         _predicate: PredicateGroup,
         _sequence: Option<SequenceNumber>,
-    ) -> MemtableRanges {
+    ) -> Result<MemtableRanges> {
         todo!()
     }
 
diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs
index 07f5fda529..c9d1349f5f 100644
--- a/src/mito2/src/memtable/bulk/part.rs
+++ b/src/mito2/src/memtable/bulk/part.rs
@@ -17,13 +17,15 @@
 use std::collections::VecDeque;
 use std::sync::Arc;
 
-use api::v1::Mutation;
+use api::helper::{value_to_grpc_value, ColumnDataTypeWrapper};
+use api::v1::{Mutation, OpType};
 use bytes::Bytes;
+use common_recordbatch::DfRecordBatch as RecordBatch;
 use common_time::timestamp::TimeUnit;
 use datafusion::arrow::array::{TimestampNanosecondArray, UInt64Builder};
 use datatypes::arrow;
 use datatypes::arrow::array::{
-    Array, ArrayRef, BinaryBuilder, DictionaryArray, RecordBatch, TimestampMicrosecondArray,
+    Array, ArrayRef, BinaryBuilder, DictionaryArray, TimestampMicrosecondArray,
     TimestampMillisecondArray, TimestampSecondArray, UInt32Array, UInt64Array, UInt8Array,
     UInt8Builder,
 };
@@ -32,34 +34,105 @@ use datatypes::arrow::datatypes::SchemaRef;
 use datatypes::arrow_array::BinaryArray;
 use datatypes::data_type::DataType;
 use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
-use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
+use datatypes::value::Value;
+use datatypes::vectors::Helper;
 use parquet::arrow::ArrowWriter;
 use parquet::data_type::AsBytes;
 use parquet::file::metadata::ParquetMetaData;
 use parquet::file::properties::WriterProperties;
-use snafu::ResultExt;
+use snafu::{OptionExt, ResultExt};
 use store_api::metadata::RegionMetadataRef;
-use store_api::storage::{ColumnId, SequenceNumber};
+use store_api::storage::SequenceNumber;
 use table::predicate::Predicate;
 
 use crate::error;
 use crate::error::{ComputeArrowSnafu, EncodeMemtableSnafu, NewRecordBatchSnafu, Result};
 use crate::memtable::bulk::context::BulkIterContextRef;
 use crate::memtable::bulk::part_reader::BulkPartIter;
-use crate::memtable::key_values::KeyValuesRef;
+use crate::memtable::key_values::{KeyValue, KeyValuesRef};
 use crate::memtable::BoxedBatchIterator;
 use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt};
 use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
 use crate::sst::parquet::helper::parse_parquet_metadata;
 use crate::sst::to_sst_arrow_schema;
 
-#[derive(Debug)]
 pub struct BulkPart {
+    pub(crate) batch: RecordBatch,
+    pub(crate) num_rows: usize,
+    pub(crate) max_ts: i64,
+    pub(crate) min_ts: i64,
+    pub(crate) sequence: u64,
+}
+
+impl BulkPart {
+    pub(crate) fn estimated_size(&self) -> usize {
+        self.batch.get_array_memory_size()
+    }
+
+    /// Converts [BulkPart] to [Mutation] for the fallback `write_bulk` implementation.
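+    ///
+    /// The conversion is row-major: every value is read back from its column
+    /// vector and re-encoded as a gRPC value, so it costs O(rows * columns).
+    /// It is intended as a compatibility path for memtables that lack a
+    /// native bulk write implementation.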
+    pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
+        let vectors = region_metadata
+            .schema
+            .column_schemas()
+            .iter()
+            .map(|col| match self.batch.column_by_name(&col.name) {
+                None => Ok(None),
+                Some(col) => Helper::try_into_vector(col).map(Some),
+            })
+            .collect::<datatypes::error::Result<Vec<_>>>()
+            .context(error::ComputeVectorSnafu)?;
+
+        let rows = (0..self.num_rows)
+            .map(|row_idx| {
+                let values = (0..self.batch.num_columns())
+                    .map(|col_idx| {
+                        if let Some(v) = &vectors[col_idx] {
+                            value_to_grpc_value(v.get(row_idx))
+                        } else {
+                            api::v1::Value { value_data: None }
+                        }
+                    })
+                    .collect::<Vec<_>>();
+                api::v1::Row { values }
+            })
+            .collect::<Vec<_>>();
+
+        let schema = region_metadata
+            .column_metadatas
+            .iter()
+            .map(|c| {
+                let data_type_wrapper =
+                    ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
+                Ok(api::v1::ColumnSchema {
+                    column_name: c.column_schema.name.clone(),
+                    datatype: data_type_wrapper.datatype() as i32,
+                    semantic_type: c.semantic_type as i32,
+                    ..Default::default()
+                })
+            })
+            .collect::<api::error::Result<Vec<_>>>()
+            .context(error::ConvertColumnDataTypeSnafu {
+                reason: "failed to convert region metadata to column schema",
+            })?;
+
+        let rows = api::v1::Rows { schema, rows };
+
+        Ok(Mutation {
+            op_type: OpType::Put as i32,
+            sequence: self.sequence,
+            rows: Some(rows),
+            write_hint: None,
+        })
+    }
+}
+
+#[derive(Debug)]
+pub struct EncodedBulkPart {
     data: Bytes,
     metadata: BulkPartMeta,
 }
 
-impl BulkPart {
+impl EncodedBulkPart {
     pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
         Self { data, metadata }
     }
@@ -138,8 +211,8 @@ impl BulkPartEncoder {
 }
 
 impl BulkPartEncoder {
-    /// Encodes mutations to a [BulkPart], returns true if encoded data has been written to `dest`.
-    fn encode_mutations(&self, mutations: &[Mutation]) -> Result<Option<BulkPart>> {
+    /// Encodes mutations to an [EncodedBulkPart]. Returns `Ok(None)` if there are no rows to encode.
+    fn encode_mutations(&self, mutations: &[Mutation]) -> Result<Option<EncodedBulkPart>> {
         let Some((arrow_record_batch, min_ts, max_ts)) =
             mutations_to_record_batch(mutations, &self.metadata, &self.pk_encoder, self.dedup)?
else { @@ -162,7 +235,7 @@ impl BulkPartEncoder { let buf = Bytes::from(buf); let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?); - Ok(Some(BulkPart { + Ok(Some(EncodedBulkPart { data: buf, metadata: BulkPartMeta { num_rows: arrow_record_batch.num_rows(), @@ -742,7 +815,7 @@ mod tests { ); } - fn encode(input: &[MutationInput]) -> BulkPart { + fn encode(input: &[MutationInput]) -> EncodedBulkPart { let metadata = metadata_for_test(); let mutations = input .iter() @@ -823,7 +896,7 @@ mod tests { assert_eq!(vec![0.1, 0.2, 0.0], field); } - fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> BulkPart { + fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart { let metadata = metadata_for_test(); let mutations = key_values .into_iter() @@ -838,7 +911,11 @@ mod tests { encoder.encode_mutations(&mutations).unwrap().unwrap() } - fn check_prune_row_group(part: &BulkPart, predicate: Option, expected_rows: usize) { + fn check_prune_row_group( + part: &EncodedBulkPart, + predicate: Option, + expected_rows: usize, + ) { let context = Arc::new(BulkIterContext::new( part.metadata.region_metadata.clone(), &None, diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs index f359d24cc9..68960c1afe 100644 --- a/src/mito2/src/memtable/partition_tree.rs +++ b/src/mito2/src/memtable/partition_tree.rs @@ -37,11 +37,12 @@ use table::predicate::Predicate; use crate::error::{Result, UnsupportedOperationSnafu}; use crate::flush::WriteBufferManagerRef; +use crate::memtable::bulk::part::BulkPart; use crate::memtable::key_values::KeyValue; use crate::memtable::partition_tree::tree::PartitionTree; use crate::memtable::stats::WriteMetrics; use crate::memtable::{ - AllocTracker, BoxedBatchIterator, BulkPart, IterBuilder, KeyValues, Memtable, MemtableBuilder, + AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, PredicateGroup, }; @@ -147,15 +148,11 @@ impl Memtable for PartitionTreeMemtable { // Ensures the memtable always updates stats. let res = self.tree.write(kvs, &mut pk_buffer, &mut metrics); - self.update_stats(&metrics); - - // update max_sequence if res.is_ok() { - let sequence = kvs.max_sequence(); - self.max_sequence.fetch_max(sequence, Ordering::Relaxed); + metrics.max_sequence = kvs.max_sequence(); + metrics.num_rows = kvs.num_rows(); + self.update_stats(&metrics); } - - self.num_rows.fetch_add(kvs.num_rows(), Ordering::Relaxed); res } @@ -165,15 +162,12 @@ impl Memtable for PartitionTreeMemtable { // Ensures the memtable always updates stats. 
let res = self.tree.write_one(key_value, &mut pk_buffer, &mut metrics); - self.update_stats(&metrics); - // update max_sequence if res.is_ok() { - self.max_sequence - .fetch_max(key_value.sequence(), Ordering::Relaxed); + metrics.max_sequence = metrics.max_sequence.max(key_value.sequence()); + metrics.num_rows = 1; + self.update_stats(&metrics); } - - self.num_rows.fetch_add(1, Ordering::Relaxed); res } @@ -198,7 +192,7 @@ impl Memtable for PartitionTreeMemtable { projection: Option<&[ColumnId]>, predicate: PredicateGroup, sequence: Option, - ) -> MemtableRanges { + ) -> Result { let projection = projection.map(|ids| ids.to_vec()); let builder = Box::new(PartitionTreeIterBuilder { tree: self.tree.clone(), @@ -208,10 +202,10 @@ impl Memtable for PartitionTreeMemtable { }); let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate)); - MemtableRanges { + Ok(MemtableRanges { ranges: [(0, MemtableRange::new(context))].into(), stats: self.stats(), - } + }) } fn is_empty(&self) -> bool { @@ -306,6 +300,9 @@ impl PartitionTreeMemtable { .fetch_max(metrics.max_ts, Ordering::SeqCst); self.min_timestamp .fetch_min(metrics.min_ts, Ordering::SeqCst); + self.num_rows.fetch_add(metrics.num_rows, Ordering::SeqCst); + self.max_sequence + .fetch_max(metrics.max_sequence, Ordering::SeqCst); } } diff --git a/src/mito2/src/memtable/simple_bulk_memtable.rs b/src/mito2/src/memtable/simple_bulk_memtable.rs new file mode 100644 index 0000000000..edae897d60 --- /dev/null +++ b/src/mito2/src/memtable/simple_bulk_memtable.rs @@ -0,0 +1,673 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
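+
+//! Memtable that stores all rows in a single [Series], used for regions
+//! without primary keys (see the `TimeSeriesMemtableBuilder` change below).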
+ +use std::collections::HashSet; +use std::fmt::{Debug, Formatter}; +use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering}; +use std::sync::{Arc, RwLock}; + +use api::v1::OpType; +use datatypes::vectors::Helper; +use snafu::{OptionExt, ResultExt}; +use store_api::metadata::RegionMetadataRef; +use store_api::storage::{ColumnId, SequenceNumber}; +use table::predicate::Predicate; + +use crate::flush::WriteBufferManagerRef; +use crate::memtable::bulk::part::BulkPart; +use crate::memtable::key_values::KeyValue; +use crate::memtable::stats::WriteMetrics; +use crate::memtable::time_series::{Series, Values}; +use crate::memtable::{ + AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableId, MemtableRange, + MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, +}; +use crate::read::dedup::LastNonNullIter; +use crate::read::scan_region::PredicateGroup; +use crate::read::Batch; +use crate::region::options::MergeMode; +use crate::{error, metrics}; + +pub struct SimpleBulkMemtable { + id: MemtableId, + region_metadata: RegionMetadataRef, + alloc_tracker: AllocTracker, + max_timestamp: AtomicI64, + min_timestamp: AtomicI64, + max_sequence: AtomicU64, + dedup: bool, + merge_mode: MergeMode, + num_rows: AtomicUsize, + series: RwLock, +} + +impl SimpleBulkMemtable { + pub(crate) fn new( + id: MemtableId, + region_metadata: RegionMetadataRef, + write_buffer_manager: Option, + dedup: bool, + merge_mode: MergeMode, + ) -> Self { + let dedup = if merge_mode == MergeMode::LastNonNull { + false + } else { + dedup + }; + let series = RwLock::new(Series::new(®ion_metadata)); + + Self { + id, + region_metadata, + alloc_tracker: AllocTracker::new(write_buffer_manager), + max_timestamp: AtomicI64::new(i64::MIN), + min_timestamp: AtomicI64::new(i64::MAX), + max_sequence: AtomicU64::new(0), + dedup, + merge_mode, + num_rows: AtomicUsize::new(0), + series, + } + } + + fn build_projection(&self, projection: Option<&[ColumnId]>) -> HashSet { + if let Some(projection) = projection { + projection.iter().copied().collect() + } else { + self.region_metadata + .field_columns() + .map(|c| c.column_id) + .collect() + } + } + + fn create_iter( + &self, + projection: Option<&[ColumnId]>, + sequence: Option, + ) -> error::Result { + let mut series = self.series.write().unwrap(); + + let values = if series.is_empty() { + None + } else { + Some(series.compact(&self.region_metadata)?.clone()) + }; + + let projection = self.build_projection(projection); + + Ok(BatchIterBuilder { + region_metadata: self.region_metadata.clone(), + values, + projection, + dedup: self.dedup, + sequence, + merge_mode: self.merge_mode, + }) + } + + fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) { + let ts = kv.timestamp(); + let sequence = kv.sequence(); + let op_type = kv.op_type(); + let mut series = self.series.write().unwrap(); + let size = series.push(ts, sequence, op_type, kv.fields()); + stats.value_bytes += size; + // safety: timestamp of kv must be both present and a valid timestamp value. + let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value(); + stats.min_ts = stats.min_ts.min(ts); + stats.max_ts = stats.max_ts.max(ts); + } + + /// Updates memtable stats. 
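+    /// Folds the per-call [WriteMetrics] into the memtable-level atomics:
+    /// allocated bytes, row count, timestamp bounds and the max sequence.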
+ fn update_stats(&self, stats: WriteMetrics) { + self.alloc_tracker + .on_allocation(stats.key_bytes + stats.value_bytes); + self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst); + self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst); + self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst); + self.max_sequence + .fetch_max(stats.max_sequence, Ordering::SeqCst); + } + + #[cfg(test)] + fn schema(&self) -> &RegionMetadataRef { + &self.region_metadata + } +} + +impl Debug for SimpleBulkMemtable { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SimpleBulkMemtable").finish() + } +} + +impl Memtable for SimpleBulkMemtable { + fn id(&self) -> MemtableId { + self.id + } + + fn write(&self, kvs: &KeyValues) -> error::Result<()> { + let mut stats = WriteMetrics::default(); + let max_sequence = kvs.max_sequence(); + for kv in kvs.iter() { + self.write_key_value(kv, &mut stats); + } + stats.max_sequence = max_sequence; + stats.num_rows = kvs.num_rows(); + self.update_stats(stats); + Ok(()) + } + + fn write_one(&self, kv: KeyValue) -> error::Result<()> { + debug_assert_eq!(0, kv.num_primary_keys()); + let mut stats = WriteMetrics::default(); + self.write_key_value(kv, &mut stats); + stats.num_rows = 1; + stats.max_sequence = kv.sequence(); + self.update_stats(stats); + Ok(()) + } + + fn write_bulk(&self, part: BulkPart) -> error::Result<()> { + let rb = &part.batch; + + let ts = Helper::try_into_vector( + rb.column_by_name(&self.region_metadata.time_index_column().column_schema.name) + .with_context(|| error::InvalidRequestSnafu { + region_id: self.region_metadata.region_id, + reason: "Timestamp not found", + })?, + ) + .context(error::ConvertVectorSnafu)?; + + let sequence = part.sequence; + + let fields: Vec<_> = self + .region_metadata + .field_columns() + .map(|f| { + let array = rb.column_by_name(&f.column_schema.name).ok_or_else(|| { + error::InvalidRequestSnafu { + region_id: self.region_metadata.region_id, + reason: format!("Column {} not found", f.column_schema.name), + } + .build() + })?; + Helper::try_into_vector(array).context(error::ConvertVectorSnafu) + }) + .collect::>>()?; + + let mut series = self.series.write().unwrap(); + let extend_timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED + .with_label_values(&["bulk_extend"]) + .start_timer(); + series.extend(ts, OpType::Put as u8, sequence, fields.into_iter())?; + extend_timer.observe_duration(); + + self.update_stats(WriteMetrics { + key_bytes: 0, + value_bytes: part.estimated_size(), + min_ts: part.min_ts, + max_ts: part.max_ts, + num_rows: part.num_rows, + max_sequence: sequence, + }); + Ok(()) + } + + fn iter( + &self, + projection: Option<&[ColumnId]>, + _predicate: Option, + sequence: Option, + ) -> error::Result { + let iter = self.create_iter(projection, sequence)?.build()?; + + if self.merge_mode == MergeMode::LastNonNull { + let iter = LastNonNullIter::new(iter); + Ok(Box::new(iter)) + } else { + Ok(Box::new(iter)) + } + } + + fn ranges( + &self, + projection: Option<&[ColumnId]>, + predicate: PredicateGroup, + sequence: Option, + ) -> error::Result { + let builder = Box::new(self.create_iter(projection, sequence).unwrap()); + + let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate)); + Ok(MemtableRanges { + ranges: [(0, MemtableRange::new(context))].into(), + stats: self.stats(), + }) + } + + fn is_empty(&self) -> bool { + self.series.read().unwrap().is_empty() + } + + fn freeze(&self) -> error::Result<()> { + 
self.series.write().unwrap().freeze(&self.region_metadata); + Ok(()) + } + + fn stats(&self) -> MemtableStats { + let estimated_bytes = self.alloc_tracker.bytes_allocated(); + let num_rows = self.num_rows.load(Ordering::Relaxed); + if num_rows == 0 { + // no rows ever written + return MemtableStats { + estimated_bytes, + time_range: None, + num_rows: 0, + num_ranges: 0, + max_sequence: 0, + }; + } + let ts_type = self + .region_metadata + .time_index_column() + .column_schema + .data_type + .clone() + .as_timestamp() + .expect("Timestamp column must have timestamp type"); + let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed)); + let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed)); + MemtableStats { + estimated_bytes, + time_range: Some((min_timestamp, max_timestamp)), + num_rows, + num_ranges: 1, + max_sequence: self.max_sequence.load(Ordering::Relaxed), + } + } + + fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef { + Arc::new(Self::new( + id, + metadata.clone(), + self.alloc_tracker.write_buffer_manager(), + self.dedup, + self.merge_mode, + )) + } +} + +#[derive(Clone)] +struct BatchIterBuilder { + region_metadata: RegionMetadataRef, + values: Option, + projection: HashSet, + sequence: Option, + dedup: bool, + merge_mode: MergeMode, +} + +impl IterBuilder for BatchIterBuilder { + fn build(&self) -> error::Result { + let Some(values) = self.values.clone() else { + return Ok(Box::new(Iter { batch: None })); + }; + + let maybe_batch = values + .to_batch(&[], &self.region_metadata, &self.projection, self.dedup) + .and_then(|mut b| { + b.filter_by_sequence(self.sequence)?; + Ok(b) + }) + .map(Some) + .transpose(); + + let iter = Iter { batch: maybe_batch }; + + if self.merge_mode == MergeMode::LastNonNull { + Ok(Box::new(LastNonNullIter::new(iter))) + } else { + Ok(Box::new(iter)) + } + } +} + +struct Iter { + batch: Option>, +} + +impl Iterator for Iter { + type Item = error::Result; + + fn next(&mut self) -> Option { + self.batch.take() + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use api::v1::value::ValueData; + use api::v1::{Mutation, OpType, Row, Rows, SemanticType}; + use common_recordbatch::DfRecordBatch; + use common_time::Timestamp; + use datatypes::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray}; + use datatypes::arrow_array::StringArray; + use datatypes::data_type::ConcreteDataType; + use datatypes::prelude::{ScalarVector, Vector}; + use datatypes::schema::ColumnSchema; + use datatypes::value::Value; + use datatypes::vectors::TimestampMillisecondVector; + use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; + use store_api::storage::SequenceNumber; + + use super::*; + use crate::region::options::MergeMode; + use crate::test_util::column_metadata_to_column_schema; + + fn new_test_metadata() -> RegionMetadataRef { + let mut builder = RegionMetadataBuilder::new(1.into()); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("f1", ConcreteDataType::float64_datatype(), true), + semantic_type: SemanticType::Field, + column_id: 2, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("f2", ConcreteDataType::string_datatype(), true), + semantic_type: SemanticType::Field, + 
column_id: 3, + }); + Arc::new(builder.build().unwrap()) + } + + fn new_test_memtable(dedup: bool, merge_mode: MergeMode) -> SimpleBulkMemtable { + SimpleBulkMemtable::new(1, new_test_metadata(), None, dedup, merge_mode) + } + + fn build_key_values( + metadata: &RegionMetadataRef, + sequence: SequenceNumber, + row_values: &[(i64, f64, String)], + ) -> KeyValues { + let column_schemas: Vec<_> = metadata + .column_metadatas + .iter() + .map(column_metadata_to_column_schema) + .collect(); + + let rows: Vec<_> = row_values + .iter() + .map(|(ts, f1, f2)| Row { + values: vec![ + api::v1::Value { + value_data: Some(ValueData::TimestampMillisecondValue(*ts)), + }, + api::v1::Value { + value_data: Some(ValueData::F64Value(*f1)), + }, + api::v1::Value { + value_data: Some(ValueData::StringValue(f2.clone())), + }, + ], + }) + .collect(); + let mutation = Mutation { + op_type: OpType::Put as i32, + sequence, + rows: Some(Rows { + schema: column_schemas, + rows, + }), + write_hint: None, + }; + KeyValues::new(metadata, mutation).unwrap() + } + + #[test] + fn test_write_and_iter() { + let memtable = new_test_memtable(false, MergeMode::LastRow); + memtable + .write(&build_key_values( + &memtable.region_metadata, + 0, + &[(1, 1.0, "a".to_string())], + )) + .unwrap(); + memtable + .write(&build_key_values( + &memtable.region_metadata, + 1, + &[(2, 2.0, "b".to_string())], + )) + .unwrap(); + + let mut iter = memtable.iter(None, None, None).unwrap(); + let batch = iter.next().unwrap().unwrap(); + assert_eq!(2, batch.num_rows()); + assert_eq!(2, batch.fields().len()); + let ts_v = batch + .timestamps() + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0)); + assert_eq!(Value::Timestamp(Timestamp::new_millisecond(2)), ts_v.get(1)); + } + + #[test] + fn test_projection() { + let memtable = new_test_memtable(false, MergeMode::LastRow); + memtable + .write(&build_key_values( + &memtable.region_metadata, + 0, + &[(1, 1.0, "a".to_string())], + )) + .unwrap(); + + let mut iter = memtable.iter(None, None, None).unwrap(); + let batch = iter.next().unwrap().unwrap(); + assert_eq!(1, batch.num_rows()); + assert_eq!(2, batch.fields().len()); + + let ts_v = batch + .timestamps() + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0)); + + // Only project column 2 (f1) + let projection = vec![2]; + let mut iter = memtable.iter(Some(&projection), None, None).unwrap(); + let batch = iter.next().unwrap().unwrap(); + + assert_eq!(1, batch.num_rows()); + assert_eq!(1, batch.fields().len()); // only f1 + assert_eq!(2, batch.fields()[0].column_id); + } + + #[test] + fn test_dedup() { + let memtable = new_test_memtable(true, MergeMode::LastRow); + memtable + .write(&build_key_values( + &memtable.region_metadata, + 0, + &[(1, 1.0, "a".to_string())], + )) + .unwrap(); + memtable + .write(&build_key_values( + &memtable.region_metadata, + 1, + &[(1, 2.0, "b".to_string())], + )) + .unwrap(); + let mut iter = memtable.iter(None, None, None).unwrap(); + let batch = iter.next().unwrap().unwrap(); + + assert_eq!(1, batch.num_rows()); // deduped to 1 row + assert_eq!(2.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap()); // last write wins + } + + #[test] + fn test_write_one() { + let memtable = new_test_memtable(false, MergeMode::LastRow); + let kvs = build_key_values(&memtable.region_metadata, 0, &[(1, 1.0, "a".to_string())]); + let kv = kvs.iter().next().unwrap(); + memtable.write_one(kv).unwrap(); + + 
let mut iter = memtable.iter(None, None, None).unwrap(); + let batch = iter.next().unwrap().unwrap(); + assert_eq!(1, batch.num_rows()); + } + + #[test] + fn test_write_bulk() { + let memtable = new_test_memtable(false, MergeMode::LastRow); + let arrow_schema = memtable.schema().schema.arrow_schema().clone(); + let arrays = vec![ + Arc::new(TimestampMillisecondArray::from(vec![1, 2])) as ArrayRef, + Arc::new(Float64Array::from(vec![1.0, 2.0])) as ArrayRef, + Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef, + ]; + let rb = DfRecordBatch::try_new(arrow_schema, arrays).unwrap(); + + let part = BulkPart { + batch: rb, + sequence: 1, + min_ts: 1, + max_ts: 2, + num_rows: 2, + }; + memtable.write_bulk(part).unwrap(); + + let mut iter = memtable.iter(None, None, None).unwrap(); + let batch = iter.next().unwrap().unwrap(); + assert_eq!(2, batch.num_rows()); + + let stats = memtable.stats(); + assert_eq!(1, stats.max_sequence); + assert_eq!(2, stats.num_rows); + assert_eq!( + Some((Timestamp::new_millisecond(1), Timestamp::new_millisecond(2))), + stats.time_range + ); + + let kvs = build_key_values(&memtable.region_metadata, 2, &[(3, 3.0, "c".to_string())]); + memtable.write(&kvs).unwrap(); + let mut iter = memtable.iter(None, None, None).unwrap(); + let batch = iter.next().unwrap().unwrap(); + assert_eq!(3, batch.num_rows()); + assert_eq!( + vec![1, 2, 3], + batch + .timestamps() + .as_any() + .downcast_ref::() + .unwrap() + .iter_data() + .map(|t| { t.unwrap().0.value() }) + .collect::>() + ); + } + + #[test] + fn test_is_empty() { + let memtable = new_test_memtable(false, MergeMode::LastRow); + assert!(memtable.is_empty()); + + memtable + .write(&build_key_values( + &memtable.region_metadata, + 0, + &[(1, 1.0, "a".to_string())], + )) + .unwrap(); + assert!(!memtable.is_empty()); + } + + #[test] + fn test_stats() { + let memtable = new_test_memtable(false, MergeMode::LastRow); + let stats = memtable.stats(); + assert_eq!(0, stats.num_rows); + assert!(stats.time_range.is_none()); + + memtable + .write(&build_key_values( + &memtable.region_metadata, + 0, + &[(1, 1.0, "a".to_string())], + )) + .unwrap(); + let stats = memtable.stats(); + assert_eq!(1, stats.num_rows); + assert!(stats.time_range.is_some()); + } + + #[test] + fn test_fork() { + let memtable = new_test_memtable(false, MergeMode::LastRow); + memtable + .write(&build_key_values( + &memtable.region_metadata, + 0, + &[(1, 1.0, "a".to_string())], + )) + .unwrap(); + + let forked = memtable.fork(2, &memtable.region_metadata); + assert!(forked.is_empty()); + } + + #[test] + fn test_sequence_filter() { + let memtable = new_test_memtable(false, MergeMode::LastRow); + memtable + .write(&build_key_values( + &memtable.region_metadata, + 0, + &[(1, 1.0, "a".to_string())], + )) + .unwrap(); + memtable + .write(&build_key_values( + &memtable.region_metadata, + 1, + &[(2, 2.0, "b".to_string())], + )) + .unwrap(); + + // Filter with sequence 0 should only return first write + let mut iter = memtable.iter(None, None, Some(0)).unwrap(); + let batch = iter.next().unwrap().unwrap(); + assert_eq!(1, batch.num_rows()); + assert_eq!(1.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap()); + } +} diff --git a/src/mito2/src/memtable/stats.rs b/src/mito2/src/memtable/stats.rs index d203da3194..2058b96bd1 100644 --- a/src/mito2/src/memtable/stats.rs +++ b/src/mito2/src/memtable/stats.rs @@ -14,6 +14,8 @@ //! Internal metrics of the memtable. +use store_api::storage::SequenceNumber; + /// Metrics of writing memtables. 
pub(crate) struct WriteMetrics { /// Size allocated by keys. @@ -24,6 +26,10 @@ pub(crate) struct WriteMetrics { pub(crate) min_ts: i64, /// Maximum timestamp pub(crate) max_ts: i64, + /// Rows written. + pub(crate) num_rows: usize, + /// Max sequence number written. + pub(crate) max_sequence: SequenceNumber, } impl Default for WriteMetrics { @@ -33,6 +39,8 @@ impl Default for WriteMetrics { value_bytes: 0, min_ts: i64::MAX, max_ts: i64::MIN, + num_rows: 0, + max_sequence: SequenceNumber::MIN, } } } diff --git a/src/mito2/src/memtable/time_partition.rs b/src/mito2/src/memtable/time_partition.rs index 4a49d9c031..3801855ee9 100644 --- a/src/mito2/src/memtable/time_partition.rs +++ b/src/mito2/src/memtable/time_partition.rs @@ -15,7 +15,7 @@ //! Partitions memtables by time. use std::collections::HashMap; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, MutexGuard}; use std::time::Duration; use common_telemetry::debug; @@ -26,7 +26,9 @@ use smallvec::{smallvec, SmallVec}; use snafu::OptionExt; use store_api::metadata::RegionMetadataRef; +use crate::error; use crate::error::{InvalidRequestSnafu, Result}; +use crate::memtable::bulk::part::BulkPart; use crate::memtable::key_values::KeyValue; use crate::memtable::version::SmallMemtableVec; use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef}; @@ -55,6 +57,11 @@ impl TimePartition { fn write(&self, kvs: &KeyValues) -> Result<()> { self.memtable.write(kvs) } + + /// Writes a record batch to memtable. + fn write_record_batch(&self, rb: BulkPart) -> error::Result<()> { + self.memtable.write_bulk(rb) + } } type PartitionVec = SmallVec<[TimePartition; 2]>; @@ -141,6 +148,101 @@ impl TimePartitions { self.write_multi_parts(kvs, &parts) } + pub fn write_bulk(&self, rb: BulkPart) -> Result<()> { + // Get all parts. + let parts = self.list_partitions(); + let mut matched = vec![]; + for part in &parts { + let Some(part_time_range) = part.time_range.as_ref() else { + matched.push(part); + continue; + }; + if !(rb.max_ts < part_time_range.min_timestamp.value() + || rb.min_ts >= part_time_range.max_timestamp.value()) + { + // find all intersecting time partitions. + matched.push(part); + } + } + + if !matched.is_empty() { + // fixme(hl): we now only write to the first time partition, we should strictly + // split the record batch according to time window + matched[0].write_record_batch(rb) + } else { + // safety: part_duration field must be set when reach here because otherwise + // matched won't be empty. + let part_duration = self.part_duration.unwrap(); + let bulk_start_ts = self + .metadata + .time_index_column() + .column_schema + .data_type + .as_timestamp() + .unwrap() + .create_timestamp(rb.min_ts); + let part_start = + partition_start_timestamp(bulk_start_ts, part_duration).with_context(|| { + InvalidRequestSnafu { + region_id: self.metadata.region_id, + reason: format!( + "timestamp {bulk_start_ts:?} and bucket {part_duration:?} are out of range" + ), + } + })?; + + let new_part = { + let mut inner = self.inner.lock().unwrap(); + self.create_time_partition(part_start, &mut inner)? + }; + new_part.memtable.write_bulk(rb) + } + } + + // Creates new parts and return the partition created. + // Acquires the lock to avoid others create the same partition. 
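+    // If a partition whose range starts at `part_start` already exists, it is
+    // returned directly instead of being created twice.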
+ fn create_time_partition( + &self, + part_start: Timestamp, + inner: &mut MutexGuard, + ) -> Result { + let part_duration = self.part_duration.unwrap(); + let part_pos = match inner + .parts + .iter() + .position(|part| part.time_range.unwrap().min_timestamp == part_start) + { + Some(pos) => pos, + None => { + let range = PartTimeRange::from_start_duration(part_start, part_duration) + .with_context(|| InvalidRequestSnafu { + region_id: self.metadata.region_id, + reason: format!( + "Partition time range for {part_start:?} is out of bound, bucket size: {part_duration:?}", + ), + })?; + let memtable = self + .builder + .build(inner.alloc_memtable_id(), &self.metadata); + debug!( + "Create time partition {:?} for region {}, duration: {:?}, memtable_id: {}, parts_total: {}", + range, + self.metadata.region_id, + part_duration, + memtable.id(), + inner.parts.len() + 1 + ); + let pos = inner.parts.len(); + inner.parts.push(TimePartition { + memtable, + time_range: Some(range), + }); + pos + } + }; + Ok(inner.parts[part_pos].clone()) + } + /// Append memtables in partitions to `memtables`. pub fn list_memtables(&self, memtables: &mut Vec) { let inner = self.inner.lock().unwrap(); @@ -330,48 +432,13 @@ impl TimePartitions { } } - let part_duration = self.part_duration.unwrap(); // Creates new parts and writes to them. Acquires the lock to avoid others create // the same partition. let mut inner = self.inner.lock().unwrap(); for (part_start, key_values) in missing_parts { - let part_pos = match inner - .parts - .iter() - .position(|part| part.time_range.unwrap().min_timestamp == part_start) - { - Some(pos) => pos, - None => { - let range = PartTimeRange::from_start_duration(part_start, part_duration) - .with_context(|| InvalidRequestSnafu { - region_id: self.metadata.region_id, - reason: format!( - "Partition time range for {part_start:?} is out of bound, bucket size: {part_duration:?}", - ), - })?; - let memtable = self - .builder - .build(inner.alloc_memtable_id(), &self.metadata); - debug!( - "Create time partition {:?} for region {}, duration: {:?}, memtable_id: {}, parts_total: {}", - range, - self.metadata.region_id, - part_duration, - memtable.id(), - inner.parts.len() + 1 - ); - let pos = inner.parts.len(); - inner.parts.push(TimePartition { - memtable, - time_range: Some(range), - }); - pos - } - }; - - let memtable = &inner.parts[part_pos].memtable; + let partition = self.create_time_partition(part_start, &mut inner)?; for kv in key_values { - memtable.write_one(kv)?; + partition.memtable.write_one(kv)?; } } diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index 751e0cdc5d..9743ac1843 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -15,6 +15,7 @@ use std::collections::btree_map::Entry; use std::collections::{BTreeMap, Bound, HashSet}; use std::fmt::{Debug, Formatter}; +use std::iter; use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; @@ -25,28 +26,30 @@ use common_telemetry::{debug, error}; use common_time::Timestamp; use datatypes::arrow; use datatypes::arrow::array::ArrayRef; +use datatypes::arrow_array::StringArray; use datatypes::data_type::{ConcreteDataType, DataType}; -use datatypes::prelude::{MutableVector, Vector, VectorRef}; +use datatypes::prelude::{ScalarVector, Vector, VectorRef}; use datatypes::types::TimestampType; use datatypes::value::{Value, ValueRef}; use datatypes::vectors::{ Helper, 
TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, UInt64Vector, UInt8Vector, }; -use snafu::{ensure, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; use store_api::storage::{ColumnId, SequenceNumber}; use table::predicate::Predicate; -use crate::error::{ - ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result, - UnsupportedOperationSnafu, -}; +use crate::error; +use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result}; use crate::flush::WriteBufferManagerRef; +use crate::memtable::builder::{FieldBuilder, StringBuilder}; +use crate::memtable::bulk::part::BulkPart; use crate::memtable::key_values::KeyValue; +use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable; use crate::memtable::stats::WriteMetrics; use crate::memtable::{ - AllocTracker, BoxedBatchIterator, BulkPart, IterBuilder, KeyValues, Memtable, MemtableBuilder, + AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, PredicateGroup, }; @@ -57,7 +60,7 @@ use crate::region::options::MergeMode; use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt}; /// Initial vector builder capacity. -const INITIAL_BUILDER_CAPACITY: usize = 16; +const INITIAL_BUILDER_CAPACITY: usize = 1024 * 8; /// Vector builder capacity. const BUILDER_CAPACITY: usize = 512; @@ -87,13 +90,23 @@ impl TimeSeriesMemtableBuilder { impl MemtableBuilder for TimeSeriesMemtableBuilder { fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef { - Arc::new(TimeSeriesMemtable::new( - metadata.clone(), - id, - self.write_buffer_manager.clone(), - self.dedup, - self.merge_mode, - )) + if metadata.primary_key.is_empty() { + Arc::new(SimpleBulkMemtable::new( + id, + metadata.clone(), + self.write_buffer_manager.clone(), + self.dedup, + self.merge_mode, + )) + } else { + Arc::new(TimeSeriesMemtable::new( + metadata.clone(), + id, + self.write_buffer_manager.clone(), + self.dedup, + self.merge_mode, + )) + } } } @@ -149,6 +162,9 @@ impl TimeSeriesMemtable { .on_allocation(stats.key_bytes + stats.value_bytes); self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst); self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst); + self.max_sequence + .fetch_max(stats.max_sequence, Ordering::SeqCst); + self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst); } fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> { @@ -198,17 +214,12 @@ impl Memtable for TimeSeriesMemtable { } local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::(); local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::(); - + local_stats.max_sequence = kvs.max_sequence(); + local_stats.num_rows = kvs.num_rows(); // TODO(hl): this maybe inaccurate since for-iteration may return early. // We may lift the primary key length check out of Memtable::write // so that we can ensure writing to memtable will succeed. 
self.update_stats(local_stats); - - // update max_sequence - let sequence = kvs.max_sequence(); - self.max_sequence.fetch_max(sequence, Ordering::Relaxed); - - self.num_rows.fetch_add(kvs.num_rows(), Ordering::Relaxed); Ok(()) } @@ -216,24 +227,31 @@ impl Memtable for TimeSeriesMemtable { let mut metrics = WriteMetrics::default(); let res = self.write_key_value(key_value, &mut metrics); metrics.value_bytes += std::mem::size_of::() + std::mem::size_of::(); + metrics.max_sequence = key_value.sequence(); + metrics.num_rows = 1; - self.update_stats(metrics); - - // update max_sequence if res.is_ok() { - self.max_sequence - .fetch_max(key_value.sequence(), Ordering::Relaxed); + self.update_stats(metrics); } - - self.num_rows.fetch_add(1, Ordering::Relaxed); res } - fn write_bulk(&self, _part: BulkPart) -> Result<()> { - UnsupportedOperationSnafu { - err_msg: "TimeSeriesMemtable does not support write_bulk", + fn write_bulk(&self, part: BulkPart) -> Result<()> { + // Default implementation fallback to row iteration. + let mutation = part.to_mutation(&self.region_metadata)?; + let mut metrics = WriteMetrics::default(); + if let Some(key_values) = KeyValues::new(&self.region_metadata, mutation) { + for kv in key_values.iter() { + self.write_key_value(kv, &mut metrics)? + } } - .fail() + + metrics.max_sequence = part.sequence; + metrics.max_ts = part.max_ts; + metrics.min_ts = part.min_ts; + metrics.num_rows = part.num_rows; + self.update_stats(metrics); + Ok(()) } fn iter( @@ -268,7 +286,7 @@ impl Memtable for TimeSeriesMemtable { projection: Option<&[ColumnId]>, predicate: PredicateGroup, sequence: Option, - ) -> MemtableRanges { + ) -> Result { let projection = if let Some(projection) = projection { projection.iter().copied().collect() } else { @@ -287,10 +305,10 @@ impl Memtable for TimeSeriesMemtable { }); let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate)); - MemtableRanges { + Ok(MemtableRanges { ranges: [(0, MemtableRange::new(context))].into(), stats: self.stats(), - } + }) } fn is_empty(&self) -> bool { @@ -349,10 +367,10 @@ impl Memtable for TimeSeriesMemtable { type SeriesRwLockMap = RwLock, Arc>>>; #[derive(Clone)] -struct SeriesSet { - region_metadata: RegionMetadataRef, - series: Arc, - codec: Arc, +pub(crate) struct SeriesSet { + pub(crate) region_metadata: RegionMetadataRef, + pub(crate) series: Arc, + pub(crate) codec: Arc, } impl SeriesSet { @@ -637,7 +655,7 @@ fn prune_primary_key( } /// A `Series` holds a list of field values of some given primary key. -struct Series { +pub(crate) struct Series { pk_cache: Option>, active: ValueBuilder, frozen: Vec, @@ -645,7 +663,7 @@ struct Series { } impl Series { - fn new(region_metadata: &RegionMetadataRef) -> Self { + pub(crate) fn new(region_metadata: &RegionMetadataRef) -> Self { Self { pk_cache: None, active: ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY), @@ -654,8 +672,12 @@ impl Series { } } + pub fn is_empty(&self) -> bool { + self.active.len() == 0 && self.frozen.is_empty() + } + /// Pushes a row of values into Series. Return the size of values. - fn push<'a>( + pub(crate) fn push<'a>( &mut self, ts: ValueRef<'a>, sequence: u64, @@ -675,7 +697,7 @@ impl Series { } /// Freezes the active part and push it to `frozen`. 
 
     /// Freezes the active part and pushes it to `frozen`.
-    fn freeze(&mut self, region_metadata: &RegionMetadataRef) {
+    pub(crate) fn freeze(&mut self, region_metadata: &RegionMetadataRef) {
         if self.active.len() != 0 {
             let mut builder = ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY);
             std::mem::swap(&mut self.active, &mut builder);
@@ -683,9 +705,19 @@ impl Series {
         }
     }
 
+    pub(crate) fn extend(
+        &mut self,
+        ts_v: VectorRef,
+        op_type_v: u8,
+        sequence_v: u64,
+        fields: impl Iterator<Item = VectorRef>,
+    ) -> Result<()> {
+        self.active.extend(ts_v, op_type_v, sequence_v, fields)
+    }
+
     /// Freezes active part to frozen part and compact frozen part to reduce memory fragmentation.
     /// Returns the frozen and compacted values.
-    fn compact(&mut self, region_metadata: &RegionMetadataRef) -> Result<&Values> {
+    pub(crate) fn compact(&mut self, region_metadata: &RegionMetadataRef) -> Result<&Values> {
         self.freeze(region_metadata);
 
         let frozen = &self.frozen;
@@ -729,12 +761,12 @@ struct ValueBuilder {
     timestamp_type: ConcreteDataType,
     sequence: Vec<u64>,
     op_type: Vec<u8>,
-    fields: Vec<Option<Box<dyn MutableVector>>>,
+    fields: Vec<Option<FieldBuilder>>,
     field_types: Vec<ConcreteDataType>,
 }
 
 impl ValueBuilder {
-    fn new(region_metadata: &RegionMetadataRef, capacity: usize) -> Self {
+    pub(crate) fn new(region_metadata: &RegionMetadataRef, capacity: usize) -> Self {
         let timestamp_type = region_metadata
             .time_index_column()
             .column_schema
@@ -788,12 +820,19 @@ impl ValueBuilder {
             size += field_value.data_size();
             if !field_value.is_null() || self.fields[idx].is_some() {
                 if let Some(field) = self.fields[idx].as_mut() {
-                    let _ = field.try_push_value_ref(field_value);
+                    let _ = field.push(field_value);
                 } else {
-                    let mut mutable_vector = self.field_types[idx]
-                        .create_mutable_vector(num_rows.max(INITIAL_BUILDER_CAPACITY));
+                    let mut mutable_vector =
+                        if let ConcreteDataType::String(_) = &self.field_types[idx] {
+                            FieldBuilder::String(StringBuilder::with_capacity(256, 4096))
+                        } else {
+                            FieldBuilder::Other(
+                                self.field_types[idx]
+                                    .create_mutable_vector(num_rows.max(INITIAL_BUILDER_CAPACITY)),
+                            )
+                        };
                     mutable_vector.push_nulls(num_rows - 1);
-                    let _ = mutable_vector.try_push_value_ref(field_value);
+                    let _ = mutable_vector.push(field_value);
                     self.fields[idx] = Some(mutable_vector);
                 }
             }
@@ -802,6 +841,96 @@ impl ValueBuilder {
         size
     }
 
+    pub(crate) fn extend(
+        &mut self,
+        ts_v: VectorRef,
+        op_type: u8,
+        sequence: u64,
+        fields: impl Iterator<Item = VectorRef>,
+    ) -> error::Result<()> {
+        let num_rows_before = self.timestamp.len();
+        let num_rows_to_write = ts_v.len();
+        self.timestamp.reserve(num_rows_to_write);
+        match self.timestamp_type {
+            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
+                self.timestamp.extend(
+                    ts_v.as_any()
+                        .downcast_ref::<TimestampSecondVector>()
+                        .unwrap()
+                        .iter_data()
+                        .map(|v| v.unwrap().0.value()),
+                );
+            }
+            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
+                self.timestamp.extend(
+                    ts_v.as_any()
+                        .downcast_ref::<TimestampMillisecondVector>()
+                        .unwrap()
+                        .iter_data()
+                        .map(|v| v.unwrap().0.value()),
+                );
+            }
+            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
+                self.timestamp.extend(
+                    ts_v.as_any()
+                        .downcast_ref::<TimestampMicrosecondVector>()
+                        .unwrap()
+                        .iter_data()
+                        .map(|v| v.unwrap().0.value()),
+                );
+            }
+            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
+                self.timestamp.extend(
+                    ts_v.as_any()
+                        .downcast_ref::<TimestampNanosecondVector>()
+                        .unwrap()
+                        .iter_data()
+                        .map(|v| v.unwrap().0.value()),
+                );
+            }
+            _ => unreachable!(),
+        };
+
+        self.op_type.reserve(num_rows_to_write);
+        self.op_type
+            .extend(iter::repeat_n(op_type, num_rows_to_write));
+        self.sequence.reserve(num_rows_to_write);
+        self.sequence
+            .extend(iter::repeat_n(sequence, num_rows_to_write));
+
+        for (field_idx, (field_src, field_dest)) in fields.zip(self.fields.iter_mut()).enumerate() {
+            let builder = field_dest.get_or_insert_with(|| {
+                let mut field_builder =
+                    FieldBuilder::create(&self.field_types[field_idx], INITIAL_BUILDER_CAPACITY);
+                field_builder.push_nulls(num_rows_before);
+                field_builder
+            });
+            match builder {
+                FieldBuilder::String(builder) => {
+                    let array = field_src.to_arrow_array();
+                    let string_array = array
+                        .as_any()
+                        .downcast_ref::<StringArray>()
+                        .with_context(|| error::InvalidBatchSnafu {
+                            reason: format!(
+                                "Field type mismatch, expecting String, given: {}",
+                                field_src.data_type()
+                            ),
+                        })?;
+                    builder.append_array(string_array);
+                }
+                FieldBuilder::Other(builder) => {
+                    let len = field_src.len();
+                    builder
+                        .extend_slice_of(&*field_src, 0, len)
+                        .context(error::ComputeVectorSnafu)?;
+                }
+            }
+        }
+        Ok(())
+    }
+
     /// Returns the length of [ValueBuilder]
     fn len(&self) -> usize {
         let sequence_len = self.sequence.len();
@@ -813,7 +942,7 @@ impl ValueBuilder {
 
 /// [Values] holds immutable vectors of field columns, including `sequence` and `op_type`.
 #[derive(Clone)]
-struct Values {
+pub(crate) struct Values {
     timestamp: VectorRef,
     sequence: Arc<UInt64Vector>,
     op_type: Arc<UInt8Vector>,
@@ -891,7 +1020,7 @@ impl From<ValueBuilder> for Values {
             .enumerate()
             .map(|(i, v)| {
                 if let Some(v) = v {
-                    v.to_vector()
+                    v.finish()
                 } else {
                     let mut single_null = value.field_types[i].create_mutable_vector(num_rows);
                     single_null.push_nulls(num_rows);
@@ -899,6 +1028,7 @@ impl From<ValueBuilder> for Values {
                 }
             })
             .collect::<Vec<_>>();
+
         let sequence = Arc::new(UInt64Vector::from_vec(value.sequence));
         let op_type = Arc::new(UInt8Vector::from_vec(value.op_type));
         let timestamp: VectorRef = match value.timestamp_type {
diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs
index e702aa86f5..68d09f2947 100644
--- a/src/mito2/src/metrics.rs
+++ b/src/mito2/src/metrics.rs
@@ -392,6 +392,15 @@ lazy_static! {
         // 0.01 ~ 1000
         exponential_buckets(0.01, 10.0, 6).unwrap(),
     ).unwrap();
+
+    pub static ref REGION_WORKER_HANDLE_WRITE_ELAPSED: HistogramVec = register_histogram_vec!(
+        "greptime_region_worker_handle_write",
+        "elapsed time for handling writes in region worker loop",
+        &["stage"],
+        exponential_buckets(0.001, 10.0, 5).unwrap()
+    ).unwrap();
+
 }
 
 /// Stager notifier to collect metrics.
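The new histogram is observed through RAII timers at each stage of the write path. A standalone sketch of the same prometheus pattern (metric name and label here are illustrative, not GreptimeDB's registry):

```rust
use prometheus::{exponential_buckets, register_histogram_vec, HistogramVec};

fn main() {
    // Same bucket layout as above: 1ms up to 10s in powers of ten.
    let hist: HistogramVec = register_histogram_vec!(
        "demo_handle_write",
        "elapsed time per stage",
        &["stage"],
        exponential_buckets(0.001, 10.0, 5).unwrap()
    )
    .unwrap();

    let timer = hist.with_label_values(&["write_bulk"]).start_timer();
    // ... do some work ...
    timer.observe_duration(); // or simply drop the timer to record the elapsed time
}
```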
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index dd40cfc622..a35dd00e1b 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -371,14 +371,14 @@ impl ScanRegion {
         let memtables = memtables
             .into_iter()
             .map(|mem| {
-                let ranges = mem.ranges(
+                mem.ranges(
                     Some(mapper.column_ids()),
                     predicate.clone(),
                     self.request.sequence,
-                );
-                MemRangeBuilder::new(ranges)
+                )
+                .map(MemRangeBuilder::new)
             })
-            .collect();
+            .collect::<Result<Vec<_>>>()?;
 
         let input = ScanInput::new(self.access_layer, mapper)
             .with_time_range(Some(time_range))
diff --git a/src/mito2/src/region_write_ctx.rs b/src/mito2/src/region_write_ctx.rs
index 2a1f935245..ea0b049faa 100644
--- a/src/mito2/src/region_write_ctx.rs
+++ b/src/mito2/src/region_write_ctx.rs
@@ -23,7 +23,9 @@ use store_api::logstore::LogStore;
 use store_api::storage::{RegionId, SequenceNumber};
 
 use crate::error::{Error, Result, WriteGroupSnafu};
+use crate::memtable::bulk::part::BulkPart;
 use crate::memtable::KeyValues;
+use crate::metrics;
 use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
 use crate::request::OptionOutputTx;
 use crate::wal::{EntryId, WalWriter};
@@ -92,6 +94,10 @@ pub(crate) struct RegionWriteCtx {
     ///
     /// The i-th notify is for i-th mutation.
     notifiers: Vec<WriteNotify>,
+    /// Notifiers for bulk requests.
+    bulk_notifiers: Vec<WriteNotify>,
+    /// Pending bulk write requests.
+    pub(crate) bulk_parts: Vec<BulkPart>,
     /// The write operation is failed and we should not write to the mutable memtable.
     failed: bool,
@@ -125,9 +131,11 @@ impl RegionWriteCtx {
             wal_entry: WalEntry::default(),
             provider,
             notifiers: Vec::new(),
+            bulk_notifiers: vec![],
             failed: false,
             put_num: 0,
             delete_num: 0,
+            bulk_parts: vec![],
         }
     }
@@ -243,4 +251,53 @@ impl RegionWriteCtx {
         self.version_control
             .set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1);
     }
+
+    pub(crate) fn push_bulk(&mut self, sender: OptionOutputTx, mut bulk: BulkPart) {
+        self.bulk_notifiers
+            .push(WriteNotify::new(sender, bulk.num_rows));
+        bulk.sequence = self.next_sequence;
+        self.next_sequence += bulk.num_rows as u64;
+        self.bulk_parts.push(bulk);
+    }
+
+    pub(crate) async fn write_bulk(&mut self) {
+        if self.failed || self.bulk_parts.is_empty() {
+            return;
+        }
+        let _timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
+            .with_label_values(&["write_bulk"])
+            .start_timer();
+
+        if self.bulk_parts.len() == 1 {
+            let part = self.bulk_parts.swap_remove(0);
+            let num_rows = part.num_rows;
+            if let Err(e) = self.version.memtables.mutable.write_bulk(part) {
+                self.bulk_notifiers[0].err = Some(Arc::new(e));
+            } else {
+                self.put_num += num_rows;
+            }
+            return;
+        }
+
+        let mut tasks = FuturesUnordered::new();
+        for (i, part) in self.bulk_parts.drain(..).enumerate() {
+            let mutable = self.version.memtables.mutable.clone();
+            tasks.push(common_runtime::spawn_blocking_global(move || {
+                let num_rows = part.num_rows;
+                (i, mutable.write_bulk(part), num_rows)
+            }));
+        }
+        while let Some(result) = tasks.next().await {
+            // First unwrap the result of `spawn` above.
+            let (i, result, num_rows) = result.unwrap();
+            if let Err(err) = result {
+                self.bulk_notifiers[i].err = Some(Arc::new(err));
+            } else {
+                self.put_num += num_rows;
+            }
+        }
+
+        self.version_control
+            .set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1);
+    }
 }
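`write_bulk` fans memtable writes out to blocking threads and harvests completions in arrival order. A runnable sketch of the same pattern with plain tokio (`common_runtime::spawn_blocking_global` is GreptimeDB's wrapper around this idea):

```rust
use futures::stream::{FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    let mut tasks = FuturesUnordered::new();
    for (i, part_rows) in [100usize, 50, 75].into_iter().enumerate() {
        tasks.push(tokio::task::spawn_blocking(move || {
            // Simulate a CPU-bound memtable write; return index and row count.
            (i, part_rows)
        }));
    }

    let mut total = 0;
    while let Some(joined) = tasks.next().await {
        // First unwrap the JoinHandle result, then use the task's output.
        let (i, rows) = joined.expect("task panicked");
        println!("part {i} wrote {rows} rows");
        total += rows;
    }
    assert_eq!(total, 225);
}
```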
diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs
index 9332deb8fc..5910e78485 100644
--- a/src/mito2/src/request.rs
+++ b/src/mito2/src/request.rs
@@ -47,6 +47,7 @@ use crate::error::{
     FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedImpureDefaultSnafu,
 };
 use crate::manifest::action::RegionEdit;
+use crate::memtable::bulk::part::BulkPart;
 use crate::memtable::MemtableId;
 use crate::metrics::COMPACTION_ELAPSED_TOTAL;
 use crate::wal::entry_distributor::WalEntryReceiver;
@@ -534,6 +535,13 @@ pub(crate) struct SenderWriteRequest {
     pub(crate) request: WriteRequest,
 }
 
+pub(crate) struct SenderBulkRequest {
+    pub(crate) sender: OptionOutputTx,
+    pub(crate) region_id: RegionId,
+    pub(crate) request: BulkPart,
+    pub(crate) region_metadata: RegionMetadataRef,
+}
+
 /// Request sent to a worker
 #[derive(Debug)]
 pub(crate) enum WorkerRequest {
diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs
index d8064ba86b..2d118b17da 100644
--- a/src/mito2/src/test_util/memtable_util.rs
+++ b/src/mito2/src/test_util/memtable_util.rs
@@ -30,10 +30,11 @@ use store_api::storage::{ColumnId, RegionId, SequenceNumber};
 use table::predicate::Predicate;
 
 use crate::error::Result;
+use crate::memtable::bulk::part::BulkPart;
 use crate::memtable::key_values::KeyValue;
 use crate::memtable::partition_tree::data::{timestamp_array_to_i64_slice, DataBatch, DataBuffer};
 use crate::memtable::{
-    BoxedBatchIterator, BulkPart, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRanges,
+    BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRanges,
     MemtableRef, MemtableStats,
 };
 use crate::read::scan_region::PredicateGroup;
@@ -95,8 +96,8 @@ impl Memtable for EmptyMemtable {
         _projection: Option<&[ColumnId]>,
         _predicate: PredicateGroup,
         _sequence: Option<SequenceNumber>,
-    ) -> MemtableRanges {
-        MemtableRanges::default()
+    ) -> Result<MemtableRanges> {
+        Ok(MemtableRanges::default())
     }
 
     fn is_empty(&self) -> bool {
diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs
index 6614bcc749..9cc81da453 100644
--- a/src/mito2/src/worker.rs
+++ b/src/mito2/src/worker.rs
@@ -61,7 +61,8 @@ use crate::memtable::MemtableBuilderProvider;
 use crate::metrics::{REGION_COUNT, WRITE_STALL_TOTAL};
 use crate::region::{MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap, RegionMapRef};
 use crate::request::{
-    BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest,
+    BackgroundNotify, DdlRequest, SenderBulkRequest, SenderDdlRequest, SenderWriteRequest,
+    WorkerRequest,
 };
 use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
 use crate::sst::file::FileId;
@@ -593,22 +594,39 @@ pub(crate) struct StalledRequests {
     ///
     /// Key: RegionId
-    /// Value: (estimated size, stalled requests)
-    pub(crate) requests: HashMap<RegionId, (usize, Vec<SenderWriteRequest>)>,
+    /// Value: (estimated size, stalled write requests, stalled bulk requests)
+    pub(crate) requests:
+        HashMap<RegionId, (usize, Vec<SenderWriteRequest>, Vec<SenderBulkRequest>)>,
     /// Estimated size of all stalled requests.
     pub(crate) estimated_size: usize,
 }
 
 impl StalledRequests {
     /// Appends stalled requests.
-    pub(crate) fn append(&mut self, requests: &mut Vec<SenderWriteRequest>) {
+    pub(crate) fn append(
+        &mut self,
+        requests: &mut Vec<SenderWriteRequest>,
+        bulk_requests: &mut Vec<SenderBulkRequest>,
+    ) {
         for req in requests.drain(..) {
             self.push(req);
         }
+        for req in bulk_requests.drain(..) {
+            self.push_bulk(req);
+        }
     }
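The stalled-request buffer keeps a per-region size plus a global `estimated_size`; both must move together on every push and remove or the back-pressure accounting drifts. A stripped-down sketch of that bookkeeping invariant (request sizes stand in for real requests):

```rust
use std::collections::HashMap;

#[derive(Default)]
struct Stalled {
    // region id -> (bytes accounted for that region, queued request sizes)
    requests: HashMap<u64, (usize, Vec<usize>)>,
    estimated_size: usize,
}

impl Stalled {
    fn push(&mut self, region: u64, req_size: usize) {
        let (size, queue) = self.requests.entry(region).or_default();
        *size += req_size;
        self.estimated_size += req_size;
        queue.push(req_size);
    }

    fn remove(&mut self, region: u64) -> Vec<usize> {
        if let Some((size, queue)) = self.requests.remove(&region) {
            // Subtract exactly what was accounted on push.
            self.estimated_size -= size;
            queue
        } else {
            vec![]
        }
    }
}
```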
     /// Pushes a stalled request to the buffer.
     pub(crate) fn push(&mut self, req: SenderWriteRequest) {
-        let (size, requests) = self.requests.entry(req.request.region_id).or_default();
+        let (size, requests, _) = self.requests.entry(req.request.region_id).or_default();
+        let req_size = req.request.estimated_size();
+        *size += req_size;
+        self.estimated_size += req_size;
+        requests.push(req);
+    }
+
+    /// Pushes a stalled bulk request to the buffer.
+    pub(crate) fn push_bulk(&mut self, req: SenderBulkRequest) {
+        let region_id = req.region_id;
+        let (size, _, requests) = self.requests.entry(region_id).or_default();
         let req_size = req.request.estimated_size();
         *size += req_size;
         self.estimated_size += req_size;
         requests.push(req);
@@ -616,18 +634,24 @@ impl StalledRequests {
     }
 
     /// Removes stalled requests of specific region.
-    pub(crate) fn remove(&mut self, region_id: &RegionId) -> Vec<SenderWriteRequest> {
-        if let Some((size, requests)) = self.requests.remove(region_id) {
+    pub(crate) fn remove(
+        &mut self,
+        region_id: &RegionId,
+    ) -> (Vec<SenderWriteRequest>, Vec<SenderBulkRequest>) {
+        if let Some((size, write_reqs, bulk_reqs)) = self.requests.remove(region_id) {
             self.estimated_size -= size;
-            requests
+            (write_reqs, bulk_reqs)
         } else {
-            vec![]
+            (vec![], vec![])
         }
     }
 
     /// Returns the total number of all stalled requests.
     pub(crate) fn stalled_count(&self) -> usize {
-        self.requests.values().map(|reqs| reqs.1.len()).sum()
+        self.requests
+            .values()
+            .map(|(_, reqs, bulk_reqs)| reqs.len() + bulk_reqs.len())
+            .sum()
     }
 }
 
@@ -704,6 +728,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
         // Buffer to retrieve requests from receiver.
         let mut write_req_buffer: Vec<SenderWriteRequest> =
             Vec::with_capacity(self.config.worker_request_batch_size);
+        let mut bulk_req_buffer: Vec<SenderBulkRequest> =
+            Vec::with_capacity(self.config.worker_request_batch_size);
         let mut ddl_req_buffer: Vec<SenderDdlRequest> =
             Vec::with_capacity(self.config.worker_request_batch_size);
         let mut general_req_buffer: Vec<WorkerRequest> =
@@ -782,6 +808,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
                 &mut write_req_buffer,
                 &mut ddl_req_buffer,
                 &mut general_req_buffer,
+                &mut bulk_req_buffer,
             )
             .await;
 
@@ -801,6 +828,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
         write_requests: &mut Vec<SenderWriteRequest>,
         ddl_requests: &mut Vec<SenderDdlRequest>,
         general_requests: &mut Vec<WorkerRequest>,
+        bulk_requests: &mut Vec<SenderBulkRequest>,
     ) {
         for worker_req in general_requests.drain(..) {
             match worker_req {
@@ -835,8 +863,13 @@ impl<S: LogStore> RegionWorkerLoop<S> {
                     sender,
                 } => {
                     if let Some(region_metadata) = metadata {
-                        self.handle_bulk_insert(request, region_metadata, write_requests, sender)
-                            .await;
+                        self.handle_bulk_insert_batch(
+                            region_metadata,
+                            request,
+                            bulk_requests,
+                            sender,
+                        )
+                        .await;
                     } else {
                         error!("Cannot find region metadata for {}", request.region_id);
                         sender.send(
@@ -852,7 +885,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
         }
 
         // Handles all write requests first. So we can alter regions without
         // considering existing write requests.
-        self.handle_write_requests(write_requests, true).await;
+        self.handle_write_requests(write_requests, bulk_requests, true)
+            .await;
 
         self.handle_ddl_requests(ddl_requests).await;
     }
diff --git a/src/mito2/src/worker/handle_bulk_insert.rs b/src/mito2/src/worker/handle_bulk_insert.rs
index d54ba6b7ed..cdc40d184e 100644
--- a/src/mito2/src/worker/handle_bulk_insert.rs
+++ b/src/mito2/src/worker/handle_bulk_insert.rs
@@ -14,339 +14,113 @@
 
 //! Handles bulk insert requests.
 
-use std::collections::HashMap;
-
-use api::helper::{value_to_grpc_value, ColumnDataTypeWrapper};
-use api::v1::{ColumnSchema, OpType, Row, Rows};
-use common_base::AffectedRows;
-use common_recordbatch::DfRecordBatch;
-use datatypes::prelude::VectorRef;
-use datatypes::vectors::Helper;
-use snafu::ResultExt;
+use datatypes::arrow;
+use datatypes::arrow::array::{
+    TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
+    TimestampSecondArray,
+};
+use datatypes::arrow::datatypes::{DataType, TimeUnit};
 use store_api::logstore::LogStore;
 use store_api::metadata::RegionMetadataRef;
-use store_api::region_request::{BulkInsertPayload, RegionBulkInsertsRequest};
-use tokio::sync::oneshot::Receiver;
+use store_api::region_request::RegionBulkInsertsRequest;
 
-use crate::error;
-use crate::request::{OptionOutputTx, SenderWriteRequest, WriteRequest};
+use crate::memtable::bulk::part::BulkPart;
+use crate::request::{OptionOutputTx, SenderBulkRequest};
 use crate::worker::RegionWorkerLoop;
+use crate::{error, metrics};
 
 impl<S: LogStore> RegionWorkerLoop<S> {
-    pub(crate) async fn handle_bulk_insert(
+    pub(crate) async fn handle_bulk_insert_batch(
         &mut self,
-        mut request: RegionBulkInsertsRequest,
         region_metadata: RegionMetadataRef,
-        pending_write_requests: &mut Vec<SenderWriteRequest>,
+        request: RegionBulkInsertsRequest,
+        pending_bulk_request: &mut Vec<SenderBulkRequest>,
         sender: OptionOutputTx,
     ) {
-        let (column_schemas, name_to_index) =
-            match region_metadata_to_column_schema(&region_metadata) {
-                Ok(schema) => schema,
-                Err(e) => {
-                    sender.send(Err(e));
-                    return;
-                }
-            };
+        let _timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
+            .with_label_values(&["process_bulk_req"])
+            .start_timer();
+        let batch = request.payload;
+        let num_rows = batch.num_rows();
 
-        // fast path: only one payload.
-        if request.payloads.len() == 1 {
-            match Self::handle_payload(
-                &region_metadata,
-                request.payloads.swap_remove(0),
-                pending_write_requests,
-                column_schemas,
-                name_to_index,
-            ) {
-                Ok(task_future) => common_runtime::spawn_global(async move {
-                    sender.send(task_future.await.context(error::RecvSnafu).flatten());
-                }),
-                Err(e) => {
-                    sender.send(Err(e));
-                    return;
-                }
-            };
-            return;
-        }
-
-        let mut pending_tasks = Vec::with_capacity(request.payloads.len());
-        for req in request.payloads {
-            match Self::handle_payload(
-                &region_metadata,
-                req,
-                pending_write_requests,
-                column_schemas.clone(),
-                name_to_index.clone(),
-            ) {
-                Ok(task_future) => {
-                    pending_tasks.push(task_future);
-                }
-                Err(e) => {
-                    sender.send(Err(e));
-                    return;
-                }
-            }
-        }
-
-        common_runtime::spawn_global(async move {
-            let results = match futures::future::try_join_all(pending_tasks).await {
-                Ok(results) => results,
-                Err(e) => {
-                    sender.send(Err(e).context(error::RecvSnafu));
-                    return;
-                }
-            };
+        let Some(ts) =
+            batch.column_by_name(&region_metadata.time_index_column().column_schema.name)
+        else {
             sender.send(
-                match results.into_iter().collect::<Result<Vec<_>>>() {
-                    Ok(results) => Ok(results.into_iter().sum()),
-                    Err(e) => Err(e),
-                },
+                error::InvalidRequestSnafu {
+                    region_id: region_metadata.region_id,
+                    reason: format!(
+                        "timestamp column `{}` not found",
+                        region_metadata.time_index_column().column_schema.name
+                    ),
+                }
+                .fail(),
             );
+            return;
+        };
+
+        let DataType::Timestamp(unit, _) = ts.data_type() else {
+            // Safety: the ts data type must be a timestamp type.
+            unreachable!()
+        };
+
+        let (min_ts, max_ts) = match unit {
+            TimeUnit::Second => {
+                let ts = ts.as_any().downcast_ref::<TimestampSecondArray>().unwrap();
+                (
+                    // Safety: the ts array contains at least one row, so this won't return None.
+                    arrow::compute::min(ts).unwrap(),
+                    arrow::compute::max(ts).unwrap(),
+                )
+            }
+
+            TimeUnit::Millisecond => {
+                let ts = ts
+                    .as_any()
+                    .downcast_ref::<TimestampMillisecondArray>()
+                    .unwrap();
+                (
+                    // Safety: the ts array contains at least one row, so this won't return None.
+                    arrow::compute::min(ts).unwrap(),
+                    arrow::compute::max(ts).unwrap(),
+                )
+            }
+            TimeUnit::Microsecond => {
+                let ts = ts
+                    .as_any()
+                    .downcast_ref::<TimestampMicrosecondArray>()
+                    .unwrap();
+                (
+                    // Safety: the ts array contains at least one row, so this won't return None.
+                    arrow::compute::min(ts).unwrap(),
+                    arrow::compute::max(ts).unwrap(),
+                )
+            }
+            TimeUnit::Nanosecond => {
+                let ts = ts
+                    .as_any()
+                    .downcast_ref::<TimestampNanosecondArray>()
+                    .unwrap();
+                (
+                    // Safety: the ts array contains at least one row, so this won't return None.
+                    arrow::compute::min(ts).unwrap(),
+                    arrow::compute::max(ts).unwrap(),
+                )
+            }
+        };
+
+        let part = BulkPart {
+            batch,
+            num_rows,
+            max_ts,
+            min_ts,
+            sequence: 0,
+        };
+        pending_bulk_request.push(SenderBulkRequest {
+            sender,
+            request: part,
+            region_id: request.region_id,
+            region_metadata,
         });
     }
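The per-part time range above comes from arrow's aggregate kernels, which return `None` only for empty or all-null arrays, hence the safety comments. A small runnable example of the same calls on a millisecond timestamp array (using the `arrow` crate directly; the patch goes through the `datatypes::arrow` re-export):

```rust
use arrow::array::TimestampMillisecondArray;
use arrow::compute;

fn main() {
    let ts = TimestampMillisecondArray::from(vec![
        1_700_000_000_300i64,
        1_700_000_000_100,
        1_700_000_000_200,
    ]);
    // min/max over a non-empty, non-null array always yield Some(value).
    assert_eq!(compute::min(&ts), Some(1_700_000_000_100));
    assert_eq!(compute::max(&ts), Some(1_700_000_000_300));
}
```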
-
-    fn handle_payload(
-        region_metadata: &RegionMetadataRef,
-        payload: BulkInsertPayload,
-        pending_write_requests: &mut Vec<SenderWriteRequest>,
-        column_schemas: Vec<ColumnSchema>,
-        name_to_index: HashMap<String, usize>,
-    ) -> error::Result<Receiver<Result<AffectedRows>>> {
-        let rx = match payload {
-            BulkInsertPayload::ArrowIpc(rb) => Self::handle_arrow_ipc(
-                region_metadata,
-                rb,
-                pending_write_requests,
-                column_schemas,
-                name_to_index,
-            ),
-            BulkInsertPayload::Rows { data, has_null } => Self::handle_rows(
-                region_metadata,
-                data,
-                column_schemas,
-                has_null,
-                pending_write_requests,
-                name_to_index,
-            ),
-        }?;
-
-        Ok(rx)
-    }
-
-    fn handle_arrow_ipc(
-        region_metadata: &RegionMetadataRef,
-        df_record_batch: DfRecordBatch,
-        pending_write_requests: &mut Vec<SenderWriteRequest>,
-        column_schemas: Vec<ColumnSchema>,
-        name_to_index: HashMap<String, usize>,
-    ) -> error::Result<Receiver<Result<AffectedRows>>> {
-        let has_null: Vec<_> = df_record_batch
-            .columns()
-            .iter()
-            .map(|c| c.null_count() > 0)
-            .collect();
-
-        let rows = record_batch_to_rows(region_metadata, &df_record_batch)?;
-
-        let write_request = WriteRequest {
-            region_id: region_metadata.region_id,
-            op_type: OpType::Put,
-            rows: Rows {
-                schema: column_schemas,
-                rows,
-            },
-            name_to_index,
-            has_null,
-            hint: None,
-            region_metadata: Some(region_metadata.clone()),
-        };
-
-        let (tx, rx) = tokio::sync::oneshot::channel();
-        let sender = OptionOutputTx::from(tx);
-        let req = SenderWriteRequest {
-            sender,
-            request: write_request,
-        };
-        pending_write_requests.push(req);
-        Ok(rx)
-    }
-
-    fn handle_rows(
-        region_metadata: &RegionMetadataRef,
-        rows: Vec<Row>,
-        column_schemas: Vec<ColumnSchema>,
-        has_null: Vec<bool>,
-        pending_write_requests: &mut Vec<SenderWriteRequest>,
-        name_to_index: HashMap<String, usize>,
-    ) -> error::Result<Receiver<Result<AffectedRows>>> {
-        let write_request = WriteRequest {
-            region_id: region_metadata.region_id,
-            op_type: OpType::Put,
-            rows: Rows {
-                schema: column_schemas,
-                rows,
-            },
-            name_to_index,
-            has_null,
-            hint: None,
-            region_metadata: Some(region_metadata.clone()),
-        };
-
-        let (tx, rx) = tokio::sync::oneshot::channel();
-        let sender = OptionOutputTx::from(tx);
-        let req = SenderWriteRequest {
-            sender,
-            request: write_request,
-        };
-        pending_write_requests.push(req);
-        Ok(rx)
-    }
-}
-
-fn region_metadata_to_column_schema(
-    region_meta: &RegionMetadataRef,
-) -> error::Result<(Vec<ColumnSchema>, HashMap<String, usize>)> {
-    let mut column_schemas = Vec::with_capacity(region_meta.column_metadatas.len());
-    let mut name_to_index = HashMap::with_capacity(region_meta.column_metadatas.len());
-
-    for (idx, c) in region_meta.column_metadatas.iter().enumerate() {
-        let wrapper = ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
-            .with_context(|_| error::ConvertDataTypeSnafu {
-                data_type: c.column_schema.data_type.clone(),
-            })?;
-        column_schemas.push(ColumnSchema {
-            column_name: c.column_schema.name.clone(),
-            datatype: wrapper.datatype() as i32,
-            semantic_type: c.semantic_type as i32,
-            ..Default::default()
-        });
-
-        name_to_index.insert(c.column_schema.name.clone(), idx);
-    }
-
-    Ok((column_schemas, name_to_index))
-}
-
-/// Convert [DfRecordBatch] to gRPC rows.
-fn record_batch_to_rows(
-    region_metadata: &RegionMetadataRef,
-    rb: &DfRecordBatch,
-) -> error::Result<Vec<Row>> {
-    let num_rows = rb.num_rows();
-    let mut rows = Vec::with_capacity(num_rows);
-    if num_rows == 0 {
-        return Ok(rows);
-    }
-    let vectors: Vec<Option<VectorRef>> = region_metadata
-        .column_metadatas
-        .iter()
-        .map(|c| {
-            rb.column_by_name(&c.column_schema.name)
-                .map(|column| Helper::try_into_vector(column).context(error::ConvertVectorSnafu))
-                .transpose()
-        })
-        .collect::<error::Result<_>>()?;
-
-    for row_idx in 0..num_rows {
-        let row = Row {
-            values: row_at(&vectors, row_idx),
-        };
-        rows.push(row);
-    }
-    Ok(rows)
-}
-
-fn row_at(vectors: &[Option<VectorRef>], row_idx: usize) -> Vec<api::v1::Value> {
-    let mut row = Vec::with_capacity(vectors.len());
-    for a in vectors {
-        let value = if let Some(a) = a {
-            value_to_grpc_value(a.get(row_idx))
-        } else {
-            api::v1::Value { value_data: None }
-        };
-        row.push(value)
-    }
-    row
-}
-
-#[cfg(test)]
-mod tests {
-    use std::sync::Arc;
-
-    use api::v1::SemanticType;
-    use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray};
-
-    use super::*;
-    use crate::test_util::meta_util::TestRegionMetadataBuilder;
-
-    fn build_record_batch(num_rows: usize) -> DfRecordBatch {
-        let region_metadata = Arc::new(TestRegionMetadataBuilder::default().build());
-        let schema = region_metadata.schema.arrow_schema().clone();
-        let values = (0..num_rows).map(|v| v as i64).collect::<Vec<_>>();
-        let ts_array = Arc::new(TimestampMillisecondArray::from_iter_values(values.clone()));
-        let k0_array = Arc::new(Int64Array::from_iter_values(values.clone()));
-        let v0_array = Arc::new(Int64Array::from_iter_values(values));
-        DfRecordBatch::try_new(schema, vec![ts_array, k0_array, v0_array]).unwrap()
-    }
-
-    #[test]
-    fn test_region_metadata_to_column_schema() {
-        let region_metadata = Arc::new(TestRegionMetadataBuilder::default().build());
-        let (result, _) = region_metadata_to_column_schema(&region_metadata).unwrap();
-        assert_eq!(result.len(), 3);
-
-        assert_eq!(result[0].column_name, "ts");
-        assert_eq!(result[0].semantic_type, SemanticType::Timestamp as i32);
-
-        assert_eq!(result[1].column_name, "k0");
-        assert_eq!(result[1].semantic_type, SemanticType::Tag as i32);
-
-        assert_eq!(result[2].column_name, "v0");
-        assert_eq!(result[2].semantic_type, SemanticType::Field as i32);
-    }
-
-    #[test]
-    fn test_record_batch_to_rows() {
-        // Create record batch
-        let region_metadata = Arc::new(TestRegionMetadataBuilder::default().build());
-        let record_batch = build_record_batch(10);
-        let rows = record_batch_to_rows(&region_metadata, &record_batch).unwrap();
-
-        assert_eq!(rows.len(), 10);
-        assert_eq!(rows[0].values.len(), 3);
-
-        for (row_idx, row) in rows.iter().enumerate().take(10) {
-            assert_eq!(
-                row.values[0].value_data.as_ref().unwrap(),
-                &api::v1::value::ValueData::TimestampMillisecondValue(row_idx as i64)
-            );
-        }
-    }
-
-    #[test]
-    fn test_record_batch_to_rows_schema_mismatch() {
-        let region_metadata = Arc::new(TestRegionMetadataBuilder::default().num_fields(2).build());
-        let record_batch = build_record_batch(1);
-
-        let rows = record_batch_to_rows(&region_metadata, &record_batch).unwrap();
-        assert_eq!(rows.len(), 1);
-
-        // Check first row
-        let row1 = &rows[0];
-        assert_eq!(row1.values.len(), 4);
-        assert_eq!(
-            row1.values[0].value_data.as_ref().unwrap(),
-            &api::v1::value::ValueData::TimestampMillisecondValue(0)
-        );
-        assert_eq!(
-            row1.values[1].value_data.as_ref().unwrap(),
-            &api::v1::value::ValueData::I64Value(0)
-        );
-        assert_eq!(
-            row1.values[2].value_data.as_ref().unwrap(),
-            &api::v1::value::ValueData::I64Value(0)
-        );
-
-        assert!(row1.values[3].value_data.is_none());
-    }
-}
diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs
index ca7499f326..7d471c6517 100644
--- a/src/mito2/src/worker/handle_flush.rs
+++ b/src/mito2/src/worker/handle_flush.rs
@@ -236,13 +236,14 @@ impl<S: LogStore> RegionWorkerLoop<S> {
         request.on_success();
 
         // Handle pending requests for the region.
-        if let Some((mut ddl_requests, mut write_requests)) =
+        if let Some((mut ddl_requests, mut write_requests, mut bulk_writes)) =
             self.flush_scheduler.on_flush_success(region_id)
         {
             // Perform DDLs first because they require empty memtables.
             self.handle_ddl_requests(&mut ddl_requests).await;
             // Handle pending write requests, we don't stall these requests.
-            self.handle_write_requests(&mut write_requests, false).await;
+            self.handle_write_requests(&mut write_requests, &mut bulk_writes, false)
+                .await;
         }
 
         // Handle stalled requests.
diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs
index b6e8783e1e..0f21820d64 100644
--- a/src/mito2/src/worker/handle_write.rs
+++ b/src/mito2/src/worker/handle_write.rs
@@ -26,10 +26,11 @@ use store_api::logstore::LogStore;
 use store_api::storage::RegionId;
 
 use crate::error::{InvalidRequestSnafu, RegionStateSnafu, RejectWriteSnafu, Result};
+use crate::metrics;
 use crate::metrics::{WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED};
 use crate::region::{RegionLeaderState, RegionRoleState};
 use crate::region_write_ctx::RegionWriteCtx;
-use crate::request::{SenderWriteRequest, WriteRequest};
+use crate::request::{SenderBulkRequest, SenderWriteRequest, WriteRequest};
 use crate::worker::RegionWorkerLoop;
 
 impl<S: LogStore> RegionWorkerLoop<S> {
@@ -37,9 +38,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
     pub(crate) async fn handle_write_requests(
         &mut self,
         write_requests: &mut Vec<SenderWriteRequest>,
+        bulk_requests: &mut Vec<SenderBulkRequest>,
         allow_stall: bool,
     ) {
-        if write_requests.is_empty() {
+        if write_requests.is_empty() && bulk_requests.is_empty() {
             return;
         }
 
@@ -48,7 +50,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
 
         if self.should_reject_write() {
             // The memory pressure is still too high, reject write requests.
-            reject_write_requests(write_requests);
+            reject_write_requests(write_requests, bulk_requests);
             // Also reject all stalled requests.
             self.reject_stalled_requests();
             return;
@@ -56,7 +58,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
 
         if self.write_buffer_manager.should_stall() && allow_stall {
             self.stalled_count.add(write_requests.len() as i64);
-            self.stalled_requests.append(write_requests);
+            self.stalled_requests.append(write_requests, bulk_requests);
             self.listener.on_write_stall();
             return;
         }
@@ -66,7 +68,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
             let _timer = WRITE_STAGE_ELAPSED
                 .with_label_values(&["prepare_ctx"])
                 .start_timer();
-            self.prepare_region_write_ctx(write_requests)
+            self.prepare_region_write_ctx(write_requests, bulk_requests)
         };
 
         // Write to WAL.
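`handle_write_requests` now gates row and bulk requests through the same back-pressure ladder: reject when memory is critically high, stall when the buffer asks for it (and stalling is allowed), otherwise proceed. A compact sketch of that decision order:

```rust
#[derive(Debug, PartialEq)]
enum WriteDecision {
    Reject,
    Stall,
    Proceed,
}

// Order matters: rejection is checked before stalling, mirroring the worker.
fn decide(should_reject: bool, should_stall: bool, allow_stall: bool) -> WriteDecision {
    if should_reject {
        WriteDecision::Reject
    } else if should_stall && allow_stall {
        WriteDecision::Stall
    } else {
        WriteDecision::Proceed
    }
}

fn main() {
    assert_eq!(decide(true, true, true), WriteDecision::Reject);
    // Replayed requests pass allow_stall = false, so they never re-stall.
    assert_eq!(decide(false, true, false), WriteDecision::Proceed);
}
```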
@@ -117,6 +119,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
             // fast path for single region.
             let mut region_ctx = region_ctxs.into_values().next().unwrap();
             region_ctx.write_memtable().await;
+            region_ctx.write_bulk().await;
             put_rows += region_ctx.put_num;
             delete_rows += region_ctx.delete_num;
         } else {
@@ -126,6 +129,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
                     // use tokio runtime to schedule tasks.
                     common_runtime::spawn_global(async move {
                         region_ctx.write_memtable().await;
+                        region_ctx.write_bulk().await;
                         (region_ctx.put_num, region_ctx.delete_num)
                     })
                 })
@@ -158,8 +162,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
         let stalled = std::mem::take(&mut self.stalled_requests);
         self.stalled_count.sub(stalled.stalled_count() as i64);
         // We already stalled these requests, don't stall them again.
-        for (_, (_, mut requests)) in stalled.requests {
-            self.handle_write_requests(&mut requests, false).await;
+        for (_, (_, mut requests, mut bulk)) in stalled.requests {
+            self.handle_write_requests(&mut requests, &mut bulk, false)
+                .await;
         }
     }
 
@@ -167,25 +172,26 @@ impl<S: LogStore> RegionWorkerLoop<S> {
     pub(crate) fn reject_stalled_requests(&mut self) {
         let stalled = std::mem::take(&mut self.stalled_requests);
         self.stalled_count.sub(stalled.stalled_count() as i64);
-        for (_, (_, mut requests)) in stalled.requests {
-            reject_write_requests(&mut requests);
+        for (_, (_, mut requests, mut bulk)) in stalled.requests {
+            reject_write_requests(&mut requests, &mut bulk);
         }
     }
 
     /// Rejects a specific region's stalled requests.
     pub(crate) fn reject_region_stalled_requests(&mut self, region_id: &RegionId) {
         debug!("Rejects stalled requests for region {}", region_id);
-        let mut requests = self.stalled_requests.remove(region_id);
+        let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
         self.stalled_count.sub(requests.len() as i64);
-        reject_write_requests(&mut requests);
+        reject_write_requests(&mut requests, &mut bulk);
     }
 
     /// Handles a specific region's stalled requests.
     pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) {
         debug!("Handles stalled requests for region {}", region_id);
-        let mut requests = self.stalled_requests.remove(region_id);
+        let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
         self.stalled_count.sub(requests.len() as i64);
-        self.handle_write_requests(&mut requests, true).await;
+        self.handle_write_requests(&mut requests, &mut bulk, true)
+            .await;
     }
 }
 
@@ -194,9 +200,20 @@ impl<S: LogStore> RegionWorkerLoop<S> {
     fn prepare_region_write_ctx(
         &mut self,
         write_requests: &mut Vec<SenderWriteRequest>,
+        bulk_requests: &mut Vec<SenderBulkRequest>,
     ) -> HashMap<RegionId, RegionWriteCtx> {
         // Initialize region write context map.
         let mut region_ctxs = HashMap::new();
+        self.process_write_requests(&mut region_ctxs, write_requests);
+        self.process_bulk_requests(&mut region_ctxs, bulk_requests);
+        region_ctxs
+    }
+
+    fn process_write_requests(
+        &mut self,
+        region_ctxs: &mut HashMap<RegionId, RegionWriteCtx>,
+        write_requests: &mut Vec<SenderWriteRequest>,
+    ) {
         for mut sender_req in write_requests.drain(..) {
             let region_id = sender_req.request.region_id;
 
@@ -294,8 +311,89 @@ impl<S: LogStore> RegionWorkerLoop<S> {
                 sender_req.sender,
             );
         }
+    }
 
-        region_ctxs
+    /// Processes bulk insert requests.
+    fn process_bulk_requests(
+        &mut self,
+        region_ctxs: &mut HashMap<RegionId, RegionWriteCtx>,
+        requests: &mut Vec<SenderBulkRequest>,
+    ) {
+        let _timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
+            .with_label_values(&["prepare_bulk_request"])
+            .start_timer();
+        for mut bulk_req in requests.drain(..) {
+            let region_id = bulk_req.region_id;
+            // If region is waiting for alteration, add requests to pending writes.
+            if self.flush_scheduler.has_pending_ddls(region_id) {
+                // Safety: The region has pending ddls.
+                self.flush_scheduler.add_bulk_request_to_pending(bulk_req);
+                continue;
+            }
+
+            // Checks whether the region exists and is it stalling.
+            if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
+                let Some(region) = self.regions.get_region_or(region_id, &mut bulk_req.sender)
+                else {
+                    continue;
+                };
+                match region.state() {
+                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
+                        let region_ctx = RegionWriteCtx::new(
+                            region.region_id,
+                            &region.version_control,
+                            region.provider.clone(),
+                        );
+
+                        e.insert(region_ctx);
+                    }
+                    RegionRoleState::Leader(RegionLeaderState::Altering) => {
+                        debug!(
+                            "Region {} is altering, add request to pending writes",
+                            region.region_id
+                        );
+                        self.stalled_count.add(1);
+                        self.stalled_requests.push_bulk(bulk_req);
+                        continue;
+                    }
+                    state => {
+                        // The region is not writable.
+                        bulk_req.sender.send(
+                            RegionStateSnafu {
+                                region_id,
+                                state,
+                                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
+                            }
+                            .fail(),
+                        );
+                        continue;
+                    }
+                }
+            }
+
+            // Safety: Now we ensure the region exists.
+            let region_ctx = region_ctxs.get_mut(&region_id).unwrap();
+
+            // Double-check the request schema
+            let need_fill_missing_columns = region_ctx.version().metadata.schema_version
+                != bulk_req.region_metadata.schema_version;
+
+            // Only fill missing columns if primary key is dense encoded.
+            if need_fill_missing_columns {
+                // todo(hl): support filling default columns
+                bulk_req.sender.send(
+                    InvalidRequestSnafu {
+                        region_id,
+                        reason: "Schema mismatch",
+                    }
+                    .fail(),
+                );
+                continue;
+            }
+
+            // Collect requests by region.
+            region_ctx.push_bulk(bulk_req.sender, bulk_req.request);
+        }
     }
 
     /// Returns true if the engine needs to reject some write requests.
@@ -307,7 +405,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
 }
 
 /// Send rejected error to all `write_requests`.
-fn reject_write_requests(write_requests: &mut Vec<SenderWriteRequest>) {
+fn reject_write_requests(
+    write_requests: &mut Vec<SenderWriteRequest>,
+    bulk_requests: &mut Vec<SenderBulkRequest>,
+) {
     WRITE_REJECT_TOTAL.inc_by(write_requests.len() as u64);
 
     for req in write_requests.drain(..) {
@@ -318,6 +419,10 @@ fn reject_write_requests(write_requests: &mut Vec<SenderWriteRequest>) {
             .fail(),
         );
     }
+    for req in bulk_requests.drain(..) {
+        let region_id = req.region_id;
+        req.sender.send(RejectWriteSnafu { region_id }.fail());
+    }
 }
 
 /// Rejects delete request under append mode.
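`reject_write_requests` drains both queues and fails every caller through its response channel. The essential drain-and-notify shape, as a standalone sketch (a tokio oneshot sender standing in for `OptionOutputTx`):

```rust
use tokio::sync::oneshot;

// Rejection consumes the queue and sends an error to each caller; the queue
// is left empty so nothing is retried accidentally.
fn reject_all(queue: &mut Vec<oneshot::Sender<Result<usize, String>>>) {
    for tx in queue.drain(..) {
        // Ignore the send result: the caller may have gone away already.
        let _ = tx.send(Err("write rejected: memory pressure too high".to_string()));
    }
}
```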
diff --git a/src/operator/src/bulk_insert.rs b/src/operator/src/bulk_insert.rs
index 24cd81f358..6d199f3025 100644
--- a/src/operator/src/bulk_insert.rs
+++ b/src/operator/src/bulk_insert.rs
@@ -12,16 +12,20 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::sync::Arc;
+
 use ahash::{HashMap, HashMapExt};
 use api::v1::region::{
     bulk_insert_request, region_request, ArrowIpc, BulkInsertRequest, RegionRequest,
-    RegionRequestHeader, RegionSelection,
+    RegionRequestHeader,
 };
 use bytes::Bytes;
 use common_base::AffectedRows;
 use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
 use common_grpc::FlightData;
+use common_recordbatch::RecordBatch;
 use common_telemetry::tracing_context::TracingContext;
+use datatypes::schema::Schema;
 use prost::Message;
 use snafu::ResultExt;
 use store_api::storage::RegionId;
@@ -41,7 +45,6 @@ impl Inserter {
         let decode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
             .with_label_values(&["decode_request"])
             .start_timer();
-        let raw_flight_data = Bytes::from(data.encode_to_vec());
         let body_size = data.data_body.len();
         // Build region server requests
         let message = decoder
@@ -50,16 +53,19 @@ impl Inserter {
         let FlightMessage::Recordbatch(rb) = message else {
             return Ok(0);
         };
-        metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64);
+        let record_batch = rb.df_record_batch();
         decode_timer.observe_duration();
+        metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64);
+        metrics::BULK_REQUEST_ROWS
+            .with_label_values(&["raw"])
+            .observe(record_batch.num_rows() as f64);
 
         // todo(hl): find a way to embed raw FlightData messages in greptimedb proto files so we don't have to encode here.
+        // Safety: when we reach here, the schema must be present.
         let schema_message = FlightEncoder::default()
             .encode(FlightMessage::Schema(decoder.schema().unwrap().clone()));
-        let schema_data = Bytes::from(schema_message.encode_to_vec());
-
-        let record_batch = rb.df_record_batch();
+        let schema_bytes = Bytes::from(schema_message.encode_to_vec());
 
         let partition_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
             .with_label_values(&["partition"])
@@ -76,10 +82,6 @@ impl Inserter {
             .context(error::SplitInsertSnafu)?;
         partition_timer.observe_duration();
 
-        let group_request_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
-            .with_label_values(&["group_request"])
-            .start_timer();
-
         let mut mask_per_datanode = HashMap::with_capacity(region_masks.len());
         for (region_number, mask) in region_masks {
             let region_id = RegionId::new(table_id, region_number);
@@ -88,79 +90,82 @@ impl Inserter {
                 .find_region_leader(region_id)
                 .await
                 .context(error::FindRegionLeaderSnafu)?;
-            let selection = RegionSelection {
-                region_id: region_id.as_u64(),
-                selection: mask.values().inner().as_slice().to_vec(),
-            };
             mask_per_datanode
                 .entry(datanode)
                 .or_insert_with(Vec::new)
-                .push(selection);
+                .push((region_id, mask));
         }
-        group_request_timer.observe_duration();
 
-        let datanode_handle_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
-            .with_label_values(&["datanode_handle"])
+        let wait_all_datanode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
+            .with_label_values(&["wait_all_datanode"])
             .start_timer();
-        // fast path: only one datanode
-        if mask_per_datanode.len() == 1 {
-            let (peer, requests) = mask_per_datanode.into_iter().next().unwrap();
-            let datanode = self.node_manager.datanode(&peer).await;
-            let request = RegionRequest {
-                header: Some(RegionRequestHeader {
-                    tracing_context: TracingContext::from_current_span().to_w3c(),
-                    ..Default::default()
-                }),
-                body: Some(region_request::Body::BulkInsert(BulkInsertRequest {
-                    body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
-                        schema: schema_data,
-                        payload: raw_flight_data,
-                        region_selection: requests,
-                    })),
-                })),
-            };
-            let response = datanode
-                .handle(request)
-                .await
-                .context(error::RequestRegionSnafu)?;
-            return Ok(response.affected_rows);
-        }
 
         let mut handles = Vec::with_capacity(mask_per_datanode.len());
+        let record_batch_schema =
+            Arc::new(Schema::try_from(record_batch.schema()).context(error::ConvertSchemaSnafu)?);
+
         for (peer, masks) in mask_per_datanode {
-            let node_manager = self.node_manager.clone();
-            let schema = schema_data.clone();
-            let payload = raw_flight_data.clone();
+            for (region_id, mask) in masks {
+                let rb = record_batch.clone();
+                let schema_bytes = schema_bytes.clone();
+                let record_batch_schema = record_batch_schema.clone();
+                let node_manager = self.node_manager.clone();
+                let peer = peer.clone();
+                let handle: common_runtime::JoinHandle<Result<RegionResponse>> =
+                    common_runtime::spawn_global(async move {
+                        let filter_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
+                            .with_label_values(&["filter"])
+                            .start_timer();
+                        let rb = arrow::compute::filter_record_batch(&rb, &mask)
+                            .context(error::ComputeArrowSnafu)?;
+                        filter_timer.observe_duration();
+                        metrics::BULK_REQUEST_ROWS
+                            .with_label_values(&["rows_per_region"])
+                            .observe(rb.num_rows() as f64);
 
-            let handle: common_runtime::JoinHandle<Result<RegionResponse>> =
-                common_runtime::spawn_global(async move {
-                    let request = RegionRequest {
-                        header: Some(RegionRequestHeader {
-                            tracing_context: TracingContext::from_current_span().to_w3c(),
-                            ..Default::default()
-                        }),
-                        body: Some(region_request::Body::BulkInsert(BulkInsertRequest {
-                            body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
-                                schema,
-                                payload,
-                                region_selection: masks,
+                        let encode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
+                            .with_label_values(&["encode"])
+                            .start_timer();
+                        let batch = RecordBatch::try_from_df_record_batch(record_batch_schema, rb)
+                            .context(error::BuildRecordBatchSnafu)?;
+                        let payload = Bytes::from(
+                            FlightEncoder::default()
+                                .encode(FlightMessage::Recordbatch(batch))
+                                .encode_to_vec(),
+                        );
+                        encode_timer.observe_duration();
+
+                        let _datanode_handle_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
+                            .with_label_values(&["datanode_handle"])
+                            .start_timer();
+                        let request = RegionRequest {
+                            header: Some(RegionRequestHeader {
+                                tracing_context: TracingContext::from_current_span().to_w3c(),
+                                ..Default::default()
+                            }),
+                            body: Some(region_request::Body::BulkInsert(BulkInsertRequest {
+                                body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
+                                    region_id: region_id.as_u64(),
+                                    schema: schema_bytes,
+                                    payload,
+                                })),
                             })),
-                        })),
-                    };
+                        };
 
-                    let datanode = node_manager.datanode(&peer).await;
-                    datanode
-                        .handle(request)
-                        .await
-                        .context(error::RequestRegionSnafu)
-                });
-            handles.push(handle);
+                        let datanode = node_manager.datanode(&peer).await;
+                        datanode
+                            .handle(request)
+                            .await
+                            .context(error::RequestRegionSnafu)
+                    });
+                handles.push(handle);
+            }
         }
 
         let region_responses = futures::future::try_join_all(handles)
             .await
             .context(error::JoinTaskSnafu)?;
-        datanode_handle_timer.observe_duration();
+        wait_all_datanode_timer.observe_duration();
 
         let mut rows_inserted: usize = 0;
         for res in region_responses {
             rows_inserted += res?.affected_rows;
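Region routing now ships a filtered copy of the batch to each region instead of one shared payload plus selection bitmaps. A runnable example of the arrow kernel doing that split (plain `arrow` crate; schema and values are made up):

```rust
use std::sync::Arc;

use arrow::array::{BooleanArray, Int64Array};
use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let batch =
        RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1, 2, 3, 4]))]).unwrap();

    // One boolean mask per region, produced by the partition rule.
    let region_mask = BooleanArray::from(vec![true, false, true, false]);
    let per_region = filter_record_batch(&batch, &region_mask).unwrap();
    assert_eq!(per_region.num_rows(), 2);
}
```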
diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs
index 488005f5d5..8b80977b8b 100644
--- a/src/operator/src/error.rs
+++ b/src/operator/src/error.rs
@@ -814,6 +814,14 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("Failed to perform arrow compute"))]
+    ComputeArrow {
+        #[snafu(source)]
+        error: ArrowError,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -936,6 +944,7 @@ impl ErrorExt for Error {
             Error::StatementTimeout { .. } => StatusCode::Cancelled,
             Error::ColumnOptions { source, .. } => source.status_code(),
             Error::DecodeFlightData { source, .. } => source.status_code(),
+            Error::ComputeArrow { .. } => StatusCode::Internal,
         }
     }
diff --git a/src/operator/src/metrics.rs b/src/operator/src/metrics.rs
index 07f6897e37..5cc0bac19d 100644
--- a/src/operator/src/metrics.rs
+++ b/src/operator/src/metrics.rs
@@ -93,4 +93,12 @@ lazy_static! {
         ]
     )
     .unwrap();
+    pub static ref BULK_REQUEST_ROWS: HistogramVec = register_histogram_vec!(
+        "greptime_table_operator_bulk_insert_message_rows",
+        "rows per bulk insert message in the table operator",
+        &["type"],
+        // 10 ~ 100_000
+        exponential_buckets(10.0, 10.0, 5).unwrap()
+    )
+    .unwrap();
 }
diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs
index e083ddbf95..2bd9a89978 100644
--- a/src/store-api/src/region_request.rs
+++ b/src/store-api/src/region_request.rs
@@ -14,9 +14,8 @@
 
 use std::collections::HashMap;
 use std::fmt::{self, Display};
-use std::time::{Duration, Instant};
 
-use api::helper::{value_to_grpc_value, ColumnDataTypeWrapper};
+use api::helper::ColumnDataTypeWrapper;
 use api::v1::add_column_location::LocationType;
 use api::v1::column_def::{
     as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
@@ -28,7 +27,7 @@ use api::v1::region::{
     DropRequests, FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
 };
 use api::v1::{
-    self, set_index, Analyzer, FulltextBackend as PbFulltextBackend, Option as PbOption, Row, Rows,
+    self, set_index, Analyzer, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
     SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
 };
 pub use common_base::AffectedRows;
@@ -36,12 +35,8 @@ use common_grpc::flight::{FlightDecoder, FlightMessage};
 use common_grpc::FlightData;
 use common_recordbatch::DfRecordBatch;
 use common_time::TimeToLive;
-use datatypes::arrow;
-use datatypes::arrow::array::{Array, BooleanArray};
-use datatypes::arrow::buffer::{BooleanBuffer, Buffer};
-use datatypes::prelude::{ConcreteDataType, VectorRef};
+use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
-use datatypes::vectors::Helper;
 use prost::Message;
 use serde::{Deserialize, Serialize};
 use snafu::{ensure, OptionExt, ResultExt};
@@ -49,8 +44,8 @@ use strum::{AsRefStr, IntoStaticStr};
 
 use crate::logstore::entry;
 use crate::metadata::{
-    ColumnMetadata, DecodeArrowIpcSnafu, DecodeProtoSnafu, FlightCodecSnafu,
-    InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu, InvalidSetRegionOptionRequestSnafu,
+    ColumnMetadata, DecodeProtoSnafu, FlightCodecSnafu, InvalidRawRegionRequestSnafu,
+    InvalidRegionRequestSnafu, InvalidSetRegionOptionRequestSnafu,
     InvalidUnsetRegionOptionRequestSnafu, MetadataError, ProstSnafu, RegionMetadata, Result,
     UnexpectedSnafu,
 };
@@ -159,7 +154,7 @@ impl RegionRequest {
             region_request::Body::Creates(creates) => make_region_creates(creates),
             region_request::Body::Drops(drops) => make_region_drops(drops),
             region_request::Body::Alters(alters) => make_region_alters(alters),
-            region_request::Body::BulkInsert(bulk) => make_region_rows_bulk_inserts(bulk),
+            region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
             region_request::Body::Sync(_) => UnexpectedSnafu {
                 reason: "Sync request should be handled separately by RegionServer",
             }
@@ -334,165 +329,30 @@ fn make_region_truncate(truncate: TruncateRequest) -> Result<(RegionId, RegionRe
 
 fn make_region_bulk_inserts(
     request: BulkInsertRequest,
 ) -> Result<Vec<(RegionId, RegionRequest)>> {
     let Some(Body::ArrowIpc(request)) = request.body else {
         return Ok(vec![]);
     };
-    let mut region_requests: HashMap<u64, BulkInsertPayload> =
-        HashMap::with_capacity(request.region_selection.len());
-
-    let schema_data = FlightData::decode(request.schema.clone()).context(ProstSnafu)?;
-    let payload_data = FlightData::decode(request.payload.clone()).context(ProstSnafu)?;
-    let mut decoder = FlightDecoder::default();
-    let _schema_message = decoder.try_decode(schema_data).context(FlightCodecSnafu)?;
-    let FlightMessage::Recordbatch(rb) =
-        decoder.try_decode(payload_data).context(FlightCodecSnafu)?
-    else {
-        unreachable!("Always expect record batch message after schema");
-    };
-
-    for region_selection in request.region_selection {
-        let region_id = region_selection.region_id;
-        let region_mask = BooleanArray::new(
-            BooleanBuffer::new(Buffer::from(region_selection.selection), 0, rb.num_rows()),
-            None,
-        );
-
-        let region_batch = if region_mask.true_count() == rb.num_rows() {
-            rb.df_record_batch().clone()
-        } else {
-            arrow::compute::filter_record_batch(rb.df_record_batch(), &region_mask)
-                .context(DecodeArrowIpcSnafu)?
-        };
-
-        region_requests.insert(region_id, BulkInsertPayload::ArrowIpc(region_batch));
-    }
-
-    let result = region_requests
-        .into_iter()
-        .map(|(region_id, payload)| {
-            (
-                region_id.into(),
-                RegionRequest::BulkInserts(RegionBulkInsertsRequest {
-                    region_id: region_id.into(),
-                    payloads: vec![payload],
-                }),
-            )
-        })
-        .collect::<Vec<_>>();
-    Ok(result)
-}
-
-/// Convert [BulkInsertRequest] to [RegionRequest] and group by [RegionId].
-fn make_region_rows_bulk_inserts(
-    request: BulkInsertRequest,
-) -> Result<Vec<(RegionId, RegionRequest)>> {
-    let Some(Body::ArrowIpc(request)) = request.body else {
-        return Ok(vec![]);
-    };
-
-    let mut region_requests: HashMap<u64, BulkInsertPayload> =
-        HashMap::with_capacity(request.region_selection.len());
-
-    let decode_timer = metrics::CONVERT_REGION_BULK_REQUEST
+    let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
         .with_label_values(&["decode"])
         .start_timer();
     let schema_data = FlightData::decode(request.schema.clone()).context(ProstSnafu)?;
     let payload_data = FlightData::decode(request.payload.clone()).context(ProstSnafu)?;
     let mut decoder = FlightDecoder::default();
-    let _schema_message = decoder.try_decode(schema_data).context(FlightCodecSnafu)?;
+    let _ = decoder.try_decode(schema_data).context(FlightCodecSnafu)?;
     let FlightMessage::Recordbatch(rb) =
         decoder.try_decode(payload_data).context(FlightCodecSnafu)?
     else {
         unreachable!("Always expect record batch message after schema");
     };
-    decode_timer.observe_duration();
-
-    let filter_timer = metrics::CONVERT_REGION_BULK_REQUEST.with_label_values(&["filter_batch"]);
-    let convert_to_rows_timer =
-        metrics::CONVERT_REGION_BULK_REQUEST.with_label_values(&["convert_to_rows"]);
-
-    let mut filter_time = Duration::default();
-    let mut convert_to_rows_time = Duration::default();
-    for region_selection in request.region_selection {
-        let region_id = region_selection.region_id;
-        let start = Instant::now();
-        let region_mask = BooleanArray::new(
-            BooleanBuffer::new(Buffer::from(region_selection.selection), 0, rb.num_rows()),
-            None,
-        );
-
-        let region_batch = if region_mask.true_count() == rb.num_rows() {
-            rb.df_record_batch().clone()
-        } else {
-            arrow::compute::filter_record_batch(rb.df_record_batch(), &region_mask)
-                .context(DecodeArrowIpcSnafu)?
-        };
-        filter_time += start.elapsed();
-
-        let start = Instant::now();
-        let (rows, has_null) = record_batch_to_rows(&region_batch);
-        convert_to_rows_time += start.elapsed();
-
-        region_requests.insert(
-            region_id,
-            BulkInsertPayload::Rows {
-                data: rows,
-                has_null,
-            },
-        );
-    }
-    filter_timer.observe(filter_time.as_secs_f64());
-    convert_to_rows_timer.observe(convert_to_rows_time.as_secs_f64());
-
-    let result = region_requests
-        .into_iter()
-        .map(|(region_id, payload)| {
-            (
-                region_id.into(),
-                RegionRequest::BulkInserts(RegionBulkInsertsRequest {
-                    region_id: region_id.into(),
-                    payloads: vec![payload],
-                }),
-            )
-        })
-        .collect::<Vec<_>>();
-
-    Ok(result)
-}
-
-/// Convert [DfRecordBatch] to gRPC rows.
-fn record_batch_to_rows(rb: &DfRecordBatch) -> (Vec<Row>, Vec<bool>) {
-    let num_rows = rb.num_rows();
-    let mut rows = Vec::with_capacity(num_rows);
-    if num_rows == 0 {
-        return (rows, vec![false; rb.num_columns()]);
-    }
-
-    let mut vectors = Vec::with_capacity(rb.num_columns());
-    let mut has_null = Vec::with_capacity(rb.num_columns());
-    for c in rb.columns() {
-        vectors.push(Helper::try_into_vector(c).unwrap());
-        has_null.push(c.null_count() > 0);
-    }
-
-    for row_idx in 0..num_rows {
-        let row = Row {
-            values: row_at(&vectors, row_idx),
-        };
-        rows.push(row);
-    }
-    (rows, has_null)
-}
-
-fn row_at(vectors: &[VectorRef], row_idx: usize) -> Vec<v1::Value> {
-    let mut row = Vec::with_capacity(vectors.len());
-    for a in vectors {
-        row.push(value_to_grpc_value(a.get(row_idx)))
-    }
-    row
+    decoder_timer.observe_duration();
+    let payload = rb.into_df_record_batch();
+    let region_id: RegionId = request.region_id.into();
+    Ok(vec![(
+        region_id,
+        RegionRequest::BulkInserts(RegionBulkInsertsRequest { region_id, payload }),
+    )])
 }
 
 /// Request to put data into a region.
@@ -1302,13 +1162,13 @@ pub struct RegionSequencesRequest {
 
 #[derive(Debug, Clone)]
 pub struct RegionBulkInsertsRequest {
     pub region_id: RegionId,
-    pub payloads: Vec<BulkInsertPayload>,
+    pub payload: DfRecordBatch,
 }
 
-#[derive(Debug, Clone)]
-pub enum BulkInsertPayload {
-    ArrowIpc(DfRecordBatch),
-    Rows { data: Vec<Row>, has_null: Vec<bool> },
+impl RegionBulkInsertsRequest {
+    pub fn estimated_size(&self) -> usize {
+        self.payload.get_array_memory_size()
+    }
 }
 
 impl fmt::Display for RegionRequest {
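`estimated_size` delegates to arrow's `get_array_memory_size`, which counts allocated buffer memory rather than serialized bytes, so the stall accounting sees the in-memory footprint. A small example of what that returns:

```rust
use std::sync::Arc;

use arrow::array::Int64Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let batch =
        RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![0i64; 1024]))]).unwrap();

    // Counts buffer memory (at least 1024 * 8 bytes for the i64 values),
    // not the Arrow IPC wire size.
    assert!(batch.get_array_memory_size() >= 1024 * 8);
}
```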