From 1caa94cd3e834673ce0b4d28c1848c1d52b62eed Mon Sep 17 00:00:00 2001
From: dennis zhuang
Date: Fri, 26 Aug 2022 19:22:55 +0800
Subject: [PATCH] feat: save create table schema (#211)

* feat: save create table schema and respect user defined columns order when
  querying, close #179

* fix: address CR problems

* refactor: use with_context with ProjectedColumnNotFoundSnafu
---
 src/datanode/src/tests/grpc_test.rs     |  3 +-
 src/datanode/src/tests/http_test.rs     | 34 +++++++++
 src/datatypes/src/schema.rs             | 13 ++++
 src/storage/src/region.rs               |  5 +-
 src/store-api/src/storage/region.rs     |  3 +-
 src/table-engine/Cargo.toml             |  1 +
 src/table-engine/src/engine.rs          | 54 +++++++++++---
 src/table-engine/src/error.rs           | 10 +++
 src/table-engine/src/table.rs           | 72 +++++++++++++++++--
 src/table-engine/src/table/test_util.rs |  4 +-
 .../src/table/test_util/mock_engine.rs  |  8 +--
 11 files changed, 179 insertions(+), 28 deletions(-)

diff --git a/src/datanode/src/tests/grpc_test.rs b/src/datanode/src/tests/grpc_test.rs
index 0b91fa3ef8..83c9ca6193 100644
--- a/src/datanode/src/tests/grpc_test.rs
+++ b/src/datanode/src/tests/grpc_test.rs
@@ -97,11 +97,12 @@ async fn test_insert_and_select() {
     let actual_columns = select_result.columns;
     assert_eq!(4, actual_columns.len());
 
+    // Respect the order in create table schema
     let expected_columns = vec![
-        expected_ts_col,
         expected_host_col,
         expected_cpu_col,
         expected_mem_col,
+        expected_ts_col,
     ];
     expected_columns
         .iter()
diff --git a/src/datanode/src/tests/http_test.rs b/src/datanode/src/tests/http_test.rs
index 2a6c25e43b..e4e0d76171 100644
--- a/src/datanode/src/tests/http_test.rs
+++ b/src/datanode/src/tests/http_test.rs
@@ -13,6 +13,7 @@ async fn make_test_app() -> (Router, TestGuard) {
     let (opts, guard) = test_util::create_tmp_dir_and_datanode_opts();
     let instance = Arc::new(Instance::new(&opts).await.unwrap());
     instance.start().await.unwrap();
+    test_util::create_test_table(&instance).await.unwrap();
     let http_server = HttpServer::new(instance);
     (http_server.make_app(), guard)
 }
@@ -42,6 +43,39 @@ async fn test_sql_api() {
         body,
         r#"{"success":true,"output":{"Rows":[{"schema":{"fields":[{"name":"number","data_type":"UInt32","is_nullable":false,"metadata":{}}],"metadata":{"greptime:version":"0"}},"columns":[[0,1,2,3,4,5,6,7,8,9]]}]}}"#
     );
+
+    // test insert and select
+    let res = client
+        .get("/sql?sql=insert into demo values('host', 66.6, 1024, 0)")
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::OK);
+
+    // select *
+    let res = client
+        .get("/sql?sql=select * from demo limit 10")
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::OK);
+
+    let body = res.text().await;
+    assert_eq!(
+        body,
+        r#"{"success":true,"output":{"Rows":[{"schema":{"fields":[{"name":"host","data_type":"LargeUtf8","is_nullable":false,"metadata":{}},{"name":"cpu","data_type":"Float64","is_nullable":true,"metadata":{}},{"name":"memory","data_type":"Float64","is_nullable":true,"metadata":{}},{"name":"ts","data_type":"Int64","is_nullable":true,"metadata":{}}],"metadata":{"greptime:timestamp_column":"ts","greptime:version":"0"}},"columns":[["host"],[66.6],[1024.0],[0]]}]}}"#
+    );
+
+    // select with projections
+    let res = client
+        .get("/sql?sql=select cpu, ts from demo limit 10")
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::OK);
+
+    let body = res.text().await;
+    assert_eq!(
+        body,
r#"{"success":true,"output":{"Rows":[{"schema":{"fields":[{"name":"cpu","data_type":"Float64","is_nullable":true,"metadata":{}},{"name":"ts","data_type":"Int64","is_nullable":true,"metadata":{}}],"metadata":{"greptime:timestamp_column":"ts","greptime:version":"0"}},"columns":[[66.6],[0]]}]}}"# + ); } #[tokio::test] diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index 4af64ec29f..4179940ab2 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -82,6 +82,19 @@ impl Schema { .map(|index| &self.column_schemas[*index]) } + /// Retrieve the column's name by index + /// # Panics + /// This method **may** panic if the index is out of range of column schemas. + #[inline] + pub fn column_name_by_index(&self, idx: usize) -> &str { + &self.column_schemas[idx].name + } + + #[inline] + pub fn column_index_by_name(&self, name: &str) -> Option { + self.name_to_index.get(name).copied() + } + #[inline] pub fn num_columns(&self) -> usize { self.column_schemas.len() diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index fce78bbf79..d045c38f64 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -6,7 +6,6 @@ use std::sync::Arc; use async_trait::async_trait; use common_telemetry::logging; -use datatypes::schema::SchemaRef; use snafu::{ensure, ResultExt}; use store_api::logstore::LogStore; use store_api::manifest::{ @@ -69,8 +68,8 @@ impl Region for RegionImpl { Ok(self.inner.create_snapshot()) } - fn write_request(&self, schema: SchemaRef) -> Self::WriteRequest { - WriteBatch::new(schema) + fn write_request(&self) -> Self::WriteRequest { + WriteBatch::new(self.in_memory_metadata().schema().clone()) } } diff --git a/src/store-api/src/storage/region.rs b/src/store-api/src/storage/region.rs index dc4a3e0f09..5559269d9b 100644 --- a/src/store-api/src/storage/region.rs +++ b/src/store-api/src/storage/region.rs @@ -20,7 +20,6 @@ use async_trait::async_trait; use common_error::ext::ErrorExt; -use datatypes::schema::SchemaRef; use crate::storage::engine::OpenOptions; use crate::storage::metadata::RegionMeta; @@ -53,7 +52,7 @@ pub trait Region: Send + Sync + Clone + std::fmt::Debug + 'static { fn snapshot(&self, ctx: &ReadContext) -> Result; /// Create write request - fn write_request(&self, schema: SchemaRef) -> Self::WriteRequest; + fn write_request(&self) -> Self::WriteRequest; } /// Context for write operations. 
diff --git a/src/table-engine/Cargo.toml b/src/table-engine/Cargo.toml
index 52c5bbf87b..3d7b969db0 100644
--- a/src/table-engine/Cargo.toml
+++ b/src/table-engine/Cargo.toml
@@ -32,4 +32,5 @@ tokio = { version = "1.0", features = ["full"] }
 
 [dev-dependencies]
 datatypes = { path = "../datatypes" }
+tempdir = { version = "0.3" }
 tokio = { version = "1.18", features = ["full"] }
diff --git a/src/table-engine/src/engine.rs b/src/table-engine/src/engine.rs
index bf6f84b651..ce041f5fd7 100644
--- a/src/table-engine/src/engine.rs
+++ b/src/table-engine/src/engine.rs
@@ -9,8 +9,8 @@ use object_store::ObjectStore;
 use snafu::{OptionExt, ResultExt};
 use store_api::storage::{
     self, ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnId,
-    CreateOptions, OpenOptions, Region, RegionDescriptorBuilder, RegionId, RegionMeta,
-    RowKeyDescriptor, RowKeyDescriptorBuilder, StorageEngine,
+    CreateOptions, OpenOptions, RegionDescriptorBuilder, RegionId, RowKeyDescriptor,
+    RowKeyDescriptorBuilder, StorageEngine,
 };
 use table::engine::{EngineContext, TableEngine};
 use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest};
@@ -290,9 +290,8 @@ impl MitoEngineInner {
             .map_err(BoxedError::new)
             .context(error::CreateRegionSnafu)?;
 
-        // Use region meta schema instead of request schema
         let table_meta = TableMetaBuilder::default()
-            .schema(region.in_memory_metadata().schema().clone())
+            .schema(request.schema)
             .engine(MITO_ENGINE)
             .next_column_id(next_column_id)
             .primary_key_indices(request.primary_key_indices.clone())
@@ -457,17 +456,50 @@ mod tests {
         let arrow_schema = batches[0].schema.arrow_schema();
         assert_eq!(arrow_schema.fields().len(), 4);
-        assert_eq!(arrow_schema.field(0).name(), "ts");
-        assert_eq!(arrow_schema.field(1).name(), "host");
-        assert_eq!(arrow_schema.field(2).name(), "cpu");
-        assert_eq!(arrow_schema.field(3).name(), "memory");
+
+        assert_eq!(arrow_schema.field(0).name(), "host");
+        assert_eq!(arrow_schema.field(1).name(), "cpu");
+        assert_eq!(arrow_schema.field(2).name(), "memory");
+        assert_eq!(arrow_schema.field(3).name(), "ts");
 
         let columns = batches[0].df_recordbatch.columns();
         assert_eq!(4, columns.len());
+        assert_eq!(hosts.to_arrow_array(), columns[0]);
+        assert_eq!(cpus.to_arrow_array(), columns[1]);
+        assert_eq!(memories.to_arrow_array(), columns[2]);
+        assert_eq!(tss.to_arrow_array(), columns[3]);
+
+        // Scan with projections: cpu and memory
+        let stream = table.scan(&Some(vec![1, 2]), &[], None).await.unwrap();
+        let batches = util::collect(stream).await.unwrap();
+        assert_eq!(1, batches.len());
+        assert_eq!(batches[0].df_recordbatch.num_columns(), 2);
+
+        let arrow_schema = batches[0].schema.arrow_schema();
+        assert_eq!(arrow_schema.fields().len(), 2);
+
+        assert_eq!(arrow_schema.field(0).name(), "cpu");
+        assert_eq!(arrow_schema.field(1).name(), "memory");
+
+        let columns = batches[0].df_recordbatch.columns();
+        assert_eq!(2, columns.len());
+        assert_eq!(cpus.to_arrow_array(), columns[0]);
+        assert_eq!(memories.to_arrow_array(), columns[1]);
+
+        // Scan with projections: only ts
+        let stream = table.scan(&Some(vec![3]), &[], None).await.unwrap();
+        let batches = util::collect(stream).await.unwrap();
+        assert_eq!(1, batches.len());
+        assert_eq!(batches[0].df_recordbatch.num_columns(), 1);
+
+        let arrow_schema = batches[0].schema.arrow_schema();
+        assert_eq!(arrow_schema.fields().len(), 1);
+
+        assert_eq!(arrow_schema.field(0).name(), "ts");
+
+        let columns = batches[0].df_recordbatch.columns();
+        assert_eq!(1, columns.len());
         assert_eq!(tss.to_arrow_array(), columns[0]);
-        assert_eq!(hosts.to_arrow_array(), columns[1]);
-        assert_eq!(cpus.to_arrow_array(), columns[2]);
-        assert_eq!(memories.to_arrow_array(), columns[3]);
     }
 
     #[tokio::test]
diff --git a/src/table-engine/src/error.rs b/src/table-engine/src/error.rs
index d14f5c2691..e9da912887 100644
--- a/src/table-engine/src/error.rs
+++ b/src/table-engine/src/error.rs
@@ -129,6 +129,15 @@ pub enum Error {
         backtrace: Backtrace,
         table_name: String,
     },
+
+    #[snafu(display(
+        "Projected column not found in region, column: {}",
+        column_qualified_name
+    ))]
+    ProjectedColumnNotFound {
+        backtrace: Backtrace,
+        column_qualified_name: String,
+    },
 }
 
 impl From<Error> for table::error::Error {
@@ -153,6 +162,7 @@ impl ErrorExt for Error {
             | BuildTableInfo { .. }
             | BuildRegionDescriptor { .. }
             | TableExists { .. }
+            | ProjectedColumnNotFound { .. }
             | MissingTimestampIndex { .. } => StatusCode::InvalidArguments,
 
             TableInfoNotFound { .. } => StatusCode::Unexpected,
diff --git a/src/table-engine/src/table.rs b/src/table-engine/src/table.rs
index de7b649dfd..e32a0a5839 100644
--- a/src/table-engine/src/table.rs
+++ b/src/table-engine/src/table.rs
@@ -16,8 +16,8 @@ use snafu::{OptionExt, ResultExt};
 use store_api::manifest::action::ProtocolAction;
 use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator};
 use store_api::storage::{
-    ChunkReader, PutOperation, ReadContext, Region, ScanRequest, SchemaRef, Snapshot, WriteContext,
-    WriteRequest,
+    ChunkReader, PutOperation, ReadContext, Region, RegionMeta, ScanRequest, SchemaRef, Snapshot,
+    WriteContext, WriteRequest,
 };
 use table::error::{Error as TableError, MissingColumnSnafu, Result as TableResult};
 use table::requests::InsertRequest;
@@ -27,7 +27,8 @@ use table::{
 };
 
 use crate::error::{
-    Result, ScanTableManifestSnafu, TableInfoNotFoundSnafu, UpdateTableManifestSnafu,
+    ProjectedColumnNotFoundSnafu, Result, ScanTableManifestSnafu, TableInfoNotFoundSnafu,
+    UpdateTableManifestSnafu,
 };
 use crate::manifest::action::*;
 use crate::manifest::TableManifest;
@@ -60,7 +61,7 @@ impl Table for MitoTable {
             return Ok(0);
         }
 
-        let mut write_request = self.region.write_request(self.schema());
+        let mut write_request = self.region.write_request();
         let mut put_op = write_request.put_op();
         let mut columns_values = request.columns_values;
@@ -112,8 +113,10 @@
         let read_ctx = ReadContext::default();
         let snapshot = self.region.snapshot(&read_ctx).map_err(TableError::new)?;
 
+        let projection = self.transform_projection(&self.region, projection.clone())?;
+
         let scan_request = ScanRequest {
-            projection: projection.clone(),
+            projection,
             ..Default::default()
         };
         let mut reader = snapshot
@@ -158,6 +161,11 @@ impl Stream for ChunkStream {
     }
 }
 
+#[inline]
+fn column_qualified_name(table_name: &str, region_name: &str, column_name: &str) -> String {
+    format!("{}.{}.{}", table_name, region_name, column_name)
+}
+
 impl<R: Region> MitoTable<R> {
     fn new(table_info: TableInfo, region: R, manifest: TableManifest) -> Self {
         Self {
@@ -167,6 +175,60 @@ impl MitoTable {
         }
     }
 
+    /// Transform a projection based on the table schema
+    /// into a projection based on the region schema.
+    fn transform_projection(
+        &self,
+        region: &R,
+        projection: Option<Vec<usize>>,
+    ) -> Result<Option<Vec<usize>>> {
+        let table_schema = &self.table_info.meta.schema;
+        let region_meta = region.in_memory_metadata();
+        let region_schema = region_meta.schema();
+
+        if projection.is_none() {
+            // In fact, datafusion always calls scan with a non-none projection
+            // generated from the table schema right now, but to prevent future
+            // compatibility issues, we handle this case here.
+            let projection: Result<Vec<usize>> = table_schema
+                .column_schemas()
+                .iter()
+                .map(|column_schema| &column_schema.name)
+                .map(|name| {
+                    region_schema.column_index_by_name(name).with_context(|| {
+                        ProjectedColumnNotFoundSnafu {
+                            column_qualified_name: column_qualified_name(
+                                &self.table_info.name,
+                                region.name(),
+                                name,
+                            ),
+                        }
+                    })
+                })
+                .collect();
+            return Some(projection).transpose();
+        }
+
+        projection
+            .map(|p| {
+                p.iter()
+                    .map(|idx| table_schema.column_name_by_index(*idx))
+                    .map(|name| {
+                        region_schema.column_index_by_name(name).with_context(|| {
+                            ProjectedColumnNotFoundSnafu {
+                                column_qualified_name: column_qualified_name(
+                                    &self.table_info.name,
+                                    region.name(),
+                                    name,
+                                ),
+                            }
+                        })
+                    })
+                    .collect()
+            })
+            .transpose()
+    }
+
     pub async fn create(
         table_name: &str,
         table_info: TableInfo,
diff --git a/src/table-engine/src/table/test_util.rs b/src/table-engine/src/table/test_util.rs
index fa68020e5c..2bb4b50295 100644
--- a/src/table-engine/src/table/test_util.rs
+++ b/src/table-engine/src/table/test_util.rs
@@ -25,14 +25,14 @@ pub const TABLE_NAME: &str = "demo";
 
 pub fn schema_for_test() -> Schema {
     let column_schemas = vec![
-        ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true),
         ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
         ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
        ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
+        ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true),
     ];
 
     SchemaBuilder::from(column_schemas)
-        .timestamp_index(0)
+        .timestamp_index(3)
         .build()
         .expect("ts must be timestamp column")
 }
diff --git a/src/table-engine/src/table/test_util/mock_engine.rs b/src/table-engine/src/table/test_util/mock_engine.rs
index a18b1390ab..da2dc1fea8 100644
--- a/src/table-engine/src/table/test_util/mock_engine.rs
+++ b/src/table-engine/src/table/test_util/mock_engine.rs
@@ -10,8 +10,8 @@ use storage::metadata::{RegionMetaImpl, RegionMetadataRef};
 use storage::write_batch::WriteBatch;
 use store_api::storage::{
     Chunk, ChunkReader, CreateOptions, EngineContext, GetRequest, GetResponse, OpenOptions,
-    ReadContext, Region, RegionDescriptor, ScanRequest, ScanResponse, SchemaRef, Snapshot,
-    StorageEngine, WriteContext, WriteResponse,
+    ReadContext, Region, RegionDescriptor, RegionMeta, ScanRequest, ScanResponse, SchemaRef,
+    Snapshot, StorageEngine, WriteContext, WriteResponse,
 };
 
 pub type Result = std::result::Result;
@@ -98,8 +98,8 @@ impl Region for MockRegion {
         })
     }
 
-    fn write_request(&self, schema: SchemaRef) -> WriteBatch {
-        WriteBatch::new(schema)
+    fn write_request(&self) -> WriteBatch {
+        WriteBatch::new(self.in_memory_metadata().schema().clone())
    }
 }
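
A minimal standalone sketch (not part of the patch) of the index-remapping idea behind transform_projection above: a projection expressed against the table schema's column order is translated into the region schema's column order by going through column names. The function below and its hard-coded column layouts are hypothetical stand-ins for the real Schema and RegionMeta types, shown only to illustrate the mapping.

fn transform_projection(
    table_columns: &[&str],
    region_columns: &[&str],
    projection: Option<Vec<usize>>,
) -> Result<Option<Vec<usize>>, String> {
    // Resolve a table-schema column name to its index in the region schema.
    let region_index = |name: &str| -> Result<usize, String> {
        region_columns
            .iter()
            .position(|c| *c == name)
            .ok_or_else(|| format!("projected column not found in region: {name}"))
    };

    match projection {
        // No projection given: project every table column, in table-schema order.
        None => table_columns
            .iter()
            .map(|&name| region_index(name))
            .collect::<Result<Vec<_>, _>>()
            .map(Some),
        // Projection given as table-schema indices: index -> name -> region index.
        Some(indices) => indices
            .iter()
            .map(|idx| region_index(table_columns[*idx]))
            .collect::<Result<Vec<_>, _>>()
            .map(Some),
    }
}

fn main() {
    // The table schema keeps the user-defined column order; the region schema
    // stores the timestamp column first (hypothetical layouts for illustration).
    let table = ["host", "cpu", "memory", "ts"];
    let region = ["ts", "host", "cpu", "memory"];

    // `select cpu, ts` -> table indices [1, 3] -> region indices [2, 0].
    let mapped = transform_projection(&table, &region, Some(vec![1, 3])).unwrap();
    assert_eq!(mapped, Some(vec![2, 0]));
    println!("{mapped:?}");
}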