Mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2026-01-05 21:02:58 +00:00)

Compare commits: v0.3.1 ... v0.4.0-nig (7 commits)
| SHA1 |
|---|
| 964d26e415 |
| fd412b7b07 |
| 223cf31409 |
| 62f660e439 |
| 0fb18245b8 |
| caed6879e6 |
| 5ab0747092 |
@@ -20,6 +20,3 @@ out/
# Rust
target/

# Git
.git
.gitignore (vendored, 2 lines changed)
@@ -44,3 +44,5 @@ benchmarks/data
# Vscode workspace
*.code-workspace

venv/
Cargo.lock (generated, 37 lines changed)
@@ -1679,6 +1679,7 @@ dependencies = [
"derive_builder 0.12.0",
"futures",
"object-store",
"orc-rust",
"paste",
"regex",
"snafu",
@@ -3069,6 +3070,12 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"

[[package]]
name = "fallible-streaming-iterator"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"

[[package]]
name = "fastrand"
version = "1.9.0"
@@ -5983,6 +5990,27 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "978aa494585d3ca4ad74929863093e87cac9790d81fe7aba2b3dc2890643a0fc"

[[package]]
name = "orc-rust"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e15d3f67795da54d9526e46b7808181ce6236d518f56ca1ee556d3a3fdd77c66"
dependencies = [
"arrow",
"bytes",
"chrono",
"fallible-streaming-iterator",
"flate2",
"futures",
"futures-util",
"lazy_static",
"paste",
"prost",
"snafu",
"tokio",
"zigzag",
]

[[package]]
name = "ordered-float"
version = "1.1.1"
@@ -11214,6 +11242,15 @@ version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9"

[[package]]
name = "zigzag"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70b40401a28d86ce16a330b863b86fd7dbee4d7c940587ab09ab8c019f9e3fdf"
dependencies = [
"num-traits",
]

[[package]]
name = "zstd"
version = "0.11.2+zstd.1.5.2"
@@ -8,6 +8,7 @@ RUN apt-get update && apt-get install -y \
libssl-dev \
protobuf-compiler \
curl \
git \
build-essential \
pkg-config \
python3 \

@@ -8,6 +8,7 @@ RUN apt-get update && apt-get install -y \
libssl-dev \
protobuf-compiler \
curl \
git \
build-essential \
pkg-config \
wget
@@ -201,6 +201,9 @@ impl TableRegionalKey {
/// region ids allocated by metasrv.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TableRegionalValue {
// We can remove the `Option` from the table id once all regional values
// stored in meta have table ids.
pub table_id: Option<TableId>,
pub version: TableVersion,
pub regions_ids: Vec<u32>,
pub engine_name: Option<String>,

@@ -984,21 +984,29 @@ impl SchemaProvider for RemoteSchemaProvider {
.get(key.as_bytes())
.await?
.map(|Kv(_, v)| {
let TableRegionalValue { engine_name, .. } =
TableRegionalValue::parse(String::from_utf8_lossy(&v))
.context(InvalidCatalogValueSnafu)?;
let reference = TableReference {
catalog: &self.catalog_name,
schema: &self.schema_name,
table: name,
let TableRegionalValue {
table_id,
engine_name,
..
} = TableRegionalValue::parse(String::from_utf8_lossy(&v))
.context(InvalidCatalogValueSnafu)?;

let Some(table_id) = table_id else {
warn!("Cannot find table id for {key}, the value has an old format");
return Ok(None);
};
let engine_name = engine_name.as_deref().unwrap_or(MITO_ENGINE);
let engine = self
.engine_manager
.engine(engine_name)
.context(TableEngineNotFoundSnafu { engine_name })?;
let reference = TableReference {
catalog: &self.catalog_name,
schema: &self.schema_name,
table: name,
};
let table = engine
.get_table(&EngineContext {}, &reference)
.get_table(&EngineContext {}, table_id)
.with_context(|_| OpenTableSnafu {
table_info: reference.to_string(),
})?;
@@ -1011,9 +1019,12 @@ impl SchemaProvider for RemoteSchemaProvider {
}

async fn register_table(&self, name: String, table: TableRef) -> Result<Option<TableRef>> {
// Currently, initiate_tables() always call this method to register the table to the schema thus we
// always update the region value.
let table_info = table.table_info();
let table_version = table_info.ident.version;
let table_value = TableRegionalValue {
table_id: Some(table_info.ident.table_id),
version: table_version,
regions_ids: table.table_info().meta.region_numbers.clone(),
engine_name: Some(table_info.meta.engine.clone()),
@@ -1061,25 +1072,27 @@ impl SchemaProvider for RemoteSchemaProvider {
.get(table_key.as_bytes())
.await?
.map(|Kv(_, v)| {
let TableRegionalValue { engine_name, .. } =
TableRegionalValue::parse(String::from_utf8_lossy(&v))
.context(InvalidCatalogValueSnafu)?;
Ok(engine_name)
let TableRegionalValue {
table_id,
engine_name,
..
} = TableRegionalValue::parse(String::from_utf8_lossy(&v))
.context(InvalidCatalogValueSnafu)?;
Ok(engine_name.and_then(|name| table_id.map(|id| (name, id))))
})
.transpose()?
.flatten();

let engine_name = engine_opt.as_deref().unwrap_or_else(|| {
warn!("Cannot find table engine name for {table_key}");
MITO_ENGINE
});

self.backend.delete(table_key.as_bytes()).await?;
debug!(
"Successfully deleted catalog table entry, key: {}",
table_key
);

let Some((engine_name, table_id)) = engine_opt else {
warn!("Cannot find table id and engine name for {table_key}");
return Ok(None);
};
let reference = TableReference {
catalog: &self.catalog_name,
schema: &self.schema_name,
@@ -1088,9 +1101,9 @@ impl SchemaProvider for RemoteSchemaProvider {
// deregistering table does not necessarily mean dropping the table
let table = self
.engine_manager
.engine(engine_name)
.engine(&engine_name)
.context(TableEngineNotFoundSnafu { engine_name })?
.get_table(&EngineContext {}, &reference)
.get_table(&EngineContext {}, table_id)
.with_context(|_| OpenTableSnafu {
table_info: reference.to_string(),
})?;
@@ -16,8 +16,7 @@ use std::any::Any;
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::fmt::{Display, Formatter};
use std::str::FromStr;
use std::sync::Arc;
use std::sync::{Arc, RwLock as StdRwLock};

use async_stream::stream;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
@@ -27,7 +26,7 @@ use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::StringVector;
use serde::Serializer;
use table::engine::{CloseTableResult, EngineContext, TableEngine, TableReference};
use table::engine::{CloseTableResult, EngineContext, TableEngine};
use table::metadata::TableId;
use table::requests::{
AlterTableRequest, CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest,
@@ -167,7 +166,7 @@ impl KvBackend for MockKvBackend {

#[derive(Default)]
pub struct MockTableEngine {
tables: RwLock<HashMap<String, TableRef>>,
tables: StdRwLock<HashMap<TableId, TableRef>>,
}

#[async_trait::async_trait]
@@ -182,21 +181,8 @@ impl TableEngine for MockTableEngine {
_ctx: &EngineContext,
request: CreateTableRequest,
) -> table::Result<TableRef> {
let table_name = request.table_name.clone();
let catalog_name = request.catalog_name.clone();
let schema_name = request.schema_name.clone();
let table_full_name =
TableReference::full(&catalog_name, &schema_name, &table_name).to_string();
let table_id = request.id;

let default_table_id = "0".to_owned();
let table_id = TableId::from_str(
request
.table_options
.extra_options
.get("table_id")
.unwrap_or(&default_table_id),
)
.unwrap();
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
"name",
ConcreteDataType::string_datatype(),
@@ -206,16 +192,16 @@ impl TableEngine for MockTableEngine {
let data = vec![Arc::new(StringVector::from(vec!["a", "b", "c"])) as _];
let record_batch = RecordBatch::new(schema, data).unwrap();
let table: TableRef = Arc::new(MemTable::new_with_catalog(
&table_name,
&request.table_name,
record_batch,
table_id,
catalog_name,
schema_name,
request.catalog_name,
request.schema_name,
vec![0],
)) as Arc<_>;

let mut tables = self.tables.write().await;
tables.insert(table_full_name, table.clone() as TableRef);
let mut tables = self.tables.write().unwrap();
tables.insert(table_id, table.clone() as TableRef);
Ok(table)
}

@@ -224,7 +210,7 @@ impl TableEngine for MockTableEngine {
_ctx: &EngineContext,
request: OpenTableRequest,
) -> table::Result<Option<TableRef>> {
Ok(self.tables.read().await.get(&request.table_name).cloned())
Ok(self.tables.read().unwrap().get(&request.table_id).cloned())
}

async fn alter_table(
@@ -238,25 +224,13 @@ impl TableEngine for MockTableEngine {
fn get_table(
&self,
_ctx: &EngineContext,
table_ref: &TableReference,
table_id: TableId,
) -> table::Result<Option<TableRef>> {
futures::executor::block_on(async {
Ok(self
.tables
.read()
.await
.get(&table_ref.to_string())
.cloned())
})
Ok(self.tables.read().unwrap().get(&table_id).cloned())
}

fn table_exists(&self, _ctx: &EngineContext, table_ref: &TableReference) -> bool {
futures::executor::block_on(async {
self.tables
.read()
.await
.contains_key(&table_ref.to_string())
})
fn table_exists(&self, _ctx: &EngineContext, table_id: TableId) -> bool {
self.tables.read().unwrap().contains_key(&table_id)
}

async fn drop_table(
@@ -272,11 +246,7 @@ impl TableEngine for MockTableEngine {
_ctx: &EngineContext,
request: CloseTableRequest,
) -> table::Result<CloseTableResult> {
let _ = self
.tables
.write()
.await
.remove(&request.table_ref().to_string());
let _ = self.tables.write().unwrap().remove(&request.table_id);
Ok(CloseTableResult::Released(vec![]))
}

@@ -475,6 +475,7 @@ impl CountdownTask {
catalog_name: table_ident.catalog.clone(),
schema_name: table_ident.schema.clone(),
table_name: table_ident.table.clone(),
table_id: table_ident.table_id,
region_numbers: vec![region],
flush: true,
};
@@ -499,7 +500,7 @@ mod test {
use common_meta::heartbeat::mailbox::HeartbeatMailbox;
use datatypes::schema::RawSchema;
use table::engine::manager::MemoryTableEngineManager;
use table::engine::{TableEngine, TableReference};
use table::engine::TableEngine;
use table::requests::{CreateTableRequest, TableOptions};
use table::test_util::EmptyTable;

@@ -751,8 +752,9 @@ mod test {
let catalog = "my_catalog";
let schema = "my_schema";
let table = "my_table";
let table_id = 1;
let request = CreateTableRequest {
id: 1,
id: table_id,
catalog_name: catalog.to_string(),
schema_name: schema.to_string(),
table_name: table.to_string(),
@@ -768,7 +770,6 @@ mod test {
table_options: TableOptions::default(),
engine: "mito".to_string(),
};
let table_ref = TableReference::full(catalog, schema, table);

let table_engine = Arc::new(MockTableEngine::default());
table_engine.create_table(ctx, request).await.unwrap();
@@ -777,7 +778,7 @@ mod test {
catalog: catalog.to_string(),
schema: schema.to_string(),
table: table.to_string(),
table_id: 1024,
table_id,
engine: "mito".to_string(),
};
let (tx, rx) = mpsc::channel(10);
@@ -813,9 +814,9 @@ mod test {
.unwrap();

// assert the table is closed after deadline is reached
assert!(table_engine.table_exists(ctx, &table_ref));
assert!(table_engine.table_exists(ctx, table_id));
// spare 500ms for the task to close the table
tokio::time::sleep(Duration::from_millis(2000)).await;
assert!(!table_engine.table_exists(ctx, &table_ref));
assert!(!table_engine.table_exists(ctx, table_id));
}
}
@@ -24,6 +24,7 @@ datafusion.workspace = true
derive_builder = "0.12"
futures.workspace = true
object-store = { path = "../../object-store" }
orc-rust = "0.2.3"
regex = "1.7"
snafu.workspace = true
tokio.workspace = true

@@ -54,6 +54,12 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to build orc reader, source: {}", source))]
OrcReader {
location: Location,
source: orc_rust::error::Error,
},

#[snafu(display("Failed to read object from path: {}, source: {}", path, source))]
ReadObject {
path: String,
@@ -171,7 +177,8 @@ impl ErrorExt for Error {
| ReadRecordBatch { .. }
| WriteRecordBatch { .. }
| EncodeRecordBatch { .. }
| BufferedWriterClosed { .. } => StatusCode::Unexpected,
| BufferedWriterClosed { .. }
| OrcReader { .. } => StatusCode::Unexpected,
}
}

@@ -182,6 +189,7 @@ impl ErrorExt for Error {
fn location_opt(&self) -> Option<common_error::snafu::Location> {
use Error::*;
match self {
OrcReader { location, .. } => Some(*location),
BuildBackend { location, .. } => Some(*location),
ReadObject { location, .. } => Some(*location),
ListObjects { location, .. } => Some(*location),

@@ -14,6 +14,7 @@

pub mod csv;
pub mod json;
pub mod orc;
pub mod parquet;
#[cfg(test)]
pub mod tests;
@@ -38,6 +39,7 @@ use snafu::ResultExt;

use self::csv::CsvFormat;
use self::json::JsonFormat;
use self::orc::OrcFormat;
use self::parquet::ParquetFormat;
use crate::buffered_writer::{DfRecordBatchEncoder, LazyBufferedWriter};
use crate::compression::CompressionType;
@@ -56,6 +58,7 @@ pub enum Format {
Csv(CsvFormat),
Json(JsonFormat),
Parquet(ParquetFormat),
Orc(OrcFormat),
}

impl Format {
@@ -64,6 +67,7 @@ impl Format {
Format::Csv(_) => ".csv",
Format::Json(_) => ".json",
Format::Parquet(_) => ".parquet",
&Format::Orc(_) => ".orc",
}
}
}
@@ -81,6 +85,7 @@ impl TryFrom<&HashMap<String, String>> for Format {
"CSV" => Ok(Self::Csv(CsvFormat::try_from(options)?)),
"JSON" => Ok(Self::Json(JsonFormat::try_from(options)?)),
"PARQUET" => Ok(Self::Parquet(ParquetFormat::default())),
"ORC" => Ok(Self::Orc(OrcFormat)),
_ => error::UnsupportedFormatSnafu { format: &format }.fail(),
}
}
src/common/datasource/src/file_format/orc.rs (new file, 102 lines)
@@ -0,0 +1,102 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::pin::Pin;
use std::task::{Context, Poll};

use arrow_schema::{Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::arrow::record_batch::RecordBatch as DfRecordBatch;
use datafusion::error::{DataFusionError, Result as DfResult};
use datafusion::physical_plan::RecordBatchStream;
use futures::Stream;
use object_store::ObjectStore;
use orc_rust::arrow_reader::{create_arrow_schema, Cursor};
use orc_rust::async_arrow_reader::ArrowStreamReader;
pub use orc_rust::error::Error as OrcError;
use orc_rust::reader::Reader;
use snafu::ResultExt;
use tokio::io::{AsyncRead, AsyncSeek};

use crate::error::{self, Result};
use crate::file_format::FileFormat;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct OrcFormat;

pub async fn new_orc_cursor<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
    reader: R,
) -> Result<Cursor<R>> {
    let reader = Reader::new_async(reader)
        .await
        .context(error::OrcReaderSnafu)?;
    let cursor = Cursor::root(reader).context(error::OrcReaderSnafu)?;
    Ok(cursor)
}

pub async fn new_orc_stream_reader<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
    reader: R,
) -> Result<ArrowStreamReader<R>> {
    let cursor = new_orc_cursor(reader).await?;
    Ok(ArrowStreamReader::new(cursor, None))
}

pub async fn infer_orc_schema<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
    reader: R,
) -> Result<Schema> {
    let cursor = new_orc_cursor(reader).await?;
    Ok(create_arrow_schema(&cursor))
}

pub struct OrcArrowStreamReaderAdapter<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> {
    stream: ArrowStreamReader<T>,
}

impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> OrcArrowStreamReaderAdapter<T> {
    pub fn new(stream: ArrowStreamReader<T>) -> Self {
        Self { stream }
    }
}

impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> RecordBatchStream
    for OrcArrowStreamReaderAdapter<T>
{
    fn schema(&self) -> SchemaRef {
        self.stream.schema()
    }
}

impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> Stream for OrcArrowStreamReaderAdapter<T> {
    type Item = DfResult<DfRecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let batch = futures::ready!(Pin::new(&mut self.stream).poll_next(cx))
            .map(|r| r.map_err(|e| DataFusionError::External(Box::new(e))));
        Poll::Ready(batch)
    }
}

#[async_trait]
impl FileFormat for OrcFormat {
    async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
        let reader = store
            .reader(path)
            .await
            .context(error::ReadObjectSnafu { path })?;

        let schema = infer_orc_schema(reader).await?;

        Ok(schema)
    }
}
src/common/datasource/tests/orc/README.md (new file, 11 lines)
@@ -0,0 +1,11 @@
## Generate orc data

```bash
python3 -m venv venv
venv/bin/pip install -U pip
venv/bin/pip install -U pyorc

./venv/bin/python write.py

cargo test
```
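The helpers added in `src/common/datasource/src/file_format/orc.rs` can be pointed at the generated `test.orc` directly. The following is a minimal sketch, not part of this change set: it assumes a scratch binary with `common-datasource`, `tokio`, and `futures` as dependencies, and uses `tokio::fs::File`, which satisfies the `AsyncRead + AsyncSeek + Unpin + Send` bound the helpers require.

```rust
use common_datasource::file_format::orc::{infer_orc_schema, new_orc_stream_reader};
use futures::StreamExt;
use tokio::fs::File;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Path is an assumption; point it at the file produced by write.py.
    let path = "src/common/datasource/tests/orc/test.orc";

    // Infer the Arrow schema from the ORC footer.
    let file = File::open(path).await?;
    let schema = infer_orc_schema(file).await?;
    println!("inferred schema: {schema:?}");

    // Re-open the file and stream Arrow record batches out of it.
    let file = File::open(path).await?;
    let mut stream = new_orc_stream_reader(file).await?;
    while let Some(batch) = stream.next().await {
        println!("batch with {} rows", batch?.num_rows());
    }
    Ok(())
}
```

The crate's own test module (run via `cargo test`, as the README above suggests) presumably exercises the same readers against this file.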
src/common/datasource/tests/orc/test.orc (new binary file, not shown)
src/common/datasource/tests/orc/write.py (new file, 103 lines)
@@ -0,0 +1,103 @@
# Copyright 2023 Greptime Team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import random
import datetime
import pyorc

data = {
    "double_a": [1.0, 2.0, 3.0, 4.0, 5.0],
    "a": [1.0, 2.0, None, 4.0, 5.0],
    "b": [True, False, None, True, False],
    "str_direct": ["a", "cccccc", None, "ddd", "ee"],
    "d": ["a", "bb", None, "ccc", "ddd"],
    "e": ["ddd", "cc", None, "bb", "a"],
    "f": ["aaaaa", "bbbbb", None, "ccccc", "ddddd"],
    "int_short_repeated": [5, 5, None, 5, 5],
    "int_neg_short_repeated": [-5, -5, None, -5, -5],
    "int_delta": [1, 2, None, 4, 5],
    "int_neg_delta": [5, 4, None, 2, 1],
    "int_direct": [1, 6, None, 3, 2],
    "int_neg_direct": [-1, -6, None, -3, -2],
    "bigint_direct": [1, 6, None, 3, 2],
    "bigint_neg_direct": [-1, -6, None, -3, -2],
    "bigint_other": [5, -5, 1, 5, 5],
    "utf8_increase": ["a", "bb", "ccc", "dddd", "eeeee"],
    "utf8_decrease": ["eeeee", "dddd", "ccc", "bb", "a"],
    "timestamp_simple": [datetime.datetime(2023, 4, 1, 20, 15, 30, 2000), datetime.datetime.fromtimestamp(int('1629617204525777000')/1000000000), datetime.datetime(2023, 1, 1), datetime.datetime(2023, 2, 1), datetime.datetime(2023, 3, 1)],
    "date_simple": [datetime.date(2023, 4, 1), datetime.date(2023, 3, 1), datetime.date(2023, 1, 1), datetime.date(2023, 2, 1), datetime.date(2023, 3, 1)]
}


def infer_schema(data):
    schema = "struct<"
    for key, value in data.items():
        dt = type(value[0])
        if dt == float:
            dt = "float"
        elif dt == int:
            dt = "int"
        elif dt == bool:
            dt = "boolean"
        elif dt == str:
            dt = "string"
        elif key.startswith("timestamp"):
            dt = "timestamp"
        elif key.startswith("date"):
            dt = "date"
        else:
            print(key,value,dt)
            raise NotImplementedError
        if key.startswith("double"):
            dt = "double"
        if key.startswith("bigint"):
            dt = "bigint"
        schema += key + ":" + dt + ","

    schema = schema[:-1] + ">"
    return schema


def _write(
    schema: str,
    data,
    file_name: str,
    compression=pyorc.CompressionKind.NONE,
    dict_key_size_threshold=0.0,
):
    output = open(file_name, "wb")
    writer = pyorc.Writer(
        output,
        schema,
        dict_key_size_threshold=dict_key_size_threshold,
        # use a small number to ensure that compression crosses value boundaries
        compression_block_size=32,
        compression=compression,
    )
    num_rows = len(list(data.values())[0])
    for x in range(num_rows):
        row = tuple(values[x] for values in data.values())
        writer.write(row)
    writer.close()

    with open(file_name, "rb") as f:
        reader = pyorc.Reader(f)
        list(reader)


_write(
    infer_schema(data),
    data,
    "test.orc",
)
@@ -34,7 +34,7 @@ const LOCATION_TYPE_FIRST: i32 = LocationType::First as i32;
const LOCATION_TYPE_AFTER: i32 = LocationType::After as i32;

/// Convert an [`AlterExpr`] to an [`AlterTableRequest`]
pub fn alter_expr_to_request(expr: AlterExpr) -> Result<AlterTableRequest> {
pub fn alter_expr_to_request(table_id: TableId, expr: AlterExpr) -> Result<AlterTableRequest> {
let catalog_name = expr.catalog_name;
let schema_name = expr.schema_name;
let kind = expr.kind.context(MissingFieldSnafu { field: "kind" })?;
@@ -69,6 +69,7 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result<AlterTableRequest> {
catalog_name,
schema_name,
table_name: expr.table_name,
table_id,
alter_kind,
};
Ok(request)
@@ -82,6 +83,7 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result<AlterTableRequest> {
catalog_name,
schema_name,
table_name: expr.table_name,
table_id,
alter_kind,
};
Ok(request)
@@ -92,6 +94,7 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result<AlterTableRequest> {
catalog_name,
schema_name,
table_name: expr.table_name,
table_id,
alter_kind,
};
Ok(request)
@@ -239,7 +242,7 @@ mod tests {
})),
};

let alter_request = alter_expr_to_request(expr).unwrap();
let alter_request = alter_expr_to_request(1, expr).unwrap();
assert_eq!(alter_request.catalog_name, "");
assert_eq!(alter_request.schema_name, "");
assert_eq!("monitor".to_string(), alter_request.table_name);
@@ -296,7 +299,7 @@ mod tests {
})),
};

let alter_request = alter_expr_to_request(expr).unwrap();
let alter_request = alter_expr_to_request(1, expr).unwrap();
assert_eq!(alter_request.catalog_name, "");
assert_eq!(alter_request.schema_name, "");
assert_eq!("monitor".to_string(), alter_request.table_name);
@@ -344,7 +347,7 @@ mod tests {
})),
};

let alter_request = alter_expr_to_request(expr).unwrap();
let alter_request = alter_expr_to_request(1, expr).unwrap();
assert_eq!(alter_request.catalog_name, "test_catalog");
assert_eq!(alter_request.schema_name, "test_schema");
assert_eq!("monitor".to_string(), alter_request.table_name);
@@ -22,6 +22,7 @@ use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
use datafusion::error::Result as DfResult;
pub use datafusion::execution::context::{SessionContext, TaskContext};
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
pub use datafusion::physical_plan::Partitioning;
use datafusion::physical_plan::Statistics;
use datatypes::schema::SchemaRef;
@@ -69,6 +70,10 @@ pub trait PhysicalPlan: Debug + Send + Sync {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream>;

fn metrics(&self) -> Option<MetricsSet> {
None
}
}

/// Adapt DataFusion's [`ExecutionPlan`](DfPhysicalPlan) to GreptimeDB's [`PhysicalPlan`].
@@ -76,11 +81,16 @@ pub trait PhysicalPlan: Debug + Send + Sync {
pub struct PhysicalPlanAdapter {
schema: SchemaRef,
df_plan: Arc<dyn DfPhysicalPlan>,
metric: ExecutionPlanMetricsSet,
}

impl PhysicalPlanAdapter {
pub fn new(schema: SchemaRef, df_plan: Arc<dyn DfPhysicalPlan>) -> Self {
Self { schema, df_plan }
Self {
schema,
df_plan,
metric: ExecutionPlanMetricsSet::new(),
}
}

pub fn df_plan(&self) -> Arc<dyn DfPhysicalPlan> {
@@ -127,15 +137,21 @@ impl PhysicalPlan for PhysicalPlanAdapter {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let baseline_metric = BaselineMetrics::new(&self.metric, partition);

let df_plan = self.df_plan.clone();
let stream = df_plan
.execute(partition, context)
.context(error::GeneralDataFusionSnafu)?;
let adapter = RecordBatchStreamAdapter::try_new(stream)
let adapter = RecordBatchStreamAdapter::try_new_with_metrics(stream, baseline_metric)
.context(error::ConvertDfRecordBatchStreamSnafu)?;

Ok(Box::pin(adapter))
}

fn metrics(&self) -> Option<MetricsSet> {
Some(self.metric.clone_inner())
}
}

#[derive(Debug)]
@@ -196,6 +212,10 @@ impl DfPhysicalPlan for DfPhysicalPlanAdapter {
fn statistics(&self) -> Statistics {
Statistics::default()
}

fn metrics(&self) -> Option<MetricsSet> {
self.0.metrics()
}
}

#[cfg(test)]
@@ -20,6 +20,7 @@ use std::task::{Context, Poll};
use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
use datafusion::error::Result as DfResult;
use datafusion::parquet::arrow::async_reader::{AsyncFileReader, ParquetRecordBatchStream};
use datafusion::physical_plan::metrics::BaselineMetrics;
use datafusion::physical_plan::RecordBatchStream as DfRecordBatchStream;
use datafusion_common::DataFusionError;
use datatypes::schema::{Schema, SchemaRef};
@@ -115,13 +116,31 @@ impl Stream for DfRecordBatchStreamAdapter {
pub struct RecordBatchStreamAdapter {
schema: SchemaRef,
stream: DfSendableRecordBatchStream,
metrics: Option<BaselineMetrics>,
}

impl RecordBatchStreamAdapter {
pub fn try_new(stream: DfSendableRecordBatchStream) -> Result<Self> {
let schema =
Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?);
Ok(Self { schema, stream })
Ok(Self {
schema,
stream,
metrics: None,
})
}

pub fn try_new_with_metrics(
stream: DfSendableRecordBatchStream,
metrics: BaselineMetrics,
) -> Result<Self> {
let schema =
Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?);
Ok(Self {
schema,
stream,
metrics: Some(metrics),
})
}
}

@@ -135,6 +154,12 @@ impl Stream for RecordBatchStreamAdapter {
type Item = Result<RecordBatch>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let timer = self
.metrics
.as_ref()
.map(|m| m.elapsed_compute().clone())
.unwrap_or_default();
let _guard = timer.timer();
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(df_record_batch)) => {
@@ -164,7 +164,7 @@ impl CloseRegionHandler {
}

if engine
.get_table(&ctx, table_ref)
.get_table(&ctx, region_ident.table_ident.table_id)
.with_context(|_| error::GetTableSnafu {
table_name: table_ref.to_string(),
})?
@@ -178,6 +178,7 @@ impl CloseRegionHandler {
schema_name: table_ref.schema.to_string(),
table_name: table_ref.table.to_string(),
region_numbers: region_numbers.clone(),
table_id: region_ident.table_ident.table_id,
flush: true,
},
)
@@ -106,7 +106,13 @@ impl Instance {
let name = alter_table.table_name().clone();
let (catalog, schema, table) = table_idents_to_full_name(&name, query_ctx.clone())?;
let table_ref = TableReference::full(&catalog, &schema, &table);
let req = SqlHandler::alter_to_request(alter_table, table_ref)?;
// Currently, we have to get the table multiple times. Consider remove the sql handler in the future.
let table = self.sql_handler.get_table(&table_ref).await?;
let req = SqlHandler::alter_to_request(
alter_table,
table_ref,
table.table_info().ident.table_id,
)?;
self.sql_handler
.execute(SqlRequest::Alter(req), query_ctx)
.await
@@ -114,10 +120,13 @@ impl Instance {
Statement::DropTable(drop_table) => {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(drop_table.table_name(), query_ctx.clone())?;
let table_ref = TableReference::full(&catalog_name, &schema_name, &table_name);
let table = self.sql_handler.get_table(&table_ref).await?;
let req = DropTableRequest {
catalog_name,
schema_name,
table_name,
table_id: table.table_info().ident.table_id,
};
self.sql_handler
.execute(SqlRequest::DropTable(req), query_ctx)
@@ -14,6 +14,7 @@

use api::v1::{AlterExpr, CreateTableExpr, DropTableExpr, FlushTableExpr};
use common_catalog::consts::IMMUTABLE_FILE_ENGINE;
use common_catalog::format_full_table_name;
use common_grpc_expr::{alter_expr_to_request, create_expr_to_request};
use common_query::Output;
use common_telemetry::info;
@@ -22,8 +23,8 @@ use snafu::prelude::*;
use table::requests::{DropTableRequest, FlushTableRequest};

use crate::error::{
AlterExprToRequestSnafu, BumpTableIdSnafu, CreateExprToRequestSnafu,
IncorrectInternalStateSnafu, Result,
AlterExprToRequestSnafu, BumpTableIdSnafu, CatalogSnafu, CreateExprToRequestSnafu,
IncorrectInternalStateSnafu, Result, TableNotFoundSnafu,
};
use crate::instance::Instance;
use crate::sql::SqlRequest;
@@ -69,17 +70,45 @@ impl Instance {
}

pub(crate) async fn handle_alter(&self, expr: AlterExpr) -> Result<Output> {
let request = alter_expr_to_request(expr).context(AlterExprToRequestSnafu)?;
let table = self
.catalog_manager
.table(&expr.catalog_name, &expr.schema_name, &expr.table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: format_full_table_name(
&expr.catalog_name,
&expr.schema_name,
&expr.table_name,
),
})?;

let request = alter_expr_to_request(table.table_info().ident.table_id, expr)
.context(AlterExprToRequestSnafu)?;
self.sql_handler()
.execute(SqlRequest::Alter(request), QueryContext::arc())
.await
}

pub(crate) async fn handle_drop_table(&self, expr: DropTableExpr) -> Result<Output> {
let table = self
.catalog_manager
.table(&expr.catalog_name, &expr.schema_name, &expr.table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: format_full_table_name(
&expr.catalog_name,
&expr.schema_name,
&expr.table_name,
),
})?;

let req = DropTableRequest {
catalog_name: expr.catalog_name,
schema_name: expr.schema_name,
table_name: expr.table_name,
table_id: table.table_info().ident.table_id,
};
self.sql_handler()
.execute(SqlRequest::DropTable(req), QueryContext::arc())
@@ -19,6 +19,7 @@ use snafu::prelude::*;
use sql::statements::alter::{AlterTable, AlterTableOperation};
use sql::statements::column_def_to_schema;
use table::engine::TableReference;
use table::metadata::TableId;
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest};
use table_procedure::AlterTableProcedure;

@@ -60,6 +61,7 @@ impl SqlHandler {
pub(crate) fn alter_to_request(
alter_table: AlterTable,
table_ref: TableReference,
table_id: TableId,
) -> Result<AlterTableRequest> {
let alter_kind = match &alter_table.alter_operation() {
AlterTableOperation::AddConstraint(table_constraint) => {
@@ -91,6 +93,7 @@ impl SqlHandler {
catalog_name: table_ref.catalog.to_string(),
schema_name: table_ref.schema.to_string(),
table_name: table_ref.table.to_string(),
table_id,
alter_kind,
})
}
@@ -128,6 +131,7 @@ mod tests {
let req = SqlHandler::alter_to_request(
alter_table,
TableReference::full("greptime", "public", "my_metric_1"),
1,
)
.unwrap();
assert_eq!(req.catalog_name, "greptime");
@@ -154,6 +158,7 @@ mod tests {
let req = SqlHandler::alter_to_request(
alter_table,
TableReference::full("greptime", "public", "test_table"),
1,
)
.unwrap();
assert_eq!(req.catalog_name, "greptime");
@@ -25,7 +25,7 @@ use object_store::ObjectStore;
use snafu::ResultExt;
use table::engine::{table_dir, EngineContext, TableEngine, TableEngineProcedure, TableReference};
use table::error::TableOperationSnafu;
use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder, TableType};
use table::metadata::{TableId, TableInfo, TableInfoBuilder, TableMetaBuilder, TableType};
use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest};
use table::{error as table_error, Result as TableResult, Table, TableRef};
use tokio::sync::Mutex;
@@ -88,16 +88,12 @@ impl TableEngine for ImmutableFileTableEngine {
.fail()
}

fn get_table(
&self,
_ctx: &EngineContext,
table_ref: &TableReference,
) -> TableResult<Option<TableRef>> {
Ok(self.inner.get_table(table_ref))
fn get_table(&self, _ctx: &EngineContext, table_id: TableId) -> TableResult<Option<TableRef>> {
Ok(self.inner.get_table(table_id))
}

fn table_exists(&self, _ctx: &EngineContext, table_ref: &TableReference) -> bool {
self.inner.get_table(table_ref).is_some()
fn table_exists(&self, _ctx: &EngineContext, table_id: TableId) -> bool {
self.inner.get_table(table_id).is_some()
}

async fn drop_table(
@@ -151,8 +147,8 @@ impl TableEngineProcedure for ImmutableFileTableEngine {

#[cfg(test)]
impl ImmutableFileTableEngine {
pub async fn close_table(&self, table_ref: &TableReference<'_>) -> TableResult<()> {
self.inner.close_table(table_ref).await
pub async fn close_table(&self, table_id: TableId) -> TableResult<()> {
self.inner.close_table(table_id).await
}
}

@@ -173,10 +169,10 @@ impl ImmutableFileTableEngine {
}

struct EngineInner {
/// All tables opened by the engine. Map key is formatted [TableReference].
/// All tables opened by the engine.
///
/// Writing to `tables` should also hold the `table_mutex`.
tables: RwLock<HashMap<String, ImmutableFileTableRef>>,
tables: RwLock<HashMap<TableId, ImmutableFileTableRef>>,
object_store: ObjectStore,

/// Table mutex is used to protect the operations such as creating/opening/closing
@@ -199,6 +195,7 @@ impl EngineInner {
request: CreateTableRequest,
) -> Result<TableRef> {
let CreateTableRequest {
id: table_id,
catalog_name,
schema_name,
table_name,
@@ -212,7 +209,7 @@ impl EngineInner {
table: &table_name,
};

if let Some(table) = self.get_table(&table_ref) {
if let Some(table) = self.get_table(table_id) {
return if create_if_not_exists {
Ok(table)
} else {
@@ -223,14 +220,13 @@ impl EngineInner {
let table_schema =
Arc::new(Schema::try_from(request.schema).context(InvalidRawSchemaSnafu)?);

let table_id = request.id;
let table_dir = table_dir(&catalog_name, &schema_name, table_id);

let table_full_name = table_ref.to_string();

let _lock = self.table_mutex.lock().await;
// Checks again, read lock should be enough since we are guarded by the mutex.
if let Some(table) = self.get_table_by_full_name(&table_full_name) {
if let Some(table) = self.get_table(table_id) {
return if request.create_if_not_exists {
Ok(table)
} else {
@@ -279,27 +275,20 @@ impl EngineInner {
table_id
);

self.tables
.write()
.unwrap()
.insert(table_full_name, table.clone());
self.tables.write().unwrap().insert(table_id, table.clone());

Ok(table)
}

fn get_table_by_full_name(&self, full_name: &str) -> Option<TableRef> {
fn get_table(&self, table_id: TableId) -> Option<TableRef> {
self.tables
.read()
.unwrap()
.get(full_name)
.get(&table_id)
.cloned()
.map(|table| table as _)
}

fn get_table(&self, table_ref: &TableReference) -> Option<TableRef> {
self.get_table_by_full_name(&table_ref.to_string())
}

async fn open_table(
&self,
_ctx: &EngineContext,
@@ -309,6 +298,7 @@ impl EngineInner {
catalog_name,
schema_name,
table_name,
table_id,
..
} = request;
let table_ref = TableReference {
@@ -317,16 +307,15 @@ impl EngineInner {
table: &table_name,
};

let table_full_name = table_ref.to_string();

if let Some(table) = self.get_table_by_full_name(&table_full_name) {
if let Some(table) = self.get_table(table_id) {
return Ok(Some(table));
}

let table_full_name = table_ref.to_string();
let table = {
let _lock = self.table_mutex.lock().await;
// Checks again, read lock should be enough since we are guarded by the mutex.
if let Some(table) = self.get_table_by_full_name(&table_full_name) {
if let Some(table) = self.get_table(table_id) {
return Ok(Some(table));
}

@@ -350,10 +339,7 @@ impl EngineInner {
.context(table_error::TableOperationSnafu)?,
);

self.tables
.write()
.unwrap()
.insert(table_full_name, table.clone());
self.tables.write().unwrap().insert(table_id, table.clone());
Some(table as _)
};

@@ -375,7 +361,7 @@ impl EngineInner {

let table_full_name = table_ref.to_string();
let _lock = self.table_mutex.lock().await;
if let Some(table) = self.get_table_by_full_name(&table_full_name) {
if let Some(table) = self.get_table(req.table_id) {
let table_id = table.table_info().ident.table_id;
let table_dir = table_dir(&req.catalog_name, &req.schema_name, table_id);

@@ -389,7 +375,7 @@ impl EngineInner {
.context(DropTableSnafu {
table_name: &table_full_name,
})?;
self.tables.write().unwrap().remove(&table_full_name);
self.tables.write().unwrap().remove(&req.table_id);

Ok(true)
} else {
@@ -429,12 +415,10 @@ impl EngineInner {

#[cfg(test)]
impl EngineInner {
pub async fn close_table(&self, table_ref: &TableReference<'_>) -> TableResult<()> {
let full_name = table_ref.to_string();

pub async fn close_table(&self, table_id: TableId) -> TableResult<()> {
let _lock = self.table_mutex.lock().await;

if let Some(table) = self.get_table_by_full_name(&full_name) {
if let Some(table) = self.get_table(table_id) {
let regions = Vec::new();
table
.close(&regions)
@@ -443,7 +427,7 @@ impl EngineInner {
.context(table_error::TableOperationSnafu)?;
}

self.tables.write().unwrap().remove(&full_name);
self.tables.write().unwrap().remove(&table_id);

Ok(())
}
@@ -92,14 +92,13 @@ impl CreateImmutableFileTable {

fn on_prepare(&mut self) -> Result<Status> {
let engine_ctx = EngineContext::default();
let table_ref = self.data.table_ref();
// Safety: Current get_table implementation always returns Ok.
if self.engine.table_exists(&engine_ctx, &table_ref) {
if self.engine.table_exists(&engine_ctx, self.data.request.id) {
// The table already exists.
ensure!(
self.data.request.create_if_not_exists,
TableExistsSnafu {
table_name: table_ref.to_string(),
table_name: self.data.table_ref().to_string(),
}
);

@@ -113,8 +112,7 @@ impl CreateImmutableFileTable {

async fn on_create_table(&mut self) -> Result<Status> {
let engine_ctx = EngineContext::default();
let table_ref = self.data.table_ref();
if self.engine.table_exists(&engine_ctx, &table_ref) {
if self.engine.table_exists(&engine_ctx, self.data.request.id) {
// Table already created. We don't need to check create_if_not_exists as
// we have checked it in prepare state.
return Ok(Status::Done);
@@ -16,7 +16,7 @@ use std::assert_matches::assert_matches;
use std::sync::Arc;

use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, IMMUTABLE_FILE_ENGINE};
use table::engine::{EngineContext, TableEngine, TableEngineProcedure, TableReference};
use table::engine::{EngineContext, TableEngine, TableEngineProcedure};
use table::requests::{AlterKind, AlterTableRequest, DropTableRequest, OpenTableRequest};
use table::{error as table_error, Table};

@@ -35,14 +35,9 @@ async fn test_get_table() {
..
} = test_util::setup_test_engine_and_table("test_get_table").await;
let table_info = table.table_info();
let table_ref = TableReference {
catalog: &table_info.catalog_name,
schema: &table_info.schema_name,
table: &table_info.name,
};

let got = table_engine
.get_table(&EngineContext::default(), &table_ref)
.get_table(&EngineContext::default(), table_info.ident.table_id)
.unwrap()
.unwrap();

@@ -53,21 +48,17 @@ async fn test_get_table() {
async fn test_open_table() {
common_telemetry::init_default_ut_logging();
let ctx = EngineContext::default();
// the test table id is 1
let table_id = 1;
let open_req = OpenTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: test_util::TEST_TABLE_NAME.to_string(),
// the test table id is 1
table_id: 1,
table_id,
region_numbers: vec![0],
};

let table_ref = TableReference {
catalog: DEFAULT_CATALOG_NAME,
schema: DEFAULT_SCHEMA_NAME,
table: test_util::TEST_TABLE_NAME,
};

let TestEngineComponents {
table_engine,
table_ref: table,
@@ -77,7 +68,7 @@ async fn test_open_table() {

assert_eq!(IMMUTABLE_FILE_ENGINE, table_engine.name());

table_engine.close_table(&table_ref).await.unwrap();
table_engine.close_table(table_id).await.unwrap();

let reopened = table_engine
.open_table(&ctx, open_req.clone())
@@ -101,21 +92,17 @@ async fn test_open_table() {
async fn test_close_all_table() {
common_telemetry::init_default_ut_logging();

let table_ref = TableReference {
catalog: DEFAULT_CATALOG_NAME,
schema: DEFAULT_SCHEMA_NAME,
table: test_util::TEST_TABLE_NAME,
};

let TestEngineComponents {
table_engine,
dir: _dir,
table_ref: table,
..
} = test_util::setup_test_engine_and_table("test_close_all_table").await;

table_engine.close().await.unwrap();

let exist = table_engine.table_exists(&EngineContext::default(), &table_ref);
let table_id = table.table_info().ident.table_id;
let exist = table_engine.table_exists(&EngineContext::default(), table_id);

assert!(!exist);
}
@@ -126,6 +113,7 @@ async fn test_alter_table() {
let TestEngineComponents {
table_engine,
dir: _dir,
table_ref,
..
} = test_util::setup_test_engine_and_table("test_alter_table").await;

@@ -133,6 +121,7 @@ async fn test_alter_table() {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TEST_TABLE_NAME.to_string(),
table_id: table_ref.table_info().ident.table_id,
alter_kind: AlterKind::RenameTable {
new_table_name: "foo".to_string(),
},
@@ -151,12 +140,6 @@ async fn test_alter_table() {
async fn test_drop_table() {
common_telemetry::init_default_ut_logging();

let drop_req = DropTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TEST_TABLE_NAME.to_string(),
};

let TestEngineComponents {
table_engine,
object_store,
@@ -167,12 +150,13 @@ async fn test_drop_table() {
} = test_util::setup_test_engine_and_table("test_drop_table").await;

let table_info = table.table_info();
let table_ref = TableReference {
catalog: &table_info.catalog_name,
schema: &table_info.schema_name,
table: &table_info.name,
};

let drop_req = DropTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TEST_TABLE_NAME.to_string(),
table_id: table_info.ident.table_id,
};
let dropped = table_engine
.drop_table(&EngineContext::default(), drop_req)
.await
@@ -180,7 +164,7 @@ async fn test_drop_table() {

assert!(dropped);

let exist = table_engine.table_exists(&EngineContext::default(), &table_ref);
let exist = table_engine.table_exists(&EngineContext::default(), table_info.ident.table_id);
assert!(!exist);

// check table_dir manifest
@@ -203,13 +187,14 @@ async fn test_create_drop_table_procedure() {
let engine_ctx = EngineContext::default();
// Test create table by procedure.
let create_request = test_util::new_create_request(schema);
let table_id = create_request.id;
let mut procedure = table_engine
.create_table_procedure(&engine_ctx, create_request.clone())
.unwrap();
common_procedure_test::execute_procedure_until_done(&mut procedure).await;

assert!(table_engine
.get_table(&engine_ctx, &create_request.table_ref())
.get_table(&engine_ctx, table_id)
.unwrap()
.is_some());

@@ -218,6 +203,7 @@ async fn test_create_drop_table_procedure() {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TEST_TABLE_NAME.to_string(),
table_id,
};
let mut procedure = table_engine
.drop_table_procedure(&engine_ctx, drop_request)
@@ -225,7 +211,7 @@ async fn test_create_drop_table_procedure() {
common_procedure_test::execute_procedure_until_done(&mut procedure).await;

assert!(table_engine
.get_table(&engine_ctx, &create_request.table_ref())
.get_table(&engine_ctx, table_id)
.unwrap()
.is_none());
}
@@ -14,6 +14,7 @@

use std::any::Any;

use common_datasource::file_format::Format;
use common_error::prelude::*;
use datafusion::arrow::error::ArrowError;
use datafusion::error::DataFusionError;
@@ -175,6 +176,9 @@ pub enum Error {
source: datatypes::error::Error,
location: Location,
},

#[snafu(display("Unsupported format: {:?}", format))]
UnsupportedFormat { format: Format, location: Location },
}

pub type Result<T> = std::result::Result<T, Error>;
@@ -191,7 +195,8 @@ impl ErrorExt for Error {
| BuildCsvConfig { .. }
| ProjectSchema { .. }
| MissingRequiredField { .. }
| ConvertSchema { .. } => StatusCode::InvalidArguments,
| ConvertSchema { .. }
| UnsupportedFormat { .. } => StatusCode::InvalidArguments,

BuildBackend { source, .. } => source.status_code(),
BuildStreamAdapter { source, .. } => source.status_code(),

@@ -94,29 +94,8 @@ fn build_scan_plan<T: FileOpener + Send + 'static>(
projection: Option<&Vec<usize>>,
limit: Option<usize>,
) -> Result<PhysicalPlanRef> {
let stream = FileStream::new(
&FileScanConfig {
object_store_url: ObjectStoreUrl::parse("empty://").unwrap(), // won't be used
file_schema,
file_groups: vec![files
.iter()
.map(|filename| PartitionedFile::new(filename.to_string(), 0))
.collect::<Vec<_>>()],
statistics: Default::default(),
projection: projection.cloned(),
limit,
table_partition_cols: vec![],
output_ordering: None,
infinite_source: false,
},
0, // partition: hard-code
opener,
&ExecutionPlanMetricsSet::new(),
)
.context(error::BuildStreamSnafu)?;
let adapter = RecordBatchStreamAdapter::try_new(Box::pin(stream))
.context(error::BuildStreamAdapterSnafu)?;
Ok(Arc::new(StreamScanAdapter::new(Box::pin(adapter))))
let adapter = build_record_batch_stream(opener, file_schema, files, projection, limit)?;
Ok(Arc::new(StreamScanAdapter::new(adapter)))
}

fn build_record_batch_stream<T: FileOpener + Send + 'static>(
@@ -382,6 +361,7 @@ pub fn create_physical_plan(
Format::Csv(format) => new_csv_scan_plan(ctx, config, format),
Format::Json(format) => new_json_scan_plan(ctx, config, format),
Format::Parquet(format) => new_parquet_scan_plan(ctx, config, format),
_ => error::UnsupportedFormatSnafu { format: *format }.fail(),
}
}

@@ -394,5 +374,6 @@ pub fn create_stream(
Format::Csv(format) => new_csv_stream(ctx, config, format),
Format::Json(format) => new_json_stream(ctx, config, format),
Format::Parquet(format) => new_parquet_stream(ctx, config, format),
_ => error::UnsupportedFormatSnafu { format: *format }.fail(),
}
}
@@ -14,6 +14,7 @@

use std::any::Any;

use common_datasource::file_format::Format;
use common_error::prelude::*;
use datafusion::parquet;
use datatypes::arrow::error::ArrowError;
@@ -443,6 +444,9 @@ pub enum Error {
source: common_datasource::error::Error,
},

#[snafu(display("Unsupported format: {:?}", format))]
UnsupportedFormat { location: Location, format: Format },

#[snafu(display("Failed to parse file format, source: {}", source))]
ParseFileFormat {
#[snafu(backtrace)]
@@ -500,6 +504,12 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to read orc schema, source: {}", source))]
ReadOrc {
source: common_datasource::error::Error,
location: Location,
},

#[snafu(display("Failed to build parquet record batch stream, source: {}", source))]
BuildParquetRecordBatchStream {
location: Location,
@@ -575,7 +585,8 @@ impl ErrorExt for Error {
| Error::InvalidSchema { .. }
| Error::PrepareImmutableTable { .. }
| Error::BuildCsvConfig { .. }
| Error::ProjectSchema { .. } => StatusCode::InvalidArguments,
| Error::ProjectSchema { .. }
| Error::UnsupportedFormat { .. } => StatusCode::InvalidArguments,

Error::NotSupported { .. } => StatusCode::Unsupported,

@@ -667,7 +678,9 @@ impl ErrorExt for Error {

Error::TableScanExec { source, .. } => source.status_code(),

Error::ReadObject { .. } | Error::ReadParquet { .. } => StatusCode::StorageUnavailable,
Error::ReadObject { .. } | Error::ReadParquet { .. } | Error::ReadOrc { .. } => {
StatusCode::StorageUnavailable
}

Error::ListObjects { source }
| Error::ParseUrl { source }
@@ -543,8 +543,11 @@ impl DistInstance {
|
||||
table_name: format_full_table_name(catalog_name, schema_name, table_name),
|
||||
})?;
|
||||
|
||||
let request = common_grpc_expr::alter_expr_to_request(expr.clone())
|
||||
.context(AlterExprToRequestSnafu)?;
|
||||
let request = common_grpc_expr::alter_expr_to_request(
|
||||
table.table_info().ident.table_id,
|
||||
expr.clone(),
|
||||
)
|
||||
.context(AlterExprToRequestSnafu)?;
|
||||
|
||||
let mut context = AlterContext::with_capacity(1);
|
||||
|
||||
|
||||
@@ -20,6 +20,9 @@ use async_compat::CompatExt;
use common_base::readable_size::ReadableSize;
use common_datasource::file_format::csv::{CsvConfigBuilder, CsvOpener};
use common_datasource::file_format::json::JsonOpener;
use common_datasource::file_format::orc::{
infer_orc_schema, new_orc_stream_reader, OrcArrowStreamReaderAdapter,
};
use common_datasource::file_format::{FileFormat, Format};
use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::{build_backend, parse_url};
@@ -110,6 +113,18 @@ impl StatementExecutor {
.context(error::ReadParquetSnafu)?;
Ok(builder.schema().clone())
}
Format::Orc(_) => {
let reader = object_store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?;

let schema = infer_orc_schema(reader)
.await
.context(error::ReadOrcSnafu)?;

Ok(Arc::new(schema))
}
}
}

@@ -201,6 +216,18 @@ impl StatementExecutor {

Ok(Box::pin(ParquetRecordBatchStreamAdapter::new(upstream)))
}
Format::Orc(_) => {
let reader = object_store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?;
let stream = new_orc_stream_reader(reader)
.await
.context(error::ReadOrcSnafu)?;
let stream = OrcArrowStreamReaderAdapter::new(stream);

Ok(Box::pin(stream))
}
}
}

@@ -68,6 +68,7 @@ impl StatementExecutor {

Ok(rows_copied)
}
_ => error::UnsupportedFormatSnafu { format: *format }.fail(),
}
}

@@ -382,6 +382,7 @@ impl DistTable {
schema_name,
table_name,
alter_kind,
table_id: _table_id,
} = request;

let alter_expr = context

@@ -42,8 +42,7 @@ use table::engine::{
};
use table::metadata::{TableId, TableInfo, TableVersion};
use table::requests::{
AlterKind, AlterTableRequest, CloseTableRequest, CreateTableRequest, DropTableRequest,
OpenTableRequest,
AlterTableRequest, CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest,
};
use table::{error as table_error, Result as TableResult, Table, TableRef};

@@ -102,9 +101,8 @@ impl<S: StorageEngine> TableEngine for MitoEngine<S> {
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;

let table_ref = request.table_ref();
let _lock = self.inner.table_mutex.lock(table_ref.to_string()).await;
if let Some(table) = self.inner.get_mito_table(&table_ref) {
let _lock = self.inner.table_mutex.lock(request.id).await;
if let Some(table) = self.inner.get_mito_table(request.id) {
if request.create_if_not_exists {
return Ok(table);
} else {
@@ -148,26 +146,10 @@ impl<S: StorageEngine> TableEngine for MitoEngine<S> {
) -> TableResult<TableRef> {
let _timer = common_telemetry::timer!(metrics::MITO_ALTER_TABLE_ELAPSED);

if let AlterKind::RenameTable { new_table_name } = &req.alter_kind {
let mut table_ref = req.table_ref();
table_ref.table = new_table_name;
if self.inner.get_mito_table(&table_ref).is_some() {
return TableExistsSnafu {
table_name: table_ref.to_string(),
}
.fail()
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
}
}

let mut procedure = AlterMitoTable::new(req, self.inner.clone())
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;

// TODO(yingwen): Rename has concurrent issue without the procedure runtime. But
// users can't use this method to alter a table so it is still safe. We should
// refactor the table engine to avoid using table name as key.
procedure
.engine_alter_table()
.await
@@ -175,16 +157,12 @@ impl<S: StorageEngine> TableEngine for MitoEngine<S> {
.context(table_error::TableOperationSnafu)
}

fn get_table(
&self,
_ctx: &EngineContext,
table_ref: &TableReference,
) -> TableResult<Option<TableRef>> {
Ok(self.inner.get_table(table_ref))
fn get_table(&self, _ctx: &EngineContext, table_id: TableId) -> TableResult<Option<TableRef>> {
Ok(self.inner.get_table(table_id))
}

fn table_exists(&self, _ctx: &EngineContext, table_ref: &TableReference) -> bool {
self.inner.get_table(table_ref).is_some()
fn table_exists(&self, _ctx: &EngineContext, table_id: TableId) -> bool {
self.inner.get_table(table_id).is_some()
}

async fn drop_table(
@@ -254,16 +232,16 @@ impl<S: StorageEngine> TableEngineProcedure for MitoEngine<S> {
}

pub(crate) struct MitoEngineInner<S: StorageEngine> {
/// All tables opened by the engine. Map key is formatted [TableReference].
/// All tables opened by the engine.
///
/// Writing to `tables` should also hold the `table_mutex`.
tables: DashMap<String, Arc<MitoTable<S::Region>>>,
tables: DashMap<TableId, Arc<MitoTable<S::Region>>>,
object_store: ObjectStore,
compress_type: CompressionType,
storage_engine: S,
/// Table mutex is used to protect the operations such as creating/opening/closing
/// a table, to avoid things like opening the same table simultaneously.
table_mutex: Arc<KeyLock<String>>,
table_mutex: Arc<KeyLock<TableId>>,
}

fn build_row_key_desc(
@@ -429,11 +407,6 @@ impl<S: StorageEngine> MitoEngineInner<S> {
let catalog_name = &request.catalog_name;
let schema_name = &request.schema_name;
let table_name = &request.table_name;
let table_ref = TableReference {
catalog: catalog_name,
schema: schema_name,
table: table_name,
};

let table_id = request.table_id;
let engine_ctx = StorageEngineContext::default();
@@ -463,6 +436,11 @@ impl<S: StorageEngine> MitoEngineInner<S> {

let mut regions = HashMap::with_capacity(table_info.meta.region_numbers.len());

let table_ref = TableReference {
catalog: catalog_name,
schema: schema_name,
table: table_name,
};
for region_number in &request.region_numbers {
let region = self
.open_region(&engine_ctx, table_id, *region_number, &table_ref, &opts)
@@ -556,16 +534,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
ctx: &EngineContext,
request: OpenTableRequest,
) -> TableResult<Option<TableRef>> {
let catalog_name = &request.catalog_name;
let schema_name = &request.schema_name;
let table_name = &request.table_name;
let table_ref = TableReference {
catalog: catalog_name,
schema: schema_name,
table: table_name,
};

if let Some(table) = self.get_table(&table_ref) {
if let Some(table) = self.get_table(request.table_id) {
if let Some(table) = self.check_regions(table, &request.region_numbers)? {
return Ok(Some(table));
}
@@ -573,11 +542,10 @@ impl<S: StorageEngine> MitoEngineInner<S> {

// Acquires the mutex before opening a new table.
let table = {
let table_name_key = table_ref.to_string();
let _lock = self.table_mutex.lock(table_name_key.clone()).await;
let _lock = self.table_mutex.lock(request.table_id).await;

// Checks again, read lock should be enough since we are guarded by the mutex.
if let Some(table) = self.get_mito_table(&table_ref) {
if let Some(table) = self.get_mito_table(request.table_id) {
// Contains all regions or target region
if let Some(table) = self.check_regions(table.clone(), &request.region_numbers)? {
Some(table)
@@ -593,7 +561,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
let table = self.recover_table(ctx, request.clone()).await?;
if let Some(table) = table {
// already locked
self.tables.insert(table_ref.to_string(), table.clone());
self.tables.insert(request.table_id, table.clone());

Some(table as _)
} else {
@@ -604,8 +572,8 @@ impl<S: StorageEngine> MitoEngineInner<S> {

logging::info!(
"Mito engine opened table: {} in schema: {}",
table_name,
schema_name
request.table_name,
request.schema_name
);

Ok(table)
@@ -613,10 +581,8 @@ impl<S: StorageEngine> MitoEngineInner<S> {

async fn drop_table(&self, request: DropTableRequest) -> TableResult<bool> {
// Remove the table from the engine to avoid further access from users.
let table_ref = request.table_ref();

let _lock = self.table_mutex.lock(table_ref.to_string()).await;
let removed_table = self.tables.remove(&table_ref.to_string());
let _lock = self.table_mutex.lock(request.table_id).await;
let removed_table = self.tables.remove(&request.table_id);
// Close the table to close all regions. Closing a region is idempotent.
if let Some((_, table)) = &removed_table {
let regions = table.region_ids();
@@ -663,17 +629,13 @@ impl<S: StorageEngine> MitoEngineInner<S> {
Ok(Some((manifest, table_info)))
}

fn get_table(&self, table_ref: &TableReference) -> Option<TableRef> {
self.tables
.get(&table_ref.to_string())
.map(|en| en.value().clone() as _)
fn get_table(&self, table_id: TableId) -> Option<TableRef> {
self.tables.get(&table_id).map(|en| en.value().clone() as _)
}

/// Returns the [MitoTable].
fn get_mito_table(&self, table_ref: &TableReference) -> Option<Arc<MitoTable<S::Region>>> {
self.tables
.get(&table_ref.to_string())
.map(|en| en.value().clone())
fn get_mito_table(&self, table_id: TableId) -> Option<Arc<MitoTable<S::Region>>> {
self.tables.get(&table_id).map(|en| en.value().clone())
}

async fn close(&self) -> TableResult<()> {
@@ -696,8 +658,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
}

async fn close_table(&self, request: CloseTableRequest) -> TableResult<CloseTableResult> {
let table_ref = request.table_ref();
if let Some(table) = self.get_mito_table(&table_ref) {
if let Some(table) = self.get_mito_table(request.table_id) {
return self
.close_table_inner(table, Some(&request.region_numbers), request.flush)
.await;
@@ -713,13 +674,8 @@ impl<S: StorageEngine> MitoEngineInner<S> {
flush: bool,
) -> TableResult<CloseTableResult> {
let info = table.table_info();
let table_ref = TableReference {
catalog: &info.catalog_name,
schema: &info.schema_name,
table: &info.name,
};
let table_id = info.ident.table_id;
let _lock = self.table_mutex.lock(table_ref.to_string()).await;
let _lock = self.table_mutex.lock(table_id).await;

let all_regions = table.region_ids();
let regions = regions.unwrap_or(&all_regions);
@@ -738,12 +694,12 @@ impl<S: StorageEngine> MitoEngineInner<S> {
}

if table.is_releasable() {
self.tables.remove(&table_ref.to_string());
self.tables.remove(&table_id);

logging::info!(
"Mito engine closed table: {} in schema: {}",
table_ref.table,
table_ref.schema,
info.name,
info.schema_name,
);
return Ok(CloseTableResult::Released(removed_regions));
}

@@ -29,9 +29,7 @@ use table::requests::{AlterKind, AlterTableRequest};
use table::{Table, TableRef};

use crate::engine::MitoEngineInner;
use crate::error::{
TableExistsSnafu, TableNotFoundSnafu, UpdateTableManifestSnafu, VersionChangedSnafu,
};
use crate::error::{TableNotFoundSnafu, UpdateTableManifestSnafu, VersionChangedSnafu};
use crate::manifest::action::{TableChange, TableMetaAction, TableMetaActionList};
use crate::metrics;
use crate::table::MitoTable;
@@ -39,7 +37,6 @@ use crate::table::MitoTable;
/// Procedure to alter a [MitoTable].
pub(crate) struct AlterMitoTable<S: StorageEngine> {
data: AlterTableData,
engine_inner: Arc<MitoEngineInner<S>>,
table: Arc<MitoTable<S::Region>>,
/// The table info after alteration.
new_info: Option<TableInfo>,
@@ -107,18 +104,16 @@ impl<S: StorageEngine> AlterMitoTable<S> {
table_version: 0,
};
let table_ref = data.table_ref();
let table =
engine_inner
.get_mito_table(&table_ref)
.with_context(|| TableNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
let table = engine_inner
.get_mito_table(data.request.table_id)
.with_context(|| TableNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
let info = table.table_info();
data.table_version = info.ident.version;

Ok(AlterMitoTable {
data,
engine_inner,
table,
new_info: None,
alter_op: None,
@@ -148,16 +143,14 @@ impl<S: StorageEngine> AlterMitoTable<S> {
fn from_json(json: &str, engine_inner: Arc<MitoEngineInner<S>>) -> Result<Self> {
let data: AlterTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
let table_ref = data.table_ref();
let table =
engine_inner
.get_mito_table(&table_ref)
.with_context(|| TableNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
let table = engine_inner
.get_mito_table(data.request.table_id)
.with_context(|| TableNotFoundSnafu {
table_name: table_ref.to_string(),
})?;

Ok(AlterMitoTable {
data,
engine_inner,
table,
new_info: None,
alter_op: None,
@@ -176,17 +169,8 @@ impl<S: StorageEngine> AlterMitoTable<S> {
}
);

if let AlterKind::RenameTable { new_table_name } = &self.data.request.alter_kind {
let mut table_ref = self.data.table_ref();
table_ref.table = new_table_name;
ensure!(
self.engine_inner.get_mito_table(&table_ref).is_none(),
TableExistsSnafu {
table_name: table_ref.to_string(),
}
);
}

// We don't check the table name in the table engine as it is the catalog
// manager's duty to ensure the table name is unused.
self.data.state = AlterTableState::EngineAlterTable;

Ok(Status::executing(true))
@@ -252,22 +236,6 @@ impl<S: StorageEngine> AlterMitoTable<S> {
// Update in memory metadata of the table.
self.table.set_table_info(new_info.clone());

// Rename key in tables map.
if let AlterKind::RenameTable { new_table_name } = &self.data.request.alter_kind {
let mut table_ref = self.data.table_ref();
let removed = {
let _lock = self.engine_inner.table_mutex.lock(table_ref.to_string());
self.engine_inner.tables.remove(&table_ref.to_string())
};
ensure!(removed.is_some(), TableNotFoundSnafu { table_name });

table_ref.table = new_table_name.as_str();
let _lock = self.engine_inner.table_mutex.lock(table_ref.to_string());
self.engine_inner
.tables
.insert(table_ref.to_string(), self.table.clone());
}

Ok(self.table.clone())
}

@@ -363,19 +331,15 @@ mod tests {
procedure_test_util::execute_procedure_until_done(&mut procedure).await;

// Get metadata of the created table.
let table_ref = TableReference {
catalog: &request.catalog_name,
schema: &request.schema_name,
table: &request.table_name,
};
let table = table_engine
.get_table(&engine_ctx, &table_ref)
.get_table(&engine_ctx, request.id)
.unwrap()
.unwrap();
let old_info = table.table_info();
let old_meta = &old_info.meta;

// Alter the table.
let table_id = request.id;
let request = new_add_columns_req();
let mut procedure = table_engine
.alter_table_procedure(&engine_ctx, request.clone())
@@ -384,7 +348,7 @@ mod tests {

// Validate.
let table = table_engine
.get_table(&engine_ctx, &table_ref)
.get_table(&engine_ctx, table_id)
.unwrap()
.unwrap();
let new_info = table.table_info();
@@ -405,7 +369,7 @@ mod tests {
ConcreteDataType::string_datatype(),
true,
);
let request = new_add_columns_req_with_location(&new_tag, &new_field);
let request = new_add_columns_req_with_location(table_id, &new_tag, &new_field);
let mut procedure = table_engine
.alter_table_procedure(&engine_ctx, request.clone())
.unwrap();
@@ -413,7 +377,7 @@ mod tests {

// Validate.
let table = table_engine
.get_table(&engine_ctx, &table_ref)
.get_table(&engine_ctx, table_id)
.unwrap()
.unwrap();
let new_info = table.table_info();
@@ -456,6 +420,7 @@ mod tests {
procedure_test_util::execute_procedure_until_done(&mut procedure).await;

// Add columns.
let table_id = request.id;
let request = new_add_columns_req();
let mut procedure = table_engine
.alter_table_procedure(&engine_ctx, request.clone())
@@ -463,13 +428,8 @@ mod tests {
procedure_test_util::execute_procedure_until_done(&mut procedure).await;

// Get metadata.
let table_ref = TableReference {
catalog: &request.catalog_name,
schema: &request.schema_name,
table: &request.table_name,
};
let table = table_engine
.get_table(&engine_ctx, &table_ref)
.get_table(&engine_ctx, table_id)
.unwrap()
.unwrap();
let old_info = table.table_info();
@@ -521,13 +481,8 @@ mod tests {
procedure_test_util::execute_procedure_until_done(&mut procedure).await;

// Get metadata of the created table.
let mut table_ref = TableReference {
catalog: &create_request.catalog_name,
schema: &create_request.schema_name,
table: &create_request.table_name,
};
let table = table_engine
.get_table(&engine_ctx, &table_ref)
.get_table(&engine_ctx, create_request.id)
.unwrap()
.unwrap();

@@ -546,12 +501,7 @@ mod tests {
let info = table.table_info();
assert_eq!(new_name, info.name);
assert!(table_engine
.get_table(&engine_ctx, &table_ref)
.unwrap()
.is_none());
table_ref.table = &new_name;
assert!(table_engine
.get_table(&engine_ctx, &table_ref)
.get_table(&engine_ctx, create_request.id)
.unwrap()
.is_some());
}

@@ -131,7 +131,12 @@ impl<S: StorageEngine> CreateMitoTable<S> {
let table_ref = self.creator.data.table_ref();
logging::debug!("on prepare create table {}", table_ref);

if self.creator.engine_inner.get_table(&table_ref).is_some() {
if self
.creator
.engine_inner
.get_table(self.creator.data.request.id)
.is_some()
{
// If the table already exists.
ensure!(
self.creator.data.request.create_if_not_exists,
@@ -152,14 +157,14 @@ impl<S: StorageEngine> CreateMitoTable<S> {
async fn on_engine_create_table(&mut self) -> Result<Status> {
// In this state, we can ensure we are able to create a new table.
let table_ref = self.creator.data.table_ref();
logging::debug!("on engine create table {}", table_ref);
let table_id = self.creator.data.request.id;
logging::debug!(
"on engine create table {}, table_id: {}",
table_ref,
table_id
);

let _lock = self
.creator
.engine_inner
.table_mutex
.lock(table_ref.to_string())
.await;
let _lock = self.creator.engine_inner.table_mutex.lock(table_id).await;
self.creator.create_table().await?;

Ok(Status::Done)
@@ -209,14 +214,13 @@ impl<S: StorageEngine> TableCreator<S> {
self.data.request.id,
);

let table_ref = self.data.table_ref();
// It is possible that the procedure retries `CREATE TABLE` many times, so we
// return the table if it exists.
if let Some(table) = self.engine_inner.get_table(&table_ref) {
if let Some(table) = self.engine_inner.get_table(self.data.request.id) {
return Ok(table.clone());
}

logging::debug!("Creator create table {}", table_ref);
logging::debug!("Creator create table {}", self.data.table_ref());

self.create_regions(&table_dir).await?;

@@ -313,7 +317,6 @@ impl<S: StorageEngine> TableCreator<S> {
/// Writes metadata to the table manifest.
async fn write_table_manifest(&mut self, table_dir: &str) -> Result<TableRef> {
// Try to open the table first, as the table manifest might already exist.
let table_ref = self.data.table_ref();
if let Some((manifest, table_info)) = self
.engine_inner
.recover_table_manifest_and_info(&self.data.request.table_name, table_dir)
@@ -323,7 +326,7 @@ impl<S: StorageEngine> TableCreator<S> {

self.engine_inner
.tables
.insert(table_ref.to_string(), table.clone());
.insert(self.data.request.id, table.clone());
return Ok(table);
}

@@ -333,7 +336,7 @@ impl<S: StorageEngine> TableCreator<S> {

self.engine_inner
.tables
.insert(table_ref.to_string(), table.clone());
.insert(self.data.request.id, table.clone());

Ok(table)
}
@@ -432,13 +435,8 @@ mod tests {
.unwrap();
procedure_test_util::execute_procedure_until_done(&mut procedure).await;

let table_ref = TableReference {
catalog: &request.catalog_name,
schema: &request.schema_name,
table: &request.table_name,
};
assert!(table_engine
.get_table(&EngineContext::default(), &table_ref)
.get_table(&EngineContext::default(), request.id)
.unwrap()
.is_some());
}

@@ -84,8 +84,7 @@ impl<S: StorageEngine> DropMitoTable<S> {
state: DropTableState::Prepare,
request,
};
let table_ref = data.table_ref();
let table = engine_inner.get_mito_table(&table_ref);
let table = engine_inner.get_mito_table(data.request.table_id);

Ok(DropMitoTable {
data,
@@ -115,8 +114,7 @@ impl<S: StorageEngine> DropMitoTable<S> {
/// Recover the procedure from json.
fn from_json(json: &str, engine_inner: Arc<MitoEngineInner<S>>) -> Result<Self> {
let data: DropTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
let table_ref = data.table_ref();
let table = engine_inner.get_mito_table(&table_ref);
let table = engine_inner.get_mito_table(data.request.table_id);

Ok(DropMitoTable {
data,
@@ -182,6 +180,7 @@ mod tests {
procedure_test_util::execute_procedure_until_done(&mut procedure).await;

// Drop the table.
let table_id = request.id;
let request = test_util::new_drop_request();
let mut procedure = table_engine
.drop_table_procedure(&engine_ctx, request.clone())
@@ -189,13 +188,8 @@ mod tests {
procedure_test_util::execute_procedure_until_done(&mut procedure).await;

// The table is dropped.
let table_ref = TableReference {
catalog: &request.catalog_name,
schema: &request.schema_name,
table: &request.table_name,
};
assert!(table_engine
.get_table(&engine_ctx, &table_ref)
.get_table(&engine_ctx, table_id)
.unwrap()
.is_none());
}

@@ -40,7 +40,7 @@ use table::Table;

use super::*;
use crate::table::test_util::{
self, new_insert_request, schema_for_test, setup_table, TestEngineComponents, TABLE_NAME,
self, new_insert_request, setup_table, TestEngineComponents, TABLE_NAME,
};

pub fn has_parquet_file(sst_dir: &str) -> bool {
@@ -537,11 +537,16 @@ fn test_region_id() {
assert_eq!(18446744069414584330, region_id(u32::MAX, 10));
}

fn new_add_columns_req(new_tag: &ColumnSchema, new_field: &ColumnSchema) -> AlterTableRequest {
fn new_add_columns_req(
table_id: TableId,
new_tag: &ColumnSchema,
new_field: &ColumnSchema,
) -> AlterTableRequest {
AlterTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TABLE_NAME.to_string(),
table_id,
alter_kind: AlterKind::AddColumns {
columns: vec![
AddColumnRequest {
@@ -560,6 +565,7 @@ fn new_add_columns_req(new_tag: &ColumnSchema, new_field: &ColumnSchema) -> Alte
}

pub(crate) fn new_add_columns_req_with_location(
table_id: TableId,
new_tag: &ColumnSchema,
new_field: &ColumnSchema,
) -> AlterTableRequest {
@@ -567,6 +573,7 @@ pub(crate) fn new_add_columns_req_with_location(
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TABLE_NAME.to_string(),
table_id,
alter_kind: AlterKind::AddColumns {
columns: vec![
AddColumnRequest {
@@ -597,7 +604,7 @@ async fn test_alter_table_add_column() {

let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
let req = new_add_columns_req(&new_tag, &new_field);
let req = new_add_columns_req(table.table_info().ident.table_id, &new_tag, &new_field);
let table = table_engine
.alter_table(&EngineContext::default(), req)
.await
@@ -633,7 +640,7 @@ async fn test_alter_table_add_column() {
ConcreteDataType::string_datatype(),
true,
);
let req = new_add_columns_req_with_location(&new_tag, &new_field);
let req = new_add_columns_req_with_location(new_info.ident.table_id, &new_tag, &new_field);
let table = table_engine
.alter_table(&EngineContext::default(), req)
.await
@@ -653,13 +660,13 @@ async fn test_alter_table_add_column() {

#[tokio::test]
async fn test_alter_table_remove_column() {
let (_engine, table_engine, _table, _object_store, _dir) =
let (_engine, table_engine, table, _object_store, _dir) =
test_util::setup_mock_engine_and_table().await;

// Add two columns to the table first.
let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
let req = new_add_columns_req(&new_tag, &new_field);
let req = new_add_columns_req(table.table_info().ident.table_id, &new_tag, &new_field);
let table = table_engine
.alter_table(&EngineContext::default(), req)
.await
@@ -674,6 +681,7 @@ async fn test_alter_table_remove_column() {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TABLE_NAME.to_string(),
table_id: table.table_info().ident.table_id,
alter_kind: AlterKind::DropColumns {
names: vec![String::from("memory"), String::from("my_field")],
},
@@ -706,45 +714,13 @@ async fn test_alter_rename_table() {
let TestEngineComponents {
table_engine,
storage_engine,
table_ref,
object_store,
dir: _dir,
..
} = test_util::setup_test_engine_and_table().await;
let ctx = EngineContext::default();

// register another table
let another_name = "another_table";
let req = CreateTableRequest {
id: 1024,
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: another_name.to_string(),
desc: Some("another test table".to_string()),
schema: RawSchema::from(&schema_for_test()),
region_numbers: vec![0],
primary_key_indices: vec![0],
create_if_not_exists: true,
table_options: TableOptions::default(),
engine: MITO_ENGINE.to_string(),
};
table_engine
.create_table(&ctx, req)
.await
.expect("create table must succeed");
// test renaming a table with an existing name.
let req = AlterTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TABLE_NAME.to_string(),
alter_kind: AlterKind::RenameTable {
new_table_name: another_name.to_string(),
},
};
let err = table_engine.alter_table(&ctx, req).await.err().unwrap();
assert!(
err.to_string().contains("Table already exists"),
"Unexpected error: {err}"
);
let table_id = table_ref.table_info().ident.table_id;

let new_table_name = "test_table";
// test rename table
@@ -752,6 +728,7 @@ async fn test_alter_rename_table() {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TABLE_NAME.to_string(),
table_id,
alter_kind: AlterKind::RenameTable {
new_table_name: new_table_name.to_string(),
},
@@ -765,7 +742,7 @@ async fn test_alter_rename_table() {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: new_table_name.to_string(),
table_id: 1,
table_id,
region_numbers: vec![0],
};

@@ -794,17 +771,13 @@ async fn test_drop_table() {
let engine_ctx = EngineContext {};

let table_info = table.table_info();
let table_reference = TableReference {
catalog: DEFAULT_CATALOG_NAME,
schema: DEFAULT_SCHEMA_NAME,
table: &table_info.name,
};

let table_id = 1;
let create_table_request = CreateTableRequest {
id: 1,
id: table_id,
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_info.name.to_string(),
table_name: table_info.name.clone(),
schema: RawSchema::from(&*table_info.meta.schema),
create_if_not_exists: true,
desc: None,
@@ -819,23 +792,25 @@ async fn test_drop_table() {
.await
.unwrap();
assert_eq!(table_info, created_table.table_info());
assert!(table_engine.table_exists(&engine_ctx, &table_reference));
assert!(table_engine.table_exists(&engine_ctx, table_id));

let drop_table_request = DropTableRequest {
catalog_name: table_reference.catalog.to_string(),
schema_name: table_reference.schema.to_string(),
table_name: table_reference.table.to_string(),
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_info.name.clone(),
table_id,
};
let table_dropped = table_engine
.drop_table(&engine_ctx, drop_table_request)
.await
.unwrap();
assert!(table_dropped);
assert!(!table_engine.table_exists(&engine_ctx, &table_reference));
assert!(!table_engine.table_exists(&engine_ctx, table_id));

// should be able to re-create
let table_id = 2;
let request = CreateTableRequest {
id: 2,
id: table_id,
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_info.name.to_string(),
@@ -848,7 +823,7 @@ async fn test_drop_table() {
engine: MITO_ENGINE.to_string(),
};
table_engine.create_table(&ctx, request).await.unwrap();
assert!(table_engine.table_exists(&engine_ctx, &table_reference));
assert!(table_engine.table_exists(&engine_ctx, table_id));
}

#[tokio::test]

@@ -28,7 +28,7 @@ use storage::compaction::noop::NoopCompactionScheduler;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use table::engine::{EngineContext, TableEngine};
use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder, TableType};
use table::metadata::{TableId, TableInfo, TableInfoBuilder, TableMetaBuilder, TableType};
use table::requests::{
AlterKind, AlterTableRequest, CreateTableRequest, DropTableRequest, InsertRequest, TableOptions,
};
@@ -39,6 +39,7 @@ use crate::engine::{MitoEngine, MITO_ENGINE};
pub use crate::table::test_util::mock_engine::{MockEngine, MockRegion};

pub const TABLE_NAME: &str = "demo";
pub const TABLE_ID: TableId = 1;

/// Create a InsertRequest with default catalog and schema.
pub fn new_insert_request(
@@ -107,7 +108,7 @@ pub async fn new_test_object_store(prefix: &str) -> (TempDir, ObjectStore) {

pub fn new_create_request(schema: SchemaRef) -> CreateTableRequest {
CreateTableRequest {
id: 1,
id: TABLE_ID,
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TABLE_NAME.to_string(),
@@ -126,6 +127,7 @@ pub fn new_alter_request(alter_kind: AlterKind) -> AlterTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TABLE_NAME.to_string(),
table_id: TABLE_ID,
alter_kind,
}
}
@@ -135,6 +137,7 @@ pub fn new_drop_request() -> DropTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TABLE_NAME.to_string(),
table_id: TABLE_ID,
}
}

@@ -332,6 +332,7 @@ fn parse_immutable_file_table_format(
Format::Csv(format) => Box::new(format),
Format::Json(format) => Box::new(format),
Format::Parquet(format) => Box::new(format),
Format::Orc(format) => Box::new(format),
},
)
}

@@ -312,7 +312,7 @@ where

#[cfg(test)]
mod tests {
use std::sync::atomic::AtomicI32;
use std::sync::atomic::{AtomicBool, AtomicI32};
use std::time::Duration;

use store_api::storage::RegionId;
@@ -564,7 +564,9 @@ mod tests {
let task_scheduled = Arc::new(AtomicI32::new(0));
let task_scheduled_cloned = task_scheduled.clone();

common_runtime::spawn_write(async move {
let scheduling = Arc::new(AtomicBool::new(true));
let scheduling_clone = scheduling.clone();
let handle = common_runtime::spawn_write(async move {
for i in 0..10000 {
if let Ok(res) = scheduler_cloned.schedule(MockRequest {
region_id: i as RegionId,
@@ -573,12 +575,19 @@ mod tests {
task_scheduled_cloned.fetch_add(1, Ordering::Relaxed);
}
}

if !scheduling_clone.load(Ordering::Relaxed) {
break;
}
}
});

tokio::time::sleep(Duration::from_millis(1)).await;
scheduler.stop(true).await.unwrap();
scheduling.store(false, Ordering::Relaxed);

let finished = finished.load(Ordering::Relaxed);
handle.await.unwrap();

assert_eq!(finished, task_scheduled.load(Ordering::Relaxed));
}
}

@@ -315,13 +315,14 @@ mod tests {
async fn test_alter_table_procedure_rename() {
let env = TestEnv::new("rename");
let table_name = "test_old";
env.create_table(table_name).await;
let table_id = env.create_table(table_name).await;

let new_table_name = "test_new";
let request = AlterTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
table_id,
alter_kind: AlterKind::RenameTable {
new_table_name: new_table_name.to_string(),
},

@@ -349,14 +349,9 @@ mod tests {
table_engine.clone(),
);

let table_ref = TableReference {
catalog: &request.catalog_name,
schema: &request.schema_name,
table: &request.table_name,
};
let engine_ctx = EngineContext::default();
assert!(table_engine
.get_table(&engine_ctx, &table_ref)
.get_table(&engine_ctx, request.id)
.unwrap()
.is_none());

@@ -365,7 +360,7 @@ mod tests {
watcher.changed().await.unwrap();

assert!(table_engine
.get_table(&engine_ctx, &table_ref)
.get_table(&engine_ctx, request.id)
.unwrap()
.is_some());
}
@@ -390,14 +385,9 @@ mod tests {
table_engine.clone(),
);

let table_ref = TableReference {
catalog: &request.catalog_name,
schema: &request.schema_name,
table: &request.table_name,
};
let engine_ctx = EngineContext::default();
assert!(table_engine
.get_table(&engine_ctx, &table_ref)
.get_table(&engine_ctx, request.id)
.unwrap()
.is_none());

@@ -452,7 +442,7 @@ mod tests {

// The table is created.
assert!(table_engine
.get_table(&engine_ctx, &table_ref)
.get_table(&engine_ctx, request.id)
.unwrap()
.is_some());
}

@@ -277,12 +277,13 @@ mod tests {
async fn test_drop_table_procedure() {
let env = TestEnv::new("drop");
let table_name = "test_drop";
env.create_table(table_name).await;
let table_id = env.create_table(table_name).await;

let request = DropTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
table_id,
};
let TestEnv {
dir: _dir,
@@ -305,13 +306,6 @@ mod tests {
let schema = catalog.schema(DEFAULT_SCHEMA_NAME).await.unwrap().unwrap();
assert!(schema.table(table_name).await.unwrap().is_none());
let ctx = EngineContext::default();
assert!(!table_engine.table_exists(
&ctx,
&TableReference {
catalog: DEFAULT_CATALOG_NAME,
schema: DEFAULT_SCHEMA_NAME,
table: table_name,
}
));
assert!(!table_engine.table_exists(&ctx, table_id,));
}
}

@@ -32,6 +32,7 @@ use object_store::ObjectStore;
use storage::compaction::noop::NoopCompactionScheduler;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use table::metadata::TableId;
use table::requests::CreateTableRequest;

use crate::CreateTableProcedure;
@@ -92,8 +93,9 @@ impl TestEnv {
}
}

pub async fn create_table(&self, table_name: &str) {
pub async fn create_table(&self, table_name: &str) -> TableId {
let request = new_create_request(table_name);
let table_id = request.id;
let procedure = CreateTableProcedure::new(
request,
self.catalog_manager.clone(),
@@ -108,6 +110,8 @@ impl TestEnv {
.await
.unwrap();
watcher.changed().await.unwrap();

table_id
}
}

@@ -108,14 +108,10 @@ pub trait TableEngine: Send + Sync {
) -> Result<TableRef>;

/// Returns the table by it's name.
fn get_table(
&self,
ctx: &EngineContext,
table_ref: &TableReference,
) -> Result<Option<TableRef>>;
fn get_table(&self, ctx: &EngineContext, table_id: TableId) -> Result<Option<TableRef>>;

/// Returns true when the given table is exists.
fn table_exists(&self, ctx: &EngineContext, table_ref: &TableReference) -> bool;
fn table_exists(&self, ctx: &EngineContext, table_id: TableId) -> bool;

/// Drops the given table. Return true if the table is dropped, or false if the table doesn't exist.
async fn drop_table(&self, ctx: &EngineContext, request: DropTableRequest) -> Result<bool>;

@@ -162,6 +162,7 @@ pub struct AlterTableRequest {
pub catalog_name: String,
pub schema_name: String,
pub table_name: String,
pub table_id: TableId,
pub alter_kind: AlterKind,
}

@@ -200,6 +201,7 @@ pub struct DropTableRequest {
pub catalog_name: String,
pub schema_name: String,
pub table_name: String,
pub table_id: TableId,
}

impl DropTableRequest {
@@ -218,6 +220,7 @@ pub struct CloseTableRequest {
pub catalog_name: String,
pub schema_name: String,
pub table_name: String,
pub table_id: TableId,
/// Do nothing if region_numbers is empty
pub region_numbers: Vec<RegionNumber>,
/// flush regions

@@ -14,15 +14,20 @@

use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};

use common_query::error as query_error;
use common_query::error::Result as QueryResult;
use common_query::physical_plan::{Partitioning, PhysicalPlan, PhysicalPlanRef};
use common_recordbatch::SendableRecordBatchStream;
use common_recordbatch::error::Result as RecordBatchResult;
use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion_physical_expr::PhysicalSortExpr;
use datatypes::schema::SchemaRef;
use futures::{Stream, StreamExt};
use snafu::OptionExt;

/// Adapt greptime's [SendableRecordBatchStream] to DataFusion's [PhysicalPlan].
@@ -30,6 +35,7 @@ pub struct StreamScanAdapter {
stream: Mutex<Option<SendableRecordBatchStream>>,
schema: SchemaRef,
output_ordering: Option<Vec<PhysicalSortExpr>>,
metric: ExecutionPlanMetricsSet,
}

impl Debug for StreamScanAdapter {
@@ -49,6 +55,7 @@ impl StreamScanAdapter {
stream: Mutex::new(Some(stream)),
schema,
output_ordering: None,
metric: ExecutionPlanMetricsSet::new(),
}
}

@@ -85,11 +92,46 @@ impl PhysicalPlan for StreamScanAdapter {

fn execute(
&self,
_partition: usize,
partition: usize,
_context: Arc<TaskContext>,
) -> QueryResult<SendableRecordBatchStream> {
let mut stream = self.stream.lock().unwrap();
stream.take().context(query_error::ExecuteRepeatedlySnafu)
let stream = stream.take().context(query_error::ExecuteRepeatedlySnafu)?;
let baseline_metric = BaselineMetrics::new(&self.metric, partition);
Ok(Box::pin(StreamWithMetricWrapper {
stream,
metric: baseline_metric,
}))
}

fn metrics(&self) -> Option<MetricsSet> {
Some(self.metric.clone_inner())
}
}

pub struct StreamWithMetricWrapper {
stream: SendableRecordBatchStream,
metric: BaselineMetrics,
}

impl Stream for StreamWithMetricWrapper {
type Item = RecordBatchResult<RecordBatch>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
let _timer = this.metric.elapsed_compute().timer();
let poll = this.stream.poll_next_unpin(cx);
if let Poll::Ready(Option::Some(Result::Ok(record_batch))) = &poll {
this.metric.record_output(record_batch.num_rows());
}

poll
}
}

impl RecordBatchStream for StreamWithMetricWrapper {
fn schema(&self) -> SchemaRef {
self.stream.schema()
}
}

@@ -19,7 +19,8 @@ use async_trait::async_trait;
use common_procedure::BoxedProcedure;
use tokio::sync::Mutex;

use crate::engine::{EngineContext, TableEngine, TableEngineProcedure, TableReference};
use crate::engine::{EngineContext, TableEngine, TableEngineProcedure};
use crate::metadata::TableId;
use crate::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest};
use crate::test_util::EmptyTable;
use crate::{Result, TableRef};
@@ -86,11 +87,11 @@ impl TableEngine for MockTableEngine {
unimplemented!()
}

fn get_table(&self, _ctx: &EngineContext, _ref: &TableReference) -> Result<Option<TableRef>> {
fn get_table(&self, _ctx: &EngineContext, _table_id: TableId) -> Result<Option<TableRef>> {
unimplemented!()
}

fn table_exists(&self, _ctx: &EngineContext, _name: &TableReference) -> bool {
fn table_exists(&self, _ctx: &EngineContext, _table_id: TableId) -> bool {
unimplemented!()
}

@@ -1227,6 +1227,45 @@ async fn test_execute_copy_from_s3(instance: Arc<dyn MockInstance>) {
}
}

#[apply(both_instances_cases)]
async fn test_execute_copy_from_orc(instance: Arc<dyn MockInstance>) {
logging::init_default_ut_logging();
let instance = instance.frontend();

// setups
execute_sql(
&instance,
"create table demo(double_a double, a float, b boolean, str_direct string, d string, e string, f string, int_short_repeated int, int_neg_short_repeated int, int_delta int, int_neg_delta int, int_direct int, int_neg_direct int, bigint_direct bigint, bigint_neg_direct bigint, bigint_other bigint, utf8_increase string, utf8_decrease string, timestamp_simple timestamp(9) time index, date_simple date);",
)
.await;

let filepath = get_data_dir("../src/common/datasource/tests/orc/test.orc")
.canonicalize()
.unwrap()
.display()
.to_string();

let output = execute_sql(
&instance,
&format!("copy demo from '{}' WITH(FORMAT='orc');", &filepath),
)
.await;

assert!(matches!(output, Output::AffectedRows(5)));

let output = execute_sql(&instance, "select * from demo order by double_a;").await;
let expected = r#"+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+
| double_a | a | b | str_direct | d | e | f | int_short_repeated | int_neg_short_repeated | int_delta | int_neg_delta | int_direct | int_neg_direct | bigint_direct | bigint_neg_direct | bigint_other | utf8_increase | utf8_decrease | timestamp_simple | date_simple |
+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+
| 1.0 | 1.0 | true | a | a | ddd | aaaaa | 5 | -5 | 1 | 5 | 1 | -1 | 1 | -1 | 5 | a | eeeee | 2023-04-01T20:15:30.002 | 2023-04-01 |
| 2.0 | 2.0 | false | cccccc | bb | cc | bbbbb | 5 | -5 | 2 | 4 | 6 | -6 | 6 | -6 | -5 | bb | dddd | 2021-08-22T07:26:44.525777 | 2023-03-01 |
| 3.0 | | | | | | | | | | | | | | | 1 | ccc | ccc | 2023-01-01T00:00:00 | 2023-01-01 |
| 4.0 | 4.0 | true | ddd | ccc | bb | ccccc | 5 | -5 | 4 | 2 | 3 | -3 | 3 | -3 | 5 | dddd | bb | 2023-02-01T00:00:00 | 2023-02-01 |
| 5.0 | 5.0 | false | ee | ddd | a | ddddd | 5 | -5 | 5 | 1 | 2 | -2 | 2 | -2 | 5 | eeeee | a | 2023-03-01T00:00:00 | 2023-03-01 |
+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+"#;
check_output_stream(output, expected).await;
}

#[apply(both_instances_cases)]
async fn test_cast_type_issue_1594(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
