fix: order by optimization (#1748)

* add some debug log

* fix: use lazy parquet reader in MitoTable::scan_to_stream to avoid IO in plan stage

* fix: unit tests

* fix: order-by optimization

* add some tests

* fix: move metric names to metrics.rs

* fix: some cr comments
This commit is contained in:
Lei, HUANG
2023-06-12 11:45:43 +08:00
committed by GitHub
parent 7efcf868d5
commit ddcee052b2
19 changed files with 217 additions and 61 deletions

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{Debug, Display, Formatter};
use crate::timestamp::TimeUnit;
use crate::timestamp_millis::TimestampMillis;
use crate::Timestamp;
@@ -193,6 +195,38 @@ impl<T: PartialOrd> GenericRange<T> {
pub type TimestampRange = GenericRange<Timestamp>;
impl Display for TimestampRange {
    /// Formats the range as `TimestampRange{[<start><unit>,<end><unit>)}`,
    /// using `#` for an unbounded side, e.g. `TimestampRange{[0s,100ms)}`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let s = match (&self.start, &self.end) {
            (Some(start), Some(end)) => {
                format!(
                    "TimestampRange{{[{}{},{}{})}}",
                    start.value(),
                    start.unit().short_name(),
                    end.value(),
                    end.unit().short_name()
                )
            }
            (Some(start), None) => {
                format!(
                    "TimestampRange{{[{}{},#)}}",
                    start.value(),
                    start.unit().short_name()
                )
            }
            (None, Some(end)) => {
                format!(
                    "TimestampRange{{[#,{}{})}}",
                    end.value(),
                    end.unit().short_name()
                )
            }
            // Plain string literal, NOT a `format!` template: braces must not be
            // doubled here. The previous `"TimestampRange{{[#,#)}}"` rendered the
            // escape sequence literally, producing `TimestampRange{{[#,#)}}`
            // instead of matching the other arms' `TimestampRange{...}` output.
            (None, None) => "TimestampRange{[#,#)}".to_string(),
        };
        f.write_str(&s)
    }
}
impl TimestampRange {
/// Create a TimestampRange with optional inclusive end timestamp.
/// If end timestamp is present and is less than start timestamp, this method will return

View File

@@ -336,6 +336,15 @@ impl TimeUnit {
TimeUnit::Nanosecond => 1,
}
}
pub(crate) fn short_name(&self) -> &'static str {
match self {
TimeUnit::Second => "s",
TimeUnit::Millisecond => "ms",
TimeUnit::Microsecond => "us",
TimeUnit::Nanosecond => "ns",
}
}
}
impl PartialOrd for Timestamp {

View File

@@ -41,7 +41,7 @@ impl BenchContext {
batch_size,
..Default::default()
};
let iter = self.memtable.iter(&iter_ctx).unwrap();
let iter = self.memtable.iter(iter_ctx).unwrap();
for batch in iter {
batch.unwrap();
read_count += batch_size;

View File

@@ -220,7 +220,9 @@ impl ChunkReaderBuilder {
.batch_size(self.iter_ctx.batch_size);
for mem in &self.memtables {
let iter = mem.iter(&self.iter_ctx)?;
let mut iter_ctx = self.iter_ctx.clone();
iter_ctx.time_range = Some(*time_range);
let iter = mem.iter(iter_ctx)?;
reader_builder = reader_builder.push_batch_iter(iter);
}

View File

@@ -217,7 +217,7 @@ mod tests {
seq.fetch_add(1, Ordering::Relaxed);
}
let iter = memtable.iter(&IterContext::default()).unwrap();
let iter = memtable.iter(IterContext::default()).unwrap();
let file_path = sst_file_id.as_parquet();
let writer = ParquetWriter::new(&file_path, Source::Iter(iter), object_store.clone());

View File

@@ -143,7 +143,7 @@ mod tests {
&[(Some(1), Some(1)), (Some(2), Some(2))],
);
let iter = memtable.iter(&IterContext::default()).unwrap();
let iter = memtable.iter(IterContext::default()).unwrap();
let sst_path = "table1";
let layer = Arc::new(FsAccessLayer::new(sst_path, os.clone()));
let sst_info = layer

View File

@@ -267,7 +267,7 @@ impl<S: LogStore> FlushJob<S> {
let file_id = FileId::random();
// TODO(hl): Check if random file name already exists in meta.
let iter = m.iter(&iter_ctx)?;
let iter = m.iter(iter_ctx.clone())?;
let sst_layer = self.sst_layer.clone();
let write_options = WriteOptions {
sst_write_buffer_size: self.engine_config.sst_write_buffer_size,

View File

@@ -73,7 +73,7 @@ pub trait Memtable: Send + Sync + fmt::Debug {
fn write(&self, kvs: &KeyValues) -> Result<()>;
/// Iterates the memtable.
fn iter(&self, ctx: &IterContext) -> Result<BoxedBatchIterator>;
fn iter(&self, ctx: IterContext) -> Result<BoxedBatchIterator>;
/// Returns the estimated bytes allocated by this memtable from heap. Result
/// of this method may be larger than the estimated based on [`num_rows`] because

View File

@@ -145,10 +145,10 @@ impl Memtable for BTreeMemtable {
Ok(())
}
fn iter(&self, ctx: &IterContext) -> Result<BoxedBatchIterator> {
fn iter(&self, ctx: IterContext) -> Result<BoxedBatchIterator> {
assert!(ctx.batch_size > 0);
let iter = BTreeIterator::new(ctx.clone(), self.schema.clone(), self.map.clone())?;
let iter = BTreeIterator::new(ctx, self.schema.clone(), self.map.clone())?;
Ok(Box::new(iter))
}

View File

@@ -179,7 +179,7 @@ mod tests {
max_ts: i64,
min_ts: i64,
) {
let iter = mem.iter(&IterContext::default()).unwrap();
let iter = mem.iter(IterContext::default()).unwrap();
assert_eq!(min_ts, mem.stats().min_timestamp.value());
assert_eq!(max_ts, mem.stats().max_timestamp.value());

View File

@@ -196,7 +196,7 @@ struct TestContext {
fn write_iter_memtable_case(ctx: &TestContext) {
// Test iterating an empty memtable.
let mut iter = ctx.memtable.iter(&IterContext::default()).unwrap();
let mut iter = ctx.memtable.iter(IterContext::default()).unwrap();
assert!(iter.next().is_none());
// Poll the empty iterator again.
assert!(iter.next().is_none());
@@ -234,7 +234,7 @@ fn write_iter_memtable_case(ctx: &TestContext) {
batch_size,
..Default::default()
};
let mut iter = ctx.memtable.iter(&iter_ctx).unwrap();
let mut iter = ctx.memtable.iter(iter_ctx.clone()).unwrap();
assert_eq!(
ctx.schema.user_schema(),
iter.schema().projected_user_schema()
@@ -322,7 +322,7 @@ fn test_iter_batch_size() {
..Default::default()
};
let mut iter = ctx.memtable.iter(&iter_ctx).unwrap();
let mut iter = ctx.memtable.iter(iter_ctx.clone()).unwrap();
check_iter_batch_size(&mut *iter, total, batch_size);
}
});
@@ -355,7 +355,7 @@ fn test_duplicate_key_across_batch() {
..Default::default()
};
let mut iter = ctx.memtable.iter(&iter_ctx).unwrap();
let mut iter = ctx.memtable.iter(iter_ctx.clone()).unwrap();
check_iter_content(
&mut *iter,
&[1000, 1001, 2000, 2001], // keys
@@ -391,7 +391,7 @@ fn test_duplicate_key_in_batch() {
..Default::default()
};
let mut iter = ctx.memtable.iter(&iter_ctx).unwrap();
let mut iter = ctx.memtable.iter(iter_ctx.clone()).unwrap();
check_iter_content(
&mut *iter,
&[1000, 1001, 2001], // keys
@@ -440,7 +440,7 @@ fn test_sequence_visibility() {
time_range: None,
};
let mut iter = ctx.memtable.iter(&iter_ctx).unwrap();
let mut iter = ctx.memtable.iter(iter_ctx).unwrap();
check_iter_content(
&mut *iter,
&[], // keys
@@ -459,7 +459,7 @@ fn test_sequence_visibility() {
time_range: None,
};
let mut iter = ctx.memtable.iter(&iter_ctx).unwrap();
let mut iter = ctx.memtable.iter(iter_ctx).unwrap();
check_iter_content(
&mut *iter,
&[1000], // keys
@@ -478,7 +478,7 @@ fn test_sequence_visibility() {
time_range: None,
};
let mut iter = ctx.memtable.iter(&iter_ctx).unwrap();
let mut iter = ctx.memtable.iter(iter_ctx).unwrap();
check_iter_content(
&mut *iter,
&[1000], // keys
@@ -507,7 +507,7 @@ fn test_iter_after_none() {
..Default::default()
};
let mut iter = ctx.memtable.iter(&iter_ctx).unwrap();
let mut iter = ctx.memtable.iter(iter_ctx).unwrap();
assert!(iter.next().is_some());
assert!(iter.next().is_none());
assert!(iter.next().is_none());
@@ -538,7 +538,7 @@ fn test_filter_memtable() {
..Default::default()
};
let mut iter = ctx.memtable.iter(&iter_ctx).unwrap();
let mut iter = ctx.memtable.iter(iter_ctx).unwrap();
let batch = iter.next().unwrap().unwrap();
assert_eq!(5, batch.columns.len());
assert_eq!(
@@ -574,7 +574,7 @@ fn test_memtable_projection() {
..Default::default()
};
let mut iter = ctx.memtable.iter(&iter_ctx).unwrap();
let mut iter = ctx.memtable.iter(iter_ctx).unwrap();
let batch = iter.next().unwrap().unwrap();
assert!(iter.next().is_none());

View File

@@ -38,3 +38,7 @@ pub const WRITE_BUFFER_BYTES: &str = "storage.write_buffer_bytes";
pub const MEMTABLE_WRITE_ELAPSED: &str = "storage.memtable.write.elapsed";
/// Elapsed time of preprocessing write batch.
pub const PREPROCESS_ELAPSED: &str = "storage.write.preprocess.elapsed";
/// Elapsed time for windowed scan
pub const WINDOW_SCAN_ELAPSED: &str = "query.scan.window_scan.elapsed";
/// Rows per window during window scan
pub const WINDOW_SCAN_ROWS_PER_WINDOW: &str = "query.scan.window_scan.window_row_size";

View File

@@ -16,8 +16,10 @@ use arrow::compute::SortOptions;
use arrow::row::{RowConverter, SortField};
use arrow_array::{Array, ArrayRef};
use common_recordbatch::OrderOption;
use common_telemetry::timer;
use datatypes::data_type::DataType;
use datatypes::vectors::Helper;
use metrics::histogram;
use snafu::ResultExt;
use crate::error::{self, Result};
@@ -60,6 +62,7 @@ where
R: BatchReader,
{
async fn next_batch(&mut self) -> Result<Option<Batch>> {
let _window_scan_elapsed = timer!(crate::metrics::WINDOW_SCAN_ELAPSED);
let Some(mut reader) = self.readers.pop() else { return Ok(None); };
let store_schema = self.schema.schema_to_read();
@@ -75,7 +78,12 @@ where
}
let Some(num_columns) = batches.get(0).map(|b| b.len()) else {
return Ok(Some(Batch::new(vec![])));
// the reader does not yield data, a batch of empty vectors must be returned instead of
// an empty batch without any column.
let empty_columns = store_schema.columns().iter().map(|s| {
s.desc.data_type.create_mutable_vector(0).to_vector()
}).collect();
return Ok(Some(Batch::new(empty_columns)));
};
let mut vectors_in_batch = Vec::with_capacity(num_columns);
@@ -85,6 +93,9 @@ where
vectors_in_batch
.push(arrow::compute::concat(&columns).context(error::ConvertColumnsToRowsSnafu)?);
}
if let Some(v) = vectors_in_batch.get(0) {
histogram!(crate::metrics::WINDOW_SCAN_ROWS_PER_WINDOW, v.len() as f64);
}
let sorted = sort_by_rows(&self.schema, vectors_in_batch, &self.order_options)?;
let vectors = sorted
.iter()

View File

@@ -160,6 +160,24 @@ impl<S: LogStore> TesterBase<S> {
dst
}
/// Scans the region with the given `ScanRequest` and collects every row as a
/// `(timestamp, value)` pair, asserting that the reader's user schema matches
/// the region's in-memory metadata schema along the way.
pub async fn scan(&self, req: ScanRequest) -> Vec<(i64, Option<String>)> {
logging::info!("Full scan with ctx {:?}", self.read_ctx);
// Snapshot first so the scan observes a consistent view of the region.
let snapshot = self.region.snapshot(&self.read_ctx).unwrap();
let resp = snapshot.scan(&self.read_ctx, req).await.unwrap();
let mut reader = resp.reader;
let metadata = self.region.in_memory_metadata();
// Sanity check: the reader must expose the same schema as the region metadata.
assert_eq!(metadata.schema(), reader.user_schema());
let mut dst = Vec::new();
// Drain all chunks, projecting each one before appending its rows to `dst`.
while let Some(chunk) = reader.next_chunk().await.unwrap() {
let chunk = reader.project_chunk(chunk);
append_chunk_to(&chunk, &mut dst);
}
dst
}
pub fn committed_sequence(&self) -> SequenceNumber {
self.region.committed_sequence()
}

View File

@@ -17,9 +17,15 @@
use std::sync::Arc;
use std::time::Duration;
use arrow::compute::SortOptions;
use common_query::prelude::Expr;
use common_recordbatch::OrderOption;
use common_test_util::temp_dir::create_temp_dir;
use datafusion_common::Column;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use store_api::storage::{FlushContext, FlushReason, OpenOptions, Region, WriteResponse};
use store_api::storage::{
FlushContext, FlushReason, OpenOptions, Region, ScanRequest, WriteResponse,
};
use crate::engine::{self, RegionMap};
use crate::flush::{FlushStrategyRef, FlushType};
@@ -105,6 +111,10 @@ impl FlushTester {
self.base().full_scan().await
}
async fn scan(&self, req: ScanRequest) -> Vec<(i64, Option<String>)> {
self.base().scan(req).await
}
async fn flush(&self, wait: Option<bool>) {
let ctx = wait
.map(|wait| FlushContext {
@@ -347,3 +357,50 @@ async fn test_schedule_engine_flush() {
let sst_dir = format!("{}/{}", store_dir, engine::region_sst_dir("", REGION_NAME));
assert!(has_parquet_file(&sst_dir));
}
#[tokio::test]
async fn test_flush_and_query_empty() {
// Regression test for the order-by (windowed scan) optimization: build two
// flushed SSTs, then scan with a time filter, a descending output ordering
// and a limit. Only completion is checked (the result is discarded below),
// i.e. the scan must not fail when some scan windows contain no rows.
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("flush_and_query_empty_range");
let store_dir = dir.path().to_str().unwrap();
let flush_switch = Arc::new(FlushSwitch::default());
let tester = FlushTester::new(store_dir, flush_switch.clone()).await;
// First batch: timestamps 20000..30000, flushed into one SST.
tester
.put(
&(20000..30000)
.map(|v| (v as i64, Some(v as i64)))
.collect::<Vec<_>>(),
)
.await;
tester.flush(Some(true)).await;
// Second batch overlaps a small sub-range (20100..20200) and is flushed
// into a second SST, so the two files have overlapping time ranges.
tester
.put(
&(20100..20200)
.map(|v| (v as i64, Some(v as i64)))
.collect::<Vec<_>>(),
)
.await;
tester.flush(Some(true)).await;
use datafusion_expr::Expr as DfExpr;
// Scan request: filter `timestamp >= 20000`, order by timestamp descending
// (nulls first), and keep only one row.
let req = ScanRequest {
sequence: None,
projection: None,
filters: vec![Expr::from(datafusion_expr::binary_expr(
DfExpr::Column(Column::from("timestamp")),
datafusion_expr::Operator::GtEq,
datafusion_expr::lit(20000),
))],
output_ordering: Some(vec![OrderOption {
name: "timestamp".to_string(),
options: SortOptions {
descending: true,
nulls_first: true,
},
}]),
limit: Some(1),
};
// Result intentionally ignored: the assertion is that scan() completes.
let _ = tester.scan(req).await;
}

View File

@@ -41,10 +41,10 @@ use crate::error;
use crate::error::{DeleteSstSnafu, Result};
use crate::file_purger::{FilePurgeRequest, FilePurgerRef};
use crate::memtable::BoxedBatchIterator;
use crate::read::{Batch, BoxedBatchReader};
use crate::read::{Batch, BatchReader, BoxedBatchReader};
use crate::scheduler::Scheduler;
use crate::schema::ProjectedSchemaRef;
use crate::sst::parquet::{ParquetReader, ParquetWriter};
use crate::sst::parquet::{ChunkStream, ParquetReader, ParquetWriter};
/// Maximum level of SSTs.
pub const MAX_LEVEL: u8 = 2;
@@ -574,8 +574,7 @@ impl AccessLayer for FsAccessLayer {
opts.time_range,
);
let stream = reader.chunk_stream().await?;
Ok(Box::new(stream))
Ok(Box::new(LazyParquetBatchReader::new(reader)))
}
/// Deletes a SST file with given file id.
@@ -588,6 +587,34 @@ impl AccessLayer for FsAccessLayer {
}
}
/// A `BatchReader` that defers opening the underlying parquet chunk stream
/// until the first `next_batch` call, so constructing the reader (e.g. during
/// the query planning stage) performs no IO.
struct LazyParquetBatchReader {
/// Reader used to lazily create the chunk stream on first read.
inner: ParquetReader,
/// `None` until the first `next_batch` call creates the stream.
stream: Option<ChunkStream>,
}
impl LazyParquetBatchReader {
/// Wraps `inner` without opening its chunk stream yet.
fn new(inner: ParquetReader) -> Self {
Self {
inner,
stream: None,
}
}
}
#[async_trait]
impl BatchReader for LazyParquetBatchReader {
async fn next_batch(&mut self) -> Result<Option<Batch>> {
if let Some(s) = &mut self.stream {
// Stream already opened by a previous call: just pull the next batch.
s.next_batch().await
} else {
// First call: open the stream (this is where the IO happens), read one
// batch, then cache the stream for subsequent calls.
let mut stream = self.inner.chunk_stream().await?;
let res = stream.next_batch().await;
self.stream = Some(stream);
res
}
}
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;

View File

@@ -579,7 +579,7 @@ mod tests {
let object_store = create_object_store(path);
let sst_file_name = "test-flush.parquet";
let iter = memtable.iter(&IterContext::default()).unwrap();
let iter = memtable.iter(IterContext::default()).unwrap();
let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone());
writer
@@ -663,7 +663,7 @@ mod tests {
let object_store = create_object_store(path);
let sst_file_handle = new_file_handle(FileId::random());
let sst_file_name = sst_file_handle.file_name();
let iter = memtable.iter(&IterContext::default()).unwrap();
let iter = memtable.iter(IterContext::default()).unwrap();
let writer = ParquetWriter::new(&sst_file_name, Source::Iter(iter), object_store.clone());
let SstInfo {
@@ -749,7 +749,7 @@ mod tests {
let object_store = create_object_store(path);
let file_handle = new_file_handle(FileId::random());
let sst_file_name = file_handle.file_name();
let iter = memtable.iter(&IterContext::default()).unwrap();
let iter = memtable.iter(IterContext::default()).unwrap();
let writer = ParquetWriter::new(&sst_file_name, Source::Iter(iter), object_store.clone());
let SstInfo {
@@ -855,7 +855,7 @@ mod tests {
let object_store = create_object_store(path);
let sst_file_handle = new_file_handle(FileId::random());
let sst_file_name = sst_file_handle.file_name();
let iter = memtable.iter(&IterContext::default()).unwrap();
let iter = memtable.iter(IterContext::default()).unwrap();
let writer = ParquetWriter::new(&sst_file_name, Source::Iter(iter), object_store.clone());
let SstInfo {
@@ -950,7 +950,7 @@ mod tests {
builder.root(path);
let object_store = ObjectStore::new(builder).unwrap().finish();
let sst_file_name = "test-read.parquet";
let iter = memtable.iter(&IterContext::default()).unwrap();
let iter = memtable.iter(IterContext::default()).unwrap();
let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone());
let sst_info_opt = writer

View File

@@ -23,10 +23,11 @@ use crate::memtable::MemtableStats;
use crate::sst::FileMeta;
/// A set of predefined time windows.
const TIME_WINDOW_SIZE: [i64; 9] = [
const TIME_WINDOW_SIZE: [i64; 10] = [
1, // 1 second
60, // 1 minute
60 * 10, // 10 minute
60 * 30, // 30 minute
60 * 10, // 10 minutes
60 * 30, // 30 minutes
60 * 60, // 1 hour
2 * 60 * 60, // 2 hours
6 * 60 * 60, // 6 hours
@@ -160,11 +161,12 @@ mod tests {
#[test]
fn test_get_time_window_size() {
assert_eq!(60, min_duration_to_window_size(0));
assert_eq!(1, min_duration_to_window_size(0));
for window in TIME_WINDOW_SIZE {
assert_eq!(window, min_duration_to_window_size(window));
}
assert_eq!(60, min_duration_to_window_size(1));
assert_eq!(1, min_duration_to_window_size(1));
assert_eq!(60, min_duration_to_window_size(60));
assert_eq!(60 * 10, min_duration_to_window_size(100));
assert_eq!(60 * 30, min_duration_to_window_size(1800));
assert_eq!(60 * 60, min_duration_to_window_size(3000));
@@ -244,7 +246,7 @@ mod tests {
true,
);
assert_eq!(
vec![TimestampRange::with_unit(0, 60, TimeUnit::Second).unwrap()],
vec![TimestampRange::with_unit(0, 60, TimeUnit::Second).unwrap(),],
res
);
@@ -295,14 +297,14 @@ mod tests {
}],
true,
);
assert_eq!(
vec![
TimestampRange::with_unit(0, 60, TimeUnit::Second).unwrap(),
TimestampRange::with_unit(60, 120, TimeUnit::Second).unwrap(),
TimestampRange::with_unit(60 * 60, 61 * 60, TimeUnit::Second).unwrap(),
],
res
);
let mut expect = (0..=61)
.map(|s| TimestampRange::with_unit(s, s + 1, TimeUnit::Second).unwrap())
.collect::<Vec<_>>();
expect.push(TimestampRange::with_unit(60 * 60, 60 * 60 + 1, TimeUnit::Second).unwrap());
expect.push(TimestampRange::with_unit(60 * 60 + 1, 60 * 60 + 2, TimeUnit::Second).unwrap());
assert_eq!(expect, res);
let res = window_inference.infer_window(
&[

View File

@@ -63,16 +63,13 @@ async fn test_create_database_and_insert_query(instance: Arc<dyn MockInstance>)
.await;
assert!(matches!(output, Output::AffectedRows(2)));
let query_output = execute_sql(&instance, "select ts from test.demo order by ts").await;
let query_output = execute_sql(&instance, "select ts from test.demo order by ts limit 1").await;
match query_output {
Output::Stream(s) => {
let batches = util::collect(s).await.unwrap();
assert_eq!(1, batches[0].num_columns());
assert_eq!(
Arc::new(Int64Vector::from_vec(vec![
1655276557000_i64,
1655276558000_i64
])) as VectorRef,
Arc::new(Int64Vector::from_vec(vec![1655276557000_i64])) as VectorRef,
*batches[0].column(0)
);
}
@@ -193,7 +190,7 @@ async fn test_issue477_same_table_name_in_different_databases(instance: Arc<dyn
// Query data and assert
assert_query_result(
&instance,
"select host,ts from a.demo order by ts",
"select host,ts from a.demo order by ts limit 1",
1655276557000,
"host1",
)
@@ -329,32 +326,27 @@ async fn test_execute_insert_query_with_i64_timestamp(instance: Arc<dyn MockInst
.await;
assert!(matches!(output, Output::AffectedRows(2)));
let query_output = execute_sql(&instance, "select ts from demo order by ts").await;
let query_output = execute_sql(&instance, "select ts from demo order by ts limit 1").await;
match query_output {
Output::Stream(s) => {
let batches = util::collect(s).await.unwrap();
assert_eq!(1, batches[0].num_columns());
assert_eq!(
Arc::new(Int64Vector::from_vec(vec![
1655276557000_i64,
1655276558000_i64
])) as VectorRef,
Arc::new(Int64Vector::from_vec(vec![1655276557000_i64,])) as VectorRef,
*batches[0].column(0)
);
}
_ => unreachable!(),
}
let query_output = execute_sql(&instance, "select ts as time from demo order by ts").await;
let query_output =
execute_sql(&instance, "select ts as time from demo order by ts limit 1").await;
match query_output {
Output::Stream(s) => {
let batches = util::collect(s).await.unwrap();
assert_eq!(1, batches[0].num_columns());
assert_eq!(
Arc::new(Int64Vector::from_vec(vec![
1655276557000_i64,
1655276558000_i64
])) as VectorRef,
Arc::new(Int64Vector::from_vec(vec![1655276557000_i64,])) as VectorRef,
*batches[0].column(0)
);
}