Compare commits


18 Commits

Author SHA1 Message Date
Pascal Seitz
6761237ec7 chore: Release 0.19.2 2023-02-10 12:20:20 +08:00
Pascal Seitz
3da08e92c7 fix: doc store for files larger than 4GB
Fixes an issue in the skip list deserialization, which incorrectly deserialized the byte start offset as u32.
`get_doc` will fail for any doc that lives in a block with a start offset larger than u32::MAX (~4GB).
This causes index corruption if a segment with a doc store larger than 4GB is merged.

tantivy version 0.19 is affected.
2023-02-10 12:12:47 +08:00
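
A minimal sketch of the failure mode described in the commit above (illustrative only, not taken from the tantivy sources): a block's byte start offset that is written as a u64 but read back as a u32 is silently truncated once it exceeds u32::MAX, so lookups past the ~4GB mark resolve to the wrong block.

```rust
fn main() {
    // A block that starts just past the 4GB boundary.
    let start_offset: u64 = u32::MAX as u64 + 1;
    let bytes = start_offset.to_le_bytes();

    // Buggy read: only the low four bytes are interpreted, yielding 0.
    let truncated = u32::from_le_bytes(bytes[..4].try_into().unwrap()) as u64;
    // Correct read: all eight bytes are interpreted.
    let restored = u64::from_le_bytes(bytes);

    assert_eq!(restored, start_offset);
    assert_ne!(truncated, start_offset); // the doc store would seek to the wrong block
}
```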
Pascal Seitz
6c4b8d97ed chore: Release 2023-01-13 13:46:28 +08:00
Pascal Seitz
dc5f503c9a use fastfield_codecs 0.3.1 2023-01-13 13:34:42 +08:00
Pascal Seitz
4ffcf3a933 chore: Release 2023-01-13 13:31:20 +08:00
Pascal Seitz
079f542f97 handle user input on get_docid_for_value_range 2023-01-13 12:24:34 +08:00
PSeitz
509adab79d Bump version (#1715)
* group workspace deps

* update cargo.toml

* revert tant version

* chore: Release
2022-12-12 04:39:43 +01:00
PSeitz
96c93a6ba3 Merge pull request #1700 from quickwit-oss/PSeitz-patch-1
Update CHANGELOG.md
2022-12-02 16:31:11 +01:00
boraarslan
495824361a Move split_full_path to Schema (#1692) 2022-11-29 20:56:13 +09:00
PSeitz
485a8f507e Update CHANGELOG.md 2022-11-28 15:41:31 +01:00
PSeitz
1119e59eae prepare fastfield format for null index (#1691)
* prepare fastfield format for null index
* add format version for fastfield
* Update fastfield_codecs/src/compact_space/mod.rs
* switch to variable size footer
* serialize delta of end
2022-11-28 17:15:24 +09:00
PSeitz
ee1f2c1f28 add aggregation support for date type (#1693)
* add aggregation support for date type
fixes #1332

* serialize key_as_string as rfc3339 in date histogram
* update docs
* enable date for range aggregation
2022-11-28 09:12:08 +09:00
PSeitz
600548fd26 Merge pull request #1694 from quickwit-oss/dependabot/cargo/zstd-0.12
Update zstd requirement from 0.11 to 0.12
2022-11-25 05:48:59 +01:00
PSeitz
9929c0c221 Merge pull request #1696 from quickwit-oss/dependabot/cargo/env_logger-0.10.0
Update env_logger requirement from 0.9.0 to 0.10.0
2022-11-25 03:28:10 +01:00
dependabot[bot]
f53e65648b Update env_logger requirement from 0.9.0 to 0.10.0
Updates the requirements on [env_logger](https://github.com/rust-cli/env_logger) to permit the latest version.
- [Release notes](https://github.com/rust-cli/env_logger/releases)
- [Changelog](https://github.com/rust-cli/env_logger/blob/main/CHANGELOG.md)
- [Commits](https://github.com/rust-cli/env_logger/compare/v0.9.0...v0.10.0)

---
updated-dependencies:
- dependency-name: env_logger
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2022-11-24 20:07:52 +00:00
PSeitz
0281b22b77 update create_in_ram docs (#1695) 2022-11-24 17:30:09 +01:00
dependabot[bot]
a05c184830 Update zstd requirement from 0.11 to 0.12
Updates the requirements on [zstd](https://github.com/gyscos/zstd-rs) to permit the latest version.
- [Release notes](https://github.com/gyscos/zstd-rs/releases)
- [Commits](https://github.com/gyscos/zstd-rs/commits)

---
updated-dependencies:
- dependency-name: zstd
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2022-11-23 20:15:32 +00:00
Paul Masurel
0b40a7fe43 Added an `expand_dots` option to `JsonObjectOptions`. (#1687)
Related to quickwit#2345.
2022-11-21 23:03:00 +09:00
39 changed files with 1150 additions and 248 deletions

View File

@@ -1,9 +1,13 @@
Tantivy 0.19 Tantivy 0.19
================================ ================================
#### Bugfixes
- Fix missing fieldnorms for u64, i64, f64, bool, bytes and date [#1620](https://github.com/quickwit-oss/tantivy/pull/1620) (@PSeitz)
- Fix interpolation overflow in linear interpolation fastfield codec [#1480](https://github.com/quickwit-oss/tantivy/pull/1480 (@PSeitz @fulmicoton)
#### Features/Improvements
- Add support for `IN` in queryparser , e.g. `field: IN [val1 val2 val3]` [#1683](https://github.com/quickwit-oss/tantivy/pull/1683) (@trinity-1686a)
- Skip score calculation, when no scoring is required [#1646](https://github.com/quickwit-oss/tantivy/pull/1646) (@PSeitz) - Skip score calculation, when no scoring is required [#1646](https://github.com/quickwit-oss/tantivy/pull/1646) (@PSeitz)
- Limit fast fields to u32 (`get_val(u32)`) [#1644](https://github.com/quickwit-oss/tantivy/pull/1644) (@PSeitz) - Limit fast fields to u32 (`get_val(u32)`) [#1644](https://github.com/quickwit-oss/tantivy/pull/1644) (@PSeitz)
- Major bugfix: Fix missing fieldnorms for u64, i64, f64, bool, bytes and date [#1620](https://github.com/quickwit-oss/tantivy/pull/1620) (@PSeitz)
- Updated [Date Field Type](https://github.com/quickwit-oss/tantivy/pull/1396) - Updated [Date Field Type](https://github.com/quickwit-oss/tantivy/pull/1396)
The `DateTime` type has been updated to hold timestamps with microseconds precision. The `DateTime` type has been updated to hold timestamps with microseconds precision.
`DateOptions` and `DatePrecision` have been added to configure Date fields. The precision is used to hint on fast values compression. Otherwise, seconds precision is used everywhere else (i.e terms, indexing). (@evanxg852000) `DateOptions` and `DatePrecision` have been added to configure Date fields. The precision is used to hint on fast values compression. Otherwise, seconds precision is used everywhere else (i.e terms, indexing). (@evanxg852000)
@@ -11,7 +15,6 @@ Tantivy 0.19
- Add boolean field type [#1382](https://github.com/quickwit-oss/tantivy/pull/1382) (@boraarslan) - Add boolean field type [#1382](https://github.com/quickwit-oss/tantivy/pull/1382) (@boraarslan)
- Remove Searcher pool and make `Searcher` cloneable. (@PSeitz) - Remove Searcher pool and make `Searcher` cloneable. (@PSeitz)
- Validate settings on create [#1570](https://github.com/quickwit-oss/tantivy/pull/1570 (@PSeitz) - Validate settings on create [#1570](https://github.com/quickwit-oss/tantivy/pull/1570 (@PSeitz)
- Fix interpolation overflow in linear interpolation fastfield codec [#1480](https://github.com/quickwit-oss/tantivy/pull/1480 (@PSeitz @fulmicoton)
- Detect and apply gcd on fastfield codecs [#1418](https://github.com/quickwit-oss/tantivy/pull/1418) (@PSeitz) - Detect and apply gcd on fastfield codecs [#1418](https://github.com/quickwit-oss/tantivy/pull/1418) (@PSeitz)
- Doc store - Doc store
- use separate thread to compress block store [#1389](https://github.com/quickwit-oss/tantivy/pull/1389) [#1510](https://github.com/quickwit-oss/tantivy/pull/1510 (@PSeitz @fulmicoton) - use separate thread to compress block store [#1389](https://github.com/quickwit-oss/tantivy/pull/1389) [#1510](https://github.com/quickwit-oss/tantivy/pull/1510 (@PSeitz @fulmicoton)
@@ -21,6 +24,7 @@ Tantivy 0.19
- Make `tantivy::TantivyError` cloneable [#1402](https://github.com/quickwit-oss/tantivy/pull/1402) (@PSeitz) - Make `tantivy::TantivyError` cloneable [#1402](https://github.com/quickwit-oss/tantivy/pull/1402) (@PSeitz)
- Add support for phrase slop in query language [#1393](https://github.com/quickwit-oss/tantivy/pull/1393) (@saroh) - Add support for phrase slop in query language [#1393](https://github.com/quickwit-oss/tantivy/pull/1393) (@saroh)
- Aggregation - Aggregation
- Add aggregation support for date type [#1693](https://github.com/quickwit-oss/tantivy/pull/1693)(@PSeitz)
- Add support for keyed parameter in range and histgram aggregations [#1424](https://github.com/quickwit-oss/tantivy/pull/1424) (@k-yomo) - Add support for keyed parameter in range and histgram aggregations [#1424](https://github.com/quickwit-oss/tantivy/pull/1424) (@k-yomo)
- Add aggregation bucket limit [#1363](https://github.com/quickwit-oss/tantivy/pull/1363) (@PSeitz) - Add aggregation bucket limit [#1363](https://github.com/quickwit-oss/tantivy/pull/1363) (@PSeitz)
- Faster indexing - Faster indexing
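
A hedged sketch of the `IN` query-parser syntax listed in the changelog above, using the usual tantivy schema/QueryParser APIs; the field name and terms here are made up for illustration.

```rust
use tantivy::query::QueryParser;
use tantivy::schema::{Schema, TEXT};
use tantivy::Index;

fn main() {
    // Build a throwaway in-RAM index with a single text field.
    let mut schema_builder = Schema::builder();
    let title = schema_builder.add_text_field("title", TEXT);
    let index = Index::create_in_ram(schema_builder.build());

    // `field: IN [val1 val2 val3]` parses to a disjunction over the listed terms.
    let query_parser = QueryParser::for_index(&index, vec![title]);
    let query = query_parser
        .parse_query("title: IN [diary girl cow]")
        .unwrap();
    println!("{query:?}");
}
```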

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "tantivy" name = "tantivy"
version = "0.19.0-dev" version = "0.19.2"
authors = ["Paul Masurel <paul.masurel@gmail.com>"] authors = ["Paul Masurel <paul.masurel@gmail.com>"]
license = "MIT" license = "MIT"
categories = ["database-implementations", "data-structures"] categories = ["database-implementations", "data-structures"]
@@ -25,7 +25,7 @@ tantivy-fst = "0.4.0"
memmap2 = { version = "0.5.3", optional = true } memmap2 = { version = "0.5.3", optional = true }
lz4_flex = { version = "0.9.2", default-features = false, features = ["checked-decode"], optional = true } lz4_flex = { version = "0.9.2", default-features = false, features = ["checked-decode"], optional = true }
brotli = { version = "3.3.4", optional = true } brotli = { version = "3.3.4", optional = true }
zstd = { version = "0.11", optional = true, default-features = false } zstd = { version = "0.12", optional = true, default-features = false }
snap = { version = "1.0.5", optional = true } snap = { version = "1.0.5", optional = true }
tempfile = { version = "3.3.0", optional = true } tempfile = { version = "3.3.0", optional = true }
log = "0.4.16" log = "0.4.16"
@@ -36,11 +36,6 @@ fs2 = { version = "0.4.3", optional = true }
levenshtein_automata = "0.2.1" levenshtein_automata = "0.2.1"
uuid = { version = "1.0.0", features = ["v4", "serde"] } uuid = { version = "1.0.0", features = ["v4", "serde"] }
crossbeam-channel = "0.5.4" crossbeam-channel = "0.5.4"
tantivy-query-grammar = { version="0.18.0", path="./query-grammar" }
tantivy-bitpacker = { version="0.2", path="./bitpacker" }
common = { version = "0.3", path = "./common/", package = "tantivy-common" }
fastfield_codecs = { version="0.2", path="./fastfield_codecs", default-features = false }
ownedbytes = { version="0.3", path="./ownedbytes" }
stable_deref_trait = "1.2.0" stable_deref_trait = "1.2.0"
rust-stemmers = "1.2.0" rust-stemmers = "1.2.0"
downcast-rs = "1.2.0" downcast-rs = "1.2.0"
@@ -62,6 +57,12 @@ ciborium = { version = "0.2", optional = true}
async-trait = "0.1.53" async-trait = "0.1.53"
arc-swap = "1.5.0" arc-swap = "1.5.0"
tantivy-query-grammar = { version= "0.19.0", path="./query-grammar" }
tantivy-bitpacker = { version= "0.3", path="./bitpacker" }
common = { version= "0.4", path = "./common/", package = "tantivy-common" }
fastfield_codecs = { version= "0.3.1", path="./fastfield_codecs", default-features = false }
ownedbytes = { version= "0.4", path="./ownedbytes" }
[target.'cfg(windows)'.dependencies] [target.'cfg(windows)'.dependencies]
winapi = "0.3.9" winapi = "0.3.9"
@@ -73,7 +74,7 @@ pretty_assertions = "1.2.1"
proptest = "1.0.0" proptest = "1.0.0"
criterion = "0.4" criterion = "0.4"
test-log = "0.2.10" test-log = "0.2.10"
env_logger = "0.9.0" env_logger = "0.10.0"
pprof = { version = "0.11.0", features = ["flamegraph", "criterion"] } pprof = { version = "0.11.0", features = ["flamegraph", "criterion"] }
futures = "0.3.21" futures = "0.3.21"

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "tantivy-bitpacker" name = "tantivy-bitpacker"
version = "0.2.0" version = "0.3.0"
edition = "2021" edition = "2021"
authors = ["Paul Masurel <paul.masurel@gmail.com>"] authors = ["Paul Masurel <paul.masurel@gmail.com>"]
license = "MIT" license = "MIT"
@@ -8,6 +8,8 @@ categories = []
description = """Tantivy-sub crate: bitpacking""" description = """Tantivy-sub crate: bitpacking"""
repository = "https://github.com/quickwit-oss/tantivy" repository = "https://github.com/quickwit-oss/tantivy"
keywords = [] keywords = []
documentation = "https://docs.rs/tantivy-bitpacker/latest/tantivy_bitpacker"
homepage = "https://github.com/quickwit-oss/tantivy"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

View File

@@ -1,16 +1,20 @@
[package] [package]
name = "tantivy-common" name = "tantivy-common"
version = "0.3.0" version = "0.4.0"
authors = ["Paul Masurel <paul@quickwit.io>", "Pascal Seitz <pascal@quickwit.io>"] authors = ["Paul Masurel <paul@quickwit.io>", "Pascal Seitz <pascal@quickwit.io>"]
license = "MIT" license = "MIT"
edition = "2021" edition = "2021"
description = "common traits and utility functions used by multiple tantivy subcrates" description = "common traits and utility functions used by multiple tantivy subcrates"
documentation = "https://docs.rs/tantivy_common/"
homepage = "https://github.com/quickwit-oss/tantivy"
repository = "https://github.com/quickwit-oss/tantivy"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
byteorder = "1.4.3" byteorder = "1.4.3"
ownedbytes = { version="0.3", path="../ownedbytes" } ownedbytes = { version= "0.4", path="../ownedbytes" }
[dev-dependencies] [dev-dependencies]
proptest = "1.0.0" proptest = "1.0.0"

View File

@@ -94,6 +94,20 @@ impl FixedSize for u32 {
const SIZE_IN_BYTES: usize = 4; const SIZE_IN_BYTES: usize = 4;
} }
impl BinarySerializable for u16 {
fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
writer.write_u16::<Endianness>(*self)
}
fn deserialize<R: Read>(reader: &mut R) -> io::Result<u16> {
reader.read_u16::<Endianness>()
}
}
impl FixedSize for u16 {
const SIZE_IN_BYTES: usize = 2;
}
impl BinarySerializable for u64 { impl BinarySerializable for u64 {
fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> { fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
writer.write_u64::<Endianness>(*self) writer.write_u64::<Endianness>(*self)

View File

@@ -118,7 +118,7 @@ fn main() -> tantivy::Result<()> {
.into_iter() .into_iter()
.collect(); .collect();
let collector = AggregationCollector::from_aggs(agg_req_1, None); let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema());
let searcher = reader.searcher(); let searcher = reader.searcher();
let agg_res: AggregationResults = searcher.search(&term_query, &collector).unwrap(); let agg_res: AggregationResults = searcher.search(&term_query, &collector).unwrap();

View File

@@ -1,17 +1,20 @@
[package] [package]
name = "fastfield_codecs" name = "fastfield_codecs"
version = "0.2.0" version = "0.3.1"
authors = ["Pascal Seitz <pascal@quickwit.io>"] authors = ["Pascal Seitz <pascal@quickwit.io>"]
license = "MIT" license = "MIT"
edition = "2021" edition = "2021"
description = "Fast field codecs used by tantivy" description = "Fast field codecs used by tantivy"
documentation = "https://docs.rs/fastfield_codecs/"
homepage = "https://github.com/quickwit-oss/tantivy"
repository = "https://github.com/quickwit-oss/tantivy"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
common = { version = "0.3", path = "../common/", package = "tantivy-common" } common = { version = "0.4", path = "../common/", package = "tantivy-common" }
tantivy-bitpacker = { version="0.2", path = "../bitpacker/" } tantivy-bitpacker = { version= "0.3", path = "../bitpacker/" }
ownedbytes = { version = "0.3.0", path = "../ownedbytes" } ownedbytes = { version = "0.4.0", path = "../ownedbytes" }
prettytable-rs = {version="0.9.0", optional= true} prettytable-rs = {version="0.9.0", optional= true}
rand = {version="0.8.3", optional= true} rand = {version="0.8.3", optional= true}
fastdivide = "0.4" fastdivide = "0.4"

View File

@@ -1,3 +1,4 @@
use std::fmt::{self, Debug};
use std::marker::PhantomData; use std::marker::PhantomData;
use std::ops::{Range, RangeInclusive}; use std::ops::{Range, RangeInclusive};
@@ -6,7 +7,7 @@ use tantivy_bitpacker::minmax;
use crate::monotonic_mapping::StrictlyMonotonicFn; use crate::monotonic_mapping::StrictlyMonotonicFn;
/// `Column` provides columnar access on a field. /// `Column` provides columnar access on a field.
pub trait Column<T: PartialOrd = u64>: Send + Sync { pub trait Column<T: PartialOrd + Debug = u64>: Send + Sync {
/// Return the value associated with the given idx. /// Return the value associated with the given idx.
/// ///
/// This accessor should return as fast as possible. /// This accessor should return as fast as possible.
@@ -83,7 +84,7 @@ pub struct VecColumn<'a, T = u64> {
max_value: T, max_value: T,
} }
impl<'a, C: Column<T>, T: Copy + PartialOrd> Column<T> for &'a C { impl<'a, C: Column<T>, T: Copy + PartialOrd + fmt::Debug> Column<T> for &'a C {
fn get_val(&self, idx: u32) -> T { fn get_val(&self, idx: u32) -> T {
(*self).get_val(idx) (*self).get_val(idx)
} }
@@ -109,7 +110,7 @@ impl<'a, C: Column<T>, T: Copy + PartialOrd> Column<T> for &'a C {
} }
} }
impl<'a, T: Copy + PartialOrd + Send + Sync> Column<T> for VecColumn<'a, T> { impl<'a, T: Copy + PartialOrd + Send + Sync + Debug> Column<T> for VecColumn<'a, T> {
fn get_val(&self, position: u32) -> T { fn get_val(&self, position: u32) -> T {
self.values[position as usize] self.values[position as usize]
} }
@@ -177,8 +178,8 @@ pub fn monotonic_map_column<C, T, Input, Output>(
where where
C: Column<Input>, C: Column<Input>,
T: StrictlyMonotonicFn<Input, Output> + Send + Sync, T: StrictlyMonotonicFn<Input, Output> + Send + Sync,
Input: PartialOrd + Send + Sync + Clone, Input: PartialOrd + Send + Sync + Copy + Debug,
Output: PartialOrd + Send + Sync + Clone, Output: PartialOrd + Send + Sync + Copy + Debug,
{ {
MonotonicMappingColumn { MonotonicMappingColumn {
from_column, from_column,
@@ -191,8 +192,8 @@ impl<C, T, Input, Output> Column<Output> for MonotonicMappingColumn<C, T, Input>
where where
C: Column<Input>, C: Column<Input>,
T: StrictlyMonotonicFn<Input, Output> + Send + Sync, T: StrictlyMonotonicFn<Input, Output> + Send + Sync,
Input: PartialOrd + Send + Sync + Clone, Input: PartialOrd + Send + Sync + Copy + Debug,
Output: PartialOrd + Send + Sync + Clone, Output: PartialOrd + Send + Sync + Copy + Debug,
{ {
#[inline] #[inline]
fn get_val(&self, idx: u32) -> Output { fn get_val(&self, idx: u32) -> Output {
@@ -228,12 +229,15 @@ where
doc_id_range: Range<u32>, doc_id_range: Range<u32>,
positions: &mut Vec<u32>, positions: &mut Vec<u32>,
) { ) {
self.from_column.get_docids_for_value_range( if range.start() > &self.max_value() || range.end() < &self.min_value() {
self.monotonic_mapping.inverse(range.start().clone()) return;
..=self.monotonic_mapping.inverse(range.end().clone()), }
doc_id_range, let range = self.monotonic_mapping.inverse_coerce(range);
positions, if range.start() > range.end() {
) return;
}
self.from_column
.get_docids_for_value_range(range, doc_id_range, positions)
} }
// We voluntarily do not implement get_range as it yields a regression, // We voluntarily do not implement get_range as it yields a regression,
@@ -254,7 +258,7 @@ where T: Iterator + Clone + ExactSizeIterator
impl<T> Column<T::Item> for IterColumn<T> impl<T> Column<T::Item> for IterColumn<T>
where where
T: Iterator + Clone + ExactSizeIterator + Send + Sync, T: Iterator + Clone + ExactSizeIterator + Send + Sync,
T::Item: PartialOrd, T::Item: PartialOrd + fmt::Debug,
{ {
fn get_val(&self, idx: u32) -> T::Item { fn get_val(&self, idx: u32) -> T::Item {
self.0.clone().nth(idx as usize).unwrap() self.0.clone().nth(idx as usize).unwrap()

View File

@@ -455,7 +455,11 @@ impl CompactSpaceDecompressor {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::fmt;
use super::*; use super::*;
use crate::format_version::read_format_version;
use crate::null_index_footer::read_null_index_footer;
use crate::serialize::U128Header; use crate::serialize::U128Header;
use crate::{open_u128, serialize_u128}; use crate::{open_u128, serialize_u128};
@@ -541,7 +545,10 @@ mod tests {
.unwrap(); .unwrap();
let data = OwnedBytes::new(out); let data = OwnedBytes::new(out);
let (data, _format_version) = read_format_version(data).unwrap();
let (data, _null_index_footer) = read_null_index_footer(data).unwrap();
test_all(data.clone(), u128_vals); test_all(data.clone(), u128_vals);
data data
} }
@@ -559,6 +566,7 @@ mod tests {
333u128, 333u128,
]; ];
let mut data = test_aux_vals(vals); let mut data = test_aux_vals(vals);
let _header = U128Header::deserialize(&mut data); let _header = U128Header::deserialize(&mut data);
let decomp = CompactSpaceDecompressor::open(data).unwrap(); let decomp = CompactSpaceDecompressor::open(data).unwrap();
let complete_range = 0..vals.len() as u32; let complete_range = 0..vals.len() as u32;
@@ -702,7 +710,7 @@ mod tests {
); );
} }
fn get_positions_for_value_range_helper<C: Column<T> + ?Sized, T: PartialOrd>( fn get_positions_for_value_range_helper<C: Column<T> + ?Sized, T: PartialOrd + fmt::Debug>(
column: &C, column: &C,
value_range: RangeInclusive<T>, value_range: RangeInclusive<T>,
doc_id_range: Range<u32>, doc_id_range: Range<u32>,

View File

@@ -0,0 +1,39 @@
use std::io;
use common::BinarySerializable;
use ownedbytes::OwnedBytes;
const MAGIC_NUMBER: u16 = 4335u16;
const FASTFIELD_FORMAT_VERSION: u8 = 1;
pub(crate) fn append_format_version(output: &mut impl io::Write) -> io::Result<()> {
FASTFIELD_FORMAT_VERSION.serialize(output)?;
MAGIC_NUMBER.serialize(output)?;
Ok(())
}
pub(crate) fn read_format_version(data: OwnedBytes) -> io::Result<(OwnedBytes, u8)> {
let (data, magic_number_bytes) = data.rsplit(2);
let magic_number = u16::deserialize(&mut magic_number_bytes.as_slice())?;
if magic_number != MAGIC_NUMBER {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("magic number mismatch {} != {}", magic_number, MAGIC_NUMBER),
));
}
let (data, format_version_bytes) = data.rsplit(1);
let format_version = u8::deserialize(&mut format_version_bytes.as_slice())?;
if format_version > FASTFIELD_FORMAT_VERSION {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"Unsupported fastfield format version: {}. Max supported version: {}",
format_version, FASTFIELD_FORMAT_VERSION
),
));
}
Ok((data, format_version))
}
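
A standalone sketch of the trailer layout that the new `append_format_version` / `read_format_version` functions establish: the file ends with a 1-byte format version followed by a 2-byte magic number, read back by peeling bytes off the end. The byte order below is illustrative; the real code goes through `BinarySerializable` and whatever `Endianness` resolves to.

```rust
fn main() {
    const MAGIC_NUMBER: u16 = 4335;
    const FASTFIELD_FORMAT_VERSION: u8 = 1;

    // [codec payload][version: u8][magic: u16] -- version and magic are appended last.
    let mut file = vec![0xAAu8; 8]; // placeholder codec payload
    file.push(FASTFIELD_FORMAT_VERSION);
    file.extend_from_slice(&MAGIC_NUMBER.to_le_bytes());

    // Reading mirrors the `rsplit` calls above: magic first, then the version byte.
    let (rest, magic_bytes) = file.split_at(file.len() - 2);
    let magic = u16::from_le_bytes([magic_bytes[0], magic_bytes[1]]);
    let (payload, version) = rest.split_at(rest.len() - 1);

    assert_eq!(magic, MAGIC_NUMBER);
    assert_eq!(version[0], FASTFIELD_FORMAT_VERSION);
    assert_eq!(payload.len(), 8);
}
```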

View File

@@ -14,26 +14,30 @@ extern crate more_asserts;
#[cfg(all(test, feature = "unstable"))] #[cfg(all(test, feature = "unstable"))]
extern crate test; extern crate test;
use std::io;
use std::io::Write; use std::io::Write;
use std::sync::Arc; use std::sync::Arc;
use std::{fmt, io};
use common::BinarySerializable; use common::BinarySerializable;
use compact_space::CompactSpaceDecompressor; use compact_space::CompactSpaceDecompressor;
use format_version::read_format_version;
use monotonic_mapping::{ use monotonic_mapping::{
StrictlyMonotonicMappingInverter, StrictlyMonotonicMappingToInternal, StrictlyMonotonicMappingInverter, StrictlyMonotonicMappingToInternal,
StrictlyMonotonicMappingToInternalBaseval, StrictlyMonotonicMappingToInternalGCDBaseval, StrictlyMonotonicMappingToInternalBaseval, StrictlyMonotonicMappingToInternalGCDBaseval,
}; };
use null_index_footer::read_null_index_footer;
use ownedbytes::OwnedBytes; use ownedbytes::OwnedBytes;
use serialize::{Header, U128Header}; use serialize::{Header, U128Header};
mod bitpacked; mod bitpacked;
mod blockwise_linear; mod blockwise_linear;
mod compact_space; mod compact_space;
mod format_version;
mod line; mod line;
mod linear; mod linear;
mod monotonic_mapping; mod monotonic_mapping;
mod monotonic_mapping_u128; mod monotonic_mapping_u128;
mod null_index_footer;
mod column; mod column;
mod gcd; mod gcd;
@@ -128,9 +132,11 @@ impl U128FastFieldCodecType {
} }
/// Returns the correct codec reader wrapped in the `Arc` for the data. /// Returns the correct codec reader wrapped in the `Arc` for the data.
pub fn open_u128<Item: MonotonicallyMappableToU128>( pub fn open_u128<Item: MonotonicallyMappableToU128 + fmt::Debug>(
mut bytes: OwnedBytes, bytes: OwnedBytes,
) -> io::Result<Arc<dyn Column<Item>>> { ) -> io::Result<Arc<dyn Column<Item>>> {
let (bytes, _format_version) = read_format_version(bytes)?;
let (mut bytes, _null_index_footer) = read_null_index_footer(bytes)?;
let header = U128Header::deserialize(&mut bytes)?; let header = U128Header::deserialize(&mut bytes)?;
assert_eq!(header.codec_type, U128FastFieldCodecType::CompactSpace); assert_eq!(header.codec_type, U128FastFieldCodecType::CompactSpace);
let reader = CompactSpaceDecompressor::open(bytes)?; let reader = CompactSpaceDecompressor::open(bytes)?;
@@ -140,9 +146,11 @@ pub fn open_u128<Item: MonotonicallyMappableToU128>(
} }
/// Returns the correct codec reader wrapped in the `Arc` for the data. /// Returns the correct codec reader wrapped in the `Arc` for the data.
pub fn open<T: MonotonicallyMappableToU64>( pub fn open<T: MonotonicallyMappableToU64 + fmt::Debug>(
mut bytes: OwnedBytes, bytes: OwnedBytes,
) -> io::Result<Arc<dyn Column<T>>> { ) -> io::Result<Arc<dyn Column<T>>> {
let (bytes, _format_version) = read_format_version(bytes)?;
let (mut bytes, _null_index_footer) = read_null_index_footer(bytes)?;
let header = Header::deserialize(&mut bytes)?; let header = Header::deserialize(&mut bytes)?;
match header.codec_type { match header.codec_type {
FastFieldCodecType::Bitpacked => open_specific_codec::<BitpackedCodec, _>(bytes, &header), FastFieldCodecType::Bitpacked => open_specific_codec::<BitpackedCodec, _>(bytes, &header),
@@ -153,7 +161,7 @@ pub fn open<T: MonotonicallyMappableToU64>(
} }
} }
fn open_specific_codec<C: FastFieldCodec, Item: MonotonicallyMappableToU64>( fn open_specific_codec<C: FastFieldCodec, Item: MonotonicallyMappableToU64 + fmt::Debug>(
bytes: OwnedBytes, bytes: OwnedBytes,
header: &Header, header: &Header,
) -> io::Result<Arc<dyn Column<Item>>> { ) -> io::Result<Arc<dyn Column<Item>>> {
@@ -314,6 +322,9 @@ mod tests {
pub fn get_codec_test_datasets() -> Vec<(Vec<u64>, &'static str)> { pub fn get_codec_test_datasets() -> Vec<(Vec<u64>, &'static str)> {
let mut data_and_names = vec![]; let mut data_and_names = vec![];
let data = vec![10];
data_and_names.push((data, "minimal test"));
let data = (10..=10_000_u64).collect::<Vec<_>>(); let data = (10..=10_000_u64).collect::<Vec<_>>();
data_and_names.push((data, "simple monotonically increasing")); data_and_names.push((data, "simple monotonically increasing"));
@@ -321,6 +332,9 @@ mod tests {
vec![5, 6, 7, 8, 9, 10, 99, 100], vec![5, 6, 7, 8, 9, 10, 99, 100],
"offset in linear interpol", "offset in linear interpol",
)); ));
data_and_names.push((vec![3, 18446744073709551613, 5], "docid range regression"));
data_and_names.push((vec![5, 50, 3, 13, 1, 1000, 35], "rand small")); data_and_names.push((vec![5, 50, 3, 13, 1, 1000, 35], "rand small"));
data_and_names.push((vec![10], "single value")); data_and_names.push((vec![10], "single value"));

View File

@@ -1,4 +1,6 @@
use std::fmt;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::ops::RangeInclusive;
use fastdivide::DividerU64; use fastdivide::DividerU64;
@@ -6,7 +8,9 @@ use crate::MonotonicallyMappableToU128;
/// Monotonic maps a value to u64 value space. /// Monotonic maps a value to u64 value space.
/// Monotonic mapping enables `PartialOrd` on u64 space without conversion to original space. /// Monotonic mapping enables `PartialOrd` on u64 space without conversion to original space.
pub trait MonotonicallyMappableToU64: 'static + PartialOrd + Copy + Send + Sync { pub trait MonotonicallyMappableToU64:
'static + PartialOrd + Copy + Send + Sync + fmt::Debug
{
/// Converts a value to u64. /// Converts a value to u64.
/// ///
/// Internally all fast field values are encoded as u64. /// Internally all fast field values are encoded as u64.
@@ -29,11 +33,29 @@ pub trait MonotonicallyMappableToU64: 'static + PartialOrd + Copy + Send + Sync
/// mapping from their range to their domain. The `inverse` method is required when opening a codec, /// mapping from their range to their domain. The `inverse` method is required when opening a codec,
/// so a value can be converted back to its original domain (e.g. ip address or f64) from its /// so a value can be converted back to its original domain (e.g. ip address or f64) from its
/// internal representation. /// internal representation.
pub trait StrictlyMonotonicFn<External, Internal> { pub trait StrictlyMonotonicFn<External: Copy, Internal: Copy> {
/// Strictly monotonically maps the value from External to Internal. /// Strictly monotonically maps the value from External to Internal.
fn mapping(&self, inp: External) -> Internal; fn mapping(&self, inp: External) -> Internal;
/// Inverse of `mapping`. Maps the value from Internal to External. /// Inverse of `mapping`. Maps the value from Internal to External.
fn inverse(&self, out: Internal) -> External; fn inverse(&self, out: Internal) -> External;
/// Maps a user provded value from External to Internal.
/// It may be necessary to coerce the value if it is outside the value space.
/// In that case it tries to find the next greater value in the value space.
///
/// Returns a bool to mark if a value was outside the value space and had to be coerced _up_.
/// With that information we can detect if two values in a range both map outside the same value
/// space.
///
/// coerce_up means the next valid upper value in the value space will be chosen if the value
/// has to be coerced.
fn mapping_coerce(&self, inp: RangeInclusive<External>) -> RangeInclusive<Internal> {
self.mapping(*inp.start())..=self.mapping(*inp.end())
}
/// Inverse of `mapping_coerce`.
fn inverse_coerce(&self, out: RangeInclusive<Internal>) -> RangeInclusive<External> {
self.inverse(*out.start())..=self.inverse(*out.end())
}
} }
/// Inverts a strictly monotonic mapping from `StrictlyMonotonicFn<A, B>` to /// Inverts a strictly monotonic mapping from `StrictlyMonotonicFn<A, B>` to
@@ -54,7 +76,10 @@ impl<T> From<T> for StrictlyMonotonicMappingInverter<T> {
} }
impl<From, To, T> StrictlyMonotonicFn<To, From> for StrictlyMonotonicMappingInverter<T> impl<From, To, T> StrictlyMonotonicFn<To, From> for StrictlyMonotonicMappingInverter<T>
where T: StrictlyMonotonicFn<From, To> where
T: StrictlyMonotonicFn<From, To>,
From: Copy,
To: Copy,
{ {
fn mapping(&self, val: To) -> From { fn mapping(&self, val: To) -> From {
self.orig_mapping.inverse(val) self.orig_mapping.inverse(val)
@@ -63,6 +88,15 @@ where T: StrictlyMonotonicFn<From, To>
fn inverse(&self, val: From) -> To { fn inverse(&self, val: From) -> To {
self.orig_mapping.mapping(val) self.orig_mapping.mapping(val)
} }
#[inline]
fn mapping_coerce(&self, inp: RangeInclusive<To>) -> RangeInclusive<From> {
self.orig_mapping.inverse_coerce(inp)
}
#[inline]
fn inverse_coerce(&self, out: RangeInclusive<From>) -> RangeInclusive<To> {
self.orig_mapping.mapping_coerce(out)
}
} }
/// Applies the strictly monotonic mapping from `T` without any additional changes. /// Applies the strictly monotonic mapping from `T` without any additional changes.
@@ -134,6 +168,31 @@ impl<External: MonotonicallyMappableToU64> StrictlyMonotonicFn<External, u64>
fn inverse(&self, out: u64) -> External { fn inverse(&self, out: u64) -> External {
External::from_u64(self.min_value + out * self.gcd) External::from_u64(self.min_value + out * self.gcd)
} }
#[inline]
#[allow(clippy::reversed_empty_ranges)]
fn mapping_coerce(&self, inp: RangeInclusive<External>) -> RangeInclusive<u64> {
let end = External::to_u64(*inp.end());
if end < self.min_value || inp.end() < inp.start() {
return 1..=0;
}
let map_coerce = |mut inp, coerce_up| {
let inp_lower_bound = self.inverse(0);
if inp < inp_lower_bound {
inp = inp_lower_bound;
}
let val = External::to_u64(inp);
let need_coercion = coerce_up && (val - self.min_value) % self.gcd != 0;
let mut mapped_val = self.mapping(inp);
if need_coercion {
mapped_val += 1;
}
mapped_val
};
let start = map_coerce(*inp.start(), true);
let end = map_coerce(*inp.end(), false);
start..=end
}
} }
/// Strictly monotonic mapping with a base value. /// Strictly monotonic mapping with a base value.
@@ -149,6 +208,17 @@ impl StrictlyMonotonicMappingToInternalBaseval {
impl<External: MonotonicallyMappableToU64> StrictlyMonotonicFn<External, u64> impl<External: MonotonicallyMappableToU64> StrictlyMonotonicFn<External, u64>
for StrictlyMonotonicMappingToInternalBaseval for StrictlyMonotonicMappingToInternalBaseval
{ {
#[inline]
#[allow(clippy::reversed_empty_ranges)]
fn mapping_coerce(&self, inp: RangeInclusive<External>) -> RangeInclusive<u64> {
if External::to_u64(*inp.end()) < self.min_value {
return 1..=0;
}
let start = self.mapping(External::to_u64(*inp.start()).max(self.min_value));
let end = self.mapping(External::to_u64(*inp.end()));
start..=end
}
fn mapping(&self, val: External) -> u64 { fn mapping(&self, val: External) -> u64 {
External::to_u64(val) - self.min_value External::to_u64(val) - self.min_value
} }
@@ -224,7 +294,7 @@ mod tests {
test_round_trip::<_, _, u64>(&mapping, 100u64); test_round_trip::<_, _, u64>(&mapping, 100u64);
} }
fn test_round_trip<T: StrictlyMonotonicFn<K, L>, K: std::fmt::Debug + Eq + Copy, L>( fn test_round_trip<T: StrictlyMonotonicFn<K, L>, K: std::fmt::Debug + Eq + Copy, L: Copy>(
mapping: &T, mapping: &T,
test_val: K, test_val: K,
) { ) {
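
An illustrative example (made-up values, not tantivy code) of why the `mapping_coerce` additions above have to round a range bound up for the GCD mapping: with `internal = (external - min) / gcd`, naively mapping a bound that is not itself a stored value truncates downward and widens the range.

```rust
fn main() {
    // Stored values {10, 20, 30}: min_value = 10, gcd = 10.
    let min = 10u64;
    let gcd = 10u64;
    let mapping = |v: u64| (v - min) / gcd;

    // Naively mapping the user bound 11..=29 gives 0..=1, which wrongly
    // includes the stored value 10 (internal 0).
    assert_eq!((mapping(11), mapping(29)), (0, 1));

    // Coercing the lower bound up to the next stored value fixes this:
    // 11 is coerced to 20, so the internal range becomes 1..=1.
    let coerce_up = |v: u64| {
        let mapped = mapping(v);
        if (v - min) % gcd == 0 { mapped } else { mapped + 1 }
    };
    assert_eq!((coerce_up(11), mapping(29)), (1, 1));
}
```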

View File

@@ -1,8 +1,11 @@
use std::fmt;
use std::net::Ipv6Addr; use std::net::Ipv6Addr;
/// Montonic maps a value to u128 value space /// Montonic maps a value to u128 value space
/// Monotonic mapping enables `PartialOrd` on u128 space without conversion to original space. /// Monotonic mapping enables `PartialOrd` on u128 space without conversion to original space.
pub trait MonotonicallyMappableToU128: 'static + PartialOrd + Copy + Send + Sync { pub trait MonotonicallyMappableToU128:
'static + PartialOrd + Copy + Send + Sync + fmt::Debug
{
/// Converts a value to u128. /// Converts a value to u128.
/// ///
/// Internally all fast field values are encoded as u64. /// Internally all fast field values are encoded as u64.

View File

@@ -0,0 +1,144 @@
use std::io::{self, Write};
use std::ops::Range;
use common::{BinarySerializable, CountingWriter, VInt};
use ownedbytes::OwnedBytes;
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) enum FastFieldCardinality {
Single = 1,
}
impl BinarySerializable for FastFieldCardinality {
fn serialize<W: Write>(&self, wrt: &mut W) -> io::Result<()> {
self.to_code().serialize(wrt)
}
fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
let code = u8::deserialize(reader)?;
let codec_type: Self = Self::from_code(code)
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Unknown code `{code}.`"))?;
Ok(codec_type)
}
}
impl FastFieldCardinality {
pub(crate) fn to_code(self) -> u8 {
self as u8
}
pub(crate) fn from_code(code: u8) -> Option<Self> {
match code {
1 => Some(Self::Single),
_ => None,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum NullIndexCodec {
Full = 1,
}
impl BinarySerializable for NullIndexCodec {
fn serialize<W: Write>(&self, wrt: &mut W) -> io::Result<()> {
self.to_code().serialize(wrt)
}
fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
let code = u8::deserialize(reader)?;
let codec_type: Self = Self::from_code(code)
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Unknown code `{code}.`"))?;
Ok(codec_type)
}
}
impl NullIndexCodec {
pub(crate) fn to_code(self) -> u8 {
self as u8
}
pub(crate) fn from_code(code: u8) -> Option<Self> {
match code {
1 => Some(Self::Full),
_ => None,
}
}
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub(crate) struct NullIndexFooter {
pub(crate) cardinality: FastFieldCardinality,
pub(crate) null_index_codec: NullIndexCodec,
// Unused for NullIndexCodec::Full
pub(crate) null_index_byte_range: Range<u64>,
}
impl BinarySerializable for NullIndexFooter {
fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
self.cardinality.serialize(writer)?;
self.null_index_codec.serialize(writer)?;
VInt(self.null_index_byte_range.start).serialize(writer)?;
VInt(self.null_index_byte_range.end - self.null_index_byte_range.start)
.serialize(writer)?;
Ok(())
}
fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
let cardinality = FastFieldCardinality::deserialize(reader)?;
let null_index_codec = NullIndexCodec::deserialize(reader)?;
let null_index_byte_range_start = VInt::deserialize(reader)?.0;
let null_index_byte_range_end = VInt::deserialize(reader)?.0 + null_index_byte_range_start;
Ok(Self {
cardinality,
null_index_codec,
null_index_byte_range: null_index_byte_range_start..null_index_byte_range_end,
})
}
}
pub(crate) fn append_null_index_footer(
output: &mut impl io::Write,
null_index_footer: NullIndexFooter,
) -> io::Result<()> {
let mut counting_write = CountingWriter::wrap(output);
null_index_footer.serialize(&mut counting_write)?;
let footer_payload_len = counting_write.written_bytes();
BinarySerializable::serialize(&(footer_payload_len as u16), &mut counting_write)?;
Ok(())
}
pub(crate) fn read_null_index_footer(
data: OwnedBytes,
) -> io::Result<(OwnedBytes, NullIndexFooter)> {
let (data, null_footer_length_bytes) = data.rsplit(2);
let footer_length = u16::deserialize(&mut null_footer_length_bytes.as_slice())?;
let (data, null_index_footer_bytes) = data.rsplit(footer_length as usize);
let null_index_footer = NullIndexFooter::deserialize(&mut null_index_footer_bytes.as_ref())?;
Ok((data, null_index_footer))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn null_index_footer_deser_test() {
let null_index_footer = NullIndexFooter {
cardinality: FastFieldCardinality::Single,
null_index_codec: NullIndexCodec::Full,
null_index_byte_range: 100..120,
};
let mut out = vec![];
null_index_footer.serialize(&mut out).unwrap();
assert_eq!(
null_index_footer,
NullIndexFooter::deserialize(&mut &out[..]).unwrap()
);
}
}
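
A hypothetical, dependency-free sketch of the length-suffixed footer pattern used by `append_null_index_footer` / `read_null_index_footer` above: the footer bytes are written first, then their length as a u16, so a reader can recover the footer from the end of the file without knowing its size in advance. Encoding details here are illustrative.

```rust
fn main() {
    // Placeholder footer bytes standing in for a serialized NullIndexFooter.
    let footer = vec![1u8, 1, 0, 0];
    let mut file = vec![0x42u8; 16]; // placeholder codec payload

    // Write the footer, then its length as a trailing u16.
    file.extend_from_slice(&footer);
    file.extend_from_slice(&(footer.len() as u16).to_le_bytes());

    // Read back: the last two bytes give the footer length, the bytes before
    // them are the footer, and everything before that is the payload.
    let (rest, len_bytes) = file.split_at(file.len() - 2);
    let footer_len = u16::from_le_bytes([len_bytes[0], len_bytes[1]]) as usize;
    let (payload, footer_read) = rest.split_at(rest.len() - footer_len);

    assert_eq!(footer_read, &footer[..]);
    assert_eq!(payload.len(), 16);
}
```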

View File

@@ -17,9 +17,9 @@
// You should have received a copy of the GNU Affero General Public License // You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>. // along with this program. If not, see <http://www.gnu.org/licenses/>.
use std::io;
use std::num::NonZeroU64; use std::num::NonZeroU64;
use std::sync::Arc; use std::sync::Arc;
use std::{fmt, io};
use common::{BinarySerializable, VInt}; use common::{BinarySerializable, VInt};
use log::warn; use log::warn;
@@ -28,11 +28,15 @@ use ownedbytes::OwnedBytes;
use crate::bitpacked::BitpackedCodec; use crate::bitpacked::BitpackedCodec;
use crate::blockwise_linear::BlockwiseLinearCodec; use crate::blockwise_linear::BlockwiseLinearCodec;
use crate::compact_space::CompactSpaceCompressor; use crate::compact_space::CompactSpaceCompressor;
use crate::format_version::append_format_version;
use crate::linear::LinearCodec; use crate::linear::LinearCodec;
use crate::monotonic_mapping::{ use crate::monotonic_mapping::{
StrictlyMonotonicFn, StrictlyMonotonicMappingToInternal, StrictlyMonotonicFn, StrictlyMonotonicMappingToInternal,
StrictlyMonotonicMappingToInternalGCDBaseval, StrictlyMonotonicMappingToInternalGCDBaseval,
}; };
use crate::null_index_footer::{
append_null_index_footer, FastFieldCardinality, NullIndexCodec, NullIndexFooter,
};
use crate::{ use crate::{
monotonic_map_column, Column, FastFieldCodec, FastFieldCodecType, MonotonicallyMappableToU64, monotonic_map_column, Column, FastFieldCodec, FastFieldCodecType, MonotonicallyMappableToU64,
U128FastFieldCodecType, VecColumn, ALL_CODEC_TYPES, U128FastFieldCodecType, VecColumn, ALL_CODEC_TYPES,
@@ -164,7 +168,7 @@ impl BinarySerializable for Header {
/// Return estimated compression for given codec in the value range [0.0..1.0], where 1.0 means no /// Return estimated compression for given codec in the value range [0.0..1.0], where 1.0 means no
/// compression. /// compression.
pub fn estimate<T: MonotonicallyMappableToU64>( pub fn estimate<T: MonotonicallyMappableToU64 + fmt::Debug>(
typed_column: impl Column<T>, typed_column: impl Column<T>,
codec_type: FastFieldCodecType, codec_type: FastFieldCodecType,
) -> Option<f32> { ) -> Option<f32> {
@@ -198,11 +202,19 @@ pub fn serialize_u128<F: Fn() -> I, I: Iterator<Item = u128>>(
let compressor = CompactSpaceCompressor::train_from(iter_gen(), num_vals); let compressor = CompactSpaceCompressor::train_from(iter_gen(), num_vals);
compressor.compress_into(iter_gen(), output).unwrap(); compressor.compress_into(iter_gen(), output).unwrap();
let null_index_footer = NullIndexFooter {
cardinality: FastFieldCardinality::Single,
null_index_codec: NullIndexCodec::Full,
null_index_byte_range: 0..0,
};
append_null_index_footer(output, null_index_footer)?;
append_format_version(output)?;
Ok(()) Ok(())
} }
/// Serializes the column with the codec with the best estimate on the data. /// Serializes the column with the codec with the best estimate on the data.
pub fn serialize<T: MonotonicallyMappableToU64>( pub fn serialize<T: MonotonicallyMappableToU64 + fmt::Debug>(
typed_column: impl Column<T>, typed_column: impl Column<T>,
output: &mut impl io::Write, output: &mut impl io::Write,
codecs: &[FastFieldCodecType], codecs: &[FastFieldCodecType],
@@ -221,6 +233,15 @@ pub fn serialize<T: MonotonicallyMappableToU64>(
let normalized_column = header.normalize_column(column); let normalized_column = header.normalize_column(column);
assert_eq!(normalized_column.min_value(), 0u64); assert_eq!(normalized_column.min_value(), 0u64);
serialize_given_codec(normalized_column, header.codec_type, output)?; serialize_given_codec(normalized_column, header.codec_type, output)?;
let null_index_footer = NullIndexFooter {
cardinality: FastFieldCardinality::Single,
null_index_codec: NullIndexCodec::Full,
null_index_byte_range: 0..0,
};
append_null_index_footer(output, null_index_footer)?;
append_format_version(output)?;
Ok(()) Ok(())
} }
@@ -273,7 +294,7 @@ fn serialize_given_codec(
} }
/// Helper function to serialize a column (autodetect from all codecs) and then open it /// Helper function to serialize a column (autodetect from all codecs) and then open it
pub fn serialize_and_load<T: MonotonicallyMappableToU64 + Ord + Default>( pub fn serialize_and_load<T: MonotonicallyMappableToU64 + Ord + Default + fmt::Debug>(
column: &[T], column: &[T],
) -> Arc<dyn Column<T>> { ) -> Arc<dyn Column<T>> {
let mut buffer = Vec::new(); let mut buffer = Vec::new();
@@ -310,7 +331,7 @@ mod tests {
let col = VecColumn::from(&[false, true][..]); let col = VecColumn::from(&[false, true][..]);
serialize(col, &mut buffer, &ALL_CODEC_TYPES).unwrap(); serialize(col, &mut buffer, &ALL_CODEC_TYPES).unwrap();
// 5 bytes of header, 1 byte of value, 7 bytes of padding. // 5 bytes of header, 1 byte of value, 7 bytes of padding.
assert_eq!(buffer.len(), 5 + 8); assert_eq!(buffer.len(), 3 + 5 + 8 + 4 + 2);
} }
#[test] #[test]
@@ -319,7 +340,7 @@ mod tests {
let col = VecColumn::from(&[true][..]); let col = VecColumn::from(&[true][..]);
serialize(col, &mut buffer, &ALL_CODEC_TYPES).unwrap(); serialize(col, &mut buffer, &ALL_CODEC_TYPES).unwrap();
// 5 bytes of header, 0 bytes of value, 7 bytes of padding. // 5 bytes of header, 0 bytes of value, 7 bytes of padding.
assert_eq!(buffer.len(), 5 + 7); assert_eq!(buffer.len(), 3 + 5 + 7 + 4 + 2);
} }
#[test] #[test]
@@ -329,6 +350,6 @@ mod tests {
let col = VecColumn::from(&vals[..]); let col = VecColumn::from(&vals[..]);
serialize(col, &mut buffer, &[FastFieldCodecType::Bitpacked]).unwrap(); serialize(col, &mut buffer, &[FastFieldCodecType::Bitpacked]).unwrap();
// Values are stored over 3 bits. // Values are stored over 3 bits.
assert_eq!(buffer.len(), 7 + (3 * 80 / 8) + 7); assert_eq!(buffer.len(), 3 + 7 + (3 * 80 / 8) + 7 + 4 + 2);
} }
} }

View File

@@ -1,10 +1,14 @@
[package] [package]
authors = ["Paul Masurel <paul@quickwit.io>", "Pascal Seitz <pascal@quickwit.io>"] authors = ["Paul Masurel <paul@quickwit.io>", "Pascal Seitz <pascal@quickwit.io>"]
name = "ownedbytes" name = "ownedbytes"
version = "0.3.0" version = "0.4.0"
edition = "2021" edition = "2021"
description = "Expose data as static slice" description = "Expose data as static slice"
license = "MIT" license = "MIT"
documentation = "https://docs.rs/ownedbytes/"
homepage = "https://github.com/quickwit-oss/tantivy"
repository = "https://github.com/quickwit-oss/tantivy"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]

View File

@@ -80,6 +80,21 @@ impl OwnedBytes {
(left, right) (left, right)
} }
/// Splits the OwnedBytes into two OwnedBytes `(left, right)`.
///
/// Right will hold `split_len` bytes.
///
/// This operation is cheap and does not require to copy any memory.
/// On the other hand, both `left` and `right` retain a handle over
/// the entire slice of memory. In other words, the memory will only
/// be released when both left and right are dropped.
#[inline]
#[must_use]
pub fn rsplit(self, split_len: usize) -> (OwnedBytes, OwnedBytes) {
let data_len = self.data.len();
self.split(data_len - split_len)
}
/// Splits the right part of the `OwnedBytes` at the given offset. /// Splits the right part of the `OwnedBytes` at the given offset.
/// ///
/// `self` is truncated to `split_len`, left with the remaining bytes. /// `self` is truncated to `split_len`, left with the remaining bytes.
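
A small usage sketch of the new `OwnedBytes::rsplit` helper documented above (the example data is made up); this is the primitive the fastfield readers use to peel the version and footer trailers off the end of a file without copying.

```rust
use ownedbytes::OwnedBytes;

fn main() {
    // Six bytes: a four-byte payload followed by a two-byte trailer.
    let data = OwnedBytes::new(vec![1u8, 2, 3, 4, 0xAB, 0xCD]);

    // `rsplit(2)`: the right half holds the last two bytes, the left half the rest.
    // Both halves keep a handle on the same underlying allocation.
    let (payload, trailer) = data.rsplit(2);
    assert_eq!(payload.as_slice(), &[1, 2, 3, 4]);
    assert_eq!(trailer.as_slice(), &[0xAB, 0xCD]);
}
```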

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "tantivy-query-grammar" name = "tantivy-query-grammar"
version = "0.18.0" version = "0.19.0"
authors = ["Paul Masurel <paul.masurel@gmail.com>"] authors = ["Paul Masurel <paul.masurel@gmail.com>"]
license = "MIT" license = "MIT"
categories = ["database-implementations", "data-structures"] categories = ["database-implementations", "data-structures"]

View File

@@ -11,7 +11,7 @@ use super::bucket::{HistogramAggregation, RangeAggregation, TermsAggregation};
use super::metric::{AverageAggregation, StatsAggregation}; use super::metric::{AverageAggregation, StatsAggregation};
use super::segment_agg_result::BucketCount; use super::segment_agg_result::BucketCount;
use super::VecWithNames; use super::VecWithNames;
use crate::fastfield::{type_and_cardinality, FastType, MultiValuedFastFieldReader}; use crate::fastfield::{type_and_cardinality, MultiValuedFastFieldReader};
use crate::schema::{Cardinality, Type}; use crate::schema::{Cardinality, Type};
use crate::{InvertedIndexReader, SegmentReader, TantivyError}; use crate::{InvertedIndexReader, SegmentReader, TantivyError};
@@ -194,13 +194,7 @@ fn get_ff_reader_and_validate(
.ok_or_else(|| TantivyError::FieldNotFound(field_name.to_string()))?; .ok_or_else(|| TantivyError::FieldNotFound(field_name.to_string()))?;
let field_type = reader.schema().get_field_entry(field).field_type(); let field_type = reader.schema().get_field_entry(field).field_type();
if let Some((ff_type, field_cardinality)) = type_and_cardinality(field_type) { if let Some((_ff_type, field_cardinality)) = type_and_cardinality(field_type) {
if ff_type == FastType::Date {
return Err(TantivyError::InvalidArgument(
"Unsupported field type date in aggregation".to_string(),
));
}
if cardinality != field_cardinality { if cardinality != field_cardinality {
return Err(TantivyError::InvalidArgument(format!( return Err(TantivyError::InvalidArgument(format!(
"Invalid field cardinality on field {} expected {:?}, but got {:?}", "Invalid field cardinality on field {} expected {:?}, but got {:?}",

View File

@@ -12,6 +12,7 @@ use super::bucket::GetDocCount;
use super::intermediate_agg_result::{IntermediateBucketResult, IntermediateMetricResult}; use super::intermediate_agg_result::{IntermediateBucketResult, IntermediateMetricResult};
use super::metric::{SingleMetricResult, Stats}; use super::metric::{SingleMetricResult, Stats};
use super::Key; use super::Key;
use crate::schema::Schema;
use crate::TantivyError; use crate::TantivyError;
#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)] #[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)]
@@ -129,9 +130,12 @@ pub enum BucketResult {
} }
impl BucketResult { impl BucketResult {
pub(crate) fn empty_from_req(req: &BucketAggregationInternal) -> crate::Result<Self> { pub(crate) fn empty_from_req(
req: &BucketAggregationInternal,
schema: &Schema,
) -> crate::Result<Self> {
let empty_bucket = IntermediateBucketResult::empty_from_req(&req.bucket_agg); let empty_bucket = IntermediateBucketResult::empty_from_req(&req.bucket_agg);
empty_bucket.into_final_bucket_result(req) empty_bucket.into_final_bucket_result(req, schema)
} }
} }
@@ -174,6 +178,9 @@ pub enum BucketEntries<T> {
/// ``` /// ```
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct BucketEntry { pub struct BucketEntry {
#[serde(skip_serializing_if = "Option::is_none")]
/// The string representation of the bucket.
pub key_as_string: Option<String>,
/// The identifier of the bucket. /// The identifier of the bucket.
pub key: Key, pub key: Key,
/// Number of documents in the bucket. /// Number of documents in the bucket.
@@ -238,4 +245,10 @@ pub struct RangeBucketEntry {
/// The to range of the bucket. Equals `f64::MAX` when `None`. /// The to range of the bucket. Equals `f64::MAX` when `None`.
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub to: Option<f64>, pub to: Option<f64>,
/// The optional string representation for the `from` range.
#[serde(skip_serializing_if = "Option::is_none")]
pub from_as_string: Option<String>,
/// The optional string representation for the `to` range.
#[serde(skip_serializing_if = "Option::is_none")]
pub to_as_string: Option<String>,
} }

View File

@@ -10,12 +10,12 @@ use crate::aggregation::agg_req_with_accessor::{
AggregationsWithAccessor, BucketAggregationWithAccessor, AggregationsWithAccessor, BucketAggregationWithAccessor,
}; };
use crate::aggregation::agg_result::BucketEntry; use crate::aggregation::agg_result::BucketEntry;
use crate::aggregation::f64_from_fastfield_u64;
use crate::aggregation::intermediate_agg_result::{ use crate::aggregation::intermediate_agg_result::{
IntermediateAggregationResults, IntermediateBucketResult, IntermediateHistogramBucketEntry, IntermediateAggregationResults, IntermediateBucketResult, IntermediateHistogramBucketEntry,
}; };
use crate::aggregation::segment_agg_result::SegmentAggregationResultsCollector; use crate::aggregation::segment_agg_result::SegmentAggregationResultsCollector;
use crate::schema::Type; use crate::aggregation::{f64_from_fastfield_u64, format_date};
use crate::schema::{Schema, Type};
use crate::{DocId, TantivyError}; use crate::{DocId, TantivyError};
/// Histogram is a bucket aggregation, where buckets are created dynamically for given `interval`. /// Histogram is a bucket aggregation, where buckets are created dynamically for given `interval`.
@@ -451,6 +451,7 @@ fn intermediate_buckets_to_final_buckets_fill_gaps(
buckets: Vec<IntermediateHistogramBucketEntry>, buckets: Vec<IntermediateHistogramBucketEntry>,
histogram_req: &HistogramAggregation, histogram_req: &HistogramAggregation,
sub_aggregation: &AggregationsInternal, sub_aggregation: &AggregationsInternal,
schema: &Schema,
) -> crate::Result<Vec<BucketEntry>> { ) -> crate::Result<Vec<BucketEntry>> {
// Generate the full list of buckets without gaps. // Generate the full list of buckets without gaps.
// //
@@ -491,7 +492,9 @@ fn intermediate_buckets_to_final_buckets_fill_gaps(
sub_aggregation: empty_sub_aggregation.clone(), sub_aggregation: empty_sub_aggregation.clone(),
}, },
}) })
.map(|intermediate_bucket| intermediate_bucket.into_final_bucket_entry(sub_aggregation)) .map(|intermediate_bucket| {
intermediate_bucket.into_final_bucket_entry(sub_aggregation, schema)
})
.collect::<crate::Result<Vec<_>>>() .collect::<crate::Result<Vec<_>>>()
} }
@@ -500,20 +503,43 @@ pub(crate) fn intermediate_histogram_buckets_to_final_buckets(
buckets: Vec<IntermediateHistogramBucketEntry>, buckets: Vec<IntermediateHistogramBucketEntry>,
histogram_req: &HistogramAggregation, histogram_req: &HistogramAggregation,
sub_aggregation: &AggregationsInternal, sub_aggregation: &AggregationsInternal,
schema: &Schema,
) -> crate::Result<Vec<BucketEntry>> { ) -> crate::Result<Vec<BucketEntry>> {
if histogram_req.min_doc_count() == 0 { let mut buckets = if histogram_req.min_doc_count() == 0 {
// With min_doc_count != 0, we may need to add buckets, so that there are no // With min_doc_count != 0, we may need to add buckets, so that there are no
// gaps, since intermediate result does not contain empty buckets (filtered to // gaps, since intermediate result does not contain empty buckets (filtered to
// reduce serialization size). // reduce serialization size).
intermediate_buckets_to_final_buckets_fill_gaps(buckets, histogram_req, sub_aggregation) intermediate_buckets_to_final_buckets_fill_gaps(
buckets,
histogram_req,
sub_aggregation,
schema,
)?
} else { } else {
buckets buckets
.into_iter() .into_iter()
.filter(|histogram_bucket| histogram_bucket.doc_count >= histogram_req.min_doc_count()) .filter(|histogram_bucket| histogram_bucket.doc_count >= histogram_req.min_doc_count())
.map(|histogram_bucket| histogram_bucket.into_final_bucket_entry(sub_aggregation)) .map(|histogram_bucket| {
.collect::<crate::Result<Vec<_>>>() histogram_bucket.into_final_bucket_entry(sub_aggregation, schema)
})
.collect::<crate::Result<Vec<_>>>()?
};
// If we have a date type on the histogram buckets, we add the `key_as_string` field as rfc339
let field = schema
.get_field(&histogram_req.field)
.ok_or_else(|| TantivyError::FieldNotFound(histogram_req.field.to_string()))?;
if schema.get_field_entry(field).field_type().is_date() {
for bucket in buckets.iter_mut() {
if let crate::aggregation::Key::F64(val) = bucket.key {
let key_as_string = format_date(val as i64)?;
bucket.key_as_string = Some(key_as_string);
}
}
} }
Ok(buckets)
} }
/// Applies req extended_bounds/hard_bounds on the min_max value /// Applies req extended_bounds/hard_bounds on the min_max value
@@ -1372,6 +1398,63 @@ mod tests {
Ok(()) Ok(())
} }
#[test]
fn histogram_date_test_single_segment() -> crate::Result<()> {
histogram_date_test_with_opt(true)
}
#[test]
fn histogram_date_test_multi_segment() -> crate::Result<()> {
histogram_date_test_with_opt(false)
}
fn histogram_date_test_with_opt(merge_segments: bool) -> crate::Result<()> {
let index = get_test_index_2_segments(merge_segments)?;
let agg_req: Aggregations = vec![(
"histogram".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Histogram(HistogramAggregation {
field: "date".to_string(),
interval: 86400000000.0, // one day in microseconds
..Default::default()
}),
sub_aggregation: Default::default(),
}),
)]
.into_iter()
.collect();
let agg_res = exec_request(agg_req, &index)?;
let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?;
assert_eq!(res["histogram"]["buckets"][0]["key"], 1546300800000000.0);
assert_eq!(
res["histogram"]["buckets"][0]["key_as_string"],
"2019-01-01T00:00:00Z"
);
assert_eq!(res["histogram"]["buckets"][0]["doc_count"], 1);
assert_eq!(res["histogram"]["buckets"][1]["key"], 1546387200000000.0);
assert_eq!(
res["histogram"]["buckets"][1]["key_as_string"],
"2019-01-02T00:00:00Z"
);
assert_eq!(res["histogram"]["buckets"][1]["doc_count"], 5);
assert_eq!(res["histogram"]["buckets"][2]["key"], 1546473600000000.0);
assert_eq!(
res["histogram"]["buckets"][2]["key_as_string"],
"2019-01-03T00:00:00Z"
);
assert_eq!(res["histogram"]["buckets"][3], Value::Null);
Ok(())
}
#[test] #[test]
fn histogram_invalid_request() -> crate::Result<()> { fn histogram_invalid_request() -> crate::Result<()> {
let index = get_test_index_2_segments(true)?; let index = get_test_index_2_segments(true)?;

View File

@@ -1,6 +1,7 @@
use std::fmt::Debug; use std::fmt::Debug;
use std::ops::Range; use std::ops::Range;
use fastfield_codecs::MonotonicallyMappableToU64;
use rustc_hash::FxHashMap; use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@@ -11,7 +12,9 @@ use crate::aggregation::intermediate_agg_result::{
IntermediateBucketResult, IntermediateRangeBucketEntry, IntermediateRangeBucketResult, IntermediateBucketResult, IntermediateRangeBucketEntry, IntermediateRangeBucketResult,
}; };
use crate::aggregation::segment_agg_result::{BucketCount, SegmentAggregationResultsCollector}; use crate::aggregation::segment_agg_result::{BucketCount, SegmentAggregationResultsCollector};
use crate::aggregation::{f64_from_fastfield_u64, f64_to_fastfield_u64, Key, SerializedKey}; use crate::aggregation::{
f64_from_fastfield_u64, f64_to_fastfield_u64, format_date, Key, SerializedKey,
};
use crate::schema::Type; use crate::schema::Type;
use crate::{DocId, TantivyError}; use crate::{DocId, TantivyError};
@@ -181,7 +184,7 @@ impl SegmentRangeCollector {
.into_iter() .into_iter()
.map(move |range_bucket| { .map(move |range_bucket| {
Ok(( Ok((
range_to_string(&range_bucket.range, &field_type), range_to_string(&range_bucket.range, &field_type)?,
range_bucket range_bucket
.bucket .bucket
.into_intermediate_bucket_entry(&agg_with_accessor.sub_aggregation)?, .into_intermediate_bucket_entry(&agg_with_accessor.sub_aggregation)?,
@@ -209,8 +212,8 @@ impl SegmentRangeCollector {
let key = range let key = range
.key .key
.clone() .clone()
.map(Key::Str) .map(|key| Ok(Key::Str(key)))
.unwrap_or_else(|| range_to_key(&range.range, &field_type)); .unwrap_or_else(|| range_to_key(&range.range, &field_type))?;
let to = if range.range.end == u64::MAX { let to = if range.range.end == u64::MAX {
None None
} else { } else {
@@ -228,6 +231,7 @@ impl SegmentRangeCollector {
sub_aggregation, sub_aggregation,
)?) )?)
}; };
Ok(SegmentRangeAndBucketEntry { Ok(SegmentRangeAndBucketEntry {
range: range.range.clone(), range: range.range.clone(),
bucket: SegmentRangeBucketEntry { bucket: SegmentRangeBucketEntry {
@@ -402,34 +406,45 @@ fn extend_validate_ranges(
Ok(converted_buckets) Ok(converted_buckets)
} }
pub(crate) fn range_to_string(range: &Range<u64>, field_type: &Type) -> String { pub(crate) fn range_to_string(range: &Range<u64>, field_type: &Type) -> crate::Result<String> {
// is_start is there for malformed requests, e.g. if the user passes the range u64::MIN..0.0, // is_start is there for malformed requests, e.g. if the user passes the range u64::MIN..0.0,
// it should be rendered as "*-0" and not "*-*" // it should be rendered as "*-0" and not "*-*"
let to_str = |val: u64, is_start: bool| { let to_str = |val: u64, is_start: bool| {
if (is_start && val == u64::MIN) || (!is_start && val == u64::MAX) { if (is_start && val == u64::MIN) || (!is_start && val == u64::MAX) {
"*".to_string() Ok("*".to_string())
} else if *field_type == Type::Date {
let val = i64::from_u64(val);
format_date(val)
} else { } else {
f64_from_fastfield_u64(val, field_type).to_string() Ok(f64_from_fastfield_u64(val, field_type).to_string())
} }
}; };
format!("{}-{}", to_str(range.start, true), to_str(range.end, false)) Ok(format!(
"{}-{}",
to_str(range.start, true)?,
to_str(range.end, false)?
))
} }
pub(crate) fn range_to_key(range: &Range<u64>, field_type: &Type) -> Key { pub(crate) fn range_to_key(range: &Range<u64>, field_type: &Type) -> crate::Result<Key> {
Key::Str(range_to_string(range, field_type)) Ok(Key::Str(range_to_string(range, field_type)?))
} }
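For date fields the rendered keys now carry RFC 3339 endpoints; a crate-internal sketch of the expected output (the range end is first mapped into u64 fast-field space, matching what f64_to_fastfield_u64 produces for a date request, and the expected string mirrors the range_date test further down):

// Sketch (crate-internal): an open-ended date range renders as "*-<rfc3339>".
fn date_range_key_sketch() -> crate::Result<()> {
    use fastfield_codecs::MonotonicallyMappableToU64;
    // 2019-01-01T00:00:00Z in microseconds, mapped into u64 fast-field space.
    let end = 1_546_300_800_000_000i64.to_u64();
    let key = range_to_string(&(u64::MIN..end), &Type::Date)?;
    assert_eq!(key, "*-2019-01-01T00:00:00Z");
    Ok(())
}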
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use fastfield_codecs::MonotonicallyMappableToU64; use fastfield_codecs::MonotonicallyMappableToU64;
use serde_json::Value;
use super::*; use super::*;
use crate::aggregation::agg_req::{ use crate::aggregation::agg_req::{
Aggregation, Aggregations, BucketAggregation, BucketAggregationType, Aggregation, Aggregations, BucketAggregation, BucketAggregationType,
}; };
use crate::aggregation::tests::{exec_request_with_query, get_test_index_with_num_docs}; use crate::aggregation::tests::{
exec_request, exec_request_with_query, get_test_index_2_segments,
get_test_index_with_num_docs,
};
pub fn get_collector_from_ranges( pub fn get_collector_from_ranges(
ranges: Vec<RangeAggregationRange>, ranges: Vec<RangeAggregationRange>,
@@ -567,6 +582,77 @@ mod tests {
Ok(()) Ok(())
} }
#[test]
fn range_date_test_single_segment() -> crate::Result<()> {
range_date_test_with_opt(true)
}
#[test]
fn range_date_test_multi_segment() -> crate::Result<()> {
range_date_test_with_opt(false)
}
fn range_date_test_with_opt(merge_segments: bool) -> crate::Result<()> {
let index = get_test_index_2_segments(merge_segments)?;
let agg_req: Aggregations = vec![(
"date_ranges".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Range(RangeAggregation {
field: "date".to_string(),
ranges: vec![
RangeAggregationRange {
key: None,
from: None,
to: Some(1546300800000000.0f64),
},
RangeAggregationRange {
key: None,
from: Some(1546300800000000.0f64),
to: Some(1546387200000000.0f64),
},
],
keyed: false,
}),
sub_aggregation: Default::default(),
}),
)]
.into_iter()
.collect();
let agg_res = exec_request(agg_req, &index)?;
let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?;
assert_eq!(
res["date_ranges"]["buckets"][0]["from_as_string"],
Value::Null
);
assert_eq!(
res["date_ranges"]["buckets"][0]["key"],
"*-2019-01-01T00:00:00Z"
);
assert_eq!(
res["date_ranges"]["buckets"][1]["from_as_string"],
"2019-01-01T00:00:00Z"
);
assert_eq!(
res["date_ranges"]["buckets"][1]["to_as_string"],
"2019-01-02T00:00:00Z"
);
assert_eq!(
res["date_ranges"]["buckets"][2]["from_as_string"],
"2019-01-02T00:00:00Z"
);
assert_eq!(
res["date_ranges"]["buckets"][2]["to_as_string"],
Value::Null
);
Ok(())
}
#[test] #[test]
fn range_custom_key_keyed_buckets_test() -> crate::Result<()> { fn range_custom_key_keyed_buckets_test() -> crate::Result<()> {
let index = get_test_index_with_num_docs(false, 100)?; let index = get_test_index_with_num_docs(false, 100)?;


@@ -7,6 +7,7 @@ use super::intermediate_agg_result::IntermediateAggregationResults;
use super::segment_agg_result::SegmentAggregationResultsCollector; use super::segment_agg_result::SegmentAggregationResultsCollector;
use crate::aggregation::agg_req_with_accessor::get_aggs_with_accessor_and_validate; use crate::aggregation::agg_req_with_accessor::get_aggs_with_accessor_and_validate;
use crate::collector::{Collector, SegmentCollector}; use crate::collector::{Collector, SegmentCollector};
use crate::schema::Schema;
use crate::{SegmentReader, TantivyError}; use crate::{SegmentReader, TantivyError};
/// The default max bucket count, before the aggregation fails. /// The default max bucket count, before the aggregation fails.
@@ -16,6 +17,7 @@ pub const MAX_BUCKET_COUNT: u32 = 65000;
/// ///
/// The collector collects all aggregations by the underlying aggregation request. /// The collector collects all aggregations by the underlying aggregation request.
pub struct AggregationCollector { pub struct AggregationCollector {
schema: Schema,
agg: Aggregations, agg: Aggregations,
max_bucket_count: u32, max_bucket_count: u32,
} }
@@ -25,8 +27,9 @@ impl AggregationCollector {
/// ///
/// Aggregation fails when the total bucket count is higher than max_bucket_count. /// Aggregation fails when the total bucket count is higher than max_bucket_count.
/// max_bucket_count will default to `MAX_BUCKET_COUNT` (65000) when unset /// max_bucket_count will default to `MAX_BUCKET_COUNT` (65000) when unset
pub fn from_aggs(agg: Aggregations, max_bucket_count: Option<u32>) -> Self { pub fn from_aggs(agg: Aggregations, max_bucket_count: Option<u32>, schema: Schema) -> Self {
Self { Self {
schema,
agg, agg,
max_bucket_count: max_bucket_count.unwrap_or(MAX_BUCKET_COUNT), max_bucket_count: max_bucket_count.unwrap_or(MAX_BUCKET_COUNT),
} }
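Since from_aggs now takes the schema (used to detect date fields when the final result is built), call sites gain a third argument; a hedged sketch of a typical invocation, mirroring the test updates later in this diff:

use tantivy::aggregation::agg_req::Aggregations;
use tantivy::aggregation::agg_result::AggregationResults;
use tantivy::aggregation::AggregationCollector;
use tantivy::query::AllQuery;
use tantivy::Index;

// `agg_req` is any aggregation request; `index.schema()` is the new third argument.
fn run_aggregations(index: &Index, agg_req: Aggregations) -> tantivy::Result<AggregationResults> {
    let collector = AggregationCollector::from_aggs(agg_req, None, index.schema());
    let searcher = index.reader()?.searcher();
    searcher.search(&AllQuery, &collector)
}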
@@ -113,7 +116,7 @@ impl Collector for AggregationCollector {
segment_fruits: Vec<<Self::Child as SegmentCollector>::Fruit>, segment_fruits: Vec<<Self::Child as SegmentCollector>::Fruit>,
) -> crate::Result<Self::Fruit> { ) -> crate::Result<Self::Fruit> {
let res = merge_fruits(segment_fruits)?; let res = merge_fruits(segment_fruits)?;
res.into_final_bucket_result(self.agg.clone()) res.into_final_bucket_result(self.agg.clone(), &self.schema)
} }
} }

src/aggregation/date.rs (new file, 18 lines)

@@ -0,0 +1,18 @@
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
use crate::TantivyError;
pub(crate) fn format_date(val: i64) -> crate::Result<String> {
let datetime =
OffsetDateTime::from_unix_timestamp_nanos(1_000 * (val as i128)).map_err(|err| {
TantivyError::InvalidArgument(format!(
"Could not convert {:?} to OffsetDateTime, err {:?}",
val, err
))
})?;
let key_as_string = datetime
.format(&Rfc3339)
.map_err(|_err| TantivyError::InvalidArgument("Could not serialize date".to_string()))?;
Ok(key_as_string)
}
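A minimal, crate-internal sketch of what the helper is expected to produce: the input is a microsecond timestamp (the date fast-field representation), and the output is an RFC 3339 string, matching the key_as_string values asserted in the aggregation tests.

// Sketch: 2019-01-01T00:00:00Z expressed as microseconds since the Unix epoch.
fn format_date_sketch() -> crate::Result<()> {
    let microseconds: i64 = 1_546_300_800_000_000;
    assert_eq!(format_date(microseconds)?, "2019-01-01T00:00:00Z");
    Ok(())
}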


@@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize};
use super::agg_req::{ use super::agg_req::{
Aggregations, AggregationsInternal, BucketAggregationInternal, BucketAggregationType, Aggregations, AggregationsInternal, BucketAggregationInternal, BucketAggregationType,
MetricAggregation, MetricAggregation, RangeAggregation,
}; };
use super::agg_result::{AggregationResult, BucketResult, RangeBucketEntry}; use super::agg_result::{AggregationResult, BucketResult, RangeBucketEntry};
use super::bucket::{ use super::bucket::{
@@ -19,9 +19,11 @@ use super::bucket::{
}; };
use super::metric::{IntermediateAverage, IntermediateStats}; use super::metric::{IntermediateAverage, IntermediateStats};
use super::segment_agg_result::SegmentMetricResultCollector; use super::segment_agg_result::SegmentMetricResultCollector;
use super::{Key, SerializedKey, VecWithNames}; use super::{format_date, Key, SerializedKey, VecWithNames};
use crate::aggregation::agg_result::{AggregationResults, BucketEntries, BucketEntry}; use crate::aggregation::agg_result::{AggregationResults, BucketEntries, BucketEntry};
use crate::aggregation::bucket::TermsAggregationInternal; use crate::aggregation::bucket::TermsAggregationInternal;
use crate::schema::Schema;
use crate::TantivyError;
/// Contains the intermediate aggregation result, which is optimized to be merged with other /// Contains the intermediate aggregation result, which is optimized to be merged with other
/// intermediate results. /// intermediate results.
@@ -35,8 +37,12 @@ pub struct IntermediateAggregationResults {
impl IntermediateAggregationResults { impl IntermediateAggregationResults {
/// Convert intermediate result and its aggregation request to the final result. /// Convert intermediate result and its aggregation request to the final result.
pub fn into_final_bucket_result(self, req: Aggregations) -> crate::Result<AggregationResults> { pub fn into_final_bucket_result(
self.into_final_bucket_result_internal(&(req.into())) self,
req: Aggregations,
schema: &Schema,
) -> crate::Result<AggregationResults> {
self.into_final_bucket_result_internal(&(req.into()), schema)
} }
/// Convert intermediate result and its aggregation request to the final result. /// Convert intermediate result and its aggregation request to the final result.
@@ -46,6 +52,7 @@ impl IntermediateAggregationResults {
pub(crate) fn into_final_bucket_result_internal( pub(crate) fn into_final_bucket_result_internal(
self, self,
req: &AggregationsInternal, req: &AggregationsInternal,
schema: &Schema,
) -> crate::Result<AggregationResults> { ) -> crate::Result<AggregationResults> {
// Important assumption: // Important assumption:
// When the tree contains buckets/metric, we expect it to have all buckets/metrics from the // When the tree contains buckets/metric, we expect it to have all buckets/metrics from the
@@ -53,11 +60,11 @@ impl IntermediateAggregationResults {
let mut results: FxHashMap<String, AggregationResult> = FxHashMap::default(); let mut results: FxHashMap<String, AggregationResult> = FxHashMap::default();
if let Some(buckets) = self.buckets { if let Some(buckets) = self.buckets {
convert_and_add_final_buckets_to_result(&mut results, buckets, &req.buckets)? convert_and_add_final_buckets_to_result(&mut results, buckets, &req.buckets, schema)?
} else { } else {
// When there are no buckets, we create empty buckets, so that the serialized json // When there are no buckets, we create empty buckets, so that the serialized json
// format is constant // format is constant
add_empty_final_buckets_to_result(&mut results, &req.buckets)? add_empty_final_buckets_to_result(&mut results, &req.buckets, schema)?
}; };
if let Some(metrics) = self.metrics { if let Some(metrics) = self.metrics {
@@ -158,10 +165,12 @@ fn add_empty_final_metrics_to_result(
fn add_empty_final_buckets_to_result( fn add_empty_final_buckets_to_result(
results: &mut FxHashMap<String, AggregationResult>, results: &mut FxHashMap<String, AggregationResult>,
req_buckets: &VecWithNames<BucketAggregationInternal>, req_buckets: &VecWithNames<BucketAggregationInternal>,
schema: &Schema,
) -> crate::Result<()> { ) -> crate::Result<()> {
let requested_buckets = req_buckets.iter(); let requested_buckets = req_buckets.iter();
for (key, req) in requested_buckets { for (key, req) in requested_buckets {
let empty_bucket = AggregationResult::BucketResult(BucketResult::empty_from_req(req)?); let empty_bucket =
AggregationResult::BucketResult(BucketResult::empty_from_req(req, schema)?);
results.insert(key.to_string(), empty_bucket); results.insert(key.to_string(), empty_bucket);
} }
Ok(()) Ok(())
@@ -171,12 +180,13 @@ fn convert_and_add_final_buckets_to_result(
results: &mut FxHashMap<String, AggregationResult>, results: &mut FxHashMap<String, AggregationResult>,
buckets: VecWithNames<IntermediateBucketResult>, buckets: VecWithNames<IntermediateBucketResult>,
req_buckets: &VecWithNames<BucketAggregationInternal>, req_buckets: &VecWithNames<BucketAggregationInternal>,
schema: &Schema,
) -> crate::Result<()> { ) -> crate::Result<()> {
assert_eq!(buckets.len(), req_buckets.len()); assert_eq!(buckets.len(), req_buckets.len());
let buckets_with_request = buckets.into_iter().zip(req_buckets.values()); let buckets_with_request = buckets.into_iter().zip(req_buckets.values());
for ((key, bucket), req) in buckets_with_request { for ((key, bucket), req) in buckets_with_request {
let result = AggregationResult::BucketResult(bucket.into_final_bucket_result(req)?); let result = AggregationResult::BucketResult(bucket.into_final_bucket_result(req, schema)?);
results.insert(key, result); results.insert(key, result);
} }
Ok(()) Ok(())
@@ -266,13 +276,21 @@ impl IntermediateBucketResult {
pub(crate) fn into_final_bucket_result( pub(crate) fn into_final_bucket_result(
self, self,
req: &BucketAggregationInternal, req: &BucketAggregationInternal,
schema: &Schema,
) -> crate::Result<BucketResult> { ) -> crate::Result<BucketResult> {
match self { match self {
IntermediateBucketResult::Range(range_res) => { IntermediateBucketResult::Range(range_res) => {
let mut buckets: Vec<RangeBucketEntry> = range_res let mut buckets: Vec<RangeBucketEntry> = range_res
.buckets .buckets
.into_iter() .into_iter()
.map(|(_, bucket)| bucket.into_final_bucket_entry(&req.sub_aggregation)) .map(|(_, bucket)| {
bucket.into_final_bucket_entry(
&req.sub_aggregation,
schema,
req.as_range()
.expect("unexpected aggregation, expected histogram aggregation"),
)
})
.collect::<crate::Result<Vec<_>>>()?; .collect::<crate::Result<Vec<_>>>()?;
buckets.sort_by(|left, right| { buckets.sort_by(|left, right| {
@@ -303,6 +321,7 @@ impl IntermediateBucketResult {
req.as_histogram() req.as_histogram()
.expect("unexpected aggregation, expected histogram aggregation"), .expect("unexpected aggregation, expected histogram aggregation"),
&req.sub_aggregation, &req.sub_aggregation,
schema,
)?; )?;
let buckets = if req.as_histogram().unwrap().keyed { let buckets = if req.as_histogram().unwrap().keyed {
@@ -321,6 +340,7 @@ impl IntermediateBucketResult {
req.as_term() req.as_term()
.expect("unexpected aggregation, expected term aggregation"), .expect("unexpected aggregation, expected term aggregation"),
&req.sub_aggregation, &req.sub_aggregation,
schema,
), ),
} }
} }
@@ -411,6 +431,7 @@ impl IntermediateTermBucketResult {
self, self,
req: &TermsAggregation, req: &TermsAggregation,
sub_aggregation_req: &AggregationsInternal, sub_aggregation_req: &AggregationsInternal,
schema: &Schema,
) -> crate::Result<BucketResult> { ) -> crate::Result<BucketResult> {
let req = TermsAggregationInternal::from_req(req); let req = TermsAggregationInternal::from_req(req);
let mut buckets: Vec<BucketEntry> = self let mut buckets: Vec<BucketEntry> = self
@@ -419,11 +440,12 @@ impl IntermediateTermBucketResult {
.filter(|bucket| bucket.1.doc_count >= req.min_doc_count) .filter(|bucket| bucket.1.doc_count >= req.min_doc_count)
.map(|(key, entry)| { .map(|(key, entry)| {
Ok(BucketEntry { Ok(BucketEntry {
key_as_string: None,
key: Key::Str(key), key: Key::Str(key),
doc_count: entry.doc_count, doc_count: entry.doc_count,
sub_aggregation: entry sub_aggregation: entry
.sub_aggregation .sub_aggregation
.into_final_bucket_result_internal(sub_aggregation_req)?, .into_final_bucket_result_internal(sub_aggregation_req, schema)?,
}) })
}) })
.collect::<crate::Result<_>>()?; .collect::<crate::Result<_>>()?;
@@ -528,13 +550,15 @@ impl IntermediateHistogramBucketEntry {
pub(crate) fn into_final_bucket_entry( pub(crate) fn into_final_bucket_entry(
self, self,
req: &AggregationsInternal, req: &AggregationsInternal,
schema: &Schema,
) -> crate::Result<BucketEntry> { ) -> crate::Result<BucketEntry> {
Ok(BucketEntry { Ok(BucketEntry {
key_as_string: None,
key: Key::F64(self.key), key: Key::F64(self.key),
doc_count: self.doc_count, doc_count: self.doc_count,
sub_aggregation: self sub_aggregation: self
.sub_aggregation .sub_aggregation
.into_final_bucket_result_internal(req)?, .into_final_bucket_result_internal(req, schema)?,
}) })
} }
} }
@@ -571,16 +595,38 @@ impl IntermediateRangeBucketEntry {
pub(crate) fn into_final_bucket_entry( pub(crate) fn into_final_bucket_entry(
self, self,
req: &AggregationsInternal, req: &AggregationsInternal,
schema: &Schema,
range_req: &RangeAggregation,
) -> crate::Result<RangeBucketEntry> { ) -> crate::Result<RangeBucketEntry> {
Ok(RangeBucketEntry { let mut range_bucket_entry = RangeBucketEntry {
key: self.key, key: self.key,
doc_count: self.doc_count, doc_count: self.doc_count,
sub_aggregation: self sub_aggregation: self
.sub_aggregation .sub_aggregation
.into_final_bucket_result_internal(req)?, .into_final_bucket_result_internal(req, schema)?,
to: self.to, to: self.to,
from: self.from, from: self.from,
}) to_as_string: None,
from_as_string: None,
};
// If the range is on a date field, we add the `to_as_string`/`from_as_string` fields
// formatted as RFC 3339
let field = schema
.get_field(&range_req.field)
.ok_or_else(|| TantivyError::FieldNotFound(range_req.field.to_string()))?;
if schema.get_field_entry(field).field_type().is_date() {
if let Some(val) = range_bucket_entry.to {
let key_as_string = format_date(val as i64)?;
range_bucket_entry.to_as_string = Some(key_as_string);
}
if let Some(val) = range_bucket_entry.from {
let key_as_string = format_date(val as i64)?;
range_bucket_entry.from_as_string = Some(key_as_string);
}
}
Ok(range_bucket_entry)
} }
} }


@@ -222,7 +222,7 @@ mod tests {
.into_iter() .into_iter()
.collect(); .collect();
let collector = AggregationCollector::from_aggs(agg_req_1, None); let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema());
let reader = index.reader()?; let reader = index.reader()?;
let searcher = reader.searcher(); let searcher = reader.searcher();
@@ -300,7 +300,7 @@ mod tests {
.into_iter() .into_iter()
.collect(); .collect();
let collector = AggregationCollector::from_aggs(agg_req_1, None); let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema());
let searcher = reader.searcher(); let searcher = reader.searcher();
let agg_res: AggregationResults = searcher.search(&term_query, &collector).unwrap(); let agg_res: AggregationResults = searcher.search(&term_query, &collector).unwrap();


@@ -12,7 +12,7 @@
//! //!
//! ## Prerequisite //! ## Prerequisite
//! Currently aggregations work only on [fast fields](`crate::fastfield`). Single value fast fields //! Currently aggregations work only on [fast fields](`crate::fastfield`). Single value fast fields
//! of type `u64`, `f64`, `i64` and fast fields on text fields. //! of type `u64`, `f64`, `i64`, `date` and fast fields on text fields.
//! //!
//! ## Usage //! ## Usage
//! To use aggregations, build an aggregation request by constructing //! To use aggregations, build an aggregation request by constructing
@@ -53,9 +53,10 @@
//! use tantivy::query::AllQuery; //! use tantivy::query::AllQuery;
//! use tantivy::aggregation::agg_result::AggregationResults; //! use tantivy::aggregation::agg_result::AggregationResults;
//! use tantivy::IndexReader; //! use tantivy::IndexReader;
//! use tantivy::schema::Schema;
//! //!
//! # #[allow(dead_code)] //! # #[allow(dead_code)]
//! fn aggregate_on_index(reader: &IndexReader) { //! fn aggregate_on_index(reader: &IndexReader, schema: Schema) {
//! let agg_req: Aggregations = vec![ //! let agg_req: Aggregations = vec![
//! ( //! (
//! "average".to_string(), //! "average".to_string(),
@@ -67,7 +68,7 @@
//! .into_iter() //! .into_iter()
//! .collect(); //! .collect();
//! //!
//! let collector = AggregationCollector::from_aggs(agg_req, None); //! let collector = AggregationCollector::from_aggs(agg_req, None, schema);
//! //!
//! let searcher = reader.searcher(); //! let searcher = reader.searcher();
//! let agg_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap(); //! let agg_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap();
@@ -157,6 +158,7 @@ mod agg_req_with_accessor;
pub mod agg_result; pub mod agg_result;
pub mod bucket; pub mod bucket;
mod collector; mod collector;
mod date;
pub mod intermediate_agg_result; pub mod intermediate_agg_result;
pub mod metric; pub mod metric;
mod segment_agg_result; mod segment_agg_result;
@@ -167,6 +169,7 @@ pub use collector::{
AggregationCollector, AggregationSegmentCollector, DistributedAggregationCollector, AggregationCollector, AggregationSegmentCollector, DistributedAggregationCollector,
MAX_BUCKET_COUNT, MAX_BUCKET_COUNT,
}; };
pub(crate) use date::format_date;
use fastfield_codecs::MonotonicallyMappableToU64; use fastfield_codecs::MonotonicallyMappableToU64;
use itertools::Itertools; use itertools::Itertools;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@@ -283,11 +286,11 @@ impl Display for Key {
/// Inverse of `to_fastfield_u64`. Used to convert to `f64` for metrics. /// Inverse of `to_fastfield_u64`. Used to convert to `f64` for metrics.
/// ///
/// # Panics /// # Panics
/// Only `u64`, `f64`, and `i64` are supported. /// Only `u64`, `f64`, `date`, and `i64` are supported.
pub(crate) fn f64_from_fastfield_u64(val: u64, field_type: &Type) -> f64 { pub(crate) fn f64_from_fastfield_u64(val: u64, field_type: &Type) -> f64 {
match field_type { match field_type {
Type::U64 => val as f64, Type::U64 => val as f64,
Type::I64 => i64::from_u64(val) as f64, Type::I64 | Type::Date => i64::from_u64(val) as f64,
Type::F64 => f64::from_u64(val), Type::F64 => f64::from_u64(val),
_ => { _ => {
panic!("unexpected type {:?}. This should not happen", field_type) panic!("unexpected type {:?}. This should not happen", field_type)
@@ -295,10 +298,9 @@ pub(crate) fn f64_from_fastfield_u64(val: u64, field_type: &Type) -> f64 {
} }
} }
/// Converts the `f64` value to fast field value space. /// Converts the `f64` value to fast field value space, which is always u64.
/// ///
/// If the fast field has `u64`, values are stored as `u64` in the fast field. /// If the fast field has `u64`, values are stored unchanged as `u64` in the fast field.
/// A `f64` value of e.g. `2.0` therefore needs to be converted to `1u64`.
/// ///
/// If the fast field has `f64` values are converted and stored to `u64` using a /// If the fast field has `f64` values are converted and stored to `u64` using a
/// monotonic mapping. /// monotonic mapping.
@@ -308,7 +310,7 @@ pub(crate) fn f64_from_fastfield_u64(val: u64, field_type: &Type) -> f64 {
pub(crate) fn f64_to_fastfield_u64(val: f64, field_type: &Type) -> Option<u64> { pub(crate) fn f64_to_fastfield_u64(val: f64, field_type: &Type) -> Option<u64> {
match field_type { match field_type {
Type::U64 => Some(val as u64), Type::U64 => Some(val as u64),
Type::I64 => Some((val as i64).to_u64()), Type::I64 | Type::Date => Some((val as i64).to_u64()),
Type::F64 => Some(val.to_u64()), Type::F64 => Some(val.to_u64()),
_ => None, _ => None,
} }
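Date values piggyback on the i64 mapping from fastfield_codecs, so a timestamp survives the trip through u64 fast-field space unchanged; a small sketch of the round trip these two helpers rely on:

use fastfield_codecs::MonotonicallyMappableToU64;

// Sketch: the i64 <-> u64 mapping is monotonic and lossless, which is why
// Type::Date can share the Type::I64 branch in both directions.
fn date_mapping_roundtrip_sketch() {
    let timestamp_micros: i64 = 1_546_300_800_000_000; // 2019-01-01T00:00:00Z
    let in_fastfield_space: u64 = timestamp_micros.to_u64();
    assert_eq!(i64::from_u64(in_fastfield_space), timestamp_micros);
}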
@@ -317,6 +319,7 @@ pub(crate) fn f64_to_fastfield_u64(val: f64, field_type: &Type) -> Option<u64> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use serde_json::Value; use serde_json::Value;
use time::OffsetDateTime;
use super::agg_req::{Aggregation, Aggregations, BucketAggregation}; use super::agg_req::{Aggregation, Aggregations, BucketAggregation};
use super::bucket::RangeAggregation; use super::bucket::RangeAggregation;
@@ -332,7 +335,7 @@ mod tests {
use crate::aggregation::DistributedAggregationCollector; use crate::aggregation::DistributedAggregationCollector;
use crate::query::{AllQuery, TermQuery}; use crate::query::{AllQuery, TermQuery};
use crate::schema::{Cardinality, IndexRecordOption, Schema, TextFieldIndexing, FAST, STRING}; use crate::schema::{Cardinality, IndexRecordOption, Schema, TextFieldIndexing, FAST, STRING};
use crate::{Index, Term}; use crate::{DateTime, Index, Term};
fn get_avg_req(field_name: &str) -> Aggregation { fn get_avg_req(field_name: &str) -> Aggregation {
Aggregation::Metric(MetricAggregation::Average( Aggregation::Metric(MetricAggregation::Average(
@@ -358,7 +361,7 @@ mod tests {
index: &Index, index: &Index,
query: Option<(&str, &str)>, query: Option<(&str, &str)>,
) -> crate::Result<Value> { ) -> crate::Result<Value> {
let collector = AggregationCollector::from_aggs(agg_req, None); let collector = AggregationCollector::from_aggs(agg_req, None, index.schema());
let reader = index.reader()?; let reader = index.reader()?;
let searcher = reader.searcher(); let searcher = reader.searcher();
@@ -552,10 +555,10 @@ mod tests {
let searcher = reader.searcher(); let searcher = reader.searcher();
let intermediate_agg_result = searcher.search(&AllQuery, &collector).unwrap(); let intermediate_agg_result = searcher.search(&AllQuery, &collector).unwrap();
intermediate_agg_result intermediate_agg_result
.into_final_bucket_result(agg_req) .into_final_bucket_result(agg_req, &index.schema())
.unwrap() .unwrap()
} else { } else {
let collector = AggregationCollector::from_aggs(agg_req, None); let collector = AggregationCollector::from_aggs(agg_req, None, index.schema());
let searcher = reader.searcher(); let searcher = reader.searcher();
searcher.search(&AllQuery, &collector).unwrap() searcher.search(&AllQuery, &collector).unwrap()
@@ -648,6 +651,7 @@ mod tests {
.set_fast() .set_fast()
.set_stored(); .set_stored();
let text_field = schema_builder.add_text_field("text", text_fieldtype); let text_field = schema_builder.add_text_field("text", text_fieldtype);
let date_field = schema_builder.add_date_field("date", FAST);
schema_builder.add_text_field("dummy_text", STRING); schema_builder.add_text_field("dummy_text", STRING);
let score_fieldtype = let score_fieldtype =
crate::schema::NumericOptions::default().set_fast(Cardinality::SingleValue); crate::schema::NumericOptions::default().set_fast(Cardinality::SingleValue);
@@ -665,6 +669,7 @@ mod tests {
// writing the segment // writing the segment
index_writer.add_document(doc!( index_writer.add_document(doc!(
text_field => "cool", text_field => "cool",
date_field => DateTime::from_utc(OffsetDateTime::from_unix_timestamp(1_546_300_800).unwrap()),
score_field => 1u64, score_field => 1u64,
score_field_f64 => 1f64, score_field_f64 => 1f64,
score_field_i64 => 1i64, score_field_i64 => 1i64,
@@ -673,6 +678,7 @@ mod tests {
))?; ))?;
index_writer.add_document(doc!( index_writer.add_document(doc!(
text_field => "cool", text_field => "cool",
date_field => DateTime::from_utc(OffsetDateTime::from_unix_timestamp(1_546_300_800 + 86400).unwrap()),
score_field => 3u64, score_field => 3u64,
score_field_f64 => 3f64, score_field_f64 => 3f64,
score_field_i64 => 3i64, score_field_i64 => 3i64,
@@ -681,18 +687,21 @@ mod tests {
))?; ))?;
index_writer.add_document(doc!( index_writer.add_document(doc!(
text_field => "cool", text_field => "cool",
date_field => DateTime::from_utc(OffsetDateTime::from_unix_timestamp(1_546_300_800 + 86400).unwrap()),
score_field => 5u64, score_field => 5u64,
score_field_f64 => 5f64, score_field_f64 => 5f64,
score_field_i64 => 5i64, score_field_i64 => 5i64,
))?; ))?;
index_writer.add_document(doc!( index_writer.add_document(doc!(
text_field => "nohit", text_field => "nohit",
date_field => DateTime::from_utc(OffsetDateTime::from_unix_timestamp(1_546_300_800 + 86400).unwrap()),
score_field => 6u64, score_field => 6u64,
score_field_f64 => 6f64, score_field_f64 => 6f64,
score_field_i64 => 6i64, score_field_i64 => 6i64,
))?; ))?;
index_writer.add_document(doc!( index_writer.add_document(doc!(
text_field => "cool", text_field => "cool",
date_field => DateTime::from_utc(OffsetDateTime::from_unix_timestamp(1_546_300_800 + 86400).unwrap()),
score_field => 7u64, score_field => 7u64,
score_field_f64 => 7f64, score_field_f64 => 7f64,
score_field_i64 => 7i64, score_field_i64 => 7i64,
@@ -700,12 +709,14 @@ mod tests {
index_writer.commit()?; index_writer.commit()?;
index_writer.add_document(doc!( index_writer.add_document(doc!(
text_field => "cool", text_field => "cool",
date_field => DateTime::from_utc(OffsetDateTime::from_unix_timestamp(1_546_300_800 + 86400).unwrap()),
score_field => 11u64, score_field => 11u64,
score_field_f64 => 11f64, score_field_f64 => 11f64,
score_field_i64 => 11i64, score_field_i64 => 11i64,
))?; ))?;
index_writer.add_document(doc!( index_writer.add_document(doc!(
text_field => "cool", text_field => "cool",
date_field => DateTime::from_utc(OffsetDateTime::from_unix_timestamp(1_546_300_800 + 86400 + 86400).unwrap()),
score_field => 14u64, score_field => 14u64,
score_field_f64 => 14f64, score_field_f64 => 14f64,
score_field_i64 => 14i64, score_field_i64 => 14i64,
@@ -713,6 +724,7 @@ mod tests {
index_writer.add_document(doc!( index_writer.add_document(doc!(
text_field => "cool", text_field => "cool",
date_field => DateTime::from_utc(OffsetDateTime::from_unix_timestamp(1_546_300_800 + 86400 + 86400).unwrap()),
score_field => 44u64, score_field => 44u64,
score_field_f64 => 44.5f64, score_field_f64 => 44.5f64,
score_field_i64 => 44i64, score_field_i64 => 44i64,
@@ -723,6 +735,7 @@ mod tests {
// no hits segment // no hits segment
index_writer.add_document(doc!( index_writer.add_document(doc!(
text_field => "nohit", text_field => "nohit",
date_field => DateTime::from_utc(OffsetDateTime::from_unix_timestamp(1_546_300_800 + 86400 + 86400).unwrap()),
score_field => 44u64, score_field => 44u64,
score_field_f64 => 44.5f64, score_field_f64 => 44.5f64,
score_field_i64 => 44i64, score_field_i64 => 44i64,
@@ -795,7 +808,7 @@ mod tests {
.into_iter() .into_iter()
.collect(); .collect();
let collector = AggregationCollector::from_aggs(agg_req_1, None); let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema());
let searcher = reader.searcher(); let searcher = reader.searcher();
let agg_res: AggregationResults = searcher.search(&term_query, &collector).unwrap(); let agg_res: AggregationResults = searcher.search(&term_query, &collector).unwrap();
@@ -995,9 +1008,10 @@ mod tests {
// Test de/serialization roundtrip on intermediate_agg_result // Test de/serialization roundtrip on intermediate_agg_result
let res: IntermediateAggregationResults = let res: IntermediateAggregationResults =
serde_json::from_str(&serde_json::to_string(&res).unwrap()).unwrap(); serde_json::from_str(&serde_json::to_string(&res).unwrap()).unwrap();
res.into_final_bucket_result(agg_req.clone()).unwrap() res.into_final_bucket_result(agg_req.clone(), &index.schema())
.unwrap()
} else { } else {
let collector = AggregationCollector::from_aggs(agg_req.clone(), None); let collector = AggregationCollector::from_aggs(agg_req.clone(), None, index.schema());
let searcher = reader.searcher(); let searcher = reader.searcher();
searcher.search(&term_query, &collector).unwrap() searcher.search(&term_query, &collector).unwrap()
@@ -1055,7 +1069,7 @@ mod tests {
); );
// Test empty result set // Test empty result set
let collector = AggregationCollector::from_aggs(agg_req, None); let collector = AggregationCollector::from_aggs(agg_req, None, index.schema());
let searcher = reader.searcher(); let searcher = reader.searcher();
searcher.search(&query_with_no_hits, &collector).unwrap(); searcher.search(&query_with_no_hits, &collector).unwrap();
@@ -1120,7 +1134,7 @@ mod tests {
.into_iter() .into_iter()
.collect(); .collect();
let collector = AggregationCollector::from_aggs(agg_req_1, None); let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema());
let searcher = reader.searcher(); let searcher = reader.searcher();
@@ -1233,7 +1247,7 @@ mod tests {
.into_iter() .into_iter()
.collect(); .collect();
let collector = AggregationCollector::from_aggs(agg_req_1, None); let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema());
let searcher = reader.searcher(); let searcher = reader.searcher();
let agg_res: AggregationResults = let agg_res: AggregationResults =
@@ -1264,7 +1278,7 @@ mod tests {
.into_iter() .into_iter()
.collect(); .collect();
let collector = AggregationCollector::from_aggs(agg_req_1, None); let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema());
let searcher = reader.searcher(); let searcher = reader.searcher();
let agg_res: AggregationResults = let agg_res: AggregationResults =
@@ -1295,7 +1309,7 @@ mod tests {
.into_iter() .into_iter()
.collect(); .collect();
let collector = AggregationCollector::from_aggs(agg_req_1, None); let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema());
let searcher = reader.searcher(); let searcher = reader.searcher();
let agg_res: AggregationResults = let agg_res: AggregationResults =
@@ -1334,7 +1348,7 @@ mod tests {
.into_iter() .into_iter()
.collect(); .collect();
let collector = AggregationCollector::from_aggs(agg_req_1, None); let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema());
let searcher = reader.searcher(); let searcher = reader.searcher();
let agg_res: AggregationResults = let agg_res: AggregationResults =
@@ -1363,7 +1377,7 @@ mod tests {
.into_iter() .into_iter()
.collect(); .collect();
let collector = AggregationCollector::from_aggs(agg_req, None); let collector = AggregationCollector::from_aggs(agg_req, None, index.schema());
let searcher = reader.searcher(); let searcher = reader.searcher();
let agg_res: AggregationResults = let agg_res: AggregationResults =
@@ -1392,7 +1406,7 @@ mod tests {
.into_iter() .into_iter()
.collect(); .collect();
let collector = AggregationCollector::from_aggs(agg_req, None); let collector = AggregationCollector::from_aggs(agg_req, None, index.schema());
let searcher = reader.searcher(); let searcher = reader.searcher();
let agg_res: AggregationResults = let agg_res: AggregationResults =
@@ -1429,7 +1443,7 @@ mod tests {
.into_iter() .into_iter()
.collect(); .collect();
let collector = AggregationCollector::from_aggs(agg_req_1, None); let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema());
let searcher = reader.searcher(); let searcher = reader.searcher();
let agg_res: AggregationResults = let agg_res: AggregationResults =
@@ -1464,7 +1478,7 @@ mod tests {
.into_iter() .into_iter()
.collect(); .collect();
let collector = AggregationCollector::from_aggs(agg_req_1, None); let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema());
let searcher = reader.searcher(); let searcher = reader.searcher();
let agg_res: AggregationResults = let agg_res: AggregationResults =
@@ -1503,7 +1517,7 @@ mod tests {
.into_iter() .into_iter()
.collect(); .collect();
let collector = AggregationCollector::from_aggs(agg_req_1, None); let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema());
let searcher = reader.searcher(); let searcher = reader.searcher();
let agg_res: AggregationResults = let agg_res: AggregationResults =
@@ -1533,7 +1547,7 @@ mod tests {
.into_iter() .into_iter()
.collect(); .collect();
let collector = AggregationCollector::from_aggs(agg_req_1, None); let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema());
let searcher = reader.searcher(); let searcher = reader.searcher();
let agg_res: AggregationResults = let agg_res: AggregationResults =
@@ -1590,7 +1604,7 @@ mod tests {
.into_iter() .into_iter()
.collect(); .collect();
let collector = AggregationCollector::from_aggs(agg_req_1, None); let collector = AggregationCollector::from_aggs(agg_req_1, None, index.schema());
let searcher = reader.searcher(); let searcher = reader.searcher();
let agg_res: AggregationResults = let agg_res: AggregationResults =


@@ -149,7 +149,8 @@ impl IndexBuilder {
/// Creates a new index using the [`RamDirectory`]. /// Creates a new index using the [`RamDirectory`].
/// ///
/// The index will be allocated in anonymous memory. /// The index will be allocated in anonymous memory.
/// This should only be used for unit tests. /// This is useful for indexing a small set of documents,
/// for instance in unit tests or for a temporary in-memory index.
pub fn create_in_ram(self) -> Result<Index, TantivyError> { pub fn create_in_ram(self) -> Result<Index, TantivyError> {
let ram_directory = RamDirectory::create(); let ram_directory = RamDirectory::create();
self.create(ram_directory) self.create(ram_directory)
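A hedged sketch of the use case the reworded doc comment describes: a small, throwaway index that never touches disk (the field name and the writer heap size are illustrative):

use tantivy::schema::{Schema, TEXT};
use tantivy::{doc, Index, TantivyError};

fn tiny_in_memory_index() -> Result<(), TantivyError> {
    let mut schema_builder = Schema::builder();
    let title = schema_builder.add_text_field("title", TEXT);
    // Nothing is persisted; the whole index lives in anonymous memory.
    let index = Index::create_in_ram(schema_builder.build());
    let mut writer = index.writer(50_000_000)?;
    writer.add_document(doc!(title => "hello world"))?;
    writer.commit()?;
    Ok(())
}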


@@ -30,8 +30,8 @@ pub use self::multivalued::{
MultiValueIndex, MultiValueU128FastFieldWriter, MultiValuedFastFieldReader, MultiValueIndex, MultiValueU128FastFieldWriter, MultiValuedFastFieldReader,
MultiValuedFastFieldWriter, MultiValuedU128FastFieldReader, MultiValuedFastFieldWriter, MultiValuedU128FastFieldReader,
}; };
pub(crate) use self::readers::type_and_cardinality;
pub use self::readers::FastFieldReaders; pub use self::readers::FastFieldReaders;
pub(crate) use self::readers::{type_and_cardinality, FastType};
pub use self::serializer::{Column, CompositeFastFieldSerializer}; pub use self::serializer::{Column, CompositeFastFieldSerializer};
use self::writer::unexpected_value; use self::writer::unexpected_value;
pub use self::writer::{FastFieldsWriter, IntFastFieldWriter}; pub use self::writer::{FastFieldsWriter, IntFastFieldWriter};
@@ -145,7 +145,7 @@ impl FastFieldType {
mod tests { mod tests {
use std::collections::HashMap; use std::collections::HashMap;
use std::ops::Range; use std::ops::{Range, RangeInclusive};
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
@@ -159,7 +159,9 @@ mod tests {
use super::*; use super::*;
use crate::directory::{CompositeFile, Directory, RamDirectory, WritePtr}; use crate::directory::{CompositeFile, Directory, RamDirectory, WritePtr};
use crate::merge_policy::NoMergePolicy; use crate::merge_policy::NoMergePolicy;
use crate::schema::{Cardinality, Document, Field, Schema, SchemaBuilder, FAST, STRING, TEXT}; use crate::schema::{
Cardinality, Document, Field, Schema, SchemaBuilder, FAST, INDEXED, STRING, TEXT,
};
use crate::time::OffsetDateTime; use crate::time::OffsetDateTime;
use crate::{DateOptions, DatePrecision, Index, SegmentId, SegmentReader}; use crate::{DateOptions, DatePrecision, Index, SegmentId, SegmentReader};
@@ -207,7 +209,7 @@ mod tests {
serializer.close().unwrap(); serializer.close().unwrap();
} }
let file = directory.open_read(path).unwrap(); let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 25); assert_eq!(file.len(), 34);
let composite_file = CompositeFile::open(&file)?; let composite_file = CompositeFile::open(&file)?;
let fast_field_bytes = composite_file.open_read(*FIELD).unwrap().read_bytes()?; let fast_field_bytes = composite_file.open_read(*FIELD).unwrap().read_bytes()?;
let fast_field_reader = open::<u64>(fast_field_bytes)?; let fast_field_reader = open::<u64>(fast_field_bytes)?;
@@ -256,7 +258,7 @@ mod tests {
serializer.close()?; serializer.close()?;
} }
let file = directory.open_read(path)?; let file = directory.open_read(path)?;
assert_eq!(file.len(), 53); assert_eq!(file.len(), 62);
{ {
let fast_fields_composite = CompositeFile::open(&file)?; let fast_fields_composite = CompositeFile::open(&file)?;
let data = fast_fields_composite let data = fast_fields_composite
@@ -297,7 +299,7 @@ mod tests {
serializer.close().unwrap(); serializer.close().unwrap();
} }
let file = directory.open_read(path).unwrap(); let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 26); assert_eq!(file.len(), 35);
{ {
let fast_fields_composite = CompositeFile::open(&file).unwrap(); let fast_fields_composite = CompositeFile::open(&file).unwrap();
let data = fast_fields_composite let data = fast_fields_composite
@@ -336,7 +338,7 @@ mod tests {
serializer.close().unwrap(); serializer.close().unwrap();
} }
let file = directory.open_read(path).unwrap(); let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 80040); assert_eq!(file.len(), 80049);
{ {
let fast_fields_composite = CompositeFile::open(&file)?; let fast_fields_composite = CompositeFile::open(&file)?;
let data = fast_fields_composite let data = fast_fields_composite
@@ -378,7 +380,7 @@ mod tests {
serializer.close().unwrap(); serializer.close().unwrap();
} }
let file = directory.open_read(path).unwrap(); let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 40_usize); assert_eq!(file.len(), 49_usize);
{ {
let fast_fields_composite = CompositeFile::open(&file)?; let fast_fields_composite = CompositeFile::open(&file)?;
@@ -822,7 +824,7 @@ mod tests {
serializer.close().unwrap(); serializer.close().unwrap();
} }
let file = directory.open_read(path).unwrap(); let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 24); assert_eq!(file.len(), 33);
let composite_file = CompositeFile::open(&file)?; let composite_file = CompositeFile::open(&file)?;
let data = composite_file.open_read(field).unwrap().read_bytes()?; let data = composite_file.open_read(field).unwrap().read_bytes()?;
let fast_field_reader = open::<bool>(data)?; let fast_field_reader = open::<bool>(data)?;
@@ -860,7 +862,7 @@ mod tests {
serializer.close().unwrap(); serializer.close().unwrap();
} }
let file = directory.open_read(path).unwrap(); let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 36); assert_eq!(file.len(), 45);
let composite_file = CompositeFile::open(&file)?; let composite_file = CompositeFile::open(&file)?;
let data = composite_file.open_read(field).unwrap().read_bytes()?; let data = composite_file.open_read(field).unwrap().read_bytes()?;
let fast_field_reader = open::<bool>(data)?; let fast_field_reader = open::<bool>(data)?;
@@ -892,7 +894,7 @@ mod tests {
} }
let file = directory.open_read(path).unwrap(); let file = directory.open_read(path).unwrap();
let composite_file = CompositeFile::open(&file)?; let composite_file = CompositeFile::open(&file)?;
assert_eq!(file.len(), 23); assert_eq!(file.len(), 32);
let data = composite_file.open_read(field).unwrap().read_bytes()?; let data = composite_file.open_read(field).unwrap().read_bytes()?;
let fast_field_reader = open::<bool>(data)?; let fast_field_reader = open::<bool>(data)?;
assert_eq!(fast_field_reader.get_val(0), false); assert_eq!(fast_field_reader.get_val(0), false);
@@ -926,10 +928,10 @@ mod tests {
pub fn test_gcd_date() -> crate::Result<()> { pub fn test_gcd_date() -> crate::Result<()> {
let size_prec_sec = let size_prec_sec =
test_gcd_date_with_codec(FastFieldCodecType::Bitpacked, DatePrecision::Seconds)?; test_gcd_date_with_codec(FastFieldCodecType::Bitpacked, DatePrecision::Seconds)?;
assert_eq!(size_prec_sec, 28 + (1_000 * 13) / 8); // 13 bits per val = ceil(log2(number of seconds in 2 hours)) assert_eq!(size_prec_sec, 5 + 4 + 28 + (1_000 * 13) / 8); // 13 bits per val = ceil(log2(number of seconds in 2 hours))
let size_prec_micro = let size_prec_micro =
test_gcd_date_with_codec(FastFieldCodecType::Bitpacked, DatePrecision::Microseconds)?; test_gcd_date_with_codec(FastFieldCodecType::Bitpacked, DatePrecision::Microseconds)?;
assert_eq!(size_prec_micro, 26 + (1_000 * 33) / 8); // 33 bits per val = ceil(log2(number of microseconds in 2 hours)) assert_eq!(size_prec_micro, 5 + 4 + 26 + (1_000 * 33) / 8); // 33 bits per val = ceil(log2(number of microseconds in 2 hours))
Ok(()) Ok(())
} }
@@ -969,4 +971,117 @@ mod tests {
} }
Ok(len) Ok(len)
} }
#[test]
fn test_gcd_bug_regression_1757() {
let mut schema_builder = Schema::builder();
let num_field = schema_builder.add_u64_field("url_norm_hash", FAST | INDEXED);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
{
let mut writer = index.writer_for_tests().unwrap();
writer
.add_document(doc! {
num_field => 100u64,
})
.unwrap();
writer
.add_document(doc! {
num_field => 200u64,
})
.unwrap();
writer
.add_document(doc! {
num_field => 300u64,
})
.unwrap();
writer.commit().unwrap();
}
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let segment = &searcher.segment_readers()[0];
let field = segment.fast_fields().u64(num_field).unwrap();
let numbers = vec![100, 200, 300];
let test_range = |range: RangeInclusive<u64>| {
let expexted_count = numbers.iter().filter(|num| range.contains(num)).count();
let mut vec = vec![];
field.get_docids_for_value_range(range, 0..u32::MAX, &mut vec);
assert_eq!(vec.len(), expexted_count);
};
test_range(50..=50);
test_range(150..=150);
test_range(350..=350);
test_range(100..=250);
test_range(101..=200);
test_range(101..=199);
test_range(100..=300);
test_range(100..=299);
}
#[test]
fn test_mapping_bug_docids_for_value_range() {
let mut schema_builder = Schema::builder();
let num_field = schema_builder.add_u64_field("url_norm_hash", FAST | INDEXED);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
{
// Values without gcd, but with min_value
let mut writer = index.writer_for_tests().unwrap();
writer
.add_document(doc! {
num_field => 1000u64,
})
.unwrap();
writer
.add_document(doc! {
num_field => 1001u64,
})
.unwrap();
writer
.add_document(doc! {
num_field => 1003u64,
})
.unwrap();
writer.commit().unwrap();
}
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let segment = &searcher.segment_readers()[0];
let field = segment.fast_fields().u64(num_field).unwrap();
let numbers = vec![1000, 1001, 1003];
let test_range = |range: RangeInclusive<u64>| {
let expexted_count = numbers.iter().filter(|num| range.contains(num)).count();
let mut vec = vec![];
field.get_docids_for_value_range(range, 0..u32::MAX, &mut vec);
assert_eq!(vec.len(), expexted_count);
};
let test_range_variant = |start, stop| {
let start_range = start..=stop;
test_range(start_range);
let start_range = start..=(stop - 1);
test_range(start_range);
let start_range = start..=(stop + 1);
test_range(start_range);
let start_range = (start - 1)..=stop;
test_range(start_range);
let start_range = (start - 1)..=(stop - 1);
test_range(start_range);
let start_range = (start - 1)..=(stop + 1);
test_range(start_range);
let start_range = (start + 1)..=stop;
test_range(start_range);
let start_range = (start + 1)..=(stop - 1);
test_range(start_range);
let start_range = (start + 1)..=(stop + 1);
test_range(start_range);
};
test_range_variant(50, 50);
test_range_variant(1000, 1000);
test_range_variant(1000, 1002);
}
} }


@@ -1,3 +1,4 @@
use std::fmt;
use std::io::{self, Write}; use std::io::{self, Write};
pub use fastfield_codecs::Column; pub use fastfield_codecs::Column;
@@ -49,7 +50,7 @@ impl CompositeFastFieldSerializer {
/// Serialize data into a new u64 fast field. The best compression codec will be chosen /// Serialize data into a new u64 fast field. The best compression codec will be chosen
/// automatically. /// automatically.
pub fn create_auto_detect_u64_fast_field<T: MonotonicallyMappableToU64>( pub fn create_auto_detect_u64_fast_field<T: MonotonicallyMappableToU64 + fmt::Debug>(
&mut self, &mut self,
field: Field, field: Field,
fastfield_accessor: impl Column<T>, fastfield_accessor: impl Column<T>,
@@ -59,7 +60,9 @@ impl CompositeFastFieldSerializer {
/// Serialize data into a new u64 fast field. The best compression codec will be chosen /// Serialize data into a new u64 fast field. The best compression codec will be chosen
/// automatically. /// automatically.
pub fn create_auto_detect_u64_fast_field_with_idx<T: MonotonicallyMappableToU64>( pub fn create_auto_detect_u64_fast_field_with_idx<
T: MonotonicallyMappableToU64 + fmt::Debug,
>(
&mut self, &mut self,
field: Field, field: Field,
fastfield_accessor: impl Column<T>, fastfield_accessor: impl Column<T>,
@@ -72,7 +75,9 @@ impl CompositeFastFieldSerializer {
/// Serialize data into a new u64 fast field. The best compression codec of the provided /// Serialize data into a new u64 fast field. The best compression codec of the provided
/// will be chosen. /// will be chosen.
pub fn create_auto_detect_u64_fast_field_with_idx_and_codecs<T: MonotonicallyMappableToU64>( pub fn create_auto_detect_u64_fast_field_with_idx_and_codecs<
T: MonotonicallyMappableToU64 + fmt::Debug,
>(
&mut self, &mut self,
field: Field, field: Field,
fastfield_accessor: impl Column<T>, fastfield_accessor: impl Column<T>,


@@ -67,11 +67,12 @@ pub(crate) fn index_json_values<'a>(
doc: DocId, doc: DocId,
json_values: impl Iterator<Item = crate::Result<&'a serde_json::Map<String, serde_json::Value>>>, json_values: impl Iterator<Item = crate::Result<&'a serde_json::Map<String, serde_json::Value>>>,
text_analyzer: &TextAnalyzer, text_analyzer: &TextAnalyzer,
expand_dots_enabled: bool,
term_buffer: &mut Term, term_buffer: &mut Term,
postings_writer: &mut dyn PostingsWriter, postings_writer: &mut dyn PostingsWriter,
ctx: &mut IndexingContext, ctx: &mut IndexingContext,
) -> crate::Result<()> { ) -> crate::Result<()> {
let mut json_term_writer = JsonTermWriter::wrap(term_buffer); let mut json_term_writer = JsonTermWriter::wrap(term_buffer, expand_dots_enabled);
let mut positions_per_path: IndexingPositionsPerPath = Default::default(); let mut positions_per_path: IndexingPositionsPerPath = Default::default();
for json_value_res in json_values { for json_value_res in json_values {
let json_value = json_value_res?; let json_value = json_value_res?;
@@ -259,6 +260,7 @@ pub(crate) fn set_string_and_get_terms(
pub struct JsonTermWriter<'a> { pub struct JsonTermWriter<'a> {
term_buffer: &'a mut Term, term_buffer: &'a mut Term,
path_stack: Vec<usize>, path_stack: Vec<usize>,
expand_dots_enabled: bool,
} }
/// Splits a json path supplied to the query parser in such a way that /// Splits a json path supplied to the query parser in such a way that
@@ -298,23 +300,25 @@ impl<'a> JsonTermWriter<'a> {
pub fn from_field_and_json_path( pub fn from_field_and_json_path(
field: Field, field: Field,
json_path: &str, json_path: &str,
expand_dots_enabled: bool,
term_buffer: &'a mut Term, term_buffer: &'a mut Term,
) -> Self { ) -> Self {
term_buffer.set_field_and_type(field, Type::Json); term_buffer.set_field_and_type(field, Type::Json);
let mut json_term_writer = Self::wrap(term_buffer); let mut json_term_writer = Self::wrap(term_buffer, expand_dots_enabled);
for segment in split_json_path(json_path) { for segment in split_json_path(json_path) {
json_term_writer.push_path_segment(&segment); json_term_writer.push_path_segment(&segment);
} }
json_term_writer json_term_writer
} }
pub fn wrap(term_buffer: &'a mut Term) -> Self { pub fn wrap(term_buffer: &'a mut Term, expand_dots_enabled: bool) -> Self {
term_buffer.clear_with_type(Type::Json); term_buffer.clear_with_type(Type::Json);
let mut path_stack = Vec::with_capacity(10); let mut path_stack = Vec::with_capacity(10);
path_stack.push(0); path_stack.push(0);
Self { Self {
term_buffer, term_buffer,
path_stack, path_stack,
expand_dots_enabled,
} }
} }
@@ -336,11 +340,24 @@ impl<'a> JsonTermWriter<'a> {
self.trim_to_end_of_path(); self.trim_to_end_of_path();
let buffer = self.term_buffer.value_bytes_mut(); let buffer = self.term_buffer.value_bytes_mut();
let buffer_len = buffer.len(); let buffer_len = buffer.len();
if self.path_stack.len() > 1 { if self.path_stack.len() > 1 {
buffer[buffer_len - 1] = JSON_PATH_SEGMENT_SEP; buffer[buffer_len - 1] = JSON_PATH_SEGMENT_SEP;
} }
self.term_buffer.append_bytes(segment.as_bytes()); if self.expand_dots_enabled && segment.as_bytes().contains(&b'.') {
self.term_buffer.append_bytes(&[JSON_PATH_SEGMENT_SEP]); // We need to replace `.` by JSON_PATH_SEGMENT_SEP.
self.term_buffer
.append_bytes(segment.as_bytes())
.iter_mut()
.for_each(|byte| {
if *byte == b'.' {
*byte = JSON_PATH_SEGMENT_SEP;
}
});
} else {
self.term_buffer.append_bytes(segment.as_bytes());
}
self.term_buffer.push_byte(JSON_PATH_SEGMENT_SEP);
self.path_stack.push(self.term_buffer.len_bytes()); self.path_stack.push(self.term_buffer.len_bytes());
} }
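The effect of the flag, in terms of the encoded term bytes: with expand_dots enabled, a dotted segment ends up identical to pushing each component separately. A crate-internal sketch in the style of the tests below (the "k8s.container.name" path is illustrative):

// Sketch: a dotted segment is split on '.' into \x01-separated path components.
fn expand_dots_path_sketch() {
    let field = Field::from_field_id(1);
    let mut term = Term::with_type_and_field(Type::Json, field);
    let mut json_writer = JsonTermWriter::wrap(&mut term, true);
    json_writer.push_path_segment("k8s.container.name");
    assert_eq!(json_writer.path(), b"k8s\x01container\x01name");
}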
@@ -391,7 +408,7 @@ mod tests {
fn test_json_writer() { fn test_json_writer() {
let field = Field::from_field_id(1); let field = Field::from_field_id(1);
let mut term = Term::with_type_and_field(Type::Json, field); let mut term = Term::with_type_and_field(Type::Json, field);
let mut json_writer = JsonTermWriter::wrap(&mut term); let mut json_writer = JsonTermWriter::wrap(&mut term, false);
json_writer.push_path_segment("attributes"); json_writer.push_path_segment("attributes");
json_writer.push_path_segment("color"); json_writer.push_path_segment("color");
json_writer.set_str("red"); json_writer.set_str("red");
@@ -425,7 +442,7 @@ mod tests {
fn test_string_term() { fn test_string_term() {
let field = Field::from_field_id(1); let field = Field::from_field_id(1);
let mut term = Term::with_type_and_field(Type::Json, field); let mut term = Term::with_type_and_field(Type::Json, field);
let mut json_writer = JsonTermWriter::wrap(&mut term); let mut json_writer = JsonTermWriter::wrap(&mut term, false);
json_writer.push_path_segment("color"); json_writer.push_path_segment("color");
json_writer.set_str("red"); json_writer.set_str("red");
assert_eq!( assert_eq!(
@@ -438,7 +455,7 @@ mod tests {
fn test_i64_term() { fn test_i64_term() {
let field = Field::from_field_id(1); let field = Field::from_field_id(1);
let mut term = Term::with_type_and_field(Type::Json, field); let mut term = Term::with_type_and_field(Type::Json, field);
let mut json_writer = JsonTermWriter::wrap(&mut term); let mut json_writer = JsonTermWriter::wrap(&mut term, false);
json_writer.push_path_segment("color"); json_writer.push_path_segment("color");
json_writer.set_fast_value(-4i64); json_writer.set_fast_value(-4i64);
assert_eq!( assert_eq!(
@@ -451,7 +468,7 @@ mod tests {
fn test_u64_term() { fn test_u64_term() {
let field = Field::from_field_id(1); let field = Field::from_field_id(1);
let mut term = Term::with_type_and_field(Type::Json, field); let mut term = Term::with_type_and_field(Type::Json, field);
let mut json_writer = JsonTermWriter::wrap(&mut term); let mut json_writer = JsonTermWriter::wrap(&mut term, false);
json_writer.push_path_segment("color"); json_writer.push_path_segment("color");
json_writer.set_fast_value(4u64); json_writer.set_fast_value(4u64);
assert_eq!( assert_eq!(
@@ -464,7 +481,7 @@ mod tests {
fn test_f64_term() { fn test_f64_term() {
let field = Field::from_field_id(1); let field = Field::from_field_id(1);
let mut term = Term::with_type_and_field(Type::Json, field); let mut term = Term::with_type_and_field(Type::Json, field);
let mut json_writer = JsonTermWriter::wrap(&mut term); let mut json_writer = JsonTermWriter::wrap(&mut term, false);
json_writer.push_path_segment("color"); json_writer.push_path_segment("color");
json_writer.set_fast_value(4.0f64); json_writer.set_fast_value(4.0f64);
assert_eq!( assert_eq!(
@@ -477,7 +494,7 @@ mod tests {
fn test_bool_term() { fn test_bool_term() {
let field = Field::from_field_id(1); let field = Field::from_field_id(1);
let mut term = Term::with_type_and_field(Type::Json, field); let mut term = Term::with_type_and_field(Type::Json, field);
let mut json_writer = JsonTermWriter::wrap(&mut term); let mut json_writer = JsonTermWriter::wrap(&mut term, false);
json_writer.push_path_segment("color"); json_writer.push_path_segment("color");
json_writer.set_fast_value(true); json_writer.set_fast_value(true);
assert_eq!( assert_eq!(
@@ -490,7 +507,7 @@ mod tests {
fn test_push_after_set_path_segment() { fn test_push_after_set_path_segment() {
let field = Field::from_field_id(1); let field = Field::from_field_id(1);
let mut term = Term::with_type_and_field(Type::Json, field); let mut term = Term::with_type_and_field(Type::Json, field);
let mut json_writer = JsonTermWriter::wrap(&mut term); let mut json_writer = JsonTermWriter::wrap(&mut term, false);
json_writer.push_path_segment("attribute"); json_writer.push_path_segment("attribute");
json_writer.set_str("something"); json_writer.set_str("something");
json_writer.push_path_segment("color"); json_writer.push_path_segment("color");
@@ -505,7 +522,7 @@ mod tests {
fn test_pop_segment() { fn test_pop_segment() {
let field = Field::from_field_id(1); let field = Field::from_field_id(1);
let mut term = Term::with_type_and_field(Type::Json, field); let mut term = Term::with_type_and_field(Type::Json, field);
let mut json_writer = JsonTermWriter::wrap(&mut term); let mut json_writer = JsonTermWriter::wrap(&mut term, false);
json_writer.push_path_segment("color"); json_writer.push_path_segment("color");
json_writer.push_path_segment("hue"); json_writer.push_path_segment("hue");
json_writer.pop_path_segment(); json_writer.pop_path_segment();
@@ -520,7 +537,7 @@ mod tests {
fn test_json_writer_path() { fn test_json_writer_path() {
let field = Field::from_field_id(1); let field = Field::from_field_id(1);
let mut term = Term::with_type_and_field(Type::Json, field); let mut term = Term::with_type_and_field(Type::Json, field);
let mut json_writer = JsonTermWriter::wrap(&mut term); let mut json_writer = JsonTermWriter::wrap(&mut term, false);
json_writer.push_path_segment("color"); json_writer.push_path_segment("color");
assert_eq!(json_writer.path(), b"color"); assert_eq!(json_writer.path(), b"color");
json_writer.push_path_segment("hue"); json_writer.push_path_segment("hue");
@@ -529,6 +546,37 @@ mod tests {
assert_eq!(json_writer.path(), b"color\x01hue"); assert_eq!(json_writer.path(), b"color\x01hue");
} }
#[test]
fn test_json_path_expand_dots_disabled() {
let field = Field::from_field_id(1);
let mut term = Term::with_type_and_field(Type::Json, field);
let mut json_writer = JsonTermWriter::wrap(&mut term, false);
json_writer.push_path_segment("color.hue");
assert_eq!(json_writer.path(), b"color.hue");
}
#[test]
fn test_json_path_expand_dots_enabled() {
let field = Field::from_field_id(1);
let mut term = Term::with_type_and_field(Type::Json, field);
let mut json_writer = JsonTermWriter::wrap(&mut term, true);
json_writer.push_path_segment("color.hue");
assert_eq!(json_writer.path(), b"color\x01hue");
}
#[test]
fn test_json_path_expand_dots_enabled_pop_segment() {
let field = Field::from_field_id(1);
let mut term = Term::with_type_and_field(Type::Json, field);
let mut json_writer = JsonTermWriter::wrap(&mut term, true);
json_writer.push_path_segment("hello");
assert_eq!(json_writer.path(), b"hello");
json_writer.push_path_segment("color.hue");
assert_eq!(json_writer.path(), b"hello\x01color\x01hue");
json_writer.pop_path_segment();
assert_eq!(json_writer.path(), b"hello");
}
#[test] #[test]
fn test_split_json_path_simple() { fn test_split_json_path_simple() {
let json_path = split_json_path("titi.toto"); let json_path = split_json_path("titi.toto");

View File

@@ -60,7 +60,7 @@ type AddBatchReceiver = channel::Receiver<AddBatch>;
mod tests_mmap { mod tests_mmap {
use crate::collector::Count; use crate::collector::Count;
use crate::query::QueryParser; use crate::query::QueryParser;
use crate::schema::{Schema, STORED, TEXT}; use crate::schema::{JsonObjectOptions, Schema, TEXT};
use crate::{Index, Term}; use crate::{Index, Term};
#[test] #[test]
@@ -81,9 +81,9 @@ mod tests_mmap {
} }
#[test] #[test]
fn test_json_field_espace() { fn test_json_field_expand_dots_disabled_dot_escaped_required() {
let mut schema_builder = Schema::builder(); let mut schema_builder = Schema::builder();
let json_field = schema_builder.add_json_field("json", TEXT | STORED); let json_field = schema_builder.add_json_field("json", TEXT);
let index = Index::create_in_ram(schema_builder.build()); let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_for_tests().unwrap(); let mut index_writer = index.writer_for_tests().unwrap();
let json = serde_json::json!({"k8s.container.name": "prometheus", "val": "hello"}); let json = serde_json::json!({"k8s.container.name": "prometheus", "val": "hello"});
@@ -99,4 +99,26 @@ mod tests_mmap {
let num_docs = searcher.search(&query, &Count).unwrap(); let num_docs = searcher.search(&query, &Count).unwrap();
assert_eq!(num_docs, 1); assert_eq!(num_docs, 1);
} }
#[test]
fn test_json_field_expand_dots_enabled_dot_escape_not_required() {
let mut schema_builder = Schema::builder();
let json_options: JsonObjectOptions =
JsonObjectOptions::from(TEXT).set_expand_dots_enabled();
let json_field = schema_builder.add_json_field("json", json_options);
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_for_tests().unwrap();
let json = serde_json::json!({"k8s.container.name": "prometheus", "val": "hello"});
index_writer.add_document(doc!(json_field=>json)).unwrap();
index_writer.commit().unwrap();
let reader = index.reader().unwrap();
let searcher = reader.searcher();
assert_eq!(searcher.num_docs(), 1);
let parse_query = QueryParser::for_index(&index, Vec::new());
let query = parse_query
.parse_query(r#"json.k8s.container.name:prometheus"#)
.unwrap();
let num_docs = searcher.search(&query, &Count).unwrap();
assert_eq!(num_docs, 1);
}
} }

View File

@@ -180,7 +180,7 @@ impl SegmentWriter {
self.per_field_postings_writers.get_for_field_mut(field); self.per_field_postings_writers.get_for_field_mut(field);
term_buffer.clear_with_field_and_type(field_entry.field_type().value_type(), field); term_buffer.clear_with_field_and_type(field_entry.field_type().value_type(), field);
match *field_entry.field_type() { match field_entry.field_type() {
FieldType::Facet(_) => { FieldType::Facet(_) => {
for value in values { for value in values {
let facet = value.as_facet().ok_or_else(make_schema_error)?; let facet = value.as_facet().ok_or_else(make_schema_error)?;
@@ -307,7 +307,7 @@ impl SegmentWriter {
self.fieldnorms_writer.record(doc_id, field, num_vals); self.fieldnorms_writer.record(doc_id, field, num_vals);
} }
} }
FieldType::JsonObject(_) => { FieldType::JsonObject(json_options) => {
let text_analyzer = &self.per_field_text_analyzers[field.field_id() as usize]; let text_analyzer = &self.per_field_text_analyzers[field.field_id() as usize];
let json_values_it = let json_values_it =
values.map(|value| value.as_json().ok_or_else(make_schema_error)); values.map(|value| value.as_json().ok_or_else(make_schema_error));
@@ -315,6 +315,7 @@ impl SegmentWriter {
doc_id, doc_id,
json_values_it, json_values_it,
text_analyzer, text_analyzer,
json_options.is_expand_dots_enabled(),
term_buffer, term_buffer,
postings_writer, postings_writer,
ctx, ctx,
@@ -557,7 +558,7 @@ mod tests {
let mut term = Term::with_type_and_field(Type::Json, json_field); let mut term = Term::with_type_and_field(Type::Json, json_field);
let mut term_stream = term_dict.stream().unwrap(); let mut term_stream = term_dict.stream().unwrap();
let mut json_term_writer = JsonTermWriter::wrap(&mut term); let mut json_term_writer = JsonTermWriter::wrap(&mut term, false);
json_term_writer.push_path_segment("bool"); json_term_writer.push_path_segment("bool");
json_term_writer.set_fast_value(true); json_term_writer.set_fast_value(true);
@@ -648,7 +649,7 @@ mod tests {
let segment_reader = searcher.segment_reader(0u32); let segment_reader = searcher.segment_reader(0u32);
let inv_index = segment_reader.inverted_index(json_field).unwrap(); let inv_index = segment_reader.inverted_index(json_field).unwrap();
let mut term = Term::with_type_and_field(Type::Json, json_field); let mut term = Term::with_type_and_field(Type::Json, json_field);
let mut json_term_writer = JsonTermWriter::wrap(&mut term); let mut json_term_writer = JsonTermWriter::wrap(&mut term, false);
json_term_writer.push_path_segment("mykey"); json_term_writer.push_path_segment("mykey");
json_term_writer.set_str("token"); json_term_writer.set_str("token");
let term_info = inv_index let term_info = inv_index
@@ -692,7 +693,7 @@ mod tests {
let segment_reader = searcher.segment_reader(0u32); let segment_reader = searcher.segment_reader(0u32);
let inv_index = segment_reader.inverted_index(json_field).unwrap(); let inv_index = segment_reader.inverted_index(json_field).unwrap();
let mut term = Term::with_type_and_field(Type::Json, json_field); let mut term = Term::with_type_and_field(Type::Json, json_field);
let mut json_term_writer = JsonTermWriter::wrap(&mut term); let mut json_term_writer = JsonTermWriter::wrap(&mut term, false);
json_term_writer.push_path_segment("mykey"); json_term_writer.push_path_segment("mykey");
json_term_writer.set_str("two tokens"); json_term_writer.set_str("two tokens");
let term_info = inv_index let term_info = inv_index
@@ -737,7 +738,7 @@ mod tests {
let reader = index.reader().unwrap(); let reader = index.reader().unwrap();
let searcher = reader.searcher(); let searcher = reader.searcher();
let mut term = Term::with_type_and_field(Type::Json, json_field); let mut term = Term::with_type_and_field(Type::Json, json_field);
let mut json_term_writer = JsonTermWriter::wrap(&mut term); let mut json_term_writer = JsonTermWriter::wrap(&mut term, false);
json_term_writer.push_path_segment("mykey"); json_term_writer.push_path_segment("mykey");
json_term_writer.push_path_segment("field"); json_term_writer.push_path_segment("field");
json_term_writer.set_str("hello"); json_term_writer.set_str("hello");

View File

@@ -16,7 +16,8 @@ use crate::query::{
TermQuery, TermSetQuery, TermQuery, TermSetQuery,
}; };
use crate::schema::{ use crate::schema::{
Facet, FacetParseError, Field, FieldType, IndexRecordOption, IntoIpv6Addr, Schema, Term, Type, Facet, FacetParseError, Field, FieldType, IndexRecordOption, IntoIpv6Addr, JsonObjectOptions,
Schema, Term, Type,
}; };
use crate::time::format_description::well_known::Rfc3339; use crate::time::format_description::well_known::Rfc3339;
use crate::time::OffsetDateTime; use crate::time::OffsetDateTime;
@@ -182,7 +183,6 @@ pub struct QueryParser {
conjunction_by_default: bool, conjunction_by_default: bool,
tokenizer_manager: TokenizerManager, tokenizer_manager: TokenizerManager,
boost: HashMap<Field, Score>, boost: HashMap<Field, Score>,
field_names: HashMap<String, Field>,
} }
fn all_negative(ast: &LogicalAst) -> bool { fn all_negative(ast: &LogicalAst) -> bool {
@@ -195,31 +195,6 @@ fn all_negative(ast: &LogicalAst) -> bool {
} }
} }
// Returns the positions (in byte offsets) of the unescaped '.' chars in `field_path`.
//
// This function operates directly on bytes (as opposed to codepoints), relying
// on an encoding property of UTF-8 for its correctness.
fn locate_splitting_dots(field_path: &str) -> Vec<usize> {
let mut splitting_dots_pos = Vec::new();
let mut escape_state = false;
for (pos, b) in field_path.bytes().enumerate() {
if escape_state {
escape_state = false;
continue;
}
match b {
b'\\' => {
escape_state = true;
}
b'.' => {
splitting_dots_pos.push(pos);
}
_ => {}
}
}
splitting_dots_pos
}
impl QueryParser { impl QueryParser {
/// Creates a `QueryParser`, given /// Creates a `QueryParser`, given
/// * schema - index Schema /// * schema - index Schema
@@ -229,34 +204,19 @@ impl QueryParser {
default_fields: Vec<Field>, default_fields: Vec<Field>,
tokenizer_manager: TokenizerManager, tokenizer_manager: TokenizerManager,
) -> QueryParser { ) -> QueryParser {
let field_names = schema
.fields()
.map(|(field, field_entry)| (field_entry.name().to_string(), field))
.collect();
QueryParser { QueryParser {
schema, schema,
default_fields, default_fields,
tokenizer_manager, tokenizer_manager,
conjunction_by_default: false, conjunction_by_default: false,
boost: Default::default(), boost: Default::default(),
field_names,
} }
} }
// Splits a full_path as written in a query, into a field name and a // Splits a full_path as written in a query, into a field name and a
// json path. // json path.
pub(crate) fn split_full_path<'a>(&self, full_path: &'a str) -> Option<(Field, &'a str)> { pub(crate) fn split_full_path<'a>(&self, full_path: &'a str) -> Option<(Field, &'a str)> {
if let Some(field) = self.field_names.get(full_path) { self.schema.find_field(full_path)
return Some((*field, ""));
}
let mut splitting_period_pos: Vec<usize> = locate_splitting_dots(full_path);
while let Some(pos) = splitting_period_pos.pop() {
let (prefix, suffix) = full_path.split_at(pos);
if let Some(field) = self.field_names.get(prefix) {
return Some((*field, &suffix[1..]));
}
}
None
} }
/// Creates a `QueryParser`, given /// Creates a `QueryParser`, given
@@ -482,28 +442,14 @@ impl QueryParser {
.into_iter() .into_iter()
.collect()) .collect())
} }
FieldType::JsonObject(ref json_options) => { FieldType::JsonObject(ref json_options) => generate_literals_for_json_object(
let option = json_options.get_text_indexing_options().ok_or_else(|| { field_name,
// This should have been seen earlier really. field,
QueryParserError::FieldNotIndexed(field_name.to_string()) json_path,
})?; phrase,
let text_analyzer = &self.tokenizer_manager,
self.tokenizer_manager json_options,
.get(option.tokenizer()) ),
.ok_or_else(|| QueryParserError::UnknownTokenizer {
field: field_name.to_string(),
tokenizer: option.tokenizer().to_string(),
})?;
let index_record_option = option.index_option();
generate_literals_for_json_object(
field_name,
field,
json_path,
phrase,
&text_analyzer,
index_record_option,
)
}
FieldType::Facet(_) => match Facet::from_text(phrase) { FieldType::Facet(_) => match Facet::from_text(phrase) {
Ok(facet) => { Ok(facet) => {
let facet_term = Term::from_facet(field, &facet); let facet_term = Term::from_facet(field, &facet);
@@ -767,17 +713,32 @@ fn generate_literals_for_json_object(
field: Field, field: Field,
json_path: &str, json_path: &str,
phrase: &str, phrase: &str,
text_analyzer: &TextAnalyzer, tokenizer_manager: &TokenizerManager,
index_record_option: IndexRecordOption, json_options: &JsonObjectOptions,
) -> Result<Vec<LogicalLiteral>, QueryParserError> { ) -> Result<Vec<LogicalLiteral>, QueryParserError> {
let text_options = json_options.get_text_indexing_options().ok_or_else(|| {
// This should have been seen earlier really.
QueryParserError::FieldNotIndexed(field_name.to_string())
})?;
let text_analyzer = tokenizer_manager
.get(text_options.tokenizer())
.ok_or_else(|| QueryParserError::UnknownTokenizer {
field: field_name.to_string(),
tokenizer: text_options.tokenizer().to_string(),
})?;
let index_record_option = text_options.index_option();
let mut logical_literals = Vec::new(); let mut logical_literals = Vec::new();
let mut term = Term::with_capacity(100); let mut term = Term::with_capacity(100);
let mut json_term_writer = let mut json_term_writer = JsonTermWriter::from_field_and_json_path(
JsonTermWriter::from_field_and_json_path(field, json_path, &mut term); field,
json_path,
json_options.is_expand_dots_enabled(),
&mut term,
);
if let Some(term) = convert_to_fast_value_and_get_term(&mut json_term_writer, phrase) { if let Some(term) = convert_to_fast_value_and_get_term(&mut json_term_writer, phrase) {
logical_literals.push(LogicalLiteral::Term(term)); logical_literals.push(LogicalLiteral::Term(term));
} }
let terms = set_string_and_get_terms(&mut json_term_writer, phrase, text_analyzer); let terms = set_string_and_get_terms(&mut json_term_writer, phrase, &text_analyzer);
drop(json_term_writer); drop(json_term_writer);
if terms.len() <= 1 { if terms.len() <= 1 {
for (_, term) in terms { for (_, term) in terms {
@@ -1564,13 +1525,6 @@ mod test {
assert_eq!(query_parser.split_full_path("firsty"), None); assert_eq!(query_parser.split_full_path("firsty"), None);
} }
#[test]
fn test_locate_splitting_dots() {
assert_eq!(&super::locate_splitting_dots("a.b.c"), &[1, 3]);
assert_eq!(&super::locate_splitting_dots(r#"a\.b.c"#), &[4]);
assert_eq!(&super::locate_splitting_dots(r#"a\..b.c"#), &[3, 5]);
}
#[test] #[test]
pub fn test_phrase_slop() { pub fn test_phrase_slop() {
test_parse_query_to_logical_ast_helper( test_parse_query_to_logical_ast_helper(

View File

@@ -181,6 +181,11 @@ impl FieldType {
matches!(self, FieldType::IpAddr(_)) matches!(self, FieldType::IpAddr(_))
} }
/// returns true if this is a date field
pub fn is_date(&self) -> bool {
matches!(self, FieldType::Date(_))
}
/// returns true if the field is indexed. /// returns true if the field is indexed.
pub fn is_indexed(&self) -> bool { pub fn is_indexed(&self) -> bool {
match *self { match *self {

View File

@@ -13,6 +13,8 @@ pub struct JsonObjectOptions {
// If set to some, int, date, f64 and text will be indexed. // If set to some, int, date, f64 and text will be indexed.
// Text will use the TextFieldIndexing setting for indexing. // Text will use the TextFieldIndexing setting for indexing.
indexing: Option<TextFieldIndexing>, indexing: Option<TextFieldIndexing>,
expand_dots_enabled: bool,
} }
impl JsonObjectOptions { impl JsonObjectOptions {
@@ -26,6 +28,29 @@ impl JsonObjectOptions {
self.indexing.is_some() self.indexing.is_some()
} }
/// Returns `true` iff dots in json keys should be expanded.
///
/// When expand_dots is enabled, a json object like
/// `{"k8s.node.id": 5}` is processed as if it were
/// `{"k8s": {"node": {"id": 5}}}`.
/// This option has the merit of allowing users to
/// write queries like `k8s.node.id:5`.
/// On the other hand, enabling this feature can lead to
/// ambiguity.
///
/// If disabled, the "." needs to be escaped:
/// `k8s\.node\.id:5`.
pub fn is_expand_dots_enabled(&self) -> bool {
self.expand_dots_enabled
}
/// Sets `expand_dots` to true.
/// See `is_expand_dots_enabled` for more information.
pub fn set_expand_dots_enabled(mut self) -> Self {
self.expand_dots_enabled = true;
self
}
/// Returns the text indexing options. /// Returns the text indexing options.
/// ///
/// If set to `Some` then both int and str values will be indexed. /// If set to `Some` then both int and str values will be indexed.
@@ -55,6 +80,7 @@ impl From<StoredFlag> for JsonObjectOptions {
JsonObjectOptions { JsonObjectOptions {
stored: true, stored: true,
indexing: None, indexing: None,
expand_dots_enabled: false,
} }
} }
} }
@@ -69,10 +95,11 @@ impl<T: Into<JsonObjectOptions>> BitOr<T> for JsonObjectOptions {
type Output = JsonObjectOptions; type Output = JsonObjectOptions;
fn bitor(self, other: T) -> Self { fn bitor(self, other: T) -> Self {
let other = other.into(); let other: JsonObjectOptions = other.into();
JsonObjectOptions { JsonObjectOptions {
indexing: self.indexing.or(other.indexing), indexing: self.indexing.or(other.indexing),
stored: self.stored | other.stored, stored: self.stored | other.stored,
expand_dots_enabled: self.expand_dots_enabled | other.expand_dots_enabled,
} }
} }
} }
@@ -93,6 +120,7 @@ impl From<TextOptions> for JsonObjectOptions {
JsonObjectOptions { JsonObjectOptions {
stored: text_options.is_stored(), stored: text_options.is_stored(),
indexing: text_options.get_indexing_options().cloned(), indexing: text_options.get_indexing_options().cloned(),
expand_dots_enabled: false,
} }
} }
} }
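
A minimal sketch of how the new option is meant to be declared on a JSON field (hedged: the field name "attributes" and the test name are illustrative; the calls themselves are the ones appearing in this diff):

use tantivy::schema::{JsonObjectOptions, Schema, TEXT};

#[test]
fn expand_dots_option_sketch() {
    // With expand_dots enabled, `{"k8s.node.id": 5}` is indexed as if it were
    // `{"k8s": {"node": {"id": 5}}}`, so it can be queried as `k8s.node.id:5`
    // without escaping the dots.
    let json_options: JsonObjectOptions = JsonObjectOptions::from(TEXT).set_expand_dots_enabled();
    assert!(json_options.is_expand_dots_enabled());
    let mut schema_builder = Schema::builder();
    schema_builder.add_json_field("attributes", json_options);
    let _schema = schema_builder.build();
}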

View File

@@ -252,6 +252,31 @@ impl Eq for InnerSchema {}
#[derive(Clone, Eq, PartialEq, Debug)] #[derive(Clone, Eq, PartialEq, Debug)]
pub struct Schema(Arc<InnerSchema>); pub struct Schema(Arc<InnerSchema>);
// Returns the positions (in byte offsets) of the unescaped '.' chars in `field_path`.
//
// This function operates directly on bytes (as opposed to codepoints), relying
// on an encoding property of UTF-8 for its correctness.
fn locate_splitting_dots(field_path: &str) -> Vec<usize> {
let mut splitting_dots_pos = Vec::new();
let mut escape_state = false;
for (pos, b) in field_path.bytes().enumerate() {
if escape_state {
escape_state = false;
continue;
}
match b {
b'\\' => {
escape_state = true;
}
b'.' => {
splitting_dots_pos.push(pos);
}
_ => {}
}
}
splitting_dots_pos
}
impl Schema { impl Schema {
/// Return the `FieldEntry` associated with a `Field`. /// Return the `FieldEntry` associated with a `Field`.
pub fn get_field_entry(&self, field: Field) -> &FieldEntry { pub fn get_field_entry(&self, field: Field) -> &FieldEntry {
@@ -358,6 +383,28 @@ impl Schema {
} }
Ok(doc) Ok(doc)
} }
/// Searches for a full_path in the schema, returning the field and a JSON path.
///
/// This function first checks whether a field exists for the exact given full_path.
/// If not, it splits the full_path at non-escaped '.' chars and tries to match a
/// prefix against the field names, favoring the longest matching field name.
///
/// This does not check that the field is a JSON field. It is therefore possible for this
/// function to return a non-empty JSON path together with a non-JSON field.
pub fn find_field<'a>(&self, full_path: &'a str) -> Option<(Field, &'a str)> {
if let Some(field) = self.0.fields_map.get(full_path) {
return Some((*field, ""));
}
let mut splitting_period_pos: Vec<usize> = locate_splitting_dots(full_path);
while let Some(pos) = splitting_period_pos.pop() {
let (prefix, suffix) = full_path.split_at(pos);
if let Some(field) = self.0.fields_map.get(prefix) {
return Some((*field, &suffix[1..]));
}
}
None
}
} }
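
For instance, with a JSON field `foo` and a text field `foo.bar` declared, the longest matching field name wins before the remainder is interpreted as a JSON path. A hedged sketch of that behavior (the exhaustive cases are in the test further down this file; the test name here is illustrative):

use tantivy::schema::{Schema, STRING};

#[test]
fn find_field_longest_prefix_sketch() {
    let mut schema_builder = Schema::builder();
    schema_builder.add_json_field("foo", STRING);
    schema_builder.add_text_field("foo.bar", STRING);
    let schema = schema_builder.build();
    // Exact match first, then the longest prefix ending at an unescaped dot.
    assert_eq!(
        schema.find_field("foo.bar.baz"),
        Some((schema.get_field("foo.bar").unwrap(), "baz"))
    );
    assert_eq!(
        schema.find_field("foo.toto"),
        Some((schema.get_field("foo").unwrap(), "toto"))
    );
}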
impl Serialize for Schema { impl Serialize for Schema {
@@ -436,6 +483,13 @@ mod tests {
use crate::schema::schema::DocParsingError::InvalidJson; use crate::schema::schema::DocParsingError::InvalidJson;
use crate::schema::*; use crate::schema::*;
#[test]
fn test_locate_splitting_dots() {
assert_eq!(&super::locate_splitting_dots("a.b.c"), &[1, 3]);
assert_eq!(&super::locate_splitting_dots(r#"a\.b.c"#), &[4]);
assert_eq!(&super::locate_splitting_dots(r#"a\..b.c"#), &[3, 5]);
}
#[test] #[test]
pub fn is_indexed_test() { pub fn is_indexed_test() {
let mut schema_builder = Schema::builder(); let mut schema_builder = Schema::builder();
@@ -936,4 +990,46 @@ mod tests {
]"#; ]"#;
assert_eq!(schema_json, expected); assert_eq!(schema_json, expected);
} }
#[test]
fn test_find_field() {
let mut schema_builder = Schema::builder();
schema_builder.add_json_field("foo", STRING);
schema_builder.add_text_field("bar", STRING);
schema_builder.add_text_field("foo.bar", STRING);
schema_builder.add_text_field("foo.bar.baz", STRING);
schema_builder.add_text_field("bar.a.b.c", STRING);
let schema = schema_builder.build();
assert_eq!(
schema.find_field("foo.bar"),
Some((schema.get_field("foo.bar").unwrap(), ""))
);
assert_eq!(
schema.find_field("foo.bar.bar"),
Some((schema.get_field("foo.bar").unwrap(), "bar"))
);
assert_eq!(
schema.find_field("foo.bar.baz"),
Some((schema.get_field("foo.bar.baz").unwrap(), ""))
);
assert_eq!(
schema.find_field("foo.toto"),
Some((schema.get_field("foo").unwrap(), "toto"))
);
assert_eq!(
schema.find_field("foo.bar"),
Some((schema.get_field("foo.bar").unwrap(), ""))
);
assert_eq!(
schema.find_field("bar.toto.titi"),
Some((schema.get_field("bar").unwrap(), "toto.titi"))
);
assert_eq!(schema.find_field("hello"), None);
assert_eq!(schema.find_field(""), None);
assert_eq!(schema.find_field("thiswouldbeareallylongfieldname"), None);
assert_eq!(schema.find_field("baz.bar.foo"), None);
}
} }

View File

@@ -197,8 +197,19 @@ impl Term {
} }
/// Appends value bytes to the Term. /// Appends value bytes to the Term.
pub fn append_bytes(&mut self, bytes: &[u8]) { ///
/// This function returns a mutable slice over the bytes that have just been appended.
#[inline]
pub fn append_bytes(&mut self, bytes: &[u8]) -> &mut [u8] {
let len_before = self.0.len();
self.0.extend_from_slice(bytes); self.0.extend_from_slice(bytes);
&mut self.0[len_before..]
}
/// Appends a single byte to the term.
#[inline]
pub fn push_byte(&mut self, byte: u8) {
self.0.push(byte);
} }
} }
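
Returning the freshly appended region lets a caller reserve space and then write into the term buffer in place. A hedged, crate-internal sketch in the style of the tests earlier in this diff (the concrete values are illustrative):

#[test]
fn append_bytes_in_place_sketch() {
    let field = Field::from_field_id(1);
    let mut term = Term::with_type_and_field(Type::Json, field);
    // Reserve eight bytes, then fill them through the returned mutable slice.
    let slot: &mut [u8] = term.append_bytes(&[0u8; 8]);
    slot.copy_from_slice(&4u64.to_be_bytes());
    // `push_byte` appends a single byte, e.g. the \x01 separator used in JSON paths.
    term.push_byte(1u8);
}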

View File

@@ -90,7 +90,7 @@ impl CheckpointBlock {
return Ok(()); return Ok(());
} }
let mut doc = read_u32_vint(data); let mut doc = read_u32_vint(data);
let mut start_offset = read_u32_vint(data) as usize; let mut start_offset = VInt::deserialize_u64(data)? as usize;
for _ in 0..len { for _ in 0..len {
let num_docs = read_u32_vint(data); let num_docs = read_u32_vint(data);
let block_num_bytes = read_u32_vint(data) as usize; let block_num_bytes = read_u32_vint(data) as usize;
@@ -147,6 +147,15 @@ mod tests {
test_aux_ser_deser(&checkpoints) test_aux_ser_deser(&checkpoints)
} }
#[test]
fn test_block_serialize_large_byte_range() -> io::Result<()> {
let checkpoints = vec![Checkpoint {
doc_range: 10..12,
byte_range: 8_000_000_000..9_000_000_000,
}];
test_aux_ser_deser(&checkpoints)
}
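
The switch from `read_u32_vint` to `VInt::deserialize_u64` for the start offset matters because that offset is a byte position in the doc store: past u32::MAX (about 4 GiB) it no longer fits in a u32 and would be truncated on deserialization. A small hedged sketch of the truncation the test above guards against (the test name is illustrative):

#[test]
fn u32_truncation_sketch() {
    // A doc store start offset beyond u32::MAX cannot survive a u32 round-trip.
    let start_offset: u64 = 8_000_000_000;
    assert!(start_offset > u64::from(u32::MAX));
    assert_eq!(start_offset as u32 as u64, 3_705_032_704); // silently truncated
}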
#[test] #[test]
fn test_block_serialize() -> io::Result<()> { fn test_block_serialize() -> io::Result<()> {
let offsets: Vec<usize> = (0..11).map(|i| i * i * i).collect(); let offsets: Vec<usize> = (0..11).map(|i| i * i * i).collect();