Compare commits

..

2 Commits

Author SHA1 Message Date
trinity-1686a
116c6d3621 make Term::as_slice public 2023-02-09 13:43:01 +01:00
PSeitz
0f20787917 fix doc store cache docs (#1821)
* fix doc store cache docs

addresses an issue reported in #1820

* rename doc_store_cache_size
2023-01-23 07:06:49 +01:00
17 changed files with 65 additions and 124 deletions

View File

@@ -22,9 +22,6 @@ pub struct Column<T> {
}
impl<T: PartialOrd> Column<T> {
pub fn get_cardinality(&self) -> Cardinality {
self.idx.get_cardinality()
}
pub fn num_rows(&self) -> RowId {
match &self.idx {
ColumnIndex::Full => self.values.num_vals() as u32,

View File

@@ -1,10 +1,9 @@
use std::collections::HashMap;
use std::io;
use super::writer::ColumnarSerializer;
use crate::columnar::ColumnarReader;
use crate::dynamic_column::DynamicColumn;
use crate::{Cardinality, ColumnType};
use crate::ColumnType;
pub enum MergeDocOrder {
/// Columnar tables are simply stacked one above the other.
@@ -20,30 +19,20 @@ pub enum MergeDocOrder {
}
pub fn merge_columnar(
columnar_readers: &[ColumnarReader],
_columnar_readers: &[ColumnarReader],
mapping: MergeDocOrder,
output: &mut impl io::Write,
_output: &mut impl io::Write,
) -> io::Result<()> {
let mut serializer = ColumnarSerializer::new(output);
// TODO handle dictionary merge for Str/Bytes column
let field_name_to_group = group_columns_for_merge(columnar_readers)?;
for (column_name, category_to_columns) in field_name_to_group {
for (_category, columns_to_merge) in category_to_columns {
let column_type = columns_to_merge[0].column_type();
let mut column_serialzier =
serializer.serialize_column(column_name.as_bytes(), column_type);
merge_columns(
column_type,
&columns_to_merge,
&mapping,
&mut column_serialzier,
)?;
match mapping {
MergeDocOrder::Stack => {
// implement me :)
todo!();
}
MergeDocOrder::Complex(_) => {
// for later
todo!();
}
}
serializer.finalize()?;
Ok(())
}
/// Column types are grouped into different categories.
@@ -55,7 +44,7 @@ pub fn merge_columnar(
/// See also [README.md].
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
#[repr(u8)]
pub enum ColumnTypeCategory {
enum ColumnTypeCategory {
Bool,
Str,
Numerical,
@@ -79,41 +68,8 @@ impl From<ColumnType> for ColumnTypeCategory {
}
}
pub fn detect_cardinality(columns: &[DynamicColumn]) -> Cardinality {
if columns
.iter()
.any(|column| column.get_cardinality().is_multivalue())
{
return Cardinality::Multivalued;
}
if columns
.iter()
.any(|column| column.get_cardinality().is_optional())
{
return Cardinality::Optional;
}
Cardinality::Full
}
pub fn compute_num_docs(columns: &[DynamicColumn], mapping: &MergeDocOrder) -> usize {
// TODO handle deletes
0
}
pub fn merge_columns(
column_type: ColumnType,
columns: &[DynamicColumn],
mapping: &MergeDocOrder,
column_serializer: &mut impl io::Write,
) -> io::Result<()> {
let cardinality = detect_cardinality(columns);
Ok(())
}
pub fn group_columns_for_merge(
columnar_readers: &[ColumnarReader],
fn collect_columns(
columnar_readers: &[&ColumnarReader],
) -> io::Result<HashMap<String, HashMap<ColumnTypeCategory, Vec<DynamicColumn>>>> {
// Each column name may have multiple types of column associated.
// For merging we are interested in the same column type category since they can be merged.
@@ -161,20 +117,26 @@ fn cast_to_common_numerical_column(columns: &[DynamicColumn]) -> Vec<DynamicColu
.all(|column| column.column_type().numerical_type().is_some()));
let coerce_to_i64: Vec<_> = columns
.iter()
.filter_map(|column| column.clone().coerce_to_i64())
.map(|column| column.clone().coerce_to_i64())
.collect();
if coerce_to_i64.len() == columns.len() {
return coerce_to_i64;
if coerce_to_i64.iter().all(|column| column.is_some()) {
return coerce_to_i64
.into_iter()
.map(|column| column.unwrap())
.collect();
}
let coerce_to_u64: Vec<_> = columns
.iter()
.filter_map(|column| column.clone().coerce_to_u64())
.map(|column| column.clone().coerce_to_u64())
.collect();
if coerce_to_u64.len() == columns.len() {
return coerce_to_u64;
if coerce_to_u64.iter().all(|column| column.is_some()) {
return coerce_to_u64
.into_iter()
.map(|column| column.unwrap())
.collect();
}
columns
@@ -221,9 +183,7 @@ mod tests {
ColumnarReader::open(buffer).unwrap()
};
let column_map =
group_columns_for_merge(&[columnar1.clone(), columnar2.clone(), columnar3.clone()])
.unwrap();
let column_map = collect_columns(&[&columnar1, &columnar2, &columnar3]).unwrap();
assert_eq!(column_map.len(), 1);
let cat_to_columns = column_map.get("numbers").unwrap();
assert_eq!(cat_to_columns.len(), 1);
@@ -231,14 +191,14 @@ mod tests {
let numerical = cat_to_columns.get(&ColumnTypeCategory::Numerical).unwrap();
assert!(numerical.iter().all(|column| column.is_f64()));
let column_map = group_columns_for_merge(&[columnar1.clone(), columnar1.clone()]).unwrap();
let column_map = collect_columns(&[&columnar1, &columnar1]).unwrap();
assert_eq!(column_map.len(), 1);
let cat_to_columns = column_map.get("numbers").unwrap();
assert_eq!(cat_to_columns.len(), 1);
let numerical = cat_to_columns.get(&ColumnTypeCategory::Numerical).unwrap();
assert!(numerical.iter().all(|column| column.is_i64()));
let column_map = group_columns_for_merge(&[columnar2.clone(), columnar2.clone()]).unwrap();
let column_map = collect_columns(&[&columnar2, &columnar2]).unwrap();
assert_eq!(column_map.len(), 1);
let cat_to_columns = column_map.get("numbers").unwrap();
assert_eq!(cat_to_columns.len(), 1);

View File

@@ -1 +0,0 @@

View File

@@ -1,7 +1,6 @@
mod column_type;
mod format_version;
mod merge;
mod merge_index;
mod reader;
mod writer;

View File

@@ -13,7 +13,6 @@ fn io_invalid_data(msg: String) -> io::Error {
/// The ColumnarReader makes it possible to access a set of columns
/// associated to field names.
#[derive(Clone)]
pub struct ColumnarReader {
column_dictionary: Dictionary<RangeSSTable>,
column_data: FileSlice,

View File

@@ -8,7 +8,7 @@ use std::net::Ipv6Addr;
use column_operation::ColumnOperation;
use common::CountingWriter;
pub(crate) use serializer::ColumnarSerializer;
use serializer::ColumnarSerializer;
use stacker::{Addr, ArenaHashMap, MemoryArena};
use crate::column_index::SerializableColumnIndex;

View File

@@ -8,7 +8,7 @@ use common::{HasLen, OwnedBytes};
use crate::column::{BytesColumn, Column, StrColumn};
use crate::column_values::{monotonic_map_column, StrictlyMonotonicFn};
use crate::columnar::ColumnType;
use crate::{Cardinality, DateTime, NumericalType};
use crate::{DateTime, NumericalType};
#[derive(Clone)]
pub enum DynamicColumn {
@@ -23,18 +23,6 @@ pub enum DynamicColumn {
}
impl DynamicColumn {
pub fn get_cardinality(&self) -> Cardinality {
match self {
DynamicColumn::Bool(c) => c.get_cardinality(),
DynamicColumn::I64(c) => c.get_cardinality(),
DynamicColumn::U64(c) => c.get_cardinality(),
DynamicColumn::F64(c) => c.get_cardinality(),
DynamicColumn::IpAddr(c) => c.get_cardinality(),
DynamicColumn::DateTime(c) => c.get_cardinality(),
DynamicColumn::Bytes(c) => c.ords().get_cardinality(),
DynamicColumn::Str(c) => c.ords().get_cardinality(),
}
}
pub fn column_type(&self) -> ColumnType {
match self {
DynamicColumn::Bool(_) => ColumnType::Bool,

View File

@@ -62,12 +62,6 @@ pub enum Cardinality {
}
impl Cardinality {
pub fn is_optional(&self) -> bool {
matches!(self, Cardinality::Optional)
}
pub fn is_multivalue(&self) -> bool {
matches!(self, Cardinality::Multivalued)
}
pub(crate) fn to_code(self) -> u8 {
self as u8
}

View File

@@ -249,7 +249,7 @@ impl SearcherInner {
index: Index,
segment_readers: Vec<SegmentReader>,
generation: TrackedObject<SearcherGeneration>,
doc_store_cache_size: usize,
doc_store_cache_num_blocks: usize,
) -> io::Result<SearcherInner> {
assert_eq!(
&segment_readers
@@ -261,7 +261,7 @@ impl SearcherInner {
);
let store_readers: Vec<StoreReader> = segment_readers
.iter()
.map(|segment_reader| segment_reader.get_store_reader(doc_store_cache_size))
.map(|segment_reader| segment_reader.get_store_reader(doc_store_cache_num_blocks))
.collect::<io::Result<Vec<_>>>()?;
Ok(SearcherInner {

View File

@@ -134,9 +134,12 @@ impl SegmentReader {
&self.fieldnorm_readers
}
/// Accessor to the segment's `StoreReader`.
pub fn get_store_reader(&self, cache_size: usize) -> io::Result<StoreReader> {
StoreReader::open(self.store_file.clone(), cache_size)
/// Accessor to the segment's [`StoreReader`](crate::store::StoreReader).
///
/// `cache_num_blocks` sets the number of decompressed blocks to be cached in an LRU.
/// The size of blocks is configurable, this should be reflexted in the
pub fn get_store_reader(&self, cache_num_blocks: usize) -> io::Result<StoreReader> {
StoreReader::open(self.store_file.clone(), cache_num_blocks)
}
/// Open a new segment for reading.

View File

@@ -44,7 +44,7 @@ pub struct IndexReaderBuilder {
index: Index,
warmers: Vec<Weak<dyn Warmer>>,
num_warming_threads: usize,
doc_store_cache_size: usize,
doc_store_cache_num_blocks: usize,
}
impl IndexReaderBuilder {
@@ -55,7 +55,7 @@ impl IndexReaderBuilder {
index,
warmers: Vec::new(),
num_warming_threads: 1,
doc_store_cache_size: DOCSTORE_CACHE_CAPACITY,
doc_store_cache_num_blocks: DOCSTORE_CACHE_CAPACITY,
}
}
@@ -72,7 +72,7 @@ impl IndexReaderBuilder {
searcher_generation_inventory.clone(),
)?;
let inner_reader = InnerIndexReader::new(
self.doc_store_cache_size,
self.doc_store_cache_num_blocks,
self.index,
warming_state,
searcher_generation_inventory,
@@ -119,8 +119,11 @@ impl IndexReaderBuilder {
///
/// The doc store readers cache by default DOCSTORE_CACHE_CAPACITY(100) decompressed blocks.
#[must_use]
pub fn doc_store_cache_size(mut self, doc_store_cache_size: usize) -> IndexReaderBuilder {
self.doc_store_cache_size = doc_store_cache_size;
pub fn doc_store_cache_num_blocks(
mut self,
doc_store_cache_num_blocks: usize,
) -> IndexReaderBuilder {
self.doc_store_cache_num_blocks = doc_store_cache_num_blocks;
self
}
@@ -151,7 +154,7 @@ impl TryInto<IndexReader> for IndexReaderBuilder {
}
struct InnerIndexReader {
doc_store_cache_size: usize,
doc_store_cache_num_blocks: usize,
index: Index,
warming_state: WarmingState,
searcher: arc_swap::ArcSwap<SearcherInner>,
@@ -161,7 +164,7 @@ struct InnerIndexReader {
impl InnerIndexReader {
fn new(
doc_store_cache_size: usize,
doc_store_cache_num_blocks: usize,
index: Index,
warming_state: WarmingState,
// The searcher_generation_inventory is not used as source, but as target to track the
@@ -172,13 +175,13 @@ impl InnerIndexReader {
let searcher = Self::create_searcher(
&index,
doc_store_cache_size,
doc_store_cache_num_blocks,
&warming_state,
&searcher_generation_counter,
&searcher_generation_inventory,
)?;
Ok(InnerIndexReader {
doc_store_cache_size,
doc_store_cache_num_blocks,
index,
warming_state,
searcher: ArcSwap::from(searcher),
@@ -214,7 +217,7 @@ impl InnerIndexReader {
fn create_searcher(
index: &Index,
doc_store_cache_size: usize,
doc_store_cache_num_blocks: usize,
warming_state: &WarmingState,
searcher_generation_counter: &Arc<AtomicU64>,
searcher_generation_inventory: &Inventory<SearcherGeneration>,
@@ -232,7 +235,7 @@ impl InnerIndexReader {
index.clone(),
segment_readers,
searcher_generation,
doc_store_cache_size,
doc_store_cache_num_blocks,
)?);
warming_state.warm_new_searcher_generation(&searcher.clone().into())?;
@@ -242,7 +245,7 @@ impl InnerIndexReader {
fn reload(&self) -> crate::Result<()> {
let searcher = Self::create_searcher(
&self.index,
self.doc_store_cache_size,
self.doc_store_cache_num_blocks,
&self.warming_state,
&self.searcher_generation_counter,
&self.searcher_generation_inventory,

View File

@@ -375,7 +375,7 @@ where B: AsRef<[u8]>
///
/// Do NOT rely on this byte representation in the index.
/// This value is likely to change in the future.
pub(crate) fn as_slice(&self) -> &[u8] {
pub fn as_slice(&self) -> &[u8] {
self.0.as_ref()
}
}

View File

@@ -4,8 +4,8 @@
//! order to be handled in the `Store`.
//!
//! Internally, documents (or rather their stored fields) are serialized to a buffer.
//! When the buffer exceeds 16K, the buffer is compressed using `brotli`, `LZ4` or `snappy`
//! and the resulting block is written to disk.
//! When the buffer exceeds `block_size` (defaults to 16K), the buffer is compressed using `brotli`,
//! `LZ4` or `snappy` and the resulting block is written to disk.
//!
//! One can then request for a specific `DocId`.
//! A skip list helps navigating to the right block,
@@ -28,8 +28,6 @@
//! - at the segment level, the
//! [`SegmentReader`'s `doc` method](../struct.SegmentReader.html#method.doc)
//! - at the index level, the [`Searcher::doc()`](crate::Searcher::doc) method
//!
//! !
mod compressors;
mod decompressors;

View File

@@ -114,7 +114,10 @@ impl Sum for CacheStats {
impl StoreReader {
/// Opens a store reader
pub fn open(store_file: FileSlice, cache_size: usize) -> io::Result<StoreReader> {
///
/// `cache_num_blocks` sets the number of decompressed blocks to be cached in an LRU.
/// The size of blocks is configurable, this should be reflexted in the
pub fn open(store_file: FileSlice, cache_num_blocks: usize) -> io::Result<StoreReader> {
let (footer, data_and_offset) = DocStoreFooter::extract_footer(store_file)?;
let (data_file, offset_index_file) = data_and_offset.split(footer.offset as usize);
@@ -125,8 +128,8 @@ impl StoreReader {
decompressor: footer.decompressor,
data: data_file,
cache: BlockCache {
cache: NonZeroUsize::new(cache_size)
.map(|cache_size| Mutex::new(LruCache::new(cache_size))),
cache: NonZeroUsize::new(cache_num_blocks)
.map(|cache_num_blocks| Mutex::new(LruCache::new(cache_num_blocks))),
cache_hits: Default::default(),
cache_misses: Default::default(),
},

View File

@@ -30,7 +30,6 @@ use crate::{BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, TermOrdinal};
/// block boundary.
///
/// (See also README.md)
#[derive(Debug, Clone)]
pub struct Dictionary<TSSTable: SSTable> {
pub sstable_slice: FileSlice,
pub sstable_index: SSTableIndex,

View File

@@ -117,7 +117,6 @@ impl SSTable for MonotonicU64SSTable {
/// `range_sstable[k1].end == range_sstable[k2].start`.
///
/// The first range is not required to start at `0`.
#[derive(Clone, Copy, Debug)]
pub struct RangeSSTable;
impl SSTable for RangeSSTable {

View File

@@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};
use crate::{common_prefix_len, SSTableDataCorruption, TermOrdinal};
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
#[derive(Default, Debug, Serialize, Deserialize)]
pub struct SSTableIndex {
blocks: Vec<BlockMeta>,
}
@@ -75,7 +75,7 @@ pub struct BlockAddr {
pub first_ordinal: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct BlockMeta {
/// Any byte string that is lexicographically greater or equal to
/// the last key in the block,