mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-08 14:22:58 +00:00
feat(mito): create region in mito2 engine (#1999)
* chore: check table existence * refactor: rename LevelMetaVec * feat: create request to metadata * refactor: Share MitoConfig between workers * feat: impl handle_create_request * refactor: move tests mod * feat: validate time index nullable * feat: test create region * feat: test create if not exists * feat: remove option * style: fix clippy * chore: address CR comments
This commit is contained in:
@@ -14,19 +14,32 @@
|
||||
|
||||
//! Configurations.
|
||||
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_datasource::compression::CompressionType;
|
||||
use common_telemetry::logging;
|
||||
|
||||
/// Default region worker num.
|
||||
const DEFAULT_NUM_WORKERS: usize = 1;
|
||||
/// Default region write buffer size.
|
||||
pub(crate) const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(32);
|
||||
|
||||
/// Configuration for [MitoEngine](crate::engine::MitoEngine).
|
||||
#[derive(Debug)]
|
||||
pub struct MitoConfig {
|
||||
/// Number of region workers.
|
||||
// Worker configs:
|
||||
/// Number of region workers (default 1).
|
||||
pub num_workers: usize,
|
||||
/// Request channel size of each worker.
|
||||
/// Request channel size of each worker (default 128).
|
||||
pub worker_channel_size: usize,
|
||||
/// Max batch size for a worker to handle requests.
|
||||
/// Max batch size for a worker to handle requests (default 64).
|
||||
pub worker_request_batch_size: usize,
|
||||
|
||||
// Manifest configs:
|
||||
/// Number of meta action updated to trigger a new checkpoint
|
||||
/// for the manifest (default 10).
|
||||
pub manifest_checkpoint_interval: u64,
|
||||
/// Manifest compression type (default uncompressed).
|
||||
pub manifest_compress_type: CompressionType,
|
||||
}
|
||||
|
||||
impl Default for MitoConfig {
|
||||
@@ -35,6 +48,8 @@ impl Default for MitoConfig {
|
||||
num_workers: DEFAULT_NUM_WORKERS,
|
||||
worker_channel_size: 128,
|
||||
worker_request_batch_size: 64,
|
||||
manifest_checkpoint_interval: 10,
|
||||
manifest_compress_type: CompressionType::Uncompressed,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,11 +12,17 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Mito region engine.
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use object_store::ObjectStore;
|
||||
use snafu::ResultExt;
|
||||
use store_api::logstore::LogStore;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::config::MitoConfig;
|
||||
use crate::error::{RecvSnafu, Result};
|
||||
@@ -53,6 +59,11 @@ impl MitoEngine {
|
||||
pub async fn create_region(&self, request: CreateRequest) -> Result<()> {
|
||||
self.inner.create_region(request).await
|
||||
}
|
||||
|
||||
/// Returns true if the specific region exists.
|
||||
pub fn is_region_exists(&self, region_id: RegionId) -> bool {
|
||||
self.inner.workers.is_region_exists(region_id)
|
||||
}
|
||||
}
|
||||
|
||||
/// Inner struct of [MitoEngine].
|
||||
@@ -69,7 +80,7 @@ impl EngineInner {
|
||||
object_store: ObjectStore,
|
||||
) -> EngineInner {
|
||||
EngineInner {
|
||||
workers: WorkerGroup::start(&config, log_store, object_store),
|
||||
workers: WorkerGroup::start(config, log_store, object_store),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -86,17 +97,3 @@ impl EngineInner {
|
||||
receiver.await.context(RecvSnafu)?
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::test_util::TestEnv;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_new_stop() {
|
||||
let env = TestEnv::new("engine-stop");
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
engine.stop().await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
76
src/mito2/src/engine/tests.rs
Normal file
76
src/mito2/src/engine/tests.rs
Normal file
@@ -0,0 +1,76 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Tests for mito engine.
|
||||
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use super::*;
|
||||
use crate::error::Error;
|
||||
use crate::test_util::{CreateRequestBuilder, TestEnv};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_new_stop() {
|
||||
let env = TestEnv::new("engine-stop");
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
engine.stop().await.unwrap();
|
||||
|
||||
let request = CreateRequestBuilder::new(RegionId::new(1, 1)).build();
|
||||
let err = engine.create_region(request).await.unwrap_err();
|
||||
assert!(
|
||||
matches!(err, Error::WorkerStopped { .. }),
|
||||
"unexpected err: {err}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_create_new_region() {
|
||||
let env = TestEnv::new("new-region");
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new(region_id).build();
|
||||
engine.create_region(request).await.unwrap();
|
||||
|
||||
assert!(engine.is_region_exists(region_id));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_create_region_if_not_exists() {
|
||||
let env = TestEnv::new("create-not-exists");
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
let builder = CreateRequestBuilder::new(RegionId::new(1, 1)).create_if_not_exists(true);
|
||||
engine.create_region(builder.build()).await.unwrap();
|
||||
|
||||
// Create the same region again.
|
||||
engine.create_region(builder.build()).await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_create_existing_region() {
|
||||
let env = TestEnv::new("create-existing");
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
let builder = CreateRequestBuilder::new(RegionId::new(1, 1));
|
||||
engine.create_region(builder.build()).await.unwrap();
|
||||
|
||||
// Create the same region again.
|
||||
let err = engine.create_region(builder.build()).await.unwrap_err();
|
||||
assert!(
|
||||
matches!(err, Error::RegionExists { .. }),
|
||||
"unexpected err: {err}"
|
||||
);
|
||||
}
|
||||
@@ -19,6 +19,7 @@ use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
use snafu::{Location, Snafu};
|
||||
use store_api::manifest::ManifestVersion;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::worker::WorkerId;
|
||||
|
||||
@@ -110,6 +111,12 @@ pub enum Error {
|
||||
source: datatypes::error::Error,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Region {} already exists, location: {}", region_id, location))]
|
||||
RegionExists {
|
||||
region_id: RegionId,
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -120,9 +127,11 @@ impl ErrorExt for Error {
|
||||
|
||||
match self {
|
||||
OpenDal { .. } => StatusCode::StorageUnavailable,
|
||||
CompressObject { .. } | DecompressObject { .. } | SerdeJson { .. } | Utf8 { .. } => {
|
||||
StatusCode::Unexpected
|
||||
}
|
||||
CompressObject { .. }
|
||||
| DecompressObject { .. }
|
||||
| SerdeJson { .. }
|
||||
| Utf8 { .. }
|
||||
| RegionExists { .. } => StatusCode::Unexpected,
|
||||
InvalidScanIndex { .. }
|
||||
| InitialMetadata { .. }
|
||||
| InvalidMeta { .. }
|
||||
|
||||
@@ -292,7 +292,7 @@ mod test {
|
||||
async fn create_region_without_initial_metadata() {
|
||||
let env = TestEnv::new("");
|
||||
let result = env
|
||||
.create_manifest_manager(CompressionType::Uncompressed, None, None)
|
||||
.create_manifest_manager(CompressionType::Uncompressed, 10, None)
|
||||
.await;
|
||||
assert!(matches!(
|
||||
result.err().unwrap(),
|
||||
@@ -305,7 +305,7 @@ mod test {
|
||||
let metadata = basic_region_metadata();
|
||||
let env = TestEnv::new("");
|
||||
let manager = env
|
||||
.create_manifest_manager(CompressionType::Uncompressed, None, Some(metadata.clone()))
|
||||
.create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -318,7 +318,7 @@ mod test {
|
||||
let metadata = basic_region_metadata();
|
||||
let env = TestEnv::new("");
|
||||
let manager = env
|
||||
.create_manifest_manager(CompressionType::Uncompressed, None, Some(metadata.clone()))
|
||||
.create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
||||
@@ -24,10 +24,10 @@ pub struct RegionManifestOptions {
|
||||
pub manifest_dir: String,
|
||||
pub object_store: ObjectStore,
|
||||
pub compress_type: CompressionType,
|
||||
/// Interval of version ([ManifestVersion](store_api::manifest::ManifestVersion)) between two checkpoints
|
||||
/// `None` means disable checkpoint.
|
||||
pub checkpoint_interval: Option<u64>,
|
||||
/// Interval of version ([ManifestVersion](store_api::manifest::ManifestVersion)) between two checkpoints.
|
||||
pub checkpoint_interval: u64,
|
||||
/// Initial [RegionMetadata](crate::metadata::RegionMetadata) of this region.
|
||||
/// Only need to set when create a new region, otherwise it will be ignored.
|
||||
// TODO(yingwen): Could we pass RegionMetadataRef?
|
||||
pub initial_metadata: Option<RegionMetadata>,
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
pub(crate) mod version;
|
||||
|
||||
use std::fmt;
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::metadata::RegionMetadataRef;
|
||||
@@ -41,3 +42,39 @@ pub trait MemtableBuilder: Send + Sync + fmt::Debug {
|
||||
}
|
||||
|
||||
pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;
|
||||
|
||||
// TODO(yingwen): Remove it once we port the memtable.
|
||||
/// Empty memtable for test.
|
||||
#[derive(Debug, Default)]
|
||||
pub(crate) struct EmptyMemtable {
|
||||
/// Id of this memtable.
|
||||
id: MemtableId,
|
||||
}
|
||||
|
||||
impl EmptyMemtable {
|
||||
/// Returns a new memtable with specific `id`.
|
||||
pub(crate) fn new(id: MemtableId) -> EmptyMemtable {
|
||||
EmptyMemtable { id }
|
||||
}
|
||||
}
|
||||
|
||||
impl Memtable for EmptyMemtable {
|
||||
fn id(&self) -> MemtableId {
|
||||
self.id
|
||||
}
|
||||
}
|
||||
|
||||
/// Default memtable builder.
|
||||
#[derive(Debug, Default)]
|
||||
pub(crate) struct DefaultMemtableBuilder {
|
||||
/// Next memtable id.
|
||||
next_id: AtomicU32,
|
||||
}
|
||||
|
||||
impl MemtableBuilder for DefaultMemtableBuilder {
|
||||
fn build(&self, _metadata: &RegionMetadataRef) -> MemtableRef {
|
||||
Arc::new(EmptyMemtable::new(
|
||||
self.next_id.fetch_add(1, Ordering::Relaxed),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,6 +27,9 @@ use store_api::storage::{ColumnId, RegionId};
|
||||
use crate::error::{InvalidMetaSnafu, InvalidSchemaSnafu, Result};
|
||||
use crate::region::VersionNumber;
|
||||
|
||||
/// Initial version number of a new region.
|
||||
pub(crate) const INIT_REGION_VERSION: VersionNumber = 0;
|
||||
|
||||
#[cfg_attr(doc, aquamarine::aquamarine)]
|
||||
/// Static metadata of a region.
|
||||
///
|
||||
@@ -57,6 +60,9 @@ pub struct RegionMetadata {
|
||||
/// Latest schema constructed from [column_metadatas](RegionMetadata::column_metadatas).
|
||||
#[serde(skip)]
|
||||
pub schema: SchemaRef,
|
||||
|
||||
// We don't pub `time_index` and `id_to_index` and always construct them via [SkippedFields]
|
||||
// so we can assumes they are valid.
|
||||
/// Id of the time index column.
|
||||
#[serde(skip)]
|
||||
time_index: ColumnId,
|
||||
@@ -153,6 +159,22 @@ impl SkippedFields {
|
||||
}
|
||||
|
||||
impl RegionMetadata {
|
||||
/// Find column by id.
|
||||
pub(crate) fn column_by_id(&self, column_id: ColumnId) -> Option<&ColumnMetadata> {
|
||||
self.id_to_index
|
||||
.get(&column_id)
|
||||
.map(|index| &self.column_metadatas[*index])
|
||||
}
|
||||
|
||||
/// Returns the time index column
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if the time index column id is invalid.
|
||||
pub(crate) fn time_index_column(&self) -> &ColumnMetadata {
|
||||
let index = self.id_to_index[&self.time_index];
|
||||
&self.column_metadatas[index]
|
||||
}
|
||||
|
||||
/// Checks whether the metadata is valid.
|
||||
fn validate(&self) -> Result<()> {
|
||||
// Id to name.
|
||||
@@ -188,6 +210,17 @@ impl RegionMetadata {
|
||||
}
|
||||
);
|
||||
|
||||
// Checks the time index column is not nullable.
|
||||
ensure!(
|
||||
!self.time_index_column().column_schema.is_nullable(),
|
||||
InvalidMetaSnafu {
|
||||
reason: format!(
|
||||
"time index column {} must be NOT NULL",
|
||||
self.time_index_column().column_schema.name
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
if !self.primary_key.is_empty() {
|
||||
let mut pk_ids = HashSet::with_capacity(self.primary_key.len());
|
||||
// Checks column ids in the primary key is valid.
|
||||
@@ -362,6 +395,17 @@ mod test {
|
||||
builder.build().unwrap()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_region_metadata() {
|
||||
let region_metadata = build_test_region_metadata();
|
||||
assert_eq!("c", region_metadata.time_index_column().column_schema.name);
|
||||
assert_eq!(
|
||||
"a",
|
||||
region_metadata.column_by_id(1).unwrap().column_schema.name
|
||||
);
|
||||
assert_eq!(None, region_metadata.column_by_id(10));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_region_metadata_serde() {
|
||||
let region_metadata = build_test_region_metadata();
|
||||
@@ -384,8 +428,7 @@ mod test {
|
||||
let err = builder.build().unwrap_err();
|
||||
assert!(
|
||||
err.to_string().contains("ts is not timestamp compatible"),
|
||||
"unexpected err: {}",
|
||||
err
|
||||
"unexpected err: {err}",
|
||||
);
|
||||
}
|
||||
|
||||
@@ -396,8 +439,7 @@ mod test {
|
||||
// A region must have a time index.
|
||||
assert!(
|
||||
err.to_string().contains("time index not found"),
|
||||
"unexpected err: {}",
|
||||
err
|
||||
"unexpected err: {err}",
|
||||
);
|
||||
}
|
||||
|
||||
@@ -423,8 +465,7 @@ mod test {
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains("column a and b have the same column id"),
|
||||
"unexpected err: {}",
|
||||
err
|
||||
"unexpected err: {err}",
|
||||
);
|
||||
}
|
||||
|
||||
@@ -453,8 +494,7 @@ mod test {
|
||||
let err = builder.build().unwrap_err();
|
||||
assert!(
|
||||
err.to_string().contains("expect only one time index"),
|
||||
"unexpected err: {}",
|
||||
err
|
||||
"unexpected err: {err}",
|
||||
);
|
||||
}
|
||||
|
||||
@@ -480,8 +520,7 @@ mod test {
|
||||
let err = builder.build().unwrap_err();
|
||||
assert!(
|
||||
err.to_string().contains("unknown column id 3"),
|
||||
"unexpected err: {}",
|
||||
err
|
||||
"unexpected err: {err}",
|
||||
);
|
||||
}
|
||||
|
||||
@@ -508,8 +547,7 @@ mod test {
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains("duplicate column a in primary key"),
|
||||
"unexpected err: {}",
|
||||
err
|
||||
"unexpected err: {err}",
|
||||
);
|
||||
}
|
||||
|
||||
@@ -531,8 +569,27 @@ mod test {
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains("column ts is already a time index column"),
|
||||
"unexpected err: {}",
|
||||
err
|
||||
"unexpected err: {err}",
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_nullable_time_index() {
|
||||
let mut builder = create_builder();
|
||||
builder.push_column_metadata(ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
"ts",
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
true,
|
||||
),
|
||||
semantic_type: SemanticType::Timestamp,
|
||||
column_id: 1,
|
||||
});
|
||||
let err = builder.build().unwrap_err();
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains("time index column ts must be NOT NULL"),
|
||||
"unexpected err: {err}",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
//! Mito region.
|
||||
|
||||
pub(crate) mod opener;
|
||||
mod version;
|
||||
|
||||
use std::collections::HashMap;
|
||||
@@ -21,9 +22,8 @@ use std::sync::{Arc, RwLock};
|
||||
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::memtable::MemtableBuilderRef;
|
||||
use crate::metadata::RegionMetadataRef;
|
||||
use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
|
||||
use crate::manifest::manager::RegionManifestManager;
|
||||
use crate::region::version::VersionControlRef;
|
||||
|
||||
/// Type to store region version.
|
||||
pub type VersionNumber = u32;
|
||||
@@ -31,7 +31,16 @@ pub type VersionNumber = u32;
|
||||
/// Metadata and runtime status of a region.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct MitoRegion {
|
||||
/// Id of this region.
|
||||
///
|
||||
/// Accessing region id from the version control is inconvenient so
|
||||
/// we also store it here.
|
||||
pub(crate) region_id: RegionId,
|
||||
|
||||
/// Version controller for this region.
|
||||
version_control: VersionControlRef,
|
||||
/// Manager to maintain manifest for this region.
|
||||
manifest_manager: RegionManifestManager,
|
||||
}
|
||||
|
||||
pub(crate) type MitoRegionRef = Arc<MitoRegion>;
|
||||
@@ -42,33 +51,18 @@ pub(crate) struct RegionMap {
|
||||
regions: RwLock<HashMap<RegionId, MitoRegionRef>>,
|
||||
}
|
||||
|
||||
impl RegionMap {
|
||||
/// Returns true if the region exists.
|
||||
pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
|
||||
let regions = self.regions.read().unwrap();
|
||||
regions.contains_key(®ion_id)
|
||||
}
|
||||
|
||||
/// Inserts a new region into the map.
|
||||
pub(crate) fn insert_region(&self, region: MitoRegionRef) {
|
||||
let mut regions = self.regions.write().unwrap();
|
||||
regions.insert(region.region_id, region);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) type RegionMapRef = Arc<RegionMap>;
|
||||
|
||||
/// [MitoRegion] builder.
|
||||
pub(crate) struct RegionBuilder {
|
||||
metadata: RegionMetadataRef,
|
||||
memtable_builder: MemtableBuilderRef,
|
||||
}
|
||||
|
||||
impl RegionBuilder {
|
||||
/// Returns a new builder.
|
||||
pub(crate) fn new(
|
||||
metadata: RegionMetadataRef,
|
||||
memtable_builder: MemtableBuilderRef,
|
||||
) -> RegionBuilder {
|
||||
RegionBuilder {
|
||||
metadata,
|
||||
memtable_builder,
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds a new region.
|
||||
pub(crate) fn build(self) -> MitoRegion {
|
||||
let mutable = self.memtable_builder.build(&self.metadata);
|
||||
|
||||
let version = VersionBuilder::new(self.metadata, mutable).build();
|
||||
let version_control = Arc::new(VersionControl::new(version));
|
||||
|
||||
MitoRegion { version_control }
|
||||
}
|
||||
}
|
||||
|
||||
92
src/mito2/src/region/opener.rs
Normal file
92
src/mito2/src/region/opener.rs
Normal file
@@ -0,0 +1,92 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Region opener.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use object_store::util::join_dir;
|
||||
use object_store::ObjectStore;
|
||||
|
||||
use crate::config::MitoConfig;
|
||||
use crate::error::Result;
|
||||
use crate::manifest::manager::RegionManifestManager;
|
||||
use crate::manifest::options::RegionManifestOptions;
|
||||
use crate::memtable::MemtableBuilderRef;
|
||||
use crate::metadata::RegionMetadata;
|
||||
use crate::region::version::{VersionBuilder, VersionControl};
|
||||
use crate::region::MitoRegion;
|
||||
|
||||
/// Builder to create a new [MitoRegion] or open an existing one.
|
||||
pub(crate) struct RegionOpener {
|
||||
metadata: RegionMetadata,
|
||||
memtable_builder: MemtableBuilderRef,
|
||||
object_store: ObjectStore,
|
||||
region_dir: String,
|
||||
}
|
||||
|
||||
impl RegionOpener {
|
||||
/// Returns a new opener.
|
||||
pub(crate) fn new(
|
||||
metadata: RegionMetadata,
|
||||
memtable_builder: MemtableBuilderRef,
|
||||
object_store: ObjectStore,
|
||||
) -> RegionOpener {
|
||||
RegionOpener {
|
||||
metadata,
|
||||
memtable_builder,
|
||||
object_store,
|
||||
region_dir: String::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets the region dir.
|
||||
pub(crate) fn region_dir(mut self, value: &str) -> Self {
|
||||
self.region_dir = value.to_string();
|
||||
self
|
||||
}
|
||||
|
||||
/// Writes region manifest and creates a new region.
|
||||
pub(crate) async fn create(self, config: &MitoConfig) -> Result<MitoRegion> {
|
||||
let region_id = self.metadata.region_id;
|
||||
// Create a manifest manager for this region.
|
||||
let options = RegionManifestOptions {
|
||||
manifest_dir: new_manifest_dir(&self.region_dir),
|
||||
object_store: self.object_store,
|
||||
compress_type: config.manifest_compress_type,
|
||||
checkpoint_interval: config.manifest_checkpoint_interval,
|
||||
// We are creating a new region, so we need to set this field.
|
||||
initial_metadata: Some(self.metadata.clone()),
|
||||
};
|
||||
// Writes regions to the manifest file.
|
||||
let manifest_manager = RegionManifestManager::new(options).await?;
|
||||
|
||||
let metadata = Arc::new(self.metadata);
|
||||
let mutable = self.memtable_builder.build(&metadata);
|
||||
|
||||
let version = VersionBuilder::new(metadata, mutable).build();
|
||||
let version_control = Arc::new(VersionControl::new(version));
|
||||
|
||||
Ok(MitoRegion {
|
||||
region_id,
|
||||
version_control,
|
||||
manifest_manager,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the directory to the manifest files.
|
||||
fn new_manifest_dir(region_dir: &str) -> String {
|
||||
join_dir(region_dir, "manifest")
|
||||
}
|
||||
@@ -68,8 +68,10 @@ pub(crate) struct Version {
|
||||
ssts: SstVersionRef,
|
||||
/// Inclusive max sequence of flushed data.
|
||||
flushed_sequence: SequenceNumber,
|
||||
// TODO(yingwen): Remove this.
|
||||
/// Current version of region manifest.
|
||||
manifest_version: ManifestVersion,
|
||||
// TODO(yingwen): RegionOptions.
|
||||
}
|
||||
|
||||
/// Version builder.
|
||||
|
||||
@@ -23,7 +23,7 @@ use crate::sst::file::{FileHandle, FileId, Level, MAX_LEVEL};
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct SstVersion {
|
||||
/// SST metadata organized by levels.
|
||||
levels: LevelMetaVec,
|
||||
levels: LevelMetaArray,
|
||||
}
|
||||
|
||||
pub(crate) type SstVersionRef = Arc<SstVersion>;
|
||||
@@ -38,8 +38,8 @@ impl SstVersion {
|
||||
}
|
||||
|
||||
// We only has fixed number of level, so we use array to hold elements. This implementation
|
||||
// detail of LevelMetaVec should not be exposed to the user of [LevelMetas].
|
||||
type LevelMetaVec = [LevelMeta; MAX_LEVEL as usize];
|
||||
// detail of LevelMetaArray should not be exposed to users of [LevelMetas].
|
||||
type LevelMetaArray = [LevelMeta; MAX_LEVEL as usize];
|
||||
|
||||
/// Metadata of files in the same SST level.
|
||||
pub struct LevelMeta {
|
||||
@@ -68,10 +68,10 @@ impl fmt::Debug for LevelMeta {
|
||||
}
|
||||
}
|
||||
|
||||
fn new_level_meta_vec() -> LevelMetaVec {
|
||||
fn new_level_meta_vec() -> LevelMetaArray {
|
||||
(0u8..MAX_LEVEL)
|
||||
.map(LevelMeta::new)
|
||||
.collect::<Vec<_>>()
|
||||
.try_into()
|
||||
.unwrap() // safety: LevelMetaVec is a fixed length array with length MAX_LEVEL
|
||||
.unwrap() // safety: LevelMetaArray is a fixed length array with length MAX_LEVEL
|
||||
}
|
||||
|
||||
@@ -14,24 +14,26 @@
|
||||
|
||||
//! Utilities for testing.
|
||||
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_datasource::compression::CompressionType;
|
||||
use common_test_util::temp_dir::{create_temp_dir, TempDir};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use log_store::raft_engine::log_store::RaftEngineLogStore;
|
||||
use log_store::test_util::log_store_util;
|
||||
use object_store::services::Fs;
|
||||
use object_store::util::join_dir;
|
||||
use object_store::ObjectStore;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::config::MitoConfig;
|
||||
use crate::engine::MitoEngine;
|
||||
use crate::error::Result;
|
||||
use crate::manifest::manager::RegionManifestManager;
|
||||
use crate::manifest::options::RegionManifestOptions;
|
||||
use crate::memtable::{Memtable, MemtableBuilder, MemtableId, MemtableRef};
|
||||
use crate::metadata::{RegionMetadata, RegionMetadataRef};
|
||||
use crate::metadata::{ColumnMetadata, RegionMetadata, SemanticType};
|
||||
use crate::worker::request::{CreateRequest, RegionOptions};
|
||||
use crate::worker::WorkerGroup;
|
||||
|
||||
/// Env to test mito engine.
|
||||
@@ -56,7 +58,7 @@ impl TestEnv {
|
||||
}
|
||||
|
||||
/// Creates a new [WorkerGroup] with specific config under this env.
|
||||
pub(crate) async fn create_worker_group(&self, config: &MitoConfig) -> WorkerGroup {
|
||||
pub(crate) async fn create_worker_group(&self, config: MitoConfig) -> WorkerGroup {
|
||||
let (log_store, object_store) = self.create_log_and_object_store().await;
|
||||
|
||||
WorkerGroup::start(config, Arc::new(log_store), object_store)
|
||||
@@ -78,7 +80,7 @@ impl TestEnv {
|
||||
pub async fn create_manifest_manager(
|
||||
&self,
|
||||
compress_type: CompressionType,
|
||||
checkpoint_interval: Option<u64>,
|
||||
checkpoint_interval: u64,
|
||||
initial_metadata: Option<RegionMetadata>,
|
||||
) -> Result<RegionManifestManager> {
|
||||
let data_home = self.data_home.path().to_str().unwrap();
|
||||
@@ -100,36 +102,101 @@ impl TestEnv {
|
||||
}
|
||||
}
|
||||
|
||||
/// Memtable that only for testing metadata.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct MetaOnlyMemtable {
|
||||
/// Id of this memtable.
|
||||
id: MemtableId,
|
||||
/// Builder to mock a [CreateRequest].
|
||||
pub struct CreateRequestBuilder {
|
||||
region_id: RegionId,
|
||||
region_dir: String,
|
||||
tag_num: usize,
|
||||
field_num: usize,
|
||||
create_if_not_exists: bool,
|
||||
}
|
||||
|
||||
impl MetaOnlyMemtable {
|
||||
/// Returns a new memtable with specific `id`.
|
||||
pub fn new(id: MemtableId) -> MetaOnlyMemtable {
|
||||
MetaOnlyMemtable { id }
|
||||
impl Default for CreateRequestBuilder {
|
||||
fn default() -> Self {
|
||||
CreateRequestBuilder {
|
||||
region_id: RegionId::default(),
|
||||
region_dir: "test".to_string(),
|
||||
tag_num: 1,
|
||||
field_num: 1,
|
||||
create_if_not_exists: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Memtable for MetaOnlyMemtable {
|
||||
fn id(&self) -> MemtableId {
|
||||
self.id
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct MetaOnlyBuilder {
|
||||
/// Next memtable id.
|
||||
next_id: AtomicU32,
|
||||
}
|
||||
|
||||
impl MemtableBuilder for MetaOnlyBuilder {
|
||||
fn build(&self, _metadata: &RegionMetadataRef) -> MemtableRef {
|
||||
Arc::new(MetaOnlyMemtable::new(
|
||||
self.next_id.fetch_add(1, Ordering::Relaxed),
|
||||
))
|
||||
impl CreateRequestBuilder {
|
||||
pub fn new(region_id: RegionId) -> CreateRequestBuilder {
|
||||
CreateRequestBuilder {
|
||||
region_id,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn region_dir(mut self, value: &str) -> Self {
|
||||
self.region_dir = value.to_string();
|
||||
self
|
||||
}
|
||||
|
||||
pub fn tag_num(mut self, value: usize) -> Self {
|
||||
self.tag_num = value;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn field_num(mut self, value: usize) -> Self {
|
||||
self.tag_num = value;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn create_if_not_exists(mut self, value: bool) -> Self {
|
||||
self.create_if_not_exists = value;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn build(&self) -> CreateRequest {
|
||||
let mut column_id = 0;
|
||||
let mut column_metadatas = Vec::with_capacity(self.tag_num + self.field_num + 1);
|
||||
let mut primary_key = Vec::with_capacity(self.tag_num);
|
||||
for i in 0..self.tag_num {
|
||||
column_metadatas.push(ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
format!("tag_{i}"),
|
||||
ConcreteDataType::string_datatype(),
|
||||
true,
|
||||
),
|
||||
semantic_type: SemanticType::Tag,
|
||||
column_id,
|
||||
});
|
||||
primary_key.push(column_id);
|
||||
column_id += 1;
|
||||
}
|
||||
for i in 0..self.field_num {
|
||||
column_metadatas.push(ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
format!("field_{i}"),
|
||||
ConcreteDataType::float64_datatype(),
|
||||
true,
|
||||
),
|
||||
semantic_type: SemanticType::Field,
|
||||
column_id,
|
||||
});
|
||||
column_id += 1;
|
||||
}
|
||||
column_metadatas.push(ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
"ts",
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
false,
|
||||
),
|
||||
semantic_type: SemanticType::Timestamp,
|
||||
column_id,
|
||||
});
|
||||
|
||||
CreateRequest {
|
||||
region_id: self.region_id,
|
||||
region_dir: self.region_dir.clone(),
|
||||
column_metadatas,
|
||||
primary_key,
|
||||
create_if_not_exists: self.create_if_not_exists,
|
||||
options: RegionOptions::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,6 +35,7 @@ use tokio::sync::{mpsc, Mutex};
|
||||
|
||||
use crate::config::MitoConfig;
|
||||
use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu};
|
||||
use crate::memtable::{DefaultMemtableBuilder, MemtableBuilderRef};
|
||||
use crate::region::{RegionMap, RegionMapRef};
|
||||
use crate::worker::request::{RegionRequest, RequestBody, WorkerRequest};
|
||||
|
||||
@@ -87,20 +88,18 @@ impl WorkerGroup {
|
||||
///
|
||||
/// The number of workers should be power of two.
|
||||
pub(crate) fn start<S: LogStore>(
|
||||
config: &MitoConfig,
|
||||
config: MitoConfig,
|
||||
log_store: Arc<S>,
|
||||
object_store: ObjectStore,
|
||||
) -> WorkerGroup {
|
||||
assert!(config.num_workers.is_power_of_two());
|
||||
let config = Arc::new(config);
|
||||
|
||||
let workers = (0..config.num_workers)
|
||||
.map(|id| {
|
||||
RegionWorker::start(
|
||||
WorkerConfig {
|
||||
id: id as WorkerId,
|
||||
channel_size: config.worker_channel_size,
|
||||
request_batch_size: config.worker_request_batch_size,
|
||||
},
|
||||
id as WorkerId,
|
||||
config.clone(),
|
||||
log_store.clone(),
|
||||
object_store.clone(),
|
||||
)
|
||||
@@ -126,6 +125,11 @@ impl WorkerGroup {
|
||||
.await
|
||||
}
|
||||
|
||||
/// Returns true if the specific region exists.
|
||||
pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
|
||||
self.worker(region_id).is_region_exists(region_id)
|
||||
}
|
||||
|
||||
/// Get worker for specific `region_id`.
|
||||
fn worker(&self, region_id: RegionId) -> &RegionWorker {
|
||||
let mut hasher = DefaultHasher::new();
|
||||
@@ -141,17 +145,6 @@ fn value_to_index(value: usize, num_workers: usize) -> usize {
|
||||
value & (num_workers - 1)
|
||||
}
|
||||
|
||||
/// Config for region worker.
|
||||
#[derive(Debug, Clone)]
|
||||
struct WorkerConfig {
|
||||
/// Id of the worker
|
||||
id: WorkerId,
|
||||
/// Capacity of the request channel.
|
||||
channel_size: usize,
|
||||
/// Batch size to process request.
|
||||
request_batch_size: usize,
|
||||
}
|
||||
|
||||
/// Worker to write and alter regions bound to it.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct RegionWorker {
|
||||
@@ -170,29 +163,31 @@ pub(crate) struct RegionWorker {
|
||||
impl RegionWorker {
|
||||
/// Start a region worker and its background thread.
|
||||
fn start<S: LogStore>(
|
||||
config: WorkerConfig,
|
||||
id: WorkerId,
|
||||
config: Arc<MitoConfig>,
|
||||
log_store: Arc<S>,
|
||||
object_store: ObjectStore,
|
||||
) -> RegionWorker {
|
||||
let regions = Arc::new(RegionMap::default());
|
||||
let (sender, receiver) = mpsc::channel(config.channel_size);
|
||||
let (sender, receiver) = mpsc::channel(config.worker_channel_size);
|
||||
|
||||
let running = Arc::new(AtomicBool::new(true));
|
||||
let mut worker_thread = RegionWorkerLoop {
|
||||
id: config.id,
|
||||
id,
|
||||
config,
|
||||
regions: regions.clone(),
|
||||
receiver,
|
||||
log_store,
|
||||
object_store,
|
||||
running: running.clone(),
|
||||
request_batch_size: config.request_batch_size,
|
||||
memtable_builder: Arc::new(DefaultMemtableBuilder::default()),
|
||||
};
|
||||
let handle = common_runtime::spawn_bg(async move {
|
||||
worker_thread.run().await;
|
||||
});
|
||||
|
||||
RegionWorker {
|
||||
id: config.id,
|
||||
id,
|
||||
regions,
|
||||
sender,
|
||||
handle: Mutex::new(Some(handle)),
|
||||
@@ -249,6 +244,11 @@ impl RegionWorker {
|
||||
fn set_running(&self, value: bool) {
|
||||
self.running.store(value, Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Returns true if the worker contains the specified region.
|
||||
fn is_region_exists(&self, region_id: RegionId) -> bool {
|
||||
self.regions.is_region_exists(region_id)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for RegionWorker {
|
||||
@@ -266,6 +266,8 @@ type RequestBuffer = Vec<WorkerRequest>;
|
||||
struct RegionWorkerLoop<S> {
|
||||
// Id of the worker.
|
||||
id: WorkerId,
|
||||
/// Engine config.
|
||||
config: Arc<MitoConfig>,
|
||||
/// Regions bound to the worker.
|
||||
regions: RegionMapRef,
|
||||
/// Request receiver.
|
||||
@@ -276,8 +278,8 @@ struct RegionWorkerLoop<S> {
|
||||
object_store: ObjectStore,
|
||||
/// Whether the worker thread is still running.
|
||||
running: Arc<AtomicBool>,
|
||||
/// Batch size to fetch requests from channel.
|
||||
request_batch_size: usize,
|
||||
/// Memtable builder for each region.
|
||||
memtable_builder: MemtableBuilderRef,
|
||||
}
|
||||
|
||||
impl<S> RegionWorkerLoop<S> {
|
||||
@@ -286,7 +288,7 @@ impl<S> RegionWorkerLoop<S> {
|
||||
logging::info!("Start region worker thread {}", self.id);
|
||||
|
||||
// Buffer to retrieve requests from receiver.
|
||||
let mut buffer = RequestBuffer::with_capacity(self.request_batch_size);
|
||||
let mut buffer = RequestBuffer::with_capacity(self.config.worker_request_batch_size);
|
||||
|
||||
while self.running.load(Ordering::Relaxed) {
|
||||
// Clear the buffer before handling next batch of requests.
|
||||
@@ -398,7 +400,7 @@ mod tests {
|
||||
async fn test_worker_group_start_stop() {
|
||||
let env = TestEnv::new("group-stop");
|
||||
let group = env
|
||||
.create_worker_group(&MitoConfig {
|
||||
.create_worker_group(MitoConfig {
|
||||
num_workers: 4,
|
||||
..Default::default()
|
||||
})
|
||||
|
||||
@@ -14,17 +14,56 @@
|
||||
|
||||
//! Handling create request.
|
||||
|
||||
use crate::error::Result;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_telemetry::logging;
|
||||
use snafu::ensure;
|
||||
|
||||
use crate::error::{RegionExistsSnafu, Result};
|
||||
use crate::metadata::{RegionMetadataBuilder, INIT_REGION_VERSION};
|
||||
use crate::region::opener::RegionOpener;
|
||||
use crate::worker::request::CreateRequest;
|
||||
use crate::worker::RegionWorkerLoop;
|
||||
|
||||
impl<S> RegionWorkerLoop<S> {
|
||||
pub(crate) async fn handle_create_request(&mut self, _request: CreateRequest) -> Result<()> {
|
||||
// 1. Checks whether the table exists.
|
||||
pub(crate) async fn handle_create_request(&mut self, request: CreateRequest) -> Result<()> {
|
||||
// Checks whether the region exists.
|
||||
if self.regions.is_region_exists(request.region_id) {
|
||||
ensure!(
|
||||
request.create_if_not_exists,
|
||||
RegionExistsSnafu {
|
||||
region_id: request.region_id,
|
||||
}
|
||||
);
|
||||
|
||||
// 2. Convert the request into RegionMetadata
|
||||
// Region already exists.
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// 3. Write manifest
|
||||
unimplemented!()
|
||||
// Convert the request into a RegionMetadata and validate it.
|
||||
let mut builder = RegionMetadataBuilder::new(request.region_id, INIT_REGION_VERSION);
|
||||
for column in request.column_metadatas {
|
||||
builder.push_column_metadata(column);
|
||||
}
|
||||
builder.primary_key(request.primary_key);
|
||||
let metadata = builder.build()?;
|
||||
|
||||
// Create a MitoRegion from the RegionMetadata.
|
||||
let region = RegionOpener::new(
|
||||
metadata,
|
||||
self.memtable_builder.clone(),
|
||||
self.object_store.clone(),
|
||||
)
|
||||
.region_dir(&request.region_dir)
|
||||
.create(&self.config)
|
||||
.await?;
|
||||
|
||||
// TODO(yingwen): Customize the Debug format for the metadata and also print it.
|
||||
logging::info!("A new region created, region_id: {}", region.region_id);
|
||||
|
||||
// Insert the MitoRegion into the RegionMap.
|
||||
self.regions.insert_region(Arc::new(region));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ use common_base::readable_size::ReadableSize;
|
||||
use store_api::storage::{ColumnId, CompactionStrategy, RegionId};
|
||||
use tokio::sync::oneshot::{self, Receiver, Sender};
|
||||
|
||||
use crate::config::DEFAULT_WRITE_BUFFER_SIZE;
|
||||
use crate::error::Result;
|
||||
use crate::metadata::ColumnMetadata;
|
||||
|
||||
@@ -36,11 +37,23 @@ pub struct RegionOptions {
|
||||
pub compaction_strategy: CompactionStrategy,
|
||||
}
|
||||
|
||||
impl Default for RegionOptions {
|
||||
fn default() -> Self {
|
||||
RegionOptions {
|
||||
write_buffer_size: Some(DEFAULT_WRITE_BUFFER_SIZE),
|
||||
ttl: None,
|
||||
compaction_strategy: CompactionStrategy::LeveledTimeWindow,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Create region request.
|
||||
#[derive(Debug)]
|
||||
pub struct CreateRequest {
|
||||
/// Region to create.
|
||||
pub region_id: RegionId,
|
||||
/// Data directory of the region.
|
||||
pub region_dir: String,
|
||||
/// Columns in this region.
|
||||
pub column_metadatas: Vec<ColumnMetadata>,
|
||||
/// Columns in the primary key.
|
||||
@@ -51,13 +64,6 @@ pub struct CreateRequest {
|
||||
pub options: RegionOptions,
|
||||
}
|
||||
|
||||
impl CreateRequest {
|
||||
/// Validate the request.
|
||||
fn validate(&self) -> Result<()> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
/// Open region request.
|
||||
#[derive(Debug)]
|
||||
pub struct OpenRequest {
|
||||
|
||||
@@ -39,8 +39,7 @@ pub fn normalize_dir(dir: &str) -> String {
|
||||
pub fn join_dir(parent: &str, child: &str) -> String {
|
||||
// Always adds a `/` to the output path.
|
||||
let output = format!("{parent}/{child}/");
|
||||
// We call opendal's normalize_dir which doesn't push `/` to
|
||||
// the end of path.
|
||||
// We call opendal's normalize_root which keeps the trailing `/`.
|
||||
opendal::raw::normalize_root(&output)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user