feat: support region ddl for custom_storage (#2679)

* feat: support region ddl for custom_storage

* fix: typo

* fix: propagate error

* refactor: have manifest_options accept RegionOptions

* chore: improve document
This commit is contained in:
Niwaka
2023-11-06 20:18:47 +09:00
committed by GitHub
parent f387a09535
commit e9f7579091
10 changed files with 293 additions and 41 deletions

View File

@@ -166,3 +166,35 @@ async fn test_engine_create_with_options() {
region.version().options.ttl.unwrap()
);
}
// Verifies that creating a region with the "storage" option places the region's
// files in the named custom object store ("Gcs") and NOT in the default store.
#[tokio::test]
async fn test_engine_create_with_custom_store() {
let mut env = TestEnv::new();
// Engine backed by the default store plus one extra store registered as "Gcs".
let engine = env
.create_engine_with_multiple_object_stores(MitoConfig::default(), None, None, &["Gcs"])
.await;
let region_id = RegionId::new(1, 1);
// Route this region's data to the custom store via the "storage" option.
let request = CreateRequestBuilder::new()
.insert_option("storage", "Gcs")
.build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
assert!(engine.is_region_exists(region_id));
let region = engine.get_region(region_id).unwrap();
let region_dir = region.access_layer.region_dir();
let object_store_manager = env.get_object_store_manager().unwrap();
// The region directory must exist in the custom "Gcs" store...
assert!(object_store_manager
.find("Gcs")
.unwrap()
.is_exist(region_dir)
.await
.unwrap());
// ...and must not have been created in the default object store.
assert!(!object_store_manager
.default_object_store()
.is_exist(region_dir)
.await
.unwrap());
}

View File

@@ -23,6 +23,7 @@ use store_api::storage::RegionId;
use crate::config::MitoConfig;
use crate::engine::listener::DropListener;
use crate::engine::MitoEngine;
use crate::test_util::{
build_rows_for_key, flush_region, put_rows, rows_schema, CreateRequestBuilder, TestEnv,
};
@@ -82,3 +83,84 @@ async fn test_engine_drop_region() {
let object_store = env.get_object_store().unwrap();
assert!(!object_store.is_exist(&region_dir).await.unwrap());
}
// Verifies that dropping a region stored in a custom object store removes only
// that region's directory, leaving a sibling region in the default store intact.
#[tokio::test]
async fn test_engine_drop_region_for_custom_store() {
common_telemetry::init_default_ut_logging();
// Helper: create a region bound to `storage_name`, write two rows, and flush
// so that SST files actually land in the corresponding object store.
async fn setup(engine: &MitoEngine, region_id: RegionId, storage_name: &str) {
let request = CreateRequestBuilder::new()
.insert_option("storage", storage_name)
.region_dir(storage_name)
.build();
let column_schema = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let rows = Rows {
schema: column_schema.clone(),
rows: build_rows_for_key("a", 0, 2, 0),
};
put_rows(engine, region_id, rows).await;
flush_region(engine, region_id, None).await;
}
let mut env = TestEnv::with_prefix("drop");
// The listener lets the test wait for the asynchronous background drop task.
let listener = Arc::new(DropListener::new(Duration::from_millis(100)));
let engine = env
.create_engine_with_multiple_object_stores(
MitoConfig::default(),
None,
Some(listener.clone()),
&["Gcs"],
)
.await;
let object_store_manager = env.get_object_store_manager().unwrap();
// One region in the default store, one in the custom "Gcs" store.
let global_region_id = RegionId::new(1, 1);
setup(&engine, global_region_id, "default").await;
let custom_region_id = RegionId::new(2, 1);
setup(&engine, custom_region_id, "Gcs").await;
let global_region = engine.get_region(global_region_id).unwrap();
let global_region_dir = global_region.access_layer.region_dir().to_string();
let custom_region = engine.get_region(custom_region_id).unwrap();
let custom_region_dir = custom_region.access_layer.region_dir().to_string();
// Both these regions should exist before dropping the custom region.
assert!(object_store_manager
.find("Gcs")
.unwrap()
.is_exist(&custom_region_dir)
.await
.unwrap());
assert!(object_store_manager
.find("default")
.unwrap()
.is_exist(&global_region_dir)
.await
.unwrap());
// Drop the custom region.
engine
.handle_request(custom_region_id, RegionRequest::Drop(RegionDropRequest {}))
.await
.unwrap();
assert!(!engine.is_region_exists(custom_region_id));
// Wait for drop task.
listener.wait().await;
// The custom region's directory is gone from "Gcs"...
assert!(!object_store_manager
.find("Gcs")
.unwrap()
.is_exist(&custom_region_dir)
.await
.unwrap());
// ...while the default-store region's directory is untouched.
assert!(object_store_manager
.find("default")
.unwrap()
.is_exist(&global_region_dir)
.await
.unwrap());
}

