diff --git a/src/client/src/client_manager.rs b/src/client/src/client_manager.rs index 3acb8b3d8f..6b7771a36b 100644 --- a/src/client/src/client_manager.rs +++ b/src/client/src/client_manager.rs @@ -23,6 +23,8 @@ use moka::future::{Cache, CacheBuilder}; use crate::Client; +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); + pub struct DatanodeClients { channel_manager: ChannelManager, clients: Cache, @@ -31,7 +33,9 @@ pub struct DatanodeClients { impl Default for DatanodeClients { fn default() -> Self { - let config = ChannelConfig::new().timeout(Duration::from_secs(8)); + let config = ChannelConfig::new() + .timeout(DEFAULT_TIMEOUT) + .connect_timeout(DEFAULT_TIMEOUT); Self { channel_manager: ChannelManager::with_config(config), diff --git a/src/meta-srv/src/mocks.rs b/src/meta-srv/src/mocks.rs index c6c38fbbdf..df901c5ea4 100644 --- a/src/meta-srv/src/mocks.rs +++ b/src/meta-srv/src/mocks.rs @@ -96,8 +96,8 @@ pub async fn mock( }); let config = ChannelConfig::new() - .timeout(Duration::from_secs(1)) - .connect_timeout(Duration::from_secs(1)) + .timeout(Duration::from_secs(10)) + .connect_timeout(Duration::from_secs(10)) .tcp_nodelay(true); let channel_manager = ChannelManager::with_config(config); diff --git a/src/storage/src/flush.rs b/src/storage/src/flush.rs index 901d7c2851..ebb41d7af9 100644 --- a/src/storage/src/flush.rs +++ b/src/storage/src/flush.rs @@ -253,9 +253,9 @@ impl FlushJob { let region_id = self.shared.id(); let mut futures = Vec::with_capacity(self.memtables.len()); let iter_ctx = IterContext { - for_flush: true, // TODO(ruihang): dynamic row group size based on content (#412) batch_size: WRITE_ROW_GROUP_SIZE, + // All sequences are visible by default. ..Default::default() }; diff --git a/src/storage/src/memtable.rs b/src/storage/src/memtable.rs index d89e06dcb0..b76af906ad 100644 --- a/src/storage/src/memtable.rs +++ b/src/storage/src/memtable.rs @@ -99,11 +99,6 @@ pub struct IterContext { /// Max visible sequence (inclusive). pub visible_sequence: SequenceNumber, - // TODO(yingwen): [flush] Maybe delay deduping and visiblility handling, just returns all rows - // in memtable. - /// Returns all rows, ignores sequence visibility and key duplication. - pub for_flush: bool, - /// Schema the reader expect to read. /// /// Set to `None` to read all columns. @@ -119,7 +114,6 @@ impl Default for IterContext { batch_size: consts::READ_BATCH_SIZE, // All data in memory is visible by default. visible_sequence: SequenceNumber::MAX, - for_flush: false, projected_schema: None, time_range: None, } diff --git a/src/storage/src/memtable/btree.rs b/src/storage/src/memtable/btree.rs index 949d2c67c3..c067eef293 100644 --- a/src/storage/src/memtable/btree.rs +++ b/src/storage/src/memtable/btree.rs @@ -238,12 +238,8 @@ impl BTreeIterator { map.range(..) }; - let (keys, sequences, op_types, values) = if self.ctx.for_flush { - collect_iter(iter, self.ctx.batch_size) - } else { - let iter = MapIterWrapper::new(iter, self.ctx.visible_sequence, self.ctx.time_range); - collect_iter(iter, self.ctx.batch_size) - }; + let iter = MapIterWrapper::new(iter, self.ctx.visible_sequence, self.ctx.time_range); + let (keys, sequences, op_types, values) = collect_iter(iter, self.ctx.batch_size); if keys.is_empty() { return Ok(None); diff --git a/src/storage/src/memtable/tests.rs b/src/storage/src/memtable/tests.rs index 83f97a8503..df7a639aac 100644 --- a/src/storage/src/memtable/tests.rs +++ b/src/storage/src/memtable/tests.rs @@ -267,6 +267,12 @@ fn write_iter_memtable_case(ctx: &TestContext) { } } +#[test] +fn test_iter_context_default() { + let ctx = IterContext::default(); + assert_eq!(SequenceNumber::MAX, ctx.visible_sequence); +} + #[test] fn test_write_iter_memtable() { let tester = MemtableTester::default(); @@ -435,7 +441,6 @@ fn test_sequence_visibility() { let iter_ctx = IterContext { batch_size: 1, visible_sequence: 9, - for_flush: false, projected_schema: None, time_range: None, }; @@ -454,7 +459,6 @@ fn test_sequence_visibility() { let iter_ctx = IterContext { batch_size: 1, visible_sequence: 10, - for_flush: false, projected_schema: None, time_range: None, }; @@ -473,7 +477,6 @@ fn test_sequence_visibility() { let iter_ctx = IterContext { batch_size: 1, visible_sequence: 11, - for_flush: false, projected_schema: None, time_range: None, };