Proper syncing

This commit is contained in:
Paul Masurel
2016-01-20 22:05:16 +09:00
parent e356757d88
commit 2dd182b7e0
5 changed files with 103 additions and 29 deletions

View File

@@ -39,7 +39,6 @@ impl SimpleCodec {
// TODO skip lists
pub fn write<'a, I: SerializableSegment<'a>>(index: &'a I, segment: &'a Segment) -> Result<usize> {
let term_write = try!(segment.open_writable(SegmentComponent::TERMS));
let mut postings_write = try!(segment.open_writable(SegmentComponent::POSTINGS));

View File

@@ -14,8 +14,8 @@ use std::ops::Deref;
use std::cell::RefCell;
use core::error::*;
use rand::{thread_rng, Rng};
// use memmap::{Mmap, Protection};
use fst::raw::MmapReadOnly;
// use sys::fs as fs_imp;
#[derive(Clone, Debug)]
pub struct SegmentId(pub String);
@@ -33,6 +33,7 @@ pub fn generate_segment_name() -> SegmentId {
pub struct Directory {
index_path: PathBuf,
mmap_cache: Arc<Mutex<HashMap<PathBuf, MmapReadOnly>>>,
segments: Vec<Segment>,
}
impl fmt::Debug for Directory {
@@ -52,13 +53,58 @@ fn open_mmap(full_path: &PathBuf) -> Result<MmapReadOnly> {
}
}
fn sync_file(filepath: &PathBuf) -> Result<()> {
match File::open(filepath.clone()) {
Ok(fd) => {
match fd.sync_all() {
Err(err) => Err(Error::IOError(err.kind(), format!("Failed to sync {:?}", filepath))),
_ => Ok(())
}
},
Err(err) => Err(Error::IOError(err.kind(), format!("Cause: {:?}", err)))
}
}
impl Directory {
pub fn from(filepath: &str) -> Directory {
Directory {
// TODO find a rusty way to hide that, while keeping
// it visible for IndexWriters.
pub fn publish_segment(&mut self, segment: Segment) {
self.segments.push(segment.clone());
self.save_metas();
}
pub fn from(filepath: &str) -> Result<Directory> {
// TODO error management
let mut directory = Directory {
index_path: PathBuf::from(filepath),
mmap_cache: Arc::new(Mutex::new(HashMap::new())),
segments: Vec::new()
};
try!(directory.load_metas()); //< does the directory already exists?
Ok(directory)
}
pub fn load_metas(&mut self,) -> Result<()> {
// TODO load segment info
Ok(())
}
pub fn save_metas(&self,) -> Result<()> {
// TODO
Ok(())
}
pub fn sync(&self, segment: Segment) -> Result<()> {
for component in [SegmentComponent::POSTINGS, SegmentComponent::TERMS].iter() {
let relative_path = segment.relative_path(component);
let full_path = self.resolve_path(&relative_path);
try!(sync_file(&full_path));
}
// syncing the directory itself
try!(sync_file(&self.index_path));
Ok(())
}
fn resolve_path(&self, relative_path: &PathBuf) -> PathBuf {
@@ -113,38 +159,38 @@ impl Directory {
pub enum SegmentComponent {
POSTINGS,
POSITIONS,
// POSITIONS,
TERMS,
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Segment {
directory: Directory,
segment_id: SegmentId,
}
impl Segment {
fn path_suffix(component: SegmentComponent)-> &'static str {
match component {
fn path_suffix(component: &SegmentComponent)-> &'static str {
match *component {
SegmentComponent::POSTINGS => ".idx",
SegmentComponent::POSITIONS => ".pos",
// SegmentComponent::POSITIONS => ".pos",
SegmentComponent::TERMS => ".term",
}
}
pub fn relative_path(&self, component: SegmentComponent) -> PathBuf {
pub fn relative_path(&self, component: &SegmentComponent) -> PathBuf {
let SegmentId(ref segment_id_str) = self.segment_id;
let filename = String::new() + segment_id_str + Segment::path_suffix(component);
PathBuf::from(filename)
}
pub fn mmap(&self, component: SegmentComponent) -> Result<MmapReadOnly> {
let path = self.relative_path(component);
let path = self.relative_path(&component);
self.directory.mmap(&path)
}
pub fn open_writable(&self, component: SegmentComponent) -> Result<File> {
let path = self.relative_path(component);
let path = self.relative_path(&component);
self.directory.open_writable(&path)
}
}

View File

@@ -98,7 +98,7 @@ impl SegmentIndexReader {
let term_offsets = match fst::Map::from_mmap(term_shared_mmap) {
Ok(term_offsets) => term_offsets,
Err(_) => {
let filepath = segment.relative_path(SegmentComponent::TERMS);
let filepath = segment.relative_path(&SegmentComponent::TERMS);
return Err(Error::FSTFormat(format!("The file {:?} does not seem to be a valid term to offset transducer.", filepath)));
}
};

View File

@@ -70,9 +70,22 @@ impl FieldWriter {
}
}
pub struct IndexWriter {
pub struct SegmentWriter {
max_doc: DocId,
term_writers: HashMap<Field, FieldWriter>,
}
impl SegmentWriter {
fn new() -> SegmentWriter {
SegmentWriter {
max_doc: 0,
term_writers: HashMap::new(),
}
}
}
pub struct IndexWriter {
segment_writer: SegmentWriter,
directory: Directory,
}
@@ -80,12 +93,29 @@ impl IndexWriter {
pub fn open(directory: &Directory) -> IndexWriter {
IndexWriter {
max_doc: 0,
term_writers: HashMap::new(),
directory: (*directory).clone(),
}
segment_writer: SegmentWriter::new(),
directory: directory.clone(),
}
}
pub fn add(&mut self, doc: Document) {
self.segment_writer.add(doc);
}
pub fn commit(&mut self,) -> Result<Segment> {
let segment = self.directory.new_segment();
try!(SimpleCodec::write(&self.segment_writer, &segment).map(|sz| (segment.clone(), sz)));
// At this point, the segment is written
// We still need to sync all of the file, as well as the parent directory.
try!(self.directory.sync(segment.clone()));
self.directory.publish_segment(segment.clone());
self.segment_writer = SegmentWriter::new();
Ok(segment)
}
}
impl SegmentWriter {
fn get_field_writer<'a>(&'a mut self, field: &Field) -> &'a mut FieldWriter {
if !self.term_writers.contains_key(field) {
self.term_writers.insert((*field).clone(), FieldWriter::new());
@@ -105,10 +135,6 @@ impl IndexWriter {
self.max_doc += 1;
}
pub fn commit(self,) -> Result<(Segment, usize)> {
let segment = self.directory.new_segment();
SimpleCodec::write(&self, &segment).map(|sz| (segment, sz))
}
}
@@ -232,16 +258,14 @@ impl<'a> TermCursor for CIWTermCursor<'a> {
}
}
//
// TODO use a Term type
//
impl<'a> SerializableSegment<'a> for IndexWriter {
impl<'a> SerializableSegment<'a> for SegmentWriter {
type TermCur = CIWTermCursor<'a>;
fn term_cursor(&'a self) -> CIWTermCursor<'a> {
let mut field_it: hash_map::Iter<'a, Field, FieldWriter> = self.term_writers.iter();
let (field, field_writer) = field_it.next().unwrap(); // TODO handle no field
let (field, field_writer) = field_it.next().unwrap();
CIWTermCursor {
field_it: field_it,
form_it: CIWFormCursor {

View File

@@ -19,6 +19,7 @@ use tantivy::core::reader::SegmentIndexReader;
use std::io::{ BufWriter, Write};
use regex::Regex;
use std::convert::From;
use std::path::PathBuf;
#[test]
fn test_intersection() {
@@ -37,7 +38,7 @@ fn test_tokenizer() {
#[test]
fn test_indexing() {
let directory = Directory::from("/Users/pmasurel/temp/idx");
let directory = Directory::from("/Users/pmasurel/temp/idx").unwrap();
{
// writing the segment
let mut index_writer = IndexWriter::open(&directory);
@@ -57,10 +58,12 @@ fn test_indexing() {
index_writer.add(doc);
}
let (segment, num_bytes) = index_writer.commit().unwrap();
let commit_result = index_writer.commit();
assert!(commit_result.is_ok());
// reading the segment
println!("------");
{
let segment = commit_result.unwrap();
let index_reader = SegmentIndexReader::open(segment).unwrap();
let mut term_cursor = index_reader.term_cursor();
loop {
@@ -86,6 +89,8 @@ fn test_indexing() {
}
#[test]
fn test_new_segment() {
let SegmentId(segment_name) = generate_segment_name();