mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-06-03 09:00:42 +00:00
Working but dirty...
This commit is contained in:
@@ -26,7 +26,6 @@ pub struct IndexWriter {
|
||||
segment_ready_receiver: chan::Receiver<Result<(SegmentId, usize)>>,
|
||||
document_receiver: chan::Receiver<Document>,
|
||||
document_sender: chan::Sender<Document>,
|
||||
target_num_docs: usize,
|
||||
num_threads: usize,
|
||||
docstamp: u64,
|
||||
|
||||
@@ -42,6 +41,11 @@ fn index_documents(block_store: &mut BlockStore,
|
||||
let mut segment_writer = try!(SegmentWriter::for_segment(block_store, segment, &schema));
|
||||
for doc in document_iterator {
|
||||
try!(segment_writer.add_document(&doc, &schema));
|
||||
if segment_writer.is_buffer_full() {
|
||||
println!("no more space committing.");
|
||||
println!("seg max doc {}", segment_writer.max_doc());
|
||||
break;
|
||||
}
|
||||
}
|
||||
let num_docs = segment_writer.max_doc() as usize;
|
||||
try!(segment_writer.finalize());
|
||||
@@ -55,25 +59,19 @@ impl IndexWriter {
|
||||
/// Spawns a new worker thread for indexing.
|
||||
/// The thread consumes documents from the pipeline.
|
||||
///
|
||||
/// When target_num_docs is reached, or when the channel
|
||||
/// is closed, the worker flushes its current segment to disc,
|
||||
/// and sends its segment_id through the channel.
|
||||
///
|
||||
fn add_indexing_worker(&mut self,) -> Result<()> {
|
||||
let index = self.index.clone();
|
||||
let schema = self.index.schema();
|
||||
let segment_ready_sender_clone = self.segment_ready_sender.clone();
|
||||
let document_receiver_clone = self.document_receiver.clone();
|
||||
let target_num_docs = self.target_num_docs;
|
||||
let join_handle: JoinHandle<()> = thread::spawn(move || {
|
||||
let mut block_store = BlockStore::allocate(500_000);
|
||||
let mut block_store = BlockStore::allocate(1_000_000);
|
||||
loop {
|
||||
let segment = index.new_segment();
|
||||
let segment_id = segment.id();
|
||||
let mut document_iterator = document_receiver_clone
|
||||
.clone()
|
||||
.into_iter()
|
||||
.take(target_num_docs)
|
||||
.peekable();
|
||||
// the peeking here is to avoid
|
||||
// creating a new segment's files
|
||||
@@ -105,7 +103,6 @@ impl IndexWriter {
|
||||
segment_ready_sender: segment_ready_sender,
|
||||
document_receiver: document_receiver,
|
||||
document_sender: document_sender,
|
||||
target_num_docs: 100_000,
|
||||
workers_join_handle: Vec::new(),
|
||||
num_threads: num_threads,
|
||||
docstamp: try!(index.docstamp()),
|
||||
|
||||
@@ -63,6 +63,7 @@ fn posting_from_field_entry(field_entry: &FieldEntry) -> Box<PostingsWriter> {
|
||||
}
|
||||
|
||||
|
||||
|
||||
impl<'a> SegmentWriter<'a> {
|
||||
|
||||
pub fn for_segment(block_store: &'a mut BlockStore, mut segment: Segment, schema: &Schema) -> Result<SegmentWriter<'a>> {
|
||||
@@ -103,7 +104,11 @@ impl<'a> SegmentWriter<'a> {
|
||||
segment_info,
|
||||
self.segment_serializer)
|
||||
}
|
||||
|
||||
|
||||
pub fn is_buffer_full(&self,) -> bool {
|
||||
self.block_store.num_free_blocks() < 1000
|
||||
}
|
||||
|
||||
pub fn add_document(&mut self, doc: &Document, schema: &Schema) -> io::Result<()> {
|
||||
let doc_id = self.max_doc;
|
||||
for (field, field_values) in doc.get_sorted_fields() {
|
||||
|
||||
@@ -36,6 +36,10 @@ impl BlockStore {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn num_free_blocks(&self) -> usize {
|
||||
self.blocks.len() - self.free_block_id
|
||||
}
|
||||
|
||||
pub fn new_list(&mut self) -> u32 {
|
||||
let res = self.lists.len() as u32;
|
||||
let new_block_id = self.new_block().unwrap();
|
||||
@@ -128,11 +132,18 @@ impl<'a> Iterator for BlockIterator<'a> {
|
||||
None
|
||||
}
|
||||
else {
|
||||
if self.cursor % (BLOCK_SIZE as usize) == 0 {
|
||||
if self.cursor != 0 {
|
||||
if self.current_block.next != u32::max_value() {
|
||||
self.current_block = &self.blocks[self.current_block.next as usize];
|
||||
}
|
||||
else {
|
||||
panic!("Block linked list ended prematurely.");
|
||||
}
|
||||
}
|
||||
}
|
||||
let res = self.current_block.data[self.cursor % (BLOCK_SIZE as usize)];
|
||||
self.cursor += 1;
|
||||
if self.cursor % (BLOCK_SIZE as usize) == 0 {
|
||||
self.current_block = &self.blocks[self.current_block.next as usize];
|
||||
}
|
||||
Some(res)
|
||||
}
|
||||
|
||||
|
||||
@@ -167,23 +167,3 @@ mod tests {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
// #[cfg(test)]
|
||||
// mod tests {
|
||||
|
||||
// use super::*;
|
||||
// use test::Bencher;
|
||||
|
||||
//
|
||||
// #[bench]
|
||||
// fn bench_single_intersection(b: &mut Bencher) {
|
||||
// b.iter(|| {
|
||||
// let docs = VecPostings::new((0..1_000_000).collect());
|
||||
// let intersection = IntersectionDocSet::from_postings(vec!(docs));
|
||||
// intersection.count()
|
||||
// });
|
||||
// }
|
||||
// }
|
||||
//
|
||||
@@ -23,11 +23,9 @@ pub struct SpecializedPostingsWriter<Rec: Recorder + 'static> {
|
||||
|
||||
fn get_or_create_recorder<'a, Rec: Recorder>(term: Term, term_index: &'a mut HashMap<Term, Rec>, block_store: &mut BlockStore) -> &'a mut Rec {
|
||||
if term_index.contains_key(&term) {
|
||||
println!("recorder here");
|
||||
term_index.get_mut(&term).unwrap()
|
||||
}
|
||||
else {
|
||||
println!("adding recorder");
|
||||
let recorder = Rec::new(block_store);
|
||||
term_index
|
||||
.entry(term)
|
||||
|
||||
Reference in New Issue
Block a user