feat: impl scanning data from storage engine for table (#47)

* feat: impl scanning data from storage for MitoTable

* adds test mod to setup table engine test

* fix: comment error

* fix: boyan -> dennis in todo comments

* fix: remove unnecessary Send in BatchIteratorPtr
This commit is contained in:
dennis zhuang
2022-06-20 15:42:57 +08:00
committed by GitHub
parent 056185eb24
commit 4071b0cff2
10 changed files with 168 additions and 54 deletions

3
Cargo.lock generated
View File

@@ -3145,13 +3145,16 @@ dependencies = [
name = "table-engine"
version = "0.1.0"
dependencies = [
"async-stream",
"async-trait",
"chrono",
"common-error",
"common-query",
"common-recordbatch",
"common-telemetry",
"datafusion-common",
"datatypes",
"futures",
"snafu",
"storage",
"store-api",

View File

@@ -1,6 +1,9 @@
use datatypes::arrow::error::ArrowError;
use snafu::{Backtrace, Snafu};
// TODO(dennis): use ErrorExt instead.
pub type BoxedError = Box<dyn std::error::Error + Send + Sync>;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
@@ -9,6 +12,9 @@ pub enum Error {
source: ArrowError,
backtrace: Backtrace,
},
#[snafu(display("Storage error: {}, source: {}", msg, source))]
Storage { source: BoxedError, msg: String },
}
pub type Result<T> = std::result::Result<T, Error>;

View File

@@ -5,7 +5,7 @@ use datatypes::prelude::ConcreteDataType;
use table::error::Error as TableError;
use table_engine::error::Error as TableEngineError;
// TODO(boyan): use ErrorExt instead.
// TODO(dennis): use ErrorExt instead.
pub type BoxedError = Box<dyn std::error::Error + Send + Sync>;
/// Business error of datanode.

View File

@@ -79,7 +79,7 @@ impl Instance {
}
pub async fn start(&self) -> Result<()> {
// FIXME(boyan): create a demo table for test
// FIXME(dennis): create a demo table for test
let column_schemas = vec![
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),

View File

@@ -19,7 +19,7 @@ use crate::value::Value;
use crate::vectors::{self, MutableVector, Validity, Vector, VectorRef};
/// Vector for primitive data types.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct PrimitiveVector<T: Primitive> {
array: PrimitiveArray<T>,
}

View File

@@ -4,12 +4,16 @@ version = "0.1.0"
edition = "2021"
[dependencies]
async-stream = "0.3"
async-trait = "0.1"
chrono = { version = "0.4", features = ["serde"] }
common-error = {path = "../common/error" }
common-query = {path = "../common/query" }
common-recordbatch = {path = "../common/recordbatch" }
common-telemetry = {path = "../common/telemetry" }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2"}
datatypes = { path = "../datatypes" }
futures = "0.3"
snafu = { version = "0.7", features = ["backtraces"] }
storage ={ path = "../storage" }
store-api ={ path = "../store-api" }

View File

@@ -73,7 +73,7 @@ impl<Store: StorageEngine> TableEngine for MitoEngine<Store> {
}
}
/// FIXME(boyan) impl system catalog to keep table metadata.
/// FIXME(dennis) impl system catalog to keep table metadata.
struct MitoEngineInner<Store: StorageEngine> {
tables: RwLock<HashMap<String, TableRef>>,
storage_engine: Store,
@@ -100,7 +100,7 @@ impl<Store: StorageEngine> MitoEngineInner<Store> {
_ctx: &EngineContext,
request: CreateTableRequest,
) -> Result<TableRef> {
//FIXME(boyan): we only supports creating a demo table right now
//FIXME(dennis): we only supports creating a demo table right now
//The create table sql is like:
// create table demo(host string,
// ts int64,
@@ -108,7 +108,7 @@ impl<Store: StorageEngine> MitoEngineInner<Store> {
// memory float64,
// PRIMARY KEY(ts, host)) with regions=1;
//TODO(boyan): supports multi regions
//TODO(dennis): supports multi regions
let region_id: RegionId = 0;
let name = store::gen_region_name(region_id);
@@ -183,69 +183,62 @@ impl<Store: StorageEngine> MitoEngineInner<Store> {
#[cfg(test)]
mod tests {
use datatypes::schema::{ColumnSchema, Schema};
use common_recordbatch::util;
use datafusion_common::field_util::FieldExt;
use datafusion_common::field_util::SchemaExt;
use datatypes::vectors::*;
use storage::EngineImpl;
use table::requests::InsertRequest;
use super::*;
use crate::table::test;
#[tokio::test]
async fn test_creat_table_insert() {
let column_schemas = vec![
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true),
ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
];
let table_engine = MitoEngine::<EngineImpl>::new(EngineImpl::new());
let table_name = "demo";
let schema = Arc::new(Schema::new(column_schemas));
let table = table_engine
.create_table(
&EngineContext::default(),
CreateTableRequest {
name: table_name.to_string(),
desc: Some(" a test table".to_string()),
schema: schema.clone(),
},
)
.await
.unwrap();
async fn test_creat_table_insert_scan() {
let (_engine, table, schema) = test::setup_test_engine_and_table().await;
assert_eq!(TableType::Base, table.table_type());
assert_eq!(schema, table.schema());
let insert_req = InsertRequest {
table_name: table_name.to_string(),
table_name: "demo".to_string(),
columns_values: HashMap::default(),
};
assert_eq!(0, table.insert(insert_req).await.unwrap());
let mut columns_values: HashMap<String, VectorRef> = HashMap::with_capacity(4);
columns_values.insert(
"host".to_string(),
Arc::new(StringVector::from(vec!["host1", "host2"])),
);
columns_values.insert(
"cpu".to_string(),
Arc::new(Float64Vector::from_vec(vec![55.5, 66.6])),
);
columns_values.insert(
"memory".to_string(),
Arc::new(Float64Vector::from_vec(vec![1024f64, 4096f64])),
);
columns_values.insert(
"ts".to_string(),
Arc::new(Int64Vector::from_vec(vec![1, 2])),
);
let hosts = StringVector::from(vec!["host1", "host2"]);
let cpus = Float64Vector::from_vec(vec![55.5, 66.6]);
let memories = Float64Vector::from_vec(vec![1024f64, 4096f64]);
let tss = Int64Vector::from_vec(vec![1, 2]);
columns_values.insert("host".to_string(), Arc::new(hosts.clone()));
columns_values.insert("cpu".to_string(), Arc::new(cpus.clone()));
columns_values.insert("memory".to_string(), Arc::new(memories.clone()));
columns_values.insert("ts".to_string(), Arc::new(tss.clone()));
let insert_req = InsertRequest {
table_name: table_name.to_string(),
table_name: "demo".to_string(),
columns_values,
};
assert_eq!(2, table.insert(insert_req).await.unwrap());
let stream = table.scan(&None, &[], None).await.unwrap();
let batches = util::collect(stream).await.unwrap();
assert_eq!(1, batches.len());
assert_eq!(batches[0].df_recordbatch.num_columns(), 4);
let arrow_schema = batches[0].schema.arrow_schema();
assert_eq!(arrow_schema.fields().len(), 4);
assert_eq!(arrow_schema.field(0).name(), "host");
assert_eq!(arrow_schema.field(1).name(), "ts");
assert_eq!(arrow_schema.field(2).name(), "cpu");
assert_eq!(arrow_schema.field(3).name(), "memory");
let columns = batches[0].df_recordbatch.columns();
assert_eq!(4, columns.len());
assert_eq!(hosts.to_arrow_array(), columns[0]);
assert_eq!(tss.to_arrow_array(), columns[1]);
assert_eq!(cpus.to_arrow_array(), columns[2]);
assert_eq!(memories.to_arrow_array(), columns[3]);
}
}

View File

@@ -2,7 +2,7 @@ use std::any::Any;
use common_error::prelude::*;
// TODO(boyan): use ErrorExt instead.
// TODO(dennis): use ErrorExt instead.
pub type BoxedError = Box<dyn std::error::Error + Send + Sync>;
#[derive(Debug, Snafu)]

View File

@@ -1,11 +1,22 @@
#[cfg(test)]
pub mod test;
use std::any::Any;
use std::pin::Pin;
use async_trait::async_trait;
use common_query::logical_plan::Expr;
use common_recordbatch::SendableRecordBatchStream;
use common_recordbatch::error::{Result as RecordBatchResult, StorageSnafu};
use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use futures::task::{Context, Poll};
use futures::Stream;
use snafu::OptionExt;
use snafu::ResultExt;
use store_api::storage::SchemaRef;
use store_api::storage::{PutOperation, Region, WriteContext, WriteRequest};
use store_api::storage::{
ChunkReader, PutOperation, ReadContext, Region, ScanRequest, Snapshot, WriteContext,
WriteRequest,
};
use table::error::{Error as TableError, MissingColumnSnafu, Result as TableResult};
use table::requests::InsertRequest;
use table::{
@@ -37,7 +48,7 @@ impl<R: Region> Table for MitoTable<R> {
let mut write_request = R::WriteRequest::new(self.schema());
//FIXME(boyan): we can only insert to demo table right now
//FIXME(dennis): we can only insert to demo table right now
let mut put_op = <<R as Region>::WriteRequest as WriteRequest>::PutOp::new();
let mut columns_values = request.columns_values;
let key_columns = vec!["ts", "host"];
@@ -83,7 +94,66 @@ impl<R: Region> Table for MitoTable<R> {
_filters: &[Expr],
_limit: Option<usize>,
) -> TableResult<SendableRecordBatchStream> {
unimplemented!();
let read_ctx = ReadContext::default();
let snapshot = self.region.snapshot(&read_ctx).map_err(TableError::new)?;
let mut reader = snapshot
.scan(&read_ctx, ScanRequest::default())
.await
.map_err(TableError::new)?
.reader;
let schema = reader.schema().clone();
let stream_schema = schema.clone();
let stream = Box::pin(async_stream::try_stream! {
for chunk in reader.next_chunk()
.await
.map_err(|e| Box::new(e) as _)
.context(StorageSnafu {
msg: "Fail to reader chunk",
})?
{
let batch = DfRecordBatch::try_new(
stream_schema.arrow_schema().clone(),
chunk.columns
.into_iter()
.map(|v| v.to_arrow_array())
.collect());
let batch = batch
.map_err(|e| Box::new(e) as _)
.context(StorageSnafu {
msg: "Fail to new datafusion record batch",
})?;
yield RecordBatch {
schema: stream_schema.clone(),
df_recordbatch: batch,
}
}
});
Ok(Box::pin(ChunkStream { schema, stream }))
}
}
/// Adapts a pinned stream of record-batch results into a `RecordBatchStream`
/// by carrying the schema alongside the inner stream.
struct ChunkStream {
    // Schema describing the batches yielded by `stream`.
    schema: SchemaRef,
    // Inner stream producing each `RecordBatch` (or an error) on demand.
    stream: Pin<Box<dyn Stream<Item = RecordBatchResult<RecordBatch>> + Send>>,
}
impl RecordBatchStream for ChunkStream {
    /// Returns the schema of the record batches this stream yields.
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}
impl Stream for ChunkStream {
    type Item = RecordBatchResult<RecordBatch>;
    /// Forwards polling to the wrapped inner stream without altering items.
    fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Pin::new(&mut self.stream).poll_next(ctx)
    }
}

View File

@@ -0,0 +1,38 @@
use std::sync::Arc;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::SchemaRef;
use datatypes::schema::{ColumnSchema, Schema};
use storage::EngineImpl;
use table::engine::{EngineContext, TableEngine};
use table::requests::CreateTableRequest;
use table::TableRef;
use crate::engine::MitoEngine;
/// Builds a `MitoEngine` over an in-memory `EngineImpl` and creates a "demo"
/// table with (host, ts, cpu, memory) columns.
///
/// Returns the engine, the created table and its schema so tests can exercise
/// insert/scan paths without repeating this setup.
pub async fn setup_test_engine_and_table() -> (MitoEngine<EngineImpl>, TableRef, SchemaRef) {
    let schema = Arc::new(Schema::new(vec![
        ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
        ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true),
        ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
        ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
    ]));
    let engine = MitoEngine::<EngineImpl>::new(EngineImpl::new());
    let request = CreateTableRequest {
        name: "demo".to_string(),
        desc: Some(" a test table".to_string()),
        schema: schema.clone(),
    };
    // Creation is expected to succeed in tests; panic early otherwise.
    let table = engine
        .create_table(&EngineContext::default(), request)
        .await
        .unwrap();
    (engine, table, schema)
}