feat(mito): Add writable flag to region (#2349)

* feat: add writable flag to region.

* refactor: rename MitoEngine::scan to MitoEngine::scanner

* feat: add set_writable() to RegionEngine

* feat: check whether region is writable

* feat: make set_writable sync

* test: test set_writable

* docs: update comments

* feat: send result on compaction failure

* refactor: wrap output sender in new type

* feat: on failure

* refactor: use get_region_or/writable_region_or

* refactor: remove send_result

* feat: notify waiters on flush scheduler drop

* test: fix tests

* fix: only alter writable region
Yingwen
2023-09-14 10:45:30 +08:00
committed by GitHub
parent da54a0c139
commit d4ae8a6fed
23 changed files with 357 additions and 178 deletions
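
The bullets above boil down to one pattern: each region carries a writable flag that write paths check before touching any state. Below is a minimal, standalone sketch of that pattern in Rust, using an AtomicBool with relaxed ordering as the mito diff does; the Region and WriteError types here are simplified stand-ins for illustration, not the actual mito structs.

// Hypothetical, simplified sketch of the writable-flag pattern.
use std::sync::atomic::{AtomicBool, Ordering};

struct Region {
    writable: AtomicBool,
}

#[derive(Debug)]
enum WriteError {
    RegionReadonly,
}

impl Region {
    fn new() -> Self {
        // Newly created regions start writable; reopened regions start readonly.
        Region { writable: AtomicBool::new(true) }
    }

    fn set_writable(&self, writable: bool) {
        self.writable.store(writable, Ordering::Relaxed);
    }

    fn write(&self, rows: &[&str]) -> Result<usize, WriteError> {
        if !self.writable.load(Ordering::Relaxed) {
            return Err(WriteError::RegionReadonly);
        }
        Ok(rows.len())
    }
}

fn main() {
    let region = Region::new();
    region.set_writable(false);
    assert!(matches!(region.write(&["row"]), Err(WriteError::RegionReadonly)));
    region.set_writable(true);
    assert_eq!(region.write(&["row"]).unwrap(), 1);
}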

View File

@@ -58,6 +58,7 @@ pub enum StatusCode {
DatabaseNotFound = 4004,
RegionNotFound = 4005,
RegionAlreadyExists = 4006,
RegionReadonly = 4007,
// ====== End of catalog related status code =======
// ====== Begin of storage related status code =====
@@ -117,6 +118,7 @@ impl StatusCode {
| StatusCode::TableNotFound
| StatusCode::RegionNotFound
| StatusCode::RegionAlreadyExists
| StatusCode::RegionReadonly
| StatusCode::TableColumnNotFound
| StatusCode::TableColumnExists
| StatusCode::DatabaseNotFound
@@ -151,6 +153,7 @@ impl StatusCode {
| StatusCode::TableNotFound
| StatusCode::RegionNotFound
| StatusCode::RegionAlreadyExists
| StatusCode::RegionReadonly
| StatusCode::TableColumnNotFound
| StatusCode::TableColumnExists
| StatusCode::DatabaseNotFound
@@ -183,6 +186,7 @@ impl StatusCode {
v if v == StatusCode::RegionAlreadyExists as u32 => {
Some(StatusCode::RegionAlreadyExists)
}
v if v == StatusCode::RegionReadonly as u32 => Some(StatusCode::RegionReadonly),
v if v == StatusCode::TableColumnNotFound as u32 => {
Some(StatusCode::TableColumnNotFound)
}

View File

@@ -21,18 +21,16 @@ mod twcs;
use std::sync::Arc;
use std::time::Duration;
use common_query::Output;
use common_telemetry::debug;
pub use picker::CompactionPickerRef;
use store_api::storage::{CompactionStrategy, RegionId, TwcsOptions};
use tokio::sync::{mpsc, oneshot};
use tokio::sync::mpsc;
use crate::access_layer::AccessLayerRef;
use crate::compaction::twcs::TwcsPicker;
use crate::error;
use crate::error::Result;
use crate::region::version::VersionRef;
use crate::request::WorkerRequest;
use crate::request::{OptionOutputTx, WorkerRequest};
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file_purger::FilePurgerRef;
@@ -43,7 +41,7 @@ pub struct CompactionRequest {
pub(crate) ttl: Option<Duration>,
pub(crate) compaction_time_window: Option<i64>,
pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
pub(crate) waiter: Option<oneshot::Sender<error::Result<Output>>>,
pub(crate) waiter: OptionOutputTx,
pub(crate) file_purger: FilePurgerRef,
}

View File

@@ -27,7 +27,6 @@ use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use tokio::sync::mpsc;
use tokio::sync::oneshot::Sender;
use crate::access_layer::AccessLayerRef;
use crate::compaction::output::CompactionOutput;
@@ -35,11 +34,12 @@ use crate::compaction::picker::{CompactionTask, Picker};
use crate::compaction::CompactionRequest;
use crate::error;
use crate::error::CompactRegionSnafu;
use crate::request::{BackgroundNotify, CompactionFailed, CompactionFinished, WorkerRequest};
use crate::request::{
BackgroundNotify, CompactionFailed, CompactionFinished, OptionOutputTx, WorkerRequest,
};
use crate::sst::file::{FileHandle, FileId, FileMeta};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::version::LevelMeta;
use crate::worker::send_result;
const MAX_PARALLEL_COMPACTION: usize = 8;
@@ -157,7 +157,7 @@ impl Picker for TwcsPicker {
if outputs.is_empty() && expired_ssts.is_empty() {
// Nothing to compact.
send_result(waiter, Ok(Output::AffectedRows(0)));
waiter.send(Ok(Output::AffectedRows(0)));
return None;
}
let task = TwcsCompactionTask {
@@ -228,7 +228,7 @@ pub(crate) struct TwcsCompactionTask {
/// Request sender to notify the worker.
pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
/// Sender that are used to notify waiters waiting for pending compaction tasks.
pub sender: Option<Sender<error::Result<Output>>>,
pub sender: OptionOutputTx,
}
impl Debug for TwcsCompactionTask {
@@ -321,11 +321,10 @@ impl TwcsCompactionTask {
/// Handles compaction failure, notifies all waiters.
fn on_failure(&mut self, err: Arc<error::Error>) {
if let Some(sender) = self.sender.take() {
let _ = sender.send(Err(err.clone()).context(CompactRegionSnafu {
self.sender
.send_mut(Err(err.clone()).context(CompactRegionSnafu {
region_id: self.region_id,
}));
}
}
/// Notifies region worker to handle post-compaction tasks.

View File

@@ -78,7 +78,8 @@ impl MitoEngine {
self.inner.workers.is_region_exists(region_id)
}
fn scan(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
/// Returns a scanner to scan for `request`.
fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
self.inner.handle_query(region_id, request)
}
@@ -143,6 +144,17 @@ impl EngineInner {
scan_region.scanner()
}
/// Set writable mode for a region.
fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<()> {
let region = self
.workers
.get_region(region_id)
.context(RegionNotFoundSnafu { region_id })?;
region.set_writable(writable);
Ok(())
}
}
#[async_trait]
@@ -168,7 +180,7 @@ impl RegionEngine for MitoEngine {
region_id: RegionId,
request: ScanRequest,
) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
self.scan(region_id, request)
self.scanner(region_id, request)
.map_err(BoxedError::new)?
.scan()
.await
@@ -191,6 +203,12 @@ impl RegionEngine for MitoEngine {
async fn stop(&self) -> std::result::Result<(), BoxedError> {
self.inner.stop().await.map_err(BoxedError::new)
}
fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<(), BoxedError> {
self.inner
.set_writable(region_id, writable)
.map_err(BoxedError::new)
}
}
// Tests methods.

View File

@@ -34,7 +34,7 @@ use crate::test_util::{
async fn scan_check_after_alter(engine: &MitoEngine, region_id: RegionId, expected: &str) {
let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
let scanner = engine.scanner(region_id, request).unwrap();
assert_eq!(0, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
@@ -206,6 +206,8 @@ async fn test_put_after_alter() {
)
.await
.unwrap();
// Set writable.
engine.set_writable(region_id, true).unwrap();
// Put with old schema.
let rows = Rows {

View File

@@ -52,7 +52,7 @@ async fn test_manual_flush() {
flush_region(&engine, region_id).await;
let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
let scanner = engine.scanner(region_id, request).unwrap();
assert_eq!(0, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
@@ -110,7 +110,7 @@ async fn test_flush_engine() {
listener.wait().await;
let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
let scanner = engine.scanner(region_id, request).unwrap();
assert_eq!(1, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
@@ -175,7 +175,7 @@ async fn test_write_stall() {
put_rows(&engine, region_id, rows).await;
let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
let scanner = engine.scanner(region_id, request).unwrap();
assert_eq!(1, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
@@ -211,7 +211,7 @@ async fn test_flush_empty() {
flush_region(&engine, region_id).await;
let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
let scanner = engine.scanner(region_id, request).unwrap();
assert_eq!(0, scanner.num_memtables());
assert_eq!(0, scanner.num_files());
let stream = scanner.scan().await.unwrap();
@@ -255,7 +255,7 @@ async fn test_flush_reopen_region() {
};
check_region();
reopen_region(&engine, region_id, region_dir).await;
reopen_region(&engine, region_id, region_dir, true).await;
check_region();
// Puts again.

View File

@@ -14,23 +14,27 @@
use std::collections::HashMap;
use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionOpenRequest, RegionRequest};
use store_api::region_request::{RegionOpenRequest, RegionPutRequest, RegionRequest};
use store_api::storage::RegionId;
use crate::config::MitoConfig;
use crate::test_util::{reopen_region, CreateRequestBuilder, TestEnv};
use crate::test_util::{
build_rows, put_rows, reopen_region, rows_schema, CreateRequestBuilder, TestEnv,
};
#[tokio::test]
async fn test_engine_open_empty() {
let mut env = TestEnv::with_prefix("open-empty");
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let err = engine
.handle_request(
RegionId::new(1, 1),
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir: "empty".to_string(),
@@ -39,10 +43,9 @@ async fn test_engine_open_empty() {
)
.await
.unwrap_err();
assert!(
matches!(err.status_code(), StatusCode::RegionNotFound),
"unexpected err: {err}"
);
assert_eq!(StatusCode::RegionNotFound, err.status_code());
let err = engine.set_writable(region_id, true).unwrap_err();
assert_eq!(StatusCode::RegionNotFound, err.status_code());
}
#[tokio::test]
@@ -84,6 +87,41 @@ async fn test_engine_reopen_region() {
.await
.unwrap();
reopen_region(&engine, region_id, region_dir).await;
reopen_region(&engine, region_id, region_dir, false).await;
assert!(engine.is_region_exists(region_id));
}
#[tokio::test]
async fn test_engine_open_readonly() {
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let region_dir = request.region_dir.clone();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
reopen_region(&engine, region_id, region_dir, false).await;
// Region is readonly.
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 2),
};
let err = engine
.handle_request(
region_id,
RegionRequest::Put(RegionPutRequest { rows: rows.clone() }),
)
.await
.unwrap_err();
assert_eq!(StatusCode::RegionReadonly, err.status_code());
// Set writable and write.
engine.set_writable(region_id, true).unwrap();
put_rows(&engine, region_id, rows).await;
}

View File

@@ -125,8 +125,7 @@ async fn test_region_replay() {
assert_eq!(0, rows);
let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
let stream = scanner.scan().await.unwrap();
let stream = engine.handle_query(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(42, batches.iter().map(|b| b.num_rows()).sum::<usize>());
@@ -216,8 +215,7 @@ async fn test_put_delete() {
delete_rows(&engine, region_id, rows).await;
let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
let stream = scanner.scan().await.unwrap();
let stream = engine.handle_query(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
@@ -274,8 +272,7 @@ async fn test_put_overwrite() {
put_rows(&engine, region_id, rows).await;
let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
let stream = scanner.scan().await.unwrap();
let stream = engine.handle_query(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+

View File

@@ -469,6 +469,12 @@ pub enum Error {
source: store_api::metadata::MetadataError,
location: Location,
},
#[snafu(display("Region {} is read only, location: {}", region_id, location))]
RegionReadonly {
region_id: RegionId,
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -539,6 +545,7 @@ impl ErrorExt for Error {
CompactRegion { source, .. } => source.status_code(),
CompatReader { .. } => StatusCode::Unexpected,
InvalidRegionRequest { source, .. } => source.status_code(),
RegionReadonly { .. } => StatusCode::RegionReadonly,
}
}

View File

@@ -22,7 +22,7 @@ use common_query::Output;
use common_telemetry::{error, info};
use snafu::ResultExt;
use store_api::storage::RegionId;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::mpsc;
use crate::access_layer::AccessLayerRef;
use crate::error::{Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, Result};
@@ -31,14 +31,13 @@ use crate::read::Source;
use crate::region::version::{VersionControlData, VersionRef};
use crate::region::MitoRegionRef;
use crate::request::{
BackgroundNotify, FlushFailed, FlushFinished, SenderDdlRequest, SenderWriteRequest,
WorkerRequest,
BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderDdlRequest,
SenderWriteRequest, WorkerRequest,
};
use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::{FileId, FileMeta};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::parquet::WriteOptions;
use crate::worker::send_result;
/// Global write buffer (memtable) manager.
///
@@ -178,7 +177,7 @@ pub(crate) struct RegionFlushTask {
/// Reason to flush.
pub(crate) reason: FlushReason,
/// Flush result senders.
pub(crate) senders: Vec<oneshot::Sender<Result<Output>>>,
pub(crate) senders: Vec<OutputTx>,
/// Request sender to notify the worker.
pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
@@ -188,18 +187,24 @@ pub(crate) struct RegionFlushTask {
}
impl RegionFlushTask {
/// Push the sender if it is not none.
pub(crate) fn push_sender(&mut self, mut sender: OptionOutputTx) {
if let Some(sender) = sender.take_inner() {
self.senders.push(sender);
}
}
/// Consumes the task and notify the sender the job is success.
fn on_success(self) {
for sender in self.senders {
let _ = sender.send(Ok(Output::AffectedRows(0)));
sender.send(Ok(Output::AffectedRows(0)));
}
}
/// Send flush error to waiter.
fn on_failure(&mut self, err: Arc<Error>) {
for sender in self.senders.drain(..) {
// Ignore send result.
let _ = sender.send(Err(err.clone()).context(FlushRegionSnafu {
sender.send(Err(err.clone()).context(FlushRegionSnafu {
region_id: self.region_id,
}));
}
@@ -516,6 +521,15 @@ impl FlushScheduler {
}
}
impl Drop for FlushScheduler {
fn drop(&mut self) {
for (region_id, flush_status) in self.region_status.drain() {
// We are shutting down so notify all pending tasks.
flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
}
}
}
/// Flush status of a region scheduled by the [FlushScheduler].
///
/// Tracks running and pending flush tasks and all pending requests of a region.
@@ -557,20 +571,16 @@ impl FlushStatus {
task.on_failure(err.clone());
}
for ddl in self.pending_ddls {
send_result(
ddl.sender,
Err(err.clone()).context(FlushRegionSnafu {
region_id: self.region.region_id,
}),
);
ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu {
region_id: self.region.region_id,
}));
}
for write_req in self.pending_writes {
send_result(
write_req.sender,
Err(err.clone()).context(FlushRegionSnafu {
write_req
.sender
.send(Err(err.clone()).context(FlushRegionSnafu {
region_id: self.region.region_id,
}),
);
}));
}
}
}
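
One bullet above, notifying waiters on flush scheduler drop, shows up in this file as a Drop impl that fails every pending task with a region-closed error. A hedged, self-contained sketch of that notify-on-drop shape follows, with tokio oneshot waiters and illustrative types (PendingFlush, Scheduler) standing in for the real FlushScheduler; it assumes tokio with the rt and macros features enabled.

use std::collections::HashMap;
use tokio::sync::oneshot;

struct PendingFlush {
    waiters: Vec<oneshot::Sender<Result<(), String>>>,
}

struct Scheduler {
    pending: HashMap<u64, PendingFlush>,
}

impl Drop for Scheduler {
    fn drop(&mut self) {
        // Shutting down: fail every pending waiter instead of silently
        // dropping the senders.
        for (region_id, status) in self.pending.drain() {
            for waiter in status.waiters {
                let _ = waiter.send(Err(format!("region {region_id} closed")));
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
    let scheduler = Scheduler {
        pending: HashMap::from([(1, PendingFlush { waiters: vec![tx] })]),
    };
    drop(scheduler);
    // The waiter observes the shutdown error rather than a closed channel.
    assert!(rx.await.unwrap().is_err());
}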

View File

@@ -18,18 +18,20 @@ pub(crate) mod opener;
pub(crate) mod version;
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::{Arc, RwLock};
use common_telemetry::info;
use common_time::util::current_time_millis;
use snafu::{ensure, OptionExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use crate::access_layer::AccessLayerRef;
use crate::error::Result;
use crate::error::{RegionNotFoundSnafu, RegionReadonlySnafu, Result};
use crate::manifest::manager::RegionManifestManager;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::request::OnFailure;
use crate::sst::file_purger::FilePurgerRef;
/// Metadata and runtime status of a region.
@@ -55,6 +57,8 @@ pub(crate) struct MitoRegion {
pub(crate) file_purger: FilePurgerRef,
/// Last flush time in millis.
last_flush_millis: AtomicI64,
/// Whether the region is writable.
writable: AtomicBool,
}
pub(crate) type MitoRegionRef = Arc<MitoRegion>;
@@ -94,6 +98,16 @@ impl MitoRegion {
let now = current_time_millis();
self.last_flush_millis.store(now, Ordering::Relaxed);
}
/// Returns whether the region is writable.
pub(crate) fn is_writable(&self) -> bool {
self.writable.load(Ordering::Relaxed)
}
/// Sets the writable flag.
pub(crate) fn set_writable(&self, writable: bool) {
self.writable.store(writable, Ordering::Relaxed);
}
}
/// Regions indexed by ids.
@@ -115,12 +129,40 @@ impl RegionMap {
regions.insert(region.region_id, region);
}
/// Get region by region id.
/// Gets region by region id.
pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
let regions = self.regions.read().unwrap();
regions.get(&region_id).cloned()
}
/// Gets writable region by region id.
///
/// Returns error if the region does not exist or is readonly.
pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
let region = self
.get_region(region_id)
.context(RegionNotFoundSnafu { region_id })?;
ensure!(region.is_writable(), RegionReadonlySnafu { region_id });
Ok(region)
}
/// Gets writable region by region id.
///
/// Calls the callback if the region does not exist or is readonly.
pub(crate) fn writable_region_or<F: OnFailure>(
&self,
region_id: RegionId,
cb: &mut F,
) -> Option<MitoRegionRef> {
match self.writable_region(region_id) {
Ok(region) => Some(region),
Err(e) => {
cb.on_failure(e);
None
}
}
}
/// Remove region by id.
pub(crate) fn remove_region(&self, region_id: RegionId) {
let mut regions = self.regions.write().unwrap();

View File

@@ -14,7 +14,7 @@
//! Region opener.
use std::sync::atomic::AtomicI64;
use std::sync::atomic::{AtomicBool, AtomicI64};
use std::sync::Arc;
use common_telemetry::info;
@@ -35,6 +35,7 @@ use crate::memtable::MemtableBuilderRef;
use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
use crate::region::MitoRegion;
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file_purger::LocalFilePurger;
use crate::wal::{EntryId, Wal};
@@ -110,10 +111,12 @@ impl RegionOpener {
manifest_manager,
file_purger: Arc::new(LocalFilePurger::new(self.scheduler, access_layer)),
last_flush_millis: AtomicI64::new(current_time_millis()),
// Region is writable after it is created.
writable: AtomicBool::new(true),
})
}
/// Opens an existing region.
/// Opens an existing region in read only mode.
///
/// Returns error if the region doesn't exist.
pub(crate) async fn open<S: LogStore>(
@@ -165,6 +168,8 @@ impl RegionOpener {
manifest_manager,
file_purger,
last_flush_millis: AtomicI64::new(current_time_millis()),
// Region is always opened in read only mode.
writable: AtomicBool::new(false),
};
Ok(region)
}
@@ -192,7 +197,7 @@ async fn replay_memtable<S: LogStore>(
.as_ref()
.map(|rows| rows.rows.len())
.unwrap_or(0);
region_write_ctx.push_mutation(mutation.op_type, mutation.rows, None);
region_write_ctx.push_mutation(mutation.op_type, mutation.rows, OptionOutputTx::none());
}
}

View File

@@ -20,11 +20,11 @@ use common_query::Output;
use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::storage::{RegionId, SequenceNumber};
use tokio::sync::oneshot::Sender;
use crate::error::{Error, Result, WriteGroupSnafu};
use crate::memtable::KeyValues;
use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
use crate::request::OptionOutputTx;
use crate::wal::{EntryId, WalWriter};
/// Context to keep region metadata and buffer write requests.
@@ -34,14 +34,14 @@ struct WriteNotify {
/// Error to send to the waiter.
err: Option<Arc<Error>>,
/// Sender to send write result to the waiter for this mutation.
sender: Option<Sender<Result<Output>>>,
sender: OptionOutputTx,
/// Number of rows to be written.
num_rows: usize,
}
impl WriteNotify {
/// Creates a new notify from the `sender`.
fn new(sender: Option<Sender<Result<Output>>>, num_rows: usize) -> WriteNotify {
fn new(sender: OptionOutputTx, num_rows: usize) -> WriteNotify {
WriteNotify {
err: None,
sender,
@@ -51,15 +51,14 @@ impl WriteNotify {
/// Send result to the waiter.
fn notify_result(&mut self) {
let Some(sender) = self.sender.take() else {
return;
};
if let Some(err) = &self.err {
// Try to send the error to waiters.
let _ = sender.send(Err(err.clone()).context(WriteGroupSnafu));
self.sender
.send_mut(Err(err.clone()).context(WriteGroupSnafu));
} else {
// Send success result.
let _ = sender.send(Ok(Output::AffectedRows(self.num_rows)));
self.sender
.send_mut(Ok(Output::AffectedRows(self.num_rows)));
}
}
}
@@ -117,12 +116,7 @@ impl RegionWriteCtx {
}
/// Push [SenderWriteRequest] to the context.
pub(crate) fn push_mutation(
&mut self,
op_type: i32,
rows: Option<Rows>,
tx: Option<Sender<Result<Output>>>,
) {
pub(crate) fn push_mutation(&mut self, op_type: i32, rows: Option<Rows>, tx: OptionOutputTx) {
let num_rows = rows.as_ref().map(|rows| rows.rows.len()).unwrap_or(0);
self.wal_entry.mutations.push(Mutation {
op_type,

View File

@@ -391,11 +391,86 @@ pub(crate) fn validate_proto_value(
Ok(())
}
/// Oneshot output result sender.
#[derive(Debug)]
pub(crate) struct OutputTx(Sender<Result<Output>>);
impl OutputTx {
/// Creates a new output sender.
pub(crate) fn new(sender: Sender<Result<Output>>) -> OutputTx {
OutputTx(sender)
}
/// Sends the `result`.
pub(crate) fn send(self, result: Result<Output>) {
// Ignores send result.
let _ = self.0.send(result);
}
}
/// Optional output result sender.
#[derive(Debug)]
pub(crate) struct OptionOutputTx(Option<OutputTx>);
impl OptionOutputTx {
/// Creates a sender.
pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
OptionOutputTx(sender)
}
/// Creates an empty sender.
pub(crate) fn none() -> OptionOutputTx {
OptionOutputTx(None)
}
/// Sends the `result` and consumes the inner sender.
pub(crate) fn send_mut(&mut self, result: Result<Output>) {
if let Some(sender) = self.0.take() {
sender.send(result);
}
}
/// Sends the `result` and consumes the sender.
pub(crate) fn send(mut self, result: Result<Output>) {
if let Some(sender) = self.0.take() {
sender.send(result);
}
}
/// Takes the sender.
pub(crate) fn take(&mut self) -> OptionOutputTx {
OptionOutputTx(self.0.take())
}
/// Takes the inner sender.
pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
self.0.take()
}
}
impl From<Sender<Result<Output>>> for OptionOutputTx {
fn from(sender: Sender<Result<Output>>) -> Self {
Self::new(Some(OutputTx::new(sender)))
}
}
impl OnFailure for OptionOutputTx {
fn on_failure(&mut self, err: Error) {
self.send_mut(Err(err));
}
}
/// Callback on failure.
pub(crate) trait OnFailure {
/// Handles `err` on failure.
fn on_failure(&mut self, err: Error);
}
/// Sender and write request.
#[derive(Debug)]
pub(crate) struct SenderWriteRequest {
/// Result sender.
pub(crate) sender: Option<Sender<Result<Output>>>,
pub(crate) sender: OptionOutputTx,
pub(crate) request: WriteRequest,
}
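
OptionOutputTx above is essentially an optional oneshot sender whose send errors are deliberately ignored, with both a consuming send and an in-place send_mut so it can be driven from failure handlers that only hold &mut self. Here is a small usage sketch built on a standalone, simplified re-statement of the wrapper (plain result type instead of common_query::Output); it is not the real implementation.

use tokio::sync::oneshot::{self, Sender};

struct OutputTx(Sender<Result<usize, String>>);

impl OutputTx {
    fn send(self, result: Result<usize, String>) {
        // Ignores the error returned when the receiver was already dropped.
        let _ = self.0.send(result);
    }
}

struct OptionOutputTx(Option<OutputTx>);

impl OptionOutputTx {
    fn none() -> Self {
        OptionOutputTx(None)
    }

    fn send_mut(&mut self, result: Result<usize, String>) {
        if let Some(sender) = self.0.take() {
            sender.send(result);
        }
    }
}

fn main() {
    // Waiter dropped before the result arrives: send_mut is still a no-op.
    let (tx, rx) = oneshot::channel();
    drop(rx);
    let mut waiter = OptionOutputTx(Some(OutputTx(tx)));
    waiter.send_mut(Ok(1));
    // Sending again, or sending on an empty sender, is equally harmless.
    waiter.send_mut(Ok(2));
    OptionOutputTx::none().send_mut(Err("ignored".to_string()));
}
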
@@ -431,50 +506,50 @@ impl WorkerRequest {
RegionRequest::Put(v) => {
let write_request = WriteRequest::new(region_id, OpType::Put, v.rows)?;
WorkerRequest::Write(SenderWriteRequest {
sender: Some(sender),
sender: sender.into(),
request: write_request,
})
}
RegionRequest::Delete(v) => {
let write_request = WriteRequest::new(region_id, OpType::Delete, v.rows)?;
WorkerRequest::Write(SenderWriteRequest {
sender: Some(sender),
sender: sender.into(),
request: write_request,
})
}
RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
region_id,
sender: Some(sender),
sender: sender.into(),
request: DdlRequest::Create(v),
}),
RegionRequest::Drop(v) => WorkerRequest::Ddl(SenderDdlRequest {
region_id,
sender: Some(sender),
sender: sender.into(),
request: DdlRequest::Drop(v),
}),
RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
region_id,
sender: Some(sender),
sender: sender.into(),
request: DdlRequest::Open(v),
}),
RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
region_id,
sender: Some(sender),
sender: sender.into(),
request: DdlRequest::Close(v),
}),
RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
region_id,
sender: Some(sender),
sender: sender.into(),
request: DdlRequest::Alter(v),
}),
RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
region_id,
sender: Some(sender),
sender: sender.into(),
request: DdlRequest::Flush(v),
}),
RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
region_id,
sender: Some(sender),
sender: sender.into(),
request: DdlRequest::Compact(v),
}),
};
@@ -501,7 +576,7 @@ pub(crate) struct SenderDdlRequest {
/// Region id of the request.
pub(crate) region_id: RegionId,
/// Result sender.
pub(crate) sender: Option<Sender<Result<Output>>>,
pub(crate) sender: OptionOutputTx,
/// Ddl request.
pub(crate) request: DdlRequest,
}
@@ -533,35 +608,35 @@ pub(crate) struct FlushFinished {
/// Id of memtables to remove.
pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
/// Flush result senders.
pub(crate) senders: Vec<oneshot::Sender<Result<Output>>>,
pub(crate) senders: Vec<OutputTx>,
/// File purger for cleaning files on failure.
pub(crate) file_purger: FilePurgerRef,
}
impl FlushFinished {
pub(crate) fn on_failure(self, err: Error) {
let err = Arc::new(err);
pub(crate) fn on_success(self) {
for sender in self.senders {
// Ignore send result.
let _ = sender.send(Err(err.clone()).context(FlushRegionSnafu {
sender.send(Ok(Output::AffectedRows(0)));
}
}
}
impl OnFailure for FlushFinished {
fn on_failure(&mut self, err: Error) {
let err = Arc::new(err);
for sender in self.senders.drain(..) {
sender.send(Err(err.clone()).context(FlushRegionSnafu {
region_id: self.region_id,
}));
}
// Clean flushed files.
for file in self.file_metas {
for file in &self.file_metas {
self.file_purger.send_request(PurgeRequest {
region_id: file.region_id,
file_id: file.file_id,
});
}
}
pub(crate) fn on_success(self) {
for sender in self.senders {
// Ignore send result.
let _ = sender.send(Ok(Output::AffectedRows(0)));
}
}
}
/// Notifies a flush job is failed.
@@ -581,22 +656,23 @@ pub(crate) struct CompactionFinished {
/// Compacted files that are to be removed from region version.
pub(crate) compacted_files: Vec<FileMeta>,
/// Compaction result sender.
pub(crate) sender: Option<oneshot::Sender<Result<Output>>>,
pub(crate) sender: OptionOutputTx,
/// File purger for cleaning files on failure.
pub(crate) file_purger: FilePurgerRef,
}
impl CompactionFinished {
pub fn on_success(self) {
if let Some(sender) = self.sender {
let _ = sender.send(Ok(AffectedRows(0)));
}
self.sender.send(Ok(AffectedRows(0)));
info!("Successfully compacted region: {}", self.region_id);
}
}
impl OnFailure for CompactionFinished {
/// Compaction succeeded but failed to update manifest or region's already been dropped,
/// clean compaction output files.
pub fn on_failure(self, _err: Error) {
fn on_failure(&mut self, err: Error) {
self.sender.send_mut(Err(err));
for file in &self.compacted_files {
let file_id = file.file_id;
warn!(

View File

@@ -569,7 +569,12 @@ pub async fn flush_region(engine: &MitoEngine, region_id: RegionId) {
}
/// Reopen a region.
pub async fn reopen_region(engine: &MitoEngine, region_id: RegionId, region_dir: String) {
pub async fn reopen_region(
engine: &MitoEngine,
region_id: RegionId,
region_dir: String,
writable: bool,
) {
// Close the region.
engine
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
@@ -588,4 +593,8 @@ pub async fn reopen_region(engine: &MitoEngine, region_id: RegionId, region_dir:
)
.await
.unwrap();
if writable {
engine.set_writable(region_id, true).unwrap();
}
}

View File

@@ -28,7 +28,6 @@ use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use common_query::Output;
use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
use futures::future::try_join_all;
@@ -37,7 +36,7 @@ use snafu::{ensure, ResultExt};
use store_api::logstore::LogStore;
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio::sync::{mpsc, Mutex};
use crate::compaction::CompactionScheduler;
use crate::config::MitoConfig;
@@ -177,14 +176,6 @@ impl WorkerGroup {
}
}
/// Send result to the sender.
pub(crate) fn send_result(sender: Option<oneshot::Sender<Result<Output>>>, res: Result<Output>) {
if let Some(sender) = sender {
// Ignore send result.
let _ = sender.send(res);
}
}
// Tests methods.
#[cfg(test)]
impl WorkerGroup {
@@ -514,10 +505,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
};
if let Some(sender) = ddl.sender {
// Ignore send result.
let _ = sender.send(res);
}
ddl.sender.send(res);
}
}

View File

@@ -22,26 +22,24 @@ use snafu::ResultExt;
use store_api::metadata::{RegionMetadata, RegionMetadataBuilder, RegionMetadataRef};
use store_api::region_request::RegionAlterRequest;
use store_api::storage::RegionId;
use tokio::sync::oneshot;
use crate::error::{InvalidMetadataSnafu, InvalidRegionRequestSnafu, RegionNotFoundSnafu, Result};
use crate::error::{InvalidMetadataSnafu, InvalidRegionRequestSnafu, Result};
use crate::flush::FlushReason;
use crate::manifest::action::{RegionChange, RegionMetaAction, RegionMetaActionList};
use crate::memtable::MemtableBuilderRef;
use crate::region::version::Version;
use crate::region::MitoRegionRef;
use crate::request::{DdlRequest, SenderDdlRequest};
use crate::worker::{send_result, RegionWorkerLoop};
use crate::request::{DdlRequest, OptionOutputTx, SenderDdlRequest};
use crate::worker::RegionWorkerLoop;
impl<S> RegionWorkerLoop<S> {
pub(crate) async fn handle_alter_request(
&mut self,
region_id: RegionId,
request: RegionAlterRequest,
sender: Option<oneshot::Sender<Result<Output>>>,
mut sender: OptionOutputTx,
) {
let Some(region) = self.regions.get_region(region_id) else {
send_result(sender, RegionNotFoundSnafu { region_id }.fail());
let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
return;
};
@@ -59,7 +57,7 @@ impl<S> RegionWorkerLoop<S> {
let task = self.new_flush_task(&region, FlushReason::Alter);
if let Err(e) = self.flush_scheduler.schedule_flush(&region, task) {
// Unable to flush the region, send error to waiter.
send_result(sender, Err(e));
sender.send(Err(e));
return;
}
@@ -79,7 +77,7 @@ impl<S> RegionWorkerLoop<S> {
alter_region_schema(&region, &version, request, &self.memtable_builder).await
{
error!(e; "Failed to alter region schema, region_id: {}", region_id);
send_result(sender, Err(e));
sender.send(Err(e));
return;
}
@@ -91,7 +89,7 @@ impl<S> RegionWorkerLoop<S> {
);
// Notifies waiters.
send_result(sender, Ok(Output::AffectedRows(0)));
sender.send(Ok(Output::AffectedRows(0)));
}
}

View File

@@ -12,28 +12,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_query::Output;
use common_telemetry::{error, info};
use store_api::logstore::LogStore;
use store_api::storage::RegionId;
use tokio::sync::oneshot;
use crate::compaction::CompactionRequest;
use crate::error::{RegionNotFoundSnafu, Result};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::region::MitoRegionRef;
use crate::request::{CompactionFailed, CompactionFinished};
use crate::worker::{send_result, RegionWorkerLoop};
use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx};
use crate::worker::RegionWorkerLoop;
impl<S: LogStore> RegionWorkerLoop<S> {
/// Handles compaction request submitted to region worker.
pub(crate) fn handle_compaction_request(
&mut self,
region_id: RegionId,
sender: Option<oneshot::Sender<Result<Output>>>,
mut sender: OptionOutputTx,
) {
let Some(region) = self.regions.get_region(region_id) else {
send_result(sender, RegionNotFoundSnafu { region_id }.fail());
let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
return;
};
@@ -54,8 +50,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
region_id: RegionId,
mut request: CompactionFinished,
) {
let Some(region) = self.regions.get_region(region_id) else {
request.on_failure(RegionNotFoundSnafu { region_id }.build());
let Some(region) = self.regions.writable_region_or(region_id, &mut request) else {
return;
};
@@ -90,7 +85,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
fn new_compaction_request(
&self,
region: &MitoRegionRef,
waiter: Option<oneshot::Sender<Result<Output>>>,
waiter: OptionOutputTx,
) -> CompactionRequest {
let current_version = region.version_control.current().version;
let access_layer = region.access_layer.clone();

View File

@@ -26,7 +26,7 @@ use snafu::ResultExt;
use store_api::storage::RegionId;
use tokio::time::sleep;
use crate::error::{self, OpenDalSnafu, RegionNotFoundSnafu, Result};
use crate::error::{OpenDalSnafu, Result};
use crate::region::RegionMapRef;
use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE};
@@ -35,9 +35,7 @@ const MAX_RETRY_TIMES: u64 = 288; // 24 hours (5m * 288)
impl<S> RegionWorkerLoop<S> {
pub(crate) async fn handle_drop_request(&mut self, region_id: RegionId) -> Result<Output> {
let Some(region) = self.regions.get_region(region_id) else {
return RegionNotFoundSnafu { region_id }.fail();
};
let region = self.regions.writable_region(region_id)?;
info!("Try to drop region: {}", region_id);
@@ -119,7 +117,7 @@ pub(crate) async fn remove_region_dir_once(
.lister_with(region_path)
.await
.context(OpenDalSnafu)?;
while let Some(file) = files.try_next().await.context(error::OpenDalSnafu)? {
while let Some(file) = files.try_next().await.context(OpenDalSnafu)? {
if file.path().ends_with(".parquet") {
has_parquet_file = true;
break;

View File

@@ -14,19 +14,17 @@
//! Handling flush related requests.
use common_query::Output;
use common_telemetry::{error, info};
use common_time::util::current_time_millis;
use store_api::logstore::LogStore;
use store_api::storage::RegionId;
use tokio::sync::oneshot;
use crate::error::{RegionNotFoundSnafu, Result};
use crate::error::Result;
use crate::flush::{FlushReason, RegionFlushTask};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::region::MitoRegionRef;
use crate::request::{FlushFailed, FlushFinished};
use crate::worker::{send_result, RegionWorkerLoop};
use crate::request::{FlushFailed, FlushFinished, OnFailure, OptionOutputTx};
use crate::worker::RegionWorkerLoop;
impl<S: LogStore> RegionWorkerLoop<S> {
/// On region flush job finished.
@@ -35,9 +33,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
region_id: RegionId,
mut request: FlushFinished,
) {
let Some(region) = self.regions.get_region(region_id) else {
// We may dropped or closed the region.
request.on_failure(RegionNotFoundSnafu { region_id }.build());
let Some(region) = self.regions.writable_region_or(region_id, &mut request) else {
return;
};
@@ -102,17 +98,14 @@ impl<S> RegionWorkerLoop<S> {
pub(crate) async fn handle_flush_request(
&mut self,
region_id: RegionId,
sender: Option<oneshot::Sender<Result<Output>>>,
mut sender: OptionOutputTx,
) {
let Some(region) = self.regions.get_region(region_id) else {
send_result(sender, RegionNotFoundSnafu { region_id }.fail());
let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
return;
};
let mut task = self.new_flush_task(&region, FlushReason::Manual);
if let Some(sender) = sender {
task.senders.push(sender);
}
task.push_sender(sender);
if let Err(e) = self.flush_scheduler.schedule_flush(&region, task) {
error!(e; "Failed to schedule flush task for region {}", region.region_id);
}

View File

@@ -21,10 +21,10 @@ use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadata;
use store_api::storage::RegionId;
use crate::error::{RegionNotFoundSnafu, RejectWriteSnafu, Result};
use crate::error::{RejectWriteSnafu, Result};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::{SenderWriteRequest, WriteRequest};
use crate::worker::{send_result, RegionWorkerLoop};
use crate::worker::RegionWorkerLoop;
impl<S: LogStore> RegionWorkerLoop<S> {
/// Takes and handles all write requests.
@@ -102,10 +102,11 @@ impl<S> RegionWorkerLoop<S> {
// Checks whether the region exists and is it stalling.
if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
let Some(region) = self.regions.get_region(region_id) else {
// No such region.
send_result(sender_req.sender, RegionNotFoundSnafu { region_id }.fail());
let Some(region) = self
.regions
.writable_region_or(region_id, &mut sender_req.sender)
else {
// No such region or the region is read only.
continue;
};
@@ -121,7 +122,7 @@ impl<S> RegionWorkerLoop<S> {
if let Err(e) =
maybe_fill_missing_columns(&mut sender_req.request, &region_ctx.version().metadata)
{
send_result(sender_req.sender, Err(e));
sender_req.sender.send(Err(e));
continue;
}
@@ -148,14 +149,12 @@ impl<S> RegionWorkerLoop<S> {
/// Send rejected error to all `write_requests`.
fn reject_write_requests(write_requests: Vec<SenderWriteRequest>) {
for req in write_requests {
if let Some(sender) = req.sender {
let _ = sender.send(
RejectWriteSnafu {
region_id: req.request.region_id,
}
.fail(),
);
}
req.sender.send(
RejectWriteSnafu {
region_id: req.request.region_id,
}
.fail(),
);
}
}

View File

@@ -479,7 +479,9 @@ pub fn status_to_tonic_code(status_code: StatusCode) -> Code {
| StatusCode::UserPasswordMismatch
| StatusCode::AuthHeaderNotFound
| StatusCode::InvalidAuthHeader => Code::Unauthenticated,
StatusCode::AccessDenied | StatusCode::PermissionDenied => Code::PermissionDenied,
StatusCode::AccessDenied | StatusCode::PermissionDenied | StatusCode::RegionReadonly => {
Code::PermissionDenied
}
}
}

View File

@@ -30,7 +30,7 @@ pub trait RegionEngine: Send + Sync {
/// Name of this engine
fn name(&self) -> &str;
/// Handle request to the region.
/// Handles request to the region.
///
/// Only query is not included, which is handled in `handle_query`
async fn handle_request(
@@ -39,18 +39,25 @@ pub trait RegionEngine: Send + Sync {
request: RegionRequest,
) -> Result<Output, BoxedError>;
/// Handle substrait query and return a stream of record batches
/// Handles substrait query and return a stream of record batches
async fn handle_query(
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<SendableRecordBatchStream, BoxedError>;
/// Retrieve region's metadata.
/// Retrieves region's metadata.
async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError>;
/// Stop the engine
/// Stops the engine
async fn stop(&self) -> Result<(), BoxedError>;
/// Sets writable mode for a region.
///
/// The engine checks whether the region is writable before writing to the region. Setting
/// the region as readonly doesn't guarantee that write operations in progress will not
/// take effect.
fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<(), BoxedError>;
}
pub type RegionEngineRef = Arc<dyn RegionEngine>;
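
On the trait side, set_writable is synchronous, and the doc comment above is the contract: turning a region readonly rejects new writes but does not cancel writes already in flight. The sketch below is a trimmed-down stand-in for the trait, only to show the shape of an implementation and a caller; RegionId, BoxedError, and the other trait methods are simplified away.

use std::collections::HashSet;
use std::sync::{Arc, Mutex};

type RegionId = u64;

trait RegionEngine: Send + Sync {
    /// Sets writable mode for a region. Rejecting new writes does not cancel
    /// writes that are already in progress.
    fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<(), String>;
}

struct MockEngine {
    writable: Mutex<HashSet<RegionId>>,
}

impl RegionEngine for MockEngine {
    fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<(), String> {
        let mut set = self.writable.lock().unwrap();
        if writable {
            set.insert(region_id);
        } else {
            set.remove(&region_id);
        }
        Ok(())
    }
}

fn main() {
    // Mirrors RegionEngineRef: callers hold the engine behind a trait object.
    let engine: Arc<dyn RegionEngine> = Arc::new(MockEngine {
        writable: Mutex::new(HashSet::new()),
    });
    // An orchestration layer could flip this when a region changes role.
    engine.set_writable(1, false).unwrap();
    engine.set_writable(1, true).unwrap();
}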