Compare commits

...

7 Commits

Author SHA1 Message Date
Eugene Tolbakov
964d26e415 fix: docker build for aarch64 (#1826) 2023-06-25 18:29:00 +09:00
Yingwen
fd412b7b07 refactor!: Uses table id to locate tables in table engines (#1817)
* refactor: add table_id to get_table()/table_exists()

* refactor: Add table_id to alter table request

* refactor: Add table id to DropTableRequest

* refactor: add table id to DropTableRequest

* refactor: Use table id as key for the tables map

* refactor: use table id as file engine's map key

* refactor: Remove table reference from engine's get_table/table_exists

* style: remove unused imports

* feat!: Add table id to TableRegionalValue

* style: fix cilppy

* chore: add comments and logs
2023-06-25 15:05:20 +08:00
Weny Xu
223cf31409 feat: support to copy from orc format (#1814)
* feat: support to copy from orc format

* test: add copy from orc test

* chore: add license header

* refactor: remove unimplemented macro

* chore: apply suggestions from CR

* chore: bump orc-rust to 0.2.3
2023-06-25 14:07:16 +08:00
Ruihang Xia
62f660e439 feat: implement metrics for Scan plan (#1812)
* add metrics in some interfaces

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* calc elapsed time and rows

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2023-06-25 14:06:50 +08:00
Lei, HUANG
0fb18245b8 fix: docker build (#1822) 2023-06-25 11:05:46 +08:00
Weny Xu
caed6879e6 refactor: remove redundant code (#1821) 2023-06-25 10:56:31 +08:00
Yingwen
5ab0747092 test(storage): wait task before checking scheduled task num (#1811) 2023-06-21 18:04:34 +08:00
50 changed files with 773 additions and 471 deletions

View File

@@ -20,6 +20,3 @@ out/
# Rust
target/
# Git
.git

2
.gitignore vendored
View File

@@ -44,3 +44,5 @@ benchmarks/data
# Vscode workspace
*.code-workspace
venv/

37
Cargo.lock generated
View File

@@ -1679,6 +1679,7 @@ dependencies = [
"derive_builder 0.12.0",
"futures",
"object-store",
"orc-rust",
"paste",
"regex",
"snafu",
@@ -3069,6 +3070,12 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
[[package]]
name = "fallible-streaming-iterator"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
[[package]]
name = "fastrand"
version = "1.9.0"
@@ -5983,6 +5990,27 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "978aa494585d3ca4ad74929863093e87cac9790d81fe7aba2b3dc2890643a0fc"
[[package]]
name = "orc-rust"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e15d3f67795da54d9526e46b7808181ce6236d518f56ca1ee556d3a3fdd77c66"
dependencies = [
"arrow",
"bytes",
"chrono",
"fallible-streaming-iterator",
"flate2",
"futures",
"futures-util",
"lazy_static",
"paste",
"prost",
"snafu",
"tokio",
"zigzag",
]
[[package]]
name = "ordered-float"
version = "1.1.1"
@@ -11214,6 +11242,15 @@ version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9"
[[package]]
name = "zigzag"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70b40401a28d86ce16a330b863b86fd7dbee4d7c940587ab09ab8c019f9e3fdf"
dependencies = [
"num-traits",
]
[[package]]
name = "zstd"
version = "0.11.2+zstd.1.5.2"

View File

@@ -8,6 +8,7 @@ RUN apt-get update && apt-get install -y \
libssl-dev \
protobuf-compiler \
curl \
git \
build-essential \
pkg-config \
python3 \

View File

@@ -8,6 +8,7 @@ RUN apt-get update && apt-get install -y \
libssl-dev \
protobuf-compiler \
curl \
git \
build-essential \
pkg-config \
wget

View File

@@ -201,6 +201,9 @@ impl TableRegionalKey {
/// region ids allocated by metasrv.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TableRegionalValue {
// We can remove the `Option` from the table id once all regional values
// stored in meta have table ids.
pub table_id: Option<TableId>,
pub version: TableVersion,
pub regions_ids: Vec<u32>,
pub engine_name: Option<String>,

View File

@@ -984,21 +984,29 @@ impl SchemaProvider for RemoteSchemaProvider {
.get(key.as_bytes())
.await?
.map(|Kv(_, v)| {
let TableRegionalValue { engine_name, .. } =
TableRegionalValue::parse(String::from_utf8_lossy(&v))
.context(InvalidCatalogValueSnafu)?;
let reference = TableReference {
catalog: &self.catalog_name,
schema: &self.schema_name,
table: name,
let TableRegionalValue {
table_id,
engine_name,
..
} = TableRegionalValue::parse(String::from_utf8_lossy(&v))
.context(InvalidCatalogValueSnafu)?;
let Some(table_id) = table_id else {
warn!("Cannot find table id for {key}, the value has an old format");
return Ok(None);
};
let engine_name = engine_name.as_deref().unwrap_or(MITO_ENGINE);
let engine = self
.engine_manager
.engine(engine_name)
.context(TableEngineNotFoundSnafu { engine_name })?;
let reference = TableReference {
catalog: &self.catalog_name,
schema: &self.schema_name,
table: name,
};
let table = engine
.get_table(&EngineContext {}, &reference)
.get_table(&EngineContext {}, table_id)
.with_context(|_| OpenTableSnafu {
table_info: reference.to_string(),
})?;
@@ -1011,9 +1019,12 @@ impl SchemaProvider for RemoteSchemaProvider {
}
async fn register_table(&self, name: String, table: TableRef) -> Result<Option<TableRef>> {
// Currently, initiate_tables() always call this method to register the table to the schema thus we
// always update the region value.
let table_info = table.table_info();
let table_version = table_info.ident.version;
let table_value = TableRegionalValue {
table_id: Some(table_info.ident.table_id),
version: table_version,
regions_ids: table.table_info().meta.region_numbers.clone(),
engine_name: Some(table_info.meta.engine.clone()),
@@ -1061,25 +1072,27 @@ impl SchemaProvider for RemoteSchemaProvider {
.get(table_key.as_bytes())
.await?
.map(|Kv(_, v)| {
let TableRegionalValue { engine_name, .. } =
TableRegionalValue::parse(String::from_utf8_lossy(&v))
.context(InvalidCatalogValueSnafu)?;
Ok(engine_name)
let TableRegionalValue {
table_id,
engine_name,
..
} = TableRegionalValue::parse(String::from_utf8_lossy(&v))
.context(InvalidCatalogValueSnafu)?;
Ok(engine_name.and_then(|name| table_id.map(|id| (name, id))))
})
.transpose()?
.flatten();
let engine_name = engine_opt.as_deref().unwrap_or_else(|| {
warn!("Cannot find table engine name for {table_key}");
MITO_ENGINE
});
self.backend.delete(table_key.as_bytes()).await?;
debug!(
"Successfully deleted catalog table entry, key: {}",
table_key
);
let Some((engine_name, table_id)) = engine_opt else {
warn!("Cannot find table id and engine name for {table_key}");
return Ok(None);
};
let reference = TableReference {
catalog: &self.catalog_name,
schema: &self.schema_name,
@@ -1088,9 +1101,9 @@ impl SchemaProvider for RemoteSchemaProvider {
// deregistering table does not necessarily mean dropping the table
let table = self
.engine_manager
.engine(engine_name)
.engine(&engine_name)
.context(TableEngineNotFoundSnafu { engine_name })?
.get_table(&EngineContext {}, &reference)
.get_table(&EngineContext {}, table_id)
.with_context(|_| OpenTableSnafu {
table_info: reference.to_string(),
})?;

View File

@@ -16,8 +16,7 @@ use std::any::Any;
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::fmt::{Display, Formatter};
use std::str::FromStr;
use std::sync::Arc;
use std::sync::{Arc, RwLock as StdRwLock};
use async_stream::stream;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
@@ -27,7 +26,7 @@ use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::StringVector;
use serde::Serializer;
use table::engine::{CloseTableResult, EngineContext, TableEngine, TableReference};
use table::engine::{CloseTableResult, EngineContext, TableEngine};
use table::metadata::TableId;
use table::requests::{
AlterTableRequest, CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest,
@@ -167,7 +166,7 @@ impl KvBackend for MockKvBackend {
#[derive(Default)]
pub struct MockTableEngine {
tables: RwLock<HashMap<String, TableRef>>,
tables: StdRwLock<HashMap<TableId, TableRef>>,
}
#[async_trait::async_trait]
@@ -182,21 +181,8 @@ impl TableEngine for MockTableEngine {
_ctx: &EngineContext,
request: CreateTableRequest,
) -> table::Result<TableRef> {
let table_name = request.table_name.clone();
let catalog_name = request.catalog_name.clone();
let schema_name = request.schema_name.clone();
let table_full_name =
TableReference::full(&catalog_name, &schema_name, &table_name).to_string();
let table_id = request.id;
let default_table_id = "0".to_owned();
let table_id = TableId::from_str(
request
.table_options
.extra_options
.get("table_id")
.unwrap_or(&default_table_id),
)
.unwrap();
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
"name",
ConcreteDataType::string_datatype(),
@@ -206,16 +192,16 @@ impl TableEngine for MockTableEngine {
let data = vec![Arc::new(StringVector::from(vec!["a", "b", "c"])) as _];
let record_batch = RecordBatch::new(schema, data).unwrap();
let table: TableRef = Arc::new(MemTable::new_with_catalog(
&table_name,
&request.table_name,
record_batch,
table_id,
catalog_name,
schema_name,
request.catalog_name,
request.schema_name,
vec![0],
)) as Arc<_>;
let mut tables = self.tables.write().await;
tables.insert(table_full_name, table.clone() as TableRef);
let mut tables = self.tables.write().unwrap();
tables.insert(table_id, table.clone() as TableRef);
Ok(table)
}
@@ -224,7 +210,7 @@ impl TableEngine for MockTableEngine {
_ctx: &EngineContext,
request: OpenTableRequest,
) -> table::Result<Option<TableRef>> {
Ok(self.tables.read().await.get(&request.table_name).cloned())
Ok(self.tables.read().unwrap().get(&request.table_id).cloned())
}
async fn alter_table(
@@ -238,25 +224,13 @@ impl TableEngine for MockTableEngine {
fn get_table(
&self,
_ctx: &EngineContext,
table_ref: &TableReference,
table_id: TableId,
) -> table::Result<Option<TableRef>> {
futures::executor::block_on(async {
Ok(self
.tables
.read()
.await
.get(&table_ref.to_string())
.cloned())
})
Ok(self.tables.read().unwrap().get(&table_id).cloned())
}
fn table_exists(&self, _ctx: &EngineContext, table_ref: &TableReference) -> bool {
futures::executor::block_on(async {
self.tables
.read()
.await
.contains_key(&table_ref.to_string())
})
fn table_exists(&self, _ctx: &EngineContext, table_id: TableId) -> bool {
self.tables.read().unwrap().contains_key(&table_id)
}
async fn drop_table(
@@ -272,11 +246,7 @@ impl TableEngine for MockTableEngine {
_ctx: &EngineContext,
request: CloseTableRequest,
) -> table::Result<CloseTableResult> {
let _ = self
.tables
.write()
.await
.remove(&request.table_ref().to_string());
let _ = self.tables.write().unwrap().remove(&request.table_id);
Ok(CloseTableResult::Released(vec![]))
}

View File

@@ -475,6 +475,7 @@ impl CountdownTask {
catalog_name: table_ident.catalog.clone(),
schema_name: table_ident.schema.clone(),
table_name: table_ident.table.clone(),
table_id: table_ident.table_id,
region_numbers: vec![region],
flush: true,
};
@@ -499,7 +500,7 @@ mod test {
use common_meta::heartbeat::mailbox::HeartbeatMailbox;
use datatypes::schema::RawSchema;
use table::engine::manager::MemoryTableEngineManager;
use table::engine::{TableEngine, TableReference};
use table::engine::TableEngine;
use table::requests::{CreateTableRequest, TableOptions};
use table::test_util::EmptyTable;
@@ -751,8 +752,9 @@ mod test {
let catalog = "my_catalog";
let schema = "my_schema";
let table = "my_table";
let table_id = 1;
let request = CreateTableRequest {
id: 1,
id: table_id,
catalog_name: catalog.to_string(),
schema_name: schema.to_string(),
table_name: table.to_string(),
@@ -768,7 +770,6 @@ mod test {
table_options: TableOptions::default(),
engine: "mito".to_string(),
};
let table_ref = TableReference::full(catalog, schema, table);
let table_engine = Arc::new(MockTableEngine::default());
table_engine.create_table(ctx, request).await.unwrap();
@@ -777,7 +778,7 @@ mod test {
catalog: catalog.to_string(),
schema: schema.to_string(),
table: table.to_string(),
table_id: 1024,
table_id,
engine: "mito".to_string(),
};
let (tx, rx) = mpsc::channel(10);
@@ -813,9 +814,9 @@ mod test {
.unwrap();
// assert the table is closed after deadline is reached
assert!(table_engine.table_exists(ctx, &table_ref));
assert!(table_engine.table_exists(ctx, table_id));
// spare 500ms for the task to close the table
tokio::time::sleep(Duration::from_millis(2000)).await;
assert!(!table_engine.table_exists(ctx, &table_ref));
assert!(!table_engine.table_exists(ctx, table_id));
}
}

View File

@@ -24,6 +24,7 @@ datafusion.workspace = true
derive_builder = "0.12"
futures.workspace = true
object-store = { path = "../../object-store" }
orc-rust = "0.2.3"
regex = "1.7"
snafu.workspace = true
tokio.workspace = true

View File

@@ -54,6 +54,12 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to build orc reader, source: {}", source))]
OrcReader {
location: Location,
source: orc_rust::error::Error,
},
#[snafu(display("Failed to read object from path: {}, source: {}", path, source))]
ReadObject {
path: String,
@@ -171,7 +177,8 @@ impl ErrorExt for Error {
| ReadRecordBatch { .. }
| WriteRecordBatch { .. }
| EncodeRecordBatch { .. }
| BufferedWriterClosed { .. } => StatusCode::Unexpected,
| BufferedWriterClosed { .. }
| OrcReader { .. } => StatusCode::Unexpected,
}
}
@@ -182,6 +189,7 @@ impl ErrorExt for Error {
fn location_opt(&self) -> Option<common_error::snafu::Location> {
use Error::*;
match self {
OrcReader { location, .. } => Some(*location),
BuildBackend { location, .. } => Some(*location),
ReadObject { location, .. } => Some(*location),
ListObjects { location, .. } => Some(*location),

View File

@@ -14,6 +14,7 @@
pub mod csv;
pub mod json;
pub mod orc;
pub mod parquet;
#[cfg(test)]
pub mod tests;
@@ -38,6 +39,7 @@ use snafu::ResultExt;
use self::csv::CsvFormat;
use self::json::JsonFormat;
use self::orc::OrcFormat;
use self::parquet::ParquetFormat;
use crate::buffered_writer::{DfRecordBatchEncoder, LazyBufferedWriter};
use crate::compression::CompressionType;
@@ -56,6 +58,7 @@ pub enum Format {
Csv(CsvFormat),
Json(JsonFormat),
Parquet(ParquetFormat),
Orc(OrcFormat),
}
impl Format {
@@ -64,6 +67,7 @@ impl Format {
Format::Csv(_) => ".csv",
Format::Json(_) => ".json",
Format::Parquet(_) => ".parquet",
&Format::Orc(_) => ".orc",
}
}
}
@@ -81,6 +85,7 @@ impl TryFrom<&HashMap<String, String>> for Format {
"CSV" => Ok(Self::Csv(CsvFormat::try_from(options)?)),
"JSON" => Ok(Self::Json(JsonFormat::try_from(options)?)),
"PARQUET" => Ok(Self::Parquet(ParquetFormat::default())),
"ORC" => Ok(Self::Orc(OrcFormat)),
_ => error::UnsupportedFormatSnafu { format: &format }.fail(),
}
}

View File

@@ -0,0 +1,102 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::pin::Pin;
use std::task::{Context, Poll};
use arrow_schema::{Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::arrow::record_batch::RecordBatch as DfRecordBatch;
use datafusion::error::{DataFusionError, Result as DfResult};
use datafusion::physical_plan::RecordBatchStream;
use futures::Stream;
use object_store::ObjectStore;
use orc_rust::arrow_reader::{create_arrow_schema, Cursor};
use orc_rust::async_arrow_reader::ArrowStreamReader;
pub use orc_rust::error::Error as OrcError;
use orc_rust::reader::Reader;
use snafu::ResultExt;
use tokio::io::{AsyncRead, AsyncSeek};
use crate::error::{self, Result};
use crate::file_format::FileFormat;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct OrcFormat;
pub async fn new_orc_cursor<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
reader: R,
) -> Result<Cursor<R>> {
let reader = Reader::new_async(reader)
.await
.context(error::OrcReaderSnafu)?;
let cursor = Cursor::root(reader).context(error::OrcReaderSnafu)?;
Ok(cursor)
}
pub async fn new_orc_stream_reader<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
reader: R,
) -> Result<ArrowStreamReader<R>> {
let cursor = new_orc_cursor(reader).await?;
Ok(ArrowStreamReader::new(cursor, None))
}
pub async fn infer_orc_schema<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
reader: R,
) -> Result<Schema> {
let cursor = new_orc_cursor(reader).await?;
Ok(create_arrow_schema(&cursor))
}
pub struct OrcArrowStreamReaderAdapter<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> {
stream: ArrowStreamReader<T>,
}
impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> OrcArrowStreamReaderAdapter<T> {
pub fn new(stream: ArrowStreamReader<T>) -> Self {
Self { stream }
}
}
impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> RecordBatchStream
for OrcArrowStreamReaderAdapter<T>
{
fn schema(&self) -> SchemaRef {
self.stream.schema()
}
}
impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> Stream for OrcArrowStreamReaderAdapter<T> {
type Item = DfResult<DfRecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let batch = futures::ready!(Pin::new(&mut self.stream).poll_next(cx))
.map(|r| r.map_err(|e| DataFusionError::External(Box::new(e))));
Poll::Ready(batch)
}
}
#[async_trait]
impl FileFormat for OrcFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
let reader = store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?;
let schema = infer_orc_schema(reader).await?;
Ok(schema)
}
}

View File

@@ -0,0 +1,11 @@
## Generate orc data
```bash
python3 -m venv venv
venv/bin/pip install -U pip
venv/bin/pip install -U pyorc
./venv/bin/python write.py
cargo test
```

Binary file not shown.

View File

@@ -0,0 +1,103 @@
# Copyright 2023 Greptime Team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import datetime
import pyorc
data = {
"double_a": [1.0, 2.0, 3.0, 4.0, 5.0],
"a": [1.0, 2.0, None, 4.0, 5.0],
"b": [True, False, None, True, False],
"str_direct": ["a", "cccccc", None, "ddd", "ee"],
"d": ["a", "bb", None, "ccc", "ddd"],
"e": ["ddd", "cc", None, "bb", "a"],
"f": ["aaaaa", "bbbbb", None, "ccccc", "ddddd"],
"int_short_repeated": [5, 5, None, 5, 5],
"int_neg_short_repeated": [-5, -5, None, -5, -5],
"int_delta": [1, 2, None, 4, 5],
"int_neg_delta": [5, 4, None, 2, 1],
"int_direct": [1, 6, None, 3, 2],
"int_neg_direct": [-1, -6, None, -3, -2],
"bigint_direct": [1, 6, None, 3, 2],
"bigint_neg_direct": [-1, -6, None, -3, -2],
"bigint_other": [5, -5, 1, 5, 5],
"utf8_increase": ["a", "bb", "ccc", "dddd", "eeeee"],
"utf8_decrease": ["eeeee", "dddd", "ccc", "bb", "a"],
"timestamp_simple": [datetime.datetime(2023, 4, 1, 20, 15, 30, 2000), datetime.datetime.fromtimestamp(int('1629617204525777000')/1000000000), datetime.datetime(2023, 1, 1), datetime.datetime(2023, 2, 1), datetime.datetime(2023, 3, 1)],
"date_simple": [datetime.date(2023, 4, 1), datetime.date(2023, 3, 1), datetime.date(2023, 1, 1), datetime.date(2023, 2, 1), datetime.date(2023, 3, 1)]
}
def infer_schema(data):
schema = "struct<"
for key, value in data.items():
dt = type(value[0])
if dt == float:
dt = "float"
elif dt == int:
dt = "int"
elif dt == bool:
dt = "boolean"
elif dt == str:
dt = "string"
elif key.startswith("timestamp"):
dt = "timestamp"
elif key.startswith("date"):
dt = "date"
else:
print(key,value,dt)
raise NotImplementedError
if key.startswith("double"):
dt = "double"
if key.startswith("bigint"):
dt = "bigint"
schema += key + ":" + dt + ","
schema = schema[:-1] + ">"
return schema
def _write(
schema: str,
data,
file_name: str,
compression=pyorc.CompressionKind.NONE,
dict_key_size_threshold=0.0,
):
output = open(file_name, "wb")
writer = pyorc.Writer(
output,
schema,
dict_key_size_threshold=dict_key_size_threshold,
# use a small number to ensure that compression crosses value boundaries
compression_block_size=32,
compression=compression,
)
num_rows = len(list(data.values())[0])
for x in range(num_rows):
row = tuple(values[x] for values in data.values())
writer.write(row)
writer.close()
with open(file_name, "rb") as f:
reader = pyorc.Reader(f)
list(reader)
_write(
infer_schema(data),
data,
"test.orc",
)

View File

@@ -34,7 +34,7 @@ const LOCATION_TYPE_FIRST: i32 = LocationType::First as i32;
const LOCATION_TYPE_AFTER: i32 = LocationType::After as i32;
/// Convert an [`AlterExpr`] to an [`AlterTableRequest`]
pub fn alter_expr_to_request(expr: AlterExpr) -> Result<AlterTableRequest> {
pub fn alter_expr_to_request(table_id: TableId, expr: AlterExpr) -> Result<AlterTableRequest> {
let catalog_name = expr.catalog_name;
let schema_name = expr.schema_name;
let kind = expr.kind.context(MissingFieldSnafu { field: "kind" })?;
@@ -69,6 +69,7 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result<AlterTableRequest> {
catalog_name,
schema_name,
table_name: expr.table_name,
table_id,
alter_kind,
};
Ok(request)
@@ -82,6 +83,7 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result<AlterTableRequest> {
catalog_name,
schema_name,
table_name: expr.table_name,
table_id,
alter_kind,
};
Ok(request)
@@ -92,6 +94,7 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result<AlterTableRequest> {
catalog_name,
schema_name,
table_name: expr.table_name,
table_id,
alter_kind,
};
Ok(request)
@@ -239,7 +242,7 @@ mod tests {
})),
};
let alter_request = alter_expr_to_request(expr).unwrap();
let alter_request = alter_expr_to_request(1, expr).unwrap();
assert_eq!(alter_request.catalog_name, "");
assert_eq!(alter_request.schema_name, "");
assert_eq!("monitor".to_string(), alter_request.table_name);
@@ -296,7 +299,7 @@ mod tests {
})),
};
let alter_request = alter_expr_to_request(expr).unwrap();
let alter_request = alter_expr_to_request(1, expr).unwrap();
assert_eq!(alter_request.catalog_name, "");
assert_eq!(alter_request.schema_name, "");
assert_eq!("monitor".to_string(), alter_request.table_name);
@@ -344,7 +347,7 @@ mod tests {
})),
};
let alter_request = alter_expr_to_request(expr).unwrap();
let alter_request = alter_expr_to_request(1, expr).unwrap();
assert_eq!(alter_request.catalog_name, "test_catalog");
assert_eq!(alter_request.schema_name, "test_schema");
assert_eq!("monitor".to_string(), alter_request.table_name);

View File

@@ -22,6 +22,7 @@ use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
use datafusion::error::Result as DfResult;
pub use datafusion::execution::context::{SessionContext, TaskContext};
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
pub use datafusion::physical_plan::Partitioning;
use datafusion::physical_plan::Statistics;
use datatypes::schema::SchemaRef;
@@ -69,6 +70,10 @@ pub trait PhysicalPlan: Debug + Send + Sync {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream>;
fn metrics(&self) -> Option<MetricsSet> {
None
}
}
/// Adapt DataFusion's [`ExecutionPlan`](DfPhysicalPlan) to GreptimeDB's [`PhysicalPlan`].
@@ -76,11 +81,16 @@ pub trait PhysicalPlan: Debug + Send + Sync {
pub struct PhysicalPlanAdapter {
schema: SchemaRef,
df_plan: Arc<dyn DfPhysicalPlan>,
metric: ExecutionPlanMetricsSet,
}
impl PhysicalPlanAdapter {
pub fn new(schema: SchemaRef, df_plan: Arc<dyn DfPhysicalPlan>) -> Self {
Self { schema, df_plan }
Self {
schema,
df_plan,
metric: ExecutionPlanMetricsSet::new(),
}
}
pub fn df_plan(&self) -> Arc<dyn DfPhysicalPlan> {
@@ -127,15 +137,21 @@ impl PhysicalPlan for PhysicalPlanAdapter {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let baseline_metric = BaselineMetrics::new(&self.metric, partition);
let df_plan = self.df_plan.clone();
let stream = df_plan
.execute(partition, context)
.context(error::GeneralDataFusionSnafu)?;
let adapter = RecordBatchStreamAdapter::try_new(stream)
let adapter = RecordBatchStreamAdapter::try_new_with_metrics(stream, baseline_metric)
.context(error::ConvertDfRecordBatchStreamSnafu)?;
Ok(Box::pin(adapter))
}
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metric.clone_inner())
}
}
#[derive(Debug)]
@@ -196,6 +212,10 @@ impl DfPhysicalPlan for DfPhysicalPlanAdapter {
fn statistics(&self) -> Statistics {
Statistics::default()
}
fn metrics(&self) -> Option<MetricsSet> {
self.0.metrics()
}
}
#[cfg(test)]

View File

@@ -20,6 +20,7 @@ use std::task::{Context, Poll};
use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
use datafusion::error::Result as DfResult;
use datafusion::parquet::arrow::async_reader::{AsyncFileReader, ParquetRecordBatchStream};
use datafusion::physical_plan::metrics::BaselineMetrics;
use datafusion::physical_plan::RecordBatchStream as DfRecordBatchStream;
use datafusion_common::DataFusionError;
use datatypes::schema::{Schema, SchemaRef};
@@ -115,13 +116,31 @@ impl Stream for DfRecordBatchStreamAdapter {
pub struct RecordBatchStreamAdapter {
schema: SchemaRef,
stream: DfSendableRecordBatchStream,
metrics: Option<BaselineMetrics>,
}
impl RecordBatchStreamAdapter {
pub fn try_new(stream: DfSendableRecordBatchStream) -> Result<Self> {
let schema =
Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?);
Ok(Self { schema, stream })
Ok(Self {
schema,
stream,
metrics: None,
})
}
pub fn try_new_with_metrics(
stream: DfSendableRecordBatchStream,
metrics: BaselineMetrics,
) -> Result<Self> {
let schema =
Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?);
Ok(Self {
schema,
stream,
metrics: Some(metrics),
})
}
}
@@ -135,6 +154,12 @@ impl Stream for RecordBatchStreamAdapter {
type Item = Result<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let timer = self
.metrics
.as_ref()
.map(|m| m.elapsed_compute().clone())
.unwrap_or_default();
let _guard = timer.timer();
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(df_record_batch)) => {

View File

@@ -164,7 +164,7 @@ impl CloseRegionHandler {
}
if engine
.get_table(&ctx, table_ref)
.get_table(&ctx, region_ident.table_ident.table_id)
.with_context(|_| error::GetTableSnafu {
table_name: table_ref.to_string(),
})?
@@ -178,6 +178,7 @@ impl CloseRegionHandler {
schema_name: table_ref.schema.to_string(),
table_name: table_ref.table.to_string(),
region_numbers: region_numbers.clone(),
table_id: region_ident.table_ident.table_id,
flush: true,
},
)

View File

@@ -106,7 +106,13 @@ impl Instance {
let name = alter_table.table_name().clone();
let (catalog, schema, table) = table_idents_to_full_name(&name, query_ctx.clone())?;
let table_ref = TableReference::full(&catalog, &schema, &table);
let req = SqlHandler::alter_to_request(alter_table, table_ref)?;
// Currently, we have to get the table multiple times. Consider remove the sql handler in the future.
let table = self.sql_handler.get_table(&table_ref).await?;
let req = SqlHandler::alter_to_request(
alter_table,
table_ref,
table.table_info().ident.table_id,
)?;
self.sql_handler
.execute(SqlRequest::Alter(req), query_ctx)
.await
@@ -114,10 +120,13 @@ impl Instance {
Statement::DropTable(drop_table) => {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(drop_table.table_name(), query_ctx.clone())?;
let table_ref = TableReference::full(&catalog_name, &schema_name, &table_name);
let table = self.sql_handler.get_table(&table_ref).await?;
let req = DropTableRequest {
catalog_name,
schema_name,
table_name,
table_id: table.table_info().ident.table_id,
};
self.sql_handler
.execute(SqlRequest::DropTable(req), query_ctx)

View File

@@ -14,6 +14,7 @@
use api::v1::{AlterExpr, CreateTableExpr, DropTableExpr, FlushTableExpr};
use common_catalog::consts::IMMUTABLE_FILE_ENGINE;
use common_catalog::format_full_table_name;
use common_grpc_expr::{alter_expr_to_request, create_expr_to_request};
use common_query::Output;
use common_telemetry::info;
@@ -22,8 +23,8 @@ use snafu::prelude::*;
use table::requests::{DropTableRequest, FlushTableRequest};
use crate::error::{
AlterExprToRequestSnafu, BumpTableIdSnafu, CreateExprToRequestSnafu,
IncorrectInternalStateSnafu, Result,
AlterExprToRequestSnafu, BumpTableIdSnafu, CatalogSnafu, CreateExprToRequestSnafu,
IncorrectInternalStateSnafu, Result, TableNotFoundSnafu,
};
use crate::instance::Instance;
use crate::sql::SqlRequest;
@@ -69,17 +70,45 @@ impl Instance {
}
pub(crate) async fn handle_alter(&self, expr: AlterExpr) -> Result<Output> {
let request = alter_expr_to_request(expr).context(AlterExprToRequestSnafu)?;
let table = self
.catalog_manager
.table(&expr.catalog_name, &expr.schema_name, &expr.table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: format_full_table_name(
&expr.catalog_name,
&expr.schema_name,
&expr.table_name,
),
})?;
let request = alter_expr_to_request(table.table_info().ident.table_id, expr)
.context(AlterExprToRequestSnafu)?;
self.sql_handler()
.execute(SqlRequest::Alter(request), QueryContext::arc())
.await
}
pub(crate) async fn handle_drop_table(&self, expr: DropTableExpr) -> Result<Output> {
let table = self
.catalog_manager
.table(&expr.catalog_name, &expr.schema_name, &expr.table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: format_full_table_name(
&expr.catalog_name,
&expr.schema_name,
&expr.table_name,
),
})?;
let req = DropTableRequest {
catalog_name: expr.catalog_name,
schema_name: expr.schema_name,
table_name: expr.table_name,
table_id: table.table_info().ident.table_id,
};
self.sql_handler()
.execute(SqlRequest::DropTable(req), QueryContext::arc())

View File

@@ -19,6 +19,7 @@ use snafu::prelude::*;
use sql::statements::alter::{AlterTable, AlterTableOperation};
use sql::statements::column_def_to_schema;
use table::engine::TableReference;
use table::metadata::TableId;
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest};
use table_procedure::AlterTableProcedure;
@@ -60,6 +61,7 @@ impl SqlHandler {
pub(crate) fn alter_to_request(
alter_table: AlterTable,
table_ref: TableReference,
table_id: TableId,
) -> Result<AlterTableRequest> {
let alter_kind = match &alter_table.alter_operation() {
AlterTableOperation::AddConstraint(table_constraint) => {
@@ -91,6 +93,7 @@ impl SqlHandler {
catalog_name: table_ref.catalog.to_string(),
schema_name: table_ref.schema.to_string(),
table_name: table_ref.table.to_string(),
table_id,
alter_kind,
})
}
@@ -128,6 +131,7 @@ mod tests {
let req = SqlHandler::alter_to_request(
alter_table,
TableReference::full("greptime", "public", "my_metric_1"),
1,
)
.unwrap();
assert_eq!(req.catalog_name, "greptime");
@@ -154,6 +158,7 @@ mod tests {
let req = SqlHandler::alter_to_request(
alter_table,
TableReference::full("greptime", "public", "test_table"),
1,
)
.unwrap();
assert_eq!(req.catalog_name, "greptime");

View File

@@ -25,7 +25,7 @@ use object_store::ObjectStore;
use snafu::ResultExt;
use table::engine::{table_dir, EngineContext, TableEngine, TableEngineProcedure, TableReference};
use table::error::TableOperationSnafu;
use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder, TableType};
use table::metadata::{TableId, TableInfo, TableInfoBuilder, TableMetaBuilder, TableType};
use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest};
use table::{error as table_error, Result as TableResult, Table, TableRef};
use tokio::sync::Mutex;
@@ -88,16 +88,12 @@ impl TableEngine for ImmutableFileTableEngine {
.fail()
}
fn get_table(
&self,
_ctx: &EngineContext,
table_ref: &TableReference,
) -> TableResult<Option<TableRef>> {
Ok(self.inner.get_table(table_ref))
fn get_table(&self, _ctx: &EngineContext, table_id: TableId) -> TableResult<Option<TableRef>> {
Ok(self.inner.get_table(table_id))
}
fn table_exists(&self, _ctx: &EngineContext, table_ref: &TableReference) -> bool {
self.inner.get_table(table_ref).is_some()
fn table_exists(&self, _ctx: &EngineContext, table_id: TableId) -> bool {
self.inner.get_table(table_id).is_some()
}
async fn drop_table(
@@ -151,8 +147,8 @@ impl TableEngineProcedure for ImmutableFileTableEngine {
#[cfg(test)]
impl ImmutableFileTableEngine {
pub async fn close_table(&self, table_ref: &TableReference<'_>) -> TableResult<()> {
self.inner.close_table(table_ref).await
pub async fn close_table(&self, table_id: TableId) -> TableResult<()> {
self.inner.close_table(table_id).await
}
}
@@ -173,10 +169,10 @@ impl ImmutableFileTableEngine {
}
struct EngineInner {
/// All tables opened by the engine. Map key is formatted [TableReference].
/// All tables opened by the engine.
///
/// Writing to `tables` should also hold the `table_mutex`.
tables: RwLock<HashMap<String, ImmutableFileTableRef>>,
tables: RwLock<HashMap<TableId, ImmutableFileTableRef>>,
object_store: ObjectStore,
/// Table mutex is used to protect the operations such as creating/opening/closing
@@ -199,6 +195,7 @@ impl EngineInner {
request: CreateTableRequest,
) -> Result<TableRef> {
let CreateTableRequest {
id: table_id,
catalog_name,
schema_name,
table_name,
@@ -212,7 +209,7 @@ impl EngineInner {
table: &table_name,
};
if let Some(table) = self.get_table(&table_ref) {
if let Some(table) = self.get_table(table_id) {
return if create_if_not_exists {
Ok(table)
} else {
@@ -223,14 +220,13 @@ impl EngineInner {
let table_schema =
Arc::new(Schema::try_from(request.schema).context(InvalidRawSchemaSnafu)?);
let table_id = request.id;
let table_dir = table_dir(&catalog_name, &schema_name, table_id);
let table_full_name = table_ref.to_string();
let _lock = self.table_mutex.lock().await;
// Checks again, read lock should be enough since we are guarded by the mutex.
if let Some(table) = self.get_table_by_full_name(&table_full_name) {
if let Some(table) = self.get_table(table_id) {
return if request.create_if_not_exists {
Ok(table)
} else {
@@ -279,27 +275,20 @@ impl EngineInner {
table_id
);
self.tables
.write()
.unwrap()
.insert(table_full_name, table.clone());
self.tables.write().unwrap().insert(table_id, table.clone());
Ok(table)
}
fn get_table_by_full_name(&self, full_name: &str) -> Option<TableRef> {
fn get_table(&self, table_id: TableId) -> Option<TableRef> {
self.tables
.read()
.unwrap()
.get(full_name)
.get(&table_id)
.cloned()
.map(|table| table as _)
}
fn get_table(&self, table_ref: &TableReference) -> Option<TableRef> {
self.get_table_by_full_name(&table_ref.to_string())
}
async fn open_table(
&self,
_ctx: &EngineContext,
@@ -309,6 +298,7 @@ impl EngineInner {
catalog_name,
schema_name,
table_name,
table_id,
..
} = request;
let table_ref = TableReference {
@@ -317,16 +307,15 @@ impl EngineInner {
table: &table_name,
};
let table_full_name = table_ref.to_string();
if let Some(table) = self.get_table_by_full_name(&table_full_name) {
if let Some(table) = self.get_table(table_id) {
return Ok(Some(table));
}
let table_full_name = table_ref.to_string();
let table = {
let _lock = self.table_mutex.lock().await;
// Checks again, read lock should be enough since we are guarded by the mutex.
if let Some(table) = self.get_table_by_full_name(&table_full_name) {
if let Some(table) = self.get_table(table_id) {
return Ok(Some(table));
}
@@ -350,10 +339,7 @@ impl EngineInner {
.context(table_error::TableOperationSnafu)?,
);
self.tables
.write()
.unwrap()
.insert(table_full_name, table.clone());
self.tables.write().unwrap().insert(table_id, table.clone());
Some(table as _)
};
@@ -375,7 +361,7 @@ impl EngineInner {
let table_full_name = table_ref.to_string();
let _lock = self.table_mutex.lock().await;
if let Some(table) = self.get_table_by_full_name(&table_full_name) {
if let Some(table) = self.get_table(req.table_id) {
let table_id = table.table_info().ident.table_id;
let table_dir = table_dir(&req.catalog_name, &req.schema_name, table_id);
@@ -389,7 +375,7 @@ impl EngineInner {
.context(DropTableSnafu {
table_name: &table_full_name,
})?;
self.tables.write().unwrap().remove(&table_full_name);
self.tables.write().unwrap().remove(&req.table_id);
Ok(true)
} else {
@@ -429,12 +415,10 @@ impl EngineInner {
#[cfg(test)]
impl EngineInner {
pub async fn close_table(&self, table_ref: &TableReference<'_>) -> TableResult<()> {
let full_name = table_ref.to_string();
pub async fn close_table(&self, table_id: TableId) -> TableResult<()> {
let _lock = self.table_mutex.lock().await;
if let Some(table) = self.get_table_by_full_name(&full_name) {
if let Some(table) = self.get_table(table_id) {
let regions = Vec::new();
table
.close(&regions)
@@ -443,7 +427,7 @@ impl EngineInner {
.context(table_error::TableOperationSnafu)?;
}
self.tables.write().unwrap().remove(&full_name);
self.tables.write().unwrap().remove(&table_id);
Ok(())
}

View File

@@ -92,14 +92,13 @@ impl CreateImmutableFileTable {
fn on_prepare(&mut self) -> Result<Status> {
let engine_ctx = EngineContext::default();
let table_ref = self.data.table_ref();
// Safety: Current get_table implementation always returns Ok.
if self.engine.table_exists(&engine_ctx, &table_ref) {
if self.engine.table_exists(&engine_ctx, self.data.request.id) {
// The table already exists.
ensure!(
self.data.request.create_if_not_exists,
TableExistsSnafu {
table_name: table_ref.to_string(),
table_name: self.data.table_ref().to_string(),
}
);
@@ -113,8 +112,7 @@ impl CreateImmutableFileTable {
async fn on_create_table(&mut self) -> Result<Status> {
let engine_ctx = EngineContext::default();
let table_ref = self.data.table_ref();
if self.engine.table_exists(&engine_ctx, &table_ref) {
if self.engine.table_exists(&engine_ctx, self.data.request.id) {
// Table already created. We don't need to check create_if_not_exists as
// we have checked it in prepare state.
return Ok(Status::Done);

View File

@@ -16,7 +16,7 @@ use std::assert_matches::assert_matches;
use std::sync::Arc;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, IMMUTABLE_FILE_ENGINE};
use table::engine::{EngineContext, TableEngine, TableEngineProcedure, TableReference};
use table::engine::{EngineContext, TableEngine, TableEngineProcedure};
use table::requests::{AlterKind, AlterTableRequest, DropTableRequest, OpenTableRequest};
use table::{error as table_error, Table};
@@ -35,14 +35,9 @@ async fn test_get_table() {
..
} = test_util::setup_test_engine_and_table("test_get_table").await;
let table_info = table.table_info();
let table_ref = TableReference {
catalog: &table_info.catalog_name,
schema: &table_info.schema_name,
table: &table_info.name,
};
let got = table_engine
.get_table(&EngineContext::default(), &table_ref)
.get_table(&EngineContext::default(), table_info.ident.table_id)
.unwrap()
.unwrap();
@@ -53,21 +48,17 @@ async fn test_get_table() {
async fn test_open_table() {
common_telemetry::init_default_ut_logging();
let ctx = EngineContext::default();
// the test table id is 1
let table_id = 1;
let open_req = OpenTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: test_util::TEST_TABLE_NAME.to_string(),
// the test table id is 1
table_id: 1,
table_id,
region_numbers: vec![0],
};
let table_ref = TableReference {
catalog: DEFAULT_CATALOG_NAME,
schema: DEFAULT_SCHEMA_NAME,
table: test_util::TEST_TABLE_NAME,
};
let TestEngineComponents {
table_engine,
table_ref: table,
@@ -77,7 +68,7 @@ async fn test_open_table() {
assert_eq!(IMMUTABLE_FILE_ENGINE, table_engine.name());
table_engine.close_table(&table_ref).await.unwrap();
table_engine.close_table(table_id).await.unwrap();
let reopened = table_engine
.open_table(&ctx, open_req.clone())
@@ -101,21 +92,17 @@ async fn test_open_table() {
async fn test_close_all_table() {
common_telemetry::init_default_ut_logging();
let table_ref = TableReference {
catalog: DEFAULT_CATALOG_NAME,
schema: DEFAULT_SCHEMA_NAME,
table: test_util::TEST_TABLE_NAME,
};
let TestEngineComponents {
table_engine,
dir: _dir,
table_ref: table,
..
} = test_util::setup_test_engine_and_table("test_close_all_table").await;
table_engine.close().await.unwrap();
let exist = table_engine.table_exists(&EngineContext::default(), &table_ref);
let table_id = table.table_info().ident.table_id;
let exist = table_engine.table_exists(&EngineContext::default(), table_id);
assert!(!exist);
}
@@ -126,6 +113,7 @@ async fn test_alter_table() {
let TestEngineComponents {
table_engine,
dir: _dir,
table_ref,
..
} = test_util::setup_test_engine_and_table("test_alter_table").await;
@@ -133,6 +121,7 @@ async fn test_alter_table() {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TEST_TABLE_NAME.to_string(),
table_id: table_ref.table_info().ident.table_id,
alter_kind: AlterKind::RenameTable {
new_table_name: "foo".to_string(),
},
@@ -151,12 +140,6 @@ async fn test_alter_table() {
async fn test_drop_table() {
common_telemetry::init_default_ut_logging();
let drop_req = DropTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TEST_TABLE_NAME.to_string(),
};
let TestEngineComponents {
table_engine,
object_store,
@@ -167,12 +150,13 @@ async fn test_drop_table() {
} = test_util::setup_test_engine_and_table("test_drop_table").await;
let table_info = table.table_info();
let table_ref = TableReference {
catalog: &table_info.catalog_name,
schema: &table_info.schema_name,
table: &table_info.name,
};
let drop_req = DropTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TEST_TABLE_NAME.to_string(),
table_id: table_info.ident.table_id,
};
let dropped = table_engine
.drop_table(&EngineContext::default(), drop_req)
.await
@@ -180,7 +164,7 @@ async fn test_drop_table() {
assert!(dropped);
let exist = table_engine.table_exists(&EngineContext::default(), &table_ref);
let exist = table_engine.table_exists(&EngineContext::default(), table_info.ident.table_id);
assert!(!exist);
// check table_dir manifest
@@ -203,13 +187,14 @@ async fn test_create_drop_table_procedure() {
let engine_ctx = EngineContext::default();
// Test create table by procedure.
let create_request = test_util::new_create_request(schema);
let table_id = create_request.id;
let mut procedure = table_engine
.create_table_procedure(&engine_ctx, create_request.clone())
.unwrap();
common_procedure_test::execute_procedure_until_done(&mut procedure).await;
assert!(table_engine
.get_table(&engine_ctx, &create_request.table_ref())
.get_table(&engine_ctx, table_id)
.unwrap()
.is_some());
@@ -218,6 +203,7 @@ async fn test_create_drop_table_procedure() {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TEST_TABLE_NAME.to_string(),
table_id,
};
let mut procedure = table_engine
.drop_table_procedure(&engine_ctx, drop_request)
@@ -225,7 +211,7 @@ async fn test_create_drop_table_procedure() {
common_procedure_test::execute_procedure_until_done(&mut procedure).await;
assert!(table_engine
.get_table(&engine_ctx, &create_request.table_ref())
.get_table(&engine_ctx, table_id)
.unwrap()
.is_none());
}

View File

@@ -14,6 +14,7 @@
use std::any::Any;
use common_datasource::file_format::Format;
use common_error::prelude::*;
use datafusion::arrow::error::ArrowError;
use datafusion::error::DataFusionError;
@@ -175,6 +176,9 @@ pub enum Error {
source: datatypes::error::Error,
location: Location,
},
#[snafu(display("Unsupported format: {:?}", format))]
UnsupportedFormat { format: Format, location: Location },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -191,7 +195,8 @@ impl ErrorExt for Error {
| BuildCsvConfig { .. }
| ProjectSchema { .. }
| MissingRequiredField { .. }
| ConvertSchema { .. } => StatusCode::InvalidArguments,
| ConvertSchema { .. }
| UnsupportedFormat { .. } => StatusCode::InvalidArguments,
BuildBackend { source, .. } => source.status_code(),
BuildStreamAdapter { source, .. } => source.status_code(),

View File

@@ -94,29 +94,8 @@ fn build_scan_plan<T: FileOpener + Send + 'static>(
projection: Option<&Vec<usize>>,
limit: Option<usize>,
) -> Result<PhysicalPlanRef> {
let stream = FileStream::new(
&FileScanConfig {
object_store_url: ObjectStoreUrl::parse("empty://").unwrap(), // won't be used
file_schema,
file_groups: vec![files
.iter()
.map(|filename| PartitionedFile::new(filename.to_string(), 0))
.collect::<Vec<_>>()],
statistics: Default::default(),
projection: projection.cloned(),
limit,
table_partition_cols: vec![],
output_ordering: None,
infinite_source: false,
},
0, // partition: hard-code
opener,
&ExecutionPlanMetricsSet::new(),
)
.context(error::BuildStreamSnafu)?;
let adapter = RecordBatchStreamAdapter::try_new(Box::pin(stream))
.context(error::BuildStreamAdapterSnafu)?;
Ok(Arc::new(StreamScanAdapter::new(Box::pin(adapter))))
let adapter = build_record_batch_stream(opener, file_schema, files, projection, limit)?;
Ok(Arc::new(StreamScanAdapter::new(adapter)))
}
fn build_record_batch_stream<T: FileOpener + Send + 'static>(
@@ -382,6 +361,7 @@ pub fn create_physical_plan(
Format::Csv(format) => new_csv_scan_plan(ctx, config, format),
Format::Json(format) => new_json_scan_plan(ctx, config, format),
Format::Parquet(format) => new_parquet_scan_plan(ctx, config, format),
_ => error::UnsupportedFormatSnafu { format: *format }.fail(),
}
}
@@ -394,5 +374,6 @@ pub fn create_stream(
Format::Csv(format) => new_csv_stream(ctx, config, format),
Format::Json(format) => new_json_stream(ctx, config, format),
Format::Parquet(format) => new_parquet_stream(ctx, config, format),
_ => error::UnsupportedFormatSnafu { format: *format }.fail(),
}
}

View File

@@ -14,6 +14,7 @@
use std::any::Any;
use common_datasource::file_format::Format;
use common_error::prelude::*;
use datafusion::parquet;
use datatypes::arrow::error::ArrowError;
@@ -443,6 +444,9 @@ pub enum Error {
source: common_datasource::error::Error,
},
#[snafu(display("Unsupported format: {:?}", format))]
UnsupportedFormat { location: Location, format: Format },
#[snafu(display("Failed to parse file format, source: {}", source))]
ParseFileFormat {
#[snafu(backtrace)]
@@ -500,6 +504,12 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to read orc schema, source: {}", source))]
ReadOrc {
source: common_datasource::error::Error,
location: Location,
},
#[snafu(display("Failed to build parquet record batch stream, source: {}", source))]
BuildParquetRecordBatchStream {
location: Location,
@@ -575,7 +585,8 @@ impl ErrorExt for Error {
| Error::InvalidSchema { .. }
| Error::PrepareImmutableTable { .. }
| Error::BuildCsvConfig { .. }
| Error::ProjectSchema { .. } => StatusCode::InvalidArguments,
| Error::ProjectSchema { .. }
| Error::UnsupportedFormat { .. } => StatusCode::InvalidArguments,
Error::NotSupported { .. } => StatusCode::Unsupported,
@@ -667,7 +678,9 @@ impl ErrorExt for Error {
Error::TableScanExec { source, .. } => source.status_code(),
Error::ReadObject { .. } | Error::ReadParquet { .. } => StatusCode::StorageUnavailable,
Error::ReadObject { .. } | Error::ReadParquet { .. } | Error::ReadOrc { .. } => {
StatusCode::StorageUnavailable
}
Error::ListObjects { source }
| Error::ParseUrl { source }

View File

@@ -543,8 +543,11 @@ impl DistInstance {
table_name: format_full_table_name(catalog_name, schema_name, table_name),
})?;
let request = common_grpc_expr::alter_expr_to_request(expr.clone())
.context(AlterExprToRequestSnafu)?;
let request = common_grpc_expr::alter_expr_to_request(
table.table_info().ident.table_id,
expr.clone(),
)
.context(AlterExprToRequestSnafu)?;
let mut context = AlterContext::with_capacity(1);

View File

@@ -20,6 +20,9 @@ use async_compat::CompatExt;
use common_base::readable_size::ReadableSize;
use common_datasource::file_format::csv::{CsvConfigBuilder, CsvOpener};
use common_datasource::file_format::json::JsonOpener;
use common_datasource::file_format::orc::{
infer_orc_schema, new_orc_stream_reader, OrcArrowStreamReaderAdapter,
};
use common_datasource::file_format::{FileFormat, Format};
use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::{build_backend, parse_url};
@@ -110,6 +113,18 @@ impl StatementExecutor {
.context(error::ReadParquetSnafu)?;
Ok(builder.schema().clone())
}
Format::Orc(_) => {
let reader = object_store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?;
let schema = infer_orc_schema(reader)
.await
.context(error::ReadOrcSnafu)?;
Ok(Arc::new(schema))
}
}
}
@@ -201,6 +216,18 @@ impl StatementExecutor {
Ok(Box::pin(ParquetRecordBatchStreamAdapter::new(upstream)))
}
Format::Orc(_) => {
let reader = object_store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?;
let stream = new_orc_stream_reader(reader)
.await
.context(error::ReadOrcSnafu)?;
let stream = OrcArrowStreamReaderAdapter::new(stream);
Ok(Box::pin(stream))
}
}
}

View File

@@ -68,6 +68,7 @@ impl StatementExecutor {
Ok(rows_copied)
}
_ => error::UnsupportedFormatSnafu { format: *format }.fail(),
}
}

View File

@@ -382,6 +382,7 @@ impl DistTable {
schema_name,
table_name,
alter_kind,
table_id: _table_id,
} = request;
let alter_expr = context

View File

@@ -42,8 +42,7 @@ use table::engine::{
};
use table::metadata::{TableId, TableInfo, TableVersion};
use table::requests::{
AlterKind, AlterTableRequest, CloseTableRequest, CreateTableRequest, DropTableRequest,
OpenTableRequest,
AlterTableRequest, CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest,
};
use table::{error as table_error, Result as TableResult, Table, TableRef};
@@ -102,9 +101,8 @@ impl<S: StorageEngine> TableEngine for MitoEngine<S> {
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
let table_ref = request.table_ref();
let _lock = self.inner.table_mutex.lock(table_ref.to_string()).await;
if let Some(table) = self.inner.get_mito_table(&table_ref) {
let _lock = self.inner.table_mutex.lock(request.id).await;
if let Some(table) = self.inner.get_mito_table(request.id) {
if request.create_if_not_exists {
return Ok(table);
} else {
@@ -148,26 +146,10 @@ impl<S: StorageEngine> TableEngine for MitoEngine<S> {
) -> TableResult<TableRef> {
let _timer = common_telemetry::timer!(metrics::MITO_ALTER_TABLE_ELAPSED);
if let AlterKind::RenameTable { new_table_name } = &req.alter_kind {
let mut table_ref = req.table_ref();
table_ref.table = new_table_name;
if self.inner.get_mito_table(&table_ref).is_some() {
return TableExistsSnafu {
table_name: table_ref.to_string(),
}
.fail()
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
}
}
let mut procedure = AlterMitoTable::new(req, self.inner.clone())
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
// TODO(yingwen): Rename has concurrent issue without the procedure runtime. But
// users can't use this method to alter a table so it is still safe. We should
// refactor the table engine to avoid using table name as key.
procedure
.engine_alter_table()
.await
@@ -175,16 +157,12 @@ impl<S: StorageEngine> TableEngine for MitoEngine<S> {
.context(table_error::TableOperationSnafu)
}
fn get_table(
&self,
_ctx: &EngineContext,
table_ref: &TableReference,
) -> TableResult<Option<TableRef>> {
Ok(self.inner.get_table(table_ref))
fn get_table(&self, _ctx: &EngineContext, table_id: TableId) -> TableResult<Option<TableRef>> {
Ok(self.inner.get_table(table_id))
}
fn table_exists(&self, _ctx: &EngineContext, table_ref: &TableReference) -> bool {
self.inner.get_table(table_ref).is_some()
fn table_exists(&self, _ctx: &EngineContext, table_id: TableId) -> bool {
self.inner.get_table(table_id).is_some()
}
async fn drop_table(
@@ -254,16 +232,16 @@ impl<S: StorageEngine> TableEngineProcedure for MitoEngine<S> {
}
pub(crate) struct MitoEngineInner<S: StorageEngine> {
/// All tables opened by the engine. Map key is formatted [TableReference].
/// All tables opened by the engine.
///
/// Writing to `tables` should also hold the `table_mutex`.
tables: DashMap<String, Arc<MitoTable<S::Region>>>,
tables: DashMap<TableId, Arc<MitoTable<S::Region>>>,
object_store: ObjectStore,
compress_type: CompressionType,
storage_engine: S,
/// Table mutex is used to protect the operations such as creating/opening/closing
/// a table, to avoid things like opening the same table simultaneously.
table_mutex: Arc<KeyLock<String>>,
table_mutex: Arc<KeyLock<TableId>>,
}
fn build_row_key_desc(
@@ -429,11 +407,6 @@ impl<S: StorageEngine> MitoEngineInner<S> {
let catalog_name = &request.catalog_name;
let schema_name = &request.schema_name;
let table_name = &request.table_name;
let table_ref = TableReference {
catalog: catalog_name,
schema: schema_name,
table: table_name,
};
let table_id = request.table_id;
let engine_ctx = StorageEngineContext::default();
@@ -463,6 +436,11 @@ impl<S: StorageEngine> MitoEngineInner<S> {
let mut regions = HashMap::with_capacity(table_info.meta.region_numbers.len());
let table_ref = TableReference {
catalog: catalog_name,
schema: schema_name,
table: table_name,
};
for region_number in &request.region_numbers {
let region = self
.open_region(&engine_ctx, table_id, *region_number, &table_ref, &opts)
@@ -556,16 +534,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
ctx: &EngineContext,
request: OpenTableRequest,
) -> TableResult<Option<TableRef>> {
let catalog_name = &request.catalog_name;
let schema_name = &request.schema_name;
let table_name = &request.table_name;
let table_ref = TableReference {
catalog: catalog_name,
schema: schema_name,
table: table_name,
};
if let Some(table) = self.get_table(&table_ref) {
if let Some(table) = self.get_table(request.table_id) {
if let Some(table) = self.check_regions(table, &request.region_numbers)? {
return Ok(Some(table));
}
@@ -573,11 +542,10 @@ impl<S: StorageEngine> MitoEngineInner<S> {
// Acquires the mutex before opening a new table.
let table = {
let table_name_key = table_ref.to_string();
let _lock = self.table_mutex.lock(table_name_key.clone()).await;
let _lock = self.table_mutex.lock(request.table_id).await;
// Checks again, read lock should be enough since we are guarded by the mutex.
if let Some(table) = self.get_mito_table(&table_ref) {
if let Some(table) = self.get_mito_table(request.table_id) {
// Contains all regions or target region
if let Some(table) = self.check_regions(table.clone(), &request.region_numbers)? {
Some(table)
@@ -593,7 +561,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
let table = self.recover_table(ctx, request.clone()).await?;
if let Some(table) = table {
// already locked
self.tables.insert(table_ref.to_string(), table.clone());
self.tables.insert(request.table_id, table.clone());
Some(table as _)
} else {
@@ -604,8 +572,8 @@ impl<S: StorageEngine> MitoEngineInner<S> {
logging::info!(
"Mito engine opened table: {} in schema: {}",
table_name,
schema_name
request.table_name,
request.schema_name
);
Ok(table)
@@ -613,10 +581,8 @@ impl<S: StorageEngine> MitoEngineInner<S> {
async fn drop_table(&self, request: DropTableRequest) -> TableResult<bool> {
// Remove the table from the engine to avoid further access from users.
let table_ref = request.table_ref();
let _lock = self.table_mutex.lock(table_ref.to_string()).await;
let removed_table = self.tables.remove(&table_ref.to_string());
let _lock = self.table_mutex.lock(request.table_id).await;
let removed_table = self.tables.remove(&request.table_id);
// Close the table to close all regions. Closing a region is idempotent.
if let Some((_, table)) = &removed_table {
let regions = table.region_ids();
@@ -663,17 +629,13 @@ impl<S: StorageEngine> MitoEngineInner<S> {
Ok(Some((manifest, table_info)))
}
fn get_table(&self, table_ref: &TableReference) -> Option<TableRef> {
self.tables
.get(&table_ref.to_string())
.map(|en| en.value().clone() as _)
fn get_table(&self, table_id: TableId) -> Option<TableRef> {
self.tables.get(&table_id).map(|en| en.value().clone() as _)
}
/// Returns the [MitoTable].
fn get_mito_table(&self, table_ref: &TableReference) -> Option<Arc<MitoTable<S::Region>>> {
self.tables
.get(&table_ref.to_string())
.map(|en| en.value().clone())
fn get_mito_table(&self, table_id: TableId) -> Option<Arc<MitoTable<S::Region>>> {
self.tables.get(&table_id).map(|en| en.value().clone())
}
async fn close(&self) -> TableResult<()> {
@@ -696,8 +658,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
}
async fn close_table(&self, request: CloseTableRequest) -> TableResult<CloseTableResult> {
let table_ref = request.table_ref();
if let Some(table) = self.get_mito_table(&table_ref) {
if let Some(table) = self.get_mito_table(request.table_id) {
return self
.close_table_inner(table, Some(&request.region_numbers), request.flush)
.await;
@@ -713,13 +674,8 @@ impl<S: StorageEngine> MitoEngineInner<S> {
flush: bool,
) -> TableResult<CloseTableResult> {
let info = table.table_info();
let table_ref = TableReference {
catalog: &info.catalog_name,
schema: &info.schema_name,
table: &info.name,
};
let table_id = info.ident.table_id;
let _lock = self.table_mutex.lock(table_ref.to_string()).await;
let _lock = self.table_mutex.lock(table_id).await;
let all_regions = table.region_ids();
let regions = regions.unwrap_or(&all_regions);
@@ -738,12 +694,12 @@ impl<S: StorageEngine> MitoEngineInner<S> {
}
if table.is_releasable() {
self.tables.remove(&table_ref.to_string());
self.tables.remove(&table_id);
logging::info!(
"Mito engine closed table: {} in schema: {}",
table_ref.table,
table_ref.schema,
info.name,
info.schema_name,
);
return Ok(CloseTableResult::Released(removed_regions));
}

View File

@@ -29,9 +29,7 @@ use table::requests::{AlterKind, AlterTableRequest};
use table::{Table, TableRef};
use crate::engine::MitoEngineInner;
use crate::error::{
TableExistsSnafu, TableNotFoundSnafu, UpdateTableManifestSnafu, VersionChangedSnafu,
};
use crate::error::{TableNotFoundSnafu, UpdateTableManifestSnafu, VersionChangedSnafu};
use crate::manifest::action::{TableChange, TableMetaAction, TableMetaActionList};
use crate::metrics;
use crate::table::MitoTable;
@@ -39,7 +37,6 @@ use crate::table::MitoTable;
/// Procedure to alter a [MitoTable].
pub(crate) struct AlterMitoTable<S: StorageEngine> {
data: AlterTableData,
engine_inner: Arc<MitoEngineInner<S>>,
table: Arc<MitoTable<S::Region>>,
/// The table info after alteration.
new_info: Option<TableInfo>,
@@ -107,18 +104,16 @@ impl<S: StorageEngine> AlterMitoTable<S> {
table_version: 0,
};
let table_ref = data.table_ref();
let table =
engine_inner
.get_mito_table(&table_ref)
.with_context(|| TableNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
let table = engine_inner
.get_mito_table(data.request.table_id)
.with_context(|| TableNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
let info = table.table_info();
data.table_version = info.ident.version;
Ok(AlterMitoTable {
data,
engine_inner,
table,
new_info: None,
alter_op: None,
@@ -148,16 +143,14 @@ impl<S: StorageEngine> AlterMitoTable<S> {
fn from_json(json: &str, engine_inner: Arc<MitoEngineInner<S>>) -> Result<Self> {
let data: AlterTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
let table_ref = data.table_ref();
let table =
engine_inner
.get_mito_table(&table_ref)
.with_context(|| TableNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
let table = engine_inner
.get_mito_table(data.request.table_id)
.with_context(|| TableNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
Ok(AlterMitoTable {
data,
engine_inner,
table,
new_info: None,
alter_op: None,
@@ -176,17 +169,8 @@ impl<S: StorageEngine> AlterMitoTable<S> {
}
);
if let AlterKind::RenameTable { new_table_name } = &self.data.request.alter_kind {
let mut table_ref = self.data.table_ref();
table_ref.table = new_table_name;
ensure!(
self.engine_inner.get_mito_table(&table_ref).is_none(),
TableExistsSnafu {
table_name: table_ref.to_string(),
}
);
}
// We don't check the table name in the table engine as it is the catalog
// manager's duty to ensure the table name is unused.
self.data.state = AlterTableState::EngineAlterTable;
Ok(Status::executing(true))
@@ -252,22 +236,6 @@ impl<S: StorageEngine> AlterMitoTable<S> {
// Update in memory metadata of the table.
self.table.set_table_info(new_info.clone());
// Rename key in tables map.
if let AlterKind::RenameTable { new_table_name } = &self.data.request.alter_kind {
let mut table_ref = self.data.table_ref();
let removed = {
let _lock = self.engine_inner.table_mutex.lock(table_ref.to_string());
self.engine_inner.tables.remove(&table_ref.to_string())
};
ensure!(removed.is_some(), TableNotFoundSnafu { table_name });
table_ref.table = new_table_name.as_str();
let _lock = self.engine_inner.table_mutex.lock(table_ref.to_string());
self.engine_inner
.tables
.insert(table_ref.to_string(), self.table.clone());
}
Ok(self.table.clone())
}
@@ -363,19 +331,15 @@ mod tests {
procedure_test_util::execute_procedure_until_done(&mut procedure).await;
// Get metadata of the created table.
let table_ref = TableReference {
catalog: &request.catalog_name,
schema: &request.schema_name,
table: &request.table_name,
};
let table = table_engine
.get_table(&engine_ctx, &table_ref)
.get_table(&engine_ctx, request.id)
.unwrap()
.unwrap();
let old_info = table.table_info();
let old_meta = &old_info.meta;
// Alter the table.
let table_id = request.id;
let request = new_add_columns_req();
let mut procedure = table_engine
.alter_table_procedure(&engine_ctx, request.clone())
@@ -384,7 +348,7 @@ mod tests {
// Validate.
let table = table_engine
.get_table(&engine_ctx, &table_ref)
.get_table(&engine_ctx, table_id)
.unwrap()
.unwrap();
let new_info = table.table_info();
@@ -405,7 +369,7 @@ mod tests {
ConcreteDataType::string_datatype(),
true,
);
let request = new_add_columns_req_with_location(&new_tag, &new_field);
let request = new_add_columns_req_with_location(table_id, &new_tag, &new_field);
let mut procedure = table_engine
.alter_table_procedure(&engine_ctx, request.clone())
.unwrap();
@@ -413,7 +377,7 @@ mod tests {
// Validate.
let table = table_engine
.get_table(&engine_ctx, &table_ref)
.get_table(&engine_ctx, table_id)
.unwrap()
.unwrap();
let new_info = table.table_info();
@@ -456,6 +420,7 @@ mod tests {
procedure_test_util::execute_procedure_until_done(&mut procedure).await;
// Add columns.
let table_id = request.id;
let request = new_add_columns_req();
let mut procedure = table_engine
.alter_table_procedure(&engine_ctx, request.clone())
@@ -463,13 +428,8 @@ mod tests {
procedure_test_util::execute_procedure_until_done(&mut procedure).await;
// Get metadata.
let table_ref = TableReference {
catalog: &request.catalog_name,
schema: &request.schema_name,
table: &request.table_name,
};
let table = table_engine
.get_table(&engine_ctx, &table_ref)
.get_table(&engine_ctx, table_id)
.unwrap()
.unwrap();
let old_info = table.table_info();
@@ -521,13 +481,8 @@ mod tests {
procedure_test_util::execute_procedure_until_done(&mut procedure).await;
// Get metadata of the created table.
let mut table_ref = TableReference {
catalog: &create_request.catalog_name,
schema: &create_request.schema_name,
table: &create_request.table_name,
};
let table = table_engine
.get_table(&engine_ctx, &table_ref)
.get_table(&engine_ctx, create_request.id)
.unwrap()
.unwrap();
@@ -546,12 +501,7 @@ mod tests {
let info = table.table_info();
assert_eq!(new_name, info.name);
assert!(table_engine
.get_table(&engine_ctx, &table_ref)
.unwrap()
.is_none());
table_ref.table = &new_name;
assert!(table_engine
.get_table(&engine_ctx, &table_ref)
.get_table(&engine_ctx, create_request.id)
.unwrap()
.is_some());
}

View File

@@ -131,7 +131,12 @@ impl<S: StorageEngine> CreateMitoTable<S> {
let table_ref = self.creator.data.table_ref();
logging::debug!("on prepare create table {}", table_ref);
if self.creator.engine_inner.get_table(&table_ref).is_some() {
if self
.creator
.engine_inner
.get_table(self.creator.data.request.id)
.is_some()
{
// If the table already exists.
ensure!(
self.creator.data.request.create_if_not_exists,
@@ -152,14 +157,14 @@ impl<S: StorageEngine> CreateMitoTable<S> {
async fn on_engine_create_table(&mut self) -> Result<Status> {
// In this state, we can ensure we are able to create a new table.
let table_ref = self.creator.data.table_ref();
logging::debug!("on engine create table {}", table_ref);
let table_id = self.creator.data.request.id;
logging::debug!(
"on engine create table {}, table_id: {}",
table_ref,
table_id
);
let _lock = self
.creator
.engine_inner
.table_mutex
.lock(table_ref.to_string())
.await;
let _lock = self.creator.engine_inner.table_mutex.lock(table_id).await;
self.creator.create_table().await?;
Ok(Status::Done)
@@ -209,14 +214,13 @@ impl<S: StorageEngine> TableCreator<S> {
self.data.request.id,
);
let table_ref = self.data.table_ref();
// It is possible that the procedure retries `CREATE TABLE` many times, so we
// return the table if it exists.
if let Some(table) = self.engine_inner.get_table(&table_ref) {
if let Some(table) = self.engine_inner.get_table(self.data.request.id) {
return Ok(table.clone());
}
logging::debug!("Creator create table {}", table_ref);
logging::debug!("Creator create table {}", self.data.table_ref());
self.create_regions(&table_dir).await?;
@@ -313,7 +317,6 @@ impl<S: StorageEngine> TableCreator<S> {
/// Writes metadata to the table manifest.
async fn write_table_manifest(&mut self, table_dir: &str) -> Result<TableRef> {
// Try to open the table first, as the table manifest might already exist.
let table_ref = self.data.table_ref();
if let Some((manifest, table_info)) = self
.engine_inner
.recover_table_manifest_and_info(&self.data.request.table_name, table_dir)
@@ -323,7 +326,7 @@ impl<S: StorageEngine> TableCreator<S> {
self.engine_inner
.tables
.insert(table_ref.to_string(), table.clone());
.insert(self.data.request.id, table.clone());
return Ok(table);
}
@@ -333,7 +336,7 @@ impl<S: StorageEngine> TableCreator<S> {
self.engine_inner
.tables
.insert(table_ref.to_string(), table.clone());
.insert(self.data.request.id, table.clone());
Ok(table)
}
@@ -432,13 +435,8 @@ mod tests {
.unwrap();
procedure_test_util::execute_procedure_until_done(&mut procedure).await;
let table_ref = TableReference {
catalog: &request.catalog_name,
schema: &request.schema_name,
table: &request.table_name,
};
assert!(table_engine
.get_table(&EngineContext::default(), &table_ref)
.get_table(&EngineContext::default(), request.id)
.unwrap()
.is_some());
}

View File

@@ -84,8 +84,7 @@ impl<S: StorageEngine> DropMitoTable<S> {
state: DropTableState::Prepare,
request,
};
let table_ref = data.table_ref();
let table = engine_inner.get_mito_table(&table_ref);
let table = engine_inner.get_mito_table(data.request.table_id);
Ok(DropMitoTable {
data,
@@ -115,8 +114,7 @@ impl<S: StorageEngine> DropMitoTable<S> {
/// Recover the procedure from json.
fn from_json(json: &str, engine_inner: Arc<MitoEngineInner<S>>) -> Result<Self> {
let data: DropTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
let table_ref = data.table_ref();
let table = engine_inner.get_mito_table(&table_ref);
let table = engine_inner.get_mito_table(data.request.table_id);
Ok(DropMitoTable {
data,
@@ -182,6 +180,7 @@ mod tests {
procedure_test_util::execute_procedure_until_done(&mut procedure).await;
// Drop the table.
let table_id = request.id;
let request = test_util::new_drop_request();
let mut procedure = table_engine
.drop_table_procedure(&engine_ctx, request.clone())
@@ -189,13 +188,8 @@ mod tests {
procedure_test_util::execute_procedure_until_done(&mut procedure).await;
// The table is dropped.
let table_ref = TableReference {
catalog: &request.catalog_name,
schema: &request.schema_name,
table: &request.table_name,
};
assert!(table_engine
.get_table(&engine_ctx, &table_ref)
.get_table(&engine_ctx, table_id)
.unwrap()
.is_none());
}

View File

@@ -40,7 +40,7 @@ use table::Table;
use super::*;
use crate::table::test_util::{
self, new_insert_request, schema_for_test, setup_table, TestEngineComponents, TABLE_NAME,
self, new_insert_request, setup_table, TestEngineComponents, TABLE_NAME,
};
pub fn has_parquet_file(sst_dir: &str) -> bool {
@@ -537,11 +537,16 @@ fn test_region_id() {
assert_eq!(18446744069414584330, region_id(u32::MAX, 10));
}
fn new_add_columns_req(new_tag: &ColumnSchema, new_field: &ColumnSchema) -> AlterTableRequest {
fn new_add_columns_req(
table_id: TableId,
new_tag: &ColumnSchema,
new_field: &ColumnSchema,
) -> AlterTableRequest {
AlterTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TABLE_NAME.to_string(),
table_id,
alter_kind: AlterKind::AddColumns {
columns: vec![
AddColumnRequest {
@@ -560,6 +565,7 @@ fn new_add_columns_req(new_tag: &ColumnSchema, new_field: &ColumnSchema) -> Alte
}
pub(crate) fn new_add_columns_req_with_location(
table_id: TableId,
new_tag: &ColumnSchema,
new_field: &ColumnSchema,
) -> AlterTableRequest {
@@ -567,6 +573,7 @@ pub(crate) fn new_add_columns_req_with_location(
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TABLE_NAME.to_string(),
table_id,
alter_kind: AlterKind::AddColumns {
columns: vec![
AddColumnRequest {
@@ -597,7 +604,7 @@ async fn test_alter_table_add_column() {
let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
let req = new_add_columns_req(&new_tag, &new_field);
let req = new_add_columns_req(table.table_info().ident.table_id, &new_tag, &new_field);
let table = table_engine
.alter_table(&EngineContext::default(), req)
.await
@@ -633,7 +640,7 @@ async fn test_alter_table_add_column() {
ConcreteDataType::string_datatype(),
true,
);
let req = new_add_columns_req_with_location(&new_tag, &new_field);
let req = new_add_columns_req_with_location(new_info.ident.table_id, &new_tag, &new_field);
let table = table_engine
.alter_table(&EngineContext::default(), req)
.await
@@ -653,13 +660,13 @@ async fn test_alter_table_add_column() {
#[tokio::test]
async fn test_alter_table_remove_column() {
let (_engine, table_engine, _table, _object_store, _dir) =
let (_engine, table_engine, table, _object_store, _dir) =
test_util::setup_mock_engine_and_table().await;
// Add two columns to the table first.
let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
let req = new_add_columns_req(&new_tag, &new_field);
let req = new_add_columns_req(table.table_info().ident.table_id, &new_tag, &new_field);
let table = table_engine
.alter_table(&EngineContext::default(), req)
.await
@@ -674,6 +681,7 @@ async fn test_alter_table_remove_column() {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TABLE_NAME.to_string(),
table_id: table.table_info().ident.table_id,
alter_kind: AlterKind::DropColumns {
names: vec![String::from("memory"), String::from("my_field")],
},
@@ -706,45 +714,13 @@ async fn test_alter_rename_table() {
let TestEngineComponents {
table_engine,
storage_engine,
table_ref,
object_store,
dir: _dir,
..
} = test_util::setup_test_engine_and_table().await;
let ctx = EngineContext::default();
// register another table
let another_name = "another_table";
let req = CreateTableRequest {
id: 1024,
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: another_name.to_string(),
desc: Some("another test table".to_string()),
schema: RawSchema::from(&schema_for_test()),
region_numbers: vec![0],
primary_key_indices: vec![0],
create_if_not_exists: true,
table_options: TableOptions::default(),
engine: MITO_ENGINE.to_string(),
};
table_engine
.create_table(&ctx, req)
.await
.expect("create table must succeed");
// test renaming a table with an existing name.
let req = AlterTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TABLE_NAME.to_string(),
alter_kind: AlterKind::RenameTable {
new_table_name: another_name.to_string(),
},
};
let err = table_engine.alter_table(&ctx, req).await.err().unwrap();
assert!(
err.to_string().contains("Table already exists"),
"Unexpected error: {err}"
);
let table_id = table_ref.table_info().ident.table_id;
let new_table_name = "test_table";
// test rename table
@@ -752,6 +728,7 @@ async fn test_alter_rename_table() {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TABLE_NAME.to_string(),
table_id,
alter_kind: AlterKind::RenameTable {
new_table_name: new_table_name.to_string(),
},
@@ -765,7 +742,7 @@ async fn test_alter_rename_table() {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: new_table_name.to_string(),
table_id: 1,
table_id,
region_numbers: vec![0],
};
@@ -794,17 +771,13 @@ async fn test_drop_table() {
let engine_ctx = EngineContext {};
let table_info = table.table_info();
let table_reference = TableReference {
catalog: DEFAULT_CATALOG_NAME,
schema: DEFAULT_SCHEMA_NAME,
table: &table_info.name,
};
let table_id = 1;
let create_table_request = CreateTableRequest {
id: 1,
id: table_id,
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_info.name.to_string(),
table_name: table_info.name.clone(),
schema: RawSchema::from(&*table_info.meta.schema),
create_if_not_exists: true,
desc: None,
@@ -819,23 +792,25 @@ async fn test_drop_table() {
.await
.unwrap();
assert_eq!(table_info, created_table.table_info());
assert!(table_engine.table_exists(&engine_ctx, &table_reference));
assert!(table_engine.table_exists(&engine_ctx, table_id));
let drop_table_request = DropTableRequest {
catalog_name: table_reference.catalog.to_string(),
schema_name: table_reference.schema.to_string(),
table_name: table_reference.table.to_string(),
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_info.name.clone(),
table_id,
};
let table_dropped = table_engine
.drop_table(&engine_ctx, drop_table_request)
.await
.unwrap();
assert!(table_dropped);
assert!(!table_engine.table_exists(&engine_ctx, &table_reference));
assert!(!table_engine.table_exists(&engine_ctx, table_id));
// should be able to re-create
let table_id = 2;
let request = CreateTableRequest {
id: 2,
id: table_id,
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_info.name.to_string(),
@@ -848,7 +823,7 @@ async fn test_drop_table() {
engine: MITO_ENGINE.to_string(),
};
table_engine.create_table(&ctx, request).await.unwrap();
assert!(table_engine.table_exists(&engine_ctx, &table_reference));
assert!(table_engine.table_exists(&engine_ctx, table_id));
}
#[tokio::test]

View File

@@ -28,7 +28,7 @@ use storage::compaction::noop::NoopCompactionScheduler;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use table::engine::{EngineContext, TableEngine};
use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder, TableType};
use table::metadata::{TableId, TableInfo, TableInfoBuilder, TableMetaBuilder, TableType};
use table::requests::{
AlterKind, AlterTableRequest, CreateTableRequest, DropTableRequest, InsertRequest, TableOptions,
};
@@ -39,6 +39,7 @@ use crate::engine::{MitoEngine, MITO_ENGINE};
pub use crate::table::test_util::mock_engine::{MockEngine, MockRegion};
pub const TABLE_NAME: &str = "demo";
pub const TABLE_ID: TableId = 1;
/// Create a InsertRequest with default catalog and schema.
pub fn new_insert_request(
@@ -107,7 +108,7 @@ pub async fn new_test_object_store(prefix: &str) -> (TempDir, ObjectStore) {
pub fn new_create_request(schema: SchemaRef) -> CreateTableRequest {
CreateTableRequest {
id: 1,
id: TABLE_ID,
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TABLE_NAME.to_string(),
@@ -126,6 +127,7 @@ pub fn new_alter_request(alter_kind: AlterKind) -> AlterTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TABLE_NAME.to_string(),
table_id: TABLE_ID,
alter_kind,
}
}
@@ -135,6 +137,7 @@ pub fn new_drop_request() -> DropTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TABLE_NAME.to_string(),
table_id: TABLE_ID,
}
}

View File

@@ -332,6 +332,7 @@ fn parse_immutable_file_table_format(
Format::Csv(format) => Box::new(format),
Format::Json(format) => Box::new(format),
Format::Parquet(format) => Box::new(format),
Format::Orc(format) => Box::new(format),
},
)
}

View File

@@ -312,7 +312,7 @@ where
#[cfg(test)]
mod tests {
use std::sync::atomic::AtomicI32;
use std::sync::atomic::{AtomicBool, AtomicI32};
use std::time::Duration;
use store_api::storage::RegionId;
@@ -564,7 +564,9 @@ mod tests {
let task_scheduled = Arc::new(AtomicI32::new(0));
let task_scheduled_cloned = task_scheduled.clone();
common_runtime::spawn_write(async move {
let scheduling = Arc::new(AtomicBool::new(true));
let scheduling_clone = scheduling.clone();
let handle = common_runtime::spawn_write(async move {
for i in 0..10000 {
if let Ok(res) = scheduler_cloned.schedule(MockRequest {
region_id: i as RegionId,
@@ -573,12 +575,19 @@ mod tests {
task_scheduled_cloned.fetch_add(1, Ordering::Relaxed);
}
}
if !scheduling_clone.load(Ordering::Relaxed) {
break;
}
}
});
tokio::time::sleep(Duration::from_millis(1)).await;
scheduler.stop(true).await.unwrap();
scheduling.store(false, Ordering::Relaxed);
let finished = finished.load(Ordering::Relaxed);
handle.await.unwrap();
assert_eq!(finished, task_scheduled.load(Ordering::Relaxed));
}
}

View File

@@ -315,13 +315,14 @@ mod tests {
async fn test_alter_table_procedure_rename() {
let env = TestEnv::new("rename");
let table_name = "test_old";
env.create_table(table_name).await;
let table_id = env.create_table(table_name).await;
let new_table_name = "test_new";
let request = AlterTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
table_id,
alter_kind: AlterKind::RenameTable {
new_table_name: new_table_name.to_string(),
},

View File

@@ -349,14 +349,9 @@ mod tests {
table_engine.clone(),
);
let table_ref = TableReference {
catalog: &request.catalog_name,
schema: &request.schema_name,
table: &request.table_name,
};
let engine_ctx = EngineContext::default();
assert!(table_engine
.get_table(&engine_ctx, &table_ref)
.get_table(&engine_ctx, request.id)
.unwrap()
.is_none());
@@ -365,7 +360,7 @@ mod tests {
watcher.changed().await.unwrap();
assert!(table_engine
.get_table(&engine_ctx, &table_ref)
.get_table(&engine_ctx, request.id)
.unwrap()
.is_some());
}
@@ -390,14 +385,9 @@ mod tests {
table_engine.clone(),
);
let table_ref = TableReference {
catalog: &request.catalog_name,
schema: &request.schema_name,
table: &request.table_name,
};
let engine_ctx = EngineContext::default();
assert!(table_engine
.get_table(&engine_ctx, &table_ref)
.get_table(&engine_ctx, request.id)
.unwrap()
.is_none());
@@ -452,7 +442,7 @@ mod tests {
// The table is created.
assert!(table_engine
.get_table(&engine_ctx, &table_ref)
.get_table(&engine_ctx, request.id)
.unwrap()
.is_some());
}

View File

@@ -277,12 +277,13 @@ mod tests {
async fn test_drop_table_procedure() {
let env = TestEnv::new("drop");
let table_name = "test_drop";
env.create_table(table_name).await;
let table_id = env.create_table(table_name).await;
let request = DropTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
table_id,
};
let TestEnv {
dir: _dir,
@@ -305,13 +306,6 @@ mod tests {
let schema = catalog.schema(DEFAULT_SCHEMA_NAME).await.unwrap().unwrap();
assert!(schema.table(table_name).await.unwrap().is_none());
let ctx = EngineContext::default();
assert!(!table_engine.table_exists(
&ctx,
&TableReference {
catalog: DEFAULT_CATALOG_NAME,
schema: DEFAULT_SCHEMA_NAME,
table: table_name,
}
));
assert!(!table_engine.table_exists(&ctx, table_id,));
}
}

View File

@@ -32,6 +32,7 @@ use object_store::ObjectStore;
use storage::compaction::noop::NoopCompactionScheduler;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use table::metadata::TableId;
use table::requests::CreateTableRequest;
use crate::CreateTableProcedure;
@@ -92,8 +93,9 @@ impl TestEnv {
}
}
pub async fn create_table(&self, table_name: &str) {
pub async fn create_table(&self, table_name: &str) -> TableId {
let request = new_create_request(table_name);
let table_id = request.id;
let procedure = CreateTableProcedure::new(
request,
self.catalog_manager.clone(),
@@ -108,6 +110,8 @@ impl TestEnv {
.await
.unwrap();
watcher.changed().await.unwrap();
table_id
}
}

View File

@@ -108,14 +108,10 @@ pub trait TableEngine: Send + Sync {
) -> Result<TableRef>;
/// Returns the table by it's name.
fn get_table(
&self,
ctx: &EngineContext,
table_ref: &TableReference,
) -> Result<Option<TableRef>>;
fn get_table(&self, ctx: &EngineContext, table_id: TableId) -> Result<Option<TableRef>>;
/// Returns true when the given table is exists.
fn table_exists(&self, ctx: &EngineContext, table_ref: &TableReference) -> bool;
fn table_exists(&self, ctx: &EngineContext, table_id: TableId) -> bool;
/// Drops the given table. Return true if the table is dropped, or false if the table doesn't exist.
async fn drop_table(&self, ctx: &EngineContext, request: DropTableRequest) -> Result<bool>;

View File

@@ -162,6 +162,7 @@ pub struct AlterTableRequest {
pub catalog_name: String,
pub schema_name: String,
pub table_name: String,
pub table_id: TableId,
pub alter_kind: AlterKind,
}
@@ -200,6 +201,7 @@ pub struct DropTableRequest {
pub catalog_name: String,
pub schema_name: String,
pub table_name: String,
pub table_id: TableId,
}
impl DropTableRequest {
@@ -218,6 +220,7 @@ pub struct CloseTableRequest {
pub catalog_name: String,
pub schema_name: String,
pub table_name: String,
pub table_id: TableId,
/// Do nothing if region_numbers is empty
pub region_numbers: Vec<RegionNumber>,
/// flush regions

View File

@@ -14,15 +14,20 @@
use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use common_query::error as query_error;
use common_query::error::Result as QueryResult;
use common_query::physical_plan::{Partitioning, PhysicalPlan, PhysicalPlanRef};
use common_recordbatch::SendableRecordBatchStream;
use common_recordbatch::error::Result as RecordBatchResult;
use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion_physical_expr::PhysicalSortExpr;
use datatypes::schema::SchemaRef;
use futures::{Stream, StreamExt};
use snafu::OptionExt;
/// Adapt greptime's [SendableRecordBatchStream] to DataFusion's [PhysicalPlan].
@@ -30,6 +35,7 @@ pub struct StreamScanAdapter {
stream: Mutex<Option<SendableRecordBatchStream>>,
schema: SchemaRef,
output_ordering: Option<Vec<PhysicalSortExpr>>,
metric: ExecutionPlanMetricsSet,
}
impl Debug for StreamScanAdapter {
@@ -49,6 +55,7 @@ impl StreamScanAdapter {
stream: Mutex::new(Some(stream)),
schema,
output_ordering: None,
metric: ExecutionPlanMetricsSet::new(),
}
}
@@ -85,11 +92,46 @@ impl PhysicalPlan for StreamScanAdapter {
fn execute(
&self,
_partition: usize,
partition: usize,
_context: Arc<TaskContext>,
) -> QueryResult<SendableRecordBatchStream> {
let mut stream = self.stream.lock().unwrap();
stream.take().context(query_error::ExecuteRepeatedlySnafu)
let stream = stream.take().context(query_error::ExecuteRepeatedlySnafu)?;
let baseline_metric = BaselineMetrics::new(&self.metric, partition);
Ok(Box::pin(StreamWithMetricWrapper {
stream,
metric: baseline_metric,
}))
}
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metric.clone_inner())
}
}
pub struct StreamWithMetricWrapper {
stream: SendableRecordBatchStream,
metric: BaselineMetrics,
}
impl Stream for StreamWithMetricWrapper {
type Item = RecordBatchResult<RecordBatch>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
let _timer = this.metric.elapsed_compute().timer();
let poll = this.stream.poll_next_unpin(cx);
if let Poll::Ready(Option::Some(Result::Ok(record_batch))) = &poll {
this.metric.record_output(record_batch.num_rows());
}
poll
}
}
impl RecordBatchStream for StreamWithMetricWrapper {
fn schema(&self) -> SchemaRef {
self.stream.schema()
}
}

View File

@@ -19,7 +19,8 @@ use async_trait::async_trait;
use common_procedure::BoxedProcedure;
use tokio::sync::Mutex;
use crate::engine::{EngineContext, TableEngine, TableEngineProcedure, TableReference};
use crate::engine::{EngineContext, TableEngine, TableEngineProcedure};
use crate::metadata::TableId;
use crate::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest};
use crate::test_util::EmptyTable;
use crate::{Result, TableRef};
@@ -86,11 +87,11 @@ impl TableEngine for MockTableEngine {
unimplemented!()
}
fn get_table(&self, _ctx: &EngineContext, _ref: &TableReference) -> Result<Option<TableRef>> {
fn get_table(&self, _ctx: &EngineContext, _table_id: TableId) -> Result<Option<TableRef>> {
unimplemented!()
}
fn table_exists(&self, _ctx: &EngineContext, _name: &TableReference) -> bool {
fn table_exists(&self, _ctx: &EngineContext, _table_id: TableId) -> bool {
unimplemented!()
}

View File

@@ -1227,6 +1227,45 @@ async fn test_execute_copy_from_s3(instance: Arc<dyn MockInstance>) {
}
}
#[apply(both_instances_cases)]
async fn test_execute_copy_from_orc(instance: Arc<dyn MockInstance>) {
logging::init_default_ut_logging();
let instance = instance.frontend();
// setups
execute_sql(
&instance,
"create table demo(double_a double, a float, b boolean, str_direct string, d string, e string, f string, int_short_repeated int, int_neg_short_repeated int, int_delta int, int_neg_delta int, int_direct int, int_neg_direct int, bigint_direct bigint, bigint_neg_direct bigint, bigint_other bigint, utf8_increase string, utf8_decrease string, timestamp_simple timestamp(9) time index, date_simple date);",
)
.await;
let filepath = get_data_dir("../src/common/datasource/tests/orc/test.orc")
.canonicalize()
.unwrap()
.display()
.to_string();
let output = execute_sql(
&instance,
&format!("copy demo from '{}' WITH(FORMAT='orc');", &filepath),
)
.await;
assert!(matches!(output, Output::AffectedRows(5)));
let output = execute_sql(&instance, "select * from demo order by double_a;").await;
let expected = r#"+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+
| double_a | a | b | str_direct | d | e | f | int_short_repeated | int_neg_short_repeated | int_delta | int_neg_delta | int_direct | int_neg_direct | bigint_direct | bigint_neg_direct | bigint_other | utf8_increase | utf8_decrease | timestamp_simple | date_simple |
+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+
| 1.0 | 1.0 | true | a | a | ddd | aaaaa | 5 | -5 | 1 | 5 | 1 | -1 | 1 | -1 | 5 | a | eeeee | 2023-04-01T20:15:30.002 | 2023-04-01 |
| 2.0 | 2.0 | false | cccccc | bb | cc | bbbbb | 5 | -5 | 2 | 4 | 6 | -6 | 6 | -6 | -5 | bb | dddd | 2021-08-22T07:26:44.525777 | 2023-03-01 |
| 3.0 | | | | | | | | | | | | | | | 1 | ccc | ccc | 2023-01-01T00:00:00 | 2023-01-01 |
| 4.0 | 4.0 | true | ddd | ccc | bb | ccccc | 5 | -5 | 4 | 2 | 3 | -3 | 3 | -3 | 5 | dddd | bb | 2023-02-01T00:00:00 | 2023-02-01 |
| 5.0 | 5.0 | false | ee | ddd | a | ddddd | 5 | -5 | 5 | 1 | 2 | -2 | 2 | -2 | 5 | eeeee | a | 2023-03-01T00:00:00 | 2023-03-01 |
+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+"#;
check_output_stream(output, expected).await;
}
#[apply(both_instances_cases)]
async fn test_cast_type_issue_1594(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();