mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-03-17 18:50:41 +00:00
Compare commits
11 Commits
composite-
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
68a9066d13 | ||
|
|
d02559a4d1 | ||
|
|
1922abaf33 | ||
|
|
d0c5ffb0aa | ||
|
|
18fedd9384 | ||
|
|
2098fca47f | ||
|
|
1251b40c93 | ||
|
|
09a49b872c | ||
|
|
b9ace002ce | ||
|
|
51f340f83d | ||
|
|
57fe659fff |
@@ -47,7 +47,7 @@ rustc-hash = "2.0.0"
|
||||
thiserror = "2.0.1"
|
||||
htmlescape = "0.3.1"
|
||||
fail = { version = "0.5.0", optional = true }
|
||||
time = { version = "0.3.35", features = ["serde-well-known"] }
|
||||
time = { version = "0.3.47", features = ["serde-well-known"] }
|
||||
smallvec = "1.8.0"
|
||||
rayon = "1.5.2"
|
||||
lru = "0.16.3"
|
||||
@@ -64,7 +64,7 @@ query-grammar = { version = "0.25.0", path = "./query-grammar", package = "tanti
|
||||
tantivy-bitpacker = { version = "0.9", path = "./bitpacker" }
|
||||
common = { version = "0.10", path = "./common/", package = "tantivy-common" }
|
||||
tokenizer-api = { version = "0.6", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
|
||||
sketches-ddsketch = { path = "./sketches-ddsketch", features = ["use_serde"] }
|
||||
sketches-ddsketch = { git = "https://github.com/quickwit-oss/rust-sketches-ddsketch.git", rev = "555caf1", features = ["use_serde"] }
|
||||
datasketches = "0.2.0"
|
||||
futures-util = { version = "0.3.28", optional = true }
|
||||
futures-channel = { version = "0.3.28", optional = true }
|
||||
@@ -86,7 +86,7 @@ futures = "0.3.21"
|
||||
paste = "1.0.11"
|
||||
more-asserts = "0.3.1"
|
||||
rand_distr = "0.5"
|
||||
time = { version = "0.3.10", features = ["serde-well-known", "macros"] }
|
||||
time = { version = "0.3.47", features = ["serde-well-known", "macros"] }
|
||||
postcard = { version = "1.0.4", features = [
|
||||
"use-std",
|
||||
], default-features = false }
|
||||
@@ -144,7 +144,6 @@ members = [
|
||||
"sstable",
|
||||
"tokenizer-api",
|
||||
"columnar",
|
||||
"sketches-ddsketch",
|
||||
]
|
||||
|
||||
# Following the "fail" crate best practises, we isolate
|
||||
@@ -202,4 +201,3 @@ harness = false
|
||||
[[bench]]
|
||||
name = "regex_all_terms"
|
||||
harness = false
|
||||
|
||||
|
||||
@@ -15,11 +15,10 @@ repository = "https://github.com/quickwit-oss/tantivy"
|
||||
byteorder = "1.4.3"
|
||||
ownedbytes = { version= "0.9", path="../ownedbytes" }
|
||||
async-trait = "0.1"
|
||||
time = { version = "0.3.10", features = ["serde-well-known"] }
|
||||
time = { version = "0.3.47", features = ["serde-well-known"] }
|
||||
serde = { version = "1.0.136", features = ["derive"] }
|
||||
|
||||
[dev-dependencies]
|
||||
binggan = "0.14.0"
|
||||
proptest = "1.0.0"
|
||||
rand = "0.9"
|
||||
|
||||
|
||||
@@ -62,7 +62,9 @@ impl<W: TerminatingWrite> TerminatingWrite for CountingWriter<W> {
|
||||
pub struct AntiCallToken(());
|
||||
|
||||
/// Trait used to indicate when no more write need to be done on a writer
|
||||
pub trait TerminatingWrite: Write + Send + Sync {
|
||||
///
|
||||
/// Thread-safety is enforced at the call sites that require it.
|
||||
pub trait TerminatingWrite: Write {
|
||||
/// Indicate that the writer will no longer be used. Internally call terminate_ref.
|
||||
fn terminate(mut self) -> io::Result<()>
|
||||
where Self: Sized {
|
||||
|
||||
@@ -1,27 +0,0 @@
|
||||
[package]
|
||||
name = "sketches-ddsketch"
|
||||
version = "0.3.0"
|
||||
authors = ["Mike Heffner <mikeh@fesnel.com>"]
|
||||
edition = "2018"
|
||||
license = "Apache-2.0"
|
||||
readme = "README.md"
|
||||
repository = "https://github.com/mheffner/rust-sketches-ddsketch"
|
||||
homepage = "https://github.com/mheffner/rust-sketches-ddsketch"
|
||||
description = """
|
||||
A direct port of the Golang DDSketch implementation.
|
||||
"""
|
||||
exclude = [".gitignore"]
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
serde = { package = "serde", version = "1.0", optional = true, features = ["derive", "serde_derive"] }
|
||||
|
||||
[dev-dependencies]
|
||||
approx = "0.5.1"
|
||||
rand = "0.8.5"
|
||||
rand_distr = "0.4.3"
|
||||
|
||||
[features]
|
||||
use_serde = ["serde", "serde/derive"]
|
||||
|
||||
@@ -1,201 +0,0 @@
|
||||
Apache License
|
||||
Version 2.0, January 2004
|
||||
http://www.apache.org/licenses/
|
||||
|
||||
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
|
||||
|
||||
1. Definitions.
|
||||
|
||||
"License" shall mean the terms and conditions for use, reproduction,
|
||||
and distribution as defined by Sections 1 through 9 of this document.
|
||||
|
||||
"Licensor" shall mean the copyright owner or entity authorized by
|
||||
the copyright owner that is granting the License.
|
||||
|
||||
"Legal Entity" shall mean the union of the acting entity and all
|
||||
other entities that control, are controlled by, or are under common
|
||||
control with that entity. For the purposes of this definition,
|
||||
"control" means (i) the power, direct or indirect, to cause the
|
||||
direction or management of such entity, whether by contract or
|
||||
otherwise, or (ii) ownership of fifty percent (50%) or more of the
|
||||
outstanding shares, or (iii) beneficial ownership of such entity.
|
||||
|
||||
"You" (or "Your") shall mean an individual or Legal Entity
|
||||
exercising permissions granted by this License.
|
||||
|
||||
"Source" form shall mean the preferred form for making modifications,
|
||||
including but not limited to software source code, documentation
|
||||
source, and configuration files.
|
||||
|
||||
"Object" form shall mean any form resulting from mechanical
|
||||
transformation or translation of a Source form, including but
|
||||
not limited to compiled object code, generated documentation,
|
||||
and conversions to other media types.
|
||||
|
||||
"Work" shall mean the work of authorship, whether in Source or
|
||||
Object form, made available under the License, as indicated by a
|
||||
copyright notice that is included in or attached to the work
|
||||
(an example is provided in the Appendix below).
|
||||
|
||||
"Derivative Works" shall mean any work, whether in Source or Object
|
||||
form, that is based on (or derived from) the Work and for which the
|
||||
editorial revisions, annotations, elaborations, or other modifications
|
||||
represent, as a whole, an original work of authorship. For the purposes
|
||||
of this License, Derivative Works shall not include works that remain
|
||||
separable from, or merely link (or bind by name) to the interfaces of,
|
||||
the Work and Derivative Works thereof.
|
||||
|
||||
"Contribution" shall mean any work of authorship, including
|
||||
the original version of the Work and any modifications or additions
|
||||
to that Work or Derivative Works thereof, that is intentionally
|
||||
submitted to Licensor for inclusion in the Work by the copyright owner
|
||||
or by an individual or Legal Entity authorized to submit on behalf of
|
||||
the copyright owner. For the purposes of this definition, "submitted"
|
||||
means any form of electronic, verbal, or written communication sent
|
||||
to the Licensor or its representatives, including but not limited to
|
||||
communication on electronic mailing lists, source code control systems,
|
||||
and issue tracking systems that are managed by, or on behalf of, the
|
||||
Licensor for the purpose of discussing and improving the Work, but
|
||||
excluding communication that is conspicuously marked or otherwise
|
||||
designated in writing by the copyright owner as "Not a Contribution."
|
||||
|
||||
"Contributor" shall mean Licensor and any individual or Legal Entity
|
||||
on behalf of whom a Contribution has been received by Licensor and
|
||||
subsequently incorporated within the Work.
|
||||
|
||||
2. Grant of Copyright License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
copyright license to reproduce, prepare Derivative Works of,
|
||||
publicly display, publicly perform, sublicense, and distribute the
|
||||
Work and such Derivative Works in Source or Object form.
|
||||
|
||||
3. Grant of Patent License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
(except as stated in this section) patent license to make, have made,
|
||||
use, offer to sell, sell, import, and otherwise transfer the Work,
|
||||
where such license applies only to those patent claims licensable
|
||||
by such Contributor that are necessarily infringed by their
|
||||
Contribution(s) alone or by combination of their Contribution(s)
|
||||
with the Work to which such Contribution(s) was submitted. If You
|
||||
institute patent litigation against any entity (including a
|
||||
cross-claim or counterclaim in a lawsuit) alleging that the Work
|
||||
or a Contribution incorporated within the Work constitutes direct
|
||||
or contributory patent infringement, then any patent licenses
|
||||
granted to You under this License for that Work shall terminate
|
||||
as of the date such litigation is filed.
|
||||
|
||||
4. Redistribution. You may reproduce and distribute copies of the
|
||||
Work or Derivative Works thereof in any medium, with or without
|
||||
modifications, and in Source or Object form, provided that You
|
||||
meet the following conditions:
|
||||
|
||||
(a) You must give any other recipients of the Work or
|
||||
Derivative Works a copy of this License; and
|
||||
|
||||
(b) You must cause any modified files to carry prominent notices
|
||||
stating that You changed the files; and
|
||||
|
||||
(c) You must retain, in the Source form of any Derivative Works
|
||||
that You distribute, all copyright, patent, trademark, and
|
||||
attribution notices from the Source form of the Work,
|
||||
excluding those notices that do not pertain to any part of
|
||||
the Derivative Works; and
|
||||
|
||||
(d) If the Work includes a "NOTICE" text file as part of its
|
||||
distribution, then any Derivative Works that You distribute must
|
||||
include a readable copy of the attribution notices contained
|
||||
within such NOTICE file, excluding those notices that do not
|
||||
pertain to any part of the Derivative Works, in at least one
|
||||
of the following places: within a NOTICE text file distributed
|
||||
as part of the Derivative Works; within the Source form or
|
||||
documentation, if provided along with the Derivative Works; or,
|
||||
within a display generated by the Derivative Works, if and
|
||||
wherever such third-party notices normally appear. The contents
|
||||
of the NOTICE file are for informational purposes only and
|
||||
do not modify the License. You may add Your own attribution
|
||||
notices within Derivative Works that You distribute, alongside
|
||||
or as an addendum to the NOTICE text from the Work, provided
|
||||
that such additional attribution notices cannot be construed
|
||||
as modifying the License.
|
||||
|
||||
You may add Your own copyright statement to Your modifications and
|
||||
may provide additional or different license terms and conditions
|
||||
for use, reproduction, or distribution of Your modifications, or
|
||||
for any such Derivative Works as a whole, provided Your use,
|
||||
reproduction, and distribution of the Work otherwise complies with
|
||||
the conditions stated in this License.
|
||||
|
||||
5. Submission of Contributions. Unless You explicitly state otherwise,
|
||||
any Contribution intentionally submitted for inclusion in the Work
|
||||
by You to the Licensor shall be under the terms and conditions of
|
||||
this License, without any additional terms or conditions.
|
||||
Notwithstanding the above, nothing herein shall supersede or modify
|
||||
the terms of any separate license agreement you may have executed
|
||||
with Licensor regarding such Contributions.
|
||||
|
||||
6. Trademarks. This License does not grant permission to use the trade
|
||||
names, trademarks, service marks, or product names of the Licensor,
|
||||
except as required for reasonable and customary use in describing the
|
||||
origin of the Work and reproducing the content of the NOTICE file.
|
||||
|
||||
7. Disclaimer of Warranty. Unless required by applicable law or
|
||||
agreed to in writing, Licensor provides the Work (and each
|
||||
Contributor provides its Contributions) on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
implied, including, without limitation, any warranties or conditions
|
||||
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
|
||||
PARTICULAR PURPOSE. You are solely responsible for determining the
|
||||
appropriateness of using or redistributing the Work and assume any
|
||||
risks associated with Your exercise of permissions under this License.
|
||||
|
||||
8. Limitation of Liability. In no event and under no legal theory,
|
||||
whether in tort (including negligence), contract, or otherwise,
|
||||
unless required by applicable law (such as deliberate and grossly
|
||||
negligent acts) or agreed to in writing, shall any Contributor be
|
||||
liable to You for damages, including any direct, indirect, special,
|
||||
incidental, or consequential damages of any character arising as a
|
||||
result of this License or out of the use or inability to use the
|
||||
Work (including but not limited to damages for loss of goodwill,
|
||||
work stoppage, computer failure or malfunction, or any and all
|
||||
other commercial damages or losses), even if such Contributor
|
||||
has been advised of the possibility of such damages.
|
||||
|
||||
9. Accepting Warranty or Additional Liability. While redistributing
|
||||
the Work or Derivative Works thereof, You may choose to offer,
|
||||
and charge a fee for, acceptance of support, warranty, indemnity,
|
||||
or other liability obligations and/or rights consistent with this
|
||||
License. However, in accepting such obligations, You may act only
|
||||
on Your own behalf and on Your sole responsibility, not on behalf
|
||||
of any other Contributor, and only if You agree to indemnify,
|
||||
defend, and hold each Contributor harmless for any liability
|
||||
incurred by, or claims asserted against, such Contributor by reason
|
||||
of your accepting any such warranty or additional liability.
|
||||
|
||||
END OF TERMS AND CONDITIONS
|
||||
|
||||
APPENDIX: How to apply the Apache License to your work.
|
||||
|
||||
To apply the Apache License to your work, attach the following
|
||||
boilerplate notice, with the fields enclosed by brackets "[]"
|
||||
replaced with your own identifying information. (Don't include
|
||||
the brackets!) The text should be enclosed in the appropriate
|
||||
comment syntax for the file format. We also recommend that a
|
||||
file or class name and description of purpose be included on the
|
||||
same "printed page" as the copyright notice for easier
|
||||
identification within third-party archives.
|
||||
|
||||
Copyright [2019] [Mike Heffner]
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
@@ -1,11 +0,0 @@
|
||||
clean:
|
||||
cargo clean
|
||||
|
||||
test:
|
||||
cargo test
|
||||
|
||||
test_logs:
|
||||
cargo test -- --nocapture
|
||||
|
||||
test_performance:
|
||||
cargo test --release --jobs 1 test_performance -- --ignored --nocapture
|
||||
@@ -1,37 +0,0 @@
|
||||
# sketches-ddsketch
|
||||
|
||||
This is a direct port of the [Golang](https://github.com/DataDog/sketches-go)
|
||||
[DDSketch](https://arxiv.org/pdf/1908.10693.pdf) quantile sketch implementation
|
||||
to Rust. DDSketch is a fully-mergeable quantile sketch with relative-error
|
||||
guarantees and is extremely fast.
|
||||
|
||||
# DDSketch
|
||||
|
||||
* Sketch size automatically grows as needed, starting with 128 bins.
|
||||
* Extremely fast sample insertion and sketch merges.
|
||||
|
||||
## Usage
|
||||
|
||||
```rust
|
||||
use sketches_ddsketch::{Config, DDSketch};
|
||||
|
||||
let config = Config::defaults();
|
||||
let mut sketch = DDSketch::new(c);
|
||||
|
||||
sketch.add(1.0);
|
||||
sketch.add(1.0);
|
||||
sketch.add(1.0);
|
||||
|
||||
// Get p=50%
|
||||
let quantile = sketch.quantile(0.5).unwrap();
|
||||
assert_eq!(quantile, Some(1.0));
|
||||
```
|
||||
|
||||
## Performance
|
||||
|
||||
No performance tuning has been done with this implementation of the port, so we
|
||||
would expect similar profiles to the original implementation.
|
||||
|
||||
Out of the box we see can achieve over 70M sample inserts/sec and 350K sketch
|
||||
merges/sec. All tests run on a single core Intel i7 processor with 4.2Ghz max
|
||||
clock.
|
||||
@@ -1,98 +0,0 @@
|
||||
#[cfg(feature = "use_serde")]
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
const DEFAULT_MAX_BINS: u32 = 2048;
|
||||
const DEFAULT_ALPHA: f64 = 0.01;
|
||||
const DEFAULT_MIN_VALUE: f64 = 1.0e-9;
|
||||
|
||||
/// The configuration struct for constructing a `DDSketch`
|
||||
#[derive(Copy, Clone, Debug, PartialEq)]
|
||||
#[cfg_attr(feature = "use_serde", derive(Serialize, Deserialize))]
|
||||
pub struct Config {
|
||||
pub max_num_bins: u32,
|
||||
pub gamma: f64,
|
||||
pub(crate) gamma_ln: f64,
|
||||
pub(crate) min_value: f64,
|
||||
pub offset: i32,
|
||||
}
|
||||
|
||||
fn log_gamma(value: f64, gamma_ln: f64) -> f64 {
|
||||
value.ln() / gamma_ln
|
||||
}
|
||||
|
||||
impl Config {
|
||||
/// Construct a new `Config` struct with specific parameters. If you are unsure of how to
|
||||
/// configure this, the `defaults` method constructs a `Config` with built-in defaults.
|
||||
///
|
||||
/// `max_num_bins` is the max number of bins the DDSketch will grow to, in steps of 128 bins.
|
||||
pub fn new(alpha: f64, max_num_bins: u32, min_value: f64) -> Self {
|
||||
// Aligned with Java's LogarithmicMapping / LogLikeIndexMapping:
|
||||
// gamma = (1 + alpha) / (1 - alpha) (correctingFactor=1 for LogarithmicMapping)
|
||||
// gamma_ln = gamma.ln() (not ln_1p, to match Java's Math.log(gamma))
|
||||
// See: https://github.com/DataDog/sketches-java/blob/master/src/main/java/com/datadoghq/sketch/ddsketch/mapping/LogLikeIndexMapping.java (gamma() static method)
|
||||
// See: https://github.com/DataDog/sketches-java/blob/master/src/main/java/com/datadoghq/sketch/ddsketch/mapping/LogarithmicMapping.java (constructor, correctingFactor()=1)
|
||||
let gamma = (1.0 + alpha) / (1.0 - alpha);
|
||||
let gamma_ln = gamma.ln();
|
||||
|
||||
Config {
|
||||
max_num_bins,
|
||||
gamma,
|
||||
gamma_ln,
|
||||
min_value,
|
||||
offset: 1 - (log_gamma(min_value, gamma_ln) as i32),
|
||||
}
|
||||
}
|
||||
|
||||
/// Return a `Config` using built-in default settings
|
||||
pub fn defaults() -> Self {
|
||||
Self::new(DEFAULT_ALPHA, DEFAULT_MAX_BINS, DEFAULT_MIN_VALUE)
|
||||
}
|
||||
|
||||
pub fn key(&self, v: f64) -> i32 {
|
||||
// Aligned with Java's LogLikeIndexMapping.index(): floor-based indexing.
|
||||
// Java uses `(int) index` / `(int) index - 1` which is equivalent to floor().
|
||||
// See: https://github.com/DataDog/sketches-java/blob/master/src/main/java/com/datadoghq/sketch/ddsketch/mapping/LogLikeIndexMapping.java (index() method)
|
||||
self.log_gamma(v).floor() as i32
|
||||
}
|
||||
|
||||
pub fn value(&self, key: i32) -> f64 {
|
||||
// Aligned with Java's LogLikeIndexMapping.value():
|
||||
// lowerBound(index) * (1 + relativeAccuracy)
|
||||
// = logInverse((index - indexOffset) / multiplier) * (1 + relativeAccuracy)
|
||||
// = gamma^key * 2*gamma/(gamma+1)
|
||||
// See: https://github.com/DataDog/sketches-java/blob/master/src/main/java/com/datadoghq/sketch/ddsketch/mapping/LogLikeIndexMapping.java (value() and lowerBound() methods)
|
||||
self.pow_gamma(key) * (2.0 * self.gamma / (1.0 + self.gamma))
|
||||
}
|
||||
|
||||
pub fn log_gamma(&self, value: f64) -> f64 {
|
||||
log_gamma(value, self.gamma_ln)
|
||||
}
|
||||
|
||||
pub fn pow_gamma(&self, key: i32) -> f64 {
|
||||
((key as f64) * self.gamma_ln).exp()
|
||||
}
|
||||
|
||||
pub fn min_possible(&self) -> f64 {
|
||||
self.min_value
|
||||
}
|
||||
|
||||
/// Reconstruct a Config from a gamma value (as decoded from the binary format).
|
||||
/// Uses default max_num_bins and min_value.
|
||||
/// See Java: https://github.com/DataDog/sketches-java/blob/master/src/main/java/com/datadoghq/sketch/ddsketch/mapping/LogarithmicMapping.java (LogarithmicMapping(double gamma, double indexOffset) constructor)
|
||||
pub(crate) fn from_gamma(gamma: f64) -> Self {
|
||||
let gamma_ln = gamma.ln();
|
||||
Config {
|
||||
max_num_bins: DEFAULT_MAX_BINS,
|
||||
gamma,
|
||||
gamma_ln,
|
||||
min_value: DEFAULT_MIN_VALUE,
|
||||
offset: 1 - (log_gamma(DEFAULT_MIN_VALUE, gamma_ln) as i32),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
fn default() -> Self {
|
||||
Self::new(DEFAULT_ALPHA, DEFAULT_MAX_BINS, DEFAULT_MIN_VALUE)
|
||||
}
|
||||
}
|
||||
@@ -1,385 +0,0 @@
|
||||
use std::{error, fmt};
|
||||
|
||||
#[cfg(feature = "use_serde")]
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::config::Config;
|
||||
use crate::store::Store;
|
||||
|
||||
type Result<T> = std::result::Result<T, DDSketchError>;
|
||||
|
||||
/// General error type for DDSketch, represents either an invalid quantile or an
|
||||
/// incompatible merge operation.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum DDSketchError {
|
||||
Quantile,
|
||||
Merge,
|
||||
}
|
||||
impl fmt::Display for DDSketchError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match self {
|
||||
DDSketchError::Quantile => {
|
||||
write!(f, "Invalid quantile, must be between 0 and 1 (inclusive)")
|
||||
}
|
||||
DDSketchError::Merge => write!(f, "Can not merge sketches with different configs"),
|
||||
}
|
||||
}
|
||||
}
|
||||
impl error::Error for DDSketchError {
|
||||
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
||||
// Generic
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// This struct represents a [DDSketch](https://arxiv.org/pdf/1908.10693.pdf)
|
||||
#[derive(Clone)]
|
||||
#[cfg_attr(feature = "use_serde", derive(Serialize, Deserialize))]
|
||||
pub struct DDSketch {
|
||||
pub(crate) config: Config,
|
||||
pub(crate) store: Store,
|
||||
pub(crate) negative_store: Store,
|
||||
pub(crate) min: f64,
|
||||
pub(crate) max: f64,
|
||||
pub(crate) sum: f64,
|
||||
pub(crate) zero_count: u64,
|
||||
}
|
||||
|
||||
impl Default for DDSketch {
|
||||
fn default() -> Self {
|
||||
Self::new(Default::default())
|
||||
}
|
||||
}
|
||||
|
||||
// XXX: functions should return Option<> in the case of empty
|
||||
impl DDSketch {
|
||||
/// Construct a `DDSketch`. Requires a `Config` specifying the parameters of the sketch
|
||||
pub fn new(config: Config) -> Self {
|
||||
DDSketch {
|
||||
config,
|
||||
store: Store::new(config.max_num_bins as usize),
|
||||
negative_store: Store::new(config.max_num_bins as usize),
|
||||
min: f64::INFINITY,
|
||||
max: f64::NEG_INFINITY,
|
||||
sum: 0.0,
|
||||
zero_count: 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Add the sample to the sketch
|
||||
pub fn add(&mut self, v: f64) {
|
||||
if v > self.config.min_possible() {
|
||||
let key = self.config.key(v);
|
||||
self.store.add(key);
|
||||
} else if v < -self.config.min_possible() {
|
||||
let key = self.config.key(-v);
|
||||
self.negative_store.add(key);
|
||||
} else {
|
||||
self.zero_count += 1;
|
||||
}
|
||||
|
||||
if v < self.min {
|
||||
self.min = v;
|
||||
}
|
||||
if self.max < v {
|
||||
self.max = v;
|
||||
}
|
||||
self.sum += v;
|
||||
}
|
||||
|
||||
/// Return the quantile value for quantiles between 0.0 and 1.0. Result is an error, represented
|
||||
/// as DDSketchError::Quantile if the requested quantile is outside of that range.
|
||||
///
|
||||
/// If the sketch is empty the result is None, else Some(v) for the quantile value.
|
||||
pub fn quantile(&self, q: f64) -> Result<Option<f64>> {
|
||||
if !(0.0..=1.0).contains(&q) {
|
||||
return Err(DDSketchError::Quantile);
|
||||
}
|
||||
|
||||
if self.empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
if q == 0.0 {
|
||||
return Ok(Some(self.min));
|
||||
} else if q == 1.0 {
|
||||
return Ok(Some(self.max));
|
||||
}
|
||||
|
||||
let rank = (q * (self.count() as f64 - 1.0)) as u64;
|
||||
let quantile;
|
||||
if rank < self.negative_store.count() {
|
||||
let reversed_rank = self.negative_store.count() - rank - 1;
|
||||
let key = self.negative_store.key_at_rank(reversed_rank);
|
||||
quantile = -self.config.value(key);
|
||||
} else if rank < self.zero_count + self.negative_store.count() {
|
||||
quantile = 0.0;
|
||||
} else {
|
||||
let key = self
|
||||
.store
|
||||
.key_at_rank(rank - self.zero_count - self.negative_store.count());
|
||||
quantile = self.config.value(key);
|
||||
}
|
||||
|
||||
Ok(Some(quantile))
|
||||
}
|
||||
|
||||
/// Returns the minimum value seen, or None if sketch is empty
|
||||
pub fn min(&self) -> Option<f64> {
|
||||
if self.empty() {
|
||||
None
|
||||
} else {
|
||||
Some(self.min)
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the maximum value seen, or None if sketch is empty
|
||||
pub fn max(&self) -> Option<f64> {
|
||||
if self.empty() {
|
||||
None
|
||||
} else {
|
||||
Some(self.max)
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the sum of values seen, or None if sketch is empty
|
||||
pub fn sum(&self) -> Option<f64> {
|
||||
if self.empty() {
|
||||
None
|
||||
} else {
|
||||
Some(self.sum)
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the number of values added to the sketch
|
||||
pub fn count(&self) -> usize {
|
||||
(self.store.count() + self.zero_count + self.negative_store.count()) as usize
|
||||
}
|
||||
|
||||
/// Returns the length of the underlying `Store`. This is mainly only useful for understanding
|
||||
/// how much the sketch has grown given the inserted values.
|
||||
pub fn length(&self) -> usize {
|
||||
self.store.length() as usize + self.negative_store.length() as usize
|
||||
}
|
||||
|
||||
/// Merge the contents of another sketch into this one. The sketch that is merged into this one
|
||||
/// is unchanged after the merge.
|
||||
pub fn merge(&mut self, o: &DDSketch) -> Result<()> {
|
||||
if self.config != o.config {
|
||||
return Err(DDSketchError::Merge);
|
||||
}
|
||||
|
||||
let was_empty = self.store.count() == 0;
|
||||
|
||||
// Merge the stores
|
||||
self.store.merge(&o.store);
|
||||
self.negative_store.merge(&o.negative_store);
|
||||
self.zero_count += o.zero_count;
|
||||
|
||||
// Need to ensure we don't override min/max with initializers
|
||||
// if either store were empty
|
||||
if was_empty {
|
||||
self.min = o.min;
|
||||
self.max = o.max;
|
||||
} else if o.store.count() > 0 {
|
||||
if o.min < self.min {
|
||||
self.min = o.min
|
||||
}
|
||||
if o.max > self.max {
|
||||
self.max = o.max;
|
||||
}
|
||||
}
|
||||
self.sum += o.sum;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn empty(&self) -> bool {
|
||||
self.count() == 0
|
||||
}
|
||||
|
||||
/// Encode this sketch into the Java-compatible binary format used by
|
||||
/// `com.datadoghq.sketch.ddsketch.DDSketchWithExactSummaryStatistics`.
|
||||
pub fn to_java_bytes(&self) -> Vec<u8> {
|
||||
crate::encoding::encode_to_java_bytes(self)
|
||||
}
|
||||
|
||||
/// Decode a sketch from the Java-compatible binary format.
|
||||
/// Accepts bytes produced by Java's `DDSketchWithExactSummaryStatistics.encode()`
|
||||
/// with or without the `0x02` version prefix.
|
||||
pub fn from_java_bytes(
|
||||
bytes: &[u8],
|
||||
) -> std::result::Result<Self, crate::encoding::DecodeError> {
|
||||
crate::encoding::decode_from_java_bytes(bytes)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use approx::assert_relative_eq;
|
||||
|
||||
use crate::{Config, DDSketch};
|
||||
|
||||
#[test]
|
||||
fn test_add_zero() {
|
||||
let alpha = 0.01;
|
||||
let c = Config::new(alpha, 2048, 10e-9);
|
||||
let mut dd = DDSketch::new(c);
|
||||
dd.add(0.0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_quartiles() {
|
||||
let alpha = 0.01;
|
||||
let c = Config::new(alpha, 2048, 10e-9);
|
||||
let mut dd = DDSketch::new(c);
|
||||
|
||||
// Initialize sketch with {1.0, 2.0, 3.0, 4.0}
|
||||
for i in 1..5 {
|
||||
dd.add(i as f64);
|
||||
}
|
||||
|
||||
// We expect the following mappings from quantile to value:
|
||||
// [0,0.33]: 1.0, (0.34,0.66]: 2.0, (0.67,0.99]: 3.0, (0.99, 1.0]: 4.0
|
||||
let test_cases = vec![
|
||||
(0.0, 1.0),
|
||||
(0.25, 1.0),
|
||||
(0.33, 1.0),
|
||||
(0.34, 2.0),
|
||||
(0.5, 2.0),
|
||||
(0.66, 2.0),
|
||||
(0.67, 3.0),
|
||||
(0.75, 3.0),
|
||||
(0.99, 3.0),
|
||||
(1.0, 4.0),
|
||||
];
|
||||
|
||||
for (q, val) in test_cases {
|
||||
assert_relative_eq!(dd.quantile(q).unwrap().unwrap(), val, max_relative = alpha);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_neg_quartiles() {
|
||||
let alpha = 0.01;
|
||||
let c = Config::new(alpha, 2048, 10e-9);
|
||||
let mut dd = DDSketch::new(c);
|
||||
|
||||
// Initialize sketch with {1.0, 2.0, 3.0, 4.0}
|
||||
for i in 1..5 {
|
||||
dd.add(-i as f64);
|
||||
}
|
||||
|
||||
let test_cases = vec![
|
||||
(0.0, -4.0),
|
||||
(0.25, -4.0),
|
||||
(0.5, -3.0),
|
||||
(0.75, -2.0),
|
||||
(1.0, -1.0),
|
||||
];
|
||||
|
||||
for (q, val) in test_cases {
|
||||
assert_relative_eq!(dd.quantile(q).unwrap().unwrap(), val, max_relative = alpha);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_simple_quantile() {
|
||||
let c = Config::defaults();
|
||||
let mut dd = DDSketch::new(c);
|
||||
|
||||
for i in 1..101 {
|
||||
dd.add(i as f64);
|
||||
}
|
||||
|
||||
assert_eq!(dd.quantile(0.95).unwrap().unwrap().ceil(), 95.0);
|
||||
|
||||
assert!(dd.quantile(-1.01).is_err());
|
||||
assert!(dd.quantile(1.01).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_empty_sketch() {
|
||||
let c = Config::defaults();
|
||||
let dd = DDSketch::new(c);
|
||||
|
||||
assert_eq!(dd.quantile(0.98).unwrap(), None);
|
||||
assert_eq!(dd.max(), None);
|
||||
assert_eq!(dd.min(), None);
|
||||
assert_eq!(dd.sum(), None);
|
||||
assert_eq!(dd.count(), 0);
|
||||
|
||||
assert!(dd.quantile(1.01).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_basic_histogram_data() {
|
||||
let values = &[
|
||||
0.754225035,
|
||||
0.752900282,
|
||||
0.752812246,
|
||||
0.752602367,
|
||||
0.754310155,
|
||||
0.753525981,
|
||||
0.752981082,
|
||||
0.752715536,
|
||||
0.751667941,
|
||||
0.755079054,
|
||||
0.753528150,
|
||||
0.755188464,
|
||||
0.752508723,
|
||||
0.750064549,
|
||||
0.753960428,
|
||||
0.751139298,
|
||||
0.752523560,
|
||||
0.753253428,
|
||||
0.753498342,
|
||||
0.751858358,
|
||||
0.752104636,
|
||||
0.753841300,
|
||||
0.754467374,
|
||||
0.753814334,
|
||||
0.750881719,
|
||||
0.753182556,
|
||||
0.752576884,
|
||||
0.753945708,
|
||||
0.753571911,
|
||||
0.752314573,
|
||||
0.752586651,
|
||||
];
|
||||
|
||||
let c = Config::defaults();
|
||||
let mut dd = DDSketch::new(c);
|
||||
|
||||
for value in values {
|
||||
dd.add(*value);
|
||||
}
|
||||
|
||||
assert_eq!(dd.max(), Some(0.755188464));
|
||||
assert_eq!(dd.min(), Some(0.750064549));
|
||||
assert_eq!(dd.count(), 31);
|
||||
assert_eq!(dd.sum(), Some(23.343630625000003));
|
||||
|
||||
assert!(dd.quantile(0.25).unwrap().is_some());
|
||||
assert!(dd.quantile(0.5).unwrap().is_some());
|
||||
assert!(dd.quantile(0.75).unwrap().is_some());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_length() {
|
||||
let mut dd = DDSketch::default();
|
||||
assert_eq!(dd.length(), 0);
|
||||
|
||||
dd.add(1.0);
|
||||
assert_eq!(dd.length(), 128);
|
||||
dd.add(2.0);
|
||||
dd.add(3.0);
|
||||
assert_eq!(dd.length(), 128);
|
||||
|
||||
dd.add(-1.0);
|
||||
assert_eq!(dd.length(), 256);
|
||||
dd.add(-2.0);
|
||||
dd.add(-3.0);
|
||||
assert_eq!(dd.length(), 256);
|
||||
}
|
||||
}
|
||||
@@ -1,813 +0,0 @@
|
||||
//! Java-compatible binary encoding/decoding for DDSketch.
|
||||
//!
|
||||
//! This module implements the binary format used by the Java
|
||||
//! `com.datadoghq.sketch.ddsketch.DDSketchWithExactSummaryStatistics` class
|
||||
//! from the DataDog/sketches-java library. It enables cross-language
|
||||
//! serialization so that sketches produced in Rust can be deserialized
|
||||
//! and merged by Java consumers.
|
||||
|
||||
use std::fmt;
|
||||
|
||||
use crate::config::Config;
|
||||
use crate::ddsketch::DDSketch;
|
||||
use crate::store::Store;
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Flag byte layout
|
||||
//
|
||||
// Each flag byte packs a 2-bit type ordinal in the low bits and a 6-bit
|
||||
// subflag in the upper bits: (subflag << 2) | type_ordinal
|
||||
// See: https://github.com/DataDog/sketches-java/blob/master/src/main/java/com/datadoghq/sketch/ddsketch/encoding/Flag.java
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// The 2-bit type field occupying the low bits of every flag byte.
|
||||
#[repr(u8)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
enum FlagType {
|
||||
SketchFeatures = 0,
|
||||
PositiveStore = 1,
|
||||
IndexMapping = 2,
|
||||
NegativeStore = 3,
|
||||
}
|
||||
|
||||
impl FlagType {
|
||||
fn from_byte(b: u8) -> Option<Self> {
|
||||
match b & 0x03 {
|
||||
0 => Some(Self::SketchFeatures),
|
||||
1 => Some(Self::PositiveStore),
|
||||
2 => Some(Self::IndexMapping),
|
||||
3 => Some(Self::NegativeStore),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct a flag byte from a subflag and a type.
|
||||
const fn flag(subflag: u8, flag_type: FlagType) -> u8 {
|
||||
(subflag << 2) | (flag_type as u8)
|
||||
}
|
||||
|
||||
// Pre-computed flag bytes for the sketch features we encode/decode.
|
||||
const FLAG_INDEX_MAPPING_LOG: u8 = flag(0, FlagType::IndexMapping); // 0x02
|
||||
const FLAG_ZERO_COUNT: u8 = flag(1, FlagType::SketchFeatures); // 0x04
|
||||
const FLAG_COUNT: u8 = flag(0x28, FlagType::SketchFeatures); // 0xA0
|
||||
const FLAG_SUM: u8 = flag(0x21, FlagType::SketchFeatures); // 0x84
|
||||
const FLAG_MIN: u8 = flag(0x22, FlagType::SketchFeatures); // 0x88
|
||||
const FLAG_MAX: u8 = flag(0x23, FlagType::SketchFeatures); // 0x8C
|
||||
|
||||
/// BinEncodingMode subflags for store flag bytes.
|
||||
/// See: https://github.com/DataDog/sketches-java/blob/master/src/main/java/com/datadoghq/sketch/ddsketch/encoding/BinEncodingMode.java
|
||||
#[repr(u8)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
enum BinEncodingMode {
|
||||
IndexDeltasAndCounts = 1,
|
||||
IndexDeltas = 2,
|
||||
ContiguousCounts = 3,
|
||||
}
|
||||
|
||||
impl BinEncodingMode {
|
||||
fn from_subflag(subflag: u8) -> Option<Self> {
|
||||
match subflag {
|
||||
1 => Some(Self::IndexDeltasAndCounts),
|
||||
2 => Some(Self::IndexDeltas),
|
||||
3 => Some(Self::ContiguousCounts),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const VAR_DOUBLE_ROTATE_DISTANCE: u32 = 6;
|
||||
const MAX_VAR_LEN_64: usize = 9;
|
||||
|
||||
const DEFAULT_MAX_BINS: u32 = 2048;
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Error type
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum DecodeError {
|
||||
UnexpectedEof,
|
||||
InvalidFlag(u8),
|
||||
InvalidData(String),
|
||||
}
|
||||
|
||||
impl fmt::Display for DecodeError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
Self::UnexpectedEof => write!(f, "unexpected end of input"),
|
||||
Self::InvalidFlag(b) => write!(f, "invalid flag byte: 0x{b:02X}"),
|
||||
Self::InvalidData(msg) => write!(f, "invalid data: {msg}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for DecodeError {}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// VarEncoding — bit-exact port of Java VarEncodingHelper
|
||||
// See: https://github.com/DataDog/sketches-java/blob/master/src/main/java/com/datadoghq/sketch/ddsketch/encoding/VarEncodingHelper.java
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
fn encode_unsigned_var_long(out: &mut Vec<u8>, mut value: u64) {
|
||||
let length = ((63 - value.leading_zeros() as i32) / 7).clamp(0, 8);
|
||||
for _ in 0..length {
|
||||
out.push((value as u8) | 0x80);
|
||||
value >>= 7;
|
||||
}
|
||||
out.push(value as u8);
|
||||
}
|
||||
|
||||
fn decode_unsigned_var_long(input: &mut &[u8]) -> Result<u64, DecodeError> {
|
||||
let mut value: u64 = 0;
|
||||
let mut shift: u32 = 0;
|
||||
loop {
|
||||
let next = read_byte(input)?;
|
||||
if next < 0x80 || shift == 56 {
|
||||
return Ok(value | (u64::from(next) << shift));
|
||||
}
|
||||
value |= (u64::from(next) & 0x7F) << shift;
|
||||
shift += 7;
|
||||
}
|
||||
}
|
||||
|
||||
/// ZigZag encode then var-long encode.
|
||||
fn encode_signed_var_long(out: &mut Vec<u8>, value: i64) {
|
||||
let encoded = ((value >> 63) ^ (value << 1)) as u64;
|
||||
encode_unsigned_var_long(out, encoded);
|
||||
}
|
||||
|
||||
fn decode_signed_var_long(input: &mut &[u8]) -> Result<i64, DecodeError> {
|
||||
let encoded = decode_unsigned_var_long(input)?;
|
||||
Ok(((encoded >> 1) as i64) ^ -((encoded & 1) as i64))
|
||||
}
|
||||
|
||||
fn double_to_var_bits(value: f64) -> u64 {
|
||||
let bits = f64::to_bits(value + 1.0).wrapping_sub(f64::to_bits(1.0));
|
||||
bits.rotate_left(VAR_DOUBLE_ROTATE_DISTANCE)
|
||||
}
|
||||
|
||||
fn var_bits_to_double(bits: u64) -> f64 {
|
||||
f64::from_bits(
|
||||
bits.rotate_right(VAR_DOUBLE_ROTATE_DISTANCE)
|
||||
.wrapping_add(f64::to_bits(1.0)),
|
||||
) - 1.0
|
||||
}
|
||||
|
||||
fn encode_var_double(out: &mut Vec<u8>, value: f64) {
|
||||
let mut bits = double_to_var_bits(value);
|
||||
for _ in 0..MAX_VAR_LEN_64 - 1 {
|
||||
let next = (bits >> 57) as u8;
|
||||
bits <<= 7;
|
||||
if bits == 0 {
|
||||
out.push(next);
|
||||
return;
|
||||
}
|
||||
out.push(next | 0x80);
|
||||
}
|
||||
out.push((bits >> 56) as u8);
|
||||
}
|
||||
|
||||
fn decode_var_double(input: &mut &[u8]) -> Result<f64, DecodeError> {
|
||||
let mut bits: u64 = 0;
|
||||
let mut shift: i32 = 57; // 8*8 - 7
|
||||
loop {
|
||||
let next = read_byte(input)?;
|
||||
if shift == 1 {
|
||||
bits |= u64::from(next);
|
||||
break;
|
||||
}
|
||||
if next < 0x80 {
|
||||
bits |= u64::from(next) << shift;
|
||||
break;
|
||||
}
|
||||
bits |= (u64::from(next) & 0x7F) << shift;
|
||||
shift -= 7;
|
||||
}
|
||||
Ok(var_bits_to_double(bits))
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Byte-level helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
fn read_byte(input: &mut &[u8]) -> Result<u8, DecodeError> {
|
||||
match input.split_first() {
|
||||
Some((&byte, rest)) => {
|
||||
*input = rest;
|
||||
Ok(byte)
|
||||
}
|
||||
None => Err(DecodeError::UnexpectedEof),
|
||||
}
|
||||
}
|
||||
|
||||
fn write_f64_le(out: &mut Vec<u8>, value: f64) {
|
||||
out.extend_from_slice(&value.to_le_bytes());
|
||||
}
|
||||
|
||||
fn read_f64_le(input: &mut &[u8]) -> Result<f64, DecodeError> {
|
||||
if input.len() < 8 {
|
||||
return Err(DecodeError::UnexpectedEof);
|
||||
}
|
||||
let (bytes, rest) = input.split_at(8);
|
||||
*input = rest;
|
||||
// bytes is guaranteed to be length 8 by the split_at above.
|
||||
let arr = [
|
||||
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
|
||||
];
|
||||
Ok(f64::from_le_bytes(arr))
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Store encoding/decoding
|
||||
// See: https://github.com/DataDog/sketches-java/blob/master/src/main/java/com/datadoghq/sketch/ddsketch/store/DenseStore.java (encode/decode methods)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Collect non-zero bins in the store as (absolute_index, count) pairs.
|
||||
///
|
||||
/// Allocation is acceptable here: this runs once per encode and the Vec
|
||||
/// has at most `max_num_bins` entries.
|
||||
fn collect_non_zero_bins(store: &Store) -> Vec<(i32, u64)> {
|
||||
if store.count == 0 {
|
||||
return Vec::new();
|
||||
}
|
||||
let start = (store.min_key - store.offset) as usize;
|
||||
let end = ((store.max_key - store.offset + 1) as usize).min(store.bins.len());
|
||||
store.bins[start..end]
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter(|&(_, &count)| count > 0)
|
||||
.map(|(i, &count)| (start as i32 + i as i32 + store.offset, count))
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn encode_store(out: &mut Vec<u8>, store: &Store, flag_type: FlagType) {
|
||||
let bins = collect_non_zero_bins(store);
|
||||
if bins.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
out.push(flag(BinEncodingMode::IndexDeltasAndCounts as u8, flag_type));
|
||||
encode_unsigned_var_long(out, bins.len() as u64);
|
||||
|
||||
let mut prev_index: i64 = 0;
|
||||
for &(index, count) in &bins {
|
||||
encode_signed_var_long(out, i64::from(index) - prev_index);
|
||||
encode_var_double(out, count as f64);
|
||||
prev_index = i64::from(index);
|
||||
}
|
||||
}
|
||||
|
||||
fn decode_store(input: &mut &[u8], subflag: u8, bin_limit: usize) -> Result<Store, DecodeError> {
|
||||
let mode = BinEncodingMode::from_subflag(subflag).ok_or_else(|| {
|
||||
DecodeError::InvalidData(format!("unknown bin encoding mode subflag: {subflag}"))
|
||||
})?;
|
||||
let num_bins = decode_unsigned_var_long(input)? as usize;
|
||||
let mut store = Store::new(bin_limit);
|
||||
|
||||
match mode {
|
||||
BinEncodingMode::IndexDeltasAndCounts => {
|
||||
let mut index: i64 = 0;
|
||||
for _ in 0..num_bins {
|
||||
index += decode_signed_var_long(input)?;
|
||||
let count = decode_var_double(input)?;
|
||||
store.add_count(index as i32, count as u64);
|
||||
}
|
||||
}
|
||||
BinEncodingMode::IndexDeltas => {
|
||||
let mut index: i64 = 0;
|
||||
for _ in 0..num_bins {
|
||||
index += decode_signed_var_long(input)?;
|
||||
store.add_count(index as i32, 1);
|
||||
}
|
||||
}
|
||||
BinEncodingMode::ContiguousCounts => {
|
||||
let start_index = decode_signed_var_long(input)?;
|
||||
let index_delta = decode_signed_var_long(input)?;
|
||||
let mut index = start_index;
|
||||
for _ in 0..num_bins {
|
||||
let count = decode_var_double(input)?;
|
||||
store.add_count(index as i32, count as u64);
|
||||
index += index_delta;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(store)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Top-level encode / decode
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Encode a DDSketch into the Java-compatible binary format.
|
||||
///
|
||||
/// The output follows the encoding order of
|
||||
/// `DDSketchWithExactSummaryStatistics.encode()` then `DDSketch.encode()`:
|
||||
///
|
||||
/// 1. Summary statistics: COUNT, MIN, MAX (if count > 0)
|
||||
/// 2. SUM (if sum != 0)
|
||||
/// 3. Index mapping (LOG layout): gamma, indexOffset
|
||||
/// 4. Zero count (if > 0)
|
||||
/// 5. Positive store bins
|
||||
/// 6. Negative store bins
|
||||
pub fn encode_to_java_bytes(sketch: &DDSketch) -> Vec<u8> {
|
||||
let mut out = Vec::new();
|
||||
let count = sketch.count() as f64;
|
||||
|
||||
// Summary statistics (DDSketchWithExactSummaryStatistics.encode)
|
||||
if count != 0.0 {
|
||||
out.push(FLAG_COUNT);
|
||||
encode_var_double(&mut out, count);
|
||||
out.push(FLAG_MIN);
|
||||
write_f64_le(&mut out, sketch.min);
|
||||
out.push(FLAG_MAX);
|
||||
write_f64_le(&mut out, sketch.max);
|
||||
}
|
||||
if sketch.sum != 0.0 {
|
||||
out.push(FLAG_SUM);
|
||||
write_f64_le(&mut out, sketch.sum);
|
||||
}
|
||||
|
||||
// DDSketch.encode: index mapping + zero count + stores
|
||||
out.push(FLAG_INDEX_MAPPING_LOG);
|
||||
write_f64_le(&mut out, sketch.config.gamma);
|
||||
write_f64_le(&mut out, 0.0_f64);
|
||||
|
||||
if sketch.zero_count != 0 {
|
||||
out.push(FLAG_ZERO_COUNT);
|
||||
encode_var_double(&mut out, sketch.zero_count as f64);
|
||||
}
|
||||
|
||||
encode_store(&mut out, &sketch.store, FlagType::PositiveStore);
|
||||
encode_store(&mut out, &sketch.negative_store, FlagType::NegativeStore);
|
||||
|
||||
out
|
||||
}
|
||||
|
||||
/// Decode a DDSketch from the Java-compatible binary format.
|
||||
///
|
||||
/// Accepts bytes with or without a `0x02` version prefix.
|
||||
pub fn decode_from_java_bytes(bytes: &[u8]) -> Result<DDSketch, DecodeError> {
|
||||
if bytes.is_empty() {
|
||||
return Err(DecodeError::UnexpectedEof);
|
||||
}
|
||||
|
||||
let mut input = bytes;
|
||||
|
||||
// Skip optional version prefix (0x02 followed by a valid flag byte).
|
||||
if input.len() >= 2 && input[0] == 0x02 && is_valid_flag_byte(input[1]) {
|
||||
input = &input[1..];
|
||||
}
|
||||
|
||||
let mut gamma: Option<f64> = None;
|
||||
let mut zero_count: f64 = 0.0;
|
||||
let mut sum: f64 = 0.0;
|
||||
let mut min: f64 = f64::INFINITY;
|
||||
let mut max: f64 = f64::NEG_INFINITY;
|
||||
let mut positive_store: Option<Store> = None;
|
||||
let mut negative_store: Option<Store> = None;
|
||||
|
||||
while !input.is_empty() {
|
||||
let flag_byte = read_byte(&mut input)?;
|
||||
let flag_type =
|
||||
FlagType::from_byte(flag_byte).ok_or(DecodeError::InvalidFlag(flag_byte))?;
|
||||
let subflag = flag_byte >> 2;
|
||||
|
||||
match flag_type {
|
||||
FlagType::IndexMapping => {
|
||||
gamma = Some(read_f64_le(&mut input)?);
|
||||
let _index_offset = read_f64_le(&mut input)?;
|
||||
}
|
||||
FlagType::SketchFeatures => match flag_byte {
|
||||
FLAG_ZERO_COUNT => zero_count += decode_var_double(&mut input)?,
|
||||
FLAG_COUNT => {
|
||||
let _count = decode_var_double(&mut input)?;
|
||||
}
|
||||
FLAG_SUM => sum = read_f64_le(&mut input)?,
|
||||
FLAG_MIN => min = read_f64_le(&mut input)?,
|
||||
FLAG_MAX => max = read_f64_le(&mut input)?,
|
||||
_ => return Err(DecodeError::InvalidFlag(flag_byte)),
|
||||
},
|
||||
FlagType::PositiveStore => {
|
||||
positive_store = Some(decode_store(
|
||||
&mut input,
|
||||
subflag,
|
||||
DEFAULT_MAX_BINS as usize,
|
||||
)?);
|
||||
}
|
||||
FlagType::NegativeStore => {
|
||||
negative_store = Some(decode_store(
|
||||
&mut input,
|
||||
subflag,
|
||||
DEFAULT_MAX_BINS as usize,
|
||||
)?);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let g = gamma.unwrap_or_else(|| Config::defaults().gamma);
|
||||
let config = Config::from_gamma(g);
|
||||
let store = positive_store.unwrap_or_else(|| Store::new(config.max_num_bins as usize));
|
||||
let neg = negative_store.unwrap_or_else(|| Store::new(config.max_num_bins as usize));
|
||||
|
||||
Ok(DDSketch {
|
||||
config,
|
||||
store,
|
||||
negative_store: neg,
|
||||
min,
|
||||
max,
|
||||
sum,
|
||||
zero_count: zero_count as u64,
|
||||
})
|
||||
}
|
||||
|
||||
/// Check whether a byte is a valid flag byte for the DDSketch binary format.
|
||||
fn is_valid_flag_byte(b: u8) -> bool {
|
||||
// Known sketch-feature flags
|
||||
if matches!(
|
||||
b,
|
||||
FLAG_ZERO_COUNT | FLAG_COUNT | FLAG_SUM | FLAG_MIN | FLAG_MAX | FLAG_INDEX_MAPPING_LOG
|
||||
) {
|
||||
return true;
|
||||
}
|
||||
let Some(flag_type) = FlagType::from_byte(b) else {
|
||||
return false;
|
||||
};
|
||||
let subflag = b >> 2;
|
||||
match flag_type {
|
||||
FlagType::PositiveStore | FlagType::NegativeStore => (1..=3).contains(&subflag),
|
||||
FlagType::IndexMapping => subflag <= 4, // LOG=0, LOG_LINEAR=1 .. LOG_QUARTIC=4
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Tests
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::{Config, DDSketch};
|
||||
|
||||
// --- VarEncoding unit tests ---
|
||||
|
||||
#[test]
|
||||
fn test_unsigned_var_long_zero() {
|
||||
let mut buf = Vec::new();
|
||||
encode_unsigned_var_long(&mut buf, 0);
|
||||
assert_eq!(buf, [0x00]);
|
||||
|
||||
let mut input = buf.as_slice();
|
||||
assert_eq!(decode_unsigned_var_long(&mut input).unwrap(), 0);
|
||||
assert!(input.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_unsigned_var_long_small() {
|
||||
let mut buf = Vec::new();
|
||||
encode_unsigned_var_long(&mut buf, 1);
|
||||
assert_eq!(buf, [0x01]);
|
||||
|
||||
let mut input = buf.as_slice();
|
||||
assert_eq!(decode_unsigned_var_long(&mut input).unwrap(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_unsigned_var_long_128() {
|
||||
let mut buf = Vec::new();
|
||||
encode_unsigned_var_long(&mut buf, 128);
|
||||
assert_eq!(buf, [0x80, 0x01]);
|
||||
|
||||
let mut input = buf.as_slice();
|
||||
assert_eq!(decode_unsigned_var_long(&mut input).unwrap(), 128);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_unsigned_var_long_roundtrip() {
|
||||
for v in [0u64, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX] {
|
||||
let mut buf = Vec::new();
|
||||
encode_unsigned_var_long(&mut buf, v);
|
||||
let mut input = buf.as_slice();
|
||||
let decoded = decode_unsigned_var_long(&mut input).unwrap();
|
||||
assert_eq!(decoded, v, "roundtrip failed for {}", v);
|
||||
assert!(input.is_empty());
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_signed_var_long_roundtrip() {
|
||||
for v in [0i64, 1, -1, 63, -64, 64, -65, i64::MAX, i64::MIN] {
|
||||
let mut buf = Vec::new();
|
||||
encode_signed_var_long(&mut buf, v);
|
||||
let mut input = buf.as_slice();
|
||||
let decoded = decode_signed_var_long(&mut input).unwrap();
|
||||
assert_eq!(decoded, v, "roundtrip failed for {}", v);
|
||||
assert!(input.is_empty());
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_var_double_roundtrip() {
|
||||
for v in [0.0, 1.0, 2.0, 5.0, 15.0, 42.0, 100.0, 1e-9, 1e15, 0.5, 7.77] {
|
||||
let mut buf = Vec::new();
|
||||
encode_var_double(&mut buf, v);
|
||||
let mut input = buf.as_slice();
|
||||
let decoded = decode_var_double(&mut input).unwrap();
|
||||
assert!(
|
||||
(decoded - v).abs() < 1e-15 || decoded == v,
|
||||
"roundtrip failed for {}: got {}",
|
||||
v,
|
||||
decoded,
|
||||
);
|
||||
assert!(input.is_empty());
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_var_double_small_integers() {
|
||||
let mut buf = Vec::new();
|
||||
encode_var_double(&mut buf, 1.0);
|
||||
assert_eq!(buf.len(), 1, "VarDouble(1.0) should be 1 byte");
|
||||
|
||||
buf.clear();
|
||||
encode_var_double(&mut buf, 5.0);
|
||||
assert_eq!(buf.len(), 1, "VarDouble(5.0) should be 1 byte");
|
||||
}
|
||||
|
||||
// --- DDSketch encode/decode roundtrip tests ---
|
||||
|
||||
#[test]
|
||||
fn test_encode_empty_sketch() {
|
||||
let sketch = DDSketch::new(Config::defaults());
|
||||
let bytes = sketch.to_java_bytes();
|
||||
assert!(!bytes.is_empty());
|
||||
|
||||
let decoded = DDSketch::from_java_bytes(&bytes).unwrap();
|
||||
assert_eq!(decoded.count(), 0);
|
||||
assert_eq!(decoded.min(), None);
|
||||
assert_eq!(decoded.max(), None);
|
||||
assert_eq!(decoded.sum(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_encode_simple_sketch() {
|
||||
let mut sketch = DDSketch::new(Config::defaults());
|
||||
for v in [1.0, 2.0, 3.0, 4.0, 5.0] {
|
||||
sketch.add(v);
|
||||
}
|
||||
|
||||
let bytes = sketch.to_java_bytes();
|
||||
let decoded = DDSketch::from_java_bytes(&bytes).unwrap();
|
||||
|
||||
assert_eq!(decoded.count(), 5);
|
||||
assert_eq!(decoded.min(), Some(1.0));
|
||||
assert_eq!(decoded.max(), Some(5.0));
|
||||
assert_eq!(decoded.sum(), Some(15.0));
|
||||
|
||||
assert_quantiles_match(&sketch, &decoded, &[0.5, 0.9, 0.95, 0.99]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_encode_single_value() {
|
||||
let mut sketch = DDSketch::new(Config::defaults());
|
||||
sketch.add(42.0);
|
||||
|
||||
let bytes = sketch.to_java_bytes();
|
||||
let decoded = DDSketch::from_java_bytes(&bytes).unwrap();
|
||||
|
||||
assert_eq!(decoded.count(), 1);
|
||||
assert_eq!(decoded.min(), Some(42.0));
|
||||
assert_eq!(decoded.max(), Some(42.0));
|
||||
assert_eq!(decoded.sum(), Some(42.0));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_encode_negative_values() {
|
||||
let mut sketch = DDSketch::new(Config::defaults());
|
||||
for v in [-3.0, -1.0, 2.0, 5.0] {
|
||||
sketch.add(v);
|
||||
}
|
||||
|
||||
let bytes = sketch.to_java_bytes();
|
||||
let decoded = DDSketch::from_java_bytes(&bytes).unwrap();
|
||||
|
||||
assert_eq!(decoded.count(), 4);
|
||||
assert_eq!(decoded.min(), Some(-3.0));
|
||||
assert_eq!(decoded.max(), Some(5.0));
|
||||
assert_eq!(decoded.sum(), Some(3.0));
|
||||
|
||||
assert_quantiles_match(&sketch, &decoded, &[0.0, 0.25, 0.5, 0.75, 1.0]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_encode_with_zero_value() {
|
||||
let mut sketch = DDSketch::new(Config::defaults());
|
||||
for v in [0.0, 1.0, 2.0] {
|
||||
sketch.add(v);
|
||||
}
|
||||
|
||||
let bytes = sketch.to_java_bytes();
|
||||
let decoded = DDSketch::from_java_bytes(&bytes).unwrap();
|
||||
|
||||
assert_eq!(decoded.count(), 3);
|
||||
assert_eq!(decoded.min(), Some(0.0));
|
||||
assert_eq!(decoded.max(), Some(2.0));
|
||||
assert_eq!(decoded.sum(), Some(3.0));
|
||||
assert_eq!(decoded.zero_count, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_encode_large_range() {
|
||||
let mut sketch = DDSketch::new(Config::defaults());
|
||||
sketch.add(0.001);
|
||||
sketch.add(1_000_000.0);
|
||||
|
||||
let bytes = sketch.to_java_bytes();
|
||||
let decoded = DDSketch::from_java_bytes(&bytes).unwrap();
|
||||
|
||||
assert_eq!(decoded.count(), 2);
|
||||
assert_eq!(decoded.min(), Some(0.001));
|
||||
assert_eq!(decoded.max(), Some(1_000_000.0));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_encode_with_version_prefix() {
|
||||
let mut sketch = DDSketch::new(Config::defaults());
|
||||
for v in [1.0, 2.0, 3.0] {
|
||||
sketch.add(v);
|
||||
}
|
||||
|
||||
let bytes = sketch.to_java_bytes();
|
||||
|
||||
// Simulate Java's toByteArrayV2: prepend 0x02
|
||||
let mut v2_bytes = vec![0x02];
|
||||
v2_bytes.extend_from_slice(&bytes);
|
||||
|
||||
let decoded = DDSketch::from_java_bytes(&v2_bytes).unwrap();
|
||||
assert_eq!(decoded.count(), 3);
|
||||
assert_eq!(decoded.min(), Some(1.0));
|
||||
assert_eq!(decoded.max(), Some(3.0));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_byte_level_encoding() {
|
||||
let mut sketch = DDSketch::new(Config::defaults());
|
||||
sketch.add(1.0);
|
||||
|
||||
let bytes = sketch.to_java_bytes();
|
||||
|
||||
assert_eq!(bytes[0], FLAG_COUNT, "first byte should be COUNT flag");
|
||||
assert!(
|
||||
bytes.contains(&FLAG_INDEX_MAPPING_LOG),
|
||||
"should contain index mapping flag"
|
||||
);
|
||||
}
|
||||
|
||||
// --- Cross-language golden byte tests ---
|
||||
//
|
||||
// Golden bytes generated by Java's DDSketchWithExactSummaryStatistics.encode()
|
||||
// using LogarithmicMapping(0.01) + CollapsingLowestDenseStore(2048).
|
||||
|
||||
const GOLDEN_SIMPLE: &str = "a00588000000000000f03f8c0000000000001440840000000000002e4002fd4a815abf52f03f000000000000000005050002440228021e021602";
|
||||
const GOLDEN_SINGLE: &str = "a0028800000000000045408c000000000000454084000000000000454002fd4a815abf52f03f00000000000000000501f40202";
|
||||
const GOLDEN_NEGATIVE: &str = "a084408800000000000008c08c000000000000144084000000000000084002fd4a815abf52f03f0000000000000000050244025c02070200026c02";
|
||||
const GOLDEN_ZERO: &str = "a0048800000000000000008c000000000000004084000000000000084002fd4a815abf52f03f00000000000000000402050200024402";
|
||||
const GOLDEN_EMPTY: &str = "02fd4a815abf52f03f0000000000000000";
|
||||
const GOLDEN_MANY: &str = "a08d1488000000000000f03f8c0000000000005940840000000000bab34002fd4a815abf52f03f000000000000000005550002440228021e021602120210020c020c020c0208020a020802060208020602060206020602040206020402040204020402040204020402040204020202040202020402020204020202020204020202020202020402020202020202020202020202020202020202020202020202020202020203020202020202020302020202020302020202020302020203020202030202020302030202020302030203020202030203020302030202";
|
||||
|
||||
fn hex_to_bytes(hex: &str) -> Vec<u8> {
|
||||
(0..hex.len())
|
||||
.step_by(2)
|
||||
.map(|i| u8::from_str_radix(&hex[i..i + 2], 16).unwrap())
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn bytes_to_hex(bytes: &[u8]) -> String {
|
||||
bytes.iter().map(|b| format!("{b:02x}")).collect()
|
||||
}
|
||||
|
||||
fn assert_golden(label: &str, sketch: &DDSketch, golden_hex: &str) {
|
||||
let bytes = sketch.to_java_bytes();
|
||||
let expected = hex_to_bytes(golden_hex);
|
||||
assert_eq!(
|
||||
bytes,
|
||||
expected,
|
||||
"Rust encoding doesn't match Java golden bytes for {}.\nRust: {}\nJava: {}",
|
||||
label,
|
||||
bytes_to_hex(&bytes),
|
||||
golden_hex,
|
||||
);
|
||||
}
|
||||
|
||||
fn assert_quantiles_match(a: &DDSketch, b: &DDSketch, quantiles: &[f64]) {
|
||||
for &q in quantiles {
|
||||
let va = a.quantile(q).unwrap().unwrap();
|
||||
let vb = b.quantile(q).unwrap().unwrap();
|
||||
assert!(
|
||||
(va - vb).abs() / va.abs().max(1e-15) < 1e-12,
|
||||
"quantile({}) mismatch: {} vs {}",
|
||||
q,
|
||||
va,
|
||||
vb,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cross_language_simple() {
|
||||
let mut sketch = DDSketch::new(Config::defaults());
|
||||
for v in [1.0, 2.0, 3.0, 4.0, 5.0] {
|
||||
sketch.add(v);
|
||||
}
|
||||
assert_golden("SIMPLE", &sketch, GOLDEN_SIMPLE);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cross_language_single() {
|
||||
let mut sketch = DDSketch::new(Config::defaults());
|
||||
sketch.add(42.0);
|
||||
assert_golden("SINGLE", &sketch, GOLDEN_SINGLE);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cross_language_negative() {
|
||||
let mut sketch = DDSketch::new(Config::defaults());
|
||||
for v in [-3.0, -1.0, 2.0, 5.0] {
|
||||
sketch.add(v);
|
||||
}
|
||||
assert_golden("NEGATIVE", &sketch, GOLDEN_NEGATIVE);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cross_language_zero() {
|
||||
let mut sketch = DDSketch::new(Config::defaults());
|
||||
for v in [0.0, 1.0, 2.0] {
|
||||
sketch.add(v);
|
||||
}
|
||||
assert_golden("ZERO", &sketch, GOLDEN_ZERO);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cross_language_empty() {
|
||||
let sketch = DDSketch::new(Config::defaults());
|
||||
assert_golden("EMPTY", &sketch, GOLDEN_EMPTY);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cross_language_many() {
|
||||
let mut sketch = DDSketch::new(Config::defaults());
|
||||
for i in 1..=100 {
|
||||
sketch.add(i as f64);
|
||||
}
|
||||
assert_golden("MANY", &sketch, GOLDEN_MANY);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_decode_java_golden_bytes() {
|
||||
for (name, hex) in [
|
||||
("SIMPLE", GOLDEN_SIMPLE),
|
||||
("SINGLE", GOLDEN_SINGLE),
|
||||
("NEGATIVE", GOLDEN_NEGATIVE),
|
||||
("ZERO", GOLDEN_ZERO),
|
||||
("EMPTY", GOLDEN_EMPTY),
|
||||
("MANY", GOLDEN_MANY),
|
||||
] {
|
||||
let bytes = hex_to_bytes(hex);
|
||||
let result = DDSketch::from_java_bytes(&bytes);
|
||||
assert!(
|
||||
result.is_ok(),
|
||||
"failed to decode {}: {:?}",
|
||||
name,
|
||||
result.err()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_encode_decode_many_values() {
|
||||
let mut sketch = DDSketch::new(Config::defaults());
|
||||
for i in 1..=100 {
|
||||
sketch.add(i as f64);
|
||||
}
|
||||
|
||||
let bytes = sketch.to_java_bytes();
|
||||
let decoded = DDSketch::from_java_bytes(&bytes).unwrap();
|
||||
|
||||
assert_eq!(decoded.count(), 100);
|
||||
assert_eq!(decoded.min(), Some(1.0));
|
||||
assert_eq!(decoded.max(), Some(100.0));
|
||||
assert_eq!(decoded.sum(), Some(5050.0));
|
||||
|
||||
let alpha = 0.01;
|
||||
let orig_p95 = sketch.quantile(0.95).unwrap().unwrap();
|
||||
let dec_p95 = decoded.quantile(0.95).unwrap().unwrap();
|
||||
assert!(
|
||||
(orig_p95 - dec_p95).abs() / orig_p95 < alpha,
|
||||
"p95 mismatch: {} vs {}",
|
||||
orig_p95,
|
||||
dec_p95,
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -1,52 +0,0 @@
|
||||
//! This crate provides a direct port of the [Golang](https://github.com/DataDog/sketches-go)
|
||||
//! [DDSketch](https://arxiv.org/pdf/1908.10693.pdf) implementation to Rust. All efforts
|
||||
//! have been made to keep this as close to the original implementation as possible, with a few
|
||||
//! tweaks to get closer to idiomatic Rust.
|
||||
//!
|
||||
//! # Usage
|
||||
//!
|
||||
//! Add multiple samples to a DDSketch and invoke the `quantile` method to pull any quantile from
|
||||
//! 0.0* to *1.0*.
|
||||
//!
|
||||
//! ```rust
|
||||
//! use sketches_ddsketch::{Config, DDSketch};
|
||||
//!
|
||||
//! let c = Config::defaults();
|
||||
//! let mut d = DDSketch::new(c);
|
||||
//!
|
||||
//! d.add(1.0);
|
||||
//! d.add(1.0);
|
||||
//! d.add(1.0);
|
||||
//!
|
||||
//! let q = d.quantile(0.50).unwrap();
|
||||
//!
|
||||
//! assert!(q < Some(1.02));
|
||||
//! assert!(q > Some(0.98));
|
||||
//! ```
|
||||
//!
|
||||
//! Sketches can also be merged.
|
||||
//!
|
||||
//! ```rust
|
||||
//! use sketches_ddsketch::{Config, DDSketch};
|
||||
//!
|
||||
//! let c = Config::defaults();
|
||||
//! let mut d1 = DDSketch::new(c);
|
||||
//! let mut d2 = DDSketch::new(c);
|
||||
//!
|
||||
//! d1.add(1.0);
|
||||
//! d2.add(2.0);
|
||||
//! d2.add(2.0);
|
||||
//!
|
||||
//! d1.merge(&d2);
|
||||
//!
|
||||
//! assert_eq!(d1.count(), 3);
|
||||
//! ```
|
||||
|
||||
pub use self::config::Config;
|
||||
pub use self::ddsketch::{DDSketch, DDSketchError};
|
||||
pub use self::encoding::DecodeError;
|
||||
|
||||
mod config;
|
||||
mod ddsketch;
|
||||
pub mod encoding;
|
||||
mod store;
|
||||
@@ -1,252 +0,0 @@
|
||||
#[cfg(feature = "use_serde")]
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
const CHUNK_SIZE: i32 = 128;
|
||||
|
||||
// Divide the `dividend` by the `divisor`, rounding towards positive infinity.
|
||||
//
|
||||
// Similar to the nightly only `std::i32::div_ceil`.
|
||||
fn div_ceil(dividend: i32, divisor: i32) -> i32 {
|
||||
(dividend + divisor - 1) / divisor
|
||||
}
|
||||
|
||||
/// CollapsingLowestDenseStore
|
||||
#[derive(Clone, Debug)]
|
||||
#[cfg_attr(feature = "use_serde", derive(Serialize, Deserialize))]
|
||||
pub struct Store {
|
||||
pub(crate) bins: Vec<u64>,
|
||||
pub(crate) count: u64,
|
||||
pub(crate) min_key: i32,
|
||||
pub(crate) max_key: i32,
|
||||
pub(crate) offset: i32,
|
||||
pub(crate) bin_limit: usize,
|
||||
is_collapsed: bool,
|
||||
}
|
||||
|
||||
impl Store {
|
||||
pub fn new(bin_limit: usize) -> Self {
|
||||
Store {
|
||||
bins: Vec::new(),
|
||||
count: 0,
|
||||
min_key: i32::MAX,
|
||||
max_key: i32::MIN,
|
||||
offset: 0,
|
||||
bin_limit,
|
||||
is_collapsed: false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the number of bins.
|
||||
pub fn length(&self) -> i32 {
|
||||
self.bins.len() as i32
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.bins.is_empty()
|
||||
}
|
||||
|
||||
pub fn add(&mut self, key: i32) {
|
||||
let idx = self.get_index(key);
|
||||
self.bins[idx] += 1;
|
||||
self.count += 1;
|
||||
}
|
||||
|
||||
/// See Java: https://github.com/DataDog/sketches-java/blob/master/src/main/java/com/datadoghq/sketch/ddsketch/store/DenseStore.java (add(int index, double count) method)
|
||||
pub(crate) fn add_count(&mut self, key: i32, count: u64) {
|
||||
let idx = self.get_index(key);
|
||||
self.bins[idx] += count;
|
||||
self.count += count;
|
||||
}
|
||||
|
||||
fn get_index(&mut self, key: i32) -> usize {
|
||||
if key < self.min_key {
|
||||
if self.is_collapsed {
|
||||
return 0;
|
||||
}
|
||||
|
||||
self.extend_range(key, None);
|
||||
if self.is_collapsed {
|
||||
return 0;
|
||||
}
|
||||
} else if key > self.max_key {
|
||||
self.extend_range(key, None);
|
||||
}
|
||||
|
||||
(key - self.offset) as usize
|
||||
}
|
||||
|
||||
fn extend_range(&mut self, key: i32, second_key: Option<i32>) {
|
||||
let second_key = second_key.unwrap_or(key);
|
||||
let new_min_key = i32::min(key, i32::min(second_key, self.min_key));
|
||||
let new_max_key = i32::max(key, i32::max(second_key, self.max_key));
|
||||
|
||||
if self.is_empty() {
|
||||
let new_len = self.get_new_length(new_min_key, new_max_key);
|
||||
self.bins.resize(new_len, 0);
|
||||
self.offset = new_min_key;
|
||||
self.adjust(new_min_key, new_max_key);
|
||||
} else if new_min_key >= self.min_key && new_max_key < self.offset + self.length() {
|
||||
self.min_key = new_min_key;
|
||||
self.max_key = new_max_key;
|
||||
} else {
|
||||
// Grow bins
|
||||
let new_length = self.get_new_length(new_min_key, new_max_key);
|
||||
if new_length > self.length() as usize {
|
||||
self.bins.resize(new_length, 0);
|
||||
}
|
||||
self.adjust(new_min_key, new_max_key);
|
||||
}
|
||||
}
|
||||
|
||||
fn get_new_length(&self, new_min_key: i32, new_max_key: i32) -> usize {
|
||||
let desired_length = new_max_key - new_min_key + 1;
|
||||
usize::min(
|
||||
(CHUNK_SIZE * div_ceil(desired_length, CHUNK_SIZE)) as usize,
|
||||
self.bin_limit,
|
||||
)
|
||||
}
|
||||
|
||||
fn adjust(&mut self, new_min_key: i32, new_max_key: i32) {
|
||||
if new_max_key - new_min_key + 1 > self.length() {
|
||||
let new_min_key = new_max_key - self.length() + 1;
|
||||
|
||||
if new_min_key >= self.max_key {
|
||||
// Put everything in the first bin.
|
||||
self.offset = new_min_key;
|
||||
self.min_key = new_min_key;
|
||||
self.bins.fill(0);
|
||||
self.bins[0] = self.count;
|
||||
} else {
|
||||
let shift = self.offset - new_min_key;
|
||||
if shift < 0 {
|
||||
let collapse_start_index = (self.min_key - self.offset) as usize;
|
||||
let collapse_end_index = (new_min_key - self.offset) as usize;
|
||||
let collapsed_count: u64 = self.bins[collapse_start_index..collapse_end_index]
|
||||
.iter()
|
||||
.sum();
|
||||
let zero_len = (new_min_key - self.min_key) as usize;
|
||||
self.bins.splice(
|
||||
collapse_start_index..collapse_end_index,
|
||||
std::iter::repeat_n(0, zero_len),
|
||||
);
|
||||
self.bins[collapse_end_index] += collapsed_count;
|
||||
}
|
||||
self.min_key = new_min_key;
|
||||
self.shift_bins(shift);
|
||||
}
|
||||
|
||||
self.max_key = new_max_key;
|
||||
self.is_collapsed = true;
|
||||
} else {
|
||||
self.center_bins(new_min_key, new_max_key);
|
||||
self.min_key = new_min_key;
|
||||
self.max_key = new_max_key;
|
||||
}
|
||||
}
|
||||
|
||||
fn shift_bins(&mut self, shift: i32) {
|
||||
if shift > 0 {
|
||||
let shift = shift as usize;
|
||||
self.bins.rotate_right(shift);
|
||||
for idx in 0..shift {
|
||||
self.bins[idx] = 0;
|
||||
}
|
||||
} else {
|
||||
let shift = shift.unsigned_abs() as usize;
|
||||
for idx in 0..shift {
|
||||
self.bins[idx] = 0;
|
||||
}
|
||||
self.bins.rotate_left(shift);
|
||||
}
|
||||
|
||||
self.offset -= shift;
|
||||
}
|
||||
|
||||
fn center_bins(&mut self, new_min_key: i32, new_max_key: i32) {
|
||||
let middle_key = new_min_key + (new_max_key - new_min_key + 1) / 2;
|
||||
let shift = self.offset + self.length() / 2 - middle_key;
|
||||
self.shift_bins(shift)
|
||||
}
|
||||
|
||||
pub fn key_at_rank(&self, rank: u64) -> i32 {
|
||||
let mut n = 0;
|
||||
for (i, bin) in self.bins.iter().enumerate() {
|
||||
n += *bin;
|
||||
if n > rank {
|
||||
return i as i32 + self.offset;
|
||||
}
|
||||
}
|
||||
|
||||
self.max_key
|
||||
}
|
||||
|
||||
pub fn count(&self) -> u64 {
|
||||
self.count
|
||||
}
|
||||
|
||||
pub fn merge(&mut self, other: &Store) {
|
||||
if other.count == 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
if self.count == 0 {
|
||||
self.copy(other);
|
||||
return;
|
||||
}
|
||||
|
||||
if other.min_key < self.min_key || other.max_key > self.max_key {
|
||||
self.extend_range(other.min_key, Some(other.max_key));
|
||||
}
|
||||
|
||||
let collapse_start_index = other.min_key - other.offset;
|
||||
let mut collapse_end_index = i32::min(self.min_key, other.max_key + 1) - other.offset;
|
||||
if collapse_end_index > collapse_start_index {
|
||||
let collapsed_count: u64 = self.bins
|
||||
[collapse_start_index as usize..collapse_end_index as usize]
|
||||
.iter()
|
||||
.sum();
|
||||
self.bins[0] += collapsed_count;
|
||||
} else {
|
||||
collapse_end_index = collapse_start_index;
|
||||
}
|
||||
|
||||
for key in (collapse_end_index + other.offset)..(other.max_key + 1) {
|
||||
self.bins[(key - self.offset) as usize] += other.bins[(key - other.offset) as usize]
|
||||
}
|
||||
|
||||
self.count += other.count;
|
||||
}
|
||||
|
||||
fn copy(&mut self, o: &Store) {
|
||||
self.bins = o.bins.clone();
|
||||
self.count = o.count;
|
||||
self.min_key = o.min_key;
|
||||
self.max_key = o.max_key;
|
||||
self.offset = o.offset;
|
||||
self.bin_limit = o.bin_limit;
|
||||
self.is_collapsed = o.is_collapsed;
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::store::Store;
|
||||
|
||||
#[test]
|
||||
fn test_simple_store() {
|
||||
let mut s = Store::new(2048);
|
||||
|
||||
for i in 0..2048 {
|
||||
s.add(i);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_simple_store_rev() {
|
||||
let mut s = Store::new(2048);
|
||||
|
||||
for i in (0..2048).rev() {
|
||||
s.add(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,88 +0,0 @@
|
||||
use std::cmp::Ordering;
|
||||
use std::f64::NAN;
|
||||
|
||||
pub struct Dataset {
|
||||
values: Vec<f64>,
|
||||
sum: f64,
|
||||
sorted: bool,
|
||||
}
|
||||
|
||||
fn cmp_f64(a: &f64, b: &f64) -> Ordering {
|
||||
assert!(!a.is_nan() && !b.is_nan());
|
||||
|
||||
if a < b {
|
||||
return Ordering::Less;
|
||||
} else if a > b {
|
||||
return Ordering::Greater;
|
||||
} else {
|
||||
return Ordering::Equal;
|
||||
}
|
||||
}
|
||||
|
||||
impl Dataset {
|
||||
pub fn new() -> Self {
|
||||
Dataset {
|
||||
values: Vec::new(),
|
||||
sum: 0.0,
|
||||
sorted: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add(&mut self, value: f64) {
|
||||
self.values.push(value);
|
||||
self.sum += value;
|
||||
self.sorted = false;
|
||||
}
|
||||
|
||||
// pub fn quantile(&mut self, q: f64) -> f64 {
|
||||
// self.lower_quantile(q)
|
||||
// }
|
||||
|
||||
pub fn lower_quantile(&mut self, q: f64) -> f64 {
|
||||
if q < 0.0 || q > 1.0 || self.values.len() == 0 {
|
||||
return NAN;
|
||||
}
|
||||
|
||||
self.sort();
|
||||
let rank = q * (self.values.len() - 1) as f64;
|
||||
|
||||
self.values[rank.floor() as usize]
|
||||
}
|
||||
|
||||
pub fn upper_quantile(&mut self, q: f64) -> f64 {
|
||||
if q < 0.0 || q > 1.0 || self.values.len() == 0 {
|
||||
return NAN;
|
||||
}
|
||||
|
||||
self.sort();
|
||||
let rank = q * (self.values.len() - 1) as f64;
|
||||
self.values[rank.ceil() as usize]
|
||||
}
|
||||
|
||||
pub fn min(&mut self) -> f64 {
|
||||
self.sort();
|
||||
self.values[0]
|
||||
}
|
||||
|
||||
pub fn max(&mut self) -> f64 {
|
||||
self.sort();
|
||||
self.values[self.values.len() - 1]
|
||||
}
|
||||
|
||||
pub fn sum(&self) -> f64 {
|
||||
self.sum
|
||||
}
|
||||
|
||||
pub fn count(&self) -> usize {
|
||||
self.values.len()
|
||||
}
|
||||
|
||||
fn sort(&mut self) {
|
||||
if self.sorted {
|
||||
return;
|
||||
}
|
||||
|
||||
self.values.sort_by(cmp_f64);
|
||||
self.sorted = true;
|
||||
}
|
||||
}
|
||||
@@ -1,100 +0,0 @@
|
||||
extern crate rand;
|
||||
extern crate rand_distr;
|
||||
|
||||
use rand::prelude::*;
|
||||
|
||||
pub trait Generator {
|
||||
fn generate(&mut self) -> f64;
|
||||
}
|
||||
|
||||
// Constant generator
|
||||
//
|
||||
pub struct Constant {
|
||||
value: f64,
|
||||
}
|
||||
impl Constant {
|
||||
pub fn new(value: f64) -> Self {
|
||||
Constant { value }
|
||||
}
|
||||
}
|
||||
impl Generator for Constant {
|
||||
fn generate(&mut self) -> f64 {
|
||||
self.value
|
||||
}
|
||||
}
|
||||
|
||||
// Linear generator
|
||||
//
|
||||
pub struct Linear {
|
||||
current_value: f64,
|
||||
step: f64,
|
||||
}
|
||||
impl Linear {
|
||||
pub fn new(start_value: f64, step: f64) -> Self {
|
||||
Linear {
|
||||
current_value: start_value,
|
||||
step,
|
||||
}
|
||||
}
|
||||
}
|
||||
impl Generator for Linear {
|
||||
fn generate(&mut self) -> f64 {
|
||||
let value = self.current_value;
|
||||
self.current_value += self.step;
|
||||
value
|
||||
}
|
||||
}
|
||||
|
||||
// Normal distribution generator
|
||||
//
|
||||
pub struct Normal {
|
||||
distr: rand_distr::Normal<f64>,
|
||||
}
|
||||
impl Normal {
|
||||
pub fn new(mean: f64, stddev: f64) -> Self {
|
||||
Normal {
|
||||
distr: rand_distr::Normal::new(mean, stddev).unwrap(),
|
||||
}
|
||||
}
|
||||
}
|
||||
impl Generator for Normal {
|
||||
fn generate(&mut self) -> f64 {
|
||||
self.distr.sample(&mut rand::thread_rng())
|
||||
}
|
||||
}
|
||||
|
||||
// Lognormal distribution generator
|
||||
//
|
||||
pub struct Lognormal {
|
||||
distr: rand_distr::LogNormal<f64>,
|
||||
}
|
||||
impl Lognormal {
|
||||
pub fn new(mean: f64, stddev: f64) -> Self {
|
||||
Lognormal {
|
||||
distr: rand_distr::LogNormal::new(mean, stddev).unwrap(),
|
||||
}
|
||||
}
|
||||
}
|
||||
impl Generator for Lognormal {
|
||||
fn generate(&mut self) -> f64 {
|
||||
self.distr.sample(&mut rand::thread_rng())
|
||||
}
|
||||
}
|
||||
|
||||
// Exponential distribution generator
|
||||
//
|
||||
pub struct Exponential {
|
||||
distr: rand_distr::Exp<f64>,
|
||||
}
|
||||
impl Exponential {
|
||||
pub fn new(lambda: f64) -> Self {
|
||||
Exponential {
|
||||
distr: rand_distr::Exp::new(lambda).unwrap(),
|
||||
}
|
||||
}
|
||||
}
|
||||
impl Generator for Exponential {
|
||||
fn generate(&mut self) -> f64 {
|
||||
self.distr.sample(&mut rand::thread_rng())
|
||||
}
|
||||
}
|
||||
@@ -1,2 +0,0 @@
|
||||
pub mod dataset;
|
||||
pub mod generator;
|
||||
@@ -1,316 +0,0 @@
|
||||
mod common;
|
||||
use std::time::Instant;
|
||||
|
||||
use common::dataset::Dataset;
|
||||
use common::generator;
|
||||
use common::generator::Generator;
|
||||
use sketches_ddsketch::{Config, DDSketch};
|
||||
|
||||
const TEST_ALPHA: f64 = 0.01;
|
||||
const TEST_MAX_BINS: u32 = 1024;
|
||||
const TEST_MIN_VALUE: f64 = 1.0e-9;
|
||||
|
||||
// Used for float equality
|
||||
const TEST_ERROR_THRESH: f64 = 1.0e-9;
|
||||
|
||||
const TEST_SIZES: [usize; 5] = [3, 5, 10, 100, 1000];
|
||||
const TEST_QUANTILES: [f64; 10] = [0.0, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99, 0.999, 1.0];
|
||||
|
||||
#[test]
|
||||
fn test_constant() {
|
||||
evaluate_sketches(|| Box::new(generator::Constant::new(42.0)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_linear() {
|
||||
evaluate_sketches(|| Box::new(generator::Linear::new(0.0, 1.0)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_normal() {
|
||||
evaluate_sketches(|| Box::new(generator::Normal::new(35.0, 1.0)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_lognormal() {
|
||||
evaluate_sketches(|| Box::new(generator::Lognormal::new(0.0, 2.0)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_exponential() {
|
||||
evaluate_sketches(|| Box::new(generator::Exponential::new(2.0)));
|
||||
}
|
||||
|
||||
fn evaluate_test_sizes(f: impl Fn(usize)) {
|
||||
for sz in &TEST_SIZES {
|
||||
f(*sz);
|
||||
}
|
||||
}
|
||||
|
||||
fn evaluate_sketches(gen_factory: impl Fn() -> Box<dyn generator::Generator>) {
|
||||
evaluate_test_sizes(|sz: usize| {
|
||||
let mut generator = gen_factory();
|
||||
evaluate_sketch(sz, &mut generator);
|
||||
});
|
||||
}
|
||||
|
||||
fn new_config() -> Config {
|
||||
Config::new(TEST_ALPHA, TEST_MAX_BINS, TEST_MIN_VALUE)
|
||||
}
|
||||
|
||||
fn assert_float_eq(a: f64, b: f64) {
|
||||
assert!((a - b).abs() < TEST_ERROR_THRESH, "{} != {}", a, b);
|
||||
}
|
||||
|
||||
fn evaluate_sketch(count: usize, generator: &mut Box<dyn generator::Generator>) {
|
||||
let c = new_config();
|
||||
let mut g = DDSketch::new(c);
|
||||
|
||||
let mut d = Dataset::new();
|
||||
|
||||
for _i in 0..count {
|
||||
let value = generator.generate();
|
||||
|
||||
g.add(value);
|
||||
d.add(value);
|
||||
}
|
||||
|
||||
compare_sketches(&mut d, &g);
|
||||
}
|
||||
|
||||
fn compare_sketches(d: &mut Dataset, g: &DDSketch) {
|
||||
for q in &TEST_QUANTILES {
|
||||
let lower = d.lower_quantile(*q);
|
||||
let upper = d.upper_quantile(*q);
|
||||
|
||||
let min_expected;
|
||||
if lower < 0.0 {
|
||||
min_expected = lower * (1.0 + TEST_ALPHA);
|
||||
} else {
|
||||
min_expected = lower * (1.0 - TEST_ALPHA);
|
||||
}
|
||||
|
||||
let max_expected;
|
||||
if upper > 0.0 {
|
||||
max_expected = upper * (1.0 + TEST_ALPHA);
|
||||
} else {
|
||||
max_expected = upper * (1.0 - TEST_ALPHA);
|
||||
}
|
||||
|
||||
let quantile = g.quantile(*q).unwrap().unwrap();
|
||||
|
||||
assert!(
|
||||
min_expected <= quantile,
|
||||
"Lower than min, quantile: {}, wanted {} <= {}",
|
||||
*q,
|
||||
min_expected,
|
||||
quantile
|
||||
);
|
||||
assert!(
|
||||
quantile <= max_expected,
|
||||
"Higher than max, quantile: {}, wanted {} <= {}",
|
||||
*q,
|
||||
quantile,
|
||||
max_expected
|
||||
);
|
||||
|
||||
// verify that calls do not modify result (not mut so not possible?)
|
||||
let quantile2 = g.quantile(*q).unwrap().unwrap();
|
||||
assert_eq!(quantile, quantile2);
|
||||
}
|
||||
|
||||
assert_eq!(g.min().unwrap(), d.min());
|
||||
assert_eq!(g.max().unwrap(), d.max());
|
||||
assert_float_eq(g.sum().unwrap(), d.sum());
|
||||
assert_eq!(g.count(), d.count());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_merge_normal() {
|
||||
evaluate_test_sizes(|sz: usize| {
|
||||
let c = new_config();
|
||||
let mut d = Dataset::new();
|
||||
let mut g1 = DDSketch::new(c);
|
||||
|
||||
let mut generator1 = generator::Normal::new(35.0, 1.0);
|
||||
for _ in (0..sz).step_by(3) {
|
||||
let value = generator1.generate();
|
||||
g1.add(value);
|
||||
d.add(value);
|
||||
}
|
||||
let mut g2 = DDSketch::new(c);
|
||||
let mut generator2 = generator::Normal::new(50.0, 2.0);
|
||||
for _ in (1..sz).step_by(3) {
|
||||
let value = generator2.generate();
|
||||
g2.add(value);
|
||||
d.add(value);
|
||||
}
|
||||
g1.merge(&g2).unwrap();
|
||||
|
||||
let mut g3 = DDSketch::new(c);
|
||||
let mut generator3 = generator::Normal::new(40.0, 0.5);
|
||||
for _ in (2..sz).step_by(3) {
|
||||
let value = generator3.generate();
|
||||
g3.add(value);
|
||||
d.add(value);
|
||||
}
|
||||
g1.merge(&g3).unwrap();
|
||||
|
||||
compare_sketches(&mut d, &g1);
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_merge_empty() {
|
||||
evaluate_test_sizes(|sz: usize| {
|
||||
let c = new_config();
|
||||
|
||||
let mut d = Dataset::new();
|
||||
|
||||
let mut g1 = DDSketch::new(c);
|
||||
let mut g2 = DDSketch::new(c);
|
||||
let mut generator = generator::Exponential::new(5.0);
|
||||
|
||||
for _ in 0..sz {
|
||||
let value = generator.generate();
|
||||
g2.add(value);
|
||||
d.add(value);
|
||||
}
|
||||
g1.merge(&g2).unwrap();
|
||||
compare_sketches(&mut d, &g1);
|
||||
|
||||
let g3 = DDSketch::new(c);
|
||||
g2.merge(&g3).unwrap();
|
||||
compare_sketches(&mut d, &g2);
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_merge_mixed() {
|
||||
evaluate_test_sizes(|sz: usize| {
|
||||
let c = new_config();
|
||||
let mut d = Dataset::new();
|
||||
let mut g1 = DDSketch::new(c);
|
||||
|
||||
let mut generator1 = generator::Normal::new(100.0, 1.0);
|
||||
for _ in (0..sz).step_by(3) {
|
||||
let value = generator1.generate();
|
||||
g1.add(value);
|
||||
d.add(value);
|
||||
}
|
||||
|
||||
let mut g2 = DDSketch::new(c);
|
||||
let mut generator2 = generator::Exponential::new(5.0);
|
||||
for _ in (1..sz).step_by(3) {
|
||||
let value = generator2.generate();
|
||||
g2.add(value);
|
||||
d.add(value);
|
||||
}
|
||||
g1.merge(&g2).unwrap();
|
||||
|
||||
let mut g3 = DDSketch::new(c);
|
||||
let mut generator3 = generator::Exponential::new(0.1);
|
||||
for _ in (2..sz).step_by(3) {
|
||||
let value = generator3.generate();
|
||||
g3.add(value);
|
||||
d.add(value);
|
||||
}
|
||||
g1.merge(&g3).unwrap();
|
||||
|
||||
compare_sketches(&mut d, &g1);
|
||||
})
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_merge_incompatible() {
|
||||
let c1 = Config::new(TEST_ALPHA, TEST_MAX_BINS, TEST_MIN_VALUE);
|
||||
let c2 = Config::new(TEST_ALPHA * 2.0, TEST_MAX_BINS, TEST_MIN_VALUE);
|
||||
|
||||
let mut d1 = DDSketch::new(c1);
|
||||
let d2 = DDSketch::new(c2);
|
||||
|
||||
assert!(d1.merge(&d2).is_err());
|
||||
|
||||
let c3 = Config::new(TEST_ALPHA, TEST_MAX_BINS, TEST_MIN_VALUE * 10.0);
|
||||
let d3 = DDSketch::new(c3);
|
||||
|
||||
assert!(d1.merge(&d3).is_err());
|
||||
|
||||
let c4 = Config::new(TEST_ALPHA, TEST_MAX_BINS * 2, TEST_MIN_VALUE);
|
||||
let d4 = DDSketch::new(c4);
|
||||
|
||||
assert!(d1.merge(&d4).is_err());
|
||||
|
||||
// the same should work
|
||||
let c5 = Config::new(TEST_ALPHA, TEST_MAX_BINS, TEST_MIN_VALUE);
|
||||
let dsame = DDSketch::new(c5);
|
||||
assert!(d1.merge(&dsame).is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_performance_insert() {
|
||||
let c = Config::defaults();
|
||||
let mut g = DDSketch::new(c);
|
||||
let mut gen = generator::Normal::new(1000.0, 500.0);
|
||||
let count = 300_000_000;
|
||||
|
||||
let mut values = Vec::new();
|
||||
for _ in 0..count {
|
||||
values.push(gen.generate());
|
||||
}
|
||||
|
||||
let start_time = Instant::now();
|
||||
for value in values {
|
||||
g.add(value);
|
||||
}
|
||||
|
||||
// This simply ensures the operations don't get optimzed out as ignored
|
||||
let quantile = g.quantile(0.50).unwrap().unwrap();
|
||||
|
||||
let elapsed = start_time.elapsed().as_micros() as f64;
|
||||
let elapsed = elapsed / 1_000_000.0;
|
||||
|
||||
println!(
|
||||
"RESULT: p50={:.2} => Added {}M samples in {:2} secs ({:.2}M samples/sec)",
|
||||
quantile,
|
||||
count / 1_000_000,
|
||||
elapsed,
|
||||
(count as f64) / 1_000_000.0 / elapsed
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_performance_merge() {
|
||||
let c = Config::defaults();
|
||||
let mut gen = generator::Normal::new(1000.0, 500.0);
|
||||
let merge_count = 500_000;
|
||||
let sample_count = 1_000;
|
||||
let mut sketches = Vec::new();
|
||||
|
||||
for _ in 0..merge_count {
|
||||
let mut d = DDSketch::new(c);
|
||||
for _ in 0..sample_count {
|
||||
d.add(gen.generate());
|
||||
}
|
||||
sketches.push(d);
|
||||
}
|
||||
|
||||
let mut base = DDSketch::new(c);
|
||||
|
||||
let start_time = Instant::now();
|
||||
for sketch in &sketches {
|
||||
base.merge(sketch).unwrap();
|
||||
}
|
||||
|
||||
let elapsed = start_time.elapsed().as_micros() as f64;
|
||||
let elapsed = elapsed / 1_000_000.0;
|
||||
|
||||
println!(
|
||||
"RESULT: Merged {} sketches in {:2} secs ({:.2} merges/sec)",
|
||||
merge_count,
|
||||
elapsed,
|
||||
(merge_count as f64) / elapsed
|
||||
);
|
||||
}
|
||||
@@ -331,7 +331,7 @@ mod tests {
|
||||
use crate::aggregation::AggregationCollector;
|
||||
use crate::query::AllQuery;
|
||||
use crate::schema::{Schema, FAST};
|
||||
use crate::Index;
|
||||
use crate::{assert_nearly_equals, Index};
|
||||
|
||||
#[test]
|
||||
fn test_aggregation_percentiles_empty_index() -> crate::Result<()> {
|
||||
@@ -614,13 +614,17 @@ mod tests {
|
||||
let res = exec_request_with_query(agg_req, &index, None)?;
|
||||
assert_eq!(res["range_with_stats"]["buckets"][0]["doc_count"], 3);
|
||||
|
||||
assert_eq!(
|
||||
res["range_with_stats"]["buckets"][0]["percentiles"]["values"]["1.0"],
|
||||
5.002829575110705
|
||||
assert_nearly_equals!(
|
||||
res["range_with_stats"]["buckets"][0]["percentiles"]["values"]["1.0"]
|
||||
.as_f64()
|
||||
.unwrap(),
|
||||
5.0028295751107414
|
||||
);
|
||||
assert_eq!(
|
||||
res["range_with_stats"]["buckets"][0]["percentiles"]["values"]["99.0"],
|
||||
10.07469668951133
|
||||
assert_nearly_equals!(
|
||||
res["range_with_stats"]["buckets"][0]["percentiles"]["values"]["99.0"]
|
||||
.as_f64()
|
||||
.unwrap(),
|
||||
10.07469668951144
|
||||
);
|
||||
|
||||
Ok(())
|
||||
@@ -665,8 +669,14 @@ mod tests {
|
||||
|
||||
let res = exec_request_with_query(agg_req, &index, None)?;
|
||||
|
||||
assert_eq!(res["percentiles"]["values"]["1.0"], 5.002829575110705);
|
||||
assert_eq!(res["percentiles"]["values"]["99.0"], 10.07469668951133);
|
||||
assert_nearly_equals!(
|
||||
res["percentiles"]["values"]["1.0"].as_f64().unwrap(),
|
||||
5.0028295751107414
|
||||
);
|
||||
assert_nearly_equals!(
|
||||
res["percentiles"]["values"]["99.0"].as_f64().unwrap(),
|
||||
10.07469668951144
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -21,7 +21,7 @@ use std::path::PathBuf;
|
||||
pub use common::file_slice::{FileHandle, FileSlice};
|
||||
pub use common::{AntiCallToken, OwnedBytes, TerminatingWrite};
|
||||
|
||||
pub(crate) use self::composite_file::{CompositeFile, CompositeWrite};
|
||||
pub use self::composite_file::{CompositeFile, CompositeWrite};
|
||||
pub use self::directory::{Directory, DirectoryClone, DirectoryLock};
|
||||
pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
|
||||
pub use self::ram_directory::RamDirectory;
|
||||
@@ -52,7 +52,7 @@ pub use self::mmap_directory::MmapDirectory;
|
||||
///
|
||||
/// `WritePtr` are required to implement both Write
|
||||
/// and Seek.
|
||||
pub type WritePtr = BufWriter<Box<dyn TerminatingWrite>>;
|
||||
pub type WritePtr = BufWriter<Box<dyn TerminatingWrite + Send + Sync>>;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
@@ -94,7 +94,7 @@ impl MergePolicy for LogMergePolicy {
|
||||
fn compute_merge_candidates(&self, segments: &[SegmentMeta]) -> Vec<MergeCandidate> {
|
||||
let size_sorted_segments = segments
|
||||
.iter()
|
||||
.filter(|seg| seg.num_docs() <= (self.max_docs_before_merge as u32))
|
||||
.filter(|seg| (seg.num_docs() as usize) <= self.max_docs_before_merge)
|
||||
.sorted_by_key(|seg| std::cmp::Reverse(seg.max_doc()))
|
||||
.collect::<Vec<&SegmentMeta>>();
|
||||
|
||||
@@ -372,4 +372,21 @@ mod tests {
|
||||
assert_eq!(merge_candidates[0].0.len(), 1);
|
||||
assert_eq!(merge_candidates[0].0[0], test_input[1].id());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_max_docs_before_merge_large_value() {
|
||||
// Regression test: (max_docs_before_merge as u32) truncates values > u32::MAX.
|
||||
// Casting num_docs() to usize instead avoids the truncation.
|
||||
let mut policy = LogMergePolicy::default();
|
||||
policy.set_min_num_segments(2);
|
||||
policy.set_max_docs_before_merge(5_000_000_000usize);
|
||||
let test_input = vec![
|
||||
create_random_segment_meta(100_000),
|
||||
create_random_segment_meta(100_000),
|
||||
];
|
||||
let result = policy.compute_merge_candidates(&test_input);
|
||||
// Both segments should be eligible (100_000 < 5_000_000_000)
|
||||
assert_eq!(result.len(), 1);
|
||||
assert_eq!(result[0].0.len(), 2);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -403,7 +403,8 @@ impl SegmentUpdater {
|
||||
// from the different drives.
|
||||
//
|
||||
// Segment 1 from disk 1, Segment 1 from disk 2, etc.
|
||||
committed_segment_metas.sort_by_key(|segment_meta| -(segment_meta.max_doc() as i32));
|
||||
committed_segment_metas
|
||||
.sort_by_key(|segment_meta| std::cmp::Reverse(segment_meta.max_doc()));
|
||||
let index_meta = IndexMeta {
|
||||
index_settings: index.settings().clone(),
|
||||
segments: committed_segment_metas,
|
||||
@@ -705,6 +706,7 @@ mod tests {
|
||||
use crate::collector::TopDocs;
|
||||
use crate::directory::RamDirectory;
|
||||
use crate::fastfield::AliveBitSet;
|
||||
use crate::index::{SegmentId, SegmentMetaInventory};
|
||||
use crate::indexer::merge_policy::tests::MergeWheneverPossible;
|
||||
use crate::indexer::merger::IndexMerger;
|
||||
use crate::indexer::segment_updater::merge_filtered_segments;
|
||||
@@ -712,6 +714,22 @@ mod tests {
|
||||
use crate::schema::*;
|
||||
use crate::{Directory, DocAddress, Index, Segment};
|
||||
|
||||
#[test]
|
||||
fn test_segment_sort_large_max_doc() {
|
||||
// Regression test: -(max_doc as i32) overflows for max_doc >= 2^31.
|
||||
// Using std::cmp::Reverse avoids this.
|
||||
let inventory = SegmentMetaInventory::default();
|
||||
let mut metas = vec![
|
||||
inventory.new_segment_meta(SegmentId::generate_random(), 100),
|
||||
inventory.new_segment_meta(SegmentId::generate_random(), (1u32 << 31) - 1),
|
||||
inventory.new_segment_meta(SegmentId::generate_random(), 50_000),
|
||||
];
|
||||
metas.sort_by_key(|m| std::cmp::Reverse(m.max_doc()));
|
||||
assert_eq!(metas[0].max_doc(), (1u32 << 31) - 1);
|
||||
assert_eq!(metas[1].max_doc(), 50_000);
|
||||
assert_eq!(metas[2].max_doc(), 100);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_delete_during_merge() -> crate::Result<()> {
|
||||
let mut schema_builder = Schema::builder();
|
||||
|
||||
@@ -169,8 +169,10 @@ mod macros;
|
||||
mod future_result;
|
||||
|
||||
// Re-exports
|
||||
pub use columnar;
|
||||
pub use common::{ByteCount, DateTime};
|
||||
pub use {columnar, query_grammar, time};
|
||||
pub use query_grammar;
|
||||
pub use time;
|
||||
|
||||
pub use crate::error::TantivyError;
|
||||
pub use crate::future_result::FutureResult;
|
||||
|
||||
@@ -14,7 +14,8 @@ mod postings;
|
||||
mod postings_writer;
|
||||
mod recorder;
|
||||
mod segment_postings;
|
||||
mod serializer;
|
||||
/// Serializer module for the inverted index
|
||||
pub mod serializer;
|
||||
mod skip;
|
||||
mod term_info;
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ use crate::positions::PositionSerializer;
|
||||
use crate::postings::compression::{BlockEncoder, VIntEncoder, COMPRESSION_BLOCK_SIZE};
|
||||
use crate::postings::skip::SkipSerializer;
|
||||
use crate::query::Bm25Weight;
|
||||
use crate::schema::{Field, FieldEntry, FieldType, IndexRecordOption, Schema};
|
||||
use crate::schema::{Field, FieldEntry, IndexRecordOption, Schema};
|
||||
use crate::termdict::TermDictionaryBuilder;
|
||||
use crate::{DocId, Score};
|
||||
|
||||
@@ -80,9 +80,12 @@ impl InvertedIndexSerializer {
|
||||
let term_dictionary_write = self.terms_write.for_field(field);
|
||||
let postings_write = self.postings_write.for_field(field);
|
||||
let positions_write = self.positions_write.for_field(field);
|
||||
let field_type: FieldType = (*field_entry.field_type()).clone();
|
||||
let index_record_option = field_entry
|
||||
.field_type()
|
||||
.index_record_option()
|
||||
.unwrap_or(IndexRecordOption::Basic);
|
||||
FieldSerializer::create(
|
||||
&field_type,
|
||||
index_record_option,
|
||||
total_num_tokens,
|
||||
term_dictionary_write,
|
||||
postings_write,
|
||||
@@ -102,29 +105,27 @@ impl InvertedIndexSerializer {
|
||||
|
||||
/// The field serializer is in charge of
|
||||
/// the serialization of a specific field.
|
||||
pub struct FieldSerializer<'a> {
|
||||
term_dictionary_builder: TermDictionaryBuilder<&'a mut CountingWriter<WritePtr>>,
|
||||
pub struct FieldSerializer<'a, W: Write = WritePtr> {
|
||||
term_dictionary_builder: TermDictionaryBuilder<&'a mut CountingWriter<W>>,
|
||||
postings_serializer: PostingsSerializer,
|
||||
positions_serializer_opt: Option<PositionSerializer<&'a mut CountingWriter<WritePtr>>>,
|
||||
positions_serializer_opt: Option<PositionSerializer<&'a mut CountingWriter<W>>>,
|
||||
current_term_info: TermInfo,
|
||||
term_open: bool,
|
||||
postings_write: &'a mut CountingWriter<WritePtr>,
|
||||
postings_write: &'a mut CountingWriter<W>,
|
||||
postings_start_offset: u64,
|
||||
}
|
||||
|
||||
impl<'a> FieldSerializer<'a> {
|
||||
fn create(
|
||||
field_type: &FieldType,
|
||||
impl<'a, W: Write> FieldSerializer<'a, W> {
|
||||
/// Creates a new `FieldSerializer` for the given field type.
|
||||
pub fn create(
|
||||
index_record_option: IndexRecordOption,
|
||||
total_num_tokens: u64,
|
||||
term_dictionary_write: &'a mut CountingWriter<WritePtr>,
|
||||
postings_write: &'a mut CountingWriter<WritePtr>,
|
||||
positions_write: &'a mut CountingWriter<WritePtr>,
|
||||
term_dictionary_write: &'a mut CountingWriter<W>,
|
||||
postings_write: &'a mut CountingWriter<W>,
|
||||
positions_write: &'a mut CountingWriter<W>,
|
||||
fieldnorm_reader: Option<FieldNormReader>,
|
||||
) -> io::Result<FieldSerializer<'a>> {
|
||||
) -> io::Result<FieldSerializer<'a, W>> {
|
||||
total_num_tokens.serialize(postings_write)?;
|
||||
let index_record_option = field_type
|
||||
.index_record_option()
|
||||
.unwrap_or(IndexRecordOption::Basic);
|
||||
let term_dictionary_builder = TermDictionaryBuilder::create(term_dictionary_write)?;
|
||||
let average_fieldnorm = fieldnorm_reader
|
||||
.as_ref()
|
||||
@@ -192,6 +193,11 @@ impl<'a> FieldSerializer<'a> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Starts the postings for a new term without recording term frequencies.
|
||||
pub fn new_term_without_freq(&mut self, term: &[u8]) -> io::Result<()> {
|
||||
self.new_term(term, 0, false)
|
||||
}
|
||||
|
||||
/// Serialize the information that a document contains for the current term:
|
||||
/// its term frequency, and the position deltas.
|
||||
///
|
||||
@@ -297,6 +303,7 @@ impl Block {
|
||||
}
|
||||
}
|
||||
|
||||
/// Serializer for postings lists.
|
||||
pub struct PostingsSerializer {
|
||||
last_doc_id_encoded: u32,
|
||||
|
||||
@@ -316,6 +323,9 @@ pub struct PostingsSerializer {
|
||||
}
|
||||
|
||||
impl PostingsSerializer {
|
||||
/// Creates a new `PostingsSerializer`.
|
||||
/// * avg_fieldnorm - average field norm for the field being serialized.
|
||||
/// * mode - indexing options for the field being serialized.
|
||||
pub fn new(
|
||||
avg_fieldnorm: Score,
|
||||
mode: IndexRecordOption,
|
||||
@@ -338,6 +348,8 @@ impl PostingsSerializer {
|
||||
}
|
||||
}
|
||||
|
||||
/// Starts the serialization for a new term.
|
||||
/// * term_doc_freq - the number of documents containing the term.
|
||||
pub fn new_term(&mut self, term_doc_freq: u32, record_term_freq: bool) {
|
||||
self.bm25_weight = None;
|
||||
|
||||
@@ -377,6 +389,7 @@ impl PostingsSerializer {
|
||||
self.postings_write.extend(block_encoded);
|
||||
}
|
||||
if self.term_has_freq {
|
||||
// encode the term frequencies
|
||||
let (num_bits, block_encoded): (u8, &[u8]) = self
|
||||
.block_encoder
|
||||
.compress_block_unsorted(self.block.term_freqs(), true);
|
||||
@@ -417,6 +430,9 @@ impl PostingsSerializer {
|
||||
self.block.clear();
|
||||
}
|
||||
|
||||
/// Register that the given document contains the current term.
|
||||
/// * doc_id - the document id.
|
||||
/// * term_freq - the term frequency within the document.
|
||||
pub fn write_doc(&mut self, doc_id: DocId, term_freq: u32) {
|
||||
self.block.append_doc(doc_id, term_freq);
|
||||
if self.block.is_full() {
|
||||
@@ -424,6 +440,7 @@ impl PostingsSerializer {
|
||||
}
|
||||
}
|
||||
|
||||
/// Finish the serialization for this term.
|
||||
pub fn close_term(
|
||||
&mut self,
|
||||
doc_freq: u32,
|
||||
|
||||
@@ -14,7 +14,11 @@ use crate::{DocId, Score, TERMINATED};
|
||||
// (requiring a 6th bit), but the biggest doc_id we can want to encode is TERMINATED-1, which can
|
||||
// be represented on 31b without delta encoding.
|
||||
fn encode_bitwidth(bitwidth: u8, delta_1: bool) -> u8 {
|
||||
assert!(bitwidth < 32);
|
||||
assert!(
|
||||
bitwidth < 32,
|
||||
"bitwidth needs to be less than 32, but got {}",
|
||||
bitwidth
|
||||
);
|
||||
bitwidth | ((delta_1 as u8) << 6)
|
||||
}
|
||||
|
||||
|
||||
@@ -302,8 +302,9 @@ where
|
||||
|| self.previous_key[keep_len] < key[keep_len];
|
||||
assert!(
|
||||
increasing_keys,
|
||||
"Keys should be increasing. ({:?} > {key:?})",
|
||||
self.previous_key
|
||||
"Keys should be increasing. ({:?} > {:?})",
|
||||
String::from_utf8_lossy(&self.previous_key),
|
||||
String::from_utf8_lossy(key),
|
||||
);
|
||||
self.previous_key.resize(key.len(), 0u8);
|
||||
self.previous_key[keep_len..].copy_from_slice(&key[keep_len..]);
|
||||
|
||||
Reference in New Issue
Block a user