From 80372720bbe413f532c15e9d1c5bfd5bd221e227 Mon Sep 17 00:00:00 2001 From: "Lei, Huang" <6406592+v0y4g3r@users.noreply.github.com> Date: Mon, 8 Aug 2022 16:53:52 +0800 Subject: [PATCH] refactor: open_region return None if region does not exist (#145) * refactor: open_region return None if region does not exist * fix some unit tests * fix some CR comments --- src/storage/src/engine.rs | 15 +++++++------ src/storage/src/region.rs | 18 +++++++-------- src/storage/src/region/tests.rs | 13 +++++------ src/storage/src/region/tests/basic.rs | 17 ++++++++------ src/store-api/src/storage/engine.rs | 6 ++--- src/table-engine/src/engine.rs | 22 ++++++++++++------- .../src/table/test_util/mock_engine.rs | 6 ++--- src/table/src/engine.rs | 9 ++++++-- 8 files changed, 60 insertions(+), 46 deletions(-) diff --git a/src/storage/src/engine.rs b/src/storage/src/engine.rs index fa2552f954..0f774b839c 100644 --- a/src/storage/src/engine.rs +++ b/src/storage/src/engine.rs @@ -43,7 +43,7 @@ impl StorageEngine for EngineImpl { _ctx: &EngineContext, name: &str, opts: &OpenOptions, - ) -> Result { + ) -> Result> { self.inner.open_region(name, opts).await } @@ -247,24 +247,25 @@ impl EngineInner { None } - async fn open_region(&self, name: &str, opts: &OpenOptions) -> Result> { + async fn open_region(&self, name: &str, opts: &OpenOptions) -> Result>> { // We can wait until the state of the slot has been changed to ready, but this will // make the code more complicate, so we just return the error here. if let Some(slot) = self.get_or_occupy_slot(name, RegionSlot::Opening) { - return slot.try_get_ready_region(); + return slot.try_get_ready_region().map(Some); } let mut guard = SlotGuard::new(name, &self.regions); // FIXME(yingwen): Get region id or remove dependency of region id. let store_config = self.region_store_config(name); - let region = RegionImpl::open(name.to_string(), store_config, opts).await?; + let region = match RegionImpl::open(name.to_string(), store_config, opts).await? { + None => return Ok(None), + Some(v) => v, + }; guard.update(RegionSlot::Ready(region.clone())); - info!("Storage engine open region {:?}", ®ion); - - Ok(region) + Ok(Some(region)) } async fn create_region(&self, descriptor: RegionDescriptor) -> Result> { diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index eaee5d6970..09a30c2bda 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use async_trait::async_trait; use common_telemetry::logging; use datatypes::schema::SchemaRef; -use snafu::{ensure, OptionExt}; +use snafu::ensure; use store_api::logstore::LogStore; use store_api::manifest::{ self, action::ProtocolAction, Manifest, ManifestVersion, MetaActionIterator, @@ -146,9 +146,12 @@ impl RegionImpl { name: String, store_config: StoreConfig, _opts: &OpenOptions, - ) -> Result> { + ) -> Result>> { // Load version meta data from manifest. - let version = Self::recover_from_manifest(&name, &store_config.manifest).await?; + let version = match Self::recover_from_manifest(&store_config.manifest).await? { + None => return Ok(None), + Some(version) => version, + }; logging::debug!( "Region recovered version from manifest, version: {:?}", @@ -187,13 +190,10 @@ impl RegionImpl { manifest: store_config.manifest, }); - Ok(RegionImpl { inner }) + Ok(Some(RegionImpl { inner })) } - async fn recover_from_manifest( - region_name: &str, - manifest: &RegionManifest, - ) -> Result { + async fn recover_from_manifest(manifest: &RegionManifest) -> Result> { let (start, end) = Self::manifest_scan_range(); let mut iter = manifest.scan(start, end).await?; @@ -236,7 +236,7 @@ impl RegionImpl { manifest.update_state(last_manifest_version + 1, protocol.clone()); } - version.context(error::VersionNotFoundSnafu { region_name }) + Ok(version) } fn manifest_scan_range() -> (ManifestVersion, ManifestVersion) { diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs index ce1fbf0946..7b7f7ce969 100644 --- a/src/storage/src/region/tests.rs +++ b/src/storage/src/region/tests.rs @@ -184,15 +184,13 @@ async fn test_recover_region_manifets() { ); let manifest = RegionManifest::new("/manifest/", object_store); - let region_name = "region-0"; let region_meta = Arc::new(build_region_meta()); // Recover from empty - assert!( - RegionImpl::::recover_from_manifest(region_name, &manifest) - .await - .is_err() - ); + assert!(RegionImpl::::recover_from_manifest(&manifest) + .await + .unwrap() + .is_none()); { // save some actions into region_meta @@ -215,8 +213,9 @@ async fn test_recover_region_manifets() { } // try to recover - let version = RegionImpl::::recover_from_manifest(region_name, &manifest) + let version = RegionImpl::::recover_from_manifest(&manifest) .await + .unwrap() .unwrap(); assert_eq!(*version.metadata(), region_meta); assert_eq!(version.flushed_sequence(), 2); diff --git a/src/storage/src/region/tests/basic.rs b/src/storage/src/region/tests/basic.rs index 29a7d37772..c74e99b16d 100644 --- a/src/storage/src/region/tests/basic.rs +++ b/src/storage/src/region/tests/basic.rs @@ -54,17 +54,21 @@ impl Tester { self.try_reopen().await.unwrap(); } - async fn try_reopen(&mut self) -> Result<()> { + async fn try_reopen(&mut self) -> Result { // Close the old region. self.base = None; // Reopen the region. let store_config = config_util::new_store_config(&self.region_name, &self.store_dir).await; let opts = OpenOptions::default(); let region = RegionImpl::open(self.region_name.clone(), store_config, &opts).await?; - let base = FileTesterBase::with_region(region); - self.base = Some(base); - - Ok(()) + match region { + None => Ok(false), + Some(region) => { + let base = FileTesterBase::with_region(region); + self.base = Some(base); + Ok(true) + } + } } #[inline] @@ -156,6 +160,5 @@ async fn test_open_empty() { let mut tester = Tester::empty(REGION_NAME, store_dir).await; let ret = tester.try_reopen().await; - // TODO(yingwen): Also check status code. - assert!(ret.is_err()); + assert!(!ret.unwrap()); } diff --git a/src/store-api/src/storage/engine.rs b/src/store-api/src/storage/engine.rs index db97d5d2e1..2f6c6f9950 100644 --- a/src/store-api/src/storage/engine.rs +++ b/src/store-api/src/storage/engine.rs @@ -16,13 +16,13 @@ pub trait StorageEngine: Send + Sync + Clone + 'static { type Error: ErrorExt + Send + Sync; type Region: Region; - /// Opens an existing region. + /// Opens an existing region. Returns `Ok(None)` if region does not exists. async fn open_region( &self, ctx: &EngineContext, name: &str, opts: &OpenOptions, - ) -> Result; + ) -> Result, Self::Error>; /// Closes given region. async fn close_region( @@ -33,7 +33,7 @@ pub trait StorageEngine: Send + Sync + Clone + 'static { /// Creates and returns the created region. /// - /// Returns exsiting region if region with same name already exists. The region will + /// Returns existing region if region with same name already exists. The region will /// be opened before returning. async fn create_region( &self, diff --git a/src/table-engine/src/engine.rs b/src/table-engine/src/engine.rs index 2cb16706df..53295a7e45 100644 --- a/src/table-engine/src/engine.rs +++ b/src/table-engine/src/engine.rs @@ -61,7 +61,7 @@ impl TableEngine for MitoEngine { &self, ctx: &EngineContext, request: OpenTableRequest, - ) -> TableResult { + ) -> TableResult> { Ok(self.inner.open_table(ctx, request).await?) } @@ -303,11 +303,11 @@ impl MitoEngineInner { &self, _ctx: &EngineContext, request: OpenTableRequest, - ) -> TableResult { + ) -> TableResult> { let table_name = &request.table_name; if let Some(table) = self.get_table(table_name) { // Table has already been opened. - return Ok(table); + return Ok(Some(table)); } // Acquires the mutex before opening a new table. @@ -315,19 +315,24 @@ impl MitoEngineInner { let _lock = self.table_mutex.lock().await; // Checks again, read lock should be enough since we are guarded by the mutex. if let Some(table) = self.get_table(table_name) { - return Ok(table); + return Ok(Some(table)); } let engine_ctx = storage::EngineContext::default(); let opts = OpenOptions::default(); let region_name = table_name; // Now we just use table name as region name. TODO(yingwen): Naming pattern of region. - let region = self + + let region = match self .storage_engine .open_region(&engine_ctx, region_name, &opts) .await .map_err(BoxedError::new) - .context(error::OpenRegionSnafu { region_name })?; + .context(error::OpenRegionSnafu { region_name })? + { + None => return Ok(None), + Some(region) => region, + }; //FIXME(boyan): recover table meta from table manifest let table_meta = TableMetaBuilder::default() @@ -351,8 +356,7 @@ impl MitoEngineInner { .write() .unwrap() .insert(table_name.to_string(), table.clone()); - - table + Some(table as _) }; logging::info!("Mito engine opened table {}", table_name); @@ -445,6 +449,7 @@ mod tests { let reopened = table_engine .open_table(&ctx, open_req.clone()) .await + .unwrap() .unwrap(); assert_eq!(table.schema(), reopened.schema()); @@ -456,6 +461,7 @@ mod tests { let reopened = table_engine .open_table(&ctx, open_req.clone()) .await + .unwrap() .unwrap(); assert_eq!(table.schema(), reopened.schema()); } diff --git a/src/table-engine/src/table/test_util/mock_engine.rs b/src/table-engine/src/table/test_util/mock_engine.rs index c75222301b..a446e7be33 100644 --- a/src/table-engine/src/table/test_util/mock_engine.rs +++ b/src/table-engine/src/table/test_util/mock_engine.rs @@ -127,19 +127,19 @@ impl StorageEngine for MockEngine { _ctx: &EngineContext, name: &str, _opts: &OpenOptions, - ) -> Result { + ) -> Result> { logging::info!("Mock engine create region, name: {}", name); let mut regions = self.regions.lock().unwrap(); if let Some(region) = regions.opened_regions.get(name) { - return Ok(region.clone()); + return Ok(Some(region.clone())); } if let Some(region) = regions.closed_regions.remove(name) { regions .opened_regions .insert(name.to_string(), region.clone()); - return Ok(region); + return Ok(Some(region)); } Err(MockError::with_backtrace(StatusCode::Unexpected)) diff --git a/src/table/src/engine.rs b/src/table/src/engine.rs index aad991ed80..0f12bc08cb 100644 --- a/src/table/src/engine.rs +++ b/src/table/src/engine.rs @@ -16,8 +16,13 @@ pub trait TableEngine: Send + Sync { request: CreateTableRequest, ) -> Result; - /// Open an existing table by given `request`, returns the opened table. - async fn open_table(&self, ctx: &EngineContext, request: OpenTableRequest) -> Result; + /// Open an existing table by given `request`, returns the opened table. If the table does not + /// exist, returns an `Ok(None)`. + async fn open_table( + &self, + ctx: &EngineContext, + request: OpenTableRequest, + ) -> Result>; /// Alter table schema, options etc. by given request, ///