feat(mito): Implement WorkerGroup to handle requests (#1950)

* feat: engine worker framework

* feat: worder comments

* feat: divide worker requests by type

* feat: handlers for worker thread

* refactor: rename requests to ddl and dml requests

* feat: methods to stop and submit requests

* refactor: rename request queue to request buffer

* refactor: remove ddl and dml request

* feat: send request to worker

* test: test stop

* docs(mito): worker group docs

* style: fix clippy

* docs: update WorkerGroup comment

* chore: address CR comments

* chore: fix comment issues

* feat: use mpsc::channel

* feat: check is_running flag

* chore: Add stop request to notify a worker

* refactor: add join_dir to join paths

* feat: redefine region requests

* docs: more comments

* refactor: rename worker thread to worker loop

* chore: address CR comments
This commit is contained in:
Yingwen
2023-07-14 17:06:44 +09:00
committed by GitHub
parent ce43896a0b
commit ef7c5dd311
13 changed files with 832 additions and 15 deletions

1
Cargo.lock generated
View File

@@ -5464,6 +5464,7 @@ dependencies = [
"async-stream",
"async-trait",
"chrono",
"common-base",
"common-catalog",
"common-datasource",
"common-error",

View File

@@ -4,6 +4,10 @@ version.workspace = true
edition.workspace = true
license.workspace = true
[features]
default = []
test = ["common-test-util"]
[dependencies]
aquamarine = "0.3"
anymap = "1.0.0-beta.2"
@@ -12,6 +16,7 @@ async-stream.workspace = true
async-trait = "0.1"
chrono.workspace = true
common-catalog = { path = "../common/catalog" }
common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-procedure = { path = "../common/procedure" }
common-query = { path = "../common/query" }

View File

@@ -14,6 +14,52 @@
//! Configurations.
use common_telemetry::logging;
const DEFAULT_NUM_WORKERS: usize = 1;
/// Configuration for [MitoEngine](crate::engine::MitoEngine).
#[derive(Debug)]
pub struct MitoConfig {}
pub struct MitoConfig {
/// Number of region workers.
pub num_workers: usize,
/// Request channel size of each worker.
pub worker_channel_size: usize,
/// Max batch size for a worker to handle requests.
pub worker_request_batch_size: usize,
}
impl Default for MitoConfig {
fn default() -> Self {
MitoConfig {
num_workers: DEFAULT_NUM_WORKERS,
worker_channel_size: 128,
worker_request_batch_size: 64,
}
}
}
impl MitoConfig {
/// Sanitize incorrect configurations.
pub(crate) fn sanitize(&mut self) {
// Sanitize worker num.
let num_workers_before = self.num_workers;
if self.num_workers == 0 {
self.num_workers = DEFAULT_NUM_WORKERS;
}
self.num_workers = self.num_workers.next_power_of_two();
if num_workers_before != self.num_workers {
logging::warn!(
"Sanitize worker num {} to {}",
num_workers_before,
self.num_workers
);
}
// Sanitize channel size.
if self.worker_channel_size == 0 {
logging::warn!("Sanitize channel size 0 to 1");
self.worker_channel_size = 1;
}
}
}

View File

@@ -15,8 +15,13 @@
use std::sync::Arc;
use object_store::ObjectStore;
use snafu::ResultExt;
use store_api::logstore::LogStore;
use crate::config::MitoConfig;
use crate::error::{RecvSnafu, Result};
pub use crate::worker::request::CreateRequest;
use crate::worker::request::{RegionRequest, RequestBody};
use crate::worker::WorkerGroup;
/// Region engine implementation for timeseries data.
@@ -27,11 +32,27 @@ pub struct MitoEngine {
impl MitoEngine {
/// Returns a new [MitoEngine] with specific `config`, `log_store` and `object_store`.
pub fn new<S>(config: MitoConfig, log_store: S, object_store: ObjectStore) -> MitoEngine {
pub fn new<S: LogStore>(
mut config: MitoConfig,
log_store: Arc<S>,
object_store: ObjectStore,
) -> MitoEngine {
config.sanitize();
MitoEngine {
inner: Arc::new(EngineInner::new(config, log_store, object_store)),
}
}
/// Stop the engine.
pub async fn stop(&self) -> Result<()> {
self.inner.stop().await
}
/// Creates a new region.
pub async fn create_region(&self, request: CreateRequest) -> Result<()> {
self.inner.create_region(request).await
}
}
/// Inner struct of [MitoEngine].
@@ -42,9 +63,40 @@ struct EngineInner {
impl EngineInner {
/// Returns a new [EngineInner] with specific `config`, `log_store` and `object_store`.
fn new<S>(_config: MitoConfig, _log_store: S, _object_store: ObjectStore) -> EngineInner {
fn new<S: LogStore>(
config: MitoConfig,
log_store: Arc<S>,
object_store: ObjectStore,
) -> EngineInner {
EngineInner {
workers: WorkerGroup::default(),
workers: WorkerGroup::start(&config, log_store, object_store),
}
}
/// Stop the inner engine.
async fn stop(&self) -> Result<()> {
self.workers.stop().await
}
/// Creates a new region.
async fn create_region(&self, create_request: CreateRequest) -> Result<()> {
let (request, receiver) = RegionRequest::from_body(RequestBody::Create(create_request));
self.workers.submit_to_worker(request).await?;
receiver.await.context(RecvSnafu)?
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_util::TestEnv;
#[tokio::test]
async fn test_engine_new_stop() {
let env = TestEnv::new("engine-stop");
let engine = env.create_engine(MitoConfig::default()).await;
engine.stop().await.unwrap();
}
}

View File

@@ -20,6 +20,8 @@ use common_error::status_code::StatusCode;
use snafu::{Location, Snafu};
use store_api::manifest::ManifestVersion;
use crate::worker::WorkerId;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
@@ -78,15 +80,38 @@ pub enum Error {
#[snafu(display("Cannot find RegionMetadata. Location: {}", location))]
RegionMetadataNotFound { location: Location },
#[snafu(display("Failed to join handle, location: {}, source: {}", location, source))]
Join {
source: common_runtime::JoinError,
location: Location,
},
#[snafu(display("Worker {} is stopped, location: {}", id, location))]
WorkerStopped { id: WorkerId, location: Location },
#[snafu(display("Failed to recv result, location: {}, source: {}", location, source))]
Recv {
source: tokio::sync::oneshot::error::RecvError,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
#[allow(clippy::match_single_binding)]
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
_ => todo!(),
OpenDal { .. } => StatusCode::StorageUnavailable,
CompressObject { .. } | DecompressObject { .. } | SerdeJson { .. } | Utf8 { .. } => {
StatusCode::Unexpected
}
InvalidScanIndex { .. } => StatusCode::InvalidArguments,
RegionMetadataNotFound { .. } | Join { .. } | WorkerStopped { .. } | Recv { .. } => {
StatusCode::Internal
}
}
}

View File

@@ -16,6 +16,9 @@
//!
//! Mito is the a region engine to store timeseries data.
#[cfg(any(test, feature = "test"))]
pub mod test_util;
// TODO(yingwen): Remove all `allow(dead_code)` after finish refactoring mito.
pub mod config;
#[allow(dead_code)]
@@ -149,4 +152,9 @@ mod worker;
/// FileHandle o-- FileMeta
/// class RegionMetadata
/// ```
///
/// ## Region workers
///
/// The engine handles DMLs and DDLs in dedicated [workers](crate::worker::WorkerGroup).
///
mod docs {}

View File

@@ -51,13 +51,15 @@ pub struct RegionMetadata {
/// Latest schema of this region
#[serde(skip)]
schema: SchemaRef,
/// Columns in the region. Has the same order as columns
/// in [schema](RegionMetadata::schema).
column_metadatas: Vec<ColumnMetadata>,
/// Version of metadata.
version: VersionNumber,
/// Maintains an ordered list of primary keys
primary_key: Vec<ColumnId>,
/// Immutable and unique id
/// Immutable and unique id of a region.
region_id: RegionId,
}
@@ -148,15 +150,20 @@ impl RegionMetadataBuilder {
pub struct ColumnMetadata {
/// Schema of this column. Is the same as `column_schema` in [SchemaRef].
column_schema: ColumnSchema,
/// Semantic type of this column (e.g. tag or timestamp).
semantic_type: SemanticType,
/// Immutable and unique id of a region.
column_id: ColumnId,
}
/// The semantic type of one column
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum SemanticType {
/// Tag column, also is a part of primary key.
Tag,
/// A column that isn't a time index or part of primary key.
Field,
/// Time index column.
Timestamp,
}

View File

@@ -0,0 +1,70 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Utilities for testing.
use std::sync::Arc;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::test_util::log_store_util;
use object_store::services::Fs;
use object_store::util::join_dir;
use object_store::ObjectStore;
use crate::config::MitoConfig;
use crate::engine::MitoEngine;
use crate::worker::WorkerGroup;
/// Env to test mito engine.
pub struct TestEnv {
/// Path to store data.
data_home: TempDir,
}
impl TestEnv {
/// Returns a new env with specific `prefix` for test.
pub fn new(prefix: &str) -> TestEnv {
TestEnv {
data_home: create_temp_dir(prefix),
}
}
/// Creates a new engine with specific config under this env.
pub async fn create_engine(&self, config: MitoConfig) -> MitoEngine {
let (log_store, object_store) = self.create_log_and_object_store().await;
MitoEngine::new(config, Arc::new(log_store), object_store)
}
/// Creates a new [WorkerGroup] with specific config under this env.
pub(crate) async fn create_worker_group(&self, config: &MitoConfig) -> WorkerGroup {
let (log_store, object_store) = self.create_log_and_object_store().await;
WorkerGroup::start(config, Arc::new(log_store), object_store)
}
async fn create_log_and_object_store(&self) -> (RaftEngineLogStore, ObjectStore) {
let data_home = self.data_home.path().to_str().unwrap();
let wal_path = join_dir(data_home, "wal");
let data_path = join_dir(data_home, "data");
let log_store = log_store_util::create_tmp_local_file_log_store(&wal_path).await;
let mut builder = Fs::default();
builder.root(&data_path);
let object_store = ObjectStore::new(builder).unwrap().finish();
(log_store, object_store)
}
}

View File

@@ -14,18 +14,396 @@
//! Structs and utilities for writing regions.
use crate::region::RegionMapRef;
mod handle_create;
mod handle_open;
pub(crate) mod request;
/// A fixed size group of [RegionWorker]s.
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use common_runtime::JoinHandle;
use common_telemetry::logging;
use futures::future::try_join_all;
use object_store::ObjectStore;
use snafu::{ensure, ResultExt};
use store_api::logstore::LogStore;
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{mpsc, Mutex};
use crate::config::MitoConfig;
use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu};
use crate::region::{RegionMap, RegionMapRef};
use crate::worker::request::{RegionRequest, RequestBody, WorkerRequest};
/// Identifier for a worker.
pub(crate) type WorkerId = u32;
/// A fixed size group of [RegionWorkers](RegionWorker).
///
/// The group binds each region to a specific [RegionWorker].
#[derive(Debug, Default)]
/// A worker group binds each region to a specific [RegionWorker] and sends
/// requests to region's dedicated worker.
///
/// ```mermaid
/// graph LR
///
/// RegionRequest -- Route by region id --> Worker0 & Worker1
///
/// subgraph MitoEngine
/// subgraph WorkerGroup
/// Worker0["RegionWorker 0"]
/// Worker1["RegionWorker 1"]
/// end
/// end
///
/// Chan0[" Request channel 0"]
/// Chan1[" Request channel 1"]
/// WorkerThread1["RegionWorkerLoop 1"]
///
/// subgraph WorkerThread0["RegionWorkerLoop 0"]
/// subgraph RegionMap["RegionMap (regions bound to worker 0)"]
/// Region0["Region 0"]
/// Region2["Region 2"]
/// end
/// Buffer0["RequestBuffer"]
///
/// Buffer0 -- modify regions --> RegionMap
/// end
///
/// Worker0 --> Chan0
/// Worker1 --> Chan1
/// Chan0 --> Buffer0
/// Chan1 --> WorkerThread1
/// ```
#[derive(Debug)]
pub(crate) struct WorkerGroup {
workers: Vec<RegionWorker>,
}
/// Worker to write and alter regions bound to it.
#[derive(Debug, Default)]
struct RegionWorker {
regions: RegionMapRef,
impl WorkerGroup {
/// Start a worker group.
///
/// The number of workers should be power of two.
pub(crate) fn start<S: LogStore>(
config: &MitoConfig,
log_store: Arc<S>,
object_store: ObjectStore,
) -> WorkerGroup {
assert!(config.num_workers.is_power_of_two());
let workers = (0..config.num_workers)
.map(|id| {
RegionWorker::start(
WorkerConfig {
id: id as WorkerId,
channel_size: config.worker_channel_size,
request_batch_size: config.worker_request_batch_size,
},
log_store.clone(),
object_store.clone(),
)
})
.collect();
WorkerGroup { workers }
}
/// Stop the worker group.
pub(crate) async fn stop(&self) -> Result<()> {
logging::info!("Stop region worker group");
try_join_all(self.workers.iter().map(|worker| worker.stop())).await?;
Ok(())
}
/// Submit a request to a worker in the group.
pub(crate) async fn submit_to_worker(&self, request: RegionRequest) -> Result<()> {
self.worker(request.body.region_id())
.submit_request(request)
.await
}
/// Get worker for specific `region_id`.
fn worker(&self, region_id: RegionId) -> &RegionWorker {
let mut hasher = DefaultHasher::new();
region_id.hash(&mut hasher);
let value = hasher.finish() as usize;
let index = value_to_index(value, self.workers.len());
&self.workers[index]
}
}
fn value_to_index(value: usize, num_workers: usize) -> usize {
value & (num_workers - 1)
}
/// Config for region worker.
#[derive(Debug, Clone)]
struct WorkerConfig {
/// Id of the worker
id: WorkerId,
/// Capacity of the request channel.
channel_size: usize,
/// Batch size to process request.
request_batch_size: usize,
}
/// Worker to write and alter regions bound to it.
#[derive(Debug)]
pub(crate) struct RegionWorker {
/// Id of the worker.
id: WorkerId,
/// Regions bound to the worker.
regions: RegionMapRef,
/// Request sender.
sender: Sender<WorkerRequest>,
/// Handle to the worker thread.
handle: Mutex<Option<JoinHandle<()>>>,
/// Whether to run the worker thread.
running: Arc<AtomicBool>,
}
impl RegionWorker {
/// Start a region worker and its background thread.
fn start<S: LogStore>(
config: WorkerConfig,
log_store: Arc<S>,
object_store: ObjectStore,
) -> RegionWorker {
let regions = Arc::new(RegionMap::default());
let (sender, receiver) = mpsc::channel(config.channel_size);
let running = Arc::new(AtomicBool::new(true));
let mut worker_thread = RegionWorkerLoop {
id: config.id,
regions: regions.clone(),
receiver,
log_store,
object_store,
running: running.clone(),
request_batch_size: config.request_batch_size,
};
let handle = common_runtime::spawn_bg(async move {
worker_thread.run().await;
});
RegionWorker {
id: config.id,
regions,
sender,
handle: Mutex::new(Some(handle)),
running,
}
}
/// Submit request to background worker thread.
async fn submit_request(&self, request: RegionRequest) -> Result<()> {
ensure!(self.is_running(), WorkerStoppedSnafu { id: self.id });
if self
.sender
.send(WorkerRequest::Region(request))
.await
.is_err()
{
logging::warn!(
"Worker {} is already exited but the running flag is still true",
self.id
);
// Manually set the running flag to false to avoid printing more warning logs.
self.set_running(false);
return WorkerStoppedSnafu { id: self.id }.fail();
}
Ok(())
}
/// Stop the worker.
///
/// This method waits until the worker thread exists.
async fn stop(&self) -> Result<()> {
let handle = self.handle.lock().await.take();
if let Some(handle) = handle {
logging::info!("Stop region worker {}", self.id);
self.set_running(false);
if self.sender.send(WorkerRequest::Stop).await.is_err() {
logging::warn!("Worker {} is already exited before stop", self.id);
}
handle.await.context(JoinSnafu)?;
}
Ok(())
}
/// Returns true if the worker is still running.
fn is_running(&self) -> bool {
self.running.load(Ordering::Relaxed)
}
/// Sets whether the worker is still running.
fn set_running(&self, value: bool) {
self.running.store(value, Ordering::Relaxed)
}
}
impl Drop for RegionWorker {
fn drop(&mut self) {
if self.is_running() {
self.set_running(false);
// Once we drop the sender, the worker thread will receive a disconnected error.
}
}
}
type RequestBuffer = Vec<WorkerRequest>;
/// Background worker loop to handle requests.
struct RegionWorkerLoop<S> {
// Id of the worker.
id: WorkerId,
/// Regions bound to the worker.
regions: RegionMapRef,
/// Request receiver.
receiver: Receiver<WorkerRequest>,
// TODO(yingwen): Replaced by Wal.
log_store: Arc<S>,
/// Object store for manifest and SSTs.
object_store: ObjectStore,
/// Whether the worker thread is still running.
running: Arc<AtomicBool>,
/// Batch size to fetch requests from channel.
request_batch_size: usize,
}
impl<S> RegionWorkerLoop<S> {
/// Starts the worker loop.
async fn run(&mut self) {
logging::info!("Start region worker thread {}", self.id);
// Buffer to retrieve requests from receiver.
let mut buffer = RequestBuffer::with_capacity(self.request_batch_size);
while self.running.load(Ordering::Relaxed) {
// Clear the buffer before handling next batch of requests.
buffer.clear();
match self.receiver.recv().await {
Some(request) => buffer.push(request),
None => break,
}
// Try to recv more requests from the channel.
for _ in 1..buffer.capacity() {
// We have received one request so we start from 1.
match self.receiver.try_recv() {
Ok(req) => buffer.push(req),
// We still need to handle remaining requests.
Err(_) => break,
}
}
self.handle_requests(&mut buffer).await;
}
logging::info!("Exit region worker thread {}", self.id);
}
/// Dispatches and processes requests.
///
/// `buffer` should be empty.
async fn handle_requests(&mut self, buffer: &mut RequestBuffer) {
let mut dml_requests = Vec::with_capacity(buffer.len());
let mut ddl_requests = Vec::with_capacity(buffer.len());
for worker_req in buffer.drain(..) {
match worker_req {
WorkerRequest::Region(req) => {
if req.body.is_ddl() {
ddl_requests.push(req);
} else {
dml_requests.push(req);
}
}
// We receive a stop signal, but we still want to process remaining
// requests. The worker thread will then check the running flag and
// then exit.
WorkerRequest::Stop => {
debug_assert!(!self.running.load(Ordering::Relaxed));
}
}
}
// Handles all dml requests first. So we can alter regions without
// considering existing dml requests.
self.handle_dml_requests(dml_requests).await;
self.handle_ddl_requests(ddl_requests).await;
}
/// Takes and handles all dml requests.
async fn handle_dml_requests(&mut self, write_requests: Vec<RegionRequest>) {
if write_requests.is_empty() {
return;
}
// Create a write context that holds meta and sequence.
unimplemented!()
}
/// Takes and handles all ddl requests.
async fn handle_ddl_requests(&mut self, ddl_requests: Vec<RegionRequest>) {
if ddl_requests.is_empty() {
return;
}
for request in ddl_requests {
let res = match request.body {
RequestBody::Create(req) => self.handle_create_request(req).await,
RequestBody::Open(req) => self.handle_open_request(req).await,
RequestBody::Write(_) => unreachable!(),
};
if let Some(sender) = request.sender {
// Ignore send result.
let _ = sender.send(res);
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_util::TestEnv;
#[test]
fn test_value_to_index() {
let num_workers = 1;
for i in 0..10 {
assert_eq!(0, value_to_index(i, num_workers));
}
let num_workers = 4;
for i in 0..10 {
assert_eq!(i % 4, value_to_index(i, num_workers));
}
}
#[tokio::test]
async fn test_worker_group_start_stop() {
let env = TestEnv::new("group-stop");
let group = env
.create_worker_group(&MitoConfig {
num_workers: 4,
..Default::default()
})
.await;
group.stop().await.unwrap();
}
}

View File

@@ -0,0 +1,30 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Handling create request.
use crate::error::Result;
use crate::worker::request::CreateRequest;
use crate::worker::RegionWorkerLoop;
impl<S> RegionWorkerLoop<S> {
pub(crate) async fn handle_create_request(&mut self, _request: CreateRequest) -> Result<()> {
// 1. Checks whether the table exists.
// 2. Convert the request into RegionMetadata
// 3. Write manifest
unimplemented!()
}
}

View File

@@ -0,0 +1,25 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Handling open request.
use crate::error::Result;
use crate::worker::request::OpenRequest;
use crate::worker::RegionWorkerLoop;
impl<S> RegionWorkerLoop<S> {
pub(crate) async fn handle_open_request(&mut self, _request: OpenRequest) -> Result<()> {
unimplemented!()
}
}

View File

@@ -0,0 +1,141 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Worker requests.
use std::time::Duration;
use common_base::readable_size::ReadableSize;
use store_api::storage::{ColumnId, CompactionStrategy, RegionId};
use tokio::sync::oneshot::{self, Receiver, Sender};
use crate::error::Result;
use crate::metadata::ColumnMetadata;
/// Options that affect the entire region.
///
/// Users need to specify the options while creating/opening a region.
#[derive(Debug)]
pub struct RegionOptions {
/// Region memtable max size in bytes.
pub write_buffer_size: Option<ReadableSize>,
/// Region SST files TTL.
pub ttl: Option<Duration>,
/// Compaction strategy.
pub compaction_strategy: CompactionStrategy,
}
/// Create region request.
#[derive(Debug)]
pub struct CreateRequest {
/// Region to create.
pub region_id: RegionId,
/// Columns in this region.
pub column_metadatas: Vec<ColumnMetadata>,
/// Columns in the primary key.
pub primary_key: Vec<ColumnId>,
/// Create region if not exists.
pub create_if_not_exists: bool,
/// Options of the created region.
pub options: RegionOptions,
}
impl CreateRequest {
/// Validate the request.
fn validate(&self) -> Result<()> {
unimplemented!()
}
}
/// Open region request.
#[derive(Debug)]
pub struct OpenRequest {
/// Region to open.
pub region_id: RegionId,
/// Options of the created region.
pub options: RegionOptions,
}
/// Request to write a region.
#[derive(Debug)]
pub(crate) struct WriteRequest {
/// Region to write.
pub region_id: RegionId,
}
/// Request sent to a worker
pub(crate) enum WorkerRequest {
/// Region request.
Region(RegionRequest),
/// Notify a worker to stop.
Stop,
}
/// Request to modify a region.
#[derive(Debug)]
pub(crate) struct RegionRequest {
/// Sender to send result.
pub(crate) sender: Option<Sender<Result<()>>>,
/// Request body.
pub(crate) body: RequestBody,
}
impl RegionRequest {
/// Creates a [RegionRequest] and a receiver from `body`.
pub(crate) fn from_body(body: RequestBody) -> (RegionRequest, Receiver<Result<()>>) {
let (sender, receiver) = oneshot::channel();
(
RegionRequest {
sender: Some(sender),
body,
},
receiver,
)
}
}
/// Body to carry actual region request.
#[derive(Debug)]
pub(crate) enum RequestBody {
// DML:
/// Write to a region.
Write(WriteRequest),
// DDL:
/// Creates a new region.
Create(CreateRequest),
/// Opens an existing region.
Open(OpenRequest),
}
impl RequestBody {
/// Region id of this request.
pub(crate) fn region_id(&self) -> RegionId {
match self {
RequestBody::Write(req) => req.region_id,
RequestBody::Create(req) => req.region_id,
RequestBody::Open(req) => req.region_id,
}
}
/// Returns whether the request is a DDL (e.g. CREATE/OPEN/ALTER).
pub(crate) fn is_ddl(&self) -> bool {
match self {
RequestBody::Write(_) => false,
RequestBody::Create(_) => true,
RequestBody::Open(_) => true,
}
}
}

View File

@@ -29,6 +29,21 @@ pub fn normalize_dir(dir: &str) -> String {
dir
}
/// Join two paths and normalize the output dir.
///
/// The output dir is always ends with `/`. e.g.
/// - `/a/b` join `c` => `/a/b/c/`
/// - `/a/b` join `/c/` => `/a/b/c/`
///
/// All internal `//` will be replaced by `/`.
pub fn join_dir(parent: &str, child: &str) -> String {
// Always adds a `/` to the output path.
let output = format!("{parent}/{child}/");
// We call opendal's normalize_dir which doesn't push `/` to
// the end of path.
opendal::raw::normalize_root(&output)
}
#[cfg(test)]
mod tests {
use super::*;
@@ -39,4 +54,18 @@ mod tests {
assert_eq!("/", normalize_dir(""));
assert_eq!("/test/", normalize_dir("/test"));
}
#[test]
fn test_join_paths() {
assert_eq!("/", join_dir("", ""));
assert_eq!("/", join_dir("/", ""));
assert_eq!("/", join_dir("", "/"));
assert_eq!("/", join_dir("/", "/"));
assert_eq!("/a/", join_dir("/a", ""));
assert_eq!("/a/b/c/", join_dir("a/b", "c"));
assert_eq!("/a/b/c/", join_dir("/a/b", "c"));
assert_eq!("/a/b/c/", join_dir("/a/b", "c/"));
assert_eq!("/a/b/c/", join_dir("/a/b", "/c/"));
assert_eq!("/a/b/c/", join_dir("/a/b", "//c"));
}
}