feat: Change signature of the Region::alter method (#287)

* feat: Change signature of the Region::alter method

* refactor: Add builders for ColumnsMetadata and ColumnFamiliesMetadata

* feat: Support altering the region metadata

Altering the region metadata is done in a copy-on-write fashion:
1. Convert the `RegionMetadata` into `RegionDescriptor` which is more
   convenient to mutate
2. Apply the `AlterOperation` to the `RegionDescriptor`. This would
   mutate the descriptor in-place
3. Create a `RegionMetadataBuilder` from the descriptor, bump the
   version and then build the new metadata

* feat: Implement altering table using the new Region::alter api

* refactor: Replace WAL name with region id

A region id is cheaper to clone than a name

* chore: Remove pub(crate) of build_xxxx in engine mod

* style: fix clippy

* test: Add tests for AlterOperation and RegionMetadata::alter

* chore: ColumnsMetadataBuilder methods return &mut Self
This commit is contained in:
evenyag
2022-09-28 13:56:25 +08:00
committed by GitHub
parent 25078e821b
commit ed89cc3e21
13 changed files with 644 additions and 198 deletions

View File

@@ -8,7 +8,7 @@ use datatypes::arrow::error::ArrowError;
use serde_json::error::Error as JsonError;
use store_api::manifest::action::ProtocolVersion;
use store_api::manifest::ManifestVersion;
use store_api::storage::SequenceNumber;
use store_api::storage::{RegionId, SequenceNumber};
use crate::metadata::Error as MetadataError;
@@ -102,9 +102,13 @@ pub enum Error {
backtrace: Backtrace,
},
#[snafu(display("Failed to write WAL, WAL name: {}, source: {}", name, source))]
#[snafu(display(
"Failed to write WAL, WAL region_id: {}, source: {}",
region_id,
source
))]
WriteWal {
name: String,
region_id: RegionId,
#[snafu(backtrace)]
source: BoxedError,
},
@@ -187,16 +191,16 @@ pub enum Error {
backtrace: Backtrace,
},
#[snafu(display("Failed to read WAL, name: {}, source: {}", name, source))]
#[snafu(display("Failed to read WAL, region_id: {}, source: {}", region_id, source))]
ReadWal {
name: String,
region_id: RegionId,
#[snafu(backtrace)]
source: BoxedError,
},
#[snafu(display("WAL data corrupted, name: {}, message: {}", name, message))]
#[snafu(display("WAL data corrupted, region_id: {}, message: {}", region_id, message))]
WalDataCorrupted {
name: String,
region_id: RegionId,
message: String,
backtrace: Backtrace,
},

View File