View File

@@ -172,3 +172,56 @@ async fn test_engine_region_open_with_options() {
region.version().options.ttl.unwrap()
);
}
// Verifies that re-opening a region with the "storage" option resolves the same
// custom object store ("Gcs") instead of falling back to the default store.
#[tokio::test]
async fn test_engine_region_open_with_custom_store() {
let mut env = TestEnv::new();
let engine = env
.create_engine_with_multiple_object_stores(MitoConfig::default(), None, None, &["Gcs"])
.await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new()
.insert_option("storage", "Gcs")
.build();
// Remember the region dir so the open request can target the same location.
let region_dir = request.region_dir.clone();
// Create a custom region.
engine
.handle_request(region_id, RegionRequest::Create(request.clone()))
.await
.unwrap();
// Close the custom region.
engine
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
.await
.unwrap();
// Open the custom region.
// The "storage" option must be supplied again on open; it is what selects
// the custom object store for the re-opened region.
engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options: HashMap::from([("storage".to_string(), "Gcs".to_string())]),
}),
)
.await
.unwrap();
// The region should not be opened with the default object store.
let region = engine.get_region(region_id).unwrap();
let object_store_manager = env.get_object_store_manager().unwrap();
assert!(!object_store_manager
.default_object_store()
.is_exist(region.access_layer.region_dir())
.await
.unwrap());
// The region's directory is only visible through the custom "Gcs" store.
assert!(object_store_manager
.find("Gcs")
.unwrap()
.is_exist(region.access_layer.region_dir())
.await
.unwrap());
}

View File

