feat(wal): support bulk wal entries (#6178)

* feat/bulk-wal:
 ### Refactor: Simplify Data Handling in LogStore Implementations

 - **`kafka/log_store.rs`, `raft_engine/log_store.rs`, `wal.rs`, `raw_entry_reader.rs`, `logstore.rs`:**
   - Refactored the `entry` and `build_entry` functions to take `Vec<u8>` by value instead of `&mut Vec<u8>` (see the sketch after this list).
   - Removed the `std::mem::take` calls that the old mutable-borrow API required, simplifying the code and improving readability.
   - Updated test cases to align with the new function signatures.
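
A minimal sketch of the ownership change (the `NaiveEntry` stand-in and function names are simplified for illustration; the real builders also split oversized payloads into multi-part entries):

```rust
/// Stand-in for the log store's naive entry type (simplified for illustration).
struct NaiveEntry {
    data: Vec<u8>,
}

// Before: the API borrowed the caller's buffer mutably and drained it with
// `std::mem::take`, leaving an empty Vec behind at the call site.
fn build_entry_old(data: &mut Vec<u8>) -> NaiveEntry {
    NaiveEntry {
        data: std::mem::take(data),
    }
}

// After: the buffer is moved in by value, so the take/clear dance disappears.
fn build_entry_new(data: Vec<u8>) -> NaiveEntry {
    NaiveEntry { data }
}

fn main() {
    let mut buf = vec![1u8; 100];
    let old = build_entry_old(&mut buf);
    assert!(buf.is_empty()); // the old API silently emptied the caller's buffer

    let new = build_entry_new(vec![1u8; 100]); // ownership is transferred outright
    assert_eq!(old.data.len(), new.data.len());
}
```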

* feat/bulk-wal:
 ### Add Support for Bulk WAL Entries and Flight Data Encoding

 - **Add `raw_data` field to `BulkPart` and related structs**: Updated `BulkPart` and related structures in `src/mito2/src/memtable/bulk/part.rs`, `src/mito2/src/memtable/simple_bulk_memtable.rs`, `src/mito2/src/memtable/time_partition.rs`, `src/mito2/src/region_write_ctx.rs`,
 `src/mito2/src/worker/handle_bulk_insert.rs`, and `src/store-api/src/region_request.rs` to include a new `raw_data` field for handling Arrow IPC data.
 - **Implement Flight Data Encoding**: Added a new `flight` module in `src/common/test-util/src/flight.rs` that encodes a record batch into Arrow Flight data (a Schema message plus a RecordBatch message); see the sketch after this list for what this serialization amounts to.
 - **Update `greptime-proto` dependency**: Changed the revision of the `greptime-proto` dependency in `Cargo.lock` and `Cargo.toml`.
 - **Enhance WAL Writer and Tests**: Modified `src/mito2/src/wal.rs` and related test files to support bulk WAL entries and added tests for encoding and handling bulk data.
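
For background, the `ArrowIpc` payload is a record batch serialized with Arrow's IPC format; the new `encode_to_flight_data` helper wraps the project's `FlightEncoder` to produce it as separate Schema and RecordBatch Flight messages. Below is a self-contained sketch of that serialization using the upstream `arrow` crate directly, not the project's encoder; the column names and values are assumptions for illustration:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, TimestampMillisecondArray};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::ipc::writer::StreamWriter;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A one-column batch of millisecond timestamps stands in for real table data.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "ts",
        DataType::Timestamp(TimeUnit::Millisecond, None),
        false,
    )]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(TimestampMillisecondArray::from(vec![1000i64, 2000, 3000])) as ArrayRef],
    )?;

    // Serialize schema + batch into Arrow IPC stream bytes. Bytes like these are what a
    // `raw_data` field carries, so replay can reuse them instead of re-encoding the batch.
    let mut buf = Vec::new();
    let mut writer = StreamWriter::try_new(&mut buf, &schema)?;
    writer.write(&batch)?;
    writer.finish()?;
    println!(
        "encoded {} rows into {} bytes of Arrow IPC data",
        batch.num_rows(),
        buf.len()
    );
    Ok(())
}
```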

* feat/bulk-wal:
 - **Update `greptime-proto` Dependency**: Updated the `greptime-proto` dependency to a new revision in `Cargo.lock` and `Cargo.toml`.
 - **Add `common-grpc` Dependency**: Added `common-grpc` as a dependency in `Cargo.lock` and `src/mito2/Cargo.toml`.
 - **Refactor `BulkPart` Structure**: Removed the `num_rows` field and added a `num_rows()` method (sketched after this list) in `src/mito2/src/memtable/bulk/part.rs`. Updated related usages in `src/mito2/src/memtable/simple_bulk_memtable.rs`, `src/mito2/src/memtable/time_partition.rs`, `src/mito2/src/memtable/time_series.rs`, `src/mito2/src/region_write_ctx.rs`, and `src/mito2/src/worker/handle_bulk_insert.rs`.
 - **Implement `TryFrom` and `From` for `BulkWalEntry`**: Added implementations for converting between `BulkPart` and `BulkWalEntry` in `src/mito2/src/memtable/bulk/part.rs`.
 - **Handle Bulk Entries in Region Opener**: Added logic to process bulk entries in `src/mito2/src/region/opener.rs`.
 - **Fix `BulkInsertRequest` Handling**: Corrected `region_id` handling in `src/operator/src/bulk_insert.rs` and `src/store-api/src/region_request.rs`.
 - **Add Error Variant for `ConvertBulkWalEntry`**: Added a new error variant in `src/mito2/src/error.rs` for handling bulk WAL entry conversion errors.
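
A reduced sketch of the `num_rows()` refactor (the `BulkPart` stand-in below keeps only the `batch` field; the real struct also carries timestamps, a sequence, and the new `raw_data`):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

/// Reduced stand-in for `BulkPart`: the row count is no longer stored separately.
struct BulkPart {
    batch: RecordBatch,
}

impl BulkPart {
    /// Derive the row count from the batch itself so it can never drift out of
    /// sync with the data (the motivation for dropping the `num_rows` field).
    fn num_rows(&self) -> usize {
        self.batch.num_rows()
    }
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int64Array::from(vec![10, 20, 30])) as ArrayRef],
    )?;
    let part = BulkPart { batch };
    assert_eq!(part.num_rows(), 3);
    Ok(())
}
```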

* fix: ci

* feat/bulk-wal:
 Add bulk write operation in `opener.rs`

 - Added a call to `write_bulk()` after `write_memtable()` on the region write context in the replay path of `opener.rs`.
 - This ensures that bulk entries pushed into the write context during WAL replay are actually applied to the memtable.
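
A reduced sketch of the replay ordering this introduces (the types below are stand-ins; the real context carries memtables, notifiers, and sequences):

```rust
use futures::executor::block_on;

/// Stand-in for the region write context used during WAL replay.
struct RegionWriteCtx {
    mutation_rows: usize,
    bulk_rows: usize,
}

impl RegionWriteCtx {
    async fn write_memtable(&mut self) {
        // Apply row mutations decoded from the WAL entry.
        println!("replayed {} mutation rows", self.mutation_rows);
    }

    async fn write_bulk(&mut self) {
        // Apply the bulk parts pushed during replay; without this extra call the
        // bulk payloads would never reach the memtable.
        println!("replayed {} bulk rows", self.bulk_rows);
    }
}

fn main() {
    let mut ctx = RegionWriteCtx { mutation_rows: 4, bulk_rows: 2 };
    block_on(async {
        ctx.write_memtable().await;
        ctx.write_bulk().await; // the step this change adds to replay
    });
}
```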

* feat/bulk-wal:
 Enhance error handling and metrics in `bulk_insert.rs`

 - Updated `Inserter` to capture the result of `datanode.handle(request)` and, on success, increment the `DIST_INGEST_ROW_COUNT` metric by the number of affected rows before returning the result.
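
A reduced sketch of the pattern (the counter and request handling below are stand-ins; the real code awaits `datanode.handle(request)` and bumps the Prometheus counter `DIST_INGEST_ROW_COUNT`):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Stand-in for the Prometheus counter used in the real code.
static DIST_INGEST_ROW_COUNT: AtomicU64 = AtomicU64::new(0);

fn handle_on_datanode(rows: usize) -> Result<usize, String> {
    // Pretend the datanode accepted the request and reports affected rows.
    Ok(rows)
}

fn bulk_insert(rows: usize) -> Result<usize, String> {
    // Capture the result instead of returning it directly, so the affected-row
    // count can be recorded before the result propagates to the caller.
    let result = handle_on_datanode(rows);
    if let Ok(affected) = result {
        DIST_INGEST_ROW_COUNT.fetch_add(affected as u64, Ordering::Relaxed);
    }
    result
}

fn main() {
    let _ = bulk_insert(128);
    println!(
        "ingested rows so far: {}",
        DIST_INGEST_ROW_COUNT.load(Ordering::Relaxed)
    );
}
```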

* feat/bulk-wal:
 ### Remove Encode Error Handling for WAL Entries

 - **`error.rs`**: Removed the `EncodeWal` error variant and its associated handling.
 - **`wal.rs`**: Eliminated the `entry_encode_buf` buffer previously used to encode WAL entries; entries are now encoded directly into a fresh vector with `encode_to_vec()`.
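
With prost, `Message::encode_to_vec` allocates and fills the output in one call, and encoding into a growable `Vec<u8>` cannot fail, which is why the `EncodeWal` error path could be dropped. A sketch with a small hand-rolled prost message (the real code encodes `api::v1::WalEntry`):

```rust
use prost::Message;

// Any prost-generated message works here; a tiny hand-rolled one keeps the
// sketch self-contained (in the real code this is `api::v1::WalEntry`).
#[derive(Clone, PartialEq, prost::Message)]
struct DemoEntry {
    #[prost(uint64, tag = "1")]
    id: u64,
}

fn main() {
    let entry = DemoEntry { id: 42 };

    // Before: encode into a reusable buffer and handle a potential EncodeError.
    let mut buf = Vec::new();
    entry
        .encode(&mut buf)
        .expect("a growable Vec<u8> never runs out of capacity");

    // After: one call, no buffer management, no error branch.
    let data = entry.encode_to_vec();
    assert_eq!(buf, data);
}
```
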
Lei, HUANG, 2025-05-29 17:10:30 +08:00 (committed by GitHub)
commit 4e615e8906 (parent 9afc61f778)
24 changed files with 242 additions and 79 deletions

Cargo.lock (generated)

@@ -2542,6 +2542,7 @@ name = "common-test-util"
version = "0.15.0"
dependencies = [
"client",
"common-grpc",
"common-query",
"common-recordbatch",
"once_cell",
@@ -4875,7 +4876,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=67ee5f94e5da72314cda7d0eb90106eb1c16a1ae#67ee5f94e5da72314cda7d0eb90106eb1c16a1ae"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=442348b2518c0bf187fb1ad011ba370c38b96cc4#442348b2518c0bf187fb1ad011ba370c38b96cc4"
dependencies = [
"prost 0.13.5",
"serde",
@@ -6975,6 +6976,7 @@ dependencies = [
"common-decimal",
"common-error",
"common-function",
"common-grpc",
"common-macro",
"common-meta",
"common-query",

Cargo.toml

@@ -132,7 +132,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "67ee5f94e5da72314cda7d0eb90106eb1c16a1ae" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "442348b2518c0bf187fb1ad011ba370c38b96cc4" }
hex = "0.4"
http = "1"
humantime = "2.1"

src/common/test-util/Cargo.toml

@@ -9,6 +9,7 @@ workspace = true
[dependencies]
client = { workspace = true, features = ["testing"] }
common-grpc.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
once_cell.workspace = true

src/common/test-util/src/flight.rs (new)

@@ -0,0 +1,26 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_grpc::flight::{FlightEncoder, FlightMessage};
use common_grpc::FlightData;
use common_recordbatch::DfRecordBatch;
/// Encodes record batch to a Schema message and a RecordBatch message.
pub fn encode_to_flight_data(rb: DfRecordBatch) -> (FlightData, FlightData) {
let mut encoder = FlightEncoder::default();
(
encoder.encode(FlightMessage::Schema(rb.schema())),
encoder.encode(FlightMessage::RecordBatch(rb)),
)
}

src/common/test-util/src/lib.rs

@@ -16,6 +16,7 @@ use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::LazyLock;
pub mod flight;
pub mod ports;
pub mod recordbatch;
pub mod temp_dir;

src/log-store/src/kafka/log_store.rs

@@ -98,7 +98,7 @@ impl KafkaLogStore {
}
fn build_entry(
data: &mut Vec<u8>,
data: Vec<u8>,
entry_id: EntryId,
region_id: RegionId,
provider: &Provider,
@@ -109,10 +109,10 @@ fn build_entry(
provider: provider.clone(),
region_id,
entry_id,
data: std::mem::take(data),
data,
})
} else {
let parts = std::mem::take(data)
let parts = data
.chunks(max_data_size)
.map(|s| s.into())
.collect::<Vec<_>>();
@@ -140,7 +140,7 @@ impl LogStore for KafkaLogStore {
/// Creates an [Entry].
fn entry(
&self,
data: &mut Vec<u8>,
data: Vec<u8>,
entry_id: EntryId,
region_id: RegionId,
provider: &Provider,
@@ -479,7 +479,7 @@ mod tests {
fn test_build_naive_entry() {
let provider = Provider::kafka_provider("my_topic".to_string());
let region_id = RegionId::new(1, 1);
let entry = build_entry(&mut vec![1; 100], 1, region_id, &provider, 120);
let entry = build_entry(vec![1; 100], 1, region_id, &provider, 120);
assert_eq!(
entry.into_naive_entry().unwrap(),
@@ -496,7 +496,7 @@ mod tests {
fn test_build_into_multiple_part_entry() {
let provider = Provider::kafka_provider("my_topic".to_string());
let region_id = RegionId::new(1, 1);
let entry = build_entry(&mut vec![1; 100], 1, region_id, &provider, 50);
let entry = build_entry(vec![1; 100], 1, region_id, &provider, 50);
assert_eq!(
entry.into_multiple_part_entry().unwrap(),
@@ -510,7 +510,7 @@ mod tests {
);
let region_id = RegionId::new(1, 1);
let entry = build_entry(&mut vec![1; 100], 1, region_id, &provider, 21);
let entry = build_entry(vec![1; 100], 1, region_id, &provider, 21);
assert_eq!(
entry.into_multiple_part_entry().unwrap(),
@@ -545,9 +545,9 @@ mod tests {
) -> Vec<Entry> {
(0..num_entries)
.map(|_| {
let mut data: Vec<u8> = (0..data_len).map(|_| rand::random::<u8>()).collect();
let data: Vec<u8> = (0..data_len).map(|_| rand::random::<u8>()).collect();
// Always set `entry_id` to 0, the real entry_id will be set during the read.
logstore.entry(&mut data, 0, region_id, provider).unwrap()
logstore.entry(data, 0, region_id, provider).unwrap()
})
.collect()
}

src/log-store/src/raft_engine/log_store.rs

@@ -442,7 +442,7 @@ impl LogStore for RaftEngineLogStore {
fn entry(
&self,
data: &mut Vec<u8>,
data: Vec<u8>,
entry_id: EntryId,
region_id: RegionId,
provider: &Provider,
@@ -455,7 +455,7 @@ impl LogStore for RaftEngineLogStore {
provider: provider.clone(),
region_id,
entry_id,
data: std::mem::take(data),
data,
}))
}

src/mito2/Cargo.toml

@@ -24,6 +24,7 @@ common-config.workspace = true
common-datasource.workspace = true
common-decimal.workspace = true
common-error.workspace = true
common-grpc.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-query.workspace = true

View File

@@ -53,11 +53,11 @@ fn random_array(num: usize) -> BulkPart {
.unwrap();
BulkPart {
batch,
num_rows: num,
max_ts: max,
min_ts: min,
sequence: 0,
timestamp_index: 0,
raw_data: None,
}
}
@@ -76,7 +76,6 @@ fn filter_arrow_impl(part: &BulkPart, min: i64, max: i64) -> Option<BulkPart> {
return None;
}
let num_rows_filtered = ts_filtered.len();
let i64array = ts_filtered
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
@@ -87,11 +86,11 @@ fn filter_arrow_impl(part: &BulkPart, min: i64, max: i64) -> Option<BulkPart> {
let batch = arrow::compute::filter_record_batch(&part.batch, &predicate).unwrap();
Some(BulkPart {
batch,
num_rows: num_rows_filtered,
max_ts: max,
min_ts: min,
sequence: 0,
timestamp_index: part.timestamp_index,
raw_data: None,
})
}

src/mito2/src/error.rs

@@ -25,7 +25,7 @@ use common_time::Timestamp;
use datatypes::arrow::error::ArrowError;
use datatypes::prelude::ConcreteDataType;
use object_store::ErrorKind;
use prost::{DecodeError, EncodeError};
use prost::DecodeError;
use snafu::{Location, Snafu};
use store_api::logstore::provider::Provider;
use store_api::manifest::ManifestVersion;
@@ -255,15 +255,6 @@ pub enum Error {
source: BoxedError,
},
#[snafu(display("Failed to encode WAL entry, region_id: {}", region_id))]
EncodeWal {
region_id: RegionId,
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: EncodeError,
},
#[snafu(display("Failed to write WAL"))]
WriteWal {
#[snafu(implicit)]
@@ -1037,6 +1028,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to decode bulk wal entry"))]
ConvertBulkWalEntry {
#[snafu(implicit)]
location: Location,
source: common_grpc::Error,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1098,7 +1096,6 @@ impl ErrorExt for Error {
| Join { .. }
| WorkerStopped { .. }
| Recv { .. }
| EncodeWal { .. }
| ConvertMetaData { .. }
| DecodeWal { .. }
| ComputeArrow { .. }
@@ -1192,6 +1189,7 @@ impl ErrorExt for Error {
ScanSeries { source, .. } => source.status_code(),
ScanMultiTimes { .. } => StatusCode::InvalidArguments,
Error::ConvertBulkWalEntry { source, .. } => source.status_code(),
}
}

src/mito2/src/memtable/bulk/part.rs

@@ -18,8 +18,10 @@ use std::collections::VecDeque;
use std::sync::Arc;
use api::helper::{value_to_grpc_value, ColumnDataTypeWrapper};
use api::v1::{Mutation, OpType};
use api::v1::bulk_wal_entry::Body;
use api::v1::{bulk_wal_entry, ArrowIpc, BulkWalEntry, Mutation, OpType};
use bytes::Bytes;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_recordbatch::DfRecordBatch as RecordBatch;
use common_time::timestamp::TimeUnit;
use datafusion::arrow::array::{TimestampNanosecondArray, UInt64Builder};
@@ -59,11 +61,66 @@ use crate::sst::to_sst_arrow_schema;
#[derive(Clone)]
pub struct BulkPart {
pub batch: RecordBatch,
pub num_rows: usize,
pub max_ts: i64,
pub min_ts: i64,
pub sequence: u64,
pub timestamp_index: usize,
pub raw_data: Option<ArrowIpc>,
}
impl TryFrom<BulkWalEntry> for BulkPart {
type Error = error::Error;
fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
match value.body.expect("Entry payload should be present") {
Body::ArrowIpc(ipc) => {
let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
.context(error::ConvertBulkWalEntrySnafu)?;
let batch = decoder
.try_decode_record_batch(&ipc.data_header, &ipc.payload)
.context(error::ConvertBulkWalEntrySnafu)?;
Ok(Self {
batch,
max_ts: value.max_ts,
min_ts: value.min_ts,
sequence: value.sequence,
timestamp_index: value.timestamp_index as usize,
raw_data: Some(ipc),
})
}
}
}
}
impl From<&BulkPart> for BulkWalEntry {
fn from(value: &BulkPart) -> Self {
if let Some(ipc) = &value.raw_data {
BulkWalEntry {
sequence: value.sequence,
max_ts: value.max_ts,
min_ts: value.min_ts,
timestamp_index: value.timestamp_index as u32,
body: Some(Body::ArrowIpc(ipc.clone())),
}
} else {
let mut encoder = FlightEncoder::default();
let schema_bytes = encoder
.encode(FlightMessage::Schema(value.batch.schema()))
.data_header;
let rb_data = encoder.encode(FlightMessage::RecordBatch(value.batch.clone()));
BulkWalEntry {
sequence: value.sequence,
max_ts: value.max_ts,
min_ts: value.min_ts,
timestamp_index: value.timestamp_index as u32,
body: Some(Body::ArrowIpc(ArrowIpc {
schema: schema_bytes,
data_header: rb_data.data_header,
payload: rb_data.data_body,
})),
}
}
}
}
impl BulkPart {
@@ -84,7 +141,7 @@ impl BulkPart {
.collect::<datatypes::error::Result<Vec<_>>>()
.context(error::ComputeVectorSnafu)?;
let rows = (0..self.num_rows)
let rows = (0..self.num_rows())
.map(|row_idx| {
let values = (0..self.batch.num_columns())
.map(|col_idx| {
@@ -130,6 +187,10 @@ impl BulkPart {
pub fn timestamps(&self) -> &ArrayRef {
self.batch.column(self.timestamp_index)
}
pub fn num_rows(&self) -> usize {
self.batch.num_rows()
}
}
#[derive(Debug)]

src/mito2/src/memtable/simple_bulk_memtable.rs

@@ -221,7 +221,7 @@ impl Memtable for SimpleBulkMemtable {
value_bytes: part.estimated_size(),
min_ts: part.min_ts,
max_ts: part.max_ts,
num_rows: part.num_rows,
num_rows: part.num_rows(),
max_sequence: sequence,
});
Ok(())
@@ -563,8 +563,8 @@ mod tests {
sequence: 1,
min_ts: 1,
max_ts: 2,
num_rows: 2,
timestamp_index: 0,
raw_data: None,
};
memtable.write_bulk(part).unwrap();

src/mito2/src/memtable/time_partition.rs

@@ -193,11 +193,11 @@ pub fn filter_record_batch(part: &BulkPart, min: i64, max: i64) -> Result<Option
.context(error::NewRecordBatchSnafu)?;
Ok(Some(BulkPart {
batch,
num_rows,
max_ts,
min_ts,
sequence: part.sequence,
timestamp_index: part.timestamp_index,
raw_data: None,
}))
}
@@ -1184,11 +1184,11 @@ mod tests {
let min_ts = ts.iter().min().copied().unwrap();
BulkPart {
batch,
num_rows: ts.len(),
max_ts,
min_ts,
sequence,
timestamp_index: 0,
raw_data: None,
}
}
@@ -1297,17 +1297,17 @@ mod tests {
let part = BulkPart {
batch,
num_rows: 5,
max_ts: 8000,
min_ts: 1000,
sequence: 0,
timestamp_index: 0,
raw_data: None,
};
let result = filter_record_batch(&part, 1000, 2000).unwrap();
assert!(result.is_some());
let filtered = result.unwrap();
assert_eq!(filtered.num_rows, 1);
assert_eq!(filtered.num_rows(), 1);
assert_eq!(filtered.min_ts, 1000);
assert_eq!(filtered.max_ts, 1000);
@@ -1315,7 +1315,7 @@ mod tests {
let result = filter_record_batch(&part, 3000, 6000).unwrap();
assert!(result.is_some());
let filtered = result.unwrap();
assert_eq!(filtered.num_rows, 1);
assert_eq!(filtered.num_rows(), 1);
assert_eq!(filtered.min_ts, 5000);
assert_eq!(filtered.max_ts, 5000);
@@ -1327,7 +1327,7 @@ mod tests {
let result = filter_record_batch(&part, 0, 9000).unwrap();
assert!(result.is_some());
let filtered = result.unwrap();
assert_eq!(filtered.num_rows, 5);
assert_eq!(filtered.num_rows(), 5);
assert_eq!(filtered.min_ts, 1000);
assert_eq!(filtered.max_ts, 8000);
}

src/mito2/src/memtable/time_series.rs

@@ -249,7 +249,7 @@ impl Memtable for TimeSeriesMemtable {
metrics.max_sequence = part.sequence;
metrics.max_ts = part.max_ts;
metrics.min_ts = part.min_ts;
metrics.num_rows = part.num_rows;
metrics.num_rows = part.num_rows();
self.update_stats(metrics);
Ok(())
}

src/mito2/src/region/opener.rs

@@ -47,6 +47,7 @@ use crate::error::{
use crate::manifest::action::RegionManifest;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::time_partition::TimePartitions;
use crate::memtable::MemtableBuilderProvider;
use crate::region::options::RegionOptions;
@@ -648,9 +649,16 @@ where
);
}
for bulk_entry in entry.bulk_entries {
let part = BulkPart::try_from(bulk_entry)?;
rows_replayed += part.num_rows();
region_write_ctx.push_bulk(OptionOutputTx::none(), part);
}
// set next_entry_id and write to memtable.
region_write_ctx.set_next_entry_id(last_entry_id + 1);
region_write_ctx.write_memtable().await;
region_write_ctx.write_bulk().await;
}
// TODO(weny): We need to update `flushed_entry_id` in the region manifest

src/mito2/src/region_write_ctx.rs

@@ -15,7 +15,7 @@
use std::mem;
use std::sync::Arc;
use api::v1::{Mutation, OpType, Rows, WalEntry, WriteHint};
use api::v1::{BulkWalEntry, Mutation, OpType, Rows, WalEntry, WriteHint};
use futures::stream::{FuturesUnordered, StreamExt};
use snafu::ResultExt;
use store_api::logstore::provider::Provider;
@@ -254,9 +254,12 @@ impl RegionWriteCtx {
pub(crate) fn push_bulk(&mut self, sender: OptionOutputTx, mut bulk: BulkPart) {
self.bulk_notifiers
.push(WriteNotify::new(sender, bulk.num_rows));
.push(WriteNotify::new(sender, bulk.num_rows()));
bulk.sequence = self.next_sequence;
self.next_sequence += bulk.num_rows as u64;
// Add bulk wal entry
self.wal_entry.bulk_entries.push(BulkWalEntry::from(&bulk));
self.next_sequence += bulk.num_rows() as u64;
self.bulk_parts.push(bulk);
}
@@ -270,7 +273,7 @@ impl RegionWriteCtx {
if self.bulk_parts.len() == 1 {
let part = self.bulk_parts.swap_remove(0);
let num_rows = part.num_rows;
let num_rows = part.num_rows();
if let Err(e) = self.version.memtables.mutable.write_bulk(part) {
self.bulk_notifiers[0].err = Some(Arc::new(e));
} else {
@@ -283,7 +286,7 @@ impl RegionWriteCtx {
for (i, part) in self.bulk_parts.drain(..).enumerate() {
let mutable = self.version.memtables.mutable.clone();
tasks.push(common_runtime::spawn_blocking_global(move || {
let num_rows = part.num_rows;
let num_rows = part.num_rows();
(i, mutable.write_bulk(part), num_rows)
}));
}

src/mito2/src/wal.rs

@@ -35,7 +35,7 @@ use store_api::logstore::provider::Provider;
use store_api::logstore::{AppendBatchResponse, LogStore, WalIndex};
use store_api::storage::RegionId;
use crate::error::{BuildEntrySnafu, DeleteWalSnafu, EncodeWalSnafu, Result, WriteWalSnafu};
use crate::error::{BuildEntrySnafu, DeleteWalSnafu, Result, WriteWalSnafu};
use crate::wal::entry_reader::{LogStoreEntryReader, WalEntryReader};
use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RegionRawEntryReader};
@@ -78,7 +78,6 @@ impl<S: LogStore> Wal<S> {
WalWriter {
store: self.store.clone(),
entries: Vec::new(),
entry_encode_buf: Vec::new(),
providers: HashMap::new(),
}
}
@@ -176,8 +175,6 @@ pub struct WalWriter<S: LogStore> {
store: Arc<S>,
/// Entries to write.
entries: Vec<Entry>,
/// Buffer to encode WAL entry.
entry_encode_buf: Vec<u8>,
/// Providers of regions being written into.
providers: HashMap<RegionId, Provider>,
}
@@ -197,14 +194,10 @@ impl<S: LogStore> WalWriter<S> {
.entry(region_id)
.or_insert_with(|| provider.clone());
// Encode wal entry to log store entry.
self.entry_encode_buf.clear();
wal_entry
.encode(&mut self.entry_encode_buf)
.context(EncodeWalSnafu { region_id })?;
let data = wal_entry.encode_to_vec();
let entry = self
.store
.entry(&mut self.entry_encode_buf, entry_id, region_id, provider)
.entry(data, entry_id, region_id, provider)
.map_err(BoxedError::new)
.context(BuildEntrySnafu { region_id })?;
@@ -229,9 +222,16 @@ impl<S: LogStore> WalWriter<S> {
#[cfg(test)]
mod tests {
use api::v1::{
value, ColumnDataType, ColumnSchema, Mutation, OpType, Row, Rows, SemanticType, Value,
bulk_wal_entry, value, ArrowIpc, BulkWalEntry, ColumnDataType, ColumnSchema, Mutation,
OpType, Row, Rows, SemanticType, Value,
};
use common_recordbatch::DfRecordBatch;
use common_test_util::flight::encode_to_flight_data;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use datatypes::arrow;
use datatypes::arrow::array::{ArrayRef, TimestampMillisecondArray};
use datatypes::arrow::datatypes::Field;
use datatypes::arrow_array::StringArray;
use futures::TryStreamExt;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::test_util::log_store_util;
@@ -313,6 +313,7 @@ mod tests {
new_mutation(OpType::Put, 1, &[("k1", 1), ("k2", 2)]),
new_mutation(OpType::Put, 2, &[("k3", 3), ("k4", 4)]),
],
bulk_entries: vec![],
};
let mut writer = wal.writer();
// Region 1 entry 1.
@@ -350,6 +351,46 @@ mod tests {
writer.write_to_wal().await.unwrap();
}
fn build_record_batch(rows: &[(&str, i64)]) -> DfRecordBatch {
let schema = Arc::new(arrow::datatypes::Schema::new(vec![
Field::new("tag", arrow::datatypes::DataType::Utf8, false),
Field::new(
"ts",
arrow::datatypes::DataType::Timestamp(
arrow::datatypes::TimeUnit::Millisecond,
None,
),
false,
),
]));
let tag = Arc::new(StringArray::from_iter_values(
rows.iter().map(|r| r.0.to_string()),
)) as ArrayRef;
let ts = Arc::new(TimestampMillisecondArray::from_iter_values(
rows.iter().map(|r| r.1),
)) as ArrayRef;
DfRecordBatch::try_new(schema, vec![tag, ts]).unwrap()
}
fn build_bulk_wal_entry(sequence_number: SequenceNumber, rows: &[(&str, i64)]) -> BulkWalEntry {
let rb = build_record_batch(rows);
let (schema, rb) = encode_to_flight_data(rb);
let max_ts = rows.iter().map(|r| r.1).max().unwrap();
let min_ts = rows.iter().map(|r| r.1).min().unwrap();
BulkWalEntry {
sequence: sequence_number,
max_ts,
min_ts,
timestamp_index: 1,
body: Some(bulk_wal_entry::Body::ArrowIpc(ArrowIpc {
schema: schema.data_header,
data_header: rb.data_header,
payload: rb.data_body,
})),
}
}
fn sample_entries() -> Vec<WalEntry> {
vec![
WalEntry {
@@ -357,18 +398,22 @@ mod tests {
new_mutation(OpType::Put, 1, &[("k1", 1), ("k2", 2)]),
new_mutation(OpType::Put, 2, &[("k3", 3), ("k4", 4)]),
],
bulk_entries: vec![],
},
WalEntry {
mutations: vec![new_mutation(OpType::Put, 3, &[("k1", 1), ("k2", 2)])],
bulk_entries: vec![],
},
WalEntry {
mutations: vec![
new_mutation(OpType::Put, 4, &[("k1", 1), ("k2", 2)]),
new_mutation(OpType::Put, 5, &[("k3", 3), ("k4", 4)]),
],
bulk_entries: vec![],
},
WalEntry {
mutations: vec![new_mutation(OpType::Put, 6, &[("k1", 1), ("k2", 2)])],
bulk_entries: vec![build_bulk_wal_entry(7, &[("k1", 8), ("k2", 9)])],
},
]
}

View File

@@ -281,6 +281,7 @@ mod tests {
rows: None,
write_hint: None,
}],
bulk_entries: vec![],
}
.encode_to_vec(),
}),
@@ -295,6 +296,7 @@ mod tests {
rows: None,
write_hint: None,
}],
bulk_entries: vec![],
}
.encode_to_vec(),
}),
@@ -309,6 +311,7 @@ mod tests {
rows: None,
write_hint: None,
}],
bulk_entries: vec![],
}
.encode_to_vec(),
}),
@@ -353,6 +356,7 @@ mod tests {
rows: None,
write_hint: None,
}],
bulk_entries: vec![],
}
)]
);
@@ -373,6 +377,7 @@ mod tests {
rows: None,
write_hint: None,
}],
bulk_entries: vec![],
}
)]
);
@@ -389,6 +394,7 @@ mod tests {
rows: None,
write_hint: None,
}],
bulk_entries: vec![],
};
let region2 = RegionId::new(1, 2);
let region2_expected_wal_entry = WalEntry {
@@ -398,6 +404,7 @@ mod tests {
rows: None,
write_hint: None,
}],
bulk_entries: vec![],
};
let region3 = RegionId::new(1, 3);
let region3_expected_wal_entry = WalEntry {
@@ -407,6 +414,7 @@ mod tests {
rows: None,
write_hint: None,
}],
bulk_entries: vec![],
};
let provider = Provider::kafka_provider("my_topic".to_string());
entries.extend(generate_tail_corrupted_stream(
@@ -485,6 +493,7 @@ mod tests {
rows: None,
write_hint: None,
}],
bulk_entries: vec![],
};
let region2 = RegionId::new(1, 2);
let provider = Provider::kafka_provider("my_topic".to_string());
@@ -562,6 +571,7 @@ mod tests {
rows: None,
write_hint: None,
}],
bulk_entries: vec![],
}
.encode_to_vec(),
}),
@@ -576,6 +586,7 @@ mod tests {
rows: None,
write_hint: None,
}],
bulk_entries: vec![],
}
.encode_to_vec(),
}),
@@ -590,6 +601,7 @@ mod tests {
rows: None,
write_hint: None,
}],
bulk_entries: vec![],
}
.encode_to_vec(),
}),
@@ -604,6 +616,7 @@ mod tests {
rows: None,
write_hint: None,
}],
bulk_entries: vec![],
}
.encode_to_vec(),
}),
@@ -639,6 +652,7 @@ mod tests {
rows: None,
write_hint: None,
}],
bulk_entries: vec![],
}
)]
);

View File

@@ -125,6 +125,7 @@ mod tests {
rows: None,
write_hint: None,
}],
bulk_entries: vec![],
};
let encoded_entry = wal_entry.encode_to_vec();
let parts = encoded_entry

View File

@@ -189,7 +189,7 @@ mod tests {
fn entry(
&self,
_data: &mut Vec<u8>,
_data: Vec<u8>,
_entry_id: EntryId,
_region_id: RegionId,
_provider: &Provider,

src/mito2/src/worker/handle_bulk_insert.rs

@@ -41,7 +41,6 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.with_label_values(&["process_bulk_req"])
.start_timer();
let batch = request.payload;
let num_rows = batch.num_rows();
let Some((ts_index, ts)) = batch
.schema()
@@ -113,11 +112,11 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let part = BulkPart {
batch,
num_rows,
max_ts,
min_ts,
sequence: 0,
timestamp_index: ts_index,
raw_data: Some(request.raw_data),
};
pending_bulk_request.push(SenderBulkRequest {
sender,

src/operator/src/bulk_insert.rs

@@ -14,9 +14,9 @@
use ahash::{HashMap, HashMapExt};
use api::v1::region::{
bulk_insert_request, region_request, ArrowIpc, BulkInsertRequest, RegionRequest,
RegionRequestHeader,
bulk_insert_request, region_request, BulkInsertRequest, RegionRequest, RegionRequestHeader,
};
use api::v1::ArrowIpc;
use common_base::AffectedRows;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_grpc::FlightData;
@@ -90,8 +90,8 @@ impl Inserter {
..Default::default()
}),
body: Some(region_request::Body::BulkInsert(BulkInsertRequest {
region_id: region_id.as_u64(),
body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
region_id: region_id.as_u64(),
schema: schema_bytes,
data_header: data.data_header,
payload: data.data_body,
@@ -103,11 +103,15 @@ impl Inserter {
.with_label_values(&["datanode_handle"])
.start_timer();
let datanode = self.node_manager.datanode(&datanode).await;
return datanode
let result = datanode
.handle(request)
.await
.context(error::RequestRegionSnafu)
.map(|r| r.affected_rows);
if let Ok(rows) = result {
crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(rows as u64);
}
return result;
}
let mut mask_per_datanode = HashMap::with_capacity(region_masks.len());
@@ -182,8 +186,8 @@ impl Inserter {
..Default::default()
}),
body: Some(region_request::Body::BulkInsert(BulkInsertRequest {
region_id: region_id.as_u64(),
body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
region_id: region_id.as_u64(),
schema: schema_bytes,
data_header: header,
payload,

src/store-api/src/logstore.rs

@@ -89,7 +89,7 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
/// Makes an entry instance of the associated Entry type
fn entry(
&self,
data: &mut Vec<u8>,
data: Vec<u8>,
entry_id: EntryId,
region_id: RegionId,
provider: &Provider,

src/store-api/src/region_request.rs

@@ -22,13 +22,13 @@ use api::v1::column_def::{
};
use api::v1::region::bulk_insert_request::Body;
use api::v1::region::{
alter_request, compact_request, region_request, AlterRequest, AlterRequests, ArrowIpc,
BulkInsertRequest, CloseRequest, CompactRequest, CreateRequest, CreateRequests, DeleteRequests,
DropRequest, DropRequests, FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
alter_request, compact_request, region_request, AlterRequest, AlterRequests, BulkInsertRequest,
CloseRequest, CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest,
DropRequests, FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
};
use api::v1::{
self, set_index, Analyzer, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
self, set_index, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption,
Rows, SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
};
pub use common_base::AffectedRows;
use common_grpc::flight::FlightDecoder;
@@ -325,28 +325,27 @@ fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, Regi
/// Convert [BulkInsertRequest] to [RegionRequest] and group by [RegionId].
fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
let region_id = request.region_id.into();
let Some(Body::ArrowIpc(request)) = request.body else {
return Ok(vec![]);
};
let ArrowIpc {
region_id,
schema,
payload,
data_header,
} = request;
let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
.with_label_values(&["decode"])
.start_timer();
let mut decoder = FlightDecoder::try_from_schema_bytes(&schema).context(FlightCodecSnafu)?;
let mut decoder =
FlightDecoder::try_from_schema_bytes(&request.schema).context(FlightCodecSnafu)?;
let payload = decoder
.try_decode_record_batch(&data_header, &payload)
.try_decode_record_batch(&request.data_header, &request.payload)
.context(FlightCodecSnafu)?;
decoder_timer.observe_duration();
let region_id: RegionId = region_id.into();
Ok(vec![(
region_id,
RegionRequest::BulkInserts(RegionBulkInsertsRequest { region_id, payload }),
RegionRequest::BulkInserts(RegionBulkInsertsRequest {
region_id,
payload,
raw_data: request,
}),
)])
}
@@ -1137,6 +1136,7 @@ pub struct RegionSequencesRequest {
pub struct RegionBulkInsertsRequest {
pub region_id: RegionId,
pub payload: DfRecordBatch,
pub raw_data: ArrowIpc,
}
impl RegionBulkInsertsRequest {