feat: dedup rows while flushing memtables (#1916)

* test: enlarge meta client timeout

* feat: dedup on flush

* test: enlarge datanode clients timeout

* chore: fix typo
This commit is contained in:
Yingwen
2023-07-10 16:07:10 +09:00
committed by GitHub
parent 553530cff4
commit 4ea8a78817
6 changed files with 16 additions and 19 deletions

View File

@@ -23,6 +23,8 @@ use moka::future::{Cache, CacheBuilder};
use crate::Client;
/// Default timeout (10 seconds) for datanode client channels; the default
/// `DatanodeClients` uses it as both the request timeout and the connect timeout.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
pub struct DatanodeClients {
channel_manager: ChannelManager,
clients: Cache<Peer, Client>,
@@ -31,7 +33,9 @@ pub struct DatanodeClients {
impl Default for DatanodeClients {
fn default() -> Self {
let config = ChannelConfig::new().timeout(Duration::from_secs(8));
let config = ChannelConfig::new()
.timeout(DEFAULT_TIMEOUT)
.connect_timeout(DEFAULT_TIMEOUT);
Self {
channel_manager: ChannelManager::with_config(config),

View File

@@ -96,8 +96,8 @@ pub async fn mock(
});
let config = ChannelConfig::new()
.timeout(Duration::from_secs(1))
.connect_timeout(Duration::from_secs(1))
.timeout(Duration::from_secs(10))
.connect_timeout(Duration::from_secs(10))
.tcp_nodelay(true);
let channel_manager = ChannelManager::with_config(config);

View File

@@ -253,9 +253,9 @@ impl<S: LogStore> FlushJob<S> {
let region_id = self.shared.id();
let mut futures = Vec::with_capacity(self.memtables.len());
let iter_ctx = IterContext {
for_flush: true,
// TODO(ruihang): dynamic row group size based on content (#412)
batch_size: WRITE_ROW_GROUP_SIZE,
// All sequences are visible by default.
..Default::default()
};

View File

@@ -99,11 +99,6 @@ pub struct IterContext {
/// Max visible sequence (inclusive).
pub visible_sequence: SequenceNumber,
// TODO(yingwen): [flush] Maybe delay deduping and visibility handling, just return all rows
// in memtable.
/// Returns all rows, ignores sequence visibility and key duplication.
pub for_flush: bool,
/// Schema the reader expect to read.
///
/// Set to `None` to read all columns.
@@ -119,7 +114,6 @@ impl Default for IterContext {
batch_size: consts::READ_BATCH_SIZE,
// All data in memory is visible by default.
visible_sequence: SequenceNumber::MAX,
for_flush: false,
projected_schema: None,
time_range: None,
}

View File

@@ -238,12 +238,8 @@ impl BTreeIterator {
map.range(..)
};
let (keys, sequences, op_types, values) = if self.ctx.for_flush {
collect_iter(iter, self.ctx.batch_size)
} else {
let iter = MapIterWrapper::new(iter, self.ctx.visible_sequence, self.ctx.time_range);
collect_iter(iter, self.ctx.batch_size)
};
let iter = MapIterWrapper::new(iter, self.ctx.visible_sequence, self.ctx.time_range);
let (keys, sequences, op_types, values) = collect_iter(iter, self.ctx.batch_size);
if keys.is_empty() {
return Ok(None);

View File

@@ -267,6 +267,12 @@ fn write_iter_memtable_case(ctx: &TestContext) {
}
}
#[test]
fn test_iter_context_default() {
    // A default context must not hide any sequence: its visible sequence is
    // the largest representable sequence number.
    assert_eq!(
        SequenceNumber::MAX,
        IterContext::default().visible_sequence
    );
}
#[test]
fn test_write_iter_memtable() {
let tester = MemtableTester::default();
@@ -435,7 +441,6 @@ fn test_sequence_visibility() {
let iter_ctx = IterContext {
batch_size: 1,
visible_sequence: 9,
for_flush: false,
projected_schema: None,
time_range: None,
};
@@ -454,7 +459,6 @@ fn test_sequence_visibility() {
let iter_ctx = IterContext {
batch_size: 1,
visible_sequence: 10,
for_flush: false,
projected_schema: None,
time_range: None,
};
@@ -473,7 +477,6 @@ fn test_sequence_visibility() {
let iter_ctx = IterContext {
batch_size: 1,
visible_sequence: 11,
for_flush: false,
projected_schema: None,
time_range: None,
};