Mirror of https://github.com/quickwit-oss/tantivy.git, synced 2026-01-06 01:02:55 +00:00

Compare commits: refact-cod...storewrite (1 commit, ee6c839ee6)
src/directory/mod.rs

@@ -13,6 +13,7 @@ mod footer;
 mod managed_directory;
 mod ram_directory;
 mod read_only_source;
+mod spilling_writer;
 mod watch_event_router;
 
 /// Errors specific to the directory module.
@@ -22,6 +23,7 @@ pub use self::directory::DirectoryLock;
 pub use self::directory::{Directory, DirectoryClone};
 pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
 pub use self::ram_directory::RAMDirectory;
+pub(crate) use self::spilling_writer::SpillingWriter;
 pub use self::read_only_source::ReadOnlySource;
 pub use self::watch_event_router::{WatchCallback, WatchCallbackList, WatchHandle};
 use std::io::{self, BufWriter, Write};
@@ -79,10 +81,16 @@ impl<W: TerminatingWrite> TerminatingWrite for BufWriter<W> {
     }
 }
 
+impl TerminatingWrite for Vec<u8> {
+    fn terminate_ref(&mut self, _a: AntiCallToken) -> io::Result<()> {
+        Ok(())
+    }
+}
+
 #[cfg(test)]
 impl<'a> TerminatingWrite for &'a mut Vec<u8> {
     fn terminate_ref(&mut self, _a: AntiCallToken) -> io::Result<()> {
-        self.flush()
+        Ok(())
     }
 }
 
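Note on the hunk above: Vec<u8> gains a no-op TerminatingWrite impl (an in-memory buffer has nothing to finalize), and the test-only impl for &'a mut Vec<u8> is relaxed from self.flush() to Ok(()). For context, a minimal sketch of the trait these impls target, as assumed from its definition earlier in src/directory/mod.rs (the definition itself is not shown in this diff):

use std::io::{self, Write};

// Zero-sized token constructible only inside the directory module; it keeps
// callers from invoking terminate_ref directly instead of terminate().
pub struct AntiCallToken(());

pub trait TerminatingWrite: Write {
    // Consumes the writer and finalizes it (e.g. an fsync for file-backed writers).
    fn terminate(mut self) -> io::Result<()>
    where
        Self: Sized,
    {
        self.terminate_ref(AntiCallToken(()))
    }

    // Object-safe hook that implementations override.
    fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()>;
}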
src/directory/spilling_writer.rs (new file, 180 lines)

@@ -0,0 +1,180 @@
+use crate::directory::{WritePtr, TerminatingWrite};
+use std::io::{self, Write};
+
+enum SpillingState {
+    Buffer {
+        buffer: Vec<u8>,
+        capacity: usize,
+        write_factory: Box<dyn FnOnce() -> io::Result<WritePtr>>,
+    },
+    Spilled(WritePtr),
+}
+
+impl SpillingState {
+
+    fn new(
+        limit: usize,
+        write_factory: Box<dyn FnOnce() -> io::Result<WritePtr>>,
+    ) -> SpillingState {
+        SpillingState::Buffer {
+            buffer: Vec::with_capacity(limit),
+            capacity: limit,
+            write_factory,
+        }
+    }
+
+    fn reserve(self, extra_capacity: usize) -> io::Result<SpillingState> {
+        match self {
+            SpillingState::Buffer {
+                buffer,
+                capacity,
+                write_factory,
+            } => {
+                if capacity >= extra_capacity {
+                    Ok(SpillingState::Buffer {
+                        buffer,
+                        capacity: capacity - extra_capacity,
+                        write_factory,
+                    })
+                } else {
+                    let mut wrt = write_factory()?;
+                    wrt.write_all(&buffer[..])?;
+                    Ok(SpillingState::Spilled(wrt))
+                }
+            }
+            SpillingState::Spilled(wrt) => Ok(SpillingState::Spilled(wrt)),
+        }
+    }
+}
+
+pub struct SpillingWriter {
+    state: Option<SpillingState>,
+}
+
+impl SpillingWriter {
+    pub fn new(
+        limit: usize,
+        write_factory: Box<dyn FnOnce() -> io::Result<WritePtr>>,
+    ) -> SpillingWriter {
+        let state = SpillingState::new(limit, write_factory);
+        SpillingWriter {
+            state: Some(state)
+        }
+    }
+
+    pub fn flush_and_finalize(self) -> io::Result<()> {
+        if let SpillingState::Buffer {
+            buffer,
+            write_factory,
+            ..
+        } = self.state.expect("State cannot be none") {
+            let mut wrt = write_factory()?;
+            wrt.write_all(&buffer[..])?;
+            wrt.flush()?;
+            wrt.terminate()?;
+        }
+        Ok(())
+    }
+
+    pub fn finalize(self) -> io::Result<SpillingResult> {
+        match self.state.expect("state cannot be None") {
+            SpillingState::Spilled(mut wrt) => {
+                wrt.flush()?;
+                Ok(SpillingResult::Spilled)
+            }
+            SpillingState::Buffer { buffer, .. } => Ok(SpillingResult::Buffer(buffer)),
+        }
+    }
+}
+
+pub enum SpillingResult {
+    Spilled,
+    Buffer(Vec<u8>),
+}
+
+impl io::Write for SpillingWriter {
+    fn write(&mut self, payload: &[u8]) -> io::Result<usize> {
+        self.write_all(payload)?;
+        Ok(payload.len())
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        if let Some(SpillingState::Spilled(wrt)) = &mut self.state {
+            wrt.flush()?;
+        }
+        Ok(())
+    }
+
+    fn write_all(&mut self, payload: &[u8]) -> io::Result<()> {
+        let state_opt: Option<io::Result<SpillingState>> = self.state
+            .take()
+            .map(|mut state| {
+                state = state.reserve(payload.len())?;
+                match &mut state {
+                    SpillingState::Buffer { buffer, .. } => {
+                        buffer.extend_from_slice(payload);
+                    }
+                    SpillingState::Spilled(wrt) => {
+                        wrt.write_all(payload)?;
+                    }
+                }
+                Ok(state)
+            });
+        self.state = state_opt.transpose()?;
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::SpillingWriter;
+    use crate::directory::spilling_writer::SpillingResult;
+    use crate::directory::RAMDirectory;
+    use crate::Directory;
+    use std::io::{self, Write};
+    use std::path::Path;
+
+    #[test]
+    fn test_no_spilling() {
+        let ram_directory = RAMDirectory::create();
+        let mut ram_directory_clone = ram_directory.clone();
+        let path = Path::new("test");
+        let write_factory = Box::new(move || {
+            ram_directory_clone
+                .open_write(path)
+                .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
+        });
+        let mut spilling_wrt = SpillingWriter::new(10, write_factory);
+        assert!(spilling_wrt.write_all(b"abcd").is_ok());
+        if let SpillingResult::Buffer(buf) = spilling_wrt.finalize().unwrap() {
+            assert_eq!(buf, b"abcd")
+        } else {
+            panic!("spill writer should not have spilled");
+        }
+        assert!(!ram_directory.exists(path));
+    }
+
+    #[test]
+    fn test_spilling() {
+        let ram_directory = RAMDirectory::create();
+        let mut ram_directory_clone = ram_directory.clone();
+        let path = Path::new("test");
+        let write_factory = Box::new(move || {
+            ram_directory_clone
+                .open_write(path)
+                .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
+        });
+        let mut spilling_wrt = SpillingWriter::new(10, write_factory);
+        assert!(spilling_wrt.write_all(b"abcd").is_ok());
+        assert!(spilling_wrt.write_all(b"efghijklmnop").is_ok());
+        if let SpillingResult::Spilled = spilling_wrt.finalize().unwrap() {
+        } else {
+            panic!("spill writer should have spilled");
+        }
+        assert_eq!(
+            ram_directory.atomic_read(path).unwrap(),
+            b"abcdefghijklmnop"
+        );
+    }
+}
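Design note on the new file: SpillingWriter keeps bytes in memory until the running total exceeds the limit passed to new(); the first write_all that overflows the remaining budget invokes write_factory exactly once, replays the buffered bytes into the resulting WritePtr, and streams everything afterwards. finalize() reports which side the writer ended on, while flush_and_finalize() forces the data into the directory even when it never spilled. A self-contained model of the capacity bookkeeping in reserve(), with illustrative names only (SpillingState itself is crate-private):

// `remaining` plays the role of the `capacity` field: it starts at `limit`
// and shrinks with every buffered write.
struct Budget {
    remaining: usize,
    spilled: bool,
}

impl Budget {
    fn on_write(&mut self, len: usize) {
        if self.spilled {
            return; // already streaming to the real writer
        }
        if self.remaining >= len {
            self.remaining -= len; // payload stays in the in-memory buffer
        } else {
            self.spilled = true; // factory runs once, buffer is replayed
        }
    }
}

fn main() {
    let mut b = Budget { remaining: 10, spilled: false };
    b.on_write(4);  // "abcd" is buffered, 6 bytes of budget left
    b.on_write(12); // 6 < 12: spills, matching test_spilling above
    assert!(b.spilled);
}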
src/indexer/merger.rs

@@ -2,6 +2,7 @@ use crate::common::MAX_DOC_LIMIT;
 use crate::core::Segment;
 use crate::core::SegmentReader;
 use crate::core::SerializableSegment;
+use crate::directory::WritePtr;
 use crate::docset::DocSet;
 use crate::fastfield::BytesFastFieldReader;
 use crate::fastfield::DeleteBitSet;
@@ -661,7 +662,8 @@ impl IndexMerger {
         Ok(term_ordinal_mappings)
     }
 
-    fn write_storable_fields(&self, store_writer: &mut StoreWriter) -> crate::Result<()> {
+    pub fn write_storable_fields(&self, store_wrt: WritePtr) -> crate::Result<()> {
+        let mut store_writer = StoreWriter::new(store_wrt);
         for reader in &self.readers {
             let store_reader = reader.get_store_reader();
             if reader.num_deleted_docs() > 0 {
@@ -673,6 +675,7 @@ impl IndexMerger {
                 store_writer.stack(&store_reader)?;
             }
         }
+        store_writer.close()?;
         Ok(())
     }
 }
@@ -682,7 +685,6 @@ impl SerializableSegment for IndexMerger {
         let term_ord_mappings = self.write_postings(serializer.get_postings_serializer())?;
         self.write_fieldnorms(serializer.get_fieldnorms_serializer())?;
         self.write_fast_fields(serializer.get_fast_field_serializer(), term_ord_mappings)?;
-        self.write_storable_fields(serializer.get_store_writer())?;
         serializer.close()?;
         Ok(self.max_doc)
     }
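After these hunks, a caller of write_storable_fields must open the STORE component itself and hand over the WritePtr; the method builds and closes its own StoreWriter, and the document store is no longer routed through the SegmentSerializer. Condensed from the segment_updater.rs hunk further below, with comments added, the new calling sequence is:

let store_wrt = merged_segment.open_write(SegmentComponent::STORE)?;
merger.write_storable_fields(store_wrt)?; // wraps store_wrt in a StoreWriter and closes it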
src/indexer/segment_serializer.rs

@@ -3,12 +3,10 @@ use crate::core::SegmentComponent;
 use crate::fastfield::FastFieldSerializer;
 use crate::fieldnorm::FieldNormsSerializer;
 use crate::postings::InvertedIndexSerializer;
-use crate::store::StoreWriter;
 
 /// Segment serializer is in charge of laying out on disk
 /// the data accumulated and sorted by the `SegmentWriter`.
 pub struct SegmentSerializer {
-    store_writer: StoreWriter,
     fast_field_serializer: FastFieldSerializer,
     fieldnorms_serializer: FieldNormsSerializer,
     postings_serializer: InvertedIndexSerializer,
@@ -17,8 +15,6 @@ pub struct SegmentSerializer {
 impl SegmentSerializer {
     /// Creates a new `SegmentSerializer`.
     pub fn for_segment(segment: &mut Segment) -> crate::Result<SegmentSerializer> {
-        let store_write = segment.open_write(SegmentComponent::STORE)?;
-
         let fast_field_write = segment.open_write(SegmentComponent::FASTFIELDS)?;
         let fast_field_serializer = FastFieldSerializer::from_write(fast_field_write)?;
 
@@ -27,7 +23,6 @@ impl SegmentSerializer {
 
         let postings_serializer = InvertedIndexSerializer::open(segment)?;
         Ok(SegmentSerializer {
-            store_writer: StoreWriter::new(store_write),
             fast_field_serializer,
             fieldnorms_serializer,
             postings_serializer,
@@ -49,16 +44,10 @@ impl SegmentSerializer {
         &mut self.fieldnorms_serializer
     }
 
-    /// Accessor to the `StoreWriter`.
-    pub fn get_store_writer(&mut self) -> &mut StoreWriter {
-        &mut self.store_writer
-    }
-
     /// Finalize the segment serialization.
     pub fn close(self) -> crate::Result<()> {
         self.fast_field_serializer.close()?;
         self.postings_serializer.close()?;
-        self.store_writer.close()?;
         self.fieldnorms_serializer.close()?;
         Ok(())
     }
src/indexer/segment_updater.rs

@@ -18,7 +18,7 @@ use crate::indexer::SegmentSerializer;
 use crate::indexer::{DefaultMergePolicy, MergePolicy};
 use crate::indexer::{MergeCandidate, MergeOperation};
 use crate::schema::Schema;
-use crate::Opstamp;
+use crate::{Opstamp, SegmentComponent};
 use futures::channel::oneshot;
 use futures::executor::{ThreadPool, ThreadPoolBuilder};
 use futures::future::Future;
@@ -134,8 +134,10 @@ fn merge(
     // ... we just serialize this index merger in our new segment to merge the two segments.
     let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment)?;
 
-    let num_docs = merger.write(segment_serializer)?;
+    let store_wrt = merged_segment.open_write(SegmentComponent::STORE)?;
+    merger.write_storable_fields(store_wrt)?;
+
+    let num_docs = merger.write(segment_serializer)?;
     let segment_meta = index.new_segment_meta(merged_segment.id(), num_docs);
 
     Ok(SegmentEntry::new(segment_meta, delete_cursor, None))
src/indexer/segment_writer.rs

@@ -11,13 +11,15 @@ use crate::schema::Schema;
 use crate::schema::Term;
 use crate::schema::Value;
 use crate::schema::{Field, FieldEntry};
+use crate::store::StoreWriter;
 use crate::tokenizer::{BoxTokenStream, PreTokenizedStream};
 use crate::tokenizer::{FacetTokenizer, TextAnalyzer};
 use crate::tokenizer::{TokenStreamChain, Tokenizer};
-use crate::DocId;
 use crate::Opstamp;
+use crate::{DocId, SegmentComponent};
 use std::io;
 use std::str;
+use crate::directory::SpillingWriter;
 
 /// Computes the initial size of the hash table.
 ///
@@ -43,11 +45,12 @@ fn initial_table_size(per_thread_memory_budget: usize) -> crate::Result<usize> {
 pub struct SegmentWriter {
     max_doc: DocId,
     multifield_postings: MultiFieldPostingsWriter,
-    segment_serializer: SegmentSerializer,
+    segment: Segment,
     fast_field_writers: FastFieldsWriter,
     fieldnorms_writer: FieldNormsWriter,
    doc_opstamps: Vec<Opstamp>,
     tokenizers: Vec<Option<TextAnalyzer>>,
+    store_writer: StoreWriter<SpillingWriter>,
 }
 
 impl SegmentWriter {
@@ -62,11 +65,10 @@ impl SegmentWriter {
     /// - schema
     pub fn for_segment(
         memory_budget: usize,
-        mut segment: Segment,
+        segment: Segment,
         schema: &Schema,
     ) -> crate::Result<SegmentWriter> {
         let table_num_bits = initial_table_size(memory_budget)?;
-        let segment_serializer = SegmentSerializer::for_segment(&mut segment)?;
         let multifield_postings = MultiFieldPostingsWriter::new(schema, table_num_bits);
         let tokenizers = schema
             .fields()
@@ -82,14 +84,22 @@ impl SegmentWriter {
                 },
             )
             .collect();
+        let mut segment_clone = segment.clone();
+        let spilling_wrt = SpillingWriter::new(1_000, Box::new(move || {
+            segment_clone
+                .open_write(SegmentComponent::STORE)
+                .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
+        }));
+        let store_writer = StoreWriter::new(spilling_wrt);
         Ok(SegmentWriter {
             max_doc: 0,
             multifield_postings,
             fieldnorms_writer: FieldNormsWriter::for_schema(schema),
-            segment_serializer,
+            segment,
             fast_field_writers: FastFieldsWriter::from_schema(schema),
             doc_opstamps: Vec::with_capacity(1_000),
             tokenizers,
+            store_writer,
         })
     }
 
@@ -99,11 +109,14 @@ impl SegmentWriter {
     /// be used afterwards.
     pub fn finalize(mut self) -> crate::Result<Vec<u64>> {
         self.fieldnorms_writer.fill_up_to_max_doc(self.max_doc);
+        let spilling_wrt = self.store_writer.close()?;
+        spilling_wrt.flush_and_finalize()?;
+        let segment_serializer = SegmentSerializer::for_segment(&mut self.segment)?;
         write(
             &self.multifield_postings,
             &self.fast_field_writers,
             &self.fieldnorms_writer,
-            self.segment_serializer,
+            segment_serializer,
         )?;
         Ok(self.doc_opstamps)
     }
@@ -246,8 +259,7 @@ impl SegmentWriter {
         }
         doc.filter_fields(|field| schema.get_field_entry(field).is_stored());
         doc.prepare_for_store();
-        let doc_writer = self.segment_serializer.get_store_writer();
-        doc_writer.store(&doc)?;
+        self.store_writer.store(&doc)?;
         self.max_doc += 1;
         Ok(())
     }
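Taken together, these hunks change the SegmentWriter lifecycle: documents are stored through a StoreWriter<SpillingWriter> as they are added, so a small segment's store stays entirely in memory (up to the 1_000-byte threshold) and the STORE file is only created on overflow; the SegmentSerializer for the remaining components is now opened at finalize time instead of construction time. The new finalize sequence, repeated from the diff above with explanatory comments added:

let spilling_wrt = self.store_writer.close()?;   // flush the last store block; get the SpillingWriter back
spilling_wrt.flush_and_finalize()?;              // if it never spilled, replay the buffer into STORE now
let segment_serializer = SegmentSerializer::for_segment(&mut self.segment)?; // open the other component files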
src/store/writer.rs

@@ -3,8 +3,6 @@ use super::skiplist::SkipListBuilder;
 use super::StoreReader;
 use crate::common::CountingWriter;
 use crate::common::{BinarySerializable, VInt};
-use crate::directory::TerminatingWrite;
-use crate::directory::WritePtr;
 use crate::schema::Document;
 use crate::DocId;
 use std::io::{self, Write};
@@ -19,20 +17,20 @@ const BLOCK_SIZE: usize = 16_384;
 ///
 /// The skip list index on the other hand, is built in memory.
 ///
-pub struct StoreWriter {
+pub struct StoreWriter<W: io::Write> {
     doc: DocId,
     offset_index_writer: SkipListBuilder<u64>,
-    writer: CountingWriter<WritePtr>,
+    writer: CountingWriter<W>,
     intermediary_buffer: Vec<u8>,
     current_block: Vec<u8>,
 }
 
-impl StoreWriter {
+impl<W: io::Write> StoreWriter<W> {
     /// Create a store writer.
     ///
     /// The store writer will writes blocks on disc as
     /// document are added.
-    pub fn new(writer: WritePtr) -> StoreWriter {
+    pub fn new(writer: W) -> StoreWriter<W> {
         StoreWriter {
             doc: 0,
             offset_index_writer: SkipListBuilder::new(4),
@@ -102,7 +100,7 @@ impl StoreWriter {
     ///
     /// Compress the last unfinished block if any,
     /// and serializes the skip list index on disc.
-    pub fn close(mut self) -> io::Result<()> {
+    pub fn close(mut self) -> io::Result<W> {
         if !self.current_block.is_empty() {
             self.write_and_compress_block()?;
         }
@@ -110,6 +108,9 @@ impl StoreWriter {
         self.offset_index_writer.write(&mut self.writer)?;
         header_offset.serialize(&mut self.writer)?;
         self.doc.serialize(&mut self.writer)?;
-        self.writer.terminate()
+        self.writer.flush()?;
+        let (wrt, _) = self.writer.finish()?;
+        Ok(wrt)
+
     }
 }
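The close() change above is what makes the spilling scheme work: instead of terminating the underlying writer, StoreWriter now hands it back to the caller, which is how SegmentWriter recovers its SpillingWriter. The finish() call is assumed to belong to crate::common::CountingWriter, with its signature inferred from the usage here; a self-contained sketch, not the crate's actual code:

use std::io::{self, Write};

pub struct CountingWriter<W> {
    underlying: W,
    written_bytes: u64,
}

impl<W: Write> CountingWriter<W> {
    // Flush and hand back the wrapped writer together with the byte count.
    pub fn finish(mut self) -> io::Result<(W, u64)> {
        self.underlying.flush()?;
        Ok((self.underlying, self.written_bytes))
    }
}

impl<W: Write> Write for CountingWriter<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let written = self.underlying.write(buf)?;
        self.written_bytes += written as u64;
        Ok(written)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.underlying.flush()
    }
}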