refactor: remove most Table impls (#2274)

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
Authored by Zhenchi on 2023-09-01 11:34:03 +08:00, committed by Ruihang Xia
parent 86d56f71ef
commit d6c82867d5
24 changed files with 219 additions and 267 deletions
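
This refactor removes per-type Table trait implementations (MemTable, EmptyTable, ImmutableFileTable, and the query-test MemTableWrapper) in favor of composing a ThinTable (table metadata plus a FilterPushDownType) with a DataSource (the scan implementation) behind a ThinTableAdapter. Constructors now hand back a TableRef directly, so call sites drop their Arc::new(...) wrapping and downcasts.

The following is a condensed sketch, not part of the diff, of the construction pattern the commit converges on; it mirrors the EmptyDataSource added below, and NoopDataSource and make_table are illustrative names only:

    use std::sync::Arc;

    use common_error::ext::BoxedError;
    use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
    use datatypes::schema::SchemaRef;
    use store_api::data_source::DataSource;
    use store_api::storage::ScanRequest;
    use table::metadata::{FilterPushDownType, TableInfoRef};
    use table::thin_table::{ThinTable, ThinTableAdapter};
    use table::TableRef;

    // Illustrative data source that always yields an empty stream,
    // mirroring the EmptyDataSource introduced in this commit.
    struct NoopDataSource {
        schema: SchemaRef,
    }

    impl DataSource for NoopDataSource {
        fn get_stream(
            &self,
            _request: ScanRequest,
        ) -> Result<SendableRecordBatchStream, BoxedError> {
            Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone())))
        }
    }

    // Assemble a TableRef without a bespoke Table impl: metadata goes
    // into ThinTable, scanning goes into the DataSource.
    fn make_table(info: TableInfoRef) -> TableRef {
        let schema = info.meta.schema.clone();
        let thin_table = ThinTable::new(info, FilterPushDownType::Unsupported);
        let data_source = Arc::new(NoopDataSource { schema });
        Arc::new(ThinTableAdapter::new(thin_table, data_source))
    }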

@@ -55,14 +55,14 @@ impl TableEngine for MockTableEngine {
         let data = vec![Arc::new(StringVector::from(vec!["a", "b", "c"])) as _];
         let record_batch = RecordBatch::new(schema, data).unwrap();
-        let table: TableRef = Arc::new(MemTable::new_with_catalog(
+        let table = MemTable::new_with_catalog(
             &request.table_name,
             record_batch,
             table_id,
             request.catalog_name,
             request.schema_name,
             vec![0],
-        )) as Arc<_>;
+        );
         let mut tables = self.tables.write().unwrap();
         let _ = tables.insert(table_id, table.clone() as TableRef);

@@ -561,7 +561,7 @@ mod test {
             table_id: 1,
             engine: "MockTableEngine".to_string(),
         };
-        let table = Arc::new(EmptyTable::new(CreateTableRequest {
+        let table = EmptyTable::table(CreateTableRequest {
             id: 1,
             catalog_name: catalog.to_string(),
             schema_name: schema.to_string(),
@@ -577,7 +577,7 @@ mod test {
             create_if_not_exists: false,
             table_options: TableOptions::default(),
             engine: "MockTableEngine".to_string(),
-        }));
+        });
         let catalog_manager = MemoryCatalogManager::new_with_table(table.clone());
         keepers
             .register_table(table_ident.clone(), table, catalog_manager)

@@ -379,7 +379,7 @@ mod tests {
             schema: table_before.schema.clone(),
             table_name: table_before.table.clone(),
             table_id: table_before.table_id,
-            table: Arc::new(EmptyTable::new(CreateTableRequest {
+            table: EmptyTable::table(CreateTableRequest {
                 id: table_before.table_id,
                 catalog_name: table_before.catalog.clone(),
                 schema_name: table_before.schema.clone(),
@@ -391,7 +391,7 @@ mod tests {
                 create_if_not_exists: false,
                 table_options: Default::default(),
                 engine: MITO_ENGINE.to_string(),
-            })),
+            }),
         };
         assert!(catalog_manager.register_table(request).await.unwrap());
@@ -418,7 +418,7 @@ mod tests {
             schema: table_after.schema.clone(),
             table_name: table_after.table.clone(),
             table_id: table_after.table_id,
-            table: Arc::new(EmptyTable::new(CreateTableRequest {
+            table: EmptyTable::table(CreateTableRequest {
                 id: table_after.table_id,
                 catalog_name: table_after.catalog.clone(),
                 schema_name: table_after.schema.clone(),
@@ -430,7 +430,7 @@ mod tests {
                 create_if_not_exists: false,
                 table_options: Default::default(),
                 engine: MITO_ENGINE.to_string(),
-            })),
+            }),
         };
         assert!(catalog_manager.register_table(request).await.unwrap());

@@ -30,7 +30,7 @@ use table::metadata::{TableId, TableInfo, TableInfoBuilder, TableMetaBuilder, Ta
 use table::requests::{
     AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, TruncateTableRequest,
 };
-use table::{error as table_error, Result as TableResult, Table, TableRef};
+use table::{error as table_error, Result as TableResult, TableRef};
 use tokio::sync::Mutex;

 use crate::config::EngineConfig;
@@ -302,7 +302,7 @@ impl EngineInner {
         let _ = self.tables.write().unwrap().insert(table_id, table.clone());

-        Ok(table)
+        Ok(table.as_table_ref())
     }

     fn get_table(&self, table_id: TableId) -> Option<TableRef> {
@@ -311,7 +311,7 @@ impl EngineInner {
             .unwrap()
             .get(&table_id)
             .cloned()
-            .map(|table| table as _)
+            .map(|table| table.as_table_ref())
     }

     async fn open_table(
@@ -365,7 +365,7 @@ impl EngineInner {
             );

             let _ = self.tables.write().unwrap().insert(table_id, table.clone());
-            Some(table as _)
+            Some(table.as_table_ref())
         };

         logging::info!(

@@ -17,15 +17,14 @@
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, IMMUTABLE_FILE_ENGINE};
 use table::engine::{EngineContext, TableEngine, TableEngineProcedure};
+use table::error as table_error;
 use table::requests::{
     AlterKind, AlterTableRequest, DropTableRequest, OpenTableRequest, TruncateTableRequest,
 };
-use table::{error as table_error, Table};

 use crate::config::EngineConfig;
 use crate::engine::immutable::ImmutableFileTableEngine;
 use crate::manifest::immutable::manifest_path;
-use crate::table::immutable::ImmutableFileTable;
 use crate::test_util::{self, TestEngineComponents, TEST_TABLE_NAME};

 #[tokio::test]
@@ -78,11 +77,6 @@ async fn test_open_table() {
         .unwrap()
         .unwrap();

-    let reopened = reopened
-        .as_any()
-        .downcast_ref::<ImmutableFileTable>()
-        .unwrap();
-
     let left = table.table_info();
     let right = reopened.table_info();

@@ -12,10 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use std::any::Any;
 use std::sync::Arc;

-use async_trait::async_trait;
 use common_datasource::file_format::Format;
 use common_datasource::object_store::build_backend;
 use common_error::ext::BoxedError;
@@ -24,10 +22,12 @@ use datatypes::schema::SchemaRef;
 use object_store::ObjectStore;
 use serde::{Deserialize, Serialize};
 use snafu::{OptionExt, ResultExt};
+use store_api::data_source::DataSource;
 use store_api::storage::{RegionNumber, ScanRequest};
 use table::error::{self as table_error, Result as TableResult};
-use table::metadata::{RawTableInfo, TableInfo, TableInfoRef, TableType};
-use table::{requests, Table};
+use table::metadata::{FilterPushDownType, RawTableInfo, TableInfo};
+use table::thin_table::{ThinTable, ThinTableAdapter};
+use table::{requests, TableRef};

 use super::format::create_stream;
 use crate::error::{self, ConvertRawSnafu, Result};
@@ -45,68 +45,12 @@ pub struct ImmutableFileTableOptions {
 pub struct ImmutableFileTable {
     metadata: ImmutableMetadata,
-    // currently, it's immutable
-    table_info: Arc<TableInfo>,
-    object_store: ObjectStore,
-    files: Vec<String>,
-    format: Format,
+    table_ref: TableRef,
 }

 pub type ImmutableFileTableRef = Arc<ImmutableFileTable>;

-#[async_trait]
-impl Table for ImmutableFileTable {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    /// The [`SchemaRef`] before the projection.
-    /// It contains all the columns that may appear in the files (All missing columns should be filled NULLs).
-    fn schema(&self) -> SchemaRef {
-        self.table_info().meta.schema.clone()
-    }
-
-    fn table_info(&self) -> TableInfoRef {
-        self.table_info.clone()
-    }
-
-    fn table_type(&self) -> TableType {
-        self.table_info().table_type
-    }
-
-    async fn scan_to_stream(&self, request: ScanRequest) -> TableResult<SendableRecordBatchStream> {
-        create_stream(
-            &self.format,
-            &CreateScanPlanContext::default(),
-            &ScanPlanConfig {
-                file_schema: self.schema(),
-                files: &self.files,
-                projection: request.projection.as_ref(),
-                filters: &request.filters,
-                limit: request.limit,
-                store: self.object_store.clone(),
-            },
-        )
-        .map_err(BoxedError::new)
-        .context(table_error::TableOperationSnafu)
-    }
-
-    async fn flush(
-        &self,
-        _region_number: Option<RegionNumber>,
-        _wait: Option<bool>,
-    ) -> TableResult<()> {
-        // nothing to flush
-        Ok(())
-    }
-}
-
 impl ImmutableFileTable {
-    #[inline]
-    pub fn metadata(&self) -> &ImmutableMetadata {
-        &self.metadata
-    }
-
     pub(crate) fn new(table_info: TableInfo, metadata: ImmutableMetadata) -> Result<Self> {
         let table_info = Arc::new(table_info);
         let options = &table_info.meta.options.extra_options;
@@ -129,12 +73,19 @@ impl ImmutableFileTable {
         let object_store = build_backend(url, options).context(error::BuildBackendSnafu)?;

+        let schema = table_info.meta.schema.clone();
+        let thin_table = ThinTable::new(table_info, FilterPushDownType::Unsupported);
+        let data_source = Arc::new(ImmutableFileDataSource::new(
+            schema,
+            object_store,
+            meta.files,
+            format,
+        ));
+        let table_ref = Arc::new(ThinTableAdapter::new(thin_table, data_source));
+
         Ok(Self {
             metadata,
-            table_info,
-            object_store,
-            files: meta.files,
-            format,
+            table_ref,
         })
     }
@@ -171,4 +122,63 @@ impl ImmutableFileTable {
         Ok((metadata, table_info))
     }
+
+    #[inline]
+    pub fn metadata(&self) -> &ImmutableMetadata {
+        &self.metadata
+    }
+
+    pub fn as_table_ref(&self) -> TableRef {
+        self.table_ref.clone()
+    }
+
+    pub async fn close(&self, regions: &[RegionNumber]) -> TableResult<()> {
+        self.table_ref.close(regions).await
+    }
 }
+
+struct ImmutableFileDataSource {
+    schema: SchemaRef,
+    object_store: ObjectStore,
+    files: Vec<String>,
+    format: Format,
+}
+
+impl ImmutableFileDataSource {
+    fn new(
+        schema: SchemaRef,
+        object_store: ObjectStore,
+        files: Vec<String>,
+        format: Format,
+    ) -> Self {
+        Self {
+            schema,
+            object_store,
+            files,
+            format,
+        }
+    }
+}
+
+impl DataSource for ImmutableFileDataSource {
+    fn get_stream(
+        &self,
+        request: ScanRequest,
+    ) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
+        create_stream(
+            &self.format,
+            &CreateScanPlanContext::default(),
+            &ScanPlanConfig {
+                file_schema: self.schema.clone(),
+                files: &self.files,
+                projection: request.projection.as_ref(),
+                filters: &request.filters,
+                limit: request.limit,
+                store: self.object_store.clone(),
+            },
+        )
+        .map_err(BoxedError::new)
+        .context(table_error::TableOperationSnafu)
+        .map_err(BoxedError::new)
+    }
+}

@@ -1429,7 +1429,7 @@ mod test {
             .meta(table_meta)
             .build()
             .unwrap();
-        let table = Arc::new(EmptyTable::from_table_info(&table_info));
+        let table = EmptyTable::from_table_info(&table_info);
         let catalog_list = MemoryCatalogManager::with_default_setup();
         assert!(catalog_list
             .register_table(RegisterTableRequest {

@@ -377,7 +377,7 @@ mod test {
             .meta(table_meta)
             .build()
             .unwrap();
-        let table = Arc::new(EmptyTable::from_table_info(&table_info));
+        let table = EmptyTable::from_table_info(&table_info);
         let catalog_list = MemoryCatalogManager::with_default_setup();
         assert!(catalog_list
             .register_table(RegisterTableRequest {

@@ -475,6 +475,6 @@ mod test {
         data: Vec<VectorRef>,
     ) -> TableRef {
         let record_batch = RecordBatch::new(table_schema, data).unwrap();
-        Arc::new(MemTable::new(table_name, record_batch))
+        MemTable::table(table_name, record_batch)
     }
 }

@@ -12,13 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use std::sync::Arc;
-
 use catalog::local::MemoryCatalogManager;
 use common_query::Output;
 use common_recordbatch::{util, RecordBatch};
 use session::context::QueryContext;
-use table::test_util::MemTable;
+use table::TableRef;

 use crate::parser::QueryLanguageParser;
 use crate::{QueryEngineFactory, QueryEngineRef};
@@ -50,8 +48,7 @@ async fn exec_selection(engine: QueryEngineRef, sql: &str) -> Vec<RecordBatch> {
     util::collect(stream).await.unwrap()
 }

-pub fn new_query_engine_with_table(table: MemTable) -> QueryEngineRef {
-    let table = Arc::new(table);
+pub fn new_query_engine_with_table(table: TableRef) -> QueryEngineRef {
     let catalog_manager = MemoryCatalogManager::new_with_table(table);

     QueryEngineFactory::new(catalog_manager, false).query_engine()

@@ -48,7 +48,7 @@ pub fn create_query_engine() -> QueryEngineRef {
     let schema = Arc::new(Schema::new(column_schemas.clone()));
     let recordbatch = RecordBatch::new(schema, columns).unwrap();
-    let number_table = MemTable::new("numbers", recordbatch);
+    let number_table = MemTable::table("numbers", recordbatch);

     new_query_engine_with_table(number_table)
 }

@@ -202,7 +202,7 @@ where
     let schema = Arc::new(Schema::new(column_schemas.clone()));
     let column: VectorRef = Arc::new(T::VectorType::from_vec(numbers));
     let recordbatch = RecordBatch::new(schema, vec![column]).unwrap();
-    let testing_table = MemTable::new(&table_name, recordbatch);
+    let testing_table = MemTable::table(&table_name, recordbatch);

     let engine = new_query_engine_with_table(testing_table);

@@ -92,6 +92,6 @@ fn create_correctness_engine() -> Arc<dyn QueryEngine> {
     columns.push(column);

     let schema = Arc::new(Schema::new(column_schemas));
-    let number_table = MemTable::new("corr_numbers", RecordBatch::new(schema, columns).unwrap());
+    let number_table = MemTable::table("corr_numbers", RecordBatch::new(schema, columns).unwrap());
     new_query_engine_with_table(number_table)
 }

@@ -60,7 +60,7 @@ async fn test_datafusion_query_engine() -> Result<()> {
         (0..100).collect::<Vec<_>>(),
     ))];
     let recordbatch = RecordBatch::new(schema, columns).unwrap();
-    let table = Arc::new(MemTable::new("numbers", recordbatch));
+    let table = MemTable::table("numbers", recordbatch);

     let limit = 10;
     let table_provider = Arc::new(DfTableProviderAdapter::new(table.clone()));

@@ -12,72 +12,58 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use std::any::Any;
-use std::sync::Arc;
+use std::sync::{Arc, RwLock};

 use catalog::local::new_memory_catalog_manager;
 use catalog::RegisterTableRequest;
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
+use common_error::ext::BoxedError;
 use common_query::prelude::Expr;
 use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
 use common_time::range::TimestampRange;
 use common_time::timestamp::TimeUnit;
 use common_time::Timestamp;
 use datatypes::data_type::ConcreteDataType;
-use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
+use datatypes::schema::{ColumnSchema, Schema};
 use datatypes::vectors::{Int64Vector, TimestampMillisecondVector};
+use store_api::data_source::{DataSource, DataSourceRef};
 use store_api::storage::ScanRequest;
-use table::metadata::{FilterPushDownType, TableInfoRef, TableType};
+use table::metadata::FilterPushDownType;
 use table::predicate::TimeRangePredicateBuilder;
 use table::test_util::MemTable;
-use table::Table;
-use tokio::sync::RwLock;
+use table::thin_table::{ThinTable, ThinTableAdapter};
+use table::TableRef;

 use crate::tests::exec_selection;
 use crate::{QueryEngineFactory, QueryEngineRef};
-struct MemTableWrapper {
-    inner: MemTable,
-    filter: RwLock<Vec<Expr>>,
-}
+struct MemTableWrapper;

 impl MemTableWrapper {
-    pub async fn get_filters(&self) -> Vec<Expr> {
-        self.filter.write().await.drain(..).collect()
+    pub fn table(table: TableRef, filter: Arc<RwLock<Vec<Expr>>>) -> TableRef {
+        let table_info = table.table_info();
+        let thin_table_adapter = table.as_any().downcast_ref::<ThinTableAdapter>().unwrap();
+        let data_source = thin_table_adapter.data_source();
+
+        let thin_table = ThinTable::new(table_info, FilterPushDownType::Exact);
+        let data_source = Arc::new(DataSourceWrapper {
+            inner: data_source,
+            filter,
+        });
+        Arc::new(ThinTableAdapter::new(thin_table, data_source))
     }
 }

-#[async_trait::async_trait]
-impl Table for MemTableWrapper {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn schema(&self) -> SchemaRef {
-        self.inner.schema()
-    }
-
-    fn table_info(&self) -> TableInfoRef {
-        self.inner.table_info()
-    }
-
-    fn table_type(&self) -> TableType {
-        self.inner.table_type()
-    }
-
-    async fn scan_to_stream(
-        &self,
-        request: ScanRequest,
-    ) -> table::Result<SendableRecordBatchStream> {
-        *self.filter.write().await = request.filters.clone();
-        self.inner.scan_to_stream(request).await
-    }
-
-    fn supports_filters_pushdown(
-        &self,
-        filters: &[&Expr],
-    ) -> table::Result<Vec<FilterPushDownType>> {
-        Ok(vec![FilterPushDownType::Exact; filters.len()])
+struct DataSourceWrapper {
+    inner: DataSourceRef,
+    filter: Arc<RwLock<Vec<Expr>>>,
+}
+
+impl DataSource for DataSourceWrapper {
+    fn get_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream, BoxedError> {
+        *self.filter.write().unwrap() = request.filters.clone();
+        self.inner.get_stream(request)
     }
 }
@@ -92,8 +78,9 @@ fn create_test_engine() -> TimeRangeTester {
     ])
     .unwrap();

-    let table = Arc::new(MemTableWrapper {
-        inner: MemTable::new(
+    let filter = Arc::new(RwLock::new(vec![]));
+    let table = MemTableWrapper::table(
+        MemTable::table(
             "m",
             RecordBatch::new(
                 Arc::new(schema),
@@ -106,8 +93,8 @@ fn create_test_engine() -> TimeRangeTester {
             )
             .unwrap(),
         ),
-        filter: Default::default(),
-    });
+        filter.clone(),
+    );

     let catalog_manager = new_memory_catalog_manager().unwrap();
     let req = RegisterTableRequest {
@@ -120,22 +107,26 @@ fn create_test_engine() -> TimeRangeTester {
     let _ = catalog_manager.register_table_sync(req).unwrap();

     let engine = QueryEngineFactory::new(catalog_manager, false).query_engine();
-    TimeRangeTester { engine, table }
+    TimeRangeTester { engine, filter }
 }

 struct TimeRangeTester {
     engine: QueryEngineRef,
-    table: Arc<MemTableWrapper>,
+    filter: Arc<RwLock<Vec<Expr>>>,
 }

 impl TimeRangeTester {
     async fn check(&self, sql: &str, expect: TimestampRange) {
         let _ = exec_selection(self.engine.clone(), sql).await;
-        let filters = self.table.get_filters().await;
+        let filters = self.get_filters();

         let range = TimeRangePredicateBuilder::new("ts", TimeUnit::Millisecond, &filters).build();
         assert_eq!(expect, range);
     }
+
+    fn get_filters(&self) -> Vec<Expr> {
+        self.filter.write().unwrap().drain(..).collect()
+    }
 }

 #[tokio::test]
#[tokio::test]

@@ -582,7 +582,7 @@ mod tests {
             .unwrap();

         let ctx = SessionContext::new();
-        let table = Arc::new(MemTable::new("test", recordbatch));
+        let table = MemTable::table("test", recordbatch);
         let table_provider = Arc::new(DfTableProviderAdapter::new(table));
         let dataframe = ctx.read_table(table_provider.clone()).unwrap();

@@ -30,6 +30,7 @@ use servers::query_handler::grpc::ServerGrpcQueryHandlerRef;
 use servers::server::Server;
 use snafu::ResultExt;
 use table::test_util::MemTable;
+use table::TableRef;
 use tokio::net::TcpListener;
 use tokio_stream::wrappers::TcpListenerStream;
@@ -99,7 +100,7 @@ impl Server for MockGrpcServer {
     }
 }

-fn create_grpc_server(table: MemTable) -> Result<Arc<dyn Server>> {
+fn create_grpc_server(table: TableRef) -> Result<Arc<dyn Server>> {
     let query_handler = create_testing_grpc_query_handler(table);
     let io_runtime = Arc::new(
         RuntimeBuilder::default()

@@ -34,7 +34,7 @@ use servers::query_handler::{ScriptHandler, ScriptHandlerRef};
 use session::context::QueryContextRef;
 use snafu::ensure;
 use sql::statements::statement::Statement;
-use table::test_util::MemTable;
+use table::TableRef;

 mod grpc;
 mod http;
@@ -202,21 +202,20 @@ impl GrpcQueryHandler for DummyInstance {
     }
 }

-fn create_testing_instance(table: MemTable) -> DummyInstance {
-    let table = Arc::new(table);
+fn create_testing_instance(table: TableRef) -> DummyInstance {
     let catalog_manager = MemoryCatalogManager::new_with_table(table);
     let query_engine = QueryEngineFactory::new(catalog_manager, false).query_engine();
     DummyInstance::new(query_engine)
 }

-fn create_testing_script_handler(table: MemTable) -> ScriptHandlerRef {
+fn create_testing_script_handler(table: TableRef) -> ScriptHandlerRef {
     Arc::new(create_testing_instance(table)) as _
 }

-fn create_testing_sql_query_handler(table: MemTable) -> ServerSqlQueryHandlerRef {
+fn create_testing_sql_query_handler(table: TableRef) -> ServerSqlQueryHandlerRef {
     Arc::new(create_testing_instance(table)) as _
 }

-fn create_testing_grpc_query_handler(table: MemTable) -> ServerGrpcQueryHandlerRef {
+fn create_testing_grpc_query_handler(table: TableRef) -> ServerGrpcQueryHandlerRef {
     Arc::new(create_testing_instance(table)) as _
 }

@@ -32,6 +32,7 @@ use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
 use servers::server::Server;
 use servers::tls::TlsOption;
 use table::test_util::MemTable;
+use table::TableRef;

 use crate::create_testing_sql_query_handler;
 use crate::mysql::{all_datatype_testing_data, MysqlTextRow, TestingData};
@@ -43,7 +44,7 @@ struct MysqlOpts<'a> {
     reject_no_database: bool,
 }

-fn create_mysql_server(table: MemTable, opts: MysqlOpts<'_>) -> Result<Box<dyn Server>> {
+fn create_mysql_server(table: TableRef, opts: MysqlOpts<'_>) -> Result<Box<dyn Server>> {
     let query_handler = create_testing_sql_query_handler(table);
     let io_runtime = Arc::new(
         RuntimeBuilder::default()
@@ -262,7 +263,7 @@ async fn test_server_required_secure_client_plain() -> Result<()> {
     } = all_datatype_testing_data();
     let schema = Arc::new(Schema::new(column_schemas.clone()));
     let recordbatch = RecordBatch::new(schema, columns).unwrap();
-    let table = MemTable::new("all_datatypes", recordbatch);
+    let table = MemTable::table("all_datatypes", recordbatch);

     let mysql_server = create_mysql_server(
         table,
@@ -299,7 +300,7 @@ async fn test_server_required_secure_client_plain_with_pkcs8_priv_key() -> Resul
     } = all_datatype_testing_data();
     let schema = Arc::new(Schema::new(column_schemas.clone()));
     let recordbatch = RecordBatch::new(schema, columns).unwrap();
-    let table = MemTable::new("all_datatypes", recordbatch);
+    let table = MemTable::table("all_datatypes", recordbatch);

     let mysql_server = create_mysql_server(
         table,
@@ -331,7 +332,7 @@ async fn test_db_name() -> Result<()> {
     } = all_datatype_testing_data();
     let schema = Arc::new(Schema::new(column_schemas.clone()));
     let recordbatch = RecordBatch::new(schema, columns).unwrap();
-    let table = MemTable::new("all_datatypes", recordbatch);
+    let table = MemTable::table("all_datatypes", recordbatch);

     let mysql_server = create_mysql_server(
         table,
@@ -363,7 +364,7 @@ async fn do_test_query_all_datatypes(server_tls: TlsOption, client_tls: bool) ->
     } = all_datatype_testing_data();
     let schema = Arc::new(Schema::new(column_schemas.clone()));
     let recordbatch = RecordBatch::new(schema, columns).unwrap();
-    let table = MemTable::new("all_datatypes", recordbatch);
+    let table = MemTable::table("all_datatypes", recordbatch);

     let mysql_server = create_mysql_server(
         table,
@@ -462,7 +463,7 @@ async fn test_query_prepared() -> Result<()> {
     } = all_datatype_testing_data();
     let schema = Arc::new(Schema::new(column_schemas.clone()));
     let recordbatch = RecordBatch::new(schema, columns.clone()).unwrap();
-    let table = MemTable::new("all_datatypes", recordbatch);
+    let table = MemTable::table("all_datatypes", recordbatch);

     let mysql_server = create_mysql_server(
         table,

@@ -30,12 +30,13 @@ use servers::postgres::PostgresServer;
 use servers::server::Server;
 use servers::tls::TlsOption;
 use table::test_util::MemTable;
+use table::TableRef;
 use tokio_postgres::{Client, Error as PgError, NoTls, SimpleQueryMessage};

 use crate::create_testing_instance;

 fn create_postgres_server(
-    table: MemTable,
+    table: TableRef,
     check_pwd: bool,
     tls: TlsOption,
     auth_info: Option<DatabaseAuthInfo>,

@@ -14,20 +14,23 @@
 use std::sync::Arc;

-use async_trait::async_trait;
+use common_error::ext::BoxedError;
 use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
+use datatypes::schema::SchemaRef;
+use store_api::data_source::DataSource;
 use store_api::storage::ScanRequest;

-use crate::metadata::{TableInfo, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType};
-use crate::requests::{CreateTableRequest, InsertRequest};
-use crate::{Result, Table};
+use crate::metadata::{
+    FilterPushDownType, TableInfo, TableInfoBuilder, TableMetaBuilder, TableType,
+};
+use crate::requests::CreateTableRequest;
+use crate::thin_table::{ThinTable, ThinTableAdapter};
+use crate::TableRef;

-pub struct EmptyTable {
-    info: TableInfoRef,
-}
+pub struct EmptyTable;

 impl EmptyTable {
-    pub fn new(req: CreateTableRequest) -> Self {
+    pub fn table(req: CreateTableRequest) -> TableRef {
         let schema = Arc::new(req.schema.try_into().unwrap());
         let table_meta = TableMetaBuilder::default()
             .schema(schema)
@@ -48,41 +51,27 @@ impl EmptyTable {
             .build()
             .unwrap();

-        Self {
-            info: Arc::new(table_info),
-        }
+        Self::from_table_info(&table_info)
     }

-    pub fn from_table_info(info: &TableInfo) -> Self {
-        Self {
-            info: Arc::new(info.clone()),
-        }
+    pub fn from_table_info(info: &TableInfo) -> TableRef {
+        let thin_table = ThinTable::new(Arc::new(info.clone()), FilterPushDownType::Unsupported);
+        let data_source = Arc::new(EmptyDataSource {
+            schema: info.meta.schema.clone(),
+        });
+        Arc::new(ThinTableAdapter::new(thin_table, data_source))
     }
 }

-#[async_trait]
-impl Table for EmptyTable {
-    fn as_any(&self) -> &dyn std::any::Any {
-        self as _
-    }
+struct EmptyDataSource {
+    schema: SchemaRef,
+}

-    fn schema(&self) -> datatypes::schema::SchemaRef {
-        self.info.meta.schema.clone()
-    }
-
-    fn table_info(&self) -> TableInfoRef {
-        self.info.clone()
-    }
-
-    fn table_type(&self) -> TableType {
-        self.info.table_type
-    }
-
-    async fn insert(&self, _request: InsertRequest) -> Result<usize> {
-        Ok(0)
-    }
-
-    async fn scan_to_stream(&self, _: ScanRequest) -> Result<SendableRecordBatchStream> {
-        Ok(Box::pin(EmptyRecordBatchStream::new(self.schema())))
+impl DataSource for EmptyDataSource {
+    fn get_stream(
+        &self,
+        _request: ScanRequest,
+    ) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
+        Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone())))
     }
 }

@@ -12,11 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use std::any::Any;
 use std::pin::Pin;
 use std::sync::Arc;

-use async_trait::async_trait;
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_error::ext::BoxedError;
 use common_recordbatch::error::Result as RecordBatchResult;
@@ -27,22 +25,20 @@ use datatypes::vectors::UInt32Vector;
 use futures::task::{Context, Poll};
 use futures::Stream;
 use snafu::prelude::*;
+use store_api::data_source::DataSource;
 use store_api::storage::{RegionNumber, ScanRequest};

-use crate::error::{Result, SchemaConversionSnafu, TableProjectionSnafu, TablesRecordBatchSnafu};
+use crate::error::{SchemaConversionSnafu, TableProjectionSnafu, TablesRecordBatchSnafu};
 use crate::metadata::{
-    TableId, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType, TableVersion,
+    FilterPushDownType, TableId, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion,
 };
-use crate::{ColumnStatistics, Table, TableStatistics};
+use crate::thin_table::{ThinTable, ThinTableAdapter};
+use crate::TableRef;

 #[derive(Debug, Clone)]
-pub struct MemTable {
-    info: TableInfoRef,
-    recordbatch: RecordBatch,
-}
+pub struct MemTable;

 impl MemTable {
-    pub fn new(table_name: impl Into<String>, recordbatch: RecordBatch) -> Self {
+    pub fn table(table_name: impl Into<String>, recordbatch: RecordBatch) -> TableRef {
         Self::new_with_region(table_name, recordbatch, vec![0])
     }
@@ -50,7 +46,7 @@ impl MemTable {
         table_name: impl Into<String>,
         recordbatch: RecordBatch,
         regions: Vec<RegionNumber>,
-    ) -> Self {
+    ) -> TableRef {
         Self::new_with_catalog(
             table_name,
             recordbatch,
@@ -68,7 +64,7 @@ impl MemTable {
         catalog_name: String,
         schema_name: String,
         regions: Vec<RegionNumber>,
-    ) -> Self {
+    ) -> TableRef {
         let schema = recordbatch.schema.clone();

         let meta = TableMetaBuilder::default()
@@ -98,16 +94,14 @@ impl MemTable {
             .unwrap(),
         );

-        Self { info, recordbatch }
-    }
-
-    pub fn table_name(&self) -> &str {
-        &self.info.name
+        let thin_table = ThinTable::new(info, FilterPushDownType::Unsupported);
+        let data_source = Arc::new(MemtableDataSource { recordbatch });
+        Arc::new(ThinTableAdapter::new(thin_table, data_source))
     }

     /// Creates a 1 column 100 rows table, with table name "numbers", column name "uint32s" and
     /// column type "uint32". Column data increased from 0 to 100.
-    pub fn default_numbers_table() -> Self {
+    pub fn default_numbers_table() -> TableRef {
         let column_schemas = vec![ColumnSchema::new(
             "uint32s",
             ConcreteDataType::uint32_datatype(),
@@ -118,34 +112,25 @@ impl MemTable {
             (0..100).collect::<Vec<_>>(),
         ))];
         let recordbatch = RecordBatch::new(schema, columns).unwrap();
-        MemTable::new("numbers", recordbatch)
+        MemTable::table("numbers", recordbatch)
     }
 }

-#[async_trait]
-impl Table for MemTable {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
+struct MemtableDataSource {
+    recordbatch: RecordBatch,
+}

-    fn schema(&self) -> SchemaRef {
-        self.recordbatch.schema.clone()
-    }
-
-    fn table_info(&self) -> TableInfoRef {
-        self.info.clone()
-    }
-
-    fn table_type(&self) -> TableType {
-        self.info.table_type
-    }
-
-    async fn scan_to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
+impl DataSource for MemtableDataSource {
+    fn get_stream(
+        &self,
+        request: ScanRequest,
+    ) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
         let df_recordbatch = if let Some(indices) = request.projection {
             self.recordbatch
                 .df_record_batch()
                 .project(&indices)
-                .context(TableProjectionSnafu)?
+                .context(TableProjectionSnafu)
+                .map_err(BoxedError::new)?
         } else {
             self.recordbatch.df_record_batch().clone()
         };
@@ -159,41 +144,22 @@
         let df_recordbatch = df_recordbatch.slice(0, limit);

         let recordbatch = RecordBatch::try_from_df_record_batch(
-            Arc::new(Schema::try_from(df_recordbatch.schema()).context(SchemaConversionSnafu)?),
+            Arc::new(
+                Schema::try_from(df_recordbatch.schema())
+                    .context(SchemaConversionSnafu)
+                    .map_err(BoxedError::new)?,
+            ),
             df_recordbatch,
         )
         .map_err(BoxedError::new)
-        .context(TablesRecordBatchSnafu)?;
+        .context(TablesRecordBatchSnafu)
+        .map_err(BoxedError::new)?;

         Ok(Box::pin(MemtableStream {
             schema: recordbatch.schema.clone(),
             recordbatch: Some(recordbatch),
         }))
     }
-
-    fn statistics(&self) -> Option<TableStatistics> {
-        let df_recordbatch = self.recordbatch.df_record_batch();
-        let num_rows = df_recordbatch.num_rows();
-        let total_byte_size = df_recordbatch.get_array_memory_size();
-        let column_statistics: Vec<_> = df_recordbatch
-            .columns()
-            .iter()
-            .map(|col| {
-                let null_count = col.null_count();
-                ColumnStatistics {
-                    null_count: Some(null_count),
-                    // TODO(discord9): implement more statistics
-                    ..Default::default()
-                }
-            })
-            .collect();
-
-        Some(TableStatistics {
-            num_rows: Some(num_rows),
-            total_byte_size: Some(total_byte_size),
-            column_statistics: Some(column_statistics),
-            is_exact: true,
-        })
-    }
 }

 impl RecordBatchStream for MemtableStream {
@@ -278,7 +244,7 @@ mod test {
         assert_eq!(vec!["hello"], string_column);
     }

-    fn build_testing_table() -> MemTable {
+    fn build_testing_table() -> TableRef {
         let i32_column_schema =
             ColumnSchema::new("i32_numbers", ConcreteDataType::int32_datatype(), true);
         let string_column_schema =
@@ -301,6 +267,6 @@ mod test {
             ])),
         ];
         let recordbatch = RecordBatch::new(schema, columns).unwrap();
-        MemTable::new("", recordbatch)
+        MemTable::table("", recordbatch)
     }
 }

@@ -13,7 +13,6 @@
 // limitations under the License.

 use std::collections::HashMap;
 use std::sync::Arc;

-use async_trait::async_trait;
 use common_procedure::BoxedProcedure;
@@ -53,7 +52,7 @@ impl TableEngine for MockTableEngine {
         let schema_name = request.schema_name.clone();
         let table_name = request.table_name.clone();

-        let table_ref = Arc::new(EmptyTable::new(request));
+        let table_ref = EmptyTable::table(request);
         let _ = self
             .tables
@@ -51,6 +51,10 @@ impl ThinTableAdapter {
     pub fn new(table: ThinTable, data_source: DataSourceRef) -> Self {
         Self { table, data_source }
     }
+
+    pub fn data_source(&self) -> DataSourceRef {
+        self.data_source.clone()
+    }
 }

 #[async_trait]
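
The data_source() accessor added above is what lets tests interpose on scans without writing a bespoke Table impl: unwrap the adapter, wrap its source, and re-adapt, as the query-engine time-range test now does. A condensed sketch of that pattern follows; RecordingSource and wrap_for_inspection are illustrative names, not part of the change:

    use std::sync::{Arc, RwLock};

    use common_error::ext::BoxedError;
    use common_query::prelude::Expr;
    use common_recordbatch::SendableRecordBatchStream;
    use store_api::data_source::{DataSource, DataSourceRef};
    use store_api::storage::ScanRequest;
    use table::metadata::FilterPushDownType;
    use table::thin_table::{ThinTable, ThinTableAdapter};
    use table::TableRef;

    // Records the filters pushed down to a scan, then delegates to the
    // real source, mirroring DataSourceWrapper in this commit.
    struct RecordingSource {
        inner: DataSourceRef,
        filters: Arc<RwLock<Vec<Expr>>>,
    }

    impl DataSource for RecordingSource {
        fn get_stream(
            &self,
            request: ScanRequest,
        ) -> Result<SendableRecordBatchStream, BoxedError> {
            *self.filters.write().unwrap() = request.filters.clone();
            self.inner.get_stream(request)
        }
    }

    // Unwrap the adapter, wrap its source, and re-adapt under the
    // pushdown policy the test wants to exercise.
    fn wrap_for_inspection(table: TableRef, filters: Arc<RwLock<Vec<Expr>>>) -> TableRef {
        let info = table.table_info();
        let adapter = table.as_any().downcast_ref::<ThinTableAdapter>().unwrap();
        let inner = adapter.data_source();
        let source = Arc::new(RecordingSource { inner, filters });
        Arc::new(ThinTableAdapter::new(
            ThinTable::new(info, FilterPushDownType::Exact),
            source,
        ))
    }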