feat: Engine::open_region code skeleton (#120)

* refactor: Move fields in SharedData to EngineInner

Since `SharedData` isn't shared now, we move all its fields to
EngineInner, and remove the `SharedData` struct, also remove the
unused config field.

* feat: Store RegionSlot in engine's region map

A `RegionSlot` has three possible state:
- Opening
- Creating
- Ready (Holds the `RegionImpl`)

Also use the `RegionSlot` as a placeholder in the region map to indicate
the region is opening/creating, so another open/create request will
fail immediately. The `SlotGuard` is used to clean the slot if we failed
to create/open the region.

* feat: Add a blank method `RegionImpl::open`

* feat: Remove MetadataId from Manifest

Now metadata id of manifest is unused, also unnecessary as we have
manifest dir to build the manifest, but constructing the manifest
still needs a passing region id as argument, which is unavailable
during opening region. So we remove the metadata id from manifest so
`region_store_config()` don't need region id as input anymore

* feat: Remove region id from logstore::Namespace and Wal

This is necessary for implementing open, since we don't have region
id this time, but we need to build Wal and its logstore namespace. Now
this is ok as id is not actually used by logstore.

* feat: Setup `open_region` code skeleton
This commit is contained in:
evenyag
2022-07-29 17:52:33 +08:00
committed by GitHub
parent 62cb649389
commit f06968f4f5
19 changed files with 390 additions and 161 deletions

View File

@@ -9,21 +9,19 @@ pub struct LocalNamespace {
impl Default for LocalNamespace {
fn default() -> Self {
LocalNamespace::new("", 0)
LocalNamespace::new("")
}
}
#[derive(Debug)]
struct LocalNamespaceInner {
name: String,
id: u64,
}
impl Namespace for LocalNamespace {
fn new(name: &str, id: u64) -> Self {
fn new(name: &str) -> Self {
let inner = Arc::new(LocalNamespaceInner {
name: name.to_string(),
id,
});
Self { inner }
}
@@ -32,10 +30,3 @@ impl Namespace for LocalNamespace {
self.inner.name.as_str()
}
}
#[allow(dead_code)]
impl LocalNamespace {
fn id(&self) -> u64 {
self.inner.id
}
}

View File

@@ -9,7 +9,7 @@ use store_api::manifest::action::ProtocolAction;
use store_api::{
logstore::LogStore,
manifest::Manifest,
storage::{EngineContext, RegionDescriptor, StorageEngine},
storage::{EngineContext, OpenOptions, RegionDescriptor, StorageEngine},
};
use crate::background::JobPoolImpl;
@@ -41,8 +41,13 @@ impl<S: LogStore> StorageEngine for EngineImpl<S> {
type Error = Error;
type Region = RegionImpl<S>;
async fn open_region(&self, _ctx: &EngineContext, _name: &str) -> Result<Self::Region> {
unimplemented!()
async fn open_region(
&self,
_ctx: &EngineContext,
name: &str,
opts: &OpenOptions,
) -> Result<Self::Region> {
self.inner.open_region(name, opts).await
}
async fn close_region(&self, _ctx: &EngineContext, _region: Self::Region) -> Result<()> {
@@ -74,34 +79,19 @@ impl<S: LogStore> EngineImpl<S> {
}
}
/// Engine share data
/// TODO(dennis): merge to EngineInner?
#[derive(Clone, Debug)]
struct SharedData {
pub _config: EngineConfig,
pub object_store: ObjectStore,
}
async fn new_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
// TODO(dennis): supports other backend
let store_dir = util::normalize_dir(match store_config {
ObjectStoreConfig::File(file) => &file.store_dir,
});
impl SharedData {
async fn new(config: EngineConfig) -> Result<Self> {
// TODO(dennis): supports other backend
let store_dir = util::normalize_dir(match &config.store_config {
ObjectStoreConfig::File(file) => &file.store_dir,
});
let accessor = Backend::build()
.root(&store_dir)
.finish()
.await
.context(error::InitBackendSnafu { dir: &store_dir })?;
let accessor = Backend::build()
.root(&store_dir)
.finish()
.await
.context(error::InitBackendSnafu { dir: &store_dir })?;
let object_store = ObjectStore::new(accessor);
Ok(Self {
_config: config,
object_store,
})
}
Ok(ObjectStore::new(accessor))
}
#[inline]
@@ -114,12 +104,108 @@ pub fn region_manifest_dir(region_name: &str) -> String {
format!("{}/manifest/", region_name)
}
type RegionMap<S> = HashMap<String, RegionImpl<S>>;
/// A slot for region in the engine.
///
/// Also used as a placeholder in the region map when the region isn't ready, e.g. during
/// creating/opening.
#[derive(Debug)]
enum RegionSlot<S: LogStore> {
/// The region is during creation.
Creating,
/// The region is during opening.
Opening,
/// The region is ready for access.
Ready(RegionImpl<S>),
// TODO(yingwen): Closing state.
}
impl<S: LogStore> RegionSlot<S> {
/// Try to get a ready region.
fn try_get_ready_region(&self) -> Result<RegionImpl<S>> {
if let RegionSlot::Ready(region) = self {
Ok(region.clone())
} else {
error::InvalidRegionStateSnafu {
state: self.state_name(),
}
.fail()
}
}
/// Returns the ready region or `None`.
fn get_ready_region(&self) -> Option<RegionImpl<S>> {
if let RegionSlot::Ready(region) = self {
Some(region.clone())
} else {
None
}
}
fn state_name(&self) -> &'static str {
match self {
RegionSlot::Creating => "creating",
RegionSlot::Opening => "opening",
RegionSlot::Ready(_) => "ready",
}
}
}
impl<S: LogStore> Clone for RegionSlot<S> {
// Manually implement Clone due to [rust#26925](https://github.com/rust-lang/rust/issues/26925).
// Maybe we should require `LogStore` to be clonable to work around this.
fn clone(&self) -> RegionSlot<S> {
match self {
RegionSlot::Creating => RegionSlot::Creating,
RegionSlot::Opening => RegionSlot::Opening,
RegionSlot::Ready(region) => RegionSlot::Ready(region.clone()),
}
}
}
/// Used to update slot or clean the slot on failure.
struct SlotGuard<'a, S: LogStore> {
name: &'a str,
regions: &'a RwLock<RegionMap<S>>,
skip_clean: bool,
}
impl<'a, S: LogStore> SlotGuard<'a, S> {
fn new(name: &'a str, regions: &'a RwLock<RegionMap<S>>) -> SlotGuard<'a, S> {
SlotGuard {
name,
regions,
skip_clean: false,
}
}
/// Update the slot and skip cleaning on drop.
fn update(&mut self, slot: RegionSlot<S>) {
{
let mut regions = self.regions.write().unwrap();
if let Some(old) = regions.get_mut(self.name) {
*old = slot;
}
}
self.skip_clean = true;
}
}
impl<'a, S: LogStore> Drop for SlotGuard<'a, S> {
fn drop(&mut self) {
if !self.skip_clean {
let mut regions = self.regions.write().unwrap();
regions.remove(self.name);
}
}
}
type RegionMap<S> = HashMap<String, RegionSlot<S>>;
struct EngineInner<S: LogStore> {
object_store: ObjectStore,
log_store: Arc<S>,
regions: RwLock<RegionMap<S>>,
shared: SharedData,
memtable_builder: MemtableBuilderRef,
flush_scheduler: FlushSchedulerRef,
flush_strategy: FlushStrategyRef,
@@ -129,50 +215,79 @@ impl<S: LogStore> EngineInner<S> {
pub async fn new(config: EngineConfig, log_store: Arc<S>) -> Result<Self> {
let job_pool = Arc::new(JobPoolImpl {});
let flush_scheduler = Arc::new(FlushSchedulerImpl::new(job_pool));
let object_store = new_object_store(&config.store_config).await?;
Ok(Self {
object_store,
log_store,
regions: RwLock::new(Default::default()),
shared: SharedData::new(config).await?,
memtable_builder: Arc::new(DefaultMemtableBuilder {}),
flush_scheduler,
flush_strategy: Arc::new(SizeBasedStrategy::default()),
})
}
async fn create_region(&self, descriptor: RegionDescriptor) -> Result<RegionImpl<S>> {
/// Returns the `Some(slot)` if there is existing slot with given `name`, or insert
/// given `slot` and returns `None`.
fn get_or_occupy_slot(&self, name: &str, slot: RegionSlot<S>) -> Option<RegionSlot<S>> {
{
// Try to get the region under read lock.
let regions = self.regions.read().unwrap();
if let Some(region) = regions.get(&descriptor.name) {
return Ok(region.clone());
if let Some(slot) = regions.get(name) {
return Some(slot.clone());
}
}
// Get the region under write lock.
let mut regions = self.regions.write().unwrap();
if let Some(slot) = regions.get(name) {
return Some(slot.clone());
}
// No slot in map, we can insert the slot now.
regions.insert(name.to_string(), slot);
None
}
async fn open_region(&self, name: &str, opts: &OpenOptions) -> Result<RegionImpl<S>> {
// We can wait until the state of the slot has been changed to ready, but this will
// make the code more complicate, so we just return the error here.
if let Some(slot) = self.get_or_occupy_slot(name, RegionSlot::Opening) {
return slot.try_get_ready_region();
}
let mut guard = SlotGuard::new(name, &self.regions);
// FIXME(yingwen): Get region id or remove dependency of region id.
let store_config = self.region_store_config(name);
let region = RegionImpl::open(name.to_string(), store_config, opts).await?;
guard.update(RegionSlot::Ready(region.clone()));
info!("Storage engine open region {:?}", &region);
Ok(region)
}
async fn create_region(&self, descriptor: RegionDescriptor) -> Result<RegionImpl<S>> {
if let Some(slot) = self.get_or_occupy_slot(&descriptor.name, RegionSlot::Creating) {
return slot.try_get_ready_region();
}
// Now the region in under `Creating` state.
let region_id = descriptor.id;
let region_name = descriptor.name.clone();
let mut guard = SlotGuard::new(&region_name, &self.regions);
let metadata: RegionMetadata =
descriptor
.try_into()
.context(error::InvalidRegionDescSnafu {
region: &region_name,
})?;
let sst_dir = &region_sst_dir(&region_name);
let sst_layer = Arc::new(FsAccessLayer::new(
sst_dir,
self.shared.object_store.clone(),
));
let manifest_dir = region_manifest_dir(&region_name);
let manifest =
RegionManifest::new(region_id, &manifest_dir, self.shared.object_store.clone());
let store_config = StoreConfig {
log_store: self.log_store.clone(),
sst_layer,
manifest: manifest.clone(),
memtable_builder: self.memtable_builder.clone(),
flush_scheduler: self.flush_scheduler.clone(),
flush_strategy: self.flush_strategy.clone(),
};
let store_config = self.region_store_config(&region_name);
let manifest = store_config.manifest.clone();
let region = RegionImpl::new(
region_id,
@@ -180,6 +295,7 @@ impl<S: LogStore> EngineInner<S> {
metadata.clone(),
store_config,
);
// Persist region metadata
manifest
.update(RegionMetaActionList::new(vec![
@@ -190,14 +306,7 @@ impl<S: LogStore> EngineInner<S> {
]))
.await?;
{
let mut regions = self.regions.write().unwrap();
if let Some(region) = regions.get(&region_name) {
return Ok(region.clone());
}
regions.insert(region_name.clone(), region.clone());
}
guard.update(RegionSlot::Ready(region.clone()));
info!("Storage engine create region {:?}", &region);
@@ -205,7 +314,24 @@ impl<S: LogStore> EngineInner<S> {
}
fn get_region(&self, name: &str) -> Option<RegionImpl<S>> {
self.regions.read().unwrap().get(name).cloned()
let slot = self.regions.read().unwrap().get(name).cloned()?;
slot.get_ready_region()
}
fn region_store_config(&self, region_name: &str) -> StoreConfig<S> {
let sst_dir = &region_sst_dir(region_name);
let sst_layer = Arc::new(FsAccessLayer::new(sst_dir, self.object_store.clone()));
let manifest_dir = region_manifest_dir(region_name);
let manifest = RegionManifest::new(&manifest_dir, self.object_store.clone());
StoreConfig {
log_store: self.log_store.clone(),
sst_layer,
manifest,
memtable_builder: self.memtable_builder.clone(),
flush_scheduler: self.flush_scheduler.clone(),
flush_strategy: self.flush_strategy.clone(),
}
}
}

View File

@@ -108,14 +108,8 @@ pub enum Error {
backtrace: Backtrace,
},
#[snafu(display(
"Failed to write WAL, region id: {}, WAL name: {}, source: {}",
region_id,
name,
source
))]
#[snafu(display("Failed to write WAL, WAL name: {}, source: {}", name, source))]
WriteWal {
region_id: u32,
name: String,
#[snafu(backtrace)]
source: BoxedError,
@@ -196,6 +190,19 @@ pub enum Error {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display("Region is under {} state, cannot proceed operation", state))]
InvalidRegionState {
state: &'static str,
backtrace: Backtrace,
},
#[snafu(display("Failed to read WAL, name: {}, source: {}", name, source))]
ReadWal {
name: String,
#[snafu(backtrace)]
source: BoxedError,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -235,7 +242,9 @@ impl ErrorExt for Error {
| ManifestProtocolForbidRead { .. }
| ManifestProtocolForbidWrite { .. }
| ReadParquet { .. }
| ReadParquetIo { .. } => StatusCode::StorageUnavailable,
| ReadParquetIo { .. }
| InvalidRegionState { .. }
| ReadWal { .. } => StatusCode::StorageUnavailable,
}
}

View File

@@ -41,7 +41,8 @@ pub struct RegionEdit {
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct RegionManifestData {
pub region_meta: RegionMetadataRef,
// TODO(dennis): version metadata
// TODO(dennis): [open_region] 1. load version metadata 2. The `region_meta` field could be removed if we
// have a `version` field.
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]

View File

@@ -11,7 +11,6 @@ use object_store::ObjectStore;
use snafu::ensure;
use store_api::manifest::action::{self, ProtocolAction, ProtocolVersion};
use store_api::manifest::*;
use store_api::storage::RegionId;
use crate::error::{Error, ManifestProtocolForbidWriteSnafu, Result};
use crate::manifest::action::*;
@@ -27,12 +26,11 @@ pub struct RegionManifest {
impl Manifest for RegionManifest {
type Error = Error;
type MetaAction = RegionMetaActionList;
type MetadataId = RegionId;
type Metadata = RegionManifestData;
fn new(id: Self::MetadataId, manifest_dir: &str, object_store: ObjectStore) -> Self {
fn new(manifest_dir: &str, object_store: ObjectStore) -> Self {
RegionManifest {
inner: Arc::new(RegionManifestInner::new(id, manifest_dir, object_store)),
inner: Arc::new(RegionManifestInner::new(manifest_dir, object_store)),
}
}
@@ -52,6 +50,8 @@ impl Manifest for RegionManifest {
let mut iter = self.inner.scan(start_bound, MAX_VERSION).await?;
// TODO(yingwen): [open_region] 1. Create Version from metadata 2. Load VersionEdits
// and apply to the Version by `Version::apply_edit`.
while let Some((_v, action_list)) = iter.next_action().await? {
for action in action_list.actions {
if let RegionMetaAction::Change(c) = action {
@@ -68,15 +68,10 @@ impl Manifest for RegionManifest {
async fn checkpoint(&self) -> Result<ManifestVersion> {
unimplemented!();
}
fn metadata_id(&self) -> RegionId {
self.inner.region_id
}
}
#[derive(Debug)]
struct RegionManifestInner {
region_id: RegionId,
store: Arc<ManifestObjectStore>,
version: AtomicU64,
/// Current using protocol
@@ -106,11 +101,10 @@ impl RegionMetaActionListIterator {
}
impl RegionManifestInner {
fn new(region_id: RegionId, manifest_dir: &str, object_store: ObjectStore) -> Self {
fn new(manifest_dir: &str, object_store: ObjectStore) -> Self {
let (reader_version, writer_version) = action::supported_protocol_version();
Self {
region_id,
store: Arc::new(ManifestObjectStore::new(manifest_dir, object_store)),
// TODO(dennis): recover the last version from history
version: AtomicU64::new(0),
@@ -187,14 +181,12 @@ mod tests {
.await
.unwrap(),
);
let region_id = 0;
let manifest = RegionManifest::new(region_id, "/manifest/", object_store);
assert_eq!(region_id, manifest.metadata_id());
let manifest = RegionManifest::new("/manifest/", object_store);
let region_name = "region-0";
let desc = RegionDescBuilder::new(region_name)
.id(region_id)
.id(0)
.push_key_column(("k1", LogicalTypeId::Int32, false))
.push_value_column(("v1", LogicalTypeId::Float32, true))
.build();
@@ -218,7 +210,7 @@ mod tests {
// save another metadata
let region_name = "region-0";
let desc = RegionDescBuilder::new(region_name)
.id(region_id)
.id(0)
.push_key_column(("k1", LogicalTypeId::Int32, false))
.push_key_column(("k2", LogicalTypeId::Int64, false))
.push_value_column(("v1", LogicalTypeId::Float32, true))

View File

@@ -7,17 +7,20 @@ use std::sync::Arc;
use async_trait::async_trait;
use snafu::ensure;
use store_api::logstore::LogStore;
use store_api::storage::{ReadContext, Region, RegionId, RegionMeta, WriteContext, WriteResponse};
use store_api::manifest::Manifest;
use store_api::storage::{
OpenOptions, ReadContext, Region, RegionId, RegionMeta, WriteContext, WriteResponse,
};
use crate::error::{self, Error, Result};
use crate::flush::{FlushSchedulerRef, FlushStrategyRef};
use crate::manifest::region::RegionManifest;
use crate::memtable::{MemtableBuilderRef, MemtableVersion};
use crate::memtable::MemtableBuilderRef;
use crate::metadata::{RegionMetaImpl, RegionMetadata};
pub use crate::region::writer::{RegionWriter, RegionWriterRef, WriterContext};
use crate::snapshot::SnapshotImpl;
use crate::sst::AccessLayerRef;
use crate::version::{VersionControl, VersionControlRef};
use crate::version::{Version, VersionControl, VersionControlRef};
use crate::wal::Wal;
use crate::write_batch::WriteBatch;
@@ -73,15 +76,15 @@ pub struct StoreConfig<S> {
}
impl<S: LogStore> RegionImpl<S> {
/// Create a new region without any data.
pub fn new(
id: RegionId,
name: String,
metadata: RegionMetadata,
store_config: StoreConfig<S>,
) -> RegionImpl<S> {
let memtable_version = MemtableVersion::new();
let version_control = VersionControl::new(metadata, memtable_version);
let wal = Wal::new(id, name.clone(), store_config.log_store);
let version_control = VersionControl::new(metadata);
let wal = Wal::new(name.clone(), store_config.log_store);
let inner = Arc::new(RegionInner {
shared: Arc::new(SharedData {
@@ -99,6 +102,45 @@ impl<S: LogStore> RegionImpl<S> {
RegionImpl { inner }
}
/// Open an exsiting region and recover its data.
pub async fn open(
name: String,
store_config: StoreConfig<S>,
opts: &OpenOptions,
) -> Result<RegionImpl<S>> {
// Load version meta data from manifest.
let version = Self::recover_from_manifest(&store_config.manifest).await?;
let metadata = version.metadata().clone();
let version_control = Arc::new(VersionControl::with_version(version));
let wal = Wal::new(name.clone(), store_config.log_store);
let shared = Arc::new(SharedData {
id: metadata.id,
name,
version_control,
});
let writer = Arc::new(RegionWriter::new(store_config.memtable_builder));
let writer_ctx = WriterContext {
shared: &shared,
flush_strategy: &store_config.flush_strategy,
flush_scheduler: &store_config.flush_scheduler,
sst_layer: &store_config.sst_layer,
wal: &wal,
writer: &writer,
manifest: &store_config.manifest,
};
writer.replay(writer_ctx, opts).await?;
unimplemented!()
}
async fn recover_from_manifest(manifest: &RegionManifest) -> Result<Version> {
let _metadata = manifest.load().await?;
// TODO(yingwen): [open_region] Get version from metadata.
unimplemented!()
}
}
// Private methods for tests.

View File

@@ -12,7 +12,6 @@ use crate::test_util::{self, config_util, descriptor_util::RegionDescBuilder, sc
#[tokio::test]
async fn test_new_region() {
let region_id = 0;
let region_name = "region-0";
let desc = RegionDescBuilder::new(region_name)
.enable_version_column(true)
@@ -27,9 +26,9 @@ async fn test_new_region() {
.to_string_lossy()
.to_string();
let store_config = config_util::new_store_config(&store_dir, region_id, region_name).await;
let store_config = config_util::new_store_config(&store_dir, region_name).await;
let region = RegionImpl::new(region_id, region_name.to_string(), metadata, store_config);
let region = RegionImpl::new(0, region_name.to_string(), metadata, store_config);
let expect_schema = schema_util::new_schema_ref(
&[

View File

@@ -19,14 +19,12 @@ async fn new_region_for_flush(
enable_version_column: bool,
flush_strategy: FlushStrategyRef,
) -> RegionImpl<NoopLogStore> {
let region_id = 0;
let metadata = read_write::new_metadata(REGION_NAME, enable_version_column);
let mut store_config = config_util::new_store_config(store_dir, region_id, REGION_NAME).await;
let mut store_config = config_util::new_store_config(store_dir, REGION_NAME).await;
store_config.flush_strategy = flush_strategy;
RegionImpl::new(region_id, REGION_NAME.to_string(), metadata, store_config)
RegionImpl::new(0, REGION_NAME.to_string(), metadata, store_config)
}
struct FlushTester {

View File

@@ -29,14 +29,13 @@ async fn new_region_for_rw(
store_dir: &str,
enable_version_column: bool,
) -> RegionImpl<NoopLogStore> {
let region_id = 0;
let region_name = "region-rw-0";
let metadata = new_metadata(region_name, enable_version_column);
let store_config = config_util::new_store_config(store_dir, region_id, region_name).await;
let store_config = config_util::new_store_config(store_dir, region_name).await;
RegionImpl::new(region_id, region_name.to_string(), metadata, store_config)
RegionImpl::new(0, region_name.to_string(), metadata, store_config)
}
fn new_write_batch_for_test(enable_version_column: bool) -> WriteBatch {

View File

@@ -2,9 +2,10 @@ use std::sync::Arc;
use common_telemetry::logging;
use common_time::RangeMillis;
use futures::TryStreamExt;
use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::storage::{SequenceNumber, WriteContext, WriteRequest, WriteResponse};
use store_api::storage::{OpenOptions, SequenceNumber, WriteContext, WriteRequest, WriteResponse};
use tokio::sync::Mutex;
use crate::background::JobHandle;
@@ -85,6 +86,16 @@ impl RegionWriter {
Ok(())
}
/// Replay data to memtables.
pub async fn replay<S: LogStore>(
&self,
writer_ctx: WriterContext<'_, S>,
opts: &OpenOptions,
) -> Result<()> {
let mut inner = self.inner.lock().await;
inner.replay(&self.version_mutex, writer_ctx, opts).await
}
async fn persist_manifest_version<S: LogStore>(
&self,
wal: &Wal<S>,
@@ -188,6 +199,31 @@ impl WriterInner {
Ok(WriteResponse {})
}
async fn replay<S: LogStore>(
&mut self,
_version_mutex: &Mutex<()>,
writer_ctx: WriterContext<'_, S>,
opts: &OpenOptions,
) -> Result<()> {
// TODO(yingwen): [open_region] Read `WriteBatch` from wal and invoke `WriterInner::write` to
// insert into memtables.
let version_control = writer_ctx.version_control();
let version = version_control.current();
// Data after flushed sequence need to be recovered.
let start_sequence = version.flushed_sequence() + 1;
let _write_ctx = WriteContext::from(opts);
let mut stream = writer_ctx.wal.read_from_wal(start_sequence).await?;
while let Some((_header, _write_batch)) = stream.try_next().await? {
// TODO(yingwen): [open_region] 1. Split write batch and insert into memtables. 2. Need to update
// (recover) committed_sequence.
}
unimplemented!()
}
/// Preprocess before write.
///
/// Creates needed mutable memtables, ensures there is enough capacity in memtable and trigger

View File

@@ -3,7 +3,6 @@ use std::sync::Arc;
use log_store::fs::noop::NoopLogStore;
use object_store::{backend::fs::Backend, ObjectStore};
use store_api::manifest::Manifest;
use store_api::storage::RegionId;
use crate::background::JobPoolImpl;
use crate::engine;
@@ -14,18 +13,14 @@ use crate::region::StoreConfig;
use crate::sst::FsAccessLayer;
/// Create a new StoreConfig for test.
pub async fn new_store_config(
store_dir: &str,
region_id: RegionId,
region_name: &str,
) -> StoreConfig<NoopLogStore> {
pub async fn new_store_config(store_dir: &str, region_name: &str) -> StoreConfig<NoopLogStore> {
let sst_dir = engine::region_sst_dir(region_name);
let manifest_dir = engine::region_manifest_dir(region_name);
let accessor = Backend::build().root(store_dir).finish().await.unwrap();
let object_store = ObjectStore::new(accessor);
let sst_layer = Arc::new(FsAccessLayer::new(&sst_dir, object_store.clone()));
let manifest = RegionManifest::new(region_id, &manifest_dir, object_store);
let manifest = RegionManifest::new(&manifest_dir, object_store);
let job_pool = Arc::new(JobPoolImpl {});
let flush_scheduler = Arc::new(FlushSchedulerImpl::new(job_pool));

View File

@@ -37,9 +37,17 @@ pub struct VersionControl {
impl VersionControl {
/// Construct a new version control from `metadata`.
pub fn new(metadata: RegionMetadata, memtables: MemtableVersion) -> VersionControl {
pub fn new(metadata: RegionMetadata) -> VersionControl {
VersionControl {
version: CowCell::new(Version::new(metadata, memtables)),
version: CowCell::new(Version::new(metadata)),
committed_sequence: AtomicU64::new(0),
}
}
/// Construct a new version control from existing `version`.
pub fn with_version(version: Version) -> VersionControl {
VersionControl {
version: CowCell::new(version),
committed_sequence: AtomicU64::new(0),
}
}
@@ -139,16 +147,21 @@ pub struct Version {
}
impl Version {
pub fn new(metadata: RegionMetadata, memtables: MemtableVersion) -> Version {
pub fn new(metadata: RegionMetadata) -> Version {
Version {
metadata: Arc::new(metadata),
memtables: Arc::new(memtables),
memtables: Arc::new(MemtableVersion::new()),
ssts: Arc::new(LevelMetas::new()),
flushed_sequence: 0,
manifest_version: 0,
}
}
#[inline]
pub fn metadata(&self) -> &RegionMetadataRef {
&self.metadata
}
#[inline]
pub fn schema(&self) -> &SchemaRef {
&self.metadata.schema
@@ -169,6 +182,11 @@ impl Version {
&self.ssts
}
#[inline]
pub fn flushed_sequence(&self) -> SequenceNumber {
self.flushed_sequence
}
/// Returns duration used to partition the memtables and ssts by time.
pub fn bucket_duration(&self) -> Duration {
DEFAULT_BUCKET_DURATION
@@ -218,7 +236,7 @@ mod tests {
.build();
let metadata: RegionMetadata = desc.try_into().unwrap();
VersionControl::new(metadata, MemtableVersion::new())
VersionControl::new(metadata)
}
#[test]

View File

@@ -1,6 +1,8 @@
use std::pin::Pin;
use std::sync::Arc;
use common_error::prelude::BoxedError;
use futures::{stream, Stream, TryStreamExt};
use prost::Message;
use snafu::ResultExt;
use store_api::{
@@ -17,16 +19,17 @@ use crate::{
#[derive(Debug)]
pub struct Wal<S: LogStore> {
region_id: u32,
namespace: S::Namespace,
store: Arc<S>,
}
pub type WriteBatchStream<'a> =
Pin<Box<dyn Stream<Item = Result<(WalHeader, WriteBatch)>> + Send + 'a>>;
// wal should be cheap to clone
impl<S: LogStore> Clone for Wal<S> {
fn clone(&self) -> Self {
Self {
region_id: self.region_id,
namespace: self.namespace.clone(),
store: self.store.clone(),
}
@@ -34,20 +37,11 @@ impl<S: LogStore> Clone for Wal<S> {
}
impl<S: LogStore> Wal<S> {
pub fn new(region_id: u32, region_name: impl Into<String>, store: Arc<S>) -> Self {
pub fn new(region_name: impl Into<String>, store: Arc<S>) -> Self {
let region_name = region_name.into();
let namespace = S::Namespace::new(&region_name, region_id as u64);
let namespace = S::Namespace::new(&region_name);
Self {
region_id,
namespace,
store,
}
}
#[inline]
pub fn region_id(&self) -> u32 {
self.region_id
Self { namespace, store }
}
#[inline]
@@ -96,10 +90,7 @@ impl<S: LogStore> Wal<S> {
encoder
.encode(batch, &mut buf)
.map_err(BoxedError::new)
.context(error::WriteWalSnafu {
region_id: self.region_id(),
name: self.name(),
})?;
.context(error::WriteWalSnafu { name: self.name() })?;
}
// TODO(jiachun): encode protobuf payload
@@ -108,6 +99,27 @@ impl<S: LogStore> Wal<S> {
self.write(seq, &buf).await
}
pub async fn read_from_wal(&self, start_seq: SequenceNumber) -> Result<WriteBatchStream<'_>> {
let stream = self
.store
.read(self.namespace.clone(), start_seq)
.await
.map_err(BoxedError::new)
.context(error::ReadWalSnafu { name: self.name() })?
.map_err(|e| Error::ReadWal {
name: self.name().to_string(),
source: BoxedError::new(e),
})
.and_then(|entries| async {
let iter = entries.into_iter().map(decode_entry);
Ok(stream::iter(iter))
})
.try_flatten();
Ok(Box::pin(stream))
}
async fn write(&self, seq: SequenceNumber, bytes: &[u8]) -> Result<(u64, usize)> {
let ns = self.namespace.clone();
let mut e = S::Entry::new(bytes);
@@ -118,15 +130,17 @@ impl<S: LogStore> Wal<S> {
.append(ns, e)
.await
.map_err(BoxedError::new)
.context(error::WriteWalSnafu {
region_id: self.region_id(),
name: self.name(),
})?;
.context(error::WriteWalSnafu { name: self.name() })?;
Ok((res.entry_id(), res.offset()))
}
}
fn decode_entry<E: Entry>(_entry: E) -> Result<(WalHeader, WriteBatch)> {
// TODO(yingwen): [open_region] Decode entry into write batch.
unimplemented!()
}
pub enum Payload<'a> {
None, // only header
WriteBatchArrow(&'a WriteBatch),
@@ -187,7 +201,7 @@ mod tests {
pub async fn test_write_wal() {
let (log_store, _tmp) =
test_util::log_store_util::create_tmp_local_file_log_store("wal_test").await;
let wal = Wal::new(0, "test_region", Arc::new(log_store));
let wal = Wal::new("test_region", Arc::new(log_store));
let res = wal.write(0, b"test1").await.unwrap();

View File

@@ -1,5 +1,5 @@
pub trait Namespace: Send + Sync + Clone + std::fmt::Debug {
fn new(name: &str, id: u64) -> Self;
fn new(name: &str) -> Self;
fn name(&self) -> &str;
}

View File

@@ -6,7 +6,8 @@ use async_trait::async_trait;
use common_error::ext::ErrorExt;
use object_store::ObjectStore;
use serde::{de::DeserializeOwned, Serialize};
pub use storage::*;
pub use crate::manifest::storage::*;
pub type ManifestVersion = u64;
pub const MIN_VERSION: u64 = 0;
@@ -14,8 +15,6 @@ pub const MAX_VERSION: u64 = u64::MAX;
pub trait Metadata: Clone {}
pub trait MetadataId: Clone + Copy {}
pub trait MetaAction: Serialize + DeserializeOwned {
fn set_prev_version(&mut self, version: ManifestVersion);
}
@@ -25,10 +24,9 @@ pub trait MetaAction: Serialize + DeserializeOwned {
pub trait Manifest: Send + Sync + Clone + 'static {
type Error: ErrorExt + Send + Sync;
type MetaAction: MetaAction;
type MetadataId: MetadataId;
type Metadata: Metadata;
fn new(id: Self::MetadataId, manifest_dir: &str, object_store: ObjectStore) -> Self;
fn new(manifest_dir: &str, object_store: ObjectStore) -> Self;
/// Update metadata by the action
async fn update(&self, action: Self::MetaAction) -> Result<ManifestVersion, Self::Error>;
@@ -37,6 +35,4 @@ pub trait Manifest: Send + Sync + Clone + 'static {
async fn load(&self) -> Result<Option<Self::Metadata>, Self::Error>;
async fn checkpoint(&self) -> Result<ManifestVersion, Self::Error>;
fn metadata_id(&self) -> Self::MetadataId;
}

View File

@@ -20,7 +20,7 @@ pub use self::descriptors::{
ColumnFamilyDescriptorBuilder, ColumnFamilyId, ColumnId, RegionDescriptor, RegionId,
RowKeyDescriptor, RowKeyDescriptorBuilder,
};
pub use self::engine::{EngineContext, StorageEngine};
pub use self::engine::{EngineContext, OpenOptions, StorageEngine};
pub use self::metadata::RegionMeta;
pub use self::region::{Region, WriteContext};
pub use self::requests::{GetRequest, PutOperation, ScanRequest, WriteRequest};

View File

@@ -2,15 +2,14 @@ use datatypes::value::Value;
use derive_builder::Builder;
use serde::{Deserialize, Serialize};
use crate::manifest::MetadataId;
use crate::storage::{consts, ColumnSchema, ConcreteDataType};
/// Id of column, unique in each region.
pub type ColumnId = u32;
/// Id of column family, unique in each region.
pub type ColumnFamilyId = u32;
/// Id of the region.
pub type RegionId = u32;
impl MetadataId for RegionId {}
/// Default region name prefix
pub const REGION_PREFIX: &str = "r_";

View File

@@ -21,6 +21,7 @@ pub trait StorageEngine: Send + Sync + Clone + 'static {
&self,
ctx: &EngineContext,
name: &str,
opts: &OpenOptions,
) -> Result<Self::Region, Self::Error>;
/// Closes given region.
@@ -60,3 +61,9 @@ pub trait StorageEngine: Send + Sync + Clone + 'static {
/// Storage engine context.
#[derive(Debug, Clone, Default)]
pub struct EngineContext {}
/// Options to open a region.
#[derive(Debug, Clone, Default)]
pub struct OpenOptions {
// TODO(yingwen): [open_region] Supports create if not exists.
}

View File

@@ -21,6 +21,7 @@
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use crate::storage::engine::OpenOptions;
use crate::storage::metadata::RegionMeta;
use crate::storage::requests::WriteRequest;
use crate::storage::responses::WriteResponse;
@@ -28,7 +29,7 @@ use crate::storage::snapshot::{ReadContext, Snapshot};
/// Chunks of rows in storage engine.
#[async_trait]
pub trait Region: Send + Sync + Clone + 'static {
pub trait Region: Send + Sync + Clone + std::fmt::Debug + 'static {
type Error: ErrorExt + Send + Sync;
type Meta: RegionMeta;
type WriteRequest: WriteRequest;
@@ -54,3 +55,9 @@ pub trait Region: Send + Sync + Clone + 'static {
/// Context for write operations.
#[derive(Debug, Clone, Default)]
pub struct WriteContext {}
impl From<&OpenOptions> for WriteContext {
fn from(_opts: &OpenOptions) -> WriteContext {
WriteContext::default()
}
}