Using composite file in fast field.

This commit is contained in:
Paul Masurel
2017-08-12 18:45:59 +09:00
parent 8f377b92d0
commit f9203228be
7 changed files with 226 additions and 98 deletions

View File

@@ -0,0 +1,159 @@
use std::io::Write;
use common::CountingWriter;
use std::collections::HashMap;
use schema::Field;
use common::VInt;
use std::io;
use directory::ReadOnlySource;
use common::BinarySerializable;
/// Writer for a "composite" file: a single file that stores one
/// independent data section per field, followed by a footer mapping
/// each field to the offset where its section starts.
pub struct CompositeWrite<W: Write> {
// Underlying writer, wrapped to track how many bytes were written
// (offsets are derived from this count).
write: CountingWriter<W>,
// Start offset of each field's section within the file.
offsets: HashMap<Field, usize>,
}
impl<W: Write> CompositeWrite<W> {
    /// Wraps a raw writer into a `CompositeWrite`, starting with an
    /// empty field-offset table.
    pub fn wrap(w: W) -> CompositeWrite<W> {
        CompositeWrite {
            write: CountingWriter::wrap(w),
            offsets: HashMap::new(),
        }
    }

    /// Records the current position as the start of `field`'s section
    /// and hands back the underlying writer to fill that section.
    ///
    /// Panics if a section was already opened for this field.
    pub fn for_field(&mut self, field: Field) -> &mut CountingWriter<W> {
        assert!(!self.offsets.contains_key(&field));
        let section_start = self.write.written_bytes();
        self.offsets.insert(field, section_start);
        &mut self.write
    }

    /// Writes the footer and flushes the writer.
    ///
    /// Footer layout: the number of fields, then for each field (in
    /// increasing offset order) the delta-encoded section offset and
    /// the field itself, and finally the footer length as a `u32`.
    pub fn close(&mut self) -> io::Result<()> {
        let footer_start = self.write.written_bytes();
        VInt(self.offsets.len() as u64).serialize(&mut self.write)?;

        // Order the (offset, field) pairs by offset so that offsets
        // can be delta-encoded.
        let mut sorted_entries: Vec<(&usize, &Field)> = self.offsets
            .iter()
            .map(|(field, start)| (start, field))
            .collect();
        sorted_entries.sort();

        let mut previous_offset = 0;
        for (&start, field) in sorted_entries {
            VInt((start - previous_offset) as u64).serialize(&mut self.write)?;
            field.serialize(&mut self.write)?;
            previous_offset = start;
        }

        let footer_len = (self.write.written_bytes() - footer_start) as u32;
        footer_len.serialize(&mut self.write)?;
        self.write.flush()
    }
}
/// Reader counterpart of `CompositeWrite`: gives access to the
/// per-field sections of a composite file.
pub struct CompositeFile {
// Data of the file, with the footer stripped off.
data: ReadOnlySource,
// For each field, the (start, end) byte range of its section
// within `data`.
offsets_index: HashMap<Field, (usize, usize)>,
}
impl CompositeFile {
    /// Opens a composite file from a `ReadOnlySource` by reading its
    /// footer and building the field -> byte-range index.
    ///
    /// Returns an `InvalidData` error if the source is too short to
    /// hold the footer-length `u32`, or if the recorded footer length
    /// is inconsistent with the file size. (Previously these corrupt
    /// inputs caused a panic via `usize` underflow / out-of-bounds
    /// slicing instead of a recoverable error.)
    pub fn open(data: ReadOnlySource) -> io::Result<CompositeFile> {
        let end = data.len();
        if end < 4 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Composite file is too short to contain a footer length",
            ));
        }
        let footer_len_data = data.slice(end - 4, end);
        let footer_len = u32::deserialize(&mut footer_len_data.as_slice())? as usize;
        if footer_len > end - 4 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Composite file footer length is inconsistent with the file size",
            ));
        }
        let footer_start = end - 4 - footer_len;
        let footer_data = data.slice(footer_start, footer_start + footer_len);
        let mut footer_buffer = footer_data.as_slice();
        let num_fields = VInt::deserialize(&mut footer_buffer)?.0 as usize;

        let mut fields = Vec::with_capacity(num_fields);
        let mut offsets = Vec::with_capacity(num_fields + 1);
        let mut offset = 0;
        for _ in 0..num_fields {
            // Section offsets are delta-encoded in the footer.
            offset += VInt::deserialize(&mut footer_buffer)?.0 as usize;
            let field = Field::deserialize(&mut footer_buffer)?;
            offsets.push(offset);
            fields.push(field);
        }
        // Sentinel: the last section ends where the footer begins.
        offsets.push(footer_start);

        let mut field_index = HashMap::with_capacity(num_fields);
        for (i, field) in fields.into_iter().enumerate() {
            field_index.insert(field, (offsets[i], offsets[i + 1]));
        }

        Ok(CompositeFile {
            data: data.slice(0, footer_start),
            offsets_index: field_index,
        })
    }

    /// Returns the data section associated with `field`, or `None`
    /// if no section was written for it.
    pub fn open_read(&self, field: Field) -> Option<ReadOnlySource> {
        self.offsets_index
            .get(&field)
            .map(|&(from, to)| self.data.slice(from, to))
    }
}
#[cfg(test)]
mod test {
    use std::io::Write;
    use super::{CompositeWrite, CompositeFile};
    use directory::{RAMDirectory, Directory};
    use schema::Field;
    use common::VInt;
    use common::BinarySerializable;
    use std::path::Path;

