Merge with main

Paul Masurel
2023-02-09 14:28:57 +01:00
10 changed files with 85 additions and 85 deletions

View File

@@ -55,12 +55,12 @@ measure_time = "0.8.2"
async-trait = "0.1.53"
arc-swap = "1.5.0"
columnar = { version="0.1", path="./columnar", package ="tantivy-columnar" }
sstable = { version="0.1", path="./sstable", package ="tantivy-sstable", optional = true }
stacker = { version="0.1", path="./stacker", package ="tantivy-stacker" }
tantivy-query-grammar = { version= "0.19.0", path="./query-grammar" }
tantivy-bitpacker = { version= "0.3", path="./bitpacker" }
columnar = { version= "0.1", path="./columnar", package="tantivy-columnar" }
common = { version= "0.5", path = "./common/", package = "tantivy-common" }
tantivy-bitpacker = { version= "0.3", path="./bitpacker" }
common = { version= "0.5", path = "./common/", package = "tantivy-common" }
tokenizer-api = { version="0.1", path="./tokenizer-api", package="tantivy-tokenizer-api" }
[target.'cfg(windows)'.dependencies]
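
In the Cargo.toml above, the key on the left of each dependency line is the name used in Rust code, while `package` is the crate's real name. A minimal, hypothetical sketch of what the rename means for imports (the imported item is illustrative, not a confirmed re-export):

// Declared as: columnar = { version = "0.1", path = "./columnar", package = "tantivy-columnar" }
// The crate published as `tantivy-columnar` is referenced by the key name `columnar`:
use columnar::ColumnType; // hypothetical path; the dependency key, not the package name, appears in code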

View File

@@ -5,24 +5,23 @@ edition = "2021"
license = "MIT"
[dependencies]
itertools = "0.10.5"
log = "0.4.17"
fnv = "1.0.7"
fastdivide = "0.4.0"
rand = { version = "0.8.5", optional = true }
measure_time = { version = "0.8.2", optional = true }
prettytable-rs = { version = "0.10.0", optional = true }
stacker = { path = "../stacker", package="tantivy-stacker"}
serde_json = "1"
thiserror = "1"
fnv = "1"
sstable = { path = "../sstable", package = "tantivy-sstable" }
common = { path = "../common", package = "tantivy-common" }
itertools = "0.10"
log = "0.4"
tantivy-bitpacker = { version= "0.3", path = "../bitpacker/" }
prettytable-rs = {version="0.10.0", optional= true}
rand = {version="0.8.3", optional= true}
fastdivide = "0.4"
measure_time = { version="0.8.2", optional=true}
[dev-dependencies]
proptest = "1"
more-asserts = "0.3.0"
rand = "0.8.3"
more-asserts = "0.3.1"
rand = "0.8.5"
[features]
unstable = []
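
The `unstable` feature declared above would be consumed through `cfg` gates; a hypothetical sketch, not necessarily how tantivy-columnar actually wires it up:

// Compiled only when building with `cargo build --features unstable`.
#[cfg(feature = "unstable")]
pub mod unstable_api {
    /// Placeholder item behind the feature gate.
    pub fn experimental() {}
}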

View File

@@ -4,10 +4,8 @@ use std::net::Ipv6Addr;
use crate::value::NumericalType;
use crate::InvalidData;
/// The column type represents the column type and can fit in 6 bits.
///
/// - bits[0..3]: Column category type.
/// - bits[3..6]: Numerical type if necessary.
/// The column type represents the column type.
/// Any changes need to be propagated to `COLUMN_TYPES`.
#[derive(Hash, Eq, PartialEq, Debug, Clone, Copy, Ord, PartialOrd)]
#[repr(u8)]
pub enum ColumnType {
@@ -132,39 +130,6 @@ impl HasAssociatedColumnType for Ipv6Addr {
}
}
/// Column types are grouped into different categories that
/// correspond to the different `JsonValue` types.
///
/// The columnar writer will apply coercion rules to make sure that
/// at most one column exists per `ColumnTypeCategory`.
///
/// See also [README.md].
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)]
#[repr(u8)]
pub enum ColumnTypeCategory {
Bool,
Str,
Numerical,
DateTime,
Bytes,
IpAddr,
}
impl From<ColumnType> for ColumnTypeCategory {
fn from(column_type: ColumnType) -> Self {
match column_type {
ColumnType::I64 => ColumnTypeCategory::Numerical,
ColumnType::U64 => ColumnTypeCategory::Numerical,
ColumnType::F64 => ColumnTypeCategory::Numerical,
ColumnType::Bytes => ColumnTypeCategory::Bytes,
ColumnType::Str => ColumnTypeCategory::Str,
ColumnType::Bool => ColumnTypeCategory::Bool,
ColumnType::IpAddr => ColumnTypeCategory::IpAddr,
ColumnType::DateTime => ColumnTypeCategory::DateTime,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
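
The removed doc comment above described a 6-bit layout: the column category in bits[0..3] and the numerical type in bits[3..6]. A sketch of that packing scheme, purely to illustrate the removed comment rather than the crate's actual encoding:

// Illustrative only: pack a 3-bit category code and a 3-bit numerical-type code into one byte.
fn pack_column_code(category: u8, numerical_type: u8) -> u8 {
    debug_assert!(category < 8 && numerical_type < 8);
    category | (numerical_type << 3)
}

// Inverse of `pack_column_code`.
fn unpack_column_code(code: u8) -> (u8, u8) {
    (code & 0b111, (code >> 3) & 0b111)
}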

View File

@@ -14,7 +14,6 @@ pub use merge_mapping::{MergeRowOrder, ShuffleMergeOrder, StackMergeOrder};
use super::writer::ColumnarSerializer;
use crate::column::{serialize_column_mappable_to_u128, serialize_column_mappable_to_u64};
use crate::column_values::MergedColumnValues;
use crate::columnar::column_type::ColumnTypeCategory;
use crate::columnar::merge::merge_dict_column::merge_bytes_or_str_column;
use crate::columnar::writer::CompatibleNumericalTypes;
use crate::columnar::ColumnarReader;
@@ -23,6 +22,38 @@ use crate::{
BytesColumn, Column, ColumnIndex, ColumnType, ColumnValues, NumericalType, NumericalValue,
};
/// Column types are grouped into different categories.
/// After merge, all columns belonging to the same category are coerced to
/// the same column type.
///
/// In practice, only numerical columns are coerced into one type today.
///
/// See also [README.md].
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
enum ColumnTypeCategory {
Bool,
Str,
Numerical,
DateTime,
Bytes,
IpAddr,
}
impl From<ColumnType> for ColumnTypeCategory {
fn from(column_type: ColumnType) -> Self {
match column_type {
ColumnType::I64 => ColumnTypeCategory::Numerical,
ColumnType::U64 => ColumnTypeCategory::Numerical,
ColumnType::F64 => ColumnTypeCategory::Numerical,
ColumnType::Bytes => ColumnTypeCategory::Bytes,
ColumnType::Str => ColumnTypeCategory::Str,
ColumnType::Bool => ColumnTypeCategory::Bool,
ColumnType::IpAddr => ColumnTypeCategory::IpAddr,
ColumnType::DateTime => ColumnTypeCategory::DateTime,
}
}
}
pub fn merge_columnar(
columnar_readers: &[&ColumnarReader],
merge_row_order: MergeRowOrder,
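
A test-style sketch of the mapping above, assuming the `ColumnType` and `ColumnTypeCategory` enums as defined in this module; the point is that all numerical variants collapse into a single category so that the merge can later coerce them to one column type:

#[test]
fn column_type_category_mapping_sketch() {
    // The three numerical column types share one category...
    assert_eq!(ColumnTypeCategory::from(ColumnType::I64), ColumnTypeCategory::Numerical);
    assert_eq!(ColumnTypeCategory::from(ColumnType::U64), ColumnTypeCategory::Numerical);
    assert_eq!(ColumnTypeCategory::from(ColumnType::F64), ColumnTypeCategory::Numerical);
    // ...while the remaining types each map to their own category.
    assert_eq!(ColumnTypeCategory::from(ColumnType::Str), ColumnTypeCategory::Str);
    assert_eq!(ColumnTypeCategory::from(ColumnType::Bool), ColumnTypeCategory::Bool);
}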

View File

@@ -16,7 +16,7 @@ use crate::column_index::SerializableColumnIndex;
use crate::column_values::{
ColumnValues, MonotonicallyMappableToU128, MonotonicallyMappableToU64, VecColumn,
};
use crate::columnar::column_type::{ColumnType, ColumnTypeCategory};
use crate::columnar::column_type::ColumnType;
use crate::columnar::writer::column_writers::{
ColumnWriter, NumericalColumnWriter, StrOrBytesColumnWriter,
};
@@ -403,14 +403,12 @@ impl ColumnarWriter {
)?;
}
ColumnType::Bytes | ColumnType::Str => {
let (column_type, str_or_bytes_column_writer): (
ColumnType,
StrOrBytesColumnWriter,
) = if column_type == ColumnType::Bytes {
(ColumnType::Bytes, self.bytes_field_hash_map.read(addr))
} else {
(ColumnType::Str, self.str_field_hash_map.read(addr))
};
let str_or_bytes_column_writer: StrOrBytesColumnWriter =
if column_type == ColumnType::Bytes {
self.bytes_field_hash_map.read(addr)
} else {
self.str_field_hash_map.read(addr)
};
let dictionary_builder =
&dictionaries[str_or_bytes_column_writer.dictionary_id as usize];
let cardinality = str_or_bytes_column_writer

View File

@@ -249,7 +249,7 @@ impl SearcherInner {
index: Index,
segment_readers: Vec<SegmentReader>,
generation: TrackedObject<SearcherGeneration>,
doc_store_cache_size: usize,
doc_store_cache_num_blocks: usize,
) -> io::Result<SearcherInner> {
assert_eq!(
&segment_readers
@@ -261,7 +261,7 @@ impl SearcherInner {
);
let store_readers: Vec<StoreReader> = segment_readers
.iter()
.map(|segment_reader| segment_reader.get_store_reader(doc_store_cache_size))
.map(|segment_reader| segment_reader.get_store_reader(doc_store_cache_num_blocks))
.collect::<io::Result<Vec<_>>>()?;
Ok(SearcherInner {

View File

@@ -128,9 +128,12 @@ impl SegmentReader {
&self.fieldnorm_readers
}
/// Accessor to the segment's `StoreReader`.
pub fn get_store_reader(&self, cache_size: usize) -> io::Result<StoreReader> {
StoreReader::open(self.store_file.clone(), cache_size)
/// Accessor to the segment's [`StoreReader`](crate::store::StoreReader).
///
/// `cache_num_blocks` sets the number of decompressed blocks to be cached in an LRU.
/// The size of blocks is configurable, this should be reflected in the
pub fn get_store_reader(&self, cache_num_blocks: usize) -> io::Result<StoreReader> {
StoreReader::open(self.store_file.clone(), cache_num_blocks)
}
/// Open a new segment for reading.
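
A hedged usage sketch of the renamed accessor, assuming an existing `segment_reader: &SegmentReader`; per the `NonZeroUsize` handling in `StoreReader::open` further down, passing 0 should disable the block cache entirely:

// Cache up to 10 decompressed doc-store blocks for this store reader.
let store_reader = segment_reader.get_store_reader(10)?;
let doc = store_reader.get(0)?; // fetch the stored fields of DocId 0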

View File

@@ -44,7 +44,7 @@ pub struct IndexReaderBuilder {
index: Index,
warmers: Vec<Weak<dyn Warmer>>,
num_warming_threads: usize,
doc_store_cache_size: usize,
doc_store_cache_num_blocks: usize,
}
impl IndexReaderBuilder {
@@ -55,7 +55,7 @@ impl IndexReaderBuilder {
index,
warmers: Vec::new(),
num_warming_threads: 1,
doc_store_cache_size: DOCSTORE_CACHE_CAPACITY,
doc_store_cache_num_blocks: DOCSTORE_CACHE_CAPACITY,
}
}
@@ -72,7 +72,7 @@ impl IndexReaderBuilder {
searcher_generation_inventory.clone(),
)?;
let inner_reader = InnerIndexReader::new(
self.doc_store_cache_size,
self.doc_store_cache_num_blocks,
self.index,
warming_state,
searcher_generation_inventory,
@@ -119,8 +119,11 @@ impl IndexReaderBuilder {
///
/// By default, the doc store readers cache DOCSTORE_CACHE_CAPACITY (100) decompressed blocks.
#[must_use]
pub fn doc_store_cache_size(mut self, doc_store_cache_size: usize) -> IndexReaderBuilder {
self.doc_store_cache_size = doc_store_cache_size;
pub fn doc_store_cache_num_blocks(
mut self,
doc_store_cache_num_blocks: usize,
) -> IndexReaderBuilder {
self.doc_store_cache_num_blocks = doc_store_cache_num_blocks;
self
}
@@ -151,7 +154,7 @@ impl TryInto<IndexReader> for IndexReaderBuilder {
}
struct InnerIndexReader {
doc_store_cache_size: usize,
doc_store_cache_num_blocks: usize,
index: Index,
warming_state: WarmingState,
searcher: arc_swap::ArcSwap<SearcherInner>,
@@ -161,7 +164,7 @@ struct InnerIndexReader {
impl InnerIndexReader {
fn new(
doc_store_cache_size: usize,
doc_store_cache_num_blocks: usize,
index: Index,
warming_state: WarmingState,
// The searcher_generation_inventory is not used as source, but as target to track the
@@ -172,13 +175,13 @@ impl InnerIndexReader {
let searcher = Self::create_searcher(
&index,
doc_store_cache_size,
doc_store_cache_num_blocks,
&warming_state,
&searcher_generation_counter,
&searcher_generation_inventory,
)?;
Ok(InnerIndexReader {
doc_store_cache_size,
doc_store_cache_num_blocks,
index,
warming_state,
searcher: ArcSwap::from(searcher),
@@ -214,7 +217,7 @@ impl InnerIndexReader {
fn create_searcher(
index: &Index,
doc_store_cache_size: usize,
doc_store_cache_num_blocks: usize,
warming_state: &WarmingState,
searcher_generation_counter: &Arc<AtomicU64>,
searcher_generation_inventory: &Inventory<SearcherGeneration>,
@@ -232,7 +235,7 @@ impl InnerIndexReader {
index.clone(),
segment_readers,
searcher_generation,
doc_store_cache_size,
doc_store_cache_num_blocks,
)?);
warming_state.warm_new_searcher_generation(&searcher.clone().into())?;
@@ -242,7 +245,7 @@ impl InnerIndexReader {
fn reload(&self) -> crate::Result<()> {
let searcher = Self::create_searcher(
&self.index,
self.doc_store_cache_size,
self.doc_store_cache_num_blocks,
&self.warming_state,
&self.searcher_generation_counter,
&self.searcher_generation_inventory,
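
For downstream users, the renamed builder method is called roughly like this (a sketch assuming an existing `index: Index`):

// Cache up to 32 decompressed doc-store blocks per store reader.
let reader: IndexReader = index
    .reader_builder()
    .doc_store_cache_num_blocks(32)
    .try_into()?;
let searcher = reader.searcher();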

View File

@@ -4,8 +4,8 @@
//! order to be handled in the `Store`.
//!
//! Internally, documents (or rather their stored fields) are serialized to a buffer.
//! When the buffer exceeds 16K, the buffer is compressed using `brotli`, `LZ4` or `snappy`
//! and the resulting block is written to disk.
//! When the buffer exceeds `block_size` (defaults to 16K), the buffer is compressed using `brotli`,
//! `LZ4` or `snappy` and the resulting block is written to disk.
//!
//! One can then request for a specific `DocId`.
//! A skip list helps navigating to the right block,
@@ -28,8 +28,6 @@
//! - at the segment level, the
//! [`SegmentReader`'s `doc` method](../struct.SegmentReader.html#method.doc)
//! - at the index level, the [`Searcher::doc()`](crate::Searcher::doc) method
//!
//! !
mod compressors;
mod decompressors;
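
The module doc above refers to a configurable `block_size`; the block size and compressor are chosen when the index is created. A hedged sketch, assuming the `docstore_blocksize` and `docstore_compression` fields of `IndexSettings` and the `Index::builder()` API in this version:

// Assumed field names: raise the doc-store block size from the 16K default to 32K and use LZ4.
let settings = IndexSettings {
    docstore_compression: Compressor::Lz4,
    docstore_blocksize: 32_768,
    ..Default::default()
};
let index = Index::builder()
    .schema(schema)
    .settings(settings)
    .create_in_ram()?;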

View File

@@ -114,7 +114,10 @@ impl Sum for CacheStats {
impl StoreReader {
/// Opens a store reader
pub fn open(store_file: FileSlice, cache_size: usize) -> io::Result<StoreReader> {
///
/// `cache_num_blocks` sets the number of decompressed blocks to be cached in an LRU.
/// The size of blocks is configurable, this should be reflected in the
pub fn open(store_file: FileSlice, cache_num_blocks: usize) -> io::Result<StoreReader> {
let (footer, data_and_offset) = DocStoreFooter::extract_footer(store_file)?;
let (data_file, offset_index_file) = data_and_offset.split(footer.offset as usize);
@@ -125,8 +128,8 @@ impl StoreReader {
decompressor: footer.decompressor,
data: data_file,
cache: BlockCache {
cache: NonZeroUsize::new(cache_size)
.map(|cache_size| Mutex::new(LruCache::new(cache_size))),
cache: NonZeroUsize::new(cache_num_blocks)
.map(|cache_num_blocks| Mutex::new(LruCache::new(cache_num_blocks))),
cache_hits: Default::default(),
cache_misses: Default::default(),
},
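
The `NonZeroUsize` wrapping above is what lets a cache size of 0 disable caching. A self-contained sketch of the same pattern (using the `lru` crate, as the code above does):

use std::num::NonZeroUsize;
use std::sync::Mutex;

use lru::LruCache;

// A capacity of 0 yields `None`, so no LRU is allocated and every lookup is a miss;
// any non-zero capacity builds a mutex-guarded LRU keyed by block offset.
fn build_block_cache(cache_num_blocks: usize) -> Option<Mutex<LruCache<usize, Vec<u8>>>> {
    NonZeroUsize::new(cache_num_blocks).map(|capacity| Mutex::new(LruCache::new(capacity)))
}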