diff --git a/Cargo.lock b/Cargo.lock
index 55efad4511..84d6fac6cb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3145,13 +3145,16 @@ dependencies = [
 name = "table-engine"
 version = "0.1.0"
 dependencies = [
+ "async-stream",
  "async-trait",
  "chrono",
  "common-error",
  "common-query",
  "common-recordbatch",
  "common-telemetry",
+ "datafusion-common",
  "datatypes",
+ "futures",
  "snafu",
  "storage",
  "store-api",
diff --git a/src/common/recordbatch/src/error.rs b/src/common/recordbatch/src/error.rs
index 494258fa65..147ee77562 100644
--- a/src/common/recordbatch/src/error.rs
+++ b/src/common/recordbatch/src/error.rs
@@ -1,6 +1,9 @@
 use datatypes::arrow::error::ArrowError;
 use snafu::{Backtrace, Snafu};
 
+// TODO(dennis): use ErrorExt instead.
+pub type BoxedError = Box<dyn std::error::Error + Send + Sync>;
+
 #[derive(Debug, Snafu)]
 #[snafu(visibility(pub))]
 pub enum Error {
@@ -9,6 +12,9 @@ pub enum Error {
         source: ArrowError,
         backtrace: Backtrace,
     },
+
+    #[snafu(display("Storage error: {}, source: {}", msg, source))]
+    Storage { source: BoxedError, msg: String },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs
index 0a69691185..ec082dae66 100644
--- a/src/datanode/src/error.rs
+++ b/src/datanode/src/error.rs
@@ -5,7 +5,7 @@ use datatypes::prelude::ConcreteDataType;
 use table::error::Error as TableError;
 use table_engine::error::Error as TableEngineError;
 
-// TODO(boyan): use ErrorExt instead.
+// TODO(dennis): use ErrorExt instead.
 pub type BoxedError = Box<dyn std::error::Error + Send + Sync>;
 
 /// Business error of datanode.
diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs
index 6fbb4751cd..a6a07dfd54 100644
--- a/src/datanode/src/instance.rs
+++ b/src/datanode/src/instance.rs
@@ -79,7 +79,7 @@ impl Instance {
     }
 
     pub async fn start(&self) -> Result<()> {
-        // FIXME(boyan): create a demo table for test
+        // FIXME(dennis): create a demo table for testing
         let column_schemas = vec![
             ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
             ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
diff --git a/src/datatypes/src/vectors/primitive.rs b/src/datatypes/src/vectors/primitive.rs
index 08f64c4fda..5beb612781 100644
--- a/src/datatypes/src/vectors/primitive.rs
+++ b/src/datatypes/src/vectors/primitive.rs
@@ -19,7 +19,7 @@ use crate::value::Value;
 use crate::vectors::{self, MutableVector, Validity, Vector, VectorRef};
 
 /// Vector for primitive data types.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct PrimitiveVector<T: Primitive> {
     array: PrimitiveArray<T>,
 }
diff --git a/src/table-engine/Cargo.toml b/src/table-engine/Cargo.toml
index ac2ea093af..0f38728942 100644
--- a/src/table-engine/Cargo.toml
+++ b/src/table-engine/Cargo.toml
@@ -4,12 +4,16 @@ version = "0.1.0"
 edition = "2021"
 
 [dependencies]
+async-stream = "0.3"
 async-trait = "0.1"
 chrono = { version = "0.4", features = ["serde"] }
 common-error = {path = "../common/error" }
 common-query = {path = "../common/query" }
 common-recordbatch = {path = "../common/recordbatch" }
 common-telemetry = {path = "../common/telemetry" }
+datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
+datatypes = { path = "../datatypes" }
+futures = "0.3"
 snafu = { version = "0.7", features = ["backtraces"] }
 storage ={ path = "../storage" }
 store-api ={ path = "../store-api" }
diff --git a/src/table-engine/src/engine.rs b/src/table-engine/src/engine.rs
index 45b0b2e6a3..a61cc1092b 100644
--- a/src/table-engine/src/engine.rs
+++ b/src/table-engine/src/engine.rs
@@ -73,7 +73,7 @@ impl TableEngine for MitoEngine {
     }
 }
 
-/// FIXME(boyan) impl system catalog to keep table metadata.
+/// FIXME(dennis) impl system catalog to keep table metadata.
 struct MitoEngineInner {
     tables: RwLock<HashMap<String, TableRef>>,
     storage_engine: Store,
@@ -100,7 +100,7 @@ impl MitoEngineInner {
         _ctx: &EngineContext,
         request: CreateTableRequest,
     ) -> Result<TableRef> {
-        //FIXME(boyan): we only supports creating a demo table right now
+        //FIXME(dennis): we only support creating a demo table right now
         //The create table sql is like:
         // create table demo(host string,
         //                   ts int64,
@@ -108,7 +108,7 @@
         //                   memory float64,
         //                   PRIMARY KEY(ts, host)) with regions=1;
 
-        //TODO(boyan): supports multi regions
+        //TODO(dennis): support multiple regions
         let region_id: RegionId = 0;
         let name = store::gen_region_name(region_id);
 
@@ -183,69 +183,62 @@
 
 #[cfg(test)]
 mod tests {
-    use datatypes::schema::{ColumnSchema, Schema};
+    use common_recordbatch::util;
+    use datafusion_common::field_util::FieldExt;
+    use datafusion_common::field_util::SchemaExt;
     use datatypes::vectors::*;
-    use storage::EngineImpl;
     use table::requests::InsertRequest;
 
     use super::*;
+    use crate::table::test;
 
     #[tokio::test]
-    async fn test_creat_table_insert() {
-        let column_schemas = vec![
-            ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
-            ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true),
-            ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
-            ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
-        ];
-
-        let table_engine = MitoEngine::<EngineImpl>::new(EngineImpl::new());
-
-        let table_name = "demo";
-        let schema = Arc::new(Schema::new(column_schemas));
-        let table = table_engine
-            .create_table(
-                &EngineContext::default(),
-                CreateTableRequest {
-                    name: table_name.to_string(),
-                    desc: Some(" a test table".to_string()),
-                    schema: schema.clone(),
-                },
-            )
-            .await
-            .unwrap();
+    async fn test_create_table_insert_scan() {
+        let (_engine, table, schema) = test::setup_test_engine_and_table().await;
 
         assert_eq!(TableType::Base, table.table_type());
         assert_eq!(schema, table.schema());
 
         let insert_req = InsertRequest {
-            table_name: table_name.to_string(),
+            table_name: "demo".to_string(),
             columns_values: HashMap::default(),
         };
         assert_eq!(0, table.insert(insert_req).await.unwrap());
 
         let mut columns_values: HashMap<String, VectorRef> = HashMap::with_capacity(4);
-        columns_values.insert(
-            "host".to_string(),
-            Arc::new(StringVector::from(vec!["host1", "host2"])),
-        );
-        columns_values.insert(
-            "cpu".to_string(),
-            Arc::new(Float64Vector::from_vec(vec![55.5, 66.6])),
-        );
-        columns_values.insert(
-            "memory".to_string(),
-            Arc::new(Float64Vector::from_vec(vec![1024f64, 4096f64])),
-        );
-        columns_values.insert(
-            "ts".to_string(),
-            Arc::new(Int64Vector::from_vec(vec![1, 2])),
-        );
+        let hosts = StringVector::from(vec!["host1", "host2"]);
+        let cpus = Float64Vector::from_vec(vec![55.5, 66.6]);
+        let memories = Float64Vector::from_vec(vec![1024f64, 4096f64]);
+        let tss = Int64Vector::from_vec(vec![1, 2]);
+
+        columns_values.insert("host".to_string(), Arc::new(hosts.clone()));
+        columns_values.insert("cpu".to_string(), Arc::new(cpus.clone()));
+        columns_values.insert("memory".to_string(), Arc::new(memories.clone()));
+        columns_values.insert("ts".to_string(), Arc::new(tss.clone()));
 
         let insert_req = InsertRequest {
-            table_name: table_name.to_string(),
+            table_name: "demo".to_string(),
             columns_values,
         };
         assert_eq!(2, table.insert(insert_req).await.unwrap());
+
+        let stream = table.scan(&None, &[], None).await.unwrap();
+        let batches = util::collect(stream).await.unwrap();
+        assert_eq!(1, batches.len());
+        assert_eq!(batches[0].df_recordbatch.num_columns(), 4);
+
+        let arrow_schema = batches[0].schema.arrow_schema();
+        assert_eq!(arrow_schema.fields().len(), 4);
+        assert_eq!(arrow_schema.field(0).name(), "host");
+        assert_eq!(arrow_schema.field(1).name(), "ts");
+        assert_eq!(arrow_schema.field(2).name(), "cpu");
+        assert_eq!(arrow_schema.field(3).name(), "memory");
+
+        let columns = batches[0].df_recordbatch.columns();
+        assert_eq!(4, columns.len());
+        assert_eq!(hosts.to_arrow_array(), columns[0]);
+        assert_eq!(tss.to_arrow_array(), columns[1]);
+        assert_eq!(cpus.to_arrow_array(), columns[2]);
+        assert_eq!(memories.to_arrow_array(), columns[3]);
     }
 }
diff --git a/src/table-engine/src/error.rs b/src/table-engine/src/error.rs
index 26b041be5c..bdafe44c76 100644
--- a/src/table-engine/src/error.rs
+++ b/src/table-engine/src/error.rs
@@ -2,7 +2,7 @@ use std::any::Any;
 
 use common_error::prelude::*;
 
-// TODO(boyan): use ErrorExt instead.
+// TODO(dennis): use ErrorExt instead.
 pub type BoxedError = Box<dyn std::error::Error + Send + Sync>;
 
 #[derive(Debug, Snafu)]
diff --git a/src/table-engine/src/table.rs b/src/table-engine/src/table.rs
index 493fb82b9a..26eae279e6 100644
--- a/src/table-engine/src/table.rs
+++ b/src/table-engine/src/table.rs
@@ -1,11 +1,22 @@
+#[cfg(test)]
+pub mod test;
 use std::any::Any;
+use std::pin::Pin;
 
 use async_trait::async_trait;
 use common_query::logical_plan::Expr;
-use common_recordbatch::SendableRecordBatchStream;
+use common_recordbatch::error::{Result as RecordBatchResult, StorageSnafu};
+use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
+use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
+use futures::task::{Context, Poll};
+use futures::Stream;
 use snafu::OptionExt;
+use snafu::ResultExt;
 use store_api::storage::SchemaRef;
-use store_api::storage::{PutOperation, Region, WriteContext, WriteRequest};
+use store_api::storage::{
+    ChunkReader, PutOperation, ReadContext, Region, ScanRequest, Snapshot, WriteContext,
+    WriteRequest,
+};
 use table::error::{Error as TableError, MissingColumnSnafu, Result as TableResult};
 use table::requests::InsertRequest;
 use table::{
@@ -37,7 +48,7 @@ impl Table for MitoTable {
 
         let mut write_request = R::WriteRequest::new(self.schema());
 
-        //FIXME(boyan): we can only insert to demo table right now
+        //FIXME(dennis): we can only insert into the demo table right now
         let mut put_op = <<R as Region>::WriteRequest as WriteRequest>::PutOp::new();
         let mut columns_values = request.columns_values;
         let key_columns = vec!["ts", "host"];
@@ -83,7 +94,66 @@ impl Table for MitoTable {
         _filters: &[Expr],
         _limit: Option<usize>,
     ) -> TableResult<SendableRecordBatchStream> {
-        unimplemented!();
+        let read_ctx = ReadContext::default();
+        let snapshot = self.region.snapshot(&read_ctx).map_err(TableError::new)?;
+
+        let mut reader = snapshot
+            .scan(&read_ctx, ScanRequest::default())
+            .await
+            .map_err(TableError::new)?
+            .reader;
+
+        let schema = reader.schema().clone();
+        let stream_schema = schema.clone();
+
+        let stream = Box::pin(async_stream::try_stream! {
+
+            for chunk in reader.next_chunk()
+                .await
+                .map_err(|e| Box::new(e) as _)
+                .context(StorageSnafu {
+                    msg: "Failed to read chunk",
+                })?
+            {
+                let batch = DfRecordBatch::try_new(
+                    stream_schema.arrow_schema().clone(),
+                    chunk.columns
+                        .into_iter()
+                        .map(|v| v.to_arrow_array())
+                        .collect());
+                let batch = batch
+                    .map_err(|e| Box::new(e) as _)
+                    .context(StorageSnafu {
+                        msg: "Failed to create DataFusion record batch",
+                    })?;
+
+                yield RecordBatch {
+                    schema: stream_schema.clone(),
+                    df_recordbatch: batch,
+                }
+            }
+        });
+
+        Ok(Box::pin(ChunkStream { schema, stream }))
+    }
+}
+
+struct ChunkStream {
+    schema: SchemaRef,
+    stream: Pin<Box<dyn Stream<Item = RecordBatchResult<RecordBatch>> + Send>>,
+}
+
+impl RecordBatchStream for ChunkStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+impl Stream for ChunkStream {
+    type Item = RecordBatchResult<RecordBatch>;
+
+    fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        Pin::new(&mut self.stream).poll_next(ctx)
     }
 }
 
diff --git a/src/table-engine/src/table/test.rs b/src/table-engine/src/table/test.rs
new file mode 100644
index 0000000000..b0793aa082
--- /dev/null
+++ b/src/table-engine/src/table/test.rs
@@ -0,0 +1,38 @@
+use std::sync::Arc;
+
+use datatypes::prelude::ConcreteDataType;
+use datatypes::schema::SchemaRef;
+use datatypes::schema::{ColumnSchema, Schema};
+use storage::EngineImpl;
+use table::engine::{EngineContext, TableEngine};
+use table::requests::CreateTableRequest;
+use table::TableRef;
+
+use crate::engine::MitoEngine;
+
+pub async fn setup_test_engine_and_table() -> (MitoEngine<EngineImpl>, TableRef, SchemaRef) {
+    let column_schemas = vec![
+        ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
+        ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true),
+        ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
+        ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
+    ];
+
+    let table_engine = MitoEngine::<EngineImpl>::new(EngineImpl::new());
+
+    let table_name = "demo";
+    let schema = Arc::new(Schema::new(column_schemas));
+    let table = table_engine
+        .create_table(
+            &EngineContext::default(),
+            CreateTableRequest {
+                name: table_name.to_string(),
+                desc: Some("a test table".to_string()),
+                schema: schema.clone(),
+            },
+        )
+        .await
+        .unwrap();
+
+    (table_engine, table, schema)
+}
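Note (not part of the patch): the new ChunkStream in table.rs follows a common adapter pattern — an async_stream::try_stream! block produces the items, and a thin wrapper that owns the pinned, boxed stream implements Stream by delegating poll_next. Below is a minimal, self-contained sketch of that pattern using only the futures and async-stream crates added in Cargo.toml above; the Chunk type, the String error type, and collect_chunks are hypothetical stand-ins for illustration, not APIs from this repository.

use std::pin::Pin;

use futures::stream::StreamExt;
use futures::task::{Context, Poll};
use futures::Stream;

// Hypothetical stand-ins: a chunk of column data and a boxed, fallible stream of chunks.
struct Chunk(Vec<i64>);
type BoxedChunkStream = Pin<Box<dyn Stream<Item = Result<Chunk, String>> + Send>>;

// Same shape as the ChunkStream in table.rs: own the pinned stream, delegate poll_next.
struct ChunkStream {
    inner: BoxedChunkStream,
}

impl Stream for ChunkStream {
    type Item = Result<Chunk, String>;

    fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Pin::new(&mut self.inner).poll_next(ctx)
    }
}

async fn collect_chunks() -> Result<Vec<Chunk>, String> {
    // try_stream! lets the body use `yield` and `?`; an error ends the stream with an Err item.
    let inner: BoxedChunkStream = Box::pin(async_stream::try_stream! {
        for data in [vec![1, 2], vec![3, 4]] {
            if data.is_empty() {
                Err("empty chunk".to_string())?;
            }
            yield Chunk(data);
        }
    });

    let mut stream = ChunkStream { inner };
    let mut chunks = Vec::new();
    while let Some(chunk) = stream.next().await {
        chunks.push(chunk?);
    }
    Ok(chunks)
}

fn main() {
    let chunks = futures::executor::block_on(collect_chunks()).unwrap();
    assert_eq!(2, chunks.len());
    println!("first chunk has {} values", chunks[0].0.len());
}

The wrapper struct exists only to attach extra state and trait impls to the stream — in the patch that extra state is the schema exposed through RecordBatchStream — while polling itself is simply forwarded to the inner pinned stream.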