    /// Round-trips two field sections through a composite file and
    /// checks that each section reads back exactly its payload.
    #[test]
    fn test_composite_file() {
        let path = Path::new("test_path");
        let mut directory = RAMDirectory::create();

        // Write one VInt payload per field, then close the composite.
        {
            let wrt = directory.open_write(path).unwrap();
            let mut composite_write = CompositeWrite::wrap(wrt);
            {
                let mut field0_wrt = composite_write.for_field(Field(0u32));
                VInt(32431123u64).serialize(&mut field0_wrt).unwrap();
                field0_wrt.flush().unwrap();
            }
            {
                let mut field4_wrt = composite_write.for_field(Field(4u32));
                VInt(2).serialize(&mut field4_wrt).unwrap();
                field4_wrt.flush().unwrap();
            }
            composite_write.close().unwrap();
        }

        // Reopen and verify both sections.
        {
            let source = directory.open_read(path).unwrap();
            let composite_file = CompositeFile::open(source).unwrap();
            let read_payload = |field: Field| -> u64 {
                let section = composite_file.open_read(field).unwrap();
                let mut cursor = section.as_slice();
                let payload = VInt::deserialize(&mut cursor).unwrap().0;
                // The section must contain nothing beyond the payload.
                assert_eq!(cursor.len(), 0);
                payload
            };
            assert_eq!(read_payload(Field(0u32)), 32431123u64);
            assert_eq!(read_payload(Field(4u32)), 2u64);
        }
    }
}

View File

@@ -2,8 +2,11 @@ mod serialize;
mod timer;
mod vint;
mod counting_writer;
mod composite_file;
pub mod bitpacker;
pub(crate) use self::composite_file::{CompositeWrite, CompositeFile};
pub use self::serialize::BinarySerializable;
pub use self::timer::Timing;
pub use self::timer::TimerTree;

View File

