From 690a21e636bb81466c48ae05686370feba070a72 Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Tue, 9 Aug 2022 20:57:43 +0800 Subject: [PATCH] feat: impl TableManifest and refactor table engine, object store etc. --- Cargo.lock | 5 + src/cmd/src/bin/greptime.rs | 2 +- src/datanode/Cargo.toml | 1 + src/datanode/src/datanode.rs | 27 +++ src/datanode/src/error.rs | 8 + src/datanode/src/instance.rs | 40 +++- src/datanode/src/sql.rs | 15 +- src/storage/src/config.rs | 58 +----- src/storage/src/engine.rs | 80 ++++---- src/storage/src/error.rs | 16 +- src/storage/src/lib.rs | 2 +- src/storage/src/manifest.rs | 4 + src/storage/src/manifest/action.rs | 58 ++---- src/storage/src/manifest/helper.rs | 33 ++++ src/storage/src/manifest/impl.rs | 174 +++++++++++++++++ src/storage/src/manifest/impl_.rs | 177 ++++++++++++++++++ src/storage/src/manifest/region.rs | 177 +----------------- src/storage/src/region/tests/flush.rs | 2 +- src/storage/src/test_util/config_util.rs | 5 +- src/store-api/src/manifest.rs | 17 +- src/store-api/src/manifest/action.rs | 7 + src/store-api/src/storage.rs | 2 +- src/store-api/src/storage/engine.rs | 13 +- src/table-engine/Cargo.toml | 4 + src/table-engine/src/config.rs | 4 + src/table-engine/src/engine.rs | 73 ++++++-- src/table-engine/src/error.rs | 1 - src/table-engine/src/lib.rs | 2 + src/table-engine/src/manifest.rs | 11 ++ src/table-engine/src/manifest/action.rs | 106 +++++++++++ src/table-engine/src/table.rs | 11 +- src/table-engine/src/table/test_util.rs | 35 +++- .../src/table/test_util/mock_engine.rs | 7 +- src/table/src/metadata.rs | 11 +- 34 files changed, 808 insertions(+), 380 deletions(-) create mode 100644 src/storage/src/manifest/helper.rs create mode 100644 src/storage/src/manifest/impl.rs create mode 100644 src/storage/src/manifest/impl_.rs create mode 100644 src/table-engine/src/config.rs create mode 100644 src/table-engine/src/manifest.rs create mode 100644 src/table-engine/src/manifest/action.rs diff --git a/Cargo.lock b/Cargo.lock index 2b09c5ae07..f6f851ccce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1182,6 +1182,7 @@ dependencies = [ "hyper", "log-store", "metrics 0.18.1", + "object-store", "query", "serde", "serde_json", @@ -3757,6 +3758,7 @@ dependencies = [ name = "table-engine" version = "0.1.0" dependencies = [ + "arc-swap", "async-stream", "async-trait", "chrono", @@ -3768,6 +3770,9 @@ dependencies = [ "datatypes", "futures", "log-store", + "object-store", + "serde", + "serde_json", "snafu", "storage", "store-api", diff --git a/src/cmd/src/bin/greptime.rs b/src/cmd/src/bin/greptime.rs index 26436e6af3..54140306b7 100644 --- a/src/cmd/src/bin/greptime.rs +++ b/src/cmd/src/bin/greptime.rs @@ -8,7 +8,7 @@ use common_telemetry::{self, logging::error, logging::info}; #[derive(Parser)] #[clap(name = "greptimedb")] struct Command { - #[clap(long, default_value = "/tmp/greptime/logs")] + #[clap(long, default_value = "/tmp/greptimedb/logs")] log_dir: String, #[clap(long, default_value = "info")] log_level: String, diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 1c2afbf6f6..3b48f8c5aa 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -18,6 +18,7 @@ datatypes = { path = "../datatypes"} hyper = { version = "0.14", features = ["full"] } log-store = { path = "../log-store" } metrics = "0.18" +object-store = { path = "../object-store" } query = { path = "../query" } serde = "1.0" serde_json = "1.0" diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index d4762091a9..b8c506c057 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -4,11 +4,37 @@ use crate::error::Result; use crate::instance::{Instance, InstanceRef}; use crate::server::Services; +#[derive(Debug, Clone)] +pub struct FileStoreConfig { + /// Storage path + pub store_dir: String, +} + +impl Default for FileStoreConfig { + fn default() -> Self { + Self { + store_dir: "/tmp/greptimedb/data/".to_string(), + } + } +} + +#[derive(Debug, Clone)] +pub enum ObjectStoreConfig { + File(FileStoreConfig), +} + +impl Default for ObjectStoreConfig { + fn default() -> Self { + ObjectStoreConfig::File(FileStoreConfig::default()) + } +} + #[derive(Clone, Debug)] pub struct DatanodeOptions { pub http_addr: String, pub rpc_addr: String, pub wal_dir: String, + pub store_config: ObjectStoreConfig, } impl Default for DatanodeOptions { @@ -17,6 +43,7 @@ impl Default for DatanodeOptions { http_addr: Default::default(), rpc_addr: Default::default(), wal_dir: "/tmp/wal".to_string(), + store_config: ObjectStoreConfig::default(), } } } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index de5c9da3e4..85aab1c57a 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -108,6 +108,13 @@ pub enum Error { #[snafu(display("Failed to storage engine, source: {}", source))] OpenStorageEngine { source: StorageError }, + + #[snafu(display("Failed to init backend, source: {}", source))] + InitBackend { + dir: String, + source: std::io::Error, + backtrace: Backtrace, + }, } pub type Result = std::result::Result; @@ -133,6 +140,7 @@ impl ErrorExt for Error { | Error::TcpBind { .. } | Error::StartGrpc { .. } | Error::CreateDir { .. } => StatusCode::Internal, + Error::InitBackend { .. } => StatusCode::StorageUnavailable, Error::OpenLogStore { source } => source.status_code(), Error::OpenStorageEngine { source } => source.status_code(), } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 33a53e969a..280cde2377 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -6,16 +6,18 @@ use common_telemetry::logging::info; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, Schema}; use log_store::fs::{config::LogConfig, log::LocalFileLogStore}; +use object_store::{backend::fs::Backend, util, ObjectStore}; use query::query_engine::{Output, QueryEngineFactory, QueryEngineRef}; use snafu::{OptionExt, ResultExt}; use sql::statements::statement::Statement; -use storage::{config::EngineConfig, EngineImpl}; +use storage::{config::EngineConfig as StorageEngineConfig, EngineImpl}; use table::engine::EngineContext; use table::engine::TableEngine; use table::requests::CreateTableRequest; +use table_engine::config::EngineConfig as TableEngineConfig; use table_engine::engine::MitoEngine; -use crate::datanode::DatanodeOptions; +use crate::datanode::{DatanodeOptions, ObjectStoreConfig}; use crate::error::{ self, CreateTableSnafu, ExecuteSqlSnafu, InsertSnafu, NewCatalogSnafu, Result, TableNotFoundSnafu, @@ -38,12 +40,18 @@ pub struct Instance { pub type InstanceRef = Arc; impl Instance { - pub async fn new(opts: &DatanodeOptions) -> Result { + pub async fn new(opts: &DatanodeOptions, catalog_list: CatalogListRef) -> Result { + let object_store = new_object_store(&opts.store_config).await?; let log_store = create_local_file_log_store(opts).await?; + let table_engine = DefaultEngine::new( - EngineImpl::new(EngineConfig::default(), Arc::new(log_store)) - .await - .context(error::OpenStorageEngineSnafu)?, + TableEngineConfig::default(), + EngineImpl::new( + StorageEngineConfig::default(), + Arc::new(log_store), + object_store.clone(), + ), + object_store, ); let catalog_manager = Arc::new( catalog::LocalCatalogManager::try_new(Arc::new(table_engine.clone())) @@ -166,6 +174,26 @@ impl Instance { } } +async fn new_object_store(store_config: &ObjectStoreConfig) -> Result { + // TODO(dennis): supports other backend + let store_dir = util::normalize_dir(match store_config { + ObjectStoreConfig::File(file) => &file.store_dir, + }); + + fs::create_dir_all(path::Path::new(&store_dir)) + .context(error::CreateDirSnafu { dir: &store_dir })?; + + info!("The storage directory is: {}", &store_dir); + + let accessor = Backend::build() + .root(&store_dir) + .finish() + .await + .context(error::InitBackendSnafu { dir: &store_dir })?; + + Ok(ObjectStore::new(accessor)) +} + async fn create_local_file_log_store(opts: &DatanodeOptions) -> Result { // create WAL directory fs::create_dir_all(path::Path::new(&opts.wal_dir)) diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index 10e45a5e12..13a5353b51 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -65,11 +65,13 @@ mod tests { use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use datatypes::value::Value; use log_store::fs::noop::NoopLogStore; + use object_store::{backend::fs::Backend, ObjectStore}; use query::QueryEngineFactory; - use storage::config::EngineConfig; + use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; use table::error::Result as TableResult; use table::{Table, TableRef}; + use table_engine::config::EngineConfig as TableEngineConfig; use table_engine::engine::MitoEngine; use tempdir::TempDir; @@ -138,6 +140,8 @@ mod tests { async fn test_statement_to_request() { let dir = TempDir::new("setup_test_engine_and_table").unwrap(); let store_dir = dir.path().to_string_lossy(); + let accessor = Backend::build().root(&store_dir).finish().await.unwrap(); + let object_store = ObjectStore::new(accessor); let catalog_list = catalog::memory::new_memory_catalog_list().unwrap(); let factory = QueryEngineFactory::new(catalog_list); @@ -149,12 +153,13 @@ mod tests { "#; let table_engine = MitoEngine::>::new( + TableEngineConfig::default(), EngineImpl::new( - EngineConfig::with_store_dir(&store_dir), + StorageEngineConfig::default(), Arc::new(NoopLogStore::default()), - ) - .await - .unwrap(), + object_store.clone(), + ), + object_store, ); let sql_handler = SqlHandler::new(table_engine); diff --git a/src/storage/src/config.rs b/src/storage/src/config.rs index 6294095aa2..b1adf325e7 100644 --- a/src/storage/src/config.rs +++ b/src/storage/src/config.rs @@ -1,56 +1,4 @@ -//! Engine config -#[derive(Debug, Clone)] -pub struct FileStoreConfig { - /// Storage path - pub store_dir: String, -} +//! storage engine config -impl Default for FileStoreConfig { - fn default() -> Self { - Self { - store_dir: "/tmp/greptimedb/".to_string(), - } - } -} - -#[derive(Debug, Clone)] -pub enum ObjectStoreConfig { - File(FileStoreConfig), -} - -impl Default for ObjectStoreConfig { - fn default() -> Self { - ObjectStoreConfig::File(FileStoreConfig::default()) - } -} - -#[derive(Debug, Clone, Default)] -pub struct EngineConfig { - pub store_config: ObjectStoreConfig, -} - -impl EngineConfig { - pub fn with_store_dir(store_dir: &str) -> Self { - Self { - store_config: ObjectStoreConfig::File(FileStoreConfig { - store_dir: store_dir.to_string(), - }), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_default_engine_config() { - let engine_config = EngineConfig::default(); - - let store_dir = match &engine_config.store_config { - ObjectStoreConfig::File(file) => &file.store_dir, - }; - - assert_eq!("/tmp/greptimedb/", store_dir); - } -} +#[derive(Debug, Default, Clone)] +pub struct EngineConfig {} diff --git a/src/storage/src/engine.rs b/src/storage/src/engine.rs index 0f774b839c..50f593a438 100644 --- a/src/storage/src/engine.rs +++ b/src/storage/src/engine.rs @@ -3,15 +3,15 @@ use std::sync::{Arc, RwLock}; use async_trait::async_trait; use common_telemetry::logging::info; -use object_store::{backend::fs::Backend, util, ObjectStore}; +use object_store::{util, ObjectStore}; use snafu::ResultExt; use store_api::{ logstore::LogStore, - storage::{EngineContext, OpenOptions, RegionDescriptor, StorageEngine}, + storage::{CreateOptions, EngineContext, OpenOptions, RegionDescriptor, StorageEngine}, }; use crate::background::JobPoolImpl; -use crate::config::{EngineConfig, ObjectStoreConfig}; +use crate::config::EngineConfig; use crate::error::{self, Error, Result}; use crate::flush::{FlushSchedulerImpl, FlushSchedulerRef, FlushStrategyRef, SizeBasedStrategy}; use crate::manifest::region::RegionManifest; @@ -55,8 +55,9 @@ impl StorageEngine for EngineImpl { &self, _ctx: &EngineContext, descriptor: RegionDescriptor, + opts: &CreateOptions, ) -> Result { - self.inner.create_region(descriptor).await + self.inner.create_region(descriptor, opts).await } async fn drop_region(&self, _ctx: &EngineContext, _region: Self::Region) -> Result<()> { @@ -69,36 +70,21 @@ impl StorageEngine for EngineImpl { } impl EngineImpl { - pub async fn new(config: EngineConfig, log_store: Arc) -> Result { - Ok(Self { - inner: Arc::new(EngineInner::new(config, log_store).await?), - }) + pub fn new(config: EngineConfig, log_store: Arc, object_store: ObjectStore) -> Self { + Self { + inner: Arc::new(EngineInner::new(config, log_store, object_store)), + } } } -async fn new_object_store(store_config: &ObjectStoreConfig) -> Result { - // TODO(dennis): supports other backend - let store_dir = util::normalize_dir(match store_config { - ObjectStoreConfig::File(file) => &file.store_dir, - }); - - let accessor = Backend::build() - .root(&store_dir) - .finish() - .await - .context(error::InitBackendSnafu { dir: &store_dir })?; - - Ok(ObjectStore::new(accessor)) +#[inline] +pub fn region_sst_dir(parent_dir: &str, region_name: &str) -> String { + format!("{}{}/", parent_dir, region_name) } #[inline] -pub fn region_sst_dir(region_name: &str) -> String { - format!("{}/", region_name) -} - -#[inline] -pub fn region_manifest_dir(region_name: &str) -> String { - format!("{}/manifest/", region_name) +pub fn region_manifest_dir(parent_dir: &str, region_name: &str) -> String { + format!("{}{}/manifest/", parent_dir, region_name) } /// A slot for region in the engine. @@ -209,19 +195,18 @@ struct EngineInner { } impl EngineInner { - pub async fn new(config: EngineConfig, log_store: Arc) -> Result { + pub fn new(_config: EngineConfig, log_store: Arc, object_store: ObjectStore) -> Self { let job_pool = Arc::new(JobPoolImpl {}); let flush_scheduler = Arc::new(FlushSchedulerImpl::new(job_pool)); - let object_store = new_object_store(&config.store_config).await?; - Ok(Self { + Self { object_store, log_store, regions: RwLock::new(Default::default()), memtable_builder: Arc::new(DefaultMemtableBuilder {}), flush_scheduler, flush_strategy: Arc::new(SizeBasedStrategy::default()), - }) + } } /// Returns the `Some(slot)` if there is existing slot with given `name`, or insert @@ -257,7 +242,7 @@ impl EngineInner { let mut guard = SlotGuard::new(name, &self.regions); // FIXME(yingwen): Get region id or remove dependency of region id. - let store_config = self.region_store_config(name); + let store_config = self.region_store_config(&opts.parent_dir, name); let region = match RegionImpl::open(name.to_string(), store_config, opts).await? { None => return Ok(None), @@ -268,7 +253,11 @@ impl EngineInner { Ok(Some(region)) } - async fn create_region(&self, descriptor: RegionDescriptor) -> Result> { + async fn create_region( + &self, + descriptor: RegionDescriptor, + opts: &CreateOptions, + ) -> Result> { if let Some(slot) = self.get_or_occupy_slot(&descriptor.name, RegionSlot::Creating) { return slot.try_get_ready_region(); } @@ -283,7 +272,7 @@ impl EngineInner { .context(error::InvalidRegionDescSnafu { region: ®ion_name, })?; - let store_config = self.region_store_config(®ion_name); + let store_config = self.region_store_config(&opts.parent_dir, ®ion_name); let region = RegionImpl::create(metadata, store_config).await?; @@ -299,10 +288,12 @@ impl EngineInner { slot.get_ready_region() } - fn region_store_config(&self, region_name: &str) -> StoreConfig { - let sst_dir = ®ion_sst_dir(region_name); + fn region_store_config(&self, parent_dir: &str, region_name: &str) -> StoreConfig { + let parent_dir = util::normalize_dir(parent_dir); + + let sst_dir = ®ion_sst_dir(&parent_dir, region_name); let sst_layer = Arc::new(FsAccessLayer::new(sst_dir, self.object_store.clone())); - let manifest_dir = region_manifest_dir(region_name); + let manifest_dir = region_manifest_dir(&parent_dir, region_name); let manifest = RegionManifest::new(&manifest_dir, self.object_store.clone()); StoreConfig { @@ -320,6 +311,7 @@ impl EngineInner { mod tests { use datatypes::type_id::LogicalTypeId; use log_store::test_util::log_store_util; + use object_store::backend::fs::Backend; use store_api::storage::Region; use tempdir::TempDir; @@ -332,9 +324,12 @@ mod tests { log_store_util::create_tmp_local_file_log_store("test_engine_wal").await; let dir = TempDir::new("test_create_new_region").unwrap(); let store_dir = dir.path().to_string_lossy(); - let config = EngineConfig::with_store_dir(&store_dir); + let accessor = Backend::build().root(&store_dir).finish().await.unwrap(); + let object_store = ObjectStore::new(accessor); - let engine = EngineImpl::new(config, Arc::new(log_store)).await.unwrap(); + let config = EngineConfig::default(); + + let engine = EngineImpl::new(config, Arc::new(log_store), object_store); let region_name = "region-0"; let desc = RegionDescBuilder::new(region_name) @@ -342,7 +337,10 @@ mod tests { .push_value_column(("v1", LogicalTypeId::Float32, true)) .build(); let ctx = EngineContext::default(); - let region = engine.create_region(&ctx, desc).await.unwrap(); + let region = engine + .create_region(&ctx, desc, &CreateOptions::default()) + .await + .unwrap(); assert_eq!(region_name, region.name()); diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index dd26c0402e..a445053574 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -13,7 +13,7 @@ use store_api::storage::SequenceNumber; use crate::metadata::Error as MetadataError; #[derive(Debug, Snafu)] -#[snafu(visibility(pub(crate)))] +#[snafu(visibility(pub))] pub enum Error { #[snafu(display("Invalid region descriptor, region: {}, source: {}", region, source))] InvalidRegionDesc { @@ -43,13 +43,6 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Failed to init backend, source: {}", source))] - InitBackend { - dir: String, - source: std::io::Error, - backtrace: Backtrace, - }, - #[snafu(display("Failed to write parquet file, source: {}", source))] WriteParquet { source: arrow::error::ArrowError, @@ -162,8 +155,8 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Failed to decode region action list, {}", msg))] - DecodeRegionMetaActionList { msg: String, backtrace: Backtrace }, + #[snafu(display("Failed to decode action list, {}", msg))] + DecodeMetaActionList { msg: String, backtrace: Backtrace }, #[snafu(display("Failed to read line, err: {}", source))] Readline { source: IoError }, @@ -249,7 +242,7 @@ impl ErrorExt for Error { | DecodeJson { .. } | JoinTask { .. } | Cancelled { .. } - | DecodeRegionMetaActionList { .. } + | DecodeMetaActionList { .. } | Readline { .. } | InvalidParquetSchema { .. } | SequenceColumnNotFound { .. } @@ -258,7 +251,6 @@ impl ErrorExt for Error { | SequenceNotMonotonic { .. } => StatusCode::Unexpected, FlushIo { .. } - | InitBackend { .. } | WriteParquet { .. } | ReadObject { .. } | WriteObject { .. } diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index 3464dfe61e..8b31f52122 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -8,7 +8,7 @@ pub mod config; mod engine; pub mod error; mod flush; -mod manifest; +pub mod manifest; pub mod memtable; pub mod metadata; mod proto; diff --git a/src/storage/src/manifest.rs b/src/storage/src/manifest.rs index 3c9ae3cabd..ada6d31b21 100644 --- a/src/storage/src/manifest.rs +++ b/src/storage/src/manifest.rs @@ -1,7 +1,11 @@ //! manifest storage pub(crate) mod action; pub(crate) mod checkpoint; +pub mod helper; +mod impl_; pub mod region; pub(crate) mod storage; #[cfg(test)] pub mod test_utils; + +pub use self::impl_::*; diff --git a/src/storage/src/manifest/action.rs b/src/storage/src/manifest/action.rs index 8b356afb28..6b8fd3de69 100644 --- a/src/storage/src/manifest/action.rs +++ b/src/storage/src/manifest/action.rs @@ -1,20 +1,19 @@ -use std::io::{BufRead, BufReader, Write}; +use std::io::{BufRead, BufReader}; use serde::{Deserialize, Serialize}; use serde_json as json; -use serde_json::ser::to_writer; use snafu::{ensure, OptionExt, ResultExt}; -use store_api::manifest::action::ProtocolAction; -use store_api::manifest::action::ProtocolVersion; +use store_api::manifest::action::{ProtocolAction, ProtocolVersion, VersionHeader}; use store_api::manifest::ManifestVersion; use store_api::manifest::MetaAction; use store_api::storage::RegionId; use store_api::storage::SequenceNumber; use crate::error::{ - DecodeJsonSnafu, DecodeRegionMetaActionListSnafu, EncodeJsonSnafu, - ManifestProtocolForbidReadSnafu, ReadlineSnafu, Result, + self, DecodeJsonSnafu, DecodeMetaActionListSnafu, ManifestProtocolForbidReadSnafu, + ReadlineSnafu, Result, }; +use crate::manifest::helper; use crate::metadata::{RegionMetadataRef, VersionNumber}; use crate::sst::FileMeta; @@ -50,13 +49,6 @@ pub enum RegionMetaAction { Edit(RegionEdit), } -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] -struct VersionHeader { - prev_version: ManifestVersion, -} - -const NEWLINE: &[u8] = b"\n"; - impl RegionMetaActionList { pub fn with_action(action: RegionMetaAction) -> Self { Self { @@ -71,31 +63,21 @@ impl RegionMetaActionList { prev_version: 0, } } +} - /// Encode self into json in the form of string lines, starts with prev_version and then action json list. - pub(crate) fn encode(&self) -> Result> { - let mut bytes = Vec::default(); +impl MetaAction for RegionMetaActionList { + type Error = error::Error; - { - // Encode prev_version - let v = VersionHeader { - prev_version: self.prev_version, - }; - - to_writer(&mut bytes, &v).context(EncodeJsonSnafu)?; - // unwrap is fine here, because we write into a buffer. - bytes.write_all(NEWLINE).unwrap(); - } - - for action in &self.actions { - to_writer(&mut bytes, action).context(EncodeJsonSnafu)?; - bytes.write_all(NEWLINE).unwrap(); - } - - Ok(bytes) + fn set_prev_version(&mut self, version: ManifestVersion) { + self.prev_version = version; } - pub(crate) fn decode( + /// Encode self into json in the form of string lines, starts with prev_version and then action json list. + fn encode(&self) -> Result> { + helper::encode_actions(self.prev_version, &self.actions) + } + + fn decode( bs: &[u8], reader_version: ProtocolVersion, ) -> Result<(Self, Option)> { @@ -109,7 +91,7 @@ impl RegionMetaActionList { { let first_line = lines .next() - .with_context(|| DecodeRegionMetaActionListSnafu { + .with_context(|| DecodeMetaActionListSnafu { msg: format!( "Invalid content in manifest: {}", std::str::from_utf8(bs).unwrap_or("**invalid bytes**") @@ -148,12 +130,6 @@ impl RegionMetaActionList { } } -impl MetaAction for RegionMetaActionList { - fn set_prev_version(&mut self, version: ManifestVersion) { - self.prev_version = version; - } -} - #[cfg(test)] mod tests { use common_telemetry::logging; diff --git a/src/storage/src/manifest/helper.rs b/src/storage/src/manifest/helper.rs new file mode 100644 index 0000000000..eb9a0f6b6b --- /dev/null +++ b/src/storage/src/manifest/helper.rs @@ -0,0 +1,33 @@ +use std::io::Write; + +use serde::Serialize; +use serde_json::to_writer; +use snafu::ResultExt; +use store_api::manifest::action::VersionHeader; +use store_api::manifest::ManifestVersion; + +use crate::error::{EncodeJsonSnafu, Result}; + +pub const NEWLINE: &[u8] = b"\n"; + +pub fn encode_actions( + prev_version: ManifestVersion, + actions: &[T], +) -> Result> { + let mut bytes = Vec::default(); + { + // Encode prev_version + let v = VersionHeader { prev_version }; + + to_writer(&mut bytes, &v).context(EncodeJsonSnafu)?; + // unwrap is fine here, because we write into a buffer. + bytes.write_all(NEWLINE).unwrap(); + } + + for action in actions { + to_writer(&mut bytes, action).context(EncodeJsonSnafu)?; + bytes.write_all(NEWLINE).unwrap(); + } + + Ok(bytes) +} diff --git a/src/storage/src/manifest/impl.rs b/src/storage/src/manifest/impl.rs new file mode 100644 index 0000000000..6cef4cf506 --- /dev/null +++ b/src/storage/src/manifest/impl.rs @@ -0,0 +1,174 @@ +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; + +use arc_swap::ArcSwap; +use async_trait::async_trait; +use common_telemetry::logging; +use object_store::ObjectStore; +use snafu::ensure; +use store_api::manifest::action::{self, ProtocolAction, ProtocolVersion}; +use store_api::manifest::*; + +use crate::error::{Error, ManifestProtocolForbidWriteSnafu, Result}; +use crate::manifest::action::*; +use crate::manifest::storage::ManifestObjectStore; +use crate::manifest::storage::ObjectStoreLogIterator; + +#[derive(Clone, Debug)] +pub struct RegionManifest { + inner: Arc, +} + +impl RegionManifest { + pub fn new(manifest_dir: &str, object_store: ObjectStore) -> Self { + RegionManifest { + inner: Arc::new(RegionManifestInner::new(manifest_dir, object_store)), + } + } + + /// Update inner state. + pub fn update_state(&self, version: ManifestVersion, protocol: Option) { + self.inner.update_state(version, protocol); + } +} + +#[async_trait] +impl Manifest for RegionManifest { + type Error = Error; + type MetaAction = RegionMetaActionList; + type MetaActionIterator = RegionMetaActionListIterator; + + async fn update(&self, action_list: RegionMetaActionList) -> Result { + self.inner.save(action_list).await + } + + async fn scan( + &self, + start: ManifestVersion, + end: ManifestVersion, + ) -> Result { + self.inner.scan(start, end).await + } + + async fn checkpoint(&self) -> Result { + unimplemented!(); + } + + fn last_version(&self) -> ManifestVersion { + self.inner.last_version() + } +} + +#[derive(Debug)] +struct RegionManifestInner { + store: Arc, + version: AtomicU64, + /// Current using protocol + protocol: ArcSwap, + /// Current node supported protocols (reader_version, writer_version) + supported_reader_version: ProtocolVersion, + supported_writer_version: ProtocolVersion, +} + +pub struct RegionMetaActionListIterator { + log_iter: ObjectStoreLogIterator, + reader_version: ProtocolVersion, + last_protocol: Option, +} + +impl RegionMetaActionListIterator { + pub fn last_protocol(&self) -> &Option { + &self.last_protocol + } +} + +#[async_trait] +impl MetaActionIterator for RegionMetaActionListIterator { + type Error = Error; + type MetaAction = RegionMetaActionList; + + async fn next_action(&mut self) -> Result> { + match self.log_iter.next_log().await? { + Some((v, bytes)) => { + let (action_list, protocol) = + RegionMetaActionList::decode(&bytes, self.reader_version)?; + + if protocol.is_some() { + self.last_protocol = protocol; + } + + Ok(Some((v, action_list))) + } + None => Ok(None), + } + } +} + +impl RegionManifestInner { + fn new(manifest_dir: &str, object_store: ObjectStore) -> Self { + let (reader_version, writer_version) = action::supported_protocol_version(); + + Self { + store: Arc::new(ManifestObjectStore::new(manifest_dir, object_store)), + version: AtomicU64::new(0), + protocol: ArcSwap::new(Arc::new(ProtocolAction::new())), + supported_reader_version: reader_version, + supported_writer_version: writer_version, + } + } + + #[inline] + fn inc_version(&self) -> ManifestVersion { + self.version.fetch_add(1, Ordering::Relaxed) + } + + fn update_state(&self, version: ManifestVersion, protocol: Option) { + self.version.store(version, Ordering::Relaxed); + if let Some(p) = protocol { + self.protocol.store(Arc::new(p)); + } + } + + #[inline] + fn last_version(&self) -> ManifestVersion { + self.version.load(Ordering::Relaxed) + } + + async fn save(&self, action_list: RegionMetaActionList) -> Result { + let protocol = self.protocol.load(); + + ensure!( + protocol.is_writable(self.supported_writer_version), + ManifestProtocolForbidWriteSnafu { + min_version: protocol.min_writer_version, + supported_version: self.supported_writer_version, + } + ); + + let version = self.inc_version(); + + logging::debug!( + "Save region metadata action: {:?}, version: {}", + action_list, + version + ); + + self.store.save(version, &action_list.encode()?).await?; + + Ok(version) + } + + async fn scan( + &self, + start: ManifestVersion, + end: ManifestVersion, + ) -> Result { + Ok(RegionMetaActionListIterator { + log_iter: self.store.scan(start, end).await?, + reader_version: self.supported_reader_version, + last_protocol: None, + }) + } +} diff --git a/src/storage/src/manifest/impl_.rs b/src/storage/src/manifest/impl_.rs new file mode 100644 index 0000000000..a185da44e9 --- /dev/null +++ b/src/storage/src/manifest/impl_.rs @@ -0,0 +1,177 @@ +use std::marker::PhantomData; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; + +use arc_swap::ArcSwap; +use async_trait::async_trait; +use common_telemetry::logging; +use object_store::ObjectStore; +use snafu::ensure; +use store_api::manifest::action::{self, ProtocolAction, ProtocolVersion}; +use store_api::manifest::*; + +use crate::error::{Error, ManifestProtocolForbidWriteSnafu, Result}; +use crate::manifest::storage::ManifestObjectStore; +use crate::manifest::storage::ObjectStoreLogIterator; + +#[derive(Clone, Debug)] +pub struct ManifestImpl> { + inner: Arc>, +} + +impl> ManifestImpl { + pub fn new(manifest_dir: &str, object_store: ObjectStore) -> Self { + ManifestImpl { + inner: Arc::new(ManifestImplInner::new(manifest_dir, object_store)), + } + } + + /// Update inner state. + pub fn update_state(&self, version: ManifestVersion, protocol: Option) { + self.inner.update_state(version, protocol); + } +} + +#[async_trait] +impl> Manifest for ManifestImpl { + type Error = Error; + type MetaAction = M; + type MetaActionIterator = MetaActionIteratorImpl; + + async fn update(&self, action_list: M) -> Result { + self.inner.save(action_list).await + } + + async fn scan( + &self, + start: ManifestVersion, + end: ManifestVersion, + ) -> Result { + self.inner.scan(start, end).await + } + + async fn checkpoint(&self) -> Result { + unimplemented!(); + } + + fn last_version(&self) -> ManifestVersion { + self.inner.last_version() + } +} + +#[derive(Debug)] +struct ManifestImplInner> { + store: Arc, + version: AtomicU64, + /// Current using protocol + protocol: ArcSwap, + /// Current node supported protocols (reader_version, writer_version) + supported_reader_version: ProtocolVersion, + supported_writer_version: ProtocolVersion, + _phantom: PhantomData, +} + +pub struct MetaActionIteratorImpl> { + log_iter: ObjectStoreLogIterator, + reader_version: ProtocolVersion, + last_protocol: Option, + _phantom: PhantomData, +} + +impl> MetaActionIteratorImpl { + pub fn last_protocol(&self) -> &Option { + &self.last_protocol + } +} + +#[async_trait] +impl> MetaActionIterator for MetaActionIteratorImpl { + type Error = Error; + type MetaAction = M; + + async fn next_action(&mut self) -> Result> { + match self.log_iter.next_log().await? { + Some((v, bytes)) => { + let (action_list, protocol) = M::decode(&bytes, self.reader_version)?; + + if protocol.is_some() { + self.last_protocol = protocol; + } + + Ok(Some((v, action_list))) + } + None => Ok(None), + } + } +} + +impl> ManifestImplInner { + fn new(manifest_dir: &str, object_store: ObjectStore) -> Self { + let (reader_version, writer_version) = action::supported_protocol_version(); + + Self { + store: Arc::new(ManifestObjectStore::new(manifest_dir, object_store)), + version: AtomicU64::new(0), + protocol: ArcSwap::new(Arc::new(ProtocolAction::new())), + supported_reader_version: reader_version, + supported_writer_version: writer_version, + _phantom: PhantomData, + } + } + + #[inline] + fn inc_version(&self) -> ManifestVersion { + self.version.fetch_add(1, Ordering::Relaxed) + } + + fn update_state(&self, version: ManifestVersion, protocol: Option) { + self.version.store(version, Ordering::Relaxed); + if let Some(p) = protocol { + self.protocol.store(Arc::new(p)); + } + } + + #[inline] + fn last_version(&self) -> ManifestVersion { + self.version.load(Ordering::Relaxed) + } + + async fn save(&self, action_list: M) -> Result { + let protocol = self.protocol.load(); + + ensure!( + protocol.is_writable(self.supported_writer_version), + ManifestProtocolForbidWriteSnafu { + min_version: protocol.min_writer_version, + supported_version: self.supported_writer_version, + } + ); + + let version = self.inc_version(); + + logging::debug!( + "Save region metadata action: {:?}, version: {}", + action_list, + version + ); + + self.store.save(version, &action_list.encode()?).await?; + + Ok(version) + } + + async fn scan( + &self, + start: ManifestVersion, + end: ManifestVersion, + ) -> Result> { + Ok(MetaActionIteratorImpl { + log_iter: self.store.scan(start, end).await?, + reader_version: self.supported_reader_version, + last_protocol: None, + _phantom: PhantomData, + }) + } +} diff --git a/src/storage/src/manifest/region.rs b/src/storage/src/manifest/region.rs index e3f83338f2..92aea2980e 100644 --- a/src/storage/src/manifest/region.rs +++ b/src/storage/src/manifest/region.rs @@ -1,182 +1,15 @@ //! Region manifest impl -use std::sync::{ - atomic::{AtomicU64, Ordering}, - Arc, -}; - -use arc_swap::ArcSwap; -use async_trait::async_trait; -use common_telemetry::logging; -use object_store::ObjectStore; -use snafu::ensure; -use store_api::manifest::action::{self, ProtocolAction, ProtocolVersion}; -use store_api::manifest::*; - -use crate::error::{Error, ManifestProtocolForbidWriteSnafu, Result}; use crate::manifest::action::*; -use crate::manifest::storage::ManifestObjectStore; -use crate::manifest::storage::ObjectStoreLogIterator; +use crate::manifest::ManifestImpl; -#[derive(Clone, Debug)] -pub struct RegionManifest { - inner: Arc, -} - -impl RegionManifest { - pub fn new(manifest_dir: &str, object_store: ObjectStore) -> Self { - RegionManifest { - inner: Arc::new(RegionManifestInner::new(manifest_dir, object_store)), - } - } - - /// Update inner state. - pub fn update_state(&self, version: ManifestVersion, protocol: Option) { - self.inner.update_state(version, protocol); - } -} - -#[async_trait] -impl Manifest for RegionManifest { - type Error = Error; - type MetaAction = RegionMetaActionList; - type MetaActionIterator = RegionMetaActionListIterator; - - async fn update(&self, action_list: RegionMetaActionList) -> Result { - self.inner.save(action_list).await - } - - async fn scan( - &self, - start: ManifestVersion, - end: ManifestVersion, - ) -> Result { - self.inner.scan(start, end).await - } - - async fn checkpoint(&self) -> Result { - unimplemented!(); - } - - fn last_version(&self) -> ManifestVersion { - self.inner.last_version() - } -} - -#[derive(Debug)] -struct RegionManifestInner { - store: Arc, - version: AtomicU64, - /// Current using protocol - protocol: ArcSwap, - /// Current node supported protocols (reader_version, writer_version) - supported_reader_version: ProtocolVersion, - supported_writer_version: ProtocolVersion, -} - -pub struct RegionMetaActionListIterator { - log_iter: ObjectStoreLogIterator, - reader_version: ProtocolVersion, - last_protocol: Option, -} - -impl RegionMetaActionListIterator { - pub fn last_protocol(&self) -> &Option { - &self.last_protocol - } -} - -#[async_trait] -impl MetaActionIterator for RegionMetaActionListIterator { - type Error = Error; - type MetaAction = RegionMetaActionList; - - async fn next_action(&mut self) -> Result> { - match self.log_iter.next_log().await? { - Some((v, bytes)) => { - let (action_list, protocol) = - RegionMetaActionList::decode(&bytes, self.reader_version)?; - - if protocol.is_some() { - self.last_protocol = protocol; - } - - Ok(Some((v, action_list))) - } - None => Ok(None), - } - } -} - -impl RegionManifestInner { - fn new(manifest_dir: &str, object_store: ObjectStore) -> Self { - let (reader_version, writer_version) = action::supported_protocol_version(); - - Self { - store: Arc::new(ManifestObjectStore::new(manifest_dir, object_store)), - version: AtomicU64::new(0), - protocol: ArcSwap::new(Arc::new(ProtocolAction::new())), - supported_reader_version: reader_version, - supported_writer_version: writer_version, - } - } - - #[inline] - fn inc_version(&self) -> ManifestVersion { - self.version.fetch_add(1, Ordering::Relaxed) - } - - fn update_state(&self, version: ManifestVersion, protocol: Option) { - self.version.store(version, Ordering::Relaxed); - if let Some(p) = protocol { - self.protocol.store(Arc::new(p)); - } - } - - #[inline] - fn last_version(&self) -> ManifestVersion { - self.version.load(Ordering::Relaxed) - } - - async fn save(&self, action_list: RegionMetaActionList) -> Result { - let protocol = self.protocol.load(); - - ensure!( - protocol.is_writable(self.supported_writer_version), - ManifestProtocolForbidWriteSnafu { - min_version: protocol.min_writer_version, - supported_version: self.supported_writer_version, - } - ); - - let version = self.inc_version(); - - logging::debug!( - "Save region metadata action: {:?}, version: {}", - action_list, - version - ); - - self.store.save(version, &action_list.encode()?).await?; - - Ok(version) - } - - async fn scan( - &self, - start: ManifestVersion, - end: ManifestVersion, - ) -> Result { - Ok(RegionMetaActionListIterator { - log_iter: self.store.scan(start, end).await?, - reader_version: self.supported_reader_version, - last_protocol: None, - }) - } -} +pub type RegionManifest = ManifestImpl; #[cfg(test)] mod tests { + use std::sync::Arc; + use object_store::{backend::fs, ObjectStore}; + use store_api::manifest::{Manifest, MetaActionIterator, MAX_VERSION}; use tempdir::TempDir; use super::*; diff --git a/src/storage/src/region/tests/flush.rs b/src/storage/src/region/tests/flush.rs index ef784779a9..50d4995783 100644 --- a/src/storage/src/region/tests/flush.rs +++ b/src/storage/src/region/tests/flush.rs @@ -103,7 +103,7 @@ async fn test_flush_and_stall() { tester.put(&data).await; // Check parquet files. - let sst_dir = format!("{}/{}", store_dir, engine::region_sst_dir(REGION_NAME)); + let sst_dir = format!("{}/{}", store_dir, engine::region_sst_dir("", REGION_NAME)); let mut has_parquet_file = false; for entry in std::fs::read_dir(sst_dir).unwrap() { let entry = entry.unwrap(); diff --git a/src/storage/src/test_util/config_util.rs b/src/storage/src/test_util/config_util.rs index 8be3399665..46fbfc004e 100644 --- a/src/storage/src/test_util/config_util.rs +++ b/src/storage/src/test_util/config_util.rs @@ -20,8 +20,9 @@ pub async fn new_store_config( region_name: &str, store_dir: &str, ) -> StoreConfig { - let sst_dir = engine::region_sst_dir(region_name); - let manifest_dir = engine::region_manifest_dir(region_name); + let parent_dir = ""; + let sst_dir = engine::region_sst_dir(parent_dir, region_name); + let manifest_dir = engine::region_manifest_dir(parent_dir, region_name); let accessor = Backend::build().root(store_dir).finish().await.unwrap(); let object_store = ObjectStore::new(accessor); diff --git a/src/store-api/src/manifest.rs b/src/store-api/src/manifest.rs index 095f2a3f91..f75f9718ae 100644 --- a/src/store-api/src/manifest.rs +++ b/src/store-api/src/manifest.rs @@ -6,14 +6,29 @@ use async_trait::async_trait; use common_error::ext::ErrorExt; use serde::{de::DeserializeOwned, Serialize}; +use crate::manifest::action::ProtocolAction; +use crate::manifest::action::ProtocolVersion; pub use crate::manifest::storage::*; pub type ManifestVersion = u64; pub const MIN_VERSION: u64 = 0; pub const MAX_VERSION: u64 = u64::MAX; -pub trait MetaAction: Serialize + DeserializeOwned { +pub trait MetaAction: Serialize + DeserializeOwned + Send + Sync + Clone + std::fmt::Debug { + type Error: ErrorExt + Send + Sync; + + /// Set previous valid manifest version. fn set_prev_version(&mut self, version: ManifestVersion); + + /// Encode this action into a byte vector + fn encode(&self) -> Result, Self::Error>; + + /// Decode self from byte slice with reader protocol version, + /// return error when reader version is not supported. + fn decode( + bs: &[u8], + reader_version: ProtocolVersion, + ) -> Result<(Self, Option), Self::Error>; } #[async_trait] diff --git a/src/store-api/src/manifest/action.rs b/src/store-api/src/manifest/action.rs index 89070d02ae..dc784962b3 100644 --- a/src/store-api/src/manifest/action.rs +++ b/src/store-api/src/manifest/action.rs @@ -1,6 +1,8 @@ ///! Common actions for manifest use serde::{Deserialize, Serialize}; +use crate::manifest::ManifestVersion; + pub type ProtocolVersion = u16; /// Current reader and writer versions @@ -23,6 +25,11 @@ pub struct ProtocolAction { pub min_writer_version: ProtocolVersion, } +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub struct VersionHeader { + pub prev_version: ManifestVersion, +} + impl Default for ProtocolAction { fn default() -> Self { let (min_reader_version, min_writer_version) = supported_protocol_version(); diff --git a/src/store-api/src/storage.rs b/src/store-api/src/storage.rs index 930c137d3a..8616c981b9 100644 --- a/src/store-api/src/storage.rs +++ b/src/store-api/src/storage.rs @@ -16,7 +16,7 @@ pub use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; pub use self::chunk::{Chunk, ChunkReader}; pub use self::descriptors::*; -pub use self::engine::{EngineContext, OpenOptions, StorageEngine}; +pub use self::engine::{CreateOptions, EngineContext, OpenOptions, StorageEngine}; pub use self::metadata::RegionMeta; pub use self::region::{Region, WriteContext}; pub use self::requests::{GetRequest, PutOperation, ScanRequest, WriteRequest}; diff --git a/src/store-api/src/storage/engine.rs b/src/store-api/src/storage/engine.rs index 2f6c6f9950..e9bdc27eaf 100644 --- a/src/store-api/src/storage/engine.rs +++ b/src/store-api/src/storage/engine.rs @@ -39,6 +39,7 @@ pub trait StorageEngine: Send + Sync + Clone + 'static { &self, ctx: &EngineContext, descriptor: RegionDescriptor, + opts: &CreateOptions, ) -> Result; /// Drops given region. @@ -62,6 +63,16 @@ pub trait StorageEngine: Send + Sync + Clone + 'static { #[derive(Debug, Clone, Default)] pub struct EngineContext {} +/// Options to create a region. +#[derive(Debug, Clone, Default)] +pub struct CreateOptions { + /// Region parent directory + pub parent_dir: String, +} + /// Options to open a region. #[derive(Debug, Clone, Default)] -pub struct OpenOptions {} +pub struct OpenOptions { + /// Region parent directory + pub parent_dir: String, +} diff --git a/src/table-engine/Cargo.toml b/src/table-engine/Cargo.toml index 8722a9c5a3..5d73710a21 100644 --- a/src/table-engine/Cargo.toml +++ b/src/table-engine/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +arc-swap = "1.0" async-stream = "0.3" async-trait = "0.1" chrono = { version = "0.4", features = ["serde"] } @@ -15,6 +16,9 @@ datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git" , b datatypes = { path = "../datatypes" } futures = "0.3" log-store = { path = "../log-store" } +object-store = { path = "../object-store" } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" snafu = { version = "0.7", features = ["backtraces"] } storage ={ path = "../storage" } store-api ={ path = "../store-api" } diff --git a/src/table-engine/src/config.rs b/src/table-engine/src/config.rs new file mode 100644 index 0000000000..335fce4069 --- /dev/null +++ b/src/table-engine/src/config.rs @@ -0,0 +1,4 @@ +//! Table Engine config + +#[derive(Debug, Clone, Default)] +pub struct EngineConfig {} diff --git a/src/table-engine/src/engine.rs b/src/table-engine/src/engine.rs index 884a0d84cf..a95a4c9cfc 100644 --- a/src/table-engine/src/engine.rs +++ b/src/table-engine/src/engine.rs @@ -6,11 +6,12 @@ use std::sync::RwLock; use async_trait::async_trait; use common_error::ext::BoxedError; use common_telemetry::logging; +use object_store::ObjectStore; use snafu::{OptionExt, ResultExt}; use store_api::storage::{ self, ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnId, - OpenOptions, Region, RegionDescriptorBuilder, RegionMeta, RowKeyDescriptor, - RowKeyDescriptorBuilder, StorageEngine, + CreateOptions, OpenOptions, Region, RegionDescriptorBuilder, RegionId, RegionMeta, + RowKeyDescriptor, RowKeyDescriptorBuilder, StorageEngine, }; use table::engine::{EngineContext, TableEngine}; use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest}; @@ -21,15 +22,32 @@ use table::{ }; use tokio::sync::Mutex; +use crate::config::EngineConfig; use crate::error::{ self, BuildColumnDescriptorSnafu, BuildColumnFamilyDescriptorSnafu, BuildRegionDescriptorSnafu, BuildRowKeyDescriptorSnafu, MissingTimestampIndexSnafu, Result, }; +use crate::manifest::TableManifest; use crate::table::MitoTable; pub const MITO_ENGINE: &str = "mito"; const INIT_COLUMN_ID: ColumnId = 0; +#[inline] +fn region_name(id: RegionId) -> String { + format!("{:010}", id) +} + +#[inline] +fn table_dir(table_name: &str) -> String { + format!("{}/", table_name) +} + +#[inline] +fn table_manifest_dir(table_name: &str) -> String { + format!("{}/manifest/", table_name) +} + /// [TableEngine] implementation. /// /// About mito . @@ -40,9 +58,9 @@ pub struct MitoEngine { } impl MitoEngine { - pub fn new(storage_engine: S) -> Self { + pub fn new(config: EngineConfig, storage_engine: S, object_store: ObjectStore) -> Self { Self { - inner: Arc::new(MitoEngineInner::new(storage_engine)), + inner: Arc::new(MitoEngineInner::new(config, storage_engine, object_store)), } } } @@ -100,6 +118,7 @@ struct MitoEngineInner { /// /// Writing to `tables` should also hold the `table_mutex`. tables: RwLock>, + object_store: ObjectStore, storage_engine: S, // FIXME(yingwen): Remove `next_table_id`. Table id should be assigned by other module (maybe catalog). next_table_id: AtomicU64, @@ -109,10 +128,11 @@ struct MitoEngineInner { } impl MitoEngineInner { - fn new(storage_engine: S) -> Self { + fn new(_config: EngineConfig, storage_engine: S, object_store: ObjectStore) -> Self { Self { tables: RwLock::new(HashMap::default()), storage_engine, + object_store, next_table_id: AtomicU64::new(0), table_mutex: Mutex::new(()), } @@ -246,10 +266,11 @@ impl MitoEngineInner { build_column_family_from_request(INIT_COLUMN_ID, &request)?; let (next_column_id, row_key) = build_row_key_desc_from_schema(next_column_id, &request)?; - // Now we just use table name as region name. TODO(yingwen): Naming pattern of region. - let region_name = table_name.clone(); + // TODO(dennis): supports multi regions; + let region_id = 0; + let region_name = region_name(region_id); let region_descriptor = RegionDescriptorBuilder::default() - .id(0) + .id(region_id) .name(®ion_name) .row_key(row_key) .default_cf(default_cf) @@ -265,9 +286,13 @@ impl MitoEngineInner { return Ok(table); } + let opts = CreateOptions { + parent_dir: table_dir(table_name), + }; + let region = self .storage_engine - .create_region(&storage::EngineContext::default(), region_descriptor) + .create_region(&storage::EngineContext::default(), region_descriptor, &opts) .await .map_err(BoxedError::new) .context(error::CreateRegionSnafu)?; @@ -289,10 +314,13 @@ impl MitoEngineInner { .build() .context(error::BuildTableInfoSnafu { table_name })?; + let manifest = + TableManifest::new(&table_manifest_dir(table_name), self.object_store.clone()); + //TODO(dennis): persist table info to table manifest service. logging::info!("Mito engine created table: {:?}.", table_info); - let table = Arc::new(MitoTable::new(table_info, region)); + let table = Arc::new(MitoTable::new(table_info, region, manifest)); self.tables .write() @@ -323,13 +351,17 @@ impl MitoEngineInner { } let engine_ctx = storage::EngineContext::default(); - let opts = OpenOptions::default(); - let region_name = table_name; - // Now we just use table name as region name. TODO(yingwen): Naming pattern of region. + let opts = OpenOptions { + parent_dir: table_dir(table_name), + }; + + // TODO(dennis): supports multi regions + let region_id = 0; + let region_name = region_name(region_id); let region = match self .storage_engine - .open_region(&engine_ctx, region_name, &opts) + .open_region(&engine_ctx, ®ion_name, &opts) .await .map_err(BoxedError::new) .context(error::OpenRegionSnafu { region_name })? @@ -353,8 +385,10 @@ impl MitoEngineInner { .table_type(TableType::Base) .build() .context(error::BuildTableInfoSnafu { table_name })?; + let manifest = + TableManifest::new(&table_manifest_dir(table_name), self.object_store.clone()); - let table = Arc::new(MitoTable::new(table_info, region)); + let table = Arc::new(MitoTable::new(table_info, region, manifest)); self.tables .write() @@ -447,8 +481,9 @@ mod tests { table_id: 0, }; - let (engine, table) = { - let (engine, table_engine, table) = test_util::setup_mock_engine_and_table().await; + let (engine, table, object_store) = { + let (engine, table_engine, table, object_store) = + test_util::setup_mock_engine_and_table().await; assert_eq!(MITO_ENGINE, table_engine.name()); // Now try to open the table again. let reopened = table_engine @@ -458,11 +493,11 @@ mod tests { .unwrap(); assert_eq!(table.schema(), reopened.schema()); - (engine, table) + (engine, table, object_store) }; // Construct a new table engine, and try to open the table. - let table_engine = MitoEngine::new(engine); + let table_engine = MitoEngine::new(EngineConfig::default(), engine, object_store); let reopened = table_engine .open_table(&ctx, open_req.clone()) .await diff --git a/src/table-engine/src/error.rs b/src/table-engine/src/error.rs index 740c415b04..5c217419dc 100644 --- a/src/table-engine/src/error.rs +++ b/src/table-engine/src/error.rs @@ -111,7 +111,6 @@ impl ErrorExt for Error { match self { CreateRegion { source, .. } | OpenRegion { source, .. } => source.status_code(), - BuildRowKeyDescriptor { .. } | BuildColumnDescriptor { .. } | BuildColumnFamilyDescriptor { .. } diff --git a/src/table-engine/src/lib.rs b/src/table-engine/src/lib.rs index bf9f81629f..462396858a 100644 --- a/src/table-engine/src/lib.rs +++ b/src/table-engine/src/lib.rs @@ -1,3 +1,5 @@ +pub mod config; pub mod engine; pub mod error; +mod manifest; pub mod table; diff --git a/src/table-engine/src/manifest.rs b/src/table-engine/src/manifest.rs new file mode 100644 index 0000000000..d131020560 --- /dev/null +++ b/src/table-engine/src/manifest.rs @@ -0,0 +1,11 @@ +//! Table manifest service +mod action; + +use storage::manifest::ManifestImpl; + +use crate::manifest::action::*; + +pub type TableManifest = ManifestImpl; + +#[cfg(test)] +mod tests {} diff --git a/src/table-engine/src/manifest/action.rs b/src/table-engine/src/manifest/action.rs new file mode 100644 index 0000000000..0e33486c2b --- /dev/null +++ b/src/table-engine/src/manifest/action.rs @@ -0,0 +1,106 @@ +use std::io::{BufRead, BufReader}; + +use serde::{Deserialize, Serialize}; +use serde_json as json; +use snafu::{ensure, OptionExt, ResultExt}; +use storage::error::{ + DecodeJsonSnafu, DecodeMetaActionListSnafu, Error as StorageError, + ManifestProtocolForbidReadSnafu, ReadlineSnafu, +}; +use storage::manifest::helper; +use store_api::manifest::action::{ProtocolAction, ProtocolVersion, VersionHeader}; +use store_api::manifest::ManifestVersion; +use store_api::manifest::MetaAction; +use table::metadata::{TableIdent, TableInfo}; + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub struct TableChange { + pub table_info: TableInfo, +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub struct TableRemove { + pub table_ident: TableIdent, + pub table_name: String, +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub enum TableMetaAction { + Protocol(ProtocolAction), + Change(TableChange), + Remove(TableRemove), +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub struct TableMetaActionList { + pub actions: Vec, + pub prev_version: ManifestVersion, +} + +impl MetaAction for TableMetaActionList { + type Error = StorageError; + + fn set_prev_version(&mut self, version: ManifestVersion) { + self.prev_version = version; + } + + fn encode(&self) -> Result, Self::Error> { + helper::encode_actions(self.prev_version, &self.actions) + } + + /// TODO(dennis): duplicated code with RegionMetaActionList::decode, try to refactor it. + fn decode( + bs: &[u8], + reader_version: ProtocolVersion, + ) -> Result<(Self, Option), Self::Error> { + let mut lines = BufReader::new(bs).lines(); + + let mut action_list = TableMetaActionList { + actions: Vec::default(), + prev_version: 0, + }; + + { + let first_line = lines + .next() + .with_context(|| DecodeMetaActionListSnafu { + msg: format!( + "Invalid content in manifest: {}", + std::str::from_utf8(bs).unwrap_or("**invalid bytes**") + ), + })? + .context(ReadlineSnafu)?; + + // Decode prev_version + let v: VersionHeader = json::from_str(&first_line).context(DecodeJsonSnafu)?; + action_list.prev_version = v.prev_version; + } + + // Decode actions + let mut protocol_action = None; + let mut actions = Vec::default(); + for line in lines { + let line = &line.context(ReadlineSnafu)?; + let action: TableMetaAction = json::from_str(line).context(DecodeJsonSnafu)?; + + if let TableMetaAction::Protocol(p) = &action { + ensure!( + p.is_readable(reader_version), + ManifestProtocolForbidReadSnafu { + min_version: p.min_reader_version, + supported_version: reader_version, + } + ); + protocol_action = Some(p.clone()); + } + + actions.push(action); + } + action_list.actions = actions; + + Ok((action_list, protocol_action)) + } +} + +#[cfg(test)] +mod tests {} diff --git a/src/table-engine/src/table.rs b/src/table-engine/src/table.rs index 2c91ad2f4d..3e0daca73b 100644 --- a/src/table-engine/src/table.rs +++ b/src/table-engine/src/table.rs @@ -22,8 +22,11 @@ use table::{ table::Table, }; +use crate::manifest::TableManifest; + /// [Table] implementation. pub struct MitoTable { + _manifest: TableManifest, table_info: TableInfo, //TODO(dennis): a table contains multi regions region: R, @@ -139,7 +142,11 @@ impl Stream for ChunkStream { } impl MitoTable { - pub fn new(table_info: TableInfo, region: R) -> Self { - Self { table_info, region } + pub fn new(table_info: TableInfo, region: R, manifest: TableManifest) -> Self { + Self { + table_info, + region, + _manifest: manifest, + } } } diff --git a/src/table-engine/src/table/test_util.rs b/src/table-engine/src/table/test_util.rs index f82dccecd9..dff33eb88f 100644 --- a/src/table-engine/src/table/test_util.rs +++ b/src/table-engine/src/table/test_util.rs @@ -6,7 +6,8 @@ use datatypes::prelude::ConcreteDataType; use datatypes::schema::SchemaRef; use datatypes::schema::{ColumnSchema, Schema}; use log_store::fs::noop::NoopLogStore; -use storage::config::EngineConfig; +use object_store::{backend::fs::Backend, ObjectStore}; +use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; use table::engine::EngineContext; use table::engine::TableEngine; @@ -14,6 +15,7 @@ use table::requests::CreateTableRequest; use table::TableRef; use tempdir::TempDir; +use crate::config::EngineConfig; use crate::engine::MitoEngine; use crate::table::test_util::mock_engine::MockEngine; @@ -32,22 +34,30 @@ fn schema_for_test() -> Schema { pub type MockMitoEngine = MitoEngine; +pub async fn new_test_object_store(prefix: &str) -> (TempDir, ObjectStore) { + let dir = TempDir::new(prefix).unwrap(); + let store_dir = dir.path().to_string_lossy(); + let accessor = Backend::build().root(&store_dir).finish().await.unwrap(); + + (dir, ObjectStore::new(accessor)) +} + pub async fn setup_test_engine_and_table() -> ( MitoEngine>, TableRef, SchemaRef, TempDir, ) { - let dir = TempDir::new("setup_test_engine_and_table").unwrap(); - let store_dir = dir.path().to_string_lossy(); + let (dir, object_store) = new_test_object_store("setup_test_engine_and_table").await; let table_engine = MitoEngine::new( + EngineConfig::default(), EngineImpl::new( - EngineConfig::with_store_dir(&store_dir), + StorageEngineConfig::default(), Arc::new(NoopLogStore::default()), - ) - .await - .unwrap(), + object_store.clone(), + ), + object_store, ); let schema = Arc::new(schema_for_test()); @@ -68,9 +78,14 @@ pub async fn setup_test_engine_and_table() -> ( (table_engine, table, schema, dir) } -pub async fn setup_mock_engine_and_table() -> (MockEngine, MockMitoEngine, TableRef) { +pub async fn setup_mock_engine_and_table() -> (MockEngine, MockMitoEngine, TableRef, ObjectStore) { let mock_engine = MockEngine::default(); - let table_engine = MitoEngine::new(mock_engine.clone()); + let (_dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await; + let table_engine = MitoEngine::new( + EngineConfig::default(), + mock_engine.clone(), + object_store.clone(), + ); let schema = Arc::new(schema_for_test()); let table = table_engine @@ -87,5 +102,5 @@ pub async fn setup_mock_engine_and_table() -> (MockEngine, MockMitoEngine, Table .await .unwrap(); - (mock_engine, table_engine, table) + (mock_engine, table_engine, table, object_store) } diff --git a/src/table-engine/src/table/test_util/mock_engine.rs b/src/table-engine/src/table/test_util/mock_engine.rs index a446e7be33..aacf2109d7 100644 --- a/src/table-engine/src/table/test_util/mock_engine.rs +++ b/src/table-engine/src/table/test_util/mock_engine.rs @@ -10,9 +10,9 @@ use common_telemetry::logging; use storage::metadata::{RegionMetaImpl, RegionMetadataRef}; use storage::write_batch::WriteBatch; use store_api::storage::{ - Chunk, ChunkReader, EngineContext, GetRequest, GetResponse, OpenOptions, ReadContext, Region, - RegionDescriptor, ScanRequest, ScanResponse, SchemaRef, Snapshot, StorageEngine, WriteContext, - WriteResponse, + Chunk, ChunkReader, CreateOptions, EngineContext, GetRequest, GetResponse, OpenOptions, + ReadContext, Region, RegionDescriptor, ScanRequest, ScanResponse, SchemaRef, Snapshot, + StorageEngine, WriteContext, WriteResponse, }; pub type Result = std::result::Result; @@ -153,6 +153,7 @@ impl StorageEngine for MockEngine { &self, _ctx: &EngineContext, descriptor: RegionDescriptor, + _opts: &CreateOptions, ) -> Result { logging::info!("Mock engine create region, descriptor: {:?}", descriptor); diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index 39b578d13f..bc7685ca04 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -3,6 +3,7 @@ use std::collections::HashMap; use chrono::{DateTime, Utc}; use datatypes::schema::SchemaRef; use derive_builder::Builder; +use serde::{Deserialize, Serialize}; use store_api::storage::ColumnId; pub type TableId = u64; @@ -10,7 +11,7 @@ pub type TableVersion = u64; /// Indicates whether and how a filter expression can be handled by a /// Table for table scans. -#[derive(Debug, Clone, PartialEq)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] pub enum FilterPushDownType { /// The expression cannot be used by the provider. Unsupported, @@ -26,7 +27,7 @@ pub enum FilterPushDownType { } /// Indicates the type of this table for metadata/catalog purposes. -#[derive(Debug, Clone, Copy, PartialEq)] +#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq)] pub enum TableType { /// An ordinary physical table. Base, @@ -36,13 +37,13 @@ pub enum TableType { Temporary, } -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq, Default)] +#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Default)] pub struct TableIdent { pub table_id: TableId, pub version: TableVersion, } -#[derive(Clone, Debug, Builder)] +#[derive(Serialize, Deserialize, Clone, Debug, Builder, PartialEq)] #[builder(pattern = "mutable")] pub struct TableMeta { pub schema: SchemaRef, @@ -90,7 +91,7 @@ impl TableMeta { } } -#[derive(Clone, Debug, Builder)] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Builder)] #[builder(pattern = "owned")] pub struct TableInfo { #[builder(default, setter(into))]