mirror of https://github.com/quickwit-oss/tantivy.git
synced 2026-01-04 16:22:55 +00:00

Compare commits: quickwit-0 ... use_column (1 commit)

| Author | SHA1 | Date |
|---|---|---|
|  | 375d1f9dac |  |
@@ -1,6 +1,6 @@
 [package]
 name = "tantivy"
-version = "0.19.1-quickwit"
+version = "0.19.0"
 authors = ["Paul Masurel <paul.masurel@gmail.com>"]
 license = "MIT"
 categories = ["database-implementations", "data-structures"]
@@ -23,7 +23,7 @@ regex = { version = "1.5.5", default-features = false, features = ["std", "unico
 aho-corasick = "0.7"
 tantivy-fst = "0.4.0"
 memmap2 = { version = "0.5.3", optional = true }
-lz4_flex = { version = "0.10", default-features = false, features = ["checked-decode"], optional = true }
+lz4_flex = { version = "0.9.2", default-features = false, features = ["checked-decode"], optional = true }
 brotli = { version = "3.3.4", optional = true }
 zstd = { version = "0.12", optional = true, default-features = false }
 snap = { version = "1.0.5", optional = true }
@@ -55,7 +55,7 @@ measure_time = "0.8.2"
 async-trait = "0.1.53"
 arc-swap = "1.5.0"
 
-#columnar = { version="0.1", path="./columnar", package ="tantivy-columnar" }
+columnar = { version="0.1", path="./columnar", package ="tantivy-columnar" }
 sstable = { version="0.1", path="./sstable", package ="tantivy-sstable", optional = true }
 stacker = { version="0.1", path="./stacker", package ="tantivy-stacker" }
 tantivy-query-grammar = { version= "0.19.0", path="./query-grammar" }
@@ -22,6 +22,9 @@ pub struct Column<T> {
 }
 
 impl<T: PartialOrd> Column<T> {
+    pub fn get_cardinality(&self) -> Cardinality {
+        self.idx.get_cardinality()
+    }
     pub fn num_rows(&self) -> RowId {
         match &self.idx {
             ColumnIndex::Full => self.values.num_vals() as u32,
@@ -1,9 +1,10 @@
 use std::collections::HashMap;
 use std::io;
 
+use super::writer::ColumnarSerializer;
 use crate::columnar::ColumnarReader;
 use crate::dynamic_column::DynamicColumn;
-use crate::ColumnType;
+use crate::{Cardinality, ColumnType};
 
 pub enum MergeDocOrder {
     /// Columnar tables are simply stacked one above the other.
@@ -19,20 +20,30 @@ pub enum MergeDocOrder {
 }
 
 pub fn merge_columnar(
-    _columnar_readers: &[ColumnarReader],
+    columnar_readers: &[ColumnarReader],
     mapping: MergeDocOrder,
-    _output: &mut impl io::Write,
+    output: &mut impl io::Write,
 ) -> io::Result<()> {
-    match mapping {
-        MergeDocOrder::Stack => {
-            // implement me :)
-            todo!();
-        }
-        MergeDocOrder::Complex(_) => {
-            // for later
-            todo!();
+    let mut serializer = ColumnarSerializer::new(output);
+    // TODO handle dictionary merge for Str/Bytes column
+    let field_name_to_group = group_columns_for_merge(columnar_readers)?;
+    for (column_name, category_to_columns) in field_name_to_group {
+        for (_category, columns_to_merge) in category_to_columns {
+            let column_type = columns_to_merge[0].column_type();
+            let mut column_serialzier =
+                serializer.serialize_column(column_name.as_bytes(), column_type);
+            merge_columns(
+                column_type,
+                &columns_to_merge,
+                &mapping,
+                &mut column_serialzier,
+            )?;
         }
     }
+    serializer.finalize()?;
+
+    Ok(())
 }
 
 /// Column types are grouped into different categories.
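Note: the rewritten merge_columnar above iterates field names, then column-type categories, and serializes one merged column per group. For the MergeDocOrder::Stack case described by the doc comment earlier in this file, merging a value column amounts to concatenating the rows of the inputs in reader order. Below is a toy, self-contained sketch of that stacking idea; the types are hypothetical stand-ins, not the tantivy-columnar API.

```rust
// Toy illustration of "stack" merging: rows of each input column are
// concatenated in reader order. ToyColumn is a hypothetical stand-in,
// not tantivy-columnar's Column/ColumnarReader.
#[derive(Debug, Clone, PartialEq)]
struct ToyColumn {
    values: Vec<i64>,
}

fn stack_merge(columns: &[ToyColumn]) -> ToyColumn {
    let mut merged = Vec::new();
    for column in columns {
        // "Columnar tables are simply stacked one above the other."
        merged.extend_from_slice(&column.values);
    }
    ToyColumn { values: merged }
}

fn main() {
    let seg1 = ToyColumn { values: vec![1, 2, 3] };
    let seg2 = ToyColumn { values: vec![10, 20] };
    let merged = stack_merge(&[seg1, seg2]);
    assert_eq!(merged.values, vec![1, 2, 3, 10, 20]);
}
```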
@@ -44,7 +55,7 @@ pub fn merge_columnar(
 /// See also [README.md].
 #[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
 #[repr(u8)]
-enum ColumnTypeCategory {
+pub enum ColumnTypeCategory {
     Bool,
     Str,
     Numerical,
@@ -68,8 +79,41 @@ impl From<ColumnType> for ColumnTypeCategory {
     }
 }
 
-fn collect_columns(
-    columnar_readers: &[&ColumnarReader],
+pub fn detect_cardinality(columns: &[DynamicColumn]) -> Cardinality {
+    if columns
+        .iter()
+        .any(|column| column.get_cardinality().is_multivalue())
+    {
+        return Cardinality::Multivalued;
+    }
+    if columns
+        .iter()
+        .any(|column| column.get_cardinality().is_optional())
+    {
+        return Cardinality::Optional;
+    }
+    Cardinality::Full
+}
+
+pub fn compute_num_docs(columns: &[DynamicColumn], mapping: &MergeDocOrder) -> usize {
+    // TODO handle deletes
+
+    0
+}
+
+pub fn merge_columns(
+    column_type: ColumnType,
+    columns: &[DynamicColumn],
+    mapping: &MergeDocOrder,
+    column_serializer: &mut impl io::Write,
+) -> io::Result<()> {
+    let cardinality = detect_cardinality(columns);
+
+    Ok(())
+}
+
+pub fn group_columns_for_merge(
+    columnar_readers: &[ColumnarReader],
 ) -> io::Result<HashMap<String, HashMap<ColumnTypeCategory, Vec<DynamicColumn>>>> {
     // Each column name may have multiple types of column associated.
     // For merging we are interested in the same column type category since they can be merged.
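Note: detect_cardinality above picks the weakest guarantee among the columns being merged: any multivalued input makes the result Multivalued, otherwise any optional input makes it Optional, otherwise it stays Full. A minimal standalone sketch of that resolution rule (with a local Cardinality enum rather than the crate's):

```rust
// Minimal stand-in for the crate's Cardinality; the order of checks mirrors
// detect_cardinality in the hunk above: Multivalued beats Optional beats Full.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Cardinality {
    Full,
    Optional,
    Multivalued,
}

fn detect_cardinality(cards: &[Cardinality]) -> Cardinality {
    if cards.iter().any(|c| *c == Cardinality::Multivalued) {
        return Cardinality::Multivalued;
    }
    if cards.iter().any(|c| *c == Cardinality::Optional) {
        return Cardinality::Optional;
    }
    Cardinality::Full
}

fn main() {
    assert_eq!(
        detect_cardinality(&[Cardinality::Full, Cardinality::Optional]),
        Cardinality::Optional
    );
    assert_eq!(
        detect_cardinality(&[Cardinality::Optional, Cardinality::Multivalued]),
        Cardinality::Multivalued
    );
    assert_eq!(detect_cardinality(&[Cardinality::Full]), Cardinality::Full);
}
```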
@@ -117,26 +161,20 @@ fn cast_to_common_numerical_column(columns: &[DynamicColumn]) -> Vec<DynamicColu
         .all(|column| column.column_type().numerical_type().is_some()));
     let coerce_to_i64: Vec<_> = columns
         .iter()
-        .map(|column| column.clone().coerce_to_i64())
+        .filter_map(|column| column.clone().coerce_to_i64())
         .collect();
 
-    if coerce_to_i64.iter().all(|column| column.is_some()) {
-        return coerce_to_i64
-            .into_iter()
-            .map(|column| column.unwrap())
-            .collect();
+    if coerce_to_i64.len() == columns.len() {
+        return coerce_to_i64;
     }
 
     let coerce_to_u64: Vec<_> = columns
         .iter()
-        .map(|column| column.clone().coerce_to_u64())
+        .filter_map(|column| column.clone().coerce_to_u64())
         .collect();
 
-    if coerce_to_u64.iter().all(|column| column.is_some()) {
-        return coerce_to_u64
-            .into_iter()
-            .map(|column| column.unwrap())
-            .collect();
+    if coerce_to_u64.len() == columns.len() {
+        return coerce_to_u64;
     }
 
     columns
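Note: the coercion change above swaps map + all(is_some) + unwrap for filter_map plus a length check: if nothing was dropped, every coercion succeeded and the collected vector can be returned as is. A generic sketch of the same pattern, with a hypothetical parse step standing in for coerce_to_i64/coerce_to_u64:

```rust
// Generic version of the "coerce all or fall back" pattern used above.
// `parse` stands in for coerce_to_i64/coerce_to_u64, which return Option.
fn coerce_all(inputs: &[&str]) -> Option<Vec<i64>> {
    let parsed: Vec<i64> = inputs
        .iter()
        .filter_map(|s| s.parse::<i64>().ok())
        .collect();
    // Same length means no element was dropped, i.e. every coercion succeeded.
    if parsed.len() == inputs.len() {
        Some(parsed)
    } else {
        None
    }
}

fn main() {
    assert_eq!(coerce_all(&["1", "2", "3"]), Some(vec![1, 2, 3]));
    assert_eq!(coerce_all(&["1", "x"]), None);
}
```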
@@ -183,7 +221,9 @@ mod tests {
             ColumnarReader::open(buffer).unwrap()
         };
 
-        let column_map = collect_columns(&[&columnar1, &columnar2, &columnar3]).unwrap();
+        let column_map =
+            group_columns_for_merge(&[columnar1.clone(), columnar2.clone(), columnar3.clone()])
+                .unwrap();
         assert_eq!(column_map.len(), 1);
         let cat_to_columns = column_map.get("numbers").unwrap();
         assert_eq!(cat_to_columns.len(), 1);
@@ -191,14 +231,14 @@ mod tests {
         let numerical = cat_to_columns.get(&ColumnTypeCategory::Numerical).unwrap();
         assert!(numerical.iter().all(|column| column.is_f64()));
 
-        let column_map = collect_columns(&[&columnar1, &columnar1]).unwrap();
+        let column_map = group_columns_for_merge(&[columnar1.clone(), columnar1.clone()]).unwrap();
         assert_eq!(column_map.len(), 1);
         let cat_to_columns = column_map.get("numbers").unwrap();
         assert_eq!(cat_to_columns.len(), 1);
         let numerical = cat_to_columns.get(&ColumnTypeCategory::Numerical).unwrap();
         assert!(numerical.iter().all(|column| column.is_i64()));
 
-        let column_map = collect_columns(&[&columnar2, &columnar2]).unwrap();
+        let column_map = group_columns_for_merge(&[columnar2.clone(), columnar2.clone()]).unwrap();
         assert_eq!(column_map.len(), 1);
         let cat_to_columns = column_map.get("numbers").unwrap();
         assert_eq!(cat_to_columns.len(), 1);
columnar/src/columnar/merge_index.rs (new file, 1 line)
@@ -0,0 +1 @@
+
@@ -1,6 +1,7 @@
 mod column_type;
 mod format_version;
 mod merge;
+mod merge_index;
 mod reader;
 mod writer;
 
@@ -13,6 +13,7 @@ fn io_invalid_data(msg: String) -> io::Error {
 
 /// The ColumnarReader makes it possible to access a set of columns
 /// associated to field names.
+#[derive(Clone)]
 pub struct ColumnarReader {
     column_dictionary: Dictionary<RangeSSTable>,
     column_data: FileSlice,
@@ -8,7 +8,7 @@ use std::net::Ipv6Addr;
 
 use column_operation::ColumnOperation;
 use common::CountingWriter;
-use serializer::ColumnarSerializer;
+pub(crate) use serializer::ColumnarSerializer;
 use stacker::{Addr, ArenaHashMap, MemoryArena};
 
 use crate::column_index::SerializableColumnIndex;
@@ -8,7 +8,7 @@ use common::{HasLen, OwnedBytes};
 use crate::column::{BytesColumn, Column, StrColumn};
 use crate::column_values::{monotonic_map_column, StrictlyMonotonicFn};
 use crate::columnar::ColumnType;
-use crate::{DateTime, NumericalType};
+use crate::{Cardinality, DateTime, NumericalType};
 
 #[derive(Clone)]
 pub enum DynamicColumn {
@@ -23,6 +23,18 @@ pub enum DynamicColumn {
 }
 
 impl DynamicColumn {
+    pub fn get_cardinality(&self) -> Cardinality {
+        match self {
+            DynamicColumn::Bool(c) => c.get_cardinality(),
+            DynamicColumn::I64(c) => c.get_cardinality(),
+            DynamicColumn::U64(c) => c.get_cardinality(),
+            DynamicColumn::F64(c) => c.get_cardinality(),
+            DynamicColumn::IpAddr(c) => c.get_cardinality(),
+            DynamicColumn::DateTime(c) => c.get_cardinality(),
+            DynamicColumn::Bytes(c) => c.ords().get_cardinality(),
+            DynamicColumn::Str(c) => c.ords().get_cardinality(),
+        }
+    }
     pub fn column_type(&self) -> ColumnType {
         match self {
             DynamicColumn::Bool(_) => ColumnType::Bool,
@@ -62,6 +62,12 @@ pub enum Cardinality {
 }
 
 impl Cardinality {
+    pub fn is_optional(&self) -> bool {
+        matches!(self, Cardinality::Optional)
+    }
+    pub fn is_multivalue(&self) -> bool {
+        matches!(self, Cardinality::Multivalued)
+    }
     pub(crate) fn to_code(self) -> u8 {
         self as u8
     }
@@ -362,19 +362,13 @@ impl SegmentTermCollector {
         let mut entries: Vec<(u32, TermBucketEntry)> =
             self.term_buckets.entries.into_iter().collect();
 
+        let order_by_key = self.req.order.target == OrderTarget::Key;
         let order_by_sub_aggregation =
             matches!(self.req.order.target, OrderTarget::SubAggregation(_));
 
         match self.req.order.target {
             OrderTarget::Key => {
-                // We rely on the fact, that term ordinals match the order of the strings
-                // TODO: We could have a special collector, that keeps only TOP n results at any
-                // time.
-                if self.req.order.order == Order::Desc {
-                    entries.sort_unstable_by_key(|bucket| std::cmp::Reverse(bucket.0));
-                } else {
-                    entries.sort_unstable_by_key(|bucket| bucket.0);
-                }
+                // defer order and cut_off after loading the texts from the dictionary
             }
             OrderTarget::SubAggregation(_name) => {
                 // don't sort and cut off since it's hard to make assumptions on the quality of the
@@ -390,11 +384,12 @@ impl SegmentTermCollector {
             }
         }
 
-        let (term_doc_count_before_cutoff, sum_other_doc_count) = if order_by_sub_aggregation {
-            (0, 0)
-        } else {
-            cut_off_buckets(&mut entries, self.req.segment_size as usize)
-        };
+        let (term_doc_count_before_cutoff, mut sum_other_doc_count) =
+            if order_by_key || order_by_sub_aggregation {
+                (0, 0)
+            } else {
+                cut_off_buckets(&mut entries, self.req.segment_size as usize)
+            };
 
         let inverted_index = agg_with_accessor
             .inverted_index
@@ -417,10 +412,6 @@ impl SegmentTermCollector {
         if self.req.min_doc_count == 0 {
             let mut stream = term_dict.stream()?;
             while let Some((key, _ord)) = stream.next() {
-                if dict.len() >= self.req.segment_size as usize {
-                    break;
-                }
-
                 let key = std::str::from_utf8(key)
                     .map_err(|utf8_err| DataCorruption::comment_only(utf8_err.to_string()))?;
                 if !dict.contains_key(key) {
@@ -429,6 +420,20 @@ impl SegmentTermCollector {
             }
         }
 
+        if order_by_key {
+            let mut dict_entries = dict.into_iter().collect_vec();
+            if self.req.order.order == Order::Desc {
+                dict_entries.sort_unstable_by(|(key1, _), (key2, _)| key1.cmp(key2));
+            } else {
+                dict_entries.sort_unstable_by(|(key1, _), (key2, _)| key2.cmp(key1));
+            }
+            let (_, sum_other_docs) =
+                cut_off_buckets(&mut dict_entries, self.req.segment_size as usize);
+
+            sum_other_doc_count += sum_other_docs;
+            dict = dict_entries.into_iter().collect();
+        }
+
         Ok(IntermediateBucketResult::Terms(
             IntermediateTermBucketResult {
                 entries: dict,
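Note: with order_by_key, ordering and truncation are deferred until term ordinals have been resolved to strings: the (key, doc_count) buckets are sorted by key, cut off at segment_size, and the documents in the dropped tail are added to sum_other_doc_count. The standalone sketch below illustrates that order-then-cut-off step with toy types and the conventional asc/desc mapping; it is not tantivy's collector code.

```rust
// Toy version of "sort by key, keep the first `limit` buckets, count the rest".
// Buckets are (term, doc_count); returns the kept buckets plus the number of
// documents that fell into the truncated tail (the sum_other_doc_count part).
fn order_and_cut_off(
    mut buckets: Vec<(String, u64)>,
    limit: usize,
    descending: bool,
) -> (Vec<(String, u64)>, u64) {
    if descending {
        buckets.sort_unstable_by(|(k1, _), (k2, _)| k2.cmp(k1));
    } else {
        buckets.sort_unstable_by(|(k1, _), (k2, _)| k1.cmp(k2));
    }
    let tail_docs: u64 = buckets.iter().skip(limit).map(|(_, count)| count).sum();
    buckets.truncate(limit);
    (buckets, tail_docs)
}

fn main() {
    let buckets = vec![
        ("terma".to_string(), 4),
        ("termb".to_string(), 2),
        ("termc".to_string(), 5),
    ];
    let (kept, sum_other_doc_count) = order_and_cut_off(buckets, 2, false);
    assert_eq!(kept, vec![("terma".to_string(), 4), ("termb".to_string(), 2)]);
    assert_eq!(sum_other_doc_count, 5);
}
```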
@@ -918,14 +923,14 @@ mod tests {
         ];
         let index = get_test_index_from_values_and_terms(merge_segments, &segment_and_terms)?;
 
-        // key asc
+        // key desc
         let agg_req: Aggregations = vec![(
             "my_texts".to_string(),
             Aggregation::Bucket(BucketAggregation {
                 bucket_agg: BucketAggregationType::Terms(TermsAggregation {
                     field: "string_id".to_string(),
                     order: Some(CustomOrder {
-                        order: Order::Asc,
+                        order: Order::Desc,
                         target: OrderTarget::Key,
                     }),
                     ..Default::default()
@@ -952,7 +957,7 @@ mod tests {
                 bucket_agg: BucketAggregationType::Terms(TermsAggregation {
                     field: "string_id".to_string(),
                     order: Some(CustomOrder {
-                        order: Order::Asc,
+                        order: Order::Desc,
                         target: OrderTarget::Key,
                     }),
                     size: Some(2),
@@ -976,14 +981,14 @@ mod tests {
 
         assert_eq!(res["my_texts"]["sum_other_doc_count"], 3);
 
-        // key asc and segment_size cut_off
+        // key desc and segment_size cut_off
         let agg_req: Aggregations = vec![(
             "my_texts".to_string(),
             Aggregation::Bucket(BucketAggregation {
                 bucket_agg: BucketAggregationType::Terms(TermsAggregation {
                     field: "string_id".to_string(),
                     order: Some(CustomOrder {
-                        order: Order::Asc,
+                        order: Order::Desc,
                         target: OrderTarget::Key,
                     }),
                     size: Some(2),
@@ -1006,14 +1011,14 @@ mod tests {
             serde_json::Value::Null
         );
 
-        // key desc
+        // key asc
         let agg_req: Aggregations = vec![(
             "my_texts".to_string(),
             Aggregation::Bucket(BucketAggregation {
                 bucket_agg: BucketAggregationType::Terms(TermsAggregation {
                     field: "string_id".to_string(),
                     order: Some(CustomOrder {
-                        order: Order::Desc,
+                        order: Order::Asc,
                         target: OrderTarget::Key,
                     }),
                     ..Default::default()
@@ -1033,14 +1038,14 @@ mod tests {
         assert_eq!(res["my_texts"]["buckets"][2]["doc_count"], 5);
         assert_eq!(res["my_texts"]["sum_other_doc_count"], 0);
 
-        // key desc, size cut_off
+        // key asc, size cut_off
         let agg_req: Aggregations = vec![(
             "my_texts".to_string(),
             Aggregation::Bucket(BucketAggregation {
                 bucket_agg: BucketAggregationType::Terms(TermsAggregation {
                     field: "string_id".to_string(),
                     order: Some(CustomOrder {
-                        order: Order::Desc,
+                        order: Order::Asc,
                         target: OrderTarget::Key,
                     }),
                     size: Some(2),
@@ -1063,14 +1068,14 @@ mod tests {
         );
         assert_eq!(res["my_texts"]["sum_other_doc_count"], 5);
 
-        // key desc, segment_size cut_off
+        // key asc, segment_size cut_off
         let agg_req: Aggregations = vec![(
             "my_texts".to_string(),
             Aggregation::Bucket(BucketAggregation {
                 bucket_agg: BucketAggregationType::Terms(TermsAggregation {
                     field: "string_id".to_string(),
                     order: Some(CustomOrder {
-                        order: Order::Desc,
+                        order: Order::Asc,
                         target: OrderTarget::Key,
                     }),
                     size: Some(2),
@@ -1347,3 +1352,68 @@ mod tests {
         Ok(())
     }
 }
+
+#[cfg(all(test, feature = "unstable"))]
+mod bench {
+
+    use itertools::Itertools;
+    use rand::seq::SliceRandom;
+    use rand::thread_rng;
+
+    use super::*;
+
+    fn get_collector_with_buckets(num_docs: u64) -> TermBuckets {
+        TermBuckets::from_req_and_validate(&Default::default(), num_docs as usize).unwrap()
+    }
+
+    fn get_rand_terms(total_terms: u64, num_terms_returned: u64) -> Vec<u64> {
+        let mut rng = thread_rng();
+
+        let all_terms = (0..total_terms - 1).collect_vec();
+
+        let mut vals = vec![];
+        for _ in 0..num_terms_returned {
+            let val = all_terms.as_slice().choose(&mut rng).unwrap();
+            vals.push(*val);
+        }
+
+        vals
+    }
+
+    fn bench_term_buckets(b: &mut test::Bencher, num_terms: u64, total_terms: u64) {
+        let mut collector = get_collector_with_buckets(total_terms);
+        let vals = get_rand_terms(total_terms, num_terms);
+        let aggregations_with_accessor: AggregationsWithAccessor = Default::default();
+        let bucket_count: BucketCount = BucketCount {
+            bucket_count: Default::default(),
+            max_bucket_count: 1_000_001u32,
+        };
+        b.iter(|| {
+            for &val in &vals {
+                collector
+                    .increment_bucket(&[val], 0, &aggregations_with_accessor, &bucket_count, &None)
+                    .unwrap();
+            }
+        })
+    }
+
+    #[bench]
+    fn bench_term_buckets_500_of_1_000_000(b: &mut test::Bencher) {
+        bench_term_buckets(b, 500u64, 1_000_000u64)
+    }
+
+    #[bench]
+    fn bench_term_buckets_1_000_000_of_50_000(b: &mut test::Bencher) {
+        bench_term_buckets(b, 1_000_000u64, 50_000u64)
+    }
+
+    #[bench]
+    fn bench_term_buckets_1_000_000_of_50(b: &mut test::Bencher) {
+        bench_term_buckets(b, 1_000_000u64, 50u64)
+    }
+
+    #[bench]
+    fn bench_term_buckets_1_000_000_of_1_000_000(b: &mut test::Bencher) {
+        bench_term_buckets(b, 1_000_000u64, 1_000_000u64)
+    }
+}
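Note: the new bench module is gated on cfg(all(test, feature = "unstable")) because #[bench] and test::Bencher come from the unstable `test` crate and only build on nightly. The crate-root gating this relies on is not part of the diff; a hypothetical lib.rs sketch might look like this (feature name and attribute placement are assumptions):

```rust
// Hypothetical crate-root gating for nightly-only benchmarks; the exact
// attribute used by the crate is not shown in this diff.
#![cfg_attr(all(test, feature = "unstable"), feature(test))]

#[cfg(all(test, feature = "unstable"))]
extern crate test;

#[cfg(all(test, feature = "unstable"))]
mod bench {
    use test::Bencher;

    #[bench]
    fn bench_noop(b: &mut Bencher) {
        // black_box prevents the optimizer from removing the measured work.
        b.iter(|| test::black_box(1 + 1));
    }
}
```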
@@ -499,7 +499,7 @@ impl IntermediateTermBucketResult {
         match req.order.target {
             OrderTarget::Key => {
                 buckets.sort_by(|left, right| {
-                    if req.order.order == Order::Asc {
+                    if req.order.order == Order::Desc {
                         left.key.partial_cmp(&right.key)
                     } else {
                         right.key.partial_cmp(&left.key)
@@ -1156,6 +1156,12 @@ mod tests {
             r#"FieldNotFound("not_exist_field")"#
         );
 
+        let agg_res = avg_on_field("scores_i64");
+        assert_eq!(
+            format!("{:?}", agg_res),
+            r#"InvalidArgument("Invalid field cardinality on field scores_i64 expected SingleValue, but got MultiValues")"#
+        );
+
         Ok(())
     }
 
@@ -135,8 +135,6 @@ impl InvertedIndexReader {
         term_info: &TermInfo,
         option: IndexRecordOption,
     ) -> io::Result<SegmentPostings> {
-        let option = option.downgrade(self.record_option);
-
         let block_postings = self.read_block_postings_from_terminfo(term_info, option)?;
         let position_reader = {
             if option.has_positions() {
@@ -249,7 +249,7 @@ impl SearcherInner {
         index: Index,
         segment_readers: Vec<SegmentReader>,
         generation: TrackedObject<SearcherGeneration>,
-        doc_store_cache_num_blocks: usize,
+        doc_store_cache_size: usize,
     ) -> io::Result<SearcherInner> {
         assert_eq!(
             &segment_readers
@@ -261,7 +261,7 @@ impl SearcherInner {
         );
         let store_readers: Vec<StoreReader> = segment_readers
             .iter()
-            .map(|segment_reader| segment_reader.get_store_reader(doc_store_cache_num_blocks))
+            .map(|segment_reader| segment_reader.get_store_reader(doc_store_cache_size))
             .collect::<io::Result<Vec<_>>>()?;
 
         Ok(SearcherInner {
@@ -134,12 +134,9 @@ impl SegmentReader {
         &self.fieldnorm_readers
     }
 
-    /// Accessor to the segment's [`StoreReader`](crate::store::StoreReader).
-    ///
-    /// `cache_num_blocks` sets the number of decompressed blocks to be cached in an LRU.
-    /// The size of blocks is configurable, this should be reflexted in the
-    pub fn get_store_reader(&self, cache_num_blocks: usize) -> io::Result<StoreReader> {
-        StoreReader::open(self.store_file.clone(), cache_num_blocks)
+    /// Accessor to the segment's `StoreReader`.
+    pub fn get_store_reader(&self, cache_size: usize) -> io::Result<StoreReader> {
+        StoreReader::open(self.store_file.clone(), cache_size)
     }
 
     /// Open a new segment for reading.
@@ -834,23 +834,20 @@ mod tests {
         // This is a bit of a contrived example.
         let tokens = PreTokenizedString {
             text: "contrived-example".to_string(), //< I can't think of a use case where this corner case happens in real life.
-            tokens: vec![
-                Token {
-                    // Not the last token, yet ends after the last token.
-                    offset_from: 0,
-                    offset_to: 14,
-                    position: 0,
-                    text: "long_token".to_string(),
-                    position_length: 3,
-                },
-                Token {
-                    offset_from: 0,
-                    offset_to: 14,
-                    position: 1,
-                    text: "short".to_string(),
-                    position_length: 1,
-                },
-            ],
+            tokens: vec![Token { // Not the last token, yet ends after the last token.
+                offset_from: 0,
+                offset_to: 14,
+                position: 0,
+                text: "long_token".to_string(),
+                position_length: 3,
+            },
+            Token {
+                offset_from: 0,
+                offset_to: 14,
+                position: 1,
+                text: "short".to_string(),
+                position_length: 1,
+            }],
         };
         doc.add_pre_tokenized_text(text, tokens);
         doc.add_text(text, "hello");
@@ -109,7 +109,6 @@ impl TermQuery {
         } else {
             IndexRecordOption::Basic
         };
-
         Ok(TermWeight::new(
             self.term.clone(),
             index_record_option,
@@ -44,7 +44,7 @@ pub struct IndexReaderBuilder {
     index: Index,
     warmers: Vec<Weak<dyn Warmer>>,
     num_warming_threads: usize,
-    doc_store_cache_num_blocks: usize,
+    doc_store_cache_size: usize,
 }
 
 impl IndexReaderBuilder {
@@ -55,7 +55,7 @@ impl IndexReaderBuilder {
             index,
             warmers: Vec::new(),
             num_warming_threads: 1,
-            doc_store_cache_num_blocks: DOCSTORE_CACHE_CAPACITY,
+            doc_store_cache_size: DOCSTORE_CACHE_CAPACITY,
         }
     }
 
@@ -72,7 +72,7 @@ impl IndexReaderBuilder {
             searcher_generation_inventory.clone(),
         )?;
         let inner_reader = InnerIndexReader::new(
-            self.doc_store_cache_num_blocks,
+            self.doc_store_cache_size,
             self.index,
             warming_state,
             searcher_generation_inventory,
@@ -119,11 +119,8 @@ impl IndexReaderBuilder {
     ///
     /// The doc store readers cache by default DOCSTORE_CACHE_CAPACITY(100) decompressed blocks.
     #[must_use]
-    pub fn doc_store_cache_num_blocks(
-        mut self,
-        doc_store_cache_num_blocks: usize,
-    ) -> IndexReaderBuilder {
-        self.doc_store_cache_num_blocks = doc_store_cache_num_blocks;
+    pub fn doc_store_cache_size(mut self, doc_store_cache_size: usize) -> IndexReaderBuilder {
+        self.doc_store_cache_size = doc_store_cache_size;
         self
     }
 
@@ -154,7 +151,7 @@ impl TryInto<IndexReader> for IndexReaderBuilder {
 }
 
 struct InnerIndexReader {
-    doc_store_cache_num_blocks: usize,
+    doc_store_cache_size: usize,
     index: Index,
     warming_state: WarmingState,
     searcher: arc_swap::ArcSwap<SearcherInner>,
@@ -164,7 +161,7 @@ struct InnerIndexReader {
 
 impl InnerIndexReader {
     fn new(
-        doc_store_cache_num_blocks: usize,
+        doc_store_cache_size: usize,
         index: Index,
         warming_state: WarmingState,
         // The searcher_generation_inventory is not used as source, but as target to track the
@@ -175,13 +172,13 @@ impl InnerIndexReader {
 
         let searcher = Self::create_searcher(
             &index,
-            doc_store_cache_num_blocks,
+            doc_store_cache_size,
             &warming_state,
             &searcher_generation_counter,
             &searcher_generation_inventory,
         )?;
         Ok(InnerIndexReader {
-            doc_store_cache_num_blocks,
+            doc_store_cache_size,
             index,
             warming_state,
             searcher: ArcSwap::from(searcher),
@@ -217,7 +214,7 @@ impl InnerIndexReader {
 
     fn create_searcher(
         index: &Index,
-        doc_store_cache_num_blocks: usize,
+        doc_store_cache_size: usize,
         warming_state: &WarmingState,
         searcher_generation_counter: &Arc<AtomicU64>,
         searcher_generation_inventory: &Inventory<SearcherGeneration>,
@@ -235,7 +232,7 @@ impl InnerIndexReader {
             index.clone(),
             segment_readers,
             searcher_generation,
-            doc_store_cache_num_blocks,
+            doc_store_cache_size,
         )?);
 
         warming_state.warm_new_searcher_generation(&searcher.clone().into())?;
@@ -245,7 +242,7 @@ impl InnerIndexReader {
     fn reload(&self) -> crate::Result<()> {
         let searcher = Self::create_searcher(
             &self.index,
-            self.doc_store_cache_num_blocks,
+            self.doc_store_cache_size,
             &self.warming_state,
             &self.searcher_generation_counter,
             &self.searcher_generation_inventory,
@@ -49,17 +49,4 @@ impl IndexRecordOption {
             IndexRecordOption::WithFreqsAndPositions => true,
         }
     }
-
-    /// Downgrades to the next level if provided `IndexRecordOption` is unavailable.
-    pub fn downgrade(&self, other: IndexRecordOption) -> IndexRecordOption {
-        use IndexRecordOption::*;
-
-        match (other, self) {
-            (WithFreqsAndPositions, WithFreqsAndPositions) => WithFreqsAndPositions,
-            (WithFreqs, WithFreqs) => WithFreqs,
-            (WithFreqsAndPositions, WithFreqs) => WithFreqs,
-            (WithFreqs, WithFreqsAndPositions) => WithFreqs,
-            _ => Basic,
-        }
-    }
 }
@@ -375,8 +375,7 @@ where B: AsRef<[u8]>
     ///
    /// Do NOT rely on this byte representation in the index.
     /// This value is likely to change in the future.
-    #[inline(always)]
-    pub fn as_slice(&self) -> &[u8] {
+    pub(crate) fn as_slice(&self) -> &[u8] {
         self.0.as_ref()
     }
 }
@@ -90,7 +90,7 @@ impl CheckpointBlock {
             return Ok(());
         }
         let mut doc = read_u32_vint(data);
-        let mut start_offset = VInt::deserialize_u64(data)? as usize;
+        let mut start_offset = read_u32_vint(data) as usize;
         for _ in 0..len {
             let num_docs = read_u32_vint(data);
             let block_num_bytes = read_u32_vint(data) as usize;
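Note: reading start_offset with read_u32_vint instead of VInt::deserialize_u64 caps each checkpoint offset at u32::MAX, which is consistent with the next hunk dropping the 8 GB byte-range round-trip test. A quick check of that limit:

```rust
fn main() {
    // The byte range used by the removed test does not fit in a u32 vint.
    let start: u64 = 8_000_000_000;
    assert!(start > u32::MAX as u64);
    assert!(u32::try_from(start).is_err());
}
```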
@@ -147,15 +147,6 @@ mod tests {
         test_aux_ser_deser(&checkpoints)
     }
 
-    #[test]
-    fn test_block_serialize_large_byte_range() -> io::Result<()> {
-        let checkpoints = vec![Checkpoint {
-            doc_range: 10..12,
-            byte_range: 8_000_000_000..9_000_000_000,
-        }];
-        test_aux_ser_deser(&checkpoints)
-    }
-
     #[test]
     fn test_block_serialize() -> io::Result<()> {
         let offsets: Vec<usize> = (0..11).map(|i| i * i * i).collect();
@@ -4,8 +4,8 @@
 //! order to be handled in the `Store`.
 //!
 //! Internally, documents (or rather their stored fields) are serialized to a buffer.
-//! When the buffer exceeds `block_size` (defaults to 16K), the buffer is compressed using `brotli`,
-//! `LZ4` or `snappy` and the resulting block is written to disk.
+//! When the buffer exceeds 16K, the buffer is compressed using `brotli`, `LZ4` or `snappy`
+//! and the resulting block is written to disk.
 //!
 //! One can then request for a specific `DocId`.
 //! A skip list helps navigating to the right block,
@@ -28,6 +28,8 @@
 //! - at the segment level, the
 //! [`SegmentReader`'s `doc` method](../struct.SegmentReader.html#method.doc)
 //! - at the index level, the [`Searcher::doc()`](crate::Searcher::doc) method
+//!
+//! !
 
 mod compressors;
 mod decompressors;
@@ -114,10 +114,7 @@ impl Sum for CacheStats {
 
 impl StoreReader {
     /// Opens a store reader
-    ///
-    /// `cache_num_blocks` sets the number of decompressed blocks to be cached in an LRU.
-    /// The size of blocks is configurable, this should be reflexted in the
-    pub fn open(store_file: FileSlice, cache_num_blocks: usize) -> io::Result<StoreReader> {
+    pub fn open(store_file: FileSlice, cache_size: usize) -> io::Result<StoreReader> {
         let (footer, data_and_offset) = DocStoreFooter::extract_footer(store_file)?;
 
         let (data_file, offset_index_file) = data_and_offset.split(footer.offset as usize);
@@ -128,8 +125,8 @@ impl StoreReader {
             decompressor: footer.decompressor,
             data: data_file,
             cache: BlockCache {
-                cache: NonZeroUsize::new(cache_num_blocks)
-                    .map(|cache_num_blocks| Mutex::new(LruCache::new(cache_num_blocks))),
+                cache: NonZeroUsize::new(cache_size)
+                    .map(|cache_size| Mutex::new(LruCache::new(cache_size))),
                 cache_hits: Default::default(),
                 cache_misses: Default::default(),
             },
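Note: in the StoreReader constructor above, NonZeroUsize::new(cache_size) doubles as an on/off switch: a size of 0 yields None, so no LRU cache is allocated and block reads always go through decompression. A small std-only sketch of that pattern with a toy cache (not tantivy's BlockCache):

```rust
use std::collections::HashMap;
use std::num::NonZeroUsize;

// Toy stand-in for an LRU block cache: a capacity-bounded map wrapped in
// Option, where capacity 0 disables caching entirely.
struct ToyBlockCache {
    cache: Option<(NonZeroUsize, HashMap<usize, Vec<u8>>)>,
}

impl ToyBlockCache {
    fn new(cache_num_blocks: usize) -> ToyBlockCache {
        ToyBlockCache {
            // `None` when cache_num_blocks == 0: caching is simply skipped.
            cache: NonZeroUsize::new(cache_num_blocks).map(|cap| (cap, HashMap::new())),
        }
    }

    fn put(&mut self, block_id: usize, bytes: Vec<u8>) {
        if let Some((cap, map)) = &mut self.cache {
            if map.len() < cap.get() {
                map.insert(block_id, bytes);
            }
        }
    }

    fn get(&self, block_id: usize) -> Option<&Vec<u8>> {
        self.cache.as_ref().and_then(|(_, map)| map.get(&block_id))
    }
}

fn main() {
    let mut disabled = ToyBlockCache::new(0);
    disabled.put(0, vec![1, 2, 3]);
    assert!(disabled.get(0).is_none());

    let mut enabled = ToyBlockCache::new(100);
    enabled.put(0, vec![1, 2, 3]);
    assert_eq!(enabled.get(0), Some(&vec![1, 2, 3]));
}
```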
@@ -30,6 +30,7 @@ use crate::{BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, TermOrdinal};
 /// block boundary.
 ///
 /// (See also README.md)
+#[derive(Debug, Clone)]
 pub struct Dictionary<TSSTable: SSTable> {
     pub sstable_slice: FileSlice,
     pub sstable_index: SSTableIndex,
@@ -117,6 +117,7 @@ impl SSTable for MonotonicU64SSTable {
 /// `range_sstable[k1].end == range_sstable[k2].start`.
 ///
 /// The first range is not required to start at `0`.
+#[derive(Clone, Copy, Debug)]
 pub struct RangeSSTable;
 
 impl SSTable for RangeSSTable {
@@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};
 
 use crate::{common_prefix_len, SSTableDataCorruption, TermOrdinal};
 
-#[derive(Default, Debug, Serialize, Deserialize)]
+#[derive(Default, Debug, Clone, Serialize, Deserialize)]
 pub struct SSTableIndex {
     blocks: Vec<BlockMeta>,
 }
@@ -75,7 +75,7 @@ pub struct BlockAddr {
     pub first_ordinal: u64,
 }
 
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub(crate) struct BlockMeta {
     /// Any byte string that is lexicographically greater or equal to
     /// the last key in the block,