fix: Fix compiler errors in catalog and mito crates (#742)

* fix: Fix compiler errors in mito

* fix: Fix compiler errors in catalog crate

* style: Fix clippy

* chore: Fix use
Author: Yingwen
Date: 2022-12-13 15:53:55 +08:00
Committed by: GitHub
Parent: 4defde055c
Commit: 4b644aa482
10 changed files with 176 additions and 183 deletions

View File

@@ -17,7 +17,7 @@ use std::any::Any;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::prelude::{Snafu, StatusCode};
use datafusion::error::DataFusionError;
-use datatypes::arrow;
+use datatypes::prelude::ConcreteDataType;
use datatypes::schema::RawSchema;
use snafu::{Backtrace, ErrorCompat};
@@ -51,14 +51,12 @@ pub enum Error {
SystemCatalog { msg: String, backtrace: Backtrace },
#[snafu(display(
-"System catalog table type mismatch, expected: binary, found: {:?} source: {}",
+"System catalog table type mismatch, expected: binary, found: {:?}",
data_type,
-source
))]
SystemCatalogTypeMismatch {
-data_type: arrow::datatypes::DataType,
-#[snafu(backtrace)]
-source: datatypes::error::Error,
+data_type: ConcreteDataType,
+backtrace: Backtrace,
},
#[snafu(display("Invalid system catalog entry type: {:?}", entry_type))]
@@ -222,10 +220,11 @@ impl ErrorExt for Error {
| Error::ValueDeserialize { .. }
| Error::Io { .. } => StatusCode::StorageUnavailable,
-Error::RegisterTable { .. } => StatusCode::Internal,
+Error::RegisterTable { .. } | Error::SystemCatalogTypeMismatch { .. } => {
+StatusCode::Internal
+}
Error::ReadSystemCatalog { source, .. } => source.status_code(),
-Error::SystemCatalogTypeMismatch { source, .. } => source.status_code(),
Error::InvalidCatalogValue { source, .. } => source.status_code(),
Error::TableExists { .. } => StatusCode::TableAlreadyExists,
@@ -265,7 +264,6 @@ impl From<Error> for DataFusionError {
#[cfg(test)]
mod tests {
use common_error::mock::MockError;
-use datatypes::arrow::datatypes::DataType;
use snafu::GenerateImplicitData;
use super::*;
@@ -314,11 +312,8 @@ mod tests {
assert_eq!(
StatusCode::Internal,
Error::SystemCatalogTypeMismatch {
-data_type: DataType::Boolean,
-source: datatypes::error::Error::UnsupportedArrowType {
-arrow_type: DataType::Boolean,
-backtrace: Backtrace::generate()
-}
+data_type: ConcreteDataType::binary_datatype(),
+backtrace: Backtrace::generate(),
}
.status_code()
);
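Note: the reworked SystemCatalogTypeMismatch variant is now a leaf error: it records the offending ConcreteDataType and lets snafu generate the backtrace implicitly instead of carrying a datatypes error as source, which is also why it now maps to StatusCode::Internal rather than delegating to a source. A minimal sketch of the same snafu pattern, using hypothetical names rather than the crate's real types:

    use snafu::{ensure, Backtrace, Snafu};

    #[derive(Debug, Snafu)]
    enum Error {
        // Leaf variant: no `source` field, so the backtrace is generated
        // implicitly when the context selector builds the error.
        #[snafu(display("type mismatch, expected: binary, found: {:?}", data_type))]
        TypeMismatch { data_type: String, backtrace: Backtrace },
    }

    fn expect_binary(actual: &str) -> Result<(), Error> {
        // `TypeMismatchSnafu` is the selector snafu derives for the variant above.
        ensure!(actual == "binary", TypeMismatchSnafu { data_type: actual });
        Ok(())
    }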

View File

@@ -138,7 +138,7 @@ impl TableGlobalKey {
/// Table global info contains necessary info for a datanode to create table regions, including
/// table id, table meta(schema...), region id allocation across datanodes.
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TableGlobalValue {
/// Id of datanode that created the global table info kv. only for debugging.
pub node_id: u64,
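Note: the only change here is the extra Eq derive, most likely for the "style: Fix clippy" part of this commit (clippy's derive_partial_eq_without_eq lint); it is free whenever every field is itself Eq. A tiny illustration with a hypothetical struct, not the real TableGlobalValue:

    // All fields are Eq, so deriving Eq alongside PartialEq costs nothing and
    // keeps clippy::derive_partial_eq_without_eq quiet.
    #[derive(Debug, Clone, PartialEq, Eq)]
    struct NodeInfo {
        node_id: u64,
        regions: Vec<u32>,
    }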

View File

@@ -145,27 +145,34 @@ impl LocalCatalogManager {
/// Convert `RecordBatch` to a vector of `Entry`.
fn record_batch_to_entry(rb: RecordBatch) -> Result<Vec<Entry>> {
ensure!(
-rb.df_recordbatch.columns().len() >= 6,
+rb.num_columns() >= 6,
SystemCatalogSnafu {
-msg: format!("Length mismatch: {}", rb.df_recordbatch.columns().len())
+msg: format!("Length mismatch: {}", rb.num_columns())
}
);
-let entry_type = UInt8Vector::try_from_arrow_array(&rb.df_recordbatch.columns()[0])
-.with_context(|_| SystemCatalogTypeMismatchSnafu {
-data_type: rb.df_recordbatch.columns()[ENTRY_TYPE_INDEX]
-.data_type()
-.clone(),
+let entry_type = rb
+.column(ENTRY_TYPE_INDEX)
+.as_any()
+.downcast_ref::<UInt8Vector>()
+.with_context(|| SystemCatalogTypeMismatchSnafu {
+data_type: rb.column(ENTRY_TYPE_INDEX).data_type(),
})?;
-let key = BinaryVector::try_from_arrow_array(&rb.df_recordbatch.columns()[1])
-.with_context(|_| SystemCatalogTypeMismatchSnafu {
-data_type: rb.df_recordbatch.columns()[KEY_INDEX].data_type().clone(),
+let key = rb
+.column(KEY_INDEX)
+.as_any()
+.downcast_ref::<BinaryVector>()
+.with_context(|| SystemCatalogTypeMismatchSnafu {
+data_type: rb.column(KEY_INDEX).data_type(),
})?;
-let value = BinaryVector::try_from_arrow_array(&rb.df_recordbatch.columns()[3])
-.with_context(|_| SystemCatalogTypeMismatchSnafu {
-data_type: rb.df_recordbatch.columns()[VALUE_INDEX].data_type().clone(),
+let value = rb
+.column(VALUE_INDEX)
+.as_any()
+.downcast_ref::<BinaryVector>()
+.with_context(|| SystemCatalogTypeMismatchSnafu {
+data_type: rb.column(VALUE_INDEX).data_type(),
})?;
let mut res = Vec::with_capacity(rb.num_rows());
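Note: the rewritten code downcasts the batch's own vector columns instead of converting arrow arrays, and a failed downcast becomes the now source-less SystemCatalogTypeMismatch error. A rough sketch of that shape as a helper, assuming the crate's Result/Error aliases and the as_any/with_context usage shown above (column_as is a hypothetical name):

    use snafu::OptionExt;

    // Borrow column `idx` of a RecordBatch as a concrete vector type, or report
    // which ConcreteDataType was actually found.
    fn column_as<'a, T: 'static>(rb: &'a RecordBatch, idx: usize) -> Result<&'a T> {
        rb.column(idx)
            .as_any()
            .downcast_ref::<T>()
            .with_context(|| SystemCatalogTypeMismatchSnafu {
                data_type: rb.column(idx).data_type(),
            })
    }

    // e.g. let entry_type = column_as::<UInt8Vector>(&rb, ENTRY_TYPE_INDEX)?;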

View File

@@ -21,14 +21,13 @@ use common_catalog::consts::{
SYSTEM_CATALOG_TABLE_ID, SYSTEM_CATALOG_TABLE_NAME,
};
use common_query::logical_plan::Expr;
-use common_query::physical_plan::{PhysicalPlanRef, RuntimeEnv};
+use common_query::physical_plan::{PhysicalPlanRef, SessionContext};
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::debug;
-use common_time::timestamp::Timestamp;
use common_time::util;
use datatypes::prelude::{ConcreteDataType, ScalarVector};
use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef};
-use datatypes::vectors::{BinaryVector, TimestampVector, UInt8Vector};
+use datatypes::vectors::{BinaryVector, TimestampMillisecondVector, UInt8Vector};
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use table::engine::{EngineContext, TableEngineRef};
@@ -127,13 +126,14 @@ impl SystemCatalogTable {
/// Create a stream of all entries inside system catalog table
pub async fn records(&self) -> Result<SendableRecordBatchStream> {
let full_projection = None;
+let ctx = SessionContext::new();
let scan = self
.table
.scan(&full_projection, &[], None)
.await
.context(error::SystemCatalogTableScanSnafu)?;
let stream = scan
-.execute(0, Arc::new(RuntimeEnv::default()))
+.execute(0, ctx.task_ctx())
.context(error::SystemCatalogTableScanExecSnafu)?;
Ok(stream)
}
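Note: with the updated DataFusion APIs, execute() takes a TaskContext instead of an Arc<RuntimeEnv>; a throwaway SessionContext is created only to mint that task context, and the same substitution is repeated in the test call sites further down. A minimal sketch of the pattern, assuming the re-exported SessionContext and the execute(partition, task_ctx) signature used here:

    use common_query::physical_plan::{PhysicalPlanRef, SessionContext};
    use common_recordbatch::SendableRecordBatchStream;

    // Drive partition 0 of a physical plan; error handling elided for brevity.
    fn execute_partition_zero(plan: &PhysicalPlanRef) -> SendableRecordBatchStream {
        let ctx = SessionContext::new();
        plan.execute(0, ctx.task_ctx()).expect("execute partition 0")
    }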
@@ -222,9 +222,7 @@ pub fn build_insert_request(entry_type: EntryType, key: &[u8], value: &[u8]) ->
// Timestamp in key part is intentionally left to 0
columns_values.insert(
"timestamp".to_string(),
-Arc::new(TimestampVector::from_slice(&[Timestamp::new_millisecond(
-0,
-)])) as _,
+Arc::new(TimestampMillisecondVector::from_slice(&[0])) as _,
);
columns_values.insert(
@@ -232,18 +230,15 @@ pub fn build_insert_request(entry_type: EntryType, key: &[u8], value: &[u8]) ->
Arc::new(BinaryVector::from_slice(&[value])) as _,
);
+let now = util::current_time_millis();
columns_values.insert(
"gmt_created".to_string(),
-Arc::new(TimestampVector::from_slice(&[Timestamp::new_millisecond(
-util::current_time_millis(),
-)])) as _,
+Arc::new(TimestampMillisecondVector::from_slice(&[now])) as _,
);
columns_values.insert(
"gmt_modified".to_string(),
-Arc::new(TimestampVector::from_slice(&[Timestamp::new_millisecond(
-util::current_time_millis(),
-)])) as _,
+Arc::new(TimestampMillisecondVector::from_slice(&[now])) as _,
);
InsertRequest {

View File

@@ -26,9 +26,9 @@ use common_query::logical_plan::Expr;
use common_query::physical_plan::PhysicalPlanRef;
use common_recordbatch::error::Result as RecordBatchResult;
use common_recordbatch::{RecordBatch, RecordBatchStream};
-use datatypes::prelude::{ConcreteDataType, VectorBuilder};
+use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
-use datatypes::value::Value;
+use datatypes::value::ValueRef;
use datatypes::vectors::VectorRef;
use futures::Stream;
use snafu::ResultExt;
@@ -149,26 +149,33 @@ fn tables_to_record_batch(
engine: &str,
) -> Vec<VectorRef> {
let mut catalog_vec =
-VectorBuilder::with_capacity(ConcreteDataType::string_datatype(), table_names.len());
+ConcreteDataType::string_datatype().create_mutable_vector(table_names.len());
let mut schema_vec =
-VectorBuilder::with_capacity(ConcreteDataType::string_datatype(), table_names.len());
+ConcreteDataType::string_datatype().create_mutable_vector(table_names.len());
let mut table_name_vec =
-VectorBuilder::with_capacity(ConcreteDataType::string_datatype(), table_names.len());
+ConcreteDataType::string_datatype().create_mutable_vector(table_names.len());
let mut engine_vec =
-VectorBuilder::with_capacity(ConcreteDataType::string_datatype(), table_names.len());
+ConcreteDataType::string_datatype().create_mutable_vector(table_names.len());
for table_name in table_names {
-catalog_vec.push(&Value::String(catalog_name.into()));
-schema_vec.push(&Value::String(schema_name.into()));
-table_name_vec.push(&Value::String(table_name.into()));
-engine_vec.push(&Value::String(engine.into()));
+// Safety: All these vectors are string type.
+catalog_vec
+.push_value_ref(ValueRef::String(catalog_name))
+.unwrap();
+schema_vec
+.push_value_ref(ValueRef::String(schema_name))
+.unwrap();
+table_name_vec
+.push_value_ref(ValueRef::String(&table_name))
+.unwrap();
+engine_vec.push_value_ref(ValueRef::String(engine)).unwrap();
}
vec![
-catalog_vec.finish(),
-schema_vec.finish(),
-table_name_vec.finish(),
-engine_vec.finish(),
+catalog_vec.to_vector(),
+schema_vec.to_vector(),
+table_name_vec.to_vector(),
+engine_vec.to_vector(),
]
}
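Note: VectorBuilder is replaced by the MutableVector obtained from ConcreteDataType::create_mutable_vector; values go in as ValueRefs and the finished column comes out via to_vector(). A reduced sketch of the same build loop for a single string column, using the API as it appears in this hunk:

    use datatypes::prelude::{ConcreteDataType, DataType};
    use datatypes::value::ValueRef;
    use datatypes::vectors::VectorRef;

    fn string_column(values: &[&str]) -> VectorRef {
        let mut builder =
            ConcreteDataType::string_datatype().create_mutable_vector(values.len());
        for &v in values {
            // push_value_ref is fallible because the ValueRef must match the
            // builder's type; every value here is a string, so unwrap cannot fire.
            builder.push_value_ref(ValueRef::String(v)).unwrap();
        }
        builder.to_vector()
    }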
@@ -340,9 +347,7 @@ fn build_schema_for_tables() -> Schema {
#[cfg(test)]
mod tests {
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
-use common_query::physical_plan::RuntimeEnv;
-use datatypes::arrow::array::Utf8Array;
-use datatypes::arrow::datatypes::DataType;
+use common_query::physical_plan::SessionContext;
use futures_util::StreamExt;
use table::table::numbers::NumbersTable;
@@ -366,56 +371,47 @@ mod tests {
let tables = Tables::new(catalog_list, "test_engine".to_string());
let tables_stream = tables.scan(&None, &[], None).await.unwrap();
-let mut tables_stream = tables_stream
-.execute(0, Arc::new(RuntimeEnv::default()))
-.unwrap();
+let session_ctx = SessionContext::new();
+let mut tables_stream = tables_stream.execute(0, session_ctx.task_ctx()).unwrap();
if let Some(t) = tables_stream.next().await {
-let batch = t.unwrap().df_recordbatch;
+let batch = t.unwrap();
assert_eq!(1, batch.num_rows());
assert_eq!(4, batch.num_columns());
-assert_eq!(&DataType::Utf8, batch.column(0).data_type());
-assert_eq!(&DataType::Utf8, batch.column(1).data_type());
-assert_eq!(&DataType::Utf8, batch.column(2).data_type());
-assert_eq!(&DataType::Utf8, batch.column(3).data_type());
+assert_eq!(
+ConcreteDataType::string_datatype(),
+batch.column(0).data_type()
+);
+assert_eq!(
+ConcreteDataType::string_datatype(),
+batch.column(1).data_type()
+);
+assert_eq!(
+ConcreteDataType::string_datatype(),
+batch.column(2).data_type()
+);
+assert_eq!(
+ConcreteDataType::string_datatype(),
+batch.column(3).data_type()
+);
assert_eq!(
"greptime",
-batch
-.column(0)
-.as_any()
-.downcast_ref::<Utf8Array<i32>>()
-.unwrap()
-.value(0)
+batch.column(0).get_ref(0).as_string().unwrap().unwrap()
);
assert_eq!(
"public",
-batch
-.column(1)
-.as_any()
-.downcast_ref::<Utf8Array<i32>>()
-.unwrap()
-.value(0)
+batch.column(1).get_ref(0).as_string().unwrap().unwrap()
);
assert_eq!(
"test_table",
-batch
-.column(2)
-.as_any()
-.downcast_ref::<Utf8Array<i32>>()
-.unwrap()
-.value(0)
+batch.column(2).get_ref(0).as_string().unwrap().unwrap()
);
assert_eq!(
"test_engine",
-batch
-.column(3)
-.as_any()
-.downcast_ref::<Utf8Array<i32>>()
-.unwrap()
-.value(0)
+batch.column(3).get_ref(0).as_string().unwrap().unwrap()
);
} else {
panic!("Record batch should not be empty!")
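Note: the assertions now read cells through the vector's value accessor instead of downcasting to an arrow Utf8Array; the double unwrap is because as_string() apparently returns a Result (wrong column type) wrapping an Option (null value). A hedged sketch of that access path as a helper (string_cell is a hypothetical name):

    use datatypes::vectors::VectorRef;

    // Fetch row `row` of a string column, panicking on type mismatch or null.
    fn string_cell(column: &VectorRef, row: usize) -> String {
        column
            .get_ref(row) // ValueRef for that row
            .as_string() // Err if the column is not a string column
            .unwrap()
            .unwrap() // None if the cell is null
            .to_string()
    }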

View File

@@ -20,7 +20,7 @@ use common_recordbatch::adapter::{DfRecordBatchStreamAdapter, RecordBatchStreamA
use common_recordbatch::{DfSendableRecordBatchStream, SendableRecordBatchStream};
use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
use datafusion::error::Result as DfResult;
-pub use datafusion::execution::context::TaskContext;
+pub use datafusion::execution::context::{SessionContext, TaskContext};
use datafusion::physical_plan::expressions::PhysicalSortExpr;
pub use datafusion::physical_plan::Partitioning;
use datafusion::physical_plan::Statistics;

View File

@@ -519,13 +519,12 @@ impl<S: StorageEngine> MitoEngineInner<S> {
#[cfg(test)]
mod tests {
-use common_query::physical_plan::RuntimeEnv;
+use common_query::physical_plan::SessionContext;
use common_recordbatch::util;
-use datafusion_common::field_util::{FieldExt, SchemaExt};
-use datatypes::prelude::{ConcreteDataType, ScalarVector};
+use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaBuilder};
use datatypes::value::Value;
-use datatypes::vectors::*;
+use datatypes::vectors::TimestampMillisecondVector;
use log_store::fs::noop::NoopLogStore;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
@@ -600,30 +599,29 @@ mod tests {
let (_dir, table_name, table) = setup_table_with_column_default_constraint().await;
let mut columns_values: HashMap<String, VectorRef> = HashMap::with_capacity(4);
-let names = StringVector::from(vec!["first", "second"]);
-let tss = TimestampVector::from_vec(vec![1, 2]);
+let names: VectorRef = Arc::new(StringVector::from(vec!["first", "second"]));
+let tss: VectorRef = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2]));
-columns_values.insert("name".to_string(), Arc::new(names.clone()));
-columns_values.insert("ts".to_string(), Arc::new(tss.clone()));
+columns_values.insert("name".to_string(), names.clone());
+columns_values.insert("ts".to_string(), tss.clone());
let insert_req = new_insert_request(table_name.to_string(), columns_values);
assert_eq!(2, table.insert(insert_req).await.unwrap());
+let session_ctx = SessionContext::new();
let stream = table.scan(&None, &[], None).await.unwrap();
-let stream = stream.execute(0, Arc::new(RuntimeEnv::default())).unwrap();
+let stream = stream.execute(0, session_ctx.task_ctx()).unwrap();
let batches = util::collect(stream).await.unwrap();
assert_eq!(1, batches.len());
-let record = &batches[0].df_recordbatch;
+let record = &batches[0];
assert_eq!(record.num_columns(), 3);
-let columns = record.columns();
-assert_eq!(3, columns.len());
-assert_eq!(names.to_arrow_array(), columns[0]);
+assert_eq!(names, *record.column(0));
assert_eq!(
-Int32Vector::from_vec(vec![42, 42]).to_arrow_array(),
-columns[1]
+Arc::new(Int32Vector::from_vec(vec![42, 42])) as VectorRef,
+*record.column(1)
);
-assert_eq!(tss.to_arrow_array(), columns[2]);
+assert_eq!(tss, *record.column(2));
}
#[tokio::test]
@@ -631,29 +629,28 @@ mod tests {
let (_dir, table_name, table) = setup_table_with_column_default_constraint().await;
let mut columns_values: HashMap<String, VectorRef> = HashMap::with_capacity(4);
-let names = StringVector::from(vec!["first", "second"]);
-let nums = Int32Vector::from(vec![None, Some(66)]);
-let tss = TimestampVector::from_vec(vec![1, 2]);
+let names: VectorRef = Arc::new(StringVector::from(vec!["first", "second"]));
+let nums: VectorRef = Arc::new(Int32Vector::from(vec![None, Some(66)]));
+let tss: VectorRef = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2]));
-columns_values.insert("name".to_string(), Arc::new(names.clone()));
-columns_values.insert("n".to_string(), Arc::new(nums.clone()));
-columns_values.insert("ts".to_string(), Arc::new(tss.clone()));
+columns_values.insert("name".to_string(), names.clone());
+columns_values.insert("n".to_string(), nums.clone());
+columns_values.insert("ts".to_string(), tss.clone());
let insert_req = new_insert_request(table_name.to_string(), columns_values);
assert_eq!(2, table.insert(insert_req).await.unwrap());
+let session_ctx = SessionContext::new();
let stream = table.scan(&None, &[], None).await.unwrap();
-let stream = stream.execute(0, Arc::new(RuntimeEnv::default())).unwrap();
+let stream = stream.execute(0, session_ctx.task_ctx()).unwrap();
let batches = util::collect(stream).await.unwrap();
assert_eq!(1, batches.len());
-let record = &batches[0].df_recordbatch;
+let record = &batches[0];
assert_eq!(record.num_columns(), 3);
-let columns = record.columns();
-assert_eq!(3, columns.len());
-assert_eq!(names.to_arrow_array(), columns[0]);
-assert_eq!(nums.to_arrow_array(), columns[1]);
-assert_eq!(tss.to_arrow_array(), columns[2]);
+assert_eq!(names, *record.column(0));
+assert_eq!(nums, *record.column(1));
+assert_eq!(tss, *record.column(2));
}
#[test]
@@ -730,73 +727,73 @@ mod tests {
assert_eq!(0, table.insert(insert_req).await.unwrap());
let mut columns_values: HashMap<String, VectorRef> = HashMap::with_capacity(4);
-let hosts = StringVector::from(vec!["host1", "host2"]);
-let cpus = Float64Vector::from_vec(vec![55.5, 66.6]);
-let memories = Float64Vector::from_vec(vec![1024f64, 4096f64]);
-let tss = TimestampVector::from_vec(vec![1, 2]);
+let hosts: VectorRef = Arc::new(StringVector::from(vec!["host1", "host2"]));
+let cpus: VectorRef = Arc::new(Float64Vector::from_vec(vec![55.5, 66.6]));
+let memories: VectorRef = Arc::new(Float64Vector::from_vec(vec![1024f64, 4096f64]));
+let tss: VectorRef = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2]));
-columns_values.insert("host".to_string(), Arc::new(hosts.clone()));
-columns_values.insert("cpu".to_string(), Arc::new(cpus.clone()));
-columns_values.insert("memory".to_string(), Arc::new(memories.clone()));
-columns_values.insert("ts".to_string(), Arc::new(tss.clone()));
+columns_values.insert("host".to_string(), hosts.clone());
+columns_values.insert("cpu".to_string(), cpus.clone());
+columns_values.insert("memory".to_string(), memories.clone());
+columns_values.insert("ts".to_string(), tss.clone());
let insert_req = new_insert_request("demo".to_string(), columns_values);
assert_eq!(2, table.insert(insert_req).await.unwrap());
+let session_ctx = SessionContext::new();
let stream = table.scan(&None, &[], None).await.unwrap();
-let stream = stream.execute(0, Arc::new(RuntimeEnv::default())).unwrap();
+let stream = stream.execute(0, session_ctx.task_ctx()).unwrap();
let batches = util::collect(stream).await.unwrap();
assert_eq!(1, batches.len());
-assert_eq!(batches[0].df_recordbatch.num_columns(), 4);
+assert_eq!(batches[0].num_columns(), 4);
-let arrow_schema = batches[0].schema.arrow_schema();
-assert_eq!(arrow_schema.fields().len(), 4);
+let batch_schema = &batches[0].schema;
+assert_eq!(batch_schema.num_columns(), 4);
+assert_eq!(batch_schema.column_schemas()[0].name, "host");
+assert_eq!(batch_schema.column_schemas()[1].name, "cpu");
+assert_eq!(batch_schema.column_schemas()[2].name, "memory");
+assert_eq!(batch_schema.column_schemas()[3].name, "ts");
-assert_eq!(arrow_schema.field(0).name(), "host");
-assert_eq!(arrow_schema.field(1).name(), "cpu");
-assert_eq!(arrow_schema.field(2).name(), "memory");
-assert_eq!(arrow_schema.field(3).name(), "ts");
-let columns = batches[0].df_recordbatch.columns();
-assert_eq!(4, columns.len());
-assert_eq!(hosts.to_arrow_array(), columns[0]);
-assert_eq!(cpus.to_arrow_array(), columns[1]);
-assert_eq!(memories.to_arrow_array(), columns[2]);
-assert_eq!(tss.to_arrow_array(), columns[3]);
+let batch = &batches[0];
+assert_eq!(4, batch.num_columns());
+assert_eq!(hosts, *batch.column(0));
+assert_eq!(cpus, *batch.column(1));
+assert_eq!(memories, *batch.column(2));
+assert_eq!(tss, *batch.column(3));
// Scan with projections: cpu and memory
let stream = table.scan(&Some(vec![1, 2]), &[], None).await.unwrap();
-let stream = stream.execute(0, Arc::new(RuntimeEnv::default())).unwrap();
+let stream = stream.execute(0, session_ctx.task_ctx()).unwrap();
let batches = util::collect(stream).await.unwrap();
assert_eq!(1, batches.len());
-assert_eq!(batches[0].df_recordbatch.num_columns(), 2);
+assert_eq!(batches[0].num_columns(), 2);
-let arrow_schema = batches[0].schema.arrow_schema();
-assert_eq!(arrow_schema.fields().len(), 2);
+let batch_schema = &batches[0].schema;
+assert_eq!(batch_schema.num_columns(), 2);
-assert_eq!(arrow_schema.field(0).name(), "cpu");
-assert_eq!(arrow_schema.field(1).name(), "memory");
+assert_eq!(batch_schema.column_schemas()[0].name, "cpu");
+assert_eq!(batch_schema.column_schemas()[1].name, "memory");
-let columns = batches[0].df_recordbatch.columns();
-assert_eq!(2, columns.len());
-assert_eq!(cpus.to_arrow_array(), columns[0]);
-assert_eq!(memories.to_arrow_array(), columns[1]);
+let batch = &batches[0];
+assert_eq!(2, batch.num_columns());
+assert_eq!(cpus, *batch.column(0));
+assert_eq!(memories, *batch.column(1));
// Scan with projections: only ts
let stream = table.scan(&Some(vec![3]), &[], None).await.unwrap();
-let stream = stream.execute(0, Arc::new(RuntimeEnv::default())).unwrap();
+let stream = stream.execute(0, session_ctx.task_ctx()).unwrap();
let batches = util::collect(stream).await.unwrap();
assert_eq!(1, batches.len());
-assert_eq!(batches[0].df_recordbatch.num_columns(), 1);
+assert_eq!(batches[0].num_columns(), 1);
-let arrow_schema = batches[0].schema.arrow_schema();
-assert_eq!(arrow_schema.fields().len(), 1);
+let batch_schema = &batches[0].schema;
+assert_eq!(batch_schema.num_columns(), 1);
-assert_eq!(arrow_schema.field(0).name(), "ts");
+assert_eq!(batch_schema.column_schemas()[0].name, "ts");
-let columns = batches[0].df_recordbatch.columns();
-assert_eq!(1, columns.len());
-assert_eq!(tss.to_arrow_array(), columns[0]);
+let record = &batches[0];
+assert_eq!(1, record.num_columns());
+assert_eq!(tss, *record.column(0));
}
#[tokio::test]
@@ -810,28 +807,31 @@ mod tests {
// Insert more than batch size rows to the table.
let test_batch_size = default_batch_size * 4;
let mut columns_values: HashMap<String, VectorRef> = HashMap::with_capacity(4);
-let hosts = StringVector::from(vec!["host1"; test_batch_size]);
-let cpus = Float64Vector::from_vec(vec![55.5; test_batch_size]);
-let memories = Float64Vector::from_vec(vec![1024f64; test_batch_size]);
-let tss = TimestampVector::from_values((0..test_batch_size).map(|v| v as i64));
+let hosts: VectorRef = Arc::new(StringVector::from(vec!["host1"; test_batch_size]));
+let cpus: VectorRef = Arc::new(Float64Vector::from_vec(vec![55.5; test_batch_size]));
+let memories: VectorRef = Arc::new(Float64Vector::from_vec(vec![1024f64; test_batch_size]));
+let tss: VectorRef = Arc::new(TimestampMillisecondVector::from_values(
+(0..test_batch_size).map(|v| v as i64),
+));
-columns_values.insert("host".to_string(), Arc::new(hosts));
-columns_values.insert("cpu".to_string(), Arc::new(cpus));
-columns_values.insert("memory".to_string(), Arc::new(memories));
-columns_values.insert("ts".to_string(), Arc::new(tss.clone()));
+columns_values.insert("host".to_string(), hosts);
+columns_values.insert("cpu".to_string(), cpus);
+columns_values.insert("memory".to_string(), memories);
+columns_values.insert("ts".to_string(), tss.clone());
let insert_req = new_insert_request("demo".to_string(), columns_values);
assert_eq!(test_batch_size, table.insert(insert_req).await.unwrap());
+let session_ctx = SessionContext::new();
let stream = table.scan(&None, &[], None).await.unwrap();
-let stream = stream.execute(0, Arc::new(RuntimeEnv::default())).unwrap();
+let stream = stream.execute(0, session_ctx.task_ctx()).unwrap();
let batches = util::collect(stream).await.unwrap();
let mut total = 0;
for batch in batches {
-assert_eq!(batch.df_recordbatch.num_columns(), 4);
-let ts = batch.df_recordbatch.column(3);
+assert_eq!(batch.num_columns(), 4);
+let ts = batch.column(3);
let expect = tss.slice(total, ts.len());
-assert_eq!(expect.to_arrow_array(), *ts);
+assert_eq!(expect, *ts);
total += ts.len();
}
assert_eq!(test_batch_size, total);
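Note: since both the expected and actual sides are now GreptimeDB vectors, the tests compare columns directly; the expected value just has to be wrapped into the same VectorRef trait-object form. A minimal sketch of that equality check, assuming the PartialEq behaviour the asserts above rely on:

    use std::sync::Arc;
    use datatypes::vectors::{Int32Vector, VectorRef};

    #[test]
    fn compare_vector_refs() {
        let expect: VectorRef = Arc::new(Int32Vector::from_vec(vec![42, 42]));
        let actual: VectorRef = Arc::new(Int32Vector::from_vec(vec![42, 42]));
        // Equality is by column contents, not Arc pointer identity.
        assert_eq!(expect, actual);
    }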

View File

@@ -26,7 +26,7 @@ use store_api::manifest::action::{ProtocolAction, ProtocolVersion, VersionHeader
use store_api::manifest::{ManifestVersion, MetaAction};
use table::metadata::{RawTableInfo, TableIdent};
-#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
+#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct TableChange {
pub table_info: RawTableInfo,
}
@@ -37,7 +37,7 @@ pub struct TableRemove {
pub table_name: String,
}
-#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
+#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub enum TableMetaAction {
Protocol(ProtocolAction),
// Boxed TableChange to reduce the total size of enum
@@ -45,7 +45,7 @@ pub enum TableMetaAction {
Remove(TableRemove),
}
-#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
+#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct TableMetaActionList {
pub actions: Vec<TableMetaAction>,
pub prev_version: ManifestVersion,

View File

@@ -21,9 +21,10 @@ use std::sync::Arc;
use arc_swap::ArcSwap;
use async_trait::async_trait;
+use common_error::ext::BoxedError;
use common_query::logical_plan::Expr;
use common_query::physical_plan::PhysicalPlanRef;
-use common_recordbatch::error::{Error as RecordBatchError, Result as RecordBatchResult};
+use common_recordbatch::error::{ExternalSnafu, Result as RecordBatchResult};
use common_recordbatch::{RecordBatch, RecordBatchStream};
use common_telemetry::logging;
use datatypes::schema::ColumnSchema;
@@ -189,7 +190,7 @@ impl<R: Region> Table for MitoTable<R> {
let stream_schema = schema.clone();
let stream = Box::pin(async_stream::try_stream! {
-while let Some(chunk) = reader.next_chunk().await.map_err(RecordBatchError::new)? {
+while let Some(chunk) = reader.next_chunk().await.map_err(BoxedError::new).context(ExternalSnafu)? {
yield RecordBatch::new(stream_schema.clone(), chunk.columns)?
}
});
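Note: the stream body no longer wraps storage errors with RecordBatchError::new; it boxes them and attaches them through the recordbatch crate's ExternalSnafu selector. A simplified, self-contained sketch of that snafu shape, with std::io::Error standing in for the boxed storage error:

    use snafu::{ResultExt, Snafu};

    #[derive(Debug, Snafu)]
    enum Error {
        // Records that a lower layer failed, keeping that failure as the source.
        #[snafu(display("External error"))]
        External { source: std::io::Error },
    }

    fn next_chunk() -> Result<Vec<u8>, Error> {
        // The real code is reader.next_chunk().await.map_err(BoxedError::new)
        //     .context(ExternalSnafu)?; the mapping shape is the same.
        std::fs::read("chunk.bin").context(ExternalSnafu)
    }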

View File

@@ -21,7 +21,7 @@ use arc_swap::ArcSwap;
use async_trait::async_trait;
use common_error::mock::MockError;
use common_telemetry::logging;
-use datatypes::prelude::{Value, VectorBuilder, VectorRef};
+use datatypes::prelude::{DataType, Value, VectorRef};
use datatypes::schema::{ColumnSchema, Schema};
use storage::metadata::{RegionMetaImpl, RegionMetadata};
use storage::write_batch::{Mutation, WriteBatch};
@@ -58,12 +58,11 @@ impl ChunkReader for MockChunkReader {
.iter()
.map(|column_schema| {
let data = self.memtable.get(&column_schema.name).unwrap();
-let mut builder =
-VectorBuilder::with_capacity(column_schema.data_type.clone(), data.len());
+let mut builder = column_schema.data_type.create_mutable_vector(data.len());
for v in data {
-builder.push(v);
+builder.push_value_ref(v.as_value_ref()).unwrap();
}
-builder.finish()
+builder.to_vector()
})
.collect::<Vec<VectorRef>>();
self.read = true;