diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index ae0af78904..5f7db4d928 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -252,7 +252,7 @@ impl RegionEngine for MockRegionEngine { unimplemented!() } - async fn get_last_seq_num(&self, _: RegionId) -> Result, BoxedError> { + async fn get_committed_sequence(&self, _: RegionId) -> Result { unimplemented!() } diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs index bc1ceaed40..d3ec72c1e2 100644 --- a/src/file-engine/src/engine.rs +++ b/src/file-engine/src/engine.rs @@ -115,8 +115,8 @@ impl RegionEngine for FileRegionEngine { None } - async fn get_last_seq_num(&self, _: RegionId) -> Result, BoxedError> { - Ok(None) + async fn get_committed_sequence(&self, _: RegionId) -> Result { + Ok(Default::default()) } fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> { diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 7677ae4619..0066118f2c 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -257,10 +257,10 @@ impl RegionEngine for MetricEngine { self.handle_query(region_id, request).await } - async fn get_last_seq_num( + async fn get_committed_sequence( &self, region_id: RegionId, - ) -> Result, BoxedError> { + ) -> Result { self.inner .get_last_seq_num(region_id) .await diff --git a/src/metric-engine/src/engine/read.rs b/src/metric-engine/src/engine/read.rs index 07643873c1..13ae461db1 100644 --- a/src/metric-engine/src/engine/read.rs +++ b/src/metric-engine/src/engine/read.rs @@ -89,7 +89,7 @@ impl MetricEngineInner { Ok(scanner) } - pub async fn get_last_seq_num(&self, region_id: RegionId) -> Result> { + pub async fn get_last_seq_num(&self, region_id: RegionId) -> Result { let region_id = if self.is_physical_region(region_id) { region_id } else { @@ -97,7 +97,7 @@ impl MetricEngineInner { utils::to_data_region_id(physical_region_id) }; self.mito - .get_last_seq_num(region_id) + .get_committed_sequence(region_id) .await .context(MitoReadOperationSnafu) } diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index e28d29c152..0cf946416c 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -483,6 +483,7 @@ impl Compactor for DefaultCompactor { .map(|seconds| Duration::from_secs(seconds as u64)), flushed_entry_id: None, flushed_sequence: None, + committed_sequence: None, }; let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())); diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index cef9c6853c..ca8faebb4a 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -23,6 +23,8 @@ mod basic_test; #[cfg(test)] mod batch_open_test; #[cfg(test)] +mod bump_committed_sequence_test; +#[cfg(test)] mod catchup_test; #[cfg(test)] mod close_test; @@ -469,6 +471,7 @@ fn is_valid_region_edit(edit: &RegionEdit) -> bool { compaction_time_window: None, flushed_entry_id: None, flushed_sequence: None, + .. } ) } @@ -662,10 +665,11 @@ impl EngineInner { receiver.await.context(RecvSnafu)? } - fn get_last_seq_num(&self, region_id: RegionId) -> Result> { + /// Returns the sequence of latest committed data. + fn get_committed_sequence(&self, region_id: RegionId) -> Result { // Reading a region doesn't need to go through the region worker thread. - let region = self.find_region(region_id)?; - Ok(Some(region.find_committed_sequence())) + self.find_region(region_id) + .map(|r| r.find_committed_sequence()) } /// Handles the scan `request` and returns a [ScanRegion]. @@ -832,12 +836,12 @@ impl RegionEngine for MitoEngine { .map_err(BoxedError::new) } - async fn get_last_seq_num( + async fn get_committed_sequence( &self, region_id: RegionId, - ) -> Result, BoxedError> { + ) -> Result { self.inner - .get_last_seq_num(region_id) + .get_committed_sequence(region_id) .map_err(BoxedError::new) } @@ -1021,6 +1025,7 @@ mod tests { compaction_time_window: None, flushed_entry_id: None, flushed_sequence: None, + committed_sequence: None, }; assert!(is_valid_region_edit(&edit)); @@ -1032,6 +1037,7 @@ mod tests { compaction_time_window: None, flushed_entry_id: None, flushed_sequence: None, + committed_sequence: None, }; assert!(!is_valid_region_edit(&edit)); @@ -1043,6 +1049,7 @@ mod tests { compaction_time_window: None, flushed_entry_id: None, flushed_sequence: None, + committed_sequence: None, }; assert!(!is_valid_region_edit(&edit)); @@ -1054,6 +1061,7 @@ mod tests { compaction_time_window: Some(Duration::from_secs(1)), flushed_entry_id: None, flushed_sequence: None, + committed_sequence: None, }; assert!(!is_valid_region_edit(&edit)); let edit = RegionEdit { @@ -1063,6 +1071,7 @@ mod tests { compaction_time_window: None, flushed_entry_id: Some(1), flushed_sequence: None, + committed_sequence: None, }; assert!(!is_valid_region_edit(&edit)); let edit = RegionEdit { @@ -1072,6 +1081,7 @@ mod tests { compaction_time_window: None, flushed_entry_id: None, flushed_sequence: Some(1), + committed_sequence: None, }; assert!(!is_valid_region_edit(&edit)); } diff --git a/src/mito2/src/engine/bump_committed_sequence_test.rs b/src/mito2/src/engine/bump_committed_sequence_test.rs new file mode 100644 index 0000000000..edf29deb6c --- /dev/null +++ b/src/mito2/src/engine/bump_committed_sequence_test.rs @@ -0,0 +1,136 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use api::v1::Rows; +use store_api::region_engine::{RegionEngine, RegionRole}; +use store_api::region_request::{PathType, RegionOpenRequest, RegionRequest}; +use store_api::storage::RegionId; + +use crate::config::MitoConfig; +use crate::manifest::action::RegionEdit; +use crate::sst::file::FileMeta; +use crate::test_util::{CreateRequestBuilder, TestEnv, build_rows, put_rows, rows_schema}; + +#[tokio::test] +async fn test_bump_committed_sequence() { + common_telemetry::init_default_ut_logging(); + let mut env = TestEnv::new().await; + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + + let table_dir = request.table_dir.clone(); + + env.get_schema_metadata_manager() + .register_region_table_info( + region_id.table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + env.get_kv_backend(), + ) + .await; + + let _ = engine + .handle_request(region_id, RegionRequest::Create(request.clone())) + .await + .unwrap(); + let region = engine.get_region(region_id).unwrap(); + assert_eq!(region.version_control.current().committed_sequence, 0); + assert_eq!(region.version_control.current().version.flushed_sequence, 0); + + let column_schemas = rows_schema(&request); + let rows = Rows { + schema: column_schemas, + rows: build_rows(0, 42), + }; + put_rows(&engine, region_id, rows).await; + assert_eq!(region.version_control.current().committed_sequence, 42); + assert_eq!(region.version_control.current().version.flushed_sequence, 0); + + engine + .edit_region( + region_id, + RegionEdit { + files_to_add: vec![FileMeta::default()], + files_to_remove: vec![], + timestamp_ms: None, + compaction_time_window: None, + flushed_entry_id: None, + flushed_sequence: None, + committed_sequence: None, + }, + ) + .await + .unwrap(); + + assert_eq!(region.version_control.current().version.flushed_sequence, 0); + assert_eq!(region.version_control.committed_sequence(), 43); + + // Reopen region. + let engine = env.reopen_engine(engine, MitoConfig::default()).await; + engine + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + table_dir: table_dir.clone(), + path_type: PathType::Bare, + options: HashMap::default(), + skip_wal_replay: false, + checkpoint: None, + }), + ) + .await + .unwrap(); + let region = engine.get_region(region_id).unwrap(); + region.set_role(RegionRole::Leader); + assert_eq!(region.version_control.current().version.flushed_sequence, 0); + assert_eq!(region.version_control.committed_sequence(), 43); + + // Write another 2 rows after editing. + let column_schemas = rows_schema(&request); + let rows = Rows { + schema: column_schemas, + rows: build_rows(0, 2), + }; + put_rows(&engine, region_id, rows).await; + assert_eq!(region.version_control.committed_sequence(), 45); + assert_eq!(region.version_control.current().version.flushed_sequence, 0); + + // Reopen region. + let engine = env.reopen_engine(engine, MitoConfig::default()).await; + engine + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + table_dir, + path_type: PathType::Bare, + options: HashMap::default(), + skip_wal_replay: false, + checkpoint: None, + }), + ) + .await + .unwrap(); + let region = engine.get_region(region_id).unwrap(); + region.set_role(RegionRole::Leader); + assert_eq!(region.version_control.current().version.flushed_sequence, 0); + assert_eq!(region.version_control.committed_sequence(), 45); +} diff --git a/src/mito2/src/engine/edit_region_test.rs b/src/mito2/src/engine/edit_region_test.rs index 20f68fe5d9..1a9407fb3b 100644 --- a/src/mito2/src/engine/edit_region_test.rs +++ b/src/mito2/src/engine/edit_region_test.rs @@ -96,6 +96,7 @@ async fn test_edit_region_schedule_compaction() { compaction_time_window: None, flushed_entry_id: None, flushed_sequence: None, + committed_sequence: None, }; engine .edit_region(region.region_id, new_edit()) @@ -185,6 +186,7 @@ async fn test_edit_region_fill_cache() { compaction_time_window: None, flushed_entry_id: None, flushed_sequence: None, + committed_sequence: None, }; engine.edit_region(region.region_id, edit).await.unwrap(); @@ -236,6 +238,7 @@ async fn test_edit_region_concurrently() { compaction_time_window: None, flushed_entry_id: None, flushed_sequence: None, + committed_sequence: None, }; engine .edit_region(self.region.region_id, edit) diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 010fb88946..27c04851c8 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -384,6 +384,7 @@ impl RegionFlushTask { // The last entry has been flushed. flushed_entry_id: Some(version_data.last_entry_id), flushed_sequence: Some(version_data.committed_sequence), + committed_sequence: None, }; info!("Applying {edit:?} to region {}", self.region_id); @@ -1313,6 +1314,7 @@ mod tests { compaction_time_window: None, flushed_entry_id: None, flushed_sequence: None, + committed_sequence: None, }), &[0], builder.file_purger(), diff --git a/src/mito2/src/manifest/action.rs b/src/mito2/src/manifest/action.rs index 85b8677b0a..351e935734 100644 --- a/src/mito2/src/manifest/action.rs +++ b/src/mito2/src/manifest/action.rs @@ -62,6 +62,7 @@ pub struct RegionEdit { pub compaction_time_window: Option, pub flushed_entry_id: Option, pub flushed_sequence: Option, + pub committed_sequence: Option, } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] @@ -119,6 +120,7 @@ pub struct RegionManifest { pub flushed_entry_id: EntryId, /// Last sequence of flushed data. pub flushed_sequence: SequenceNumber, + pub committed_sequence: Option, /// Current manifest version. pub manifest_version: ManifestVersion, /// Last WAL entry id of truncated data. @@ -138,6 +140,7 @@ impl PartialEq for RegionManifest { && self.manifest_version == other.manifest_version && self.truncated_entry_id == other.truncated_entry_id && self.compaction_time_window == other.compaction_time_window + && self.committed_sequence == other.committed_sequence } } @@ -151,6 +154,7 @@ pub struct RegionManifestBuilder { manifest_version: ManifestVersion, truncated_entry_id: Option, compaction_time_window: Option, + committed_sequence: Option, } impl RegionManifestBuilder { @@ -166,6 +170,7 @@ impl RegionManifestBuilder { flushed_sequence: s.flushed_sequence, truncated_entry_id: s.truncated_entry_id, compaction_time_window: s.compaction_time_window, + committed_sequence: s.committed_sequence, } } else { Default::default() @@ -199,6 +204,13 @@ impl RegionManifestBuilder { if let Some(flushed_sequence) = edit.flushed_sequence { self.flushed_sequence = self.flushed_sequence.max(flushed_sequence); } + + if let Some(committed_sequence) = edit.committed_sequence { + self.committed_sequence = Some( + self.committed_sequence + .map_or(committed_sequence, |exist| exist.max(committed_sequence)), + ); + } if let Some(window) = edit.compaction_time_window { self.compaction_time_window = Some(window); } @@ -253,6 +265,7 @@ impl RegionManifestBuilder { removed_files: self.removed_files, flushed_entry_id: self.flushed_entry_id, flushed_sequence: self.flushed_sequence, + committed_sequence: self.committed_sequence, manifest_version: self.manifest_version, truncated_entry_id: self.truncated_entry_id, compaction_time_window: self.compaction_time_window, @@ -377,6 +390,7 @@ impl RegionMetaActionList { compaction_time_window: None, flushed_entry_id: None, flushed_sequence: None, + committed_sequence: None, }; for action in self.actions { @@ -391,6 +405,10 @@ impl RegionMetaActionList { if let Some(seq) = region_edit.flushed_sequence { edit.flushed_sequence = Some(edit.flushed_sequence.map_or(seq, |v| v.max(seq))); } + if let Some(seq) = region_edit.committed_sequence { + edit.committed_sequence = + Some(edit.committed_sequence.map_or(seq, |v| v.max(seq))); + } // Prefer the latest non-none time window if region_edit.compaction_time_window.is_some() { edit.compaction_time_window = region_edit.compaction_time_window; @@ -698,6 +716,7 @@ mod tests { files: HashMap::new(), flushed_entry_id: 0, flushed_sequence: 0, + committed_sequence: None, manifest_version: 0, truncated_entry_id: None, compaction_time_window: None, @@ -807,6 +826,7 @@ mod tests { removed_files: Default::default(), flushed_entry_id: 0, flushed_sequence: 0, + committed_sequence: None, manifest_version: 0, truncated_entry_id: None, compaction_time_window: None, @@ -819,6 +839,7 @@ mod tests { removed_files: Default::default(), flushed_entry_id: 0, flushed_sequence: 0, + committed_sequence: None, manifest_version: 0, truncated_entry_id: None, compaction_time_window: None, @@ -872,6 +893,7 @@ mod tests { compaction_time_window: None, flushed_entry_id: None, flushed_sequence: None, + committed_sequence: None, }, new_from_old ); @@ -884,6 +906,7 @@ mod tests { compaction_time_window: None, flushed_entry_id: None, flushed_sequence: None, + committed_sequence: None, }; let new_json = serde_json::to_string(&new).unwrap(); diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index b988d7d423..a1befe96bb 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -194,6 +194,7 @@ impl RegionManifestManager { compaction_time_window: None, flushed_entry_id: Some(flushed_entry_id), flushed_sequence: None, + committed_sequence: None, })); } @@ -886,6 +887,7 @@ mod test { compaction_time_window: None, flushed_entry_id: None, flushed_sequence: None, + committed_sequence: None, })]), RegionRoleState::Leader(RegionLeaderState::Writable), ) @@ -913,6 +915,6 @@ mod test { // get manifest size again let manifest_size = manager.manifest_usage(); - assert_eq!(manifest_size, 1669); + assert_eq!(manifest_size, 1721); } } diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs index b76394e9a3..3408c978a8 100644 --- a/src/mito2/src/manifest/tests/checkpoint.rs +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -77,6 +77,7 @@ fn nop_action() -> RegionMetaActionList { compaction_time_window: None, flushed_entry_id: None, flushed_sequence: None, + committed_sequence: None, })]) } @@ -276,6 +277,7 @@ async fn checkpoint_with_different_compression_types() { compaction_time_window: None, flushed_entry_id: None, flushed_sequence: None, + committed_sequence: None, })]); actions.push(action); } @@ -340,6 +342,7 @@ fn generate_action_lists(num: usize) -> (Vec, Vec) compaction_time_window: None, flushed_entry_id: None, flushed_sequence: None, + committed_sequence: None, })]); actions.push(action); } diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index c06ca890c6..dcebd65b98 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -473,6 +473,7 @@ impl RegionOpener { .build(); let flushed_entry_id = version.flushed_entry_id; let version_control = Arc::new(VersionControl::new(version)); + let topic_latest_entry_id = if !self.skip_wal_replay { let replay_from_entry_id = self .replay_checkpoint @@ -508,6 +509,21 @@ impl RegionOpener { 0 }; + + if let Some(committed_in_manifest) = manifest.committed_sequence { + let committed_after_replay = version_control.committed_sequence(); + if committed_in_manifest > committed_after_replay { + info!( + "Overriding committed sequence, region: {}, flushed_sequence: {}, committed_sequence: {} -> {}", + self.region_id, + version_control.current().version.flushed_sequence, + version_control.committed_sequence(), + committed_in_manifest + ); + version_control.set_committed_sequence(committed_in_manifest); + } + } + let now = self.time_provider.current_time_millis(); let region = MitoRegion { region_id: self.region_id, @@ -729,14 +745,22 @@ where mutation.rows, mutation.write_hint, OptionOutputTx::none(), + // We should respect the sequence in WAL during replay. + Some(mutation.sequence), ); } for bulk_entry in entry.bulk_entries { let part = BulkPart::try_from(bulk_entry)?; rows_replayed += part.num_rows(); + // During replay, we should adopt the sequence from WAL. + let bulk_sequence_from_wal = part.sequence; ensure!( - region_write_ctx.push_bulk(OptionOutputTx::none(), part), + region_write_ctx.push_bulk( + OptionOutputTx::none(), + part, + Some(bulk_sequence_from_wal) + ), RegionCorruptedSnafu { region_id, reason: "unable to replay memtable with bulk entries", diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index e37ed743ce..c7438b196a 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -71,6 +71,12 @@ impl VersionControl { self.data.read().unwrap().clone() } + /// Updates the `committed_sequence` of version. + pub(crate) fn set_committed_sequence(&self, seq: SequenceNumber) { + let mut data = self.data.write().unwrap(); + data.committed_sequence = seq; + } + /// Updates committed sequence and entry id. pub(crate) fn set_sequence_and_entry_id(&self, seq: SequenceNumber, entry_id: EntryId) { let mut data = self.data.write().unwrap(); @@ -137,6 +143,7 @@ impl VersionControl { ) { let version = self.current().version; let builder = VersionBuilder::from_version(version); + let committed_sequence = edit.as_ref().and_then(|e| e.committed_sequence); let builder = if let Some(edit) = edit { builder.apply_edit(edit, purger) } else { @@ -145,6 +152,11 @@ impl VersionControl { let new_version = Arc::new(builder.remove_memtables(memtables_to_remove).build()); let mut version_data = self.data.write().unwrap(); + version_data.committed_sequence = if let Some(committed_in_edit) = committed_sequence { + version_data.committed_sequence.max(committed_in_edit) + } else { + version_data.committed_sequence + }; version_data.version = new_version; } diff --git a/src/mito2/src/region_write_ctx.rs b/src/mito2/src/region_write_ctx.rs index 231ad085cb..6ada2d2765 100644 --- a/src/mito2/src/region_write_ctx.rs +++ b/src/mito2/src/region_write_ctx.rs @@ -145,13 +145,18 @@ impl RegionWriteCtx { } /// Push mutation to the context. + /// This method adopts the sequence number in parameters if present. pub(crate) fn push_mutation( &mut self, op_type: i32, rows: Option, write_hint: Option, tx: OptionOutputTx, + sequence: Option, ) { + if let Some(sequence) = sequence { + self.next_sequence = sequence; + } let num_rows = rows.as_ref().map(|rows| rows.rows.len()).unwrap_or(0); self.wal_entry.mutations.push(Mutation { op_type, @@ -268,7 +273,15 @@ impl RegionWriteCtx { .set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1); } - pub(crate) fn push_bulk(&mut self, sender: OptionOutputTx, mut bulk: BulkPart) -> bool { + pub(crate) fn push_bulk( + &mut self, + sender: OptionOutputTx, + mut bulk: BulkPart, + sequence: Option, + ) -> bool { + if let Some(sequence) = sequence { + self.next_sequence = sequence; + } bulk.sequence = self.next_sequence; let entry = match BulkWalEntry::try_from(&bulk) { Ok(entry) => entry, diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index d888c518ff..508a767e20 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -382,7 +382,7 @@ impl WriteRequest { if column.column_schema.is_default_impure() { UnexpectedSnafu { reason: format!( - "unexpected impure default value with region_id: {}, column: {}, default_value: {:?}", + "unexpected impure default value with region_id: {}, column: {}, default_value: {:?}", self.region_id, column.column_schema.name, column.column_schema.default_constraint(), diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs index 16fe0eaa12..d9d1b5a99c 100644 --- a/src/mito2/src/test_util/version_util.rs +++ b/src/mito2/src/test_util/version_util.rs @@ -211,6 +211,7 @@ pub(crate) fn apply_edit( compaction_time_window: None, flushed_entry_id: None, flushed_sequence: None, + committed_sequence: None, }), &[], purger, diff --git a/src/mito2/src/worker/handle_manifest.rs b/src/mito2/src/worker/handle_manifest.rs index 4fb32befed..9f19df3bde 100644 --- a/src/mito2/src/worker/handle_manifest.rs +++ b/src/mito2/src/worker/handle_manifest.rs @@ -17,6 +17,7 @@ //! It updates the manifest and applies the changes to the region in background. use std::collections::{HashMap, VecDeque}; +use std::num::NonZeroU64; use std::sync::Arc; use common_telemetry::{info, warn}; @@ -203,9 +204,16 @@ impl RegionWorkerLoop { let RegionEditRequest { region_id: _, - edit, + mut edit, tx: sender, } = request; + let file_sequence = region.version_control.committed_sequence() + 1; + edit.committed_sequence = Some(file_sequence); + + // For every file added through region edit, we should fill the file sequence + for file in &mut edit.files_to_add { + file.sequence = NonZeroU64::new(file_sequence); + } // Marks the region as editing. if let Err(e) = region.set_editing() { @@ -229,6 +237,7 @@ impl RegionWorkerLoop { result, }), }; + // We don't set state back as the worker loop is already exited. if let Err(res) = request_sender .send(WorkerRequestWithTime::new(notify)) diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index 7b8f6a70cb..e86aa67630 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -316,6 +316,7 @@ impl RegionWorkerLoop { Some(sender_req.request.rows), sender_req.request.hint, sender_req.sender, + None, ); } } @@ -401,7 +402,7 @@ impl RegionWorkerLoop { } // Collect requests by region. - if !region_ctx.push_bulk(bulk_req.sender, bulk_req.request) { + if !region_ctx.push_bulk(bulk_req.sender, bulk_req.request, None) { return; } } diff --git a/src/query/src/optimizer/test_util.rs b/src/query/src/optimizer/test_util.rs index 2b3b473770..cc5712b8a5 100644 --- a/src/query/src/optimizer/test_util.rs +++ b/src/query/src/optimizer/test_util.rs @@ -86,11 +86,11 @@ impl RegionEngine for MetaRegionEngine { None } - async fn get_last_seq_num( + async fn get_committed_sequence( &self, _region_id: RegionId, - ) -> Result, BoxedError> { - Ok(None) + ) -> Result { + Ok(SequenceNumber::default()) } async fn stop(&self) -> Result<(), BoxedError> { diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index f1a31d04e1..000b36cc17 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -34,9 +34,7 @@ use tokio::sync::Semaphore; use crate::logstore::entry; use crate::metadata::RegionMetadataRef; -use crate::region_request::{ - BatchRegionDdlRequest, RegionOpenRequest, RegionRequest, RegionSequencesRequest, -}; +use crate::region_request::{BatchRegionDdlRequest, RegionOpenRequest, RegionRequest}; use crate::storage::{RegionId, ScanRequest, SequenceNumber}; /// The settable region role state. @@ -746,25 +744,11 @@ pub trait RegionEngine: Send + Sync { request: RegionRequest, ) -> Result; - /// Returns the last sequence number of the region. - async fn get_last_seq_num( + /// Returns the committed sequence (sequence of latest written data). + async fn get_committed_sequence( &self, region_id: RegionId, - ) -> Result, BoxedError>; - - async fn get_region_sequences( - &self, - seqs: RegionSequencesRequest, - ) -> Result, BoxedError> { - let mut results = HashMap::with_capacity(seqs.region_ids.len()); - - for region_id in seqs.region_ids { - let seq = self.get_last_seq_num(region_id).await?.unwrap_or_default(); - results.insert(region_id.as_u64(), seq); - } - - Ok(results) - } + ) -> Result; /// Handles query and return a scanner that can be used to scan the region concurrently. async fn handle_query( diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 09d1fe72ed..c44a225d6c 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -1377,12 +1377,6 @@ pub struct RegionCatchupRequest { pub checkpoint: Option, } -/// Get sequences of regions by region ids. -#[derive(Debug, Clone)] -pub struct RegionSequencesRequest { - pub region_ids: Vec, -} - #[derive(Debug, Clone)] pub struct RegionBulkInsertsRequest { pub region_id: RegionId,