diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs index 148e34ab2..3860da881 100644 --- a/src/core/segment_reader.rs +++ b/src/core/segment_reader.rs @@ -124,7 +124,7 @@ impl SegmentReader { ); let source = try!(segment.open_read(SegmentComponent::TERMS)); let term_infos = try!(FstMap::from_source(source)); - let store_reader = StoreReader::new(try!(segment.open_read(SegmentComponent::STORE))); + let store_reader = StoreReader::from(try!(segment.open_read(SegmentComponent::STORE))); let postings_shared_mmap = try!(segment.open_read(SegmentComponent::POSTINGS)); let fast_field_data = try!(segment.open_read(SegmentComponent::FASTFIELDS)); diff --git a/src/datastruct/skip/mod.rs b/src/datastruct/skip/mod.rs index 1aa47c9a6..9c27b6283 100644 --- a/src/datastruct/skip/mod.rs +++ b/src/datastruct/skip/mod.rs @@ -19,7 +19,7 @@ mod tests { let mut skip_list_builder: SkipListBuilder = SkipListBuilder::new(10); skip_list_builder.insert(2, &3).unwrap(); skip_list_builder.write::>(&mut output).unwrap(); - let mut skip_list: SkipList = SkipList::read(&mut output); + let mut skip_list: SkipList = SkipList::from(output.as_slice()); assert_eq!(skip_list.next(), Some((2, 3))); } @@ -28,7 +28,7 @@ mod tests { let mut output: Vec = Vec::new(); let skip_list_builder: SkipListBuilder = SkipListBuilder::new(10); skip_list_builder.write::>(&mut output).unwrap(); - let mut skip_list: SkipList = SkipList::read(&mut output); + let mut skip_list: SkipList = SkipList::from(output.as_slice()); assert_eq!(skip_list.next(), None); } @@ -42,7 +42,7 @@ mod tests { skip_list_builder.insert(7, &()).unwrap(); skip_list_builder.insert(9, &()).unwrap(); skip_list_builder.write::>(&mut output).unwrap(); - let mut skip_list: SkipList<()> = SkipList::read(&mut output); + let mut skip_list: SkipList<()> = SkipList::from(output.as_slice()); assert_eq!(skip_list.next().unwrap(), (2, ())); assert_eq!(skip_list.next().unwrap(), (3, ())); assert_eq!(skip_list.next().unwrap(), (5, ())); @@ -61,7 +61,7 @@ mod tests { skip_list_builder.insert(7, &()).unwrap(); skip_list_builder.insert(9, &()).unwrap(); skip_list_builder.write::>(&mut output).unwrap(); - let mut skip_list: SkipList<()> = SkipList::read(&mut output); + let mut skip_list: SkipList<()> = SkipList::from(output.as_slice()); assert_eq!(skip_list.next().unwrap(), (2, ())); skip_list.seek(5); assert_eq!(skip_list.next().unwrap(), (5, ())); @@ -79,7 +79,7 @@ mod tests { skip_list_builder.insert(5, &()).unwrap(); skip_list_builder.insert(6, &()).unwrap(); skip_list_builder.write::>(&mut output).unwrap(); - let mut skip_list: SkipList<()> = SkipList::read(&mut output); + let mut skip_list: SkipList<()> = SkipList::from(output.as_slice()); assert_eq!(skip_list.next().unwrap(), (2, ())); skip_list.seek(6); assert_eq!(skip_list.next().unwrap(), (6, ())); @@ -96,7 +96,7 @@ mod tests { skip_list_builder.insert(7, &()).unwrap(); skip_list_builder.insert(9, &()).unwrap(); skip_list_builder.write::>(&mut output).unwrap(); - let mut skip_list: SkipList<()> = SkipList::read(&mut output); + let mut skip_list: SkipList<()> = SkipList::from(output.as_slice()); assert_eq!(skip_list.next().unwrap(), (2, ())); skip_list.seek(10); assert_eq!(skip_list.next(), None); @@ -111,7 +111,7 @@ mod tests { } skip_list_builder.insert(1004, &()).unwrap(); skip_list_builder.write::>(&mut output).unwrap(); - let mut skip_list: SkipList<()> = SkipList::read(&mut output); + let mut skip_list: SkipList<()> = SkipList::from(output.as_slice()); assert_eq!(skip_list.next().unwrap(), (0, ())); skip_list.seek(431); assert_eq!(skip_list.next().unwrap(), (431,()) ); diff --git a/src/datastruct/skip/skiplist.rs b/src/datastruct/skip/skiplist.rs index a34dede89..16e843b43 100644 --- a/src/datastruct/skip/skiplist.rs +++ b/src/datastruct/skip/skiplist.rs @@ -1,13 +1,13 @@ use common::BinarySerializable; -use std::io::Cursor; -use std::io::SeekFrom; -use std::io::Seek; use std::marker::PhantomData; use DocId; +use std::cmp::max; +static EMPTY: [u8; 0] = []; struct Layer<'a, T> { - cursor: Cursor<&'a [u8]>, + data: &'a [u8], + cursor: &'a [u8], next_id: DocId, _phantom_: PhantomData, } @@ -23,51 +23,41 @@ impl<'a, T: BinarySerializable> Iterator for Layer<'a, T> { else { let cur_val = T::deserialize(&mut self.cursor).unwrap(); let cur_id = self.next_id; - self.next_id = - match u32::deserialize(&mut self.cursor) { - Ok(val) => val, - Err(_) => u32::max_value() - }; + self.next_id = u32::deserialize(&mut self.cursor).unwrap_or(u32::max_value()); Some((cur_id, cur_val)) } } } - -static EMPTY: [u8; 0] = []; - -impl<'a, T: BinarySerializable> Layer<'a, T> { - - fn read(mut cursor: Cursor<&'a [u8]>) -> Layer<'a, T> { - // TODO error handling? - let next_id = match u32::deserialize(&mut cursor) { - Ok(val) => val, - Err(_) => u32::max_value(), - }; +impl<'a, T: BinarySerializable> From<&'a [u8]> for Layer<'a, T> { + fn from(data: &'a [u8]) -> Layer<'a, T> { + let mut cursor = data; + let next_id = u32::deserialize(&mut cursor).unwrap_or(u32::max_value()); Layer { + data: data, cursor: cursor, next_id: next_id, _phantom_: PhantomData, } } +} + +impl<'a, T: BinarySerializable> Layer<'a, T> { fn empty() -> Layer<'a, T> { Layer { - cursor: Cursor::new(&EMPTY), + data: &EMPTY, + cursor: &EMPTY, next_id: DocId::max_value(), _phantom_: PhantomData, } } - fn seek_offset(&mut self, offset: usize) { - self.cursor.seek(SeekFrom::Start(offset as u64)).unwrap(); - self.next_id = match u32::deserialize(&mut self.cursor) { - Ok(val) => val, - Err(_) => u32::max_value(), - }; + self.cursor = &self.data[offset..]; + self.next_id = u32::deserialize(&mut self.cursor).unwrap_or(u32::max_value()); } - + // Returns the last element (key, val) // such that (key < doc_id) // @@ -104,8 +94,7 @@ impl<'a, T: BinarySerializable> SkipList<'a, T> { pub fn seek(&mut self, doc_id: DocId) -> Option<(DocId, T)> { let mut next_layer_skip: Option<(DocId, u32)> = None; - for skip_layer_id in 0..self.skip_layers.len() { - let mut skip_layer: &mut Layer<'a, u32> = &mut self.skip_layers[skip_layer_id]; + for skip_layer in &mut self.skip_layers { if let Some((_, offset)) = next_layer_skip { skip_layer.seek_offset(offset as usize); } @@ -117,37 +106,32 @@ impl<'a, T: BinarySerializable> SkipList<'a, T> { self.data_layer.seek(doc_id) } - pub fn read(data: &'a [u8]) -> SkipList<'a, T> { - let mut cursor = Cursor::new(data); - let offsets: Vec = Vec::deserialize(&mut cursor).unwrap(); + +} + + +impl<'a, T: BinarySerializable> From<&'a [u8]> for SkipList<'a, T> { + + fn from(mut data: &'a [u8]) -> SkipList<'a, T> { + let offsets: Vec = Vec::deserialize(&mut data).unwrap(); let num_layers = offsets.len(); - let start_position = cursor.position() as usize; - let layers_data: &[u8] = &data[start_position..data.len()]; + let layers_data: &[u8] = data; let data_layer: Layer<'a, T> = if num_layers == 0 { Layer::empty() } else { let first_layer_data: &[u8] = &layers_data[..offsets[0] as usize]; - let first_layer_cursor = Cursor::new(first_layer_data); - Layer::read(first_layer_cursor) + Layer::from(first_layer_data) }; - let mut skip_layers = - if num_layers > 0 { - offsets.iter() - .zip(&offsets[1..]) - .map(|(start, stop)| { - let layer_data: &[u8] = &layers_data[*start as usize..*stop as usize]; - let cursor = Cursor::new(layer_data); - Layer::read(cursor) - }) - .collect() - } - else { - Vec::new() - }; - skip_layers.reverse(); + let skip_layers = (0..max(1, num_layers) - 1) + .map(|i| (offsets[i] as usize, offsets[i + 1] as usize)) + .map(|(start, stop)| { + Layer::from(&layers_data[start..stop]) + }) + .collect(); SkipList { skip_layers: skip_layers, data_layer: data_layer, } } + } diff --git a/src/datastruct/skip/skiplist_builder.rs b/src/datastruct/skip/skiplist_builder.rs index e12444f11..b83406029 100644 --- a/src/datastruct/skip/skiplist_builder.rs +++ b/src/datastruct/skip/skiplist_builder.rs @@ -37,17 +37,13 @@ impl LayerBuilder { self.remaining -= 1; self.len += 1; let offset = self.written_size() as u32; // TODO not sure if we want after or here - let res = - if self.remaining == 0 { - self.remaining = self.period; - Some((doc_id, offset)) - } - else { - None - }; try!(doc_id.serialize(&mut self.buffer)); try!(value.serialize(&mut self.buffer)); - Ok(res) + Ok(if self.remaining == 0 { + self.remaining = self.period; + Some((doc_id, offset)) + } + else { None }) } } @@ -97,13 +93,13 @@ impl SkipListBuilder { let mut layer_sizes: Vec = Vec::new(); size += self.data_layer.buffer.len() as u32; layer_sizes.push(size); - for layer in &self.skip_layers { + for layer in self.skip_layers.iter().rev() { size += layer.buffer.len() as u32; layer_sizes.push(size); } try!(layer_sizes.serialize(output)); try!(self.data_layer.write(output)); - for layer in &self.skip_layers { + for layer in self.skip_layers.iter().rev() { try!(layer.write(output)); } Ok(()) diff --git a/src/indexer/segment_serializer.rs b/src/indexer/segment_serializer.rs index 6e27f6a94..bfbca0faf 100644 --- a/src/indexer/segment_serializer.rs +++ b/src/indexer/segment_serializer.rs @@ -21,9 +21,8 @@ pub struct SegmentSerializer { } impl SegmentSerializer { - /// Creates a new `SegmentSerializer`. - pub fn for_segment(segment: &mut Segment) -> Result { + pub fn for_segment(segment: &mut Segment) -> Result { let store_write = try!(segment.open_write(SegmentComponent::STORE)); let fast_field_write = try!(segment.open_write(SegmentComponent::FASTFIELDS)); @@ -41,39 +40,39 @@ impl SegmentSerializer { fieldnorms_serializer: fieldnorms_serializer, }) } - + /// Accessor to the `PostingsSerializer`. - pub fn get_postings_serializer(&mut self,) -> &mut PostingsSerializer { + pub fn get_postings_serializer(&mut self) -> &mut PostingsSerializer { &mut self.postings_serializer } /// Accessor to the `FastFieldSerializer`. - pub fn get_fast_field_serializer(&mut self,) -> &mut FastFieldSerializer { + pub fn get_fast_field_serializer(&mut self) -> &mut FastFieldSerializer { &mut self.fast_field_serializer } - + /// Accessor to the field norm serializer. - pub fn get_fieldnorms_serializer(&mut self,) -> &mut FastFieldSerializer { + pub fn get_fieldnorms_serializer(&mut self) -> &mut FastFieldSerializer { &mut self.fieldnorms_serializer } - + /// Accessor to the `StoreWriter`. - pub fn get_store_writer(&mut self,) -> &mut StoreWriter { + pub fn get_store_writer(&mut self) -> &mut StoreWriter { &mut self.store_writer } - + /// Write the `SegmentInfo` pub fn write_segment_info(&mut self, segment_info: &SegmentInfo) -> Result<()> { let mut write = try!(self.segment.open_write(SegmentComponent::INFO)); let json_data = json::encode(segment_info) - .expect("Encoding to segment_info to JSON failed. This should never happen"); + .expect("Encoding to segment_info to JSON failed. This should never happen"); try!(write.write_all(json_data.as_bytes())); try!(write.flush()); Ok(()) } - + /// Finalize the segment serialization. - pub fn close(mut self,) -> Result<()> { + pub fn close(self) -> Result<()> { try!(self.fast_field_serializer.close()); try!(self.postings_serializer.close()); try!(self.store_writer.close()); diff --git a/src/store/mod.rs b/src/store/mod.rs index fd62741ce..2825dee54 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -1,12 +1,8 @@ mod reader; mod writer; - -use DocId; pub use self::reader::StoreReader; pub use self::writer::StoreWriter; -#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd)] -pub struct OffsetIndex(DocId, u64); #[cfg(test)] mod tests { @@ -55,7 +51,7 @@ mod tests { let schema = write_lorem_ipsum_store(store_file); let field_title = schema.get_field("title").unwrap(); let store_source = directory.open_read(path).unwrap(); - let store = StoreReader::new(store_source); + let store = StoreReader::from(store_source); for i in (0..10).map(|i| i * 3 / 2) { assert_eq!(*store.get(i).unwrap().get_first(field_title).unwrap().text(), format!("Doc {}", i)); } @@ -78,7 +74,7 @@ mod tests { let path = Path::new("store"); write_lorem_ipsum_store(directory.open_write(path).unwrap()); let store_source = directory.open_read(path).unwrap(); - let store = StoreReader::new(store_source); + let store = StoreReader::from(store_source); b.iter(|| { store.get(12).unwrap(); }); diff --git a/src/store/reader.rs b/src/store/reader.rs index ee774d7d2..c8969bbb7 100644 --- a/src/store/reader.rs +++ b/src/store/reader.rs @@ -6,53 +6,23 @@ use DocId; use schema::Document; use schema::FieldValue; use common::BinarySerializable; - -use std::io::Read; -use std::io; -use std::cmp::Ordering; +use std::mem::size_of; +use std::io::{self, Read}; +use datastruct::SkipList; use lz4; -use super::OffsetIndex; - pub struct StoreReader { pub data: ReadOnlySource, - pub offsets: Vec, + pub offset_index_source: ReadOnlySource, current_block: RefCell>, + pub max_doc: DocId, } impl StoreReader { - - fn read_header(data: &ReadOnlySource) -> Vec { - // TODO err - // the first offset is implicitely (0, 0) - let mut offsets = vec!(OffsetIndex(0, 0)); - let buffer: &[u8] = data.as_slice(); - - let offset = { - let mut cursor = &buffer[buffer.len() - 8..]; - u64::deserialize(&mut cursor).unwrap() as usize - }; - { - let mut cursor = &buffer[offset..]; - offsets.append(&mut Vec::deserialize(&mut cursor).unwrap()); - } - offsets - } - - fn block_offset(&self, seek: DocId) -> OffsetIndex { - fn search(offsets: &[OffsetIndex], seek: DocId) -> OffsetIndex { - let m = offsets.len() / 2; - let pivot_offset = &offsets[m]; - if offsets.len() <= 1 { - return pivot_offset.clone() - } - match pivot_offset.0.cmp(&seek) { - Ordering::Less => search(&offsets[m..], seek), - Ordering::Equal => pivot_offset.clone(), - Ordering::Greater => search(&offsets[..m], seek), - } - } - search(&self.offsets, seek) + fn block_offset(&self, doc_id: DocId) -> (DocId, u64) { + SkipList::from(self.offset_index_source.as_slice()) + .seek(doc_id) + .unwrap_or((0u32, 0u64)) } fn read_block(&self, block_offset: usize) -> io::Result<()> { @@ -61,17 +31,18 @@ impl StoreReader { let total_buffer = self.data.as_slice(); let mut cursor = &total_buffer[block_offset..]; let block_length = u32::deserialize(&mut cursor).unwrap(); - let block_array: &[u8] = &total_buffer[(block_offset + 4 as usize)..(block_offset + 4 + block_length as usize)]; + let block_array: &[u8] = + &total_buffer[(block_offset + 4 as usize)..(block_offset + 4 + block_length as usize)]; let mut lz4_decoder = try!(lz4::Decoder::new(block_array)); lz4_decoder.read_to_end(&mut current_block_mut).map(|_| ()) } pub fn get(&self, doc_id: DocId) -> Result { - let OffsetIndex(first_doc_id, block_offset) = self.block_offset(doc_id); + let (first_doc_id, block_offset) = self.block_offset(doc_id); try!(self.read_block(block_offset as usize)); let current_block_mut = self.current_block.borrow_mut(); let mut cursor = ¤t_block_mut[..]; - for _ in first_doc_id..doc_id { + for _ in first_doc_id..doc_id { let block_length = try!(u32::deserialize(&mut cursor)); cursor = &cursor[block_length as usize..]; } @@ -84,13 +55,29 @@ impl StoreReader { } Ok(Document::from(field_values)) } +} - pub fn new(data: ReadOnlySource) -> StoreReader { - let offsets = StoreReader::read_header(&data); + +fn split_source(data: ReadOnlySource) -> (ReadOnlySource, ReadOnlySource, DocId) { + let data_len = data.len(); + let footer_offset = data_len - size_of::() - size_of::(); + let serialized_offset: ReadOnlySource = data.slice(footer_offset, data_len); + let mut serialized_offset_buf = serialized_offset.as_slice(); + let offset = u64::deserialize(&mut serialized_offset_buf).unwrap(); + let offset = offset as usize; + let max_doc = u32::deserialize(&mut serialized_offset_buf).unwrap(); + (data.slice(0, offset), data.slice(offset, footer_offset), max_doc) +} + + +impl From for StoreReader { + fn from(data: ReadOnlySource) -> StoreReader { + let (data_source, offset_index_source, max_doc) = split_source(data); StoreReader { - data: data, - offsets: offsets, + data: data_source, + offset_index_source: offset_index_source, current_block: RefCell::new(Vec::new()), + max_doc: max_doc, } } } diff --git a/src/store/writer.rs b/src/store/writer.rs index 032e65c71..569d8f509 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -3,66 +3,45 @@ use DocId; use schema::FieldValue; use common::BinarySerializable; use std::io::Write; -use std::io::Read; use std::io; +use error::Result; use lz4; +use datastruct::SkipListBuilder; use super::StoreReader; -use super::OffsetIndex; const BLOCK_SIZE: usize = 16_384; pub struct StoreWriter { doc: DocId, - offsets: Vec, // TODO have a better index. written: u64, + offset_index_writer: SkipListBuilder, writer: WritePtr, intermediary_buffer: Vec, current_block: Vec, } -impl BinarySerializable for OffsetIndex { - fn serialize(&self, writer: &mut Write) -> io::Result { - let OffsetIndex(a, b) = *self; - Ok(try!(a.serialize(writer)) + try!(b.serialize(writer))) - } - fn deserialize(reader: &mut Read) -> io::Result { - let a = try!(DocId::deserialize(reader)); - let b = try!(u64::deserialize(reader)); - Ok(OffsetIndex(a, b)) - } -} impl StoreWriter { - pub fn new(writer: WritePtr) -> StoreWriter { StoreWriter { doc: 0, written: 0, - offsets: Vec::new(), + offset_index_writer: SkipListBuilder::new(3), writer: writer, intermediary_buffer: Vec::new(), current_block: Vec::new(), } } - pub fn stack_reader(&mut self, reader: &StoreReader) -> io::Result<()> { - if !self.current_block.is_empty() { - try!(self.write_and_compress_block()); - } - match reader.offsets.last() { - Some(&OffsetIndex(ref num_docs, ref body_size)) => { - try!(self.writer.write_all(&reader.data.as_slice()[0..*body_size as usize])); - for &OffsetIndex(doc, offset) in &reader.offsets { - self.offsets.push(OffsetIndex(self.doc + doc, self.written + offset)); - } - self.written += *body_size; - self.doc += *num_docs; - Ok(()) - }, - None => { - Err(io::Error::new(io::ErrorKind::Other, "No offset for reader")) - } + pub fn stack_reader(&mut self, reader: &StoreReader) -> Result<()> { + for doc_id in 0..reader.max_doc { + let doc = try!(reader.get(doc_id)); + let field_values: Vec<&FieldValue> = doc.field_values() + .iter() + .collect(); + try!(self.store(&field_values)); } + Ok(()) } pub fn store<'a>(&mut self, field_values: &[&'a FieldValue]) -> io::Result<()> { @@ -80,7 +59,7 @@ impl StoreWriter { Ok(()) } - fn write_and_compress_block(&mut self,) -> io::Result<()> { + fn write_and_compress_block(&mut self) -> io::Result<()> { self.intermediary_buffer.clear(); { let mut encoder = try!(lz4::EncoderBuilder::new().build(&mut self.intermediary_buffer)); @@ -92,19 +71,19 @@ impl StoreWriter { self.written += try!((compressed_block_size as u32).serialize(&mut self.writer)) as u64; try!(self.writer.write_all(&self.intermediary_buffer)); self.written += compressed_block_size; - self.offsets.push(OffsetIndex(self.doc, self.written)); + try!(self.offset_index_writer.insert(self.doc, &self.written)); self.current_block.clear(); Ok(()) } - pub fn close(&mut self,) -> io::Result<()> { + pub fn close(mut self) -> io::Result<()> { if !self.current_block.is_empty() { try!(self.write_and_compress_block()); } let header_offset: u64 = self.written; - try!(self.offsets.serialize(&mut self.writer)); + try!(self.offset_index_writer.write::>(&mut self.writer)); try!(header_offset.serialize(&mut self.writer)); + try!(self.doc.serialize(&mut self.writer)); self.writer.flush() } - }