diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index 6d500c0af5..a7174dd24d 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -20,6 +20,6 @@ mod sync; mod test_util; mod version; mod wal; -mod write_batch; +pub mod write_batch; pub use engine::EngineImpl; diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index c04d8c86c6..5831b039f5 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -91,6 +91,8 @@ impl RegionImpl { /// Create a new region and also persist the region metadata to manifest. /// /// The caller should avoid calling this method simultaneously. + // FIXME(yingwen): Region id is already specific in metadata, but name is not specific in metadata. We should + // add name to RegionMetadata. pub async fn create( id: RegionId, name: String, diff --git a/src/store-api/src/storage.rs b/src/store-api/src/storage.rs index b69f78ead9..eb83f4d38d 100644 --- a/src/store-api/src/storage.rs +++ b/src/store-api/src/storage.rs @@ -16,7 +16,7 @@ pub use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; pub use self::chunk::{Chunk, ChunkReader}; pub use self::descriptors::{ - gen_region_name, ColumnDescriptor, ColumnDescriptorBuilder, ColumnFamilyDescriptor, + ColumnDescriptor, ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnFamilyId, ColumnId, RegionDescriptor, RegionId, RowKeyDescriptor, RowKeyDescriptorBuilder, }; diff --git a/src/store-api/src/storage/descriptors.rs b/src/store-api/src/storage/descriptors.rs index f7dbb854e9..90f8bf1dbf 100644 --- a/src/store-api/src/storage/descriptors.rs +++ b/src/store-api/src/storage/descriptors.rs @@ -10,13 +10,6 @@ pub type ColumnId = u32; pub type ColumnFamilyId = u32; /// Id of the region. pub type RegionId = u32; -/// Default region name prefix -pub const REGION_PREFIX: &str = "r_"; - -#[inline] -pub fn gen_region_name(id: RegionId) -> String { - format!("{}{}", REGION_PREFIX, id) -} // TODO(yingwen): Validate default value has same type with column, and name is a valid column name. /// A [ColumnDescriptor] contains information to create a column. @@ -234,10 +227,4 @@ mod tests { .unwrap(); assert_eq!(1, desc.columns.len()); } - - #[test] - fn test_gen_region_name() { - assert_eq!("r_0", gen_region_name(0)); - assert_eq!("r_99", gen_region_name(99)); - } } diff --git a/src/table-engine/Cargo.toml b/src/table-engine/Cargo.toml index a35e275a90..8722a9c5a3 100644 --- a/src/table-engine/Cargo.toml +++ b/src/table-engine/Cargo.toml @@ -19,6 +19,7 @@ snafu = { version = "0.7", features = ["backtraces"] } storage ={ path = "../storage" } store-api ={ path = "../store-api" } table = { path = "../table" } +tokio = { version = "1.0", features = ["full"] } [dev-dependencies] datatypes = { path = "../datatypes" } diff --git a/src/table-engine/src/engine.rs b/src/table-engine/src/engine.rs index ee09d80609..71c45690d6 100644 --- a/src/table-engine/src/engine.rs +++ b/src/table-engine/src/engine.rs @@ -5,20 +5,20 @@ use std::sync::RwLock; use async_trait::async_trait; use common_error::ext::BoxedError; +use common_telemetry::logging; use snafu::ResultExt; -use store_api::storage::ConcreteDataType; use store_api::storage::{ - self as store, ColumnDescriptorBuilder, ColumnFamilyDescriptorBuilder, - EngineContext as StorageContext, Region, RegionDescriptor, RegionId, RegionMeta, - RowKeyDescriptorBuilder, StorageEngine, + self, ColumnDescriptorBuilder, ColumnFamilyDescriptorBuilder, ConcreteDataType, OpenOptions, + Region, RegionDescriptor, RegionId, RegionMeta, RowKeyDescriptorBuilder, StorageEngine, }; use table::engine::{EngineContext, TableEngine}; -use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest}; +use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest}; use table::Result as TableResult; use table::{ metadata::{TableId, TableInfoBuilder, TableMetaBuilder, TableType}, table::TableRef, }; +use tokio::sync::Mutex; use crate::error::{self, Result}; use crate::table::MitoTable; @@ -30,12 +30,12 @@ pub const DEFAULT_ENGINE: &str = "mito"; /// About mito . /// "you can't be a true petrolhead until you've owned an Alfa Romeo" -- by Jeremy Clarkson #[derive(Clone)] -pub struct MitoEngine { - inner: Arc>, +pub struct MitoEngine { + inner: Arc>, } -impl MitoEngine { - pub fn new(storage_engine: Store) -> Self { +impl MitoEngine { + pub fn new(storage_engine: S) -> Self { Self { inner: Arc::new(MitoEngineInner::new(storage_engine)), } @@ -43,7 +43,7 @@ impl MitoEngine { } #[async_trait] -impl TableEngine for MitoEngine { +impl TableEngine for MitoEngine { async fn create_table( &self, ctx: &EngineContext, @@ -52,6 +52,14 @@ impl TableEngine for MitoEngine { Ok(self.inner.create_table(ctx, request).await?) } + async fn open_table( + &self, + ctx: &EngineContext, + request: OpenTableRequest, + ) -> TableResult { + Ok(self.inner.open_table(ctx, request).await?) + } + async fn alter_table( &self, _ctx: &EngineContext, @@ -60,8 +68,8 @@ impl TableEngine for MitoEngine { unimplemented!(); } - fn get_table(&self, ctx: &EngineContext, name: &str) -> TableResult> { - Ok(self.inner.get_table(ctx, name)?) + fn get_table(&self, _ctx: &EngineContext, name: &str) -> TableResult> { + Ok(self.inner.get_table(name)) } fn table_exists(&self, _ctx: &EngineContext, _name: &str) -> bool { @@ -78,18 +86,26 @@ impl TableEngine for MitoEngine { } /// FIXME(dennis) impl system catalog to keep table metadata. -struct MitoEngineInner { +struct MitoEngineInner { + /// All tables opened by the engine. + /// + /// Writing to `tables` should also hold the `table_mutex`. tables: RwLock>, - storage_engine: Store, + storage_engine: S, + // FIXME(yingwen): Remove `next_table_id`. Table id should be assigned by other module (maybe catalog). next_table_id: AtomicU64, + /// Table mutex is used to protect the operations such as creating/opening/closing + /// a table, to avoid things like opening the same table simultaneously. + table_mutex: Mutex<()>, } -impl MitoEngineInner { - fn new(storage_engine: Store) -> Self { +impl MitoEngineInner { + fn new(storage_engine: S) -> Self { Self { tables: RwLock::new(HashMap::default()), storage_engine, next_table_id: AtomicU64::new(0), + table_mutex: Mutex::new(()), } } @@ -98,7 +114,7 @@ impl MitoEngineInner { } } -impl MitoEngineInner { +impl MitoEngineInner { async fn create_table( &self, _ctx: &EngineContext, @@ -114,7 +130,8 @@ impl MitoEngineInner { //TODO(dennis): supports multi regions let region_id: RegionId = 0; - let name = store::gen_region_name(region_id); + // TODO(yingwen): Maybe we should use table name as part of region name. + let name = request.name.clone(); let host_column = ColumnDescriptorBuilder::new(0, "host", ConcreteDataType::string_datatype()) @@ -150,7 +167,7 @@ impl MitoEngineInner { let region = self .storage_engine .create_region( - &StorageContext::default(), + &storage::EngineContext::default(), RegionDescriptor { id: region_id, name, @@ -189,8 +206,66 @@ impl MitoEngineInner { Ok(table) } - fn get_table(&self, _ctx: &EngineContext, name: &str) -> Result> { - Ok(self.tables.read().unwrap().get(name).cloned()) + // TODO(yingwen): Support catalog and schema name. + async fn open_table( + &self, + _ctx: &EngineContext, + request: OpenTableRequest, + ) -> TableResult { + let table_name = &request.table_name; + if let Some(table) = self.get_table(table_name) { + // Table has already been opened. + return Ok(table); + } + + // Acquires the mutex before opening a new table. + let table = { + let _lock = self.table_mutex.lock().await; + // Checks again, read lock should be enough since we are guarded by the mutex. + if let Some(table) = self.get_table(table_name) { + return Ok(table); + } + + let engine_ctx = storage::EngineContext::default(); + let opts = OpenOptions::default(); + let region_name = table_name; + // Now we just use table name as region name. TODO(yingwen): Naming pattern of region. + let region = self + .storage_engine + .open_region(&engine_ctx, region_name, &opts) + .await + .map_err(BoxedError::new) + .context(error::OpenRegionSnafu { region_name })?; + + let table_meta = TableMetaBuilder::default() + .schema(region.in_memory_metadata().schema().clone()) + .engine(DEFAULT_ENGINE) + .build() + .context(error::BuildTableMetaSnafu)?; + let table_info = TableInfoBuilder::new(table_name.clone(), table_meta) + .ident(request.table_id) + .table_version(0u64) + .table_type(TableType::Base) + .build() + .context(error::BuildTableInfoSnafu)?; + + let table = Arc::new(MitoTable::new(table_info, region)); + + self.tables + .write() + .unwrap() + .insert(table_name.to_string(), table.clone()); + + table + }; + + logging::info!("Mito engine opened table {}", table_name); + + Ok(table) + } + + fn get_table(&self, name: &str) -> Option { + self.tables.read().unwrap().get(name).cloned() } } @@ -203,11 +278,11 @@ mod tests { use table::requests::InsertRequest; use super::*; - use crate::table::test; + use crate::table::test_util; #[tokio::test] async fn test_create_table_insert_scan() { - let (_engine, table, schema, _dir) = test::setup_test_engine_and_table().await; + let (_engine, table, schema, _dir) = test_util::setup_test_engine_and_table().await; assert_eq!(TableType::Base, table.table_type()); assert_eq!(schema, table.schema()); @@ -254,4 +329,38 @@ mod tests { assert_eq!(cpus.to_arrow_array(), columns[2]); assert_eq!(memories.to_arrow_array(), columns[3]); } + + #[tokio::test] + async fn test_open_table() { + common_telemetry::init_default_ut_logging(); + + let ctx = EngineContext::default(); + let open_req = OpenTableRequest { + catalog_name: String::new(), + schema_name: String::new(), + table_name: test_util::TABLE_NAME.to_string(), + // Currently the first table has id 0. + table_id: 0, + }; + + let (engine, table) = { + let (engine, table_engine, table) = test_util::setup_mock_engine_and_table().await; + // Now try to open the table again. + let reopened = table_engine + .open_table(&ctx, open_req.clone()) + .await + .unwrap(); + assert_eq!(table.schema(), reopened.schema()); + + (engine, table) + }; + + // Construct a new table engine, and try to open the table. + let table_engine = MitoEngine::new(engine); + let reopened = table_engine + .open_table(&ctx, open_req.clone()) + .await + .unwrap(); + assert_eq!(table.schema(), reopened.schema()); + } } diff --git a/src/table-engine/src/error.rs b/src/table-engine/src/error.rs index 07c165ecc4..9f63749852 100644 --- a/src/table-engine/src/error.rs +++ b/src/table-engine/src/error.rs @@ -2,15 +2,35 @@ use std::any::Any; use common_error::ext::BoxedError; use common_error::prelude::*; +use table::metadata::{TableInfoBuilderError, TableMetaBuilderError}; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { - #[snafu(display("Fail to create region, source: {}", source))] + #[snafu(display("Failed to create region, source: {}", source))] CreateRegion { #[snafu(backtrace)] source: BoxedError, }, + + #[snafu(display("Failed to open region, region: {}, source: {}", region_name, source))] + OpenRegion { + region_name: String, + #[snafu(backtrace)] + source: BoxedError, + }, + + #[snafu(display("Failed to build table meta, source: {}", source))] + BuildTableMeta { + source: TableMetaBuilderError, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to build table info, source: {}", source))] + BuildTableInfo { + source: TableInfoBuilderError, + backtrace: Backtrace, + }, } impl From for table::error::Error { @@ -23,8 +43,11 @@ pub type Result = std::result::Result; impl ErrorExt for Error { fn status_code(&self) -> StatusCode { + use Error::*; + match self { - Error::CreateRegion { source, .. } => source.status_code(), + CreateRegion { source, .. } | OpenRegion { source, .. } => source.status_code(), + BuildTableMeta { .. } | BuildTableInfo { .. } => StatusCode::Unexpected, } } diff --git a/src/table-engine/src/table.rs b/src/table-engine/src/table.rs index 4146edbb10..7259674587 100644 --- a/src/table-engine/src/table.rs +++ b/src/table-engine/src/table.rs @@ -1,5 +1,5 @@ #[cfg(test)] -pub mod test; +pub mod test_util; use std::any::Any; use std::pin::Pin; diff --git a/src/table-engine/src/table/test.rs b/src/table-engine/src/table/test_util.rs similarity index 58% rename from src/table-engine/src/table/test.rs rename to src/table-engine/src/table/test_util.rs index 418de6c2db..99deb75537 100644 --- a/src/table-engine/src/table/test.rs +++ b/src/table-engine/src/table/test_util.rs @@ -1,3 +1,5 @@ +mod mock_engine; + use std::sync::Arc; use datatypes::prelude::ConcreteDataType; @@ -13,13 +15,11 @@ use table::TableRef; use tempdir::TempDir; use crate::engine::MitoEngine; +use crate::table::test_util::mock_engine::MockEngine; -pub async fn setup_test_engine_and_table() -> ( - MitoEngine>, - TableRef, - SchemaRef, - TempDir, -) { +pub const TABLE_NAME: &str = "demo"; + +fn schema_for_test() -> Schema { let column_schemas = vec![ ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true), @@ -27,10 +27,21 @@ pub async fn setup_test_engine_and_table() -> ( ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), ]; + Schema::with_timestamp_index(column_schemas, 1).expect("ts must be timestamp column") +} + +pub type MockMitoEngine = MitoEngine; + +pub async fn setup_test_engine_and_table() -> ( + MitoEngine>, + TableRef, + SchemaRef, + TempDir, +) { let dir = TempDir::new("setup_test_engine_and_table").unwrap(); let store_dir = dir.path().to_string_lossy(); - let table_engine = MitoEngine::>::new( + let table_engine = MitoEngine::new( EngineImpl::new( EngineConfig::with_store_dir(&store_dir), Arc::new(NoopLogStore::default()), @@ -39,16 +50,13 @@ pub async fn setup_test_engine_and_table() -> ( .unwrap(), ); - let table_name = "demo"; - let schema = Arc::new( - Schema::with_timestamp_index(column_schemas, 1).expect("ts must be timestamp column"), - ); + let schema = Arc::new(schema_for_test()); let table = table_engine .create_table( &EngineContext::default(), CreateTableRequest { - name: table_name.to_string(), - desc: Some(" a test table".to_string()), + name: TABLE_NAME.to_string(), + desc: Some("a test table".to_string()), schema: schema.clone(), }, ) @@ -57,3 +65,23 @@ pub async fn setup_test_engine_and_table() -> ( (table_engine, table, schema, dir) } + +pub async fn setup_mock_engine_and_table() -> (MockEngine, MockMitoEngine, TableRef) { + let mock_engine = MockEngine::default(); + let table_engine = MitoEngine::new(mock_engine.clone()); + + let schema = Arc::new(schema_for_test()); + let table = table_engine + .create_table( + &EngineContext::default(), + CreateTableRequest { + name: TABLE_NAME.to_string(), + desc: None, + schema: schema.clone(), + }, + ) + .await + .unwrap(); + + (mock_engine, table_engine, table) +} diff --git a/src/table-engine/src/table/test_util/mock_engine.rs b/src/table-engine/src/table/test_util/mock_engine.rs new file mode 100644 index 0000000000..c75222301b --- /dev/null +++ b/src/table-engine/src/table/test_util/mock_engine.rs @@ -0,0 +1,183 @@ +//! A mock storage engine for table test purpose. + +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; + +use async_trait::async_trait; +use common_error::mock::MockError; +use common_error::prelude::StatusCode; +use common_telemetry::logging; +use storage::metadata::{RegionMetaImpl, RegionMetadataRef}; +use storage::write_batch::WriteBatch; +use store_api::storage::{ + Chunk, ChunkReader, EngineContext, GetRequest, GetResponse, OpenOptions, ReadContext, Region, + RegionDescriptor, ScanRequest, ScanResponse, SchemaRef, Snapshot, StorageEngine, WriteContext, + WriteResponse, +}; + +pub type Result = std::result::Result; + +pub struct MockChunkReader { + schema: SchemaRef, +} + +#[async_trait] +impl ChunkReader for MockChunkReader { + type Error = MockError; + + fn schema(&self) -> &SchemaRef { + &self.schema + } + + async fn next_chunk(&mut self) -> Result> { + Ok(None) + } +} + +pub struct MockSnapshot { + metadata: RegionMetadataRef, +} + +#[async_trait] +impl Snapshot for MockSnapshot { + type Error = MockError; + type Reader = MockChunkReader; + + fn schema(&self) -> &SchemaRef { + &self.metadata.schema + } + + async fn scan( + &self, + _ctx: &ReadContext, + _request: ScanRequest, + ) -> Result> { + let reader = MockChunkReader { + schema: self.metadata.schema.clone(), + }; + + Ok(ScanResponse { reader }) + } + + async fn get(&self, _ctx: &ReadContext, _request: GetRequest) -> Result { + Ok(GetResponse {}) + } +} + +// Clones a MockRegion is not cheap as we need to clone the string name, but for test +// purpose the cost should be acceptable. +#[derive(Debug, Clone)] +pub struct MockRegion { + // FIXME(yingwen): Remove this once name is provided by metadata. + name: String, + // We share the same metadata definition with the storage engine. + metadata: RegionMetadataRef, +} + +#[async_trait] +impl Region for MockRegion { + type Error = MockError; + type Meta = RegionMetaImpl; + type WriteRequest = WriteBatch; + type Snapshot = MockSnapshot; + + fn name(&self) -> &str { + &self.name + } + + fn in_memory_metadata(&self) -> RegionMetaImpl { + RegionMetaImpl::new(self.metadata.clone()) + } + + async fn write(&self, _ctx: &WriteContext, _request: WriteBatch) -> Result { + Ok(WriteResponse {}) + } + + fn snapshot(&self, _ctx: &ReadContext) -> Result { + Ok(MockSnapshot { + metadata: self.metadata.clone(), + }) + } + + fn write_request(&self, schema: SchemaRef) -> WriteBatch { + WriteBatch::new(schema) + } +} + +type RegionMap = HashMap; + +#[derive(Debug, Default)] +struct RegionManager { + opened_regions: RegionMap, + closed_regions: RegionMap, +} + +#[derive(Debug, Clone, Default)] +pub struct MockEngine { + regions: Arc>, +} + +#[async_trait] +impl StorageEngine for MockEngine { + type Error = MockError; + type Region = MockRegion; + + async fn open_region( + &self, + _ctx: &EngineContext, + name: &str, + _opts: &OpenOptions, + ) -> Result { + logging::info!("Mock engine create region, name: {}", name); + + let mut regions = self.regions.lock().unwrap(); + if let Some(region) = regions.opened_regions.get(name) { + return Ok(region.clone()); + } + + if let Some(region) = regions.closed_regions.remove(name) { + regions + .opened_regions + .insert(name.to_string(), region.clone()); + return Ok(region); + } + + Err(MockError::with_backtrace(StatusCode::Unexpected)) + } + + async fn close_region(&self, _ctx: &EngineContext, _region: MockRegion) -> Result<()> { + unimplemented!() + } + + async fn create_region( + &self, + _ctx: &EngineContext, + descriptor: RegionDescriptor, + ) -> Result { + logging::info!("Mock engine create region, descriptor: {:?}", descriptor); + + let mut regions = self.regions.lock().unwrap(); + if let Some(region) = regions.opened_regions.get(&descriptor.name) { + return Ok(region.clone()); + } + + let name = descriptor.name.clone(); + let metadata = descriptor.try_into().unwrap(); + let region = MockRegion { + name: name.clone(), + metadata: Arc::new(metadata), + }; + regions.opened_regions.insert(name, region.clone()); + + Ok(region) + } + + async fn drop_region(&self, _ctx: &EngineContext, _region: Self::Region) -> Result<()> { + unimplemented!() + } + + fn get_region(&self, _ctx: &EngineContext, name: &str) -> Result> { + let regions = self.regions.lock().unwrap(); + Ok(regions.opened_regions.get(name).cloned()) + } +} diff --git a/src/table/src/engine.rs b/src/table/src/engine.rs index 972a05128a..aad991ed80 100644 --- a/src/table/src/engine.rs +++ b/src/table/src/engine.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use crate::error::Result; -use crate::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest}; +use crate::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest}; use crate::TableRef; /// Table engine abstraction. @@ -16,6 +16,9 @@ pub trait TableEngine: Send + Sync { request: CreateTableRequest, ) -> Result; + /// Open an existing table by given `request`, returns the opened table. + async fn open_table(&self, ctx: &EngineContext, request: OpenTableRequest) -> Result; + /// Alter table schema, options etc. by given request, /// /// Returns the table after altered. diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 48a144bfee..bbec4a7631 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -4,6 +4,8 @@ use std::collections::HashMap; use datatypes::prelude::VectorRef; use datatypes::schema::SchemaRef; +use crate::metadata::TableId; + /// Insert request pub struct InsertRequest { pub table_name: String, @@ -11,14 +13,28 @@ pub struct InsertRequest { } /// Create table request +#[derive(Debug)] pub struct CreateTableRequest { pub name: String, pub desc: Option, pub schema: SchemaRef, + // TODO(yingwen): 1. Add catalog_name/schema_name and other infos 2. Support create_if_not_exists. +} + +/// Open table request +#[derive(Debug, Clone)] +pub struct OpenTableRequest { + pub catalog_name: String, + pub schema_name: String, + pub table_name: String, + // TODO(yingwen): TableId could be recovered from the table metadata. + pub table_id: TableId, } /// Alter table request +#[derive(Debug)] pub struct AlterTableRequest {} /// Drop table request +#[derive(Debug)] pub struct DropTableRequest {}