Merge pull request #2673 from paradedb/stuhood.fix-order-by-dup-string

Fix `TopDocs::order_by_string_fast_field` for duplicates
Authored by trinity-1686a on 2025-07-30 18:25:03 +02:00; committed by GitHub.
2 changed files with 104 additions and 18 deletions
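For context on the failure: when several documents share a string fast-field value, the top-docs collector can hand the term dictionary a sorted stream of term ordinals that contains repeats, and the old `sorted_ords_to_term_cb` (second diff below) rejected repeats via its `assert!(ord >= current_ordinal)`. Below is a minimal sketch of the user-facing API, assuming the fork's `order_by_string_fast_field` collector method as exercised by the new property test; the schema and documents are illustrative:

```rust
use tantivy::collector::TopDocs;
use tantivy::query::AllQuery;
use tantivy::schema::{Schema, FAST, TEXT};
use tantivy::{doc, Index, Order};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    let city = schema_builder.add_text_field("city", TEXT | FAST);
    let index = Index::create_in_ram(schema_builder.build());

    let mut writer = index.writer(15_000_000)?;
    // Two documents share the term "tokyo", so both resolve to the same term
    // ordinal in the segment's dictionary -- the duplicate case this PR fixes.
    writer.add_document(doc!(city => "osaka"))?;
    writer.add_document(doc!(city => "tokyo"))?;
    writer.add_document(doc!(city => "tokyo"))?;
    writer.commit()?;

    let searcher = index.reader()?.searcher();
    // Yields (String, DocAddress) pairs ordered by the "city" fast field.
    let hits = searcher.search(
        &AllQuery,
        &TopDocs::with_limit(3).order_by_string_fast_field("city", Order::Asc),
    )?;
    for (term, addr) in hits {
        println!("{term}: {addr:?}");
    }
    Ok(())
}
```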


@@ -1529,6 +1529,72 @@ mod tests {
         Ok(())
     }
 
+    proptest! {
+        #[test]
+        fn test_top_field_collect_string_prop(
+            order in prop_oneof!(Just(Order::Desc), Just(Order::Asc)),
+            limit in 1..256_usize,
+            offset in 0..256_usize,
+            segments_terms in
+                proptest::collection::vec(
+                    proptest::collection::vec(0..32_u8, 1..32_usize),
+                    0..8_usize,
+                )
+        ) {
+            let mut schema_builder = Schema::builder();
+            let city = schema_builder.add_text_field("city", TEXT | FAST);
+            let schema = schema_builder.build();
+            let index = Index::create_in_ram(schema);
+            let mut index_writer = index.writer_for_tests()?;
+
+            // A Vec<Vec<u8>>, where the outer Vec represents segments, and the inner
+            // Vec represents terms.
+            for segment_terms in segments_terms.into_iter() {
+                for term in segment_terms.into_iter() {
+                    let term = format!("{term:0>3}");
+                    index_writer.add_document(doc!(
+                        city => term,
+                    ))?;
+                }
+                index_writer.commit()?;
+            }
+
+            let searcher = index.reader()?.searcher();
+            let top_n_results = searcher.search(
+                &AllQuery,
+                &TopDocs::with_limit(limit)
+                    .and_offset(offset)
+                    .order_by_string_fast_field("city", order.clone()),
+            )?;
+            let all_results = searcher
+                .search(&AllQuery, &DocSetCollector)?
+                .into_iter()
+                .map(|doc_address| {
+                    // Get the term for this address.
+                    // NOTE: We can't determine the SegmentIds that will be generated for
+                    // Segments ahead of time, so we can't pre-compute the expected
+                    // `DocAddress`es.
+                    let column = searcher.segment_readers()[doc_address.segment_ord as usize]
+                        .fast_fields()
+                        .str("city")
+                        .unwrap()
+                        .unwrap();
+                    let term_ord = column.term_ords(doc_address.doc_id).next().unwrap();
+                    let mut city = Vec::new();
+                    column.dictionary().ord_to_term(term_ord, &mut city).unwrap();
+                    (String::try_from(city).unwrap(), doc_address)
+                });
+
+            // Using the TopDocs collector should always be equivalent to sorting,
+            // skipping the offset, and then taking the limit.
+            let sorted_docs: Vec<_> = if order.is_desc() {
+                let mut comparable_docs: Vec<ComparableDoc<_, _, true>> = all_results
+                    .map(|(feature, doc)| ComparableDoc { feature, doc })
+                    .collect();
+                comparable_docs.sort();
+                comparable_docs.into_iter().map(|cd| (cd.feature, cd.doc)).collect()
+            } else {
+                let mut comparable_docs: Vec<ComparableDoc<_, _, false>> = all_results
+                    .map(|(feature, doc)| ComparableDoc { feature, doc })
+                    .collect();
+                comparable_docs.sort();
+                comparable_docs.into_iter().map(|cd| (cd.feature, cd.doc)).collect()
+            };
+            let expected_docs = sorted_docs.into_iter().skip(offset).take(limit).collect::<Vec<_>>();
+            prop_assert_eq!(expected_docs, top_n_results);
+        }
+    }
+
     #[test]
     #[should_panic]
     fn test_field_does_not_exist() {
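Two generator choices make this test effective at catching the duplicate bug: `format!("{term:0>3}")` zero-pads each `u8` to three digits so that lexicographic order on the `city` strings matches numeric order, and drawing values from the narrow `0..32` range across up to seven segments makes repeated terms, and hence repeated term ordinals, very likely.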


@@ -507,38 +507,58 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
     /// Returns true if and only if all terms have been found.
     pub fn sorted_ords_to_term_cb<F: FnMut(&[u8]) -> io::Result<()>>(
         &self,
-        ord: impl Iterator<Item = TermOrdinal>,
+        mut ords: impl Iterator<Item = TermOrdinal>,
         mut cb: F,
     ) -> io::Result<bool> {
+        let Some(mut ord) = ords.next() else {
+            return Ok(true);
+        };
+        // Open the block for the first ordinal.
         let mut bytes = Vec::new();
-        let mut current_block_addr = self.sstable_index.get_block_with_ord(0);
+        let mut current_block_addr = self.sstable_index.get_block_with_ord(ord);
         let mut current_sstable_delta_reader =
             self.sstable_delta_reader_block(current_block_addr.clone())?;
-        let mut current_ordinal = 0;
-        for ord in ord {
-            assert!(ord >= current_ordinal);
-            // check if block changed for new term_ord
-            let new_block_addr = self.sstable_index.get_block_with_ord(ord);
-            if new_block_addr != current_block_addr {
-                current_block_addr = new_block_addr;
-                current_ordinal = current_block_addr.first_ordinal;
-                current_sstable_delta_reader =
-                    self.sstable_delta_reader_block(current_block_addr.clone())?;
-                bytes.clear();
-            }
-            // move to ord inside that block
-            for _ in current_ordinal..=ord {
+        let mut current_block_ordinal = current_block_addr.first_ordinal;
+        loop {
+            // move to the ord inside the current block
+            while current_block_ordinal <= ord {
                 if !current_sstable_delta_reader.advance()? {
                     return Ok(false);
                 }
                 bytes.truncate(current_sstable_delta_reader.common_prefix_len());
                 bytes.extend_from_slice(current_sstable_delta_reader.suffix());
+                current_block_ordinal += 1;
             }
-            current_ordinal = ord + 1;
             cb(&bytes)?;
+
+            // fetch the next ordinal
+            let Some(next_ord) = ords.next() else {
+                return Ok(true);
+            };
+            // Seek forward only if the new ord differs from the one we just
+            // processed. This allows the input TermOrdinal iterator to contain
+            // duplicates, as long as it is still sorted.
+            if next_ord < ord {
+                panic!("Ordinals were not sorted: received {next_ord} after {ord}");
+            } else if next_ord > ord {
+                // check if the block changed for the new term_ord
+                let new_block_addr = self.sstable_index.get_block_with_ord(next_ord);
+                if new_block_addr != current_block_addr {
+                    current_block_addr = new_block_addr;
+                    current_block_ordinal = current_block_addr.first_ordinal;
+                    current_sstable_delta_reader =
+                        self.sstable_delta_reader_block(current_block_addr.clone())?;
+                    bytes.clear();
+                }
+                ord = next_ord;
+            } else {
+                // The next ord equals the previous ord: no need to seek or advance.
+            }
         }
-        Ok(true)
     }
 
     /// Returns the number of terms in the dictionary.
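For downstream callers, the contract after this change is: the ordinal stream must be sorted but may now contain duplicates, the callback fires once per input ordinal, and an unsorted stream panics with an explicit message instead of failing the old `assert!`. A minimal caller sketch, assuming `Dictionary`, `SSTable`, and `TermOrdinal` are the sstable crate's public exports (`resolve_ords` is a hypothetical helper, not part of this PR):

```rust
use std::io;

use tantivy_sstable::{Dictionary, SSTable, TermOrdinal};

/// Resolves a sorted run of term ordinals (duplicates allowed) back to terms.
fn resolve_ords<T: SSTable>(
    dict: &Dictionary<T>,
    sorted_ords: &[TermOrdinal],
) -> io::Result<Vec<Vec<u8>>> {
    let mut terms = Vec::with_capacity(sorted_ords.len());
    // With sorted_ords = [2, 2, 5], the callback now fires three times, with the
    // term for ordinal 2 repeated; previously the duplicate tripped the
    // `assert!(ord >= current_ordinal)` in the old implementation.
    let all_found = dict.sorted_ords_to_term_cb(sorted_ords.iter().copied(), |term| {
        terms.push(term.to_vec());
        Ok(())
    })?;
    assert!(all_found, "an ordinal was past the end of the dictionary");
    Ok(terms)
}
```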