mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-06 17:22:54 +00:00
Compare commits
64 Commits
dependabot
...
trinity.po
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
dedd1aa83a | ||
|
|
080fa4d1f4 | ||
|
|
988c2b35e7 | ||
|
|
bf3cc12610 | ||
|
|
a2400f4e73 | ||
|
|
436ec6caea | ||
|
|
4a6123d3ff | ||
|
|
5a2fe42c24 | ||
|
|
5379c99ea2 | ||
|
|
3fa90e70e2 | ||
|
|
6ab4102253 | ||
|
|
11c6329ca5 | ||
|
|
ab8bb93928 | ||
|
|
2b668bd2bf | ||
|
|
97a7137ef8 | ||
|
|
ffa7cdf397 | ||
|
|
caf1275e60 | ||
|
|
fb12b7be28 | ||
|
|
6f77083493 | ||
|
|
cd7745da7a | ||
|
|
eb8304dee9 | ||
|
|
e5638112a9 | ||
|
|
81110152fb | ||
|
|
ae88a7ece5 | ||
|
|
bdd5f80fd9 | ||
|
|
3f62ef22e5 | ||
|
|
8102e19e48 | ||
|
|
175c853ea7 | ||
|
|
c992cf3f37 | ||
|
|
83f6c2f265 | ||
|
|
17bf8aa092 | ||
|
|
6fc0e96ff8 | ||
|
|
06d2dcf469 | ||
|
|
b681ec9335 | ||
|
|
da2ff5712a | ||
|
|
18da402e27 | ||
|
|
18ae3ffe94 | ||
|
|
0a37b7acaa | ||
|
|
1a9fd885dd | ||
|
|
3e660905a7 | ||
|
|
0c2b984cb4 | ||
|
|
a69b1c609c | ||
|
|
8d4a6fcaba | ||
|
|
feced4762f | ||
|
|
0149317c5a | ||
|
|
3fcb6f9597 | ||
|
|
388fcd763b | ||
|
|
e488f9e6a2 | ||
|
|
9426d5be7b | ||
|
|
d5d2d41264 | ||
|
|
80f5f1ecd4 | ||
|
|
519e5d2ed1 | ||
|
|
df2d52a84e | ||
|
|
371dba9414 | ||
|
|
0afabad494 | ||
|
|
89b052cd42 | ||
|
|
c48c649436 | ||
|
|
58c0739953 | ||
|
|
e7daf69de9 | ||
|
|
f060e86bc6 | ||
|
|
0368162ef0 | ||
|
|
e843c71015 | ||
|
|
5cea16ef9f | ||
|
|
4d4ee1b0ac |
12
CHANGELOG.md
12
CHANGELOG.md
@@ -1,12 +1,14 @@
|
||||
Tantivy 0.23 - Unreleased
|
||||
Tantivy 0.24
|
||||
================================
|
||||
Tantivy 0.23 will be backwards compatible with indices created with v0.22 and v0.21. The new minimum rust version will be 1.75.
|
||||
Tantivy 0.24 will be backwards compatible with indices created with v0.22 and v0.21. The new minimum rust version will be 1.75. Tantivy 0.23 will be skipped.
|
||||
|
||||
#### Bugfixes
|
||||
- fix potential endless loop in merge [#2457](https://github.com/quickwit-oss/tantivy/pull/2457)(@PSeitz)
|
||||
- fix bug that causes out-of-order sstable key. [#2445](https://github.com/quickwit-oss/tantivy/pull/2445)(@fulmicoton)
|
||||
- fix ReferenceValue API flaw [#2372](https://github.com/quickwit-oss/tantivy/pull/2372)(@PSeitz)
|
||||
- fix `OwnedBytes` debug panic [#2512](https://github.com/quickwit-oss/tantivy/pull/2512)(@b41sh)
|
||||
- catch panics during merges [#2582](https://github.com/quickwit-oss/tantivy/pull/2582)(@rdettai)
|
||||
- switch from u32 to usize in bitpacker. This enables multivalued columns larger than 4GB, which crashed during merge before. [#2581](https://github.com/quickwit-oss/tantivy/pull/2581) [#2586](https://github.com/quickwit-oss/tantivy/pull/2586)(@fulmicoton-dd @PSeitz)
|
||||
|
||||
#### Breaking API Changes
|
||||
- remove index sorting [#2434](https://github.com/quickwit-oss/tantivy/pull/2434)(@PSeitz)
|
||||
@@ -24,6 +26,7 @@ Tantivy 0.23 will be backwards compatible with indices created with v0.22 and v0
|
||||
- reduce top hits memory consumption [#2426](https://github.com/quickwit-oss/tantivy/pull/2426)(@PSeitz)
|
||||
- check unsupported parameters top_hits [#2351](https://github.com/quickwit-oss/tantivy/pull/2351)(@PSeitz)
|
||||
- Change AggregationLimits to AggregationLimitsGuard [#2495](https://github.com/quickwit-oss/tantivy/pull/2495)(@PSeitz)
|
||||
- add support for counting non integer in aggregation [#2547](https://github.com/quickwit-oss/tantivy/pull/2547)(@trinity-1686a)
|
||||
- **Range Queries**
|
||||
- Support fast field range queries on json fields [#2456](https://github.com/quickwit-oss/tantivy/pull/2456)(@PSeitz)
|
||||
- Add support for str fast field range query [#2460](https://github.com/quickwit-oss/tantivy/pull/2460) [#2452](https://github.com/quickwit-oss/tantivy/pull/2452) [#2453](https://github.com/quickwit-oss/tantivy/pull/2453)(@PSeitz)
|
||||
@@ -34,7 +37,8 @@ Tantivy 0.23 will be backwards compatible with indices created with v0.22 and v0
|
||||
- add columnar format compatibility tests [#2433](https://github.com/quickwit-oss/tantivy/pull/2433)(@PSeitz)
|
||||
- Improved snippet ranges algorithm [#2474](https://github.com/quickwit-oss/tantivy/pull/2474)(@gezihuzi)
|
||||
- make find_field_with_default return json fields without path [#2476](https://github.com/quickwit-oss/tantivy/pull/2476)(@trinity-1686a)
|
||||
- feat(query): Make `BooleanQuery` support `minimum_number_should_match` [#2405](https://github.com/quickwit-oss/tantivy/pull/2405)(@LebranceBW)
|
||||
- Make `BooleanQuery` support `minimum_number_should_match` [#2405](https://github.com/quickwit-oss/tantivy/pull/2405)(@LebranceBW)
|
||||
- Make `NUM_MERGE_THREADS` configurable [#2535](https://github.com/quickwit-oss/tantivy/pull/2535)(@Barre)
|
||||
|
||||
- **RegexPhraseQuery**
|
||||
`RegexPhraseQuery` supports phrase queries with regex. E.g. query "b.* b.* wolf" matches "big bad wolf". Slop is supported as well: "b.* wolf"~2 matches "big bad wolf" [#2516](https://github.com/quickwit-oss/tantivy/pull/2516)(@PSeitz)
|
||||
@@ -60,7 +64,9 @@ This will slightly increase space and access time. [#2439](https://github.com/qu
|
||||
- fix de-escaping too much in query parser [#2427](https://github.com/quickwit-oss/tantivy/pull/2427)(@trinity-1686a)
|
||||
- improve query parser [#2416](https://github.com/quickwit-oss/tantivy/pull/2416)(@trinity-1686a)
|
||||
- Support field grouping `title:(return AND "pink panther")` [#2333](https://github.com/quickwit-oss/tantivy/pull/2333)(@trinity-1686a)
|
||||
- allow term starting with wildcard [#2568](https://github.com/quickwit-oss/tantivy/pull/2568)(@trinity-1686a)
|
||||
|
||||
- Exist queries match subpath fields [#2558](https://github.com/quickwit-oss/tantivy/pull/2558)(@rdettai)
|
||||
- add access benchmark for columnar [#2432](https://github.com/quickwit-oss/tantivy/pull/2432)(@PSeitz)
|
||||
- extend indexwriter proptests [#2342](https://github.com/quickwit-oss/tantivy/pull/2342)(@PSeitz)
|
||||
- add bench & test for columnar merging [#2428](https://github.com/quickwit-oss/tantivy/pull/2428)(@PSeitz)
|
||||
|
||||
27
Cargo.toml
27
Cargo.toml
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "tantivy"
|
||||
version = "0.23.0"
|
||||
version = "0.24.0"
|
||||
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
|
||||
license = "MIT"
|
||||
categories = ["database-implementations", "data-structures"]
|
||||
@@ -11,7 +11,7 @@ repository = "https://github.com/quickwit-oss/tantivy"
|
||||
readme = "README.md"
|
||||
keywords = ["search", "information", "retrieval"]
|
||||
edition = "2021"
|
||||
rust-version = "1.75"
|
||||
rust-version = "1.85"
|
||||
exclude = ["benches/*.json", "benches/*.txt"]
|
||||
|
||||
[dependencies]
|
||||
@@ -31,8 +31,8 @@ lz4_flex = { version = "0.11", default-features = false, optional = true }
|
||||
zstd = { version = "0.13", optional = true, default-features = false }
|
||||
tempfile = { version = "3.12.0", optional = true }
|
||||
log = "0.4.16"
|
||||
serde = { version = "1.0.136", features = ["derive"] }
|
||||
serde_json = "1.0.79"
|
||||
serde = { version = "1.0.219", features = ["derive"] }
|
||||
serde_json = "1.0.140"
|
||||
fs4 = { version = "0.8.0", optional = true }
|
||||
levenshtein_automata = "0.2.1"
|
||||
uuid = { version = "1.0.0", features = ["v4", "serde"] }
|
||||
@@ -57,13 +57,13 @@ measure_time = "0.9.0"
|
||||
arc-swap = "1.5.0"
|
||||
bon = "3.3.1"
|
||||
|
||||
columnar = { version = "0.3", path = "./columnar", package = "tantivy-columnar" }
|
||||
sstable = { version = "0.3", path = "./sstable", package = "tantivy-sstable", optional = true }
|
||||
stacker = { version = "0.3", path = "./stacker", package = "tantivy-stacker" }
|
||||
query-grammar = { version = "0.22.0", path = "./query-grammar", package = "tantivy-query-grammar" }
|
||||
tantivy-bitpacker = { version = "0.6", path = "./bitpacker" }
|
||||
common = { version = "0.7", path = "./common/", package = "tantivy-common" }
|
||||
tokenizer-api = { version = "0.3", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
|
||||
columnar = { version = "0.5", path = "./columnar", package = "tantivy-columnar" }
|
||||
sstable = { version = "0.5", path = "./sstable", package = "tantivy-sstable", optional = true }
|
||||
stacker = { version = "0.5", path = "./stacker", package = "tantivy-stacker" }
|
||||
query-grammar = { version = "0.24.0", path = "./query-grammar", package = "tantivy-query-grammar" }
|
||||
tantivy-bitpacker = { version = "0.8", path = "./bitpacker" }
|
||||
common = { version = "0.9", path = "./common/", package = "tantivy-common" }
|
||||
tokenizer-api = { version = "0.5", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
|
||||
sketches-ddsketch = { version = "0.3.0", features = ["use_serde"] }
|
||||
hyperloglogplus = { version = "0.4.1", features = ["const-loop"] }
|
||||
futures-util = { version = "0.3.28", optional = true }
|
||||
@@ -112,13 +112,16 @@ debug-assertions = true
|
||||
overflow-checks = true
|
||||
|
||||
[features]
|
||||
default = ["mmap", "stopwords", "lz4-compression"]
|
||||
default = ["mmap", "stopwords", "lz4-compression", "columnar-zstd-compression"]
|
||||
mmap = ["fs4", "tempfile", "memmap2"]
|
||||
stopwords = []
|
||||
|
||||
lz4-compression = ["lz4_flex"]
|
||||
zstd-compression = ["zstd"]
|
||||
|
||||
# enable zstd-compression in columnar (and sstable)
|
||||
columnar-zstd-compression = ["columnar/zstd-compression"]
|
||||
|
||||
failpoints = ["fail", "fail/failpoints"]
|
||||
unstable = [] # useful for benches.
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
[package]
|
||||
name = "tantivy-bitpacker"
|
||||
version = "0.6.0"
|
||||
edition = "2021"
|
||||
version = "0.8.0"
|
||||
edition = "2024"
|
||||
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
|
||||
license = "MIT"
|
||||
categories = []
|
||||
|
||||
@@ -65,7 +65,7 @@ impl BitPacker {
|
||||
|
||||
#[derive(Clone, Debug, Default, Copy)]
|
||||
pub struct BitUnpacker {
|
||||
num_bits: u32,
|
||||
num_bits: usize,
|
||||
mask: u64,
|
||||
}
|
||||
|
||||
@@ -83,7 +83,7 @@ impl BitUnpacker {
|
||||
(1u64 << num_bits) - 1u64
|
||||
};
|
||||
BitUnpacker {
|
||||
num_bits: u32::from(num_bits),
|
||||
num_bits: usize::from(num_bits),
|
||||
mask,
|
||||
}
|
||||
}
|
||||
@@ -94,14 +94,14 @@ impl BitUnpacker {
|
||||
|
||||
#[inline]
|
||||
pub fn get(&self, idx: u32, data: &[u8]) -> u64 {
|
||||
let addr_in_bits = idx * self.num_bits;
|
||||
let addr = (addr_in_bits >> 3) as usize;
|
||||
let addr_in_bits = idx as usize * self.num_bits;
|
||||
let addr = addr_in_bits >> 3;
|
||||
if addr + 8 > data.len() {
|
||||
if self.num_bits == 0 {
|
||||
return 0;
|
||||
}
|
||||
let bit_shift = addr_in_bits & 7;
|
||||
return self.get_slow_path(addr, bit_shift, data);
|
||||
return self.get_slow_path(addr, bit_shift as u32, data);
|
||||
}
|
||||
let bit_shift = addr_in_bits & 7;
|
||||
let bytes: [u8; 8] = (&data[addr..addr + 8]).try_into().unwrap();
|
||||
@@ -134,12 +134,13 @@ impl BitUnpacker {
|
||||
"Bitwidth must be <= 32 to use this method."
|
||||
);
|
||||
|
||||
let end_idx = start_idx + output.len() as u32;
|
||||
let end_idx: u32 = start_idx + output.len() as u32;
|
||||
|
||||
let end_bit_read = end_idx * self.num_bits;
|
||||
// We use `usize` here to avoid overflow issues.
|
||||
let end_bit_read = (end_idx as usize) * self.num_bits;
|
||||
let end_byte_read = (end_bit_read + 7) / 8;
|
||||
assert!(
|
||||
end_byte_read as usize <= data.len(),
|
||||
end_byte_read <= data.len(),
|
||||
"Requested index is out of bounds."
|
||||
);
|
||||
|
||||
@@ -159,24 +160,24 @@ impl BitUnpacker {
|
||||
// We want the start of the fast track to start align with bytes.
|
||||
// A sufficient condition is to start with an idx that is a multiple of 8,
|
||||
// so highway start is the closest multiple of 8 that is >= start_idx.
|
||||
let entrance_ramp_len = 8 - (start_idx % 8) % 8;
|
||||
let entrance_ramp_len: u32 = 8 - (start_idx % 8) % 8;
|
||||
|
||||
let highway_start: u32 = start_idx + entrance_ramp_len;
|
||||
|
||||
if highway_start + BitPacker1x::BLOCK_LEN as u32 > end_idx {
|
||||
if highway_start + (BitPacker1x::BLOCK_LEN as u32) > end_idx {
|
||||
// We don't have enough values to have even a single block of highway.
|
||||
// Let's just supply the values the simple way.
|
||||
get_batch_ramp(start_idx, output);
|
||||
return;
|
||||
}
|
||||
|
||||
let num_blocks: u32 = (end_idx - highway_start) / BitPacker1x::BLOCK_LEN as u32;
|
||||
let num_blocks: usize = (end_idx - highway_start) as usize / BitPacker1x::BLOCK_LEN;
|
||||
|
||||
// Entrance ramp
|
||||
get_batch_ramp(start_idx, &mut output[..entrance_ramp_len as usize]);
|
||||
|
||||
// Highway
|
||||
let mut offset = (highway_start * self.num_bits) as usize / 8;
|
||||
let mut offset = (highway_start as usize * self.num_bits) / 8;
|
||||
let mut output_cursor = (highway_start - start_idx) as usize;
|
||||
for _ in 0..num_blocks {
|
||||
offset += BitPacker1x.decompress(
|
||||
@@ -188,7 +189,7 @@ impl BitUnpacker {
|
||||
}
|
||||
|
||||
// Exit ramp
|
||||
let highway_end = highway_start + num_blocks * BitPacker1x::BLOCK_LEN as u32;
|
||||
let highway_end: u32 = highway_start + (num_blocks * BitPacker1x::BLOCK_LEN) as u32;
|
||||
get_batch_ramp(highway_end, &mut output[output_cursor..]);
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use super::bitpacker::BitPacker;
|
||||
use super::compute_num_bits;
|
||||
use crate::{minmax, BitUnpacker};
|
||||
use crate::{BitUnpacker, minmax};
|
||||
|
||||
const BLOCK_SIZE: usize = 128;
|
||||
|
||||
@@ -34,7 +34,7 @@ struct BlockedBitpackerEntryMetaData {
|
||||
|
||||
impl BlockedBitpackerEntryMetaData {
|
||||
fn new(offset: u64, num_bits: u8, base_value: u64) -> Self {
|
||||
let encoded = offset | (num_bits as u64) << (64 - 8);
|
||||
let encoded = offset | (u64::from(num_bits) << (64 - 8));
|
||||
Self {
|
||||
encoded,
|
||||
base_value,
|
||||
|
||||
@@ -33,11 +33,7 @@ pub use crate::blocked_bitpacker::BlockedBitpacker;
|
||||
/// number of bits.
|
||||
pub fn compute_num_bits(n: u64) -> u8 {
|
||||
let amplitude = (64u32 - n.leading_zeros()) as u8;
|
||||
if amplitude <= 64 - 8 {
|
||||
amplitude
|
||||
} else {
|
||||
64
|
||||
}
|
||||
if amplitude <= 64 - 8 { amplitude } else { 64 }
|
||||
}
|
||||
|
||||
/// Computes the (min, max) of an iterator of `PartialOrd` values.
|
||||
|
||||
@@ -16,14 +16,14 @@ body = """
|
||||
|
||||
{%- if version %} in {{ version }}{%- endif -%}
|
||||
{% for commit in commits %}
|
||||
{% if commit.github.pr_title -%}
|
||||
{%- set commit_message = commit.github.pr_title -%}
|
||||
{% if commit.remote.pr_title -%}
|
||||
{%- set commit_message = commit.remote.pr_title -%}
|
||||
{%- else -%}
|
||||
{%- set commit_message = commit.message -%}
|
||||
{%- endif -%}
|
||||
- {{ commit_message | split(pat="\n") | first | trim }}\
|
||||
{% if commit.github.pr_number %} \
|
||||
[#{{ commit.github.pr_number }}]({{ self::remote_url() }}/pull/{{ commit.github.pr_number }}){% if commit.github.username %}(@{{ commit.github.username }}){%- endif -%} \
|
||||
{% if commit.remote.pr_number %} \
|
||||
[#{{ commit.remote.pr_number }}]({{ self::remote_url() }}/pull/{{ commit.remote.pr_number }}){% if commit.remote.username %}(@{{ commit.remote.username }}){%- endif -%} \
|
||||
{%- endif %}
|
||||
{%- endfor -%}
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
[package]
|
||||
name = "tantivy-columnar"
|
||||
version = "0.3.0"
|
||||
edition = "2021"
|
||||
version = "0.5.0"
|
||||
edition = "2024"
|
||||
license = "MIT"
|
||||
homepage = "https://github.com/quickwit-oss/tantivy"
|
||||
repository = "https://github.com/quickwit-oss/tantivy"
|
||||
@@ -12,10 +12,10 @@ categories = ["database-implementations", "data-structures", "compression"]
|
||||
itertools = "0.14.0"
|
||||
fastdivide = "0.4.0"
|
||||
|
||||
stacker = { version= "0.3", path = "../stacker", package="tantivy-stacker"}
|
||||
sstable = { version= "0.3", path = "../sstable", package = "tantivy-sstable" }
|
||||
common = { version= "0.7", path = "../common", package = "tantivy-common" }
|
||||
tantivy-bitpacker = { version= "0.6", path = "../bitpacker/" }
|
||||
stacker = { version= "0.5", path = "../stacker", package="tantivy-stacker"}
|
||||
sstable = { version= "0.5", path = "../sstable", package = "tantivy-sstable" }
|
||||
common = { version= "0.9", path = "../common", package = "tantivy-common" }
|
||||
tantivy-bitpacker = { version= "0.8", path = "../bitpacker/" }
|
||||
serde = "1.0.152"
|
||||
downcast-rs = "2.0.1"
|
||||
|
||||
@@ -33,6 +33,6 @@ harness = false
|
||||
name = "bench_access"
|
||||
harness = false
|
||||
|
||||
|
||||
[features]
|
||||
unstable = []
|
||||
zstd-compression = ["sstable/zstd-compression"]
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use binggan::{black_box, InputGroup};
|
||||
use binggan::{InputGroup, black_box};
|
||||
use common::*;
|
||||
use tantivy_columnar::Column;
|
||||
|
||||
|
||||
@@ -4,9 +4,9 @@ extern crate test;
|
||||
use std::sync::Arc;
|
||||
|
||||
use rand::prelude::*;
|
||||
use tantivy_columnar::column_values::{serialize_and_load_u64_based_column_values, CodecType};
|
||||
use tantivy_columnar::column_values::{CodecType, serialize_and_load_u64_based_column_values};
|
||||
use tantivy_columnar::*;
|
||||
use test::{black_box, Bencher};
|
||||
use test::{Bencher, black_box};
|
||||
|
||||
struct Columns {
|
||||
pub optional: Column,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
pub mod common;
|
||||
|
||||
use binggan::BenchRunner;
|
||||
use common::{generate_columnar_with_name, Card};
|
||||
use common::{Card, generate_columnar_with_name};
|
||||
use tantivy_columnar::*;
|
||||
|
||||
const NUM_DOCS: u32 = 100_000;
|
||||
|
||||
@@ -6,7 +6,7 @@ use std::sync::Arc;
|
||||
use common::OwnedBytes;
|
||||
use rand::rngs::StdRng;
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::{random, Rng, SeedableRng};
|
||||
use rand::{Rng, SeedableRng, random};
|
||||
use tantivy_columnar::ColumnValues;
|
||||
use test::Bencher;
|
||||
extern crate test;
|
||||
|
||||
@@ -5,7 +5,7 @@ use std::ops::RangeInclusive;
|
||||
use std::sync::Arc;
|
||||
|
||||
use rand::prelude::*;
|
||||
use tantivy_columnar::column_values::{serialize_and_load_u64_based_column_values, CodecType};
|
||||
use tantivy_columnar::column_values::{CodecType, serialize_and_load_u64_based_column_values};
|
||||
use tantivy_columnar::*;
|
||||
use test::Bencher;
|
||||
|
||||
|
||||
18
columnar/columnar-cli-inspect/Cargo.toml
Normal file
18
columnar/columnar-cli-inspect/Cargo.toml
Normal file
@@ -0,0 +1,18 @@
|
||||
[package]
|
||||
name = "tantivy-columnar-inspect"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
license = "MIT"
|
||||
|
||||
[dependencies]
|
||||
tantivy = {path="../..", package="tantivy"}
|
||||
columnar = {path="../", package="tantivy-columnar"}
|
||||
common = {path="../../common", package="tantivy-common"}
|
||||
|
||||
[workspace]
|
||||
members = []
|
||||
|
||||
[profile.release]
|
||||
debug = true
|
||||
#debug-assertions = true
|
||||
#overflow-checks = true
|
||||
54
columnar/columnar-cli-inspect/src/main.rs
Normal file
54
columnar/columnar-cli-inspect/src/main.rs
Normal file
@@ -0,0 +1,54 @@
|
||||
use columnar::ColumnarReader;
|
||||
use common::file_slice::{FileSlice, WrapFile};
|
||||
use std::io;
|
||||
use std::path::Path;
|
||||
use tantivy::directory::footer::Footer;
|
||||
|
||||
fn main() -> io::Result<()> {
|
||||
println!("Opens a columnar file written by tantivy and validates it.");
|
||||
let path = std::env::args().nth(1).unwrap();
|
||||
|
||||
let path = Path::new(&path);
|
||||
println!("Reading {:?}", path);
|
||||
let _reader = open_and_validate_columnar(path.to_str().unwrap())?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn validate_columnar_reader(reader: &ColumnarReader) {
|
||||
let num_rows = reader.num_rows();
|
||||
println!("num_rows: {}", num_rows);
|
||||
let columns = reader.list_columns().unwrap();
|
||||
println!("num columns: {:?}", columns.len());
|
||||
for (col_name, dynamic_column_handle) in columns {
|
||||
let col = dynamic_column_handle.open().unwrap();
|
||||
match col {
|
||||
columnar::DynamicColumn::Bool(_)
|
||||
| columnar::DynamicColumn::I64(_)
|
||||
| columnar::DynamicColumn::U64(_)
|
||||
| columnar::DynamicColumn::F64(_)
|
||||
| columnar::DynamicColumn::IpAddr(_)
|
||||
| columnar::DynamicColumn::DateTime(_)
|
||||
| columnar::DynamicColumn::Bytes(_) => {}
|
||||
columnar::DynamicColumn::Str(str_column) => {
|
||||
let num_vals = str_column.ords().values.num_vals();
|
||||
let num_terms_dict = str_column.num_terms() as u64;
|
||||
let max_ord = str_column.ords().values.iter().max().unwrap_or_default();
|
||||
println!("{col_name:35} num_vals {num_vals:10} \t num_terms_dict {num_terms_dict:8} max_ord: {max_ord:8}",);
|
||||
for ord in str_column.ords().values.iter() {
|
||||
assert!(ord < num_terms_dict);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Opens a columnar file that was written by tantivy and validates it.
|
||||
pub fn open_and_validate_columnar(path: &str) -> io::Result<ColumnarReader> {
|
||||
let wrap_file = WrapFile::new(std::fs::File::open(path)?)?;
|
||||
let slice = FileSlice::new(std::sync::Arc::new(wrap_file));
|
||||
let (_footer, slice) = Footer::extract_footer(slice.clone()).unwrap();
|
||||
let reader = ColumnarReader::open(slice).unwrap();
|
||||
validate_columnar_reader(&reader);
|
||||
Ok(reader)
|
||||
}
|
||||
@@ -66,7 +66,7 @@ impl<T: PartialOrd + Copy + std::fmt::Debug + Send + Sync + 'static + Default>
|
||||
&'a self,
|
||||
docs: &'a [u32],
|
||||
accessor: &Column<T>,
|
||||
) -> impl Iterator<Item = (DocId, T)> + 'a {
|
||||
) -> impl Iterator<Item = (DocId, T)> + 'a + use<'a, T> {
|
||||
if accessor.index.get_cardinality().is_full() {
|
||||
docs.iter().cloned().zip(self.val_cache.iter().cloned())
|
||||
} else {
|
||||
@@ -139,7 +139,7 @@ mod tests {
|
||||
missing_docs.push(missing_doc);
|
||||
});
|
||||
|
||||
assert_eq!(missing_docs, vec![]);
|
||||
assert_eq!(missing_docs, Vec::<u32>::new());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -4,8 +4,8 @@ use std::{fmt, io};
|
||||
|
||||
use sstable::{Dictionary, VoidSSTable};
|
||||
|
||||
use crate::column::Column;
|
||||
use crate::RowId;
|
||||
use crate::column::Column;
|
||||
|
||||
/// Dictionary encoded column.
|
||||
///
|
||||
|
||||
@@ -9,13 +9,14 @@ use std::sync::Arc;
|
||||
use common::BinarySerializable;
|
||||
pub use dictionary_encoded::{BytesColumn, StrColumn};
|
||||
pub use serialize::{
|
||||
open_column_bytes, open_column_str, open_column_u128, open_column_u128_as_compact_u64,
|
||||
open_column_u64, serialize_column_mappable_to_u128, serialize_column_mappable_to_u64,
|
||||
open_column_bytes, open_column_str, open_column_u64, open_column_u128,
|
||||
open_column_u128_as_compact_u64, serialize_column_mappable_to_u64,
|
||||
serialize_column_mappable_to_u128,
|
||||
};
|
||||
|
||||
use crate::column_index::{ColumnIndex, Set};
|
||||
use crate::column_values::monotonic_mapping::StrictlyMonotonicMappingToInternal;
|
||||
use crate::column_values::{monotonic_map_column, ColumnValues};
|
||||
use crate::column_values::{ColumnValues, monotonic_map_column};
|
||||
use crate::{Cardinality, DocId, EmptyColumnValues, MonotonicallyMappableToU64, RowId};
|
||||
|
||||
#[derive(Clone)]
|
||||
|
||||
@@ -6,10 +6,10 @@ use common::OwnedBytes;
|
||||
use sstable::Dictionary;
|
||||
|
||||
use crate::column::{BytesColumn, Column};
|
||||
use crate::column_index::{serialize_column_index, SerializableColumnIndex};
|
||||
use crate::column_index::{SerializableColumnIndex, serialize_column_index};
|
||||
use crate::column_values::{
|
||||
CodecType, MonotonicallyMappableToU64, MonotonicallyMappableToU128,
|
||||
load_u64_based_column_values, serialize_column_values_u128, serialize_u64_based_column_values,
|
||||
CodecType, MonotonicallyMappableToU128, MonotonicallyMappableToU64,
|
||||
};
|
||||
use crate::iterable::Iterable;
|
||||
use crate::{StrColumn, Version};
|
||||
|
||||
@@ -99,9 +99,9 @@ mod tests {
|
||||
|
||||
use crate::column_index::merge::detect_cardinality;
|
||||
use crate::column_index::multivalued_index::{
|
||||
open_multivalued_index, serialize_multivalued_index, MultiValueIndex,
|
||||
MultiValueIndex, open_multivalued_index, serialize_multivalued_index,
|
||||
};
|
||||
use crate::column_index::{merge_column_index, OptionalIndex, SerializableColumnIndex};
|
||||
use crate::column_index::{OptionalIndex, SerializableColumnIndex, merge_column_index};
|
||||
use crate::{
|
||||
Cardinality, ColumnIndex, MergeRowOrder, RowAddr, RowId, ShuffleMergeOrder, StackMergeOrder,
|
||||
};
|
||||
|
||||
@@ -137,8 +137,8 @@ impl Iterable<u32> for ShuffledMultivaluedIndex<'_> {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::column_index::OptionalIndex;
|
||||
use crate::RowAddr;
|
||||
use crate::column_index::OptionalIndex;
|
||||
|
||||
#[test]
|
||||
fn test_integrate_num_vals_empty() {
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use std::ops::Range;
|
||||
|
||||
use crate::column_index::SerializableColumnIndex;
|
||||
use crate::column_index::multivalued_index::{MultiValueIndex, SerializableMultivalueIndex};
|
||||
use crate::column_index::serialize::SerializableOptionalIndex;
|
||||
use crate::column_index::SerializableColumnIndex;
|
||||
use crate::iterable::Iterable;
|
||||
use crate::{Cardinality, ColumnIndex, RowId, StackMergeOrder};
|
||||
|
||||
@@ -56,7 +56,7 @@ fn get_doc_ids_with_values<'a>(
|
||||
ColumnIndex::Full => Box::new(doc_range),
|
||||
ColumnIndex::Optional(optional_index) => Box::new(
|
||||
optional_index
|
||||
.iter_rows()
|
||||
.iter_docs()
|
||||
.map(move |row| row + doc_range.start),
|
||||
),
|
||||
ColumnIndex::Multivalued(multivalued_index) => match multivalued_index {
|
||||
@@ -73,7 +73,7 @@ fn get_doc_ids_with_values<'a>(
|
||||
MultiValueIndex::MultiValueIndexV2(multivalued_index) => Box::new(
|
||||
multivalued_index
|
||||
.optional_index
|
||||
.iter_rows()
|
||||
.iter_docs()
|
||||
.map(move |row| row + doc_range.start),
|
||||
),
|
||||
},
|
||||
@@ -177,7 +177,7 @@ impl<'a> Iterable<RowId> for StackedOptionalIndex<'a> {
|
||||
ColumnIndex::Full => Box::new(columnar_row_range),
|
||||
ColumnIndex::Optional(optional_index) => Box::new(
|
||||
optional_index
|
||||
.iter_rows()
|
||||
.iter_docs()
|
||||
.map(move |row_id: RowId| columnar_row_range.start + row_id),
|
||||
),
|
||||
ColumnIndex::Multivalued(_) => {
|
||||
|
||||
@@ -14,7 +14,7 @@ pub use merge::merge_column_index;
|
||||
pub(crate) use multivalued_index::SerializableMultivalueIndex;
|
||||
pub use optional_index::{OptionalIndex, Set};
|
||||
pub use serialize::{
|
||||
open_column_index, serialize_column_index, SerializableColumnIndex, SerializableOptionalIndex,
|
||||
SerializableColumnIndex, SerializableOptionalIndex, open_column_index, serialize_column_index,
|
||||
};
|
||||
|
||||
use crate::column_index::multivalued_index::MultiValueIndex;
|
||||
|
||||
@@ -8,7 +8,7 @@ use common::{CountingWriter, OwnedBytes};
|
||||
use super::optional_index::{open_optional_index, serialize_optional_index};
|
||||
use super::{OptionalIndex, SerializableOptionalIndex, Set};
|
||||
use crate::column_values::{
|
||||
load_u64_based_column_values, serialize_u64_based_column_values, CodecType, ColumnValues,
|
||||
CodecType, ColumnValues, load_u64_based_column_values, serialize_u64_based_column_values,
|
||||
};
|
||||
use crate::iterable::Iterable;
|
||||
use crate::{DocId, RowId, Version};
|
||||
|
||||
@@ -7,7 +7,7 @@ mod set_block;
|
||||
use common::{BinarySerializable, OwnedBytes, VInt};
|
||||
pub use set::{SelectCursor, Set, SetCodec};
|
||||
use set_block::{
|
||||
DenseBlock, DenseBlockCodec, SparseBlock, SparseBlockCodec, DENSE_BLOCK_NUM_BYTES,
|
||||
DENSE_BLOCK_NUM_BYTES, DenseBlock, DenseBlockCodec, SparseBlock, SparseBlockCodec,
|
||||
};
|
||||
|
||||
use crate::iterable::Iterable;
|
||||
@@ -80,23 +80,23 @@ impl BlockVariant {
|
||||
/// index is the block index. For each block `byte_start` and `offset` is computed.
|
||||
#[derive(Clone)]
|
||||
pub struct OptionalIndex {
|
||||
num_rows: RowId,
|
||||
num_non_null_rows: RowId,
|
||||
num_docs: RowId,
|
||||
num_non_null_docs: RowId,
|
||||
block_data: OwnedBytes,
|
||||
block_metas: Arc<[BlockMeta]>,
|
||||
}
|
||||
|
||||
impl Iterable<u32> for &OptionalIndex {
|
||||
fn boxed_iter(&self) -> Box<dyn Iterator<Item = u32> + '_> {
|
||||
Box::new(self.iter_rows())
|
||||
Box::new(self.iter_docs())
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for OptionalIndex {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
f.debug_struct("OptionalIndex")
|
||||
.field("num_rows", &self.num_rows)
|
||||
.field("num_non_null_rows", &self.num_non_null_rows)
|
||||
.field("num_docs", &self.num_docs)
|
||||
.field("num_non_null_docs", &self.num_non_null_docs)
|
||||
.finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
@@ -259,11 +259,13 @@ impl Set<RowId> for OptionalIndex {
|
||||
|
||||
impl OptionalIndex {
|
||||
pub fn for_test(num_rows: RowId, row_ids: &[RowId]) -> OptionalIndex {
|
||||
assert!(row_ids
|
||||
.last()
|
||||
.copied()
|
||||
.map(|last_row_id| last_row_id < num_rows)
|
||||
.unwrap_or(true));
|
||||
assert!(
|
||||
row_ids
|
||||
.last()
|
||||
.copied()
|
||||
.map(|last_row_id| last_row_id < num_rows)
|
||||
.unwrap_or(true)
|
||||
);
|
||||
let mut buffer = Vec::new();
|
||||
serialize_optional_index(&row_ids, num_rows, &mut buffer).unwrap();
|
||||
let bytes = OwnedBytes::new(buffer);
|
||||
@@ -271,17 +273,17 @@ impl OptionalIndex {
|
||||
}
|
||||
|
||||
pub fn num_docs(&self) -> RowId {
|
||||
self.num_rows
|
||||
self.num_docs
|
||||
}
|
||||
|
||||
pub fn num_non_nulls(&self) -> RowId {
|
||||
self.num_non_null_rows
|
||||
self.num_non_null_docs
|
||||
}
|
||||
|
||||
pub fn iter_rows(&self) -> impl Iterator<Item = RowId> + '_ {
|
||||
pub fn iter_docs(&self) -> impl Iterator<Item = RowId> + '_ {
|
||||
// TODO optimize
|
||||
let mut select_batch = self.select_cursor();
|
||||
(0..self.num_non_null_rows).map(move |rank| select_batch.select(rank))
|
||||
(0..self.num_non_null_docs).map(move |rank| select_batch.select(rank))
|
||||
}
|
||||
pub fn select_batch(&self, ranks: &mut [RowId]) {
|
||||
let mut select_cursor = self.select_cursor();
|
||||
@@ -519,15 +521,15 @@ pub fn open_optional_index(bytes: OwnedBytes) -> io::Result<OptionalIndex> {
|
||||
let (mut bytes, num_non_empty_blocks_bytes) = bytes.rsplit(2);
|
||||
let num_non_empty_block_bytes =
|
||||
u16::from_le_bytes(num_non_empty_blocks_bytes.as_slice().try_into().unwrap());
|
||||
let num_rows = VInt::deserialize_u64(&mut bytes)? as u32;
|
||||
let num_docs = VInt::deserialize_u64(&mut bytes)? as u32;
|
||||
let block_metas_num_bytes =
|
||||
num_non_empty_block_bytes as usize * SERIALIZED_BLOCK_META_NUM_BYTES;
|
||||
let (block_data, block_metas) = bytes.rsplit(block_metas_num_bytes);
|
||||
let (block_metas, num_non_null_rows) =
|
||||
deserialize_optional_index_block_metadatas(block_metas.as_slice(), num_rows);
|
||||
let (block_metas, num_non_null_docs) =
|
||||
deserialize_optional_index_block_metadatas(block_metas.as_slice(), num_docs);
|
||||
let optional_index = OptionalIndex {
|
||||
num_rows,
|
||||
num_non_null_rows,
|
||||
num_docs,
|
||||
num_non_null_docs,
|
||||
block_data,
|
||||
block_metas: block_metas.into(),
|
||||
};
|
||||
|
||||
@@ -2,7 +2,7 @@ use std::io::{self, Write};
|
||||
|
||||
use common::BinarySerializable;
|
||||
|
||||
use crate::column_index::optional_index::{SelectCursor, Set, SetCodec, ELEMENTS_PER_BLOCK};
|
||||
use crate::column_index::optional_index::{ELEMENTS_PER_BLOCK, SelectCursor, Set, SetCodec};
|
||||
|
||||
#[inline(always)]
|
||||
fn get_bit_at(input: u64, n: u16) -> bool {
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
mod dense;
|
||||
mod sparse;
|
||||
|
||||
pub use dense::{DenseBlock, DenseBlockCodec, DENSE_BLOCK_NUM_BYTES};
|
||||
pub use dense::{DENSE_BLOCK_NUM_BYTES, DenseBlock, DenseBlockCodec};
|
||||
pub use sparse::{SparseBlock, SparseBlockCodec};
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -164,7 +164,7 @@ fn test_optional_index_large() {
|
||||
fn test_optional_index_iter_aux(row_ids: &[RowId], num_rows: RowId) {
|
||||
let optional_index = OptionalIndex::for_test(num_rows, row_ids);
|
||||
assert_eq!(optional_index.num_docs(), num_rows);
|
||||
assert!(optional_index.iter_rows().eq(row_ids.iter().copied()));
|
||||
assert!(optional_index.iter_docs().eq(row_ids.iter().copied()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -254,11 +254,7 @@ mod bench {
|
||||
let mut current = start;
|
||||
std::iter::from_fn(move || {
|
||||
current += rng.gen_range(avg_step_size - avg_deviation..=avg_step_size + avg_deviation);
|
||||
if current >= end {
|
||||
None
|
||||
} else {
|
||||
Some(current)
|
||||
}
|
||||
if current >= end { None } else { Some(current) }
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -3,11 +3,11 @@ use std::io::Write;
|
||||
|
||||
use common::{CountingWriter, OwnedBytes};
|
||||
|
||||
use super::multivalued_index::SerializableMultivalueIndex;
|
||||
use super::OptionalIndex;
|
||||
use super::multivalued_index::SerializableMultivalueIndex;
|
||||
use crate::column_index::ColumnIndex;
|
||||
use crate::column_index::multivalued_index::serialize_multivalued_index;
|
||||
use crate::column_index::optional_index::serialize_optional_index;
|
||||
use crate::column_index::ColumnIndex;
|
||||
use crate::iterable::Iterable;
|
||||
use crate::{Cardinality, RowId, Version};
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ use crate::column_values::u64_based::*;
|
||||
fn get_data() -> Vec<u64> {
|
||||
let mut rng = StdRng::seed_from_u64(2u64);
|
||||
let mut data: Vec<_> = (100..55000_u64)
|
||||
.map(|num| num + rng.gen::<u8>() as u64)
|
||||
.map(|num| num + rng.r#gen::<u8>() as u64)
|
||||
.collect();
|
||||
data.push(99_000);
|
||||
data.insert(1000, 2000);
|
||||
|
||||
@@ -26,13 +26,13 @@ mod monotonic_column;
|
||||
|
||||
pub(crate) use merge::MergedColumnValues;
|
||||
pub use stats::ColumnStats;
|
||||
pub use u128_based::{
|
||||
open_u128_as_compact_u64, open_u128_mapped, serialize_column_values_u128,
|
||||
CompactSpaceU64Accessor,
|
||||
};
|
||||
pub use u64_based::{
|
||||
load_u64_based_column_values, serialize_and_load_u64_based_column_values,
|
||||
serialize_u64_based_column_values, CodecType, ALL_U64_CODEC_TYPES,
|
||||
ALL_U64_CODEC_TYPES, CodecType, load_u64_based_column_values,
|
||||
serialize_and_load_u64_based_column_values, serialize_u64_based_column_values,
|
||||
};
|
||||
pub use u128_based::{
|
||||
CompactSpaceU64Accessor, open_u128_as_compact_u64, open_u128_mapped,
|
||||
serialize_column_values_u128,
|
||||
};
|
||||
pub use vec_column::VecColumn;
|
||||
|
||||
|
||||
@@ -2,8 +2,8 @@ use std::fmt::Debug;
|
||||
use std::marker::PhantomData;
|
||||
use std::ops::{Range, RangeInclusive};
|
||||
|
||||
use crate::column_values::monotonic_mapping::StrictlyMonotonicFn;
|
||||
use crate::ColumnValues;
|
||||
use crate::column_values::monotonic_mapping::StrictlyMonotonicFn;
|
||||
|
||||
struct MonotonicMappingColumn<C, T, Input> {
|
||||
from_column: C,
|
||||
@@ -99,10 +99,10 @@ where
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::column_values::VecColumn;
|
||||
use crate::column_values::monotonic_mapping::{
|
||||
StrictlyMonotonicMappingInverter, StrictlyMonotonicMappingToInternal,
|
||||
};
|
||||
use crate::column_values::VecColumn;
|
||||
|
||||
#[test]
|
||||
fn test_monotonic_mapping_iter() {
|
||||
|
||||
@@ -24,8 +24,8 @@ use build_compact_space::get_compact_space;
|
||||
use common::{BinarySerializable, CountingWriter, OwnedBytes, VInt, VIntU128};
|
||||
use tantivy_bitpacker::{BitPacker, BitUnpacker};
|
||||
|
||||
use crate::column_values::ColumnValues;
|
||||
use crate::RowId;
|
||||
use crate::column_values::ColumnValues;
|
||||
|
||||
/// The cost per blank is quite hard actually, since blanks are delta encoded, the actual cost of
|
||||
/// blanks depends on the number of blanks.
|
||||
@@ -653,12 +653,14 @@ mod tests {
|
||||
),
|
||||
&[3]
|
||||
);
|
||||
assert!(get_positions_for_value_range_helper(
|
||||
&decomp,
|
||||
99998u128..=99998u128,
|
||||
complete_range.clone()
|
||||
)
|
||||
.is_empty());
|
||||
assert!(
|
||||
get_positions_for_value_range_helper(
|
||||
&decomp,
|
||||
99998u128..=99998u128,
|
||||
complete_range.clone()
|
||||
)
|
||||
.is_empty()
|
||||
);
|
||||
assert_eq!(
|
||||
&get_positions_for_value_range_helper(
|
||||
&decomp,
|
||||
|
||||
@@ -130,11 +130,11 @@ pub fn open_u128_as_compact_u64(mut bytes: OwnedBytes) -> io::Result<Arc<dyn Col
|
||||
#[cfg(test)]
|
||||
pub(crate) mod tests {
|
||||
use super::*;
|
||||
use crate::column_values::u64_based::{
|
||||
serialize_and_load_u64_based_column_values, serialize_u64_based_column_values,
|
||||
ALL_U64_CODEC_TYPES,
|
||||
};
|
||||
use crate::column_values::CodecType;
|
||||
use crate::column_values::u64_based::{
|
||||
ALL_U64_CODEC_TYPES, serialize_and_load_u64_based_column_values,
|
||||
serialize_u64_based_column_values,
|
||||
};
|
||||
|
||||
#[test]
|
||||
fn test_serialize_deserialize_u128_header() {
|
||||
|
||||
@@ -4,7 +4,7 @@ use std::ops::{Range, RangeInclusive};
|
||||
|
||||
use common::{BinarySerializable, OwnedBytes};
|
||||
use fastdivide::DividerU64;
|
||||
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
|
||||
use tantivy_bitpacker::{BitPacker, BitUnpacker, compute_num_bits};
|
||||
|
||||
use crate::column_values::u64_based::{ColumnCodec, ColumnCodecEstimator, ColumnStats};
|
||||
use crate::{ColumnValues, RowId};
|
||||
@@ -23,11 +23,7 @@ const fn div_ceil(n: u64, q: NonZeroU64) -> u64 {
|
||||
// copied from unstable rust standard library.
|
||||
let d = n / q.get();
|
||||
let r = n % q.get();
|
||||
if r > 0 {
|
||||
d + 1
|
||||
} else {
|
||||
d
|
||||
}
|
||||
if r > 0 { d + 1 } else { d }
|
||||
}
|
||||
|
||||
// The bitpacked codec applies a linear transformation `f` over data that are bitpacked.
|
||||
|
||||
@@ -4,12 +4,12 @@ use std::{io, iter};
|
||||
|
||||
use common::{BinarySerializable, CountingWriter, DeserializeFrom, OwnedBytes};
|
||||
use fastdivide::DividerU64;
|
||||
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
|
||||
use tantivy_bitpacker::{BitPacker, BitUnpacker, compute_num_bits};
|
||||
|
||||
use crate::MonotonicallyMappableToU64;
|
||||
use crate::column_values::u64_based::line::Line;
|
||||
use crate::column_values::u64_based::{ColumnCodec, ColumnCodecEstimator, ColumnStats};
|
||||
use crate::column_values::{ColumnValues, VecColumn};
|
||||
use crate::MonotonicallyMappableToU64;
|
||||
|
||||
const BLOCK_SIZE: u32 = 512u32;
|
||||
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
use std::io;
|
||||
|
||||
use common::{BinarySerializable, OwnedBytes};
|
||||
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
|
||||
use tantivy_bitpacker::{BitPacker, BitUnpacker, compute_num_bits};
|
||||
|
||||
use super::line::Line;
|
||||
use super::ColumnValues;
|
||||
use crate::column_values::u64_based::{ColumnCodec, ColumnCodecEstimator, ColumnStats};
|
||||
use crate::column_values::VecColumn;
|
||||
use super::line::Line;
|
||||
use crate::RowId;
|
||||
use crate::column_values::VecColumn;
|
||||
use crate::column_values::u64_based::{ColumnCodec, ColumnCodecEstimator, ColumnStats};
|
||||
|
||||
const HALF_SPACE: u64 = u64::MAX / 2;
|
||||
const LINE_ESTIMATION_BLOCK_LEN: usize = 512;
|
||||
|
||||
@@ -17,7 +17,7 @@ pub use crate::column_values::u64_based::bitpacked::BitpackedCodec;
|
||||
pub use crate::column_values::u64_based::blockwise_linear::BlockwiseLinearCodec;
|
||||
pub use crate::column_values::u64_based::linear::LinearCodec;
|
||||
pub use crate::column_values::u64_based::stats_collector::StatsCollector;
|
||||
use crate::column_values::{monotonic_map_column, ColumnStats};
|
||||
use crate::column_values::{ColumnStats, monotonic_map_column};
|
||||
use crate::iterable::Iterable;
|
||||
use crate::{ColumnValues, MonotonicallyMappableToU64};
|
||||
|
||||
|
||||
@@ -2,8 +2,8 @@ use std::num::NonZeroU64;
|
||||
|
||||
use fastdivide::DividerU64;
|
||||
|
||||
use crate::column_values::ColumnStats;
|
||||
use crate::RowId;
|
||||
use crate::column_values::ColumnStats;
|
||||
|
||||
/// Compute the gcd of two non null numbers.
|
||||
///
|
||||
@@ -96,8 +96,8 @@ impl StatsCollector {
|
||||
mod tests {
|
||||
use std::num::NonZeroU64;
|
||||
|
||||
use crate::column_values::u64_based::stats_collector::{compute_gcd, StatsCollector};
|
||||
use crate::column_values::u64_based::ColumnStats;
|
||||
use crate::column_values::u64_based::stats_collector::{StatsCollector, compute_gcd};
|
||||
|
||||
fn compute_stats(vals: impl Iterator<Item = u64>) -> ColumnStats {
|
||||
let mut stats_collector = StatsCollector::default();
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use proptest::prelude::*;
|
||||
use proptest::{prop_oneof, proptest};
|
||||
use rand::Rng;
|
||||
|
||||
#[test]
|
||||
fn test_serialize_and_load_simple() {
|
||||
|
||||
@@ -4,8 +4,8 @@ use std::net::Ipv6Addr;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::value::NumericalType;
|
||||
use crate::InvalidData;
|
||||
use crate::value::NumericalType;
|
||||
|
||||
/// The column type represents the column type.
|
||||
/// Any changes need to be propagated to `COLUMN_TYPES`.
|
||||
|
||||
@@ -3,7 +3,7 @@ use std::io::{self, Write};
|
||||
use common::{BitSet, CountingWriter, ReadOnlyBitSet};
|
||||
use sstable::{SSTable, Streamer, TermOrdinal, VoidSSTable};
|
||||
|
||||
use super::term_merger::TermMerger;
|
||||
use super::term_merger::{TermMerger, TermsWithSegmentOrd};
|
||||
use crate::column::serialize_column_mappable_to_u64;
|
||||
use crate::column_index::SerializableColumnIndex;
|
||||
use crate::iterable::Iterable;
|
||||
@@ -126,14 +126,17 @@ fn serialize_merged_dict(
|
||||
let mut term_ord_mapping = TermOrdinalMapping::default();
|
||||
|
||||
let mut field_term_streams = Vec::new();
|
||||
for column_opt in bytes_columns.iter() {
|
||||
for (segment_ord, column_opt) in bytes_columns.iter().enumerate() {
|
||||
if let Some(column) = column_opt {
|
||||
term_ord_mapping.add_segment(column.dictionary.num_terms());
|
||||
let terms: Streamer<VoidSSTable> = column.dictionary.stream()?;
|
||||
field_term_streams.push(terms);
|
||||
field_term_streams.push(TermsWithSegmentOrd { terms, segment_ord });
|
||||
} else {
|
||||
term_ord_mapping.add_segment(0);
|
||||
field_term_streams.push(Streamer::empty());
|
||||
field_term_streams.push(TermsWithSegmentOrd {
|
||||
terms: Streamer::empty(),
|
||||
segment_ord,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -191,6 +194,7 @@ fn serialize_merged_dict(
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
struct TermOrdinalMapping {
|
||||
/// Contains the new term ordinals for each segment.
|
||||
per_segment_new_term_ordinals: Vec<Vec<TermOrdinal>>,
|
||||
}
|
||||
|
||||
@@ -205,6 +209,6 @@ impl TermOrdinalMapping {
|
||||
}
|
||||
|
||||
fn get_segment(&self, segment_ord: u32) -> &[TermOrdinal] {
|
||||
&(self.per_segment_new_term_ordinals[segment_ord as usize])[..]
|
||||
&self.per_segment_new_term_ordinals[segment_ord as usize]
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,7 +26,7 @@ impl StackMergeOrder {
|
||||
let mut cumulated_row_ids: Vec<RowId> = Vec::with_capacity(columnars.len());
|
||||
let mut cumulated_row_id = 0;
|
||||
for columnar in columnars {
|
||||
cumulated_row_id += columnar.num_rows();
|
||||
cumulated_row_id += columnar.num_docs();
|
||||
cumulated_row_ids.push(cumulated_row_id);
|
||||
}
|
||||
StackMergeOrder { cumulated_row_ids }
|
||||
|
||||
@@ -10,11 +10,11 @@ use std::sync::Arc;
|
||||
pub use merge_mapping::{MergeRowOrder, ShuffleMergeOrder, StackMergeOrder};
|
||||
|
||||
use super::writer::ColumnarSerializer;
|
||||
use crate::column::{serialize_column_mappable_to_u128, serialize_column_mappable_to_u64};
|
||||
use crate::column::{serialize_column_mappable_to_u64, serialize_column_mappable_to_u128};
|
||||
use crate::column_values::MergedColumnValues;
|
||||
use crate::columnar::ColumnarReader;
|
||||
use crate::columnar::merge::merge_dict_column::merge_bytes_or_str_column;
|
||||
use crate::columnar::writer::CompatibleNumericalTypes;
|
||||
use crate::columnar::ColumnarReader;
|
||||
use crate::dynamic_column::DynamicColumn;
|
||||
use crate::{
|
||||
BytesColumn, Column, ColumnIndex, ColumnType, ColumnValues, DynamicColumnHandle, NumericalType,
|
||||
@@ -80,13 +80,12 @@ pub fn merge_columnar(
|
||||
output: &mut impl io::Write,
|
||||
) -> io::Result<()> {
|
||||
let mut serializer = ColumnarSerializer::new(output);
|
||||
let num_rows_per_columnar = columnar_readers
|
||||
let num_docs_per_columnar = columnar_readers
|
||||
.iter()
|
||||
.map(|reader| reader.num_rows())
|
||||
.map(|reader| reader.num_docs())
|
||||
.collect::<Vec<u32>>();
|
||||
|
||||
let columns_to_merge =
|
||||
group_columns_for_merge(columnar_readers, required_columns, &merge_row_order)?;
|
||||
let columns_to_merge = group_columns_for_merge(columnar_readers, required_columns)?;
|
||||
for res in columns_to_merge {
|
||||
let ((column_name, _column_type_category), grouped_columns) = res;
|
||||
let grouped_columns = grouped_columns.open(&merge_row_order)?;
|
||||
@@ -94,15 +93,18 @@ pub fn merge_columnar(
|
||||
continue;
|
||||
}
|
||||
|
||||
let column_type = grouped_columns.column_type_after_merge();
|
||||
let column_type_after_merge = grouped_columns.column_type_after_merge();
|
||||
let mut columns = grouped_columns.columns;
|
||||
coerce_columns(column_type, &mut columns)?;
|
||||
// Make sure the number of columns is the same as the number of columnar readers.
|
||||
// Or num_docs_per_columnar would be incorrect.
|
||||
assert_eq!(columns.len(), columnar_readers.len());
|
||||
coerce_columns(column_type_after_merge, &mut columns)?;
|
||||
|
||||
let mut column_serializer =
|
||||
serializer.start_serialize_column(column_name.as_bytes(), column_type);
|
||||
serializer.start_serialize_column(column_name.as_bytes(), column_type_after_merge);
|
||||
merge_column(
|
||||
column_type,
|
||||
&num_rows_per_columnar,
|
||||
column_type_after_merge,
|
||||
&num_docs_per_columnar,
|
||||
columns,
|
||||
&merge_row_order,
|
||||
&mut column_serializer,
|
||||
@@ -128,7 +130,7 @@ fn dynamic_column_to_u64_monotonic(dynamic_column: DynamicColumn) -> Option<Colu
|
||||
fn merge_column(
|
||||
column_type: ColumnType,
|
||||
num_docs_per_column: &[u32],
|
||||
columns: Vec<Option<DynamicColumn>>,
|
||||
columns_to_merge: Vec<Option<DynamicColumn>>,
|
||||
merge_row_order: &MergeRowOrder,
|
||||
wrt: &mut impl io::Write,
|
||||
) -> io::Result<()> {
|
||||
@@ -138,20 +140,21 @@ fn merge_column(
|
||||
| ColumnType::F64
|
||||
| ColumnType::DateTime
|
||||
| ColumnType::Bool => {
|
||||
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns.len());
|
||||
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns_to_merge.len());
|
||||
let mut column_values: Vec<Option<Arc<dyn ColumnValues>>> =
|
||||
Vec::with_capacity(columns.len());
|
||||
for (i, dynamic_column_opt) in columns.into_iter().enumerate() {
|
||||
if let Some(Column { index: idx, values }) =
|
||||
dynamic_column_opt.and_then(dynamic_column_to_u64_monotonic)
|
||||
{
|
||||
column_indexes.push(idx);
|
||||
column_values.push(Some(values));
|
||||
} else {
|
||||
column_indexes.push(ColumnIndex::Empty {
|
||||
num_docs: num_docs_per_column[i],
|
||||
});
|
||||
column_values.push(None);
|
||||
Vec::with_capacity(columns_to_merge.len());
|
||||
for (i, dynamic_column_opt) in columns_to_merge.into_iter().enumerate() {
|
||||
match dynamic_column_opt.and_then(dynamic_column_to_u64_monotonic) {
|
||||
Some(Column { index: idx, values }) => {
|
||||
column_indexes.push(idx);
|
||||
column_values.push(Some(values));
|
||||
}
|
||||
None => {
|
||||
column_indexes.push(ColumnIndex::Empty {
|
||||
num_docs: num_docs_per_column[i],
|
||||
});
|
||||
column_values.push(None);
|
||||
}
|
||||
}
|
||||
}
|
||||
let merged_column_index =
|
||||
@@ -164,10 +167,10 @@ fn merge_column(
|
||||
serialize_column_mappable_to_u64(merged_column_index, &merge_column_values, wrt)?;
|
||||
}
|
||||
ColumnType::IpAddr => {
|
||||
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns.len());
|
||||
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns_to_merge.len());
|
||||
let mut column_values: Vec<Option<Arc<dyn ColumnValues<Ipv6Addr>>>> =
|
||||
Vec::with_capacity(columns.len());
|
||||
for (i, dynamic_column_opt) in columns.into_iter().enumerate() {
|
||||
Vec::with_capacity(columns_to_merge.len());
|
||||
for (i, dynamic_column_opt) in columns_to_merge.into_iter().enumerate() {
|
||||
if let Some(DynamicColumn::IpAddr(Column { index: idx, values })) =
|
||||
dynamic_column_opt
|
||||
{
|
||||
@@ -192,9 +195,10 @@ fn merge_column(
|
||||
serialize_column_mappable_to_u128(merged_column_index, &merge_column_values, wrt)?;
|
||||
}
|
||||
ColumnType::Bytes | ColumnType::Str => {
|
||||
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns.len());
|
||||
let mut bytes_columns: Vec<Option<BytesColumn>> = Vec::with_capacity(columns.len());
|
||||
for (i, dynamic_column_opt) in columns.into_iter().enumerate() {
|
||||
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns_to_merge.len());
|
||||
let mut bytes_columns: Vec<Option<BytesColumn>> =
|
||||
Vec::with_capacity(columns_to_merge.len());
|
||||
for (i, dynamic_column_opt) in columns_to_merge.into_iter().enumerate() {
|
||||
match dynamic_column_opt {
|
||||
Some(DynamicColumn::Str(str_column)) => {
|
||||
column_indexes.push(str_column.term_ord_column.index.clone());
|
||||
@@ -248,13 +252,15 @@ impl GroupedColumns {
|
||||
if column_type.len() == 1 {
|
||||
return column_type.into_iter().next().unwrap();
|
||||
}
|
||||
// At the moment, only the numerical categorical column type has more than one possible
|
||||
// At the moment, only the numerical column type category has more than one possible
|
||||
// column type.
|
||||
assert!(self
|
||||
.columns
|
||||
.iter()
|
||||
.flatten()
|
||||
.all(|el| ColumnTypeCategory::from(el.column_type()) == ColumnTypeCategory::Numerical));
|
||||
assert!(
|
||||
self.columns
|
||||
.iter()
|
||||
.flatten()
|
||||
.all(|el| ColumnTypeCategory::from(el.column_type())
|
||||
== ColumnTypeCategory::Numerical)
|
||||
);
|
||||
merged_numerical_columns_type(self.columns.iter().flatten()).into()
|
||||
}
|
||||
}
|
||||
@@ -361,7 +367,7 @@ fn is_empty_after_merge(
|
||||
ColumnIndex::Empty { .. } => true,
|
||||
ColumnIndex::Full => alive_bitset.len() == 0,
|
||||
ColumnIndex::Optional(optional_index) => {
|
||||
for doc in optional_index.iter_rows() {
|
||||
for doc in optional_index.iter_docs() {
|
||||
if alive_bitset.contains(doc) {
|
||||
return false;
|
||||
}
|
||||
@@ -391,7 +397,6 @@ fn is_empty_after_merge(
|
||||
fn group_columns_for_merge<'a>(
|
||||
columnar_readers: &'a [&'a ColumnarReader],
|
||||
required_columns: &'a [(String, ColumnType)],
|
||||
_merge_row_order: &'a MergeRowOrder,
|
||||
) -> io::Result<BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle>> {
|
||||
let mut columns: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> = BTreeMap::new();
|
||||
|
||||
|
||||
@@ -5,28 +5,29 @@ use sstable::TermOrdinal;
|
||||
|
||||
use crate::Streamer;
|
||||
|
||||
pub struct HeapItem<'a> {
|
||||
pub streamer: Streamer<'a>,
|
||||
/// The terms of a column with the ordinal of the segment.
|
||||
pub struct TermsWithSegmentOrd<'a> {
|
||||
pub terms: Streamer<'a>,
|
||||
pub segment_ord: usize,
|
||||
}
|
||||
|
||||
impl PartialEq for HeapItem<'_> {
|
||||
impl PartialEq for TermsWithSegmentOrd<'_> {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.segment_ord == other.segment_ord
|
||||
}
|
||||
}
|
||||
|
||||
impl Eq for HeapItem<'_> {}
|
||||
impl Eq for TermsWithSegmentOrd<'_> {}
|
||||
|
||||
impl<'a> PartialOrd for HeapItem<'a> {
|
||||
fn partial_cmp(&self, other: &HeapItem<'a>) -> Option<Ordering> {
|
||||
impl<'a> PartialOrd for TermsWithSegmentOrd<'a> {
|
||||
fn partial_cmp(&self, other: &TermsWithSegmentOrd<'a>) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Ord for HeapItem<'a> {
|
||||
fn cmp(&self, other: &HeapItem<'a>) -> Ordering {
|
||||
(&other.streamer.key(), &other.segment_ord).cmp(&(&self.streamer.key(), &self.segment_ord))
|
||||
impl<'a> Ord for TermsWithSegmentOrd<'a> {
|
||||
fn cmp(&self, other: &TermsWithSegmentOrd<'a>) -> Ordering {
|
||||
(&other.terms.key(), &other.segment_ord).cmp(&(&self.terms.key(), &self.segment_ord))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -37,39 +38,32 @@ impl<'a> Ord for HeapItem<'a> {
|
||||
/// - the term
|
||||
/// - a slice with the ordinal of the segments containing the terms.
|
||||
pub struct TermMerger<'a> {
|
||||
heap: BinaryHeap<HeapItem<'a>>,
|
||||
current_streamers: Vec<HeapItem<'a>>,
|
||||
heap: BinaryHeap<TermsWithSegmentOrd<'a>>,
|
||||
term_streams_with_segment: Vec<TermsWithSegmentOrd<'a>>,
|
||||
}
|
||||
|
||||
impl<'a> TermMerger<'a> {
|
||||
/// Stream of merged term dictionary
|
||||
pub fn new(streams: Vec<Streamer<'a>>) -> TermMerger<'a> {
|
||||
pub fn new(term_streams_with_segment: Vec<TermsWithSegmentOrd<'a>>) -> TermMerger<'a> {
|
||||
TermMerger {
|
||||
heap: BinaryHeap::new(),
|
||||
current_streamers: streams
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(ord, streamer)| HeapItem {
|
||||
streamer,
|
||||
segment_ord: ord,
|
||||
})
|
||||
.collect(),
|
||||
term_streams_with_segment,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn matching_segments<'b: 'a>(
|
||||
&'b self,
|
||||
) -> impl 'b + Iterator<Item = (usize, TermOrdinal)> {
|
||||
self.current_streamers
|
||||
self.term_streams_with_segment
|
||||
.iter()
|
||||
.map(|heap_item| (heap_item.segment_ord, heap_item.streamer.term_ord()))
|
||||
.map(|heap_item| (heap_item.segment_ord, heap_item.terms.term_ord()))
|
||||
}
|
||||
|
||||
fn advance_segments(&mut self) {
|
||||
let streamers = &mut self.current_streamers;
|
||||
let streamers = &mut self.term_streams_with_segment;
|
||||
let heap = &mut self.heap;
|
||||
for mut heap_item in streamers.drain(..) {
|
||||
if heap_item.streamer.advance() {
|
||||
if heap_item.terms.advance() {
|
||||
heap.push(heap_item);
|
||||
}
|
||||
}
|
||||
@@ -80,18 +74,19 @@ impl<'a> TermMerger<'a> {
|
||||
/// False if there is none.
|
||||
pub fn advance(&mut self) -> bool {
|
||||
self.advance_segments();
|
||||
if let Some(head) = self.heap.pop() {
|
||||
self.current_streamers.push(head);
|
||||
while let Some(next_streamer) = self.heap.peek() {
|
||||
if self.current_streamers[0].streamer.key() != next_streamer.streamer.key() {
|
||||
break;
|
||||
match self.heap.pop() {
|
||||
Some(head) => {
|
||||
self.term_streams_with_segment.push(head);
|
||||
while let Some(next_streamer) = self.heap.peek() {
|
||||
if self.term_streams_with_segment[0].terms.key() != next_streamer.terms.key() {
|
||||
break;
|
||||
}
|
||||
let next_heap_it = self.heap.pop().unwrap(); // safe : we peeked beforehand
|
||||
self.term_streams_with_segment.push(next_heap_it);
|
||||
}
|
||||
let next_heap_it = self.heap.pop().unwrap(); // safe : we peeked beforehand
|
||||
self.current_streamers.push(next_heap_it);
|
||||
true
|
||||
}
|
||||
true
|
||||
} else {
|
||||
false
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,6 +96,6 @@ impl<'a> TermMerger<'a> {
|
||||
/// if and only if advance() has been called before
|
||||
/// and "true" was returned.
|
||||
pub fn key(&self) -> &[u8] {
|
||||
self.current_streamers[0].streamer.key()
|
||||
self.term_streams_with_segment[0].terms.key()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
use itertools::Itertools;
|
||||
use proptest::collection::vec;
|
||||
use proptest::prelude::*;
|
||||
|
||||
use super::*;
|
||||
use crate::{Cardinality, ColumnarWriter, HasAssociatedColumnType, RowId};
|
||||
use crate::columnar::{ColumnarReader, MergeRowOrder, StackMergeOrder, merge_columnar};
|
||||
use crate::{Cardinality, ColumnarWriter, DynamicColumn, HasAssociatedColumnType, RowId};
|
||||
|
||||
fn make_columnar<T: Into<NumericalValue> + HasAssociatedColumnType + Copy>(
|
||||
column_name: &str,
|
||||
@@ -26,9 +29,8 @@ fn test_column_coercion_to_u64() {
|
||||
// u64 type
|
||||
let columnar2 = make_columnar("numbers", &[u64::MAX]);
|
||||
let columnars = &[&columnar1, &columnar2];
|
||||
let merge_order = StackMergeOrder::stack(columnars).into();
|
||||
let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> =
|
||||
group_columns_for_merge(columnars, &[], &merge_order).unwrap();
|
||||
group_columns_for_merge(columnars, &[]).unwrap();
|
||||
assert_eq!(column_map.len(), 1);
|
||||
assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical)));
|
||||
}
|
||||
@@ -38,9 +40,8 @@ fn test_column_coercion_to_i64() {
|
||||
let columnar1 = make_columnar("numbers", &[-1i64]);
|
||||
let columnar2 = make_columnar("numbers", &[2u64]);
|
||||
let columnars = &[&columnar1, &columnar2];
|
||||
let merge_order = StackMergeOrder::stack(columnars).into();
|
||||
let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> =
|
||||
group_columns_for_merge(columnars, &[], &merge_order).unwrap();
|
||||
group_columns_for_merge(columnars, &[]).unwrap();
|
||||
assert_eq!(column_map.len(), 1);
|
||||
assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical)));
|
||||
}
|
||||
@@ -63,14 +64,8 @@ fn test_group_columns_with_required_column() {
|
||||
let columnar1 = make_columnar("numbers", &[1i64]);
|
||||
let columnar2 = make_columnar("numbers", &[2u64]);
|
||||
let columnars = &[&columnar1, &columnar2];
|
||||
let merge_order = StackMergeOrder::stack(columnars).into();
|
||||
let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> =
|
||||
group_columns_for_merge(
|
||||
&[&columnar1, &columnar2],
|
||||
&[("numbers".to_string(), ColumnType::U64)],
|
||||
&merge_order,
|
||||
)
|
||||
.unwrap();
|
||||
group_columns_for_merge(columnars, &[("numbers".to_string(), ColumnType::U64)]).unwrap();
|
||||
assert_eq!(column_map.len(), 1);
|
||||
assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical)));
|
||||
}
|
||||
@@ -80,13 +75,9 @@ fn test_group_columns_required_column_with_no_existing_columns() {
|
||||
let columnar1 = make_columnar("numbers", &[2u64]);
|
||||
let columnar2 = make_columnar("numbers", &[2u64]);
|
||||
let columnars = &[&columnar1, &columnar2];
|
||||
let merge_order = StackMergeOrder::stack(columnars).into();
|
||||
let column_map: BTreeMap<_, _> = group_columns_for_merge(
|
||||
columnars,
|
||||
&[("required_col".to_string(), ColumnType::Str)],
|
||||
&merge_order,
|
||||
)
|
||||
.unwrap();
|
||||
let column_map: BTreeMap<_, _> =
|
||||
group_columns_for_merge(columnars, &[("required_col".to_string(), ColumnType::Str)])
|
||||
.unwrap();
|
||||
assert_eq!(column_map.len(), 2);
|
||||
let columns = &column_map
|
||||
.get(&("required_col".to_string(), ColumnTypeCategory::Str))
|
||||
@@ -102,14 +93,8 @@ fn test_group_columns_required_column_is_above_all_columns_have_the_same_type_ru
|
||||
let columnar1 = make_columnar("numbers", &[2i64]);
|
||||
let columnar2 = make_columnar("numbers", &[2i64]);
|
||||
let columnars = &[&columnar1, &columnar2];
|
||||
let merge_order = StackMergeOrder::stack(columnars).into();
|
||||
let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> =
|
||||
group_columns_for_merge(
|
||||
columnars,
|
||||
&[("numbers".to_string(), ColumnType::U64)],
|
||||
&merge_order,
|
||||
)
|
||||
.unwrap();
|
||||
group_columns_for_merge(columnars, &[("numbers".to_string(), ColumnType::U64)]).unwrap();
|
||||
assert_eq!(column_map.len(), 1);
|
||||
assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical)));
|
||||
}
|
||||
@@ -119,9 +104,8 @@ fn test_missing_column() {
|
||||
let columnar1 = make_columnar("numbers", &[-1i64]);
|
||||
let columnar2 = make_columnar("numbers2", &[2u64]);
|
||||
let columnars = &[&columnar1, &columnar2];
|
||||
let merge_order = StackMergeOrder::stack(columnars).into();
|
||||
let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> =
|
||||
group_columns_for_merge(columnars, &[], &merge_order).unwrap();
|
||||
group_columns_for_merge(columnars, &[]).unwrap();
|
||||
assert_eq!(column_map.len(), 2);
|
||||
assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical)));
|
||||
{
|
||||
@@ -224,7 +208,7 @@ fn test_merge_columnar_numbers() {
|
||||
)
|
||||
.unwrap();
|
||||
let columnar_reader = ColumnarReader::open(buffer).unwrap();
|
||||
assert_eq!(columnar_reader.num_rows(), 3);
|
||||
assert_eq!(columnar_reader.num_docs(), 3);
|
||||
assert_eq!(columnar_reader.num_columns(), 1);
|
||||
let cols = columnar_reader.read_columns("numbers").unwrap();
|
||||
let dynamic_column = cols[0].open().unwrap();
|
||||
@@ -252,7 +236,7 @@ fn test_merge_columnar_texts() {
|
||||
)
|
||||
.unwrap();
|
||||
let columnar_reader = ColumnarReader::open(buffer).unwrap();
|
||||
assert_eq!(columnar_reader.num_rows(), 3);
|
||||
assert_eq!(columnar_reader.num_docs(), 3);
|
||||
assert_eq!(columnar_reader.num_columns(), 1);
|
||||
let cols = columnar_reader.read_columns("texts").unwrap();
|
||||
let dynamic_column = cols[0].open().unwrap();
|
||||
@@ -301,7 +285,7 @@ fn test_merge_columnar_byte() {
|
||||
)
|
||||
.unwrap();
|
||||
let columnar_reader = ColumnarReader::open(buffer).unwrap();
|
||||
assert_eq!(columnar_reader.num_rows(), 4);
|
||||
assert_eq!(columnar_reader.num_docs(), 4);
|
||||
assert_eq!(columnar_reader.num_columns(), 1);
|
||||
let cols = columnar_reader.read_columns("bytes").unwrap();
|
||||
let dynamic_column = cols[0].open().unwrap();
|
||||
@@ -357,7 +341,7 @@ fn test_merge_columnar_byte_with_missing() {
|
||||
)
|
||||
.unwrap();
|
||||
let columnar_reader = ColumnarReader::open(buffer).unwrap();
|
||||
assert_eq!(columnar_reader.num_rows(), 3 + 2 + 3);
|
||||
assert_eq!(columnar_reader.num_docs(), 3 + 2 + 3);
|
||||
assert_eq!(columnar_reader.num_columns(), 2);
|
||||
let cols = columnar_reader.read_columns("col").unwrap();
|
||||
let dynamic_column = cols[0].open().unwrap();
|
||||
@@ -409,7 +393,7 @@ fn test_merge_columnar_different_types() {
|
||||
)
|
||||
.unwrap();
|
||||
let columnar_reader = ColumnarReader::open(buffer).unwrap();
|
||||
assert_eq!(columnar_reader.num_rows(), 4);
|
||||
assert_eq!(columnar_reader.num_docs(), 4);
|
||||
assert_eq!(columnar_reader.num_columns(), 2);
|
||||
let cols = columnar_reader.read_columns("mixed").unwrap();
|
||||
|
||||
@@ -419,11 +403,11 @@ fn test_merge_columnar_different_types() {
|
||||
panic!()
|
||||
};
|
||||
assert_eq!(vals.get_cardinality(), Cardinality::Optional);
|
||||
assert_eq!(vals.values_for_doc(0).collect_vec(), vec![]);
|
||||
assert_eq!(vals.values_for_doc(1).collect_vec(), vec![]);
|
||||
assert_eq!(vals.values_for_doc(2).collect_vec(), vec![]);
|
||||
assert_eq!(vals.values_for_doc(0).collect_vec(), Vec::<i64>::new());
|
||||
assert_eq!(vals.values_for_doc(1).collect_vec(), Vec::<i64>::new());
|
||||
assert_eq!(vals.values_for_doc(2).collect_vec(), Vec::<i64>::new());
|
||||
assert_eq!(vals.values_for_doc(3).collect_vec(), vec![1]);
|
||||
assert_eq!(vals.values_for_doc(4).collect_vec(), vec![]);
|
||||
assert_eq!(vals.values_for_doc(4).collect_vec(), Vec::<i64>::new());
|
||||
|
||||
// text column
|
||||
let dynamic_column = cols[1].open().unwrap();
|
||||
@@ -474,7 +458,7 @@ fn test_merge_columnar_different_empty_cardinality() {
|
||||
)
|
||||
.unwrap();
|
||||
let columnar_reader = ColumnarReader::open(buffer).unwrap();
|
||||
assert_eq!(columnar_reader.num_rows(), 2);
|
||||
assert_eq!(columnar_reader.num_docs(), 2);
|
||||
assert_eq!(columnar_reader.num_columns(), 2);
|
||||
let cols = columnar_reader.read_columns("mixed").unwrap();
|
||||
|
||||
@@ -486,3 +470,119 @@ fn test_merge_columnar_different_empty_cardinality() {
|
||||
let dynamic_column = cols[1].open().unwrap();
|
||||
assert_eq!(dynamic_column.get_cardinality(), Cardinality::Optional);
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct ColumnSpec {
|
||||
column_name: String,
|
||||
/// (row_id, term)
|
||||
terms: Vec<(RowId, Vec<u8>)>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct ColumnarSpec {
|
||||
columns: Vec<ColumnSpec>,
|
||||
}
|
||||
|
||||
/// Generate a random (row_id, term) pair:
|
||||
/// - row_id in [0..10]
|
||||
/// - term is either from POSSIBLE_TERMS or random bytes
|
||||
fn rowid_and_term_strategy() -> impl Strategy<Value = (RowId, Vec<u8>)> {
|
||||
const POSSIBLE_TERMS: &[&[u8]] = &[b"a", b"b", b"allo"];
|
||||
|
||||
let term_strat = prop_oneof![
|
||||
// pick from the fixed list
|
||||
(0..POSSIBLE_TERMS.len()).prop_map(|i| POSSIBLE_TERMS[i].to_vec()),
|
||||
// or random bytes (length 0..10)
|
||||
prop::collection::vec(any::<u8>(), 0..10),
|
||||
];
|
||||
|
||||
(0u32..11, term_strat)
|
||||
}
|
||||
|
||||
/// Generate one ColumnSpec, with a random name and a random list of (row_id, term).
|
||||
/// We sort it by row_id so that data is in ascending order.
|
||||
fn column_spec_strategy() -> impl Strategy<Value = ColumnSpec> {
|
||||
let column_name = prop_oneof![
|
||||
Just("col".to_string()),
|
||||
Just("col2".to_string()),
|
||||
"col.*".prop_map(|s| s),
|
||||
];
|
||||
|
||||
// We'll produce 0..8 (rowid,term) entries for this column
|
||||
let data_strat = vec(rowid_and_term_strategy(), 0..8).prop_map(|mut pairs| {
|
||||
// Sort by row_id
|
||||
pairs.sort_by_key(|(row_id, _)| *row_id);
|
||||
pairs
|
||||
});
|
||||
|
||||
(column_name, data_strat).prop_map(|(name, data)| ColumnSpec {
|
||||
column_name: name,
|
||||
terms: data,
|
||||
})
|
||||
}
|
||||
|
||||
/// Strategy to generate an ColumnarSpec
|
||||
fn columnar_strategy() -> impl Strategy<Value = ColumnarSpec> {
|
||||
vec(column_spec_strategy(), 0..3).prop_map(|columns| ColumnarSpec { columns })
|
||||
}
|
||||
|
||||
/// Strategy to generate multiple ColumnarSpecs, each of which we will treat
|
||||
/// as one "columnar" to be merged together.
|
||||
fn columnars_strategy() -> impl Strategy<Value = Vec<ColumnarSpec>> {
|
||||
vec(columnar_strategy(), 1..4)
|
||||
}
|
||||
|
||||
/// Build a `ColumnarReader` from a `ColumnarSpec`
|
||||
fn build_columnar(spec: &ColumnarSpec) -> ColumnarReader {
|
||||
let mut writer = ColumnarWriter::default();
|
||||
let mut max_row_id = 0;
|
||||
for col in &spec.columns {
|
||||
for &(row_id, ref term) in &col.terms {
|
||||
writer.record_bytes(row_id, &col.column_name, term);
|
||||
max_row_id = max_row_id.max(row_id);
|
||||
}
|
||||
}
|
||||
|
||||
let mut buffer = Vec::new();
|
||||
writer.serialize(max_row_id + 1, &mut buffer).unwrap();
|
||||
ColumnarReader::open(buffer).unwrap()
|
||||
}
|
||||
|
||||
proptest! {
|
||||
// We just test that the merge_columnar function doesn't crash.
|
||||
#![proptest_config(ProptestConfig::with_cases(256))]
|
||||
#[test]
|
||||
fn test_merge_columnar_bytes_no_crash(columnars in columnars_strategy(), second_merge_columnars in columnars_strategy()) {
|
||||
let columnars: Vec<ColumnarReader> = columnars.iter()
|
||||
.map(build_columnar)
|
||||
.collect();
|
||||
|
||||
let mut out = Vec::new();
|
||||
let columnar_refs: Vec<&ColumnarReader> = columnars.iter().collect();
|
||||
let stack_merge_order = StackMergeOrder::stack(&columnar_refs);
|
||||
merge_columnar(
|
||||
&columnar_refs,
|
||||
&[],
|
||||
MergeRowOrder::Stack(stack_merge_order),
|
||||
&mut out,
|
||||
).unwrap();
|
||||
|
||||
let merged_reader = ColumnarReader::open(out).unwrap();
|
||||
|
||||
// Merge the second set of columnars with the result of the first merge
|
||||
let mut columnars: Vec<ColumnarReader> = second_merge_columnars.iter()
|
||||
.map(build_columnar)
|
||||
.collect();
|
||||
columnars.push(merged_reader);
|
||||
let mut out = Vec::new();
|
||||
let columnar_refs: Vec<&ColumnarReader> = columnars.iter().collect();
|
||||
let stack_merge_order = StackMergeOrder::stack(&columnar_refs);
|
||||
merge_columnar(
|
||||
&columnar_refs,
|
||||
&[],
|
||||
MergeRowOrder::Stack(stack_merge_order),
|
||||
&mut out,
|
||||
).unwrap();
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,9 +5,9 @@ mod reader;
|
||||
mod writer;
|
||||
|
||||
pub use column_type::{ColumnType, HasAssociatedColumnType};
|
||||
pub use format_version::{Version, CURRENT_VERSION};
|
||||
pub use format_version::{CURRENT_VERSION, Version};
|
||||
#[cfg(test)]
|
||||
pub(crate) use merge::ColumnTypeCategory;
|
||||
pub use merge::{merge_columnar, MergeRowOrder, ShuffleMergeOrder, StackMergeOrder};
|
||||
pub use merge::{MergeRowOrder, ShuffleMergeOrder, StackMergeOrder, merge_columnar};
|
||||
pub use reader::ColumnarReader;
|
||||
pub use writer::ColumnarWriter;
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
use std::{fmt, io, mem};
|
||||
|
||||
use common::BinarySerializable;
|
||||
use common::file_slice::FileSlice;
|
||||
use common::json_path_writer::JSON_PATH_SEGMENT_SEP;
|
||||
use common::BinarySerializable;
|
||||
use sstable::{Dictionary, RangeSSTable};
|
||||
|
||||
use crate::columnar::{format_version, ColumnType};
|
||||
use crate::columnar::{ColumnType, format_version};
|
||||
use crate::dynamic_column::DynamicColumnHandle;
|
||||
use crate::{RowId, Version};
|
||||
|
||||
@@ -19,13 +19,13 @@ fn io_invalid_data(msg: String) -> io::Error {
|
||||
pub struct ColumnarReader {
|
||||
column_dictionary: Dictionary<RangeSSTable>,
|
||||
column_data: FileSlice,
|
||||
num_rows: RowId,
|
||||
num_docs: RowId,
|
||||
format_version: Version,
|
||||
}
|
||||
|
||||
impl fmt::Debug for ColumnarReader {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
let num_rows = self.num_rows();
|
||||
let num_rows = self.num_docs();
|
||||
let columns = self.list_columns().unwrap();
|
||||
let num_cols = columns.len();
|
||||
let mut debug_struct = f.debug_struct("Columnar");
|
||||
@@ -112,13 +112,13 @@ impl ColumnarReader {
|
||||
Ok(ColumnarReader {
|
||||
column_dictionary,
|
||||
column_data,
|
||||
num_rows,
|
||||
num_docs: num_rows,
|
||||
format_version,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn num_rows(&self) -> RowId {
|
||||
self.num_rows
|
||||
pub fn num_docs(&self) -> RowId {
|
||||
self.num_docs
|
||||
}
|
||||
// Iterate over the columns in a sorted way
|
||||
pub fn iter_columns(
|
||||
|
||||
@@ -42,7 +42,7 @@ impl ColumnWriter {
|
||||
&self,
|
||||
arena: &MemoryArena,
|
||||
buffer: &'a mut Vec<u8>,
|
||||
) -> impl Iterator<Item = ColumnOperation<V>> + 'a {
|
||||
) -> impl Iterator<Item = ColumnOperation<V>> + 'a + use<'a, V> {
|
||||
buffer.clear();
|
||||
self.values.read_to_end(arena, buffer);
|
||||
let mut cursor: &[u8] = &buffer[..];
|
||||
@@ -104,9 +104,10 @@ pub(crate) struct NumericalColumnWriter {
|
||||
|
||||
impl NumericalColumnWriter {
|
||||
pub fn force_numerical_type(&mut self, numerical_type: NumericalType) {
|
||||
assert!(self
|
||||
.compatible_numerical_types
|
||||
.is_type_accepted(numerical_type));
|
||||
assert!(
|
||||
self.compatible_numerical_types
|
||||
.is_type_accepted(numerical_type)
|
||||
);
|
||||
self.compatible_numerical_types = CompatibleNumericalTypes::StaticType(numerical_type);
|
||||
}
|
||||
}
|
||||
@@ -211,7 +212,7 @@ impl NumericalColumnWriter {
|
||||
self,
|
||||
arena: &MemoryArena,
|
||||
buffer: &'a mut Vec<u8>,
|
||||
) -> impl Iterator<Item = ColumnOperation<NumericalValue>> + 'a {
|
||||
) -> impl Iterator<Item = ColumnOperation<NumericalValue>> + 'a + use<'a> {
|
||||
self.column_writer.operation_iterator(arena, buffer)
|
||||
}
|
||||
}
|
||||
@@ -255,7 +256,7 @@ impl StrOrBytesColumnWriter {
|
||||
&self,
|
||||
arena: &MemoryArena,
|
||||
byte_buffer: &'a mut Vec<u8>,
|
||||
) -> impl Iterator<Item = ColumnOperation<UnorderedId>> + 'a {
|
||||
) -> impl Iterator<Item = ColumnOperation<UnorderedId>> + 'a + use<'a> {
|
||||
self.column_writer.operation_iterator(arena, byte_buffer)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,13 +8,13 @@ use std::net::Ipv6Addr;
|
||||
|
||||
use column_operation::ColumnOperation;
|
||||
pub(crate) use column_writers::CompatibleNumericalTypes;
|
||||
use common::json_path_writer::JSON_END_OF_PATH;
|
||||
use common::CountingWriter;
|
||||
use common::json_path_writer::JSON_END_OF_PATH;
|
||||
pub(crate) use serializer::ColumnarSerializer;
|
||||
use stacker::{Addr, ArenaHashMap, MemoryArena};
|
||||
|
||||
use crate::column_index::{SerializableColumnIndex, SerializableOptionalIndex};
|
||||
use crate::column_values::{MonotonicallyMappableToU128, MonotonicallyMappableToU64};
|
||||
use crate::column_values::{MonotonicallyMappableToU64, MonotonicallyMappableToU128};
|
||||
use crate::columnar::column_type::ColumnType;
|
||||
use crate::columnar::writer::column_writers::{
|
||||
ColumnWriter, NumericalColumnWriter, StrOrBytesColumnWriter,
|
||||
|
||||
@@ -3,11 +3,11 @@ use std::io::Write;
|
||||
|
||||
use common::json_path_writer::JSON_END_OF_PATH;
|
||||
use common::{BinarySerializable, CountingWriter};
|
||||
use sstable::value::RangeValueWriter;
|
||||
use sstable::RangeSSTable;
|
||||
use sstable::value::RangeValueWriter;
|
||||
|
||||
use crate::columnar::ColumnType;
|
||||
use crate::RowId;
|
||||
use crate::columnar::ColumnType;
|
||||
|
||||
pub struct ColumnarSerializer<W: io::Write> {
|
||||
wrt: CountingWriter<W>,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use crate::RowId;
|
||||
use crate::column_index::{SerializableMultivalueIndex, SerializableOptionalIndex};
|
||||
use crate::iterable::Iterable;
|
||||
use crate::RowId;
|
||||
|
||||
/// The `IndexBuilder` interprets a sequence of
|
||||
/// calls of the form:
|
||||
@@ -31,12 +31,13 @@ pub struct OptionalIndexBuilder {
|
||||
|
||||
impl OptionalIndexBuilder {
|
||||
pub fn finish(&mut self, num_rows: RowId) -> impl Iterable<RowId> + '_ {
|
||||
debug_assert!(self
|
||||
.docs
|
||||
.last()
|
||||
.copied()
|
||||
.map(|last_doc| last_doc < num_rows)
|
||||
.unwrap_or(true));
|
||||
debug_assert!(
|
||||
self.docs
|
||||
.last()
|
||||
.copied()
|
||||
.map(|last_doc| last_doc < num_rows)
|
||||
.unwrap_or(true)
|
||||
);
|
||||
&self.docs[..]
|
||||
}
|
||||
|
||||
@@ -48,12 +49,13 @@ impl OptionalIndexBuilder {
|
||||
impl IndexBuilder for OptionalIndexBuilder {
|
||||
#[inline(always)]
|
||||
fn record_row(&mut self, doc: RowId) {
|
||||
debug_assert!(self
|
||||
.docs
|
||||
.last()
|
||||
.copied()
|
||||
.map(|prev_doc| doc > prev_doc)
|
||||
.unwrap_or(true));
|
||||
debug_assert!(
|
||||
self.docs
|
||||
.last()
|
||||
.copied()
|
||||
.map(|prev_doc| doc > prev_doc)
|
||||
.unwrap_or(true)
|
||||
);
|
||||
self.docs.push(doc);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,8 +3,8 @@ use std::path::PathBuf;
|
||||
use itertools::Itertools;
|
||||
|
||||
use crate::{
|
||||
merge_columnar, Cardinality, Column, ColumnarReader, DynamicColumn, StackMergeOrder,
|
||||
CURRENT_VERSION,
|
||||
CURRENT_VERSION, Cardinality, Column, ColumnarReader, DynamicColumn, StackMergeOrder,
|
||||
merge_columnar,
|
||||
};
|
||||
|
||||
const NUM_DOCS: u32 = u16::MAX as u32;
|
||||
|
||||
@@ -6,7 +6,7 @@ use common::file_slice::FileSlice;
|
||||
use common::{ByteCount, DateTime, HasLen, OwnedBytes};
|
||||
|
||||
use crate::column::{BytesColumn, Column, StrColumn};
|
||||
use crate::column_values::{monotonic_map_column, StrictlyMonotonicFn};
|
||||
use crate::column_values::{StrictlyMonotonicFn, monotonic_map_column};
|
||||
use crate::columnar::ColumnType;
|
||||
use crate::{Cardinality, ColumnIndex, ColumnValues, NumericalType, Version};
|
||||
|
||||
|
||||
@@ -44,11 +44,11 @@ pub use block_accessor::ColumnBlockAccessor;
|
||||
pub use column::{BytesColumn, Column, StrColumn};
|
||||
pub use column_index::ColumnIndex;
|
||||
pub use column_values::{
|
||||
ColumnValues, EmptyColumnValues, MonotonicallyMappableToU128, MonotonicallyMappableToU64,
|
||||
ColumnValues, EmptyColumnValues, MonotonicallyMappableToU64, MonotonicallyMappableToU128,
|
||||
};
|
||||
pub use columnar::{
|
||||
merge_columnar, ColumnType, ColumnarReader, ColumnarWriter, HasAssociatedColumnType,
|
||||
MergeRowOrder, ShuffleMergeOrder, StackMergeOrder, Version, CURRENT_VERSION,
|
||||
CURRENT_VERSION, ColumnType, ColumnarReader, ColumnarWriter, HasAssociatedColumnType,
|
||||
MergeRowOrder, ShuffleMergeOrder, StackMergeOrder, Version, merge_columnar,
|
||||
};
|
||||
use sstable::VoidSSTable;
|
||||
pub use value::{NumericalType, NumericalValue};
|
||||
|
||||
@@ -380,7 +380,7 @@ fn assert_columnar_eq(
|
||||
right: &ColumnarReader,
|
||||
lenient_on_numerical_value: bool,
|
||||
) {
|
||||
assert_eq!(left.num_rows(), right.num_rows());
|
||||
assert_eq!(left.num_docs(), right.num_docs());
|
||||
let left_columns = left.list_columns().unwrap();
|
||||
let right_columns = right.list_columns().unwrap();
|
||||
assert_eq!(left_columns.len(), right_columns.len());
|
||||
@@ -588,7 +588,7 @@ proptest! {
|
||||
#[test]
|
||||
fn test_single_columnar_builder_proptest(docs in columnar_docs_strategy()) {
|
||||
let columnar = build_columnar(&docs[..]);
|
||||
assert_eq!(columnar.num_rows() as usize, docs.len());
|
||||
assert_eq!(columnar.num_docs() as usize, docs.len());
|
||||
let mut expected_columns: HashMap<(&str, ColumnTypeCategory), HashMap<u32, Vec<&ColumnValue>> > = Default::default();
|
||||
for (doc_id, doc_vals) in docs.iter().enumerate() {
|
||||
for (col_name, col_val) in doc_vals {
|
||||
@@ -715,8 +715,9 @@ fn test_columnar_merging_number_columns() {
|
||||
// TODO test required_columns
|
||||
// TODO document edge case: required_columns incompatible with values.
|
||||
|
||||
fn columnar_docs_and_remap(
|
||||
) -> impl Strategy<Value = (Vec<Vec<Vec<(&'static str, ColumnValue)>>>, Vec<RowAddr>)> {
|
||||
#[allow(clippy::type_complexity)]
|
||||
fn columnar_docs_and_remap()
|
||||
-> impl Strategy<Value = (Vec<Vec<Vec<(&'static str, ColumnValue)>>>, Vec<RowAddr>)> {
|
||||
proptest::collection::vec(columnar_docs_strategy(), 2..=3).prop_flat_map(
|
||||
|columnars_docs: Vec<Vec<Vec<(&str, ColumnValue)>>>| {
|
||||
let row_addrs: Vec<RowAddr> = columnars_docs
|
||||
@@ -819,7 +820,7 @@ fn test_columnar_merge_empty() {
|
||||
)
|
||||
.unwrap();
|
||||
let merged_columnar = ColumnarReader::open(output).unwrap();
|
||||
assert_eq!(merged_columnar.num_rows(), 0);
|
||||
assert_eq!(merged_columnar.num_docs(), 0);
|
||||
assert_eq!(merged_columnar.num_columns(), 0);
|
||||
}
|
||||
|
||||
@@ -845,7 +846,7 @@ fn test_columnar_merge_single_str_column() {
|
||||
)
|
||||
.unwrap();
|
||||
let merged_columnar = ColumnarReader::open(output).unwrap();
|
||||
assert_eq!(merged_columnar.num_rows(), 1);
|
||||
assert_eq!(merged_columnar.num_docs(), 1);
|
||||
assert_eq!(merged_columnar.num_columns(), 1);
|
||||
}
|
||||
|
||||
@@ -877,7 +878,7 @@ fn test_delete_decrease_cardinality() {
|
||||
)
|
||||
.unwrap();
|
||||
let merged_columnar = ColumnarReader::open(output).unwrap();
|
||||
assert_eq!(merged_columnar.num_rows(), 1);
|
||||
assert_eq!(merged_columnar.num_docs(), 1);
|
||||
assert_eq!(merged_columnar.num_columns(), 1);
|
||||
let cols = merged_columnar.read_columns("c").unwrap();
|
||||
assert_eq!(cols.len(), 1);
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
[package]
|
||||
name = "tantivy-common"
|
||||
version = "0.7.0"
|
||||
version = "0.9.0"
|
||||
authors = ["Paul Masurel <paul@quickwit.io>", "Pascal Seitz <pascal@quickwit.io>"]
|
||||
license = "MIT"
|
||||
edition = "2021"
|
||||
edition = "2024"
|
||||
description = "common traits and utility functions used by multiple tantivy subcrates"
|
||||
documentation = "https://docs.rs/tantivy_common/"
|
||||
homepage = "https://github.com/quickwit-oss/tantivy"
|
||||
@@ -13,7 +13,7 @@ repository = "https://github.com/quickwit-oss/tantivy"
|
||||
|
||||
[dependencies]
|
||||
byteorder = "1.4.3"
|
||||
ownedbytes = { version= "0.7", path="../ownedbytes" }
|
||||
ownedbytes = { version= "0.9", path="../ownedbytes" }
|
||||
async-trait = "0.1"
|
||||
time = { version = "0.3.10", features = ["serde-well-known"] }
|
||||
serde = { version = "1.0.136", features = ["derive"] }
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use binggan::{black_box, BenchRunner};
|
||||
use binggan::{BenchRunner, black_box};
|
||||
use rand::seq::IteratorRandom;
|
||||
use rand::thread_rng;
|
||||
use tantivy_common::{serialize_vint_u32, BitSet, TinySet};
|
||||
use tantivy_common::{BitSet, TinySet, serialize_vint_u32};
|
||||
|
||||
fn bench_vint() {
|
||||
let mut runner = BenchRunner::new();
|
||||
|
||||
@@ -65,11 +65,11 @@ pub fn transform_bound_inner_res<TFrom, TTo>(
|
||||
) -> io::Result<Bound<TTo>> {
|
||||
use self::Bound::*;
|
||||
Ok(match bound {
|
||||
Excluded(ref from_val) => match transform(from_val)? {
|
||||
Excluded(from_val) => match transform(from_val)? {
|
||||
TransformBound::NewBound(new_val) => new_val,
|
||||
TransformBound::Existing(new_val) => Excluded(new_val),
|
||||
},
|
||||
Included(ref from_val) => match transform(from_val)? {
|
||||
Included(from_val) => match transform(from_val)? {
|
||||
TransformBound::NewBound(new_val) => new_val,
|
||||
TransformBound::Existing(new_val) => Included(new_val),
|
||||
},
|
||||
@@ -85,11 +85,11 @@ pub fn transform_bound_inner<TFrom, TTo>(
|
||||
) -> Bound<TTo> {
|
||||
use self::Bound::*;
|
||||
match bound {
|
||||
Excluded(ref from_val) => match transform(from_val) {
|
||||
Excluded(from_val) => match transform(from_val) {
|
||||
TransformBound::NewBound(new_val) => new_val,
|
||||
TransformBound::Existing(new_val) => Excluded(new_val),
|
||||
},
|
||||
Included(ref from_val) => match transform(from_val) {
|
||||
Included(from_val) => match transform(from_val) {
|
||||
TransformBound::NewBound(new_val) => new_val,
|
||||
TransformBound::Existing(new_val) => Included(new_val),
|
||||
},
|
||||
@@ -111,8 +111,8 @@ pub fn map_bound<TFrom, TTo>(
|
||||
) -> Bound<TTo> {
|
||||
use self::Bound::*;
|
||||
match bound {
|
||||
Excluded(ref from_val) => Bound::Excluded(transform(from_val)),
|
||||
Included(ref from_val) => Bound::Included(transform(from_val)),
|
||||
Excluded(from_val) => Bound::Excluded(transform(from_val)),
|
||||
Included(from_val) => Bound::Included(transform(from_val)),
|
||||
Unbounded => Unbounded,
|
||||
}
|
||||
}
|
||||
@@ -123,8 +123,8 @@ pub fn map_bound_res<TFrom, TTo, Err>(
|
||||
) -> Result<Bound<TTo>, Err> {
|
||||
use self::Bound::*;
|
||||
Ok(match bound {
|
||||
Excluded(ref from_val) => Excluded(transform(from_val)?),
|
||||
Included(ref from_val) => Included(transform(from_val)?),
|
||||
Excluded(from_val) => Excluded(transform(from_val)?),
|
||||
Included(from_val) => Included(transform(from_val)?),
|
||||
Unbounded => Unbounded,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::fs::File;
|
||||
use std::ops::{Deref, Range, RangeBounds};
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::{fmt, io};
|
||||
|
||||
@@ -73,7 +74,7 @@ impl FileHandle for WrapFile {
|
||||
{
|
||||
use std::io::{Read, Seek};
|
||||
let mut file = self.file.try_clone()?; // Clone the file to read from it separately
|
||||
// Seek to the start position in the file
|
||||
// Seek to the start position in the file
|
||||
file.seek(io::SeekFrom::Start(start as u64))?;
|
||||
// Read the data into the buffer
|
||||
file.read_exact(&mut buffer)?;
|
||||
@@ -177,6 +178,12 @@ fn combine_ranges<R: RangeBounds<usize>>(orig_range: Range<usize>, rel_range: R)
|
||||
}
|
||||
|
||||
impl FileSlice {
|
||||
/// Creates a FileSlice from a path.
|
||||
pub fn open(path: &Path) -> io::Result<FileSlice> {
|
||||
let wrap_file = WrapFile::new(File::open(path)?)?;
|
||||
Ok(FileSlice::new(Arc::new(wrap_file)))
|
||||
}
|
||||
|
||||
/// Wraps a FileHandle.
|
||||
pub fn new(file_handle: Arc<dyn FileHandle>) -> Self {
|
||||
let num_bytes = file_handle.len();
|
||||
@@ -339,8 +346,8 @@ mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::{FileHandle, FileSlice};
|
||||
use crate::file_slice::combine_ranges;
|
||||
use crate::HasLen;
|
||||
use crate::file_slice::combine_ranges;
|
||||
|
||||
#[test]
|
||||
fn test_file_slice() -> io::Result<()> {
|
||||
|
||||
@@ -22,7 +22,7 @@ pub use json_path_writer::JsonPathWriter;
|
||||
pub use ownedbytes::{OwnedBytes, StableDeref};
|
||||
pub use serialize::{BinarySerializable, DeserializeFrom, FixedSize};
|
||||
pub use vint::{
|
||||
read_u32_vint, read_u32_vint_no_advance, serialize_vint_u32, write_u32_vint, VInt, VIntU128,
|
||||
VInt, VIntU128, read_u32_vint, read_u32_vint_no_advance, serialize_vint_u32, write_u32_vint,
|
||||
};
|
||||
pub use writer::{AntiCallToken, CountingWriter, TerminatingWrite};
|
||||
|
||||
@@ -177,8 +177,10 @@ pub(crate) mod test {
|
||||
|
||||
#[test]
|
||||
fn test_f64_order() {
|
||||
assert!(!(f64_to_u64(f64::NEG_INFINITY)..f64_to_u64(f64::INFINITY))
|
||||
.contains(&f64_to_u64(f64::NAN))); // nan is not a number
|
||||
assert!(
|
||||
!(f64_to_u64(f64::NEG_INFINITY)..f64_to_u64(f64::INFINITY))
|
||||
.contains(&f64_to_u64(f64::NAN))
|
||||
); // nan is not a number
|
||||
assert!(f64_to_u64(1.5) > f64_to_u64(1.0)); // same exponent, different mantissa
|
||||
assert!(f64_to_u64(2.0) > f64_to_u64(1.0)); // same mantissa, different exponent
|
||||
assert!(f64_to_u64(2.0) > f64_to_u64(1.5)); // different exponent and mantissa
|
||||
|
||||
@@ -222,7 +222,7 @@ impl BinarySerializable for VInt {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use super::{serialize_vint_u32, BinarySerializable, VInt};
|
||||
use super::{BinarySerializable, VInt, serialize_vint_u32};
|
||||
|
||||
fn aux_test_vint(val: u64) {
|
||||
let mut v = [14u8; 10];
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
[package]
|
||||
authors = ["Paul Masurel <paul@quickwit.io>", "Pascal Seitz <pascal@quickwit.io>"]
|
||||
name = "ownedbytes"
|
||||
version = "0.7.0"
|
||||
version = "0.9.0"
|
||||
edition = "2021"
|
||||
description = "Expose data as static slice"
|
||||
license = "MIT"
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "tantivy-query-grammar"
|
||||
version = "0.22.0"
|
||||
version = "0.24.0"
|
||||
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
|
||||
license = "MIT"
|
||||
categories = ["database-implementations", "data-structures"]
|
||||
@@ -9,7 +9,9 @@ homepage = "https://github.com/quickwit-oss/tantivy"
|
||||
repository = "https://github.com/quickwit-oss/tantivy"
|
||||
readme = "README.md"
|
||||
keywords = ["search", "information", "retrieval"]
|
||||
edition = "2021"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
nom = "8"
|
||||
nom = "7"
|
||||
serde = { version = "1.0.219", features = ["derive"] }
|
||||
serde_json = "1.0.140"
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
use std::convert::Infallible;
|
||||
|
||||
use nom::{AsChar, IResult, InputLength, InputTakeAtPosition};
|
||||
use serde::Serialize;
|
||||
|
||||
pub(crate) type ErrorList = Vec<LenientErrorInternal>;
|
||||
pub(crate) type JResult<I, O> = IResult<I, (O, ErrorList), Infallible>;
|
||||
@@ -15,7 +16,8 @@ pub(crate) struct LenientErrorInternal {
|
||||
}
|
||||
|
||||
/// A recoverable error and the position it happened at
|
||||
#[derive(Debug, PartialEq)]
|
||||
#[derive(Debug, PartialEq, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub struct LenientError {
|
||||
pub pos: usize,
|
||||
pub message: String,
|
||||
@@ -184,19 +186,19 @@ macro_rules! tuple_trait_impl(
|
||||
);
|
||||
|
||||
macro_rules! tuple_trait_inner(
|
||||
($it:tt, $self:expr, $input:expr, (), $error_list:expr, $head:ident $($id:ident)+) => ({
|
||||
($it:tt, $self:expr_2021, $input:expr_2021, (), $error_list:expr_2021, $head:ident $($id:ident)+) => ({
|
||||
let (i, (o, mut err)) = $self.$it.parse($input.clone())?;
|
||||
$error_list.append(&mut err);
|
||||
|
||||
succ!($it, tuple_trait_inner!($self, i, ( o ), $error_list, $($id)+))
|
||||
});
|
||||
($it:tt, $self:expr, $input:expr, ($($parsed:tt)*), $error_list:expr, $head:ident $($id:ident)+) => ({
|
||||
($it:tt, $self:expr_2021, $input:expr_2021, ($($parsed:tt)*), $error_list:expr_2021, $head:ident $($id:ident)+) => ({
|
||||
let (i, (o, mut err)) = $self.$it.parse($input.clone())?;
|
||||
$error_list.append(&mut err);
|
||||
|
||||
succ!($it, tuple_trait_inner!($self, i, ($($parsed)* , o), $error_list, $($id)+))
|
||||
});
|
||||
($it:tt, $self:expr, $input:expr, ($($parsed:tt)*), $error_list:expr, $head:ident) => ({
|
||||
($it:tt, $self:expr_2021, $input:expr_2021, ($($parsed:tt)*), $error_list:expr_2021, $head:ident) => ({
|
||||
let (i, (o, mut err)) = $self.$it.parse($input.clone())?;
|
||||
$error_list.append(&mut err);
|
||||
|
||||
@@ -326,13 +328,13 @@ macro_rules! alt_trait_impl(
|
||||
);
|
||||
|
||||
macro_rules! alt_trait_inner(
|
||||
($it:tt, $self:expr, $input:expr, $head_cond:ident $head:ident, $($id_cond:ident $id:ident),+) => (
|
||||
($it:tt, $self:expr_2021, $input:expr_2021, $head_cond:ident $head:ident, $($id_cond:ident $id:ident),+) => (
|
||||
match $self.$it.0.parse($input.clone()) {
|
||||
Err(_) => succ!($it, alt_trait_inner!($self, $input, $($id_cond $id),+)),
|
||||
Ok((input_left, _)) => Some($self.$it.1.parse(input_left)),
|
||||
}
|
||||
);
|
||||
($it:tt, $self:expr, $input:expr, $head_cond:ident $head:ident) => (
|
||||
($it:tt, $self:expr_2021, $input:expr_2021, $head_cond:ident $head:ident) => (
|
||||
None
|
||||
);
|
||||
);
|
||||
@@ -353,3 +355,21 @@ where
|
||||
{
|
||||
move |i: I| l.choice(i.clone()).unwrap_or_else(|| default.parse(i))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_lenient_error_serialization() {
|
||||
let error = LenientError {
|
||||
pos: 42,
|
||||
message: "test error message".to_string(),
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
serde_json::to_string(&error).unwrap(),
|
||||
"{\"pos\":42,\"message\":\"test error message\"}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
#![allow(clippy::derive_partial_eq_without_eq)]
|
||||
|
||||
use serde::Serialize;
|
||||
|
||||
mod infallible;
|
||||
mod occur;
|
||||
mod query_grammar;
|
||||
@@ -12,6 +14,8 @@ pub use crate::user_input_ast::{
|
||||
Delimiter, UserInputAst, UserInputBound, UserInputLeaf, UserInputLiteral,
|
||||
};
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub struct Error;
|
||||
|
||||
/// Parse a query
|
||||
@@ -24,3 +28,31 @@ pub fn parse_query(query: &str) -> Result<UserInputAst, Error> {
|
||||
pub fn parse_query_lenient(query: &str) -> (UserInputAst, Vec<LenientError>) {
|
||||
parse_to_ast_lenient(query)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::{parse_query, parse_query_lenient};
|
||||
|
||||
#[test]
|
||||
fn test_parse_query_serialization() {
|
||||
let ast = parse_query("title:hello OR title:x").unwrap();
|
||||
let json = serde_json::to_string(&ast).unwrap();
|
||||
assert_eq!(
|
||||
json,
|
||||
r#"{"type":"bool","clauses":[["should",{"type":"literal","field_name":"title","phrase":"hello","delimiter":"none","slop":0,"prefix":false}],["should",{"type":"literal","field_name":"title","phrase":"x","delimiter":"none","slop":0,"prefix":false}]]}"#
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_query_wrong_query() {
|
||||
assert!(parse_query("title:").is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_query_lenient_wrong_query() {
|
||||
let (_, errors) = parse_query_lenient("title:");
|
||||
assert!(errors.len() == 1);
|
||||
let json = serde_json::to_string(&errors).unwrap();
|
||||
assert_eq!(json, r#"[{"pos":6,"message":"expected word"}]"#);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,9 +1,12 @@
|
||||
use std::fmt;
|
||||
use std::fmt::Write;
|
||||
|
||||
use serde::Serialize;
|
||||
|
||||
/// Defines whether a term in a query must be present,
|
||||
/// should be present or must not be present.
|
||||
#[derive(Debug, Clone, Hash, Copy, Eq, PartialEq)]
|
||||
#[derive(Debug, Clone, Hash, Copy, Eq, PartialEq, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum Occur {
|
||||
/// For a given document to be considered for scoring,
|
||||
/// at least one of the queries with the Should or the Must
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use std::borrow::Cow;
|
||||
use std::iter::once;
|
||||
|
||||
use nom::IResult;
|
||||
use nom::branch::alt;
|
||||
use nom::bytes::complete::tag;
|
||||
use nom::character::complete::{
|
||||
@@ -10,12 +11,11 @@ use nom::combinator::{eof, map, map_res, opt, peek, recognize, value, verify};
|
||||
use nom::error::{Error, ErrorKind};
|
||||
use nom::multi::{many0, many1, separated_list0};
|
||||
use nom::sequence::{delimited, preceded, separated_pair, terminated, tuple};
|
||||
use nom::IResult;
|
||||
|
||||
use super::user_input_ast::{UserInputAst, UserInputBound, UserInputLeaf, UserInputLiteral};
|
||||
use crate::Occur;
|
||||
use crate::infallible::*;
|
||||
use crate::user_input_ast::Delimiter;
|
||||
use crate::Occur;
|
||||
|
||||
// Note: '-' char is only forbidden at the beginning of a field name, would be clearer to add it to
|
||||
// special characters.
|
||||
@@ -321,7 +321,17 @@ fn exists(inp: &str) -> IResult<&str, UserInputLeaf> {
|
||||
UserInputLeaf::Exists {
|
||||
field: String::new(),
|
||||
},
|
||||
tuple((multispace0, char('*'))),
|
||||
tuple((
|
||||
multispace0,
|
||||
char('*'),
|
||||
peek(alt((
|
||||
value(
|
||||
"",
|
||||
satisfy(|c: char| c.is_whitespace() || ESCAPE_IN_WORD.contains(&c)),
|
||||
),
|
||||
eof,
|
||||
))),
|
||||
)),
|
||||
)(inp)
|
||||
}
|
||||
|
||||
@@ -331,7 +341,14 @@ fn exists_precond(inp: &str) -> IResult<&str, (), ()> {
|
||||
peek(tuple((
|
||||
field_name,
|
||||
multispace0,
|
||||
char('*'), // when we are here, we know it can't be anything but a exists
|
||||
char('*'),
|
||||
peek(alt((
|
||||
value(
|
||||
"",
|
||||
satisfy(|c: char| c.is_whitespace() || ESCAPE_IN_WORD.contains(&c)),
|
||||
),
|
||||
eof,
|
||||
))), // we need to check this isn't a wildcard query
|
||||
))),
|
||||
)(inp)
|
||||
.map_err(|e| e.map(|_| ()))
|
||||
@@ -1013,7 +1030,7 @@ fn rewrite_ast(mut input: UserInputAst) -> UserInputAst {
|
||||
|
||||
fn rewrite_ast_clause(input: &mut (Option<Occur>, UserInputAst)) {
|
||||
match input {
|
||||
(None, UserInputAst::Clause(ref mut clauses)) if clauses.len() == 1 => {
|
||||
(None, UserInputAst::Clause(clauses)) if clauses.len() == 1 => {
|
||||
*input = clauses.pop().unwrap(); // safe because clauses.len() == 1
|
||||
}
|
||||
_ => {}
|
||||
@@ -1359,7 +1376,7 @@ mod test {
|
||||
|
||||
#[test]
|
||||
fn test_range_parser_lenient() {
|
||||
let literal = |query| literal_infallible(query).unwrap().1 .0.unwrap();
|
||||
let literal = |query| literal_infallible(query).unwrap().1.0.unwrap();
|
||||
|
||||
// same tests as non-lenient
|
||||
let res = literal("title: <hello");
|
||||
@@ -1624,13 +1641,19 @@ mod test {
|
||||
|
||||
#[test]
|
||||
fn test_exist_query() {
|
||||
test_parse_query_to_ast_helper("a:*", "\"a\":*");
|
||||
test_parse_query_to_ast_helper("a: *", "\"a\":*");
|
||||
// an exist followed by default term being b
|
||||
test_is_parse_err("a:*b", "(*\"a\":* *b)");
|
||||
test_parse_query_to_ast_helper("a:*", "$exists(\"a\")");
|
||||
test_parse_query_to_ast_helper("a: *", "$exists(\"a\")");
|
||||
|
||||
// this is a term query (not a phrase prefix)
|
||||
test_parse_query_to_ast_helper(
|
||||
"(hello AND toto:*) OR happy",
|
||||
"(?(+hello +$exists(\"toto\")) ?happy)",
|
||||
);
|
||||
test_parse_query_to_ast_helper("(a:*)", "$exists(\"a\")");
|
||||
|
||||
// these are term/wildcard query (not a phrase prefix)
|
||||
test_parse_query_to_ast_helper("a:b*", "\"a\":b*");
|
||||
test_parse_query_to_ast_helper("a:*b", "\"a\":*b");
|
||||
test_parse_query_to_ast_helper(r#"a:*def*"#, "\"a\":*def*");
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -1,9 +1,13 @@
|
||||
use std::fmt;
|
||||
use std::fmt::{Debug, Formatter};
|
||||
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::Occur;
|
||||
|
||||
#[derive(PartialEq, Clone)]
|
||||
#[derive(PartialEq, Clone, Serialize)]
|
||||
#[serde(tag = "type")]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum UserInputLeaf {
|
||||
Literal(UserInputLiteral),
|
||||
All,
|
||||
@@ -47,7 +51,7 @@ impl UserInputLeaf {
|
||||
|
||||
pub(crate) fn set_default_field(&mut self, default_field: String) {
|
||||
match self {
|
||||
UserInputLeaf::Literal(ref mut literal) if literal.field_name.is_none() => {
|
||||
UserInputLeaf::Literal(literal) if literal.field_name.is_none() => {
|
||||
literal.field_name = Some(default_field)
|
||||
}
|
||||
UserInputLeaf::All => {
|
||||
@@ -55,12 +59,8 @@ impl UserInputLeaf {
|
||||
field: default_field,
|
||||
}
|
||||
}
|
||||
UserInputLeaf::Range { ref mut field, .. } if field.is_none() => {
|
||||
*field = Some(default_field)
|
||||
}
|
||||
UserInputLeaf::Set { ref mut field, .. } if field.is_none() => {
|
||||
*field = Some(default_field)
|
||||
}
|
||||
UserInputLeaf::Range { field, .. } if field.is_none() => *field = Some(default_field),
|
||||
UserInputLeaf::Set { field, .. } if field.is_none() => *field = Some(default_field),
|
||||
_ => (), // field was already set, do nothing
|
||||
}
|
||||
}
|
||||
@@ -71,11 +71,11 @@ impl Debug for UserInputLeaf {
|
||||
match self {
|
||||
UserInputLeaf::Literal(literal) => literal.fmt(formatter),
|
||||
UserInputLeaf::Range {
|
||||
ref field,
|
||||
ref lower,
|
||||
ref upper,
|
||||
field,
|
||||
lower,
|
||||
upper,
|
||||
} => {
|
||||
if let Some(ref field) = field {
|
||||
if let Some(field) = field {
|
||||
// TODO properly escape field (in case of \")
|
||||
write!(formatter, "\"{field}\":")?;
|
||||
}
|
||||
@@ -85,7 +85,7 @@ impl Debug for UserInputLeaf {
|
||||
Ok(())
|
||||
}
|
||||
UserInputLeaf::Set { field, elements } => {
|
||||
if let Some(ref field) = field {
|
||||
if let Some(field) = field {
|
||||
// TODO properly escape field (in case of \")
|
||||
write!(formatter, "\"{field}\": ")?;
|
||||
}
|
||||
@@ -101,20 +101,22 @@ impl Debug for UserInputLeaf {
|
||||
}
|
||||
UserInputLeaf::All => write!(formatter, "*"),
|
||||
UserInputLeaf::Exists { field } => {
|
||||
write!(formatter, "\"{field}\":*")
|
||||
write!(formatter, "$exists(\"{field}\")")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
|
||||
#[derive(Copy, Clone, Eq, PartialEq, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum Delimiter {
|
||||
SingleQuotes,
|
||||
DoubleQuotes,
|
||||
None,
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Clone)]
|
||||
#[derive(PartialEq, Clone, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub struct UserInputLiteral {
|
||||
pub field_name: Option<String>,
|
||||
pub phrase: String,
|
||||
@@ -152,7 +154,9 @@ impl fmt::Debug for UserInputLiteral {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Debug, Clone)]
|
||||
#[derive(PartialEq, Debug, Clone, Serialize)]
|
||||
#[serde(tag = "type", content = "value")]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum UserInputBound {
|
||||
Inclusive(String),
|
||||
Exclusive(String),
|
||||
@@ -187,11 +191,38 @@ impl UserInputBound {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Clone)]
|
||||
#[derive(PartialEq, Clone, Serialize)]
|
||||
#[serde(into = "UserInputAstSerde")]
|
||||
pub enum UserInputAst {
|
||||
Clause(Vec<(Option<Occur>, UserInputAst)>),
|
||||
Leaf(Box<UserInputLeaf>),
|
||||
Boost(Box<UserInputAst>, f64),
|
||||
Leaf(Box<UserInputLeaf>),
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
#[serde(tag = "type", rename_all = "snake_case")]
|
||||
enum UserInputAstSerde {
|
||||
Bool {
|
||||
clauses: Vec<(Option<Occur>, UserInputAst)>,
|
||||
},
|
||||
Boost {
|
||||
underlying: Box<UserInputAst>,
|
||||
boost: f64,
|
||||
},
|
||||
#[serde(untagged)]
|
||||
Leaf(Box<UserInputLeaf>),
|
||||
}
|
||||
|
||||
impl From<UserInputAst> for UserInputAstSerde {
|
||||
fn from(ast: UserInputAst) -> Self {
|
||||
match ast {
|
||||
UserInputAst::Clause(clause) => UserInputAstSerde::Bool { clauses: clause },
|
||||
UserInputAst::Boost(underlying, boost) => {
|
||||
UserInputAstSerde::Boost { underlying, boost }
|
||||
}
|
||||
UserInputAst::Leaf(leaf) => UserInputAstSerde::Leaf(leaf),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl UserInputAst {
|
||||
@@ -232,7 +263,7 @@ impl UserInputAst {
|
||||
.iter_mut()
|
||||
.for_each(|(_, ast)| ast.set_default_field(field.clone())),
|
||||
UserInputAst::Leaf(leaf) => leaf.set_default_field(field),
|
||||
UserInputAst::Boost(ref mut ast, _) => ast.set_default_field(field),
|
||||
UserInputAst::Boost(ast, _) => ast.set_default_field(field),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -285,3 +316,126 @@ impl fmt::Debug for UserInputAst {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_all_leaf_serialization() {
|
||||
let ast = UserInputAst::Leaf(Box::new(UserInputLeaf::All));
|
||||
let json = serde_json::to_string(&ast).unwrap();
|
||||
assert_eq!(json, r#"{"type":"all"}"#);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_literal_leaf_serialization() {
|
||||
let literal = UserInputLiteral {
|
||||
field_name: Some("title".to_string()),
|
||||
phrase: "hello".to_string(),
|
||||
delimiter: Delimiter::None,
|
||||
slop: 0,
|
||||
prefix: false,
|
||||
};
|
||||
let ast = UserInputAst::Leaf(Box::new(UserInputLeaf::Literal(literal)));
|
||||
let json = serde_json::to_string(&ast).unwrap();
|
||||
assert_eq!(
|
||||
json,
|
||||
r#"{"type":"literal","field_name":"title","phrase":"hello","delimiter":"none","slop":0,"prefix":false}"#
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_range_leaf_serialization() {
|
||||
let range = UserInputLeaf::Range {
|
||||
field: Some("price".to_string()),
|
||||
lower: UserInputBound::Inclusive("10".to_string()),
|
||||
upper: UserInputBound::Exclusive("100".to_string()),
|
||||
};
|
||||
let ast = UserInputAst::Leaf(Box::new(range));
|
||||
let json = serde_json::to_string(&ast).unwrap();
|
||||
assert_eq!(
|
||||
json,
|
||||
r#"{"type":"range","field":"price","lower":{"type":"inclusive","value":"10"},"upper":{"type":"exclusive","value":"100"}}"#
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_range_leaf_unbounded_serialization() {
|
||||
let range = UserInputLeaf::Range {
|
||||
field: Some("price".to_string()),
|
||||
lower: UserInputBound::Inclusive("10".to_string()),
|
||||
upper: UserInputBound::Unbounded,
|
||||
};
|
||||
let ast = UserInputAst::Leaf(Box::new(range));
|
||||
let json = serde_json::to_string(&ast).unwrap();
|
||||
assert_eq!(
|
||||
json,
|
||||
r#"{"type":"range","field":"price","lower":{"type":"inclusive","value":"10"},"upper":{"type":"unbounded"}}"#
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_boost_serialization() {
|
||||
let inner_ast = UserInputAst::Leaf(Box::new(UserInputLeaf::All));
|
||||
let boost_ast = UserInputAst::Boost(Box::new(inner_ast), 2.5);
|
||||
let json = serde_json::to_string(&boost_ast).unwrap();
|
||||
assert_eq!(
|
||||
json,
|
||||
r#"{"type":"boost","underlying":{"type":"all"},"boost":2.5}"#
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_boost_serialization2() {
|
||||
let boost_ast = UserInputAst::Boost(
|
||||
Box::new(UserInputAst::Clause(vec![
|
||||
(
|
||||
Some(Occur::Must),
|
||||
UserInputAst::Leaf(Box::new(UserInputLeaf::All)),
|
||||
),
|
||||
(
|
||||
Some(Occur::Should),
|
||||
UserInputAst::Leaf(Box::new(UserInputLeaf::Literal(UserInputLiteral {
|
||||
field_name: Some("title".to_string()),
|
||||
phrase: "hello".to_string(),
|
||||
delimiter: Delimiter::None,
|
||||
slop: 0,
|
||||
prefix: false,
|
||||
}))),
|
||||
),
|
||||
])),
|
||||
2.5,
|
||||
);
|
||||
let json = serde_json::to_string(&boost_ast).unwrap();
|
||||
assert_eq!(
|
||||
json,
|
||||
r#"{"type":"boost","underlying":{"type":"bool","clauses":[["must",{"type":"all"}],["should",{"type":"literal","field_name":"title","phrase":"hello","delimiter":"none","slop":0,"prefix":false}]]},"boost":2.5}"#
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_clause_serialization() {
|
||||
let clause = UserInputAst::Clause(vec![
|
||||
(
|
||||
Some(Occur::Must),
|
||||
UserInputAst::Leaf(Box::new(UserInputLeaf::All)),
|
||||
),
|
||||
(
|
||||
Some(Occur::Should),
|
||||
UserInputAst::Leaf(Box::new(UserInputLeaf::Literal(UserInputLiteral {
|
||||
field_name: Some("title".to_string()),
|
||||
phrase: "hello".to_string(),
|
||||
delimiter: Delimiter::None,
|
||||
slop: 0,
|
||||
prefix: false,
|
||||
}))),
|
||||
),
|
||||
]);
|
||||
let json = serde_json::to_string(&clause).unwrap();
|
||||
assert_eq!(
|
||||
json,
|
||||
r#"{"type":"bool","clauses":[["must",{"type":"all"}],["should",{"type":"literal","field_name":"title","phrase":"hello","delimiter":"none","slop":0,"prefix":false}]]}"#
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -34,10 +34,10 @@ use crate::aggregation::*;
|
||||
pub struct DateHistogramAggregationReq {
|
||||
#[doc(hidden)]
|
||||
/// Only for validation
|
||||
interval: Option<String>,
|
||||
pub interval: Option<String>,
|
||||
#[doc(hidden)]
|
||||
/// Only for validation
|
||||
calendar_interval: Option<String>,
|
||||
pub calendar_interval: Option<String>,
|
||||
/// The field to aggregate on.
|
||||
pub field: String,
|
||||
/// The format to format dates. Unsupported currently.
|
||||
|
||||
@@ -366,8 +366,12 @@ impl PartialEq for Key {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
match (self, other) {
|
||||
(Self::Str(l), Self::Str(r)) => l == r,
|
||||
(Self::F64(l), Self::F64(r)) => l == r,
|
||||
_ => false,
|
||||
(Self::F64(l), Self::F64(r)) => l.to_bits() == r.to_bits(),
|
||||
(Self::I64(l), Self::I64(r)) => l == r,
|
||||
(Self::U64(l), Self::U64(r)) => l == r,
|
||||
// we list all variant of left operand to make sure this gets updated when we add
|
||||
// variants to the enum
|
||||
(Self::Str(_) | Self::F64(_) | Self::I64(_) | Self::U64(_), _) => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,11 +2,13 @@ use std::fmt;
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Arc;
|
||||
|
||||
use columnar::ColumnValues;
|
||||
use columnar::{ColumnValues, StrColumn};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::Collector;
|
||||
use crate::collector::custom_score_top_collector::CustomScoreTopCollector;
|
||||
use crate::collector::custom_score_top_collector::{
|
||||
CustomScoreTopCollector, CustomScoreTopSegmentCollector,
|
||||
};
|
||||
use crate::collector::top_collector::{ComparableDoc, TopCollector, TopSegmentCollector};
|
||||
use crate::collector::tweak_score_top_collector::TweakedScoreTopCollector;
|
||||
use crate::collector::{
|
||||
@@ -14,6 +16,7 @@ use crate::collector::{
|
||||
};
|
||||
use crate::fastfield::{FastFieldNotAvailableError, FastValue};
|
||||
use crate::query::Weight;
|
||||
use crate::termdict::TermOrdinal;
|
||||
use crate::{DocAddress, DocId, Order, Score, SegmentOrdinal, SegmentReader, TantivyError};
|
||||
|
||||
struct FastFieldConvertCollector<
|
||||
@@ -83,6 +86,163 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
struct StringConvertCollector {
|
||||
pub collector: CustomScoreTopCollector<ScorerByField, u64>,
|
||||
pub field: String,
|
||||
order: Order,
|
||||
limit: usize,
|
||||
offset: usize,
|
||||
}
|
||||
|
||||
impl Collector for StringConvertCollector {
|
||||
type Fruit = Vec<(String, DocAddress)>;
|
||||
|
||||
type Child = StringConvertSegmentCollector;
|
||||
|
||||
fn for_segment(
|
||||
&self,
|
||||
segment_local_id: crate::SegmentOrdinal,
|
||||
segment: &SegmentReader,
|
||||
) -> crate::Result<Self::Child> {
|
||||
let schema = segment.schema();
|
||||
let field = schema.get_field(&self.field)?;
|
||||
let field_entry = schema.get_field_entry(field);
|
||||
if !field_entry.is_fast() {
|
||||
return Err(TantivyError::SchemaError(format!(
|
||||
"Field {:?} is not a fast field.",
|
||||
field_entry.name()
|
||||
)));
|
||||
}
|
||||
let requested_type = crate::schema::Type::Str;
|
||||
let schema_type = field_entry.field_type().value_type();
|
||||
if schema_type != requested_type {
|
||||
return Err(TantivyError::SchemaError(format!(
|
||||
"Field {:?} is of type {schema_type:?}!={requested_type:?}",
|
||||
field_entry.name()
|
||||
)));
|
||||
}
|
||||
let ff = segment
|
||||
.fast_fields()
|
||||
.str(&self.field)?
|
||||
.expect("ff should be a str field");
|
||||
Ok(StringConvertSegmentCollector {
|
||||
collector: self.collector.for_segment(segment_local_id, segment)?,
|
||||
ff,
|
||||
order: self.order.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
fn requires_scoring(&self) -> bool {
|
||||
self.collector.requires_scoring()
|
||||
}
|
||||
|
||||
fn merge_fruits(
|
||||
&self,
|
||||
child_fruits: Vec<<Self::Child as SegmentCollector>::Fruit>,
|
||||
) -> crate::Result<Self::Fruit> {
|
||||
if self.limit == 0 {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
if self.order.is_desc() {
|
||||
let mut top_collector: TopNComputer<_, _, true> =
|
||||
TopNComputer::new(self.limit + self.offset);
|
||||
for child_fruit in child_fruits {
|
||||
for (feature, doc) in child_fruit {
|
||||
top_collector.push(feature, doc);
|
||||
}
|
||||
}
|
||||
Ok(top_collector
|
||||
.into_sorted_vec()
|
||||
.into_iter()
|
||||
.skip(self.offset)
|
||||
.map(|cdoc| (cdoc.feature, cdoc.doc))
|
||||
.collect())
|
||||
} else {
|
||||
let mut top_collector: TopNComputer<_, _, false> =
|
||||
TopNComputer::new(self.limit + self.offset);
|
||||
for child_fruit in child_fruits {
|
||||
for (feature, doc) in child_fruit {
|
||||
top_collector.push(feature, doc);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(top_collector
|
||||
.into_sorted_vec()
|
||||
.into_iter()
|
||||
.skip(self.offset)
|
||||
.map(|cdoc| (cdoc.feature, cdoc.doc))
|
||||
.collect())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct StringConvertSegmentCollector {
|
||||
pub collector: CustomScoreTopSegmentCollector<ScorerByFastFieldReader, u64>,
|
||||
ff: StrColumn,
|
||||
order: Order,
|
||||
}
|
||||
|
||||
impl SegmentCollector for StringConvertSegmentCollector {
|
||||
type Fruit = Vec<(String, DocAddress)>;
|
||||
|
||||
fn collect(&mut self, doc: DocId, score: Score) {
|
||||
self.collector.collect(doc, score);
|
||||
}
|
||||
|
||||
fn harvest(self) -> Vec<(String, DocAddress)> {
|
||||
let top_ordinals: Vec<(TermOrdinal, DocAddress)> = self.collector.harvest();
|
||||
|
||||
// Collect terms.
|
||||
let mut terms: Vec<String> = Vec::with_capacity(top_ordinals.len());
|
||||
let result = if self.order.is_asc() {
|
||||
self.ff.dictionary().sorted_ords_to_term_cb(
|
||||
top_ordinals.iter().map(|(term_ord, _)| u64::MAX - term_ord),
|
||||
|term| {
|
||||
terms.push(
|
||||
std::str::from_utf8(term)
|
||||
.expect("Failed to decode term as unicode")
|
||||
.to_owned(),
|
||||
);
|
||||
Ok(())
|
||||
},
|
||||
)
|
||||
} else {
|
||||
self.ff.dictionary().sorted_ords_to_term_cb(
|
||||
top_ordinals.iter().rev().map(|(term_ord, _)| *term_ord),
|
||||
|term| {
|
||||
terms.push(
|
||||
std::str::from_utf8(term)
|
||||
.expect("Failed to decode term as unicode")
|
||||
.to_owned(),
|
||||
);
|
||||
Ok(())
|
||||
},
|
||||
)
|
||||
};
|
||||
|
||||
assert!(
|
||||
result.expect("Failed to read terms from term dictionary"),
|
||||
"Not all terms were matched in segment."
|
||||
);
|
||||
|
||||
// Zip them back with their docs.
|
||||
if self.order.is_asc() {
|
||||
terms
|
||||
.into_iter()
|
||||
.zip(top_ordinals)
|
||||
.map(|(term, (_, doc))| (term, doc))
|
||||
.collect()
|
||||
} else {
|
||||
terms
|
||||
.into_iter()
|
||||
.rev()
|
||||
.zip(top_ordinals)
|
||||
.map(|(term, (_, doc))| (term, doc))
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The `TopDocs` collector keeps track of the top `K` documents
|
||||
/// sorted by their score.
|
||||
///
|
||||
@@ -410,6 +570,30 @@ impl TopDocs {
|
||||
}
|
||||
}
|
||||
|
||||
/// Like `order_by_fast_field`, but for a `String` fast field.
|
||||
pub fn order_by_string_fast_field(
|
||||
self,
|
||||
fast_field: impl ToString,
|
||||
order: Order,
|
||||
) -> impl Collector<Fruit = Vec<(String, DocAddress)>> {
|
||||
let limit = self.0.limit;
|
||||
let offset = self.0.offset;
|
||||
let u64_collector = CustomScoreTopCollector::new(
|
||||
ScorerByField {
|
||||
field: fast_field.to_string(),
|
||||
order: order.clone(),
|
||||
},
|
||||
self.0.into_tscore(),
|
||||
);
|
||||
StringConvertCollector {
|
||||
collector: u64_collector,
|
||||
field: fast_field.to_string(),
|
||||
order,
|
||||
limit,
|
||||
offset,
|
||||
}
|
||||
}
|
||||
|
||||
/// Ranks the documents using a custom score.
|
||||
///
|
||||
/// This method offers a convenient way to tweak or replace
|
||||
@@ -1214,6 +1398,94 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_top_field_collector_string() -> crate::Result<()> {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let city = schema_builder.add_text_field("city", TEXT | FAST);
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema);
|
||||
let mut index_writer = index.writer_for_tests()?;
|
||||
index_writer.add_document(doc!(
|
||||
city => "austin",
|
||||
))?;
|
||||
index_writer.add_document(doc!(
|
||||
city => "greenville",
|
||||
))?;
|
||||
index_writer.add_document(doc!(
|
||||
city => "tokyo",
|
||||
))?;
|
||||
index_writer.commit()?;
|
||||
|
||||
fn query(
|
||||
index: &Index,
|
||||
order: Order,
|
||||
limit: usize,
|
||||
offset: usize,
|
||||
) -> crate::Result<Vec<(String, DocAddress)>> {
|
||||
let searcher = index.reader()?.searcher();
|
||||
let top_collector = TopDocs::with_limit(limit)
|
||||
.and_offset(offset)
|
||||
.order_by_string_fast_field("city", order);
|
||||
searcher.search(&AllQuery, &top_collector)
|
||||
}
|
||||
|
||||
assert_eq!(
|
||||
&query(&index, Order::Desc, 3, 0)?,
|
||||
&[
|
||||
("tokyo".to_owned(), DocAddress::new(0, 2)),
|
||||
("greenville".to_owned(), DocAddress::new(0, 1)),
|
||||
("austin".to_owned(), DocAddress::new(0, 0)),
|
||||
]
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
&query(&index, Order::Desc, 2, 0)?,
|
||||
&[
|
||||
("tokyo".to_owned(), DocAddress::new(0, 2)),
|
||||
("greenville".to_owned(), DocAddress::new(0, 1)),
|
||||
]
|
||||
);
|
||||
|
||||
assert_eq!(&query(&index, Order::Desc, 3, 3)?, &[]);
|
||||
|
||||
assert_eq!(
|
||||
&query(&index, Order::Desc, 2, 1)?,
|
||||
&[
|
||||
("greenville".to_owned(), DocAddress::new(0, 1)),
|
||||
("austin".to_owned(), DocAddress::new(0, 0)),
|
||||
]
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
&query(&index, Order::Asc, 3, 0)?,
|
||||
&[
|
||||
("austin".to_owned(), DocAddress::new(0, 0)),
|
||||
("greenville".to_owned(), DocAddress::new(0, 1)),
|
||||
("tokyo".to_owned(), DocAddress::new(0, 2)),
|
||||
]
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
&query(&index, Order::Asc, 2, 1)?,
|
||||
&[
|
||||
("greenville".to_owned(), DocAddress::new(0, 1)),
|
||||
("tokyo".to_owned(), DocAddress::new(0, 2)),
|
||||
]
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
&query(&index, Order::Asc, 2, 0)?,
|
||||
&[
|
||||
("austin".to_owned(), DocAddress::new(0, 0)),
|
||||
("greenville".to_owned(), DocAddress::new(0, 1)),
|
||||
]
|
||||
);
|
||||
|
||||
assert_eq!(&query(&index, Order::Asc, 3, 3)?, &[]);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn test_field_does_not_exist() {
|
||||
|
||||
@@ -41,16 +41,12 @@ impl Executor {
|
||||
///
|
||||
/// Regardless of the executor (`SingleThread` or `ThreadPool`), panics in the task
|
||||
/// will propagate to the caller.
|
||||
pub fn map<
|
||||
pub fn map<A, R, F>(&self, f: F, args: impl Iterator<Item = A>) -> crate::Result<Vec<R>>
|
||||
where
|
||||
A: Send,
|
||||
R: Send,
|
||||
AIterator: Iterator<Item = A>,
|
||||
F: Sized + Sync + Fn(A) -> crate::Result<R>,
|
||||
>(
|
||||
&self,
|
||||
f: F,
|
||||
args: AIterator,
|
||||
) -> crate::Result<Vec<R>> {
|
||||
{
|
||||
match self {
|
||||
Executor::SingleThread => args.map(f).collect::<crate::Result<_>>(),
|
||||
Executor::ThreadPool(pool) => {
|
||||
|
||||
@@ -214,7 +214,7 @@ impl Searcher {
|
||||
/// It is powerless at making search faster if your index consists in
|
||||
/// one large segment.
|
||||
///
|
||||
/// Also, keep in my multithreading a single query on several
|
||||
/// Also, keep in mind multithreading a single query on several
|
||||
/// threads will not improve your throughput. It can actually
|
||||
/// hurt it. It will however, decrease the average response time.
|
||||
pub fn search_with_executor<C: Collector>(
|
||||
|
||||
@@ -1,3 +1,9 @@
|
||||
//! The footer is a small metadata structure that is appended at the end of every file.
|
||||
//!
|
||||
//! The footer is used to store a checksum of the file content.
|
||||
//! The footer also stores the version of the index format.
|
||||
//! This version is used to detect incompatibility between the index and the library version.
|
||||
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
|
||||
@@ -20,20 +26,22 @@ type CrcHashU32 = u32;
|
||||
/// A Footer is appended to every file
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
pub struct Footer {
|
||||
/// The version of the index format
|
||||
pub version: Version,
|
||||
/// The crc32 hash of the body
|
||||
pub crc: CrcHashU32,
|
||||
}
|
||||
|
||||
impl Footer {
|
||||
pub fn new(crc: CrcHashU32) -> Self {
|
||||
pub(crate) fn new(crc: CrcHashU32) -> Self {
|
||||
let version = crate::VERSION.clone();
|
||||
Footer { version, crc }
|
||||
}
|
||||
|
||||
pub fn crc(&self) -> CrcHashU32 {
|
||||
pub(crate) fn crc(&self) -> CrcHashU32 {
|
||||
self.crc
|
||||
}
|
||||
pub fn append_footer<W: io::Write>(&self, mut write: &mut W) -> io::Result<()> {
|
||||
pub(crate) fn append_footer<W: io::Write>(&self, mut write: &mut W) -> io::Result<()> {
|
||||
let mut counting_write = CountingWriter::wrap(&mut write);
|
||||
counting_write.write_all(serde_json::to_string(&self)?.as_ref())?;
|
||||
let footer_payload_len = counting_write.written_bytes();
|
||||
@@ -42,6 +50,7 @@ impl Footer {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Extracts the tantivy Footer from the file and returns the footer and the rest of the file
|
||||
pub fn extract_footer(file: FileSlice) -> io::Result<(Footer, FileSlice)> {
|
||||
if file.len() < 4 {
|
||||
return Err(io::Error::new(
|
||||
|
||||
@@ -6,7 +6,7 @@ mod mmap_directory;
|
||||
mod directory;
|
||||
mod directory_lock;
|
||||
mod file_watcher;
|
||||
mod footer;
|
||||
pub mod footer;
|
||||
mod managed_directory;
|
||||
mod ram_directory;
|
||||
mod watch_event_router;
|
||||
|
||||
@@ -942,7 +942,7 @@ mod tests {
|
||||
|
||||
let numbers = [100, 200, 300];
|
||||
let test_range = |range: RangeInclusive<u64>| {
|
||||
let expected_count = numbers.iter().filter(|num| range.contains(num)).count();
|
||||
let expected_count = numbers.iter().filter(|num| range.contains(*num)).count();
|
||||
let mut vec = vec![];
|
||||
field.get_row_ids_for_value_range(range, 0..u32::MAX, &mut vec);
|
||||
assert_eq!(vec.len(), expected_count);
|
||||
@@ -1020,7 +1020,7 @@ mod tests {
|
||||
|
||||
let numbers = [1000, 1001, 1003];
|
||||
let test_range = |range: RangeInclusive<u64>| {
|
||||
let expected_count = numbers.iter().filter(|num| range.contains(num)).count();
|
||||
let expected_count = numbers.iter().filter(|num| range.contains(*num)).count();
|
||||
let mut vec = vec![];
|
||||
field.get_row_ids_for_value_range(range, 0..u32::MAX, &mut vec);
|
||||
assert_eq!(vec.len(), expected_count);
|
||||
|
||||
@@ -2553,7 +2553,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_writer_options_validation() {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let field = schema_builder.add_bool_field("example", STORED);
|
||||
let _field = schema_builder.add_bool_field("example", STORED);
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
|
||||
let opt_wo_threads = IndexWriterOptions::builder().num_worker_threads(0).build();
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use std::any::Any;
|
||||
use std::borrow::BorrowMut;
|
||||
use std::collections::HashSet;
|
||||
use std::io::Write;
|
||||
@@ -23,7 +24,9 @@ use crate::indexer::{
|
||||
DefaultMergePolicy, MergeCandidate, MergeOperation, MergePolicy, SegmentEntry,
|
||||
SegmentSerializer,
|
||||
};
|
||||
use crate::{FutureResult, Opstamp};
|
||||
use crate::{FutureResult, Opstamp, TantivyError};
|
||||
|
||||
const PANIC_CAUGHT: &str = "Panic caught in merge thread";
|
||||
|
||||
/// Save the index meta file.
|
||||
/// This operation is atomic:
|
||||
@@ -287,6 +290,15 @@ impl SegmentUpdater {
|
||||
let merge_thread_pool = ThreadPoolBuilder::new()
|
||||
.thread_name(|i| format!("merge_thread_{i}"))
|
||||
.num_threads(num_merge_threads)
|
||||
.panic_handler(move |panic| {
|
||||
// We don't print the panic content itself,
|
||||
// it is already printed during the unwinding
|
||||
if let Some(message) = panic.downcast_ref::<&str>() {
|
||||
if *message != PANIC_CAUGHT {
|
||||
error!("uncaught merge panic")
|
||||
}
|
||||
}
|
||||
})
|
||||
.build()
|
||||
.map_err(|_| {
|
||||
crate::TantivyError::SystemError(
|
||||
@@ -506,11 +518,34 @@ impl SegmentUpdater {
|
||||
// Its lifetime is used to track how many merging thread are currently running,
|
||||
// as well as which segment is currently in merge and therefore should not be
|
||||
// candidate for another merge.
|
||||
match merge(
|
||||
&segment_updater.index,
|
||||
segment_entries,
|
||||
merge_operation.target_opstamp(),
|
||||
) {
|
||||
let merge_panic_res = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
|
||||
merge(
|
||||
&segment_updater.index,
|
||||
segment_entries,
|
||||
merge_operation.target_opstamp(),
|
||||
)
|
||||
}));
|
||||
let merge_res = match merge_panic_res {
|
||||
Ok(merge_res) => merge_res,
|
||||
Err(panic_err) => {
|
||||
let panic_str = if let Some(msg) = panic_err.downcast_ref::<&str>() {
|
||||
*msg
|
||||
} else if let Some(msg) = panic_err.downcast_ref::<String>() {
|
||||
msg.as_str()
|
||||
} else {
|
||||
"UNKNOWN"
|
||||
};
|
||||
let _send_result = merging_future_send.send(Err(TantivyError::SystemError(
|
||||
format!("Merge thread panicked: {panic_str}"),
|
||||
)));
|
||||
// Resume unwinding because we forced unwind safety with
|
||||
// `std::panic::AssertUnwindSafe` Use a specific message so
|
||||
// the panic_handler can double check that we properly caught the panic.
|
||||
let boxed_panic_message: Box<dyn Any + Send> = Box::new(PANIC_CAUGHT);
|
||||
std::panic::resume_unwind(boxed_panic_message);
|
||||
}
|
||||
};
|
||||
match merge_res {
|
||||
Ok(after_merge_segment_entry) => {
|
||||
let res = segment_updater.end_merge(merge_operation, after_merge_segment_entry);
|
||||
let _send_result = merging_future_send.send(res);
|
||||
|
||||
@@ -1,79 +1,19 @@
|
||||
use std::ops::Range;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::Opstamp;
|
||||
|
||||
#[cfg(not(target_arch = "arm"))]
|
||||
mod atomic_impl {
|
||||
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
|
||||
use crate::Opstamp;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct AtomicU64Wrapper(AtomicU64);
|
||||
|
||||
impl AtomicU64Wrapper {
|
||||
pub fn new(first_opstamp: Opstamp) -> AtomicU64Wrapper {
|
||||
AtomicU64Wrapper(AtomicU64::new(first_opstamp))
|
||||
}
|
||||
|
||||
pub fn fetch_add(&self, val: u64, order: Ordering) -> u64 {
|
||||
self.0.fetch_add(val, order)
|
||||
}
|
||||
|
||||
pub fn revert(&self, val: u64, order: Ordering) -> u64 {
|
||||
self.0.store(val, order);
|
||||
val
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(target_arch = "arm")]
|
||||
mod atomic_impl {
|
||||
|
||||
/// Under other architecture, we rely on a mutex.
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::RwLock;
|
||||
|
||||
use crate::Opstamp;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct AtomicU64Wrapper(RwLock<u64>);
|
||||
|
||||
impl AtomicU64Wrapper {
|
||||
pub fn new(first_opstamp: Opstamp) -> AtomicU64Wrapper {
|
||||
AtomicU64Wrapper(RwLock::new(first_opstamp))
|
||||
}
|
||||
|
||||
pub fn fetch_add(&self, incr: u64, _order: Ordering) -> u64 {
|
||||
let mut lock = self.0.write().unwrap();
|
||||
let previous_val = *lock;
|
||||
*lock = previous_val + incr;
|
||||
previous_val
|
||||
}
|
||||
|
||||
pub fn revert(&self, val: u64, _order: Ordering) -> u64 {
|
||||
let mut lock = self.0.write().unwrap();
|
||||
*lock = val;
|
||||
val
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
use self::atomic_impl::AtomicU64Wrapper;
|
||||
|
||||
/// Stamper provides Opstamps, which is just an auto-increment id to label
|
||||
/// an operation.
|
||||
///
|
||||
/// Cloning does not "fork" the stamp generation. The stamper actually wraps an `Arc`.
|
||||
#[derive(Clone, Default)]
|
||||
pub struct Stamper(Arc<AtomicU64Wrapper>);
|
||||
pub struct Stamper(Arc<AtomicU64>);
|
||||
|
||||
impl Stamper {
|
||||
pub fn new(first_opstamp: Opstamp) -> Stamper {
|
||||
Stamper(Arc::new(AtomicU64Wrapper::new(first_opstamp)))
|
||||
Stamper(Arc::new(AtomicU64::new(first_opstamp)))
|
||||
}
|
||||
|
||||
pub fn stamp(&self) -> Opstamp {
|
||||
@@ -92,7 +32,8 @@ impl Stamper {
|
||||
|
||||
/// Reverts the stamper to a given `Opstamp` value and returns it
|
||||
pub fn revert(&self, to_opstamp: Opstamp) -> Opstamp {
|
||||
self.0.revert(to_opstamp, Ordering::SeqCst)
|
||||
self.0.store(to_opstamp, Ordering::SeqCst);
|
||||
to_opstamp
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@ fn encode_bitwidth(bitwidth: u8, delta_1: bool) -> u8 {
|
||||
}
|
||||
|
||||
fn decode_bitwidth(raw_bitwidth: u8) -> (u8, bool) {
|
||||
let delta_1 = (raw_bitwidth >> 6 & 1) != 0;
|
||||
let delta_1 = ((raw_bitwidth >> 6) & 1) != 0;
|
||||
let bitwidth = raw_bitwidth & 0x3f;
|
||||
(bitwidth, delta_1)
|
||||
}
|
||||
|
||||
@@ -15,7 +15,7 @@ use crate::schema::field_type::ValueParsingError;
|
||||
use crate::schema::{Facet, Field, NamedFieldDocument, OwnedValue, Schema};
|
||||
use crate::tokenizer::PreTokenizedString;
|
||||
|
||||
#[repr(packed)]
|
||||
#[repr(C, packed)]
|
||||
#[derive(Debug, Clone)]
|
||||
/// A field value pair in the compact tantivy document
|
||||
struct FieldValueAddr {
|
||||
@@ -480,7 +480,7 @@ impl<'a> CompactDocValue<'a> {
|
||||
type Addr = u32;
|
||||
|
||||
#[derive(Clone, Copy, Default)]
|
||||
#[repr(packed)]
|
||||
#[repr(C, packed)]
|
||||
/// The value type and the address to its payload in the container.
|
||||
struct ValueAddr {
|
||||
type_id: ValueType,
|
||||
@@ -734,7 +734,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_json_value() {
|
||||
let json_str = r#"{
|
||||
let json_str = r#"{
|
||||
"toto": "titi",
|
||||
"float": -0.2,
|
||||
"bool": true,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
[package]
|
||||
name = "tantivy-sstable"
|
||||
version = "0.3.0"
|
||||
edition = "2021"
|
||||
version = "0.5.0"
|
||||
edition = "2024"
|
||||
license = "MIT"
|
||||
homepage = "https://github.com/quickwit-oss/tantivy"
|
||||
repository = "https://github.com/quickwit-oss/tantivy"
|
||||
@@ -10,13 +10,16 @@ categories = ["database-implementations", "data-structures", "compression"]
|
||||
description = "sstables for tantivy"
|
||||
|
||||
[dependencies]
|
||||
common = {version= "0.7", path="../common", package="tantivy-common"}
|
||||
common = {version= "0.9", path="../common", package="tantivy-common"}
|
||||
futures-util = "0.3.30"
|
||||
itertools = "0.14.0"
|
||||
tantivy-bitpacker = { version= "0.6", path="../bitpacker" }
|
||||
tantivy-bitpacker = { version= "0.8", path="../bitpacker" }
|
||||
tantivy-fst = "0.5"
|
||||
# experimental gives us access to Decompressor::upper_bound
|
||||
zstd = { version = "0.13", features = ["experimental"] }
|
||||
zstd = { version = "0.13", optional = true, features = ["experimental"] }
|
||||
|
||||
[features]
|
||||
zstd-compression = ["zstd"]
|
||||
|
||||
[dev-dependencies]
|
||||
proptest = "1"
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use common::file_slice::FileSlice;
|
||||
use common::OwnedBytes;
|
||||
use criterion::{criterion_group, criterion_main, Criterion};
|
||||
use common::file_slice::FileSlice;
|
||||
use criterion::{Criterion, criterion_group, criterion_main};
|
||||
use tantivy_sstable::{Dictionary, MonotonicU64SSTable};
|
||||
|
||||
fn make_test_sstable(suffix: &str) -> FileSlice {
|
||||
|
||||
@@ -2,7 +2,7 @@ use std::collections::BTreeSet;
|
||||
use std::io;
|
||||
|
||||
use common::file_slice::FileSlice;
|
||||
use criterion::{criterion_group, criterion_main, Criterion};
|
||||
use criterion::{Criterion, criterion_group, criterion_main};
|
||||
use rand::rngs::StdRng;
|
||||
use rand::{Rng, SeedableRng};
|
||||
use tantivy_sstable::{Dictionary, MonotonicU64SSTable};
|
||||
|
||||
@@ -2,6 +2,7 @@ use std::io::{self, Read};
|
||||
use std::ops::Range;
|
||||
|
||||
use common::OwnedBytes;
|
||||
#[cfg(feature = "zstd-compression")]
|
||||
use zstd::bulk::Decompressor;
|
||||
|
||||
pub struct BlockReader {
|
||||
@@ -51,18 +52,21 @@ impl BlockReader {
|
||||
let block_len = match self.reader.len() {
|
||||
0 => {
|
||||
// we are out of data for this block. Check if we have another block after
|
||||
if let Some(new_reader) = self.next_readers.next() {
|
||||
self.reader = new_reader;
|
||||
continue;
|
||||
} else {
|
||||
return Ok(false);
|
||||
match self.next_readers.next() {
|
||||
Some(new_reader) => {
|
||||
self.reader = new_reader;
|
||||
continue;
|
||||
}
|
||||
_ => {
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
1..=3 => {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::UnexpectedEof,
|
||||
"failed to read block_len",
|
||||
))
|
||||
));
|
||||
}
|
||||
_ => self.reader.read_u32() as usize,
|
||||
};
|
||||
@@ -79,13 +83,23 @@ impl BlockReader {
|
||||
));
|
||||
}
|
||||
if compress == 1 {
|
||||
let required_capacity =
|
||||
Decompressor::upper_bound(&self.reader[..block_len]).unwrap_or(1024 * 1024);
|
||||
self.buffer.reserve(required_capacity);
|
||||
Decompressor::new()?
|
||||
.decompress_to_buffer(&self.reader[..block_len], &mut self.buffer)?;
|
||||
#[cfg(feature = "zstd-compression")]
|
||||
{
|
||||
let required_capacity =
|
||||
Decompressor::upper_bound(&self.reader[..block_len]).unwrap_or(1024 * 1024);
|
||||
self.buffer.reserve(required_capacity);
|
||||
Decompressor::new()?
|
||||
.decompress_to_buffer(&self.reader[..block_len], &mut self.buffer)?;
|
||||
|
||||
self.reader.advance(block_len);
|
||||
self.reader.advance(block_len);
|
||||
}
|
||||
|
||||
if cfg!(not(feature = "zstd-compression")) {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::Unsupported,
|
||||
"zstd-compression feature is not enabled",
|
||||
));
|
||||
}
|
||||
} else {
|
||||
self.buffer.resize(block_len, 0u8);
|
||||
self.reader.read_exact(&mut self.buffer[..])?;
|
||||
|
||||
@@ -2,10 +2,11 @@ use std::io::{self, BufWriter, Write};
|
||||
use std::ops::Range;
|
||||
|
||||
use common::{CountingWriter, OwnedBytes};
|
||||
#[cfg(feature = "zstd-compression")]
|
||||
use zstd::bulk::Compressor;
|
||||
|
||||
use super::value::ValueWriter;
|
||||
use super::{value, vint, BlockReader};
|
||||
use super::{BlockReader, value, vint};
|
||||
|
||||
const FOUR_BIT_LIMITS: usize = 1 << 4;
|
||||
const VINT_MODE: u8 = 1u8;
|
||||
@@ -53,25 +54,28 @@ where
|
||||
|
||||
let block_len = buffer.len() + self.block.len();
|
||||
|
||||
if block_len > 2048 {
|
||||
buffer.extend_from_slice(&self.block);
|
||||
self.block.clear();
|
||||
if cfg!(feature = "zstd-compression") && block_len > 2048 {
|
||||
#[cfg(feature = "zstd-compression")]
|
||||
{
|
||||
buffer.extend_from_slice(&self.block);
|
||||
self.block.clear();
|
||||
|
||||
let max_len = zstd::zstd_safe::compress_bound(buffer.len());
|
||||
self.block.reserve(max_len);
|
||||
Compressor::new(3)?.compress_to_buffer(buffer, &mut self.block)?;
|
||||
let max_len = zstd::zstd_safe::compress_bound(buffer.len());
|
||||
self.block.reserve(max_len);
|
||||
Compressor::new(3)?.compress_to_buffer(buffer, &mut self.block)?;
|
||||
|
||||
// verify compression had a positive impact
|
||||
if self.block.len() < buffer.len() {
|
||||
self.write
|
||||
.write_all(&(self.block.len() as u32 + 1).to_le_bytes())?;
|
||||
self.write.write_all(&[1])?;
|
||||
self.write.write_all(&self.block[..])?;
|
||||
} else {
|
||||
self.write
|
||||
.write_all(&(block_len as u32 + 1).to_le_bytes())?;
|
||||
self.write.write_all(&[0])?;
|
||||
self.write.write_all(&buffer[..])?;
|
||||
// verify compression had a positive impact
|
||||
if self.block.len() < buffer.len() {
|
||||
self.write
|
||||
.write_all(&(self.block.len() as u32 + 1).to_le_bytes())?;
|
||||
self.write.write_all(&[1])?;
|
||||
self.write.write_all(&self.block[..])?;
|
||||
} else {
|
||||
self.write
|
||||
.write_all(&(block_len as u32 + 1).to_le_bytes())?;
|
||||
self.write.write_all(&[0])?;
|
||||
self.write.write_all(&buffer[..])?;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
self.write
|
||||
@@ -89,7 +93,7 @@ where
|
||||
|
||||
fn encode_keep_add(&mut self, keep_len: usize, add_len: usize) {
|
||||
if keep_len < FOUR_BIT_LIMITS && add_len < FOUR_BIT_LIMITS {
|
||||
let b = (keep_len | add_len << 4) as u8;
|
||||
let b = (keep_len | (add_len << 4)) as u8;
|
||||
self.block.extend_from_slice(&[b])
|
||||
} else {
|
||||
let mut buf = [VINT_MODE; 20];
|
||||
|
||||
@@ -1,16 +1,18 @@
|
||||
#![allow(clippy::needless_borrows_for_generic_args)]
|
||||
|
||||
use std::cmp::Ordering;
|
||||
use std::io;
|
||||
use std::marker::PhantomData;
|
||||
use std::ops::{Bound, RangeBounds};
|
||||
use std::sync::Arc;
|
||||
|
||||
use common::bounds::{transform_bound_inner_res, TransformBound};
|
||||
use common::bounds::{TransformBound, transform_bound_inner_res};
|
||||
use common::file_slice::FileSlice;
|
||||
use common::{BinarySerializable, OwnedBytes};
|
||||
use futures_util::{stream, StreamExt, TryStreamExt};
|
||||
use futures_util::{StreamExt, TryStreamExt, stream};
|
||||
use itertools::Itertools;
|
||||
use tantivy_fst::automaton::AlwaysMatch;
|
||||
use tantivy_fst::Automaton;
|
||||
use tantivy_fst::automaton::AlwaysMatch;
|
||||
|
||||
use crate::sstable_index_v3::SSTableIndexV3Empty;
|
||||
use crate::streamer::{Streamer, StreamerBuilder};
|
||||
@@ -309,7 +311,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
format!("Unsupported sstable version, expected one of [2, 3], found {version}"),
|
||||
))
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -485,7 +487,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
/// the buffer may be modified.
|
||||
pub fn ord_to_term(&self, ord: TermOrdinal, bytes: &mut Vec<u8>) -> io::Result<bool> {
|
||||
// find block in which the term would be
|
||||
let block_addr = self.sstable_index.get_block_with_ord(ord);
|
||||
let block_addr = self.sstable_index.get_block_with_ord::<false>(ord).0;
|
||||
let first_ordinal = block_addr.first_ordinal;
|
||||
|
||||
// then search inside that block only
|
||||
@@ -509,16 +511,17 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
mut cb: F,
|
||||
) -> io::Result<bool> {
|
||||
let mut bytes = Vec::new();
|
||||
let mut current_block_addr = self.sstable_index.get_block_with_ord(0);
|
||||
let (mut current_block_addr, mut next_block_ord) =
|
||||
self.sstable_index.get_block_with_ord::<true>(0);
|
||||
let mut current_sstable_delta_reader =
|
||||
self.sstable_delta_reader_block(current_block_addr.clone())?;
|
||||
let mut current_ordinal = 0;
|
||||
for ord in ord {
|
||||
assert!(ord >= current_ordinal);
|
||||
// check if block changed for new term_ord
|
||||
let new_block_addr = self.sstable_index.get_block_with_ord(ord);
|
||||
if new_block_addr != current_block_addr {
|
||||
current_block_addr = new_block_addr;
|
||||
if ord >= next_block_ord {
|
||||
(current_block_addr, next_block_ord) =
|
||||
self.sstable_index.get_block_with_ord::<true>(ord);
|
||||
current_ordinal = current_block_addr.first_ordinal;
|
||||
current_sstable_delta_reader =
|
||||
self.sstable_delta_reader_block(current_block_addr.clone())?;
|
||||
@@ -542,7 +545,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
/// Returns the number of terms in the dictionary.
|
||||
pub fn term_info_from_ord(&self, term_ord: TermOrdinal) -> io::Result<Option<TSSTable::Value>> {
|
||||
// find block in which the term would be
|
||||
let block_addr = self.sstable_index.get_block_with_ord(term_ord);
|
||||
let block_addr = self.sstable_index.get_block_with_ord::<false>(term_ord).0;
|
||||
let first_ordinal = block_addr.first_ordinal;
|
||||
|
||||
// then search inside that block only
|
||||
@@ -642,8 +645,8 @@ mod tests {
|
||||
use common::OwnedBytes;
|
||||
|
||||
use super::Dictionary;
|
||||
use crate::dictionary::TermOrdHit;
|
||||
use crate::MonotonicU64SSTable;
|
||||
use crate::dictionary::TermOrdHit;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PermissionedHandle {
|
||||
@@ -844,7 +847,7 @@ mod tests {
|
||||
fn test_ord_term_conversion() {
|
||||
let (dic, slice) = make_test_sstable();
|
||||
|
||||
let block = dic.sstable_index.get_block_with_ord(100_000);
|
||||
let block = dic.sstable_index.get_block_with_ord::<false>(100_000).0;
|
||||
slice.restrict(block.byte_range);
|
||||
|
||||
let mut res = Vec::new();
|
||||
@@ -870,7 +873,11 @@ mod tests {
|
||||
|
||||
// end of a block
|
||||
let ordinal = block.first_ordinal - 1;
|
||||
let new_range = dic.sstable_index.get_block_with_ord(ordinal).byte_range;
|
||||
let new_range = dic
|
||||
.sstable_index
|
||||
.get_block_with_ord::<false>(ordinal)
|
||||
.0
|
||||
.byte_range;
|
||||
slice.restrict(new_range);
|
||||
assert!(dic.ord_to_term(ordinal, &mut res).unwrap());
|
||||
assert_eq!(res, format!("{ordinal:05X}").into_bytes());
|
||||
@@ -880,7 +887,7 @@ mod tests {
|
||||
|
||||
// before first block
|
||||
// 1st block must be loaded for key-related operations
|
||||
let block = dic.sstable_index.get_block_with_ord(0);
|
||||
let block = dic.sstable_index.get_block_with_ord::<false>(0).0;
|
||||
slice.restrict(block.byte_range);
|
||||
|
||||
assert!(dic.get(b"$$$").unwrap().is_none());
|
||||
@@ -889,7 +896,11 @@ mod tests {
|
||||
// after last block
|
||||
// last block must be loaded for ord related operations
|
||||
let ordinal = 0x40000 + 10;
|
||||
let new_range = dic.sstable_index.get_block_with_ord(ordinal).byte_range;
|
||||
let new_range = dic
|
||||
.sstable_index
|
||||
.get_block_with_ord::<false>(ordinal)
|
||||
.0
|
||||
.byte_range;
|
||||
slice.restrict(new_range);
|
||||
assert!(!dic.ord_to_term(ordinal, &mut res).unwrap());
|
||||
assert!(dic.term_info_from_ord(ordinal).unwrap().is_none());
|
||||
@@ -912,30 +923,33 @@ mod tests {
|
||||
|
||||
// Single term
|
||||
let mut terms = Vec::new();
|
||||
assert!(dic
|
||||
.sorted_ords_to_term_cb(100_000..100_001, |term| {
|
||||
assert!(
|
||||
dic.sorted_ords_to_term_cb(100_000..100_001, |term| {
|
||||
terms.push(term.to_vec());
|
||||
Ok(())
|
||||
})
|
||||
.unwrap());
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(terms, vec![format!("{:05X}", 100_000).into_bytes(),]);
|
||||
// Single term
|
||||
let mut terms = Vec::new();
|
||||
assert!(dic
|
||||
.sorted_ords_to_term_cb(100_001..100_002, |term| {
|
||||
assert!(
|
||||
dic.sorted_ords_to_term_cb(100_001..100_002, |term| {
|
||||
terms.push(term.to_vec());
|
||||
Ok(())
|
||||
})
|
||||
.unwrap());
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(terms, vec![format!("{:05X}", 100_001).into_bytes(),]);
|
||||
// both terms
|
||||
let mut terms = Vec::new();
|
||||
assert!(dic
|
||||
.sorted_ords_to_term_cb(100_000..100_002, |term| {
|
||||
assert!(
|
||||
dic.sorted_ords_to_term_cb(100_000..100_002, |term| {
|
||||
terms.push(term.to_vec());
|
||||
Ok(())
|
||||
})
|
||||
.unwrap());
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
terms,
|
||||
vec![
|
||||
@@ -945,12 +959,13 @@ mod tests {
|
||||
);
|
||||
// Test cross block
|
||||
let mut terms = Vec::new();
|
||||
assert!(dic
|
||||
.sorted_ords_to_term_cb(98653..=98655, |term| {
|
||||
assert!(
|
||||
dic.sorted_ords_to_term_cb(98653..=98655, |term| {
|
||||
terms.push(term.to_vec());
|
||||
Ok(())
|
||||
})
|
||||
.unwrap());
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
terms,
|
||||
vec![
|
||||
|
||||
@@ -1,3 +1,40 @@
|
||||
//! `tantivy_sstable` is a crate that provides a sorted string table data structure.
|
||||
//!
|
||||
//! It is used in `tantivy` to store the term dictionary.
|
||||
//!
|
||||
//! A `sstable` is a map of sorted `&[u8]` keys to values.
|
||||
//! The keys are encoded using incremental encoding.
|
||||
//!
|
||||
//! Values and keys are compressed using zstd with the default feature flag `zstd-compression`.
|
||||
//!
|
||||
//! # Example
|
||||
//!
|
||||
//! Here is an example of how to create and search an `sstable`:
|
||||
//!
|
||||
//! ```rust
|
||||
//! use common::OwnedBytes;
|
||||
//! use tantivy_sstable::{Dictionary, MonotonicU64SSTable};
|
||||
//!
|
||||
//! // Create a new sstable in memory.
|
||||
//! let mut builder = Dictionary::<MonotonicU64SSTable>::builder(Vec::new()).unwrap();
|
||||
//! builder.insert(b"apple", &1).unwrap();
|
||||
//! builder.insert(b"banana", &2).unwrap();
|
||||
//! builder.insert(b"orange", &3).unwrap();
|
||||
//! let sstable_bytes = builder.finish().unwrap();
|
||||
//!
|
||||
//! // Open the sstable.
|
||||
//! let sstable =
|
||||
//! Dictionary::<MonotonicU64SSTable>::from_bytes(OwnedBytes::new(sstable_bytes)).unwrap();
|
||||
//!
|
||||
//! // Search for a key.
|
||||
//! let value = sstable.get(b"banana").unwrap();
|
||||
//! assert_eq!(value, Some(2));
|
||||
//!
|
||||
//! // Search for a non-existent key.
|
||||
//! let value = sstable.get(b"grape").unwrap();
|
||||
//! assert_eq!(value, None);
|
||||
//! ```
|
||||
|
||||
use std::io::{self, Write};
|
||||
use std::ops::Range;
|
||||
|
||||
@@ -19,6 +56,7 @@ pub use streamer::{Streamer, StreamerBuilder};
|
||||
|
||||
mod block_reader;
|
||||
use common::{BinarySerializable, OwnedBytes};
|
||||
use value::{VecU32ValueReader, VecU32ValueWriter};
|
||||
|
||||
pub use self::block_reader::BlockReader;
|
||||
pub use self::delta::{DeltaReader, DeltaWriter};
|
||||
@@ -130,6 +168,15 @@ impl SSTable for RangeSSTable {
|
||||
type ValueWriter = RangeValueWriter;
|
||||
}
|
||||
|
||||
/// SSTable associating keys to Vec<u32>.
|
||||
pub struct VecU32ValueSSTable;
|
||||
|
||||
impl SSTable for VecU32ValueSSTable {
|
||||
type Value = Vec<u32>;
|
||||
type ValueReader = VecU32ValueReader;
|
||||
type ValueWriter = VecU32ValueWriter;
|
||||
}
|
||||
|
||||
/// SSTable reader.
|
||||
pub struct Reader<TValueReader> {
|
||||
key: Vec<u8>,
|
||||
@@ -322,7 +369,7 @@ mod test {
|
||||
|
||||
use common::OwnedBytes;
|
||||
|
||||
use super::{common_prefix_len, MonotonicU64SSTable, SSTable, VoidMerge, VoidSSTable};
|
||||
use super::{MonotonicU64SSTable, SSTable, VoidMerge, VoidSSTable, common_prefix_len};
|
||||
|
||||
fn aux_test_common_prefix_len(left: &str, right: &str, expect_len: usize) {
|
||||
assert_eq!(
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::binary_heap::PeekMut;
|
||||
use std::collections::BinaryHeap;
|
||||
use std::collections::binary_heap::PeekMut;
|
||||
use std::io;
|
||||
|
||||
use super::{SingleValueMerger, ValueMerger};
|
||||
@@ -41,14 +41,17 @@ pub fn merge_sstable<SST: SSTable, W: io::Write, M: ValueMerger<SST::Value>>(
|
||||
loop {
|
||||
let len = heap.len();
|
||||
let mut value_merger;
|
||||
if let Some(mut head) = heap.peek_mut() {
|
||||
writer.insert_key(head.0.key()).unwrap();
|
||||
value_merger = merger.new_value(head.0.value());
|
||||
if !head.0.advance()? {
|
||||
PeekMut::pop(head);
|
||||
match heap.peek_mut() {
|
||||
Some(mut head) => {
|
||||
writer.insert_key(head.0.key()).unwrap();
|
||||
value_merger = merger.new_value(head.0.value());
|
||||
if !head.0.advance()? {
|
||||
PeekMut::pop(head);
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
for _ in 0..len - 1 {
|
||||
if let Some(mut head) = heap.peek_mut() {
|
||||
|
||||
@@ -72,9 +72,15 @@ impl SSTableIndex {
|
||||
}
|
||||
|
||||
/// Get the [`BlockAddr`] of the block containing the `ord`-th term.
|
||||
pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr {
|
||||
pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> (BlockAddr, u64) {
|
||||
// locate_with_ord always returns an index within range
|
||||
self.get_block(self.locate_with_ord(ord)).unwrap()
|
||||
let block_pos = self.locate_with_ord(ord);
|
||||
(
|
||||
self.get_block(block_pos).unwrap(),
|
||||
self.get_block(block_pos + 1)
|
||||
.map(|b| b.first_ordinal)
|
||||
.unwrap_or(u64::MAX),
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) fn get_block_for_automaton<'a>(
|
||||
|
||||
@@ -3,12 +3,12 @@ use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common::{BinarySerializable, FixedSize, OwnedBytes};
|
||||
use tantivy_bitpacker::{compute_num_bits, BitPacker};
|
||||
use tantivy_bitpacker::{BitPacker, compute_num_bits};
|
||||
use tantivy_fst::raw::Fst;
|
||||
use tantivy_fst::{Automaton, IntoStreamer, Map, MapBuilder, Streamer};
|
||||
|
||||
use crate::block_match_automaton::can_block_match_automaton;
|
||||
use crate::{common_prefix_len, SSTableDataCorruption, TermOrdinal};
|
||||
use crate::{SSTableDataCorruption, TermOrdinal, common_prefix_len};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum SSTableIndex {
|
||||
@@ -58,10 +58,13 @@ impl SSTableIndex {
|
||||
}
|
||||
|
||||
/// Get the [`BlockAddr`] of the block containing the `ord`-th term.
|
||||
pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr {
|
||||
pub(crate) fn get_block_with_ord<const FETCH_NEXT_ORD: bool>(
|
||||
&self,
|
||||
ord: TermOrdinal,
|
||||
) -> (BlockAddr, u64) {
|
||||
match self {
|
||||
SSTableIndex::V2(v2_index) => v2_index.get_block_with_ord(ord),
|
||||
SSTableIndex::V3(v3_index) => v3_index.get_block_with_ord(ord),
|
||||
SSTableIndex::V3(v3_index) => v3_index.get_block_with_ord::<FETCH_NEXT_ORD>(ord),
|
||||
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_ord(ord),
|
||||
}
|
||||
}
|
||||
@@ -152,12 +155,18 @@ impl SSTableIndexV3 {
|
||||
}
|
||||
|
||||
pub(crate) fn locate_with_ord(&self, ord: TermOrdinal) -> u64 {
|
||||
self.block_addr_store.binary_search_ord(ord).0
|
||||
self.block_addr_store.binary_search_ord::<false>(ord).0
|
||||
}
|
||||
|
||||
/// Get the [`BlockAddr`] of the block containing the `ord`-th term.
|
||||
pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr {
|
||||
self.block_addr_store.binary_search_ord(ord).1
|
||||
pub(crate) fn get_block_with_ord<const FETCH_NEXT_ORD: bool>(
|
||||
&self,
|
||||
ord: TermOrdinal,
|
||||
) -> (BlockAddr, u64) {
|
||||
let (_block_id, block_addr, next_ord) = self
|
||||
.block_addr_store
|
||||
.binary_search_ord::<FETCH_NEXT_ORD>(ord);
|
||||
(block_addr, next_ord)
|
||||
}
|
||||
|
||||
pub(crate) fn get_block_for_automaton<'a>(
|
||||
@@ -253,8 +262,8 @@ impl SSTableIndexV3Empty {
|
||||
}
|
||||
|
||||
/// Get the [`BlockAddr`] of the block containing the `ord`-th term.
|
||||
pub(crate) fn get_block_with_ord(&self, _ord: TermOrdinal) -> BlockAddr {
|
||||
self.block_addr.clone()
|
||||
pub(crate) fn get_block_with_ord(&self, _ord: TermOrdinal) -> (BlockAddr, u64) {
|
||||
(self.block_addr.clone(), u64::MAX)
|
||||
}
|
||||
}
|
||||
#[derive(Clone, Eq, PartialEq, Debug)]
|
||||
@@ -461,7 +470,11 @@ impl BlockAddrBlockMetadata {
|
||||
})
|
||||
}
|
||||
|
||||
fn bisect_for_ord(&self, data: &[u8], target_ord: TermOrdinal) -> (u64, BlockAddr) {
|
||||
fn bisect_for_ord<const FETCH_NEXT_ORD: bool>(
|
||||
&self,
|
||||
data: &[u8],
|
||||
target_ord: TermOrdinal,
|
||||
) -> (u64, BlockAddr, u64) {
|
||||
let inner_target_ord = target_ord - self.ref_block_addr.first_ordinal;
|
||||
let num_bits = self.num_bits() as usize;
|
||||
let range_start_nbits = self.range_start_nbits as usize;
|
||||
@@ -481,11 +494,17 @@ impl BlockAddrBlockMetadata {
|
||||
Err(inner_offset) => inner_offset,
|
||||
};
|
||||
// we can unwrap because inner_offset <= self.block_len
|
||||
(
|
||||
inner_offset,
|
||||
self.deserialize_block_addr(data, inner_offset as usize)
|
||||
.unwrap(),
|
||||
)
|
||||
let block = self
|
||||
.deserialize_block_addr(data, inner_offset as usize)
|
||||
.unwrap();
|
||||
let next_ord = if FETCH_NEXT_ORD {
|
||||
self.deserialize_block_addr(data, inner_offset as usize + 1)
|
||||
.map(|b| b.first_ordinal)
|
||||
.unwrap_or(u64::MAX)
|
||||
} else {
|
||||
0
|
||||
};
|
||||
(inner_offset, block, next_ord)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -591,7 +610,10 @@ impl BlockAddrStore {
|
||||
)
|
||||
}
|
||||
|
||||
fn binary_search_ord(&self, ord: TermOrdinal) -> (u64, BlockAddr) {
|
||||
fn binary_search_ord<const FETCH_NEXT_ORD: bool>(
|
||||
&self,
|
||||
ord: TermOrdinal,
|
||||
) -> (u64, BlockAddr, u64) {
|
||||
let max_block =
|
||||
(self.block_meta_bytes.len() / BlockAddrBlockMetadata::SIZE_IN_BYTES) as u64;
|
||||
let get_first_ordinal = |block_id| {
|
||||
@@ -606,20 +628,29 @@ impl BlockAddrStore {
|
||||
Ok(store_block_id) => {
|
||||
let block_id = store_block_id * STORE_BLOCK_LEN as u64;
|
||||
// we can unwrap because store_block_id < max_block
|
||||
return (block_id, self.get(block_id).unwrap());
|
||||
let next_ord = if FETCH_NEXT_ORD {
|
||||
self.get(block_id + 1)
|
||||
.map(|b| b.first_ordinal)
|
||||
.unwrap_or(u64::MAX)
|
||||
} else {
|
||||
0
|
||||
};
|
||||
return (block_id, self.get(block_id).unwrap(), next_ord);
|
||||
}
|
||||
Err(store_block_id) => store_block_id - 1,
|
||||
};
|
||||
|
||||
// we can unwrap because store_block_id < max_block
|
||||
let block_addr_block_data = self.get_block_meta(store_block_id as usize).unwrap();
|
||||
let (inner_offset, block_addr) = block_addr_block_data.bisect_for_ord(
|
||||
&self.addr_bytes[block_addr_block_data.offset as usize..],
|
||||
ord,
|
||||
);
|
||||
let (inner_offset, block_addr, next_block_ord) = block_addr_block_data
|
||||
.bisect_for_ord::<FETCH_NEXT_ORD>(
|
||||
&self.addr_bytes[block_addr_block_data.offset as usize..],
|
||||
ord,
|
||||
);
|
||||
(
|
||||
store_block_id * STORE_BLOCK_LEN as u64 + inner_offset,
|
||||
block_addr,
|
||||
next_block_ord,
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -824,8 +855,8 @@ mod tests {
|
||||
use common::OwnedBytes;
|
||||
|
||||
use super::*;
|
||||
use crate::block_match_automaton::tests::EqBuffer;
|
||||
use crate::SSTableDataCorruption;
|
||||
use crate::block_match_automaton::tests::EqBuffer;
|
||||
|
||||
#[test]
|
||||
fn test_sstable_index() {
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use std::io;
|
||||
use std::ops::Bound;
|
||||
|
||||
use tantivy_fst::automaton::AlwaysMatch;
|
||||
use tantivy_fst::Automaton;
|
||||
use tantivy_fst::automaton::AlwaysMatch;
|
||||
|
||||
use crate::dictionary::Dictionary;
|
||||
use crate::{DeltaReader, SSTable, TermOrdinal};
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::io;
|
||||
|
||||
use crate::value::{deserialize_vint_u64, ValueReader, ValueWriter};
|
||||
use crate::{vint, BlockAddr};
|
||||
use crate::value::{ValueReader, ValueWriter, deserialize_vint_u64};
|
||||
use crate::{BlockAddr, vint};
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct IndexValueReader {
|
||||
|
||||
@@ -1,10 +1,16 @@
|
||||
pub(crate) mod index;
|
||||
mod range;
|
||||
mod u64_monotonic;
|
||||
mod vec_u32;
|
||||
mod void;
|
||||
|
||||
use std::io;
|
||||
|
||||
pub use range::{RangeValueReader, RangeValueWriter};
|
||||
pub use u64_monotonic::{U64MonotonicValueReader, U64MonotonicValueWriter};
|
||||
pub use vec_u32::{VecU32ValueReader, VecU32ValueWriter};
|
||||
pub use void::{VoidValueReader, VoidValueWriter};
|
||||
|
||||
/// `ValueReader` is a trait describing the contract of something
|
||||
/// reading blocks of value, and offering random access within this values.
|
||||
pub trait ValueReader: Default {
|
||||
@@ -40,10 +46,6 @@ pub trait ValueWriter: Default {
|
||||
fn clear(&mut self);
|
||||
}
|
||||
|
||||
pub use range::{RangeValueReader, RangeValueWriter};
|
||||
pub use u64_monotonic::{U64MonotonicValueReader, U64MonotonicValueWriter};
|
||||
pub use void::{VoidValueReader, VoidValueWriter};
|
||||
|
||||
fn deserialize_vint_u64(data: &mut &[u8]) -> u64 {
|
||||
let (num_bytes, val) = super::vint::deserialize_read(data);
|
||||
*data = &data[num_bytes..];
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::io;
|
||||
use std::ops::Range;
|
||||
|
||||
use crate::value::{deserialize_vint_u64, ValueReader, ValueWriter};
|
||||
use crate::value::{ValueReader, ValueWriter, deserialize_vint_u64};
|
||||
|
||||
/// See module comment.
|
||||
#[derive(Default)]
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use std::io;
|
||||
|
||||
use crate::value::{deserialize_vint_u64, ValueReader, ValueWriter};
|
||||
use crate::value::{ValueReader, ValueWriter, deserialize_vint_u64};
|
||||
use crate::vint;
|
||||
|
||||
#[derive(Default)]
|
||||
|
||||
73
sstable/src/value/vec_u32.rs
Normal file
73
sstable/src/value/vec_u32.rs
Normal file
@@ -0,0 +1,73 @@
|
||||
use std::io;
|
||||
|
||||
use super::{ValueReader, ValueWriter};
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct VecU32ValueReader {
|
||||
vals: Vec<Vec<u32>>,
|
||||
}
|
||||
|
||||
impl ValueReader for VecU32ValueReader {
|
||||
type Value = Vec<u32>;
|
||||
|
||||
#[inline(always)]
|
||||
fn value(&self, idx: usize) -> &Self::Value {
|
||||
&self.vals[idx]
|
||||
}
|
||||
|
||||
fn load(&mut self, mut data: &[u8]) -> io::Result<usize> {
|
||||
let original_num_bytes = data.len();
|
||||
self.vals.clear();
|
||||
|
||||
// The first 4 bytes are the number of blocks
|
||||
let num_blocks = u32::from_le_bytes(data[..4].try_into().unwrap()) as usize;
|
||||
data = &data[4..];
|
||||
|
||||
for _ in 0..num_blocks {
|
||||
// Each block starts with a 4-byte length
|
||||
let segment_len = u32::from_le_bytes(data[..4].try_into().unwrap()) as usize;
|
||||
data = &data[4..];
|
||||
|
||||
// Read the segment IDs for this block
|
||||
let mut segment_ids = Vec::with_capacity(segment_len);
|
||||
for _ in 0..segment_len {
|
||||
let segment_id = u32::from_le_bytes(data[..4].try_into().unwrap());
|
||||
segment_ids.push(segment_id);
|
||||
data = &data[4..];
|
||||
}
|
||||
self.vals.push(segment_ids);
|
||||
}
|
||||
|
||||
// Return the number of bytes consumed
|
||||
Ok(original_num_bytes - data.len())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct VecU32ValueWriter {
|
||||
vals: Vec<Vec<u32>>,
|
||||
}
|
||||
|
||||
impl ValueWriter for VecU32ValueWriter {
|
||||
type Value = Vec<u32>;
|
||||
|
||||
fn write(&mut self, val: &Self::Value) {
|
||||
self.vals.push(val.to_vec());
|
||||
}
|
||||
|
||||
fn serialize_block(&self, output: &mut Vec<u8>) {
|
||||
let num_blocks = self.vals.len() as u32;
|
||||
output.extend_from_slice(&num_blocks.to_le_bytes());
|
||||
for vals in &self.vals {
|
||||
let len = vals.len() as u32;
|
||||
output.extend_from_slice(&len.to_le_bytes());
|
||||
for &segment_id in vals.iter() {
|
||||
output.extend_from_slice(&segment_id.to_le_bytes());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn clear(&mut self) {
|
||||
self.vals.clear();
|
||||
}
|
||||
}
|
||||
49
sstable/tests/sstable_test.rs
Normal file
49
sstable/tests/sstable_test.rs
Normal file
@@ -0,0 +1,49 @@
|
||||
use common::OwnedBytes;
|
||||
use tantivy_sstable::{Dictionary, MonotonicU64SSTable, VecU32ValueSSTable};
|
||||
|
||||
#[test]
|
||||
fn test_create_and_search_sstable() {
|
||||
// Create a new sstable in memory.
|
||||
let mut builder = Dictionary::<MonotonicU64SSTable>::builder(Vec::new()).unwrap();
|
||||
builder.insert(b"apple", &1).unwrap();
|
||||
builder.insert(b"banana", &2).unwrap();
|
||||
builder.insert(b"orange", &3).unwrap();
|
||||
let sstable_bytes = builder.finish().unwrap();
|
||||
|
||||
// Open the sstable.
|
||||
let sstable =
|
||||
Dictionary::<MonotonicU64SSTable>::from_bytes(OwnedBytes::new(sstable_bytes)).unwrap();
|
||||
|
||||
// Search for a key.
|
||||
let value = sstable.get(b"banana").unwrap();
|
||||
assert_eq!(value, Some(2));
|
||||
|
||||
// Search for a non-existent key.
|
||||
let value = sstable.get(b"blub").unwrap();
|
||||
assert_eq!(value, None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_custom_value_sstable() {
|
||||
// Create a new sstable with custom values.
|
||||
let mut builder = Dictionary::<VecU32ValueSSTable>::builder(Vec::new()).unwrap();
|
||||
builder.set_block_len(4096); // Ensure both values are in the same block
|
||||
builder.insert(b"first", &vec![1, 2, 3]).unwrap();
|
||||
builder.insert(b"second", &vec![4, 5]).unwrap();
|
||||
let sstable_bytes = builder.finish().unwrap();
|
||||
|
||||
// Open the sstable.
|
||||
let sstable =
|
||||
Dictionary::<VecU32ValueSSTable>::from_bytes(OwnedBytes::new(sstable_bytes)).unwrap();
|
||||
|
||||
let mut stream = sstable.stream().unwrap();
|
||||
assert!(stream.advance());
|
||||
assert_eq!(stream.key(), b"first");
|
||||
assert_eq!(stream.value(), &vec![1, 2, 3]);
|
||||
|
||||
assert!(stream.advance());
|
||||
assert_eq!(stream.key(), b"second");
|
||||
assert_eq!(stream.value(), &vec![4, 5]);
|
||||
|
||||
assert!(!stream.advance());
|
||||
}
|
||||
@@ -1,7 +1,7 @@
|
||||
[package]
|
||||
name = "tantivy-stacker"
|
||||
version = "0.3.0"
|
||||
edition = "2021"
|
||||
version = "0.5.0"
|
||||
edition = "2024"
|
||||
license = "MIT"
|
||||
homepage = "https://github.com/quickwit-oss/tantivy"
|
||||
repository = "https://github.com/quickwit-oss/tantivy"
|
||||
@@ -9,7 +9,7 @@ description = "term hashmap used for indexing"
|
||||
|
||||
[dependencies]
|
||||
murmurhash32 = "0.3"
|
||||
common = { version = "0.7", path = "../common/", package = "tantivy-common" }
|
||||
common = { version = "0.9", path = "../common/", package = "tantivy-common" }
|
||||
ahash = { version = "0.8.11", default-features = false, optional = true }
|
||||
rand_distr = "0.4.3"
|
||||
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user