feat: add table info (#323)

* refactor: add table_info method for Table trait

* feat: add table_info method to Table trait

* test: add more unit tests

* fix: impl table_info for SystemTable

* test: fix failing test
Author: Lei, Huang
Date: 2022-10-20 12:23:44 +08:00 (committed via GitHub)
Parent: d5800d0b60
Commit: 2d52f19662
27 changed files with 318 additions and 70 deletions
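The core of the change, condensed from the table crate diff further down: the Table trait gains a required metadata accessor. A minimal sketch of the new trait surface, assuming TableInfoRef is the Arc-backed alias used throughout the diff:

// Condensed sketch, not the full trait definition.
pub trait Table: Send + Sync {
    /// Get a reference to the schema for this table.
    fn schema(&self) -> SchemaRef;

    /// New in this commit: get a reference to the table info.
    fn table_info(&self) -> TableInfoRef;

    // scan, insert, table_type, ... unchanged.
}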

Cargo.lock (generated)

@@ -644,10 +644,16 @@ dependencies = [
"datatypes",
"futures",
"futures-util",
"log-store",
"object-store",
"opendal",
"serde",
"serde_json",
"snafu",
"storage",
"table",
"table-engine",
"tempdir",
"tokio",
]
@@ -4752,6 +4758,7 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
name = "sql"
version = "0.1.0"
dependencies = [
"catalog",
"common-error",
"common-time",
"datatypes",


@@ -22,4 +22,10 @@ snafu = { version = "0.7", features = ["backtraces"] }
table = { path = "../table" }
[dev-dependencies]
log-store = { path = "../log-store" }
object-store = { path = "../object-store" }
opendal = "0.17"
storage = { path = "../storage" }
table-engine = { path = "../table-engine" }
tempdir = "0.3"
tokio = { version = "1.0", features = ["full"] }


@@ -97,7 +97,7 @@ impl LocalCatalogManager {
let table_id = req.create_table_request.id;
let table = if let Some(table) =
self.table(catalog_name.as_deref(), schema_name.as_deref(), table_name)?
self.table(Some(catalog_name), Some(schema_name), table_name)?
{
table
} else {
@@ -108,15 +108,12 @@ impl LocalCatalogManager {
.with_context(|_| CreateTableSnafu {
table_info: format!(
"{}.{}.{}, id: {}",
catalog_name.as_deref().unwrap_or(DEFAULT_CATALOG_NAME),
schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME),
table_name,
table_id,
catalog_name, schema_name, table_name, table_id,
),
})?;
self.register_table(RegisterTableRequest {
catalog: catalog_name.clone(),
schema: schema_name.clone(),
catalog: Some(catalog_name.clone()),
schema: Some(schema_name.clone()),
table_name: table_name.clone(),
table_id,
table: table.clone(),


@@ -13,7 +13,7 @@ use datatypes::vectors::{BinaryVector, TimestampVector, UInt8Vector};
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use table::engine::{EngineContext, TableEngineRef};
use table::metadata::TableId;
use table::metadata::{TableId, TableInfoRef};
use table::requests::{CreateTableRequest, InsertRequest, OpenTableRequest};
use table::{Table, TableRef};
@@ -32,7 +32,7 @@ pub const TIMESTAMP_INDEX: usize = 2;
pub const VALUE_INDEX: usize = 3;
pub struct SystemCatalogTable {
schema: SchemaRef,
table_info: TableInfoRef,
pub table: TableRef,
}
@@ -43,7 +43,7 @@ impl Table for SystemCatalogTable {
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
self.table_info.meta.schema.clone()
}
async fn scan(
@@ -59,6 +59,10 @@ impl Table for SystemCatalogTable {
async fn insert(&self, request: InsertRequest) -> table::error::Result<usize> {
self.table.insert(request).await
}
fn table_info(&self) -> TableInfoRef {
self.table_info.clone()
}
}
impl SystemCatalogTable {
@@ -77,13 +81,16 @@ impl SystemCatalogTable {
.await
.context(OpenSystemCatalogSnafu)?
{
Ok(Self { table, schema })
Ok(Self {
table_info: table.table_info(),
table,
})
} else {
// system catalog table is not yet created, try to create
let request = CreateTableRequest {
id: SYSTEM_CATALOG_TABLE_ID,
catalog_name: Some(SYSTEM_CATALOG_NAME.to_string()),
schema_name: Some(INFORMATION_SCHEMA_NAME.to_string()),
catalog_name: SYSTEM_CATALOG_NAME.to_string(),
schema_name: INFORMATION_SCHEMA_NAME.to_string(),
table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(),
desc: Some("System catalog table".to_string()),
schema: schema.clone(),
@@ -96,7 +103,8 @@ impl SystemCatalogTable {
.create_table(&ctx, request)
.await
.context(CreateSystemCatalogSnafu)?;
Ok(Self { table, schema })
let table_info = table.table_info();
Ok(Self { table, table_info })
}
}
@@ -320,6 +328,22 @@ pub struct TableEntryValue {
#[cfg(test)]
mod tests {
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::field_util::SchemaExt;
use datatypes::arrow;
use log_store::fs::noop::NoopLogStore;
use object_store::ObjectStore;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use table::engine::TableEngine;
use table::metadata::TableType;
use table::metadata::TableType::Base;
use table::requests::{AlterTableRequest, DropTableRequest};
use table::table::adapter::TableAdapter;
use table_engine::config::EngineConfig;
use table_engine::engine::MitoEngine;
use tempdir::TempDir;
use super::*;
#[test]
@@ -391,4 +415,120 @@ mod tests {
assert_eq!(EntryType::Table, EntryType::try_from(3).unwrap());
assert!(EntryType::try_from(4).is_err());
}
struct MockTableEngine {
table_name: String,
sole_table: TableRef,
}
impl Default for MockTableEngine {
fn default() -> Self {
Self {
table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(),
sole_table: Arc::new(
TableAdapter::new(
Arc::new(datafusion::datasource::empty::EmptyTable::new(Arc::new(
arrow::datatypes::Schema::empty(),
))),
Arc::new(RuntimeEnv::default()),
)
.unwrap(),
),
}
}
}
#[async_trait::async_trait]
impl TableEngine for MockTableEngine {
fn name(&self) -> &str {
"MockTableEngine"
}
async fn create_table(
&self,
_ctx: &EngineContext,
_request: CreateTableRequest,
) -> table::Result<TableRef> {
unreachable!()
}
async fn open_table(
&self,
_ctx: &EngineContext,
request: OpenTableRequest,
) -> table::Result<Option<TableRef>> {
if request.table_name == self.table_name {
Ok(Some(self.sole_table.clone()))
} else {
Ok(None)
}
}
async fn alter_table(
&self,
_ctx: &EngineContext,
_request: AlterTableRequest,
) -> table::Result<TableRef> {
unreachable!()
}
fn get_table(&self, _ctx: &EngineContext, name: &str) -> table::Result<Option<TableRef>> {
if name == self.table_name {
Ok(Some(self.sole_table.clone()))
} else {
Ok(None)
}
}
fn table_exists(&self, _ctx: &EngineContext, name: &str) -> bool {
name == self.table_name
}
async fn drop_table(
&self,
_ctx: &EngineContext,
_request: DropTableRequest,
) -> table::Result<()> {
unreachable!()
}
}
async fn prepare_table_engine() -> (TempDir, TableEngineRef) {
let dir = TempDir::new("system-table-test").unwrap();
let store_dir = dir.path().to_string_lossy();
let accessor = opendal::services::fs::Builder::default()
.root(&store_dir)
.build()
.unwrap();
let object_store = ObjectStore::new(accessor);
let table_engine = Arc::new(MitoEngine::new(
EngineConfig::default(),
EngineImpl::new(
StorageEngineConfig::default(),
Arc::new(NoopLogStore::default()),
object_store.clone(),
),
object_store,
));
(dir, table_engine)
}
#[tokio::test]
async fn test_system_table_type() {
let (_dir, table_engine) = prepare_table_engine().await;
let system_table = SystemCatalogTable::new(table_engine).await.unwrap();
assert_eq!(Base, system_table.table_type());
}
#[tokio::test]
async fn test_system_table_info() {
let (_dir, table_engine) = prepare_table_engine().await;
let system_table = SystemCatalogTable::new(table_engine).await.unwrap();
let info = system_table.table_info();
assert_eq!(TableType::Base, info.table_type);
assert_eq!(SYSTEM_CATALOG_TABLE_NAME, info.name);
assert_eq!(SYSTEM_CATALOG_TABLE_ID, info.ident.table_id);
assert_eq!(SYSTEM_CATALOG_NAME, info.catalog_name);
assert_eq!(INFORMATION_SCHEMA_NAME, info.schema_name);
}
}
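The SystemCatalogTable change above shows the delegation pattern used for wrapper tables: capture the inner table's TableInfoRef once at construction and serve both schema() and table_info() from it. A hedged sketch of that shape (the Wrapper type and its field names are illustrative):

// Illustrative wrapper mirroring SystemCatalogTable; trait plumbing omitted.
struct Wrapper {
    table: TableRef,
    table_info: TableInfoRef,
}

impl Wrapper {
    fn new(table: TableRef) -> Self {
        // Cache the info once; it already carries the schema.
        let table_info = table.table_info();
        Self { table, table_info }
    }

    fn schema(&self) -> SchemaRef {
        self.table_info.meta.schema.clone()
    }

    fn table_info(&self) -> TableInfoRef {
        // TableInfoRef is Arc-backed, so this clone is cheap.
        self.table_info.clone()
    }
}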


@@ -16,7 +16,7 @@ use datatypes::vectors::VectorRef;
use futures::Stream;
use snafu::ResultExt;
use table::engine::TableEngineRef;
use table::metadata::TableId;
use table::metadata::{TableId, TableInfoRef};
use table::{Table, TableRef};
use crate::consts::{INFORMATION_SCHEMA_NAME, SYSTEM_CATALOG_TABLE_NAME};
@@ -53,6 +53,10 @@ impl Table for Tables {
self.schema.clone()
}
fn table_info(&self) -> TableInfoRef {
unreachable!("Tables does not support table_info method")
}
async fn scan(
&self,
_projection: &Option<Vec<usize>>,


@@ -51,13 +51,20 @@ impl Instance {
async fn create_table_by_insert_batches(
&self,
catalog_name: &str,
schema_name: &str,
table_name: &str,
insert_batches: &[InsertBatch],
) -> Result<()> {
// Create table automatically, build schema from data.
let table_id = self.catalog_manager.next_table_id();
let create_table_request =
insert::build_create_table_request(table_id, table_name, insert_batches)?;
let create_table_request = insert::build_create_table_request(
catalog_name,
schema_name,
table_id,
table_name,
insert_batches,
)?;
info!(
"Try to create table: {} automatically with request: {:?}",
@@ -79,11 +86,15 @@ impl Instance {
table_name: &str,
values: insert_expr::Values,
) -> Result<Output> {
// maybe infer from insert batch?
let catalog_name = DEFAULT_CATALOG_NAME;
let schema_name = DEFAULT_SCHEMA_NAME;
let schema_provider = self
.catalog_manager
.catalog(DEFAULT_CATALOG_NAME)
.catalog(catalog_name)
.unwrap()
.schema(DEFAULT_SCHEMA_NAME)
.schema(schema_name)
.unwrap();
let insert_batches = insert::insert_batches(values.values)?;
@@ -98,8 +109,13 @@ impl Instance {
table
} else {
self.create_table_by_insert_batches(table_name, &insert_batches)
.await?;
self.create_table_by_insert_batches(
catalog_name,
schema_name,
table_name,
&insert_batches,
)
.await?;
schema_provider
.table(table_name)


@@ -2,6 +2,7 @@ use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::{alter_expr::Kind, AdminResult, AlterExpr, ColumnDef, CreateExpr};
use catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::prelude::{ErrorExt, StatusCode};
use common_query::Output;
use datatypes::schema::ColumnDefaultConstraint;
@@ -77,10 +78,16 @@ impl Instance {
let table_id = self.catalog_manager().next_table_id();
let catalog_name = expr
.catalog_name
.unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string());
let schema_name = expr
.schema_name
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
Ok(CreateTableRequest {
id: table_id,
catalog_name: expr.catalog_name,
schema_name: expr.schema_name,
catalog_name,
schema_name,
table_name: expr.table_name,
desc: expr.desc,
schema,
@@ -182,8 +189,8 @@ mod tests {
let expr = testing_create_expr();
let request = instance.create_expr_to_request(expr).unwrap();
assert_eq!(request.id, MIN_USER_TABLE_ID);
assert_eq!(request.catalog_name, None);
assert_eq!(request.schema_name, None);
assert_eq!(request.catalog_name, "greptime".to_string());
assert_eq!(request.schema_name, "public".to_string());
assert_eq!(request.table_name, "my-metrics");
assert_eq!(request.desc, Some("blabla".to_string()));
assert_eq!(request.schema, expected_table_schema());
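The gRPC CreateExpr keeps catalog_name and schema_name optional on the wire, so the handler resolves them to the defaults before building the now String-typed request; the test above pins those defaults to "greptime" and "public". The same resolution could be factored into a small helper, sketched here (resolve_names is hypothetical, not part of the commit):

use catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};

// Hypothetical helper mirroring the unwrap_or_else calls above.
fn resolve_names(catalog: Option<String>, schema: Option<String>) -> (String, String) {
    (
        catalog.unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string()),
        schema.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string()),
    )
}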


@@ -97,6 +97,8 @@ pub fn build_alter_table_request(
/// Try to build create table request from insert data.
pub fn build_create_table_request(
catalog_name: &str,
schema_name: &str,
table_id: TableId,
table_name: &str,
insert_batches: &[InsertBatch],
@@ -158,8 +160,8 @@ pub fn build_create_table_request(
return Ok(CreateTableRequest {
id: table_id,
catalog_name: None,
schema_name: None,
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
desc: None,
schema,
@@ -371,6 +373,7 @@ mod tests {
value::Value,
};
use table::error::Result as TableResult;
use table::metadata::TableInfoRef;
use table::Table;
use super::{
@@ -384,14 +387,13 @@ mod tests {
let table_id = 10;
let table_name = "test_metric";
assert!(build_create_table_request(table_id, table_name, &[]).is_err());
assert!(build_create_table_request("", "", table_id, table_name, &[]).is_err());
let insert_batches = insert_batches(mock_insert_batches()).unwrap();
let req = build_create_table_request(table_id, table_name, &insert_batches).unwrap();
let req =
build_create_table_request("", "", table_id, table_name, &insert_batches).unwrap();
assert_eq!(table_id, req.id);
assert!(req.catalog_name.is_none());
assert!(req.schema_name.is_none());
assert_eq!(table_name, req.table_name);
assert!(req.desc.is_none());
assert_eq!(vec![0], req.primary_key_indices);
@@ -540,6 +542,11 @@ mod tests {
.unwrap(),
)
}
fn table_info(&self) -> TableInfoRef {
unimplemented!()
}
async fn scan(
&self,
_projection: &Option<Vec<usize>>,


@@ -104,6 +104,7 @@ mod tests {
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use table::error::Result as TableResult;
use table::metadata::TableInfoRef;
use table::{Table, TableRef};
use table_engine::config::EngineConfig as TableEngineConfig;
use table_engine::engine::MitoEngine;
@@ -135,6 +136,11 @@ mod tests {
.unwrap(),
)
}
fn table_info(&self) -> TableInfoRef {
unimplemented!()
}
async fn scan(
&self,
_projection: &Option<Vec<usize>>,


@@ -44,8 +44,8 @@ impl SqlHandler {
},
};
Ok(AlterTableRequest {
catalog_name,
schema_name,
catalog_name: Some(catalog_name),
schema_name: Some(schema_name),
table_name,
alter_kind,
})
@@ -80,8 +80,8 @@ mod tests {
let handler = create_mock_sql_handler().await;
let alter_table = parse_sql("ALTER TABLE my_metric_1 ADD tagk_i STRING Null;");
let req = handler.alter_to_request(alter_table).unwrap();
assert_eq!(req.catalog_name, None);
assert_eq!(req.schema_name, None);
assert_eq!(req.catalog_name, Some("greptime".to_string()));
assert_eq!(req.schema_name, Some("public".to_string()));
assert_eq!(req.table_name, "my_metric_1");
let alter_kind = req.alter_kind;


@@ -37,8 +37,8 @@ impl SqlHandler {
})?;
let register_req = RegisterTableRequest {
catalog: catalog_name,
schema: schema_name,
catalog: Some(catalog_name.to_string()),
schema: Some(schema_name.to_string()),
table_name: table_name.clone(),
table_id,
table,
@@ -203,8 +203,6 @@ mod tests {
let c = handler.create_to_request(42, parsed_stmt).unwrap();
assert_eq!("demo_table", c.table_name);
assert_eq!(42, c.id);
assert!(c.schema_name.is_none());
assert!(c.catalog_name.is_none());
assert!(!c.create_if_not_exists);
assert_eq!(vec![0], c.primary_key_indices);
assert_eq!(1, c.schema.timestamp_index().unwrap());
@@ -302,8 +300,8 @@ mod tests {
let request = handler.create_to_request(42, create_table).unwrap();
assert_eq!(42, request.id);
assert_eq!(Some("c".to_string()), request.catalog_name);
assert_eq!(Some("s".to_string()), request.schema_name);
assert_eq!("c".to_string(), request.catalog_name);
assert_eq!("s".to_string(), request.schema_name);
assert_eq!("demo".to_string(), request.table_name);
assert!(!request.create_if_not_exists);
assert_eq!(4, request.schema.column_schemas().len());


@@ -58,8 +58,8 @@ pub async fn create_test_table(instance: &Instance) -> Result<()> {
&EngineContext::default(),
CreateTableRequest {
id: MIN_USER_TABLE_ID,
catalog_name: None,
schema_name: None,
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: table_name.to_string(),
desc: Some(" a test table".to_string()),
schema: Arc::new(


@@ -136,8 +136,8 @@ fn create_to_expr(create: CreateTable) -> Result<CreateExpr> {
table_idents_to_full_name(&create.name).context(error::ParseSqlSnafu)?;
let expr = CreateExpr {
catalog_name,
schema_name,
catalog_name: Some(catalog_name),
schema_name: Some(schema_name),
table_name,
column_defs: columns_to_expr(&create.columns)?,
time_index: find_time_index(&create.constraints)?,


@@ -2,7 +2,6 @@
name = "meta-client"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]


@@ -2,7 +2,6 @@
name = "meta-srv"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
@@ -14,8 +13,8 @@ common-telemetry = { path = "../common/telemetry" }
etcd-client = "0.10"
futures = "0.3"
http-body = "0.4"
snafu = { version = "0.7", features = ["backtraces"] }
serde = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }
tokio = { version = "1.0", features = ["full"] }
tokio-stream = { version = "0.1", features = ["net"] }
tonic = "0.8"


@@ -41,8 +41,8 @@ impl ScriptsTable {
// maybe put into system catalog?
let request = CreateTableRequest {
id: SCRIPTS_TABLE_ID,
catalog_name: Some(DEFAULT_CATALOG_NAME.to_string()),
schema_name: Some(DEFAULT_SCHEMA_NAME.to_string()),
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: SCRIPTS_TABLE_NAME.to_string(),
desc: Some("Scripts table".to_string()),
schema,


@@ -5,6 +5,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
catalog = { path = "../catalog" }
common-error = { path = "../common/error" }
common-time = { path = "../common/time" }
datatypes = { path = "../datatypes" }


@@ -7,6 +7,7 @@ pub mod statement;
use std::str::FromStr;
use catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_time::Timestamp;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};
@@ -24,14 +25,16 @@ use crate::error::{
/// Converts maybe fully-qualified table name (`<catalog>.<schema>.<table>` or `<table>` when
/// catalog and schema are default) to tuple.
pub fn table_idents_to_full_name(
obj_name: &ObjectName,
) -> Result<(Option<String>, Option<String>, String)> {
pub fn table_idents_to_full_name(obj_name: &ObjectName) -> Result<(String, String, String)> {
match &obj_name.0[..] {
[table] => Ok((None, None, table.value.clone())),
[table] => Ok((
DEFAULT_CATALOG_NAME.to_string(),
DEFAULT_SCHEMA_NAME.to_string(),
table.value.clone(),
)),
[catalog, schema, table] => Ok((
Some(catalog.value.clone()),
Some(schema.value.clone()),
catalog.value.clone(),
schema.value.clone(),
table.value.clone(),
)),
_ => error::InvalidSqlSnafu {
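table_idents_to_full_name now always returns a fully qualified (catalog, schema, table) triple, falling back to the default names for a bare table identifier. A usage sketch, assuming sqlparser's ObjectName and Ident types (the identifiers themselves are made up):

use sqlparser::ast::{Ident, ObjectName};

// A bare name gets the defaults filled in.
let bare = ObjectName(vec![Ident::new("metrics")]);
// table_idents_to_full_name(&bare) => ("greptime", "public", "metrics")

// A fully qualified name keeps its qualifiers.
let qualified = ObjectName(vec![
    Ident::new("my_catalog"),
    Ident::new("my_schema"),
    Ident::new("metrics"),
]);
// table_idents_to_full_name(&qualified) => ("my_catalog", "my_schema", "metrics")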


@@ -238,13 +238,18 @@ impl<S: StorageEngine> MitoEngineInner<S> {
_ctx: &EngineContext,
request: CreateTableRequest,
) -> Result<TableRef> {
let catalog_name = request.catalog_name;
let schema_name = request.schema_name;
let table_name = &request.table_name;
if let Some(table) = self.get_table(table_name) {
if request.create_if_not_exists {
return Ok(table);
} else {
return TableExistsSnafu { table_name }.fail();
return TableExistsSnafu {
table_name: format!("{}.{}.{}", catalog_name, schema_name, table_name),
}
.fail();
}
}
@@ -313,6 +318,8 @@ impl<S: StorageEngine> MitoEngineInner<S> {
.ident(table_id)
.table_version(INIT_TABLE_VERSION)
.table_type(TableType::Base)
.catalog_name(catalog_name.to_string())
.schema_name(schema_name.to_string())
.desc(request.desc)
.build()
.context(error::BuildTableInfoSnafu { table_name })?;
@@ -479,8 +486,8 @@ mod tests {
&EngineContext::default(),
CreateTableRequest {
id: 1,
catalog_name: None,
schema_name: None,
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: table_name.to_string(),
desc: Some("a test table".to_string()),
schema: schema.clone(),
@@ -713,8 +720,8 @@ mod tests {
let request = CreateTableRequest {
id: 1,
catalog_name: None,
schema_name: None,
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: table_info.name.to_string(),
schema: table_info.meta.schema.clone(),
create_if_not_exists: true,
@@ -736,8 +743,8 @@ mod tests {
// test create_if_not_exists=false
let request = CreateTableRequest {
id: 1,
catalog_name: None,
schema_name: None,
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: table_info.name.to_string(),
schema: table_info.meta.schema.clone(),
create_if_not_exists: false,


@@ -145,6 +145,10 @@ impl<R: Region> Table for MitoTable<R> {
self.table_info().table_type
}
fn table_info(&self) -> TableInfoRef {
self.table_info.load_full()
}
async fn scan(
&self,
projection: &Option<Vec<usize>>,
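load_full() suggests MitoTable keeps its metadata behind an arc_swap::ArcSwap, so readers take a cheap Arc snapshot while an alter can atomically swap in a new value. A minimal sketch of that pattern, assuming the arc-swap crate (the real MitoTable field layout may differ):

use std::sync::Arc;
use arc_swap::ArcSwap;

struct Metadata {
    version: u64,
}

struct Handle {
    info: ArcSwap<Metadata>,
}

impl Handle {
    // Readers get an Arc snapshot without blocking writers.
    fn info(&self) -> Arc<Metadata> {
        self.info.load_full()
    }

    // Writers publish a whole new value, e.g. after an ALTER TABLE.
    fn update(&self, new: Metadata) {
        self.info.store(Arc::new(new));
    }
}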


@@ -93,8 +93,8 @@ pub async fn setup_test_engine_and_table() -> (
&EngineContext::default(),
CreateTableRequest {
id: 1,
catalog_name: None,
schema_name: None,
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: TABLE_NAME.to_string(),
desc: Some("a test table".to_string()),
schema: schema.clone(),
@@ -125,8 +125,8 @@ pub async fn setup_mock_engine_and_table(
&EngineContext::default(),
CreateTableRequest {
id: 1,
catalog_name: None,
schema_name: None,
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: TABLE_NAME.to_string(),
desc: None,
schema: schema.clone(),


@@ -101,6 +101,10 @@ pub struct TableInfo {
pub name: String,
#[builder(default, setter(into))]
pub desc: Option<String>,
#[builder(default, setter(into))]
pub catalog_name: String,
#[builder(default, setter(into))]
pub schema_name: String,
pub meta: TableMeta,
#[builder(default = "TableType::Base")]
pub table_type: TableType,
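With the #[builder(default, setter(into))] attributes, the new catalog_name and schema_name fields slot into the existing builder chain, as the MitoEngine call site above shows. A hedged sketch (the TableInfoBuilder name follows the derive_builder convention implied by the attributes; required fields such as name and meta are elided):

// Sketch only: the builder name and the elided fields are assumptions.
let info = TableInfoBuilder::default()
    .ident(table_id)
    .table_version(INIT_TABLE_VERSION)
    .table_type(TableType::Base)
    .catalog_name("greptime")   // setter(into) accepts &str
    .schema_name("public")
    .desc(Some("a test table".to_string()))
    // .name(...) and .meta(...) as before
    .build()?;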


@@ -17,8 +17,8 @@ pub struct InsertRequest {
#[derive(Debug, Clone)]
pub struct CreateTableRequest {
pub id: TableId,
pub catalog_name: Option<String>,
pub schema_name: Option<String>,
pub catalog_name: String,
pub schema_name: String,
pub table_name: String,
pub desc: Option<String>,
pub schema: SchemaRef,


@@ -9,7 +9,7 @@ use common_recordbatch::SendableRecordBatchStream;
use datatypes::schema::SchemaRef;
use crate::error::Result;
use crate::metadata::{FilterPushDownType, TableType};
use crate::metadata::{FilterPushDownType, TableInfoRef, TableType};
use crate::requests::{AlterTableRequest, InsertRequest};
/// Table abstraction.
@@ -22,6 +22,9 @@ pub trait Table: Send + Sync {
/// Get a reference to the schema for this table
fn schema(&self) -> SchemaRef;
/// Get a reference to the table info.
fn table_info(&self) -> TableInfoRef;
/// Get the type of this table for metadata/catalog purposes.
fn table_type(&self) -> TableType {
TableType::Base

View File

@@ -32,6 +32,7 @@ use futures::Stream;
use snafu::prelude::*;
use crate::error::{self, Result};
use crate::metadata::TableInfoRef;
use crate::table::{FilterPushDownType, Table, TableRef, TableType};
/// Greptime SendableRecordBatchStream -> datafusion ExecutionPlan.
@@ -189,6 +190,10 @@ impl Table for TableAdapter {
self.schema.clone()
}
fn table_info(&self) -> TableInfoRef {
unreachable!("Should not call table_info of TableAdaptor directly")
}
fn table_type(&self) -> TableType {
match self.table_provider.table_type() {
DfTableType::Base => TableType::Base,
@@ -309,3 +314,28 @@ impl Stream for RecordBatchStreamAdapter {
self.stream.size_hint()
}
}
#[cfg(test)]
mod tests {
use datafusion::arrow;
use datafusion::datasource::empty::EmptyTable;
use datafusion_common::field_util::SchemaExt;
use super::*;
use crate::metadata::TableType::Base;
#[test]
#[should_panic]
fn test_table_adaptor_info() {
let df_table = Arc::new(EmptyTable::new(Arc::new(arrow::datatypes::Schema::empty())));
let table_adapter = TableAdapter::new(df_table, Arc::new(RuntimeEnv::default())).unwrap();
let _ = table_adapter.table_info();
}
#[test]
fn test_table_adaptor_type() {
let df_table = Arc::new(EmptyTable::new(Arc::new(arrow::datatypes::Schema::empty())));
let table_adapter = TableAdapter::new(df_table, Arc::new(RuntimeEnv::default())).unwrap();
assert_eq!(Base, table_adapter.table_type());
}
}


@@ -12,6 +12,7 @@ use futures::task::{Context, Poll};
use futures::Stream;
use crate::error::Result;
use crate::metadata::TableInfoRef;
use crate::table::{Expr, Table};
/// numbers table for test
@@ -43,6 +44,10 @@ impl Table for NumbersTable {
self.schema.clone()
}
fn table_info(&self) -> TableInfoRef {
unimplemented!()
}
async fn scan(
&self,
_projection: &Option<Vec<usize>>,


@@ -13,6 +13,7 @@ use futures::task::{Context, Poll};
use futures::Stream;
use snafu::prelude::*;
use table::error::{Result, SchemaConversionSnafu, TableProjectionSnafu};
use table::metadata::TableInfoRef;
use table::Table;
#[derive(Debug, Clone)]
@@ -60,6 +61,10 @@ impl Table for MemTable {
self.recordbatch.schema.clone()
}
fn table_info(&self) -> TableInfoRef {
unimplemented!()
}
async fn scan(
&self,
projection: &Option<Vec<usize>>,