Compare commits

...

1 Commits

Author SHA1 Message Date
Paul Masurel
ee6c839ee6 Moving the StoreWriter out of the SegmentSerializer. 2020-03-10 09:53:56 +09:00
7 changed files with 226 additions and 32 deletions

View File

@@ -13,6 +13,7 @@ mod footer;
mod managed_directory;
mod ram_directory;
mod read_only_source;
mod spilling_writer;
mod watch_event_router;
/// Errors specific to the directory module.
@@ -22,6 +23,7 @@ pub use self::directory::DirectoryLock;
pub use self::directory::{Directory, DirectoryClone};
pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
pub use self::ram_directory::RAMDirectory;
pub(crate) use self::spilling_writer::SpillingWriter;
pub use self::read_only_source::ReadOnlySource;
pub use self::watch_event_router::{WatchCallback, WatchCallbackList, WatchHandle};
use std::io::{self, BufWriter, Write};
@@ -79,10 +81,16 @@ impl<W: TerminatingWrite> TerminatingWrite for BufWriter<W> {
}
}
impl TerminatingWrite for Vec<u8> {
fn terminate_ref(&mut self, _a: AntiCallToken) -> io::Result<()> {
Ok(())
}
}
#[cfg(test)]
impl<'a> TerminatingWrite for &'a mut Vec<u8> {
fn terminate_ref(&mut self, _a: AntiCallToken) -> io::Result<()> {
self.flush()
Ok(())
}
}

View File