@@ -94,7 +94,7 @@ mod tests {
}
let source = directory.open_read(&path).unwrap();
{
assert_eq!(source.len(), 38 as usize);
assert_eq!(source.len(), 35 as usize);
}
{
let fast_field_readers = FastFieldsReader::from_source(source).unwrap();
@@ -128,7 +128,7 @@ mod tests {
}
let source = directory.open_read(&path).unwrap();
{
assert_eq!(source.len(), 63 as usize);
assert_eq!(source.len(), 60 as usize);
}
{
let fast_field_readers = FastFieldsReader::from_source(source).unwrap();
@@ -164,7 +164,7 @@ mod tests {
}
let source = directory.open_read(&path).unwrap();
{
assert_eq!(source.len(), 36 as usize);
assert_eq!(source.len(), 33 as usize);
}
{
let fast_field_readers = FastFieldsReader::from_source(source).unwrap();
@@ -197,7 +197,7 @@ mod tests {
}
let source = directory.open_read(&path).unwrap();
{
assert_eq!(source.len(), 80044 as usize);
assert_eq!(source.len(), 80041 as usize);
}
{
let fast_field_readers = FastFieldsReader::from_source(source).unwrap();
@@ -233,7 +233,7 @@ mod tests {
}
let source = directory.open_read(&path).unwrap();
{
assert_eq!(source.len(), 17711 as usize);
assert_eq!(source.len(), 17708 as usize);
}
{
let fast_field_readers = FastFieldsReader::from_source(source).unwrap();

View File

@@ -1,6 +1,6 @@
use std::io;
use std::collections::HashMap;
use directory::ReadOnlySource;
use common::CompositeFile;
use common::BinarySerializable;
use DocId;
use schema::{Field, SchemaBuilder};
@@ -240,8 +240,7 @@ impl FastFieldReader for I64FastFieldReader {
/// It contains a mapping that associated these fields to
/// the proper slice in the fastfield reader file.
pub struct FastFieldsReader {
source: ReadOnlySource,
field_offsets: HashMap<Field, (u32, u32)>,
composite_file: CompositeFile,
}
impl FastFieldsReader {
@@ -251,31 +250,9 @@ impl FastFieldsReader {
/// the list of the offset is read (as a footer of the
/// data file).
pub fn from_source(source: ReadOnlySource) -> io::Result<FastFieldsReader> {
let header_offset;
let field_offsets: Vec<(Field, u32)>;
{
let buffer = source.as_slice();
{
let mut cursor = buffer;
header_offset = u32::deserialize(&mut cursor)?;
}
{
let mut cursor = &buffer[header_offset as usize..];
field_offsets = Vec::deserialize(&mut cursor)?;
}
}
let mut end_offsets: Vec<u32> = field_offsets.iter().map(|&(_, offset)| offset).collect();
end_offsets.push(header_offset);
let mut field_offsets_map: HashMap<Field, (u32, u32)> = HashMap::new();
for (field_start_offsets, stop_offset) in
field_offsets.iter().zip(end_offsets.iter().skip(1)) {
let (field, start_offset) = *field_start_offsets;
field_offsets_map.insert(field, (start_offset, *stop_offset));
}
Ok(FastFieldsReader {
field_offsets: field_offsets_map,
source: source,
})
composite_file: CompositeFile::open(source)?,
})
}
/// Returns the u64 fast value reader if the field
@@ -287,11 +264,8 @@ impl FastFieldsReader {
/// # Panics
/// May panic if the index is corrupted.
pub fn open_reader<FFReader: FastFieldReader>(&self, field: Field) -> Option<FFReader> {
self.field_offsets
.get(&field)
.map(|&(start, stop)| {
let field_source = self.source.slice(start as usize, stop as usize);
FFReader::open(field_source)
})
self.composite_file
.open_read(field)
.map(FFReader::open)
}
}

View File

@@ -3,7 +3,8 @@ use directory::WritePtr;
use schema::Field;
use common::bitpacker::{compute_num_bits, BitPacker};
use common::CountingWriter;
use std::io::{self, Write, Seek, SeekFrom};
use common::CompositeWrite;
use std::io::{self, Write};
/// `FastFieldSerializer` is in charge of serializing
/// fastfields on disk.
@@ -26,27 +27,17 @@ use std::io::{self, Write, Seek, SeekFrom};
/// * `close_field()`
/// * `close()`
pub struct FastFieldSerializer {
write: CountingWriter<WritePtr>,
fields: Vec<(Field, u32)>,
min_value: u64,
field_open: bool,
bit_packer: BitPacker,
composite_write: CompositeWrite<WritePtr>,
}
impl FastFieldSerializer {
/// Constructor
pub fn new(write: WritePtr) -> io::Result<FastFieldSerializer> {
// just making room for the pointer to header.
let mut counting_writer = CountingWriter::wrap(write);
0u32.serialize(&mut counting_writer)?;
let composite_write = CompositeWrite::wrap(write);
Ok(FastFieldSerializer {
write: counting_writer,
fields: Vec::new(),
min_value: 0,
field_open: false,
bit_packer: BitPacker::new(0),
})
composite_write: composite_write,
})
}
/// Start serializing a new u64 fast field
@@ -54,23 +45,48 @@ impl FastFieldSerializer {
field: Field,
min_value: u64,
max_value: u64)
-> io::Result<()> {
if self.field_open {
return Err(io::Error::new(io::ErrorKind::Other, "Previous field not closed"));
}
self.min_value = min_value;
self.field_open = true;
self.fields.push((field, self.write.written_bytes() as u32));
let write = &mut self.write;
-> io::Result<FastSingleFieldSerializer<CountingWriter<WritePtr>>> {
let field_write = self
.composite_write
.for_field(field);
FastSingleFieldSerializer::open(
field_write,
min_value,
max_value)
}
/// Closes the serializer
///
/// After this call the data must be persistently save on disk.
pub fn close(mut self) -> io::Result<()> {
self.composite_write.close()
}
}
pub struct FastSingleFieldSerializer<'a, W: Write + 'a> {
bit_packer: BitPacker,
write: &'a mut W,
min_value: u64,
}
impl<'a, W: Write> FastSingleFieldSerializer<'a, W> {
fn open(write: &'a mut W,
min_value: u64,
max_value: u64) -> io::Result<FastSingleFieldSerializer<'a, W>> {
min_value.serialize(write)?;
let amplitude = max_value - min_value;
amplitude.serialize(write)?;
let num_bits = compute_num_bits(amplitude);
self.bit_packer = BitPacker::new(num_bits as usize);
Ok(())
let bit_packer = BitPacker::new(num_bits as usize);
Ok(FastSingleFieldSerializer {
write: write,
bit_packer: bit_packer,
min_value: min_value,
})
}
/// Pushes a new value to the currently open u64 fast field.
pub fn add_val(&mut self, val: u64) -> io::Result<()> {
let val_to_write: u64 = val - self.min_value;
@@ -78,33 +94,7 @@ impl FastFieldSerializer {
Ok(())
}
/// Close the u64 fast field.
pub fn close_field(&mut self) -> io::Result<()> {
if !self.field_open {
return Err(io::Error::new(io::ErrorKind::Other, "Current field is already closed"));
}
self.field_open = false;
// adding some padding to make sure we
// can read the last elements with our u64
// cursor
self.bit_packer.close(&mut self.write)?;
Ok(())
}
/// Closes the serializer
///
/// After this call the data must be persistently save on disk.
pub fn close(self) -> io::Result<usize> {
if self.field_open {
return Err(io::Error::new(io::ErrorKind::Other, "Last field not closed"));
}
let header_offset: usize = self.write.written_bytes() as usize;
let (mut write, written_size) = self.write.finish()?;
self.fields.serialize(&mut write)?;
write.seek(SeekFrom::Start(0))?;
(header_offset as u32).serialize(&mut write)?;
write.flush()?;
Ok(written_size)
pub fn close_field(mut self) -> io::Result<()> {
self.bit_packer.close(&mut self.write)
}
}

View File

@@ -208,13 +208,14 @@ impl IntFastFieldWriter {
(self.val_min, self.val_max)
};
serializer.new_u64_fast_field(self.field, min, max)?;
let mut single_field_serializer = serializer.new_u64_fast_field(self.field, min, max)?;
let mut cursor = self.vals.as_slice();
while let Ok(VInt(val)) = VInt::deserialize(&mut cursor) {
serializer.add_val(val)?;
single_field_serializer.add_val(val)?;
}
serializer.close_field()
single_field_serializer.close_field()
}
}

View File

@@ -175,18 +175,19 @@ impl IndexMerger {
assert!(min_val <= max_val);
fast_field_serializer
let mut fast_single_field_serializer = fast_field_serializer
.new_u64_fast_field(field, min_val, max_val)?;
for (max_doc, u64_reader, delete_bitset) in u64_readers {
for doc_id in 0..max_doc {
if !delete_bitset.is_deleted(doc_id) {
let val = u64_reader.get(doc_id);
fast_field_serializer.add_val(val)?;
fast_single_field_serializer.add_val(val)?;
}
}
}
fast_field_serializer.close_field()?;
fast_single_field_serializer.close_field()?;
}
Ok(())
}