mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-05 21:02:58 +00:00
feat: Implements RegionWriter::alter (#292)
* fix(storage): Failure of writing manifest version won't abort applying edit * feat(storage): Adds RegionMetadata::validate_alter to validate AlterRequest * fix(storage): Protect write and apply region edit by version mutex The region meta action needs previous manifest version, so we need to use the version mutex to avoid other thread update the manifest version during writing the action to the manifest. * feat(storage): Implement RegionWriter::alter RegionWriter::alter() would 1. acquire write lock first 2. then validate the alter request 3. build the new metadata by RegionMetadata::alter() 4. acquire the version lock 5. write the metadata to the manifest, which also bump the manifest version 6. freeze mutable memtables and apply the new metadata to Version 7. write the manifest version to wal * test(storage): Add tests for Region::alter() * test(storage): Add tests for RegionMetadata::validate_alter * chore(storage): Modify InvalidAlterRequest error msg * chore: Adjust comment
This commit is contained in:
@@ -257,6 +257,18 @@ pub enum Error {
|
||||
#[snafu(backtrace)]
|
||||
source: datatypes::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid alter request, source: {}", source))]
|
||||
InvalidAlterRequest {
|
||||
#[snafu(backtrace)]
|
||||
source: MetadataError,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to alter metadata, source: {}", source))]
|
||||
AlterMetadata {
|
||||
#[snafu(backtrace)]
|
||||
source: MetadataError,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -267,7 +279,6 @@ impl ErrorExt for Error {
|
||||
|
||||
match self {
|
||||
InvalidScanIndex { .. }
|
||||
| InvalidRegionDesc { .. }
|
||||
| InvalidInputSchema { .. }
|
||||
| BatchMissingColumn { .. }
|
||||
| BatchMissingTimestamp { .. }
|
||||
@@ -288,7 +299,8 @@ impl ErrorExt for Error {
|
||||
| SequenceNotMonotonic { .. }
|
||||
| ConvertStoreSchema { .. }
|
||||
| InvalidRawRegion { .. }
|
||||
| FilterColumn { .. } => StatusCode::Unexpected,
|
||||
| FilterColumn { .. }
|
||||
| AlterMetadata { .. } => StatusCode::Unexpected,
|
||||
|
||||
FlushIo { .. }
|
||||
| WriteParquet { .. }
|
||||
@@ -306,6 +318,9 @@ impl ErrorExt for Error {
|
||||
| InvalidRegionState { .. }
|
||||
| ReadWal { .. } => StatusCode::StorageUnavailable,
|
||||
|
||||
InvalidAlterRequest { source, .. } | InvalidRegionDesc { source, .. } => {
|
||||
source.status_code()
|
||||
}
|
||||
PushBatch { source, .. } => source.status_code(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,7 +4,6 @@ use async_trait::async_trait;
|
||||
use common_telemetry::logging;
|
||||
use common_time::RangeMillis;
|
||||
use store_api::logstore::LogStore;
|
||||
use store_api::manifest::{Manifest, ManifestVersion, MetaAction};
|
||||
use store_api::storage::SequenceNumber;
|
||||
use uuid::Uuid;
|
||||
|
||||
@@ -16,7 +15,6 @@ use crate::memtable::{IterContext, MemtableId, MemtableRef};
|
||||
use crate::region::RegionWriterRef;
|
||||
use crate::region::SharedDataRef;
|
||||
use crate::sst::{AccessLayerRef, FileMeta, WriteOptions};
|
||||
use crate::version::VersionEdit;
|
||||
use crate::wal::Wal;
|
||||
|
||||
/// Default write buffer size (32M).
|
||||
@@ -195,25 +193,23 @@ impl<S: LogStore> FlushJob<S> {
|
||||
Ok(metas)
|
||||
}
|
||||
|
||||
async fn write_to_manifest(&self, file_metas: &[FileMeta]) -> Result<ManifestVersion> {
|
||||
async fn write_manifest_and_apply(&self, file_metas: &[FileMeta]) -> Result<()> {
|
||||
let edit = RegionEdit {
|
||||
region_version: self.shared.version_control.metadata().version(),
|
||||
flushed_sequence: self.flush_sequence,
|
||||
files_to_add: file_metas.to_vec(),
|
||||
files_to_remove: Vec::default(),
|
||||
};
|
||||
let prev_version = self.shared.version_control.current_manifest_version();
|
||||
|
||||
logging::debug!(
|
||||
"Write region edit: {:?} to manifest, prev_version: {}.",
|
||||
edit,
|
||||
prev_version,
|
||||
);
|
||||
|
||||
let mut action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));
|
||||
action_list.set_prev_version(prev_version);
|
||||
|
||||
self.manifest.update(action_list).await
|
||||
self.writer
|
||||
.write_edit_and_apply(
|
||||
&self.wal,
|
||||
&self.shared,
|
||||
&self.manifest,
|
||||
edit,
|
||||
self.max_memtable_id,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Generates random SST file name in format: `^[a-f\d]{8}(-[a-f\d]{4}){3}-[a-f\d]{12}.parquet$`
|
||||
@@ -228,18 +224,7 @@ impl<S: LogStore> Job for FlushJob<S> {
|
||||
async fn run(&mut self, ctx: &Context) -> Result<()> {
|
||||
let file_metas = self.write_memtables_to_layer(ctx).await?;
|
||||
|
||||
let manifest_version = self.write_to_manifest(&file_metas).await?;
|
||||
|
||||
let edit = VersionEdit {
|
||||
files_to_add: file_metas,
|
||||
flushed_sequence: Some(self.flush_sequence),
|
||||
manifest_version,
|
||||
max_memtable_id: Some(self.max_memtable_id),
|
||||
};
|
||||
|
||||
self.writer
|
||||
.apply_version_edit(&self.wal, edit, &self.shared)
|
||||
.await?;
|
||||
self.write_manifest_and_apply(&file_metas).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -7,10 +7,10 @@ use serde::{Deserialize, Serialize};
|
||||
use snafu::{ensure, OptionExt};
|
||||
use store_api::storage::{
|
||||
consts::{self, ReservedColumnId},
|
||||
AlterRequest, ColumnDescriptor, ColumnDescriptorBuilder, ColumnFamilyDescriptor,
|
||||
ColumnFamilyDescriptorBuilder, ColumnFamilyId, ColumnId, RegionDescriptor,
|
||||
RegionDescriptorBuilder, RegionId, RegionMeta, RowKeyDescriptor, RowKeyDescriptorBuilder,
|
||||
Schema, SchemaRef,
|
||||
AddColumn, AlterOperation, AlterRequest, ColumnDescriptor, ColumnDescriptorBuilder,
|
||||
ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnFamilyId, ColumnId,
|
||||
RegionDescriptor, RegionDescriptorBuilder, RegionId, RegionMeta, RowKeyDescriptor,
|
||||
RowKeyDescriptorBuilder, Schema, SchemaRef,
|
||||
};
|
||||
|
||||
use crate::manifest::action::{RawColumnFamiliesMetadata, RawColumnsMetadata, RawRegionMetadata};
|
||||
@@ -43,16 +43,46 @@ pub enum Error {
|
||||
#[snafu(display("Missing timestamp key column"))]
|
||||
MissingTimestamp { backtrace: Backtrace },
|
||||
|
||||
// Variants for validating `AlterRequest`, which won't have a backtrace.
|
||||
#[snafu(display("Expect altering metadata with version {}, given {}", expect, given))]
|
||||
InvalidVersion {
|
||||
InvalidAlterVersion {
|
||||
expect: VersionNumber,
|
||||
given: VersionNumber,
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to add column as there is already a column named {}", name))]
|
||||
AddExistColumn { name: String },
|
||||
|
||||
#[snafu(display("Failed to add a non null column {}", name))]
|
||||
AddNonNullColumn { name: String },
|
||||
|
||||
#[snafu(display("Failed to drop column as there is no column named {}", name))]
|
||||
DropAbsentColumn { name: String },
|
||||
|
||||
#[snafu(display("Failed to drop column {} as it is part of key", name))]
|
||||
DropKeyColumn { name: String },
|
||||
|
||||
#[snafu(display("Failed to drop column {} as it is an internal column", name))]
|
||||
DropInternalColumn { name: String },
|
||||
// End of variants for validating `AlterRequest`.
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
impl ErrorExt for Error {
|
||||
fn status_code(&self) -> StatusCode {
|
||||
StatusCode::InvalidArguments
|
||||
}
|
||||
|
||||
fn backtrace_opt(&self) -> Option<&Backtrace> {
|
||||
ErrorCompat::backtrace(self)
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn std::any::Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
/// Implementation of [RegionMeta].
|
||||
///
|
||||
/// Holds a snapshot of region metadata.
|
||||
@@ -122,16 +152,43 @@ impl RegionMetadata {
|
||||
self.schema.version()
|
||||
}
|
||||
|
||||
/// Returns a new [RegionMetadata] after alteration, leave `self` unchanged.
|
||||
pub fn alter(&self, req: &AlterRequest) -> Result<RegionMetadata> {
|
||||
/// Checks whether the `req` is valid, returns `Err` if it is invalid.
|
||||
pub fn validate_alter(&self, req: &AlterRequest) -> Result<()> {
|
||||
ensure!(
|
||||
req.version == self.version,
|
||||
InvalidVersionSnafu {
|
||||
InvalidAlterVersionSnafu {
|
||||
expect: req.version,
|
||||
given: self.version,
|
||||
}
|
||||
);
|
||||
|
||||
match &req.operation {
|
||||
AlterOperation::AddColumns { columns } => {
|
||||
for col in columns {
|
||||
self.validate_add_column(col)?;
|
||||
}
|
||||
}
|
||||
AlterOperation::DropColumns { names } => {
|
||||
for name in names {
|
||||
self.validate_drop_column(name)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns a new [RegionMetadata] after alteration, leave `self` unchanged.
|
||||
///
|
||||
/// Caller should use [RegionMetadata::validate_alter] to validate the `req` and
|
||||
/// ensure the version of the `req` is equal to the version of the metadata.
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if `req.version != self.version`.
|
||||
pub fn alter(&self, req: &AlterRequest) -> Result<RegionMetadata> {
|
||||
// The req should have been validated before.
|
||||
assert_eq!(req.version, self.version);
|
||||
|
||||
let mut desc = self.to_descriptor();
|
||||
// Apply the alter operation to the descriptor.
|
||||
req.operation.apply(&mut desc);
|
||||
@@ -141,6 +198,46 @@ impl RegionMetadata {
|
||||
.build()
|
||||
}
|
||||
|
||||
fn validate_add_column(&self, add_column: &AddColumn) -> Result<()> {
|
||||
// We don't check the case that the column is not nullable but default constraint is null. The
|
||||
// caller should guarantee this.
|
||||
ensure!(
|
||||
add_column.desc.is_nullable || add_column.desc.default_constraint.is_some(),
|
||||
AddNonNullColumnSnafu {
|
||||
name: &add_column.desc.name,
|
||||
}
|
||||
);
|
||||
|
||||
// Use the store schema to check the column as it contains all internal columns.
|
||||
let store_schema = self.schema.store_schema();
|
||||
ensure!(
|
||||
!store_schema.contains_column(&add_column.desc.name),
|
||||
AddExistColumnSnafu {
|
||||
name: &add_column.desc.name,
|
||||
}
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn validate_drop_column(&self, name: &str) -> Result<()> {
|
||||
let store_schema = self.schema.store_schema();
|
||||
ensure!(
|
||||
store_schema.contains_column(name),
|
||||
DropAbsentColumnSnafu { name }
|
||||
);
|
||||
ensure!(
|
||||
!store_schema.is_key_column(name),
|
||||
DropKeyColumnSnafu { name }
|
||||
);
|
||||
ensure!(
|
||||
store_schema.is_user_column(name),
|
||||
DropInternalColumnSnafu { name }
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn to_descriptor(&self) -> RegionDescriptor {
|
||||
let row_key = self.columns.to_row_key_descriptor();
|
||||
let mut builder = RegionDescriptorBuilder::default()
|
||||
@@ -948,7 +1045,6 @@ mod tests {
|
||||
"k2",
|
||||
ConcreteDataType::int32_datatype(),
|
||||
)
|
||||
.is_nullable(false)
|
||||
.build()
|
||||
.unwrap(),
|
||||
is_key: true,
|
||||
@@ -967,13 +1063,14 @@ mod tests {
|
||||
},
|
||||
version: 0,
|
||||
};
|
||||
metadata.validate_alter(&req).unwrap();
|
||||
let metadata = metadata.alter(&req).unwrap();
|
||||
|
||||
let builder: RegionMetadataBuilder = RegionDescBuilder::new(region_name)
|
||||
.enable_version_column(false)
|
||||
.push_key_column(("k1", LogicalTypeId::Int32, false))
|
||||
.push_value_column(("v1", LogicalTypeId::Float32, true))
|
||||
.push_key_column(("k2", LogicalTypeId::Int32, false))
|
||||
.push_key_column(("k2", LogicalTypeId::Int32, true))
|
||||
.push_value_column(("v2", LogicalTypeId::Float32, true))
|
||||
.build()
|
||||
.try_into()
|
||||
@@ -1020,4 +1117,118 @@ mod tests {
|
||||
let expect = builder.version(1).build().unwrap();
|
||||
assert_eq!(expect, metadata);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_validate_alter_request() {
|
||||
let builder = RegionDescBuilder::new("region-alter")
|
||||
.enable_version_column(false)
|
||||
.timestamp(("ts", LogicalTypeId::Timestamp, false))
|
||||
.push_key_column(("k0", LogicalTypeId::Int32, false))
|
||||
.push_value_column(("v0", LogicalTypeId::Float32, true))
|
||||
.push_value_column(("v1", LogicalTypeId::Float32, true));
|
||||
let last_column_id = builder.last_column_id();
|
||||
let metadata: RegionMetadata = builder.build().try_into().unwrap();
|
||||
|
||||
// Test request with different version.
|
||||
let mut req = AlterRequest {
|
||||
operation: AlterOperation::AddColumns {
|
||||
columns: vec![AddColumn {
|
||||
desc: ColumnDescriptorBuilder::new(
|
||||
last_column_id + 1,
|
||||
"k2",
|
||||
ConcreteDataType::int32_datatype(),
|
||||
)
|
||||
.build()
|
||||
.unwrap(),
|
||||
is_key: true,
|
||||
}],
|
||||
},
|
||||
version: 1,
|
||||
};
|
||||
assert!(matches!(
|
||||
metadata.validate_alter(&req).err().unwrap(),
|
||||
Error::InvalidAlterVersion { .. }
|
||||
));
|
||||
req.version = 0;
|
||||
|
||||
// Add existing column.
|
||||
req.operation = AlterOperation::AddColumns {
|
||||
columns: vec![AddColumn {
|
||||
desc: ColumnDescriptorBuilder::new(
|
||||
last_column_id + 1,
|
||||
"ts",
|
||||
ConcreteDataType::int32_datatype(),
|
||||
)
|
||||
.build()
|
||||
.unwrap(),
|
||||
is_key: false,
|
||||
}],
|
||||
};
|
||||
assert!(matches!(
|
||||
metadata.validate_alter(&req).err().unwrap(),
|
||||
Error::AddExistColumn { .. }
|
||||
));
|
||||
|
||||
// Add non null column.
|
||||
req.operation = AlterOperation::AddColumns {
|
||||
columns: vec![AddColumn {
|
||||
desc: ColumnDescriptorBuilder::new(
|
||||
last_column_id + 1,
|
||||
"v2",
|
||||
ConcreteDataType::int32_datatype(),
|
||||
)
|
||||
.is_nullable(false)
|
||||
.build()
|
||||
.unwrap(),
|
||||
is_key: false,
|
||||
}],
|
||||
};
|
||||
assert!(matches!(
|
||||
metadata.validate_alter(&req).err().unwrap(),
|
||||
Error::AddNonNullColumn { .. }
|
||||
));
|
||||
|
||||
// Drop absent column.
|
||||
let mut req = AlterRequest {
|
||||
operation: AlterOperation::DropColumns {
|
||||
names: vec![String::from("v2")],
|
||||
},
|
||||
version: 0,
|
||||
};
|
||||
assert!(matches!(
|
||||
metadata.validate_alter(&req).err().unwrap(),
|
||||
Error::DropAbsentColumn { .. }
|
||||
));
|
||||
|
||||
// Drop key column.
|
||||
req.operation = AlterOperation::DropColumns {
|
||||
names: vec![String::from("ts")],
|
||||
};
|
||||
assert!(matches!(
|
||||
metadata.validate_alter(&req).err().unwrap(),
|
||||
Error::DropKeyColumn { .. }
|
||||
));
|
||||
req.operation = AlterOperation::DropColumns {
|
||||
names: vec![String::from("k0")],
|
||||
};
|
||||
assert!(matches!(
|
||||
metadata.validate_alter(&req).err().unwrap(),
|
||||
Error::DropKeyColumn { .. }
|
||||
));
|
||||
|
||||
// Drop internal column.
|
||||
req.operation = AlterOperation::DropColumns {
|
||||
names: vec![String::from(consts::SEQUENCE_COLUMN_NAME)],
|
||||
};
|
||||
assert!(matches!(
|
||||
metadata.validate_alter(&req).err().unwrap(),
|
||||
Error::DropInternalColumn { .. }
|
||||
));
|
||||
|
||||
// Valid request
|
||||
req.operation = AlterOperation::DropColumns {
|
||||
names: vec![String::from("v0")],
|
||||
};
|
||||
metadata.validate_alter(&req).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
//! Region tests.
|
||||
|
||||
mod alter;
|
||||
mod basic;
|
||||
mod flush;
|
||||
mod projection;
|
||||
@@ -24,16 +25,16 @@ use crate::test_util::{
|
||||
};
|
||||
use crate::write_batch::PutData;
|
||||
|
||||
/// Create metadata of a region with schema: (timestamp, v1).
|
||||
/// Create metadata of a region with schema: (timestamp, v0).
|
||||
pub fn new_metadata(region_name: &str, enable_version_column: bool) -> RegionMetadata {
|
||||
let desc = RegionDescBuilder::new(region_name)
|
||||
.enable_version_column(enable_version_column)
|
||||
.push_value_column(("v1", LogicalTypeId::Int64, true))
|
||||
.push_value_column(("v0", LogicalTypeId::Int64, true))
|
||||
.build();
|
||||
desc.try_into().unwrap()
|
||||
}
|
||||
|
||||
/// Test region with schema (timestamp, v1).
|
||||
/// Test region with schema (timestamp, v0).
|
||||
pub struct TesterBase<S: LogStore> {
|
||||
pub region: RegionImpl<S>,
|
||||
write_ctx: WriteContext,
|
||||
@@ -51,7 +52,7 @@ impl<S: LogStore> TesterBase<S> {
|
||||
|
||||
/// Put without version specified.
|
||||
///
|
||||
/// Format of data: (timestamp, v1), timestamp is key, v1 is value.
|
||||
/// Format of data: (timestamp, v0), timestamp is key, v0 is value.
|
||||
pub async fn put(&self, data: &[(i64, Option<i64>)]) -> WriteResponse {
|
||||
let data: Vec<(Timestamp, Option<i64>)> =
|
||||
data.iter().map(|(l, r)| ((*l).into(), *r)).collect();
|
||||
@@ -98,7 +99,7 @@ fn new_write_batch_for_test(enable_version_column: bool) -> WriteBatch {
|
||||
&[
|
||||
(test_util::TIMESTAMP_NAME, LogicalTypeId::Timestamp, false),
|
||||
(consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false),
|
||||
("v1", LogicalTypeId::Int64, true),
|
||||
("v0", LogicalTypeId::Int64, true),
|
||||
],
|
||||
Some(0),
|
||||
)
|
||||
@@ -106,7 +107,7 @@ fn new_write_batch_for_test(enable_version_column: bool) -> WriteBatch {
|
||||
write_batch_util::new_write_batch(
|
||||
&[
|
||||
(test_util::TIMESTAMP_NAME, LogicalTypeId::Timestamp, false),
|
||||
("v1", LogicalTypeId::Int64, true),
|
||||
("v0", LogicalTypeId::Int64, true),
|
||||
],
|
||||
Some(0),
|
||||
)
|
||||
@@ -122,7 +123,7 @@ fn new_put_data(data: &[(Timestamp, Option<i64>)]) -> PutData {
|
||||
put_data
|
||||
.add_key_column(test_util::TIMESTAMP_NAME, Arc::new(timestamps))
|
||||
.unwrap();
|
||||
put_data.add_value_column("v1", Arc::new(values)).unwrap();
|
||||
put_data.add_value_column("v0", Arc::new(values)).unwrap();
|
||||
|
||||
put_data
|
||||
}
|
||||
@@ -149,7 +150,7 @@ async fn test_new_region() {
|
||||
let desc = RegionDescBuilder::new(region_name)
|
||||
.enable_version_column(true)
|
||||
.push_key_column(("k1", LogicalTypeId::Int32, false))
|
||||
.push_value_column(("v1", LogicalTypeId::Float32, true))
|
||||
.push_value_column(("v0", LogicalTypeId::Float32, true))
|
||||
.build();
|
||||
let metadata = desc.try_into().unwrap();
|
||||
|
||||
@@ -168,7 +169,7 @@ async fn test_new_region() {
|
||||
("k1", LogicalTypeId::Int32, false),
|
||||
(test_util::TIMESTAMP_NAME, LogicalTypeId::Timestamp, false),
|
||||
(consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false),
|
||||
("v1", LogicalTypeId::Float32, true),
|
||||
("v0", LogicalTypeId::Float32, true),
|
||||
],
|
||||
Some(1),
|
||||
);
|
||||
|
||||
144
src/storage/src/region/tests/alter.rs
Normal file
144
src/storage/src/region/tests/alter.rs
Normal file
@@ -0,0 +1,144 @@
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use log_store::fs::log::LocalFileLogStore;
|
||||
use store_api::storage::{
|
||||
AddColumn, AlterOperation, AlterRequest, ColumnDescriptor, ColumnDescriptorBuilder, ColumnId,
|
||||
Region, RegionMeta, SchemaRef,
|
||||
};
|
||||
use tempdir::TempDir;
|
||||
|
||||
use crate::region::tests::{self, FileTesterBase};
|
||||
use crate::region::RegionImpl;
|
||||
use crate::test_util::config_util;
|
||||
|
||||
const REGION_NAME: &str = "region-alter-0";
|
||||
|
||||
async fn create_region_for_alter(store_dir: &str) -> RegionImpl<LocalFileLogStore> {
|
||||
// Always disable version column in this test.
|
||||
let metadata = tests::new_metadata(REGION_NAME, false);
|
||||
|
||||
let store_config = config_util::new_store_config(REGION_NAME, store_dir).await;
|
||||
|
||||
RegionImpl::create(metadata, store_config).await.unwrap()
|
||||
}
|
||||
|
||||
/// Tester for region alter.
|
||||
struct AlterTester {
|
||||
base: Option<FileTesterBase>,
|
||||
}
|
||||
|
||||
impl AlterTester {
|
||||
async fn new(store_dir: &str) -> AlterTester {
|
||||
let region = create_region_for_alter(store_dir).await;
|
||||
|
||||
AlterTester {
|
||||
base: Some(FileTesterBase::with_region(region)),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn base(&self) -> &FileTesterBase {
|
||||
self.base.as_ref().unwrap()
|
||||
}
|
||||
|
||||
fn schema(&self) -> SchemaRef {
|
||||
let metadata = self.base().region.in_memory_metadata();
|
||||
metadata.schema().clone()
|
||||
}
|
||||
|
||||
/// Put data with initial schema.
|
||||
async fn put_before_alter(&self, data: &[(i64, Option<i64>)]) {
|
||||
self.base().put(data).await;
|
||||
}
|
||||
|
||||
async fn alter(&self, mut req: AlterRequest) {
|
||||
let version = self.version();
|
||||
req.version = version;
|
||||
|
||||
self.base().region.alter(req).await.unwrap();
|
||||
}
|
||||
|
||||
fn version(&self) -> u32 {
|
||||
let metadata = self.base().region.in_memory_metadata();
|
||||
metadata.version()
|
||||
}
|
||||
}
|
||||
|
||||
fn new_column_desc(id: ColumnId, name: &str) -> ColumnDescriptor {
|
||||
ColumnDescriptorBuilder::new(id, name, ConcreteDataType::int64_datatype())
|
||||
.is_nullable(true)
|
||||
.build()
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn add_column_req(desc_and_is_key: &[(ColumnDescriptor, bool)]) -> AlterRequest {
|
||||
let columns = desc_and_is_key
|
||||
.iter()
|
||||
.map(|(desc, is_key)| AddColumn {
|
||||
desc: desc.clone(),
|
||||
is_key: *is_key,
|
||||
})
|
||||
.collect();
|
||||
let operation = AlterOperation::AddColumns { columns };
|
||||
|
||||
AlterRequest {
|
||||
operation,
|
||||
version: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn drop_column_req(names: &[&str]) -> AlterRequest {
|
||||
let names = names.iter().map(|s| s.to_string()).collect();
|
||||
let operation = AlterOperation::DropColumns { names };
|
||||
|
||||
AlterRequest {
|
||||
operation,
|
||||
version: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn check_schema_names(schema: &SchemaRef, names: &[&str]) {
|
||||
assert_eq!(names.len(), schema.num_columns());
|
||||
for (idx, name) in names.iter().enumerate() {
|
||||
assert_eq!(*name, schema.column_name_by_index(idx));
|
||||
assert!(schema.column_schema_by_name(name).is_some());
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_alter_region() {
|
||||
let dir = TempDir::new("alter-region").unwrap();
|
||||
let store_dir = dir.path().to_str().unwrap();
|
||||
let tester = AlterTester::new(store_dir).await;
|
||||
|
||||
let data = vec![(1000, Some(100)), (1001, Some(101)), (1002, Some(102))];
|
||||
|
||||
tester.put_before_alter(&data).await;
|
||||
|
||||
let schema = tester.schema();
|
||||
check_schema_names(&schema, &["timestamp", "v0"]);
|
||||
|
||||
let req = add_column_req(&[
|
||||
(new_column_desc(4, "k0"), true), // key column k0
|
||||
(new_column_desc(5, "v1"), false), // value column v1
|
||||
]);
|
||||
tester.alter(req).await;
|
||||
|
||||
let schema = tester.schema();
|
||||
check_schema_names(&schema, &["k0", "timestamp", "v0", "v1"]);
|
||||
|
||||
let req = add_column_req(&[
|
||||
(new_column_desc(6, "v2"), false),
|
||||
(new_column_desc(7, "v3"), false),
|
||||
]);
|
||||
tester.alter(req).await;
|
||||
|
||||
let schema = tester.schema();
|
||||
check_schema_names(&schema, &["k0", "timestamp", "v0", "v1", "v2", "v3"]);
|
||||
|
||||
// Remove v0, v1
|
||||
let req = drop_column_req(&["v0", "v1"]);
|
||||
tester.alter(req).await;
|
||||
|
||||
let schema = tester.schema();
|
||||
check_schema_names(&schema, &["k0", "timestamp", "v2", "v3"]);
|
||||
}
|
||||
@@ -5,12 +5,16 @@ use common_time::RangeMillis;
|
||||
use futures::TryStreamExt;
|
||||
use snafu::ResultExt;
|
||||
use store_api::logstore::LogStore;
|
||||
use store_api::storage::{AlterRequest, SequenceNumber, WriteContext, WriteRequest, WriteResponse};
|
||||
use store_api::manifest::{Manifest, ManifestVersion, MetaAction};
|
||||
use store_api::storage::{AlterRequest, WriteContext, WriteRequest, WriteResponse};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use crate::background::JobHandle;
|
||||
use crate::error::{self, Result};
|
||||
use crate::flush::{FlushJob, FlushSchedulerRef, FlushStrategyRef};
|
||||
use crate::manifest::action::{
|
||||
RawRegionMetadata, RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList,
|
||||
};
|
||||
use crate::memtable::{Inserter, MemtableBuilderRef, MemtableId, MemtableSet};
|
||||
use crate::proto::wal::WalHeader;
|
||||
use crate::region::RegionManifest;
|
||||
@@ -27,6 +31,7 @@ pub type RegionWriterRef = Arc<RegionWriter>;
|
||||
/// Region writer manages all write operations to the region.
|
||||
#[derive(Debug)]
|
||||
pub struct RegionWriter {
|
||||
// To avoid dead lock, we need to ensure the lock order is: inner -> version_mutex.
|
||||
/// Inner writer guarded by write lock, the write lock is used to ensure
|
||||
/// all write operations are serialized.
|
||||
inner: Mutex<WriterInner>,
|
||||
@@ -57,66 +62,125 @@ impl RegionWriter {
|
||||
.await
|
||||
}
|
||||
|
||||
/// Apply version edit.
|
||||
pub async fn apply_version_edit<S: LogStore>(
|
||||
&self,
|
||||
wal: &Wal<S>,
|
||||
edit: VersionEdit,
|
||||
shared: &SharedDataRef,
|
||||
) -> Result<()> {
|
||||
// HACK: We won't acquire the write lock here because write stall would hold
|
||||
// write lock thus we have no chance to get the lock and apply the version edit.
|
||||
// So we add a version lock to ensure modification to `VersionControl` is
|
||||
// serialized.
|
||||
let version_control = &shared.version_control;
|
||||
|
||||
let _lock = self.version_mutex.lock().await;
|
||||
let next_sequence = version_control.committed_sequence() + 1;
|
||||
|
||||
self.persist_manifest_version(wal, next_sequence, &edit)
|
||||
.await?;
|
||||
|
||||
version_control.apply_edit(edit);
|
||||
|
||||
version_control.set_committed_sequence(next_sequence);
|
||||
|
||||
// TODO(yingwen): We should set the flush handle to `None`, but we can't acquire
|
||||
// write lock here.
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Replay data to memtables.
|
||||
pub async fn replay<S: LogStore>(&self, writer_ctx: WriterContext<'_, S>) -> Result<()> {
|
||||
let mut inner = self.inner.lock().await;
|
||||
inner.replay(&self.version_mutex, writer_ctx).await
|
||||
}
|
||||
|
||||
/// Write and apply the region edit.
|
||||
pub(crate) async fn write_edit_and_apply<S: LogStore>(
|
||||
&self,
|
||||
wal: &Wal<S>,
|
||||
shared: &SharedDataRef,
|
||||
manifest: &RegionManifest,
|
||||
edit: RegionEdit,
|
||||
max_memtable_id: MemtableId,
|
||||
) -> Result<()> {
|
||||
let _lock = self.version_mutex.lock().await;
|
||||
// HACK: We won't acquire the write lock here because write stall would hold
|
||||
// write lock thus we have no chance to get the lock and apply the version edit.
|
||||
// So we add a version lock to ensure modification to `VersionControl` is
|
||||
// serialized.
|
||||
let version_control = &shared.version_control;
|
||||
let prev_version = version_control.current_manifest_version();
|
||||
|
||||
logging::debug!(
|
||||
"Write region edit: {:?} to manifest, prev_version: {}.",
|
||||
edit,
|
||||
prev_version,
|
||||
);
|
||||
|
||||
let files_to_add = edit.files_to_add.clone();
|
||||
let flushed_sequence = edit.flushed_sequence;
|
||||
|
||||
// Persist the meta action.
|
||||
let mut action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));
|
||||
action_list.set_prev_version(prev_version);
|
||||
let manifest_version = manifest.update(action_list).await?;
|
||||
|
||||
let version_edit = VersionEdit {
|
||||
files_to_add,
|
||||
flushed_sequence: Some(flushed_sequence),
|
||||
manifest_version,
|
||||
max_memtable_id: Some(max_memtable_id),
|
||||
};
|
||||
|
||||
// We could tolerate failure during persisting manifest version to the WAL, since it won't
|
||||
// affect how we applying the edit to the version.
|
||||
version_control.apply_edit(version_edit);
|
||||
// TODO(yingwen): We should set the flush handle to `None`, but we can't acquire
|
||||
// write lock here.
|
||||
|
||||
// Persist the manifest version to notify subscriber of the wal that the manifest has been
|
||||
// updated. This should be done at the end of the method.
|
||||
self.persist_manifest_version(wal, version_control, manifest_version)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Alter schema of the region.
|
||||
pub async fn alter<S: LogStore>(
|
||||
&self,
|
||||
_alter_ctx: AlterContext<'_, S>,
|
||||
_request: AlterRequest,
|
||||
alter_ctx: AlterContext<'_, S>,
|
||||
request: AlterRequest,
|
||||
) -> Result<()> {
|
||||
// TODO(yingwen): [alter] implements alter:
|
||||
// 1. acquire version lock
|
||||
// 2. validate request
|
||||
// 3. build schema based on new request
|
||||
// 4. persist it into the region manifest
|
||||
// 5. update version in VersionControl
|
||||
// To alter the schema, we need to acquire the write lock first, so we could
|
||||
// avoid other writers write to the region and switch the memtable safely.
|
||||
// Another potential benefit is that the write lock also protect against concurrent
|
||||
// alter request to the region.
|
||||
let _inner = self.inner.lock().await;
|
||||
|
||||
unimplemented!()
|
||||
let version_control = alter_ctx.version_control();
|
||||
|
||||
let old_metadata = version_control.metadata();
|
||||
old_metadata
|
||||
.validate_alter(&request)
|
||||
.context(error::InvalidAlterRequestSnafu)?;
|
||||
|
||||
// The write lock protects us against other alter request, so we could build the new
|
||||
// metadata struct outside of the version mutex.
|
||||
let new_metadata = old_metadata
|
||||
.alter(&request)
|
||||
.context(error::AlterMetadataSnafu)?;
|
||||
|
||||
let raw = RawRegionMetadata::from(&new_metadata);
|
||||
let mut action_list =
|
||||
RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
|
||||
metadata: raw,
|
||||
}));
|
||||
let new_metadata = Arc::new(new_metadata);
|
||||
|
||||
// Acquire the version lock before altering the metadata.
|
||||
let _lock = self.version_mutex.lock().await;
|
||||
|
||||
// Persist the meta action.
|
||||
let prev_version = version_control.current_manifest_version();
|
||||
action_list.set_prev_version(prev_version);
|
||||
let manifest_version = alter_ctx.manifest.update(action_list).await?;
|
||||
|
||||
// Now we could switch memtables and apply the new metadata to the version.
|
||||
version_control.freeze_mutable_and_apply_metadata(new_metadata, manifest_version);
|
||||
|
||||
self.persist_manifest_version(alter_ctx.wal, version_control, manifest_version)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Allocate a sequence and persist the manifest version using that sequence to the wal.
|
||||
///
|
||||
/// This method should be protected by the `version_mutex`.
|
||||
async fn persist_manifest_version<S: LogStore>(
|
||||
&self,
|
||||
wal: &Wal<S>,
|
||||
seq: SequenceNumber,
|
||||
edit: &VersionEdit,
|
||||
version_control: &VersionControlRef,
|
||||
manifest_version: ManifestVersion,
|
||||
) -> Result<()> {
|
||||
let header = WalHeader::with_last_manifest_version(edit.manifest_version);
|
||||
let next_sequence = version_control.committed_sequence() + 1;
|
||||
|
||||
wal.write_to_wal(seq, header, Payload::None).await?;
|
||||
let header = WalHeader::with_last_manifest_version(manifest_version);
|
||||
wal.write_to_wal(next_sequence, header, Payload::None)
|
||||
.await?;
|
||||
|
||||
version_control.set_committed_sequence(next_sequence);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -158,6 +222,13 @@ pub struct AlterContext<'a, S: LogStore> {
|
||||
pub manifest: &'a RegionManifest,
|
||||
}
|
||||
|
||||
impl<'a, S: LogStore> AlterContext<'a, S> {
|
||||
#[inline]
|
||||
fn version_control(&self) -> &VersionControlRef {
|
||||
&self.shared.version_control
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct WriterInner {
|
||||
memtable_builder: MemtableBuilderRef,
|
||||
|
||||
@@ -239,6 +239,24 @@ impl StoreSchema {
|
||||
Ok(Batch::new(columns))
|
||||
}
|
||||
|
||||
pub(crate) fn contains_column(&self, name: &str) -> bool {
|
||||
self.schema.column_schema_by_name(name).is_some()
|
||||
}
|
||||
|
||||
pub(crate) fn is_key_column(&self, name: &str) -> bool {
|
||||
self.schema
|
||||
.column_index_by_name(name)
|
||||
.map(|idx| idx < self.row_key_end)
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
pub(crate) fn is_user_column(&self, name: &str) -> bool {
|
||||
self.schema
|
||||
.column_index_by_name(name)
|
||||
.map(|idx| idx < self.user_column_end)
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
fn from_columns_metadata(columns: &ColumnsMetadata, version: u32) -> Result<StoreSchema> {
|
||||
let column_schemas: Vec<_> = columns
|
||||
.iter_all_columns()
|
||||
|
||||
@@ -102,11 +102,32 @@ impl VersionControl {
|
||||
version_to_update.commit();
|
||||
}
|
||||
|
||||
/// Apply [VersionEdit] to the version.
|
||||
pub fn apply_edit(&self, edit: VersionEdit) {
|
||||
let mut version_to_update = self.version.lock();
|
||||
version_to_update.apply_edit(edit);
|
||||
version_to_update.commit();
|
||||
}
|
||||
|
||||
/// Freeze all mutable memtables and then apply the new metadata to the version.
|
||||
pub fn freeze_mutable_and_apply_metadata(
|
||||
&self,
|
||||
metadata: RegionMetadataRef,
|
||||
manifest_version: ManifestVersion,
|
||||
) {
|
||||
let mut version_to_update = self.version.lock();
|
||||
|
||||
let memtable_version = version_to_update.memtables();
|
||||
// When applying metadata, mutable memtable set might be empty and there is no
|
||||
// need to freeze it.
|
||||
if !memtable_version.mutable_memtables().is_empty() {
|
||||
let freezed = memtable_version.freeze_mutable();
|
||||
version_to_update.memtables = Arc::new(freezed);
|
||||
}
|
||||
|
||||
version_to_update.apply_metadata(metadata, manifest_version);
|
||||
version_to_update.commit();
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -227,6 +248,29 @@ impl Version {
|
||||
self.ssts = Arc::new(merged_ssts);
|
||||
}
|
||||
|
||||
/// Updates metadata of the version.
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if `metadata.version() <= self.metadata.version()`.
|
||||
pub fn apply_metadata(
|
||||
&mut self,
|
||||
metadata: RegionMetadataRef,
|
||||
manifest_version: ManifestVersion,
|
||||
) {
|
||||
assert!(
|
||||
metadata.version() > self.metadata.version(),
|
||||
"Updating metadata from version {} to {} is not allowed",
|
||||
self.metadata.version(),
|
||||
metadata.version()
|
||||
);
|
||||
|
||||
if self.manifest_version < manifest_version {
|
||||
self.manifest_version = manifest_version;
|
||||
}
|
||||
|
||||
self.metadata = metadata;
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn manifest_version(&self) -> ManifestVersion {
|
||||
self.manifest_version
|
||||
|
||||
Reference in New Issue
Block a user