mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-02 15:22:55 +00:00
Compare commits
8 Commits
typed-colu
...
typed-colu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
25fb27f1a6 | ||
|
|
965ce3ef3e | ||
|
|
fb7e533354 | ||
|
|
1ff762bd8f | ||
|
|
d29d63a829 | ||
|
|
0e66423de8 | ||
|
|
6ab8990bbd | ||
|
|
d7a8053cc2 |
@@ -55,12 +55,13 @@ measure_time = "0.8.2"
|
||||
async-trait = "0.1.53"
|
||||
arc-swap = "1.5.0"
|
||||
|
||||
columnar = { version="0.1", path="./columnar", package ="tantivy-columnar" }
|
||||
sstable = { version="0.1", path="./sstable", package ="tantivy-sstable", optional = true }
|
||||
stacker = { version="0.1", path="./stacker", package ="tantivy-stacker" }
|
||||
tantivy-query-grammar = { version= "0.19.0", path="./query-grammar" }
|
||||
tantivy-bitpacker = { version= "0.3", path="./bitpacker" }
|
||||
columnar = { version= "0.1", path="./columnar", package="tantivy-columnar" }
|
||||
common = { version= "0.5", path = "./common/", package = "tantivy-common" }
|
||||
fastfield_codecs = { version= "0.3", path="./fastfield_codecs", default-features = false }
|
||||
tokenizer-api = { version="0.1", path="./tokenizer-api", package="tantivy-tokenizer-api" }
|
||||
|
||||
[target.'cfg(windows)'.dependencies]
|
||||
@@ -107,7 +108,7 @@ unstable = [] # useful for benches.
|
||||
quickwit = ["sstable"]
|
||||
|
||||
[workspace]
|
||||
members = ["query-grammar", "bitpacker", "common", "ownedbytes", "stacker", "sstable", "tokenizer-api", "columnar"]
|
||||
members = ["query-grammar", "bitpacker", "common", "fastfield_codecs", "ownedbytes", "stacker", "sstable", "tokenizer-api", "columnar"]
|
||||
|
||||
# Following the "fail" crate best practises, we isolate
|
||||
# tests that define specific behavior in fail check points
|
||||
|
||||
@@ -5,18 +5,19 @@ edition = "2021"
|
||||
license = "MIT"
|
||||
|
||||
[dependencies]
|
||||
itertools = "0.10.5"
|
||||
log = "0.4.17"
|
||||
fnv = "1.0.7"
|
||||
fastdivide = "0.4.0"
|
||||
rand = { version = "0.8.5", optional = true }
|
||||
measure_time = { version = "0.8.2", optional = true }
|
||||
prettytable-rs = { version = "0.10.0", optional = true }
|
||||
|
||||
stacker = { path = "../stacker", package="tantivy-stacker"}
|
||||
serde_json = "1"
|
||||
thiserror = "1"
|
||||
fnv = "1"
|
||||
sstable = { path = "../sstable", package = "tantivy-sstable" }
|
||||
common = { path = "../common", package = "tantivy-common" }
|
||||
itertools = "0.10"
|
||||
log = "0.4"
|
||||
tantivy-bitpacker = { version= "0.3", path = "../bitpacker/" }
|
||||
prettytable-rs = {version="0.10.0", optional= true}
|
||||
rand = {version="0.8.3", optional= true}
|
||||
fastdivide = "0.4"
|
||||
measure_time = { version="0.8.2", optional=true}
|
||||
|
||||
[dev-dependencies]
|
||||
proptest = "1"
|
||||
|
||||
@@ -27,8 +27,6 @@ compare with roaring bitmap/elias fano etc etc.
|
||||
SIMD range? (see blog post)
|
||||
Add alignment?
|
||||
Consider another codec to bridge the gap between few and 5k elements
|
||||
replug examples
|
||||
replug fast_field_codecs bench
|
||||
|
||||
# Cleanup and rationalization
|
||||
remove the 6 bit limitation of columntype. use 4 + 4 bits instead.
|
||||
|
||||
@@ -63,17 +63,11 @@ impl From<BytesColumn> for StrColumn {
|
||||
}
|
||||
|
||||
impl StrColumn {
|
||||
pub fn dictionary(&self) -> &Dictionary<VoidSSTable> {
|
||||
self.0.dictionary.as_ref()
|
||||
}
|
||||
|
||||
/// Fills the buffer
|
||||
pub fn ord_to_str(&self, term_ord: u64, output: &mut String) -> io::Result<bool> {
|
||||
unsafe {
|
||||
let buf = output.as_mut_vec();
|
||||
if !self.0.dictionary.ord_to_term(term_ord, buf)? {
|
||||
return Ok(false);
|
||||
}
|
||||
self.0.dictionary.ord_to_term(term_ord, buf)?;
|
||||
// TODO consider remove checks if it hurts performance.
|
||||
if std::str::from_utf8(buf.as_slice()).is_err() {
|
||||
buf.clear();
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::column_index::optional_index::set_block::{
|
||||
DenseBlockCodec, SparseBlockCodec, DENSE_BLOCK_NUM_BYTES,
|
||||
};
|
||||
use crate::column_index::optional_index::set_block::dense::DENSE_BLOCK_NUM_BYTES;
|
||||
use crate::column_index::optional_index::set_block::{DenseBlockCodec, SparseBlockCodec};
|
||||
use crate::column_index::optional_index::{SelectCursor, Set, SetCodec};
|
||||
|
||||
fn test_set_helper<C: SetCodec<Item = u16>>(vals: &[u16]) -> usize {
|
||||
|
||||
@@ -4,22 +4,24 @@ use std::net::Ipv6Addr;
|
||||
use crate::value::NumericalType;
|
||||
use crate::InvalidData;
|
||||
|
||||
/// The column type represents the column type.
|
||||
/// Any changes need to be propagated to `COLUMN_TYPES`.
|
||||
#[derive(Hash, Eq, PartialEq, Debug, Clone, Copy, Ord, PartialOrd)]
|
||||
/// The column type represents the column type and can fit on 6-bits.
|
||||
///
|
||||
/// - bits[0..3]: Column category type.
|
||||
/// - bits[3..6]: Numerical type if necessary.
|
||||
#[derive(Hash, Eq, PartialEq, Debug, Clone, Copy)]
|
||||
#[repr(u8)]
|
||||
pub enum ColumnType {
|
||||
I64 = 0u8,
|
||||
U64 = 1u8,
|
||||
F64 = 2u8,
|
||||
Bytes = 3u8,
|
||||
Str = 4u8,
|
||||
Bool = 5u8,
|
||||
IpAddr = 6u8,
|
||||
DateTime = 7u8,
|
||||
Bytes = 10u8,
|
||||
Str = 14u8,
|
||||
Bool = 18u8,
|
||||
IpAddr = 22u8,
|
||||
DateTime = 26u8,
|
||||
}
|
||||
|
||||
// The order needs to match _exactly_ the order in the enum
|
||||
#[cfg(test)]
|
||||
const COLUMN_TYPES: [ColumnType; 8] = [
|
||||
ColumnType::I64,
|
||||
ColumnType::U64,
|
||||
@@ -37,7 +39,18 @@ impl ColumnType {
|
||||
}
|
||||
|
||||
pub(crate) fn try_from_code(code: u8) -> Result<ColumnType, InvalidData> {
|
||||
COLUMN_TYPES.get(code as usize).copied().ok_or(InvalidData)
|
||||
use ColumnType::*;
|
||||
match code {
|
||||
0u8 => Ok(I64),
|
||||
1u8 => Ok(U64),
|
||||
2u8 => Ok(F64),
|
||||
10u8 => Ok(Bytes),
|
||||
14u8 => Ok(Str),
|
||||
18u8 => Ok(Bool),
|
||||
22u8 => Ok(IpAddr),
|
||||
26u8 => Ok(Self::DateTime),
|
||||
_ => Err(InvalidData),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -52,6 +65,18 @@ impl From<NumericalType> for ColumnType {
|
||||
}
|
||||
|
||||
impl ColumnType {
|
||||
/// get column type category
|
||||
pub(crate) fn column_type_category(self) -> ColumnTypeCategory {
|
||||
match self {
|
||||
ColumnType::I64 | ColumnType::U64 | ColumnType::F64 => ColumnTypeCategory::Numerical,
|
||||
ColumnType::Bytes => ColumnTypeCategory::Bytes,
|
||||
ColumnType::Str => ColumnTypeCategory::Str,
|
||||
ColumnType::Bool => ColumnTypeCategory::Bool,
|
||||
ColumnType::IpAddr => ColumnTypeCategory::IpAddr,
|
||||
ColumnType::DateTime => ColumnTypeCategory::DateTime,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn numerical_type(&self) -> Option<NumericalType> {
|
||||
match self {
|
||||
ColumnType::I64 => Some(NumericalType::I64),
|
||||
@@ -130,20 +155,70 @@ impl HasAssociatedColumnType for Ipv6Addr {
|
||||
}
|
||||
}
|
||||
|
||||
/// Column types are grouped into different categories that
|
||||
/// corresponds to the different types of `JsonValue` types.
|
||||
///
|
||||
/// The columnar writer will apply coercion rules to make sure that
|
||||
/// at most one column exist per `ColumnTypeCategory`.
|
||||
///
|
||||
/// See also [README.md].
|
||||
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)]
|
||||
#[repr(u8)]
|
||||
pub enum ColumnTypeCategory {
|
||||
Bool,
|
||||
Str,
|
||||
Numerical,
|
||||
DateTime,
|
||||
Bytes,
|
||||
IpAddr,
|
||||
}
|
||||
|
||||
impl From<ColumnType> for ColumnTypeCategory {
|
||||
fn from(column_type: ColumnType) -> Self {
|
||||
match column_type {
|
||||
ColumnType::I64 => ColumnTypeCategory::Numerical,
|
||||
ColumnType::U64 => ColumnTypeCategory::Numerical,
|
||||
ColumnType::F64 => ColumnTypeCategory::Numerical,
|
||||
ColumnType::Bytes => ColumnTypeCategory::Bytes,
|
||||
ColumnType::Str => ColumnTypeCategory::Str,
|
||||
ColumnType::Bool => ColumnTypeCategory::Bool,
|
||||
ColumnType::IpAddr => ColumnTypeCategory::IpAddr,
|
||||
ColumnType::DateTime => ColumnTypeCategory::DateTime,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashSet;
|
||||
|
||||
use super::*;
|
||||
use crate::Cardinality;
|
||||
|
||||
#[test]
|
||||
fn test_column_type_to_code() {
|
||||
for (code, expected_column_type) in super::COLUMN_TYPES.iter().copied().enumerate() {
|
||||
if let Ok(column_type) = ColumnType::try_from_code(code as u8) {
|
||||
assert_eq!(column_type, expected_column_type);
|
||||
let mut column_type_set: HashSet<ColumnType> = HashSet::new();
|
||||
for code in u8::MIN..=u8::MAX {
|
||||
if let Ok(column_type) = ColumnType::try_from_code(code) {
|
||||
assert_eq!(column_type.to_code(), code);
|
||||
assert!(column_type_set.insert(column_type));
|
||||
}
|
||||
}
|
||||
for code in COLUMN_TYPES.len() as u8..=u8::MAX {
|
||||
assert!(ColumnType::try_from_code(code as u8).is_err());
|
||||
assert_eq!(column_type_set.len(), super::COLUMN_TYPES.len());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_column_category_sort_consistent_with_column_type_sort() {
|
||||
// This is a very important property because we
|
||||
// we need to serialize colunmn in the right order.
|
||||
let mut column_types: Vec<ColumnType> = super::COLUMN_TYPES.iter().copied().collect();
|
||||
column_types.sort_by_key(|col| col.to_code());
|
||||
let column_categories: Vec<ColumnTypeCategory> = column_types
|
||||
.into_iter()
|
||||
.map(ColumnTypeCategory::from)
|
||||
.collect();
|
||||
for (prev, next) in column_categories.iter().zip(column_categories.iter()) {
|
||||
assert!(prev <= next);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
use std::collections::HashMap;
|
||||
use std::io;
|
||||
|
||||
use super::column_type::ColumnTypeCategory;
|
||||
use crate::columnar::ColumnarReader;
|
||||
use crate::dynamic_column::DynamicColumn;
|
||||
use crate::ColumnType;
|
||||
|
||||
pub enum MergeDocOrder {
|
||||
/// Columnar tables are simply stacked one above the other.
|
||||
@@ -35,40 +35,7 @@ pub fn merge_columnar(
|
||||
}
|
||||
}
|
||||
|
||||
/// Column types are grouped into different categories.
|
||||
/// After merge, all columns belonging to the same category are coerced to
|
||||
/// the same column type.
|
||||
///
|
||||
/// In practise, today, only Numerical colummns are coerced into one type today.
|
||||
///
|
||||
/// See also [README.md].
|
||||
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
|
||||
#[repr(u8)]
|
||||
enum ColumnTypeCategory {
|
||||
Bool,
|
||||
Str,
|
||||
Numerical,
|
||||
DateTime,
|
||||
Bytes,
|
||||
IpAddr,
|
||||
}
|
||||
|
||||
impl From<ColumnType> for ColumnTypeCategory {
|
||||
fn from(column_type: ColumnType) -> Self {
|
||||
match column_type {
|
||||
ColumnType::I64 => ColumnTypeCategory::Numerical,
|
||||
ColumnType::U64 => ColumnTypeCategory::Numerical,
|
||||
ColumnType::F64 => ColumnTypeCategory::Numerical,
|
||||
ColumnType::Bytes => ColumnTypeCategory::Bytes,
|
||||
ColumnType::Str => ColumnTypeCategory::Str,
|
||||
ColumnType::Bool => ColumnTypeCategory::Bool,
|
||||
ColumnType::IpAddr => ColumnTypeCategory::IpAddr,
|
||||
ColumnType::DateTime => ColumnTypeCategory::DateTime,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn collect_columns(
|
||||
pub fn collect_columns(
|
||||
columnar_readers: &[&ColumnarReader],
|
||||
) -> io::Result<HashMap<String, HashMap<ColumnTypeCategory, Vec<DynamicColumn>>>> {
|
||||
// Each column name may have multiple types of column associated.
|
||||
@@ -84,7 +51,7 @@ fn collect_columns(
|
||||
.or_default();
|
||||
|
||||
let columns = column_type_to_handles
|
||||
.entry(handle.column_type().into())
|
||||
.entry(handle.column_type().column_type_category())
|
||||
.or_default();
|
||||
columns.push(handle.open()?);
|
||||
}
|
||||
@@ -95,9 +62,10 @@ fn collect_columns(
|
||||
Ok(field_name_to_group)
|
||||
}
|
||||
|
||||
/// Coerce numerical type columns to the same type
|
||||
/// TODO rename to `coerce_columns`
|
||||
fn normalize_columns(map: &mut HashMap<String, HashMap<ColumnTypeCategory, Vec<DynamicColumn>>>) {
|
||||
/// Cast numerical type columns to the same type
|
||||
pub(crate) fn normalize_columns(
|
||||
map: &mut HashMap<String, HashMap<ColumnTypeCategory, Vec<DynamicColumn>>>,
|
||||
) {
|
||||
for (_field_name, type_category_to_columns) in map.iter_mut() {
|
||||
for (type_category, columns) in type_category_to_columns {
|
||||
if type_category == &ColumnTypeCategory::Numerical {
|
||||
|
||||
@@ -189,12 +189,10 @@ impl CompatibleNumericalTypes {
|
||||
}
|
||||
|
||||
impl NumericalColumnWriter {
|
||||
pub fn numerical_type(&self) -> NumericalType {
|
||||
self.compatible_numerical_types.to_numerical_type()
|
||||
}
|
||||
|
||||
pub fn cardinality(&self, num_docs: RowId) -> Cardinality {
|
||||
self.column_writer.get_cardinality(num_docs)
|
||||
pub fn column_type_and_cardinality(&self, num_docs: RowId) -> (NumericalType, Cardinality) {
|
||||
let numerical_type = self.compatible_numerical_types.to_numerical_type();
|
||||
let cardinality = self.column_writer.get_cardinality(num_docs);
|
||||
(numerical_type, cardinality)
|
||||
}
|
||||
|
||||
pub fn record_numerical_value(
|
||||
|
||||
@@ -15,7 +15,7 @@ use crate::column_index::SerializableColumnIndex;
|
||||
use crate::column_values::{
|
||||
ColumnValues, MonotonicallyMappableToU128, MonotonicallyMappableToU64, VecColumn,
|
||||
};
|
||||
use crate::columnar::column_type::ColumnType;
|
||||
use crate::columnar::column_type::{ColumnType, ColumnTypeCategory};
|
||||
use crate::columnar::writer::column_writers::{
|
||||
ColumnWriter, NumericalColumnWriter, StrOrBytesColumnWriter,
|
||||
};
|
||||
@@ -279,40 +279,35 @@ impl ColumnarWriter {
|
||||
}
|
||||
pub fn serialize(&mut self, num_docs: RowId, wrt: &mut dyn io::Write) -> io::Result<()> {
|
||||
let mut serializer = ColumnarSerializer::new(wrt);
|
||||
let mut columns: Vec<(&[u8], ColumnType, Addr)> = self
|
||||
let mut columns: Vec<(&[u8], ColumnTypeCategory, Addr)> = self
|
||||
.numerical_field_hash_map
|
||||
.iter()
|
||||
.map(|(column_name, addr, _)| {
|
||||
let numerical_column_writer: NumericalColumnWriter =
|
||||
self.numerical_field_hash_map.read(addr);
|
||||
let column_type = numerical_column_writer.numerical_type().into();
|
||||
(column_name, column_type, addr)
|
||||
})
|
||||
.map(|(column_name, addr, _)| (column_name, ColumnTypeCategory::Numerical, addr))
|
||||
.collect();
|
||||
columns.extend(
|
||||
self.bytes_field_hash_map
|
||||
.iter()
|
||||
.map(|(term, addr, _)| (term, ColumnType::Bytes, addr)),
|
||||
.map(|(term, addr, _)| (term, ColumnTypeCategory::Bytes, addr)),
|
||||
);
|
||||
columns.extend(
|
||||
self.str_field_hash_map
|
||||
.iter()
|
||||
.map(|(column_name, addr, _)| (column_name, ColumnType::Str, addr)),
|
||||
.map(|(column_name, addr, _)| (column_name, ColumnTypeCategory::Str, addr)),
|
||||
);
|
||||
columns.extend(
|
||||
self.bool_field_hash_map
|
||||
.iter()
|
||||
.map(|(column_name, addr, _)| (column_name, ColumnType::Bool, addr)),
|
||||
.map(|(column_name, addr, _)| (column_name, ColumnTypeCategory::Bool, addr)),
|
||||
);
|
||||
columns.extend(
|
||||
self.ip_addr_field_hash_map
|
||||
.iter()
|
||||
.map(|(column_name, addr, _)| (column_name, ColumnType::IpAddr, addr)),
|
||||
.map(|(column_name, addr, _)| (column_name, ColumnTypeCategory::IpAddr, addr)),
|
||||
);
|
||||
columns.extend(
|
||||
self.datetime_field_hash_map
|
||||
.iter()
|
||||
.map(|(column_name, addr, _)| (column_name, ColumnType::DateTime, addr)),
|
||||
.map(|(column_name, addr, _)| (column_name, ColumnTypeCategory::DateTime, addr)),
|
||||
);
|
||||
columns.sort_unstable_by_key(|(column_name, col_type, _)| (*column_name, *col_type));
|
||||
|
||||
@@ -320,12 +315,8 @@ impl ColumnarWriter {
|
||||
let mut symbol_byte_buffer: Vec<u8> = Vec::new();
|
||||
for (column_name, column_type, addr) in columns {
|
||||
match column_type {
|
||||
ColumnType::Bool | ColumnType::DateTime => {
|
||||
let column_writer: ColumnWriter = if column_type == ColumnType::Bool {
|
||||
self.bool_field_hash_map.read(addr)
|
||||
} else {
|
||||
self.datetime_field_hash_map.read(addr)
|
||||
};
|
||||
ColumnTypeCategory::Bool => {
|
||||
let column_writer: ColumnWriter = self.bool_field_hash_map.read(addr);
|
||||
let cardinality = column_writer.get_cardinality(num_docs);
|
||||
let mut column_serializer =
|
||||
serializer.serialize_column(column_name, ColumnType::Bool);
|
||||
@@ -337,7 +328,7 @@ impl ColumnarWriter {
|
||||
&mut column_serializer,
|
||||
)?;
|
||||
}
|
||||
ColumnType::IpAddr => {
|
||||
ColumnTypeCategory::IpAddr => {
|
||||
let column_writer: ColumnWriter = self.ip_addr_field_hash_map.read(addr);
|
||||
let cardinality = column_writer.get_cardinality(num_docs);
|
||||
let mut column_serializer =
|
||||
@@ -350,36 +341,33 @@ impl ColumnarWriter {
|
||||
&mut column_serializer,
|
||||
)?;
|
||||
}
|
||||
ColumnType::Bytes | ColumnType::Str => {
|
||||
let str_or_bytes_column_writer: StrOrBytesColumnWriter =
|
||||
if column_type == ColumnType::Bytes {
|
||||
self.bytes_field_hash_map.read(addr)
|
||||
ColumnTypeCategory::Bytes | ColumnTypeCategory::Str => {
|
||||
let (column_type, str_column_writer): (ColumnType, StrOrBytesColumnWriter) =
|
||||
if column_type == ColumnTypeCategory::Bytes {
|
||||
(ColumnType::Bytes, self.bytes_field_hash_map.read(addr))
|
||||
} else {
|
||||
self.str_field_hash_map.read(addr)
|
||||
(ColumnType::Str, self.str_field_hash_map.read(addr))
|
||||
};
|
||||
let dictionary_builder =
|
||||
&dictionaries[str_or_bytes_column_writer.dictionary_id as usize];
|
||||
let cardinality = str_or_bytes_column_writer
|
||||
.column_writer
|
||||
.get_cardinality(num_docs);
|
||||
&dictionaries[str_column_writer.dictionary_id as usize];
|
||||
let cardinality = str_column_writer.column_writer.get_cardinality(num_docs);
|
||||
let mut column_serializer =
|
||||
serializer.serialize_column(column_name, column_type);
|
||||
serialize_bytes_or_str_column(
|
||||
cardinality,
|
||||
num_docs,
|
||||
str_or_bytes_column_writer.sort_values_within_row,
|
||||
str_column_writer.sort_values_within_row,
|
||||
dictionary_builder,
|
||||
str_or_bytes_column_writer
|
||||
.operation_iterator(arena, &mut symbol_byte_buffer),
|
||||
str_column_writer.operation_iterator(arena, &mut symbol_byte_buffer),
|
||||
buffers,
|
||||
&mut column_serializer,
|
||||
)?;
|
||||
}
|
||||
ColumnType::I64 | ColumnType::F64 | ColumnType::U64 => {
|
||||
ColumnTypeCategory::Numerical => {
|
||||
let numerical_column_writer: NumericalColumnWriter =
|
||||
self.numerical_field_hash_map.read(addr);
|
||||
let numerical_type = column_type.numerical_type().unwrap();
|
||||
let cardinality = numerical_column_writer.cardinality(num_docs);
|
||||
let (numerical_type, cardinality) =
|
||||
numerical_column_writer.column_type_and_cardinality(num_docs);
|
||||
let mut column_serializer =
|
||||
serializer.serialize_column(column_name, ColumnType::from(numerical_type));
|
||||
serialize_numerical_column(
|
||||
@@ -391,6 +379,20 @@ impl ColumnarWriter {
|
||||
&mut column_serializer,
|
||||
)?;
|
||||
}
|
||||
ColumnTypeCategory::DateTime => {
|
||||
let column_writer: ColumnWriter = self.datetime_field_hash_map.read(addr);
|
||||
let cardinality = column_writer.get_cardinality(num_docs);
|
||||
let mut column_serializer =
|
||||
serializer.serialize_column(column_name, ColumnType::DateTime);
|
||||
serialize_numerical_column(
|
||||
cardinality,
|
||||
num_docs,
|
||||
NumericalType::I64,
|
||||
column_writer.operation_iterator(arena, &mut symbol_byte_buffer),
|
||||
buffers,
|
||||
&mut column_serializer,
|
||||
)?;
|
||||
}
|
||||
};
|
||||
}
|
||||
serializer.finalize()?;
|
||||
|
||||
@@ -4,7 +4,7 @@ use std::rc::Rc;
|
||||
use std::sync::atomic::AtomicU32;
|
||||
use std::sync::Arc;
|
||||
|
||||
use columnar::{Column, StrColumn};
|
||||
use fastfield_codecs::Column;
|
||||
|
||||
use super::agg_req::{Aggregation, Aggregations, BucketAggregationType, MetricAggregation};
|
||||
use super::bucket::{HistogramAggregation, RangeAggregation, TermsAggregation};
|
||||
@@ -14,7 +14,8 @@ use super::metric::{
|
||||
};
|
||||
use super::segment_agg_result::BucketCount;
|
||||
use super::VecWithNames;
|
||||
use crate::schema::{FieldType, Type};
|
||||
use crate::fastfield::{type_and_cardinality, MultiValuedFastFieldReader};
|
||||
use crate::schema::Type;
|
||||
use crate::{InvertedIndexReader, SegmentReader, TantivyError};
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
@@ -36,12 +37,38 @@ impl AggregationsWithAccessor {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) enum FastFieldAccessor {
|
||||
Multi(MultiValuedFastFieldReader<u64>),
|
||||
Single(Arc<dyn Column<u64>>),
|
||||
}
|
||||
impl FastFieldAccessor {
|
||||
pub fn as_single(&self) -> Option<&dyn Column<u64>> {
|
||||
match self {
|
||||
FastFieldAccessor::Multi(_) => None,
|
||||
FastFieldAccessor::Single(reader) => Some(&**reader),
|
||||
}
|
||||
}
|
||||
pub fn into_single(self) -> Option<Arc<dyn Column<u64>>> {
|
||||
match self {
|
||||
FastFieldAccessor::Multi(_) => None,
|
||||
FastFieldAccessor::Single(reader) => Some(reader),
|
||||
}
|
||||
}
|
||||
pub fn as_multi(&self) -> Option<&MultiValuedFastFieldReader<u64>> {
|
||||
match self {
|
||||
FastFieldAccessor::Multi(reader) => Some(reader),
|
||||
FastFieldAccessor::Single(_) => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct BucketAggregationWithAccessor {
|
||||
/// In general there can be buckets without fast field access, e.g. buckets that are created
|
||||
/// based on search terms. So eventually this needs to be Option or moved.
|
||||
pub(crate) accessor: Column<u64>,
|
||||
pub(crate) str_dict_column: Option<StrColumn>,
|
||||
pub(crate) accessor: FastFieldAccessor,
|
||||
pub(crate) inverted_index: Option<Arc<InvertedIndexReader>>,
|
||||
pub(crate) field_type: Type,
|
||||
pub(crate) bucket_agg: BucketAggregationType,
|
||||
pub(crate) sub_aggregation: AggregationsWithAccessor,
|
||||
@@ -56,19 +83,20 @@ impl BucketAggregationWithAccessor {
|
||||
bucket_count: Rc<AtomicU32>,
|
||||
max_bucket_count: u32,
|
||||
) -> crate::Result<BucketAggregationWithAccessor> {
|
||||
let mut str_dict_column = None;
|
||||
let mut inverted_index = None;
|
||||
let (accessor, field_type) = match &bucket {
|
||||
BucketAggregationType::Range(RangeAggregation {
|
||||
field: field_name, ..
|
||||
}) => get_ff_reader_and_validate(reader, field_name)?,
|
||||
}) => get_ff_reader_and_validate(reader, field_name, Cardinality::SingleValue)?,
|
||||
BucketAggregationType::Histogram(HistogramAggregation {
|
||||
field: field_name, ..
|
||||
}) => get_ff_reader_and_validate(reader, field_name)?,
|
||||
}) => get_ff_reader_and_validate(reader, field_name, Cardinality::SingleValue)?,
|
||||
BucketAggregationType::Terms(TermsAggregation {
|
||||
field: field_name, ..
|
||||
}) => {
|
||||
str_dict_column = reader.fast_fields().str(&field_name)?;
|
||||
get_ff_reader_and_validate(reader, field_name)?
|
||||
let field = reader.schema().get_field(field_name)?;
|
||||
inverted_index = Some(reader.inverted_index(field)?);
|
||||
get_ff_reader_and_validate(reader, field_name, Cardinality::MultiValues)?
|
||||
}
|
||||
};
|
||||
let sub_aggregation = sub_aggregation.clone();
|
||||
@@ -82,7 +110,7 @@ impl BucketAggregationWithAccessor {
|
||||
max_bucket_count,
|
||||
)?,
|
||||
bucket_agg: bucket.clone(),
|
||||
str_dict_column,
|
||||
inverted_index,
|
||||
bucket_count: BucketCount {
|
||||
bucket_count,
|
||||
max_bucket_count,
|
||||
@@ -96,7 +124,7 @@ impl BucketAggregationWithAccessor {
|
||||
pub struct MetricAggregationWithAccessor {
|
||||
pub metric: MetricAggregation,
|
||||
pub field_type: Type,
|
||||
pub accessor: Column<u64>,
|
||||
pub accessor: Arc<dyn Column>,
|
||||
}
|
||||
|
||||
impl MetricAggregationWithAccessor {
|
||||
@@ -111,10 +139,13 @@ impl MetricAggregationWithAccessor {
|
||||
| MetricAggregation::Min(MinAggregation { field: field_name })
|
||||
| MetricAggregation::Stats(StatsAggregation { field: field_name })
|
||||
| MetricAggregation::Sum(SumAggregation { field: field_name }) => {
|
||||
let (accessor, field_type) = get_ff_reader_and_validate(reader, field_name)?;
|
||||
let (accessor, field_type) =
|
||||
get_ff_reader_and_validate(reader, field_name, Cardinality::SingleValue)?;
|
||||
|
||||
Ok(MetricAggregationWithAccessor {
|
||||
accessor,
|
||||
accessor: accessor
|
||||
.into_single()
|
||||
.expect("unexpected fast field cardinality"),
|
||||
field_type,
|
||||
metric: metric.clone(),
|
||||
})
|
||||
@@ -159,19 +190,32 @@ pub(crate) fn get_aggs_with_accessor_and_validate(
|
||||
fn get_ff_reader_and_validate(
|
||||
reader: &SegmentReader,
|
||||
field_name: &str,
|
||||
) -> crate::Result<(columnar::Column<u64>, Type)> {
|
||||
cardinality: Cardinality,
|
||||
) -> crate::Result<(FastFieldAccessor, Type)> {
|
||||
let field = reader.schema().get_field(field_name)?;
|
||||
// TODO we should get type metadata from columnar
|
||||
let field_type = reader
|
||||
.schema()
|
||||
.get_field_entry(field)
|
||||
.field_type()
|
||||
.value_type();
|
||||
// TODO Do validation
|
||||
let field_type = reader.schema().get_field_entry(field).field_type();
|
||||
|
||||
if let Some((_ff_type, field_cardinality)) = type_and_cardinality(field_type) {
|
||||
if cardinality != field_cardinality {
|
||||
return Err(TantivyError::InvalidArgument(format!(
|
||||
"Invalid field cardinality on field {} expected {:?}, but got {:?}",
|
||||
field_name, cardinality, field_cardinality
|
||||
)));
|
||||
}
|
||||
} else {
|
||||
return Err(TantivyError::InvalidArgument(format!(
|
||||
"Only fast fields of type f64, u64, i64 are supported, but got {:?} ",
|
||||
field_type.value_type()
|
||||
)));
|
||||
};
|
||||
|
||||
let ff_fields = reader.fast_fields();
|
||||
let ff_field = ff_fields.u64_lenient(field_name)?.ok_or_else(|| {
|
||||
TantivyError::InvalidArgument(format!("No fast field found for field: {}", field_name))
|
||||
})?;
|
||||
Ok((ff_field, field_type))
|
||||
match cardinality {
|
||||
Cardinality::SingleValue => ff_fields
|
||||
.u64_lenient(field_name)
|
||||
.map(|field| (FastFieldAccessor::Single(field), field_type.value_type())),
|
||||
Cardinality::MultiValues => ff_fields
|
||||
.u64s_lenient(field_name)
|
||||
.map(|field| (FastFieldAccessor::Multi(field), field_type.value_type())),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,123 +0,0 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// DateHistogramAggregation is similar to `HistogramAggregation`, but it can only be used with date type.
|
||||
///
|
||||
/// Currently only **fixed time** intervals are supported. Calendar-aware time intervals are not
|
||||
/// supported.
|
||||
///
|
||||
/// Like the histogram, values are rounded down into the closest bucket.
|
||||
///
|
||||
/// For this calculation all fastfield values are converted to f64.
|
||||
///
|
||||
/// # Limitations/Compatibility
|
||||
/// Only fixed time intervals are supported.
|
||||
///
|
||||
/// # JSON Format
|
||||
/// ```json
|
||||
/// {
|
||||
/// "prices": {
|
||||
/// "date_histogram": {
|
||||
/// "field": "price",
|
||||
/// "fixed_interval": "30d"
|
||||
/// }
|
||||
/// }
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
/// Response
|
||||
/// See [`BucketEntry`](crate::aggregation::agg_result::BucketEntry)
|
||||
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
|
||||
pub struct DateHistogramAggregationReq {
|
||||
/// The field to aggregate on.
|
||||
pub field: String,
|
||||
/// The interval to chunk your data range. Each bucket spans a value range of [0..fixed_interval).
|
||||
/// Accepted values
|
||||
///
|
||||
/// Fixed intervals are configured with the `fixed_interval` parameter.
|
||||
/// In contrast to calendar-aware intervals, fixed intervals are a fixed number of SI units and never deviate, regardless of where they fall on the calendar.
|
||||
/// One second is always composed of 1000ms. This allows fixed intervals to be specified in any multiple of the supported units.
|
||||
/// However, it means fixed intervals cannot express other units such as months, since the duration of a month is not a fixed quantity.
|
||||
/// Attempting to specify a calendar interval like month or quarter will return an Error.
|
||||
///
|
||||
/// The accepted units for fixed intervals are:
|
||||
/// * `ms`: milliseconds
|
||||
/// * `s`: seconds. Defined as 1000 milliseconds each.
|
||||
/// * `m`: minutes. Defined as 60 seconds each (60_000 milliseconds).
|
||||
/// * `h`: hours. Defined as 60 minutes each (3_600_000 milliseconds).
|
||||
/// * `d`: days. Defined as 24 hours (86_400_000 milliseconds).
|
||||
///
|
||||
/// Fractional time values are not supported, but you can address this by shifting to another time unit
|
||||
/// (e.g., `1.5h` could instead be specified as `90m`).
|
||||
pub fixed_interval: String,
|
||||
/// Intervals implicitly defines an absolute grid of buckets `[interval * k, interval * (k + 1))`.
|
||||
///
|
||||
pub offset: Option<String>,
|
||||
/// Whether to return the buckets as a hash map
|
||||
#[serde(default)]
|
||||
pub keyed: bool,
|
||||
}
|
||||
|
||||
impl DateHistogramAggregationReq {
|
||||
fn validate(&self) -> crate::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
/// Errors when parsing the fixed interval for `DateHistogramAggregationReq`.
|
||||
pub enum DateHistogramParseError {
|
||||
/// Unit not recognized in passed String
|
||||
UnitNotRecognized(String),
|
||||
/// Number not found in passed String
|
||||
NumberMissing(String),
|
||||
/// Unit not found in passed String
|
||||
UnitMissing(String),
|
||||
}
|
||||
|
||||
fn parse_into_milliseconds(input: &str) -> Result<u64, DateHistogramParseError> {
|
||||
let split_boundary = input
|
||||
.char_indices()
|
||||
.take_while(|(pos, el)| el.is_numeric())
|
||||
.count();
|
||||
let (number, unit) = input.split_at(split_boundary);
|
||||
if number.is_empty() {
|
||||
return Err(DateHistogramParseError::NumberMissing(input.to_string()));
|
||||
}
|
||||
if unit.is_empty() {
|
||||
return Err(DateHistogramParseError::UnitMissing(input.to_string()));
|
||||
}
|
||||
let number: u64 = number.parse().unwrap();
|
||||
let multiplier_from_unit = match unit {
|
||||
"ms" => 1,
|
||||
"s" => 1000,
|
||||
"m" => 60 * 1000,
|
||||
"h" => 60 * 60 * 1000,
|
||||
"d" => 24 * 60 * 60 * 1000,
|
||||
_ => return Err(DateHistogramParseError::UnitNotRecognized(unit.to_string())),
|
||||
};
|
||||
|
||||
Ok(number * multiplier_from_unit)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn parser_test() {
|
||||
assert_eq!(parse_into_milliseconds("1m").unwrap(), 60_000);
|
||||
assert_eq!(parse_into_milliseconds("2m").unwrap(), 120_000);
|
||||
assert_eq!(
|
||||
parse_into_milliseconds("2y").unwrap_err(),
|
||||
DateHistogramParseError::UnitNotRecognized("y".to_string())
|
||||
);
|
||||
assert_eq!(
|
||||
parse_into_milliseconds("2000").unwrap_err(),
|
||||
DateHistogramParseError::UnitMissing("2000".to_string())
|
||||
);
|
||||
assert_eq!(
|
||||
parse_into_milliseconds("ms").unwrap_err(),
|
||||
DateHistogramParseError::NumberMissing("ms".to_string())
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::cmp::Ordering;
|
||||
use std::fmt::Display;
|
||||
|
||||
use columnar::Column;
|
||||
use fastfield_codecs::Column;
|
||||
use itertools::Itertools;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
@@ -13,9 +13,7 @@ use crate::aggregation::agg_result::BucketEntry;
|
||||
use crate::aggregation::intermediate_agg_result::{
|
||||
IntermediateAggregationResults, IntermediateBucketResult, IntermediateHistogramBucketEntry,
|
||||
};
|
||||
use crate::aggregation::segment_agg_result::{
|
||||
GenericSegmentAggregationResultsCollector, SegmentAggregationCollector,
|
||||
};
|
||||
use crate::aggregation::segment_agg_result::SegmentAggregationResultsCollector;
|
||||
use crate::aggregation::{f64_from_fastfield_u64, format_date};
|
||||
use crate::schema::{Schema, Type};
|
||||
use crate::{DocId, TantivyError};
|
||||
@@ -64,6 +62,7 @@ use crate::{DocId, TantivyError};
|
||||
///
|
||||
/// Response
|
||||
/// See [`BucketEntry`](crate::aggregation::agg_result::BucketEntry)
|
||||
|
||||
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
|
||||
pub struct HistogramAggregation {
|
||||
/// The field to aggregate on.
|
||||
@@ -185,7 +184,7 @@ pub(crate) struct SegmentHistogramBucketEntry {
|
||||
impl SegmentHistogramBucketEntry {
|
||||
pub(crate) fn into_intermediate_bucket_entry(
|
||||
self,
|
||||
sub_aggregation: GenericSegmentAggregationResultsCollector,
|
||||
sub_aggregation: SegmentAggregationResultsCollector,
|
||||
agg_with_accessor: &AggregationsWithAccessor,
|
||||
) -> crate::Result<IntermediateHistogramBucketEntry> {
|
||||
Ok(IntermediateHistogramBucketEntry {
|
||||
@@ -199,11 +198,11 @@ impl SegmentHistogramBucketEntry {
|
||||
|
||||
/// The collector puts values from the fast field into the correct buckets and does a conversion to
|
||||
/// the correct datatype.
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub struct SegmentHistogramCollector {
|
||||
/// The buckets containing the aggregation data.
|
||||
buckets: Vec<SegmentHistogramBucketEntry>,
|
||||
sub_aggregations: Option<Vec<GenericSegmentAggregationResultsCollector>>,
|
||||
sub_aggregations: Option<Vec<SegmentAggregationResultsCollector>>,
|
||||
field_type: Type,
|
||||
interval: f64,
|
||||
offset: f64,
|
||||
@@ -284,7 +283,7 @@ impl SegmentHistogramCollector {
|
||||
req: &HistogramAggregation,
|
||||
sub_aggregation: &AggregationsWithAccessor,
|
||||
field_type: Type,
|
||||
accessor: &Column<u64>,
|
||||
accessor: &dyn Column<u64>,
|
||||
) -> crate::Result<Self> {
|
||||
req.validate()?;
|
||||
let min = f64_from_fastfield_u64(accessor.min_value(), &field_type);
|
||||
@@ -301,7 +300,7 @@ impl SegmentHistogramCollector {
|
||||
None
|
||||
} else {
|
||||
let sub_aggregation =
|
||||
GenericSegmentAggregationResultsCollector::from_req_and_validate(sub_aggregation)?;
|
||||
SegmentAggregationResultsCollector::from_req_and_validate(sub_aggregation)?;
|
||||
Some(buckets.iter().map(|_| sub_aggregation.clone()).collect())
|
||||
};
|
||||
|
||||
@@ -336,7 +335,7 @@ impl SegmentHistogramCollector {
|
||||
#[inline]
|
||||
pub(crate) fn collect_block(
|
||||
&mut self,
|
||||
docs: &[DocId],
|
||||
doc: &[DocId],
|
||||
bucket_with_accessor: &BucketAggregationWithAccessor,
|
||||
force_flush: bool,
|
||||
) -> crate::Result<()> {
|
||||
@@ -347,20 +346,64 @@ impl SegmentHistogramCollector {
|
||||
let get_bucket_num =
|
||||
|val| (get_bucket_num_f64(val, interval, offset) as i64 - first_bucket_num) as usize;
|
||||
|
||||
let accessor = &bucket_with_accessor.accessor;
|
||||
for doc in docs {
|
||||
for val in accessor.values(*doc) {
|
||||
let val = self.f64_from_fastfield_u64(val);
|
||||
let accessor = bucket_with_accessor
|
||||
.accessor
|
||||
.as_single()
|
||||
.expect("unexpected fast field cardinatility");
|
||||
let mut iter = doc.chunks_exact(4);
|
||||
for docs in iter.by_ref() {
|
||||
let val0 = self.f64_from_fastfield_u64(accessor.get_val(docs[0]));
|
||||
let val1 = self.f64_from_fastfield_u64(accessor.get_val(docs[1]));
|
||||
let val2 = self.f64_from_fastfield_u64(accessor.get_val(docs[2]));
|
||||
let val3 = self.f64_from_fastfield_u64(accessor.get_val(docs[3]));
|
||||
|
||||
let bucket_pos = get_bucket_num(val);
|
||||
self.increment_bucket_if_in_bounds(
|
||||
val,
|
||||
&bounds,
|
||||
bucket_pos,
|
||||
*doc,
|
||||
&bucket_with_accessor.sub_aggregation,
|
||||
)?;
|
||||
let bucket_pos0 = get_bucket_num(val0);
|
||||
let bucket_pos1 = get_bucket_num(val1);
|
||||
let bucket_pos2 = get_bucket_num(val2);
|
||||
let bucket_pos3 = get_bucket_num(val3);
|
||||
|
||||
self.increment_bucket_if_in_bounds(
|
||||
val0,
|
||||
&bounds,
|
||||
bucket_pos0,
|
||||
docs[0],
|
||||
&bucket_with_accessor.sub_aggregation,
|
||||
)?;
|
||||
self.increment_bucket_if_in_bounds(
|
||||
val1,
|
||||
&bounds,
|
||||
bucket_pos1,
|
||||
docs[1],
|
||||
&bucket_with_accessor.sub_aggregation,
|
||||
)?;
|
||||
self.increment_bucket_if_in_bounds(
|
||||
val2,
|
||||
&bounds,
|
||||
bucket_pos2,
|
||||
docs[2],
|
||||
&bucket_with_accessor.sub_aggregation,
|
||||
)?;
|
||||
self.increment_bucket_if_in_bounds(
|
||||
val3,
|
||||
&bounds,
|
||||
bucket_pos3,
|
||||
docs[3],
|
||||
&bucket_with_accessor.sub_aggregation,
|
||||
)?;
|
||||
}
|
||||
for &doc in iter.remainder() {
|
||||
let val = f64_from_fastfield_u64(accessor.get_val(doc), &self.field_type);
|
||||
if !bounds.contains(val) {
|
||||
continue;
|
||||
}
|
||||
let bucket_pos = (get_bucket_num_f64(val, self.interval, self.offset) as i64
|
||||
- self.first_bucket_num) as usize;
|
||||
|
||||
debug_assert_eq!(
|
||||
self.buckets[bucket_pos].key,
|
||||
get_bucket_val(val, self.interval, self.offset)
|
||||
);
|
||||
self.increment_bucket(bucket_pos, doc, &bucket_with_accessor.sub_aggregation)?;
|
||||
}
|
||||
if force_flush {
|
||||
if let Some(sub_aggregations) = self.sub_aggregations.as_mut() {
|
||||
|
||||
@@ -1,4 +1,2 @@
|
||||
mod date_histogram;
|
||||
mod histogram;
|
||||
pub use date_histogram::*;
|
||||
pub use histogram::*;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::fmt::Debug;
|
||||
use std::ops::Range;
|
||||
|
||||
use columnar::MonotonicallyMappableToU64;
|
||||
use fastfield_codecs::MonotonicallyMappableToU64;
|
||||
use rustc_hash::FxHashMap;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
@@ -11,9 +11,7 @@ use crate::aggregation::agg_req_with_accessor::{
|
||||
use crate::aggregation::intermediate_agg_result::{
|
||||
IntermediateBucketResult, IntermediateRangeBucketEntry, IntermediateRangeBucketResult,
|
||||
};
|
||||
use crate::aggregation::segment_agg_result::{
|
||||
BucketCount, GenericSegmentAggregationResultsCollector, SegmentAggregationCollector,
|
||||
};
|
||||
use crate::aggregation::segment_agg_result::{BucketCount, SegmentAggregationResultsCollector};
|
||||
use crate::aggregation::{
|
||||
f64_from_fastfield_u64, f64_to_fastfield_u64, format_date, Key, SerializedKey,
|
||||
};
|
||||
@@ -116,7 +114,7 @@ impl From<Range<u64>> for InternalRangeAggregationRange {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub(crate) struct SegmentRangeAndBucketEntry {
|
||||
range: Range<u64>,
|
||||
bucket: SegmentRangeBucketEntry,
|
||||
@@ -124,18 +122,18 @@ pub(crate) struct SegmentRangeAndBucketEntry {
|
||||
|
||||
/// The collector puts values from the fast field into the correct buckets and does a conversion to
|
||||
/// the correct datatype.
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub struct SegmentRangeCollector {
|
||||
/// The buckets containing the aggregation data.
|
||||
buckets: Vec<SegmentRangeAndBucketEntry>,
|
||||
field_type: Type,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
#[derive(Clone, PartialEq)]
|
||||
pub(crate) struct SegmentRangeBucketEntry {
|
||||
pub key: Key,
|
||||
pub doc_count: u64,
|
||||
pub sub_aggregation: Option<GenericSegmentAggregationResultsCollector>,
|
||||
pub sub_aggregation: Option<SegmentAggregationResultsCollector>,
|
||||
/// The from range of the bucket. Equals `f64::MIN` when `None`.
|
||||
pub from: Option<f64>,
|
||||
/// The to range of the bucket. Equals `f64::MAX` when `None`. Open interval, `to` is not
|
||||
@@ -229,11 +227,9 @@ impl SegmentRangeCollector {
|
||||
let sub_aggregation = if sub_aggregation.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(
|
||||
GenericSegmentAggregationResultsCollector::from_req_and_validate(
|
||||
sub_aggregation,
|
||||
)?,
|
||||
)
|
||||
Some(SegmentAggregationResultsCollector::from_req_and_validate(
|
||||
sub_aggregation,
|
||||
)?)
|
||||
};
|
||||
|
||||
Ok(SegmentRangeAndBucketEntry {
|
||||
@@ -261,18 +257,35 @@ impl SegmentRangeCollector {
|
||||
#[inline]
|
||||
pub(crate) fn collect_block(
|
||||
&mut self,
|
||||
docs: &[DocId],
|
||||
doc: &[DocId],
|
||||
bucket_with_accessor: &BucketAggregationWithAccessor,
|
||||
force_flush: bool,
|
||||
) -> crate::Result<()> {
|
||||
let accessor = &bucket_with_accessor.accessor;
|
||||
for doc in docs {
|
||||
for val in accessor.values(*doc) {
|
||||
let bucket_pos = self.get_bucket_pos(val);
|
||||
self.increment_bucket(bucket_pos, *doc, &bucket_with_accessor.sub_aggregation)?;
|
||||
}
|
||||
}
|
||||
let mut iter = doc.chunks_exact(4);
|
||||
let accessor = bucket_with_accessor
|
||||
.accessor
|
||||
.as_single()
|
||||
.expect("unexpected fast field cardinality");
|
||||
for docs in iter.by_ref() {
|
||||
let val1 = accessor.get_val(docs[0]);
|
||||
let val2 = accessor.get_val(docs[1]);
|
||||
let val3 = accessor.get_val(docs[2]);
|
||||
let val4 = accessor.get_val(docs[3]);
|
||||
let bucket_pos1 = self.get_bucket_pos(val1);
|
||||
let bucket_pos2 = self.get_bucket_pos(val2);
|
||||
let bucket_pos3 = self.get_bucket_pos(val3);
|
||||
let bucket_pos4 = self.get_bucket_pos(val4);
|
||||
|
||||
self.increment_bucket(bucket_pos1, docs[0], &bucket_with_accessor.sub_aggregation)?;
|
||||
self.increment_bucket(bucket_pos2, docs[1], &bucket_with_accessor.sub_aggregation)?;
|
||||
self.increment_bucket(bucket_pos3, docs[2], &bucket_with_accessor.sub_aggregation)?;
|
||||
self.increment_bucket(bucket_pos4, docs[3], &bucket_with_accessor.sub_aggregation)?;
|
||||
}
|
||||
for &doc in iter.remainder() {
|
||||
let val = accessor.get_val(doc);
|
||||
let bucket_pos = self.get_bucket_pos(val);
|
||||
self.increment_bucket(bucket_pos, doc, &bucket_with_accessor.sub_aggregation)?;
|
||||
}
|
||||
if force_flush {
|
||||
for bucket in &mut self.buckets {
|
||||
if let Some(sub_aggregation) = &mut bucket.bucket.sub_aggregation {
|
||||
@@ -421,7 +434,7 @@ pub(crate) fn range_to_key(range: &Range<u64>, field_type: &Type) -> crate::Resu
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use columnar::MonotonicallyMappableToU64;
|
||||
use fastfield_codecs::MonotonicallyMappableToU64;
|
||||
use serde_json::Value;
|
||||
|
||||
use super::*;
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use std::fmt::Debug;
|
||||
|
||||
use columnar::Column;
|
||||
use itertools::Itertools;
|
||||
use rustc_hash::FxHashMap;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -12,11 +11,9 @@ use crate::aggregation::agg_req_with_accessor::{
|
||||
use crate::aggregation::intermediate_agg_result::{
|
||||
IntermediateBucketResult, IntermediateTermBucketEntry, IntermediateTermBucketResult,
|
||||
};
|
||||
use crate::aggregation::segment_agg_result::{
|
||||
build_segment_agg_collector, GenericSegmentAggregationResultsCollector,
|
||||
SegmentAggregationCollector,
|
||||
};
|
||||
use crate::aggregation::segment_agg_result::{BucketCount, SegmentAggregationResultsCollector};
|
||||
use crate::error::DataCorruption;
|
||||
use crate::fastfield::MultiValuedFastFieldReader;
|
||||
use crate::schema::Type;
|
||||
use crate::{DocId, TantivyError};
|
||||
|
||||
@@ -199,16 +196,17 @@ impl TermsAggregationInternal {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
/// Container to store term_ids and their buckets.
|
||||
struct TermBuckets {
|
||||
pub(crate) entries: FxHashMap<u32, TermBucketEntry>,
|
||||
blueprint: Option<SegmentAggregationResultsCollector>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
#[derive(Clone, PartialEq, Default)]
|
||||
struct TermBucketEntry {
|
||||
doc_count: u64,
|
||||
sub_aggregations: Option<Box<dyn SegmentAggregationCollector>>,
|
||||
sub_aggregations: Option<SegmentAggregationResultsCollector>,
|
||||
}
|
||||
|
||||
impl Debug for TermBucketEntry {
|
||||
@@ -220,7 +218,7 @@ impl Debug for TermBucketEntry {
|
||||
}
|
||||
|
||||
impl TermBucketEntry {
|
||||
fn from_blueprint(blueprint: &Option<Box<dyn SegmentAggregationCollector>>) -> Self {
|
||||
fn from_blueprint(blueprint: &Option<SegmentAggregationResultsCollector>) -> Self {
|
||||
Self {
|
||||
doc_count: 0,
|
||||
sub_aggregations: blueprint.clone(),
|
||||
@@ -249,11 +247,46 @@ impl TermBuckets {
|
||||
sub_aggregation: &AggregationsWithAccessor,
|
||||
_max_term_id: usize,
|
||||
) -> crate::Result<Self> {
|
||||
let has_sub_aggregations = sub_aggregation.is_empty();
|
||||
|
||||
let blueprint = if has_sub_aggregations {
|
||||
let sub_aggregation =
|
||||
SegmentAggregationResultsCollector::from_req_and_validate(sub_aggregation)?;
|
||||
Some(sub_aggregation)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Ok(TermBuckets {
|
||||
blueprint,
|
||||
entries: Default::default(),
|
||||
})
|
||||
}
|
||||
|
||||
fn increment_bucket(
|
||||
&mut self,
|
||||
term_ids: &[u64],
|
||||
doc: DocId,
|
||||
sub_aggregation: &AggregationsWithAccessor,
|
||||
bucket_count: &BucketCount,
|
||||
blueprint: &Option<SegmentAggregationResultsCollector>,
|
||||
) -> crate::Result<()> {
|
||||
for &term_id in term_ids {
|
||||
let entry = self.entries.entry(term_id as u32).or_insert_with(|| {
|
||||
bucket_count.add_count(1);
|
||||
|
||||
TermBucketEntry::from_blueprint(blueprint)
|
||||
});
|
||||
entry.doc_count += 1;
|
||||
if let Some(sub_aggregations) = entry.sub_aggregations.as_mut() {
|
||||
sub_aggregations.collect(doc, sub_aggregation)?;
|
||||
}
|
||||
}
|
||||
bucket_count.validate_bucket_count()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn force_flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> {
|
||||
for entry in &mut self.entries.values_mut() {
|
||||
if let Some(sub_aggregations) = entry.sub_aggregations.as_mut() {
|
||||
@@ -266,12 +299,13 @@ impl TermBuckets {
|
||||
|
||||
/// The collector puts values from the fast field into the correct buckets and does a conversion to
|
||||
/// the correct datatype.
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub struct SegmentTermCollector {
|
||||
/// The buckets containing the aggregation data.
|
||||
term_buckets: TermBuckets,
|
||||
req: TermsAggregationInternal,
|
||||
blueprint: Option<Box<dyn SegmentAggregationCollector>>,
|
||||
field_type: Type,
|
||||
blueprint: Option<SegmentAggregationResultsCollector>,
|
||||
}
|
||||
|
||||
pub(crate) fn get_agg_name_and_property(name: &str) -> (&str, &str) {
|
||||
@@ -283,8 +317,12 @@ impl SegmentTermCollector {
|
||||
pub(crate) fn from_req_and_validate(
|
||||
req: &TermsAggregation,
|
||||
sub_aggregations: &AggregationsWithAccessor,
|
||||
field_type: Type,
|
||||
accessor: &MultiValuedFastFieldReader<u64>,
|
||||
) -> crate::Result<Self> {
|
||||
let term_buckets = TermBuckets::default();
|
||||
let max_term_id = accessor.max_value();
|
||||
let term_buckets =
|
||||
TermBuckets::from_req_and_validate(sub_aggregations, max_term_id as usize)?;
|
||||
|
||||
if let Some(custom_order) = req.order.as_ref() {
|
||||
// Validate sub aggregtion exists
|
||||
@@ -302,7 +340,8 @@ impl SegmentTermCollector {
|
||||
|
||||
let has_sub_aggregations = !sub_aggregations.is_empty();
|
||||
let blueprint = if has_sub_aggregations {
|
||||
let sub_aggregation = build_segment_agg_collector(sub_aggregations)?;
|
||||
let sub_aggregation =
|
||||
SegmentAggregationResultsCollector::from_req_and_validate(sub_aggregations)?;
|
||||
Some(sub_aggregation)
|
||||
} else {
|
||||
None
|
||||
@@ -311,6 +350,7 @@ impl SegmentTermCollector {
|
||||
Ok(SegmentTermCollector {
|
||||
req: TermsAggregationInternal::from_req(req),
|
||||
term_buckets,
|
||||
field_type,
|
||||
blueprint,
|
||||
})
|
||||
}
|
||||
@@ -328,14 +368,7 @@ impl SegmentTermCollector {
|
||||
|
||||
match self.req.order.target {
|
||||
OrderTarget::Key => {
|
||||
// We rely on the fact, that term ordinals match the order of the strings
|
||||
// TODO: We could have a special collector, that keeps only TOP n results at any
|
||||
// time.
|
||||
if self.req.order.order == Order::Desc {
|
||||
entries.sort_unstable_by_key(|bucket| std::cmp::Reverse(bucket.0));
|
||||
} else {
|
||||
entries.sort_unstable_by_key(|bucket| bucket.0);
|
||||
}
|
||||
// defer order and cut_off after loading the texts from the dictionary
|
||||
}
|
||||
OrderTarget::SubAggregation(_name) => {
|
||||
// don't sort and cut off since it's hard to make assumptions on the quality of the
|
||||
@@ -351,40 +384,34 @@ impl SegmentTermCollector {
|
||||
}
|
||||
}
|
||||
|
||||
let (term_doc_count_before_cutoff, mut sum_other_doc_count) = if order_by_sub_aggregation {
|
||||
(0, 0)
|
||||
} else {
|
||||
cut_off_buckets(&mut entries, self.req.segment_size as usize)
|
||||
};
|
||||
let (term_doc_count_before_cutoff, mut sum_other_doc_count) =
|
||||
if order_by_key || order_by_sub_aggregation {
|
||||
(0, 0)
|
||||
} else {
|
||||
cut_off_buckets(&mut entries, self.req.segment_size as usize)
|
||||
};
|
||||
|
||||
let inverted_index = agg_with_accessor
|
||||
.str_dict_column
|
||||
.inverted_index
|
||||
.as_ref()
|
||||
.expect("internal error: inverted index not loaded for term aggregation");
|
||||
let term_dict = inverted_index;
|
||||
let term_dict = inverted_index.terms();
|
||||
|
||||
let mut dict: FxHashMap<String, IntermediateTermBucketEntry> = Default::default();
|
||||
let mut buffer = String::new();
|
||||
let mut buffer = vec![];
|
||||
for (term_id, entry) in entries {
|
||||
if !term_dict.ord_to_str(term_id as u64, &mut buffer)? {
|
||||
return Err(TantivyError::InternalError(format!(
|
||||
"Couldn't find term_id {} in dict",
|
||||
term_id
|
||||
)));
|
||||
}
|
||||
term_dict
|
||||
.ord_to_term(term_id as u64, &mut buffer)
|
||||
.expect("could not find term");
|
||||
dict.insert(
|
||||
buffer.to_string(),
|
||||
String::from_utf8(buffer.to_vec())
|
||||
.map_err(|utf8_err| DataCorruption::comment_only(utf8_err.to_string()))?,
|
||||
entry.into_intermediate_bucket_entry(&agg_with_accessor.sub_aggregation)?,
|
||||
);
|
||||
}
|
||||
if self.req.min_doc_count == 0 {
|
||||
// TODO: Handle rev streaming for descending sorting by keys
|
||||
let mut stream = term_dict.dictionary().stream()?;
|
||||
let mut stream = term_dict.stream()?;
|
||||
while let Some((key, _ord)) = stream.next() {
|
||||
if dict.len() >= self.req.segment_size as usize {
|
||||
break;
|
||||
}
|
||||
|
||||
let key = std::str::from_utf8(key)
|
||||
.map_err(|utf8_err| DataCorruption::comment_only(utf8_err.to_string()))?;
|
||||
if !dict.contains_key(key) {
|
||||
@@ -419,26 +446,65 @@ impl SegmentTermCollector {
|
||||
#[inline]
|
||||
pub(crate) fn collect_block(
|
||||
&mut self,
|
||||
docs: &[DocId],
|
||||
doc: &[DocId],
|
||||
bucket_with_accessor: &BucketAggregationWithAccessor,
|
||||
force_flush: bool,
|
||||
) -> crate::Result<()> {
|
||||
let accessor = &bucket_with_accessor.accessor;
|
||||
let accessor = bucket_with_accessor
|
||||
.accessor
|
||||
.as_multi()
|
||||
.expect("unexpected fast field cardinatility");
|
||||
let mut iter = doc.chunks_exact(4);
|
||||
let mut vals1 = vec![];
|
||||
let mut vals2 = vec![];
|
||||
let mut vals3 = vec![];
|
||||
let mut vals4 = vec![];
|
||||
for docs in iter.by_ref() {
|
||||
accessor.get_vals(docs[0], &mut vals1);
|
||||
accessor.get_vals(docs[1], &mut vals2);
|
||||
accessor.get_vals(docs[2], &mut vals3);
|
||||
accessor.get_vals(docs[3], &mut vals4);
|
||||
|
||||
for doc in docs {
|
||||
for term_id in accessor.values(*doc) {
|
||||
let entry = self
|
||||
.term_buckets
|
||||
.entries
|
||||
.entry(term_id as u32)
|
||||
.or_insert_with(|| TermBucketEntry::from_blueprint(&self.blueprint));
|
||||
entry.doc_count += 1;
|
||||
if let Some(sub_aggregations) = entry.sub_aggregations.as_mut() {
|
||||
sub_aggregations.collect(*doc, &bucket_with_accessor.sub_aggregation)?;
|
||||
}
|
||||
}
|
||||
self.term_buckets.increment_bucket(
|
||||
&vals1,
|
||||
docs[0],
|
||||
&bucket_with_accessor.sub_aggregation,
|
||||
&bucket_with_accessor.bucket_count,
|
||||
&self.blueprint,
|
||||
)?;
|
||||
self.term_buckets.increment_bucket(
|
||||
&vals2,
|
||||
docs[1],
|
||||
&bucket_with_accessor.sub_aggregation,
|
||||
&bucket_with_accessor.bucket_count,
|
||||
&self.blueprint,
|
||||
)?;
|
||||
self.term_buckets.increment_bucket(
|
||||
&vals3,
|
||||
docs[2],
|
||||
&bucket_with_accessor.sub_aggregation,
|
||||
&bucket_with_accessor.bucket_count,
|
||||
&self.blueprint,
|
||||
)?;
|
||||
self.term_buckets.increment_bucket(
|
||||
&vals4,
|
||||
docs[3],
|
||||
&bucket_with_accessor.sub_aggregation,
|
||||
&bucket_with_accessor.bucket_count,
|
||||
&self.blueprint,
|
||||
)?;
|
||||
}
|
||||
for &doc in iter.remainder() {
|
||||
accessor.get_vals(doc, &mut vals1);
|
||||
|
||||
self.term_buckets.increment_bucket(
|
||||
&vals1,
|
||||
doc,
|
||||
&bucket_with_accessor.sub_aggregation,
|
||||
&bucket_with_accessor.bucket_count,
|
||||
&self.blueprint,
|
||||
)?;
|
||||
}
|
||||
if force_flush {
|
||||
self.term_buckets
|
||||
.force_flush(&bucket_with_accessor.sub_aggregation)?;
|
||||
@@ -1141,37 +1207,36 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// TODO reenable with memory limit
|
||||
//#[test]
|
||||
// fn terms_aggregation_term_bucket_limit() -> crate::Result<()> {
|
||||
// let terms: Vec<String> = (0..100_000).map(|el| el.to_string()).collect();
|
||||
// let terms_per_segment = vec![terms.iter().map(|el| el.as_str()).collect()];
|
||||
#[test]
|
||||
fn terms_aggregation_term_bucket_limit() -> crate::Result<()> {
|
||||
let terms: Vec<String> = (0..100_000).map(|el| el.to_string()).collect();
|
||||
let terms_per_segment = vec![terms.iter().map(|el| el.as_str()).collect()];
|
||||
|
||||
// let index = get_test_index_from_terms(true, &terms_per_segment)?;
|
||||
let index = get_test_index_from_terms(true, &terms_per_segment)?;
|
||||
|
||||
// let agg_req: Aggregations = vec![(
|
||||
//"my_texts".to_string(),
|
||||
// Aggregation::Bucket(BucketAggregation {
|
||||
// bucket_agg: BucketAggregationType::Terms(TermsAggregation {
|
||||
// field: "string_id".to_string(),
|
||||
// min_doc_count: Some(0),
|
||||
//..Default::default()
|
||||
//}),
|
||||
// sub_aggregation: Default::default(),
|
||||
//}),
|
||||
//)]
|
||||
//.into_iter()
|
||||
//.collect();
|
||||
let agg_req: Aggregations = vec![(
|
||||
"my_texts".to_string(),
|
||||
Aggregation::Bucket(BucketAggregation {
|
||||
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
|
||||
field: "string_id".to_string(),
|
||||
min_doc_count: Some(0),
|
||||
..Default::default()
|
||||
}),
|
||||
sub_aggregation: Default::default(),
|
||||
}),
|
||||
)]
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
// let res = exec_request_with_query(agg_req, &index, None);
|
||||
let res = exec_request_with_query(agg_req, &index, None);
|
||||
|
||||
// assert!(res.is_err());
|
||||
assert!(res.is_err());
|
||||
|
||||
// Ok(())
|
||||
//}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn terms_aggregation_different_tokenizer_on_ff_test() -> crate::Result<()> {
|
||||
fn terms_aggregation_multi_token_per_doc() -> crate::Result<()> {
|
||||
let terms = vec!["Hello Hello", "Hallo Hallo"];
|
||||
|
||||
let index = get_test_index_from_terms(true, &[terms])?;
|
||||
@@ -1191,13 +1256,12 @@ mod tests {
|
||||
.collect();
|
||||
|
||||
let res = exec_request_with_query(agg_req, &index, None).unwrap();
|
||||
println!("{}", serde_json::to_string_pretty(&res).unwrap());
|
||||
|
||||
assert_eq!(res["my_texts"]["buckets"][0]["key"], "Hallo Hallo");
|
||||
assert_eq!(res["my_texts"]["buckets"][0]["doc_count"], 1);
|
||||
assert_eq!(res["my_texts"]["buckets"][0]["key"], "hello");
|
||||
assert_eq!(res["my_texts"]["buckets"][0]["doc_count"], 2);
|
||||
|
||||
assert_eq!(res["my_texts"]["buckets"][1]["key"], "Hello Hello");
|
||||
assert_eq!(res["my_texts"]["buckets"][1]["doc_count"], 1);
|
||||
assert_eq!(res["my_texts"]["buckets"][1]["key"], "hallo");
|
||||
assert_eq!(res["my_texts"]["buckets"][1]["doc_count"], 2);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1288,3 +1352,68 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(test, feature = "unstable"))]
|
||||
mod bench {
|
||||
|
||||
use itertools::Itertools;
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::thread_rng;
|
||||
|
||||
use super::*;
|
||||
|
||||
fn get_collector_with_buckets(num_docs: u64) -> TermBuckets {
|
||||
TermBuckets::from_req_and_validate(&Default::default(), num_docs as usize).unwrap()
|
||||
}
|
||||
|
||||
fn get_rand_terms(total_terms: u64, num_terms_returned: u64) -> Vec<u64> {
|
||||
let mut rng = thread_rng();
|
||||
|
||||
let all_terms = (0..total_terms - 1).collect_vec();
|
||||
|
||||
let mut vals = vec![];
|
||||
for _ in 0..num_terms_returned {
|
||||
let val = all_terms.as_slice().choose(&mut rng).unwrap();
|
||||
vals.push(*val);
|
||||
}
|
||||
|
||||
vals
|
||||
}
|
||||
|
||||
fn bench_term_buckets(b: &mut test::Bencher, num_terms: u64, total_terms: u64) {
|
||||
let mut collector = get_collector_with_buckets(total_terms);
|
||||
let vals = get_rand_terms(total_terms, num_terms);
|
||||
let aggregations_with_accessor: AggregationsWithAccessor = Default::default();
|
||||
let bucket_count: BucketCount = BucketCount {
|
||||
bucket_count: Default::default(),
|
||||
max_bucket_count: 1_000_001u32,
|
||||
};
|
||||
b.iter(|| {
|
||||
for &val in &vals {
|
||||
collector
|
||||
.increment_bucket(&[val], 0, &aggregations_with_accessor, &bucket_count, &None)
|
||||
.unwrap();
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_term_buckets_500_of_1_000_000(b: &mut test::Bencher) {
|
||||
bench_term_buckets(b, 500u64, 1_000_000u64)
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_term_buckets_1_000_000_of_50_000(b: &mut test::Bencher) {
|
||||
bench_term_buckets(b, 1_000_000u64, 50_000u64)
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_term_buckets_1_000_000_of_50(b: &mut test::Bencher) {
|
||||
bench_term_buckets(b, 1_000_000u64, 50u64)
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_term_buckets_1_000_000_of_1_000_000(b: &mut test::Bencher) {
|
||||
bench_term_buckets(b, 1_000_000u64, 1_000_000u64)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,10 +4,7 @@ use super::agg_req::Aggregations;
|
||||
use super::agg_req_with_accessor::AggregationsWithAccessor;
|
||||
use super::agg_result::AggregationResults;
|
||||
use super::intermediate_agg_result::IntermediateAggregationResults;
|
||||
use super::segment_agg_result::{
|
||||
build_segment_agg_collector, GenericSegmentAggregationResultsCollector,
|
||||
SegmentAggregationCollector,
|
||||
};
|
||||
use super::segment_agg_result::SegmentAggregationResultsCollector;
|
||||
use crate::aggregation::agg_req_with_accessor::get_aggs_with_accessor_and_validate;
|
||||
use crate::collector::{Collector, SegmentCollector};
|
||||
use crate::schema::Schema;
|
||||
@@ -140,7 +137,7 @@ fn merge_fruits(
|
||||
/// `AggregationSegmentCollector` does the aggregation collection on a segment.
|
||||
pub struct AggregationSegmentCollector {
|
||||
aggs_with_accessor: AggregationsWithAccessor,
|
||||
result: Box<dyn SegmentAggregationCollector>,
|
||||
result: SegmentAggregationResultsCollector,
|
||||
error: Option<TantivyError>,
|
||||
}
|
||||
|
||||
@@ -154,7 +151,8 @@ impl AggregationSegmentCollector {
|
||||
) -> crate::Result<Self> {
|
||||
let aggs_with_accessor =
|
||||
get_aggs_with_accessor_and_validate(agg, reader, Rc::default(), max_bucket_count)?;
|
||||
let result = build_segment_agg_collector(&aggs_with_accessor)?;
|
||||
let result =
|
||||
SegmentAggregationResultsCollector::from_req_and_validate(&aggs_with_accessor)?;
|
||||
Ok(AggregationSegmentCollector {
|
||||
aggs_with_accessor,
|
||||
result,
|
||||
|
||||
@@ -222,23 +222,24 @@ pub enum IntermediateMetricResult {
|
||||
|
||||
impl From<SegmentMetricResultCollector> for IntermediateMetricResult {
|
||||
fn from(tree: SegmentMetricResultCollector) -> Self {
|
||||
use super::metric::SegmentStatsType;
|
||||
match tree {
|
||||
SegmentMetricResultCollector::Stats(collector) => match collector.collecting_for {
|
||||
SegmentStatsType::Average => IntermediateMetricResult::Average(
|
||||
super::metric::SegmentStatsType::Average => IntermediateMetricResult::Average(
|
||||
IntermediateAverage::from_collector(collector),
|
||||
),
|
||||
SegmentStatsType::Count => {
|
||||
super::metric::SegmentStatsType::Count => {
|
||||
IntermediateMetricResult::Count(IntermediateCount::from_collector(collector))
|
||||
}
|
||||
SegmentStatsType::Max => {
|
||||
super::metric::SegmentStatsType::Max => {
|
||||
IntermediateMetricResult::Max(IntermediateMax::from_collector(collector))
|
||||
}
|
||||
SegmentStatsType::Min => {
|
||||
super::metric::SegmentStatsType::Min => {
|
||||
IntermediateMetricResult::Min(IntermediateMin::from_collector(collector))
|
||||
}
|
||||
SegmentStatsType::Stats => IntermediateMetricResult::Stats(collector.stats),
|
||||
SegmentStatsType::Sum => {
|
||||
super::metric::SegmentStatsType::Stats => {
|
||||
IntermediateMetricResult::Stats(collector.stats)
|
||||
}
|
||||
super::metric::SegmentStatsType::Sum => {
|
||||
IntermediateMetricResult::Sum(IntermediateSum::from_collector(collector))
|
||||
}
|
||||
},
|
||||
|
||||
@@ -1,17 +1,10 @@
|
||||
use columnar::Column;
|
||||
use fastfield_codecs::Column;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::aggregation::agg_req_with_accessor::AggregationsWithAccessor;
|
||||
use crate::aggregation::intermediate_agg_result::{
|
||||
IntermediateAggregationResults, IntermediateMetricResult,
|
||||
};
|
||||
use crate::aggregation::segment_agg_result::SegmentAggregationCollector;
|
||||
use crate::aggregation::{f64_from_fastfield_u64, VecWithNames};
|
||||
use crate::aggregation::f64_from_fastfield_u64;
|
||||
use crate::schema::Type;
|
||||
use crate::{DocId, TantivyError};
|
||||
|
||||
use super::*;
|
||||
|
||||
/// A multi-value metric aggregation that computes a collection of statistics on numeric values that
|
||||
/// are extracted from the aggregated documents.
|
||||
/// See [`Stats`] for returned statistics.
|
||||
@@ -167,74 +160,27 @@ impl SegmentStatsCollector {
|
||||
stats: IntermediateStats::default(),
|
||||
}
|
||||
}
|
||||
pub(crate) fn collect_block(&mut self, docs: &[DocId], field: &Column<u64>) {
|
||||
// TODO special case for Required, Optional column type
|
||||
for doc in docs {
|
||||
for val in field.values(*doc) {
|
||||
let val1 = f64_from_fastfield_u64(val, &self.field_type);
|
||||
self.stats.collect(val1);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SegmentAggregationCollector for SegmentStatsCollector {
|
||||
fn into_intermediate_aggregations_result(
|
||||
self: Box<Self>,
|
||||
agg_with_accessor: &AggregationsWithAccessor,
|
||||
) -> crate::Result<IntermediateAggregationResults> {
|
||||
let name = agg_with_accessor.metrics.keys[0].to_string();
|
||||
|
||||
let intermediate_metric_result = match self.collecting_for {
|
||||
SegmentStatsType::Average => {
|
||||
IntermediateMetricResult::Average(IntermediateAverage::from_collector(*self))
|
||||
}
|
||||
SegmentStatsType::Count => {
|
||||
IntermediateMetricResult::Count(IntermediateCount::from_collector(*self))
|
||||
}
|
||||
SegmentStatsType::Max => {
|
||||
IntermediateMetricResult::Max(IntermediateMax::from_collector(*self))
|
||||
}
|
||||
SegmentStatsType::Min => {
|
||||
IntermediateMetricResult::Min(IntermediateMin::from_collector(*self))
|
||||
}
|
||||
SegmentStatsType::Stats => IntermediateMetricResult::Stats(self.stats),
|
||||
SegmentStatsType::Sum => {
|
||||
IntermediateMetricResult::Sum(IntermediateSum::from_collector(*self))
|
||||
}
|
||||
};
|
||||
|
||||
let metrics = Some(VecWithNames::from_entries(vec![(
|
||||
name,
|
||||
intermediate_metric_result,
|
||||
)]));
|
||||
|
||||
Ok(IntermediateAggregationResults {
|
||||
metrics,
|
||||
buckets: None,
|
||||
})
|
||||
}
|
||||
|
||||
fn collect(
|
||||
&mut self,
|
||||
doc: crate::DocId,
|
||||
agg_with_accessor: &AggregationsWithAccessor,
|
||||
) -> crate::Result<()> {
|
||||
let accessor = &agg_with_accessor.metrics.values[0].accessor;
|
||||
for val in accessor.values(doc) {
|
||||
let val1 = f64_from_fastfield_u64(val, &self.field_type);
|
||||
pub(crate) fn collect_block(&mut self, doc: &[DocId], field: &dyn Column<u64>) {
|
||||
let mut iter = doc.chunks_exact(4);
|
||||
for docs in iter.by_ref() {
|
||||
let val1 = field.get_val(docs[0]);
|
||||
let val2 = field.get_val(docs[1]);
|
||||
let val3 = field.get_val(docs[2]);
|
||||
let val4 = field.get_val(docs[3]);
|
||||
let val1 = f64_from_fastfield_u64(val1, &self.field_type);
|
||||
let val2 = f64_from_fastfield_u64(val2, &self.field_type);
|
||||
let val3 = f64_from_fastfield_u64(val3, &self.field_type);
|
||||
let val4 = f64_from_fastfield_u64(val4, &self.field_type);
|
||||
self.stats.collect(val1);
|
||||
self.stats.collect(val2);
|
||||
self.stats.collect(val3);
|
||||
self.stats.collect(val4);
|
||||
}
|
||||
for &doc in iter.remainder() {
|
||||
let val = field.get_val(doc);
|
||||
let val = f64_from_fastfield_u64(val, &self.field_type);
|
||||
self.stats.collect(val);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn flush_staged_docs(
|
||||
&mut self,
|
||||
_agg_with_accessor: &AggregationsWithAccessor,
|
||||
_force_flush: bool,
|
||||
) -> crate::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -172,8 +172,8 @@ pub use collector::{
|
||||
AggregationCollector, AggregationSegmentCollector, DistributedAggregationCollector,
|
||||
MAX_BUCKET_COUNT,
|
||||
};
|
||||
use columnar::MonotonicallyMappableToU64;
|
||||
pub(crate) use date::format_date;
|
||||
use fastfield_codecs::MonotonicallyMappableToU64;
|
||||
use itertools::Itertools;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
@@ -182,7 +182,7 @@ use crate::schema::Type;
|
||||
/// Represents an associative array `(key => values)` in a very efficient manner.
|
||||
#[derive(Clone, PartialEq, Serialize, Deserialize)]
|
||||
pub(crate) struct VecWithNames<T: Clone> {
|
||||
pub(crate) values: Vec<T>,
|
||||
values: Vec<T>,
|
||||
keys: Vec<String>,
|
||||
}
|
||||
|
||||
@@ -248,6 +248,9 @@ impl<T: Clone> VecWithNames<T> {
|
||||
fn values_mut(&mut self) -> impl Iterator<Item = &mut T> + '_ {
|
||||
self.values.iter_mut()
|
||||
}
|
||||
fn entries(&self) -> impl Iterator<Item = (&str, &T)> + '_ {
|
||||
self.keys().zip(self.values.iter())
|
||||
}
|
||||
fn is_empty(&self) -> bool {
|
||||
self.keys.is_empty()
|
||||
}
|
||||
@@ -333,9 +336,8 @@ mod tests {
|
||||
use crate::aggregation::intermediate_agg_result::IntermediateAggregationResults;
|
||||
use crate::aggregation::segment_agg_result::DOC_BLOCK_SIZE;
|
||||
use crate::aggregation::DistributedAggregationCollector;
|
||||
use crate::indexer::NoMergePolicy;
|
||||
use crate::query::{AllQuery, TermQuery};
|
||||
use crate::schema::{IndexRecordOption, Schema, TextFieldIndexing, FAST, STRING};
|
||||
use crate::schema::{Cardinality, IndexRecordOption, Schema, TextFieldIndexing, FAST, STRING};
|
||||
use crate::{DateTime, Index, Term};
|
||||
|
||||
fn get_avg_req(field_name: &str) -> Aggregation {
|
||||
@@ -430,7 +432,8 @@ mod tests {
|
||||
let text_field = schema_builder.add_text_field("text", text_fieldtype.clone());
|
||||
let text_field_id = schema_builder.add_text_field("text_id", text_fieldtype);
|
||||
let string_field_id = schema_builder.add_text_field("string_id", STRING | FAST);
|
||||
let score_fieldtype = crate::schema::NumericOptions::default().set_fast();
|
||||
let score_fieldtype =
|
||||
crate::schema::NumericOptions::default().set_fast();
|
||||
let score_field = schema_builder.add_u64_field("score", score_fieldtype.clone());
|
||||
let score_field_f64 = schema_builder.add_f64_field("score_f64", score_fieldtype.clone());
|
||||
let score_field_i64 = schema_builder.add_i64_field("score_i64", score_fieldtype);
|
||||
@@ -442,7 +445,6 @@ mod tests {
|
||||
{
|
||||
// let mut index_writer = index.writer_for_tests()?;
|
||||
let mut index_writer = index.writer_with_num_threads(1, 30_000_000)?;
|
||||
index_writer.set_merge_policy(Box::new(NoMergePolicy));
|
||||
for values in segment_and_values {
|
||||
for (i, term) in values {
|
||||
let i = *i;
|
||||
@@ -654,11 +656,13 @@ mod tests {
|
||||
let text_field = schema_builder.add_text_field("text", text_fieldtype);
|
||||
let date_field = schema_builder.add_date_field("date", FAST);
|
||||
schema_builder.add_text_field("dummy_text", STRING);
|
||||
let score_fieldtype = crate::schema::NumericOptions::default().set_fast();
|
||||
let score_fieldtype =
|
||||
crate::schema::NumericOptions::default().set_fast();
|
||||
let score_field = schema_builder.add_u64_field("score", score_fieldtype.clone());
|
||||
let score_field_f64 = schema_builder.add_f64_field("score_f64", score_fieldtype.clone());
|
||||
|
||||
let multivalue = crate::schema::NumericOptions::default().set_fast();
|
||||
let multivalue =
|
||||
crate::schema::NumericOptions::default().set_fast();
|
||||
let scores_field_i64 = schema_builder.add_i64_field("scores_i64", multivalue);
|
||||
|
||||
let score_field_i64 = schema_builder.add_i64_field("score_i64", score_fieldtype);
|
||||
@@ -1164,9 +1168,6 @@ mod tests {
|
||||
#[cfg(all(test, feature = "unstable"))]
|
||||
mod bench {
|
||||
|
||||
use crate::aggregation::bucket::CustomOrder;
|
||||
use crate::aggregation::bucket::Order;
|
||||
use crate::aggregation::bucket::OrderTarget;
|
||||
use rand::prelude::SliceRandom;
|
||||
use rand::{thread_rng, Rng};
|
||||
use test::{self, Bencher};
|
||||
@@ -1176,7 +1177,7 @@ mod tests {
|
||||
use crate::aggregation::metric::StatsAggregation;
|
||||
use crate::query::AllQuery;
|
||||
|
||||
fn get_test_index_bench(_merge_segments: bool) -> crate::Result<Index> {
|
||||
fn get_test_index_bench(merge_segments: bool) -> crate::Result<Index> {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let text_fieldtype = crate::schema::TextOptions::default()
|
||||
.set_indexing_options(
|
||||
@@ -1188,19 +1189,20 @@ mod tests {
|
||||
schema_builder.add_text_field("text_many_terms", STRING | FAST);
|
||||
let text_field_few_terms =
|
||||
schema_builder.add_text_field("text_few_terms", STRING | FAST);
|
||||
let score_fieldtype = crate::schema::NumericOptions::default().set_fast();
|
||||
let score_fieldtype =
|
||||
crate::schema::NumericOptions::default().set_fast();
|
||||
let score_field = schema_builder.add_u64_field("score", score_fieldtype.clone());
|
||||
let score_field_f64 =
|
||||
schema_builder.add_f64_field("score_f64", score_fieldtype.clone());
|
||||
let score_field_i64 = schema_builder.add_i64_field("score_i64", score_fieldtype);
|
||||
let index = Index::create_from_tempdir(schema_builder.build())?;
|
||||
let few_terms_data = vec!["INFO", "ERROR", "WARN", "DEBUG"];
|
||||
let many_terms_data = (0..150_000)
|
||||
let many_terms_data = (0..15_000)
|
||||
.map(|num| format!("author{}", num))
|
||||
.collect::<Vec<_>>();
|
||||
{
|
||||
let mut rng = thread_rng();
|
||||
let mut index_writer = index.writer_with_num_threads(1, 100_000_000)?;
|
||||
let mut index_writer = index.writer_for_tests()?;
|
||||
// writing the segment
|
||||
for _ in 0..1_000_000 {
|
||||
let val: f64 = rng.gen_range(0.0..1_000_000.0);
|
||||
@@ -1215,6 +1217,14 @@ mod tests {
|
||||
}
|
||||
index_writer.commit()?;
|
||||
}
|
||||
if merge_segments {
|
||||
let segment_ids = index
|
||||
.searchable_segment_ids()
|
||||
.expect("Searchable segments failed.");
|
||||
let mut index_writer = index.writer_for_tests()?;
|
||||
index_writer.merge(&segment_ids).wait()?;
|
||||
index_writer.wait_merging_threads()?;
|
||||
}
|
||||
|
||||
Ok(index)
|
||||
}
|
||||
@@ -1366,42 +1376,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_aggregation_terms_many_with_sub_agg(b: &mut Bencher) {
|
||||
let index = get_test_index_bench(false).unwrap();
|
||||
let reader = index.reader().unwrap();
|
||||
|
||||
b.iter(|| {
|
||||
let sub_agg_req: Aggregations = vec![(
|
||||
"average_f64".to_string(),
|
||||
Aggregation::Metric(MetricAggregation::Average(
|
||||
AverageAggregation::from_field_name("score_f64".to_string()),
|
||||
)),
|
||||
)]
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
let agg_req: Aggregations = vec![(
|
||||
"my_texts".to_string(),
|
||||
Aggregation::Bucket(BucketAggregation {
|
||||
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
|
||||
field: "text_many_terms".to_string(),
|
||||
..Default::default()
|
||||
}),
|
||||
sub_aggregation: sub_agg_req,
|
||||
}),
|
||||
)]
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
let collector = AggregationCollector::from_aggs(agg_req, None, index.schema());
|
||||
|
||||
let searcher = reader.searcher();
|
||||
searcher.search(&AllQuery, &collector).unwrap()
|
||||
});
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_aggregation_terms_many2(b: &mut Bencher) {
|
||||
fn bench_aggregation_terms_many(b: &mut Bencher) {
|
||||
let index = get_test_index_bench(false).unwrap();
|
||||
let reader = index.reader().unwrap();
|
||||
|
||||
@@ -1426,36 +1401,6 @@ mod tests {
|
||||
});
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_aggregation_terms_many_order_by_term(b: &mut Bencher) {
|
||||
let index = get_test_index_bench(false).unwrap();
|
||||
let reader = index.reader().unwrap();
|
||||
|
||||
b.iter(|| {
|
||||
let agg_req: Aggregations = vec![(
|
||||
"my_texts".to_string(),
|
||||
Aggregation::Bucket(BucketAggregation {
|
||||
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
|
||||
field: "text_many_terms".to_string(),
|
||||
order: Some(CustomOrder {
|
||||
order: Order::Desc,
|
||||
target: OrderTarget::Key,
|
||||
}),
|
||||
..Default::default()
|
||||
}),
|
||||
sub_aggregation: Default::default(),
|
||||
}),
|
||||
)]
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
let collector = AggregationCollector::from_aggs(agg_req, None, index.schema());
|
||||
|
||||
let searcher = reader.searcher();
|
||||
searcher.search(&AllQuery, &collector).unwrap()
|
||||
});
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_aggregation_range_only(b: &mut Bencher) {
|
||||
let index = get_test_index_bench(false).unwrap();
|
||||
|
||||
@@ -25,90 +25,15 @@ use crate::{DocId, TantivyError};
|
||||
pub(crate) const DOC_BLOCK_SIZE: usize = 64;
|
||||
pub(crate) type DocBlock = [DocId; DOC_BLOCK_SIZE];
|
||||
|
||||
pub(crate) trait SegmentAggregationCollector: CollectorClone + Debug {
|
||||
fn into_intermediate_aggregations_result(
|
||||
self: Box<Self>,
|
||||
agg_with_accessor: &AggregationsWithAccessor,
|
||||
) -> crate::Result<IntermediateAggregationResults>;
|
||||
|
||||
fn collect(
|
||||
&mut self,
|
||||
doc: crate::DocId,
|
||||
agg_with_accessor: &AggregationsWithAccessor,
|
||||
) -> crate::Result<()>;
|
||||
|
||||
fn flush_staged_docs(
|
||||
&mut self,
|
||||
agg_with_accessor: &AggregationsWithAccessor,
|
||||
force_flush: bool,
|
||||
) -> crate::Result<()>;
|
||||
}
|
||||
|
||||
pub(crate) trait CollectorClone {
|
||||
fn clone_box(&self) -> Box<dyn SegmentAggregationCollector>;
|
||||
}
|
||||
|
||||
impl<T> CollectorClone for T
|
||||
where
|
||||
T: 'static + SegmentAggregationCollector + Clone,
|
||||
{
|
||||
fn clone_box(&self) -> Box<dyn SegmentAggregationCollector> {
|
||||
Box::new(self.clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for Box<dyn SegmentAggregationCollector> {
|
||||
fn clone(&self) -> Box<dyn SegmentAggregationCollector> {
|
||||
self.clone_box()
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn build_segment_agg_collector(
|
||||
req: &AggregationsWithAccessor,
|
||||
) -> crate::Result<Box<dyn SegmentAggregationCollector>> {
|
||||
// Single metric special case
|
||||
if req.buckets.is_empty() && req.metrics.len() == 1 {
|
||||
let req = &req.metrics.values[0];
|
||||
let stats_collector = match &req.metric {
|
||||
MetricAggregation::Average(AverageAggregation { .. }) => {
|
||||
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Average)
|
||||
}
|
||||
MetricAggregation::Count(CountAggregation { .. }) => {
|
||||
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Count)
|
||||
}
|
||||
MetricAggregation::Max(MaxAggregation { .. }) => {
|
||||
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Max)
|
||||
}
|
||||
MetricAggregation::Min(MinAggregation { .. }) => {
|
||||
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Min)
|
||||
}
|
||||
MetricAggregation::Stats(StatsAggregation { .. }) => {
|
||||
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Stats)
|
||||
}
|
||||
MetricAggregation::Sum(SumAggregation { .. }) => {
|
||||
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Sum)
|
||||
}
|
||||
};
|
||||
|
||||
return Ok(Box::new(stats_collector));
|
||||
}
|
||||
|
||||
let agg = GenericSegmentAggregationResultsCollector::from_req_and_validate(req)?;
|
||||
Ok(Box::new(agg))
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
/// The GenericSegmentAggregationResultsCollector is the generic version of the collector, which
|
||||
/// can handle arbitrary complexity of sub-aggregations. Ideally we never have to pick this one
|
||||
/// and can provide specialized versions instead, that remove some of its overhead.
|
||||
pub(crate) struct GenericSegmentAggregationResultsCollector {
|
||||
#[derive(Clone, PartialEq)]
|
||||
pub(crate) struct SegmentAggregationResultsCollector {
|
||||
pub(crate) metrics: Option<VecWithNames<SegmentMetricResultCollector>>,
|
||||
pub(crate) buckets: Option<VecWithNames<SegmentBucketResultCollector>>,
|
||||
staged_docs: DocBlock,
|
||||
num_staged_docs: usize,
|
||||
}
|
||||
|
||||
impl Default for GenericSegmentAggregationResultsCollector {
|
||||
impl Default for SegmentAggregationResultsCollector {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
metrics: Default::default(),
|
||||
@@ -119,7 +44,7 @@ impl Default for GenericSegmentAggregationResultsCollector {
|
||||
}
|
||||
}
|
||||
|
||||
impl Debug for GenericSegmentAggregationResultsCollector {
|
||||
impl Debug for SegmentAggregationResultsCollector {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("SegmentAggregationResultsCollector")
|
||||
.field("metrics", &self.metrics)
|
||||
@@ -130,9 +55,9 @@ impl Debug for GenericSegmentAggregationResultsCollector {
|
||||
}
|
||||
}
|
||||
|
||||
impl SegmentAggregationCollector for GenericSegmentAggregationResultsCollector {
|
||||
fn into_intermediate_aggregations_result(
|
||||
self: Box<Self>,
|
||||
impl SegmentAggregationResultsCollector {
|
||||
pub fn into_intermediate_aggregations_result(
|
||||
self,
|
||||
agg_with_accessor: &AggregationsWithAccessor,
|
||||
) -> crate::Result<IntermediateAggregationResults> {
|
||||
let buckets = if let Some(buckets) = self.buckets {
|
||||
@@ -150,7 +75,47 @@ impl SegmentAggregationCollector for GenericSegmentAggregationResultsCollector {
|
||||
Ok(IntermediateAggregationResults { metrics, buckets })
|
||||
}
|
||||
|
||||
fn collect(
|
||||
pub(crate) fn from_req_and_validate(req: &AggregationsWithAccessor) -> crate::Result<Self> {
|
||||
let buckets = req
|
||||
.buckets
|
||||
.entries()
|
||||
.map(|(key, req)| {
|
||||
Ok((
|
||||
key.to_string(),
|
||||
SegmentBucketResultCollector::from_req_and_validate(req)?,
|
||||
))
|
||||
})
|
||||
.collect::<crate::Result<Vec<(String, _)>>>()?;
|
||||
let metrics = req
|
||||
.metrics
|
||||
.entries()
|
||||
.map(|(key, req)| {
|
||||
Ok((
|
||||
key.to_string(),
|
||||
SegmentMetricResultCollector::from_req_and_validate(req)?,
|
||||
))
|
||||
})
|
||||
.collect::<crate::Result<Vec<(String, _)>>>()?;
|
||||
let metrics = if metrics.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(VecWithNames::from_entries(metrics))
|
||||
};
|
||||
let buckets = if buckets.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(VecWithNames::from_entries(buckets))
|
||||
};
|
||||
Ok(SegmentAggregationResultsCollector {
|
||||
metrics,
|
||||
buckets,
|
||||
staged_docs: [0; DOC_BLOCK_SIZE],
|
||||
num_staged_docs: 0,
|
||||
})
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn collect(
|
||||
&mut self,
|
||||
doc: crate::DocId,
|
||||
agg_with_accessor: &AggregationsWithAccessor,
|
||||
@@ -163,7 +128,7 @@ impl SegmentAggregationCollector for GenericSegmentAggregationResultsCollector {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn flush_staged_docs(
|
||||
pub(crate) fn flush_staged_docs(
|
||||
&mut self,
|
||||
agg_with_accessor: &AggregationsWithAccessor,
|
||||
force_flush: bool,
|
||||
@@ -197,66 +162,6 @@ impl SegmentAggregationCollector for GenericSegmentAggregationResultsCollector {
|
||||
}
|
||||
}
|
||||
|
||||
impl GenericSegmentAggregationResultsCollector {
|
||||
pub fn into_intermediate_aggregations_result(
|
||||
self,
|
||||
agg_with_accessor: &AggregationsWithAccessor,
|
||||
) -> crate::Result<IntermediateAggregationResults> {
|
||||
let buckets = if let Some(buckets) = self.buckets {
|
||||
let entries = buckets
|
||||
.into_iter()
|
||||
.zip(agg_with_accessor.buckets.values())
|
||||
.map(|((key, bucket), acc)| Ok((key, bucket.into_intermediate_bucket_result(acc)?)))
|
||||
.collect::<crate::Result<Vec<(String, _)>>>()?;
|
||||
Some(VecWithNames::from_entries(entries))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let metrics = self.metrics.map(VecWithNames::from_other);
|
||||
|
||||
Ok(IntermediateAggregationResults { metrics, buckets })
|
||||
}
|
||||
|
||||
pub(crate) fn from_req_and_validate(req: &AggregationsWithAccessor) -> crate::Result<Self> {
|
||||
let buckets = req
|
||||
.buckets
|
||||
.iter()
|
||||
.map(|(key, req)| {
|
||||
Ok((
|
||||
key.to_string(),
|
||||
SegmentBucketResultCollector::from_req_and_validate(req)?,
|
||||
))
|
||||
})
|
||||
.collect::<crate::Result<Vec<(String, _)>>>()?;
|
||||
let metrics = req
|
||||
.metrics
|
||||
.iter()
|
||||
.map(|(key, req)| {
|
||||
Ok((
|
||||
key.to_string(),
|
||||
SegmentMetricResultCollector::from_req_and_validate(req)?,
|
||||
))
|
||||
})
|
||||
.collect::<crate::Result<Vec<(String, _)>>>()?;
|
||||
let metrics = if metrics.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(VecWithNames::from_entries(metrics))
|
||||
};
|
||||
let buckets = if buckets.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(VecWithNames::from_entries(buckets))
|
||||
};
|
||||
Ok(GenericSegmentAggregationResultsCollector {
|
||||
metrics,
|
||||
buckets,
|
||||
staged_docs: [0; DOC_BLOCK_SIZE],
|
||||
num_staged_docs: 0,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub(crate) enum SegmentMetricResultCollector {
|
||||
Stats(SegmentStatsCollector),
|
||||
@@ -300,7 +205,7 @@ impl SegmentMetricResultCollector {
|
||||
pub(crate) fn collect_block(&mut self, doc: &[DocId], metric: &MetricAggregationWithAccessor) {
|
||||
match self {
|
||||
SegmentMetricResultCollector::Stats(stats_collector) => {
|
||||
stats_collector.collect_block(doc, &metric.accessor);
|
||||
stats_collector.collect_block(doc, &*metric.accessor);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -310,7 +215,7 @@ impl SegmentMetricResultCollector {
|
||||
/// segments.
|
||||
/// The typical structure of Map<Key, Bucket> is not suitable during collection for performance
|
||||
/// reasons.
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub(crate) enum SegmentBucketResultCollector {
|
||||
Range(SegmentRangeCollector),
|
||||
Histogram(Box<SegmentHistogramCollector>),
|
||||
@@ -338,7 +243,14 @@ impl SegmentBucketResultCollector {
|
||||
pub fn from_req_and_validate(req: &BucketAggregationWithAccessor) -> crate::Result<Self> {
|
||||
match &req.bucket_agg {
|
||||
BucketAggregationType::Terms(terms_req) => Ok(Self::Terms(Box::new(
|
||||
SegmentTermCollector::from_req_and_validate(terms_req, &req.sub_aggregation)?,
|
||||
SegmentTermCollector::from_req_and_validate(
|
||||
terms_req,
|
||||
&req.sub_aggregation,
|
||||
req.field_type,
|
||||
req.accessor
|
||||
.as_multi()
|
||||
.expect("unexpected fast field cardinality"),
|
||||
)?,
|
||||
))),
|
||||
BucketAggregationType::Range(range_req) => {
|
||||
Ok(Self::Range(SegmentRangeCollector::from_req_and_validate(
|
||||
@@ -353,7 +265,9 @@ impl SegmentBucketResultCollector {
|
||||
histogram,
|
||||
&req.sub_aggregation,
|
||||
req.field_type,
|
||||
&req.accessor,
|
||||
req.accessor
|
||||
.as_single()
|
||||
.expect("unexpected fast field cardinality"),
|
||||
)?,
|
||||
))),
|
||||
}
|
||||
|
||||
@@ -829,7 +829,7 @@ mod bench {
|
||||
let reader = index.reader().unwrap();
|
||||
b.iter(|| {
|
||||
let searcher = reader.searcher();
|
||||
let facet_collector = FacetCollector::for_field("facet");
|
||||
let facet_collector = FacetCollector::for_field(facet_field);
|
||||
searcher.search(&AllQuery, &facet_collector).unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -12,7 +12,8 @@
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Arc;
|
||||
|
||||
use columnar::{ColumnValues, DynamicColumn, HasAssociatedColumnType};
|
||||
use columnar::{DynamicColumn, HasAssociatedColumnType};
|
||||
use fastfield_codecs::Column;
|
||||
|
||||
use crate::collector::{Collector, SegmentCollector};
|
||||
use crate::schema::Field;
|
||||
@@ -151,7 +152,7 @@ where
|
||||
TPredicate: 'static,
|
||||
DynamicColumn: Into<Option<columnar::Column<TPredicateValue>>>,
|
||||
{
|
||||
fast_field_reader: Arc<dyn ColumnValues<TPredicateValue>>,
|
||||
fast_field_reader: Arc<dyn Column<TPredicateValue>>,
|
||||
segment_collector: TSegmentCollector,
|
||||
predicate: TPredicate,
|
||||
t_predicate_value: PhantomData<TPredicateValue>,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use columnar::ColumnValues;
|
||||
use fastdivide::DividerU64;
|
||||
use fastfield_codecs::Column;
|
||||
|
||||
use crate::collector::{Collector, SegmentCollector};
|
||||
use crate::fastfield::{FastFieldNotAvailableError, FastValue};
|
||||
@@ -87,7 +87,7 @@ impl HistogramComputer {
|
||||
}
|
||||
pub struct SegmentHistogramCollector {
|
||||
histogram_computer: HistogramComputer,
|
||||
column_u64: Arc<dyn ColumnValues<u64>>,
|
||||
column_u64: Arc<dyn Column<u64>>,
|
||||
}
|
||||
|
||||
impl SegmentCollector for SegmentHistogramCollector {
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use columnar::ColumnValues;
|
||||
use fastfield_codecs::Column;
|
||||
|
||||
use super::*;
|
||||
use crate::collector::{Count, FilterCollector, TopDocs};
|
||||
use crate::core::SegmentReader;
|
||||
use crate::query::{AllQuery, QueryParser};
|
||||
use crate::schema::{Schema, FAST, TEXT};
|
||||
use crate::schema::{Field, Schema, FAST, TEXT};
|
||||
use crate::time::format_description::well_known::Rfc3339;
|
||||
use crate::time::OffsetDateTime;
|
||||
use crate::{doc, DateTime, DocAddress, DocId, Document, Index, Score, Searcher, SegmentOrdinal};
|
||||
@@ -160,7 +160,7 @@ pub struct FastFieldTestCollector {
|
||||
|
||||
pub struct FastFieldSegmentCollector {
|
||||
vals: Vec<u64>,
|
||||
reader: Arc<dyn columnar::ColumnValues>,
|
||||
reader: Arc<dyn Column<u64>>,
|
||||
}
|
||||
|
||||
impl FastFieldTestCollector {
|
||||
|
||||
@@ -3,7 +3,7 @@ use std::fmt;
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Arc;
|
||||
|
||||
use columnar::ColumnValues;
|
||||
use fastfield_codecs::Column;
|
||||
|
||||
use super::Collector;
|
||||
use crate::collector::custom_score_top_collector::CustomScoreTopCollector;
|
||||
@@ -14,6 +14,7 @@ use crate::collector::{
|
||||
};
|
||||
use crate::fastfield::{FastFieldNotAvailableError, FastValue};
|
||||
use crate::query::Weight;
|
||||
use crate::schema::Field;
|
||||
use crate::{DocAddress, DocId, Score, SegmentOrdinal, SegmentReader, TantivyError};
|
||||
|
||||
struct FastFieldConvertCollector<
|
||||
@@ -132,7 +133,7 @@ impl fmt::Debug for TopDocs {
|
||||
}
|
||||
|
||||
struct ScorerByFastFieldReader {
|
||||
sort_column: Arc<dyn ColumnValues<u64>>,
|
||||
sort_column: Arc<dyn Column<u64>>,
|
||||
}
|
||||
|
||||
impl CustomSegmentScorer<u64> for ScorerByFastFieldReader {
|
||||
|
||||
@@ -249,7 +249,7 @@ impl SearcherInner {
|
||||
index: Index,
|
||||
segment_readers: Vec<SegmentReader>,
|
||||
generation: TrackedObject<SearcherGeneration>,
|
||||
doc_store_cache_num_blocks: usize,
|
||||
doc_store_cache_size: usize,
|
||||
) -> io::Result<SearcherInner> {
|
||||
assert_eq!(
|
||||
&segment_readers
|
||||
@@ -261,7 +261,7 @@ impl SearcherInner {
|
||||
);
|
||||
let store_readers: Vec<StoreReader> = segment_readers
|
||||
.iter()
|
||||
.map(|segment_reader| segment_reader.get_store_reader(doc_store_cache_num_blocks))
|
||||
.map(|segment_reader| segment_reader.get_store_reader(doc_store_cache_size))
|
||||
.collect::<io::Result<Vec<_>>>()?;
|
||||
|
||||
Ok(SearcherInner {
|
||||
|
||||
@@ -128,12 +128,9 @@ impl SegmentReader {
|
||||
&self.fieldnorm_readers
|
||||
}
|
||||
|
||||
/// Accessor to the segment's [`StoreReader`](crate::store::StoreReader).
|
||||
///
|
||||
/// `cache_num_blocks` sets the number of decompressed blocks to be cached in an LRU.
|
||||
/// The size of blocks is configurable, this should be reflexted in the
|
||||
pub fn get_store_reader(&self, cache_num_blocks: usize) -> io::Result<StoreReader> {
|
||||
StoreReader::open(self.store_file.clone(), cache_num_blocks)
|
||||
/// Accessor to the segment's `StoreReader`.
|
||||
pub fn get_store_reader(&self, cache_size: usize) -> io::Result<StoreReader> {
|
||||
StoreReader::open(self.store_file.clone(), cache_size)
|
||||
}
|
||||
|
||||
/// Open a new segment for reading.
|
||||
|
||||
@@ -21,8 +21,8 @@
|
||||
|
||||
use std::net::Ipv6Addr;
|
||||
|
||||
pub use columnar::Column;
|
||||
use columnar::MonotonicallyMappableToU64;
|
||||
pub use fastfield_codecs::Column;
|
||||
|
||||
pub use self::alive_bitset::{intersect_alive_bitsets, write_alive_bitset, AliveBitSet};
|
||||
pub use self::error::{FastFieldNotAvailableError, Result};
|
||||
@@ -137,8 +137,10 @@ fn value_to_u64(value: &Value) -> crate::Result<u64> {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use std::ops::RangeInclusive;
|
||||
use std::collections::HashMap;
|
||||
use std::ops::{Range, RangeInclusive};
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common::{HasLen, TerminatingWrite};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use std::collections::HashMap;
|
||||
use std::io;
|
||||
use std::net::Ipv6Addr;
|
||||
use std::sync::Arc;
|
||||
@@ -6,9 +7,10 @@ use columnar::{
|
||||
BytesColumn, ColumnType, ColumnValues, ColumnarReader, DynamicColumn, DynamicColumnHandle,
|
||||
HasAssociatedColumnType, StrColumn,
|
||||
};
|
||||
use fastfield_codecs::Column;
|
||||
|
||||
use crate::directory::FileSlice;
|
||||
use crate::schema::Schema;
|
||||
use crate::schema::{Field, Schema};
|
||||
use crate::space_usage::{FieldUsage, PerFieldSpaceUsage};
|
||||
|
||||
/// Provides access to all of the BitpackedFastFieldReader.
|
||||
@@ -85,10 +87,7 @@ impl FastFieldReaders {
|
||||
.sum())
|
||||
}
|
||||
|
||||
pub fn typed_column_first_or_default<T>(
|
||||
&self,
|
||||
field: &str,
|
||||
) -> crate::Result<Arc<dyn ColumnValues<T>>>
|
||||
pub fn typed_column_first_or_default<T>(&self, field: &str) -> crate::Result<Arc<dyn Column<T>>>
|
||||
where
|
||||
T: PartialOrd + Copy + HasAssociatedColumnType + Send + Sync + 'static,
|
||||
DynamicColumn: Into<Option<columnar::Column<T>>>,
|
||||
@@ -120,7 +119,7 @@ impl FastFieldReaders {
|
||||
/// Returns the `ip` fast field reader reader associated to `field`.
|
||||
///
|
||||
/// If `field` is not a u128 fast field, this method returns an Error.
|
||||
pub fn ip_addr(&self, field: &str) -> crate::Result<Arc<dyn ColumnValues<Ipv6Addr>>> {
|
||||
pub fn ip_addr(&self, field: &str) -> crate::Result<Arc<dyn Column<Ipv6Addr>>> {
|
||||
self.typed_column_first_or_default(field)
|
||||
}
|
||||
|
||||
@@ -158,21 +157,21 @@ impl FastFieldReaders {
|
||||
/// Returns the `i64` fast field reader reader associated with `field`.
|
||||
///
|
||||
/// If `field` is not a i64 fast field, this method returns an Error.
|
||||
pub fn i64(&self, field_name: &str) -> crate::Result<Arc<dyn ColumnValues<i64>>> {
|
||||
pub fn i64(&self, field_name: &str) -> crate::Result<Arc<dyn Column<i64>>> {
|
||||
self.typed_column_first_or_default(field_name)
|
||||
}
|
||||
|
||||
/// Returns the `f64` fast field reader reader associated with `field`.
|
||||
///
|
||||
/// If `field` is not a f64 fast field, this method returns an Error.
|
||||
pub fn f64(&self, field_name: &str) -> crate::Result<Arc<dyn ColumnValues<f64>>> {
|
||||
pub fn f64(&self, field_name: &str) -> crate::Result<Arc<dyn Column<f64>>> {
|
||||
self.typed_column_first_or_default(field_name)
|
||||
}
|
||||
|
||||
/// Returns the `bool` fast field reader reader associated with `field`.
|
||||
///
|
||||
/// If `field` is not a bool fast field, this method returns an Error.
|
||||
pub fn bool(&self, field_name: &str) -> crate::Result<Arc<dyn ColumnValues<bool>>> {
|
||||
pub fn bool(&self, field_name: &str) -> crate::Result<Arc<dyn Column<bool>>> {
|
||||
self.typed_column_first_or_default(field_name)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use columnar::ColumnValues;
|
||||
use itertools::Itertools;
|
||||
use measure_time::debug_time;
|
||||
|
||||
@@ -10,7 +9,7 @@ use crate::core::{Segment, SegmentReader};
|
||||
use crate::directory::WritePtr;
|
||||
use crate::docset::{DocSet, TERMINATED};
|
||||
use crate::error::DataCorruption;
|
||||
use crate::fastfield::{AliveBitSet, FastFieldNotAvailableError};
|
||||
use crate::fastfield::{AliveBitSet, Column, FastFieldNotAvailableError};
|
||||
use crate::fieldnorm::{FieldNormReader, FieldNormReaders, FieldNormsSerializer, FieldNormsWriter};
|
||||
use crate::indexer::doc_id_mapping::SegmentDocIdMapping;
|
||||
// use crate::indexer::sorted_doc_id_multivalue_column::RemappedDocIdMultiValueColumn;
|
||||
@@ -335,7 +334,7 @@ impl IndexMerger {
|
||||
pub(crate) fn get_sort_field_accessor(
|
||||
reader: &SegmentReader,
|
||||
sort_by_field: &IndexSortByField,
|
||||
) -> crate::Result<Arc<dyn ColumnValues>> {
|
||||
) -> crate::Result<Arc<dyn Column>> {
|
||||
reader.schema().get_field(&sort_by_field.field)?;
|
||||
let value_accessor = reader
|
||||
.fast_fields()
|
||||
@@ -349,7 +348,7 @@ impl IndexMerger {
|
||||
pub(crate) fn get_reader_with_sort_field_accessor(
|
||||
&self,
|
||||
sort_by_field: &IndexSortByField,
|
||||
) -> crate::Result<Vec<(SegmentOrdinal, Arc<dyn ColumnValues>)>> {
|
||||
) -> crate::Result<Vec<(SegmentOrdinal, Arc<dyn Column>)>> {
|
||||
let reader_ordinal_and_field_accessors = self
|
||||
.readers
|
||||
.iter()
|
||||
|
||||
@@ -476,11 +476,12 @@ mod bench_sorted_index_merge {
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use fastfield_codecs::Column;
|
||||
use test::{self, Bencher};
|
||||
|
||||
use crate::core::Index;
|
||||
use crate::indexer::merger::IndexMerger;
|
||||
use crate::schema::{NumericOptions, Schema};
|
||||
use crate::schema::{Cardinality, NumericOptions, Schema};
|
||||
use crate::{IndexSettings, IndexSortByField, IndexWriter, Order};
|
||||
fn create_index(sort_by_field: Option<IndexSortByField>) -> Index {
|
||||
let mut schema_builder = Schema::builder();
|
||||
@@ -511,42 +512,42 @@ mod bench_sorted_index_merge {
|
||||
index
|
||||
}
|
||||
|
||||
//#[bench]
|
||||
// fn create_sorted_index_walk_overkmerge_on_merge_fastfield(
|
||||
// b: &mut Bencher,
|
||||
//) -> crate::Result<()> {
|
||||
// let sort_by_field = IndexSortByField {
|
||||
// field: "intval".to_string(),
|
||||
// order: Order::Desc,
|
||||
//};
|
||||
// let index = create_index(Some(sort_by_field.clone()));
|
||||
// let segments = index.searchable_segments().unwrap();
|
||||
// let merger: IndexMerger =
|
||||
// IndexMerger::open(index.schema(), index.settings().clone(), &segments[..])?;
|
||||
// let doc_id_mapping = merger.generate_doc_id_mapping(&sort_by_field).unwrap();
|
||||
// b.iter(|| {
|
||||
// let sorted_doc_ids = doc_id_mapping.iter_old_doc_addrs().map(|doc_addr| {
|
||||
// let reader = &merger.readers[doc_addr.segment_ord as usize];
|
||||
// let u64_reader: Arc<dyn Column<u64>> = reader
|
||||
//.fast_fields()
|
||||
//.typed_fast_field_reader("intval")
|
||||
//.expect(
|
||||
//"Failed to find a reader for single fast field. This is a tantivy bug and \
|
||||
// it should never happen.",
|
||||
//);
|
||||
//(doc_addr.doc_id, reader, u64_reader)
|
||||
//});
|
||||
//// add values in order of the new doc_ids
|
||||
// let mut val = 0;
|
||||
// for (doc_id, _reader, field_reader) in sorted_doc_ids {
|
||||
// val = field_reader.get_val(doc_id);
|
||||
//}
|
||||
#[bench]
|
||||
fn create_sorted_index_walk_overkmerge_on_merge_fastfield(
|
||||
b: &mut Bencher,
|
||||
) -> crate::Result<()> {
|
||||
let sort_by_field = IndexSortByField {
|
||||
field: "intval".to_string(),
|
||||
order: Order::Desc,
|
||||
};
|
||||
let index = create_index(Some(sort_by_field.clone()));
|
||||
let segments = index.searchable_segments().unwrap();
|
||||
let merger: IndexMerger =
|
||||
IndexMerger::open(index.schema(), index.settings().clone(), &segments[..])?;
|
||||
let doc_id_mapping = merger.generate_doc_id_mapping(&sort_by_field).unwrap();
|
||||
b.iter(|| {
|
||||
let sorted_doc_ids = doc_id_mapping.iter_old_doc_addrs().map(|doc_addr| {
|
||||
let reader = &merger.readers[doc_addr.segment_ord as usize];
|
||||
let u64_reader: Arc<dyn Column<u64>> = reader
|
||||
.fast_fields()
|
||||
.typed_fast_field_reader("intval")
|
||||
.expect(
|
||||
"Failed to find a reader for single fast field. This is a tantivy bug and \
|
||||
it should never happen.",
|
||||
);
|
||||
(doc_addr.doc_id, reader, u64_reader)
|
||||
});
|
||||
// add values in order of the new doc_ids
|
||||
let mut val = 0;
|
||||
for (doc_id, _reader, field_reader) in sorted_doc_ids {
|
||||
val = field_reader.get_val(doc_id);
|
||||
}
|
||||
|
||||
// val
|
||||
//});
|
||||
val
|
||||
});
|
||||
|
||||
// Ok(())
|
||||
//}
|
||||
Ok(())
|
||||
}
|
||||
#[bench]
|
||||
fn create_sorted_index_create_doc_id_mapping(b: &mut Bencher) -> crate::Result<()> {
|
||||
let sort_by_field = IndexSortByField {
|
||||
|
||||
@@ -279,7 +279,7 @@ mod indexer;
|
||||
pub mod error;
|
||||
pub mod tokenizer;
|
||||
|
||||
pub mod aggregation;
|
||||
// pub mod aggregation;
|
||||
pub mod collector;
|
||||
pub mod directory;
|
||||
pub mod fastfield;
|
||||
|
||||
@@ -88,7 +88,7 @@ fn bound_to_value_range(
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod tests {
|
||||
mod tests {
|
||||
use proptest::prelude::ProptestConfig;
|
||||
use proptest::strategy::Strategy;
|
||||
use proptest::{prop_oneof, proptest};
|
||||
@@ -188,7 +188,7 @@ pub mod tests {
|
||||
assert_eq!(count, 2);
|
||||
}
|
||||
|
||||
pub fn create_index_from_docs(docs: &[Doc]) -> Index {
|
||||
fn create_index_from_docs(docs: &[Doc]) -> Index {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let ip_field = schema_builder.add_ip_addr_field("ip", STORED | FAST);
|
||||
let ips_field = schema_builder.add_ip_addr_field("ips", FAST | INDEXED);
|
||||
|
||||
@@ -86,7 +86,7 @@ fn bound_to_value_range<T: MonotonicallyMappableToU64>(
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod tests {
|
||||
mod tests {
|
||||
use std::ops::{Bound, RangeInclusive};
|
||||
|
||||
use proptest::prelude::ProptestConfig;
|
||||
@@ -191,7 +191,7 @@ pub mod tests {
|
||||
assert!(test_id_range_for_docs(ops).is_ok());
|
||||
}
|
||||
|
||||
pub fn create_index_from_docs(docs: &[Doc]) -> Index {
|
||||
fn create_index_from_docs(docs: &[Doc]) -> Index {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let id_u64_field = schema_builder.add_u64_field("id", INDEXED | STORED | FAST);
|
||||
let ids_u64_field =
|
||||
|
||||
@@ -44,7 +44,7 @@ pub struct IndexReaderBuilder {
|
||||
index: Index,
|
||||
warmers: Vec<Weak<dyn Warmer>>,
|
||||
num_warming_threads: usize,
|
||||
doc_store_cache_num_blocks: usize,
|
||||
doc_store_cache_size: usize,
|
||||
}
|
||||
|
||||
impl IndexReaderBuilder {
|
||||
@@ -55,7 +55,7 @@ impl IndexReaderBuilder {
|
||||
index,
|
||||
warmers: Vec::new(),
|
||||
num_warming_threads: 1,
|
||||
doc_store_cache_num_blocks: DOCSTORE_CACHE_CAPACITY,
|
||||
doc_store_cache_size: DOCSTORE_CACHE_CAPACITY,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -72,7 +72,7 @@ impl IndexReaderBuilder {
|
||||
searcher_generation_inventory.clone(),
|
||||
)?;
|
||||
let inner_reader = InnerIndexReader::new(
|
||||
self.doc_store_cache_num_blocks,
|
||||
self.doc_store_cache_size,
|
||||
self.index,
|
||||
warming_state,
|
||||
searcher_generation_inventory,
|
||||
@@ -119,11 +119,8 @@ impl IndexReaderBuilder {
|
||||
///
|
||||
/// The doc store readers cache by default DOCSTORE_CACHE_CAPACITY(100) decompressed blocks.
|
||||
#[must_use]
|
||||
pub fn doc_store_cache_num_blocks(
|
||||
mut self,
|
||||
doc_store_cache_num_blocks: usize,
|
||||
) -> IndexReaderBuilder {
|
||||
self.doc_store_cache_num_blocks = doc_store_cache_num_blocks;
|
||||
pub fn doc_store_cache_size(mut self, doc_store_cache_size: usize) -> IndexReaderBuilder {
|
||||
self.doc_store_cache_size = doc_store_cache_size;
|
||||
self
|
||||
}
|
||||
|
||||
@@ -154,7 +151,7 @@ impl TryInto<IndexReader> for IndexReaderBuilder {
|
||||
}
|
||||
|
||||
struct InnerIndexReader {
|
||||
doc_store_cache_num_blocks: usize,
|
||||
doc_store_cache_size: usize,
|
||||
index: Index,
|
||||
warming_state: WarmingState,
|
||||
searcher: arc_swap::ArcSwap<SearcherInner>,
|
||||
@@ -164,7 +161,7 @@ struct InnerIndexReader {
|
||||
|
||||
impl InnerIndexReader {
|
||||
fn new(
|
||||
doc_store_cache_num_blocks: usize,
|
||||
doc_store_cache_size: usize,
|
||||
index: Index,
|
||||
warming_state: WarmingState,
|
||||
// The searcher_generation_inventory is not used as source, but as target to track the
|
||||
@@ -175,13 +172,13 @@ impl InnerIndexReader {
|
||||
|
||||
let searcher = Self::create_searcher(
|
||||
&index,
|
||||
doc_store_cache_num_blocks,
|
||||
doc_store_cache_size,
|
||||
&warming_state,
|
||||
&searcher_generation_counter,
|
||||
&searcher_generation_inventory,
|
||||
)?;
|
||||
Ok(InnerIndexReader {
|
||||
doc_store_cache_num_blocks,
|
||||
doc_store_cache_size,
|
||||
index,
|
||||
warming_state,
|
||||
searcher: ArcSwap::from(searcher),
|
||||
@@ -217,7 +214,7 @@ impl InnerIndexReader {
|
||||
|
||||
fn create_searcher(
|
||||
index: &Index,
|
||||
doc_store_cache_num_blocks: usize,
|
||||
doc_store_cache_size: usize,
|
||||
warming_state: &WarmingState,
|
||||
searcher_generation_counter: &Arc<AtomicU64>,
|
||||
searcher_generation_inventory: &Inventory<SearcherGeneration>,
|
||||
@@ -235,7 +232,7 @@ impl InnerIndexReader {
|
||||
index.clone(),
|
||||
segment_readers,
|
||||
searcher_generation,
|
||||
doc_store_cache_num_blocks,
|
||||
doc_store_cache_size,
|
||||
)?);
|
||||
|
||||
warming_state.warm_new_searcher_generation(&searcher.clone().into())?;
|
||||
@@ -245,7 +242,7 @@ impl InnerIndexReader {
|
||||
fn reload(&self) -> crate::Result<()> {
|
||||
let searcher = Self::create_searcher(
|
||||
&self.index,
|
||||
self.doc_store_cache_num_blocks,
|
||||
self.doc_store_cache_size,
|
||||
&self.warming_state,
|
||||
&self.searcher_generation_counter,
|
||||
&self.searcher_generation_inventory,
|
||||
|
||||
@@ -4,8 +4,8 @@
|
||||
//! order to be handled in the `Store`.
|
||||
//!
|
||||
//! Internally, documents (or rather their stored fields) are serialized to a buffer.
|
||||
//! When the buffer exceeds `block_size` (defaults to 16K), the buffer is compressed using `brotli`,
|
||||
//! `LZ4` or `snappy` and the resulting block is written to disk.
|
||||
//! When the buffer exceeds 16K, the buffer is compressed using `brotli`, `LZ4` or `snappy`
|
||||
//! and the resulting block is written to disk.
|
||||
//!
|
||||
//! One can then request for a specific `DocId`.
|
||||
//! A skip list helps navigating to the right block,
|
||||
@@ -28,6 +28,8 @@
|
||||
//! - at the segment level, the
|
||||
//! [`SegmentReader`'s `doc` method](../struct.SegmentReader.html#method.doc)
|
||||
//! - at the index level, the [`Searcher::doc()`](crate::Searcher::doc) method
|
||||
//!
|
||||
//! !
|
||||
|
||||
mod compressors;
|
||||
mod decompressors;
|
||||
|
||||
@@ -114,10 +114,7 @@ impl Sum for CacheStats {
|
||||
|
||||
impl StoreReader {
|
||||
/// Opens a store reader
|
||||
///
|
||||
/// `cache_num_blocks` sets the number of decompressed blocks to be cached in an LRU.
|
||||
/// The size of blocks is configurable, this should be reflexted in the
|
||||
pub fn open(store_file: FileSlice, cache_num_blocks: usize) -> io::Result<StoreReader> {
|
||||
pub fn open(store_file: FileSlice, cache_size: usize) -> io::Result<StoreReader> {
|
||||
let (footer, data_and_offset) = DocStoreFooter::extract_footer(store_file)?;
|
||||
|
||||
let (data_file, offset_index_file) = data_and_offset.split(footer.offset as usize);
|
||||
@@ -128,8 +125,8 @@ impl StoreReader {
|
||||
decompressor: footer.decompressor,
|
||||
data: data_file,
|
||||
cache: BlockCache {
|
||||
cache: NonZeroUsize::new(cache_num_blocks)
|
||||
.map(|cache_num_blocks| Mutex::new(LruCache::new(cache_num_blocks))),
|
||||
cache: NonZeroUsize::new(cache_size)
|
||||
.map(|cache_size| Mutex::new(LruCache::new(cache_size))),
|
||||
cache_hits: Default::default(),
|
||||
cache_misses: Default::default(),
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user