@@ -0,0 +1,180 @@
use crate::directory::{WritePtr, TerminatingWrite};
use std::io::{self, Write};
enum SpillingState {
Buffer {
buffer: Vec<u8>,
capacity: usize,
write_factory: Box<dyn FnOnce() -> io::Result<WritePtr>>,
},
Spilled(WritePtr),
}
impl SpillingState {
fn new(
limit: usize,
write_factory: Box<dyn FnOnce() -> io::Result<WritePtr>>,
) -> SpillingState {
SpillingState::Buffer {
buffer: Vec::with_capacity(limit),
capacity: limit,
write_factory,
}
}
fn reserve(self, extra_capacity: usize) -> io::Result<SpillingState> {
match self {
SpillingState::Buffer {
buffer,
capacity,
write_factory,
} => {
if capacity >= extra_capacity {
Ok(SpillingState::Buffer {
buffer,
capacity: capacity - extra_capacity,
write_factory,
})
} else {
let mut wrt = write_factory()?;
wrt.write_all(&buffer[..])?;
Ok(SpillingState::Spilled(wrt))
}
}
SpillingState::Spilled(wrt) => Ok(SpillingState::Spilled(wrt)),
}
}
}
pub struct SpillingWriter {
state: Option<SpillingState>,
}
impl SpillingWriter {
pub fn new(
limit: usize,
write_factory: Box<dyn FnOnce() -> io::Result<WritePtr>>,
) -> SpillingWriter {
let state = SpillingState::new(limit, write_factory);
SpillingWriter {
state: Some(state)
}
}
pub fn flush_and_finalize(self) -> io::Result<()> {
if let SpillingState::Buffer {
buffer,
write_factory,
..
} = self.state.expect("State cannot be none") {
let mut wrt = write_factory()?;
wrt.write_all(&buffer[..])?;
wrt.flush()?;
wrt.terminate()?;
}
Ok(())
}
pub fn finalize(self) -> io::Result<SpillingResult> {
match self.state.expect("state cannot be None") {
SpillingState::Spilled(mut wrt) => {
wrt.flush()?;
Ok(SpillingResult::Spilled)
}
SpillingState::Buffer { buffer, .. } => Ok(SpillingResult::Buffer(buffer)),
}
}
}
pub enum SpillingResult {
Spilled,
Buffer(Vec<u8>),
}
impl io::Write for SpillingWriter {
fn write(&mut self, payload: &[u8]) -> io::Result<usize> {
self.write_all(payload)?;
Ok(payload.len())
}
fn flush(&mut self) -> io::Result<()> {
if let Some(SpillingState::Spilled(wrt)) = &mut self.state {
wrt.flush()?;
}
Ok(())
}
fn write_all(&mut self, payload: &[u8]) -> io::Result<()> {
let state_opt: Option<io::Result<SpillingState>> = self.state
.take()
.map(|mut state| {
state = state.reserve(payload.len())?;
match &mut state {
SpillingState::Buffer { buffer, .. } => {
buffer.extend_from_slice(payload);
}
SpillingState::Spilled(wrt) => {
wrt.write_all(payload)?;
}
}
Ok(state)
});
self.state = state_opt.transpose()?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::SpillingWriter;
use crate::directory::spilling_writer::SpillingResult;
use crate::directory::RAMDirectory;
use crate::Directory;
use std::io::{self, Write};
use std::path::Path;
#[test]
fn test_no_spilling() {
let ram_directory = RAMDirectory::create();
let mut ram_directory_clone = ram_directory.clone();
let path = Path::new("test");
let write_factory = Box::new(move || {
ram_directory_clone
.open_write(path)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
});
let mut spilling_wrt = SpillingWriter::new(10, write_factory);
assert!(spilling_wrt.write_all(b"abcd").is_ok());
if let SpillingResult::Buffer(buf) = spilling_wrt.finalize().unwrap() {
assert_eq!(buf, b"abcd")
} else {
panic!("spill writer should not have spilled");
}
assert!(!ram_directory.exists(path));
}
#[test]
fn test_spilling() {
let ram_directory = RAMDirectory::create();
let mut ram_directory_clone = ram_directory.clone();
let path = Path::new("test");
let write_factory = Box::new(move || {
ram_directory_clone
.open_write(path)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
});
let mut spilling_wrt = SpillingWriter::new(10, write_factory);
assert!(spilling_wrt.write_all(b"abcd").is_ok());
assert!(spilling_wrt.write_all(b"efghijklmnop").is_ok());
if let SpillingResult::Spilled = spilling_wrt.finalize().unwrap() {
} else {
panic!("spill writer should have spilled");
}
assert_eq!(
ram_directory.atomic_read(path).unwrap(),
b"abcdefghijklmnop"
);
}
}

View File

@@ -2,6 +2,7 @@ use crate::common::MAX_DOC_LIMIT;
use crate::core::Segment;
use crate::core::SegmentReader;
use crate::core::SerializableSegment;
use crate::directory::WritePtr;
use crate::docset::DocSet;
use crate::fastfield::BytesFastFieldReader;
use crate::fastfield::DeleteBitSet;
@@ -661,7 +662,8 @@ impl IndexMerger {
Ok(term_ordinal_mappings)
}
fn write_storable_fields(&self, store_writer: &mut StoreWriter) -> crate::Result<()> {
pub fn write_storable_fields(&self, store_wrt: WritePtr) -> crate::Result<()> {
let mut store_writer = StoreWriter::new(store_wrt);
for reader in &self.readers {
let store_reader = reader.get_store_reader();
if reader.num_deleted_docs() > 0 {
@@ -673,6 +675,7 @@ impl IndexMerger {
store_writer.stack(&store_reader)?;
}
}
store_writer.close()?;
Ok(())
}
}
@@ -682,7 +685,6 @@ impl SerializableSegment for IndexMerger {
let term_ord_mappings = self.write_postings(serializer.get_postings_serializer())?;
self.write_fieldnorms(serializer.get_fieldnorms_serializer())?;
self.write_fast_fields(serializer.get_fast_field_serializer(), term_ord_mappings)?;
self.write_storable_fields(serializer.get_store_writer())?;
serializer.close()?;
Ok(self.max_doc)
}

View File

@@ -3,12 +3,10 @@ use crate::core::SegmentComponent;
use crate::fastfield::FastFieldSerializer;
use crate::fieldnorm::FieldNormsSerializer;
use crate::postings::InvertedIndexSerializer;
use crate::store::StoreWriter;
/// Segment serializer is in charge of laying out on disk
/// the data accumulated and sorted by the `SegmentWriter`.
pub struct SegmentSerializer {
store_writer: StoreWriter,
fast_field_serializer: FastFieldSerializer,
fieldnorms_serializer: FieldNormsSerializer,
postings_serializer: InvertedIndexSerializer,
@@ -17,8 +15,6 @@ pub struct SegmentSerializer {
impl SegmentSerializer {
/// Creates a new `SegmentSerializer`.
pub fn for_segment(segment: &mut Segment) -> crate::Result<SegmentSerializer> {
let store_write = segment.open_write(SegmentComponent::STORE)?;
let fast_field_write = segment.open_write(SegmentComponent::FASTFIELDS)?;
let fast_field_serializer = FastFieldSerializer::from_write(fast_field_write)?;
@@ -27,7 +23,6 @@ impl SegmentSerializer {
let postings_serializer = InvertedIndexSerializer::open(segment)?;
Ok(SegmentSerializer {
store_writer: StoreWriter::new(store_write),
fast_field_serializer,
fieldnorms_serializer,
postings_serializer,
@@ -49,16 +44,10 @@ impl SegmentSerializer {
&mut self.fieldnorms_serializer
}
/// Accessor to the `StoreWriter`.
pub fn get_store_writer(&mut self) -> &mut StoreWriter {
&mut self.store_writer
}
/// Finalize the segment serialization.
pub fn close(self) -> crate::Result<()> {
self.fast_field_serializer.close()?;
self.postings_serializer.close()?;
self.store_writer.close()?;
self.fieldnorms_serializer.close()?;
Ok(())
}

View File

@@ -18,7 +18,7 @@ use crate::indexer::SegmentSerializer;
use crate::indexer::{DefaultMergePolicy, MergePolicy};
use crate::indexer::{MergeCandidate, MergeOperation};
use crate::schema::Schema;
use crate::Opstamp;
use crate::{Opstamp, SegmentComponent};
use futures::channel::oneshot;
use futures::executor::{ThreadPool, ThreadPoolBuilder};
use futures::future::Future;
@@ -134,8 +134,10 @@ fn merge(
// ... we just serialize this index merger in our new segment to merge the two segments.
let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment)?;
let num_docs = merger.write(segment_serializer)?;
let store_wrt = merged_segment.open_write(SegmentComponent::STORE)?;
merger.write_storable_fields(store_wrt)?;
let num_docs = merger.write(segment_serializer)?;
let segment_meta = index.new_segment_meta(merged_segment.id(), num_docs);
Ok(SegmentEntry::new(segment_meta, delete_cursor, None))

View File

@@ -11,13 +11,15 @@ use crate::schema::Schema;
use crate::schema::Term;
use crate::schema::Value;
use crate::schema::{Field, FieldEntry};
use crate::store::StoreWriter;
use crate::tokenizer::{BoxTokenStream, PreTokenizedStream};
use crate::tokenizer::{FacetTokenizer, TextAnalyzer};
use crate::tokenizer::{TokenStreamChain, Tokenizer};
use crate::DocId;
use crate::Opstamp;
use crate::{DocId, SegmentComponent};
use std::io;
use std::str;
use crate::directory::SpillingWriter;
/// Computes the initial size of the hash table.
///
@@ -43,11 +45,12 @@ fn initial_table_size(per_thread_memory_budget: usize) -> crate::Result<usize> {
pub struct SegmentWriter {
max_doc: DocId,
multifield_postings: MultiFieldPostingsWriter,
segment_serializer: SegmentSerializer,
segment: Segment,
fast_field_writers: FastFieldsWriter,
fieldnorms_writer: FieldNormsWriter,
doc_opstamps: Vec<Opstamp>,
tokenizers: Vec<Option<TextAnalyzer>>,
store_writer: StoreWriter<SpillingWriter>,
}
impl SegmentWriter {
@@ -62,11 +65,10 @@ impl SegmentWriter {
/// - schema
pub fn for_segment(
memory_budget: usize,
mut segment: Segment,
segment: Segment,
schema: &Schema,
) -> crate::Result<SegmentWriter> {
let table_num_bits = initial_table_size(memory_budget)?;
let segment_serializer = SegmentSerializer::for_segment(&mut segment)?;
let multifield_postings = MultiFieldPostingsWriter::new(schema, table_num_bits);
let tokenizers = schema
.fields()
@@ -82,14 +84,22 @@ impl SegmentWriter {
},
)
.collect();
let mut segment_clone = segment.clone();
let spilling_wrt = SpillingWriter::new(1_000, Box::new(move || {
segment_clone
.open_write(SegmentComponent::STORE)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
}));
let store_writer = StoreWriter::new(spilling_wrt);
Ok(SegmentWriter {
max_doc: 0,
multifield_postings,
fieldnorms_writer: FieldNormsWriter::for_schema(schema),
segment_serializer,
segment,
fast_field_writers: FastFieldsWriter::from_schema(schema),
doc_opstamps: Vec::with_capacity(1_000),
tokenizers,
store_writer,
})
}
@@ -99,11 +109,14 @@ impl SegmentWriter {
/// be used afterwards.
pub fn finalize(mut self) -> crate::Result<Vec<u64>> {
self.fieldnorms_writer.fill_up_to_max_doc(self.max_doc);
let spilling_wrt = self.store_writer.close()?;
spilling_wrt.flush_and_finalize()?;
let segment_serializer = SegmentSerializer::for_segment(&mut self.segment)?;
write(
&self.multifield_postings,
&self.fast_field_writers,
&self.fieldnorms_writer,
self.segment_serializer,
segment_serializer,
)?;
Ok(self.doc_opstamps)
}
@@ -246,8 +259,7 @@ impl SegmentWriter {
}
doc.filter_fields(|field| schema.get_field_entry(field).is_stored());
doc.prepare_for_store();
let doc_writer = self.segment_serializer.get_store_writer();
doc_writer.store(&doc)?;
self.store_writer.store(&doc)?;
self.max_doc += 1;
Ok(())
}

View File

@@ -3,8 +3,6 @@ use super::skiplist::SkipListBuilder;
use super::StoreReader;
use crate::common::CountingWriter;
use crate::common::{BinarySerializable, VInt};
use crate::directory::TerminatingWrite;
use crate::directory::WritePtr;
use crate::schema::Document;
use crate::DocId;
use std::io::{self, Write};
@@ -19,20 +17,20 @@ const BLOCK_SIZE: usize = 16_384;
///
/// The skip list index on the other hand, is built in memory.
///
pub struct StoreWriter {
pub struct StoreWriter<W: io::Write> {
doc: DocId,
offset_index_writer: SkipListBuilder<u64>,
writer: CountingWriter<WritePtr>,
writer: CountingWriter<W>,
intermediary_buffer: Vec<u8>,
current_block: Vec<u8>,
}
impl StoreWriter {
impl<W: io::Write> StoreWriter<W> {
/// Create a store writer.
///
/// The store writer will writes blocks on disc as
/// document are added.
pub fn new(writer: WritePtr) -> StoreWriter {
pub fn new(writer: W) -> StoreWriter<W> {
StoreWriter {
doc: 0,
offset_index_writer: SkipListBuilder::new(4),
@@ -102,7 +100,7 @@ impl StoreWriter {
///
/// Compress the last unfinished block if any,
/// and serializes the skip list index on disc.
pub fn close(mut self) -> io::Result<()> {
pub fn close(mut self) -> io::Result<W> {
if !self.current_block.is_empty() {
self.write_and_compress_block()?;
}
@@ -110,6 +108,9 @@ impl StoreWriter {
self.offset_index_writer.write(&mut self.writer)?;
header_offset.serialize(&mut self.writer)?;
self.doc.serialize(&mut self.writer)?;
self.writer.terminate()
self.writer.flush()?;
let (wrt, _) = self.writer.finish()?;
Ok(wrt)
}
}