Compare commits

..

1 Commits

Author SHA1 Message Date
trinity Pointard
c73d4a7166 reorder parsing of aggregations results 2025-02-26 18:06:09 +01:00
163 changed files with 1689 additions and 4141 deletions

View File

@@ -1,42 +1,12 @@
Tantivy 0.25
Tantivy 0.23 - Unreleased
================================
## Bugfixes
- fix union performance regression in tantivy 0.24 [#2663](https://github.com/quickwit-oss/tantivy/pull/2663)(@PSeitz)
- make zstd optional in sstable [#2633](https://github.com/quickwit-oss/tantivy/pull/2633)(@Parth)
- Fix TopDocs::order_by_string_fast_field for asc order [#2672](https://github.com/quickwit-oss/tantivy/pull/2672)(@stuhood @PSeitz)
## Features/Improvements
- add docs/example and Vec<u32> values to sstable [#2660](https://github.com/quickwit-oss/tantivy/pull/2660)(@PSeitz)
- Add string fast field support to `TopDocs`. [#2642](https://github.com/quickwit-oss/tantivy/pull/2642)(@stuhood)
- update edition to 2024 [#2620](https://github.com/quickwit-oss/tantivy/pull/2620)(@PSeitz)
- Allow optional spaces between the field name and the value in the query parser [#2678](https://github.com/quickwit-oss/tantivy/pull/2678)(@Darkheir)
- Support mixed field types in query parser [#2676](https://github.com/quickwit-oss/tantivy/pull/2676)(@trinity-1686a)
- Add per-field size details [#2679](https://github.com/quickwit-oss/tantivy/pull/2679)(@fulmicoton)
Tantivy 0.24.2
================================
- Fix TopNComputer for reverse order. [#2672](https://github.com/quickwit-oss/tantivy/pull/2672)(@stuhood @PSeitz)
Affected queries are [order_by_fast_field](https://docs.rs/tantivy/latest/tantivy/collector/struct.TopDocs.html#method.order_by_fast_field) and
[order_by_u64_field](https://docs.rs/tantivy/latest/tantivy/collector/struct.TopDocs.html#method.order_by_u64_field)
for `Order::Asc`
Tantivy 0.24.1
================================
- Fix: bump required rust version to 1.81
Tantivy 0.24
================================
Tantivy 0.24 will be backwards compatible with indices created with v0.22 and v0.21. The new minimum rust version will be 1.75. Tantivy 0.23 will be skipped.
Tantivy 0.23 will be backwards compatible with indices created with v0.22 and v0.21. The new minimum rust version will be 1.75.
#### Bugfixes
- fix potential endless loop in merge [#2457](https://github.com/quickwit-oss/tantivy/pull/2457)(@PSeitz)
- fix bug that causes out-of-order sstable key. [#2445](https://github.com/quickwit-oss/tantivy/pull/2445)(@fulmicoton)
- fix ReferenceValue API flaw [#2372](https://github.com/quickwit-oss/tantivy/pull/2372)(@PSeitz)
- fix `OwnedBytes` debug panic [#2512](https://github.com/quickwit-oss/tantivy/pull/2512)(@b41sh)
- catch panics during merges [#2582](https://github.com/quickwit-oss/tantivy/pull/2582)(@rdettai)
- switch from u32 to usize in bitpacker. This enables multivalued columns larger than 4GB, which crashed during merge before. [#2581](https://github.com/quickwit-oss/tantivy/pull/2581) [#2586](https://github.com/quickwit-oss/tantivy/pull/2586)(@fulmicoton-dd @PSeitz)
#### Breaking API Changes
- remove index sorting [#2434](https://github.com/quickwit-oss/tantivy/pull/2434)(@PSeitz)
@@ -54,7 +24,6 @@ Tantivy 0.24 will be backwards compatible with indices created with v0.22 and v0
- reduce top hits memory consumption [#2426](https://github.com/quickwit-oss/tantivy/pull/2426)(@PSeitz)
- check unsupported parameters top_hits [#2351](https://github.com/quickwit-oss/tantivy/pull/2351)(@PSeitz)
- Change AggregationLimits to AggregationLimitsGuard [#2495](https://github.com/quickwit-oss/tantivy/pull/2495)(@PSeitz)
- add support for counting non integer in aggregation [#2547](https://github.com/quickwit-oss/tantivy/pull/2547)(@trinity-1686a)
- **Range Queries**
- Support fast field range queries on json fields [#2456](https://github.com/quickwit-oss/tantivy/pull/2456)(@PSeitz)
- Add support for str fast field range query [#2460](https://github.com/quickwit-oss/tantivy/pull/2460) [#2452](https://github.com/quickwit-oss/tantivy/pull/2452) [#2453](https://github.com/quickwit-oss/tantivy/pull/2453)(@PSeitz)
@@ -65,8 +34,7 @@ Tantivy 0.24 will be backwards compatible with indices created with v0.22 and v0
- add columnar format compatibility tests [#2433](https://github.com/quickwit-oss/tantivy/pull/2433)(@PSeitz)
- Improved snippet ranges algorithm [#2474](https://github.com/quickwit-oss/tantivy/pull/2474)(@gezihuzi)
- make find_field_with_default return json fields without path [#2476](https://github.com/quickwit-oss/tantivy/pull/2476)(@trinity-1686a)
- Make `BooleanQuery` support `minimum_number_should_match` [#2405](https://github.com/quickwit-oss/tantivy/pull/2405)(@LebranceBW)
- Make `NUM_MERGE_THREADS` configurable [#2535](https://github.com/quickwit-oss/tantivy/pull/2535)(@Barre)
- feat(query): Make `BooleanQuery` support `minimum_number_should_match` [#2405](https://github.com/quickwit-oss/tantivy/pull/2405)(@LebranceBW)
- **RegexPhraseQuery**
`RegexPhraseQuery` supports phrase queries with regex. E.g. query "b.* b.* wolf" matches "big bad wolf". Slop is supported as well: "b.* wolf"~2 matches "big bad wolf" [#2516](https://github.com/quickwit-oss/tantivy/pull/2516)(@PSeitz)
@@ -92,9 +60,7 @@ This will slightly increase space and access time. [#2439](https://github.com/qu
- fix de-escaping too much in query parser [#2427](https://github.com/quickwit-oss/tantivy/pull/2427)(@trinity-1686a)
- improve query parser [#2416](https://github.com/quickwit-oss/tantivy/pull/2416)(@trinity-1686a)
- Support field grouping `title:(return AND "pink panther")` [#2333](https://github.com/quickwit-oss/tantivy/pull/2333)(@trinity-1686a)
- allow term starting with wildcard [#2568](https://github.com/quickwit-oss/tantivy/pull/2568)(@trinity-1686a)
- Exist queries match subpath fields [#2558](https://github.com/quickwit-oss/tantivy/pull/2558)(@rdettai)
- add access benchmark for columnar [#2432](https://github.com/quickwit-oss/tantivy/pull/2432)(@PSeitz)
- extend indexwriter proptests [#2342](https://github.com/quickwit-oss/tantivy/pull/2342)(@PSeitz)
- add bench & test for columnar merging [#2428](https://github.com/quickwit-oss/tantivy/pull/2428)(@PSeitz)
@@ -108,14 +74,6 @@ This will slightly increase space and access time. [#2439](https://github.com/qu
- Fix trait bound of StoreReader::iter [#2360](https://github.com/quickwit-oss/tantivy/pull/2360)(@adamreichold)
- remove read_postings_no_deletes [#2526](https://github.com/quickwit-oss/tantivy/pull/2526)(@PSeitz)
Tantivy 0.22.1
================================
- Fix TopNComputer for reverse order. [#2672](https://github.com/quickwit-oss/tantivy/pull/2672)(@stuhood @PSeitz)
Affected queries are [order_by_fast_field](https://docs.rs/tantivy/latest/tantivy/collector/struct.TopDocs.html#method.order_by_fast_field) and
[order_by_u64_field](https://docs.rs/tantivy/latest/tantivy/collector/struct.TopDocs.html#method.order_by_u64_field)
for `Order::Asc`
Tantivy 0.22
================================

View File

@@ -1,6 +1,6 @@
[package]
name = "tantivy"
version = "0.25.0"
version = "0.23.0"
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
license = "MIT"
categories = ["database-implementations", "data-structures"]
@@ -11,7 +11,7 @@ repository = "https://github.com/quickwit-oss/tantivy"
readme = "README.md"
keywords = ["search", "information", "retrieval"]
edition = "2021"
rust-version = "1.85"
rust-version = "1.75"
exclude = ["benches/*.json", "benches/*.txt"]
[dependencies]
@@ -31,9 +31,9 @@ lz4_flex = { version = "0.11", default-features = false, optional = true }
zstd = { version = "0.13", optional = true, default-features = false }
tempfile = { version = "3.12.0", optional = true }
log = "0.4.16"
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"
fs4 = { version = "0.13.1", optional = true }
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.79"
fs4 = { version = "0.8.0", optional = true }
levenshtein_automata = "0.2.1"
uuid = { version = "1.0.0", features = ["v4", "serde"] }
crossbeam-channel = "0.5.4"
@@ -57,13 +57,13 @@ measure_time = "0.9.0"
arc-swap = "1.5.0"
bon = "3.3.1"
columnar = { version = "0.6", path = "./columnar", package = "tantivy-columnar" }
sstable = { version = "0.6", path = "./sstable", package = "tantivy-sstable", optional = true }
stacker = { version = "0.6", path = "./stacker", package = "tantivy-stacker" }
query-grammar = { version = "0.25.0", path = "./query-grammar", package = "tantivy-query-grammar" }
tantivy-bitpacker = { version = "0.9", path = "./bitpacker" }
common = { version = "0.10", path = "./common/", package = "tantivy-common" }
tokenizer-api = { version = "0.6", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
columnar = { version = "0.3", path = "./columnar", package = "tantivy-columnar" }
sstable = { version = "0.3", path = "./sstable", package = "tantivy-sstable", optional = true }
stacker = { version = "0.3", path = "./stacker", package = "tantivy-stacker" }
query-grammar = { version = "0.22.0", path = "./query-grammar", package = "tantivy-query-grammar" }
tantivy-bitpacker = { version = "0.6", path = "./bitpacker" }
common = { version = "0.7", path = "./common/", package = "tantivy-common" }
tokenizer-api = { version = "0.3", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
sketches-ddsketch = { version = "0.3.0", features = ["use_serde"] }
hyperloglogplus = { version = "0.4.1", features = ["const-loop"] }
futures-util = { version = "0.3.28", optional = true }
@@ -112,16 +112,13 @@ debug-assertions = true
overflow-checks = true
[features]
default = ["mmap", "stopwords", "lz4-compression", "columnar-zstd-compression"]
default = ["mmap", "stopwords", "lz4-compression"]
mmap = ["fs4", "tempfile", "memmap2"]
stopwords = []
lz4-compression = ["lz4_flex"]
zstd-compression = ["zstd"]
# enable zstd-compression in columnar (and sstable)
columnar-zstd-compression = ["columnar/zstd-compression"]
failpoints = ["fail", "fail/failpoints"]
unstable = [] # useful for benches.
@@ -167,12 +164,3 @@ harness = false
[[bench]]
name = "agg_bench"
harness = false
[[bench]]
name = "exists_json"
harness = false
[[bench]]
name = "and_or_queries"
harness = false

View File

@@ -1,4 +1,4 @@
# Releasing a new Tantivy Version
# Release a new Tantivy Version
## Steps
@@ -10,29 +10,12 @@
6. Set git tag with new version
[`cargo-release`](https://github.com/crate-ci/cargo-release) will help us with steps 1-5:
In conjucation with `cargo-release` Steps 1-4 (I'm not sure if the change detection works):
Set new packages to version 0.0.0
Replace prev-tag-name
```bash
cargo release --workspace --no-publish -v --prev-tag-name 0.24 --push-remote origin minor --no-tag
cargo release --workspace --no-publish -v --prev-tag-name 0.19 --push-remote origin minor --no-tag --execute
```
`no-tag` or it will create tags for all the subpackages
cargo release will _not_ ignore unchanged packages, but it will print warnings for them.
e.g. "warning: updating ownedbytes to 0.10.0 despite no changes made since tag 0.24"
We need to manually ignore these unchanged packages
```bash
cargo release --workspace --no-publish -v --prev-tag-name 0.24 --push-remote origin minor --no-tag --exclude tokenizer-api
```
Add `--execute` to actually publish the packages, otherwise it will only print the commands that would be run.
### Tag Version
```bash
git tag 0.25.0
git push upstream tag 0.25.0
```
no-tag or it will create tags for all the subpackages

View File

@@ -1,224 +0,0 @@
// Benchmarks boolean conjunction queries using binggan.
//
// Whats measured:
// - Or and And queries with varying selectivity (only `Term` queries for now on leafs)
// - Nested AND/OR combinations (on multiple fields)
// - No-scoring path using the Count collector (focus on iterator/skip performance)
// - Top-K retrieval (k=10) using the TopDocs collector
//
// Corpus model:
// - Synthetic docs; each token a/b/c is independently included per doc
// - If none of a/b/c are included, emit a neutral filler token to keep doc length similar
//
// Notes:
// - After optimization, when scoring is disabled Tantivy reads doc-only postings
// (IndexRecordOption::Basic), avoiding frequency decoding overhead.
// - This bench isolates boolean iteration speed and intersection/union cost.
// - Use `cargo bench --bench boolean_conjunction` to run.
use binggan::{black_box, BenchRunner};
use rand::prelude::*;
use rand::rngs::StdRng;
use rand::SeedableRng;
use tantivy::collector::{Count, TopDocs};
use tantivy::query::QueryParser;
use tantivy::schema::{Schema, TEXT};
use tantivy::{doc, Index, ReloadPolicy, Searcher};
#[derive(Clone)]
struct BenchIndex {
#[allow(dead_code)]
index: Index,
searcher: Searcher,
query_parser: QueryParser,
}
impl BenchIndex {
#[inline(always)]
fn count_query(&self, query_str: &str) -> usize {
let query = self.query_parser.parse_query(query_str).unwrap();
self.searcher.search(&query, &Count).unwrap()
}
#[inline(always)]
fn topk_len(&self, query_str: &str, k: usize) -> usize {
let query = self.query_parser.parse_query(query_str).unwrap();
self.searcher
.search(&query, &TopDocs::with_limit(k))
.unwrap()
.len()
}
}
/// Build a single index containing both fields (title, body) and
/// return two BenchIndex views:
/// - single_field: QueryParser defaults to only "body"
/// - multi_field: QueryParser defaults to ["title", "body"]
fn build_shared_indices(num_docs: usize, p_a: f32, p_b: f32, p_c: f32) -> (BenchIndex, BenchIndex) {
// Unified schema (two text fields)
let mut schema_builder = Schema::builder();
let f_title = schema_builder.add_text_field("title", TEXT);
let f_body = schema_builder.add_text_field("body", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema.clone());
// Populate index with stable RNG for reproducibility.
let mut rng = StdRng::from_seed([7u8; 32]);
// Populate: spread each present token 90/10 to body/title
{
let mut writer = index.writer(500_000_000).unwrap();
for _ in 0..num_docs {
let has_a = rng.gen_bool(p_a as f64);
let has_b = rng.gen_bool(p_b as f64);
let has_c = rng.gen_bool(p_c as f64);
let mut title_tokens: Vec<&str> = Vec::new();
let mut body_tokens: Vec<&str> = Vec::new();
if has_a {
if rng.gen_bool(0.1) {
title_tokens.push("a");
} else {
body_tokens.push("a");
}
}
if has_b {
if rng.gen_bool(0.1) {
title_tokens.push("b");
} else {
body_tokens.push("b");
}
}
if has_c {
if rng.gen_bool(0.1) {
title_tokens.push("c");
} else {
body_tokens.push("c");
}
}
if title_tokens.is_empty() && body_tokens.is_empty() {
body_tokens.push("z");
}
writer
.add_document(doc!(
f_title=>title_tokens.join(" "),
f_body=>body_tokens.join(" ")
))
.unwrap();
}
writer.commit().unwrap();
}
// Prepare reader/searcher once.
let reader = index
.reader_builder()
.reload_policy(ReloadPolicy::Manual)
.try_into()
.unwrap();
let searcher = reader.searcher();
// Build two query parsers with different default fields.
let qp_single = QueryParser::for_index(&index, vec![f_body]);
let qp_multi = QueryParser::for_index(&index, vec![f_title, f_body]);
let single_view = BenchIndex {
index: index.clone(),
searcher: searcher.clone(),
query_parser: qp_single,
};
let multi_view = BenchIndex {
index,
searcher,
query_parser: qp_multi,
};
(single_view, multi_view)
}
fn main() {
// Prepare corpora with varying selectivity. Build one index per corpus
// and derive two views (single-field vs multi-field) from it.
let scenarios = vec![
(
"N=1M, p(a)=5%, p(b)=1%, p(c)=15%".to_string(),
1_000_000,
0.05,
0.01,
0.15,
),
(
"N=1M, p(a)=1%, p(b)=1%, p(c)=15%".to_string(),
1_000_000,
0.01,
0.01,
0.15,
),
];
let mut runner = BenchRunner::new();
for (label, n, pa, pb, pc) in scenarios {
let (single_view, multi_view) = build_shared_indices(n, pa, pb, pc);
// Single-field group: default field is body only
{
let mut group = runner.new_group();
group.set_name(format!("single_field — {}", label));
group.register_with_input("+a_+b_count", &single_view, |benv: &BenchIndex| {
black_box(benv.count_query("+a +b"))
});
group.register_with_input("+a_+b_+c_count", &single_view, |benv: &BenchIndex| {
black_box(benv.count_query("+a +b +c"))
});
group.register_with_input("+a_+b_top10", &single_view, |benv: &BenchIndex| {
black_box(benv.topk_len("+a +b", 10))
});
group.register_with_input("+a_+b_+c_top10", &single_view, |benv: &BenchIndex| {
black_box(benv.topk_len("+a +b +c", 10))
});
// OR queries
group.register_with_input("a_OR_b_count", &single_view, |benv: &BenchIndex| {
black_box(benv.count_query("a OR b"))
});
group.register_with_input("a_OR_b_OR_c_count", &single_view, |benv: &BenchIndex| {
black_box(benv.count_query("a OR b OR c"))
});
group.register_with_input("a_OR_b_top10", &single_view, |benv: &BenchIndex| {
black_box(benv.topk_len("a OR b", 10))
});
group.register_with_input("a_OR_b_OR_c_top10", &single_view, |benv: &BenchIndex| {
black_box(benv.topk_len("a OR b OR c", 10))
});
group.run();
}
// Multi-field group: default fields are [title, body]
{
let mut group = runner.new_group();
group.set_name(format!("multi_field — {}", label));
group.register_with_input("+a_+b_count", &multi_view, |benv: &BenchIndex| {
black_box(benv.count_query("+a +b"))
});
group.register_with_input("+a_+b_+c_count", &multi_view, |benv: &BenchIndex| {
black_box(benv.count_query("+a +b +c"))
});
group.register_with_input("+a_+b_top10", &multi_view, |benv: &BenchIndex| {
black_box(benv.topk_len("+a +b", 10))
});
group.register_with_input("+a_+b_+c_top10", &multi_view, |benv: &BenchIndex| {
black_box(benv.topk_len("+a +b +c", 10))
});
// OR queries
group.register_with_input("a_OR_b_count", &multi_view, |benv: &BenchIndex| {
black_box(benv.count_query("a OR b"))
});
group.register_with_input("a_OR_b_OR_c_count", &multi_view, |benv: &BenchIndex| {
black_box(benv.count_query("a OR b OR c"))
});
group.register_with_input("a_OR_b_top10", &multi_view, |benv: &BenchIndex| {
black_box(benv.topk_len("a OR b", 10))
});
group.register_with_input("a_OR_b_OR_c_top10", &multi_view, |benv: &BenchIndex| {
black_box(benv.topk_len("a OR b OR c", 10))
});
group.run();
}
}
}

View File

@@ -1,69 +0,0 @@
use binggan::plugins::PeakMemAllocPlugin;
use binggan::{black_box, InputGroup, PeakMemAlloc, INSTRUMENTED_SYSTEM};
use serde_json::json;
use tantivy::collector::Count;
use tantivy::query::ExistsQuery;
use tantivy::schema::{Schema, FAST, TEXT};
use tantivy::{doc, Index};
#[global_allocator]
pub static GLOBAL: &PeakMemAlloc<std::alloc::System> = &INSTRUMENTED_SYSTEM;
fn main() {
let doc_count: usize = 500_000;
let subfield_counts: &[usize] = &[1, 2, 3, 4, 5, 6, 7, 8, 16, 256, 4096, 65536, 262144];
let indices: Vec<(String, Index)> = subfield_counts
.iter()
.map(|&sub_fields| {
(
format!("subfields={sub_fields}"),
build_index_with_json_subfields(doc_count, sub_fields),
)
})
.collect();
let mut group = InputGroup::new_with_inputs(indices);
group.add_plugin(PeakMemAllocPlugin::new(GLOBAL));
group.config().num_iter_group = Some(1);
group.config().num_iter_bench = Some(1);
group.register("exists_json", exists_json_union);
group.run();
}
fn exists_json_union(index: &Index) {
let reader = index.reader().expect("reader");
let searcher = reader.searcher();
let query = ExistsQuery::new("json".to_string(), true);
let count = searcher.search(&query, &Count).expect("exists search");
// Prevents optimizer from eliding the search
black_box(count);
}
fn build_index_with_json_subfields(num_docs: usize, num_subfields: usize) -> Index {
// Schema: single JSON field stored as FAST to support ExistsQuery.
let mut schema_builder = Schema::builder();
let json_field = schema_builder.add_json_field("json", TEXT | FAST);
let schema = schema_builder.build();
let index = Index::create_from_tempdir(schema).expect("create index");
{
let mut index_writer = index
.writer_with_num_threads(1, 200_000_000)
.expect("writer");
for i in 0..num_docs {
let sub = i % num_subfields;
// Only one subpath set per document; rotate subpaths so that
// no single subpath is full, but the union covers all docs.
let v = json!({ format!("field_{sub}"): i as u64 });
index_writer
.add_document(doc!(json_field => v))
.expect("add_document");
}
index_writer.commit().expect("commit");
}
index
}

View File

@@ -1,7 +1,7 @@
[package]
name = "tantivy-bitpacker"
version = "0.9.0"
edition = "2024"
version = "0.6.0"
edition = "2021"
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
license = "MIT"
categories = []

View File

@@ -48,7 +48,7 @@ impl BitPacker {
pub fn flush<TWrite: io::Write + ?Sized>(&mut self, output: &mut TWrite) -> io::Result<()> {
if self.mini_buffer_written > 0 {
let num_bytes = self.mini_buffer_written.div_ceil(8);
let num_bytes = (self.mini_buffer_written + 7) / 8;
let bytes = self.mini_buffer.to_le_bytes();
output.write_all(&bytes[..num_bytes])?;
self.mini_buffer_written = 0;
@@ -65,7 +65,7 @@ impl BitPacker {
#[derive(Clone, Debug, Default, Copy)]
pub struct BitUnpacker {
num_bits: usize,
num_bits: u32,
mask: u64,
}
@@ -83,7 +83,7 @@ impl BitUnpacker {
(1u64 << num_bits) - 1u64
};
BitUnpacker {
num_bits: usize::from(num_bits),
num_bits: u32::from(num_bits),
mask,
}
}
@@ -94,7 +94,7 @@ impl BitUnpacker {
#[inline]
pub fn get(&self, idx: u32, data: &[u8]) -> u64 {
let addr_in_bits = idx as usize * self.num_bits;
let addr_in_bits = idx as usize * self.num_bits as usize;
let addr = addr_in_bits >> 3;
if addr + 8 > data.len() {
if self.num_bits == 0 {
@@ -134,13 +134,12 @@ impl BitUnpacker {
"Bitwidth must be <= 32 to use this method."
);
let end_idx: u32 = start_idx + output.len() as u32;
let end_idx = start_idx + output.len() as u32;
// We use `usize` here to avoid overflow issues.
let end_bit_read = (end_idx as usize) * self.num_bits;
let end_byte_read = end_bit_read.div_ceil(8);
let end_bit_read = end_idx * self.num_bits;
let end_byte_read = (end_bit_read + 7) / 8;
assert!(
end_byte_read <= data.len(),
end_byte_read as usize <= data.len(),
"Requested index is out of bounds."
);
@@ -160,24 +159,24 @@ impl BitUnpacker {
// We want the start of the fast track to start align with bytes.
// A sufficient condition is to start with an idx that is a multiple of 8,
// so highway start is the closest multiple of 8 that is >= start_idx.
let entrance_ramp_len: u32 = 8 - (start_idx % 8) % 8;
let entrance_ramp_len = 8 - (start_idx % 8) % 8;
let highway_start: u32 = start_idx + entrance_ramp_len;
if highway_start + (BitPacker1x::BLOCK_LEN as u32) > end_idx {
if highway_start + BitPacker1x::BLOCK_LEN as u32 > end_idx {
// We don't have enough values to have even a single block of highway.
// Let's just supply the values the simple way.
get_batch_ramp(start_idx, output);
return;
}
let num_blocks: usize = (end_idx - highway_start) as usize / BitPacker1x::BLOCK_LEN;
let num_blocks: u32 = (end_idx - highway_start) / BitPacker1x::BLOCK_LEN as u32;
// Entrance ramp
get_batch_ramp(start_idx, &mut output[..entrance_ramp_len as usize]);
// Highway
let mut offset = (highway_start as usize * self.num_bits) / 8;
let mut offset = (highway_start * self.num_bits) as usize / 8;
let mut output_cursor = (highway_start - start_idx) as usize;
for _ in 0..num_blocks {
offset += BitPacker1x.decompress(
@@ -189,7 +188,7 @@ impl BitUnpacker {
}
// Exit ramp
let highway_end: u32 = highway_start + (num_blocks * BitPacker1x::BLOCK_LEN) as u32;
let highway_end = highway_start + num_blocks * BitPacker1x::BLOCK_LEN as u32;
get_batch_ramp(highway_end, &mut output[output_cursor..]);
}

View File

@@ -1,6 +1,6 @@
use super::bitpacker::BitPacker;
use super::compute_num_bits;
use crate::{BitUnpacker, minmax};
use crate::{minmax, BitUnpacker};
const BLOCK_SIZE: usize = 128;
@@ -34,7 +34,7 @@ struct BlockedBitpackerEntryMetaData {
impl BlockedBitpackerEntryMetaData {
fn new(offset: u64, num_bits: u8, base_value: u64) -> Self {
let encoded = offset | (u64::from(num_bits) << (64 - 8));
let encoded = offset | (num_bits as u64) << (64 - 8);
Self {
encoded,
base_value,
@@ -140,10 +140,10 @@ impl BlockedBitpacker {
pub fn iter(&self) -> impl Iterator<Item = u64> + '_ {
// todo performance: we could decompress a whole block and cache it instead
let bitpacked_elems = self.offset_and_bits.len() * BLOCK_SIZE;
(0..bitpacked_elems)
let iter = (0..bitpacked_elems)
.map(move |idx| self.get(idx))
.chain(self.buffer.iter().cloned())
.chain(self.buffer.iter().cloned());
iter
}
}

View File

@@ -33,7 +33,11 @@ pub use crate::blocked_bitpacker::BlockedBitpacker;
/// number of bits.
pub fn compute_num_bits(n: u64) -> u8 {
let amplitude = (64u32 - n.leading_zeros()) as u8;
if amplitude <= 64 - 8 { amplitude } else { 64 }
if amplitude <= 64 - 8 {
amplitude
} else {
64
}
}
/// Computes the (min, max) of an iterator of `PartialOrd` values.

View File

@@ -16,14 +16,14 @@ body = """
{%- if version %} in {{ version }}{%- endif -%}
{% for commit in commits %}
{% if commit.remote.pr_title -%}
{%- set commit_message = commit.remote.pr_title -%}
{% if commit.github.pr_title -%}
{%- set commit_message = commit.github.pr_title -%}
{%- else -%}
{%- set commit_message = commit.message -%}
{%- endif -%}
- {{ commit_message | split(pat="\n") | first | trim }}\
{% if commit.remote.pr_number %} \
[#{{ commit.remote.pr_number }}]({{ self::remote_url() }}/pull/{{ commit.remote.pr_number }}){% if commit.remote.username %}(@{{ commit.remote.username }}){%- endif -%} \
{% if commit.github.pr_number %} \
[#{{ commit.github.pr_number }}]({{ self::remote_url() }}/pull/{{ commit.github.pr_number }}){% if commit.github.username %}(@{{ commit.github.username }}){%- endif -%} \
{%- endif %}
{%- endfor -%}

View File

@@ -1,7 +1,7 @@
[package]
name = "tantivy-columnar"
version = "0.6.0"
edition = "2024"
version = "0.3.0"
edition = "2021"
license = "MIT"
homepage = "https://github.com/quickwit-oss/tantivy"
repository = "https://github.com/quickwit-oss/tantivy"
@@ -12,10 +12,10 @@ categories = ["database-implementations", "data-structures", "compression"]
itertools = "0.14.0"
fastdivide = "0.4.0"
stacker = { version= "0.6", path = "../stacker", package="tantivy-stacker"}
sstable = { version= "0.6", path = "../sstable", package = "tantivy-sstable" }
common = { version= "0.10", path = "../common", package = "tantivy-common" }
tantivy-bitpacker = { version= "0.9", path = "../bitpacker/" }
stacker = { version= "0.3", path = "../stacker", package="tantivy-stacker"}
sstable = { version= "0.3", path = "../sstable", package = "tantivy-sstable" }
common = { version= "0.7", path = "../common", package = "tantivy-common" }
tantivy-bitpacker = { version= "0.6", path = "../bitpacker/" }
serde = "1.0.152"
downcast-rs = "2.0.1"
@@ -33,29 +33,6 @@ harness = false
name = "bench_access"
harness = false
[[bench]]
name = "bench_first_vals"
harness = false
[[bench]]
name = "bench_values_u64"
harness = false
[[bench]]
name = "bench_values_u128"
harness = false
[[bench]]
name = "bench_create_column_values"
harness = false
[[bench]]
name = "bench_column_values_get"
harness = false
[[bench]]
name = "bench_optional_index"
harness = false
[features]
zstd-compression = ["sstable/zstd-compression"]
unstable = []

View File

@@ -1,4 +1,4 @@
use binggan::{InputGroup, black_box};
use binggan::{black_box, InputGroup};
use common::*;
use tantivy_columnar::Column;
@@ -19,7 +19,7 @@ fn main() {
let mut add_card = |card1: Card| {
inputs.push((
card1.to_string(),
format!("{card1}"),
generate_columnar_and_open(card1, NUM_DOCS),
));
};
@@ -50,7 +50,6 @@ fn bench_group(mut runner: InputGroup<Column>) {
let mut buffer = vec![None; BLOCK_SIZE];
for i in (0..NUM_DOCS).step_by(BLOCK_SIZE) {
// fill docs
#[allow(clippy::needless_range_loop)]
for idx in 0..BLOCK_SIZE {
docs[idx] = idx as u32 + i;
}

View File

@@ -1,61 +0,0 @@
use std::sync::Arc;
use binggan::{InputGroup, black_box};
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use tantivy_columnar::ColumnValues;
use tantivy_columnar::column_values::{CodecType, serialize_and_load_u64_based_column_values};
fn get_data() -> Vec<u64> {
let mut rng = StdRng::seed_from_u64(2u64);
let mut data: Vec<_> = (100..55_000_u64)
.map(|num| num + rng.r#gen::<u8>() as u64)
.collect();
data.push(99_000);
data.insert(1000, 2000);
data.insert(2000, 100);
data.insert(3000, 4100);
data.insert(4000, 100);
data.insert(5000, 800);
data
}
#[inline(never)]
fn value_iter() -> impl Iterator<Item = u64> {
0..20_000
}
type Col = Arc<dyn ColumnValues<u64>>;
fn main() {
let data = get_data();
let inputs: Vec<(String, Col)> = vec![
(
"bitpacked".to_string(),
serialize_and_load_u64_based_column_values(&data.as_slice(), &[CodecType::Bitpacked]),
),
(
"linear".to_string(),
serialize_and_load_u64_based_column_values(&data.as_slice(), &[CodecType::Linear]),
),
(
"blockwise_linear".to_string(),
serialize_and_load_u64_based_column_values(
&data.as_slice(),
&[CodecType::BlockwiseLinear],
),
),
];
let mut group: InputGroup<Col> = InputGroup::new_with_inputs(inputs);
group.register("fastfield_get", |col: &Col| {
let mut sum = 0u64;
for pos in value_iter() {
sum = sum.wrapping_add(col.get_val(pos as u32));
}
black_box(sum);
});
group.run();
}

View File

@@ -1,44 +0,0 @@
use binggan::{InputGroup, black_box};
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use tantivy_columnar::column_values::{CodecType, serialize_u64_based_column_values};
fn get_data() -> Vec<u64> {
let mut rng = StdRng::seed_from_u64(2u64);
let mut data: Vec<_> = (100..55_000_u64)
.map(|num| num + rng.r#gen::<u8>() as u64)
.collect();
data.push(99_000);
data.insert(1000, 2000);
data.insert(2000, 100);
data.insert(3000, 4100);
data.insert(4000, 100);
data.insert(5000, 800);
data
}
fn main() {
let data = get_data();
let mut group: InputGroup<(CodecType, Vec<u64>)> = InputGroup::new_with_inputs(vec![
(
"bitpacked codec".to_string(),
(CodecType::Bitpacked, data.clone()),
),
(
"linear codec".to_string(),
(CodecType::Linear, data.clone()),
),
(
"blockwise linear codec".to_string(),
(CodecType::BlockwiseLinear, data.clone()),
),
]);
group.register("serialize column_values", |data| {
let mut buffer = Vec::new();
serialize_u64_based_column_values(&data.1.as_slice(), &[data.0], &mut buffer).unwrap();
black_box(buffer.len());
});
group.run();
}

View File

@@ -1,9 +1,12 @@
#![feature(test)]
extern crate test;
use std::sync::Arc;
use binggan::{InputGroup, black_box};
use rand::prelude::*;
use tantivy_columnar::column_values::{CodecType, serialize_and_load_u64_based_column_values};
use tantivy_columnar::column_values::{serialize_and_load_u64_based_column_values, CodecType};
use tantivy_columnar::*;
use test::{black_box, Bencher};
struct Columns {
pub optional: Column,
@@ -65,45 +68,88 @@ pub fn serialize_and_load(column: &[u64], codec_type: CodecType) -> Arc<dyn Colu
serialize_and_load_u64_based_column_values(&column, &[codec_type])
}
fn main() {
let Columns {
optional,
full,
multi,
} = get_test_columns();
let inputs = vec![
("full".to_string(), full),
("optional".to_string(), optional),
("multi".to_string(), multi),
];
let mut group = InputGroup::new_with_inputs(inputs);
group.register("first_full_scan", |column| {
fn run_bench_on_column_full_scan(b: &mut Bencher, column: Column) {
let num_iter = black_box(NUM_VALUES);
b.iter(|| {
let mut sum = 0u64;
for i in 0..NUM_VALUES as u32 {
for i in 0..num_iter as u32 {
let val = column.first(i);
sum += val.unwrap_or(0);
}
black_box(sum);
sum
});
group.register("first_block_fetch", |column| {
let mut block: Vec<Option<u64>> = vec![None; 64];
let fetch_docids = (0..64).collect::<Vec<_>>();
}
fn run_bench_on_column_block_fetch(b: &mut Bencher, column: Column) {
let mut block: Vec<Option<u64>> = vec![None; 64];
let fetch_docids = (0..64).collect::<Vec<_>>();
b.iter(move || {
column.first_vals(&fetch_docids, &mut block);
black_box(block[0]);
block[0]
});
group.register("first_block_single_calls", |column| {
let mut block: Vec<Option<u64>> = vec![None; 64];
let fetch_docids = (0..64).collect::<Vec<_>>();
}
fn run_bench_on_column_block_single_calls(b: &mut Bencher, column: Column) {
let mut block: Vec<Option<u64>> = vec![None; 64];
let fetch_docids = (0..64).collect::<Vec<_>>();
b.iter(move || {
for i in 0..fetch_docids.len() {
block[i] = column.first(fetch_docids[i]);
}
black_box(block[0]);
block[0]
});
group.run();
}
/// Column first method
#[bench]
fn bench_get_first_on_full_column_full_scan(b: &mut Bencher) {
let column = get_test_columns().full;
run_bench_on_column_full_scan(b, column);
}
#[bench]
fn bench_get_first_on_optional_column_full_scan(b: &mut Bencher) {
let column = get_test_columns().optional;
run_bench_on_column_full_scan(b, column);
}
#[bench]
fn bench_get_first_on_multi_column_full_scan(b: &mut Bencher) {
let column = get_test_columns().multi;
run_bench_on_column_full_scan(b, column);
}
/// Block fetch column accessor
#[bench]
fn bench_get_block_first_on_optional_column(b: &mut Bencher) {
let column = get_test_columns().optional;
run_bench_on_column_block_fetch(b, column);
}
#[bench]
fn bench_get_block_first_on_multi_column(b: &mut Bencher) {
let column = get_test_columns().multi;
run_bench_on_column_block_fetch(b, column);
}
#[bench]
fn bench_get_block_first_on_full_column(b: &mut Bencher) {
let column = get_test_columns().full;
run_bench_on_column_block_fetch(b, column);
}
#[bench]
fn bench_get_block_first_on_optional_column_single_calls(b: &mut Bencher) {
let column = get_test_columns().optional;
run_bench_on_column_block_single_calls(b, column);
}
#[bench]
fn bench_get_block_first_on_multi_column_single_calls(b: &mut Bencher) {
let column = get_test_columns().multi;
run_bench_on_column_block_single_calls(b, column);
}
#[bench]
fn bench_get_block_first_on_full_column_single_calls(b: &mut Bencher) {
let column = get_test_columns().full;
run_bench_on_column_block_single_calls(b, column);
}

View File

@@ -1,7 +1,7 @@
pub mod common;
use binggan::BenchRunner;
use common::{Card, generate_columnar_with_name};
use common::{generate_columnar_with_name, Card};
use tantivy_columnar::*;
const NUM_DOCS: u32 = 100_000;

View File

@@ -1,106 +0,0 @@
use binggan::{InputGroup, black_box};
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use tantivy_columnar::column_index::{OptionalIndex, Set};
const TOTAL_NUM_VALUES: u32 = 1_000_000;
fn gen_optional_index(fill_ratio: f64) -> OptionalIndex {
let mut rng: StdRng = StdRng::from_seed([1u8; 32]);
let vals: Vec<u32> = (0..TOTAL_NUM_VALUES)
.map(|_| rng.gen_bool(fill_ratio))
.enumerate()
.filter(|(_pos, val)| *val)
.map(|(pos, _)| pos as u32)
.collect();
OptionalIndex::for_test(TOTAL_NUM_VALUES, &vals)
}
fn random_range_iterator(
start: u32,
end: u32,
avg_step_size: u32,
avg_deviation: u32,
) -> impl Iterator<Item = u32> {
let mut rng: StdRng = StdRng::from_seed([1u8; 32]);
let mut current = start;
std::iter::from_fn(move || {
current += rng.gen_range(avg_step_size - avg_deviation..=avg_step_size + avg_deviation);
if current >= end { None } else { Some(current) }
})
}
fn n_percent_step_iterator(percent: f32, num_values: u32) -> impl Iterator<Item = u32> {
let ratio = percent / 100.0;
let step_size = (1f32 / ratio) as u32;
let deviation = step_size - 1;
random_range_iterator(0, num_values, step_size, deviation)
}
fn walk_over_data(codec: &OptionalIndex, avg_step_size: u32) -> Option<u32> {
walk_over_data_from_positions(
codec,
random_range_iterator(0, TOTAL_NUM_VALUES, avg_step_size, 0),
)
}
fn walk_over_data_from_positions(
codec: &OptionalIndex,
positions: impl Iterator<Item = u32>,
) -> Option<u32> {
let mut dense_idx: Option<u32> = None;
for idx in positions {
dense_idx = dense_idx.or(codec.rank_if_exists(idx));
}
dense_idx
}
fn main() {
// Build separate inputs for each fill ratio.
let inputs: Vec<(String, OptionalIndex)> = vec![
("fill=1%".to_string(), gen_optional_index(0.01)),
("fill=5%".to_string(), gen_optional_index(0.05)),
("fill=10%".to_string(), gen_optional_index(0.10)),
("fill=50%".to_string(), gen_optional_index(0.50)),
("fill=90%".to_string(), gen_optional_index(0.90)),
];
let mut group: InputGroup<OptionalIndex> = InputGroup::new_with_inputs(inputs);
// Translate orig->codec (rank_if_exists) with sampling
group.register("orig_to_codec_10pct_hit", |codec: &OptionalIndex| {
black_box(walk_over_data(codec, 100));
});
group.register("orig_to_codec_1pct_hit", |codec: &OptionalIndex| {
black_box(walk_over_data(codec, 1000));
});
group.register("orig_to_codec_full_scan", |codec: &OptionalIndex| {
black_box(walk_over_data_from_positions(codec, 0..TOTAL_NUM_VALUES));
});
// Translate codec->orig (select/select_batch) on sampled ranks
fn bench_translate_codec_to_orig_util(codec: &OptionalIndex, percent_hit: f32) {
let num_non_nulls = codec.num_non_nulls();
let idxs: Vec<u32> = if percent_hit == 100.0f32 {
(0..num_non_nulls).collect()
} else {
n_percent_step_iterator(percent_hit, num_non_nulls).collect()
};
let mut output = vec![0u32; idxs.len()];
output.copy_from_slice(&idxs[..]);
codec.select_batch(&mut output);
black_box(output);
}
group.register("codec_to_orig_0.005pct_hit", |codec: &OptionalIndex| {
bench_translate_codec_to_orig_util(codec, 0.005);
});
group.register("codec_to_orig_10pct_hit", |codec: &OptionalIndex| {
bench_translate_codec_to_orig_util(codec, 10.0);
});
group.register("codec_to_orig_full_scan", |codec: &OptionalIndex| {
bench_translate_codec_to_orig_util(codec, 100.0);
});
group.run();
}

View File

@@ -1,12 +1,15 @@
#![feature(test)]
use std::ops::RangeInclusive;
use std::sync::Arc;
use binggan::{InputGroup, black_box};
use common::OwnedBytes;
use rand::rngs::StdRng;
use rand::seq::SliceRandom;
use rand::{Rng, SeedableRng, random};
use rand::{random, Rng, SeedableRng};
use tantivy_columnar::ColumnValues;
use test::Bencher;
extern crate test;
// TODO does this make sense for IPv6 ?
fn generate_random() -> Vec<u64> {
@@ -44,77 +47,78 @@ fn get_data_50percent_item() -> Vec<u128> {
}
data.push(SINGLE_ITEM);
data.shuffle(&mut rng);
data.iter().map(|el| *el as u128).collect::<Vec<_>>()
let data = data.iter().map(|el| *el as u128).collect::<Vec<_>>();
data
}
fn main() {
#[bench]
fn bench_intfastfield_getrange_u128_50percent_hit(b: &mut Bencher) {
let data = get_data_50percent_item();
let column_range = get_u128_column_from_data(&data);
let column_random = get_u128_column_random();
let column = get_u128_column_from_data(&data);
struct Inputs {
data: Vec<u128>,
column_range: Arc<dyn ColumnValues<u128>>,
column_random: Arc<dyn ColumnValues<u128>>,
}
let inputs = Inputs {
data,
column_range,
column_random,
};
let mut group: InputGroup<Inputs> =
InputGroup::new_with_inputs(vec![("u128 benches".to_string(), inputs)]);
group.register(
"intfastfield_getrange_u128_50percent_hit",
|inp: &Inputs| {
let mut positions = Vec::new();
inp.column_range.get_row_ids_for_value_range(
*FIFTY_PERCENT_RANGE.start() as u128..=*FIFTY_PERCENT_RANGE.end() as u128,
0..inp.data.len() as u32,
&mut positions,
);
black_box(positions.len());
},
);
group.register("intfastfield_getrange_u128_single_hit", |inp: &Inputs| {
b.iter(|| {
let mut positions = Vec::new();
inp.column_range.get_row_ids_for_value_range(
column.get_row_ids_for_value_range(
*FIFTY_PERCENT_RANGE.start() as u128..=*FIFTY_PERCENT_RANGE.end() as u128,
0..data.len() as u32,
&mut positions,
);
positions
});
}
#[bench]
fn bench_intfastfield_getrange_u128_single_hit(b: &mut Bencher) {
let data = get_data_50percent_item();
let column = get_u128_column_from_data(&data);
b.iter(|| {
let mut positions = Vec::new();
column.get_row_ids_for_value_range(
*SINGLE_ITEM_RANGE.start() as u128..=*SINGLE_ITEM_RANGE.end() as u128,
0..inp.data.len() as u32,
0..data.len() as u32,
&mut positions,
);
black_box(positions.len());
positions
});
}
group.register("intfastfield_getrange_u128_hit_all", |inp: &Inputs| {
#[bench]
fn bench_intfastfield_getrange_u128_hit_all(b: &mut Bencher) {
let data = get_data_50percent_item();
let column = get_u128_column_from_data(&data);
b.iter(|| {
let mut positions = Vec::new();
inp.column_range.get_row_ids_for_value_range(
0..=u128::MAX,
0..inp.data.len() as u32,
&mut positions,
);
black_box(positions.len());
column.get_row_ids_for_value_range(0..=u128::MAX, 0..data.len() as u32, &mut positions);
positions
});
}
// U128 RANGE END
group.register("intfastfield_scan_all_fflookup_u128", |inp: &Inputs| {
#[bench]
fn bench_intfastfield_scan_all_fflookup_u128(b: &mut Bencher) {
let column = get_u128_column_random();
b.iter(|| {
let mut a = 0u128;
for i in 0u64..inp.column_random.num_vals() as u64 {
a += inp.column_random.get_val(i as u32);
for i in 0u64..column.num_vals() as u64 {
a += column.get_val(i as u32);
}
black_box(a);
a
});
}
group.register("intfastfield_jumpy_stride5_u128", |inp: &Inputs| {
let n = inp.column_random.num_vals();
#[bench]
fn bench_intfastfield_jumpy_stride5_u128(b: &mut Bencher) {
let column = get_u128_column_random();
b.iter(|| {
let n = column.num_vals();
let mut a = 0u128;
for i in (0..n / 5).map(|val| val * 5) {
a += inp.column_random.get_val(i);
a += column.get_val(i);
}
black_box(a);
a
});
group.run();
}

View File

@@ -1,10 +1,13 @@
#![feature(test)]
extern crate test;
use std::ops::RangeInclusive;
use std::sync::Arc;
use binggan::{InputGroup, black_box};
use rand::prelude::*;
use tantivy_columnar::column_values::{CodecType, serialize_and_load_u64_based_column_values};
use tantivy_columnar::column_values::{serialize_and_load_u64_based_column_values, CodecType};
use tantivy_columnar::*;
use test::Bencher;
// Warning: this generates the same permutation at each call
fn generate_permutation() -> Vec<u64> {
@@ -24,11 +27,37 @@ pub fn serialize_and_load(column: &[u64], codec_type: CodecType) -> Arc<dyn Colu
serialize_and_load_u64_based_column_values(&column, &[codec_type])
}
#[bench]
fn bench_intfastfield_jumpy_veclookup(b: &mut Bencher) {
let permutation = generate_permutation();
let n = permutation.len();
b.iter(|| {
let mut a = 0u64;
for _ in 0..n {
a = permutation[a as usize];
}
a
});
}
#[bench]
fn bench_intfastfield_jumpy_fflookup_bitpacked(b: &mut Bencher) {
let permutation = generate_permutation();
let n = permutation.len();
let column: Arc<dyn ColumnValues<u64>> = serialize_and_load(&permutation, CodecType::Bitpacked);
b.iter(|| {
let mut a = 0u64;
for _ in 0..n {
a = column.get_val(a as u32);
}
a
});
}
const FIFTY_PERCENT_RANGE: RangeInclusive<u64> = 1..=50;
const SINGLE_ITEM: u64 = 90;
const SINGLE_ITEM_RANGE: RangeInclusive<u64> = 90..=90;
const ONE_PERCENT_ITEM_RANGE: RangeInclusive<u64> = 49..=49;
fn get_data_50percent_item() -> Vec<u128> {
let mut rng = StdRng::from_seed([1u8; 32]);
@@ -40,122 +69,135 @@ fn get_data_50percent_item() -> Vec<u128> {
data.push(SINGLE_ITEM);
data.shuffle(&mut rng);
data.iter().map(|el| *el as u128).collect::<Vec<_>>()
let data = data.iter().map(|el| *el as u128).collect::<Vec<_>>();
data
}
type VecCol = (Vec<u64>, Arc<dyn ColumnValues<u64>>);
// U64 RANGE START
#[bench]
fn bench_intfastfield_getrange_u64_50percent_hit(b: &mut Bencher) {
let data = get_data_50percent_item();
let data = data.iter().map(|el| *el as u64).collect::<Vec<_>>();
let column: Arc<dyn ColumnValues<u64>> = serialize_and_load(&data, CodecType::Bitpacked);
b.iter(|| {
let mut positions = Vec::new();
column.get_row_ids_for_value_range(
FIFTY_PERCENT_RANGE,
0..data.len() as u32,
&mut positions,
);
positions
});
}
fn bench_access() {
#[bench]
fn bench_intfastfield_getrange_u64_1percent_hit(b: &mut Bencher) {
let data = get_data_50percent_item();
let data = data.iter().map(|el| *el as u64).collect::<Vec<_>>();
let column: Arc<dyn ColumnValues<u64>> = serialize_and_load(&data, CodecType::Bitpacked);
b.iter(|| {
let mut positions = Vec::new();
column.get_row_ids_for_value_range(
ONE_PERCENT_ITEM_RANGE,
0..data.len() as u32,
&mut positions,
);
positions
});
}
#[bench]
fn bench_intfastfield_getrange_u64_single_hit(b: &mut Bencher) {
let data = get_data_50percent_item();
let data = data.iter().map(|el| *el as u64).collect::<Vec<_>>();
let column: Arc<dyn ColumnValues<u64>> = serialize_and_load(&data, CodecType::Bitpacked);
b.iter(|| {
let mut positions = Vec::new();
column.get_row_ids_for_value_range(SINGLE_ITEM_RANGE, 0..data.len() as u32, &mut positions);
positions
});
}
#[bench]
fn bench_intfastfield_getrange_u64_hit_all(b: &mut Bencher) {
let data = get_data_50percent_item();
let data = data.iter().map(|el| *el as u64).collect::<Vec<_>>();
let column: Arc<dyn ColumnValues<u64>> = serialize_and_load(&data, CodecType::Bitpacked);
b.iter(|| {
let mut positions = Vec::new();
column.get_row_ids_for_value_range(0..=u64::MAX, 0..data.len() as u32, &mut positions);
positions
});
}
// U64 RANGE END
#[bench]
fn bench_intfastfield_stride7_vec(b: &mut Bencher) {
let permutation = generate_permutation();
let column_perm: Arc<dyn ColumnValues<u64>> =
serialize_and_load(&permutation, CodecType::Bitpacked);
let permutation_gcd = generate_permutation_gcd();
let column_perm_gcd: Arc<dyn ColumnValues<u64>> =
serialize_and_load(&permutation_gcd, CodecType::Bitpacked);
let mut group: InputGroup<VecCol> = InputGroup::new_with_inputs(vec![
(
"access".to_string(),
(permutation.clone(), column_perm.clone()),
),
(
"access_gcd".to_string(),
(permutation_gcd.clone(), column_perm_gcd.clone()),
),
]);
group.register("stride7_vec", |inp: &VecCol| {
let n = inp.0.len();
let n = permutation.len();
b.iter(|| {
let mut a = 0u64;
for i in (0..n / 7).map(|val| val * 7) {
a += inp.0[i];
a += permutation[i as usize];
}
black_box(a);
a
});
}
group.register("fullscan_vec", |inp: &VecCol| {
let mut a = 0u64;
for i in 0..inp.0.len() {
a += inp.0[i];
}
black_box(a);
});
group.register("stride7_column_values", |inp: &VecCol| {
let n = inp.1.num_vals() as usize;
let mut a = 0u64;
#[bench]
fn bench_intfastfield_stride7_fflookup(b: &mut Bencher) {
let permutation = generate_permutation();
let n = permutation.len();
let column: Arc<dyn ColumnValues<u64>> = serialize_and_load(&permutation, CodecType::Bitpacked);
b.iter(|| {
let mut a = 0;
for i in (0..n / 7).map(|val| val * 7) {
a += inp.1.get_val(i as u32);
a += column.get_val(i as u32);
}
black_box(a);
a
});
}
group.register("fullscan_column_values", |inp: &VecCol| {
#[bench]
fn bench_intfastfield_scan_all_fflookup(b: &mut Bencher) {
let permutation = generate_permutation();
let n = permutation.len();
let column: Arc<dyn ColumnValues<u64>> = serialize_and_load(&permutation, CodecType::Bitpacked);
let column_ref = column.as_ref();
b.iter(|| {
let mut a = 0u64;
for i in 0u32..n as u32 {
a += column_ref.get_val(i);
}
a
});
}
#[bench]
fn bench_intfastfield_scan_all_fflookup_gcd(b: &mut Bencher) {
let permutation = generate_permutation_gcd();
let n = permutation.len();
let column: Arc<dyn ColumnValues<u64>> = serialize_and_load(&permutation, CodecType::Bitpacked);
b.iter(|| {
let mut a = 0u64;
let n = inp.1.num_vals() as usize;
for i in 0..n {
a += inp.1.get_val(i as u32);
a += column.get_val(i as u32);
}
black_box(a);
a
});
group.run();
}
fn bench_range() {
let data_50 = get_data_50percent_item();
let data_u64 = data_50.iter().map(|el| *el as u64).collect::<Vec<_>>();
let column_data: Arc<dyn ColumnValues<u64>> =
serialize_and_load(&data_u64, CodecType::Bitpacked);
let mut group: InputGroup<Arc<dyn ColumnValues<u64>>> =
InputGroup::new_with_inputs(vec![("dist_50pct_item".to_string(), column_data.clone())]);
group.register(
"fastfield_getrange_u64_50percent_hit",
|col: &Arc<dyn ColumnValues<u64>>| {
let mut positions = Vec::new();
col.get_row_ids_for_value_range(FIFTY_PERCENT_RANGE, 0..col.num_vals(), &mut positions);
black_box(positions.len());
},
);
group.register(
"fastfield_getrange_u64_1percent_hit",
|col: &Arc<dyn ColumnValues<u64>>| {
let mut positions = Vec::new();
col.get_row_ids_for_value_range(
ONE_PERCENT_ITEM_RANGE,
0..col.num_vals(),
&mut positions,
);
black_box(positions.len());
},
);
group.register(
"fastfield_getrange_u64_single_hit",
|col: &Arc<dyn ColumnValues<u64>>| {
let mut positions = Vec::new();
col.get_row_ids_for_value_range(SINGLE_ITEM_RANGE, 0..col.num_vals(), &mut positions);
black_box(positions.len());
},
);
group.register(
"fastfield_getrange_u64_hit_all",
|col: &Arc<dyn ColumnValues<u64>>| {
let mut positions = Vec::new();
col.get_row_ids_for_value_range(0..=u64::MAX, 0..col.num_vals(), &mut positions);
black_box(positions.len());
},
);
group.run();
}
fn main() {
bench_access();
bench_range();
#[bench]
fn bench_intfastfield_scan_all_vec(b: &mut Bencher) {
let permutation = generate_permutation();
b.iter(|| {
let mut a = 0u64;
for i in 0..permutation.len() {
a += permutation[i as usize] as u64;
}
a
});
}

View File

@@ -66,7 +66,7 @@ impl<T: PartialOrd + Copy + std::fmt::Debug + Send + Sync + 'static + Default>
&'a self,
docs: &'a [u32],
accessor: &Column<T>,
) -> impl Iterator<Item = (DocId, T)> + 'a + use<'a, T> {
) -> impl Iterator<Item = (DocId, T)> + 'a {
if accessor.index.get_cardinality().is_full() {
docs.iter().cloned().zip(self.val_cache.iter().cloned())
} else {
@@ -139,7 +139,7 @@ mod tests {
missing_docs.push(missing_doc);
});
assert_eq!(missing_docs, Vec::<u32>::new());
assert_eq!(missing_docs, vec![]);
}
#[test]

View File

@@ -4,8 +4,8 @@ use std::{fmt, io};
use sstable::{Dictionary, VoidSSTable};
use crate::RowId;
use crate::column::Column;
use crate::RowId;
/// Dictionary encoded column.
///

View File

@@ -9,14 +9,13 @@ use std::sync::Arc;
use common::BinarySerializable;
pub use dictionary_encoded::{BytesColumn, StrColumn};
pub use serialize::{
open_column_bytes, open_column_str, open_column_u64, open_column_u128,
open_column_u128_as_compact_u64, serialize_column_mappable_to_u64,
serialize_column_mappable_to_u128,
open_column_bytes, open_column_str, open_column_u128, open_column_u128_as_compact_u64,
open_column_u64, serialize_column_mappable_to_u128, serialize_column_mappable_to_u64,
};
use crate::column_index::{ColumnIndex, Set};
use crate::column_values::monotonic_mapping::StrictlyMonotonicMappingToInternal;
use crate::column_values::{ColumnValues, monotonic_map_column};
use crate::column_values::{monotonic_map_column, ColumnValues};
use crate::{Cardinality, DocId, EmptyColumnValues, MonotonicallyMappableToU64, RowId};
#[derive(Clone)]
@@ -114,7 +113,7 @@ impl<T: PartialOrd + Copy + Debug + Send + Sync + 'static> Column<T> {
}
}
/// Translates a block of docids to row_ids.
/// Translates a block of docis to row_ids.
///
/// returns the row_ids and the matching docids on the same index
/// e.g.

View File

@@ -6,10 +6,10 @@ use common::OwnedBytes;
use sstable::Dictionary;
use crate::column::{BytesColumn, Column};
use crate::column_index::{SerializableColumnIndex, serialize_column_index};
use crate::column_index::{serialize_column_index, SerializableColumnIndex};
use crate::column_values::{
CodecType, MonotonicallyMappableToU64, MonotonicallyMappableToU128,
load_u64_based_column_values, serialize_column_values_u128, serialize_u64_based_column_values,
CodecType, MonotonicallyMappableToU128, MonotonicallyMappableToU64,
};
use crate::iterable::Iterable;
use crate::{StrColumn, Version};

View File

@@ -99,9 +99,9 @@ mod tests {
use crate::column_index::merge::detect_cardinality;
use crate::column_index::multivalued_index::{
MultiValueIndex, open_multivalued_index, serialize_multivalued_index,
open_multivalued_index, serialize_multivalued_index, MultiValueIndex,
};
use crate::column_index::{OptionalIndex, SerializableColumnIndex, merge_column_index};
use crate::column_index::{merge_column_index, OptionalIndex, SerializableColumnIndex};
use crate::{
Cardinality, ColumnIndex, MergeRowOrder, RowAddr, RowId, ShuffleMergeOrder, StackMergeOrder,
};

View File

@@ -137,8 +137,8 @@ impl Iterable<u32> for ShuffledMultivaluedIndex<'_> {
#[cfg(test)]
mod tests {
use super::*;
use crate::RowAddr;
use crate::column_index::OptionalIndex;
use crate::RowAddr;
#[test]
fn test_integrate_num_vals_empty() {

View File

@@ -1,8 +1,8 @@
use std::ops::Range;
use crate::column_index::SerializableColumnIndex;
use crate::column_index::multivalued_index::{MultiValueIndex, SerializableMultivalueIndex};
use crate::column_index::serialize::SerializableOptionalIndex;
use crate::column_index::SerializableColumnIndex;
use crate::iterable::Iterable;
use crate::{Cardinality, ColumnIndex, RowId, StackMergeOrder};
@@ -56,7 +56,7 @@ fn get_doc_ids_with_values<'a>(
ColumnIndex::Full => Box::new(doc_range),
ColumnIndex::Optional(optional_index) => Box::new(
optional_index
.iter_non_null_docs()
.iter_rows()
.map(move |row| row + doc_range.start),
),
ColumnIndex::Multivalued(multivalued_index) => match multivalued_index {
@@ -73,7 +73,7 @@ fn get_doc_ids_with_values<'a>(
MultiValueIndex::MultiValueIndexV2(multivalued_index) => Box::new(
multivalued_index
.optional_index
.iter_non_null_docs()
.iter_rows()
.map(move |row| row + doc_range.start),
),
},
@@ -105,11 +105,10 @@ fn get_num_values_iterator<'a>(
) -> Box<dyn Iterator<Item = u32> + 'a> {
match column_index {
ColumnIndex::Empty { .. } => Box::new(std::iter::empty()),
ColumnIndex::Full => Box::new(std::iter::repeat_n(1u32, num_docs as usize)),
ColumnIndex::Optional(optional_index) => Box::new(std::iter::repeat_n(
1u32,
optional_index.num_non_nulls() as usize,
)),
ColumnIndex::Full => Box::new(std::iter::repeat(1u32).take(num_docs as usize)),
ColumnIndex::Optional(optional_index) => {
Box::new(std::iter::repeat(1u32).take(optional_index.num_non_nulls() as usize))
}
ColumnIndex::Multivalued(multivalued_index) => Box::new(
multivalued_index
.get_start_index_column()
@@ -178,7 +177,7 @@ impl<'a> Iterable<RowId> for StackedOptionalIndex<'a> {
ColumnIndex::Full => Box::new(columnar_row_range),
ColumnIndex::Optional(optional_index) => Box::new(
optional_index
.iter_non_null_docs()
.iter_rows()
.map(move |row_id: RowId| columnar_row_range.start + row_id),
),
ColumnIndex::Multivalued(_) => {

View File

@@ -14,7 +14,7 @@ pub use merge::merge_column_index;
pub(crate) use multivalued_index::SerializableMultivalueIndex;
pub use optional_index::{OptionalIndex, Set};
pub use serialize::{
SerializableColumnIndex, SerializableOptionalIndex, open_column_index, serialize_column_index,
open_column_index, serialize_column_index, SerializableColumnIndex, SerializableOptionalIndex,
};
use crate::column_index::multivalued_index::MultiValueIndex;

View File

@@ -8,7 +8,7 @@ use common::{CountingWriter, OwnedBytes};
use super::optional_index::{open_optional_index, serialize_optional_index};
use super::{OptionalIndex, SerializableOptionalIndex, Set};
use crate::column_values::{
CodecType, ColumnValues, load_u64_based_column_values, serialize_u64_based_column_values,
load_u64_based_column_values, serialize_u64_based_column_values, CodecType, ColumnValues,
};
use crate::iterable::Iterable;
use crate::{DocId, RowId, Version};
@@ -215,32 +215,6 @@ impl MultiValueIndex {
}
}
/// Returns an iterator over document ids that have at least one value.
pub fn iter_non_null_docs(&self) -> Box<dyn Iterator<Item = DocId> + '_> {
match self {
MultiValueIndex::MultiValueIndexV1(idx) => {
let mut doc: DocId = 0u32;
let num_docs = idx.num_docs();
Box::new(std::iter::from_fn(move || {
// This is not the most efficient way to do this, but it's legacy code.
while doc < num_docs {
let cur = doc;
doc += 1;
let start = idx.start_index_column.get_val(cur);
let end = idx.start_index_column.get_val(cur + 1);
if end > start {
return Some(cur);
}
}
None
}))
}
MultiValueIndex::MultiValueIndexV2(idx) => {
Box::new(idx.optional_index.iter_non_null_docs())
}
}
}
/// Converts a list of ranks (row ids of values) in a 1:n index to the corresponding list of
/// docids. Positions are converted inplace to docids.
///

View File

@@ -1,4 +1,4 @@
use std::io;
use std::io::{self, Write};
use std::sync::Arc;
mod set;
@@ -7,11 +7,11 @@ mod set_block;
use common::{BinarySerializable, OwnedBytes, VInt};
pub use set::{SelectCursor, Set, SetCodec};
use set_block::{
DENSE_BLOCK_NUM_BYTES, DenseBlock, DenseBlockCodec, SparseBlock, SparseBlockCodec,
DenseBlock, DenseBlockCodec, SparseBlock, SparseBlockCodec, DENSE_BLOCK_NUM_BYTES,
};
use crate::iterable::Iterable;
use crate::{DocId, RowId};
use crate::{DocId, InvalidData, RowId};
/// The threshold for for number of elements after which we switch to dense block encoding.
///
@@ -80,23 +80,23 @@ impl BlockVariant {
/// index is the block index. For each block `byte_start` and `offset` is computed.
#[derive(Clone)]
pub struct OptionalIndex {
num_docs: RowId,
num_non_null_docs: RowId,
num_rows: RowId,
num_non_null_rows: RowId,
block_data: OwnedBytes,
block_metas: Arc<[BlockMeta]>,
}
impl Iterable<u32> for &OptionalIndex {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = u32> + '_> {
Box::new(self.iter_non_null_docs())
Box::new(self.iter_rows())
}
}
impl std::fmt::Debug for OptionalIndex {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("OptionalIndex")
.field("num_docs", &self.num_docs)
.field("num_non_null_docs", &self.num_non_null_docs)
.field("num_rows", &self.num_rows)
.field("num_non_null_rows", &self.num_non_null_rows)
.finish_non_exhaustive()
}
}
@@ -259,13 +259,11 @@ impl Set<RowId> for OptionalIndex {
impl OptionalIndex {
pub fn for_test(num_rows: RowId, row_ids: &[RowId]) -> OptionalIndex {
assert!(
row_ids
.last()
.copied()
.map(|last_row_id| last_row_id < num_rows)
.unwrap_or(true)
);
assert!(row_ids
.last()
.copied()
.map(|last_row_id| last_row_id < num_rows)
.unwrap_or(true));
let mut buffer = Vec::new();
serialize_optional_index(&row_ids, num_rows, &mut buffer).unwrap();
let bytes = OwnedBytes::new(buffer);
@@ -273,18 +271,17 @@ impl OptionalIndex {
}
pub fn num_docs(&self) -> RowId {
self.num_docs
self.num_rows
}
pub fn num_non_nulls(&self) -> RowId {
self.num_non_null_docs
self.num_non_null_rows
}
pub fn iter_non_null_docs(&self) -> impl Iterator<Item = RowId> + '_ {
// TODO optimize. We could iterate over the blocks directly.
// We use the dense value ids and retrieve the doc ids via select.
pub fn iter_rows(&self) -> impl Iterator<Item = RowId> + '_ {
// TODO optimize
let mut select_batch = self.select_cursor();
(0..self.num_non_null_docs).map(move |rank| select_batch.select(rank))
(0..self.num_non_null_rows).map(move |rank| select_batch.select(rank))
}
pub fn select_batch(&self, ranks: &mut [RowId]) {
let mut select_cursor = self.select_cursor();
@@ -335,6 +332,38 @@ enum Block<'a> {
Sparse(SparseBlock<'a>),
}
#[derive(Debug, Copy, Clone)]
enum OptionalIndexCodec {
Dense = 0,
Sparse = 1,
}
impl OptionalIndexCodec {
fn to_code(self) -> u8 {
self as u8
}
fn try_from_code(code: u8) -> Result<Self, InvalidData> {
match code {
0 => Ok(Self::Dense),
1 => Ok(Self::Sparse),
_ => Err(InvalidData),
}
}
}
impl BinarySerializable for OptionalIndexCodec {
fn serialize<W: Write + ?Sized>(&self, writer: &mut W) -> io::Result<()> {
writer.write_all(&[self.to_code()])
}
fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
let optional_codec_code = u8::deserialize(reader)?;
let optional_codec = Self::try_from_code(optional_codec_code)?;
Ok(optional_codec)
}
}
fn serialize_optional_index_block(block_els: &[u16], out: &mut impl io::Write) -> io::Result<()> {
let is_sparse = is_sparse(block_els.len() as u32);
if is_sparse {
@@ -490,15 +519,15 @@ pub fn open_optional_index(bytes: OwnedBytes) -> io::Result<OptionalIndex> {
let (mut bytes, num_non_empty_blocks_bytes) = bytes.rsplit(2);
let num_non_empty_block_bytes =
u16::from_le_bytes(num_non_empty_blocks_bytes.as_slice().try_into().unwrap());
let num_docs = VInt::deserialize_u64(&mut bytes)? as u32;
let num_rows = VInt::deserialize_u64(&mut bytes)? as u32;
let block_metas_num_bytes =
num_non_empty_block_bytes as usize * SERIALIZED_BLOCK_META_NUM_BYTES;
let (block_data, block_metas) = bytes.rsplit(block_metas_num_bytes);
let (block_metas, num_non_null_docs) =
deserialize_optional_index_block_metadatas(block_metas.as_slice(), num_docs);
let (block_metas, num_non_null_rows) =
deserialize_optional_index_block_metadatas(block_metas.as_slice(), num_rows);
let optional_index = OptionalIndex {
num_docs,
num_non_null_docs,
num_rows,
num_non_null_rows,
block_data,
block_metas: block_metas.into(),
};

View File

@@ -2,7 +2,7 @@ use std::io::{self, Write};
use common::BinarySerializable;
use crate::column_index::optional_index::{ELEMENTS_PER_BLOCK, SelectCursor, Set, SetCodec};
use crate::column_index::optional_index::{SelectCursor, Set, SetCodec, ELEMENTS_PER_BLOCK};
#[inline(always)]
fn get_bit_at(input: u64, n: u16) -> bool {

View File

@@ -1,7 +1,7 @@
mod dense;
mod sparse;
pub use dense::{DENSE_BLOCK_NUM_BYTES, DenseBlock, DenseBlockCodec};
pub use dense::{DenseBlock, DenseBlockCodec, DENSE_BLOCK_NUM_BYTES};
pub use sparse::{SparseBlock, SparseBlockCodec};
#[cfg(test)]

View File

@@ -164,11 +164,7 @@ fn test_optional_index_large() {
fn test_optional_index_iter_aux(row_ids: &[RowId], num_rows: RowId) {
let optional_index = OptionalIndex::for_test(num_rows, row_ids);
assert_eq!(optional_index.num_docs(), num_rows);
assert!(
optional_index
.iter_non_null_docs()
.eq(row_ids.iter().copied())
);
assert!(optional_index.iter_rows().eq(row_ids.iter().copied()));
}
#[test]
@@ -223,3 +219,174 @@ fn test_optional_index_for_tests() {
assert!(!optional_index.contains(3));
assert_eq!(optional_index.num_docs(), 4);
}
#[cfg(all(test, feature = "unstable"))]
mod bench {
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use test::Bencher;
use super::*;
const TOTAL_NUM_VALUES: u32 = 1_000_000;
fn gen_bools(fill_ratio: f64) -> OptionalIndex {
let mut out = Vec::new();
let mut rng: StdRng = StdRng::from_seed([1u8; 32]);
let vals: Vec<RowId> = (0..TOTAL_NUM_VALUES)
.map(|_| rng.gen_bool(fill_ratio))
.enumerate()
.filter(|(_pos, val)| *val)
.map(|(pos, _)| pos as RowId)
.collect();
serialize_optional_index(&&vals[..], TOTAL_NUM_VALUES, &mut out).unwrap();
open_optional_index(OwnedBytes::new(out)).unwrap()
}
fn random_range_iterator(
start: u32,
end: u32,
avg_step_size: u32,
avg_deviation: u32,
) -> impl Iterator<Item = u32> {
let mut rng: StdRng = StdRng::from_seed([1u8; 32]);
let mut current = start;
std::iter::from_fn(move || {
current += rng.gen_range(avg_step_size - avg_deviation..=avg_step_size + avg_deviation);
if current >= end {
None
} else {
Some(current)
}
})
}
fn n_percent_step_iterator(percent: f32, num_values: u32) -> impl Iterator<Item = u32> {
let ratio = percent / 100.0;
let step_size = (1f32 / ratio) as u32;
let deviation = step_size - 1;
random_range_iterator(0, num_values, step_size, deviation)
}
fn walk_over_data(codec: &OptionalIndex, avg_step_size: u32) -> Option<u32> {
walk_over_data_from_positions(
codec,
random_range_iterator(0, TOTAL_NUM_VALUES, avg_step_size, 0),
)
}
fn walk_over_data_from_positions(
codec: &OptionalIndex,
positions: impl Iterator<Item = u32>,
) -> Option<u32> {
let mut dense_idx: Option<u32> = None;
for idx in positions {
dense_idx = dense_idx.or(codec.rank_if_exists(idx));
}
dense_idx
}
#[bench]
fn bench_translate_orig_to_codec_1percent_filled_10percent_hit(bench: &mut Bencher) {
let codec = gen_bools(0.01f64);
bench.iter(|| walk_over_data(&codec, 100));
}
#[bench]
fn bench_translate_orig_to_codec_5percent_filled_10percent_hit(bench: &mut Bencher) {
let codec = gen_bools(0.05f64);
bench.iter(|| walk_over_data(&codec, 100));
}
#[bench]
fn bench_translate_orig_to_codec_5percent_filled_1percent_hit(bench: &mut Bencher) {
let codec = gen_bools(0.05f64);
bench.iter(|| walk_over_data(&codec, 1000));
}
#[bench]
fn bench_translate_orig_to_codec_full_scan_1percent_filled(bench: &mut Bencher) {
let codec = gen_bools(0.01f64);
bench.iter(|| walk_over_data_from_positions(&codec, 0..TOTAL_NUM_VALUES));
}
#[bench]
fn bench_translate_orig_to_codec_full_scan_10percent_filled(bench: &mut Bencher) {
let codec = gen_bools(0.1f64);
bench.iter(|| walk_over_data_from_positions(&codec, 0..TOTAL_NUM_VALUES));
}
#[bench]
fn bench_translate_orig_to_codec_full_scan_90percent_filled(bench: &mut Bencher) {
let codec = gen_bools(0.9f64);
bench.iter(|| walk_over_data_from_positions(&codec, 0..TOTAL_NUM_VALUES));
}
#[bench]
fn bench_translate_orig_to_codec_10percent_filled_1percent_hit(bench: &mut Bencher) {
let codec = gen_bools(0.1f64);
bench.iter(|| walk_over_data(&codec, 100));
}
#[bench]
fn bench_translate_orig_to_codec_50percent_filled_1percent_hit(bench: &mut Bencher) {
let codec = gen_bools(0.5f64);
bench.iter(|| walk_over_data(&codec, 100));
}
#[bench]
fn bench_translate_orig_to_codec_90percent_filled_1percent_hit(bench: &mut Bencher) {
let codec = gen_bools(0.9f64);
bench.iter(|| walk_over_data(&codec, 100));
}
#[bench]
fn bench_translate_codec_to_orig_1percent_filled_0comma005percent_hit(bench: &mut Bencher) {
bench_translate_codec_to_orig_util(0.01f64, 0.005f32, bench);
}
#[bench]
fn bench_translate_codec_to_orig_10percent_filled_0comma005percent_hit(bench: &mut Bencher) {
bench_translate_codec_to_orig_util(0.1f64, 0.005f32, bench);
}
#[bench]
fn bench_translate_codec_to_orig_1percent_filled_10percent_hit(bench: &mut Bencher) {
bench_translate_codec_to_orig_util(0.01f64, 10f32, bench);
}
#[bench]
fn bench_translate_codec_to_orig_1percent_filled_full_scan(bench: &mut Bencher) {
bench_translate_codec_to_orig_util(0.01f64, 100f32, bench);
}
fn bench_translate_codec_to_orig_util(
percent_filled: f64,
percent_hit: f32,
bench: &mut Bencher,
) {
let codec = gen_bools(percent_filled);
let num_non_nulls = codec.num_non_nulls();
let idxs: Vec<u32> = if percent_hit == 100.0f32 {
(0..num_non_nulls).collect()
} else {
n_percent_step_iterator(percent_hit, num_non_nulls).collect()
};
let mut output = vec![0u32; idxs.len()];
bench.iter(|| {
output.copy_from_slice(&idxs[..]);
codec.select_batch(&mut output);
});
}
#[bench]
fn bench_translate_codec_to_orig_90percent_filled_0comma005percent_hit(bench: &mut Bencher) {
bench_translate_codec_to_orig_util(0.9f64, 0.005, bench);
}
#[bench]
fn bench_translate_codec_to_orig_90percent_filled_full_scan(bench: &mut Bencher) {
bench_translate_codec_to_orig_util(0.9f64, 100.0f32, bench);
}
}

View File

@@ -3,11 +3,11 @@ use std::io::Write;
use common::{CountingWriter, OwnedBytes};
use super::OptionalIndex;
use super::multivalued_index::SerializableMultivalueIndex;
use crate::column_index::ColumnIndex;
use super::OptionalIndex;
use crate::column_index::multivalued_index::serialize_multivalued_index;
use crate::column_index::optional_index::serialize_optional_index;
use crate::column_index::ColumnIndex;
use crate::iterable::Iterable;
use crate::{Cardinality, RowId, Version};

View File

@@ -0,0 +1,139 @@
use std::sync::Arc;
use common::OwnedBytes;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use test::{self, Bencher};
use super::*;
use crate::column_values::u64_based::*;
fn get_data() -> Vec<u64> {
let mut rng = StdRng::seed_from_u64(2u64);
let mut data: Vec<_> = (100..55000_u64)
.map(|num| num + rng.gen::<u8>() as u64)
.collect();
data.push(99_000);
data.insert(1000, 2000);
data.insert(2000, 100);
data.insert(3000, 4100);
data.insert(4000, 100);
data.insert(5000, 800);
data
}
fn compute_stats(vals: impl Iterator<Item = u64>) -> ColumnStats {
let mut stats_collector = StatsCollector::default();
for val in vals {
stats_collector.collect(val);
}
stats_collector.stats()
}
#[inline(never)]
fn value_iter() -> impl Iterator<Item = u64> {
0..20_000
}
fn get_reader_for_bench<Codec: ColumnCodec>(data: &[u64]) -> Codec::ColumnValues {
let mut bytes = Vec::new();
let stats = compute_stats(data.iter().cloned());
let mut codec_serializer = Codec::estimator();
for val in data {
codec_serializer.collect(*val);
}
codec_serializer
.serialize(&stats, Box::new(data.iter().copied()).as_mut(), &mut bytes)
.unwrap();
Codec::load(OwnedBytes::new(bytes)).unwrap()
}
fn bench_get<Codec: ColumnCodec>(b: &mut Bencher, data: &[u64]) {
let col = get_reader_for_bench::<Codec>(data);
b.iter(|| {
let mut sum = 0u64;
for pos in value_iter() {
let val = col.get_val(pos as u32);
sum = sum.wrapping_add(val);
}
sum
});
}
#[inline(never)]
fn bench_get_dynamic_helper(b: &mut Bencher, col: Arc<dyn ColumnValues>) {
b.iter(|| {
let mut sum = 0u64;
for pos in value_iter() {
let val = col.get_val(pos as u32);
sum = sum.wrapping_add(val);
}
sum
});
}
fn bench_get_dynamic<Codec: ColumnCodec>(b: &mut Bencher, data: &[u64]) {
let col = Arc::new(get_reader_for_bench::<Codec>(data));
bench_get_dynamic_helper(b, col);
}
fn bench_create<Codec: ColumnCodec>(b: &mut Bencher, data: &[u64]) {
let stats = compute_stats(data.iter().cloned());
let mut bytes = Vec::new();
b.iter(|| {
bytes.clear();
let mut codec_serializer = Codec::estimator();
for val in data.iter().take(1024) {
codec_serializer.collect(*val);
}
codec_serializer.serialize(&stats, Box::new(data.iter().copied()).as_mut(), &mut bytes)
});
}
#[bench]
fn bench_fastfield_bitpack_create(b: &mut Bencher) {
let data: Vec<_> = get_data();
bench_create::<BitpackedCodec>(b, &data);
}
#[bench]
fn bench_fastfield_linearinterpol_create(b: &mut Bencher) {
let data: Vec<_> = get_data();
bench_create::<LinearCodec>(b, &data);
}
#[bench]
fn bench_fastfield_multilinearinterpol_create(b: &mut Bencher) {
let data: Vec<_> = get_data();
bench_create::<BlockwiseLinearCodec>(b, &data);
}
#[bench]
fn bench_fastfield_bitpack_get(b: &mut Bencher) {
let data: Vec<_> = get_data();
bench_get::<BitpackedCodec>(b, &data);
}
#[bench]
fn bench_fastfield_bitpack_get_dynamic(b: &mut Bencher) {
let data: Vec<_> = get_data();
bench_get_dynamic::<BitpackedCodec>(b, &data);
}
#[bench]
fn bench_fastfield_linearinterpol_get(b: &mut Bencher) {
let data: Vec<_> = get_data();
bench_get::<LinearCodec>(b, &data);
}
#[bench]
fn bench_fastfield_linearinterpol_get_dynamic(b: &mut Bencher) {
let data: Vec<_> = get_data();
bench_get_dynamic::<LinearCodec>(b, &data);
}
#[bench]
fn bench_fastfield_multilinearinterpol_get(b: &mut Bencher) {
let data: Vec<_> = get_data();
bench_get::<BlockwiseLinearCodec>(b, &data);
}
#[bench]
fn bench_fastfield_multilinearinterpol_get_dynamic(b: &mut Bencher) {
let data: Vec<_> = get_data();
bench_get_dynamic::<BlockwiseLinearCodec>(b, &data);
}

View File

@@ -26,13 +26,13 @@ mod monotonic_column;
pub(crate) use merge::MergedColumnValues;
pub use stats::ColumnStats;
pub use u64_based::{
ALL_U64_CODEC_TYPES, CodecType, load_u64_based_column_values,
serialize_and_load_u64_based_column_values, serialize_u64_based_column_values,
};
pub use u128_based::{
CompactSpaceU64Accessor, open_u128_as_compact_u64, open_u128_mapped,
serialize_column_values_u128,
open_u128_as_compact_u64, open_u128_mapped, serialize_column_values_u128,
CompactSpaceU64Accessor,
};
pub use u64_based::{
load_u64_based_column_values, serialize_and_load_u64_based_column_values,
serialize_u64_based_column_values, CodecType, ALL_U64_CODEC_TYPES,
};
pub use vec_column::VecColumn;
@@ -242,3 +242,6 @@ impl<T: Copy + PartialOrd + Debug + 'static> ColumnValues<T> for Arc<dyn ColumnV
.get_row_ids_for_value_range(range, doc_id_range, positions)
}
}
#[cfg(all(test, feature = "unstable"))]
mod bench;

View File

@@ -2,8 +2,8 @@ use std::fmt::Debug;
use std::marker::PhantomData;
use std::ops::{Range, RangeInclusive};
use crate::ColumnValues;
use crate::column_values::monotonic_mapping::StrictlyMonotonicFn;
use crate::ColumnValues;
struct MonotonicMappingColumn<C, T, Input> {
from_column: C,
@@ -99,10 +99,10 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::column_values::VecColumn;
use crate::column_values::monotonic_mapping::{
StrictlyMonotonicMappingInverter, StrictlyMonotonicMappingToInternal,
};
use crate::column_values::VecColumn;
#[test]
fn test_monotonic_mapping_iter() {

View File

@@ -185,10 +185,10 @@ impl CompactSpaceBuilder {
let mut covered_space = Vec::with_capacity(self.blanks.len());
// beginning of the blanks
if let Some(first_blank_start) = self.blanks.first().map(RangeInclusive::start)
&& *first_blank_start != 0
{
covered_space.push(0..=first_blank_start - 1);
if let Some(first_blank_start) = self.blanks.first().map(RangeInclusive::start) {
if *first_blank_start != 0 {
covered_space.push(0..=first_blank_start - 1);
}
}
// Between the blanks
@@ -202,10 +202,10 @@ impl CompactSpaceBuilder {
covered_space.extend(between_blanks);
// end of the blanks
if let Some(last_blank_end) = self.blanks.last().map(RangeInclusive::end)
&& *last_blank_end != u128::MAX
{
covered_space.push(last_blank_end + 1..=u128::MAX);
if let Some(last_blank_end) = self.blanks.last().map(RangeInclusive::end) {
if *last_blank_end != u128::MAX {
covered_space.push(last_blank_end + 1..=u128::MAX);
}
}
if covered_space.is_empty() {

View File

@@ -24,8 +24,8 @@ use build_compact_space::get_compact_space;
use common::{BinarySerializable, CountingWriter, OwnedBytes, VInt, VIntU128};
use tantivy_bitpacker::{BitPacker, BitUnpacker};
use crate::RowId;
use crate::column_values::ColumnValues;
use crate::RowId;
/// The cost per blank is quite hard actually, since blanks are delta encoded, the actual cost of
/// blanks depends on the number of blanks.
@@ -653,14 +653,12 @@ mod tests {
),
&[3]
);
assert!(
get_positions_for_value_range_helper(
&decomp,
99998u128..=99998u128,
complete_range.clone()
)
.is_empty()
);
assert!(get_positions_for_value_range_helper(
&decomp,
99998u128..=99998u128,
complete_range.clone()
)
.is_empty());
assert_eq!(
&get_positions_for_value_range_helper(
&decomp,

View File

@@ -130,11 +130,11 @@ pub fn open_u128_as_compact_u64(mut bytes: OwnedBytes) -> io::Result<Arc<dyn Col
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use crate::column_values::CodecType;
use crate::column_values::u64_based::{
ALL_U64_CODEC_TYPES, serialize_and_load_u64_based_column_values,
serialize_u64_based_column_values,
serialize_and_load_u64_based_column_values, serialize_u64_based_column_values,
ALL_U64_CODEC_TYPES,
};
use crate::column_values::CodecType;
#[test]
fn test_serialize_deserialize_u128_header() {

View File

@@ -4,7 +4,7 @@ use std::ops::{Range, RangeInclusive};
use common::{BinarySerializable, OwnedBytes};
use fastdivide::DividerU64;
use tantivy_bitpacker::{BitPacker, BitUnpacker, compute_num_bits};
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
use crate::column_values::u64_based::{ColumnCodec, ColumnCodecEstimator, ColumnStats};
use crate::{ColumnValues, RowId};
@@ -23,7 +23,11 @@ const fn div_ceil(n: u64, q: NonZeroU64) -> u64 {
// copied from unstable rust standard library.
let d = n / q.get();
let r = n % q.get();
if r > 0 { d + 1 } else { d }
if r > 0 {
d + 1
} else {
d
}
}
// The bitpacked codec applies a linear transformation `f` over data that are bitpacked.
@@ -105,7 +109,7 @@ impl ColumnCodecEstimator for BitpackedCodecEstimator {
fn estimate(&self, stats: &ColumnStats) -> Option<u64> {
let num_bits_per_value = num_bits(stats);
Some(stats.num_bytes() + (stats.num_rows as u64 * (num_bits_per_value as u64)).div_ceil(8))
Some(stats.num_bytes() + (stats.num_rows as u64 * (num_bits_per_value as u64) + 7) / 8)
}
fn serialize(

View File

@@ -4,12 +4,12 @@ use std::{io, iter};
use common::{BinarySerializable, CountingWriter, DeserializeFrom, OwnedBytes};
use fastdivide::DividerU64;
use tantivy_bitpacker::{BitPacker, BitUnpacker, compute_num_bits};
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
use crate::MonotonicallyMappableToU64;
use crate::column_values::u64_based::line::Line;
use crate::column_values::u64_based::{ColumnCodec, ColumnCodecEstimator, ColumnStats};
use crate::column_values::{ColumnValues, VecColumn};
use crate::MonotonicallyMappableToU64;
const BLOCK_SIZE: u32 = 512u32;

View File

@@ -1,13 +1,13 @@
use std::io;
use common::{BinarySerializable, OwnedBytes};
use tantivy_bitpacker::{BitPacker, BitUnpacker, compute_num_bits};
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
use super::ColumnValues;
use super::line::Line;
use crate::RowId;
use crate::column_values::VecColumn;
use super::ColumnValues;
use crate::column_values::u64_based::{ColumnCodec, ColumnCodecEstimator, ColumnStats};
use crate::column_values::VecColumn;
use crate::RowId;
const HALF_SPACE: u64 = u64::MAX / 2;
const LINE_ESTIMATION_BLOCK_LEN: usize = 512;
@@ -117,7 +117,7 @@ impl ColumnCodecEstimator for LinearCodecEstimator {
Some(
stats.num_bytes()
+ linear_params.num_bytes()
+ (num_bits as u64 * stats.num_rows as u64).div_ceil(8),
+ (num_bits as u64 * stats.num_rows as u64 + 7) / 8,
)
}

View File

@@ -17,7 +17,7 @@ pub use crate::column_values::u64_based::bitpacked::BitpackedCodec;
pub use crate::column_values::u64_based::blockwise_linear::BlockwiseLinearCodec;
pub use crate::column_values::u64_based::linear::LinearCodec;
pub use crate::column_values::u64_based::stats_collector::StatsCollector;
use crate::column_values::{ColumnStats, monotonic_map_column};
use crate::column_values::{monotonic_map_column, ColumnStats};
use crate::iterable::Iterable;
use crate::{ColumnValues, MonotonicallyMappableToU64};

View File

@@ -2,8 +2,8 @@ use std::num::NonZeroU64;
use fastdivide::DividerU64;
use crate::RowId;
use crate::column_values::ColumnStats;
use crate::RowId;
/// Compute the gcd of two non null numbers.
///
@@ -96,8 +96,8 @@ impl StatsCollector {
mod tests {
use std::num::NonZeroU64;
use crate::column_values::u64_based::stats_collector::{compute_gcd, StatsCollector};
use crate::column_values::u64_based::ColumnStats;
use crate::column_values::u64_based::stats_collector::{StatsCollector, compute_gcd};
fn compute_stats(vals: impl Iterator<Item = u64>) -> ColumnStats {
let mut stats_collector = StatsCollector::default();

View File

@@ -1,6 +1,5 @@
use proptest::prelude::*;
use proptest::{prop_oneof, proptest};
use rand::Rng;
#[test]
fn test_serialize_and_load_simple() {

View File

@@ -4,8 +4,8 @@ use std::net::Ipv6Addr;
use serde::{Deserialize, Serialize};
use crate::InvalidData;
use crate::value::NumericalType;
use crate::InvalidData;
/// The column type represents the column type.
/// Any changes need to be propagated to `COLUMN_TYPES`.

View File

@@ -3,7 +3,7 @@ use std::io::{self, Write};
use common::{BitSet, CountingWriter, ReadOnlyBitSet};
use sstable::{SSTable, Streamer, TermOrdinal, VoidSSTable};
use super::term_merger::{TermMerger, TermsWithSegmentOrd};
use super::term_merger::TermMerger;
use crate::column::serialize_column_mappable_to_u64;
use crate::column_index::SerializableColumnIndex;
use crate::iterable::Iterable;
@@ -126,17 +126,14 @@ fn serialize_merged_dict(
let mut term_ord_mapping = TermOrdinalMapping::default();
let mut field_term_streams = Vec::new();
for (segment_ord, column_opt) in bytes_columns.iter().enumerate() {
for column_opt in bytes_columns.iter() {
if let Some(column) = column_opt {
term_ord_mapping.add_segment(column.dictionary.num_terms());
let terms: Streamer<VoidSSTable> = column.dictionary.stream()?;
field_term_streams.push(TermsWithSegmentOrd { terms, segment_ord });
field_term_streams.push(terms);
} else {
term_ord_mapping.add_segment(0);
field_term_streams.push(TermsWithSegmentOrd {
terms: Streamer::empty(),
segment_ord,
});
field_term_streams.push(Streamer::empty());
}
}
@@ -194,7 +191,6 @@ fn serialize_merged_dict(
#[derive(Default, Debug)]
struct TermOrdinalMapping {
/// Contains the new term ordinals for each segment.
per_segment_new_term_ordinals: Vec<Vec<TermOrdinal>>,
}
@@ -209,6 +205,6 @@ impl TermOrdinalMapping {
}
fn get_segment(&self, segment_ord: u32) -> &[TermOrdinal] {
&self.per_segment_new_term_ordinals[segment_ord as usize]
&(self.per_segment_new_term_ordinals[segment_ord as usize])[..]
}
}

View File

@@ -26,7 +26,7 @@ impl StackMergeOrder {
let mut cumulated_row_ids: Vec<RowId> = Vec::with_capacity(columnars.len());
let mut cumulated_row_id = 0;
for columnar in columnars {
cumulated_row_id += columnar.num_docs();
cumulated_row_id += columnar.num_rows();
cumulated_row_ids.push(cumulated_row_id);
}
StackMergeOrder { cumulated_row_ids }

View File

@@ -10,11 +10,11 @@ use std::sync::Arc;
pub use merge_mapping::{MergeRowOrder, ShuffleMergeOrder, StackMergeOrder};
use super::writer::ColumnarSerializer;
use crate::column::{serialize_column_mappable_to_u64, serialize_column_mappable_to_u128};
use crate::column::{serialize_column_mappable_to_u128, serialize_column_mappable_to_u64};
use crate::column_values::MergedColumnValues;
use crate::columnar::ColumnarReader;
use crate::columnar::merge::merge_dict_column::merge_bytes_or_str_column;
use crate::columnar::writer::CompatibleNumericalTypes;
use crate::columnar::ColumnarReader;
use crate::dynamic_column::DynamicColumn;
use crate::{
BytesColumn, Column, ColumnIndex, ColumnType, ColumnValues, DynamicColumnHandle, NumericalType,
@@ -80,12 +80,13 @@ pub fn merge_columnar(
output: &mut impl io::Write,
) -> io::Result<()> {
let mut serializer = ColumnarSerializer::new(output);
let num_docs_per_columnar = columnar_readers
let num_rows_per_columnar = columnar_readers
.iter()
.map(|reader| reader.num_docs())
.map(|reader| reader.num_rows())
.collect::<Vec<u32>>();
let columns_to_merge = group_columns_for_merge(columnar_readers, required_columns)?;
let columns_to_merge =
group_columns_for_merge(columnar_readers, required_columns, &merge_row_order)?;
for res in columns_to_merge {
let ((column_name, _column_type_category), grouped_columns) = res;
let grouped_columns = grouped_columns.open(&merge_row_order)?;
@@ -93,18 +94,15 @@ pub fn merge_columnar(
continue;
}
let column_type_after_merge = grouped_columns.column_type_after_merge();
let column_type = grouped_columns.column_type_after_merge();
let mut columns = grouped_columns.columns;
// Make sure the number of columns is the same as the number of columnar readers.
// Or num_docs_per_columnar would be incorrect.
assert_eq!(columns.len(), columnar_readers.len());
coerce_columns(column_type_after_merge, &mut columns)?;
coerce_columns(column_type, &mut columns)?;
let mut column_serializer =
serializer.start_serialize_column(column_name.as_bytes(), column_type_after_merge);
serializer.start_serialize_column(column_name.as_bytes(), column_type);
merge_column(
column_type_after_merge,
&num_docs_per_columnar,
column_type,
&num_rows_per_columnar,
columns,
&merge_row_order,
&mut column_serializer,
@@ -130,7 +128,7 @@ fn dynamic_column_to_u64_monotonic(dynamic_column: DynamicColumn) -> Option<Colu
fn merge_column(
column_type: ColumnType,
num_docs_per_column: &[u32],
columns_to_merge: Vec<Option<DynamicColumn>>,
columns: Vec<Option<DynamicColumn>>,
merge_row_order: &MergeRowOrder,
wrt: &mut impl io::Write,
) -> io::Result<()> {
@@ -140,21 +138,20 @@ fn merge_column(
| ColumnType::F64
| ColumnType::DateTime
| ColumnType::Bool => {
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns_to_merge.len());
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns.len());
let mut column_values: Vec<Option<Arc<dyn ColumnValues>>> =
Vec::with_capacity(columns_to_merge.len());
for (i, dynamic_column_opt) in columns_to_merge.into_iter().enumerate() {
match dynamic_column_opt.and_then(dynamic_column_to_u64_monotonic) {
Some(Column { index: idx, values }) => {
column_indexes.push(idx);
column_values.push(Some(values));
}
None => {
column_indexes.push(ColumnIndex::Empty {
num_docs: num_docs_per_column[i],
});
column_values.push(None);
}
Vec::with_capacity(columns.len());
for (i, dynamic_column_opt) in columns.into_iter().enumerate() {
if let Some(Column { index: idx, values }) =
dynamic_column_opt.and_then(dynamic_column_to_u64_monotonic)
{
column_indexes.push(idx);
column_values.push(Some(values));
} else {
column_indexes.push(ColumnIndex::Empty {
num_docs: num_docs_per_column[i],
});
column_values.push(None);
}
}
let merged_column_index =
@@ -167,10 +164,10 @@ fn merge_column(
serialize_column_mappable_to_u64(merged_column_index, &merge_column_values, wrt)?;
}
ColumnType::IpAddr => {
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns_to_merge.len());
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns.len());
let mut column_values: Vec<Option<Arc<dyn ColumnValues<Ipv6Addr>>>> =
Vec::with_capacity(columns_to_merge.len());
for (i, dynamic_column_opt) in columns_to_merge.into_iter().enumerate() {
Vec::with_capacity(columns.len());
for (i, dynamic_column_opt) in columns.into_iter().enumerate() {
if let Some(DynamicColumn::IpAddr(Column { index: idx, values })) =
dynamic_column_opt
{
@@ -195,10 +192,9 @@ fn merge_column(
serialize_column_mappable_to_u128(merged_column_index, &merge_column_values, wrt)?;
}
ColumnType::Bytes | ColumnType::Str => {
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns_to_merge.len());
let mut bytes_columns: Vec<Option<BytesColumn>> =
Vec::with_capacity(columns_to_merge.len());
for (i, dynamic_column_opt) in columns_to_merge.into_iter().enumerate() {
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns.len());
let mut bytes_columns: Vec<Option<BytesColumn>> = Vec::with_capacity(columns.len());
for (i, dynamic_column_opt) in columns.into_iter().enumerate() {
match dynamic_column_opt {
Some(DynamicColumn::Str(str_column)) => {
column_indexes.push(str_column.term_ord_column.index.clone());
@@ -252,15 +248,13 @@ impl GroupedColumns {
if column_type.len() == 1 {
return column_type.into_iter().next().unwrap();
}
// At the moment, only the numerical column type category has more than one possible
// At the moment, only the numerical categorical column type has more than one possible
// column type.
assert!(
self.columns
.iter()
.flatten()
.all(|el| ColumnTypeCategory::from(el.column_type())
== ColumnTypeCategory::Numerical)
);
assert!(self
.columns
.iter()
.flatten()
.all(|el| ColumnTypeCategory::from(el.column_type()) == ColumnTypeCategory::Numerical));
merged_numerical_columns_type(self.columns.iter().flatten()).into()
}
}
@@ -367,7 +361,7 @@ fn is_empty_after_merge(
ColumnIndex::Empty { .. } => true,
ColumnIndex::Full => alive_bitset.len() == 0,
ColumnIndex::Optional(optional_index) => {
for doc in optional_index.iter_non_null_docs() {
for doc in optional_index.iter_rows() {
if alive_bitset.contains(doc) {
return false;
}
@@ -397,6 +391,7 @@ fn is_empty_after_merge(
fn group_columns_for_merge<'a>(
columnar_readers: &'a [&'a ColumnarReader],
required_columns: &'a [(String, ColumnType)],
_merge_row_order: &'a MergeRowOrder,
) -> io::Result<BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle>> {
let mut columns: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> = BTreeMap::new();

View File

@@ -5,29 +5,28 @@ use sstable::TermOrdinal;
use crate::Streamer;
/// The terms of a column with the ordinal of the segment.
pub struct TermsWithSegmentOrd<'a> {
pub terms: Streamer<'a>,
pub struct HeapItem<'a> {
pub streamer: Streamer<'a>,
pub segment_ord: usize,
}
impl PartialEq for TermsWithSegmentOrd<'_> {
impl PartialEq for HeapItem<'_> {
fn eq(&self, other: &Self) -> bool {
self.segment_ord == other.segment_ord
}
}
impl Eq for TermsWithSegmentOrd<'_> {}
impl Eq for HeapItem<'_> {}
impl<'a> PartialOrd for TermsWithSegmentOrd<'a> {
fn partial_cmp(&self, other: &TermsWithSegmentOrd<'a>) -> Option<Ordering> {
impl<'a> PartialOrd for HeapItem<'a> {
fn partial_cmp(&self, other: &HeapItem<'a>) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<'a> Ord for TermsWithSegmentOrd<'a> {
fn cmp(&self, other: &TermsWithSegmentOrd<'a>) -> Ordering {
(&other.terms.key(), &other.segment_ord).cmp(&(&self.terms.key(), &self.segment_ord))
impl<'a> Ord for HeapItem<'a> {
fn cmp(&self, other: &HeapItem<'a>) -> Ordering {
(&other.streamer.key(), &other.segment_ord).cmp(&(&self.streamer.key(), &self.segment_ord))
}
}
@@ -38,32 +37,39 @@ impl<'a> Ord for TermsWithSegmentOrd<'a> {
/// - the term
/// - a slice with the ordinal of the segments containing the terms.
pub struct TermMerger<'a> {
heap: BinaryHeap<TermsWithSegmentOrd<'a>>,
term_streams_with_segment: Vec<TermsWithSegmentOrd<'a>>,
heap: BinaryHeap<HeapItem<'a>>,
current_streamers: Vec<HeapItem<'a>>,
}
impl<'a> TermMerger<'a> {
/// Stream of merged term dictionary
pub fn new(term_streams_with_segment: Vec<TermsWithSegmentOrd<'a>>) -> TermMerger<'a> {
pub fn new(streams: Vec<Streamer<'a>>) -> TermMerger<'a> {
TermMerger {
heap: BinaryHeap::new(),
term_streams_with_segment,
current_streamers: streams
.into_iter()
.enumerate()
.map(|(ord, streamer)| HeapItem {
streamer,
segment_ord: ord,
})
.collect(),
}
}
pub(crate) fn matching_segments<'b: 'a>(
&'b self,
) -> impl 'b + Iterator<Item = (usize, TermOrdinal)> {
self.term_streams_with_segment
self.current_streamers
.iter()
.map(|heap_item| (heap_item.segment_ord, heap_item.terms.term_ord()))
.map(|heap_item| (heap_item.segment_ord, heap_item.streamer.term_ord()))
}
fn advance_segments(&mut self) {
let streamers = &mut self.term_streams_with_segment;
let streamers = &mut self.current_streamers;
let heap = &mut self.heap;
for mut heap_item in streamers.drain(..) {
if heap_item.terms.advance() {
if heap_item.streamer.advance() {
heap.push(heap_item);
}
}
@@ -74,19 +80,18 @@ impl<'a> TermMerger<'a> {
/// False if there is none.
pub fn advance(&mut self) -> bool {
self.advance_segments();
match self.heap.pop() {
Some(head) => {
self.term_streams_with_segment.push(head);
while let Some(next_streamer) = self.heap.peek() {
if self.term_streams_with_segment[0].terms.key() != next_streamer.terms.key() {
break;
}
let next_heap_it = self.heap.pop().unwrap(); // safe : we peeked beforehand
self.term_streams_with_segment.push(next_heap_it);
if let Some(head) = self.heap.pop() {
self.current_streamers.push(head);
while let Some(next_streamer) = self.heap.peek() {
if self.current_streamers[0].streamer.key() != next_streamer.streamer.key() {
break;
}
true
let next_heap_it = self.heap.pop().unwrap(); // safe : we peeked beforehand
self.current_streamers.push(next_heap_it);
}
_ => false,
true
} else {
false
}
}
@@ -96,6 +101,6 @@ impl<'a> TermMerger<'a> {
/// if and only if advance() has been called before
/// and "true" was returned.
pub fn key(&self) -> &[u8] {
self.term_streams_with_segment[0].terms.key()
self.current_streamers[0].streamer.key()
}
}

View File

@@ -1,10 +1,7 @@
use itertools::Itertools;
use proptest::collection::vec;
use proptest::prelude::*;
use super::*;
use crate::columnar::{ColumnarReader, MergeRowOrder, StackMergeOrder, merge_columnar};
use crate::{Cardinality, ColumnarWriter, DynamicColumn, HasAssociatedColumnType, RowId};
use crate::{Cardinality, ColumnarWriter, HasAssociatedColumnType, RowId};
fn make_columnar<T: Into<NumericalValue> + HasAssociatedColumnType + Copy>(
column_name: &str,
@@ -29,8 +26,9 @@ fn test_column_coercion_to_u64() {
// u64 type
let columnar2 = make_columnar("numbers", &[u64::MAX]);
let columnars = &[&columnar1, &columnar2];
let merge_order = StackMergeOrder::stack(columnars).into();
let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> =
group_columns_for_merge(columnars, &[]).unwrap();
group_columns_for_merge(columnars, &[], &merge_order).unwrap();
assert_eq!(column_map.len(), 1);
assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical)));
}
@@ -40,8 +38,9 @@ fn test_column_coercion_to_i64() {
let columnar1 = make_columnar("numbers", &[-1i64]);
let columnar2 = make_columnar("numbers", &[2u64]);
let columnars = &[&columnar1, &columnar2];
let merge_order = StackMergeOrder::stack(columnars).into();
let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> =
group_columns_for_merge(columnars, &[]).unwrap();
group_columns_for_merge(columnars, &[], &merge_order).unwrap();
assert_eq!(column_map.len(), 1);
assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical)));
}
@@ -64,8 +63,14 @@ fn test_group_columns_with_required_column() {
let columnar1 = make_columnar("numbers", &[1i64]);
let columnar2 = make_columnar("numbers", &[2u64]);
let columnars = &[&columnar1, &columnar2];
let merge_order = StackMergeOrder::stack(columnars).into();
let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> =
group_columns_for_merge(columnars, &[("numbers".to_string(), ColumnType::U64)]).unwrap();
group_columns_for_merge(
&[&columnar1, &columnar2],
&[("numbers".to_string(), ColumnType::U64)],
&merge_order,
)
.unwrap();
assert_eq!(column_map.len(), 1);
assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical)));
}
@@ -75,9 +80,13 @@ fn test_group_columns_required_column_with_no_existing_columns() {
let columnar1 = make_columnar("numbers", &[2u64]);
let columnar2 = make_columnar("numbers", &[2u64]);
let columnars = &[&columnar1, &columnar2];
let column_map: BTreeMap<_, _> =
group_columns_for_merge(columnars, &[("required_col".to_string(), ColumnType::Str)])
.unwrap();
let merge_order = StackMergeOrder::stack(columnars).into();
let column_map: BTreeMap<_, _> = group_columns_for_merge(
columnars,
&[("required_col".to_string(), ColumnType::Str)],
&merge_order,
)
.unwrap();
assert_eq!(column_map.len(), 2);
let columns = &column_map
.get(&("required_col".to_string(), ColumnTypeCategory::Str))
@@ -93,8 +102,14 @@ fn test_group_columns_required_column_is_above_all_columns_have_the_same_type_ru
let columnar1 = make_columnar("numbers", &[2i64]);
let columnar2 = make_columnar("numbers", &[2i64]);
let columnars = &[&columnar1, &columnar2];
let merge_order = StackMergeOrder::stack(columnars).into();
let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> =
group_columns_for_merge(columnars, &[("numbers".to_string(), ColumnType::U64)]).unwrap();
group_columns_for_merge(
columnars,
&[("numbers".to_string(), ColumnType::U64)],
&merge_order,
)
.unwrap();
assert_eq!(column_map.len(), 1);
assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical)));
}
@@ -104,8 +119,9 @@ fn test_missing_column() {
let columnar1 = make_columnar("numbers", &[-1i64]);
let columnar2 = make_columnar("numbers2", &[2u64]);
let columnars = &[&columnar1, &columnar2];
let merge_order = StackMergeOrder::stack(columnars).into();
let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> =
group_columns_for_merge(columnars, &[]).unwrap();
group_columns_for_merge(columnars, &[], &merge_order).unwrap();
assert_eq!(column_map.len(), 2);
assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical)));
{
@@ -208,7 +224,7 @@ fn test_merge_columnar_numbers() {
)
.unwrap();
let columnar_reader = ColumnarReader::open(buffer).unwrap();
assert_eq!(columnar_reader.num_docs(), 3);
assert_eq!(columnar_reader.num_rows(), 3);
assert_eq!(columnar_reader.num_columns(), 1);
let cols = columnar_reader.read_columns("numbers").unwrap();
let dynamic_column = cols[0].open().unwrap();
@@ -236,7 +252,7 @@ fn test_merge_columnar_texts() {
)
.unwrap();
let columnar_reader = ColumnarReader::open(buffer).unwrap();
assert_eq!(columnar_reader.num_docs(), 3);
assert_eq!(columnar_reader.num_rows(), 3);
assert_eq!(columnar_reader.num_columns(), 1);
let cols = columnar_reader.read_columns("texts").unwrap();
let dynamic_column = cols[0].open().unwrap();
@@ -285,7 +301,7 @@ fn test_merge_columnar_byte() {
)
.unwrap();
let columnar_reader = ColumnarReader::open(buffer).unwrap();
assert_eq!(columnar_reader.num_docs(), 4);
assert_eq!(columnar_reader.num_rows(), 4);
assert_eq!(columnar_reader.num_columns(), 1);
let cols = columnar_reader.read_columns("bytes").unwrap();
let dynamic_column = cols[0].open().unwrap();
@@ -341,7 +357,7 @@ fn test_merge_columnar_byte_with_missing() {
)
.unwrap();
let columnar_reader = ColumnarReader::open(buffer).unwrap();
assert_eq!(columnar_reader.num_docs(), 3 + 2 + 3);
assert_eq!(columnar_reader.num_rows(), 3 + 2 + 3);
assert_eq!(columnar_reader.num_columns(), 2);
let cols = columnar_reader.read_columns("col").unwrap();
let dynamic_column = cols[0].open().unwrap();
@@ -393,7 +409,7 @@ fn test_merge_columnar_different_types() {
)
.unwrap();
let columnar_reader = ColumnarReader::open(buffer).unwrap();
assert_eq!(columnar_reader.num_docs(), 4);
assert_eq!(columnar_reader.num_rows(), 4);
assert_eq!(columnar_reader.num_columns(), 2);
let cols = columnar_reader.read_columns("mixed").unwrap();
@@ -403,11 +419,11 @@ fn test_merge_columnar_different_types() {
panic!()
};
assert_eq!(vals.get_cardinality(), Cardinality::Optional);
assert_eq!(vals.values_for_doc(0).collect_vec(), Vec::<i64>::new());
assert_eq!(vals.values_for_doc(1).collect_vec(), Vec::<i64>::new());
assert_eq!(vals.values_for_doc(2).collect_vec(), Vec::<i64>::new());
assert_eq!(vals.values_for_doc(0).collect_vec(), vec![]);
assert_eq!(vals.values_for_doc(1).collect_vec(), vec![]);
assert_eq!(vals.values_for_doc(2).collect_vec(), vec![]);
assert_eq!(vals.values_for_doc(3).collect_vec(), vec![1]);
assert_eq!(vals.values_for_doc(4).collect_vec(), Vec::<i64>::new());
assert_eq!(vals.values_for_doc(4).collect_vec(), vec![]);
// text column
let dynamic_column = cols[1].open().unwrap();
@@ -458,7 +474,7 @@ fn test_merge_columnar_different_empty_cardinality() {
)
.unwrap();
let columnar_reader = ColumnarReader::open(buffer).unwrap();
assert_eq!(columnar_reader.num_docs(), 2);
assert_eq!(columnar_reader.num_rows(), 2);
assert_eq!(columnar_reader.num_columns(), 2);
let cols = columnar_reader.read_columns("mixed").unwrap();
@@ -470,119 +486,3 @@ fn test_merge_columnar_different_empty_cardinality() {
let dynamic_column = cols[1].open().unwrap();
assert_eq!(dynamic_column.get_cardinality(), Cardinality::Optional);
}
#[derive(Debug, Clone)]
struct ColumnSpec {
column_name: String,
/// (row_id, term)
terms: Vec<(RowId, Vec<u8>)>,
}
#[derive(Clone, Debug)]
struct ColumnarSpec {
columns: Vec<ColumnSpec>,
}
/// Generate a random (row_id, term) pair:
/// - row_id in [0..10]
/// - term is either from POSSIBLE_TERMS or random bytes
fn rowid_and_term_strategy() -> impl Strategy<Value = (RowId, Vec<u8>)> {
const POSSIBLE_TERMS: &[&[u8]] = &[b"a", b"b", b"allo"];
let term_strat = prop_oneof![
// pick from the fixed list
(0..POSSIBLE_TERMS.len()).prop_map(|i| POSSIBLE_TERMS[i].to_vec()),
// or random bytes (length 0..10)
prop::collection::vec(any::<u8>(), 0..10),
];
(0u32..11, term_strat)
}
/// Generate one ColumnSpec, with a random name and a random list of (row_id, term).
/// We sort it by row_id so that data is in ascending order.
fn column_spec_strategy() -> impl Strategy<Value = ColumnSpec> {
let column_name = prop_oneof![
Just("col".to_string()),
Just("col2".to_string()),
"col.*".prop_map(|s| s),
];
// We'll produce 0..8 (rowid,term) entries for this column
let data_strat = vec(rowid_and_term_strategy(), 0..8).prop_map(|mut pairs| {
// Sort by row_id
pairs.sort_by_key(|(row_id, _)| *row_id);
pairs
});
(column_name, data_strat).prop_map(|(name, data)| ColumnSpec {
column_name: name,
terms: data,
})
}
/// Strategy to generate an ColumnarSpec
fn columnar_strategy() -> impl Strategy<Value = ColumnarSpec> {
vec(column_spec_strategy(), 0..3).prop_map(|columns| ColumnarSpec { columns })
}
/// Strategy to generate multiple ColumnarSpecs, each of which we will treat
/// as one "columnar" to be merged together.
fn columnars_strategy() -> impl Strategy<Value = Vec<ColumnarSpec>> {
vec(columnar_strategy(), 1..4)
}
/// Build a `ColumnarReader` from a `ColumnarSpec`
fn build_columnar(spec: &ColumnarSpec) -> ColumnarReader {
let mut writer = ColumnarWriter::default();
let mut max_row_id = 0;
for col in &spec.columns {
for &(row_id, ref term) in &col.terms {
writer.record_bytes(row_id, &col.column_name, term);
max_row_id = max_row_id.max(row_id);
}
}
let mut buffer = Vec::new();
writer.serialize(max_row_id + 1, &mut buffer).unwrap();
ColumnarReader::open(buffer).unwrap()
}
proptest! {
// We just test that the merge_columnar function doesn't crash.
#![proptest_config(ProptestConfig::with_cases(256))]
#[test]
fn test_merge_columnar_bytes_no_crash(columnars in columnars_strategy(), second_merge_columnars in columnars_strategy()) {
let columnars: Vec<ColumnarReader> = columnars.iter()
.map(build_columnar)
.collect();
let mut out = Vec::new();
let columnar_refs: Vec<&ColumnarReader> = columnars.iter().collect();
let stack_merge_order = StackMergeOrder::stack(&columnar_refs);
merge_columnar(
&columnar_refs,
&[],
MergeRowOrder::Stack(stack_merge_order),
&mut out,
).unwrap();
let merged_reader = ColumnarReader::open(out).unwrap();
// Merge the second set of columnars with the result of the first merge
let mut columnars: Vec<ColumnarReader> = second_merge_columnars.iter()
.map(build_columnar)
.collect();
columnars.push(merged_reader);
let mut out = Vec::new();
let columnar_refs: Vec<&ColumnarReader> = columnars.iter().collect();
let stack_merge_order = StackMergeOrder::stack(&columnar_refs);
merge_columnar(
&columnar_refs,
&[],
MergeRowOrder::Stack(stack_merge_order),
&mut out,
).unwrap();
}
}

View File

@@ -5,9 +5,9 @@ mod reader;
mod writer;
pub use column_type::{ColumnType, HasAssociatedColumnType};
pub use format_version::{CURRENT_VERSION, Version};
pub use format_version::{Version, CURRENT_VERSION};
#[cfg(test)]
pub(crate) use merge::ColumnTypeCategory;
pub use merge::{MergeRowOrder, ShuffleMergeOrder, StackMergeOrder, merge_columnar};
pub use merge::{merge_columnar, MergeRowOrder, ShuffleMergeOrder, StackMergeOrder};
pub use reader::ColumnarReader;
pub use writer::ColumnarWriter;

View File

@@ -1,11 +1,11 @@
use std::{fmt, io, mem};
use common::BinarySerializable;
use common::file_slice::FileSlice;
use common::json_path_writer::JSON_PATH_SEGMENT_SEP;
use common::BinarySerializable;
use sstable::{Dictionary, RangeSSTable};
use crate::columnar::{ColumnType, format_version};
use crate::columnar::{format_version, ColumnType};
use crate::dynamic_column::DynamicColumnHandle;
use crate::{RowId, Version};
@@ -19,13 +19,13 @@ fn io_invalid_data(msg: String) -> io::Error {
pub struct ColumnarReader {
column_dictionary: Dictionary<RangeSSTable>,
column_data: FileSlice,
num_docs: RowId,
num_rows: RowId,
format_version: Version,
}
impl fmt::Debug for ColumnarReader {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let num_rows = self.num_docs();
let num_rows = self.num_rows();
let columns = self.list_columns().unwrap();
let num_cols = columns.len();
let mut debug_struct = f.debug_struct("Columnar");
@@ -112,13 +112,13 @@ impl ColumnarReader {
Ok(ColumnarReader {
column_dictionary,
column_data,
num_docs: num_rows,
num_rows,
format_version,
})
}
pub fn num_docs(&self) -> RowId {
self.num_docs
pub fn num_rows(&self) -> RowId {
self.num_rows
}
// Iterate over the columns in a sorted way
pub fn iter_columns(

View File

@@ -244,7 +244,7 @@ impl SymbolValue for UnorderedId {
fn compute_num_bytes_for_u64(val: u64) -> usize {
let msb = (64u32 - val.leading_zeros()) as usize;
msb.div_ceil(8)
(msb + 7) / 8
}
fn encode_zig_zag(n: i64) -> u64 {

View File

@@ -42,7 +42,7 @@ impl ColumnWriter {
&self,
arena: &MemoryArena,
buffer: &'a mut Vec<u8>,
) -> impl Iterator<Item = ColumnOperation<V>> + 'a + use<'a, V> {
) -> impl Iterator<Item = ColumnOperation<V>> + 'a {
buffer.clear();
self.values.read_to_end(arena, buffer);
let mut cursor: &[u8] = &buffer[..];
@@ -104,10 +104,9 @@ pub(crate) struct NumericalColumnWriter {
impl NumericalColumnWriter {
pub fn force_numerical_type(&mut self, numerical_type: NumericalType) {
assert!(
self.compatible_numerical_types
.is_type_accepted(numerical_type)
);
assert!(self
.compatible_numerical_types
.is_type_accepted(numerical_type));
self.compatible_numerical_types = CompatibleNumericalTypes::StaticType(numerical_type);
}
}
@@ -212,7 +211,7 @@ impl NumericalColumnWriter {
self,
arena: &MemoryArena,
buffer: &'a mut Vec<u8>,
) -> impl Iterator<Item = ColumnOperation<NumericalValue>> + 'a + use<'a> {
) -> impl Iterator<Item = ColumnOperation<NumericalValue>> + 'a {
self.column_writer.operation_iterator(arena, buffer)
}
}
@@ -256,7 +255,7 @@ impl StrOrBytesColumnWriter {
&self,
arena: &MemoryArena,
byte_buffer: &'a mut Vec<u8>,
) -> impl Iterator<Item = ColumnOperation<UnorderedId>> + 'a + use<'a> {
) -> impl Iterator<Item = ColumnOperation<UnorderedId>> + 'a {
self.column_writer.operation_iterator(arena, byte_buffer)
}
}

View File

@@ -8,13 +8,13 @@ use std::net::Ipv6Addr;
use column_operation::ColumnOperation;
pub(crate) use column_writers::CompatibleNumericalTypes;
use common::CountingWriter;
use common::json_path_writer::JSON_END_OF_PATH;
use common::CountingWriter;
pub(crate) use serializer::ColumnarSerializer;
use stacker::{Addr, ArenaHashMap, MemoryArena};
use crate::column_index::{SerializableColumnIndex, SerializableOptionalIndex};
use crate::column_values::{MonotonicallyMappableToU64, MonotonicallyMappableToU128};
use crate::column_values::{MonotonicallyMappableToU128, MonotonicallyMappableToU64};
use crate::columnar::column_type::ColumnType;
use crate::columnar::writer::column_writers::{
ColumnWriter, NumericalColumnWriter, StrOrBytesColumnWriter,

View File

@@ -3,11 +3,11 @@ use std::io::Write;
use common::json_path_writer::JSON_END_OF_PATH;
use common::{BinarySerializable, CountingWriter};
use sstable::RangeSSTable;
use sstable::value::RangeValueWriter;
use sstable::RangeSSTable;
use crate::RowId;
use crate::columnar::ColumnType;
use crate::RowId;
pub struct ColumnarSerializer<W: io::Write> {
wrt: CountingWriter<W>,

View File

@@ -1,6 +1,6 @@
use crate::RowId;
use crate::column_index::{SerializableMultivalueIndex, SerializableOptionalIndex};
use crate::iterable::Iterable;
use crate::RowId;
/// The `IndexBuilder` interprets a sequence of
/// calls of the form:
@@ -31,13 +31,12 @@ pub struct OptionalIndexBuilder {
impl OptionalIndexBuilder {
pub fn finish(&mut self, num_rows: RowId) -> impl Iterable<RowId> + '_ {
debug_assert!(
self.docs
.last()
.copied()
.map(|last_doc| last_doc < num_rows)
.unwrap_or(true)
);
debug_assert!(self
.docs
.last()
.copied()
.map(|last_doc| last_doc < num_rows)
.unwrap_or(true));
&self.docs[..]
}
@@ -49,13 +48,12 @@ impl OptionalIndexBuilder {
impl IndexBuilder for OptionalIndexBuilder {
#[inline(always)]
fn record_row(&mut self, doc: RowId) {
debug_assert!(
self.docs
.last()
.copied()
.map(|prev_doc| doc > prev_doc)
.unwrap_or(true)
);
debug_assert!(self
.docs
.last()
.copied()
.map(|prev_doc| doc > prev_doc)
.unwrap_or(true));
self.docs.push(doc);
}
}

View File

@@ -3,8 +3,8 @@ use std::path::PathBuf;
use itertools::Itertools;
use crate::{
CURRENT_VERSION, Cardinality, Column, ColumnarReader, DynamicColumn, StackMergeOrder,
merge_columnar,
merge_columnar, Cardinality, Column, ColumnarReader, DynamicColumn, StackMergeOrder,
CURRENT_VERSION,
};
const NUM_DOCS: u32 = u16::MAX as u32;

View File

@@ -6,7 +6,7 @@ use common::file_slice::FileSlice;
use common::{ByteCount, DateTime, HasLen, OwnedBytes};
use crate::column::{BytesColumn, Column, StrColumn};
use crate::column_values::{StrictlyMonotonicFn, monotonic_map_column};
use crate::column_values::{monotonic_map_column, StrictlyMonotonicFn};
use crate::columnar::ColumnType;
use crate::{Cardinality, ColumnIndex, ColumnValues, NumericalType, Version};

View File

@@ -17,10 +17,15 @@
//! column.
//! - [column_values]: Stores the values of a column in a dense format.
#![cfg_attr(all(feature = "unstable", test), feature(test))]
#[cfg(test)]
#[macro_use]
extern crate more_asserts;
#[cfg(all(test, feature = "unstable"))]
extern crate test;
use std::fmt::Display;
use std::io;
@@ -39,11 +44,11 @@ pub use block_accessor::ColumnBlockAccessor;
pub use column::{BytesColumn, Column, StrColumn};
pub use column_index::ColumnIndex;
pub use column_values::{
ColumnValues, EmptyColumnValues, MonotonicallyMappableToU64, MonotonicallyMappableToU128,
ColumnValues, EmptyColumnValues, MonotonicallyMappableToU128, MonotonicallyMappableToU64,
};
pub use columnar::{
CURRENT_VERSION, ColumnType, ColumnarReader, ColumnarWriter, HasAssociatedColumnType,
MergeRowOrder, ShuffleMergeOrder, StackMergeOrder, Version, merge_columnar,
merge_columnar, ColumnType, ColumnarReader, ColumnarWriter, HasAssociatedColumnType,
MergeRowOrder, ShuffleMergeOrder, StackMergeOrder, Version, CURRENT_VERSION,
};
use sstable::VoidSSTable;
pub use value::{NumericalType, NumericalValue};

View File

@@ -380,7 +380,7 @@ fn assert_columnar_eq(
right: &ColumnarReader,
lenient_on_numerical_value: bool,
) {
assert_eq!(left.num_docs(), right.num_docs());
assert_eq!(left.num_rows(), right.num_rows());
let left_columns = left.list_columns().unwrap();
let right_columns = right.list_columns().unwrap();
assert_eq!(left_columns.len(), right_columns.len());
@@ -588,7 +588,7 @@ proptest! {
#[test]
fn test_single_columnar_builder_proptest(docs in columnar_docs_strategy()) {
let columnar = build_columnar(&docs[..]);
assert_eq!(columnar.num_docs() as usize, docs.len());
assert_eq!(columnar.num_rows() as usize, docs.len());
let mut expected_columns: HashMap<(&str, ColumnTypeCategory), HashMap<u32, Vec<&ColumnValue>> > = Default::default();
for (doc_id, doc_vals) in docs.iter().enumerate() {
for (col_name, col_val) in doc_vals {
@@ -715,9 +715,8 @@ fn test_columnar_merging_number_columns() {
// TODO test required_columns
// TODO document edge case: required_columns incompatible with values.
#[allow(clippy::type_complexity)]
fn columnar_docs_and_remap()
-> impl Strategy<Value = (Vec<Vec<Vec<(&'static str, ColumnValue)>>>, Vec<RowAddr>)> {
fn columnar_docs_and_remap(
) -> impl Strategy<Value = (Vec<Vec<Vec<(&'static str, ColumnValue)>>>, Vec<RowAddr>)> {
proptest::collection::vec(columnar_docs_strategy(), 2..=3).prop_flat_map(
|columnars_docs: Vec<Vec<Vec<(&str, ColumnValue)>>>| {
let row_addrs: Vec<RowAddr> = columnars_docs
@@ -820,7 +819,7 @@ fn test_columnar_merge_empty() {
)
.unwrap();
let merged_columnar = ColumnarReader::open(output).unwrap();
assert_eq!(merged_columnar.num_docs(), 0);
assert_eq!(merged_columnar.num_rows(), 0);
assert_eq!(merged_columnar.num_columns(), 0);
}
@@ -846,7 +845,7 @@ fn test_columnar_merge_single_str_column() {
)
.unwrap();
let merged_columnar = ColumnarReader::open(output).unwrap();
assert_eq!(merged_columnar.num_docs(), 1);
assert_eq!(merged_columnar.num_rows(), 1);
assert_eq!(merged_columnar.num_columns(), 1);
}
@@ -878,7 +877,7 @@ fn test_delete_decrease_cardinality() {
)
.unwrap();
let merged_columnar = ColumnarReader::open(output).unwrap();
assert_eq!(merged_columnar.num_docs(), 1);
assert_eq!(merged_columnar.num_rows(), 1);
assert_eq!(merged_columnar.num_columns(), 1);
let cols = merged_columnar.read_columns("c").unwrap();
assert_eq!(cols.len(), 1);

View File

@@ -1,5 +1,3 @@
use std::str::FromStr;
use common::DateTime;
use crate::InvalidData;
@@ -11,23 +9,6 @@ pub enum NumericalValue {
F64(f64),
}
impl FromStr for NumericalValue {
type Err = ();
fn from_str(s: &str) -> Result<Self, ()> {
if let Ok(val_i64) = s.parse::<i64>() {
return Ok(val_i64.into());
}
if let Ok(val_u64) = s.parse::<u64>() {
return Ok(val_u64.into());
}
if let Ok(val_f64) = s.parse::<f64>() {
return Ok(NumericalValue::from(val_f64).normalize());
}
Err(())
}
}
impl NumericalValue {
pub fn numerical_type(&self) -> NumericalType {
match self {
@@ -45,7 +26,7 @@ impl NumericalValue {
if val <= i64::MAX as u64 {
NumericalValue::I64(val as i64)
} else {
NumericalValue::U64(val)
NumericalValue::F64(val as f64)
}
}
NumericalValue::I64(val) => NumericalValue::I64(val),
@@ -160,7 +141,6 @@ impl Coerce for DateTime {
#[cfg(test)]
mod tests {
use super::NumericalType;
use crate::NumericalValue;
#[test]
fn test_numerical_type_code() {
@@ -173,58 +153,4 @@ mod tests {
}
assert_eq!(num_numerical_type, 3);
}
#[test]
fn test_parse_numerical() {
assert_eq!(
"123".parse::<NumericalValue>().unwrap(),
NumericalValue::I64(123)
);
assert_eq!(
"18446744073709551615".parse::<NumericalValue>().unwrap(),
NumericalValue::U64(18446744073709551615u64)
);
assert_eq!(
"1.0".parse::<NumericalValue>().unwrap(),
NumericalValue::I64(1i64)
);
assert_eq!(
"1.1".parse::<NumericalValue>().unwrap(),
NumericalValue::F64(1.1f64)
);
assert_eq!(
"-1.0".parse::<NumericalValue>().unwrap(),
NumericalValue::I64(-1i64)
);
}
#[test]
fn test_normalize_numerical() {
assert_eq!(
NumericalValue::from(1u64).normalize(),
NumericalValue::I64(1i64),
);
let limit_val = i64::MAX as u64 + 1u64;
assert_eq!(
NumericalValue::from(limit_val).normalize(),
NumericalValue::U64(limit_val),
);
assert_eq!(
NumericalValue::from(-1i64).normalize(),
NumericalValue::I64(-1i64),
);
assert_eq!(
NumericalValue::from(-2.0f64).normalize(),
NumericalValue::I64(-2i64),
);
assert_eq!(
NumericalValue::from(-2.1f64).normalize(),
NumericalValue::F64(-2.1f64),
);
let large_float = 2.0f64.powf(70.0f64);
assert_eq!(
NumericalValue::from(large_float).normalize(),
NumericalValue::F64(large_float),
);
}
}

View File

@@ -1,9 +1,9 @@
[package]
name = "tantivy-common"
version = "0.10.0"
version = "0.7.0"
authors = ["Paul Masurel <paul@quickwit.io>", "Pascal Seitz <pascal@quickwit.io>"]
license = "MIT"
edition = "2024"
edition = "2021"
description = "common traits and utility functions used by multiple tantivy subcrates"
documentation = "https://docs.rs/tantivy_common/"
homepage = "https://github.com/quickwit-oss/tantivy"
@@ -13,7 +13,7 @@ repository = "https://github.com/quickwit-oss/tantivy"
[dependencies]
byteorder = "1.4.3"
ownedbytes = { version= "0.9", path="../ownedbytes" }
ownedbytes = { version= "0.7", path="../ownedbytes" }
async-trait = "0.1"
time = { version = "0.3.10", features = ["serde-well-known"] }
serde = { version = "1.0.136", features = ["derive"] }

View File

@@ -1,7 +1,7 @@
use binggan::{BenchRunner, black_box};
use binggan::{black_box, BenchRunner};
use rand::seq::IteratorRandom;
use rand::thread_rng;
use tantivy_common::{BitSet, TinySet, serialize_vint_u32};
use tantivy_common::{serialize_vint_u32, BitSet, TinySet};
fn bench_vint() {
let mut runner = BenchRunner::new();

View File

@@ -183,7 +183,7 @@ pub struct BitSet {
}
fn num_buckets(max_val: u32) -> u32 {
max_val.div_ceil(64u32)
(max_val + 63u32) / 64u32
}
impl BitSet {

View File

@@ -65,11 +65,11 @@ pub fn transform_bound_inner_res<TFrom, TTo>(
) -> io::Result<Bound<TTo>> {
use self::Bound::*;
Ok(match bound {
Excluded(from_val) => match transform(from_val)? {
Excluded(ref from_val) => match transform(from_val)? {
TransformBound::NewBound(new_val) => new_val,
TransformBound::Existing(new_val) => Excluded(new_val),
},
Included(from_val) => match transform(from_val)? {
Included(ref from_val) => match transform(from_val)? {
TransformBound::NewBound(new_val) => new_val,
TransformBound::Existing(new_val) => Included(new_val),
},
@@ -85,11 +85,11 @@ pub fn transform_bound_inner<TFrom, TTo>(
) -> Bound<TTo> {
use self::Bound::*;
match bound {
Excluded(from_val) => match transform(from_val) {
Excluded(ref from_val) => match transform(from_val) {
TransformBound::NewBound(new_val) => new_val,
TransformBound::Existing(new_val) => Excluded(new_val),
},
Included(from_val) => match transform(from_val) {
Included(ref from_val) => match transform(from_val) {
TransformBound::NewBound(new_val) => new_val,
TransformBound::Existing(new_val) => Included(new_val),
},
@@ -111,8 +111,8 @@ pub fn map_bound<TFrom, TTo>(
) -> Bound<TTo> {
use self::Bound::*;
match bound {
Excluded(from_val) => Bound::Excluded(transform(from_val)),
Included(from_val) => Bound::Included(transform(from_val)),
Excluded(ref from_val) => Bound::Excluded(transform(from_val)),
Included(ref from_val) => Bound::Included(transform(from_val)),
Unbounded => Unbounded,
}
}
@@ -123,8 +123,8 @@ pub fn map_bound_res<TFrom, TTo, Err>(
) -> Result<Bound<TTo>, Err> {
use self::Bound::*;
Ok(match bound {
Excluded(from_val) => Excluded(transform(from_val)?),
Included(from_val) => Included(transform(from_val)?),
Excluded(ref from_val) => Excluded(transform(from_val)?),
Included(ref from_val) => Included(transform(from_val)?),
Unbounded => Unbounded,
})
}

View File

@@ -74,7 +74,7 @@ impl FileHandle for WrapFile {
{
use std::io::{Read, Seek};
let mut file = self.file.try_clone()?; // Clone the file to read from it separately
// Seek to the start position in the file
// Seek to the start position in the file
file.seek(io::SeekFrom::Start(start as u64))?;
// Read the data into the buffer
file.read_exact(&mut buffer)?;
@@ -346,8 +346,8 @@ mod tests {
use std::sync::Arc;
use super::{FileHandle, FileSlice};
use crate::HasLen;
use crate::file_slice::combine_ranges;
use crate::HasLen;
#[test]
fn test_file_slice() -> io::Result<()> {

View File

@@ -22,7 +22,7 @@ pub use json_path_writer::JsonPathWriter;
pub use ownedbytes::{OwnedBytes, StableDeref};
pub use serialize::{BinarySerializable, DeserializeFrom, FixedSize};
pub use vint::{
VInt, VIntU128, read_u32_vint, read_u32_vint_no_advance, serialize_vint_u32, write_u32_vint,
read_u32_vint, read_u32_vint_no_advance, serialize_vint_u32, write_u32_vint, VInt, VIntU128,
};
pub use writer::{AntiCallToken, CountingWriter, TerminatingWrite};
@@ -177,10 +177,8 @@ pub(crate) mod test {
#[test]
fn test_f64_order() {
assert!(
!(f64_to_u64(f64::NEG_INFINITY)..f64_to_u64(f64::INFINITY))
.contains(&f64_to_u64(f64::NAN))
); // nan is not a number
assert!(!(f64_to_u64(f64::NEG_INFINITY)..f64_to_u64(f64::INFINITY))
.contains(&f64_to_u64(f64::NAN))); // nan is not a number
assert!(f64_to_u64(1.5) > f64_to_u64(1.0)); // same exponent, different mantissa
assert!(f64_to_u64(2.0) > f64_to_u64(1.0)); // same mantissa, different exponent
assert!(f64_to_u64(2.0) > f64_to_u64(1.5)); // different exponent and mantissa

View File

@@ -29,7 +29,6 @@ impl BinarySerializable for VIntU128 {
}
fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
#[allow(clippy::unbuffered_bytes)]
let mut bytes = reader.bytes();
let mut result = 0u128;
let mut shift = 0u64;
@@ -53,7 +52,7 @@ impl BinarySerializable for VIntU128 {
}
}
/// Wrapper over a `u64` that serializes as a variable int.
/// Wrapper over a `u64` that serializes as a variable int.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct VInt(pub u64);
@@ -197,7 +196,6 @@ impl BinarySerializable for VInt {
}
fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
#[allow(clippy::unbuffered_bytes)]
let mut bytes = reader.bytes();
let mut result = 0u64;
let mut shift = 0u64;
@@ -224,7 +222,7 @@ impl BinarySerializable for VInt {
#[cfg(test)]
mod tests {
use super::{BinarySerializable, VInt, serialize_vint_u32};
use super::{serialize_vint_u32, BinarySerializable, VInt};
fn aux_test_vint(val: u64) {
let mut v = [14u8; 10];

Binary file not shown.

Before

Width:  |  Height:  |  Size: 7.4 KiB

After

Width:  |  Height:  |  Size: 30 KiB

View File

@@ -51,7 +51,7 @@ fn main() -> tantivy::Result<()> {
// Our second field is body.
// We want full-text search for it, but we do not
// need to be able to retrieve it
// need to be able to be able to retrieve it
// for our application.
//
// We can make our index lighter by omitting the `STORED` flag.

View File

@@ -1,7 +1,7 @@
[package]
authors = ["Paul Masurel <paul@quickwit.io>", "Pascal Seitz <pascal@quickwit.io>"]
name = "ownedbytes"
version = "0.9.0"
version = "0.7.0"
edition = "2021"
description = "Expose data as static slice"
license = "MIT"

View File

@@ -1,6 +1,6 @@
[package]
name = "tantivy-query-grammar"
version = "0.25.0"
version = "0.22.0"
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
license = "MIT"
categories = ["database-implementations", "data-structures"]
@@ -9,11 +9,7 @@ homepage = "https://github.com/quickwit-oss/tantivy"
repository = "https://github.com/quickwit-oss/tantivy"
readme = "README.md"
keywords = ["search", "information", "retrieval"]
edition = "2024"
edition = "2021"
[dependencies]
nom = "7"
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"
ordered-float = "5.0.0"
fnv = "1.0.7"

View File

@@ -3,7 +3,6 @@
use std::convert::Infallible;
use nom::{AsChar, IResult, InputLength, InputTakeAtPosition};
use serde::Serialize;
pub(crate) type ErrorList = Vec<LenientErrorInternal>;
pub(crate) type JResult<I, O> = IResult<I, (O, ErrorList), Infallible>;
@@ -16,8 +15,7 @@ pub(crate) struct LenientErrorInternal {
}
/// A recoverable error and the position it happened at
#[derive(Debug, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
#[derive(Debug, PartialEq)]
pub struct LenientError {
pub pos: usize,
pub message: String,
@@ -117,22 +115,6 @@ where F: nom::Parser<I, (O, ErrorList), Infallible> {
}
}
pub(crate) fn terminated_infallible<I, O1, O2, F, G>(
mut first: F,
mut second: G,
) -> impl FnMut(I) -> JResult<I, O1>
where
F: nom::Parser<I, (O1, ErrorList), Infallible>,
G: nom::Parser<I, (O2, ErrorList), Infallible>,
{
move |input: I| {
let (input, (o1, mut err)) = first.parse(input)?;
let (input, (_, mut err2)) = second.parse(input)?;
err.append(&mut err2);
Ok((input, (o1, err)))
}
}
pub(crate) fn delimited_infallible<I, O1, O2, O3, F, G, H>(
mut first: F,
mut second: G,
@@ -202,19 +184,19 @@ macro_rules! tuple_trait_impl(
);
macro_rules! tuple_trait_inner(
($it:tt, $self:expr_2021, $input:expr_2021, (), $error_list:expr_2021, $head:ident $($id:ident)+) => ({
($it:tt, $self:expr, $input:expr, (), $error_list:expr, $head:ident $($id:ident)+) => ({
let (i, (o, mut err)) = $self.$it.parse($input.clone())?;
$error_list.append(&mut err);
succ!($it, tuple_trait_inner!($self, i, ( o ), $error_list, $($id)+))
});
($it:tt, $self:expr_2021, $input:expr_2021, ($($parsed:tt)*), $error_list:expr_2021, $head:ident $($id:ident)+) => ({
($it:tt, $self:expr, $input:expr, ($($parsed:tt)*), $error_list:expr, $head:ident $($id:ident)+) => ({
let (i, (o, mut err)) = $self.$it.parse($input.clone())?;
$error_list.append(&mut err);
succ!($it, tuple_trait_inner!($self, i, ($($parsed)* , o), $error_list, $($id)+))
});
($it:tt, $self:expr_2021, $input:expr_2021, ($($parsed:tt)*), $error_list:expr_2021, $head:ident) => ({
($it:tt, $self:expr, $input:expr, ($($parsed:tt)*), $error_list:expr, $head:ident) => ({
let (i, (o, mut err)) = $self.$it.parse($input.clone())?;
$error_list.append(&mut err);
@@ -344,13 +326,13 @@ macro_rules! alt_trait_impl(
);
macro_rules! alt_trait_inner(
($it:tt, $self:expr_2021, $input:expr_2021, $head_cond:ident $head:ident, $($id_cond:ident $id:ident),+) => (
($it:tt, $self:expr, $input:expr, $head_cond:ident $head:ident, $($id_cond:ident $id:ident),+) => (
match $self.$it.0.parse($input.clone()) {
Err(_) => succ!($it, alt_trait_inner!($self, $input, $($id_cond $id),+)),
Ok((input_left, _)) => Some($self.$it.1.parse(input_left)),
}
);
($it:tt, $self:expr_2021, $input:expr_2021, $head_cond:ident $head:ident) => (
($it:tt, $self:expr, $input:expr, $head_cond:ident $head:ident) => (
None
);
);
@@ -371,21 +353,3 @@ where
{
move |i: I| l.choice(i.clone()).unwrap_or_else(|| default.parse(i))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_lenient_error_serialization() {
let error = LenientError {
pos: 42,
message: "test error message".to_string(),
};
assert_eq!(
serde_json::to_string(&error).unwrap(),
"{\"pos\":42,\"message\":\"test error message\"}"
);
}
}

View File

@@ -1,7 +1,5 @@
#![allow(clippy::derive_partial_eq_without_eq)]
use serde::Serialize;
mod infallible;
mod occur;
mod query_grammar;
@@ -14,8 +12,6 @@ pub use crate::user_input_ast::{
Delimiter, UserInputAst, UserInputBound, UserInputLeaf, UserInputLiteral,
};
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub struct Error;
/// Parse a query
@@ -28,41 +24,3 @@ pub fn parse_query(query: &str) -> Result<UserInputAst, Error> {
pub fn parse_query_lenient(query: &str) -> (UserInputAst, Vec<LenientError>) {
parse_to_ast_lenient(query)
}
#[cfg(test)]
mod tests {
use crate::{UserInputAst, parse_query, parse_query_lenient};
#[test]
fn test_deduplication() {
let ast: UserInputAst = parse_query("a a").unwrap();
let json = serde_json::to_string(&ast).unwrap();
assert_eq!(
json,
r#"{"type":"bool","clauses":[[null,{"type":"literal","field_name":null,"phrase":"a","delimiter":"none","slop":0,"prefix":false}]]}"#
);
}
#[test]
fn test_parse_query_serialization() {
let ast = parse_query("title:hello OR title:x").unwrap();
let json = serde_json::to_string(&ast).unwrap();
assert_eq!(
json,
r#"{"type":"bool","clauses":[["should",{"type":"literal","field_name":"title","phrase":"hello","delimiter":"none","slop":0,"prefix":false}],["should",{"type":"literal","field_name":"title","phrase":"x","delimiter":"none","slop":0,"prefix":false}]]}"#
);
}
#[test]
fn test_parse_query_wrong_query() {
assert!(parse_query("title:").is_err());
}
#[test]
fn test_parse_query_lenient_wrong_query() {
let (_, errors) = parse_query_lenient("title:");
assert!(errors.len() == 1);
let json = serde_json::to_string(&errors).unwrap();
assert_eq!(json, r#"[{"pos":6,"message":"expected word"}]"#);
}
}

View File

@@ -1,12 +1,9 @@
use std::fmt;
use std::fmt::Write;
use serde::Serialize;
/// Defines whether a term in a query must be present,
/// should be present or must not be present.
#[derive(Debug, Clone, Hash, Copy, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
#[derive(Debug, Clone, Hash, Copy, Eq, PartialEq)]
pub enum Occur {
/// For a given document to be considered for scoring,
/// at least one of the queries with the Should or the Must

View File

@@ -1,8 +1,6 @@
use std::borrow::Cow;
use std::iter::once;
use fnv::FnvHashSet;
use nom::IResult;
use nom::branch::alt;
use nom::bytes::complete::tag;
use nom::character::complete::{
@@ -12,11 +10,12 @@ use nom::combinator::{eof, map, map_res, opt, peek, recognize, value, verify};
use nom::error::{Error, ErrorKind};
use nom::multi::{many0, many1, separated_list0};
use nom::sequence::{delimited, preceded, separated_pair, terminated, tuple};
use nom::IResult;
use super::user_input_ast::{UserInputAst, UserInputBound, UserInputLeaf, UserInputLiteral};
use crate::Occur;
use crate::infallible::*;
use crate::user_input_ast::Delimiter;
use crate::Occur;
// Note: '-' char is only forbidden at the beginning of a field name, would be clearer to add it to
// special characters.
@@ -37,7 +36,7 @@ fn field_name(inp: &str) -> IResult<&str, String> {
alt((first_char, escape_sequence())),
many0(alt((simple_char, escape_sequence(), char('\\')))),
)),
tuple((multispace0, char(':'), multispace0)),
char(':'),
),
|(first_char, next)| once(first_char).chain(next).collect(),
)(inp)
@@ -69,7 +68,7 @@ fn interpret_escape(source: &str) -> String {
/// Consume a word outside of any context.
// TODO should support escape sequences
fn word(inp: &str) -> IResult<&str, Cow<'_, str>> {
fn word(inp: &str) -> IResult<&str, Cow<str>> {
map_res(
recognize(tuple((
alt((
@@ -306,14 +305,15 @@ fn term_group_infallible(inp: &str) -> JResult<&str, UserInputAst> {
let (inp, (field_name, _, _, _)) =
tuple((field_name, multispace0, char('('), multispace0))(inp).expect("precondition failed");
delimited_infallible(
let res = delimited_infallible(
nothing,
map(ast_infallible, |(mut ast, errors)| {
ast.set_default_field(field_name.to_string());
(ast, errors)
}),
opt_i_err(char(')'), "expected ')'"),
)(inp)
)(inp);
res
}
fn exists(inp: &str) -> IResult<&str, UserInputLeaf> {
@@ -367,10 +367,7 @@ fn literal(inp: &str) -> IResult<&str, UserInputAst> {
// something (a field name) got parsed before
alt((
map(
tuple((
opt(field_name),
alt((range, set, exists, regex, term_or_phrase)),
)),
tuple((opt(field_name), alt((range, set, exists, term_or_phrase)))),
|(field_name, leaf): (Option<String>, UserInputLeaf)| leaf.set_field(field_name).into(),
),
term_group,
@@ -392,10 +389,6 @@ fn literal_no_group_infallible(inp: &str) -> JResult<&str, Option<UserInputAst>>
value((), peek(one_of("{[><"))),
map(range_infallible, |(range, errs)| (Some(range), errs)),
),
(
value((), peek(one_of("/"))),
map(regex_infallible, |(regex, errs)| (Some(regex), errs)),
),
),
delimited_infallible(space0_infallible, term_or_phrase_infallible, nothing),
),
@@ -696,61 +689,6 @@ fn set_infallible(mut inp: &str) -> JResult<&str, UserInputLeaf> {
}
}
fn regex(inp: &str) -> IResult<&str, UserInputLeaf> {
map(
terminated(
delimited(
char('/'),
many1(alt((preceded(char('\\'), char('/')), none_of("/")))),
char('/'),
),
peek(alt((multispace1, eof))),
),
|elements| UserInputLeaf::Regex {
field: None,
pattern: elements.into_iter().collect::<String>(),
},
)(inp)
}
fn regex_infallible(inp: &str) -> JResult<&str, UserInputLeaf> {
match terminated_infallible(
delimited_infallible(
opt_i_err(char('/'), "missing delimiter /"),
opt_i(many1(alt((preceded(char('\\'), char('/')), none_of("/"))))),
opt_i_err(char('/'), "missing delimiter /"),
),
opt_i_err(
peek(alt((multispace1, eof))),
"expected whitespace or end of input",
),
)(inp)
{
Ok((rest, (elements_part, errors))) => {
let pattern = match elements_part {
Some(elements_part) => elements_part.into_iter().collect(),
None => String::new(),
};
let res = UserInputLeaf::Regex {
field: None,
pattern,
};
Ok((rest, (res, errors)))
}
Err(e) => {
let errs = vec![LenientErrorInternal {
pos: inp.len(),
message: e.to_string(),
}];
let res = UserInputLeaf::Regex {
field: None,
pattern: String::new(),
};
Ok((inp, (res, errs)))
}
}
}
fn negate(expr: UserInputAst) -> UserInputAst {
expr.unary(Occur::MustNot)
}
@@ -815,7 +753,7 @@ fn boosted_leaf(inp: &str) -> IResult<&str, UserInputAst> {
tuple((leaf, fallible(boost))),
|(leaf, boost_opt)| match boost_opt {
Some(boost) if (boost - 1.0).abs() > f64::EPSILON => {
UserInputAst::Boost(Box::new(leaf), boost.into())
UserInputAst::Boost(Box::new(leaf), boost)
}
_ => leaf,
},
@@ -827,7 +765,7 @@ fn boosted_leaf_infallible(inp: &str) -> JResult<&str, Option<UserInputAst>> {
tuple_infallible((leaf_infallible, boost)),
|((leaf, boost_opt), error)| match boost_opt {
Some(boost) if (boost - 1.0).abs() > f64::EPSILON => (
leaf.map(|leaf| UserInputAst::Boost(Box::new(leaf), boost.into())),
leaf.map(|leaf| UserInputAst::Boost(Box::new(leaf), boost)),
error,
),
_ => (leaf, error),
@@ -1078,25 +1016,12 @@ pub fn parse_to_ast_lenient(query_str: &str) -> (UserInputAst, Vec<LenientError>
(rewrite_ast(res), errors)
}
/// Removes unnecessary children clauses in AST
///
/// Motivated by [issue #1433](https://github.com/quickwit-oss/tantivy/issues/1433)
fn rewrite_ast(mut input: UserInputAst) -> UserInputAst {
if let UserInputAst::Clause(sub_clauses) = &mut input {
// call rewrite_ast recursively on children clauses if applicable
let mut new_clauses = Vec::with_capacity(sub_clauses.len());
for (occur, clause) in sub_clauses.drain(..) {
let rewritten_clause = rewrite_ast(clause);
new_clauses.push((occur, rewritten_clause));
}
*sub_clauses = new_clauses;
// remove duplicate child clauses
// e.g. (+a +b) OR (+c +d) OR (+a +b) => (+a +b) OR (+c +d)
let mut seen = FnvHashSet::default();
sub_clauses.retain(|term| seen.insert(term.clone()));
// Removes unnecessary children clauses in AST
//
// Motivated by [issue #1433](https://github.com/quickwit-oss/tantivy/issues/1433)
for term in sub_clauses {
if let UserInputAst::Clause(terms) = &mut input {
for term in terms {
rewrite_ast_clause(term);
}
}
@@ -1105,7 +1030,7 @@ fn rewrite_ast(mut input: UserInputAst) -> UserInputAst {
fn rewrite_ast_clause(input: &mut (Option<Occur>, UserInputAst)) {
match input {
(None, UserInputAst::Clause(clauses)) if clauses.len() == 1 => {
(None, UserInputAst::Clause(ref mut clauses)) if clauses.len() == 1 => {
*input = clauses.pop().unwrap(); // safe because clauses.len() == 1
}
_ => {}
@@ -1358,10 +1283,6 @@ mod test {
super::field_name("~my~field:a"),
Ok(("a", "~my~field".to_string()))
);
assert_eq!(
super::field_name(".my.field.name : a"),
Ok(("a", ".my.field.name".to_string()))
);
for special_char in SPECIAL_CHARS.iter() {
let query = &format!("\\{special_char}my\\{special_char}field:a");
assert_eq!(
@@ -1455,7 +1376,7 @@ mod test {
#[test]
fn test_range_parser_lenient() {
let literal = |query| literal_infallible(query).unwrap().1.0.unwrap();
let literal = |query| literal_infallible(query).unwrap().1 .0.unwrap();
// same tests as non-lenient
let res = literal("title: <hello");
@@ -1768,72 +1689,4 @@ mod test {
fn test_invalid_field() {
test_is_parse_err(r#"!bc:def"#, "!bc:def");
}
#[test]
fn test_regex_parser() {
let r = parse_to_ast(r#"a:/joh?n(ath[oa]n)/"#);
assert!(r.is_ok(), "Failed to parse custom query: {r:?}");
let (_, input) = r.unwrap();
match input {
UserInputAst::Leaf(leaf) => match leaf.as_ref() {
UserInputLeaf::Regex { field, pattern } => {
assert_eq!(field, &Some("a".to_string()));
assert_eq!(pattern, "joh?n(ath[oa]n)");
}
_ => panic!("Expected a regex leaf, got {leaf:?}"),
},
_ => panic!("Expected a leaf"),
}
let r = parse_to_ast(r#"a:/\\/cgi-bin\\/luci.*/"#);
assert!(r.is_ok(), "Failed to parse custom query: {r:?}");
let (_, input) = r.unwrap();
match input {
UserInputAst::Leaf(leaf) => match leaf.as_ref() {
UserInputLeaf::Regex { field, pattern } => {
assert_eq!(field, &Some("a".to_string()));
assert_eq!(pattern, "\\/cgi-bin\\/luci.*");
}
_ => panic!("Expected a regex leaf, got {leaf:?}"),
},
_ => panic!("Expected a leaf"),
}
}
#[test]
fn test_regex_parser_lenient() {
let literal = |query| literal_infallible(query).unwrap().1;
let (res, errs) = literal(r#"a:/joh?n(ath[oa]n)/"#);
let expected = UserInputLeaf::Regex {
field: Some("a".to_string()),
pattern: "joh?n(ath[oa]n)".to_string(),
}
.into();
assert_eq!(res.unwrap(), expected);
assert!(errs.is_empty(), "Expected no errors, got: {errs:?}");
let (res, errs) = literal("title:/joh?n(ath[oa]n)");
let expected = UserInputLeaf::Regex {
field: Some("title".to_string()),
pattern: "joh?n(ath[oa]n)".to_string(),
}
.into();
assert_eq!(res.unwrap(), expected);
assert_eq!(errs.len(), 1, "Expected 1 error, got: {errs:?}");
assert_eq!(
errs[0].message, "missing delimiter /",
"Unexpected error message",
);
}
#[test]
fn test_space_before_value() {
test_parse_query_to_ast_helper("field : a", r#""field":a"#);
test_parse_query_to_ast_helper("field: a", r#""field":a"#);
test_parse_query_to_ast_helper("field :a", r#""field":a"#);
test_parse_query_to_ast_helper(
"field : 'happy tax payer' AND other_field : 1",
r#"(+"field":'happy tax payer' +"other_field":1)"#,
);
}
}

View File

@@ -1,13 +1,9 @@
use std::fmt;
use std::fmt::{Debug, Formatter};
use serde::Serialize;
use crate::Occur;
#[derive(PartialEq, Eq, Hash, Clone, Serialize)]
#[serde(tag = "type")]
#[serde(rename_all = "snake_case")]
#[derive(PartialEq, Clone)]
pub enum UserInputLeaf {
Literal(UserInputLiteral),
All,
@@ -23,10 +19,6 @@ pub enum UserInputLeaf {
Exists {
field: String,
},
Regex {
field: Option<String>,
pattern: String,
},
}
impl UserInputLeaf {
@@ -50,13 +42,12 @@ impl UserInputLeaf {
UserInputLeaf::Exists { field: _ } => UserInputLeaf::Exists {
field: field.expect("Exist query without a field isn't allowed"),
},
UserInputLeaf::Regex { field: _, pattern } => UserInputLeaf::Regex { field, pattern },
}
}
pub(crate) fn set_default_field(&mut self, default_field: String) {
match self {
UserInputLeaf::Literal(literal) if literal.field_name.is_none() => {
UserInputLeaf::Literal(ref mut literal) if literal.field_name.is_none() => {
literal.field_name = Some(default_field)
}
UserInputLeaf::All => {
@@ -64,8 +55,12 @@ impl UserInputLeaf {
field: default_field,
}
}
UserInputLeaf::Range { field, .. } if field.is_none() => *field = Some(default_field),
UserInputLeaf::Set { field, .. } if field.is_none() => *field = Some(default_field),
UserInputLeaf::Range { ref mut field, .. } if field.is_none() => {
*field = Some(default_field)
}
UserInputLeaf::Set { ref mut field, .. } if field.is_none() => {
*field = Some(default_field)
}
_ => (), // field was already set, do nothing
}
}
@@ -76,11 +71,11 @@ impl Debug for UserInputLeaf {
match self {
UserInputLeaf::Literal(literal) => literal.fmt(formatter),
UserInputLeaf::Range {
field,
lower,
upper,
ref field,
ref lower,
ref upper,
} => {
if let Some(field) = field {
if let Some(ref field) = field {
// TODO properly escape field (in case of \")
write!(formatter, "\"{field}\":")?;
}
@@ -90,7 +85,7 @@ impl Debug for UserInputLeaf {
Ok(())
}
UserInputLeaf::Set { field, elements } => {
if let Some(field) = field {
if let Some(ref field) = field {
// TODO properly escape field (in case of \")
write!(formatter, "\"{field}\": ")?;
}
@@ -108,28 +103,18 @@ impl Debug for UserInputLeaf {
UserInputLeaf::Exists { field } => {
write!(formatter, "$exists(\"{field}\")")
}
UserInputLeaf::Regex { field, pattern } => {
if let Some(field) = field {
// TODO properly escape field (in case of \")
write!(formatter, "\"{field}\":")?;
}
// TODO properly escape pattern (in case of \")
write!(formatter, "/{pattern}/")
}
}
}
}
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum Delimiter {
SingleQuotes,
DoubleQuotes,
None,
}
#[derive(PartialEq, Eq, Hash, Clone, Serialize)]
#[serde(rename_all = "snake_case")]
#[derive(PartialEq, Clone)]
pub struct UserInputLiteral {
pub field_name: Option<String>,
pub phrase: String,
@@ -167,9 +152,7 @@ impl fmt::Debug for UserInputLiteral {
}
}
#[derive(PartialEq, Eq, Hash, Debug, Clone, Serialize)]
#[serde(tag = "type", content = "value")]
#[serde(rename_all = "snake_case")]
#[derive(PartialEq, Debug, Clone)]
pub enum UserInputBound {
Inclusive(String),
Exclusive(String),
@@ -204,39 +187,11 @@ impl UserInputBound {
}
}
#[derive(PartialEq, Eq, Hash, Clone, Serialize)]
#[serde(into = "UserInputAstSerde")]
#[derive(PartialEq, Clone)]
pub enum UserInputAst {
Clause(Vec<(Option<Occur>, UserInputAst)>),
Boost(Box<UserInputAst>, ordered_float::OrderedFloat<f64>),
Leaf(Box<UserInputLeaf>),
}
#[derive(Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
enum UserInputAstSerde {
Bool {
clauses: Vec<(Option<Occur>, UserInputAst)>,
},
Boost {
underlying: Box<UserInputAst>,
boost: f64,
},
#[serde(untagged)]
Leaf(Box<UserInputLeaf>),
}
impl From<UserInputAst> for UserInputAstSerde {
fn from(ast: UserInputAst) -> Self {
match ast {
UserInputAst::Clause(clause) => UserInputAstSerde::Bool { clauses: clause },
UserInputAst::Boost(underlying, boost) => UserInputAstSerde::Boost {
underlying,
boost: boost.into_inner(),
},
UserInputAst::Leaf(leaf) => UserInputAstSerde::Leaf(leaf),
}
}
Boost(Box<UserInputAst>, f64),
}
impl UserInputAst {
@@ -277,7 +232,7 @@ impl UserInputAst {
.iter_mut()
.for_each(|(_, ast)| ast.set_default_field(field.clone())),
UserInputAst::Leaf(leaf) => leaf.set_default_field(field),
UserInputAst::Boost(ast, _) => ast.set_default_field(field),
UserInputAst::Boost(ref mut ast, _) => ast.set_default_field(field),
}
}
}
@@ -330,126 +285,3 @@ impl fmt::Debug for UserInputAst {
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_all_leaf_serialization() {
let ast = UserInputAst::Leaf(Box::new(UserInputLeaf::All));
let json = serde_json::to_string(&ast).unwrap();
assert_eq!(json, r#"{"type":"all"}"#);
}
#[test]
fn test_literal_leaf_serialization() {
let literal = UserInputLiteral {
field_name: Some("title".to_string()),
phrase: "hello".to_string(),
delimiter: Delimiter::None,
slop: 0,
prefix: false,
};
let ast = UserInputAst::Leaf(Box::new(UserInputLeaf::Literal(literal)));
let json = serde_json::to_string(&ast).unwrap();
assert_eq!(
json,
r#"{"type":"literal","field_name":"title","phrase":"hello","delimiter":"none","slop":0,"prefix":false}"#
);
}
#[test]
fn test_range_leaf_serialization() {
let range = UserInputLeaf::Range {
field: Some("price".to_string()),
lower: UserInputBound::Inclusive("10".to_string()),
upper: UserInputBound::Exclusive("100".to_string()),
};
let ast = UserInputAst::Leaf(Box::new(range));
let json = serde_json::to_string(&ast).unwrap();
assert_eq!(
json,
r#"{"type":"range","field":"price","lower":{"type":"inclusive","value":"10"},"upper":{"type":"exclusive","value":"100"}}"#
);
}
#[test]
fn test_range_leaf_unbounded_serialization() {
let range = UserInputLeaf::Range {
field: Some("price".to_string()),
lower: UserInputBound::Inclusive("10".to_string()),
upper: UserInputBound::Unbounded,
};
let ast = UserInputAst::Leaf(Box::new(range));
let json = serde_json::to_string(&ast).unwrap();
assert_eq!(
json,
r#"{"type":"range","field":"price","lower":{"type":"inclusive","value":"10"},"upper":{"type":"unbounded"}}"#
);
}
#[test]
fn test_boost_serialization() {
let inner_ast = UserInputAst::Leaf(Box::new(UserInputLeaf::All));
let boost_ast = UserInputAst::Boost(Box::new(inner_ast), 2.5.into());
let json = serde_json::to_string(&boost_ast).unwrap();
assert_eq!(
json,
r#"{"type":"boost","underlying":{"type":"all"},"boost":2.5}"#
);
}
#[test]
fn test_boost_serialization2() {
let boost_ast = UserInputAst::Boost(
Box::new(UserInputAst::Clause(vec![
(
Some(Occur::Must),
UserInputAst::Leaf(Box::new(UserInputLeaf::All)),
),
(
Some(Occur::Should),
UserInputAst::Leaf(Box::new(UserInputLeaf::Literal(UserInputLiteral {
field_name: Some("title".to_string()),
phrase: "hello".to_string(),
delimiter: Delimiter::None,
slop: 0,
prefix: false,
}))),
),
])),
2.5.into(),
);
let json = serde_json::to_string(&boost_ast).unwrap();
assert_eq!(
json,
r#"{"type":"boost","underlying":{"type":"bool","clauses":[["must",{"type":"all"}],["should",{"type":"literal","field_name":"title","phrase":"hello","delimiter":"none","slop":0,"prefix":false}]]},"boost":2.5}"#
);
}
#[test]
fn test_clause_serialization() {
let clause = UserInputAst::Clause(vec![
(
Some(Occur::Must),
UserInputAst::Leaf(Box::new(UserInputLeaf::All)),
),
(
Some(Occur::Should),
UserInputAst::Leaf(Box::new(UserInputLeaf::Literal(UserInputLiteral {
field_name: Some("title".to_string()),
phrase: "hello".to_string(),
delimiter: Delimiter::None,
slop: 0,
prefix: false,
}))),
),
]);
let json = serde_json::to_string(&clause).unwrap();
assert_eq!(
json,
r#"{"type":"bool","clauses":[["must",{"type":"all"}],["should",{"type":"literal","field_name":"title","phrase":"hello","delimiter":"none","slop":0,"prefix":false}]]}"#
);
}
}

View File

@@ -125,9 +125,26 @@ impl MetricResult {
}
/// BucketEntry holds bucket aggregation result types.
// the order of fields is important to deserialize properly
// Terms must be first because all Terms are valid Range (we ignore unknown fields)
// Range and Histogram are always ambiguous, they contain the same 3 required fields, and all else
// is optional Having Range is usually more useful (contains more fields, missing field from
// Histogram can be obtained by key.to_string())
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum BucketResult {
/// This is the term result
Terms {
/// The buckets.
///
/// See [`TermsAggregation`](super::bucket::TermsAggregation)
buckets: Vec<BucketEntry>,
/// The number of documents that didnt make it into to TOP N due to shard_size or size
sum_other_doc_count: u64,
#[serde(skip_serializing_if = "Option::is_none")]
/// The upper bound error for the doc count of each term.
doc_count_error_upper_bound: Option<u64>,
},
/// This is the range entry for a bucket, which contains a key, count, from, to, and optionally
/// sub-aggregations.
Range {
@@ -144,18 +161,6 @@ pub enum BucketResult {
/// See [`HistogramAggregation`](super::bucket::HistogramAggregation)
buckets: BucketEntries<BucketEntry>,
},
/// This is the term result
Terms {
/// The buckets.
///
/// See [`TermsAggregation`](super::bucket::TermsAggregation)
buckets: Vec<BucketEntry>,
/// The number of documents that didnt make it into to TOP N due to shard_size or size
sum_other_doc_count: u64,
#[serde(skip_serializing_if = "Option::is_none")]
/// The upper bound error for the doc count of each term.
doc_count_error_upper_bound: Option<u64>,
},
}
impl BucketResult {

View File

@@ -301,7 +301,7 @@ impl SegmentAggregationCollector for SegmentHistogramCollector {
let bounds = self.bounds;
let interval = self.interval;
let offset = self.offset;
let get_bucket_pos = |val| get_bucket_pos_f64(val, interval, offset) as i64;
let get_bucket_pos = |val| (get_bucket_pos_f64(val, interval, offset) as i64);
bucket_agg_accessor
.column_block_accessor

View File

@@ -518,7 +518,7 @@ impl SegmentTermCollector {
|term| {
let entry = entries[idx];
let intermediate_entry = into_intermediate_bucket_entry(entry.0, entry.1)
.map_err(io::Error::other)?;
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
dict.insert(
IntermediateKey::Str(
String::from_utf8(term.to_vec()).expect("could not convert to String"),

View File

@@ -229,7 +229,6 @@ impl TopHitsAggregationReq {
self.sort
.iter()
.map(|KeyOrder { field, .. }| field.as_str())
.chain(self.doc_value_fields.iter().map(|s| s.as_str()))
.collect()
}

View File

@@ -366,12 +366,8 @@ impl PartialEq for Key {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::Str(l), Self::Str(r)) => l == r,
(Self::F64(l), Self::F64(r)) => l.to_bits() == r.to_bits(),
(Self::I64(l), Self::I64(r)) => l == r,
(Self::U64(l), Self::U64(r)) => l == r,
// we list all variant of left operand to make sure this gets updated when we add
// variants to the enum
(Self::Str(_) | Self::F64(_) | Self::I64(_) | Self::U64(_), _) => false,
(Self::F64(l), Self::F64(r)) => l == r,
_ => false,
}
}
}

View File

@@ -484,6 +484,7 @@ impl FacetCounts {
#[cfg(test)]
mod tests {
use std::collections::BTreeSet;
use std::iter;
use columnar::Dictionary;
use rand::distributions::Uniform;
@@ -738,7 +739,7 @@ mod tests {
.flat_map(|(c, count)| {
let facet = Facet::from(&format!("/facet/{c}"));
let doc = doc!(facet_field => facet);
std::iter::repeat_n(doc, count)
iter::repeat(doc).take(count)
})
.map(|mut doc| {
doc.add_facet(
@@ -786,7 +787,7 @@ mod tests {
.flat_map(|(c, count)| {
let facet = Facet::from(&format!("/facet/{c}"));
let doc = doc!(facet_field => facet);
std::iter::repeat_n(doc, count)
iter::repeat(doc).take(count)
})
.collect();

View File

@@ -2,13 +2,11 @@ use std::fmt;
use std::marker::PhantomData;
use std::sync::Arc;
use columnar::{ColumnValues, StrColumn};
use columnar::ColumnValues;
use serde::{Deserialize, Serialize};
use super::Collector;
use crate::collector::custom_score_top_collector::{
CustomScoreTopCollector, CustomScoreTopSegmentCollector,
};
use crate::collector::custom_score_top_collector::CustomScoreTopCollector;
use crate::collector::top_collector::{ComparableDoc, TopCollector, TopSegmentCollector};
use crate::collector::tweak_score_top_collector::TweakedScoreTopCollector;
use crate::collector::{
@@ -16,7 +14,6 @@ use crate::collector::{
};
use crate::fastfield::{FastFieldNotAvailableError, FastValue};
use crate::query::Weight;
use crate::termdict::TermOrdinal;
use crate::{DocAddress, DocId, Order, Score, SegmentOrdinal, SegmentReader, TantivyError};
struct FastFieldConvertCollector<
@@ -86,163 +83,6 @@ where
}
}
struct StringConvertCollector {
pub collector: CustomScoreTopCollector<ScorerByField, u64>,
pub field: String,
order: Order,
limit: usize,
offset: usize,
}
impl Collector for StringConvertCollector {
type Fruit = Vec<(String, DocAddress)>;
type Child = StringConvertSegmentCollector;
fn for_segment(
&self,
segment_local_id: crate::SegmentOrdinal,
segment: &SegmentReader,
) -> crate::Result<Self::Child> {
let schema = segment.schema();
let field = schema.get_field(&self.field)?;
let field_entry = schema.get_field_entry(field);
if !field_entry.is_fast() {
return Err(TantivyError::SchemaError(format!(
"Field {:?} is not a fast field.",
field_entry.name()
)));
}
let requested_type = crate::schema::Type::Str;
let schema_type = field_entry.field_type().value_type();
if schema_type != requested_type {
return Err(TantivyError::SchemaError(format!(
"Field {:?} is of type {schema_type:?}!={requested_type:?}",
field_entry.name()
)));
}
let ff = segment
.fast_fields()
.str(&self.field)?
.expect("ff should be a str field");
Ok(StringConvertSegmentCollector {
collector: self.collector.for_segment(segment_local_id, segment)?,
ff,
order: self.order.clone(),
})
}
fn requires_scoring(&self) -> bool {
self.collector.requires_scoring()
}
fn merge_fruits(
&self,
child_fruits: Vec<<Self::Child as SegmentCollector>::Fruit>,
) -> crate::Result<Self::Fruit> {
if self.limit == 0 {
return Ok(Vec::new());
}
if self.order.is_desc() {
let mut top_collector: TopNComputer<_, _, true> =
TopNComputer::new(self.limit + self.offset);
for child_fruit in child_fruits {
for (feature, doc) in child_fruit {
top_collector.push(feature, doc);
}
}
Ok(top_collector
.into_sorted_vec()
.into_iter()
.skip(self.offset)
.map(|cdoc| (cdoc.feature, cdoc.doc))
.collect())
} else {
let mut top_collector: TopNComputer<_, _, false> =
TopNComputer::new(self.limit + self.offset);
for child_fruit in child_fruits {
for (feature, doc) in child_fruit {
top_collector.push(feature, doc);
}
}
Ok(top_collector
.into_sorted_vec()
.into_iter()
.skip(self.offset)
.map(|cdoc| (cdoc.feature, cdoc.doc))
.collect())
}
}
}
struct StringConvertSegmentCollector {
pub collector: CustomScoreTopSegmentCollector<ScorerByFastFieldReader, u64>,
ff: StrColumn,
order: Order,
}
impl SegmentCollector for StringConvertSegmentCollector {
type Fruit = Vec<(String, DocAddress)>;
fn collect(&mut self, doc: DocId, score: Score) {
self.collector.collect(doc, score);
}
fn harvest(self) -> Vec<(String, DocAddress)> {
let top_ordinals: Vec<(TermOrdinal, DocAddress)> = self.collector.harvest();
// Collect terms.
let mut terms: Vec<String> = Vec::with_capacity(top_ordinals.len());
let result = if self.order.is_asc() {
self.ff.dictionary().sorted_ords_to_term_cb(
top_ordinals.iter().map(|(term_ord, _)| u64::MAX - term_ord),
|term| {
terms.push(
std::str::from_utf8(term)
.expect("Failed to decode term as unicode")
.to_owned(),
);
Ok(())
},
)
} else {
self.ff.dictionary().sorted_ords_to_term_cb(
top_ordinals.iter().rev().map(|(term_ord, _)| *term_ord),
|term| {
terms.push(
std::str::from_utf8(term)
.expect("Failed to decode term as unicode")
.to_owned(),
);
Ok(())
},
)
};
assert!(
result.expect("Failed to read terms from term dictionary"),
"Not all terms were matched in segment."
);
// Zip them back with their docs.
if self.order.is_asc() {
terms
.into_iter()
.zip(top_ordinals)
.map(|(term, (_, doc))| (term, doc))
.collect()
} else {
terms
.into_iter()
.rev()
.zip(top_ordinals)
.map(|(term, (_, doc))| (term, doc))
.collect()
}
}
}
/// The `TopDocs` collector keeps track of the top `K` documents
/// sorted by their score.
///
@@ -570,30 +410,6 @@ impl TopDocs {
}
}
/// Like `order_by_fast_field`, but for a `String` fast field.
pub fn order_by_string_fast_field(
self,
fast_field: impl ToString,
order: Order,
) -> impl Collector<Fruit = Vec<(String, DocAddress)>> {
let limit = self.0.limit;
let offset = self.0.offset;
let u64_collector = CustomScoreTopCollector::new(
ScorerByField {
field: fast_field.to_string(),
order: order.clone(),
},
self.0.into_tscore(),
);
StringConvertCollector {
collector: u64_collector,
field: fast_field.to_string(),
order,
limit,
offset,
}
}
/// Ranks the documents using a custom score.
///
/// This method offers a convenient way to tweak or replace
@@ -970,7 +786,7 @@ impl<Score, D, const R: bool> From<TopNComputerDeser<Score, D, R>> for TopNCompu
}
}
impl<Score, D, const REVERSE_ORDER: bool> TopNComputer<Score, D, REVERSE_ORDER>
impl<Score, D, const R: bool> TopNComputer<Score, D, R>
where
Score: PartialOrd + Clone,
D: Ord,
@@ -991,10 +807,7 @@ where
#[inline]
pub fn push(&mut self, feature: Score, doc: D) {
if let Some(last_median) = self.threshold.clone() {
if !REVERSE_ORDER && feature > last_median {
return;
}
if REVERSE_ORDER && feature < last_median {
if feature < last_median {
return;
}
}
@@ -1029,7 +842,7 @@ where
}
/// Returns the top n elements in sorted order.
pub fn into_sorted_vec(mut self) -> Vec<ComparableDoc<Score, D, REVERSE_ORDER>> {
pub fn into_sorted_vec(mut self) -> Vec<ComparableDoc<Score, D, R>> {
if self.buffer.len() > self.top_n {
self.truncate_top_n();
}
@@ -1040,7 +853,7 @@ where
/// Returns the top n elements in stored order.
/// Useful if you do not need the elements in sorted order,
/// for example when merging the results of multiple segments.
pub fn into_vec(mut self) -> Vec<ComparableDoc<Score, D, REVERSE_ORDER>> {
pub fn into_vec(mut self) -> Vec<ComparableDoc<Score, D, R>> {
if self.buffer.len() > self.top_n {
self.truncate_top_n();
}
@@ -1050,11 +863,9 @@ where
#[cfg(test)]
mod tests {
use proptest::prelude::*;
use super::{TopDocs, TopNComputer};
use crate::collector::top_collector::ComparableDoc;
use crate::collector::{Collector, DocSetCollector};
use crate::collector::Collector;
use crate::query::{AllQuery, Query, QueryParser};
use crate::schema::{Field, Schema, FAST, STORED, TEXT};
use crate::time::format_description::well_known::Rfc3339;
@@ -1149,44 +960,6 @@ mod tests {
}
}
proptest! {
#[test]
fn test_topn_computer_asc_prop(
limit in 0..10_usize,
docs in proptest::collection::vec((0..100_u64, 0..100_u64), 0..100_usize),
) {
let mut computer: TopNComputer<_, _, false> = TopNComputer::new(limit);
for (feature, doc) in &docs {
computer.push(*feature, *doc);
}
let mut comparable_docs = docs.into_iter().map(|(feature, doc)| ComparableDoc { feature, doc }).collect::<Vec<_>>();
comparable_docs.sort();
comparable_docs.truncate(limit);
prop_assert_eq!(
computer.into_sorted_vec(),
comparable_docs,
);
}
#[test]
fn test_topn_computer_desc_prop(
limit in 0..10_usize,
docs in proptest::collection::vec((0..100_u64, 0..100_u64), 0..100_usize),
) {
let mut computer: TopNComputer<_, _, true> = TopNComputer::new(limit);
for (feature, doc) in &docs {
computer.push(*feature, *doc);
}
let mut comparable_docs = docs.into_iter().map(|(feature, doc)| ComparableDoc { feature, doc }).collect::<Vec<_>>();
comparable_docs.sort();
comparable_docs.truncate(limit);
prop_assert_eq!(
computer.into_sorted_vec(),
comparable_docs,
);
}
}
#[test]
fn test_top_collector_not_at_capacity_without_offset() -> crate::Result<()> {
let index = make_index()?;
@@ -1293,220 +1066,6 @@ mod tests {
assert_eq!(page_0, &page_2[..page_0.len()]);
}
proptest! {
#![proptest_config(ProptestConfig::with_cases(20))]
/// Build multiple segments with equal-scoring docs and verify stable ordering
/// across pages when increasing limit or offset.
#[test]
fn proptest_stable_ordering_across_segments_with_pagination(
docs_per_segment in proptest::collection::vec(1usize..50, 2..5)
) {
use crate::indexer::NoMergePolicy;
// Build an index with multiple segments; all docs will have the same score using AllQuery.
let mut schema_builder = Schema::builder();
let text = schema_builder.add_text_field("text", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut writer = index.writer_for_tests().unwrap();
writer.set_merge_policy(Box::new(NoMergePolicy));
for num_docs in &docs_per_segment {
for _ in 0..*num_docs {
writer.add_document(doc!(text => "x")).unwrap();
}
writer.commit().unwrap();
}
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let total_docs: usize = docs_per_segment.iter().sum();
// Full result set, first assert all scores are identical.
let full_with_scores: Vec<(Score, DocAddress)> = searcher
.search(&AllQuery, &TopDocs::with_limit(total_docs))
.unwrap();
// Sanity: at least one document was returned.
prop_assert!(!full_with_scores.is_empty());
let first_score = full_with_scores[0].0;
prop_assert!(full_with_scores.iter().all(|(score, _)| *score == first_score));
// Keep only the addresses for the remaining checks.
let full: Vec<DocAddress> = full_with_scores
.into_iter()
.map(|(_score, addr)| addr)
.collect();
// Sanity: we actually created multiple segments and have documents.
prop_assert!(docs_per_segment.len() >= 2);
prop_assert!(total_docs >= 2);
// 1) Increasing limit should preserve prefix ordering.
for k in 1..=total_docs {
let page: Vec<DocAddress> = searcher
.search(&AllQuery, &TopDocs::with_limit(k))
.unwrap()
.into_iter()
.map(|(_score, addr)| addr)
.collect();
prop_assert_eq!(page, full[..k].to_vec());
}
// 2) Offset + limit pages should always match the corresponding slice.
// For each offset, check three representative page sizes:
// - first page (size 1)
// - a middle page (roughly half of remaining)
// - the last page (size = remaining)
for offset in 0..total_docs {
let remaining = total_docs - offset;
let assert_page_eq = |limit: usize| -> proptest::test_runner::TestCaseResult {
let page: Vec<DocAddress> = searcher
.search(&AllQuery, &TopDocs::with_limit(limit).and_offset(offset))
.unwrap()
.into_iter()
.map(|(_score, addr)| addr)
.collect();
prop_assert_eq!(page, full[offset..offset + limit].to_vec());
Ok(())
};
// Smallest page.
assert_page_eq(1)?;
// A middle-sized page (dedupes to 1 if remaining == 1).
assert_page_eq((remaining / 2).max(1))?;
// Largest page for this offset.
assert_page_eq(remaining)?;
}
// 3) Concatenating fixed-size pages by offset reproduces the full order.
for page_size in 1..=total_docs.min(5) {
let mut concat: Vec<DocAddress> = Vec::new();
let mut offset = 0;
while offset < total_docs {
let size = page_size.min(total_docs - offset);
let page: Vec<DocAddress> = searcher
.search(&AllQuery, &TopDocs::with_limit(size).and_offset(offset))
.unwrap()
.into_iter()
.map(|(_score, addr)| addr)
.collect();
concat.extend(page);
offset += size;
}
// Avoid moving `full` across loop iterations.
prop_assert_eq!(concat, full.clone());
}
}
}
proptest! {
#![proptest_config(ProptestConfig::with_cases(20))]
/// Build multiple segments with same-scoring term matches and verify stable ordering
/// across pages for a real scoring query (TermQuery with identical TF and fieldnorm).
#[test]
fn proptest_stable_ordering_across_segments_with_term_query_and_pagination(
docs_per_segment in proptest::collection::vec(1usize..50, 2..5)
) {
use crate::indexer::NoMergePolicy;
use crate::schema::IndexRecordOption;
use crate::query::TermQuery;
use crate::Term;
// Build an index with multiple segments; each doc has exactly one token "x",
// ensuring equal BM25 scores across all matching docs (same TF=1 and fieldnorm=1).
let mut schema_builder = Schema::builder();
let text = schema_builder.add_text_field("text", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut writer = index.writer_for_tests().unwrap();
writer.set_merge_policy(Box::new(NoMergePolicy));
for num_docs in &docs_per_segment {
for _ in 0..*num_docs {
writer.add_document(doc!(text => "x")).unwrap();
}
writer.commit().unwrap();
}
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let total_docs: usize = docs_per_segment.iter().sum();
let term = Term::from_field_text(text, "x");
let tq = TermQuery::new(term, IndexRecordOption::WithFreqs);
// Full result set, first assert all scores are identical across docs.
let full_with_scores: Vec<(Score, DocAddress)> = searcher
.search(&tq, &TopDocs::with_limit(total_docs))
.unwrap();
// Sanity: at least one document was returned.
prop_assert!(!full_with_scores.is_empty());
let first_score = full_with_scores[0].0;
prop_assert!(full_with_scores.iter().all(|(score, _)| *score == first_score));
// Keep only the addresses for the remaining checks.
let full: Vec<DocAddress> = full_with_scores
.into_iter()
.map(|(_score, addr)| addr)
.collect();
// Sanity: we actually created multiple segments and have documents.
prop_assert!(docs_per_segment.len() >= 2);
prop_assert!(total_docs >= 2);
// 1) Increasing limit should preserve prefix ordering.
for k in 1..=total_docs {
let page: Vec<DocAddress> = searcher
.search(&tq, &TopDocs::with_limit(k))
.unwrap()
.into_iter()
.map(|(_score, addr)| addr)
.collect();
prop_assert_eq!(page, full[..k].to_vec());
}
// 2) Offset + limit pages should always match the corresponding slice.
// Check three representative page sizes for each offset: 1, ~half, and remaining.
for offset in 0..total_docs {
let remaining = total_docs - offset;
let assert_page_eq = |limit: usize| -> proptest::test_runner::TestCaseResult {
let page: Vec<DocAddress> = searcher
.search(&tq, &TopDocs::with_limit(limit).and_offset(offset))
.unwrap()
.into_iter()
.map(|(_score, addr)| addr)
.collect();
prop_assert_eq!(page, full[offset..offset + limit].to_vec());
Ok(())
};
assert_page_eq(1)?;
assert_page_eq((remaining / 2).max(1))?;
assert_page_eq(remaining)?;
}
// 3) Concatenating fixed-size pages by offset reproduces the full order.
for page_size in 1..=total_docs.min(5) {
let mut concat: Vec<DocAddress> = Vec::new();
let mut offset = 0;
while offset < total_docs {
let size = page_size.min(total_docs - offset);
let page: Vec<DocAddress> = searcher
.search(&tq, &TopDocs::with_limit(size).and_offset(offset))
.unwrap()
.into_iter()
.map(|(_score, addr)| addr)
.collect();
concat.extend(page);
offset += size;
}
prop_assert_eq!(concat, full.clone());
}
}
}
#[test]
#[should_panic]
fn test_top_0() {
@@ -1655,160 +1214,6 @@ mod tests {
Ok(())
}
#[test]
fn test_top_field_collector_string() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let city = schema_builder.add_text_field("city", TEXT | FAST);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_for_tests()?;
index_writer.add_document(doc!(
city => "austin",
))?;
index_writer.add_document(doc!(
city => "greenville",
))?;
index_writer.add_document(doc!(
city => "tokyo",
))?;
index_writer.commit()?;
fn query(
index: &Index,
order: Order,
limit: usize,
offset: usize,
) -> crate::Result<Vec<(String, DocAddress)>> {
let searcher = index.reader()?.searcher();
let top_collector = TopDocs::with_limit(limit)
.and_offset(offset)
.order_by_string_fast_field("city", order);
searcher.search(&AllQuery, &top_collector)
}
assert_eq!(
&query(&index, Order::Desc, 3, 0)?,
&[
("tokyo".to_owned(), DocAddress::new(0, 2)),
("greenville".to_owned(), DocAddress::new(0, 1)),
("austin".to_owned(), DocAddress::new(0, 0)),
]
);
assert_eq!(
&query(&index, Order::Desc, 2, 0)?,
&[
("tokyo".to_owned(), DocAddress::new(0, 2)),
("greenville".to_owned(), DocAddress::new(0, 1)),
]
);
assert_eq!(&query(&index, Order::Desc, 3, 3)?, &[]);
assert_eq!(
&query(&index, Order::Desc, 2, 1)?,
&[
("greenville".to_owned(), DocAddress::new(0, 1)),
("austin".to_owned(), DocAddress::new(0, 0)),
]
);
assert_eq!(
&query(&index, Order::Asc, 3, 0)?,
&[
("austin".to_owned(), DocAddress::new(0, 0)),
("greenville".to_owned(), DocAddress::new(0, 1)),
("tokyo".to_owned(), DocAddress::new(0, 2)),
]
);
assert_eq!(
&query(&index, Order::Asc, 2, 1)?,
&[
("greenville".to_owned(), DocAddress::new(0, 1)),
("tokyo".to_owned(), DocAddress::new(0, 2)),
]
);
assert_eq!(
&query(&index, Order::Asc, 2, 0)?,
&[
("austin".to_owned(), DocAddress::new(0, 0)),
("greenville".to_owned(), DocAddress::new(0, 1)),
]
);
assert_eq!(&query(&index, Order::Asc, 3, 3)?, &[]);
Ok(())
}
proptest! {
#[test]
fn test_top_field_collect_string_prop(
order in prop_oneof!(Just(Order::Desc), Just(Order::Asc)),
limit in 1..256_usize,
offset in 0..256_usize,
segments_terms in
proptest::collection::vec(
proptest::collection::vec(0..32_u8, 1..32_usize),
0..8_usize,
)
) {
let mut schema_builder = Schema::builder();
let city = schema_builder.add_text_field("city", TEXT | FAST);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_for_tests()?;
// A Vec<Vec<u8>>, where the outer Vec represents segments, and the inner Vec
// represents terms.
for segment_terms in segments_terms.into_iter() {
for term in segment_terms.into_iter() {
let term = format!("{term:0>3}");
index_writer.add_document(doc!(
city => term,
))?;
}
index_writer.commit()?;
}
let searcher = index.reader()?.searcher();
let top_n_results = searcher.search(&AllQuery, &TopDocs::with_limit(limit)
.and_offset(offset)
.order_by_string_fast_field("city", order.clone()))?;
let all_results = searcher.search(&AllQuery, &DocSetCollector)?.into_iter().map(|doc_address| {
// Get the term for this address.
// NOTE: We can't determine the SegmentIds that will be generated for Segments
// ahead of time, so we can't pre-compute the expected `DocAddress`es.
let column = searcher.segment_readers()[doc_address.segment_ord as usize].fast_fields().str("city").unwrap().unwrap();
let term_ord = column.term_ords(doc_address.doc_id).next().unwrap();
let mut city = Vec::new();
column.dictionary().ord_to_term(term_ord, &mut city).unwrap();
(String::try_from(city).unwrap(), doc_address)
});
// Using the TopDocs collector should always be equivalent to sorting, skipping the
// offset, and then taking the limit.
let sorted_docs: Vec<_> = if order.is_desc() {
let mut comparable_docs: Vec<ComparableDoc<_, _, true>> =
all_results.into_iter().map(|(feature, doc)| ComparableDoc { feature, doc}).collect();
comparable_docs.sort();
comparable_docs.into_iter().map(|cd| (cd.feature, cd.doc)).collect()
} else {
let mut comparable_docs: Vec<ComparableDoc<_, _, false>> =
all_results.into_iter().map(|(feature, doc)| ComparableDoc { feature, doc}).collect();
comparable_docs.sort();
comparable_docs.into_iter().map(|cd| (cd.feature, cd.doc)).collect()
};
let expected_docs = sorted_docs.into_iter().skip(offset).take(limit).collect::<Vec<_>>();
prop_assert_eq!(
expected_docs,
top_n_results
);
}
}
#[test]
#[should_panic]
fn test_field_does_not_exist() {
@@ -1968,29 +1373,4 @@ mod tests {
);
Ok(())
}
#[test]
fn test_topn_computer_asc() {
let mut computer: TopNComputer<u32, u32, false> = TopNComputer::new(2);
computer.push(1u32, 1u32);
computer.push(2u32, 2u32);
computer.push(3u32, 3u32);
computer.push(2u32, 4u32);
computer.push(4u32, 5u32);
computer.push(1u32, 6u32);
assert_eq!(
computer.into_sorted_vec(),
&[
ComparableDoc {
feature: 1u32,
doc: 1u32,
},
ComparableDoc {
feature: 1u32,
doc: 6u32,
}
]
);
}
}

View File

@@ -30,7 +30,7 @@ fn create_format() {
}
fn path_for_version(version: &str) -> String {
format!("./tests/compat_tests_data/index_v{version}/")
format!("./tests/compat_tests_data/index_v{}/", version)
}
/// feature flag quickwit uses a different dictionary type

View File

@@ -41,12 +41,16 @@ impl Executor {
///
/// Regardless of the executor (`SingleThread` or `ThreadPool`), panics in the task
/// will propagate to the caller.
pub fn map<A, R, F>(&self, f: F, args: impl Iterator<Item = A>) -> crate::Result<Vec<R>>
where
pub fn map<
A: Send,
R: Send,
AIterator: Iterator<Item = A>,
F: Sized + Sync + Fn(A) -> crate::Result<R>,
{
>(
&self,
f: F,
args: AIterator,
) -> crate::Result<Vec<R>> {
match self {
Executor::SingleThread => args.map(f).collect::<crate::Result<_>>(),
Executor::ThreadPool(pool) => {
@@ -65,7 +69,8 @@ impl Executor {
if let Err(err) = fruit_sender_ref.send((idx, fruit)) {
error!(
"Failed to send search task. It probably means all search \
threads have panicked. {err:?}"
threads have panicked. {:?}",
err
);
}
});

View File

@@ -1,4 +1,3 @@
use columnar::NumericalValue;
use common::json_path_writer::{JSON_END_OF_PATH, JSON_PATH_SEGMENT_SEP};
use common::{replace_in_place, JsonPathWriter};
use rustc_hash::FxHashMap;
@@ -153,7 +152,7 @@ pub(crate) fn index_json_value<'a, V: Value<'a>>(
if let Ok(i64_val) = val.try_into() {
term_buffer.append_type_and_fast_value::<i64>(i64_val);
} else {
term_buffer.append_type_and_fast_value::<u64>(val);
term_buffer.append_type_and_fast_value(val);
}
postings_writer.subscribe(doc, 0u32, term_buffer, ctx);
}
@@ -167,30 +166,12 @@ pub(crate) fn index_json_value<'a, V: Value<'a>>(
postings_writer.subscribe(doc, 0u32, term_buffer, ctx);
}
ReferenceValueLeaf::F64(val) => {
if !val.is_finite() {
return;
};
set_path_id(
term_buffer,
ctx.path_to_unordered_id
.get_or_allocate_unordered_id(json_path_writer.as_str()),
);
// Normalize here is important.
// In the inverted index, we coerce all numerical values to their canonical
// representation.
//
// (We do the same thing on the query side)
match NumericalValue::F64(val).normalize() {
NumericalValue::I64(val_i64) => {
term_buffer.append_type_and_fast_value::<i64>(val_i64);
}
NumericalValue::U64(val_u64) => {
term_buffer.append_type_and_fast_value::<u64>(val_u64);
}
NumericalValue::F64(val_f64) => {
term_buffer.append_type_and_fast_value::<f64>(val_f64);
}
}
term_buffer.append_type_and_fast_value(val);
postings_writer.subscribe(doc, 0u32, term_buffer, ctx);
}
ReferenceValueLeaf::Bool(val) => {
@@ -260,8 +241,8 @@ pub(crate) fn index_json_value<'a, V: Value<'a>>(
///
/// The term must be json + JSON path.
pub fn convert_to_fast_value_and_append_to_json_term(
term: &Term,
text: &str,
mut term: Term,
phrase: &str,
truncate_date_for_search: bool,
) -> Option<Term> {
assert_eq!(
@@ -273,50 +254,31 @@ pub fn convert_to_fast_value_and_append_to_json_term(
0,
"JSON value bytes should be empty"
);
try_convert_to_datetime_and_append_to_json_term(term, text, truncate_date_for_search)
.or_else(|| try_convert_to_number_and_append_to_json_term(term, text))
.or_else(|| try_convert_to_bool_and_append_to_json_term_typed(term, text))
}
fn try_convert_to_datetime_and_append_to_json_term(
term: &Term,
text: &str,
truncate_date_for_search: bool,
) -> Option<Term> {
let dt = OffsetDateTime::parse(text, &Rfc3339).ok()?;
let mut dt = DateTime::from_utc(dt.to_offset(UtcOffset::UTC));
if truncate_date_for_search {
dt = dt.truncate(DATE_TIME_PRECISION_INDEXED);
if let Ok(dt) = OffsetDateTime::parse(phrase, &Rfc3339) {
let mut dt = DateTime::from_utc(dt.to_offset(UtcOffset::UTC));
if truncate_date_for_search {
dt = dt.truncate(DATE_TIME_PRECISION_INDEXED);
}
term.append_type_and_fast_value(dt);
return Some(term);
}
let mut term_clone = term.clone();
term_clone.append_type_and_fast_value(dt);
Some(term_clone)
}
fn try_convert_to_number_and_append_to_json_term(term: &Term, text: &str) -> Option<Term> {
let numerical_value: NumericalValue = str::parse::<NumericalValue>(text).ok()?;
let mut term_clone = term.clone();
// Parse is actually returning normalized values already today, but let's not
// not rely on that hidden contract.
match numerical_value.normalize() {
NumericalValue::I64(i64_value) => {
term_clone.append_type_and_fast_value::<i64>(i64_value);
}
NumericalValue::U64(u64_value) => {
term_clone.append_type_and_fast_value::<u64>(u64_value);
}
NumericalValue::F64(f64_value) => {
term_clone.append_type_and_fast_value::<f64>(f64_value);
}
if let Ok(i64_val) = str::parse::<i64>(phrase) {
term.append_type_and_fast_value(i64_val);
return Some(term);
}
Some(term_clone)
}
fn try_convert_to_bool_and_append_to_json_term_typed(term: &Term, text: &str) -> Option<Term> {
let val = str::parse::<bool>(text).ok()?;
let mut term_clone = term.clone();
term_clone.append_type_and_fast_value(val);
Some(term_clone)
if let Ok(u64_val) = str::parse::<u64>(phrase) {
term.append_type_and_fast_value(u64_val);
return Some(term);
}
if let Ok(f64_val) = str::parse::<f64>(phrase) {
term.append_type_and_fast_value(f64_val);
return Some(term);
}
if let Ok(bool_val) = str::parse::<bool>(phrase) {
term.append_type_and_fast_value(bool_val);
return Some(term);
}
None
}
/// Splits a json path supplied to the query parser in such a way that

View File

@@ -214,7 +214,7 @@ impl Searcher {
/// It is powerless at making search faster if your index consists in
/// one large segment.
///
/// Also, keep in mind multithreading a single query on several
/// Also, keep in my multithreading a single query on several
/// threads will not improve your throughput. It can actually
/// hurt it. It will however, decrease the average response time.
pub fn search_with_executor<C: Collector>(

View File

@@ -56,7 +56,7 @@ impl<T: Send + Sync + 'static> From<Box<T>> for DirectoryLock {
impl Drop for DirectoryLockGuard {
fn drop(&mut self) {
if let Err(e) = self.directory.delete(&self.path) {
error!("Failed to remove the lock file. {e:?}");
error!("Failed to remove the lock file. {:?}", e);
}
}
}

View File

@@ -51,7 +51,7 @@ impl FileWatcher {
.map(|current_checksum| current_checksum != checksum)
.unwrap_or(true);
if metafile_has_changed {
info!("Meta file {path:?} was modified");
info!("Meta file {:?} was modified", path);
current_checksum_opt = Some(checksum);
// We actually ignore callbacks failing here.
// We just wait for the end of their execution.
@@ -75,7 +75,7 @@ impl FileWatcher {
let reader = match fs::File::open(path) {
Ok(f) => io::BufReader::new(f),
Err(e) => {
warn!("Failed to open meta file {path:?}: {e:?}");
warn!("Failed to open meta file {:?}: {:?}", path, e);
return Err(e);
}
};

View File

@@ -157,7 +157,7 @@ impl ManagedDirectory {
for file_to_delete in files_to_delete {
match self.delete(&file_to_delete) {
Ok(_) => {
info!("Deleted {file_to_delete:?}");
info!("Deleted {:?}", file_to_delete);
deleted_files.push(file_to_delete);
}
Err(file_error) => {
@@ -170,7 +170,7 @@ impl ManagedDirectory {
if !cfg!(target_os = "windows") {
// On windows, delete is expected to fail if the file
// is mmapped.
error!("Failed to delete {file_to_delete:?}");
error!("Failed to delete {:?}", file_to_delete);
}
}
}

View File

@@ -7,7 +7,7 @@ use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock, Weak};
use common::StableDeref;
use fs4::fs_std::FileExt;
use fs4::FileExt;
#[cfg(all(feature = "mmap", unix))]
pub use memmap2::Advice;
use memmap2::Mmap;
@@ -29,7 +29,7 @@ pub type WeakArcBytes = Weak<dyn Deref<Target = [u8]> + Send + Sync + 'static>;
/// Create a default io error given a string.
pub(crate) fn make_io_err(msg: String) -> io::Error {
io::Error::other(msg)
io::Error::new(io::ErrorKind::Other, msg)
}
/// Returns `None` iff the file exists, can be read, but is empty (and hence
@@ -369,7 +369,7 @@ pub(crate) fn atomic_write(path: &Path, content: &[u8]) -> io::Result<()> {
impl Directory for MmapDirectory {
fn get_file_handle(&self, path: &Path) -> Result<Arc<dyn FileHandle>, OpenReadError> {
debug!("Open Read {path:?}");
debug!("Open Read {:?}", path);
let full_path = self.resolve_path(path);
let mut mmap_cache = self.inner.mmap_cache.write().map_err(|_| {
@@ -414,7 +414,7 @@ impl Directory for MmapDirectory {
}
fn open_write(&self, path: &Path) -> Result<WritePtr, OpenWriteError> {
debug!("Open Write {path:?}");
debug!("Open Write {:?}", path);
let full_path = self.resolve_path(path);
let open_res = OpenOptions::new()
@@ -467,7 +467,7 @@ impl Directory for MmapDirectory {
}
fn atomic_write(&self, path: &Path, content: &[u8]) -> io::Result<()> {
debug!("Atomic Write {path:?}");
debug!("Atomic Write {:?}", path);
let full_path = self.resolve_path(path);
atomic_write(&full_path, content)?;
Ok(())
@@ -484,8 +484,8 @@ impl Directory for MmapDirectory {
.map_err(LockError::wrap_io_error)?;
if lock.is_blocking {
file.lock_exclusive().map_err(LockError::wrap_io_error)?;
} else if !file.try_lock_exclusive().map_err(|_| LockError::LockBusy)? {
return Err(LockError::LockBusy);
} else {
file.try_lock_exclusive().map_err(|_| LockError::LockBusy)?
}
// dropping the file handle will release the lock.
Ok(DirectoryLock::from(Box::new(ReleaseLockFile {

View File

@@ -191,7 +191,7 @@ impl Directory for RamDirectory {
.fs
.read()
.map_err(|e| OpenReadError::IoError {
io_error: Arc::new(io::Error::other(e.to_string())),
io_error: Arc::new(io::Error::new(io::ErrorKind::Other, e.to_string())),
filepath: path.to_path_buf(),
})?
.exists(path))

View File

@@ -90,7 +90,10 @@ impl WatchCallbackList {
let _ = sender.send(Ok(()));
});
if let Err(err) = spawn_res {
error!("Failed to spawn thread to call watch callbacks. Cause: {err:?}");
error!(
"Failed to spawn thread to call watch callbacks. Cause: {:?}",
err
);
}
result
}

View File

@@ -942,7 +942,7 @@ mod tests {
let numbers = [100, 200, 300];
let test_range = |range: RangeInclusive<u64>| {
let expected_count = numbers.iter().filter(|num| range.contains(*num)).count();
let expected_count = numbers.iter().filter(|num| range.contains(num)).count();
let mut vec = vec![];
field.get_row_ids_for_value_range(range, 0..u32::MAX, &mut vec);
assert_eq!(vec.len(), expected_count);
@@ -1020,7 +1020,7 @@ mod tests {
let numbers = [1000, 1001, 1003];
let test_range = |range: RangeInclusive<u64>| {
let expected_count = numbers.iter().filter(|num| range.contains(*num)).count();
let expected_count = numbers.iter().filter(|num| range.contains(num)).count();
let mut vec = vec![];
field.get_row_ids_for_value_range(range, 0..u32::MAX, &mut vec);
assert_eq!(vec.len(), expected_count);

View File

@@ -216,7 +216,7 @@ impl IndexBuilder {
/// Opens or creates a new index in the provided directory
pub fn open_or_create<T: Into<Box<dyn Directory>>>(self, dir: T) -> crate::Result<Index> {
let dir: Box<dyn Directory> = dir.into();
let dir = dir.into();
if !Index::exists(&*dir)? {
return self.create(dir);
}
@@ -494,7 +494,7 @@ impl Index {
.into_iter()
.map(|segment| SegmentReader::open(&segment)?.fields_metadata())
.collect::<Result<_, _>>()?;
Ok(merge_field_meta_data(fields_metadata))
Ok(merge_field_meta_data(fields_metadata, &self.schema()))
}
/// Creates a new segment_meta (Advanced user only).

View File

@@ -1,7 +1,8 @@
use std::io;
use common::json_path_writer::JSON_END_OF_PATH;
use common::{BinarySerializable, ByteCount};
use common::BinarySerializable;
use fnv::FnvHashSet;
#[cfg(feature = "quickwit")]
use futures_util::{FutureExt, StreamExt, TryStreamExt};
#[cfg(feature = "quickwit")]
@@ -35,33 +36,6 @@ pub struct InvertedIndexReader {
total_num_tokens: u64,
}
/// Object that records the amount of space used by a field in an inverted index.
pub(crate) struct InvertedIndexFieldSpace {
pub field_name: String,
pub field_type: Type,
pub postings_size: ByteCount,
pub positions_size: ByteCount,
pub num_terms: u64,
}
/// Returns None if the term is not a valid JSON path.
fn extract_field_name_and_field_type_from_json_path(term: &[u8]) -> Option<(String, Type)> {
let index = term.iter().position(|&byte| byte == JSON_END_OF_PATH)?;
let field_type_code = term.get(index + 1).copied()?;
let field_type = Type::from_code(field_type_code)?;
// Let's flush the current field.
let field_name = String::from_utf8_lossy(&term[..index]).to_string();
Some((field_name, field_type))
}
impl InvertedIndexFieldSpace {
fn record(&mut self, term_info: &TermInfo) {
self.postings_size += ByteCount::from(term_info.posting_num_bytes() as u64);
self.positions_size += ByteCount::from(term_info.positions_num_bytes() as u64);
self.num_terms += 1;
}
}
impl InvertedIndexReader {
pub(crate) fn new(
termdict: TermDictionary,
@@ -107,56 +81,20 @@ impl InvertedIndexReader {
///
/// Notice: This requires a full scan and therefore **very expensive**.
/// TODO: Move to sstable to use the index.
pub(crate) fn list_encoded_json_fields(&self) -> io::Result<Vec<InvertedIndexFieldSpace>> {
pub fn list_encoded_fields(&self) -> io::Result<Vec<(String, Type)>> {
let mut stream = self.termdict.stream()?;
let mut fields: Vec<InvertedIndexFieldSpace> = Vec::new();
let mut current_field_opt: Option<InvertedIndexFieldSpace> = None;
// Current field bytes, including the JSON_END_OF_PATH.
let mut current_field_bytes: Vec<u8> = Vec::new();
while let Some((term, term_info)) = stream.next() {
if let Some(current_field) = &mut current_field_opt {
if term.starts_with(&current_field_bytes) {
// We are still in the same field.
current_field.record(term_info);
continue;
let mut fields = Vec::new();
let mut fields_set = FnvHashSet::default();
while let Some((term, _term_info)) = stream.next() {
if let Some(index) = term.iter().position(|&byte| byte == JSON_END_OF_PATH) {
if !fields_set.contains(&term[..index + 2]) {
fields_set.insert(term[..index + 2].to_vec());
let typ = Type::from_code(term[index + 1]).unwrap();
fields.push((String::from_utf8_lossy(&term[..index]).to_string(), typ));
}
}
// This is a new field!
// Let's flush the current field.
fields.extend(current_field_opt.take());
current_field_bytes.clear();
// And create a new one.
let Some((field_name, field_type)) =
extract_field_name_and_field_type_from_json_path(term)
else {
error!(
"invalid term bytes encountered {term:?}. this only happens if the term \
dictionary is corrupted. please report"
);
continue;
};
let mut field_space = InvertedIndexFieldSpace {
field_name,
field_type,
postings_size: ByteCount::default(),
positions_size: ByteCount::default(),
num_terms: 0u64,
};
field_space.record(term_info);
// We include the json type and the json end of path to make sure the prefix check
// is meaningful.
current_field_bytes.extend_from_slice(&term[..field_space.field_name.len() + 2]);
current_field_opt = Some(field_space);
}
// We need to flush the last field as well.
fields.extend(current_field_opt.take());
Ok(fields)
}

View File

@@ -1,8 +1,8 @@
use std::collections::HashMap;
use std::ops::BitOrAssign;
use std::sync::{Arc, RwLock};
use std::{fmt, io};
use common::{ByteCount, HasLen};
use fnv::FnvHashMap;
use itertools::Itertools;
@@ -304,16 +304,12 @@ impl SegmentReader {
for (field, field_entry) in self.schema().fields() {
let field_name = field_entry.name().to_string();
let is_indexed = field_entry.is_indexed();
if is_indexed {
let is_json = field_entry.field_type().value_type() == Type::Json;
if is_json {
let term_dictionary_json_field_num_bytes: u64 = self
.termdict_composite
.open_read(field)
.map(|file_slice| file_slice.len() as u64)
.unwrap_or(0u64);
let inv_index = self.inverted_index(field)?;
let encoded_fields_in_index = inv_index.list_encoded_json_fields()?;
let encoded_fields_in_index = inv_index.list_encoded_fields()?;
let mut build_path = |field_name: &str, mut json_path: String| {
// In this case we need to map the potential fast field to the field name
// accepted by the query parser.
@@ -332,65 +328,30 @@ impl SegmentReader {
format!("{field_name}.{json_path}")
}
};
let total_num_terms = encoded_fields_in_index
.iter()
.map(|field_space| field_space.num_terms)
.sum();
indexed_fields.extend(encoded_fields_in_index.into_iter().map(|field_space| {
let field_name = build_path(&field_name, field_space.field_name);
// It is complex to attribute the exact amount of bytes required by specific
// field in the json field. Instead, as a proxy, we
// attribute the total amount of bytes for the entire json field,
// proportionally to the number of terms in each
// fields.
let term_dictionary_size = (term_dictionary_json_field_num_bytes
* field_space.num_terms)
.checked_div(total_num_terms)
.unwrap_or(0);
FieldMetadata {
postings_size: Some(field_space.postings_size),
positions_size: Some(field_space.positions_size),
term_dictionary_size: Some(ByteCount::from(term_dictionary_size)),
fast_size: None,
// The stored flag will be set at the end of this function!
stored: field_entry.is_stored(),
field_name,
typ: field_space.field_type,
}
}));
indexed_fields.extend(
encoded_fields_in_index
.into_iter()
.map(|(name, typ)| (build_path(&field_name, name), typ))
.map(|(field_name, typ)| FieldMetadata {
indexed: true,
stored: false,
field_name,
fast: false,
typ,
}),
);
} else {
let postings_size: ByteCount = self
.postings_composite
.open_read(field)
.map(|posting_fileslice| posting_fileslice.len())
.unwrap_or(0)
.into();
let positions_size: ByteCount = self
.positions_composite
.open_read(field)
.map(|positions_fileslice| positions_fileslice.len())
.unwrap_or(0)
.into();
let term_dictionary_size: ByteCount = self
.termdict_composite
.open_read(field)
.map(|term_dictionary_fileslice| term_dictionary_fileslice.len())
.unwrap_or(0)
.into();
indexed_fields.push(FieldMetadata {
indexed: true,
stored: false,
field_name: field_name.to_string(),
fast: false,
typ: field_entry.field_type().value_type(),
// The stored flag will be set at the end of this function!
stored: field_entry.is_stored(),
fast_size: None,
term_dictionary_size: Some(term_dictionary_size),
postings_size: Some(postings_size),
positions_size: Some(positions_size),
});
}
}
}
let fast_fields: Vec<FieldMetadata> = self
let mut fast_fields: Vec<FieldMetadata> = self
.fast_fields()
.columnar()
.iter_columns()?
@@ -402,21 +363,23 @@ impl SegmentReader {
.get(&field_name)
.unwrap_or(&field_name)
.to_string();
let stored = is_field_stored(&field_name, &self.schema);
FieldMetadata {
indexed: false,
stored: false,
field_name,
fast: true,
typ: Type::from(handle.column_type()),
stored,
fast_size: Some(handle.num_bytes()),
term_dictionary_size: None,
postings_size: None,
positions_size: None,
}
})
.collect();
let merged_field_metadatas: Vec<FieldMetadata> =
merge_field_meta_data(vec![indexed_fields, fast_fields]);
Ok(merged_field_metadatas)
// Since the type is encoded differently in the fast field and in the inverted index,
// the order of the fields is not guaranteed to be the same. Therefore, we sort the fields.
// If we are sure that the order is the same, we can remove this sort.
indexed_fields.sort_unstable();
fast_fields.sort_unstable();
let merged = merge_field_meta_data(vec![indexed_fields, fast_fields], &self.schema);
Ok(merged)
}
/// Returns the segment id
@@ -480,47 +443,20 @@ pub struct FieldMetadata {
// Notice: Don't reorder the declaration of 1.field_name 2.typ, as it is used for ordering by
// field_name then typ.
pub typ: Type,
/// Is the field indexed for search
pub indexed: bool,
/// Is the field stored in the doc store
pub stored: bool,
/// Size occupied in the columnar storage (None if not fast)
pub fast_size: Option<ByteCount>,
/// term_dictionary
pub term_dictionary_size: Option<ByteCount>,
/// Size occupied in the index postings storage (None if not indexed)
pub postings_size: Option<ByteCount>,
/// Size occupied in the index postings storage (None if positions are not recorded)
pub positions_size: Option<ByteCount>,
/// Is the field stored in the columnar storage
pub fast: bool,
}
fn merge_options(left: Option<ByteCount>, right: Option<ByteCount>) -> Option<ByteCount> {
match (left, right) {
(Some(l), Some(r)) => Some(l + r),
(None, right) => right,
(left, None) => left,
}
}
impl FieldMetadata {
/// Returns true if and only if the field is indexed.
pub fn is_indexed(&self) -> bool {
self.postings_size.is_some()
}
/// Returns true if and only if the field is a fast field (i.e.: recorded in columnar format).
pub fn is_fast(&self) -> bool {
self.fast_size.is_some()
}
/// Merges two field metadata.
pub fn merge(&mut self, rhs: Self) {
assert_eq!(self.field_name, rhs.field_name);
assert_eq!(self.typ, rhs.typ);
impl BitOrAssign for FieldMetadata {
fn bitor_assign(&mut self, rhs: Self) {
assert!(self.field_name == rhs.field_name);
assert!(self.typ == rhs.typ);
self.indexed |= rhs.indexed;
self.stored |= rhs.stored;
self.fast_size = merge_options(self.fast_size, rhs.fast_size);
self.term_dictionary_size =
merge_options(self.term_dictionary_size, rhs.term_dictionary_size);
self.postings_size = merge_options(self.postings_size, rhs.postings_size);
self.positions_size = merge_options(self.positions_size, rhs.positions_size);
self.fast |= rhs.fast;
}
}
@@ -533,29 +469,23 @@ fn is_field_stored(field_name: &str, schema: &Schema) -> bool {
}
/// Helper to merge the field metadata from multiple segments.
pub fn merge_field_meta_data(mut field_metadatas: Vec<Vec<FieldMetadata>>) -> Vec<FieldMetadata> {
// READ BEFORE REMOVING THIS!
//
// Because we replace field sep by `.`, fields are not always sorted.
// Also, to enforce such an implicit contract, we would have to add
// assert here.
//
// Sorting is linear time on pre-sorted data, so we are simply better off sorting data here.
for field_metadatas in &mut field_metadatas {
field_metadatas.sort_unstable();
}
pub fn merge_field_meta_data(
field_metadatas: Vec<Vec<FieldMetadata>>,
schema: &Schema,
) -> Vec<FieldMetadata> {
let mut merged_field_metadata = Vec::new();
for (_key, mut group) in &field_metadatas
.into_iter()
.kmerge()
.kmerge_by(|left, right| left < right)
// TODO: Remove allocation
.chunk_by(|el| (el.field_name.to_string(), el.typ))
{
let mut merged: FieldMetadata = group.next().unwrap();
for el in group {
merged.merge(el);
merged |= el;
}
// Currently is_field_stored is maybe too slow for the high cardinality case
merged.stored = is_field_stored(&merged.field_name, schema);
merged_field_metadata.push(merged);
}
merged_field_metadata
@@ -577,7 +507,7 @@ fn intersect_alive_bitset(
}
impl fmt::Debug for SegmentReader {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "SegmentReader({:?})", self.segment_id)
}
}
@@ -586,168 +516,122 @@ impl fmt::Debug for SegmentReader {
mod test {
use super::*;
use crate::index::Index;
use crate::schema::{Term, STORED, TEXT};
use crate::schema::{SchemaBuilder, Term, STORED, TEXT};
use crate::IndexWriter;
#[track_caller]
fn assert_merge(fields_metadatas: &[Vec<FieldMetadata>], expected: &[FieldMetadata]) {
use itertools::Itertools;
let num_els = fields_metadatas.len();
for permutation in fields_metadatas.iter().cloned().permutations(num_els) {
let res = merge_field_meta_data(permutation);
assert_eq!(&res, &expected);
}
}
#[test]
fn test_merge_field_meta_data_same_field() {
fn test_merge_field_meta_data_same() {
let schema = SchemaBuilder::new().build();
let field_metadata1 = FieldMetadata {
field_name: "a".to_string(),
typ: crate::schema::Type::Str,
indexed: true,
stored: false,
term_dictionary_size: Some(ByteCount::from(100u64)),
postings_size: Some(ByteCount::from(1_000u64)),
positions_size: Some(ByteCount::from(2_000u64)),
fast_size: Some(ByteCount::from(1_000u64).into()),
fast: true,
};
let field_metadata2 = FieldMetadata {
field_name: "a".to_string(),
typ: crate::schema::Type::Str,
indexed: true,
stored: false,
term_dictionary_size: Some(ByteCount::from(80u64)),
postings_size: Some(ByteCount::from(1_500u64)),
positions_size: Some(ByteCount::from(2_500u64)),
fast_size: Some(ByteCount::from(3_000u64).into()),
fast: true,
};
let expected = FieldMetadata {
field_name: "a".to_string(),
typ: crate::schema::Type::Str,
stored: false,
term_dictionary_size: Some(ByteCount::from(180u64)),
postings_size: Some(ByteCount::from(2_500u64)),
positions_size: Some(ByteCount::from(4_500u64)),
fast_size: Some(ByteCount::from(4_000u64).into()),
};
assert_merge(
&[vec![field_metadata1.clone()], vec![field_metadata2]],
&[expected],
let res = merge_field_meta_data(
vec![vec![field_metadata1.clone()], vec![field_metadata2]],
&schema,
);
assert_eq!(res, vec![field_metadata1]);
}
#[track_caller]
#[test]
fn test_merge_field_meta_data_different() {
let schema = SchemaBuilder::new().build();
let field_metadata1 = FieldMetadata {
field_name: "a".to_string(),
typ: crate::schema::Type::Str,
indexed: false,
stored: false,
fast_size: Some(1_000u64.into()),
term_dictionary_size: Some(100u64.into()),
postings_size: Some(2_000u64.into()),
positions_size: Some(4_000u64.into()),
fast: true,
};
let field_metadata2 = FieldMetadata {
field_name: "b".to_string(),
typ: crate::schema::Type::Str,
indexed: false,
stored: false,
fast_size: Some(1_002u64.into()),
term_dictionary_size: None,
postings_size: None,
positions_size: None,
fast: true,
};
let field_metadata3 = FieldMetadata {
field_name: "a".to_string(),
typ: crate::schema::Type::Str,
term_dictionary_size: Some(101u64.into()),
postings_size: Some(2_001u64.into()),
positions_size: Some(4_001u64.into()),
indexed: true,
stored: false,
fast_size: None,
fast: false,
};
let expected = vec![
FieldMetadata {
field_name: "a".to_string(),
typ: crate::schema::Type::Str,
stored: false,
term_dictionary_size: Some(201u64.into()),
postings_size: Some(4_001u64.into()),
positions_size: Some(8_001u64.into()),
fast_size: Some(1_000u64.into()),
},
FieldMetadata {
field_name: "b".to_string(),
typ: crate::schema::Type::Str,
stored: false,
term_dictionary_size: None,
postings_size: None,
positions_size: None,
fast_size: Some(1_002u64.into()),
},
];
assert_merge(
&[
let res = merge_field_meta_data(
vec![
vec![field_metadata1.clone(), field_metadata2.clone()],
vec![field_metadata3],
],
&expected,
&schema,
);
let field_metadata_expected1 = FieldMetadata {
field_name: "a".to_string(),
typ: crate::schema::Type::Str,
indexed: true,
stored: false,
fast: true,
};
assert_eq!(res, vec![field_metadata_expected1, field_metadata2.clone()]);
}
#[test]
fn test_merge_field_meta_data_merge() {
use pretty_assertions::assert_eq;
let get_meta_data = |name: &str, typ: Type| FieldMetadata {
field_name: name.to_string(),
typ,
term_dictionary_size: None,
postings_size: None,
positions_size: None,
indexed: false,
stored: false,
fast_size: Some(1u64.into()),
fast: true,
};
let metas = vec![get_meta_data("d", Type::Str), get_meta_data("e", Type::U64)];
assert_merge(
&[vec![get_meta_data("e", Type::Str)], metas],
&[
let schema = SchemaBuilder::new().build();
let mut metas = vec![get_meta_data("d", Type::Str), get_meta_data("e", Type::U64)];
metas.sort();
let res = merge_field_meta_data(vec![vec![get_meta_data("e", Type::Str)], metas], &schema);
assert_eq!(
res,
vec![
get_meta_data("d", Type::Str),
get_meta_data("e", Type::Str),
get_meta_data("e", Type::U64),
],
]
);
}
#[test]
fn test_merge_field_meta_data_bitxor() {
let field_metadata1 = FieldMetadata {
field_name: "a".to_string(),
typ: crate::schema::Type::Str,
term_dictionary_size: None,
postings_size: None,
positions_size: None,
indexed: false,
stored: false,
fast_size: Some(10u64.into()),
fast: true,
};
let field_metadata2 = FieldMetadata {
field_name: "a".to_string(),
typ: crate::schema::Type::Str,
term_dictionary_size: Some(10u64.into()),
postings_size: Some(11u64.into()),
positions_size: Some(12u64.into()),
indexed: true,
stored: false,
fast_size: None,
fast: false,
};
let field_metadata_expected = FieldMetadata {
field_name: "a".to_string(),
typ: crate::schema::Type::Str,
term_dictionary_size: Some(10u64.into()),
postings_size: Some(11u64.into()),
positions_size: Some(12u64.into()),
indexed: true,
stored: false,
fast_size: Some(10u64.into()),
fast: true,
};
let mut res1 = field_metadata1.clone();
res1.merge(field_metadata2.clone());
res1 |= field_metadata2.clone();
let mut res2 = field_metadata2.clone();
res2.merge(field_metadata1);
res2 |= field_metadata1;
assert_eq!(res1, field_metadata_expected);
assert_eq!(res2, field_metadata_expected);
}
@@ -778,7 +662,6 @@ mod test {
assert_eq!(4, searcher.segment_reader(0).max_doc());
Ok(())
}
#[test]
fn test_alive_docs_iterator() -> crate::Result<()> {
let mut schema_builder = Schema::builder();

Some files were not shown because too many files have changed in this diff Show More