diff --git a/src/log-store/src/fs/namespace.rs b/src/log-store/src/fs/namespace.rs index 536f31260d..ac29167413 100644 --- a/src/log-store/src/fs/namespace.rs +++ b/src/log-store/src/fs/namespace.rs @@ -9,21 +9,19 @@ pub struct LocalNamespace { impl Default for LocalNamespace { fn default() -> Self { - LocalNamespace::new("", 0) + LocalNamespace::new("") } } #[derive(Debug)] struct LocalNamespaceInner { name: String, - id: u64, } impl Namespace for LocalNamespace { - fn new(name: &str, id: u64) -> Self { + fn new(name: &str) -> Self { let inner = Arc::new(LocalNamespaceInner { name: name.to_string(), - id, }); Self { inner } } @@ -32,10 +30,3 @@ impl Namespace for LocalNamespace { self.inner.name.as_str() } } - -#[allow(dead_code)] -impl LocalNamespace { - fn id(&self) -> u64 { - self.inner.id - } -} diff --git a/src/storage/src/engine.rs b/src/storage/src/engine.rs index c85922b0fd..158031c7b3 100644 --- a/src/storage/src/engine.rs +++ b/src/storage/src/engine.rs @@ -9,7 +9,7 @@ use store_api::manifest::action::ProtocolAction; use store_api::{ logstore::LogStore, manifest::Manifest, - storage::{EngineContext, RegionDescriptor, StorageEngine}, + storage::{EngineContext, OpenOptions, RegionDescriptor, StorageEngine}, }; use crate::background::JobPoolImpl; @@ -41,8 +41,13 @@ impl StorageEngine for EngineImpl { type Error = Error; type Region = RegionImpl; - async fn open_region(&self, _ctx: &EngineContext, _name: &str) -> Result { - unimplemented!() + async fn open_region( + &self, + _ctx: &EngineContext, + name: &str, + opts: &OpenOptions, + ) -> Result { + self.inner.open_region(name, opts).await } async fn close_region(&self, _ctx: &EngineContext, _region: Self::Region) -> Result<()> { @@ -74,34 +79,19 @@ impl EngineImpl { } } -/// Engine share data -/// TODO(dennis): merge to EngineInner? -#[derive(Clone, Debug)] -struct SharedData { - pub _config: EngineConfig, - pub object_store: ObjectStore, -} +async fn new_object_store(store_config: &ObjectStoreConfig) -> Result { + // TODO(dennis): supports other backend + let store_dir = util::normalize_dir(match store_config { + ObjectStoreConfig::File(file) => &file.store_dir, + }); -impl SharedData { - async fn new(config: EngineConfig) -> Result { - // TODO(dennis): supports other backend - let store_dir = util::normalize_dir(match &config.store_config { - ObjectStoreConfig::File(file) => &file.store_dir, - }); + let accessor = Backend::build() + .root(&store_dir) + .finish() + .await + .context(error::InitBackendSnafu { dir: &store_dir })?; - let accessor = Backend::build() - .root(&store_dir) - .finish() - .await - .context(error::InitBackendSnafu { dir: &store_dir })?; - - let object_store = ObjectStore::new(accessor); - - Ok(Self { - _config: config, - object_store, - }) - } + Ok(ObjectStore::new(accessor)) } #[inline] @@ -114,12 +104,108 @@ pub fn region_manifest_dir(region_name: &str) -> String { format!("{}/manifest/", region_name) } -type RegionMap = HashMap>; +/// A slot for region in the engine. +/// +/// Also used as a placeholder in the region map when the region isn't ready, e.g. during +/// creating/opening. +#[derive(Debug)] +enum RegionSlot { + /// The region is during creation. + Creating, + /// The region is during opening. + Opening, + /// The region is ready for access. + Ready(RegionImpl), + // TODO(yingwen): Closing state. +} + +impl RegionSlot { + /// Try to get a ready region. + fn try_get_ready_region(&self) -> Result> { + if let RegionSlot::Ready(region) = self { + Ok(region.clone()) + } else { + error::InvalidRegionStateSnafu { + state: self.state_name(), + } + .fail() + } + } + + /// Returns the ready region or `None`. + fn get_ready_region(&self) -> Option> { + if let RegionSlot::Ready(region) = self { + Some(region.clone()) + } else { + None + } + } + + fn state_name(&self) -> &'static str { + match self { + RegionSlot::Creating => "creating", + RegionSlot::Opening => "opening", + RegionSlot::Ready(_) => "ready", + } + } +} + +impl Clone for RegionSlot { + // Manually implement Clone due to [rust#26925](https://github.com/rust-lang/rust/issues/26925). + // Maybe we should require `LogStore` to be clonable to work around this. + fn clone(&self) -> RegionSlot { + match self { + RegionSlot::Creating => RegionSlot::Creating, + RegionSlot::Opening => RegionSlot::Opening, + RegionSlot::Ready(region) => RegionSlot::Ready(region.clone()), + } + } +} + +/// Used to update slot or clean the slot on failure. +struct SlotGuard<'a, S: LogStore> { + name: &'a str, + regions: &'a RwLock>, + skip_clean: bool, +} + +impl<'a, S: LogStore> SlotGuard<'a, S> { + fn new(name: &'a str, regions: &'a RwLock>) -> SlotGuard<'a, S> { + SlotGuard { + name, + regions, + skip_clean: false, + } + } + + /// Update the slot and skip cleaning on drop. + fn update(&mut self, slot: RegionSlot) { + { + let mut regions = self.regions.write().unwrap(); + if let Some(old) = regions.get_mut(self.name) { + *old = slot; + } + } + + self.skip_clean = true; + } +} + +impl<'a, S: LogStore> Drop for SlotGuard<'a, S> { + fn drop(&mut self) { + if !self.skip_clean { + let mut regions = self.regions.write().unwrap(); + regions.remove(self.name); + } + } +} + +type RegionMap = HashMap>; struct EngineInner { + object_store: ObjectStore, log_store: Arc, regions: RwLock>, - shared: SharedData, memtable_builder: MemtableBuilderRef, flush_scheduler: FlushSchedulerRef, flush_strategy: FlushStrategyRef, @@ -129,50 +215,79 @@ impl EngineInner { pub async fn new(config: EngineConfig, log_store: Arc) -> Result { let job_pool = Arc::new(JobPoolImpl {}); let flush_scheduler = Arc::new(FlushSchedulerImpl::new(job_pool)); + let object_store = new_object_store(&config.store_config).await?; Ok(Self { + object_store, log_store, regions: RwLock::new(Default::default()), - shared: SharedData::new(config).await?, memtable_builder: Arc::new(DefaultMemtableBuilder {}), flush_scheduler, flush_strategy: Arc::new(SizeBasedStrategy::default()), }) } - async fn create_region(&self, descriptor: RegionDescriptor) -> Result> { + /// Returns the `Some(slot)` if there is existing slot with given `name`, or insert + /// given `slot` and returns `None`. + fn get_or_occupy_slot(&self, name: &str, slot: RegionSlot) -> Option> { { + // Try to get the region under read lock. let regions = self.regions.read().unwrap(); - if let Some(region) = regions.get(&descriptor.name) { - return Ok(region.clone()); + if let Some(slot) = regions.get(name) { + return Some(slot.clone()); } } + // Get the region under write lock. + let mut regions = self.regions.write().unwrap(); + if let Some(slot) = regions.get(name) { + return Some(slot.clone()); + } + + // No slot in map, we can insert the slot now. + regions.insert(name.to_string(), slot); + + None + } + + async fn open_region(&self, name: &str, opts: &OpenOptions) -> Result> { + // We can wait until the state of the slot has been changed to ready, but this will + // make the code more complicate, so we just return the error here. + if let Some(slot) = self.get_or_occupy_slot(name, RegionSlot::Opening) { + return slot.try_get_ready_region(); + } + + let mut guard = SlotGuard::new(name, &self.regions); + + // FIXME(yingwen): Get region id or remove dependency of region id. + let store_config = self.region_store_config(name); + let region = RegionImpl::open(name.to_string(), store_config, opts).await?; + + guard.update(RegionSlot::Ready(region.clone())); + + info!("Storage engine open region {:?}", ®ion); + + Ok(region) + } + + async fn create_region(&self, descriptor: RegionDescriptor) -> Result> { + if let Some(slot) = self.get_or_occupy_slot(&descriptor.name, RegionSlot::Creating) { + return slot.try_get_ready_region(); + } + + // Now the region in under `Creating` state. let region_id = descriptor.id; let region_name = descriptor.name.clone(); + let mut guard = SlotGuard::new(®ion_name, &self.regions); + let metadata: RegionMetadata = descriptor .try_into() .context(error::InvalidRegionDescSnafu { region: ®ion_name, })?; - let sst_dir = ®ion_sst_dir(®ion_name); - let sst_layer = Arc::new(FsAccessLayer::new( - sst_dir, - self.shared.object_store.clone(), - )); - let manifest_dir = region_manifest_dir(®ion_name); - let manifest = - RegionManifest::new(region_id, &manifest_dir, self.shared.object_store.clone()); - - let store_config = StoreConfig { - log_store: self.log_store.clone(), - sst_layer, - manifest: manifest.clone(), - memtable_builder: self.memtable_builder.clone(), - flush_scheduler: self.flush_scheduler.clone(), - flush_strategy: self.flush_strategy.clone(), - }; + let store_config = self.region_store_config(®ion_name); + let manifest = store_config.manifest.clone(); let region = RegionImpl::new( region_id, @@ -180,6 +295,7 @@ impl EngineInner { metadata.clone(), store_config, ); + // Persist region metadata manifest .update(RegionMetaActionList::new(vec![ @@ -190,14 +306,7 @@ impl EngineInner { ])) .await?; - { - let mut regions = self.regions.write().unwrap(); - if let Some(region) = regions.get(®ion_name) { - return Ok(region.clone()); - } - - regions.insert(region_name.clone(), region.clone()); - } + guard.update(RegionSlot::Ready(region.clone())); info!("Storage engine create region {:?}", ®ion); @@ -205,7 +314,24 @@ impl EngineInner { } fn get_region(&self, name: &str) -> Option> { - self.regions.read().unwrap().get(name).cloned() + let slot = self.regions.read().unwrap().get(name).cloned()?; + slot.get_ready_region() + } + + fn region_store_config(&self, region_name: &str) -> StoreConfig { + let sst_dir = ®ion_sst_dir(region_name); + let sst_layer = Arc::new(FsAccessLayer::new(sst_dir, self.object_store.clone())); + let manifest_dir = region_manifest_dir(region_name); + let manifest = RegionManifest::new(&manifest_dir, self.object_store.clone()); + + StoreConfig { + log_store: self.log_store.clone(), + sst_layer, + manifest, + memtable_builder: self.memtable_builder.clone(), + flush_scheduler: self.flush_scheduler.clone(), + flush_strategy: self.flush_strategy.clone(), + } } } diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index 8ea8373ad8..ae02e4119f 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -108,14 +108,8 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display( - "Failed to write WAL, region id: {}, WAL name: {}, source: {}", - region_id, - name, - source - ))] + #[snafu(display("Failed to write WAL, WAL name: {}, source: {}", name, source))] WriteWal { - region_id: u32, name: String, #[snafu(backtrace)] source: BoxedError, @@ -196,6 +190,19 @@ pub enum Error { #[snafu(backtrace)] source: datatypes::error::Error, }, + + #[snafu(display("Region is under {} state, cannot proceed operation", state))] + InvalidRegionState { + state: &'static str, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to read WAL, name: {}, source: {}", name, source))] + ReadWal { + name: String, + #[snafu(backtrace)] + source: BoxedError, + }, } pub type Result = std::result::Result; @@ -235,7 +242,9 @@ impl ErrorExt for Error { | ManifestProtocolForbidRead { .. } | ManifestProtocolForbidWrite { .. } | ReadParquet { .. } - | ReadParquetIo { .. } => StatusCode::StorageUnavailable, + | ReadParquetIo { .. } + | InvalidRegionState { .. } + | ReadWal { .. } => StatusCode::StorageUnavailable, } } diff --git a/src/storage/src/manifest/action.rs b/src/storage/src/manifest/action.rs index 0d45e72fb0..e278bd1b10 100644 --- a/src/storage/src/manifest/action.rs +++ b/src/storage/src/manifest/action.rs @@ -41,7 +41,8 @@ pub struct RegionEdit { #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct RegionManifestData { pub region_meta: RegionMetadataRef, - // TODO(dennis): version metadata + // TODO(dennis): [open_region] 1. load version metadata 2. The `region_meta` field could be removed if we + // have a `version` field. } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] diff --git a/src/storage/src/manifest/region.rs b/src/storage/src/manifest/region.rs index 9cd3f97371..aeffbccb36 100644 --- a/src/storage/src/manifest/region.rs +++ b/src/storage/src/manifest/region.rs @@ -11,7 +11,6 @@ use object_store::ObjectStore; use snafu::ensure; use store_api::manifest::action::{self, ProtocolAction, ProtocolVersion}; use store_api::manifest::*; -use store_api::storage::RegionId; use crate::error::{Error, ManifestProtocolForbidWriteSnafu, Result}; use crate::manifest::action::*; @@ -27,12 +26,11 @@ pub struct RegionManifest { impl Manifest for RegionManifest { type Error = Error; type MetaAction = RegionMetaActionList; - type MetadataId = RegionId; type Metadata = RegionManifestData; - fn new(id: Self::MetadataId, manifest_dir: &str, object_store: ObjectStore) -> Self { + fn new(manifest_dir: &str, object_store: ObjectStore) -> Self { RegionManifest { - inner: Arc::new(RegionManifestInner::new(id, manifest_dir, object_store)), + inner: Arc::new(RegionManifestInner::new(manifest_dir, object_store)), } } @@ -52,6 +50,8 @@ impl Manifest for RegionManifest { let mut iter = self.inner.scan(start_bound, MAX_VERSION).await?; + // TODO(yingwen): [open_region] 1. Create Version from metadata 2. Load VersionEdits + // and apply to the Version by `Version::apply_edit`. while let Some((_v, action_list)) = iter.next_action().await? { for action in action_list.actions { if let RegionMetaAction::Change(c) = action { @@ -68,15 +68,10 @@ impl Manifest for RegionManifest { async fn checkpoint(&self) -> Result { unimplemented!(); } - - fn metadata_id(&self) -> RegionId { - self.inner.region_id - } } #[derive(Debug)] struct RegionManifestInner { - region_id: RegionId, store: Arc, version: AtomicU64, /// Current using protocol @@ -106,11 +101,10 @@ impl RegionMetaActionListIterator { } impl RegionManifestInner { - fn new(region_id: RegionId, manifest_dir: &str, object_store: ObjectStore) -> Self { + fn new(manifest_dir: &str, object_store: ObjectStore) -> Self { let (reader_version, writer_version) = action::supported_protocol_version(); Self { - region_id, store: Arc::new(ManifestObjectStore::new(manifest_dir, object_store)), // TODO(dennis): recover the last version from history version: AtomicU64::new(0), @@ -187,14 +181,12 @@ mod tests { .await .unwrap(), ); - let region_id = 0; - let manifest = RegionManifest::new(region_id, "/manifest/", object_store); - assert_eq!(region_id, manifest.metadata_id()); + let manifest = RegionManifest::new("/manifest/", object_store); let region_name = "region-0"; let desc = RegionDescBuilder::new(region_name) - .id(region_id) + .id(0) .push_key_column(("k1", LogicalTypeId::Int32, false)) .push_value_column(("v1", LogicalTypeId::Float32, true)) .build(); @@ -218,7 +210,7 @@ mod tests { // save another metadata let region_name = "region-0"; let desc = RegionDescBuilder::new(region_name) - .id(region_id) + .id(0) .push_key_column(("k1", LogicalTypeId::Int32, false)) .push_key_column(("k2", LogicalTypeId::Int64, false)) .push_value_column(("v1", LogicalTypeId::Float32, true)) diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index 7ebf02e881..dd4337c491 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -7,17 +7,20 @@ use std::sync::Arc; use async_trait::async_trait; use snafu::ensure; use store_api::logstore::LogStore; -use store_api::storage::{ReadContext, Region, RegionId, RegionMeta, WriteContext, WriteResponse}; +use store_api::manifest::Manifest; +use store_api::storage::{ + OpenOptions, ReadContext, Region, RegionId, RegionMeta, WriteContext, WriteResponse, +}; use crate::error::{self, Error, Result}; use crate::flush::{FlushSchedulerRef, FlushStrategyRef}; use crate::manifest::region::RegionManifest; -use crate::memtable::{MemtableBuilderRef, MemtableVersion}; +use crate::memtable::MemtableBuilderRef; use crate::metadata::{RegionMetaImpl, RegionMetadata}; pub use crate::region::writer::{RegionWriter, RegionWriterRef, WriterContext}; use crate::snapshot::SnapshotImpl; use crate::sst::AccessLayerRef; -use crate::version::{VersionControl, VersionControlRef}; +use crate::version::{Version, VersionControl, VersionControlRef}; use crate::wal::Wal; use crate::write_batch::WriteBatch; @@ -73,15 +76,15 @@ pub struct StoreConfig { } impl RegionImpl { + /// Create a new region without any data. pub fn new( id: RegionId, name: String, metadata: RegionMetadata, store_config: StoreConfig, ) -> RegionImpl { - let memtable_version = MemtableVersion::new(); - let version_control = VersionControl::new(metadata, memtable_version); - let wal = Wal::new(id, name.clone(), store_config.log_store); + let version_control = VersionControl::new(metadata); + let wal = Wal::new(name.clone(), store_config.log_store); let inner = Arc::new(RegionInner { shared: Arc::new(SharedData { @@ -99,6 +102,45 @@ impl RegionImpl { RegionImpl { inner } } + + /// Open an exsiting region and recover its data. + pub async fn open( + name: String, + store_config: StoreConfig, + opts: &OpenOptions, + ) -> Result> { + // Load version meta data from manifest. + let version = Self::recover_from_manifest(&store_config.manifest).await?; + let metadata = version.metadata().clone(); + let version_control = Arc::new(VersionControl::with_version(version)); + let wal = Wal::new(name.clone(), store_config.log_store); + let shared = Arc::new(SharedData { + id: metadata.id, + name, + version_control, + }); + + let writer = Arc::new(RegionWriter::new(store_config.memtable_builder)); + let writer_ctx = WriterContext { + shared: &shared, + flush_strategy: &store_config.flush_strategy, + flush_scheduler: &store_config.flush_scheduler, + sst_layer: &store_config.sst_layer, + wal: &wal, + writer: &writer, + manifest: &store_config.manifest, + }; + writer.replay(writer_ctx, opts).await?; + + unimplemented!() + } + + async fn recover_from_manifest(manifest: &RegionManifest) -> Result { + let _metadata = manifest.load().await?; + + // TODO(yingwen): [open_region] Get version from metadata. + unimplemented!() + } } // Private methods for tests. diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs index a072176a89..6521cf8888 100644 --- a/src/storage/src/region/tests.rs +++ b/src/storage/src/region/tests.rs @@ -12,7 +12,6 @@ use crate::test_util::{self, config_util, descriptor_util::RegionDescBuilder, sc #[tokio::test] async fn test_new_region() { - let region_id = 0; let region_name = "region-0"; let desc = RegionDescBuilder::new(region_name) .enable_version_column(true) @@ -27,9 +26,9 @@ async fn test_new_region() { .to_string_lossy() .to_string(); - let store_config = config_util::new_store_config(&store_dir, region_id, region_name).await; + let store_config = config_util::new_store_config(&store_dir, region_name).await; - let region = RegionImpl::new(region_id, region_name.to_string(), metadata, store_config); + let region = RegionImpl::new(0, region_name.to_string(), metadata, store_config); let expect_schema = schema_util::new_schema_ref( &[ diff --git a/src/storage/src/region/tests/flush.rs b/src/storage/src/region/tests/flush.rs index 7c0e76ee04..623304ecdf 100644 --- a/src/storage/src/region/tests/flush.rs +++ b/src/storage/src/region/tests/flush.rs @@ -19,14 +19,12 @@ async fn new_region_for_flush( enable_version_column: bool, flush_strategy: FlushStrategyRef, ) -> RegionImpl { - let region_id = 0; - let metadata = read_write::new_metadata(REGION_NAME, enable_version_column); - let mut store_config = config_util::new_store_config(store_dir, region_id, REGION_NAME).await; + let mut store_config = config_util::new_store_config(store_dir, REGION_NAME).await; store_config.flush_strategy = flush_strategy; - RegionImpl::new(region_id, REGION_NAME.to_string(), metadata, store_config) + RegionImpl::new(0, REGION_NAME.to_string(), metadata, store_config) } struct FlushTester { diff --git a/src/storage/src/region/tests/read_write.rs b/src/storage/src/region/tests/read_write.rs index 6af5c26b55..8a0397b918 100644 --- a/src/storage/src/region/tests/read_write.rs +++ b/src/storage/src/region/tests/read_write.rs @@ -29,14 +29,13 @@ async fn new_region_for_rw( store_dir: &str, enable_version_column: bool, ) -> RegionImpl { - let region_id = 0; let region_name = "region-rw-0"; let metadata = new_metadata(region_name, enable_version_column); - let store_config = config_util::new_store_config(store_dir, region_id, region_name).await; + let store_config = config_util::new_store_config(store_dir, region_name).await; - RegionImpl::new(region_id, region_name.to_string(), metadata, store_config) + RegionImpl::new(0, region_name.to_string(), metadata, store_config) } fn new_write_batch_for_test(enable_version_column: bool) -> WriteBatch { diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs index f8e59f26f1..773284dc79 100644 --- a/src/storage/src/region/writer.rs +++ b/src/storage/src/region/writer.rs @@ -2,9 +2,10 @@ use std::sync::Arc; use common_telemetry::logging; use common_time::RangeMillis; +use futures::TryStreamExt; use snafu::ResultExt; use store_api::logstore::LogStore; -use store_api::storage::{SequenceNumber, WriteContext, WriteRequest, WriteResponse}; +use store_api::storage::{OpenOptions, SequenceNumber, WriteContext, WriteRequest, WriteResponse}; use tokio::sync::Mutex; use crate::background::JobHandle; @@ -85,6 +86,16 @@ impl RegionWriter { Ok(()) } + /// Replay data to memtables. + pub async fn replay( + &self, + writer_ctx: WriterContext<'_, S>, + opts: &OpenOptions, + ) -> Result<()> { + let mut inner = self.inner.lock().await; + inner.replay(&self.version_mutex, writer_ctx, opts).await + } + async fn persist_manifest_version( &self, wal: &Wal, @@ -188,6 +199,31 @@ impl WriterInner { Ok(WriteResponse {}) } + async fn replay( + &mut self, + _version_mutex: &Mutex<()>, + writer_ctx: WriterContext<'_, S>, + opts: &OpenOptions, + ) -> Result<()> { + // TODO(yingwen): [open_region] Read `WriteBatch` from wal and invoke `WriterInner::write` to + // insert into memtables. + let version_control = writer_ctx.version_control(); + let version = version_control.current(); + + // Data after flushed sequence need to be recovered. + let start_sequence = version.flushed_sequence() + 1; + let _write_ctx = WriteContext::from(opts); + + let mut stream = writer_ctx.wal.read_from_wal(start_sequence).await?; + + while let Some((_header, _write_batch)) = stream.try_next().await? { + // TODO(yingwen): [open_region] 1. Split write batch and insert into memtables. 2. Need to update + // (recover) committed_sequence. + } + + unimplemented!() + } + /// Preprocess before write. /// /// Creates needed mutable memtables, ensures there is enough capacity in memtable and trigger diff --git a/src/storage/src/test_util/config_util.rs b/src/storage/src/test_util/config_util.rs index 392d055338..42793af108 100644 --- a/src/storage/src/test_util/config_util.rs +++ b/src/storage/src/test_util/config_util.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use log_store::fs::noop::NoopLogStore; use object_store::{backend::fs::Backend, ObjectStore}; use store_api::manifest::Manifest; -use store_api::storage::RegionId; use crate::background::JobPoolImpl; use crate::engine; @@ -14,18 +13,14 @@ use crate::region::StoreConfig; use crate::sst::FsAccessLayer; /// Create a new StoreConfig for test. -pub async fn new_store_config( - store_dir: &str, - region_id: RegionId, - region_name: &str, -) -> StoreConfig { +pub async fn new_store_config(store_dir: &str, region_name: &str) -> StoreConfig { let sst_dir = engine::region_sst_dir(region_name); let manifest_dir = engine::region_manifest_dir(region_name); let accessor = Backend::build().root(store_dir).finish().await.unwrap(); let object_store = ObjectStore::new(accessor); let sst_layer = Arc::new(FsAccessLayer::new(&sst_dir, object_store.clone())); - let manifest = RegionManifest::new(region_id, &manifest_dir, object_store); + let manifest = RegionManifest::new(&manifest_dir, object_store); let job_pool = Arc::new(JobPoolImpl {}); let flush_scheduler = Arc::new(FlushSchedulerImpl::new(job_pool)); diff --git a/src/storage/src/version.rs b/src/storage/src/version.rs index 2e8462664d..ab44f28c75 100644 --- a/src/storage/src/version.rs +++ b/src/storage/src/version.rs @@ -37,9 +37,17 @@ pub struct VersionControl { impl VersionControl { /// Construct a new version control from `metadata`. - pub fn new(metadata: RegionMetadata, memtables: MemtableVersion) -> VersionControl { + pub fn new(metadata: RegionMetadata) -> VersionControl { VersionControl { - version: CowCell::new(Version::new(metadata, memtables)), + version: CowCell::new(Version::new(metadata)), + committed_sequence: AtomicU64::new(0), + } + } + + /// Construct a new version control from existing `version`. + pub fn with_version(version: Version) -> VersionControl { + VersionControl { + version: CowCell::new(version), committed_sequence: AtomicU64::new(0), } } @@ -139,16 +147,21 @@ pub struct Version { } impl Version { - pub fn new(metadata: RegionMetadata, memtables: MemtableVersion) -> Version { + pub fn new(metadata: RegionMetadata) -> Version { Version { metadata: Arc::new(metadata), - memtables: Arc::new(memtables), + memtables: Arc::new(MemtableVersion::new()), ssts: Arc::new(LevelMetas::new()), flushed_sequence: 0, manifest_version: 0, } } + #[inline] + pub fn metadata(&self) -> &RegionMetadataRef { + &self.metadata + } + #[inline] pub fn schema(&self) -> &SchemaRef { &self.metadata.schema @@ -169,6 +182,11 @@ impl Version { &self.ssts } + #[inline] + pub fn flushed_sequence(&self) -> SequenceNumber { + self.flushed_sequence + } + /// Returns duration used to partition the memtables and ssts by time. pub fn bucket_duration(&self) -> Duration { DEFAULT_BUCKET_DURATION @@ -218,7 +236,7 @@ mod tests { .build(); let metadata: RegionMetadata = desc.try_into().unwrap(); - VersionControl::new(metadata, MemtableVersion::new()) + VersionControl::new(metadata) } #[test] diff --git a/src/storage/src/wal.rs b/src/storage/src/wal.rs index 9ae5880e19..43f230b17b 100644 --- a/src/storage/src/wal.rs +++ b/src/storage/src/wal.rs @@ -1,6 +1,8 @@ +use std::pin::Pin; use std::sync::Arc; use common_error::prelude::BoxedError; +use futures::{stream, Stream, TryStreamExt}; use prost::Message; use snafu::ResultExt; use store_api::{ @@ -17,16 +19,17 @@ use crate::{ #[derive(Debug)] pub struct Wal { - region_id: u32, namespace: S::Namespace, store: Arc, } +pub type WriteBatchStream<'a> = + Pin> + Send + 'a>>; + // wal should be cheap to clone impl Clone for Wal { fn clone(&self) -> Self { Self { - region_id: self.region_id, namespace: self.namespace.clone(), store: self.store.clone(), } @@ -34,20 +37,11 @@ impl Clone for Wal { } impl Wal { - pub fn new(region_id: u32, region_name: impl Into, store: Arc) -> Self { + pub fn new(region_name: impl Into, store: Arc) -> Self { let region_name = region_name.into(); - let namespace = S::Namespace::new(®ion_name, region_id as u64); + let namespace = S::Namespace::new(®ion_name); - Self { - region_id, - namespace, - store, - } - } - - #[inline] - pub fn region_id(&self) -> u32 { - self.region_id + Self { namespace, store } } #[inline] @@ -96,10 +90,7 @@ impl Wal { encoder .encode(batch, &mut buf) .map_err(BoxedError::new) - .context(error::WriteWalSnafu { - region_id: self.region_id(), - name: self.name(), - })?; + .context(error::WriteWalSnafu { name: self.name() })?; } // TODO(jiachun): encode protobuf payload @@ -108,6 +99,27 @@ impl Wal { self.write(seq, &buf).await } + pub async fn read_from_wal(&self, start_seq: SequenceNumber) -> Result> { + let stream = self + .store + .read(self.namespace.clone(), start_seq) + .await + .map_err(BoxedError::new) + .context(error::ReadWalSnafu { name: self.name() })? + .map_err(|e| Error::ReadWal { + name: self.name().to_string(), + source: BoxedError::new(e), + }) + .and_then(|entries| async { + let iter = entries.into_iter().map(decode_entry); + + Ok(stream::iter(iter)) + }) + .try_flatten(); + + Ok(Box::pin(stream)) + } + async fn write(&self, seq: SequenceNumber, bytes: &[u8]) -> Result<(u64, usize)> { let ns = self.namespace.clone(); let mut e = S::Entry::new(bytes); @@ -118,15 +130,17 @@ impl Wal { .append(ns, e) .await .map_err(BoxedError::new) - .context(error::WriteWalSnafu { - region_id: self.region_id(), - name: self.name(), - })?; + .context(error::WriteWalSnafu { name: self.name() })?; Ok((res.entry_id(), res.offset())) } } +fn decode_entry(_entry: E) -> Result<(WalHeader, WriteBatch)> { + // TODO(yingwen): [open_region] Decode entry into write batch. + unimplemented!() +} + pub enum Payload<'a> { None, // only header WriteBatchArrow(&'a WriteBatch), @@ -187,7 +201,7 @@ mod tests { pub async fn test_write_wal() { let (log_store, _tmp) = test_util::log_store_util::create_tmp_local_file_log_store("wal_test").await; - let wal = Wal::new(0, "test_region", Arc::new(log_store)); + let wal = Wal::new("test_region", Arc::new(log_store)); let res = wal.write(0, b"test1").await.unwrap(); diff --git a/src/store-api/src/logstore/namespace.rs b/src/store-api/src/logstore/namespace.rs index a74c0c25a7..6b238aaf56 100644 --- a/src/store-api/src/logstore/namespace.rs +++ b/src/store-api/src/logstore/namespace.rs @@ -1,5 +1,5 @@ pub trait Namespace: Send + Sync + Clone + std::fmt::Debug { - fn new(name: &str, id: u64) -> Self; + fn new(name: &str) -> Self; fn name(&self) -> &str; } diff --git a/src/store-api/src/manifest.rs b/src/store-api/src/manifest.rs index 95eecccc33..fbd562a338 100644 --- a/src/store-api/src/manifest.rs +++ b/src/store-api/src/manifest.rs @@ -6,7 +6,8 @@ use async_trait::async_trait; use common_error::ext::ErrorExt; use object_store::ObjectStore; use serde::{de::DeserializeOwned, Serialize}; -pub use storage::*; + +pub use crate::manifest::storage::*; pub type ManifestVersion = u64; pub const MIN_VERSION: u64 = 0; @@ -14,8 +15,6 @@ pub const MAX_VERSION: u64 = u64::MAX; pub trait Metadata: Clone {} -pub trait MetadataId: Clone + Copy {} - pub trait MetaAction: Serialize + DeserializeOwned { fn set_prev_version(&mut self, version: ManifestVersion); } @@ -25,10 +24,9 @@ pub trait MetaAction: Serialize + DeserializeOwned { pub trait Manifest: Send + Sync + Clone + 'static { type Error: ErrorExt + Send + Sync; type MetaAction: MetaAction; - type MetadataId: MetadataId; type Metadata: Metadata; - fn new(id: Self::MetadataId, manifest_dir: &str, object_store: ObjectStore) -> Self; + fn new(manifest_dir: &str, object_store: ObjectStore) -> Self; /// Update metadata by the action async fn update(&self, action: Self::MetaAction) -> Result; @@ -37,6 +35,4 @@ pub trait Manifest: Send + Sync + Clone + 'static { async fn load(&self) -> Result, Self::Error>; async fn checkpoint(&self) -> Result; - - fn metadata_id(&self) -> Self::MetadataId; } diff --git a/src/store-api/src/storage.rs b/src/store-api/src/storage.rs index 3f39a8e56a..b69f78ead9 100644 --- a/src/store-api/src/storage.rs +++ b/src/store-api/src/storage.rs @@ -20,7 +20,7 @@ pub use self::descriptors::{ ColumnFamilyDescriptorBuilder, ColumnFamilyId, ColumnId, RegionDescriptor, RegionId, RowKeyDescriptor, RowKeyDescriptorBuilder, }; -pub use self::engine::{EngineContext, StorageEngine}; +pub use self::engine::{EngineContext, OpenOptions, StorageEngine}; pub use self::metadata::RegionMeta; pub use self::region::{Region, WriteContext}; pub use self::requests::{GetRequest, PutOperation, ScanRequest, WriteRequest}; diff --git a/src/store-api/src/storage/descriptors.rs b/src/store-api/src/storage/descriptors.rs index e2b149f069..030f8c83b7 100644 --- a/src/store-api/src/storage/descriptors.rs +++ b/src/store-api/src/storage/descriptors.rs @@ -2,15 +2,14 @@ use datatypes::value::Value; use derive_builder::Builder; use serde::{Deserialize, Serialize}; -use crate::manifest::MetadataId; use crate::storage::{consts, ColumnSchema, ConcreteDataType}; /// Id of column, unique in each region. pub type ColumnId = u32; /// Id of column family, unique in each region. pub type ColumnFamilyId = u32; +/// Id of the region. pub type RegionId = u32; -impl MetadataId for RegionId {} /// Default region name prefix pub const REGION_PREFIX: &str = "r_"; diff --git a/src/store-api/src/storage/engine.rs b/src/store-api/src/storage/engine.rs index 90fab46b01..5b2096453d 100644 --- a/src/store-api/src/storage/engine.rs +++ b/src/store-api/src/storage/engine.rs @@ -21,6 +21,7 @@ pub trait StorageEngine: Send + Sync + Clone + 'static { &self, ctx: &EngineContext, name: &str, + opts: &OpenOptions, ) -> Result; /// Closes given region. @@ -60,3 +61,9 @@ pub trait StorageEngine: Send + Sync + Clone + 'static { /// Storage engine context. #[derive(Debug, Clone, Default)] pub struct EngineContext {} + +/// Options to open a region. +#[derive(Debug, Clone, Default)] +pub struct OpenOptions { + // TODO(yingwen): [open_region] Supports create if not exists. +} diff --git a/src/store-api/src/storage/region.rs b/src/store-api/src/storage/region.rs index 8a43dcbb83..1497eb8f14 100644 --- a/src/store-api/src/storage/region.rs +++ b/src/store-api/src/storage/region.rs @@ -21,6 +21,7 @@ use async_trait::async_trait; use common_error::ext::ErrorExt; +use crate::storage::engine::OpenOptions; use crate::storage::metadata::RegionMeta; use crate::storage::requests::WriteRequest; use crate::storage::responses::WriteResponse; @@ -28,7 +29,7 @@ use crate::storage::snapshot::{ReadContext, Snapshot}; /// Chunks of rows in storage engine. #[async_trait] -pub trait Region: Send + Sync + Clone + 'static { +pub trait Region: Send + Sync + Clone + std::fmt::Debug + 'static { type Error: ErrorExt + Send + Sync; type Meta: RegionMeta; type WriteRequest: WriteRequest; @@ -54,3 +55,9 @@ pub trait Region: Send + Sync + Clone + 'static { /// Context for write operations. #[derive(Debug, Clone, Default)] pub struct WriteContext {} + +impl From<&OpenOptions> for WriteContext { + fn from(_opts: &OpenOptions) -> WriteContext { + WriteContext::default() + } +}