refactor(table): remove Table impl for system (#2270)

* refactor(table): remove Table impl for system

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: format & import

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2023-08-29 11:43:43 +08:00
committed by GitHub
parent 22dea02485
commit 2a6c830ca7
3 changed files with 90 additions and 96 deletions

View File

@@ -136,7 +136,7 @@ impl LocalCatalogManager {
schema: INFORMATION_SCHEMA_NAME.to_string(),
table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(),
table_id: SYSTEM_CATALOG_TABLE_ID,
table: self.system.information_schema.system.clone(),
table: self.system.information_schema.system.as_table_ref(),
};
self.catalogs.register_table(register_table_req).await?;

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
@@ -21,24 +20,23 @@ use common_catalog::consts::{
SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_ID, SYSTEM_CATALOG_TABLE_NAME,
};
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::debug;
use common_telemetry::{debug, warn};
use common_time::util;
use datatypes::prelude::{ConcreteDataType, ScalarVector, VectorRef};
use datatypes::schema::{ColumnSchema, RawSchema, SchemaRef};
use datatypes::schema::{ColumnSchema, RawSchema};
use datatypes::vectors::{BinaryVector, TimestampMillisecondVector, UInt8Vector};
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::ScanRequest;
use table::engine::{EngineContext, TableEngineRef};
use table::metadata::{TableId, TableInfoRef, TableType};
use table::requests::{
CreateTableRequest, DeleteRequest, InsertRequest, OpenTableRequest, TableOptions,
};
use table::{Result as TableResult, Table, TableRef};
use table::metadata::TableId;
use table::requests::{CreateTableRequest, InsertRequest, OpenTableRequest, TableOptions};
use table::TableRef;
use crate::error::{
self, CreateSystemCatalogSnafu, EmptyValueSnafu, Error, InvalidEntryTypeSnafu, InvalidKeySnafu,
OpenSystemCatalogSnafu, Result, ValueDeserializeSnafu,
self, CreateSystemCatalogSnafu, DeregisterTableSnafu, EmptyValueSnafu, Error,
InsertCatalogRecordSnafu, InvalidEntryTypeSnafu, InvalidKeySnafu, OpenSystemCatalogSnafu,
Result, ValueDeserializeSnafu,
};
use crate::DeregisterTableRequest;
@@ -48,42 +46,6 @@ pub const VALUE_INDEX: usize = 3;
pub struct SystemCatalogTable(TableRef);
#[async_trait::async_trait]
impl Table for SystemCatalogTable {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.0.schema()
}
async fn scan_to_stream(&self, request: ScanRequest) -> TableResult<SendableRecordBatchStream> {
self.0.scan_to_stream(request).await
}
/// Insert values into table.
async fn insert(&self, request: InsertRequest) -> TableResult<usize> {
self.0.insert(request).await
}
fn table_info(&self) -> TableInfoRef {
self.0.table_info()
}
fn table_type(&self) -> TableType {
self.0.table_type()
}
async fn delete(&self, request: DeleteRequest) -> TableResult<usize> {
self.0.delete(request).await
}
fn statistics(&self) -> Option<table::stats::TableStatistics> {
self.0.statistics()
}
}
impl SystemCatalogTable {
pub async fn new(engine: TableEngineRef) -> Result<Self> {
let request = OpenTableRequest {
@@ -126,6 +88,54 @@ impl SystemCatalogTable {
}
}
pub async fn register_table(
&self,
catalog: String,
schema: String,
table_name: String,
table_id: TableId,
engine: String,
) -> Result<usize> {
let insert_request =
build_table_insert_request(catalog, schema, table_name, table_id, engine);
self.0
.insert(insert_request)
.await
.context(InsertCatalogRecordSnafu)
}
pub(crate) async fn deregister_table(
&self,
request: &DeregisterTableRequest,
table_id: TableId,
) -> Result<()> {
let deletion_request = build_table_deletion_request(request, table_id);
self.0
.insert(deletion_request)
.await
.map(|x| {
if x != 1 {
let table = common_catalog::format_full_table_name(
&request.catalog,
&request.schema,
&request.table_name
);
warn!("Failed to delete table record from information_schema, unexpected returned result: {x}, table: {table}");
}
})
.with_context(|_| DeregisterTableSnafu {
request: request.clone(),
})
}
pub async fn register_schema(&self, catalog: String, schema: String) -> Result<usize> {
let insert_request = build_schema_insert_request(catalog, schema);
self.0
.insert(insert_request)
.await
.context(InsertCatalogRecordSnafu)
}
/// Create a stream of all entries inside system catalog table
pub async fn records(&self) -> Result<SendableRecordBatchStream> {
let full_projection = None;
@@ -137,11 +147,16 @@ impl SystemCatalogTable {
limit: None,
};
let stream = self
.0
.scan_to_stream(scan_req)
.await
.context(error::SystemCatalogTableScanSnafu)?;
Ok(stream)
}
pub fn as_table_ref(&self) -> TableRef {
self.0.clone()
}
}
/// Build system catalog table schema.
@@ -541,14 +556,14 @@ mod tests {
async fn test_system_table_type() {
let (_dir, table_engine) = prepare_table_engine().await;
let system_table = SystemCatalogTable::new(table_engine).await.unwrap();
assert_eq!(Base, system_table.table_type());
assert_eq!(Base, system_table.as_table_ref().table_type());
}
#[tokio::test]
async fn test_system_table_info() {
let (_dir, table_engine) = prepare_table_engine().await;
let system_table = SystemCatalogTable::new(table_engine).await.unwrap();
let info = system_table.table_info();
let info = system_table.as_table_ref().table_info();
assert_eq!(TableType::Base, info.table_type);
assert_eq!(SYSTEM_CATALOG_TABLE_NAME, info.name);
assert_eq!(SYSTEM_CATALOG_TABLE_ID, info.ident.table_id);
@@ -561,14 +576,16 @@ mod tests {
let (_, table_engine) = prepare_table_engine().await;
let catalog_table = SystemCatalogTable::new(table_engine).await.unwrap();
let table_insertion = build_table_insert_request(
DEFAULT_CATALOG_NAME.to_string(),
DEFAULT_SCHEMA_NAME.to_string(),
"my_table".to_string(),
1,
MITO_ENGINE.to_string(),
);
let result = catalog_table.insert(table_insertion).await.unwrap();
let result = catalog_table
.register_table(
DEFAULT_CATALOG_NAME.to_string(),
DEFAULT_SCHEMA_NAME.to_string(),
"my_table".to_string(),
1,
MITO_ENGINE.to_string(),
)
.await
.unwrap();
assert_eq!(result, 1);
let records = catalog_table.records().await.unwrap();
@@ -598,16 +615,17 @@ mod tests {
});
assert_eq!(entry, expected);
let table_deletion = build_table_deletion_request(
&DeregisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "my_table".to_string(),
},
1,
);
let result = catalog_table.insert(table_deletion).await.unwrap();
assert_eq!(result, 1);
catalog_table
.deregister_table(
&DeregisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "my_table".to_string(),
},
1,
)
.await
.unwrap();
let records = catalog_table.records().await.unwrap();
let batches = RecordBatches::try_collect(records).await.unwrap().take();

View File

@@ -16,16 +16,9 @@
use std::sync::Arc;
use common_telemetry::logging;
use snafu::ResultExt;
use table::metadata::TableId;
use table::Table;
use crate::error::{self, InsertCatalogRecordSnafu, Result as CatalogResult};
use crate::system::{
build_schema_insert_request, build_table_deletion_request, build_table_insert_request,
SystemCatalogTable,
};
use crate::system::SystemCatalogTable;
use crate::DeregisterTableRequest;
pub struct InformationSchema {
@@ -54,36 +47,21 @@ impl SystemCatalog {
table_id: TableId,
engine: String,
) -> crate::error::Result<usize> {
let request = build_table_insert_request(catalog, schema, table_name, table_id, engine);
self.information_schema
.system
.insert(request)
.register_table(catalog, schema, table_name, table_id, engine)
.await
.context(InsertCatalogRecordSnafu)
}
pub(crate) async fn deregister_table(
&self,
request: &DeregisterTableRequest,
table_id: TableId,
) -> CatalogResult<()> {
) -> crate::error::Result<()> {
self.information_schema
.system
.insert(build_table_deletion_request(request, table_id))
.deregister_table(request, table_id)
.await
.map(|x| {
if x != 1 {
let table = common_catalog::format_full_table_name(
&request.catalog,
&request.schema,
&request.table_name
);
logging::warn!("Failed to delete table record from information_schema, unexpected returned result: {x}, table: {table}");
}
})
.with_context(|_| error::DeregisterTableSnafu {
request: request.clone(),
})
}
pub async fn register_schema(
@@ -91,11 +69,9 @@ impl SystemCatalog {
catalog: String,
schema: String,
) -> crate::error::Result<usize> {
let request = build_schema_insert_request(catalog, schema);
self.information_schema
.system
.insert(request)
.register_schema(catalog, schema)
.await
.context(InsertCatalogRecordSnafu)
}
}