diff --git a/src/common/composite_file.rs b/src/common/composite_file.rs
new file mode 100644
index 000000000..bea35f9fa
--- /dev/null
+++ b/src/common/composite_file.rs
@@ -0,0 +1,159 @@
+use std::io::Write;
+use common::CountingWriter;
+use std::collections::HashMap;
+use schema::Field;
+use common::VInt;
+use std::io;
+use directory::ReadOnlySource;
+use common::BinarySerializable;
+
+pub struct CompositeWrite<W: Write> {
+    write: CountingWriter<W>,
+    offsets: HashMap<Field, usize>,
+}
+
+impl<W: Write> CompositeWrite<W> {
+    pub fn wrap(w: W) -> CompositeWrite<W> {
+        CompositeWrite {
+            write: CountingWriter::wrap(w),
+            offsets: HashMap::new(),
+        }
+    }
+
+    pub fn for_field(&mut self, field: Field) -> &mut CountingWriter<W> {
+        let offset = self.write.written_bytes();
+        assert!(!self.offsets.contains_key(&field));
+        self.offsets.insert(field, offset);
+        &mut self.write
+    }
+
+    pub fn close(&mut self) -> io::Result<()> {
+        let footer_offset = self.write.written_bytes();
+        VInt(self.offsets.len() as u64).serialize(&mut self.write)?;
+
+        let mut offset_fields: Vec<_> = self.offsets.iter()
+            .map(|(field, offset)| (offset, field))
+            .collect();
+
+        offset_fields.sort();
+
+        let mut prev_offset = 0;
+        for (offset, field) in offset_fields {
+            VInt((offset - prev_offset) as u64).serialize(&mut self.write)?;
+            field.serialize(&mut self.write)?;
+            prev_offset = *offset;
+        }
+
+        let footer_len = (self.write.written_bytes() - footer_offset) as u32;
+        footer_len.serialize(&mut self.write)?;
+        self.write.flush()?;
+        Ok(())
+    }
+}
+
+
+pub struct CompositeFile {
+    data: ReadOnlySource,
+    offsets_index: HashMap<Field, (usize, usize)>,
+}
+
+impl CompositeFile {
+    pub fn open(data: ReadOnlySource) -> io::Result<CompositeFile> {
+        let end = data.len();
+        let footer_len_data = data.slice(end - 4, end);
+        let footer_len = u32::deserialize(&mut footer_len_data.as_slice())? as usize;
+
+        let footer_start = end - 4 - footer_len;
+        let footer_data = data.slice(footer_start, footer_start + footer_len);
+        let mut footer_buffer = footer_data.as_slice();
+        let num_fields = VInt::deserialize(&mut footer_buffer)?.0 as usize;
+
+        let mut fields = vec!();
+        let mut offsets = vec!();
+
+        let mut field_index = HashMap::new();
+
+        let mut offset = 0;
+        for _ in 0..num_fields {
+            offset += VInt::deserialize(&mut footer_buffer)?.0 as usize;
+            let field = Field::deserialize(&mut footer_buffer)?;
+            offsets.push(offset);
+            fields.push(field);
+        }
+        offsets.push(footer_start);
+        for i in 0..num_fields {
+            let field = fields[i];
+            let start_offset = offsets[i];
+            let end_offset = offsets[i+1];
+            field_index.insert(field, (start_offset, end_offset));
+        }
+
+        Ok(CompositeFile {
+            data: data.slice(0, footer_start),
+            offsets_index: field_index,
+        })
+    }
+
+    pub fn open_read(&self, field: Field) -> Option<ReadOnlySource> {
+        self.offsets_index
+            .get(&field)
+            .map(|&(from, to)| {
+                self.data.slice(from, to)
+            })
+    }
+}
+
+
+#[cfg(test)]
+mod test {
+
+    use std::io::Write;
+    use super::{CompositeWrite, CompositeFile};
+    use directory::{RAMDirectory, Directory};
+    use schema::Field;
+    use common::VInt;
+    use common::BinarySerializable;
+    use std::path::Path;
+
+    #[test]
+    fn test_composite_file() {
+        let path = Path::new("test_path");
+        let mut directory = RAMDirectory::create();
+        {
+            let w = directory.open_write(path).unwrap();
+            let mut composite_write = CompositeWrite::wrap(w);
+            {
+                let mut write_0 = composite_write.for_field(Field(0u32));
+                VInt(32431123u64).serialize(&mut write_0).unwrap();
+                write_0.flush().unwrap();
+            }
+
+            {
+                let mut write_4 = composite_write.for_field(Field(4u32));
+                VInt(2).serialize(&mut write_4).unwrap();
+                write_4.flush().unwrap();
+            }
+            composite_write.close().unwrap();
+        }
+        {
+            let r = directory.open_read(path).unwrap();
+            let composite_file = CompositeFile::open(r).unwrap();
+            {
+                let file0 = composite_file.open_read(Field(0u32)).unwrap();
+                let mut file0_buf = file0.as_slice();
+                let payload_0 = VInt::deserialize(&mut file0_buf).unwrap().0;
+                assert_eq!(file0_buf.len(), 0);
+                assert_eq!(payload_0, 32431123u64);
+            }
+            {
+                let file4 = composite_file.open_read(Field(4u32)).unwrap();
+                let mut file4_buf = file4.as_slice();
+                let payload_4 = VInt::deserialize(&mut file4_buf).unwrap().0;
+                assert_eq!(file4_buf.len(), 0);
+                assert_eq!(payload_4, 2u64);
+            }
+        }
+
+    }
+
+}
\ No newline at end of file
diff --git a/src/common/mod.rs b/src/common/mod.rs
index 0af9d2417..e8c8763f1 100644
--- a/src/common/mod.rs
+++ b/src/common/mod.rs
@@ -2,8 +2,11 @@ mod serialize;
 mod timer;
 mod vint;
 mod counting_writer;
+mod composite_file;
 pub mod bitpacker;
+
+pub(crate) use self::composite_file::{CompositeWrite, CompositeFile};
 
 pub use self::serialize::BinarySerializable;
 pub use self::timer::Timing;
 pub use self::timer::TimerTree;
diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs
index 8b47d3a0e..31b241388 100644
--- a/src/fastfield/mod.rs
+++ b/src/fastfield/mod.rs
@@ -94,7 +94,7 @@ mod tests {
         }
         let source = directory.open_read(&path).unwrap();
         {
-            assert_eq!(source.len(), 38 as usize);
+            assert_eq!(source.len(), 35 as usize);
         }
         {
            let fast_field_readers = FastFieldsReader::from_source(source).unwrap();
@@ -128,7 +128,7 @@ mod tests {
         }
         let source = directory.open_read(&path).unwrap();
         {
-            assert_eq!(source.len(), 63 as usize);
+            assert_eq!(source.len(), 60 as usize);
         }
         {
            let fast_field_readers = FastFieldsReader::from_source(source).unwrap();
@@ -164,7 +164,7 @@ mod tests {
         }
         let source = directory.open_read(&path).unwrap();
         {
-            assert_eq!(source.len(), 36 as usize);
+            assert_eq!(source.len(), 33 as usize);
         }
         {
            let fast_field_readers = FastFieldsReader::from_source(source).unwrap();
@@ -197,7 +197,7 @@ mod tests {
         }
         let source = directory.open_read(&path).unwrap();
         {
-            assert_eq!(source.len(), 80044 as usize);
+            assert_eq!(source.len(), 80041 as usize);
         }
         {
            let fast_field_readers = FastFieldsReader::from_source(source).unwrap();
@@ -233,7 +233,7 @@ mod tests {
         }
         let source = directory.open_read(&path).unwrap();
         {
-            assert_eq!(source.len(), 17711 as usize);
+            assert_eq!(source.len(), 17708 as usize);
         }
         {
            let fast_field_readers = FastFieldsReader::from_source(source).unwrap();
diff --git a/src/fastfield/reader.rs b/src/fastfield/reader.rs
index aae1dd797..2ec8f66fc 100644
--- a/src/fastfield/reader.rs
+++ b/src/fastfield/reader.rs
@@ -1,6 +1,6 @@
 use std::io;
-use std::collections::HashMap;
 use directory::ReadOnlySource;
+use common::CompositeFile;
 use common::BinarySerializable;
 use DocId;
 use schema::{Field, SchemaBuilder};
@@ -240,8 +240,7 @@ impl FastFieldReader for I64FastFieldReader {
 /// It contains a mapping that associated these fields to
 /// the proper slice in the fastfield reader file.
 pub struct FastFieldsReader {
-    source: ReadOnlySource,
-    field_offsets: HashMap<Field, (u32, u32)>,
+    composite_file: CompositeFile,
 }
 
 impl FastFieldsReader {
@@ -251,31 +250,9 @@ impl FastFieldsReader {
     /// the list of the offset is read (as a footer of the
     /// data file).
     pub fn from_source(source: ReadOnlySource) -> io::Result<FastFieldsReader> {
-        let header_offset;
-        let field_offsets: Vec<(Field, u32)>;
-        {
-            let buffer = source.as_slice();
-            {
-                let mut cursor = buffer;
-                header_offset = u32::deserialize(&mut cursor)?;
-            }
-            {
-                let mut cursor = &buffer[header_offset as usize..];
-                field_offsets = Vec::deserialize(&mut cursor)?;
-            }
-        }
-        let mut end_offsets: Vec<u32> = field_offsets.iter().map(|&(_, offset)| offset).collect();
-        end_offsets.push(header_offset);
-        let mut field_offsets_map: HashMap<Field, (u32, u32)> = HashMap::new();
-        for (field_start_offsets, stop_offset) in
-            field_offsets.iter().zip(end_offsets.iter().skip(1)) {
-            let (field, start_offset) = *field_start_offsets;
-            field_offsets_map.insert(field, (start_offset, *stop_offset));
-        }
         Ok(FastFieldsReader {
-            field_offsets: field_offsets_map,
-            source: source,
-        })
+            composite_file: CompositeFile::open(source)?,
+        })
     }
 
     /// Returns the u64 fast value reader if the field
     /// is registered as a u64 fast field.
     ///
     /// Return None if the field is not a u64 fast field.
     ///
     /// # Panics
     /// May panic if the index is corrupted.
     pub fn open_reader<FFReader: FastFieldReader>(&self, field: Field) -> Option<FFReader> {
-        self.field_offsets
-            .get(&field)
-            .map(|&(start, stop)| {
-                let field_source = self.source.slice(start as usize, stop as usize);
-                FFReader::open(field_source)
-            })
+        self.composite_file
+            .open_read(field)
+            .map(FFReader::open)
     }
 }
diff --git a/src/fastfield/serializer.rs b/src/fastfield/serializer.rs
index ef6ffedf9..590aee84a 100644
--- a/src/fastfield/serializer.rs
+++ b/src/fastfield/serializer.rs
@@ -3,7 +3,8 @@ use directory::WritePtr;
 use schema::Field;
 use common::bitpacker::{compute_num_bits, BitPacker};
 use common::CountingWriter;
-use std::io::{self, Write, Seek, SeekFrom};
+use common::CompositeWrite;
+use std::io::{self, Write};
 
 /// `FastFieldSerializer` is in charge of serializing
 /// fastfields on disk.
@@ -26,27 +27,17 @@ use std::io::{self, Write, Seek, SeekFrom};
 /// * `close_field()`
 /// * `close()`
 pub struct FastFieldSerializer {
-    write: CountingWriter<WritePtr>,
-    fields: Vec<(Field, u32)>,
-    min_value: u64,
-    field_open: bool,
-    bit_packer: BitPacker,
+    composite_write: CompositeWrite<WritePtr>,
 }
-
 impl FastFieldSerializer {
     /// Constructor
     pub fn new(write: WritePtr) -> io::Result<FastFieldSerializer> {
         // just making room for the pointer to header.
-        let mut counting_writer = CountingWriter::wrap(write);
-        0u32.serialize(&mut counting_writer)?;
+        let composite_write = CompositeWrite::wrap(write);
         Ok(FastFieldSerializer {
-            write: counting_writer,
-            fields: Vec::new(),
-            min_value: 0,
-            field_open: false,
-            bit_packer: BitPacker::new(0),
-        })
+            composite_write: composite_write,
+        })
     }
 
     /// Start serializing a new u64 fast field
@@ -54,23 +45,48 @@ impl FastFieldSerializer {
                               field: Field,
                               min_value: u64,
                              max_value: u64)
-                              -> io::Result<()> {
-        if self.field_open {
-            return Err(io::Error::new(io::ErrorKind::Other, "Previous field not closed"));
-        }
-        self.min_value = min_value;
-        self.field_open = true;
-        self.fields.push((field, self.write.written_bytes() as u32));
-        let write = &mut self.write;
+                              -> io::Result<FastSingleFieldSerializer<CountingWriter<WritePtr>>> {
+        let field_write = self
+            .composite_write
+            .for_field(field);
+        FastSingleFieldSerializer::open(
+            field_write,
+            min_value,
+            max_value)
+    }
+
+
+    /// Closes the serializer
+    ///
+    /// After this call the data must be persistently save on disk.
+    pub fn close(mut self) -> io::Result<()> {
+        self.composite_write.close()
+    }
+}
+
+pub struct FastSingleFieldSerializer<'a, W: Write + 'a> {
+    bit_packer: BitPacker,
+    write: &'a mut W,
+    min_value: u64,
+}
+
+impl<'a, W: Write> FastSingleFieldSerializer<'a, W> {
+
+    fn open(write: &'a mut W,
+            min_value: u64,
+            max_value: u64) -> io::Result<FastSingleFieldSerializer<'a, W>> {
         min_value.serialize(write)?;
         let amplitude = max_value - min_value;
         amplitude.serialize(write)?;
         let num_bits = compute_num_bits(amplitude);
-        self.bit_packer = BitPacker::new(num_bits as usize);
-        Ok(())
+        let bit_packer = BitPacker::new(num_bits as usize);
+        Ok(FastSingleFieldSerializer {
+            write: write,
+            bit_packer: bit_packer,
+            min_value: min_value,
+        })
     }
 
-
     /// Pushes a new value to the currently open u64 fast field.
     pub fn add_val(&mut self, val: u64) -> io::Result<()> {
         let val_to_write: u64 = val - self.min_value;
@@ -78,33 +94,7 @@ impl FastFieldSerializer {
         Ok(())
     }
 
-    /// Close the u64 fast field.
-    pub fn close_field(&mut self) -> io::Result<()> {
-        if !self.field_open {
-            return Err(io::Error::new(io::ErrorKind::Other, "Current field is already closed"));
-        }
-        self.field_open = false;
-        // adding some padding to make sure we
-        // can read the last elements with our u64
-        // cursor
-        self.bit_packer.close(&mut self.write)?;
-        Ok(())
-    }
-
-
-    /// Closes the serializer
-    ///
-    /// After this call the data must be persistently save on disk.
-    pub fn close(self) -> io::Result<usize> {
-        if self.field_open {
-            return Err(io::Error::new(io::ErrorKind::Other, "Last field not closed"));
-        }
-        let header_offset: usize = self.write.written_bytes() as usize;
-        let (mut write, written_size) = self.write.finish()?;
-        self.fields.serialize(&mut write)?;
-        write.seek(SeekFrom::Start(0))?;
-        (header_offset as u32).serialize(&mut write)?;
-        write.flush()?;
-        Ok(written_size)
+    pub fn close_field(mut self) -> io::Result<()> {
+        self.bit_packer.close(&mut self.write)
     }
 }
diff --git a/src/fastfield/writer.rs b/src/fastfield/writer.rs
index 52b29972f..1427a7b36 100644
--- a/src/fastfield/writer.rs
+++ b/src/fastfield/writer.rs
@@ -208,13 +208,14 @@ impl IntFastFieldWriter {
             (self.val_min, self.val_max)
         };
-        serializer.new_u64_fast_field(self.field, min, max)?;
+
+        let mut single_field_serializer = serializer.new_u64_fast_field(self.field, min, max)?;
         let mut cursor = self.vals.as_slice();
         while let Ok(VInt(val)) = VInt::deserialize(&mut cursor) {
-            serializer.add_val(val)?;
+            single_field_serializer.add_val(val)?;
         }
-        serializer.close_field()
+        single_field_serializer.close_field()
     }
 }
diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs
index 468d867e7..f150f831a 100644
--- a/src/indexer/merger.rs
+++ b/src/indexer/merger.rs
@@ -175,18 +175,19 @@ impl IndexMerger {
             assert!(min_val <= max_val);
-            fast_field_serializer
+
+            let mut fast_single_field_serializer = fast_field_serializer
                 .new_u64_fast_field(field, min_val, max_val)?;
             for (max_doc, u64_reader, delete_bitset) in u64_readers {
                 for doc_id in 0..max_doc {
                     if !delete_bitset.is_deleted(doc_id) {
                         let val = u64_reader.get(doc_id);
-                        fast_field_serializer.add_val(val)?;
+                        fast_single_field_serializer.add_val(val)?;
                     }
                 }
             }
-            fast_field_serializer.close_field()?;
+            fast_single_field_serializer.close_field()?;
         }
         Ok(())
     }
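
Reviewer note (not part of the patch): a minimal sketch of how the new composite-file API fits together, mirroring the `test_composite_file` unit test in composite_file.rs above. It assumes the same imports as that test module; the path, field number, and payload are illustrative and `unwrap()` is used only for brevity.

    // Write a per-field block into a single file, then read it back by field.
    let path = Path::new("composite_example");
    let mut directory = RAMDirectory::create();
    {
        let write = directory.open_write(path).unwrap();
        let mut composite_write = CompositeWrite::wrap(write);
        {
            // Everything written through this handle is attributed to Field(0).
            let field_write = composite_write.for_field(Field(0u32));
            VInt(42u64).serialize(field_write).unwrap();
            field_write.flush().unwrap();
        }
        // close() appends the (field, offset) footer and its length.
        composite_write.close().unwrap();
    }
    let source = directory.open_read(path).unwrap();
    let composite_file = CompositeFile::open(source).unwrap();
    // open_read() returns exactly the slice that was written for Field(0).
    let field0_data = composite_file.open_read(Field(0u32)).unwrap();
    let mut field0_bytes = field0_data.as_slice();
    assert_eq!(VInt::deserialize(&mut field0_bytes).unwrap().0, 42u64);

This is the same flow FastFieldSerializer now delegates to: new_u64_fast_field() wraps for_field(), and FastFieldSerializer::close() wraps CompositeWrite::close().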