Compare commits

..

12 Commits

Author SHA1 Message Date
Paul Masurel
e6fdba1d21 rebasing composite aggregations 2026-03-18 15:14:38 +01:00
Paul Masurel
ad5eb250fe Fix format 2026-03-16 10:41:11 +01:00
Paul Masurel
d02559a4d1 Update time deps to defensively address a vulnerability. (#2850)
Closes #2849

Co-authored-by: Paul Masurel <paul.masurel@datadoghq.com>
2026-03-12 16:47:11 +01:00
Anas Limem
1922abaf33 Fixed integer overflow in segment sorting and merge policy truncation (#2846) 2026-03-12 16:44:38 +01:00
trinity-1686a
d0c5ffb0aa Merge pull request #2842 from quickwit-oss/congxie/replaceHll
Use sketches-ddsketch fork with Java-compatible binary encoding
2026-02-20 16:56:56 +01:00
cong.xie
18fedd9384 Fix nightly fmt: merge crate imports in percentiles tests
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-19 14:18:54 -05:00
cong.xie
2098fca47f Restore use_serde feature and simplify PercentilesCollector
Keep use_serde on sketches-ddsketch so DDSketch derives
Serialize/Deserialize, removing the need for custom impls
on PercentilesCollector.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-19 14:13:17 -05:00
cong.xie
1251b40c93 Drop use_serde feature; use Java binary encoding for PercentilesCollector
Replace the derived Serialize/Deserialize on PercentilesCollector with
custom impls that use DDSketch's Java-compatible binary encoding
(encode_to_java_bytes / decode_from_java_bytes). This removes the need
for the use_serde feature on sketches-ddsketch entirely.

Also restore original float test values and use assert_nearly_equals!
for all float comparisons in percentile tests, since DDSketch quantile
estimates can have minor precision differences across platforms.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-19 13:32:28 -05:00
cong.xie
09a49b872c Use assert_nearly_equals! for float comparisons in percentile test
Address review feedback: replace assert_eq! with assert_nearly_equals!
for float values that go through JSON serialization roundtrips, which
can introduce minor precision differences.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-19 13:21:48 -05:00
cong.xie
b9ace002ce Replace vendored sketches-ddsketch with git dependency
Move the vendored sketches-ddsketch crate (with Java-compatible binary
encoding) to its own repo at quickwit-oss/rust-sketches-ddsketch and
reference it via git+rev in Cargo.toml.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-19 12:22:19 -05:00
Adrien Guillo
51f340f83d Merge pull request #2837 from quickwit-oss/congxie/replaceHll
Replace hyperloglogplus with Apache DataSketches HLL (lg_k=11)
2026-02-12 17:19:40 -05:00
PSeitz
57fe659fff make serializer pub (#2835)
some changes on the posting list serializer to make it usable in
other contexts.

Improve errors

Signed-off-by: Pascal Seitz <pascal.seitz@gmail.com>
2026-02-11 14:37:42 +01:00
38 changed files with 553 additions and 2943 deletions

View File

@@ -47,7 +47,7 @@ rustc-hash = "2.0.0"
thiserror = "2.0.1"
htmlescape = "0.3.1"
fail = { version = "0.5.0", optional = true }
time = { version = "0.3.35", features = ["serde-well-known"] }
time = { version = "0.3.47", features = ["serde-well-known"] }
smallvec = "1.8.0"
rayon = "1.5.2"
lru = "0.16.3"
@@ -64,7 +64,7 @@ query-grammar = { version = "0.25.0", path = "./query-grammar", package = "tanti
tantivy-bitpacker = { version = "0.9", path = "./bitpacker" }
common = { version = "0.10", path = "./common/", package = "tantivy-common" }
tokenizer-api = { version = "0.6", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
sketches-ddsketch = { path = "./sketches-ddsketch", features = ["use_serde"] }
sketches-ddsketch = { git = "https://github.com/quickwit-oss/rust-sketches-ddsketch.git", rev = "555caf1", features = ["use_serde"] }
datasketches = "0.2.0"
futures-util = { version = "0.3.28", optional = true }
futures-channel = { version = "0.3.28", optional = true }
@@ -86,7 +86,7 @@ futures = "0.3.21"
paste = "1.0.11"
more-asserts = "0.3.1"
rand_distr = "0.5"
time = { version = "0.3.10", features = ["serde-well-known", "macros"] }
time = { version = "0.3.47", features = ["serde-well-known", "macros"] }
postcard = { version = "1.0.4", features = [
"use-std",
], default-features = false }
@@ -144,7 +144,6 @@ members = [
"sstable",
"tokenizer-api",
"columnar",
"sketches-ddsketch",
]
# Following the "fail" crate best practises, we isolate
@@ -202,4 +201,3 @@ harness = false
[[bench]]
name = "regex_all_terms"
harness = false

View File

@@ -1,6 +1,5 @@
use binggan::plugins::PeakMemAllocPlugin;
use binggan::{black_box, InputGroup, PeakMemAlloc, INSTRUMENTED_SYSTEM};
use common::DateTime;
use rand::distr::weighted::WeightedIndex;
use rand::rngs::StdRng;
use rand::seq::IndexedRandom;
@@ -11,7 +10,7 @@ use tantivy::aggregation::agg_req::Aggregations;
use tantivy::aggregation::AggregationCollector;
use tantivy::query::{AllQuery, TermQuery};
use tantivy::schema::{IndexRecordOption, Schema, TextFieldIndexing, FAST, STRING};
use tantivy::{doc, Index, Term};
use tantivy::{doc, DateTime, Index, Term};
#[global_allocator]
pub static GLOBAL: &PeakMemAlloc<std::alloc::System> = &INSTRUMENTED_SYSTEM;
@@ -320,6 +319,7 @@ fn terms_many_json_mixed_type_with_avg_sub_agg(index: &Index) {
});
execute_agg(index, agg_req);
}
fn composite_term_few(index: &Index) {
let agg_req = json!({
"my_ctf": {
@@ -354,7 +354,6 @@ fn composite_term_many_page_1000_with_avg_sub_agg(index: &Index) {
{ "text_many_terms": { "terms": { "field": "text_many_terms" } } }
],
"size": 1000,
},
"aggs": {
"average_f64": { "avg": { "field": "score_f64" } }
@@ -572,6 +571,7 @@ fn get_test_index_bench(cardinality: Cardinality) -> tantivy::Result<Index> {
let text_field_all_unique_terms =
schema_builder.add_text_field("text_all_unique_terms", STRING | FAST);
let text_field_many_terms = schema_builder.add_text_field("text_many_terms", STRING | FAST);
let text_field_few_terms = schema_builder.add_text_field("text_few_terms", STRING | FAST);
let text_field_few_terms_status =
schema_builder.add_text_field("text_few_terms_status", STRING | FAST);
let text_field_1000_terms_zipf =
@@ -600,6 +600,7 @@ fn get_test_index_bench(cardinality: Cardinality) -> tantivy::Result<Index> {
let log_level_distribution =
WeightedIndex::new(status_field_data.iter().map(|item| item.1)).unwrap();
let few_terms_data = ["INFO", "ERROR", "WARN", "DEBUG"];
let lg_norm = rand_distr::LogNormal::new(2.996f64, 0.979f64).unwrap();
let many_terms_data = (0..150_000)
@@ -635,6 +636,8 @@ fn get_test_index_bench(cardinality: Cardinality) -> tantivy::Result<Index> {
text_field_all_unique_terms => "coolo",
text_field_many_terms => "cool",
text_field_many_terms => "cool",
text_field_few_terms => "cool",
text_field_few_terms => "cool",
text_field_few_terms_status => log_level_sample_a,
text_field_few_terms_status => log_level_sample_b,
text_field_1000_terms_zipf => term_1000_a.as_str(),
@@ -665,6 +668,7 @@ fn get_test_index_bench(cardinality: Cardinality) -> tantivy::Result<Index> {
json_field => json,
text_field_all_unique_terms => format!("unique_term_{}", rng.random::<u64>()),
text_field_many_terms => many_terms_data.choose(&mut rng).unwrap().to_string(),
text_field_few_terms => few_terms_data.choose(&mut rng).unwrap().to_string(),
text_field_few_terms_status => status_field_data[log_level_distribution.sample(&mut rng)].0,
text_field_1000_terms_zipf => terms_1000[zipf_1000.sample(&mut rng) as usize - 1].as_str(),
score_field => val as u64,

View File

@@ -31,7 +31,7 @@ pub use u64_based::{
serialize_and_load_u64_based_column_values, serialize_u64_based_column_values,
};
pub use u128_based::{
CompactHit, CompactSpaceU64Accessor, open_u128_as_compact_u64, open_u128_mapped,
CompactSpaceU64Accessor, open_u128_as_compact_u64, open_u128_mapped,
serialize_column_values_u128,
};
pub use vec_column::VecColumn;

View File

@@ -292,19 +292,6 @@ impl BinarySerializable for IPCodecParams {
}
}
/// Represents the result of looking up a u128 value in the compact space.
///
/// If a value is outside the compact space, the next compact value is returned.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactHit {
/// The value exists in the compact space
Exact(u32),
/// The value does not exist in the compact space, but the next higher value does
Next(u32),
/// The value is greater than the maximum compact value
AfterLast,
}
/// Exposes the compact space compressed values as u64.
///
/// This allows faster access to the values, as u64 is faster to work with than u128.
@@ -322,11 +309,6 @@ impl CompactSpaceU64Accessor {
pub fn compact_to_u128(&self, compact: u32) -> u128 {
self.0.compact_to_u128(compact)
}
/// Finds the next compact space value for a given u128 value.
pub fn u128_to_next_compact(&self, value: u128) -> CompactHit {
self.0.u128_to_next_compact(value)
}
}
impl ColumnValues<u64> for CompactSpaceU64Accessor {
@@ -448,26 +430,6 @@ impl CompactSpaceDecompressor {
Ok(decompressor)
}
/// Finds the next compact space value for a given u128 value
pub fn u128_to_next_compact(&self, value: u128) -> CompactHit {
// Try to convert to compact space
match self.u128_to_compact(value) {
// Value is in compact space, return its compact representation
Ok(compact) => CompactHit::Exact(compact),
// Value is not in compact space
Err(pos) => {
if pos >= self.params.compact_space.ranges_mapping.len() {
// Value is beyond all ranges, no next value exists
CompactHit::AfterLast
} else {
// Get the next range and return its start compact value
let next_range = &self.params.compact_space.ranges_mapping[pos];
CompactHit::Next(next_range.compact_start)
}
}
}
}
/// Converting to compact space for the decompressor is more complex, since we may get values
/// which are outside the compact space. e.g. if we map
/// 1000 => 5
@@ -861,41 +823,6 @@ mod tests {
let _data = test_aux_vals(vals);
}
#[test]
fn test_u128_to_next_compact() {
let vals = &[100u128, 200u128, 1_000_000_000u128, 1_000_000_100u128];
let mut data = test_aux_vals(vals);
let _header = U128Header::deserialize(&mut data);
let decomp = CompactSpaceDecompressor::open(data).unwrap();
// Test value that's already in a range
let compact_100 = decomp.u128_to_compact(100).unwrap();
assert_eq!(
decomp.u128_to_next_compact(100),
CompactHit::Exact(compact_100)
);
// Test value between two ranges
let compact_million = decomp.u128_to_compact(1_000_000_000).unwrap();
assert_eq!(
decomp.u128_to_next_compact(250),
CompactHit::Next(compact_million)
);
// Test value before the first range
assert_eq!(
decomp.u128_to_next_compact(50),
CompactHit::Next(compact_100)
);
// Test value after the last range
assert_eq!(
decomp.u128_to_next_compact(10_000_000_000),
CompactHit::AfterLast
);
}
use proptest::prelude::*;
fn num_strategy() -> impl Strategy<Value = u128> {

View File

@@ -7,7 +7,7 @@ mod compact_space;
use common::{BinarySerializable, OwnedBytes, VInt};
pub use compact_space::{
CompactHit, CompactSpaceCompressor, CompactSpaceDecompressor, CompactSpaceU64Accessor,
CompactSpaceCompressor, CompactSpaceDecompressor, CompactSpaceU64Accessor,
};
use crate::column_values::monotonic_map_column;

View File

@@ -15,11 +15,10 @@ repository = "https://github.com/quickwit-oss/tantivy"
byteorder = "1.4.3"
ownedbytes = { version= "0.9", path="../ownedbytes" }
async-trait = "0.1"
time = { version = "0.3.10", features = ["serde-well-known"] }
time = { version = "0.3.47", features = ["serde-well-known"] }
serde = { version = "1.0.136", features = ["derive"] }
[dev-dependencies]
binggan = "0.14.0"
proptest = "1.0.0"
rand = "0.9"

View File

@@ -62,7 +62,9 @@ impl<W: TerminatingWrite> TerminatingWrite for CountingWriter<W> {
pub struct AntiCallToken(());
/// Trait used to indicate when no more write need to be done on a writer
pub trait TerminatingWrite: Write + Send + Sync {
///
/// Thread-safety is enforced at the call sites that require it.
pub trait TerminatingWrite: Write {
/// Indicate that the writer will no longer be used. Internally call terminate_ref.
fn terminate(mut self) -> io::Result<()>
where Self: Sized {

View File

@@ -1,27 +0,0 @@
[package]
name = "sketches-ddsketch"
version = "0.3.0"
authors = ["Mike Heffner <mikeh@fesnel.com>"]
edition = "2018"
license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/mheffner/rust-sketches-ddsketch"
homepage = "https://github.com/mheffner/rust-sketches-ddsketch"
description = """
A direct port of the Golang DDSketch implementation.
"""
exclude = [".gitignore"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
serde = { package = "serde", version = "1.0", optional = true, features = ["derive", "serde_derive"] }
[dev-dependencies]
approx = "0.5.1"
rand = "0.8.5"
rand_distr = "0.4.3"
[features]
use_serde = ["serde", "serde/derive"]

View File

@@ -1,201 +0,0 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [2019] [Mike Heffner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@@ -1,11 +0,0 @@
clean:
cargo clean
test:
cargo test
test_logs:
cargo test -- --nocapture
test_performance:
cargo test --release --jobs 1 test_performance -- --ignored --nocapture

View File

@@ -1,37 +0,0 @@
# sketches-ddsketch
This is a direct port of the [Golang](https://github.com/DataDog/sketches-go)
[DDSketch](https://arxiv.org/pdf/1908.10693.pdf) quantile sketch implementation
to Rust. DDSketch is a fully-mergeable quantile sketch with relative-error
guarantees and is extremely fast.
# DDSketch
* Sketch size automatically grows as needed, starting with 128 bins.
* Extremely fast sample insertion and sketch merges.
## Usage
```rust
use sketches_ddsketch::{Config, DDSketch};
let config = Config::defaults();
let mut sketch = DDSketch::new(config);
sketch.add(1.0);
sketch.add(1.0);
sketch.add(1.0);
// Get p=50%
let quantile = sketch.quantile(0.5).unwrap();
assert_eq!(quantile, Some(1.0));
```
## Performance
No performance tuning has been done with this implementation of the port, so we
would expect similar profiles to the original implementation.
Out of the box we can achieve over 70M sample inserts/sec and 350K sketch
merges/sec. All tests were run on a single-core Intel i7 processor with a 4.2GHz max
clock.

View File

@@ -1,98 +0,0 @@
#[cfg(feature = "use_serde")]
use serde::{Deserialize, Serialize};
const DEFAULT_MAX_BINS: u32 = 2048;
const DEFAULT_ALPHA: f64 = 0.01;
const DEFAULT_MIN_VALUE: f64 = 1.0e-9;
/// The configuration struct for constructing a `DDSketch`
#[derive(Copy, Clone, Debug, PartialEq)]
#[cfg_attr(feature = "use_serde", derive(Serialize, Deserialize))]
pub struct Config {
pub max_num_bins: u32,
pub gamma: f64,
pub(crate) gamma_ln: f64,
pub(crate) min_value: f64,
pub offset: i32,
}
fn log_gamma(value: f64, gamma_ln: f64) -> f64 {
value.ln() / gamma_ln
}
impl Config {
/// Construct a new `Config` struct with specific parameters. If you are unsure of how to
/// configure this, the `defaults` method constructs a `Config` with built-in defaults.
///
/// `max_num_bins` is the max number of bins the DDSketch will grow to, in steps of 128 bins.
pub fn new(alpha: f64, max_num_bins: u32, min_value: f64) -> Self {
// Aligned with Java's LogarithmicMapping / LogLikeIndexMapping:
// gamma = (1 + alpha) / (1 - alpha) (correctingFactor=1 for LogarithmicMapping)
// gamma_ln = gamma.ln() (not ln_1p, to match Java's Math.log(gamma))
// See: https://github.com/DataDog/sketches-java/blob/master/src/main/java/com/datadoghq/sketch/ddsketch/mapping/LogLikeIndexMapping.java (gamma() static method)
// See: https://github.com/DataDog/sketches-java/blob/master/src/main/java/com/datadoghq/sketch/ddsketch/mapping/LogarithmicMapping.java (constructor, correctingFactor()=1)
let gamma = (1.0 + alpha) / (1.0 - alpha);
let gamma_ln = gamma.ln();
Config {
max_num_bins,
gamma,
gamma_ln,
min_value,
offset: 1 - (log_gamma(min_value, gamma_ln) as i32),
}
}
/// Return a `Config` using built-in default settings
pub fn defaults() -> Self {
Self::new(DEFAULT_ALPHA, DEFAULT_MAX_BINS, DEFAULT_MIN_VALUE)
}
pub fn key(&self, v: f64) -> i32 {
// Aligned with Java's LogLikeIndexMapping.index(): floor-based indexing.
// Java uses `(int) index` / `(int) index - 1` which is equivalent to floor().
// See: https://github.com/DataDog/sketches-java/blob/master/src/main/java/com/datadoghq/sketch/ddsketch/mapping/LogLikeIndexMapping.java (index() method)
self.log_gamma(v).floor() as i32
}
pub fn value(&self, key: i32) -> f64 {
// Aligned with Java's LogLikeIndexMapping.value():
// lowerBound(index) * (1 + relativeAccuracy)
// = logInverse((index - indexOffset) / multiplier) * (1 + relativeAccuracy)
// = gamma^key * 2*gamma/(gamma+1)
// See: https://github.com/DataDog/sketches-java/blob/master/src/main/java/com/datadoghq/sketch/ddsketch/mapping/LogLikeIndexMapping.java (value() and lowerBound() methods)
self.pow_gamma(key) * (2.0 * self.gamma / (1.0 + self.gamma))
}
pub fn log_gamma(&self, value: f64) -> f64 {
log_gamma(value, self.gamma_ln)
}
pub fn pow_gamma(&self, key: i32) -> f64 {
((key as f64) * self.gamma_ln).exp()
}
pub fn min_possible(&self) -> f64 {
self.min_value
}
/// Reconstruct a Config from a gamma value (as decoded from the binary format).
/// Uses default max_num_bins and min_value.
/// See Java: https://github.com/DataDog/sketches-java/blob/master/src/main/java/com/datadoghq/sketch/ddsketch/mapping/LogarithmicMapping.java (LogarithmicMapping(double gamma, double indexOffset) constructor)
pub(crate) fn from_gamma(gamma: f64) -> Self {
let gamma_ln = gamma.ln();
Config {
max_num_bins: DEFAULT_MAX_BINS,
gamma,
gamma_ln,
min_value: DEFAULT_MIN_VALUE,
offset: 1 - (log_gamma(DEFAULT_MIN_VALUE, gamma_ln) as i32),
}
}
}
impl Default for Config {
fn default() -> Self {
Self::new(DEFAULT_ALPHA, DEFAULT_MAX_BINS, DEFAULT_MIN_VALUE)
}
}

View File

@@ -1,385 +0,0 @@
use std::{error, fmt};
#[cfg(feature = "use_serde")]
use serde::{Deserialize, Serialize};
use crate::config::Config;
use crate::store::Store;
type Result<T> = std::result::Result<T, DDSketchError>;
/// General error type for DDSketch, represents either an invalid quantile or an
/// incompatible merge operation.
#[derive(Debug, Clone)]
pub enum DDSketchError {
Quantile,
Merge,
}
impl fmt::Display for DDSketchError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
DDSketchError::Quantile => {
write!(f, "Invalid quantile, must be between 0 and 1 (inclusive)")
}
DDSketchError::Merge => write!(f, "Can not merge sketches with different configs"),
}
}
}
impl error::Error for DDSketchError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
// Generic
None
}
}
/// This struct represents a [DDSketch](https://arxiv.org/pdf/1908.10693.pdf)
#[derive(Clone)]
#[cfg_attr(feature = "use_serde", derive(Serialize, Deserialize))]
pub struct DDSketch {
pub(crate) config: Config,
pub(crate) store: Store,
pub(crate) negative_store: Store,
pub(crate) min: f64,
pub(crate) max: f64,
pub(crate) sum: f64,
pub(crate) zero_count: u64,
}
impl Default for DDSketch {
fn default() -> Self {
Self::new(Default::default())
}
}
// XXX: functions should return Option<> in the case of empty
impl DDSketch {
/// Construct a `DDSketch`. Requires a `Config` specifying the parameters of the sketch
pub fn new(config: Config) -> Self {
DDSketch {
config,
store: Store::new(config.max_num_bins as usize),
negative_store: Store::new(config.max_num_bins as usize),
min: f64::INFINITY,
max: f64::NEG_INFINITY,
sum: 0.0,
zero_count: 0,
}
}
/// Add the sample to the sketch
pub fn add(&mut self, v: f64) {
if v > self.config.min_possible() {
let key = self.config.key(v);
self.store.add(key);
} else if v < -self.config.min_possible() {
let key = self.config.key(-v);
self.negative_store.add(key);
} else {
self.zero_count += 1;
}
if v < self.min {
self.min = v;
}
if self.max < v {
self.max = v;
}
self.sum += v;
}
/// Return the quantile value for quantiles between 0.0 and 1.0. Result is an error, represented
/// as DDSketchError::Quantile if the requested quantile is outside of that range.
///
/// If the sketch is empty the result is None, else Some(v) for the quantile value.
pub fn quantile(&self, q: f64) -> Result<Option<f64>> {
if !(0.0..=1.0).contains(&q) {
return Err(DDSketchError::Quantile);
}
if self.empty() {
return Ok(None);
}
if q == 0.0 {
return Ok(Some(self.min));
} else if q == 1.0 {
return Ok(Some(self.max));
}
let rank = (q * (self.count() as f64 - 1.0)) as u64;
let quantile;
if rank < self.negative_store.count() {
let reversed_rank = self.negative_store.count() - rank - 1;
let key = self.negative_store.key_at_rank(reversed_rank);
quantile = -self.config.value(key);
} else if rank < self.zero_count + self.negative_store.count() {
quantile = 0.0;
} else {
let key = self
.store
.key_at_rank(rank - self.zero_count - self.negative_store.count());
quantile = self.config.value(key);
}
Ok(Some(quantile))
}
/// Returns the minimum value seen, or None if sketch is empty
pub fn min(&self) -> Option<f64> {
if self.empty() {
None
} else {
Some(self.min)
}
}
/// Returns the maximum value seen, or None if sketch is empty
pub fn max(&self) -> Option<f64> {
if self.empty() {
None
} else {
Some(self.max)
}
}
/// Returns the sum of values seen, or None if sketch is empty
pub fn sum(&self) -> Option<f64> {
if self.empty() {
None
} else {
Some(self.sum)
}
}
/// Returns the number of values added to the sketch
pub fn count(&self) -> usize {
(self.store.count() + self.zero_count + self.negative_store.count()) as usize
}
/// Returns the length of the underlying `Store`. This is mainly only useful for understanding
/// how much the sketch has grown given the inserted values.
pub fn length(&self) -> usize {
self.store.length() as usize + self.negative_store.length() as usize
}
/// Merge the contents of another sketch into this one. The sketch that is merged into this one
/// is unchanged after the merge.
///
/// # Errors
/// Returns `DDSketchError::Merge` if the two sketches were built with
/// different configurations (their bins would not be comparable).
pub fn merge(&mut self, o: &DDSketch) -> Result<()> {
    if self.config != o.config {
        return Err(DDSketchError::Merge);
    }
    // Capture emptiness BEFORE mutating the stores. Note: `empty()` accounts
    // for the positive store, the negative store, and the zero bucket.
    // Checking only `self.store` (as previously done) mis-classified a
    // sketch holding only negative values or zeros as empty, clobbering its
    // tracked min/max with the other sketch's.
    let self_was_empty = self.empty();
    let other_is_empty = o.empty();
    // Merge the stores
    self.store.merge(&o.store);
    self.negative_store.merge(&o.negative_store);
    self.zero_count += o.zero_count;
    // Need to ensure we don't override min/max with initializers
    // if either sketch were empty.
    if self_was_empty {
        self.min = o.min;
        self.max = o.max;
    } else if !other_is_empty {
        if o.min < self.min {
            self.min = o.min;
        }
        if o.max > self.max {
            self.max = o.max;
        }
    }
    self.sum += o.sum;
    Ok(())
}
/// A sketch is empty when no value has landed in any of the three buckets
/// (negative store, zero bucket, positive store).
fn empty(&self) -> bool {
    matches!(self.count(), 0)
}
/// Encode this sketch into the Java-compatible binary format used by
/// `com.datadoghq.sketch.ddsketch.DDSketchWithExactSummaryStatistics`.
pub fn to_java_bytes(&self) -> Vec<u8> {
    // Thin delegation: all wire-format logic lives in the `encoding` module.
    crate::encoding::encode_to_java_bytes(self)
}
/// Decode a sketch from the Java-compatible binary format.
/// Accepts bytes produced by Java's `DDSketchWithExactSummaryStatistics.encode()`
/// with or without the `0x02` version prefix.
pub fn from_java_bytes(
    bytes: &[u8],
) -> std::result::Result<Self, crate::encoding::DecodeError> {
    // Uses the module-level decoder; a plain `std::result::Result` is
    // returned because the failure type differs from `DDSketchError`.
    crate::encoding::decode_from_java_bytes(bytes)
}
}
#[cfg(test)]
mod tests {
    use approx::assert_relative_eq;
    use crate::{Config, DDSketch};
    // Adding an exact zero must not panic (zeros go to a dedicated bucket).
    #[test]
    fn test_add_zero() {
        let alpha = 0.01;
        let c = Config::new(alpha, 2048, 10e-9);
        let mut dd = DDSketch::new(c);
        dd.add(0.0);
    }
    // Quantiles over a small positive dataset stay within relative error alpha.
    #[test]
    fn test_quartiles() {
        let alpha = 0.01;
        let c = Config::new(alpha, 2048, 10e-9);
        let mut dd = DDSketch::new(c);
        // Initialize sketch with {1.0, 2.0, 3.0, 4.0}
        for i in 1..5 {
            dd.add(i as f64);
        }
        // We expect the following mappings from quantile to value:
        // [0,0.33]: 1.0, (0.34,0.66]: 2.0, (0.67,0.99]: 3.0, (0.99, 1.0]: 4.0
        let test_cases = vec![
            (0.0, 1.0),
            (0.25, 1.0),
            (0.33, 1.0),
            (0.34, 2.0),
            (0.5, 2.0),
            (0.66, 2.0),
            (0.67, 3.0),
            (0.75, 3.0),
            (0.99, 3.0),
            (1.0, 4.0),
        ];
        for (q, val) in test_cases {
            assert_relative_eq!(dd.quantile(q).unwrap().unwrap(), val, max_relative = alpha);
        }
    }
    // Same as above but exercising the negative store / reversed rank path.
    #[test]
    fn test_neg_quartiles() {
        let alpha = 0.01;
        let c = Config::new(alpha, 2048, 10e-9);
        let mut dd = DDSketch::new(c);
        // Initialize sketch with {1.0, 2.0, 3.0, 4.0}
        for i in 1..5 {
            dd.add(-i as f64);
        }
        let test_cases = vec![
            (0.0, -4.0),
            (0.25, -4.0),
            (0.5, -3.0),
            (0.75, -2.0),
            (1.0, -1.0),
        ];
        for (q, val) in test_cases {
            assert_relative_eq!(dd.quantile(q).unwrap().unwrap(), val, max_relative = alpha);
        }
    }
    // p95 of 1..=100, and range validation of the quantile argument.
    #[test]
    fn test_simple_quantile() {
        let c = Config::defaults();
        let mut dd = DDSketch::new(c);
        for i in 1..101 {
            dd.add(i as f64);
        }
        assert_eq!(dd.quantile(0.95).unwrap().unwrap().ceil(), 95.0);
        assert!(dd.quantile(-1.01).is_err());
        assert!(dd.quantile(1.01).is_err());
    }
    // Accessors on an empty sketch return None; out-of-range q still errors.
    #[test]
    fn test_empty_sketch() {
        let c = Config::defaults();
        let dd = DDSketch::new(c);
        assert_eq!(dd.quantile(0.98).unwrap(), None);
        assert_eq!(dd.max(), None);
        assert_eq!(dd.min(), None);
        assert_eq!(dd.sum(), None);
        assert_eq!(dd.count(), 0);
        assert!(dd.quantile(1.01).is_err());
    }
    // Summary statistics (min/max/count/sum) are exact, not sketched.
    #[test]
    fn test_basic_histogram_data() {
        let values = &[
            0.754225035,
            0.752900282,
            0.752812246,
            0.752602367,
            0.754310155,
            0.753525981,
            0.752981082,
            0.752715536,
            0.751667941,
            0.755079054,
            0.753528150,
            0.755188464,
            0.752508723,
            0.750064549,
            0.753960428,
            0.751139298,
            0.752523560,
            0.753253428,
            0.753498342,
            0.751858358,
            0.752104636,
            0.753841300,
            0.754467374,
            0.753814334,
            0.750881719,
            0.753182556,
            0.752576884,
            0.753945708,
            0.753571911,
            0.752314573,
            0.752586651,
        ];
        let c = Config::defaults();
        let mut dd = DDSketch::new(c);
        for value in values {
            dd.add(*value);
        }
        assert_eq!(dd.max(), Some(0.755188464));
        assert_eq!(dd.min(), Some(0.750064549));
        assert_eq!(dd.count(), 31);
        assert_eq!(dd.sum(), Some(23.343630625000003));
        assert!(dd.quantile(0.25).unwrap().is_some());
        assert!(dd.quantile(0.5).unwrap().is_some());
        assert!(dd.quantile(0.75).unwrap().is_some());
    }
    // Stores grow in 128-bin chunks; negatives allocate a separate store.
    #[test]
    fn test_length() {
        let mut dd = DDSketch::default();
        assert_eq!(dd.length(), 0);
        dd.add(1.0);
        assert_eq!(dd.length(), 128);
        dd.add(2.0);
        dd.add(3.0);
        assert_eq!(dd.length(), 128);
        dd.add(-1.0);
        assert_eq!(dd.length(), 256);
        dd.add(-2.0);
        dd.add(-3.0);
        assert_eq!(dd.length(), 256);
    }
}

View File

@@ -1,813 +0,0 @@
//! Java-compatible binary encoding/decoding for DDSketch.
//!
//! This module implements the binary format used by the Java
//! `com.datadoghq.sketch.ddsketch.DDSketchWithExactSummaryStatistics` class
//! from the DataDog/sketches-java library. It enables cross-language
//! serialization so that sketches produced in Rust can be deserialized
//! and merged by Java consumers.
use std::fmt;
use crate::config::Config;
use crate::ddsketch::DDSketch;
use crate::store::Store;
// ---------------------------------------------------------------------------
// Flag byte layout
//
// Each flag byte packs a 2-bit type ordinal in the low bits and a 6-bit
// subflag in the upper bits: (subflag << 2) | type_ordinal
// See: https://github.com/DataDog/sketches-java/blob/master/src/main/java/com/datadoghq/sketch/ddsketch/encoding/Flag.java
// ---------------------------------------------------------------------------
/// The 2-bit type field occupying the low bits of every flag byte.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FlagType {
    SketchFeatures = 0,
    PositiveStore = 1,
    IndexMapping = 2,
    NegativeStore = 3,
}
impl FlagType {
    /// Recover the flag type from the low two bits of a flag byte.
    fn from_byte(b: u8) -> Option<Self> {
        let flag_type = match b & 0x03 {
            0 => Self::SketchFeatures,
            1 => Self::PositiveStore,
            2 => Self::IndexMapping,
            3 => Self::NegativeStore,
            // `b & 0x03` can only ever be 0..=3; arm kept for exhaustiveness.
            _ => return None,
        };
        Some(flag_type)
    }
}
/// Construct a flag byte from a subflag and a type.
const fn flag(subflag: u8, flag_type: FlagType) -> u8 {
    (flag_type as u8) | (subflag << 2)
}
// Pre-computed flag bytes for the sketch features we encode/decode.
const FLAG_INDEX_MAPPING_LOG: u8 = flag(0, FlagType::IndexMapping); // 0x02
const FLAG_ZERO_COUNT: u8 = flag(1, FlagType::SketchFeatures); // 0x04
const FLAG_COUNT: u8 = flag(0x28, FlagType::SketchFeatures); // 0xA0
const FLAG_SUM: u8 = flag(0x21, FlagType::SketchFeatures); // 0x84
const FLAG_MIN: u8 = flag(0x22, FlagType::SketchFeatures); // 0x88
const FLAG_MAX: u8 = flag(0x23, FlagType::SketchFeatures); // 0x8C
/// BinEncodingMode subflags for store flag bytes.
/// See: https://github.com/DataDog/sketches-java/blob/master/src/main/java/com/datadoghq/sketch/ddsketch/encoding/BinEncodingMode.java
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BinEncodingMode {
    IndexDeltasAndCounts = 1,
    IndexDeltas = 2,
    ContiguousCounts = 3,
}
impl BinEncodingMode {
    /// Map a store flag's subflag value back to its encoding mode, if known.
    fn from_subflag(subflag: u8) -> Option<Self> {
        let mode = match subflag {
            1 => Self::IndexDeltasAndCounts,
            2 => Self::IndexDeltas,
            3 => Self::ContiguousCounts,
            _ => return None,
        };
        Some(mode)
    }
}
/// Left-rotation applied to the offset IEEE-754 bit pattern before
/// var-length encoding; must match Java's VarEncodingHelper so the two
/// implementations produce identical bytes.
const VAR_DOUBLE_ROTATE_DISTANCE: u32 = 6;
/// Maximum number of bytes a var-length-encoded 64-bit value may occupy
/// (eight 7-bit groups plus one full trailing byte).
const MAX_VAR_LEN_64: usize = 9;
/// Bin limit used for stores built during decoding (matches the
/// `CollapsingLowestDenseStore(2048)` used by the Java golden fixtures).
const DEFAULT_MAX_BINS: u32 = 2048;
// ---------------------------------------------------------------------------
// Error type
// ---------------------------------------------------------------------------
/// Errors that can occur while decoding the Java-compatible binary format.
#[derive(Debug, Clone)]
pub enum DecodeError {
    UnexpectedEof,
    InvalidFlag(u8),
    InvalidData(String),
}
impl fmt::Display for DecodeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::UnexpectedEof => f.write_str("unexpected end of input"),
            Self::InvalidFlag(byte) => write!(f, "invalid flag byte: 0x{byte:02X}"),
            Self::InvalidData(message) => write!(f, "invalid data: {message}"),
        }
    }
}
impl std::error::Error for DecodeError {}
// ---------------------------------------------------------------------------
// VarEncoding — bit-exact port of Java VarEncodingHelper
// See: https://github.com/DataDog/sketches-java/blob/master/src/main/java/com/datadoghq/sketch/ddsketch/encoding/VarEncodingHelper.java
// ---------------------------------------------------------------------------
/// Write `value` as a Java-compatible var-long: low 7 bits first, high bit
/// set on every byte except the last; the 9th byte (if needed) is stored whole.
fn encode_unsigned_var_long(out: &mut Vec<u8>, value: u64) {
    // One continuation byte per full 7-bit group beyond the first, capped at
    // 8 so the total never exceeds MAX_VAR_LEN_64 (9) bytes.
    let significant_bits = 64 - value.leading_zeros() as i32;
    let continuation_bytes = ((significant_bits - 1) / 7).clamp(0, 8);
    let mut remaining = value;
    for _ in 0..continuation_bytes {
        out.push(0x80 | (remaining & 0x7F) as u8);
        remaining >>= 7;
    }
    out.push(remaining as u8);
}
/// Read a Java-compatible var-long from the front of `input`.
fn decode_unsigned_var_long(input: &mut &[u8]) -> Result<u64, DecodeError> {
    let mut accumulator: u64 = 0;
    let mut shift: u32 = 0;
    loop {
        let byte = read_byte(input)?;
        // A clear high bit terminates the value; the 9th byte (shift == 56)
        // is stored whole and always terminates regardless of its high bit.
        if byte < 0x80 || shift == 56 {
            accumulator |= u64::from(byte) << shift;
            return Ok(accumulator);
        }
        accumulator |= (u64::from(byte) & 0x7F) << shift;
        shift += 7;
    }
}
/// ZigZag encode then var-long encode.
fn encode_signed_var_long(out: &mut Vec<u8>, value: i64) {
    // ZigZag: interleave positives and negatives so small magnitudes stay small.
    let zig_zag = (value << 1) ^ (value >> 63);
    encode_unsigned_var_long(out, zig_zag as u64);
}
/// Var-long decode then ZigZag decode.
fn decode_signed_var_long(input: &mut &[u8]) -> Result<i64, DecodeError> {
    let zig_zag = decode_unsigned_var_long(input)?;
    let magnitude = (zig_zag >> 1) as i64;
    let sign_mask = -((zig_zag & 1) as i64);
    Ok(magnitude ^ sign_mask)
}
/// Transform a double into the bit layout used by var-double encoding:
/// offset the IEEE-754 bits relative to 1.0, then rotate the high bits down.
fn double_to_var_bits(value: f64) -> u64 {
    let one_bits = f64::to_bits(1.0);
    let offset_bits = f64::to_bits(value + 1.0).wrapping_sub(one_bits);
    offset_bits.rotate_left(VAR_DOUBLE_ROTATE_DISTANCE)
}
/// Inverse of `double_to_var_bits`: un-rotate, undo the 1.0-relative offset.
fn var_bits_to_double(bits: u64) -> f64 {
    let unrotated = bits.rotate_right(VAR_DOUBLE_ROTATE_DISTANCE);
    let restored_bits = unrotated.wrapping_add(f64::to_bits(1.0));
    f64::from_bits(restored_bits) - 1.0
}
/// Var-length encode a double, most-significant 7-bit group first.
/// A set high bit marks a continuation byte; the 9th byte carries 8 full bits.
fn encode_var_double(out: &mut Vec<u8>, value: f64) {
    let mut bits = double_to_var_bits(value);
    for _ in 0..MAX_VAR_LEN_64 - 1 {
        // Top 7 bits of the remaining payload.
        let next = (bits >> 57) as u8;
        bits <<= 7;
        if bits == 0 {
            // No information left: this byte terminates the encoding.
            out.push(next);
            return;
        }
        out.push(next | 0x80);
    }
    // Ninth byte: the remaining 8 bits, stored whole.
    out.push((bits >> 56) as u8);
}
/// Decode a var-length double produced by `encode_var_double`.
fn decode_var_double(input: &mut &[u8]) -> Result<f64, DecodeError> {
    let mut bits: u64 = 0;
    let mut shift: i32 = 57; // 8*8 - 7
    loop {
        let next = read_byte(input)?;
        // After eight 7-bit groups, shift reaches 1: the 9th byte is stored
        // whole (no continuation bit) and always terminates.
        if shift == 1 {
            bits |= u64::from(next);
            break;
        }
        if next < 0x80 {
            // High bit clear: final byte of the encoding.
            bits |= u64::from(next) << shift;
            break;
        }
        bits |= (u64::from(next) & 0x7F) << shift;
        shift -= 7;
    }
    Ok(var_bits_to_double(bits))
}
// ---------------------------------------------------------------------------
// Byte-level helpers
// ---------------------------------------------------------------------------
/// Pop one byte off the front of `input`, advancing the slice in place.
fn read_byte(input: &mut &[u8]) -> Result<u8, DecodeError> {
    let (&first, rest) = input.split_first().ok_or(DecodeError::UnexpectedEof)?;
    *input = rest;
    Ok(first)
}
/// Append the 8 little-endian bytes of `value` to `out`.
fn write_f64_le(out: &mut Vec<u8>, value: f64) {
    let le_bytes = value.to_le_bytes();
    out.extend_from_slice(le_bytes.as_slice());
}
/// Read a little-endian `f64` from the front of `input`, advancing it 8 bytes.
///
/// Returns `DecodeError::UnexpectedEof` if fewer than 8 bytes remain.
fn read_f64_le(input: &mut &[u8]) -> Result<f64, DecodeError> {
    if input.len() < 8 {
        return Err(DecodeError::UnexpectedEof);
    }
    let (bytes, rest) = input.split_at(8);
    *input = rest;
    // `split_at(8)` guarantees exactly 8 bytes, so the conversion cannot fail.
    let array: [u8; 8] = bytes.try_into().expect("split_at(8) yields 8 bytes");
    Ok(f64::from_le_bytes(array))
}
// ---------------------------------------------------------------------------
// Store encoding/decoding
// See: https://github.com/DataDog/sketches-java/blob/master/src/main/java/com/datadoghq/sketch/ddsketch/store/DenseStore.java (encode/decode methods)
// ---------------------------------------------------------------------------
/// Collect non-zero bins in the store as (absolute_index, count) pairs.
///
/// Allocation is acceptable here: this runs once per encode and the Vec
/// has at most `max_num_bins` entries.
fn collect_non_zero_bins(store: &Store) -> Vec<(i32, u64)> {
    if store.count == 0 {
        return Vec::new();
    }
    // Restrict the scan to the occupied [min_key, max_key] window.
    let start = (store.min_key - store.offset) as usize;
    let end = ((store.max_key - store.offset + 1) as usize).min(store.bins.len());
    let mut non_zero = Vec::new();
    for (i, &count) in store.bins[start..end].iter().enumerate() {
        if count > 0 {
            non_zero.push((start as i32 + i as i32 + store.offset, count));
        }
    }
    non_zero
}
/// Encode one store (positive or negative, per `flag_type`) using the
/// IndexDeltasAndCounts bin encoding. Empty stores emit nothing at all.
fn encode_store(out: &mut Vec<u8>, store: &Store, flag_type: FlagType) {
    let non_zero_bins = collect_non_zero_bins(store);
    if non_zero_bins.is_empty() {
        return;
    }
    out.push(flag(BinEncodingMode::IndexDeltasAndCounts as u8, flag_type));
    encode_unsigned_var_long(out, non_zero_bins.len() as u64);
    // Indexes are delta-encoded against the previous bin's index.
    let mut previous: i64 = 0;
    for &(index, count) in non_zero_bins.iter() {
        let index = i64::from(index);
        encode_signed_var_long(out, index - previous);
        encode_var_double(out, count as f64);
        previous = index;
    }
}
/// Decode a single store from `input`, given the subflag of its flag byte.
/// All three Java `BinEncodingMode`s are supported, even though the encoder
/// above only ever emits `IndexDeltasAndCounts`.
fn decode_store(input: &mut &[u8], subflag: u8, bin_limit: usize) -> Result<Store, DecodeError> {
    let mode = BinEncodingMode::from_subflag(subflag).ok_or_else(|| {
        DecodeError::InvalidData(format!("unknown bin encoding mode subflag: {subflag}"))
    })?;
    let num_bins = decode_unsigned_var_long(input)? as usize;
    let mut store = Store::new(bin_limit);
    match mode {
        BinEncodingMode::IndexDeltasAndCounts => {
            // Each bin is (signed delta from previous index, count as var-double).
            let mut index: i64 = 0;
            for _ in 0..num_bins {
                index += decode_signed_var_long(input)?;
                let count = decode_var_double(input)?;
                store.add_count(index as i32, count as u64);
            }
        }
        BinEncodingMode::IndexDeltas => {
            // Only deltas are stored; each listed index implies a count of 1.
            let mut index: i64 = 0;
            for _ in 0..num_bins {
                index += decode_signed_var_long(input)?;
                store.add_count(index as i32, 1);
            }
        }
        BinEncodingMode::ContiguousCounts => {
            // A run of counts starting at `start_index`, stepping by `index_delta`.
            let start_index = decode_signed_var_long(input)?;
            let index_delta = decode_signed_var_long(input)?;
            let mut index = start_index;
            for _ in 0..num_bins {
                let count = decode_var_double(input)?;
                store.add_count(index as i32, count as u64);
                index += index_delta;
            }
        }
    }
    Ok(store)
}
// ---------------------------------------------------------------------------
// Top-level encode / decode
// ---------------------------------------------------------------------------
/// Encode a DDSketch into the Java-compatible binary format.
///
/// The output follows the encoding order of
/// `DDSketchWithExactSummaryStatistics.encode()` then `DDSketch.encode()`:
///
/// 1. Summary statistics: COUNT, MIN, MAX (if count > 0)
/// 2. SUM (if sum != 0)
/// 3. Index mapping (LOG layout): gamma, indexOffset
/// 4. Zero count (if > 0)
/// 5. Positive store bins
/// 6. Negative store bins
pub fn encode_to_java_bytes(sketch: &DDSketch) -> Vec<u8> {
    let mut out = Vec::new();
    let count = sketch.count() as f64;
    // Summary statistics (DDSketchWithExactSummaryStatistics.encode)
    if count != 0.0 {
        out.push(FLAG_COUNT);
        encode_var_double(&mut out, count);
        // min/max are raw little-endian doubles, not var-encoded.
        out.push(FLAG_MIN);
        write_f64_le(&mut out, sketch.min);
        out.push(FLAG_MAX);
        write_f64_le(&mut out, sketch.max);
    }
    if sketch.sum != 0.0 {
        out.push(FLAG_SUM);
        write_f64_le(&mut out, sketch.sum);
    }
    // DDSketch.encode: index mapping + zero count + stores
    out.push(FLAG_INDEX_MAPPING_LOG);
    write_f64_le(&mut out, sketch.config.gamma);
    // The index offset is always 0.0 for this implementation.
    write_f64_le(&mut out, 0.0_f64);
    if sketch.zero_count != 0 {
        out.push(FLAG_ZERO_COUNT);
        encode_var_double(&mut out, sketch.zero_count as f64);
    }
    // Empty stores are skipped entirely by `encode_store`.
    encode_store(&mut out, &sketch.store, FlagType::PositiveStore);
    encode_store(&mut out, &sketch.negative_store, FlagType::NegativeStore);
    out
}
/// Decode a DDSketch from the Java-compatible binary format.
///
/// Accepts bytes with or without a `0x02` version prefix.
pub fn decode_from_java_bytes(bytes: &[u8]) -> Result<DDSketch, DecodeError> {
    if bytes.is_empty() {
        return Err(DecodeError::UnexpectedEof);
    }
    let mut input = bytes;
    // Skip optional version prefix (0x02 followed by a valid flag byte).
    if input.len() >= 2 && input[0] == 0x02 && is_valid_flag_byte(input[1]) {
        input = &input[1..];
    }
    // Decoded-so-far state; flags may arrive in any order, and repeated
    // ZERO_COUNT flags accumulate.
    let mut gamma: Option<f64> = None;
    let mut zero_count: f64 = 0.0;
    let mut sum: f64 = 0.0;
    let mut min: f64 = f64::INFINITY;
    let mut max: f64 = f64::NEG_INFINITY;
    let mut positive_store: Option<Store> = None;
    let mut negative_store: Option<Store> = None;
    while !input.is_empty() {
        let flag_byte = read_byte(&mut input)?;
        let flag_type =
            FlagType::from_byte(flag_byte).ok_or(DecodeError::InvalidFlag(flag_byte))?;
        let subflag = flag_byte >> 2;
        match flag_type {
            FlagType::IndexMapping => {
                gamma = Some(read_f64_le(&mut input)?);
                // The index offset is read for wire compatibility but unused.
                let _index_offset = read_f64_le(&mut input)?;
            }
            FlagType::SketchFeatures => match flag_byte {
                FLAG_ZERO_COUNT => zero_count += decode_var_double(&mut input)?,
                FLAG_COUNT => {
                    // The total count is implied by the stores, so the exact
                    // COUNT value is consumed and discarded here.
                    let _count = decode_var_double(&mut input)?;
                }
                FLAG_SUM => sum = read_f64_le(&mut input)?,
                FLAG_MIN => min = read_f64_le(&mut input)?,
                FLAG_MAX => max = read_f64_le(&mut input)?,
                _ => return Err(DecodeError::InvalidFlag(flag_byte)),
            },
            FlagType::PositiveStore => {
                positive_store = Some(decode_store(
                    &mut input,
                    subflag,
                    DEFAULT_MAX_BINS as usize,
                )?);
            }
            FlagType::NegativeStore => {
                negative_store = Some(decode_store(
                    &mut input,
                    subflag,
                    DEFAULT_MAX_BINS as usize,
                )?);
            }
        }
    }
    // Missing pieces fall back to defaults so partial payloads still decode.
    let g = gamma.unwrap_or_else(|| Config::defaults().gamma);
    let config = Config::from_gamma(g);
    let store = positive_store.unwrap_or_else(|| Store::new(config.max_num_bins as usize));
    let neg = negative_store.unwrap_or_else(|| Store::new(config.max_num_bins as usize));
    Ok(DDSketch {
        config,
        store,
        negative_store: neg,
        min,
        max,
        sum,
        zero_count: zero_count as u64,
    })
}
/// Check whether a byte is a valid flag byte for the DDSketch binary format.
fn is_valid_flag_byte(b: u8) -> bool {
// Known sketch-feature flags
if matches!(
b,
FLAG_ZERO_COUNT | FLAG_COUNT | FLAG_SUM | FLAG_MIN | FLAG_MAX | FLAG_INDEX_MAPPING_LOG
) {
return true;
}
let Some(flag_type) = FlagType::from_byte(b) else {
return false;
};
let subflag = b >> 2;
match flag_type {
FlagType::PositiveStore | FlagType::NegativeStore => (1..=3).contains(&subflag),
FlagType::IndexMapping => subflag <= 4, // LOG=0, LOG_LINEAR=1 .. LOG_QUARTIC=4
_ => false,
}
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Config, DDSketch};
    // --- VarEncoding unit tests ---
    #[test]
    fn test_unsigned_var_long_zero() {
        let mut buf = Vec::new();
        encode_unsigned_var_long(&mut buf, 0);
        assert_eq!(buf, [0x00]);
        let mut input = buf.as_slice();
        assert_eq!(decode_unsigned_var_long(&mut input).unwrap(), 0);
        // The decoder must consume exactly the bytes it was given.
        assert!(input.is_empty());
    }
    #[test]
    fn test_unsigned_var_long_small() {
        let mut buf = Vec::new();
        encode_unsigned_var_long(&mut buf, 1);
        assert_eq!(buf, [0x01]);
        let mut input = buf.as_slice();
        assert_eq!(decode_unsigned_var_long(&mut input).unwrap(), 1);
    }
    // 128 is the first value requiring a continuation byte.
    #[test]
    fn test_unsigned_var_long_128() {
        let mut buf = Vec::new();
        encode_unsigned_var_long(&mut buf, 128);
        assert_eq!(buf, [0x80, 0x01]);
        let mut input = buf.as_slice();
        assert_eq!(decode_unsigned_var_long(&mut input).unwrap(), 128);
    }
    // Roundtrip values straddling each 7-bit group boundary, plus u64::MAX.
    #[test]
    fn test_unsigned_var_long_roundtrip() {
        for v in [0u64, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX] {
            let mut buf = Vec::new();
            encode_unsigned_var_long(&mut buf, v);
            let mut input = buf.as_slice();
            let decoded = decode_unsigned_var_long(&mut input).unwrap();
            assert_eq!(decoded, v, "roundtrip failed for {}", v);
            assert!(input.is_empty());
        }
    }
    // Values around the ZigZag boundaries (±63/64) plus the i64 extremes.
    #[test]
    fn test_signed_var_long_roundtrip() {
        for v in [0i64, 1, -1, 63, -64, 64, -65, i64::MAX, i64::MIN] {
            let mut buf = Vec::new();
            encode_signed_var_long(&mut buf, v);
            let mut input = buf.as_slice();
            let decoded = decode_signed_var_long(&mut input).unwrap();
            assert_eq!(decoded, v, "roundtrip failed for {}", v);
            assert!(input.is_empty());
        }
    }
    #[test]
    fn test_var_double_roundtrip() {
        for v in [0.0, 1.0, 2.0, 5.0, 15.0, 42.0, 100.0, 1e-9, 1e15, 0.5, 7.77] {
            let mut buf = Vec::new();
            encode_var_double(&mut buf, v);
            let mut input = buf.as_slice();
            let decoded = decode_var_double(&mut input).unwrap();
            assert!(
                (decoded - v).abs() < 1e-15 || decoded == v,
                "roundtrip failed for {}: got {}",
                v,
                decoded,
            );
            assert!(input.is_empty());
        }
    }
    // Small integer counts must stay compact (one byte) on the wire.
    #[test]
    fn test_var_double_small_integers() {
        let mut buf = Vec::new();
        encode_var_double(&mut buf, 1.0);
        assert_eq!(buf.len(), 1, "VarDouble(1.0) should be 1 byte");
        buf.clear();
        encode_var_double(&mut buf, 5.0);
        assert_eq!(buf.len(), 1, "VarDouble(5.0) should be 1 byte");
    }
    // --- DDSketch encode/decode roundtrip tests ---
    #[test]
    fn test_encode_empty_sketch() {
        let sketch = DDSketch::new(Config::defaults());
        let bytes = sketch.to_java_bytes();
        assert!(!bytes.is_empty());
        let decoded = DDSketch::from_java_bytes(&bytes).unwrap();
        assert_eq!(decoded.count(), 0);
        assert_eq!(decoded.min(), None);
        assert_eq!(decoded.max(), None);
        assert_eq!(decoded.sum(), None);
    }
    #[test]
    fn test_encode_simple_sketch() {
        let mut sketch = DDSketch::new(Config::defaults());
        for v in [1.0, 2.0, 3.0, 4.0, 5.0] {
            sketch.add(v);
        }
        let bytes = sketch.to_java_bytes();
        let decoded = DDSketch::from_java_bytes(&bytes).unwrap();
        assert_eq!(decoded.count(), 5);
        assert_eq!(decoded.min(), Some(1.0));
        assert_eq!(decoded.max(), Some(5.0));
        assert_eq!(decoded.sum(), Some(15.0));
        assert_quantiles_match(&sketch, &decoded, &[0.5, 0.9, 0.95, 0.99]);
    }
    #[test]
    fn test_encode_single_value() {
        let mut sketch = DDSketch::new(Config::defaults());
        sketch.add(42.0);
        let bytes = sketch.to_java_bytes();
        let decoded = DDSketch::from_java_bytes(&bytes).unwrap();
        assert_eq!(decoded.count(), 1);
        assert_eq!(decoded.min(), Some(42.0));
        assert_eq!(decoded.max(), Some(42.0));
        assert_eq!(decoded.sum(), Some(42.0));
    }
    // Exercises the negative store path through the wire format.
    #[test]
    fn test_encode_negative_values() {
        let mut sketch = DDSketch::new(Config::defaults());
        for v in [-3.0, -1.0, 2.0, 5.0] {
            sketch.add(v);
        }
        let bytes = sketch.to_java_bytes();
        let decoded = DDSketch::from_java_bytes(&bytes).unwrap();
        assert_eq!(decoded.count(), 4);
        assert_eq!(decoded.min(), Some(-3.0));
        assert_eq!(decoded.max(), Some(5.0));
        assert_eq!(decoded.sum(), Some(3.0));
        assert_quantiles_match(&sketch, &decoded, &[0.0, 0.25, 0.5, 0.75, 1.0]);
    }
    // Exercises the ZERO_COUNT flag path.
    #[test]
    fn test_encode_with_zero_value() {
        let mut sketch = DDSketch::new(Config::defaults());
        for v in [0.0, 1.0, 2.0] {
            sketch.add(v);
        }
        let bytes = sketch.to_java_bytes();
        let decoded = DDSketch::from_java_bytes(&bytes).unwrap();
        assert_eq!(decoded.count(), 3);
        assert_eq!(decoded.min(), Some(0.0));
        assert_eq!(decoded.max(), Some(2.0));
        assert_eq!(decoded.sum(), Some(3.0));
        assert_eq!(decoded.zero_count, 1);
    }
    #[test]
    fn test_encode_large_range() {
        let mut sketch = DDSketch::new(Config::defaults());
        sketch.add(0.001);
        sketch.add(1_000_000.0);
        let bytes = sketch.to_java_bytes();
        let decoded = DDSketch::from_java_bytes(&bytes).unwrap();
        assert_eq!(decoded.count(), 2);
        assert_eq!(decoded.min(), Some(0.001));
        assert_eq!(decoded.max(), Some(1_000_000.0));
    }
    #[test]
    fn test_encode_with_version_prefix() {
        let mut sketch = DDSketch::new(Config::defaults());
        for v in [1.0, 2.0, 3.0] {
            sketch.add(v);
        }
        let bytes = sketch.to_java_bytes();
        // Simulate Java's toByteArrayV2: prepend 0x02
        let mut v2_bytes = vec![0x02];
        v2_bytes.extend_from_slice(&bytes);
        let decoded = DDSketch::from_java_bytes(&v2_bytes).unwrap();
        assert_eq!(decoded.count(), 3);
        assert_eq!(decoded.min(), Some(1.0));
        assert_eq!(decoded.max(), Some(3.0));
    }
    #[test]
    fn test_byte_level_encoding() {
        let mut sketch = DDSketch::new(Config::defaults());
        sketch.add(1.0);
        let bytes = sketch.to_java_bytes();
        assert_eq!(bytes[0], FLAG_COUNT, "first byte should be COUNT flag");
        assert!(
            bytes.contains(&FLAG_INDEX_MAPPING_LOG),
            "should contain index mapping flag"
        );
    }
    // --- Cross-language golden byte tests ---
    //
    // Golden bytes generated by Java's DDSketchWithExactSummaryStatistics.encode()
    // using LogarithmicMapping(0.01) + CollapsingLowestDenseStore(2048).
    const GOLDEN_SIMPLE: &str = "a00588000000000000f03f8c0000000000001440840000000000002e4002fd4a815abf52f03f000000000000000005050002440228021e021602";
    const GOLDEN_SINGLE: &str = "a0028800000000000045408c000000000000454084000000000000454002fd4a815abf52f03f00000000000000000501f40202";
    const GOLDEN_NEGATIVE: &str = "a084408800000000000008c08c000000000000144084000000000000084002fd4a815abf52f03f0000000000000000050244025c02070200026c02";
    const GOLDEN_ZERO: &str = "a0048800000000000000008c000000000000004084000000000000084002fd4a815abf52f03f00000000000000000402050200024402";
    const GOLDEN_EMPTY: &str = "02fd4a815abf52f03f0000000000000000";
    const GOLDEN_MANY: &str = "a08d1488000000000000f03f8c0000000000005940840000000000bab34002fd4a815abf52f03f000000000000000005550002440228021e021602120210020c020c020c0208020a020802060208020602060206020602040206020402040204020402040204020402040204020202040202020402020204020202020204020202020202020402020202020202020202020202020202020202020202020202020202020203020202020202020302020202020302020202020302020203020202030202020302030202020302030203020202030203020302030202";
    // Decode a lowercase hex string (no separators) into bytes.
    fn hex_to_bytes(hex: &str) -> Vec<u8> {
        (0..hex.len())
            .step_by(2)
            .map(|i| u8::from_str_radix(&hex[i..i + 2], 16).unwrap())
            .collect()
    }
    fn bytes_to_hex(bytes: &[u8]) -> String {
        bytes.iter().map(|b| format!("{b:02x}")).collect()
    }
    // Assert the Rust encoding is byte-identical to the Java golden fixture.
    fn assert_golden(label: &str, sketch: &DDSketch, golden_hex: &str) {
        let bytes = sketch.to_java_bytes();
        let expected = hex_to_bytes(golden_hex);
        assert_eq!(
            bytes,
            expected,
            "Rust encoding doesn't match Java golden bytes for {}.\nRust: {}\nJava: {}",
            label,
            bytes_to_hex(&bytes),
            golden_hex,
        );
    }
    // Assert the two sketches agree (to tight relative tolerance) at each quantile.
    fn assert_quantiles_match(a: &DDSketch, b: &DDSketch, quantiles: &[f64]) {
        for &q in quantiles {
            let va = a.quantile(q).unwrap().unwrap();
            let vb = b.quantile(q).unwrap().unwrap();
            assert!(
                (va - vb).abs() / va.abs().max(1e-15) < 1e-12,
                "quantile({}) mismatch: {} vs {}",
                q,
                va,
                vb,
            );
        }
    }
    #[test]
    fn test_cross_language_simple() {
        let mut sketch = DDSketch::new(Config::defaults());
        for v in [1.0, 2.0, 3.0, 4.0, 5.0] {
            sketch.add(v);
        }
        assert_golden("SIMPLE", &sketch, GOLDEN_SIMPLE);
    }
    #[test]
    fn test_cross_language_single() {
        let mut sketch = DDSketch::new(Config::defaults());
        sketch.add(42.0);
        assert_golden("SINGLE", &sketch, GOLDEN_SINGLE);
    }
    #[test]
    fn test_cross_language_negative() {
        let mut sketch = DDSketch::new(Config::defaults());
        for v in [-3.0, -1.0, 2.0, 5.0] {
            sketch.add(v);
        }
        assert_golden("NEGATIVE", &sketch, GOLDEN_NEGATIVE);
    }
    #[test]
    fn test_cross_language_zero() {
        let mut sketch = DDSketch::new(Config::defaults());
        for v in [0.0, 1.0, 2.0] {
            sketch.add(v);
        }
        assert_golden("ZERO", &sketch, GOLDEN_ZERO);
    }
    #[test]
    fn test_cross_language_empty() {
        let sketch = DDSketch::new(Config::defaults());
        assert_golden("EMPTY", &sketch, GOLDEN_EMPTY);
    }
    #[test]
    fn test_cross_language_many() {
        let mut sketch = DDSketch::new(Config::defaults());
        for i in 1..=100 {
            sketch.add(i as f64);
        }
        assert_golden("MANY", &sketch, GOLDEN_MANY);
    }
    // All Java-produced fixtures must decode without error.
    #[test]
    fn test_decode_java_golden_bytes() {
        for (name, hex) in [
            ("SIMPLE", GOLDEN_SIMPLE),
            ("SINGLE", GOLDEN_SINGLE),
            ("NEGATIVE", GOLDEN_NEGATIVE),
            ("ZERO", GOLDEN_ZERO),
            ("EMPTY", GOLDEN_EMPTY),
            ("MANY", GOLDEN_MANY),
        ] {
            let bytes = hex_to_bytes(hex);
            let result = DDSketch::from_java_bytes(&bytes);
            assert!(
                result.is_ok(),
                "failed to decode {}: {:?}",
                name,
                result.err()
            );
        }
    }
    // Quantile accuracy survives a full encode/decode roundtrip within alpha.
    #[test]
    fn test_encode_decode_many_values() {
        let mut sketch = DDSketch::new(Config::defaults());
        for i in 1..=100 {
            sketch.add(i as f64);
        }
        let bytes = sketch.to_java_bytes();
        let decoded = DDSketch::from_java_bytes(&bytes).unwrap();
        assert_eq!(decoded.count(), 100);
        assert_eq!(decoded.min(), Some(1.0));
        assert_eq!(decoded.max(), Some(100.0));
        assert_eq!(decoded.sum(), Some(5050.0));
        let alpha = 0.01;
        let orig_p95 = sketch.quantile(0.95).unwrap().unwrap();
        let dec_p95 = decoded.quantile(0.95).unwrap().unwrap();
        assert!(
            (orig_p95 - dec_p95).abs() / orig_p95 < alpha,
            "p95 mismatch: {} vs {}",
            orig_p95,
            dec_p95,
        );
    }
}

View File

@@ -1,52 +0,0 @@
//! This crate provides a direct port of the [Golang](https://github.com/DataDog/sketches-go)
//! [DDSketch](https://arxiv.org/pdf/1908.10693.pdf) implementation to Rust. All efforts
//! have been made to keep this as close to the original implementation as possible, with a few
//! tweaks to get closer to idiomatic Rust.
//!
//! # Usage
//!
//! Add multiple samples to a DDSketch and invoke the `quantile` method to pull any quantile from
//! *0.0* to *1.0*.
//!
//! ```rust
//! use sketches_ddsketch::{Config, DDSketch};
//!
//! let c = Config::defaults();
//! let mut d = DDSketch::new(c);
//!
//! d.add(1.0);
//! d.add(1.0);
//! d.add(1.0);
//!
//! let q = d.quantile(0.50).unwrap();
//!
//! assert!(q < Some(1.02));
//! assert!(q > Some(0.98));
//! ```
//!
//! Sketches can also be merged, provided they share the same `Config`.
//!
//! ```rust
//! use sketches_ddsketch::{Config, DDSketch};
//!
//! let c = Config::defaults();
//! let mut d1 = DDSketch::new(c);
//! let mut d2 = DDSketch::new(c);
//!
//! d1.add(1.0);
//! d2.add(2.0);
//! d2.add(2.0);
//!
//! d1.merge(&d2).unwrap();
//!
//! assert_eq!(d1.count(), 3);
//! ```
pub use self::config::Config;
pub use self::ddsketch::{DDSketch, DDSketchError};
pub use self::encoding::DecodeError;
mod config;
mod ddsketch;
pub mod encoding;
mod store;

View File

@@ -1,252 +0,0 @@
#[cfg(feature = "use_serde")]
use serde::{Deserialize, Serialize};
const CHUNK_SIZE: i32 = 128;
// Divide the `dividend` by the `divisor`, rounding towards positive infinity.
//
// Similar to the nightly only `std::i32::div_ceil`.
fn div_ceil(dividend: i32, divisor: i32) -> i32 {
(dividend + divisor - 1) / divisor
}
/// CollapsingLowestDenseStore
///
/// A dense array of bin counters. When the key range would exceed
/// `bin_limit` bins, the lowest bins are collapsed into the first bin so
/// memory stays bounded.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "use_serde", derive(Serialize, Deserialize))]
pub struct Store {
    pub(crate) bins: Vec<u64>,    // per-bin counters, indexed by `key - offset`
    pub(crate) count: u64,        // total count across all bins
    pub(crate) min_key: i32,      // smallest occupied key (i32::MAX while empty)
    pub(crate) max_key: i32,      // largest occupied key (i32::MIN while empty)
    pub(crate) offset: i32,       // key corresponding to bins[0]
    pub(crate) bin_limit: usize,  // maximum number of bins before collapsing
    is_collapsed: bool,           // set once the lowest bins have been collapsed
}
impl Store {
pub fn new(bin_limit: usize) -> Self {
Store {
bins: Vec::new(),
count: 0,
min_key: i32::MAX,
max_key: i32::MIN,
offset: 0,
bin_limit,
is_collapsed: false,
}
}
/// Return the number of bins.
pub fn length(&self) -> i32 {
self.bins.len() as i32
}
pub fn is_empty(&self) -> bool {
self.bins.is_empty()
}
pub fn add(&mut self, key: i32) {
let idx = self.get_index(key);
self.bins[idx] += 1;
self.count += 1;
}
/// See Java: https://github.com/DataDog/sketches-java/blob/master/src/main/java/com/datadoghq/sketch/ddsketch/store/DenseStore.java (add(int index, double count) method)
pub(crate) fn add_count(&mut self, key: i32, count: u64) {
let idx = self.get_index(key);
self.bins[idx] += count;
self.count += count;
}
fn get_index(&mut self, key: i32) -> usize {
if key < self.min_key {
if self.is_collapsed {
return 0;
}
self.extend_range(key, None);
if self.is_collapsed {
return 0;
}
} else if key > self.max_key {
self.extend_range(key, None);
}
(key - self.offset) as usize
}
fn extend_range(&mut self, key: i32, second_key: Option<i32>) {
let second_key = second_key.unwrap_or(key);
let new_min_key = i32::min(key, i32::min(second_key, self.min_key));
let new_max_key = i32::max(key, i32::max(second_key, self.max_key));
if self.is_empty() {
let new_len = self.get_new_length(new_min_key, new_max_key);
self.bins.resize(new_len, 0);
self.offset = new_min_key;
self.adjust(new_min_key, new_max_key);
} else if new_min_key >= self.min_key && new_max_key < self.offset + self.length() {
self.min_key = new_min_key;
self.max_key = new_max_key;
} else {
// Grow bins
let new_length = self.get_new_length(new_min_key, new_max_key);
if new_length > self.length() as usize {
self.bins.resize(new_length, 0);
}
self.adjust(new_min_key, new_max_key);
}
}
fn get_new_length(&self, new_min_key: i32, new_max_key: i32) -> usize {
let desired_length = new_max_key - new_min_key + 1;
usize::min(
(CHUNK_SIZE * div_ceil(desired_length, CHUNK_SIZE)) as usize,
self.bin_limit,
)
}
fn adjust(&mut self, new_min_key: i32, new_max_key: i32) {
if new_max_key - new_min_key + 1 > self.length() {
let new_min_key = new_max_key - self.length() + 1;
if new_min_key >= self.max_key {
// Put everything in the first bin.
self.offset = new_min_key;
self.min_key = new_min_key;
self.bins.fill(0);
self.bins[0] = self.count;
} else {
let shift = self.offset - new_min_key;
if shift < 0 {
let collapse_start_index = (self.min_key - self.offset) as usize;
let collapse_end_index = (new_min_key - self.offset) as usize;
let collapsed_count: u64 = self.bins[collapse_start_index..collapse_end_index]
.iter()
.sum();
let zero_len = (new_min_key - self.min_key) as usize;
self.bins.splice(
collapse_start_index..collapse_end_index,
std::iter::repeat_n(0, zero_len),
);
self.bins[collapse_end_index] += collapsed_count;
}
self.min_key = new_min_key;
self.shift_bins(shift);
}
self.max_key = new_max_key;
self.is_collapsed = true;
} else {
self.center_bins(new_min_key, new_max_key);
self.min_key = new_min_key;
self.max_key = new_max_key;
}
}
fn shift_bins(&mut self, shift: i32) {
if shift > 0 {
let shift = shift as usize;
self.bins.rotate_right(shift);
for idx in 0..shift {
self.bins[idx] = 0;
}
} else {
let shift = shift.unsigned_abs() as usize;
for idx in 0..shift {
self.bins[idx] = 0;
}
self.bins.rotate_left(shift);
}
self.offset -= shift;
}
/// Shift the bins so the key window `[new_min_key, new_max_key]` sits in the
/// middle of the array, leaving equal headroom for growth on both sides.
fn center_bins(&mut self, new_min_key: i32, new_max_key: i32) {
    let span = new_max_key - new_min_key + 1;
    let middle_key = new_min_key + span / 2;
    self.shift_bins(self.offset + self.length() / 2 - middle_key)
}
/// Return the key whose cumulative count first exceeds `rank`
/// (0-based); falls back to `max_key` when `rank >= count`.
pub fn key_at_rank(&self, rank: u64) -> i32 {
    let mut cumulative: u64 = 0;
    for (idx, &bin_count) in self.bins.iter().enumerate() {
        cumulative += bin_count;
        if cumulative > rank {
            return self.offset + idx as i32;
        }
    }
    self.max_key
}
/// Total number of values recorded in this store.
pub fn count(&self) -> u64 {
self.count
}
/// Merge the counts of `other` into `self`.
///
/// After extending our range, any of `other`'s keys that still fall below
/// `self.min_key` (possible when `self` is collapsed) cannot be represented
/// individually, so their counts are folded into our first bin.
pub fn merge(&mut self, other: &Store) {
    if other.count == 0 {
        return;
    }
    if self.count == 0 {
        self.copy(other);
        return;
    }
    if other.min_key < self.min_key || other.max_key > self.max_key {
        self.extend_range(other.min_key, Some(other.max_key));
    }
    // Indices below are relative to `other.offset`, so they index other.bins.
    let collapse_start_index = other.min_key - other.offset;
    let mut collapse_end_index = i32::min(self.min_key, other.max_key + 1) - other.offset;
    if collapse_end_index > collapse_start_index {
        // BUG FIX: the collapsed counts must come from `other`'s bins, not our
        // own. Summing self.bins here double-counted our own entries and
        // dropped other's below-min counts, breaking sum(bins) == count.
        let collapsed_count: u64 = other.bins
            [collapse_start_index as usize..collapse_end_index as usize]
            .iter()
            .sum();
        self.bins[0] += collapsed_count;
    } else {
        collapse_end_index = collapse_start_index;
    }
    // Add the remaining (representable) keys bin by bin.
    for key in (collapse_end_index + other.offset)..(other.max_key + 1) {
        self.bins[(key - self.offset) as usize] += other.bins[(key - other.offset) as usize]
    }
    self.count += other.count;
}
/// Make `self` an exact copy of `o`, reusing our bin allocation when possible.
fn copy(&mut self, o: &Store) {
    // clone_from reuses self.bins' existing capacity instead of reallocating.
    self.bins.clone_from(&o.bins);
    self.offset = o.offset;
    self.min_key = o.min_key;
    self.max_key = o.max_key;
    self.count = o.count;
    self.bin_limit = o.bin_limit;
    self.is_collapsed = o.is_collapsed;
}
}
#[cfg(test)]
mod tests {
    use crate::store::Store;

    /// Adding an ascending run of keys must grow the store without panicking.
    #[test]
    fn test_simple_store() {
        let mut store = Store::new(2048);
        (0..2048).for_each(|key| store.add(key));
    }

    /// The same keys in descending order must also be accepted cleanly.
    #[test]
    fn test_simple_store_rev() {
        let mut store = Store::new(2048);
        (0..2048).rev().for_each(|key| store.add(key));
    }
}

View File

@@ -1,88 +0,0 @@
use std::cmp::Ordering;
use std::f64::NAN;
/// Exact (non-sketched) reference implementation of quantile statistics,
/// used in tests to validate DDSketch results against ground truth.
pub struct Dataset {
// All recorded values; sorted lazily on demand.
values: Vec<f64>,
// Running sum, maintained incrementally by add().
sum: f64,
// Cache flag: true when `values` is currently sorted.
sorted: bool,
}
/// Total-order comparison for `f64` values that are known not to be NaN.
///
/// # Panics
/// Panics if either value is NaN, since NaN admits no consistent ordering
/// and would corrupt the sort this comparator drives.
fn cmp_f64(a: &f64, b: &f64) -> Ordering {
    assert!(!a.is_nan() && !b.is_nan());
    // partial_cmp only returns None for NaN operands, which the
    // assertion above rules out.
    a.partial_cmp(b)
        .expect("NaN operands are excluded by the assertion")
}
impl Dataset {
    /// Create an empty dataset.
    pub fn new() -> Self {
        Dataset {
            values: Vec::new(),
            sum: 0.0,
            sorted: false,
        }
    }

    /// Record a value; invalidates the cached sort order.
    pub fn add(&mut self, value: f64) {
        self.values.push(value);
        self.sum += value;
        self.sorted = false;
    }

    /// Exact lower quantile: the value at the floored fractional rank.
    /// Returns NaN when `q` is outside `[0, 1]` (including NaN — the old
    /// `q < 0 || q > 1` guard let NaN slip through and index `values[0]`)
    /// or when the dataset is empty.
    pub fn lower_quantile(&mut self, q: f64) -> f64 {
        if !(0.0..=1.0).contains(&q) || self.values.is_empty() {
            return f64::NAN;
        }
        self.sort();
        let rank = q * (self.values.len() - 1) as f64;
        self.values[rank.floor() as usize]
    }

    /// Exact upper quantile: the value at the ceiled fractional rank.
    /// Same NaN semantics as `lower_quantile`.
    pub fn upper_quantile(&mut self, q: f64) -> f64 {
        if !(0.0..=1.0).contains(&q) || self.values.is_empty() {
            return f64::NAN;
        }
        self.sort();
        let rank = q * (self.values.len() - 1) as f64;
        self.values[rank.ceil() as usize]
    }

    /// Smallest recorded value; NaN when empty (previously panicked on `[0]`).
    pub fn min(&mut self) -> f64 {
        if self.values.is_empty() {
            return f64::NAN;
        }
        self.sort();
        self.values[0]
    }

    /// Largest recorded value; NaN when empty (previously panicked).
    pub fn max(&mut self) -> f64 {
        if self.values.is_empty() {
            return f64::NAN;
        }
        self.sort();
        self.values[self.values.len() - 1]
    }

    /// Running sum of all recorded values.
    pub fn sum(&self) -> f64 {
        self.sum
    }

    /// Number of recorded values.
    pub fn count(&self) -> usize {
        self.values.len()
    }

    /// Sort values lazily; a no-op when already sorted.
    fn sort(&mut self) {
        if self.sorted {
            return;
        }
        self.values.sort_by(cmp_f64);
        self.sorted = true;
    }
}

View File

@@ -1,100 +0,0 @@
extern crate rand;
extern crate rand_distr;
use rand::prelude::*;
/// A mutable source of `f64` samples used to drive sketch tests.
pub trait Generator {
    fn generate(&mut self) -> f64;
}

/// Generator that yields the same fixed value on every call.
pub struct Constant {
    value: f64,
}

impl Constant {
    pub fn new(value: f64) -> Self {
        Self { value }
    }
}

impl Generator for Constant {
    fn generate(&mut self) -> f64 {
        self.value
    }
}
/// Generator yielding an arithmetic progression:
/// start, start + step, start + 2*step, ...
pub struct Linear {
    current_value: f64,
    step: f64,
}

impl Linear {
    pub fn new(start_value: f64, step: f64) -> Self {
        Self {
            current_value: start_value,
            step,
        }
    }
}

impl Generator for Linear {
    fn generate(&mut self) -> f64 {
        let emitted = self.current_value;
        self.current_value = emitted + self.step;
        emitted
    }
}
/// Generator sampling a normal (Gaussian) distribution.
pub struct Normal {
    distr: rand_distr::Normal<f64>,
}

impl Normal {
    pub fn new(mean: f64, stddev: f64) -> Self {
        Self {
            // Panics only on invalid parameters (e.g. negative stddev).
            distr: rand_distr::Normal::new(mean, stddev).unwrap(),
        }
    }
}

impl Generator for Normal {
    fn generate(&mut self) -> f64 {
        let mut rng = rand::thread_rng();
        self.distr.sample(&mut rng)
    }
}
/// Generator sampling a log-normal distribution (strictly positive output).
pub struct Lognormal {
    distr: rand_distr::LogNormal<f64>,
}

impl Lognormal {
    pub fn new(mean: f64, stddev: f64) -> Self {
        Self {
            // Panics only on invalid parameters.
            distr: rand_distr::LogNormal::new(mean, stddev).unwrap(),
        }
    }
}

impl Generator for Lognormal {
    fn generate(&mut self) -> f64 {
        let mut rng = rand::thread_rng();
        self.distr.sample(&mut rng)
    }
}
/// Generator sampling an exponential distribution with rate `lambda`.
pub struct Exponential {
    distr: rand_distr::Exp<f64>,
}

impl Exponential {
    pub fn new(lambda: f64) -> Self {
        Self {
            // Panics only on an invalid (non-positive) lambda.
            distr: rand_distr::Exp::new(lambda).unwrap(),
        }
    }
}

impl Generator for Exponential {
    fn generate(&mut self) -> f64 {
        let mut rng = rand::thread_rng();
        self.distr.sample(&mut rng)
    }
}

View File

@@ -1,2 +0,0 @@
pub mod dataset;
pub mod generator;

View File

@@ -1,316 +0,0 @@
mod common;
use std::time::Instant;
use common::dataset::Dataset;
use common::generator;
use common::generator::Generator;
use sketches_ddsketch::{Config, DDSketch};
// Relative accuracy guarantee the sketches under test are configured with.
const TEST_ALPHA: f64 = 0.01;
// Bin capacity for every test sketch.
const TEST_MAX_BINS: u32 = 1024;
// Smallest value the sketch distinguishes from zero.
const TEST_MIN_VALUE: f64 = 1.0e-9;
// Used for float equality
const TEST_ERROR_THRESH: f64 = 1.0e-9;
// Dataset sizes each scenario is evaluated at.
const TEST_SIZES: [usize; 5] = [3, 5, 10, 100, 1000];
// Quantiles checked in compare_sketches, from min (0.0) to max (1.0).
const TEST_QUANTILES: [f64; 10] = [0.0, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99, 0.999, 1.0];
// Accuracy check against a constant stream of identical values.
#[test]
fn test_constant() {
evaluate_sketches(|| Box::new(generator::Constant::new(42.0)));
}
// Accuracy check against an arithmetic progression 0, 1, 2, ...
#[test]
fn test_linear() {
evaluate_sketches(|| Box::new(generator::Linear::new(0.0, 1.0)));
}
// Accuracy check against normally distributed samples.
#[test]
fn test_normal() {
evaluate_sketches(|| Box::new(generator::Normal::new(35.0, 1.0)));
}
// Accuracy check against heavy-tailed log-normal samples.
#[test]
fn test_lognormal() {
evaluate_sketches(|| Box::new(generator::Lognormal::new(0.0, 2.0)));
}
// Accuracy check against exponentially distributed samples.
#[test]
fn test_exponential() {
evaluate_sketches(|| Box::new(generator::Exponential::new(2.0)));
}
/// Invoke `f` once for each dataset size in `TEST_SIZES`.
fn evaluate_test_sizes(f: impl Fn(usize)) {
    for &sz in TEST_SIZES.iter() {
        f(sz);
    }
}
/// Evaluate one sketch per test size, using a fresh generator each time so
/// runs are independent of each other.
fn evaluate_sketches(gen_factory: impl Fn() -> Box<dyn generator::Generator>) {
    evaluate_test_sizes(|sz: usize| {
        let mut fresh_generator = gen_factory();
        evaluate_sketch(sz, &mut fresh_generator);
    });
}
/// Build the sketch Config shared by all accuracy tests.
fn new_config() -> Config {
Config::new(TEST_ALPHA, TEST_MAX_BINS, TEST_MIN_VALUE)
}
/// Assert two floats are equal to within TEST_ERROR_THRESH.
fn assert_float_eq(a: f64, b: f64) {
    let delta = (a - b).abs();
    assert!(delta < TEST_ERROR_THRESH, "{} != {}", a, b);
}
/// Feed `count` generated values into both a DDSketch and the exact Dataset,
/// then compare their statistics.
fn evaluate_sketch(count: usize, generator: &mut Box<dyn generator::Generator>) {
    let mut sketch = DDSketch::new(new_config());
    let mut exact = Dataset::new();
    for _ in 0..count {
        let value = generator.generate();
        sketch.add(value);
        exact.add(value);
    }
    compare_sketches(&mut exact, &sketch);
}
/// Check every test quantile of the sketch against the exact dataset,
/// allowing the TEST_ALPHA relative-error band, then compare the exact
/// statistics (min/max/sum/count).
fn compare_sketches(d: &mut Dataset, g: &DDSketch) {
    for &q in TEST_QUANTILES.iter() {
        let lower = d.lower_quantile(q);
        let upper = d.upper_quantile(q);
        // The error band is relative, so its direction flips with the sign.
        let min_expected = if lower < 0.0 {
            lower * (1.0 + TEST_ALPHA)
        } else {
            lower * (1.0 - TEST_ALPHA)
        };
        let max_expected = if upper > 0.0 {
            upper * (1.0 + TEST_ALPHA)
        } else {
            upper * (1.0 - TEST_ALPHA)
        };
        let quantile = g.quantile(q).unwrap().unwrap();
        assert!(
            min_expected <= quantile,
            "Lower than min, quantile: {}, wanted {} <= {}",
            q,
            min_expected,
            quantile
        );
        assert!(
            quantile <= max_expected,
            "Higher than max, quantile: {}, wanted {} <= {}",
            q,
            quantile,
            max_expected
        );
        // Querying twice must yield an identical answer (no hidden mutation).
        let quantile_again = g.quantile(q).unwrap().unwrap();
        assert_eq!(quantile, quantile_again);
    }
    assert_eq!(g.min().unwrap(), d.min());
    assert_eq!(g.max().unwrap(), d.max());
    assert_float_eq(g.sum().unwrap(), d.sum());
    assert_eq!(g.count(), d.count());
}
// Merge three sketches fed from interleaved normal distributions and check
// the merged sketch still matches the exact dataset of all values.
#[test]
fn test_merge_normal() {
evaluate_test_sizes(|sz: usize| {
let c = new_config();
let mut d = Dataset::new();
let mut g1 = DDSketch::new(c);
let mut generator1 = generator::Normal::new(35.0, 1.0);
// Each sketch receives every third value so the streams interleave.
for _ in (0..sz).step_by(3) {
let value = generator1.generate();
g1.add(value);
d.add(value);
}
let mut g2 = DDSketch::new(c);
let mut generator2 = generator::Normal::new(50.0, 2.0);
for _ in (1..sz).step_by(3) {
let value = generator2.generate();
g2.add(value);
d.add(value);
}
g1.merge(&g2).unwrap();
let mut g3 = DDSketch::new(c);
let mut generator3 = generator::Normal::new(40.0, 0.5);
for _ in (2..sz).step_by(3) {
let value = generator3.generate();
g3.add(value);
d.add(value);
}
g1.merge(&g3).unwrap();
compare_sketches(&mut d, &g1);
});
}
// Merging into an empty sketch, and merging an empty sketch into a
// populated one, must both preserve the populated statistics.
#[test]
fn test_merge_empty() {
evaluate_test_sizes(|sz: usize| {
let c = new_config();
let mut d = Dataset::new();
let mut g1 = DDSketch::new(c);
let mut g2 = DDSketch::new(c);
let mut generator = generator::Exponential::new(5.0);
for _ in 0..sz {
let value = generator.generate();
g2.add(value);
d.add(value);
}
// Empty g1 absorbs populated g2.
g1.merge(&g2).unwrap();
compare_sketches(&mut d, &g1);
// Populated g2 absorbs empty g3: a no-op.
let g3 = DDSketch::new(c);
g2.merge(&g3).unwrap();
compare_sketches(&mut d, &g2);
});
}
// Merge sketches fed from very different distributions (narrow normal plus
// two exponentials) to exercise range extension during merge.
#[test]
fn test_merge_mixed() {
evaluate_test_sizes(|sz: usize| {
let c = new_config();
let mut d = Dataset::new();
let mut g1 = DDSketch::new(c);
let mut generator1 = generator::Normal::new(100.0, 1.0);
for _ in (0..sz).step_by(3) {
let value = generator1.generate();
g1.add(value);
d.add(value);
}
let mut g2 = DDSketch::new(c);
let mut generator2 = generator::Exponential::new(5.0);
for _ in (1..sz).step_by(3) {
let value = generator2.generate();
g2.add(value);
d.add(value);
}
g1.merge(&g2).unwrap();
let mut g3 = DDSketch::new(c);
let mut generator3 = generator::Exponential::new(0.1);
for _ in (2..sz).step_by(3) {
let value = generator3.generate();
g3.add(value);
d.add(value);
}
g1.merge(&g3).unwrap();
compare_sketches(&mut d, &g1);
})
}
// Sketches may only merge when alpha, max bins and min value all match;
// each mismatched parameter must produce an error.
#[test]
fn test_merge_incompatible() {
let c1 = Config::new(TEST_ALPHA, TEST_MAX_BINS, TEST_MIN_VALUE);
// Different alpha.
let c2 = Config::new(TEST_ALPHA * 2.0, TEST_MAX_BINS, TEST_MIN_VALUE);
let mut d1 = DDSketch::new(c1);
let d2 = DDSketch::new(c2);
assert!(d1.merge(&d2).is_err());
// Different min value.
let c3 = Config::new(TEST_ALPHA, TEST_MAX_BINS, TEST_MIN_VALUE * 10.0);
let d3 = DDSketch::new(c3);
assert!(d1.merge(&d3).is_err());
// Different bin limit.
let c4 = Config::new(TEST_ALPHA, TEST_MAX_BINS * 2, TEST_MIN_VALUE);
let d4 = DDSketch::new(c4);
assert!(d1.merge(&d4).is_err());
// the same should work
let c5 = Config::new(TEST_ALPHA, TEST_MAX_BINS, TEST_MIN_VALUE);
let dsame = DDSketch::new(c5);
assert!(d1.merge(&dsame).is_ok());
}
// Throughput benchmark for insertion; ignored by default (run with
// `cargo test -- --ignored`). Values are pre-generated so only add() is timed.
#[test]
#[ignore]
fn test_performance_insert() {
let c = Config::defaults();
let mut g = DDSketch::new(c);
let mut gen = generator::Normal::new(1000.0, 500.0);
let count = 300_000_000;
let mut values = Vec::new();
for _ in 0..count {
values.push(gen.generate());
}
let start_time = Instant::now();
for value in values {
g.add(value);
}
// This simply ensures the operations don't get optimized out as ignored
let quantile = g.quantile(0.50).unwrap().unwrap();
let elapsed = start_time.elapsed().as_micros() as f64;
// Convert microseconds to seconds.
let elapsed = elapsed / 1_000_000.0;
// NOTE(review): "{:2}" is a minimum-width spec; "{:.2}" (two decimal
// places) was likely intended here — confirm before changing output.
println!(
"RESULT: p50={:.2} => Added {}M samples in {:2} secs ({:.2}M samples/sec)",
quantile,
count / 1_000_000,
elapsed,
(count as f64) / 1_000_000.0 / elapsed
);
}
// Throughput benchmark for merging pre-built sketches; ignored by default.
#[test]
#[ignore]
fn test_performance_merge() {
let c = Config::defaults();
let mut gen = generator::Normal::new(1000.0, 500.0);
let merge_count = 500_000;
let sample_count = 1_000;
// Build all source sketches up front so only merge() is timed.
let mut sketches = Vec::new();
for _ in 0..merge_count {
let mut d = DDSketch::new(c);
for _ in 0..sample_count {
d.add(gen.generate());
}
sketches.push(d);
}
let mut base = DDSketch::new(c);
let start_time = Instant::now();
for sketch in &sketches {
base.merge(sketch).unwrap();
}
let elapsed = start_time.elapsed().as_micros() as f64;
// Convert microseconds to seconds.
let elapsed = elapsed / 1_000_000.0;
// NOTE(review): "{:2}" here likely also meant "{:.2}" — confirm.
println!(
"RESULT: Merged {} sketches in {:2} secs ({:.2} merges/sec)",
merge_count,
elapsed,
(merge_count as f64) / elapsed
);
}

View File

@@ -95,21 +95,11 @@ pub(crate) fn get_all_ff_reader_or_empty(
allowed_column_types: Option<&[ColumnType]>,
fallback_type: ColumnType,
) -> crate::Result<Vec<(columnar::Column<u64>, ColumnType)>> {
let mut ff_field_with_type = get_all_ff_readers(reader, field_name, allowed_column_types)?;
let ff_fields = reader.fast_fields();
let mut ff_field_with_type =
ff_fields.u64_lenient_for_type_all(allowed_column_types, field_name)?;
if ff_field_with_type.is_empty() {
ff_field_with_type.push((Column::build_empty_column(reader.num_docs()), fallback_type));
}
Ok(ff_field_with_type)
}
/// Get all fast field reader.
pub(crate) fn get_all_ff_readers(
reader: &SegmentReader,
field_name: &str,
allowed_column_types: Option<&[ColumnType]>,
) -> crate::Result<Vec<(columnar::Column<u64>, ColumnType)>> {
let ff_fields = reader.fast_fields();
let ff_field_with_type =
ff_fields.u64_lenient_for_type_all(allowed_column_types, field_name)?;
Ok(ff_field_with_type)
}

View File

@@ -9,12 +9,12 @@ use crate::aggregation::accessor_helpers::{
get_numeric_or_date_column_types,
};
use crate::aggregation::agg_req::{Aggregation, AggregationVariants, Aggregations};
pub use crate::aggregation::bucket::{CompositeAggReqData, CompositeSourceAccessors};
use crate::aggregation::bucket::{
build_segment_filter_collector, build_segment_range_collector, CompositeAggregation,
FilterAggReqData, HistogramAggReqData, HistogramBounds, IncludeExcludeParam,
MissingTermAggReqData, RangeAggReqData, SegmentCompositeCollector, SegmentHistogramCollector,
TermMissingAgg, TermsAggReqData, TermsAggregation, TermsAggregationInternal,
build_segment_filter_collector, build_segment_range_collector, CompositeAggReqData,
CompositeAggregation, CompositeSourceAccessors, FilterAggReqData, HistogramAggReqData,
HistogramBounds, IncludeExcludeParam, MissingTermAggReqData, RangeAggReqData,
SegmentHistogramCollector, TermMissingAgg, TermsAggReqData, TermsAggregation,
TermsAggregationInternal,
};
use crate::aggregation::metric::{
build_segment_stats_collector, AverageAggregation, CardinalityAggReqData,
@@ -143,14 +143,8 @@ impl AggregationsSegmentCtx {
.as_deref_mut()
.expect("histogram_req_data slot is empty (taken)")
}
#[inline]
pub(crate) fn get_composite_req_data_mut(&mut self, idx: usize) -> &mut CompositeAggReqData {
self.per_request.composite_req_data[idx]
.as_deref_mut()
.expect("composite_req_data slot is empty (taken)")
}
// ---------- take / put (terms, histogram, range, composite) ----------
// ---------- take / put (terms, histogram, range) ----------
/// Move out the boxed Histogram request at `idx`, leaving `None`.
#[inline]
@@ -238,8 +232,6 @@ pub struct PerRequestAggSegCtx {
pub range_req_data: Vec<Option<Box<RangeAggReqData>>>,
/// FilterAggReqData contains the request data for a filter aggregation.
pub filter_req_data: Vec<Option<Box<FilterAggReqData>>>,
/// CompositeAggReqData contains the request data for a composite aggregation.
pub composite_req_data: Vec<Option<Box<CompositeAggReqData>>>,
/// Shared by avg, min, max, sum, stats, extended_stats, count
pub stats_metric_req_data: Vec<MetricAggReqData>,
/// CardinalityAggReqData contains the request data for a cardinality aggregation.
@@ -248,6 +240,8 @@ pub struct PerRequestAggSegCtx {
pub top_hits_req_data: Vec<TopHitsAggReqData>,
/// MissingTermAggReqData contains the request data for a missing term aggregation.
pub missing_term_req_data: Vec<MissingTermAggReqData>,
/// CompositeAggReqData contains the request data for a composite aggregation.
pub composite_req_data: Vec<Option<Box<CompositeAggReqData>>>,
/// Request tree used to build collectors.
pub agg_tree: Vec<AggRefNode>,
@@ -298,7 +292,7 @@ impl PerRequestAggSegCtx {
+ self
.composite_req_data
.iter()
.map(|t| t.as_ref().unwrap().get_memory_consumption())
.map(|b| b.as_ref().map(|d| d.get_memory_consumption()).unwrap_or(0))
.sum::<usize>()
+ self.agg_tree.len() * std::mem::size_of::<AggRefNode>()
}
@@ -336,7 +330,7 @@ impl PerRequestAggSegCtx {
.expect("filter_req_data slot is empty (taken)")
.name
.as_str(),
AggKind::Composite => &self.composite_req_data[idx]
AggKind::Composite => self.composite_req_data[idx]
.as_deref()
.expect("composite_req_data slot is empty (taken)")
.name
@@ -467,9 +461,11 @@ pub(crate) fn build_segment_agg_collector(
)?)),
AggKind::Range => Ok(build_segment_range_collector(req, node)?),
AggKind::Filter => build_segment_filter_collector(req, node),
AggKind::Composite => Ok(Box::new(SegmentCompositeCollector::from_req_and_validate(
req, node,
)?)),
AggKind::Composite => Ok(Box::new(
crate::aggregation::bucket::SegmentCompositeCollector::from_req_and_validate(
req, node,
)?,
)),
}
}
@@ -764,6 +760,14 @@ fn build_nodes(
children,
}])
}
AggregationVariants::Composite(composite_req) => Ok(vec![build_composite_node(
agg_name,
reader,
segment_ordinal,
data,
&req.sub_aggregation,
composite_req,
)?]),
AggregationVariants::Filter(filter_req) => {
// Build the query and evaluator upfront
let schema = reader.schema();
@@ -795,17 +799,38 @@ fn build_nodes(
children,
}])
}
AggregationVariants::Composite(composite_req) => Ok(vec![build_composite_node(
agg_name,
reader,
segment_ordinal,
data,
&req.sub_aggregation,
composite_req,
)?]),
}
}
fn build_composite_node(
agg_name: &str,
reader: &SegmentReader,
_segment_ordinal: SegmentOrdinal,
data: &mut AggregationsSegmentCtx,
sub_aggs: &Aggregations,
req: &CompositeAggregation,
) -> crate::Result<AggRefNode> {
let mut composite_accessors = Vec::with_capacity(req.sources.len());
for source in &req.sources {
let source_after_key_opt = req.after.get(source.name()).map(|k| &k.0);
let source_accessor =
CompositeSourceAccessors::build_for_source(reader, source, source_after_key_opt)?;
composite_accessors.push(source_accessor);
}
let agg = CompositeAggReqData {
name: agg_name.to_string(),
req: req.clone(),
composite_accessors,
};
let idx = data.push_composite_req_data(agg);
let children = build_children(sub_aggs, reader, _segment_ordinal, data)?;
Ok(AggRefNode {
kind: AggKind::Composite,
idx_in_req_data: idx,
children,
})
}
fn build_children(
aggs: &Aggregations,
reader: &SegmentReader,
@@ -998,35 +1023,6 @@ fn build_terms_or_cardinality_nodes(
Ok(nodes)
}
fn build_composite_node(
agg_name: &str,
reader: &SegmentReader,
segment_ordinal: SegmentOrdinal,
data: &mut AggregationsSegmentCtx,
sub_aggs: &Aggregations,
req: &CompositeAggregation,
) -> crate::Result<AggRefNode> {
let mut composite_accessors = Vec::with_capacity(req.sources.len());
for source in &req.sources {
let source_after_key_opt = req.after.get(source.name()).map(|k| &k.0);
let source_accessor =
CompositeSourceAccessors::build_for_source(reader, source, source_after_key_opt)?;
composite_accessors.push(source_accessor);
}
let agg = CompositeAggReqData {
name: agg_name.to_string(),
req: req.clone(),
composite_accessors,
};
let idx = data.push_composite_req_data(agg);
let children = build_children(sub_aggs, reader, segment_ordinal, data)?;
Ok(AggRefNode {
kind: AggKind::Composite,
idx_in_req_data: idx,
children,
})
}
/// Builds a single BitSet of allowed term ordinals for a string dictionary column according to
/// include/exclude parameters.
fn build_allowed_term_ids_for_str(

View File

@@ -32,15 +32,14 @@ use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize};
use super::bucket::{
DateHistogramAggregationReq, FilterAggregation, HistogramAggregation, RangeAggregation,
TermsAggregation,
CompositeAggregation, DateHistogramAggregationReq, FilterAggregation, HistogramAggregation,
RangeAggregation, TermsAggregation,
};
use super::metric::{
AverageAggregation, CardinalityAggregationReq, CountAggregation, ExtendedStatsAggregation,
MaxAggregation, MinAggregation, PercentilesAggregationReq, StatsAggregation, SumAggregation,
TopHitsAggregationReq,
};
use crate::aggregation::bucket::CompositeAggregation;
/// The top-level aggregation request structure, which contains [`Aggregation`] and their user
/// defined names. It is also used in buckets aggregations to define sub-aggregations.
@@ -135,7 +134,7 @@ pub enum AggregationVariants {
/// Filter documents into a single bucket.
#[serde(rename = "filter")]
Filter(FilterAggregation),
/// Put data into multi level paginated buckets.
/// Multi-dimensional, paginable bucket aggregation.
#[serde(rename = "composite")]
Composite(CompositeAggregation),
@@ -187,7 +186,7 @@ impl AggregationVariants {
AggregationVariants::Composite(composite) => composite
.sources
.iter()
.map(|source_map| source_map.field())
.map(|source| source.field())
.collect(),
AggregationVariants::Average(avg) => vec![avg.field_name()],
AggregationVariants::Count(count) => vec![count.field_name()],

View File

@@ -9,12 +9,12 @@ use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize};
use super::bucket::GetDocCount;
use super::intermediate_agg_result::CompositeIntermediateKey;
use super::metric::{
ExtendedStats, PercentilesMetricResult, SingleMetricResult, Stats, TopHitsMetricResult,
};
use super::{AggregationError, Key};
use crate::aggregation::bucket::AfterKey;
use crate::aggregation::intermediate_agg_result::CompositeIntermediateKey;
use crate::TantivyError;
#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)]
@@ -160,11 +160,9 @@ pub enum BucketResult {
},
/// This is the filter result - a single bucket with sub-aggregations
Filter(FilterBucketResult),
/// This is the composite aggregation result
/// This is the composite result
Composite {
/// The buckets
///
/// See [`CompositeAggregation`](super::bucket::CompositeAggregation)
buckets: Vec<CompositeBucketEntry>,
/// The key to start after when paginating
#[serde(skip_serializing_if = "FxHashMap::is_empty")]
@@ -353,10 +351,6 @@ pub struct FilterBucketResult {
pub sub_aggregations: AggregationResults,
}
/// The JSON mappable key to identify a composite bucket.
///
/// This is similar to `Key`, but composite keys can also be boolean and null.
///
/// Note the type information loss compared to `CompositeIntermediateKey`.
/// Pagination is performed using `AfterKey`, which encodes type information.
#[derive(Clone, Debug, Serialize, Deserialize)]
@@ -398,15 +392,7 @@ impl PartialEq for CompositeKey {
(Self::I64(l), Self::I64(r)) => l == r,
(Self::U64(l), Self::U64(r)) => l == r,
(Self::Null, Self::Null) => true,
(
Self::Bool(_)
| Self::Str(_)
| Self::F64(_)
| Self::I64(_)
| Self::U64(_)
| Self::Null,
_,
) => false,
_ => false,
}
}
}
@@ -415,7 +401,6 @@ impl From<CompositeIntermediateKey> for CompositeKey {
match value {
CompositeIntermediateKey::Str(s) => Self::Str(s),
CompositeIntermediateKey::IpAddr(s) => {
// Prefer to use the IPv4 representation if possible
if let Some(ip) = s.to_ipv4_mapped() {
Self::Str(ip.to_string())
} else {
@@ -426,43 +411,13 @@ impl From<CompositeIntermediateKey> for CompositeKey {
CompositeIntermediateKey::Bool(f) => Self::Bool(f),
CompositeIntermediateKey::U64(f) => Self::U64(f),
CompositeIntermediateKey::I64(f) => Self::I64(f),
CompositeIntermediateKey::DateTime(f) => Self::I64(f / 1_000_000), // Convert ns to ms
CompositeIntermediateKey::DateTime(f) => Self::I64(f / 1_000_000), // ns to ms
CompositeIntermediateKey::Null => Self::Null,
}
}
}
/// This is the default entry for a bucket, which contains a composite key, count, and optionally
/// sub-aggregations.
/// ...
/// "my_composite": {
/// "buckets": [
/// {
/// "key": {
/// "date": 1494201600000,
/// "product": "rocky"
/// },
/// "doc_count": 5
/// },
/// {
/// "key": {
/// "date": 1494201600000,
/// "product": "balboa"
/// },
/// "doc_count": 2
/// },
/// {
/// "key": {
/// "date": 1494201700000,
/// "product": "john"
/// },
/// "doc_count": 3
/// }
/// ]
/// }
/// ...
/// }
/// ```
/// Composite bucket entry with a multi-dimensional key.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct CompositeBucketEntry {
/// The identifier of the bucket.

View File

@@ -1,10 +1,9 @@
use std::fmt::Debug;
use std::net::Ipv6Addr;
use columnar::column_values::{CompactHit, CompactSpaceU64Accessor};
use columnar::column_values::CompactSpaceU64Accessor;
use columnar::{Column, ColumnType, MonotonicallyMappableToU64, StrColumn, TermOrdHit};
use crate::aggregation::accessor_helpers::{get_all_ff_readers, get_numeric_or_date_column_types};
use crate::aggregation::accessor_helpers::get_numeric_or_date_column_types;
use crate::aggregation::bucket::composite::numeric_types::num_proj;
use crate::aggregation::bucket::composite::numeric_types::num_proj::ProjectedNumber;
use crate::aggregation::bucket::composite::ToTypePaginationOrder;
@@ -116,11 +115,14 @@ impl CompositeSourceAccessors {
ColumnType::IpAddr,
// ColumnType::Bytes Unsupported
];
let mut columns_and_types =
get_all_ff_readers(reader, &source.field, Some(&allowed_column_types))?;
let mut columns_and_types = reader
.fast_fields()
.u64_lenient_for_type_all(Some(&allowed_column_types), &source.field)?;
// Sort columns by their pagination order and determine which to skip
columns_and_types.sort_by_key(|(_, col_type)| col_type.column_pagination_order());
columns_and_types.sort_by_key(|(_, col_type): &(Column, ColumnType)| {
col_type.column_pagination_order()
});
if source.order == Order::Desc {
columns_and_types.reverse();
}
@@ -172,11 +174,11 @@ impl CompositeSourceAccessors {
})
}
CompositeAggregationSource::Histogram(source) => {
let column_and_types: Vec<(Column, ColumnType)> = get_all_ff_readers(
reader,
&source.field,
Some(get_numeric_or_date_column_types()),
)?;
let column_and_types: Vec<(Column, ColumnType)> =
reader.fast_fields().u64_lenient_for_type_all(
Some(get_numeric_or_date_column_types()),
&source.field,
)?;
let source_collectors: Vec<CompositeAccessor> = column_and_types
.into_iter()
.map(|(column, column_type)| {
@@ -212,8 +214,9 @@ impl CompositeSourceAccessors {
})
}
CompositeAggregationSource::DateHistogram(source) => {
let column_and_types =
get_all_ff_readers(reader, &source.field, Some(&[ColumnType::DateTime]))?;
let column_and_types = reader
.fast_fields()
.u64_lenient_for_type_all(Some(&[ColumnType::DateTime]), &source.field)?;
let date_histogram_interval =
PrecomputedDateInterval::from_date_histogram_source_intervals(
&source.fixed_interval,
@@ -377,16 +380,6 @@ impl From<TermOrdHit> for PrecomputedAfterKey {
}
}
impl From<CompactHit> for PrecomputedAfterKey {
fn from(hit: CompactHit) -> Self {
match hit {
CompactHit::Exact(ord) => PrecomputedAfterKey::Exact(ord as u64),
CompactHit::Next(ord) => PrecomputedAfterKey::Next(ord as u64),
CompactHit::AfterLast => PrecomputedAfterKey::AfterLast,
}
}
}
impl<T: MonotonicallyMappableToU64> From<ProjectedNumber<T>> for PrecomputedAfterKey {
fn from(num: ProjectedNumber<T>) -> Self {
match num {
@@ -425,18 +418,58 @@ impl PrecomputedAfterKey {
}
fn precompute_ip_addr(column: &Column<u64>, key: &Ipv6Addr) -> crate::Result<Self> {
let compact_space_accessor = column
.values
.clone()
.downcast_arc::<CompactSpaceU64Accessor>()
.map_err(|_| {
TantivyError::AggregationError(crate::aggregation::AggregationError::InternalError(
"type mismatch: could not downcast to CompactSpaceU64Accessor".to_string(),
))
})?;
// For IP addresses we need to find the compact space value.
// We try to convert via the column's min/max range scan.
// Since CompactSpaceU64Accessor::u128_to_compact is not public,
// we search linearly for the exact u64 value by scanning column values.
let ip_u128 = key.to_bits();
let ip_next_compact = compact_space_accessor.u128_to_next_compact(ip_u128);
Ok(ip_next_compact.into())
// Scan for matching value - IP columns are typically small
let num_vals = column.values.num_vals();
let mut found_exact = false;
let mut exact_compact = 0u64;
let mut best_next: Option<u64> = None;
for doc_id in 0..num_vals {
let val = column.values.get_val(doc_id);
// We need the CompactSpaceU64Accessor to convert compact to u128
let compact_accessor = column
.values
.clone()
.downcast_arc::<CompactSpaceU64Accessor>()
.map_err(|_| {
TantivyError::AggregationError(
crate::aggregation::AggregationError::InternalError(
"type mismatch: could not downcast to CompactSpaceU64Accessor"
.to_string(),
),
)
})?;
let val_u128 = compact_accessor.compact_to_u128(val as u32);
if val_u128 == ip_u128 {
found_exact = true;
exact_compact = val;
break;
} else if val_u128 > ip_u128 {
match best_next {
None => best_next = Some(val),
Some(current_best) => {
let current_u128 = compact_accessor.compact_to_u128(current_best as u32);
if val_u128 < current_u128 {
best_next = Some(val);
}
}
}
}
}
if found_exact {
Ok(PrecomputedAfterKey::Exact(exact_compact))
} else if let Some(next) = best_next {
Ok(PrecomputedAfterKey::Next(next))
} else {
Ok(PrecomputedAfterKey::AfterLast)
}
}
fn precompute_term_ord(

View File

@@ -1,4 +1,5 @@
use std::fmt::Debug;
use std::mem;
use std::net::Ipv6Addr;
use columnar::column_values::CompactSpaceU64Accessor;
@@ -20,75 +21,94 @@ use crate::aggregation::bucket::composite::map::{DynArrayHeapMap, MAX_DYN_ARRAY_
use crate::aggregation::bucket::{
CalendarInterval, CompositeAggregationSource, MissingOrder, Order,
};
use crate::aggregation::cached_sub_aggs::{CachedSubAggs, HighCardSubAggCache};
use crate::aggregation::intermediate_agg_result::{
CompositeIntermediateKey, IntermediateAggregationResult, IntermediateAggregationResults,
IntermediateBucketResult, IntermediateCompositeBucketEntry, IntermediateCompositeBucketResult,
};
use crate::aggregation::segment_agg_result::SegmentAggregationCollector;
use crate::aggregation::segment_agg_result::{BucketIdProvider, SegmentAggregationCollector};
use crate::aggregation::BucketId;
use crate::TantivyError;
#[derive(Debug)]
#[derive(Clone, Debug)]
struct CompositeBucketCollector {
count: u32,
bucket_id: BucketId,
}
impl CompositeBucketCollector {
fn new() -> Self {
CompositeBucketCollector { count: 0 }
}
#[inline]
fn collect(&mut self) {
self.count += 1;
}
}
/// The value is represented as a tuple of:
/// - the column index or missing value sentinel
/// - if the value is present, store the accessor index + 1
/// - if the value is missing, store 0 (for missing first) or u8::MAX (for missing last)
/// - the fast field value u64 representation
/// - 0 if the field is missing
/// - regular u64 repr if the ordering is ascending
/// - bitwise NOT of the u64 repr if the ordering is descending
/// Compact sortable representation of a single source value within a composite key.
///
/// The struct encodes both the column identity and the fast field value in a way
/// that preserves the desired sort order via the derived `Ord` implementation
/// (fields are compared top-to-bottom: `sort_key` first, then `encoded_value`).
///
/// ## `sort_key` encoding
/// - `0` — missing value, sorted first
/// - `1..=254` — present value; the original accessor index is `sort_key - 1`
/// - `u8::MAX` (255) — missing value, sorted last
///
/// ## `encoded_value` encoding
/// - `0` when the field is missing
/// - The raw u64 fast-field representation when order is ascending
/// - Bitwise NOT of the raw u64 when order is descending
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Default, Hash)]
struct InternalValueRepr(u8, u64);
struct InternalValueRepr {
/// Column index biased by +1 (so 0 and u8::MAX are reserved for missing sentinels).
sort_key: u8,
/// Fast field value, possibly bit-flipped for descending order.
encoded_value: u64,
}
impl InternalValueRepr {
#[inline]
fn new_term(raw: u64, accessor_idx: u8, order: Order) -> Self {
match order {
Order::Asc => InternalValueRepr(accessor_idx + 1, raw),
Order::Desc => InternalValueRepr(accessor_idx + 1, !raw),
let encoded_value = match order {
Order::Asc => raw,
Order::Desc => !raw,
};
InternalValueRepr {
sort_key: accessor_idx + 1,
encoded_value,
}
}
/// For histogram, the source column does not matter
/// For histogram sources the column index is irrelevant (always 1).
#[inline]
fn new_histogram(raw: u64, order: Order) -> Self {
match order {
Order::Asc => InternalValueRepr(1, raw),
Order::Desc => InternalValueRepr(1, !raw),
let encoded_value = match order {
Order::Asc => raw,
Order::Desc => !raw,
};
InternalValueRepr {
sort_key: 1,
encoded_value,
}
}
#[inline]
fn new_missing(order: Order, missing_order: MissingOrder) -> Self {
let column_idx = match (missing_order, order) {
(MissingOrder::First, _) => 0,
(MissingOrder::Last, _) => u8::MAX,
(MissingOrder::Default, Order::Asc) => 0,
(MissingOrder::Default, Order::Desc) => u8::MAX,
let sort_key = match (missing_order, order) {
(MissingOrder::First, _) | (MissingOrder::Default, Order::Asc) => 0,
(MissingOrder::Last, _) | (MissingOrder::Default, Order::Desc) => u8::MAX,
};
InternalValueRepr(column_idx, 0)
InternalValueRepr {
sort_key,
encoded_value: 0,
}
}
/// Decode back to `(accessor_idx, raw_value)`.
/// Returns `None` when the value represents a missing field.
#[inline]
fn decode(self, order: Order) -> Option<(u8, u64)> {
if self.0 == u8::MAX || self.0 == 0 {
if self.sort_key == 0 || self.sort_key == u8::MAX {
return None;
}
match order {
Order::Asc => Some((self.0 - 1, self.1)),
Order::Desc => Some((self.0 - 1, !self.1)),
}
let raw = match order {
Order::Asc => self.encoded_value,
Order::Desc => !self.encoded_value,
};
Some((self.sort_key - 1, raw))
}
}
@@ -96,8 +116,13 @@ impl InternalValueRepr {
/// does a conversion to the correct datatype.
#[derive(Debug)]
pub struct SegmentCompositeCollector {
buckets: DynArrayHeapMap<InternalValueRepr, CompositeBucketCollector>,
/// One DynArrayHeapMap per parent bucket.
parent_buckets: Vec<DynArrayHeapMap<InternalValueRepr, CompositeBucketCollector>>,
accessor_idx: usize,
sub_agg: Option<CachedSubAggs<HighCardSubAggCache>>,
bucket_id_provider: BucketIdProvider,
/// Number of sources, needed when creating new DynArrayHeapMaps.
num_sources: usize,
}
impl SegmentAggregationCollector for SegmentCompositeCollector {
@@ -105,14 +130,14 @@ impl SegmentAggregationCollector for SegmentCompositeCollector {
&mut self,
agg_data: &AggregationsSegmentCtx,
results: &mut IntermediateAggregationResults,
_parent_bucket_id: BucketId,
parent_bucket_id: BucketId,
) -> crate::Result<()> {
let name = agg_data
.get_composite_req_data(self.accessor_idx)
.name
.clone();
let buckets = self.into_intermediate_bucket_result(agg_data)?;
let buckets = self.into_intermediate_bucket_result(agg_data, parent_bucket_id)?;
results.push(
name,
IntermediateAggregationResult::Bucket(IntermediateBucketResult::Composite { buckets }),
@@ -121,10 +146,9 @@ impl SegmentAggregationCollector for SegmentCompositeCollector {
Ok(())
}
#[inline]
fn collect(
&mut self,
_parent_bucket_id: BucketId,
parent_bucket_id: BucketId,
docs: &[crate::DocId],
agg_data: &mut AggregationsSegmentCtx,
) -> crate::Result<()> {
@@ -135,16 +159,21 @@ impl SegmentAggregationCollector for SegmentCompositeCollector {
let mut sub_level_values = SmallVec::new();
recursive_key_visitor(
*doc,
agg_data,
&composite_agg_data,
0,
&mut sub_level_values,
&mut self.buckets,
&mut self.parent_buckets[parent_bucket_id as usize],
true,
&mut self.sub_agg,
&mut self.bucket_id_provider,
)?;
}
agg_data.put_back_composite_req_data(self.accessor_idx, composite_agg_data);
if let Some(sub_agg) = &mut self.sub_agg {
sub_agg.check_flush_local(agg_data)?;
}
let mem_delta = self.get_memory_consumption() - mem_pre;
if mem_delta > 0 {
agg_data.context.limits.add_memory_consumed(mem_delta)?;
@@ -153,22 +182,33 @@ impl SegmentAggregationCollector for SegmentCompositeCollector {
Ok(())
}
fn prepare_max_bucket(
&mut self,
_max_bucket: BucketId,
_agg_data: &AggregationsSegmentCtx,
) -> crate::Result<()> {
fn flush(&mut self, agg_data: &mut AggregationsSegmentCtx) -> crate::Result<()> {
if let Some(sub_agg) = &mut self.sub_agg {
sub_agg.flush(agg_data)?;
}
Ok(())
}
fn flush(&mut self, _agg_data: &mut AggregationsSegmentCtx) -> crate::Result<()> {
fn prepare_max_bucket(
&mut self,
max_bucket: BucketId,
_agg_data: &AggregationsSegmentCtx,
) -> crate::Result<()> {
let required_len = max_bucket as usize + 1;
while self.parent_buckets.len() < required_len {
let map = DynArrayHeapMap::try_new(self.num_sources)?;
self.parent_buckets.push(map);
}
Ok(())
}
}
impl SegmentCompositeCollector {
fn get_memory_consumption(&self) -> u64 {
self.buckets.memory_consumption()
self.parent_buckets
.iter()
.map(|m| m.memory_consumption())
.sum()
}
pub(crate) fn from_req_and_validate(
@@ -177,34 +217,54 @@ impl SegmentCompositeCollector {
) -> crate::Result<Self> {
validate_req(req_data, node.idx_in_req_data)?;
if !node.children.is_empty() {
let _sub_aggregation = build_segment_agg_collectors(req_data, &node.children)?;
}
let has_sub_aggregations = !node.children.is_empty();
let sub_agg = if has_sub_aggregations {
let sub_agg_collector = build_segment_agg_collectors(req_data, &node.children)?;
Some(CachedSubAggs::new(sub_agg_collector))
} else {
None
};
let composite_req_data = req_data.get_composite_req_data(node.idx_in_req_data);
let num_sources = composite_req_data.req.sources.len();
Ok(SegmentCompositeCollector {
buckets: DynArrayHeapMap::try_new(composite_req_data.req.sources.len())?,
parent_buckets: vec![DynArrayHeapMap::try_new(num_sources)?],
accessor_idx: node.idx_in_req_data,
sub_agg,
bucket_id_provider: BucketIdProvider::default(),
num_sources,
})
}
#[inline]
pub(crate) fn into_intermediate_bucket_result(
fn into_intermediate_bucket_result(
&mut self,
agg_data: &AggregationsSegmentCtx,
parent_bucket_id: BucketId,
) -> crate::Result<IntermediateCompositeBucketResult> {
let empty_map = DynArrayHeapMap::try_new(self.num_sources)?;
let heap_map = mem::replace(
&mut self.parent_buckets[parent_bucket_id as usize],
empty_map,
);
let mut dict: FxHashMap<Vec<CompositeIntermediateKey>, IntermediateCompositeBucketEntry> =
Default::default();
dict.reserve(self.buckets.size());
dict.reserve(heap_map.size());
let composite_data = agg_data.get_composite_req_data(self.accessor_idx);
let buckets = std::mem::replace(
&mut self.buckets,
DynArrayHeapMap::try_new(composite_data.req.sources.len())
.expect("already validated source count"),
);
for (key_internal_repr, agg) in buckets.into_iter() {
for (key_internal_repr, agg) in heap_map.into_iter() {
let key = resolve_key(&key_internal_repr, composite_data)?;
let sub_aggregation_res = IntermediateAggregationResults::default();
let mut sub_aggregation_res = IntermediateAggregationResults::default();
if let Some(sub_agg) = &mut self.sub_agg {
sub_agg
.get_sub_agg_collector()
.add_intermediate_aggregation_result(
agg_data,
&mut sub_aggregation_res,
agg.bucket_id,
)?;
}
dict.insert(
key,
@@ -268,34 +328,47 @@ fn validate_req(req_data: &mut AggregationsSegmentCtx, accessor_idx: usize) -> c
}
fn collect_bucket_with_limit(
agg_data: &mut AggregationsSegmentCtx,
composite_agg_data: &CompositeAggReqData,
doc_id: crate::DocId,
limit_num_buckets: usize,
buckets: &mut DynArrayHeapMap<InternalValueRepr, CompositeBucketCollector>,
key: &[InternalValueRepr],
) -> crate::Result<()> {
if (buckets.size() as u32) < composite_agg_data.req.size {
buckets
.get_or_insert_with(key, CompositeBucketCollector::new)
.collect();
return Ok(());
sub_agg: &mut Option<CachedSubAggs<HighCardSubAggCache>>,
bucket_id_provider: &mut BucketIdProvider,
) {
let mut record_in_bucket = |bucket: &mut CompositeBucketCollector| {
bucket.count += 1;
if let Some(sub_agg) = sub_agg {
sub_agg.push(bucket.bucket_id, doc_id);
}
};
// We still have room for buckets, just insert
if buckets.size() < limit_num_buckets {
let bucket = buckets.get_or_insert_with(key, || CompositeBucketCollector {
count: 0,
bucket_id: bucket_id_provider.next_bucket_id(),
});
record_in_bucket(bucket);
return;
}
if let Some(entry) = buckets.get_mut(key) {
entry.collect();
return Ok(());
// Map is full, but we can still update the bucket if it already exists
if let Some(bucket) = buckets.get_mut(key) {
record_in_bucket(bucket);
return;
}
// Check if the item qualifies to enter the top-k, and evict the highest if it does
if let Some(highest_key) = buckets.peek_highest() {
if key < highest_key {
buckets.evict_highest();
buckets
.get_or_insert_with(key, CompositeBucketCollector::new)
.collect();
let bucket = buckets.get_or_insert_with(key, || CompositeBucketCollector {
count: 0,
bucket_id: bucket_id_provider.next_bucket_id(),
});
record_in_bucket(bucket);
}
}
let _ = agg_data;
Ok(())
}
/// Converts the composite key from its internal column space representation
@@ -305,7 +378,7 @@ fn resolve_key(
agg_data: &CompositeAggReqData,
) -> crate::Result<Vec<CompositeIntermediateKey>> {
internal_key
.into_iter()
.iter()
.enumerate()
.map(|(idx, val)| {
resolve_internal_value_repr(
@@ -390,20 +463,18 @@ fn resolve_term(
let val: u128 = compact_space_accessor.compact_to_u128(val as u32);
let val = Ipv6Addr::from_u128(val);
CompositeIntermediateKey::IpAddr(val)
} else if *column_type == ColumnType::U64 {
CompositeIntermediateKey::U64(val)
} else if *column_type == ColumnType::I64 {
CompositeIntermediateKey::I64(i64::from_u64(val))
} else {
if *column_type == ColumnType::U64 {
CompositeIntermediateKey::U64(val)
} else if *column_type == ColumnType::I64 {
CompositeIntermediateKey::I64(i64::from_u64(val))
} else {
let val = f64::from_u64(val);
let val: NumericalValue = val.into();
let val = f64::from_u64(val);
let val: NumericalValue = val.into();
match val.normalize() {
NumericalValue::U64(val) => CompositeIntermediateKey::U64(val),
NumericalValue::I64(val) => CompositeIntermediateKey::I64(val),
NumericalValue::F64(val) => CompositeIntermediateKey::F64(val),
}
match val.normalize() {
NumericalValue::U64(val) => CompositeIntermediateKey::U64(val),
NumericalValue::I64(val) => CompositeIntermediateKey::I64(val),
NumericalValue::F64(val) => CompositeIntermediateKey::F64(val),
}
};
Ok(key)
@@ -413,21 +484,25 @@ fn resolve_term(
/// and update the buckets.
fn recursive_key_visitor(
doc_id: crate::DocId,
agg_data: &mut AggregationsSegmentCtx,
composite_agg_data: &CompositeAggReqData,
source_idx_for_recursion: usize,
sub_level_values: &mut SmallVec<[InternalValueRepr; MAX_DYN_ARRAY_SIZE]>,
buckets: &mut DynArrayHeapMap<InternalValueRepr, CompositeBucketCollector>,
// whether we need to consider the after_key in the following levels
is_on_after_key: bool,
sub_agg: &mut Option<CachedSubAggs<HighCardSubAggCache>>,
bucket_id_provider: &mut BucketIdProvider,
) -> crate::Result<()> {
if source_idx_for_recursion == composite_agg_data.req.sources.len() {
if !is_on_after_key {
collect_bucket_with_limit(
agg_data,
composite_agg_data,
doc_id,
composite_agg_data.req.size as usize,
buckets,
sub_level_values,
)?;
sub_agg,
bucket_id_provider,
);
}
return Ok(());
}
@@ -467,12 +542,13 @@ fn recursive_key_visitor(
matches_after_key_type && current_level_accessors.after_key.equals(value);
recursive_key_visitor(
doc_id,
agg_data,
composite_agg_data,
source_idx_for_recursion + 1,
sub_level_values,
buckets,
is_on_after_key && still_on_after_key,
sub_agg,
bucket_id_provider,
)?;
sub_level_values.pop();
}
@@ -507,12 +583,13 @@ fn recursive_key_visitor(
let still_on_after_key = current_level_accessors.after_key.equals(bucket_value);
recursive_key_visitor(
doc_id,
agg_data,
composite_agg_data,
source_idx_for_recursion + 1,
sub_level_values,
buckets,
is_on_after_key && still_on_after_key,
sub_agg,
bucket_id_provider,
)?;
sub_level_values.pop();
}
@@ -560,12 +637,13 @@ fn recursive_key_visitor(
let still_on_after_key = current_level_accessors.after_key.equals(bucket_value);
recursive_key_visitor(
doc_id,
agg_data,
composite_agg_data,
source_idx_for_recursion + 1,
sub_level_values,
buckets,
is_on_after_key && still_on_after_key,
sub_agg,
bucket_id_provider,
)?;
sub_level_values.pop();
}
@@ -582,12 +660,13 @@ fn recursive_key_visitor(
));
recursive_key_visitor(
doc_id,
agg_data,
composite_agg_data,
source_idx_for_recursion + 1,
sub_level_values,
buckets,
is_on_after_key && current_level_accessors.is_after_key_explicit_missing,
sub_agg,
bucket_id_provider,
)?;
sub_level_values.pop();
}

View File

@@ -66,10 +66,6 @@ impl<K: Copy + Ord + Clone + 'static, V: 'static, const S: usize> ArrayHeapMap<K
.map(|(k, v)| (SmallVec::from_slice(&k), v)),
)
}
fn values_mut<'a>(&'a mut self) -> Box<dyn Iterator<Item = &'a mut V> + 'a> {
Box::new(self.buckets.values_mut())
}
}
pub(super) const MAX_DYN_ARRAY_SIZE: usize = 16;
@@ -301,28 +297,6 @@ impl<K: Ord + Clone + Copy + 'static, V: 'static> DynArrayHeapMap<K, V> {
DynArrayHeapMapInner::Dim16(map) => map.into_iter(),
}
}
/// Returns an iterator over mutable references to the values in the map.
pub(super) fn values_mut(&mut self) -> impl Iterator<Item = &mut V> {
match &mut self.0 {
DynArrayHeapMapInner::Dim1(map) => map.values_mut(),
DynArrayHeapMapInner::Dim2(map) => map.values_mut(),
DynArrayHeapMapInner::Dim3(map) => map.values_mut(),
DynArrayHeapMapInner::Dim4(map) => map.values_mut(),
DynArrayHeapMapInner::Dim5(map) => map.values_mut(),
DynArrayHeapMapInner::Dim6(map) => map.values_mut(),
DynArrayHeapMapInner::Dim7(map) => map.values_mut(),
DynArrayHeapMapInner::Dim8(map) => map.values_mut(),
DynArrayHeapMapInner::Dim9(map) => map.values_mut(),
DynArrayHeapMapInner::Dim10(map) => map.values_mut(),
DynArrayHeapMapInner::Dim11(map) => map.values_mut(),
DynArrayHeapMapInner::Dim12(map) => map.values_mut(),
DynArrayHeapMapInner::Dim13(map) => map.values_mut(),
DynArrayHeapMapInner::Dim14(map) => map.values_mut(),
DynArrayHeapMapInner::Dim15(map) => map.values_mut(),
DynArrayHeapMapInner::Dim16(map) => map.values_mut(),
}
}
}
#[cfg(test)]
@@ -345,15 +319,6 @@ mod tests {
assert_eq!(map.size(), 1);
assert_eq!(map.peek_highest(), Some(&key1[..]));
// mutable iterator
{
let mut mut_iter = map.values_mut();
let v = mut_iter.next().unwrap();
assert_eq!(*v, "a");
*v = "c";
assert_eq!(mut_iter.next(), None);
}
// into_iter
let mut iter = map.into_iter();
let (k, v) = iter.next().unwrap();

View File

@@ -338,76 +338,89 @@ impl ToTypePaginationOrder for CompositeKey {
}
}
/// A wrapper type for CompositeIntermediateKey that serializes to ES-compatible
/// raw values (strings as strings, numbers as numbers, etc.) and deserializes
/// from both raw ES format and the legacy "<type>:<value>" format.
/// After key is a string that encodes the intermediate composite key as "<type>:<value>"
/// A wrapper type for CompositeIntermediateKey that serializes/deserializes
/// to/from the "<type>:<value>" format.
#[derive(Clone, Debug, PartialEq)]
pub struct AfterKey(pub CompositeIntermediateKey);
impl Serialize for AfterKey {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where S: serde::Serializer {
match &self.0 {
CompositeIntermediateKey::Bool(b) => serializer.serialize_bool(*b),
CompositeIntermediateKey::Str(s) => serializer.serialize_str(s),
CompositeIntermediateKey::I64(i) => serializer.serialize_i64(*i),
CompositeIntermediateKey::U64(u) => serializer.serialize_u64(*u),
CompositeIntermediateKey::F64(f) => serializer.serialize_f64(*f),
CompositeIntermediateKey::IpAddr(ip) => serializer.serialize_str(&ip.to_string()),
CompositeIntermediateKey::DateTime(dt) => serializer.serialize_i64(*dt),
CompositeIntermediateKey::Null => serializer.serialize_none(),
}
let s = match &self.0 {
CompositeIntermediateKey::Bool(b) => format!("bool:{}", b),
CompositeIntermediateKey::Str(s) => format!("str:{}", s),
CompositeIntermediateKey::I64(i) => format!("i64:{}", i),
CompositeIntermediateKey::U64(u) => format!("u64:{}", u),
CompositeIntermediateKey::F64(f) => format!("f64:{}", f),
CompositeIntermediateKey::IpAddr(ip) => format!("ip:{}", ip),
CompositeIntermediateKey::DateTime(dt) => format!("dt:{}", dt),
CompositeIntermediateKey::Null => "null:".to_string(),
};
serializer.serialize_str(&s)
}
}
impl<'de> Deserialize<'de> for AfterKey {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where D: serde::Deserializer<'de> {
use serde::de;
let s = String::deserialize(deserializer)?;
let parts: Vec<&str> = s.splitn(2, ':').collect();
struct AfterKeyVisitor;
impl<'de> de::Visitor<'de> for AfterKeyVisitor {
type Value = AfterKey;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("a string, number, boolean, or null")
}
fn visit_bool<E: de::Error>(self, v: bool) -> Result<AfterKey, E> {
Ok(AfterKey(CompositeIntermediateKey::Bool(v)))
}
fn visit_i64<E: de::Error>(self, v: i64) -> Result<AfterKey, E> {
Ok(AfterKey(CompositeIntermediateKey::I64(v)))
}
fn visit_u64<E: de::Error>(self, v: u64) -> Result<AfterKey, E> {
Ok(AfterKey(CompositeIntermediateKey::U64(v)))
}
fn visit_f64<E: de::Error>(self, v: f64) -> Result<AfterKey, E> {
Ok(AfterKey(CompositeIntermediateKey::F64(v)))
}
fn visit_str<E: de::Error>(self, v: &str) -> Result<AfterKey, E> {
Ok(AfterKey(CompositeIntermediateKey::Str(v.to_string())))
}
fn visit_string<E: de::Error>(self, v: String) -> Result<AfterKey, E> {
Ok(AfterKey(CompositeIntermediateKey::Str(v)))
}
fn visit_none<E: de::Error>(self) -> Result<AfterKey, E> {
Ok(AfterKey(CompositeIntermediateKey::Null))
}
fn visit_unit<E: de::Error>(self) -> Result<AfterKey, E> {
Ok(AfterKey(CompositeIntermediateKey::Null))
}
if parts.len() != 2 {
return Err(serde::de::Error::custom("invalid after key format"));
}
deserializer.deserialize_any(AfterKeyVisitor)
let key = match parts[0] {
"bool" => {
let b = parts[1].parse::<bool>().map_err(|e| {
serde::de::Error::custom(format!("failed to parse bool: {}", e))
})?;
CompositeIntermediateKey::Bool(b)
}
"str" => CompositeIntermediateKey::Str(parts[1].to_string()),
"i64" => {
let i = parts[1]
.parse::<i64>()
.map_err(|e| serde::de::Error::custom(format!("failed to parse i64: {}", e)))?;
CompositeIntermediateKey::I64(i)
}
"u64" => {
let u = parts[1]
.parse::<u64>()
.map_err(|e| serde::de::Error::custom(format!("failed to parse u64: {}", e)))?;
CompositeIntermediateKey::U64(u)
}
"f64" => {
let f = parts[1]
.parse::<f64>()
.map_err(|e| serde::de::Error::custom(format!("failed to parse f64: {}", e)))?;
if f.is_nan() {
return Err(serde::de::Error::custom(
"NaN is not supported in after key",
));
}
CompositeIntermediateKey::F64(f)
}
"ip" => {
let ip = IpAddr::from_str(parts[1]).map_err(|e: AddrParseError| {
serde::de::Error::custom(format!("failed to parse ip: {}", e))
})?;
CompositeIntermediateKey::IpAddr(ip.into_ipv6_addr())
}
"dt" => {
let dt = parts[1].parse::<i64>().map_err(|e| {
serde::de::Error::custom(format!("failed to parse datetime: {}", e))
})?;
CompositeIntermediateKey::DateTime(dt)
}
"null" => CompositeIntermediateKey::Null,
_ => {
return Err(serde::de::Error::custom("invalid after key type"));
}
};
Ok(AfterKey(key))
}
}

View File

@@ -15,8 +15,9 @@ use serde::{Deserialize, Serialize};
use super::agg_req::{Aggregation, AggregationVariants, Aggregations};
use super::agg_result::{AggregationResult, BucketResult, MetricResult, RangeBucketEntry};
use super::bucket::{
cut_off_buckets, get_agg_name_and_property, intermediate_histogram_buckets_to_final_buckets,
GetDocCount, Order, OrderTarget, RangeAggregation, TermsAggregation,
composite_intermediate_key_ordering, cut_off_buckets, get_agg_name_and_property,
intermediate_histogram_buckets_to_final_buckets, CompositeAggregation, GetDocCount,
MissingOrder, Order, OrderTarget, RangeAggregation, TermsAggregation,
};
use super::metric::{
IntermediateAverage, IntermediateCount, IntermediateExtendedStats, IntermediateMax,
@@ -27,10 +28,7 @@ use super::{format_date, AggregationError, Key, SerializedKey};
use crate::aggregation::agg_result::{
AggregationResults, BucketEntries, BucketEntry, CompositeBucketEntry, FilterBucketResult,
};
use crate::aggregation::bucket::{
composite_intermediate_key_ordering, CompositeAggregation, MissingOrder,
TermsAggregationInternal,
};
use crate::aggregation::bucket::TermsAggregationInternal;
use crate::aggregation::metric::CardinalityCollector;
use crate::TantivyError;
@@ -249,11 +247,6 @@ pub(crate) fn empty_from_req(req: &Aggregation) -> IntermediateAggregationResult
is_date_agg: true,
})
}
Composite(_) => {
IntermediateAggregationResult::Bucket(IntermediateBucketResult::Composite {
buckets: Default::default(),
})
}
Average(_) => IntermediateAggregationResult::Metric(IntermediateMetricResult::Average(
IntermediateAverage::default(),
)),
@@ -288,6 +281,11 @@ pub(crate) fn empty_from_req(req: &Aggregation) -> IntermediateAggregationResult
doc_count: 0,
sub_aggregations: IntermediateAggregationResults::default(),
}),
Composite(_) => {
IntermediateAggregationResult::Bucket(IntermediateBucketResult::Composite {
buckets: IntermediateCompositeBucketResult::default(),
})
}
}
}
@@ -581,13 +579,13 @@ impl IntermediateBucketResult {
sub_aggregations: final_sub_aggregations,
}))
}
IntermediateBucketResult::Composite { buckets } => buckets.into_final_result(
req.agg
IntermediateBucketResult::Composite { buckets } => {
let composite_req = req
.agg
.as_composite()
.expect("unexpected aggregation, expected composite aggregation"),
req.sub_aggregation(),
limits,
),
.expect("unexpected aggregation, expected composite aggregation");
buckets.into_final_result(composite_req, req.sub_aggregation(), limits)
}
}
}
@@ -656,13 +654,13 @@ impl IntermediateBucketResult {
}
(
IntermediateBucketResult::Composite {
buckets: buckets_left,
buckets: composite_left,
},
IntermediateBucketResult::Composite {
buckets: buckets_right,
buckets: composite_right,
},
) => {
buckets_left.merge_fruits(buckets_right)?;
composite_left.merge_fruits(composite_right)?;
}
(IntermediateBucketResult::Range(_), _) => {
panic!("try merge on different types")
@@ -922,6 +920,31 @@ pub struct IntermediateTermBucketEntry {
pub sub_aggregation: IntermediateAggregationResults,
}
impl MergeFruits for IntermediateTermBucketEntry {
fn merge_fruits(&mut self, other: IntermediateTermBucketEntry) -> crate::Result<()> {
self.doc_count += other.doc_count;
self.sub_aggregation.merge_fruits(other.sub_aggregation)?;
Ok(())
}
}
impl MergeFruits for IntermediateRangeBucketEntry {
fn merge_fruits(&mut self, other: IntermediateRangeBucketEntry) -> crate::Result<()> {
self.doc_count += other.doc_count;
self.sub_aggregation_res
.merge_fruits(other.sub_aggregation_res)?;
Ok(())
}
}
impl MergeFruits for IntermediateHistogramBucketEntry {
fn merge_fruits(&mut self, other: IntermediateHistogramBucketEntry) -> crate::Result<()> {
self.doc_count += other.doc_count;
self.sub_aggregation.merge_fruits(other.sub_aggregation)?;
Ok(())
}
}
/// Entry for the composite bucket.
pub type IntermediateCompositeBucketEntry = IntermediateTermBucketEntry;
@@ -967,41 +990,11 @@ impl std::hash::Hash for CompositeIntermediateKey {
/// Composite aggregation page.
#[derive(Default, Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct IntermediateCompositeBucketResult {
#[serde(
serialize_with = "serialize_composite_entries",
deserialize_with = "deserialize_composite_entries"
)]
pub(crate) entries: FxHashMap<Vec<CompositeIntermediateKey>, IntermediateCompositeBucketEntry>,
pub(crate) target_size: u32,
pub(crate) orders: Vec<(Order, MissingOrder)>,
}
fn serialize_composite_entries<S>(
entries: &FxHashMap<Vec<CompositeIntermediateKey>, IntermediateCompositeBucketEntry>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeSeq;
let mut seq = serializer.serialize_seq(Some(entries.len()))?;
for (k, v) in entries {
seq.serialize_element(&(k, v))?;
}
seq.end()
}
fn deserialize_composite_entries<'de, D>(
deserializer: D,
) -> Result<FxHashMap<Vec<CompositeIntermediateKey>, IntermediateCompositeBucketEntry>, D::Error>
where
D: serde::Deserializer<'de>,
{
let vec: Vec<(Vec<CompositeIntermediateKey>, IntermediateCompositeBucketEntry)> =
serde::Deserialize::deserialize(deserializer)?;
Ok(vec.into_iter().collect())
}
impl IntermediateCompositeBucketResult {
pub(crate) fn into_final_result(
self,
@@ -1057,16 +1050,12 @@ impl IntermediateCompositeBucketResult {
fn merge_fruits(&mut self, other: IntermediateCompositeBucketResult) -> crate::Result<()> {
merge_maps(&mut self.entries, other.entries)?;
if self.entries.len() as u32 > 2 * self.target_size {
// 2x factor used to avoid trimming too often (expensive operation)
// an optimal threshold could probably be figured out
self.trim()?;
}
Ok(())
}
/// Trim the composite buckets to the target size, according to the ordering.
///
/// Returns an error if the ordering comparison fails.
pub(crate) fn trim(&mut self) -> crate::Result<()> {
if self.entries.len() as u32 <= self.target_size {
return Ok(());
@@ -1096,20 +1085,19 @@ fn trim_composite_buckets(
let mut entries: Vec<_> = entries.into_iter().collect();
let mut sort_error: Option<TantivyError> = None;
entries.sort_by(|(left_key, _), (right_key, _)| {
// Only attempt sorting if we haven't encountered an error yet
if sort_error.is_some() {
return Ordering::Equal; // Return a default, we'll handle the error after sorting
return Ordering::Equal;
}
for i in 0..orders.len() {
for idx in 0..orders.len() {
match composite_intermediate_key_ordering(
&left_key[i],
&right_key[i],
orders[i].0,
orders[i].1,
&left_key[idx],
&right_key[idx],
orders[idx].0,
orders[idx].1,
) {
Ok(ordering) if ordering != Ordering::Equal => return ordering,
Ok(_) => continue, // Equal, try next key
Ok(_) => continue,
Err(err) => {
sort_error = Some(err);
break;
@@ -1119,7 +1107,6 @@ fn trim_composite_buckets(
Ordering::Equal
});
// If we encountered an error during sorting, return it now
if let Some(err) = sort_error {
return Err(err);
}
@@ -1128,31 +1115,6 @@ fn trim_composite_buckets(
Ok(entries)
}
impl MergeFruits for IntermediateTermBucketEntry {
fn merge_fruits(&mut self, other: IntermediateTermBucketEntry) -> crate::Result<()> {
self.doc_count += other.doc_count;
self.sub_aggregation.merge_fruits(other.sub_aggregation)?;
Ok(())
}
}
impl MergeFruits for IntermediateRangeBucketEntry {
fn merge_fruits(&mut self, other: IntermediateRangeBucketEntry) -> crate::Result<()> {
self.doc_count += other.doc_count;
self.sub_aggregation_res
.merge_fruits(other.sub_aggregation_res)?;
Ok(())
}
}
impl MergeFruits for IntermediateHistogramBucketEntry {
fn merge_fruits(&mut self, other: IntermediateHistogramBucketEntry) -> crate::Result<()> {
self.doc_count += other.doc_count;
self.sub_aggregation.merge_fruits(other.sub_aggregation)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;

View File

@@ -331,7 +331,7 @@ mod tests {
use crate::aggregation::AggregationCollector;
use crate::query::AllQuery;
use crate::schema::{Schema, FAST};
use crate::Index;
use crate::{assert_nearly_equals, Index};
#[test]
fn test_aggregation_percentiles_empty_index() -> crate::Result<()> {
@@ -614,13 +614,17 @@ mod tests {
let res = exec_request_with_query(agg_req, &index, None)?;
assert_eq!(res["range_with_stats"]["buckets"][0]["doc_count"], 3);
assert_eq!(
res["range_with_stats"]["buckets"][0]["percentiles"]["values"]["1.0"],
5.002829575110705
assert_nearly_equals!(
res["range_with_stats"]["buckets"][0]["percentiles"]["values"]["1.0"]
.as_f64()
.unwrap(),
5.0028295751107414
);
assert_eq!(
res["range_with_stats"]["buckets"][0]["percentiles"]["values"]["99.0"],
10.07469668951133
assert_nearly_equals!(
res["range_with_stats"]["buckets"][0]["percentiles"]["values"]["99.0"]
.as_f64()
.unwrap(),
10.07469668951144
);
Ok(())
@@ -665,8 +669,14 @@ mod tests {
let res = exec_request_with_query(agg_req, &index, None)?;
assert_eq!(res["percentiles"]["values"]["1.0"], 5.002829575110705);
assert_eq!(res["percentiles"]["values"]["99.0"], 10.07469668951133);
assert_nearly_equals!(
res["percentiles"]["values"]["1.0"].as_f64().unwrap(),
5.0028295751107414
);
assert_nearly_equals!(
res["percentiles"]["values"]["99.0"].as_f64().unwrap(),
10.07469668951144
);
Ok(())
}

View File

@@ -21,7 +21,7 @@ use std::path::PathBuf;
pub use common::file_slice::{FileHandle, FileSlice};
pub use common::{AntiCallToken, OwnedBytes, TerminatingWrite};
pub(crate) use self::composite_file::{CompositeFile, CompositeWrite};
pub use self::composite_file::{CompositeFile, CompositeWrite};
pub use self::directory::{Directory, DirectoryClone, DirectoryLock};
pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
pub use self::ram_directory::RamDirectory;
@@ -52,7 +52,7 @@ pub use self::mmap_directory::MmapDirectory;
///
/// `WritePtr` are required to implement both Write
/// and Seek.
pub type WritePtr = BufWriter<Box<dyn TerminatingWrite>>;
pub type WritePtr = BufWriter<Box<dyn TerminatingWrite + Send + Sync>>;
#[cfg(test)]
mod tests;

View File

@@ -94,7 +94,7 @@ impl MergePolicy for LogMergePolicy {
fn compute_merge_candidates(&self, segments: &[SegmentMeta]) -> Vec<MergeCandidate> {
let size_sorted_segments = segments
.iter()
.filter(|seg| seg.num_docs() <= (self.max_docs_before_merge as u32))
.filter(|seg| (seg.num_docs() as usize) <= self.max_docs_before_merge)
.sorted_by_key(|seg| std::cmp::Reverse(seg.max_doc()))
.collect::<Vec<&SegmentMeta>>();
@@ -372,4 +372,21 @@ mod tests {
assert_eq!(merge_candidates[0].0.len(), 1);
assert_eq!(merge_candidates[0].0[0], test_input[1].id());
}
#[test]
fn test_max_docs_before_merge_large_value() {
// Regression test: (max_docs_before_merge as u32) truncates values > u32::MAX.
// Casting num_docs() to usize instead avoids the truncation.
let mut policy = LogMergePolicy::default();
policy.set_min_num_segments(2);
policy.set_max_docs_before_merge(5_000_000_000usize);
let test_input = vec![
create_random_segment_meta(100_000),
create_random_segment_meta(100_000),
];
let result = policy.compute_merge_candidates(&test_input);
// Both segments should be eligible (100_000 < 5_000_000_000)
assert_eq!(result.len(), 1);
assert_eq!(result[0].0.len(), 2);
}
}

View File

@@ -403,7 +403,8 @@ impl SegmentUpdater {
// from the different drives.
//
// Segment 1 from disk 1, Segment 1 from disk 2, etc.
committed_segment_metas.sort_by_key(|segment_meta| -(segment_meta.max_doc() as i32));
committed_segment_metas
.sort_by_key(|segment_meta| std::cmp::Reverse(segment_meta.max_doc()));
let index_meta = IndexMeta {
index_settings: index.settings().clone(),
segments: committed_segment_metas,
@@ -705,6 +706,7 @@ mod tests {
use crate::collector::TopDocs;
use crate::directory::RamDirectory;
use crate::fastfield::AliveBitSet;
use crate::index::{SegmentId, SegmentMetaInventory};
use crate::indexer::merge_policy::tests::MergeWheneverPossible;
use crate::indexer::merger::IndexMerger;
use crate::indexer::segment_updater::merge_filtered_segments;
@@ -712,6 +714,22 @@ mod tests {
use crate::schema::*;
use crate::{Directory, DocAddress, Index, Segment};
#[test]
fn test_segment_sort_large_max_doc() {
// Regression test: -(max_doc as i32) overflows for max_doc >= 2^31.
// Using std::cmp::Reverse avoids this.
let inventory = SegmentMetaInventory::default();
let mut metas = vec![
inventory.new_segment_meta(SegmentId::generate_random(), 100),
inventory.new_segment_meta(SegmentId::generate_random(), (1u32 << 31) - 1),
inventory.new_segment_meta(SegmentId::generate_random(), 50_000),
];
metas.sort_by_key(|m| std::cmp::Reverse(m.max_doc()));
assert_eq!(metas[0].max_doc(), (1u32 << 31) - 1);
assert_eq!(metas[1].max_doc(), 50_000);
assert_eq!(metas[2].max_doc(), 100);
}
#[test]
fn test_delete_during_merge() -> crate::Result<()> {
let mut schema_builder = Schema::builder();

View File

@@ -169,8 +169,10 @@ mod macros;
mod future_result;
// Re-exports
pub use columnar;
pub use common::{ByteCount, DateTime};
pub use {columnar, query_grammar, time};
pub use query_grammar;
pub use time;
pub use crate::error::TantivyError;
pub use crate::future_result::FutureResult;

View File

@@ -14,7 +14,8 @@ mod postings;
mod postings_writer;
mod recorder;
mod segment_postings;
mod serializer;
/// Serializer module for the inverted index
pub mod serializer;
mod skip;
mod term_info;

View File

@@ -11,7 +11,7 @@ use crate::positions::PositionSerializer;
use crate::postings::compression::{BlockEncoder, VIntEncoder, COMPRESSION_BLOCK_SIZE};
use crate::postings::skip::SkipSerializer;
use crate::query::Bm25Weight;
use crate::schema::{Field, FieldEntry, FieldType, IndexRecordOption, Schema};
use crate::schema::{Field, FieldEntry, IndexRecordOption, Schema};
use crate::termdict::TermDictionaryBuilder;
use crate::{DocId, Score};
@@ -80,9 +80,12 @@ impl InvertedIndexSerializer {
let term_dictionary_write = self.terms_write.for_field(field);
let postings_write = self.postings_write.for_field(field);
let positions_write = self.positions_write.for_field(field);
let field_type: FieldType = (*field_entry.field_type()).clone();
let index_record_option = field_entry
.field_type()
.index_record_option()
.unwrap_or(IndexRecordOption::Basic);
FieldSerializer::create(
&field_type,
index_record_option,
total_num_tokens,
term_dictionary_write,
postings_write,
@@ -102,29 +105,27 @@ impl InvertedIndexSerializer {
/// The field serializer is in charge of
/// the serialization of a specific field.
pub struct FieldSerializer<'a> {
term_dictionary_builder: TermDictionaryBuilder<&'a mut CountingWriter<WritePtr>>,
pub struct FieldSerializer<'a, W: Write = WritePtr> {
term_dictionary_builder: TermDictionaryBuilder<&'a mut CountingWriter<W>>,
postings_serializer: PostingsSerializer,
positions_serializer_opt: Option<PositionSerializer<&'a mut CountingWriter<WritePtr>>>,
positions_serializer_opt: Option<PositionSerializer<&'a mut CountingWriter<W>>>,
current_term_info: TermInfo,
term_open: bool,
postings_write: &'a mut CountingWriter<WritePtr>,
postings_write: &'a mut CountingWriter<W>,
postings_start_offset: u64,
}
impl<'a> FieldSerializer<'a> {
fn create(
field_type: &FieldType,
impl<'a, W: Write> FieldSerializer<'a, W> {
/// Creates a new `FieldSerializer` for the given field type.
pub fn create(
index_record_option: IndexRecordOption,
total_num_tokens: u64,
term_dictionary_write: &'a mut CountingWriter<WritePtr>,
postings_write: &'a mut CountingWriter<WritePtr>,
positions_write: &'a mut CountingWriter<WritePtr>,
term_dictionary_write: &'a mut CountingWriter<W>,
postings_write: &'a mut CountingWriter<W>,
positions_write: &'a mut CountingWriter<W>,
fieldnorm_reader: Option<FieldNormReader>,
) -> io::Result<FieldSerializer<'a>> {
) -> io::Result<FieldSerializer<'a, W>> {
total_num_tokens.serialize(postings_write)?;
let index_record_option = field_type
.index_record_option()
.unwrap_or(IndexRecordOption::Basic);
let term_dictionary_builder = TermDictionaryBuilder::create(term_dictionary_write)?;
let average_fieldnorm = fieldnorm_reader
.as_ref()
@@ -192,6 +193,11 @@ impl<'a> FieldSerializer<'a> {
Ok(())
}
/// Starts the postings for a new term without recording term frequencies.
pub fn new_term_without_freq(&mut self, term: &[u8]) -> io::Result<()> {
self.new_term(term, 0, false)
}
/// Serialize the information that a document contains for the current term:
/// its term frequency, and the position deltas.
///
@@ -297,6 +303,7 @@ impl Block {
}
}
/// Serializer for postings lists.
pub struct PostingsSerializer {
last_doc_id_encoded: u32,
@@ -316,6 +323,9 @@ pub struct PostingsSerializer {
}
impl PostingsSerializer {
/// Creates a new `PostingsSerializer`.
/// * avg_fieldnorm - average field norm for the field being serialized.
/// * mode - indexing options for the field being serialized.
pub fn new(
avg_fieldnorm: Score,
mode: IndexRecordOption,
@@ -338,6 +348,8 @@ impl PostingsSerializer {
}
}
/// Starts the serialization for a new term.
/// * term_doc_freq - the number of documents containing the term.
pub fn new_term(&mut self, term_doc_freq: u32, record_term_freq: bool) {
self.bm25_weight = None;
@@ -377,6 +389,7 @@ impl PostingsSerializer {
self.postings_write.extend(block_encoded);
}
if self.term_has_freq {
// encode the term frequencies
let (num_bits, block_encoded): (u8, &[u8]) = self
.block_encoder
.compress_block_unsorted(self.block.term_freqs(), true);
@@ -417,6 +430,9 @@ impl PostingsSerializer {
self.block.clear();
}
/// Register that the given document contains the current term.
/// * doc_id - the document id.
/// * term_freq - the term frequency within the document.
pub fn write_doc(&mut self, doc_id: DocId, term_freq: u32) {
self.block.append_doc(doc_id, term_freq);
if self.block.is_full() {
@@ -424,6 +440,7 @@ impl PostingsSerializer {
}
}
/// Finish the serialization for this term.
pub fn close_term(
&mut self,
doc_freq: u32,

View File

@@ -14,7 +14,11 @@ use crate::{DocId, Score, TERMINATED};
// (requiring a 6th bit), but the biggest doc_id we might need to encode is TERMINATED-1, which can
// be represented on 31b without delta encoding.
/// Packs a block bitwidth and a "delta-1" flag into a single byte.
///
/// The bitwidth occupies the low 5 bits (it must be < 32, since doc ids fit
/// on 31 bits without delta encoding) and the flag is stored in bit 6.
///
/// # Panics
/// Panics if `bitwidth >= 32`, as it would collide with the flag bit.
fn encode_bitwidth(bitwidth: u8, delta_1: bool) -> u8 {
    // Single guard: the diagnostic message makes overflow bugs easy to spot.
    assert!(
        bitwidth < 32,
        "bitwidth needs to be less than 32, but got {bitwidth}"
    );
    bitwidth | ((delta_1 as u8) << 6)
}

View File

@@ -302,8 +302,9 @@ where
|| self.previous_key[keep_len] < key[keep_len];
assert!(
increasing_keys,
"Keys should be increasing. ({:?} > {key:?})",
self.previous_key
"Keys should be increasing. ({:?} > {:?})",
String::from_utf8_lossy(&self.previous_key),
String::from_utf8_lossy(key),
);
self.previous_key.resize(key.len(), 0u8);
self.previous_key[keep_len..].copy_from_slice(&key[keep_len..]);