From 36528c5e83e551a19f77e4e141fd09787c00d163 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Wed, 16 Jun 2021 08:14:04 +0200 Subject: [PATCH] move counting writer to common move counting writer to common reuse counting writer in fastfield codec --- common/src/lib.rs | 2 + .../src/writer.rs | 51 +++++++++++++++++-- fastfield_codecs/src/multilinearinterpol.rs | 32 ++---------- src/common/mod.rs | 3 +- src/directory/mod.rs | 45 ++-------------- 5 files changed, 56 insertions(+), 77 deletions(-) rename src/common/counting_writer.rs => common/src/writer.rs (54%) diff --git a/common/src/lib.rs b/common/src/lib.rs index e0e7d5106..b3c24163b 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -2,6 +2,8 @@ pub use byteorder::LittleEndian as Endianness; mod serialize; mod vint; +mod writer; pub use serialize::{BinarySerializable, DeserializeFrom, FixedSize}; pub use vint::{read_u32_vint, read_u32_vint_no_advance, serialize_vint_u32, write_u32_vint, VInt}; +pub use writer::{AntiCallToken, CountingWriter, TerminatingWrite}; diff --git a/src/common/counting_writer.rs b/common/src/writer.rs similarity index 54% rename from src/common/counting_writer.rs rename to common/src/writer.rs index 0967b065b..79443d114 100644 --- a/src/common/counting_writer.rs +++ b/common/src/writer.rs @@ -1,7 +1,4 @@ -use crate::directory::AntiCallToken; -use crate::directory::TerminatingWrite; -use std::io; -use std::io::Write; +use std::io::{self, BufWriter, Write}; pub struct CountingWriter { underlying: W, @@ -16,41 +13,87 @@ impl CountingWriter { } } + #[inline] pub fn written_bytes(&self) -> u64 { self.written_bytes } /// Returns the underlying write object. /// Note that this method does not trigger any flushing. + #[inline] pub fn finish(self) -> W { self.underlying } } impl Write for CountingWriter { + #[inline] fn write(&mut self, buf: &[u8]) -> io::Result { let written_size = self.underlying.write(buf)?; self.written_bytes += written_size as u64; Ok(written_size) } + #[inline] fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { self.underlying.write_all(buf)?; self.written_bytes += buf.len() as u64; Ok(()) } + #[inline] fn flush(&mut self) -> io::Result<()> { self.underlying.flush() } } impl TerminatingWrite for CountingWriter { + #[inline] fn terminate_ref(&mut self, token: AntiCallToken) -> io::Result<()> { self.underlying.terminate_ref(token) } } +/// Struct used to prevent from calling [`terminate_ref`](trait.TerminatingWrite#method.terminate_ref) directly +/// +/// The point is that while the type is public, it cannot be built by anyone +/// outside of this module. +pub struct AntiCallToken(()); + +/// Trait used to indicate when no more write need to be done on a writer +pub trait TerminatingWrite: Write { + /// Indicate that the writer will no longer be used. Internally call terminate_ref. + fn terminate(mut self) -> io::Result<()> + where + Self: Sized, + { + self.terminate_ref(AntiCallToken(())) + } + + /// You should implement this function to define custom behavior. + /// This function should flush any buffer it may hold. + fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()>; +} + +impl TerminatingWrite for Box { + fn terminate_ref(&mut self, token: AntiCallToken) -> io::Result<()> { + self.as_mut().terminate_ref(token) + } +} + +impl TerminatingWrite for BufWriter { + fn terminate_ref(&mut self, a: AntiCallToken) -> io::Result<()> { + self.flush()?; + self.get_mut().terminate_ref(a) + } +} + +impl<'a> TerminatingWrite for &'a mut Vec { + fn terminate_ref(&mut self, _a: AntiCallToken) -> io::Result<()> { + self.flush() + } +} + #[cfg(test)] mod test { diff --git a/fastfield_codecs/src/multilinearinterpol.rs b/fastfield_codecs/src/multilinearinterpol.rs index d52beb479..811b5fbdf 100644 --- a/fastfield_codecs/src/multilinearinterpol.rs +++ b/fastfield_codecs/src/multilinearinterpol.rs @@ -2,6 +2,7 @@ use crate::FastFieldCodecReader; use crate::FastFieldCodecSerializer; use crate::FastFieldDataAccess; use crate::FastFieldStats; +use common::CountingWriter; use std::io::{self, Read, Write}; use std::ops::Sub; use tantivy_bitpacker::compute_num_bits; @@ -13,33 +14,6 @@ use tantivy_bitpacker::BitUnpacker; const CHUNK_SIZE: u64 = 512; -struct TrackWriteSize { - inner: W, - written: u64, -} -impl TrackWriteSize { - fn new(inner: W) -> Self { - TrackWriteSize { inner, written: 0 } - } -} -impl io::Write for TrackWriteSize { - fn write(&mut self, buf: &[u8]) -> io::Result { - let written = self.inner.write(buf)?; - self.written += written as u64; - Ok(written) - } - - fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result { - let written = self.inner.write_vectored(bufs)?; - self.written += written as u64; - Ok(written) - } - - fn flush(&mut self) -> io::Result<()> { - self.inner.flush() - } -} - /// Depending on the field type, a different /// fast field is required. #[derive(Clone)] @@ -278,9 +252,9 @@ impl FastFieldCodecSerializer for MultiLinearInterpolFastFieldSerializer { } let mut bit_packer = BitPacker::new(); - let write = &mut TrackWriteSize::new(write); + let write = &mut CountingWriter::wrap(write); for interpolation in &mut interpolations { - interpolation.data_start_offset = write.written; + interpolation.data_start_offset = write.written_bytes(); let num_bits = interpolation.num_bits; for (pos, actual_value) in data [interpolation.start_pos as usize..interpolation.end_pos as usize] diff --git a/src/common/mod.rs b/src/common/mod.rs index 29a4b057c..b82b352f8 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -1,12 +1,11 @@ mod bitset; mod composite_file; -mod counting_writer; pub use self::bitset::BitSet; pub(crate) use self::bitset::TinySet; pub(crate) use self::composite_file::{CompositeFile, CompositeWrite}; -pub use self::counting_writer::CountingWriter; pub use byteorder::LittleEndian as Endianness; +pub use common::CountingWriter; pub use common::{ read_u32_vint, read_u32_vint_no_advance, serialize_vint_u32, write_u32_vint, VInt, }; diff --git a/src/directory/mod.rs b/src/directory/mod.rs index 44c24efae..fcfe90342 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -28,7 +28,9 @@ pub use self::file_slice::{FileHandle, FileSlice}; pub use self::owned_bytes::OwnedBytes; pub use self::ram_directory::RamDirectory; pub use self::watch_event_router::{WatchCallback, WatchCallbackList, WatchHandle}; -use std::io::{self, BufWriter, Write}; +pub use common::AntiCallToken; +pub use common::TerminatingWrite; +use std::io::BufWriter; use std::path::PathBuf; /// Outcome of the Garbage collection @@ -50,47 +52,6 @@ pub use self::mmap_directory::MmapDirectory; pub use self::managed_directory::ManagedDirectory; -/// Struct used to prevent from calling [`terminate_ref`](trait.TerminatingWrite#method.terminate_ref) directly -/// -/// The point is that while the type is public, it cannot be built by anyone -/// outside of this module. -pub struct AntiCallToken(()); - -/// Trait used to indicate when no more write need to be done on a writer -pub trait TerminatingWrite: Write { - /// Indicate that the writer will no longer be used. Internally call terminate_ref. - fn terminate(mut self) -> io::Result<()> - where - Self: Sized, - { - self.terminate_ref(AntiCallToken(())) - } - - /// You should implement this function to define custom behavior. - /// This function should flush any buffer it may hold. - fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()>; -} - -impl TerminatingWrite for Box { - fn terminate_ref(&mut self, token: AntiCallToken) -> io::Result<()> { - self.as_mut().terminate_ref(token) - } -} - -impl TerminatingWrite for BufWriter { - fn terminate_ref(&mut self, a: AntiCallToken) -> io::Result<()> { - self.flush()?; - self.get_mut().terminate_ref(a) - } -} - -#[cfg(test)] -impl<'a> TerminatingWrite for &'a mut Vec { - fn terminate_ref(&mut self, _a: AntiCallToken) -> io::Result<()> { - self.flush() - } -} - /// Write object for Directory. /// /// `WritePtr` are required to implement both Write