feat(mito): Implement skeleton for alteration (#2343)

* feat: impl handle_alter wip

* refactor: move send_result to worker.rs

* feat: skeleton for handle_alter_request

* feat: write requests should wait for alteration

* feat: define alter request

* chore: no warnings

* fix: remove memtables after flush

* chore: update comments and impl add_write_request_to_pending

* feat: add schema version to RegionMetadata

* feat: impl alter_schema/can_alter_directly

* chore: use send_result

* test: pull next_batch again

* feat: convert pb AlterRequest to RegionAlterRequest

* feat: validate alter request

* feat: validate request and alter metadata

* feat: allow none location

* test: test alter

* fix: recover files and flushed entry id from manifest

* test: test alter

* chore: change comments and variables

* chore: fix compiler errors

* feat: add is_empty() to MemtableVersion

* test: fix metadata alter test

* fix: Compaction picker doesn't notify waiters if it returns None

* chore: address CR comments

* test: add tests for alter request

* refactor: use send_result
This commit is contained in:
Yingwen
2023-09-11 22:08:23 +08:00
committed by Ruihang Xia
parent 3331e3158c
commit 606ee43f1d
22 changed files with 1240 additions and 151 deletions

View File

@@ -39,6 +39,7 @@ use crate::request::{BackgroundNotify, CompactionFailed, CompactionFinished, Wor
use crate::sst::file::{FileHandle, FileId, FileMeta};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::version::LevelMeta;
use crate::worker::send_result;
const MAX_PARALLEL_COMPACTION: usize = 8;
@@ -122,7 +123,7 @@ impl Picker for TwcsPicker {
ttl,
compaction_time_window,
request_sender,
waiter: waiters,
waiter,
file_purger,
} = req;
@@ -155,6 +156,8 @@ impl Picker for TwcsPicker {
let outputs = self.build_output(&windows, active_window, time_window_size);
if outputs.is_empty() && expired_ssts.is_empty() {
// Nothing to compact.
send_result(waiter, Ok(Output::AffectedRows(0)));
return None;
}
let task = TwcsCompactionTask {
@@ -166,7 +169,7 @@ impl Picker for TwcsPicker {
sst_write_buffer_size: ReadableSize::mb(4),
compaction_time_window: None,
request_sender,
sender: waiters,
sender: waiter,
file_purger,
};
Some(Box::new(task))
@@ -267,7 +270,8 @@ impl TwcsCompactionTask {
compacted_inputs.extend(output.inputs.iter().map(FileHandle::meta));
info!(
"Compaction output [{}]-> {}",
"Compaction region {} output [{}]-> {}",
self.region_id,
output
.inputs
.iter()
@@ -315,15 +319,6 @@ impl TwcsCompactionTask {
Ok((output, compacted))
}
/// Handles compaction success.
fn early_success(&mut self) {
if let Some(sender) = self.sender.take() {
let _ = sender.send(Ok(Output::AffectedRows(0)).context(CompactRegionSnafu {
region_id: self.region_id,
}));
}
}
/// Handles compaction failure, notifies all waiters.
fn on_failure(&mut self, err: Arc<error::Error>) {
if let Some(sender) = self.sender.take() {
@@ -353,7 +348,6 @@ impl CompactionTask for TwcsCompactionTask {
"Compacted SST files, input: {:?}, output: {:?}, window: {:?}",
added, deleted, self.compaction_time_window
);
self.early_success();
BackgroundNotify::CompactionFinished(CompactionFinished {
region_id: self.region_id,

View File

@@ -14,6 +14,8 @@
//! Mito region engine.
#[cfg(test)]
mod alter_test;
#[cfg(test)]
mod close_test;
#[cfg(test)]

View File

@@ -0,0 +1,111 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::{Rows, SemanticType};
use common_recordbatch::RecordBatches;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
AddColumn, AddColumnLocation, AlterKind, RegionAlterRequest, RegionOpenRequest, RegionRequest,
};
use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::engine::MitoEngine;
use crate::test_util::{build_rows, put_rows, rows_schema, CreateRequestBuilder, TestEnv};
/// Scans the whole region and checks that all data lives in exactly one SST
/// file (no memtables) and that the pretty-printed batches match `expected`.
async fn scan_check_after_alter(engine: &MitoEngine, region_id: RegionId, expected: &str) {
    let scanner = engine.scan(region_id, ScanRequest::default()).unwrap();
    // The region must have been flushed before/by the alteration.
    assert_eq!(0, scanner.num_memtables());
    assert_eq!(1, scanner.num_files());

    let stream = scanner.scan().await.unwrap();
    let batches = RecordBatches::try_collect(stream).await.unwrap();
    assert_eq!(expected, batches.pretty_print().unwrap());
}
/// End-to-end test: add a column to a region that has buffered writes, then
/// verify the data is readable with the new schema both before and after a
/// region reopen (i.e. the schema change is recovered from the manifest).
#[tokio::test]
async fn test_alter_region() {
    common_telemetry::init_default_ut_logging();

    let mut env = TestEnv::new();
    let engine = env.create_engine(MitoConfig::default()).await;

    // Create a region with the builder's default schema (tag_0, field_0, ts).
    let region_id = RegionId::new(1, 1);
    let request = CreateRequestBuilder::new().build();

    let column_schemas = rows_schema(&request);
    let region_dir = request.region_dir.clone();
    engine
        .handle_request(region_id, RegionRequest::Create(request))
        .await
        .unwrap();

    // Write some rows first, so the alteration has to deal with a non-empty
    // memtable (it should trigger a flush before applying the change).
    let rows = Rows {
        schema: column_schemas,
        rows: build_rows(0, 3),
    };
    put_rows(&engine, region_id, rows).await;

    // Add a nullable string tag `tag_1` at the first position of the schema.
    let request = RegionAlterRequest {
        schema_version: 0,
        kind: AlterKind::AddColumns {
            columns: vec![AddColumn {
                column_metadata: ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        "tag_1",
                        ConcreteDataType::string_datatype(),
                        true,
                    ),
                    semantic_type: SemanticType::Tag,
                    column_id: 3,
                },
                location: Some(AddColumnLocation::First),
            }],
        },
    };
    engine
        .handle_request(region_id, RegionRequest::Alter(request))
        .await
        .unwrap();

    // Existing rows read back with a null `tag_1` column in front.
    let expected = "\
+-------+-------+---------+---------------------+
| tag_1 | tag_0 | field_0 | ts                  |
+-------+-------+---------+---------------------+
|       | 0     | 0.0     | 1970-01-01T00:00:00 |
|       | 1     | 1.0     | 1970-01-01T00:00:01 |
|       | 2     | 2.0     | 1970-01-01T00:00:02 |
+-------+-------+---------+---------------------+";
    scan_check_after_alter(&engine, region_id, expected).await;

    // Reopen region.
    let engine = env.reopen_engine(engine, MitoConfig::default()).await;
    engine
        .handle_request(
            region_id,
            RegionRequest::Open(RegionOpenRequest {
                engine: String::new(),
                region_dir,
                options: HashMap::default(),
            }),
        )
        .await
        .unwrap();
    // The altered schema must survive the reopen.
    scan_check_after_alter(&engine, region_id, expected).await;
}

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::ops::Range;
use api::v1::{ColumnSchema, Rows};
@@ -29,7 +28,7 @@ use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::engine::MitoEngine;
use crate::test_util::{
build_rows, column_metadata_to_column_schema, put_rows, CreateRequestBuilder, TestEnv,
build_rows_for_key, column_metadata_to_column_schema, put_rows, CreateRequestBuilder, TestEnv,
};
async fn put_and_flush(
@@ -40,7 +39,7 @@ async fn put_and_flush(
) {
let rows = Rows {
schema: column_schemas.to_vec(),
rows: build_rows(rows.start, rows.end),
rows: build_rows_for_key("a", rows.start, rows.end, 0),
};
put_rows(engine, region_id, rows).await;
@@ -63,7 +62,7 @@ async fn delete_and_flush(
let row_cnt = rows.len();
let rows = Rows {
schema: column_schemas.to_vec(),
rows: build_rows(rows.start, rows.end),
rows: build_rows_for_key("a", rows.start, rows.end, 0),
};
let deleted = engine
@@ -89,8 +88,8 @@ async fn delete_and_flush(
assert_eq!(0, rows);
}
async fn collect_stream_ts(stream: SendableRecordBatchStream) -> HashSet<i64> {
let mut res = HashSet::new();
async fn collect_stream_ts(stream: SendableRecordBatchStream) -> Vec<i64> {
let mut res = Vec::new();
let batches = RecordBatches::try_collect(stream).await.unwrap();
for batch in batches {
let ts_col = batch
@@ -139,5 +138,5 @@ async fn test_compaction_region() {
.unwrap();
let vec = collect_stream_ts(stream).await;
assert_eq!((0..25).map(|v| v * 1000).collect::<HashSet<_>>(), vec);
assert_eq!((0..25).map(|v| v * 1000).collect::<Vec<_>>(), vec);
}

View File

@@ -53,6 +53,7 @@ async fn test_manual_flush() {
let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
assert_eq!(0, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
@@ -98,7 +99,7 @@ async fn test_flush_engine() {
write_buffer_manager.set_should_flush(true);
// Writes and triggers flush.
// Writes to the mutable memtable and triggers flush.
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("b", 0, 2, 0),
@@ -110,6 +111,7 @@ async fn test_flush_engine() {
let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
assert_eq!(1, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
@@ -174,6 +176,7 @@ async fn test_write_stall() {
let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
assert_eq!(1, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
@@ -209,6 +212,7 @@ async fn test_flush_empty() {
let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
assert_eq!(0, scanner.num_memtables());
assert_eq!(0, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();

View File

@@ -468,6 +468,12 @@ pub enum Error {
reason: String,
location: Location,
},
#[snafu(display("{}, location: {}", source, location))]
InvalidRegionRequest {
source: store_api::metadata::MetadataError,
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -537,6 +543,7 @@ impl ErrorExt for Error {
RejectWrite { .. } => StatusCode::StorageUnavailable,
CompactRegion { source, .. } => source.status_code(),
CompatReader { .. } => StatusCode::Unexpected,
InvalidRegionRequest { source, .. } => source.status_code(),
}
}

View File

@@ -31,12 +31,14 @@ use crate::read::Source;
use crate::region::version::{VersionControlData, VersionRef};
use crate::region::MitoRegionRef;
use crate::request::{
BackgroundNotify, FlushFailed, FlushFinished, SenderDdlRequest, WorkerRequest,
BackgroundNotify, FlushFailed, FlushFinished, SenderDdlRequest, SenderWriteRequest,
WorkerRequest,
};
use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::{FileId, FileMeta};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::parquet::WriteOptions;
use crate::worker::send_result;
/// Global write buffer (memtable) manager.
///
@@ -165,7 +167,8 @@ pub enum FlushReason {
EngineFull,
/// Manual flush.
Manual,
// TODO(yingwen): Alter.
/// Flush to alter table.
Alter,
}
/// Task to flush a region.
@@ -357,13 +360,13 @@ impl FlushScheduler {
// Checks whether we can flush the region now.
if flush_status.flushing {
// There is already a flush job running.
flush_status.push_task(task);
flush_status.merge_task(task);
return Ok(());
}
// If there are pending tasks, then we should push it to pending list.
if flush_status.pending_task.is_some() {
flush_status.push_task(task);
flush_status.merge_task(task);
return Ok(());
}
@@ -393,7 +396,7 @@ impl FlushScheduler {
pub(crate) fn on_flush_success(
&mut self,
region_id: RegionId,
) -> Option<Vec<SenderDdlRequest>> {
) -> Option<(Vec<SenderDdlRequest>, Vec<SenderWriteRequest>)> {
let Some(flush_status) = self.region_status.get_mut(&region_id) else {
return None;
};
@@ -401,11 +404,11 @@ impl FlushScheduler {
// This region doesn't have running flush job.
flush_status.flushing = false;
let pending_ddls = if flush_status.pending_task.is_none() {
let pending_requests = if flush_status.pending_task.is_none() {
// The region doesn't have any pending flush task.
// Safety: The flush status exists.
let flush_status = self.region_status.remove(&region_id).unwrap();
Some(flush_status.pending_ddls)
Some((flush_status.pending_ddls, flush_status.pending_writes))
} else {
None
};
@@ -415,7 +418,7 @@ impl FlushScheduler {
error!(e; "Flush of region {} is successful, but failed to schedule next flush", region_id);
}
pending_ddls
pending_requests
}
/// Notifies the scheduler that the flush job is finished.
@@ -460,17 +463,31 @@ impl FlushScheduler {
/// Add ddl request to pending queue.
///
/// Returns error if region doesn't request flush.
pub(crate) fn add_ddl_request_to_pending(
&mut self,
request: SenderDdlRequest,
) -> Result<(), SenderDdlRequest> {
if let Some(status) = self.region_status.get_mut(&request.region_id) {
status.pending_ddls.push(request);
return Ok(());
}
/// Adds a ddl request to the region's pending queue.
///
/// # Panics
/// Panics if region didn't request flush.
pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
    self.region_status
        .get_mut(&request.region_id)
        .unwrap()
        .pending_ddls
        .push(request);
}
Err(request)
/// Add write request to pending queue.
///
/// # Panics
/// Panics if region didn't request flush.
pub(crate) fn add_write_request_to_pending(&mut self, request: SenderWriteRequest) {
    // Copy the id out first so `request` itself can be moved into the queue.
    let region_id = request.request.region_id;
    let status = self.region_status.get_mut(&region_id).unwrap();
    status.pending_writes.push(request);
}
/// Returns true if the region has pending DDLs.
pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
    match self.region_status.get(&region_id) {
        Some(status) => !status.pending_ddls.is_empty(),
        // No flush status is tracked for this region, so nothing is pending.
        None => false,
    }
}
/// Schedules a new flush task when the scheduler can submit next task.
@@ -508,6 +525,8 @@ struct FlushStatus {
pending_task: Option<RegionFlushTask>,
/// Pending ddl requests.
pending_ddls: Vec<SenderDdlRequest>,
/// Requests waiting to write after altering the region.
pending_writes: Vec<SenderWriteRequest>,
}
impl FlushStatus {
@@ -517,10 +536,12 @@ impl FlushStatus {
flushing: false,
pending_task: None,
pending_ddls: Vec::new(),
pending_writes: Vec::new(),
}
}
fn push_task(&mut self, task: RegionFlushTask) {
/// Merges the task to pending task.
fn merge_task(&mut self, task: RegionFlushTask) {
if let Some(pending) = &mut self.pending_task {
pending.merge(task);
} else {
@@ -533,11 +554,20 @@ impl FlushStatus {
task.on_failure(err.clone());
}
for ddl in self.pending_ddls {
if let Some(sender) = ddl.sender {
let _ = sender.send(Err(err.clone()).context(FlushRegionSnafu {
send_result(
ddl.sender,
Err(err.clone()).context(FlushRegionSnafu {
region_id: self.region.region_id,
}));
}
}),
);
}
for write_req in self.pending_writes {
send_result(
write_req.sender,
Err(err.clone()).context(FlushRegionSnafu {
region_id: self.region.region_id,
}),
);
}
}
}

View File

@@ -232,7 +232,9 @@ mod tests {
{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}
],
"primary_key":[1],
"region_id":5299989648942}
"region_id":5299989648942,
"schema_version":0
}
}"#;
let _ = serde_json::from_str::<RegionChange>(region_change).unwrap();

View File

@@ -149,7 +149,7 @@ async fn manager_with_checkpoint_distance_1() {
.await
.unwrap();
let raw_json = std::str::from_utf8(&raw_bytes).unwrap();
let expected_json = "{\"size\":750,\"version\":9,\"checksum\":null,\"extend_metadata\":{}}";
let expected_json = "{\"size\":769,\"version\":9,\"checksum\":null,\"extend_metadata\":{}}";
assert_eq!(expected_json, raw_json);
// reopen the manager

View File

@@ -18,10 +18,10 @@ use std::sync::Arc;
use smallvec::SmallVec;
use crate::memtable::MemtableRef;
use crate::memtable::{MemtableId, MemtableRef};
/// A version of current memtables in a region.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) struct MemtableVersion {
/// Mutable memtable.
pub(crate) mutable: MemtableRef,
@@ -85,8 +85,26 @@ impl MemtableVersion {
})
}
/// Removes memtables by ids from immutable memtables.
///
/// Ids without a matching immutable memtable are silently ignored; the
/// mutable memtable is never touched.
pub(crate) fn remove_memtables(&mut self, ids: &[MemtableId]) {
    // Rebuild the immutable list, keeping only memtables not listed in `ids`.
    self.immutables = self
        .immutables
        .iter()
        .filter(|mem| !ids.contains(&mem.id()))
        .cloned()
        .collect();
}
/// Returns the memory usage of the mutable memtable.
pub(crate) fn mutable_usage(&self) -> usize {
self.mutable.stats().estimated_bytes
}
/// Returns true if the memtable version is empty.
///
/// Empty means the mutable memtable holds no data and there are no
/// immutable memtables left.
pub(crate) fn is_empty(&self) -> bool {
    self.immutables.is_empty() && self.mutable.is_empty()
}
}

View File

@@ -52,6 +52,13 @@ impl Scanner {
Scanner::Seq(seq_scan) => seq_scan.num_files(),
}
}
/// Returns number of memtables to scan.
pub(crate) fn num_memtables(&self) -> usize {
match self {
Scanner::Seq(seq_scan) => seq_scan.num_memtables(),
}
}
}
#[cfg_attr(doc, aquamarine::aquamarine)]
@@ -139,6 +146,11 @@ impl ScanRegion {
}
let memtables = self.version.memtables.list_memtables();
// Skip empty memtables.
let memtables: Vec<_> = memtables
.into_iter()
.filter(|mem| !mem.is_empty())
.collect();
debug!(
"Seq scan region {}, memtables: {}, ssts_to_read: {}, total_ssts: {}",

View File

@@ -147,6 +147,11 @@ impl SeqScan {
#[cfg(test)]
impl SeqScan {
/// Returns number of memtables to scan.
pub(crate) fn num_memtables(&self) -> usize {
self.memtables.len()
}
/// Returns number of SST files to scan.
pub(crate) fn num_files(&self) -> usize {
self.files.len()

View File

@@ -146,19 +146,23 @@ impl RegionOpener {
);
let region_id = metadata.region_id;
let mutable = self.memtable_builder.build(&metadata);
let version = VersionBuilder::new(metadata, mutable).build();
let flushed_sequence = version.flushed_entry_id;
let version_control = Arc::new(VersionControl::new(version));
replay_memtable(wal, region_id, flushed_sequence, &version_control).await?;
let access_layer = Arc::new(AccessLayer::new(self.region_dir, self.object_store.clone()));
let file_purger = Arc::new(LocalFilePurger::new(self.scheduler, access_layer.clone()));
let mutable = self.memtable_builder.build(&metadata);
let version = VersionBuilder::new(metadata, mutable)
.add_files(file_purger.clone(), manifest.files.values().cloned())
.flushed_entry_id(manifest.flushed_entry_id)
.build();
let flushed_entry_id = version.flushed_entry_id;
let version_control = Arc::new(VersionControl::new(version));
replay_memtable(wal, region_id, flushed_entry_id, &version_control).await?;
let region = MitoRegion {
region_id: self.region_id,
version_control,
access_layer: access_layer.clone(),
access_layer,
manifest_manager,
file_purger: Arc::new(LocalFilePurger::new(self.scheduler, access_layer)),
file_purger,
last_flush_millis: AtomicI64::new(current_time_millis()),
};
Ok(region)

View File

@@ -30,7 +30,8 @@ use store_api::storage::SequenceNumber;
use crate::manifest::action::RegionEdit;
use crate::memtable::version::{MemtableVersion, MemtableVersionRef};
use crate::memtable::{MemtableBuilderRef, MemtableRef};
use crate::memtable::{MemtableBuilderRef, MemtableId, MemtableRef};
use crate::sst::file::FileMeta;
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::version::{SstVersion, SstVersionRef};
use crate::wal::EntryId;
@@ -90,11 +91,17 @@ impl VersionControl {
}
/// Apply edit to current version.
pub(crate) fn apply_edit(&self, edit: RegionEdit, purger: FilePurgerRef) {
pub(crate) fn apply_edit(
&self,
edit: RegionEdit,
memtables_to_remove: &[MemtableId],
purger: FilePurgerRef,
) {
let version = self.current().version;
let new_version = Arc::new(
VersionBuilder::from_version(version)
.apply_edit(edit, purger)
.remove_memtables(memtables_to_remove)
.build(),
);
@@ -108,6 +115,26 @@ impl VersionControl {
data.is_dropped = true;
data.version.ssts.mark_all_deleted();
}
/// Alter schema of the region.
///
/// It replaces existing mutable memtable with a memtable that uses the
/// new schema. Memtables of the version must be empty.
pub(crate) fn alter_schema(&self, metadata: RegionMetadataRef, builder: &MemtableBuilderRef) {
let new_mutable = builder.build(&metadata);
let version = self.current().version;
debug_assert!(version.memtables.mutable.is_empty());
debug_assert!(version.memtables.immutables().is_empty());
let new_version = Arc::new(
VersionBuilder::from_version(version)
.metadata(metadata)
.memtables(MemtableVersion::new(new_mutable))
.build(),
);
let mut version_data = self.data.write().unwrap();
version_data.version = new_version;
}
}
pub(crate) type VersionControlRef = Arc<VersionControl>;
@@ -156,7 +183,7 @@ pub(crate) struct VersionBuilder {
impl VersionBuilder {
/// Returns a new builder.
pub(crate) fn new(metadata: RegionMetadataRef, mutable: MemtableRef) -> VersionBuilder {
pub(crate) fn new(metadata: RegionMetadataRef, mutable: MemtableRef) -> Self {
VersionBuilder {
metadata,
memtables: Arc::new(MemtableVersion::new(mutable)),
@@ -166,7 +193,7 @@ impl VersionBuilder {
}
/// Returns a new builder from an existing version.
pub(crate) fn from_version(version: VersionRef) -> VersionBuilder {
pub(crate) fn from_version(version: VersionRef) -> Self {
VersionBuilder {
metadata: version.metadata.clone(),
memtables: version.memtables.clone(),
@@ -176,17 +203,25 @@ impl VersionBuilder {
}
/// Sets memtables.
pub(crate) fn memtables(mut self, memtables: MemtableVersion) -> VersionBuilder {
pub(crate) fn memtables(mut self, memtables: MemtableVersion) -> Self {
self.memtables = Arc::new(memtables);
self
}
/// Sets metadata.
pub(crate) fn metadata(mut self, metadata: RegionMetadataRef) -> Self {
self.metadata = metadata;
self
}
/// Sets flushed entry id.
pub(crate) fn flushed_entry_id(mut self, entry_id: EntryId) -> Self {
self.flushed_entry_id = entry_id;
self
}
/// Apply edit to the builder.
pub(crate) fn apply_edit(
mut self,
edit: RegionEdit,
file_purger: FilePurgerRef,
) -> VersionBuilder {
pub(crate) fn apply_edit(mut self, edit: RegionEdit, file_purger: FilePurgerRef) -> Self {
if let Some(flushed_entry_id) = edit.flushed_entry_id {
self.flushed_entry_id = self.flushed_entry_id.max(flushed_entry_id);
}
@@ -200,6 +235,29 @@ impl VersionBuilder {
self
}
/// Remove memtables from the builder.
pub(crate) fn remove_memtables(mut self, ids: &[MemtableId]) -> Self {
    // Nothing to remove: skip the copy-on-write clone entirely.
    if ids.is_empty() {
        return self;
    }
    let mut memtables = (*self.memtables).clone();
    memtables.remove_memtables(ids);
    self.memtables = Arc::new(memtables);
    self
}
/// Add files to the builder.
pub(crate) fn add_files(
    mut self,
    file_purger: FilePurgerRef,
    files: impl Iterator<Item = FileMeta>,
) -> Self {
    // Copy-on-write: clone the shared SST list, extend it, then re-share it.
    let mut new_ssts = (*self.ssts).clone();
    new_ssts.add_files(file_purger, files);
    self.ssts = Arc::new(new_ssts);
    self
}
/// Builds a new [Version] from the builder.
pub(crate) fn build(self) -> Version {
Version {

View File

@@ -137,25 +137,6 @@ impl TestEnv {
)
}
/// Reopen the engine.
pub async fn reopen_engine_with(
&self,
engine: MitoEngine,
config: MitoConfig,
manager: WriteBufferManagerRef,
listener: Option<EventListenerRef>,
) -> MitoEngine {
engine.stop().await.unwrap();
MitoEngine::new_for_test(
config,
self.logstore.clone().unwrap(),
self.object_store.clone().unwrap(),
manager,
listener,
)
}
/// Creates a new [WorkerGroup] with specific config under this env.
pub(crate) async fn create_worker_group(&self, config: MitoConfig) -> WorkerGroup {
let (log_store, object_store) = self.create_log_and_object_store().await;
@@ -404,6 +385,8 @@ pub async fn check_reader_result<R: BatchReader>(reader: &mut R, expect: &[Batch
}
assert_eq!(expect, result);
// Next call to `next_batch()` still returns None.
assert!(reader.next_batch().await.unwrap().is_none());
}
/// A mock [WriteBufferManager] that supports controlling whether to flush/stall.

View File

@@ -14,6 +14,7 @@
//! Structs and utilities for writing regions.
mod handle_alter;
mod handle_close;
mod handle_compaction;
mod handle_create;
@@ -27,6 +28,7 @@ use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use common_query::Output;
use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
use futures::future::try_join_all;
@@ -35,7 +37,7 @@ use snafu::{ensure, ResultExt};
use store_api::logstore::LogStore;
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{mpsc, Mutex};
use tokio::sync::{mpsc, oneshot, Mutex};
use crate::compaction::CompactionScheduler;
use crate::config::MitoConfig;
@@ -175,6 +177,14 @@ impl WorkerGroup {
}
}
/// Send result to the sender.
pub(crate) fn send_result(sender: Option<oneshot::Sender<Result<Output>>>, res: Result<Output>) {
    let Some(tx) = sender else {
        // No one is waiting for this result.
        return;
    };
    // The receiver may already be dropped; ignore the send error.
    let _ = tx.send(res);
}
// Tests methods.
#[cfg(test)]
impl WorkerGroup {
@@ -489,7 +499,11 @@ impl<S: LogStore> RegionWorkerLoop<S> {
DdlRequest::Drop(_) => self.handle_drop_request(ddl.region_id).await,
DdlRequest::Open(req) => self.handle_open_request(ddl.region_id, req).await,
DdlRequest::Close(_) => self.handle_close_request(ddl.region_id).await,
DdlRequest::Alter(_) => todo!(),
DdlRequest::Alter(req) => {
self.handle_alter_request(ddl.region_id, req, ddl.sender)
.await;
continue;
}
DdlRequest::Flush(_) => {
self.handle_flush_request(ddl.region_id, ddl.sender).await;
continue;

View File

@@ -0,0 +1,139 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Handling alter related requests.
use std::sync::Arc;
use common_query::Output;
use common_telemetry::{error, info};
use snafu::ResultExt;
use store_api::metadata::{RegionMetadata, RegionMetadataBuilder, RegionMetadataRef};
use store_api::region_request::RegionAlterRequest;
use store_api::storage::RegionId;
use tokio::sync::oneshot;
use crate::error::{InvalidMetadataSnafu, InvalidRegionRequestSnafu, RegionNotFoundSnafu, Result};
use crate::flush::FlushReason;
use crate::manifest::action::{RegionChange, RegionMetaAction, RegionMetaActionList};
use crate::memtable::MemtableBuilderRef;
use crate::region::version::Version;
use crate::region::MitoRegionRef;
use crate::request::{DdlRequest, SenderDdlRequest};
use crate::worker::{send_result, RegionWorkerLoop};
impl<S> RegionWorkerLoop<S> {
    /// Handles an alter request for the region.
    ///
    /// If the region's memtables contain data, schedules a flush and parks the
    /// request in the flush scheduler's pending-DDL queue (it is replayed once
    /// the flush succeeds). Otherwise alters the region directly and notifies
    /// `sender` with the outcome.
    pub(crate) async fn handle_alter_request(
        &mut self,
        region_id: RegionId,
        request: RegionAlterRequest,
        sender: Option<oneshot::Sender<Result<Output>>>,
    ) {
        let Some(region) = self.regions.get_region(region_id) else {
            send_result(sender, RegionNotFoundSnafu { region_id }.fail());
            return;
        };

        info!("Try to alter region: {}, request: {:?}", region_id, request);

        // Get the version before alter.
        let version = region.version();
        // Checks whether we can alter the region directly.
        if !version.memtables.is_empty() {
            // If memtable is not empty, we can't alter it directly and need to flush
            // all memtables first.
            info!("Flush region: {} before alteration", region_id);

            // Try to submit a flush task.
            let task = self.new_flush_task(&region, FlushReason::Alter);
            if let Err(e) = self.flush_scheduler.schedule_flush(&region, task) {
                // Unable to flush the region, send error to waiter.
                send_result(sender, Err(e));
                return;
            }
            // Safety: We have requested flush.
            self.flush_scheduler
                .add_ddl_request_to_pending(SenderDdlRequest {
                    region_id,
                    sender,
                    request: DdlRequest::Alter(request),
                });
            return;
        }

        // Now we can alter the region directly.
        if let Err(e) =
            alter_region_schema(&region, &version, request, &self.memtable_builder).await
        {
            error!(e; "Failed to alter region schema, region_id: {}", region_id);
            send_result(sender, Err(e));
            return;
        }

        info!(
            "Schema of region {} is altered from {} to {}",
            region_id,
            version.metadata.schema_version,
            region.metadata().schema_version
        );

        // Notifies waiters.
        send_result(sender, Ok(Output::AffectedRows(0)));
    }
}
/// Alter the schema of the region.
///
/// Validates the request, persists the new metadata to the manifest, and only
/// then swaps the in-memory version to the new schema.
async fn alter_region_schema(
    region: &MitoRegionRef,
    version: &Version,
    request: RegionAlterRequest,
    builder: &MemtableBuilderRef,
) -> Result<()> {
    let new_meta = metadata_after_alteration(&version.metadata, request)?;

    // Persist the metadata change to the region's manifest first so it
    // survives a restart before the in-memory state is mutated.
    let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
        metadata: new_meta.clone(),
    }));
    region.manifest_manager.update(action_list).await?;

    // Apply the metadata to region's version.
    region.version_control.alter_schema(new_meta, builder);
    Ok(())
}
/// Creates a metadata after applying the alter `request` to the old `metadata`.
///
/// # Errors
/// Returns an error if the `request` is invalid for the current metadata, or
/// if the altered metadata fails to build.
fn metadata_after_alteration(
    metadata: &RegionMetadata,
    request: RegionAlterRequest,
) -> Result<RegionMetadataRef> {
    // Reject requests that don't match the current metadata.
    request
        .validate(metadata)
        .context(InvalidRegionRequestSnafu)?;

    let mut meta_builder = RegionMetadataBuilder::from_existing(metadata.clone());
    meta_builder
        .alter(request.kind)
        .context(InvalidRegionRequestSnafu)?
        .bump_version();
    let new_meta = meta_builder.build().context(InvalidMetadataSnafu)?;
    // The alteration must advance the schema version by exactly one.
    assert_eq!(request.schema_version + 1, new_meta.schema_version);

    Ok(Arc::new(new_meta))
}

View File

@@ -23,7 +23,7 @@ use crate::error::{RegionNotFoundSnafu, Result};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::region::MitoRegionRef;
use crate::request::{CompactionFailed, CompactionFinished};
use crate::worker::RegionWorkerLoop;
use crate::worker::{send_result, RegionWorkerLoop};
impl<S: LogStore> RegionWorkerLoop<S> {
/// Handles compaction request submitted to region worker.
@@ -33,9 +33,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
sender: Option<oneshot::Sender<Result<Output>>>,
) {
let Some(region) = self.regions.get_region(region_id) else {
if let Some(sender) = sender {
let _ = sender.send(RegionNotFoundSnafu { region_id }.fail());
}
send_result(sender, RegionNotFoundSnafu { region_id }.fail());
return;
};
@@ -78,7 +76,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Apply edit to region's version.
region
.version_control
.apply_edit(edit, region.file_purger.clone());
.apply_edit(edit, &[], region.file_purger.clone());
request.on_success();
}

View File

@@ -26,7 +26,7 @@ use crate::flush::{FlushReason, RegionFlushTask};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::region::MitoRegionRef;
use crate::request::{FlushFailed, FlushFinished};
use crate::worker::RegionWorkerLoop;
use crate::worker::{send_result, RegionWorkerLoop};
impl<S: LogStore> RegionWorkerLoop<S> {
/// On region flush job finished.
@@ -56,9 +56,11 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
// Apply edit to region's version.
region
.version_control
.apply_edit(edit, region.file_purger.clone());
region.version_control.apply_edit(
edit,
&request.memtables_to_remove,
region.file_purger.clone(),
);
region.update_flush_millis();
// Delete wal.
@@ -75,9 +77,14 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Notifies waiters.
request.on_success();
// Handle pending DDL requests for the region.
if let Some(ddl_requests) = self.flush_scheduler.on_flush_success(region_id) {
// Handle pending requests for the region.
if let Some((ddl_requests, write_requests)) =
self.flush_scheduler.on_flush_success(region_id)
{
// Perform DDLs first because they require empty memtables.
self.handle_ddl_requests(ddl_requests).await;
// Handle pending write requests, we don't stall these requests.
self.handle_write_requests(write_requests, false).await;
}
// Handle stalled requests.
@@ -97,9 +104,7 @@ impl<S> RegionWorkerLoop<S> {
sender: Option<oneshot::Sender<Result<Output>>>,
) {
let Some(region) = self.regions.get_region(region_id) else {
if let Some(sender) = sender {
let _ = sender.send(RegionNotFoundSnafu { region_id }.fail());
}
send_result(sender, RegionNotFoundSnafu { region_id }.fail());
return;
};
@@ -174,7 +179,12 @@ impl<S> RegionWorkerLoop<S> {
Ok(())
}
fn new_flush_task(&self, region: &MitoRegionRef, reason: FlushReason) -> RegionFlushTask {
/// Create a flush task with specific `reason` for the `region`.
pub(crate) fn new_flush_task(
&self,
region: &MitoRegionRef,
reason: FlushReason,
) -> RegionFlushTask {
// TODO(yingwen): metrics for flush requested.
RegionFlushTask {
region_id: region.region_id,

View File

@@ -17,16 +17,14 @@
use std::collections::{hash_map, HashMap};
use std::sync::Arc;
use common_query::Output;
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadata;
use store_api::storage::RegionId;
use tokio::sync::oneshot::Sender;
use crate::error::{RegionNotFoundSnafu, RejectWriteSnafu, Result};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::{SenderWriteRequest, WriteRequest};
use crate::worker::RegionWorkerLoop;
use crate::worker::{send_result, RegionWorkerLoop};
impl<S: LogStore> RegionWorkerLoop<S> {
/// Takes and handles all write requests.
@@ -93,6 +91,15 @@ impl<S> RegionWorkerLoop<S> {
for mut sender_req in write_requests {
let region_id = sender_req.request.region_id;
// If region is waiting for alteration, add requests to pending writes.
if self.flush_scheduler.has_pending_ddls(region_id) {
// TODO(yingwen): consider adding some metrics for this.
// Safety: The region has pending ddls.
self.flush_scheduler
.add_write_request_to_pending(sender_req);
continue;
}
// Checks whether the region exists and is it stalling.
if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
let Some(region) = self.regions.get_region(region_id) else {
@@ -167,11 +174,3 @@ fn maybe_fill_missing_columns(request: &mut WriteRequest, metadata: &RegionMetad
Ok(())
}
/// Send result to the request.
fn send_result(sender: Option<Sender<Result<Output>>>, res: Result<Output>) {
if let Some(sender) = sender {
// Ignore send result.
let _ = sender.send(res);
}
}

View File

@@ -32,6 +32,7 @@ use serde::de::Error;
use serde::{Deserialize, Deserializer, Serialize};
use snafu::{ensure, Location, OptionExt, ResultExt, Snafu};
use crate::region_request::{AddColumn, AddColumnLocation, AlterKind};
use crate::storage::{ColumnId, RegionId};
pub type Result<T> = std::result::Result<T, MetadataError>;
@@ -127,6 +128,10 @@ pub struct RegionMetadata {
/// Immutable and unique id of a region.
pub region_id: RegionId,
/// Current version of the region schema.
///
/// The version starts from 0. Altering the schema bumps the version.
pub schema_version: u64,
}
pub type RegionMetadataRef = Arc<RegionMetadata>;
@@ -142,6 +147,7 @@ impl<'de> Deserialize<'de> for RegionMetadata {
column_metadatas: Vec<ColumnMetadata>,
primary_key: Vec<ColumnId>,
region_id: RegionId,
schema_version: u64,
}
let without_schema = RegionMetadataWithoutSchema::deserialize(deserializer)?;
@@ -155,6 +161,7 @@ impl<'de> Deserialize<'de> for RegionMetadata {
column_metadatas: without_schema.column_metadatas,
primary_key: without_schema.primary_key,
region_id: without_schema.region_id,
schema_version: without_schema.schema_version,
})
}
}
@@ -378,6 +385,7 @@ pub struct RegionMetadataBuilder {
region_id: RegionId,
column_metadatas: Vec<ColumnMetadata>,
primary_key: Vec<ColumnId>,
schema_version: u64,
}
impl RegionMetadataBuilder {
@@ -387,31 +395,50 @@ impl RegionMetadataBuilder {
region_id: id,
column_metadatas: vec![],
primary_key: vec![],
schema_version: 0,
}
}
/// Create a builder from existing [RegionMetadata].
/// Creates a builder from existing [RegionMetadata].
///
/// All fields (columns, primary key, region id and schema version) are
/// carried over unchanged, so the builder starts from the region's
/// current schema.
pub fn from_existing(existing: RegionMetadata) -> Self {
    Self {
        column_metadatas: existing.column_metadatas,
        primary_key: existing.primary_key,
        region_id: existing.region_id,
        schema_version: existing.schema_version,
    }
}
/// Push a new column metadata to this region's metadata.
/// Pushes a new column metadata to this region's metadata.
///
/// The column is appended at the end of the column list. Returns
/// `&mut Self` so calls can be chained.
pub fn push_column_metadata(&mut self, column_metadata: ColumnMetadata) -> &mut Self {
    self.column_metadatas.push(column_metadata);
    self
}
/// Set the primary key of the region.
/// Sets the primary key of the region.
///
/// Replaces any previously configured key; `key` lists column ids.
pub fn primary_key(&mut self, key: Vec<ColumnId>) -> &mut Self {
    self.primary_key = key;
    self
}
/// Consume the builder and build a [RegionMetadata].
/// Increases the schema version by 1.
///
/// Called when an alteration is applied so the built metadata carries
/// a newer `schema_version` than the original. Returns `&mut Self`
/// for chaining.
pub fn bump_version(&mut self) -> &mut Self {
    self.schema_version += 1;
    self
}
/// Applies the alter `kind` to the builder.
///
/// The `kind` should be valid.
///
/// # Errors
/// Adding columns can fail (e.g. when the anchor column of an
/// `After { .. }` location does not exist); dropping columns is
/// infallible here — whether a column may be dropped is checked
/// earlier by `AlterKind::validate`.
pub fn alter(&mut self, kind: AlterKind) -> Result<&mut Self> {
    match kind {
        AlterKind::AddColumns { columns } => self.add_columns(columns)?,
        AlterKind::DropColumns { names } => self.drop_columns(&names),
    }
    Ok(self)
}
/// Consumes the builder and build a [RegionMetadata].
pub fn build(self) -> Result<RegionMetadata> {
let skipped = SkippedFields::new(&self.column_metadatas)?;
@@ -422,12 +449,58 @@ impl RegionMetadataBuilder {
column_metadatas: self.column_metadatas,
primary_key: self.primary_key,
region_id: self.region_id,
schema_version: self.schema_version,
};
meta.validate()?;
Ok(meta)
}
/// Adds columns to the metadata.
///
/// Each column is inserted according to its optional location:
/// - `None`: appended at the end of the column list;
/// - `Some(First)`: inserted at index 0;
/// - `Some(After { column_name })`: inserted right after the named
///   column, or an `InvalidRegionRequest` error if that column is absent.
///
/// Every new `Tag` column is also appended to the primary key.
fn add_columns(&mut self, columns: Vec<AddColumn>) -> Result<()> {
    for add_column in columns {
        // Capture id and semantic type before `column_metadata` is moved
        // into the column list below.
        let column_id = add_column.column_metadata.column_id;
        let semantic_type = add_column.column_metadata.semantic_type;
        match add_column.location {
            None => {
                self.column_metadatas.push(add_column.column_metadata);
            }
            Some(AddColumnLocation::First) => {
                self.column_metadatas.insert(0, add_column.column_metadata);
            }
            Some(AddColumnLocation::After { column_name }) => {
                let pos = self
                    .column_metadatas
                    .iter()
                    .position(|col| col.column_schema.name == column_name)
                    .context(InvalidRegionRequestSnafu {
                        region_id: self.region_id,
                        err: format!(
                            "column {} not found, failed to add column {} after it",
                            column_name, add_column.column_metadata.column_schema.name
                        ),
                    })?;
                // Insert after pos.
                self.column_metadatas
                    .insert(pos + 1, add_column.column_metadata);
            }
        }
        if semantic_type == SemanticType::Tag {
            // For a new tag, we extend the primary key.
            self.primary_key.push(column_id);
        }
    }
    Ok(())
}
/// Drops columns from the metadata.
///
/// Names not present in the metadata are silently ignored. The primary
/// key is not touched here; callers are expected to have rejected
/// non-field drops already (see `AlterKind::validate`).
fn drop_columns(&mut self, names: &[String]) {
    // Build a set once so the retain scan is O(columns + names),
    // not O(columns * names).
    let name_set: HashSet<_> = names.iter().collect();
    self.column_metadatas
        .retain(|col| !name_set.contains(&col.column_schema.name));
}
}
/// Fields skipped in serialization.
@@ -497,7 +570,7 @@ pub enum MetadataError {
},
#[snafu(display(
"Failed to convert with struct from datatypes, location: {}, source: {}",
"Failed to convert struct from datatypes, location: {}, source: {}",
location,
source
))]
@@ -506,8 +579,20 @@ pub enum MetadataError {
source: datatypes::error::Error,
},
#[snafu(display("Invalid raw region request: {err}, at {location}"))]
#[snafu(display("Invalid raw region request, err: {}, location: {}", err, location))]
InvalidRawRegionRequest { err: String, location: Location },
#[snafu(display(
"Invalid region request, region_id: {}, err: {}, location: {}",
region_id,
err,
location
))]
InvalidRegionRequest {
region_id: RegionId,
err: String,
location: Location,
},
}
impl ErrorExt for MetadataError {
@@ -812,4 +897,113 @@ mod test {
"unexpected err: {err}",
);
}
// Bumping the version must be the ONLY difference between the rebuilt
// metadata and the original.
#[test]
fn test_bump_version() {
    let mut region_metadata = build_test_region_metadata();
    let mut builder = RegionMetadataBuilder::from_existing(region_metadata.clone());
    builder.bump_version();
    let new_meta = builder.build().unwrap();
    region_metadata.schema_version += 1;
    assert_eq!(region_metadata, new_meta);
}
// Test helper: builds a nullable string column with the given name and id,
// as a Tag when `is_tag` is true, otherwise as a Field.
fn new_column_metadata(name: &str, is_tag: bool, column_id: ColumnId) -> ColumnMetadata {
    let semantic_type = if is_tag {
        SemanticType::Tag
    } else {
        SemanticType::Field
    };
    ColumnMetadata {
        column_schema: ColumnSchema::new(name, ConcreteDataType::string_datatype(), true),
        semantic_type,
        column_id,
    }
}
// Test helper: asserts that the metadata's columns have exactly the given
// names, in order.
fn check_columns(metadata: &RegionMetadata, names: &[&str]) {
    let actual: Vec<_> = metadata
        .column_metadatas
        .iter()
        .map(|col| &col.column_schema.name)
        .collect();
    assert_eq!(names, actual);
}
// Exercises RegionMetadataBuilder::alter end to end: add-at-end,
// add-first, add-after, dropping fields, and the failure case of
// dropping a primary-key column.
#[test]
fn test_alter() {
    // a (tag), b (field), c (ts)
    let metadata = build_test_region_metadata();
    let mut builder = RegionMetadataBuilder::from_existing(metadata);
    // Add a tag at the default (last) position; it also extends the
    // primary key.
    builder
        .alter(AlterKind::AddColumns {
            columns: vec![AddColumn {
                column_metadata: new_column_metadata("d", true, 4),
                location: None,
            }],
        })
        .unwrap();
    let metadata = builder.build().unwrap();
    check_columns(&metadata, &["a", "b", "c", "d"]);
    assert_eq!([1, 4], &metadata.primary_key[..]);

    // Add a field at the first position.
    let mut builder = RegionMetadataBuilder::from_existing(metadata);
    builder
        .alter(AlterKind::AddColumns {
            columns: vec![AddColumn {
                column_metadata: new_column_metadata("e", false, 5),
                location: Some(AddColumnLocation::First),
            }],
        })
        .unwrap();
    let metadata = builder.build().unwrap();
    check_columns(&metadata, &["e", "a", "b", "c", "d"]);

    // Add a field after a column in the middle.
    let mut builder = RegionMetadataBuilder::from_existing(metadata);
    builder
        .alter(AlterKind::AddColumns {
            columns: vec![AddColumn {
                column_metadata: new_column_metadata("f", false, 6),
                location: Some(AddColumnLocation::After {
                    column_name: "b".to_string(),
                }),
            }],
        })
        .unwrap();
    let metadata = builder.build().unwrap();
    check_columns(&metadata, &["e", "a", "b", "f", "c", "d"]);

    // Add a field after the last column.
    let mut builder = RegionMetadataBuilder::from_existing(metadata);
    builder
        .alter(AlterKind::AddColumns {
            columns: vec![AddColumn {
                column_metadata: new_column_metadata("g", false, 7),
                location: Some(AddColumnLocation::After {
                    column_name: "d".to_string(),
                }),
            }],
        })
        .unwrap();
    let metadata = builder.build().unwrap();
    check_columns(&metadata, &["e", "a", "b", "f", "c", "d", "g"]);

    // Drop two fields.
    let mut builder = RegionMetadataBuilder::from_existing(metadata);
    builder
        .alter(AlterKind::DropColumns {
            names: vec!["g".to_string(), "e".to_string()],
        })
        .unwrap();
    let metadata = builder.build().unwrap();
    check_columns(&metadata, &["a", "b", "f", "c", "d"]);

    // Dropping a primary-key column leaves the key referencing a missing
    // column, which build() rejects.
    let mut builder = RegionMetadataBuilder::from_existing(metadata);
    builder
        .alter(AlterKind::DropColumns {
            names: vec!["a".to_string()],
        })
        .unwrap();
    // Build returns error as the primary key has more columns.
    let err = builder.build().unwrap_err();
    assert_eq!(StatusCode::InvalidArguments, err.status_code());
}
}

View File

@@ -12,15 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use api::v1::region::region_request;
use api::v1::Rows;
use snafu::OptionExt;
use api::v1::add_column_location::LocationType;
use api::v1::region::{alter_request, region_request, AlterRequest};
use api::v1::{self, Rows, SemanticType};
use snafu::{ensure, OptionExt};
use crate::metadata::{ColumnMetadata, InvalidRawRegionRequestSnafu, MetadataError};
use crate::metadata::{
ColumnMetadata, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu, MetadataError,
RegionMetadata, Result,
};
use crate::path_utils::region_dir;
use crate::storage::{AlterRequest, ColumnId, RegionId, ScanRequest};
use crate::storage::{ColumnId, RegionId, ScanRequest};
#[derive(Debug)]
pub enum RegionRequest {
@@ -39,11 +43,7 @@ pub enum RegionRequest {
impl RegionRequest {
/// Convert [Body](region_request::Body) to a group of [RegionRequest] with region id.
/// Inserts/Deletes request might become multiple requests. Others are one-to-one.
// TODO: implement alter request
#[allow(unreachable_code)]
pub fn try_from_request_body(
body: region_request::Body,
) -> Result<Vec<(RegionId, Self)>, MetadataError> {
pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
match body {
region_request::Body::Inserts(inserts) => Ok(inserts
.requests
@@ -68,7 +68,7 @@ impl RegionRequest {
.column_defs
.into_iter()
.map(ColumnMetadata::try_from_column_def)
.collect::<Result<Vec<_>, _>>()?;
.collect::<Result<Vec<_>>>()?;
let region_id = create.region_id.into();
let region_dir = region_dir(&create.catalog, &create.schema, region_id);
Ok(vec![(
@@ -103,25 +103,10 @@ impl RegionRequest {
close.region_id.into(),
Self::Close(RegionCloseRequest {}),
)]),
region_request::Body::Alter(alter) => {
let kind = alter.kind.context(InvalidRawRegionRequestSnafu {
err: "'kind' is absent",
})?;
Ok(vec![(
alter.region_id.into(),
Self::Alter(RegionAlterRequest {
request: AlterRequest {
operation: kind.try_into().map_err(|e| {
InvalidRawRegionRequestSnafu {
err: format!("{e}"),
}
.build()
})?,
version: alter.schema_version as _,
},
}),
)])
}
region_request::Body::Alter(alter) => Ok(vec![(
alter.region_id.into(),
Self::Alter(RegionAlterRequest::try_from(alter)?),
)]),
region_request::Body::Flush(flush) => Ok(vec![(
flush.region_id.into(),
Self::Flush(RegionFlushRequest {}),
@@ -189,9 +174,236 @@ pub struct RegionOpenRequest {
#[derive(Debug)]
pub struct RegionCloseRequest {}
#[derive(Debug)]
/// Alter metadata of a region.
#[derive(Debug, PartialEq, Eq)]
pub struct RegionAlterRequest {
pub request: AlterRequest,
/// The version of the schema before applying the alteration.
pub schema_version: u64,
/// Kind of alteration to do.
pub kind: AlterKind,
}
impl RegionAlterRequest {
    /// Checks whether the request is valid, returns an error if it is invalid.
    ///
    /// Validation is two-step:
    /// 1. the request's `schema_version` must equal the region's current
    ///    `metadata.schema_version`, rejecting requests built against a
    ///    stale schema;
    /// 2. the [AlterKind] must itself be applicable to `metadata`
    ///    (see [AlterKind::validate]).
    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
        ensure!(
            metadata.schema_version == self.schema_version,
            InvalidRegionRequestSnafu {
                region_id: metadata.region_id,
                err: format!(
                    "region schema version {} is not equal to request schema version {}",
                    metadata.schema_version, self.schema_version
                ),
            }
        );
        self.kind.validate(metadata)?;
        Ok(())
    }
}
impl TryFrom<AlterRequest> for RegionAlterRequest {
    type Error = MetadataError;

    /// Converts a protobuf [AlterRequest] into the in-memory request.
    ///
    /// Fails with `InvalidRawRegionRequest` when `kind` is absent, and
    /// propagates conversion errors from the nested kind
    /// (see `AlterKind::try_from`).
    fn try_from(value: AlterRequest) -> Result<Self> {
        // `kind` is optional in protobuf; a request without it is malformed.
        let kind = value.kind.context(InvalidRawRegionRequestSnafu {
            err: "missing kind in AlterRequest",
        })?;
        let kind = AlterKind::try_from(kind)?;
        Ok(RegionAlterRequest {
            schema_version: value.schema_version,
            kind,
        })
    }
}
/// Kind of the alteration.
#[derive(Debug, PartialEq, Eq)]
pub enum AlterKind {
    /// Add columns to the region.
    AddColumns {
        /// Columns to add.
        columns: Vec<AddColumn>,
    },
    /// Drop columns from the region, only fields are allowed to drop.
    DropColumns {
        /// Name of columns to drop.
        names: Vec<String>,
    },
}
impl AlterKind {
    /// Returns an error if the the alter kind is invalid.
    ///
    /// For `AddColumns`: rejects the same column name appearing twice in
    /// one request, and delegates per-column checks to [AddColumn::validate].
    /// For `DropColumns`: every named column must exist and be a field.
    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
        match self {
            AlterKind::AddColumns { columns } => {
                // Tracks names already seen in THIS request to catch
                // duplicates among the columns being added.
                let mut names = HashSet::with_capacity(columns.len());
                for col_to_add in columns {
                    ensure!(
                        !names.contains(&col_to_add.column_metadata.column_schema.name),
                        InvalidRegionRequestSnafu {
                            region_id: metadata.region_id,
                            err: format!(
                                "add column {} more than once",
                                col_to_add.column_metadata.column_schema.name
                            ),
                        }
                    );
                    col_to_add.validate(metadata)?;
                    names.insert(&col_to_add.column_metadata.column_schema.name);
                }
            }
            AlterKind::DropColumns { names } => {
                for name in names {
                    Self::validate_column_to_drop(name, metadata)?;
                }
            }
        }
        Ok(())
    }

    /// Returns an error if the column to drop is invalid.
    ///
    /// The column must exist in `metadata` and be a `Field`; timestamps and
    /// tags cannot be dropped.
    fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
        let column = metadata
            .column_by_name(name)
            .with_context(|| InvalidRegionRequestSnafu {
                region_id: metadata.region_id,
                err: format!("column {} does not exist", name),
            })?;
        ensure!(
            column.semantic_type == SemanticType::Field,
            InvalidRegionRequestSnafu {
                region_id: metadata.region_id,
                err: format!("column {} is not a field and could not be dropped", name),
            }
        );
        Ok(())
    }
}
impl TryFrom<alter_request::Kind> for AlterKind {
    type Error = MetadataError;

    /// Converts the protobuf alter kind into [AlterKind].
    ///
    /// `AddColumns` conversion is fallible (each column goes through
    /// `AddColumn::try_from` and short-circuits on the first error);
    /// `DropColumns` only extracts the names and cannot fail.
    fn try_from(kind: alter_request::Kind) -> Result<Self> {
        let alter_kind = match kind {
            alter_request::Kind::AddColumns(x) => {
                let columns = x
                    .add_columns
                    .into_iter()
                    .map(|x| x.try_into())
                    .collect::<Result<Vec<_>>>()?;
                AlterKind::AddColumns { columns }
            }
            alter_request::Kind::DropColumns(x) => {
                let names = x.drop_columns.into_iter().map(|x| x.name).collect();
                AlterKind::DropColumns { names }
            }
        };
        Ok(alter_kind)
    }
}
/// Adds a column.
#[derive(Debug, PartialEq, Eq)]
pub struct AddColumn {
    /// Metadata of the column to add.
    pub column_metadata: ColumnMetadata,
    /// Location to add the column. If location is None, the region adds
    /// the column to the last.
    pub location: Option<AddColumnLocation>,
}
impl AddColumn {
    /// Returns an error if the column to add is invalid.
    ///
    /// Two requirements:
    /// 1. the column must be nullable or carry a default constraint,
    ///    so existing rows have a value for it;
    /// 2. no column with the same name may already exist in `metadata`.
    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
        ensure!(
            self.column_metadata.column_schema.is_nullable()
                || self
                    .column_metadata
                    .column_schema
                    .default_constraint()
                    .is_some(),
            InvalidRegionRequestSnafu {
                region_id: metadata.region_id,
                err: format!(
                    "no default value for column {}",
                    self.column_metadata.column_schema.name
                ),
            }
        );

        ensure!(
            metadata
                .column_by_name(&self.column_metadata.column_schema.name)
                .is_none(),
            InvalidRegionRequestSnafu {
                region_id: metadata.region_id,
                err: format!(
                    "column {} already exists",
                    self.column_metadata.column_schema.name
                ),
            }
        );

        Ok(())
    }
}
impl TryFrom<v1::region::AddColumn> for AddColumn {
    type Error = MetadataError;

    /// Converts a protobuf `AddColumn` into the in-memory form.
    ///
    /// Fails when `column_def` is absent, when the column def cannot be
    /// converted to [ColumnMetadata], or when the optional location is
    /// invalid (see `AddColumnLocation::try_from`).
    fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
        let column_def = add_column
            .column_def
            .context(InvalidRawRegionRequestSnafu {
                err: "missing column_def in AddColumn",
            })?;

        let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
        // `transpose` turns Option<Result<_>> into Result<Option<_>> so a
        // bad location becomes an error while a missing one stays None.
        let location = add_column
            .location
            .map(AddColumnLocation::try_from)
            .transpose()?;

        Ok(AddColumn {
            column_metadata,
            location,
        })
    }
}
/// Location to add a column.
#[derive(Debug, PartialEq, Eq)]
pub enum AddColumnLocation {
    /// Add the column to the first position of columns.
    First,
    /// Add the column after specific column.
    After {
        /// Add the column after this column.
        column_name: String,
    },
}
impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
    type Error = MetadataError;

    /// Converts a protobuf location into [AddColumnLocation].
    ///
    /// An unknown `location_type` discriminant (not a `LocationType`
    /// variant) yields an `InvalidRawRegionRequest` error. For the `After`
    /// variant the protobuf `after_column_name` becomes `column_name`.
    fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
        let location_type = LocationType::from_i32(location.location_type).context(
            InvalidRawRegionRequestSnafu {
                err: format!("unknown location type {}", location.location_type),
            },
        )?;
        let add_column_location = match location_type {
            LocationType::First => AddColumnLocation::First,
            LocationType::After => AddColumnLocation::After {
                column_name: location.after_column_name,
            },
        };

        Ok(add_column_location)
    }
}
#[derive(Debug)]
@@ -199,3 +411,297 @@ pub struct RegionFlushRequest {}
#[derive(Debug)]
pub struct RegionCompactRequest {}
#[cfg(test)]
mod tests {
use api::v1::region::RegionColumnDef;
use api::v1::{ColumnDataType, ColumnDef};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use super::*;
use crate::metadata::RegionMetadataBuilder;
// Conversion of protobuf locations: First, an unknown discriminant (10)
// which must fail, and After with a column name.
#[test]
fn test_from_proto_location() {
    let proto_location = v1::AddColumnLocation {
        location_type: LocationType::First as i32,
        after_column_name: "".to_string(),
    };
    let location = AddColumnLocation::try_from(proto_location).unwrap();
    assert_eq!(location, AddColumnLocation::First);

    // 10 is not a valid LocationType discriminant.
    let proto_location = v1::AddColumnLocation {
        location_type: 10,
        after_column_name: "".to_string(),
    };
    AddColumnLocation::try_from(proto_location).unwrap_err();

    let proto_location = v1::AddColumnLocation {
        location_type: LocationType::After as i32,
        after_column_name: "a".to_string(),
    };
    let location = AddColumnLocation::try_from(proto_location).unwrap();
    assert_eq!(
        location,
        AddColumnLocation::After {
            column_name: "a".to_string()
        }
    );
}
// A protobuf AddColumn without a column_def must fail to convert.
#[test]
fn test_from_none_proto_add_column() {
    AddColumn::try_from(v1::region::AddColumn {
        column_def: None,
        location: None,
    })
    .unwrap_err();
}
// Converting a protobuf AlterRequest: missing `kind` fails; a full
// AddColumns request round-trips into the expected RegionAlterRequest.
#[test]
fn test_from_proto_alter_request() {
    RegionAlterRequest::try_from(AlterRequest {
        region_id: 0,
        schema_version: 1,
        kind: None,
    })
    .unwrap_err();

    let request = RegionAlterRequest::try_from(AlterRequest {
        region_id: 0,
        schema_version: 1,
        kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
            add_columns: vec![v1::region::AddColumn {
                column_def: Some(RegionColumnDef {
                    column_def: Some(ColumnDef {
                        name: "a".to_string(),
                        data_type: ColumnDataType::String as i32,
                        is_nullable: true,
                        default_constraint: vec![],
                        semantic_type: SemanticType::Field as i32,
                    }),
                    column_id: 1,
                }),
                location: Some(v1::AddColumnLocation {
                    location_type: LocationType::First as i32,
                    after_column_name: "".to_string(),
                }),
            }],
        })),
    })
    .unwrap();

    assert_eq!(
        request,
        RegionAlterRequest {
            schema_version: 1,
            kind: AlterKind::AddColumns {
                columns: vec![AddColumn {
                    column_metadata: ColumnMetadata {
                        column_schema: ColumnSchema::new(
                            "a",
                            ConcreteDataType::string_datatype(),
                            true,
                        ),
                        semantic_type: SemanticType::Field,
                        column_id: 1,
                    },
                    location: Some(AddColumnLocation::First),
                }]
            },
        }
    );
}
// Test helper: metadata with ts (timestamp, id 1), tag_0 (tag, id 2,
// primary key) and field_0 (field, id 3).
fn new_metadata() -> RegionMetadata {
    let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
    builder
        .push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: 1,
        })
        .push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new(
                "tag_0",
                ConcreteDataType::string_datatype(),
                true,
            ),
            semantic_type: SemanticType::Tag,
            column_id: 2,
        })
        .push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new(
                "field_0",
                ConcreteDataType::string_datatype(),
                true,
            ),
            semantic_type: SemanticType::Field,
            column_id: 3,
        })
        .primary_key(vec![2]);
    builder.build().unwrap()
}
// AddColumn::validate: a nullable new column passes; a non-nullable
// column without a default fails; a duplicate of an existing column
// name fails.
#[test]
fn test_add_column_validate() {
    let metadata = new_metadata();
    AddColumn {
        column_metadata: ColumnMetadata {
            column_schema: ColumnSchema::new(
                "tag_1",
                ConcreteDataType::string_datatype(),
                true,
            ),
            semantic_type: SemanticType::Tag,
            column_id: 4,
        },
        location: None,
    }
    .validate(&metadata)
    .unwrap();

    // Not nullable and no default constraint: rejected.
    AddColumn {
        column_metadata: ColumnMetadata {
            column_schema: ColumnSchema::new(
                "tag_1",
                ConcreteDataType::string_datatype(),
                false,
            ),
            semantic_type: SemanticType::Tag,
            column_id: 4,
        },
        location: None,
    }
    .validate(&metadata)
    .unwrap_err();

    // "tag_0" already exists in the metadata: rejected.
    AddColumn {
        column_metadata: ColumnMetadata {
            column_schema: ColumnSchema::new(
                "tag_0",
                ConcreteDataType::string_datatype(),
                true,
            ),
            semantic_type: SemanticType::Tag,
            column_id: 4,
        },
        location: None,
    }
    .validate(&metadata)
    .unwrap_err();
}
// Adding two columns with the same name ("tag_1") in a single request
// must be rejected by AlterKind::validate.
#[test]
fn test_add_duplicate_columns() {
    let kind = AlterKind::AddColumns {
        columns: vec![
            AddColumn {
                column_metadata: ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        "tag_1",
                        ConcreteDataType::string_datatype(),
                        true,
                    ),
                    semantic_type: SemanticType::Tag,
                    column_id: 4,
                },
                location: None,
            },
            AddColumn {
                column_metadata: ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        "tag_1",
                        ConcreteDataType::string_datatype(),
                        true,
                    ),
                    semantic_type: SemanticType::Field,
                    column_id: 5,
                },
                location: None,
            },
        ],
    };
    let metadata = new_metadata();
    kind.validate(&metadata).unwrap_err();
}
// Dropping an unknown column or a tag fails; dropping a field succeeds.
#[test]
fn test_validate_drop_column() {
    let metadata = new_metadata();
    AlterKind::DropColumns {
        names: vec!["xxxx".to_string()],
    }
    .validate(&metadata)
    .unwrap_err();
    AlterKind::DropColumns {
        names: vec!["tag_0".to_string()],
    }
    .validate(&metadata)
    .unwrap_err();
    AlterKind::DropColumns {
        names: vec!["field_0".to_string()],
    }
    .validate(&metadata)
    .unwrap();
}
// A request whose schema_version (1) differs from the region's (2)
// must fail validation even if the kind itself is valid.
#[test]
fn test_validate_schema_version() {
    let mut metadata = new_metadata();
    metadata.schema_version = 2;
    RegionAlterRequest {
        schema_version: 1,
        kind: AlterKind::DropColumns {
            names: vec!["field_0".to_string()],
        },
    }
    .validate(&metadata)
    .unwrap_err();
}
// A well-formed AddColumns request with matching schema version passes
// full RegionAlterRequest::validate.
#[test]
fn test_validate_add_columns() {
    let kind = AlterKind::AddColumns {
        columns: vec![
            AddColumn {
                column_metadata: ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        "tag_1",
                        ConcreteDataType::string_datatype(),
                        true,
                    ),
                    semantic_type: SemanticType::Tag,
                    column_id: 4,
                },
                location: None,
            },
            AddColumn {
                column_metadata: ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        "field_1",
                        ConcreteDataType::string_datatype(),
                        true,
                    ),
                    semantic_type: SemanticType::Field,
                    column_id: 5,
                },
                location: None,
            },
        ],
    };
    let request = RegionAlterRequest {
        schema_version: 1,
        kind,
    };
    let mut metadata = new_metadata();
    metadata.schema_version = 1;
    request.validate(&metadata).unwrap();
}
}