From c054b7410a888d9912089b906b5f416f174d8ccc Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Sun, 17 Jan 2016 23:34:51 +0900 Subject: [PATCH] Simplified directory. --- src/core/codec.rs | 89 +++------ src/core/directory.rs | 432 +++++++++++++++++++++++++++++------------- src/core/error.rs | 8 +- src/core/reader.rs | 2 - src/core/writer.rs | 25 +-- src/lib.rs | 2 + tests/core.rs | 12 +- 7 files changed, 351 insertions(+), 219 deletions(-) diff --git a/src/core/codec.rs b/src/core/codec.rs index 005b07c06..2e36b56ba 100644 --- a/src/core/codec.rs +++ b/src/core/codec.rs @@ -4,27 +4,35 @@ use std::io::Write; use fst::MapBuilder; use core::error::*; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; - -pub trait SegmentOutput<'a, W: Write> { - fn terms(&self,) -> W; - fn postings(&self,) -> W; - // TODO positions, docvalues, ... -} +use core::directory::Segment; +use core::directory::SegmentComponent; pub trait Codec { - fn write<'a, I: SerializableSegment<'a>, W: Write>(index: &'a I, output: &'a SegmentOutput<'a, W>) -> Result; + fn write<'a, I: SerializableSegment<'a>>(index: &'a I, segment: &'a Segment) -> Result; } pub struct SimpleCodec; impl SimpleCodec { - fn write_postings(mut doc_it: D, postings: &mut W) -> Result { + fn write_postings(doc_it: D, postings: &mut W) -> Result { let mut written_bytes: usize = 4; - postings.write_u32::(doc_it.len() as u32); - // TODO handle error correctly + match postings.write_u32::(doc_it.len() as u32) { + Ok(_) => {}, + Err(_) => { + let msg = String::from("Failed writing posting list length"); + return Err(Error::WriteError(msg)); + }, + } for doc_id in doc_it { - postings.write_u32::(doc_id as u32); + println!("doc {}", doc_id); + match postings.write_u32::(doc_id as u32) { + Ok(_) => {}, + Err(_) => { + let msg = String::from("Failed while writing posting list"); + return Err(Error::WriteError(msg)); + }, + } written_bytes += 4; } Ok(written_bytes) @@ -32,17 +40,18 @@ impl SimpleCodec { } impl Codec for SimpleCodec { - fn write<'a, I: SerializableSegment<'a>, W: Write>(index: &'a I, output: &'a SegmentOutput<'a, W>) -> Result { - let term_trie_builder_result = MapBuilder::new(output.terms()); + fn write<'a, I: SerializableSegment<'a>>(index: &'a I, segment: &'a Segment) -> Result { + let mut term_write = try!(segment.open_writable(SegmentComponent::TERMS)); + let mut postings_write = try!(segment.open_writable(SegmentComponent::POSTINGS)); + let term_trie_builder_result = MapBuilder::new(term_write); if term_trie_builder_result.is_err() { // TODO include cause somehow - return Err(Error::IOError(String::from("Failed creating the term builder"))); + return Err(Error::WriteError(String::from("Failed creating the term builder"))); } let mut term_buffer: String = String::new(); let mut term_trie_builder = term_trie_builder_result.unwrap(); let mut term_cursor = index.term_cursor(); let mut offset: usize = 0; - let mut postings_output = output.postings(); loop { match term_cursor.next() { Some((term, doc_it)) => { @@ -50,10 +59,10 @@ impl Codec for SimpleCodec { match term_trie_builder.insert(&term_buffer, offset as u64) { Ok(_) => {} Err(_) => { - return Err(Error::IOError(String::from("Failed while inserting into the fst"))) + return Err(Error::WriteError(String::from("Failed while inserting into the fst"))) }, } - offset += try!(SimpleCodec::write_postings(doc_it, &mut postings_output)); + offset += try!(SimpleCodec::write_postings(doc_it, &mut postings_write)); }, None => { break; @@ -64,49 +73,3 @@ impl Codec for SimpleCodec { } } - - - - - -// impl DebugCodec { -// fn write_field(field_name) { -// -// } -// -// fn serialize_postings_for_term() -> Result { -// -// } -// } -// -// impl Codec for DebugCodec { -// fn write<'a, I: SerializableSegment<'a>>(index: &I, output: &SegmentOutput) -> Result { -// let mut field_cursor = index.field_cursor(); -// let mut posting_offset: usize = 0; -// loop { -// match field_cursor.next() { -// Some(field) => { -// let field_name = field_cursor.get_field(); -// try!(DebugCodec::write_term(field_name, posting_offset, output.terms)); -// let term_cursor = field_cursor.term_cursor(); -// let len = try!(DebugCodec::serialize_postings_for_term(term_cursor)); -// posting_offset += len; -// }, -// None => { break; }, -// } -// } -// Ok(1) -// } -// } - -// -// let mut field_cursor = closed_index_writer.field_cursor(); -// loop { -// match field_cursor.next() { -// Some(field) => { -// println!(" {:?}", field); -// show_term_cursor(field_cursor.term_cursor()); -// }, -// None => { break; }, -// } -// } diff --git a/src/core/directory.rs b/src/core/directory.rs index 7af731685..c874f0561 100644 --- a/src/core/directory.rs +++ b/src/core/directory.rs @@ -4,11 +4,16 @@ use self::memmap::{Mmap, Protection}; use std::path::PathBuf; use std::collections::HashMap; use std::fs::File; +use std::io::Write; +use std::io::BufWriter; use std::io; +use std::borrow::Borrow; +use std::borrow::BorrowMut; use std::rc::Rc; use std::ops::Deref; use std::cell::RefCell; use std::sync::Arc; +use core::error::*; use rand::{thread_rng, Rng}; @@ -23,183 +28,348 @@ pub fn generate_segment_name() -> SegmentId { SegmentId( String::from("_") + &random_name) } -pub trait Dir { - fn get_data(&self, segment_id: &SegmentId, component: SegmentComponent) -> Result; // { -} -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct Directory { - dir: Rc, + index_path: PathBuf, + // mmap_cache: RefCell>, } impl Directory { - fn segment(&self, segment_id: &SegmentId) -> Segment { + + pub fn from(filepath: &str) -> Directory { + Directory { + index_path: PathBuf::from(filepath) + } + } + + fn resolve_path(&self, relative_path: &PathBuf) -> PathBuf { + self.index_path.join(relative_path) + } + + fn segment<'a>(&'a self, segment_id: &SegmentId) -> Segment<'a> { Segment { - directory: self.dir.clone(), + directory: self, segment_id: segment_id.clone() } } - pub fn new_segment(&self,) -> Segment { + pub fn new_segment<'a>(&'a self,) -> Segment<'a> { + // TODO check it does not exists self.segment(&generate_segment_name()) } - fn from(directory: T) -> Directory { - Directory { - dir: Rc::new(directory), + fn open_writable<'a>(&self, relative_path: &PathBuf) -> Result { + let full_path = self.resolve_path(relative_path); + match File::create(full_path.clone()) { + Ok(f) => Ok(f), + Err(err) => { + let path_str = full_path.to_str().unwrap_or(""); + return Err(Error::IOError(err.kind(), String::from("Could not create file") + path_str)) + } } } - - pub fn open(path_str: &str) -> Directory { - let path = PathBuf::from(path_str); - Directory::from(FileDirectory::for_path(path)) - } - - pub fn in_mem() -> Directory { - Directory::from(MemDirectory::new()) - } } -impl Dir for Directory { - fn get_data(&self, segment_id: &SegmentId, component: SegmentComponent) -> Result { - self.dir.get_data(segment_id, component) - } -} +///////////////////////// +// Segment pub enum SegmentComponent { POSTINGS, POSITIONS, + TERMS, } -pub struct Segment { - directory: Rc, +#[derive(Debug)] +pub struct Segment<'a> { + directory: &'a Directory, segment_id: SegmentId, } -impl Segment { +impl<'a> Segment<'a> { fn path_suffix(component: SegmentComponent)-> &'static str { match component { - SegmentComponent::POSTINGS => ".pstgs", + SegmentComponent::POSTINGS => ".idx", SegmentComponent::POSITIONS => ".pos", + SegmentComponent::TERMS => ".term", } } - fn get_data(&self, component: SegmentComponent) -> Result { - self.directory.get_data(&self.segment_id, component) + fn get_relative_path(&self, component: SegmentComponent) -> PathBuf { + let SegmentId(ref segment_id_str) = self.segment_id; + let filename = String::new() + segment_id_str + Segment::path_suffix(component); + PathBuf::from(filename) + } + + // pub fn get_data(&self, component: SegmentComponent) -> Result>> { + // self.directory.get_data(&self.segment_id, component) + // } + + pub fn open_writable(&self, component: SegmentComponent) -> Result { + let path = self.get_relative_path(component); + self.directory.open_writable(&path) } } -///////////////////////////////////////////////////////// -// MemoryPointer -pub trait MemoryPointer { - fn data(&self) -> &[u8]; -} - -///////////////////////////////////////////////////////// -// ResidentMemoryPointer - -pub struct ResidentMemoryPointer { - data: Box<[u8]>, - len: usize, -} - -impl MemoryPointer for ResidentMemoryPointer { - fn data(&self) -> &[u8] { - self.data.deref() - } -} - - - -///////////////////////////////////////////////////////// -// MmapMemory +// #[derive(Clone)] +// pub struct Directory { +// dir: Rc, +// } +// +// impl Directory { +// fn segment(&self, segment_id: &SegmentId) -> Segment { +// Segment { +// directory: self.dir.clone(), +// segment_id: segment_id.clone() +// } +// } +// +// pub fn new_segment(&self,) -> Segment { +// self.segment(&generate_segment_name()) +// } +// +// fn from(directory: T) -> Directory { +// Directory { +// dir: Rc::new(directory), +// } +// } +// +// // pub fn open(path_str: &str) -> Directory { +// // let path = PathBuf::from(path_str); +// // Directory::from(FileDirectory::for_path(path)) +// // } +// +// pub fn in_mem() -> Directory { +// Directory::from(MemDirectory::new()) +// } +// } +// +// impl Dir for Directory { +// // fn get_data(&self, segment_id: &SegmentId, component: SegmentComponent) -> Result>> { +// // self.dir.get_data(segment_id, component) +// // } // -pub struct MmapMemory(Mmap); -impl MemoryPointer for MmapMemory { - fn data(&self) -> &[u8] { - let &MmapMemory(ref mmap) = self; - unsafe { - mmap.as_slice() - } - } -} +// fn open_writable<'a>(&'a self, path: &PathBuf) -> Result>> { +// self.dir.open_writable(path) +// } +// } -#[derive(Clone)] -pub struct SharedMmapMemory(Arc); +// pub enum SegmentComponent { +// POSTINGS, +// POSITIONS, +// TERMS, +// } +// +// pub struct Segment { +// directory: Rc, +// segment_id: SegmentId, +// } +// +// impl Segment { +// fn path_suffix(component: SegmentComponent)-> &'static str { +// match component { +// SegmentComponent::POSTINGS => ".idx", +// SegmentComponent::POSITIONS => ".pos", +// SegmentComponent::TERMS => ".term", +// } +// } +// +// fn get_path(&self, component: SegmentComponent) -> PathBuf { +// let mut filepath = self.index_path.clone(); +// let SegmentId(ref segment_id_str) = self.segment_id; +// let filename = String::new() + segment_id_str + "." + Segment::path_suffix(component); +// filepath.push(filename); +// filepath +// } +// +// // pub fn get_data(&self, component: SegmentComponent) -> Result>> { +// // self.directory.get_data(&self.segment_id, component) +// // } +// pub fn open_writable(&self, component: SegmentComponent) -> Result>> { +// let path = self.get_path(component); +// self.directory.open_writable(path) +// } +// } -impl SharedMmapMemory { - pub fn new(mmap_memory: MmapMemory) -> SharedMmapMemory { - SharedMmapMemory(Arc::new(mmap_memory)) - } -} -////////////////////////////////////////////////////////// -// FileDirectory -pub struct FileDirectory { - index_path: PathBuf, - mmap_cache: RefCell>, -} - -impl FileDirectory { - pub fn for_path(path: PathBuf)-> FileDirectory { - FileDirectory { - index_path: path, - mmap_cache: RefCell::new(HashMap::new()), - } - } - - fn get_or_open_mmap(&self, filepath: &PathBuf)->Result { - if !self.mmap_cache.borrow().contains_key(filepath) { - let file = try!(File::open(filepath)); - let mmap = MmapMemory(try!(Mmap::open(&file, Protection::Read))); - self.mmap_cache.borrow_mut().insert(filepath.clone(), SharedMmapMemory::new(mmap)); - } - let shared_map: SharedMmapMemory = self.mmap_cache.borrow().get(filepath).unwrap().clone(); - Ok(shared_map) - } -} - -impl Dir for FileDirectory { - fn get_data(&self, segment_id: &SegmentId, component: SegmentComponent) -> Result { - let mut filepath = self.index_path.clone(); - let SegmentId(ref segment_id_str) = *segment_id; - let filename = String::new() + segment_id_str + "." + Segment::path_suffix(component); - filepath.push(filename); - self.get_or_open_mmap(&filepath) - } - -} - -////////////////////////////////////////////////////////// -// FileDirectory +// +// ///////////////////////////////////////////////////////// +// // ResidentMemoryPointer +// +// pub struct ResidentMemoryPointer { +// data: Box<[u8]>, +// } +// +// impl Borrow<[u8]> for ResidentMemoryPointer { +// fn borrow(&self) -> &[u8] { +// self.data.deref() +// } +// } +// +// +// ////////////////////////////////////////////////////////// +// // MemDirectory +// // +// pub struct MemDirectory { +// dir: HashMap>, +// } +// +// impl MemDirectory { +// pub fn new()-> MemDirectory { +// MemDirectory { +// dir: HashMap::new(), +// } +// } +// +// // fn get_path(&self, segment_id: &SegmentId, component: SegmentComponent) -> PathBuf { +// // let mut filepath = self.index_path.clone(); +// // let SegmentId(ref segment_id_str) = *segment_id; +// // let filename = String::new() + segment_id_str + "." + Segment::path_suffix(component); +// // filepath.push(filename); +// // filepath +// // } +// } +// +// impl Directory for MemDirectory { +// +// type Write = Rc; +// +// // fn get_data(&self, segment_id: &SegmentId, component: SegmentComponent) -> Result>> { +// // let path = self.get_path(segment_id, component); +// // match self.dir.get(&path) { +// // Some(buf) => Ok(buf.clone()), +// // None => Err(Error::FileNotFound(String::from("File not found"))), // TODO add filename +// // } +// // } +// +// fn open_writable<'a>(&'a self, path: &PathBuf) -> Result> { +// if self.dir.contains_key(path) { +// return Err(Error::ReadOnly(String::from("Cannot open an already written buffer."))); +// } +// self.dir.insert(path.clone(), Rc::new(String::new())); +// self.dir.get(path); +// Ok(Box::new()) +// +// } +// } +// +// +// +// +// +// +// +// +// +// +// +// // -pub struct MemDirectory { - dir: HashMap, -} -impl MemDirectory { - pub fn new()-> MemDirectory { - MemDirectory { - dir: HashMap::new(), - } - } -} -impl Dir for MemDirectory { - fn get_data(&self, segment_id: &SegmentId, component: SegmentComponent) -> Result { - let SegmentId(ref segment_id_str) = *segment_id; - let mut path = PathBuf::from(segment_id_str); - path.push(Segment::path_suffix(component)); - match self.dir.get(&path) { - Some(buf) => Ok(buf.clone()), - None => Err(io::Error::new(io::ErrorKind::NotFound, "File does not exists")), - } - } -} + + + + + + + + + + + + + + + + + + + + + + + + + + + +// +// +// #[derive(Clone)] +// pub struct SharedMmapMemory(Arc); +// +// impl SharedMmapMemory { +// pub fn new(mmap_memory: MmapMemory) -> SharedMmapMemory { +// SharedMmapMemory(Arc::new(mmap_memory)) +// } +// } +// +// +// ///////////////////////////////////////////////////////// +// // MmapMemory +// // +// +// pub struct MmapMemory(Mmap); +// +// impl MemoryPointer for MmapMemory { +// fn data(&self) -> &[u8] { +// let &MmapMemory(ref mmap) = self; +// unsafe { +// mmap.as_slice() +// } +// } +// } +// +// ////////////////////////////////////////////////////////// +// // FileDirectory +// +// pub struct FileDirectory { +// index_path: PathBuf, +// mmap_cache: RefCell>, +// } +// +// impl FileDirectory { +// pub fn for_path(path: PathBuf)-> FileDirectory { +// FileDirectory { +// index_path: path, +// mmap_cache: RefCell::new(HashMap::new()), +// } +// } +// +// fn get_or_open_mmap(&self, filepath: &PathBuf)->Result<[u8]> { +// if !self.mmap_cache.borrow().contains_key(filepath) { +// let file = try!(File::open(filepath)); +// let mmap = MmapMemory(try!(Mmap::open(&file, Protection::Read))); +// self.mmap_cache.borrow_mut().insert(filepath.clone(), SharedMmapMemory::new(mmap)); +// } +// let shared_map: SharedMmapMemory = self.mmap_cache.borrow().get(filepath).unwrap().clone(); +// Ok(shared_map) +// } +// +// fn get_path(&self, segment_id: &SegmentId, component: SegmentComponent) -> PathBuf { +// let mut filepath = self.index_path.clone(); +// let SegmentId(ref segment_id_str) = *segment_id; +// let filename = String::new() + segment_id_str + "." + Segment::path_suffix(component); +// filepath.push(filename); +// filepath +// } +// } +// +// impl Dir for FileDirectory { +// fn get_data(&self, segment_id: &SegmentId, component: SegmentComponent) -> Result<[u8]> { +// let filepath = self.get_path(segment_id, component); +// self.get_or_open_mmap(&filepath) +// } +// +// fn open_writable<'a>(&'a self, segment_id: &SegmentId, component: SegmentComponent) -> Result { +// Err(Error::IOError("e")) +// } +// } diff --git a/src/core/error.rs b/src/core/error.rs index c104d6b97..aa1dd0d92 100644 --- a/src/core/error.rs +++ b/src/core/error.rs @@ -1,8 +1,12 @@ use std::result; +use std::io; - +#[derive(Debug)] pub enum Error { - IOError(String), + WriteError(String), + IOError(io::ErrorKind, String), + FileNotFound(String), + ReadOnly(String), } pub type Result = result::Result; diff --git a/src/core/reader.rs b/src/core/reader.rs index 2f939f7a3..d9fe529c8 100644 --- a/src/core/reader.rs +++ b/src/core/reader.rs @@ -1,6 +1,4 @@ use core::directory::Directory; -use core::global::DocId; -use core::schema::*; pub struct SegmentIndexReader { directory: Directory, diff --git a/src/core/writer.rs b/src/core/writer.rs index 016c563ed..0f88a56d0 100644 --- a/src/core/writer.rs +++ b/src/core/writer.rs @@ -3,6 +3,7 @@ use std::io; use std::slice; use core::global::*; use core::schema::*; +use core::codec::*; use core::directory::Directory; use core::analyzer::tokenize; use std::collections::{HashMap, BTreeMap}; @@ -13,6 +14,7 @@ use std::mem; use byteorder::{NativeEndian, ReadBytesExt, WriteBytesExt}; use std::iter::Peekable; use core::serial::*; +use core::error::*; pub struct SimplePostingsWriter { doc_ids: Vec, @@ -100,24 +102,13 @@ impl IndexWriter { self.max_doc += 1; } - pub fn close(self) -> ClosedIndexWriter { - ClosedIndexWriter { - index_writer: self - } - } - - pub fn sync(&mut self,) -> Result<(), io::Error> { - self.directory.new_segment(); - Ok(()) + pub fn commit(self,) -> Result { + let segment = self.directory.new_segment(); + SimpleCodec::write(&self, &segment) } } -pub struct ClosedIndexWriter { - index_writer: IndexWriter, -} - - ////////////////////////////////// @@ -244,14 +235,14 @@ impl<'a> TermCursor<'a> for CIWTermCursor<'a> { // // TODO use a Term type // -impl<'a> SerializableSegment<'a> for ClosedIndexWriter { +impl<'a> SerializableSegment<'a> for IndexWriter { type TermCur = CIWTermCursor<'a>; fn term_cursor(&'a self) -> CIWTermCursor<'a> { - let mut field_it: hash_map::Iter<'a, Field, FieldWriter> = self.index_writer.term_writers.iter(); + let mut field_it: hash_map::Iter<'a, Field, FieldWriter> = self.term_writers.iter(); let (field, field_writer) = field_it.next().unwrap(); // TODO handle no field - let mut term_cursor = CIWTermCursor { + let term_cursor = CIWTermCursor { field_it: field_it, form_it: CIWFormCursor { term_it: field_writer.term_index.iter(), diff --git a/src/lib.rs b/src/lib.rs index efc554e4a..e129aaa35 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,5 @@ +#[allow(unused_imports)] + #[macro_use] extern crate lazy_static; extern crate fst; diff --git a/tests/core.rs b/tests/core.rs index c8a17556d..59e203f4e 100644 --- a/tests/core.rs +++ b/tests/core.rs @@ -10,7 +10,7 @@ use tantivy::core::serial::*; use tantivy::core::schema::*; use tantivy::core::codec::SimpleCodec; use tantivy::core::global::*; -use tantivy::core::writer::{IndexWriter, ClosedIndexWriter}; +use tantivy::core::writer::IndexWriter; use tantivy::core::directory::{Directory, generate_segment_name, SegmentId}; use std::ops::DerefMut; use tantivy::core::writer::SimplePostingsWriter; @@ -36,7 +36,7 @@ fn test_tokenizer() { #[test] fn test_indexing() { - let directory = Directory::in_mem(); + let directory = Directory::from("/home/paul/temp/idx"); { let mut index_writer = IndexWriter::open(&directory); { @@ -54,7 +54,11 @@ fn test_indexing() { doc.set(Field(1), "a b c d"); index_writer.add(doc); } - let mut closed_index_writer: ClosedIndexWriter = index_writer.close(); + let commit_result = index_writer.commit(); + // println!("{:?}", commit_result.err()); + assert!(commit_result.is_ok()); + // assert!(commit_result.is_ok()); + // SimpleCodec::write(closed_index_writer, output); // let mut term_cursor = closed_index_writer.term_cursor(); // loop { @@ -70,7 +74,7 @@ fn test_indexing() { // } // } // } - assert!(false); + // assert!(false); } { // TODO add index opening stuff