mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-05 21:02:58 +00:00
feat: local catalog drop table (#913)
* feat: local catalog drop table * Update src/catalog/src/local/manager.rs Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com> * Update src/catalog/src/local/manager.rs Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com> * fix: resolve PR comments --------- Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
This commit is contained in:
@@ -13,6 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
use std::fmt::Debug;
|
||||
|
||||
use common_error::ext::{BoxedError, ErrorExt};
|
||||
use common_error::prelude::{Snafu, StatusCode};
|
||||
@@ -21,6 +22,8 @@ use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::RawSchema;
|
||||
use snafu::{Backtrace, ErrorCompat};
|
||||
|
||||
use crate::DeregisterTableRequest;
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
#[snafu(visibility(pub))]
|
||||
pub enum Error {
|
||||
@@ -96,18 +99,15 @@ pub enum Error {
|
||||
#[snafu(display("Table `{}` already exists", table))]
|
||||
TableExists { table: String, backtrace: Backtrace },
|
||||
|
||||
#[snafu(display("Table `{}` not exist", table))]
|
||||
TableNotExist { table: String, backtrace: Backtrace },
|
||||
|
||||
#[snafu(display("Schema {} already exists", schema))]
|
||||
SchemaExists {
|
||||
schema: String,
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to register table"))]
|
||||
RegisterTable {
|
||||
#[snafu(backtrace)]
|
||||
source: BoxedError,
|
||||
},
|
||||
|
||||
#[snafu(display("Operation {} not implemented yet", operation))]
|
||||
Unimplemented {
|
||||
operation: String,
|
||||
@@ -142,6 +142,17 @@ pub enum Error {
|
||||
source: table::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Failed to deregister table, request: {:?}, source: {}",
|
||||
request,
|
||||
source
|
||||
))]
|
||||
DeregisterTable {
|
||||
request: DeregisterTableRequest,
|
||||
#[snafu(backtrace)]
|
||||
source: table::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Illegal catalog manager state: {}", msg))]
|
||||
IllegalManagerState { backtrace: Backtrace, msg: String },
|
||||
|
||||
@@ -165,7 +176,10 @@ pub enum Error {
|
||||
},
|
||||
|
||||
#[snafu(display("Failure during SchemaProvider operation, source: {}", source))]
|
||||
SchemaProviderOperation { source: BoxedError },
|
||||
SchemaProviderOperation {
|
||||
#[snafu(backtrace)]
|
||||
source: BoxedError,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to execute system catalog table scan, source: {}", source))]
|
||||
SystemCatalogTableScanExec {
|
||||
@@ -178,15 +192,6 @@ pub enum Error {
|
||||
source: common_catalog::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("IO error occurred while fetching catalog info, source: {}", source))]
|
||||
Io {
|
||||
backtrace: Backtrace,
|
||||
source: std::io::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Local and remote catalog data are inconsistent, msg: {}", msg))]
|
||||
CatalogStateInconsistent { msg: String, backtrace: Backtrace },
|
||||
|
||||
#[snafu(display("Failed to perform metasrv operation, source: {}", source))]
|
||||
MetaSrv {
|
||||
#[snafu(backtrace)]
|
||||
@@ -198,12 +203,6 @@ pub enum Error {
|
||||
#[snafu(backtrace)]
|
||||
source: datatypes::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Catalog internal error: {}", source))]
|
||||
Internal {
|
||||
#[snafu(backtrace)]
|
||||
source: BoxedError,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -216,37 +215,34 @@ impl ErrorExt for Error {
|
||||
| Error::TableNotFound { .. }
|
||||
| Error::IllegalManagerState { .. }
|
||||
| Error::CatalogNotFound { .. }
|
||||
| Error::InvalidEntryType { .. }
|
||||
| Error::CatalogStateInconsistent { .. } => StatusCode::Unexpected,
|
||||
| Error::InvalidEntryType { .. } => StatusCode::Unexpected,
|
||||
|
||||
Error::SystemCatalog { .. }
|
||||
| Error::EmptyValue { .. }
|
||||
| Error::ValueDeserialize { .. }
|
||||
| Error::Io { .. } => StatusCode::StorageUnavailable,
|
||||
| Error::ValueDeserialize { .. } => StatusCode::StorageUnavailable,
|
||||
|
||||
Error::RegisterTable { .. } | Error::SystemCatalogTypeMismatch { .. } => {
|
||||
StatusCode::Internal
|
||||
}
|
||||
Error::SystemCatalogTypeMismatch { .. } => StatusCode::Internal,
|
||||
|
||||
Error::ReadSystemCatalog { source, .. } => source.status_code(),
|
||||
Error::InvalidCatalogValue { source, .. } => source.status_code(),
|
||||
|
||||
Error::TableExists { .. } => StatusCode::TableAlreadyExists,
|
||||
Error::TableNotExist { .. } => StatusCode::TableNotFound,
|
||||
Error::SchemaExists { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
Error::OpenSystemCatalog { source, .. }
|
||||
| Error::CreateSystemCatalog { source, .. }
|
||||
| Error::InsertCatalogRecord { source, .. }
|
||||
| Error::OpenTable { source, .. }
|
||||
| Error::CreateTable { source, .. } => source.status_code(),
|
||||
| Error::CreateTable { source, .. }
|
||||
| Error::DeregisterTable { source, .. } => source.status_code(),
|
||||
|
||||
Error::MetaSrv { source, .. } => source.status_code(),
|
||||
Error::SystemCatalogTableScan { source } => source.status_code(),
|
||||
Error::SystemCatalogTableScanExec { source } => source.status_code(),
|
||||
Error::InvalidTableSchema { source, .. } => source.status_code(),
|
||||
Error::InvalidTableInfoInCatalog { .. } => StatusCode::Unexpected,
|
||||
Error::Internal { source, .. } | Error::SchemaProviderOperation { source } => {
|
||||
source.status_code()
|
||||
}
|
||||
Error::SchemaProviderOperation { source } => source.status_code(),
|
||||
|
||||
Error::Unimplemented { .. } => StatusCode::Unsupported,
|
||||
}
|
||||
|
||||
@@ -154,7 +154,7 @@ pub struct RenameTableRequest {
|
||||
pub table_id: TableId,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DeregisterTableRequest {
|
||||
pub catalog: String,
|
||||
pub schema: String,
|
||||
|
||||
@@ -34,9 +34,9 @@ use table::table::TableIdProvider;
|
||||
use table::TableRef;
|
||||
|
||||
use crate::error::{
|
||||
CatalogNotFoundSnafu, IllegalManagerStateSnafu, OpenTableSnafu, ReadSystemCatalogSnafu, Result,
|
||||
SchemaExistsSnafu, SchemaNotFoundSnafu, SystemCatalogSnafu, SystemCatalogTypeMismatchSnafu,
|
||||
TableExistsSnafu, TableNotFoundSnafu, UnimplementedSnafu,
|
||||
self, CatalogNotFoundSnafu, IllegalManagerStateSnafu, OpenTableSnafu, ReadSystemCatalogSnafu,
|
||||
Result, SchemaExistsSnafu, SchemaNotFoundSnafu, SystemCatalogSnafu,
|
||||
SystemCatalogTypeMismatchSnafu, TableExistsSnafu, TableNotFoundSnafu,
|
||||
};
|
||||
use crate::local::memory::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider};
|
||||
use crate::system::{
|
||||
@@ -419,11 +419,36 @@ impl CatalogManager for LocalCatalogManager {
|
||||
.is_ok())
|
||||
}
|
||||
|
||||
async fn deregister_table(&self, _request: DeregisterTableRequest) -> Result<bool> {
|
||||
UnimplementedSnafu {
|
||||
operation: "deregister table",
|
||||
async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<bool> {
|
||||
{
|
||||
let started = *self.init_lock.lock().await;
|
||||
ensure!(started, IllegalManagerStateSnafu { msg: "not started" });
|
||||
}
|
||||
|
||||
{
|
||||
let _ = self.register_lock.lock().await;
|
||||
|
||||
let DeregisterTableRequest {
|
||||
catalog,
|
||||
schema,
|
||||
table_name,
|
||||
} = &request;
|
||||
let table_id = self
|
||||
.catalogs
|
||||
.table(catalog, schema, table_name)?
|
||||
.with_context(|| error::TableNotExistSnafu {
|
||||
table: format!("{catalog}.{schema}.{table_name}"),
|
||||
})?
|
||||
.table_info()
|
||||
.ident
|
||||
.table_id;
|
||||
|
||||
if !self.system.deregister_table(&request, table_id).await? {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
self.catalogs.deregister_table(request).await
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
|
||||
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool> {
|
||||
|
||||
@@ -20,13 +20,13 @@ use std::sync::{Arc, RwLock};
|
||||
|
||||
use common_catalog::consts::MIN_USER_TABLE_ID;
|
||||
use common_telemetry::error;
|
||||
use snafu::OptionExt;
|
||||
use snafu::{ensure, OptionExt};
|
||||
use table::metadata::TableId;
|
||||
use table::table::TableIdProvider;
|
||||
use table::TableRef;
|
||||
|
||||
use crate::error::{
|
||||
CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu, TableNotFoundSnafu,
|
||||
self, CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu, TableNotFoundSnafu,
|
||||
};
|
||||
use crate::schema::SchemaProvider;
|
||||
use crate::{
|
||||
@@ -250,6 +250,10 @@ impl CatalogProvider for MemoryCatalogProvider {
|
||||
schema: SchemaProviderRef,
|
||||
) -> Result<Option<SchemaProviderRef>> {
|
||||
let mut schemas = self.schemas.write().unwrap();
|
||||
ensure!(
|
||||
!schemas.contains_key(&name),
|
||||
error::SchemaExistsSnafu { schema: &name }
|
||||
);
|
||||
Ok(schemas.insert(name, schema))
|
||||
}
|
||||
|
||||
|
||||
@@ -25,29 +25,27 @@ use common_query::physical_plan::{PhysicalPlanRef, SessionContext};
|
||||
use common_recordbatch::SendableRecordBatchStream;
|
||||
use common_telemetry::debug;
|
||||
use common_time::util;
|
||||
use datatypes::prelude::{ConcreteDataType, ScalarVector};
|
||||
use datatypes::prelude::{ConcreteDataType, ScalarVector, VectorRef};
|
||||
use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef};
|
||||
use datatypes::vectors::{BinaryVector, TimestampMillisecondVector, UInt8Vector};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use table::engine::{EngineContext, TableEngineRef};
|
||||
use table::metadata::{TableId, TableInfoRef};
|
||||
use table::requests::{CreateTableRequest, InsertRequest, OpenTableRequest};
|
||||
use table::requests::{CreateTableRequest, DeleteRequest, InsertRequest, OpenTableRequest};
|
||||
use table::{Table, TableRef};
|
||||
|
||||
use crate::error::{
|
||||
self, CreateSystemCatalogSnafu, EmptyValueSnafu, Error, InvalidEntryTypeSnafu, InvalidKeySnafu,
|
||||
OpenSystemCatalogSnafu, Result, ValueDeserializeSnafu,
|
||||
};
|
||||
use crate::DeregisterTableRequest;
|
||||
|
||||
pub const ENTRY_TYPE_INDEX: usize = 0;
|
||||
pub const KEY_INDEX: usize = 1;
|
||||
pub const VALUE_INDEX: usize = 3;
|
||||
|
||||
pub struct SystemCatalogTable {
|
||||
table_info: TableInfoRef,
|
||||
pub table: TableRef,
|
||||
}
|
||||
pub struct SystemCatalogTable(TableRef);
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Table for SystemCatalogTable {
|
||||
@@ -56,25 +54,29 @@ impl Table for SystemCatalogTable {
|
||||
}
|
||||
|
||||
fn schema(&self) -> SchemaRef {
|
||||
self.table_info.meta.schema.clone()
|
||||
self.0.schema()
|
||||
}
|
||||
|
||||
async fn scan(
|
||||
&self,
|
||||
_projection: Option<&Vec<usize>>,
|
||||
_filters: &[Expr],
|
||||
_limit: Option<usize>,
|
||||
projection: Option<&Vec<usize>>,
|
||||
filters: &[Expr],
|
||||
limit: Option<usize>,
|
||||
) -> table::Result<PhysicalPlanRef> {
|
||||
panic!("System catalog table does not support scan!")
|
||||
self.0.scan(projection, filters, limit).await
|
||||
}
|
||||
|
||||
/// Insert values into table.
|
||||
async fn insert(&self, request: InsertRequest) -> table::error::Result<usize> {
|
||||
self.table.insert(request).await
|
||||
self.0.insert(request).await
|
||||
}
|
||||
|
||||
fn table_info(&self) -> TableInfoRef {
|
||||
self.table_info.clone()
|
||||
self.0.table_info()
|
||||
}
|
||||
|
||||
async fn delete(&self, request: DeleteRequest) -> table::Result<usize> {
|
||||
self.0.delete(request).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -95,10 +97,7 @@ impl SystemCatalogTable {
|
||||
.await
|
||||
.context(OpenSystemCatalogSnafu)?
|
||||
{
|
||||
Ok(Self {
|
||||
table_info: table.table_info(),
|
||||
table,
|
||||
})
|
||||
Ok(Self(table))
|
||||
} else {
|
||||
// system catalog table is not yet created, try to create
|
||||
let request = CreateTableRequest {
|
||||
@@ -118,8 +117,7 @@ impl SystemCatalogTable {
|
||||
.create_table(&ctx, request)
|
||||
.await
|
||||
.context(CreateSystemCatalogSnafu)?;
|
||||
let table_info = table.table_info();
|
||||
Ok(Self { table, table_info })
|
||||
Ok(Self(table))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -128,7 +126,6 @@ impl SystemCatalogTable {
|
||||
let full_projection = None;
|
||||
let ctx = SessionContext::new();
|
||||
let scan = self
|
||||
.table
|
||||
.scan(full_projection, &[], None)
|
||||
.await
|
||||
.context(error::SystemCatalogTableScanSnafu)?;
|
||||
@@ -208,6 +205,34 @@ pub fn build_table_insert_request(
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) fn build_table_deletion_request(
|
||||
request: &DeregisterTableRequest,
|
||||
table_id: TableId,
|
||||
) -> DeleteRequest {
|
||||
let table_key = format_table_entry_key(&request.catalog, &request.schema, table_id);
|
||||
DeleteRequest {
|
||||
key_column_values: build_primary_key_columns(EntryType::Table, table_key.as_bytes()),
|
||||
}
|
||||
}
|
||||
|
||||
fn build_primary_key_columns(entry_type: EntryType, key: &[u8]) -> HashMap<String, VectorRef> {
|
||||
let mut m = HashMap::with_capacity(3);
|
||||
m.insert(
|
||||
"entry_type".to_string(),
|
||||
Arc::new(UInt8Vector::from_slice(&[entry_type as u8])) as _,
|
||||
);
|
||||
m.insert(
|
||||
"key".to_string(),
|
||||
Arc::new(BinaryVector::from_slice(&[key])) as _,
|
||||
);
|
||||
// Timestamp in key part is intentionally left to 0
|
||||
m.insert(
|
||||
"timestamp".to_string(),
|
||||
Arc::new(TimestampMillisecondVector::from_slice(&[0])) as _,
|
||||
);
|
||||
m
|
||||
}
|
||||
|
||||
pub fn build_schema_insert_request(catalog_name: String, schema_name: String) -> InsertRequest {
|
||||
let full_schema_name = format!("{catalog_name}.{schema_name}");
|
||||
build_insert_request(
|
||||
@@ -220,22 +245,10 @@ pub fn build_schema_insert_request(catalog_name: String, schema_name: String) ->
|
||||
}
|
||||
|
||||
pub fn build_insert_request(entry_type: EntryType, key: &[u8], value: &[u8]) -> InsertRequest {
|
||||
let primary_key_columns = build_primary_key_columns(entry_type, key);
|
||||
|
||||
let mut columns_values = HashMap::with_capacity(6);
|
||||
columns_values.insert(
|
||||
"entry_type".to_string(),
|
||||
Arc::new(UInt8Vector::from_slice(&[entry_type as u8])) as _,
|
||||
);
|
||||
|
||||
columns_values.insert(
|
||||
"key".to_string(),
|
||||
Arc::new(BinaryVector::from_slice(&[key])) as _,
|
||||
);
|
||||
|
||||
// Timestamp in key part is intentionally left to 0
|
||||
columns_values.insert(
|
||||
"timestamp".to_string(),
|
||||
Arc::new(TimestampMillisecondVector::from_slice(&[0])) as _,
|
||||
);
|
||||
columns_values.extend(primary_key_columns.into_iter());
|
||||
|
||||
columns_values.insert(
|
||||
"value".to_string(),
|
||||
@@ -380,6 +393,8 @@ pub struct TableEntryValue {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_recordbatch::RecordBatches;
|
||||
use datatypes::value::Value;
|
||||
use log_store::NoopLogStore;
|
||||
use mito::config::EngineConfig;
|
||||
use mito::engine::MitoEngine;
|
||||
@@ -500,4 +515,53 @@ mod tests {
|
||||
assert_eq!(SYSTEM_CATALOG_NAME, info.catalog_name);
|
||||
assert_eq!(INFORMATION_SCHEMA_NAME, info.schema_name);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_system_catalog_table_records() {
|
||||
let (_, table_engine) = prepare_table_engine().await;
|
||||
let catalog_table = SystemCatalogTable::new(table_engine).await.unwrap();
|
||||
|
||||
let table_insertion = build_table_insert_request(
|
||||
DEFAULT_CATALOG_NAME.to_string(),
|
||||
DEFAULT_SCHEMA_NAME.to_string(),
|
||||
"my_table".to_string(),
|
||||
1,
|
||||
);
|
||||
let result = catalog_table.insert(table_insertion).await.unwrap();
|
||||
assert_eq!(result, 1);
|
||||
|
||||
let records = catalog_table.records().await.unwrap();
|
||||
let mut batches = RecordBatches::try_collect(records).await.unwrap().take();
|
||||
assert_eq!(batches.len(), 1);
|
||||
let batch = batches.remove(0);
|
||||
assert_eq!(batch.num_rows(), 1);
|
||||
|
||||
let row = batch.rows().next().unwrap();
|
||||
let Value::UInt8(entry_type) = row[0] else { unreachable!() };
|
||||
let Value::Binary(key) = row[1].clone() else { unreachable!() };
|
||||
let Value::Binary(value) = row[3].clone() else { unreachable!() };
|
||||
let entry = decode_system_catalog(Some(entry_type), Some(&*key), Some(&*value)).unwrap();
|
||||
let expected = Entry::Table(TableEntry {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: "my_table".to_string(),
|
||||
table_id: 1,
|
||||
});
|
||||
assert_eq!(entry, expected);
|
||||
|
||||
let table_deletion = build_table_deletion_request(
|
||||
&DeregisterTableRequest {
|
||||
catalog: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: "my_table".to_string(),
|
||||
},
|
||||
1,
|
||||
);
|
||||
let result = catalog_table.delete(table_deletion).await.unwrap();
|
||||
assert_eq!(result, 1);
|
||||
|
||||
let records = catalog_table.records().await.unwrap();
|
||||
let batches = RecordBatches::try_collect(records).await.unwrap().take();
|
||||
assert_eq!(batches.len(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,9 +38,14 @@ use table::metadata::{TableId, TableInfoRef};
|
||||
use table::table::scan::SimpleTableScan;
|
||||
use table::{Table, TableRef};
|
||||
|
||||
use crate::error::{Error, InsertCatalogRecordSnafu};
|
||||
use crate::system::{build_schema_insert_request, build_table_insert_request, SystemCatalogTable};
|
||||
use crate::{CatalogListRef, CatalogProvider, SchemaProvider, SchemaProviderRef};
|
||||
use crate::error::{self, Error, InsertCatalogRecordSnafu, Result as CatalogResult};
|
||||
use crate::system::{
|
||||
build_schema_insert_request, build_table_deletion_request, build_table_insert_request,
|
||||
SystemCatalogTable,
|
||||
};
|
||||
use crate::{
|
||||
CatalogListRef, CatalogProvider, DeregisterTableRequest, SchemaProvider, SchemaProviderRef,
|
||||
};
|
||||
|
||||
/// Tables holds all tables created by user.
|
||||
pub struct Tables {
|
||||
@@ -279,6 +284,21 @@ impl SystemCatalog {
|
||||
.context(InsertCatalogRecordSnafu)
|
||||
}
|
||||
|
||||
pub(crate) async fn deregister_table(
|
||||
&self,
|
||||
request: &DeregisterTableRequest,
|
||||
table_id: TableId,
|
||||
) -> CatalogResult<bool> {
|
||||
self.information_schema
|
||||
.system
|
||||
.delete(build_table_deletion_request(request, table_id))
|
||||
.await
|
||||
.map(|x| x == 1)
|
||||
.with_context(|_| error::DeregisterTableSnafu {
|
||||
request: request.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn register_schema(
|
||||
&self,
|
||||
catalog: String,
|
||||
|
||||
@@ -421,7 +421,7 @@ mod tests {
|
||||
}
|
||||
|
||||
fn throw_catalog_error() -> catalog::error::Result<()> {
|
||||
Err(catalog::error::Error::RegisterTable {
|
||||
Err(catalog::error::Error::SchemaProviderOperation {
|
||||
source: BoxedError::new(MockError::with_backtrace(StatusCode::Internal)),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -117,7 +117,7 @@ mod tests {
|
||||
use std::any::Any;
|
||||
use std::sync::Arc;
|
||||
|
||||
use catalog::{CatalogList, SchemaProvider};
|
||||
use catalog::{CatalogManager, RegisterTableRequest};
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_query::logical_plan::Expr;
|
||||
use common_query::physical_plan::PhysicalPlanRef;
|
||||
@@ -137,7 +137,7 @@ mod tests {
|
||||
use storage::EngineImpl;
|
||||
use table::error::Result as TableResult;
|
||||
use table::metadata::TableInfoRef;
|
||||
use table::{Table, TableRef};
|
||||
use table::Table;
|
||||
use tempdir::TempDir;
|
||||
|
||||
use super::*;
|
||||
@@ -185,43 +185,6 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
struct MockSchemaProvider;
|
||||
|
||||
impl SchemaProvider for MockSchemaProvider {
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
||||
fn table_names(&self) -> catalog::error::Result<Vec<String>> {
|
||||
Ok(vec!["demo".to_string()])
|
||||
}
|
||||
|
||||
fn table(&self, name: &str) -> catalog::error::Result<Option<TableRef>> {
|
||||
assert_eq!(name, "demo");
|
||||
Ok(Some(Arc::new(DemoTable {})))
|
||||
}
|
||||
|
||||
fn register_table(
|
||||
&self,
|
||||
_name: String,
|
||||
_table: TableRef,
|
||||
) -> catalog::error::Result<Option<TableRef>> {
|
||||
unimplemented!();
|
||||
}
|
||||
|
||||
fn rename_table(&self, _name: &str, _new_name: String) -> catalog::error::Result<TableRef> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn deregister_table(&self, _name: &str) -> catalog::error::Result<Option<TableRef>> {
|
||||
unimplemented!();
|
||||
}
|
||||
|
||||
fn table_exist(&self, name: &str) -> catalog::error::Result<bool> {
|
||||
Ok(name == "demo")
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_statement_to_request() {
|
||||
let dir = TempDir::new("setup_test_engine_and_table").unwrap();
|
||||
@@ -249,12 +212,16 @@ mod tests {
|
||||
.await
|
||||
.unwrap(),
|
||||
);
|
||||
let catalog_provider = catalog_list.catalog(DEFAULT_CATALOG_NAME).unwrap().unwrap();
|
||||
catalog_provider
|
||||
.register_schema(
|
||||
DEFAULT_SCHEMA_NAME.to_string(),
|
||||
Arc::new(MockSchemaProvider {}),
|
||||
)
|
||||
catalog_list.start().await.unwrap();
|
||||
catalog_list
|
||||
.register_table(RegisterTableRequest {
|
||||
catalog: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: "demo".to_string(),
|
||||
table_id: 1,
|
||||
table: Arc::new(DemoTable),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let factory = QueryEngineFactory::new(catalog_list.clone());
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow::datatypes::DataType as ArrowDataType;
|
||||
use common_base::bytes::StringBytes;
|
||||
use common_base::bytes::Bytes;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::data_type::{DataType, DataTypeRef};
|
||||
@@ -43,7 +43,7 @@ impl DataType for BinaryType {
|
||||
}
|
||||
|
||||
fn default_value(&self) -> Value {
|
||||
StringBytes::default().into()
|
||||
Bytes::default().into()
|
||||
}
|
||||
|
||||
fn as_arrow_type(&self) -> ArrowDataType {
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
CREATE TABLE test (a INTEGER, b INTEGER, t BIGINT TIME INDEX);
|
||||
CREATE TABLE test_distinct (a INTEGER, b INTEGER, t BIGINT TIME INDEX);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
INSERT INTO test VALUES (11, 22, 1), (13, 22, 2), (11, 21, 3), (11, 22, 4);
|
||||
INSERT INTO test_distinct VALUES (11, 22, 1), (13, 22, 2), (11, 21, 3), (11, 22, 4);
|
||||
|
||||
Affected Rows: 4
|
||||
|
||||
SELECT DISTINCT a, b FROM test ORDER BY a, b;
|
||||
SELECT DISTINCT a, b FROM test_distinct ORDER BY a, b;
|
||||
|
||||
+----+----+
|
||||
| a | b |
|
||||
@@ -16,7 +16,7 @@ SELECT DISTINCT a, b FROM test ORDER BY a, b;
|
||||
| 13 | 22 |
|
||||
+----+----+
|
||||
|
||||
SELECT DISTINCT test.a, b FROM test ORDER BY a, b;
|
||||
SELECT DISTINCT test_distinct.a, b FROM test_distinct ORDER BY a, b;
|
||||
|
||||
+----+----+
|
||||
| a | b |
|
||||
@@ -26,7 +26,7 @@ SELECT DISTINCT test.a, b FROM test ORDER BY a, b;
|
||||
| 13 | 22 |
|
||||
+----+----+
|
||||
|
||||
SELECT DISTINCT a FROM test ORDER BY a;
|
||||
SELECT DISTINCT a FROM test_distinct ORDER BY a;
|
||||
|
||||
+----+
|
||||
| a |
|
||||
@@ -35,7 +35,7 @@ SELECT DISTINCT a FROM test ORDER BY a;
|
||||
| 13 |
|
||||
+----+
|
||||
|
||||
SELECT DISTINCT b FROM test ORDER BY b;
|
||||
SELECT DISTINCT b FROM test_distinct ORDER BY b;
|
||||
|
||||
+----+
|
||||
| b |
|
||||
@@ -44,32 +44,32 @@ SELECT DISTINCT b FROM test ORDER BY b;
|
||||
| 22 |
|
||||
+----+
|
||||
|
||||
SELECT DISTINCT a, SUM(B) FROM test GROUP BY a ORDER BY a;
|
||||
SELECT DISTINCT a, SUM(B) FROM test_distinct GROUP BY a ORDER BY a;
|
||||
|
||||
+----+-------------+
|
||||
| a | SUM(test.b) |
|
||||
+----+-------------+
|
||||
| 11 | 65 |
|
||||
| 13 | 22 |
|
||||
+----+-------------+
|
||||
+----+----------------------+
|
||||
| a | SUM(test_distinct.b) |
|
||||
+----+----------------------+
|
||||
| 11 | 65 |
|
||||
| 13 | 22 |
|
||||
+----+----------------------+
|
||||
|
||||
SELECT DISTINCT MAX(b) FROM test GROUP BY a;
|
||||
SELECT DISTINCT MAX(b) FROM test_distinct GROUP BY a;
|
||||
|
||||
+-------------+
|
||||
| MAX(test.b) |
|
||||
+-------------+
|
||||
| 22 |
|
||||
+-------------+
|
||||
+----------------------+
|
||||
| MAX(test_distinct.b) |
|
||||
+----------------------+
|
||||
| 22 |
|
||||
+----------------------+
|
||||
|
||||
SELECT DISTINCT CASE WHEN a > 11 THEN 11 ELSE a END FROM test;
|
||||
SELECT DISTINCT CASE WHEN a > 11 THEN 11 ELSE a END FROM test_distinct;
|
||||
|
||||
+-------------------------------------------------------------+
|
||||
| CASE WHEN test.a > Int64(11) THEN Int64(11) ELSE test.a END |
|
||||
+-------------------------------------------------------------+
|
||||
| 11 |
|
||||
+-------------------------------------------------------------+
|
||||
+-------------------------------------------------------------------------------+
|
||||
| CASE WHEN test_distinct.a > Int64(11) THEN Int64(11) ELSE test_distinct.a END |
|
||||
+-------------------------------------------------------------------------------+
|
||||
| 11 |
|
||||
+-------------------------------------------------------------------------------+
|
||||
|
||||
DROP TABLE test;
|
||||
DROP TABLE test_distinct;
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
|
||||
@@ -1,19 +1,19 @@
|
||||
CREATE TABLE test (a INTEGER, b INTEGER, t BIGINT TIME INDEX);
|
||||
CREATE TABLE test_distinct (a INTEGER, b INTEGER, t BIGINT TIME INDEX);
|
||||
|
||||
INSERT INTO test VALUES (11, 22, 1), (13, 22, 2), (11, 21, 3), (11, 22, 4);
|
||||
INSERT INTO test_distinct VALUES (11, 22, 1), (13, 22, 2), (11, 21, 3), (11, 22, 4);
|
||||
|
||||
SELECT DISTINCT a, b FROM test ORDER BY a, b;
|
||||
SELECT DISTINCT a, b FROM test_distinct ORDER BY a, b;
|
||||
|
||||
SELECT DISTINCT test.a, b FROM test ORDER BY a, b;
|
||||
SELECT DISTINCT test_distinct.a, b FROM test_distinct ORDER BY a, b;
|
||||
|
||||
SELECT DISTINCT a FROM test ORDER BY a;
|
||||
SELECT DISTINCT a FROM test_distinct ORDER BY a;
|
||||
|
||||
SELECT DISTINCT b FROM test ORDER BY b;
|
||||
SELECT DISTINCT b FROM test_distinct ORDER BY b;
|
||||
|
||||
SELECT DISTINCT a, SUM(B) FROM test GROUP BY a ORDER BY a;
|
||||
SELECT DISTINCT a, SUM(B) FROM test_distinct GROUP BY a ORDER BY a;
|
||||
|
||||
SELECT DISTINCT MAX(b) FROM test GROUP BY a;
|
||||
SELECT DISTINCT MAX(b) FROM test_distinct GROUP BY a;
|
||||
|
||||
SELECT DISTINCT CASE WHEN a > 11 THEN 11 ELSE a END FROM test;
|
||||
SELECT DISTINCT CASE WHEN a > 11 THEN 11 ELSE a END FROM test_distinct;
|
||||
|
||||
DROP TABLE test;
|
||||
DROP TABLE test_distinct;
|
||||
|
||||
@@ -1,16 +1,16 @@
|
||||
CREATE TABLE test(i INTEGER, j BIGINT TIME INDEX);
|
||||
CREATE TABLE test_add_col(i INTEGER, j BIGINT TIME INDEX);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
INSERT INTO test VALUES (1, 1), (2, 2);
|
||||
INSERT INTO test_add_col VALUES (1, 1), (2, 2);
|
||||
|
||||
Affected Rows: 2
|
||||
|
||||
ALTER TABLE test ADD COLUMN k INTEGER;
|
||||
ALTER TABLE test_add_col ADD COLUMN k INTEGER;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
SELECT * FROM test;
|
||||
SELECT * FROM test_add_col;
|
||||
|
||||
+---+---+---+
|
||||
| i | j | k |
|
||||
@@ -19,7 +19,7 @@ SELECT * FROM test;
|
||||
| 2 | 2 | |
|
||||
+---+---+---+
|
||||
|
||||
DROP TABLE test;
|
||||
DROP TABLE test_add_col;
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
CREATE TABLE test(i INTEGER, j BIGINT TIME INDEX);
|
||||
CREATE TABLE test_add_col(i INTEGER, j BIGINT TIME INDEX);
|
||||
|
||||
INSERT INTO test VALUES (1, 1), (2, 2);
|
||||
INSERT INTO test_add_col VALUES (1, 1), (2, 2);
|
||||
|
||||
ALTER TABLE test ADD COLUMN k INTEGER;
|
||||
ALTER TABLE test_add_col ADD COLUMN k INTEGER;
|
||||
|
||||
SELECT * FROM test;
|
||||
SELECT * FROM test_add_col;
|
||||
|
||||
DROP TABLE test;
|
||||
DROP TABLE test_add_col;
|
||||
|
||||
@@ -1,41 +1,41 @@
|
||||
CREATE SCHEMA test;
|
||||
CREATE SCHEMA test_schema;
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
SHOW DATABASES;
|
||||
|
||||
+---------+
|
||||
| Schemas |
|
||||
+---------+
|
||||
| public |
|
||||
| test |
|
||||
+---------+
|
||||
+-------------+
|
||||
| Schemas |
|
||||
+-------------+
|
||||
| public |
|
||||
| test_schema |
|
||||
+-------------+
|
||||
|
||||
CREATE TABLE test.hello(i BIGINT TIME INDEX);
|
||||
CREATE TABLE test_schema.hello(i BIGINT TIME INDEX);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
DROP TABLE test.hello;
|
||||
DROP TABLE test_schema.hello;
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
DROP SCHEMA test;
|
||||
DROP SCHEMA test_schema;
|
||||
|
||||
Error: 1001(Unsupported), SQL statement is not supported: DROP SCHEMA test;, keyword: SCHEMA
|
||||
Error: 1001(Unsupported), SQL statement is not supported: DROP SCHEMA test_schema;, keyword: SCHEMA
|
||||
|
||||
CREATE SCHEMA test;
|
||||
CREATE SCHEMA test_schema;
|
||||
|
||||
Affected Rows: 1
|
||||
Error: 1003(Internal), Schema test_schema already exists
|
||||
|
||||
CREATE TABLE test.hello(i BIGINT TIME INDEX);
|
||||
CREATE TABLE test_schema.hello(i BIGINT TIME INDEX);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
INSERT INTO test.hello VALUES (2), (3), (4);
|
||||
INSERT INTO test_schema.hello VALUES (2), (3), (4);
|
||||
|
||||
Affected Rows: 3
|
||||
|
||||
SELECT * FROM test.hello;
|
||||
SELECT * FROM test_schema.hello;
|
||||
|
||||
+---+
|
||||
| i |
|
||||
@@ -51,9 +51,10 @@ SHOW TABLES;
|
||||
| Tables |
|
||||
+---------+
|
||||
| numbers |
|
||||
| scripts |
|
||||
+---------+
|
||||
|
||||
SHOW TABLES FROM test;
|
||||
SHOW TABLES FROM test_schema;
|
||||
|
||||
+--------+
|
||||
| Tables |
|
||||
@@ -61,15 +62,26 @@ SHOW TABLES FROM test;
|
||||
| hello |
|
||||
+--------+
|
||||
|
||||
DROP TABLE test.hello;
|
||||
DROP TABLE test_schema.hello;
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
DROP SCHEMA test;
|
||||
DROP TABLE test_schema.hello;
|
||||
|
||||
Error: 1001(Unsupported), SQL statement is not supported: DROP SCHEMA test;, keyword: SCHEMA
|
||||
Error: 4001(TableNotFound), Table `greptime.test_schema.hello` not exist
|
||||
|
||||
SELECT * FROM test.hello;
|
||||
SHOW TABLES FROM test_schema;
|
||||
|
||||
Error: 3000(PlanQuery), Error during planning: table 'greptime.test.hello' not found
|
||||
+--------+
|
||||
| Tables |
|
||||
+--------+
|
||||
+--------+
|
||||
|
||||
DROP SCHEMA test_schema;
|
||||
|
||||
Error: 1001(Unsupported), SQL statement is not supported: DROP SCHEMA test_schema;, keyword: SCHEMA
|
||||
|
||||
SELECT * FROM test_schema.hello;
|
||||
|
||||
Error: 3000(PlanQuery), Error during planning: table 'greptime.test_schema.hello' not found
|
||||
|
||||
|
||||
@@ -1,27 +1,31 @@
|
||||
CREATE SCHEMA test;
|
||||
CREATE SCHEMA test_schema;
|
||||
|
||||
SHOW DATABASES;
|
||||
|
||||
CREATE TABLE test.hello(i BIGINT TIME INDEX);
|
||||
CREATE TABLE test_schema.hello(i BIGINT TIME INDEX);
|
||||
|
||||
DROP TABLE test.hello;
|
||||
DROP TABLE test_schema.hello;
|
||||
|
||||
DROP SCHEMA test;
|
||||
DROP SCHEMA test_schema;
|
||||
|
||||
CREATE SCHEMA test;
|
||||
CREATE SCHEMA test_schema;
|
||||
|
||||
CREATE TABLE test.hello(i BIGINT TIME INDEX);
|
||||
CREATE TABLE test_schema.hello(i BIGINT TIME INDEX);
|
||||
|
||||
INSERT INTO test.hello VALUES (2), (3), (4);
|
||||
INSERT INTO test_schema.hello VALUES (2), (3), (4);
|
||||
|
||||
SELECT * FROM test.hello;
|
||||
SELECT * FROM test_schema.hello;
|
||||
|
||||
SHOW TABLES;
|
||||
|
||||
SHOW TABLES FROM test;
|
||||
SHOW TABLES FROM test_schema;
|
||||
|
||||
DROP TABLE test.hello;
|
||||
DROP TABLE test_schema.hello;
|
||||
|
||||
DROP SCHEMA test;
|
||||
DROP TABLE test_schema.hello;
|
||||
|
||||
SELECT * FROM test.hello;
|
||||
SHOW TABLES FROM test_schema;
|
||||
|
||||
DROP SCHEMA test_schema;
|
||||
|
||||
SELECT * FROM test_schema.hello;
|
||||
|
||||
@@ -1,16 +1,16 @@
|
||||
CREATE TABLE strings(i STRING, t BIGINT, time index(t));
|
||||
CREATE TABLE insert_invalid_strings(i STRING, t BIGINT, time index(t));
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
INSERT INTO strings VALUES ('â‚(', 1);
|
||||
INSERT INTO insert_invalid_strings VALUES ('â‚(', 1);
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
INSERT INTO strings VALUES (3, 4);
|
||||
INSERT INTO insert_invalid_strings VALUES (3, 4);
|
||||
|
||||
Error: 2000(InvalidSyntax), Failed to parse value: Fail to parse number 3, invalid column type: String(StringType)
|
||||
|
||||
SELECT * FROM strings WHERE i = 'â‚(';
|
||||
SELECT * FROM insert_invalid_strings WHERE i = 'â‚(';
|
||||
|
||||
+-----+---+
|
||||
| i | t |
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
CREATE TABLE strings(i STRING, t BIGINT, time index(t));
|
||||
CREATE TABLE insert_invalid_strings(i STRING, t BIGINT, time index(t));
|
||||
|
||||
INSERT INTO strings VALUES ('â‚(', 1);
|
||||
INSERT INTO insert_invalid_strings VALUES ('â‚(', 1);
|
||||
|
||||
INSERT INTO strings VALUES (3, 4);
|
||||
INSERT INTO insert_invalid_strings VALUES (3, 4);
|
||||
|
||||
SELECT * FROM strings WHERE i = 'â‚(';
|
||||
SELECT * FROM insert_invalid_strings WHERE i = 'â‚(';
|
||||
|
||||
CREATE TABLE a(i integer, j BIGINT, time index(j));
|
||||
|
||||
|
||||
@@ -137,7 +137,7 @@ SELECT * FROM tpch_q1_agg ORDER BY l_returnflag, l_linestatus;
|
||||
| R | F | 3785523 | 5337950526.47 | 5071818532.942 | 5274405503.049367 | 25.5259438574251 | 35994.029214030925 | 0.04998927856184382 | 148301 | 2 |
|
||||
+--------------+--------------+---------+----------------+-----------------+--------------------+--------------------+--------------------+---------------------+-------------+---+
|
||||
|
||||
create table test5 (i int, s varchar, t BIGINT TIME INDEX);
|
||||
create table order_variable_size_payload_test5 (i int, s varchar, t BIGINT TIME INDEX);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
|
||||
@@ -35,7 +35,7 @@ INSERT INTO tpch_q1_agg VALUES ('N', 'O', 7459297, 10512270008.90, 9986238338.38
|
||||
|
||||
SELECT * FROM tpch_q1_agg ORDER BY l_returnflag, l_linestatus;
|
||||
|
||||
create table test5 (i int, s varchar, t BIGINT TIME INDEX);
|
||||
create table order_variable_size_payload_test5 (i int, s varchar, t BIGINT TIME INDEX);
|
||||
|
||||
CREATE TABLE test6 (i1 INT, s1 VARCHAR, i2 int, s2 VARCHAR, t BIGINT TIME INDEX);
|
||||
|
||||
|
||||
@@ -82,7 +82,7 @@ impl Env {
|
||||
// Start the DB
|
||||
let server_process = Command::new("./greptime")
|
||||
.current_dir(util::get_binary_dir("debug"))
|
||||
.args(["standalone", "start", "-m"])
|
||||
.args(["standalone", "start"])
|
||||
.stdout(log_file)
|
||||
.spawn()
|
||||
.expect("Failed to start the DB");
|
||||
|
||||
Reference in New Issue
Block a user