mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-26 01:40:36 +00:00
feat(mito): Allow to retry create request and alter request (#2447)
* feat: RegionMetadataBuilder allow adding/dropping columns multiple times * test: test add if not exists/drop if exists * feat: change validator and add need_alter * test: fix tests and test need_alter * test: test alter retry * feat: open before create * style: fix clippy
This commit is contained in:
@@ -106,7 +106,6 @@ async fn test_alter_region() {
|
||||
assert_eq!(1, version_data.last_entry_id);
|
||||
assert_eq!(3, version_data.committed_sequence);
|
||||
assert_eq!(1, version_data.version.flushed_entry_id);
|
||||
assert_eq!(1, version_data.version.flushed_entry_id);
|
||||
assert_eq!(3, version_data.version.flushed_sequence);
|
||||
};
|
||||
check_region(&engine);
|
||||
@@ -245,3 +244,53 @@ async fn test_put_after_alter() {
|
||||
let batches = RecordBatches::try_collect(stream).await.unwrap();
|
||||
assert_eq!(expected, batches.pretty_print().unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_alter_region_retry() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let mut env = TestEnv::new();
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
|
||||
let column_schemas = rows_schema(&request);
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Create(request))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let rows = Rows {
|
||||
schema: column_schemas,
|
||||
rows: build_rows_for_key("a", 0, 2, 0),
|
||||
};
|
||||
put_rows(&engine, region_id, rows).await;
|
||||
|
||||
let request = add_tag1();
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Alter(request))
|
||||
.await
|
||||
.unwrap();
|
||||
// Retries request.
|
||||
let request = add_tag1();
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Alter(request))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let expected = "\
|
||||
+-------+-------+---------+---------------------+
|
||||
| tag_1 | tag_0 | field_0 | ts |
|
||||
+-------+-------+---------+---------------------+
|
||||
| | a | 0.0 | 1970-01-01T00:00:00 |
|
||||
| | a | 1.0 | 1970-01-01T00:00:01 |
|
||||
+-------+-------+---------+---------------------+";
|
||||
scan_check_after_alter(&engine, region_id, expected).await;
|
||||
let region = engine.get_region(region_id).unwrap();
|
||||
let version_data = region.version_control.current();
|
||||
assert_eq!(1, version_data.last_entry_id);
|
||||
assert_eq!(2, version_data.committed_sequence);
|
||||
assert_eq!(1, version_data.version.flushed_entry_id);
|
||||
assert_eq!(2, version_data.version.flushed_sequence);
|
||||
}
|
||||
|
||||
@@ -14,8 +14,6 @@
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
use store_api::region_engine::RegionEngine;
|
||||
use store_api::region_request::RegionRequest;
|
||||
use store_api::storage::RegionId;
|
||||
@@ -39,12 +37,12 @@ async fn test_engine_create_new_region() {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_create_region_if_not_exists() {
|
||||
let mut env = TestEnv::with_prefix("create-not-exists");
|
||||
async fn test_engine_create_existing_region() {
|
||||
let mut env = TestEnv::with_prefix("create-existing");
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let builder = CreateRequestBuilder::new().create_if_not_exists(true);
|
||||
let builder = CreateRequestBuilder::new();
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Create(builder.build()))
|
||||
.await
|
||||
@@ -58,8 +56,8 @@ async fn test_engine_create_region_if_not_exists() {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_create_existing_region() {
|
||||
let mut env = TestEnv::with_prefix("create-existing");
|
||||
async fn test_engine_create_with_different_id() {
|
||||
let mut env = TestEnv::new();
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
@@ -69,15 +67,51 @@ async fn test_engine_create_existing_region() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Create the same region again.
|
||||
let err = engine
|
||||
// Creates with different id.
|
||||
engine
|
||||
.handle_request(RegionId::new(2, 1), RegionRequest::Create(builder.build()))
|
||||
.await
|
||||
.unwrap_err();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_create_with_different_schema() {
|
||||
let mut env = TestEnv::new();
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let builder = CreateRequestBuilder::new();
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Create(builder.build()))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Creates with different schema.
|
||||
let builder = builder.tag_num(2);
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Create(builder.build()))
|
||||
.await
|
||||
.unwrap_err();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_create_with_different_primary_key() {
|
||||
let mut env = TestEnv::new();
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let builder = CreateRequestBuilder::new().tag_num(2);
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Create(builder.build()))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Creates with different schema.
|
||||
let builder = builder.primary_key(vec![1]);
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Create(builder.build()))
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(
|
||||
matches!(err.status_code(), StatusCode::RegionAlreadyExists),
|
||||
"unexpected err: {err}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -250,7 +250,6 @@ async fn test_flush_reopen_region() {
|
||||
assert_eq!(1, version_data.last_entry_id);
|
||||
assert_eq!(3, version_data.committed_sequence);
|
||||
assert_eq!(1, version_data.version.flushed_entry_id);
|
||||
assert_eq!(1, version_data.version.flushed_entry_id);
|
||||
assert_eq!(3, version_data.version.flushed_sequence);
|
||||
};
|
||||
check_region();
|
||||
|
||||
@@ -459,6 +459,17 @@ pub enum Error {
|
||||
source: serde_json::Error,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Empty region directory, region_id: {}, region_dir: {}",
|
||||
region_id,
|
||||
region_dir,
|
||||
))]
|
||||
EmptyRegionDir {
|
||||
region_id: RegionId,
|
||||
region_dir: String,
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
@@ -529,6 +540,7 @@ impl ErrorExt for Error {
|
||||
InvalidRegionRequest { source, .. } => source.status_code(),
|
||||
RegionReadonly { .. } => StatusCode::RegionReadonly,
|
||||
JsonOptions { .. } => StatusCode::InvalidArguments,
|
||||
EmptyRegionDir { .. } => StatusCode::RegionNotFound,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -18,19 +18,19 @@ use std::collections::HashMap;
|
||||
use std::sync::atomic::{AtomicBool, AtomicI64};
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_telemetry::info;
|
||||
use common_telemetry::{debug, error, info, warn};
|
||||
use common_time::util::current_time_millis;
|
||||
use futures::StreamExt;
|
||||
use object_store::util::join_dir;
|
||||
use object_store::ObjectStore;
|
||||
use snafu::{ensure, OptionExt};
|
||||
use store_api::logstore::LogStore;
|
||||
use store_api::metadata::RegionMetadata;
|
||||
use store_api::storage::RegionId;
|
||||
use store_api::metadata::{ColumnMetadata, RegionMetadata};
|
||||
use store_api::storage::{ColumnId, RegionId};
|
||||
|
||||
use crate::access_layer::AccessLayer;
|
||||
use crate::config::MitoConfig;
|
||||
use crate::error::{RegionCorruptedSnafu, RegionNotFoundSnafu, Result};
|
||||
use crate::error::{EmptyRegionDirSnafu, RegionCorruptedSnafu, Result};
|
||||
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
|
||||
use crate::memtable::MemtableBuilderRef;
|
||||
use crate::region::options::RegionOptions;
|
||||
@@ -90,22 +90,50 @@ impl RegionOpener {
|
||||
self
|
||||
}
|
||||
|
||||
/// Writes region manifest and creates a new region.
|
||||
/// Writes region manifest and creates a new region if it does not exist.
|
||||
/// Opens the region if it already exists.
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if metadata is not set.
|
||||
pub(crate) async fn create(self, config: &MitoConfig) -> Result<MitoRegion> {
|
||||
pub(crate) async fn create_or_open<S: LogStore>(
|
||||
self,
|
||||
config: &MitoConfig,
|
||||
wal: &Wal<S>,
|
||||
) -> Result<MitoRegion> {
|
||||
let region_id = self.region_id;
|
||||
let metadata = Arc::new(self.metadata.unwrap());
|
||||
let options = self.manifest_options(config);
|
||||
|
||||
// Create a manifest manager for this region.
|
||||
let options = RegionManifestOptions {
|
||||
manifest_dir: new_manifest_dir(&self.region_dir),
|
||||
object_store: self.object_store.clone(),
|
||||
compress_type: config.manifest_compress_type,
|
||||
checkpoint_distance: config.manifest_checkpoint_distance,
|
||||
};
|
||||
// Writes regions to the manifest file.
|
||||
// Tries to open the region.
|
||||
match self.maybe_open(config, wal).await {
|
||||
Ok(Some(region)) => {
|
||||
let recovered = region.metadata();
|
||||
// Checks the schema of the region.
|
||||
let expect = self.metadata.as_ref().unwrap();
|
||||
check_recovered_region(
|
||||
&recovered,
|
||||
expect.region_id,
|
||||
&expect.column_metadatas,
|
||||
&expect.primary_key,
|
||||
)?;
|
||||
|
||||
return Ok(region);
|
||||
}
|
||||
Ok(None) => {
|
||||
debug!(
|
||||
"No data under directory {}, region_id: {}",
|
||||
self.region_dir, self.region_id
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Failed to open region {} before creating it, region_dir: {}, err: {}",
|
||||
self.region_id, self.region_dir, e
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let metadata = Arc::new(self.metadata.unwrap());
|
||||
// Create a manifest manager for this region and writes regions to the manifest file.
|
||||
let manifest_manager = RegionManifestManager::new(metadata.clone(), options).await?;
|
||||
|
||||
let mutable = self.memtable_builder.build(&metadata);
|
||||
@@ -137,33 +165,52 @@ impl RegionOpener {
|
||||
config: &MitoConfig,
|
||||
wal: &Wal<S>,
|
||||
) -> Result<MitoRegion> {
|
||||
let options = RegionManifestOptions {
|
||||
manifest_dir: new_manifest_dir(&self.region_dir),
|
||||
object_store: self.object_store.clone(),
|
||||
compress_type: config.manifest_compress_type,
|
||||
checkpoint_distance: config.manifest_checkpoint_distance,
|
||||
let region_id = self.region_id;
|
||||
let region = self
|
||||
.maybe_open(config, wal)
|
||||
.await?
|
||||
.context(EmptyRegionDirSnafu {
|
||||
region_id,
|
||||
region_dir: self.region_dir,
|
||||
})?;
|
||||
|
||||
ensure!(
|
||||
region.region_id == self.region_id,
|
||||
RegionCorruptedSnafu {
|
||||
region_id: self.region_id,
|
||||
reason: format!(
|
||||
"recovered region has different region id {}",
|
||||
region.region_id
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
Ok(region)
|
||||
}
|
||||
|
||||
/// Tries to open the region and returns `None` if the region directory is empty.
|
||||
async fn maybe_open<S: LogStore>(
|
||||
&self,
|
||||
config: &MitoConfig,
|
||||
wal: &Wal<S>,
|
||||
) -> Result<Option<MitoRegion>> {
|
||||
let options = self.manifest_options(config);
|
||||
let Some(manifest_manager) = RegionManifestManager::open(options).await? else {
|
||||
return Ok(None);
|
||||
};
|
||||
let manifest_manager =
|
||||
RegionManifestManager::open(options)
|
||||
.await?
|
||||
.context(RegionNotFoundSnafu {
|
||||
region_id: self.region_id,
|
||||
})?;
|
||||
|
||||
let manifest = manifest_manager.manifest().await;
|
||||
let metadata = manifest.metadata.clone();
|
||||
|
||||
ensure!(
|
||||
metadata.region_id == self.region_id,
|
||||
RegionCorruptedSnafu {
|
||||
region_id: self.region_id,
|
||||
reason: format!("region id in metadata is {}", metadata.region_id),
|
||||
}
|
||||
);
|
||||
|
||||
let region_id = metadata.region_id;
|
||||
let access_layer = Arc::new(AccessLayer::new(self.region_dir, self.object_store.clone()));
|
||||
let file_purger = Arc::new(LocalFilePurger::new(self.scheduler, access_layer.clone()));
|
||||
let region_id = self.region_id;
|
||||
let access_layer = Arc::new(AccessLayer::new(
|
||||
self.region_dir.clone(),
|
||||
self.object_store.clone(),
|
||||
));
|
||||
let file_purger = Arc::new(LocalFilePurger::new(
|
||||
self.scheduler.clone(),
|
||||
access_layer.clone(),
|
||||
));
|
||||
let mutable = self.memtable_builder.build(&metadata);
|
||||
let options = RegionOptions::try_from(&self.options)?;
|
||||
let version = VersionBuilder::new(metadata, mutable)
|
||||
@@ -187,8 +234,67 @@ impl RegionOpener {
|
||||
// Region is always opened in read only mode.
|
||||
writable: AtomicBool::new(false),
|
||||
};
|
||||
Ok(region)
|
||||
Ok(Some(region))
|
||||
}
|
||||
|
||||
/// Returns a new manifest options.
|
||||
fn manifest_options(&self, config: &MitoConfig) -> RegionManifestOptions {
|
||||
RegionManifestOptions {
|
||||
manifest_dir: new_manifest_dir(&self.region_dir),
|
||||
object_store: self.object_store.clone(),
|
||||
compress_type: config.manifest_compress_type,
|
||||
checkpoint_distance: config.manifest_checkpoint_distance,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks whether the recovered region has the same schema as region to create.
|
||||
pub(crate) fn check_recovered_region(
|
||||
recovered: &RegionMetadata,
|
||||
region_id: RegionId,
|
||||
column_metadatas: &[ColumnMetadata],
|
||||
primary_key: &[ColumnId],
|
||||
) -> Result<()> {
|
||||
if recovered.region_id != region_id {
|
||||
error!(
|
||||
"Recovered region {}, expect region {}",
|
||||
recovered.region_id, region_id
|
||||
);
|
||||
return RegionCorruptedSnafu {
|
||||
region_id,
|
||||
reason: format!(
|
||||
"recovered metadata has different region id {}",
|
||||
recovered.region_id
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
if recovered.column_metadatas != column_metadatas {
|
||||
error!(
|
||||
"Unexpected schema in recovered region {}, recovered: {:?}, expect: {:?}",
|
||||
recovered.region_id, recovered.column_metadatas, column_metadatas
|
||||
);
|
||||
|
||||
return RegionCorruptedSnafu {
|
||||
region_id,
|
||||
reason: "recovered metadata has different schema",
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
if recovered.primary_key != primary_key {
|
||||
error!(
|
||||
"Unexpected primary key in recovered region {}, recovered: {:?}, expect: {:?}",
|
||||
recovered.region_id, recovered.primary_key, primary_key
|
||||
);
|
||||
|
||||
return RegionCorruptedSnafu {
|
||||
region_id,
|
||||
reason: "recovered metadata has different primary key",
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Replays the mutations from WAL and inserts mutations to memtable of given region.
|
||||
|
||||
@@ -42,7 +42,7 @@ use store_api::region_request::{
|
||||
RegionCloseRequest, RegionCreateRequest, RegionDeleteRequest, RegionFlushRequest,
|
||||
RegionOpenRequest, RegionPutRequest, RegionRequest,
|
||||
};
|
||||
use store_api::storage::RegionId;
|
||||
use store_api::storage::{ColumnId, RegionId};
|
||||
|
||||
use crate::config::MitoConfig;
|
||||
use crate::engine::listener::EventListenerRef;
|
||||
@@ -208,8 +208,8 @@ pub struct CreateRequestBuilder {
|
||||
region_dir: String,
|
||||
tag_num: usize,
|
||||
field_num: usize,
|
||||
create_if_not_exists: bool,
|
||||
options: HashMap<String, String>,
|
||||
primary_key: Option<Vec<ColumnId>>,
|
||||
}
|
||||
|
||||
impl Default for CreateRequestBuilder {
|
||||
@@ -218,34 +218,39 @@ impl Default for CreateRequestBuilder {
|
||||
region_dir: "test".to_string(),
|
||||
tag_num: 1,
|
||||
field_num: 1,
|
||||
create_if_not_exists: false,
|
||||
options: HashMap::new(),
|
||||
primary_key: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl CreateRequestBuilder {
|
||||
#[must_use]
|
||||
pub fn new() -> CreateRequestBuilder {
|
||||
CreateRequestBuilder::default()
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn region_dir(mut self, value: &str) -> Self {
|
||||
self.region_dir = value.to_string();
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn tag_num(mut self, value: usize) -> Self {
|
||||
self.tag_num = value;
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn field_num(mut self, value: usize) -> Self {
|
||||
self.field_num = value;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn create_if_not_exists(mut self, value: bool) -> Self {
|
||||
self.create_if_not_exists = value;
|
||||
#[must_use]
|
||||
pub fn primary_key(mut self, primary_key: Vec<ColumnId>) -> Self {
|
||||
self.primary_key = Some(primary_key);
|
||||
self
|
||||
}
|
||||
|
||||
@@ -297,9 +302,9 @@ impl CreateRequestBuilder {
|
||||
// We use empty engine name as we already locates the engine.
|
||||
engine: String::new(),
|
||||
column_metadatas,
|
||||
primary_key,
|
||||
create_if_not_exists: self.create_if_not_exists,
|
||||
primary_key: self.primary_key.clone().unwrap_or(primary_key),
|
||||
options: self.options.clone(),
|
||||
create_if_not_exists: false,
|
||||
region_dir: self.region_dir.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_query::Output;
|
||||
use common_telemetry::{error, info, warn};
|
||||
use common_telemetry::{debug, error, info, warn};
|
||||
use snafu::ResultExt;
|
||||
use store_api::metadata::{RegionMetadata, RegionMetadataBuilder, RegionMetadataRef};
|
||||
use store_api::region_request::RegionAlterRequest;
|
||||
@@ -48,14 +48,31 @@ impl<S> RegionWorkerLoop<S> {
|
||||
// Get the version before alter.
|
||||
let version = region.version();
|
||||
if version.metadata.schema_version > request.schema_version {
|
||||
// This is possible if we retry the request.
|
||||
warn!(
|
||||
"Ignored alert request, region id:{}, region schema version {} is greater than request schema version {}",
|
||||
"Ignores alter request, region id:{}, region schema version {} is greater than request schema version {}",
|
||||
region_id, version.metadata.schema_version, request.schema_version
|
||||
);
|
||||
// Returns if it altered.
|
||||
sender.send(Ok(Output::AffectedRows(0)));
|
||||
return;
|
||||
}
|
||||
// Validate request.
|
||||
if let Err(e) = request.validate(&version.metadata) {
|
||||
// Invalid request.
|
||||
sender.send(Err(e).context(InvalidRegionRequestSnafu));
|
||||
return;
|
||||
}
|
||||
// Checks whether we need to alter the region.
|
||||
if !request.need_alter(&version.metadata) {
|
||||
debug!(
|
||||
"Ignores alter request as it alters nothing, region_id: {}, request: {:?}",
|
||||
region_id, request
|
||||
);
|
||||
sender.send(Ok(Output::AffectedRows(0)));
|
||||
return;
|
||||
}
|
||||
|
||||
// Checks whether we can alter the region directly.
|
||||
if !version.memtables.is_empty() {
|
||||
// If memtable is not empty, we can't alter it directly and need to flush
|
||||
@@ -132,11 +149,6 @@ fn metadata_after_alteration(
|
||||
metadata: &RegionMetadata,
|
||||
request: RegionAlterRequest,
|
||||
) -> Result<RegionMetadataRef> {
|
||||
// Validates request.
|
||||
request
|
||||
.validate(metadata)
|
||||
.context(InvalidRegionRequestSnafu)?;
|
||||
|
||||
let mut builder = RegionMetadataBuilder::from_existing(metadata.clone());
|
||||
builder
|
||||
.alter(request.kind)
|
||||
|
||||
@@ -18,14 +18,14 @@ use std::sync::Arc;
|
||||
|
||||
use common_query::Output;
|
||||
use common_telemetry::info;
|
||||
use snafu::{ensure, ResultExt};
|
||||
use snafu::ResultExt;
|
||||
use store_api::logstore::LogStore;
|
||||
use store_api::metadata::RegionMetadataBuilder;
|
||||
use store_api::region_request::RegionCreateRequest;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::error::{InvalidMetadataSnafu, RegionExistsSnafu, Result};
|
||||
use crate::region::opener::RegionOpener;
|
||||
use crate::error::{InvalidMetadataSnafu, Result};
|
||||
use crate::region::opener::{check_recovered_region, RegionOpener};
|
||||
use crate::worker::RegionWorkerLoop;
|
||||
|
||||
impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
@@ -35,13 +35,15 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
request: RegionCreateRequest,
|
||||
) -> Result<Output> {
|
||||
// Checks whether the table exists.
|
||||
if self.regions.is_region_exists(region_id) {
|
||||
ensure!(
|
||||
request.create_if_not_exists,
|
||||
RegionExistsSnafu { region_id }
|
||||
);
|
||||
|
||||
if let Some(region) = self.regions.get_region(region_id) {
|
||||
// Region already exists.
|
||||
check_recovered_region(
|
||||
®ion.metadata(),
|
||||
region_id,
|
||||
&request.column_metadatas,
|
||||
&request.primary_key,
|
||||
)?;
|
||||
|
||||
return Ok(Output::AffectedRows(0));
|
||||
}
|
||||
|
||||
@@ -63,7 +65,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
.metadata(metadata)
|
||||
.region_dir(&request.region_dir)
|
||||
.options(request.options)
|
||||
.create(&self.config)
|
||||
.create_or_open(&self.config, &self.wal)
|
||||
.await?;
|
||||
|
||||
// TODO(yingwen): Custom the Debug format for the metadata and also print it.
|
||||
|
||||
@@ -353,7 +353,6 @@ impl RegionMetadata {
|
||||
|
||||
/// Checks whether it is a valid column.
|
||||
fn validate_column_metadata(column_metadata: &ColumnMetadata) -> Result<()> {
|
||||
// TODO(yingwen): Ensure column name is not internal columns.
|
||||
if column_metadata.semantic_type == SemanticType::Timestamp {
|
||||
ensure!(
|
||||
column_metadata
|
||||
@@ -460,11 +459,23 @@ impl RegionMetadataBuilder {
|
||||
Ok(meta)
|
||||
}
|
||||
|
||||
/// Adds columns to the metadata.
|
||||
/// Adds columns to the metadata if not exist.
|
||||
fn add_columns(&mut self, columns: Vec<AddColumn>) -> Result<()> {
|
||||
let mut names: HashSet<_> = self
|
||||
.column_metadatas
|
||||
.iter()
|
||||
.map(|col| col.column_schema.name.clone())
|
||||
.collect();
|
||||
|
||||
for add_column in columns {
|
||||
if names.contains(&add_column.column_metadata.column_schema.name) {
|
||||
// Column already exists.
|
||||
continue;
|
||||
}
|
||||
|
||||
let column_id = add_column.column_metadata.column_id;
|
||||
let semantic_type = add_column.column_metadata.semantic_type;
|
||||
let column_name = add_column.column_metadata.column_schema.name.clone();
|
||||
match add_column.location {
|
||||
None => {
|
||||
self.column_metadatas.push(add_column.column_metadata);
|
||||
@@ -489,6 +500,7 @@ impl RegionMetadataBuilder {
|
||||
.insert(pos + 1, add_column.column_metadata);
|
||||
}
|
||||
}
|
||||
names.insert(column_name);
|
||||
if semantic_type == SemanticType::Tag {
|
||||
// For a new tag, we extend the primary key.
|
||||
self.primary_key.push(column_id);
|
||||
@@ -498,7 +510,7 @@ impl RegionMetadataBuilder {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Drops columns from the metadata.
|
||||
/// Drops columns from the metadata if exist.
|
||||
fn drop_columns(&mut self, names: &[String]) {
|
||||
let name_set: HashSet<_> = names.iter().collect();
|
||||
self.column_metadatas
|
||||
@@ -938,6 +950,7 @@ mod test {
|
||||
// a (tag), b (field), c (ts)
|
||||
let metadata = build_test_region_metadata();
|
||||
let mut builder = RegionMetadataBuilder::from_existing(metadata);
|
||||
// tag d
|
||||
builder
|
||||
.alter(AlterKind::AddColumns {
|
||||
columns: vec![AddColumn {
|
||||
@@ -1005,11 +1018,91 @@ mod test {
|
||||
names: vec!["a".to_string()],
|
||||
})
|
||||
.unwrap();
|
||||
// Build returns error as the primary key has more columns.
|
||||
// Build returns error as the primary key contains a.
|
||||
let err = builder.build().unwrap_err();
|
||||
assert_eq!(StatusCode::InvalidArguments, err.status_code());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_add_if_not_exists() {
|
||||
// a (tag), b (field), c (ts)
|
||||
let metadata = build_test_region_metadata();
|
||||
let mut builder = RegionMetadataBuilder::from_existing(metadata);
|
||||
// tag d
|
||||
builder
|
||||
.alter(AlterKind::AddColumns {
|
||||
columns: vec![
|
||||
AddColumn {
|
||||
column_metadata: new_column_metadata("d", true, 4),
|
||||
location: None,
|
||||
},
|
||||
AddColumn {
|
||||
column_metadata: new_column_metadata("d", true, 4),
|
||||
location: None,
|
||||
},
|
||||
],
|
||||
})
|
||||
.unwrap();
|
||||
let metadata = builder.build().unwrap();
|
||||
check_columns(&metadata, &["a", "b", "c", "d"]);
|
||||
assert_eq!([1, 4], &metadata.primary_key[..]);
|
||||
|
||||
let mut builder = RegionMetadataBuilder::from_existing(metadata);
|
||||
// field b.
|
||||
builder
|
||||
.alter(AlterKind::AddColumns {
|
||||
columns: vec![AddColumn {
|
||||
column_metadata: new_column_metadata("b", false, 2),
|
||||
location: None,
|
||||
}],
|
||||
})
|
||||
.unwrap();
|
||||
let metadata = builder.build().unwrap();
|
||||
check_columns(&metadata, &["a", "b", "c", "d"]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_drop_if_exists() {
|
||||
// a (tag), b (field), c (ts)
|
||||
let metadata = build_test_region_metadata();
|
||||
let mut builder = RegionMetadataBuilder::from_existing(metadata);
|
||||
// field d, e
|
||||
builder
|
||||
.alter(AlterKind::AddColumns {
|
||||
columns: vec![
|
||||
AddColumn {
|
||||
column_metadata: new_column_metadata("d", false, 4),
|
||||
location: None,
|
||||
},
|
||||
AddColumn {
|
||||
column_metadata: new_column_metadata("e", false, 5),
|
||||
location: None,
|
||||
},
|
||||
],
|
||||
})
|
||||
.unwrap();
|
||||
let metadata = builder.build().unwrap();
|
||||
check_columns(&metadata, &["a", "b", "c", "d", "e"]);
|
||||
|
||||
let mut builder = RegionMetadataBuilder::from_existing(metadata);
|
||||
builder
|
||||
.alter(AlterKind::DropColumns {
|
||||
names: vec!["b".to_string(), "b".to_string()],
|
||||
})
|
||||
.unwrap();
|
||||
let metadata = builder.build().unwrap();
|
||||
check_columns(&metadata, &["a", "c", "d", "e"]);
|
||||
|
||||
let mut builder = RegionMetadataBuilder::from_existing(metadata);
|
||||
builder
|
||||
.alter(AlterKind::DropColumns {
|
||||
names: vec!["b".to_string(), "e".to_string()],
|
||||
})
|
||||
.unwrap();
|
||||
let metadata = builder.build().unwrap();
|
||||
check_columns(&metadata, &["a", "c", "d"]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_invalid_column_name() {
|
||||
let mut builder = create_builder();
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::collections::HashMap;
|
||||
use std::fmt;
|
||||
|
||||
use api::v1::add_column_location::LocationType;
|
||||
@@ -151,6 +151,7 @@ pub struct RegionCreateRequest {
|
||||
/// Columns in the primary key.
|
||||
pub primary_key: Vec<ColumnId>,
|
||||
/// Create region if not exists.
|
||||
// TODO(yingwen): Remove this.
|
||||
pub create_if_not_exists: bool,
|
||||
/// Options of the created region.
|
||||
pub options: HashMap<String, String>,
|
||||
@@ -203,6 +204,14 @@ impl RegionAlterRequest {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns true if we need to apply the request to the region.
|
||||
///
|
||||
/// The `request` should be valid.
|
||||
pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
|
||||
debug_assert!(self.validate(metadata).is_ok());
|
||||
self.kind.need_alter(metadata)
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<AlterRequest> for RegionAlterRequest {
|
||||
@@ -238,23 +247,13 @@ pub enum AlterKind {
|
||||
|
||||
impl AlterKind {
|
||||
/// Returns an error if the the alter kind is invalid.
|
||||
///
|
||||
/// It allows adding column if not exists and dropping column if exists.
|
||||
pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
|
||||
match self {
|
||||
AlterKind::AddColumns { columns } => {
|
||||
let mut names = HashSet::with_capacity(columns.len());
|
||||
for col_to_add in columns {
|
||||
ensure!(
|
||||
!names.contains(&col_to_add.column_metadata.column_schema.name),
|
||||
InvalidRegionRequestSnafu {
|
||||
region_id: metadata.region_id,
|
||||
err: format!(
|
||||
"add column {} more than once",
|
||||
col_to_add.column_metadata.column_schema.name
|
||||
),
|
||||
}
|
||||
);
|
||||
col_to_add.validate(metadata)?;
|
||||
names.insert(&col_to_add.column_metadata.column_schema.name);
|
||||
}
|
||||
}
|
||||
AlterKind::DropColumns { names } => {
|
||||
@@ -266,14 +265,24 @@ impl AlterKind {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns true if we need to apply the alteration to the region.
|
||||
pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
|
||||
debug_assert!(self.validate(metadata).is_ok());
|
||||
match self {
|
||||
AlterKind::AddColumns { columns } => columns
|
||||
.iter()
|
||||
.any(|col_to_add| col_to_add.need_alter(metadata)),
|
||||
AlterKind::DropColumns { names } => names
|
||||
.iter()
|
||||
.any(|name| metadata.column_by_name(name).is_some()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns an error if the column to drop is invalid.
|
||||
fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
|
||||
let column = metadata
|
||||
.column_by_name(name)
|
||||
.with_context(|| InvalidRegionRequestSnafu {
|
||||
region_id: metadata.region_id,
|
||||
err: format!("column {} does not exist", name),
|
||||
})?;
|
||||
let Some(column) = metadata.column_by_name(name) else {
|
||||
return Ok(());
|
||||
};
|
||||
ensure!(
|
||||
column.semantic_type == SemanticType::Field,
|
||||
InvalidRegionRequestSnafu {
|
||||
@@ -320,6 +329,8 @@ pub struct AddColumn {
|
||||
|
||||
impl AddColumn {
|
||||
/// Returns an error if the column to add is invalid.
|
||||
///
|
||||
/// It allows adding existing columns.
|
||||
pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
|
||||
ensure!(
|
||||
self.column_metadata.column_schema.is_nullable()
|
||||
@@ -336,21 +347,17 @@ impl AddColumn {
|
||||
),
|
||||
}
|
||||
);
|
||||
ensure!(
|
||||
metadata
|
||||
.column_by_name(&self.column_metadata.column_schema.name)
|
||||
.is_none(),
|
||||
InvalidRegionRequestSnafu {
|
||||
region_id: metadata.region_id,
|
||||
err: format!(
|
||||
"column {} already exists",
|
||||
self.column_metadata.column_schema.name
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns true if no column to add to the region.
|
||||
pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
|
||||
debug_assert!(self.validate(metadata).is_ok());
|
||||
metadata
|
||||
.column_by_name(&self.column_metadata.column_schema.name)
|
||||
.is_none()
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<v1::region::AddColumn> for AddColumn {
|
||||
@@ -574,7 +581,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_add_column_validate() {
|
||||
let metadata = new_metadata();
|
||||
AddColumn {
|
||||
let add_column = AddColumn {
|
||||
column_metadata: ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
"tag_1",
|
||||
@@ -585,10 +592,11 @@ mod tests {
|
||||
column_id: 4,
|
||||
},
|
||||
location: None,
|
||||
}
|
||||
.validate(&metadata)
|
||||
.unwrap();
|
||||
};
|
||||
add_column.validate(&metadata).unwrap();
|
||||
assert!(add_column.need_alter(&metadata));
|
||||
|
||||
// Add not null column.
|
||||
AddColumn {
|
||||
column_metadata: ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
@@ -604,7 +612,8 @@ mod tests {
|
||||
.validate(&metadata)
|
||||
.unwrap_err();
|
||||
|
||||
AddColumn {
|
||||
// Add existing column.
|
||||
let add_column = AddColumn {
|
||||
column_metadata: ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
"tag_0",
|
||||
@@ -615,9 +624,9 @@ mod tests {
|
||||
column_id: 4,
|
||||
},
|
||||
location: None,
|
||||
}
|
||||
.validate(&metadata)
|
||||
.unwrap_err();
|
||||
};
|
||||
add_column.validate(&metadata).unwrap();
|
||||
assert!(!add_column.need_alter(&metadata));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -651,27 +660,30 @@ mod tests {
|
||||
],
|
||||
};
|
||||
let metadata = new_metadata();
|
||||
kind.validate(&metadata).unwrap_err();
|
||||
kind.validate(&metadata).unwrap();
|
||||
assert!(kind.need_alter(&metadata));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_validate_drop_column() {
|
||||
let metadata = new_metadata();
|
||||
AlterKind::DropColumns {
|
||||
let kind = AlterKind::DropColumns {
|
||||
names: vec!["xxxx".to_string()],
|
||||
}
|
||||
.validate(&metadata)
|
||||
.unwrap_err();
|
||||
};
|
||||
kind.validate(&metadata).unwrap();
|
||||
assert!(!kind.need_alter(&metadata));
|
||||
|
||||
AlterKind::DropColumns {
|
||||
names: vec!["tag_0".to_string()],
|
||||
}
|
||||
.validate(&metadata)
|
||||
.unwrap_err();
|
||||
AlterKind::DropColumns {
|
||||
|
||||
let kind = AlterKind::DropColumns {
|
||||
names: vec!["field_0".to_string()],
|
||||
}
|
||||
.validate(&metadata)
|
||||
.unwrap();
|
||||
};
|
||||
kind.validate(&metadata).unwrap();
|
||||
assert!(kind.need_alter(&metadata));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
Reference in New Issue
Block a user