diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs index d0c6131c3a..7f6350b6d8 100644 --- a/src/storage/src/region/tests.rs +++ b/src/storage/src/region/tests.rs @@ -4,6 +4,7 @@ mod basic; mod flush; mod projection; +use common_telemetry::logging; use datatypes::prelude::ScalarVector; use datatypes::type_id::LogicalTypeId; use datatypes::vectors::Int64Vector; @@ -35,7 +36,7 @@ pub fn new_metadata(region_name: &str, enable_version_column: bool) -> RegionMet pub struct TesterBase { pub region: RegionImpl, write_ctx: WriteContext, - read_ctx: ReadContext, + pub read_ctx: ReadContext, } impl TesterBase { @@ -61,6 +62,8 @@ impl TesterBase { /// Scan all data. pub async fn full_scan(&self) -> Vec<(i64, Option)> { + logging::info!("Full scan with ctx {:?}", self.read_ctx); + let snapshot = self.region.snapshot(&self.read_ctx).unwrap(); let resp = snapshot diff --git a/src/storage/src/region/tests/basic.rs b/src/storage/src/region/tests/basic.rs index c74e99b16d..3265209f8c 100644 --- a/src/storage/src/region/tests/basic.rs +++ b/src/storage/src/region/tests/basic.rs @@ -76,6 +76,11 @@ impl Tester { self.base.as_ref().unwrap() } + #[inline] + fn set_batch_size(&mut self, batch_size: usize) { + self.base.as_mut().unwrap().read_ctx.batch_size = batch_size; + } + async fn put(&self, data: &[(i64, Option)]) -> WriteResponse { self.base().put(data).await } @@ -162,3 +167,24 @@ async fn test_open_empty() { let ret = tester.try_reopen().await; assert!(!ret.unwrap()); } + +#[tokio::test] +async fn test_scan_different_batch() { + let dir = TempDir::new("different-batch").unwrap(); + let store_dir = dir.path().to_str().unwrap(); + let mut tester = Tester::new(REGION_NAME, store_dir).await; + + let data: Vec<_> = (0..=2000).map(|i| (i, Some(i))).collect(); + + for chunk in data.chunks(100) { + tester.put(chunk).await; + } + + let batch_sizes = [1, 2, 4, 16, 64, 128, 256, 512]; + for batch_size in batch_sizes { + tester.set_batch_size(batch_size); + + let output = tester.full_scan().await; + assert_eq!(data, output); + } +} diff --git a/src/table-engine/src/engine.rs b/src/table-engine/src/engine.rs index 42372300ad..9342cf85b0 100644 --- a/src/table-engine/src/engine.rs +++ b/src/table-engine/src/engine.rs @@ -423,6 +423,7 @@ mod tests { use datatypes::schema::ColumnSchema; use datatypes::vectors::*; use store_api::manifest::Manifest; + use store_api::storage::ReadContext; use table::requests::{AlterKind, InsertRequest}; use super::*; @@ -526,6 +527,46 @@ mod tests { assert_eq!(tss.to_arrow_array(), columns[0]); } + #[tokio::test] + async fn test_create_table_scan_batches() { + common_telemetry::init_default_ut_logging(); + + let (_engine, table, _schema, _dir) = test_util::setup_test_engine_and_table().await; + + // TODO(yingwen): Custom batch size once the table support setting batch_size. + let default_batch_size = ReadContext::default().batch_size; + // Insert more than batch size rows to the table. + let test_batch_size = default_batch_size * 4; + let mut columns_values: HashMap = HashMap::with_capacity(4); + let hosts = StringVector::from(vec!["host1"; test_batch_size]); + let cpus = Float64Vector::from_vec(vec![55.5; test_batch_size]); + let memories = Float64Vector::from_vec(vec![1024f64; test_batch_size]); + let tss = Int64Vector::from_vec((0..test_batch_size).map(|v| v as i64).collect()); + + columns_values.insert("host".to_string(), Arc::new(hosts)); + columns_values.insert("cpu".to_string(), Arc::new(cpus)); + columns_values.insert("memory".to_string(), Arc::new(memories)); + columns_values.insert("ts".to_string(), Arc::new(tss.clone())); + + let insert_req = InsertRequest { + table_name: "demo".to_string(), + columns_values, + }; + assert_eq!(test_batch_size, table.insert(insert_req).await.unwrap()); + + let stream = table.scan(&None, &[], None).await.unwrap(); + let batches = util::collect(stream).await.unwrap(); + let mut total = 0; + for batch in batches { + assert_eq!(batch.df_recordbatch.num_columns(), 4); + let ts = batch.df_recordbatch.column(3); + let expect = tss.slice(total, ts.len()); + assert_eq!(expect.to_arrow_array(), *ts); + total += ts.len(); + } + assert_eq!(test_batch_size, total); + } + #[tokio::test] async fn test_create_if_not_exists() { common_telemetry::init_default_ut_logging(); diff --git a/src/table-engine/src/table.rs b/src/table-engine/src/table.rs index d7257e8688..5712d011dc 100644 --- a/src/table-engine/src/table.rs +++ b/src/table-engine/src/table.rs @@ -140,11 +140,7 @@ impl Table for MitoTable { let stream_schema = schema.clone(); let stream = Box::pin(async_stream::try_stream! { - - for chunk in reader.next_chunk() - .await - .map_err(RecordBatchError::new)? - { + while let Some(chunk) = reader.next_chunk().await.map_err(RecordBatchError::new)? { yield RecordBatch::new(stream_schema.clone(), chunk.columns)? } }); diff --git a/src/table-engine/src/table/test_util/mock_engine.rs b/src/table-engine/src/table/test_util/mock_engine.rs index b07ecf3eb0..d46c63f449 100644 --- a/src/table-engine/src/table/test_util/mock_engine.rs +++ b/src/table-engine/src/table/test_util/mock_engine.rs @@ -22,6 +22,7 @@ pub type Result = std::result::Result; pub struct MockChunkReader { schema: SchemaRef, memtable: MockMemtable, + read: bool, } #[async_trait] @@ -33,6 +34,10 @@ impl ChunkReader for MockChunkReader { } async fn next_chunk(&mut self) -> Result> { + if self.read { + return Ok(None); + } + let columns = self .schema .column_schemas() @@ -47,6 +52,8 @@ impl ChunkReader for MockChunkReader { builder.finish() }) .collect::>(); + self.read = true; + Ok(Some(Chunk::new(columns))) } } @@ -77,6 +84,7 @@ impl Snapshot for MockSnapshot { let reader = MockChunkReader { schema: self.schema().clone(), memtable, + read: false, }; Ok(ScanResponse { reader }) }