feat: Adds TableEngine::open_table() (#132)

* feat: Add `open_table()` method to `TableEngine`

* feat: Implements MitoEngine::open_table()

For simplicity, this implementation just uses the table name as the region
name, and uses that name to open a region for that table. It also
introduces a mutex to avoid opening the same table simultaneously.

* refactor: Shorten generic param name

Use `S` instead of `Store` for `MitoEngine`.

* test: Mock storage engine for table engine test

Add a `MockEngine` to mock the storage engine, so that testing the mito
table engine can sometimes use the mocked storage.

* test: Add open table test

Also remove `storage::gen_region_name` method, and always use table name
as default region name, so the table engine can open the table created
by `create_table()`.

* chore: Add open table log
This commit is contained in:
evenyag
2022-08-04 17:35:17 +08:00
committed by GitHub
parent 56fae412d2
commit fb4495eb46
12 changed files with 407 additions and 55 deletions

View File

@@ -20,6 +20,6 @@ mod sync;
mod test_util;
mod version;
mod wal;
mod write_batch;
pub mod write_batch;
pub use engine::EngineImpl;

View File

@@ -91,6 +91,8 @@ impl<S: LogStore> RegionImpl<S> {
/// Create a new region and also persist the region metadata to manifest.
///
/// The caller should avoid calling this method simultaneously.
// FIXME(yingwen): Region id is already specified in metadata, but name is not specified in metadata. We should
// add name to RegionMetadata.
pub async fn create(
id: RegionId,
name: String,

View File

@@ -16,7 +16,7 @@ pub use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
pub use self::chunk::{Chunk, ChunkReader};
pub use self::descriptors::{
gen_region_name, ColumnDescriptor, ColumnDescriptorBuilder, ColumnFamilyDescriptor,
ColumnDescriptor, ColumnDescriptorBuilder, ColumnFamilyDescriptor,
ColumnFamilyDescriptorBuilder, ColumnFamilyId, ColumnId, RegionDescriptor, RegionId,
RowKeyDescriptor, RowKeyDescriptorBuilder,
};

View File

@@ -10,13 +10,6 @@ pub type ColumnId = u32;
pub type ColumnFamilyId = u32;
/// Id of the region.
pub type RegionId = u32;
/// Default region name prefix
pub const REGION_PREFIX: &str = "r_";
#[inline]
pub fn gen_region_name(id: RegionId) -> String {
format!("{}{}", REGION_PREFIX, id)
}
// TODO(yingwen): Validate default value has same type with column, and name is a valid column name.
/// A [ColumnDescriptor] contains information to create a column.
@@ -234,10 +227,4 @@ mod tests {
.unwrap();
assert_eq!(1, desc.columns.len());
}
#[test]
fn test_gen_region_name() {
assert_eq!("r_0", gen_region_name(0));
assert_eq!("r_99", gen_region_name(99));
}
}

View File

@@ -19,6 +19,7 @@ snafu = { version = "0.7", features = ["backtraces"] }
storage ={ path = "../storage" }
store-api ={ path = "../store-api" }
table = { path = "../table" }
tokio = { version = "1.0", features = ["full"] }
[dev-dependencies]
datatypes = { path = "../datatypes" }

View File

@@ -5,20 +5,20 @@ use std::sync::RwLock;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_telemetry::logging;
use snafu::ResultExt;
use store_api::storage::ConcreteDataType;
use store_api::storage::{
self as store, ColumnDescriptorBuilder, ColumnFamilyDescriptorBuilder,
EngineContext as StorageContext, Region, RegionDescriptor, RegionId, RegionMeta,
RowKeyDescriptorBuilder, StorageEngine,
self, ColumnDescriptorBuilder, ColumnFamilyDescriptorBuilder, ConcreteDataType, OpenOptions,
Region, RegionDescriptor, RegionId, RegionMeta, RowKeyDescriptorBuilder, StorageEngine,
};
use table::engine::{EngineContext, TableEngine};
use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest};
use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest};
use table::Result as TableResult;
use table::{
metadata::{TableId, TableInfoBuilder, TableMetaBuilder, TableType},
table::TableRef,
};
use tokio::sync::Mutex;
use crate::error::{self, Result};
use crate::table::MitoTable;
@@ -30,12 +30,12 @@ pub const DEFAULT_ENGINE: &str = "mito";
/// About mito <https://en.wikipedia.org/wiki/Alfa_Romeo_MiTo>.
/// "you can't be a true petrolhead until you've owned an Alfa Romeo" -- by Jeremy Clarkson
#[derive(Clone)]
pub struct MitoEngine<Store: StorageEngine> {
inner: Arc<MitoEngineInner<Store>>,
pub struct MitoEngine<S: StorageEngine> {
inner: Arc<MitoEngineInner<S>>,
}
impl<Store: StorageEngine> MitoEngine<Store> {
pub fn new(storage_engine: Store) -> Self {
impl<S: StorageEngine> MitoEngine<S> {
pub fn new(storage_engine: S) -> Self {
Self {
inner: Arc::new(MitoEngineInner::new(storage_engine)),
}
@@ -43,7 +43,7 @@ impl<Store: StorageEngine> MitoEngine<Store> {
}
#[async_trait]
impl<Store: StorageEngine> TableEngine for MitoEngine<Store> {
impl<S: StorageEngine> TableEngine for MitoEngine<S> {
async fn create_table(
&self,
ctx: &EngineContext,
@@ -52,6 +52,14 @@ impl<Store: StorageEngine> TableEngine for MitoEngine<Store> {
Ok(self.inner.create_table(ctx, request).await?)
}
/// Opens the table described by `request` by delegating to the inner engine.
///
/// Any error from the inner engine is converted into the table error type.
async fn open_table(
    &self,
    ctx: &EngineContext,
    request: OpenTableRequest,
) -> TableResult<TableRef> {
    let table = self.inner.open_table(ctx, request).await?;
    Ok(table)
}
async fn alter_table(
&self,
_ctx: &EngineContext,
@@ -60,8 +68,8 @@ impl<Store: StorageEngine> TableEngine for MitoEngine<Store> {
unimplemented!();
}
fn get_table(&self, ctx: &EngineContext, name: &str) -> TableResult<Option<TableRef>> {
Ok(self.inner.get_table(ctx, name)?)
fn get_table(&self, _ctx: &EngineContext, name: &str) -> TableResult<Option<TableRef>> {
Ok(self.inner.get_table(name))
}
fn table_exists(&self, _ctx: &EngineContext, _name: &str) -> bool {
@@ -78,18 +86,26 @@ impl<Store: StorageEngine> TableEngine for MitoEngine<Store> {
}
/// FIXME(dennis) impl system catalog to keep table metadata.
struct MitoEngineInner<Store: StorageEngine> {
struct MitoEngineInner<S: StorageEngine> {
/// All tables opened by the engine.
///
/// Writing to `tables` should also hold the `table_mutex`.
tables: RwLock<HashMap<String, TableRef>>,
storage_engine: Store,
storage_engine: S,
// FIXME(yingwen): Remove `next_table_id`. Table id should be assigned by other module (maybe catalog).
next_table_id: AtomicU64,
/// Table mutex is used to protect the operations such as creating/opening/closing
/// a table, to avoid things like opening the same table simultaneously.
table_mutex: Mutex<()>,
}
impl<Store: StorageEngine> MitoEngineInner<Store> {
fn new(storage_engine: Store) -> Self {
impl<S: StorageEngine> MitoEngineInner<S> {
fn new(storage_engine: S) -> Self {
Self {
tables: RwLock::new(HashMap::default()),
storage_engine,
next_table_id: AtomicU64::new(0),
table_mutex: Mutex::new(()),
}
}
@@ -98,7 +114,7 @@ impl<Store: StorageEngine> MitoEngineInner<Store> {
}
}
impl<Store: StorageEngine> MitoEngineInner<Store> {
impl<S: StorageEngine> MitoEngineInner<S> {
async fn create_table(
&self,
_ctx: &EngineContext,
@@ -114,7 +130,8 @@ impl<Store: StorageEngine> MitoEngineInner<Store> {
//TODO(dennis): supports multi regions
let region_id: RegionId = 0;
let name = store::gen_region_name(region_id);
// TODO(yingwen): Maybe we should use table name as part of region name.
let name = request.name.clone();
let host_column =
ColumnDescriptorBuilder::new(0, "host", ConcreteDataType::string_datatype())
@@ -150,7 +167,7 @@ impl<Store: StorageEngine> MitoEngineInner<Store> {
let region = self
.storage_engine
.create_region(
&StorageContext::default(),
&storage::EngineContext::default(),
RegionDescriptor {
id: region_id,
name,
@@ -189,8 +206,66 @@ impl<Store: StorageEngine> MitoEngineInner<Store> {
Ok(table)
}
fn get_table(&self, _ctx: &EngineContext, name: &str) -> Result<Option<TableRef>> {
Ok(self.tables.read().unwrap().get(name).cloned())
// TODO(yingwen): Support catalog and schema name.
/// Opens the table named `request.table_name`.
///
/// If the table is already opened, the opened instance is returned. Otherwise the
/// region with the same name as the table is opened from the storage engine, a
/// table is built on top of it and registered in `tables`.
///
/// Fails if the region cannot be opened, or if the table meta / table info
/// cannot be built.
async fn open_table(
    &self,
    _ctx: &EngineContext,
    request: OpenTableRequest,
) -> TableResult<TableRef> {
    let table_name = &request.table_name;

    // Fast path: the table has already been opened.
    if let Some(opened) = self.get_table(table_name) {
        return Ok(opened);
    }

    let table = {
        // Opening a new table is serialized by the table mutex.
        let _guard = self.table_mutex.lock().await;

        // Checks again, read lock should be enough since we are guarded by the mutex.
        if let Some(opened) = self.get_table(table_name) {
            return Ok(opened);
        }

        let storage_ctx = storage::EngineContext::default();
        let open_opts = OpenOptions::default();
        // Now we just use table name as region name. TODO(yingwen): Naming pattern of region.
        let region_name = table_name;

        let region = self
            .storage_engine
            .open_region(&storage_ctx, region_name, &open_opts)
            .await
            .map_err(BoxedError::new)
            .context(error::OpenRegionSnafu { region_name })?;

        // The table schema is taken from the metadata of the opened region.
        let schema = region.in_memory_metadata().schema().clone();
        let table_meta = TableMetaBuilder::default()
            .schema(schema)
            .engine(DEFAULT_ENGINE)
            .build()
            .context(error::BuildTableMetaSnafu)?;

        let table_info = TableInfoBuilder::new(table_name.clone(), table_meta)
            .ident(request.table_id)
            .table_version(0u64)
            .table_type(TableType::Base)
            .build()
            .context(error::BuildTableInfoSnafu)?;

        let table = Arc::new(MitoTable::new(table_info, region));

        // Register the table so later calls find it without reopening the region.
        self.tables
            .write()
            .unwrap()
            .insert(table_name.clone(), table.clone());

        table
    };

    logging::info!("Mito engine opened table {}", table_name);

    Ok(table)
}
/// Returns the opened table registered under `name`, or `None` if no such table
/// has been opened by this engine.
fn get_table(&self, name: &str) -> Option<TableRef> {
    let tables = self.tables.read().unwrap();
    tables.get(name).cloned()
}
}
@@ -203,11 +278,11 @@ mod tests {
use table::requests::InsertRequest;
use super::*;
use crate::table::test;
use crate::table::test_util;
#[tokio::test]
async fn test_create_table_insert_scan() {
let (_engine, table, schema, _dir) = test::setup_test_engine_and_table().await;
let (_engine, table, schema, _dir) = test_util::setup_test_engine_and_table().await;
assert_eq!(TableType::Base, table.table_type());
assert_eq!(schema, table.schema());
@@ -254,4 +329,38 @@ mod tests {
assert_eq!(cpus.to_arrow_array(), columns[2]);
assert_eq!(memories.to_arrow_array(), columns[3]);
}
/// Opening a table must work both through the table engine that created it and
/// through a new table engine built on the same storage engine.
#[tokio::test]
async fn test_open_table() {
    common_telemetry::init_default_ut_logging();

    let ctx = EngineContext::default();
    let open_req = OpenTableRequest {
        catalog_name: String::new(),
        schema_name: String::new(),
        table_name: test_util::TABLE_NAME.to_string(),
        // Currently the first table has id 0.
        table_id: 0,
    };

    let (engine, table) = {
        let (storage_engine, first_engine, created) =
            test_util::setup_mock_engine_and_table().await;

        // Now try to open the table again.
        let opened_again = first_engine
            .open_table(&ctx, open_req.clone())
            .await
            .unwrap();
        assert_eq!(created.schema(), opened_again.schema());

        (storage_engine, created)
    };

    // Construct a new table engine, and try to open the table.
    let second_engine = MitoEngine::new(engine);
    let opened_by_new_engine = second_engine
        .open_table(&ctx, open_req.clone())
        .await
        .unwrap();
    assert_eq!(table.schema(), opened_by_new_engine.schema());
}
}

View File

@@ -2,15 +2,35 @@ use std::any::Any;
use common_error::ext::BoxedError;
use common_error::prelude::*;
use table::metadata::{TableInfoBuilderError, TableMetaBuilderError};
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Fail to create region, source: {}", source))]
#[snafu(display("Failed to create region, source: {}", source))]
CreateRegion {
#[snafu(backtrace)]
source: BoxedError,
},
#[snafu(display("Failed to open region, region: {}, source: {}", region_name, source))]
OpenRegion {
region_name: String,
#[snafu(backtrace)]
source: BoxedError,
},
#[snafu(display("Failed to build table meta, source: {}", source))]
BuildTableMeta {
source: TableMetaBuilderError,
backtrace: Backtrace,
},
#[snafu(display("Failed to build table info, source: {}", source))]
BuildTableInfo {
source: TableInfoBuilderError,
backtrace: Backtrace,
},
}
impl From<Error> for table::error::Error {
@@ -23,8 +43,11 @@ pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
Error::CreateRegion { source, .. } => source.status_code(),
CreateRegion { source, .. } | OpenRegion { source, .. } => source.status_code(),
BuildTableMeta { .. } | BuildTableInfo { .. } => StatusCode::Unexpected,
}
}

View File

@@ -1,5 +1,5 @@
#[cfg(test)]
pub mod test;
pub mod test_util;
use std::any::Any;
use std::pin::Pin;

View File

@@ -1,3 +1,5 @@
mod mock_engine;
use std::sync::Arc;
use datatypes::prelude::ConcreteDataType;
@@ -13,13 +15,11 @@ use table::TableRef;
use tempdir::TempDir;
use crate::engine::MitoEngine;
use crate::table::test_util::mock_engine::MockEngine;
pub async fn setup_test_engine_and_table() -> (
MitoEngine<EngineImpl<NoopLogStore>>,
TableRef,
SchemaRef,
TempDir,
) {
pub const TABLE_NAME: &str = "demo";
fn schema_for_test() -> Schema {
let column_schemas = vec![
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true),
@@ -27,10 +27,21 @@ pub async fn setup_test_engine_and_table() -> (
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
];
Schema::with_timestamp_index(column_schemas, 1).expect("ts must be timestamp column")
}
pub type MockMitoEngine = MitoEngine<MockEngine>;
pub async fn setup_test_engine_and_table() -> (
MitoEngine<EngineImpl<NoopLogStore>>,
TableRef,
SchemaRef,
TempDir,
) {
let dir = TempDir::new("setup_test_engine_and_table").unwrap();
let store_dir = dir.path().to_string_lossy();
let table_engine = MitoEngine::<EngineImpl<NoopLogStore>>::new(
let table_engine = MitoEngine::new(
EngineImpl::new(
EngineConfig::with_store_dir(&store_dir),
Arc::new(NoopLogStore::default()),
@@ -39,16 +50,13 @@ pub async fn setup_test_engine_and_table() -> (
.unwrap(),
);
let table_name = "demo";
let schema = Arc::new(
Schema::with_timestamp_index(column_schemas, 1).expect("ts must be timestamp column"),
);
let schema = Arc::new(schema_for_test());
let table = table_engine
.create_table(
&EngineContext::default(),
CreateTableRequest {
name: table_name.to_string(),
desc: Some(" a test table".to_string()),
name: TABLE_NAME.to_string(),
desc: Some("a test table".to_string()),
schema: schema.clone(),
},
)
@@ -57,3 +65,23 @@ pub async fn setup_test_engine_and_table() -> (
(table_engine, table, schema, dir)
}
/// Builds a table engine on top of a [`MockEngine`] and creates the `TABLE_NAME`
/// table with the test schema.
///
/// Returns the mock storage engine, the table engine wrapping a clone of it, and
/// the created table.
pub async fn setup_mock_engine_and_table() -> (MockEngine, MockMitoEngine, TableRef) {
    let mock_engine = MockEngine::default();
    let table_engine = MitoEngine::new(mock_engine.clone());

    let request = CreateTableRequest {
        name: TABLE_NAME.to_string(),
        desc: None,
        schema: Arc::new(schema_for_test()),
    };
    let table = table_engine
        .create_table(&EngineContext::default(), request)
        .await
        .unwrap();

    (mock_engine, table_engine, table)
}

View File

@@ -0,0 +1,183 @@
//! A mock storage engine for table test purpose.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use common_error::mock::MockError;
use common_error::prelude::StatusCode;
use common_telemetry::logging;
use storage::metadata::{RegionMetaImpl, RegionMetadataRef};
use storage::write_batch::WriteBatch;
use store_api::storage::{
Chunk, ChunkReader, EngineContext, GetRequest, GetResponse, OpenOptions, ReadContext, Region,
RegionDescriptor, ScanRequest, ScanResponse, SchemaRef, Snapshot, StorageEngine, WriteContext,
WriteResponse,
};
pub type Result<T> = std::result::Result<T, MockError>;
/// Chunk reader handed out by the mock snapshot; it holds only a schema.
pub struct MockChunkReader {
schema: SchemaRef,
}
#[async_trait]
impl ChunkReader for MockChunkReader {
type Error = MockError;
/// Returns the schema this reader was created with.
fn schema(&self) -> &SchemaRef {
&self.schema
}
/// Always returns `Ok(None)`: the mock reader never yields a chunk.
async fn next_chunk(&mut self) -> Result<Option<Chunk>> {
Ok(None)
}
}
/// Snapshot of a [`MockRegion`]; it only carries the region metadata.
pub struct MockSnapshot {
    metadata: RegionMetadataRef,
}

#[async_trait]
impl Snapshot for MockSnapshot {
    type Error = MockError;
    type Reader = MockChunkReader;

    /// Returns the schema stored in the region metadata.
    fn schema(&self) -> &SchemaRef {
        &self.metadata.schema
    }

    /// Returns a reader that carries the snapshot schema; the context and
    /// request are ignored.
    async fn scan(
        &self,
        _ctx: &ReadContext,
        _request: ScanRequest,
    ) -> Result<ScanResponse<MockChunkReader>> {
        let schema = self.metadata.schema.clone();
        Ok(ScanResponse {
            reader: MockChunkReader { schema },
        })
    }

    /// Returns an empty response; the context and request are ignored.
    async fn get(&self, _ctx: &ReadContext, _request: GetRequest) -> Result<GetResponse> {
        let response = GetResponse {};
        Ok(response)
    }
}
// Cloning a MockRegion is not cheap as we need to clone the string name, but for test
// purpose the cost should be acceptable.
/// A mocked region that records only its name and metadata.
#[derive(Debug, Clone)]
pub struct MockRegion {
    // FIXME(yingwen): Remove this once name is provided by metadata.
    name: String,
    // We share the same metadata definition with the storage engine.
    metadata: RegionMetadataRef,
}

#[async_trait]
impl Region for MockRegion {
    type Error = MockError;
    type Meta = RegionMetaImpl;
    type WriteRequest = WriteBatch;
    type Snapshot = MockSnapshot;

    /// Returns the name this region was created with.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Wraps the shared region metadata into a [`RegionMetaImpl`].
    fn in_memory_metadata(&self) -> RegionMetaImpl {
        let metadata = self.metadata.clone();
        RegionMetaImpl::new(metadata)
    }

    /// Accepts the write and returns an empty response; the request is ignored.
    async fn write(&self, _ctx: &WriteContext, _request: WriteBatch) -> Result<WriteResponse> {
        let response = WriteResponse {};
        Ok(response)
    }

    /// Creates a snapshot that shares this region's metadata.
    fn snapshot(&self, _ctx: &ReadContext) -> Result<MockSnapshot> {
        let metadata = self.metadata.clone();
        Ok(MockSnapshot { metadata })
    }

    /// Creates an empty write batch for the given `schema`.
    fn write_request(&self, schema: SchemaRef) -> WriteBatch {
        WriteBatch::new(schema)
    }
}
/// Mock regions keyed by region name.
type RegionMap = HashMap<String, MockRegion>;
/// Bookkeeping of the regions known to the mock engine.
#[derive(Debug, Default)]
struct RegionManager {
/// Regions that are currently opened.
opened_regions: RegionMap,
/// Regions that are not opened; `open_region` moves a region from here into
/// `opened_regions`.
closed_regions: RegionMap,
}
/// A mocked storage engine that keeps its regions in memory.
///
/// The region state lives behind an `Arc<Mutex<..>>`, so clones of the engine
/// share the same regions.
#[derive(Debug, Clone, Default)]
pub struct MockEngine {
    regions: Arc<Mutex<RegionManager>>,
}

#[async_trait]
impl StorageEngine for MockEngine {
    type Error = MockError;
    type Region = MockRegion;

    /// Opens the region with the given `name`.
    ///
    /// Returns the region if it is already opened. If it is found in
    /// `closed_regions`, it is moved to `opened_regions` and returned.
    /// Otherwise fails with a `MockError` carrying `StatusCode::Unexpected`.
    async fn open_region(
        &self,
        _ctx: &EngineContext,
        name: &str,
        _opts: &OpenOptions,
    ) -> Result<MockRegion> {
        // This used to log "create region", copied from `create_region`.
        logging::info!("Mock engine open region, name: {}", name);

        let mut regions = self.regions.lock().unwrap();
        if let Some(region) = regions.opened_regions.get(name) {
            return Ok(region.clone());
        }

        if let Some(region) = regions.closed_regions.remove(name) {
            regions
                .opened_regions
                .insert(name.to_string(), region.clone());
            return Ok(region);
        }

        Err(MockError::with_backtrace(StatusCode::Unexpected))
    }

    /// Not supported by the mock engine; panics if called.
    async fn close_region(&self, _ctx: &EngineContext, _region: MockRegion) -> Result<()> {
        unimplemented!()
    }

    /// Creates a region from `descriptor` and registers it as opened.
    ///
    /// If a region with the same name is already opened, that region is returned
    /// and the descriptor is ignored. Panics if the descriptor cannot be converted
    /// into region metadata.
    async fn create_region(
        &self,
        _ctx: &EngineContext,
        descriptor: RegionDescriptor,
    ) -> Result<MockRegion> {
        logging::info!("Mock engine create region, descriptor: {:?}", descriptor);

        let mut regions = self.regions.lock().unwrap();
        if let Some(region) = regions.opened_regions.get(&descriptor.name) {
            return Ok(region.clone());
        }

        // Keep a copy of the name: the descriptor is consumed by the conversion below.
        let name = descriptor.name.clone();
        let metadata = descriptor.try_into().unwrap();
        let region = MockRegion {
            name: name.clone(),
            metadata: Arc::new(metadata),
        };
        regions.opened_regions.insert(name, region.clone());

        Ok(region)
    }

    /// Not supported by the mock engine; panics if called.
    async fn drop_region(&self, _ctx: &EngineContext, _region: Self::Region) -> Result<()> {
        unimplemented!()
    }

    /// Returns the opened region with the given `name`, if any. Regions in
    /// `closed_regions` are not considered.
    fn get_region(&self, _ctx: &EngineContext, name: &str) -> Result<Option<MockRegion>> {
        let regions = self.regions.lock().unwrap();
        Ok(regions.opened_regions.get(name).cloned())
    }
}

View File

@@ -1,7 +1,7 @@
use std::sync::Arc;
use crate::error::Result;
use crate::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest};
use crate::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest};
use crate::TableRef;
/// Table engine abstraction.
@@ -16,6 +16,9 @@ pub trait TableEngine: Send + Sync {
request: CreateTableRequest,
) -> Result<TableRef>;
/// Open an existing table by given `request`, returns the opened table.
async fn open_table(&self, ctx: &EngineContext, request: OpenTableRequest) -> Result<TableRef>;
/// Alter table schema, options etc. by given request,
///
/// Returns the table after altered.

View File

@@ -4,6 +4,8 @@ use std::collections::HashMap;
use datatypes::prelude::VectorRef;
use datatypes::schema::SchemaRef;
use crate::metadata::TableId;
/// Insert request
pub struct InsertRequest {
pub table_name: String,
@@ -11,14 +13,28 @@ pub struct InsertRequest {
}
/// Create table request
#[derive(Debug)]
pub struct CreateTableRequest {
pub name: String,
pub desc: Option<String>,
pub schema: SchemaRef,
// TODO(yingwen): 1. Add catalog_name/schema_name and other infos 2. Support create_if_not_exists.
}
/// Open table request
#[derive(Debug, Clone)]
pub struct OpenTableRequest {
pub catalog_name: String,
pub schema_name: String,
pub table_name: String,
// TODO(yingwen): TableId could be recovered from the table metadata.
pub table_id: TableId,
}
/// Alter table request
#[derive(Debug)]
pub struct AlterTableRequest {}
/// Drop table request
#[derive(Debug)]
pub struct DropTableRequest {}