mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-13 08:43:00 +00:00
feat(mito): Implement open and close for mito2 regions (#2052)
* feat: add close request * feat: handle close and open request * feat: Implement open * test: add TestEnv::new * feat: close region/engine and test * style: fix clippy * style: import log macros * docs: update docs * docs: add mermaid for manifest manager
This commit is contained in:
@@ -16,7 +16,7 @@
|
||||
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_datasource::compression::CompressionType;
|
||||
use common_telemetry::logging;
|
||||
use common_telemetry::warn;
|
||||
|
||||
/// Default region worker num.
|
||||
const DEFAULT_NUM_WORKERS: usize = 1;
|
||||
@@ -64,16 +64,15 @@ impl MitoConfig {
|
||||
}
|
||||
self.num_workers = self.num_workers.next_power_of_two();
|
||||
if num_workers_before != self.num_workers {
|
||||
logging::warn!(
|
||||
warn!(
|
||||
"Sanitize worker num {} to {}",
|
||||
num_workers_before,
|
||||
self.num_workers
|
||||
num_workers_before, self.num_workers
|
||||
);
|
||||
}
|
||||
|
||||
// Sanitize channel size.
|
||||
if self.worker_channel_size == 0 {
|
||||
logging::warn!("Sanitize channel size 0 to 1");
|
||||
warn!("Sanitize channel size 0 to 1");
|
||||
self.worker_channel_size = 1;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,7 +27,7 @@ use store_api::storage::RegionId;
|
||||
use crate::config::MitoConfig;
|
||||
use crate::error::{RecvSnafu, Result};
|
||||
pub use crate::worker::request::CreateRequest;
|
||||
use crate::worker::request::{RegionRequest, RequestBody};
|
||||
use crate::worker::request::{CloseRequest, OpenRequest, RegionRequest, RequestBody};
|
||||
use crate::worker::WorkerGroup;
|
||||
|
||||
/// Region engine implementation for timeseries data.
|
||||
@@ -56,8 +56,28 @@ impl MitoEngine {
|
||||
}
|
||||
|
||||
/// Creates a new region.
|
||||
pub async fn create_region(&self, request: CreateRequest) -> Result<()> {
|
||||
self.inner.create_region(request).await
|
||||
pub async fn create_region(&self, create_request: CreateRequest) -> Result<()> {
|
||||
self.inner
|
||||
.handle_request_body(RequestBody::Create(create_request))
|
||||
.await
|
||||
}
|
||||
|
||||
/// Opens an existing region.
|
||||
///
|
||||
/// Returns error if the region does not exist.
|
||||
pub async fn open_region(&self, open_request: OpenRequest) -> Result<()> {
|
||||
self.inner
|
||||
.handle_request_body(RequestBody::Open(open_request))
|
||||
.await
|
||||
}
|
||||
|
||||
/// Closes a region.
|
||||
///
|
||||
/// Does nothing if the region is already closed.
|
||||
pub async fn close_region(&self, close_request: CloseRequest) -> Result<()> {
|
||||
self.inner
|
||||
.handle_request_body(RequestBody::Close(close_request))
|
||||
.await
|
||||
}
|
||||
|
||||
/// Returns true if the specific region exists.
|
||||
@@ -89,9 +109,9 @@ impl EngineInner {
|
||||
self.workers.stop().await
|
||||
}
|
||||
|
||||
/// Creates a new region.
|
||||
async fn create_region(&self, create_request: CreateRequest) -> Result<()> {
|
||||
let (request, receiver) = RegionRequest::from_body(RequestBody::Create(create_request));
|
||||
/// Handles [RequestBody] and return its executed result.
|
||||
async fn handle_request_body(&self, body: RequestBody) -> Result<()> {
|
||||
let (request, receiver) = RegionRequest::from_body(body);
|
||||
self.workers.submit_to_worker(request).await?;
|
||||
|
||||
receiver.await.context(RecvSnafu)?
|
||||
|
||||
@@ -19,15 +19,22 @@ use store_api::storage::RegionId;
|
||||
use super::*;
|
||||
use crate::error::Error;
|
||||
use crate::test_util::{CreateRequestBuilder, TestEnv};
|
||||
use crate::worker::request::RegionOptions;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_new_stop() {
|
||||
let env = TestEnv::new("engine-stop");
|
||||
let env = TestEnv::with_prefix("engine-stop");
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
engine.stop().await.unwrap();
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new(region_id).build();
|
||||
engine.create_region(request).await.unwrap();
|
||||
|
||||
let request = CreateRequestBuilder::new(RegionId::new(1, 1)).build();
|
||||
// Stop the engine to reject further requests.
|
||||
engine.stop().await.unwrap();
|
||||
assert!(!engine.is_region_exists(region_id));
|
||||
|
||||
let request = CreateRequestBuilder::new(RegionId::new(1, 2)).build();
|
||||
let err = engine.create_region(request).await.unwrap_err();
|
||||
assert!(
|
||||
matches!(err, Error::WorkerStopped { .. }),
|
||||
@@ -37,7 +44,7 @@ async fn test_engine_new_stop() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_create_new_region() {
|
||||
let env = TestEnv::new("new-region");
|
||||
let env = TestEnv::with_prefix("new-region");
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
@@ -49,7 +56,7 @@ async fn test_engine_create_new_region() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_create_region_if_not_exists() {
|
||||
let env = TestEnv::new("create-not-exists");
|
||||
let env = TestEnv::with_prefix("create-not-exists");
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
let builder = CreateRequestBuilder::new(RegionId::new(1, 1)).create_if_not_exists(true);
|
||||
@@ -61,7 +68,7 @@ async fn test_engine_create_region_if_not_exists() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_create_existing_region() {
|
||||
let env = TestEnv::new("create-existing");
|
||||
let env = TestEnv::with_prefix("create-existing");
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
let builder = CreateRequestBuilder::new(RegionId::new(1, 1));
|
||||
@@ -74,3 +81,98 @@ async fn test_engine_create_existing_region() {
|
||||
"unexpected err: {err}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_open_empty() {
|
||||
let env = TestEnv::with_prefix("open-empty");
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
let err = engine
|
||||
.open_region(OpenRequest {
|
||||
region_id: RegionId::new(1, 1),
|
||||
region_dir: "empty".to_string(),
|
||||
options: RegionOptions::default(),
|
||||
})
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(
|
||||
matches!(err, Error::RegionNotFound { .. }),
|
||||
"unexpected err: {err}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_open_existing() {
|
||||
let env = TestEnv::with_prefix("open-exiting");
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new(region_id).build();
|
||||
let region_dir = request.region_dir.clone();
|
||||
engine.create_region(request).await.unwrap();
|
||||
|
||||
engine
|
||||
.open_region(OpenRequest {
|
||||
region_id,
|
||||
region_dir,
|
||||
options: RegionOptions::default(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_close_region() {
|
||||
let env = TestEnv::with_prefix("close");
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
// It's okay to close a region doesn't exist.
|
||||
engine
|
||||
.close_region(CloseRequest { region_id })
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let request = CreateRequestBuilder::new(region_id).build();
|
||||
engine.create_region(request).await.unwrap();
|
||||
|
||||
engine
|
||||
.close_region(CloseRequest { region_id })
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(!engine.is_region_exists(region_id));
|
||||
|
||||
// It's okay to close this region again.
|
||||
engine
|
||||
.close_region(CloseRequest { region_id })
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_reopen_region() {
|
||||
let env = TestEnv::with_prefix("reopen-region");
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new(region_id).build();
|
||||
let region_dir = request.region_dir.clone();
|
||||
engine.create_region(request).await.unwrap();
|
||||
|
||||
// Close the region.
|
||||
engine
|
||||
.close_region(CloseRequest { region_id })
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Open the region again.
|
||||
engine
|
||||
.open_region(OpenRequest {
|
||||
region_id,
|
||||
region_dir,
|
||||
options: RegionOptions::default(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(engine.is_region_exists(region_id));
|
||||
}
|
||||
|
||||
@@ -156,6 +156,24 @@ pub enum Error {
|
||||
source: parquet::errors::ParquetError,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Region {} not found, location: {}", region_id, location))]
|
||||
RegionNotFound {
|
||||
region_id: RegionId,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Region {} is corrupted, reason: {}, location: {}",
|
||||
region_id,
|
||||
reason,
|
||||
location
|
||||
))]
|
||||
RegionCorrupted {
|
||||
region_id: RegionId,
|
||||
reason: String,
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -173,7 +191,9 @@ impl ErrorExt for Error {
|
||||
| SerdeJson { .. }
|
||||
| Utf8 { .. }
|
||||
| RegionExists { .. }
|
||||
| NewRecordBatch { .. } => StatusCode::Unexpected,
|
||||
| NewRecordBatch { .. }
|
||||
| RegionNotFound { .. }
|
||||
| RegionCorrupted { .. } => StatusCode::Unexpected,
|
||||
InvalidScanIndex { .. } | InvalidMeta { .. } | InvalidSchema { .. } => {
|
||||
StatusCode::InvalidArguments
|
||||
}
|
||||
|
||||
@@ -162,4 +162,8 @@ mod worker;
|
||||
///
|
||||
/// The engine handles DMLs and DDLs in dedicated [workers](crate::worker::WorkerGroup).
|
||||
///
|
||||
/// ## Region manifest
|
||||
///
|
||||
/// The [RegionManifestManager](crate::manifest::manager::RegionManifestManager) manages metadata of the engine.
|
||||
///
|
||||
mod docs {}
|
||||
|
||||
@@ -105,7 +105,7 @@ impl RegionManifestBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if the builder keeps a [RegionMetadata]
|
||||
/// Check if the builder keeps a [RegionMetadata](crate::metadata::RegionMetadata).
|
||||
pub fn contains_metadata(&self) -> bool {
|
||||
self.metadata.is_some()
|
||||
}
|
||||
|
||||
@@ -33,8 +33,67 @@ use crate::metadata::RegionMetadataRef;
|
||||
// trait MetaAction -> struct RegionMetaActionList
|
||||
// trait MetaActionIterator -> struct MetaActionIteratorImpl
|
||||
|
||||
#[cfg_attr(doc, aquamarine::aquamarine)]
|
||||
/// Manage region's manifest. Provide APIs to access (create/modify/recover) region's persisted
|
||||
/// metadata.
|
||||
///
|
||||
/// ```mermaid
|
||||
/// classDiagram
|
||||
/// class RegionManifestManager {
|
||||
/// -ManifestObjectStore store
|
||||
/// -RegionManifestOptions options
|
||||
/// -RegionManifest manifest
|
||||
/// +new() RegionManifestManager
|
||||
/// +open() Option~RegionManifestManager~
|
||||
/// +stop()
|
||||
/// +update(RegionMetaActionList action_list) ManifestVersion
|
||||
/// +manifest() RegionManifest
|
||||
/// }
|
||||
/// class ManifestObjectStore {
|
||||
/// -ObjectStore object_store
|
||||
/// }
|
||||
/// class RegionChange {
|
||||
/// -RegionMetadataRef metadata
|
||||
/// }
|
||||
/// class RegionEdit {
|
||||
/// -VersionNumber regoin_version
|
||||
/// -Vec~FileMeta~ files_to_add
|
||||
/// -Vec~FileMeta~ files_to_remove
|
||||
/// -SequenceNumber flushed_sequence
|
||||
/// }
|
||||
/// class RegionRemove {
|
||||
/// -RegionId region_id
|
||||
/// }
|
||||
/// RegionManifestManager o-- ManifestObjectStore
|
||||
/// RegionManifestManager o-- RegionManifest
|
||||
/// RegionManifestManager o-- RegionManifestOptions
|
||||
/// RegionManifestManager -- RegionMetaActionList
|
||||
/// RegionManifestManager -- RegionCheckpoint
|
||||
/// ManifestObjectStore o-- ObjectStore
|
||||
/// RegionMetaActionList o-- RegionMetaAction
|
||||
/// RegionMetaAction o-- ProtocolAction
|
||||
/// RegionMetaAction o-- RegionChange
|
||||
/// RegionMetaAction o-- RegionEdit
|
||||
/// RegionMetaAction o-- RegionRemove
|
||||
/// RegionChange o-- RegionMetadata
|
||||
/// RegionEdit o-- FileMeta
|
||||
///
|
||||
/// class RegionManifest {
|
||||
/// -RegionMetadataRef metadata
|
||||
/// -HashMap<FileId, FileMeta> files
|
||||
/// -ManifestVersion manifest_version
|
||||
/// }
|
||||
/// class RegionMetadata
|
||||
/// class FileMeta
|
||||
/// RegionManifest o-- RegionMetadata
|
||||
/// RegionManifest o-- FileMeta
|
||||
///
|
||||
/// class RegionCheckpoint {
|
||||
/// -ManifestVersion last_version
|
||||
/// -Option~RegionManifest~ checkpoint
|
||||
/// }
|
||||
/// RegionCheckpoint o-- RegionManifest
|
||||
/// ```
|
||||
#[derive(Debug)]
|
||||
pub struct RegionManifestManager {
|
||||
inner: RwLock<RegionManifestManagerInner>,
|
||||
@@ -377,7 +436,7 @@ mod test {
|
||||
#[tokio::test]
|
||||
async fn create_manifest_manager() {
|
||||
let metadata = Arc::new(basic_region_metadata());
|
||||
let env = TestEnv::new("");
|
||||
let env = TestEnv::new();
|
||||
let manager = env
|
||||
.create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
|
||||
.await
|
||||
@@ -389,7 +448,7 @@ mod test {
|
||||
|
||||
#[tokio::test]
|
||||
async fn open_manifest_manager() {
|
||||
let env = TestEnv::new("");
|
||||
let env = TestEnv::new();
|
||||
// Try to opens an empty manifest.
|
||||
assert!(env
|
||||
.create_manifest_manager(CompressionType::Uncompressed, 10, None)
|
||||
@@ -420,7 +479,7 @@ mod test {
|
||||
#[tokio::test]
|
||||
async fn region_change_add_column() {
|
||||
let metadata = Arc::new(basic_region_metadata());
|
||||
let env = TestEnv::new("");
|
||||
let env = TestEnv::new();
|
||||
let manager = env
|
||||
.create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
|
||||
.await
|
||||
|
||||
@@ -17,7 +17,7 @@ use std::iter::Iterator;
|
||||
use std::str::FromStr;
|
||||
|
||||
use common_datasource::compression::CompressionType;
|
||||
use common_telemetry::logging;
|
||||
use common_telemetry::{debug, info};
|
||||
use futures::TryStreamExt;
|
||||
use lazy_static::lazy_static;
|
||||
use object_store::{raw_normalize_path, util, Entry, ErrorKind, ObjectStore};
|
||||
@@ -267,7 +267,7 @@ impl ManifestObjectStore {
|
||||
.collect();
|
||||
let ret = paths.len();
|
||||
|
||||
logging::debug!(
|
||||
debug!(
|
||||
"Deleting {} logs from manifest storage path {} until {}, checkpoint: {:?}, paths: {:?}",
|
||||
ret,
|
||||
self.path,
|
||||
@@ -300,7 +300,7 @@ impl ManifestObjectStore {
|
||||
.map(|e| e.path().to_string())
|
||||
.collect();
|
||||
|
||||
logging::info!(
|
||||
info!(
|
||||
"Deleting {} from manifest storage path {} paths: {:?}",
|
||||
paths.len(),
|
||||
self.path,
|
||||
@@ -318,14 +318,14 @@ impl ManifestObjectStore {
|
||||
.remove_all(&self.path)
|
||||
.await
|
||||
.context(OpenDalSnafu)?;
|
||||
logging::info!("Deleted manifest storage path {}", self.path);
|
||||
info!("Deleted manifest storage path {}", self.path);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn save(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
|
||||
let path = self.delta_file_path(version);
|
||||
logging::debug!("Save log to manifest storage, version: {}", version);
|
||||
debug!("Save log to manifest storage, version: {}", version);
|
||||
let data = self
|
||||
.compress_type
|
||||
.encode(bytes)
|
||||
@@ -357,10 +357,9 @@ impl ManifestObjectStore {
|
||||
}
|
||||
}
|
||||
|
||||
logging::debug!(
|
||||
debug!(
|
||||
"Deleting logs from manifest storage, start: {}, end: {}",
|
||||
start,
|
||||
end
|
||||
start, end
|
||||
);
|
||||
|
||||
self.object_store
|
||||
@@ -396,10 +395,9 @@ impl ManifestObjectStore {
|
||||
extend_metadata: HashMap::new(),
|
||||
};
|
||||
|
||||
logging::debug!(
|
||||
debug!(
|
||||
"Save checkpoint in path: {}, metadata: {:?}",
|
||||
last_checkpoint_path,
|
||||
checkpoint_metadata
|
||||
last_checkpoint_path, checkpoint_metadata
|
||||
);
|
||||
|
||||
let bytes = checkpoint_metadata.encode()?;
|
||||
@@ -437,10 +435,9 @@ impl ManifestObjectStore {
|
||||
&checkpoint_file(version),
|
||||
FALL_BACK_COMPRESS_TYPE,
|
||||
);
|
||||
logging::debug!(
|
||||
debug!(
|
||||
"Failed to load checkpoint from path: {}, fall back to path: {}",
|
||||
path,
|
||||
fall_back_path
|
||||
path, fall_back_path
|
||||
);
|
||||
match self.object_store.read(&fall_back_path).await {
|
||||
Ok(checkpoint) => {
|
||||
@@ -506,10 +503,9 @@ impl ManifestObjectStore {
|
||||
|
||||
let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data)?;
|
||||
|
||||
logging::debug!(
|
||||
debug!(
|
||||
"Load checkpoint in path: {}, metadata: {:?}",
|
||||
last_checkpoint_path,
|
||||
checkpoint_metadata
|
||||
last_checkpoint_path, checkpoint_metadata
|
||||
);
|
||||
|
||||
self.load_checkpoint(checkpoint_metadata.version).await
|
||||
|
||||
@@ -20,8 +20,10 @@ mod version;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use common_telemetry::info;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::manifest::manager::RegionManifestManager;
|
||||
use crate::region::version::VersionControlRef;
|
||||
|
||||
@@ -45,6 +47,17 @@ pub(crate) struct MitoRegion {
|
||||
|
||||
pub(crate) type MitoRegionRef = Arc<MitoRegion>;
|
||||
|
||||
impl MitoRegion {
|
||||
/// Stop background tasks for this region.
|
||||
pub(crate) async fn stop(&self) -> Result<()> {
|
||||
self.manifest_manager.stop().await?;
|
||||
|
||||
info!("Stopped region, region_id: {}", self.region_id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Regions indexed by ids.
|
||||
#[derive(Debug, Default)]
|
||||
pub(crate) struct RegionMap {
|
||||
@@ -63,6 +76,29 @@ impl RegionMap {
|
||||
let mut regions = self.regions.write().unwrap();
|
||||
regions.insert(region.region_id, region);
|
||||
}
|
||||
|
||||
/// Get region by region id.
|
||||
pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
|
||||
let regions = self.regions.read().unwrap();
|
||||
regions.get(®ion_id).cloned()
|
||||
}
|
||||
|
||||
/// Remove region by id.
|
||||
pub(crate) fn remove_region(&self, region_id: RegionId) {
|
||||
let mut regions = self.regions.write().unwrap();
|
||||
regions.remove(®ion_id);
|
||||
}
|
||||
|
||||
/// List all regions.
|
||||
pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
|
||||
let regions = self.regions.read().unwrap();
|
||||
regions.values().cloned().collect()
|
||||
}
|
||||
|
||||
/// Clear the map.
|
||||
pub(crate) fn clear(&self) {
|
||||
self.regions.write().unwrap().clear();
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) type RegionMapRef = Arc<RegionMap>;
|
||||
|
||||
@@ -18,9 +18,11 @@ use std::sync::Arc;
|
||||
|
||||
use object_store::util::join_dir;
|
||||
use object_store::ObjectStore;
|
||||
use snafu::{ensure, OptionExt};
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::config::MitoConfig;
|
||||
use crate::error::Result;
|
||||
use crate::error::{RegionCorruptedSnafu, RegionNotFoundSnafu, Result};
|
||||
use crate::manifest::manager::RegionManifestManager;
|
||||
use crate::manifest::options::RegionManifestOptions;
|
||||
use crate::memtable::MemtableBuilderRef;
|
||||
@@ -30,7 +32,8 @@ use crate::region::MitoRegion;
|
||||
|
||||
/// Builder to create a new [MitoRegion] or open an existing one.
|
||||
pub(crate) struct RegionOpener {
|
||||
metadata: RegionMetadata,
|
||||
region_id: RegionId,
|
||||
metadata: Option<RegionMetadata>,
|
||||
memtable_builder: MemtableBuilderRef,
|
||||
object_store: ObjectStore,
|
||||
region_dir: String,
|
||||
@@ -39,18 +42,25 @@ pub(crate) struct RegionOpener {
|
||||
impl RegionOpener {
|
||||
/// Returns a new opener.
|
||||
pub(crate) fn new(
|
||||
metadata: RegionMetadata,
|
||||
region_id: RegionId,
|
||||
memtable_builder: MemtableBuilderRef,
|
||||
object_store: ObjectStore,
|
||||
) -> RegionOpener {
|
||||
RegionOpener {
|
||||
metadata,
|
||||
region_id,
|
||||
metadata: None,
|
||||
memtable_builder,
|
||||
object_store,
|
||||
region_dir: String::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets metadata of the region to create.
|
||||
pub(crate) fn metadata(mut self, metadata: RegionMetadata) -> Self {
|
||||
self.metadata = Some(metadata);
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the region dir.
|
||||
pub(crate) fn region_dir(mut self, value: &str) -> Self {
|
||||
self.region_dir = value.to_string();
|
||||
@@ -58,9 +68,12 @@ impl RegionOpener {
|
||||
}
|
||||
|
||||
/// Writes region manifest and creates a new region.
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if metadata is not set.
|
||||
pub(crate) async fn create(self, config: &MitoConfig) -> Result<MitoRegion> {
|
||||
let region_id = self.metadata.region_id;
|
||||
let metadata = Arc::new(self.metadata);
|
||||
let region_id = self.region_id;
|
||||
let metadata = Arc::new(self.metadata.unwrap());
|
||||
|
||||
// Create a manifest manager for this region.
|
||||
let options = RegionManifestOptions {
|
||||
@@ -83,6 +96,47 @@ impl RegionOpener {
|
||||
manifest_manager,
|
||||
})
|
||||
}
|
||||
|
||||
/// Opens an existing region.
|
||||
///
|
||||
/// Returns error if the region doesn't exist.
|
||||
pub(crate) async fn open(self, config: &MitoConfig) -> Result<MitoRegion> {
|
||||
let options = RegionManifestOptions {
|
||||
manifest_dir: new_manifest_dir(&self.region_dir),
|
||||
object_store: self.object_store,
|
||||
compress_type: config.manifest_compress_type,
|
||||
checkpoint_interval: config.manifest_checkpoint_interval,
|
||||
};
|
||||
let manifest_manager =
|
||||
RegionManifestManager::open(options)
|
||||
.await?
|
||||
.context(RegionNotFoundSnafu {
|
||||
region_id: self.region_id,
|
||||
})?;
|
||||
|
||||
let manifest = manifest_manager.manifest().await;
|
||||
let metadata = manifest.metadata.clone();
|
||||
|
||||
ensure!(
|
||||
metadata.region_id == self.region_id,
|
||||
RegionCorruptedSnafu {
|
||||
region_id: self.region_id,
|
||||
reason: format!("region id in metadata is {}", metadata.region_id),
|
||||
}
|
||||
);
|
||||
|
||||
let mutable = self.memtable_builder.build(&metadata);
|
||||
let version = VersionBuilder::new(metadata, mutable).build();
|
||||
let version_control = Arc::new(VersionControl::new(version));
|
||||
|
||||
// TODO(yingwen): Replay.
|
||||
|
||||
Ok(MitoRegion {
|
||||
region_id: self.region_id,
|
||||
version_control,
|
||||
manifest_manager,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the directory to the manifest files.
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
//! Parquet writer.
|
||||
|
||||
use common_telemetry::logging;
|
||||
use common_telemetry::debug;
|
||||
use object_store::ObjectStore;
|
||||
use parquet::basic::{Compression, Encoding, ZstdLevel};
|
||||
use parquet::file::metadata::KeyValue;
|
||||
@@ -80,7 +80,7 @@ impl<'a> ParquetWriter<'a> {
|
||||
let stats = self.source.stats();
|
||||
|
||||
if stats.num_rows == 0 {
|
||||
logging::debug!(
|
||||
debug!(
|
||||
"No data written, try to stop the writer: {}",
|
||||
self.file_path
|
||||
);
|
||||
|
||||
@@ -42,9 +42,22 @@ pub struct TestEnv {
|
||||
data_home: TempDir,
|
||||
}
|
||||
|
||||
impl Default for TestEnv {
|
||||
fn default() -> Self {
|
||||
TestEnv::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl TestEnv {
|
||||
/// Returns a new env with empty prefix for test.
|
||||
pub fn new() -> TestEnv {
|
||||
TestEnv {
|
||||
data_home: create_temp_dir(""),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a new env with specific `prefix` for test.
|
||||
pub fn new(prefix: &str) -> TestEnv {
|
||||
pub fn with_prefix(prefix: &str) -> TestEnv {
|
||||
TestEnv {
|
||||
data_home: create_temp_dir(prefix),
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
//! Structs and utilities for writing regions.
|
||||
|
||||
mod handle_close;
|
||||
mod handle_create;
|
||||
mod handle_open;
|
||||
pub(crate) mod request;
|
||||
@@ -24,7 +25,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_runtime::JoinHandle;
|
||||
use common_telemetry::logging;
|
||||
use common_telemetry::{error, info, warn};
|
||||
use futures::future::try_join_all;
|
||||
use object_store::ObjectStore;
|
||||
use snafu::{ensure, ResultExt};
|
||||
@@ -42,6 +43,7 @@ use crate::worker::request::{RegionRequest, RequestBody, WorkerRequest};
|
||||
/// Identifier for a worker.
|
||||
pub(crate) type WorkerId = u32;
|
||||
|
||||
#[cfg_attr(doc, aquamarine::aquamarine)]
|
||||
/// A fixed size group of [RegionWorkers](RegionWorker).
|
||||
///
|
||||
/// A worker group binds each region to a specific [RegionWorker] and sends
|
||||
@@ -111,7 +113,7 @@ impl WorkerGroup {
|
||||
|
||||
/// Stop the worker group.
|
||||
pub(crate) async fn stop(&self) -> Result<()> {
|
||||
logging::info!("Stop region worker group");
|
||||
info!("Stop region worker group");
|
||||
|
||||
try_join_all(self.workers.iter().map(|worker| worker.stop())).await?;
|
||||
|
||||
@@ -204,7 +206,7 @@ impl RegionWorker {
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
logging::warn!(
|
||||
warn!(
|
||||
"Worker {} is already exited but the running flag is still true",
|
||||
self.id
|
||||
);
|
||||
@@ -222,11 +224,11 @@ impl RegionWorker {
|
||||
async fn stop(&self) -> Result<()> {
|
||||
let handle = self.handle.lock().await.take();
|
||||
if let Some(handle) = handle {
|
||||
logging::info!("Stop region worker {}", self.id);
|
||||
info!("Stop region worker {}", self.id);
|
||||
|
||||
self.set_running(false);
|
||||
if self.sender.send(WorkerRequest::Stop).await.is_err() {
|
||||
logging::warn!("Worker {} is already exited before stop", self.id);
|
||||
warn!("Worker {} is already exited before stop", self.id);
|
||||
}
|
||||
|
||||
handle.await.context(JoinSnafu)?;
|
||||
@@ -285,7 +287,7 @@ struct RegionWorkerLoop<S> {
|
||||
impl<S> RegionWorkerLoop<S> {
|
||||
/// Starts the worker loop.
|
||||
async fn run(&mut self) {
|
||||
logging::info!("Start region worker thread {}", self.id);
|
||||
info!("Start region worker thread {}", self.id);
|
||||
|
||||
// Buffer to retrieve requests from receiver.
|
||||
let mut buffer = RequestBuffer::with_capacity(self.config.worker_request_batch_size);
|
||||
@@ -312,7 +314,9 @@ impl<S> RegionWorkerLoop<S> {
|
||||
self.handle_requests(&mut buffer).await;
|
||||
}
|
||||
|
||||
logging::info!("Exit region worker thread {}", self.id);
|
||||
self.clean().await;
|
||||
|
||||
info!("Exit region worker thread {}", self.id);
|
||||
}
|
||||
|
||||
/// Dispatches and processes requests.
|
||||
@@ -367,6 +371,7 @@ impl<S> RegionWorkerLoop<S> {
|
||||
let res = match request.body {
|
||||
RequestBody::Create(req) => self.handle_create_request(req).await,
|
||||
RequestBody::Open(req) => self.handle_open_request(req).await,
|
||||
RequestBody::Close(req) => self.handle_close_request(req).await,
|
||||
RequestBody::Write(_) => unreachable!(),
|
||||
};
|
||||
|
||||
@@ -376,6 +381,19 @@ impl<S> RegionWorkerLoop<S> {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up the worker.
|
||||
async fn clean(&self) {
|
||||
// Closes remaining regions.
|
||||
let regions = self.regions.list_regions();
|
||||
for region in regions {
|
||||
if let Err(e) = region.stop().await {
|
||||
error!(e; "Failed to stop region {}", region.region_id);
|
||||
}
|
||||
}
|
||||
|
||||
self.regions.clear();
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -398,7 +416,7 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_worker_group_start_stop() {
|
||||
let env = TestEnv::new("group-stop");
|
||||
let env = TestEnv::with_prefix("group-stop");
|
||||
let group = env
|
||||
.create_worker_group(MitoConfig {
|
||||
num_workers: 4,
|
||||
|
||||
38
src/mito2/src/worker/handle_close.rs
Normal file
38
src/mito2/src/worker/handle_close.rs
Normal file
@@ -0,0 +1,38 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Handling close request.
|
||||
|
||||
use common_telemetry::info;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::worker::request::CloseRequest;
|
||||
use crate::worker::RegionWorkerLoop;
|
||||
|
||||
impl<S> RegionWorkerLoop<S> {
|
||||
pub(crate) async fn handle_close_request(&mut self, request: CloseRequest) -> Result<()> {
|
||||
let Some(region) = self.regions.get_region(request.region_id) else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
info!("Try to close region {}", request.region_id);
|
||||
|
||||
region.stop().await?;
|
||||
self.regions.remove_region(request.region_id);
|
||||
|
||||
info!("Region {} closed", request.region_id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -16,7 +16,7 @@
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_telemetry::logging;
|
||||
use common_telemetry::info;
|
||||
use snafu::ensure;
|
||||
|
||||
use crate::error::{RegionExistsSnafu, Result};
|
||||
@@ -50,16 +50,19 @@ impl<S> RegionWorkerLoop<S> {
|
||||
|
||||
// Create a MitoRegion from the RegionMetadata.
|
||||
let region = RegionOpener::new(
|
||||
metadata,
|
||||
request.region_id,
|
||||
self.memtable_builder.clone(),
|
||||
self.object_store.clone(),
|
||||
)
|
||||
.metadata(metadata)
|
||||
.region_dir(&request.region_dir)
|
||||
.create(&self.config)
|
||||
.await?;
|
||||
|
||||
// TODO(yingwen): Custom the Debug format for the metadata and also print it.
|
||||
logging::info!("A new region created, region_id: {}", region.region_id);
|
||||
info!("A new region created, region_id: {}", region.region_id);
|
||||
|
||||
// TODO(yingwen): Metrics.
|
||||
|
||||
// Insert the MitoRegion into the RegionMap.
|
||||
self.regions.insert_region(Arc::new(region));
|
||||
|
||||
@@ -14,12 +14,38 @@
|
||||
|
||||
//! Handling open request.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_telemetry::info;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::region::opener::RegionOpener;
|
||||
use crate::worker::request::OpenRequest;
|
||||
use crate::worker::RegionWorkerLoop;
|
||||
|
||||
impl<S> RegionWorkerLoop<S> {
|
||||
pub(crate) async fn handle_open_request(&mut self, _request: OpenRequest) -> Result<()> {
|
||||
unimplemented!()
|
||||
pub(crate) async fn handle_open_request(&mut self, request: OpenRequest) -> Result<()> {
|
||||
if self.regions.is_region_exists(request.region_id) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
info!("Try to open region {}", request.region_id);
|
||||
|
||||
// Open region from specific region dir.
|
||||
let region = RegionOpener::new(
|
||||
request.region_id,
|
||||
self.memtable_builder.clone(),
|
||||
self.object_store.clone(),
|
||||
)
|
||||
.region_dir(&request.region_dir)
|
||||
.open(&self.config)
|
||||
.await?;
|
||||
|
||||
info!("Region {} is opened", request.region_id);
|
||||
|
||||
// Insert the MitoRegion into the RegionMap.
|
||||
self.regions.insert_region(Arc::new(region));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -69,10 +69,19 @@ pub struct CreateRequest {
|
||||
pub struct OpenRequest {
|
||||
/// Region to open.
|
||||
pub region_id: RegionId,
|
||||
/// Data directory of the region.
|
||||
pub region_dir: String,
|
||||
/// Options of the created region.
|
||||
pub options: RegionOptions,
|
||||
}
|
||||
|
||||
/// Close region request.
|
||||
#[derive(Debug)]
|
||||
pub struct CloseRequest {
|
||||
/// Region to close.
|
||||
pub region_id: RegionId,
|
||||
}
|
||||
|
||||
/// Request to write a region.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct WriteRequest {
|
||||
@@ -93,6 +102,9 @@ pub(crate) enum WorkerRequest {
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct RegionRequest {
|
||||
/// Sender to send result.
|
||||
///
|
||||
/// Now the result is a `Result<()>`, but we could replace the empty tuple
|
||||
/// with an enum if we need to carry more information.
|
||||
pub(crate) sender: Option<Sender<Result<()>>>,
|
||||
/// Request body.
|
||||
pub(crate) body: RequestBody,
|
||||
@@ -124,6 +136,8 @@ pub(crate) enum RequestBody {
|
||||
Create(CreateRequest),
|
||||
/// Opens an existing region.
|
||||
Open(OpenRequest),
|
||||
/// Closes a region.
|
||||
Close(CloseRequest),
|
||||
}
|
||||
|
||||
impl RequestBody {
|
||||
@@ -133,6 +147,7 @@ impl RequestBody {
|
||||
RequestBody::Write(req) => req.region_id,
|
||||
RequestBody::Create(req) => req.region_id,
|
||||
RequestBody::Open(req) => req.region_id,
|
||||
RequestBody::Close(req) => req.region_id,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -142,6 +157,7 @@ impl RequestBody {
|
||||
RequestBody::Write(_) => false,
|
||||
RequestBody::Create(_) => true,
|
||||
RequestBody::Open(_) => true,
|
||||
RequestBody::Close(_) => true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user