mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-29 11:20:38 +00:00
refactor: open_region return None if region does not exist (#145)
* refactor: open_region return None if region does not exist * fix some unit tests * fix some CR comments
This commit is contained in:
@@ -43,7 +43,7 @@ impl<S: LogStore> StorageEngine for EngineImpl<S> {
|
||||
_ctx: &EngineContext,
|
||||
name: &str,
|
||||
opts: &OpenOptions,
|
||||
) -> Result<Self::Region> {
|
||||
) -> Result<Option<Self::Region>> {
|
||||
self.inner.open_region(name, opts).await
|
||||
}
|
||||
|
||||
@@ -247,24 +247,25 @@ impl<S: LogStore> EngineInner<S> {
|
||||
None
|
||||
}
|
||||
|
||||
async fn open_region(&self, name: &str, opts: &OpenOptions) -> Result<RegionImpl<S>> {
|
||||
async fn open_region(&self, name: &str, opts: &OpenOptions) -> Result<Option<RegionImpl<S>>> {
|
||||
// We can wait until the state of the slot has been changed to ready, but this will
|
||||
// make the code more complicate, so we just return the error here.
|
||||
if let Some(slot) = self.get_or_occupy_slot(name, RegionSlot::Opening) {
|
||||
return slot.try_get_ready_region();
|
||||
return slot.try_get_ready_region().map(Some);
|
||||
}
|
||||
|
||||
let mut guard = SlotGuard::new(name, &self.regions);
|
||||
|
||||
// FIXME(yingwen): Get region id or remove dependency of region id.
|
||||
let store_config = self.region_store_config(name);
|
||||
let region = RegionImpl::open(name.to_string(), store_config, opts).await?;
|
||||
|
||||
let region = match RegionImpl::open(name.to_string(), store_config, opts).await? {
|
||||
None => return Ok(None),
|
||||
Some(v) => v,
|
||||
};
|
||||
guard.update(RegionSlot::Ready(region.clone()));
|
||||
|
||||
info!("Storage engine open region {:?}", ®ion);
|
||||
|
||||
Ok(region)
|
||||
Ok(Some(region))
|
||||
}
|
||||
|
||||
async fn create_region(&self, descriptor: RegionDescriptor) -> Result<RegionImpl<S>> {
|
||||
|
||||
@@ -7,7 +7,7 @@ use std::sync::Arc;
|
||||
use async_trait::async_trait;
|
||||
use common_telemetry::logging;
|
||||
use datatypes::schema::SchemaRef;
|
||||
use snafu::{ensure, OptionExt};
|
||||
use snafu::ensure;
|
||||
use store_api::logstore::LogStore;
|
||||
use store_api::manifest::{
|
||||
self, action::ProtocolAction, Manifest, ManifestVersion, MetaActionIterator,
|
||||
@@ -146,9 +146,12 @@ impl<S: LogStore> RegionImpl<S> {
|
||||
name: String,
|
||||
store_config: StoreConfig<S>,
|
||||
_opts: &OpenOptions,
|
||||
) -> Result<RegionImpl<S>> {
|
||||
) -> Result<Option<RegionImpl<S>>> {
|
||||
// Load version meta data from manifest.
|
||||
let version = Self::recover_from_manifest(&name, &store_config.manifest).await?;
|
||||
let version = match Self::recover_from_manifest(&store_config.manifest).await? {
|
||||
None => return Ok(None),
|
||||
Some(version) => version,
|
||||
};
|
||||
|
||||
logging::debug!(
|
||||
"Region recovered version from manifest, version: {:?}",
|
||||
@@ -187,13 +190,10 @@ impl<S: LogStore> RegionImpl<S> {
|
||||
manifest: store_config.manifest,
|
||||
});
|
||||
|
||||
Ok(RegionImpl { inner })
|
||||
Ok(Some(RegionImpl { inner }))
|
||||
}
|
||||
|
||||
async fn recover_from_manifest(
|
||||
region_name: &str,
|
||||
manifest: &RegionManifest,
|
||||
) -> Result<Version> {
|
||||
async fn recover_from_manifest(manifest: &RegionManifest) -> Result<Option<Version>> {
|
||||
let (start, end) = Self::manifest_scan_range();
|
||||
let mut iter = manifest.scan(start, end).await?;
|
||||
|
||||
@@ -236,7 +236,7 @@ impl<S: LogStore> RegionImpl<S> {
|
||||
manifest.update_state(last_manifest_version + 1, protocol.clone());
|
||||
}
|
||||
|
||||
version.context(error::VersionNotFoundSnafu { region_name })
|
||||
Ok(version)
|
||||
}
|
||||
|
||||
fn manifest_scan_range() -> (ManifestVersion, ManifestVersion) {
|
||||
|
||||
@@ -184,15 +184,13 @@ async fn test_recover_region_manifets() {
|
||||
);
|
||||
|
||||
let manifest = RegionManifest::new("/manifest/", object_store);
|
||||
let region_name = "region-0";
|
||||
let region_meta = Arc::new(build_region_meta());
|
||||
|
||||
// Recover from empty
|
||||
assert!(
|
||||
RegionImpl::<NoopLogStore>::recover_from_manifest(region_name, &manifest)
|
||||
.await
|
||||
.is_err()
|
||||
);
|
||||
assert!(RegionImpl::<NoopLogStore>::recover_from_manifest(&manifest)
|
||||
.await
|
||||
.unwrap()
|
||||
.is_none());
|
||||
|
||||
{
|
||||
// save some actions into region_meta
|
||||
@@ -215,8 +213,9 @@ async fn test_recover_region_manifets() {
|
||||
}
|
||||
|
||||
// try to recover
|
||||
let version = RegionImpl::<NoopLogStore>::recover_from_manifest(region_name, &manifest)
|
||||
let version = RegionImpl::<NoopLogStore>::recover_from_manifest(&manifest)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(*version.metadata(), region_meta);
|
||||
assert_eq!(version.flushed_sequence(), 2);
|
||||
|
||||
@@ -54,17 +54,21 @@ impl Tester {
|
||||
self.try_reopen().await.unwrap();
|
||||
}
|
||||
|
||||
async fn try_reopen(&mut self) -> Result<()> {
|
||||
async fn try_reopen(&mut self) -> Result<bool> {
|
||||
// Close the old region.
|
||||
self.base = None;
|
||||
// Reopen the region.
|
||||
let store_config = config_util::new_store_config(&self.region_name, &self.store_dir).await;
|
||||
let opts = OpenOptions::default();
|
||||
let region = RegionImpl::open(self.region_name.clone(), store_config, &opts).await?;
|
||||
let base = FileTesterBase::with_region(region);
|
||||
self.base = Some(base);
|
||||
|
||||
Ok(())
|
||||
match region {
|
||||
None => Ok(false),
|
||||
Some(region) => {
|
||||
let base = FileTesterBase::with_region(region);
|
||||
self.base = Some(base);
|
||||
Ok(true)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
@@ -156,6 +160,5 @@ async fn test_open_empty() {
|
||||
let mut tester = Tester::empty(REGION_NAME, store_dir).await;
|
||||
|
||||
let ret = tester.try_reopen().await;
|
||||
// TODO(yingwen): Also check status code.
|
||||
assert!(ret.is_err());
|
||||
assert!(!ret.unwrap());
|
||||
}
|
||||
|
||||
@@ -16,13 +16,13 @@ pub trait StorageEngine: Send + Sync + Clone + 'static {
|
||||
type Error: ErrorExt + Send + Sync;
|
||||
type Region: Region;
|
||||
|
||||
/// Opens an existing region.
|
||||
/// Opens an existing region. Returns `Ok(None)` if region does not exists.
|
||||
async fn open_region(
|
||||
&self,
|
||||
ctx: &EngineContext,
|
||||
name: &str,
|
||||
opts: &OpenOptions,
|
||||
) -> Result<Self::Region, Self::Error>;
|
||||
) -> Result<Option<Self::Region>, Self::Error>;
|
||||
|
||||
/// Closes given region.
|
||||
async fn close_region(
|
||||
@@ -33,7 +33,7 @@ pub trait StorageEngine: Send + Sync + Clone + 'static {
|
||||
|
||||
/// Creates and returns the created region.
|
||||
///
|
||||
/// Returns exsiting region if region with same name already exists. The region will
|
||||
/// Returns existing region if region with same name already exists. The region will
|
||||
/// be opened before returning.
|
||||
async fn create_region(
|
||||
&self,
|
||||
|
||||
@@ -61,7 +61,7 @@ impl<S: StorageEngine> TableEngine for MitoEngine<S> {
|
||||
&self,
|
||||
ctx: &EngineContext,
|
||||
request: OpenTableRequest,
|
||||
) -> TableResult<TableRef> {
|
||||
) -> TableResult<Option<TableRef>> {
|
||||
Ok(self.inner.open_table(ctx, request).await?)
|
||||
}
|
||||
|
||||
@@ -303,11 +303,11 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
&self,
|
||||
_ctx: &EngineContext,
|
||||
request: OpenTableRequest,
|
||||
) -> TableResult<TableRef> {
|
||||
) -> TableResult<Option<TableRef>> {
|
||||
let table_name = &request.table_name;
|
||||
if let Some(table) = self.get_table(table_name) {
|
||||
// Table has already been opened.
|
||||
return Ok(table);
|
||||
return Ok(Some(table));
|
||||
}
|
||||
|
||||
// Acquires the mutex before opening a new table.
|
||||
@@ -315,19 +315,24 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
let _lock = self.table_mutex.lock().await;
|
||||
// Checks again, read lock should be enough since we are guarded by the mutex.
|
||||
if let Some(table) = self.get_table(table_name) {
|
||||
return Ok(table);
|
||||
return Ok(Some(table));
|
||||
}
|
||||
|
||||
let engine_ctx = storage::EngineContext::default();
|
||||
let opts = OpenOptions::default();
|
||||
let region_name = table_name;
|
||||
// Now we just use table name as region name. TODO(yingwen): Naming pattern of region.
|
||||
let region = self
|
||||
|
||||
let region = match self
|
||||
.storage_engine
|
||||
.open_region(&engine_ctx, region_name, &opts)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::OpenRegionSnafu { region_name })?;
|
||||
.context(error::OpenRegionSnafu { region_name })?
|
||||
{
|
||||
None => return Ok(None),
|
||||
Some(region) => region,
|
||||
};
|
||||
|
||||
//FIXME(boyan): recover table meta from table manifest
|
||||
let table_meta = TableMetaBuilder::default()
|
||||
@@ -351,8 +356,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
.write()
|
||||
.unwrap()
|
||||
.insert(table_name.to_string(), table.clone());
|
||||
|
||||
table
|
||||
Some(table as _)
|
||||
};
|
||||
|
||||
logging::info!("Mito engine opened table {}", table_name);
|
||||
@@ -445,6 +449,7 @@ mod tests {
|
||||
let reopened = table_engine
|
||||
.open_table(&ctx, open_req.clone())
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(table.schema(), reopened.schema());
|
||||
|
||||
@@ -456,6 +461,7 @@ mod tests {
|
||||
let reopened = table_engine
|
||||
.open_table(&ctx, open_req.clone())
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(table.schema(), reopened.schema());
|
||||
}
|
||||
|
||||
@@ -127,19 +127,19 @@ impl StorageEngine for MockEngine {
|
||||
_ctx: &EngineContext,
|
||||
name: &str,
|
||||
_opts: &OpenOptions,
|
||||
) -> Result<MockRegion> {
|
||||
) -> Result<Option<MockRegion>> {
|
||||
logging::info!("Mock engine create region, name: {}", name);
|
||||
|
||||
let mut regions = self.regions.lock().unwrap();
|
||||
if let Some(region) = regions.opened_regions.get(name) {
|
||||
return Ok(region.clone());
|
||||
return Ok(Some(region.clone()));
|
||||
}
|
||||
|
||||
if let Some(region) = regions.closed_regions.remove(name) {
|
||||
regions
|
||||
.opened_regions
|
||||
.insert(name.to_string(), region.clone());
|
||||
return Ok(region);
|
||||
return Ok(Some(region));
|
||||
}
|
||||
|
||||
Err(MockError::with_backtrace(StatusCode::Unexpected))
|
||||
|
||||
@@ -16,8 +16,13 @@ pub trait TableEngine: Send + Sync {
|
||||
request: CreateTableRequest,
|
||||
) -> Result<TableRef>;
|
||||
|
||||
/// Open an existing table by given `request`, returns the opened table.
|
||||
async fn open_table(&self, ctx: &EngineContext, request: OpenTableRequest) -> Result<TableRef>;
|
||||
/// Open an existing table by given `request`, returns the opened table. If the table does not
|
||||
/// exist, returns an `Ok(None)`.
|
||||
async fn open_table(
|
||||
&self,
|
||||
ctx: &EngineContext,
|
||||
request: OpenTableRequest,
|
||||
) -> Result<Option<TableRef>>;
|
||||
|
||||
/// Alter table schema, options etc. by given request,
|
||||
///
|
||||
|
||||
Reference in New Issue
Block a user