fix: Fix MitoTable::scan returning only one batch (#227)

* feat: Add tests for different batch sizes

Add region scan tests with different batch sizes

* fix: Fix table scan returning only one batch

* style: Fix clippy

* test: Add tests that scan a table with more rows than the batch size

* fix: Fix MockChunkReader never stopping
This commit is contained in:
evenyag
2022-09-06 20:36:05 +08:00
committed by GitHub
parent 7f8195861e
commit 0ae99f7ac3
5 changed files with 80 additions and 6 deletions

View File

@@ -4,6 +4,7 @@ mod basic;
mod flush;
mod projection;
use common_telemetry::logging;
use datatypes::prelude::ScalarVector;
use datatypes::type_id::LogicalTypeId;
use datatypes::vectors::Int64Vector;
@@ -35,7 +36,7 @@ pub fn new_metadata(region_name: &str, enable_version_column: bool) -> RegionMet
/// Test helper that bundles a region with the contexts used to drive it.
///
/// NOTE(review): this hunk shows both sides of a one-line change — the
/// private `read_ctx` field (old) and the `pub read_ctx` field (new). Only
/// one of the two declarations exists in the actual source file.
pub struct TesterBase<S: LogStore> {
/// Region under test; `full_scan` takes snapshots from it.
pub region: RegionImpl<S>,
/// Context for writes (presumably handed to the region on `put` — confirm
/// against the part of the impl not shown here).
write_ctx: WriteContext,
/// Context passed to `region.snapshot(..)` when scanning.
read_ctx: ReadContext,
/// Same field, now public so callers can adjust it (e.g. its `batch_size`).
pub read_ctx: ReadContext,
}
impl<S: LogStore> TesterBase<S> {
@@ -61,6 +62,8 @@ impl<S: LogStore> TesterBase<S> {
/// Scan all data.
pub async fn full_scan(&self) -> Vec<(i64, Option<i64>)> {
logging::info!("Full scan with ctx {:?}", self.read_ctx);
let snapshot = self.region.snapshot(&self.read_ctx).unwrap();
let resp = snapshot

View File

@@ -76,6 +76,11 @@ impl Tester {
self.base.as_ref().unwrap()
}
/// Overrides the `batch_size` of the read context held by the base tester.
///
/// Panics if `self.base` cannot be unwrapped (i.e. no base tester is set).
#[inline]
fn set_batch_size(&mut self, batch_size: usize) {
    let base = self.base.as_mut().unwrap();
    base.read_ctx.batch_size = batch_size;
}
/// Forwards `data` to the base tester's `put` and returns its response.
async fn put(&self, data: &[(i64, Option<i64>)]) -> WriteResponse {
    let base = self.base();
    base.put(data).await
}
@@ -162,3 +167,24 @@ async fn test_open_empty() {
let ret = tester.try_reopen().await;
assert!(!ret.unwrap());
}
/// Writes the rows `(0, Some(0))..=(2000, Some(2000))` in chunks of 100 and
/// checks that a full scan returns exactly those rows for each of several
/// read batch sizes.
#[tokio::test]
async fn test_scan_different_batch() {
    let dir = TempDir::new("different-batch").unwrap();
    let store_dir = dir.path().to_str().unwrap();
    let mut tester = Tester::new(REGION_NAME, store_dir).await;

    // Expected contents of the region once every chunk has been written.
    let rows = (0..=2000).map(|v| (v, Some(v))).collect::<Vec<_>>();
    for part in rows.chunks(100) {
        tester.put(part).await;
    }

    // The scan result must not depend on how many rows each batch carries.
    for size in [1, 2, 4, 16, 64, 128, 256, 512] {
        tester.set_batch_size(size);
        let scanned = tester.full_scan().await;
        assert_eq!(rows, scanned);
    }
}

View File

@@ -423,6 +423,7 @@ mod tests {
use datatypes::schema::ColumnSchema;
use datatypes::vectors::*;
use store_api::manifest::Manifest;
use store_api::storage::ReadContext;
use table::requests::{AlterKind, InsertRequest};
use super::*;
@@ -526,6 +527,46 @@ mod tests {
assert_eq!(tss.to_arrow_array(), columns[0]);
}
/// Inserts four times the default read batch size worth of rows into the test
/// table, scans it, and verifies that the `ts` column (index 3) of the
/// returned batches, taken in order, reproduces every inserted timestamp.
#[tokio::test]
async fn test_create_table_scan_batches() {
    common_telemetry::init_default_ut_logging();
    let (_engine, table, _schema, _dir) = test_util::setup_test_engine_and_table().await;

    // TODO(yingwen): Custom batch size once the table supports setting batch_size.
    // Insert more than batch size rows to the table.
    let test_batch_size = ReadContext::default().batch_size * 4;

    // Timestamps are kept around so scanned batches can be compared against them.
    let tss = Int64Vector::from_vec((0..test_batch_size).map(|v| v as i64).collect());

    let mut columns_values: HashMap<String, VectorRef> = HashMap::with_capacity(4);
    columns_values.insert(
        "host".to_string(),
        Arc::new(StringVector::from(vec!["host1"; test_batch_size])),
    );
    columns_values.insert(
        "cpu".to_string(),
        Arc::new(Float64Vector::from_vec(vec![55.5; test_batch_size])),
    );
    columns_values.insert(
        "memory".to_string(),
        Arc::new(Float64Vector::from_vec(vec![1024f64; test_batch_size])),
    );
    columns_values.insert("ts".to_string(), Arc::new(tss.clone()));

    let insert_req = InsertRequest {
        table_name: "demo".to_string(),
        columns_values,
    };
    let inserted = table.insert(insert_req).await.unwrap();
    assert_eq!(test_batch_size, inserted);

    let stream = table.scan(&None, &[], None).await.unwrap();
    let batches = util::collect(stream).await.unwrap();

    // `offset` is the number of rows already verified; each batch must match
    // the slice of `tss` that starts there.
    let mut offset = 0;
    for batch in batches {
        let record = &batch.df_recordbatch;
        assert_eq!(record.num_columns(), 4);

        let ts = record.column(3);
        let num_rows = ts.len();
        let expect = tss.slice(offset, num_rows);
        assert_eq!(expect.to_arrow_array(), *ts);
        offset += num_rows;
    }
    assert_eq!(test_batch_size, offset);
}
#[tokio::test]
async fn test_create_if_not_exists() {
common_telemetry::init_default_ut_logging();

View File

@@ -140,11 +140,7 @@ impl<R: Region> Table for MitoTable<R> {
let stream_schema = schema.clone();
let stream = Box::pin(async_stream::try_stream! {
for chunk in reader.next_chunk()
.await
.map_err(RecordBatchError::new)?
{
while let Some(chunk) = reader.next_chunk().await.map_err(RecordBatchError::new)? {
yield RecordBatch::new(stream_schema.clone(), chunk.columns)?
}
});

View File

@@ -22,6 +22,7 @@ pub type Result<T> = std::result::Result<T, MockError>;
/// Mock chunk reader used by the mock snapshot's scan response.
pub struct MockChunkReader {
// Schema whose column schemas `next_chunk` iterates to build the chunk's columns.
schema: SchemaRef,
// Backing mock memtable (presumably the source of the column data — confirm
// against the part of `next_chunk` not shown in this hunk).
memtable: MockMemtable,
// Set to `true` once `next_chunk` has produced its chunk; later calls then
// return `Ok(None)`, so the reader terminates instead of yielding forever.
read: bool,
}
#[async_trait]
@@ -33,6 +34,10 @@ impl ChunkReader for MockChunkReader {
}
async fn next_chunk(&mut self) -> Result<Option<Chunk>> {
if self.read {
return Ok(None);
}
let columns = self
.schema
.column_schemas()
@@ -47,6 +52,8 @@ impl ChunkReader for MockChunkReader {
builder.finish()
})
.collect::<Vec<VectorRef>>();
self.read = true;
Ok(Some(Chunk::new(columns)))
}
}
@@ -77,6 +84,7 @@ impl Snapshot for MockSnapshot {
let reader = MockChunkReader {
schema: self.schema().clone(),
memtable,
read: false,
};
Ok(ScanResponse { reader })
}