diff --git a/Cargo.lock b/Cargo.lock index 2b09c5ae07..f6f851ccce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1182,6 +1182,7 @@ dependencies = [ "hyper", "log-store", "metrics 0.18.1", + "object-store", "query", "serde", "serde_json", @@ -3757,6 +3758,7 @@ dependencies = [ name = "table-engine" version = "0.1.0" dependencies = [ + "arc-swap", "async-stream", "async-trait", "chrono", @@ -3768,6 +3770,9 @@ dependencies = [ "datatypes", "futures", "log-store", + "object-store", + "serde", + "serde_json", "snafu", "storage", "store-api", diff --git a/src/catalog/src/consts.rs b/src/catalog/src/consts.rs index f0307d162f..5e74470ae8 100644 --- a/src/catalog/src/consts.rs +++ b/src/catalog/src/consts.rs @@ -1,6 +1,6 @@ pub const SYSTEM_CATALOG_NAME: &str = "system"; pub const INFORMATION_SCHEMA_NAME: &str = "information_schema"; -pub const SYSTEM_CATALOG_TABLE_ID: u64 = 0; +pub const SYSTEM_CATALOG_TABLE_ID: u32 = 0; pub const SYSTEM_CATALOG_TABLE_NAME: &str = "system_catalog"; pub const DEFAULT_CATALOG_NAME: &str = "greptime"; pub const DEFAULT_SCHEMA_NAME: &str = "public"; diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index 6df6cc6d2a..406fcba870 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -8,6 +8,7 @@ use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; use table::engine::{EngineContext, TableEngineRef}; +use table::metadata::TableId; use table::requests::{CreateTableRequest, OpenTableRequest}; use table::{Table, TableRef}; @@ -65,6 +66,7 @@ impl SystemCatalogTable { } else { // system catalog table is not yet created, try to create let request = CreateTableRequest { + id: SYSTEM_CATALOG_TABLE_ID, name: SYSTEM_CATALOG_TABLE_NAME.to_string(), desc: Some("System catalog table".to_string()), schema: schema.clone(), @@ -219,12 +221,12 @@ pub struct TableEntry { pub catalog_name: String, pub schema_name: String, pub table_name: String, - pub table_id: u64, + pub table_id: TableId, } #[derive(Debug, Serialize, Deserialize, PartialEq)] pub struct TableEntryValue { - pub table_id: u64, + pub table_id: TableId, } #[cfg(test)] diff --git a/src/cmd/src/bin/greptime.rs b/src/cmd/src/bin/greptime.rs index 26436e6af3..54140306b7 100644 --- a/src/cmd/src/bin/greptime.rs +++ b/src/cmd/src/bin/greptime.rs @@ -8,7 +8,7 @@ use common_telemetry::{self, logging::error, logging::info}; #[derive(Parser)] #[clap(name = "greptimedb")] struct Command { - #[clap(long, default_value = "/tmp/greptime/logs")] + #[clap(long, default_value = "/tmp/greptimedb/logs")] log_dir: String, #[clap(long, default_value = "info")] log_level: String, diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 1c2afbf6f6..3b48f8c5aa 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -18,6 +18,7 @@ datatypes = { path = "../datatypes"} hyper = { version = "0.14", features = ["full"] } log-store = { path = "../log-store" } metrics = "0.18" +object-store = { path = "../object-store" } query = { path = "../query" } serde = "1.0" serde_json = "1.0" diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index d4762091a9..b8c506c057 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -4,11 +4,37 @@ use crate::error::Result; use crate::instance::{Instance, InstanceRef}; use crate::server::Services; +#[derive(Debug, Clone)] +pub struct FileStoreConfig { + /// Storage path + pub store_dir: String, +} + +impl Default for FileStoreConfig { + fn default() -> Self { + Self { + store_dir: "/tmp/greptimedb/data/".to_string(), + } + } +} + +#[derive(Debug, Clone)] +pub enum ObjectStoreConfig { + File(FileStoreConfig), +} + +impl Default for ObjectStoreConfig { + fn default() -> Self { + ObjectStoreConfig::File(FileStoreConfig::default()) + } +} + #[derive(Clone, Debug)] pub struct DatanodeOptions { pub http_addr: String, pub rpc_addr: String, pub wal_dir: String, + pub store_config: ObjectStoreConfig, } impl Default for DatanodeOptions { @@ -17,6 +43,7 @@ impl Default for DatanodeOptions { http_addr: Default::default(), rpc_addr: Default::default(), wal_dir: "/tmp/wal".to_string(), + store_config: ObjectStoreConfig::default(), } } } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index de5c9da3e4..12f7d30fbe 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -108,6 +108,13 @@ pub enum Error { #[snafu(display("Failed to storage engine, source: {}", source))] OpenStorageEngine { source: StorageError }, + + #[snafu(display("Failed to init backend, dir: {}, source: {}", dir, source))] + InitBackend { + dir: String, + source: std::io::Error, + backtrace: Backtrace, + }, } pub type Result = std::result::Result; @@ -133,6 +140,7 @@ impl ErrorExt for Error { | Error::TcpBind { .. } | Error::StartGrpc { .. } | Error::CreateDir { .. } => StatusCode::Internal, + Error::InitBackend { .. } => StatusCode::StorageUnavailable, Error::OpenLogStore { source } => source.status_code(), Error::OpenStorageEngine { source } => source.status_code(), } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 33a53e969a..c5c605ba85 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -6,16 +6,18 @@ use common_telemetry::logging::info; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, Schema}; use log_store::fs::{config::LogConfig, log::LocalFileLogStore}; +use object_store::{backend::fs::Backend, util, ObjectStore}; use query::query_engine::{Output, QueryEngineFactory, QueryEngineRef}; use snafu::{OptionExt, ResultExt}; use sql::statements::statement::Statement; -use storage::{config::EngineConfig, EngineImpl}; +use storage::{config::EngineConfig as StorageEngineConfig, EngineImpl}; use table::engine::EngineContext; use table::engine::TableEngine; use table::requests::CreateTableRequest; +use table_engine::config::EngineConfig as TableEngineConfig; use table_engine::engine::MitoEngine; -use crate::datanode::DatanodeOptions; +use crate::datanode::{DatanodeOptions, ObjectStoreConfig}; use crate::error::{ self, CreateTableSnafu, ExecuteSqlSnafu, InsertSnafu, NewCatalogSnafu, Result, TableNotFoundSnafu, @@ -39,11 +41,17 @@ pub type InstanceRef = Arc; impl Instance { pub async fn new(opts: &DatanodeOptions) -> Result { + let object_store = new_object_store(&opts.store_config).await?; let log_store = create_local_file_log_store(opts).await?; + let table_engine = DefaultEngine::new( - EngineImpl::new(EngineConfig::default(), Arc::new(log_store)) - .await - .context(error::OpenStorageEngineSnafu)?, + TableEngineConfig::default(), + EngineImpl::new( + StorageEngineConfig::default(), + Arc::new(log_store), + object_store.clone(), + ), + object_store, ); let catalog_manager = Arc::new( catalog::LocalCatalogManager::try_new(Arc::new(table_engine.clone())) @@ -138,6 +146,7 @@ impl Instance { .create_table( &EngineContext::default(), CreateTableRequest { + id: 1, name: table_name.to_string(), desc: Some(" a test table".to_string()), schema: Arc::new( @@ -166,6 +175,26 @@ impl Instance { } } +async fn new_object_store(store_config: &ObjectStoreConfig) -> Result { + // TODO(dennis): supports other backend + let store_dir = util::normalize_dir(match store_config { + ObjectStoreConfig::File(file) => &file.store_dir, + }); + + fs::create_dir_all(path::Path::new(&store_dir)) + .context(error::CreateDirSnafu { dir: &store_dir })?; + + info!("The storage directory is: {}", &store_dir); + + let accessor = Backend::build() + .root(&store_dir) + .finish() + .await + .context(error::InitBackendSnafu { dir: &store_dir })?; + + Ok(ObjectStore::new(accessor)) +} + async fn create_local_file_log_store(opts: &DatanodeOptions) -> Result { // create WAL directory fs::create_dir_all(path::Path::new(&opts.wal_dir)) @@ -196,7 +225,7 @@ mod tests { #[tokio::test] async fn test_execute_insert() { common_telemetry::init_default_ut_logging(); - let (opts, _tmp_dir) = test_util::create_tmp_dir_and_datanode_opts(); + let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts(); let instance = Instance::new(&opts).await.unwrap(); instance.start().await.unwrap(); @@ -215,7 +244,7 @@ mod tests { #[tokio::test] async fn test_execute_query() { - let (opts, _tmp_dir) = test_util::create_tmp_dir_and_datanode_opts(); + let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts(); let instance = Instance::new(&opts).await.unwrap(); instance.start().await.unwrap(); diff --git a/src/datanode/src/server/http/handler.rs b/src/datanode/src/server/http/handler.rs index 0c0d17f9ee..34380442b6 100644 --- a/src/datanode/src/server/http/handler.rs +++ b/src/datanode/src/server/http/handler.rs @@ -47,7 +47,7 @@ mod tests { use super::*; use crate::instance::Instance; use crate::server::http::JsonOutput; - use crate::test_util; + use crate::test_util::{self, TestGuard}; fn create_params() -> Query> { let mut map = HashMap::new(); @@ -58,16 +58,16 @@ mod tests { Query(map) } - async fn create_extension() -> Extension { - let (opts, _tmp_dir) = test_util::create_tmp_dir_and_datanode_opts(); + async fn create_extension() -> (Extension, TestGuard) { + let (opts, guard) = test_util::create_tmp_dir_and_datanode_opts(); let instance = Arc::new(Instance::new(&opts).await.unwrap()); instance.start().await.unwrap(); - Extension(instance) + (Extension(instance), guard) } #[tokio::test] async fn test_sql_not_provided() { - let extension = create_extension().await; + let (extension, _guard) = create_extension().await; let json = sql(extension, Query(HashMap::default())).await; match json { @@ -84,7 +84,7 @@ mod tests { async fn test_sql_output_rows() { common_telemetry::init_default_ut_logging(); let query = create_params(); - let extension = create_extension().await; + let (extension, _guard) = create_extension().await; let json = sql(extension, query).await; @@ -112,7 +112,7 @@ mod tests { counter!("test_metrics", 1); let query = create_params(); - let extension = create_extension().await; + let (extension, _guard) = create_extension().await; let text = metrics(extension, query).await; match text { diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index 10e45a5e12..13a5353b51 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -65,11 +65,13 @@ mod tests { use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use datatypes::value::Value; use log_store::fs::noop::NoopLogStore; + use object_store::{backend::fs::Backend, ObjectStore}; use query::QueryEngineFactory; - use storage::config::EngineConfig; + use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; use table::error::Result as TableResult; use table::{Table, TableRef}; + use table_engine::config::EngineConfig as TableEngineConfig; use table_engine::engine::MitoEngine; use tempdir::TempDir; @@ -138,6 +140,8 @@ mod tests { async fn test_statement_to_request() { let dir = TempDir::new("setup_test_engine_and_table").unwrap(); let store_dir = dir.path().to_string_lossy(); + let accessor = Backend::build().root(&store_dir).finish().await.unwrap(); + let object_store = ObjectStore::new(accessor); let catalog_list = catalog::memory::new_memory_catalog_list().unwrap(); let factory = QueryEngineFactory::new(catalog_list); @@ -149,12 +153,13 @@ mod tests { "#; let table_engine = MitoEngine::>::new( + TableEngineConfig::default(), EngineImpl::new( - EngineConfig::with_store_dir(&store_dir), + StorageEngineConfig::default(), Arc::new(NoopLogStore::default()), - ) - .await - .unwrap(), + object_store.clone(), + ), + object_store, ); let sql_handler = SqlHandler::new(table_engine); diff --git a/src/datanode/src/test_util.rs b/src/datanode/src/test_util.rs index 6aee7d3bfa..6b3d1513d3 100644 --- a/src/datanode/src/test_util.rs +++ b/src/datanode/src/test_util.rs @@ -1,17 +1,32 @@ use tempdir::TempDir; -use crate::datanode::DatanodeOptions; +use crate::datanode::{DatanodeOptions, FileStoreConfig, ObjectStoreConfig}; /// Create a tmp dir(will be deleted once it goes out of scope.) and a default `DatanodeOptions`, /// Only for test. /// /// TODO: Add a test feature -pub fn create_tmp_dir_and_datanode_opts() -> (DatanodeOptions, TempDir) { - let tmp_dir = TempDir::new("/tmp/greptimedb_test").unwrap(); +pub struct TestGuard { + _wal_tmp_dir: TempDir, + _data_tmp_dir: TempDir, +} + +pub fn create_tmp_dir_and_datanode_opts() -> (DatanodeOptions, TestGuard) { + let wal_tmp_dir = TempDir::new("/tmp/greptimedb_test_wal").unwrap(); + let data_tmp_dir = TempDir::new("/tmp/greptimedb_test_data").unwrap(); let opts = DatanodeOptions { - wal_dir: tmp_dir.path().to_str().unwrap().to_string(), + wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(), + store_config: ObjectStoreConfig::File(FileStoreConfig { + store_dir: data_tmp_dir.path().to_str().unwrap().to_string(), + }), ..Default::default() }; - (opts, tmp_dir) + ( + opts, + TestGuard { + _wal_tmp_dir: wal_tmp_dir, + _data_tmp_dir: data_tmp_dir, + }, + ) } diff --git a/src/datanode/src/tests/http_test.rs b/src/datanode/src/tests/http_test.rs index eabc4c9adb..8db7cc11af 100644 --- a/src/datanode/src/tests/http_test.rs +++ b/src/datanode/src/tests/http_test.rs @@ -8,20 +8,20 @@ use axum_test_helper::TestClient; use crate::instance::Instance; use crate::server::http::HttpServer; -use crate::test_util; +use crate::test_util::{self, TestGuard}; -async fn make_test_app() -> Router { - let (opts, _tmp_dir) = test_util::create_tmp_dir_and_datanode_opts(); +async fn make_test_app() -> (Router, TestGuard) { + let (opts, guard) = test_util::create_tmp_dir_and_datanode_opts(); let instance = Arc::new(Instance::new(&opts).await.unwrap()); instance.start().await.unwrap(); let http_server = HttpServer::new(instance); - http_server.make_app() + (http_server.make_app(), guard) } #[tokio::test] async fn test_sql_api() { common_telemetry::init_default_ut_logging(); - let app = make_test_app().await; + let (app, _guard) = make_test_app().await; let client = TestClient::new(app); let res = client.get("/sql").send().await; assert_eq!(res.status(), StatusCode::OK); @@ -49,7 +49,7 @@ async fn test_sql_api() { async fn test_metrics_api() { common_telemetry::init_default_ut_logging(); common_telemetry::init_default_metrics_recorder(); - let app = make_test_app().await; + let (app, _guard) = make_test_app().await; let client = TestClient::new(app); // Send a sql diff --git a/src/storage/src/config.rs b/src/storage/src/config.rs index 6294095aa2..b1adf325e7 100644 --- a/src/storage/src/config.rs +++ b/src/storage/src/config.rs @@ -1,56 +1,4 @@ -//! Engine config -#[derive(Debug, Clone)] -pub struct FileStoreConfig { - /// Storage path - pub store_dir: String, -} +//! storage engine config -impl Default for FileStoreConfig { - fn default() -> Self { - Self { - store_dir: "/tmp/greptimedb/".to_string(), - } - } -} - -#[derive(Debug, Clone)] -pub enum ObjectStoreConfig { - File(FileStoreConfig), -} - -impl Default for ObjectStoreConfig { - fn default() -> Self { - ObjectStoreConfig::File(FileStoreConfig::default()) - } -} - -#[derive(Debug, Clone, Default)] -pub struct EngineConfig { - pub store_config: ObjectStoreConfig, -} - -impl EngineConfig { - pub fn with_store_dir(store_dir: &str) -> Self { - Self { - store_config: ObjectStoreConfig::File(FileStoreConfig { - store_dir: store_dir.to_string(), - }), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_default_engine_config() { - let engine_config = EngineConfig::default(); - - let store_dir = match &engine_config.store_config { - ObjectStoreConfig::File(file) => &file.store_dir, - }; - - assert_eq!("/tmp/greptimedb/", store_dir); - } -} +#[derive(Debug, Default, Clone)] +pub struct EngineConfig {} diff --git a/src/storage/src/engine.rs b/src/storage/src/engine.rs index 0f774b839c..39dd931319 100644 --- a/src/storage/src/engine.rs +++ b/src/storage/src/engine.rs @@ -3,15 +3,15 @@ use std::sync::{Arc, RwLock}; use async_trait::async_trait; use common_telemetry::logging::info; -use object_store::{backend::fs::Backend, util, ObjectStore}; +use object_store::{util, ObjectStore}; use snafu::ResultExt; use store_api::{ logstore::LogStore, - storage::{EngineContext, OpenOptions, RegionDescriptor, StorageEngine}, + storage::{CreateOptions, EngineContext, OpenOptions, RegionDescriptor, StorageEngine}, }; use crate::background::JobPoolImpl; -use crate::config::{EngineConfig, ObjectStoreConfig}; +use crate::config::EngineConfig; use crate::error::{self, Error, Result}; use crate::flush::{FlushSchedulerImpl, FlushSchedulerRef, FlushStrategyRef, SizeBasedStrategy}; use crate::manifest::region::RegionManifest; @@ -55,8 +55,9 @@ impl StorageEngine for EngineImpl { &self, _ctx: &EngineContext, descriptor: RegionDescriptor, + opts: &CreateOptions, ) -> Result { - self.inner.create_region(descriptor).await + self.inner.create_region(descriptor, opts).await } async fn drop_region(&self, _ctx: &EngineContext, _region: Self::Region) -> Result<()> { @@ -69,36 +70,25 @@ impl StorageEngine for EngineImpl { } impl EngineImpl { - pub async fn new(config: EngineConfig, log_store: Arc) -> Result { - Ok(Self { - inner: Arc::new(EngineInner::new(config, log_store).await?), - }) + pub fn new(config: EngineConfig, log_store: Arc, object_store: ObjectStore) -> Self { + Self { + inner: Arc::new(EngineInner::new(config, log_store, object_store)), + } } } -async fn new_object_store(store_config: &ObjectStoreConfig) -> Result { - // TODO(dennis): supports other backend - let store_dir = util::normalize_dir(match store_config { - ObjectStoreConfig::File(file) => &file.store_dir, - }); - - let accessor = Backend::build() - .root(&store_dir) - .finish() - .await - .context(error::InitBackendSnafu { dir: &store_dir })?; - - Ok(ObjectStore::new(accessor)) +/// Generate region sst path, +/// parent_dir is resolved in function `region_store_config` to ensure it's ended with '/'. +#[inline] +pub fn region_sst_dir(parent_dir: &str, region_name: &str) -> String { + format!("{}{}/", parent_dir, region_name) } +/// Generate region manifest path, +/// parent_dir is resolved in function `region_store_config` to ensure it's ended with '/'. #[inline] -pub fn region_sst_dir(region_name: &str) -> String { - format!("{}/", region_name) -} - -#[inline] -pub fn region_manifest_dir(region_name: &str) -> String { - format!("{}/manifest/", region_name) +pub fn region_manifest_dir(parent_dir: &str, region_name: &str) -> String { + format!("{}{}/manifest/", parent_dir, region_name) } /// A slot for region in the engine. @@ -209,19 +199,18 @@ struct EngineInner { } impl EngineInner { - pub async fn new(config: EngineConfig, log_store: Arc) -> Result { + pub fn new(_config: EngineConfig, log_store: Arc, object_store: ObjectStore) -> Self { let job_pool = Arc::new(JobPoolImpl {}); let flush_scheduler = Arc::new(FlushSchedulerImpl::new(job_pool)); - let object_store = new_object_store(&config.store_config).await?; - Ok(Self { + Self { object_store, log_store, regions: RwLock::new(Default::default()), memtable_builder: Arc::new(DefaultMemtableBuilder {}), flush_scheduler, flush_strategy: Arc::new(SizeBasedStrategy::default()), - }) + } } /// Returns the `Some(slot)` if there is existing slot with given `name`, or insert @@ -256,8 +245,7 @@ impl EngineInner { let mut guard = SlotGuard::new(name, &self.regions); - // FIXME(yingwen): Get region id or remove dependency of region id. - let store_config = self.region_store_config(name); + let store_config = self.region_store_config(&opts.parent_dir, name); let region = match RegionImpl::open(name.to_string(), store_config, opts).await? { None => return Ok(None), @@ -268,7 +256,11 @@ impl EngineInner { Ok(Some(region)) } - async fn create_region(&self, descriptor: RegionDescriptor) -> Result> { + async fn create_region( + &self, + descriptor: RegionDescriptor, + opts: &CreateOptions, + ) -> Result> { if let Some(slot) = self.get_or_occupy_slot(&descriptor.name, RegionSlot::Creating) { return slot.try_get_ready_region(); } @@ -283,7 +275,7 @@ impl EngineInner { .context(error::InvalidRegionDescSnafu { region: ®ion_name, })?; - let store_config = self.region_store_config(®ion_name); + let store_config = self.region_store_config(&opts.parent_dir, ®ion_name); let region = RegionImpl::create(metadata, store_config).await?; @@ -299,10 +291,12 @@ impl EngineInner { slot.get_ready_region() } - fn region_store_config(&self, region_name: &str) -> StoreConfig { - let sst_dir = ®ion_sst_dir(region_name); + fn region_store_config(&self, parent_dir: &str, region_name: &str) -> StoreConfig { + let parent_dir = util::normalize_dir(parent_dir); + + let sst_dir = ®ion_sst_dir(&parent_dir, region_name); let sst_layer = Arc::new(FsAccessLayer::new(sst_dir, self.object_store.clone())); - let manifest_dir = region_manifest_dir(region_name); + let manifest_dir = region_manifest_dir(&parent_dir, region_name); let manifest = RegionManifest::new(&manifest_dir, self.object_store.clone()); StoreConfig { @@ -320,6 +314,7 @@ impl EngineInner { mod tests { use datatypes::type_id::LogicalTypeId; use log_store::test_util::log_store_util; + use object_store::backend::fs::Backend; use store_api::storage::Region; use tempdir::TempDir; @@ -332,9 +327,12 @@ mod tests { log_store_util::create_tmp_local_file_log_store("test_engine_wal").await; let dir = TempDir::new("test_create_new_region").unwrap(); let store_dir = dir.path().to_string_lossy(); - let config = EngineConfig::with_store_dir(&store_dir); + let accessor = Backend::build().root(&store_dir).finish().await.unwrap(); + let object_store = ObjectStore::new(accessor); - let engine = EngineImpl::new(config, Arc::new(log_store)).await.unwrap(); + let config = EngineConfig::default(); + + let engine = EngineImpl::new(config, Arc::new(log_store), object_store); let region_name = "region-0"; let desc = RegionDescBuilder::new(region_name) @@ -342,7 +340,10 @@ mod tests { .push_value_column(("v1", LogicalTypeId::Float32, true)) .build(); let ctx = EngineContext::default(); - let region = engine.create_region(&ctx, desc).await.unwrap(); + let region = engine + .create_region(&ctx, desc, &CreateOptions::default()) + .await + .unwrap(); assert_eq!(region_name, region.name()); diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index dd26c0402e..a445053574 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -13,7 +13,7 @@ use store_api::storage::SequenceNumber; use crate::metadata::Error as MetadataError; #[derive(Debug, Snafu)] -#[snafu(visibility(pub(crate)))] +#[snafu(visibility(pub))] pub enum Error { #[snafu(display("Invalid region descriptor, region: {}, source: {}", region, source))] InvalidRegionDesc { @@ -43,13 +43,6 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Failed to init backend, source: {}", source))] - InitBackend { - dir: String, - source: std::io::Error, - backtrace: Backtrace, - }, - #[snafu(display("Failed to write parquet file, source: {}", source))] WriteParquet { source: arrow::error::ArrowError, @@ -162,8 +155,8 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Failed to decode region action list, {}", msg))] - DecodeRegionMetaActionList { msg: String, backtrace: Backtrace }, + #[snafu(display("Failed to decode action list, {}", msg))] + DecodeMetaActionList { msg: String, backtrace: Backtrace }, #[snafu(display("Failed to read line, err: {}", source))] Readline { source: IoError }, @@ -249,7 +242,7 @@ impl ErrorExt for Error { | DecodeJson { .. } | JoinTask { .. } | Cancelled { .. } - | DecodeRegionMetaActionList { .. } + | DecodeMetaActionList { .. } | Readline { .. } | InvalidParquetSchema { .. } | SequenceColumnNotFound { .. } @@ -258,7 +251,6 @@ impl ErrorExt for Error { | SequenceNotMonotonic { .. } => StatusCode::Unexpected, FlushIo { .. } - | InitBackend { .. } | WriteParquet { .. } | ReadObject { .. } | WriteObject { .. } diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index 3464dfe61e..8b31f52122 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -8,7 +8,7 @@ pub mod config; mod engine; pub mod error; mod flush; -mod manifest; +pub mod manifest; pub mod memtable; pub mod metadata; mod proto; diff --git a/src/storage/src/manifest.rs b/src/storage/src/manifest.rs index 3c9ae3cabd..ada6d31b21 100644 --- a/src/storage/src/manifest.rs +++ b/src/storage/src/manifest.rs @@ -1,7 +1,11 @@ //! manifest storage pub(crate) mod action; pub(crate) mod checkpoint; +pub mod helper; +mod impl_; pub mod region; pub(crate) mod storage; #[cfg(test)] pub mod test_utils; + +pub use self::impl_::*; diff --git a/src/storage/src/manifest/action.rs b/src/storage/src/manifest/action.rs index 8b356afb28..6b8fd3de69 100644 --- a/src/storage/src/manifest/action.rs +++ b/src/storage/src/manifest/action.rs @@ -1,20 +1,19 @@ -use std::io::{BufRead, BufReader, Write}; +use std::io::{BufRead, BufReader}; use serde::{Deserialize, Serialize}; use serde_json as json; -use serde_json::ser::to_writer; use snafu::{ensure, OptionExt, ResultExt}; -use store_api::manifest::action::ProtocolAction; -use store_api::manifest::action::ProtocolVersion; +use store_api::manifest::action::{ProtocolAction, ProtocolVersion, VersionHeader}; use store_api::manifest::ManifestVersion; use store_api::manifest::MetaAction; use store_api::storage::RegionId; use store_api::storage::SequenceNumber; use crate::error::{ - DecodeJsonSnafu, DecodeRegionMetaActionListSnafu, EncodeJsonSnafu, - ManifestProtocolForbidReadSnafu, ReadlineSnafu, Result, + self, DecodeJsonSnafu, DecodeMetaActionListSnafu, ManifestProtocolForbidReadSnafu, + ReadlineSnafu, Result, }; +use crate::manifest::helper; use crate::metadata::{RegionMetadataRef, VersionNumber}; use crate::sst::FileMeta; @@ -50,13 +49,6 @@ pub enum RegionMetaAction { Edit(RegionEdit), } -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] -struct VersionHeader { - prev_version: ManifestVersion, -} - -const NEWLINE: &[u8] = b"\n"; - impl RegionMetaActionList { pub fn with_action(action: RegionMetaAction) -> Self { Self { @@ -71,31 +63,21 @@ impl RegionMetaActionList { prev_version: 0, } } +} - /// Encode self into json in the form of string lines, starts with prev_version and then action json list. - pub(crate) fn encode(&self) -> Result> { - let mut bytes = Vec::default(); +impl MetaAction for RegionMetaActionList { + type Error = error::Error; - { - // Encode prev_version - let v = VersionHeader { - prev_version: self.prev_version, - }; - - to_writer(&mut bytes, &v).context(EncodeJsonSnafu)?; - // unwrap is fine here, because we write into a buffer. - bytes.write_all(NEWLINE).unwrap(); - } - - for action in &self.actions { - to_writer(&mut bytes, action).context(EncodeJsonSnafu)?; - bytes.write_all(NEWLINE).unwrap(); - } - - Ok(bytes) + fn set_prev_version(&mut self, version: ManifestVersion) { + self.prev_version = version; } - pub(crate) fn decode( + /// Encode self into json in the form of string lines, starts with prev_version and then action json list. + fn encode(&self) -> Result> { + helper::encode_actions(self.prev_version, &self.actions) + } + + fn decode( bs: &[u8], reader_version: ProtocolVersion, ) -> Result<(Self, Option)> { @@ -109,7 +91,7 @@ impl RegionMetaActionList { { let first_line = lines .next() - .with_context(|| DecodeRegionMetaActionListSnafu { + .with_context(|| DecodeMetaActionListSnafu { msg: format!( "Invalid content in manifest: {}", std::str::from_utf8(bs).unwrap_or("**invalid bytes**") @@ -148,12 +130,6 @@ impl RegionMetaActionList { } } -impl MetaAction for RegionMetaActionList { - fn set_prev_version(&mut self, version: ManifestVersion) { - self.prev_version = version; - } -} - #[cfg(test)] mod tests { use common_telemetry::logging; diff --git a/src/storage/src/manifest/helper.rs b/src/storage/src/manifest/helper.rs new file mode 100644 index 0000000000..eb9a0f6b6b --- /dev/null +++ b/src/storage/src/manifest/helper.rs @@ -0,0 +1,33 @@ +use std::io::Write; + +use serde::Serialize; +use serde_json::to_writer; +use snafu::ResultExt; +use store_api::manifest::action::VersionHeader; +use store_api::manifest::ManifestVersion; + +use crate::error::{EncodeJsonSnafu, Result}; + +pub const NEWLINE: &[u8] = b"\n"; + +pub fn encode_actions( + prev_version: ManifestVersion, + actions: &[T], +) -> Result> { + let mut bytes = Vec::default(); + { + // Encode prev_version + let v = VersionHeader { prev_version }; + + to_writer(&mut bytes, &v).context(EncodeJsonSnafu)?; + // unwrap is fine here, because we write into a buffer. + bytes.write_all(NEWLINE).unwrap(); + } + + for action in actions { + to_writer(&mut bytes, action).context(EncodeJsonSnafu)?; + bytes.write_all(NEWLINE).unwrap(); + } + + Ok(bytes) +} diff --git a/src/storage/src/manifest/impl_.rs b/src/storage/src/manifest/impl_.rs new file mode 100644 index 0000000000..a185da44e9 --- /dev/null +++ b/src/storage/src/manifest/impl_.rs @@ -0,0 +1,177 @@ +use std::marker::PhantomData; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; + +use arc_swap::ArcSwap; +use async_trait::async_trait; +use common_telemetry::logging; +use object_store::ObjectStore; +use snafu::ensure; +use store_api::manifest::action::{self, ProtocolAction, ProtocolVersion}; +use store_api::manifest::*; + +use crate::error::{Error, ManifestProtocolForbidWriteSnafu, Result}; +use crate::manifest::storage::ManifestObjectStore; +use crate::manifest::storage::ObjectStoreLogIterator; + +#[derive(Clone, Debug)] +pub struct ManifestImpl> { + inner: Arc>, +} + +impl> ManifestImpl { + pub fn new(manifest_dir: &str, object_store: ObjectStore) -> Self { + ManifestImpl { + inner: Arc::new(ManifestImplInner::new(manifest_dir, object_store)), + } + } + + /// Update inner state. + pub fn update_state(&self, version: ManifestVersion, protocol: Option) { + self.inner.update_state(version, protocol); + } +} + +#[async_trait] +impl> Manifest for ManifestImpl { + type Error = Error; + type MetaAction = M; + type MetaActionIterator = MetaActionIteratorImpl; + + async fn update(&self, action_list: M) -> Result { + self.inner.save(action_list).await + } + + async fn scan( + &self, + start: ManifestVersion, + end: ManifestVersion, + ) -> Result { + self.inner.scan(start, end).await + } + + async fn checkpoint(&self) -> Result { + unimplemented!(); + } + + fn last_version(&self) -> ManifestVersion { + self.inner.last_version() + } +} + +#[derive(Debug)] +struct ManifestImplInner> { + store: Arc, + version: AtomicU64, + /// Current using protocol + protocol: ArcSwap, + /// Current node supported protocols (reader_version, writer_version) + supported_reader_version: ProtocolVersion, + supported_writer_version: ProtocolVersion, + _phantom: PhantomData, +} + +pub struct MetaActionIteratorImpl> { + log_iter: ObjectStoreLogIterator, + reader_version: ProtocolVersion, + last_protocol: Option, + _phantom: PhantomData, +} + +impl> MetaActionIteratorImpl { + pub fn last_protocol(&self) -> &Option { + &self.last_protocol + } +} + +#[async_trait] +impl> MetaActionIterator for MetaActionIteratorImpl { + type Error = Error; + type MetaAction = M; + + async fn next_action(&mut self) -> Result> { + match self.log_iter.next_log().await? { + Some((v, bytes)) => { + let (action_list, protocol) = M::decode(&bytes, self.reader_version)?; + + if protocol.is_some() { + self.last_protocol = protocol; + } + + Ok(Some((v, action_list))) + } + None => Ok(None), + } + } +} + +impl> ManifestImplInner { + fn new(manifest_dir: &str, object_store: ObjectStore) -> Self { + let (reader_version, writer_version) = action::supported_protocol_version(); + + Self { + store: Arc::new(ManifestObjectStore::new(manifest_dir, object_store)), + version: AtomicU64::new(0), + protocol: ArcSwap::new(Arc::new(ProtocolAction::new())), + supported_reader_version: reader_version, + supported_writer_version: writer_version, + _phantom: PhantomData, + } + } + + #[inline] + fn inc_version(&self) -> ManifestVersion { + self.version.fetch_add(1, Ordering::Relaxed) + } + + fn update_state(&self, version: ManifestVersion, protocol: Option) { + self.version.store(version, Ordering::Relaxed); + if let Some(p) = protocol { + self.protocol.store(Arc::new(p)); + } + } + + #[inline] + fn last_version(&self) -> ManifestVersion { + self.version.load(Ordering::Relaxed) + } + + async fn save(&self, action_list: M) -> Result { + let protocol = self.protocol.load(); + + ensure!( + protocol.is_writable(self.supported_writer_version), + ManifestProtocolForbidWriteSnafu { + min_version: protocol.min_writer_version, + supported_version: self.supported_writer_version, + } + ); + + let version = self.inc_version(); + + logging::debug!( + "Save region metadata action: {:?}, version: {}", + action_list, + version + ); + + self.store.save(version, &action_list.encode()?).await?; + + Ok(version) + } + + async fn scan( + &self, + start: ManifestVersion, + end: ManifestVersion, + ) -> Result> { + Ok(MetaActionIteratorImpl { + log_iter: self.store.scan(start, end).await?, + reader_version: self.supported_reader_version, + last_protocol: None, + _phantom: PhantomData, + }) + } +} diff --git a/src/storage/src/manifest/region.rs b/src/storage/src/manifest/region.rs index e3f83338f2..92aea2980e 100644 --- a/src/storage/src/manifest/region.rs +++ b/src/storage/src/manifest/region.rs @@ -1,182 +1,15 @@ //! Region manifest impl -use std::sync::{ - atomic::{AtomicU64, Ordering}, - Arc, -}; - -use arc_swap::ArcSwap; -use async_trait::async_trait; -use common_telemetry::logging; -use object_store::ObjectStore; -use snafu::ensure; -use store_api::manifest::action::{self, ProtocolAction, ProtocolVersion}; -use store_api::manifest::*; - -use crate::error::{Error, ManifestProtocolForbidWriteSnafu, Result}; use crate::manifest::action::*; -use crate::manifest::storage::ManifestObjectStore; -use crate::manifest::storage::ObjectStoreLogIterator; +use crate::manifest::ManifestImpl; -#[derive(Clone, Debug)] -pub struct RegionManifest { - inner: Arc, -} - -impl RegionManifest { - pub fn new(manifest_dir: &str, object_store: ObjectStore) -> Self { - RegionManifest { - inner: Arc::new(RegionManifestInner::new(manifest_dir, object_store)), - } - } - - /// Update inner state. - pub fn update_state(&self, version: ManifestVersion, protocol: Option) { - self.inner.update_state(version, protocol); - } -} - -#[async_trait] -impl Manifest for RegionManifest { - type Error = Error; - type MetaAction = RegionMetaActionList; - type MetaActionIterator = RegionMetaActionListIterator; - - async fn update(&self, action_list: RegionMetaActionList) -> Result { - self.inner.save(action_list).await - } - - async fn scan( - &self, - start: ManifestVersion, - end: ManifestVersion, - ) -> Result { - self.inner.scan(start, end).await - } - - async fn checkpoint(&self) -> Result { - unimplemented!(); - } - - fn last_version(&self) -> ManifestVersion { - self.inner.last_version() - } -} - -#[derive(Debug)] -struct RegionManifestInner { - store: Arc, - version: AtomicU64, - /// Current using protocol - protocol: ArcSwap, - /// Current node supported protocols (reader_version, writer_version) - supported_reader_version: ProtocolVersion, - supported_writer_version: ProtocolVersion, -} - -pub struct RegionMetaActionListIterator { - log_iter: ObjectStoreLogIterator, - reader_version: ProtocolVersion, - last_protocol: Option, -} - -impl RegionMetaActionListIterator { - pub fn last_protocol(&self) -> &Option { - &self.last_protocol - } -} - -#[async_trait] -impl MetaActionIterator for RegionMetaActionListIterator { - type Error = Error; - type MetaAction = RegionMetaActionList; - - async fn next_action(&mut self) -> Result> { - match self.log_iter.next_log().await? { - Some((v, bytes)) => { - let (action_list, protocol) = - RegionMetaActionList::decode(&bytes, self.reader_version)?; - - if protocol.is_some() { - self.last_protocol = protocol; - } - - Ok(Some((v, action_list))) - } - None => Ok(None), - } - } -} - -impl RegionManifestInner { - fn new(manifest_dir: &str, object_store: ObjectStore) -> Self { - let (reader_version, writer_version) = action::supported_protocol_version(); - - Self { - store: Arc::new(ManifestObjectStore::new(manifest_dir, object_store)), - version: AtomicU64::new(0), - protocol: ArcSwap::new(Arc::new(ProtocolAction::new())), - supported_reader_version: reader_version, - supported_writer_version: writer_version, - } - } - - #[inline] - fn inc_version(&self) -> ManifestVersion { - self.version.fetch_add(1, Ordering::Relaxed) - } - - fn update_state(&self, version: ManifestVersion, protocol: Option) { - self.version.store(version, Ordering::Relaxed); - if let Some(p) = protocol { - self.protocol.store(Arc::new(p)); - } - } - - #[inline] - fn last_version(&self) -> ManifestVersion { - self.version.load(Ordering::Relaxed) - } - - async fn save(&self, action_list: RegionMetaActionList) -> Result { - let protocol = self.protocol.load(); - - ensure!( - protocol.is_writable(self.supported_writer_version), - ManifestProtocolForbidWriteSnafu { - min_version: protocol.min_writer_version, - supported_version: self.supported_writer_version, - } - ); - - let version = self.inc_version(); - - logging::debug!( - "Save region metadata action: {:?}, version: {}", - action_list, - version - ); - - self.store.save(version, &action_list.encode()?).await?; - - Ok(version) - } - - async fn scan( - &self, - start: ManifestVersion, - end: ManifestVersion, - ) -> Result { - Ok(RegionMetaActionListIterator { - log_iter: self.store.scan(start, end).await?, - reader_version: self.supported_reader_version, - last_protocol: None, - }) - } -} +pub type RegionManifest = ManifestImpl; #[cfg(test)] mod tests { + use std::sync::Arc; + use object_store::{backend::fs, ObjectStore}; + use store_api::manifest::{Manifest, MetaActionIterator, MAX_VERSION}; use tempdir::TempDir; use super::*; diff --git a/src/storage/src/region/tests/flush.rs b/src/storage/src/region/tests/flush.rs index ef784779a9..50d4995783 100644 --- a/src/storage/src/region/tests/flush.rs +++ b/src/storage/src/region/tests/flush.rs @@ -103,7 +103,7 @@ async fn test_flush_and_stall() { tester.put(&data).await; // Check parquet files. - let sst_dir = format!("{}/{}", store_dir, engine::region_sst_dir(REGION_NAME)); + let sst_dir = format!("{}/{}", store_dir, engine::region_sst_dir("", REGION_NAME)); let mut has_parquet_file = false; for entry in std::fs::read_dir(sst_dir).unwrap() { let entry = entry.unwrap(); diff --git a/src/storage/src/test_util/config_util.rs b/src/storage/src/test_util/config_util.rs index 8be3399665..46fbfc004e 100644 --- a/src/storage/src/test_util/config_util.rs +++ b/src/storage/src/test_util/config_util.rs @@ -20,8 +20,9 @@ pub async fn new_store_config( region_name: &str, store_dir: &str, ) -> StoreConfig { - let sst_dir = engine::region_sst_dir(region_name); - let manifest_dir = engine::region_manifest_dir(region_name); + let parent_dir = ""; + let sst_dir = engine::region_sst_dir(parent_dir, region_name); + let manifest_dir = engine::region_manifest_dir(parent_dir, region_name); let accessor = Backend::build().root(store_dir).finish().await.unwrap(); let object_store = ObjectStore::new(accessor); diff --git a/src/store-api/src/manifest.rs b/src/store-api/src/manifest.rs index 095f2a3f91..f75f9718ae 100644 --- a/src/store-api/src/manifest.rs +++ b/src/store-api/src/manifest.rs @@ -6,14 +6,29 @@ use async_trait::async_trait; use common_error::ext::ErrorExt; use serde::{de::DeserializeOwned, Serialize}; +use crate::manifest::action::ProtocolAction; +use crate::manifest::action::ProtocolVersion; pub use crate::manifest::storage::*; pub type ManifestVersion = u64; pub const MIN_VERSION: u64 = 0; pub const MAX_VERSION: u64 = u64::MAX; -pub trait MetaAction: Serialize + DeserializeOwned { +pub trait MetaAction: Serialize + DeserializeOwned + Send + Sync + Clone + std::fmt::Debug { + type Error: ErrorExt + Send + Sync; + + /// Set previous valid manifest version. fn set_prev_version(&mut self, version: ManifestVersion); + + /// Encode this action into a byte vector + fn encode(&self) -> Result, Self::Error>; + + /// Decode self from byte slice with reader protocol version, + /// return error when reader version is not supported. + fn decode( + bs: &[u8], + reader_version: ProtocolVersion, + ) -> Result<(Self, Option), Self::Error>; } #[async_trait] diff --git a/src/store-api/src/manifest/action.rs b/src/store-api/src/manifest/action.rs index 89070d02ae..dc784962b3 100644 --- a/src/store-api/src/manifest/action.rs +++ b/src/store-api/src/manifest/action.rs @@ -1,6 +1,8 @@ ///! Common actions for manifest use serde::{Deserialize, Serialize}; +use crate::manifest::ManifestVersion; + pub type ProtocolVersion = u16; /// Current reader and writer versions @@ -23,6 +25,11 @@ pub struct ProtocolAction { pub min_writer_version: ProtocolVersion, } +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub struct VersionHeader { + pub prev_version: ManifestVersion, +} + impl Default for ProtocolAction { fn default() -> Self { let (min_reader_version, min_writer_version) = supported_protocol_version(); diff --git a/src/store-api/src/storage.rs b/src/store-api/src/storage.rs index 930c137d3a..8616c981b9 100644 --- a/src/store-api/src/storage.rs +++ b/src/store-api/src/storage.rs @@ -16,7 +16,7 @@ pub use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; pub use self::chunk::{Chunk, ChunkReader}; pub use self::descriptors::*; -pub use self::engine::{EngineContext, OpenOptions, StorageEngine}; +pub use self::engine::{CreateOptions, EngineContext, OpenOptions, StorageEngine}; pub use self::metadata::RegionMeta; pub use self::region::{Region, WriteContext}; pub use self::requests::{GetRequest, PutOperation, ScanRequest, WriteRequest}; diff --git a/src/store-api/src/storage/descriptors.rs b/src/store-api/src/storage/descriptors.rs index 90f8bf1dbf..d73cd3ca12 100644 --- a/src/store-api/src/storage/descriptors.rs +++ b/src/store-api/src/storage/descriptors.rs @@ -9,7 +9,7 @@ pub type ColumnId = u32; /// Id of column family, unique in each region. pub type ColumnFamilyId = u32; /// Id of the region. -pub type RegionId = u32; +pub type RegionId = u64; // TODO(yingwen): Validate default value has same type with column, and name is a valid column name. /// A [ColumnDescriptor] contains information to create a column. diff --git a/src/store-api/src/storage/engine.rs b/src/store-api/src/storage/engine.rs index 2f6c6f9950..e9bdc27eaf 100644 --- a/src/store-api/src/storage/engine.rs +++ b/src/store-api/src/storage/engine.rs @@ -39,6 +39,7 @@ pub trait StorageEngine: Send + Sync + Clone + 'static { &self, ctx: &EngineContext, descriptor: RegionDescriptor, + opts: &CreateOptions, ) -> Result; /// Drops given region. @@ -62,6 +63,16 @@ pub trait StorageEngine: Send + Sync + Clone + 'static { #[derive(Debug, Clone, Default)] pub struct EngineContext {} +/// Options to create a region. +#[derive(Debug, Clone, Default)] +pub struct CreateOptions { + /// Region parent directory + pub parent_dir: String, +} + /// Options to open a region. #[derive(Debug, Clone, Default)] -pub struct OpenOptions {} +pub struct OpenOptions { + /// Region parent directory + pub parent_dir: String, +} diff --git a/src/table-engine/Cargo.toml b/src/table-engine/Cargo.toml index 8722a9c5a3..5d73710a21 100644 --- a/src/table-engine/Cargo.toml +++ b/src/table-engine/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +arc-swap = "1.0" async-stream = "0.3" async-trait = "0.1" chrono = { version = "0.4", features = ["serde"] } @@ -15,6 +16,9 @@ datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git" , b datatypes = { path = "../datatypes" } futures = "0.3" log-store = { path = "../log-store" } +object-store = { path = "../object-store" } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" snafu = { version = "0.7", features = ["backtraces"] } storage ={ path = "../storage" } store-api ={ path = "../store-api" } diff --git a/src/table-engine/src/config.rs b/src/table-engine/src/config.rs new file mode 100644 index 0000000000..335fce4069 --- /dev/null +++ b/src/table-engine/src/config.rs @@ -0,0 +1,4 @@ +//! Table Engine config + +#[derive(Debug, Clone, Default)] +pub struct EngineConfig {} diff --git a/src/table-engine/src/engine.rs b/src/table-engine/src/engine.rs index 884a0d84cf..de7e693a5c 100644 --- a/src/table-engine/src/engine.rs +++ b/src/table-engine/src/engine.rs @@ -1,34 +1,52 @@ use std::collections::HashMap; -use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::sync::RwLock; use async_trait::async_trait; use common_error::ext::BoxedError; use common_telemetry::logging; +use object_store::ObjectStore; use snafu::{OptionExt, ResultExt}; use store_api::storage::{ self, ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnId, - OpenOptions, Region, RegionDescriptorBuilder, RegionMeta, RowKeyDescriptor, - RowKeyDescriptorBuilder, StorageEngine, + CreateOptions, OpenOptions, Region, RegionDescriptorBuilder, RegionId, RegionMeta, + RowKeyDescriptor, RowKeyDescriptorBuilder, StorageEngine, }; use table::engine::{EngineContext, TableEngine}; use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest}; use table::Result as TableResult; use table::{ - metadata::{TableId, TableInfoBuilder, TableMetaBuilder, TableType}, + metadata::{TableId, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion}, table::TableRef, }; use tokio::sync::Mutex; +use crate::config::EngineConfig; use crate::error::{ self, BuildColumnDescriptorSnafu, BuildColumnFamilyDescriptorSnafu, BuildRegionDescriptorSnafu, - BuildRowKeyDescriptorSnafu, MissingTimestampIndexSnafu, Result, + BuildRowKeyDescriptorSnafu, MissingTimestampIndexSnafu, Result, TableExistsSnafu, }; use crate::table::MitoTable; pub const MITO_ENGINE: &str = "mito"; const INIT_COLUMN_ID: ColumnId = 0; +const INIT_TABLE_VERSION: TableVersion = 0; + +/// Generate region name in the form of "{TABLE_ID}_{REGION_NUMBER}" +#[inline] +fn region_name(table_id: TableId, n: u32) -> String { + format!("{}_{:010}", table_id, n) +} + +#[inline] +fn region_id(table_id: TableId, n: u32) -> RegionId { + (u64::from(table_id) << 32) | u64::from(n) +} + +#[inline] +fn table_dir(table_name: &str) -> String { + format!("{}/", table_name) +} /// [TableEngine] implementation. /// @@ -40,9 +58,9 @@ pub struct MitoEngine { } impl MitoEngine { - pub fn new(storage_engine: S) -> Self { + pub fn new(config: EngineConfig, storage_engine: S, object_store: ObjectStore) -> Self { Self { - inner: Arc::new(MitoEngineInner::new(storage_engine)), + inner: Arc::new(MitoEngineInner::new(config, storage_engine, object_store)), } } } @@ -100,29 +118,13 @@ struct MitoEngineInner { /// /// Writing to `tables` should also hold the `table_mutex`. tables: RwLock>, + object_store: ObjectStore, storage_engine: S, - // FIXME(yingwen): Remove `next_table_id`. Table id should be assigned by other module (maybe catalog). - next_table_id: AtomicU64, /// Table mutex is used to protect the operations such as creating/opening/closing /// a table, to avoid things like opening the same table simultaneously. table_mutex: Mutex<()>, } -impl MitoEngineInner { - fn new(storage_engine: S) -> Self { - Self { - tables: RwLock::new(HashMap::default()), - storage_engine, - next_table_id: AtomicU64::new(0), - table_mutex: Mutex::new(()), - } - } - - fn next_table_id(&self) -> TableId { - self.next_table_id.fetch_add(1, Ordering::Relaxed) - } -} - fn build_row_key_desc_from_schema( mut column_id: ColumnId, request: &CreateTableRequest, @@ -239,17 +241,25 @@ impl MitoEngineInner { let table_name = &request.name; if let Some(table) = self.get_table(table_name) { - return Ok(table); + if request.create_if_not_exists { + return Ok(table); + } else { + return TableExistsSnafu { table_name }.fail(); + } } let (next_column_id, default_cf) = build_column_family_from_request(INIT_COLUMN_ID, &request)?; let (next_column_id, row_key) = build_row_key_desc_from_schema(next_column_id, &request)?; - // Now we just use table name as region name. TODO(yingwen): Naming pattern of region. - let region_name = table_name.clone(); + let table_id = request.id; + // TODO(dennis): supports multi regions; + let region_number = 0; + let region_id = region_id(table_id, region_number); + + let region_name = region_name(table_id, region_number); let region_descriptor = RegionDescriptorBuilder::default() - .id(0) + .id(region_id) .name(®ion_name) .row_key(row_key) .default_cf(default_cf) @@ -262,12 +272,20 @@ impl MitoEngineInner { let _lock = self.table_mutex.lock().await; // Checks again, read lock should be enough since we are guarded by the mutex. if let Some(table) = self.get_table(table_name) { - return Ok(table); + if request.create_if_not_exists { + return Ok(table); + } else { + return TableExistsSnafu { table_name }.fail(); + } } + let opts = CreateOptions { + parent_dir: table_dir(table_name), + }; + let region = self .storage_engine - .create_region(&storage::EngineContext::default(), region_descriptor) + .create_region(&storage::EngineContext::default(), region_descriptor, &opts) .await .map_err(BoxedError::new) .context(error::CreateRegionSnafu)?; @@ -282,17 +300,18 @@ impl MitoEngineInner { .context(error::BuildTableMetaSnafu { table_name })?; let table_info = TableInfoBuilder::new(table_name.clone(), table_meta) - .ident(self.next_table_id()) - .table_version(0u64) + .ident(table_id) + .table_version(INIT_TABLE_VERSION) .table_type(TableType::Base) .desc(request.desc) .build() .context(error::BuildTableInfoSnafu { table_name })?; - //TODO(dennis): persist table info to table manifest service. - logging::info!("Mito engine created table: {:?}.", table_info); + let table = Arc::new( + MitoTable::create(table_name, table_info, region, self.object_store.clone()).await?, + ); - let table = Arc::new(MitoTable::new(table_info, region)); + logging::info!("Mito engine created table: {:?}.", table.table_info()); self.tables .write() @@ -323,13 +342,18 @@ impl MitoEngineInner { } let engine_ctx = storage::EngineContext::default(); - let opts = OpenOptions::default(); - let region_name = table_name; - // Now we just use table name as region name. TODO(yingwen): Naming pattern of region. + let opts = OpenOptions { + parent_dir: table_dir(table_name), + }; + + let table_id = request.table_id; + // TODO(dennis): supports multi regions; + let region_number = 0; + let region_name = region_name(table_id, region_number); let region = match self .storage_engine - .open_region(&engine_ctx, region_name, &opts) + .open_region(&engine_ctx, ®ion_name, &opts) .await .map_err(BoxedError::new) .context(error::OpenRegionSnafu { region_name })? @@ -338,23 +362,8 @@ impl MitoEngineInner { Some(region) => region, }; - //FIXME(boyan): recover table meta from table manifest - let table_meta = TableMetaBuilder::default() - .schema(region.in_memory_metadata().schema().clone()) - .engine(MITO_ENGINE) - .next_column_id(INIT_COLUMN_ID) - .primary_key_indices(Vec::default()) - .build() - .context(error::BuildTableMetaSnafu { table_name })?; - - let table_info = TableInfoBuilder::new(table_name.clone(), table_meta) - .ident(request.table_id) - .table_version(0u64) - .table_type(TableType::Base) - .build() - .context(error::BuildTableInfoSnafu { table_name })?; - - let table = Arc::new(MitoTable::new(table_info, region)); + let table = + Arc::new(MitoTable::open(table_name, region, self.object_store.clone()).await?); self.tables .write() @@ -373,16 +382,43 @@ impl MitoEngineInner { } } +impl MitoEngineInner { + fn new(_config: EngineConfig, storage_engine: S, object_store: ObjectStore) -> Self { + Self { + tables: RwLock::new(HashMap::default()), + storage_engine, + object_store, + table_mutex: Mutex::new(()), + } + } +} + #[cfg(test)] mod tests { use common_recordbatch::util; use datafusion_common::field_util::FieldExt; use datafusion_common::field_util::SchemaExt; use datatypes::vectors::*; + use store_api::manifest::Manifest; use table::requests::InsertRequest; use super::*; use crate::table::test_util; + use crate::table::test_util::MockRegion; + + #[test] + fn test_region_name() { + assert_eq!("1_0000000000", region_name(1, 0)); + assert_eq!("1_0000000001", region_name(1, 1)); + assert_eq!("99_0000000100", region_name(99, 100)); + assert_eq!("1000_0000009999", region_name(1000, 9999)); + } + + #[test] + fn test_table_dir() { + assert_eq!("test_table/", table_dir("test_table")); + assert_eq!("demo/", table_dir("demo")); + } #[tokio::test] async fn test_create_table_insert_scan() { @@ -434,6 +470,55 @@ mod tests { assert_eq!(memories.to_arrow_array(), columns[3]); } + #[tokio::test] + async fn test_create_if_not_exists() { + common_telemetry::init_default_ut_logging(); + let ctx = EngineContext::default(); + + let (_engine, table_engine, table, _object_store, _dir) = + test_util::setup_mock_engine_and_table().await; + + let table = table + .as_any() + .downcast_ref::>() + .unwrap(); + let table_info = table.table_info(); + + let request = CreateTableRequest { + id: 1, + name: table_info.name.to_string(), + schema: table_info.meta.schema.clone(), + create_if_not_exists: true, + desc: None, + primary_key_indices: Vec::default(), + }; + + let created_table = table_engine.create_table(&ctx, request).await.unwrap(); + assert_eq!( + table_info, + created_table + .as_any() + .downcast_ref::>() + .unwrap() + .table_info() + ); + + // test create_if_not_exists=false + let request = CreateTableRequest { + id: 1, + name: table_info.name.to_string(), + schema: table_info.meta.schema.clone(), + create_if_not_exists: false, + desc: None, + primary_key_indices: Vec::default(), + }; + + let result = table_engine.create_table(&ctx, request).await; + + assert!(result.is_err()); + assert!(matches!(result, Err(e) if format!("{:?}", e).contains("Table already exists"))); + } + #[tokio::test] async fn test_open_table() { common_telemetry::init_default_ut_logging(); @@ -443,12 +528,13 @@ mod tests { catalog_name: String::new(), schema_name: String::new(), table_name: test_util::TABLE_NAME.to_string(), - // Currently the first table has id 0. - table_id: 0, + // the test table id is 1 + table_id: 1, }; - let (engine, table) = { - let (engine, table_engine, table) = test_util::setup_mock_engine_and_table().await; + let (engine, table, object_store, _dir) = { + let (engine, table_engine, table, object_store, dir) = + test_util::setup_mock_engine_and_table().await; assert_eq!(MITO_ENGINE, table_engine.name()); // Now try to open the table again. let reopened = table_engine @@ -458,16 +544,39 @@ mod tests { .unwrap(); assert_eq!(table.schema(), reopened.schema()); - (engine, table) + (engine, table, object_store, dir) }; // Construct a new table engine, and try to open the table. - let table_engine = MitoEngine::new(engine); + let table_engine = MitoEngine::new(EngineConfig::default(), engine, object_store); let reopened = table_engine .open_table(&ctx, open_req.clone()) .await .unwrap() .unwrap(); assert_eq!(table.schema(), reopened.schema()); + + let table = table + .as_any() + .downcast_ref::>() + .unwrap(); + let reopened = reopened + .as_any() + .downcast_ref::>() + .unwrap(); + + // assert recovered table_info is correct + assert_eq!(table.table_info(), reopened.table_info()); + assert_eq!(reopened.manifest().last_version(), 1); + } + + #[test] + fn test_region_id() { + assert_eq!(1, region_id(0, 1)); + assert_eq!(4294967296, region_id(1, 0)); + assert_eq!(4294967297, region_id(1, 1)); + assert_eq!(4294967396, region_id(1, 100)); + assert_eq!(8589934602, region_id(2, 10)); + assert_eq!(18446744069414584330, region_id(u32::MAX, 10)); } } diff --git a/src/table-engine/src/error.rs b/src/table-engine/src/error.rs index 740c415b04..d14f5c2691 100644 --- a/src/table-engine/src/error.rs +++ b/src/table-engine/src/error.rs @@ -95,6 +95,40 @@ pub enum Error { region_name: String, backtrace: Backtrace, }, + + #[snafu(display( + "Failed to update table metadata to manifest, table: {}, source: {}", + table_name, + source, + ))] + UpdateTableManifest { + #[snafu(backtrace)] + source: storage::error::Error, + table_name: String, + }, + + #[snafu(display( + "Failed to scan table metadata from manifest, table: {}, source: {}", + table_name, + source, + ))] + ScanTableManifest { + #[snafu(backtrace)] + source: storage::error::Error, + table_name: String, + }, + + #[snafu(display("Table info not found in manifest, table: {}", table_name))] + TableInfoNotFound { + backtrace: Backtrace, + table_name: String, + }, + + #[snafu(display("Table already exists: {}", table_name))] + TableExists { + backtrace: Backtrace, + table_name: String, + }, } impl From for table::error::Error { @@ -118,7 +152,12 @@ impl ErrorExt for Error { | BuildTableMeta { .. } | BuildTableInfo { .. } | BuildRegionDescriptor { .. } + | TableExists { .. } | MissingTimestampIndex { .. } => StatusCode::InvalidArguments, + + TableInfoNotFound { .. } => StatusCode::Unexpected, + + ScanTableManifest { .. } | UpdateTableManifest { .. } => StatusCode::StorageUnavailable, } } diff --git a/src/table-engine/src/lib.rs b/src/table-engine/src/lib.rs index bf9f81629f..462396858a 100644 --- a/src/table-engine/src/lib.rs +++ b/src/table-engine/src/lib.rs @@ -1,3 +1,5 @@ +pub mod config; pub mod engine; pub mod error; +mod manifest; pub mod table; diff --git a/src/table-engine/src/manifest.rs b/src/table-engine/src/manifest.rs new file mode 100644 index 0000000000..f8ca01d1c0 --- /dev/null +++ b/src/table-engine/src/manifest.rs @@ -0,0 +1,85 @@ +//! Table manifest service +pub mod action; + +use storage::manifest::ManifestImpl; + +use crate::manifest::action::TableMetaActionList; + +pub type TableManifest = ManifestImpl; + +#[cfg(test)] +mod tests { + use storage::manifest::MetaActionIteratorImpl; + use store_api::manifest::action::ProtocolAction; + use store_api::manifest::{Manifest, MetaActionIterator}; + use table::metadata::TableInfo; + + use super::*; + use crate::manifest::action::{TableChange, TableMetaAction, TableRemove}; + use crate::table::test_util; + type TableManifestActionIter = MetaActionIteratorImpl; + + async fn assert_actions( + iter: &mut TableManifestActionIter, + protocol: &ProtocolAction, + table_info: &TableInfo, + ) { + match iter.next_action().await.unwrap() { + Some((v, action_list)) => { + assert_eq!(v, 0); + assert_eq!(2, action_list.actions.len()); + assert!( + matches!(&action_list.actions[0], TableMetaAction::Protocol(p) if *p == *protocol) + ); + assert!( + matches!(&action_list.actions[1], TableMetaAction::Change(c) if c.table_info == *table_info) + ); + } + _ => unreachable!(), + } + } + + #[tokio::test] + async fn test_table_manifest() { + let (_dir, object_store) = test_util::new_test_object_store("test_table_manifest").await; + + let manifest = TableManifest::new("manifest/", object_store); + + let mut iter = manifest.scan(0, 100).await.unwrap(); + assert!(iter.next_action().await.unwrap().is_none()); + + let protocol = ProtocolAction::new(); + let table_info = test_util::build_test_table_info(); + let action_list = TableMetaActionList::new(vec![ + TableMetaAction::Protocol(protocol.clone()), + TableMetaAction::Change(Box::new(TableChange { + table_info: table_info.clone(), + })), + ]); + + assert_eq!(0, manifest.update(action_list).await.unwrap()); + + let mut iter = manifest.scan(0, 100).await.unwrap(); + assert_actions(&mut iter, &protocol, &table_info).await; + assert!(iter.next_action().await.unwrap().is_none()); + + // update another action + let action_list = TableMetaActionList::new(vec![TableMetaAction::Remove(TableRemove { + table_name: table_info.name.clone(), + table_ident: table_info.ident.clone(), + })]); + assert_eq!(1, manifest.update(action_list).await.unwrap()); + let mut iter = manifest.scan(0, 100).await.unwrap(); + assert_actions(&mut iter, &protocol, &table_info).await; + + match iter.next_action().await.unwrap() { + Some((v, action_list)) => { + assert_eq!(v, 1); + assert_eq!(1, action_list.actions.len()); + assert!(matches!(&action_list.actions[0], + TableMetaAction::Remove(r) if r.table_name == table_info.name && r.table_ident == table_info.ident)); + } + _ => unreachable!(), + } + } +} diff --git a/src/table-engine/src/manifest/action.rs b/src/table-engine/src/manifest/action.rs new file mode 100644 index 0000000000..6b9cae8646 --- /dev/null +++ b/src/table-engine/src/manifest/action.rs @@ -0,0 +1,154 @@ +use std::io::{BufRead, BufReader}; + +use serde::{Deserialize, Serialize}; +use serde_json as json; +use snafu::{ensure, OptionExt, ResultExt}; +use storage::error::{ + DecodeJsonSnafu, DecodeMetaActionListSnafu, Error as StorageError, + ManifestProtocolForbidReadSnafu, ReadlineSnafu, +}; +use storage::manifest::helper; +use store_api::manifest::action::{ProtocolAction, ProtocolVersion, VersionHeader}; +use store_api::manifest::ManifestVersion; +use store_api::manifest::MetaAction; +use table::metadata::{TableIdent, TableInfo}; + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub struct TableChange { + pub table_info: TableInfo, +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub struct TableRemove { + pub table_ident: TableIdent, + pub table_name: String, +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub enum TableMetaAction { + Protocol(ProtocolAction), + // Boxed TableChange to reduce the total size of enum + Change(Box), + Remove(TableRemove), +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub struct TableMetaActionList { + pub actions: Vec, + pub prev_version: ManifestVersion, +} + +impl TableMetaActionList { + pub fn new(actions: Vec) -> Self { + Self { + actions, + prev_version: 0, + } + } +} + +impl MetaAction for TableMetaActionList { + type Error = StorageError; + + fn set_prev_version(&mut self, version: ManifestVersion) { + self.prev_version = version; + } + + fn encode(&self) -> Result, Self::Error> { + helper::encode_actions(self.prev_version, &self.actions) + } + + /// TODO(dennis): duplicated code with RegionMetaActionList::decode, try to refactor it. + fn decode( + bs: &[u8], + reader_version: ProtocolVersion, + ) -> Result<(Self, Option), Self::Error> { + let mut lines = BufReader::new(bs).lines(); + + let mut action_list = TableMetaActionList { + actions: Vec::default(), + prev_version: 0, + }; + + { + let first_line = lines + .next() + .with_context(|| DecodeMetaActionListSnafu { + msg: format!( + "Invalid content in manifest: {}", + std::str::from_utf8(bs).unwrap_or("**invalid bytes**") + ), + })? + .context(ReadlineSnafu)?; + + // Decode prev_version + let v: VersionHeader = json::from_str(&first_line).context(DecodeJsonSnafu)?; + action_list.prev_version = v.prev_version; + } + + // Decode actions + let mut protocol_action = None; + let mut actions = Vec::default(); + for line in lines { + let line = &line.context(ReadlineSnafu)?; + let action: TableMetaAction = json::from_str(line).context(DecodeJsonSnafu)?; + + if let TableMetaAction::Protocol(p) = &action { + ensure!( + p.is_readable(reader_version), + ManifestProtocolForbidReadSnafu { + min_version: p.min_reader_version, + supported_version: reader_version, + } + ); + protocol_action = Some(p.clone()); + } + + actions.push(action); + } + action_list.actions = actions; + + Ok((action_list, protocol_action)) + } +} + +#[cfg(test)] +mod tests { + use common_telemetry::logging; + + use super::*; + use crate::table::test_util; + + #[test] + fn test_encode_decode_action_list() { + common_telemetry::init_default_ut_logging(); + let mut protocol = ProtocolAction::new(); + protocol.min_reader_version = 1; + + let table_info = test_util::build_test_table_info(); + + let mut action_list = TableMetaActionList::new(vec![ + TableMetaAction::Protocol(protocol.clone()), + TableMetaAction::Change(Box::new(TableChange { table_info })), + ]); + action_list.set_prev_version(3); + + let bs = action_list.encode().unwrap(); + + logging::debug!( + "Encoded action list: \r\n{}", + String::from_utf8(bs.clone()).unwrap() + ); + + let e = TableMetaActionList::decode(&bs, 0); + assert!(e.is_err()); + assert_eq!( + "Manifest protocol forbid to read, min_version: 1, supported_version: 0", + format!("{}", e.err().unwrap()) + ); + + let (decode_list, p) = TableMetaActionList::decode(&bs, 1).unwrap(); + assert_eq!(decode_list, action_list); + assert_eq!(p.unwrap(), protocol); + } +} diff --git a/src/table-engine/src/table.rs b/src/table-engine/src/table.rs index 2c91ad2f4d..e1cfbc4f2b 100644 --- a/src/table-engine/src/table.rs +++ b/src/table-engine/src/table.rs @@ -8,9 +8,13 @@ use async_trait::async_trait; use common_query::logical_plan::Expr; use common_recordbatch::error::{Error as RecordBatchError, Result as RecordBatchResult}; use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream}; +use common_telemetry::logging; use futures::task::{Context, Poll}; use futures::Stream; -use snafu::OptionExt; +use object_store::ObjectStore; +use snafu::{OptionExt, ResultExt}; +use store_api::manifest::action::ProtocolAction; +use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator}; use store_api::storage::{ ChunkReader, PutOperation, ReadContext, Region, ScanRequest, SchemaRef, Snapshot, WriteContext, WriteRequest, @@ -22,10 +26,22 @@ use table::{ table::Table, }; +use crate::error::{ + Result, ScanTableManifestSnafu, TableInfoNotFoundSnafu, UpdateTableManifestSnafu, +}; +use crate::manifest::action::*; +use crate::manifest::TableManifest; + +#[inline] +fn table_manifest_dir(table_name: &str) -> String { + format!("{}/manifest/", table_name) +} + /// [Table] implementation. pub struct MitoTable { + manifest: TableManifest, table_info: TableInfo, - //TODO(dennis): a table contains multi regions + // TODO(dennis): a table contains multi regions region: R, } @@ -139,7 +155,118 @@ impl Stream for ChunkStream { } impl MitoTable { - pub fn new(table_info: TableInfo, region: R) -> Self { - Self { table_info, region } + fn new(table_info: TableInfo, region: R, manifest: TableManifest) -> Self { + Self { + table_info, + region, + manifest, + } + } + + pub async fn create( + table_name: &str, + table_info: TableInfo, + region: R, + object_store: ObjectStore, + ) -> Result> { + let manifest = TableManifest::new(&table_manifest_dir(table_name), object_store); + + // TODO(dennis): save manifest version into catalog? + let _manifest_version = manifest + .update(TableMetaActionList::new(vec![ + TableMetaAction::Protocol(ProtocolAction::new()), + TableMetaAction::Change(Box::new(TableChange { + table_info: table_info.clone(), + })), + ])) + .await + .context(UpdateTableManifestSnafu { table_name })?; + + Ok(MitoTable::new(table_info, region, manifest)) + } + + pub async fn open( + table_name: &str, + region: R, + object_store: ObjectStore, + ) -> Result> { + let manifest = TableManifest::new(&table_manifest_dir(table_name), object_store); + + let table_info = Self::recover_table_info(table_name, &manifest) + .await? + .context(TableInfoNotFoundSnafu { table_name })?; + + Ok(MitoTable::new(table_info, region, manifest)) + } + + async fn recover_table_info( + table_name: &str, + manifest: &TableManifest, + ) -> Result> { + let (start, end) = Self::manifest_scan_range(); + let mut iter = manifest + .scan(start, end) + .await + .context(ScanTableManifestSnafu { table_name })?; + + let mut last_manifest_version = manifest::MIN_VERSION; + let mut table_info = None; + while let Some((manifest_version, action_list)) = iter + .next_action() + .await + .context(ScanTableManifestSnafu { table_name })? + { + last_manifest_version = manifest_version; + + for action in action_list.actions { + match action { + TableMetaAction::Change(c) => { + table_info = Some(c.table_info); + } + TableMetaAction::Protocol(_) => {} + _ => unimplemented!(), + } + } + } + + if table_info.is_some() { + // update manifest state after recovering + let protocol = iter.last_protocol(); + manifest.update_state(last_manifest_version + 1, protocol.clone()); + } + + logging::debug!( + "Recovered table info {:?} for table: {}", + table_info, + table_name + ); + + Ok(table_info) + } + + #[inline] + pub fn table_info(&self) -> &TableInfo { + &self.table_info + } + + #[inline] + pub fn manifest(&self) -> &TableManifest { + &self.manifest + } + + fn manifest_scan_range() -> (ManifestVersion, ManifestVersion) { + // TODO(dennis): use manifest version in catalog ? + (manifest::MIN_VERSION, manifest::MAX_VERSION) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_table_manifest_dir() { + assert_eq!("demo/manifest/", table_manifest_dir("demo")); + assert_eq!("numbers/manifest/", table_manifest_dir("numbers")); } } diff --git a/src/table-engine/src/table/test_util.rs b/src/table-engine/src/table/test_util.rs index f82dccecd9..180ebefc39 100644 --- a/src/table-engine/src/table/test_util.rs +++ b/src/table-engine/src/table/test_util.rs @@ -6,20 +6,25 @@ use datatypes::prelude::ConcreteDataType; use datatypes::schema::SchemaRef; use datatypes::schema::{ColumnSchema, Schema}; use log_store::fs::noop::NoopLogStore; -use storage::config::EngineConfig; +use object_store::{backend::fs::Backend, ObjectStore}; +use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; use table::engine::EngineContext; use table::engine::TableEngine; +use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder, TableType}; use table::requests::CreateTableRequest; use table::TableRef; use tempdir::TempDir; +use crate::config::EngineConfig; use crate::engine::MitoEngine; -use crate::table::test_util::mock_engine::MockEngine; +use crate::engine::MITO_ENGINE; +pub use crate::table::test_util::mock_engine::MockEngine; +pub use crate::table::test_util::mock_engine::MockRegion; pub const TABLE_NAME: &str = "demo"; -fn schema_for_test() -> Schema { +pub fn schema_for_test() -> Schema { let column_schemas = vec![ ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true), ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), @@ -32,22 +37,47 @@ fn schema_for_test() -> Schema { pub type MockMitoEngine = MitoEngine; +pub fn build_test_table_info() -> TableInfo { + let table_meta = TableMetaBuilder::default() + .schema(Arc::new(schema_for_test())) + .engine(MITO_ENGINE) + .next_column_id(1) + .primary_key_indices(vec![0, 1]) + .build() + .unwrap(); + + TableInfoBuilder::new(TABLE_NAME.to_string(), table_meta) + .ident(0) + .table_version(0u64) + .table_type(TableType::Base) + .build() + .unwrap() +} + +pub async fn new_test_object_store(prefix: &str) -> (TempDir, ObjectStore) { + let dir = TempDir::new(prefix).unwrap(); + let store_dir = dir.path().to_string_lossy(); + let accessor = Backend::build().root(&store_dir).finish().await.unwrap(); + + (dir, ObjectStore::new(accessor)) +} + pub async fn setup_test_engine_and_table() -> ( MitoEngine>, TableRef, SchemaRef, TempDir, ) { - let dir = TempDir::new("setup_test_engine_and_table").unwrap(); - let store_dir = dir.path().to_string_lossy(); + let (dir, object_store) = new_test_object_store("setup_test_engine_and_table").await; let table_engine = MitoEngine::new( + EngineConfig::default(), EngineImpl::new( - EngineConfig::with_store_dir(&store_dir), + StorageEngineConfig::default(), Arc::new(NoopLogStore::default()), - ) - .await - .unwrap(), + object_store.clone(), + ), + object_store, ); let schema = Arc::new(schema_for_test()); @@ -55,6 +85,7 @@ pub async fn setup_test_engine_and_table() -> ( .create_table( &EngineContext::default(), CreateTableRequest { + id: 1, name: TABLE_NAME.to_string(), desc: Some("a test table".to_string()), schema: schema.clone(), @@ -68,15 +99,22 @@ pub async fn setup_test_engine_and_table() -> ( (table_engine, table, schema, dir) } -pub async fn setup_mock_engine_and_table() -> (MockEngine, MockMitoEngine, TableRef) { +pub async fn setup_mock_engine_and_table( +) -> (MockEngine, MockMitoEngine, TableRef, ObjectStore, TempDir) { let mock_engine = MockEngine::default(); - let table_engine = MitoEngine::new(mock_engine.clone()); + let (dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await; + let table_engine = MitoEngine::new( + EngineConfig::default(), + mock_engine.clone(), + object_store.clone(), + ); let schema = Arc::new(schema_for_test()); let table = table_engine .create_table( &EngineContext::default(), CreateTableRequest { + id: 1, name: TABLE_NAME.to_string(), desc: None, schema: schema.clone(), @@ -87,5 +125,5 @@ pub async fn setup_mock_engine_and_table() -> (MockEngine, MockMitoEngine, Table .await .unwrap(); - (mock_engine, table_engine, table) + (mock_engine, table_engine, table, object_store, dir) } diff --git a/src/table-engine/src/table/test_util/mock_engine.rs b/src/table-engine/src/table/test_util/mock_engine.rs index a446e7be33..aacf2109d7 100644 --- a/src/table-engine/src/table/test_util/mock_engine.rs +++ b/src/table-engine/src/table/test_util/mock_engine.rs @@ -10,9 +10,9 @@ use common_telemetry::logging; use storage::metadata::{RegionMetaImpl, RegionMetadataRef}; use storage::write_batch::WriteBatch; use store_api::storage::{ - Chunk, ChunkReader, EngineContext, GetRequest, GetResponse, OpenOptions, ReadContext, Region, - RegionDescriptor, ScanRequest, ScanResponse, SchemaRef, Snapshot, StorageEngine, WriteContext, - WriteResponse, + Chunk, ChunkReader, CreateOptions, EngineContext, GetRequest, GetResponse, OpenOptions, + ReadContext, Region, RegionDescriptor, ScanRequest, ScanResponse, SchemaRef, Snapshot, + StorageEngine, WriteContext, WriteResponse, }; pub type Result = std::result::Result; @@ -153,6 +153,7 @@ impl StorageEngine for MockEngine { &self, _ctx: &EngineContext, descriptor: RegionDescriptor, + _opts: &CreateOptions, ) -> Result { logging::info!("Mock engine create region, descriptor: {:?}", descriptor); diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index 39b578d13f..f34828bab6 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -3,14 +3,15 @@ use std::collections::HashMap; use chrono::{DateTime, Utc}; use datatypes::schema::SchemaRef; use derive_builder::Builder; +use serde::{Deserialize, Serialize}; use store_api::storage::ColumnId; -pub type TableId = u64; +pub type TableId = u32; pub type TableVersion = u64; /// Indicates whether and how a filter expression can be handled by a /// Table for table scans. -#[derive(Debug, Clone, PartialEq)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] pub enum FilterPushDownType { /// The expression cannot be used by the provider. Unsupported, @@ -26,7 +27,7 @@ pub enum FilterPushDownType { } /// Indicates the type of this table for metadata/catalog purposes. -#[derive(Debug, Clone, Copy, PartialEq)] +#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq)] pub enum TableType { /// An ordinary physical table. Base, @@ -36,13 +37,13 @@ pub enum TableType { Temporary, } -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq, Default)] +#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Default)] pub struct TableIdent { pub table_id: TableId, pub version: TableVersion, } -#[derive(Clone, Debug, Builder)] +#[derive(Serialize, Deserialize, Clone, Debug, Builder, PartialEq)] #[builder(pattern = "mutable")] pub struct TableMeta { pub schema: SchemaRef, @@ -90,7 +91,7 @@ impl TableMeta { } } -#[derive(Clone, Debug, Builder)] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Builder)] #[builder(pattern = "owned")] pub struct TableInfo { #[builder(default, setter(into))] diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index e80c777961..2c60021a54 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -15,6 +15,7 @@ pub struct InsertRequest { /// Create table request #[derive(Debug)] pub struct CreateTableRequest { + pub id: TableId, pub name: String, pub desc: Option, pub schema: SchemaRef,