feat: region manifest checkpoint (#1202)

* chore: adds log when manifest protocol is changed

* chore: refactor region manifest

* temp commit

* feat: impl region manifest checkpoint

* feat: recover region version from manifest snapshot

* test: adds region snapshot test

* test: region manifest checkpoint

* test: alter region with manifest checkpoint

* fix: revert storage api

* feat: delete old snapshot

* refactor: manifest log storage

* Update src/storage/src/version.rs

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/storage/src/manifest/checkpoint.rs

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/storage/src/manifest/region.rs

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/storage/src/manifest/region.rs

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* chore: by CR comments

* refactor: by CR comments

* fix: typo

* chore: tweak start_version

---------

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
dennis zhuang
2023-03-27 11:15:52 +08:00
committed by GitHub
parent 15ee4ac729
commit 4f15b26b28
22 changed files with 994 additions and 71 deletions

18
Cargo.lock generated
View File

@@ -4622,7 +4622,7 @@ dependencies = [
"parking_lot",
"percent-encoding",
"pin-project",
"quick-xml",
"quick-xml 0.27.1",
"reqsign",
"reqwest",
"serde",
@@ -5687,6 +5687,16 @@ dependencies = [
"serde",
]
[[package]]
name = "quick-xml"
version = "0.28.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5c1a97b1bc42b1d550bfb48d4262153fe400a12bab1511821736f7eac76d7e2"
dependencies = [
"memchr",
"serde",
]
[[package]]
name = "quote"
version = "1.0.23"
@@ -5888,9 +5898,9 @@ dependencies = [
[[package]]
name = "reqsign"
version = "0.8.3"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef4d5fefeaaa1e64f4aabb79da4ea68bf6d0e7935ad927728280d2a8e95735fc"
checksum = "a7db6d8d2cd7fa61403d14de670f98d7cedac38143681c124943d7bb69258b3a"
dependencies = [
"anyhow",
"backon 0.4.0",
@@ -5905,7 +5915,7 @@ dependencies = [
"log",
"once_cell",
"percent-encoding",
"quick-xml",
"quick-xml 0.28.1",
"rand",
"rsa",
"rust-ini",

View File

@@ -16,10 +16,33 @@
pub mod action;
use storage::manifest::ManifestImpl;
use store_api::manifest::action::{ProtocolAction, ProtocolVersion};
use store_api::manifest::{Checkpoint, ManifestVersion};
use crate::manifest::action::TableMetaActionList;
pub type TableManifest = ManifestImpl<TableMetaActionList>;
#[derive(Debug, Clone)]
pub struct NoopCheckpoint {}
impl Checkpoint for NoopCheckpoint {
type Error = storage::error::Error;
fn set_protocol(&mut self, _action: ProtocolAction) {}
fn last_version(&self) -> ManifestVersion {
unreachable!();
}
fn encode(&self) -> Result<Vec<u8>, Self::Error> {
unreachable!();
}
fn decode(_bs: &[u8], _reader_version: ProtocolVersion) -> Result<Self, Self::Error> {
unreachable!();
}
}
pub type TableManifest = ManifestImpl<NoopCheckpoint, TableMetaActionList>;
#[cfg(test)]
mod tests {
@@ -57,7 +80,7 @@ mod tests {
async fn test_table_manifest() {
let (_dir, object_store) = test_util::new_test_object_store("test_table_manifest").await;
let manifest = TableManifest::new("manifest/", object_store);
let manifest = TableManifest::new("manifest/", object_store, None);
let mut iter = manifest.scan(0, 100).await.unwrap();
assert!(iter.next_action().await.unwrap().is_none());

View File

@@ -471,7 +471,7 @@ impl<R: Region> MitoTable<R> {
regions: HashMap<RegionNumber, R>,
object_store: ObjectStore,
) -> Result<MitoTable<R>> {
let manifest = TableManifest::new(&table_manifest_dir(table_dir), object_store);
let manifest = TableManifest::new(&table_manifest_dir(table_dir), object_store, None);
// TODO(dennis): save manifest version into catalog?
let _manifest_version = manifest
@@ -487,7 +487,7 @@ impl<R: Region> MitoTable<R> {
}
pub(crate) fn build_manifest(table_dir: &str, object_store: ObjectStore) -> TableManifest {
TableManifest::new(&table_manifest_dir(table_dir), object_store)
TableManifest::new(&table_manifest_dir(table_dir), object_store, None)
}
pub(crate) async fn recover_table_info(

View File

@@ -354,7 +354,7 @@ impl<S: LogStore> EngineInner<S> {
let sst_dir = &region_sst_dir(&parent_dir, region_name);
let sst_layer = Arc::new(FsAccessLayer::new(sst_dir, self.object_store.clone()));
let manifest_dir = region_manifest_dir(&parent_dir, region_name);
let manifest = RegionManifest::new(&manifest_dir, self.object_store.clone());
let manifest = RegionManifest::with_checkpointer(&manifest_dir, self.object_store.clone());
let flush_strategy = write_buffer_size
.map(|size| Arc::new(SizeBasedStrategy::new(size)) as Arc<_>)

View File

@@ -440,6 +440,9 @@ pub enum Error {
#[snafu(backtrace)]
source: common_time::error::Error,
},
#[snafu(display("Failed to create a checkpoint: {}", msg))]
ManifestCheckpoint { msg: String, backtrace: Backtrace },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -484,6 +487,7 @@ impl ErrorExt for Error {
| BatchCorrupted { .. }
| DecodeArrow { .. }
| EncodeArrow { .. }
| ManifestCheckpoint { .. }
| ParseSchema { .. } => StatusCode::Unexpected,
WriteParquet { .. }

View File

@@ -14,6 +14,7 @@
//! manifest storage
pub(crate) mod action;
pub mod checkpoint;
pub mod helper;
mod impl_;
pub mod region;

View File

@@ -12,13 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::io::{BufRead, BufReader};
use serde::{Deserialize, Serialize};
use serde_json as json;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::manifest::action::{ProtocolAction, ProtocolVersion, VersionHeader};
use store_api::manifest::{ManifestVersion, MetaAction};
use store_api::manifest::{Checkpoint, ManifestVersion, MetaAction};
use store_api::storage::{RegionId, SequenceNumber};
use crate::error::{
@@ -27,10 +28,10 @@ use crate::error::{
};
use crate::manifest::helper;
use crate::metadata::{ColumnFamilyMetadata, ColumnMetadata, VersionNumber};
use crate::sst::FileMeta;
use crate::sst::{FileId, FileMeta};
/// Minimal data that could be used to persist and recover [RegionMetadata](crate::metadata::RegionMetadata).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct RawRegionMetadata {
pub id: RegionId,
pub name: String,
@@ -40,7 +41,7 @@ pub struct RawRegionMetadata {
}
/// Minimal data that could be used to persist and recover [ColumnsMetadata](crate::metadata::ColumnsMetadata).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct RawColumnsMetadata {
pub columns: Vec<ColumnMetadata>,
pub row_key_end: usize,
@@ -50,7 +51,7 @@ pub struct RawColumnsMetadata {
}
/// Minimal data that could be used to persist and recover [ColumnFamiliesMetadata](crate::metadata::ColumnFamiliesMetadata).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct RawColumnFamiliesMetadata {
pub column_families: Vec<ColumnFamilyMetadata>,
}
@@ -78,6 +79,111 @@ pub struct RegionEdit {
pub files_to_remove: Vec<FileMeta>,
}
/// The region version checkpoint
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionVersion {
pub manifest_version: ManifestVersion,
pub flushed_sequence: Option<SequenceNumber>,
pub files: HashMap<FileId, FileMeta>,
}
/// The region manifest data checkpoint
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Default)]
pub struct RegionManifestData {
pub committed_sequence: SequenceNumber,
pub metadata: RawRegionMetadata,
pub version: Option<RegionVersion>,
}
#[derive(Debug, Default)]
pub struct RegionManifestDataBuilder {
committed_sequence: SequenceNumber,
metadata: RawRegionMetadata,
version: Option<RegionVersion>,
}
impl RegionManifestDataBuilder {
pub fn with_checkpoint(checkpoint: Option<RegionManifestData>) -> Self {
if let Some(s) = checkpoint {
Self {
metadata: s.metadata,
version: s.version,
committed_sequence: s.committed_sequence,
}
} else {
Default::default()
}
}
pub fn apply_change(&mut self, change: RegionChange) {
self.metadata = change.metadata;
self.committed_sequence = change.committed_sequence;
}
pub fn apply_edit(&mut self, manifest_version: ManifestVersion, edit: RegionEdit) {
if let Some(version) = &mut self.version {
version.manifest_version = manifest_version;
version.flushed_sequence = edit.flushed_sequence;
for file in edit.files_to_add {
version.files.insert(file.file_id, file);
}
for file in edit.files_to_remove {
version.files.remove(&file.file_id);
}
} else {
self.version = Some(RegionVersion {
manifest_version,
flushed_sequence: edit.flushed_sequence,
files: edit
.files_to_add
.into_iter()
.map(|f| (f.file_id, f))
.collect(),
});
}
}
pub fn build(self) -> RegionManifestData {
RegionManifestData {
metadata: self.metadata,
version: self.version,
committed_sequence: self.committed_sequence,
}
}
}
// The checkpoint of region manifest, generated by checkpoint.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct RegionCheckpoint {
/// The snasphot protocol
pub protocol: ProtocolAction,
/// The last manifest version that this checkpoint compacts(inclusive).
pub last_version: ManifestVersion,
// The number of manifest actions that this checkpoint compacts.
pub compacted_actions: usize,
// The checkpoint data
pub checkpoint: Option<RegionManifestData>,
}
impl Checkpoint for RegionCheckpoint {
type Error = error::Error;
fn set_protocol(&mut self, action: ProtocolAction) {
self.protocol = action;
}
fn last_version(&self) -> ManifestVersion {
self.last_version
}
fn encode(&self) -> Result<Vec<u8>> {
helper::encode_checkpoint(self)
}
fn decode(bs: &[u8], reader_version: ProtocolVersion) -> Result<Self> {
helper::decode_checkpoint(bs, reader_version)
}
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub enum RegionMetaAction {
Protocol(ProtocolAction),
@@ -181,10 +287,13 @@ impl MetaAction for RegionMetaActionList {
#[cfg(test)]
mod tests {
use common_telemetry::logging;
use datatypes::type_id::LogicalTypeId;
use super::*;
use crate::manifest::test_utils;
use crate::metadata::RegionMetadata;
use crate::sst::FileId;
use crate::test_util::descriptor_util::RegionDescBuilder;
#[test]
fn test_encode_decode_action_list() {
@@ -240,4 +349,87 @@ mod tests {
let protocol_action = r#"{"min_reader_version":1,"min_writer_version":2}"#;
serde_json::from_str::<ProtocolAction>(protocol_action).unwrap();
}
fn mock_file_meta() -> FileMeta {
FileMeta {
region_id: 0,
file_id: FileId::random(),
time_range: None,
level: 0,
file_size: 1024,
}
}
#[test]
fn test_region_manifest_builder() {
let desc = RegionDescBuilder::new("test_region_manifest_builder")
.enable_version_column(true)
.push_value_column(("v0", LogicalTypeId::Int64, true))
.build();
let region_metadata: RegionMetadata = desc.try_into().unwrap();
let mut builder = RegionManifestDataBuilder::with_checkpoint(None);
builder.apply_change(RegionChange {
committed_sequence: 42,
metadata: RawRegionMetadata::from(&region_metadata),
});
let files = vec![mock_file_meta(), mock_file_meta()];
builder.apply_edit(
84,
RegionEdit {
region_version: 0,
flushed_sequence: Some(99),
files_to_add: files.clone(),
files_to_remove: vec![],
},
);
builder.apply_edit(
85,
RegionEdit {
region_version: 0,
flushed_sequence: Some(100),
files_to_add: vec![],
files_to_remove: vec![files[0].clone()],
},
);
let manifest = builder.build();
assert_eq!(manifest.metadata, RawRegionMetadata::from(&region_metadata));
assert_eq!(manifest.committed_sequence, 42);
assert_eq!(
manifest.version,
Some(RegionVersion {
manifest_version: 85,
flushed_sequence: Some(100),
files: files[1..].iter().map(|f| (f.file_id, f.clone())).collect(),
})
);
}
#[test]
fn test_encode_decode_region_checkpoint() {
let region_checkpoint = RegionCheckpoint {
protocol: ProtocolAction::default(),
last_version: 42,
compacted_actions: 10,
checkpoint: Some(RegionManifestData {
committed_sequence: 100,
metadata: RawRegionMetadata::default(),
version: Some(RegionVersion {
manifest_version: 84,
flushed_sequence: Some(99),
files: vec![mock_file_meta(), mock_file_meta()]
.into_iter()
.map(|f| (f.file_id, f))
.collect(),
}),
}),
};
let bytes = region_checkpoint.encode().unwrap();
assert!(!bytes.is_empty());
let decoded_checkpoint = RegionCheckpoint::decode(&bytes, 0).unwrap();
assert_eq!(region_checkpoint, decoded_checkpoint);
}
}

View File

@@ -0,0 +1,35 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use async_trait::async_trait;
use store_api::manifest::{Checkpoint, MetaAction};
use crate::error::{Error, Result};
use crate::manifest::ManifestImpl;
#[async_trait]
pub trait Checkpointer: Send + Sync + std::fmt::Debug {
type Checkpoint: Checkpoint<Error = Error>;
type MetaAction: MetaAction<Error = Error>;
/// Try to create a checkpoint, return the checkpoint if successes.
async fn do_checkpoint(
&self,
manifest: &ManifestImpl<Self::Checkpoint, Self::MetaAction>,
) -> Result<Option<Self::Checkpoint>>;
fn as_any(&self) -> &dyn Any;
}

View File

@@ -16,11 +16,14 @@ use std::io::Write;
use serde::Serialize;
use serde_json::to_writer;
use snafu::ResultExt;
use store_api::manifest::action::VersionHeader;
use snafu::{ensure, ResultExt};
use store_api::manifest::action::{ProtocolVersion, VersionHeader};
use store_api::manifest::ManifestVersion;
use crate::error::{EncodeJsonSnafu, Result};
use crate::error::{
DecodeJsonSnafu, EncodeJsonSnafu, ManifestProtocolForbidReadSnafu, Result, Utf8Snafu,
};
use crate::manifest::action::RegionCheckpoint;
pub const NEWLINE: &[u8] = b"\n";
@@ -45,3 +48,22 @@ pub fn encode_actions<T: Serialize>(
Ok(bytes)
}
pub fn encode_checkpoint(snasphot: &RegionCheckpoint) -> Result<Vec<u8>> {
let s = serde_json::to_string(snasphot).context(EncodeJsonSnafu)?;
Ok(s.into_bytes())
}
pub fn decode_checkpoint(bs: &[u8], reader_version: ProtocolVersion) -> Result<RegionCheckpoint> {
let s = std::str::from_utf8(bs).context(Utf8Snafu)?;
let checkpoint: RegionCheckpoint = serde_json::from_str(s).context(DecodeJsonSnafu)?;
ensure!(
checkpoint.protocol.is_readable(reader_version),
ManifestProtocolForbidReadSnafu {
min_version: checkpoint.protocol.min_reader_version,
supported_version: reader_version,
}
);
Ok(checkpoint)
}

View File

@@ -18,41 +18,94 @@ use std::sync::Arc;
use arc_swap::ArcSwap;
use async_trait::async_trait;
use common_telemetry::logging;
use common_telemetry::{debug, logging, warn};
use object_store::ObjectStore;
use snafu::ensure;
use store_api::manifest::action::{self, ProtocolAction, ProtocolVersion};
use store_api::manifest::*;
use crate::error::{Error, ManifestProtocolForbidWriteSnafu, Result};
use crate::manifest::action::RegionCheckpoint;
use crate::manifest::checkpoint::Checkpointer;
use crate::manifest::storage::{ManifestObjectStore, ObjectStoreLogIterator};
// TODO(dennis): export this option to table options or storage options.
const CHECKPOINT_ACTIONS_MARGIN: u64 = 10;
#[derive(Clone, Debug)]
pub struct ManifestImpl<M: MetaAction<Error = Error>> {
inner: Arc<ManifestImplInner<M>>,
pub struct ManifestImpl<S: Checkpoint<Error = Error>, M: MetaAction<Error = Error>> {
inner: Arc<ManifestImplInner<S, M>>,
checkpointer: Option<Arc<dyn Checkpointer<Checkpoint = S, MetaAction = M>>>,
last_checkpoint_version: Arc<AtomicU64>,
}
impl<M: MetaAction<Error = Error>> ManifestImpl<M> {
pub fn new(manifest_dir: &str, object_store: ObjectStore) -> Self {
impl<S: Checkpoint<Error = Error>, M: MetaAction<Error = Error>> ManifestImpl<S, M> {
pub fn new(
manifest_dir: &str,
object_store: ObjectStore,
checkpointer: Option<Arc<dyn Checkpointer<Checkpoint = S, MetaAction = M>>>,
) -> Self {
ManifestImpl {
inner: Arc::new(ManifestImplInner::new(manifest_dir, object_store)),
checkpointer,
last_checkpoint_version: Arc::new(AtomicU64::new(MIN_VERSION)),
}
}
pub fn checkpointer(&self) -> &Option<Arc<dyn Checkpointer<Checkpoint = S, MetaAction = M>>> {
&self.checkpointer
}
pub fn set_last_checkpoint_version(&self, version: ManifestVersion) {
self.last_checkpoint_version
.store(version, Ordering::Relaxed);
}
/// Update inner state.
pub fn update_state(&self, version: ManifestVersion, protocol: Option<ProtocolAction>) {
self.inner.update_state(version, protocol);
}
pub async fn save_checkpoint(&self, checkpoint: &RegionCheckpoint) -> Result<()> {
ensure!(
checkpoint
.protocol
.is_writable(self.inner.supported_writer_version),
ManifestProtocolForbidWriteSnafu {
min_version: checkpoint.protocol.min_writer_version,
supported_version: self.inner.supported_writer_version,
}
);
let bytes = checkpoint.encode()?;
self.manifest_store()
.save_checkpoint(checkpoint.last_version, &bytes)
.await
}
#[inline]
pub fn manifest_store(&self) -> &Arc<ManifestObjectStore> {
self.inner.manifest_store()
}
}
#[async_trait]
impl<M: 'static + MetaAction<Error = Error>> Manifest for ManifestImpl<M> {
impl<S: 'static + Checkpoint<Error = Error>, M: 'static + MetaAction<Error = Error>> Manifest
for ManifestImpl<S, M>
{
type Error = Error;
type Checkpoint = S;
type MetaAction = M;
type MetaActionIterator = MetaActionIteratorImpl<M>;
async fn update(&self, action_list: M) -> Result<ManifestVersion> {
self.inner.save(action_list).await
let version = self.inner.save(action_list).await?;
if version - self.last_checkpoint_version.load(Ordering::Relaxed)
>= CHECKPOINT_ACTIONS_MARGIN
{
let s = self.do_checkpoint().await?;
debug!("Manifest checkpoint, checkpoint: {:#?}", s);
}
Ok(version)
}
async fn scan(
@@ -63,8 +116,20 @@ impl<M: 'static + MetaAction<Error = Error>> Manifest for ManifestImpl<M> {
self.inner.scan(start, end).await
}
async fn checkpoint(&self) -> Result<ManifestVersion> {
unimplemented!();
async fn do_checkpoint(&self) -> Result<Option<S>> {
if let Some(cp) = &self.checkpointer {
let checkpoint = cp.do_checkpoint(self).await?;
if let Some(checkpoint) = &checkpoint {
self.set_last_checkpoint_version(checkpoint.last_version());
}
return Ok(checkpoint);
}
Ok(None)
}
async fn last_checkpoint(&self) -> Result<Option<S>> {
self.inner.last_checkpoint().await
}
fn last_version(&self) -> ManifestVersion {
@@ -73,7 +138,7 @@ impl<M: 'static + MetaAction<Error = Error>> Manifest for ManifestImpl<M> {
}
#[derive(Debug)]
struct ManifestImplInner<M: MetaAction<Error = Error>> {
struct ManifestImplInner<S: Checkpoint<Error = Error>, M: MetaAction<Error = Error>> {
store: Arc<ManifestObjectStore>,
version: AtomicU64,
/// Current using protocol
@@ -81,7 +146,7 @@ struct ManifestImplInner<M: MetaAction<Error = Error>> {
/// Current node supported protocols (reader_version, writer_version)
supported_reader_version: ProtocolVersion,
supported_writer_version: ProtocolVersion,
_phantom: PhantomData<M>,
_phantom: PhantomData<(S, M)>,
}
pub struct MetaActionIteratorImpl<M: MetaAction<Error = Error>> {
@@ -118,7 +183,7 @@ impl<M: MetaAction<Error = Error>> MetaActionIterator for MetaActionIteratorImpl
}
}
impl<M: MetaAction<Error = Error>> ManifestImplInner<M> {
impl<S: Checkpoint<Error = Error>, M: MetaAction<Error = Error>> ManifestImplInner<S, M> {
fn new(manifest_dir: &str, object_store: ObjectStore) -> Self {
let (reader_version, writer_version) = action::supported_protocol_version();
@@ -132,6 +197,11 @@ impl<M: MetaAction<Error = Error>> ManifestImplInner<M> {
}
}
#[inline]
fn manifest_store(&self) -> &Arc<ManifestObjectStore> {
&self.store
}
#[inline]
fn inc_version(&self) -> ManifestVersion {
self.version.fetch_add(1, Ordering::Relaxed)
@@ -169,6 +239,12 @@ impl<M: MetaAction<Error = Error>> ManifestImplInner<M> {
};
action_list.set_protocol(new_protocol.clone());
logging::info!(
"Updated manifest protocol from {} to {}.",
protocol,
new_protocol
);
self.protocol.store(Arc::new(new_protocol));
}
@@ -195,4 +271,30 @@ impl<M: MetaAction<Error = Error>> ManifestImplInner<M> {
_phantom: PhantomData,
})
}
async fn last_checkpoint(&self) -> Result<Option<S>> {
let protocol = self.protocol.load();
let last_checkpoint = self.store.load_last_checkpoint().await?;
if let Some((version, bytes)) = last_checkpoint {
let checkpoint = S::decode(&bytes, protocol.min_reader_version)?;
assert!(checkpoint.last_version() >= version);
if checkpoint.last_version() > version {
// It happens when saving checkpoint successfully, but failed at saving checkpoint metadata(the "__last_checkpoint" file).
// Then we try to use the old checkpoint and do the checkpoint next time.
// If the old checkpoint was deleted, it's fine that we return the latest checkpoint.
// the only side effect is leaving some unused checkpoint checkpoint files.
// TODO(dennis): delete unused checkpoint files
warn!("The checkpoint manifest version {} in {} is greater than checkpoint metadata version {}.", self.store.path(), checkpoint.last_version(), version);
if let Some((_, bytes)) = self.store.load_checkpoint(version).await? {
let old_checkpoint = S::decode(&bytes, protocol.min_reader_version)?;
return Ok(Some(old_checkpoint));
}
}
Ok(Some(checkpoint))
} else {
Ok(None)
}
}
}

View File

@@ -13,10 +13,153 @@
// limitations under the License.
//! Region manifest impl
use std::any::Any;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
use common_telemetry::info;
use object_store::ObjectStore;
use store_api::manifest::action::ProtocolAction;
use store_api::manifest::{
Manifest, ManifestLogStorage, ManifestVersion, MetaActionIterator, MIN_VERSION,
};
use crate::error::{ManifestCheckpointSnafu, Result};
use crate::manifest::action::*;
use crate::manifest::checkpoint::Checkpointer;
use crate::manifest::ManifestImpl;
pub type RegionManifest = ManifestImpl<RegionMetaActionList>;
pub type RegionManifest = ManifestImpl<RegionCheckpoint, RegionMetaActionList>;
#[derive(Debug)]
pub struct RegionManifestCheckpointer {
// The latest manifest version when flushing memtables.
// Checkpoint can't exceed over flushed manifest version because we have to keep
// the region metadata for replaying WAL to ensure correct data schema.
flushed_manifest_version: AtomicU64,
}
impl RegionManifestCheckpointer {
pub(crate) fn set_flushed_manifest_version(&self, manifest_version: ManifestVersion) {
self.flushed_manifest_version
.store(manifest_version, Ordering::Relaxed);
}
}
#[async_trait]
impl Checkpointer for RegionManifestCheckpointer {
type Checkpoint = RegionCheckpoint;
type MetaAction = RegionMetaActionList;
async fn do_checkpoint(
&self,
manifest: &ManifestImpl<RegionCheckpoint, RegionMetaActionList>,
) -> Result<Option<RegionCheckpoint>> {
let last_checkpoint = manifest.last_checkpoint().await?;
let current_version = manifest.last_version();
let (start_version, mut protocol, mut manifest_builder) =
if let Some(checkpoint) = last_checkpoint {
(
checkpoint.last_version + 1,
checkpoint.protocol,
RegionManifestDataBuilder::with_checkpoint(checkpoint.checkpoint),
)
} else {
(
MIN_VERSION,
ProtocolAction::default(),
RegionManifestDataBuilder::default(),
)
};
let end_version =
current_version.min(self.flushed_manifest_version.load(Ordering::Relaxed)) + 1;
if start_version >= end_version {
return Ok(None);
}
let mut iter = manifest.scan(start_version, end_version).await?;
let mut last_version = start_version;
let mut compacted_actions = 0;
while let Some((version, action_list)) = iter.next_action().await? {
for action in action_list.actions {
match action {
RegionMetaAction::Change(c) => manifest_builder.apply_change(c),
RegionMetaAction::Edit(e) => manifest_builder.apply_edit(version, e),
RegionMetaAction::Protocol(p) => protocol = p,
action => {
return ManifestCheckpointSnafu {
msg: format!("can't apply region action: {:?}", action),
}
.fail();
}
}
}
last_version = version;
compacted_actions += 1;
}
if compacted_actions == 0 {
return Ok(None);
}
let region_manifest = manifest_builder.build();
let checkpoint = RegionCheckpoint {
protocol,
last_version,
compacted_actions,
checkpoint: Some(region_manifest),
};
manifest.save_checkpoint(&checkpoint).await?;
// TODO(dennis): background task to clean old manifest actions and checkpoints.
manifest
.manifest_store()
.delete(start_version, last_version + 1)
.await?;
if start_version > MIN_VERSION {
manifest
.manifest_store()
.delete_checkpoint(start_version - 1)
.await?
}
info!("Region manifest checkpoint, start_version: {}, last_version: {}, compacted actions: {}", start_version, last_version, compacted_actions);
Ok(Some(checkpoint))
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl RegionManifest {
pub fn with_checkpointer(manifest_dir: &str, object_store: ObjectStore) -> Self {
Self::new(
manifest_dir,
object_store,
Some(Arc::new(RegionManifestCheckpointer {
flushed_manifest_version: AtomicU64::new(0),
})),
)
}
// Update flushed manifest version in checkpointer
pub fn set_flushed_manifest_version(&self, manifest_version: ManifestVersion) {
if let Some(checkpointer) = self.checkpointer() {
if let Some(checkpointer) = checkpointer
.as_any()
.downcast_ref::<RegionManifestCheckpointer>()
{
checkpointer.set_flushed_manifest_version(manifest_version);
}
}
}
}
#[cfg(test)]
mod tests {
@@ -45,7 +188,7 @@ mod tests {
)
.finish();
let manifest = RegionManifest::new("/manifest/", object_store);
let manifest = RegionManifest::with_checkpointer("/manifest/", object_store);
let region_meta = Arc::new(build_region_meta());
@@ -136,4 +279,161 @@ mod tests {
// Reach end
assert!(iter.next_action().await.unwrap().is_none());
}
async fn assert_scan(manifest: &RegionManifest, start_version: ManifestVersion, expected: u64) {
let mut iter = manifest.scan(0, MAX_VERSION).await.unwrap();
let mut actions = 0;
while let Some((v, _)) = iter.next_action().await.unwrap() {
assert_eq!(v, start_version + actions);
actions += 1;
}
assert_eq!(expected, actions);
}
#[tokio::test]
async fn test_region_manifest_checkpoint() {
common_telemetry::init_default_ut_logging();
let tmp_dir = create_temp_dir("test_region_manifest_checkpoint");
let object_store = ObjectStore::new(
Fs::default()
.root(&tmp_dir.path().to_string_lossy())
.build()
.unwrap(),
)
.finish();
let manifest = RegionManifest::with_checkpointer("/manifest/", object_store);
let region_meta = Arc::new(build_region_meta());
let new_region_meta = Arc::new(build_altered_region_meta());
let file = FileId::random();
let file_ids = vec![FileId::random(), FileId::random()];
let actions: Vec<RegionMetaActionList> = vec![
RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
metadata: region_meta.as_ref().into(),
committed_sequence: 1,
})),
RegionMetaActionList::new(vec![
RegionMetaAction::Edit(build_region_edit(2, &[file], &[])),
RegionMetaAction::Edit(build_region_edit(3, &file_ids, &[file])),
]),
RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
metadata: new_region_meta.as_ref().into(),
committed_sequence: 99,
})),
];
for action in actions {
manifest.update(action).await.unwrap();
}
assert!(manifest.last_checkpoint().await.unwrap().is_none());
assert_scan(&manifest, 0, 3).await;
// update flushed manifest version for doing checkpoint
manifest.set_flushed_manifest_version(2);
// do a checkpoint
let checkpoint = manifest.do_checkpoint().await.unwrap().unwrap();
let last_checkpoint = manifest.last_checkpoint().await.unwrap().unwrap();
assert_eq!(checkpoint, last_checkpoint);
assert_eq!(checkpoint.compacted_actions, 3);
assert_eq!(checkpoint.last_version, 2);
let alterd_raw_meta = RawRegionMetadata::from(new_region_meta.as_ref());
assert!(matches!(&checkpoint.checkpoint, Some(RegionManifestData {
committed_sequence: 99,
metadata,
version: Some(RegionVersion {
manifest_version: 1,
flushed_sequence: Some(3),
files,
}),
}) if files.len() == 2 &&
files.contains_key(&file_ids[0]) &&
files.contains_key(&file_ids[1]) &&
*metadata == alterd_raw_meta));
// all actions were compacted
assert!(manifest
.scan(0, MAX_VERSION)
.await
.unwrap()
.next_action()
.await
.unwrap()
.is_none());
assert!(manifest.do_checkpoint().await.unwrap().is_none());
let last_checkpoint = manifest.last_checkpoint().await.unwrap().unwrap();
assert_eq!(checkpoint, last_checkpoint);
// add new actions
let new_file = FileId::random();
let actions: Vec<RegionMetaActionList> = vec![
RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
metadata: region_meta.as_ref().into(),
committed_sequence: 200,
})),
RegionMetaActionList::new(vec![RegionMetaAction::Edit(build_region_edit(
201,
&[new_file],
&file_ids,
))]),
];
for action in actions {
manifest.update(action).await.unwrap();
}
assert_scan(&manifest, 3, 2).await;
// do another checkpoints
// compacted RegionChange
manifest.set_flushed_manifest_version(3);
let checkpoint = manifest.do_checkpoint().await.unwrap().unwrap();
let last_checkpoint = manifest.last_checkpoint().await.unwrap().unwrap();
assert_eq!(checkpoint, last_checkpoint);
assert_eq!(checkpoint.compacted_actions, 1);
assert_eq!(checkpoint.last_version, 3);
assert!(matches!(&checkpoint.checkpoint, Some(RegionManifestData {
committed_sequence: 200,
metadata,
version: Some(RegionVersion {
manifest_version: 1,
flushed_sequence: Some(3),
files,
}),
}) if files.len() == 2 &&
files.contains_key(&file_ids[0]) &&
files.contains_key(&file_ids[1]) &&
*metadata == RawRegionMetadata::from(region_meta.as_ref())));
assert_scan(&manifest, 4, 1).await;
// compacted RegionEdit
manifest.set_flushed_manifest_version(4);
let checkpoint = manifest.do_checkpoint().await.unwrap().unwrap();
let last_checkpoint = manifest.last_checkpoint().await.unwrap().unwrap();
assert_eq!(checkpoint, last_checkpoint);
assert_eq!(checkpoint.compacted_actions, 1);
assert_eq!(checkpoint.last_version, 4);
assert!(matches!(&checkpoint.checkpoint, Some(RegionManifestData {
committed_sequence: 200,
metadata,
version: Some(RegionVersion {
manifest_version: 4,
flushed_sequence: Some(201),
files,
}),
}) if files.len() == 1 &&
files.contains_key(&new_file) &&
*metadata == RawRegionMetadata::from(region_meta.as_ref())));
// all actions were compacted
assert!(manifest
.scan(0, MAX_VERSION)
.await
.unwrap()
.next_action()
.await
.unwrap()
.is_none());
}
}

View File

@@ -98,13 +98,24 @@ impl ManifestObjectStore {
}
}
#[inline]
fn delta_file_path(&self, version: ManifestVersion) -> String {
format!("{}{}", self.path, delta_file(version))
}
#[inline]
fn checkpoint_file_path(&self, version: ManifestVersion) -> String {
format!("{}{}", self.path, checkpoint_file(version))
}
#[inline]
fn last_checkpoint_path(&self) -> String {
format!("{}{}", self.path, LAST_CHECKPOINT_FILE)
}
pub(crate) fn path(&self) -> &str {
&self.path
}
}
#[derive(Serialize, Deserialize, Debug)]
@@ -209,9 +220,7 @@ impl ManifestLogStorage for ManifestObjectStore {
path: object.path(),
})?;
let last_checkpoint = self
.object_store
.object(&format!("{}{}", self.path, LAST_CHECKPOINT_FILE));
let last_checkpoint = self.object_store.object(&self.last_checkpoint_path());
let checkpoint_metadata = CheckpointMetadata {
size: bytes.len(),
@@ -237,10 +246,35 @@ impl ManifestLogStorage for ManifestObjectStore {
Ok(())
}
async fn load_checkpoint(&self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
let last_checkpoint = self
async fn load_checkpoint(
&self,
version: ManifestVersion,
) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
let checkpoint = self
.object_store
.object(&format!("{}{}", self.path, LAST_CHECKPOINT_FILE));
.object(&self.checkpoint_file_path(version));
Ok(Some((
version,
checkpoint.read().await.context(ReadObjectSnafu {
path: checkpoint.path(),
})?,
)))
}
async fn delete_checkpoint(&self, version: ManifestVersion) -> Result<()> {
let checkpoint = self
.object_store
.object(&self.checkpoint_file_path(version));
checkpoint.delete().await.context(DeleteObjectSnafu {
path: checkpoint.path(),
})?;
Ok(())
}
async fn load_last_checkpoint(&self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
let last_checkpoint = self.object_store.object(&self.last_checkpoint_path());
let checkpoint_exists = last_checkpoint.is_exist().await.context(ReadObjectSnafu {
path: last_checkpoint.path(),
@@ -259,16 +293,7 @@ impl ManifestLogStorage for ManifestObjectStore {
checkpoint_metadata
);
let checkpoint = self
.object_store
.object(&self.checkpoint_file_path(checkpoint_metadata.version));
Ok(Some((
checkpoint_metadata.version,
checkpoint.read().await.context(ReadObjectSnafu {
path: checkpoint.path(),
})?,
)))
self.load_checkpoint(checkpoint_metadata.version).await
} else {
Ok(None)
}
@@ -333,13 +358,13 @@ mod tests {
assert!(it.next_log().await.unwrap().is_none());
// test checkpoint
assert!(log_store.load_checkpoint().await.unwrap().is_none());
assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
log_store
.save_checkpoint(3, "checkpoint".as_bytes())
.await
.unwrap();
let (v, checkpoint) = log_store.load_checkpoint().await.unwrap().unwrap();
let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
assert_eq!(checkpoint, "checkpoint".as_bytes());
assert_eq!(3, v);
}

View File

@@ -32,6 +32,17 @@ pub fn build_region_meta() -> RegionMetadata {
desc.try_into().unwrap()
}
pub fn build_altered_region_meta() -> RegionMetadata {
let region_name = "region-0";
let desc = RegionDescBuilder::new(region_name)
.id(0)
.push_key_column(("k1", LogicalTypeId::Int32, false))
.push_value_column(("v1", LogicalTypeId::Float32, true))
.push_value_column(("v2", LogicalTypeId::Float32, true))
.build();
desc.try_into().unwrap()
}
pub fn build_region_edit(
sequence: SequenceNumber,
files_to_add: &[FileId],

View File

@@ -37,7 +37,7 @@ use crate::error::{self, Error, Result};
use crate::file_purger::FilePurgerRef;
use crate::flush::{FlushSchedulerRef, FlushStrategyRef};
use crate::manifest::action::{
RawRegionMetadata, RegionChange, RegionMetaAction, RegionMetaActionList,
RawRegionMetadata, RegionChange, RegionCheckpoint, RegionMetaAction, RegionMetaActionList,
};
use crate::manifest::region::RegionManifest;
use crate::memtable::MemtableBuilderRef;
@@ -330,16 +330,69 @@ impl<S: LogStore> RegionImpl<S> {
self.inner.shared.id()
}
fn create_version_with_checkpoint(
checkpoint: RegionCheckpoint,
memtable_builder: &MemtableBuilderRef,
sst_layer: &AccessLayerRef,
file_purger: &FilePurgerRef,
) -> Result<Option<Version>> {
if checkpoint.checkpoint.is_none() {
return Ok(None);
}
// Safety: it's safe to unwrap here, checking it above.
let s = checkpoint.checkpoint.unwrap();
let region = s.metadata.name.clone();
let region_metadata: RegionMetadata = s
.metadata
.try_into()
.context(error::InvalidRawRegionSnafu { region })?;
let memtable = memtable_builder.build(region_metadata.schema().clone());
let mut version = Version::with_manifest_version(
Arc::new(region_metadata),
checkpoint.last_version,
memtable,
sst_layer.clone(),
file_purger.clone(),
);
if let Some(v) = s.version {
version.apply_checkpoint(
v.flushed_sequence,
v.manifest_version,
v.files.into_values(),
);
}
Ok(Some(version))
}
async fn recover_from_manifest(
manifest: &RegionManifest,
memtable_builder: &MemtableBuilderRef,
sst_layer: &AccessLayerRef,
file_purger: &FilePurgerRef,
) -> Result<(Option<Version>, RecoveredMetadataMap)> {
let (start, end) = Self::manifest_scan_range();
let checkpoint = manifest.last_checkpoint().await?;
let (start, end, mut version) = if let Some(checkpoint) = checkpoint {
(
checkpoint.last_version + 1,
manifest::MAX_VERSION,
Self::create_version_with_checkpoint(
checkpoint,
memtable_builder,
sst_layer,
file_purger,
)?,
)
} else {
(manifest::MIN_VERSION, manifest::MAX_VERSION, None)
};
let mut iter = manifest.scan(start, end).await?;
let mut version = None;
let mut actions = Vec::new();
let mut last_manifest_version = manifest::MIN_VERSION;
let mut recovered_metadata = BTreeMap::new();
@@ -387,20 +440,16 @@ impl<S: LogStore> RegionImpl<S> {
assert!(actions.is_empty() || version.is_none());
if version.is_some() {
if let Some(version) = &version {
// update manifest state after recovering
let protocol = iter.last_protocol();
manifest.update_state(last_manifest_version + 1, protocol.clone());
manifest.set_flushed_manifest_version(version.manifest_version());
}
Ok((version, recovered_metadata))
}
fn manifest_scan_range() -> (ManifestVersion, ManifestVersion) {
// TODO(dennis): use manifest version in WAL
(manifest::MIN_VERSION, manifest::MAX_VERSION)
}
fn replay_edit(
manifest_version: ManifestVersion,
action: RegionMetaAction,

View File

@@ -32,6 +32,7 @@ use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::NoopLogStore;
use object_store::services::Fs;
use object_store::{ObjectStore, ObjectStoreBuilder};
use store_api::manifest::MAX_VERSION;
use store_api::storage::{
consts, Chunk, ChunkReader, RegionMeta, ScanRequest, SequenceNumber, Snapshot, WriteRequest,
};
@@ -71,6 +72,12 @@ impl<S: LogStore> TesterBase<S> {
}
}
pub async fn checkpoint_manifest(&self) {
let manifest = &self.region.inner.manifest;
manifest.set_flushed_manifest_version(manifest.last_version() - 1);
manifest.do_checkpoint().await.unwrap().unwrap();
}
pub async fn close(&self) {
self.region.inner.wal.close().await.unwrap();
}
@@ -275,7 +282,8 @@ async fn test_new_region() {
#[tokio::test]
async fn test_recover_region_manifets() {
let tmp_dir = create_temp_dir("test_new_region");
common_telemetry::init_default_ut_logging();
let tmp_dir = create_temp_dir("test_recover_region_manifets");
let memtable_builder = Arc::new(DefaultMemtableBuilder::default()) as _;
let object_store = ObjectStore::new(
@@ -286,7 +294,7 @@ async fn test_recover_region_manifets() {
)
.finish();
let manifest = RegionManifest::new("/manifest/", object_store.clone());
let manifest = RegionManifest::with_checkpointer("/manifest/", object_store.clone());
let region_meta = Arc::new(build_region_meta());
let sst_layer = Arc::new(FsAccessLayer::new("sst", object_store)) as _;
@@ -351,9 +359,62 @@ async fn test_recover_region_manifets() {
.await
.unwrap();
assert_recovered_manifest(
version,
recovered_metadata,
&file_id_a,
&file_id_b,
&file_id_c,
&region_meta,
);
// do a manifest checkpoint
let checkpoint = manifest.do_checkpoint().await.unwrap().unwrap();
assert_eq!(1, checkpoint.last_version);
assert_eq!(2, checkpoint.compacted_actions);
assert_eq!(
manifest.last_checkpoint().await.unwrap().unwrap(),
checkpoint
);
// recover from checkpoint
let (version, recovered_metadata) = RegionImpl::<NoopLogStore>::recover_from_manifest(
&manifest,
&memtable_builder,
&sst_layer,
&file_purger,
)
.await
.unwrap();
assert_recovered_manifest(
version,
recovered_metadata,
&file_id_a,
&file_id_b,
&file_id_c,
&region_meta,
);
// check manifest state
assert_eq!(3, manifest.last_version());
let mut iter = manifest.scan(0, MAX_VERSION).await.unwrap();
let (version, action) = iter.next_action().await.unwrap().unwrap();
assert_eq!(2, version);
assert!(matches!(action.actions[0], RegionMetaAction::Change(..)));
assert!(iter.next_action().await.unwrap().is_none());
}
fn assert_recovered_manifest(
version: Option<Version>,
recovered_metadata: RecoveredMetadataMap,
file_id_a: &FileId,
file_id_b: &FileId,
file_id_c: &FileId,
region_meta: &Arc<RegionMetadata>,
) {
assert_eq!(42, *recovered_metadata.first_key_value().unwrap().0);
let version = version.unwrap();
assert_eq!(*version.metadata(), region_meta);
assert_eq!(*version.metadata(), *region_meta);
assert_eq!(version.flushed_sequence(), 2);
assert_eq!(version.manifest_version(), 1);
let ssts = version.ssts();
@@ -370,7 +431,4 @@ async fn test_recover_region_manifets() {
]),
files
);
// check manifest state
assert_eq!(3, manifest.last_version());
}

View File

@@ -22,8 +22,8 @@ use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, VectorRef};
use log_store::raft_engine::log_store::RaftEngineLogStore;
use store_api::storage::{
AddColumn, AlterOperation, AlterRequest, Chunk, ChunkReader, ColumnDescriptor,
ColumnDescriptorBuilder, ColumnId, Region, RegionMeta, ScanRequest, SchemaRef, Snapshot,
WriteRequest, WriteResponse,
ColumnDescriptorBuilder, ColumnId, FlushContext, Region, RegionMeta, ScanRequest, SchemaRef,
Snapshot, WriteRequest, WriteResponse,
};
use crate::region::tests::{self, FileTesterBase};
@@ -117,6 +117,15 @@ impl AlterTester {
self.base = Some(FileTesterBase::with_region(region));
}
async fn flush(&self, wait: Option<bool>) {
let ctx = wait.map(|wait| FlushContext { wait }).unwrap_or_default();
self.base().region.flush(&ctx).await.unwrap();
}
async fn checkpoint_manifest(&self) {
self.base().checkpoint_manifest().await
}
#[inline]
fn base(&self) -> &FileTesterBase {
self.base.as_ref().unwrap()
@@ -264,6 +273,11 @@ fn check_schema_names(schema: &SchemaRef, names: &[&str]) {
#[tokio::test]
async fn test_alter_region_with_reopen() {
test_alter_region_with_reopen0(true).await;
test_alter_region_with_reopen0(false).await;
}
async fn test_alter_region_with_reopen0(flush_and_checkpoint: bool) {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("alter-region");
@@ -291,6 +305,11 @@ async fn test_alter_region_with_reopen() {
];
tester.put(&data).await;
if flush_and_checkpoint {
tester.flush(None).await;
tester.checkpoint_manifest().await;
}
// Scan with new schema before reopen.
let mut expect = vec![
DataRow::new(None, 1000, Some(100), None),
@@ -322,6 +341,11 @@ async fn test_alter_region_with_reopen() {
let req = drop_column_req(&["v2", "v3"]);
tester.alter(req).await;
if flush_and_checkpoint {
tester.flush(None).await;
tester.checkpoint_manifest().await;
}
// reopen and write again
tester.reopen().await;
let schema = tester.schema();

View File

@@ -132,6 +132,11 @@ impl RegionWriter {
action_list.set_prev_version(prev_version);
let manifest_version = manifest.update(action_list).await?;
// Notify checkpointer the flushed manifest version after flushing memtable
if flushed_sequence.is_some() {
manifest.set_flushed_manifest_version(manifest_version);
}
let version_edit = VersionEdit {
files_to_add,
files_to_remove,
@@ -260,7 +265,7 @@ impl RegionWriter {
Ok(())
}
/// Flush task manually
/// Flush task manually
pub async fn flush<S: LogStore>(
&self,
writer_ctx: WriterContext<'_, S>,

View File

@@ -46,7 +46,7 @@ pub async fn new_store_config(
let accessor = Builder::default().root(store_dir).build().unwrap();
let object_store = ObjectStore::new(accessor).finish();
let sst_layer = Arc::new(FsAccessLayer::new(&sst_dir, object_store.clone()));
let manifest = RegionManifest::new(&manifest_dir, object_store);
let manifest = RegionManifest::with_checkpointer(&manifest_dir, object_store);
let job_pool = Arc::new(JobPoolImpl {});
let flush_scheduler = Arc::new(FlushSchedulerImpl::new(job_pool));
let log_config = LogConfig {

View File

@@ -227,6 +227,25 @@ impl Version {
self.flushed_sequence
}
pub fn apply_checkpoint(
&mut self,
flushed_sequence: Option<SequenceNumber>,
manifest_version: ManifestVersion,
files: impl Iterator<Item = FileMeta>,
) {
self.flushed_sequence = flushed_sequence.unwrap_or(self.flushed_sequence);
self.manifest_version = manifest_version;
let ssts = self.ssts.merge(files, std::iter::empty());
info!(
"After applying checkpoint, region: {}, flushed_sequence: {}, manifest_version: {}",
self.metadata.id(),
self.flushed_sequence,
self.manifest_version,
);
self.ssts = Arc::new(ssts);
}
pub fn apply_edit(&mut self, edit: VersionEdit) {
let flushed_sequence = edit.flushed_sequence.unwrap_or(self.flushed_sequence);
if self.flushed_sequence < flushed_sequence {
@@ -249,7 +268,7 @@ impl Version {
.merge(handles_to_add, edit.files_to_remove.into_iter());
info!(
"After apply edit, region: {}, SST files: {:?}",
"After applying edit, region: {}, SST files: {:?}",
self.metadata.id(),
merged_ssts
);

View File

@@ -28,6 +28,7 @@ pub type ManifestVersion = u64;
pub const MIN_VERSION: u64 = 0;
pub const MAX_VERSION: u64 = u64::MAX;
/// The action to alter metadata
pub trait MetaAction: Serialize + DeserializeOwned + Send + Sync + Clone + std::fmt::Debug {
type Error: ErrorExt + Send + Sync;
@@ -47,6 +48,23 @@ pub trait MetaAction: Serialize + DeserializeOwned + Send + Sync + Clone + std::
reader_version: ProtocolVersion,
) -> Result<(Self, Option<ProtocolAction>), Self::Error>;
}
/// The checkpoint by checkpoint
pub trait Checkpoint: Send + Sync + Clone + std::fmt::Debug {
type Error: ErrorExt + Send + Sync;
/// Set a protocol action into checkpoint
fn set_protocol(&mut self, action: ProtocolAction);
/// The last compacted action's version of checkpoint
fn last_version(&self) -> ManifestVersion;
/// Encode this checkpoint into a byte vector
fn encode(&self) -> Result<Vec<u8>, Self::Error>;
/// Decode self from byte slice with reader protocol version,
/// return error when reader version is not supported.
fn decode(bs: &[u8], reader_version: ProtocolVersion) -> Result<Self, Self::Error>;
}
#[async_trait]
pub trait MetaActionIterator {
@@ -64,6 +82,7 @@ pub trait Manifest: Send + Sync + Clone + 'static {
type Error: ErrorExt + Send + Sync;
type MetaAction: MetaAction;
type MetaActionIterator: MetaActionIterator<Error = Self::Error, MetaAction = Self::MetaAction>;
type Checkpoint: Checkpoint;
/// Update metadata by the action
async fn update(&self, action: Self::MetaAction) -> Result<ManifestVersion, Self::Error>;
@@ -75,7 +94,12 @@ pub trait Manifest: Send + Sync + Clone + 'static {
end: ManifestVersion,
) -> Result<Self::MetaActionIterator, Self::Error>;
async fn checkpoint(&self) -> Result<ManifestVersion, Self::Error>;
/// Do a checkpoint, it will create a checkpoint and compact actions.
async fn do_checkpoint(&self) -> Result<Option<Self::Checkpoint>, Self::Error>;
/// Returns the last success checkpoint
async fn last_checkpoint(&self) -> Result<Option<Self::Checkpoint>, Self::Error>;
/// Returns the last(or latest) manifest version.
fn last_version(&self) -> ManifestVersion;
}

View File

@@ -39,6 +39,16 @@ pub struct ProtocolAction {
pub min_writer_version: ProtocolVersion,
}
impl std::fmt::Display for ProtocolAction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Protocol({}, {})",
&self.min_reader_version, &self.min_writer_version,
)
}
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct VersionHeader {
pub prev_version: ManifestVersion,

View File

@@ -51,5 +51,14 @@ pub trait ManifestLogStorage {
) -> Result<(), Self::Error>;
/// Load the latest checkpoint
async fn load_checkpoint(&self) -> Result<Option<(ManifestVersion, Vec<u8>)>, Self::Error>;
async fn load_last_checkpoint(&self)
-> Result<Option<(ManifestVersion, Vec<u8>)>, Self::Error>;
/// Delete the checkpoint by version
async fn delete_checkpoint(&self, version: ManifestVersion) -> Result<(), Self::Error>;
/// Load the checkpoint by version
async fn load_checkpoint(
&self,
version: ManifestVersion,
) -> Result<Option<(ManifestVersion, Vec<u8>)>, Self::Error>;
}