Fixes race condition in Searcher (#1464)

Fixes a race condition in Searcher, by avoiding repeated calls to open_segment_readers and passing them instead as argument

Closes #1461
This commit is contained in:
PSeitz
2022-08-24 05:17:37 -07:00
committed by GitHub
parent 535f1a5d83
commit bb01e99e05
2 changed files with 24 additions and 17 deletions

View File

@@ -247,6 +247,14 @@ impl SearcherInner {
generation: TrackedObject<SearcherGeneration>,
doc_store_cache_size: usize,
) -> io::Result<SearcherInner> {
assert_eq!(
&segment_readers
.iter()
.map(|reader| (reader.segment_id(), reader.delete_opstamp()))
.collect::<BTreeMap<_, _>>(),
generation.segments(),
"Set of segments referenced by this Searcher and its SearcherGeneration must match"
);
let store_readers: Vec<StoreReader> = segment_readers
.iter()
.map(|segment_reader| segment_reader.get_store_reader(doc_store_cache_size))

View File

@@ -164,21 +164,18 @@ impl InnerIndexReader {
doc_store_cache_size: usize,
index: Index,
warming_state: WarmingState,
// The searcher_generation_inventory is not used as source, but as target to track the
// loaded segments.
searcher_generation_inventory: Inventory<SearcherGeneration>,
) -> crate::Result<Self> {
let searcher_generation_counter: Arc<AtomicU64> = Default::default();
let segment_readers = Self::open_segment_readers(&index)?;
let searcher_generation = Self::create_new_searcher_generation(
&segment_readers,
&searcher_generation_counter,
&searcher_generation_inventory,
);
let searcher = Self::create_searcher(
&index,
doc_store_cache_size,
&warming_state,
searcher_generation,
&searcher_generation_counter,
&searcher_generation_inventory,
)?;
Ok(InnerIndexReader {
doc_store_cache_size,
@@ -204,12 +201,12 @@ impl InnerIndexReader {
Ok(segment_readers)
}
fn create_new_searcher_generation(
fn track_segment_readers_in_inventory(
segment_readers: &[SegmentReader],
searcher_generation_counter: &Arc<AtomicU64>,
searcher_generation_inventory: &Inventory<SearcherGeneration>,
) -> TrackedObject<SearcherGeneration> {
let generation_id = searcher_generation_counter.fetch_add(1, atomic::Ordering::Relaxed);
let generation_id = searcher_generation_counter.fetch_add(1, atomic::Ordering::AcqRel);
let searcher_generation =
SearcherGeneration::from_segment_readers(segment_readers, generation_id);
searcher_generation_inventory.track(searcher_generation)
@@ -219,9 +216,16 @@ impl InnerIndexReader {
index: &Index,
doc_store_cache_size: usize,
warming_state: &WarmingState,
searcher_generation: TrackedObject<SearcherGeneration>,
searcher_generation_counter: &Arc<AtomicU64>,
searcher_generation_inventory: &Inventory<SearcherGeneration>,
) -> crate::Result<Arc<SearcherInner>> {
let segment_readers = Self::open_segment_readers(index)?;
let searcher_generation = Self::track_segment_readers_in_inventory(
&segment_readers,
searcher_generation_counter,
searcher_generation_inventory,
);
let schema = index.schema();
let searcher = Arc::new(SearcherInner::new(
schema,
@@ -236,17 +240,12 @@ impl InnerIndexReader {
}
fn reload(&self) -> crate::Result<()> {
let segment_readers = Self::open_segment_readers(&self.index)?;
let searcher_generation = Self::create_new_searcher_generation(
&segment_readers,
&self.searcher_generation_counter,
&self.searcher_generation_inventory,
);
let searcher = Self::create_searcher(
&self.index,
self.doc_store_cache_size,
&self.warming_state,
searcher_generation,
&self.searcher_generation_counter,
&self.searcher_generation_inventory,
)?;
self.searcher.store(searcher);