@@ -7,8 +7,10 @@ use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt};
use store_api::storage::{
consts::{self, ReservedColumnId},
ColumnDescriptor, ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyId, ColumnId,
ColumnSchema, RegionDescriptor, RegionId, RegionMeta, RowKeyDescriptor, Schema, SchemaRef,
AlterRequest, ColumnDescriptor, ColumnDescriptorBuilder, ColumnFamilyDescriptor,
ColumnFamilyDescriptorBuilder, ColumnFamilyId, ColumnId, RegionDescriptor,
RegionDescriptorBuilder, RegionId, RegionMeta, RowKeyDescriptor, RowKeyDescriptorBuilder,
Schema, SchemaRef,
};
use crate::manifest::action::{RawColumnFamiliesMetadata, RawColumnsMetadata, RawRegionMetadata};
@@ -40,6 +42,13 @@ pub enum Error {
#[snafu(display("Missing timestamp key column"))]
MissingTimestamp { backtrace: Backtrace },
#[snafu(display("Expect altering metadata with version {}, given {}", expect, given))]
InvalidVersion {
expect: VersionNumber,
given: VersionNumber,
backtrace: Backtrace,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -62,6 +71,10 @@ impl RegionMeta for RegionMetaImpl {
fn schema(&self) -> &SchemaRef {
self.metadata.user_schema()
}
fn version(&self) -> u32 {
self.metadata.version
}
}
pub type VersionNumber = u32;
@@ -108,6 +121,52 @@ impl RegionMetadata {
pub fn version(&self) -> u32 {
self.schema.version()
}
/// Returns a new [RegionMetadata] after alteration, leave `self` unchanged.
pub fn alter(&self, req: &AlterRequest) -> Result<RegionMetadata> {
ensure!(
req.version == self.version,
InvalidVersionSnafu {
expect: req.version,
given: self.version,
}
);
let mut desc = self.to_descriptor();
// Apply the alter operation to the descriptor.
req.operation.apply(&mut desc);
RegionMetadataBuilder::try_from(desc)?
.version(self.version + 1) // Bump the metadata version.
.build()
}
fn to_descriptor(&self) -> RegionDescriptor {
let row_key = self.columns.to_row_key_descriptor();
let mut builder = RegionDescriptorBuilder::default()
.id(self.id)
.name(&self.name)
.row_key(row_key);
for (cf_id, cf) in &self.column_families.id_to_cfs {
let mut cf_builder = ColumnFamilyDescriptorBuilder::default()
.cf_id(*cf_id)
.name(&cf.name);
for column in &self.columns.columns[cf.column_index_start..cf.column_index_end] {
cf_builder = cf_builder.push_column(column.desc.clone());
}
// It should always be able to build the descriptor back.
let desc = cf_builder.build().unwrap();
if *cf_id == consts::DEFAULT_CF_ID {
builder = builder.default_cf(desc);
} else {
builder = builder.push_extra_column_family(desc);
}
}
// We could ensure all fields are set here.
builder.build().unwrap()
}
}
pub type RegionMetadataRef = Arc<RegionMetadata>;
@@ -238,6 +297,20 @@ impl ColumnsMetadata {
pub fn column_metadata(&self, idx: usize) -> &ColumnMetadata {
&self.columns[idx]
}
fn to_row_key_descriptor(&self) -> RowKeyDescriptor {
let mut builder =
RowKeyDescriptorBuilder::default().enable_version_column(self.enable_version_column);
for (idx, column) in self.iter_row_key_columns().enumerate() {
// Not a timestamp column.
if idx != self.timestamp_key_index {
builder = builder.push_column(column.desc.clone());
}
}
builder = builder.timestamp(self.column_metadata(self.timestamp_key_index).desc.clone());
// Since the metadata is built from descriptor, so it should always be able to build the descriptor back.
builder.build().unwrap()
}
}
pub type ColumnsMetadataRef = Arc<ColumnsMetadata>;
@@ -315,12 +388,10 @@ pub struct ColumnFamilyMetadata {
pub column_index_end: usize,
}
impl TryFrom<RegionDescriptor> for RegionMetadata {
impl TryFrom<RegionDescriptor> for RegionMetadataBuilder {
type Error = Error;
fn try_from(desc: RegionDescriptor) -> Result<RegionMetadata> {
// Doesn't set version explicitly here, because this is a new region meta
// created from descriptor, using initial version is reasonable.
fn try_from(desc: RegionDescriptor) -> Result<RegionMetadataBuilder> {
let mut builder = RegionMetadataBuilder::new()
.name(desc.name)
.id(desc.id)
@@ -330,17 +401,25 @@ impl TryFrom<RegionDescriptor> for RegionMetadata {
builder = builder.add_column_family(cf)?;
}
Ok(builder)
}
}
impl TryFrom<RegionDescriptor> for RegionMetadata {
type Error = Error;
fn try_from(desc: RegionDescriptor) -> Result<RegionMetadata> {
// Doesn't set version explicitly here, because this is a new region meta
// created from descriptor, using initial version is reasonable.
let builder = RegionMetadataBuilder::try_from(desc)?;
builder.build()
}
}
// TODO(yingwen): Add a builder to build ColumnMetadata and refactor this builder.
#[derive(Default)]
struct RegionMetadataBuilder {
id: RegionId,
name: String,
struct ColumnsMetadataBuilder {
columns: Vec<ColumnMetadata>,
column_schemas: Vec<ColumnSchema>,
name_to_col_index: HashMap<String, usize>,
/// Column id set, used to validate column id uniqueness.
column_ids: HashSet<ColumnId>,
@@ -349,27 +428,10 @@ struct RegionMetadataBuilder {
row_key_end: usize,
timestamp_key_index: Option<usize>,
enable_version_column: bool,
id_to_cfs: HashMap<ColumnFamilyId, ColumnFamilyMetadata>,
cf_names: HashSet<String>,
}
impl RegionMetadataBuilder {
fn new() -> RegionMetadataBuilder {
RegionMetadataBuilder::default()
}
fn name(mut self, name: impl Into<String>) -> Self {
self.name = name.into();
self
}
fn id(mut self, id: RegionId) -> Self {
self.id = id;
self
}
fn row_key(mut self, key: RowKeyDescriptor) -> Result<Self> {
impl ColumnsMetadataBuilder {
fn row_key(&mut self, key: RowKeyDescriptor) -> Result<&mut Self> {
for col in key.columns {
self.push_row_key_column(col)?;
}
@@ -390,7 +452,77 @@ impl RegionMetadataBuilder {
Ok(self)
}
fn add_column_family(mut self, cf: ColumnFamilyDescriptor) -> Result<Self> {
fn push_row_key_column(&mut self, desc: ColumnDescriptor) -> Result<&mut Self> {
self.push_value_column(consts::KEY_CF_ID, desc)
}
fn push_value_column(
&mut self,
cf_id: ColumnFamilyId,
desc: ColumnDescriptor,
) -> Result<&mut Self> {
ensure!(
!is_internal_value_column(&desc.name),
ReservedColumnSnafu { name: &desc.name }
);
self.push_new_column(cf_id, desc)
}
fn push_new_column(
&mut self,
cf_id: ColumnFamilyId,
desc: ColumnDescriptor,
) -> Result<&mut Self> {
ensure!(
!self.name_to_col_index.contains_key(&desc.name),
ColNameExistsSnafu { name: &desc.name }
);
ensure!(
!self.column_ids.contains(&desc.id),
ColIdExistsSnafu { id: desc.id }
);
let column_name = desc.name.clone();
let column_id = desc.id;
let meta = ColumnMetadata { cf_id, desc };
let column_index = self.columns.len();
self.columns.push(meta);
self.name_to_col_index.insert(column_name, column_index);
self.column_ids.insert(column_id);
Ok(self)
}
fn build(mut self) -> Result<ColumnsMetadata> {
let timestamp_key_index = self.timestamp_key_index.context(MissingTimestampSnafu)?;
let user_column_end = self.columns.len();
// Setup internal columns.
for internal_desc in internal_column_descs() {
self.push_new_column(consts::DEFAULT_CF_ID, internal_desc)?;
}
Ok(ColumnsMetadata {
columns: self.columns,
name_to_col_index: self.name_to_col_index,
row_key_end: self.row_key_end,
timestamp_key_index,
enable_version_column: self.enable_version_column,
user_column_end,
})
}
}
#[derive(Default)]
struct ColumnFamiliesMetadataBuilder {
id_to_cfs: HashMap<ColumnFamilyId, ColumnFamilyMetadata>,
cf_names: HashSet<String>,
}
impl ColumnFamiliesMetadataBuilder {
fn add_column_family(&mut self, cf: ColumnFamilyMetadata) -> Result<&mut Self> {
ensure!(
!self.id_to_cfs.contains_key(&cf.cf_id),
CfIdExistsSnafu { id: cf.cf_id }
@@ -401,12 +533,68 @@ impl RegionMetadataBuilder {
CfNameExistsSnafu { name: &cf.name }
);
let column_index_start = self.columns.len();
let column_index_end = column_index_start + cf.columns.len();
for col in cf.columns {
self.push_value_column(cf.cf_id, col)?;
}
self.cf_names.insert(cf.name.clone());
self.id_to_cfs.insert(cf.cf_id, cf);
Ok(self)
}
fn build(self) -> ColumnFamiliesMetadata {
ColumnFamiliesMetadata {
id_to_cfs: self.id_to_cfs,
}
}
}
struct RegionMetadataBuilder {
id: RegionId,
name: String,
columns_meta_builder: ColumnsMetadataBuilder,
cfs_meta_builder: ColumnFamiliesMetadataBuilder,
version: VersionNumber,
}
impl Default for RegionMetadataBuilder {
fn default() -> RegionMetadataBuilder {
RegionMetadataBuilder::new()
}
}
impl RegionMetadataBuilder {
fn new() -> RegionMetadataBuilder {
RegionMetadataBuilder {
id: 0,
name: String::new(),
columns_meta_builder: ColumnsMetadataBuilder::default(),
cfs_meta_builder: ColumnFamiliesMetadataBuilder::default(),
version: Schema::INITIAL_VERSION,
}
}
fn name(mut self, name: impl Into<String>) -> Self {
self.name = name.into();
self
}
fn id(mut self, id: RegionId) -> Self {
self.id = id;
self
}
fn version(mut self, version: VersionNumber) -> Self {
self.version = version;
self
}
fn row_key(mut self, key: RowKeyDescriptor) -> Result<Self> {
self.columns_meta_builder.row_key(key)?;
Ok(self)
}
fn add_column_family(mut self, cf: ColumnFamilyDescriptor) -> Result<Self> {
let column_index_start = self.columns_meta_builder.columns.len();
let column_index_end = column_index_start + cf.columns.len();
let cf_meta = ColumnFamilyMetadata {
name: cf.name.clone(),
cf_id: cf.cf_id,
@@ -414,85 +602,29 @@ impl RegionMetadataBuilder {
column_index_end,
};
self.id_to_cfs.insert(cf.cf_id, cf_meta);
self.cf_names.insert(cf.name);
self.cfs_meta_builder.add_column_family(cf_meta)?;
for col in cf.columns {
self.columns_meta_builder.push_value_column(cf.cf_id, col)?;
}
Ok(self)
}
fn build(mut self) -> Result<RegionMetadata> {
let timestamp_key_index = self.timestamp_key_index.context(MissingTimestampSnafu)?;
let user_column_end = self.columns.len();
// Setup internal columns.
for internal_desc in internal_column_descs() {
self.push_new_column(consts::DEFAULT_CF_ID, internal_desc)?;
}
let columns = Arc::new(ColumnsMetadata {
columns: self.columns,
name_to_col_index: self.name_to_col_index,
row_key_end: self.row_key_end,
timestamp_key_index,
enable_version_column: self.enable_version_column,
user_column_end,
});
let schema = Arc::new(
RegionSchema::new(columns.clone(), Schema::INITIAL_VERSION)
.context(InvalidSchemaSnafu)?,
);
fn build(self) -> Result<RegionMetadata> {
let columns = Arc::new(self.columns_meta_builder.build()?);
let schema =
Arc::new(RegionSchema::new(columns.clone(), self.version).context(InvalidSchemaSnafu)?);
Ok(RegionMetadata {
id: self.id,
name: self.name,
schema,
columns,
column_families: ColumnFamiliesMetadata {
id_to_cfs: self.id_to_cfs,
},
version: 0,
column_families: self.cfs_meta_builder.build(),
version: self.version,
})
}
// Helper methods:
fn push_row_key_column(&mut self, desc: ColumnDescriptor) -> Result<()> {
self.push_value_column(consts::KEY_CF_ID, desc)
}
fn push_value_column(&mut self, cf_id: ColumnFamilyId, desc: ColumnDescriptor) -> Result<()> {
ensure!(
!is_internal_value_column(&desc.name),
ReservedColumnSnafu { name: &desc.name }
);
self.push_new_column(cf_id, desc)
}
fn push_new_column(&mut self, cf_id: ColumnFamilyId, desc: ColumnDescriptor) -> Result<()> {
ensure!(
!self.name_to_col_index.contains_key(&desc.name),
ColNameExistsSnafu { name: &desc.name }
);
ensure!(
!self.column_ids.contains(&desc.id),
ColIdExistsSnafu { id: desc.id }
);
let column_schema = ColumnSchema::from(&desc);
let column_name = desc.name.clone();
let column_id = desc.id;
let meta = ColumnMetadata { cf_id, desc };
let column_index = self.columns.len();
self.columns.push(meta);
self.column_schemas.push(column_schema);
self.name_to_col_index.insert(column_name, column_index);
self.column_ids.insert(column_id);
Ok(())
}
}
fn version_column_desc() -> ColumnDescriptor {
@@ -541,7 +673,8 @@ fn is_internal_value_column(column_name: &str) -> bool {
mod tests {
use datatypes::type_id::LogicalTypeId;
use store_api::storage::{
ColumnDescriptorBuilder, ColumnFamilyDescriptorBuilder, RowKeyDescriptorBuilder,
AddColumn, AlterOperation, ColumnDescriptorBuilder, ColumnFamilyDescriptorBuilder,
RowKeyDescriptorBuilder,
};
use super::*;
@@ -795,4 +928,96 @@ mod tests {
let converted = RegionMetadata::try_from(raw).unwrap();
assert_eq!(metadata, converted);
}
#[test]
fn test_alter_metadata_add_columns() {
let region_name = "region-0";
let builder = RegionDescBuilder::new(region_name)
.enable_version_column(false)
.push_key_column(("k1", LogicalTypeId::Int32, false))
.push_value_column(("v1", LogicalTypeId::Float32, true));
let last_column_id = builder.last_column_id();
let metadata: RegionMetadata = builder.build().try_into().unwrap();
let req = AlterRequest {
operation: AlterOperation::AddColumns {
columns: vec![
AddColumn {
desc: ColumnDescriptorBuilder::new(
last_column_id + 1,
"k2",
ConcreteDataType::int32_datatype(),
)
.is_nullable(false)
.build()
.unwrap(),
is_key: true,
},
AddColumn {
desc: ColumnDescriptorBuilder::new(
last_column_id + 2,
"v2",
ConcreteDataType::float32_datatype(),
)
.build()
.unwrap(),
is_key: false,
},
],
},
version: 0,
};
let metadata = metadata.alter(&req).unwrap();
let builder: RegionMetadataBuilder = RegionDescBuilder::new(region_name)
.enable_version_column(false)
.push_key_column(("k1", LogicalTypeId::Int32, false))
.push_value_column(("v1", LogicalTypeId::Float32, true))
.push_key_column(("k2", LogicalTypeId::Int32, false))
.push_value_column(("v2", LogicalTypeId::Float32, true))
.build()
.try_into()
.unwrap();
let expect = builder.version(1).build().unwrap();
assert_eq!(expect, metadata);
}
#[test]
fn test_alter_metadata_drop_columns() {
let region_name = "region-0";
let metadata: RegionMetadata = RegionDescBuilder::new(region_name)
.enable_version_column(false)
.push_key_column(("k1", LogicalTypeId::Int32, false))
.push_key_column(("k2", LogicalTypeId::Int32, false))
.push_value_column(("v1", LogicalTypeId::Float32, true))
.push_value_column(("v2", LogicalTypeId::Float32, true))
.build()
.try_into()
.unwrap();
let req = AlterRequest {
operation: AlterOperation::DropColumns {
names: vec![
String::from("k1"), // k1 would be ignored.
String::from("v1"),
],
},
version: 0,
};
let metadata = metadata.alter(&req).unwrap();
let builder = RegionDescBuilder::new(region_name)
.enable_version_column(false)
.push_key_column(("k1", LogicalTypeId::Int32, false))
.push_key_column(("k2", LogicalTypeId::Int32, false));
let last_column_id = builder.last_column_id() + 1;
let builder: RegionMetadataBuilder = builder
.set_last_column_id(last_column_id) // This id is reserved for v1
.push_value_column(("v2", LogicalTypeId::Float32, true))
.build()
.try_into()
.unwrap();
let expect = builder.version(1).build().unwrap();
assert_eq!(expect, metadata);
}
}

View File

@@ -12,7 +12,8 @@ use store_api::manifest::{
self, action::ProtocolAction, Manifest, ManifestVersion, MetaActionIterator,
};
use store_api::storage::{
OpenOptions, ReadContext, Region, RegionId, RegionMeta, WriteContext, WriteResponse,
AlterRequest, OpenOptions, ReadContext, Region, RegionId, RegionMeta, WriteContext,
WriteResponse,
};
use crate::error::{self, Error, Result};
@@ -23,7 +24,7 @@ use crate::manifest::{
};
use crate::memtable::MemtableBuilderRef;
use crate::metadata::{RegionMetaImpl, RegionMetadata};
pub use crate::region::writer::{RegionWriter, RegionWriterRef, WriterContext};
pub use crate::region::writer::{AlterContext, RegionWriter, RegionWriterRef, WriterContext};
use crate::snapshot::SnapshotImpl;
use crate::sst::AccessLayerRef;
use crate::version::VersionEdit;
@@ -75,6 +76,10 @@ impl<S: LogStore> Region for RegionImpl<S> {
fn write_request(&self) -> Self::WriteRequest {
WriteBatch::new(self.in_memory_metadata().schema().clone())
}
async fn alter(&self, request: AlterRequest) -> Result<()> {
self.inner.alter(request).await
}
}
/// Storage related config for region.
@@ -343,6 +348,7 @@ impl<S: LogStore> RegionInner<S> {
}
async fn write(&self, ctx: &WriteContext, request: WriteBatch) -> Result<WriteResponse> {
// FIXME(yingwen): [alter] The schema may be outdated.
let metadata = self.in_memory_metadata();
let schema = metadata.schema();
// Only compare column schemas.
@@ -365,4 +371,16 @@ impl<S: LogStore> RegionInner<S> {
// Now altering schema is not allowed, so it is safe to validate schema outside of the lock.
self.writer.write(ctx, request, writer_ctx).await
}
async fn alter(&self, request: AlterRequest) -> Result<()> {
// TODO(yingwen): [alter] Log the request.
let alter_ctx = AlterContext {
shared: &self.shared,
wal: &self.wal,
manifest: &self.manifest,
};
self.writer.alter(alter_ctx, request).await
}
}

View File

@@ -5,7 +5,7 @@ use common_time::RangeMillis;
use futures::TryStreamExt;
use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::storage::{SequenceNumber, WriteContext, WriteRequest, WriteResponse};
use store_api::storage::{AlterRequest, SequenceNumber, WriteContext, WriteRequest, WriteResponse};
use tokio::sync::Mutex;
use crate::background::JobHandle;
@@ -92,6 +92,22 @@ impl RegionWriter {
inner.replay(&self.version_mutex, writer_ctx).await
}
/// Alter schema of the region.
pub async fn alter<S: LogStore>(
&self,
_alter_ctx: AlterContext<'_, S>,
_request: AlterRequest,
) -> Result<()> {
// TODO(yingwen): [alter] implements alter:
// 1. acquire version lock
// 2. validate request
// 3. build schema based on new request
// 4. persist it into the region manifest
// 5. update version in VersionControl
unimplemented!()
}
async fn persist_manifest_version<S: LogStore>(
&self,
wal: &Wal<S>,
@@ -136,6 +152,12 @@ impl<'a, S: LogStore> WriterContext<'a, S> {
}
}
pub struct AlterContext<'a, S: LogStore> {
pub shared: &'a SharedDataRef,
pub wal: &'a Wal<S>,
pub manifest: &'a RegionManifest,
}
#[derive(Debug)]
struct WriterInner {
memtable_builder: MemtableBuilderRef,

View File

@@ -20,7 +20,7 @@ impl RegionDescBuilder {
pub fn new<T: Into<String>>(name: T) -> Self {
let key_builder = RowKeyDescriptorBuilder::new(
ColumnDescriptorBuilder::new(
2,
1,
test_util::TIMESTAMP_NAME,
ConcreteDataType::timestamp_millis_datatype(),
)
@@ -32,7 +32,7 @@ impl RegionDescBuilder {
Self {
id: 0,
name: name.into(),
last_column_id: 2,
last_column_id: 1,
key_builder,
default_cf_builder: ColumnFamilyDescriptorBuilder::default(),
}
@@ -43,20 +43,9 @@ impl RegionDescBuilder {
self
}
// This will reset the row key builder, so should be called before `push_key_column()`
// and `enable_version_column()`, or just call after `new()`.
//
// NOTE(ning): it's now possible to change this function to:
//
// ```
// self.key_builder.timestamp(self.new_column(column_def))
// ```
// to resolve the constraint above
pub fn timestamp(mut self, column_def: ColumnDef) -> Self {
let builder = RowKeyDescriptorBuilder::new(self.new_column(column_def));
self.key_builder = builder;
let column = self.new_column(column_def);
self.key_builder = self.key_builder.timestamp(column);
self
}
@@ -77,6 +66,11 @@ impl RegionDescBuilder {
self
}
pub fn set_last_column_id(mut self, column_id: ColumnId) -> Self {
self.last_column_id = column_id;
self
}
pub fn build(self) -> RegionDescriptor {
RegionDescriptor {
id: self.id,
@@ -87,6 +81,10 @@ impl RegionDescBuilder {
}
}
pub fn last_column_id(&self) -> ColumnId {
self.last_column_id
}
fn alloc_column_id(&mut self) -> ColumnId {
self.last_column_id += 1;
self.last_column_id

View File

@@ -26,7 +26,7 @@ use crate::{
#[derive(Debug)]
pub struct Wal<S: LogStore> {
name: String,
region_id: RegionId,
namespace: S::Namespace,
store: Arc<S>,
}
@@ -35,11 +35,11 @@ pub type WriteBatchStream<'a> = Pin<
Box<dyn Stream<Item = Result<(SequenceNumber, WalHeader, Option<WriteBatch>)>> + Send + 'a>,
>;
// wal should be cheap to clone
// Wal should be cheap to clone, so avoid holding things like String, Vec.
impl<S: LogStore> Clone for Wal<S> {
fn clone(&self) -> Self {
Self {
name: self.name.clone(),
region_id: self.region_id,
namespace: self.namespace.clone(),
store: self.store.clone(),
}
@@ -50,15 +50,15 @@ impl<S: LogStore> Wal<S> {
pub fn new(region_id: RegionId, store: Arc<S>) -> Self {
let namespace = store.namespace(region_id);
Self {
name: region_id.to_string(),
region_id,
namespace,
store,
}
}
#[inline]
pub fn name(&self) -> &str {
&self.name
pub fn region_id(&self) -> RegionId {
self.region_id
}
}
@@ -102,7 +102,9 @@ impl<S: LogStore> Wal<S> {
encoder
.encode(batch, &mut buf)
.map_err(BoxedError::new)
.context(error::WriteWalSnafu { name: self.name() })?;
.context(error::WriteWalSnafu {
region_id: self.region_id(),
})?;
} else if let Payload::WriteBatchProto(batch) = payload {
// entry
let encoder = WriteBatchProtobufEncoder {};
@@ -110,7 +112,9 @@ impl<S: LogStore> Wal<S> {
encoder
.encode(batch, &mut buf)
.map_err(BoxedError::new)
.context(error::WriteWalSnafu { name: self.name() })?;
.context(error::WriteWalSnafu {
region_id: self.region_id(),
})?;
}
// write bytes to wal
@@ -123,9 +127,12 @@ impl<S: LogStore> Wal<S> {
.read(&self.namespace, start_seq)
.await
.map_err(BoxedError::new)
.context(error::ReadWalSnafu { name: self.name() })?
.context(error::ReadWalSnafu {
region_id: self.region_id(),
})?
// Handle the error when reading from the stream.
.map_err(|e| Error::ReadWal {
name: self.name().to_string(),
region_id: self.region_id(),
source: BoxedError::new(e),
})
.and_then(|entries| async {
@@ -146,7 +153,9 @@ impl<S: LogStore> Wal<S> {
.append(e)
.await
.map_err(BoxedError::new)
.context(error::WriteWalSnafu { name: self.name() })?;
.context(error::WriteWalSnafu {
region_id: self.region_id(),
})?;
Ok((res.entry_id(), res.offset()))
}
@@ -164,7 +173,7 @@ impl<S: LogStore> Wal<S> {
ensure!(
data_pos <= input.len(),
error::WalDataCorruptedSnafu {
name: self.name(),
region_id: self.region_id(),
message: format!(
"Not enough input buffer, expected data position={}, actual buffer length={}",
data_pos,
@@ -181,7 +190,9 @@ impl<S: LogStore> Wal<S> {
let write_batch = decoder
.decode(&input[data_pos..])
.map_err(BoxedError::new)
.context(error::ReadWalSnafu { name: self.name() })?;
.context(error::ReadWalSnafu {
region_id: self.region_id(),
})?;
Ok((seq_num, header, Some(write_batch)))
}
@@ -191,12 +202,14 @@ impl<S: LogStore> Wal<S> {
let write_batch = decoder
.decode(&input[data_pos..])
.map_err(BoxedError::new)
.context(error::ReadWalSnafu { name: self.name() })?;
.context(error::ReadWalSnafu {
region_id: self.region_id(),
})?;
Ok((seq_num, header, Some(write_batch)))
}
_ => error::WalDataCorruptedSnafu {
name: self.name(),
region_id: self.region_id(),
message: format!("invalid payload type={}", header.payload_type),
}
.fail(),

View File

@@ -21,7 +21,9 @@ pub use self::descriptors::*;
pub use self::engine::{CreateOptions, EngineContext, OpenOptions, StorageEngine};
pub use self::metadata::RegionMeta;
pub use self::region::{Region, WriteContext};
pub use self::requests::{GetRequest, PutOperation, ScanRequest, WriteRequest};
pub use self::requests::{
AddColumn, AlterOperation, AlterRequest, GetRequest, PutOperation, ScanRequest, WriteRequest,
};
pub use self::responses::{GetResponse, ScanResponse, WriteResponse};
pub use self::snapshot::{ReadContext, Snapshot};
pub use self::types::{OpType, SequenceNumber};

View File

@@ -4,4 +4,7 @@ use crate::storage::SchemaRef;
pub trait RegionMeta: Send + Sync {
/// Returns the schema of the region.
fn schema(&self) -> &SchemaRef;
/// Returns the version of the region metadata.
fn version(&self) -> u32;
}

View File

@@ -23,10 +23,10 @@ use common_error::ext::ErrorExt;
use crate::storage::engine::OpenOptions;
use crate::storage::metadata::RegionMeta;
use crate::storage::requests::WriteRequest;
use crate::storage::requests::{AlterRequest, WriteRequest};
use crate::storage::responses::WriteResponse;
use crate::storage::snapshot::{ReadContext, Snapshot};
use crate::storage::{RegionDescriptor, RegionId};
use crate::storage::RegionId;
/// Chunks of rows in storage engine.
#[async_trait]
@@ -57,9 +57,7 @@ pub trait Region: Send + Sync + Clone + std::fmt::Debug + 'static {
/// Create write request
fn write_request(&self) -> Self::WriteRequest;
fn alter(&self, _descriptor: RegionDescriptor) -> Result<(), Self::Error> {
unimplemented!()
}
async fn alter(&self, request: AlterRequest) -> Result<(), Self::Error>;
}
/// Context for write operations.

View File

@@ -1,3 +1,4 @@
use std::collections::HashSet;
use std::time::Duration;
use common_error::ext::ErrorExt;
@@ -5,7 +6,7 @@ use common_query::logical_plan::Expr;
use common_time::RangeMillis;
use datatypes::vectors::VectorRef;
use crate::storage::SequenceNumber;
use crate::storage::{ColumnDescriptor, RegionDescriptor, SequenceNumber};
/// Write request holds a collection of updates to apply to a region.
pub trait WriteRequest: Send {
@@ -50,3 +51,157 @@ pub struct ScanRequest {
#[derive(Debug)]
pub struct GetRequest {}
/// Operation to add a column.
#[derive(Debug)]
pub struct AddColumn {
/// Descriptor of the column to add.
pub desc: ColumnDescriptor,
/// Is the column a key column.
pub is_key: bool,
}
/// Operation to alter a region.
#[derive(Debug)]
pub enum AlterOperation {
/// Add columns to the region.
AddColumns {
/// Columns to add.
columns: Vec<AddColumn>,
},
/// Drop columns from the region, only value columns are allowed to drop.
DropColumns {
/// Name of columns to drop.
names: Vec<String>,
},
}
impl AlterOperation {
/// Apply the operation to the [RegionDescriptor].
pub fn apply(&self, descriptor: &mut RegionDescriptor) {
match self {
AlterOperation::AddColumns { columns } => {
Self::apply_add(columns, descriptor);
}
AlterOperation::DropColumns { names } => {
Self::apply_drop(names, descriptor);
}
}
}
/// Add `columns` to the [RegionDescriptor].
///
/// Value columns would be added to the default column family.
fn apply_add(columns: &[AddColumn], descriptor: &mut RegionDescriptor) {
for col in columns {
if col.is_key {
descriptor.row_key.columns.push(col.desc.clone());
} else {
descriptor.default_cf.columns.push(col.desc.clone());
}
}
}
/// Drop columns from the [RegionDescriptor] by their `names`.
///
/// Only value columns would be removed, non-value columns in `names` would be ignored.
fn apply_drop(names: &[String], descriptor: &mut RegionDescriptor) {
let name_set: HashSet<_> = names.iter().collect();
// Remove columns in the default cf.
descriptor
.default_cf
.columns
.retain(|col| !name_set.contains(&col.name));
// Remove columns in other cfs.
for cf in &mut descriptor.extra_cfs {
cf.columns.retain(|col| !name_set.contains(&col.name));
}
}
}
/// Alter region request.
#[derive(Debug)]
pub struct AlterRequest {
/// Operation to do.
pub operation: AlterOperation,
/// The version of the schema before applying the alteration.
pub version: u32,
}
#[cfg(test)]
mod tests {
use datatypes::prelude::*;
use super::*;
use crate::storage::{
ColumnDescriptorBuilder, ColumnFamilyDescriptorBuilder, ColumnId, RegionDescriptorBuilder,
RowKeyDescriptorBuilder,
};
fn new_column_desc(id: ColumnId) -> ColumnDescriptor {
ColumnDescriptorBuilder::new(id, id.to_string(), ConcreteDataType::int64_datatype())
.is_nullable(false)
.build()
.unwrap()
}
fn new_region_descriptor() -> RegionDescriptor {
let row_key = RowKeyDescriptorBuilder::default()
.timestamp(new_column_desc(1))
.build()
.unwrap();
let default_cf = ColumnFamilyDescriptorBuilder::default()
.push_column(new_column_desc(2))
.build()
.unwrap();
RegionDescriptorBuilder::default()
.id(1)
.name("test")
.row_key(row_key)
.default_cf(default_cf)
.build()
.unwrap()
}
#[test]
fn test_alter_operation() {
let mut desc = new_region_descriptor();
let op = AlterOperation::AddColumns {
columns: vec![
AddColumn {
desc: new_column_desc(3),
is_key: true,
},
AddColumn {
desc: new_column_desc(4),
is_key: false,
},
],
};
op.apply(&mut desc);
assert_eq!(1, desc.row_key.columns.len());
assert_eq!("3", desc.row_key.columns[0].name);
assert_eq!(2, desc.default_cf.columns.len());
assert_eq!("2", desc.default_cf.columns[0].name);
assert_eq!("4", desc.default_cf.columns[1].name);
let op = AlterOperation::DropColumns {
names: vec![String::from("2")],
};
op.apply(&mut desc);
assert_eq!(1, desc.row_key.columns.len());
assert_eq!(1, desc.default_cf.columns.len());
assert_eq!("4", desc.default_cf.columns[0].name);
// Key columns are ignored.
let op = AlterOperation::DropColumns {
names: vec![String::from("1"), String::from("3")],
};
op.apply(&mut desc);
assert_eq!(1, desc.row_key.columns.len());
assert_eq!(1, desc.default_cf.columns.len());
}
}

View File

@@ -126,7 +126,7 @@ struct MitoEngineInner<S: StorageEngine> {
table_mutex: Mutex<()>,
}
pub(crate) fn build_row_key_desc(
fn build_row_key_desc(
mut column_id: ColumnId,
table_name: &str,
table_schema: &SchemaRef,
@@ -189,7 +189,7 @@ pub(crate) fn build_row_key_desc(
))
}
pub(crate) fn build_column_family(
fn build_column_family(
mut column_id: ColumnId,
table_name: &str,
table_schema: &SchemaRef,

View File

@@ -23,10 +23,9 @@ use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt};
use store_api::manifest::action::ProtocolAction;
use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator};
use store_api::storage::RegionDescriptorBuilder;
use store_api::storage::{
ChunkReader, PutOperation, ReadContext, Region, RegionMeta, ScanRequest, SchemaRef, Snapshot,
WriteContext, WriteRequest,
AddColumn, AlterOperation, AlterRequest, ChunkReader, ColumnDescriptorBuilder, PutOperation,
ReadContext, Region, RegionMeta, ScanRequest, SchemaRef, Snapshot, WriteContext, WriteRequest,
};
use table::error::{Error as TableError, MissingColumnSnafu, Result as TableResult};
use table::metadata::{FilterPushDownType, TableInfoRef, TableMetaBuilder};
@@ -37,7 +36,6 @@ use table::{
};
use tokio::sync::Mutex;
use crate::engine::{build_column_family, build_row_key_desc, INIT_COLUMN_ID};
use crate::error::{
self, ProjectedColumnNotFoundSnafu, Result, ScanTableManifestSnafu, SchemaBuildSnafu,
TableInfoNotFoundSnafu, UnsupportedDefaultConstraintSnafu, UpdateTableManifestSnafu,
@@ -184,31 +182,43 @@ impl<R: Region> Table for MitoTable<R> {
let table_info = self.table_info();
let table_name = &table_info.name;
let table_meta = &table_info.meta;
let table_schema = match &req.alter_kind {
let (alter_op, table_schema) = match &req.alter_kind {
AlterKind::AddColumn { new_column } => {
build_table_schema_with_new_column(table_name, &table_meta.schema, new_column)?
let desc = ColumnDescriptorBuilder::new(
table_meta.next_column_id,
&new_column.name,
new_column.data_type.clone(),
)
.is_nullable(new_column.is_nullable)
.default_constraint(new_column.default_constraint.clone())
.build()
.context(error::BuildColumnDescriptorSnafu {
table_name,
column_name: &new_column.name,
})?;
let alter_op = AlterOperation::AddColumns {
columns: vec![AddColumn {
desc,
// TODO(yingwen): [alter] AlterTableRequest should be able to add a key column.
is_key: false,
}],
};
// TODO(yingwen): [alter] Better way to alter the schema struct. In fact the column order
// in table schema could differ from the region schema, so we could just push this column
// to the back of the schema (as last column).
let table_schema =
build_table_schema_with_new_column(table_name, &table_meta.schema, new_column)?;
(alter_op, table_schema)
}
};
let primary_key_indices = &table_meta.primary_key_indices;
let (next_column_id, default_cf) = build_column_family(
INIT_COLUMN_ID,
table_name,
&table_schema,
primary_key_indices,
)?;
let (next_column_id, row_key) = build_row_key_desc(
next_column_id,
table_name,
&table_schema,
primary_key_indices,
)?;
let new_meta = TableMetaBuilder::default()
.schema(table_schema.clone())
.engine(&table_meta.engine)
.next_column_id(next_column_id)
.primary_key_indices(primary_key_indices.clone())
.next_column_id(table_meta.next_column_id + 1) // Bump next column id.
.primary_key_indices(table_meta.primary_key_indices.clone())
.build()
.context(error::BuildTableMetaSnafu { table_name })?;
@@ -216,25 +226,21 @@ impl<R: Region> Table for MitoTable<R> {
new_info.ident.version = table_info.ident.version + 1;
new_info.meta = new_meta;
// first alter region
// FIXME(yingwen): [alter] Alter the region, now this is a temporary implementation.
let region = self.region();
let region_descriptor = RegionDescriptorBuilder::default()
.id(region.id())
.name(region.name())
.row_key(row_key)
.default_cf(default_cf)
.build()
.context(error::BuildRegionDescriptorSnafu {
table_name,
region_name: region.name(),
})?;
let region_meta = region.in_memory_metadata();
let alter_req = AlterRequest {
operation: alter_op,
version: region_meta.version(),
};
logging::debug!(
"start altering region {} of table {}, with new region descriptor {:?}",
"start altering region {} of table {}, with request {:?}",
region.name(),
table_name,
region_descriptor
alter_req,
);
region.alter(region_descriptor).map_err(TableError::new)?;
region.alter(alter_req).await.map_err(TableError::new)?;
// then alter table info
logging::debug!(

View File

@@ -12,9 +12,9 @@ use datatypes::schema::{ColumnSchema, Schema};
use storage::metadata::{RegionMetaImpl, RegionMetadata};
use storage::write_batch::{Mutation, WriteBatch};
use store_api::storage::{
Chunk, ChunkReader, CreateOptions, EngineContext, GetRequest, GetResponse, OpenOptions,
ReadContext, Region, RegionDescriptor, RegionId, RegionMeta, ScanRequest, ScanResponse,
SchemaRef, Snapshot, StorageEngine, WriteContext, WriteResponse,
AlterRequest, Chunk, ChunkReader, CreateOptions, EngineContext, GetRequest, GetResponse,
OpenOptions, ReadContext, Region, RegionDescriptor, RegionId, RegionMeta, ScanRequest,
ScanResponse, SchemaRef, Snapshot, StorageEngine, WriteContext, WriteResponse,
};
pub type Result<T> = std::result::Result<T, MockError>;
@@ -115,13 +115,12 @@ impl Snapshot for MockSnapshot {
// purpose the cost should be acceptable.
#[derive(Debug, Clone)]
pub struct MockRegion {
// FIXME(yingwen): Remove this once name is provided by metadata.
name: String,
pub inner: Arc<MockRegionInner>,
}
#[derive(Debug)]
pub struct MockRegionInner {
name: String,
pub metadata: ArcSwap<RegionMetadata>,
memtable: Arc<RwLock<MockMemtable>>,
}
@@ -140,7 +139,7 @@ impl Region for MockRegion {
}
fn name(&self) -> &str {
&self.name
&self.inner.name
}
fn in_memory_metadata(&self) -> RegionMetaImpl {
@@ -163,9 +162,12 @@ impl Region for MockRegion {
WriteBatch::new(self.in_memory_metadata().schema().clone())
}
fn alter(&self, descriptor: RegionDescriptor) -> Result<()> {
let metadata = descriptor.try_into().unwrap();
async fn alter(&self, request: AlterRequest) -> Result<()> {
let current = self.inner.metadata.load();
// Mock engine just panic if failed to create a new metadata.
let metadata = current.alter(&request).unwrap();
self.inner.update_metadata(metadata);
Ok(())
}
}
@@ -177,6 +179,7 @@ impl MockRegionInner {
memtable.insert(column.name.clone(), vec![]);
}
Self {
name: metadata.name().to_string(),
metadata: ArcSwap::new(Arc::new(metadata)),
memtable: Arc::new(RwLock::new(memtable)),
}
@@ -277,7 +280,6 @@ impl StorageEngine for MockEngine {
let name = descriptor.name.clone();
let metadata = descriptor.try_into().unwrap();
let region = MockRegion {
name: name.clone(),
inner: Arc::new(MockRegionInner::new(metadata)),
};
regions.opened_regions.insert(name, region.clone());