mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-30 03:40:37 +00:00
feat: do not concat batches in MergeReader (#2833)
This commit is contained in:
@@ -28,9 +28,6 @@ use crate::memtable::BoxedBatchIterator;
|
||||
use crate::metrics::{MERGE_FILTER_ROWS_TOTAL, READ_STAGE_ELAPSED};
|
||||
use crate::read::{Batch, BatchReader, BoxedBatchReader, Source};
|
||||
|
||||
/// Minimum batch size to output.
|
||||
const MIN_BATCH_SIZE: usize = 64;
|
||||
|
||||
/// Reader to merge sorted batches.
|
||||
///
|
||||
/// The merge reader merges [Batch]es from multiple sources that yield sorted batches.
|
||||
@@ -49,11 +46,8 @@ pub struct MergeReader {
|
||||
///
|
||||
/// `Node` in this heap **must** not be empty.
|
||||
cold: BinaryHeap<Node>,
|
||||
/// Batches to output.
|
||||
batch_merger: BatchMerger,
|
||||
/// Suggested size of each batch. The batch returned by the reader can have more rows than the
|
||||
/// batch size.
|
||||
batch_size: usize,
|
||||
/// Batch to output.
|
||||
output_batch: Option<Batch>,
|
||||
/// Local metrics.
|
||||
metrics: Metrics,
|
||||
}
|
||||
@@ -62,15 +56,7 @@ pub struct MergeReader {
|
||||
impl BatchReader for MergeReader {
|
||||
async fn next_batch(&mut self) -> Result<Option<Batch>> {
|
||||
let start = Instant::now();
|
||||
while !self.hot.is_empty() && self.batch_merger.num_rows() < self.batch_size {
|
||||
if let Some(current_key) = self.batch_merger.primary_key() {
|
||||
// If the hottest node has a different key, we have finish collecting current key.
|
||||
// Safety: hot is not empty.
|
||||
if self.hot.peek().unwrap().primary_key() != current_key {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
while !self.hot.is_empty() && self.output_batch.is_none() {
|
||||
if self.hot.len() == 1 {
|
||||
// No need to do merge sort if only one batch in the hot heap.
|
||||
self.fetch_batch_from_hottest().await?;
|
||||
@@ -82,17 +68,14 @@ impl BatchReader for MergeReader {
|
||||
}
|
||||
}
|
||||
|
||||
if self.batch_merger.is_empty() {
|
||||
if let Some(batch) = self.output_batch.take() {
|
||||
self.metrics.scan_cost += start.elapsed();
|
||||
self.metrics.num_output_rows += batch.num_rows();
|
||||
Ok(Some(batch))
|
||||
} else {
|
||||
// Nothing fetched.
|
||||
self.metrics.scan_cost += start.elapsed();
|
||||
// Update deleted rows num.
|
||||
self.metrics.num_deleted_rows = self.batch_merger.num_deleted_rows();
|
||||
Ok(None)
|
||||
} else {
|
||||
let batch = self.batch_merger.merge_batches()?;
|
||||
self.metrics.scan_cost += start.elapsed();
|
||||
self.metrics.num_output_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
|
||||
Ok(batch)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -115,7 +98,7 @@ impl Drop for MergeReader {
|
||||
|
||||
impl MergeReader {
|
||||
/// Creates and initializes a new [MergeReader].
|
||||
pub async fn new(sources: Vec<Source>, batch_size: usize) -> Result<MergeReader> {
|
||||
pub async fn new(sources: Vec<Source>) -> Result<MergeReader> {
|
||||
let start = Instant::now();
|
||||
let mut metrics = Metrics::default();
|
||||
|
||||
@@ -132,8 +115,7 @@ impl MergeReader {
|
||||
let mut reader = MergeReader {
|
||||
hot,
|
||||
cold,
|
||||
batch_merger: BatchMerger::new(),
|
||||
batch_size,
|
||||
output_batch: None,
|
||||
metrics,
|
||||
};
|
||||
// Initializes the reader.
|
||||
@@ -168,7 +150,7 @@ impl MergeReader {
|
||||
|
||||
let mut hottest = self.hot.pop().unwrap();
|
||||
let batch = hottest.fetch_batch(&mut self.metrics).await?;
|
||||
self.batch_merger.push(batch)?;
|
||||
Self::maybe_output_batch(batch, &mut self.output_batch, &mut self.metrics)?;
|
||||
self.reheap(hottest)
|
||||
}
|
||||
|
||||
@@ -199,7 +181,11 @@ impl MergeReader {
|
||||
// They have duplicate timestamps. Outputs timestamps before the duplicated timestamp.
|
||||
// Batch itself doesn't contain duplicate timestamps so timestamps before `pos`
|
||||
// must be less than `next_min_ts`.
|
||||
self.batch_merger.push(top.slice(0, pos))?;
|
||||
Self::maybe_output_batch(
|
||||
top.slice(0, pos),
|
||||
&mut self.output_batch,
|
||||
&mut self.metrics,
|
||||
)?;
|
||||
// This keep the duplicate timestamp in the node.
|
||||
top_node.skip_rows(pos, &mut self.metrics).await?;
|
||||
// The merge window should contain this timestamp so only nodes in the hot heap
|
||||
@@ -209,7 +195,11 @@ impl MergeReader {
|
||||
}
|
||||
Err(pos) => {
|
||||
// No duplicate timestamp. Outputs timestamp before `pos`.
|
||||
self.batch_merger.push(top.slice(0, pos))?;
|
||||
Self::maybe_output_batch(
|
||||
top.slice(0, pos),
|
||||
&mut self.output_batch,
|
||||
&mut self.metrics,
|
||||
)?;
|
||||
top_node.skip_rows(pos, &mut self.metrics).await?;
|
||||
self.reheap(top_node)?;
|
||||
}
|
||||
@@ -300,16 +290,37 @@ impl MergeReader {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Removeds deleted entries and sets the `batch` to the `output_batch`.
|
||||
///
|
||||
/// Ignores the `batch` if it is empty.
|
||||
fn maybe_output_batch(
|
||||
mut batch: Batch,
|
||||
output_batch: &mut Option<Batch>,
|
||||
metrics: &mut Metrics,
|
||||
) -> Result<()> {
|
||||
debug_assert!(output_batch.is_none());
|
||||
|
||||
let num_rows = batch.num_rows();
|
||||
batch.filter_deleted()?;
|
||||
// Update deleted rows metrics.
|
||||
metrics.num_deleted_rows += num_rows - batch.num_rows();
|
||||
if batch.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
*output_batch = Some(batch);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Builder to build and initialize a [MergeReader].
|
||||
#[derive(Default)]
|
||||
pub struct MergeReaderBuilder {
|
||||
/// Input sources.
|
||||
///
|
||||
/// All source must yield batches with the same schema.
|
||||
sources: Vec<Source>,
|
||||
/// Batch size of the reader.
|
||||
batch_size: usize,
|
||||
}
|
||||
|
||||
impl MergeReaderBuilder {
|
||||
@@ -330,25 +341,10 @@ impl MergeReaderBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the batch size of the reader.
|
||||
pub fn batch_size(&mut self, size: usize) -> &mut Self {
|
||||
self.batch_size = if size == 0 { MIN_BATCH_SIZE } else { size };
|
||||
self
|
||||
}
|
||||
|
||||
/// Builds and initializes the reader, then resets the builder.
|
||||
pub async fn build(&mut self) -> Result<MergeReader> {
|
||||
let sources = mem::take(&mut self.sources);
|
||||
MergeReader::new(sources, self.batch_size).await
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for MergeReaderBuilder {
|
||||
fn default() -> Self {
|
||||
MergeReaderBuilder {
|
||||
sources: Vec::new(),
|
||||
batch_size: MIN_BATCH_SIZE,
|
||||
}
|
||||
MergeReader::new(sources).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -371,89 +367,6 @@ struct Metrics {
|
||||
num_deleted_rows: usize,
|
||||
}
|
||||
|
||||
/// Helper to collect and merge small batches for same primary key.
|
||||
struct BatchMerger {
|
||||
/// Buffered non-empty batches to merge.
|
||||
batches: Vec<Batch>,
|
||||
/// Number of rows in the batch.
|
||||
num_rows: usize,
|
||||
/// Number of rows deleted.
|
||||
num_deleted_rows: usize,
|
||||
}
|
||||
|
||||
impl BatchMerger {
|
||||
/// Returns a empty merger.
|
||||
fn new() -> BatchMerger {
|
||||
BatchMerger {
|
||||
batches: Vec::new(),
|
||||
num_rows: 0,
|
||||
num_deleted_rows: 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the number of rows.
|
||||
fn num_rows(&self) -> usize {
|
||||
self.num_rows
|
||||
}
|
||||
|
||||
/// Returns the number of rows deleted.
|
||||
fn num_deleted_rows(&self) -> usize {
|
||||
self.num_deleted_rows
|
||||
}
|
||||
|
||||
/// Returns true if the merger is empty.
|
||||
fn is_empty(&self) -> bool {
|
||||
self.num_rows() == 0
|
||||
}
|
||||
|
||||
/// Returns the primary key of current merger and `None` if the merger is empty.
|
||||
fn primary_key(&self) -> Option<&[u8]> {
|
||||
self.batches.first().map(|batch| batch.primary_key())
|
||||
}
|
||||
|
||||
/// Removeds deleted entries and pushes a `batch` into the merger.
|
||||
///
|
||||
/// Ignores the `batch` if it is empty.
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if the `batch` has another primary key.
|
||||
fn push(&mut self, mut batch: Batch) -> Result<()> {
|
||||
debug_assert!(self
|
||||
.batches
|
||||
.last()
|
||||
.map(|b| b.primary_key() == batch.primary_key())
|
||||
.unwrap_or(true));
|
||||
|
||||
let num_rows = batch.num_rows();
|
||||
batch.filter_deleted()?;
|
||||
self.num_deleted_rows += num_rows - batch.num_rows();
|
||||
if batch.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.num_rows += batch.num_rows();
|
||||
self.batches.push(batch);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Merge all buffered batches and returns the merged batch. Then
|
||||
/// reset the buffer.
|
||||
fn merge_batches(&mut self) -> Result<Option<Batch>> {
|
||||
if self.batches.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// Reset number of rows.
|
||||
self.num_rows = 0;
|
||||
if self.batches.len() == 1 {
|
||||
return Ok(self.batches.pop());
|
||||
}
|
||||
let batches = mem::take(&mut self.batches);
|
||||
Batch::concat(batches).map(Some)
|
||||
}
|
||||
}
|
||||
|
||||
/// A `Node` represent an individual input data source to be merged.
|
||||
struct Node {
|
||||
/// Data source of this `Node`.
|
||||
@@ -669,17 +582,19 @@ mod tests {
|
||||
&[
|
||||
new_batch(
|
||||
b"k1",
|
||||
&[1, 2, 4, 5, 7],
|
||||
&[11, 12, 14, 15, 17],
|
||||
&[
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
],
|
||||
&[21, 22, 24, 25, 27],
|
||||
&[1, 2],
|
||||
&[11, 12],
|
||||
&[OpType::Put, OpType::Put],
|
||||
&[21, 22],
|
||||
),
|
||||
new_batch(
|
||||
b"k1",
|
||||
&[4, 5],
|
||||
&[14, 15],
|
||||
&[OpType::Put, OpType::Put],
|
||||
&[24, 25],
|
||||
),
|
||||
new_batch(b"k1", &[7], &[17], &[OpType::Put], &[27]),
|
||||
new_batch(b"k2", &[3], &[13], &[OpType::Put], &[23]),
|
||||
],
|
||||
)
|
||||
@@ -718,13 +633,10 @@ mod tests {
|
||||
check_reader_result(
|
||||
&mut reader,
|
||||
&[
|
||||
new_batch(
|
||||
b"k1",
|
||||
&[1, 2, 3, 4],
|
||||
&[10, 11, 10, 11],
|
||||
&[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
|
||||
&[21, 32, 23, 34],
|
||||
),
|
||||
new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]),
|
||||
new_batch(b"k1", &[2], &[11], &[OpType::Put], &[32]),
|
||||
new_batch(b"k1", &[3], &[10], &[OpType::Put], &[23]),
|
||||
new_batch(b"k1", &[4], &[11], &[OpType::Put], &[34]),
|
||||
new_batch(b"k2", &[3], &[10], &[OpType::Put], &[23]),
|
||||
],
|
||||
)
|
||||
@@ -785,18 +697,16 @@ mod tests {
|
||||
&[
|
||||
new_batch(
|
||||
b"k1",
|
||||
&[1, 2, 3, 4],
|
||||
&[11, 12, 10, 14],
|
||||
&[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
|
||||
&[21, 22, 33, 24],
|
||||
),
|
||||
new_batch(
|
||||
b"k2",
|
||||
&[1, 3, 10],
|
||||
&[11, 13, 20],
|
||||
&[OpType::Put, OpType::Put, OpType::Put],
|
||||
&[21, 23, 30],
|
||||
&[1, 2],
|
||||
&[11, 12],
|
||||
&[OpType::Put, OpType::Put],
|
||||
&[21, 22],
|
||||
),
|
||||
new_batch(b"k1", &[3], &[10], &[OpType::Put], &[33]),
|
||||
new_batch(b"k1", &[4], &[14], &[OpType::Put], &[24]),
|
||||
new_batch(b"k2", &[1], &[11], &[OpType::Put], &[21]),
|
||||
new_batch(b"k2", &[3], &[13], &[OpType::Put], &[23]),
|
||||
new_batch(b"k2", &[10], &[20], &[OpType::Put], &[30]),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
@@ -900,13 +810,16 @@ mod tests {
|
||||
.unwrap();
|
||||
check_reader_result(
|
||||
&mut reader,
|
||||
&[new_batch(
|
||||
b"k1",
|
||||
&[1, 2, 3],
|
||||
&[10, 11, 11],
|
||||
&[OpType::Put, OpType::Put, OpType::Put],
|
||||
&[21, 32, 33],
|
||||
)],
|
||||
&[
|
||||
new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]),
|
||||
new_batch(
|
||||
b"k1",
|
||||
&[2, 3],
|
||||
&[11, 11],
|
||||
&[OpType::Put, OpType::Put],
|
||||
&[32, 33],
|
||||
),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -945,19 +858,18 @@ mod tests {
|
||||
.unwrap();
|
||||
check_reader_result(
|
||||
&mut reader,
|
||||
&[new_batch(
|
||||
b"k1",
|
||||
&[1, 6, 8, 10, 20],
|
||||
&[11, 11, 11, 10, 11],
|
||||
&[
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
],
|
||||
&[31, 36, 38, 30, 40],
|
||||
)],
|
||||
&[
|
||||
new_batch(b"k1", &[1], &[11], &[OpType::Put], &[31]),
|
||||
new_batch(
|
||||
b"k1",
|
||||
&[6, 8],
|
||||
&[11, 11],
|
||||
&[OpType::Put, OpType::Put],
|
||||
&[36, 38],
|
||||
),
|
||||
new_batch(b"k1", &[10], &[10], &[OpType::Put], &[30]),
|
||||
new_batch(b"k1", &[20], &[11], &[OpType::Put], &[40]),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -965,7 +877,6 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_merge_many_duplicates() {
|
||||
let mut builder = MergeReaderBuilder::new();
|
||||
builder.batch_size(3);
|
||||
for i in 0..10 {
|
||||
let batches: Vec<_> = (0..8)
|
||||
.map(|ts| new_batch(b"k1", &[ts], &[i], &[OpType::Put], &[100]))
|
||||
@@ -974,184 +885,9 @@ mod tests {
|
||||
builder.push_batch_reader(Box::new(reader));
|
||||
}
|
||||
let mut reader = builder.build().await.unwrap();
|
||||
check_reader_result(
|
||||
&mut reader,
|
||||
&[
|
||||
new_batch(
|
||||
b"k1",
|
||||
&[0, 1, 2],
|
||||
&[9, 9, 9],
|
||||
&[OpType::Put, OpType::Put, OpType::Put],
|
||||
&[100, 100, 100],
|
||||
),
|
||||
new_batch(
|
||||
b"k1",
|
||||
&[3, 4, 5],
|
||||
&[9, 9, 9],
|
||||
&[OpType::Put, OpType::Put, OpType::Put],
|
||||
&[100, 100, 100],
|
||||
),
|
||||
new_batch(
|
||||
b"k1",
|
||||
&[6, 7],
|
||||
&[9, 9],
|
||||
&[OpType::Put, OpType::Put],
|
||||
&[100, 100],
|
||||
),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_merge_more_than_batch_size() {
|
||||
let batches: Vec<_> = (0..MIN_BATCH_SIZE as i64 * 2)
|
||||
.map(|ts| new_batch(b"k1", &[ts], &[10], &[OpType::Put], &[100]))
|
||||
let expect: Vec<_> = (0..8)
|
||||
.map(|ts| new_batch(b"k1", &[ts], &[9], &[OpType::Put], &[100]))
|
||||
.collect();
|
||||
let reader = VecBatchReader::new(&batches);
|
||||
let mut reader = MergeReaderBuilder::new()
|
||||
.push_batch_reader(Box::new(reader))
|
||||
// Still use the default batch size.
|
||||
.batch_size(0)
|
||||
.build()
|
||||
.await
|
||||
.unwrap();
|
||||
let ts1: Vec<_> = (0..MIN_BATCH_SIZE as i64).collect();
|
||||
let ts2: Vec<_> = (MIN_BATCH_SIZE as i64..MIN_BATCH_SIZE as i64 * 2).collect();
|
||||
let seqs = vec![10; MIN_BATCH_SIZE];
|
||||
let op_types = vec![OpType::Put; MIN_BATCH_SIZE];
|
||||
let fields = vec![100; MIN_BATCH_SIZE];
|
||||
check_reader_result(
|
||||
&mut reader,
|
||||
&[
|
||||
new_batch(b"k1", &ts1, &seqs, &op_types, &fields),
|
||||
new_batch(b"k1", &ts2, &seqs, &op_types, &fields),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_merge_more_than_batch_size_overlapping() {
|
||||
let reader1 = VecBatchReader::new(&[new_batch(
|
||||
b"k1",
|
||||
&[1, 2, 3, 4, 5, 6, 7, 8, 9],
|
||||
&[11, 10, 11, 10, 11, 10, 11, 10, 11],
|
||||
&[
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
],
|
||||
&[21, 22, 23, 24, 25, 26, 27, 28, 29],
|
||||
)]);
|
||||
let reader2 = VecBatchReader::new(&[new_batch(
|
||||
b"k1",
|
||||
&[1, 2, 3, 4, 5, 6, 7, 8, 9],
|
||||
&[10, 11, 10, 11, 10, 11, 10, 11, 10],
|
||||
&[
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
OpType::Put,
|
||||
],
|
||||
&[31, 32, 33, 34, 35, 36, 37, 38, 39],
|
||||
)]);
|
||||
let mut reader = MergeReaderBuilder::new()
|
||||
.push_batch_iter(Box::new(reader1))
|
||||
.push_batch_reader(Box::new(reader2))
|
||||
.batch_size(3)
|
||||
.build()
|
||||
.await
|
||||
.unwrap();
|
||||
check_reader_result(
|
||||
&mut reader,
|
||||
&[
|
||||
new_batch(
|
||||
b"k1",
|
||||
&[1, 2, 3],
|
||||
&[11, 11, 11],
|
||||
&[OpType::Put, OpType::Put, OpType::Put],
|
||||
&[21, 32, 23],
|
||||
),
|
||||
new_batch(
|
||||
b"k1",
|
||||
&[4, 5, 6],
|
||||
&[11, 11, 11],
|
||||
&[OpType::Put, OpType::Put, OpType::Put],
|
||||
&[34, 25, 36],
|
||||
),
|
||||
new_batch(
|
||||
b"k1",
|
||||
&[7, 8, 9],
|
||||
&[11, 11, 11],
|
||||
&[OpType::Put, OpType::Put, OpType::Put],
|
||||
&[27, 38, 29],
|
||||
),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_batch_merger_empty() {
|
||||
let mut merger = BatchMerger::new();
|
||||
assert!(merger.is_empty());
|
||||
assert!(merger.merge_batches().unwrap().is_none());
|
||||
assert!(merger.primary_key().is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_merge_one_batch() {
|
||||
let mut merger = BatchMerger::new();
|
||||
let expect = new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]);
|
||||
merger.push(expect.clone()).unwrap();
|
||||
let batch = merger.merge_batches().unwrap().unwrap();
|
||||
assert_eq!(1, batch.num_rows());
|
||||
assert_eq!(expect, batch,);
|
||||
assert!(merger.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_merge_batches() {
|
||||
let mut merger = BatchMerger::new();
|
||||
merger
|
||||
.push(new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]))
|
||||
.unwrap();
|
||||
assert_eq!(1, merger.num_rows());
|
||||
assert!(!merger.is_empty());
|
||||
merger
|
||||
.push(new_batch(b"k1", &[2], &[10], &[OpType::Put], &[22]))
|
||||
.unwrap();
|
||||
assert_eq!(2, merger.num_rows());
|
||||
merger
|
||||
.push(new_batch(b"k1", &[3], &[10], &[OpType::Delete], &[23]))
|
||||
.unwrap();
|
||||
assert_eq!(2, merger.num_rows());
|
||||
|
||||
let batch = merger.merge_batches().unwrap().unwrap();
|
||||
assert_eq!(2, batch.num_rows());
|
||||
assert_eq!(
|
||||
batch,
|
||||
new_batch(
|
||||
b"k1",
|
||||
&[1, 2],
|
||||
&[10, 10],
|
||||
&[OpType::Put, OpType::Put,],
|
||||
&[21, 22]
|
||||
)
|
||||
);
|
||||
assert!(merger.is_empty());
|
||||
assert_eq!(1, merger.num_deleted_rows());
|
||||
check_reader_result(&mut reader, &expect).await;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user