From 4b644aa4824df5132b5e8b1cef873d2ac85abc0d Mon Sep 17 00:00:00 2001
From: Yingwen
Date: Tue, 13 Dec 2022 15:53:55 +0800
Subject: [PATCH] fix: Fix compiler errors in catalog and mito crates (#742)

* fix: Fix compiler errors in mito

* fix: Fix compiler errors in catalog crate

* style: Fix clippy

* chore: Fix use
---
 src/catalog/src/error.rs                    | 23 ++-
 src/catalog/src/helper.rs                   |  2 +-
 src/catalog/src/local/manager.rs            | 33 ++--
 src/catalog/src/system.rs                   | 21 +--
 src/catalog/src/tables.rs                   | 94 ++++++-----
 src/common/query/src/physical_plan.rs       |  2 +-
 src/mito/src/engine.rs                      | 164 ++++++++++----------
 src/mito/src/manifest/action.rs             |  6 +-
 src/mito/src/table.rs                       |  5 +-
 src/mito/src/table/test_util/mock_engine.rs |  9 +-
 10 files changed, 176 insertions(+), 183 deletions(-)

diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs
index 05e6944cd5..f344ae3bb8 100644
--- a/src/catalog/src/error.rs
+++ b/src/catalog/src/error.rs
@@ -17,7 +17,7 @@ use std::any::Any;
 use common_error::ext::{BoxedError, ErrorExt};
 use common_error::prelude::{Snafu, StatusCode};
 use datafusion::error::DataFusionError;
-use datatypes::arrow;
+use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::RawSchema;
 use snafu::{Backtrace, ErrorCompat};
 
@@ -51,14 +51,12 @@ pub enum Error {
     SystemCatalog { msg: String, backtrace: Backtrace },
 
     #[snafu(display(
-        "System catalog table type mismatch, expected: binary, found: {:?} source: {}",
+        "System catalog table type mismatch, expected: binary, found: {:?}",
         data_type,
-        source
     ))]
     SystemCatalogTypeMismatch {
-        data_type: arrow::datatypes::DataType,
-        #[snafu(backtrace)]
-        source: datatypes::error::Error,
+        data_type: ConcreteDataType,
+        backtrace: Backtrace,
     },
 
     #[snafu(display("Invalid system catalog entry type: {:?}", entry_type))]
@@ -222,10 +220,11 @@ impl ErrorExt for Error {
             | Error::ValueDeserialize { .. }
             | Error::Io { .. } => StatusCode::StorageUnavailable,
 
-            Error::RegisterTable { .. } => StatusCode::Internal,
+            Error::RegisterTable { .. } | Error::SystemCatalogTypeMismatch { .. } => {
+                StatusCode::Internal
+            }
 
             Error::ReadSystemCatalog { source, .. } => source.status_code(),
-            Error::SystemCatalogTypeMismatch { source, .. } => source.status_code(),
             Error::InvalidCatalogValue { source, .. } => source.status_code(),
 
             Error::TableExists { .. } => StatusCode::TableAlreadyExists,
@@ -265,7 +264,6 @@ impl From<Error> for DataFusionError {
 #[cfg(test)]
 mod tests {
     use common_error::mock::MockError;
-    use datatypes::arrow::datatypes::DataType;
    use snafu::GenerateImplicitData;
 
     use super::*;
@@ -314,11 +312,8 @@ mod tests {
         assert_eq!(
             StatusCode::Internal,
             Error::SystemCatalogTypeMismatch {
-                data_type: DataType::Boolean,
-                source: datatypes::error::Error::UnsupportedArrowType {
-                    arrow_type: DataType::Boolean,
-                    backtrace: Backtrace::generate()
-                }
+                data_type: ConcreteDataType::binary_datatype(),
+                backtrace: Backtrace::generate(),
             }
             .status_code()
         );
diff --git a/src/catalog/src/helper.rs b/src/catalog/src/helper.rs
index 2caf098865..062d07bc19 100644
--- a/src/catalog/src/helper.rs
+++ b/src/catalog/src/helper.rs
@@ -138,7 +138,7 @@ impl TableGlobalKey {
 
 /// Table global info contains necessary info for a datanode to create table regions, including
 /// table id, table meta(schema...), region id allocation across datanodes.
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub struct TableGlobalValue {
     /// Id of datanode that created the global table info kv. only for debugging.
     pub node_id: u64,
diff --git a/src/catalog/src/local/manager.rs b/src/catalog/src/local/manager.rs
index d09411cbaa..e4c89933e0 100644
--- a/src/catalog/src/local/manager.rs
+++ b/src/catalog/src/local/manager.rs
@@ -145,27 +145,34 @@ impl LocalCatalogManager {
     /// Convert `RecordBatch` to a vector of `Entry`.
     fn record_batch_to_entry(rb: RecordBatch) -> Result<Vec<Entry>> {
         ensure!(
-            rb.df_recordbatch.columns().len() >= 6,
+            rb.num_columns() >= 6,
             SystemCatalogSnafu {
-                msg: format!("Length mismatch: {}", rb.df_recordbatch.columns().len())
+                msg: format!("Length mismatch: {}", rb.num_columns())
             }
         );
 
-        let entry_type = UInt8Vector::try_from_arrow_array(&rb.df_recordbatch.columns()[0])
-            .with_context(|_| SystemCatalogTypeMismatchSnafu {
-                data_type: rb.df_recordbatch.columns()[ENTRY_TYPE_INDEX]
-                    .data_type()
-                    .clone(),
+        let entry_type = rb
+            .column(ENTRY_TYPE_INDEX)
+            .as_any()
+            .downcast_ref::<UInt8Vector>()
+            .with_context(|| SystemCatalogTypeMismatchSnafu {
+                data_type: rb.column(ENTRY_TYPE_INDEX).data_type(),
             })?;
 
-        let key = BinaryVector::try_from_arrow_array(&rb.df_recordbatch.columns()[1])
-            .with_context(|_| SystemCatalogTypeMismatchSnafu {
-                data_type: rb.df_recordbatch.columns()[KEY_INDEX].data_type().clone(),
+        let key = rb
+            .column(KEY_INDEX)
+            .as_any()
+            .downcast_ref::<BinaryVector>()
+            .with_context(|| SystemCatalogTypeMismatchSnafu {
+                data_type: rb.column(KEY_INDEX).data_type(),
             })?;
 
-        let value = BinaryVector::try_from_arrow_array(&rb.df_recordbatch.columns()[3])
-            .with_context(|_| SystemCatalogTypeMismatchSnafu {
-                data_type: rb.df_recordbatch.columns()[VALUE_INDEX].data_type().clone(),
+        let value = rb
+            .column(VALUE_INDEX)
+            .as_any()
+            .downcast_ref::<BinaryVector>()
+            .with_context(|| SystemCatalogTypeMismatchSnafu {
+                data_type: rb.column(VALUE_INDEX).data_type(),
             })?;
 
         let mut res = Vec::with_capacity(rb.num_rows());
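The new `record_batch_to_entry` path above replaces `try_from_arrow_array` with a downcast on the batch's own vector columns. The following is an illustrative sketch only (not part of the patch), assuming just the `common_recordbatch` and `datatypes` APIs used in the hunk (`column()`, `as_any()`, `downcast_ref()`); the helper name is hypothetical:

```rust
use common_recordbatch::RecordBatch;
use datatypes::vectors::{BinaryVector, UInt8Vector};

/// Illustrative only: pull two typed columns out of a system-catalog batch.
/// Returns `None` when a column does not have the expected vector type,
/// which the patch reports as `SystemCatalogTypeMismatch`.
fn typed_columns(rb: &RecordBatch) -> Option<(&UInt8Vector, &BinaryVector)> {
    let entry_type = rb.column(0).as_any().downcast_ref::<UInt8Vector>()?;
    let key = rb.column(1).as_any().downcast_ref::<BinaryVector>()?;
    Some((entry_type, key))
}
```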
diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs
index 50bb715fc3..960be1fa24 100644
--- a/src/catalog/src/system.rs
+++ b/src/catalog/src/system.rs
@@ -21,14 +21,13 @@ use common_catalog::consts::{
     SYSTEM_CATALOG_TABLE_ID, SYSTEM_CATALOG_TABLE_NAME,
 };
 use common_query::logical_plan::Expr;
-use common_query::physical_plan::{PhysicalPlanRef, RuntimeEnv};
+use common_query::physical_plan::{PhysicalPlanRef, SessionContext};
 use common_recordbatch::SendableRecordBatchStream;
 use common_telemetry::debug;
-use common_time::timestamp::Timestamp;
 use common_time::util;
 use datatypes::prelude::{ConcreteDataType, ScalarVector};
 use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef};
-use datatypes::vectors::{BinaryVector, TimestampVector, UInt8Vector};
+use datatypes::vectors::{BinaryVector, TimestampMillisecondVector, UInt8Vector};
 use serde::{Deserialize, Serialize};
 use snafu::{ensure, OptionExt, ResultExt};
 use table::engine::{EngineContext, TableEngineRef};
@@ -127,13 +126,14 @@ impl SystemCatalogTable {
     /// Create a stream of all entries inside system catalog table
     pub async fn records(&self) -> Result<SendableRecordBatchStream> {
         let full_projection = None;
+        let ctx = SessionContext::new();
         let scan = self
             .table
             .scan(&full_projection, &[], None)
             .await
             .context(error::SystemCatalogTableScanSnafu)?;
         let stream = scan
-            .execute(0, Arc::new(RuntimeEnv::default()))
+            .execute(0, ctx.task_ctx())
             .context(error::SystemCatalogTableScanExecSnafu)?;
         Ok(stream)
     }
@@ -222,9 +222,7 @@ pub fn build_insert_request(entry_type: EntryType, key: &[u8], value: &[u8]) ->
     // Timestamp in key part is intentionally left to 0
     columns_values.insert(
         "timestamp".to_string(),
-        Arc::new(TimestampVector::from_slice(&[Timestamp::new_millisecond(
-            0,
-        )])) as _,
+        Arc::new(TimestampMillisecondVector::from_slice(&[0])) as _,
     );
 
     columns_values.insert(
@@ -232,18 +230,15 @@ pub fn build_insert_request(entry_type: EntryType, key: &[u8], value: &[u8]) ->
         Arc::new(BinaryVector::from_slice(&[value])) as _,
     );
 
+    let now = util::current_time_millis();
     columns_values.insert(
         "gmt_created".to_string(),
-        Arc::new(TimestampVector::from_slice(&[Timestamp::new_millisecond(
-            util::current_time_millis(),
-        )])) as _,
+        Arc::new(TimestampMillisecondVector::from_slice(&[now])) as _,
     );
 
     columns_values.insert(
         "gmt_modified".to_string(),
-        Arc::new(TimestampVector::from_slice(&[Timestamp::new_millisecond(
-            util::current_time_millis(),
-        )])) as _,
+        Arc::new(TimestampMillisecondVector::from_slice(&[now])) as _,
    );
 
     InsertRequest {
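Throughout this patch, `scan()` results are executed with a DataFusion `TaskContext` obtained from a `SessionContext`, instead of the old `Arc<RuntimeEnv>` argument. A minimal sketch of that call shape, assuming only the `common_query::physical_plan` re-exports touched above; error handling is collapsed to `unwrap` for brevity and the helper name is hypothetical:

```rust
use common_query::physical_plan::{PhysicalPlanRef, SessionContext};
use common_recordbatch::SendableRecordBatchStream;

/// Illustrative only: execute partition 0 of a scan plan.
fn execute_first_partition(plan: PhysicalPlanRef) -> SendableRecordBatchStream {
    let ctx = SessionContext::new();
    // `execute` now takes the partition index and a `TaskContext`
    // (`ctx.task_ctx()`) rather than an `Arc<RuntimeEnv>`.
    plan.execute(0, ctx.task_ctx()).unwrap()
}
```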
diff --git a/src/catalog/src/tables.rs b/src/catalog/src/tables.rs
index b11fc870de..8dd59fb1bf 100644
--- a/src/catalog/src/tables.rs
+++ b/src/catalog/src/tables.rs
@@ -26,9 +26,9 @@ use common_query::logical_plan::Expr;
 use common_query::physical_plan::PhysicalPlanRef;
 use common_recordbatch::error::Result as RecordBatchResult;
 use common_recordbatch::{RecordBatch, RecordBatchStream};
-use datatypes::prelude::{ConcreteDataType, VectorBuilder};
+use datatypes::prelude::{ConcreteDataType, DataType};
 use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
-use datatypes::value::Value;
+use datatypes::value::ValueRef;
 use datatypes::vectors::VectorRef;
 use futures::Stream;
 use snafu::ResultExt;
@@ -149,26 +149,33 @@ fn tables_to_record_batch(
     engine: &str,
 ) -> Vec<VectorRef> {
     let mut catalog_vec =
-        VectorBuilder::with_capacity(ConcreteDataType::string_datatype(), table_names.len());
+        ConcreteDataType::string_datatype().create_mutable_vector(table_names.len());
     let mut schema_vec =
-        VectorBuilder::with_capacity(ConcreteDataType::string_datatype(), table_names.len());
+        ConcreteDataType::string_datatype().create_mutable_vector(table_names.len());
     let mut table_name_vec =
-        VectorBuilder::with_capacity(ConcreteDataType::string_datatype(), table_names.len());
+        ConcreteDataType::string_datatype().create_mutable_vector(table_names.len());
     let mut engine_vec =
-        VectorBuilder::with_capacity(ConcreteDataType::string_datatype(), table_names.len());
+        ConcreteDataType::string_datatype().create_mutable_vector(table_names.len());
 
     for table_name in table_names {
-        catalog_vec.push(&Value::String(catalog_name.into()));
-        schema_vec.push(&Value::String(schema_name.into()));
-        table_name_vec.push(&Value::String(table_name.into()));
-        engine_vec.push(&Value::String(engine.into()));
+        // Safety: All these vectors are string type.
+        catalog_vec
+            .push_value_ref(ValueRef::String(catalog_name))
+            .unwrap();
+        schema_vec
+            .push_value_ref(ValueRef::String(schema_name))
+            .unwrap();
+        table_name_vec
+            .push_value_ref(ValueRef::String(&table_name))
+            .unwrap();
+        engine_vec.push_value_ref(ValueRef::String(engine)).unwrap();
     }
 
     vec![
-        catalog_vec.finish(),
-        schema_vec.finish(),
-        table_name_vec.finish(),
-        engine_vec.finish(),
+        catalog_vec.to_vector(),
+        schema_vec.to_vector(),
+        table_name_vec.to_vector(),
+        engine_vec.to_vector(),
     ]
 }
 
@@ -340,9 +347,7 @@ fn build_schema_for_tables() -> Schema {
 #[cfg(test)]
 mod tests {
     use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
-    use common_query::physical_plan::RuntimeEnv;
-    use datatypes::arrow::array::Utf8Array;
-    use datatypes::arrow::datatypes::DataType;
+    use common_query::physical_plan::SessionContext;
     use futures_util::StreamExt;
     use table::table::numbers::NumbersTable;
 
@@ -366,56 +371,47 @@
         let tables = Tables::new(catalog_list, "test_engine".to_string());
         let tables_stream = tables.scan(&None, &[], None).await.unwrap();
-        let mut tables_stream = tables_stream
-            .execute(0, Arc::new(RuntimeEnv::default()))
-            .unwrap();
+        let session_ctx = SessionContext::new();
+        let mut tables_stream = tables_stream.execute(0, session_ctx.task_ctx()).unwrap();
 
         if let Some(t) = tables_stream.next().await {
-            let batch = t.unwrap().df_recordbatch;
+            let batch = t.unwrap();
             assert_eq!(1, batch.num_rows());
             assert_eq!(4, batch.num_columns());
-            assert_eq!(&DataType::Utf8, batch.column(0).data_type());
-            assert_eq!(&DataType::Utf8, batch.column(1).data_type());
-            assert_eq!(&DataType::Utf8, batch.column(2).data_type());
-            assert_eq!(&DataType::Utf8, batch.column(3).data_type());
+            assert_eq!(
+                ConcreteDataType::string_datatype(),
+                batch.column(0).data_type()
+            );
+            assert_eq!(
+                ConcreteDataType::string_datatype(),
+                batch.column(1).data_type()
+            );
+            assert_eq!(
+                ConcreteDataType::string_datatype(),
+                batch.column(2).data_type()
+            );
+            assert_eq!(
+                ConcreteDataType::string_datatype(),
+                batch.column(3).data_type()
+            );
 
             assert_eq!(
                 "greptime",
-                batch
-                    .column(0)
-                    .as_any()
-                    .downcast_ref::<Utf8Array<i32>>()
-                    .unwrap()
-                    .value(0)
+                batch.column(0).get_ref(0).as_string().unwrap().unwrap()
             );
             assert_eq!(
                 "public",
-                batch
-                    .column(1)
-                    .as_any()
-                    .downcast_ref::<Utf8Array<i32>>()
-                    .unwrap()
-                    .value(0)
+                batch.column(1).get_ref(0).as_string().unwrap().unwrap()
            );
             assert_eq!(
                 "test_table",
-                batch
-                    .column(2)
-                    .as_any()
-                    .downcast_ref::<Utf8Array<i32>>()
-                    .unwrap()
-                    .value(0)
+                batch.column(2).get_ref(0).as_string().unwrap().unwrap()
            );
             assert_eq!(
                 "test_engine",
-                batch
-                    .column(3)
-                    .as_any()
-                    .downcast_ref::<Utf8Array<i32>>()
-                    .unwrap()
-                    .value(0)
+                batch.column(3).get_ref(0).as_string().unwrap().unwrap()
            );
         } else {
             panic!("Record batch should not be empty!")
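The `tables_to_record_batch` change above swaps `VectorBuilder` for the `MutableVector` returned by `ConcreteDataType::create_mutable_vector`. A short sketch of that building pattern under the same assumptions (only the `datatypes` APIs used in the hunk); the helper is hypothetical and not part of the patch:

```rust
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::value::ValueRef;
use datatypes::vectors::VectorRef;

/// Illustrative only: build a string vector column from borrowed values.
fn string_column(values: &[&str]) -> VectorRef {
    let mut builder = ConcreteDataType::string_datatype().create_mutable_vector(values.len());
    for &v in values {
        // Safe to unwrap: the builder was created for the string type,
        // mirroring the "Safety" comment in the hunk above.
        builder.push_value_ref(ValueRef::String(v)).unwrap();
    }
    builder.to_vector()
}
```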
diff --git a/src/common/query/src/physical_plan.rs b/src/common/query/src/physical_plan.rs
index 2284c1b2d7..70c8d85a52 100644
--- a/src/common/query/src/physical_plan.rs
+++ b/src/common/query/src/physical_plan.rs
@@ -20,7 +20,7 @@ use common_recordbatch::adapter::{DfRecordBatchStreamAdapter, RecordBatchStreamA
 use common_recordbatch::{DfSendableRecordBatchStream, SendableRecordBatchStream};
 use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
 use datafusion::error::Result as DfResult;
-pub use datafusion::execution::context::TaskContext;
+pub use datafusion::execution::context::{SessionContext, TaskContext};
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
 pub use datafusion::physical_plan::Partitioning;
 use datafusion::physical_plan::Statistics;
diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs
index 845493d745..264eab75a7 100644
--- a/src/mito/src/engine.rs
+++ b/src/mito/src/engine.rs
@@ -519,13 +519,12 @@ impl MitoEngineInner {
 
 #[cfg(test)]
 mod tests {
-    use common_query::physical_plan::RuntimeEnv;
+    use common_query::physical_plan::SessionContext;
     use common_recordbatch::util;
-    use datafusion_common::field_util::{FieldExt, SchemaExt};
-    use datatypes::prelude::{ConcreteDataType, ScalarVector};
+    use datatypes::prelude::ConcreteDataType;
     use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaBuilder};
     use datatypes::value::Value;
-    use datatypes::vectors::*;
+    use datatypes::vectors::TimestampMillisecondVector;
     use log_store::fs::noop::NoopLogStore;
     use storage::config::EngineConfig as StorageEngineConfig;
     use storage::EngineImpl;
@@ -600,30 +599,29 @@
         let (_dir, table_name, table) = setup_table_with_column_default_constraint().await;
 
         let mut columns_values: HashMap<String, VectorRef> = HashMap::with_capacity(4);
-        let names = StringVector::from(vec!["first", "second"]);
-        let tss = TimestampVector::from_vec(vec![1, 2]);
+        let names: VectorRef = Arc::new(StringVector::from(vec!["first", "second"]));
+        let tss: VectorRef = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2]));
 
-        columns_values.insert("name".to_string(), Arc::new(names.clone()));
-        columns_values.insert("ts".to_string(), Arc::new(tss.clone()));
+        columns_values.insert("name".to_string(), names.clone());
+        columns_values.insert("ts".to_string(), tss.clone());
 
         let insert_req = new_insert_request(table_name.to_string(), columns_values);
         assert_eq!(2, table.insert(insert_req).await.unwrap());
 
+        let session_ctx = SessionContext::new();
         let stream = table.scan(&None, &[], None).await.unwrap();
-        let stream = stream.execute(0, Arc::new(RuntimeEnv::default())).unwrap();
+        let stream = stream.execute(0, session_ctx.task_ctx()).unwrap();
         let batches = util::collect(stream).await.unwrap();
         assert_eq!(1, batches.len());
 
-        let record = &batches[0].df_recordbatch;
+        let record = &batches[0];
         assert_eq!(record.num_columns(), 3);
-        let columns = record.columns();
-        assert_eq!(3, columns.len());
-        assert_eq!(names.to_arrow_array(), columns[0]);
+        assert_eq!(names, *record.column(0));
         assert_eq!(
-            Int32Vector::from_vec(vec![42, 42]).to_arrow_array(),
-            columns[1]
+            Arc::new(Int32Vector::from_vec(vec![42, 42])) as VectorRef,
+            *record.column(1)
         );
-        assert_eq!(tss.to_arrow_array(), columns[2]);
+        assert_eq!(tss, *record.column(2));
     }
 
     #[tokio::test]
@@ -631,29 +629,28 @@
         let (_dir, table_name, table) = setup_table_with_column_default_constraint().await;
 
         let mut columns_values: HashMap<String, VectorRef> = HashMap::with_capacity(4);
-        let names = StringVector::from(vec!["first", "second"]);
-        let nums = Int32Vector::from(vec![None, Some(66)]);
-        let tss = TimestampVector::from_vec(vec![1, 2]);
+        let names: VectorRef = Arc::new(StringVector::from(vec!["first", "second"]));
+        let nums: VectorRef = Arc::new(Int32Vector::from(vec![None, Some(66)]));
+        let tss: VectorRef = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2]));
 
-        columns_values.insert("name".to_string(), Arc::new(names.clone()));
-        columns_values.insert("n".to_string(), Arc::new(nums.clone()));
-        columns_values.insert("ts".to_string(), Arc::new(tss.clone()));
+        columns_values.insert("name".to_string(), names.clone());
+        columns_values.insert("n".to_string(), nums.clone());
+        columns_values.insert("ts".to_string(), tss.clone());
 
         let insert_req = new_insert_request(table_name.to_string(), columns_values);
         assert_eq!(2, table.insert(insert_req).await.unwrap());
 
+        let session_ctx = SessionContext::new();
         let stream = table.scan(&None, &[], None).await.unwrap();
-        let stream = stream.execute(0, Arc::new(RuntimeEnv::default())).unwrap();
+        let stream = stream.execute(0, session_ctx.task_ctx()).unwrap();
         let batches = util::collect(stream).await.unwrap();
         assert_eq!(1, batches.len());
 
-        let record = &batches[0].df_recordbatch;
+        let record = &batches[0];
         assert_eq!(record.num_columns(), 3);
-        let columns = record.columns();
-        assert_eq!(3, columns.len());
-        assert_eq!(names.to_arrow_array(), columns[0]);
-        assert_eq!(nums.to_arrow_array(), columns[1]);
-        assert_eq!(tss.to_arrow_array(), columns[2]);
+        assert_eq!(names, *record.column(0));
+        assert_eq!(nums, *record.column(1));
+        assert_eq!(tss, *record.column(2));
     }
 
     #[test]
@@ -730,73 +727,73 @@
         assert_eq!(0, table.insert(insert_req).await.unwrap());
 
         let mut columns_values: HashMap<String, VectorRef> = HashMap::with_capacity(4);
-        let hosts = StringVector::from(vec!["host1", "host2"]);
-        let cpus = Float64Vector::from_vec(vec![55.5, 66.6]);
-        let memories = Float64Vector::from_vec(vec![1024f64, 4096f64]);
-        let tss = TimestampVector::from_vec(vec![1, 2]);
+        let hosts: VectorRef = Arc::new(StringVector::from(vec!["host1", "host2"]));
+        let cpus: VectorRef = Arc::new(Float64Vector::from_vec(vec![55.5, 66.6]));
+        let memories: VectorRef = Arc::new(Float64Vector::from_vec(vec![1024f64, 4096f64]));
+        let tss: VectorRef = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2]));
 
-        columns_values.insert("host".to_string(), Arc::new(hosts.clone()));
-        columns_values.insert("cpu".to_string(), Arc::new(cpus.clone()));
-        columns_values.insert("memory".to_string(), Arc::new(memories.clone()));
-        columns_values.insert("ts".to_string(), Arc::new(tss.clone()));
+        columns_values.insert("host".to_string(), hosts.clone());
+        columns_values.insert("cpu".to_string(), cpus.clone());
+        columns_values.insert("memory".to_string(), memories.clone());
+        columns_values.insert("ts".to_string(), tss.clone());
 
         let insert_req = new_insert_request("demo".to_string(), columns_values);
         assert_eq!(2, table.insert(insert_req).await.unwrap());
 
+        let session_ctx = SessionContext::new();
         let stream = table.scan(&None, &[], None).await.unwrap();
-        let stream = stream.execute(0, Arc::new(RuntimeEnv::default())).unwrap();
+        let stream = stream.execute(0, session_ctx.task_ctx()).unwrap();
         let batches = util::collect(stream).await.unwrap();
         assert_eq!(1, batches.len());
-        assert_eq!(batches[0].df_recordbatch.num_columns(), 4);
+        assert_eq!(batches[0].num_columns(), 4);
 
-        let arrow_schema = batches[0].schema.arrow_schema();
-        assert_eq!(arrow_schema.fields().len(), 4);
+        let batch_schema = &batches[0].schema;
+        assert_eq!(batch_schema.num_columns(), 4);
+        assert_eq!(batch_schema.column_schemas()[0].name, "host");
+        assert_eq!(batch_schema.column_schemas()[1].name, "cpu");
+        assert_eq!(batch_schema.column_schemas()[2].name, "memory");
+        assert_eq!(batch_schema.column_schemas()[3].name, "ts");
 
-        assert_eq!(arrow_schema.field(0).name(), "host");
-        assert_eq!(arrow_schema.field(1).name(), "cpu");
-        assert_eq!(arrow_schema.field(2).name(), "memory");
-        assert_eq!(arrow_schema.field(3).name(), "ts");
-
-        let columns = batches[0].df_recordbatch.columns();
-        assert_eq!(4, columns.len());
-        assert_eq!(hosts.to_arrow_array(), columns[0]);
-        assert_eq!(cpus.to_arrow_array(), columns[1]);
-        assert_eq!(memories.to_arrow_array(), columns[2]);
-        assert_eq!(tss.to_arrow_array(), columns[3]);
+        let batch = &batches[0];
+        assert_eq!(4, batch.num_columns());
+        assert_eq!(hosts, *batch.column(0));
+        assert_eq!(cpus, *batch.column(1));
+        assert_eq!(memories, *batch.column(2));
+        assert_eq!(tss, *batch.column(3));
 
         // Scan with projections: cpu and memory
         let stream = table.scan(&Some(vec![1, 2]), &[], None).await.unwrap();
-        let stream = stream.execute(0, Arc::new(RuntimeEnv::default())).unwrap();
+        let stream = stream.execute(0, session_ctx.task_ctx()).unwrap();
         let batches = util::collect(stream).await.unwrap();
         assert_eq!(1, batches.len());
-        assert_eq!(batches[0].df_recordbatch.num_columns(), 2);
+        assert_eq!(batches[0].num_columns(), 2);
 
-        let arrow_schema = batches[0].schema.arrow_schema();
-        assert_eq!(arrow_schema.fields().len(), 2);
+        let batch_schema = &batches[0].schema;
+        assert_eq!(batch_schema.num_columns(), 2);
 
-        assert_eq!(arrow_schema.field(0).name(), "cpu");
-        assert_eq!(arrow_schema.field(1).name(), "memory");
+        assert_eq!(batch_schema.column_schemas()[0].name, "cpu");
+        assert_eq!(batch_schema.column_schemas()[1].name, "memory");
 
-        let columns = batches[0].df_recordbatch.columns();
-        assert_eq!(2, columns.len());
-        assert_eq!(cpus.to_arrow_array(), columns[0]);
-        assert_eq!(memories.to_arrow_array(), columns[1]);
+        let batch = &batches[0];
+        assert_eq!(2, batch.num_columns());
+        assert_eq!(cpus, *batch.column(0));
+        assert_eq!(memories, *batch.column(1));
 
         // Scan with projections: only ts
         let stream = table.scan(&Some(vec![3]), &[], None).await.unwrap();
-        let stream = stream.execute(0, Arc::new(RuntimeEnv::default())).unwrap();
+        let stream = stream.execute(0, session_ctx.task_ctx()).unwrap();
         let batches = util::collect(stream).await.unwrap();
         assert_eq!(1, batches.len());
-        assert_eq!(batches[0].df_recordbatch.num_columns(), 1);
+        assert_eq!(batches[0].num_columns(), 1);
 
-        let arrow_schema = batches[0].schema.arrow_schema();
-        assert_eq!(arrow_schema.fields().len(), 1);
+        let batch_schema = &batches[0].schema;
+        assert_eq!(batch_schema.num_columns(), 1);
 
-        assert_eq!(arrow_schema.field(0).name(), "ts");
+        assert_eq!(batch_schema.column_schemas()[0].name, "ts");
 
-        let columns = batches[0].df_recordbatch.columns();
-        assert_eq!(1, columns.len());
-        assert_eq!(tss.to_arrow_array(), columns[0]);
+        let record = &batches[0];
+        assert_eq!(1, record.num_columns());
+        assert_eq!(tss, *record.column(0));
     }
 
     #[tokio::test]
@@ -810,28 +807,31 @@
         // Insert more than batch size rows to the table.
         let test_batch_size = default_batch_size * 4;
         let mut columns_values: HashMap<String, VectorRef> = HashMap::with_capacity(4);
-        let hosts = StringVector::from(vec!["host1"; test_batch_size]);
-        let cpus = Float64Vector::from_vec(vec![55.5; test_batch_size]);
-        let memories = Float64Vector::from_vec(vec![1024f64; test_batch_size]);
-        let tss = TimestampVector::from_values((0..test_batch_size).map(|v| v as i64));
+        let hosts: VectorRef = Arc::new(StringVector::from(vec!["host1"; test_batch_size]));
+        let cpus: VectorRef = Arc::new(Float64Vector::from_vec(vec![55.5; test_batch_size]));
+        let memories: VectorRef = Arc::new(Float64Vector::from_vec(vec![1024f64; test_batch_size]));
+        let tss: VectorRef = Arc::new(TimestampMillisecondVector::from_values(
+            (0..test_batch_size).map(|v| v as i64),
+        ));
 
-        columns_values.insert("host".to_string(), Arc::new(hosts));
-        columns_values.insert("cpu".to_string(), Arc::new(cpus));
-        columns_values.insert("memory".to_string(), Arc::new(memories));
-        columns_values.insert("ts".to_string(), Arc::new(tss.clone()));
+        columns_values.insert("host".to_string(), hosts);
+        columns_values.insert("cpu".to_string(), cpus);
+        columns_values.insert("memory".to_string(), memories);
+        columns_values.insert("ts".to_string(), tss.clone());
 
         let insert_req = new_insert_request("demo".to_string(), columns_values);
         assert_eq!(test_batch_size, table.insert(insert_req).await.unwrap());
 
+        let session_ctx = SessionContext::new();
         let stream = table.scan(&None, &[], None).await.unwrap();
-        let stream = stream.execute(0, Arc::new(RuntimeEnv::default())).unwrap();
+        let stream = stream.execute(0, session_ctx.task_ctx()).unwrap();
         let batches = util::collect(stream).await.unwrap();
         let mut total = 0;
         for batch in batches {
-            assert_eq!(batch.df_recordbatch.num_columns(), 4);
-            let ts = batch.df_recordbatch.column(3);
+            assert_eq!(batch.num_columns(), 4);
+            let ts = batch.column(3);
             let expect = tss.slice(total, ts.len());
-            assert_eq!(expect.to_arrow_array(), *ts);
+            assert_eq!(expect, *ts);
             total += ts.len();
         }
         assert_eq!(test_batch_size, total);
diff --git a/src/mito/src/manifest/action.rs b/src/mito/src/manifest/action.rs
index f8428367d4..4e2ba43db4 100644
--- a/src/mito/src/manifest/action.rs
+++ b/src/mito/src/manifest/action.rs
@@ -26,7 +26,7 @@ use store_api::manifest::action::{ProtocolAction, ProtocolVersion, VersionHeader
 use store_api::manifest::{ManifestVersion, MetaAction};
 use table::metadata::{RawTableInfo, TableIdent};
 
-#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
+#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
 pub struct TableChange {
     pub table_info: RawTableInfo,
 }
@@ -37,7 +37,7 @@ pub struct TableRemove {
     pub table_name: String,
 }
 
-#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
+#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
 pub enum TableMetaAction {
     Protocol(ProtocolAction),
     // Boxed TableChange to reduce the total size of enum
@@ -45,7 +45,7 @@ pub enum TableMetaAction {
     Change(Box<TableChange>),
     Remove(TableRemove),
 }
 
-#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
+#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
 pub struct TableMetaActionList {
     pub actions: Vec<TableMetaAction>,
     pub prev_version: ManifestVersion,
diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs
index 689a2b4c1b..d5f554a994 100644
--- a/src/mito/src/table.rs
+++ b/src/mito/src/table.rs
@@ -21,9 +21,10 @@ use std::sync::Arc;
 
 use arc_swap::ArcSwap;
 use async_trait::async_trait;
+use common_error::ext::BoxedError;
 use common_query::logical_plan::Expr;
 use common_query::physical_plan::PhysicalPlanRef;
-use common_recordbatch::error::{Error as RecordBatchError, Result as RecordBatchResult};
+use common_recordbatch::error::{ExternalSnafu, Result as RecordBatchResult};
 use common_recordbatch::{RecordBatch, RecordBatchStream};
 use common_telemetry::logging;
 use datatypes::schema::ColumnSchema;
@@ -189,7 +190,7 @@ impl Table for MitoTable {
         let stream_schema = schema.clone();
 
         let stream = Box::pin(async_stream::try_stream! {
-            while let Some(chunk) = reader.next_chunk().await.map_err(RecordBatchError::new)? {
+            while let Some(chunk) = reader.next_chunk().await.map_err(BoxedError::new).context(ExternalSnafu)? {
                 yield RecordBatch::new(stream_schema.clone(), chunk.columns)?
             }
         });
diff --git a/src/mito/src/table/test_util/mock_engine.rs b/src/mito/src/table/test_util/mock_engine.rs
index 08b137cdc7..54b845bc51 100644
--- a/src/mito/src/table/test_util/mock_engine.rs
+++ b/src/mito/src/table/test_util/mock_engine.rs
@@ -21,7 +21,7 @@ use arc_swap::ArcSwap;
 use async_trait::async_trait;
 use common_error::mock::MockError;
 use common_telemetry::logging;
-use datatypes::prelude::{Value, VectorBuilder, VectorRef};
+use datatypes::prelude::{DataType, Value, VectorRef};
 use datatypes::schema::{ColumnSchema, Schema};
 use storage::metadata::{RegionMetaImpl, RegionMetadata};
 use storage::write_batch::{Mutation, WriteBatch};
@@ -58,12 +58,11 @@ impl ChunkReader for MockChunkReader {
             .iter()
             .map(|column_schema| {
                 let data = self.memtable.get(&column_schema.name).unwrap();
-                let mut builder =
-                    VectorBuilder::with_capacity(column_schema.data_type.clone(), data.len());
+                let mut builder = column_schema.data_type.create_mutable_vector(data.len());
                 for v in data {
-                    builder.push(v);
+                    builder.push_value_ref(v.as_value_ref()).unwrap();
                 }
-                builder.finish()
+                builder.to_vector()
             })
             .collect::<Vec<_>>();
         self.read = true;