feat: save create table schema (#211)

* feat: save create table schema and respect user defined columns order when querying, close #179

* fix: address CR problems

* refactor: use with_context with ProjectedColumnNotFoundSnafu
This commit is contained in:
dennis zhuang
2022-08-26 19:22:55 +08:00
committed by GitHub
parent ad1bbc3817
commit 1caa94cd3e
11 changed files with 179 additions and 28 deletions

View File

@@ -97,11 +97,12 @@ async fn test_insert_and_select() {
let actual_columns = select_result.columns;
assert_eq!(4, actual_columns.len());
// Respect the order in create table schema
let expected_columns = vec![
expected_ts_col,
expected_host_col,
expected_cpu_col,
expected_mem_col,
expected_ts_col,
];
expected_columns
.iter()

View File

@@ -13,6 +13,7 @@ async fn make_test_app() -> (Router, TestGuard) {
let (opts, guard) = test_util::create_tmp_dir_and_datanode_opts();
let instance = Arc::new(Instance::new(&opts).await.unwrap());
instance.start().await.unwrap();
test_util::create_test_table(&instance).await.unwrap();
let http_server = HttpServer::new(instance);
(http_server.make_app(), guard)
}
@@ -42,6 +43,39 @@ async fn test_sql_api() {
body,
r#"{"success":true,"output":{"Rows":[{"schema":{"fields":[{"name":"number","data_type":"UInt32","is_nullable":false,"metadata":{}}],"metadata":{"greptime:version":"0"}},"columns":[[0,1,2,3,4,5,6,7,8,9]]}]}}"#
);
// test insert and select
let res = client
.get("/sql?sql=insert into demo values('host', 66.6, 1024, 0)")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
// select *
let res = client
.get("/sql?sql=select * from demo limit 10")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = res.text().await;
assert_eq!(
body,
r#"{"success":true,"output":{"Rows":[{"schema":{"fields":[{"name":"host","data_type":"LargeUtf8","is_nullable":false,"metadata":{}},{"name":"cpu","data_type":"Float64","is_nullable":true,"metadata":{}},{"name":"memory","data_type":"Float64","is_nullable":true,"metadata":{}},{"name":"ts","data_type":"Int64","is_nullable":true,"metadata":{}}],"metadata":{"greptime:timestamp_column":"ts","greptime:version":"0"}},"columns":[["host"],[66.6],[1024.0],[0]]}]}}"#
);
// select with projections
let res = client
.get("/sql?sql=select cpu, ts from demo limit 10")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = res.text().await;
assert_eq!(
body,
r#"{"success":true,"output":{"Rows":[{"schema":{"fields":[{"name":"cpu","data_type":"Float64","is_nullable":true,"metadata":{}},{"name":"ts","data_type":"Int64","is_nullable":true,"metadata":{}}],"metadata":{"greptime:timestamp_column":"ts","greptime:version":"0"}},"columns":[[66.6],[0]]}]}}"#
);
}
#[tokio::test]

View File

@@ -82,6 +82,19 @@ impl Schema {
.map(|index| &self.column_schemas[*index])
}
/// Retrieve the column's name by index
/// # Panics
/// This method **may** panic if the index is out of range of column schemas.
#[inline]
pub fn column_name_by_index(&self, idx: usize) -> &str {
&self.column_schemas[idx].name
}
#[inline]
pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
self.name_to_index.get(name).copied()
}
#[inline]
pub fn num_columns(&self) -> usize {
self.column_schemas.len()

View File

@@ -6,7 +6,6 @@ use std::sync::Arc;
use async_trait::async_trait;
use common_telemetry::logging;
use datatypes::schema::SchemaRef;
use snafu::{ensure, ResultExt};
use store_api::logstore::LogStore;
use store_api::manifest::{
@@ -69,8 +68,8 @@ impl<S: LogStore> Region for RegionImpl<S> {
Ok(self.inner.create_snapshot())
}
fn write_request(&self, schema: SchemaRef) -> Self::WriteRequest {
WriteBatch::new(schema)
fn write_request(&self) -> Self::WriteRequest {
WriteBatch::new(self.in_memory_metadata().schema().clone())
}
}

View File

@@ -20,7 +20,6 @@
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use datatypes::schema::SchemaRef;
use crate::storage::engine::OpenOptions;
use crate::storage::metadata::RegionMeta;
@@ -53,7 +52,7 @@ pub trait Region: Send + Sync + Clone + std::fmt::Debug + 'static {
fn snapshot(&self, ctx: &ReadContext) -> Result<Self::Snapshot, Self::Error>;
/// Create write request
fn write_request(&self, schema: SchemaRef) -> Self::WriteRequest;
fn write_request(&self) -> Self::WriteRequest;
}
/// Context for write operations.

View File

@@ -32,4 +32,5 @@ tokio = { version = "1.0", features = ["full"] }
[dev-dependencies]
datatypes = { path = "../datatypes" }
tempdir = { version = "0.3" }
tokio = { version = "1.18", features = ["full"] }

View File

@@ -9,8 +9,8 @@ use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{
self, ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnId,
CreateOptions, OpenOptions, Region, RegionDescriptorBuilder, RegionId, RegionMeta,
RowKeyDescriptor, RowKeyDescriptorBuilder, StorageEngine,
CreateOptions, OpenOptions, RegionDescriptorBuilder, RegionId, RowKeyDescriptor,
RowKeyDescriptorBuilder, StorageEngine,
};
use table::engine::{EngineContext, TableEngine};
use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest};
@@ -290,9 +290,8 @@ impl<S: StorageEngine> MitoEngineInner<S> {
.map_err(BoxedError::new)
.context(error::CreateRegionSnafu)?;
// Use region meta schema instead of request schema
let table_meta = TableMetaBuilder::default()
.schema(region.in_memory_metadata().schema().clone())
.schema(request.schema)
.engine(MITO_ENGINE)
.next_column_id(next_column_id)
.primary_key_indices(request.primary_key_indices.clone())
@@ -457,17 +456,50 @@ mod tests {
let arrow_schema = batches[0].schema.arrow_schema();
assert_eq!(arrow_schema.fields().len(), 4);
assert_eq!(arrow_schema.field(0).name(), "ts");
assert_eq!(arrow_schema.field(1).name(), "host");
assert_eq!(arrow_schema.field(2).name(), "cpu");
assert_eq!(arrow_schema.field(3).name(), "memory");
assert_eq!(arrow_schema.field(0).name(), "host");
assert_eq!(arrow_schema.field(1).name(), "cpu");
assert_eq!(arrow_schema.field(2).name(), "memory");
assert_eq!(arrow_schema.field(3).name(), "ts");
let columns = batches[0].df_recordbatch.columns();
assert_eq!(4, columns.len());
assert_eq!(hosts.to_arrow_array(), columns[0]);
assert_eq!(cpus.to_arrow_array(), columns[1]);
assert_eq!(memories.to_arrow_array(), columns[2]);
assert_eq!(tss.to_arrow_array(), columns[3]);
// Scan with projections: cpu and memory
let stream = table.scan(&Some(vec![1, 2]), &[], None).await.unwrap();
let batches = util::collect(stream).await.unwrap();
assert_eq!(1, batches.len());
assert_eq!(batches[0].df_recordbatch.num_columns(), 2);
let arrow_schema = batches[0].schema.arrow_schema();
assert_eq!(arrow_schema.fields().len(), 2);
assert_eq!(arrow_schema.field(0).name(), "cpu");
assert_eq!(arrow_schema.field(1).name(), "memory");
let columns = batches[0].df_recordbatch.columns();
assert_eq!(2, columns.len());
assert_eq!(cpus.to_arrow_array(), columns[0]);
assert_eq!(memories.to_arrow_array(), columns[1]);
// Scan with projections: only ts
let stream = table.scan(&Some(vec![3]), &[], None).await.unwrap();
let batches = util::collect(stream).await.unwrap();
assert_eq!(1, batches.len());
assert_eq!(batches[0].df_recordbatch.num_columns(), 1);
let arrow_schema = batches[0].schema.arrow_schema();
assert_eq!(arrow_schema.fields().len(), 1);
assert_eq!(arrow_schema.field(0).name(), "ts");
let columns = batches[0].df_recordbatch.columns();
assert_eq!(1, columns.len());
assert_eq!(tss.to_arrow_array(), columns[0]);
assert_eq!(hosts.to_arrow_array(), columns[1]);
assert_eq!(cpus.to_arrow_array(), columns[2]);
assert_eq!(memories.to_arrow_array(), columns[3]);
}
#[tokio::test]

View File

@@ -129,6 +129,15 @@ pub enum Error {
backtrace: Backtrace,
table_name: String,
},
#[snafu(display(
"Projected columnd not found in region, column: {}",
column_qualified_name
))]
ProjectedColumnNotFound {
backtrace: Backtrace,
column_qualified_name: String,
},
}
impl From<Error> for table::error::Error {
@@ -153,6 +162,7 @@ impl ErrorExt for Error {
| BuildTableInfo { .. }
| BuildRegionDescriptor { .. }
| TableExists { .. }
| ProjectedColumnNotFound { .. }
| MissingTimestampIndex { .. } => StatusCode::InvalidArguments,
TableInfoNotFound { .. } => StatusCode::Unexpected,

View File

@@ -16,8 +16,8 @@ use snafu::{OptionExt, ResultExt};
use store_api::manifest::action::ProtocolAction;
use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator};
use store_api::storage::{
ChunkReader, PutOperation, ReadContext, Region, ScanRequest, SchemaRef, Snapshot, WriteContext,
WriteRequest,
ChunkReader, PutOperation, ReadContext, Region, RegionMeta, ScanRequest, SchemaRef, Snapshot,
WriteContext, WriteRequest,
};
use table::error::{Error as TableError, MissingColumnSnafu, Result as TableResult};
use table::requests::InsertRequest;
@@ -27,7 +27,8 @@ use table::{
};
use crate::error::{
Result, ScanTableManifestSnafu, TableInfoNotFoundSnafu, UpdateTableManifestSnafu,
ProjectedColumnNotFoundSnafu, Result, ScanTableManifestSnafu, TableInfoNotFoundSnafu,
UpdateTableManifestSnafu,
};
use crate::manifest::action::*;
use crate::manifest::TableManifest;
@@ -60,7 +61,7 @@ impl<R: Region> Table for MitoTable<R> {
return Ok(0);
}
let mut write_request = self.region.write_request(self.schema());
let mut write_request = self.region.write_request();
let mut put_op = write_request.put_op();
let mut columns_values = request.columns_values;
@@ -112,8 +113,10 @@ impl<R: Region> Table for MitoTable<R> {
let read_ctx = ReadContext::default();
let snapshot = self.region.snapshot(&read_ctx).map_err(TableError::new)?;
let projection = self.transform_projection(&self.region, projection.clone())?;
let scan_request = ScanRequest {
projection: projection.clone(),
projection,
..Default::default()
};
let mut reader = snapshot
@@ -158,6 +161,11 @@ impl Stream for ChunkStream {
}
}
#[inline]
fn column_qualified_name(table_name: &str, region_name: &str, column_name: &str) -> String {
format!("{}.{}.{}", table_name, region_name, column_name)
}
impl<R: Region> MitoTable<R> {
fn new(table_info: TableInfo, region: R, manifest: TableManifest) -> Self {
Self {
@@ -167,6 +175,60 @@ impl<R: Region> MitoTable<R> {
}
}
/// Transform projection which is based on table schema
/// into projection based on region schema.
fn transform_projection(
&self,
region: &R,
projection: Option<Vec<usize>>,
) -> Result<Option<Vec<usize>>> {
let table_schema = &self.table_info.meta.schema;
let region_meta = region.in_memory_metadata();
let region_schema = region_meta.schema();
if projection.is_none() {
// In fact, datafusion always calls scan with not-none projection
// generated by table schema right now, but to prevent future compatibility
// issue, we process this case here.
let projection: Result<Vec<_>> = table_schema
.column_schemas()
.iter()
.map(|column_schema| &column_schema.name)
.map(|name| {
region_schema.column_index_by_name(name).with_context(|| {
ProjectedColumnNotFoundSnafu {
column_qualified_name: column_qualified_name(
&self.table_info.name,
region.name(),
name,
),
}
})
})
.collect();
return Some(projection).transpose();
}
projection
.map(|p| {
p.iter()
.map(|idx| table_schema.column_name_by_index(*idx))
.map(|name| {
region_schema.column_index_by_name(name).with_context(|| {
ProjectedColumnNotFoundSnafu {
column_qualified_name: column_qualified_name(
&self.table_info.name,
region.name(),
name,
),
}
})
})
.collect()
})
.transpose()
}
pub async fn create(
table_name: &str,
table_info: TableInfo,

View File

@@ -25,14 +25,14 @@ pub const TABLE_NAME: &str = "demo";
pub fn schema_for_test() -> Schema {
let column_schemas = vec![
ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true),
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true),
];
SchemaBuilder::from(column_schemas)
.timestamp_index(0)
.timestamp_index(3)
.build()
.expect("ts must be timestamp column")
}

View File

@@ -10,8 +10,8 @@ use storage::metadata::{RegionMetaImpl, RegionMetadataRef};
use storage::write_batch::WriteBatch;
use store_api::storage::{
Chunk, ChunkReader, CreateOptions, EngineContext, GetRequest, GetResponse, OpenOptions,
ReadContext, Region, RegionDescriptor, ScanRequest, ScanResponse, SchemaRef, Snapshot,
StorageEngine, WriteContext, WriteResponse,
ReadContext, Region, RegionDescriptor, RegionMeta, ScanRequest, ScanResponse, SchemaRef,
Snapshot, StorageEngine, WriteContext, WriteResponse,
};
pub type Result<T> = std::result::Result<T, MockError>;
@@ -98,8 +98,8 @@ impl Region for MockRegion {
})
}
fn write_request(&self, schema: SchemaRef) -> WriteBatch {
WriteBatch::new(schema)
fn write_request(&self) -> WriteBatch {
WriteBatch::new(self.in_memory_metadata().schema().clone())
}
}