@@ -142,6 +142,12 @@ pub enum Error {
location: Location,
},
#[snafu(display("Object store not found: {}", object_store))]
ObjectStoreNotFound {
object_store: String,
location: Location,
},
#[snafu(display("Region {} is corrupted, reason: {}", region_id, reason))]
RegionCorrupted {
region_id: RegionId,
@@ -427,7 +433,8 @@ impl ErrorExt for Error {
| CreateDefault { .. }
| InvalidParquet { .. } => StatusCode::Unexpected,
RegionNotFound { .. } => StatusCode::RegionNotFound,
InvalidScanIndex { .. }
ObjectStoreNotFound { .. }
| InvalidScanIndex { .. }
| InvalidMeta { .. }
| InvalidRequest { .. }
| FillDefault { .. }

View File

@@ -21,8 +21,8 @@ use std::sync::Arc;
use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
use futures::StreamExt;
use object_store::manager::ObjectStoreManagerRef;
use object_store::util::{join_dir, normalize_dir};
use object_store::ObjectStore;
use snafu::{ensure, OptionExt};
use store_api::logstore::LogStore;
use store_api::metadata::{ColumnMetadata, RegionMetadata};
@@ -31,7 +31,7 @@ use store_api::storage::{ColumnId, RegionId};
use crate::access_layer::AccessLayer;
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{EmptyRegionDirSnafu, RegionCorruptedSnafu, Result};
use crate::error::{EmptyRegionDirSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu, Result};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::memtable::MemtableBuilderRef;
use crate::region::options::RegionOptions;
@@ -48,7 +48,7 @@ pub(crate) struct RegionOpener {
region_id: RegionId,
metadata: Option<RegionMetadata>,
memtable_builder: MemtableBuilderRef,
object_store: ObjectStore,
object_store_manager: ObjectStoreManagerRef,
region_dir: String,
scheduler: SchedulerRef,
options: HashMap<String, String>,
@@ -61,14 +61,14 @@ impl RegionOpener {
region_id: RegionId,
region_dir: &str,
memtable_builder: MemtableBuilderRef,
object_store: ObjectStore,
object_store_manager: ObjectStoreManagerRef,
scheduler: SchedulerRef,
) -> RegionOpener {
RegionOpener {
region_id,
metadata: None,
memtable_builder,
object_store,
object_store_manager,
region_dir: normalize_dir(region_dir),
scheduler,
options: HashMap::new(),
@@ -105,7 +105,6 @@ impl RegionOpener {
wal: &Wal<S>,
) -> Result<MitoRegion> {
let region_id = self.region_id;
let options = self.manifest_options(config);
// Tries to open the region.
match self.maybe_open(config, wal).await {
@@ -136,19 +135,22 @@ impl RegionOpener {
);
}
}
let options = RegionOptions::try_from(&self.options)?;
let object_store = self.object_store(&options.storage)?.clone();
let metadata = Arc::new(self.metadata.unwrap());
// Create a manifest manager for this region and writes regions to the manifest file.
let manifest_manager = RegionManifestManager::new(metadata.clone(), options).await?;
let region_manifest_options = self.manifest_options(config, &options)?;
let metadata = Arc::new(self.metadata.unwrap());
let manifest_manager =
RegionManifestManager::new(metadata.clone(), region_manifest_options).await?;
let mutable = self.memtable_builder.build(&metadata);
let options = RegionOptions::try_from(&self.options)?;
let version = VersionBuilder::new(metadata, mutable)
.options(options)
.build();
let version_control = Arc::new(VersionControl::new(version));
let access_layer = Arc::new(AccessLayer::new(self.region_dir, self.object_store.clone()));
let access_layer = Arc::new(AccessLayer::new(self.region_dir, object_store));
Ok(MitoRegion {
region_id,
@@ -203,8 +205,10 @@ impl RegionOpener {
config: &MitoConfig,
wal: &Wal<S>,
) -> Result<Option<MitoRegion>> {
let options = self.manifest_options(config);
let Some(manifest_manager) = RegionManifestManager::open(options).await? else {
let region_options = RegionOptions::try_from(&self.options)?;
let region_manifest_options = self.manifest_options(config, &region_options)?;
let Some(manifest_manager) = RegionManifestManager::open(region_manifest_options).await?
else {
return Ok(None);
};
@@ -212,24 +216,21 @@ impl RegionOpener {
let metadata = manifest.metadata.clone();
let region_id = self.region_id;
let access_layer = Arc::new(AccessLayer::new(
self.region_dir.clone(),
self.object_store.clone(),
));
let object_store = self.object_store(&region_options.storage)?.clone();
let access_layer = Arc::new(AccessLayer::new(self.region_dir.clone(), object_store));
let file_purger = Arc::new(LocalFilePurger::new(
self.scheduler.clone(),
access_layer.clone(),
self.cache_manager.clone(),
));
let mutable = self.memtable_builder.build(&metadata);
let options = RegionOptions::try_from(&self.options)?;
let version = VersionBuilder::new(metadata, mutable)
.add_files(file_purger.clone(), manifest.files.values().cloned())
.flushed_entry_id(manifest.flushed_entry_id)
.flushed_sequence(manifest.flushed_sequence)
.truncated_entry_id(manifest.truncated_entry_id)
.compaction_time_window(manifest.compaction_time_window)
.options(options)
.options(region_options)
.build();
let flushed_entry_id = version.flushed_entry_id;
let version_control = Arc::new(VersionControl::new(version));
@@ -249,12 +250,31 @@ impl RegionOpener {
}
/// Returns a new set of manifest options for this region.
fn manifest_options(&self, config: &MitoConfig) -> RegionManifestOptions {
RegionManifestOptions {
fn manifest_options(
&self,
config: &MitoConfig,
options: &RegionOptions,
) -> Result<RegionManifestOptions> {
let object_store = self.object_store(&options.storage)?.clone();
Ok(RegionManifestOptions {
manifest_dir: new_manifest_dir(&self.region_dir),
object_store: self.object_store.clone(),
object_store,
compress_type: config.manifest_compress_type,
checkpoint_distance: config.manifest_checkpoint_distance,
})
}
/// Returns an object store corresponding to `name`. If `name` is `None`, this method returns the default object store.
fn object_store(&self, name: &Option<String>) -> Result<&object_store::ObjectStore> {
if let Some(name) = name {
Ok(self
.object_store_manager
.find(name)
.context(ObjectStoreNotFoundSnafu {
object_store: name.to_string(),
})?)
} else {
Ok(self.object_store_manager.default_object_store())
}
}
}

View File

@@ -35,6 +35,8 @@ pub struct RegionOptions {
pub ttl: Option<Duration>,
/// Compaction options.
pub compaction: CompactionOptions,
/// Custom storage.
pub storage: Option<String>,
}
impl TryFrom<&HashMap<String, String>> for RegionOptions {
@@ -54,6 +56,7 @@ impl TryFrom<&HashMap<String, String>> for RegionOptions {
Ok(RegionOptions {
ttl: options.ttl,
compaction,
storage: options.storage,
})
}
}
@@ -124,12 +127,16 @@ struct RegionOptionsWithoutEnum {
/// Region SST files TTL.
#[serde(with = "humantime_serde")]
ttl: Option<Duration>,
storage: Option<String>,
}
impl Default for RegionOptionsWithoutEnum {
fn default() -> Self {
let options = RegionOptions::default();
RegionOptionsWithoutEnum { ttl: options.ttl }
RegionOptionsWithoutEnum {
ttl: options.ttl,
storage: options.storage,
}
}
}
@@ -181,6 +188,17 @@ mod tests {
assert_eq!(expect, options);
}
// Checks that the "storage" key in the option map is parsed into
// `RegionOptions::storage`.
#[test]
fn test_with_storage() {
let map = make_map(&[("storage", "S3")]);
let options = RegionOptions::try_from(&map).unwrap();
// NOTE(review): input is "S3" but the expectation is lowercase "s3" —
// presumably `RegionOptions` lowercases storage names to match the
// case-insensitive lookup in `ObjectStoreManager`; confirm against the
// `TryFrom` implementation.
let expect = RegionOptions {
storage: Some("s3".to_string()),
..Default::default()
};
assert_eq!(expect, options);
}
#[test]
fn test_without_compaction_type() {
// If `compaction.type` is not provided, we ignore all compaction
@@ -222,6 +240,7 @@ mod tests {
("compaction.twcs.max_inactive_window_files", "2"),
("compaction.twcs.time_window", "2h"),
("compaction.type", "twcs"),
("storage", "S3"),
]);
let options = RegionOptions::try_from(&map).unwrap();
let expect = RegionOptions {
@@ -231,6 +250,7 @@ mod tests {
max_inactive_window_files: 2,
time_window: Some(Duration::from_secs(3600 * 2)),
}),
storage: Some("s3".to_string()),
};
assert_eq!(expect, options);
}

View File

@@ -124,6 +124,10 @@ impl TestEnv {
self.data_home.path()
}
/// Returns the object store manager of this test env, or `None` if no engine
/// has been created yet. Cloning the `Option<Arc<_>>` is a cheap refcount bump.
pub fn get_object_store_manager(&self) -> Option<Arc<ObjectStoreManager>> {
self.object_store_manager.clone()
}
/// Creates a new engine with specific config under this env.
pub async fn create_engine(&mut self, config: MitoConfig) -> MitoEngine {
let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await;
@@ -151,6 +155,35 @@ impl TestEnv {
MitoEngine::new_for_test(config, logstore, object_store_manager, manager, listener)
}
/// Creates an engine whose object store manager holds the default store plus
/// one extra filesystem-backed store per name in `custom_storage_names`.
/// Each custom store is rooted at `<data_home>/data/<storage_name>`.
pub async fn create_engine_with_multiple_object_stores(
&mut self,
config: MitoConfig,
manager: Option<WriteBufferManagerRef>,
listener: Option<EventListenerRef>,
custom_storage_names: &[&str],
) -> MitoEngine {
let (logstore, mut object_store_manager) = self.create_log_and_object_store_manager().await;
// Register one local-fs object store per requested custom storage name.
for storage_name in custom_storage_names {
let data_path = self
.data_home
.path()
.join("data")
.join(storage_name)
.as_path()
.display()
.to_string();
let mut builder = Fs::default();
builder.root(&data_path);
let object_store = ObjectStore::new(builder).unwrap().finish();
object_store_manager.add(storage_name, object_store);
}
let logstore = Arc::new(logstore);
let object_store_manager = Arc::new(object_store_manager);
// Keep handles on the env so tests can inspect the stores afterwards
// (e.g. via `get_object_store_manager`).
self.logstore = Some(logstore.clone());
self.object_store_manager = Some(object_store_manager.clone());
MitoEngine::new_for_test(config, logstore, object_store_manager, manager, listener)
}
/// Reopen the engine.
pub async fn reopen_engine(&mut self, engine: MitoEngine, config: MitoConfig) -> MitoEngine {
engine.stop().await.unwrap();

View File

@@ -55,13 +55,12 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
builder.primary_key(request.primary_key);
let metadata = builder.build().context(InvalidMetadataSnafu)?;
// Create a MitoRegion from the RegionMetadata.
let region = RegionOpener::new(
region_id,
&request.region_dir,
self.memtable_builder.clone(),
self.object_store_manager.default_object_store().clone(),
self.object_store_manager.clone(),
self.scheduler.clone(),
)
.metadata(metadata)

View File

@@ -19,12 +19,12 @@ use std::sync::Arc;
use common_query::Output;
use common_telemetry::info;
use object_store::util::join_path;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::LogStore;
use store_api::region_request::RegionOpenRequest;
use store_api::storage::RegionId;
use crate::error::{OpenDalSnafu, RegionNotFoundSnafu, Result};
use crate::error::{ObjectStoreNotFoundSnafu, OpenDalSnafu, RegionNotFoundSnafu, Result};
use crate::metrics::REGION_COUNT;
use crate::region::opener::RegionOpener;
use crate::worker::handle_drop::remove_region_dir_once;
@@ -39,21 +39,23 @@ impl<S: LogStore> RegionWorkerLoop<S> {
if self.regions.is_region_exists(region_id) {
return Ok(Output::AffectedRows(0));
}
let object_store = if let Some(storage_name) = request.options.get("storage") {
self.object_store_manager
.find(storage_name)
.context(ObjectStoreNotFoundSnafu {
object_store: storage_name.to_string(),
})?
} else {
self.object_store_manager.default_object_store()
};
// Check if this region is pending drop. And clean the entire dir if so.
if !self.dropping_regions.is_region_exists(region_id)
&& self
.object_store_manager
.default_object_store()
&& object_store
.is_exist(&join_path(&request.region_dir, DROPPING_MARKER_FILE))
.await
.context(OpenDalSnafu)?
{
let result = remove_region_dir_once(
&request.region_dir,
self.object_store_manager.default_object_store(),
)
.await;
let result = remove_region_dir_once(&request.region_dir, object_store).await;
info!("Region {} is dropped, result: {:?}", region_id, result);
return RegionNotFoundSnafu { region_id }.fail();
}
@@ -65,7 +67,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
region_id,
&request.region_dir,
self.memtable_builder.clone(),
self.object_store_manager.default_object_store().clone(),
self.object_store_manager.clone(),
self.scheduler.clone(),
)
.options(request.options)

View File

@@ -21,6 +21,7 @@ pub type ObjectStoreManagerRef = Arc<ObjectStoreManager>;
/// Manages multiple object stores so that users can configure a storage for each table.
/// This struct always has exactly one default object store, and can have zero or more custom object stores.
#[derive(Debug)]
pub struct ObjectStoreManager {
stores: HashMap<String, ObjectStore>,
default_object_store: ObjectStore,
@@ -30,19 +31,19 @@ impl ObjectStoreManager {
/// Creates a new manager from the object store used as a default one.
pub fn new(name: &str, object_store: ObjectStore) -> Self {
ObjectStoreManager {
stores: [(name.to_string(), object_store.clone())].into(),
stores: [(name.to_lowercase(), object_store.clone())].into(),
default_object_store: object_store,
}
}
/// Adds an object store to the manager.
pub fn add(&mut self, name: &str, object_store: ObjectStore) {
self.stores.insert(name.to_string(), object_store);
self.stores.insert(name.to_lowercase(), object_store);
}
/// Finds an object store corresponding to the name.
pub fn find(&self, name: &str) -> Option<&ObjectStore> {
self.stores.get(name)
self.stores.get(&name.to_lowercase())
}
pub fn default_object_store(&self) -> &ObjectStore {
@@ -68,10 +69,12 @@ mod tests {
#[test]
fn test_manager_behavior() {
let dir = create_temp_dir("default");
let mut manager = ObjectStoreManager::new("default", new_object_store(&dir));
let mut manager = ObjectStoreManager::new("Default", new_object_store(&dir));
assert!(manager.find("default").is_some());
assert!(manager.find("Default").is_some());
assert!(manager.find("Gcs").is_none());
assert!(manager.find("gcs").is_none());
let dir = create_temp_dir("default");
manager.add("Gcs", new_object_store(&dir));
@@ -79,5 +82,6 @@ mod tests {
// Should not overwrite the default object store with the new one.
assert!(manager.find("default").is_some());
assert!(manager.find("Gcs").is_some());
assert!(manager.find("gcs").is_some());
}
}