mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-06 09:12:55 +00:00
Fixed num_vals
@@ -13,3 +13,6 @@ find a way to unify the two DateTime.
 readd type check in the filter wrapper
+add unit test on columnar list columns.
+make sure sort works
@@ -6,6 +6,7 @@ use sstable::{Dictionary, VoidSSTable};
 
 use crate::column::Column;
 use crate::column_index::ColumnIndex;
+use crate::RowId;
 
 /// Dictionary encoded column.
 #[derive(Clone)]
@@ -24,6 +25,10 @@ impl BytesColumn {
     pub fn term_ords(&self) -> &Column<u64> {
         &self.term_ord_column
     }
+
+    pub fn num_rows(&self) -> RowId {
+        self.term_ord_column.num_rows()
+    }
 }
 
 impl Deref for BytesColumn {
@@ -8,7 +8,7 @@ use common::BinarySerializable;
 pub use dictionary_encoded::BytesColumn;
 pub use serialize::{open_column_bytes, open_column_u64, serialize_column_u64};
 
-use crate::column_index::ColumnIndex;
+use crate::column_index::{ColumnIndex, Set};
 use crate::column_values::ColumnValues;
 use crate::{Cardinality, RowId};
 
@@ -18,7 +18,15 @@ pub struct Column<T> {
     pub values: Arc<dyn ColumnValues<T>>,
 }
 
-use crate::column_index::Set;
+impl<T: PartialOrd> Column<T> {
+    pub fn num_rows(&self) -> RowId {
+        match &self.idx {
+            ColumnIndex::Full => self.values.num_vals(),
+            ColumnIndex::Optional(optional_idx) => optional_idx.num_rows(),
+            ColumnIndex::Multivalued(_) => todo!(),
+        }
+    }
+}
+
 impl<T: PartialOrd + Copy + Send + Sync + 'static> Column<T> {
     pub fn first(&self, row_id: RowId) -> Option<T> {
@@ -82,6 +90,10 @@ impl<T: PartialOrd + Send + Sync + Copy + 'static> ColumnValues<T> for FirstValu
     }
 
     fn num_vals(&self) -> u32 {
-        self.column.idx.num_rows()
+        match &self.column.idx {
+            ColumnIndex::Full => self.column.values.num_vals(),
+            ColumnIndex::Optional(optional_idx) => optional_idx.num_rows(),
+            ColumnIndex::Multivalued(_) => todo!(),
+        }
     }
 }
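This hunk is the fix named in the commit title: `num_vals` used to call `ColumnIndex::num_rows()`, whose `Full` arm was `todo!()` and panicked. The replacement dispatches on the index cardinality directly. A standalone sketch of that dispatch, with simplified hypothetical types rather than tantivy's:

// Standalone sketch (simplified, hypothetical types; not tantivy's API).
// The row count of a column depends on the cardinality of its index.
enum ColumnIndex {
    // Every row has exactly one value: row count == value count.
    Full,
    // Some rows are empty: the optional index knows the row count.
    Optional { num_rows: u32 },
    // Rows map to value ranges via offsets (offsets.len() == num_rows + 1).
    Multivalued { offsets: Vec<u32> },
}

fn num_rows_for(idx: &ColumnIndex, num_values: u32) -> u32 {
    match idx {
        ColumnIndex::Full => num_values,
        ColumnIndex::Optional { num_rows } => *num_rows,
        ColumnIndex::Multivalued { offsets } => offsets.len() as u32 - 1,
    }
}

fn main() {
    assert_eq!(num_rows_for(&ColumnIndex::Full, 5), 5);
    assert_eq!(num_rows_for(&ColumnIndex::Optional { num_rows: 8 }, 5), 8);
    let multi = ColumnIndex::Multivalued { offsets: vec![0, 2, 2, 5] };
    assert_eq!(num_rows_for(&multi, 5), 3);
}

The multivalued offsets slice carries one extra entry, which is why the removed `ColumnIndex::num_rows` below computed `num_vals() - 1`.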
@@ -27,14 +27,4 @@ impl<'a> ColumnIndex<'a> {
             ColumnIndex::Multivalued(_) => Cardinality::Multivalued,
         }
     }
-
-    pub fn num_rows(&self) -> RowId {
-        match self {
-            ColumnIndex::Full => {
-                todo!()
-            }
-            ColumnIndex::Optional(optional_index) => optional_index.num_rows(),
-            ColumnIndex::Multivalued(multivalued_index) => multivalued_index.num_vals() - 1,
-        }
-    }
 }
@@ -101,25 +101,22 @@ impl ColumnarWriter {
                 mutate_or_create_column(
                     &mut self.bytes_field_hash_map,
                     column_name,
-                    |column_opt: Option<StrColumnWriter>| {
-                        column_opt.unwrap_or_default()
-                    });
-            },
+                    |column_opt: Option<StrColumnWriter>| column_opt.unwrap_or_default(),
+                );
+            }
             ColumnType::Bool => {
                 mutate_or_create_column(
                     &mut self.bool_field_hash_map,
                     column_name,
-                    |column_opt: Option<ColumnWriter>| {
-                        column_opt.unwrap_or_default()
-                    });
-            },
+                    |column_opt: Option<ColumnWriter>| column_opt.unwrap_or_default(),
+                );
+            }
             ColumnType::DateTime => {
                 mutate_or_create_column(
                     &mut self.datetime_field_hash_map,
                     column_name,
-                    |column_opt: Option<ColumnWriter>| {
-                        column_opt.unwrap_or_default()
-                    });
+                    |column_opt: Option<ColumnWriter>| column_opt.unwrap_or_default(),
+                );
             }
             ColumnType::Numerical(numerical_type) => {
                 mutate_or_create_column(
@@ -129,7 +126,8 @@ impl ColumnarWriter {
                     let mut column: NumericalColumnWriter = column_opt.unwrap_or_default();
                     column.force_numerical_type(numerical_type);
                     column
-                });
+                },
+            );
             }
         }
     }
@@ -175,15 +173,11 @@ impl ColumnarWriter {
 
     pub fn record_datetime(&mut self, doc: RowId, column_name: &str, datetime: crate::DateTime) {
         let (hash_map, arena) = (&mut self.datetime_field_hash_map, &mut self.arena);
-        mutate_or_create_column(
-            hash_map,
-            column_name,
-            |column_opt: Option<ColumnWriter>| {
-                let mut column: ColumnWriter = column_opt.unwrap_or_default();
-                column.record(doc, NumericalValue::I64(datetime.timestamp_micros), arena);
-                column
-            },
-        );
+        mutate_or_create_column(hash_map, column_name, |column_opt: Option<ColumnWriter>| {
+            let mut column: ColumnWriter = column_opt.unwrap_or_default();
+            column.record(doc, NumericalValue::I64(datetime.timestamp_micros), arena);
+            column
+        });
     }
 
     pub fn record_str(&mut self, doc: RowId, column_name: &str, value: &str) {
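`mutate_or_create_column` is the recurring update-or-insert helper in this file: the closure receives the current writer for the column, if any, and returns the writer to store back. A minimal standalone sketch of the same shape over std's HashMap (hypothetical ColumnWriter; tantivy's real map is arena-backed):

use std::collections::HashMap;

// Hypothetical stand-in for tantivy's arena-backed column writers.
#[derive(Default, Debug)]
struct ColumnWriter {
    vals: Vec<(u32, i64)>, // (doc, value) pairs
}

// Same shape as the closure-based helper in the diff: the closure receives
// the existing writer (if any) and returns the writer to store back.
fn mutate_or_create_column<F>(map: &mut HashMap<String, ColumnWriter>, name: &str, updater: F)
where
    F: FnOnce(Option<ColumnWriter>) -> ColumnWriter,
{
    let existing = map.remove(name);
    map.insert(name.to_string(), updater(existing));
}

fn main() {
    let mut columns: HashMap<String, ColumnWriter> = HashMap::new();
    mutate_or_create_column(&mut columns, "timestamp", |col_opt| {
        let mut col = col_opt.unwrap_or_default();
        col.vals.push((0, 1_672_531_200));
        col
    });
    assert_eq!(columns["timestamp"].vals.len(), 1);
}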
@@ -281,12 +275,11 @@ impl ColumnarWriter {
                 )?;
             }
             ColumnTypeCategory::DateTime => {
-                let column_writer: ColumnWriter =
-                    self.datetime_field_hash_map.read(addr);
+                let column_writer: ColumnWriter = self.datetime_field_hash_map.read(addr);
                 let cardinality = column_writer.get_cardinality(num_docs);
                 let mut column_serializer =
                     serializer.serialize_column(column_name, ColumnType::DateTime);
                 serialize_numerical_column(
                     cardinality,
                     num_docs,
                     NumericalType::I64,
@@ -69,6 +69,26 @@ impl DynamicColumnHandle {
         self.open_internal(column_bytes)
     }
 
+    /// Returns the `u64` fast field reader associated with `fields` of types
+    /// Str, u64, i64, f64, or datetime.
+    ///
+    /// If not, the fastfield reader will return the u64-value associated with the original
+    /// FastValue.
+    pub fn open_u64_lenient(&self) -> io::Result<Option<Column<u64>>> {
+        let column_bytes = self.file_slice.read_bytes()?;
+        match self.column_type {
+            ColumnType::Str => {
+                let column = crate::column::open_column_bytes(column_bytes)?;
+                Ok(Some(column.term_ord_column))
+            }
+            ColumnType::Bool => Ok(None),
+            ColumnType::Numerical(_) | ColumnType::DateTime => {
+                let column = crate::column::open_column_u64::<u64>(column_bytes)?;
+                Ok(Some(column))
+            }
+        }
+    }
+
     fn open_internal(&self, column_bytes: OwnedBytes) -> io::Result<DynamicColumn> {
         let dynamic_column: DynamicColumn = match self.column_type {
             ColumnType::Str => crate::column::open_column_bytes(column_bytes)?.into(),
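A sketch of how a caller might use the `open_u64_lenient` added above (a sketch only, assuming the in-repo columnar crate as a dependency; `Column::num_rows` and `Column::first` are also introduced by this commit):

// Sketch only: Str columns yield term ordinals, numerical/datetime columns
// yield their raw u64 representation, Bool columns have no lenient u64 view.
use columnar::DynamicColumnHandle;

fn read_first_values(handle: &DynamicColumnHandle) -> std::io::Result<()> {
    if let Some(column_u64) = handle.open_u64_lenient()? {
        for row in 0..column_u64.num_rows() {
            println!("row {}: {:?}", row, column_u64.first(row));
        }
    }
    Ok(())
}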
@@ -4,7 +4,7 @@ use fastdivide::DividerU64;
 use fastfield_codecs::Column;
 
 use crate::collector::{Collector, SegmentCollector};
-use crate::fastfield::FastValue;
+use crate::fastfield::{FastFieldNotAvailableError, FastValue};
 use crate::schema::Type;
 use crate::{DocId, Score};
 
@@ -87,14 +87,14 @@ impl HistogramComputer {
 }
 pub struct SegmentHistogramCollector {
     histogram_computer: HistogramComputer,
-    ff_reader: Arc<dyn Column<u64>>,
+    column_u64: Arc<dyn Column<u64>>,
 }
 
 impl SegmentCollector for SegmentHistogramCollector {
     type Fruit = Vec<u64>;
 
     fn collect(&mut self, doc: DocId, _score: Score) {
-        let value = self.ff_reader.get_val(doc);
+        let value = self.column_u64.get_val(doc);
         self.histogram_computer.add_value(value);
     }
 
@@ -112,14 +112,18 @@ impl Collector for HistogramCollector {
         _segment_local_id: crate::SegmentOrdinal,
         segment: &crate::SegmentReader,
     ) -> crate::Result<Self::Child> {
-        let ff_reader = segment.fast_fields().u64_lenient(&self.field)?;
+        let column_opt = segment.fast_fields().u64_lenient(&self.field)?;
+        let column = column_opt.ok_or_else(|| FastFieldNotAvailableError {
+            field_name: self.field.clone(),
+        })?;
+        let column_u64 = column.first_or_default_col(0u64);
         Ok(SegmentHistogramCollector {
             histogram_computer: HistogramComputer {
                 counts: vec![0; self.num_buckets],
                 min_value: self.min_value,
                 divider: self.divider,
            },
-            ff_reader,
+            column_u64,
        })
    }
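The new `for_segment` wiring turns a missing fast field into a hard error, then flattens the possibly-optional column into a dense per-document view with `first_or_default_col(0u64)`, which substitutes a default for empty rows. A standalone sketch of that flattening with simplified types:

// Standalone sketch of first_or_default_col-style flattening:
// an optional column becomes a dense column by substituting a default.
struct OptionalColumn {
    values: Vec<Option<u64>>,
}

struct DenseView {
    values: Vec<Option<u64>>,
    default: u64,
}

impl DenseView {
    fn get_val(&self, doc: usize) -> u64 {
        self.values[doc].unwrap_or(self.default)
    }
}

fn first_or_default_col(col: OptionalColumn, default: u64) -> DenseView {
    DenseView { values: col.values, default }
}

fn main() {
    let col = OptionalColumn { values: vec![Some(3), None, Some(7)] };
    let dense = first_or_default_col(col, 0);
    assert_eq!((0..3).map(|d| dense.get_val(d)).collect::<Vec<_>>(), vec![3, 0, 7]);
}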
@@ -12,7 +12,7 @@ use crate::collector::tweak_score_top_collector::TweakedScoreTopCollector;
 use crate::collector::{
     CustomScorer, CustomSegmentScorer, ScoreSegmentTweaker, ScoreTweaker, SegmentCollector,
 };
-use crate::fastfield::FastValue;
+use crate::fastfield::{FastFieldNotAvailableError, FastValue};
 use crate::query::Weight;
 use crate::schema::Field;
 use crate::{DocAddress, DocId, Score, SegmentOrdinal, SegmentReader, TantivyError};
 
@@ -22,7 +22,7 @@ struct FastFieldConvertCollector<
     TFastValue: FastValue,
 > {
     pub collector: TCollector,
-    pub field: Field,
+    pub field: String,
     pub fast_value: std::marker::PhantomData<TFastValue>,
 }
 
@@ -41,7 +41,8 @@ where
         segment: &SegmentReader,
     ) -> crate::Result<Self::Child> {
         let schema = segment.schema();
-        let field_entry = schema.get_field_entry(self.field);
+        let field = schema.get_field(&self.field)?;
+        let field_entry = schema.get_field_entry(field);
         if !field_entry.is_fast() {
             return Err(TantivyError::SchemaError(format!(
                 "Field {:?} is not a fast field.",
@@ -132,17 +133,17 @@ impl fmt::Debug for TopDocs {
 }
 
 struct ScorerByFastFieldReader {
-    ff_reader: Arc<dyn Column<u64>>,
+    sort_column: Arc<dyn Column<u64>>,
 }
 
 impl CustomSegmentScorer<u64> for ScorerByFastFieldReader {
     fn score(&mut self, doc: DocId) -> u64 {
-        self.ff_reader.get_val(doc)
+        self.sort_column.get_val(doc)
     }
 }
 
 struct ScorerByField {
-    field: Field,
+    field: String,
 }
 
 impl CustomScorer<u64> for ScorerByField {
@@ -154,11 +155,13 @@ impl CustomScorer<u64> for ScorerByField {
         // mapping is monotonic, so it is sufficient to compute our top-K docs.
         //
         // The conversion will then happen only on the top-K docs.
-        todo!();
-        // let ff_reader = segment_reader
-        //     .fast_fields()
-        //     .typed_column(&self.field)?;
-        // Ok(ScorerByFastFieldReader { ff_reader })
+        let sort_column_opt = segment_reader.fast_fields().u64_lenient(&self.field)?;
+        let sort_column = sort_column_opt
+            .ok_or_else(|| FastFieldNotAvailableError {
+                field_name: self.field.clone(),
+            })?
+            .first_or_default_col(0u64);
+        Ok(ScorerByFastFieldReader { sort_column })
     }
 }
 
@@ -291,9 +294,14 @@ impl TopDocs {
     /// the [.order_by_fast_field(...)](TopDocs::order_by_fast_field) method.
     pub fn order_by_u64_field(
         self,
-        field: Field,
+        field: impl ToString,
     ) -> impl Collector<Fruit = Vec<(u64, DocAddress)>> {
-        CustomScoreTopCollector::new(ScorerByField { field }, self.0.into_tscore())
+        CustomScoreTopCollector::new(
+            ScorerByField {
+                field: field.to_string(),
+            },
+            self.0.into_tscore(),
+        )
     }
 
     /// Set top-K to rank documents by a given fast field.
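With `order_by_u64_field` now taking `impl ToString` instead of `Field`, callers pass the field by name, as the updated tests further down do. A hedged usage sketch ("popularity" is a hypothetical u64 fast field declared in the schema):

use tantivy::collector::TopDocs;
use tantivy::query::Query;
use tantivy::{DocAddress, Searcher, TantivyError};

// Hypothetical helper: returns the ten docs with the highest "popularity".
fn top_by_popularity(
    searcher: &Searcher,
    query: &dyn Query,
) -> Result<Vec<(u64, DocAddress)>, TantivyError> {
    let collector = TopDocs::with_limit(10).order_by_u64_field("popularity");
    searcher.search(query, &collector)
}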
@@ -368,15 +376,15 @@ impl TopDocs {
     /// ```
     pub fn order_by_fast_field<TFastValue>(
         self,
-        fast_field: Field,
+        fast_field: impl ToString,
     ) -> impl Collector<Fruit = Vec<(TFastValue, DocAddress)>>
     where
         TFastValue: FastValue,
     {
-        let u64_collector = self.order_by_u64_field(fast_field);
+        let u64_collector = self.order_by_u64_field(fast_field.to_string());
         FastFieldConvertCollector {
             collector: u64_collector,
-            field: fast_field,
+            field: fast_field.to_string(),
             fast_value: PhantomData,
         }
     }
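`order_by_fast_field` follows the same pattern but converts the raw u64 back to the declared fast value type on the top-K docs only. A usage sketch mirroring the date test below ("birthday" must be a date fast field; the return type drives the generic parameter):

use tantivy::collector::TopDocs;
use tantivy::query::AllQuery;
use tantivy::{DateTime, DocAddress, Searcher, TantivyError};

// Hypothetical helper: the three largest (most recent) birthdays.
fn top_birthdays(searcher: &Searcher) -> Result<Vec<(DateTime, DocAddress)>, TantivyError> {
    let collector = TopDocs::with_limit(3).order_by_fast_field("birthday");
    searcher.search(&AllQuery, &collector)
}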
@@ -878,7 +886,7 @@ mod tests {
         });
         let searcher = index.reader()?.searcher();
 
-        let top_collector = TopDocs::with_limit(4).order_by_u64_field(size);
+        let top_collector = TopDocs::with_limit(4).order_by_u64_field(SIZE);
         let top_docs: Vec<(u64, DocAddress)> = searcher.search(&query, &top_collector)?;
         assert_eq!(
             &top_docs[..],
@@ -917,7 +925,7 @@ mod tests {
         ))?;
         index_writer.commit()?;
         let searcher = index.reader()?.searcher();
-        let top_collector = TopDocs::with_limit(3).order_by_fast_field(birthday);
+        let top_collector = TopDocs::with_limit(3).order_by_fast_field("birthday");
         let top_docs: Vec<(DateTime, DocAddress)> = searcher.search(&AllQuery, &top_collector)?;
         assert_eq!(
             &top_docs[..],
@@ -947,7 +955,7 @@ mod tests {
         ))?;
         index_writer.commit()?;
         let searcher = index.reader()?.searcher();
-        let top_collector = TopDocs::with_limit(3).order_by_fast_field(altitude);
+        let top_collector = TopDocs::with_limit(3).order_by_fast_field("altitude");
         let top_docs: Vec<(i64, DocAddress)> = searcher.search(&AllQuery, &top_collector)?;
         assert_eq!(
             &top_docs[..],
@@ -977,7 +985,7 @@ mod tests {
         ))?;
         index_writer.commit()?;
         let searcher = index.reader()?.searcher();
-        let top_collector = TopDocs::with_limit(3).order_by_fast_field(altitude);
+        let top_collector = TopDocs::with_limit(3).order_by_fast_field("altitude");
         let top_docs: Vec<(f64, DocAddress)> = searcher.search(&AllQuery, &top_collector)?;
         assert_eq!(
             &top_docs[..],
@@ -1005,7 +1013,7 @@ mod tests {
             .unwrap();
         });
         let searcher = index.reader().unwrap().searcher();
-        let top_collector = TopDocs::with_limit(4).order_by_u64_field(Field::from_field_id(2));
+        let top_collector = TopDocs::with_limit(4).order_by_u64_field("missing_field");
         let segment_reader = searcher.segment_reader(0u32);
         top_collector
             .for_segment(0, segment_reader)
@@ -1023,7 +1031,7 @@ mod tests {
         index_writer.commit()?;
         let searcher = index.reader()?.searcher();
         let segment = searcher.segment_reader(0);
-        let top_collector = TopDocs::with_limit(4).order_by_u64_field(size);
+        let top_collector = TopDocs::with_limit(4).order_by_u64_field(SIZE);
         let err = top_collector.for_segment(0, segment).err().unwrap();
         assert!(matches!(err, crate::TantivyError::SchemaError(_)));
         Ok(())
@@ -1040,7 +1048,7 @@ mod tests {
         index_writer.commit()?;
         let searcher = index.reader()?.searcher();
         let segment = searcher.segment_reader(0);
-        let top_collector = TopDocs::with_limit(4).order_by_fast_field::<i64>(size);
+        let top_collector = TopDocs::with_limit(4).order_by_fast_field::<i64>(SIZE);
         let err = top_collector.for_segment(0, segment).err().unwrap();
         assert!(
             matches!(err, crate::TantivyError::SchemaError(msg) if msg == "Field \"size\" is not a fast field.")
@@ -8,7 +8,7 @@ use crate::schema::FieldEntry;
 #[derive(Debug, Error)]
 #[error("Fast field not available: '{field_name:?}'")]
 pub struct FastFieldNotAvailableError {
-    field_name: String,
+    pub(crate) field_name: String,
 }
 
 impl FastFieldNotAvailableError {
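Widening `field_name` to `pub(crate)` is what lets the collectors and the merger in this commit build the error with a struct literal from sibling modules. A minimal sketch of the visibility distinction (hypothetical module names):

mod errors {
    #[derive(Debug)]
    pub struct FastFieldNotAvailableError {
        // pub(crate): any module in this crate may construct the error
        // directly. With a private field, the struct literal below would
        // not compile outside this module.
        pub(crate) field_name: String,
    }
}

mod collector {
    use super::errors::FastFieldNotAvailableError;

    pub fn missing(field: &str) -> FastFieldNotAvailableError {
        FastFieldNotAvailableError {
            field_name: field.to_string(),
        }
    }
}

fn main() {
    println!("{:?}", collector::missing("popularity"));
}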
@@ -469,7 +469,8 @@ mod tests {
         let mut index_writer = index.writer_for_tests().unwrap();
         index_writer.set_merge_policy(Box::new(NoMergePolicy));
         index_writer
-            .add_document(doc!(date_field =>DateTime::from_utc(OffsetDateTime::now_utc()))).unwrap();
+            .add_document(doc!(date_field =>DateTime::from_utc(OffsetDateTime::now_utc())))
+            .unwrap();
         index_writer.commit().unwrap();
         index_writer.add_document(doc!()).unwrap();
         index_writer.commit().unwrap();
@@ -745,7 +746,6 @@ mod tests {
     // Ok(())
     // }
 
-
     #[test]
     pub fn test_fastfield_bool_small() {
         let path = Path::new("test_bool");
@@ -850,12 +850,13 @@ mod tests {
 
     #[test]
     pub fn test_gcd_date() {
-        let size_prec_sec =
-            test_gcd_date_with_codec(DatePrecision::Seconds);
-        assert!((1000 * 13 / 8..100 + 1000*13 / 8).contains(&size_prec_sec)); // 13 bits per val = ceil(log_2(number of seconds in 2hours);
-        let size_prec_micros =
-            test_gcd_date_with_codec(DatePrecision::Microseconds);
-        assert!((1000*33/8..100 + 1000*33/8).contains(&size_prec_micros)); // 33 bits per val = ceil(log_2(number of microsecsseconds in 2hours);
+        let size_prec_sec = test_gcd_date_with_codec(DatePrecision::Seconds);
+        assert!((1000 * 13 / 8..100 + 1000 * 13 / 8).contains(&size_prec_sec)); // 13 bits per val = ceil(log_2(number of seconds in 2hours);
+        let size_prec_micros = test_gcd_date_with_codec(DatePrecision::Microseconds);
+        assert!((1000 * 33 / 8..100 + 1000 * 33 / 8).contains(&size_prec_micros)); // 33 bits per
+                                                                                   // val = ceil(log_2(number
+                                                                                   // of microsecsseconds
+                                                                                   // in 2hours);
     }
 
     fn test_gcd_date_with_codec(precision: DatePrecision) -> usize {
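The comments encode a capacity check: values spanning a 2-hour range need about ceil(log2(range)) bits each, so 1000 values cost roughly 1000 * bits / 8 bytes. A standalone verification of the 13-bit and 33-bit figures:

fn bits_needed(range: u64) -> u32 {
    // ceil(log2(range)): bits required to distinguish `range` distinct values.
    64 - (range - 1).leading_zeros()
}

fn main() {
    let two_hours_secs: u64 = 2 * 60 * 60; // 7_200
    let two_hours_micros: u64 = two_hours_secs * 1_000_000; // 7_200_000_000
    assert_eq!(bits_needed(two_hours_secs), 13); // matches "13 bits per val"
    assert_eq!(bits_needed(two_hours_micros), 33); // matches "33 bits per val"
}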
@@ -3,7 +3,8 @@ use std::net::Ipv6Addr;
 use std::sync::Arc;
 
 use columnar::{
-    ColumnType, ColumnValues, ColumnarReader, DynamicColumn, HasAssociatedColumnType, NumericalType,
+    ColumnType, ColumnValues, ColumnarReader, DynamicColumn, DynamicColumnHandle,
+    HasAssociatedColumnType, NumericalType,
 };
 use fastfield_codecs::{open, open_u128, Column};
 
@@ -32,18 +33,17 @@ impl FastFieldReaders {
         todo!()
     }
 
-    // TODO make opt
-    pub fn typed_column_opt<T>(&self, field: &str) -> crate::Result<Option<columnar::Column<T>>>
+    pub fn typed_column_opt<T>(
+        &self,
+        field_name: &str,
+    ) -> crate::Result<Option<columnar::Column<T>>>
     where
         T: PartialOrd + Copy + HasAssociatedColumnType + Send + Sync + Default + 'static,
         DynamicColumn: Into<Option<columnar::Column<T>>>,
     {
         let column_type = T::column_type();
-        let Some(dynamic_column_handle) = self.columnar.read_columns(field)?
-            .into_iter()
-            .filter(|column| column.column_type() == column_type)
-            .next() else {
-            // TODO Option would make more sense.
+        let Some(dynamic_column_handle) = self.column_handle(field_name, column_type)?
+        else {
             return Ok(None);
         };
         let dynamic_column = dynamic_column_handle.open()?;
@@ -51,7 +51,9 @@ impl FastFieldReaders {
     }
 
     pub fn column_num_bytes(&self, field: &str) -> crate::Result<usize> {
-        Ok(self.columnar.read_columns(field)?
+        Ok(self
+            .columnar
+            .read_columns(field)?
             .into_iter()
             .map(|column_handle| column_handle.num_bytes())
             .sum())
@@ -101,14 +103,27 @@ impl FastFieldReaders {
         todo!();
     }
 
-    /// Returns the `u64` fast field reader associated with `field`, regardless of whether
-    /// the given field is effectively of type `u64` or not.
-    ///
-    /// If not, the fastfield reader will return the u64-value associated with the original
-    /// FastValue.
-    pub fn u64_lenient(&self, field_name: &str) -> crate::Result<Arc<dyn Column<u64>>> {
-        todo!();
-        // self.typed_fast_field_reader(field_name)
-    }
+    pub fn column_handle(
+        &self,
+        field_name: &str,
+        column_type: ColumnType,
+    ) -> crate::Result<Option<DynamicColumnHandle>> {
+        let dynamic_column_handle_opt = self
+            .columnar
+            .read_columns(field_name)?
+            .into_iter()
+            .filter(|column| column.column_type() == column_type)
+            .next();
+        Ok(dynamic_column_handle_opt)
+    }
+
+    pub fn u64_lenient(&self, field_name: &str) -> crate::Result<Option<columnar::Column<u64>>> {
+        for col in self.columnar.read_columns(field_name)? {
+            if let Some(col_u64) = col.open_u64_lenient()? {
+                return Ok(Some(col_u64));
+            }
+        }
+        Ok(None)
+    }
 
     /// Returns the `i64` fast field reader associated with `field`.
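A field name can carry several typed columns, so the new `u64_lenient` scans them in order and returns the first that admits a lenient u64 view (Bool columns never do). The scan reduces to a generic first-match helper; a standalone sketch:

// Standalone sketch of the scan in u64_lenient: try each candidate in order
// and keep the first one that yields a usable value.
fn first_some<T, E>(
    candidates: impl IntoIterator<Item = Result<Option<T>, E>>,
) -> Result<Option<T>, E> {
    for candidate in candidates {
        if let Some(value) = candidate? {
            return Ok(Some(value));
        }
    }
    Ok(None)
}

fn main() {
    // A Bool column opens to None leniently; the first usable column wins.
    let candidates: Vec<Result<Option<&str>, ()>> =
        vec![Ok(None), Ok(Some("numerical column")), Ok(Some("str column"))];
    assert_eq!(first_some(candidates), Ok(Some("numerical column")));
}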
@@ -1,7 +1,7 @@
 use std::collections::HashMap;
 use std::io;
 
-use columnar::{ColumnarWriter, NumericalType, NumericalValue, ColumnType};
+use columnar::{ColumnType, ColumnarWriter, NumericalType, NumericalValue};
 use common;
 use fastfield_codecs::{Column, MonotonicallyMappableToU128, MonotonicallyMappableToU64};
 use rustc_hash::FxHashMap;
@@ -11,7 +11,7 @@ use super::FastFieldType;
 use crate::fastfield::CompositeFastFieldSerializer;
 use crate::indexer::doc_id_mapping::DocIdMapping;
 use crate::postings::UnorderedTermId;
-use crate::schema::{Document, Field, FieldEntry, FieldType, Schema, Value, Type};
+use crate::schema::{Document, Field, FieldEntry, FieldType, Schema, Type, Value};
 use crate::termdict::TermOrdinal;
 use crate::{DatePrecision, DocId};
 
@@ -28,7 +28,10 @@ impl FastFieldsWriter {
     pub fn from_schema(schema: &Schema) -> FastFieldsWriter {
         let mut columnar_writer = ColumnarWriter::default();
         let mut fast_fields: Vec<Option<String>> = vec![None; schema.num_fields()];
-        let mut date_precisions: Vec<DatePrecision> = std::iter::repeat_with(DatePrecision::default).take(schema.num_fields()).collect();
+        let mut date_precisions: Vec<DatePrecision> =
+            std::iter::repeat_with(DatePrecision::default)
+                .take(schema.num_fields())
+                .collect();
         // TODO see other types
         for (field_id, field_entry) in schema.fields() {
             if !field_entry.field_type().is_fast() {
@@ -44,7 +47,9 @@ impl FastFieldsWriter {
             Type::Date => ColumnType::DateTime,
             Type::Facet => ColumnType::Str,
             Type::Bytes => todo!(),
-            Type::Json => { continue; },
+            Type::Json => {
+                continue;
+            }
             Type::IpAddr => todo!(),
         };
         if let FieldType::Date(date_options) = field_entry.field_type() {
@@ -52,7 +57,12 @@ impl FastFieldsWriter {
         }
             columnar_writer.record_column_type(field_entry.name(), column_type);
         }
-        FastFieldsWriter { columnar_writer, fast_field_names: fast_fields, num_docs: 0u32, date_precisions }
+        FastFieldsWriter {
+            columnar_writer,
+            fast_field_names: fast_fields,
+            num_docs: 0u32,
+            date_precisions,
+        }
     }
 
     /// The memory used (inclusive childs)
@@ -92,21 +102,19 @@ impl FastFieldsWriter {
             Value::Str(_) => todo!(),
             Value::PreTokStr(_) => todo!(),
             Value::Bool(bool_val) => {
-                self.columnar_writer.record_bool(
-                    doc_id,
-                    field_name.as_str(),
-                    *bool_val,
-                );
-            },
+                self.columnar_writer
+                    .record_bool(doc_id, field_name.as_str(), *bool_val);
+            }
             Value::Date(datetime) => {
-                let date_precision = self.date_precisions[field_value.field().field_id() as usize];
+                let date_precision =
+                    self.date_precisions[field_value.field().field_id() as usize];
                 let truncated_datetime = datetime.truncate(date_precision);
                 self.columnar_writer.record_datetime(
                     doc_id,
                     field_name.as_str(),
-                    truncated_datetime.into()
+                    truncated_datetime.into(),
                 );
-            },
+            }
             Value::Facet(_) => todo!(),
             Value::Bytes(_) => todo!(),
             Value::JsonObject(_) => todo!(),
@@ -12,7 +12,9 @@ use crate::core::{Segment, SegmentReader};
 use crate::directory::WritePtr;
 use crate::docset::{DocSet, TERMINATED};
 use crate::error::DataCorruption;
-use crate::fastfield::{AliveBitSet, Column, CompositeFastFieldSerializer};
+use crate::fastfield::{
+    AliveBitSet, Column, CompositeFastFieldSerializer, FastFieldNotAvailableError,
+};
 use crate::fieldnorm::{FieldNormReader, FieldNormReaders, FieldNormsSerializer, FieldNormsWriter};
 use crate::indexer::doc_id_mapping::SegmentDocIdMapping;
 // use crate::indexer::sorted_doc_id_multivalue_column::RemappedDocIdMultiValueColumn;
@@ -251,7 +253,12 @@ impl IndexMerger {
         doc_id_mapping: &SegmentDocIdMapping,
     ) -> crate::Result<()> {
         debug_time!("wrie-fast-fields");
-        todo!();
+        for (_field, field_entry) in self.schema.fields() {
+            if field_entry.is_fast() {
+                todo!();
+            }
+        }
 
         // for (field, field_entry) in self.schema.fields() {
         //     let field_type = field_entry.field_type();
         //     match field_type {
@@ -389,8 +396,13 @@ impl IndexMerger {
         sort_by_field: &IndexSortByField,
     ) -> crate::Result<Arc<dyn Column>> {
         reader.schema().get_field(&sort_by_field.field)?;
-        let value_accessor = reader.fast_fields().u64_lenient(&sort_by_field.field)?;
-        Ok(value_accessor)
+        let value_accessor = reader
+            .fast_fields()
+            .u64_lenient(&sort_by_field.field)?
+            .ok_or_else(|| FastFieldNotAvailableError {
+                field_name: sort_by_field.field.to_string(),
+            })?;
+        Ok(value_accessor.first_or_default_col(0u64))
     }
     /// Collecting value_accessors into a vec to bind the lifetime.
     pub(crate) fn get_reader_with_sort_field_accessor(
@@ -139,7 +139,6 @@ impl SegmentWriter {
             self.ctx,
             self.fast_field_writers,
             &self.fieldnorms_writer,
-            &self.schema,
             self.segment_serializer,
             mapping.as_ref(),
         )?;
@@ -182,31 +181,21 @@ impl SegmentWriter {
 
         match field_entry.field_type() {
             FieldType::Facet(_) => {
-                todo!();
-                // for value in values {
-                //     let facet = value.as_facet().ok_or_else(make_schema_error)?;
-                //     let facet_str = facet.encoded_str();
-                //     let mut unordered_term_id_opt = None;
-                //     FacetTokenizer
-                //         .token_stream(facet_str)
-                //         .process(&mut |token| {
-                //             term_buffer.set_text(&token.text);
-                //             let unordered_term_id =
-                //                 postings_writer.subscribe(doc_id, 0u32, term_buffer, ctx);
-                //             // TODO pass indexing context directly in subscribe function
-                //             unordered_term_id_opt = Some(unordered_term_id);
-                //         });
-                //     if let Some(unordered_term_id) = unordered_term_id_opt {
-                //         self.fast_field_writers
-                //             .get_term_id_writer_mut(field)
-                //             .expect("writer for facet missing")
-                //             .add_val(unordered_term_id);
-                //     }
-                // }
+                for value in values {
+                    let facet = value.as_facet().ok_or_else(make_schema_error)?;
+                    let facet_str = facet.encoded_str();
+                    let mut facet_tokenizer = FacetTokenizer.token_stream(facet_str);
+                    let mut indexing_position = IndexingPosition::default();
+                    postings_writer.index_text(
+                        doc_id,
+                        &mut *facet_tokenizer,
+                        term_buffer,
+                        ctx,
+                        &mut indexing_position,
+                    );
+                }
             }
             FieldType::Str(_) => {
                 todo!()
                 /*
                 let mut indexing_position = IndexingPosition::default();
                 for value in values {
                     let mut token_stream = match value {
@@ -230,14 +219,12 @@ impl SegmentWriter {
                     term_buffer,
                     ctx,
                     &mut indexing_position,
-                    self.fast_field_writers.get_term_id_writer_mut(field),
                 );
             }
             if field_entry.has_fieldnorms() {
                 self.fieldnorms_writer
                     .record(doc_id, field, indexing_position.num_tokens);
             }
             */
             }
             FieldType::U64(_) => {
                 let mut num_vals = 0;
@@ -387,7 +374,6 @@ fn remap_and_write(
     ctx: IndexingContext,
     fast_field_writers: FastFieldsWriter,
     fieldnorms_writer: &FieldNormsWriter,
-    schema: &Schema,
     mut serializer: SegmentSerializer,
     doc_id_map: Option<&DocIdMapping>,
 ) -> crate::Result<()> {
@@ -399,12 +385,11 @@ fn remap_and_write(
         .segment()
         .open_read(SegmentComponent::FieldNorms)?;
     let fieldnorm_readers = FieldNormReaders::open(fieldnorm_data)?;
-    let term_ord_map = serialize_postings(
+    serialize_postings(
         ctx,
         per_field_postings_writers,
         fieldnorm_readers,
         doc_id_map,
-        schema,
         serializer.get_postings_serializer(),
     )?;
     debug!("fastfield-serialize");
@@ -5,9 +5,7 @@ use stacker::Addr;
 use crate::indexer::doc_id_mapping::DocIdMapping;
 use crate::postings::postings_writer::SpecializedPostingsWriter;
 use crate::postings::recorder::{BufferLender, DocIdRecorder, Recorder};
-use crate::postings::{
-    FieldSerializer, IndexingContext, IndexingPosition, PostingsWriter, UnorderedTermId,
-};
+use crate::postings::{FieldSerializer, IndexingContext, IndexingPosition, PostingsWriter};
 use crate::schema::term::as_json_path_type_value_bytes;
 use crate::schema::Type;
 use crate::tokenizer::TokenStream;
@@ -32,8 +30,8 @@ impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
         pos: u32,
         term: &crate::Term,
         ctx: &mut IndexingContext,
-    ) -> UnorderedTermId {
-        self.non_str_posting_writer.subscribe(doc, pos, term, ctx)
+    ) {
+        self.non_str_posting_writer.subscribe(doc, pos, term, ctx);
     }
 
     fn index_text(
@@ -56,13 +54,13 @@ impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
     /// The actual serialization format is handled by the `PostingsSerializer`.
     fn serialize(
         &self,
-        term_addrs: &[(Term<&[u8]>, Addr, UnorderedTermId)],
+        term_addrs: &[(Term<&[u8]>, Addr)],
         doc_id_map: Option<&DocIdMapping>,
         ctx: &IndexingContext,
         serializer: &mut FieldSerializer,
     ) -> io::Result<()> {
         let mut buffer_lender = BufferLender::default();
-        for (term, addr, _) in term_addrs {
+        for (term, addr) in term_addrs {
             // TODO optimization opportunity here.
             if let Some((_, typ, _)) = as_json_path_type_value_bytes(term.value_bytes()) {
                 if typ == Type::Str {
@@ -20,12 +20,10 @@ use crate::DocId;
 
 const POSITION_GAP: u32 = 1;
 
-fn make_field_partition(
-    term_offsets: &[(Term<&[u8]>, Addr, UnorderedTermId)],
-) -> Vec<(Field, Range<usize>)> {
+fn make_field_partition(term_offsets: &[(Term<&[u8]>, Addr)]) -> Vec<(Field, Range<usize>)> {
     let term_offsets_it = term_offsets
         .iter()
-        .map(|(term, _, _)| term.field())
+        .map(|(term, _)| term.field())
         .enumerate();
     let mut prev_field_opt = None;
     let mut fields = vec![];
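`make_field_partition` relies on `term_offsets` being sorted by field, so each field occupies one contiguous range. A standalone sketch of the same grouping over any sorted slice (generic key instead of tantivy's `Field`):

use std::ops::Range;

// Group a slice that is already sorted by key into (key, range) pairs,
// mirroring what make_field_partition does with (Term, Addr) entries.
fn partition_by_key<T, K: PartialEq + Copy>(
    items: &[T],
    key: impl Fn(&T) -> K,
) -> Vec<(K, Range<usize>)> {
    let mut partitions: Vec<(K, Range<usize>)> = Vec::new();
    for (idx, item) in items.iter().enumerate() {
        let k = key(item);
        match partitions.last_mut() {
            Some((prev_k, range)) if *prev_k == k => range.end = idx + 1,
            _ => partitions.push((k, idx..idx + 1)),
        }
    }
    partitions
}

fn main() {
    let terms = [(1u32, "a"), (1, "b"), (2, "a"), (2, "c"), (2, "d")];
    let parts = partition_by_key(&terms, |(field, _)| *field);
    assert_eq!(parts, vec![(1, 0..2), (2, 2..5)]);
}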
@@ -53,48 +51,18 @@ pub(crate) fn serialize_postings(
     per_field_postings_writers: &PerFieldPostingsWriter,
     fieldnorm_readers: FieldNormReaders,
     doc_id_map: Option<&DocIdMapping>,
-    schema: &Schema,
     serializer: &mut InvertedIndexSerializer,
-) -> crate::Result<HashMap<Field, FxHashMap<UnorderedTermId, TermOrdinal>>> {
-    let mut term_offsets: Vec<(Term<&[u8]>, Addr, UnorderedTermId)> =
-        Vec::with_capacity(ctx.term_index.len());
+) -> crate::Result<()> {
+    let mut term_offsets: Vec<(Term<&[u8]>, Addr)> = Vec::with_capacity(ctx.term_index.len());
     term_offsets.extend(
         ctx.term_index
             .iter()
-            .map(|(bytes, addr, unordered_id)| (Term::wrap(bytes), addr, unordered_id)),
+            .map(|(bytes, addr, _unordered_id)| (Term::wrap(bytes), addr)),
     );
-    term_offsets.sort_unstable_by_key(|(k, _, _)| k.clone());
-    let mut unordered_term_mappings: HashMap<Field, FxHashMap<UnorderedTermId, TermOrdinal>> =
-        HashMap::new();
+    term_offsets.sort_unstable_by_key(|(k, _)| k.clone());
 
     let field_offsets = make_field_partition(&term_offsets);
     for (field, byte_offsets) in field_offsets {
-        let field_entry = schema.get_field_entry(field);
-        match *field_entry.field_type() {
-            FieldType::Str(_) | FieldType::Facet(_) => {
-                // populating the (unordered term ord) -> (ordered term ord) mapping
-                // for the field.
-                let unordered_term_ids = term_offsets[byte_offsets.clone()]
-                    .iter()
-                    .map(|&(_, _, bucket)| bucket);
-                let mapping: FxHashMap<UnorderedTermId, TermOrdinal> = unordered_term_ids
-                    .enumerate()
-                    .map(|(term_ord, unord_term_id)| {
-                        (unord_term_id as UnorderedTermId, term_ord as TermOrdinal)
-                    })
-                    .collect();
-                unordered_term_mappings.insert(field, mapping);
-            }
-            FieldType::U64(_)
-            | FieldType::I64(_)
-            | FieldType::F64(_)
-            | FieldType::Date(_)
-            | FieldType::Bool(_) => {}
-            FieldType::Bytes(_) => {}
-            FieldType::JsonObject(_) => {}
-            FieldType::IpAddr(_) => {}
-        }
-
         let postings_writer = per_field_postings_writers.get_for_field(field);
         let fieldnorm_reader = fieldnorm_readers.get_field(field)?;
         let mut field_serializer =
@@ -107,7 +75,7 @@ pub(crate) fn serialize_postings(
         )?;
         field_serializer.close()?;
     }
-    Ok(unordered_term_mappings)
+    Ok(())
 }
 
 #[derive(Default)]
@@ -128,19 +96,13 @@ pub(crate) trait PostingsWriter: Send + Sync {
     /// * term - the term
     /// * ctx - Contains a term hashmap and a memory arena to store all necessary posting list
     ///   information.
-    fn subscribe(
-        &mut self,
-        doc: DocId,
-        pos: u32,
-        term: &Term,
-        ctx: &mut IndexingContext,
-    ) -> UnorderedTermId; // TODO remove UnorderedTermId
+    fn subscribe(&mut self, doc: DocId, pos: u32, term: &Term, ctx: &mut IndexingContext);
 
     /// Serializes the postings on disk.
     /// The actual serialization format is handled by the `PostingsSerializer`.
     fn serialize(
         &self,
-        term_addrs: &[(Term<&[u8]>, Addr, UnorderedTermId)],
+        term_addrs: &[(Term<&[u8]>, Addr)],
         doc_id_map: Option<&DocIdMapping>,
         ctx: &IndexingContext,
         serializer: &mut FieldSerializer,
@@ -221,13 +183,7 @@ impl<Rec: Recorder> SpecializedPostingsWriter<Rec> {
 }
 
 impl<Rec: Recorder> PostingsWriter for SpecializedPostingsWriter<Rec> {
-    fn subscribe(
-        &mut self,
-        doc: DocId,
-        position: u32,
-        term: &Term,
-        ctx: &mut IndexingContext,
-    ) -> UnorderedTermId {
+    fn subscribe(&mut self, doc: DocId, position: u32, term: &Term, ctx: &mut IndexingContext) {
         debug_assert!(term.as_slice().len() >= 4);
         self.total_num_tokens += 1;
         let (term_index, arena) = (&mut ctx.term_index, &mut ctx.arena);
@@ -246,18 +202,18 @@ impl<Rec: Recorder> PostingsWriter for SpecializedPostingsWriter<Rec> {
                 recorder.record_position(position, arena);
                 recorder
             }
-        }) as UnorderedTermId
+        });
     }
 
     fn serialize(
         &self,
-        term_addrs: &[(Term<&[u8]>, Addr, UnorderedTermId)],
+        term_addrs: &[(Term<&[u8]>, Addr)],
         doc_id_map: Option<&DocIdMapping>,
         ctx: &IndexingContext,
         serializer: &mut FieldSerializer,
     ) -> io::Result<()> {
         let mut buffer_lender = BufferLender::default();
-        for (term, addr, _) in term_addrs {
+        for (term, addr) in term_addrs {
             Self::serialize_one_term(term, *addr, doc_id_map, &mut buffer_lender, ctx, serializer)?;
         }
         Ok(())
@@ -5,7 +5,9 @@ use serde::{Deserialize, Serialize};
 use crate::schema::flags::{FastFlag, IndexedFlag, SchemaFlagList, StoredFlag};
 
 /// DateTime Precision
-#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Default)]
+#[derive(
+    Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Default,
+)]
 #[serde(rename_all = "lowercase")]
 pub enum DatePrecision {
     /// Seconds precision