From b9ace002cebd6195bf581e7f708a6788cefc8d5f Mon Sep 17 00:00:00 2001 From: "cong.xie" Date: Thu, 19 Feb 2026 12:22:19 -0500 Subject: [PATCH] Replace vendored sketches-ddsketch with git dependency Move the vendored sketches-ddsketch crate (with Java-compatible binary encoding) to its own repo at quickwit-oss/rust-sketches-ddsketch and reference it via git+rev in Cargo.toml. Co-authored-by: Cursor --- Cargo.toml | 3 +- sketches-ddsketch/Cargo.toml | 27 - sketches-ddsketch/LICENSE | 201 ----- sketches-ddsketch/Makefile | 11 - sketches-ddsketch/README.md | 37 - sketches-ddsketch/src/config.rs | 98 --- sketches-ddsketch/src/ddsketch.rs | 385 --------- sketches-ddsketch/src/encoding.rs | 813 -------------------- sketches-ddsketch/src/lib.rs | 52 -- sketches-ddsketch/src/store.rs | 252 ------ sketches-ddsketch/tests/common/dataset.rs | 88 --- sketches-ddsketch/tests/common/generator.rs | 100 --- sketches-ddsketch/tests/common/mod.rs | 2 - sketches-ddsketch/tests/test_ddsketch.rs | 316 -------- 14 files changed, 1 insertion(+), 2384 deletions(-) delete mode 100644 sketches-ddsketch/Cargo.toml delete mode 100644 sketches-ddsketch/LICENSE delete mode 100644 sketches-ddsketch/Makefile delete mode 100644 sketches-ddsketch/README.md delete mode 100644 sketches-ddsketch/src/config.rs delete mode 100644 sketches-ddsketch/src/ddsketch.rs delete mode 100644 sketches-ddsketch/src/encoding.rs delete mode 100644 sketches-ddsketch/src/lib.rs delete mode 100644 sketches-ddsketch/src/store.rs delete mode 100644 sketches-ddsketch/tests/common/dataset.rs delete mode 100644 sketches-ddsketch/tests/common/generator.rs delete mode 100644 sketches-ddsketch/tests/common/mod.rs delete mode 100644 sketches-ddsketch/tests/test_ddsketch.rs diff --git a/Cargo.toml b/Cargo.toml index 990bbb7ad..6e4967028 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,7 +64,7 @@ query-grammar = { version = "0.25.0", path = "./query-grammar", package = "tanti tantivy-bitpacker = { version = "0.9", path = "./bitpacker" } common = { version = "0.10", path = "./common/", package = "tantivy-common" } tokenizer-api = { version = "0.6", path = "./tokenizer-api", package = "tantivy-tokenizer-api" } -sketches-ddsketch = { path = "./sketches-ddsketch", features = ["use_serde"] } +sketches-ddsketch = { git = "https://github.com/quickwit-oss/rust-sketches-ddsketch.git", rev = "555caf1", features = ["use_serde"] } datasketches = "0.2.0" futures-util = { version = "0.3.28", optional = true } futures-channel = { version = "0.3.28", optional = true } @@ -144,7 +144,6 @@ members = [ "sstable", "tokenizer-api", "columnar", - "sketches-ddsketch", ] # Following the "fail" crate best practises, we isolate diff --git a/sketches-ddsketch/Cargo.toml b/sketches-ddsketch/Cargo.toml deleted file mode 100644 index e62c2b93c..000000000 --- a/sketches-ddsketch/Cargo.toml +++ /dev/null @@ -1,27 +0,0 @@ -[package] -name = "sketches-ddsketch" -version = "0.3.0" -authors = ["Mike Heffner "] -edition = "2018" -license = "Apache-2.0" -readme = "README.md" -repository = "https://github.com/mheffner/rust-sketches-ddsketch" -homepage = "https://github.com/mheffner/rust-sketches-ddsketch" -description = """ -A direct port of the Golang DDSketch implementation. -""" -exclude = [".gitignore"] - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -serde = { package = "serde", version = "1.0", optional = true, features = ["derive", "serde_derive"] } - -[dev-dependencies] -approx = "0.5.1" -rand = "0.8.5" -rand_distr = "0.4.3" - -[features] -use_serde = ["serde", "serde/derive"] - diff --git a/sketches-ddsketch/LICENSE b/sketches-ddsketch/LICENSE deleted file mode 100644 index ef11f508a..000000000 --- a/sketches-ddsketch/LICENSE +++ /dev/null @@ -1,201 +0,0 @@ - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [2019] [Mike Heffner] - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. diff --git a/sketches-ddsketch/Makefile b/sketches-ddsketch/Makefile deleted file mode 100644 index df0acaabf..000000000 --- a/sketches-ddsketch/Makefile +++ /dev/null @@ -1,11 +0,0 @@ -clean: - cargo clean - -test: - cargo test - -test_logs: - cargo test -- --nocapture - -test_performance: - cargo test --release --jobs 1 test_performance -- --ignored --nocapture \ No newline at end of file diff --git a/sketches-ddsketch/README.md b/sketches-ddsketch/README.md deleted file mode 100644 index fd45f4991..000000000 --- a/sketches-ddsketch/README.md +++ /dev/null @@ -1,37 +0,0 @@ -# sketches-ddsketch - -This is a direct port of the [Golang](https://github.com/DataDog/sketches-go) -[DDSketch](https://arxiv.org/pdf/1908.10693.pdf) quantile sketch implementation -to Rust. DDSketch is a fully-mergeable quantile sketch with relative-error -guarantees and is extremely fast. - -# DDSketch - -* Sketch size automatically grows as needed, starting with 128 bins. -* Extremely fast sample insertion and sketch merges. - -## Usage - -```rust -use sketches_ddsketch::{Config, DDSketch}; - -let config = Config::defaults(); -let mut sketch = DDSketch::new(c); - -sketch.add(1.0); -sketch.add(1.0); -sketch.add(1.0); - -// Get p=50% -let quantile = sketch.quantile(0.5).unwrap(); -assert_eq!(quantile, Some(1.0)); -``` - -## Performance - -No performance tuning has been done with this implementation of the port, so we -would expect similar profiles to the original implementation. - -Out of the box we see can achieve over 70M sample inserts/sec and 350K sketch -merges/sec. All tests run on a single core Intel i7 processor with 4.2Ghz max -clock. \ No newline at end of file diff --git a/sketches-ddsketch/src/config.rs b/sketches-ddsketch/src/config.rs deleted file mode 100644 index f4f2127e1..000000000 --- a/sketches-ddsketch/src/config.rs +++ /dev/null @@ -1,98 +0,0 @@ -#[cfg(feature = "use_serde")] -use serde::{Deserialize, Serialize}; - -const DEFAULT_MAX_BINS: u32 = 2048; -const DEFAULT_ALPHA: f64 = 0.01; -const DEFAULT_MIN_VALUE: f64 = 1.0e-9; - -/// The configuration struct for constructing a `DDSketch` -#[derive(Copy, Clone, Debug, PartialEq)] -#[cfg_attr(feature = "use_serde", derive(Serialize, Deserialize))] -pub struct Config { - pub max_num_bins: u32, - pub gamma: f64, - pub(crate) gamma_ln: f64, - pub(crate) min_value: f64, - pub offset: i32, -} - -fn log_gamma(value: f64, gamma_ln: f64) -> f64 { - value.ln() / gamma_ln -} - -impl Config { - /// Construct a new `Config` struct with specific parameters. If you are unsure of how to - /// configure this, the `defaults` method constructs a `Config` with built-in defaults. - /// - /// `max_num_bins` is the max number of bins the DDSketch will grow to, in steps of 128 bins. - pub fn new(alpha: f64, max_num_bins: u32, min_value: f64) -> Self { - // Aligned with Java's LogarithmicMapping / LogLikeIndexMapping: - // gamma = (1 + alpha) / (1 - alpha) (correctingFactor=1 for LogarithmicMapping) - // gamma_ln = gamma.ln() (not ln_1p, to match Java's Math.log(gamma)) - // See: https://github.com/DataDog/sketches-java/blob/master/src/main/java/com/datadoghq/sketch/ddsketch/mapping/LogLikeIndexMapping.java (gamma() static method) - // See: https://github.com/DataDog/sketches-java/blob/master/src/main/java/com/datadoghq/sketch/ddsketch/mapping/LogarithmicMapping.java (constructor, correctingFactor()=1) - let gamma = (1.0 + alpha) / (1.0 - alpha); - let gamma_ln = gamma.ln(); - - Config { - max_num_bins, - gamma, - gamma_ln, - min_value, - offset: 1 - (log_gamma(min_value, gamma_ln) as i32), - } - } - - /// Return a `Config` using built-in default settings - pub fn defaults() -> Self { - Self::new(DEFAULT_ALPHA, DEFAULT_MAX_BINS, DEFAULT_MIN_VALUE) - } - - pub fn key(&self, v: f64) -> i32 { - // Aligned with Java's LogLikeIndexMapping.index(): floor-based indexing. - // Java uses `(int) index` / `(int) index - 1` which is equivalent to floor(). - // See: https://github.com/DataDog/sketches-java/blob/master/src/main/java/com/datadoghq/sketch/ddsketch/mapping/LogLikeIndexMapping.java (index() method) - self.log_gamma(v).floor() as i32 - } - - pub fn value(&self, key: i32) -> f64 { - // Aligned with Java's LogLikeIndexMapping.value(): - // lowerBound(index) * (1 + relativeAccuracy) - // = logInverse((index - indexOffset) / multiplier) * (1 + relativeAccuracy) - // = gamma^key * 2*gamma/(gamma+1) - // See: https://github.com/DataDog/sketches-java/blob/master/src/main/java/com/datadoghq/sketch/ddsketch/mapping/LogLikeIndexMapping.java (value() and lowerBound() methods) - self.pow_gamma(key) * (2.0 * self.gamma / (1.0 + self.gamma)) - } - - pub fn log_gamma(&self, value: f64) -> f64 { - log_gamma(value, self.gamma_ln) - } - - pub fn pow_gamma(&self, key: i32) -> f64 { - ((key as f64) * self.gamma_ln).exp() - } - - pub fn min_possible(&self) -> f64 { - self.min_value - } - - /// Reconstruct a Config from a gamma value (as decoded from the binary format). - /// Uses default max_num_bins and min_value. - /// See Java: https://github.com/DataDog/sketches-java/blob/master/src/main/java/com/datadoghq/sketch/ddsketch/mapping/LogarithmicMapping.java (LogarithmicMapping(double gamma, double indexOffset) constructor) - pub(crate) fn from_gamma(gamma: f64) -> Self { - let gamma_ln = gamma.ln(); - Config { - max_num_bins: DEFAULT_MAX_BINS, - gamma, - gamma_ln, - min_value: DEFAULT_MIN_VALUE, - offset: 1 - (log_gamma(DEFAULT_MIN_VALUE, gamma_ln) as i32), - } - } -} - -impl Default for Config { - fn default() -> Self { - Self::new(DEFAULT_ALPHA, DEFAULT_MAX_BINS, DEFAULT_MIN_VALUE) - } -} diff --git a/sketches-ddsketch/src/ddsketch.rs b/sketches-ddsketch/src/ddsketch.rs deleted file mode 100644 index c738d5dad..000000000 --- a/sketches-ddsketch/src/ddsketch.rs +++ /dev/null @@ -1,385 +0,0 @@ -use std::{error, fmt}; - -#[cfg(feature = "use_serde")] -use serde::{Deserialize, Serialize}; - -use crate::config::Config; -use crate::store::Store; - -type Result = std::result::Result; - -/// General error type for DDSketch, represents either an invalid quantile or an -/// incompatible merge operation. -#[derive(Debug, Clone)] -pub enum DDSketchError { - Quantile, - Merge, -} -impl fmt::Display for DDSketchError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - DDSketchError::Quantile => { - write!(f, "Invalid quantile, must be between 0 and 1 (inclusive)") - } - DDSketchError::Merge => write!(f, "Can not merge sketches with different configs"), - } - } -} -impl error::Error for DDSketchError { - fn source(&self) -> Option<&(dyn error::Error + 'static)> { - // Generic - None - } -} - -/// This struct represents a [DDSketch](https://arxiv.org/pdf/1908.10693.pdf) -#[derive(Clone)] -#[cfg_attr(feature = "use_serde", derive(Serialize, Deserialize))] -pub struct DDSketch { - pub(crate) config: Config, - pub(crate) store: Store, - pub(crate) negative_store: Store, - pub(crate) min: f64, - pub(crate) max: f64, - pub(crate) sum: f64, - pub(crate) zero_count: u64, -} - -impl Default for DDSketch { - fn default() -> Self { - Self::new(Default::default()) - } -} - -// XXX: functions should return Option<> in the case of empty -impl DDSketch { - /// Construct a `DDSketch`. Requires a `Config` specifying the parameters of the sketch - pub fn new(config: Config) -> Self { - DDSketch { - config, - store: Store::new(config.max_num_bins as usize), - negative_store: Store::new(config.max_num_bins as usize), - min: f64::INFINITY, - max: f64::NEG_INFINITY, - sum: 0.0, - zero_count: 0, - } - } - - /// Add the sample to the sketch - pub fn add(&mut self, v: f64) { - if v > self.config.min_possible() { - let key = self.config.key(v); - self.store.add(key); - } else if v < -self.config.min_possible() { - let key = self.config.key(-v); - self.negative_store.add(key); - } else { - self.zero_count += 1; - } - - if v < self.min { - self.min = v; - } - if self.max < v { - self.max = v; - } - self.sum += v; - } - - /// Return the quantile value for quantiles between 0.0 and 1.0. Result is an error, represented - /// as DDSketchError::Quantile if the requested quantile is outside of that range. - /// - /// If the sketch is empty the result is None, else Some(v) for the quantile value. - pub fn quantile(&self, q: f64) -> Result> { - if !(0.0..=1.0).contains(&q) { - return Err(DDSketchError::Quantile); - } - - if self.empty() { - return Ok(None); - } - - if q == 0.0 { - return Ok(Some(self.min)); - } else if q == 1.0 { - return Ok(Some(self.max)); - } - - let rank = (q * (self.count() as f64 - 1.0)) as u64; - let quantile; - if rank < self.negative_store.count() { - let reversed_rank = self.negative_store.count() - rank - 1; - let key = self.negative_store.key_at_rank(reversed_rank); - quantile = -self.config.value(key); - } else if rank < self.zero_count + self.negative_store.count() { - quantile = 0.0; - } else { - let key = self - .store - .key_at_rank(rank - self.zero_count - self.negative_store.count()); - quantile = self.config.value(key); - } - - Ok(Some(quantile)) - } - - /// Returns the minimum value seen, or None if sketch is empty - pub fn min(&self) -> Option { - if self.empty() { - None - } else { - Some(self.min) - } - } - - /// Returns the maximum value seen, or None if sketch is empty - pub fn max(&self) -> Option { - if self.empty() { - None - } else { - Some(self.max) - } - } - - /// Returns the sum of values seen, or None if sketch is empty - pub fn sum(&self) -> Option { - if self.empty() { - None - } else { - Some(self.sum) - } - } - - /// Returns the number of values added to the sketch - pub fn count(&self) -> usize { - (self.store.count() + self.zero_count + self.negative_store.count()) as usize - } - - /// Returns the length of the underlying `Store`. This is mainly only useful for understanding - /// how much the sketch has grown given the inserted values. - pub fn length(&self) -> usize { - self.store.length() as usize + self.negative_store.length() as usize - } - - /// Merge the contents of another sketch into this one. The sketch that is merged into this one - /// is unchanged after the merge. - pub fn merge(&mut self, o: &DDSketch) -> Result<()> { - if self.config != o.config { - return Err(DDSketchError::Merge); - } - - let was_empty = self.store.count() == 0; - - // Merge the stores - self.store.merge(&o.store); - self.negative_store.merge(&o.negative_store); - self.zero_count += o.zero_count; - - // Need to ensure we don't override min/max with initializers - // if either store were empty - if was_empty { - self.min = o.min; - self.max = o.max; - } else if o.store.count() > 0 { - if o.min < self.min { - self.min = o.min - } - if o.max > self.max { - self.max = o.max; - } - } - self.sum += o.sum; - - Ok(()) - } - - fn empty(&self) -> bool { - self.count() == 0 - } - - /// Encode this sketch into the Java-compatible binary format used by - /// `com.datadoghq.sketch.ddsketch.DDSketchWithExactSummaryStatistics`. - pub fn to_java_bytes(&self) -> Vec { - crate::encoding::encode_to_java_bytes(self) - } - - /// Decode a sketch from the Java-compatible binary format. - /// Accepts bytes produced by Java's `DDSketchWithExactSummaryStatistics.encode()` - /// with or without the `0x02` version prefix. - pub fn from_java_bytes( - bytes: &[u8], - ) -> std::result::Result { - crate::encoding::decode_from_java_bytes(bytes) - } -} - -#[cfg(test)] -mod tests { - use approx::assert_relative_eq; - - use crate::{Config, DDSketch}; - - #[test] - fn test_add_zero() { - let alpha = 0.01; - let c = Config::new(alpha, 2048, 10e-9); - let mut dd = DDSketch::new(c); - dd.add(0.0); - } - - #[test] - fn test_quartiles() { - let alpha = 0.01; - let c = Config::new(alpha, 2048, 10e-9); - let mut dd = DDSketch::new(c); - - // Initialize sketch with {1.0, 2.0, 3.0, 4.0} - for i in 1..5 { - dd.add(i as f64); - } - - // We expect the following mappings from quantile to value: - // [0,0.33]: 1.0, (0.34,0.66]: 2.0, (0.67,0.99]: 3.0, (0.99, 1.0]: 4.0 - let test_cases = vec![ - (0.0, 1.0), - (0.25, 1.0), - (0.33, 1.0), - (0.34, 2.0), - (0.5, 2.0), - (0.66, 2.0), - (0.67, 3.0), - (0.75, 3.0), - (0.99, 3.0), - (1.0, 4.0), - ]; - - for (q, val) in test_cases { - assert_relative_eq!(dd.quantile(q).unwrap().unwrap(), val, max_relative = alpha); - } - } - - #[test] - fn test_neg_quartiles() { - let alpha = 0.01; - let c = Config::new(alpha, 2048, 10e-9); - let mut dd = DDSketch::new(c); - - // Initialize sketch with {1.0, 2.0, 3.0, 4.0} - for i in 1..5 { - dd.add(-i as f64); - } - - let test_cases = vec![ - (0.0, -4.0), - (0.25, -4.0), - (0.5, -3.0), - (0.75, -2.0), - (1.0, -1.0), - ]; - - for (q, val) in test_cases { - assert_relative_eq!(dd.quantile(q).unwrap().unwrap(), val, max_relative = alpha); - } - } - - #[test] - fn test_simple_quantile() { - let c = Config::defaults(); - let mut dd = DDSketch::new(c); - - for i in 1..101 { - dd.add(i as f64); - } - - assert_eq!(dd.quantile(0.95).unwrap().unwrap().ceil(), 95.0); - - assert!(dd.quantile(-1.01).is_err()); - assert!(dd.quantile(1.01).is_err()); - } - - #[test] - fn test_empty_sketch() { - let c = Config::defaults(); - let dd = DDSketch::new(c); - - assert_eq!(dd.quantile(0.98).unwrap(), None); - assert_eq!(dd.max(), None); - assert_eq!(dd.min(), None); - assert_eq!(dd.sum(), None); - assert_eq!(dd.count(), 0); - - assert!(dd.quantile(1.01).is_err()); - } - - #[test] - fn test_basic_histogram_data() { - let values = &[ - 0.754225035, - 0.752900282, - 0.752812246, - 0.752602367, - 0.754310155, - 0.753525981, - 0.752981082, - 0.752715536, - 0.751667941, - 0.755079054, - 0.753528150, - 0.755188464, - 0.752508723, - 0.750064549, - 0.753960428, - 0.751139298, - 0.752523560, - 0.753253428, - 0.753498342, - 0.751858358, - 0.752104636, - 0.753841300, - 0.754467374, - 0.753814334, - 0.750881719, - 0.753182556, - 0.752576884, - 0.753945708, - 0.753571911, - 0.752314573, - 0.752586651, - ]; - - let c = Config::defaults(); - let mut dd = DDSketch::new(c); - - for value in values { - dd.add(*value); - } - - assert_eq!(dd.max(), Some(0.755188464)); - assert_eq!(dd.min(), Some(0.750064549)); - assert_eq!(dd.count(), 31); - assert_eq!(dd.sum(), Some(23.343630625000003)); - - assert!(dd.quantile(0.25).unwrap().is_some()); - assert!(dd.quantile(0.5).unwrap().is_some()); - assert!(dd.quantile(0.75).unwrap().is_some()); - } - - #[test] - fn test_length() { - let mut dd = DDSketch::default(); - assert_eq!(dd.length(), 0); - - dd.add(1.0); - assert_eq!(dd.length(), 128); - dd.add(2.0); - dd.add(3.0); - assert_eq!(dd.length(), 128); - - dd.add(-1.0); - assert_eq!(dd.length(), 256); - dd.add(-2.0); - dd.add(-3.0); - assert_eq!(dd.length(), 256); - } -} diff --git a/sketches-ddsketch/src/encoding.rs b/sketches-ddsketch/src/encoding.rs deleted file mode 100644 index df23336f2..000000000 --- a/sketches-ddsketch/src/encoding.rs +++ /dev/null @@ -1,813 +0,0 @@ -//! Java-compatible binary encoding/decoding for DDSketch. -//! -//! This module implements the binary format used by the Java -//! `com.datadoghq.sketch.ddsketch.DDSketchWithExactSummaryStatistics` class -//! from the DataDog/sketches-java library. It enables cross-language -//! serialization so that sketches produced in Rust can be deserialized -//! and merged by Java consumers. - -use std::fmt; - -use crate::config::Config; -use crate::ddsketch::DDSketch; -use crate::store::Store; - -// --------------------------------------------------------------------------- -// Flag byte layout -// -// Each flag byte packs a 2-bit type ordinal in the low bits and a 6-bit -// subflag in the upper bits: (subflag << 2) | type_ordinal -// See: https://github.com/DataDog/sketches-java/blob/master/src/main/java/com/datadoghq/sketch/ddsketch/encoding/Flag.java -// --------------------------------------------------------------------------- - -/// The 2-bit type field occupying the low bits of every flag byte. -#[repr(u8)] -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum FlagType { - SketchFeatures = 0, - PositiveStore = 1, - IndexMapping = 2, - NegativeStore = 3, -} - -impl FlagType { - fn from_byte(b: u8) -> Option { - match b & 0x03 { - 0 => Some(Self::SketchFeatures), - 1 => Some(Self::PositiveStore), - 2 => Some(Self::IndexMapping), - 3 => Some(Self::NegativeStore), - _ => None, - } - } -} - -/// Construct a flag byte from a subflag and a type. -const fn flag(subflag: u8, flag_type: FlagType) -> u8 { - (subflag << 2) | (flag_type as u8) -} - -// Pre-computed flag bytes for the sketch features we encode/decode. -const FLAG_INDEX_MAPPING_LOG: u8 = flag(0, FlagType::IndexMapping); // 0x02 -const FLAG_ZERO_COUNT: u8 = flag(1, FlagType::SketchFeatures); // 0x04 -const FLAG_COUNT: u8 = flag(0x28, FlagType::SketchFeatures); // 0xA0 -const FLAG_SUM: u8 = flag(0x21, FlagType::SketchFeatures); // 0x84 -const FLAG_MIN: u8 = flag(0x22, FlagType::SketchFeatures); // 0x88 -const FLAG_MAX: u8 = flag(0x23, FlagType::SketchFeatures); // 0x8C - -/// BinEncodingMode subflags for store flag bytes. -/// See: https://github.com/DataDog/sketches-java/blob/master/src/main/java/com/datadoghq/sketch/ddsketch/encoding/BinEncodingMode.java -#[repr(u8)] -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum BinEncodingMode { - IndexDeltasAndCounts = 1, - IndexDeltas = 2, - ContiguousCounts = 3, -} - -impl BinEncodingMode { - fn from_subflag(subflag: u8) -> Option { - match subflag { - 1 => Some(Self::IndexDeltasAndCounts), - 2 => Some(Self::IndexDeltas), - 3 => Some(Self::ContiguousCounts), - _ => None, - } - } -} - -const VAR_DOUBLE_ROTATE_DISTANCE: u32 = 6; -const MAX_VAR_LEN_64: usize = 9; - -const DEFAULT_MAX_BINS: u32 = 2048; - -// --------------------------------------------------------------------------- -// Error type -// --------------------------------------------------------------------------- - -#[derive(Debug, Clone)] -pub enum DecodeError { - UnexpectedEof, - InvalidFlag(u8), - InvalidData(String), -} - -impl fmt::Display for DecodeError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::UnexpectedEof => write!(f, "unexpected end of input"), - Self::InvalidFlag(b) => write!(f, "invalid flag byte: 0x{b:02X}"), - Self::InvalidData(msg) => write!(f, "invalid data: {msg}"), - } - } -} - -impl std::error::Error for DecodeError {} - -// --------------------------------------------------------------------------- -// VarEncoding — bit-exact port of Java VarEncodingHelper -// See: https://github.com/DataDog/sketches-java/blob/master/src/main/java/com/datadoghq/sketch/ddsketch/encoding/VarEncodingHelper.java -// --------------------------------------------------------------------------- - -fn encode_unsigned_var_long(out: &mut Vec, mut value: u64) { - let length = ((63 - value.leading_zeros() as i32) / 7).clamp(0, 8); - for _ in 0..length { - out.push((value as u8) | 0x80); - value >>= 7; - } - out.push(value as u8); -} - -fn decode_unsigned_var_long(input: &mut &[u8]) -> Result { - let mut value: u64 = 0; - let mut shift: u32 = 0; - loop { - let next = read_byte(input)?; - if next < 0x80 || shift == 56 { - return Ok(value | (u64::from(next) << shift)); - } - value |= (u64::from(next) & 0x7F) << shift; - shift += 7; - } -} - -/// ZigZag encode then var-long encode. -fn encode_signed_var_long(out: &mut Vec, value: i64) { - let encoded = ((value >> 63) ^ (value << 1)) as u64; - encode_unsigned_var_long(out, encoded); -} - -fn decode_signed_var_long(input: &mut &[u8]) -> Result { - let encoded = decode_unsigned_var_long(input)?; - Ok(((encoded >> 1) as i64) ^ -((encoded & 1) as i64)) -} - -fn double_to_var_bits(value: f64) -> u64 { - let bits = f64::to_bits(value + 1.0).wrapping_sub(f64::to_bits(1.0)); - bits.rotate_left(VAR_DOUBLE_ROTATE_DISTANCE) -} - -fn var_bits_to_double(bits: u64) -> f64 { - f64::from_bits( - bits.rotate_right(VAR_DOUBLE_ROTATE_DISTANCE) - .wrapping_add(f64::to_bits(1.0)), - ) - 1.0 -} - -fn encode_var_double(out: &mut Vec, value: f64) { - let mut bits = double_to_var_bits(value); - for _ in 0..MAX_VAR_LEN_64 - 1 { - let next = (bits >> 57) as u8; - bits <<= 7; - if bits == 0 { - out.push(next); - return; - } - out.push(next | 0x80); - } - out.push((bits >> 56) as u8); -} - -fn decode_var_double(input: &mut &[u8]) -> Result { - let mut bits: u64 = 0; - let mut shift: i32 = 57; // 8*8 - 7 - loop { - let next = read_byte(input)?; - if shift == 1 { - bits |= u64::from(next); - break; - } - if next < 0x80 { - bits |= u64::from(next) << shift; - break; - } - bits |= (u64::from(next) & 0x7F) << shift; - shift -= 7; - } - Ok(var_bits_to_double(bits)) -} - -// --------------------------------------------------------------------------- -// Byte-level helpers -// --------------------------------------------------------------------------- - -fn read_byte(input: &mut &[u8]) -> Result { - match input.split_first() { - Some((&byte, rest)) => { - *input = rest; - Ok(byte) - } - None => Err(DecodeError::UnexpectedEof), - } -} - -fn write_f64_le(out: &mut Vec, value: f64) { - out.extend_from_slice(&value.to_le_bytes()); -} - -fn read_f64_le(input: &mut &[u8]) -> Result { - if input.len() < 8 { - return Err(DecodeError::UnexpectedEof); - } - let (bytes, rest) = input.split_at(8); - *input = rest; - // bytes is guaranteed to be length 8 by the split_at above. - let arr = [ - bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7], - ]; - Ok(f64::from_le_bytes(arr)) -} - -// --------------------------------------------------------------------------- -// Store encoding/decoding -// See: https://github.com/DataDog/sketches-java/blob/master/src/main/java/com/datadoghq/sketch/ddsketch/store/DenseStore.java (encode/decode methods) -// --------------------------------------------------------------------------- - -/// Collect non-zero bins in the store as (absolute_index, count) pairs. -/// -/// Allocation is acceptable here: this runs once per encode and the Vec -/// has at most `max_num_bins` entries. -fn collect_non_zero_bins(store: &Store) -> Vec<(i32, u64)> { - if store.count == 0 { - return Vec::new(); - } - let start = (store.min_key - store.offset) as usize; - let end = ((store.max_key - store.offset + 1) as usize).min(store.bins.len()); - store.bins[start..end] - .iter() - .enumerate() - .filter(|&(_, &count)| count > 0) - .map(|(i, &count)| (start as i32 + i as i32 + store.offset, count)) - .collect() -} - -fn encode_store(out: &mut Vec, store: &Store, flag_type: FlagType) { - let bins = collect_non_zero_bins(store); - if bins.is_empty() { - return; - } - - out.push(flag(BinEncodingMode::IndexDeltasAndCounts as u8, flag_type)); - encode_unsigned_var_long(out, bins.len() as u64); - - let mut prev_index: i64 = 0; - for &(index, count) in &bins { - encode_signed_var_long(out, i64::from(index) - prev_index); - encode_var_double(out, count as f64); - prev_index = i64::from(index); - } -} - -fn decode_store(input: &mut &[u8], subflag: u8, bin_limit: usize) -> Result { - let mode = BinEncodingMode::from_subflag(subflag).ok_or_else(|| { - DecodeError::InvalidData(format!("unknown bin encoding mode subflag: {subflag}")) - })?; - let num_bins = decode_unsigned_var_long(input)? as usize; - let mut store = Store::new(bin_limit); - - match mode { - BinEncodingMode::IndexDeltasAndCounts => { - let mut index: i64 = 0; - for _ in 0..num_bins { - index += decode_signed_var_long(input)?; - let count = decode_var_double(input)?; - store.add_count(index as i32, count as u64); - } - } - BinEncodingMode::IndexDeltas => { - let mut index: i64 = 0; - for _ in 0..num_bins { - index += decode_signed_var_long(input)?; - store.add_count(index as i32, 1); - } - } - BinEncodingMode::ContiguousCounts => { - let start_index = decode_signed_var_long(input)?; - let index_delta = decode_signed_var_long(input)?; - let mut index = start_index; - for _ in 0..num_bins { - let count = decode_var_double(input)?; - store.add_count(index as i32, count as u64); - index += index_delta; - } - } - } - - Ok(store) -} - -// --------------------------------------------------------------------------- -// Top-level encode / decode -// --------------------------------------------------------------------------- - -/// Encode a DDSketch into the Java-compatible binary format. -/// -/// The output follows the encoding order of -/// `DDSketchWithExactSummaryStatistics.encode()` then `DDSketch.encode()`: -/// -/// 1. Summary statistics: COUNT, MIN, MAX (if count > 0) -/// 2. SUM (if sum != 0) -/// 3. Index mapping (LOG layout): gamma, indexOffset -/// 4. Zero count (if > 0) -/// 5. Positive store bins -/// 6. Negative store bins -pub fn encode_to_java_bytes(sketch: &DDSketch) -> Vec { - let mut out = Vec::new(); - let count = sketch.count() as f64; - - // Summary statistics (DDSketchWithExactSummaryStatistics.encode) - if count != 0.0 { - out.push(FLAG_COUNT); - encode_var_double(&mut out, count); - out.push(FLAG_MIN); - write_f64_le(&mut out, sketch.min); - out.push(FLAG_MAX); - write_f64_le(&mut out, sketch.max); - } - if sketch.sum != 0.0 { - out.push(FLAG_SUM); - write_f64_le(&mut out, sketch.sum); - } - - // DDSketch.encode: index mapping + zero count + stores - out.push(FLAG_INDEX_MAPPING_LOG); - write_f64_le(&mut out, sketch.config.gamma); - write_f64_le(&mut out, 0.0_f64); - - if sketch.zero_count != 0 { - out.push(FLAG_ZERO_COUNT); - encode_var_double(&mut out, sketch.zero_count as f64); - } - - encode_store(&mut out, &sketch.store, FlagType::PositiveStore); - encode_store(&mut out, &sketch.negative_store, FlagType::NegativeStore); - - out -} - -/// Decode a DDSketch from the Java-compatible binary format. -/// -/// Accepts bytes with or without a `0x02` version prefix. -pub fn decode_from_java_bytes(bytes: &[u8]) -> Result { - if bytes.is_empty() { - return Err(DecodeError::UnexpectedEof); - } - - let mut input = bytes; - - // Skip optional version prefix (0x02 followed by a valid flag byte). - if input.len() >= 2 && input[0] == 0x02 && is_valid_flag_byte(input[1]) { - input = &input[1..]; - } - - let mut gamma: Option = None; - let mut zero_count: f64 = 0.0; - let mut sum: f64 = 0.0; - let mut min: f64 = f64::INFINITY; - let mut max: f64 = f64::NEG_INFINITY; - let mut positive_store: Option = None; - let mut negative_store: Option = None; - - while !input.is_empty() { - let flag_byte = read_byte(&mut input)?; - let flag_type = - FlagType::from_byte(flag_byte).ok_or(DecodeError::InvalidFlag(flag_byte))?; - let subflag = flag_byte >> 2; - - match flag_type { - FlagType::IndexMapping => { - gamma = Some(read_f64_le(&mut input)?); - let _index_offset = read_f64_le(&mut input)?; - } - FlagType::SketchFeatures => match flag_byte { - FLAG_ZERO_COUNT => zero_count += decode_var_double(&mut input)?, - FLAG_COUNT => { - let _count = decode_var_double(&mut input)?; - } - FLAG_SUM => sum = read_f64_le(&mut input)?, - FLAG_MIN => min = read_f64_le(&mut input)?, - FLAG_MAX => max = read_f64_le(&mut input)?, - _ => return Err(DecodeError::InvalidFlag(flag_byte)), - }, - FlagType::PositiveStore => { - positive_store = Some(decode_store( - &mut input, - subflag, - DEFAULT_MAX_BINS as usize, - )?); - } - FlagType::NegativeStore => { - negative_store = Some(decode_store( - &mut input, - subflag, - DEFAULT_MAX_BINS as usize, - )?); - } - } - } - - let g = gamma.unwrap_or_else(|| Config::defaults().gamma); - let config = Config::from_gamma(g); - let store = positive_store.unwrap_or_else(|| Store::new(config.max_num_bins as usize)); - let neg = negative_store.unwrap_or_else(|| Store::new(config.max_num_bins as usize)); - - Ok(DDSketch { - config, - store, - negative_store: neg, - min, - max, - sum, - zero_count: zero_count as u64, - }) -} - -/// Check whether a byte is a valid flag byte for the DDSketch binary format. -fn is_valid_flag_byte(b: u8) -> bool { - // Known sketch-feature flags - if matches!( - b, - FLAG_ZERO_COUNT | FLAG_COUNT | FLAG_SUM | FLAG_MIN | FLAG_MAX | FLAG_INDEX_MAPPING_LOG - ) { - return true; - } - let Some(flag_type) = FlagType::from_byte(b) else { - return false; - }; - let subflag = b >> 2; - match flag_type { - FlagType::PositiveStore | FlagType::NegativeStore => (1..=3).contains(&subflag), - FlagType::IndexMapping => subflag <= 4, // LOG=0, LOG_LINEAR=1 .. LOG_QUARTIC=4 - _ => false, - } -} - -// --------------------------------------------------------------------------- -// Tests -// --------------------------------------------------------------------------- - -#[cfg(test)] -mod tests { - use super::*; - use crate::{Config, DDSketch}; - - // --- VarEncoding unit tests --- - - #[test] - fn test_unsigned_var_long_zero() { - let mut buf = Vec::new(); - encode_unsigned_var_long(&mut buf, 0); - assert_eq!(buf, [0x00]); - - let mut input = buf.as_slice(); - assert_eq!(decode_unsigned_var_long(&mut input).unwrap(), 0); - assert!(input.is_empty()); - } - - #[test] - fn test_unsigned_var_long_small() { - let mut buf = Vec::new(); - encode_unsigned_var_long(&mut buf, 1); - assert_eq!(buf, [0x01]); - - let mut input = buf.as_slice(); - assert_eq!(decode_unsigned_var_long(&mut input).unwrap(), 1); - } - - #[test] - fn test_unsigned_var_long_128() { - let mut buf = Vec::new(); - encode_unsigned_var_long(&mut buf, 128); - assert_eq!(buf, [0x80, 0x01]); - - let mut input = buf.as_slice(); - assert_eq!(decode_unsigned_var_long(&mut input).unwrap(), 128); - } - - #[test] - fn test_unsigned_var_long_roundtrip() { - for v in [0u64, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX] { - let mut buf = Vec::new(); - encode_unsigned_var_long(&mut buf, v); - let mut input = buf.as_slice(); - let decoded = decode_unsigned_var_long(&mut input).unwrap(); - assert_eq!(decoded, v, "roundtrip failed for {}", v); - assert!(input.is_empty()); - } - } - - #[test] - fn test_signed_var_long_roundtrip() { - for v in [0i64, 1, -1, 63, -64, 64, -65, i64::MAX, i64::MIN] { - let mut buf = Vec::new(); - encode_signed_var_long(&mut buf, v); - let mut input = buf.as_slice(); - let decoded = decode_signed_var_long(&mut input).unwrap(); - assert_eq!(decoded, v, "roundtrip failed for {}", v); - assert!(input.is_empty()); - } - } - - #[test] - fn test_var_double_roundtrip() { - for v in [0.0, 1.0, 2.0, 5.0, 15.0, 42.0, 100.0, 1e-9, 1e15, 0.5, 7.77] { - let mut buf = Vec::new(); - encode_var_double(&mut buf, v); - let mut input = buf.as_slice(); - let decoded = decode_var_double(&mut input).unwrap(); - assert!( - (decoded - v).abs() < 1e-15 || decoded == v, - "roundtrip failed for {}: got {}", - v, - decoded, - ); - assert!(input.is_empty()); - } - } - - #[test] - fn test_var_double_small_integers() { - let mut buf = Vec::new(); - encode_var_double(&mut buf, 1.0); - assert_eq!(buf.len(), 1, "VarDouble(1.0) should be 1 byte"); - - buf.clear(); - encode_var_double(&mut buf, 5.0); - assert_eq!(buf.len(), 1, "VarDouble(5.0) should be 1 byte"); - } - - // --- DDSketch encode/decode roundtrip tests --- - - #[test] - fn test_encode_empty_sketch() { - let sketch = DDSketch::new(Config::defaults()); - let bytes = sketch.to_java_bytes(); - assert!(!bytes.is_empty()); - - let decoded = DDSketch::from_java_bytes(&bytes).unwrap(); - assert_eq!(decoded.count(), 0); - assert_eq!(decoded.min(), None); - assert_eq!(decoded.max(), None); - assert_eq!(decoded.sum(), None); - } - - #[test] - fn test_encode_simple_sketch() { - let mut sketch = DDSketch::new(Config::defaults()); - for v in [1.0, 2.0, 3.0, 4.0, 5.0] { - sketch.add(v); - } - - let bytes = sketch.to_java_bytes(); - let decoded = DDSketch::from_java_bytes(&bytes).unwrap(); - - assert_eq!(decoded.count(), 5); - assert_eq!(decoded.min(), Some(1.0)); - assert_eq!(decoded.max(), Some(5.0)); - assert_eq!(decoded.sum(), Some(15.0)); - - assert_quantiles_match(&sketch, &decoded, &[0.5, 0.9, 0.95, 0.99]); - } - - #[test] - fn test_encode_single_value() { - let mut sketch = DDSketch::new(Config::defaults()); - sketch.add(42.0); - - let bytes = sketch.to_java_bytes(); - let decoded = DDSketch::from_java_bytes(&bytes).unwrap(); - - assert_eq!(decoded.count(), 1); - assert_eq!(decoded.min(), Some(42.0)); - assert_eq!(decoded.max(), Some(42.0)); - assert_eq!(decoded.sum(), Some(42.0)); - } - - #[test] - fn test_encode_negative_values() { - let mut sketch = DDSketch::new(Config::defaults()); - for v in [-3.0, -1.0, 2.0, 5.0] { - sketch.add(v); - } - - let bytes = sketch.to_java_bytes(); - let decoded = DDSketch::from_java_bytes(&bytes).unwrap(); - - assert_eq!(decoded.count(), 4); - assert_eq!(decoded.min(), Some(-3.0)); - assert_eq!(decoded.max(), Some(5.0)); - assert_eq!(decoded.sum(), Some(3.0)); - - assert_quantiles_match(&sketch, &decoded, &[0.0, 0.25, 0.5, 0.75, 1.0]); - } - - #[test] - fn test_encode_with_zero_value() { - let mut sketch = DDSketch::new(Config::defaults()); - for v in [0.0, 1.0, 2.0] { - sketch.add(v); - } - - let bytes = sketch.to_java_bytes(); - let decoded = DDSketch::from_java_bytes(&bytes).unwrap(); - - assert_eq!(decoded.count(), 3); - assert_eq!(decoded.min(), Some(0.0)); - assert_eq!(decoded.max(), Some(2.0)); - assert_eq!(decoded.sum(), Some(3.0)); - assert_eq!(decoded.zero_count, 1); - } - - #[test] - fn test_encode_large_range() { - let mut sketch = DDSketch::new(Config::defaults()); - sketch.add(0.001); - sketch.add(1_000_000.0); - - let bytes = sketch.to_java_bytes(); - let decoded = DDSketch::from_java_bytes(&bytes).unwrap(); - - assert_eq!(decoded.count(), 2); - assert_eq!(decoded.min(), Some(0.001)); - assert_eq!(decoded.max(), Some(1_000_000.0)); - } - - #[test] - fn test_encode_with_version_prefix() { - let mut sketch = DDSketch::new(Config::defaults()); - for v in [1.0, 2.0, 3.0] { - sketch.add(v); - } - - let bytes = sketch.to_java_bytes(); - - // Simulate Java's toByteArrayV2: prepend 0x02 - let mut v2_bytes = vec![0x02]; - v2_bytes.extend_from_slice(&bytes); - - let decoded = DDSketch::from_java_bytes(&v2_bytes).unwrap(); - assert_eq!(decoded.count(), 3); - assert_eq!(decoded.min(), Some(1.0)); - assert_eq!(decoded.max(), Some(3.0)); - } - - #[test] - fn test_byte_level_encoding() { - let mut sketch = DDSketch::new(Config::defaults()); - sketch.add(1.0); - - let bytes = sketch.to_java_bytes(); - - assert_eq!(bytes[0], FLAG_COUNT, "first byte should be COUNT flag"); - assert!( - bytes.contains(&FLAG_INDEX_MAPPING_LOG), - "should contain index mapping flag" - ); - } - - // --- Cross-language golden byte tests --- - // - // Golden bytes generated by Java's DDSketchWithExactSummaryStatistics.encode() - // using LogarithmicMapping(0.01) + CollapsingLowestDenseStore(2048). - - const GOLDEN_SIMPLE: &str = "a00588000000000000f03f8c0000000000001440840000000000002e4002fd4a815abf52f03f000000000000000005050002440228021e021602"; - const GOLDEN_SINGLE: &str = "a0028800000000000045408c000000000000454084000000000000454002fd4a815abf52f03f00000000000000000501f40202"; - const GOLDEN_NEGATIVE: &str = "a084408800000000000008c08c000000000000144084000000000000084002fd4a815abf52f03f0000000000000000050244025c02070200026c02"; - const GOLDEN_ZERO: &str = "a0048800000000000000008c000000000000004084000000000000084002fd4a815abf52f03f00000000000000000402050200024402"; - const GOLDEN_EMPTY: &str = "02fd4a815abf52f03f0000000000000000"; - const GOLDEN_MANY: &str = "a08d1488000000000000f03f8c0000000000005940840000000000bab34002fd4a815abf52f03f000000000000000005550002440228021e021602120210020c020c020c0208020a020802060208020602060206020602040206020402040204020402040204020402040204020202040202020402020204020202020204020202020202020402020202020202020202020202020202020202020202020202020202020203020202020202020302020202020302020202020302020203020202030202020302030202020302030203020202030203020302030202"; - - fn hex_to_bytes(hex: &str) -> Vec { - (0..hex.len()) - .step_by(2) - .map(|i| u8::from_str_radix(&hex[i..i + 2], 16).unwrap()) - .collect() - } - - fn bytes_to_hex(bytes: &[u8]) -> String { - bytes.iter().map(|b| format!("{b:02x}")).collect() - } - - fn assert_golden(label: &str, sketch: &DDSketch, golden_hex: &str) { - let bytes = sketch.to_java_bytes(); - let expected = hex_to_bytes(golden_hex); - assert_eq!( - bytes, - expected, - "Rust encoding doesn't match Java golden bytes for {}.\nRust: {}\nJava: {}", - label, - bytes_to_hex(&bytes), - golden_hex, - ); - } - - fn assert_quantiles_match(a: &DDSketch, b: &DDSketch, quantiles: &[f64]) { - for &q in quantiles { - let va = a.quantile(q).unwrap().unwrap(); - let vb = b.quantile(q).unwrap().unwrap(); - assert!( - (va - vb).abs() / va.abs().max(1e-15) < 1e-12, - "quantile({}) mismatch: {} vs {}", - q, - va, - vb, - ); - } - } - - #[test] - fn test_cross_language_simple() { - let mut sketch = DDSketch::new(Config::defaults()); - for v in [1.0, 2.0, 3.0, 4.0, 5.0] { - sketch.add(v); - } - assert_golden("SIMPLE", &sketch, GOLDEN_SIMPLE); - } - - #[test] - fn test_cross_language_single() { - let mut sketch = DDSketch::new(Config::defaults()); - sketch.add(42.0); - assert_golden("SINGLE", &sketch, GOLDEN_SINGLE); - } - - #[test] - fn test_cross_language_negative() { - let mut sketch = DDSketch::new(Config::defaults()); - for v in [-3.0, -1.0, 2.0, 5.0] { - sketch.add(v); - } - assert_golden("NEGATIVE", &sketch, GOLDEN_NEGATIVE); - } - - #[test] - fn test_cross_language_zero() { - let mut sketch = DDSketch::new(Config::defaults()); - for v in [0.0, 1.0, 2.0] { - sketch.add(v); - } - assert_golden("ZERO", &sketch, GOLDEN_ZERO); - } - - #[test] - fn test_cross_language_empty() { - let sketch = DDSketch::new(Config::defaults()); - assert_golden("EMPTY", &sketch, GOLDEN_EMPTY); - } - - #[test] - fn test_cross_language_many() { - let mut sketch = DDSketch::new(Config::defaults()); - for i in 1..=100 { - sketch.add(i as f64); - } - assert_golden("MANY", &sketch, GOLDEN_MANY); - } - - #[test] - fn test_decode_java_golden_bytes() { - for (name, hex) in [ - ("SIMPLE", GOLDEN_SIMPLE), - ("SINGLE", GOLDEN_SINGLE), - ("NEGATIVE", GOLDEN_NEGATIVE), - ("ZERO", GOLDEN_ZERO), - ("EMPTY", GOLDEN_EMPTY), - ("MANY", GOLDEN_MANY), - ] { - let bytes = hex_to_bytes(hex); - let result = DDSketch::from_java_bytes(&bytes); - assert!( - result.is_ok(), - "failed to decode {}: {:?}", - name, - result.err() - ); - } - } - - #[test] - fn test_encode_decode_many_values() { - let mut sketch = DDSketch::new(Config::defaults()); - for i in 1..=100 { - sketch.add(i as f64); - } - - let bytes = sketch.to_java_bytes(); - let decoded = DDSketch::from_java_bytes(&bytes).unwrap(); - - assert_eq!(decoded.count(), 100); - assert_eq!(decoded.min(), Some(1.0)); - assert_eq!(decoded.max(), Some(100.0)); - assert_eq!(decoded.sum(), Some(5050.0)); - - let alpha = 0.01; - let orig_p95 = sketch.quantile(0.95).unwrap().unwrap(); - let dec_p95 = decoded.quantile(0.95).unwrap().unwrap(); - assert!( - (orig_p95 - dec_p95).abs() / orig_p95 < alpha, - "p95 mismatch: {} vs {}", - orig_p95, - dec_p95, - ); - } -} diff --git a/sketches-ddsketch/src/lib.rs b/sketches-ddsketch/src/lib.rs deleted file mode 100644 index f10ad7e3f..000000000 --- a/sketches-ddsketch/src/lib.rs +++ /dev/null @@ -1,52 +0,0 @@ -//! This crate provides a direct port of the [Golang](https://github.com/DataDog/sketches-go) -//! [DDSketch](https://arxiv.org/pdf/1908.10693.pdf) implementation to Rust. All efforts -//! have been made to keep this as close to the original implementation as possible, with a few -//! tweaks to get closer to idiomatic Rust. -//! -//! # Usage -//! -//! Add multiple samples to a DDSketch and invoke the `quantile` method to pull any quantile from -//! 0.0* to *1.0*. -//! -//! ```rust -//! use sketches_ddsketch::{Config, DDSketch}; -//! -//! let c = Config::defaults(); -//! let mut d = DDSketch::new(c); -//! -//! d.add(1.0); -//! d.add(1.0); -//! d.add(1.0); -//! -//! let q = d.quantile(0.50).unwrap(); -//! -//! assert!(q < Some(1.02)); -//! assert!(q > Some(0.98)); -//! ``` -//! -//! Sketches can also be merged. -//! -//! ```rust -//! use sketches_ddsketch::{Config, DDSketch}; -//! -//! let c = Config::defaults(); -//! let mut d1 = DDSketch::new(c); -//! let mut d2 = DDSketch::new(c); -//! -//! d1.add(1.0); -//! d2.add(2.0); -//! d2.add(2.0); -//! -//! d1.merge(&d2); -//! -//! assert_eq!(d1.count(), 3); -//! ``` - -pub use self::config::Config; -pub use self::ddsketch::{DDSketch, DDSketchError}; -pub use self::encoding::DecodeError; - -mod config; -mod ddsketch; -pub mod encoding; -mod store; diff --git a/sketches-ddsketch/src/store.rs b/sketches-ddsketch/src/store.rs deleted file mode 100644 index cf5e6fbb0..000000000 --- a/sketches-ddsketch/src/store.rs +++ /dev/null @@ -1,252 +0,0 @@ -#[cfg(feature = "use_serde")] -use serde::{Deserialize, Serialize}; - -const CHUNK_SIZE: i32 = 128; - -// Divide the `dividend` by the `divisor`, rounding towards positive infinity. -// -// Similar to the nightly only `std::i32::div_ceil`. -fn div_ceil(dividend: i32, divisor: i32) -> i32 { - (dividend + divisor - 1) / divisor -} - -/// CollapsingLowestDenseStore -#[derive(Clone, Debug)] -#[cfg_attr(feature = "use_serde", derive(Serialize, Deserialize))] -pub struct Store { - pub(crate) bins: Vec, - pub(crate) count: u64, - pub(crate) min_key: i32, - pub(crate) max_key: i32, - pub(crate) offset: i32, - pub(crate) bin_limit: usize, - is_collapsed: bool, -} - -impl Store { - pub fn new(bin_limit: usize) -> Self { - Store { - bins: Vec::new(), - count: 0, - min_key: i32::MAX, - max_key: i32::MIN, - offset: 0, - bin_limit, - is_collapsed: false, - } - } - - /// Return the number of bins. - pub fn length(&self) -> i32 { - self.bins.len() as i32 - } - - pub fn is_empty(&self) -> bool { - self.bins.is_empty() - } - - pub fn add(&mut self, key: i32) { - let idx = self.get_index(key); - self.bins[idx] += 1; - self.count += 1; - } - - /// See Java: https://github.com/DataDog/sketches-java/blob/master/src/main/java/com/datadoghq/sketch/ddsketch/store/DenseStore.java (add(int index, double count) method) - pub(crate) fn add_count(&mut self, key: i32, count: u64) { - let idx = self.get_index(key); - self.bins[idx] += count; - self.count += count; - } - - fn get_index(&mut self, key: i32) -> usize { - if key < self.min_key { - if self.is_collapsed { - return 0; - } - - self.extend_range(key, None); - if self.is_collapsed { - return 0; - } - } else if key > self.max_key { - self.extend_range(key, None); - } - - (key - self.offset) as usize - } - - fn extend_range(&mut self, key: i32, second_key: Option) { - let second_key = second_key.unwrap_or(key); - let new_min_key = i32::min(key, i32::min(second_key, self.min_key)); - let new_max_key = i32::max(key, i32::max(second_key, self.max_key)); - - if self.is_empty() { - let new_len = self.get_new_length(new_min_key, new_max_key); - self.bins.resize(new_len, 0); - self.offset = new_min_key; - self.adjust(new_min_key, new_max_key); - } else if new_min_key >= self.min_key && new_max_key < self.offset + self.length() { - self.min_key = new_min_key; - self.max_key = new_max_key; - } else { - // Grow bins - let new_length = self.get_new_length(new_min_key, new_max_key); - if new_length > self.length() as usize { - self.bins.resize(new_length, 0); - } - self.adjust(new_min_key, new_max_key); - } - } - - fn get_new_length(&self, new_min_key: i32, new_max_key: i32) -> usize { - let desired_length = new_max_key - new_min_key + 1; - usize::min( - (CHUNK_SIZE * div_ceil(desired_length, CHUNK_SIZE)) as usize, - self.bin_limit, - ) - } - - fn adjust(&mut self, new_min_key: i32, new_max_key: i32) { - if new_max_key - new_min_key + 1 > self.length() { - let new_min_key = new_max_key - self.length() + 1; - - if new_min_key >= self.max_key { - // Put everything in the first bin. - self.offset = new_min_key; - self.min_key = new_min_key; - self.bins.fill(0); - self.bins[0] = self.count; - } else { - let shift = self.offset - new_min_key; - if shift < 0 { - let collapse_start_index = (self.min_key - self.offset) as usize; - let collapse_end_index = (new_min_key - self.offset) as usize; - let collapsed_count: u64 = self.bins[collapse_start_index..collapse_end_index] - .iter() - .sum(); - let zero_len = (new_min_key - self.min_key) as usize; - self.bins.splice( - collapse_start_index..collapse_end_index, - std::iter::repeat_n(0, zero_len), - ); - self.bins[collapse_end_index] += collapsed_count; - } - self.min_key = new_min_key; - self.shift_bins(shift); - } - - self.max_key = new_max_key; - self.is_collapsed = true; - } else { - self.center_bins(new_min_key, new_max_key); - self.min_key = new_min_key; - self.max_key = new_max_key; - } - } - - fn shift_bins(&mut self, shift: i32) { - if shift > 0 { - let shift = shift as usize; - self.bins.rotate_right(shift); - for idx in 0..shift { - self.bins[idx] = 0; - } - } else { - let shift = shift.unsigned_abs() as usize; - for idx in 0..shift { - self.bins[idx] = 0; - } - self.bins.rotate_left(shift); - } - - self.offset -= shift; - } - - fn center_bins(&mut self, new_min_key: i32, new_max_key: i32) { - let middle_key = new_min_key + (new_max_key - new_min_key + 1) / 2; - let shift = self.offset + self.length() / 2 - middle_key; - self.shift_bins(shift) - } - - pub fn key_at_rank(&self, rank: u64) -> i32 { - let mut n = 0; - for (i, bin) in self.bins.iter().enumerate() { - n += *bin; - if n > rank { - return i as i32 + self.offset; - } - } - - self.max_key - } - - pub fn count(&self) -> u64 { - self.count - } - - pub fn merge(&mut self, other: &Store) { - if other.count == 0 { - return; - } - - if self.count == 0 { - self.copy(other); - return; - } - - if other.min_key < self.min_key || other.max_key > self.max_key { - self.extend_range(other.min_key, Some(other.max_key)); - } - - let collapse_start_index = other.min_key - other.offset; - let mut collapse_end_index = i32::min(self.min_key, other.max_key + 1) - other.offset; - if collapse_end_index > collapse_start_index { - let collapsed_count: u64 = self.bins - [collapse_start_index as usize..collapse_end_index as usize] - .iter() - .sum(); - self.bins[0] += collapsed_count; - } else { - collapse_end_index = collapse_start_index; - } - - for key in (collapse_end_index + other.offset)..(other.max_key + 1) { - self.bins[(key - self.offset) as usize] += other.bins[(key - other.offset) as usize] - } - - self.count += other.count; - } - - fn copy(&mut self, o: &Store) { - self.bins = o.bins.clone(); - self.count = o.count; - self.min_key = o.min_key; - self.max_key = o.max_key; - self.offset = o.offset; - self.bin_limit = o.bin_limit; - self.is_collapsed = o.is_collapsed; - } -} - -#[cfg(test)] -mod tests { - use crate::store::Store; - - #[test] - fn test_simple_store() { - let mut s = Store::new(2048); - - for i in 0..2048 { - s.add(i); - } - } - - #[test] - fn test_simple_store_rev() { - let mut s = Store::new(2048); - - for i in (0..2048).rev() { - s.add(i); - } - } -} diff --git a/sketches-ddsketch/tests/common/dataset.rs b/sketches-ddsketch/tests/common/dataset.rs deleted file mode 100644 index e2e8c700b..000000000 --- a/sketches-ddsketch/tests/common/dataset.rs +++ /dev/null @@ -1,88 +0,0 @@ -use std::cmp::Ordering; -use std::f64::NAN; - -pub struct Dataset { - values: Vec, - sum: f64, - sorted: bool, -} - -fn cmp_f64(a: &f64, b: &f64) -> Ordering { - assert!(!a.is_nan() && !b.is_nan()); - - if a < b { - return Ordering::Less; - } else if a > b { - return Ordering::Greater; - } else { - return Ordering::Equal; - } -} - -impl Dataset { - pub fn new() -> Self { - Dataset { - values: Vec::new(), - sum: 0.0, - sorted: false, - } - } - - pub fn add(&mut self, value: f64) { - self.values.push(value); - self.sum += value; - self.sorted = false; - } - - // pub fn quantile(&mut self, q: f64) -> f64 { - // self.lower_quantile(q) - // } - - pub fn lower_quantile(&mut self, q: f64) -> f64 { - if q < 0.0 || q > 1.0 || self.values.len() == 0 { - return NAN; - } - - self.sort(); - let rank = q * (self.values.len() - 1) as f64; - - self.values[rank.floor() as usize] - } - - pub fn upper_quantile(&mut self, q: f64) -> f64 { - if q < 0.0 || q > 1.0 || self.values.len() == 0 { - return NAN; - } - - self.sort(); - let rank = q * (self.values.len() - 1) as f64; - self.values[rank.ceil() as usize] - } - - pub fn min(&mut self) -> f64 { - self.sort(); - self.values[0] - } - - pub fn max(&mut self) -> f64 { - self.sort(); - self.values[self.values.len() - 1] - } - - pub fn sum(&self) -> f64 { - self.sum - } - - pub fn count(&self) -> usize { - self.values.len() - } - - fn sort(&mut self) { - if self.sorted { - return; - } - - self.values.sort_by(cmp_f64); - self.sorted = true; - } -} diff --git a/sketches-ddsketch/tests/common/generator.rs b/sketches-ddsketch/tests/common/generator.rs deleted file mode 100644 index b1b9c8548..000000000 --- a/sketches-ddsketch/tests/common/generator.rs +++ /dev/null @@ -1,100 +0,0 @@ -extern crate rand; -extern crate rand_distr; - -use rand::prelude::*; - -pub trait Generator { - fn generate(&mut self) -> f64; -} - -// Constant generator -// -pub struct Constant { - value: f64, -} -impl Constant { - pub fn new(value: f64) -> Self { - Constant { value } - } -} -impl Generator for Constant { - fn generate(&mut self) -> f64 { - self.value - } -} - -// Linear generator -// -pub struct Linear { - current_value: f64, - step: f64, -} -impl Linear { - pub fn new(start_value: f64, step: f64) -> Self { - Linear { - current_value: start_value, - step, - } - } -} -impl Generator for Linear { - fn generate(&mut self) -> f64 { - let value = self.current_value; - self.current_value += self.step; - value - } -} - -// Normal distribution generator -// -pub struct Normal { - distr: rand_distr::Normal, -} -impl Normal { - pub fn new(mean: f64, stddev: f64) -> Self { - Normal { - distr: rand_distr::Normal::new(mean, stddev).unwrap(), - } - } -} -impl Generator for Normal { - fn generate(&mut self) -> f64 { - self.distr.sample(&mut rand::thread_rng()) - } -} - -// Lognormal distribution generator -// -pub struct Lognormal { - distr: rand_distr::LogNormal, -} -impl Lognormal { - pub fn new(mean: f64, stddev: f64) -> Self { - Lognormal { - distr: rand_distr::LogNormal::new(mean, stddev).unwrap(), - } - } -} -impl Generator for Lognormal { - fn generate(&mut self) -> f64 { - self.distr.sample(&mut rand::thread_rng()) - } -} - -// Exponential distribution generator -// -pub struct Exponential { - distr: rand_distr::Exp, -} -impl Exponential { - pub fn new(lambda: f64) -> Self { - Exponential { - distr: rand_distr::Exp::new(lambda).unwrap(), - } - } -} -impl Generator for Exponential { - fn generate(&mut self) -> f64 { - self.distr.sample(&mut rand::thread_rng()) - } -} diff --git a/sketches-ddsketch/tests/common/mod.rs b/sketches-ddsketch/tests/common/mod.rs deleted file mode 100644 index 5cfaae4b8..000000000 --- a/sketches-ddsketch/tests/common/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod dataset; -pub mod generator; diff --git a/sketches-ddsketch/tests/test_ddsketch.rs b/sketches-ddsketch/tests/test_ddsketch.rs deleted file mode 100644 index b5d54df18..000000000 --- a/sketches-ddsketch/tests/test_ddsketch.rs +++ /dev/null @@ -1,316 +0,0 @@ -mod common; -use std::time::Instant; - -use common::dataset::Dataset; -use common::generator; -use common::generator::Generator; -use sketches_ddsketch::{Config, DDSketch}; - -const TEST_ALPHA: f64 = 0.01; -const TEST_MAX_BINS: u32 = 1024; -const TEST_MIN_VALUE: f64 = 1.0e-9; - -// Used for float equality -const TEST_ERROR_THRESH: f64 = 1.0e-9; - -const TEST_SIZES: [usize; 5] = [3, 5, 10, 100, 1000]; -const TEST_QUANTILES: [f64; 10] = [0.0, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99, 0.999, 1.0]; - -#[test] -fn test_constant() { - evaluate_sketches(|| Box::new(generator::Constant::new(42.0))); -} - -#[test] -fn test_linear() { - evaluate_sketches(|| Box::new(generator::Linear::new(0.0, 1.0))); -} - -#[test] -fn test_normal() { - evaluate_sketches(|| Box::new(generator::Normal::new(35.0, 1.0))); -} - -#[test] -fn test_lognormal() { - evaluate_sketches(|| Box::new(generator::Lognormal::new(0.0, 2.0))); -} - -#[test] -fn test_exponential() { - evaluate_sketches(|| Box::new(generator::Exponential::new(2.0))); -} - -fn evaluate_test_sizes(f: impl Fn(usize)) { - for sz in &TEST_SIZES { - f(*sz); - } -} - -fn evaluate_sketches(gen_factory: impl Fn() -> Box) { - evaluate_test_sizes(|sz: usize| { - let mut generator = gen_factory(); - evaluate_sketch(sz, &mut generator); - }); -} - -fn new_config() -> Config { - Config::new(TEST_ALPHA, TEST_MAX_BINS, TEST_MIN_VALUE) -} - -fn assert_float_eq(a: f64, b: f64) { - assert!((a - b).abs() < TEST_ERROR_THRESH, "{} != {}", a, b); -} - -fn evaluate_sketch(count: usize, generator: &mut Box) { - let c = new_config(); - let mut g = DDSketch::new(c); - - let mut d = Dataset::new(); - - for _i in 0..count { - let value = generator.generate(); - - g.add(value); - d.add(value); - } - - compare_sketches(&mut d, &g); -} - -fn compare_sketches(d: &mut Dataset, g: &DDSketch) { - for q in &TEST_QUANTILES { - let lower = d.lower_quantile(*q); - let upper = d.upper_quantile(*q); - - let min_expected; - if lower < 0.0 { - min_expected = lower * (1.0 + TEST_ALPHA); - } else { - min_expected = lower * (1.0 - TEST_ALPHA); - } - - let max_expected; - if upper > 0.0 { - max_expected = upper * (1.0 + TEST_ALPHA); - } else { - max_expected = upper * (1.0 - TEST_ALPHA); - } - - let quantile = g.quantile(*q).unwrap().unwrap(); - - assert!( - min_expected <= quantile, - "Lower than min, quantile: {}, wanted {} <= {}", - *q, - min_expected, - quantile - ); - assert!( - quantile <= max_expected, - "Higher than max, quantile: {}, wanted {} <= {}", - *q, - quantile, - max_expected - ); - - // verify that calls do not modify result (not mut so not possible?) - let quantile2 = g.quantile(*q).unwrap().unwrap(); - assert_eq!(quantile, quantile2); - } - - assert_eq!(g.min().unwrap(), d.min()); - assert_eq!(g.max().unwrap(), d.max()); - assert_float_eq(g.sum().unwrap(), d.sum()); - assert_eq!(g.count(), d.count()); -} - -#[test] -fn test_merge_normal() { - evaluate_test_sizes(|sz: usize| { - let c = new_config(); - let mut d = Dataset::new(); - let mut g1 = DDSketch::new(c); - - let mut generator1 = generator::Normal::new(35.0, 1.0); - for _ in (0..sz).step_by(3) { - let value = generator1.generate(); - g1.add(value); - d.add(value); - } - let mut g2 = DDSketch::new(c); - let mut generator2 = generator::Normal::new(50.0, 2.0); - for _ in (1..sz).step_by(3) { - let value = generator2.generate(); - g2.add(value); - d.add(value); - } - g1.merge(&g2).unwrap(); - - let mut g3 = DDSketch::new(c); - let mut generator3 = generator::Normal::new(40.0, 0.5); - for _ in (2..sz).step_by(3) { - let value = generator3.generate(); - g3.add(value); - d.add(value); - } - g1.merge(&g3).unwrap(); - - compare_sketches(&mut d, &g1); - }); -} - -#[test] -fn test_merge_empty() { - evaluate_test_sizes(|sz: usize| { - let c = new_config(); - - let mut d = Dataset::new(); - - let mut g1 = DDSketch::new(c); - let mut g2 = DDSketch::new(c); - let mut generator = generator::Exponential::new(5.0); - - for _ in 0..sz { - let value = generator.generate(); - g2.add(value); - d.add(value); - } - g1.merge(&g2).unwrap(); - compare_sketches(&mut d, &g1); - - let g3 = DDSketch::new(c); - g2.merge(&g3).unwrap(); - compare_sketches(&mut d, &g2); - }); -} - -#[test] -fn test_merge_mixed() { - evaluate_test_sizes(|sz: usize| { - let c = new_config(); - let mut d = Dataset::new(); - let mut g1 = DDSketch::new(c); - - let mut generator1 = generator::Normal::new(100.0, 1.0); - for _ in (0..sz).step_by(3) { - let value = generator1.generate(); - g1.add(value); - d.add(value); - } - - let mut g2 = DDSketch::new(c); - let mut generator2 = generator::Exponential::new(5.0); - for _ in (1..sz).step_by(3) { - let value = generator2.generate(); - g2.add(value); - d.add(value); - } - g1.merge(&g2).unwrap(); - - let mut g3 = DDSketch::new(c); - let mut generator3 = generator::Exponential::new(0.1); - for _ in (2..sz).step_by(3) { - let value = generator3.generate(); - g3.add(value); - d.add(value); - } - g1.merge(&g3).unwrap(); - - compare_sketches(&mut d, &g1); - }) -} - -#[test] -fn test_merge_incompatible() { - let c1 = Config::new(TEST_ALPHA, TEST_MAX_BINS, TEST_MIN_VALUE); - let c2 = Config::new(TEST_ALPHA * 2.0, TEST_MAX_BINS, TEST_MIN_VALUE); - - let mut d1 = DDSketch::new(c1); - let d2 = DDSketch::new(c2); - - assert!(d1.merge(&d2).is_err()); - - let c3 = Config::new(TEST_ALPHA, TEST_MAX_BINS, TEST_MIN_VALUE * 10.0); - let d3 = DDSketch::new(c3); - - assert!(d1.merge(&d3).is_err()); - - let c4 = Config::new(TEST_ALPHA, TEST_MAX_BINS * 2, TEST_MIN_VALUE); - let d4 = DDSketch::new(c4); - - assert!(d1.merge(&d4).is_err()); - - // the same should work - let c5 = Config::new(TEST_ALPHA, TEST_MAX_BINS, TEST_MIN_VALUE); - let dsame = DDSketch::new(c5); - assert!(d1.merge(&dsame).is_ok()); -} - -#[test] -#[ignore] -fn test_performance_insert() { - let c = Config::defaults(); - let mut g = DDSketch::new(c); - let mut gen = generator::Normal::new(1000.0, 500.0); - let count = 300_000_000; - - let mut values = Vec::new(); - for _ in 0..count { - values.push(gen.generate()); - } - - let start_time = Instant::now(); - for value in values { - g.add(value); - } - - // This simply ensures the operations don't get optimzed out as ignored - let quantile = g.quantile(0.50).unwrap().unwrap(); - - let elapsed = start_time.elapsed().as_micros() as f64; - let elapsed = elapsed / 1_000_000.0; - - println!( - "RESULT: p50={:.2} => Added {}M samples in {:2} secs ({:.2}M samples/sec)", - quantile, - count / 1_000_000, - elapsed, - (count as f64) / 1_000_000.0 / elapsed - ); -} - -#[test] -#[ignore] -fn test_performance_merge() { - let c = Config::defaults(); - let mut gen = generator::Normal::new(1000.0, 500.0); - let merge_count = 500_000; - let sample_count = 1_000; - let mut sketches = Vec::new(); - - for _ in 0..merge_count { - let mut d = DDSketch::new(c); - for _ in 0..sample_count { - d.add(gen.generate()); - } - sketches.push(d); - } - - let mut base = DDSketch::new(c); - - let start_time = Instant::now(); - for sketch in &sketches { - base.merge(sketch).unwrap(); - } - - let elapsed = start_time.elapsed().as_micros() as f64; - let elapsed = elapsed / 1_000_000.0; - - println!( - "RESULT: Merged {} sketches in {:2} secs ({:.2} merges/sec)", - merge_count, - elapsed, - (merge_count as f64) / elapsed - ); -}