diff --git a/Cargo.toml b/Cargo.toml index 27fcb1b59..990bbb7ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,7 +64,7 @@ query-grammar = { version = "0.25.0", path = "./query-grammar", package = "tanti tantivy-bitpacker = { version = "0.9", path = "./bitpacker" } common = { version = "0.10", path = "./common/", package = "tantivy-common" } tokenizer-api = { version = "0.6", path = "./tokenizer-api", package = "tantivy-tokenizer-api" } -sketches-ddsketch = { version = "0.3.0", features = ["use_serde"] } +sketches-ddsketch = { path = "./sketches-ddsketch", features = ["use_serde"] } datasketches = "0.2.0" futures-util = { version = "0.3.28", optional = true } futures-channel = { version = "0.3.28", optional = true } @@ -144,6 +144,7 @@ members = [ "sstable", "tokenizer-api", "columnar", + "sketches-ddsketch", ] # Following the "fail" crate best practises, we isolate diff --git a/sketches-ddsketch/Cargo.toml b/sketches-ddsketch/Cargo.toml new file mode 100644 index 000000000..e62c2b93c --- /dev/null +++ b/sketches-ddsketch/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "sketches-ddsketch" +version = "0.3.0" +authors = ["Mike Heffner "] +edition = "2018" +license = "Apache-2.0" +readme = "README.md" +repository = "https://github.com/mheffner/rust-sketches-ddsketch" +homepage = "https://github.com/mheffner/rust-sketches-ddsketch" +description = """ +A direct port of the Golang DDSketch implementation. 
+""" +exclude = [".gitignore"] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +serde = { package = "serde", version = "1.0", optional = true, features = ["derive", "serde_derive"] } + +[dev-dependencies] +approx = "0.5.1" +rand = "0.8.5" +rand_distr = "0.4.3" + +[features] +use_serde = ["serde", "serde/derive"] + diff --git a/sketches-ddsketch/LICENSE b/sketches-ddsketch/LICENSE new file mode 100644 index 000000000..ef11f508a --- /dev/null +++ b/sketches-ddsketch/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. 
+ + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. 
+ + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright [2019] [Mike Heffner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/sketches-ddsketch/Makefile b/sketches-ddsketch/Makefile new file mode 100644 index 000000000..df0acaabf --- /dev/null +++ b/sketches-ddsketch/Makefile @@ -0,0 +1,11 @@ +clean: + cargo clean + +test: + cargo test + +test_logs: + cargo test -- --nocapture + +test_performance: + cargo test --release --jobs 1 test_performance -- --ignored --nocapture \ No newline at end of file diff --git a/sketches-ddsketch/README.md b/sketches-ddsketch/README.md new file mode 100644 index 000000000..fd45f4991 --- /dev/null +++ b/sketches-ddsketch/README.md @@ -0,0 +1,37 @@ +# sketches-ddsketch + +This is a direct port of the [Golang](https://github.com/DataDog/sketches-go) +[DDSketch](https://arxiv.org/pdf/1908.10693.pdf) quantile sketch implementation +to Rust. DDSketch is a fully-mergeable quantile sketch with relative-error +guarantees and is extremely fast. + +# DDSketch + +* Sketch size automatically grows as needed, starting with 128 bins. +* Extremely fast sample insertion and sketch merges. 
+ +## Usage + +```rust +use sketches_ddsketch::{Config, DDSketch}; + +let config = Config::defaults(); +let mut sketch = DDSketch::new(config); + +sketch.add(1.0); +sketch.add(1.0); +sketch.add(1.0); + +// Get p=50% +let quantile = sketch.quantile(0.5).unwrap(); +assert_eq!(quantile, Some(1.0)); +``` + +## Performance + +No performance tuning has been done with this implementation of the port, so we +would expect similar profiles to the original implementation. + +Out of the box we can achieve over 70M sample inserts/sec and 350K sketch +merges/sec. All tests run on a single core Intel i7 processor with 4.2 GHz max +clock. \ No newline at end of file diff --git a/sketches-ddsketch/src/config.rs b/sketches-ddsketch/src/config.rs new file mode 100644 index 000000000..5acf951bc --- /dev/null +++ b/sketches-ddsketch/src/config.rs @@ -0,0 +1,93 @@ +#[cfg(feature = "use_serde")] +use serde::{Deserialize, Serialize}; + +const DEFAULT_MAX_BINS: u32 = 2048; +const DEFAULT_ALPHA: f64 = 0.01; +const DEFAULT_MIN_VALUE: f64 = 1.0e-9; + +/// The configuration struct for constructing a `DDSketch` +#[derive(Copy, Clone, Debug, PartialEq)] +#[cfg_attr(feature = "use_serde", derive(Serialize, Deserialize))] +pub struct Config { + pub max_num_bins: u32, + pub gamma: f64, + pub(crate) gamma_ln: f64, + pub(crate) min_value: f64, + pub offset: i32, +} + +fn log_gamma(value: f64, gamma_ln: f64) -> f64 { + value.ln() / gamma_ln +} + +impl Config { + /// Construct a new `Config` struct with specific parameters. If you are unsure of how to + /// configure this, the `defaults` method constructs a `Config` with built-in defaults. + /// + /// `max_num_bins` is the max number of bins the DDSketch will grow to, in steps of 128 bins. + pub fn new(alpha: f64, max_num_bins: u32, min_value: f64) -> Self { + // Compute gamma the same way Java's LogarithmicMapping does: + // gamma = (1 + alpha) / (1 - alpha), which is algebraically + // the same as 1 + 2*alpha/(1-alpha). 
+ // Using gamma.ln() (not ln_1p) to match Java's Math.log(gamma) + // for bit-exact cross-language bin index compatibility. + let gamma = (1.0 + alpha) / (1.0 - alpha); + let gamma_ln = gamma.ln(); + + Config { + max_num_bins, + gamma, + gamma_ln, + min_value, + offset: 1 - (log_gamma(min_value, gamma_ln) as i32), + } + } + + /// Return a `Config` using built-in default settings + pub fn defaults() -> Self { + Self::new(DEFAULT_ALPHA, DEFAULT_MAX_BINS, DEFAULT_MIN_VALUE) + } + + pub fn key(&self, v: f64) -> i32 { + // Match Java's LogLikeIndexMapping.index(): + // floor-based indexing for cross-language bin compatibility. + self.log_gamma(v).floor() as i32 + } + + pub fn value(&self, key: i32) -> f64 { + // Match Java's LogLikeIndexMapping.value(): + // gamma^key * (1 + relativeAccuracy) = gamma^key * 2*gamma/(gamma+1) + self.pow_gamma(key) * (2.0 * self.gamma / (1.0 + self.gamma)) + } + + pub fn log_gamma(&self, value: f64) -> f64 { + log_gamma(value, self.gamma_ln) + } + + pub fn pow_gamma(&self, key: i32) -> f64 { + ((key as f64) * self.gamma_ln).exp() + } + + pub fn min_possible(&self) -> f64 { + self.min_value + } + + /// Reconstruct a Config from a gamma value (as decoded from the binary format). + /// Uses default max_num_bins and min_value. 
+ pub(crate) fn from_gamma(gamma: f64) -> Self { + let gamma_ln = gamma.ln(); + Config { + max_num_bins: DEFAULT_MAX_BINS, + gamma, + gamma_ln, + min_value: DEFAULT_MIN_VALUE, + offset: 1 - (log_gamma(DEFAULT_MIN_VALUE, gamma_ln) as i32), + } + } +} + +impl Default for Config { + fn default() -> Self { + Self::new(DEFAULT_ALPHA, DEFAULT_MAX_BINS, DEFAULT_MIN_VALUE) + } +} diff --git a/sketches-ddsketch/src/ddsketch.rs b/sketches-ddsketch/src/ddsketch.rs new file mode 100644 index 000000000..4d6b17375 --- /dev/null +++ b/sketches-ddsketch/src/ddsketch.rs @@ -0,0 +1,386 @@ +use std::error; +use std::fmt; + +use crate::config::Config; +use crate::store::Store; + +#[cfg(feature = "use_serde")] +use serde::{Deserialize, Serialize}; + +type Result = std::result::Result; + +/// General error type for DDSketch, represents either an invalid quantile or an +/// incompatible merge operation. +/// +#[derive(Debug, Clone)] +pub enum DDSketchError { + Quantile, + Merge, +} +impl fmt::Display for DDSketchError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + DDSketchError::Quantile => { + write!(f, "Invalid quantile, must be between 0 and 1 (inclusive)") + } + DDSketchError::Merge => write!(f, "Can not merge sketches with different configs"), + } + } +} +impl error::Error for DDSketchError { + fn source(&self) -> Option<&(dyn error::Error + 'static)> { + // Generic + None + } +} + +/// This struct represents a [DDSketch](https://arxiv.org/pdf/1908.10693.pdf) +#[derive(Clone)] +#[cfg_attr(feature = "use_serde", derive(Serialize, Deserialize))] +pub struct DDSketch { + pub(crate) config: Config, + pub(crate) store: Store, + pub(crate) negative_store: Store, + pub(crate) min: f64, + pub(crate) max: f64, + pub(crate) sum: f64, + pub(crate) zero_count: u64, +} + +impl Default for DDSketch { + fn default() -> Self { + Self::new(Default::default()) + } +} + +// XXX: functions should return Option<> in the case of empty +impl DDSketch { + /// Construct a 
`DDSketch`. Requires a `Config` specifying the parameters of the sketch + pub fn new(config: Config) -> Self { + DDSketch { + config, + store: Store::new(config.max_num_bins as usize), + negative_store: Store::new(config.max_num_bins as usize), + min: f64::INFINITY, + max: f64::NEG_INFINITY, + sum: 0.0, + zero_count: 0, + } + } + + /// Add the sample to the sketch + pub fn add(&mut self, v: f64) { + if v > self.config.min_possible() { + let key = self.config.key(v); + self.store.add(key); + } else if v < -self.config.min_possible() { + let key = self.config.key(-v); + self.negative_store.add(key); + } else { + self.zero_count += 1; + } + + if v < self.min { + self.min = v; + } + if self.max < v { + self.max = v; + } + self.sum += v; + } + + /// Return the quantile value for quantiles between 0.0 and 1.0. Result is an error, represented + /// as DDSketchError::Quantile if the requested quantile is outside of that range. + /// + /// If the sketch is empty the result is None, else Some(v) for the quantile value. 
+ pub fn quantile(&self, q: f64) -> Result> { + if q < 0.0 || q > 1.0 { + return Err(DDSketchError::Quantile); + } + + if self.empty() { + return Ok(None); + } + + if q == 0.0 { + return Ok(Some(self.min)); + } else if q == 1.0 { + return Ok(Some(self.max)); + } + + let rank = (q * (self.count() as f64 - 1.0)) as u64; + let quantile; + if rank < self.negative_store.count() { + let reversed_rank = self.negative_store.count() - rank - 1; + let key = self.negative_store.key_at_rank(reversed_rank); + quantile = -self.config.value(key); + } else if rank < self.zero_count + self.negative_store.count() { + quantile = 0.0; + } else { + let key = self + .store + .key_at_rank(rank - self.zero_count - self.negative_store.count()); + quantile = self.config.value(key); + } + + Ok(Some(quantile)) + } + + /// Returns the minimum value seen, or None if sketch is empty + pub fn min(&self) -> Option { + if self.empty() { + None + } else { + Some(self.min) + } + } + + /// Returns the maximum value seen, or None if sketch is empty + pub fn max(&self) -> Option { + if self.empty() { + None + } else { + Some(self.max) + } + } + + /// Returns the sum of values seen, or None if sketch is empty + pub fn sum(&self) -> Option { + if self.empty() { + None + } else { + Some(self.sum) + } + } + + /// Returns the number of values added to the sketch + pub fn count(&self) -> usize { + (self.store.count() + self.zero_count + self.negative_store.count()) as usize + } + + /// Returns the length of the underlying `Store`. This is mainly only useful for understanding + /// how much the sketch has grown given the inserted values. + pub fn length(&self) -> usize { + self.store.length() as usize + self.negative_store.length() as usize + } + + /// Merge the contents of another sketch into this one. The sketch that is merged into this one + /// is unchanged after the merge. 
+ pub fn merge(&mut self, o: &DDSketch) -> Result<()> { + if self.config != o.config { + return Err(DDSketchError::Merge); + } + + let was_empty = self.store.count() == 0; + + // Merge the stores + self.store.merge(&o.store); + self.negative_store.merge(&o.negative_store); + self.zero_count += o.zero_count; + + // Need to ensure we don't override min/max with initializers + // if either store were empty + if was_empty { + self.min = o.min; + self.max = o.max; + } else if o.store.count() > 0 { + if o.min < self.min { + self.min = o.min + } + if o.max > self.max { + self.max = o.max; + } + } + self.sum += o.sum; + + Ok(()) + } + + fn empty(&self) -> bool { + self.count() == 0 + } + + /// Encode this sketch into the Java-compatible binary format used by + /// `com.datadoghq.sketch.ddsketch.DDSketchWithExactSummaryStatistics`. + pub fn to_java_bytes(&self) -> Vec { + crate::encoding::encode_to_java_bytes(self) + } + + /// Decode a sketch from the Java-compatible binary format. + /// Accepts bytes produced by Java's `DDSketchWithExactSummaryStatistics.encode()` + /// with or without the `0x02` version prefix. 
+ pub fn from_java_bytes(bytes: &[u8]) -> std::result::Result { + crate::encoding::decode_from_java_bytes(bytes) + } +} + +#[cfg(test)] +mod tests { + use approx::assert_relative_eq; + + use crate::Config; + use crate::DDSketch; + + #[test] + fn test_add_zero() { + let alpha = 0.01; + let c = Config::new(alpha, 2048, 10e-9); + let mut dd = DDSketch::new(c); + dd.add(0.0); + } + + #[test] + fn test_quartiles() { + let alpha = 0.01; + let c = Config::new(alpha, 2048, 10e-9); + let mut dd = DDSketch::new(c); + + // Initialize sketch with {1.0, 2.0, 3.0, 4.0} + for i in 1..5 { + dd.add(i as f64); + } + + // We expect the following mappings from quantile to value: + // [0,0.33]: 1.0, (0.34,0.66]: 2.0, (0.67,0.99]: 3.0, (0.99, 1.0]: 4.0 + let test_cases = vec![ + (0.0, 1.0), + (0.25, 1.0), + (0.33, 1.0), + (0.34, 2.0), + (0.5, 2.0), + (0.66, 2.0), + (0.67, 3.0), + (0.75, 3.0), + (0.99, 3.0), + (1.0, 4.0), + ]; + + for (q, val) in test_cases { + assert_relative_eq!(dd.quantile(q).unwrap().unwrap(), val, max_relative = alpha); + } + } + + #[test] + fn test_neg_quartiles() { + let alpha = 0.01; + let c = Config::new(alpha, 2048, 10e-9); + let mut dd = DDSketch::new(c); + + // Initialize sketch with {1.0, 2.0, 3.0, 4.0} + for i in 1..5 { + dd.add(-i as f64); + } + + let test_cases = vec![ + (0.0, -4.0), + (0.25, -4.0), + (0.5, -3.0), + (0.75, -2.0), + (1.0, -1.0), + ]; + + for (q, val) in test_cases { + assert_relative_eq!(dd.quantile(q).unwrap().unwrap(), val, max_relative = alpha); + } + } + + #[test] + fn test_simple_quantile() { + let c = Config::defaults(); + let mut dd = DDSketch::new(c); + + for i in 1..101 { + dd.add(i as f64); + } + + assert_eq!(dd.quantile(0.95).unwrap().unwrap().ceil(), 95.0); + + assert!(dd.quantile(-1.01).is_err()); + assert!(dd.quantile(1.01).is_err()); + } + + #[test] + fn test_empty_sketch() { + let c = Config::defaults(); + let dd = DDSketch::new(c); + + assert_eq!(dd.quantile(0.98).unwrap(), None); + assert_eq!(dd.max(), None); + 
assert_eq!(dd.min(), None); + assert_eq!(dd.sum(), None); + assert_eq!(dd.count(), 0); + + assert!(dd.quantile(1.01).is_err()); + } + + #[test] + fn test_basic_histogram_data() { + let values = &[ + 0.754225035, + 0.752900282, + 0.752812246, + 0.752602367, + 0.754310155, + 0.753525981, + 0.752981082, + 0.752715536, + 0.751667941, + 0.755079054, + 0.753528150, + 0.755188464, + 0.752508723, + 0.750064549, + 0.753960428, + 0.751139298, + 0.752523560, + 0.753253428, + 0.753498342, + 0.751858358, + 0.752104636, + 0.753841300, + 0.754467374, + 0.753814334, + 0.750881719, + 0.753182556, + 0.752576884, + 0.753945708, + 0.753571911, + 0.752314573, + 0.752586651, + ]; + + let c = Config::defaults(); + let mut dd = DDSketch::new(c); + + for value in values { + dd.add(*value); + } + + assert_eq!(dd.max(), Some(0.755188464)); + assert_eq!(dd.min(), Some(0.750064549)); + assert_eq!(dd.count(), 31); + assert_eq!(dd.sum(), Some(23.343630625000003)); + + assert!(dd.quantile(0.25).unwrap().is_some()); + assert!(dd.quantile(0.5).unwrap().is_some()); + assert!(dd.quantile(0.75).unwrap().is_some()); + } + + #[test] + fn test_length() { + let mut dd = DDSketch::default(); + assert_eq!(dd.length(), 0); + + dd.add(1.0); + assert_eq!(dd.length(), 128); + dd.add(2.0); + dd.add(3.0); + assert_eq!(dd.length(), 128); + + dd.add(-1.0); + assert_eq!(dd.length(), 256); + dd.add(-2.0); + dd.add(-3.0); + assert_eq!(dd.length(), 256); + } +} diff --git a/sketches-ddsketch/src/encoding.rs b/sketches-ddsketch/src/encoding.rs new file mode 100644 index 000000000..3bacd3882 --- /dev/null +++ b/sketches-ddsketch/src/encoding.rs @@ -0,0 +1,810 @@ +//! Java-compatible binary encoding/decoding for DDSketch. +//! +//! This module implements the binary format used by the Java +//! `com.datadoghq.sketch.ddsketch.DDSketchWithExactSummaryStatistics` class +//! from the DataDog/sketches-java library. It enables cross-language +//! serialization so that sketches produced in Rust can be deserialized +//! 
and merged by Java consumers. + +use std::convert::TryInto; +use std::fmt; + +use crate::config::Config; +use crate::ddsketch::DDSketch; +use crate::store::Store; + +// --------------------------------------------------------------------------- +// Flag byte layout: (subflag << 2) | type_ordinal +// --------------------------------------------------------------------------- + +const FLAG_TYPE_SKETCH_FEATURES: u8 = 0b00; +const FLAG_TYPE_POSITIVE_STORE: u8 = 0b01; +const FLAG_TYPE_INDEX_MAPPING: u8 = 0b10; +const FLAG_TYPE_NEGATIVE_STORE: u8 = 0b11; + +const FLAG_INDEX_MAPPING_LOG: u8 = (0 << 2) | FLAG_TYPE_INDEX_MAPPING; // 0x02 +const FLAG_ZERO_COUNT: u8 = (1 << 2) | FLAG_TYPE_SKETCH_FEATURES; // 0x04 +const FLAG_COUNT: u8 = (0x28 << 2) | FLAG_TYPE_SKETCH_FEATURES; // 0xA0 +const FLAG_SUM: u8 = (0x21 << 2) | FLAG_TYPE_SKETCH_FEATURES; // 0x84 +const FLAG_MIN: u8 = (0x22 << 2) | FLAG_TYPE_SKETCH_FEATURES; // 0x88 +const FLAG_MAX: u8 = (0x23 << 2) | FLAG_TYPE_SKETCH_FEATURES; // 0x8C + +// BinEncodingMode subflags +const BIN_MODE_INDEX_DELTAS_AND_COUNTS: u8 = 1; +const BIN_MODE_INDEX_DELTAS: u8 = 2; +const BIN_MODE_CONTIGUOUS_COUNTS: u8 = 3; + +const VAR_DOUBLE_ROTATE_DISTANCE: u32 = 6; +const MAX_VAR_LEN_64: usize = 9; + +const DEFAULT_MAX_BINS: u32 = 2048; + +// --------------------------------------------------------------------------- +// Error type +// --------------------------------------------------------------------------- + +#[derive(Debug, Clone)] +pub enum DecodeError { + UnexpectedEof, + InvalidFlag(u8), + InvalidData(String), +} + +impl fmt::Display for DecodeError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + DecodeError::UnexpectedEof => write!(f, "unexpected end of input"), + DecodeError::InvalidFlag(b) => write!(f, "invalid flag byte: 0x{:02X}", b), + DecodeError::InvalidData(msg) => write!(f, "invalid data: {}", msg), + } + } +} + +impl std::error::Error for DecodeError {} + +// 
--------------------------------------------------------------------------- +// VarEncoding — bit-exact port of Java VarEncodingHelper +// --------------------------------------------------------------------------- + +fn encode_unsigned_var_long(out: &mut Vec, mut value: u64) { + let length = ((63 - value.leading_zeros() as i32) / 7).max(0).min(8); + for _ in 0..length { + out.push((value as u8) | 0x80); + value >>= 7; + } + out.push(value as u8); +} + +fn decode_unsigned_var_long(input: &mut &[u8]) -> Result { + let mut value: u64 = 0; + let mut shift: u32 = 0; + loop { + let next = read_byte(input)?; + if next < 0x80 || shift == 56 { + return Ok(value | ((next as u64) << shift)); + } + value |= ((next as u64) & 0x7F) << shift; + shift += 7; + } +} + +fn encode_signed_var_long(out: &mut Vec, value: i64) { + let encoded = ((value >> 63) ^ (value << 1)) as u64; + encode_unsigned_var_long(out, encoded); +} + +fn decode_signed_var_long(input: &mut &[u8]) -> Result { + let encoded = decode_unsigned_var_long(input)?; + Ok(((encoded >> 1) as i64) ^ -((encoded & 1) as i64)) +} + +fn double_to_var_bits(value: f64) -> u64 { + let bits = f64::to_bits(value + 1.0).wrapping_sub(f64::to_bits(1.0_f64)); + bits.rotate_left(VAR_DOUBLE_ROTATE_DISTANCE) +} + +fn var_bits_to_double(bits: u64) -> f64 { + f64::from_bits( + bits.rotate_right(VAR_DOUBLE_ROTATE_DISTANCE) + .wrapping_add(f64::to_bits(1.0_f64)), + ) - 1.0 +} + +fn encode_var_double(out: &mut Vec, value: f64) { + let mut bits = double_to_var_bits(value); + for _ in 0..MAX_VAR_LEN_64 - 1 { + let next = (bits >> 57) as u8; + bits <<= 7; + if bits == 0 { + out.push(next); + return; + } + out.push(next | 0x80); + } + out.push((bits >> 56) as u8); +} + +fn decode_var_double(input: &mut &[u8]) -> Result { + let mut bits: u64 = 0; + let mut shift: i32 = 57; // 8*8 - 7 + loop { + let next = read_byte(input)?; + if shift == 1 { + bits |= next as u64; + break; + } + if next < 0x80 { + bits |= (next as u64) << shift; + break; + } + 
bits |= ((next as u64) & 0x7F) << shift; + shift -= 7; + } + Ok(var_bits_to_double(bits)) +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +fn read_byte(input: &mut &[u8]) -> Result { + if input.is_empty() { + return Err(DecodeError::UnexpectedEof); + } + let b = input[0]; + *input = &input[1..]; + Ok(b) +} + +fn write_f64_le(out: &mut Vec, value: f64) { + out.extend_from_slice(&value.to_le_bytes()); +} + +fn read_f64_le(input: &mut &[u8]) -> Result { + if input.len() < 8 { + return Err(DecodeError::UnexpectedEof); + } + let bytes: [u8; 8] = input[..8].try_into().unwrap(); + *input = &input[8..]; + Ok(f64::from_le_bytes(bytes)) +} + +// --------------------------------------------------------------------------- +// Store encoding/decoding +// --------------------------------------------------------------------------- + +/// Iterate the non-zero bins in the store as (absolute_index, count) pairs. 
+fn non_zero_bins(store: &Store) -> Vec<(i32, u64)> { + if store.count == 0 { + return Vec::new(); + } + let start = (store.min_key - store.offset) as usize; + let end = (store.max_key - store.offset + 1) as usize; + let end = end.min(store.bins.len()); + let mut result = Vec::new(); + for i in start..end { + let count = store.bins[i]; + if count > 0 { + result.push((i as i32 + store.offset, count)); + } + } + result +} + +fn encode_store(out: &mut Vec, store: &Store, flag_type: u8) { + let bins = non_zero_bins(store); + if bins.is_empty() { + return; + } + + // INDEX_DELTAS_AND_COUNTS mode + out.push((BIN_MODE_INDEX_DELTAS_AND_COUNTS << 2) | flag_type); + encode_unsigned_var_long(out, bins.len() as u64); + + let mut prev_index: i64 = 0; + for &(index, count) in &bins { + encode_signed_var_long(out, (index as i64) - prev_index); + encode_var_double(out, count as f64); + prev_index = index as i64; + } +} + +fn decode_store(input: &mut &[u8], subflag: u8, bin_limit: usize) -> Result { + let mode = subflag; + let num_bins = decode_unsigned_var_long(input)? 
as usize; + let mut store = Store::new(bin_limit); + + match mode { + BIN_MODE_INDEX_DELTAS_AND_COUNTS => { + let mut index: i64 = 0; + for _ in 0..num_bins { + let delta = decode_signed_var_long(input)?; + let count = decode_var_double(input)?; + index += delta; + store.add_count(index as i32, count as u64); + } + } + BIN_MODE_INDEX_DELTAS => { + let mut index: i64 = 0; + for _ in 0..num_bins { + let delta = decode_signed_var_long(input)?; + index += delta; + store.add_count(index as i32, 1); + } + } + BIN_MODE_CONTIGUOUS_COUNTS => { + let start_index = decode_signed_var_long(input)?; + let index_delta = decode_signed_var_long(input)?; + let mut index = start_index; + for _ in 0..num_bins { + let count = decode_var_double(input)?; + store.add_count(index as i32, count as u64); + index += index_delta; + } + } + other => { + return Err(DecodeError::InvalidData(format!( + "unknown bin encoding mode subflag: {}", + other + ))); + } + } + + Ok(store) +} + +// --------------------------------------------------------------------------- +// Top-level encode / decode +// --------------------------------------------------------------------------- + +/// Encode a DDSketch into the Java-compatible binary format. +/// +/// The output follows the encoding order of +/// `DDSketchWithExactSummaryStatistics.encode()` then `DDSketch.encode()`: +/// +/// 1. Summary statistics: COUNT, MIN, MAX (if count > 0) +/// 2. SUM (if sum != 0) +/// 3. Index mapping (LOG layout): gamma, indexOffset +/// 4. Zero count (if > 0) +/// 5. Positive store bins +/// 6. 
Negative store bins +pub fn encode_to_java_bytes(sketch: &DDSketch) -> Vec { + let mut out = Vec::new(); + + let count = sketch.count() as f64; + + // --- Summary statistics (DDSketchWithExactSummaryStatistics.encode) --- + if count != 0.0 { + out.push(FLAG_COUNT); + encode_var_double(&mut out, count); + out.push(FLAG_MIN); + write_f64_le(&mut out, sketch.min); + out.push(FLAG_MAX); + write_f64_le(&mut out, sketch.max); + } + if sketch.sum != 0.0 { + out.push(FLAG_SUM); + write_f64_le(&mut out, sketch.sum); + } + + // --- DDSketch.encode (index mapping + zero count + stores) --- + + // Index mapping (LOG layout, indexOffset = 0.0) + out.push(FLAG_INDEX_MAPPING_LOG); + write_f64_le(&mut out, sketch.config.gamma); + write_f64_le(&mut out, 0.0_f64); + + // Zero count + if sketch.zero_count != 0 { + out.push(FLAG_ZERO_COUNT); + encode_var_double(&mut out, sketch.zero_count as f64); + } + + // Positive store + encode_store(&mut out, &sketch.store, FLAG_TYPE_POSITIVE_STORE); + + // Negative store + encode_store(&mut out, &sketch.negative_store, FLAG_TYPE_NEGATIVE_STORE); + + out +} + +/// Decode a DDSketch from the Java-compatible binary format. +/// +/// Accepts bytes with or without a `0x02` version prefix. 
+pub fn decode_from_java_bytes(bytes: &[u8]) -> Result { + if bytes.is_empty() { + return Err(DecodeError::UnexpectedEof); + } + + let mut input = bytes; + + // Skip optional version prefix (0x02 followed by a valid flag byte) + if input.len() >= 2 && input[0] == 0x02 { + let second = input[1]; + if is_valid_flag_byte(second) { + input = &input[1..]; + } + } + + let mut gamma: Option = None; + let mut zero_count: f64 = 0.0; + let mut sum: f64 = 0.0; + let mut min: f64 = f64::INFINITY; + let mut max: f64 = f64::NEG_INFINITY; + let mut positive_store: Option = None; + let mut negative_store: Option = None; + + while !input.is_empty() { + let flag = read_byte(&mut input)?; + let flag_type = flag & 0x03; + let subflag = flag >> 2; + + match flag_type { + FLAG_TYPE_INDEX_MAPPING => { + gamma = Some(read_f64_le(&mut input)?); + let _index_offset = read_f64_le(&mut input)?; + } + FLAG_TYPE_SKETCH_FEATURES => { + if flag == FLAG_ZERO_COUNT { + zero_count += decode_var_double(&mut input)?; + } else if flag == FLAG_COUNT { + let _count = decode_var_double(&mut input)?; + } else if flag == FLAG_SUM { + sum = read_f64_le(&mut input)?; + } else if flag == FLAG_MIN { + min = read_f64_le(&mut input)?; + } else if flag == FLAG_MAX { + max = read_f64_le(&mut input)?; + } else { + return Err(DecodeError::InvalidFlag(flag)); + } + } + FLAG_TYPE_POSITIVE_STORE => { + positive_store = Some(decode_store(&mut input, subflag, DEFAULT_MAX_BINS as usize)?); + } + FLAG_TYPE_NEGATIVE_STORE => { + negative_store = Some(decode_store(&mut input, subflag, DEFAULT_MAX_BINS as usize)?); + } + _ => { + return Err(DecodeError::InvalidFlag(flag)); + } + } + } + + let g = gamma.unwrap_or_else(|| Config::defaults().gamma); + let config = Config::from_gamma(g); + let pos = positive_store.unwrap_or_else(|| Store::new(config.max_num_bins as usize)); + let neg = negative_store.unwrap_or_else(|| Store::new(config.max_num_bins as usize)); + + Ok(DDSketch { + config, + store: pos, + negative_store: neg, + min, 
+ max, + sum, + zero_count: zero_count as u64, + }) +} + +/// Check whether a byte is a valid flag byte for the DDSketch binary format. +/// Used to detect the optional version prefix. +fn is_valid_flag_byte(b: u8) -> bool { + matches!( + b, + FLAG_ZERO_COUNT + | FLAG_COUNT + | FLAG_SUM + | FLAG_MIN + | FLAG_MAX + | FLAG_INDEX_MAPPING_LOG + ) || { + let flag_type = b & 0x03; + let subflag = b >> 2; + (flag_type == FLAG_TYPE_POSITIVE_STORE || flag_type == FLAG_TYPE_NEGATIVE_STORE) + && (1..=3).contains(&subflag) + } || { + // INDEX_MAPPING with other layouts (LOG_LINEAR=1..LOG_QUARTIC=4) + let flag_type = b & 0x03; + let subflag = b >> 2; + flag_type == FLAG_TYPE_INDEX_MAPPING && subflag <= 4 + } +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + use crate::{Config, DDSketch}; + + // --- VarEncoding unit tests --- + + #[test] + fn test_unsigned_var_long_zero() { + let mut buf = Vec::new(); + encode_unsigned_var_long(&mut buf, 0); + assert_eq!(buf, vec![0x00]); + + let mut input = buf.as_slice(); + assert_eq!(decode_unsigned_var_long(&mut input).unwrap(), 0); + assert!(input.is_empty()); + } + + #[test] + fn test_unsigned_var_long_small() { + let mut buf = Vec::new(); + encode_unsigned_var_long(&mut buf, 1); + assert_eq!(buf, vec![0x01]); + + let mut input = buf.as_slice(); + assert_eq!(decode_unsigned_var_long(&mut input).unwrap(), 1); + } + + #[test] + fn test_unsigned_var_long_128() { + let mut buf = Vec::new(); + encode_unsigned_var_long(&mut buf, 128); + assert_eq!(buf, vec![0x80, 0x01]); + + let mut input = buf.as_slice(); + assert_eq!(decode_unsigned_var_long(&mut input).unwrap(), 128); + } + + #[test] + fn test_unsigned_var_long_roundtrip() { + for &v in &[0u64, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX] { + let mut buf = Vec::new(); + encode_unsigned_var_long(&mut buf, v); + let mut 
input = buf.as_slice(); + let decoded = decode_unsigned_var_long(&mut input).unwrap(); + assert_eq!(decoded, v, "roundtrip failed for {}", v); + assert!(input.is_empty()); + } + } + + #[test] + fn test_signed_var_long_roundtrip() { + for &v in &[0i64, 1, -1, 63, -64, 64, -65, i64::MAX, i64::MIN] { + let mut buf = Vec::new(); + encode_signed_var_long(&mut buf, v); + let mut input = buf.as_slice(); + let decoded = decode_signed_var_long(&mut input).unwrap(); + assert_eq!(decoded, v, "roundtrip failed for {}", v); + assert!(input.is_empty()); + } + } + + #[test] + fn test_var_double_roundtrip() { + for &v in &[0.0, 1.0, 2.0, 5.0, 15.0, 42.0, 100.0, 1e-9, 1e15, 0.5, 3.14159] { + let mut buf = Vec::new(); + encode_var_double(&mut buf, v); + let mut input = buf.as_slice(); + let decoded = decode_var_double(&mut input).unwrap(); + assert!( + (decoded - v).abs() < 1e-15 || decoded == v, + "roundtrip failed for {}: got {}", + v, + decoded + ); + assert!(input.is_empty()); + } + } + + #[test] + fn test_var_double_small_integers() { + // Small non-negative integers should encode compactly + let mut buf = Vec::new(); + encode_var_double(&mut buf, 1.0); + assert_eq!(buf.len(), 1, "VarDouble(1.0) should be 1 byte"); + + buf.clear(); + encode_var_double(&mut buf, 5.0); + assert_eq!(buf.len(), 1, "VarDouble(5.0) should be 1 byte"); + } + + // --- DDSketch encode/decode roundtrip tests --- + + #[test] + fn test_encode_empty_sketch() { + let sketch = DDSketch::new(Config::defaults()); + let bytes = sketch.to_java_bytes(); + // Empty sketch: no summary stats, just index mapping + assert!(!bytes.is_empty()); + + let decoded = DDSketch::from_java_bytes(&bytes).unwrap(); + assert_eq!(decoded.count(), 0); + assert_eq!(decoded.min(), None); + assert_eq!(decoded.max(), None); + assert_eq!(decoded.sum(), None); + } + + #[test] + fn test_encode_simple_sketch() { + let mut sketch = DDSketch::new(Config::defaults()); + for v in [1.0, 2.0, 3.0, 4.0, 5.0] { + sketch.add(v); + } + + let bytes = 
sketch.to_java_bytes(); + let decoded = DDSketch::from_java_bytes(&bytes).unwrap(); + + assert_eq!(decoded.count(), 5); + assert_eq!(decoded.min(), Some(1.0)); + assert_eq!(decoded.max(), Some(5.0)); + assert_eq!(decoded.sum(), Some(15.0)); + + for q in [0.5, 0.9, 0.95, 0.99] { + let orig = sketch.quantile(q).unwrap().unwrap(); + let dec = decoded.quantile(q).unwrap().unwrap(); + assert!( + (orig - dec).abs() / orig.abs().max(1e-15) < 1e-12, + "quantile({}) mismatch: {} vs {}", + q, + orig, + dec + ); + } + } + + #[test] + fn test_encode_single_value() { + let mut sketch = DDSketch::new(Config::defaults()); + sketch.add(42.0); + + let bytes = sketch.to_java_bytes(); + let decoded = DDSketch::from_java_bytes(&bytes).unwrap(); + + assert_eq!(decoded.count(), 1); + assert_eq!(decoded.min(), Some(42.0)); + assert_eq!(decoded.max(), Some(42.0)); + assert_eq!(decoded.sum(), Some(42.0)); + } + + #[test] + fn test_encode_negative_values() { + let mut sketch = DDSketch::new(Config::defaults()); + for v in [-3.0, -1.0, 2.0, 5.0] { + sketch.add(v); + } + + let bytes = sketch.to_java_bytes(); + let decoded = DDSketch::from_java_bytes(&bytes).unwrap(); + + assert_eq!(decoded.count(), 4); + assert_eq!(decoded.min(), Some(-3.0)); + assert_eq!(decoded.max(), Some(5.0)); + assert_eq!(decoded.sum(), Some(3.0)); + + for q in [0.0, 0.25, 0.5, 0.75, 1.0] { + let orig = sketch.quantile(q).unwrap().unwrap(); + let dec = decoded.quantile(q).unwrap().unwrap(); + assert!( + (orig - dec).abs() / orig.abs().max(1e-15) < 1e-12, + "quantile({}) mismatch: {} vs {}", + q, + orig, + dec + ); + } + } + + #[test] + fn test_encode_with_zero_value() { + let mut sketch = DDSketch::new(Config::defaults()); + for v in [0.0, 1.0, 2.0] { + sketch.add(v); + } + + let bytes = sketch.to_java_bytes(); + let decoded = DDSketch::from_java_bytes(&bytes).unwrap(); + + assert_eq!(decoded.count(), 3); + assert_eq!(decoded.min(), Some(0.0)); + assert_eq!(decoded.max(), Some(2.0)); + assert_eq!(decoded.sum(), 
Some(3.0)); + assert_eq!(decoded.zero_count, 1); + } + + #[test] + fn test_encode_large_range() { + let mut sketch = DDSketch::new(Config::defaults()); + sketch.add(0.001); + sketch.add(1_000_000.0); + + let bytes = sketch.to_java_bytes(); + let decoded = DDSketch::from_java_bytes(&bytes).unwrap(); + + assert_eq!(decoded.count(), 2); + assert_eq!(decoded.min(), Some(0.001)); + assert_eq!(decoded.max(), Some(1_000_000.0)); + } + + #[test] + fn test_encode_with_version_prefix() { + let mut sketch = DDSketch::new(Config::defaults()); + for v in [1.0, 2.0, 3.0] { + sketch.add(v); + } + + let bytes = sketch.to_java_bytes(); + + // Simulate Java's toByteArrayV2: prepend 0x02 + let mut v2_bytes = vec![0x02]; + v2_bytes.extend_from_slice(&bytes); + + let decoded = DDSketch::from_java_bytes(&v2_bytes).unwrap(); + assert_eq!(decoded.count(), 3); + assert_eq!(decoded.min(), Some(1.0)); + assert_eq!(decoded.max(), Some(3.0)); + } + + #[test] + fn test_byte_level_encoding() { + let mut sketch = DDSketch::new(Config::defaults()); + sketch.add(1.0); + + let bytes = sketch.to_java_bytes(); + + // First byte should be FLAG_COUNT (0xA0) since count > 0 + assert_eq!(bytes[0], FLAG_COUNT, "first byte should be COUNT flag"); + + // After count + min + max + sum blocks, we should see FLAG_INDEX_MAPPING_LOG (0x02) + let has_mapping = bytes.contains(&FLAG_INDEX_MAPPING_LOG); + assert!(has_mapping, "should contain index mapping flag"); + } + + fn hex_to_bytes(hex: &str) -> Vec { + (0..hex.len()) + .step_by(2) + .map(|i| u8::from_str_radix(&hex[i..i + 2], 16).unwrap()) + .collect() + } + + // Golden bytes generated by Java's DDSketchWithExactSummaryStatistics.encode() + // using LogarithmicMapping(0.01) + CollapsingLowestDenseStore(2048) + const GOLDEN_SIMPLE: &str = "a00588000000000000f03f8c0000000000001440840000000000002e4002fd4a815abf52f03f000000000000000005050002440228021e021602"; + const GOLDEN_SINGLE: &str = 
"a0028800000000000045408c000000000000454084000000000000454002fd4a815abf52f03f00000000000000000501f40202"; + const GOLDEN_NEGATIVE: &str = "a084408800000000000008c08c000000000000144084000000000000084002fd4a815abf52f03f0000000000000000050244025c02070200026c02"; + const GOLDEN_ZERO: &str = "a0048800000000000000008c000000000000004084000000000000084002fd4a815abf52f03f00000000000000000402050200024402"; + const GOLDEN_EMPTY: &str = "02fd4a815abf52f03f0000000000000000"; + const GOLDEN_MANY: &str = "a08d1488000000000000f03f8c0000000000005940840000000000bab34002fd4a815abf52f03f000000000000000005550002440228021e021602120210020c020c020c0208020a020802060208020602060206020602040206020402040204020402040204020402040204020202040202020402020204020202020204020202020202020402020202020202020202020202020202020202020202020202020202020203020202020202020302020202020302020202020302020203020202030202020302030202020302030203020202030203020302030202"; + + #[test] + fn test_cross_language_simple() { + let mut sketch = DDSketch::new(Config::defaults()); + for v in [1.0, 2.0, 3.0, 4.0, 5.0] { + sketch.add(v); + } + let bytes = sketch.to_java_bytes(); + let expected = hex_to_bytes(GOLDEN_SIMPLE); + assert_eq!( + bytes, expected, + "Rust encoding doesn't match Java golden bytes for SIMPLE.\nRust: {}\nJava: {}", + bytes.iter().map(|b| format!("{:02x}", b)).collect::(), + GOLDEN_SIMPLE + ); + } + + #[test] + fn test_cross_language_single() { + let mut sketch = DDSketch::new(Config::defaults()); + sketch.add(42.0); + let bytes = sketch.to_java_bytes(); + let expected = hex_to_bytes(GOLDEN_SINGLE); + assert_eq!( + bytes, expected, + "Rust encoding doesn't match Java golden bytes for SINGLE.\nRust: {}\nJava: {}", + bytes.iter().map(|b| format!("{:02x}", b)).collect::(), + GOLDEN_SINGLE + ); + } + + #[test] + fn test_cross_language_negative() { + let mut sketch = DDSketch::new(Config::defaults()); + for v in [-3.0, -1.0, 2.0, 5.0] { + sketch.add(v); + } + let bytes = sketch.to_java_bytes(); + let 
expected = hex_to_bytes(GOLDEN_NEGATIVE); + assert_eq!( + bytes, expected, + "Rust encoding doesn't match Java golden bytes for NEGATIVE.\nRust: {}\nJava: {}", + bytes.iter().map(|b| format!("{:02x}", b)).collect::(), + GOLDEN_NEGATIVE + ); + } + + #[test] + fn test_cross_language_zero() { + let mut sketch = DDSketch::new(Config::defaults()); + for v in [0.0, 1.0, 2.0] { + sketch.add(v); + } + let bytes = sketch.to_java_bytes(); + let expected = hex_to_bytes(GOLDEN_ZERO); + assert_eq!( + bytes, expected, + "Rust encoding doesn't match Java golden bytes for ZERO.\nRust: {}\nJava: {}", + bytes.iter().map(|b| format!("{:02x}", b)).collect::(), + GOLDEN_ZERO + ); + } + + #[test] + fn test_cross_language_empty() { + let sketch = DDSketch::new(Config::defaults()); + let bytes = sketch.to_java_bytes(); + let expected = hex_to_bytes(GOLDEN_EMPTY); + assert_eq!( + bytes, expected, + "Rust encoding doesn't match Java golden bytes for EMPTY.\nRust: {}\nJava: {}", + bytes.iter().map(|b| format!("{:02x}", b)).collect::(), + GOLDEN_EMPTY + ); + } + + #[test] + fn test_cross_language_many() { + let mut sketch = DDSketch::new(Config::defaults()); + for i in 1..=100 { + sketch.add(i as f64); + } + let bytes = sketch.to_java_bytes(); + let expected = hex_to_bytes(GOLDEN_MANY); + assert_eq!( + bytes, expected, + "Rust encoding doesn't match Java golden bytes for MANY.\nRust: {}\nJava: {}", + bytes.iter().map(|b| format!("{:02x}", b)).collect::(), + GOLDEN_MANY + ); + } + + #[test] + fn test_decode_java_golden_bytes() { + // Verify we can decode all Java golden bytes + for (name, hex) in [ + ("SIMPLE", GOLDEN_SIMPLE), + ("SINGLE", GOLDEN_SINGLE), + ("NEGATIVE", GOLDEN_NEGATIVE), + ("ZERO", GOLDEN_ZERO), + ("EMPTY", GOLDEN_EMPTY), + ("MANY", GOLDEN_MANY), + ] { + let bytes = hex_to_bytes(hex); + let result = DDSketch::from_java_bytes(&bytes); + assert!(result.is_ok(), "failed to decode {}: {:?}", name, result.err()); + } + } + + #[test] + fn test_encode_decode_many_values() { + let mut 
sketch = DDSketch::new(Config::defaults()); + for i in 1..=100 { + sketch.add(i as f64); + } + + let bytes = sketch.to_java_bytes(); + let decoded = DDSketch::from_java_bytes(&bytes).unwrap(); + + assert_eq!(decoded.count(), 100); + assert_eq!(decoded.min(), Some(1.0)); + assert_eq!(decoded.max(), Some(100.0)); + assert_eq!(decoded.sum(), Some(5050.0)); + + let alpha = 0.01; + let orig_p95 = sketch.quantile(0.95).unwrap().unwrap(); + let dec_p95 = decoded.quantile(0.95).unwrap().unwrap(); + assert!( + (orig_p95 - dec_p95).abs() / orig_p95 < alpha, + "p95 mismatch: {} vs {}", + orig_p95, + dec_p95 + ); + } +} diff --git a/sketches-ddsketch/src/lib.rs b/sketches-ddsketch/src/lib.rs new file mode 100644 index 000000000..4e1fdf22f --- /dev/null +++ b/sketches-ddsketch/src/lib.rs @@ -0,0 +1,55 @@ +/*! +This crate provides a direct port of the [Golang](https://github.com/DataDog/sketches-go) +[DDSketch](https://arxiv.org/pdf/1908.10693.pdf) implementation to Rust. All efforts +have been made to keep this as close to the original implementation as possible, with a few tweaks to +get closer to idiomatic Rust. + +# Usage + +Add multiple samples to a DDSketch and invoke the `quantile` method to pull any quantile from +*0.0* to *1.0*. + +```rust +use sketches_ddsketch::{Config, DDSketch}; + +let c = Config::defaults(); +let mut d = DDSketch::new(c); + +d.add(1.0); +d.add(1.0); +d.add(1.0); + +let q = d.quantile(0.50).unwrap(); + +assert!(q < Some(1.02)); +assert!(q > Some(0.98)); +``` + +Sketches can also be merged. 
#[cfg(feature = "use_serde")]
use serde::{Deserialize, Serialize};

/// Bins are allocated in multiples of this many slots.
const CHUNK_SIZE: i32 = 128;

// Divide the `dividend` by the `divisor`, rounding towards positive infinity.
//
// Similar to the nightly only `std::i32::div_ceil`. The formula relies on
// truncating division, so it only rounds up correctly for a non-negative
// dividend and a positive divisor.
fn div_ceil(dividend: i32, divisor: i32) -> i32 {
    (dividend + divisor - 1) / divisor
}

/// CollapsingLowestDenseStore
///
/// A dense array of bin counts addressed by key. `bins[i]` holds the count for
/// key `offset + i`. The array grows in `CHUNK_SIZE` steps up to `bin_limit`
/// slots; once the key range no longer fits, the lowest keys are folded into
/// the lowest remaining bin and `is_collapsed` is set.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "use_serde", derive(Serialize, Deserialize))]
pub struct Store {
    pub(crate) bins: Vec<u64>,
    pub(crate) count: u64,
    pub(crate) min_key: i32,
    pub(crate) max_key: i32,
    pub(crate) offset: i32,
    pub(crate) bin_limit: usize,
    is_collapsed: bool,
}

impl Store {
    /// Creates an empty store that will never allocate more than `bin_limit` bins.
    pub fn new(bin_limit: usize) -> Self {
        Store {
            bins: Vec::new(),
            count: 0,
            // Sentinels: any real key is below `min_key` and above `max_key`.
            min_key: i32::MAX,
            max_key: i32::MIN,
            offset: 0,
            bin_limit,
            is_collapsed: false,
        }
    }

    /// Return the number of bins.
    pub fn length(&self) -> i32 {
        self.bins.len() as i32
    }

    /// Returns `true` while no bin has been allocated yet.
    pub fn is_empty(&self) -> bool {
        self.bins.is_empty()
    }

    /// Records one observation for `key`.
    pub fn add(&mut self, key: i32) {
        let idx = self.get_index(key);
        self.bins[idx] += 1;
        self.count += 1;
    }

    /// Records `count` observations for `key`.
    pub(crate) fn add_count(&mut self, key: i32, count: u64) {
        let idx = self.get_index(key);
        self.bins[idx] += count;
        self.count += count;
    }

    /// Returns the bin slot for `key`, growing or collapsing the store as needed.
    /// Keys below the collapsed range map to slot 0.
    fn get_index(&mut self, key: i32) -> usize {
        if key < self.min_key {
            if self.is_collapsed {
                return 0;
            }

            self.extend_range(key, None);
            // Extending may itself have collapsed the store.
            if self.is_collapsed {
                return 0;
            }
        } else if key > self.max_key {
            self.extend_range(key, None);
        }

        (key - self.offset) as usize
    }

    /// Widens the tracked key range to include `key` (and `second_key` when
    /// given), allocating or re-arranging bins as required.
    fn extend_range(&mut self, key: i32, second_key: Option<i32>) {
        let second_key = second_key.unwrap_or(key);
        let new_min_key = i32::min(key, i32::min(second_key, self.min_key));
        let new_max_key = i32::max(key, i32::max(second_key, self.max_key));

        if self.is_empty() {
            let new_len = self.get_new_length(new_min_key, new_max_key);
            self.bins.resize(new_len, 0);
            self.offset = new_min_key;
            self.adjust(new_min_key, new_max_key);
        } else if new_min_key >= self.min_key && new_max_key < self.offset + self.length() {
            // The new range already fits in the allocated window.
            self.min_key = new_min_key;
            self.max_key = new_max_key;
        } else {
            // Grow bins
            let new_length = self.get_new_length(new_min_key, new_max_key);
            if new_length > self.length() as usize {
                self.bins.resize(new_length, 0);
            }
            self.adjust(new_min_key, new_max_key);
        }
    }

    /// Bin count needed for the key range, rounded up to a whole chunk and
    /// capped at `bin_limit`.
    fn get_new_length(&self, new_min_key: i32, new_max_key: i32) -> usize {
        let desired_length = new_max_key - new_min_key + 1;
        usize::min(
            (CHUNK_SIZE * div_ceil(desired_length, CHUNK_SIZE)) as usize,
            self.bin_limit,
        )
    }

    /// Re-positions the bins for the new key range. When the range exceeds the
    /// allocated length, the lowest keys are collapsed; otherwise the occupied
    /// range is re-centered within the array.
    fn adjust(&mut self, new_min_key: i32, new_max_key: i32) {
        if new_max_key - new_min_key + 1 > self.length() {
            // Keep the highest `length()` keys; everything below is collapsed.
            let new_min_key = new_max_key - self.length() + 1;

            if new_min_key >= self.max_key {
                // Put everything in the first bin.
                self.offset = new_min_key;
                self.min_key = new_min_key;
                self.bins.fill(0);
                self.bins[0] = self.count;
            } else {
                let shift = self.offset - new_min_key;
                if shift < 0 {
                    // Fold the counts of the keys being dropped into the bin of
                    // the new minimum key.
                    let collapse_start_index = (self.min_key - self.offset) as usize;
                    let collapse_end_index = (new_min_key - self.offset) as usize;
                    let collapsed_count: u64 = self.bins[collapse_start_index..collapse_end_index]
                        .iter()
                        .sum();
                    let zero_len = (new_min_key - self.min_key) as usize;
                    self.bins.splice(
                        collapse_start_index..collapse_end_index,
                        std::iter::repeat(0).take(zero_len),
                    );
                    self.bins[collapse_end_index] += collapsed_count;
                }
                self.min_key = new_min_key;
                self.shift_bins(shift);
            }

            self.max_key = new_max_key;
            self.is_collapsed = true;
        } else {
            self.center_bins(new_min_key, new_max_key);
            self.min_key = new_min_key;
            self.max_key = new_max_key;
        }
    }

    /// Moves bin contents by `shift` slots (towards higher slots when positive)
    /// and lowers `offset` by the same amount. Slots rotated around the end of
    /// the array are zeroed.
    fn shift_bins(&mut self, shift: i32) {
        if shift > 0 {
            let shift = shift as usize;
            self.bins.rotate_right(shift);
            self.bins[..shift].fill(0);
        } else {
            let shift = shift.abs() as usize;
            self.bins[..shift].fill(0);
            self.bins.rotate_left(shift);
        }

        self.offset -= shift;
    }

    /// Shifts the bins so the middle of the key range sits mid-array.
    fn center_bins(&mut self, new_min_key: i32, new_max_key: i32) {
        let middle_key = new_min_key + (new_max_key - new_min_key + 1) / 2;
        let shift = self.offset + self.length() / 2 - middle_key;
        self.shift_bins(shift)
    }

    /// Returns the key of the first bin whose cumulative count exceeds `rank`,
    /// or `max_key` when `rank` is at or beyond the total count.
    pub fn key_at_rank(&self, rank: u64) -> i32 {
        let mut n = 0;
        for (i, bin) in self.bins.iter().enumerate() {
            n += *bin;
            if n > rank {
                return i as i32 + self.offset;
            }
        }

        self.max_key
    }

    /// Total number of observations recorded.
    pub fn count(&self) -> u64 {
        self.count
    }

    /// Adds every count of `other` into `self`.
    ///
    /// Keys of `other` that fall below `self`'s lowest key after the range has
    /// been extended (that is, keys `self` had to collapse) are folded into
    /// `self`'s first bin.
    pub fn merge(&mut self, other: &Store) {
        if other.count == 0 {
            return;
        }

        if self.count == 0 {
            self.copy(other);
            return;
        }

        if other.min_key < self.min_key || other.max_key > self.max_key {
            self.extend_range(other.min_key, Some(other.max_key));
        }

        // Slot range, in `other`'s coordinates, of the keys lying below `self.min_key`.
        let collapse_start_index = other.min_key - other.offset;
        let mut collapse_end_index = i32::min(self.min_key, other.max_key + 1) - other.offset;
        if collapse_end_index > collapse_start_index {
            // These slots index `other.bins`: the counts being collapsed come
            // from `other`, and the indices are relative to `other.offset`.
            let collapsed_count: u64 = other.bins
                [collapse_start_index as usize..collapse_end_index as usize]
                .iter()
                .sum();
            self.bins[0] += collapsed_count;
        } else {
            collapse_end_index = collapse_start_index;
        }

        // Remaining keys map one-to-one onto `self`'s bins.
        for key in (collapse_end_index + other.offset)..(other.max_key + 1) {
            self.bins[(key - self.offset) as usize] += other.bins[(key - other.offset) as usize]
        }

        self.count += other.count;
    }

    /// Overwrites `self` with a copy of `o`.
    fn copy(&mut self, o: &Store) {
        self.bins = o.bins.clone();
        self.count = o.count;
        self.min_key = o.min_key;
        self.max_key = o.max_key;
        self.offset = o.offset;
        self.bin_limit = o.bin_limit;
        self.is_collapsed = o.is_collapsed;
    }
}

#[cfg(test)]
mod tests {
    use super::Store;

    #[test]
    fn test_simple_store() {
        let mut s = Store::new(2048);

        for i in 0..2048 {
            s.add(i);
        }
    }

    #[test]
    fn test_simple_store_rev() {
        let mut s = Store::new(2048);

        // `2048..0` is an empty range; iterate the keys in descending order instead.
        for i in (0..2048).rev() {
            s.add(i);
        }
    }
}
/// Exact in-memory collection of samples, used by the integration tests as the
/// reference against which sketch results are compared.
pub struct Dataset {
    values: Vec<f64>,
    sum: f64,
    // `true` while `values` is known to be in ascending order.
    sorted: bool,
}

/// Total order over non-NaN floats.
///
/// # Panics
/// Panics when either argument is NaN.
fn cmp_f64(a: &f64, b: &f64) -> Ordering {
    a.partial_cmp(b).expect("Dataset values must not be NaN")
}

impl Dataset {
    /// Creates an empty dataset.
    pub fn new() -> Self {
        Dataset {
            values: Vec::new(),
            sum: 0.0,
            sorted: false,
        }
    }

    /// Appends `value` and adds it to the running sum.
    pub fn add(&mut self, value: f64) {
        self.values.push(value);
        self.sum += value;
        self.sorted = false;
    }

    /// Fractional rank of quantile `q` in the sorted values, or `None` when `q`
    /// lies outside `[0, 1]` (NaN included) or the dataset is empty.
    fn quantile_rank(&mut self, q: f64) -> Option<f64> {
        // `contains` is false for NaN, so a NaN quantile is rejected here rather
        // than being cast to index 0 further down.
        if !(0.0..=1.0).contains(&q) || self.values.is_empty() {
            return None;
        }

        self.sort();
        Some(q * (self.values.len() - 1) as f64)
    }

    /// Value at the rank of `q` rounded down; NaN when `q` is invalid or the
    /// dataset is empty.
    pub fn lower_quantile(&mut self, q: f64) -> f64 {
        match self.quantile_rank(q) {
            Some(rank) => self.values[rank.floor() as usize],
            None => NAN,
        }
    }

    /// Value at the rank of `q` rounded up; NaN when `q` is invalid or the
    /// dataset is empty.
    pub fn upper_quantile(&mut self, q: f64) -> f64 {
        match self.quantile_rank(q) {
            Some(rank) => self.values[rank.ceil() as usize],
            None => NAN,
        }
    }

    /// Smallest value.
    ///
    /// # Panics
    /// Panics when the dataset is empty.
    pub fn min(&mut self) -> f64 {
        self.sort();
        self.values[0]
    }

    /// Largest value.
    ///
    /// # Panics
    /// Panics when the dataset is empty.
    pub fn max(&mut self) -> f64 {
        self.sort();
        self.values[self.values.len() - 1]
    }

    /// Sum of all values added so far.
    pub fn sum(&self) -> f64 {
        self.sum
    }

    /// Number of values added so far.
    pub fn count(&self) -> usize {
        self.values.len()
    }

    /// Sorts the values in ascending order unless they are already sorted.
    fn sort(&mut self) {
        if self.sorted {
            return;
        }

        self.values.sort_by(cmp_f64);
        self.sorted = true;
    }
}
rand_distr::Normal::new(mean, stddev).unwrap(), + } + } +} +impl Generator for Normal { + fn generate(&mut self) -> f64 { + self.distr.sample(&mut rand::thread_rng()) + } +} + +// +// Lognormal distribution generator +// +pub struct Lognormal { + distr: rand_distr::LogNormal, +} +impl Lognormal { + pub fn new(mean: f64, stddev: f64) -> Self { + Lognormal { + distr: rand_distr::LogNormal::new(mean, stddev).unwrap(), + } + } +} +impl Generator for Lognormal { + fn generate(&mut self) -> f64 { + self.distr.sample(&mut rand::thread_rng()) + } +} + +// +// Exponential distribution generator +// +pub struct Exponential { + distr: rand_distr::Exp, +} +impl Exponential { + pub fn new(lambda: f64) -> Self { + Exponential { + distr: rand_distr::Exp::new(lambda).unwrap(), + } + } +} +impl Generator for Exponential { + fn generate(&mut self) -> f64 { + self.distr.sample(&mut rand::thread_rng()) + } +} diff --git a/sketches-ddsketch/tests/common/mod.rs b/sketches-ddsketch/tests/common/mod.rs new file mode 100644 index 000000000..5cfaae4b8 --- /dev/null +++ b/sketches-ddsketch/tests/common/mod.rs @@ -0,0 +1,2 @@ +pub mod dataset; +pub mod generator; diff --git a/sketches-ddsketch/tests/test_ddsketch.rs b/sketches-ddsketch/tests/test_ddsketch.rs new file mode 100644 index 000000000..ea53eaa2e --- /dev/null +++ b/sketches-ddsketch/tests/test_ddsketch.rs @@ -0,0 +1,318 @@ +mod common; +use common::dataset::Dataset; +use common::generator; +use common::generator::Generator; + +use std::time::Instant; + +use sketches_ddsketch::Config; +use sketches_ddsketch::DDSketch; + +const TEST_ALPHA: f64 = 0.01; +const TEST_MAX_BINS: u32 = 1024; +const TEST_MIN_VALUE: f64 = 1.0e-9; + +// Used for float equality +const TEST_ERROR_THRESH: f64 = 1.0e-9; + +const TEST_SIZES: [usize; 5] = [3, 5, 10, 100, 1000]; +const TEST_QUANTILES: [f64; 10] = [0.0, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99, 0.999, 1.0]; + +#[test] +fn test_constant() { + evaluate_sketches(|| Box::new(generator::Constant::new(42.0))); 
+} + +#[test] +fn test_linear() { + evaluate_sketches(|| Box::new(generator::Linear::new(0.0, 1.0))); +} + +#[test] +fn test_normal() { + evaluate_sketches(|| Box::new(generator::Normal::new(35.0, 1.0))); +} + +#[test] +fn test_lognormal() { + evaluate_sketches(|| Box::new(generator::Lognormal::new(0.0, 2.0))); +} + +#[test] +fn test_exponential() { + evaluate_sketches(|| Box::new(generator::Exponential::new(2.0))); +} + +fn evaluate_test_sizes(f: impl Fn(usize)) { + for sz in &TEST_SIZES { + f(*sz); + } +} + +fn evaluate_sketches(gen_factory: impl Fn() -> Box) { + evaluate_test_sizes(|sz: usize| { + let mut generator = gen_factory(); + evaluate_sketch(sz, &mut generator); + }); +} + +fn new_config() -> Config { + Config::new(TEST_ALPHA, TEST_MAX_BINS, TEST_MIN_VALUE) +} + +fn assert_float_eq(a: f64, b: f64) { + assert!((a - b).abs() < TEST_ERROR_THRESH, "{} != {}", a, b); +} + +fn evaluate_sketch(count: usize, generator: &mut Box) { + let c = new_config(); + let mut g = DDSketch::new(c); + + let mut d = Dataset::new(); + + for _i in 0..count { + let value = generator.generate(); + + g.add(value); + d.add(value); + } + + compare_sketches(&mut d, &g); +} + +fn compare_sketches(d: &mut Dataset, g: &DDSketch) { + for q in &TEST_QUANTILES { + let lower = d.lower_quantile(*q); + let upper = d.upper_quantile(*q); + + let min_expected; + if lower < 0.0 { + min_expected = lower * (1.0 + TEST_ALPHA); + } else { + min_expected = lower * (1.0 - TEST_ALPHA); + } + + let max_expected; + if upper > 0.0 { + max_expected = upper * (1.0 + TEST_ALPHA); + } else { + max_expected = upper * (1.0 - TEST_ALPHA); + } + + let quantile = g.quantile(*q).unwrap().unwrap(); + + assert!( + min_expected <= quantile, + "Lower than min, quantile: {}, wanted {} <= {}", + *q, + min_expected, + quantile + ); + assert!( + quantile <= max_expected, + "Higher than max, quantile: {}, wanted {} <= {}", + *q, + quantile, + max_expected + ); + + // verify that calls do not modify result (not mut so not 
possible?) + let quantile2 = g.quantile(*q).unwrap().unwrap(); + assert_eq!(quantile, quantile2); + } + + assert_eq!(g.min().unwrap(), d.min()); + assert_eq!(g.max().unwrap(), d.max()); + assert_float_eq(g.sum().unwrap(), d.sum()); + assert_eq!(g.count(), d.count()); +} + +#[test] +fn test_merge_normal() { + evaluate_test_sizes(|sz: usize| { + let c = new_config(); + let mut d = Dataset::new(); + let mut g1 = DDSketch::new(c); + + let mut generator1 = generator::Normal::new(35.0, 1.0); + for _ in (0..sz).step_by(3) { + let value = generator1.generate(); + g1.add(value); + d.add(value); + } + let mut g2 = DDSketch::new(c); + let mut generator2 = generator::Normal::new(50.0, 2.0); + for _ in (1..sz).step_by(3) { + let value = generator2.generate(); + g2.add(value); + d.add(value); + } + g1.merge(&g2).unwrap(); + + let mut g3 = DDSketch::new(c); + let mut generator3 = generator::Normal::new(40.0, 0.5); + for _ in (2..sz).step_by(3) { + let value = generator3.generate(); + g3.add(value); + d.add(value); + } + g1.merge(&g3).unwrap(); + + compare_sketches(&mut d, &g1); + }); +} + +#[test] +fn test_merge_empty() { + evaluate_test_sizes(|sz: usize| { + let c = new_config(); + + let mut d = Dataset::new(); + + let mut g1 = DDSketch::new(c); + let mut g2 = DDSketch::new(c); + let mut generator = generator::Exponential::new(5.0); + + for _ in 0..sz { + let value = generator.generate(); + g2.add(value); + d.add(value); + } + g1.merge(&g2).unwrap(); + compare_sketches(&mut d, &g1); + + let g3 = DDSketch::new(c); + g2.merge(&g3).unwrap(); + compare_sketches(&mut d, &g2); + }); +} + +#[test] +fn test_merge_mixed() { + evaluate_test_sizes(|sz: usize| { + let c = new_config(); + let mut d = Dataset::new(); + let mut g1 = DDSketch::new(c); + + let mut generator1 = generator::Normal::new(100.0, 1.0); + for _ in (0..sz).step_by(3) { + let value = generator1.generate(); + g1.add(value); + d.add(value); + } + + let mut g2 = DDSketch::new(c); + let mut generator2 = 
generator::Exponential::new(5.0); + for _ in (1..sz).step_by(3) { + let value = generator2.generate(); + g2.add(value); + d.add(value); + } + g1.merge(&g2).unwrap(); + + let mut g3 = DDSketch::new(c); + let mut generator3 = generator::Exponential::new(0.1); + for _ in (2..sz).step_by(3) { + let value = generator3.generate(); + g3.add(value); + d.add(value); + } + g1.merge(&g3).unwrap(); + + compare_sketches(&mut d, &g1); + }) +} + +#[test] +fn test_merge_incompatible() { + let c1 = Config::new(TEST_ALPHA, TEST_MAX_BINS, TEST_MIN_VALUE); + let c2 = Config::new(TEST_ALPHA * 2.0, TEST_MAX_BINS, TEST_MIN_VALUE); + + let mut d1 = DDSketch::new(c1); + let d2 = DDSketch::new(c2); + + assert!(d1.merge(&d2).is_err()); + + let c3 = Config::new(TEST_ALPHA, TEST_MAX_BINS, TEST_MIN_VALUE * 10.0); + let d3 = DDSketch::new(c3); + + assert!(d1.merge(&d3).is_err()); + + let c4 = Config::new(TEST_ALPHA, TEST_MAX_BINS * 2, TEST_MIN_VALUE); + let d4 = DDSketch::new(c4); + + assert!(d1.merge(&d4).is_err()); + + // the same should work + let c5 = Config::new(TEST_ALPHA, TEST_MAX_BINS, TEST_MIN_VALUE); + let dsame = DDSketch::new(c5); + assert!(d1.merge(&dsame).is_ok()); +} + +#[test] +#[ignore] +fn test_performance_insert() { + let c = Config::defaults(); + let mut g = DDSketch::new(c); + let mut gen = generator::Normal::new(1000.0, 500.0); + let count = 300_000_000; + + let mut values = Vec::new(); + for _ in 0..count { + values.push(gen.generate()); + } + + let start_time = Instant::now(); + for value in values { + g.add(value); + } + + // This simply ensures the operations don't get optimized out as ignored + let quantile = g.quantile(0.50).unwrap().unwrap(); + + let elapsed = start_time.elapsed().as_micros() as f64; + let elapsed = elapsed / 1_000_000.0; + + println!( + "RESULT: p50={:.2} => Added {}M samples in {:2} secs ({:.2}M samples/sec)", + quantile, + count / 1_000_000, + elapsed, + (count as f64) / 1_000_000.0 / elapsed + ); +} + +#[test] +#[ignore] +fn 
test_performance_merge() { + let c = Config::defaults(); + let mut gen = generator::Normal::new(1000.0, 500.0); + let merge_count = 500_000; + let sample_count = 1_000; + let mut sketches = Vec::new(); + + for _ in 0..merge_count { + let mut d = DDSketch::new(c); + for _ in 0..sample_count { + d.add(gen.generate()); + } + sketches.push(d); + } + + let mut base = DDSketch::new(c); + + let start_time = Instant::now(); + for sketch in &sketches { + base.merge(sketch).unwrap(); + } + + let elapsed = start_time.elapsed().as_micros() as f64; + let elapsed = elapsed / 1_000_000.0; + + println!( + "RESULT: Merged {} sketches in {:2} secs ({:.2} merges/sec)", + merge_count, + elapsed, + (merge_count as f64) / elapsed + ); +} diff --git a/src/aggregation/metric/percentiles.rs b/src/aggregation/metric/percentiles.rs index ff9de45f1..3ab921809 100644 --- a/src/aggregation/metric/percentiles.rs +++ b/src/aggregation/metric/percentiles.rs @@ -222,6 +222,12 @@ impl PercentilesCollector { self.sketch.add(val); } + /// Encode the underlying DDSketch to Java-compatible binary format + /// for cross-language serialization with event-query. 
+ pub fn to_sketch_bytes(&self) -> Vec { + self.sketch.to_java_bytes() + } + pub(crate) fn merge_fruits(&mut self, right: PercentilesCollector) -> crate::Result<()> { self.sketch.merge(&right.sketch).map_err(|err| { TantivyError::AggregationError(AggregationError::InternalError(format!( @@ -610,11 +616,11 @@ mod tests { assert_eq!( res["range_with_stats"]["buckets"][0]["percentiles"]["values"]["1.0"], - 5.0028295751107414 + 5.002829575110705 ); assert_eq!( res["range_with_stats"]["buckets"][0]["percentiles"]["values"]["99.0"], - 10.07469668951144 + 10.07469668951133 ); Ok(()) @@ -659,8 +665,8 @@ mod tests { let res = exec_request_with_query(agg_req, &index, None)?; - assert_eq!(res["percentiles"]["values"]["1.0"], 5.0028295751107414); - assert_eq!(res["percentiles"]["values"]["99.0"], 10.07469668951144); + assert_eq!(res["percentiles"]["values"]["1.0"], 5.002829575110705); + assert_eq!(res["percentiles"]["values"]["99.0"], 10.07469668951133); Ok(()) }