From ef7c5dd3114cf61c6c343da4399e24c091deed8c Mon Sep 17 00:00:00 2001 From: Yingwen Date: Fri, 14 Jul 2023 17:06:44 +0900 Subject: [PATCH] feat(mito): Implement WorkerGroup to handle requests (#1950) * feat: engine worker framework * feat: worder comments * feat: divide worker requests by type * feat: handlers for worker thread * refactor: rename requests to ddl and dml requests * feat: methods to stop and submit requests * refactor: rename request queue to request buffer * refactor: remove ddl and dml request * feat: send request to worker * test: test stop * docs(mito): worker group docs * style: fix clippy * docs: update WorkerGroup comment * chore: address CR comments * chore: fix comment issues * feat: use mpsc::channel * feat: check is_running flag * chore: Add stop request to notify a worker * refactor: add join_dir to join paths * feat: redefine region requests * docs: more comments * refactor: rename worker thread to worker loop * chore: address CR comments --- Cargo.lock | 1 + src/mito2/Cargo.toml | 5 + src/mito2/src/config.rs | 48 +++- src/mito2/src/engine.rs | 58 +++- src/mito2/src/error.rs | 29 +- src/mito2/src/lib.rs | 8 + src/mito2/src/metadata.rs | 9 +- src/mito2/src/test_util.rs | 70 +++++ src/mito2/src/worker.rs | 394 +++++++++++++++++++++++++- src/mito2/src/worker/handle_create.rs | 30 ++ src/mito2/src/worker/handle_open.rs | 25 ++ src/mito2/src/worker/request.rs | 141 +++++++++ src/object-store/src/util.rs | 29 ++ 13 files changed, 832 insertions(+), 15 deletions(-) create mode 100644 src/mito2/src/test_util.rs create mode 100644 src/mito2/src/worker/handle_create.rs create mode 100644 src/mito2/src/worker/handle_open.rs create mode 100644 src/mito2/src/worker/request.rs diff --git a/Cargo.lock b/Cargo.lock index b4e5176472..4348785381 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5464,6 +5464,7 @@ dependencies = [ "async-stream", "async-trait", "chrono", + "common-base", "common-catalog", "common-datasource", "common-error", diff --git 
a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 640549dd5f..30fe7cf920 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -4,6 +4,10 @@ version.workspace = true edition.workspace = true license.workspace = true +[features] +default = [] +test = ["common-test-util"] + [dependencies] aquamarine = "0.3" anymap = "1.0.0-beta.2" @@ -12,6 +16,7 @@ async-stream.workspace = true async-trait = "0.1" chrono.workspace = true common-catalog = { path = "../common/catalog" } +common-base = { path = "../common/base" } common-error = { path = "../common/error" } common-procedure = { path = "../common/procedure" } common-query = { path = "../common/query" } diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 87c25cd3d3..875a8c7dd0 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -14,6 +14,52 @@ //! Configurations. +use common_telemetry::logging; + +const DEFAULT_NUM_WORKERS: usize = 1; + /// Configuration for [MitoEngine](crate::engine::MitoEngine). #[derive(Debug)] -pub struct MitoConfig {} +pub struct MitoConfig { + /// Number of region workers. + pub num_workers: usize, + /// Request channel size of each worker. + pub worker_channel_size: usize, + /// Max batch size for a worker to handle requests. + pub worker_request_batch_size: usize, +} + +impl Default for MitoConfig { + fn default() -> Self { + MitoConfig { + num_workers: DEFAULT_NUM_WORKERS, + worker_channel_size: 128, + worker_request_batch_size: 64, + } + } +} + +impl MitoConfig { + /// Sanitize incorrect configurations. + pub(crate) fn sanitize(&mut self) { + // Sanitize worker num. + let num_workers_before = self.num_workers; + if self.num_workers == 0 { + self.num_workers = DEFAULT_NUM_WORKERS; + } + self.num_workers = self.num_workers.next_power_of_two(); + if num_workers_before != self.num_workers { + logging::warn!( + "Sanitize worker num {} to {}", + num_workers_before, + self.num_workers + ); + } + + // Sanitize channel size. 
+ if self.worker_channel_size == 0 { + logging::warn!("Sanitize channel size 0 to 1"); + self.worker_channel_size = 1; + } + } +} diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index c5e66af2b5..c8801ea713 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -15,8 +15,13 @@ use std::sync::Arc; use object_store::ObjectStore; +use snafu::ResultExt; +use store_api::logstore::LogStore; use crate::config::MitoConfig; +use crate::error::{RecvSnafu, Result}; +pub use crate::worker::request::CreateRequest; +use crate::worker::request::{RegionRequest, RequestBody}; use crate::worker::WorkerGroup; /// Region engine implementation for timeseries data. @@ -27,11 +32,27 @@ pub struct MitoEngine { impl MitoEngine { /// Returns a new [MitoEngine] with specific `config`, `log_store` and `object_store`. - pub fn new(config: MitoConfig, log_store: S, object_store: ObjectStore) -> MitoEngine { + pub fn new( + mut config: MitoConfig, + log_store: Arc, + object_store: ObjectStore, + ) -> MitoEngine { + config.sanitize(); + MitoEngine { inner: Arc::new(EngineInner::new(config, log_store, object_store)), } } + + /// Stop the engine. + pub async fn stop(&self) -> Result<()> { + self.inner.stop().await + } + + /// Creates a new region. + pub async fn create_region(&self, request: CreateRequest) -> Result<()> { + self.inner.create_region(request).await + } } /// Inner struct of [MitoEngine]. @@ -42,9 +63,40 @@ struct EngineInner { impl EngineInner { /// Returns a new [EngineInner] with specific `config`, `log_store` and `object_store`. - fn new(_config: MitoConfig, _log_store: S, _object_store: ObjectStore) -> EngineInner { + fn new( + config: MitoConfig, + log_store: Arc, + object_store: ObjectStore, + ) -> EngineInner { EngineInner { - workers: WorkerGroup::default(), + workers: WorkerGroup::start(&config, log_store, object_store), } } + + /// Stop the inner engine. 
+ async fn stop(&self) -> Result<()> { + self.workers.stop().await + } + + /// Creates a new region. + async fn create_region(&self, create_request: CreateRequest) -> Result<()> { + let (request, receiver) = RegionRequest::from_body(RequestBody::Create(create_request)); + self.workers.submit_to_worker(request).await?; + + receiver.await.context(RecvSnafu)? + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_util::TestEnv; + + #[tokio::test] + async fn test_engine_new_stop() { + let env = TestEnv::new("engine-stop"); + let engine = env.create_engine(MitoConfig::default()).await; + + engine.stop().await.unwrap(); + } } diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 075feefab4..58a5d023a1 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -20,6 +20,8 @@ use common_error::status_code::StatusCode; use snafu::{Location, Snafu}; use store_api::manifest::ManifestVersion; +use crate::worker::WorkerId; + #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { @@ -78,15 +80,38 @@ pub enum Error { #[snafu(display("Cannot find RegionMetadata. Location: {}", location))] RegionMetadataNotFound { location: Location }, + + #[snafu(display("Failed to join handle, location: {}, source: {}", location, source))] + Join { + source: common_runtime::JoinError, + location: Location, + }, + + #[snafu(display("Worker {} is stopped, location: {}", id, location))] + WorkerStopped { id: WorkerId, location: Location }, + + #[snafu(display("Failed to recv result, location: {}, source: {}", location, source))] + Recv { + source: tokio::sync::oneshot::error::RecvError, + location: Location, + }, } pub type Result = std::result::Result; impl ErrorExt for Error { - #[allow(clippy::match_single_binding)] fn status_code(&self) -> StatusCode { + use Error::*; + match self { - _ => todo!(), + OpenDal { .. } => StatusCode::StorageUnavailable, + CompressObject { .. } | DecompressObject { .. } | SerdeJson { .. } | Utf8 { .. 
} => { + StatusCode::Unexpected + } + InvalidScanIndex { .. } => StatusCode::InvalidArguments, + RegionMetadataNotFound { .. } | Join { .. } | WorkerStopped { .. } | Recv { .. } => { + StatusCode::Internal + } } } diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index 72a1ccedf4..dae66d9416 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -16,6 +16,9 @@ //! //! Mito is the a region engine to store timeseries data. +#[cfg(any(test, feature = "test"))] +pub mod test_util; + // TODO(yingwen): Remove all `allow(dead_code)` after finish refactoring mito. pub mod config; #[allow(dead_code)] @@ -149,4 +152,9 @@ mod worker; /// FileHandle o-- FileMeta /// class RegionMetadata /// ``` +/// +/// ## Region workers +/// +/// The engine handles DMLs and DDLs in dedicated [workers](crate::worker::WorkerGroup). +/// mod docs {} diff --git a/src/mito2/src/metadata.rs b/src/mito2/src/metadata.rs index 04533b8525..02f793d046 100644 --- a/src/mito2/src/metadata.rs +++ b/src/mito2/src/metadata.rs @@ -51,13 +51,15 @@ pub struct RegionMetadata { /// Latest schema of this region #[serde(skip)] schema: SchemaRef, + /// Columns in the region. Has the same order as columns + /// in [schema](RegionMetadata::schema). column_metadatas: Vec, /// Version of metadata. version: VersionNumber, /// Maintains an ordered list of primary keys primary_key: Vec, - /// Immutable and unique id + /// Immutable and unique id of a region. region_id: RegionId, } @@ -148,15 +150,20 @@ impl RegionMetadataBuilder { pub struct ColumnMetadata { /// Schema of this column. Is the same as `column_schema` in [SchemaRef]. column_schema: ColumnSchema, + /// Semantic type of this column (e.g. tag or timestamp). semantic_type: SemanticType, + /// Immutable and unique id of a column. column_id: ColumnId, } /// The semantic type of one column #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub enum SemanticType { + /// Tag column, also is a part of primary key.
Tag, + /// A column that isn't a time index or part of primary key. Field, + /// Time index column. Timestamp, } diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs new file mode 100644 index 0000000000..28eaf5786d --- /dev/null +++ b/src/mito2/src/test_util.rs @@ -0,0 +1,70 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Utilities for testing. + +use std::sync::Arc; + +use common_test_util::temp_dir::{create_temp_dir, TempDir}; +use log_store::raft_engine::log_store::RaftEngineLogStore; +use log_store::test_util::log_store_util; +use object_store::services::Fs; +use object_store::util::join_dir; +use object_store::ObjectStore; + +use crate::config::MitoConfig; +use crate::engine::MitoEngine; +use crate::worker::WorkerGroup; + +/// Env to test mito engine. +pub struct TestEnv { + /// Path to store data. + data_home: TempDir, +} + +impl TestEnv { + /// Returns a new env with specific `prefix` for test. + pub fn new(prefix: &str) -> TestEnv { + TestEnv { + data_home: create_temp_dir(prefix), + } + } + + /// Creates a new engine with specific config under this env. + pub async fn create_engine(&self, config: MitoConfig) -> MitoEngine { + let (log_store, object_store) = self.create_log_and_object_store().await; + + MitoEngine::new(config, Arc::new(log_store), object_store) + } + + /// Creates a new [WorkerGroup] with specific config under this env. 
+ pub(crate) async fn create_worker_group(&self, config: &MitoConfig) -> WorkerGroup { + let (log_store, object_store) = self.create_log_and_object_store().await; + + WorkerGroup::start(config, Arc::new(log_store), object_store) + } + + async fn create_log_and_object_store(&self) -> (RaftEngineLogStore, ObjectStore) { + let data_home = self.data_home.path().to_str().unwrap(); + let wal_path = join_dir(data_home, "wal"); + let data_path = join_dir(data_home, "data"); + + let log_store = log_store_util::create_tmp_local_file_log_store(&wal_path).await; + let mut builder = Fs::default(); + builder.root(&data_path); + let object_store = ObjectStore::new(builder).unwrap().finish(); + + (log_store, object_store) + } +} diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index bbfe075486..0f92c416f8 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -14,18 +14,396 @@ //! Structs and utilities for writing regions. -use crate::region::RegionMapRef; +mod handle_create; +mod handle_open; +pub(crate) mod request; -/// A fixed size group of [RegionWorker]s. +use std::collections::hash_map::DefaultHasher; +use std::hash::{Hash, Hasher}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +use common_runtime::JoinHandle; +use common_telemetry::logging; +use futures::future::try_join_all; +use object_store::ObjectStore; +use snafu::{ensure, ResultExt}; +use store_api::logstore::LogStore; +use store_api::storage::RegionId; +use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::sync::{mpsc, Mutex}; + +use crate::config::MitoConfig; +use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu}; +use crate::region::{RegionMap, RegionMapRef}; +use crate::worker::request::{RegionRequest, RequestBody, WorkerRequest}; + +/// Identifier for a worker. +pub(crate) type WorkerId = u32; + +/// A fixed size group of [RegionWorkers](RegionWorker). /// -/// The group binds each region to a specific [RegionWorker]. 
-#[derive(Debug, Default)] +/// A worker group binds each region to a specific [RegionWorker] and sends +/// requests to region's dedicated worker. +/// +/// ```mermaid +/// graph LR +/// +/// RegionRequest -- Route by region id --> Worker0 & Worker1 +/// +/// subgraph MitoEngine +/// subgraph WorkerGroup +/// Worker0["RegionWorker 0"] +/// Worker1["RegionWorker 1"] +/// end +/// end +/// +/// Chan0[" Request channel 0"] +/// Chan1[" Request channel 1"] +/// WorkerThread1["RegionWorkerLoop 1"] +/// +/// subgraph WorkerThread0["RegionWorkerLoop 0"] +/// subgraph RegionMap["RegionMap (regions bound to worker 0)"] +/// Region0["Region 0"] +/// Region2["Region 2"] +/// end +/// Buffer0["RequestBuffer"] +/// +/// Buffer0 -- modify regions --> RegionMap +/// end +/// +/// Worker0 --> Chan0 +/// Worker1 --> Chan1 +/// Chan0 --> Buffer0 +/// Chan1 --> WorkerThread1 +/// ``` +#[derive(Debug)] pub(crate) struct WorkerGroup { workers: Vec, } -/// Worker to write and alter regions bound to it. -#[derive(Debug, Default)] -struct RegionWorker { - regions: RegionMapRef, +impl WorkerGroup { + /// Start a worker group. + /// + /// The number of workers should be power of two. + pub(crate) fn start( + config: &MitoConfig, + log_store: Arc, + object_store: ObjectStore, + ) -> WorkerGroup { + assert!(config.num_workers.is_power_of_two()); + + let workers = (0..config.num_workers) + .map(|id| { + RegionWorker::start( + WorkerConfig { + id: id as WorkerId, + channel_size: config.worker_channel_size, + request_batch_size: config.worker_request_batch_size, + }, + log_store.clone(), + object_store.clone(), + ) + }) + .collect(); + + WorkerGroup { workers } + } + + /// Stop the worker group. + pub(crate) async fn stop(&self) -> Result<()> { + logging::info!("Stop region worker group"); + + try_join_all(self.workers.iter().map(|worker| worker.stop())).await?; + + Ok(()) + } + + /// Submit a request to a worker in the group. 
+ pub(crate) async fn submit_to_worker(&self, request: RegionRequest) -> Result<()> { + self.worker(request.body.region_id()) + .submit_request(request) + .await + } + + /// Get worker for specific `region_id`. + fn worker(&self, region_id: RegionId) -> &RegionWorker { + let mut hasher = DefaultHasher::new(); + region_id.hash(&mut hasher); + let value = hasher.finish() as usize; + let index = value_to_index(value, self.workers.len()); + + &self.workers[index] + } +} + +fn value_to_index(value: usize, num_workers: usize) -> usize { + value & (num_workers - 1) +} + +/// Config for region worker. +#[derive(Debug, Clone)] +struct WorkerConfig { + /// Id of the worker + id: WorkerId, + /// Capacity of the request channel. + channel_size: usize, + /// Batch size to process request. + request_batch_size: usize, +} + +/// Worker to write and alter regions bound to it. +#[derive(Debug)] +pub(crate) struct RegionWorker { + /// Id of the worker. + id: WorkerId, + /// Regions bound to the worker. + regions: RegionMapRef, + /// Request sender. + sender: Sender, + /// Handle to the worker thread. + handle: Mutex>>, + /// Whether to run the worker thread. + running: Arc, +} + +impl RegionWorker { + /// Start a region worker and its background thread. + fn start( + config: WorkerConfig, + log_store: Arc, + object_store: ObjectStore, + ) -> RegionWorker { + let regions = Arc::new(RegionMap::default()); + let (sender, receiver) = mpsc::channel(config.channel_size); + + let running = Arc::new(AtomicBool::new(true)); + let mut worker_thread = RegionWorkerLoop { + id: config.id, + regions: regions.clone(), + receiver, + log_store, + object_store, + running: running.clone(), + request_batch_size: config.request_batch_size, + }; + let handle = common_runtime::spawn_bg(async move { + worker_thread.run().await; + }); + + RegionWorker { + id: config.id, + regions, + sender, + handle: Mutex::new(Some(handle)), + running, + } + } + + /// Submit request to background worker thread. 
+ async fn submit_request(&self, request: RegionRequest) -> Result<()> { + ensure!(self.is_running(), WorkerStoppedSnafu { id: self.id }); + if self + .sender + .send(WorkerRequest::Region(request)) + .await + .is_err() + { + logging::warn!( + "Worker {} is already exited but the running flag is still true", + self.id + ); + // Manually set the running flag to false to avoid printing more warning logs. + self.set_running(false); + return WorkerStoppedSnafu { id: self.id }.fail(); + } + + Ok(()) + } + + /// Stop the worker. + /// + /// This method waits until the worker thread exits. + async fn stop(&self) -> Result<()> { + let handle = self.handle.lock().await.take(); + if let Some(handle) = handle { + logging::info!("Stop region worker {}", self.id); + + self.set_running(false); + if self.sender.send(WorkerRequest::Stop).await.is_err() { + logging::warn!("Worker {} is already exited before stop", self.id); + } + + handle.await.context(JoinSnafu)?; + } + + Ok(()) + } + + /// Returns true if the worker is still running. + fn is_running(&self) -> bool { + self.running.load(Ordering::Relaxed) + } + + /// Sets whether the worker is still running. + fn set_running(&self, value: bool) { + self.running.store(value, Ordering::Relaxed) + } +} + +impl Drop for RegionWorker { + fn drop(&mut self) { + if self.is_running() { + self.set_running(false); + // Once we drop the sender, the worker thread will receive a disconnected error. + } + } +} + +type RequestBuffer = Vec; + +/// Background worker loop to handle requests. +struct RegionWorkerLoop { + // Id of the worker. + id: WorkerId, + /// Regions bound to the worker. + regions: RegionMapRef, + /// Request receiver. + receiver: Receiver, + // TODO(yingwen): Replaced by Wal. + log_store: Arc, + /// Object store for manifest and SSTs. + object_store: ObjectStore, + /// Whether the worker thread is still running. + running: Arc, + /// Batch size to fetch requests from channel.
+ request_batch_size: usize, +} + +impl RegionWorkerLoop { + /// Starts the worker loop. + async fn run(&mut self) { + logging::info!("Start region worker thread {}", self.id); + + // Buffer to retrieve requests from receiver. + let mut buffer = RequestBuffer::with_capacity(self.request_batch_size); + + while self.running.load(Ordering::Relaxed) { + // Clear the buffer before handling next batch of requests. + buffer.clear(); + + match self.receiver.recv().await { + Some(request) => buffer.push(request), + None => break, + } + + // Try to recv more requests from the channel. + for _ in 1..buffer.capacity() { + // We have received one request so we start from 1. + match self.receiver.try_recv() { + Ok(req) => buffer.push(req), + // We still need to handle remaining requests. + Err(_) => break, + } + } + + self.handle_requests(&mut buffer).await; + } + + logging::info!("Exit region worker thread {}", self.id); + } + + /// Dispatches and processes requests. + /// + /// `buffer` is drained by this method and is empty when it returns. + async fn handle_requests(&mut self, buffer: &mut RequestBuffer) { + let mut dml_requests = Vec::with_capacity(buffer.len()); + let mut ddl_requests = Vec::with_capacity(buffer.len()); + for worker_req in buffer.drain(..) { + match worker_req { + WorkerRequest::Region(req) => { + if req.body.is_ddl() { + ddl_requests.push(req); + } else { + dml_requests.push(req); + } + } + // We receive a stop signal, but we still want to process remaining + // requests. The worker thread will then check the running flag and + // then exit. + WorkerRequest::Stop => { + debug_assert!(!self.running.load(Ordering::Relaxed)); + } + } + } + + // Handles all dml requests first. So we can alter regions without + // considering existing dml requests. + self.handle_dml_requests(dml_requests).await; + + self.handle_ddl_requests(ddl_requests).await; + } + + /// Takes and handles all dml requests.
+ async fn handle_dml_requests(&mut self, write_requests: Vec) { + if write_requests.is_empty() { + return; + } + + // Create a write context that holds meta and sequence. + + unimplemented!() + } + + /// Takes and handles all ddl requests. + async fn handle_ddl_requests(&mut self, ddl_requests: Vec) { + if ddl_requests.is_empty() { + return; + } + + for request in ddl_requests { + let res = match request.body { + RequestBody::Create(req) => self.handle_create_request(req).await, + RequestBody::Open(req) => self.handle_open_request(req).await, + RequestBody::Write(_) => unreachable!(), + }; + + if let Some(sender) = request.sender { + // Ignore send result. + let _ = sender.send(res); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_util::TestEnv; + + #[test] + fn test_value_to_index() { + let num_workers = 1; + for i in 0..10 { + assert_eq!(0, value_to_index(i, num_workers)); + } + + let num_workers = 4; + for i in 0..10 { + assert_eq!(i % 4, value_to_index(i, num_workers)); + } + } + + #[tokio::test] + async fn test_worker_group_start_stop() { + let env = TestEnv::new("group-stop"); + let group = env + .create_worker_group(&MitoConfig { + num_workers: 4, + ..Default::default() + }) + .await; + + group.stop().await.unwrap(); + } } diff --git a/src/mito2/src/worker/handle_create.rs b/src/mito2/src/worker/handle_create.rs new file mode 100644 index 0000000000..b001cc295e --- /dev/null +++ b/src/mito2/src/worker/handle_create.rs @@ -0,0 +1,30 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +//! Handling create request. + +use crate::error::Result; +use crate::worker::request::CreateRequest; +use crate::worker::RegionWorkerLoop; + +impl RegionWorkerLoop { + pub(crate) async fn handle_create_request(&mut self, _request: CreateRequest) -> Result<()> { + // 1. Checks whether the table exists. + + // 2. Convert the request into RegionMetadata + + // 3. Write manifest + unimplemented!() + } +} diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs new file mode 100644 index 0000000000..9b3ab082fa --- /dev/null +++ b/src/mito2/src/worker/handle_open.rs @@ -0,0 +1,25 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Handling open request. + +use crate::error::Result; +use crate::worker::request::OpenRequest; +use crate::worker::RegionWorkerLoop; + +impl RegionWorkerLoop { + pub(crate) async fn handle_open_request(&mut self, _request: OpenRequest) -> Result<()> { + unimplemented!() + } +} diff --git a/src/mito2/src/worker/request.rs b/src/mito2/src/worker/request.rs new file mode 100644 index 0000000000..eb0bf66a0c --- /dev/null +++ b/src/mito2/src/worker/request.rs @@ -0,0 +1,141 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Worker requests. + +use std::time::Duration; + +use common_base::readable_size::ReadableSize; +use store_api::storage::{ColumnId, CompactionStrategy, RegionId}; +use tokio::sync::oneshot::{self, Receiver, Sender}; + +use crate::error::Result; +use crate::metadata::ColumnMetadata; + +/// Options that affect the entire region. +/// +/// Users need to specify the options while creating/opening a region. +#[derive(Debug)] +pub struct RegionOptions { + /// Region memtable max size in bytes. + pub write_buffer_size: Option, + /// Region SST files TTL. + pub ttl: Option, + /// Compaction strategy. + pub compaction_strategy: CompactionStrategy, +} + +/// Create region request. +#[derive(Debug)] +pub struct CreateRequest { + /// Region to create. + pub region_id: RegionId, + /// Columns in this region. + pub column_metadatas: Vec, + /// Columns in the primary key. + pub primary_key: Vec, + /// Create region if not exists. + pub create_if_not_exists: bool, + /// Options of the created region. + pub options: RegionOptions, +} + +impl CreateRequest { + /// Validate the request. + fn validate(&self) -> Result<()> { + unimplemented!() + } +} + +/// Open region request. +#[derive(Debug)] +pub struct OpenRequest { + /// Region to open. + pub region_id: RegionId, + /// Options of the created region. + pub options: RegionOptions, +} + +/// Request to write a region. +#[derive(Debug)] +pub(crate) struct WriteRequest { + /// Region to write. + pub region_id: RegionId, +} + +/// Request sent to a worker +pub(crate) enum WorkerRequest { + /// Region request. 
+ Region(RegionRequest), + + /// Notify a worker to stop. + Stop, +} + +/// Request to modify a region. +#[derive(Debug)] +pub(crate) struct RegionRequest { + /// Sender to send result. + pub(crate) sender: Option>>, + /// Request body. + pub(crate) body: RequestBody, +} + +impl RegionRequest { + /// Creates a [RegionRequest] and a receiver from `body`. + pub(crate) fn from_body(body: RequestBody) -> (RegionRequest, Receiver>) { + let (sender, receiver) = oneshot::channel(); + ( + RegionRequest { + sender: Some(sender), + body, + }, + receiver, + ) + } +} + +/// Body to carry actual region request. +#[derive(Debug)] +pub(crate) enum RequestBody { + // DML: + /// Write to a region. + Write(WriteRequest), + + // DDL: + /// Creates a new region. + Create(CreateRequest), + /// Opens an existing region. + Open(OpenRequest), +} + +impl RequestBody { + /// Region id of this request. + pub(crate) fn region_id(&self) -> RegionId { + match self { + RequestBody::Write(req) => req.region_id, + RequestBody::Create(req) => req.region_id, + RequestBody::Open(req) => req.region_id, + } + } + + /// Returns whether the request is a DDL (e.g. CREATE/OPEN/ALTER). + pub(crate) fn is_ddl(&self) -> bool { + match self { + RequestBody::Write(_) => false, + RequestBody::Create(_) => true, + RequestBody::Open(_) => true, + } + } +} diff --git a/src/object-store/src/util.rs b/src/object-store/src/util.rs index 38aa02256e..fdb15b4256 100644 --- a/src/object-store/src/util.rs +++ b/src/object-store/src/util.rs @@ -29,6 +29,21 @@ pub fn normalize_dir(dir: &str) -> String { dir } +/// Join two paths and normalize the output dir. +/// +/// The output dir always ends with `/`. e.g. +/// - `/a/b` join `c` => `/a/b/c/` +/// - `/a/b` join `/c/` => `/a/b/c/` +/// +/// All internal `//` will be replaced by `/`. +pub fn join_dir(parent: &str, child: &str) -> String { + // Always adds a `/` to the output path.
+ let output = format!("{parent}/{child}/"); + // We call opendal's normalize_root which doesn't push `/` to + // the end of path. + opendal::raw::normalize_root(&output) +} + #[cfg(test)] mod tests { use super::*; @@ -39,4 +54,18 @@ mod tests { assert_eq!("/", normalize_dir("")); assert_eq!("/test/", normalize_dir("/test")); } + + #[test] + fn test_join_paths() { + assert_eq!("/", join_dir("", "")); + assert_eq!("/", join_dir("/", "")); + assert_eq!("/", join_dir("", "/")); + assert_eq!("/", join_dir("/", "/")); + assert_eq!("/a/", join_dir("/a", "")); + assert_eq!("/a/b/c/", join_dir("a/b", "c")); + assert_eq!("/a/b/c/", join_dir("/a/b", "c")); + assert_eq!("/a/b/c/", join_dir("/a/b", "c/")); + assert_eq!("/a/b/c/", join_dir("/a/b", "/c/")); + assert_eq!("/a/b/c/", join_dir("/a/b", "//c")); + } }