mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 13:52:59 +00:00
chore: bump sequence on region edit (#6947)
* chore/update-sequence-on-region-edit:
  ### Commit Message

  Refactor `get_last_seq_num` Method Across Engines

  - **Change Return Type**: Updated the `get_last_seq_num` method to return `Result<SequenceNumber, BoxedError>` instead of `Result<Option<SequenceNumber>, BoxedError>` in the following files:
    - `src/datanode/src/tests.rs`
    - `src/file-engine/src/engine.rs`
    - `src/metric-engine/src/engine.rs`
    - `src/metric-engine/src/engine/read.rs`
    - `src/mito2/src/engine.rs`
    - `src/query/src/optimizer/test_util.rs`
    - `src/store-api/src/region_engine.rs`
  - **Enhance Region Edit Handling**: Modified `RegionWorkerLoop` in `src/mito2/src/worker/handle_manifest.rs` to update file sequences during region edits.

  Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* add committed_sequence to RegionEdit

  Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* chore/update-sequence-on-region-edit:
  ### Commit Message

  Refactor sequence retrieval method

  - **Renamed Method**: Changed `get_last_seq_num` to `get_committed_sequence` across multiple files to better reflect its purpose of retrieving the latest committed sequence.
    - Affected files: `tests.rs`, `engine.rs` in `file-engine`, `metric-engine`, `mito2`, `test_util.rs`, and `region_engine.rs`.
  - **Removed Unused Struct**: Deleted `RegionSequencesRequest` struct from `region_request.rs` as it is no longer needed.

  Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* chore/update-sequence-on-region-edit:
  **Add Committed Sequence Handling in Region Engine**

  - **`engine.rs`**: Introduced a new test module `bump_committed_sequence_test` to verify committed sequence handling.
  - **`bump_committed_sequence_test.rs`**: Added a test to ensure the committed sequence is correctly updated and persisted across region reopenings.
  - **`action.rs`**: Updated `RegionManifest` and `RegionManifestBuilder` to include `committed_sequence` for tracking.
  - **`manager.rs`**: Adjusted manifest size assertion to accommodate new committed sequence data.
  - **`opener.rs`**: Implemented logic to override committed sequence during region opening.
  - **`version.rs`**: Added `set_committed_sequence` method to update the committed sequence in `VersionControl`.

  Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* chore/update-sequence-on-region-edit:
  **Enhance `test_bump_committed_sequence` in `bump_committed_sequence_test.rs`**

  - Updated the test to include row operations using `build_rows`, `put_rows`, and `rows_schema` to verify the committed sequence behavior.
  - Adjusted assertions to reflect changes in committed sequence after row operations and region edits.
  - Added comments to clarify the expected behavior of committed sequence after reopening the region and replaying the WAL.

  Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* chore/update-sequence-on-region-edit:
  **Enhance Region Sequence Management**

  - **`bump_committed_sequence_test.rs`**: Updated test to handle region reopening and sequence management, ensuring committed sequences are correctly set and verified after edits.
  - **`opener.rs`**: Improved committed sequence handling by overriding it only if the manifest's sequence is greater than the replayed sequence. Added logging for mutation sequence replay.
  - **`region_write_ctx.rs`**: Modified `push_mutation` and `push_bulk` methods to adopt sequence numbers from parameters, enhancing sequence management during write operations.
  - **`handle_write.rs`**: Updated `RegionWorkerLoop` to pass sequence numbers in `push_bulk` and `push_mutation` methods, ensuring consistent sequence handling.

  Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* chore/update-sequence-on-region-edit:
  ### Remove Debug Logging from `opener.rs`

  - Removed debug logging for mutation sequences in `opener.rs` to clean up the output and improve performance.

  Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
@@ -252,7 +252,7 @@ impl RegionEngine for MockRegionEngine {
         unimplemented!()
     }
 
-    async fn get_last_seq_num(&self, _: RegionId) -> Result<Option<SequenceNumber>, BoxedError> {
+    async fn get_committed_sequence(&self, _: RegionId) -> Result<SequenceNumber, BoxedError> {
         unimplemented!()
     }
 
@@ -115,8 +115,8 @@ impl RegionEngine for FileRegionEngine {
         None
     }
 
-    async fn get_last_seq_num(&self, _: RegionId) -> Result<Option<SequenceNumber>, BoxedError> {
-        Ok(None)
+    async fn get_committed_sequence(&self, _: RegionId) -> Result<SequenceNumber, BoxedError> {
+        Ok(Default::default())
     }
 
     fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
@@ -257,10 +257,10 @@ impl RegionEngine for MetricEngine {
         self.handle_query(region_id, request).await
     }
 
-    async fn get_last_seq_num(
+    async fn get_committed_sequence(
         &self,
         region_id: RegionId,
-    ) -> Result<Option<SequenceNumber>, BoxedError> {
+    ) -> Result<SequenceNumber, BoxedError> {
         self.inner
             .get_last_seq_num(region_id)
             .await
@@ -89,7 +89,7 @@ impl MetricEngineInner {
         Ok(scanner)
     }
 
-    pub async fn get_last_seq_num(&self, region_id: RegionId) -> Result<Option<SequenceNumber>> {
+    pub async fn get_last_seq_num(&self, region_id: RegionId) -> Result<SequenceNumber> {
         let region_id = if self.is_physical_region(region_id) {
             region_id
         } else {
@@ -97,7 +97,7 @@ impl MetricEngineInner {
             utils::to_data_region_id(physical_region_id)
         };
         self.mito
-            .get_last_seq_num(region_id)
+            .get_committed_sequence(region_id)
             .await
             .context(MitoReadOperationSnafu)
     }
@@ -483,6 +483,7 @@ impl Compactor for DefaultCompactor {
|
||||
.map(|seconds| Duration::from_secs(seconds as u64)),
|
||||
flushed_entry_id: None,
|
||||
flushed_sequence: None,
|
||||
committed_sequence: None,
|
||||
};
|
||||
|
||||
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
|
||||
|
||||
@@ -23,6 +23,8 @@ mod basic_test;
 #[cfg(test)]
 mod batch_open_test;
 #[cfg(test)]
+mod bump_committed_sequence_test;
+#[cfg(test)]
 mod catchup_test;
 #[cfg(test)]
 mod close_test;
@@ -469,6 +471,7 @@ fn is_valid_region_edit(edit: &RegionEdit) -> bool {
             compaction_time_window: None,
             flushed_entry_id: None,
             flushed_sequence: None,
+            ..
         }
     )
 }
@@ -662,10 +665,11 @@ impl EngineInner {
         receiver.await.context(RecvSnafu)?
     }
 
-    fn get_last_seq_num(&self, region_id: RegionId) -> Result<Option<SequenceNumber>> {
+    /// Returns the sequence of latest committed data.
+    fn get_committed_sequence(&self, region_id: RegionId) -> Result<SequenceNumber> {
         // Reading a region doesn't need to go through the region worker thread.
-        let region = self.find_region(region_id)?;
-        Ok(Some(region.find_committed_sequence()))
+        self.find_region(region_id)
+            .map(|r| r.find_committed_sequence())
     }
 
     /// Handles the scan `request` and returns a [ScanRegion].
@@ -832,12 +836,12 @@ impl RegionEngine for MitoEngine {
             .map_err(BoxedError::new)
     }
 
-    async fn get_last_seq_num(
+    async fn get_committed_sequence(
         &self,
         region_id: RegionId,
-    ) -> Result<Option<SequenceNumber>, BoxedError> {
+    ) -> Result<SequenceNumber, BoxedError> {
         self.inner
-            .get_last_seq_num(region_id)
+            .get_committed_sequence(region_id)
             .map_err(BoxedError::new)
     }
 
@@ -1021,6 +1025,7 @@ mod tests {
|
||||
compaction_time_window: None,
|
||||
flushed_entry_id: None,
|
||||
flushed_sequence: None,
|
||||
committed_sequence: None,
|
||||
};
|
||||
assert!(is_valid_region_edit(&edit));
|
||||
|
||||
@@ -1032,6 +1037,7 @@ mod tests {
|
||||
compaction_time_window: None,
|
||||
flushed_entry_id: None,
|
||||
flushed_sequence: None,
|
||||
committed_sequence: None,
|
||||
};
|
||||
assert!(!is_valid_region_edit(&edit));
|
||||
|
||||
@@ -1043,6 +1049,7 @@ mod tests {
|
||||
compaction_time_window: None,
|
||||
flushed_entry_id: None,
|
||||
flushed_sequence: None,
|
||||
committed_sequence: None,
|
||||
};
|
||||
assert!(!is_valid_region_edit(&edit));
|
||||
|
||||
@@ -1054,6 +1061,7 @@ mod tests {
|
||||
compaction_time_window: Some(Duration::from_secs(1)),
|
||||
flushed_entry_id: None,
|
||||
flushed_sequence: None,
|
||||
committed_sequence: None,
|
||||
};
|
||||
assert!(!is_valid_region_edit(&edit));
|
||||
let edit = RegionEdit {
|
||||
@@ -1063,6 +1071,7 @@ mod tests {
|
||||
compaction_time_window: None,
|
||||
flushed_entry_id: Some(1),
|
||||
flushed_sequence: None,
|
||||
committed_sequence: None,
|
||||
};
|
||||
assert!(!is_valid_region_edit(&edit));
|
||||
let edit = RegionEdit {
|
||||
@@ -1072,6 +1081,7 @@ mod tests {
|
||||
compaction_time_window: None,
|
||||
flushed_entry_id: None,
|
||||
flushed_sequence: Some(1),
|
||||
committed_sequence: None,
|
||||
};
|
||||
assert!(!is_valid_region_edit(&edit));
|
||||
}
|
||||
|
||||
136
src/mito2/src/engine/bump_committed_sequence_test.rs
Normal file
@@ -0,0 +1,136 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use api::v1::Rows;
|
||||
use store_api::region_engine::{RegionEngine, RegionRole};
|
||||
use store_api::region_request::{PathType, RegionOpenRequest, RegionRequest};
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::config::MitoConfig;
|
||||
use crate::manifest::action::RegionEdit;
|
||||
use crate::sst::file::FileMeta;
|
||||
use crate::test_util::{CreateRequestBuilder, TestEnv, build_rows, put_rows, rows_schema};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_bump_committed_sequence() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
|
||||
let table_dir = request.table_dir.clone();
|
||||
|
||||
env.get_schema_metadata_manager()
|
||||
.register_region_table_info(
|
||||
region_id.table_id(),
|
||||
"test_table",
|
||||
"test_catalog",
|
||||
"test_schema",
|
||||
None,
|
||||
env.get_kv_backend(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let _ = engine
|
||||
.handle_request(region_id, RegionRequest::Create(request.clone()))
|
||||
.await
|
||||
.unwrap();
|
||||
let region = engine.get_region(region_id).unwrap();
|
||||
assert_eq!(region.version_control.current().committed_sequence, 0);
|
||||
assert_eq!(region.version_control.current().version.flushed_sequence, 0);
|
||||
|
||||
let column_schemas = rows_schema(&request);
|
||||
let rows = Rows {
|
||||
schema: column_schemas,
|
||||
rows: build_rows(0, 42),
|
||||
};
|
||||
put_rows(&engine, region_id, rows).await;
|
||||
assert_eq!(region.version_control.current().committed_sequence, 42);
|
||||
assert_eq!(region.version_control.current().version.flushed_sequence, 0);
|
||||
|
||||
engine
|
||||
.edit_region(
|
||||
region_id,
|
||||
RegionEdit {
|
||||
files_to_add: vec![FileMeta::default()],
|
||||
files_to_remove: vec![],
|
||||
timestamp_ms: None,
|
||||
compaction_time_window: None,
|
||||
flushed_entry_id: None,
|
||||
flushed_sequence: None,
|
||||
committed_sequence: None,
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(region.version_control.current().version.flushed_sequence, 0);
|
||||
assert_eq!(region.version_control.committed_sequence(), 43);
|
||||
|
||||
// Reopen region.
|
||||
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
|
||||
engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::Open(RegionOpenRequest {
|
||||
engine: String::new(),
|
||||
table_dir: table_dir.clone(),
|
||||
path_type: PathType::Bare,
|
||||
options: HashMap::default(),
|
||||
skip_wal_replay: false,
|
||||
checkpoint: None,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let region = engine.get_region(region_id).unwrap();
|
||||
region.set_role(RegionRole::Leader);
|
||||
assert_eq!(region.version_control.current().version.flushed_sequence, 0);
|
||||
assert_eq!(region.version_control.committed_sequence(), 43);
|
||||
|
||||
// Write another 2 rows after editing.
|
||||
let column_schemas = rows_schema(&request);
|
||||
let rows = Rows {
|
||||
schema: column_schemas,
|
||||
rows: build_rows(0, 2),
|
||||
};
|
||||
put_rows(&engine, region_id, rows).await;
|
||||
assert_eq!(region.version_control.committed_sequence(), 45);
|
||||
assert_eq!(region.version_control.current().version.flushed_sequence, 0);
|
||||
|
||||
// Reopen region.
|
||||
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
|
||||
engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::Open(RegionOpenRequest {
|
||||
engine: String::new(),
|
||||
table_dir,
|
||||
path_type: PathType::Bare,
|
||||
options: HashMap::default(),
|
||||
skip_wal_replay: false,
|
||||
checkpoint: None,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let region = engine.get_region(region_id).unwrap();
|
||||
region.set_role(RegionRole::Leader);
|
||||
assert_eq!(region.version_control.current().version.flushed_sequence, 0);
|
||||
assert_eq!(region.version_control.committed_sequence(), 45);
|
||||
}
|
||||
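The sequence arithmetic asserted by `test_bump_committed_sequence` (0, then 42, then 43, then 45) can be followed with a toy model. This is only a sketch, not the mito2 implementation: `ToyRegion`, `write_rows`, `edit`, and `replay_test_steps` are hypothetical names, and the model assumes that each written row consumes one sequence number and that each region edit consumes one more, which is what the assertions in the test suggest.

```rust
/// Toy stand-in for a region's committed sequence (hypothetical; not a mito2 type).
#[derive(Debug, Default)]
struct ToyRegion {
    committed_sequence: u64,
}

impl ToyRegion {
    /// Assumption: every written row consumes one sequence number.
    fn write_rows(&mut self, num_rows: u64) {
        self.committed_sequence += num_rows;
    }

    /// Assumption: a region edit bumps the committed sequence by one and
    /// returns the sequence that would be stamped on the edit's files.
    fn edit(&mut self) -> u64 {
        self.committed_sequence += 1;
        self.committed_sequence
    }
}

/// Replays the steps of the test and records the committed sequence after each step.
fn replay_test_steps() -> Vec<u64> {
    let mut region = ToyRegion::default();
    let mut observed = vec![region.committed_sequence];

    region.write_rows(42); // put_rows with build_rows(0, 42)
    observed.push(region.committed_sequence);

    region.edit(); // edit_region with one added file
    observed.push(region.committed_sequence);

    region.write_rows(2); // put_rows with build_rows(0, 2)
    observed.push(region.committed_sequence);

    observed
}
```

In the real test the region is also reopened twice; the model leaves that out because reopening is expected to preserve the committed sequence rather than change it.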
@@ -96,6 +96,7 @@ async fn test_edit_region_schedule_compaction() {
|
||||
compaction_time_window: None,
|
||||
flushed_entry_id: None,
|
||||
flushed_sequence: None,
|
||||
committed_sequence: None,
|
||||
};
|
||||
engine
|
||||
.edit_region(region.region_id, new_edit())
|
||||
@@ -185,6 +186,7 @@ async fn test_edit_region_fill_cache() {
|
||||
compaction_time_window: None,
|
||||
flushed_entry_id: None,
|
||||
flushed_sequence: None,
|
||||
committed_sequence: None,
|
||||
};
|
||||
engine.edit_region(region.region_id, edit).await.unwrap();
|
||||
|
||||
@@ -236,6 +238,7 @@ async fn test_edit_region_concurrently() {
|
||||
compaction_time_window: None,
|
||||
flushed_entry_id: None,
|
||||
flushed_sequence: None,
|
||||
committed_sequence: None,
|
||||
};
|
||||
engine
|
||||
.edit_region(self.region.region_id, edit)
|
||||
|
||||
@@ -384,6 +384,7 @@ impl RegionFlushTask {
|
||||
// The last entry has been flushed.
|
||||
flushed_entry_id: Some(version_data.last_entry_id),
|
||||
flushed_sequence: Some(version_data.committed_sequence),
|
||||
committed_sequence: None,
|
||||
};
|
||||
info!("Applying {edit:?} to region {}", self.region_id);
|
||||
|
||||
@@ -1313,6 +1314,7 @@ mod tests {
|
||||
compaction_time_window: None,
|
||||
flushed_entry_id: None,
|
||||
flushed_sequence: None,
|
||||
committed_sequence: None,
|
||||
}),
|
||||
&[0],
|
||||
builder.file_purger(),
|
||||
|
||||
@@ -62,6 +62,7 @@ pub struct RegionEdit {
     pub compaction_time_window: Option<Duration>,
     pub flushed_entry_id: Option<EntryId>,
     pub flushed_sequence: Option<SequenceNumber>,
+    pub committed_sequence: Option<SequenceNumber>,
 }
 
 #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
@@ -119,6 +120,7 @@ pub struct RegionManifest {
     pub flushed_entry_id: EntryId,
     /// Last sequence of flushed data.
     pub flushed_sequence: SequenceNumber,
+    pub committed_sequence: Option<SequenceNumber>,
     /// Current manifest version.
     pub manifest_version: ManifestVersion,
     /// Last WAL entry id of truncated data.
@@ -138,6 +140,7 @@ impl PartialEq for RegionManifest {
|
||||
&& self.manifest_version == other.manifest_version
|
||||
&& self.truncated_entry_id == other.truncated_entry_id
|
||||
&& self.compaction_time_window == other.compaction_time_window
|
||||
&& self.committed_sequence == other.committed_sequence
|
||||
}
|
||||
}
|
||||
|
||||
@@ -151,6 +154,7 @@ pub struct RegionManifestBuilder {
|
||||
manifest_version: ManifestVersion,
|
||||
truncated_entry_id: Option<EntryId>,
|
||||
compaction_time_window: Option<Duration>,
|
||||
committed_sequence: Option<SequenceNumber>,
|
||||
}
|
||||
|
||||
impl RegionManifestBuilder {
|
||||
@@ -166,6 +170,7 @@ impl RegionManifestBuilder {
|
||||
flushed_sequence: s.flushed_sequence,
|
||||
truncated_entry_id: s.truncated_entry_id,
|
||||
compaction_time_window: s.compaction_time_window,
|
||||
committed_sequence: s.committed_sequence,
|
||||
}
|
||||
} else {
|
||||
Default::default()
|
||||
@@ -199,6 +204,13 @@ impl RegionManifestBuilder {
         if let Some(flushed_sequence) = edit.flushed_sequence {
             self.flushed_sequence = self.flushed_sequence.max(flushed_sequence);
         }
+
+        if let Some(committed_sequence) = edit.committed_sequence {
+            self.committed_sequence = Some(
+                self.committed_sequence
+                    .map_or(committed_sequence, |exist| exist.max(committed_sequence)),
+            );
+        }
         if let Some(window) = edit.compaction_time_window {
             self.compaction_time_window = Some(window);
         }
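The `map_or` / `max` merge in the hunk above keeps the manifest's committed sequence from moving backwards, and it leaves the stored value untouched when an edit carries no sequence. A minimal standalone sketch of that rule follows; `merge_committed_sequence` is a hypothetical helper name, and `SequenceNumber` is assumed here to be a `u64` alias.

```rust
/// Assumption for this sketch: sequence numbers are plain u64 values.
type SequenceNumber = u64;

/// Merges the committed sequence carried by an edit into the current value.
/// Hypothetical helper mirroring the `map_or(.., max)` pattern from the diff.
fn merge_committed_sequence(
    current: Option<SequenceNumber>,
    from_edit: Option<SequenceNumber>,
) -> Option<SequenceNumber> {
    match from_edit {
        // Keep the larger of the two, or adopt the edit's value if none is stored yet.
        Some(seq) => Some(current.map_or(seq, |exist| exist.max(seq))),
        // An edit without a committed sequence leaves the stored value untouched.
        None => current,
    }
}
```

For example, merging `Some(43)` into `Some(45)` yields `Some(45)`, so an edit carrying an older sequence cannot roll the value back.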
@@ -253,6 +265,7 @@ impl RegionManifestBuilder {
|
||||
removed_files: self.removed_files,
|
||||
flushed_entry_id: self.flushed_entry_id,
|
||||
flushed_sequence: self.flushed_sequence,
|
||||
committed_sequence: self.committed_sequence,
|
||||
manifest_version: self.manifest_version,
|
||||
truncated_entry_id: self.truncated_entry_id,
|
||||
compaction_time_window: self.compaction_time_window,
|
||||
@@ -377,6 +390,7 @@ impl RegionMetaActionList {
|
||||
compaction_time_window: None,
|
||||
flushed_entry_id: None,
|
||||
flushed_sequence: None,
|
||||
committed_sequence: None,
|
||||
};
|
||||
|
||||
for action in self.actions {
|
||||
@@ -391,6 +405,10 @@ impl RegionMetaActionList {
|
||||
if let Some(seq) = region_edit.flushed_sequence {
|
||||
edit.flushed_sequence = Some(edit.flushed_sequence.map_or(seq, |v| v.max(seq)));
|
||||
}
|
||||
if let Some(seq) = region_edit.committed_sequence {
|
||||
edit.committed_sequence =
|
||||
Some(edit.committed_sequence.map_or(seq, |v| v.max(seq)));
|
||||
}
|
||||
// Prefer the latest non-none time window
|
||||
if region_edit.compaction_time_window.is_some() {
|
||||
edit.compaction_time_window = region_edit.compaction_time_window;
|
||||
@@ -698,6 +716,7 @@ mod tests {
|
||||
files: HashMap::new(),
|
||||
flushed_entry_id: 0,
|
||||
flushed_sequence: 0,
|
||||
committed_sequence: None,
|
||||
manifest_version: 0,
|
||||
truncated_entry_id: None,
|
||||
compaction_time_window: None,
|
||||
@@ -807,6 +826,7 @@ mod tests {
|
||||
removed_files: Default::default(),
|
||||
flushed_entry_id: 0,
|
||||
flushed_sequence: 0,
|
||||
committed_sequence: None,
|
||||
manifest_version: 0,
|
||||
truncated_entry_id: None,
|
||||
compaction_time_window: None,
|
||||
@@ -819,6 +839,7 @@ mod tests {
|
||||
removed_files: Default::default(),
|
||||
flushed_entry_id: 0,
|
||||
flushed_sequence: 0,
|
||||
committed_sequence: None,
|
||||
manifest_version: 0,
|
||||
truncated_entry_id: None,
|
||||
compaction_time_window: None,
|
||||
@@ -872,6 +893,7 @@ mod tests {
|
||||
compaction_time_window: None,
|
||||
flushed_entry_id: None,
|
||||
flushed_sequence: None,
|
||||
committed_sequence: None,
|
||||
},
|
||||
new_from_old
|
||||
);
|
||||
@@ -884,6 +906,7 @@ mod tests {
|
||||
compaction_time_window: None,
|
||||
flushed_entry_id: None,
|
||||
flushed_sequence: None,
|
||||
committed_sequence: None,
|
||||
};
|
||||
|
||||
let new_json = serde_json::to_string(&new).unwrap();
|
||||
|
||||
@@ -194,6 +194,7 @@ impl RegionManifestManager {
|
||||
compaction_time_window: None,
|
||||
flushed_entry_id: Some(flushed_entry_id),
|
||||
flushed_sequence: None,
|
||||
committed_sequence: None,
|
||||
}));
|
||||
}
|
||||
|
||||
@@ -886,6 +887,7 @@ mod test {
|
||||
compaction_time_window: None,
|
||||
flushed_entry_id: None,
|
||||
flushed_sequence: None,
|
||||
committed_sequence: None,
|
||||
})]),
|
||||
RegionRoleState::Leader(RegionLeaderState::Writable),
|
||||
)
|
||||
@@ -913,6 +915,6 @@ mod test {
 
         // get manifest size again
         let manifest_size = manager.manifest_usage();
-        assert_eq!(manifest_size, 1669);
+        assert_eq!(manifest_size, 1721);
     }
 }
@@ -77,6 +77,7 @@ fn nop_action() -> RegionMetaActionList {
|
||||
compaction_time_window: None,
|
||||
flushed_entry_id: None,
|
||||
flushed_sequence: None,
|
||||
committed_sequence: None,
|
||||
})])
|
||||
}
|
||||
|
||||
@@ -276,6 +277,7 @@ async fn checkpoint_with_different_compression_types() {
|
||||
compaction_time_window: None,
|
||||
flushed_entry_id: None,
|
||||
flushed_sequence: None,
|
||||
committed_sequence: None,
|
||||
})]);
|
||||
actions.push(action);
|
||||
}
|
||||
@@ -340,6 +342,7 @@ fn generate_action_lists(num: usize) -> (Vec<FileId>, Vec<RegionMetaActionList>)
|
||||
compaction_time_window: None,
|
||||
flushed_entry_id: None,
|
||||
flushed_sequence: None,
|
||||
committed_sequence: None,
|
||||
})]);
|
||||
actions.push(action);
|
||||
}
|
||||
|
||||
@@ -473,6 +473,7 @@ impl RegionOpener {
             .build();
         let flushed_entry_id = version.flushed_entry_id;
         let version_control = Arc::new(VersionControl::new(version));
+
         let topic_latest_entry_id = if !self.skip_wal_replay {
             let replay_from_entry_id = self
                 .replay_checkpoint
@@ -508,6 +509,21 @@ impl RegionOpener {
 
             0
         };
+
+        if let Some(committed_in_manifest) = manifest.committed_sequence {
+            let committed_after_replay = version_control.committed_sequence();
+            if committed_in_manifest > committed_after_replay {
+                info!(
+                    "Overriding committed sequence, region: {}, flushed_sequence: {}, committed_sequence: {} -> {}",
+                    self.region_id,
+                    version_control.current().version.flushed_sequence,
+                    version_control.committed_sequence(),
+                    committed_in_manifest
+                );
+                version_control.set_committed_sequence(committed_in_manifest);
+            }
+        }
+
         let now = self.time_provider.current_time_millis();
         let region = MitoRegion {
             region_id: self.region_id,
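The override in the hunk above applies only when the manifest records a committed sequence that is strictly greater than the one reached by WAL replay. The decision can be sketched as a pure function; `committed_sequence_on_open` is a hypothetical name and `SequenceNumber` is assumed to be a `u64` alias.

```rust
/// Assumption for this sketch: sequence numbers are plain u64 values.
type SequenceNumber = u64;

/// Picks the committed sequence a region should start with after opening.
/// Hypothetical helper; it mirrors the comparison shown in the diff.
fn committed_sequence_on_open(
    committed_after_replay: SequenceNumber,
    committed_in_manifest: Option<SequenceNumber>,
) -> SequenceNumber {
    match committed_in_manifest {
        // The manifest is ahead of the replayed WAL (for example, after a region edit).
        Some(in_manifest) if in_manifest > committed_after_replay => in_manifest,
        // Otherwise the replayed value stands, including for manifests without the field.
        _ => committed_after_replay,
    }
}
```

With the values from the test, replaying 42 rows and then seeing 43 in the manifest gives 43, while a later reopen that replays up to 45 keeps 45.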
@@ -729,14 +745,22 @@ where
                     mutation.rows,
                     mutation.write_hint,
                     OptionOutputTx::none(),
+                    // We should respect the sequence in WAL during replay.
+                    Some(mutation.sequence),
                 );
             }
 
             for bulk_entry in entry.bulk_entries {
                 let part = BulkPart::try_from(bulk_entry)?;
                 rows_replayed += part.num_rows();
+                // During replay, we should adopt the sequence from WAL.
+                let bulk_sequence_from_wal = part.sequence;
                 ensure!(
-                    region_write_ctx.push_bulk(OptionOutputTx::none(), part),
+                    region_write_ctx.push_bulk(
+                        OptionOutputTx::none(),
+                        part,
+                        Some(bulk_sequence_from_wal)
+                    ),
                     RegionCorruptedSnafu {
                         region_id,
                         reason: "unable to replay memtable with bulk entries",
@@ -71,6 +71,12 @@ impl VersionControl {
         self.data.read().unwrap().clone()
     }
 
+    /// Updates the `committed_sequence` of version.
+    pub(crate) fn set_committed_sequence(&self, seq: SequenceNumber) {
+        let mut data = self.data.write().unwrap();
+        data.committed_sequence = seq;
+    }
+
     /// Updates committed sequence and entry id.
     pub(crate) fn set_sequence_and_entry_id(&self, seq: SequenceNumber, entry_id: EntryId) {
         let mut data = self.data.write().unwrap();
@@ -137,6 +143,7 @@ impl VersionControl {
     ) {
         let version = self.current().version;
         let builder = VersionBuilder::from_version(version);
+        let committed_sequence = edit.as_ref().and_then(|e| e.committed_sequence);
         let builder = if let Some(edit) = edit {
             builder.apply_edit(edit, purger)
         } else {
@@ -145,6 +152,11 @@ impl VersionControl {
         let new_version = Arc::new(builder.remove_memtables(memtables_to_remove).build());
 
         let mut version_data = self.data.write().unwrap();
+        version_data.committed_sequence = if let Some(committed_in_edit) = committed_sequence {
+            version_data.committed_sequence.max(committed_in_edit)
+        } else {
+            version_data.committed_sequence
+        };
         version_data.version = new_version;
     }
 
@@ -145,13 +145,18 @@ impl RegionWriteCtx {
     }
 
     /// Push mutation to the context.
+    /// This method adopts the sequence number in parameters if present.
     pub(crate) fn push_mutation(
         &mut self,
         op_type: i32,
         rows: Option<Rows>,
         write_hint: Option<WriteHint>,
         tx: OptionOutputTx,
+        sequence: Option<SequenceNumber>,
     ) {
+        if let Some(sequence) = sequence {
+            self.next_sequence = sequence;
+        }
         let num_rows = rows.as_ref().map(|rows| rows.rows.len()).unwrap_or(0);
         self.wal_entry.mutations.push(Mutation {
             op_type,
@@ -268,7 +273,15 @@ impl RegionWriteCtx {
             .set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1);
     }
 
-    pub(crate) fn push_bulk(&mut self, sender: OptionOutputTx, mut bulk: BulkPart) -> bool {
+    pub(crate) fn push_bulk(
+        &mut self,
+        sender: OptionOutputTx,
+        mut bulk: BulkPart,
+        sequence: Option<SequenceNumber>,
+    ) -> bool {
+        if let Some(sequence) = sequence {
+            self.next_sequence = sequence;
+        }
         bulk.sequence = self.next_sequence;
         let entry = match BulkWalEntry::try_from(&bulk) {
             Ok(entry) => entry,
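The optional `sequence` parameter lets WAL replay reuse the sequence recorded in the log instead of the context's running counter. One plausible reason this matters, sketched below with a toy context, is that a region edit consumes a sequence number without writing a WAL entry, so a plain counter would fall behind during replay. `ToyWriteCtx` and its methods are hypothetical, and the sketch assumes the counter advances by one per row, which the diff does not show directly.

```rust
/// Assumption for this sketch: sequence numbers are plain u64 values.
type SequenceNumber = u64;

/// Toy write context that only tracks the next sequence to hand out
/// (hypothetical; not the mito2 `RegionWriteCtx`).
struct ToyWriteCtx {
    next_sequence: SequenceNumber,
}

impl ToyWriteCtx {
    /// Starts right after the given committed sequence.
    fn new(committed_sequence: SequenceNumber) -> Self {
        Self { next_sequence: committed_sequence + 1 }
    }

    /// Pushes `num_rows` rows and returns the first sequence assigned to them.
    /// `Some(sequence)` is adopted as-is (the replay path); `None` keeps the
    /// running counter (the live write path).
    fn push(&mut self, num_rows: u64, sequence: Option<SequenceNumber>) -> SequenceNumber {
        if let Some(sequence) = sequence {
            self.next_sequence = sequence;
        }
        let first = self.next_sequence;
        // Assumption: one sequence number per row.
        self.next_sequence += num_rows;
        first
    }

    /// The last sequence that has been handed out.
    fn committed_sequence(&self) -> SequenceNumber {
        self.next_sequence - 1
    }
}
```

Replaying a 42-row entry recorded at sequence 1 and a 2-row entry recorded at sequence 44 ends at 45 in this model, whereas ignoring the recorded sequences would end at 44 and miss the number consumed by the edit in between.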
@@ -382,7 +382,7 @@ impl WriteRequest {
|
||||
if column.column_schema.is_default_impure() {
|
||||
UnexpectedSnafu {
|
||||
reason: format!(
|
||||
"unexpected impure default value with region_id: {}, column: {}, default_value: {:?}",
|
||||
"unexpected impure default value with region_id: {}, column: {}, default_value: {:?}",
|
||||
self.region_id,
|
||||
column.column_schema.name,
|
||||
column.column_schema.default_constraint(),
|
||||
|
||||
@@ -211,6 +211,7 @@ pub(crate) fn apply_edit(
|
||||
compaction_time_window: None,
|
||||
flushed_entry_id: None,
|
||||
flushed_sequence: None,
|
||||
committed_sequence: None,
|
||||
}),
|
||||
&[],
|
||||
purger,
|
||||
|
||||
@@ -17,6 +17,7 @@
 //! It updates the manifest and applies the changes to the region in background.
 
 use std::collections::{HashMap, VecDeque};
+use std::num::NonZeroU64;
 use std::sync::Arc;
 
 use common_telemetry::{info, warn};
@@ -203,9 +204,16 @@ impl<S> RegionWorkerLoop<S> {
 
         let RegionEditRequest {
             region_id: _,
-            edit,
+            mut edit,
             tx: sender,
         } = request;
+        let file_sequence = region.version_control.committed_sequence() + 1;
+        edit.committed_sequence = Some(file_sequence);
+
+        // For every file added through region edit, we should fill the file sequence
+        for file in &mut edit.files_to_add {
+            file.sequence = NonZeroU64::new(file_sequence);
+        }
 
         // Marks the region as editing.
         if let Err(e) = region.set_editing() {
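The hunk above stamps every file added by a region edit with `committed_sequence + 1` and records the same value on the edit itself. A standalone sketch of that step follows; `ToyFileMeta`, `ToyRegionEdit`, and `stamp_edit` are hypothetical stand-ins for the real types, reduced to the two fields involved.

```rust
use std::num::NonZeroU64;

/// Hypothetical stand-in for a file's metadata, reduced to its sequence.
#[derive(Debug, Default)]
struct ToyFileMeta {
    sequence: Option<NonZeroU64>,
}

/// Hypothetical stand-in for a region edit, reduced to the fields used here.
#[derive(Debug, Default)]
struct ToyRegionEdit {
    files_to_add: Vec<ToyFileMeta>,
    committed_sequence: Option<u64>,
}

/// Assigns the next sequence to the edit and to every file it adds,
/// then returns that sequence.
fn stamp_edit(edit: &mut ToyRegionEdit, current_committed: u64) -> u64 {
    let file_sequence = current_committed + 1;
    edit.committed_sequence = Some(file_sequence);
    for file in &mut edit.files_to_add {
        // `NonZeroU64::new` yields `None` only for zero, which cannot occur here
        // because `file_sequence` is at least 1.
        file.sequence = NonZeroU64::new(file_sequence);
    }
    file_sequence
}
```

With a committed sequence of 42, the edit and its files receive 43, matching the value the test asserts after `edit_region`.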
@@ -229,6 +237,7 @@ impl<S> RegionWorkerLoop<S> {
|
||||
result,
|
||||
}),
|
||||
};
|
||||
|
||||
// We don't set state back as the worker loop is already exited.
|
||||
if let Err(res) = request_sender
|
||||
.send(WorkerRequestWithTime::new(notify))
|
||||
|
||||
@@ -316,6 +316,7 @@ impl<S> RegionWorkerLoop<S> {
                 Some(sender_req.request.rows),
                 sender_req.request.hint,
                 sender_req.sender,
+                None,
             );
         }
     }
@@ -401,7 +402,7 @@ impl<S> RegionWorkerLoop<S> {
             }
 
             // Collect requests by region.
-            if !region_ctx.push_bulk(bulk_req.sender, bulk_req.request) {
+            if !region_ctx.push_bulk(bulk_req.sender, bulk_req.request, None) {
                 return;
             }
         }
@@ -86,11 +86,11 @@ impl RegionEngine for MetaRegionEngine {
         None
     }
 
-    async fn get_last_seq_num(
+    async fn get_committed_sequence(
         &self,
         _region_id: RegionId,
-    ) -> Result<Option<SequenceNumber>, BoxedError> {
-        Ok(None)
+    ) -> Result<SequenceNumber, BoxedError> {
+        Ok(SequenceNumber::default())
     }
 
     async fn stop(&self) -> Result<(), BoxedError> {
@@ -34,9 +34,7 @@ use tokio::sync::Semaphore;
 
 use crate::logstore::entry;
 use crate::metadata::RegionMetadataRef;
-use crate::region_request::{
-    BatchRegionDdlRequest, RegionOpenRequest, RegionRequest, RegionSequencesRequest,
-};
+use crate::region_request::{BatchRegionDdlRequest, RegionOpenRequest, RegionRequest};
 use crate::storage::{RegionId, ScanRequest, SequenceNumber};
 
 /// The settable region role state.
@@ -746,25 +744,11 @@ pub trait RegionEngine: Send + Sync {
         request: RegionRequest,
     ) -> Result<RegionResponse, BoxedError>;
 
-    /// Returns the last sequence number of the region.
-    async fn get_last_seq_num(
+    /// Returns the committed sequence (sequence of latest written data).
+    async fn get_committed_sequence(
         &self,
         region_id: RegionId,
-    ) -> Result<Option<SequenceNumber>, BoxedError>;
-
-    async fn get_region_sequences(
-        &self,
-        seqs: RegionSequencesRequest,
-    ) -> Result<HashMap<u64, u64>, BoxedError> {
-        let mut results = HashMap::with_capacity(seqs.region_ids.len());
-
-        for region_id in seqs.region_ids {
-            let seq = self.get_last_seq_num(region_id).await?.unwrap_or_default();
-            results.insert(region_id.as_u64(), seq);
-        }
-
-        Ok(results)
-    }
+    ) -> Result<SequenceNumber, BoxedError>;
 
     /// Handles query and return a scanner that can be used to scan the region concurrently.
     async fn handle_query(
@@ -1377,12 +1377,6 @@ pub struct RegionCatchupRequest {
     pub checkpoint: Option<ReplayCheckpoint>,
 }
 
-/// Get sequences of regions by region ids.
-#[derive(Debug, Clone)]
-pub struct RegionSequencesRequest {
-    pub region_ids: Vec<RegionId>,
-}
-
 #[derive(Debug, Clone)]
 pub struct RegionBulkInsertsRequest {
     pub region_id: RegionId,