mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-09 10:32:55 +00:00
Compare commits
1 Commits
index_writ
...
tracing-ta
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3a8a83da80 |
4
.github/workflows/coverage.yml
vendored
4
.github/workflows/coverage.yml
vendored
@@ -17,11 +17,11 @@ jobs:
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- name: Install Rust
|
||||
run: rustup toolchain install nightly-2023-09-10 --profile minimal --component llvm-tools-preview
|
||||
run: rustup toolchain install nightly --profile minimal --component llvm-tools-preview
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
- uses: taiki-e/install-action@cargo-llvm-cov
|
||||
- name: Generate code coverage
|
||||
run: cargo +nightly-2023-09-10 llvm-cov --all-features --workspace --doctests --lcov --output-path lcov.info
|
||||
run: cargo +nightly llvm-cov --all-features --workspace --doctests --lcov --output-path lcov.info
|
||||
- name: Upload coverage to Codecov
|
||||
uses: codecov/codecov-action@v3
|
||||
continue-on-error: true
|
||||
|
||||
@@ -19,6 +19,7 @@ oneshot = "0.1.5"
|
||||
base64 = "0.21.0"
|
||||
byteorder = "1.4.3"
|
||||
crc32fast = "1.3.2"
|
||||
tracing = "0.1"
|
||||
once_cell = "1.10.0"
|
||||
regex = { version = "1.5.5", default-features = false, features = ["std", "unicode"] }
|
||||
aho-corasick = "1.0"
|
||||
|
||||
@@ -134,142 +134,3 @@ impl Drop for ResourceLimitGuard {
|
||||
.fetch_sub(self.allocated_with_the_guard, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::aggregation::tests::exec_request_with_query;
|
||||
|
||||
// https://github.com/quickwit-oss/quickwit/issues/3837
|
||||
#[test]
|
||||
fn test_agg_limits_with_empty_merge() {
|
||||
use crate::aggregation::agg_req::Aggregations;
|
||||
use crate::aggregation::bucket::tests::get_test_index_from_docs;
|
||||
|
||||
let docs = vec![
|
||||
vec![r#"{ "date": "2015-01-02T00:00:00Z", "text": "bbb", "text2": "bbb" }"#],
|
||||
vec![r#"{ "text": "aaa", "text2": "bbb" }"#],
|
||||
];
|
||||
let index = get_test_index_from_docs(false, &docs).unwrap();
|
||||
|
||||
{
|
||||
let elasticsearch_compatible_json = json!(
|
||||
{
|
||||
"1": {
|
||||
"terms": {"field": "text2", "min_doc_count": 0},
|
||||
"aggs": {
|
||||
"2":{
|
||||
"date_histogram": {
|
||||
"field": "date",
|
||||
"fixed_interval": "1d",
|
||||
"extended_bounds": {
|
||||
"min": "2015-01-01T00:00:00Z",
|
||||
"max": "2015-01-10T00:00:00Z"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
let agg_req: Aggregations = serde_json::from_str(
|
||||
&serde_json::to_string(&elasticsearch_compatible_json).unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
let res = exec_request_with_query(agg_req, &index, Some(("text", "bbb"))).unwrap();
|
||||
let expected_res = json!({
|
||||
"1": {
|
||||
"buckets": [
|
||||
{
|
||||
"2": {
|
||||
"buckets": [
|
||||
{ "doc_count": 0, "key": 1420070400000.0, "key_as_string": "2015-01-01T00:00:00Z" },
|
||||
{ "doc_count": 1, "key": 1420156800000.0, "key_as_string": "2015-01-02T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420243200000.0, "key_as_string": "2015-01-03T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420329600000.0, "key_as_string": "2015-01-04T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420416000000.0, "key_as_string": "2015-01-05T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420502400000.0, "key_as_string": "2015-01-06T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420588800000.0, "key_as_string": "2015-01-07T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420675200000.0, "key_as_string": "2015-01-08T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420761600000.0, "key_as_string": "2015-01-09T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420848000000.0, "key_as_string": "2015-01-10T00:00:00Z" }
|
||||
]
|
||||
},
|
||||
"doc_count": 1,
|
||||
"key": "bbb"
|
||||
}
|
||||
],
|
||||
"doc_count_error_upper_bound": 0,
|
||||
"sum_other_doc_count": 0
|
||||
}
|
||||
});
|
||||
assert_eq!(res, expected_res);
|
||||
}
|
||||
}
|
||||
|
||||
// https://github.com/quickwit-oss/quickwit/issues/3837
|
||||
#[test]
|
||||
fn test_agg_limits_with_empty_data() {
|
||||
use crate::aggregation::agg_req::Aggregations;
|
||||
use crate::aggregation::bucket::tests::get_test_index_from_docs;
|
||||
|
||||
let docs = vec![vec![r#"{ "text": "aaa", "text2": "bbb" }"#]];
|
||||
let index = get_test_index_from_docs(false, &docs).unwrap();
|
||||
|
||||
{
|
||||
// Empty result since there is no doc with dates
|
||||
let elasticsearch_compatible_json = json!(
|
||||
{
|
||||
"1": {
|
||||
"terms": {"field": "text2", "min_doc_count": 0},
|
||||
"aggs": {
|
||||
"2":{
|
||||
"date_histogram": {
|
||||
"field": "date",
|
||||
"fixed_interval": "1d",
|
||||
"extended_bounds": {
|
||||
"min": "2015-01-01T00:00:00Z",
|
||||
"max": "2015-01-10T00:00:00Z"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
let agg_req: Aggregations = serde_json::from_str(
|
||||
&serde_json::to_string(&elasticsearch_compatible_json).unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
let res = exec_request_with_query(agg_req, &index, Some(("text", "bbb"))).unwrap();
|
||||
let expected_res = json!({
|
||||
"1": {
|
||||
"buckets": [
|
||||
{
|
||||
"2": {
|
||||
"buckets": [
|
||||
{ "doc_count": 0, "key": 1420070400000.0, "key_as_string": "2015-01-01T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420156800000.0, "key_as_string": "2015-01-02T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420243200000.0, "key_as_string": "2015-01-03T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420329600000.0, "key_as_string": "2015-01-04T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420416000000.0, "key_as_string": "2015-01-05T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420502400000.0, "key_as_string": "2015-01-06T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420588800000.0, "key_as_string": "2015-01-07T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420675200000.0, "key_as_string": "2015-01-08T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420761600000.0, "key_as_string": "2015-01-09T00:00:00Z" },
|
||||
{ "doc_count": 0, "key": 1420848000000.0, "key_as_string": "2015-01-10T00:00:00Z" }
|
||||
]
|
||||
},
|
||||
"doc_count": 0,
|
||||
"key": "bbb"
|
||||
}
|
||||
],
|
||||
"doc_count_error_upper_bound": 0,
|
||||
"sum_other_doc_count": 0
|
||||
}
|
||||
});
|
||||
assert_eq!(res, expected_res);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -103,8 +103,7 @@ impl AggregationWithAccessor {
|
||||
field: field_name, ..
|
||||
}) => {
|
||||
let (accessor, column_type) =
|
||||
// Only DateTime is supported for DateHistogram
|
||||
get_ff_reader(reader, field_name, Some(&[ColumnType::DateTime]))?;
|
||||
get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?;
|
||||
add_agg_with_accessor(accessor, column_type, &mut res)?;
|
||||
}
|
||||
Terms(TermsAggregation {
|
||||
|
||||
@@ -132,7 +132,6 @@ impl DateHistogramAggregationReq {
|
||||
hard_bounds: self.hard_bounds,
|
||||
extended_bounds: self.extended_bounds,
|
||||
keyed: self.keyed,
|
||||
is_normalized_to_ns: false,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -244,14 +243,14 @@ fn parse_into_milliseconds(input: &str) -> Result<i64, AggregationError> {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod tests {
|
||||
mod tests {
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
use super::*;
|
||||
use crate::aggregation::agg_req::Aggregations;
|
||||
use crate::aggregation::tests::exec_request;
|
||||
use crate::indexer::NoMergePolicy;
|
||||
use crate::schema::{Schema, FAST, STRING};
|
||||
use crate::schema::{Schema, FAST};
|
||||
use crate::Index;
|
||||
|
||||
#[test]
|
||||
@@ -307,8 +306,7 @@ pub mod tests {
|
||||
) -> crate::Result<Index> {
|
||||
let mut schema_builder = Schema::builder();
|
||||
schema_builder.add_date_field("date", FAST);
|
||||
schema_builder.add_text_field("text", FAST | STRING);
|
||||
schema_builder.add_text_field("text2", FAST | STRING);
|
||||
schema_builder.add_text_field("text", FAST);
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema.clone());
|
||||
{
|
||||
|
||||
@@ -122,14 +122,11 @@ pub struct HistogramAggregation {
|
||||
/// Whether to return the buckets as a hash map
|
||||
#[serde(default)]
|
||||
pub keyed: bool,
|
||||
/// Whether the values are normalized to ns for date time values. Defaults to false.
|
||||
#[serde(default)]
|
||||
pub is_normalized_to_ns: bool,
|
||||
}
|
||||
|
||||
impl HistogramAggregation {
|
||||
pub(crate) fn normalize_date_time(&mut self) {
|
||||
if !self.is_normalized_to_ns {
|
||||
pub(crate) fn normalize(&mut self, column_type: ColumnType) {
|
||||
if column_type.is_date_time() {
|
||||
// values are provided in ms, but the fastfield is in nano seconds
|
||||
self.interval *= 1_000_000.0;
|
||||
self.offset = self.offset.map(|off| off * 1_000_000.0);
|
||||
@@ -141,7 +138,6 @@ impl HistogramAggregation {
|
||||
min: bounds.min * 1_000_000.0,
|
||||
max: bounds.max * 1_000_000.0,
|
||||
});
|
||||
self.is_normalized_to_ns = true;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -374,7 +370,7 @@ impl SegmentHistogramCollector {
|
||||
|
||||
Ok(IntermediateBucketResult::Histogram {
|
||||
buckets,
|
||||
is_date_agg: self.column_type == ColumnType::DateTime,
|
||||
column_type: Some(self.column_type),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -385,9 +381,7 @@ impl SegmentHistogramCollector {
|
||||
accessor_idx: usize,
|
||||
) -> crate::Result<Self> {
|
||||
req.validate()?;
|
||||
if field_type == ColumnType::DateTime {
|
||||
req.normalize_date_time();
|
||||
}
|
||||
req.normalize(field_type);
|
||||
|
||||
let sub_aggregation_blueprint = if sub_aggregation.is_empty() {
|
||||
None
|
||||
@@ -445,7 +439,6 @@ fn intermediate_buckets_to_final_buckets_fill_gaps(
|
||||
// memory check upfront
|
||||
let (_, first_bucket_num, last_bucket_num) =
|
||||
generate_bucket_pos_with_opt_minmax(histogram_req, min_max);
|
||||
|
||||
// It's based on user input, so we need to account for overflows
|
||||
let added_buckets = ((last_bucket_num.saturating_sub(first_bucket_num)).max(0) as u64)
|
||||
.saturating_sub(buckets.len() as u64);
|
||||
@@ -489,7 +482,7 @@ fn intermediate_buckets_to_final_buckets_fill_gaps(
|
||||
// Convert to BucketEntry
|
||||
pub(crate) fn intermediate_histogram_buckets_to_final_buckets(
|
||||
buckets: Vec<IntermediateHistogramBucketEntry>,
|
||||
is_date_agg: bool,
|
||||
column_type: Option<ColumnType>,
|
||||
histogram_req: &HistogramAggregation,
|
||||
sub_aggregation: &Aggregations,
|
||||
limits: &AggregationLimits,
|
||||
@@ -498,8 +491,8 @@ pub(crate) fn intermediate_histogram_buckets_to_final_buckets(
|
||||
// The request used in the the call to final is not yet be normalized.
|
||||
// Normalization is changing the precision from milliseconds to nanoseconds.
|
||||
let mut histogram_req = histogram_req.clone();
|
||||
if is_date_agg {
|
||||
histogram_req.normalize_date_time();
|
||||
if let Some(column_type) = column_type {
|
||||
histogram_req.normalize(column_type);
|
||||
}
|
||||
let mut buckets = if histogram_req.min_doc_count() == 0 {
|
||||
// With min_doc_count != 0, we may need to add buckets, so that there are no
|
||||
@@ -523,7 +516,7 @@ pub(crate) fn intermediate_histogram_buckets_to_final_buckets(
|
||||
|
||||
// If we have a date type on the histogram buckets, we add the `key_as_string` field as rfc339
|
||||
// and normalize from nanoseconds to milliseconds
|
||||
if is_date_agg {
|
||||
if column_type == Some(ColumnType::DateTime) {
|
||||
for bucket in buckets.iter_mut() {
|
||||
if let crate::aggregation::Key::F64(ref mut val) = bucket.key {
|
||||
let key_as_string = format_date(*val as i64)?;
|
||||
|
||||
@@ -172,16 +172,10 @@ pub(crate) fn empty_from_req(req: &Aggregation) -> IntermediateAggregationResult
|
||||
Range(_) => IntermediateAggregationResult::Bucket(IntermediateBucketResult::Range(
|
||||
Default::default(),
|
||||
)),
|
||||
Histogram(_) => {
|
||||
Histogram(_) | DateHistogram(_) => {
|
||||
IntermediateAggregationResult::Bucket(IntermediateBucketResult::Histogram {
|
||||
buckets: Vec::new(),
|
||||
is_date_agg: false,
|
||||
})
|
||||
}
|
||||
DateHistogram(_) => {
|
||||
IntermediateAggregationResult::Bucket(IntermediateBucketResult::Histogram {
|
||||
buckets: Vec::new(),
|
||||
is_date_agg: true,
|
||||
column_type: None,
|
||||
})
|
||||
}
|
||||
Average(_) => IntermediateAggregationResult::Metric(IntermediateMetricResult::Average(
|
||||
@@ -349,8 +343,8 @@ pub enum IntermediateBucketResult {
|
||||
/// This is the histogram entry for a bucket, which contains a key, count, and optionally
|
||||
/// sub_aggregations.
|
||||
Histogram {
|
||||
/// The column_type of the underlying `Column` is DateTime
|
||||
is_date_agg: bool,
|
||||
/// The column_type of the underlying `Column`
|
||||
column_type: Option<ColumnType>,
|
||||
/// The buckets
|
||||
buckets: Vec<IntermediateHistogramBucketEntry>,
|
||||
},
|
||||
@@ -405,7 +399,7 @@ impl IntermediateBucketResult {
|
||||
Ok(BucketResult::Range { buckets })
|
||||
}
|
||||
IntermediateBucketResult::Histogram {
|
||||
is_date_agg,
|
||||
column_type,
|
||||
buckets,
|
||||
} => {
|
||||
let histogram_req = &req
|
||||
@@ -414,7 +408,7 @@ impl IntermediateBucketResult {
|
||||
.expect("unexpected aggregation, expected histogram aggregation");
|
||||
let buckets = intermediate_histogram_buckets_to_final_buckets(
|
||||
buckets,
|
||||
is_date_agg,
|
||||
column_type,
|
||||
histogram_req,
|
||||
req.sub_aggregation(),
|
||||
limits,
|
||||
@@ -463,11 +457,11 @@ impl IntermediateBucketResult {
|
||||
(
|
||||
IntermediateBucketResult::Histogram {
|
||||
buckets: buckets_left,
|
||||
is_date_agg: _,
|
||||
..
|
||||
},
|
||||
IntermediateBucketResult::Histogram {
|
||||
buckets: buckets_right,
|
||||
is_date_agg: _,
|
||||
..
|
||||
},
|
||||
) => {
|
||||
let buckets: Result<Vec<IntermediateHistogramBucketEntry>, TantivyError> =
|
||||
|
||||
@@ -620,7 +620,7 @@ impl IndexWriter {
|
||||
for worker_handle in former_workers_join_handle {
|
||||
let indexing_worker_result = worker_handle
|
||||
.join()
|
||||
.map_err(|e| TantivyError::ErrorInThread(e.to_string()))?;
|
||||
.map_err(|e| TantivyError::ErrorInThread(format!("{e:?}")))?;
|
||||
indexing_worker_result?;
|
||||
self.add_indexing_worker()?;
|
||||
}
|
||||
|
||||
@@ -13,7 +13,8 @@ description = "sstables for tantivy"
|
||||
common = {version= "0.6", path="../common", package="tantivy-common"}
|
||||
tantivy-fst = "0.4"
|
||||
# experimental gives us access to Decompressor::upper_bound
|
||||
zstd = { version = "0.12", features = ["experimental"] }
|
||||
zstd = { version = "0.13", features = ["experimental"] }
|
||||
tracing = "0.1"
|
||||
|
||||
[dev-dependencies]
|
||||
proptest = "1"
|
||||
|
||||
@@ -3,6 +3,7 @@ use std::io;
|
||||
use std::marker::PhantomData;
|
||||
use std::ops::{Bound, RangeBounds};
|
||||
use std::sync::Arc;
|
||||
use tracing::instrument;
|
||||
|
||||
use common::file_slice::FileSlice;
|
||||
use common::{BinarySerializable, OwnedBytes};
|
||||
@@ -94,6 +95,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
Ok(TSSTable::delta_reader(data))
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub(crate) async fn sstable_delta_reader_block_async(
|
||||
&self,
|
||||
block_addr: BlockAddr,
|
||||
@@ -232,6 +234,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
/// If the key was not found, returns Ok(None).
|
||||
/// After calling this function, it is possible to call `DeltaReader::value` to get the
|
||||
/// associated value.
|
||||
#[instrument(skip_all)]
|
||||
fn decode_up_to_key<K: AsRef<[u8]>>(
|
||||
&self,
|
||||
key: K,
|
||||
@@ -344,6 +347,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
}
|
||||
|
||||
/// Lookups the value corresponding to the key.
|
||||
#[instrument(skip_all)]
|
||||
pub async fn get_async<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TSSTable::Value>> {
|
||||
if let Some(block_addr) = self.sstable_index.get_block_with_key(key.as_ref()) {
|
||||
let sstable_reader = self.sstable_delta_reader_block_async(block_addr).await?;
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use std::io::{self, Write};
|
||||
use std::ops::Range;
|
||||
|
||||
use tracing::instrument;
|
||||
use common::OwnedBytes;
|
||||
|
||||
use crate::{common_prefix_len, SSTable, SSTableDataCorruption, TermOrdinal};
|
||||
@@ -27,6 +28,7 @@ impl SSTableIndex {
|
||||
}
|
||||
|
||||
/// Get the [`BlockAddr`] of the requested block.
|
||||
#[instrument]
|
||||
pub(crate) fn get_block(&self, block_id: usize) -> Option<BlockAddr> {
|
||||
self.blocks
|
||||
.get(block_id)
|
||||
@@ -56,6 +58,7 @@ impl SSTableIndex {
|
||||
/// Get the [`BlockAddr`] of the block that would contain `key`.
|
||||
///
|
||||
/// Returns None if `key` is lexicographically after the last key recorded.
|
||||
#[instrument]
|
||||
pub fn get_block_with_key(&self, key: &[u8]) -> Option<BlockAddr> {
|
||||
self.locate_with_key(key).and_then(|id| self.get_block(id))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user