feat: Implement append mode for a region (#3558)

* feat: add dedup option to merge reader

* test: test merger

* feat: append mode option

* feat: implement append mode for regions

* feat: only allow put under append mode

* feat: always create builder

* test: test append mode

* style: fix clippy

* test: trigger compaction

* chore: fix compiler errors
Author: Yingwen
Date: 2024-03-27 11:21:22 +08:00 (committed by GitHub)
Parent: 653697f1d5
Commit: 922b1a9b66
12 changed files with 372 additions and 65 deletions
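
In short: a region created with the `append_mode` option skips deduplication everywhere (memtable, merge reader, compaction) and accepts only put requests. A hedged usage sketch based on the engine tests added in this commit (`CreateRequestBuilder` and the option name come from the diffs below; the surrounding engine setup is assumed):

// Create a region that keeps duplicate rows.
let request = CreateRequestBuilder::new()
    .insert_option("append_mode", "true")
    .build();
engine
    .handle_request(region_id, RegionRequest::Create(request))
    .await?;
// Writing the same (tag, timestamp) twice now yields two rows on scan,
// and a delete request is rejected with an InvalidRequest error.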

View File

@@ -188,6 +188,7 @@ impl Picker for TwcsPicker {
cache_manager,
storage: current_version.options.storage.clone(),
index_options: current_version.options.index_options.clone(),
append_mode: current_version.options.append_mode,
};
Some(Box::new(task))
}
@@ -255,6 +256,8 @@ pub(crate) struct TwcsCompactionTask {
pub(crate) storage: Option<String>,
/// Index options of the region.
pub(crate) index_options: IndexOptions,
/// The region is using append mode.
pub(crate) append_mode: bool,
}
impl Debug for TwcsCompactionTask {
@@ -264,6 +267,7 @@ impl Debug for TwcsCompactionTask {
.field("outputs", &self.outputs)
.field("expired_ssts", &self.expired_ssts)
.field("compaction_time_window", &self.compaction_time_window)
.field("append_mode", &self.append_mode)
.finish()
}
}
@@ -332,9 +336,15 @@ impl TwcsCompactionTask {
let cache_manager = self.cache_manager.clone();
let storage = self.storage.clone();
let index_options = self.index_options.clone();
let append_mode = self.append_mode;
futs.push(async move {
-let reader =
-build_sst_reader(metadata.clone(), sst_layer.clone(), &output.inputs).await?;
+let reader = build_sst_reader(
+metadata.clone(),
+sst_layer.clone(),
+&output.inputs,
+append_mode,
+)
+.await?;
let file_meta_opt = sst_layer
.write_sst(
SstWriteRequest {
@@ -565,9 +575,11 @@ async fn build_sst_reader(
metadata: RegionMetadataRef,
sst_layer: AccessLayerRef,
inputs: &[FileHandle],
append_mode: bool,
) -> error::Result<BoxedBatchReader> {
SeqScan::new(sst_layer, ProjectionMapper::all(&metadata)?)
.with_files(inputs.to_vec())
.with_append_mode(append_mode)
// We ignore file not found error during compaction.
.with_ignore_file_not_found(true)
.build_reader()

View File

@@ -17,6 +17,8 @@
#[cfg(test)]
mod alter_test;
#[cfg(test)]
mod append_mode_test;
#[cfg(test)]
mod basic_test;
#[cfg(test)]
mod catchup_test;

View File

@@ -0,0 +1,153 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Tests for append mode.
use api::v1::Rows;
use common_recordbatch::RecordBatches;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionCompactRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::test_util::{
build_rows, build_rows_for_key, flush_region, put_rows, rows_schema, CreateRequestBuilder,
TestEnv,
};
#[tokio::test]
async fn test_append_mode_write_query() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new()
.insert_option("append_mode", "true")
.build();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// rows 1, 2
let rows = build_rows(1, 3);
let rows = Rows {
schema: column_schemas.clone(),
rows,
};
put_rows(&engine, region_id, rows).await;
let mut rows = build_rows(0, 2);
rows.append(&mut build_rows(1, 2));
// rows 0, 1, 1
let rows = Rows {
schema: column_schemas,
rows,
};
put_rows(&engine, region_id, rows).await;
let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}
#[tokio::test]
async fn test_append_mode_compaction() {
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_files", "2")
.insert_option("compaction.twcs.max_inactive_window_files", "2")
.insert_option("append_mode", "true")
.build();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Flush 3 SSTs for compaction.
// a, field 1, 2
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("a", 1, 3, 1),
};
put_rows(&engine, region_id, rows).await;
flush_region(&engine, region_id, None).await;
// a, field 0, 1
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("a", 0, 2, 0),
};
put_rows(&engine, region_id, rows).await;
flush_region(&engine, region_id, None).await;
// b, field 0, 1
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("b", 0, 2, 0),
};
put_rows(&engine, region_id, rows).await;
flush_region(&engine, region_id, None).await;
let output = engine
.handle_request(region_id, RegionRequest::Compact(RegionCompactRequest {}))
.await
.unwrap();
assert_eq!(output.affected_rows, 0);
// a, field 2, 3
let rows = Rows {
schema: column_schemas,
rows: build_rows_for_key("a", 2, 4, 2),
};
put_rows(&engine, region_id, rows).await;
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| a | 0.0 | 1970-01-01T00:00:00 |
| a | 1.0 | 1970-01-01T00:00:01 |
| a | 1.0 | 1970-01-01T00:00:01 |
| a | 2.0 | 1970-01-01T00:00:02 |
| a | 2.0 | 1970-01-01T00:00:02 |
| a | 3.0 | 1970-01-01T00:00:03 |
| b | 0.0 | 1970-01-01T00:00:00 |
| b | 1.0 | 1970-01-01T00:00:01 |
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}

View File

@@ -24,6 +24,7 @@ use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;
use crate::config::MitoConfig;
use crate::error::Result;
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
@@ -212,27 +213,29 @@ impl Drop for AllocTracker {
#[derive(Clone)]
pub(crate) struct MemtableBuilderProvider {
write_buffer_manager: Option<WriteBufferManagerRef>,
-default_memtable_builder: MemtableBuilderRef,
+config: Arc<MitoConfig>,
}
impl MemtableBuilderProvider {
pub(crate) fn new(
write_buffer_manager: Option<WriteBufferManagerRef>,
-default_memtable_builder: MemtableBuilderRef,
+config: Arc<MitoConfig>,
) -> Self {
Self {
write_buffer_manager,
-default_memtable_builder,
+config,
}
}
pub(crate) fn builder_for_options(
&self,
options: Option<&MemtableOptions>,
dedup: bool,
) -> MemtableBuilderRef {
match options {
Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
self.write_buffer_manager.clone(),
dedup,
)),
Some(MemtableOptions::PartitionTree(opts)) => {
Arc::new(PartitionTreeMemtableBuilder::new(
@@ -240,12 +243,29 @@ impl MemtableBuilderProvider {
index_max_keys_per_shard: opts.index_max_keys_per_shard,
data_freeze_threshold: opts.data_freeze_threshold,
fork_dictionary_bytes: opts.fork_dictionary_bytes,
-..Default::default()
+dedup,
},
self.write_buffer_manager.clone(),
))
}
-None => self.default_memtable_builder.clone(),
+None => self.default_memtable_builder(dedup),
}
}
fn default_memtable_builder(&self, dedup: bool) -> MemtableBuilderRef {
match &self.config.memtable {
MemtableConfig::PartitionTree(config) => {
let mut config = config.clone();
config.dedup = dedup;
Arc::new(PartitionTreeMemtableBuilder::new(
config,
self.write_buffer_manager.clone(),
))
}
MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(
self.write_buffer_manager.clone(),
dedup,
)),
}
}
}
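
For context, a minimal sketch of the refactored provider (a `MitoConfig` and an optional write buffer manager are assumed): the default builder is now derived from `config.memtable` on demand, so the per-region `dedup` flag can be applied to it.

let provider = MemtableBuilderProvider::new(
    Some(write_buffer_manager),
    Arc::new(MitoConfig::default()),
);
// No explicit memtable options and dedup disabled (append mode):
// the builder comes from `config.memtable` with `dedup = false`.
let builder = provider.builder_for_options(None, false);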

View File

@@ -54,13 +54,15 @@ const INITIAL_BUILDER_CAPACITY: usize = 0;
#[derive(Debug, Default)]
pub struct TimeSeriesMemtableBuilder {
write_buffer_manager: Option<WriteBufferManagerRef>,
dedup: bool,
}
impl TimeSeriesMemtableBuilder {
/// Creates a new builder with specific `write_buffer_manager`.
-pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> Self {
+pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>, dedup: bool) -> Self {
Self {
write_buffer_manager,
dedup,
}
}
}
@@ -71,7 +73,7 @@ impl MemtableBuilder for TimeSeriesMemtableBuilder {
metadata.clone(),
id,
self.write_buffer_manager.clone(),
-true, // todo(hl): set according to region option
+self.dedup,
))
}
}

View File

@@ -33,7 +33,8 @@ use crate::read::{Batch, BatchReader, BoxedBatchReader, Source};
/// The merge reader merges [Batch]es from multiple sources that yield sorted batches.
/// 1. Batch is ordered by primary key, time index, sequence desc, op type desc (we can
/// ignore op type as sequence is already unique).
-/// 2. Batch doesn't have duplicate elements (elements with the same primary key and time index).
+/// 2. Batch doesn't have duplicate elements (elements with the same primary key and time index) if
+/// dedup is true.
/// 3. Batches from sources **must** not be empty.
pub struct MergeReader {
/// Holds [Node]s whose key range of current batch **is** overlapped with the merge window.
@@ -48,6 +49,8 @@ pub struct MergeReader {
cold: BinaryHeap<Node>,
/// Batch to output.
output_batch: Option<Batch>,
/// Remove duplicate timestamps.
dedup: bool,
/// Local metrics.
metrics: Metrics,
}
@@ -98,7 +101,7 @@ impl Drop for MergeReader {
impl MergeReader {
/// Creates and initializes a new [MergeReader].
-pub async fn new(sources: Vec<Source>) -> Result<MergeReader> {
+pub async fn new(sources: Vec<Source>, dedup: bool) -> Result<MergeReader> {
let start = Instant::now();
let mut metrics = Metrics::default();
@@ -116,6 +119,7 @@ impl MergeReader {
hot,
cold,
output_batch: None,
dedup,
metrics,
};
// Initializes the reader.
@@ -150,12 +154,13 @@ impl MergeReader {
let mut hottest = self.hot.pop().unwrap();
let batch = hottest.fetch_batch(&mut self.metrics).await?;
-Self::maybe_output_batch(batch, &mut self.output_batch, &mut self.metrics)?;
+Self::maybe_output_batch(batch, &mut self.output_batch, self.dedup, &mut self.metrics)?;
self.reheap(hottest)
}
-/// Fetches non-duplicated rows from the hottest node and skips the timestamp duplicated
-/// with the first timestamp in the next node.
+/// Fetches non-duplicated rows from the hottest node.
+///
+/// If `dedup` is true, it skips the timestamp duplicated with the first timestamp in the next node.
async fn fetch_rows_from_hottest(&mut self) -> Result<()> {
// Safety: `fetch_batches_to_output()` ensures the hot heap has more than 1 element.
// Pop hottest node.
@@ -176,36 +181,58 @@ impl MergeReader {
// Binary searches the timestamp in the top batch.
// Safety: Batches should have the same timestamp resolution so we can compare the native
// value directly.
-match timestamps.binary_search(&next_min_ts.value()) {
-Ok(pos) => {
-// They have duplicate timestamps. Outputs timestamps before the duplicated timestamp.
-// Batch itself doesn't contain duplicate timestamps so timestamps before `pos`
-// must be less than `next_min_ts`.
-Self::maybe_output_batch(
-top.slice(0, pos),
-&mut self.output_batch,
-&mut self.metrics,
-)?;
-// This keep the duplicate timestamp in the node.
-top_node.skip_rows(pos, &mut self.metrics).await?;
-// The merge window should contain this timestamp so only nodes in the hot heap
-// have this timestamp.
-self.filter_first_duplicate_timestamp_in_hot(top_node, next_min_ts)
-.await?;
-}
+let duplicate_pos = match timestamps.binary_search(&next_min_ts.value()) {
+Ok(pos) => pos,
Err(pos) => {
// No duplicate timestamp. Outputs timestamp before `pos`.
Self::maybe_output_batch(
top.slice(0, pos),
&mut self.output_batch,
self.dedup,
&mut self.metrics,
)?;
top_node.skip_rows(pos, &mut self.metrics).await?;
-self.reheap(top_node)?;
+return self.reheap(top_node);
}
};
if self.dedup {
// They have duplicate timestamps. Outputs timestamps before the duplicated timestamp.
// Batch itself doesn't contain duplicate timestamps so timestamps before `duplicate_pos`
// must be less than `next_min_ts`.
Self::maybe_output_batch(
top.slice(0, duplicate_pos),
&mut self.output_batch,
self.dedup,
&mut self.metrics,
)?;
// This keeps the duplicate timestamp in the node.
top_node.skip_rows(duplicate_pos, &mut self.metrics).await?;
// The merge window should contain this timestamp so only nodes in the hot heap
// have this timestamp.
return self
.filter_first_duplicate_timestamp_in_hot(top_node, next_min_ts)
.await;
}
-Ok(())
// No need to remove duplicate timestamps.
let output_end = if duplicate_pos == 0 {
// If the first timestamp of the top node is a duplicate, we can simply output the first row,
// as the heap ensures it is the one with the largest sequence.
1
} else {
// We don't know which one has the larger sequence so we use the range before
// the duplicate pos.
duplicate_pos
};
Self::maybe_output_batch(
top.slice(0, output_end),
&mut self.output_batch,
self.dedup,
&mut self.metrics,
)?;
top_node.skip_rows(output_end, &mut self.metrics).await?;
self.reheap(top_node)
}
/// Filters the first duplicate `timestamp` in `top_node` and `hot` heap. Only keeps the timestamp
@@ -297,12 +324,17 @@ impl MergeReader {
fn maybe_output_batch(
mut batch: Batch,
output_batch: &mut Option<Batch>,
dedup: bool,
metrics: &mut Metrics,
) -> Result<()> {
debug_assert!(output_batch.is_none());
let num_rows = batch.num_rows();
batch.filter_deleted()?;
// If dedup is false, we don't expect deletes to happen, so we skip checking whether
// there are any deleted entries.
if dedup {
batch.filter_deleted()?;
}
// Update deleted rows metrics.
metrics.num_deleted_rows += num_rows - batch.num_rows();
if batch.is_empty() {
@@ -315,12 +347,13 @@ impl MergeReader {
}
/// Builder to build and initialize a [MergeReader].
-#[derive(Default)]
pub struct MergeReaderBuilder {
/// Input sources.
///
/// All source must yield batches with the same schema.
sources: Vec<Source>,
/// Remove duplicate timestamps. Default is true.
dedup: bool,
}
impl MergeReaderBuilder {
@@ -330,8 +363,8 @@ impl MergeReaderBuilder {
}
/// Creates a builder from sources.
-pub fn from_sources(sources: Vec<Source>) -> MergeReaderBuilder {
-MergeReaderBuilder { sources }
+pub fn from_sources(sources: Vec<Source>, dedup: bool) -> MergeReaderBuilder {
+MergeReaderBuilder { sources, dedup }
}
/// Pushes a batch reader to sources.
@@ -349,7 +382,16 @@ impl MergeReaderBuilder {
/// Builds and initializes the reader, then resets the builder.
pub async fn build(&mut self) -> Result<MergeReader> {
let sources = mem::take(&mut self.sources);
-MergeReader::new(sources).await
+MergeReader::new(sources, self.dedup).await
}
}
impl Default for MergeReaderBuilder {
fn default() -> Self {
MergeReaderBuilder {
sources: Vec::new(),
dedup: true,
}
}
}
@@ -901,4 +943,40 @@ mod tests {
.collect();
check_reader_result(&mut reader, &expect).await;
}
#[tokio::test]
async fn test_merge_keep_duplicate() {
let reader1 = VecBatchReader::new(&[new_batch(
b"k1",
&[1, 2],
&[10, 10],
&[OpType::Put, OpType::Put],
&[21, 22],
)]);
let reader2 = VecBatchReader::new(&[new_batch(
b"k1",
&[2, 3],
&[11, 11],
&[OpType::Put, OpType::Put],
&[32, 33],
)]);
let sources = vec![
Source::Reader(Box::new(reader1)),
Source::Iter(Box::new(reader2)),
];
let mut reader = MergeReaderBuilder::from_sources(sources, false)
.build()
.await
.unwrap();
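// With dedup disabled, both ts == 2 rows survive. The heap still orders
// duplicates by sequence descending, so the row from reader2 (sequence 11)
// is emitted before the row from reader1 (sequence 10).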
check_reader_result(
&mut reader,
&[
new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]),
new_batch(b"k1", &[2], &[11], &[OpType::Put], &[32]),
new_batch(b"k1", &[2], &[10], &[OpType::Put], &[22]),
new_batch(b"k1", &[3], &[11], &[OpType::Put], &[33]),
],
)
.await;
}
}

View File

@@ -210,12 +210,13 @@ impl ScanRegion {
.collect();
debug!(
"Seq scan region {}, request: {:?}, memtables: {}, ssts_to_read: {}, total_ssts: {}",
"Seq scan region {}, request: {:?}, memtables: {}, ssts_to_read: {}, total_ssts: {}, append_mode: {}",
self.version.metadata.region_id,
self.request,
memtables.len(),
files.len(),
-total_ssts
+total_ssts,
+self.version.options.append_mode,
);
let index_applier = self.build_index_applier();
@@ -234,7 +235,8 @@ impl ScanRegion {
.with_cache(self.cache_manager)
.with_index_applier(index_applier)
.with_parallelism(self.parallelism)
-.with_start_time(self.start_time);
+.with_start_time(self.start_time)
+.with_append_mode(self.version.options.append_mode);
Ok(seq_scan)
}

View File

@@ -67,6 +67,8 @@ pub struct SeqScan {
index_applier: Option<SstIndexApplierRef>,
/// Start time of the query.
query_start: Option<Instant>,
/// The region is using append mode.
append_mode: bool,
}
impl SeqScan {
@@ -85,6 +87,7 @@ impl SeqScan {
parallelism: ScanParallism::default(),
index_applier: None,
query_start: None,
append_mode: false,
}
}
@@ -151,6 +154,12 @@ impl SeqScan {
self
}
#[must_use]
pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
self.append_mode = is_append_mode;
self
}
/// Builds a stream for the query.
pub async fn build_stream(&self) -> Result<SendableRecordBatchStream> {
let mut metrics = Metrics::default();
@@ -210,8 +219,10 @@ impl SeqScan {
pub async fn build_reader(&self) -> Result<BoxedBatchReader> {
// Scans all memtables and SSTs. Builds a merge reader to merge results.
let sources = self.build_sources().await?;
-let mut builder = MergeReaderBuilder::from_sources(sources);
-Ok(Box::new(builder.build().await?))
+let dedup = !self.append_mode;
+let mut builder = MergeReaderBuilder::from_sources(sources, dedup);
+let reader = builder.build().await?;
+Ok(Box::new(reader))
}
/// Builds a [BoxedBatchReader] that can scan memtables and SSTs in parallel.
@@ -228,8 +239,10 @@ impl SeqScan {
Source::Stream(stream)
})
.collect();
-let mut builder = MergeReaderBuilder::from_sources(sources);
-Ok(Box::new(builder.build().await?))
+let dedup = !self.append_mode;
+let mut builder = MergeReaderBuilder::from_sources(sources, dedup);
+let reader = builder.build().await?;
+Ok(Box::new(reader))
}
/// Builds and returns sources to read.

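Taken together, every read path now derives `dedup` from the region option, so the invariant is simply `dedup == !append_mode`. A hedged sketch of what that means at the merge-reader level (`sources` assumed already built; names as in the diffs above):

// Append-mode region: duplicates from memtables and SSTs are all kept.
let reader = MergeReaderBuilder::from_sources(sources, false)
    .build()
    .await?;
// Regular region (or `MergeReaderBuilder::default()`): dedup stays true.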
View File

@@ -173,7 +173,7 @@ impl RegionOpener {
let memtable_builder = self
.memtable_builder_provider
-.builder_for_options(options.memtable.as_ref());
+.builder_for_options(options.memtable.as_ref(), !options.append_mode);
// Initial memtable id is 0.
let part_duration = options.compaction.time_window();
let mutable = Arc::new(TimePartitions::new(
@@ -281,9 +281,10 @@ impl RegionOpener {
access_layer.clone(),
self.cache_manager.clone(),
));
-let memtable_builder = self
-.memtable_builder_provider
-.builder_for_options(region_options.memtable.as_ref());
+let memtable_builder = self.memtable_builder_provider.builder_for_options(
+region_options.memtable.as_ref(),
+!region_options.append_mode,
+);
// Initial memtable id is 0.
let part_duration = region_options.compaction.time_window();
let mutable = Arc::new(TimePartitions::new(

View File

@@ -46,6 +46,8 @@ pub struct RegionOptions {
pub compaction: CompactionOptions,
/// Custom storage. Uses default storage if it is `None`.
pub storage: Option<String>,
/// If append mode is enabled, the region keeps duplicate rows.
pub append_mode: bool,
/// Wal options.
pub wal_options: WalOptions,
/// Index options.
@@ -91,6 +93,7 @@ impl TryFrom<&HashMap<String, String>> for RegionOptions {
ttl: options.ttl,
compaction,
storage: options.storage,
append_mode: options.append_mode,
wal_options,
index_options,
memtable,
@@ -166,6 +169,7 @@ impl Default for TwcsOptions {
/// We need to define a new struct without enum fields as `#[serde(default)]` does not
/// support external tagging.
#[serde_as]
#[derive(Debug, Deserialize)]
#[serde(default)]
struct RegionOptionsWithoutEnum {
@@ -173,6 +177,8 @@ struct RegionOptionsWithoutEnum {
#[serde(with = "humantime_serde")]
ttl: Option<Duration>,
storage: Option<String>,
#[serde_as(as = "DisplayFromStr")]
append_mode: bool,
}
impl Default for RegionOptionsWithoutEnum {
@@ -181,6 +187,7 @@ impl Default for RegionOptionsWithoutEnum {
RegionOptionsWithoutEnum {
ttl: options.ttl,
storage: options.storage,
append_mode: options.append_mode,
}
}
}
@@ -482,6 +489,7 @@ mod tests {
("compaction.twcs.time_window", "2h"),
("compaction.type", "twcs"),
("storage", "S3"),
("append_mode", "true"),
("index.inverted_index.ignore_column_ids", "1,2,3"),
("index.inverted_index.segment_row_count", "512"),
(
@@ -502,6 +510,7 @@ mod tests {
time_window: Some(Duration::from_secs(3600 * 2)),
}),
storage: Some("S3".to_string()),
append_mode: true,
wal_options,
index_options: IndexOptions {
inverted_index: InvertedIndexOptions {

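A hedged sketch of the option round trip the test above exercises, using only the `TryFrom<&HashMap<String, String>>` impl shown in this diff:

use std::collections::HashMap;

let mut map = HashMap::new();
map.insert("append_mode".to_string(), "true".to_string());
// `append_mode` is parsed via `DisplayFromStr`, like the other scalar options.
let options = RegionOptions::try_from(&map)?;
assert!(options.append_mode);
// When unset, it falls back to the default of `false`.
assert!(!RegionOptions::try_from(&HashMap::new())?.append_mode);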
View File

@@ -48,9 +48,7 @@ use crate::config::MitoConfig;
use crate::error::{InvalidRequestSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
use crate::manifest::action::RegionEdit;
-use crate::memtable::partition_tree::PartitionTreeMemtableBuilder;
-use crate::memtable::time_series::TimeSeriesMemtableBuilder;
-use crate::memtable::{MemtableBuilderProvider, MemtableConfig};
+use crate::memtable::MemtableBuilderProvider;
use crate::region::{MitoRegionRef, RegionMap, RegionMapRef};
use crate::request::{
BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest,
@@ -338,20 +336,10 @@ impl<S: LogStore> WorkerStarter<S> {
let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);
let running = Arc::new(AtomicBool::new(true));
-let default_memtable_builder = match &self.config.memtable {
-MemtableConfig::PartitionTree(config) => Arc::new(PartitionTreeMemtableBuilder::new(
-config.clone(),
-Some(self.write_buffer_manager.clone()),
-)) as _,
-MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(Some(
-self.write_buffer_manager.clone(),
-))) as _,
-};
let now = self.time_provider.current_time_millis();
let mut worker_thread = RegionWorkerLoop {
id: self.id,
-config: self.config,
+config: self.config.clone(),
regions: regions.clone(),
dropping_regions: Arc::new(RegionMap::default()),
sender: sender.clone(),
@@ -361,7 +349,7 @@ impl<S: LogStore> WorkerStarter<S> {
running: running.clone(),
memtable_builder_provider: MemtableBuilderProvider::new(
Some(self.write_buffer_manager.clone()),
-default_memtable_builder,
+self.config,
),
scheduler: self.scheduler.clone(),
write_buffer_manager: self.write_buffer_manager,

View File

@@ -17,11 +17,13 @@
use std::collections::{hash_map, HashMap};
use std::sync::Arc;
use api::v1::OpType;
use snafu::ensure;
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadata;
use store_api::storage::RegionId;
-use crate::error::{RejectWriteSnafu, Result};
+use crate::error::{InvalidRequestSnafu, RejectWriteSnafu, Result};
use crate::metrics::{
WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED, WRITE_STALL_TOTAL,
};
@@ -162,6 +164,16 @@ impl<S> RegionWorkerLoop<S> {
// Safety: Now we ensure the region exists.
let region_ctx = region_ctxs.get_mut(&region_id).unwrap();
if let Err(e) = check_op_type(
region_ctx.version().options.append_mode,
&sender_req.request,
) {
// Do not allow non-put op under append mode.
sender_req.sender.send(Err(e));
continue;
}
// Checks whether request schema is compatible with region schema.
if let Err(e) =
maybe_fill_missing_columns(&mut sender_req.request, &region_ctx.version().metadata)
@@ -219,3 +231,18 @@ fn maybe_fill_missing_columns(request: &mut WriteRequest, metadata: &RegionMetad
Ok(())
}
/// Rejects delete requests under append mode.
fn check_op_type(append_mode: bool, request: &WriteRequest) -> Result<()> {
if append_mode {
ensure!(
request.op_type == OpType::Put,
InvalidRequestSnafu {
region_id: request.region_id,
reason: "Only put is allowed under append mode",
}
);
}
Ok(())
}
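
A hedged sketch of the guard in action (`put_request` and `delete_request` are hypothetical `WriteRequest`s with `op_type` `Put` and `Delete` respectively):

// Puts always pass the check.
assert!(check_op_type(true, &put_request).is_ok());
assert!(check_op_type(false, &put_request).is_ok());
// Deletes pass only when append mode is off.
assert!(check_op_type(false, &delete_request).is_ok());
let err = check_op_type(true, &delete_request).unwrap_err();
// The error is an InvalidRequest: "Only put is allowed under append mode".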