mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-24 08:50:40 +00:00
feat: manifest improvements (#303)
* feat: adds commited_sequence to RegionChange action, #281 * refactor: saving protocol action when writer version is changed * feat: recover all region medata in manifest and replay them when replaying WAL, #282 * refactor: minor change and test recovering metadata after altering table schema * fix: write wrong min_reader_version into manifest for region * refactor: move up DataRow * refactor: by CR comments * test: assert recovered metadata * refactor: by CR comments * fix: comment
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
//! Storage engine implementation.
|
||||
#![feature(map_first_last)]
|
||||
mod arrow_stream;
|
||||
mod background;
|
||||
mod chunk;
|
||||
|
||||
@@ -45,6 +45,7 @@ pub struct RawColumnFamiliesMetadata {
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||
pub struct RegionChange {
|
||||
pub committed_sequence: SequenceNumber,
|
||||
pub metadata: RawRegionMetadata,
|
||||
}
|
||||
|
||||
@@ -94,6 +95,11 @@ impl RegionMetaActionList {
|
||||
impl MetaAction for RegionMetaActionList {
|
||||
type Error = error::Error;
|
||||
|
||||
fn set_protocol(&mut self, action: ProtocolAction) {
|
||||
// The protocol action should be the first action in action list by convention.
|
||||
self.actions.insert(0, RegionMetaAction::Protocol(action));
|
||||
}
|
||||
|
||||
fn set_prev_version(&mut self, version: ManifestVersion) {
|
||||
self.prev_version = version;
|
||||
}
|
||||
|
||||
@@ -138,7 +138,7 @@ impl<M: MetaAction<Error = Error>> ManifestImplInner<M> {
|
||||
self.version.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
async fn save(&self, action_list: M) -> Result<ManifestVersion> {
|
||||
async fn save(&self, mut action_list: M) -> Result<ManifestVersion> {
|
||||
let protocol = self.protocol.load();
|
||||
|
||||
ensure!(
|
||||
@@ -151,6 +151,16 @@ impl<M: MetaAction<Error = Error>> ManifestImplInner<M> {
|
||||
|
||||
let version = self.inc_version();
|
||||
|
||||
if version == 0 || protocol.min_writer_version < self.supported_writer_version {
|
||||
let new_protocol = ProtocolAction {
|
||||
min_reader_version: self.supported_reader_version,
|
||||
min_writer_version: self.supported_writer_version,
|
||||
};
|
||||
action_list.set_protocol(new_protocol.clone());
|
||||
|
||||
self.protocol.store(Arc::new(new_protocol));
|
||||
}
|
||||
|
||||
logging::debug!(
|
||||
"Save region metadata action: {:?}, version: {}",
|
||||
action_list,
|
||||
|
||||
@@ -9,6 +9,7 @@ mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use object_store::{backend::fs, ObjectStore};
|
||||
use store_api::manifest::action::ProtocolAction;
|
||||
use store_api::manifest::{Manifest, MetaActionIterator, MAX_VERSION};
|
||||
use tempdir::TempDir;
|
||||
|
||||
@@ -45,6 +46,7 @@ mod tests {
|
||||
.update(RegionMetaActionList::with_action(RegionMetaAction::Change(
|
||||
RegionChange {
|
||||
metadata: region_meta.as_ref().into(),
|
||||
committed_sequence: 99,
|
||||
},
|
||||
)))
|
||||
.await
|
||||
@@ -54,8 +56,14 @@ mod tests {
|
||||
|
||||
let (v, action_list) = iter.next_action().await.unwrap().unwrap();
|
||||
assert_eq!(0, v);
|
||||
assert_eq!(1, action_list.actions.len());
|
||||
let action = &action_list.actions[0];
|
||||
assert_eq!(2, action_list.actions.len());
|
||||
let protocol = &action_list.actions[0];
|
||||
assert!(matches!(
|
||||
protocol,
|
||||
RegionMetaAction::Protocol(ProtocolAction { .. })
|
||||
));
|
||||
|
||||
let action = &action_list.actions[1];
|
||||
|
||||
match action {
|
||||
RegionMetaAction::Change(c) => {
|
||||
@@ -63,6 +71,7 @@ mod tests {
|
||||
RegionMetadata::try_from(c.metadata.clone()).unwrap(),
|
||||
*region_meta
|
||||
);
|
||||
assert_eq!(c.committed_sequence, 99);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
@@ -79,14 +88,21 @@ mod tests {
|
||||
let mut iter = manifest.scan(0, MAX_VERSION).await.unwrap();
|
||||
let (v, action_list) = iter.next_action().await.unwrap().unwrap();
|
||||
assert_eq!(0, v);
|
||||
assert_eq!(1, action_list.actions.len());
|
||||
let action = &action_list.actions[0];
|
||||
assert_eq!(2, action_list.actions.len());
|
||||
let protocol = &action_list.actions[0];
|
||||
assert!(matches!(
|
||||
protocol,
|
||||
RegionMetaAction::Protocol(ProtocolAction { .. })
|
||||
));
|
||||
|
||||
let action = &action_list.actions[1];
|
||||
match action {
|
||||
RegionMetaAction::Change(c) => {
|
||||
assert_eq!(
|
||||
RegionMetadata::try_from(c.metadata.clone()).unwrap(),
|
||||
*region_meta
|
||||
);
|
||||
assert_eq!(c.committed_sequence, 99);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
|
||||
@@ -1,25 +1,23 @@
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
mod writer;
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_telemetry::logging;
|
||||
use snafu::{ensure, ResultExt};
|
||||
use store_api::logstore::LogStore;
|
||||
use store_api::manifest::{
|
||||
self, action::ProtocolAction, Manifest, ManifestVersion, MetaActionIterator,
|
||||
};
|
||||
use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator};
|
||||
use store_api::storage::{
|
||||
AlterRequest, OpenOptions, ReadContext, Region, RegionId, RegionMeta, WriteContext,
|
||||
WriteResponse,
|
||||
AlterRequest, OpenOptions, ReadContext, Region, RegionId, RegionMeta, SequenceNumber,
|
||||
WriteContext, WriteResponse,
|
||||
};
|
||||
|
||||
use crate::error::{self, Error, Result};
|
||||
use crate::flush::{FlushSchedulerRef, FlushStrategyRef};
|
||||
use crate::manifest::{
|
||||
action::{RegionChange, RegionMetaAction, RegionMetaActionList},
|
||||
action::{RawRegionMetadata, RegionChange, RegionMetaAction, RegionMetaActionList},
|
||||
region::RegionManifest,
|
||||
};
|
||||
use crate::memtable::MemtableBuilderRef;
|
||||
@@ -28,7 +26,7 @@ pub use crate::region::writer::{AlterContext, RegionWriter, RegionWriterRef, Wri
|
||||
use crate::snapshot::SnapshotImpl;
|
||||
use crate::sst::AccessLayerRef;
|
||||
use crate::version::VersionEdit;
|
||||
use crate::version::{Version, VersionControl, VersionControlRef};
|
||||
use crate::version::{Version, VersionControl, VersionControlRef, INIT_COMMITTED_SEQUENCE};
|
||||
use crate::wal::Wal;
|
||||
use crate::write_batch::WriteBatch;
|
||||
|
||||
@@ -95,6 +93,8 @@ pub struct StoreConfig<S> {
|
||||
pub flush_strategy: FlushStrategyRef,
|
||||
}
|
||||
|
||||
pub type RecoveredMetadataMap = BTreeMap<SequenceNumber, (ManifestVersion, RawRegionMetadata)>;
|
||||
|
||||
impl<S: LogStore> RegionImpl<S> {
|
||||
/// Create a new region and also persist the region metadata to manifest.
|
||||
///
|
||||
@@ -108,12 +108,12 @@ impl<S: LogStore> RegionImpl<S> {
|
||||
// the manifest.
|
||||
let manifest_version = store_config
|
||||
.manifest
|
||||
.update(RegionMetaActionList::new(vec![
|
||||
RegionMetaAction::Protocol(ProtocolAction::new()),
|
||||
RegionMetaAction::Change(RegionChange {
|
||||
.update(RegionMetaActionList::with_action(RegionMetaAction::Change(
|
||||
RegionChange {
|
||||
metadata: metadata.as_ref().into(),
|
||||
}),
|
||||
]))
|
||||
committed_sequence: INIT_COMMITTED_SEQUENCE,
|
||||
},
|
||||
)))
|
||||
.await?;
|
||||
|
||||
let version = Version::with_manifest_version(metadata, manifest_version);
|
||||
@@ -156,10 +156,11 @@ impl<S: LogStore> RegionImpl<S> {
|
||||
_opts: &OpenOptions,
|
||||
) -> Result<Option<RegionImpl<S>>> {
|
||||
// Load version meta data from manifest.
|
||||
let version = match Self::recover_from_manifest(&store_config.manifest).await? {
|
||||
None => return Ok(None),
|
||||
Some(version) => version,
|
||||
};
|
||||
let (version, mut recovered_metadata) =
|
||||
match Self::recover_from_manifest(&store_config.manifest).await? {
|
||||
(None, _) => return Ok(None),
|
||||
(Some(v), m) => (v, m),
|
||||
};
|
||||
|
||||
logging::debug!(
|
||||
"Region recovered version from manifest, version: {:?}",
|
||||
@@ -167,7 +168,28 @@ impl<S: LogStore> RegionImpl<S> {
|
||||
);
|
||||
|
||||
let metadata = version.metadata().clone();
|
||||
let flushed_sequence = version.flushed_sequence();
|
||||
let version_control = Arc::new(VersionControl::with_version(version));
|
||||
|
||||
let recovered_metadata_after_flushed =
|
||||
recovered_metadata.split_off(&(flushed_sequence + 1));
|
||||
// apply the last flushed metadata
|
||||
if let Some((sequence, (manifest_version, metadata))) = recovered_metadata.pop_last() {
|
||||
let metadata = Arc::new(
|
||||
metadata
|
||||
.try_into()
|
||||
.context(error::InvalidRawRegionSnafu { region: &name })?,
|
||||
);
|
||||
version_control.freeze_mutable_and_apply_metadata(metadata, manifest_version);
|
||||
|
||||
logging::debug!(
|
||||
"Applied the last flushed metadata to region: {}, sequence: {}, manifest: {}",
|
||||
name,
|
||||
sequence,
|
||||
manifest_version,
|
||||
);
|
||||
}
|
||||
|
||||
let wal = Wal::new(metadata.id(), store_config.log_store);
|
||||
let shared = Arc::new(SharedData {
|
||||
id: metadata.id(),
|
||||
@@ -186,7 +208,9 @@ impl<S: LogStore> RegionImpl<S> {
|
||||
manifest: &store_config.manifest,
|
||||
};
|
||||
// Replay all unflushed data.
|
||||
writer.replay(writer_ctx).await?;
|
||||
writer
|
||||
.replay(recovered_metadata_after_flushed, writer_ctx)
|
||||
.await?;
|
||||
|
||||
let inner = Arc::new(RegionInner {
|
||||
shared,
|
||||
@@ -201,13 +225,17 @@ impl<S: LogStore> RegionImpl<S> {
|
||||
Ok(Some(RegionImpl { inner }))
|
||||
}
|
||||
|
||||
async fn recover_from_manifest(manifest: &RegionManifest) -> Result<Option<Version>> {
|
||||
async fn recover_from_manifest(
|
||||
manifest: &RegionManifest,
|
||||
) -> Result<(Option<Version>, RecoveredMetadataMap)> {
|
||||
let (start, end) = Self::manifest_scan_range();
|
||||
let mut iter = manifest.scan(start, end).await?;
|
||||
|
||||
let mut version = None;
|
||||
let mut actions = Vec::new();
|
||||
let mut last_manifest_version = manifest::MIN_VERSION;
|
||||
let mut recovered_metadata = BTreeMap::new();
|
||||
|
||||
while let Some((manifest_version, action_list)) = iter.next_action().await? {
|
||||
last_manifest_version = manifest_version;
|
||||
|
||||
@@ -227,8 +255,10 @@ impl<S: LogStore> RegionImpl<S> {
|
||||
version = Self::replay_edit(manifest_version, action, version);
|
||||
}
|
||||
}
|
||||
(RegionMetaAction::Change(_), Some(_)) => {
|
||||
unimplemented!("alter schema is not implemented")
|
||||
(RegionMetaAction::Change(c), Some(v)) => {
|
||||
recovered_metadata
|
||||
.insert(c.committed_sequence, (manifest_version, c.metadata));
|
||||
version = Some(v);
|
||||
}
|
||||
(action, None) => {
|
||||
actions.push((manifest_version, action));
|
||||
@@ -249,7 +279,7 @@ impl<S: LogStore> RegionImpl<S> {
|
||||
manifest.update_state(last_manifest_version + 1, protocol.clone());
|
||||
}
|
||||
|
||||
Ok(version)
|
||||
Ok((version, recovered_metadata))
|
||||
}
|
||||
|
||||
fn manifest_scan_range() -> (ManifestVersion, ManifestVersion) {
|
||||
@@ -351,6 +381,7 @@ impl<S: LogStore> RegionInner<S> {
|
||||
// FIXME(yingwen): [alter] The schema may be outdated.
|
||||
let metadata = self.in_memory_metadata();
|
||||
let schema = metadata.schema();
|
||||
|
||||
// Only compare column schemas.
|
||||
ensure!(
|
||||
schema.column_schemas() == request.schema().column_schemas(),
|
||||
|
||||
@@ -37,7 +37,7 @@ pub fn new_metadata(region_name: &str, enable_version_column: bool) -> RegionMet
|
||||
/// Test region with schema (timestamp, v0).
|
||||
pub struct TesterBase<S: LogStore> {
|
||||
pub region: RegionImpl<S>,
|
||||
write_ctx: WriteContext,
|
||||
pub write_ctx: WriteContext,
|
||||
pub read_ctx: ReadContext,
|
||||
}
|
||||
|
||||
@@ -197,6 +197,7 @@ async fn test_recover_region_manifets() {
|
||||
assert!(RegionImpl::<NoopLogStore>::recover_from_manifest(&manifest)
|
||||
.await
|
||||
.unwrap()
|
||||
.0
|
||||
.is_none());
|
||||
|
||||
{
|
||||
@@ -205,6 +206,7 @@ async fn test_recover_region_manifets() {
|
||||
.update(RegionMetaActionList::with_action(RegionMetaAction::Change(
|
||||
RegionChange {
|
||||
metadata: region_meta.as_ref().into(),
|
||||
committed_sequence: 40,
|
||||
},
|
||||
)))
|
||||
.await
|
||||
@@ -217,13 +219,26 @@ async fn test_recover_region_manifets() {
|
||||
]))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
manifest
|
||||
.update(RegionMetaActionList::with_action(RegionMetaAction::Change(
|
||||
RegionChange {
|
||||
metadata: region_meta.as_ref().into(),
|
||||
committed_sequence: 42,
|
||||
},
|
||||
)))
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
// try to recover
|
||||
let version = RegionImpl::<NoopLogStore>::recover_from_manifest(&manifest)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let (version, recovered_metadata) =
|
||||
RegionImpl::<NoopLogStore>::recover_from_manifest(&manifest)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(42, *recovered_metadata.first_key_value().unwrap().0);
|
||||
let version = version.unwrap();
|
||||
assert_eq!(*version.metadata(), region_meta);
|
||||
assert_eq!(version.flushed_sequence(), 2);
|
||||
assert_eq!(version.manifest_version(), 1);
|
||||
@@ -236,5 +251,5 @@ async fn test_recover_region_manifets() {
|
||||
assert!(version.mutable_memtables().is_empty());
|
||||
|
||||
// check manifest state
|
||||
assert_eq!(2, manifest.last_version());
|
||||
assert_eq!(3, manifest.last_version());
|
||||
}
|
||||
|
||||
@@ -1,14 +1,27 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_time::Timestamp;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::prelude::ScalarVector;
|
||||
use datatypes::type_id::LogicalTypeId;
|
||||
use datatypes::vectors::Int64Vector;
|
||||
use datatypes::vectors::TimestampVector;
|
||||
use log_store::fs::log::LocalFileLogStore;
|
||||
use store_api::storage::PutOperation;
|
||||
use store_api::storage::WriteRequest;
|
||||
use store_api::storage::{
|
||||
AddColumn, AlterOperation, AlterRequest, ColumnDescriptor, ColumnDescriptorBuilder, ColumnId,
|
||||
Region, RegionMeta, SchemaRef,
|
||||
Region, RegionMeta, SchemaRef, WriteResponse,
|
||||
};
|
||||
use tempdir::TempDir;
|
||||
|
||||
use crate::region::tests::{self, FileTesterBase};
|
||||
use crate::region::OpenOptions;
|
||||
use crate::region::RegionImpl;
|
||||
use crate::test_util::config_util;
|
||||
use crate::test_util::{self, write_batch_util};
|
||||
use crate::write_batch::PutData;
|
||||
use crate::write_batch::WriteBatch;
|
||||
|
||||
const REGION_NAME: &str = "region-alter-0";
|
||||
|
||||
@@ -23,18 +36,82 @@ async fn create_region_for_alter(store_dir: &str) -> RegionImpl<LocalFileLogStor
|
||||
|
||||
/// Tester for region alter.
|
||||
struct AlterTester {
|
||||
store_dir: String,
|
||||
base: Option<FileTesterBase>,
|
||||
}
|
||||
|
||||
struct DataRow {
|
||||
key: Option<i64>,
|
||||
ts: Timestamp,
|
||||
v0: Option<i64>,
|
||||
v1: Option<i64>,
|
||||
}
|
||||
|
||||
impl DataRow {
|
||||
fn new(key: Option<i64>, ts: i64, v0: Option<i64>, v1: Option<i64>) -> Self {
|
||||
DataRow {
|
||||
key,
|
||||
ts: ts.into(),
|
||||
v0,
|
||||
v1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn new_write_batch_for_test() -> WriteBatch {
|
||||
write_batch_util::new_write_batch(
|
||||
&[
|
||||
("k0", LogicalTypeId::Int64, true),
|
||||
(test_util::TIMESTAMP_NAME, LogicalTypeId::Timestamp, false),
|
||||
("v0", LogicalTypeId::Int64, true),
|
||||
("v1", LogicalTypeId::Int64, true),
|
||||
],
|
||||
Some(1),
|
||||
)
|
||||
}
|
||||
|
||||
fn new_put_data(data: &[DataRow]) -> PutData {
|
||||
let mut put_data = PutData::with_num_columns(4);
|
||||
|
||||
let keys = Int64Vector::from_iter(data.iter().map(|v| v.key));
|
||||
let timestamps = TimestampVector::from_vec(data.iter().map(|v| v.ts).collect());
|
||||
let values1 = Int64Vector::from_iter(data.iter().map(|kv| kv.v0));
|
||||
let values2 = Int64Vector::from_iter(data.iter().map(|kv| kv.v1));
|
||||
|
||||
put_data.add_key_column("k0", Arc::new(keys)).unwrap();
|
||||
put_data
|
||||
.add_key_column(test_util::TIMESTAMP_NAME, Arc::new(timestamps))
|
||||
.unwrap();
|
||||
|
||||
put_data.add_value_column("v0", Arc::new(values1)).unwrap();
|
||||
put_data.add_value_column("v1", Arc::new(values2)).unwrap();
|
||||
|
||||
put_data
|
||||
}
|
||||
|
||||
impl AlterTester {
|
||||
async fn new(store_dir: &str) -> AlterTester {
|
||||
let region = create_region_for_alter(store_dir).await;
|
||||
|
||||
AlterTester {
|
||||
base: Some(FileTesterBase::with_region(region)),
|
||||
store_dir: store_dir.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn reopen(&mut self) {
|
||||
// Close the old region.
|
||||
self.base = None;
|
||||
// Reopen the region.
|
||||
let store_config = config_util::new_store_config(REGION_NAME, &self.store_dir).await;
|
||||
let opts = OpenOptions::default();
|
||||
let region = RegionImpl::open(REGION_NAME.to_string(), store_config, &opts)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
self.base = Some(FileTesterBase::with_region(region));
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn base(&self) -> &FileTesterBase {
|
||||
self.base.as_ref().unwrap()
|
||||
@@ -45,6 +122,18 @@ impl AlterTester {
|
||||
metadata.schema().clone()
|
||||
}
|
||||
|
||||
async fn put(&self, data: &[DataRow]) -> WriteResponse {
|
||||
let mut batch = new_write_batch_for_test();
|
||||
let put_data = new_put_data(data);
|
||||
batch.put(put_data).unwrap();
|
||||
|
||||
self.base()
|
||||
.region
|
||||
.write(&self.base().write_ctx, batch)
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
/// Put data with initial schema.
|
||||
async fn put_before_alter(&self, data: &[(i64, Option<i64>)]) {
|
||||
self.base().put(data).await;
|
||||
@@ -61,6 +150,10 @@ impl AlterTester {
|
||||
let metadata = self.base().region.in_memory_metadata();
|
||||
metadata.version()
|
||||
}
|
||||
|
||||
async fn full_scan(&self) -> Vec<(i64, Option<i64>)> {
|
||||
self.base().full_scan().await
|
||||
}
|
||||
}
|
||||
|
||||
fn new_column_desc(id: ColumnId, name: &str) -> ColumnDescriptor {
|
||||
@@ -104,6 +197,66 @@ fn check_schema_names(schema: &SchemaRef, names: &[&str]) {
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_alter_region_with_reopen() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let dir = TempDir::new("alter-region").unwrap();
|
||||
let store_dir = dir.path().to_str().unwrap();
|
||||
let mut tester = AlterTester::new(store_dir).await;
|
||||
|
||||
let data = vec![(1000, Some(100)), (1001, Some(101)), (1002, Some(102))];
|
||||
|
||||
tester.put_before_alter(&data).await;
|
||||
assert_eq!(3, tester.full_scan().await.len());
|
||||
|
||||
let schema = tester.schema();
|
||||
check_schema_names(&schema, &["timestamp", "v0"]);
|
||||
|
||||
let req = add_column_req(&[
|
||||
(new_column_desc(4, "k0"), true), // key column k0
|
||||
(new_column_desc(5, "v1"), false), // value column v1
|
||||
]);
|
||||
tester.alter(req).await;
|
||||
|
||||
let schema = tester.schema();
|
||||
check_schema_names(&schema, &["k0", "timestamp", "v0", "v1"]);
|
||||
|
||||
let data = vec![
|
||||
DataRow::new(Some(10000), 1003, Some(103), Some(201)),
|
||||
DataRow::new(Some(10001), 1004, Some(104), Some(202)),
|
||||
DataRow::new(Some(10002), 1005, Some(105), Some(203)),
|
||||
];
|
||||
tester.put(&data).await;
|
||||
|
||||
tester.reopen().await;
|
||||
let data = vec![
|
||||
DataRow::new(Some(10003), 1006, Some(106), Some(204)),
|
||||
DataRow::new(Some(10004), 1007, Some(107), Some(205)),
|
||||
DataRow::new(Some(10005), 1008, Some(108), Some(206)),
|
||||
];
|
||||
|
||||
tester.put(&data).await;
|
||||
|
||||
// add columns,then remove them without writing data.
|
||||
let req = add_column_req(&[
|
||||
(new_column_desc(6, "v2"), false), // key column k0
|
||||
(new_column_desc(7, "v3"), false), // value column v1
|
||||
]);
|
||||
tester.alter(req).await;
|
||||
|
||||
let req = drop_column_req(&["v2", "v3"]);
|
||||
tester.alter(req).await;
|
||||
|
||||
// reopen and write again
|
||||
tester.reopen().await;
|
||||
let schema = tester.schema();
|
||||
check_schema_names(&schema, &["k0", "timestamp", "v0", "v1"]);
|
||||
|
||||
let data = vec![DataRow::new(Some(10006), 1009, Some(109), Some(207))];
|
||||
|
||||
tester.put(&data).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_alter_region() {
|
||||
let dir = TempDir::new("alter-region").unwrap();
|
||||
|
||||
@@ -17,8 +17,7 @@ use crate::manifest::action::{
|
||||
};
|
||||
use crate::memtable::{Inserter, MemtableBuilderRef, MemtableId, MemtableSet};
|
||||
use crate::proto::wal::WalHeader;
|
||||
use crate::region::RegionManifest;
|
||||
use crate::region::SharedDataRef;
|
||||
use crate::region::{RecoveredMetadataMap, RegionManifest, SharedDataRef};
|
||||
use crate::sst::AccessLayerRef;
|
||||
use crate::version::{VersionControlRef, VersionEdit};
|
||||
use crate::wal::{Payload, Wal};
|
||||
@@ -63,9 +62,15 @@ impl RegionWriter {
|
||||
}
|
||||
|
||||
/// Replay data to memtables.
|
||||
pub async fn replay<S: LogStore>(&self, writer_ctx: WriterContext<'_, S>) -> Result<()> {
|
||||
pub async fn replay<S: LogStore>(
|
||||
&self,
|
||||
recovered_metadata: RecoveredMetadataMap,
|
||||
writer_ctx: WriterContext<'_, S>,
|
||||
) -> Result<()> {
|
||||
let mut inner = self.inner.lock().await;
|
||||
inner.replay(&self.version_mutex, writer_ctx).await
|
||||
inner
|
||||
.replay(&self.version_mutex, recovered_metadata, writer_ctx)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Write and apply the region edit.
|
||||
@@ -144,15 +149,17 @@ impl RegionWriter {
|
||||
.context(error::AlterMetadataSnafu)?;
|
||||
|
||||
let raw = RawRegionMetadata::from(&new_metadata);
|
||||
let mut action_list =
|
||||
RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
|
||||
metadata: raw,
|
||||
}));
|
||||
let new_metadata = Arc::new(new_metadata);
|
||||
|
||||
// Acquire the version lock before altering the metadata.
|
||||
let _lock = self.version_mutex.lock().await;
|
||||
|
||||
let mut action_list =
|
||||
RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
|
||||
metadata: raw,
|
||||
committed_sequence: version_control.committed_sequence(),
|
||||
}));
|
||||
let new_metadata = Arc::new(new_metadata);
|
||||
|
||||
// Persist the meta action.
|
||||
let prev_version = version_control.current_manifest_version();
|
||||
action_list.set_prev_version(prev_version);
|
||||
@@ -175,6 +182,7 @@ impl RegionWriter {
|
||||
manifest_version: ManifestVersion,
|
||||
) -> Result<()> {
|
||||
let next_sequence = version_control.committed_sequence() + 1;
|
||||
version_control.set_committed_sequence(next_sequence);
|
||||
|
||||
let header = WalHeader::with_last_manifest_version(manifest_version);
|
||||
wal.write_to_wal(next_sequence, header, Payload::None)
|
||||
@@ -291,12 +299,15 @@ impl WriterInner {
|
||||
async fn replay<S: LogStore>(
|
||||
&mut self,
|
||||
version_mutex: &Mutex<()>,
|
||||
mut recovered_metadata: RecoveredMetadataMap,
|
||||
writer_ctx: WriterContext<'_, S>,
|
||||
) -> Result<()> {
|
||||
let version_control = writer_ctx.version_control();
|
||||
|
||||
let (flushed_sequence, mut last_sequence);
|
||||
let mut num_requests = 0;
|
||||
let mut num_recovered_metadata = 0;
|
||||
let mut next_apply_metadata = recovered_metadata.pop_first();
|
||||
{
|
||||
let _lock = version_mutex.lock().await;
|
||||
|
||||
@@ -307,6 +318,31 @@ impl WriterInner {
|
||||
// should be flushed_sequence + 1.
|
||||
let mut stream = writer_ctx.wal.read_from_wal(flushed_sequence + 1).await?;
|
||||
while let Some((req_sequence, _header, request)) = stream.try_next().await? {
|
||||
while let Some((next_apply_sequence, _)) = next_apply_metadata {
|
||||
if req_sequence >= next_apply_sequence {
|
||||
// It's safe to unwrap here. It's checked above.
|
||||
// Move out metadata to avoid cloning it.
|
||||
let (_, (manifest_version, metadata)) = next_apply_metadata.take().unwrap();
|
||||
version_control.freeze_mutable_and_apply_metadata(
|
||||
Arc::new(metadata.try_into().context(
|
||||
error::InvalidRawRegionSnafu {
|
||||
region: &writer_ctx.shared.name,
|
||||
},
|
||||
)?),
|
||||
manifest_version,
|
||||
);
|
||||
num_recovered_metadata += 1;
|
||||
logging::debug!("Applied metadata to region: {} when replaying WAL: sequence={} manifest={} ",
|
||||
writer_ctx.shared.name,
|
||||
next_apply_sequence,
|
||||
manifest_version);
|
||||
next_apply_metadata = recovered_metadata.pop_first();
|
||||
} else {
|
||||
// Keep the next_apply_metadata until req_sequence >= next_apply_sequence
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(request) = request {
|
||||
num_requests += 1;
|
||||
let time_ranges = self.prepare_memtables(&request, version_control)?;
|
||||
@@ -345,12 +381,13 @@ impl WriterInner {
|
||||
}
|
||||
|
||||
logging::info!(
|
||||
"Region replay finished, region_id: {}, region_name: {}, flushed_sequence: {}, last_sequence: {}, num_requests: {}",
|
||||
"Region replay finished, region_id: {}, region_name: {}, flushed_sequence: {}, last_sequence: {}, num_requests: {}, num_recovered_metadata: {}",
|
||||
writer_ctx.shared.id,
|
||||
writer_ctx.shared.name,
|
||||
flushed_sequence,
|
||||
last_sequence,
|
||||
num_requests,
|
||||
num_recovered_metadata,
|
||||
);
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -24,6 +24,8 @@ use crate::sync::CowCell;
|
||||
/// Default bucket duration: 2 Hours.
|
||||
const DEFAULT_BUCKET_DURATION: Duration = Duration::from_secs(3600 * 2);
|
||||
|
||||
pub const INIT_COMMITTED_SEQUENCE: u64 = 0;
|
||||
|
||||
/// Controls version of in memory state for a region.
|
||||
#[derive(Debug)]
|
||||
pub struct VersionControl {
|
||||
@@ -41,7 +43,7 @@ impl VersionControl {
|
||||
pub fn with_version(version: Version) -> VersionControl {
|
||||
VersionControl {
|
||||
version: CowCell::new(version),
|
||||
committed_sequence: AtomicU64::new(0),
|
||||
committed_sequence: AtomicU64::new(INIT_COMMITTED_SEQUENCE),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -17,6 +17,9 @@ pub const MAX_VERSION: u64 = u64::MAX;
|
||||
pub trait MetaAction: Serialize + DeserializeOwned + Send + Sync + Clone + std::fmt::Debug {
|
||||
type Error: ErrorExt + Send + Sync;
|
||||
|
||||
/// Set a protocol action into meta action
|
||||
fn set_protocol(&mut self, action: ProtocolAction);
|
||||
|
||||
/// Set previous valid manifest version.
|
||||
fn set_prev_version(&mut self, version: ManifestVersion);
|
||||
|
||||
|
||||
@@ -50,12 +50,10 @@ mod tests {
|
||||
|
||||
let protocol = ProtocolAction::new();
|
||||
let table_info = test_util::build_test_table_info();
|
||||
let action_list = TableMetaActionList::new(vec![
|
||||
TableMetaAction::Protocol(protocol.clone()),
|
||||
TableMetaAction::Change(Box::new(TableChange {
|
||||
let action_list =
|
||||
TableMetaActionList::new(vec![TableMetaAction::Change(Box::new(TableChange {
|
||||
table_info: table_info.clone(),
|
||||
})),
|
||||
]);
|
||||
}))]);
|
||||
|
||||
assert_eq!(0, manifest.update(action_list).await.unwrap());
|
||||
|
||||
|
||||
@@ -39,6 +39,13 @@ pub struct TableMetaActionList {
|
||||
}
|
||||
|
||||
impl TableMetaActionList {
|
||||
pub fn with_action(action: TableMetaAction) -> Self {
|
||||
Self {
|
||||
actions: vec![action],
|
||||
prev_version: 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new(actions: Vec<TableMetaAction>) -> Self {
|
||||
Self {
|
||||
actions,
|
||||
@@ -50,6 +57,11 @@ impl TableMetaActionList {
|
||||
impl MetaAction for TableMetaActionList {
|
||||
type Error = StorageError;
|
||||
|
||||
fn set_protocol(&mut self, action: ProtocolAction) {
|
||||
// The protocol action should be the first action in action list by convention.
|
||||
self.actions.insert(0, TableMetaAction::Protocol(action));
|
||||
}
|
||||
|
||||
fn set_prev_version(&mut self, version: ManifestVersion) {
|
||||
self.prev_version = version;
|
||||
}
|
||||
|
||||
@@ -21,7 +21,6 @@ use futures::task::{Context, Poll};
|
||||
use futures::Stream;
|
||||
use object_store::ObjectStore;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::manifest::action::ProtocolAction;
|
||||
use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator};
|
||||
use store_api::storage::{
|
||||
AddColumn, AlterOperation, AlterRequest, ChunkReader, ColumnDescriptorBuilder, PutOperation,
|
||||
@@ -249,12 +248,11 @@ impl<R: Region> Table for MitoTable<R> {
|
||||
new_info
|
||||
);
|
||||
self.manifest
|
||||
.update(TableMetaActionList::new(vec![
|
||||
TableMetaAction::Protocol(ProtocolAction::new()),
|
||||
TableMetaAction::Change(Box::new(TableChange {
|
||||
.update(TableMetaActionList::with_action(TableMetaAction::Change(
|
||||
Box::new(TableChange {
|
||||
table_info: new_info.clone(),
|
||||
})),
|
||||
]))
|
||||
}),
|
||||
)))
|
||||
.await
|
||||
.context(UpdateTableManifestSnafu {
|
||||
table_name: &self.table_info().name,
|
||||
@@ -414,12 +412,11 @@ impl<R: Region> MitoTable<R> {
|
||||
|
||||
// TODO(dennis): save manifest version into catalog?
|
||||
let _manifest_version = manifest
|
||||
.update(TableMetaActionList::new(vec![
|
||||
TableMetaAction::Protocol(ProtocolAction::new()),
|
||||
TableMetaAction::Change(Box::new(TableChange {
|
||||
.update(TableMetaActionList::with_action(TableMetaAction::Change(
|
||||
Box::new(TableChange {
|
||||
table_info: table_info.clone(),
|
||||
})),
|
||||
]))
|
||||
}),
|
||||
)))
|
||||
.await
|
||||
.context(UpdateTableManifestSnafu { table_name })?;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user