From 370ec04a9d95f71e7dd4dd7d833bd6373b6e7db8 Mon Sep 17 00:00:00 2001 From: WU Jingdi Date: Wed, 13 Dec 2023 17:53:35 +0800 Subject: [PATCH] fix: use linear interpolation to implement range LINEAR fill strategy (#2903) * fix: use linear interpolation to implement range LINEAR fill strategy * chore: update test case * chore: optimize linear interpolation implementation * chore: update test and add comment --- src/query/src/range_select/plan.rs | 228 +++++++++++++++++++++-------- 1 file changed, 167 insertions(+), 61 deletions(-) diff --git a/src/query/src/range_select/plan.rs b/src/query/src/range_select/plan.rs index bedb3242a9..b687b6d93c 100644 --- a/src/query/src/range_select/plan.rs +++ b/src/query/src/range_select/plan.rs @@ -105,9 +105,12 @@ impl Fill { } /// The input `data` contains data on a complete time series. - /// If the filling strategy is `PREV` or `LINEAR`, caller must be ensured that the incoming `data` is ascending time order. - pub fn apply_fill_strategy(&self, data: &mut [ScalarValue]) -> DfResult<()> { + /// If the filling strategy is `PREV` or `LINEAR`, the caller must ensure that the incoming `ts` and `data` are in ascending time order. 
+ pub fn apply_fill_strategy(&self, ts: &[i64], data: &mut [ScalarValue]) -> DfResult<()> { let len = data.len(); + if *self == Fill::Linear { + return Self::fill_linear(ts, data); + } for i in 0..len { if data[i].is_null() { match self { @@ -117,32 +120,101 @@ impl Fill { data[i] = data[i - 1].clone() } } - Fill::Linear => { - if 0 < i && i < len - 1 { - match (&data[i - 1], &data[i + 1]) { - (ScalarValue::Float64(Some(a)), ScalarValue::Float64(Some(b))) => { - data[i] = ScalarValue::Float64(Some((a + b) / 2.0)); - } - (ScalarValue::Float32(Some(a)), ScalarValue::Float32(Some(b))) => { - data[i] = ScalarValue::Float32(Some((a + b) / 2.0)); - } - (a, b) => { - if !a.is_null() && !b.is_null() { - return Err(DataFusionError::Execution( - "RangePlan: Apply Fill LINEAR strategy on Non-floating type".to_string())); - } else { - continue; - } - } - } - } - } + // The calculation of linear interpolation is relatively complicated. + // `Self::fill_linear` is used to handle `Fill::Linear`. + Fill::Linear => unreachable!(), Fill::Const(v) => data[i] = v.clone(), } } } Ok(()) } + + fn fill_linear(ts: &[i64], data: &mut [ScalarValue]) -> DfResult<()> { + let not_null_num = data + .iter() + .fold(0, |acc, x| if x.is_null() { acc } else { acc + 1 }); + // We need at least two non-null data points to perform linear interpolation + if not_null_num < 2 { + return Ok(()); + } + let mut index = 0; + let mut head: Option<usize> = None; + let mut tail: Option<usize> = None; + while index < data.len() { + // find the next null interval [start, end): + // data[start] is null, data[end] (when in bounds) is not null + let start = data[index..] + .iter() + .position(ScalarValue::is_null) + .unwrap_or(data.len() - index) + + index; + if start == data.len() { + break; + } + let end = data[start..] 
+ .iter() + .position(|r| !r.is_null()) + .unwrap_or(data.len() - start) + + start; + index = end + 1; + // head and tail null intervals are handled later; record their start/end first + if start == 0 { + head = Some(end); + } else if end == data.len() { + tail = Some(start); + } else { + linear_interpolation(ts, data, start - 1, end, start, end)?; + } + } + // handle the head null interval using the two points that follow it + if let Some(end) = head { + linear_interpolation(ts, data, end, end + 1, 0, end)?; + } + // handle the tail null interval using the two points that precede it + if let Some(start) = tail { + linear_interpolation(ts, data, start - 2, start - 1, start, data.len())?; + } + Ok(()) + } +} + +/// Uses `(ts[i1], data[i1])` and `(ts[i2], data[i2])` as endpoints and linearly interpolates the elements over the interval `[start, end)`. +fn linear_interpolation( + ts: &[i64], + data: &mut [ScalarValue], + i1: usize, + i2: usize, + start: usize, + end: usize, +) -> DfResult<()> { + let (x0, x1) = (ts[i1] as f64, ts[i2] as f64); + let (y0, y1, is_float32) = match (&data[i1], &data[i2]) { + (ScalarValue::Float64(Some(y0)), ScalarValue::Float64(Some(y1))) => (*y0, *y1, false), + (ScalarValue::Float32(Some(y0)), ScalarValue::Float32(Some(y1))) => { + (*y0 as f64, *y1 as f64, true) + } + _ => { + return Err(DataFusionError::Execution( + "RangePlan: Apply Fill LINEAR strategy on Non-floating type".to_string(), + )); + } + }; + // Defensive check: identical x coordinates would cause a division by zero below + if x1 == x0 { + return Err(DataFusionError::Execution( + "RangePlan: Linear interpolation using the same coordinate points".to_string(), + )); + } + for i in start..end { + let val = y0 + (y1 - y0) / (x1 - x0) * (ts[i] as f64 - x0); + data[i] = if is_float32 { + ScalarValue::Float32(Some(val as f32)) + } else { + ScalarValue::Float64(Some(val)) + } + } + Ok(()) } #[derive(Eq, Clone, Debug)] @@ -859,25 +931,16 @@ impl RangeSelectStream { } in self.series_map.values() { // collect data on time series - if !need_sort_output { - for (ts, accumulators) in align_ts_accumulator { - for (i, 
accumulator) in accumulators.iter().enumerate() { - all_scalar[i].push(accumulator.evaluate()?); - } - ts_builder.append_value(*ts); - } - } else { - let mut keys = align_ts_accumulator.keys().copied().collect::<Vec<_>>(); - keys.sort(); - for key in &keys { - for (i, accumulator) in - align_ts_accumulator.get(key).unwrap().iter().enumerate() - { - all_scalar[i].push(accumulator.evaluate()?); - } - } - ts_builder.append_slice(&keys); + let mut align_ts = align_ts_accumulator.keys().copied().collect::<Vec<_>>(); + if need_sort_output { + align_ts.sort(); } + for ts in &align_ts { + for (i, accumulator) in align_ts_accumulator.get(ts).unwrap().iter().enumerate() { + all_scalar[i].push(accumulator.evaluate()?); + } + } + ts_builder.append_slice(&align_ts); // apply fill strategy on time series for ( i, @@ -891,7 +954,7 @@ impl RangeSelectStream { if let Some(data_type) = need_cast { cast_scalar_values(time_series_data, data_type)?; } - fill.apply_fill_strategy(time_series_data)?; + fill.apply_fill_strategy(&align_ts, time_series_data)?; } by_rows.resize(by_rows.len() + align_ts_accumulator.len(), row.row()); start_index += align_ts_accumulator.len(); @@ -1220,13 +1283,13 @@ mod test { \n| 1.0 | 1.0 | 1970-01-01T00:00:10 | host1 |\ \n| 1.0 | 1.5 | 1970-01-01T00:00:15 | host1 |\ \n| 2.0 | 2.0 | 1970-01-01T00:00:20 | host1 |\ - \n| 2.0 | | 1970-01-01T00:00:25 | host1 |\ + \n| 2.0 | 2.5 | 1970-01-01T00:00:25 | host1 |\ \n| 3.0 | 3.0 | 1970-01-01T00:00:00 | host2 |\ \n| 3.0 | 3.5 | 1970-01-01T00:00:05 | host2 |\ \n| 4.0 | 4.0 | 1970-01-01T00:00:10 | host2 |\ \n| 4.0 | 4.5 | 1970-01-01T00:00:15 | host2 |\ \n| 5.0 | 5.0 | 1970-01-01T00:00:20 | host2 |\ - \n| 5.0 | | 1970-01-01T00:00:25 | host2 |\ + \n| 5.0 | 5.5 | 1970-01-01T00:00:25 | host2 |\ \n+------------+------------+---------------------+-------+", ); do_range_select_test(10_000, 5_000, 5_000, Fill::Linear, true, expected).await; @@ -1266,13 +1329,13 @@ mod test { \n| 1.0 | 1.0 | 1970-01-01T00:00:10 | host1 |\ \n| 1.0 | 1.5 | 
1970-01-01T00:00:15 | host1 |\ \n| 2.0 | 2.0 | 1970-01-01T00:00:20 | host1 |\ - \n| 2.0 | | 1970-01-01T00:00:25 | host1 |\ + \n| 2.0 | 2.5 | 1970-01-01T00:00:25 | host1 |\ \n| 3.0 | 3.0 | 1970-01-01T00:00:00 | host2 |\ \n| 3.0 | 3.5 | 1970-01-01T00:00:05 | host2 |\ \n| 4.0 | 4.0 | 1970-01-01T00:00:10 | host2 |\ \n| 4.0 | 4.5 | 1970-01-01T00:00:15 | host2 |\ \n| 5.0 | 5.0 | 1970-01-01T00:00:20 | host2 |\ - \n| 5.0 | | 1970-01-01T00:00:25 | host2 |\ + \n| 5.0 | 5.5 | 1970-01-01T00:00:25 | host2 |\ \n+------------+------------+---------------------+-------+", ); do_range_select_test(10_000, 5_000, 5_000, Fill::Linear, false, expected).await; @@ -1339,29 +1402,72 @@ mod test { ScalarValue::UInt8(None), ScalarValue::UInt8(Some(9)), ]; - Fill::Null.apply_fill_strategy(&mut test1).unwrap(); + Fill::Null.apply_fill_strategy(&[], &mut test1).unwrap(); assert_eq!(test1[1], ScalarValue::UInt8(None)); - Fill::Prev.apply_fill_strategy(&mut test1).unwrap(); + Fill::Prev.apply_fill_strategy(&[], &mut test1).unwrap(); assert_eq!(test1[1], ScalarValue::UInt8(Some(8))); test1[1] = ScalarValue::UInt8(None); Fill::Const(ScalarValue::UInt8(Some(10))) - .apply_fill_strategy(&mut test1) + .apply_fill_strategy(&[], &mut test1) .unwrap(); assert_eq!(test1[1], ScalarValue::UInt8(Some(10))); - test1[1] = ScalarValue::UInt8(None); - assert_eq!( - Fill::Linear - .apply_fill_strategy(&mut test1) - .unwrap_err() - .to_string(), - "Execution error: RangePlan: Apply Fill LINEAR strategy on Non-floating type" - ); - let mut test2 = vec![ - ScalarValue::Float32(Some(8.0)), + } + + #[test] + fn test_fill_linear() { + let ts = vec![1, 2, 3, 4, 5]; + let mut test = vec![ + ScalarValue::Float32(Some(1.0)), ScalarValue::Float32(None), - ScalarValue::Float32(Some(9.0)), + ScalarValue::Float32(Some(3.0)), + ScalarValue::Float32(None), + ScalarValue::Float32(Some(5.0)), ]; - Fill::Linear.apply_fill_strategy(&mut test2).unwrap(); - assert_eq!(test2[1], ScalarValue::Float32(Some(8.5))); + 
Fill::Linear.apply_fill_strategy(&ts, &mut test).unwrap(); + let mut test1 = vec![ + ScalarValue::Float32(None), + ScalarValue::Float32(Some(2.0)), + ScalarValue::Float32(None), + ScalarValue::Float32(Some(4.0)), + ScalarValue::Float32(None), + ]; + Fill::Linear.apply_fill_strategy(&ts, &mut test1).unwrap(); + assert_eq!(test, test1); + // test linear interpolation on irregularly spaced ts/data + let ts = vec![ + 1, // None + 3, // 1.0 + 8, // 11.0 + 30, // None + 88, // 10.0 + 108, // 5.0 + 128, // None + ]; + let mut test = vec![ + ScalarValue::Float64(None), + ScalarValue::Float64(Some(1.0)), + ScalarValue::Float64(Some(11.0)), + ScalarValue::Float64(None), + ScalarValue::Float64(Some(10.0)), + ScalarValue::Float64(Some(5.0)), + ScalarValue::Float64(None), + ]; + Fill::Linear.apply_fill_strategy(&ts, &mut test).unwrap(); + let data: Vec<_> = test + .into_iter() + .map(|x| { + let ScalarValue::Float64(Some(f)) = x else { + unreachable!() + }; + f + }) + .collect(); + assert_eq!(data, vec![-3.0, 1.0, 11.0, 10.725, 10.0, 5.0, 0.0]); + // test corner case + let ts = vec![1]; + let test = vec![ScalarValue::Float32(None)]; + let mut test1 = test.clone(); + Fill::Linear.apply_fill_strategy(&ts, &mut test1).unwrap(); + assert_eq!(test, test1); } }